expanded the sensor/device manager stuff to prep for fully fledged managers

main
KeeganForelight 2 years ago
parent 63811505eb
commit e64efad3fc

@ -1,9 +1,10 @@
devices:
address: 112
name: DO Sensor
reactor: reactor:
name: "Dummy Reactor" id: 2166136261
model: ""
name: Dummy Reactor
server: server:
ip: "192.168.100.2" ip: 192.168.100.2
port: 2022 port: 2022
devices:
address: 112 # decimal
name: "DO Sensor"

@ -1,79 +1,72 @@
package I2C package I2C
import ( import (
"fmt" "fmt"
"sync" "sync"
"time" "time"
) )
type I2CDevice struct { type I2CDevice struct {
*I2CBus // embeds bus *I2CBus // embeds bus
bool // stores whether dev is currently connected bool // stores whether dev is currently connected
int // addr int // addr
Data *data Data *data
} }
type data struct { type data struct {
string string
bool bool
sync.Mutex sync.Mutex
} }
func (d I2CDevice) String() string { func (d I2CDevice) String() string {
t := map[int]string{97:"DO Sensor",99:"pH Sensor",102:"Temperature Sensor",64:"DHT11 Sensor"} t := map[int]string{97: "DO Sensor", 99: "pH Sensor", 102: "Temperature Sensor", 64: "DHT11 Sensor"}
return t[d.int] return t[d.int]
} }
func NewDevice(addr int,bus *I2CBus) *I2CDevice { func NewDevice(addr int, bus *I2CBus) *I2CDevice {
d := &I2CDevice{} d := &I2CDevice{}
d.I2CBus = bus d.I2CBus = bus
d.int = addr d.int = addr
d.Data = &data{} d.Data = &data{}
return d return d
} }
func (d *I2CDevice) GetAddr() int { func (d *I2CDevice) GetAddr() int {
return d.int return d.int
} }
func (d *I2CDevice) GetStatus() string { func (d *I2CDevice) GetStatus() bool {
// TODO // TODO
s := d.I2CBus.GetStatus(d.int) return d.I2CBus.GetStatus(d.int)
if s {
d.Data.Active()
return "[green]ACTIVE[white]"
} else {
d.Data.Killed()
return "[red]KILLED[white]"
}
} }
func (d *I2CDevice) GetType() string { func (d *I2CDevice) GetType() string {
// TODO // TODO
return fmt.Sprint(d) return fmt.Sprint(d)
} }
func (d *I2CDevice) GetData() string { func (d *I2CDevice) GetData() string {
d.Data.Lock() d.Data.Lock()
defer d.Data.Unlock() defer d.Data.Unlock()
d.Data.string = d.I2CBus.GetData(d.int) d.Data.string = d.I2CBus.GetData(d.int)
return d.Data.string return d.Data.string
} }
func (d *data) Active() { func (d *data) Active() {
d.Lock() d.Lock()
defer d.Unlock() defer d.Unlock()
if !d.bool { if !d.bool {
d.string = "" d.string = ""
d.bool = true d.bool = true
} }
} }
func (d *data) Killed() { func (d *data) Killed() {
d.Lock() d.Lock()
defer d.Unlock() defer d.Unlock()
if d.bool { if d.bool {
d.string = time.Now().Format("Mon at 03:04:05pm MST") d.string = time.Now().Format("Mon at 03:04:05pm MST")
d.bool = false d.bool = false
} }
} }

