// Package reactor describes the reactor level coordinator and its associated
// implementation.
package reactor

import (
	"context"
	"fmt"
	"time"

	pb "FRMS/internal/pkg/grpc"
	"FRMS/internal/pkg/influxdb"
	"FRMS/internal/pkg/logging"
	"FRMS/internal/pkg/manager"
	"FRMS/internal/pkg/system"

	"github.com/spf13/viper"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/status"
)

// Manager is the basic manager the coordinator is built on.
type Manager interface {
	Start() error
	Exit() error
	Timeout() (time.Duration, error)
	HeartBeat(chan struct{}, int, int, time.Duration) // creates a hb
}

// NewManager returns a Manager built by manager.New with the given max.
func NewManager(max int) Manager {
	return manager.New(max)
}

// DBClient is the database client used by the coordinator.
type DBClient interface {
	Start() error
}

// NewDBClient builds a DBClient from config via influxdb.NewDBClient.
func NewDBClient(config *viper.Viper) (DBClient, error) {
	return influxdb.NewDBClient(config)
}

// Server holds the address of the central server.
type Server struct {
	Ip   string `mapstructure:"ip"`
	Port int    `mapstructure:"port"`
}

// ReactorInfo holds the reactor settings that LoadConfig unmarshals from the
// "reactor" config key.
type ReactorInfo struct {
	Name  string `mapstructure:"name,omitempty"`
	ID    int    `mapstructure:"id,omitempty"`
	Model string `mapstructure:"model,omitempty"`
	HB    int    `mapstructure:"heartbeat"`
	Bus   int    `mapstructure:"bus"`
	Server
}

// ReactorCoordinator ties together the base manager, the device coordinator,
// the database client and the gRPC monitoring client for a single reactor.
type ReactorCoordinator struct {
	Manager                // base manager
	Config   *viper.Viper  // config
	ReactorInfo            `mapstructure:",squash"`
	Database DBClient
	pb.MonitoringClient    // grpc embedding
	*DeviceCoordinator     // struct for locking
	Err      chan error
}

// NewCoordinator builds a ReactorCoordinator around config. Errors hit while
// the coordinator runs are reported on errCh.
func NewCoordinator(config *viper.Viper, errCh chan error) *ReactorCoordinator {
	// NOTE(review): a commented-out DB literal containing a hard-coded access
	// token used to live here; treat that token as exposed and rotate it.
	return &ReactorCoordinator{
		Manager:           NewManager(6), // max 6 attempts
		Config:            config,
		DeviceCoordinator: NewDeviceCoordinator(config),
		Err:               errCh,
	}
}

// Start brings the coordinator up: base manager, config, device coordinator
// and database client, in that order, then launches discovery and the database
// client in the background.
//
// The first failure is reported on c.Err and aborts the start-up, because each
// later step depends on the earlier ones (in particular, a failed NewDBClient
// would leave c.Database nil and calling Start on it would panic).
func (c *ReactorCoordinator) Start() {
	if err := c.Manager.Start(); err != nil {
		c.Err <- err
		return
	}

	// load config
	if err := c.LoadConfig(); err != nil {
		c.Err <- err
		return
	}

	if err := c.DeviceCoordinator.Start(c.ReactorInfo.Bus); err != nil {
		c.Err <- err
		return
	}

	// loading clients
	db, err := NewDBClient(c.Config)
	if err != nil {
		c.Err <- err
		return
	}
	c.Database = db

	go c.Discover()
	go func() {
		// surface a database start failure instead of dropping it
		if err := db.Start(); err != nil {
			c.Err <- err
		}
	}()
}

// LoadConfig fills in any missing "reactor.*" settings and then unmarshals the
// "reactor" key into c.
//
// Defaults: heartbeat falls back to 5; id, model and bus are read from the
// system package (the id from interface "eth0"). It returns the first error
// from those lookups, or the error from unmarshaling.
func (c *ReactorCoordinator) LoadConfig() error {
	// get hb
	if !c.Config.IsSet("reactor.heartbeat") {
		// default to 5 seconds
		c.Config.Set("reactor.heartbeat", 5)
	}

	// check id
	if !c.Config.IsSet("reactor.id") {
		// get from hw
		id, err := system.GetId("eth0")
		if err != nil {
			return err
		}
		c.Config.Set("reactor.id", id)
	}

	// check model
	if !c.Config.IsSet("reactor.model") {
		// get from hw
		model, err := system.GetModel()
		if err != nil {
			return err
		}
		c.Config.Set("reactor.model", model)
	}

	// check i2c bus
	if !c.Config.IsSet("reactor.bus") {
		// get from hw
		bus, err := system.GetBus()
		if err != nil {
			return err
		}
		c.Config.Set("reactor.bus", bus)
	}

	// all good, unmarshaling; a decode failure is returned, not ignored
	return c.Config.UnmarshalKey("reactor", c)
}

