From a0bda5d6b3901ee2e442c4babac40662cd856eb4 Mon Sep 17 00:00:00 2001 From: KeeganForelight Date: Wed, 14 Jun 2023 15:17:46 -0400 Subject: [PATCH] documented coordinator --- internal/pkg/server/coordinator.go | 140 +++++++++++++++++++++-------- 1 file changed, 105 insertions(+), 35 deletions(-) diff --git a/internal/pkg/server/coordinator.go b/internal/pkg/server/coordinator.go index 6bf495c..af52cd3 100644 --- a/internal/pkg/server/coordinator.go +++ b/internal/pkg/server/coordinator.go @@ -1,12 +1,12 @@ +// package Server provides a way to listen for incoming connections +// and manage multiple reactor clients. package server import ( pb "FRMS/internal/pkg/grpc" "FRMS/internal/pkg/influxdb" - _ "FRMS/internal/pkg/influxdb" "FRMS/internal/pkg/logging" "context" - "errors" "fmt" "net" "sync" @@ -15,20 +15,26 @@ import ( "google.golang.org/grpc" ) -// this package creates the central coordiantor and sub coordiantors to route clients - -// db client interface +// Database is an interface to interact with the server database. +// Used mainly to find existing credentials for +// incoming reactor client connections. type Database interface { - // getters (all create if doesnt exist) GetReactorClient(int) (string, string, string, string, error) // returns (url, org, bucket, token, err) } +// NewDatabaseAdmin creates a new database admin that implements the +// Database interface. +// Allows access to the database to find/create reactor credentials. +// Implemented via the influxdb package. func NewDatabaseAdmin(config *viper.Viper) (Database, error) { return influxdb.NewDBAdmin(config) } +// CentralCoordinator is the main coordinator struct that runs on the server. +// Used to oversee the reactor managers as well as process incoming +// client connections. +// Also interacts with the database and global config. type CentralCoordinator struct { - // main coordinator ClientConnections *ClientPacket *ReactorCoordinator Database @@ -38,6 +44,10 @@ type CentralCoordinator struct { Err chan error } +// NewCentralCoordinator creates a central coordinator with the given global +// config and error channel. +// It will create a new reactor coordinator and database admin. +// It will also try to load the existing configuration information. func NewCentralCoordinator(config *viper.Viper, ch chan error) *CentralCoordinator { // create a central coordinator to manage requests db, err := NewDatabaseAdmin(config) @@ -49,13 +59,16 @@ func NewCentralCoordinator(config *viper.Viper, ch chan error) *CentralCoordinat if err != nil { ch <- err } - config.UnmarshalKey("server.ports", rc) // get reactor port + + config.UnmarshalKey("server.ports", rc) + c := &CentralCoordinator{ Err: ch, Config: config, Database: db, ReactorCoordinator: rc, } + // grab config settings if err = config.UnmarshalKey("server", c); err != nil { ch <- err @@ -64,53 +77,64 @@ func NewCentralCoordinator(config *viper.Viper, ch chan error) *CentralCoordinat return c } +// Start activates the central coordinator and ensures it is ready for +// new clients. +// Creates a listener and starts both reactor coordinator and listener. func (c *CentralCoordinator) Start() { - // starts up associated funcs + clientChan := make(chan *ClientPacket) l := NewListener(clientChan, c.Err) - // grabs lis port + c.Config.UnmarshalKey("server.ports", l) - // starting reactor coordinator if err := c.ReactorCoordinator.Start(); err != nil { c.Err <- err } - // starting listener + if err := l.Start(); err != nil { c.Err <- err } - // lastly start client listener + go c.ClientListener(clientChan) } +// ClientListener listens on the given channel for clients that are sent +// over from the listener. +// The clients are then passed to the handler before returning the response. func (c *CentralCoordinator) ClientListener(ch chan *ClientPacket) { + for client := range ch { - // basically loops until channel is closed client.Response <- c.ClientHandler(client.Client) // respond with cred } } +// ClientHandler takes in a client and retrieves the associated +// database credentials. +// Currently only handles reactor type clients, can be modified +// to support others. func (c *CentralCoordinator) ClientHandler(cl *Client) *ClientResponse { // returns reactor db info var err error cr := &ClientResponse{Port: c.Ports[cl.Type]} - if cl.Type == "reactor" { - // get reactor info - go c.ReactorCoordinator.ClientHandler(cl) - // db info - cr.URL, cr.Org, cr.Token, cr.Bucket, err = c.Database.GetReactorClient(cl.Id) - } else { - // throw error - err = fmt.Errorf("Client type %s not recognized!", cl.Type) + if cl.Type != "reactor" { + c.Err <- fmt.Errorf("client type %s not recognized", cl.Type) } - // returns based on cl type + + go c.ReactorCoordinator.ClientHandler(cl) + + // db info + cr.URL, cr.Org, cr.Token, cr.Bucket, err = c.Database.GetReactorClient(cl.Id) + if err != nil { c.Err <- err } + return cr } +// ReactorCoordinator is a strucutre used to store reactor managers for +// clients that have connected at some point. type ReactorCoordinator struct { Port int `mapstructure:"reactor"` *ReactorManagers @@ -118,84 +142,130 @@ type ReactorCoordinator struct { pb.UnimplementedMonitoringServer } +// ReactorManagers is a structure that stores a concurrent map of the +// reactor managers as well as the global config. type ReactorManagers struct { Config *viper.Viper Directory map[int]*ReactorManager sync.RWMutex } +// NewReactorCoordinator takes the global config and error channel and returns +// a pointer to a ReactorCoordinator as well as any errors. func NewReactorCoordinator(config *viper.Viper, errCh chan error) (*ReactorCoordinator, error) { + rmap := make(map[int]*ReactorManager) - rm := &ReactorManagers{Directory: rmap, Config: config} - c := &ReactorCoordinator{Err: errCh, ReactorManagers: rm} - return c, nil + + rm := &ReactorManagers{ + Directory: rmap, + Config: config, + } + + return &ReactorCoordinator{ + Err: errCh, + ReactorManagers: rm, + }, nil } +// Start starts the reactor coordinator and kicks off +// registering the gRPC service func (c *ReactorCoordinator) Start() error { + logging.Debug(logging.DStart, "RCO 01 Starting!") - // register grpc service + return c.Register() } +// ClientHandler takes in a client and finds or creates the correct +// manager for said client. func (c *ReactorCoordinator) ClientHandler(cl *Client) { - // updates clients if nessecary + if err := c.UpdateReactorManager(cl, c.Err); err != nil { c.Err <- err } } +// GetReactorManager attempts to locate a reactor manager for a given id. +// Returns either the associated reactor manager, or an error if +// a manager does not exist for the given id. func (m *ReactorManagers) GetReactorManager(id int) (*ReactorManager, error) { m.RLock() defer m.RUnlock() rm, exists := m.Directory[id] + if !exists { - return &ReactorManager{}, errors.New(fmt.Sprintf("No manager for reactor %d!", id)) + return &ReactorManager{}, fmt.Errorf("no manager for reactor %d", id) } return rm, nil } +// UpdateReactorManager takes in a client and error channel and passes the +// client to the associate reactor manager. +// If the client does not have an existing reactor manager, it will create one +// , start it, and add it to the map for future calls. +// The function then calls UpdateClient on the reactor manager and returns +// any errors generated by this function. func (m *ReactorManagers) UpdateReactorManager(cl *Client, errCh chan error) error { - // locking m.RLock() defer m.RUnlock() var err error rm, exists := m.Directory[cl.Id] + if !exists { - logging.Debug(logging.DClient, "RCO creating manager for reactor client %v", cl.Id) - // creating + // reactor manager does not exist, creating new one + logging.Debug( + logging.DClient, + "RCO 01 creating manager for %v", + cl.Id, + ) + if rm, err = NewReactorManager(cl, m.Config, errCh); err != nil { return err } - // starting if err = rm.Start(); err != nil { return err } + m.Directory[cl.Id] = rm } + return rm.UpdateClient(cl) } +// Register attaches to the servers port and attempts to bind +// a gRPC server to it. func (r *ReactorCoordinator) Register() error { + lis, err := net.Listen("tcp", fmt.Sprintf(":%v", r.Port)) if err != nil { return err } + grpcServer := grpc.NewServer() pb.RegisterMonitoringServer(grpcServer, r) + go grpcServer.Serve(lis) - logging.Debug(logging.DClient, "RCO ready for client requests") + + logging.Debug(logging.DClient, "RCO 01 ready") + return nil } +// ReactorStatusHandler is a gRPC handler used to handle incoming +// reactor requests containing information about said reactor. +// It will get the associate reactor manager and pass the +// request down for further processing. func (r *ReactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { + rm, err := r.GetReactorManager(int(req.GetId())) - // error checking + if err != nil { return &pb.ReactorStatusResponse{}, err } + return rm.ReactorStatusHandler(ctx, req) }