|
|
|
@ -3,258 +3,268 @@ package reactor
|
|
|
|
|
// file describes reactor level coordinator and associated implementation
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"fmt"
|
|
|
|
|
"sync"
|
|
|
|
|
"time"
|
|
|
|
|
"math"
|
|
|
|
|
"FRMS/internal/pkg/system"
|
|
|
|
|
"FRMS/internal/pkg/I2C"
|
|
|
|
|
"FRMS/internal/pkg/sensor"
|
|
|
|
|
"FRMS/internal/pkg/logging"
|
|
|
|
|
"errors"
|
|
|
|
|
"context"
|
|
|
|
|
"google.golang.org/grpc"
|
|
|
|
|
"google.golang.org/grpc/status"
|
|
|
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
|
|
|
"github.com/influxdata/influxdb-client-go/v2"
|
|
|
|
|
pb "FRMS/internal/pkg/grpc"
|
|
|
|
|
"FRMS/internal/pkg/I2C"
|
|
|
|
|
pb "FRMS/internal/pkg/grpc"
|
|
|
|
|
"FRMS/internal/pkg/logging"
|
|
|
|
|
"FRMS/internal/pkg/sensor"
|
|
|
|
|
"FRMS/internal/pkg/system"
|
|
|
|
|
"context"
|
|
|
|
|
"errors"
|
|
|
|
|
"fmt"
|
|
|
|
|
"math"
|
|
|
|
|
"sync"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
|
|
|
|
"google.golang.org/grpc"
|
|
|
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
|
|
|
"google.golang.org/grpc/status"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Coordinator == Reactor Level Coordinator
|
|
|
|
|
|
|
|
|
|
type Coordinator struct {
|
|
|
|
|
Ip string
|
|
|
|
|
Port int // listener port
|
|
|
|
|
MonitoringClient pb.MonitoringClient
|
|
|
|
|
*hw
|
|
|
|
|
Devices *DeviceManagers // struct for fine grain locking
|
|
|
|
|
Err chan error
|
|
|
|
|
mu sync.Mutex
|
|
|
|
|
HB time.Duration
|
|
|
|
|
PingTimer chan struct{}
|
|
|
|
|
*DB
|
|
|
|
|
Active active
|
|
|
|
|
Ip string
|
|
|
|
|
Port int // listener port
|
|
|
|
|
MonitoringClient pb.MonitoringClient
|
|
|
|
|
*hw
|
|
|
|
|
Devices *DeviceManagers // struct for fine grain locking
|
|
|
|
|
Err chan error
|
|
|
|
|
mu sync.Mutex
|
|
|
|
|
HB time.Duration
|
|
|
|
|
PingTimer chan struct{}
|
|
|
|
|
*DB
|
|
|
|
|
Active active
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type DB struct {
|
|
|
|
|
// struct to hold db connection info
|
|
|
|
|
Org string
|
|
|
|
|
Bucket string
|
|
|
|
|
Token string
|
|
|
|
|
URL string
|
|
|
|
|
// struct to hold db connection info
|
|
|
|
|
Org string
|
|
|
|
|
Bucket string
|
|
|
|
|
Token string
|
|
|
|
|
URL string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type active struct {
|
|
|
|
|
bool
|
|
|
|
|
int
|
|
|
|
|
sync.Mutex
|
|
|
|
|
bool
|
|
|
|
|
int
|
|
|
|
|
sync.Mutex
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type hw struct {
|
|
|
|
|
// store reactor info
|
|
|
|
|
Model string
|
|
|
|
|
Bus int
|
|
|
|
|
Id uint32
|
|
|
|
|
// store reactor info
|
|
|
|
|
Model string
|
|
|
|
|
Bus int
|
|
|
|
|
Id uint32
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type DeviceManagers struct {
|
|
|
|
|
Managers map[int]DeviceManager
|
|
|
|
|
sync.Mutex
|
|
|
|
|
Managers map[int]DeviceManager
|
|
|
|
|
sync.Mutex
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// basic devicemanager struct manipulations
|
|
|
|
|
|
|
|
|
|
type DeviceManager interface {
|
|
|
|
|
Start()
|
|
|
|
|
GetType() string
|
|
|
|
|
GetStatus() string
|
|
|
|
|
GetData() string
|
|
|
|
|
Start()
|
|
|
|
|
GetType() string
|
|
|
|
|
GetStatus() string
|
|
|
|
|
GetData() string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type I2CDev interface {
|
|
|
|
|
GetAddr() int
|
|
|
|
|
GetData() string
|
|
|
|
|
GetStatus() string
|
|
|
|
|
GetType() string
|
|
|
|
|
GetAddr() int
|
|
|
|
|
GetData() string
|
|
|
|
|
GetStatus() string
|
|
|
|
|
GetType() string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewDeviceManager(i2c I2CDev) DeviceManager {
|
|
|
|
|
return sensor.NewDeviceManager(i2c)
|
|
|
|
|
return sensor.NewDeviceManager(i2c)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type I2CMonitor interface {
|
|
|
|
|
Monitor()
|
|
|
|
|
GetDevice(int) interface{ GetAddr() int; GetStatus() string; GetData() string; GetType() string}
|
|
|
|
|
Monitor()
|
|
|
|
|
GetDevice(int) interface {
|
|
|
|
|
GetAddr() int
|
|
|
|
|
GetStatus() string
|
|
|
|
|
GetData() string
|
|
|
|
|
GetType() string
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewI2CMonitor(b int,ch chan int) I2CMonitor {
|
|
|
|
|
return I2C.NewMonitor(b, ch)
|
|
|
|
|
func NewI2CMonitor(b int, ch chan int) I2CMonitor {
|
|
|
|
|
return I2C.NewMonitor(b, ch)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewCoordinator(ip string,port int,ch chan error) *Coordinator {
|
|
|
|
|
sen := new(DeviceManagers)
|
|
|
|
|
sen.Managers = make(map[int]DeviceManager)
|
|
|
|
|
c := &Coordinator{Err:ch,Devices:sen}
|
|
|
|
|
c.Ip = ip
|
|
|
|
|
c.Port = port
|
|
|
|
|
c.hw = &hw{}
|
|
|
|
|
c.HB = time.Duration(5 * time.Second)
|
|
|
|
|
c.PingTimer = make(chan struct{})
|
|
|
|
|
// this is going to be scuffed
|
|
|
|
|
url := fmt.Sprintf("http://%s:8086",ip)
|
|
|
|
|
fmt.Println(url)
|
|
|
|
|
c.DB = &DB{Bucket:"bb",Org:"ForeLight",URL:url,Token:"S1UZssBu6KPfHaQCt34pZFpyc5lzbH9XanYJWCkOI5FqLY7gq205C6FTH-CmugiPH6o2WoKlTkEuPgIfaJjAhw=="}
|
|
|
|
|
return c
|
|
|
|
|
func NewCoordinator(ip string, port int, ch chan error) *Coordinator {
|
|
|
|
|
sen := new(DeviceManagers)
|
|
|
|
|
sen.Managers = make(map[int]DeviceManager)
|
|
|
|
|
c := &Coordinator{Err: ch, Devices: sen}
|
|
|
|
|
// all this stuff can come from config
|
|
|
|
|
c.Ip = ip
|
|
|
|
|
c.Port = port
|
|
|
|
|
c.hw = &hw{}
|
|
|
|
|
c.HB = time.Duration(5 * time.Second)
|
|
|
|
|
|
|
|
|
|
// this is going to be scuffed
|
|
|
|
|
url := fmt.Sprintf("http://%s:8086", ip)
|
|
|
|
|
fmt.Println(url)
|
|
|
|
|
c.DB = &DB{Bucket: "bb", Org: "ForeLight", URL: url, Token: "S1UZssBu6KPfHaQCt34pZFpyc5lzbH9XanYJWCkOI5FqLY7gq205C6FTH-CmugiPH6o2WoKlTkEuPgIfaJjAhw=="}
|
|
|
|
|
|
|
|
|
|
c.PingTimer = make(chan struct{})
|
|
|
|
|
return c
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Coordinator) Start() {
|
|
|
|
|
// should discover hwinfo and sensors on its own
|
|
|
|
|
// now setting up sensor managers
|
|
|
|
|
// setting up hw stuff
|
|
|
|
|
c.Activate()
|
|
|
|
|
var err error
|
|
|
|
|
c.Id, err = system.GetId("eth0")
|
|
|
|
|
c.Model, err = system.GetModel()
|
|
|
|
|
c.Bus, err = system.GetBus()
|
|
|
|
|
if err != nil {
|
|
|
|
|
c.Err <-err
|
|
|
|
|
}
|
|
|
|
|
go c.Monitor()
|
|
|
|
|
go c.Discover()
|
|
|
|
|
// should discover hwinfo and sensors on its own
|
|
|
|
|
// now setting up sensor managers
|
|
|
|
|
// setting up hw stuff
|
|
|
|
|
c.Activate()
|
|
|
|
|
var err error
|
|
|
|
|
c.Id, err = system.GetId("eth0")
|
|
|
|
|
c.Model, err = system.GetModel()
|
|
|
|
|
c.Bus, err = system.GetBus()
|
|
|
|
|
if err != nil {
|
|
|
|
|
c.Err <- err
|
|
|
|
|
}
|
|
|
|
|
go c.Monitor()
|
|
|
|
|
go c.Discover()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Coordinator) Monitor() {
|
|
|
|
|
// function to automatically create and destroy sm
|
|
|
|
|
// scuffedaf
|
|
|
|
|
client := influxdb2.NewClient(c.URL,c.Token)
|
|
|
|
|
defer client.Close()
|
|
|
|
|
dch := make(chan int)
|
|
|
|
|
im := NewI2CMonitor(c.Bus,dch)
|
|
|
|
|
go im.Monitor()
|
|
|
|
|
for c.IsActive() {
|
|
|
|
|
select {
|
|
|
|
|
case d := <-dch:
|
|
|
|
|
i := im.GetDevice(d)
|
|
|
|
|
go c.DeviceConnect(i)
|
|
|
|
|
case <-c.PingTimer:
|
|
|
|
|
go c.Ping(client)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// function to automatically create and destroy sm
|
|
|
|
|
// scuffedaf
|
|
|
|
|
client := influxdb2.NewClient(c.URL, c.Token)
|
|
|
|
|
defer client.Close()
|
|
|
|
|
dch := make(chan int)
|
|
|
|
|
im := NewI2CMonitor(c.Bus, dch)
|
|
|
|
|
go im.Monitor()
|
|
|
|
|
for c.IsActive() {
|
|
|
|
|
select {
|
|
|
|
|
case d := <-dch:
|
|
|
|
|
i := im.GetDevice(d)
|
|
|
|
|
go c.DeviceConnect(i)
|
|
|
|
|
case <-c.PingTimer:
|
|
|
|
|
go c.Ping(client)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Coordinator) HeartBeat() {
|
|
|
|
|
for c.IsActive() {
|
|
|
|
|
c.PingTimer <-struct{}{}
|
|
|
|
|
logging.Debug(logging.DClient,"RLC Pinging server")
|
|
|
|
|
time.Sleep(c.HB)
|
|
|
|
|
}
|
|
|
|
|
for c.IsActive() {
|
|
|
|
|
c.PingTimer <- struct{}{}
|
|
|
|
|
logging.Debug(logging.DClient, "RLC Pinging server")
|
|
|
|
|
time.Sleep(c.HB)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Coordinator) DeviceConnect(i2c I2CDev) {
|
|
|
|
|
c.Devices.Lock()
|
|
|
|
|
defer c.Devices.Unlock()
|
|
|
|
|
addr := i2c.GetAddr()
|
|
|
|
|
if dm, exists := c.Devices.Managers[addr]; !exists{
|
|
|
|
|
dm := NewDeviceManager(i2c)
|
|
|
|
|
c.Devices.Managers[addr] = dm
|
|
|
|
|
go dm.Start()
|
|
|
|
|
} else {
|
|
|
|
|
go dm.Start()
|
|
|
|
|
}
|
|
|
|
|
c.Devices.Lock()
|
|
|
|
|
defer c.Devices.Unlock()
|
|
|
|
|
addr := i2c.GetAddr()
|
|
|
|
|
if dm, exists := c.Devices.Managers[addr]; !exists {
|
|
|
|
|
dm := NewDeviceManager(i2c)
|
|
|
|
|
c.Devices.Managers[addr] = dm
|
|
|
|
|
go dm.Start()
|
|
|
|
|
} else {
|
|
|
|
|
go dm.Start()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Coordinator) Discover() {
|
|
|
|
|
// sets up connection to central coordiantor
|
|
|
|
|
conn, err := c.Connect(c.Ip, c.Port)
|
|
|
|
|
if err != nil {
|
|
|
|
|
c.Err <-err
|
|
|
|
|
}
|
|
|
|
|
defer conn.Close()
|
|
|
|
|
client := pb.NewHandshakeClient(conn)
|
|
|
|
|
req := &pb.ClientRequest{ClientId:c.Id,ClientType:"reactor"}
|
|
|
|
|
resp, err := client.ClientDiscoveryHandler(context.Background(), req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
c.Err <-err
|
|
|
|
|
}
|
|
|
|
|
c.Port = int(resp.GetServerPort()) // updating server port
|
|
|
|
|
logging.Debug(logging.DClient,"RLC Central server reached, supplied port %v",c.Port)
|
|
|
|
|
// connecting to manager now
|
|
|
|
|
clientConn, err := c.Connect(c.Ip, c.Port)
|
|
|
|
|
if err != nil {
|
|
|
|
|
c.Err <-err
|
|
|
|
|
}
|
|
|
|
|
c.MonitoringClient = pb.NewMonitoringClient(clientConn)
|
|
|
|
|
go c.HeartBeat()
|
|
|
|
|
// sets up connection to central coordiantor
|
|
|
|
|
conn, err := c.Connect(c.Ip, c.Port)
|
|
|
|
|
if err != nil {
|
|
|
|
|
c.Err <- err
|
|
|
|
|
}
|
|
|
|
|
defer conn.Close()
|
|
|
|
|
client := pb.NewHandshakeClient(conn)
|
|
|
|
|
req := &pb.ClientRequest{ClientId: c.Id, ClientType: "reactor"}
|
|
|
|
|
resp, err := client.ClientDiscoveryHandler(context.Background(), req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
c.Err <- err
|
|
|
|
|
}
|
|
|
|
|
c.Port = int(resp.GetServerPort()) // updating server port
|
|
|
|
|
logging.Debug(logging.DClient, "RLC Central server reached, supplied port %v", c.Port)
|
|
|
|
|
// connecting to manager now
|
|
|
|
|
clientConn, err := c.Connect(c.Ip, c.Port)
|
|
|
|
|
if err != nil {
|
|
|
|
|
c.Err <- err
|
|
|
|
|
}
|
|
|
|
|
c.MonitoringClient = pb.NewMonitoringClient(clientConn)
|
|
|
|
|
go c.HeartBeat()
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Coordinator) Connect(ip string, port int) (*grpc.ClientConn, error) {
|
|
|
|
|
// function connects to central server and passes hwinfo
|
|
|
|
|
var opts []grpc.DialOption
|
|
|
|
|
opts = append(opts,grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
|
|
|
var conn *grpc.ClientConn
|
|
|
|
|
var err error
|
|
|
|
|
for {
|
|
|
|
|
conn, err = grpc.Dial(fmt.Sprintf("%v:%v",ip,port),opts...)
|
|
|
|
|
code := status.Code(err)
|
|
|
|
|
if code != 0 { // != OK
|
|
|
|
|
if code == (5 | 14) { // service temp down
|
|
|
|
|
to := c.Timeout()
|
|
|
|
|
if to == 0 {
|
|
|
|
|
err = errors.New("Failed to connect to central server")
|
|
|
|
|
return &grpc.ClientConn{}, err
|
|
|
|
|
}
|
|
|
|
|
logging.Debug(logging.DClient,"Server currently unavailable, retrying in %v ms", to)
|
|
|
|
|
time.Sleep(time.Duration(to) * time.Millisecond)
|
|
|
|
|
} else {
|
|
|
|
|
return &grpc.ClientConn{}, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
return conn, nil
|
|
|
|
|
// function connects to central server and passes hwinfo
|
|
|
|
|
var opts []grpc.DialOption
|
|
|
|
|
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
|
|
|
var conn *grpc.ClientConn
|
|
|
|
|
var err error
|
|
|
|
|
for {
|
|
|
|
|
conn, err = grpc.Dial(fmt.Sprintf("%v:%v", ip, port), opts...)
|
|
|
|
|
code := status.Code(err)
|
|
|
|
|
if code != 0 { // != OK
|
|
|
|
|
if code == (5 | 14) { // service temp down
|
|
|
|
|
to := c.Timeout()
|
|
|
|
|
if to == 0 {
|
|
|
|
|
err = errors.New("Failed to connect to central server")
|
|
|
|
|
return &grpc.ClientConn{}, err
|
|
|
|
|
}
|
|
|
|
|
logging.Debug(logging.DClient, "Server currently unavailable, retrying in %v ms", to)
|
|
|
|
|
time.Sleep(time.Duration(to) * time.Millisecond)
|
|
|
|
|
} else {
|
|
|
|
|
return &grpc.ClientConn{}, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
return conn, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Coordinator) Timeout() int {
|
|
|
|
|
c.Active.Lock()
|
|
|
|
|
defer c.Active.Unlock()
|
|
|
|
|
if c.Active.int < 9 {
|
|
|
|
|
v := int(5 * math.Pow(float64(2), float64(c.Active.int)))
|
|
|
|
|
c.Active.int +=1
|
|
|
|
|
return v
|
|
|
|
|
} else {
|
|
|
|
|
//excedded retries
|
|
|
|
|
return 0
|
|
|
|
|
}
|
|
|
|
|
c.Active.Lock()
|
|
|
|
|
defer c.Active.Unlock()
|
|
|
|
|
if c.Active.int < 9 {
|
|
|
|
|
v := int(5 * math.Pow(float64(2), float64(c.Active.int)))
|
|
|
|
|
c.Active.int += 1
|
|
|
|
|
return v
|
|
|
|
|
} else {
|
|
|
|
|
//excedded retries
|
|
|
|
|
return 0
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Coordinator) IsActive() bool {
|
|
|
|
|
c.Active.Lock()
|
|
|
|
|
defer c.Active.Unlock()
|
|
|
|
|
return c.Active.bool
|
|
|
|
|
c.Active.Lock()
|
|
|
|
|
defer c.Active.Unlock()
|
|
|
|
|
return c.Active.bool
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Coordinator) Exit() bool {
|
|
|
|
|
c.Active.Lock()
|
|
|
|
|
defer c.Active.Unlock()
|
|
|
|
|
if c.Active.bool {
|
|
|
|
|
c.Active.bool = false
|
|
|
|
|
logging.Debug(logging.DClient,"RLC Exiting...")
|
|
|
|
|
return true
|
|
|
|
|
} else {
|
|
|
|
|
logging.Debug(logging.DError, "RLC Already Dead!")
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
c.Active.Lock()
|
|
|
|
|
defer c.Active.Unlock()
|
|
|
|
|
if c.Active.bool {
|
|
|
|
|
c.Active.bool = false
|
|
|
|
|
logging.Debug(logging.DClient, "RLC Exiting...")
|
|
|
|
|
return true
|
|
|
|
|
} else {
|
|
|
|
|
logging.Debug(logging.DError, "RLC Already Dead!")
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Coordinator) Activate() bool {
|
|
|
|
|
c.Active.Lock()
|
|
|
|
|
defer c.Active.Unlock()
|
|
|
|
|
if c.Active.bool {
|
|
|
|
|
logging.Debug(logging.DError,"RLC Already Started!")
|
|
|
|
|
return false
|
|
|
|
|
} else {
|
|
|
|
|
logging.Debug(logging.DClient, "RLC Starting")
|
|
|
|
|
c.Active.bool = true
|
|
|
|
|
return c.Active.bool
|
|
|
|
|
}
|
|
|
|
|
c.Active.Lock()
|
|
|
|
|
defer c.Active.Unlock()
|
|
|
|
|
if c.Active.bool {
|
|
|
|
|
logging.Debug(logging.DError, "RLC Already Started!")
|
|
|
|
|
return false
|
|
|
|
|
} else {
|
|
|
|
|
logging.Debug(logging.DClient, "RLC Starting")
|
|
|
|
|
c.Active.bool = true
|
|
|
|
|
return c.Active.bool
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|