package server import ( "sync" _ "fmt" "FRMS/internal/pkg/logging" ) // allows for multiple readers/writers type DeviceInfo struct { Id uint32 Type string Status string Data string Index uint32 TransactionId uint32 } type StatusMonitor struct { // allows for embedding into managers TransactionId chan uint32 // monotonically increases to track outdated reqs DevChan chan *DeviceInfo // channel for device status ReactorChan chan *DeviceInfo // channel for reactor status *SystemViewer DevBuf *devbuf sync.Mutex } type devbuf struct { ReactorId uint32 // reactor we are looking at, if any Buf map[string]map[uint32]*DeviceInfo // convienent way to store/seperate device data sync.Mutex } func NewBuffer() map[string]map[uint32]*DeviceInfo { rbuf := make(map[uint32]*DeviceInfo) dbuf := make(map[uint32]*DeviceInfo) sbuf := make(map[string]map[uint32]*DeviceInfo) sbuf["Reactor"] = rbuf sbuf["Device"] = dbuf return sbuf } func NewStatusMonitor(t string, id uint32, sys *SystemViewer) *StatusMonitor { tid := make(chan uint32) sm := &StatusMonitor{TransactionId:tid} sm.SystemViewer = sys if t == "Reactor" { // reactor status monitor sm.ReactorChan = sys.AddReactorSender() sm.DevChan = sys.AddDeviceSender(id) go sm.GenerateIds() } else { // tui status monitor sbuf := NewBuffer() //sm.ReactorChan, sbuf["Reactor"] = sys.AddListener(id,0) sm.DevBuf = &devbuf{Buf:sbuf} // makes it easier to work with go sm.UpdateListener(id,0) } return sm } func (s *StatusMonitor) GenerateIds() { var id uint32 id = 0 for { s.TransactionId <-id id += 1 } } func (s *StatusMonitor) Send(d *DeviceInfo, dtype string) { d.TransactionId = <-s.TransactionId logging.Debug(logging.DClient,"SYS 01 Sending update for: %s (%s)", d.Type, d.Status) if dtype == "Reactor" { s.ReactorChan <-d } else { s.DevChan <-d } } func (s *StatusMonitor) GetBuffer() []*DeviceInfo { // also clears buffer s.DevBuf.Lock() defer s.DevBuf.Unlock() res := []*DeviceInfo{} if len(s.DevBuf.Buf["Reactor"]) != 0 || len(s.DevBuf.Buf["Device"]) != 0 { logging.Debug(logging.DClient,"Clearing buff %v", s.DevBuf.Buf) } for _, devs := range s.DevBuf.Buf { for _, dev := range devs { // loops over reactors then devices res = append(res,dev) } } s.DevBuf.Buf = NewBuffer() // clearing old buffer return res } func (s *StatusMonitor) UpdateListener(tid, reactorId uint32) { s.DevBuf.Lock() defer s.DevBuf.Unlock() // clearing proper buffer if reactorId == 0 { logging.Debug(logging.DClient,"SYS 01 Adding %v as reactor listener", tid) s.DevBuf.Buf["Reactor"] = make(map[uint32]*DeviceInfo) s.ReactorChan, s.DevBuf.Buf["Reactor"] = s.SystemViewer.AddListener(tid, reactorId) go s.Listen(s.ReactorChan) } else { logging.Debug(logging.DClient,"SYS 01 Adding %v as reactor %v listener", tid, reactorId) s.DevBuf.Buf["Device"] = make(map[uint32]*DeviceInfo) // clearing old devices if s.DevBuf.ReactorId != reactorId && s.DevBuf.ReactorId != 0{ go s.SystemViewer.RemoveListener(s.DevBuf.ReactorId, tid) } s.DevBuf.ReactorId = reactorId s.DevChan, s.DevBuf.Buf["Device"] = s.SystemViewer.AddListener(tid, reactorId) go s.Listen(s.DevChan) } } func (s *StatusMonitor) UpdateBuffer(d *DeviceInfo, dtype string, ch chan *DeviceInfo) { s.DevBuf.Lock() defer s.DevBuf.Unlock() logging.Debug(logging.DClient,"SYS 01 Dev %v update requested", d) if dev, exists := s.DevBuf.Buf[dtype][d.Id]; exists { // already a device in the buffer if dev.TransactionId > d.TransactionId { logging.Debug(logging.DClient,"SYS 01 Update Processed. Old: %v, New: %v \n", dev, d) d = dev // not sure if i can do this lol } } if ch == s.ReactorChan || ch == s.DevChan { // hacky way to check if the device came from a listener of a current channel s.DevBuf.Buf[dtype][d.Id] = d } else { logging.Debug(logging.DClient,"Dev out of date!") } } func (s *StatusMonitor) Listen(ch chan *DeviceInfo) { for dev := range ch { if dev.Type == "Reactor" { go s.UpdateBuffer(dev,"Reactor", ch) } else { go s.UpdateBuffer(dev, "Device", ch) } } } type InfoStream struct { // base stream for any reactor/device // NewSender embedds the channel to send updates on // NewListener will add the statusmon to the list of devs to echo to Stream chan *DeviceInfo Layout *syslayout *listeners } type listeners struct { sync.RWMutex Listeners map[uint32]*lischan } type lischan struct { sync.WaitGroup StatusChan chan *DeviceInfo } type syslayout struct { Devs map[uint32]*DeviceInfo uint32 //index sync.RWMutex } func NewLisChan(ch chan *DeviceInfo) *lischan { l := &lischan{StatusChan:ch} return l } func NewInfoStream() *InfoStream { dch := make(chan *DeviceInfo) s := &InfoStream{Stream:dch} m := make(map[uint32]*DeviceInfo) s.Layout = &syslayout{Devs:m} s.listeners = &listeners{Listeners:make(map[uint32]*lischan)} return s } func (s *InfoStream) Start() { // consistency go s.Listen() } // goal is to hook every new manager into the reactor status chan func (s *InfoStream) AddSender() chan *DeviceInfo { return s.Stream } func (s *InfoStream) Listen() { for deviceInfo := range s.Stream { go s.Update(deviceInfo) } } func (s *InfoStream) Update(d *DeviceInfo) { s.Layout.Lock() defer s.Layout.Unlock() if dev, ok := s.Layout.Devs[d.Id]; !ok { d.Index = s.Layout.uint32 s.Layout.uint32 += 1 } else { d.Index = dev.Index } s.Layout.Devs[d.Id] = d go s.Echo(d) } func (l *listeners) Echo(d *DeviceInfo) { l.RLock() defer l.RUnlock() // read only lock for _, lis := range l.Listeners { lis.Add(1) go func(listener *lischan, dev *DeviceInfo){ defer listener.Done() listener.StatusChan <-dev }(lis,d) } } func (s *InfoStream) AddListener(id uint32, ch chan *DeviceInfo) map[uint32]*DeviceInfo { // if i get a memory leak ill eat my shoe s.listeners.Lock() defer s.listeners.Unlock() if _, ok := s.listeners.Listeners[id]; ok { // exists go s.RemoveListener(id) } s.listeners.Listeners[id] = NewLisChan(ch) logging.Debug(logging.DClient,"SYS 01 Getting Devices") //s.Layout.RLock() //defer s.Layout.RUnlock() logging.Debug(logging.DClient,"SYS 01 Returning Devices %v", s.Layout.Devs) return s.Layout.Devs } func (l *listeners) RemoveListener(id uint32) { l.Lock() defer l.Unlock() if lis, ok := l.Listeners[id]; ok { delete(l.Listeners,id) go func(ls *lischan){ ls.Wait() close(ls.StatusChan) }(lis) } } // status buffer maintaince type SystemViewer struct { // stores system directory and provide methods to be embedded in managers ReactorStream *InfoStream // can add itself as a listener to provide methods DeviceStream *ds } type ds struct { Reactors map[uint32]*InfoStream //map from reactor id to its device info stream sync.Mutex } func NewSystemViewer() *SystemViewer { rs := NewInfoStream() s := &SystemViewer{ReactorStream:rs} m := make(map[uint32]*InfoStream) s.DeviceStream = &ds{Reactors:m} return s } func (s *SystemViewer) Start() { go s.ReactorStream.Start() } func (s *SystemViewer) AddReactorSender() chan *DeviceInfo { return s.ReactorStream.AddSender() } func (s *SystemViewer) AddDeviceSender(reactorId uint32) chan *DeviceInfo { s.DeviceStream.Lock() defer s.DeviceStream.Unlock() var ds *InfoStream var ok bool if ds, ok = s.DeviceStream.Reactors[reactorId]; !ok { ds = NewInfoStream() s.DeviceStream.Reactors[reactorId] = ds go ds.Start() } return ds.AddSender() } func (s *SystemViewer) AddListener(id, rid uint32) (chan *DeviceInfo, map[uint32]*DeviceInfo) { // returns a listener for that chan ch := make(chan *DeviceInfo) if rid != 0 { return ch, s.DeviceStream.Reactors[rid].AddListener(id, ch) } else { return ch, s.ReactorStream.AddListener(id, ch) } } func (s *SystemViewer) RemoveListener(rid, tid uint32) { // removes chan for specific tid and rid s.DeviceStream.Lock() defer s.DeviceStream.Unlock() go s.DeviceStream.Reactors[rid].RemoveListener(tid) }