You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

203 lines
4.8 KiB
Go

package reactor
// file describes reactor level coordinator and associated implementation
import (
"fmt"
"sync"
"time"
"math"
"FRMS/internal/pkg/system"
"FRMS/internal/pkg/I2C"
"FRMS/internal/pkg/sensor"
"FRMS/internal/pkg/logging"
"errors"
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
"google.golang.org/grpc/credentials/insecure"
pb "FRMS/internal/pkg/grpc"
)
// Coordinator == Reactor Level Coordinator
type Coordinator struct {
*server
*hwinfo
Devices *DeviceManagers // struct for fine grain locking
Err chan error
mu sync.Mutex
Active active
pb.UnimplementedMonitoringServer
}
type active struct {
bool
int
sync.Mutex
}
type server struct {
// store central server endpoint
Ip string
Port int
}
type hwinfo struct {
// store reactor info
Ip string
Port int
Model string
Bus int
Id uint32
}
type DeviceManagers struct {
Managers map[int]DeviceManager
sync.Mutex
}
// basic devicemanager struct manipulations
type DeviceManager interface {
Start()
GetType() string
GetStatus() string
GetData() string
}
type I2CDev interface {
GetAddr() int
GetData() string
GetStatus() string
GetType() string
}
func NewDeviceManager(i2c I2CDev) DeviceManager {
return sensor.NewDeviceManager(i2c)
}
type I2CMonitor interface {
Monitor()
GetDevice(int) interface{ GetAddr() int; GetStatus() string; GetData() string; GetType() string}
}
func NewI2CMonitor(b int,ch chan int) I2CMonitor {
return I2C.NewMonitor(b, ch)
}
func NewCoordinator(ip string,port int,ch chan error) *Coordinator {
serv := &server{Ip:ip,Port:port}
sen := new(DeviceManagers)
sen.Managers = make(map[int]DeviceManager)
c := &Coordinator{Err:ch,Devices:sen}
c.server = serv
c.hwinfo = &hwinfo{}
return c
}
type Hardware interface {
GetId() uint32
GetIp() string
GetBus() int
GetModel() string
GetPort() int
}
func GetHWInfo() (Hardware, error) {
return system.NewHWinfo()
}
func (c *Coordinator) Start() {
// should discover hwinfo and sensors on its own
// now setting up sensor managers
hw, err := GetHWInfo() // locking provided by struct is only useful on init
if err != nil {
c.Err <-err
}
// setting up hw stuff
c.hwinfo.Ip = hw.GetIp() //get should prevent empty data
c.Id = hw.GetId()
c.Model = hw.GetModel()
c.Bus = hw.GetBus()
c.Register()
go c.Monitor()
go c.Connect()
logging.Debug(logging.DStart, "%v RLC Starting", c.Id)
}
func (c *Coordinator) Monitor() {
// function to automatically create and destroy sm
ch := make(chan int)
im := NewI2CMonitor(c.Bus,ch)
go im.Monitor()
for {
d := <-ch
i := im.GetDevice(d)
go c.DeviceConnect(i)
}
}
func (c *Coordinator) DeviceConnect(i2c I2CDev) {
c.Devices.Lock()
defer c.Devices.Unlock()
addr := i2c.GetAddr()
if dm, exists := c.Devices.Managers[addr]; !exists{
dm := NewDeviceManager(i2c)
c.Devices.Managers[addr] = dm
go dm.Start()
} else {
go dm.Start()
}
}
func (c *Coordinator) Connect() {
// function connects to central server and passes hwinfo
var opts []grpc.DialOption
opts = append(opts,grpc.WithTransportCredentials(insecure.NewCredentials()))
var conn *grpc.ClientConn
var err error
for {
conn, err = grpc.Dial(fmt.Sprintf("%v:%v",c.server.Ip,c.server.Port),opts...)
code := status.Code(err)
if code != 0 { // != OK
if code == (5 | 14) { // service temp down
to := c.Timeout()
if to == 0 {
err = errors.New("Failed to connect to central server")
c.Err <-err
}
logging.Debug(logging.DClient,"Server currently unavailable, retrying in %v ms", to)
time.Sleep(time.Duration(to) * time.Millisecond)
} else {
c.Err <-err
}
}
break;
}
defer conn.Close()
client := pb.NewHandshakeClient(conn)
req := &pb.ReactorClientRequest{ClientId:c.Id,ClientIp:c.hwinfo.Ip,ClientPort:int32(c.hwinfo.Port),ClientModel:c.Model}
resp, err := client.ReactorClientDiscoveryHandler(context.Background(), req)
if err != nil {
c.Err <-err
}
if resp.GetSuccess() {
logging.Debug(logging.DClient,"Central server reached")
} else {
c.Err <-errors.New("Failed to reach central server!")
}
}
func (c *Coordinator) Timeout() int {
c.Active.Lock()
defer c.Active.Unlock()
if c.Active.int < 9 {
v := int(5 * math.Pow(float64(2), float64(c.Active.int)))
c.Active.int +=1
return v
} else {
//excedded retries
return 0
}
}