You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

315 lines
8.7 KiB
Go

package server
import (
"sync"
_ "fmt"
"FRMS/internal/pkg/logging"
)
// DeviceInfo is the status record passed over the status channels in this
// file; one record describes either a reactor or a device.
// NOTE(review): the comment originally placed here read "allows for multiple
// readers/writers" — presumably describing the monitoring system as a whole
// rather than this struct; confirm.
type DeviceInfo struct {
Id uint32 // identifier; used as the map key in the buffers and layouts
Type string // "Reactor" routes the record to the reactor buffer in StatusMonitor.Listen; any other value is treated as a device
Status string // status text; included in the debug line logged by StatusMonitor.Send
Data string // NOTE(review): not read anywhere in the visible code — presumably a free-form payload; confirm with callers
Index uint32 // assigned by InfoStream.Update the first time an Id is seen and reused on later updates
TransactionId uint32 // stamped by StatusMonitor.Send; UpdateBuffer compares it to detect stale updates
}
// StatusMonitor is the handle used to publish or observe status updates.
// NewStatusMonitor configures it either as a sender (type "Reactor") or as a
// buffering listener (any other type).
type StatusMonitor struct {
// allows for embedding into managers
TransactionId chan uint32 // monotonically increases to track outdated reqs; fed by GenerateIds and consumed by Send
DevChan chan *DeviceInfo // channel for device status
ReactorChan chan *DeviceInfo // channel for reactor status
// shared viewer used to register senders and listeners
*SystemViewer
// buffered updates; only set by NewStatusMonitor for non-"Reactor" monitors
DevBuf *devbuf
// NOTE(review): no method visible in this file locks this mutex (they lock
// DevBuf instead) — confirm whether it is still needed
sync.Mutex
}
// devbuf is the mutex-guarded status buffer of a listening StatusMonitor.
type devbuf struct {
ReactorId uint32 // reactor we are looking at, if any (UpdateListener treats 0 as "none")
Buf map[string]map[uint32]*DeviceInfo // convenient way to store/separate device data: "Reactor"/"Device" -> Id -> latest record
sync.Mutex // held by GetBuffer, UpdateListener and UpdateBuffer while they touch the fields above
}
// NewBuffer returns a fresh status buffer containing two empty, writable
// groups keyed "Reactor" and "Device".
func NewBuffer() map[string]map[uint32]*DeviceInfo {
	return map[string]map[uint32]*DeviceInfo{
		"Reactor": {},
		"Device":  {},
	}
}
// NewStatusMonitor builds a StatusMonitor attached to sys.
//
// When t is "Reactor" the monitor acts as a sender: it obtains the reactor
// sender channel and the device sender channel for reactor id, and starts
// GenerateIds in the background so Send can stamp transaction ids.
//
// For any other t the monitor acts as a listener: it gets an empty buffer and
// registers itself (in the background) as reactor-level listener id via
// UpdateListener(id, 0).
func NewStatusMonitor(t string, id uint32, sys *SystemViewer) *StatusMonitor {
	monitor := &StatusMonitor{
		TransactionId: make(chan uint32),
		SystemViewer:  sys,
	}
	switch t {
	case "Reactor":
		// reactor status monitor
		monitor.ReactorChan = sys.AddReactorSender()
		monitor.DevChan = sys.AddDeviceSender(id)
		go monitor.GenerateIds()
	default:
		// tui status monitor
		monitor.DevBuf = &devbuf{Buf: NewBuffer()}
		go monitor.UpdateListener(id, 0)
	}
	return monitor
}
// GenerateIds feeds s.TransactionId with 0, 1, 2, ... forever (uint32
// arithmetic wraps on overflow). Each send blocks until a reader takes the
// value, and the method never returns, so run it in its own goroutine.
func (s *StatusMonitor) GenerateIds() {
	for next := uint32(0); ; next++ {
		s.TransactionId <- next
	}
}
// Send stamps d with the next transaction id and publishes it: on ReactorChan
// when dtype is "Reactor", on DevChan for any other dtype. It blocks until an
// id is available and until the chosen channel accepts the record.
func (s *StatusMonitor) Send(d *DeviceInfo, dtype string) {
	d.TransactionId = <-s.TransactionId
	logging.Debug(logging.DClient, "SYS 01 Sending update for: %s (%s)", d.Type, d.Status)

	out := s.DevChan
	if dtype == "Reactor" {
		out = s.ReactorChan
	}
	out <- d
}
// GetBuffer drains the status buffer: it returns every buffered record (from
// all groups, in map iteration order) and replaces the buffer with a fresh
// empty one. The result is an empty, non-nil slice when nothing is buffered.
func (s *StatusMonitor) GetBuffer() []*DeviceInfo {
	buf := s.DevBuf
	buf.Lock()
	defer buf.Unlock()

	// only log when there is actually something to hand out
	if pending := len(buf.Buf["Reactor"]) + len(buf.Buf["Device"]); pending > 0 {
		logging.Debug(logging.DClient, "Clearing buff %v", buf.Buf)
	}

	drained := []*DeviceInfo{}
	for _, group := range buf.Buf {
		for _, info := range group {
			drained = append(drained, info)
		}
	}

	buf.Buf = NewBuffer() // clearing old buffer
	return drained
}
// UpdateListener (re)subscribes this monitor, as listener tid, to a stream.
//
// reactorId == 0 subscribes to the reactor-level stream: the "Reactor" group
// of the buffer is replaced by the map returned from AddListener and a Listen
// goroutine is started on the new ReactorChan.
//
// Any other reactorId subscribes to that reactor's device stream: the
// "Device" group is replaced the same way, the previously watched reactor (if
// any, and if different) is unsubscribed in the background, and a Listen
// goroutine is started on the new DevChan.
//
// The buffer lock is held for the whole call.
func (s *StatusMonitor) UpdateListener(tid, reactorId uint32) {
	s.DevBuf.Lock()
	defer s.DevBuf.Unlock()

	if reactorId != 0 {
		logging.Debug(logging.DClient, "SYS 01 Adding %v as reactor %v listener", tid, reactorId)
		s.DevBuf.Buf["Device"] = make(map[uint32]*DeviceInfo) // clearing old devices
		if prev := s.DevBuf.ReactorId; prev != 0 && prev != reactorId {
			// drop the subscription to the reactor we were watching before
			go s.SystemViewer.RemoveListener(prev, tid)
		}
		s.DevBuf.ReactorId = reactorId
		s.DevChan, s.DevBuf.Buf["Device"] = s.SystemViewer.AddListener(tid, reactorId)
		go s.Listen(s.DevChan)
		return
	}

	// clearing proper buffer
	logging.Debug(logging.DClient, "SYS 01 Adding %v as reactor listener", tid)
	s.DevBuf.Buf["Reactor"] = make(map[uint32]*DeviceInfo)
	s.ReactorChan, s.DevBuf.Buf["Reactor"] = s.SystemViewer.AddListener(tid, reactorId)
	go s.Listen(s.ReactorChan)
}
// UpdateBuffer records update d, which arrived on ch, in buffer group dtype.
//
// If the group already holds a record for d.Id with a higher TransactionId,
// that buffered record wins and is kept. The record is only written when ch
// is the monitor's current ReactorChan or DevChan; updates arriving on any
// other (superseded) channel are logged and dropped.
func (s *StatusMonitor) UpdateBuffer(d *DeviceInfo, dtype string, ch chan *DeviceInfo) {
	s.DevBuf.Lock()
	defer s.DevBuf.Unlock()

	logging.Debug(logging.DClient, "SYS 01 Dev %v update requested", d)

	bucket := s.DevBuf.Buf[dtype]
	if current, known := bucket[d.Id]; known && current.TransactionId > d.TransactionId {
		// buffered record is newer than the incoming one: keep it
		logging.Debug(logging.DClient, "SYS 01 Update Processed. Old: %v, New: %v \n", current, d)
		d = current
	}

	// hacky way to check if the device came from a listener of a current channel
	fromCurrentChan := ch == s.ReactorChan || ch == s.DevChan
	if !fromCurrentChan {
		logging.Debug(logging.DClient, "Dev out of date!")
		return
	}
	bucket[d.Id] = d
}
// Listen consumes ch until it is closed, handing every record to UpdateBuffer
// in its own goroutine. Records whose Type is "Reactor" go to the "Reactor"
// group; everything else goes to the "Device" group.
func (s *StatusMonitor) Listen(ch chan *DeviceInfo) {
	for dev := range ch {
		dtype := "Device"
		if dev.Type == "Reactor" {
			dtype = "Reactor"
		}
		go s.UpdateBuffer(dev, dtype, ch)
	}
}
// InfoStream receives status records on Stream, remembers the latest record
// per Id in Layout, and echoes every record to the registered listeners.
type InfoStream struct {
// base stream for any reactor/device
// AddSender hands out Stream so senders can push updates onto it
// AddListener registers a channel that every update is echoed to
Stream chan *DeviceInfo
Layout *syslayout
*listeners
}
// listeners is the registry of channels that updates are echoed to.
type listeners struct {
sync.RWMutex // Echo takes the read lock; AddListener and RemoveListener take the write lock
Listeners map[uint32]*lischan // listener id -> its channel wrapper
}
// lischan pairs a listener's channel with a WaitGroup that tracks the sends
// still pending on it.
type lischan struct {
sync.WaitGroup // Echo adds one per pending send; RemoveListener waits on it before closing StatusChan
StatusChan chan *DeviceInfo // channel the listener receives echoed records on
}
// syslayout stores the most recent DeviceInfo for every Id a stream has seen.
type syslayout struct {
Devs map[uint32]*DeviceInfo // Id -> latest record, written by InfoStream.Update
uint32 // index: next Index value handed to a previously unseen Id
sync.RWMutex // write-locked by InfoStream.Update while it touches the fields above
}
// NewLisChan wraps ch in a lischan whose WaitGroup starts at zero.
func NewLisChan(ch chan *DeviceInfo) *lischan {
	return &lischan{StatusChan: ch}
}
// NewInfoStream returns an InfoStream with an unbuffered input channel, an
// empty layout and an empty listener registry. It does not start processing;
// call Start for that.
func NewInfoStream() *InfoStream {
	return &InfoStream{
		Stream:    make(chan *DeviceInfo),
		Layout:    &syslayout{Devs: make(map[uint32]*DeviceInfo)},
		listeners: &listeners{Listeners: make(map[uint32]*lischan)},
	}
}
// Start launches the stream's Listen loop in a new goroutine and returns
// immediately.
func (s *InfoStream) Start() {
// consistency
go s.Listen()
}
// AddSender returns the stream's input channel. Every caller receives the
// same channel, so all senders share it.
// goal is to hook every new manager into the reactor status chan
func (s *InfoStream) AddSender() chan *DeviceInfo {
return s.Stream
}
// Listen receives records from s.Stream until the channel is closed and
// passes each one to Update in its own goroutine.
func (s *InfoStream) Listen() {
	for {
		info, open := <-s.Stream
		if !open {
			return
		}
		go s.Update(info)
	}
}
// Update stores d as the latest record for d.Id and echoes it to the
// listeners in a new goroutine. A previously unseen Id is given the next free
// Index; a known Id keeps the Index of the record it replaces. d.Index is
// overwritten in both cases.
func (s *InfoStream) Update(d *DeviceInfo) {
	layout := s.Layout
	layout.Lock()
	defer layout.Unlock()

	if known, seen := layout.Devs[d.Id]; seen {
		d.Index = known.Index
	} else {
		d.Index = layout.uint32
		layout.uint32++
	}
	layout.Devs[d.Id] = d

	go s.Echo(d)
}
// Echo forwards d to every registered listener. Each delivery runs in its own
// goroutine so a slow listener does not hold up the others; the listener's
// WaitGroup is incremented before the goroutine starts and released once the
// send completes. Only the read lock is held, and only while the deliveries
// are being launched.
func (l *listeners) Echo(d *DeviceInfo) {
	l.RLock()
	defer l.RUnlock()

	for _, target := range l.Listeners {
		target := target // per-iteration copy for the closure below
		target.Add(1)
		go func() {
			defer target.Done()
			target.StatusChan <- d
		}()
	}
}
// AddListener registers ch as listener id and returns a snapshot of the
// latest known record per device Id.
//
// If id is already registered with a different channel, the previous
// registration is retired here, under the same lock: it is replaced in the
// registry and its channel is closed in the background once all sends still
// pending on it have completed. (Spawning RemoveListener instead would run
// only after this function released the lock and would then remove the
// replacement that was just installed.)
//
// The returned map is a copy made under the layout read lock, so the caller
// may keep and modify it without racing with Update. The DeviceInfo values
// themselves are shared, not copied.
func (s *InfoStream) AddListener(id uint32, ch chan *DeviceInfo) map[uint32]*DeviceInfo {
	s.listeners.Lock()
	if old, exists := s.listeners.Listeners[id]; exists && old.StatusChan != ch {
		go func(ls *lischan) {
			ls.Wait()
			close(ls.StatusChan)
		}(old)
	}
	s.listeners.Listeners[id] = NewLisChan(ch)
	s.listeners.Unlock()

	logging.Debug(logging.DClient, "SYS 01 Getting Devices")

	// Register first, snapshot second: an update racing with this call is
	// then seen at least once (in the snapshot, on ch, or both).
	s.Layout.RLock()
	defer s.Layout.RUnlock()
	snapshot := make(map[uint32]*DeviceInfo, len(s.Layout.Devs))
	for devId, info := range s.Layout.Devs {
		snapshot[devId] = info
	}
	logging.Debug(logging.DClient, "SYS 01 Returning Devices %v", snapshot)
	return snapshot
}
// RemoveListener unregisters listener id. The listener's channel is closed in
// the background after every send still pending on it has finished. Unknown
// ids are ignored.
func (l *listeners) RemoveListener(id uint32) {
	l.Lock()
	defer l.Unlock()

	lis, registered := l.Listeners[id]
	if !registered {
		return
	}
	delete(l.Listeners, id)

	go func() {
		lis.Wait()
		close(lis.StatusChan)
	}()
}
// status buffer maintenance

