regening grpc proto

main
KeeganForelight 2 years ago
parent 3d36a782f1
commit 4ab69cb503

@ -13,11 +13,11 @@ import (
"github.com/spf13/viper" "github.com/spf13/viper"
) )
type coordinator interface { type reactorCoordinator interface {
Start() Start()
} }
func NewCoordinator(config *viper.Viper, ch chan error) coordinator { func NewReactorCoordinator(config *viper.Viper, ch chan error) reactorCoordinator {
// allows interface checking as opposed to calling directly // allows interface checking as opposed to calling directly
return reactor.NewCoordinator(config, ch) return reactor.NewCoordinator(config, ch)
} }
@ -36,7 +36,7 @@ func main() {
conf := NewConfig("reactor") conf := NewConfig("reactor")
ch := make(chan error) ch := make(chan error)
rlc := NewCoordinator(conf, ch) // passing conf and err rlc := NewReactorCoordinator(conf, ch) // passing conf and err
go rlc.Start() go rlc.Start()
logging.Debug(logging.DStart, "Reactor Started") logging.Debug(logging.DStart, "Reactor Started")

@ -2,6 +2,7 @@ devices:
address: 112 address: 112
name: DO Sensor name: DO Sensor
reactor: reactor:
heartbeat: 5
id: 2166136261 id: 2166136261
model: "" model: ""
name: Dummy Reactor name: Dummy Reactor

@ -0,0 +1,48 @@
package sensor
// do sensor and methods
import (
pb "FRMS/internal/pkg/grpc"
"sync"
)
type DOManager struct {
// do sensor manager
*Atlas // atlas helper
*SensorManager `mapstructure:",squash"`
sync.RWMutex
Status int
Name string
}
func NewDOManager(device *pb.Device) *DOManager {
a := NewAtlasManager()
sm := NewSensorManager(device*pb.Device, a.Read)
m := &DOManager{
SensorManager: sm,
Atlas: a,
}
return m
}
func (m *DOManager) Start() error {
// start sm
if err := m.SensorManager.Start(); err != nil {
return err
}
// start taking samples
go m.Monitor(m.Address, m.Atlas.Read)
return nil
}
func (m *DOManager) GetDefaultName() string {
return "DO Sensor"
}
func (m *DOManager) String() string {
// TODO
return ""
}

@ -0,0 +1,41 @@
package controller
import (
pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/manager"
"sync"
)
// base controller manager
type Manager interface {
Start() error
Exit() error
}
func NewManager(max int) Manager {
return manager.New(max)
}
type ControllerManager struct {
Manager
*pb.Device
sync.Mutex
Enabled bool // turn controller on or off
}
func NewControllerManager(device *pb.Device) *ControllerManager {
m := NewManager(0) // no connections
return &ControllerManager{Manager: m, Device: device}
}
func (c *ControllerManager) GetDevice() *pb.Device {
return c.Device
}
func (c *ControllerManager) UpdateDevice(device *pb.Device) error {
c.Device = device
return nil
}

@ -0,0 +1,37 @@
package controller
// do sensor and methods
import (
pb "FRMS/internal/pkg/grpc"
"sync"
)
type PWMManager struct {
// do sensor manager
*ControllerManager
sync.RWMutex
Frequency int
DutyCycle int
}
func NewPWMManager(device *pb.Device) *PWMManager {
cm := NewControllerManager(device)
return &PWMManager{ControllerManager: cm}
}
// freq changing
func (m *PWMManager) GetFrequency() (int, error) {
m.Lock()
defer m.Unlock()
return m.Frequency, nil
}
func (m *PWMManager) GetDefaultName() string {
return "pwm controller"
}
func (m *PWMManager) String() string {
// TODO
return ""
}

@ -0,0 +1,55 @@
package device
import (
pb "FRMS/internal/pkg/grpc"
"fmt"
"sync"
"github.com/spf13/viper"
)
type SubManager interface {
Start() error
Exit() error
String() string // printing info about the sub manager
GetDevice() *pb.Device
UpdateDevice(*pb.Device) error // updates device
GetAddr() int32
GetStatus() pb.Status
GetName() string
GetDefaultName() string
}
// base device manager
type DeviceManager struct {
SubManager `mapstructure:",squash"`
Config *viper.Viper
sync.RWMutex
}
func NewDeviceManager(device *pb.Device, config *viper.Viper) (*DeviceManager, error) {
s, err := NewSubManager(device)
dm := &DeviceManager{
SubManager: s,
Config: config,
}
return dm, err
}
func (m *DeviceManager) LoadConfig(prefix string) error {
// setting default name
// prefix = [ reactors | reactor.id ]
mainKey := fmt.Sprintf("%s.devices.%d", prefix, m.GetAddr())
nameKey := fmt.Sprintf("%s.name", mainKey)
if !m.Config.IsSet(nameKey) {
m.Config.Set(nameKey, m.SubManager.GetDefaultName())
}
m.Config.UnmarshalKey(mainKey, m)
return nil
}

@ -0,0 +1,37 @@
package device
import (
"FRMS/internal/pkg/controller"
pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/sensor"
"errors"
"fmt"
)
/*
Returns the correct manager for sensor/controller
*/
func NewSubManager(device *pb.Device) (SubManager, error) {
// returns correct device manager by ID
var m SubManager
var err error
addr := device.GetAddr()
switch addr {
case 97:
// DO
m = sensor.NewDOManager(device)
case 99:
// pH
m = sensor.NewPHManager(device)
case 102:
// RTD
m = sensor.NewRTDManager(device)
case 256:
// PWM
m = controller.NewPWMManager(device)
default:
err = errors.New(fmt.Sprintf("Error: device id %d unrecognized!", addr))
}
return m, err
}

@ -24,7 +24,5 @@ enum Status {
message Device { message Device {
int32 addr = 1; // i2c addr int32 addr = 1; // i2c addr
string name = 2; // use readable name, changable Status status = 2;
Status status = 3;
map<string,string> data = 4; // k=v, format
} }

@ -0,0 +1,97 @@
package manager
import (
"errors"
"math"
"math/rand"
"sync"
"sync/atomic"
"time"
)
// basic manager for starting/stopping checks plus built in heartbeat for downtime detection
// used across server/reactor
type Connection struct {
Attempts float64 // float for pow
MaxAttempts int // max allowed
sync.Mutex
}
type Manager struct {
*Connection // embedded for timeout stuff
Active int32 // atomic checks
}
func New(maxCon int) *Manager {
c := &Connection{MaxAttempts: maxCon}
m := &Manager{
Connection: c,
Active: 0,
}
return m
}
func (m *Manager) Start() error {
// atomically checks/updates status
if atomic.CompareAndSwapInt32(&m.Active, 0, 1) {
m.ResetConnections()
return nil
}
// already running
return errors.New("Manager already started!")
}
func (m *Manager) Exit() error {
if atomic.CompareAndSwapInt32(&m.Active, 1, 0) {
return nil
}
return errors.New("Manager not active!")
}
// Heartbeat tracker
func (m *Manager) HeartBeat(ping chan struct{}, hb, interval int, scale time.Duration) {
// pings channel every (HB + randInterval) * time.Duration
// can be used anywhere a heartbeat is needed
// closes channel on exit
if interval > 0 {
rand.Seed(time.Now().UnixNano())
}
for atomic.LoadInt32(&m.Active) > 0 {
// atomoic read may cause memory leak, can revisit
ping <- struct{}{} // no mem
sleep := time.Duration(hb-interval) * scale
if interval > 0 {
sleep += time.Duration(rand.Intn(2*interval)) * scale
}
time.Sleep(sleep)
}
// exited, close chan
close(ping)
}
// connection timeout generator
func (c *Connection) Timeout() (time.Duration, error) {
// exponential backoff
c.Lock()
defer c.Unlock()
if int(c.Attempts) < c.MaxAttempts {
c.Attempts += 1
// 50, 100, 200...
to := time.Duration(50*math.Pow(2, c.Attempts)) * time.Millisecond
return to, nil
}
return 0, errors.New("Connection Failed")
}
func (c *Connection) ResetConnections() {
c.Lock()
defer c.Unlock()
c.Attempts = 0
}

@ -1,8 +1,8 @@
package reactor package reactor
import ( import (
"FRMS/internal/pkg/device"
pb "FRMS/internal/pkg/grpc" pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/sensor"
"fmt" "fmt"
"sync" "sync"
@ -12,31 +12,24 @@ import (
type DeviceManager interface { type DeviceManager interface {
Start() error Start() error
Exit() error Exit() error
GetDelay(*viper.Viper, string) error
SetName(string) error
GetName() string GetName() string
GetInfo() (*pb.Device, error) GetStatus() pb.Status
SetStatus(int) error GetDevice() *pb.Device // monitoring info
GetStatus() int LoadConfig(string) error
SetI2C(sensor.I2CClient) error // setting the i2c
} }
func NewDeviceManager(addr int, i2c I2CClient) (DeviceManager, error) { func NewDeviceManager(addr int, config *viper.Viper) (DeviceManager, error) {
// new dev return device.NewDeviceManager(&pb.Device{Addr: int32(addr)}, config)
sm, err := sensor.NewDeviceManager(&pb.Device{Addr: int32(addr)})
// set i2c interface
err = sm.SetI2C(i2c)
return sm, err
} }
type DeviceCoordinator struct { type DeviceCoordinator struct {
Config *viper.Viper
Managers map[int]DeviceManager Managers map[int]DeviceManager
sync.RWMutex sync.RWMutex
} }
func NewDeviceCoordinator() *DeviceCoordinator { func NewDeviceCoordinator(config *viper.Viper) *DeviceCoordinator {
dm := &DeviceCoordinator{} dm := &DeviceCoordinator{Config: config}
dm.Managers = make(map[int]DeviceManager) dm.Managers = make(map[int]DeviceManager)
return dm return dm
} }
@ -52,28 +45,29 @@ func (c *DeviceCoordinator) UpdateDevices(config *viper.Viper, i2c I2CClient, ac
// loop over devs // loop over devs
if _, ok := c.Managers[addr]; !ok { if _, ok := c.Managers[addr]; !ok {
// no device // no device
if c.Managers[addr], err = NewDeviceManager(addr, i2c); err != nil { if c.Managers[addr], err = NewDeviceManager(addr, c.Config); err != nil {
return err return err
} }
fmt.Printf("Found dev %d (%x)\n", addr, addr) // loading config
c.Managers[addr].LoadConfig("reactor")
// check for config name // check for config name
nameKey := fmt.Sprintf("device.%d.name", addr) // nameKey := fmt.Sprintf("device.%d.name", addr)
if !config.IsSet(nameKey) { // if !config.IsSet(nameKey) {
// no config name, get default // // no config name, get default
config.Set(nameKey, c.Managers[addr].GetName()) // config.Set(nameKey, c.Managers[addr].GetName())
} // }
// setting it // // setting it
if err = c.Managers[addr].SetName(config.Get(nameKey).(string)); err != nil { // if err = c.Managers[addr].SetName(config.Get(nameKey).(string)); err != nil {
return err // return err
} // }
// atlas delays // // atlas delays
delayKey := fmt.Sprintf("device.%d.delays", addr) // delayKey := fmt.Sprintf("device.%d.delays", addr)
if !config.IsSet(delayKey) { // if !config.IsSet(delayKey) {
// set empty delays // // set empty delays
config.Set(fmt.Sprintf("device.%d.delays.read", addr), 0) // config.Set(fmt.Sprintf("device.%d.delays.read", addr), 0)
config.Set(fmt.Sprintf("device.%d.delays.cal", addr), 0) // config.Set(fmt.Sprintf("device.%d.delays.cal", addr), 0)
} // }
c.Managers[addr].GetDelay(config, delayKey) // c.Managers[addr].GetDelay(config, delayKey)
} }
} }
// all devs accounted for // all devs accounted for
@ -81,12 +75,10 @@ func (c *DeviceCoordinator) UpdateDevices(config *viper.Viper, i2c I2CClient, ac
if active[addr] { if active[addr] {
// active // active
if dm.GetStatus() != 1 { if dm.GetStatus() != 1 {
go dm.SetStatus(1)
err = dm.Start() err = dm.Start()
} }
} else { } else {
if dm.GetStatus() != 0 { if dm.GetStatus() != 0 {
go dm.SetStatus(0)
err = dm.Exit() err = dm.Exit()
} }
} }
@ -106,7 +98,7 @@ func (c *DeviceCoordinator) GetDevices() ([]*pb.Device, error) {
var dev *pb.Device var dev *pb.Device
for _, dm := range c.Managers { for _, dm := range c.Managers {
dev, err = dm.GetInfo() dev = dm.GetDevice()
fmt.Println(dev) fmt.Println(dev)
devices = append(devices, dev) devices = append(devices, dev)
} }

@ -11,7 +11,7 @@ import (
// implements grpc handler and device data aggregater handler // implements grpc handler and device data aggregater handler
// grpc status update handler // grpc status update handler
func (c *Coordinator) Ping() { func (c *ReactorCoordinator) Ping() {
// sends all device status to central coordinator // sends all device status to central coordinator
fmt.Printf("Pinging coordinator\n") fmt.Printf("Pinging coordinator\n")
// get devices // get devices

@ -7,12 +7,10 @@ import (
pb "FRMS/internal/pkg/grpc" pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/influxdb" "FRMS/internal/pkg/influxdb"
"FRMS/internal/pkg/logging" "FRMS/internal/pkg/logging"
"FRMS/internal/pkg/manager"
"FRMS/internal/pkg/system" "FRMS/internal/pkg/system"
"context" "context"
"errors"
"fmt" "fmt"
"math"
"sync"
"time" "time"
"github.com/spf13/viper" "github.com/spf13/viper"
@ -21,6 +19,18 @@ import (
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
// basic manager
type Manager interface {
Start() error
Exit() error
Timeout() (time.Duration, error)
HeartBeat(chan struct{}, int, int, time.Duration) // creates a hb
}
func NewManager(max int) Manager {
return manager.New(max)
}
// db client interface // db client interface
type DBClient interface { type DBClient interface {
// //
@ -42,86 +52,91 @@ func NewI2CClient(config *viper.Viper) (I2CClient, error) {
} }
type Server struct { type Server struct {
// embed
Ip string `mapstructure:"ip"` Ip string `mapstructure:"ip"`
Port int `mapstructure:"port"` Port int `mapstructure:"port"`
} }
// Coordinator == Reactor Level Coordinator type Info struct {
type Coordinator struct {
Name string `mapstructure:"name,omitempty"` Name string `mapstructure:"name,omitempty"`
ID int `mapstructure:"id,omitempty"` ID int `mapstructure:"id,omitempty"`
Model string `mapstructure:"model,omitempty"` Model string `mapstructure:"model,omitempty"`
// server info embedded HB int `mapstructure:"heartbeat"`
Server Server
// database }
type ReactorCoordinator struct {
Manager // base manager
Config *viper.Viper // config
Info `mapstructure:",squash"`
Database DBClient Database DBClient
I2C I2CClient I2C I2CClient
// config
Config *viper.Viper MonitoringClient pb.MonitoringClient // grpc
MonitoringClient pb.MonitoringClient
// connected devices
*DeviceCoordinator // struct for locking *DeviceCoordinator // struct for locking
// other stuff and things
Err chan error
mu sync.Mutex
HB time.Duration
PingTimer chan struct{}
// db client
Active active
}
type active struct { Err chan error
bool
int
sync.Mutex
} }
func NewCoordinator(config *viper.Viper, ch chan error) *Coordinator { func NewCoordinator(config *viper.Viper, errCh chan error) *ReactorCoordinator {
// coord
c := &Coordinator{Err: ch, Config: config}
c.DeviceCoordinator = NewDeviceCoordinator()
// hb defaults to 5
c.HB = time.Duration(5 * time.Second)
// this is going to be scuffed m := NewManager(6) // max 6 attempts
//c.DB = &DB{Bucket: "bb", Org: "ForeLight", URL: url, Token: "S1UZssBu6KPfHaQCt34pZFpyc5lzbH9XanYJWCkOI5FqLY7gq205C6FTH-CmugiPH6o2WoKlTkEuPgIfaJjAhw=="} dc := NewDeviceCoordinator(config)
// setup db
var err error
if c.Database, err = NewDBClient(config); err != nil {
ch <- err
}
if c.I2C, err = NewI2CClient(config); err != nil { c := &ReactorCoordinator{
ch <- err Manager: m,
Config: config,
DeviceCoordinator: dc,
Err: errCh,
} }
c.PingTimer = make(chan struct{}) // this is going to be scuffed
//c.DB = &DB{Bucket: "bb", Org: "ForeLight", URL: url, Token: "S1UZssBu6KPfHaQCt34pZFpyc5lzbH9XanYJWCkOI5FqLY7gq205C6FTH-CmugiPH6o2WoKlTkEuPgIfaJjAhw=="}
return c return c
} }
func (c *Coordinator) Start() { func (c *ReactorCoordinator) Start() {
// should discover hwinfo and sensors on its own // should discover hwinfo and sensors on its own
// now setting up sensor managers // now setting up sensor managers
c.Activate() var err error
// load hwinfo
if err := c.LoadInfo(); err != nil { // loads info if err = c.Manager.Start(); err != nil {
c.Err <- err c.Err <- err
} }
// grab config stuff // load config
c.Config.UnmarshalKey("reactor", c) if err = c.LoadConfig(); err != nil { // loads info
go c.Monitor() c.Err <- err
}
// loading clients
if c.Database, err = NewDBClient(c.Config); err != nil {
c.Err <- err
}
if c.I2C, err = NewI2CClient(c.Config); err != nil {
c.Err <- err
}
go c.Discover() go c.Discover()
go c.Database.Start() go c.Database.Start()
} }
func (c *Coordinator) LoadInfo() error { func (c *ReactorCoordinator) LoadConfig() error {
// check ID
var err error var err error
// get hb
if !c.Config.IsSet("reactor.heartbeat") {
// default to 5 seconds
c.Config.Set("reactor.heartbeat", 5)
}
// check id
if !c.Config.IsSet("reactor.id") { if !c.Config.IsSet("reactor.id") {
// get id // get from hw
var id int var id int
if id, err = system.GetId("eth0"); err != nil { if id, err = system.GetId("eth0"); err != nil {
return err return err
@ -131,42 +146,39 @@ func (c *Coordinator) LoadInfo() error {
// check Model // check Model
if !c.Config.IsSet("reactor.model") { if !c.Config.IsSet("reactor.model") {
// get model // get from hw
var model string var model string
if model, err = system.GetModel(); err != nil { if model, err = system.GetModel(); err != nil {
return err return err
} }
c.Config.Set("reactor.model", model) c.Config.Set("reactor.model", model)
} }
// all good
// all good, unmarhsaling
c.Config.UnmarshalKey("reactor", c)
return err return err
} }
func (c *Coordinator) Monitor() { func (c *ReactorCoordinator) Monitor() {
// periodically grabs connected devs and updates list // periodically grabs connected devs and updates list
for c.IsActive() { ch := make(chan struct{})
select { go c.HeartBeat(ch, c.HB, 0, time.Second)
case <-c.PingTimer:
// check devs and ping
active, err := c.I2C.GetConnected()
if err != nil {
c.Err <- err
}
go c.UpdateDevices(c.Config, c.I2C, active)
go c.Ping()
}
}
}
func (c *Coordinator) HeartBeat() { for range ch {
for c.IsActive() { // check devs and ping
c.PingTimer <- struct{}{}
logging.Debug(logging.DClient, "RLC Pinging server") logging.Debug(logging.DClient, "RLC Pinging server")
time.Sleep(c.HB) // this can probably be offloaded
active, err := c.I2C.GetConnected()
if err != nil {
c.Err <- err
}
go c.UpdateDevices(c.Config, c.I2C, active)
go c.Ping()
} }
} }
func (c *Coordinator) Discover() { func (c *ReactorCoordinator) Discover() {
// sets up connection to central coordiantor // sets up connection to central coordiantor
conn, err := c.Connect(c.Ip, c.Port) conn, err := c.Connect(c.Ip, c.Port)
if err != nil { if err != nil {
@ -187,11 +199,12 @@ func (c *Coordinator) Discover() {
c.Err <- err c.Err <- err
} }
c.MonitoringClient = pb.NewMonitoringClient(clientConn) c.MonitoringClient = pb.NewMonitoringClient(clientConn)
go c.HeartBeat() // manager
go c.Monitor()
} }
func (c *Coordinator) Connect(ip string, port int) (*grpc.ClientConn, error) { func (c *ReactorCoordinator) Connect(ip string, port int) (*grpc.ClientConn, error) {
// function connects to central server and passes hwinfo // function connects to central server and passes hwinfo
var opts []grpc.DialOption var opts []grpc.DialOption
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
@ -202,13 +215,13 @@ func (c *Coordinator) Connect(ip string, port int) (*grpc.ClientConn, error) {
code := status.Code(err) code := status.Code(err)
if code != 0 { // != OK if code != 0 { // != OK
if code == (5 | 14) { // service temp down if code == (5 | 14) { // service temp down
to := c.Timeout() var to time.Duration
if to == 0 { if to, err = c.Timeout(); err != nil {
err = errors.New("Failed to connect to central server") // from manager
return &grpc.ClientConn{}, err return &grpc.ClientConn{}, err
} }
logging.Debug(logging.DClient, "Server currently unavailable, retrying in %v ms", to) logging.Debug(logging.DClient, "Server currently unavailable, retrying in %v", to)
time.Sleep(time.Duration(to) * time.Millisecond) time.Sleep(to)
} else { } else {
return &grpc.ClientConn{}, err return &grpc.ClientConn{}, err
} }
@ -217,48 +230,3 @@ func (c *Coordinator) Connect(ip string, port int) (*grpc.ClientConn, error) {
} }
return conn, nil return conn, nil
} }
func (c *Coordinator) Timeout() int {
c.Active.Lock()
defer c.Active.Unlock()
if c.Active.int < 9 {
v := int(5 * math.Pow(float64(2), float64(c.Active.int)))
c.Active.int += 1
return v
} else {
//excedded retries
return 0
}
}
func (c *Coordinator) IsActive() bool {
c.Active.Lock()
defer c.Active.Unlock()
return c.Active.bool
}
func (c *Coordinator) Exit() bool {
c.Active.Lock()
defer c.Active.Unlock()
if c.Active.bool {
c.Active.bool = false
logging.Debug(logging.DClient, "RLC Exiting...")
return true
} else {
logging.Debug(logging.DError, "RLC Already Dead!")
return false
}
}
func (c *Coordinator) Activate() bool {
c.Active.Lock()
defer c.Active.Unlock()
if c.Active.bool {
logging.Debug(logging.DError, "RLC Already Started!")
return false
} else {
logging.Debug(logging.DClient, "RLC Starting")
c.Active.bool = true
return c.Active.bool
}
}

@ -3,8 +3,6 @@ package sensor
import ( import (
"errors" "errors"
"time" "time"
"github.com/spf13/viper"
) )
// atlas helpers to aid with sensors // atlas helpers to aid with sensors
@ -37,20 +35,24 @@ func (a *Atlas) Calibrate(addr int, cal string) error {
return err return err
} }
func (a *Atlas) Read(addr int) (string, error) { func (a *Atlas) Read(addr int32) (float32, error) {
// take reading function // take reading function
if _, err := a.I2C.SendCmd(addr, "R"); err != nil { if _, err := a.I2C.SendCmd(int(addr), "R"); err != nil {
// read command // read command
//fmt.Printf("Error writing! %s", err) //fmt.Printf("Error writing! %s", err)
return "", err return 0, err
} }
sleep := time.Duration(a.ReadDelay) * time.Millisecond sleep := time.Duration(a.ReadDelay) * time.Millisecond
time.Sleep(sleep) // sleep between reads time.Sleep(sleep) // sleep between reads
data, err := a.I2C.SendCmd(addr, "") //data, err := a.I2C.SendCmd(addr, "")
return data, err return 0, nil
}
// for config
func (a *Atlas) GetCalDelay() int {
return a.CalDelay
} }
// config stuff func (a *Atlas) GetReadDelay() int {
func (a *Atlas) GetDelay(config *viper.Viper, key string) error { return a.ReadDelay
return config.UnmarshalKey(key, a)
} }

@ -5,26 +5,42 @@ package sensor
import ( import (
pb "FRMS/internal/pkg/grpc" pb "FRMS/internal/pkg/grpc"
"sync" "sync"
"github.com/spf13/viper"
) )
type DOSensorManager struct { type DOManager struct {
// do sensor manager // do sensor manager
*Manager *Atlas // atlas helper
*SensorManager `mapstructure:",squash"`
sync.RWMutex sync.RWMutex
} }
func (s *DOSensorManager) GetName() string { func NewDOManager(device *pb.Device) *DOManager {
return s.Manager.GetName("DO Sensor") a := &Atlas{}
sm := NewSensorManager(device, a.Read)
m := &DOManager{
SensorManager: sm,
Atlas: a,
}
return m
}
func (m *DOManager) Start() error {
// start sm
if err := m.SensorManager.Start(); err != nil {
return err
}
// start taking samples
go m.Monitor()
return nil
} }
func (s *DOSensorManager) Update(sensor *pb.Device, config *viper.Viper) error { func (m *DOManager) GetDefaultName() string {
// updates info return "DO Sensor"
return s.Manager.Update(sensor)
} }
func (s *DOSensorManager) String() string { func (m *DOManager) String() string {
// basic // TODO
return s.Manager.String() return ""
} }

@ -2,195 +2,75 @@ package sensor
import ( import (
pb "FRMS/internal/pkg/grpc" pb "FRMS/internal/pkg/grpc"
"errors" "FRMS/internal/pkg/manager"
"fmt"
"math/rand"
"sync" "sync"
"time" "time"
) )
// base device manager type Manager interface {
Start() error
type Manager struct { Exit() error
// base dm HeartBeat(chan struct{}, int, int, time.Duration)
Device *pb.Device // for sending/updating
*Atlas
*Active // I might need a thing here
ReadTimer chan struct{} // time reads
sync.RWMutex
}
type Active struct {
bool
int
sync.Mutex
}
func NewManager(atlas *Atlas, device *pb.Device) *Manager {
rt := make(chan struct{})
active := &Active{}
return &Manager{ReadTimer: rt, Device: device, Atlas: atlas, Active: active}
}
func (m *Manager) SetI2C(i2c I2CClient) error {
m.Atlas.I2C = i2c
return nil
}
func (m *Manager) GetName(basic string) string {
m.RLock()
defer m.RUnlock()
// basic is the default if there isnt a name set
if m.Device.Name == "" {
return basic
}
return m.Device.Name
}
func (m *Manager) SetName(name string) error {
m.Lock()
defer m.Unlock()
m.Device.Name = name
return nil
}
func (m *Manager) GetStatus() int {
m.RLock()
defer m.RUnlock()
return int(m.Device.Status)
}
func (m *Manager) SetStatus(status int) error {
// updates status
m.Lock()
defer m.Unlock()
m.Device.Status = pb.Status(status)
return nil
}
func (m *Manager) GetAddr() int {
m.RLock()
defer m.RUnlock()
return int(m.Device.Addr)
}
func (m *Manager) SetData(data map[string]string) error {
m.Lock()
defer m.Unlock()
m.Device.Data = data
return nil
}
func (m *Manager) SetParameter(key, val string) error {
// #TODO
return errors.New("UNIMPL")
}
func (m *Manager) GetParameter(key string) (string, error) {
// #TODO
return "", errors.New("UNIMPL")
}
func (m *Manager) Update(device *pb.Device) error {
// updates info
m.Lock()
defer m.Unlock()
m.Device = device
return nil
} }
func (m *Manager) GetInfo() (*pb.Device, error) { func NewManager(max int) Manager {
m.RLock() return manager.New(max)
defer m.RUnlock()
return m.Device, nil
} }
func (m Manager) String() string { type Reading struct {
// basic printout sync.RWMutex
str := fmt.Sprintf("%s is %s at %d (%x). Reading: %s", m.Device.Name, m.Device.Status.String(), m.Device.Addr, m.Device.Addr, m.Device.Data["rip"]) Latest float32
return str New func(int32) (float32, error)
} }
// manager funcs to start/stop type SensorManager struct {
SampleRate int `mapstructure:"samplerate"` // in (ms)
Name string `mapstructure:"name"`
Manager
func (m *Manager) Start() error { *pb.Device
// goal is to start a long running monitoring routine *Reading
if !m.Activate() {
return errors.New("Manager already running!")
} // atomically activated if this runs
fmt.Println("Manager starting")
go m.Monitor()
go m.HeartBeat()
return nil
} }
func (m *Manager) Exit() error { func NewSensorManager(device *pb.Device, f func(int32) (float32, error)) *SensorManager {
if !m.Deactivate() { m := NewManager(0) // no timeout
return errors.New("Manager already exited!") r := &Reading{New: f}
s := &SensorManager{
Manager: m,
Reading: r,
Device: device,
} }
return nil return s
} }
func (a *Active) Activate() bool { func (s *SensorManager) GetDevice() *pb.Device {
// returns true if success, false otherwise return s.Device
a.Lock()
defer a.Unlock()
if a.bool { // already active
return false
} else {
a.bool = true
a.int = 0
return a.bool
}
} }
func (a *Active) Deactivate() bool { func (s *SensorManager) GetName() string {
// returns true if success false otherise return s.Name
a.Lock()
defer a.Unlock()
if a.bool {
a.bool = false
return true
} else { // already deactivated
return a.bool // false
}
} }
func (a *Active) IsActive() bool { func (s *SensorManager) Monitor() {
a.Lock() ch := make(chan struct{}) // hb chan
defer a.Unlock() go s.HeartBeat(ch, s.SampleRate, 1000, time.Millisecond)
return a.bool
}
// reads for range ch {
go s.TakeReading(s.Device.GetAddr())
func (m *Manager) Monitor() {
for m.IsActive() {
select {
case <-m.ReadTimer:
// perform read
go m.ReadData()
}
} }
} }
func (m *Manager) HeartBeat() { func (r *Reading) TakeReading(addr int32) {
for m.IsActive() { sample, err := r.New(addr)
m.ReadTimer <- struct{}{} if err != nil {
rand_sleep := rand.Intn(2000) + 4000 panic(err)
time.Sleep(time.Duration(rand_sleep) * time.Millisecond) // 4000 - 5000 millisecond sleep intervali
} }
r.Lock()
defer r.Unlock()
r.Latest = sample
} }
func (m *Manager) ReadData() { func (s *SensorManager) UpdateDevice(device *pb.Device) error {
// perform I2C read via atlas helper and update data! s.Device = device
var err error return nil
var data string
data, err = m.Atlas.Read(int(m.GetAddr()))
//fmt.Println(data)
d := map[string]string{"rip": data}
if err = m.SetData(d); err != nil {
panic(err)
}
} }

@ -1,56 +0,0 @@
package sensor
import (
pb "FRMS/internal/pkg/grpc"
"errors"
"fmt"
"github.com/spf13/viper"
)
/*
Returns the correct manager for sensor/device
*/
type DeviceManager interface {
// basic device stuff
Start() error
Exit() error
SetI2C(I2CClient) error // for interface
GetName() string
SetName(string) error // change displayed name
GetDelay(*viper.Viper, string) error
GetStatus() int
SetStatus(int) error // status change
SetParameter(string, string) error // key, val
GetParameter(string) (string, error) // key, val
Update(*pb.Device, *viper.Viper) error // write updates
GetInfo() (*pb.Device, error) // gets info
String() string // printable
}
func NewDeviceManager(device *pb.Device) (DeviceManager, error) {
// returns correct device manager by ID
atlas := &Atlas{}
m := NewManager(atlas, device)
var dm DeviceManager
var err error
switch id := device.GetAddr(); id {
case 97:
// DO
dm = &DOSensorManager{Manager: m}
case 99:
// pH
dm = &PHSensorManager{Manager: m}
case 102:
// RTD
dm = &RTDSensorManager{Manager: m}
case 256:
// PWM
dm = &PWMDeviceManager{Manager: m}
default:
err = errors.New(fmt.Sprintf("Error: device id %v unrecognized!", id))
}
return dm, err
}

@ -5,26 +5,31 @@ package sensor
import ( import (
pb "FRMS/internal/pkg/grpc" pb "FRMS/internal/pkg/grpc"
"sync" "sync"
"github.com/spf13/viper"
) )
type PHSensorManager struct { type PHManager struct {
// do sensor manager // do sensor manager
*Manager *Atlas
*SensorManager `mapstructure:",squash"`
sync.RWMutex sync.RWMutex
} }
func (s *PHSensorManager) GetName() string { func NewPHManager(device *pb.Device) *PHManager {
return s.Manager.GetName("pH Sensor") a := &Atlas{}
sm := NewSensorManager(device, a.Read)
m := &PHManager{
SensorManager: sm,
Atlas: a,
}
return m
} }
func (s *PHSensorManager) Update(sensor *pb.Device, config *viper.Viper) error { func (s *PHManager) GetDefaultName() string {
// updates info return "pH Sensor"
return s.Manager.Update(sensor)
} }
func (s PHSensorManager) String() string { func (s PHManager) String() string {
// basic // TODO
return s.Manager.String() return ""
} }

@ -1,36 +0,0 @@
package sensor
// do sensor and methods
import (
pb "FRMS/internal/pkg/grpc"
"errors"
"sync"
"github.com/spf13/viper"
)
type PWMDeviceManager struct {
// do sensor manager
*Manager
sync.RWMutex
}
func (s *PWMDeviceManager) GetName() string {
return s.Manager.GetName("PWM")
}
func (s *PWMDeviceManager) SetParameter(key, val string) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *PWMDeviceManager) Update(device *pb.Device, config *viper.Viper) error {
// updates info
return s.Manager.Update(device)
}
func (s *PWMDeviceManager) String() string {
// basic
return s.Manager.String()
}

@ -5,26 +5,30 @@ package sensor
import ( import (
pb "FRMS/internal/pkg/grpc" pb "FRMS/internal/pkg/grpc"
"sync" "sync"
"github.com/spf13/viper"
) )
type RTDSensorManager struct { type RTDManager struct {
// do sensor manager // do sensor manager
*Manager *Atlas
*SensorManager `mapstructure:",squash"`
sync.RWMutex sync.RWMutex
} }
func (s *RTDSensorManager) GetName() string { func NewRTDManager(device *pb.Device) *RTDManager {
return s.Manager.GetName("RTD Sensor") a := &Atlas{}
sm := NewSensorManager(device, a.Read)
m := &RTDManager{
SensorManager: sm,
Atlas: a,
}
return m
} }
func (s *RTDSensorManager) Update(sensor *pb.Device, config *viper.Viper) error { func (s *RTDManager) GetDefaultName() string {
// updates info return "RTD Sensor"
return s.Manager.Update(sensor)
} }
func (s *RTDSensorManager) String() string { func (s *RTDManager) String() string {
// basic info // TODO
return s.Manager.String() return ""
} }

@ -133,32 +133,36 @@ func (c *ReactorCoordinator) Start() error {
func (c *ReactorCoordinator) ClientHandler(cl *Client) { func (c *ReactorCoordinator) ClientHandler(cl *Client) {
// updates clients if nessecary // updates clients if nessecary
if err := c.UpdateManager(cl, c.Err); err != nil { if err := c.UpdateReactorManager(cl, c.Err); err != nil {
c.Err <- err c.Err <- err
} }
} }
func (m *ReactorManagers) GetManager(id int) (*ReactorManager, error) { func (m *ReactorManagers) GetReactorManager(id int) (*ReactorManager, error) {
m.RLock() m.RLock()
defer m.RUnlock() defer m.RUnlock()
rm, exists := m.Directory[id] rm, exists := m.Directory[id]
if !exists { if !exists {
return &ReactorManager{}, errors.New(fmt.Sprintf("No manager for reactor %d!", id)) return &ReactorManager{ID: id}, errors.New(fmt.Sprintf("No manager for reactor %d!", id))
} }
return rm, nil return rm, nil
} }
func (m *ReactorManagers) UpdateManager(cl *Client, errCh chan error) error { func (m *ReactorManagers) UpdateReactorManager(cl *Client, errCh chan error) error {
// locking // locking
m.RLock() m.RLock()
defer m.RUnlock() defer m.RUnlock()
var err error
rm, exists := m.Directory[cl.Id] rm, exists := m.Directory[cl.Id]
if !exists { if !exists {
logging.Debug(logging.DClient, "RCO 01 starting manager for reactor client %v", cl.Id) logging.Debug(logging.DClient, "RCO creating manager for reactor client %v", cl.Id)
// creating
rm = NewReactorManager(errCh) rm = NewReactorManager(errCh)
if err := rm.Start(); err != nil { // starting
if err = rm.Start(); err != nil {
return err return err
} }
m.Directory[cl.Id] = rm m.Directory[cl.Id] = rm
@ -179,7 +183,7 @@ func (r *ReactorCoordinator) Register() error {
} }
func (r *ReactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { func (r *ReactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) {
rm, err := r.GetManager(int(req.GetId())) rm, err := r.GetReactorManager(int(req.GetId()))
// error checking // error checking
if err != nil { if err != nil {
return &pb.ReactorStatusResponse{}, err return &pb.ReactorStatusResponse{}, err

@ -4,8 +4,6 @@ import (
//"log" //"log"
"FRMS/internal/pkg/logging" "FRMS/internal/pkg/logging"
_ "context" _ "context"
"errors"
"math"
"sync" "sync"
"time" "time"
) )
@ -13,107 +11,21 @@ import (
// will condense into the rm soon enough // will condense into the rm soon enough
// manager connects to client on start and returns the gRPC connection to make gRPC clients // manager connects to client on start and returns the gRPC connection to make gRPC clients
type Manager struct { type ClientManager struct {
*Client // gives access to c.Ip c.Id etc *Client // gives access to c.Ip c.Id etc
Hb time.Duration // used for managing hb timer for client Hb time.Duration // used for managing hb timer for client
Active active
Sig chan bool Sig chan bool
Err chan error
}
type active struct {
sync.Mutex sync.Mutex
bool
int
}
func NewManager(err chan error) *Manager {
hb := time.Duration(5 * time.Second) //hb to
m := &Manager{Hb: hb, Err: err}
return m
} }
func (m *Manager) Start() error { func NewClientManager(cl *Client) *ClientManager {
if !m.Activate() { return &ClientManager{Client: cl}
// manager already running
return errors.New("Manager already running!")
} // if we get here, manager is atomically activated and we can ensure start wont run again
return nil
} }
func (m *Manager) Exit() { func (m *ClientManager) UpdateClient(cl *Client) error {
// exit function to eventually allow saving to configs m.Lock()
if !m.Deactivate() { defer m.Unlock()
m.Err <- errors.New("Manager already disabled!")
}
}
func (m *Manager) UpdateClient(cl *Client) error {
logging.Debug(logging.DClient, "MAN Updating client %v", cl.Id) logging.Debug(logging.DClient, "MAN Updating client %v", cl.Id)
m.Client = cl m.Client = cl
return nil return nil
} }
// reactor manager atomic operations
func (m *Manager) IsActive() bool {
m.Active.Lock()
defer m.Active.Unlock()
return m.Active.bool
}
func (m *Manager) Activate() bool {
// slightly confusing but returns result of trying to activate
m.Active.Lock()
defer m.Active.Unlock()
alive := m.Active.bool
if alive {
return false
} else {
m.Active.bool = true
m.Active.int = 0
return m.Active.bool
}
}
func (m *Manager) Deactivate() bool {
// result of trying to deactivate
m.Active.Lock()
defer m.Active.Unlock()
alive := m.Active.bool
if alive {
m.Active.bool = false
return true
} else {
return m.Active.bool
}
}
// connection stuff
func (m *Manager) Timeout() int {
// keeps track of and generates timeout [0-1.2s) over span of ~2.5s
// returns 0 on TO elapse
m.Active.Lock()
defer m.Active.Unlock()
if m.Active.int < 9 {
v := int(5 * math.Pow(float64(2), float64(m.Active.int)))
m.Active.int += 1
return v
} else {
// exceeded retries
return 0
}
}
/*
shouldnt be nessecary anymore
func (m *Manager) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) {
return &pb.GetDevicesResponse{}, errors.New("Get Devices not implemented!")
}
func (m *Manager) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) {
return &pb.ReactorStatusResponse{}, errors.New("Reactor Status Handler not implemented!")
}
*/

@ -1,9 +1,13 @@
package server package server
import ( import (
"FRMS/internal/pkg/device"
pb "FRMS/internal/pkg/grpc" pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/logging" "FRMS/internal/pkg/logging"
"FRMS/internal/pkg/sensor" "FRMS/internal/pkg/manager"
"time"
//"FRMS/internal/pkg/device"
"context" "context"
"fmt" "fmt"
_ "log" _ "log"
@ -14,38 +18,58 @@ import (
// this package will implement a reactor manager and associated go routines // this package will implement a reactor manager and associated go routines
type Manager interface {
Start() error // status checks
Exit() error
Timeout() (time.Duration, error) // TO Generator
}
func NewManager(max int) Manager {
// takes a heartbeat and max connection attempts
return manager.New(max)
}
type ReactorManager struct { type ReactorManager struct {
*Manager Manager // base manager interface
*ClientManager // client manager (OUTDATED)
// StatusMon *StatusMonitor putting on pause // StatusMon *StatusMonitor putting on pause
*ReactorDevices *ReactorDevices
Config *viper.Viper // config to update Config *viper.Viper // config to update
ID int
Err chan error
} }
type ReactorDevices struct { type ReactorDevices struct {
// device struct // device struct
ID int
Devices map[int]DeviceManager Devices map[int]DeviceManager
sync.RWMutex sync.RWMutex
} }
func NewReactorManager(err chan error) *ReactorManager { func NewReactorManager(errCh chan error) *ReactorManager {
r := &ReactorManager{} // making managers
// sub managers m := NewManager(6)
dm := make(map[int]DeviceManager) dm := make(map[int]DeviceManager)
r.ReactorDevices = &ReactorDevices{Devices: dm} rd := &ReactorDevices{Devices: dm}
// core manager r := &ReactorManager{
r.Manager = NewManager(err) Manager: m,
ReactorDevices: rd,
Err: errCh,
}
return r return r
} }
func (r *ReactorManager) Start() error { func (r *ReactorManager) Start() error {
// allows for extra stuff
logging.Debug(logging.DStart, "RMA %v starting", r.Id)
return r.Manager.Start() return r.Manager.Start()
//go r.StatusMon.Send(&DeviceInfo{Id: r.Id, Type: "Reactor", Status: "[green]ONLINE[white]"}, "Reactor") //go r.StatusMon.Send(&DeviceInfo{Id: r.Id, Type: "Reactor", Status: "[green]ONLINE[white]"}, "Reactor")
} }
func (r *ReactorManager) Exit() { func (r *ReactorManager) Exit() error {
r.Manager.Exit() // allows for extra stuff
logging.Debug(logging.DExit, "RMA %v exiting", r.Id) logging.Debug(logging.DExit, "RMA %v exiting", r.Id)
return r.Manager.Exit()
//go r.StatusMon.Send(&DeviceInfo{Id: r.Id, Type: "Reactor", Status: "[red]OFFLINE[white]", Data: fmt.Sprintf("Last Seen %v", time.Now().Format("Mon at 03:04:05pm MST"))}, "Reactor") //go r.StatusMon.Send(&DeviceInfo{Id: r.Id, Type: "Reactor", Status: "[red]OFFLINE[white]", Data: fmt.Sprintf("Last Seen %v", time.Now().Format("Mon at 03:04:05pm MST"))}, "Reactor")
} }
@ -62,16 +86,14 @@ func (r *ReactorManager) ReactorStatusHandler(ctx context.Context, req *pb.React
// device stuff // device stuff
type DeviceManager interface { type DeviceManager interface {
SetName(string) error // change name LoadConfig(string) error
GetParameter(string) (string, error) // key ret val UpdateDevice(*pb.Device) error
SetParameter(string, string) error // key, val String() string // printing
Update(*pb.Device, *viper.Viper) error // write updates
String() string // printing
} }
func NewDeviceManager(device *pb.Device) (DeviceManager, error) { func NewDeviceManager(dev *pb.Device, config *viper.Viper) (DeviceManager, error) {
// returns a manager struct // returns a manager struct
return sensor.NewDeviceManager(device) return device.NewDeviceManager(dev, config)
} }
func (r *ReactorManager) UpdateDevices(devs []*pb.Device) { func (r *ReactorManager) UpdateDevices(devs []*pb.Device) {
@ -83,21 +105,24 @@ func (r *ReactorManager) UpdateDevices(devs []*pb.Device) {
// looping over devs // looping over devs
if dm, ok := r.ReactorDevices.Devices[int(dev.GetAddr())]; ok { if dm, ok := r.ReactorDevices.Devices[int(dev.GetAddr())]; ok {
// device manager found // device manager found
go dm.Update(dev, r.Config) // update dm go dm.UpdateDevice(dev)
fmt.Println(dm) //fmt.Println(dm)
} else { } else {
// not found // not found
go r.AddDevice(dev, r.Err) go r.AddDevice(dev, r.ID, r.Config, r.Err)
} }
} }
} }
func (r *ReactorDevices) AddDevice(dev *pb.Device, errCh chan error) { func (r *ReactorDevices) AddDevice(dev *pb.Device, id int, config *viper.Viper, errCh chan error) {
r.Lock() // write lock r.Lock() // write lock
defer r.Unlock() defer r.Unlock()
dm, err := NewDeviceManager(dev) dm, err := NewDeviceManager(dev, config)
if err != nil { if err != nil {
errCh <- err errCh <- err
} }
if err := dm.LoadConfig(fmt.Sprintf("reactors.%d", id)); err != nil {
errCh <- err
}
r.Devices[int(dev.GetAddr())] = dm r.Devices[int(dev.GetAddr())] = dm
} }

Loading…
Cancel
Save