You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
316 lines
8.7 KiB
Go
316 lines
8.7 KiB
Go
package server
|
|
|
|
import (
|
|
"sync"
|
|
_ "fmt"
|
|
"FRMS/internal/pkg/logging"
|
|
)
|
|
// allows for multiple readers/writers
|
|
type DeviceInfo struct {
|
|
Id uint32
|
|
Type string
|
|
Status string
|
|
Data string
|
|
Index uint32
|
|
TransactionId uint32
|
|
}
|
|
|
|
type StatusMonitor struct {
|
|
// allows for embedding into managers
|
|
TransactionId chan uint32 // monotonically increases to track outdated reqs
|
|
DevChan chan *DeviceInfo // channel for device status
|
|
ReactorChan chan *DeviceInfo // channel for reactor status
|
|
*SystemViewer
|
|
DevBuf *devbuf
|
|
sync.Mutex
|
|
}
|
|
|
|
type devbuf struct {
|
|
ReactorId uint32 // reactor we are looking at, if any
|
|
Buf map[string]map[uint32]*DeviceInfo // convienent way to store/seperate device data
|
|
sync.Mutex
|
|
}
|
|
|
|
func NewBuffer() map[string]map[uint32]*DeviceInfo {
|
|
rbuf := make(map[uint32]*DeviceInfo)
|
|
dbuf := make(map[uint32]*DeviceInfo)
|
|
sbuf := make(map[string]map[uint32]*DeviceInfo)
|
|
sbuf["Reactor"] = rbuf
|
|
sbuf["Device"] = dbuf
|
|
return sbuf
|
|
}
|
|
|
|
func NewStatusMonitor(t string, id uint32, sys *SystemViewer) *StatusMonitor {
|
|
tid := make(chan uint32)
|
|
sm := &StatusMonitor{TransactionId:tid}
|
|
sm.SystemViewer = sys
|
|
logging.Debug(logging.DClient,"SYS Creating new status monitor")
|
|
if t == "Reactor" {
|
|
// reactor status monitor
|
|
sm.ReactorChan = sys.AddReactorSender()
|
|
sm.DevChan = sys.AddDeviceSender(id)
|
|
go sm.GenerateIds()
|
|
} else {
|
|
// tui status monitor
|
|
sbuf := NewBuffer()
|
|
//sm.ReactorChan, sbuf["Reactor"] = sys.AddListener(id,0)
|
|
sm.DevBuf = &devbuf{Buf:sbuf} // makes it easier to work with
|
|
go sm.UpdateListener(id,0)
|
|
}
|
|
return sm
|
|
}
|
|
|
|
func (s *StatusMonitor) GenerateIds() {
|
|
var id uint32
|
|
id = 0
|
|
for {
|
|
s.TransactionId <-id
|
|
id += 1
|
|
}
|
|
}
|
|
|
|
func (s *StatusMonitor) Send(d *DeviceInfo, dtype string) {
|
|
d.TransactionId = <-s.TransactionId
|
|
logging.Debug(logging.DClient,"SYS 01 Sending update for: %s (%s)", d.Type, d.Status)
|
|
if dtype == "Reactor" {
|
|
s.ReactorChan <-d
|
|
} else {
|
|
s.DevChan <-d
|
|
}
|
|
}
|
|
|
|
func (s *StatusMonitor) GetBuffer() []*DeviceInfo {
|
|
// also clears buffer
|
|
s.DevBuf.Lock()
|
|
defer s.DevBuf.Unlock()
|
|
res := []*DeviceInfo{}
|
|
if len(s.DevBuf.Buf["Reactor"]) != 0 || len(s.DevBuf.Buf["Device"]) != 0 {
|
|
logging.Debug(logging.DClient,"Clearing buff %v", s.DevBuf.Buf)
|
|
}
|
|
for _, devs := range s.DevBuf.Buf {
|
|
for _, dev := range devs {
|
|
// loops over reactors then devices
|
|
res = append(res,dev)
|
|
}
|
|
}
|
|
s.DevBuf.Buf = NewBuffer() // clearing old buffer
|
|
return res
|
|
}
|
|
|
|
func (s *StatusMonitor) UpdateListener(tid, reactorId uint32) {
|
|
s.DevBuf.Lock()
|
|
defer s.DevBuf.Unlock()
|
|
// clearing proper buffer
|
|
if reactorId == 0 {
|
|
logging.Debug(logging.DClient,"SYS 01 Adding %v as reactor listener", tid)
|
|
s.DevBuf.Buf["Reactor"] = make(map[uint32]*DeviceInfo)
|
|
s.ReactorChan, s.DevBuf.Buf["Reactor"] = s.SystemViewer.AddListener(tid, reactorId)
|
|
go s.Listen(s.ReactorChan)
|
|
} else {
|
|
logging.Debug(logging.DClient,"SYS 01 Adding %v as reactor %v listener", tid, reactorId)
|
|
s.DevBuf.Buf["Device"] = make(map[uint32]*DeviceInfo) // clearing old devices
|
|
if s.DevBuf.ReactorId != reactorId && s.DevBuf.ReactorId != 0{
|
|
go s.SystemViewer.RemoveListener(s.DevBuf.ReactorId, tid)
|
|
}
|
|
s.DevBuf.ReactorId = reactorId
|
|
s.DevChan, s.DevBuf.Buf["Device"] = s.SystemViewer.AddListener(tid, reactorId)
|
|
go s.Listen(s.DevChan)
|
|
}
|
|
}
|
|
|
|
func (s *StatusMonitor) UpdateBuffer(d *DeviceInfo, dtype string, ch chan *DeviceInfo) {
|
|
s.DevBuf.Lock()
|
|
defer s.DevBuf.Unlock()
|
|
logging.Debug(logging.DClient,"SYS 01 Dev %v update requested", d)
|
|
if dev, exists := s.DevBuf.Buf[dtype][d.Id]; exists {
|
|
// already a device in the buffer
|
|
if dev.TransactionId > d.TransactionId {
|
|
logging.Debug(logging.DClient,"SYS 01 Update Processed. Old: %v, New: %v \n", dev, d)
|
|
d = dev // not sure if i can do this lol
|
|
}
|
|
}
|
|
if ch == s.ReactorChan || ch == s.DevChan {
|
|
// hacky way to check if the device came from a listener of a current channel
|
|
s.DevBuf.Buf[dtype][d.Id] = d
|
|
} else {
|
|
logging.Debug(logging.DClient,"Dev out of date!")
|
|
}
|
|
}
|
|
|
|
func (s *StatusMonitor) Listen(ch chan *DeviceInfo) {
|
|
for dev := range ch {
|
|
if dev.Type == "Reactor" {
|
|
go s.UpdateBuffer(dev,"Reactor", ch)
|
|
} else {
|
|
go s.UpdateBuffer(dev, "Device", ch)
|
|
}
|
|
}
|
|
}
|
|
|
|
type InfoStream struct {
|
|
// base stream for any reactor/device
|
|
// NewSender embedds the channel to send updates on
|
|
// NewListener will add the statusmon to the list of devs to echo to
|
|
Stream chan *DeviceInfo
|
|
Layout *syslayout
|
|
*listeners
|
|
}
|
|
|
|
type listeners struct {
|
|
sync.RWMutex
|
|
Listeners map[uint32]*lischan
|
|
}
|
|
|
|
type lischan struct {
|
|
sync.WaitGroup
|
|
StatusChan chan *DeviceInfo
|
|
}
|
|
|
|
type syslayout struct {
|
|
Devs map[uint32]*DeviceInfo
|
|
uint32 //index
|
|
sync.RWMutex
|
|
}
|
|
|
|
func NewLisChan(ch chan *DeviceInfo) *lischan {
|
|
l := &lischan{StatusChan:ch}
|
|
return l
|
|
}
|
|
|
|
func NewInfoStream() *InfoStream {
|
|
dch := make(chan *DeviceInfo)
|
|
s := &InfoStream{Stream:dch}
|
|
m := make(map[uint32]*DeviceInfo)
|
|
s.Layout = &syslayout{Devs:m}
|
|
s.listeners = &listeners{Listeners:make(map[uint32]*lischan)}
|
|
return s
|
|
}
|
|
|
|
func (s *InfoStream) Start() {
|
|
// consistency
|
|
go s.Listen()
|
|
}
|
|
// goal is to hook every new manager into the reactor status chan
|
|
func (s *InfoStream) AddSender() chan *DeviceInfo {
|
|
return s.Stream
|
|
}
|
|
|
|
func (s *InfoStream) Listen() {
|
|
for deviceInfo := range s.Stream {
|
|
go s.Update(deviceInfo)
|
|
}
|
|
}
|
|
|
|
func (s *InfoStream) Update(d *DeviceInfo) {
|
|
s.Layout.Lock()
|
|
defer s.Layout.Unlock()
|
|
if dev, ok := s.Layout.Devs[d.Id]; !ok {
|
|
d.Index = s.Layout.uint32
|
|
s.Layout.uint32 += 1
|
|
} else {
|
|
d.Index = dev.Index
|
|
}
|
|
s.Layout.Devs[d.Id] = d
|
|
go s.Echo(d)
|
|
}
|
|
|
|
func (l *listeners) Echo(d *DeviceInfo) {
|
|
l.RLock()
|
|
defer l.RUnlock()
|
|
// read only lock
|
|
for _, lis := range l.Listeners {
|
|
lis.Add(1)
|
|
go func(listener *lischan, dev *DeviceInfo){
|
|
defer listener.Done()
|
|
listener.StatusChan <-dev
|
|
}(lis,d)
|
|
}
|
|
}
|
|
|
|
func (s *InfoStream) AddListener(id uint32, ch chan *DeviceInfo) map[uint32]*DeviceInfo {
|
|
// if i get a memory leak ill eat my shoe
|
|
s.listeners.Lock()
|
|
defer s.listeners.Unlock()
|
|
if _, ok := s.listeners.Listeners[id]; ok {
|
|
// exists
|
|
go s.RemoveListener(id)
|
|
}
|
|
s.listeners.Listeners[id] = NewLisChan(ch)
|
|
logging.Debug(logging.DClient,"SYS 01 Getting Devices")
|
|
//s.Layout.RLock()
|
|
//defer s.Layout.RUnlock()
|
|
logging.Debug(logging.DClient,"SYS 01 Returning Devices %v", s.Layout.Devs)
|
|
return s.Layout.Devs
|
|
}
|
|
|
|
func (l *listeners) RemoveListener(id uint32) {
|
|
l.Lock()
|
|
defer l.Unlock()
|
|
if lis, ok := l.Listeners[id]; ok {
|
|
delete(l.Listeners,id)
|
|
go func(ls *lischan){
|
|
ls.Wait()
|
|
close(ls.StatusChan)
|
|
}(lis)
|
|
}
|
|
}
|
|
|
|
// status buffer maintaince
|
|
|
|
type SystemViewer struct {
|
|
// stores system directory and provide methods to be embedded in managers
|
|
ReactorStream *InfoStream // can add itself as a listener to provide methods
|
|
DeviceStream *ds
|
|
}
|
|
|
|
type ds struct {
|
|
Reactors map[uint32]*InfoStream //map from reactor id to its device info stream
|
|
sync.Mutex
|
|
}
|
|
|
|
func NewSystemViewer() *SystemViewer {
|
|
rs := NewInfoStream()
|
|
s := &SystemViewer{ReactorStream:rs}
|
|
m := make(map[uint32]*InfoStream)
|
|
s.DeviceStream = &ds{Reactors:m}
|
|
return s
|
|
}
|
|
|
|
func (s *SystemViewer) Start() {
|
|
go s.ReactorStream.Start()
|
|
}
|
|
|
|
func (s *SystemViewer) AddReactorSender() chan *DeviceInfo {
|
|
return s.ReactorStream.AddSender()
|
|
}
|
|
|
|
func (s *SystemViewer) AddDeviceSender(reactorId uint32) chan *DeviceInfo {
|
|
s.DeviceStream.Lock()
|
|
defer s.DeviceStream.Unlock()
|
|
var ds *InfoStream
|
|
var ok bool
|
|
if ds, ok = s.DeviceStream.Reactors[reactorId]; !ok {
|
|
ds = NewInfoStream()
|
|
s.DeviceStream.Reactors[reactorId] = ds
|
|
go ds.Start()
|
|
}
|
|
return ds.AddSender()
|
|
}
|
|
|
|
func (s *SystemViewer) AddListener(id, rid uint32) (chan *DeviceInfo, map[uint32]*DeviceInfo) {
|
|
// returns a listener for that chan
|
|
ch := make(chan *DeviceInfo)
|
|
if rid != 0 {
|
|
return ch, s.DeviceStream.Reactors[rid].AddListener(id, ch)
|
|
} else {
|
|
return ch, s.ReactorStream.AddListener(id, ch)
|
|
}
|
|
}
|
|
|
|
func (s *SystemViewer) RemoveListener(rid, tid uint32) {
|
|
// removes chan for specific tid and rid
|
|
s.DeviceStream.Lock()
|
|
defer s.DeviceStream.Unlock()
|
|
go s.DeviceStream.Reactors[rid].RemoveListener(tid)
|
|
}
|