refactored reactor side coordinators and managers. in process of adding monitoring framework

main
KeeganForelight 2 years ago
parent 53eabda3b7
commit 530413e9a1

@ -11,7 +11,7 @@ import (
)
type coordinator interface {
Start() error
Start()
}
func NewCoordinator(ip string,port int,ch chan error) coordinator {

Binary file not shown.

Binary file not shown.

@ -1,42 +1,42 @@
package I2C
// file has general wrappers to interact with i2c-tools
import (
"fmt"
_ "fmt"
"log"
"os/exec"
"bytes"
"strings"
"time"
"sync"
"strconv"
)
type I2Cdev struct {
Active bool
LastSeen time.Time
type I2CBus struct {
int
sync.Mutex
}
type I2CMonitor struct { // used in RLC to synchronize accesses
Bus int // bus to use {0,1,2}
Devices map[int]*I2Cdev // mapping to quickly index addresses to their device structs
Scan <-chan struct{} // empty struct for efficient sends
Connect chan<- int
mu sync.Mutex
func NewBus(bus int) *I2CBus {
b := &I2CBus{}
b.int = bus
return b
}
func I2Cconnected(bus int) map[int]bool {
func (b *I2CBus) Scan() map[int]bool {
/*
I2CConnect takes an integer specifying the bus to search as input
Returns a slice of type I2Cdev
Returns all the connected devices
*/
b := strconv.Itoa(bus)
cmd := exec.Command("i2cdetect", "-y", "-r", b)
b.Lock()
defer b.Unlock()
bus := strconv.Itoa(b.int)
cmd := exec.Command("i2cdetect", "-y", "-r", bus)
var out bytes.Buffer
cmd.Stdout = &out
err:= cmd.Run()
if err != nil {
fmt.Println(err)
if err := cmd.Run(); err != nil {
log.Fatal(err)
}
outString := out.String()
// could split by \n too
split := strings.SplitAfter(outString,":")

@ -0,0 +1,33 @@
package I2C
import (
_ "fmt"
_ "sync"
)
type I2CDevice struct {
*I2CBus // embeds bus
bool // stores whether dev is currently connected
int // addr
}
func NewDevice(addr int,bus *I2CBus) *I2CDevice {
d := &I2CDevice{}
d.I2CBus = bus
d.int = addr
return d
}
func (d *I2CDevice) GetAddr() int {
return d.int
}
func (d *I2CDevice) GetStatus() string {
// TODO
return "Unknown"
}
func (d *I2CDevice) GetType() string {
// TODO
return "Unknown"
}

@ -0,0 +1,97 @@
package I2C
import (
"time"
_ "fmt"
"sync"
)
/*
i2c monitor implements a long running monitor responsible for sending active devices to the rlc
*/
type I2CMonitor struct {
*I2CBus
Devices *devs
DevChan chan int
}
type devs struct {
sync.Mutex
m map[int]*I2CDevice
}
func NewMonitor(bus int,ch chan int) *I2CMonitor {
m := &I2CMonitor{}
b := NewBus(bus)
m.I2CBus = b
d := make(map[int]*I2CDevice)
m.Devices = &devs{m:d}
m.DevChan = ch
return m
}
func (m *I2CMonitor) Update() {
/*
scans bus and adds new active devices
*/
devs := m.Scan()
chng := m.Devices.Parse(m.I2CBus,devs)
for _, d := range chng {
go m.ConnectDevice(d)
}
}
func (m *I2CMonitor) Monitor() {
// functon that updates the device list and notifies rlc of any changes to sensor composition
s := make(chan struct{})
t := 5 * time.Second
go func(signal chan struct{},to time.Duration) { // simple signal func to init scan
for {
signal <-struct{}{}
time.Sleep(to)
}
}(s,t)
for {
<-s
m.Update()
}
}
func (m *I2CMonitor) ConnectDevice(addr int) {
m.DevChan <-addr
}
func (m *I2CMonitor) GetDevice(addr int) interface{GetAddr() int; GetStatus() string; GetType() string} {
m.Devices.Lock()
defer m.Devices.Unlock()
return m.Devices.m[addr]
}
func (d *devs) Parse(bus *I2CBus,devices map[int]bool) []int {
d.Lock()
defer d.Unlock()
newdevs := []int{}
for addr, status := range devices {
if dev, exists := d.m[addr]; exists {
// device seen
if status != dev.bool { // if device state changed
dev.bool = status
if status {
newdevs = append(newdevs,dev.GetAddr())
}
}
} else {
// device not seen yet
if status {
// active
newd := NewDevice(addr,bus)
newd.bool = status
d.m[addr] = newd
newdevs = append(newdevs,newd.GetAddr())
}
}
}
return newdevs
}

@ -1,95 +0,0 @@
package I2C
import (
"time"
"fmt"
)
func NewMonitor(c chan<- int, b int) (*I2CMonitor, error) {
m := &I2CMonitor{Bus:b,Connect:c}
m.Devices = make(map[int]*I2Cdev)
s := make(chan struct{})
go scan(s)
m.Scan = s
go m.Monitor()
return m, nil
}
func (m *I2CMonitor) Update() []int {
/*
gets newly connected sensors and updates any currently dead sensors
*/
m.mu.Lock()
defer m.mu.Unlock()
connected := I2Cconnected(m.Bus)
now := time.Now()
newDevs := []int{}
for k,v := range connected {
if _, ok := m.Devices[k]; ok { // if address existed we need to update sm
dev := m.Devices[k]
dev.Active = v
if v {
dev.LastSeen = now
}
m.Devices[k] = dev
} else if v { // new device connected
fmt.Printf("Device %v found\n",k)
newDevs = append(newDevs, k) // mark device for adding
//m.Devices[k] = I2Cdev{Active:true, LastSeen:now} // add new sensors
}
}
return newDevs
}
func (m *I2CMonitor) Remove(addr int) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.Devices, addr)
}
func scan(ch chan<- struct{}) {
// helper func to time the device updates
for true {
ch <-struct{}{} // weird syntax but just empty struct init
time.Sleep(5 * time.Second)
}
}
func (m *I2CMonitor) Monitor() {
// functon that updates the device list and notifies rlc of any changes to sensor composition
for {
<-m.Scan
fmt.Println("Scanning I2C bus")
newdevs := m.Update()
for _,v := range newdevs {
go m.ConnectDevice(v)
}
}
}
func (m *I2CMonitor) ConnectDevice(addr int) {
m.Connect <-addr
}
func (m *I2CMonitor) CreateDevice(addr int) error {
m.mu.Lock()
defer m.mu.Unlock()
dev := &I2Cdev{Active:true}
m.Devices[addr] = dev
return nil
}
func (m *I2CMonitor) GetStatus(addr int) bool {
m.mu.Lock()
defer m.mu.Unlock()
_,exists := m.Devices[addr]
if exists {
return m.Devices[addr].Active
} else {
return false
}
}
func (d *I2Cdev) GetStatus() bool {
return d.Active
}

@ -0,0 +1,29 @@
package reactor
import (
"sync"
)
// implements grpc handler and device update handler
type SystemUpdates struct {
sync.Mutex
pending map[int]*Dev
}
type Dev struct {
Status string
Type string
}
func (s *SystemUpdates) DeviceUpdateHandler(a int, st, t string) {
s.Lock()
defer s.Unlock()
s.pending[a] = &Dev{Status:st,Type:t}
}
func (s *SystemUpdates) GetPendingChanges() (map[int]*Dev, chan bool) {
// calls chld routine to block and return w/ buffer contents
// chan is used to send grpc res
// true = empty buffer
// false = release lock unchanged
res := s.LockAndWait()

@ -5,7 +5,7 @@ package reactor
import (
"fmt"
"sync"
"net"
//"net"
"time"
"math"
"FRMS/internal/pkg/system"
@ -24,21 +24,14 @@ import (
type Coordinator struct {
*server
*hwinfo
Connected <-chan int
I2CMonitor I2Cmonitor
Sensors *SensorManagers // struct for fine grain locking
Devices *DeviceManagers // struct for fine grain locking
Err chan error
mu sync.Mutex
PendingChanges pc
Active active
*SystemUpdates
pb.UnimplementedMonitoringServer
}
type pc struct {
[]//some type of sensor
sync.Mutex
}
type active struct {
bool
int
@ -61,40 +54,48 @@ type hwinfo struct {
Id uint32
}
type SensorManagers struct {
Managers map[int]SensorManager
type DeviceManagers struct {
Managers map[int]DeviceManager
mu sync.Mutex
}
type SensorManager interface {
GetStatus() uint32
type DeviceManager interface {
Start()
}
type I2CDev interface {
GetAddr() int
GetStatus() string
GetType() string
}
type I2Cmonitor interface {
CreateDevice(int) error
GetStatus(int) bool
func NewDeviceManager(i2c I2CDev, sys *SystemUpdates) DeviceManager {
return sensor.NewDeviceManager(i2c,sys)
}
type I2CMonitor interface {
Monitor()
GetDevice(int) interface{ GetStatus() string; GetType() string;GetAddr() int }
}
func NewSensorManager(addr int,m I2Cmonitor) (SensorManager, error) {
return sensor.NewSensorManager(addr,m)
func NewI2CMonitor(b int,ch chan int) I2CMonitor {
return I2C.NewMonitor(b, ch)
}
func NewCoordinator(ip string,port int,ch chan error) *Coordinator {
serv := &server{Ip:ip,Port:port}
sen := new(SensorManagers)
sen.Managers = make(map[int]SensorManager)
c := &Coordinator{Err:ch,Sensors:sen}
sen := new(DeviceManagers)
sen.Managers = make(map[int]DeviceManager)
c := &Coordinator{Err:ch,Devices:sen}
c.server = serv
c.hwinfo = &hwinfo{}
c.Type = "reactor" // explicit for client stuff
p := make(map[int]*Dev)
sys := &SystemUpdates{pending:p}
c.SystemUpdates = sys
return c
}
func NewI2CMonitor(c chan<- int, b int) (I2Cmonitor, error) {
return I2C.NewMonitor(c,b)
}
type Hardware interface {
GetId() uint32
GetIp() string
@ -107,12 +108,12 @@ func GetHWInfo() (Hardware, error) {
return system.NewHWinfo()
}
func (c *Coordinator) Start() error {
func (c *Coordinator) Start() {
// should discover hwinfo and sensors on its own
// now setting up sensor managers
hw, err := GetHWInfo() // locking provided by struct is only useful on init
if err != nil {
return err
c.Err <-err
}
// setting up hw stuff
c.hwinfo.Ip = hw.GetIp() //get should prevent empty data
@ -120,45 +121,36 @@ func (c *Coordinator) Start() error {
c.Id = hw.GetId()
c.Model = hw.GetModel()
c.Bus = hw.GetBus()
con := make(chan int)
c.Connected = con
if c.I2CMonitor, err = NewI2CMonitor(con,c.Bus); err != nil {
return err
}
fmt.Printf("Reactor at IP addr %v using I2C bus %v\n",c.hwinfo.Ip,c.hwinfo.Bus)
err = c.Connect()
for err != nil {
fmt.Printf("Connection failed: %v \n Retrying!\n",err)
}
c.Register()
go c.Monitor()
return nil
go c.Connect()
}
func (c *Coordinator) Monitor() {
// function to automatically create and destroy sm
ch := make(chan int)
im := NewI2CMonitor(c.Bus,ch)
go im.Monitor()
for {
addr := <-c.Connected
go c.SensorConnect(addr)
d := <-ch
i := im.GetDevice(d)
go c.DeviceConnect(i)
}
}
func (c *Coordinator) SensorConnect(addr int) {
c.Sensors.mu.Lock()
defer c.Sensors.mu.Unlock()
_, exists := c.Sensors.Managers[addr]
if !exists {
go c.I2CMonitor.CreateDevice(addr)
sm, err := NewSensorManager(addr,c.I2CMonitor)
if err != nil {
c.Err <-err
func (c *Coordinator) DeviceConnect(i2c I2CDev) {
c.Devices.mu.Lock()
defer c.Devices.mu.Unlock()
addr := i2c.GetAddr()
if dm, exists := c.Devices.Managers[addr]; !exists{
dm := NewDeviceManager(i2c,c.SystemUpdates)
c.Devices.Managers[addr] = dm
go dm.Start()
} else {
go dm.Start()
}
c.Sensors.Managers[addr] = sm
} // ignoring case of existing sm eventually will have to check for alive
}
func (c *Coordinator) Connect() error {
func (c *Coordinator) Connect() {
// function connects to central server and passes hwinfo
var opts []grpc.DialOption
opts = append(opts,grpc.WithTransportCredentials(insecure.NewCredentials()))
@ -172,12 +164,12 @@ func (c *Coordinator) Connect() error {
to := c.Timeout()
if to == 0 {
err = errors.New("Failed to connect to central server")
return err
c.Err <-err
}
fmt.Printf("Server currently unavailable, retrying in %v ms", to)
time.Sleep(time.Duration(to) * time.Millisecond)
} else {
return err
c.Err <-err
}
}
break;
@ -187,15 +179,16 @@ func (c *Coordinator) Connect() error {
req := &pb.ClientDiscoveryRequest{Id:c.Id,Ip:c.hwinfo.Ip,Port:int32(c.hwinfo.Port),Model:c.Model,ClientType:c.Type}
resp, err := client.ClientDiscoveryHandler(context.Background(), req)
if err != nil {
return err
c.Err <-err
}
if resp.GetSuccess() {
fmt.Println("Central server reached")
return nil
} else {
return errors.New("Failed to reach central server!")
c.Err <-errors.New("Failed to reach central server!")
}
}
/*
we shouldnt register any services to coordinator. we can do that later
func (c *Coordinator) Register() error {
fmt.Printf("Listening for pings on %v:%v\n",c.hwinfo.Ip,c.hwinfo.Port)
@ -220,7 +213,7 @@ func (c *Coordinator) ReactorStatusHandler(ctx context.Context, ping *pb.Reactor
}
return resp, nil
}
*/
func (c *Coordinator) Timeout() int {
c.Active.Lock()
defer c.Active.Unlock()

@ -1,86 +1,111 @@
package sensor
import (
"sync"
_"fmt"
"time"
"sync"
_ "FRMS/internal/pkg/I2C"
"log"
)
type Manager struct {
Addr int
Sensor *SensorInfo
I2CMonitor I2Cmonitor
Kill chan<- bool
mu sync.Mutex
*Dev
I2CDevice
SystemUpdates
*Active
Hb time.Duration
}
// implementing sensor skeleton
type SensorStatus uint32
const (
READY SensorStatus = iota
ALIVE
SENDING
KILLED
)
type SystemUpdates interface {
DeviceUpdateHandler(int, string, string)
}
func (s SensorStatus) String() string {
return [...]string{"Ready","Active","Sending","Disabled"}[s]
type Active struct {
sync.Mutex
bool
int
}
type SensorInfo struct {
Status SensorStatus
type Dev struct {
// last known values
Addr int
Type string
mu sync.Mutex
Status string // could be more efficient but to hell with it
}
func (s *SensorInfo) GetStatus() uint32{
s.mu.Lock()
defer s.mu.Unlock()
return uint32(s.Status)
type I2CDevice interface {
// basic device info
GetAddr() int
GetStatus() string
GetType() string
}
func (s *SensorInfo) GetType() string{
s.mu.Lock()
defer s.mu.Unlock()
return s.Type
func NewDeviceManager(i2c I2CDevice,sys SystemUpdates) *Manager {
m := &Manager{Hb:time.Duration(1*time.Second)}
m.I2CDevice = i2c
m.SystemUpdates = sys
m.Active = &Active{}
m.Dev = &Dev{Addr:i2c.GetAddr(),Type:i2c.GetType(),Status:i2c.GetStatus()}
return m
}
// i2c stuff
type I2Cmonitor interface {
GetStatus(int) bool
func (m *Manager) Start() {
// goal is to start a long running monitoring routine
if !m.Activate() {
log.Fatal("Manager already running!")
} // atomically activated if this runs
go m.Monitor()
}
func NewSensorManager(addr int,i I2Cmonitor) (*Manager,error) {
m := &Manager{Addr:addr,I2CMonitor:i}
go m.Start()
return m, nil
func (m *Manager) Exit() {
if !m.Deactivate() {
log.Fatal("Manager already exited!")
}
func (m *Manager) Start() {
types := map[int]string{97:"DO Sensor",99:"pH Sensor",102:"RTD Sensor"}
m.Sensor = &SensorInfo{Type:types[m.Addr]}
}
func (m *Manager) GetStatus() uint32 {
b := m.I2CMonitor.GetStatus(m.Addr)
m.Sensor.Update(b)
return m.Sensor.GetStatus()
func (m *Manager) Monitor() {
for m.IsActive() {
go m.DeviceStatus()
time.Sleep(m.Hb)
}
func (m *Manager) GetType() string {
return m.Sensor.GetType()
}
// I2C interface stuff
func (m *Manager) DeviceStatus() {
status := m.GetStatus()
if status != m.Status { // changed
go m.DeviceUpdateHandler(m.Addr,status,m.Type)
m.Status = status
}
}
func (s *SensorInfo) Update(b bool) {
s.mu.Lock()
defer s.mu.Unlock()
if b {
s.Status = ALIVE
// atomic activation and deactivation
func (a *Active) Activate() bool {
// returns true if success, false otherwise
a.Lock()
defer a.Unlock()
if a.bool { // already active
return false
} else {
s.Status = KILLED
a.bool = true
a.int = 0
return a.bool
}
}
func (a *Active) Deactivate() bool {
// returns true if success false otherise
a.Lock()
defer a.Unlock()
if a.bool {
a.bool = false
return true
} else { // already deactivated
return a.bool // false
}
}
func (a *Active) IsActive() bool {
a.Lock()
defer a.Unlock()
return a.bool
}

@ -1,8 +0,0 @@
package sensor
import (
_ "fmt"
)

@ -11,7 +11,7 @@ import (
type Coordinator struct {
Type string // ["reactor","tui"]
IncomingClients <-chan *Client
Managers
*Managers
Err chan error
}
@ -21,7 +21,11 @@ type Managers struct {
}
func NewCoordinator(t string,ch chan *Client, err chan error) *Coordinator {
return &Coordinator{Type: t,IncomingClients: ch,Err:err}
d := make(map[uint32](chan<- bool))
m := &Managers{Directory:d}
c := &Coordinator{Type: t,IncomingClients: ch,Err:err}
c.Managers = m
return c
}
func FindNewManager(c *Client,ch chan bool, err chan error) {

145
notes

@ -307,3 +307,148 @@ formally, New Coordinator:
- manager to implement actual manager
- implements
- manager activation and memory
TODO 6/30
creating efficient system mapping and data logging/structure info
idea # 1.5
use json maybe?
how?
- use json to build in the structure of our system via heirarchy
ex)
[
{
"reactor": uint32,
"status": bool,
"connected devices": [
"device" : {
"addr": "0x10"
"type": "ph sensor",
"status": uint32,
"data": [{"ph7.3"}, // json marshelling of the specific struct
},
"device" : {
"addr": "0x11"
"type": "temp sensor"
status: uint32
"data": "t24.5C"
}
]
}
]
use go structs to map components and embed them
can send
need to just spitball here
what am I trying to do at the end of the day?
I am taking sensor measurements
and potentially tweaking control paramters
lets treat each one sperately at firs
sensor measurements
each particular sensor manager will only be responsible for getting data from its sensor
what is the scope of responsibilities?
the sensor manager should log this data locally using a method? json?
how do we aggregate this info?
what if we structure our reactor as a mirror of our coordiantor
rlc job would be to
- establish connection with central server
- wait for connections from devices
- create reactor managers for these devices
this could be really nice
rm (general) job:
- establish connection with device via I2C (client via wifi)
- shut down when device connection drops
- start when device connects again
adding data responsiblities
tuim:
needs to know of a struct of system
[reactorid][deviceaddress][device]
thus needs to know:
- reactor id
- connected device addresses
- device info: can just be a string!
- made up of status and relevant data
what do we rely on
- accurate device info string
- can have someone else make/maintain struct and proxy updates
tuic:
-needs to maintain an atomic struct of system
as above
- still only needs to know
- reactor id
- connected device address maped to device info [string]
relies on
- accurate status updates
- accurate device info
RC ** could have this guy be responsible for change parsing
- respond to updated status from RM and send to TUI
- basically a focus point
RM
- needs to call corret handlers for data coming in from rlc
- can we just use grpc handlers that get embedded in the manager at start?
- handlers can also notify when data doesnt match previous entry
- this would prompt the data to be sent to the rc where it can be forwardd
RLC
- needs to have internal reactor state
-
this gives us a complete "skeleton" of service where we can connect/reconnect clients with appropriate managers
there isnt any functionality yet to actually log data
how do we leverage our current connections and add functionality to managers and coordinators?
methods and channels
each manager is responsible for pinging the associate device {reactor, device, tui}
either sending device info in tui case
or recieving it in reactor/device case
this is why wrapping the gen structures is nessecary. Two different operations
device manager:
could recieve 0-100000000 values
could be any type
could be any number per entry
common struct?
"timestamp":"data"
data could be json struct
- makes it easy to parse at some point
- force sensor driver to write a go struct for the data
- Parse___Data(*json.Unmarshalled)
complete i2c monitor redesign
i2c interface needs to do
data stuff:
- locking structure to serialize commands/reads
- removal function to manually parse dead devices
- reconnecting should reinit device manager and stuff
init stuff:
- keep track of devices seen and connected
- notify rlc of devices that connect/reconnect
build init stuff into a struct that can be embedded?
I2CCoordinator
- created when rlc is created
- tie rlc to i2ccoord via channels
- new devices channel for devices that go offline->online
- send the i2cdevice struct to embed in rm
- can call interface funcs on the embedded interface

@ -0,0 +1,103 @@
this will be a living doc
starting with for connection management:
listener:
- knows
- ip:port to listen to
- clients connect with{ip, port, clientType, model, id}
- is able to
- create coordinators for each clientType
- send new clients to coordiantor handlers via chan
- depends on
- clients sending known data (gRPC)
- NewCoordinator func
- implements
* shouldnt really have any callable methods
coordinator: (General)
- knows
- what client ids have already connected
- which managers correlate to which clients
- is able to
- create managers for new clients
- start managers for clients
- depends on
- listener for client structure
- manager for NewManager() function
- implements
- client connection handling
- general manager call
manager (general):
- knows
- client info
- timeout
- if it is active
- is able to
- establish a connection with a client
- stop when the connection drops
- depends on
- coordinator for start calls
- implements
- client connection creation
- client info storage
manager (reactor):
* embedds the gm
- knows
- devices attached to the reactor
- underlying client manager
- is able to
- maintain device struct
- no pings only control logic (i.e remove device, restart etc)
- depends on
- gm for client conn
- coordiantor for starts
- implements
- reactor structure tracking
manager (tui):
* embedds the gm
- knows
- structure of the system (ie reactors:devices)
- underlying client manager
- is able to
- keep track of system changes
- updates client from buffer or concurrent grpc?
- depends on
- RM to get system info
- coordinator for starts
- implements
- system tracking
reactor level coordinator: (right away this feels so fat compared)
- knows
- current connected devices
- server to init to
- its own hwinfo to establish itself as a client
- is able to:
- reach out to server on boot
- transmit client details
- keep reactor devices current
- depends on
- I2C package to notify of connected devices
- hardware info to get its client info
- server to handle connection
- sm for new manager
- implements
- reactor status handler for updates to other coords/managers
device itself:
- knows
- probe status ( maybe)
- data in buffer
- is able to
- clear buffer on request
- respond to commands
- implements
- data collection
- control execution
- depends on
- nothing its a driver
- maybe the control logic??
Loading…
Cancel
Save