package server

import (
    "context"
    "errors"
    "fmt"
    "math"
    "sync"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/status"

    pb "FRMS/internal/pkg/grpc"
)

// This package implements a reactor coordinator and its associated goroutines.

type ReactorManager struct {
    Server *ReactorListener // parent listener, kept so the manager can report errors back to it
    Ip     string
    Port   int
    Id     uint32
    Model  string
    Hb     time.Duration // heartbeat interval between sensor status pings
    Devs   *Devices
    Active active
}

// active bundles an "is running" flag and a retry counter behind a single mutex.
type active struct {
    sync.Mutex
    bool // true while the manager is running
    int  // connection retry counter
}

// Devices is a mutex-guarded map of attached devices.
type Devices struct {
    mu sync.Mutex
    D  map[int]Device
}
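
// Illustrative helper (hypothetical, not part of the original file): a sketch of
// adding an entry to Devices under its mutex. Note that new(Devices) leaves the
// map nil, so it has to be initialized before first use.
func (d *Devices) Add(key int, dev Device) {
    d.mu.Lock()
    defer d.mu.Unlock()
    if d.D == nil {
        d.D = make(map[int]Device) // lazily initialize the map
    }
    d.D[key] = dev
}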

type Device interface {
    GetStatus() uint32
    PrintStatus() string
    GetType() string
}
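
// Illustrative only (hypothetical, not part of the original file): a minimal type
// showing how a concrete sensor handler could satisfy the Device interface.
type mockDevice struct {
    status uint32
    dtype  string
}

func (m *mockDevice) GetStatus() uint32 { return m.status }

func (m *mockDevice) GetType() string { return m.dtype }

func (m *mockDevice) PrintStatus() string {
    return fmt.Sprintf("%v reports status %v", m.dtype, m.status)
}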

func NewReactorManager(s *ReactorListener, ip, model string, port int, id uint32) *ReactorManager {
    d := new(Devices)
    return &ReactorManager{
        Server: s,
        Ip:     ip,
        Port:   port,
        Id:     id,
        Model:  model,
        Devs:   d,
        Hb:     5 * time.Second, // heartbeat interval, used directly by Monitor
    }
}

func (r *ReactorManager) Start() {
    // establish a connection with the reactor and start pinging at set intervals
    if !r.Activate() {
        // reactor already running
        r.Server.Err <- errors.New("reactor already running")
        return
    }
    // the reactor is now atomically activated, so Start cannot launch a second Connect
    go r.Connect()
}
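
// Hypothetical usage sketch (not part of the original file): assuming the listener
// already exists and its Err field is a plain chan error (as the sends above suggest),
// a manager is created and started like this. The address, model, port, and id values
// below are made up for illustration.
func exampleStartReactor(listener *ReactorListener) {
    rm := NewReactorManager(listener, "192.168.100.2", "pi4", 2000, 10)
    rm.Start() // spawns Connect, which in turn spawns Monitor once connected

    // errors are reported asynchronously on the listener's Err channel
    go func() {
        for err := range listener.Err {
            fmt.Printf("reactor manager error: %v\n", err)
        }
    }()
}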

func (r *ReactorManager) Exit() {
    // exit function; will eventually also save state to configs
    if !r.Deactivate() {
        r.Server.Err <- errors.New("reactor already disabled")
    }
    fmt.Printf("Reactor %v exiting\n", r.Id)
}

// reactor manager atomic operations

func (r *ReactorManager) IsActive() bool {
    r.Active.Lock()
    defer r.Active.Unlock()
    return r.Active.bool
}

func (r *ReactorManager) Activate() bool {
    // returns true only if this call flipped the manager from inactive to active
    r.Active.Lock()
    defer r.Active.Unlock()
    if r.Active.bool {
        return false // already running
    }
    r.Active.bool = true
    r.Active.int = 0 // reset the retry counter
    return true
}

func (r *ReactorManager) Deactivate() bool {
    // returns true only if this call flipped the manager from active to inactive
    r.Active.Lock()
    defer r.Active.Unlock()
    if r.Active.bool {
        r.Active.bool = false
        return true
    }
    return false // already disabled
}
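
// Sketch of the intended check-and-set semantics (hypothetical, not part of the
// original file): with many concurrent Activate calls on a fresh manager, exactly
// one caller should observe the inactive-to-active transition.
func exampleActivateOnce(r *ReactorManager) bool {
    var wg sync.WaitGroup
    wins := make(chan bool, 10)
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            if r.Activate() {
                wins <- true
            }
        }()
    }
    wg.Wait()
    close(wins)
    return len(wins) == 1 // true when exactly one Activate call succeeded
}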

// connection handling

func (r *ReactorManager) Timeout() int {
    // generates an exponential backoff in ms: 5, 10, 20, ..., 1280 (roughly 2.5 s in total)
    // returns 0 once the retries are exhausted
    r.Active.Lock()
    defer r.Active.Unlock()
    if r.Active.int < 9 {
        v := int(5 * math.Pow(2, float64(r.Active.int)))
        r.Active.int++
        return v
    }
    // exceeded retries
    return 0
}
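
// Illustrative only (hypothetical, not part of the original file): draining Timeout
// on a freshly activated manager yields 5, 10, 20, 40, 80, 160, 320, 640, 1280 ms
// and then 0. Note that this consumes the manager's retry counter.
func exampleBackoffSeries(r *ReactorManager) []int {
    var series []int
    for {
        to := r.Timeout()
        if to == 0 {
            break // retries exhausted
        }
        series = append(series, to)
    }
    return series
}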

func (r *ReactorManager) Connect() {
    // establish the initial gRPC connection with the reactor
    var opts []grpc.DialOption
    var conn *grpc.ClientConn
    opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
    for {
        if !r.IsActive() {
            fmt.Printf("Aborting connection attempt\n")
            return
        }
        var err error
        // note: without grpc.WithBlock the dial is non-blocking, so most
        // connection failures only surface on the first RPC
        conn, err = grpc.Dial(fmt.Sprintf("%v:%v", r.Ip, r.Port), opts...)
        // begin error handling
        code := status.Code(err)
        if code == codes.OK {
            break // connected
        }
        if code == codes.Unavailable || code == codes.NotFound {
            to := r.Timeout()
            if to == 0 {
                fmt.Printf("Reactor not responding, exiting...\n")
                r.Exit()
                return
            }
            fmt.Printf("gRPC endpoint currently unavailable, retrying in %v ms\n", to)
            time.Sleep(time.Duration(to) * time.Millisecond)
            continue // retry the dial
        }
        // unrecoverable gRPC error
        fmt.Printf("ERR GRPC: %v\n", code)
        r.Server.Err <- err
        return
    }
    client := pb.NewMonitoringClient(conn)
    go r.Monitor(conn, client)
}
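
// Hypothetical alternative (not part of the original file): a blocking dial, so that
// connection errors are observed at dial time rather than on the first RPC.
// grpc.DialContext with grpc.WithBlock waits until the connection is ready or the
// context expires.
func exampleBlockingDial(addr string) (*grpc.ClientConn, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    return grpc.DialContext(ctx, addr,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithBlock(),
    )
}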

func (r *ReactorManager) Monitor(conn *grpc.ClientConn, client pb.MonitoringClient) {
    defer conn.Close()
    for r.IsActive() {
        req := &pb.SensorStatusRequest{Id: r.Id}
        resp, err := client.SensorStatusHandler(context.Background(), req)
        if status.Code(err) != codes.OK {
            fmt.Printf("Reactor %v down! Exiting manager...\n", r.Id)
            r.Exit()
            return
        }
        for _, v := range resp.GetSensors() {
            fmt.Printf("%v is %v at %x\n", v.GetType(), v.GetStatus(), v.GetAddr())
        }
        fmt.Printf("Reactor %v online\n", resp.GetId())
        time.Sleep(r.Hb) // time between sensor pings; Hb is set in NewReactorManager
    }
}