package reactor

import (
	"context"
	"strconv"
	"strings"
	"sync"
	"time"

	//"log"
	//"fmt"
	//"net"
	//"FRMS/internal/pkg/logging"
	//"google.golang.org/grpc"
	pb "FRMS/internal/pkg/grpc"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

// This file implements the gRPC status handler and the device data
// aggregator handler.

// DeviceStatus is a snapshot of a single device: the address it was registered
// under plus the type, status and raw data strings reported by its manager.
type DeviceStatus struct {
	Addr   int
	Status string
	Type   string
	Data   string
}

// DevStatus queries dm for its type, status and data, tags the result with
// address a and sends it on ch. The send is a plain channel send, so on an
// unbuffered channel it blocks until a receiver takes the value.
func (c *Coordinator) DevStatus(ch chan *DeviceStatus, a int, dm DeviceManager) {
	d := &DeviceStatus{Addr: a}
	d.Type = dm.GetType()
	d.Status = dm.GetStatus()
	d.Data = dm.GetData()
	ch <- d
}

// GetStatus polls every registered device manager concurrently, writes each
// reading found in a device's data string to InfluxDB (bucket c.Bucket in
// organisation c.Org) and returns one pb.Device per polled device. The order
// of the returned slice is unspecified.
//
// A data string is a comma-separated list of "key:value" readings, for
// example "T:10C,H:102%". Keys "T" and "H" are stored with the type tag
// "Temperature" and "Humidity"; any other key is stored with an empty tag.
// Entries that are not in "key:value" form (including the single empty entry
// produced by an empty data string) are skipped instead of crashing on an
// out-of-range index; the device itself is still reported.
//
// NOTE(review): a value that does not parse as a float, or a failed database
// write, still panics, as before. The signature has no error return, so
// changing that needs a decision on where such errors should be reported.
func (c *Coordinator) GetStatus(client influxdb2.Client) []*pb.Device {
	// db stuff
	api := client.WriteAPIBlocking(c.Org, c.Bucket)

	var wg sync.WaitGroup
	devs := []*pb.Device{}
	statusChan := make(chan *DeviceStatus)

	c.Devices.Lock()
	for a, dm := range c.Devices.Managers {
		a, dm := a, dm // per-iteration copies for the goroutine (required before Go 1.22)
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.DevStatus(statusChan, a, dm)
		}()
	}
	c.Devices.Unlock()

	// Close the channel once every producer has handed over its status so the
	// receive loop below terminates.
	go func() {
		wg.Wait()
		close(statusChan)
	}()

	for s := range statusChan {
		// T:10C,H:102% -> T:10C H:102%
		for _, m := range strings.Split(s.Data, ",") {
			splt := strings.Split(m, ":") // T 10C or H 10%
			if len(splt) < 2 {
				// No "key:value" pair here (e.g. the device has no data yet).
				continue
			}

			var meas string
			switch splt[0] {
			case "T":
				meas = "Temperature"
			case "H":
				meas = "Humidity"
			}

			// Strip the unit suffix and surrounding whitespace before parsing.
			val, err := strconv.ParseFloat(strings.Trim(splt[1], " %C\n"), 64)
			if err != nil {
				panic(err)
			}

			p := influxdb2.NewPoint(
				"measurements",
				map[string]string{"type": meas},
				map[string]interface{}{"val": val},
				time.Now(),
			)
			if err := api.WritePoint(context.Background(), p); err != nil {
				panic(err)
			}
		}
		devs = append(devs, &pb.Device{Addr: int32(s.Addr), Type: s.Type, Status: s.Status, Data: s.Data})
	}
	return devs
}

// Ping is the gRPC status update handler: it gathers the status of all
// devices via GetStatus and sends them, together with c.Id, to the central
// coordinator through c.MonitoringClient. If the call fails, the error is
// sent on c.Err and c.Exit is started in a new goroutine.
func (c *Coordinator) Ping(client influxdb2.Client) {
	// sends all device status to central coordinator
	devs := c.GetStatus(client)
	req := &pb.ReactorStatusPing{Id: c.Id, Devices: devs}
	if _, err := c.MonitoringClient.ReactorStatusHandler(context.Background(), req); err != nil {
		c.Err <- err
		go c.Exit()
	}
}

/*
func (c *Coordinator) Register() {
    ip := c.hwinfo.Ip
    if lis, err := net.Listen("tcp", fmt.Sprintf("%v:0",ip)); err != nil {
        log.Fatal(err)
    } else {
        c.hwinfo.Port = lis.Addr().(*net.TCPAddr).Port
        grpcServer := grpc.NewServer()
        pb.RegisterMonitoringServer(grpcServer,c)
        go grpcServer.Serve(lis)
    }
    logging.Debug(logging.DStart, "Listening for pings on %v:%v\n",ip,c.hwinfo.Port)
}
*/