|
|
|
package reactor
|
|
|
|
|
|
|
|
// This file defines the reactor-level coordinator and its supporting types.
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"sync"
|
|
|
|
"net"
|
|
|
|
"time"
|
|
|
|
"math"
|
|
|
|
"FRMS/internal/pkg/system"
|
|
|
|
"FRMS/internal/pkg/I2C"
|
|
|
|
"FRMS/internal/pkg/sensor"
|
|
|
|
"errors"
|
|
|
|
"context"
|
|
|
|
"google.golang.org/grpc"
|
|
|
|
"google.golang.org/grpc/status"
|
|
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
|
|
pb "FRMS/internal/pkg/grpc"
|
|
|
|
)
|
|
|
|
|
|
|
|
// Coordinator == Reactor Level Coordinator
|
|
|
|
|
|
|
|
type Coordinator struct {
|
|
|
|
*server
|
|
|
|
*hwinfo
|
|
|
|
Connected <-chan int
|
|
|
|
I2CMonitor I2Cmonitor
|
|
|
|
Sensors *SensorManagers // struct for fine grain locking
|
|
|
|
Err chan error
|
|
|
|
mu sync.Mutex
|
|
|
|
PendingChanges pc
|
|
|
|
Active active
|
|
|
|
pb.UnimplementedMonitoringServer
|
|
|
|
}
|
|
|
|
|
|
|
|
// pc is the type of Coordinator.PendingChanges; it embeds a sync.Mutex
// alongside a slice of pending items.
// NOTE(review): the slice field below has neither a name nor an element type,
// so this declaration is still a placeholder — TODO decide the sensor type.
type pc struct {
|
|
|
|
[]// TODO: element type undecided (some type of sensor)
|
|
|
|
sync.Mutex
|
|
|
|
}
|
|
|
|
|
|
|
|
// active groups a flag and a counter behind an embedded mutex. The counter
// (the embedded int) is what Timeout uses to track retry attempts.
type active struct {
	// Embedded predeclared types: the fields are addressed as .bool and .int.
	bool
	int

	// Embedded so callers can Lock/Unlock the value directly.
	sync.Mutex
}
|
|
|
|
|
|
|
|
// server stores the endpoint of the central server.
type server struct {
	Ip   string // host part of the endpoint
	Port int    // port part of the endpoint
}
|
|
|
|
|
|
|
|
// hwinfo stores what is known about the reactor itself.
type hwinfo struct {
	// Network endpoint the reactor listens on.
	Ip   string
	Port int

	Model string
	// Type is the client type reported to the central server.
	Type string
	// Bus is the I2C bus number handed to the I2C monitor.
	Bus int
	Id  uint32
}
|
|
|
|
|
|
|
|
type SensorManagers struct {
|
|
|
|
Managers map[int]SensorManager
|
|
|
|
mu sync.Mutex
|
|
|
|
}
|
|
|
|
|
|
|
|
// SensorManager is the view of a sensor manager the coordinator relies on
// when reporting devices: a type string and a numeric status.
type SensorManager interface {
	// GetType returns the device type reported for the sensor.
	GetType() string
	// GetStatus returns the status value reported for the sensor.
	GetStatus() uint32
}
|
|
|
|
|
|
|
|
// I2Cmonitor is the subset of the I2C bus monitor used by the coordinator.
// Both methods take the integer device address.
type I2Cmonitor interface {
	// CreateDevice is invoked by the coordinator the first time an address
	// is reported as connected.
	CreateDevice(addr int) error
	// GetStatus reports a boolean status for the device at addr.
	GetStatus(addr int) bool
}
|
|
|
|
|
|
|
|
func NewSensorManager(addr int,m I2Cmonitor) (SensorManager, error) {
|
|
|
|
return sensor.NewSensorManager(addr,m)
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewCoordinator(ip string,port int,ch chan error) *Coordinator {
|
|
|
|
serv := &server{Ip:ip,Port:port}
|
|
|
|
sen := new(SensorManagers)
|
|
|
|
sen.Managers = make(map[int]SensorManager)
|
|
|
|
c := &Coordinator{Err:ch,Sensors:sen}
|
|
|
|
c.server = serv
|
|
|
|
c.hwinfo = &hwinfo{}
|
|
|
|
c.Type = "reactor" // explicit for client stuff
|
|
|
|
return c
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewI2CMonitor(c chan<- int, b int) (I2Cmonitor, error) {
|
|
|
|
return I2C.NewMonitor(c,b)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Hardware exposes the discovered properties of the host that Start copies
// into the coordinator's hwinfo.
type Hardware interface {
	// Identity of the reactor.
	GetId() uint32
	GetModel() string

	// Network endpoint of the reactor.
	GetIp() string
	GetPort() int

	// I2C bus number to monitor.
	GetBus() int
}
|
|
|
|
|
|
|
|
func GetHWInfo() (Hardware, error) {
|
|
|
|
return system.NewHWinfo()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Coordinator) Start() error {
|
|
|
|
// should discover hwinfo and sensors on its own
|
|
|
|
// now setting up sensor managers
|
|
|
|
hw, err := GetHWInfo() // locking provided by struct is only useful on init
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// setting up hw stuff
|
|
|
|
c.hwinfo.Ip = hw.GetIp() //get should prevent empty data
|
|
|
|
c.hwinfo.Port = hw.GetPort()
|
|
|
|
c.Id = hw.GetId()
|
|
|
|
c.Model = hw.GetModel()
|
|
|
|
c.Bus = hw.GetBus()
|
|
|
|
|
|
|
|
con := make(chan int)
|
|
|
|
c.Connected = con
|
|
|
|
if c.I2CMonitor, err = NewI2CMonitor(con,c.Bus); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
fmt.Printf("Reactor at IP addr %v using I2C bus %v\n",c.hwinfo.Ip,c.hwinfo.Bus)
|
|
|
|
err = c.Connect()
|
|
|
|
for err != nil {
|
|
|
|
fmt.Printf("Connection failed: %v \n Retrying!\n",err)
|
|
|
|
}
|
|
|
|
c.Register()
|
|
|
|
go c.Monitor()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Coordinator) Monitor() {
|
|
|
|
// function to automatically create and destroy sm
|
|
|
|
for {
|
|
|
|
addr := <-c.Connected
|
|
|
|
go c.SensorConnect(addr)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Coordinator) SensorConnect(addr int) {
|
|
|
|
c.Sensors.mu.Lock()
|
|
|
|
defer c.Sensors.mu.Unlock()
|
|
|
|
_, exists := c.Sensors.Managers[addr]
|
|
|
|
if !exists {
|
|
|
|
go c.I2CMonitor.CreateDevice(addr)
|
|
|
|
sm, err := NewSensorManager(addr,c.I2CMonitor)
|
|
|
|
if err != nil {
|
|
|
|
c.Err <-err
|
|
|
|
}
|
|
|
|
c.Sensors.Managers[addr] = sm
|
|
|
|
} // ignoring case of existing sm eventually will have to check for alive
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Coordinator) Connect() error {
|
|
|
|
// function connects to central server and passes hwinfo
|
|
|
|
var opts []grpc.DialOption
|
|
|
|
opts = append(opts,grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
|
|
var conn *grpc.ClientConn
|
|
|
|
var err error
|
|
|
|
for {
|
|
|
|
conn, err = grpc.Dial(fmt.Sprintf("%v:%v",c.server.Ip,c.server.Port),opts...)
|
|
|
|
code := status.Code(err)
|
|
|
|
if code != 0 { // != OK
|
|
|
|
if code == (5 | 14) { // service temp down
|
|
|
|
to := c.Timeout()
|
|
|
|
if to == 0 {
|
|
|
|
err = errors.New("Failed to connect to central server")
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
fmt.Printf("Server currently unavailable, retrying in %v ms", to)
|
|
|
|
time.Sleep(time.Duration(to) * time.Millisecond)
|
|
|
|
} else {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
defer conn.Close()
|
|
|
|
client := pb.NewHandshakeClient(conn)
|
|
|
|
req := &pb.ClientDiscoveryRequest{Id:c.Id,Ip:c.hwinfo.Ip,Port:int32(c.hwinfo.Port),Model:c.Model,ClientType:c.Type}
|
|
|
|
resp, err := client.ClientDiscoveryHandler(context.Background(), req)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if resp.GetSuccess() {
|
|
|
|
fmt.Println("Central server reached")
|
|
|
|
return nil
|
|
|
|
} else {
|
|
|
|
return errors.New("Failed to reach central server!")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Coordinator) Register() error {
|
|
|
|
fmt.Printf("Listening for pings on %v:%v\n",c.hwinfo.Ip,c.hwinfo.Port)
|
|
|
|
lis, err := net.Listen("tcp", fmt.Sprintf("%v:%v",c.hwinfo.Ip,c.hwinfo.Port))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
grpcServer := grpc.NewServer()
|
|
|
|
pb.RegisterMonitoringServer(grpcServer, c)
|
|
|
|
go grpcServer.Serve(lis)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// sensor status stuff
|
|
|
|
func (c *Coordinator) ReactorStatusHandler(ctx context.Context, ping *pb.ReactorStatusRequest) (*pb.ReactorStatusResponse,error) {
|
|
|
|
// handler returns reactor id and any changes in devices
|
|
|
|
chng := c.PendingChanges() // returns map[int]sensor
|
|
|
|
var devs []*pb.Device
|
|
|
|
resp := &pb.ReactorStatusResponse{Id:c.Id,Devices:devs}
|
|
|
|
for a, s := range chng {
|
|
|
|
resp.Devices = append(resp.Devices,&pb.Device{Addr:int32(a), Type:s.GetType(), Status:s.GetStatus()})
|
|
|
|
}
|
|
|
|
return resp, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Coordinator) Timeout() int {
|
|
|
|
c.Active.Lock()
|
|
|
|
defer c.Active.Unlock()
|
|
|
|
if c.Active.int < 9 {
|
|
|
|
v := int(5 * math.Pow(float64(2), float64(c.Active.int)))
|
|
|
|
c.Active.int +=1
|
|
|
|
return v
|
|
|
|
} else {
|
|
|
|
//excedded retries
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
}
|