package server import ( "fmt" "time" _ "log" "context" "sync" "FRMS/internal/pkg/logging" "google.golang.org/grpc" "google.golang.org/grpc/status" "google.golang.org/grpc/credentials/insecure" pb "FRMS/internal/pkg/grpc" ) // this package will implement a reactor coordinator and associated go routines type ReactorManager struct { *Manager StatusMon *StatusMonitor DevsMon *StatusMonitor *devstatus } type devstatus struct { sync.Mutex Devs map[uint32]*DeviceInfo } func NewReactorManager(c *Client,sys *SystemViewer,err chan error) GeneralManager { r := &ReactorManager{} di := make(map[uint32]*DeviceInfo) r.devstatus = &devstatus{Devs:di} r.Manager = NewManager(err) r.StatusMon = sys.AddReactorSender() r.DevsMon = sys.AddDeviceSender(c.Id) return r } func (r *ReactorManager) Start(cl *Client) { r.Manager.Start(cl) logging.Debug(logging.DStart,"RMA %v starting", r.Id) go r.StatusMon.Start() go r.DevsMon.Start() go r.StatusMon.Send(&DeviceInfo{Id:r.Id,Type:"Reactor",Status:"[green]ONLINE[white]"}) conn := r.Connect() empty := &grpc.ClientConn{} if conn != empty { go r.Monitor(conn) } } func (r *ReactorManager) Exit() { r.Manager.Exit() logging.Debug(logging.DExit, "RMA %v exiting", r.Id) go r.StatusMon.Send(&DeviceInfo{Id:r.Id,Type:"Reactor",Status:"[red]OFFLINE[white]",Data:fmt.Sprintf("Last Seen %v",time.Now().Format("Mon at 03:04:05pm MST"))}) } func (r *ReactorManager) GetPort() int { return 0 } func (r *ReactorManager) Connect() *grpc.ClientConn { // establish gRPC conection with reactor var opts []grpc.DialOption var conn *grpc.ClientConn opts = append(opts,grpc.WithTransportCredentials(insecure.NewCredentials())) for { if !r.IsActive() { logging.Debug(logging.DClient,"RMA %v No longer active, aborting connection attempt",r.Id) return &grpc.ClientConn{} } var err error conn, err = grpc.Dial(fmt.Sprintf("%v:%v",r.Ip,r.Port),opts...) // error handling code := status.Code(err) if code != 0 { // != OK if code == (5 | 14) { // unavailable or not found to := r.Timeout() if to == 0 { logging.Debug(logging.DClient,"RMA %v Client not responding",r.Id) return &grpc.ClientConn{} } logging.Debug(logging.DClient,"RMA %v Client currently down, retrying in %v ms",r.Id, to) time.Sleep(time.Duration(to) * time.Millisecond) } else { logging.Debug(logging.DError,"RMA %v GRPC ERROR: %v",r.Id, code) r.Err <- err } } break; } return conn } func (r *ReactorManager) Monitor(conn *grpc.ClientConn) { defer conn.Close() client := pb.NewMonitoringClient(conn) for r.IsActive() { req := &pb.ReactorStatusRequest{Id:r.Id} resp, err := client.GetReactorStatus(context.Background(),req) code := status.Code(err) if code != 0 { // if != OK logging.Debug(logging.DClient,"RMA %v Reactor not responding! Code: %v\n", r.Id,code) r.devstatus.Lock() for _, d := range r.Devs { newd := d newd.Status = "[yellow]UNKOWN[white]" r.Devs[newd.Id] = newd go r.DevsMon.Send(newd) } r.devstatus.Unlock() r.Exit() break; } //r.devstatus.Lock() for _,v := range resp.GetDevices() { d := &DeviceInfo{Id:uint32(v.GetAddr()),Type:v.GetType(),Status:v.GetStatus(),Data:v.GetData()} go r.UpdateDevice(d) //go r.DevsMon.Send(d) //r.Devs[d.Id] = d } //r.devstatus.Unlock() logging.Debug(logging.DPing, "RMA %v Reactor Reached", r.Id) time.Sleep(r.Hb) // time between sensor pings } } func (r *ReactorManager) UpdateDevice(d *DeviceInfo) { r.devstatus.Lock() defer r.devstatus.Unlock() if olddev, ok := r.Devs[d.Id]; !ok { // new device r.Devs[d.Id] = d go r.DevsMon.Send(d) } else if olddev.Status != d.Status || olddev.Data != d.Data { // dev status or data has changed r.Devs[d.Id] = d go r.DevsMon.Send(d) } }