purged file copies
parent
9dacb2d007
commit
a839beec16
@ -1,240 +0,0 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"sync"
|
||||
_ "fmt"
|
||||
)
|
||||
|
||||
// package will create and maintain a concurrent system structure
|
||||
// allows for multiple readers/writers
|
||||
type DeviceInfo struct {
|
||||
Id uint32
|
||||
Type string
|
||||
Status string
|
||||
Data string
|
||||
TransactionId uint32
|
||||
}
|
||||
|
||||
func NewStatusMonitor(ch chan *DeviceInfo) *StatusMonitor {
|
||||
sm := &StatusMonitor{StatusChan:ch}
|
||||
buf := make(map[uint32]*DeviceInfo)
|
||||
sm.Status.Buffer = buf
|
||||
return sm
|
||||
}
|
||||
|
||||
type InfoStream struct {
|
||||
// used for reactor and device status streams
|
||||
sync.Mutex
|
||||
Reactors chan *DeviceInfo
|
||||
Listener listeners
|
||||
}
|
||||
|
||||
type listeners struct {
|
||||
sync.Mutex
|
||||
s map[uint32] chan *DeviceInfo
|
||||
}
|
||||
|
||||
|
||||
func NewInfoStream() *InfoStream {
|
||||
dch := make(chan *DeviceInfo)
|
||||
lis := new(listeners)
|
||||
s := &InfoStream{Stream:dch}
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *InfoStream) Start() {
|
||||
// consistency
|
||||
go s.Listener()
|
||||
}
|
||||
// goal is to hook every new manager into the reactor status chan
|
||||
func (s *InfoStream) AddSender() *StatusMonitor {
|
||||
sm := NewStatusMonitor(s.Stream) // give the channel we monitor for incoming updates
|
||||
return sm
|
||||
}
|
||||
|
||||
func (s *InfoStream) AddListener(id uint32) *StatusMonitor {
|
||||
s.Listener.Lock()
|
||||
defer s.Listener.Unlock()
|
||||
dChan := make(chan *DeviceInfo)
|
||||
sm := NewStatusMonitor(dChan)
|
||||
s.Listener.s[id] = dChan
|
||||
return sm
|
||||
}
|
||||
|
||||
func (s *InfoStream) Listener() {
|
||||
for {
|
||||
deviceInfo <-s.Stream
|
||||
go s.Echo(deviceInfo) // going to send copy for now because f it
|
||||
}
|
||||
}
|
||||
|
||||
func (s *InfoStream) Echo(d *DeviceInfo) {
|
||||
s.Listener.Lock()
|
||||
defer s.Listener.Unlock()
|
||||
for _,tm := range s.Listner.s {
|
||||
go func() {
|
||||
tm.StatusChan <-d
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// status buffer maintaince
|
||||
type StatusMonitor struct {
|
||||
// serve as base to embed in managers to send/receive reactor info
|
||||
TransactionID chan uint // monotonically increases to track outdated reqs
|
||||
StatusChan chan *DeviceInfo // sending reactor info in same fmt
|
||||
Status *sb //struct for storing and updating reactors
|
||||
}
|
||||
|
||||
type sb struct {
|
||||
sync.Mutex
|
||||
buffer map[uint32]*DeviceInfo
|
||||
}
|
||||
|
||||
type DevAcks struct {
|
||||
Id uint32
|
||||
TransactionId int
|
||||
}
|
||||
|
||||
func (s *StatusMonitor) StartLis(devInfo []*DeviceInfo) {
|
||||
s.Status.Lock()
|
||||
defer s.Status.Unlock()
|
||||
for _,dev := range devInfo {
|
||||
s.Status.Buffer[dev.Id] = dev
|
||||
}
|
||||
go s.Listen()
|
||||
}
|
||||
|
||||
func (s *StatusMonitor) Listen() {
|
||||
// long running to handle incoming reqs
|
||||
for {
|
||||
status := <-s.StatusChan
|
||||
go s.UpdateBuffer(status)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StatusMonitor) UpdateBuffer(dev *DeviceInfo) {
|
||||
s.Status.Lock()
|
||||
defer s.Status.Unlock()
|
||||
if _, ok := s.Status.Buffer[dev.Id]; !ok {
|
||||
s.Status.Buffer[dev.Id] = dev
|
||||
} else if tid := s.Status.Buffer[dev.Id].TransactionId; tid < dev.TransactionId {
|
||||
s.Status.Buffer[dev.Id] = dev
|
||||
} // only other case is we recieved outdated update and we can safely discard
|
||||
}
|
||||
|
||||
func (s *StatusMonitor) GetBuffer() []*DeviceInfo{
|
||||
// get the buffer for client req and clear it for mow
|
||||
s.Status.Lock()
|
||||
defer s.Status.Unlock()
|
||||
devs := []*DeviceInfo{}
|
||||
for _, dev := range s.Status.Buffer {
|
||||
devs = append(devs, dev)
|
||||
delete(s.Status.Buffer,dev.Id)
|
||||
}
|
||||
return devs
|
||||
}
|
||||
|
||||
func (s *StatusMonitor) EmptyBuffer(da []DevAcks) {
|
||||
s.Status.Lock()
|
||||
defer s.Status.Unlock()
|
||||
for _, d := range da {
|
||||
if d.TransactionId == s.Status.Buffer[d.Id].TransactionId {
|
||||
// we acked a dev in buffer
|
||||
// not going to check > because how would that even happen
|
||||
delete(s.Status.Buffer, d.Id)
|
||||
} // other case is our buffer has a newer entry (or older but again how)
|
||||
}
|
||||
}
|
||||
|
||||
// should be all the logic for actually monitoring a chan
|
||||
// shouldnt matter whether it is sending reactor status or device status per reactor
|
||||
// sender only needs to pass value onto chan
|
||||
|
||||
func (s *StatusMonitor) ReactorStart() {
|
||||
go s.GenerateIds()
|
||||
}
|
||||
|
||||
func (s *StatusMonitor) GenerateIds() {
|
||||
var id uint
|
||||
id = 0
|
||||
for {
|
||||
s.TransactionId <-id
|
||||
id += 1
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StatusMonitor) Send(d *DeviceInfo) {
|
||||
d.TransactionId = <-s.TransactionId
|
||||
s.StatusChan <-d
|
||||
}
|
||||
|
||||
// now to tie it all together
|
||||
|
||||
type SystemViewer struct {
|
||||
// stores system directory and provide methods to be embedded in managers
|
||||
ReactorStream *InfoStream // can add itself as a listener to provide methods
|
||||
DeviceStream ds
|
||||
}
|
||||
|
||||
type ds struct {
|
||||
Reactors map[uint32]*InfoStream //map from reactor id to its device info stream
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
func NewSystemViewer () *SystemViewer {
|
||||
rs := NewInfoStream()
|
||||
s := &SystemViewer{ReactorStream:rs}
|
||||
ds := make(map[uint32]*InfoStream)
|
||||
s.DeviceStream = ds
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *SystemViewer) AddReactorListener(clientId uint32) *StatusMonitor {
|
||||
return s.ReactorStream.AddListener(clientId)
|
||||
}
|
||||
|
||||
func (s *SystemViewer) AddReactorSender() *StatusMonitor {
|
||||
return s.ReactorStream.AddSender()
|
||||
}
|
||||
|
||||
func (s *SystemViewer) AddDeviceListener(reactorId, clientId uint32) *StatusMonitor {
|
||||
s.DeviceStream.Lock()
|
||||
defer s.DeviceStream.Unlock()
|
||||
var ds *InfoStream
|
||||
var ok bool
|
||||
if ds, ok = s.DeviceStream.Reactors[reactorId]; !ok {
|
||||
ds = NewInfoStream()
|
||||
s.DeviceStream.Reactors[reactorId] = ds
|
||||
}
|
||||
return ds.AddListener(clientId)
|
||||
}
|
||||
|
||||
func (s *SystemViewer) DeviceSender(reactorId uint32) *StatusMonitor {
|
||||
s.DeviceStream.Lock()
|
||||
defer s.DeviceStream.Unlock()
|
||||
var ds *InfoStream
|
||||
var ok bool
|
||||
if ds, ok = s.DeviceStream.Reactors[reactorId]; !ok {
|
||||
ds = newInfoStream()
|
||||
s.DeviceStream.Reactors[reactorId] = ds
|
||||
}
|
||||
return ds.AddSender()
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue