working base to play with. Ended up combining sensors/devs with a key value store. Need to compile GRPC again...

main
KeeganForelight 2 years ago
parent e64efad3fc
commit d8ae5d89b4

BIN
debian

Binary file not shown.

@ -16,4 +16,3 @@ server:
lis: 2022
reactor: 2023
tui: 2024
server_name: ""

@ -3,35 +3,49 @@ package I2C
// file has general wrappers to interact with i2c-tools
import (
"FRMS/internal/pkg/logging"
"FRMS/internal/pkg/system"
"bytes"
"errors"
"fmt"
_ "log"
"encoding/hex"
"os/exec"
"bytes"
"strconv"
"strings"
"sync"
"strconv"
"FRMS/internal/pkg/logging"
"github.com/spf13/viper"
)
type I2CBus struct {
int
type I2CClient struct {
Bus int `mapstructure:"bus"`
sync.Mutex
}
func NewBus(bus int) *I2CBus {
b := &I2CBus{}
b.int = bus
return b
func NewI2CClient(config *viper.Viper) (*I2CClient, error) {
var err error
var bus int
client := &I2CClient{}
if !config.IsSet("i2c.bus") {
// no bus
if bus, err = system.GetBus(); err != nil {
return client, err
}
config.Set("i2c.bus", bus)
}
err = config.UnmarshalKey("i2c", client)
return client, err
}
func (b *I2CBus) Scan() map[int]bool {
func (b *I2CClient) GetConnected() (map[int]bool, error) {
/*
Returns all the connected devices
Returns all the connected devices by address
I can def improve this
*/
b.Lock()
defer b.Unlock()
bus := strconv.Itoa(b.int)
devices := make(map[int]bool) // only keys
bus := strconv.Itoa(b.Bus)
cmd := exec.Command("i2cdetect", "-y", "-r", bus)
var out bytes.Buffer
var errs bytes.Buffer
@ -39,93 +53,59 @@ func (b *I2CBus) Scan() map[int]bool {
cmd.Stdout = &out
if err := cmd.Run(); err != nil {
logging.Debug(logging.DError, "I2C error performing scan. %v", errs.String())
return devices, err
}
outString := out.String()
// could split by \n too
split := strings.SplitAfter(outString,":")
split := strings.SplitAfter(outString, ":")
// 1st entry is garbage headers and ending is always \n##:
split = split[1:]
// create empty slice for all the devices
//var devices []i2cdev
devices := map[int]bool{} //maps device addresses to active bool
for i,v := range split {
lst := strings.Index(v,"\n")
for i, v := range split {
lst := strings.Index(v, "\n")
trimmed := v[:lst]
trimmed = strings.Trim(trimmed," ")
trimmed = strings.Trim(trimmed, " ")
// trimmed now holds just possible sensor addresses
count := strings.Split(trimmed," ")
for j,d := range count {
count := strings.Split(trimmed, " ")
for j, d := range count {
// the first row has to be offset by 3 but after its just i*16 + j
offset := 0
if i == 0 {
offset = 3
}
addr := i*16 + j + offset
if strings.Contains(d,"--") || strings.Contains(d,"UU") {
// address is unconnected or reserved
//devices = append(devices, I2Cdev{Addr:addr,Active:false})
devices[addr] = false
} else {
//devices = append(devices, I2Cdev{Addr:addr,Active:true,LastSeen:now})
if !strings.Contains(d, "--") && !strings.Contains(d, "UU") {
// active
fmt.Printf("Found %d(%x)\n", addr, addr)
devices[addr] = true
}
}
}
return devices
}
func (b *I2CBus) GetStatus(addr int) bool {
b.Lock()
defer b.Unlock()
bus := strconv.Itoa(b.int)
a := strconv.Itoa(addr)
cmd := exec.Command("i2cdetect","-y","-r",bus,a,a)
var out bytes.Buffer
var errs bytes.Buffer
cmd.Stderr = &errs
cmd.Stdout = &out
if err := cmd.Run(); err != nil {
logging.Debug(logging.DError,"I2C error getting status! %v", errs.String())
}
outString := out.String()
split := strings.SplitAfter(outString,":")
split = split[1:] // remove garbage header
val := int(addr/16) // if addr = 90 90/16 = int(5.6) = 5 will be in 5th row
dev := split[val]
lst := strings.Index(dev,"\n")
dev = dev[:lst]
trimmed := strings.Trim(dev," \n")
if strings.Contains(trimmed,"--") {
return false
} else {
return true
}
return devices, nil
}
func (b *I2CBus) GetData(addr int) string {
func (b *I2CClient) SendCmd(addr int, cmd string) (string, error) {
b.Lock()
defer b.Unlock()
/*
bus := strconv.Itoa(b.int)
a := strconv.FormatInt(int64(addr),16)
cmd := exec.Command("i2ctransfer","-y",bus,fmt.Sprintf("r40@0x%s",a))
a := strconv.FormatInt(int64(addr), 16)
cmd := exec.Command("i2ctransfer", "-y", bus, fmt.Sprintf("r40@0x%s", a))
var out bytes.Buffer
var errs bytes.Buffer
cmd.Stderr = &errs
cmd.Stdout = &out
if err := cmd.Run(); err != nil {
logging.Debug(logging.DError,"I2C error getting data! %v", errs.String())
logging.Debug(logging.DError, "I2C error getting data! %v", errs.String())
}
outString := out.String()
split := strings.SplitAfter(outString," ") //getting chars 0x12 0x2f etc
split := strings.SplitAfter(outString, " ") //getting chars 0x12 0x2f etc
var final string
for _,v := range split {
for _, v := range split {
trimmed := strings.TrimLeft(v, "0x ") // trimming extra bs in front of num
trimmed = strings.TrimRight(trimmed," \n") // trimming back
trimmed = strings.TrimRight(trimmed, " \n") // trimming back
if trimmed != "ff" {
final += trimmed
}
@ -135,4 +115,6 @@ func (b *I2CBus) GetData(addr int) string {
panic(err)
}
return string(ret)
*/
return "", errors.New("NOT IMPLM")
}

@ -1,72 +0,0 @@
package I2C
import (
"fmt"
"sync"
"time"
)
type I2CDevice struct {
*I2CBus // embeds bus
bool // stores whether dev is currently connected
int // addr
Data *data
}
type data struct {
string
bool
sync.Mutex
}
func (d I2CDevice) String() string {
t := map[int]string{97: "DO Sensor", 99: "pH Sensor", 102: "Temperature Sensor", 64: "DHT11 Sensor"}
return t[d.int]
}
func NewDevice(addr int, bus *I2CBus) *I2CDevice {
d := &I2CDevice{}
d.I2CBus = bus
d.int = addr
d.Data = &data{}
return d
}
func (d *I2CDevice) GetAddr() int {
return d.int
}
func (d *I2CDevice) GetStatus() bool {
// TODO
return d.I2CBus.GetStatus(d.int)
}
func (d *I2CDevice) GetType() string {
// TODO
return fmt.Sprint(d)
}
func (d *I2CDevice) GetData() string {
d.Data.Lock()
defer d.Data.Unlock()
d.Data.string = d.I2CBus.GetData(d.int)
return d.Data.string
}
func (d *data) Active() {
d.Lock()
defer d.Unlock()
if !d.bool {
d.string = ""
d.bool = true
}
}
func (d *data) Killed() {
d.Lock()
defer d.Unlock()
if d.bool {
d.string = time.Now().Format("Mon at 03:04:05pm MST")
d.bool = false
}
}

@ -1,102 +0,0 @@
package I2C
import (
_ "fmt"
"sync"
"time"
)
/*
i2c monitor implements a long running monitor responsible for sending active devices to the rlc
*/
type I2CMonitor struct {
*I2CBus
Devices *devs
DevChan chan int
}
type devs struct {
sync.Mutex
m map[int]*I2CDevice
}
func NewMonitor(bus int, ch chan int) *I2CMonitor {
m := &I2CMonitor{}
b := NewBus(bus)
m.I2CBus = b
d := make(map[int]*I2CDevice)
m.Devices = &devs{m: d}
m.DevChan = ch
return m
}
func (m *I2CMonitor) Update() {
/*
scans bus and adds new active devices
*/
devs := m.Scan()
chng := m.Devices.Parse(m.I2CBus, devs)
for _, d := range chng {
go m.ConnectDevice(d)
}
}
func (m *I2CMonitor) Monitor() {
// functon that updates the device list and notifies rlc of any changes to sensor composition
s := make(chan struct{})
t := 5 * time.Second
go func(signal chan struct{}, to time.Duration) { // simple signal func to init scan
for {
signal <- struct{}{}
time.Sleep(to)
}
}(s, t)
for {
<-s
m.Update()
}
}
func (m *I2CMonitor) ConnectDevice(addr int) {
m.DevChan <- addr
}
func (m *I2CMonitor) GetDevice(addr int) interface {
GetAddr() int
GetData() string
GetStatus() bool
GetType() string
} {
m.Devices.Lock()
defer m.Devices.Unlock()
return m.Devices.m[addr]
}
func (d *devs) Parse(bus *I2CBus, devices map[int]bool) []int {
d.Lock()
defer d.Unlock()
newdevs := []int{}
for addr, status := range devices {
if dev, exists := d.m[addr]; exists {
// device seen
if status != dev.bool { // if device state changed
dev.bool = status
if status {
newdevs = append(newdevs, dev.GetAddr())
}
}
} else {
// device not seen yet
if status {
// active
newd := NewDevice(addr, bus)
newd.bool = status
d.m[addr] = newd
newdevs = append(newdevs, newd.GetAddr())
}
}
}
return newdevs
}

@ -13,8 +13,7 @@ message ReactorStatusResponse {
message ReactorStatusPing {
int32 id = 1;
repeated Sensor sensors = 2;
repeated Device devices = 3;
repeated Device devices = 2;
}
enum Status {
@ -23,17 +22,9 @@ enum Status {
UNKOWN = 2; // Disconnected
}
message Sensor {
message Device {
int32 addr = 1; // i2c addr
string name = 2; // use readable name, changable
Status status = 3;
int32 sampleRate = 4; // in seconds
string data = 5; // open for any sort of format
}
message Device {
int32 addr = 1; // i2c addr used for ID
string name = 2; // user readable name, changable
Status status = 3;
string data = 4; // any format
map<string,string> data = 4; // k=v, format
}

@ -0,0 +1,75 @@
package reactor
import (
pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/sensor"
"sync"
)
type DeviceManager interface {
GetInfo() (*pb.Device, error)
SetStatus(int) error
}
func NewDeviceManager(addr int, status int) (DeviceManager, error) {
dev := &pb.Device{Addr: int32(addr), Status: pb.Status(int32(status))}
return sensor.NewDeviceManager(dev)
}
type DeviceCoordinator struct {
Managers map[int]DeviceManager
sync.RWMutex
}
func NewDeviceCoordinator() *DeviceCoordinator {
dm := &DeviceCoordinator{}
dm.Managers = make(map[int]DeviceManager)
return dm
}
func (c *DeviceCoordinator) UpdateDevices(active map[int]bool) error {
// update dev status, add new ones
c.Lock()
defer c.Unlock()
var err error
for addr, _ := range active {
// loop over devs
if _, ok := c.Managers[addr]; !ok {
// no device
if c.Managers[addr], err = NewDeviceManager(addr, 1); err != nil {
return err
}
}
}
// all devs accounted for
for addr, dm := range c.Managers {
if active[addr] {
err = dm.SetStatus(1)
} else {
err = dm.SetStatus(0)
}
if err != nil {
return err
}
}
return err
}
func (c *DeviceCoordinator) GetDevices() ([]*pb.Device, error) {
// TODO
c.RLock()
defer c.RUnlock()
var err error
var devices []*pb.Device
var dev *pb.Device
for _, dm := range c.Managers {
dev, err = dm.GetInfo()
devices = append(devices, dev)
}
return devices, err
}

@ -10,101 +10,18 @@ import (
)
// implements grpc handler and device data aggregater handler
type DeviceStatus struct {
Addr int
Status pb.Status // 0 = Dead, 1 = alive, 2 = unkown
Type string
Data string
}
// get reactor/device status
func (c *Coordinator) DevStatus(ch chan *DeviceStatus, a int, dm DeviceManager) {
d := &DeviceStatus{Addr: a}
d.Type = dm.GetType()
//d.Status = dm.GetStatus()
d.Status = 0
d.Data = dm.GetData()
ch <- d
}
func (c *Coordinator) GetStatus() []*pb.Device {
// db stuff
//api := client.WriteAPIBlocking(c.Org, c.Bucket)
//var wg sync.WaitGroup
devs := []*pb.Device{}
return devs
/*
statusChan := make(chan *DeviceStatus)
c.Devices.Lock()
for a, dm := range c.Devices.Managers {
wg.Add(1)
go c.DevStatus(statusChan, a, dm)
}
c.Devices.Unlock()
allDone := make(chan struct{})
go func() {
wg.Wait()
allDone <- struct{}{}
}() // once all the status are sent we send all done on the chan
for {
select {
case s := <-statusChan:
fmt.Printf("%v is %v\n", s.Type, s.Status)
/*
data := strings.Split(s.Data,",") // T:10C,H:102% -> T:10C H:10%
for _, m := range data {
var meas string
splt := strings.Split(m,":") // T 10C or H 10%
if splt[0] == "T" {
meas = "Temperature"
} else if splt[0] == "H" {
meas = "Humidity"
}
val, err := strconv.ParseFloat(strings.Trim(splt[1]," %C\n"), 64)
if err != nil {
panic(err)
}
p := influxdb2.NewPoint("measurements",map[string]string{"type":meas},map[string]interface{}{"val":val},time.Now())
if err := api.WritePoint(context.Background(), p); err != nil {
panic(err)
}
}
devs = append(devs,&pb.Device{Addr:int32(s.Addr),Type:s.Type,Status:s.Status,Data:s.Data})
*/
/*
wg.Done()
case <-allDone:
return devs
}
}
*/
}
// grpc status update handler
func (c *Coordinator) Ping() {
// sends all device status to central coordinator
fmt.Printf("Pinging cc\n")
devs := c.GetStatus()
req := &pb.ReactorStatusPing{Id: int32(c.ID), Devices: devs}
_, err := c.MonitoringClient.ReactorStatusHandler(context.Background(), req)
// get devices
devices, err := c.GetDevices()
if err != nil {
c.Err <- err
go c.Exit()
}
}
/*
func (c *Coordinator) Register() {
ip := c.hwinfo.Ip
if lis, err := net.Listen("tcp", fmt.Sprintf("%v:0",ip)); err != nil {
log.Fatal(err)
} else {
c.hwinfo.Port = lis.Addr().(*net.TCPAddr).Port
grpcServer := grpc.NewServer()
pb.RegisterMonitoringServer(grpcServer,c)
go grpcServer.Serve(lis)
req := &pb.ReactorStatusPing{Id: int32(c.ID), Devices: devices}
if _, err := c.MonitoringClient.ReactorStatusHandler(context.Background(), req); err != nil {
c.Err <- err
}
logging.Debug(logging.DStart, "Listening for pings on %v:%v\n",ip,c.hwinfo.Port)
}
*/

@ -31,7 +31,15 @@ func NewDBClient(config *viper.Viper) (DBClient, error) {
return influxdb.NewDBClient(config)
}
// Coordinator == Reactor Level Coordinator
type I2CClient interface {
// simple client to push responsibilites to sensor
GetConnected() (map[int]bool, error) // gets all connected addr
SendCmd(int, string) (string, error) // send cmd, string is return
}
func NewI2CClient(config *viper.Viper) (I2CClient, error) {
return I2C.NewI2CClient(config)
}
type Server struct {
// embed
@ -39,26 +47,28 @@ type Server struct {
Port int `mapstructure:"port"`
}
// Coordinator == Reactor Level Coordinator
type Coordinator struct {
Name string `mapstructure:"name,omitempty"`
ID int `mapstructure:"id,omitempty"`
Bus int `mapstructure:"bus,omitempty"`
Model string `mapstructure:"model,omitempty"`
// server info
// server info embedded
Server
// database
Database DBClient
I2C I2CClient
// config
Config *viper.Viper
MonitoringClient pb.MonitoringClient
// connected devices
*Devices // struct for locking
*Sensors
*DeviceCoordinator // struct for locking
// other stuff and things
Err chan error
mu sync.Mutex
HB time.Duration
PingTimer chan struct{}
// db client
DB DBClient
Active active
}
@ -68,72 +78,22 @@ type active struct {
sync.Mutex
}
type Sensors struct {
Managers map[int]SensorManager
sync.Mutex
}
type SensorManager interface {
Start()
GetType() string
GetStatus() string
GetData() string
}
type Devices struct {
Managers map[int]DeviceManager
sync.Mutex
}
// basic devicemanager struct manipulations
type DeviceManager interface {
Start()
GetType() string
GetStatus() string
GetData() string
}
type I2CDev interface {
GetAddr() int
GetData() string
GetStatus() bool
GetType() string
}
//func NewDeviceManager(i2c I2CDev) DeviceManager {
//return sensor.NewDeviceManager(i2c)
//}
type I2CMonitor interface {
Monitor()
GetDevice(int) interface {
GetAddr() int
GetStatus() bool
GetData() string
GetType() string
}
}
func NewI2CMonitor(b int, ch chan int) I2CMonitor {
return I2C.NewMonitor(b, ch)
}
func NewCoordinator(config *viper.Viper, ch chan error) *Coordinator {
// sensor/device manager struct
dm := new(Devices)
dm.Managers = make(map[int]DeviceManager)
sm := new(Sensors)
sm.Managers = make(map[int]SensorManager)
c := &Coordinator{Err: ch, Devices: dm, Sensors: sm, Config: config}
// coord
c := &Coordinator{Err: ch, Config: config}
c.DeviceCoordinator = NewDeviceCoordinator()
// hb defaults to 5
c.HB = time.Duration(5 * time.Second)
// this is going to be scuffed
//c.DB = &DB{Bucket: "bb", Org: "ForeLight", URL: url, Token: "S1UZssBu6KPfHaQCt34pZFpyc5lzbH9XanYJWCkOI5FqLY7gq205C6FTH-CmugiPH6o2WoKlTkEuPgIfaJjAhw=="}
// setup db
var err error
if c.DB, err = NewDBClient(config); err != nil {
if c.Database, err = NewDBClient(config); err != nil {
ch <- err
}
if c.I2C, err = NewI2CClient(config); err != nil {
ch <- err
}
@ -153,7 +113,7 @@ func (c *Coordinator) Start() {
c.Config.UnmarshalKey("reactor", c)
go c.Monitor()
go c.Discover()
go c.DB.Start()
go c.Database.Start()
}
func (c *Coordinator) LoadInfo() error {
@ -178,32 +138,21 @@ func (c *Coordinator) LoadInfo() error {
}
c.Config.Set("reactor.model", model)
}
// check Bus
if !c.Config.IsSet("reactor.bus") {
// get bus
var bus int
if bus, err = system.GetBus(); err != nil {
return err
}
c.Config.Set("reactor.bus", bus)
}
// all good
return err
}
func (c *Coordinator) Monitor() {
// function to automatically create and destroy sm
// scuffedaf
dch := make(chan int)
im := NewI2CMonitor(c.Bus, dch)
go im.Monitor()
// periodically grabs connected devs and updates list
for c.IsActive() {
select {
case d := <-dch:
i := im.GetDevice(d)
go c.DeviceConnect(i)
case <-c.PingTimer:
// check devs and ping
active, err := c.I2C.GetConnected()
if err != nil {
c.Err <- err
}
go c.UpdateDevices(active)
go c.Ping()
}
}
@ -217,20 +166,6 @@ func (c *Coordinator) HeartBeat() {
}
}
func (c *Coordinator) DeviceConnect(i2c I2CDev) {
c.Devices.Lock()
defer c.Devices.Unlock()
addr := i2c.GetAddr()
fmt.Printf("Device %d (%x) found!\n", addr, addr)
//if dm, exists := c.Devices.Managers[addr]; !exists {
//dm := NewDeviceManager(i2c)
//c.Devices.Managers[addr] = dm
//go dm.Start()
//} else {
//go dm.Start()
//}
}
func (c *Coordinator) Discover() {
// sets up connection to central coordiantor
conn, err := c.Connect(c.Ip, c.Port)

@ -4,7 +4,6 @@ package sensor
import (
pb "FRMS/internal/pkg/grpc"
"errors"
"sync"
"github.com/spf13/viper"
@ -12,31 +11,13 @@ import (
type DOSensorManager struct {
// do sensor manager
Sensor *pb.Sensor // for sending/updating
*Manager
sync.RWMutex
}
func (s *DOSensorManager) SetName(name string) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *DOSensorManager) SetStatus(status int) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *DOSensorManager) SetSampleRate(rate int) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *DOSensorManager) Update(sensor *pb.Sensor, config *viper.Viper) error {
func (s *DOSensorManager) Update(sensor *pb.Device, config *viper.Viper) error {
// updates info
s.Lock()
defer s.Unlock()
s.Sensor = sensor
return errors.New("UNIMPL")
return s.Manager.Update(sensor)
}
func (s *DOSensorManager) String() string {

@ -1,10 +1,53 @@
package sensor
import (
_ "FRMS/internal/pkg/I2C"
_ "fmt"
pb "FRMS/internal/pkg/grpc"
"errors"
"sync"
)
// base device manager
type Manager struct {
// base dm
Device *pb.Device // for sending/updating
sync.RWMutex
}
func (m *Manager) SetName(name string) error {
// #TODO
return errors.New("UNIMPL")
}
func (m *Manager) SetStatus(status int) error {
// #TODO
return errors.New("UNIMPL")
}
func (m *Manager) SetParameter(key, val string) error {
// #TODO
return errors.New("UNIMPL")
}
func (m *Manager) GetParameter(key string) (string, error) {
// #TODO
return "", errors.New("UNIMPL")
}
func (m *Manager) Update(device *pb.Device) error {
// updates info
m.Lock()
defer m.Unlock()
m.Device = device
return errors.New("UNIMPL")
}
func (m *Manager) GetInfo() (*pb.Device, error) {
m.RLock()
defer m.RUnlock()
return m.Device, nil
}
/*
// I think most of this is unnessecary as hell
type Manager struct {

@ -11,54 +11,37 @@ import (
/*
Returns the correct manager for sensor/device
*/
type SensorManager interface {
// basic sensor stuff
SetName(string) error // change display name
SetStatus(int) error // update status
SetSampleRate(int) error // update sample rate
Update(*pb.Sensor, *viper.Viper) error // write updates
String() string // printable
}
type DeviceManager interface {
// basic device stuff
SetName(string) error // change displayed name
SetStatus(int) error // status change
SetParameter(string, string) error // key, val
GetParameter(string) (string, error) // key, val
Update(*pb.Device, *viper.Viper) error // write updates
GetInfo() (*pb.Device, error) // gets info
String() string // printable
}
func NewSensorManager(sensor *pb.Sensor) (SensorManager, error) {
// returns correct sensor manager by ID
var sm SensorManager
func NewDeviceManager(device *pb.Device) (DeviceManager, error) {
// returns correct device manager by ID
m := &Manager{Device: device}
var dm DeviceManager
var err error
switch id := sensor.GetAddr(); id {
switch id := device.GetAddr(); id {
case 97:
// DO
sm = &DOSensorManager{Sensor: sensor}
dm = &DOSensorManager{Manager: m}
case 99:
// pH
sm = &PHSensorManager{Sensor: sensor}
dm = &PHSensorManager{Manager: m}
case 102:
// RTD
sm = &RTDSensorManager{Sensor: sensor}
default:
err = errors.New(fmt.Sprintf("Error: sensor id %v unrecognized!", id))
}
return sm, err
}
func NewDeviceManager(device *pb.Device) (DeviceManager, error) {
// returns correct device manager by ID
var dm DeviceManager
var err error
switch id := device.GetAddr(); id {
dm = &RTDSensorManager{Manager: m}
case 256:
// PWM
dm = &PWMDeviceManager{Device: device}
dm = &PWMDeviceManager{Manager: m}
default:
err = errors.New(fmt.Sprintf("Error: device id %v unrecognized!", id))
}

@ -4,7 +4,6 @@ package sensor
import (
pb "FRMS/internal/pkg/grpc"
"errors"
"sync"
"github.com/spf13/viper"
@ -12,31 +11,13 @@ import (
type PHSensorManager struct {
// do sensor manager
Sensor *pb.Sensor // for sending/updating
*Manager
sync.RWMutex
}
func (s *PHSensorManager) SetName(name string) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *PHSensorManager) SetStatus(status int) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *PHSensorManager) SetSampleRate(rate int) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *PHSensorManager) Update(sensor *pb.Sensor, config *viper.Viper) error {
func (s *PHSensorManager) Update(sensor *pb.Device, config *viper.Viper) error {
// updates info
s.Lock()
defer s.Unlock()
s.Sensor = sensor
return errors.New("UNIMPL")
return s.Manager.Update(sensor)
}
func (s *PHSensorManager) String() string {

@ -12,20 +12,10 @@ import (
type PWMDeviceManager struct {
// do sensor manager
Device *pb.Device // for sending/updating
*Manager
sync.RWMutex
}
func (s *PWMDeviceManager) SetName(name string) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *PWMDeviceManager) SetStatus(status int) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *PWMDeviceManager) SetParameter(key, val string) error {
// #TODO
return errors.New("UNIMPL")
@ -33,10 +23,7 @@ func (s *PWMDeviceManager) SetParameter(key, val string) error {
func (s *PWMDeviceManager) Update(device *pb.Device, config *viper.Viper) error {
// updates info
s.Lock()
defer s.Unlock()
s.Device = device
return errors.New("UNIMPL")
return s.Manager.Update(device)
}
func (s *PWMDeviceManager) String() string {

@ -4,7 +4,6 @@ package sensor
import (
pb "FRMS/internal/pkg/grpc"
"errors"
"sync"
"github.com/spf13/viper"
@ -12,31 +11,13 @@ import (
type RTDSensorManager struct {
// do sensor manager
Sensor *pb.Sensor // for sending/updating
*Manager
sync.RWMutex
}
func (s *RTDSensorManager) SetName(name string) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *RTDSensorManager) SetStatus(status int) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *RTDSensorManager) SetSampleRate(rate int) error {
// #TODO
return errors.New("UNIMPL")
}
func (s *RTDSensorManager) Update(sensor *pb.Sensor, config *viper.Viper) error {
func (s *RTDSensorManager) Update(sensor *pb.Device, config *viper.Viper) error {
// updates info
s.Lock()
defer s.Unlock()
s.Sensor = sensor
return errors.New("UNIMPL")
return s.Manager.Update(sensor)
}
func (s *RTDSensorManager) String() string {

@ -17,17 +17,10 @@ import (
type ReactorManager struct {
*Manager
// StatusMon *StatusMonitor putting on pause
*ReactorSensors
*ReactorDevices
Config *viper.Viper // config to update
}
type ReactorSensors struct {
// sensor struct
Sensors map[int]SensorManager
sync.RWMutex
}
type ReactorDevices struct {
// device struct
Devices map[int]DeviceManager
@ -38,9 +31,7 @@ func NewReactorManager(err chan error) *ReactorManager {
r := &ReactorManager{}
// sub managers
dm := make(map[int]DeviceManager)
sm := make(map[int]SensorManager)
r.ReactorDevices = &ReactorDevices{Devices: dm}
r.ReactorSensors = &ReactorSensors{Sensors: sm}
// core manager
r.Manager = NewManager(err)
@ -63,33 +54,21 @@ func (r *ReactorManager) ReactorStatusHandler(ctx context.Context, req *pb.React
//go r.PingReset()
fmt.Printf("Recieved ping from %v!\n", req.GetId())
// update devices/sensors
go r.UpdateSensors(req.GetSensors())
go r.UpdateDevices(req.GetDevices())
return &pb.ReactorStatusResponse{Id: int32(r.Id)}, nil
}
// sensor/device stuff
type SensorManager interface {
SetName(string) error // change name
SetSampleRate(int) error // change sample rate
Update(*pb.Sensor, *viper.Viper) error // write updates
String() string // printing
}
// device stuff
type DeviceManager interface {
SetName(string) error // change name
GetParameter(string) (string, error) // key ret val
SetParameter(string, string) error // key, val
Update(*pb.Device, *viper.Viper) error // write updates
String() string // printing
}
func NewSensorManager(sens *pb.Sensor) (SensorManager, error) {
// returns a manager struct
return sensor.NewSensorManager(sens)
}
func NewDeviceManager(device *pb.Device) (DeviceManager, error) {
// returns a manager struct
return sensor.NewDeviceManager(device)
@ -121,29 +100,3 @@ func (r *ReactorDevices) AddDevice(dev *pb.Device, errCh chan error) {
}
r.Devices[int(dev.GetAddr())] = dm
}
func (r *ReactorManager) UpdateSensors(sensors []*pb.Sensor) {
// pass updates to correct manager
r.ReactorSensors.RLock() // read lock
defer r.ReactorSensors.RUnlock()
for _, sens := range sensors {
// looping over sensors
if sm, ok := r.ReactorSensors.Sensors[int(sens.GetAddr())]; ok {
// sensor manager found
go sm.Update(sens, r.Config) // update sm
} else {
// not found
go r.AddSensor(sens, r.Err)
}
}
}
func (r *ReactorSensors) AddSensor(sensor *pb.Sensor, errCh chan error) {
r.Lock() // write lock
defer r.Unlock()
sm, err := NewSensorManager(sensor)
if err != nil {
errCh <- err
}
r.Sensors[int(sensor.GetAddr())] = sm
}

Loading…
Cancel
Save