// Monitor starts a heartbeat using c.HB and pings the central server with the
// reactor status on every beat. It returns when the heartbeat channel is
// closed.
func (c *ReactorCoordinator) Monitor() {
	ch := make(chan struct{})
	go c.HeartBeat(ch, c.HB, 0, time.Second)
	for range ch {
		logging.Debug(logging.DClient, "RLC Pinging server")
		// ping central server with status
		go c.Ping()
	}
}

// Discover performs the handshake with the central coordinator: it connects to
// the configured address, asks for the port to use, reconnects on that port,
// installs the monitoring client and starts Monitor.
//
// Any failure is reported on c.Err and ends discovery, since every later step
// needs the result of the previous one.
func (c *ReactorCoordinator) Discover() {
	// sets up connection to central coordinator
	conn, err := c.Connect(c.Ip, c.Port)
	if err != nil {
		c.Err <- err
		return
	}
	// the handshake connection is only needed until the new port is known
	defer conn.Close()

	client := pb.NewHandshakeClient(conn)
	req := &pb.ClientRequest{ClientId: uint32(c.ID), ClientType: "reactor"}
	resp, err := client.ClientDiscoveryHandler(context.Background(), req)
	if err != nil {
		c.Err <- err
		return
	}

	c.Port = int(resp.GetServerPort()) // updating server port
	logging.Debug(logging.DClient, "RLC Central server reached, supplied port %v", c.Port)

	// connecting to manager now
	clientConn, err := c.Connect(c.Ip, c.Port)
	if err != nil {
		c.Err <- err
		return
	}
	c.MonitoringClient = pb.NewMonitoringClient(clientConn)

	go c.Monitor()
}

// Connect dials ip:port with insecure transport credentials and returns the
// connection.
//
// If the dial fails with gRPC status NotFound or Unavailable, it waits for the
// duration supplied by c.Timeout and dials again; it gives up with the error
// from c.Timeout once that call fails. Any other dial error is returned
// immediately. The returned connection is nil whenever the error is non-nil.
//
// NOTE(review): no blocking dial option is set, so an unreachable server may
// only surface on the first RPC rather than here — confirm.
func (c *ReactorCoordinator) Connect(ip string, port int) (*grpc.ClientConn, error) {
	// gRPC status codes treated as "service temporarily down"
	const (
		codeNotFound    = 5
		codeUnavailable = 14
	)

	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	}
	target := fmt.Sprintf("%v:%v", ip, port)

	for {
		conn, err := grpc.Dial(target, opts...)
		if err == nil {
			return conn, nil
		}

		switch status.Code(err) {
		case codeNotFound, codeUnavailable:
			// service temp down: back off, then loop around and redial
			to, terr := c.Timeout() // from manager
			if terr != nil {
				return nil, terr
			}
			logging.Debug(logging.DClient, "Server currently unavailable, retrying in %v", to)
			time.Sleep(to)
		default:
			return nil, err
		}
	}
}

// Ping sends the current device info to the central coordinator as a reactor
// status ping. Errors from gathering device info or from the RPC are reported
// on c.Err.
func (c *ReactorCoordinator) Ping() {
	fmt.Printf("Pinging server\n")

	devices, err := c.GetDeviceInfo()
	if err != nil {
		// NOTE(review): the ping is still sent after a device-info error,
		// with whatever GetDeviceInfo returned — confirm this is intended.
		c.Err <- err
	}

	// create request
	req := &pb.ReactorStatusPing{
		Id:      int32(c.ID),
		Devices: devices,
	}

	// ping server
	if _, err = c.ReactorStatusHandler(context.Background(), req); err != nil {
		c.Err <- err
	}
}