I2C device reconnect/disconnect added

main
KeeganForelight 2 years ago
parent 01115294ea
commit cf95467512

Binary file not shown.

@ -5,17 +5,17 @@ import (
"fmt"
)
func NewMonitor(c,dc <-chan int, b int) error {
m := &I2CMonitor{Bus:b,Connect:c,Disconnect:dc}
m.Devices = make(map[int]I2Cdev)
func NewMonitor(c chan<- int, b int) (*I2CMonitor, error) {
m := &I2CMonitor{Bus:b,Connect:c}
m.Devices = make(map[int]*I2Cdev)
s := make(chan struct{})
go scan(s)
m.Scan = s
go m.Monitor()
return nil
return m, nil
}
func (m *I2CMonitor) Update() map[int]bool {
func (m *I2CMonitor) Update() []int {
/*
gets newly connected sensors and updates any currently dead sensors
*/
@ -23,21 +23,19 @@ func (m *I2CMonitor) Update() map[int]bool {
defer m.mu.Unlock()
connected := I2Cconnected(m.Bus)
now := time.Now()
newDevs := make(map[int]bool)
newDevs := []int{}
for k,v := range connected {
if _, ok := m.Devices[k]; ok { // if address existed
entry := m.Devices[k]
entry.Active = v // update the value
if _, ok := m.Devices[k]; ok { // if address existed we need to update sm
dev := m.Devices[k]
dev.Active = v
if v {
entry.LastSeen = now // if its active, update last seen time
} else {
newDevs[k] = v // mark the device for purging
dev.LastSeen = now
}
m.Devices[k] = entry
} else if v { // else theres a chance its new
m.Devices[k] = dev
} else if v { // new device connected
fmt.Printf("Device %v found\n",k)
newDevs[k] = v // mark device for adding
m.Devices[k] = I2Cdev{Active:true, LastSeen:now} // add new sensors
newDevs = append(newDevs, k) // mark device for adding
//m.Devices[k] = I2Cdev{Active:true, LastSeen:now} // add new sensors
}
}
return newDevs
@ -49,7 +47,7 @@ func (m *I2CMonitor) Remove(addr int) {
delete(m.Devices, addr)
}
func scan(ch <-chan struct{}) {
func scan(ch chan<- struct{}) {
// helper func to time the device updates
for true {
ch <-struct{}{} // weird syntax but just empty struct init
@ -61,14 +59,37 @@ func (m *I2CMonitor) Monitor() {
// functon that updates the device list and notifies rlc of any changes to sensor composition
for {
<-m.Scan
fmt.Println("Scanning I2C bus")
newdevs := m.Update()
for k,v := range newdevs {
if v {
m.Connect <-k
} else {
m.Disconnect <-k
}
for _,v := range newdevs {
go m.ConnectDevice(v)
}
}
}
func (m *I2CMonitor) ConnectDevice(addr int) {
m.Connect <-addr
}
func (m *I2CMonitor) CreateDevice(addr int) error {
m.mu.Lock()
defer m.mu.Unlock()
dev := &I2Cdev{Active:true}
m.Devices[addr] = dev
return nil
}
func (m *I2CMonitor) GetStatus(addr int) bool {
m.mu.Lock()
defer m.mu.Unlock()
_,exists := m.Devices[addr]
if exists {
return m.Devices[addr].Active
} else {
return false
}
}
func (d *I2Cdev) GetStatus() bool {
return d.Active
}

@ -17,10 +17,9 @@ type I2Cdev struct {
type I2CMonitor struct { // used in RLC to synchronize accesses
Bus int // bus to use {0,1,2}
Devices map[int]I2Cdev // mapping to quickly index addresses to their device structs
Devices map[int]*I2Cdev // mapping to quickly index addresses to their device structs
Scan <-chan struct{} // empty struct for efficient sends
Connect chan<- int
Disconnect chan<- int
mu sync.Mutex
}

@ -129,7 +129,7 @@ type SensorStatus struct {
Addr int32 `protobuf:"varint,1,opt,name=addr,proto3" json:"addr,omitempty"`
Type string `protobuf:"bytes,2,opt,name=type,proto3" json:"type,omitempty"`
Status bool `protobuf:"varint,3,opt,name=status,proto3" json:"status,omitempty"`
Status string `protobuf:"bytes,3,opt,name=status,proto3" json:"status,omitempty"`
}
func (x *SensorStatus) Reset() {
@ -178,11 +178,11 @@ func (x *SensorStatus) GetType() string {
return ""
}
func (x *SensorStatus) GetStatus() bool {
func (x *SensorStatus) GetStatus() string {
if x != nil {
return x.Status
}
return false
return ""
}
var File_internal_pkg_grpc_monitoring_proto protoreflect.FileDescriptor
@ -202,7 +202,7 @@ var file_internal_pkg_grpc_monitoring_proto_rawDesc = []byte{
0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x61, 0x64, 0x64, 0x72, 0x18,
0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x61, 0x64, 0x64, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x74,
0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12,
0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52,
0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x32, 0x5a, 0x0a, 0x0a, 0x6d, 0x6f, 0x6e, 0x69, 0x74,
0x6f, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x4c, 0x0a, 0x13, 0x53, 0x65, 0x6e, 0x73, 0x6f, 0x72, 0x53,
0x74, 0x61, 0x74, 0x75, 0x73, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x19, 0x2e, 0x67,

@ -19,6 +19,6 @@ message SensorStatusResponse {
message SensorStatus {
int32 addr = 1;
string type = 2;
bool status = 3;
string status = 3;
}

@ -16,109 +16,105 @@ import (
pb "FRMS/internal/pkg/grpc"
)
type Hardware struct {
Id uint32
Ip string
Bus int
Model string
Port int
}
type HWinfo interface {
Get() (*Hardware, error)
type Hardware interface {
GetId() uint32
GetIp() string
GetBus() int
GetModel() string
GetPort() int
}
type Coordinator struct {
Server string
HW Hardware
Connected <-chan int
Disconnected <-chan int
Sensors map[int]SensorManager
I2CMonitor I2Cmonitor
Sensors *SensorManagers // struct for fine grain locking
Err chan error
mu sync.Mutex
pb.UnimplementedMonitoringServer
}
type SensorManagers struct {
Managers map[int]SensorManager
mu sync.Mutex
}
type SensorManager interface {
Start()
GetStatus() bool
GetStatus() string
GetType() string
}
func NewSensorManager(addr int) SensorManager {
return sensor.NewSensorManager(addr)
type I2Cmonitor interface {
CreateDevice(int) error
GetStatus(int) bool
}
func NewSensorManager(addr int,m I2Cmonitor) (SensorManager, error) {
return sensor.NewSensorManager(addr,m)
}
func NewCoordinator(s string,ch chan error) *Coordinator {
c := make(Coordinator{Server:s,Err:ch}
c.Connected = make(<-chan int)
c.Disconnected = make(<-chan int)
return c
sen := new(SensorManagers)
sen.Managers = make(map[int]SensorManager)
return &Coordinator{Server:s,Err:ch,Sensors:sen}
}
func NewI2CMonitor(c, dc <-chan int, b int) error {
return I2C.NewMonitor(b)
func NewI2CMonitor(c chan<- int, b int) (I2Cmonitor, error) {
return I2C.NewMonitor(c,b)
}
func GetHWInfo() Hwinfo {
return system.NewHWMonitor()
func GetHWInfo() (Hardware, error) {
return system.NewHWinfo()
}
func (c *Coordinator) Start() error {
// should discover hwinfo and sensors on its own
// now setting up sensor managers
c.HW, err = GetHWInfo()
hw, err := GetHWInfo()
if err != nil {
return err
}
err = NewI2CMonitor(c.Connect,c.Disconnect,c.Bus)
c.HW = hw
con := make(chan int)
c.Connected = con
monitor, err := NewI2CMonitor(con,c.HW.GetBus())
if err != nil {
return err
}
fmt.Printf("Reactor at IP addr %v using I2C bus %v\n",c.Ip,c.Bus)
response := c.EstablishConnection()
for response != nil {
fmt.Println("Connection failed!, Retrying")
response = c.EstablishConnection()
c.I2CMonitor = monitor
fmt.Printf("Reactor at IP addr %v using I2C bus %v\n",c.HW.GetIp(),c.HW.GetBus())
err = c.EstablishConnection()
for err != nil {
fmt.Printf("Connection failed: %v \n Retrying!\n",err)
}
c.Register()
go c.Monitor()
return nil
}
func (c *Coordinator) Monitor() {
// function to automatically create and destroy sm
for {
select {
case addr := <-c.Connect:
//check if sm exists and add sm for addr
c.Connected(addr)
case addr := <-c.Disconnect:
// kill sm for addr
c.Disconnected(addr)
}
addr := <-c.Connected
go c.Connect(addr)
}
}
func (c *Coordinator) Connected(addr int) {
c.mu.Lock()
defer c.mu.Unlock()
sm, exists := c.Sensors[addr]
func (c *Coordinator) Connect(addr int) {
c.Sensors.mu.Lock()
defer c.Sensors.mu.Unlock()
_, exists := c.Sensors.Managers[addr]
if !exists {
sm := NewSensorManager(addr)
c.Sensors[addr] = sm
go sm.Start()
} // ignoring case of existing sm
go c.I2CMonitor.CreateDevice(addr)
sm, err := NewSensorManager(addr,c.I2CMonitor)
if err != nil {
c.Err <-err
}
c.Sensors.Managers[addr] = sm
} // ignoring case of existing sm eventually will have to check for alive
}
func (c *Coordinator) Disconnected(addr int) {
c.mu.Lock()
defer c.mu.Unlock()
sm, exists := c.Sensors[addr]
if exists {
c.Sensors = delete(c.Sensors,addr)
sm.Kill()
} // not dealing with kill requests for not existing sm
func (c *Coordinator) EstablishConnection() error {
// function connects to central server and passes hwinfo
var opts []grpc.DialOption
@ -129,7 +125,7 @@ func (c *Coordinator) EstablishConnection() error {
}
defer conn.Close()
client := pb.NewHandshakeClient(conn)
req := &pb.ReactorDiscoveryRequest{Id:c.Id,Ip:c.Ip,Port:c.Port,Model:c.Model}
req := &pb.ReactorDiscoveryRequest{Id:c.HW.GetId(),Ip:c.HW.GetIp(),Port:int32(c.HW.GetPort()),Model:c.HW.GetModel()}
resp, err := client.ReactorDiscoveryHandler(context.Background(), req)
if err != nil {
return err
@ -143,8 +139,8 @@ func (c *Coordinator) EstablishConnection() error {
}
func (c *Coordinator) Register() error {
fmt.Printf("Listing for pings on %v:%v\n",c.Ip,c.Port)
lis, err := net.Listen("tcp", fmt.Sprintf("%v:%v",c.Ip(),c.Port))
fmt.Printf("Listing for pings on %v:%v\n",c.HW.GetIp(),c.HW.GetPort())
lis, err := net.Listen("tcp", fmt.Sprintf("%v:%v",c.HW.GetIp(),c.HW.GetPort()))
if err != nil {
return err
}
@ -154,12 +150,13 @@ func (c *Coordinator) Register() error {
return nil
}
// sensor status stuff
func (c *Coordinator) SensorStatusHandler(ctx context.Context, ping *pb.SensorStatusRequest) (*pb.SensorStatusResponse,error) {
c.mu.Lock()
defer c.mu.Unlock()
c.Sensors.mu.Lock()
defer c.Sensors.mu.Unlock()
var sensors []*pb.SensorStatus
resp := &pb.SensorStatusResponse{Id:c.Id,Sensors:sensors}
for a, s := range c.Sensors {
resp := &pb.SensorStatusResponse{Id:c.HW.GetId(),Sensors:sensors}
for a, s := range c.Sensors.Managers {
resp.Sensors = append(resp.Sensors,&pb.SensorStatus{Addr:int32(a), Type:s.GetType(), Status:s.GetStatus()})
}
return resp, nil

@ -2,39 +2,85 @@ package sensor
import (
"sync"
"FRMS/internal/pkg/I2C"
"fmt"
_ "FRMS/internal/pkg/I2C"
)
type Manager struct {
Addr int
Type string
Status bool
Sensor *SensorInfo
I2CMonitor I2Cmonitor
Kill chan<- bool
mu sync.Mutex
}
// implementing sensor skeleton
type SensorStatus uint32
const (
READY SensorStatus = iota
ALIVE
SENDING
KILLED
)
func (s SensorStatus) String() string {
return [...]string{"Ready","Active","Sending","Disabled"}[s]
}
type SensorInfo struct {
Status SensorStatus
Type string
mu sync.Mutex
}
func (s *SensorInfo) GetStatus() string{
s.mu.Lock()
defer s.mu.Unlock()
return fmt.Sprint(s.Status)
}
func (m *Manager) GetStatus() bool{
m.mu.Lock()
defer m.mu.Unlock()
return m.Status
func (s *SensorInfo) GetType() string{
s.mu.Lock()
defer s.mu.Unlock()
return s.Type
}
func (m *Manager) GetType() string{
m.mu.Lock()
defer m.mu.Unlock()
return m.Type
// i2c stuff
type I2Cmonitor interface {
GetStatus(int) bool
}
func NewSensorManager(addr int) *Manager {
return &Manager{Addr:addr}
func NewSensorManager(addr int,i I2Cmonitor) (*Manager,error) {
m := &Manager{Addr:addr,I2CMonitor:i}
go m.Start()
return m, nil
}
func (m *Manager) Start() {
types := map[int]string{97:"DO Sensor",99:"pH Sensor",102:"RTD Sensor"}
m.Type = types[m.Addr]
m.Status = true
m.Sensor = &SensorInfo{Type:types[m.Addr]}
}
func (m *Manager) GetStatus() string {
b := m.I2CMonitor.GetStatus(m.Addr)
m.Sensor.Update(b)
return m.Sensor.GetStatus()
}
func (m *Manager) GetType() string {
return m.Sensor.GetType()
}
// I2C interface stuff
func (s *SensorInfo) Update(b bool) {
s.mu.Lock()
defer s.mu.Unlock()
if b {
s.Status = ALIVE
} else {
s.Status = KILLED
}
}

@ -28,6 +28,26 @@ type HWinfo struct {
Port int
}
func (h *HWinfo) GetIp() string {
return h.Ip
}
func (h *HWinfo) GetId() uint32 {
return h.Id
}
func (h *HWinfo) GetBus() int {
return h.Bus
}
func (h *HWinfo) GetPort() int {
return h.Port
}
func (h *HWinfo) GetModel() string {
return h.Model
}
func NewHWinfo() (*HWinfo, error) {
h := &HWinfo{Port:2000}
err := h.Get()

15
notes

@ -77,3 +77,18 @@ TODO: 6-24
Y - Pres stuff from yesterday + python gRPC abstraction
Y - RPI flash
- Add resiliance to coordinator process (aka error handley blech)
TODO 6/27
- Time to tackle sensor managers officially
- to hell with port generation
- going to use channels but not like a jackass
- going to try generating channels interface side but via implicit types to avoid the interface stff
- should set up a structure where I can use arbiturary types on the backend and fulfill methods to get/infer information on the frontend
- rewrite I2C interface to employ same method, should allow for this
1) generate type
2) send it to worker
3) receive back (original? copy?)
4) use interface methods to get required values
- should simplify all internal communication and potentially suggests api for implementation

Loading…
Cancel
Save