You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

164 lines
4.3 KiB
Go

3 years ago
package reactor
// This file defines the reactor-level Coordinator and the types it relies on.
import (
"fmt"
3 years ago
"sync"
"net"
3 years ago
"FRMS/internal/pkg/system"
"FRMS/internal/pkg/I2C"
3 years ago
"FRMS/internal/pkg/sensor"
"errors"
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "FRMS/internal/pkg/grpc"
3 years ago
)
// Hardware exposes the identifying information of the machine the reactor
// runs on. The Coordinator reads it to announce itself to the central server
// and to decide where to listen and which I2C bus to use.
type Hardware interface {
	// GetId returns the numeric identifier sent in the discovery request.
	GetId() uint32
	// GetIp returns the address used for the discovery request and the listener.
	GetIp() string
	// GetBus returns the I2C bus number handed to the I2C monitor.
	GetBus() int
	// GetModel returns the model string sent in the discovery request.
	GetModel() string
	// GetPort returns the port used for the discovery request and the listener.
	GetPort() int
}
type Coordinator struct {
Server string
HW Hardware
Connected <-chan int
I2CMonitor I2Cmonitor
Sensors *SensorManagers // struct for fine grain locking
3 years ago
Err chan error
mu sync.Mutex
pb.UnimplementedMonitoringServer
3 years ago
}
// SensorManagers is the set of active sensor managers, keyed by the integer
// address passed to Connect. The mutex guards every access to Managers.
type SensorManagers struct {
	Managers map[int]SensorManager
	mu       sync.Mutex
}
3 years ago
// SensorManager is the view of a single sensor that the Coordinator needs in
// order to report it through SensorStatusHandler.
type SensorManager interface {
	// GetStatus returns the status string reported for the sensor.
	GetStatus() string
	// GetType returns the type string reported for the sensor.
	GetType() string
}
// I2Cmonitor is the subset of the I2C bus monitor used by the Coordinator
// and handed on to each sensor manager.
type I2Cmonitor interface {
	// CreateDevice is called by Connect with the address of a newly seen device.
	CreateDevice(addr int) error
	// GetStatus reports a boolean status for the given integer key
	// (presumably a device address — confirm against the I2C package).
	GetStatus(int) bool
}
func NewSensorManager(addr int,m I2Cmonitor) (SensorManager, error) {
return sensor.NewSensorManager(addr,m)
3 years ago
}
3 years ago
3 years ago
func NewCoordinator(s string,ch chan error) *Coordinator {
sen := new(SensorManagers)
sen.Managers = make(map[int]SensorManager)
return &Coordinator{Server:s,Err:ch,Sensors:sen}
3 years ago
}
func NewI2CMonitor(c chan<- int, b int) (I2Cmonitor, error) {
return I2C.NewMonitor(c,b)
3 years ago
}
func GetHWInfo() (Hardware, error) {
return system.NewHWinfo()
3 years ago
}
func (c *Coordinator) Start() error {
// should discover hwinfo and sensors on its own
// now setting up sensor managers
hw, err := GetHWInfo()
if err != nil {
return err
}
c.HW = hw
con := make(chan int)
c.Connected = con
monitor, err := NewI2CMonitor(con,c.HW.GetBus())
if err != nil {
return err
}
c.I2CMonitor = monitor
fmt.Printf("Reactor at IP addr %v using I2C bus %v\n",c.HW.GetIp(),c.HW.GetBus())
err = c.EstablishConnection()
for err != nil {
fmt.Printf("Connection failed: %v \n Retrying!\n",err)
3 years ago
}
3 years ago
c.Register()
go c.Monitor()
return nil
}
// Monitor runs forever, receiving device addresses from c.Connected and
// handing each one to Connect on its own goroutine. It never returns.
func (c *Coordinator) Monitor() {
	for {
		go c.Connect(<-c.Connected)
	}
}
func (c *Coordinator) Connect(addr int) {
c.Sensors.mu.Lock()
defer c.Sensors.mu.Unlock()
_, exists := c.Sensors.Managers[addr]
if !exists {
go c.I2CMonitor.CreateDevice(addr)
sm, err := NewSensorManager(addr,c.I2CMonitor)
if err != nil {
c.Err <-err
}
c.Sensors.Managers[addr] = sm
} // ignoring case of existing sm eventually will have to check for alive
3 years ago
}
func (c *Coordinator) EstablishConnection() error {
3 years ago
// function connects to central server and passes hwinfo
3 years ago
var opts []grpc.DialOption
opts = append(opts,grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.Dial(c.Server,opts...)
if err != nil {
return err
}
defer conn.Close()
client := pb.NewHandshakeClient(conn)
req := &pb.ReactorDiscoveryRequest{Id:c.HW.GetId(),Ip:c.HW.GetIp(),Port:int32(c.HW.GetPort()),Model:c.HW.GetModel()}
3 years ago
resp, err := client.ReactorDiscoveryHandler(context.Background(), req)
if err != nil {
return err
}
if resp.GetSuccess() {
fmt.Println("Central server reached")
return nil
} else {
return errors.New("Failed to reach central server!")
}
}
3 years ago
3 years ago
func (c *Coordinator) Register() error {
fmt.Printf("Listing for pings on %v:%v\n",c.HW.GetIp(),c.HW.GetPort())
lis, err := net.Listen("tcp", fmt.Sprintf("%v:%v",c.HW.GetIp(),c.HW.GetPort()))
3 years ago
if err != nil {
return err
}
grpcServer := grpc.NewServer()
pb.RegisterMonitoringServer(grpcServer, c)
go grpcServer.Serve(lis)
3 years ago
return nil
}
3 years ago
// sensor status stuff
3 years ago
func (c *Coordinator) SensorStatusHandler(ctx context.Context, ping *pb.SensorStatusRequest) (*pb.SensorStatusResponse,error) {
c.Sensors.mu.Lock()
defer c.Sensors.mu.Unlock()
3 years ago
var sensors []*pb.SensorStatus
resp := &pb.SensorStatusResponse{Id:c.HW.GetId(),Sensors:sensors}
for a, s := range c.Sensors.Managers {
3 years ago
resp.Sensors = append(resp.Sensors,&pb.SensorStatus{Addr:int32(a), Type:s.GetType(), Status:s.GetStatus()})
}
return resp, nil
}