working downstream

main
KeeganForelight 2 years ago
parent efd91b1c90
commit 42ce886114

@ -1,13 +1,55 @@
### Planning
**Monitoring Changes**
I want to refactor the reactor stuff to be less method oriented as far as data collection. For example, the monitoring stuff is all about events that happen pretty infrequently. It makes sense to then use a channel on the device side to just feed relevant status updates back to the reactor. I think that this makes the most sense because this will synchronize updates and leverage the rarity of events to cut down on errant calls.
- pros
- fewer repetitive method calls needed
- less device locking
- localize the information to different packages
- cons
- extra memory for channels and duplicate storage info
- could just remove status from dm?
**New Idea**
I can leverage wireguard to do server-> reactor connections even beyond the testing phase
Changes:
1) move device coordinator into device package
2) expose relevant methods to reactor interface
3) clarify individual package responsibilities
4) add stuff server side to create/destroy grpc connections as the information is rendered client side
- this might be scuffed but oh well
### Package Separation
**Reactor**
- coordinator
- creates initial link to the server
- creates database client
- creates and starts a device coordinator
**Device**
- coordinator
- searches i2c bus for connected devices
- spins up managers to control the connected devices
- relays information back up to the reactor coordinator
- manager
- control over singular device
- has the core information that will be needed across any type of device (name, status, address etc)
- sub-manager
- fine grained struct with methods specific to the device
**Server**
Going to ignore for now because I am lazy
- central coordinator starts up database connection config etc
- reactor coordinator
### TODO
**Monitoring Changes**

