package reactor // file describes reactor level coordinator and associated implementation import ( "fmt" "sync" "time" "math" "FRMS/internal/pkg/system" "FRMS/internal/pkg/I2C" "FRMS/internal/pkg/sensor" "errors" "context" "google.golang.org/grpc" "google.golang.org/grpc/status" "google.golang.org/grpc/credentials/insecure" pb "FRMS/internal/pkg/grpc" ) // Coordinator == Reactor Level Coordinator type Coordinator struct { *server *hwinfo Devices *DeviceManagers // struct for fine grain locking Err chan error mu sync.Mutex Active active pb.UnimplementedMonitoringServer } type active struct { bool int sync.Mutex } type server struct { // store central server endpoint Ip string Port int } type hwinfo struct { // store reactor info Ip string Port int Model string Type string Bus int Id uint32 } type DeviceManagers struct { Managers map[int]DeviceManager sync.Mutex } // basic devicemanager struct manipulations type DeviceManager interface { Start() GetType() string GetStatus() string GetData() string } type I2CDev interface { GetAddr() int GetStatus() string GetType() string } func NewDeviceManager(i2c I2CDev) DeviceManager { return sensor.NewDeviceManager(i2c) } type I2CMonitor interface { Monitor() GetDevice(int) interface{ GetStatus() string; GetType() string;GetAddr() int } } func NewI2CMonitor(b int,ch chan int) I2CMonitor { return I2C.NewMonitor(b, ch) } func NewCoordinator(ip string,port int,ch chan error) *Coordinator { serv := &server{Ip:ip,Port:port} sen := new(DeviceManagers) sen.Managers = make(map[int]DeviceManager) c := &Coordinator{Err:ch,Devices:sen} c.server = serv c.hwinfo = &hwinfo{} c.Type = "reactor" // explicit for client stuff return c } type Hardware interface { GetId() uint32 GetIp() string GetBus() int GetModel() string GetPort() int } func GetHWInfo() (Hardware, error) { return system.NewHWinfo() } func (c *Coordinator) Start() { // should discover hwinfo and sensors on its own // now setting up sensor managers hw, err := GetHWInfo() // locking provided by struct is only useful on init if err != nil { c.Err <-err } // setting up hw stuff c.hwinfo.Ip = hw.GetIp() //get should prevent empty data c.Id = hw.GetId() c.Model = hw.GetModel() c.Bus = hw.GetBus() c.Register() go c.Monitor() go c.Connect() } func (c *Coordinator) Monitor() { // function to automatically create and destroy sm ch := make(chan int) im := NewI2CMonitor(c.Bus,ch) go im.Monitor() for { d := <-ch i := im.GetDevice(d) go c.DeviceConnect(i) } } func (c *Coordinator) DeviceConnect(i2c I2CDev) { c.Devices.Lock() defer c.Devices.Unlock() addr := i2c.GetAddr() if dm, exists := c.Devices.Managers[addr]; !exists{ dm := NewDeviceManager(i2c) c.Devices.Managers[addr] = dm go dm.Start() } else { go dm.Start() } } func (c *Coordinator) Connect() { // function connects to central server and passes hwinfo var opts []grpc.DialOption opts = append(opts,grpc.WithTransportCredentials(insecure.NewCredentials())) var conn *grpc.ClientConn var err error for { conn, err = grpc.Dial(fmt.Sprintf("%v:%v",c.server.Ip,c.server.Port),opts...) code := status.Code(err) if code != 0 { // != OK if code == (5 | 14) { // service temp down to := c.Timeout() if to == 0 { err = errors.New("Failed to connect to central server") c.Err <-err } fmt.Printf("Server currently unavailable, retrying in %v ms", to) time.Sleep(time.Duration(to) * time.Millisecond) } else { c.Err <-err } } break; } defer conn.Close() client := pb.NewHandshakeClient(conn) req := &pb.ClientDiscoveryRequest{Id:c.Id,Ip:c.hwinfo.Ip,Port:int32(c.hwinfo.Port),Model:c.Model,ClientType:c.Type} resp, err := client.ClientDiscoveryHandler(context.Background(), req) if err != nil { c.Err <-err } if resp.GetSuccess() { fmt.Println("Central server reached") } else { c.Err <-errors.New("Failed to reach central server!") } } func (c *Coordinator) Timeout() int { c.Active.Lock() defer c.Active.Unlock() if c.Active.int < 9 { v := int(5 * math.Pow(float64(2), float64(c.Active.int))) c.Active.int +=1 return v } else { //excedded retries return 0 } }