package reactor import ( "sync" "context" "log" "fmt" "net" "google.golang.org/grpc" pb "FRMS/internal/pkg/grpc" ) // implements grpc handler and device data aggregater handler type DeviceStatus struct { Addr int Status string Type string Data string } // get reactor/device status func (c *Coordinator) DevStatus(ch chan *DeviceStatus, a int, dm DeviceManager) { d := &DeviceStatus{Addr:a} d.Type = dm.GetType() d.Status = dm.GetStatus() d.Data = dm.GetData() ch <-d } func (c *Coordinator) GetStatus() []*DeviceStatus { var wg sync.WaitGroup devs := []*DeviceStatus{} statusChan := make(chan *DeviceStatus) c.Devices.Lock() for a,dm := range c.Devices.Managers { wg.Add(1) go c.DevStatus(statusChan,a,dm) } c.Devices.Unlock() allDone := make(chan struct{}) go func(){ wg.Wait() allDone <-struct{}{} }() // once all the status are sent we send all done on the chan for { select{ case s:= <-statusChan: //fmt.Printf("%v is %v, ",s.Type,s.Status) devs = append(devs,s) wg.Done() case <-allDone: fmt.Printf("Devices scaned\n") return devs } } } // grpc status update handler func (c *Coordinator) Register() { ip := c.hwinfo.Ip if lis, err := net.Listen("tcp", fmt.Sprintf("%v:0",ip)); err != nil { log.Fatal(err) } else { c.hwinfo.Port = lis.Addr().(*net.TCPAddr).Port grpcServer := grpc.NewServer() pb.RegisterMonitoringServer(grpcServer,c) go grpcServer.Serve(lis) } log.Printf("Listening for pings on %v:%v\n",ip,c.hwinfo.Port) } func (c *Coordinator) GetReactorStatus(ctx context.Context, ping *pb.ReactorStatusRequest) (*pb.ReactorStatusResponse, error) { // status request handler devs := []*pb.Device{} resp := &pb.ReactorStatusResponse{Id:c.Id,Devices:devs} devStatus := c.GetStatus() for _,v := range devStatus { d := &pb.Device{Addr:int32(v.Addr),Type:v.Type,Status:v.Status,Data:v.Data} resp.Devices = append(resp.Devices,d) } return resp, nil }