|
|
|
@ -6,80 +6,109 @@ import (
|
|
|
|
|
"FRMS/internal/pkg/manager"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
//"FRMS/internal/pkg/device"
|
|
|
|
|
"context"
|
|
|
|
|
"fmt"
|
|
|
|
|
_ "log"
|
|
|
|
|
|
|
|
|
|
"github.com/spf13/viper"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// manager stuff
|
|
|
|
|
// MaxConnectionAttempts is the max number of tries to allow
|
|
|
|
|
// when connecting to a reactor.
|
|
|
|
|
const MaxConnectionAttempts = 10
|
|
|
|
|
|
|
|
|
|
// Manager is an interface requiring a structure that can be started
|
|
|
|
|
// and stopped as well as provide timeouts in milliseconds.
|
|
|
|
|
type Manager interface {
|
|
|
|
|
Start() error // status checks
|
|
|
|
|
Stop() error
|
|
|
|
|
Timeout() (time.Duration, error) // TO Generator
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewManager returns a manager fulfilling the Manager interface as well as
|
|
|
|
|
// any potential errors.
|
|
|
|
|
func NewManager(max int) (Manager, error) {
|
|
|
|
|
// takes a heartbeat and max connection attempts
|
|
|
|
|
return manager.New(max)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ReactorManager contains a base manager, client, global
|
|
|
|
|
// config, and error channel.
|
|
|
|
|
// The ReactorManager can be started/stopped as clients connect/disconnect.
|
|
|
|
|
// Also serves as handler for gRPC requests from reactors.
|
|
|
|
|
// Can be extended to write changes to config.
|
|
|
|
|
type ReactorManager struct {
|
|
|
|
|
Manager // base manager interface
|
|
|
|
|
// *ClientManager // client manager (OUTDATED)
|
|
|
|
|
*Client // access to ID etc
|
|
|
|
|
// StatusMon *StatusMonitor putting on pause
|
|
|
|
|
// *ReactorDevices
|
|
|
|
|
Config *viper.Viper // config to update
|
|
|
|
|
*Client
|
|
|
|
|
Config *viper.Viper // global config to maintain
|
|
|
|
|
Err chan error
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewReactorManager(cl *Client,
|
|
|
|
|
// NewReactorManager takes in a client, config and channel to pass errors on.
|
|
|
|
|
// Returns a new reactor manager as well as any errors that occured during
|
|
|
|
|
// creation.
|
|
|
|
|
// Uses MaxConnectionAttempts which defaults to 10 to prevent
|
|
|
|
|
// unnessecary network load and/or timeout lengths.
|
|
|
|
|
func NewReactorManager(
|
|
|
|
|
cl *Client,
|
|
|
|
|
config *viper.Viper,
|
|
|
|
|
errCh chan error,
|
|
|
|
|
) (*ReactorManager, error) {
|
|
|
|
|
// making managers
|
|
|
|
|
m, err := NewManager(6)
|
|
|
|
|
|
|
|
|
|
m, err := NewManager(MaxConnectionAttempts)
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
return &ReactorManager{}, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
r := &ReactorManager{
|
|
|
|
|
Manager: m,
|
|
|
|
|
Client: cl,
|
|
|
|
|
Config: config,
|
|
|
|
|
Err: errCh,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return r, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Start logs the start and calls start on the embedded manager.
|
|
|
|
|
func (r *ReactorManager) Start() error {
|
|
|
|
|
// allows for extra stuff
|
|
|
|
|
logging.Debug(logging.DStart, "RMA %v starting", r.Id)
|
|
|
|
|
return r.Manager.Start()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *ReactorManager) Exit() error {
|
|
|
|
|
// allows for extra stuff
|
|
|
|
|
logging.Debug(logging.DExit, "RMA %v exiting", r.Id)
|
|
|
|
|
return r.Manager.Exit()
|
|
|
|
|
// Stop logs the stop and calls stop on the embedded manager.
|
|
|
|
|
func (r *ReactorManager) Stop() error {
|
|
|
|
|
logging.Debug(logging.DExit, "RMA %v stopping", r.Id)
|
|
|
|
|
return r.Manager.Stop()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// UpdateClient is used to change the underlying manager client if there
|
|
|
|
|
// changes to its data.
|
|
|
|
|
//
|
|
|
|
|
// BUG(Keegan): Client is not protected by a lock and may lead to races
|
|
|
|
|
func (r *ReactorManager) UpdateClient(cl *Client) error {
|
|
|
|
|
// this is probably unnessecary
|
|
|
|
|
fmt.Printf("Reactor Manager %d updating client!\n", r.Id)
|
|
|
|
|
logging.Debug(logging.DClient, "RMA %v updating client", r.Id)
|
|
|
|
|
r.Client = cl
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *ReactorManager) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) {
|
|
|
|
|
// function client will call to update reactor information
|
|
|
|
|
fmt.Printf("Recieved ping from %d!\n", req.GetId())
|
|
|
|
|
// update devices/sensors
|
|
|
|
|
// ReactorStatusHandler implements a gRPC handler that is called by reactors.
|
|
|
|
|
// Takes in a context and request which has reactor and device information.
|
|
|
|
|
// For now, loops over devices and logs information about their status.
|
|
|
|
|
func (r *ReactorManager) ReactorStatusHandler(
|
|
|
|
|
ctx context.Context,
|
|
|
|
|
req *pb.ReactorStatusPing,
|
|
|
|
|
) (*pb.ReactorStatusResponse, error) {
|
|
|
|
|
|
|
|
|
|
logging.Debug(logging.DClient, "RMA %v recieved ping", r.Id)
|
|
|
|
|
|
|
|
|
|
for _, dev := range req.GetDevices() {
|
|
|
|
|
fmt.Printf("Device %d is %s ", dev.GetAddr(), dev.GetStatus().String())
|
|
|
|
|
logging.Debug(
|
|
|
|
|
logging.DClient,
|
|
|
|
|
"RMA %v device %v is %v",
|
|
|
|
|
r.Id,
|
|
|
|
|
dev.GetAddr(),
|
|
|
|
|
dev.GetStatus().String(),
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
fmt.Printf("\n")
|
|
|
|
|
// go r.UpdateDevices(req.GetDevices())
|
|
|
|
|
|
|
|
|
|
return &pb.ReactorStatusResponse{Id: int32(r.Id)}, nil
|
|
|
|
|
}
|
|
|
|
|