You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

108 lines
2.8 KiB
Go

package server
import (
"fmt"
"time"
"log"
"context"
"sync"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
"google.golang.org/grpc/credentials/insecure"
pb "FRMS/internal/pkg/grpc"
)
// This file implements the reactor coordinator and its associated goroutines.

// ReactorManager coordinates a single reactor: Start dials it over gRPC and
// Monitor polls it for device status. It embeds both *Manager and *System, so
// their fields and methods are promoted onto the ReactorManager; which embedded
// type supplies each promoted name used in this file is not visible here.
type ReactorManager struct {
// Embedded manager, built by NewManager inside NewReactorManager.
*Manager
// Devs is the device table allocated for this reactor by NewReactorManager.
Devs *Devices
// Embedded system handle; Monitor forwards device readings to it through
// System.UpdateReactorDevice.
*System
}
// Devices pairs a table of Device values keyed by int with a mutex.
type Devices struct {
// NOTE(review): mu is presumably meant to guard D; nothing in this file
// locks it — confirm against the other users of Devices.
mu sync.Mutex
// D maps an int key to a Device.
// NOTE(review): the key is presumably the device address — confirm.
D map[int]Device
}
// NewReactorManager builds a ReactorManager for client c and starts its
// Listen goroutine. Nothing is returned: the manager is driven entirely by the
// start channel that is created here and handed to NewManager.
//
// c and ch are passed through to NewManager unchanged, sys becomes the
// embedded System, and err is the error channel given to NewManager.
func NewReactorManager(c *Client, ch chan *Client, sys *System, err chan error) {
	// Start signals travel from the Manager to Listen over this channel.
	start := make(chan bool)

	r := &ReactorManager{
		// Allocate the map up front: inserting into a nil map panics, and a
		// zero-valued Devices would leave D nil.
		Devs:   &Devices{D: make(map[int]Device)},
		System: sys,
	}
	r.Manager = NewManager(c, ch, start, err)

	go r.Listen(start)
}
// Listen consumes start signals from ch and calls Start for every true value;
// false values are ignored. Start runs synchronously, so further signals are
// not received until it returns.
//
// Listen returns once ch is closed. Ranging over the channel (instead of a
// bare receive in an endless loop) prevents the goroutine from spinning on the
// zero values that a closed channel yields.
func (r *ReactorManager) Listen(ch chan bool) {
	for sig := range ch {
		if sig {
			r.Start()
		}
	}
}
// Start connects to the reactor and, if a connection was obtained, launches
// Monitor on it in a new goroutine. When Connect gives up (nil result) Start
// does nothing.
func (r *ReactorManager) Start() {
	conn := r.Connect()
	if conn == nil {
		// Connect already logged/reported why it gave up.
		return
	}
	go r.Monitor(conn)
}

// Connect dials the reactor's gRPC endpoint at r.Ip:r.Port using insecure
// transport credentials and returns the resulting connection.
//
// It returns nil when no connection could be established:
//   - the manager is no longer active (r.IsActive() is false),
//   - the reactor is not found/unavailable and r.Timeout() returns 0, or
//   - the dial fails with any other status code, in which case the error is
//     also sent on r.Err.
//
// While the reactor is not found/unavailable and r.Timeout() is non-zero,
// Connect sleeps for that many milliseconds and tries again.
func (r *ReactorManager) Connect() *grpc.ClientConn {
	// gRPC status codes, spelled numerically because this file does not
	// import the codes package.
	const (
		codeOK          = 0
		codeNotFound    = 5
		codeUnavailable = 14
	)

	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	}

	for {
		if !r.IsActive() {
			// Logged, not fatal: one inactive reactor must not take the
			// whole process down.
			log.Printf("No longer active, aborting connection attempt\n")
			return nil
		}

		conn, err := grpc.Dial(fmt.Sprintf("%v:%v", r.Ip, r.Port), opts...)

		switch code := status.Code(err); code {
		case codeOK:
			return conn
		case codeNotFound, codeUnavailable:
			to := r.Timeout()
			if to == 0 {
				log.Printf("Client not responding\n")
				return nil
			}
			log.Printf("Client currently down, retrying in %v ms\n", to)
			time.Sleep(time.Duration(to) * time.Millisecond)
			// Fall out of the switch and loop around for another attempt.
		default:
			log.Printf("GRPC ERROR: %v", code)
			// Hand the failure to whoever is draining the error channel.
			r.Err <- err
			return nil
		}
	}
}
// Monitor polls the reactor over conn for as long as the manager is active and
// closes conn when it returns.
//
// It first marks the reactor as up via UpdateReactor. Each cycle then requests
// the reactor status, hands every reported device to
// System.UpdateReactorDevice in its own goroutine, and sleeps r.Hb seconds.
// When the status call does not come back OK the reactor is marked down
// asynchronously, a message is printed and r.Exit is called.
//
// NOTE(review): after a failed call the cycle still ranges over the reply's
// devices and sleeps before re-checking IsActive; this relies on GetDevices
// tolerating a nil reply — confirm against the generated pb code.
func (r *ReactorManager) Monitor(conn *grpc.ClientConn) {
	defer conn.Close()

	monitoring := pb.NewMonitoringClient(conn)
	r.UpdateReactor(r.Id, true)

	for {
		if !r.IsActive() {
			return
		}

		reply, rpcErr := monitoring.GetReactorStatus(
			context.Background(),
			&pb.ReactorStatusRequest{Id: r.Id},
		)
		if status.Code(rpcErr) != 0 { // anything other than OK
			go r.UpdateReactor(r.Id, false)
			fmt.Printf("Reactor %v down! ", r.Id)
			r.Exit()
		}

		for _, dev := range reply.GetDevices() {
			go r.System.UpdateReactorDevice(r.Id, int(dev.GetAddr()), dev.GetType(), dev.GetStatus(), dev.GetData())
		}

		// Pause between sensor pings.
		time.Sleep(r.Hb * time.Second)
	}
}