// SystemViewer groups the reactor-level status stream with the per-reactor
// device streams and offers the sender/listener registration methods below.
type SystemViewer struct {
// stores system directory and provide methods to be embedded in managers
ReactorStream *InfoStream // stream for reactor-level status; AddListener uses it when the reactor id is 0
DeviceStream *ds // per-reactor device streams
}
// ds holds one device InfoStream per reactor.
type ds struct {
Reactors map[uint32]*InfoStream // map from reactor id to its device info stream
sync.Mutex // taken by AddDeviceSender and RemoveListener around access to Reactors
}
// NewSystemViewer returns a viewer with a new reactor stream and no device
// streams yet. The reactor stream is not started; call Start for that.
func NewSystemViewer() *SystemViewer {
	return &SystemViewer{
		ReactorStream: NewInfoStream(),
		DeviceStream:  &ds{Reactors: make(map[uint32]*InfoStream)},
	}
}
// Start begins processing of the reactor stream by calling its Start method
// in a new goroutine; it returns immediately. Device streams are started by
// AddDeviceSender when they are created.
func (s *SystemViewer) Start() {
go s.ReactorStream.Start()
}
// AddReactorSender returns the input channel of the reactor stream. Every
// caller receives the same channel.
func (s *SystemViewer) AddReactorSender() chan *DeviceInfo {
return s.ReactorStream.AddSender()
}
// AddDeviceSender returns the input channel of the device stream belonging to
// reactorId. The stream is created, registered and started on first use;
// later calls for the same reactor return the same channel.
func (s *SystemViewer) AddDeviceSender(reactorId uint32) chan *DeviceInfo {
	s.DeviceStream.Lock()
	defer s.DeviceStream.Unlock()

	stream, found := s.DeviceStream.Reactors[reactorId]
	if !found {
		stream = NewInfoStream()
		s.DeviceStream.Reactors[reactorId] = stream
		go stream.Start()
	}
	return stream.AddSender()
}
// AddListener creates a fresh channel, registers it as listener id on the
// selected stream and returns it together with the device map handed back by
// that stream's AddListener.
//
// rid == 0 selects the reactor-level stream; any other rid selects that
// reactor's device stream. The device stream registry is read under its
// mutex, and a stream that does not exist yet is created and started the same
// way AddDeviceSender does it, so listening to a reactor that has not
// attached a sender yet no longer dereferences a nil stream: the listener
// simply starts receiving once a sender attaches.
func (s *SystemViewer) AddListener(id, rid uint32) (chan *DeviceInfo, map[uint32]*DeviceInfo) {
	// returns a listener for that chan
	ch := make(chan *DeviceInfo)
	if rid == 0 {
		return ch, s.ReactorStream.AddListener(id, ch)
	}

	s.DeviceStream.Lock()
	stream, found := s.DeviceStream.Reactors[rid]
	if !found {
		stream = NewInfoStream()
		s.DeviceStream.Reactors[rid] = stream
		go stream.Start()
	}
	// release the registry before registering so it is not held while the
	// stream takes its own locks
	s.DeviceStream.Unlock()

	return ch, stream.AddListener(id, ch)
}
// RemoveListener unregisters listener tid from the device stream of reactor
// rid. The removal itself runs in a new goroutine, so this call returns
// without waiting for it. A reactor that has no device stream is ignored
// (and logged) instead of dereferencing a nil stream.
func (s *SystemViewer) RemoveListener(rid, tid uint32) {
	// removes chan for specific tid and rid
	s.DeviceStream.Lock()
	defer s.DeviceStream.Unlock()

	stream, found := s.DeviceStream.Reactors[rid]
	if !found || stream == nil {
		logging.Debug(logging.DClient, "SYS 01 No stream for reactor %v, listener %v not removed", rid, tid)
		return
	}
	go stream.RemoveListener(tid)
}