package reactor // file describes reactor level coordinator and associated implementation import ( "fmt" "sync" "net" "FRMS/internal/pkg/system" "FRMS/internal/pkg/I2C" "FRMS/internal/pkg/sensor" "errors" "context" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" pb "FRMS/internal/pkg/grpc" ) type Hardware interface { GetIp() string GetId() uint32 GetBus() int GetModel() string GetPort() int } type I2CMonitor interface { Connected() []int } type Coordinator struct { Server string HW Hardware // object to keep track of reactor hw info I2C I2CMonitor // I2C monitor to keep track of I2C devices Sensors map[int]SensorManager Err chan error mu sync.Mutex pb.UnimplementedMonitoringServer } type SensorManager interface { Start() GetStatus() bool GetType() string } func NewSensorManager(addr int) SensorManager { return sensor.NewSensorManager(addr) } func NewCoordinator(s string,ch chan error) *Coordinator { return &Coordinator{Server:s,Err:ch} } func NewI2CMonitor(b int) I2CMonitor { return I2C.NewMonitor(b) } func NewHWMonitor() Hardware { return system.NewHWMonitor() } func (c *Coordinator) Start() error { // should discover hwinfo and sensors on its own // now setting up sensor managers c.HW = NewHWMonitor() fmt.Printf("Reactor at IP addr %v using I2C bus %v\n",c.HW.GetIp(),c.HW.GetBus()) response := c.Connect() for response != nil { fmt.Println("Connection failed!") response = c.Connect() } c.Register() c.mu.Lock() // lock to finish setting up devices defer c.mu.Unlock() c.I2C = NewI2CMonitor(c.HW.GetBus()) devs := c.I2C.Connected() c.Sensors = make(map[int]SensorManager) for _,d := range devs { // create a goroutine for each active sensor sm := NewSensorManager(d) c.Sensors[d] = sm go sm.Start() fmt.Printf("Sensor Manager for addr %v Started!\n",d) } return nil } func (c *Coordinator) Connect() error { // function connects to central server and passes hwinfo var opts []grpc.DialOption opts = append(opts,grpc.WithTransportCredentials(insecure.NewCredentials())) conn, err := grpc.Dial(c.Server,opts...) if err != nil { return err } defer conn.Close() client := pb.NewHandshakeClient(conn) req := &pb.ReactorDiscoveryRequest{Id:c.HW.GetId(),Ip:c.HW.GetIp(),Port:int32(2000),Model:c.HW.GetModel()} resp, err := client.ReactorDiscoveryHandler(context.Background(), req) if err != nil { return err } if resp.GetSuccess() { fmt.Println("Central server reached") return nil } else { return errors.New("Failed to reach central server!") } } func (c *Coordinator) Register() error { fmt.Printf("Listing for pings on %v:%v\n",c.HW.GetIp(),c.HW.GetPort()) lis, err := net.Listen("tcp", fmt.Sprintf("%v:%v",c.HW.GetIp(),c.HW.GetPort())) if err != nil { return err } grpcServer := grpc.NewServer() pb.RegisterMonitoringServer(grpcServer, c) go grpcServer.Serve(lis) return nil } func (c *Coordinator) SensorStatusHandler(ctx context.Context, ping *pb.SensorStatusRequest) (*pb.SensorStatusResponse,error) { c.mu.Lock() defer c.mu.Unlock() var sensors []*pb.SensorStatus resp := &pb.SensorStatusResponse{Id:c.HW.GetId(),Sensors:sensors} for a, s := range c.Sensors { resp.Sensors = append(resp.Sensors,&pb.SensorStatus{Addr:int32(a), Type:s.GetType(), Status:s.GetStatus()}) } return resp, nil }