From 4ab69cb5032f545feea489f2f9c9161d097e8e26 Mon Sep 17 00:00:00 2001 From: KeeganForelight Date: Tue, 10 Jan 2023 19:14:56 -0500 Subject: [PATCH] regening grpc proto --- cmd/reactor/main.go | 6 +- internal/configs/reactor.yaml | 1 + "internal/pkg/\\" | 48 ++++++ internal/pkg/controller/manager.go | 41 +++++ internal/pkg/controller/pwm.go | 37 +++++ internal/pkg/device/manager.go | 55 +++++++ internal/pkg/device/mappings.go | 37 +++++ internal/pkg/grpc/monitoring.proto | 4 +- internal/pkg/manager/manager.go | 97 ++++++++++++ internal/pkg/reactor/devices.go | 68 ++++---- internal/pkg/reactor/monitoring.go | 2 +- internal/pkg/reactor/rlcoordinator.go | 216 +++++++++++--------------- internal/pkg/sensor/atlas.go | 22 +-- internal/pkg/sensor/do_sensor.go | 40 +++-- internal/pkg/sensor/manager.go | 208 ++++++------------------- internal/pkg/sensor/mappings.go | 56 ------- internal/pkg/sensor/ph_sensor.go | 29 ++-- internal/pkg/sensor/pwm_device.go | 36 ----- internal/pkg/sensor/rtd_sensor.go | 28 ++-- internal/pkg/server/coordinator.go | 18 ++- internal/pkg/server/manager.go | 100 +----------- internal/pkg/server/reactormanager.go | 71 ++++++--- 22 files changed, 625 insertions(+), 595 deletions(-) create mode 100644 "internal/pkg/\\" create mode 100644 internal/pkg/controller/manager.go create mode 100644 internal/pkg/controller/pwm.go create mode 100644 internal/pkg/device/manager.go create mode 100644 internal/pkg/device/mappings.go create mode 100644 internal/pkg/manager/manager.go delete mode 100644 internal/pkg/sensor/mappings.go delete mode 100644 internal/pkg/sensor/pwm_device.go diff --git a/cmd/reactor/main.go b/cmd/reactor/main.go index 18170cc..ae0ed60 100644 --- a/cmd/reactor/main.go +++ b/cmd/reactor/main.go @@ -13,11 +13,11 @@ import ( "github.com/spf13/viper" ) -type coordinator interface { +type reactorCoordinator interface { Start() } -func NewCoordinator(config *viper.Viper, ch chan error) coordinator { +func NewReactorCoordinator(config *viper.Viper, ch chan error) reactorCoordinator { // allows interface checking as opposed to calling directly return reactor.NewCoordinator(config, ch) } @@ -36,7 +36,7 @@ func main() { conf := NewConfig("reactor") ch := make(chan error) - rlc := NewCoordinator(conf, ch) // passing conf and err + rlc := NewReactorCoordinator(conf, ch) // passing conf and err go rlc.Start() logging.Debug(logging.DStart, "Reactor Started") diff --git a/internal/configs/reactor.yaml b/internal/configs/reactor.yaml index 26f2e46..8682f29 100644 --- a/internal/configs/reactor.yaml +++ b/internal/configs/reactor.yaml @@ -2,6 +2,7 @@ devices: address: 112 name: DO Sensor reactor: + heartbeat: 5 id: 2166136261 model: "" name: Dummy Reactor diff --git "a/internal/pkg/\\" "b/internal/pkg/\\" new file mode 100644 index 0000000..a2b6a26 --- /dev/null +++ "b/internal/pkg/\\" @@ -0,0 +1,48 @@ +package sensor + +// do sensor and methods + +import ( + pb "FRMS/internal/pkg/grpc" + "sync" +) + +type DOManager struct { + // do sensor manager + *Atlas // atlas helper + *SensorManager `mapstructure:",squash"` + + sync.RWMutex + Status int + Name string +} + +func NewDOManager(device *pb.Device) *DOManager { + a := NewAtlasManager() + sm := NewSensorManager(device*pb.Device, a.Read) + + m := &DOManager{ + SensorManager: sm, + Atlas: a, + } + return m +} + +func (m *DOManager) Start() error { + // start sm + if err := m.SensorManager.Start(); err != nil { + return err + } + // start taking samples + go m.Monitor(m.Address, m.Atlas.Read) + return nil +} + +func (m *DOManager) GetDefaultName() string { + return "DO Sensor" +} + +func (m *DOManager) String() string { + // TODO + return "" +} diff --git a/internal/pkg/controller/manager.go b/internal/pkg/controller/manager.go new file mode 100644 index 0000000..9de570b --- /dev/null +++ b/internal/pkg/controller/manager.go @@ -0,0 +1,41 @@ +package controller + +import ( + pb "FRMS/internal/pkg/grpc" + "FRMS/internal/pkg/manager" + "sync" +) + +// base controller manager + +type Manager interface { + Start() error + Exit() error +} + +func NewManager(max int) Manager { + return manager.New(max) +} + +type ControllerManager struct { + Manager + + *pb.Device + + sync.Mutex + Enabled bool // turn controller on or off +} + +func NewControllerManager(device *pb.Device) *ControllerManager { + m := NewManager(0) // no connections + return &ControllerManager{Manager: m, Device: device} +} + +func (c *ControllerManager) GetDevice() *pb.Device { + return c.Device +} + +func (c *ControllerManager) UpdateDevice(device *pb.Device) error { + c.Device = device + return nil +} diff --git a/internal/pkg/controller/pwm.go b/internal/pkg/controller/pwm.go new file mode 100644 index 0000000..3a0b562 --- /dev/null +++ b/internal/pkg/controller/pwm.go @@ -0,0 +1,37 @@ +package controller + +// do sensor and methods + +import ( + pb "FRMS/internal/pkg/grpc" + "sync" +) + +type PWMManager struct { + // do sensor manager + *ControllerManager + sync.RWMutex + Frequency int + DutyCycle int +} + +func NewPWMManager(device *pb.Device) *PWMManager { + cm := NewControllerManager(device) + return &PWMManager{ControllerManager: cm} +} + +// freq changing +func (m *PWMManager) GetFrequency() (int, error) { + m.Lock() + defer m.Unlock() + return m.Frequency, nil +} + +func (m *PWMManager) GetDefaultName() string { + return "pwm controller" +} + +func (m *PWMManager) String() string { + // TODO + return "" +} diff --git a/internal/pkg/device/manager.go b/internal/pkg/device/manager.go new file mode 100644 index 0000000..10d6f76 --- /dev/null +++ b/internal/pkg/device/manager.go @@ -0,0 +1,55 @@ +package device + +import ( + pb "FRMS/internal/pkg/grpc" + "fmt" + "sync" + + "github.com/spf13/viper" +) + +type SubManager interface { + Start() error + Exit() error + String() string // printing info about the sub manager + GetDevice() *pb.Device + UpdateDevice(*pb.Device) error // updates device + GetAddr() int32 + GetStatus() pb.Status + GetName() string + GetDefaultName() string +} + +// base device manager + +type DeviceManager struct { + SubManager `mapstructure:",squash"` + + Config *viper.Viper + + sync.RWMutex +} + +func NewDeviceManager(device *pb.Device, config *viper.Viper) (*DeviceManager, error) { + s, err := NewSubManager(device) + + dm := &DeviceManager{ + SubManager: s, + Config: config, + } + return dm, err +} + +func (m *DeviceManager) LoadConfig(prefix string) error { + + // setting default name + // prefix = [ reactors | reactor.id ] + mainKey := fmt.Sprintf("%s.devices.%d", prefix, m.GetAddr()) + nameKey := fmt.Sprintf("%s.name", mainKey) + if !m.Config.IsSet(nameKey) { + m.Config.Set(nameKey, m.SubManager.GetDefaultName()) + } + + m.Config.UnmarshalKey(mainKey, m) + return nil +} diff --git a/internal/pkg/device/mappings.go b/internal/pkg/device/mappings.go new file mode 100644 index 0000000..8c2bc76 --- /dev/null +++ b/internal/pkg/device/mappings.go @@ -0,0 +1,37 @@ +package device + +import ( + "FRMS/internal/pkg/controller" + pb "FRMS/internal/pkg/grpc" + "FRMS/internal/pkg/sensor" + "errors" + "fmt" +) + +/* + Returns the correct manager for sensor/controller +*/ + +func NewSubManager(device *pb.Device) (SubManager, error) { + // returns correct device manager by ID + var m SubManager + var err error + addr := device.GetAddr() + switch addr { + case 97: + // DO + m = sensor.NewDOManager(device) + case 99: + // pH + m = sensor.NewPHManager(device) + case 102: + // RTD + m = sensor.NewRTDManager(device) + case 256: + // PWM + m = controller.NewPWMManager(device) + default: + err = errors.New(fmt.Sprintf("Error: device id %d unrecognized!", addr)) + } + return m, err +} diff --git a/internal/pkg/grpc/monitoring.proto b/internal/pkg/grpc/monitoring.proto index 4c9a1f1..aab984a 100644 --- a/internal/pkg/grpc/monitoring.proto +++ b/internal/pkg/grpc/monitoring.proto @@ -24,7 +24,5 @@ enum Status { message Device { int32 addr = 1; // i2c addr - string name = 2; // use readable name, changable - Status status = 3; - map data = 4; // k=v, format + Status status = 2; } diff --git a/internal/pkg/manager/manager.go b/internal/pkg/manager/manager.go new file mode 100644 index 0000000..ba1b131 --- /dev/null +++ b/internal/pkg/manager/manager.go @@ -0,0 +1,97 @@ +package manager + +import ( + "errors" + "math" + "math/rand" + "sync" + "sync/atomic" + "time" +) + +// basic manager for starting/stopping checks plus built in heartbeat for downtime detection +// used across server/reactor + +type Connection struct { + Attempts float64 // float for pow + MaxAttempts int // max allowed + sync.Mutex +} + +type Manager struct { + *Connection // embedded for timeout stuff + Active int32 // atomic checks +} + +func New(maxCon int) *Manager { + + c := &Connection{MaxAttempts: maxCon} + m := &Manager{ + Connection: c, + Active: 0, + } + + return m +} + +func (m *Manager) Start() error { + // atomically checks/updates status + if atomic.CompareAndSwapInt32(&m.Active, 0, 1) { + m.ResetConnections() + return nil + } + // already running + return errors.New("Manager already started!") +} + +func (m *Manager) Exit() error { + if atomic.CompareAndSwapInt32(&m.Active, 1, 0) { + return nil + } + return errors.New("Manager not active!") +} + +// Heartbeat tracker + +func (m *Manager) HeartBeat(ping chan struct{}, hb, interval int, scale time.Duration) { + // pings channel every (HB + randInterval) * time.Duration + // can be used anywhere a heartbeat is needed + // closes channel on exit + + if interval > 0 { + rand.Seed(time.Now().UnixNano()) + } + + for atomic.LoadInt32(&m.Active) > 0 { + // atomoic read may cause memory leak, can revisit + ping <- struct{}{} // no mem + sleep := time.Duration(hb-interval) * scale + if interval > 0 { + sleep += time.Duration(rand.Intn(2*interval)) * scale + } + time.Sleep(sleep) + } + // exited, close chan + close(ping) +} + +// connection timeout generator + +func (c *Connection) Timeout() (time.Duration, error) { + // exponential backoff + c.Lock() + defer c.Unlock() + if int(c.Attempts) < c.MaxAttempts { + c.Attempts += 1 + // 50, 100, 200... + to := time.Duration(50*math.Pow(2, c.Attempts)) * time.Millisecond + return to, nil + } + return 0, errors.New("Connection Failed") +} + +func (c *Connection) ResetConnections() { + c.Lock() + defer c.Unlock() + c.Attempts = 0 +} diff --git a/internal/pkg/reactor/devices.go b/internal/pkg/reactor/devices.go index b67b802..5803c57 100644 --- a/internal/pkg/reactor/devices.go +++ b/internal/pkg/reactor/devices.go @@ -1,8 +1,8 @@ package reactor import ( + "FRMS/internal/pkg/device" pb "FRMS/internal/pkg/grpc" - "FRMS/internal/pkg/sensor" "fmt" "sync" @@ -12,31 +12,24 @@ import ( type DeviceManager interface { Start() error Exit() error - GetDelay(*viper.Viper, string) error - SetName(string) error GetName() string - GetInfo() (*pb.Device, error) - SetStatus(int) error - GetStatus() int - SetI2C(sensor.I2CClient) error // setting the i2c + GetStatus() pb.Status + GetDevice() *pb.Device // monitoring info + LoadConfig(string) error } -func NewDeviceManager(addr int, i2c I2CClient) (DeviceManager, error) { - // new dev - sm, err := sensor.NewDeviceManager(&pb.Device{Addr: int32(addr)}) - // set i2c interface - err = sm.SetI2C(i2c) - return sm, err - +func NewDeviceManager(addr int, config *viper.Viper) (DeviceManager, error) { + return device.NewDeviceManager(&pb.Device{Addr: int32(addr)}, config) } type DeviceCoordinator struct { + Config *viper.Viper Managers map[int]DeviceManager sync.RWMutex } -func NewDeviceCoordinator() *DeviceCoordinator { - dm := &DeviceCoordinator{} +func NewDeviceCoordinator(config *viper.Viper) *DeviceCoordinator { + dm := &DeviceCoordinator{Config: config} dm.Managers = make(map[int]DeviceManager) return dm } @@ -52,28 +45,29 @@ func (c *DeviceCoordinator) UpdateDevices(config *viper.Viper, i2c I2CClient, ac // loop over devs if _, ok := c.Managers[addr]; !ok { // no device - if c.Managers[addr], err = NewDeviceManager(addr, i2c); err != nil { + if c.Managers[addr], err = NewDeviceManager(addr, c.Config); err != nil { return err } - fmt.Printf("Found dev %d (%x)\n", addr, addr) + // loading config + c.Managers[addr].LoadConfig("reactor") // check for config name - nameKey := fmt.Sprintf("device.%d.name", addr) - if !config.IsSet(nameKey) { - // no config name, get default - config.Set(nameKey, c.Managers[addr].GetName()) - } - // setting it - if err = c.Managers[addr].SetName(config.Get(nameKey).(string)); err != nil { - return err - } - // atlas delays - delayKey := fmt.Sprintf("device.%d.delays", addr) - if !config.IsSet(delayKey) { - // set empty delays - config.Set(fmt.Sprintf("device.%d.delays.read", addr), 0) - config.Set(fmt.Sprintf("device.%d.delays.cal", addr), 0) - } - c.Managers[addr].GetDelay(config, delayKey) + // nameKey := fmt.Sprintf("device.%d.name", addr) + // if !config.IsSet(nameKey) { + // // no config name, get default + // config.Set(nameKey, c.Managers[addr].GetName()) + // } + // // setting it + // if err = c.Managers[addr].SetName(config.Get(nameKey).(string)); err != nil { + // return err + // } + // // atlas delays + // delayKey := fmt.Sprintf("device.%d.delays", addr) + // if !config.IsSet(delayKey) { + // // set empty delays + // config.Set(fmt.Sprintf("device.%d.delays.read", addr), 0) + // config.Set(fmt.Sprintf("device.%d.delays.cal", addr), 0) + // } + // c.Managers[addr].GetDelay(config, delayKey) } } // all devs accounted for @@ -81,12 +75,10 @@ func (c *DeviceCoordinator) UpdateDevices(config *viper.Viper, i2c I2CClient, ac if active[addr] { // active if dm.GetStatus() != 1 { - go dm.SetStatus(1) err = dm.Start() } } else { if dm.GetStatus() != 0 { - go dm.SetStatus(0) err = dm.Exit() } } @@ -106,7 +98,7 @@ func (c *DeviceCoordinator) GetDevices() ([]*pb.Device, error) { var dev *pb.Device for _, dm := range c.Managers { - dev, err = dm.GetInfo() + dev = dm.GetDevice() fmt.Println(dev) devices = append(devices, dev) } diff --git a/internal/pkg/reactor/monitoring.go b/internal/pkg/reactor/monitoring.go index 503febd..76c2dcf 100644 --- a/internal/pkg/reactor/monitoring.go +++ b/internal/pkg/reactor/monitoring.go @@ -11,7 +11,7 @@ import ( // implements grpc handler and device data aggregater handler // grpc status update handler -func (c *Coordinator) Ping() { +func (c *ReactorCoordinator) Ping() { // sends all device status to central coordinator fmt.Printf("Pinging coordinator\n") // get devices diff --git a/internal/pkg/reactor/rlcoordinator.go b/internal/pkg/reactor/rlcoordinator.go index 8c09111..7326fd4 100644 --- a/internal/pkg/reactor/rlcoordinator.go +++ b/internal/pkg/reactor/rlcoordinator.go @@ -7,12 +7,10 @@ import ( pb "FRMS/internal/pkg/grpc" "FRMS/internal/pkg/influxdb" "FRMS/internal/pkg/logging" + "FRMS/internal/pkg/manager" "FRMS/internal/pkg/system" "context" - "errors" "fmt" - "math" - "sync" "time" "github.com/spf13/viper" @@ -21,6 +19,18 @@ import ( "google.golang.org/grpc/status" ) +// basic manager +type Manager interface { + Start() error + Exit() error + Timeout() (time.Duration, error) + HeartBeat(chan struct{}, int, int, time.Duration) // creates a hb +} + +func NewManager(max int) Manager { + return manager.New(max) +} + // db client interface type DBClient interface { // @@ -42,86 +52,91 @@ func NewI2CClient(config *viper.Viper) (I2CClient, error) { } type Server struct { - // embed Ip string `mapstructure:"ip"` Port int `mapstructure:"port"` } -// Coordinator == Reactor Level Coordinator - -type Coordinator struct { +type Info struct { Name string `mapstructure:"name,omitempty"` ID int `mapstructure:"id,omitempty"` Model string `mapstructure:"model,omitempty"` - // server info embedded + HB int `mapstructure:"heartbeat"` Server - // database +} + +type ReactorCoordinator struct { + Manager // base manager + Config *viper.Viper // config + + Info `mapstructure:",squash"` + Database DBClient I2C I2CClient - // config - Config *viper.Viper - MonitoringClient pb.MonitoringClient - // connected devices + + MonitoringClient pb.MonitoringClient // grpc + *DeviceCoordinator // struct for locking - // other stuff and things - Err chan error - mu sync.Mutex - HB time.Duration - PingTimer chan struct{} - // db client - Active active -} -type active struct { - bool - int - sync.Mutex + Err chan error } -func NewCoordinator(config *viper.Viper, ch chan error) *Coordinator { - // coord - c := &Coordinator{Err: ch, Config: config} - c.DeviceCoordinator = NewDeviceCoordinator() - // hb defaults to 5 - c.HB = time.Duration(5 * time.Second) +func NewCoordinator(config *viper.Viper, errCh chan error) *ReactorCoordinator { - // this is going to be scuffed - //c.DB = &DB{Bucket: "bb", Org: "ForeLight", URL: url, Token: "S1UZssBu6KPfHaQCt34pZFpyc5lzbH9XanYJWCkOI5FqLY7gq205C6FTH-CmugiPH6o2WoKlTkEuPgIfaJjAhw=="} - // setup db - var err error - if c.Database, err = NewDBClient(config); err != nil { - ch <- err - } + m := NewManager(6) // max 6 attempts + dc := NewDeviceCoordinator(config) - if c.I2C, err = NewI2CClient(config); err != nil { - ch <- err + c := &ReactorCoordinator{ + Manager: m, + Config: config, + DeviceCoordinator: dc, + Err: errCh, } - c.PingTimer = make(chan struct{}) + // this is going to be scuffed + //c.DB = &DB{Bucket: "bb", Org: "ForeLight", URL: url, Token: "S1UZssBu6KPfHaQCt34pZFpyc5lzbH9XanYJWCkOI5FqLY7gq205C6FTH-CmugiPH6o2WoKlTkEuPgIfaJjAhw=="} + return c } -func (c *Coordinator) Start() { +func (c *ReactorCoordinator) Start() { // should discover hwinfo and sensors on its own // now setting up sensor managers - c.Activate() - // load hwinfo - if err := c.LoadInfo(); err != nil { // loads info + var err error + + if err = c.Manager.Start(); err != nil { c.Err <- err } - // grab config stuff - c.Config.UnmarshalKey("reactor", c) - go c.Monitor() + // load config + if err = c.LoadConfig(); err != nil { // loads info + c.Err <- err + } + + // loading clients + if c.Database, err = NewDBClient(c.Config); err != nil { + c.Err <- err + } + + if c.I2C, err = NewI2CClient(c.Config); err != nil { + c.Err <- err + } + go c.Discover() go c.Database.Start() } -func (c *Coordinator) LoadInfo() error { - // check ID +func (c *ReactorCoordinator) LoadConfig() error { + var err error + // get hb + if !c.Config.IsSet("reactor.heartbeat") { + // default to 5 seconds + c.Config.Set("reactor.heartbeat", 5) + } + + // check id if !c.Config.IsSet("reactor.id") { - // get id + // get from hw var id int if id, err = system.GetId("eth0"); err != nil { return err @@ -131,42 +146,39 @@ func (c *Coordinator) LoadInfo() error { // check Model if !c.Config.IsSet("reactor.model") { - // get model + // get from hw var model string if model, err = system.GetModel(); err != nil { return err } c.Config.Set("reactor.model", model) } - // all good + + // all good, unmarhsaling + c.Config.UnmarshalKey("reactor", c) + return err } -func (c *Coordinator) Monitor() { +func (c *ReactorCoordinator) Monitor() { // periodically grabs connected devs and updates list - for c.IsActive() { - select { - case <-c.PingTimer: - // check devs and ping - active, err := c.I2C.GetConnected() - if err != nil { - c.Err <- err - } - go c.UpdateDevices(c.Config, c.I2C, active) - go c.Ping() - } - } -} + ch := make(chan struct{}) + go c.HeartBeat(ch, c.HB, 0, time.Second) -func (c *Coordinator) HeartBeat() { - for c.IsActive() { - c.PingTimer <- struct{}{} + for range ch { + // check devs and ping logging.Debug(logging.DClient, "RLC Pinging server") - time.Sleep(c.HB) + // this can probably be offloaded + active, err := c.I2C.GetConnected() + if err != nil { + c.Err <- err + } + go c.UpdateDevices(c.Config, c.I2C, active) + go c.Ping() } } -func (c *Coordinator) Discover() { +func (c *ReactorCoordinator) Discover() { // sets up connection to central coordiantor conn, err := c.Connect(c.Ip, c.Port) if err != nil { @@ -187,11 +199,12 @@ func (c *Coordinator) Discover() { c.Err <- err } c.MonitoringClient = pb.NewMonitoringClient(clientConn) - go c.HeartBeat() + // manager + go c.Monitor() } -func (c *Coordinator) Connect(ip string, port int) (*grpc.ClientConn, error) { +func (c *ReactorCoordinator) Connect(ip string, port int) (*grpc.ClientConn, error) { // function connects to central server and passes hwinfo var opts []grpc.DialOption opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -202,13 +215,13 @@ func (c *Coordinator) Connect(ip string, port int) (*grpc.ClientConn, error) { code := status.Code(err) if code != 0 { // != OK if code == (5 | 14) { // service temp down - to := c.Timeout() - if to == 0 { - err = errors.New("Failed to connect to central server") + var to time.Duration + if to, err = c.Timeout(); err != nil { + // from manager return &grpc.ClientConn{}, err } - logging.Debug(logging.DClient, "Server currently unavailable, retrying in %v ms", to) - time.Sleep(time.Duration(to) * time.Millisecond) + logging.Debug(logging.DClient, "Server currently unavailable, retrying in %v", to) + time.Sleep(to) } else { return &grpc.ClientConn{}, err } @@ -217,48 +230,3 @@ func (c *Coordinator) Connect(ip string, port int) (*grpc.ClientConn, error) { } return conn, nil } - -func (c *Coordinator) Timeout() int { - c.Active.Lock() - defer c.Active.Unlock() - if c.Active.int < 9 { - v := int(5 * math.Pow(float64(2), float64(c.Active.int))) - c.Active.int += 1 - return v - } else { - //excedded retries - return 0 - } -} - -func (c *Coordinator) IsActive() bool { - c.Active.Lock() - defer c.Active.Unlock() - return c.Active.bool -} - -func (c *Coordinator) Exit() bool { - c.Active.Lock() - defer c.Active.Unlock() - if c.Active.bool { - c.Active.bool = false - logging.Debug(logging.DClient, "RLC Exiting...") - return true - } else { - logging.Debug(logging.DError, "RLC Already Dead!") - return false - } -} - -func (c *Coordinator) Activate() bool { - c.Active.Lock() - defer c.Active.Unlock() - if c.Active.bool { - logging.Debug(logging.DError, "RLC Already Started!") - return false - } else { - logging.Debug(logging.DClient, "RLC Starting") - c.Active.bool = true - return c.Active.bool - } -} diff --git a/internal/pkg/sensor/atlas.go b/internal/pkg/sensor/atlas.go index 78f0c28..dfef5c2 100644 --- a/internal/pkg/sensor/atlas.go +++ b/internal/pkg/sensor/atlas.go @@ -3,8 +3,6 @@ package sensor import ( "errors" "time" - - "github.com/spf13/viper" ) // atlas helpers to aid with sensors @@ -37,20 +35,24 @@ func (a *Atlas) Calibrate(addr int, cal string) error { return err } -func (a *Atlas) Read(addr int) (string, error) { +func (a *Atlas) Read(addr int32) (float32, error) { // take reading function - if _, err := a.I2C.SendCmd(addr, "R"); err != nil { + if _, err := a.I2C.SendCmd(int(addr), "R"); err != nil { // read command //fmt.Printf("Error writing! %s", err) - return "", err + return 0, err } sleep := time.Duration(a.ReadDelay) * time.Millisecond time.Sleep(sleep) // sleep between reads - data, err := a.I2C.SendCmd(addr, "") - return data, err + //data, err := a.I2C.SendCmd(addr, "") + return 0, nil +} + +// for config +func (a *Atlas) GetCalDelay() int { + return a.CalDelay } -// config stuff -func (a *Atlas) GetDelay(config *viper.Viper, key string) error { - return config.UnmarshalKey(key, a) +func (a *Atlas) GetReadDelay() int { + return a.ReadDelay } diff --git a/internal/pkg/sensor/do_sensor.go b/internal/pkg/sensor/do_sensor.go index 9a1336a..4210bda 100644 --- a/internal/pkg/sensor/do_sensor.go +++ b/internal/pkg/sensor/do_sensor.go @@ -5,26 +5,42 @@ package sensor import ( pb "FRMS/internal/pkg/grpc" "sync" - - "github.com/spf13/viper" ) -type DOSensorManager struct { +type DOManager struct { // do sensor manager - *Manager + *Atlas // atlas helper + *SensorManager `mapstructure:",squash"` + sync.RWMutex } -func (s *DOSensorManager) GetName() string { - return s.Manager.GetName("DO Sensor") +func NewDOManager(device *pb.Device) *DOManager { + a := &Atlas{} + sm := NewSensorManager(device, a.Read) + + m := &DOManager{ + SensorManager: sm, + Atlas: a, + } + return m +} + +func (m *DOManager) Start() error { + // start sm + if err := m.SensorManager.Start(); err != nil { + return err + } + // start taking samples + go m.Monitor() + return nil } -func (s *DOSensorManager) Update(sensor *pb.Device, config *viper.Viper) error { - // updates info - return s.Manager.Update(sensor) +func (m *DOManager) GetDefaultName() string { + return "DO Sensor" } -func (s *DOSensorManager) String() string { - // basic - return s.Manager.String() +func (m *DOManager) String() string { + // TODO + return "" } diff --git a/internal/pkg/sensor/manager.go b/internal/pkg/sensor/manager.go index ae7f0e8..49c0db0 100644 --- a/internal/pkg/sensor/manager.go +++ b/internal/pkg/sensor/manager.go @@ -2,195 +2,75 @@ package sensor import ( pb "FRMS/internal/pkg/grpc" - "errors" - "fmt" - "math/rand" + "FRMS/internal/pkg/manager" "sync" "time" ) -// base device manager - -type Manager struct { - // base dm - Device *pb.Device // for sending/updating - *Atlas - *Active // I might need a thing here - - ReadTimer chan struct{} // time reads - sync.RWMutex -} - -type Active struct { - bool - int - sync.Mutex -} - -func NewManager(atlas *Atlas, device *pb.Device) *Manager { - rt := make(chan struct{}) - active := &Active{} - return &Manager{ReadTimer: rt, Device: device, Atlas: atlas, Active: active} -} - -func (m *Manager) SetI2C(i2c I2CClient) error { - m.Atlas.I2C = i2c - return nil -} - -func (m *Manager) GetName(basic string) string { - m.RLock() - defer m.RUnlock() - // basic is the default if there isnt a name set - if m.Device.Name == "" { - return basic - } - return m.Device.Name -} - -func (m *Manager) SetName(name string) error { - m.Lock() - defer m.Unlock() - m.Device.Name = name - return nil -} - -func (m *Manager) GetStatus() int { - m.RLock() - defer m.RUnlock() - return int(m.Device.Status) -} - -func (m *Manager) SetStatus(status int) error { - // updates status - m.Lock() - defer m.Unlock() - m.Device.Status = pb.Status(status) - return nil -} - -func (m *Manager) GetAddr() int { - m.RLock() - defer m.RUnlock() - return int(m.Device.Addr) -} - -func (m *Manager) SetData(data map[string]string) error { - m.Lock() - defer m.Unlock() - m.Device.Data = data - return nil -} - -func (m *Manager) SetParameter(key, val string) error { - // #TODO - return errors.New("UNIMPL") -} - -func (m *Manager) GetParameter(key string) (string, error) { - // #TODO - return "", errors.New("UNIMPL") -} - -func (m *Manager) Update(device *pb.Device) error { - // updates info - m.Lock() - defer m.Unlock() - m.Device = device - return nil +type Manager interface { + Start() error + Exit() error + HeartBeat(chan struct{}, int, int, time.Duration) } -func (m *Manager) GetInfo() (*pb.Device, error) { - m.RLock() - defer m.RUnlock() - return m.Device, nil +func NewManager(max int) Manager { + return manager.New(max) } -func (m Manager) String() string { - // basic printout - str := fmt.Sprintf("%s is %s at %d (%x). Reading: %s", m.Device.Name, m.Device.Status.String(), m.Device.Addr, m.Device.Addr, m.Device.Data["rip"]) - return str +type Reading struct { + sync.RWMutex + Latest float32 + New func(int32) (float32, error) } -// manager funcs to start/stop +type SensorManager struct { + SampleRate int `mapstructure:"samplerate"` // in (ms) + Name string `mapstructure:"name"` + Manager -func (m *Manager) Start() error { - // goal is to start a long running monitoring routine - if !m.Activate() { - return errors.New("Manager already running!") - } // atomically activated if this runs - fmt.Println("Manager starting") - go m.Monitor() - go m.HeartBeat() - return nil + *pb.Device + *Reading } -func (m *Manager) Exit() error { - if !m.Deactivate() { - return errors.New("Manager already exited!") +func NewSensorManager(device *pb.Device, f func(int32) (float32, error)) *SensorManager { + m := NewManager(0) // no timeout + r := &Reading{New: f} + s := &SensorManager{ + Manager: m, + Reading: r, + Device: device, } - return nil + return s } -func (a *Active) Activate() bool { - // returns true if success, false otherwise - a.Lock() - defer a.Unlock() - if a.bool { // already active - return false - } else { - a.bool = true - a.int = 0 - return a.bool - } +func (s *SensorManager) GetDevice() *pb.Device { + return s.Device } -func (a *Active) Deactivate() bool { - // returns true if success false otherise - a.Lock() - defer a.Unlock() - if a.bool { - a.bool = false - return true - } else { // already deactivated - return a.bool // false - } +func (s *SensorManager) GetName() string { + return s.Name } -func (a *Active) IsActive() bool { - a.Lock() - defer a.Unlock() - return a.bool -} +func (s *SensorManager) Monitor() { + ch := make(chan struct{}) // hb chan + go s.HeartBeat(ch, s.SampleRate, 1000, time.Millisecond) -// reads - -func (m *Manager) Monitor() { - for m.IsActive() { - select { - case <-m.ReadTimer: - // perform read - go m.ReadData() - } + for range ch { + go s.TakeReading(s.Device.GetAddr()) } } -func (m *Manager) HeartBeat() { - for m.IsActive() { - m.ReadTimer <- struct{}{} - rand_sleep := rand.Intn(2000) + 4000 - time.Sleep(time.Duration(rand_sleep) * time.Millisecond) // 4000 - 5000 millisecond sleep intervali +func (r *Reading) TakeReading(addr int32) { + sample, err := r.New(addr) + if err != nil { + panic(err) } + r.Lock() + defer r.Unlock() + r.Latest = sample } -func (m *Manager) ReadData() { - // perform I2C read via atlas helper and update data! - var err error - var data string - data, err = m.Atlas.Read(int(m.GetAddr())) - //fmt.Println(data) - d := map[string]string{"rip": data} - if err = m.SetData(d); err != nil { - panic(err) - } +func (s *SensorManager) UpdateDevice(device *pb.Device) error { + s.Device = device + return nil } diff --git a/internal/pkg/sensor/mappings.go b/internal/pkg/sensor/mappings.go deleted file mode 100644 index b5ab34c..0000000 --- a/internal/pkg/sensor/mappings.go +++ /dev/null @@ -1,56 +0,0 @@ -package sensor - -import ( - pb "FRMS/internal/pkg/grpc" - "errors" - "fmt" - - "github.com/spf13/viper" -) - -/* -Returns the correct manager for sensor/device -*/ - -type DeviceManager interface { - // basic device stuff - Start() error - Exit() error - SetI2C(I2CClient) error // for interface - GetName() string - SetName(string) error // change displayed name - GetDelay(*viper.Viper, string) error - GetStatus() int - SetStatus(int) error // status change - SetParameter(string, string) error // key, val - GetParameter(string) (string, error) // key, val - Update(*pb.Device, *viper.Viper) error // write updates - GetInfo() (*pb.Device, error) // gets info - String() string // printable -} - -func NewDeviceManager(device *pb.Device) (DeviceManager, error) { - // returns correct device manager by ID - atlas := &Atlas{} - m := NewManager(atlas, device) - var dm DeviceManager - var err error - - switch id := device.GetAddr(); id { - case 97: - // DO - dm = &DOSensorManager{Manager: m} - case 99: - // pH - dm = &PHSensorManager{Manager: m} - case 102: - // RTD - dm = &RTDSensorManager{Manager: m} - case 256: - // PWM - dm = &PWMDeviceManager{Manager: m} - default: - err = errors.New(fmt.Sprintf("Error: device id %v unrecognized!", id)) - } - return dm, err -} diff --git a/internal/pkg/sensor/ph_sensor.go b/internal/pkg/sensor/ph_sensor.go index fa5d285..5a44b83 100644 --- a/internal/pkg/sensor/ph_sensor.go +++ b/internal/pkg/sensor/ph_sensor.go @@ -5,26 +5,31 @@ package sensor import ( pb "FRMS/internal/pkg/grpc" "sync" - - "github.com/spf13/viper" ) -type PHSensorManager struct { +type PHManager struct { // do sensor manager - *Manager + *Atlas + *SensorManager `mapstructure:",squash"` + sync.RWMutex } -func (s *PHSensorManager) GetName() string { - return s.Manager.GetName("pH Sensor") +func NewPHManager(device *pb.Device) *PHManager { + a := &Atlas{} + sm := NewSensorManager(device, a.Read) + m := &PHManager{ + SensorManager: sm, + Atlas: a, + } + return m } -func (s *PHSensorManager) Update(sensor *pb.Device, config *viper.Viper) error { - // updates info - return s.Manager.Update(sensor) +func (s *PHManager) GetDefaultName() string { + return "pH Sensor" } -func (s PHSensorManager) String() string { - // basic - return s.Manager.String() +func (s PHManager) String() string { + // TODO + return "" } diff --git a/internal/pkg/sensor/pwm_device.go b/internal/pkg/sensor/pwm_device.go deleted file mode 100644 index 574533f..0000000 --- a/internal/pkg/sensor/pwm_device.go +++ /dev/null @@ -1,36 +0,0 @@ -package sensor - -// do sensor and methods - -import ( - pb "FRMS/internal/pkg/grpc" - "errors" - "sync" - - "github.com/spf13/viper" -) - -type PWMDeviceManager struct { - // do sensor manager - *Manager - sync.RWMutex -} - -func (s *PWMDeviceManager) GetName() string { - return s.Manager.GetName("PWM") -} - -func (s *PWMDeviceManager) SetParameter(key, val string) error { - // #TODO - return errors.New("UNIMPL") -} - -func (s *PWMDeviceManager) Update(device *pb.Device, config *viper.Viper) error { - // updates info - return s.Manager.Update(device) -} - -func (s *PWMDeviceManager) String() string { - // basic - return s.Manager.String() -} diff --git a/internal/pkg/sensor/rtd_sensor.go b/internal/pkg/sensor/rtd_sensor.go index 41607ce..964f736 100644 --- a/internal/pkg/sensor/rtd_sensor.go +++ b/internal/pkg/sensor/rtd_sensor.go @@ -5,26 +5,30 @@ package sensor import ( pb "FRMS/internal/pkg/grpc" "sync" - - "github.com/spf13/viper" ) -type RTDSensorManager struct { +type RTDManager struct { // do sensor manager - *Manager + *Atlas + *SensorManager `mapstructure:",squash"` sync.RWMutex } -func (s *RTDSensorManager) GetName() string { - return s.Manager.GetName("RTD Sensor") +func NewRTDManager(device *pb.Device) *RTDManager { + a := &Atlas{} + sm := NewSensorManager(device, a.Read) + m := &RTDManager{ + SensorManager: sm, + Atlas: a, + } + return m } -func (s *RTDSensorManager) Update(sensor *pb.Device, config *viper.Viper) error { - // updates info - return s.Manager.Update(sensor) +func (s *RTDManager) GetDefaultName() string { + return "RTD Sensor" } -func (s *RTDSensorManager) String() string { - // basic info - return s.Manager.String() +func (s *RTDManager) String() string { + // TODO + return "" } diff --git a/internal/pkg/server/coordinator.go b/internal/pkg/server/coordinator.go index 9a622fc..b624f34 100644 --- a/internal/pkg/server/coordinator.go +++ b/internal/pkg/server/coordinator.go @@ -133,32 +133,36 @@ func (c *ReactorCoordinator) Start() error { func (c *ReactorCoordinator) ClientHandler(cl *Client) { // updates clients if nessecary - if err := c.UpdateManager(cl, c.Err); err != nil { + if err := c.UpdateReactorManager(cl, c.Err); err != nil { c.Err <- err } } -func (m *ReactorManagers) GetManager(id int) (*ReactorManager, error) { +func (m *ReactorManagers) GetReactorManager(id int) (*ReactorManager, error) { m.RLock() defer m.RUnlock() rm, exists := m.Directory[id] if !exists { - return &ReactorManager{}, errors.New(fmt.Sprintf("No manager for reactor %d!", id)) + return &ReactorManager{ID: id}, errors.New(fmt.Sprintf("No manager for reactor %d!", id)) } return rm, nil } -func (m *ReactorManagers) UpdateManager(cl *Client, errCh chan error) error { +func (m *ReactorManagers) UpdateReactorManager(cl *Client, errCh chan error) error { // locking m.RLock() defer m.RUnlock() + var err error + rm, exists := m.Directory[cl.Id] if !exists { - logging.Debug(logging.DClient, "RCO 01 starting manager for reactor client %v", cl.Id) + logging.Debug(logging.DClient, "RCO creating manager for reactor client %v", cl.Id) + // creating rm = NewReactorManager(errCh) - if err := rm.Start(); err != nil { + // starting + if err = rm.Start(); err != nil { return err } m.Directory[cl.Id] = rm @@ -179,7 +183,7 @@ func (r *ReactorCoordinator) Register() error { } func (r *ReactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { - rm, err := r.GetManager(int(req.GetId())) + rm, err := r.GetReactorManager(int(req.GetId())) // error checking if err != nil { return &pb.ReactorStatusResponse{}, err diff --git a/internal/pkg/server/manager.go b/internal/pkg/server/manager.go index bc7a069..88e4d5d 100644 --- a/internal/pkg/server/manager.go +++ b/internal/pkg/server/manager.go @@ -4,8 +4,6 @@ import ( //"log" "FRMS/internal/pkg/logging" _ "context" - "errors" - "math" "sync" "time" ) @@ -13,107 +11,21 @@ import ( // will condense into the rm soon enough // manager connects to client on start and returns the gRPC connection to make gRPC clients -type Manager struct { +type ClientManager struct { *Client // gives access to c.Ip c.Id etc Hb time.Duration // used for managing hb timer for client - Active active Sig chan bool - Err chan error -} - -type active struct { sync.Mutex - bool - int -} - -func NewManager(err chan error) *Manager { - hb := time.Duration(5 * time.Second) //hb to - m := &Manager{Hb: hb, Err: err} - return m } -func (m *Manager) Start() error { - if !m.Activate() { - // manager already running - return errors.New("Manager already running!") - } // if we get here, manager is atomically activated and we can ensure start wont run again - return nil +func NewClientManager(cl *Client) *ClientManager { + return &ClientManager{Client: cl} } -func (m *Manager) Exit() { - // exit function to eventually allow saving to configs - if !m.Deactivate() { - m.Err <- errors.New("Manager already disabled!") - } -} - -func (m *Manager) UpdateClient(cl *Client) error { +func (m *ClientManager) UpdateClient(cl *Client) error { + m.Lock() + defer m.Unlock() logging.Debug(logging.DClient, "MAN Updating client %v", cl.Id) m.Client = cl return nil } - -// reactor manager atomic operations - -func (m *Manager) IsActive() bool { - m.Active.Lock() - defer m.Active.Unlock() - return m.Active.bool -} - -func (m *Manager) Activate() bool { - // slightly confusing but returns result of trying to activate - m.Active.Lock() - defer m.Active.Unlock() - alive := m.Active.bool - if alive { - return false - } else { - m.Active.bool = true - m.Active.int = 0 - return m.Active.bool - } -} - -func (m *Manager) Deactivate() bool { - // result of trying to deactivate - m.Active.Lock() - defer m.Active.Unlock() - alive := m.Active.bool - if alive { - m.Active.bool = false - return true - } else { - return m.Active.bool - } -} - -// connection stuff - -func (m *Manager) Timeout() int { - // keeps track of and generates timeout [0-1.2s) over span of ~2.5s - // returns 0 on TO elapse - m.Active.Lock() - defer m.Active.Unlock() - if m.Active.int < 9 { - v := int(5 * math.Pow(float64(2), float64(m.Active.int))) - m.Active.int += 1 - return v - } else { - // exceeded retries - return 0 - } -} - -/* -shouldnt be nessecary anymore - -func (m *Manager) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) { - return &pb.GetDevicesResponse{}, errors.New("Get Devices not implemented!") -} - -func (m *Manager) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { - return &pb.ReactorStatusResponse{}, errors.New("Reactor Status Handler not implemented!") -} -*/ diff --git a/internal/pkg/server/reactormanager.go b/internal/pkg/server/reactormanager.go index e17dacd..6fe17a1 100644 --- a/internal/pkg/server/reactormanager.go +++ b/internal/pkg/server/reactormanager.go @@ -1,9 +1,13 @@ package server import ( + "FRMS/internal/pkg/device" pb "FRMS/internal/pkg/grpc" "FRMS/internal/pkg/logging" - "FRMS/internal/pkg/sensor" + "FRMS/internal/pkg/manager" + "time" + + //"FRMS/internal/pkg/device" "context" "fmt" _ "log" @@ -14,38 +18,58 @@ import ( // this package will implement a reactor manager and associated go routines +type Manager interface { + Start() error // status checks + Exit() error + Timeout() (time.Duration, error) // TO Generator +} + +func NewManager(max int) Manager { + // takes a heartbeat and max connection attempts + return manager.New(max) +} + type ReactorManager struct { - *Manager + Manager // base manager interface + *ClientManager // client manager (OUTDATED) // StatusMon *StatusMonitor putting on pause *ReactorDevices Config *viper.Viper // config to update + ID int + Err chan error } type ReactorDevices struct { // device struct + ID int Devices map[int]DeviceManager sync.RWMutex } -func NewReactorManager(err chan error) *ReactorManager { - r := &ReactorManager{} - // sub managers +func NewReactorManager(errCh chan error) *ReactorManager { + // making managers + m := NewManager(6) dm := make(map[int]DeviceManager) - r.ReactorDevices = &ReactorDevices{Devices: dm} - // core manager - r.Manager = NewManager(err) - + rd := &ReactorDevices{Devices: dm} + r := &ReactorManager{ + Manager: m, + ReactorDevices: rd, + Err: errCh, + } return r } func (r *ReactorManager) Start() error { + // allows for extra stuff + logging.Debug(logging.DStart, "RMA %v starting", r.Id) return r.Manager.Start() //go r.StatusMon.Send(&DeviceInfo{Id: r.Id, Type: "Reactor", Status: "[green]ONLINE[white]"}, "Reactor") } -func (r *ReactorManager) Exit() { - r.Manager.Exit() +func (r *ReactorManager) Exit() error { + // allows for extra stuff logging.Debug(logging.DExit, "RMA %v exiting", r.Id) + return r.Manager.Exit() //go r.StatusMon.Send(&DeviceInfo{Id: r.Id, Type: "Reactor", Status: "[red]OFFLINE[white]", Data: fmt.Sprintf("Last Seen %v", time.Now().Format("Mon at 03:04:05pm MST"))}, "Reactor") } @@ -62,16 +86,14 @@ func (r *ReactorManager) ReactorStatusHandler(ctx context.Context, req *pb.React // device stuff type DeviceManager interface { - SetName(string) error // change name - GetParameter(string) (string, error) // key ret val - SetParameter(string, string) error // key, val - Update(*pb.Device, *viper.Viper) error // write updates - String() string // printing + LoadConfig(string) error + UpdateDevice(*pb.Device) error + String() string // printing } -func NewDeviceManager(device *pb.Device) (DeviceManager, error) { +func NewDeviceManager(dev *pb.Device, config *viper.Viper) (DeviceManager, error) { // returns a manager struct - return sensor.NewDeviceManager(device) + return device.NewDeviceManager(dev, config) } func (r *ReactorManager) UpdateDevices(devs []*pb.Device) { @@ -83,21 +105,24 @@ func (r *ReactorManager) UpdateDevices(devs []*pb.Device) { // looping over devs if dm, ok := r.ReactorDevices.Devices[int(dev.GetAddr())]; ok { // device manager found - go dm.Update(dev, r.Config) // update dm - fmt.Println(dm) + go dm.UpdateDevice(dev) + //fmt.Println(dm) } else { // not found - go r.AddDevice(dev, r.Err) + go r.AddDevice(dev, r.ID, r.Config, r.Err) } } } -func (r *ReactorDevices) AddDevice(dev *pb.Device, errCh chan error) { +func (r *ReactorDevices) AddDevice(dev *pb.Device, id int, config *viper.Viper, errCh chan error) { r.Lock() // write lock defer r.Unlock() - dm, err := NewDeviceManager(dev) + dm, err := NewDeviceManager(dev, config) if err != nil { errCh <- err } + if err := dm.LoadConfig(fmt.Sprintf("reactors.%d", id)); err != nil { + errCh <- err + } r.Devices[int(dev.GetAddr())] = dm }