You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

124 lines
3.5 KiB
Go

package server
import (
"fmt"
"time"
"log"
"context"
"sync"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
"google.golang.org/grpc/credentials/insecure"
pb "FRMS/internal/pkg/grpc"
)
// this package will implement a reactor coordinator and associated go routines
type ReactorManager struct {
*Manager
StatusMon *StatusMonitor
DevsMon *StatusMonitor
*devstatus
}
type devstatus struct {
sync.Mutex
Devs map[uint32]*DeviceInfo
}
func NewReactorManager(c *Client,sys *SystemViewer,err chan error) GeneralManager {
r := &ReactorManager{}
di := make(map[uint32]*DeviceInfo)
r.devstatus = &devstatus{Devs:di}
r.Manager = NewManager(err)
r.StatusMon = sys.AddReactorSender()
r.DevsMon = sys.AddDeviceSender(c.Id)
return r
}
func (r *ReactorManager) Start(cl *Client) {
r.Manager.Start(cl)
go r.StatusMon.Start()
go r.DevsMon.Start()
go r.StatusMon.Send(&DeviceInfo{Id:r.Id,Type:"Reactor",Status:"[green]ONLINE[white]"})
conn := r.Connect()
empty := &grpc.ClientConn{}
if conn != empty {
go r.Monitor(conn)
}
}
func (r *ReactorManager) Exit() {
r.Manager.Exit()
go r.StatusMon.Send(&DeviceInfo{Id:r.Id,Type:"Reactor",Status:"[red]OFFLINE[white]",Data:fmt.Sprintf("Last Seen %v",time.Now().Format("Mon at 03:04:05pm MST"))})
}
func (r *ReactorManager) GetPort() int {
return 0
}
func (r *ReactorManager) Connect() *grpc.ClientConn {
// establish gRPC conection with reactor
var opts []grpc.DialOption
var conn *grpc.ClientConn
opts = append(opts,grpc.WithTransportCredentials(insecure.NewCredentials()))
for {
if !r.IsActive() {
log.Fatal("No longer active, aborting connection attempt\n")
return &grpc.ClientConn{}
}
var err error
conn, err = grpc.Dial(fmt.Sprintf("%v:%v",r.Ip,r.Port),opts...)
// error handling
code := status.Code(err)
if code != 0 { // != OK
if code == (5 | 14) { // unavailable or not found
to := r.Timeout()
if to == 0 {
log.Printf("Client not responding\n")
return &grpc.ClientConn{}
}
log.Printf("Client currently down, retrying in %v ms\n",to)
time.Sleep(time.Duration(to) * time.Millisecond)
} else {
log.Fatal("GRPC ERROR: %v",code)
r.Err <- err
}
}
break;
}
return conn
}
func (r *ReactorManager) Monitor(conn *grpc.ClientConn) {
defer conn.Close()
client := pb.NewMonitoringClient(conn)
for r.IsActive() {
req := &pb.ReactorStatusRequest{Id:r.Id}
resp, err := client.GetReactorStatus(context.Background(),req)
code := status.Code(err)
if code != 0 { // if != OK
fmt.Printf("Reactor %v down! Code: %v\n", r.Id,code)
r.devstatus.Lock()
for _, d := range r.Devs {
newd := d
newd.Status = "[yellow]UNKOWN[white]"
go r.DevsMon.Send(newd)
}
r.devstatus.Unlock()
r.Exit()
break;
}
r.devstatus.Lock()
for _,v := range resp.GetDevices() {
d := &DeviceInfo{Id:uint32(v.GetAddr()),Type:v.GetType(),Status:v.GetStatus(),Data:v.GetData()}
go r.DevsMon.Send(d)
r.Devs[d.Id] = d
}
r.devstatus.Unlock()
time.Sleep(r.Hb) // time between sensor pings
}
}