You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

265 lines
5.9 KiB
Go

package reactor
// file describes reactor level coordinator and associated implementation
import (
"FRMS/internal/pkg/I2C"
pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/influxdb"
"FRMS/internal/pkg/logging"
"FRMS/internal/pkg/system"
"context"
"errors"
"fmt"
"math"
"sync"
"time"
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
)
// db client interface
type DBClient interface {
//
Start() error
}
func NewDBClient(config *viper.Viper) (DBClient, error) {
return influxdb.NewDBClient(config)
}
type I2CClient interface {
// simple client to push responsibilites to sensor
GetConnected() (map[int]bool, error) // gets all connected addr
SendCmd(int, string) (string, error) // send cmd, string is return
}
func NewI2CClient(config *viper.Viper) (I2CClient, error) {
return I2C.NewI2CClient(config)
}
type Server struct {
// embed
Ip string `mapstructure:"ip"`
Port int `mapstructure:"port"`
}
// Coordinator == Reactor Level Coordinator
type Coordinator struct {
Name string `mapstructure:"name,omitempty"`
ID int `mapstructure:"id,omitempty"`
Model string `mapstructure:"model,omitempty"`
// server info embedded
Server
// database
Database DBClient
I2C I2CClient
// config
Config *viper.Viper
MonitoringClient pb.MonitoringClient
// connected devices
*DeviceCoordinator // struct for locking
// other stuff and things
Err chan error
mu sync.Mutex
HB time.Duration
PingTimer chan struct{}
// db client
Active active
}
type active struct {
bool
int
sync.Mutex
}
func NewCoordinator(config *viper.Viper, ch chan error) *Coordinator {
// coord
c := &Coordinator{Err: ch, Config: config}
c.DeviceCoordinator = NewDeviceCoordinator()
// hb defaults to 5
c.HB = time.Duration(5 * time.Second)
// this is going to be scuffed
//c.DB = &DB{Bucket: "bb", Org: "ForeLight", URL: url, Token: "S1UZssBu6KPfHaQCt34pZFpyc5lzbH9XanYJWCkOI5FqLY7gq205C6FTH-CmugiPH6o2WoKlTkEuPgIfaJjAhw=="}
// setup db
var err error
if c.Database, err = NewDBClient(config); err != nil {
ch <- err
}
if c.I2C, err = NewI2CClient(config); err != nil {
ch <- err
}
c.PingTimer = make(chan struct{})
return c
}
func (c *Coordinator) Start() {
// should discover hwinfo and sensors on its own
// now setting up sensor managers
c.Activate()
// load hwinfo
if err := c.LoadInfo(); err != nil { // loads info
c.Err <- err
}
// grab config stuff
c.Config.UnmarshalKey("reactor", c)
go c.Monitor()
go c.Discover()
go c.Database.Start()
}
func (c *Coordinator) LoadInfo() error {
// check ID
var err error
if !c.Config.IsSet("reactor.id") {
// get id
var id int
if id, err = system.GetId("eth0"); err != nil {
return err
}
c.Config.Set("reactor.id", id)
}
// check Model
if !c.Config.IsSet("reactor.model") {
// get model
var model string
if model, err = system.GetModel(); err != nil {
return err
}
c.Config.Set("reactor.model", model)
}
// all good
return err
}
func (c *Coordinator) Monitor() {
// periodically grabs connected devs and updates list
for c.IsActive() {
select {
case <-c.PingTimer:
// check devs and ping
active, err := c.I2C.GetConnected()
if err != nil {
c.Err <- err
}
go c.UpdateDevices(active)
go c.Ping()
}
}
}
func (c *Coordinator) HeartBeat() {
for c.IsActive() {
c.PingTimer <- struct{}{}
logging.Debug(logging.DClient, "RLC Pinging server")
time.Sleep(c.HB)
}
}
func (c *Coordinator) Discover() {
// sets up connection to central coordiantor
conn, err := c.Connect(c.Ip, c.Port)
if err != nil {
c.Err <- err
}
defer conn.Close()
client := pb.NewHandshakeClient(conn)
req := &pb.ClientRequest{ClientId: uint32(c.ID), ClientType: "reactor"}
resp, err := client.ClientDiscoveryHandler(context.Background(), req)
if err != nil {
c.Err <- err
}
c.Port = int(resp.GetServerPort()) // updating server port
logging.Debug(logging.DClient, "RLC Central server reached, supplied port %v", c.Port)
// connecting to manager now
clientConn, err := c.Connect(c.Ip, c.Port)
if err != nil {
c.Err <- err
}
c.MonitoringClient = pb.NewMonitoringClient(clientConn)
go c.HeartBeat()
}
func (c *Coordinator) Connect(ip string, port int) (*grpc.ClientConn, error) {
// function connects to central server and passes hwinfo
var opts []grpc.DialOption
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
var conn *grpc.ClientConn
var err error
for {
conn, err = grpc.Dial(fmt.Sprintf("%v:%v", ip, port), opts...)
code := status.Code(err)
if code != 0 { // != OK
if code == (5 | 14) { // service temp down
to := c.Timeout()
if to == 0 {
err = errors.New("Failed to connect to central server")
return &grpc.ClientConn{}, err
}
logging.Debug(logging.DClient, "Server currently unavailable, retrying in %v ms", to)
time.Sleep(time.Duration(to) * time.Millisecond)
} else {
return &grpc.ClientConn{}, err
}
}
break
}
return conn, nil
}
func (c *Coordinator) Timeout() int {
c.Active.Lock()
defer c.Active.Unlock()
if c.Active.int < 9 {
v := int(5 * math.Pow(float64(2), float64(c.Active.int)))
c.Active.int += 1
return v
} else {
//excedded retries
return 0
}
}
func (c *Coordinator) IsActive() bool {
c.Active.Lock()
defer c.Active.Unlock()
return c.Active.bool
}
func (c *Coordinator) Exit() bool {
c.Active.Lock()
defer c.Active.Unlock()
if c.Active.bool {
c.Active.bool = false
logging.Debug(logging.DClient, "RLC Exiting...")
return true
} else {
logging.Debug(logging.DError, "RLC Already Dead!")
return false
}
}
func (c *Coordinator) Activate() bool {
c.Active.Lock()
defer c.Active.Unlock()
if c.Active.bool {
logging.Debug(logging.DError, "RLC Already Started!")
return false
} else {
logging.Debug(logging.DClient, "RLC Starting")
c.Active.bool = true
return c.Active.bool
}
}