@ -1,97 +1,102 @@
package I2C package I2C
import ( import (
"time" _ "fmt"
_ "fmt" "sync"
"sync" "time"
) )
/* /*
i2c monitor implements a long running monitor responsible for sending active devices to the rlc i2c monitor implements a long running monitor responsible for sending active devices to the rlc
*/ */
type I2CMonitor struct { type I2CMonitor struct {
*I2CBus *I2CBus
Devices *devs Devices *devs
DevChan chan int DevChan chan int
} }
type devs struct { type devs struct {
sync.Mutex sync.Mutex
m map[int]*I2CDevice m map[int]*I2CDevice
} }
func NewMonitor(bus int,ch chan int) *I2CMonitor { func NewMonitor(bus int, ch chan int) *I2CMonitor {
m := &I2CMonitor{} m := &I2CMonitor{}
b := NewBus(bus) b := NewBus(bus)
m.I2CBus = b m.I2CBus = b
d := make(map[int]*I2CDevice) d := make(map[int]*I2CDevice)
m.Devices = &devs{m:d} m.Devices = &devs{m: d}
m.DevChan = ch m.DevChan = ch
return m return m
} }
func (m *I2CMonitor) Update() { func (m *I2CMonitor) Update() {
/* /*
scans bus and adds new active devices scans bus and adds new active devices
*/ */
devs := m.Scan() devs := m.Scan()
chng := m.Devices.Parse(m.I2CBus,devs) chng := m.Devices.Parse(m.I2CBus, devs)
for _, d := range chng { for _, d := range chng {
go m.ConnectDevice(d) go m.ConnectDevice(d)
} }
} }
func (m *I2CMonitor) Monitor() { func (m *I2CMonitor) Monitor() {
// functon that updates the device list and notifies rlc of any changes to sensor composition // functon that updates the device list and notifies rlc of any changes to sensor composition
s := make(chan struct{}) s := make(chan struct{})
t := 5 * time.Second t := 5 * time.Second
go func(signal chan struct{},to time.Duration) { // simple signal func to init scan go func(signal chan struct{}, to time.Duration) { // simple signal func to init scan
for { for {
signal <-struct{}{} signal <- struct{}{}
time.Sleep(to) time.Sleep(to)
} }
}(s,t) }(s, t)
for { for {
<-s <-s
m.Update() m.Update()
} }
} }
func (m *I2CMonitor) ConnectDevice(addr int) { func (m *I2CMonitor) ConnectDevice(addr int) {
m.DevChan <-addr m.DevChan <- addr
} }
func (m *I2CMonitor) GetDevice(addr int) interface{ GetAddr() int; GetData() string; GetStatus() string; GetType() string } { func (m *I2CMonitor) GetDevice(addr int) interface {
m.Devices.Lock() GetAddr() int
defer m.Devices.Unlock() GetData() string
return m.Devices.m[addr] GetStatus() bool
GetType() string
} {
m.Devices.Lock()
defer m.Devices.Unlock()
return m.Devices.m[addr]
} }
func (d *devs) Parse(bus *I2CBus,devices map[int]bool) []int { func (d *devs) Parse(bus *I2CBus, devices map[int]bool) []int {
d.Lock() d.Lock()
defer d.Unlock() defer d.Unlock()
newdevs := []int{} newdevs := []int{}
for addr, status := range devices { for addr, status := range devices {
if dev, exists := d.m[addr]; exists { if dev, exists := d.m[addr]; exists {
// device seen // device seen
if status != dev.bool { // if device state changed if status != dev.bool { // if device state changed
dev.bool = status dev.bool = status
if status { if status {
newdevs = append(newdevs,dev.GetAddr()) newdevs = append(newdevs, dev.GetAddr())
} }
} }
} else { } else {
// device not seen yet // device not seen yet
if status { if status {
// active // active
newd := NewDevice(addr,bus) newd := NewDevice(addr, bus)
newd.bool = status newd.bool = status
d.m[addr] = newd d.m[addr] = newd
newdevs = append(newdevs,newd.GetAddr()) newdevs = append(newdevs, newd.GetAddr())
} }
} }
} }
return newdevs return newdevs
} }

