documented coordinator

main
KeeganForelight 2 years ago
parent 4652fea1d8
commit a0bda5d6b3

@ -1,12 +1,12 @@
// package Server provides a way to listen for incoming connections
// and manage multiple reactor clients.
package server package server
import ( import (
pb "FRMS/internal/pkg/grpc" pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/influxdb" "FRMS/internal/pkg/influxdb"
_ "FRMS/internal/pkg/influxdb"
"FRMS/internal/pkg/logging" "FRMS/internal/pkg/logging"
"context" "context"
"errors"
"fmt" "fmt"
"net" "net"
"sync" "sync"
@ -15,20 +15,26 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
) )
// this package creates the central coordiantor and sub coordiantors to route clients // Database is an interface to interact with the server database.
// Used mainly to find existing credentials for
// db client interface // incoming reactor client connections.
type Database interface { type Database interface {
// getters (all create if doesnt exist)
GetReactorClient(int) (string, string, string, string, error) // returns (url, org, bucket, token, err) GetReactorClient(int) (string, string, string, string, error) // returns (url, org, bucket, token, err)
} }
// NewDatabaseAdmin creates a new database admin that implements the
// Database interface.
// Allows access to the database to find/create reactor credentials.
// Implemented via the influxdb package.
func NewDatabaseAdmin(config *viper.Viper) (Database, error) { func NewDatabaseAdmin(config *viper.Viper) (Database, error) {
return influxdb.NewDBAdmin(config) return influxdb.NewDBAdmin(config)
} }
// CentralCoordinator is the main coordinator struct that runs on the server.
// Used to oversee the reactor managers as well as process incoming
// client connections.
// Also interacts with the database and global config.
type CentralCoordinator struct { type CentralCoordinator struct {
// main coordinator
ClientConnections *ClientPacket ClientConnections *ClientPacket
*ReactorCoordinator *ReactorCoordinator
Database Database
@ -38,6 +44,10 @@ type CentralCoordinator struct {
Err chan error Err chan error
} }
// NewCentralCoordinator creates a central coordinator with the given global
// config and error channel.
// It will create a new reactor coordinator and database admin.
// It will also try to load the existing configuration information.
func NewCentralCoordinator(config *viper.Viper, ch chan error) *CentralCoordinator { func NewCentralCoordinator(config *viper.Viper, ch chan error) *CentralCoordinator {
// create a central coordinator to manage requests // create a central coordinator to manage requests
db, err := NewDatabaseAdmin(config) db, err := NewDatabaseAdmin(config)
@ -49,13 +59,16 @@ func NewCentralCoordinator(config *viper.Viper, ch chan error) *CentralCoordinat
if err != nil { if err != nil {
ch <- err ch <- err
} }
config.UnmarshalKey("server.ports", rc) // get reactor port
config.UnmarshalKey("server.ports", rc)
c := &CentralCoordinator{ c := &CentralCoordinator{
Err: ch, Err: ch,
Config: config, Config: config,
Database: db, Database: db,
ReactorCoordinator: rc, ReactorCoordinator: rc,
} }
// grab config settings // grab config settings
if err = config.UnmarshalKey("server", c); err != nil { if err = config.UnmarshalKey("server", c); err != nil {
ch <- err ch <- err
@ -64,53 +77,64 @@ func NewCentralCoordinator(config *viper.Viper, ch chan error) *CentralCoordinat
return c return c
} }
// Start activates the central coordinator and ensures it is ready for
// new clients.
// Creates a listener and starts both reactor coordinator and listener.
func (c *CentralCoordinator) Start() { func (c *CentralCoordinator) Start() {
// starts up associated funcs
clientChan := make(chan *ClientPacket) clientChan := make(chan *ClientPacket)
l := NewListener(clientChan, c.Err) l := NewListener(clientChan, c.Err)
// grabs lis port
c.Config.UnmarshalKey("server.ports", l) c.Config.UnmarshalKey("server.ports", l)
// starting reactor coordinator
if err := c.ReactorCoordinator.Start(); err != nil { if err := c.ReactorCoordinator.Start(); err != nil {
c.Err <- err c.Err <- err
} }
// starting listener
if err := l.Start(); err != nil { if err := l.Start(); err != nil {
c.Err <- err c.Err <- err
} }
// lastly start client listener
go c.ClientListener(clientChan) go c.ClientListener(clientChan)
} }
// ClientListener listens on the given channel for clients that are sent
// over from the listener.
// The clients are then passed to the handler before returning the response.
func (c *CentralCoordinator) ClientListener(ch chan *ClientPacket) { func (c *CentralCoordinator) ClientListener(ch chan *ClientPacket) {
for client := range ch { for client := range ch {
// basically loops until channel is closed
client.Response <- c.ClientHandler(client.Client) // respond with cred client.Response <- c.ClientHandler(client.Client) // respond with cred
} }
} }
// ClientHandler takes in a client and retrieves the associated
// database credentials.
// Currently only handles reactor type clients, can be modified
// to support others.
func (c *CentralCoordinator) ClientHandler(cl *Client) *ClientResponse { func (c *CentralCoordinator) ClientHandler(cl *Client) *ClientResponse {
// returns reactor db info // returns reactor db info
var err error var err error
cr := &ClientResponse{Port: c.Ports[cl.Type]} cr := &ClientResponse{Port: c.Ports[cl.Type]}
if cl.Type == "reactor" { if cl.Type != "reactor" {
// get reactor info c.Err <- fmt.Errorf("client type %s not recognized", cl.Type)
}
go c.ReactorCoordinator.ClientHandler(cl) go c.ReactorCoordinator.ClientHandler(cl)
// db info // db info
cr.URL, cr.Org, cr.Token, cr.Bucket, err = c.Database.GetReactorClient(cl.Id) cr.URL, cr.Org, cr.Token, cr.Bucket, err = c.Database.GetReactorClient(cl.Id)
} else {
// throw error
err = fmt.Errorf("Client type %s not recognized!", cl.Type)
}
// returns based on cl type
if err != nil { if err != nil {
c.Err <- err c.Err <- err
} }
return cr return cr
} }
// ReactorCoordinator is a strucutre used to store reactor managers for
// clients that have connected at some point.
type ReactorCoordinator struct { type ReactorCoordinator struct {
Port int `mapstructure:"reactor"` Port int `mapstructure:"reactor"`
*ReactorManagers *ReactorManagers
@ -118,84 +142,130 @@ type ReactorCoordinator struct {
pb.UnimplementedMonitoringServer pb.UnimplementedMonitoringServer
} }
// ReactorManagers is a structure that stores a concurrent map of the
// reactor managers as well as the global config.
type ReactorManagers struct { type ReactorManagers struct {
Config *viper.Viper Config *viper.Viper
Directory map[int]*ReactorManager Directory map[int]*ReactorManager
sync.RWMutex sync.RWMutex
} }
// NewReactorCoordinator takes the global config and error channel and returns
// a pointer to a ReactorCoordinator as well as any errors.
func NewReactorCoordinator(config *viper.Viper, errCh chan error) (*ReactorCoordinator, error) { func NewReactorCoordinator(config *viper.Viper, errCh chan error) (*ReactorCoordinator, error) {
rmap := make(map[int]*ReactorManager) rmap := make(map[int]*ReactorManager)
rm := &ReactorManagers{Directory: rmap, Config: config}
c := &ReactorCoordinator{Err: errCh, ReactorManagers: rm} rm := &ReactorManagers{
return c, nil Directory: rmap,
Config: config,
} }
return &ReactorCoordinator{
Err: errCh,
ReactorManagers: rm,
}, nil
}
// Start starts the reactor coordinator and kicks off
// registering the gRPC service
func (c *ReactorCoordinator) Start() error { func (c *ReactorCoordinator) Start() error {
logging.Debug(logging.DStart, "RCO 01 Starting!") logging.Debug(logging.DStart, "RCO 01 Starting!")
// register grpc service
return c.Register() return c.Register()
} }
// ClientHandler takes in a client and finds or creates the correct
// manager for said client.
func (c *ReactorCoordinator) ClientHandler(cl *Client) { func (c *ReactorCoordinator) ClientHandler(cl *Client) {
// updates clients if nessecary
if err := c.UpdateReactorManager(cl, c.Err); err != nil { if err := c.UpdateReactorManager(cl, c.Err); err != nil {
c.Err <- err c.Err <- err
} }
} }
// GetReactorManager attempts to locate a reactor manager for a given id.
// Returns either the associated reactor manager, or an error if
// a manager does not exist for the given id.
func (m *ReactorManagers) GetReactorManager(id int) (*ReactorManager, error) { func (m *ReactorManagers) GetReactorManager(id int) (*ReactorManager, error) {
m.RLock() m.RLock()
defer m.RUnlock() defer m.RUnlock()
rm, exists := m.Directory[id] rm, exists := m.Directory[id]
if !exists { if !exists {
return &ReactorManager{}, errors.New(fmt.Sprintf("No manager for reactor %d!", id)) return &ReactorManager{}, fmt.Errorf("no manager for reactor %d", id)
} }
return rm, nil return rm, nil
} }
// UpdateReactorManager takes in a client and error channel and passes the
// client to the associate reactor manager.
// If the client does not have an existing reactor manager, it will create one
// , start it, and add it to the map for future calls.
// The function then calls UpdateClient on the reactor manager and returns
// any errors generated by this function.
func (m *ReactorManagers) UpdateReactorManager(cl *Client, errCh chan error) error { func (m *ReactorManagers) UpdateReactorManager(cl *Client, errCh chan error) error {
// locking
m.RLock() m.RLock()
defer m.RUnlock() defer m.RUnlock()
var err error var err error
rm, exists := m.Directory[cl.Id] rm, exists := m.Directory[cl.Id]
if !exists { if !exists {
logging.Debug(logging.DClient, "RCO creating manager for reactor client %v", cl.Id) // reactor manager does not exist, creating new one
// creating logging.Debug(
logging.DClient,
"RCO 01 creating manager for %v",
cl.Id,
)
if rm, err = NewReactorManager(cl, m.Config, errCh); err != nil { if rm, err = NewReactorManager(cl, m.Config, errCh); err != nil {
return err return err
} }
// starting
if err = rm.Start(); err != nil { if err = rm.Start(); err != nil {
return err return err
} }
m.Directory[cl.Id] = rm m.Directory[cl.Id] = rm
} }
return rm.UpdateClient(cl) return rm.UpdateClient(cl)
} }
// Register attaches to the servers port and attempts to bind
// a gRPC server to it.
func (r *ReactorCoordinator) Register() error { func (r *ReactorCoordinator) Register() error {
lis, err := net.Listen("tcp", fmt.Sprintf(":%v", r.Port)) lis, err := net.Listen("tcp", fmt.Sprintf(":%v", r.Port))
if err != nil { if err != nil {
return err return err
} }
grpcServer := grpc.NewServer() grpcServer := grpc.NewServer()
pb.RegisterMonitoringServer(grpcServer, r) pb.RegisterMonitoringServer(grpcServer, r)
go grpcServer.Serve(lis) go grpcServer.Serve(lis)
logging.Debug(logging.DClient, "RCO ready for client requests")
logging.Debug(logging.DClient, "RCO 01 ready")
return nil return nil
} }
// ReactorStatusHandler is a gRPC handler used to handle incoming
// reactor requests containing information about said reactor.
// It will get the associate reactor manager and pass the
// request down for further processing.
func (r *ReactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { func (r *ReactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) {
rm, err := r.GetReactorManager(int(req.GetId())) rm, err := r.GetReactorManager(int(req.GetId()))
// error checking
if err != nil { if err != nil {
return &pb.ReactorStatusResponse{}, err return &pb.ReactorStatusResponse{}, err
} }
return rm.ReactorStatusHandler(ctx, req) return rm.ReactorStatusHandler(ctx, req)
} }

Loading…
Cancel
Save