From 5022c7e0bd7a2f769b28e9446dd0ba437a369548 Mon Sep 17 00:00:00 2001 From: Keegan Date: Mon, 11 Jul 2022 21:08:20 +0000 Subject: [PATCH] added efficient sends reactor side, in process of adding listener struct for tui manager --- internal/pkg/logging/logging.go | 11 +++++- internal/pkg/server/reactormanager.go | 24 ++++++++++-- internal/pkg/server/system.go | 53 ++++++++++++++++++--------- 3 files changed, 66 insertions(+), 22 deletions(-) diff --git a/internal/pkg/logging/logging.go b/internal/pkg/logging/logging.go index 44002c1..aad238a 100644 --- a/internal/pkg/logging/logging.go +++ b/internal/pkg/logging/logging.go @@ -4,6 +4,7 @@ import ( "log" "fmt" "os" + "errors" "time" "strconv" ) @@ -41,9 +42,17 @@ func init() { debugVerbosity = getVerbosity() debugStart = time.Now() if debugVerbosity > 0 { + path := "log/" + if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) { + err := os.Mkdir(path, os.ModePerm) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + } filename := time.Now().Format("01-02T15:04:05") filename += ".log" - f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664) + f, err := os.OpenFile(path+filename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664) if err != nil { log.Fatal(err) } diff --git a/internal/pkg/server/reactormanager.go b/internal/pkg/server/reactormanager.go index a12776e..be3b57b 100644 --- a/internal/pkg/server/reactormanager.go +++ b/internal/pkg/server/reactormanager.go @@ -108,20 +108,36 @@ func (r *ReactorManager) Monitor(conn *grpc.ClientConn) { for _, d := range r.Devs { newd := d newd.Status = "[yellow]UNKOWN[white]" + r.Devs[newd.Id] = newd go r.DevsMon.Send(newd) } r.devstatus.Unlock() r.Exit() break; } - r.devstatus.Lock() + //r.devstatus.Lock() for _,v := range resp.GetDevices() { d := &DeviceInfo{Id:uint32(v.GetAddr()),Type:v.GetType(),Status:v.GetStatus(),Data:v.GetData()} - go r.DevsMon.Send(d) - r.Devs[d.Id] = d + go r.UpdateDevice(d) + //go r.DevsMon.Send(d) + //r.Devs[d.Id] = d } - r.devstatus.Unlock() + //r.devstatus.Unlock() logging.Debug(logging.DPing, "RMA %v Reactor Reached", r.Id) time.Sleep(r.Hb) // time between sensor pings } } + +func (r *ReactorManager) UpdateDevice(d *DeviceInfo) { + r.devstatus.Lock() + defer r.devstatus.Unlock() + if olddev, ok := r.Devs[d.Id]; !ok { + // new device + r.Devs[d.Id] = d + go r.DevsMon.Send(d) + } else if olddev.Status != d.Status || olddev.Data != d.Data { + // dev status or data has changed + r.Devs[d.Id] = d + go r.DevsMon.Send(d) + } +} diff --git a/internal/pkg/server/system.go b/internal/pkg/server/system.go index a9d2d5d..f9eab08 100644 --- a/internal/pkg/server/system.go +++ b/internal/pkg/server/system.go @@ -3,6 +3,7 @@ package server import ( "sync" _ "fmt" + "FRMS/internal/pkg/logging" ) // package will create and maintain a concurrent system structure @@ -23,8 +24,9 @@ func NewStatusMonitor(ch chan *DeviceInfo) *StatusMonitor { } type InfoStream struct { - // used for reactor and device status streams - sync.Mutex + // base stream for any reactor/device + // NewSender embedds the channel to send updates on + // NewListener will add the statusmon to the list of devs to echo to Stream chan *DeviceInfo Layout *syslayout } @@ -32,7 +34,7 @@ type InfoStream struct { type syslayout struct { Devs map[uint32]*DeviceInfo uint32 //index - sync.Mutex + sync.RWMutex } func NewInfoStream() *InfoStream { @@ -56,32 +58,48 @@ func (s *InfoStream) AddSender() *StatusMonitor { func (s *InfoStream) Listener() { for { deviceInfo := <-s.Stream - go s.Layout.Update(deviceInfo) + go s.Update(deviceInfo) } } -func (s *syslayout) Update(d *DeviceInfo) { - s.Lock() - defer s.Unlock() - // set index to be kept for the entire time the dev exists - if dev, ok := s.Devs[d.Id]; !ok { - d.Index = s.uint32 - s.uint32 += 1 +func (s *InfoStream) Update(d *DeviceInfo) { + s.Layout.Lock() + if dev, ok := s.Layout.Devs[d.Id]; !ok { + d.Index = s.Layout.uint32 + s.Layout.uint32 += 1 } else { d.Index = dev.Index } - s.Devs[d.Id] = d + s.Layout.Devs[d.Id] = d + go s.Echo(d) +} + +func (s *InfoStream) Echo(d *DeviceInfo) { + s.Listeners.RLock() + defer s.Listeners.RUnlock() + // read only lock + for _, lis := range s.Listeners { + go func(){ + lis <-d + }() + } } -func (s *InfoStream) GetLayout() map[uint32]*DeviceInfo { - s.Layout.Lock() - defer s.Layout.Unlock() - return s.Layout.Devs +func (s *InfoStream) AddListner(ch chan *DeviceInfo) { + s.Listeners.Lock() + defer s.Listeners.Unlock() + s.Listeners = append(s.Listeners, ch) } +//func (s *InfoStream) GetLayout() map[uint32]*DeviceInfo { + //s.Layout.RLock() + //defer s.Layout.RUnlock() + //return s.Layout.Devs +//} + // status buffer maintaince type StatusMonitor struct { - // serve as base to embed in managers to send/receive reactor info + // serve as base to embed in managers to send/receive device info TransactionId chan uint32 // monotonically increases to track outdated reqs StatusChan chan *DeviceInfo // sending reactor info in same fmt } @@ -101,6 +119,7 @@ func (s *StatusMonitor) GenerateIds() { func (s *StatusMonitor) Send(d *DeviceInfo) { d.TransactionId = <-s.TransactionId + logging.Debug(logging.DClient,"SYS 01 Updating %s Info (%s, %s)", d.Type, d.Status, d.Data) s.StatusChan <-d }