From 42ce886114cb6fe956578892eac8398d7af66c19 Mon Sep 17 00:00:00 2001 From: KeeganForelight Date: Wed, 18 Jan 2023 16:56:06 -0500 Subject: [PATCH] working downstream --- internal/notes/daily/1-18-23.md | 42 +++++ internal/pkg/I2C/bus.go | 2 +- internal/pkg/controller/manager.go | 1 + internal/pkg/controller/pwm.go | 7 + internal/pkg/device/coordinator.go | 154 ++++++++++++++++-- internal/pkg/device/manager.go | 40 ++--- internal/pkg/device/mappings.go | 8 +- internal/pkg/manager/manager.go | 4 + .../{rlcoordinator.go => coordinator.go} | 58 ++++--- internal/pkg/reactor/devices.go | 100 ------------ internal/pkg/reactor/monitoring.go | 27 --- internal/pkg/sensor/atlas.go | 17 +- internal/pkg/sensor/do_sensor.go | 14 +- internal/pkg/sensor/manager.go | 8 + internal/pkg/sensor/ph_sensor.go | 14 +- internal/pkg/sensor/rtd_sensor.go | 14 +- internal/pkg/server/reactormanager.go | 148 ++++++++--------- 17 files changed, 384 insertions(+), 274 deletions(-) rename internal/pkg/reactor/{rlcoordinator.go => coordinator.go} (84%) delete mode 100644 internal/pkg/reactor/devices.go delete mode 100644 internal/pkg/reactor/monitoring.go diff --git a/internal/notes/daily/1-18-23.md b/internal/notes/daily/1-18-23.md index 1bab68e..14b1286 100644 --- a/internal/notes/daily/1-18-23.md +++ b/internal/notes/daily/1-18-23.md @@ -1,13 +1,55 @@ ### Planning **Monitoring Changes** + I want to refactor the reactor stuff to be less method oriented as far as data collection. For example, the monitoring stuff is all about events that happen pretty infrequently. It makes sense to then use a channel on the device side to just feed relevant status updates back to the reactor. I think that this makes the most sense because this will synchronize updates and leverage the rarity of events to cut down on errant calls. - pros - less repitive method calls needed + - less device locking - localize the information to different packages - cons - extra memory for channels and duplicate storage info - could just remove status from dm? +**New Idea** + +I can leverage wireguard to do server-> reactor connections even beyond the testing phase + +Changes: +1) move device coordinator into device package +2) expose relevant methods to reactor interface +3) clarify individual package responsibilities +4) add stuff server side to create/destroy grpc connections as the information is rendered client side + - this might be scuffed but oh well + +### Package Separation +**Reactor** +- coordinator + - creates initial link to the server + - creates database client + - creates and starts a device coordinator + +**Device** +- coordinator + - searches i2c bus for connected devices + - spins up managers to control the connected devices + - relays information back up to the reactor coordinator +- manager + - control over singular device + - has the core information that will be needed across any type of device (name, status, address etc) +- sub-manager + - fine grained struct with methods specific to the device + +**Server** + +Going to ignore for now because I am lazy +- central coordinator starts up database connection config etc +- reactor coordinator + + + + + + ### TODO **Monitoring Changes** diff --git a/internal/pkg/I2C/bus.go b/internal/pkg/I2C/bus.go index 1f63f03..08d5ab4 100644 --- a/internal/pkg/I2C/bus.go +++ b/internal/pkg/I2C/bus.go @@ -23,7 +23,7 @@ type I2CClient struct { sync.Mutex } -func NewI2CClient(config *viper.Viper) (*I2CClient, error) { +func NewClient(config *viper.Viper) (*I2CClient, error) { var err error var bus int client := &I2CClient{} diff --git a/internal/pkg/controller/manager.go b/internal/pkg/controller/manager.go index 4131fad..fffd651 100644 --- a/internal/pkg/controller/manager.go +++ b/internal/pkg/controller/manager.go @@ -10,6 +10,7 @@ import ( type Manager interface { Start() error Exit() error + IsActive() int } func NewManager(max int) Manager { diff --git a/internal/pkg/controller/pwm.go b/internal/pkg/controller/pwm.go index b65cad1..90805ca 100644 --- a/internal/pkg/controller/pwm.go +++ b/internal/pkg/controller/pwm.go @@ -3,7 +3,10 @@ package controller // do sensor and methods import ( + "fmt" "sync" + + "github.com/spf13/viper" ) type PWMManager struct { @@ -26,6 +29,10 @@ func (m *PWMManager) GetFrequency() (int, error) { return m.Frequency, nil } +func (m *PWMManager) LoadConfig(config *viper.Viper, key string) { + fmt.Printf("config\n") +} + func (m *PWMManager) GetDefaultName() string { return "pwm controller" } diff --git a/internal/pkg/device/coordinator.go b/internal/pkg/device/coordinator.go index aeb32e1..425abc9 100644 --- a/internal/pkg/device/coordinator.go +++ b/internal/pkg/device/coordinator.go @@ -1,25 +1,147 @@ package device -// serves as a server side device coordinator to sync +import ( + "FRMS/internal/pkg/I2C" + pb "FRMS/internal/pkg/grpc" + "FRMS/internal/pkg/manager" + "fmt" + "sync" + "time" -// assume has a server connection from DM -// STEPS -// 1) client loads web page -// 2) DC pushes what it has to the client -// 3) requests DM for what it doesnt -// 4) DM responds -// 5) DC forwards responses to client + "github.com/spf13/viper" +) -// What can happen -// - Client can push something to client -// - Client requests info -// - If this happens, we can push it to client when we get it. Just need to know it was requests +// Created by rlc to manage devices +// basic manager to embed +type Manager interface { + Start() error + Exit() error + // create a heartbeat to send to chan at intervals + HeartBeat(chan struct{}, int, int, time.Duration) +} + +func NewManager() Manager { + // dont need timeout functionality + return manager.New(0) +} + +// I2C client for locking +type I2CClient interface { + // i2c client w/ locking + GetConnected() (map[int]bool, error) // gets connected addresses + SendCmd(int, string) (string, error) +} + +func NewI2CClient(config *viper.Viper) (I2CClient, error) { + return I2C.NewClient(config) +} + +// device coordinator itself type DeviceCoordinator struct { - NameChan chan string - StatusChan chan string + I2C I2CClient + Manager + Config *viper.Viper + + managersMu sync.RWMutex + Managers map[int]*DeviceManager +} + +func NewCoordinator(config *viper.Viper) *DeviceCoordinator { + dm := make(map[int]*DeviceManager) + m := NewManager() + c := &DeviceCoordinator{ + Manager: m, + Managers: dm, + Config: config, + } + return c +} + +func (c *DeviceCoordinator) Start() error { + var err error + + if err = c.Manager.Start(); err != nil { + return err + } + + if c.I2C, err = NewI2CClient(c.Config); err != nil { + return err + } + go c.Monitor() + return err } -func ClientSetName() { - // pass client set names to DM +func (c *DeviceCoordinator) Monitor() { + // monitor I2C for new devices + ch := make(chan struct{}) + go c.HeartBeat(ch, 10, 0, time.Second) + + for range ch { + // on notification (10s) + devs, err := c.I2C.GetConnected() + if err != nil { + panic(err) + } + // update list + go c.UpdateManagers(devs) + } +} + +func (c *DeviceCoordinator) UpdateManagers(active map[int]bool) { + // updates managers + c.managersMu.Lock() + defer c.managersMu.Unlock() + + for addr, dm := range c.Managers { + _, ok := active[addr] + + if ok && dm.IsActive() == 0 { + // active and dm not + if err := dm.Start(); err != nil { + panic(err) + } + } else if dm.IsActive() == 1 { + // not active and dm is + if err := dm.Exit(); err != nil { + panic(err) + } + } + // remove from map + delete(active, addr) + } + + for addr, _ := range active { + // no manager, create one + fmt.Printf("New device %d!\n", addr) + + dm, err := NewDeviceManager(addr, c.Config, "", c.I2C) + if err != nil { + panic(err) + } + + if err := dm.Start(); err != nil { + panic(err) + } + + c.Managers[addr] = dm + } +} + +func (c *DeviceCoordinator) GetDeviceInfo() ([]*pb.Device, error) { + // gets device info for monitoring + c.managersMu.RLock() + defer c.managersMu.RUnlock() + + var devices []*pb.Device + + for addr, dm := range c.Managers { + // looping over devices + devices = append(devices, &pb.Device{ + Addr: int32(addr), + Status: pb.Status(dm.IsActive()), + }) + } + + return devices, nil } diff --git a/internal/pkg/device/manager.go b/internal/pkg/device/manager.go index 098eb34..6acd6fd 100644 --- a/internal/pkg/device/manager.go +++ b/internal/pkg/device/manager.go @@ -10,7 +10,9 @@ import ( type SubManager interface { Start(int) error Exit() error + IsActive() int String() string // printing info about the sub manager + LoadConfig(*viper.Viper, string) // for config bs GetDefaultName() string @@ -23,23 +25,22 @@ type NameChan struct { // base device manager type DeviceManager struct { - SubManager `mapstructure:",squash"` + SubManager // across controllers/sensors - Address int `mapstructure:"address"` + Address int `mapstructure:"address"` + Name string `mapstructure:"name"` infoMu sync.RWMutex - Name string `mapstructure:"name"` - Status string // easier to remember Config *viper.Viper ConfigPrefix string } -func NewDeviceManager(addr int, config *viper.Viper, configPrefix string) (*DeviceManager, error) { +func NewDeviceManager(addr int, config *viper.Viper, configPrefix string, i2c I2CClient) (*DeviceManager, error) { // validate prefix - s, err := NewSubManager(addr) + s, err := NewSubManager(addr, i2c) dm := &DeviceManager{ SubManager: s, @@ -61,26 +62,25 @@ func (m *DeviceManager) LoadConfig() error { } m.Config.UnmarshalKey(mainKey, m) + m.SubManager.LoadConfig(m.Config, mainKey) return nil } func (m *DeviceManager) Start() error { - return m.SubManager.Start(m.Address) -} + // load config and then start + var err error -func (m *DeviceManager) UpdateStatus(status string) error { - // updates device status - m.infoMu.Lock() - defer m.infoMu.Unlock() - m.Status = status - return nil -} + // load config + if err = m.LoadConfig(); err != nil { + return err + } + + // start + if err = m.SubManager.Start(m.Address); err != nil { + return err + } -func (m *DeviceManager) GetStatus() string { - // updates device status - m.infoMu.RLock() - defer m.infoMu.RUnlock() - return m.Status + return err } // dev info grpc handlers diff --git a/internal/pkg/device/mappings.go b/internal/pkg/device/mappings.go index e2cb7a2..8553505 100644 --- a/internal/pkg/device/mappings.go +++ b/internal/pkg/device/mappings.go @@ -11,20 +11,20 @@ import ( Returns the correct manager for sensor/controller */ -func NewSubManager(addr int) (SubManager, error) { +func NewSubManager(addr int, i2c I2CClient) (SubManager, error) { // returns correct device manager by ID var m SubManager var err error switch addr { case 97: // DO - m = sensor.NewDOManager() + m = sensor.NewDOManager(i2c) case 99: // pH - m = sensor.NewPHManager() + m = sensor.NewPHManager(i2c) case 102: // RTD - m = sensor.NewRTDManager() + m = sensor.NewRTDManager(i2c) case 256: // PWM m = controller.NewPWMManager() diff --git a/internal/pkg/manager/manager.go b/internal/pkg/manager/manager.go index b2d0eba..ecdd3a0 100644 --- a/internal/pkg/manager/manager.go +++ b/internal/pkg/manager/manager.go @@ -50,6 +50,10 @@ func (m *Manager) Exit() error { return errors.New("Manager not active!") } +func (m *Manager) IsActive() int { + return int(atomic.LoadInt32(&m.Active)) +} + // Heartbeat tracker func (m *Manager) HeartBeat(ping chan struct{}, hb, interval int, scale time.Duration) { diff --git a/internal/pkg/reactor/rlcoordinator.go b/internal/pkg/reactor/coordinator.go similarity index 84% rename from internal/pkg/reactor/rlcoordinator.go rename to internal/pkg/reactor/coordinator.go index 91e245c..b07a5f7 100644 --- a/internal/pkg/reactor/rlcoordinator.go +++ b/internal/pkg/reactor/coordinator.go @@ -3,7 +3,7 @@ package reactor // file describes reactor level coordinator and associated implementation import ( - "FRMS/internal/pkg/I2C" + "FRMS/internal/pkg/device" pb "FRMS/internal/pkg/grpc" "FRMS/internal/pkg/influxdb" "FRMS/internal/pkg/logging" @@ -31,7 +31,7 @@ func NewManager(max int) Manager { return manager.New(max) } -// db client interface +// db client type DBClient interface { // Start() error @@ -41,14 +41,15 @@ func NewDBClient(config *viper.Viper) (DBClient, error) { return influxdb.NewDBClient(config) } -type I2CClient interface { - // simple client to push responsibilites to sensor - GetConnected() (map[int]bool, error) // gets all connected addr - SendCmd(int, string) (string, error) // send cmd, string is return +// device coordinator +type DeviceCoordinator interface { + Start() error + // in grpc format + GetDeviceInfo() ([]*pb.Device, error) } -func NewI2CClient(config *viper.Viper) (I2CClient, error) { - return I2C.NewI2CClient(config) +func NewDeviceCoordinator(config *viper.Viper) DeviceCoordinator { + return device.NewCoordinator(config) } type Server struct { @@ -71,11 +72,10 @@ type ReactorCoordinator struct { Info `mapstructure:",squash"` Database DBClient - I2C I2CClient - MonitoringClient pb.MonitoringClient // grpc + pb.MonitoringClient // grpc embedding - *DeviceCoordinator // struct for locking + DeviceCoordinator // struct for locking Err chan error } @@ -111,12 +111,12 @@ func (c *ReactorCoordinator) Start() { c.Err <- err } - // loading clients - if c.Database, err = NewDBClient(c.Config); err != nil { + if err = c.DeviceCoordinator.Start(); err != nil { c.Err <- err } - if c.I2C, err = NewI2CClient(c.Config); err != nil { + // loading clients + if c.Database, err = NewDBClient(c.Config); err != nil { c.Err <- err } @@ -168,12 +168,7 @@ func (c *ReactorCoordinator) Monitor() { for range ch { // check devs and ping logging.Debug(logging.DClient, "RLC Pinging server") - // this can probably be offloaded - active, err := c.I2C.GetConnected() - if err != nil { - c.Err <- err - } - go c.DeviceCoordinator.UpdateDevices(c.Config, c.I2C, active) + // ping central server with status go c.Ping() } } @@ -230,3 +225,26 @@ func (c *ReactorCoordinator) Connect(ip string, port int) (*grpc.ClientConn, err } return conn, nil } + +func (c *ReactorCoordinator) Ping() { + // send device info to central coordinator + fmt.Printf("Pinging server\n") + + var devices []*pb.Device + var err error + + if devices, err = c.GetDeviceInfo(); err != nil { + c.Err <- err + } + + // create request + req := &pb.ReactorStatusPing{ + Id: int32(c.ID), + Devices: devices, + } + + // ping server + if _, err = c.ReactorStatusHandler(context.Background(), req); err != nil { + c.Err <- err + } +} diff --git a/internal/pkg/reactor/devices.go b/internal/pkg/reactor/devices.go deleted file mode 100644 index 4f5ba01..0000000 --- a/internal/pkg/reactor/devices.go +++ /dev/null @@ -1,100 +0,0 @@ -package reactor - -import ( - "FRMS/internal/pkg/device" - pb "FRMS/internal/pkg/grpc" - "sync" - - "github.com/spf13/viper" -) - -type DeviceManager interface { - Start() error - Exit() error - GetStatus() string - LoadConfig() error -} - -func NewDeviceManager(addr int, config *viper.Viper, prefix string) (DeviceManager, error) { - return device.NewDeviceManager(addr, config, prefix) -} - -type DeviceCoordinator struct { - Config *viper.Viper - Managers map[int]DeviceManager - sync.RWMutex -} - -func NewDeviceCoordinator(config *viper.Viper) *DeviceCoordinator { - dm := &DeviceCoordinator{Config: config} - dm.Managers = make(map[int]DeviceManager) - return dm -} - -func (c *DeviceCoordinator) UpdateDevices(config *viper.Viper, i2c I2CClient, active map[int]bool) error { - // update device list - - c.Lock() - defer c.Unlock() - - for addr, _ := range active { - // loop over devs - if _, ok := c.Managers[addr]; !ok { - // no device, creating one - - dm, err := NewDeviceManager(addr, c.Config, "") - if err != nil { - return err - } - - // starting - if err = dm.Start(); err != nil { - return err - } - - // loading config - if err = dm.LoadConfig(); err != nil { - return err - } - - // update entry - c.Managers[addr] = dm - } - } - // all devs accounted for - // I can rework this to rely on individual devices to keep track of status and only need above - // for addr, dm := range c.Managers { - // if active[addr] { - // // active - // if dm.IsActive() != 1 { - // err = dm.Start() - // } - // } else { - // if dm.IsActive() != 0 { - // err = dm.Exit() - // } - // } - // if err != nil { - // return err - // } - // } - return nil -} - -func (c *DeviceCoordinator) GetDevices() ([]*pb.Device, error) { - c.RLock() - defer c.RUnlock() - - var err error - var devices []*pb.Device - - for addr, dm := range c.Managers { - status := pb.Status(pb.Status_value[dm.GetStatus()]) - devices = append(devices, &pb.Device{ - Addr: int32(addr), - Status: status, - }) - } - - return devices, err -} diff --git a/internal/pkg/reactor/monitoring.go b/internal/pkg/reactor/monitoring.go deleted file mode 100644 index 76c2dcf..0000000 --- a/internal/pkg/reactor/monitoring.go +++ /dev/null @@ -1,27 +0,0 @@ -package reactor - -import ( - "context" - "fmt" - - //"FRMS/internal/pkg/logging" - //"google.golang.org/grpc" - pb "FRMS/internal/pkg/grpc" -) - -// implements grpc handler and device data aggregater handler -// grpc status update handler -func (c *ReactorCoordinator) Ping() { - // sends all device status to central coordinator - fmt.Printf("Pinging coordinator\n") - // get devices - devices, err := c.GetDevices() - if err != nil { - c.Err <- err - } - - req := &pb.ReactorStatusPing{Id: int32(c.ID), Devices: devices} - if _, err := c.MonitoringClient.ReactorStatusHandler(context.Background(), req); err != nil { - c.Err <- err - } -} diff --git a/internal/pkg/sensor/atlas.go b/internal/pkg/sensor/atlas.go index c753cec..ed71d1d 100644 --- a/internal/pkg/sensor/atlas.go +++ b/internal/pkg/sensor/atlas.go @@ -2,6 +2,7 @@ package sensor import ( "errors" + "strconv" "time" ) @@ -15,8 +16,8 @@ type Atlas struct { // helper struct to embedd I2C I2CClient // delays unmarshalled - CalDelay int `mapstructure:"cal"` - ReadDelay int `mapstructure:"read"` + CalDelay int `mapstructure:"cal_delay"` + ReadDelay int `mapstructure:"read_delay"` } func (a *Atlas) Calibrate(addr int, cal string) error { @@ -42,10 +43,18 @@ func (a *Atlas) Read(addr int) (float32, error) { //fmt.Printf("Error writing! %s", err) return 0, err } + if a.ReadDelay == 0 { + return 0, errors.New("Read Delay unset, please check config") + } sleep := time.Duration(a.ReadDelay) * time.Millisecond time.Sleep(sleep) // sleep between reads - //data, err := a.I2C.SendCmd(addr, "") - return 0, nil + data, err := a.I2C.SendCmd(addr, "") + if err != nil { + return 0, err + } + + f, err := strconv.ParseFloat(data, 32) + return float32(f), err } // for config diff --git a/internal/pkg/sensor/do_sensor.go b/internal/pkg/sensor/do_sensor.go index 4d06ff8..f1c0110 100644 --- a/internal/pkg/sensor/do_sensor.go +++ b/internal/pkg/sensor/do_sensor.go @@ -3,19 +3,22 @@ package sensor // do sensor and methods import ( + "fmt" "sync" + + "github.com/spf13/viper" ) type DOManager struct { // do sensor manager - *Atlas // atlas helper + *Atlas `mapstructure:",squash"` *SensorManager `mapstructure:",squash"` sync.RWMutex } -func NewDOManager() *DOManager { - a := &Atlas{} +func NewDOManager(i2c I2CClient) *DOManager { + a := &Atlas{I2C: i2c} sm := NewSensorManager(a.Read) m := &DOManager{ @@ -25,6 +28,11 @@ func NewDOManager() *DOManager { return m } +func (m *DOManager) LoadConfig(config *viper.Viper, key string) { + config.UnmarshalKey(key, m) + fmt.Printf("DO: %v\n", m.Atlas) +} + func (m *DOManager) GetDefaultName() string { return "DO Sensor" } diff --git a/internal/pkg/sensor/manager.go b/internal/pkg/sensor/manager.go index faba6fb..153771f 100644 --- a/internal/pkg/sensor/manager.go +++ b/internal/pkg/sensor/manager.go @@ -2,6 +2,7 @@ package sensor import ( "FRMS/internal/pkg/manager" + "fmt" "sync" "time" ) @@ -9,6 +10,7 @@ import ( type Manager interface { Start() error Exit() error + IsActive() int HeartBeat(chan struct{}, int, int, time.Duration) } @@ -43,12 +45,17 @@ func (s *SensorManager) Start(addr int) error { if err := s.Manager.Start(); err != nil { return err } + fmt.Printf("Sensor %d: %v\n", addr, s) + // starting monitoring go s.Monitor(addr) return nil } func (s *SensorManager) Monitor(addr int) { ch := make(chan struct{}) // hb chan + if s.SampleRate == 0 { + s.SampleRate = 5000 + } go s.HeartBeat(ch, s.SampleRate, 1000, time.Millisecond) for range ch { @@ -61,6 +68,7 @@ func (r *Reading) TakeReading(addr int) { if err != nil { panic(err) } + fmt.Printf("got sample: %v\n", sample) r.Lock() defer r.Unlock() r.Latest = sample diff --git a/internal/pkg/sensor/ph_sensor.go b/internal/pkg/sensor/ph_sensor.go index e5281e6..745427b 100644 --- a/internal/pkg/sensor/ph_sensor.go +++ b/internal/pkg/sensor/ph_sensor.go @@ -3,19 +3,22 @@ package sensor // do sensor and methods import ( + "fmt" "sync" + + "github.com/spf13/viper" ) type PHManager struct { // do sensor manager - *Atlas + *Atlas `mapstructure:",squash"` *SensorManager `mapstructure:",squash"` sync.RWMutex } -func NewPHManager() *PHManager { - a := &Atlas{} +func NewPHManager(i2c I2CClient) *PHManager { + a := &Atlas{I2C: i2c} sm := NewSensorManager(a.Read) m := &PHManager{ SensorManager: sm, @@ -24,6 +27,11 @@ func NewPHManager() *PHManager { return m } +func (s *PHManager) LoadConfig(config *viper.Viper, key string) { + config.UnmarshalKey(key, s) + fmt.Printf("PH: %v\n", s.Atlas) +} + func (s *PHManager) GetDefaultName() string { return "pH Sensor" } diff --git a/internal/pkg/sensor/rtd_sensor.go b/internal/pkg/sensor/rtd_sensor.go index d0f2409..35864f9 100644 --- a/internal/pkg/sensor/rtd_sensor.go +++ b/internal/pkg/sensor/rtd_sensor.go @@ -3,18 +3,21 @@ package sensor // do sensor and methods import ( + "fmt" "sync" + + "github.com/spf13/viper" ) type RTDManager struct { // do sensor manager - *Atlas + *Atlas `mapstructure:",squash"` *SensorManager `mapstructure:",squash"` sync.RWMutex } -func NewRTDManager() *RTDManager { - a := &Atlas{} +func NewRTDManager(i2c I2CClient) *RTDManager { + a := &Atlas{I2C: i2c} sm := NewSensorManager(a.Read) m := &RTDManager{ SensorManager: sm, @@ -23,6 +26,11 @@ func NewRTDManager() *RTDManager { return m } +func (s *RTDManager) LoadConfig(config *viper.Viper, key string) { + config.UnmarshalKey(key, s) + fmt.Printf("RTD: %v\n", s.Atlas) +} + func (s *RTDManager) GetDefaultName() string { return "RTD Sensor" } diff --git a/internal/pkg/server/reactormanager.go b/internal/pkg/server/reactormanager.go index e9c240f..b1fbef0 100644 --- a/internal/pkg/server/reactormanager.go +++ b/internal/pkg/server/reactormanager.go @@ -1,7 +1,6 @@ package server import ( - "FRMS/internal/pkg/device" pb "FRMS/internal/pkg/grpc" "FRMS/internal/pkg/logging" "FRMS/internal/pkg/manager" @@ -11,7 +10,6 @@ import ( "context" "fmt" _ "log" - "sync" "github.com/spf13/viper" ) @@ -34,30 +32,30 @@ type ReactorManager struct { // *ClientManager // client manager (OUTDATED) *Client // access to ID etc // StatusMon *StatusMonitor putting on pause - *ReactorDevices + // *ReactorDevices Config *viper.Viper // config to update Err chan error } -type ReactorDevices struct { - // device struct - Devices map[int]DeviceManager - sync.RWMutex -} +// type ReactorDevices struct { +// // device struct +// Devices map[int]DeviceManager +// sync.RWMutex +// } func NewReactorManager(cl *Client, config *viper.Viper, errCh chan error) *ReactorManager { // making managers m := NewManager(6) - dm := make(map[int]DeviceManager) - rd := &ReactorDevices{Devices: dm} + //dm := make(map[int]DeviceManager) + //rd := &ReactorDevices{Devices: dm} //cm := NewClientManager(cl) r := &ReactorManager{ //ClientManager: cm, - Manager: m, - Client: cl, - ReactorDevices: rd, - Config: config, - Err: errCh, + Manager: m, + Client: cl, + //ReactorDevices: rd, + Config: config, + Err: errCh, } return r } @@ -88,65 +86,69 @@ func (r *ReactorManager) ReactorStatusHandler(ctx context.Context, req *pb.React //go r.PingReset() fmt.Printf("Recieved ping from %d!\n", req.GetId()) // update devices/sensors - go r.UpdateDevices(req.GetDevices()) + for _, dev := range req.GetDevices() { + fmt.Printf("Device %d is %s ", dev.GetAddr(), dev.GetStatus().String()) + } + fmt.Printf("\n") + // go r.UpdateDevices(req.GetDevices()) return &pb.ReactorStatusResponse{Id: int32(r.Id)}, nil } -// device stuff - -type DeviceManager interface { - LoadConfig() error - UpdateStatus(string) error - String() string // printing -} - -func NewDeviceManager(addr int, config *viper.Viper, prefix string) (DeviceManager, error) { - // returns a manager struct - return device.NewDeviceManager(addr, config, prefix) -} - -func (r *ReactorManager) UpdateDevices(devs []*pb.Device) { - // pass updates to correct manager - r.ReactorDevices.RLock() // read lock only - defer r.ReactorDevices.RUnlock() - - for _, dev := range devs { - // looping over devs - if dm, ok := r.ReactorDevices.Devices[int(dev.GetAddr())]; ok { - // device manager found - go dm.UpdateStatus(dev.GetStatus().String()) - //fmt.Println(dm) - } else { - // not found - go r.AddDevice(dev, r.Id, r.Config, r.Err) - } - } -} - -func (r *ReactorDevices) AddDevice(dev *pb.Device, id int, config *viper.Viper, errCh chan error) { - - // setting vars - prefix := fmt.Sprintf("reactors.%d.", id) - addr := int(dev.GetAddr()) - var dm DeviceManager - var err error - // write locking - r.Lock() - defer r.Unlock() - - if dm, err = NewDeviceManager(addr, config, prefix); err != nil { - errCh <- err - } - - // setting status - if err = dm.UpdateStatus(dev.GetStatus().String()); err != nil { - errCh <- err - } - - // loading config - if err = dm.LoadConfig(); err != nil { - errCh <- err - } - r.Devices[int(addr)] = dm -} +// // device stuff + +// type DeviceManager interface { +// LoadConfig() error +// UpdateStatus(string) error +// String() string // printing +// } + +// func NewDeviceManager(addr int, config *viper.Viper, prefix string) (DeviceManager, error) { +// // returns a manager struct +// return device.NewDeviceManager(addr, config, prefix) +// } + +//func (r *ReactorManager) UpdateDevices(devs []*pb.Device) { +// // pass updates to correct manager +// r.ReactorDevices.RLock() // read lock only +// defer r.ReactorDevices.RUnlock() + +// for _, dev := range devs { +// // looping over devs +// if dm, ok := r.ReactorDevices.Devices[int(dev.GetAddr())]; ok { +// // device manager found +// go dm.UpdateStatus(dev.GetStatus().String()) +// //fmt.Println(dm) +// } else { +// // not found +// go r.AddDevice(dev, r.Id, r.Config, r.Err) +// } +// } +//} + +// func (r *ReactorDevices) AddDevice(dev *pb.Device, id int, config *viper.Viper, errCh chan error) { + +// // setting vars +// prefix := fmt.Sprintf("reactors.%d.", id) +// addr := int(dev.GetAddr()) +// var dm DeviceManager +// var err error +// // write locking +// r.Lock() +// defer r.Unlock() + +// if dm, err = NewDeviceManager(addr, config, prefix); err != nil { +// errCh <- err +// } + +// // setting status +// if err = dm.UpdateStatus(dev.GetStatus().String()); err != nil { +// errCh <- err +// } + +// // loading config +// if err = dm.LoadConfig(); err != nil { +// errCh <- err +// } +// r.Devices[int(addr)] = dm +// }