|
|
|
package reactor
|
|
|
|
|
|
|
|
import (
|
|
|
|
"sync"
|
|
|
|
"context"
|
|
|
|
//"log"
|
|
|
|
//"fmt"
|
|
|
|
//"net"
|
|
|
|
//"FRMS/internal/pkg/logging"
|
|
|
|
//"google.golang.org/grpc"
|
|
|
|
pb "FRMS/internal/pkg/grpc"
|
|
|
|
)
|
|
|
|
|
|
|
|
// implements grpc handler and device data aggregater handler
|
|
|
|
type DeviceStatus struct {
|
|
|
|
Addr int
|
|
|
|
Status string
|
|
|
|
Type string
|
|
|
|
Data string
|
|
|
|
}
|
|
|
|
|
|
|
|
// get reactor/device status
|
|
|
|
func (c *Coordinator) DevStatus(ch chan *DeviceStatus, a int, dm DeviceManager) {
|
|
|
|
d := &DeviceStatus{Addr:a}
|
|
|
|
d.Type = dm.GetType()
|
|
|
|
d.Status = dm.GetStatus()
|
|
|
|
d.Data = dm.GetData()
|
|
|
|
ch <-d
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Coordinator) GetStatus() []*pb.Device {
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
devs := []*pb.Device{}
|
|
|
|
statusChan := make(chan *DeviceStatus)
|
|
|
|
c.Devices.Lock()
|
|
|
|
for a,dm := range c.Devices.Managers {
|
|
|
|
wg.Add(1)
|
|
|
|
go c.DevStatus(statusChan,a,dm)
|
|
|
|
}
|
|
|
|
c.Devices.Unlock()
|
|
|
|
allDone := make(chan struct{})
|
|
|
|
go func(){
|
|
|
|
wg.Wait()
|
|
|
|
allDone <-struct{}{}
|
|
|
|
}() // once all the status are sent we send all done on the chan
|
|
|
|
for {
|
|
|
|
select{
|
|
|
|
case s:= <-statusChan:
|
|
|
|
//fmt.Printf("%v is %v\n",s.Type,s.Status)
|
|
|
|
devs = append(devs,&pb.Device{Addr:int32(s.Addr),Type:s.Type,Status:s.Status,Data:s.Data})
|
|
|
|
wg.Done()
|
|
|
|
case <-allDone:
|
|
|
|
return devs
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// grpc status update handler
|
|
|
|
func (c *Coordinator) Ping() {
|
|
|
|
// sends all device status to central coordinator
|
|
|
|
devs := c.GetStatus()
|
|
|
|
req := &pb.ReactorStatusPing{Id:c.Id,Devices:devs}
|
|
|
|
_, err := c.MonitoringClient.ReactorStatusHandler(context.Background(),req)
|
|
|
|
if err != nil {
|
|
|
|
c.Err <-err
|
|
|
|
go c.Exit()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
/*
|
|
|
|
func (c *Coordinator) Register() {
|
|
|
|
ip := c.hwinfo.Ip
|
|
|
|
|
|
|
|
if lis, err := net.Listen("tcp", fmt.Sprintf("%v:0",ip)); err != nil {
|
|
|
|
log.Fatal(err)
|
|
|
|
} else {
|
|
|
|
c.hwinfo.Port = lis.Addr().(*net.TCPAddr).Port
|
|
|
|
grpcServer := grpc.NewServer()
|
|
|
|
pb.RegisterMonitoringServer(grpcServer,c)
|
|
|
|
go grpcServer.Serve(lis)
|
|
|
|
}
|
|
|
|
logging.Debug(logging.DStart, "Listening for pings on %v:%v\n",ip,c.hwinfo.Port)
|
|
|
|
}
|
|
|
|
*/
|