package server import ( "fmt" "time" _ "log" "context" "sync" "FRMS/internal/pkg/logging" "google.golang.org/grpc" "google.golang.org/grpc/status" "google.golang.org/grpc/credentials/insecure" pb "FRMS/internal/pkg/grpc" ) // this package will implement a reactor coordinator and associated go routines type ReactorManager struct { *Manager StatusMon *StatusMonitor *devstatus } type devstatus struct { sync.Mutex Devs map[uint32]*DeviceInfo } func NewReactorManager(c *Client,sys *SystemViewer,err chan error) GeneralManager { r := &ReactorManager{} di := make(map[uint32]*DeviceInfo) r.devstatus = &devstatus{Devs:di} r.Manager = NewManager(err) r.StatusMon = NewStatusMonitor("Reactor",c.Id,sys) return r } func (r *ReactorManager) Start(cl *Client) { r.Manager.Start(cl) logging.Debug(logging.DStart,"RMA %v starting", r.Id) go r.StatusMon.Send(&DeviceInfo{Id:r.Id,Type:"Reactor",Status:"[green]ONLINE[white]"},"Reactor") conn := r.Connect() empty := &grpc.ClientConn{} if conn != empty { go r.Monitor(conn) } } func (r *ReactorManager) Exit() { r.Manager.Exit() logging.Debug(logging.DExit, "RMA %v exiting", r.Id) go r.StatusMon.Send(&DeviceInfo{Id:r.Id,Type:"Reactor",Status:"[red]OFFLINE[white]",Data:fmt.Sprintf("Last Seen %v",time.Now().Format("Mon at 03:04:05pm MST"))},"Reactor") r.Devs.Lock() defer r.Devs.Unlock() for _, d := range r.Devs { newd := d newd.Status = "[yellow]UNKOWN[white]" r.Devs[newd.Id] = newd go r.StatusMon.Send(newd,"Device") } } func (r *ReactorManager) Connect() *grpc.ClientConn { // establish gRPC conection with reactor var opts []grpc.DialOption var conn *grpc.ClientConn opts = append(opts,grpc.WithTransportCredentials(insecure.NewCredentials())) for { if !r.IsActive() { logging.Debug(logging.DClient,"RMA %v No longer active, aborting connection attempt",r.Id) return &grpc.ClientConn{} } var err error conn, err = grpc.Dial(fmt.Sprintf("%v:%v",r.Ip,r.Port),opts...) // error handling code := status.Code(err) if code != 0 { // != OK if code == (5 | 14) { // unavailable or not found to := r.Timeout() if to == 0 { logging.Debug(logging.DClient,"RMA %v Client not responding",r.Id) return &grpc.ClientConn{} } logging.Debug(logging.DClient,"RMA %v Client currently down, retrying in %v ms",r.Id, to) time.Sleep(time.Duration(to) * time.Millisecond) } else { logging.Debug(logging.DError,"RMA %v GRPC ERROR: %v",r.Id, code) r.Err <- err } } break; } return conn } func (r *ReactorManager) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusRequest) (*pb.ReactorStatusResponse, error) { // function client will call to update reactor information go r.PingReset() for _, dev := range req.GetDevices() { d := &DeviceInfo{Id:uint32(dev.GetAddr()),Type:dev.GetType(),Status:dev.GetStatus(),Data:dev.GetData()} go r.UpdateDevice(d) } return &pb.ReactorStatusResponse{Id:r.Id}, nil } /* func (r *ReactorManager) Monitor(conn *grpc.ClientConn) { defer conn.Close() client := pb.NewMonitoringClient(conn) for r.IsActive() { req := &pb.ReactorStatusRequest{Id:r.Id} resp, err := client.GetReactorStatus(context.Background(),req) code := status.Code(err) if code != 0 { // if != OK logging.Debug(logging.DClient,"RMA %v Reactor not responding! Code: %v\n", r.Id,code) r.devstatus.Lock() for _, d := range r.Devs { newd := d newd.Status = "[yellow]UNKOWN[white]" r.Devs[newd.Id] = newd go r.StatusMon.Send(newd,"Device") } r.devstatus.Unlock() r.Exit() break; } for _,v := range resp.GetDevices() { d := &DeviceInfo{Id:uint32(v.GetAddr()),Type:v.GetType(),Status:v.GetStatus(),Data:v.GetData()} go r.UpdateDevice(d) } time.Sleep(r.Hb) // time between sensor pings } } */ func (r *ReactorManager) UpdateDevice(d *DeviceInfo) { r.devstatus.Lock() defer r.devstatus.Unlock() if olddev, ok := r.Devs[d.Id]; !ok { // new device r.Devs[d.Id] = d go r.StatusMon.Send(d,"Device") } else if olddev.Status != d.Status || olddev.Data != d.Data { // dev status or data has changed r.Devs[d.Id] = d go r.StatusMon.Send(d,"Device") } }