@ -3,7 +3,6 @@ package reactor
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
//"FRMS/internal/pkg/logging" //"FRMS/internal/pkg/logging"
//"google.golang.org/grpc" //"google.golang.org/grpc"
@ -31,57 +30,62 @@ func (c *Coordinator) DevStatus(ch chan *DeviceStatus, a int, dm DeviceManager)
func (c *Coordinator) GetStatus() []*pb.Device { func (c *Coordinator) GetStatus() []*pb.Device {
// db stuff // db stuff
//api := client.WriteAPIBlocking(c.Org, c.Bucket) //api := client.WriteAPIBlocking(c.Org, c.Bucket)
var wg sync.WaitGroup //var wg sync.WaitGroup
devs := []*pb.Device{} devs := []*pb.Device{}
statusChan := make(chan *DeviceStatus) return devs
c.Devices.Lock() /*
for a, dm := range c.Devices.Managers { statusChan := make(chan *DeviceStatus)
wg.Add(1) c.Devices.Lock()
go c.DevStatus(statusChan, a, dm) for a, dm := range c.Devices.Managers {
} wg.Add(1)
c.Devices.Unlock() go c.DevStatus(statusChan, a, dm)
allDone := make(chan struct{})
go func() {
wg.Wait()
allDone <- struct{}{}
}() // once all the status are sent we send all done on the chan
for {
select {
case s := <-statusChan:
fmt.Printf("%v is %v\n", s.Type, s.Status)
/*
data := strings.Split(s.Data,",") // T:10C,H:102% -> T:10C H:10%
for _, m := range data {
var meas string
splt := strings.Split(m,":") // T 10C or H 10%
if splt[0] == "T" {
meas = "Temperature"
} else if splt[0] == "H" {
meas = "Humidity"
}
val, err := strconv.ParseFloat(strings.Trim(splt[1]," %C\n"), 64)
if err != nil {
panic(err)
}
p := influxdb2.NewPoint("measurements",map[string]string{"type":meas},map[string]interface{}{"val":val},time.Now())
if err := api.WritePoint(context.Background(), p); err != nil {
panic(err)
}
}
devs = append(devs,&pb.Device{Addr:int32(s.Addr),Type:s.Type,Status:s.Status,Data:s.Data})
*/
wg.Done()
case <-allDone:
return devs
} }
} c.Devices.Unlock()
allDone := make(chan struct{})
go func() {
wg.Wait()
allDone <- struct{}{}
}() // once all the status are sent we send all done on the chan
for {
select {
case s := <-statusChan:
fmt.Printf("%v is %v\n", s.Type, s.Status)
/*
data := strings.Split(s.Data,",") // T:10C,H:102% -> T:10C H:10%
for _, m := range data {
var meas string
splt := strings.Split(m,":") // T 10C or H 10%
if splt[0] == "T" {
meas = "Temperature"
} else if splt[0] == "H" {
meas = "Humidity"
}
val, err := strconv.ParseFloat(strings.Trim(splt[1]," %C\n"), 64)
if err != nil {
panic(err)
}
p := influxdb2.NewPoint("measurements",map[string]string{"type":meas},map[string]interface{}{"val":val},time.Now())
if err := api.WritePoint(context.Background(), p); err != nil {
panic(err)
}
}
devs = append(devs,&pb.Device{Addr:int32(s.Addr),Type:s.Type,Status:s.Status,Data:s.Data})
*/
/*
wg.Done()
case <-allDone:
return devs
}
}
*/
} }
// grpc status update handler // grpc status update handler
func (c *Coordinator) Ping() { func (c *Coordinator) Ping() {
// sends all device status to central coordinator // sends all device status to central coordinator
fmt.Printf("Pinging cc\n")
devs := c.GetStatus() devs := c.GetStatus()
req := &pb.ReactorStatusPing{Id: uint32(c.ID), Devices: devs} req := &pb.ReactorStatusPing{Id: int32(c.ID), Devices: devs}
_, err := c.MonitoringClient.ReactorStatusHandler(context.Background(), req) _, err := c.MonitoringClient.ReactorStatusHandler(context.Background(), req)
if err != nil { if err != nil {
c.Err <- err c.Err <- err

@ -7,7 +7,6 @@ import (
pb "FRMS/internal/pkg/grpc" pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/influxdb" "FRMS/internal/pkg/influxdb"
"FRMS/internal/pkg/logging" "FRMS/internal/pkg/logging"
"FRMS/internal/pkg/sensor"
"FRMS/internal/pkg/system" "FRMS/internal/pkg/system"
"context" "context"
"errors" "errors"
@ -51,7 +50,9 @@ type Coordinator struct {
Config *viper.Viper Config *viper.Viper
MonitoringClient pb.MonitoringClient MonitoringClient pb.MonitoringClient
// connected devices // connected devices
Devices *DeviceManagers // struct for locking *Devices // struct for locking
*Sensors
// other stuff and things
Err chan error Err chan error
mu sync.Mutex mu sync.Mutex
HB time.Duration HB time.Duration
@ -67,7 +68,19 @@ type active struct {
sync.Mutex sync.Mutex
} }
type DeviceManagers struct { type Sensors struct {
Managers map[int]SensorManager
sync.Mutex
}
type SensorManager interface {
Start()
GetType() string
GetStatus() string
GetData() string
}
type Devices struct {
Managers map[int]DeviceManager Managers map[int]DeviceManager
sync.Mutex sync.Mutex
} }
@ -84,19 +97,19 @@ type DeviceManager interface {
type I2CDev interface { type I2CDev interface {
GetAddr() int GetAddr() int
GetData() string GetData() string
GetStatus() string GetStatus() bool
GetType() string GetType() string
} }
func NewDeviceManager(i2c I2CDev) DeviceManager { //func NewDeviceManager(i2c I2CDev) DeviceManager {
return sensor.NewDeviceManager(i2c) //return sensor.NewDeviceManager(i2c)
} //}
type I2CMonitor interface { type I2CMonitor interface {
Monitor() Monitor()
GetDevice(int) interface { GetDevice(int) interface {
GetAddr() int GetAddr() int
GetStatus() string GetStatus() bool
GetData() string GetData() string
GetType() string GetType() string
} }
@ -107,9 +120,13 @@ func NewI2CMonitor(b int, ch chan int) I2CMonitor {
} }
func NewCoordinator(config *viper.Viper, ch chan error) *Coordinator { func NewCoordinator(config *viper.Viper, ch chan error) *Coordinator {
sen := new(DeviceManagers) // sensor/device manager struct
sen.Managers = make(map[int]DeviceManager) dm := new(Devices)
c := &Coordinator{Err: ch, Devices: sen, Config: config} dm.Managers = make(map[int]DeviceManager)
sm := new(Sensors)
sm.Managers = make(map[int]SensorManager)
c := &Coordinator{Err: ch, Devices: dm, Sensors: sm, Config: config}
c.HB = time.Duration(5 * time.Second) c.HB = time.Duration(5 * time.Second)
// this is going to be scuffed // this is going to be scuffed
@ -204,13 +221,14 @@ func (c *Coordinator) DeviceConnect(i2c I2CDev) {
c.Devices.Lock() c.Devices.Lock()
defer c.Devices.Unlock() defer c.Devices.Unlock()
addr := i2c.GetAddr() addr := i2c.GetAddr()
if dm, exists := c.Devices.Managers[addr]; !exists { fmt.Printf("Device %d (%x) found!\n", addr, addr)
dm := NewDeviceManager(i2c) //if dm, exists := c.Devices.Managers[addr]; !exists {
c.Devices.Managers[addr] = dm //dm := NewDeviceManager(i2c)
go dm.Start() //c.Devices.Managers[addr] = dm
} else { //go dm.Start()
go dm.Start() //} else {
} //go dm.Start()
//}
} }
func (c *Coordinator) Discover() { func (c *Coordinator) Discover() {

@ -0,0 +1,45 @@
package sensor
// do sensor and methods
import (
pb "FRMS/internal/pkg/grpc"
"errors"
"sync"
"github.com/spf13/viper"
)
type DOSensorManager struct {
// do sensor manager
Sensor *pb.Sensor // for sending/updating
sync.RWMutex
}
func (s *DOSensorManager) SetName(name string) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *DOSensorManager) SetStatus(status int) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *DOSensorManager) SetSampleRate(rate int) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *DOSensorManager) Update(sensor *pb.Sensor, config *viper.Viper) error {
// updates info
s.Lock()
defer s.Unlock()
s.Sensor = sensor
return errors.New("UNIMPL")
}
func (s *DOSensorManager) String() string {
// #TODO
return "TODO"
}

@ -3,12 +3,10 @@ package sensor
import ( import (
_ "FRMS/internal/pkg/I2C" _ "FRMS/internal/pkg/I2C"
_ "fmt" _ "fmt"
"log"
"strings"
"sync"
"time"
) )
/*
// I think most of this is unnessecary as hell
type Manager struct { type Manager struct {
*Dev *Dev
I2CDevice I2CDevice
@ -26,14 +24,13 @@ type Dev struct {
// last known values // last known values
Addr int Addr int
Type string Type string
Status string // could be more efficient but to hell with it Status int // could be more efficient but to hell with it
Data string Data string
} }
type I2CDevice interface { type I2CDevice interface {
// basic device info // basic device info
GetAddr() int GetStatus() bool
GetStatus() string
GetType() string GetType() string
GetData() string GetData() string
} }
@ -112,3 +109,4 @@ func (a *Active) IsActive() bool {
defer a.Unlock() defer a.Unlock()
return a.bool return a.bool
} }
*/

@ -1,36 +1,66 @@
package sensor package sensor
import ( import (
pb "FRMS/internal/pkg/grpc"
"errors"
"fmt" "fmt"
"sync"
"github.com/spf13/viper"
) )
/* /*
this file serves as a map that the sensor library can use to determine which manager to call Returns the correct manager for sensor/device
*/ */
type SensorManager interface {
type NewManager interface { // basic sensor stuff
// serves as interface to restrict managers can be relocated SetName(string) error // change display name
SetStatus(int) error // update status
SetSampleRate(int) error // update sample rate
Update(*pb.Sensor, *viper.Viper) error // write updates
String() string // printable
} }
type DM struct { type DeviceManager interface {
DeviceManagers map[uint]NewManager // basic device stuff
sync.Mutex SetName(string) error // change displayed name
SetStatus(int) error // status change
SetParameter(string, string) error // key, val
Update(*pb.Device, *viper.Viper) error // write updates
String() string // printable
} }
func NewManagerDirectory() *DM { func NewSensorManager(sensor *pb.Sensor) (SensorManager, error) {
m := map[uint]NewManager{ // returns correct sensor manager by ID
// map to set functions up var sm SensorManager
//112: NewDOManager(), var err error
switch id := sensor.GetAddr(); id {
case 97:
// DO
sm = &DOSensorManager{Sensor: sensor}
case 99:
// pH
sm = &PHSensorManager{Sensor: sensor}
case 102:
// RTD
sm = &RTDSensorManager{Sensor: sensor}
default:
err = errors.New(fmt.Sprintf("Error: sensor id %v unrecognized!", id))
} }
return &DM{DeviceManagers: m} return sm, err
} }
func (d *DM) GetManager(addr uint) (NewManager, error) { func NewDeviceManager(device *pb.Device) (DeviceManager, error) {
d.Lock() // returns correct device manager by ID
defer d.Unlock() var dm DeviceManager
if m, ok := d.DeviceManagers[addr]; ok { var err error
return m, nil
switch id := device.GetAddr(); id {
case 256:
// PWM
dm = &PWMDeviceManager{Device: device}
default:
err = errors.New(fmt.Sprintf("Error: device id %v unrecognized!", id))
} }
return &DM{}, fmt.Errorf("No manager found for address %d!", addr) return dm, err
} }

@ -0,0 +1,45 @@
package sensor
// do sensor and methods
import (
pb "FRMS/internal/pkg/grpc"
"errors"
"sync"
"github.com/spf13/viper"
)
type PHSensorManager struct {
// do sensor manager
Sensor *pb.Sensor // for sending/updating
sync.RWMutex
}
func (s *PHSensorManager) SetName(name string) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *PHSensorManager) SetStatus(status int) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *PHSensorManager) SetSampleRate(rate int) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *PHSensorManager) Update(sensor *pb.Sensor, config *viper.Viper) error {
// updates info
s.Lock()
defer s.Unlock()
s.Sensor = sensor
return errors.New("UNIMPL")
}
func (s *PHSensorManager) String() string {
// #TODO
return "TODO"
}

@ -0,0 +1,45 @@
package sensor
// do sensor and methods
import (
pb "FRMS/internal/pkg/grpc"
"errors"
"sync"
"github.com/spf13/viper"
)
type PWMDeviceManager struct {
// do sensor manager
Device *pb.Device // for sending/updating
sync.RWMutex
}
func (s *PWMDeviceManager) SetName(name string) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *PWMDeviceManager) SetStatus(status int) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *PWMDeviceManager) SetParameter(key, val string) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *PWMDeviceManager) Update(device *pb.Device, config *viper.Viper) error {
// updates info
s.Lock()
defer s.Unlock()
s.Device = device
return errors.New("UNIMPL")
}
func (s *PWMDeviceManager) String() string {
// #TODO
return "TODO"
}

@ -0,0 +1,45 @@
package sensor
// do sensor and methods
import (
pb "FRMS/internal/pkg/grpc"
"errors"
"sync"
"github.com/spf13/viper"
)
type RTDSensorManager struct {
// do sensor manager
Sensor *pb.Sensor // for sending/updating
sync.RWMutex
}
func (s *RTDSensorManager) SetName(name string) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *RTDSensorManager) SetStatus(status int) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *RTDSensorManager) SetSampleRate(rate int) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *RTDSensorManager) Update(sensor *pb.Sensor, config *viper.Viper) error {
// updates info
s.Lock()
defer s.Unlock()
s.Sensor = sensor
return errors.New("UNIMPL")
}
func (s *RTDSensorManager) String() string {
// #TODO
return "TODO"
}

