diff --git a/internal/configs/reactor.yaml b/internal/configs/reactor.yaml index b2b836a..26f2e46 100644 --- a/internal/configs/reactor.yaml +++ b/internal/configs/reactor.yaml @@ -1,9 +1,10 @@ +devices: + address: 112 + name: DO Sensor reactor: - name: "Dummy Reactor" + id: 2166136261 + model: "" + name: Dummy Reactor server: - ip: "192.168.100.2" + ip: 192.168.100.2 port: 2022 -devices: - address: 112 # decimal - name: "DO Sensor" - diff --git a/internal/pkg/I2C/device.go b/internal/pkg/I2C/device.go index 0171c7e..a24501e 100644 --- a/internal/pkg/I2C/device.go +++ b/internal/pkg/I2C/device.go @@ -1,79 +1,72 @@ package I2C import ( - "fmt" - "sync" - "time" + "fmt" + "sync" + "time" ) type I2CDevice struct { - *I2CBus // embeds bus - bool // stores whether dev is currently connected - int // addr - Data *data + *I2CBus // embeds bus + bool // stores whether dev is currently connected + int // addr + Data *data } type data struct { - string - bool - sync.Mutex + string + bool + sync.Mutex } func (d I2CDevice) String() string { - t := map[int]string{97:"DO Sensor",99:"pH Sensor",102:"Temperature Sensor",64:"DHT11 Sensor"} - return t[d.int] + t := map[int]string{97: "DO Sensor", 99: "pH Sensor", 102: "Temperature Sensor", 64: "DHT11 Sensor"} + return t[d.int] } -func NewDevice(addr int,bus *I2CBus) *I2CDevice { - d := &I2CDevice{} - d.I2CBus = bus - d.int = addr - d.Data = &data{} - return d +func NewDevice(addr int, bus *I2CBus) *I2CDevice { + d := &I2CDevice{} + d.I2CBus = bus + d.int = addr + d.Data = &data{} + return d } func (d *I2CDevice) GetAddr() int { - return d.int + return d.int } -func (d *I2CDevice) GetStatus() string { - // TODO - s := d.I2CBus.GetStatus(d.int) - if s { - d.Data.Active() - return "[green]ACTIVE[white]" - } else { - d.Data.Killed() - return "[red]KILLED[white]" - } +func (d *I2CDevice) GetStatus() bool { + // TODO + return d.I2CBus.GetStatus(d.int) } func (d *I2CDevice) GetType() string { - // TODO - return fmt.Sprint(d) + // TODO + return fmt.Sprint(d) } func (d *I2CDevice) GetData() string { - d.Data.Lock() - defer d.Data.Unlock() - d.Data.string = d.I2CBus.GetData(d.int) - return d.Data.string + d.Data.Lock() + defer d.Data.Unlock() + d.Data.string = d.I2CBus.GetData(d.int) + return d.Data.string } func (d *data) Active() { - d.Lock() - defer d.Unlock() - if !d.bool { - d.string = "" - d.bool = true - } + d.Lock() + defer d.Unlock() + if !d.bool { + d.string = "" + d.bool = true + } } func (d *data) Killed() { - d.Lock() - defer d.Unlock() - if d.bool { - d.string = time.Now().Format("Mon at 03:04:05pm MST") - d.bool = false - } + d.Lock() + defer d.Unlock() + if d.bool { + d.string = time.Now().Format("Mon at 03:04:05pm MST") + d.bool = false + } } diff --git a/internal/pkg/I2C/monitor.go b/internal/pkg/I2C/monitor.go index 07bb585..7aacd32 100644 --- a/internal/pkg/I2C/monitor.go +++ b/internal/pkg/I2C/monitor.go @@ -1,97 +1,102 @@ package I2C import ( - "time" - _ "fmt" - "sync" + _ "fmt" + "sync" + "time" ) /* - i2c monitor implements a long running monitor responsible for sending active devices to the rlc + i2c monitor implements a long running monitor responsible for sending active devices to the rlc */ type I2CMonitor struct { - *I2CBus - Devices *devs - DevChan chan int + *I2CBus + Devices *devs + DevChan chan int } type devs struct { - sync.Mutex - m map[int]*I2CDevice + sync.Mutex + m map[int]*I2CDevice } -func NewMonitor(bus int,ch chan int) *I2CMonitor { - m := &I2CMonitor{} - b := NewBus(bus) - m.I2CBus = b - d := make(map[int]*I2CDevice) - m.Devices = &devs{m:d} - m.DevChan = ch - return m +func NewMonitor(bus int, ch chan int) *I2CMonitor { + m := &I2CMonitor{} + b := NewBus(bus) + m.I2CBus = b + d := make(map[int]*I2CDevice) + m.Devices = &devs{m: d} + m.DevChan = ch + return m } func (m *I2CMonitor) Update() { - /* - scans bus and adds new active devices - */ - devs := m.Scan() - chng := m.Devices.Parse(m.I2CBus,devs) - for _, d := range chng { - go m.ConnectDevice(d) - } + /* + scans bus and adds new active devices + */ + devs := m.Scan() + chng := m.Devices.Parse(m.I2CBus, devs) + for _, d := range chng { + go m.ConnectDevice(d) + } } func (m *I2CMonitor) Monitor() { - // functon that updates the device list and notifies rlc of any changes to sensor composition - s := make(chan struct{}) - t := 5 * time.Second - go func(signal chan struct{},to time.Duration) { // simple signal func to init scan - for { - signal <-struct{}{} - time.Sleep(to) - } - }(s,t) + // functon that updates the device list and notifies rlc of any changes to sensor composition + s := make(chan struct{}) + t := 5 * time.Second + go func(signal chan struct{}, to time.Duration) { // simple signal func to init scan + for { + signal <- struct{}{} + time.Sleep(to) + } + }(s, t) - for { - <-s - m.Update() - } + for { + <-s + m.Update() + } } func (m *I2CMonitor) ConnectDevice(addr int) { - m.DevChan <-addr + m.DevChan <- addr } -func (m *I2CMonitor) GetDevice(addr int) interface{ GetAddr() int; GetData() string; GetStatus() string; GetType() string } { - m.Devices.Lock() - defer m.Devices.Unlock() - return m.Devices.m[addr] +func (m *I2CMonitor) GetDevice(addr int) interface { + GetAddr() int + GetData() string + GetStatus() bool + GetType() string +} { + m.Devices.Lock() + defer m.Devices.Unlock() + return m.Devices.m[addr] } -func (d *devs) Parse(bus *I2CBus,devices map[int]bool) []int { - d.Lock() - defer d.Unlock() - newdevs := []int{} - for addr, status := range devices { - if dev, exists := d.m[addr]; exists { - // device seen - if status != dev.bool { // if device state changed - dev.bool = status - if status { - newdevs = append(newdevs,dev.GetAddr()) - } - } - } else { - // device not seen yet - if status { - // active - newd := NewDevice(addr,bus) - newd.bool = status - d.m[addr] = newd - newdevs = append(newdevs,newd.GetAddr()) - } - } - } - return newdevs +func (d *devs) Parse(bus *I2CBus, devices map[int]bool) []int { + d.Lock() + defer d.Unlock() + newdevs := []int{} + for addr, status := range devices { + if dev, exists := d.m[addr]; exists { + // device seen + if status != dev.bool { // if device state changed + dev.bool = status + if status { + newdevs = append(newdevs, dev.GetAddr()) + } + } + } else { + // device not seen yet + if status { + // active + newd := NewDevice(addr, bus) + newd.bool = status + d.m[addr] = newd + newdevs = append(newdevs, newd.GetAddr()) + } + } + } + return newdevs } diff --git a/internal/pkg/reactor/monitoring.go b/internal/pkg/reactor/monitoring.go index f40583d..b6cca24 100644 --- a/internal/pkg/reactor/monitoring.go +++ b/internal/pkg/reactor/monitoring.go @@ -3,7 +3,6 @@ package reactor import ( "context" "fmt" - "sync" //"FRMS/internal/pkg/logging" //"google.golang.org/grpc" @@ -31,57 +30,62 @@ func (c *Coordinator) DevStatus(ch chan *DeviceStatus, a int, dm DeviceManager) func (c *Coordinator) GetStatus() []*pb.Device { // db stuff //api := client.WriteAPIBlocking(c.Org, c.Bucket) - var wg sync.WaitGroup + //var wg sync.WaitGroup devs := []*pb.Device{} - statusChan := make(chan *DeviceStatus) - c.Devices.Lock() - for a, dm := range c.Devices.Managers { - wg.Add(1) - go c.DevStatus(statusChan, a, dm) - } - c.Devices.Unlock() - allDone := make(chan struct{}) - go func() { - wg.Wait() - allDone <- struct{}{} - }() // once all the status are sent we send all done on the chan - for { - select { - case s := <-statusChan: - fmt.Printf("%v is %v\n", s.Type, s.Status) - /* - data := strings.Split(s.Data,",") // T:10C,H:102% -> T:10C H:10% - for _, m := range data { - var meas string - splt := strings.Split(m,":") // T 10C or H 10% - if splt[0] == "T" { - meas = "Temperature" - } else if splt[0] == "H" { - meas = "Humidity" - } - val, err := strconv.ParseFloat(strings.Trim(splt[1]," %C\n"), 64) - if err != nil { - panic(err) - } - p := influxdb2.NewPoint("measurements",map[string]string{"type":meas},map[string]interface{}{"val":val},time.Now()) - if err := api.WritePoint(context.Background(), p); err != nil { - panic(err) - } - } - devs = append(devs,&pb.Device{Addr:int32(s.Addr),Type:s.Type,Status:s.Status,Data:s.Data}) - */ - wg.Done() - case <-allDone: - return devs + return devs + /* + statusChan := make(chan *DeviceStatus) + c.Devices.Lock() + for a, dm := range c.Devices.Managers { + wg.Add(1) + go c.DevStatus(statusChan, a, dm) } - } + c.Devices.Unlock() + allDone := make(chan struct{}) + go func() { + wg.Wait() + allDone <- struct{}{} + }() // once all the status are sent we send all done on the chan + for { + select { + case s := <-statusChan: + fmt.Printf("%v is %v\n", s.Type, s.Status) + /* + data := strings.Split(s.Data,",") // T:10C,H:102% -> T:10C H:10% + for _, m := range data { + var meas string + splt := strings.Split(m,":") // T 10C or H 10% + if splt[0] == "T" { + meas = "Temperature" + } else if splt[0] == "H" { + meas = "Humidity" + } + val, err := strconv.ParseFloat(strings.Trim(splt[1]," %C\n"), 64) + if err != nil { + panic(err) + } + p := influxdb2.NewPoint("measurements",map[string]string{"type":meas},map[string]interface{}{"val":val},time.Now()) + if err := api.WritePoint(context.Background(), p); err != nil { + panic(err) + } + } + devs = append(devs,&pb.Device{Addr:int32(s.Addr),Type:s.Type,Status:s.Status,Data:s.Data}) + */ + /* + wg.Done() + case <-allDone: + return devs + } + } + */ } // grpc status update handler func (c *Coordinator) Ping() { // sends all device status to central coordinator + fmt.Printf("Pinging cc\n") devs := c.GetStatus() - req := &pb.ReactorStatusPing{Id: uint32(c.ID), Devices: devs} + req := &pb.ReactorStatusPing{Id: int32(c.ID), Devices: devs} _, err := c.MonitoringClient.ReactorStatusHandler(context.Background(), req) if err != nil { c.Err <- err diff --git a/internal/pkg/reactor/rlcoordinator.go b/internal/pkg/reactor/rlcoordinator.go index e42aae9..1816b42 100644 --- a/internal/pkg/reactor/rlcoordinator.go +++ b/internal/pkg/reactor/rlcoordinator.go @@ -7,7 +7,6 @@ import ( pb "FRMS/internal/pkg/grpc" "FRMS/internal/pkg/influxdb" "FRMS/internal/pkg/logging" - "FRMS/internal/pkg/sensor" "FRMS/internal/pkg/system" "context" "errors" @@ -51,7 +50,9 @@ type Coordinator struct { Config *viper.Viper MonitoringClient pb.MonitoringClient // connected devices - Devices *DeviceManagers // struct for locking + *Devices // struct for locking + *Sensors + // other stuff and things Err chan error mu sync.Mutex HB time.Duration @@ -67,7 +68,19 @@ type active struct { sync.Mutex } -type DeviceManagers struct { +type Sensors struct { + Managers map[int]SensorManager + sync.Mutex +} + +type SensorManager interface { + Start() + GetType() string + GetStatus() string + GetData() string +} + +type Devices struct { Managers map[int]DeviceManager sync.Mutex } @@ -84,19 +97,19 @@ type DeviceManager interface { type I2CDev interface { GetAddr() int GetData() string - GetStatus() string + GetStatus() bool GetType() string } -func NewDeviceManager(i2c I2CDev) DeviceManager { - return sensor.NewDeviceManager(i2c) -} +//func NewDeviceManager(i2c I2CDev) DeviceManager { +//return sensor.NewDeviceManager(i2c) +//} type I2CMonitor interface { Monitor() GetDevice(int) interface { GetAddr() int - GetStatus() string + GetStatus() bool GetData() string GetType() string } @@ -107,9 +120,13 @@ func NewI2CMonitor(b int, ch chan int) I2CMonitor { } func NewCoordinator(config *viper.Viper, ch chan error) *Coordinator { - sen := new(DeviceManagers) - sen.Managers = make(map[int]DeviceManager) - c := &Coordinator{Err: ch, Devices: sen, Config: config} + // sensor/device manager struct + dm := new(Devices) + dm.Managers = make(map[int]DeviceManager) + sm := new(Sensors) + sm.Managers = make(map[int]SensorManager) + + c := &Coordinator{Err: ch, Devices: dm, Sensors: sm, Config: config} c.HB = time.Duration(5 * time.Second) // this is going to be scuffed @@ -204,13 +221,14 @@ func (c *Coordinator) DeviceConnect(i2c I2CDev) { c.Devices.Lock() defer c.Devices.Unlock() addr := i2c.GetAddr() - if dm, exists := c.Devices.Managers[addr]; !exists { - dm := NewDeviceManager(i2c) - c.Devices.Managers[addr] = dm - go dm.Start() - } else { - go dm.Start() - } + fmt.Printf("Device %d (%x) found!\n", addr, addr) + //if dm, exists := c.Devices.Managers[addr]; !exists { + //dm := NewDeviceManager(i2c) + //c.Devices.Managers[addr] = dm + //go dm.Start() + //} else { + //go dm.Start() + //} } func (c *Coordinator) Discover() { diff --git a/internal/pkg/sensor/do_sensor.go b/internal/pkg/sensor/do_sensor.go new file mode 100644 index 0000000..b937220 --- /dev/null +++ b/internal/pkg/sensor/do_sensor.go @@ -0,0 +1,45 @@ +package sensor + +// do sensor and methods + +import ( + pb "FRMS/internal/pkg/grpc" + "errors" + "sync" + + "github.com/spf13/viper" +) + +type DOSensorManager struct { + // do sensor manager + Sensor *pb.Sensor // for sending/updating + sync.RWMutex +} + +func (s *DOSensorManager) SetName(name string) error { + // #TODO + return errors.New("UNIMPL") +} + +func (s *DOSensorManager) SetStatus(status int) error { + // #TODO + return errors.New("UNIMPL") +} + +func (s *DOSensorManager) SetSampleRate(rate int) error { + // #TODO + return errors.New("UNIMPL") +} + +func (s *DOSensorManager) Update(sensor *pb.Sensor, config *viper.Viper) error { + // updates info + s.Lock() + defer s.Unlock() + s.Sensor = sensor + return errors.New("UNIMPL") +} + +func (s *DOSensorManager) String() string { + // #TODO + return "TODO" +} diff --git a/internal/pkg/sensor/manager.go b/internal/pkg/sensor/manager.go index 12e5270..58de1da 100644 --- a/internal/pkg/sensor/manager.go +++ b/internal/pkg/sensor/manager.go @@ -3,12 +3,10 @@ package sensor import ( _ "FRMS/internal/pkg/I2C" _ "fmt" - "log" - "strings" - "sync" - "time" ) +/* +// I think most of this is unnessecary as hell type Manager struct { *Dev I2CDevice @@ -26,14 +24,13 @@ type Dev struct { // last known values Addr int Type string - Status string // could be more efficient but to hell with it + Status int // could be more efficient but to hell with it Data string } type I2CDevice interface { // basic device info - GetAddr() int - GetStatus() string + GetStatus() bool GetType() string GetData() string } @@ -112,3 +109,4 @@ func (a *Active) IsActive() bool { defer a.Unlock() return a.bool } +*/ diff --git a/internal/pkg/sensor/mappings.go b/internal/pkg/sensor/mappings.go index 79ff4c9..cfca596 100644 --- a/internal/pkg/sensor/mappings.go +++ b/internal/pkg/sensor/mappings.go @@ -1,36 +1,66 @@ package sensor import ( + pb "FRMS/internal/pkg/grpc" + "errors" "fmt" - "sync" + + "github.com/spf13/viper" ) /* - this file serves as a map that the sensor library can use to determine which manager to call +Returns the correct manager for sensor/device */ - -type NewManager interface { - // serves as interface to restrict managers can be relocated +type SensorManager interface { + // basic sensor stuff + SetName(string) error // change display name + SetStatus(int) error // update status + SetSampleRate(int) error // update sample rate + Update(*pb.Sensor, *viper.Viper) error // write updates + String() string // printable } -type DM struct { - DeviceManagers map[uint]NewManager - sync.Mutex +type DeviceManager interface { + // basic device stuff + SetName(string) error // change displayed name + SetStatus(int) error // status change + SetParameter(string, string) error // key, val + Update(*pb.Device, *viper.Viper) error // write updates + String() string // printable } -func NewManagerDirectory() *DM { - m := map[uint]NewManager{ - // map to set functions up - //112: NewDOManager(), +func NewSensorManager(sensor *pb.Sensor) (SensorManager, error) { + // returns correct sensor manager by ID + var sm SensorManager + var err error + + switch id := sensor.GetAddr(); id { + case 97: + // DO + sm = &DOSensorManager{Sensor: sensor} + case 99: + // pH + sm = &PHSensorManager{Sensor: sensor} + case 102: + // RTD + sm = &RTDSensorManager{Sensor: sensor} + default: + err = errors.New(fmt.Sprintf("Error: sensor id %v unrecognized!", id)) } - return &DM{DeviceManagers: m} + return sm, err } -func (d *DM) GetManager(addr uint) (NewManager, error) { - d.Lock() - defer d.Unlock() - if m, ok := d.DeviceManagers[addr]; ok { - return m, nil +func NewDeviceManager(device *pb.Device) (DeviceManager, error) { + // returns correct device manager by ID + var dm DeviceManager + var err error + + switch id := device.GetAddr(); id { + case 256: + // PWM + dm = &PWMDeviceManager{Device: device} + default: + err = errors.New(fmt.Sprintf("Error: device id %v unrecognized!", id)) } - return &DM{}, fmt.Errorf("No manager found for address %d!", addr) + return dm, err } diff --git a/internal/pkg/sensor/ph_sensor.go b/internal/pkg/sensor/ph_sensor.go new file mode 100644 index 0000000..f5bb1aa --- /dev/null +++ b/internal/pkg/sensor/ph_sensor.go @@ -0,0 +1,45 @@ +package sensor + +// do sensor and methods + +import ( + pb "FRMS/internal/pkg/grpc" + "errors" + "sync" + + "github.com/spf13/viper" +) + +type PHSensorManager struct { + // do sensor manager + Sensor *pb.Sensor // for sending/updating + sync.RWMutex +} + +func (s *PHSensorManager) SetName(name string) error { + // #TODO + return errors.New("UNIMPL") +} + +func (s *PHSensorManager) SetStatus(status int) error { + // #TODO + return errors.New("UNIMPL") +} + +func (s *PHSensorManager) SetSampleRate(rate int) error { + // #TODO + return errors.New("UNIMPL") +} + +func (s *PHSensorManager) Update(sensor *pb.Sensor, config *viper.Viper) error { + // updates info + s.Lock() + defer s.Unlock() + s.Sensor = sensor + return errors.New("UNIMPL") +} + +func (s *PHSensorManager) String() string { + // #TODO + return "TODO" +} diff --git a/internal/pkg/sensor/pwm_device.go b/internal/pkg/sensor/pwm_device.go new file mode 100644 index 0000000..619faac --- /dev/null +++ b/internal/pkg/sensor/pwm_device.go @@ -0,0 +1,45 @@ +package sensor + +// do sensor and methods + +import ( + pb "FRMS/internal/pkg/grpc" + "errors" + "sync" + + "github.com/spf13/viper" +) + +type PWMDeviceManager struct { + // do sensor manager + Device *pb.Device // for sending/updating + sync.RWMutex +} + +func (s *PWMDeviceManager) SetName(name string) error { + // #TODO + return errors.New("UNIMPL") +} + +func (s *PWMDeviceManager) SetStatus(status int) error { + // #TODO + return errors.New("UNIMPL") +} + +func (s *PWMDeviceManager) SetParameter(key, val string) error { + // #TODO + return errors.New("UNIMPL") +} + +func (s *PWMDeviceManager) Update(device *pb.Device, config *viper.Viper) error { + // updates info + s.Lock() + defer s.Unlock() + s.Device = device + return errors.New("UNIMPL") +} + +func (s *PWMDeviceManager) String() string { + // #TODO + return "TODO" +} diff --git a/internal/pkg/sensor/rtd_sensor.go b/internal/pkg/sensor/rtd_sensor.go new file mode 100644 index 0000000..2bb40c3 --- /dev/null +++ b/internal/pkg/sensor/rtd_sensor.go @@ -0,0 +1,45 @@ +package sensor + +// do sensor and methods + +import ( + pb "FRMS/internal/pkg/grpc" + "errors" + "sync" + + "github.com/spf13/viper" +) + +type RTDSensorManager struct { + // do sensor manager + Sensor *pb.Sensor // for sending/updating + sync.RWMutex +} + +func (s *RTDSensorManager) SetName(name string) error { + // #TODO + return errors.New("UNIMPL") +} + +func (s *RTDSensorManager) SetStatus(status int) error { + // #TODO + return errors.New("UNIMPL") +} + +func (s *RTDSensorManager) SetSampleRate(rate int) error { + // #TODO + return errors.New("UNIMPL") +} + +func (s *RTDSensorManager) Update(sensor *pb.Sensor, config *viper.Viper) error { + // updates info + s.Lock() + defer s.Unlock() + s.Sensor = sensor + return errors.New("UNIMPL") +} + +func (s *RTDSensorManager) String() string { + // #TODO + return "TODO" +} diff --git a/internal/pkg/sensor/sensor.go b/internal/pkg/sensor/sensor.go deleted file mode 100644 index fe00d5c..0000000 --- a/internal/pkg/sensor/sensor.go +++ /dev/null @@ -1,6 +0,0 @@ -package sensor - -import ( - _ "fmt" -) - diff --git a/internal/pkg/server/reactormanager.go b/internal/pkg/server/reactormanager.go index f96c55e..a469501 100644 --- a/internal/pkg/server/reactormanager.go +++ b/internal/pkg/server/reactormanager.go @@ -3,10 +3,13 @@ package server import ( pb "FRMS/internal/pkg/grpc" "FRMS/internal/pkg/logging" + "FRMS/internal/pkg/sensor" "context" "fmt" _ "log" "sync" + + "github.com/spf13/viper" ) // this package will implement a reactor manager and associated go routines @@ -14,21 +17,33 @@ import ( type ReactorManager struct { *Manager // StatusMon *StatusMonitor putting on pause - *devstatus + *ReactorSensors + *ReactorDevices + Config *viper.Viper // config to update +} + +type ReactorSensors struct { + // sensor struct + Sensors map[int]SensorManager + sync.RWMutex } -type devstatus struct { - // keeping this around but not using it to create status for status mon - sync.Mutex - Devs map[int]*DeviceInfo +type ReactorDevices struct { + // device struct + Devices map[int]DeviceManager + sync.RWMutex } func NewReactorManager(err chan error) *ReactorManager { r := &ReactorManager{} - di := make(map[int]*DeviceInfo) - r.devstatus = &devstatus{Devs: di} + // sub managers + dm := make(map[int]DeviceManager) + sm := make(map[int]SensorManager) + r.ReactorDevices = &ReactorDevices{Devices: dm} + r.ReactorSensors = &ReactorSensors{Sensors: sm} + // core manager r.Manager = NewManager(err) - //r.StatusMon = NewStatusMonitor("Reactor", c.Id, sys) + return r } @@ -41,38 +56,94 @@ func (r *ReactorManager) Exit() { r.Manager.Exit() logging.Debug(logging.DExit, "RMA %v exiting", r.Id) //go r.StatusMon.Send(&DeviceInfo{Id: r.Id, Type: "Reactor", Status: "[red]OFFLINE[white]", Data: fmt.Sprintf("Last Seen %v", time.Now().Format("Mon at 03:04:05pm MST"))}, "Reactor") - r.devstatus.Lock() - defer r.devstatus.Unlock() - // keeping this because it **COULD** be useful, maybe - for _, d := range r.Devs { - newd := d - newd.Status = "UNKOWN" - r.Devs[newd.Id] = newd - //go r.StatusMon.Send(newd, "Device") - } } func (r *ReactorManager) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { // function client will call to update reactor information //go r.PingReset() fmt.Printf("Recieved ping from %v!\n", req.GetId()) - for _, dev := range req.GetDevices() { - d := &DeviceInfo{Id: int(dev.GetAddr()), Type: dev.GetName(), Status: dev.GetStatus().String(), Data: dev.GetData()} - go r.UpdateDevice(d) - } + // update devices/sensors + go r.UpdateSensors(req.GetSensors()) + go r.UpdateDevices(req.GetDevices()) + return &pb.ReactorStatusResponse{Id: int32(r.Id)}, nil } -func (r *ReactorManager) UpdateDevice(d *DeviceInfo) { - r.devstatus.Lock() - defer r.devstatus.Unlock() - if olddev, ok := r.Devs[d.Id]; !ok { - // new device - r.Devs[d.Id] = d - //go r.StatusMon.Send(d, "Device") - } else if olddev.Status != d.Status || olddev.Data != d.Data { - // dev status or data has changed - r.Devs[d.Id] = d - //go r.StatusMon.Send(d, "Device") +// sensor/device stuff + +type SensorManager interface { + SetName(string) error // change name + SetSampleRate(int) error // change sample rate + Update(*pb.Sensor, *viper.Viper) error // write updates + String() string // printing +} + +type DeviceManager interface { + SetName(string) error // change name + SetParameter(string, string) error // key, val + Update(*pb.Device, *viper.Viper) error // write updates + String() string // printing +} + +func NewSensorManager(sens *pb.Sensor) (SensorManager, error) { + // returns a manager struct + return sensor.NewSensorManager(sens) +} + +func NewDeviceManager(device *pb.Device) (DeviceManager, error) { + // returns a manager struct + return sensor.NewDeviceManager(device) +} + +func (r *ReactorManager) UpdateDevices(devs []*pb.Device) { + // pass updates to correct manager + r.ReactorDevices.RLock() // read lock only + defer r.ReactorDevices.RUnlock() + + for _, dev := range devs { + // looping over devs + if dm, ok := r.ReactorDevices.Devices[int(dev.GetAddr())]; ok { + // device manager found + go dm.Update(dev, r.Config) // update dm + } else { + // not found + go r.AddDevice(dev, r.Err) + } + } +} + +func (r *ReactorDevices) AddDevice(dev *pb.Device, errCh chan error) { + r.Lock() // write lock + defer r.Unlock() + dm, err := NewDeviceManager(dev) + if err != nil { + errCh <- err + } + r.Devices[int(dev.GetAddr())] = dm +} + +func (r *ReactorManager) UpdateSensors(sensors []*pb.Sensor) { + // pass updates to correct manager + r.ReactorSensors.RLock() // read lock + defer r.ReactorSensors.RUnlock() + for _, sens := range sensors { + // looping over sensors + if sm, ok := r.ReactorSensors.Sensors[int(sens.GetAddr())]; ok { + // sensor manager found + go sm.Update(sens, r.Config) // update sm + } else { + // not found + go r.AddSensor(sens, r.Err) + } + } +} + +func (r *ReactorSensors) AddSensor(sensor *pb.Sensor, errCh chan error) { + r.Lock() // write lock + defer r.Unlock() + sm, err := NewSensorManager(sensor) + if err != nil { + errCh <- err } + r.Sensors[int(sensor.GetAddr())] = sm } diff --git a/internal/pkg/server/system.go b/internal/pkg/server/system.go index e30ade3..11d1c3f 100644 --- a/internal/pkg/server/system.go +++ b/internal/pkg/server/system.go @@ -2,18 +2,9 @@ package server import ( _ "fmt" + // sensor components ) -// allows for multiple readers/writers -type DeviceInfo struct { - Id int - Type string - Status string - Data string - Index int - TransactionId uint32 -} - /* type StatusMonitor struct {