@ -23,7 +23,7 @@ type I2CClient struct {
sync.Mutex
}
func NewI2CClient(config *viper.Viper) (*I2CClient, error) {
func NewClient(config *viper.Viper) (*I2CClient, error) {
var err error
var bus int
client := &I2CClient{}

@ -10,6 +10,7 @@ import (
// Manager is the minimal lifecycle contract implemented by the shared
// manager package: start, stop, and a liveness probe.
type Manager interface {
Start() error
Exit() error
// IsActive reports liveness as an int (1 = active, 0 = inactive);
// backed by an atomic flag in the implementation.
IsActive() int
}
func NewManager(max int) Manager {

@ -3,7 +3,10 @@ package controller
// do sensor and methods
import (
"fmt"
"sync"
"github.com/spf13/viper"
)
type PWMManager struct {
@ -26,6 +29,10 @@ func (m *PWMManager) GetFrequency() (int, error) {
return m.Frequency, nil
}
// LoadConfig is currently a stub: it ignores the supplied viper config and
// key and only prints a marker line.
// NOTE(review): the sensor managers call config.UnmarshalKey(key, m) here —
// confirm whether the PWM controller should do the same once its
// mapstructure tags are settled.
func (m *PWMManager) LoadConfig(config *viper.Viper, key string) {
fmt.Printf("config\n")
}
// GetDefaultName returns the fallback display name used when no name is
// present in the config.
func (m *PWMManager) GetDefaultName() string {
return "pwm controller"
}

@ -1,25 +1,147 @@
package device
// serves as a server side device coordinator to sync
import (
"FRMS/internal/pkg/I2C"
pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/manager"
"fmt"
"sync"
"time"
// assume has a server connection from DM
// STEPS
// 1) client loads web page
// 2) DC pushes what it has to the client
// 3) requests DM for what it doesn't
// 4) DM responds
// 5) DC forwards responses to client
"github.com/spf13/viper"
)
// What can happen
// - Client can push something to client
// - Client requests info
// - If this happens, we can push it to client when we get it. Just need to know it was requested
// Created by rlc to manage devices
// basic manager to embed
// Manager is the lifecycle manager embedded in the device coordinator.
type Manager interface {
Start() error
Exit() error
// HeartBeat sends on the channel at intervals derived from the two ints
// scaled by the Duration; exact semantics live in the manager package
// implementation (not visible here).
HeartBeat(chan struct{}, int, int, time.Duration)
}
// NewManager returns the shared manager implementation.
// A max of 0 disables the timeout functionality, which the device
// coordinator doesn't need.
func NewManager() Manager {
return manager.New(0)
}
// I2C client for locking
// I2CClient is the minimal bus interface the coordinator needs: enumerate
// connected addresses and send a command to a single device.
// The concrete client serializes bus access internally.
type I2CClient interface {
GetConnected() (map[int]bool, error) // map keyed by connected I2C address
SendCmd(int, string) (string, error) // send cmd to addr; returns device reply
}
// NewI2CClient adapts the I2C package constructor to the local interface.
func NewI2CClient(config *viper.Viper) (I2CClient, error) {
return I2C.NewClient(config)
}
// device coordinator itself
// DeviceCoordinator discovers devices on the I2C bus and owns one
// DeviceManager per connected address.
type DeviceCoordinator struct {
NameChan chan string // NOTE(review): not used anywhere in this view — confirm purpose
StatusChan chan string // NOTE(review): not used anywhere in this view — confirm purpose
I2C I2CClient // shared bus client handed to every device manager
Manager // embedded lifecycle (Start/Exit/HeartBeat)
Config *viper.Viper
managersMu sync.RWMutex // guards Managers
Managers map[int]*DeviceManager // keyed by I2C address
}
// NewCoordinator builds a DeviceCoordinator around the given configuration.
// The embedded manager is created without a timeout and the device-manager
// table starts empty; Start must be called before the coordinator is useful.
func NewCoordinator(config *viper.Viper) *DeviceCoordinator {
	return &DeviceCoordinator{
		Manager:  NewManager(),
		Managers: make(map[int]*DeviceManager),
		Config:   config,
	}
}
// Start brings up the embedded manager, connects to the I2C bus, and
// launches the background Monitor goroutine.
// NOTE(review): Monitor has no stop signal; once started it lives for the
// remainder of the process.
func (c *DeviceCoordinator) Start() error {
var err error
if err = c.Manager.Start(); err != nil {
return err
}
if c.I2C, err = NewI2CClient(c.Config); err != nil {
return err
}
go c.Monitor()
// err is necessarily nil here; returned for symmetry with the checks above
return err
}
func ClientSetName() {
// pass client set names to DM
// Monitor polls the I2C bus on each heartbeat tick (arguments below select
// roughly a 10 * time.Second period) and reconciles the device-manager table
// with what is actually connected.
func (c *DeviceCoordinator) Monitor() {
ch := make(chan struct{})
// heartbeat signal channel; exact hb/interval semantics live in
// manager.HeartBeat (not visible here)
go c.HeartBeat(ch, 10, 0, time.Second)
for range ch {
devs, err := c.I2C.GetConnected()
if err != nil {
// NOTE(review): a transient bus error kills the whole process;
// consider surfacing this on an error channel instead of panicking
panic(err)
}
// reconcile asynchronously so a slow update doesn't block polling
go c.UpdateManagers(devs)
}
}
// UpdateManagers reconciles the running device managers with the set of
// addresses currently present on the bus (active maps address -> connected).
// Existing managers are started/stopped to match the bus; any address left
// over in active gets a brand-new manager. The map is consumed (entries are
// deleted) as part of the reconciliation.
func (c *DeviceCoordinator) UpdateManagers(active map[int]bool) {
	c.managersMu.Lock()
	defer c.managersMu.Unlock()
	for addr, dm := range c.Managers {
		_, connected := active[addr]
		if connected && dm.IsActive() == 0 {
			// device reappeared on the bus but its manager is stopped: restart
			if err := dm.Start(); err != nil {
				panic(err)
			}
		} else if !connected && dm.IsActive() == 1 {
			// FIX: the old condition omitted !connected, so this branch also
			// fired for healthy connected devices (ok && active==1), exiting
			// every running manager on each poll cycle.
			if err := dm.Exit(); err != nil {
				panic(err)
			}
		}
		// handled; anything still left in active below is a new device
		delete(active, addr)
	}
	for addr := range active {
		// no manager for this address yet — create and start one
		fmt.Printf("New device %d!\n", addr)
		dm, err := NewDeviceManager(addr, c.Config, "", c.I2C)
		if err != nil {
			panic(err)
		}
		if err := dm.Start(); err != nil {
			panic(err)
		}
		c.Managers[addr] = dm
	}
}
// GetDeviceInfo returns a snapshot of every known device manager as a gRPC
// Device message (I2C address plus current status), taken under a read lock.
func (c *DeviceCoordinator) GetDeviceInfo() ([]*pb.Device, error) {
	c.managersMu.RLock()
	defer c.managersMu.RUnlock()
	var snapshot []*pb.Device
	for address, mgr := range c.Managers {
		dev := &pb.Device{
			Addr:   int32(address),
			Status: pb.Status(mgr.IsActive()),
		}
		snapshot = append(snapshot, dev)
	}
	return snapshot, nil
}

@ -10,7 +10,9 @@ import (
type SubManager interface {
Start(int) error
Exit() error
IsActive() int
String() string // printing info about the sub manager
LoadConfig(*viper.Viper, string)
// for config bs
GetDefaultName() string
@ -23,23 +25,22 @@ type NameChan struct {
// base device manager
type DeviceManager struct {
SubManager `mapstructure:",squash"`
SubManager
// across controllers/sensors
Address int `mapstructure:"address"`
Address int `mapstructure:"address"`
Name string `mapstructure:"name"`
infoMu sync.RWMutex
Name string `mapstructure:"name"`
Status string // easier to remember
Config *viper.Viper
ConfigPrefix string
}
func NewDeviceManager(addr int, config *viper.Viper, configPrefix string) (*DeviceManager, error) {
func NewDeviceManager(addr int, config *viper.Viper, configPrefix string, i2c I2CClient) (*DeviceManager, error) {
// validate prefix
s, err := NewSubManager(addr)
s, err := NewSubManager(addr, i2c)
dm := &DeviceManager{
SubManager: s,
@ -61,26 +62,25 @@ func (m *DeviceManager) LoadConfig() error {
}
m.Config.UnmarshalKey(mainKey, m)
m.SubManager.LoadConfig(m.Config, mainKey)
return nil
}
func (m *DeviceManager) Start() error {
return m.SubManager.Start(m.Address)
}
// load config and then start
var err error
// UpdateStatus overwrites the cached device status string under the info
// lock. It always returns nil; the error return exists to satisfy the
// caller-side interface.
func (m *DeviceManager) UpdateStatus(status string) error {
m.infoMu.Lock()
defer m.infoMu.Unlock()
m.Status = status
return nil
}
// load config
if err = m.LoadConfig(); err != nil {
return err
}
// start
if err = m.SubManager.Start(m.Address); err != nil {
return err
}
func (m *DeviceManager) GetStatus() string {
// updates device status
m.infoMu.RLock()
defer m.infoMu.RUnlock()
return m.Status
return err
}
// dev info grpc handlers

@ -11,20 +11,20 @@ import (
Returns the correct manager for sensor/controller
*/
func NewSubManager(addr int) (SubManager, error) {
func NewSubManager(addr int, i2c I2CClient) (SubManager, error) {
// returns correct device manager by ID
var m SubManager
var err error
switch addr {
case 97:
// DO
m = sensor.NewDOManager()
m = sensor.NewDOManager(i2c)
case 99:
// pH
m = sensor.NewPHManager()
m = sensor.NewPHManager(i2c)
case 102:
// RTD
m = sensor.NewRTDManager()
m = sensor.NewRTDManager(i2c)
case 256:
// PWM
m = controller.NewPWMManager()

@ -50,6 +50,10 @@ func (m *Manager) Exit() error {
return errors.New("Manager not active!")
}
// IsActive reports the manager's active flag (1 = active, 0 = inactive)
// via an atomic load, so it is safe to call from any goroutine.
func (m *Manager) IsActive() int {
return int(atomic.LoadInt32(&m.Active))
}
// Heartbeat tracker
func (m *Manager) HeartBeat(ping chan struct{}, hb, interval int, scale time.Duration) {

@ -3,7 +3,7 @@ package reactor
// file describes reactor level coordinator and associated implementation
import (
"FRMS/internal/pkg/I2C"
"FRMS/internal/pkg/device"
pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/influxdb"
"FRMS/internal/pkg/logging"
@ -31,7 +31,7 @@ func NewManager(max int) Manager {
return manager.New(max)
}
// db client interface
// db client
type DBClient interface {
//
Start() error
@ -41,14 +41,15 @@ func NewDBClient(config *viper.Viper) (DBClient, error) {
return influxdb.NewDBClient(config)
}
type I2CClient interface {
// simple client to push responsibilities to sensor
GetConnected() (map[int]bool, error) // gets all connected addr
SendCmd(int, string) (string, error) // send cmd, string is return
// device coordinator
type DeviceCoordinator interface {
Start() error
// in grpc format
GetDeviceInfo() ([]*pb.Device, error)
}
func NewI2CClient(config *viper.Viper) (I2CClient, error) {
return I2C.NewI2CClient(config)
func NewDeviceCoordinator(config *viper.Viper) DeviceCoordinator {
return device.NewCoordinator(config)
}
type Server struct {
@ -71,11 +72,10 @@ type ReactorCoordinator struct {
Info `mapstructure:",squash"`
Database DBClient
I2C I2CClient
MonitoringClient pb.MonitoringClient // grpc
pb.MonitoringClient // grpc embedding
*DeviceCoordinator // struct for locking
DeviceCoordinator // struct for locking
Err chan error
}
@ -111,12 +111,12 @@ func (c *ReactorCoordinator) Start() {
c.Err <- err
}
// loading clients
if c.Database, err = NewDBClient(c.Config); err != nil {
if err = c.DeviceCoordinator.Start(); err != nil {
c.Err <- err
}
if c.I2C, err = NewI2CClient(c.Config); err != nil {
// loading clients
if c.Database, err = NewDBClient(c.Config); err != nil {
c.Err <- err
}
@ -168,12 +168,7 @@ func (c *ReactorCoordinator) Monitor() {
for range ch {
// check devs and ping
logging.Debug(logging.DClient, "RLC Pinging server")
// this can probably be offloaded
active, err := c.I2C.GetConnected()
if err != nil {
c.Err <- err
}
go c.DeviceCoordinator.UpdateDevices(c.Config, c.I2C, active)
// ping central server with status
go c.Ping()
}
}
@ -230,3 +225,26 @@ func (c *ReactorCoordinator) Connect(ip string, port int) (*grpc.ClientConn, err
}
return conn, nil
}
// Ping sends the reactor's current device list to the central coordinator
// over the embedded monitoring client. Errors are reported on c.Err; if the
// device snapshot itself fails, the ping is skipped entirely.
func (c *ReactorCoordinator) Ping() {
	fmt.Printf("Pinging server\n")
	devices, err := c.GetDeviceInfo()
	if err != nil {
		c.Err <- err
		// FIX: previously execution fell through here and pinged the server
		// with a nil device list after reporting the error.
		return
	}
	req := &pb.ReactorStatusPing{
		Id:      int32(c.ID),
		Devices: devices,
	}
	if _, err = c.ReactorStatusHandler(context.Background(), req); err != nil {
		c.Err <- err
	}
}

@ -1,100 +0,0 @@
package reactor
import (
"FRMS/internal/pkg/device"
pb "FRMS/internal/pkg/grpc"
"sync"
"github.com/spf13/viper"
)
// DeviceManager is the reactor-side view of a device: lifecycle control,
// a status string, and config loading. (This file is removed by the commit
// in favour of the device package.)
type DeviceManager interface {
Start() error
Exit() error
GetStatus() string
LoadConfig() error
}
// NewDeviceManager adapts the device package constructor to the local
// DeviceManager interface.
func NewDeviceManager(addr int, config *viper.Viper, prefix string) (DeviceManager, error) {
return device.NewDeviceManager(addr, config, prefix)
}
// DeviceCoordinator tracks reactor-side device managers keyed by I2C address.
type DeviceCoordinator struct {
Config *viper.Viper
Managers map[int]DeviceManager
sync.RWMutex // embedded; guards Managers
}
// NewDeviceCoordinator builds a coordinator with an empty manager table
// around the given configuration.
func NewDeviceCoordinator(config *viper.Viper) *DeviceCoordinator {
dm := &DeviceCoordinator{Config: config}
dm.Managers = make(map[int]DeviceManager)
return dm
}
// UpdateDevices creates, starts, and configures a manager for every
// newly-seen address in active; addresses that already have a manager are
// left untouched (the status-sync loop below is commented out).
// NOTE(review): the config and i2c parameters are unused in the live path —
// the body reads c.Config instead; confirm before keeping them.
func (c *DeviceCoordinator) UpdateDevices(config *viper.Viper, i2c I2CClient, active map[int]bool) error {
// write lock: we may insert into c.Managers
c.Lock()
defer c.Unlock()
for addr, _ := range active {
if _, ok := c.Managers[addr]; !ok {
// no manager for this address yet — create one
dm, err := NewDeviceManager(addr, c.Config, "")
if err != nil {
return err
}
// starting
if err = dm.Start(); err != nil {
return err
}
// loading config
if err = dm.LoadConfig(); err != nil {
return err
}
// update entry
c.Managers[addr] = dm
}
}
// all devs accounted for
// I can rework this to rely on individual devices to keep track of status and only need above
// for addr, dm := range c.Managers {
// if active[addr] {
// // active
// if dm.IsActive() != 1 {
// err = dm.Start()
// }
// } else {
// if dm.IsActive() != 0 {
// err = dm.Exit()
// }
// }
// if err != nil {
// return err
// }
// }
return nil
}
// GetDevices snapshots all managers as gRPC Device messages under a read
// lock. Status strings are mapped through pb.Status_value; an unknown
// string maps to the zero status.
func (c *DeviceCoordinator) GetDevices() ([]*pb.Device, error) {
c.RLock()
defer c.RUnlock()
var err error
var devices []*pb.Device
for addr, dm := range c.Managers {
status := pb.Status(pb.Status_value[dm.GetStatus()])
devices = append(devices, &pb.Device{
Addr: int32(addr),
Status: status,
})
}
// err is never assigned above; always nil
return devices, err
}

@ -1,27 +0,0 @@
package reactor
import (
"context"
"fmt"
//"FRMS/internal/pkg/logging"
//"google.golang.org/grpc"
pb "FRMS/internal/pkg/grpc"
)
// implements grpc handler and device data aggregator handler
// grpc status update handler
// Ping pushes every device's status to the central coordinator over the
// monitoring client.
// NOTE(review): on GetDevices error this still builds and sends the request
// with a possibly-nil device list after reporting the error.
func (c *ReactorCoordinator) Ping() {
fmt.Printf("Pinging coordinator\n")
devices, err := c.GetDevices()
if err != nil {
c.Err <- err
}
req := &pb.ReactorStatusPing{Id: int32(c.ID), Devices: devices}
if _, err := c.MonitoringClient.ReactorStatusHandler(context.Background(), req); err != nil {
c.Err <- err
}
}

@ -2,6 +2,7 @@ package sensor
import (
"errors"
"strconv"
"time"
)
@ -15,8 +16,8 @@ type Atlas struct {
// helper struct to embed
I2C I2CClient
// delays unmarshalled
CalDelay int `mapstructure:"cal"`
ReadDelay int `mapstructure:"read"`
CalDelay int `mapstructure:"cal_delay"`
ReadDelay int `mapstructure:"read_delay"`
}
func (a *Atlas) Calibrate(addr int, cal string) error {
@ -42,10 +43,18 @@ func (a *Atlas) Read(addr int) (float32, error) {
//fmt.Printf("Error writing! %s", err)
return 0, err
}
if a.ReadDelay == 0 {
return 0, errors.New("Read Delay unset, please check config")
}
sleep := time.Duration(a.ReadDelay) * time.Millisecond
time.Sleep(sleep) // sleep between reads
//data, err := a.I2C.SendCmd(addr, "")
return 0, nil
data, err := a.I2C.SendCmd(addr, "")
if err != nil {
return 0, err
}
f, err := strconv.ParseFloat(data, 32)
return float32(f), err
}
// for config

@ -3,19 +3,22 @@ package sensor
// do sensor and methods
import (
"fmt"
"sync"
"github.com/spf13/viper"
)
type DOManager struct {
// do sensor manager
*Atlas // atlas helper
*Atlas `mapstructure:",squash"`
*SensorManager `mapstructure:",squash"`
sync.RWMutex
}
func NewDOManager() *DOManager {
a := &Atlas{}
func NewDOManager(i2c I2CClient) *DOManager {
a := &Atlas{I2C: i2c}
sm := NewSensorManager(a.Read)
m := &DOManager{
@ -25,6 +28,11 @@ func NewDOManager() *DOManager {
return m
}
// LoadConfig unmarshals the DO sensor's settings under key (Atlas delays
// and embedded sensor-manager fields) into the manager, then dumps the
// Atlas state for debugging.
func (m *DOManager) LoadConfig(config *viper.Viper, key string) {
	// FIX: the unmarshal error was silently discarded; report it so a bad
	// config section is at least visible.
	if err := config.UnmarshalKey(key, m); err != nil {
		fmt.Printf("DO config error: %v\n", err)
	}
	fmt.Printf("DO: %v\n", m.Atlas)
}
// GetDefaultName returns the fallback display name used when no name is
// present in the config.
func (m *DOManager) GetDefaultName() string {
return "DO Sensor"
}

@ -2,6 +2,7 @@ package sensor
import (
"FRMS/internal/pkg/manager"
"fmt"
"sync"
"time"
)
@ -9,6 +10,7 @@ import (
type Manager interface {
Start() error
Exit() error
IsActive() int
HeartBeat(chan struct{}, int, int, time.Duration)
}
@ -43,12 +45,17 @@ func (s *SensorManager) Start(addr int) error {
if err := s.Manager.Start(); err != nil {
return err
}
fmt.Printf("Sensor %d: %v\n", addr, s)
// starting monitoring
go s.Monitor(addr)
return nil
}
func (s *SensorManager) Monitor(addr int) {
ch := make(chan struct{}) // hb chan
if s.SampleRate == 0 {
s.SampleRate = 5000
}
go s.HeartBeat(ch, s.SampleRate, 1000, time.Millisecond)
for range ch {
@ -61,6 +68,7 @@ func (r *Reading) TakeReading(addr int) {
if err != nil {
panic(err)
}
fmt.Printf("got sample: %v\n", sample)
r.Lock()
defer r.Unlock()
r.Latest = sample

@ -3,19 +3,22 @@ package sensor
// do sensor and methods
import (
"fmt"
"sync"
"github.com/spf13/viper"
)
type PHManager struct {
// do sensor manager
*Atlas
*Atlas `mapstructure:",squash"`
*SensorManager `mapstructure:",squash"`
sync.RWMutex
}
func NewPHManager() *PHManager {
a := &Atlas{}
func NewPHManager(i2c I2CClient) *PHManager {
a := &Atlas{I2C: i2c}
sm := NewSensorManager(a.Read)
m := &PHManager{
SensorManager: sm,
@ -24,6 +27,11 @@ func NewPHManager() *PHManager {
return m
}
// LoadConfig unmarshals the pH sensor's settings under key (Atlas delays
// and embedded sensor-manager fields) into the manager, then dumps the
// Atlas state for debugging.
func (s *PHManager) LoadConfig(config *viper.Viper, key string) {
	// FIX: the unmarshal error was silently discarded; report it so a bad
	// config section is at least visible.
	if err := config.UnmarshalKey(key, s); err != nil {
		fmt.Printf("PH config error: %v\n", err)
	}
	fmt.Printf("PH: %v\n", s.Atlas)
}
// GetDefaultName returns the fallback display name used when no name is
// present in the config.
func (s *PHManager) GetDefaultName() string {
return "pH Sensor"
}

@ -3,18 +3,21 @@ package sensor
// do sensor and methods
import (
"fmt"
"sync"
"github.com/spf13/viper"
)
type RTDManager struct {
// do sensor manager
*Atlas
*Atlas `mapstructure:",squash"`
*SensorManager `mapstructure:",squash"`
sync.RWMutex
}
func NewRTDManager() *RTDManager {
a := &Atlas{}
func NewRTDManager(i2c I2CClient) *RTDManager {
a := &Atlas{I2C: i2c}
sm := NewSensorManager(a.Read)
m := &RTDManager{
SensorManager: sm,
@ -23,6 +26,11 @@ func NewRTDManager() *RTDManager {
return m
}
// LoadConfig unmarshals the RTD sensor's settings under key (Atlas delays
// and embedded sensor-manager fields) into the manager, then dumps the
// Atlas state for debugging.
func (s *RTDManager) LoadConfig(config *viper.Viper, key string) {
	// FIX: the unmarshal error was silently discarded; report it so a bad
	// config section is at least visible.
	if err := config.UnmarshalKey(key, s); err != nil {
		fmt.Printf("RTD config error: %v\n", err)
	}
	fmt.Printf("RTD: %v\n", s.Atlas)
}
// GetDefaultName returns the fallback display name used when no name is
// present in the config.
func (s *RTDManager) GetDefaultName() string {
return "RTD Sensor"
}

@ -1,7 +1,6 @@
package server
import (
"FRMS/internal/pkg/device"
pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/logging"
"FRMS/internal/pkg/manager"
@ -11,7 +10,6 @@ import (
"context"
"fmt"
_ "log"
"sync"
"github.com/spf13/viper"
)
@ -34,30 +32,30 @@ type ReactorManager struct {
// *ClientManager // client manager (OUTDATED)
*Client // access to ID etc
// StatusMon *StatusMonitor putting on pause
*ReactorDevices
// *ReactorDevices
Config *viper.Viper // config to update
Err chan error
}
type ReactorDevices struct {
// device struct
Devices map[int]DeviceManager
sync.RWMutex
}
// type ReactorDevices struct {
// // device struct
// Devices map[int]DeviceManager
// sync.RWMutex
// }
func NewReactorManager(cl *Client, config *viper.Viper, errCh chan error) *ReactorManager {
// making managers
m := NewManager(6)
dm := make(map[int]DeviceManager)
rd := &ReactorDevices{Devices: dm}
//dm := make(map[int]DeviceManager)
//rd := &ReactorDevices{Devices: dm}
//cm := NewClientManager(cl)
r := &ReactorManager{
//ClientManager: cm,
Manager: m,
Client: cl,
ReactorDevices: rd,
Config: config,
Err: errCh,
Manager: m,
Client: cl,
//ReactorDevices: rd,
Config: config,
Err: errCh,
}
return r
}
@ -88,65 +86,69 @@ func (r *ReactorManager) ReactorStatusHandler(ctx context.Context, req *pb.React
//go r.PingReset()
fmt.Printf("Recieved ping from %d!\n", req.GetId())
// update devices/sensors
go r.UpdateDevices(req.GetDevices())
for _, dev := range req.GetDevices() {
fmt.Printf("Device %d is %s ", dev.GetAddr(), dev.GetStatus().String())
}
fmt.Printf("\n")
// go r.UpdateDevices(req.GetDevices())
return &pb.ReactorStatusResponse{Id: int32(r.Id)}, nil
}
// device stuff
type DeviceManager interface {
LoadConfig() error
UpdateStatus(string) error
String() string // printing
}
func NewDeviceManager(addr int, config *viper.Viper, prefix string) (DeviceManager, error) {
// returns a manager struct
return device.NewDeviceManager(addr, config, prefix)
}
func (r *ReactorManager) UpdateDevices(devs []*pb.Device) {
// pass updates to correct manager
r.ReactorDevices.RLock() // read lock only
defer r.ReactorDevices.RUnlock()
for _, dev := range devs {
// looping over devs
if dm, ok := r.ReactorDevices.Devices[int(dev.GetAddr())]; ok {
// device manager found
go dm.UpdateStatus(dev.GetStatus().String())
//fmt.Println(dm)
} else {
// not found
go r.AddDevice(dev, r.Id, r.Config, r.Err)
}
}
}
func (r *ReactorDevices) AddDevice(dev *pb.Device, id int, config *viper.Viper, errCh chan error) {
// setting vars
prefix := fmt.Sprintf("reactors.%d.", id)
addr := int(dev.GetAddr())
var dm DeviceManager
var err error
// write locking
r.Lock()
defer r.Unlock()
if dm, err = NewDeviceManager(addr, config, prefix); err != nil {
errCh <- err
}
// setting status
if err = dm.UpdateStatus(dev.GetStatus().String()); err != nil {
errCh <- err
}
// loading config
if err = dm.LoadConfig(); err != nil {
errCh <- err
}
r.Devices[int(addr)] = dm
}
// // device stuff
// type DeviceManager interface {
// LoadConfig() error
// UpdateStatus(string) error
// String() string // printing
// }
// func NewDeviceManager(addr int, config *viper.Viper, prefix string) (DeviceManager, error) {
// // returns a manager struct
// return device.NewDeviceManager(addr, config, prefix)
// }
//func (r *ReactorManager) UpdateDevices(devs []*pb.Device) {
// // pass updates to correct manager
// r.ReactorDevices.RLock() // read lock only
// defer r.ReactorDevices.RUnlock()
// for _, dev := range devs {
// // looping over devs
// if dm, ok := r.ReactorDevices.Devices[int(dev.GetAddr())]; ok {
// // device manager found
// go dm.UpdateStatus(dev.GetStatus().String())
// //fmt.Println(dm)
// } else {
// // not found
// go r.AddDevice(dev, r.Id, r.Config, r.Err)
// }
// }
//}
// func (r *ReactorDevices) AddDevice(dev *pb.Device, id int, config *viper.Viper, errCh chan error) {
// // setting vars
// prefix := fmt.Sprintf("reactors.%d.", id)
// addr := int(dev.GetAddr())
// var dm DeviceManager
// var err error
// // write locking
// r.Lock()
// defer r.Unlock()
// if dm, err = NewDeviceManager(addr, config, prefix); err != nil {
// errCh <- err
// }
// // setting status
// if err = dm.UpdateStatus(dev.GetStatus().String()); err != nil {
// errCh <- err
// }
// // loading config
// if err = dm.LoadConfig(); err != nil {
// errCh <- err
// }
// r.Devices[int(addr)] = dm
// }

Loading…
Cancel
Save