@ -1,6 +0,0 @@
package sensor
import (
_ "fmt"
)

@ -3,10 +3,13 @@ package server
import ( import (
pb "FRMS/internal/pkg/grpc" pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/logging" "FRMS/internal/pkg/logging"
"FRMS/internal/pkg/sensor"
"context" "context"
"fmt" "fmt"
_ "log" _ "log"
"sync" "sync"
"github.com/spf13/viper"
) )
// this package will implement a reactor manager and associated go routines // this package will implement a reactor manager and associated go routines
@ -14,21 +17,33 @@ import (
type ReactorManager struct { type ReactorManager struct {
*Manager *Manager
// StatusMon *StatusMonitor putting on pause // StatusMon *StatusMonitor putting on pause
*devstatus *ReactorSensors
*ReactorDevices
Config *viper.Viper // config to update
}
type ReactorSensors struct {
// sensor struct
Sensors map[int]SensorManager
sync.RWMutex
} }
type devstatus struct { type ReactorDevices struct {
// keeping this around but not using it to create status for status mon // device struct
sync.Mutex Devices map[int]DeviceManager
Devs map[int]*DeviceInfo sync.RWMutex
} }
func NewReactorManager(err chan error) *ReactorManager { func NewReactorManager(err chan error) *ReactorManager {
r := &ReactorManager{} r := &ReactorManager{}
di := make(map[int]*DeviceInfo) // sub managers
r.devstatus = &devstatus{Devs: di} dm := make(map[int]DeviceManager)
sm := make(map[int]SensorManager)
r.ReactorDevices = &ReactorDevices{Devices: dm}
r.ReactorSensors = &ReactorSensors{Sensors: sm}
// core manager
r.Manager = NewManager(err) r.Manager = NewManager(err)
//r.StatusMon = NewStatusMonitor("Reactor", c.Id, sys)
return r return r
} }
@ -41,38 +56,94 @@ func (r *ReactorManager) Exit() {
r.Manager.Exit() r.Manager.Exit()
logging.Debug(logging.DExit, "RMA %v exiting", r.Id) logging.Debug(logging.DExit, "RMA %v exiting", r.Id)
//go r.StatusMon.Send(&DeviceInfo{Id: r.Id, Type: "Reactor", Status: "[red]OFFLINE[white]", Data: fmt.Sprintf("Last Seen %v", time.Now().Format("Mon at 03:04:05pm MST"))}, "Reactor") //go r.StatusMon.Send(&DeviceInfo{Id: r.Id, Type: "Reactor", Status: "[red]OFFLINE[white]", Data: fmt.Sprintf("Last Seen %v", time.Now().Format("Mon at 03:04:05pm MST"))}, "Reactor")
r.devstatus.Lock()
defer r.devstatus.Unlock()
// keeping this because it **COULD** be useful, maybe
for _, d := range r.Devs {
newd := d
newd.Status = "UNKOWN"
r.Devs[newd.Id] = newd
//go r.StatusMon.Send(newd, "Device")
}
} }
func (r *ReactorManager) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { func (r *ReactorManager) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) {
// function client will call to update reactor information // function client will call to update reactor information
//go r.PingReset() //go r.PingReset()
fmt.Printf("Recieved ping from %v!\n", req.GetId()) fmt.Printf("Recieved ping from %v!\n", req.GetId())
for _, dev := range req.GetDevices() { // update devices/sensors
d := &DeviceInfo{Id: int(dev.GetAddr()), Type: dev.GetName(), Status: dev.GetStatus().String(), Data: dev.GetData()} go r.UpdateSensors(req.GetSensors())
go r.UpdateDevice(d) go r.UpdateDevices(req.GetDevices())
}
return &pb.ReactorStatusResponse{Id: int32(r.Id)}, nil return &pb.ReactorStatusResponse{Id: int32(r.Id)}, nil
} }
func (r *ReactorManager) UpdateDevice(d *DeviceInfo) { // sensor/device stuff
r.devstatus.Lock()
defer r.devstatus.Unlock() type SensorManager interface {
if olddev, ok := r.Devs[d.Id]; !ok { SetName(string) error // change name
// new device SetSampleRate(int) error // change sample rate
r.Devs[d.Id] = d Update(*pb.Sensor, *viper.Viper) error // write updates
//go r.StatusMon.Send(d, "Device") String() string // printing
} else if olddev.Status != d.Status || olddev.Data != d.Data { }
// dev status or data has changed
r.Devs[d.Id] = d type DeviceManager interface {
//go r.StatusMon.Send(d, "Device") SetName(string) error // change name
SetParameter(string, string) error // key, val
Update(*pb.Device, *viper.Viper) error // write updates
String() string // printing
}
func NewSensorManager(sens *pb.Sensor) (SensorManager, error) {
// returns a manager struct
return sensor.NewSensorManager(sens)
}
func NewDeviceManager(device *pb.Device) (DeviceManager, error) {
// returns a manager struct
return sensor.NewDeviceManager(device)
}
func (r *ReactorManager) UpdateDevices(devs []*pb.Device) {
// pass updates to correct manager
r.ReactorDevices.RLock() // read lock only
defer r.ReactorDevices.RUnlock()
for _, dev := range devs {
// looping over devs
if dm, ok := r.ReactorDevices.Devices[int(dev.GetAddr())]; ok {
// device manager found
go dm.Update(dev, r.Config) // update dm
} else {
// not found
go r.AddDevice(dev, r.Err)
}
}
}
func (r *ReactorDevices) AddDevice(dev *pb.Device, errCh chan error) {
r.Lock() // write lock
defer r.Unlock()
dm, err := NewDeviceManager(dev)
if err != nil {
errCh <- err
}
r.Devices[int(dev.GetAddr())] = dm
}
func (r *ReactorManager) UpdateSensors(sensors []*pb.Sensor) {
// pass updates to correct manager
r.ReactorSensors.RLock() // read lock
defer r.ReactorSensors.RUnlock()
for _, sens := range sensors {
// looping over sensors
if sm, ok := r.ReactorSensors.Sensors[int(sens.GetAddr())]; ok {
// sensor manager found
go sm.Update(sens, r.Config) // update sm
} else {
// not found
go r.AddSensor(sens, r.Err)
}
}
}
func (r *ReactorSensors) AddSensor(sensor *pb.Sensor, errCh chan error) {
r.Lock() // write lock
defer r.Unlock()
sm, err := NewSensorManager(sensor)
if err != nil {
errCh <- err
} }
r.Sensors[int(sensor.GetAddr())] = sm
} }

@ -2,18 +2,9 @@ package server
import ( import (
_ "fmt" _ "fmt"
// sensor components
) )
// allows for multiple readers/writers
type DeviceInfo struct {
Id int
Type string
Status string
Data string
Index int
TransactionId uint32
}
/* /*
type StatusMonitor struct { type StatusMonitor struct {

Loading…
Cancel
Save