added efficient sends reactor side, in process of adding listener struct for tui manager

main
Keegan 3 years ago
parent f8b9324a0d
commit 5022c7e0bd

@ -4,6 +4,7 @@ import (
"log" "log"
"fmt" "fmt"
"os" "os"
"errors"
"time" "time"
"strconv" "strconv"
) )
@ -41,9 +42,17 @@ func init() {
debugVerbosity = getVerbosity() debugVerbosity = getVerbosity()
debugStart = time.Now() debugStart = time.Now()
if debugVerbosity > 0 { if debugVerbosity > 0 {
path := "log/"
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
err := os.Mkdir(path, os.ModePerm)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
}
filename := time.Now().Format("01-02T15:04:05") filename := time.Now().Format("01-02T15:04:05")
filename += ".log" filename += ".log"
f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664) f, err := os.OpenFile(path+filename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

@ -108,20 +108,36 @@ func (r *ReactorManager) Monitor(conn *grpc.ClientConn) {
for _, d := range r.Devs { for _, d := range r.Devs {
newd := d newd := d
newd.Status = "[yellow]UNKOWN[white]" newd.Status = "[yellow]UNKOWN[white]"
r.Devs[newd.Id] = newd
go r.DevsMon.Send(newd) go r.DevsMon.Send(newd)
} }
r.devstatus.Unlock() r.devstatus.Unlock()
r.Exit() r.Exit()
break; break;
} }
r.devstatus.Lock() //r.devstatus.Lock()
for _,v := range resp.GetDevices() { for _,v := range resp.GetDevices() {
d := &DeviceInfo{Id:uint32(v.GetAddr()),Type:v.GetType(),Status:v.GetStatus(),Data:v.GetData()} d := &DeviceInfo{Id:uint32(v.GetAddr()),Type:v.GetType(),Status:v.GetStatus(),Data:v.GetData()}
go r.DevsMon.Send(d) go r.UpdateDevice(d)
r.Devs[d.Id] = d //go r.DevsMon.Send(d)
//r.Devs[d.Id] = d
} }
r.devstatus.Unlock() //r.devstatus.Unlock()
logging.Debug(logging.DPing, "RMA %v Reactor Reached", r.Id) logging.Debug(logging.DPing, "RMA %v Reactor Reached", r.Id)
time.Sleep(r.Hb) // time between sensor pings time.Sleep(r.Hb) // time between sensor pings
} }
} }
func (r *ReactorManager) UpdateDevice(d *DeviceInfo) {
r.devstatus.Lock()
defer r.devstatus.Unlock()
if olddev, ok := r.Devs[d.Id]; !ok {
// new device
r.Devs[d.Id] = d
go r.DevsMon.Send(d)
} else if olddev.Status != d.Status || olddev.Data != d.Data {
// dev status or data has changed
r.Devs[d.Id] = d
go r.DevsMon.Send(d)
}
}

@ -3,6 +3,7 @@ package server
import ( import (
"sync" "sync"
_ "fmt" _ "fmt"
"FRMS/internal/pkg/logging"
) )
// package will create and maintain a concurrent system structure // package will create and maintain a concurrent system structure
@ -23,8 +24,9 @@ func NewStatusMonitor(ch chan *DeviceInfo) *StatusMonitor {
} }
type InfoStream struct { type InfoStream struct {
// used for reactor and device status streams // base stream for any reactor/device
sync.Mutex // NewSender embedds the channel to send updates on
// NewListener will add the statusmon to the list of devs to echo to
Stream chan *DeviceInfo Stream chan *DeviceInfo
Layout *syslayout Layout *syslayout
} }
@ -32,7 +34,7 @@ type InfoStream struct {
type syslayout struct { type syslayout struct {
Devs map[uint32]*DeviceInfo Devs map[uint32]*DeviceInfo
uint32 //index uint32 //index
sync.Mutex sync.RWMutex
} }
func NewInfoStream() *InfoStream { func NewInfoStream() *InfoStream {
@ -56,32 +58,48 @@ func (s *InfoStream) AddSender() *StatusMonitor {
func (s *InfoStream) Listener() { func (s *InfoStream) Listener() {
for { for {
deviceInfo := <-s.Stream deviceInfo := <-s.Stream
go s.Layout.Update(deviceInfo) go s.Update(deviceInfo)
} }
} }
func (s *syslayout) Update(d *DeviceInfo) { func (s *InfoStream) Update(d *DeviceInfo) {
s.Lock() s.Layout.Lock()
defer s.Unlock() if dev, ok := s.Layout.Devs[d.Id]; !ok {
// set index to be kept for the entire time the dev exists d.Index = s.Layout.uint32
if dev, ok := s.Devs[d.Id]; !ok { s.Layout.uint32 += 1
d.Index = s.uint32
s.uint32 += 1
} else { } else {
d.Index = dev.Index d.Index = dev.Index
} }
s.Devs[d.Id] = d s.Layout.Devs[d.Id] = d
go s.Echo(d)
} }
func (s *InfoStream) GetLayout() map[uint32]*DeviceInfo { func (s *InfoStream) Echo(d *DeviceInfo) {
s.Layout.Lock() s.Listeners.RLock()
defer s.Layout.Unlock() defer s.Listeners.RUnlock()
return s.Layout.Devs // read only lock
for _, lis := range s.Listeners {
go func(){
lis <-d
}()
} }
}
func (s *InfoStream) AddListner(ch chan *DeviceInfo) {
s.Listeners.Lock()
defer s.Listeners.Unlock()
s.Listeners = append(s.Listeners, ch)
}
//func (s *InfoStream) GetLayout() map[uint32]*DeviceInfo {
//s.Layout.RLock()
//defer s.Layout.RUnlock()
//return s.Layout.Devs
//}
// status buffer maintaince // status buffer maintaince
type StatusMonitor struct { type StatusMonitor struct {
// serve as base to embed in managers to send/receive reactor info // serve as base to embed in managers to send/receive device info
TransactionId chan uint32 // monotonically increases to track outdated reqs TransactionId chan uint32 // monotonically increases to track outdated reqs
StatusChan chan *DeviceInfo // sending reactor info in same fmt StatusChan chan *DeviceInfo // sending reactor info in same fmt
} }
@ -101,6 +119,7 @@ func (s *StatusMonitor) GenerateIds() {
func (s *StatusMonitor) Send(d *DeviceInfo) { func (s *StatusMonitor) Send(d *DeviceInfo) {
d.TransactionId = <-s.TransactionId d.TransactionId = <-s.TransactionId
logging.Debug(logging.DClient,"SYS 01 Updating %s Info (%s, %s)", d.Type, d.Status, d.Data)
s.StatusChan <-d s.StatusChan <-d
} }

Loading…
Cancel
Save