// package Server provides a way to listen for incoming connections // and manage multiple reactor clients. package server import ( pb "FRMS/internal/pkg/grpc" "FRMS/internal/pkg/influxdb" "FRMS/internal/pkg/logging" "context" "fmt" "net" "sync" "github.com/spf13/viper" "google.golang.org/grpc" ) // Database is an interface to interact with the server database. // Used mainly to find existing credentials for // incoming reactor client connections. type Database interface { GetReactorClient(int) (string, string, string, string, error) // returns (url, org, bucket, token, err) } // NewDatabaseAdmin creates a new database admin that implements the // Database interface. // Allows access to the database to find/create reactor credentials. // Implemented via the influxdb package. func NewDatabaseAdmin(config *viper.Viper) (Database, error) { return influxdb.NewDBAdmin(config) } // CentralCoordinator is the main coordinator struct that runs on the server. // Used to oversee the reactor managers as well as process incoming // client connections. // Also interacts with the database and global config. type CentralCoordinator struct { ClientConnections *ClientPacket *ReactorCoordinator Database Config *viper.Viper // from config Ports map[string]int `mapstructure:"ports"` Err chan error } // NewCentralCoordinator creates a central coordinator with the given global // config and error channel. // It will create a new reactor coordinator and database admin. // It will also try to load the existing configuration information. func NewCentralCoordinator(config *viper.Viper, ch chan error) *CentralCoordinator { // create a central coordinator to manage requests db, err := NewDatabaseAdmin(config) if err != nil { ch <- err } rc, err := NewReactorCoordinator(config, ch) if err != nil { ch <- err } config.UnmarshalKey("server.ports", rc) c := &CentralCoordinator{ Err: ch, Config: config, Database: db, ReactorCoordinator: rc, } // grab config settings if err = config.UnmarshalKey("server", c); err != nil { ch <- err } return c } // Start activates the central coordinator and ensures it is ready for // new clients. // Creates a listener and starts both reactor coordinator and listener. func (c *CentralCoordinator) Start() { clientChan := make(chan *ClientPacket) l, err := NewListener(clientChan, c.Err) if err != nil { c.Err <- err } c.Config.UnmarshalKey("server.ports", l) if err := c.ReactorCoordinator.Start(); err != nil { c.Err <- err } if err := l.Start(); err != nil { c.Err <- err } go c.ClientListener(clientChan) } // ClientListener listens on the given channel for clients that are sent // over from the listener. // The clients are then passed to the handler before returning the response. func (c *CentralCoordinator) ClientListener(ch chan *ClientPacket) { for client := range ch { client.Response <- c.ClientHandler(client.Client) // respond with cred } } // ClientHandler takes in a client and retrieves the associated // database credentials. // Currently only handles reactor type clients, can be modified // to support others. func (c *CentralCoordinator) ClientHandler(cl *Client) *ClientResponse { // returns reactor db info var err error cr := &ClientResponse{Port: c.Ports[cl.Type]} if cl.Type != "reactor" { c.Err <- fmt.Errorf("client type %s not recognized", cl.Type) } go c.ReactorCoordinator.ClientHandler(cl) // db info cr.URL, cr.Org, cr.Token, cr.Bucket, err = c.Database.GetReactorClient(cl.Id) if err != nil { c.Err <- err } return cr } // ReactorCoordinator is a strucutre used to store reactor managers for // clients that have connected at some point. type ReactorCoordinator struct { Port int `mapstructure:"reactor"` *ReactorManagers Err chan error pb.UnimplementedMonitoringServer } // ReactorManagers is a structure that stores a concurrent map of the // reactor managers as well as the global config. type ReactorManagers struct { Config *viper.Viper Directory map[int]*ReactorManager sync.RWMutex } // NewReactorCoordinator takes the global config and error channel and returns // a pointer to a ReactorCoordinator as well as any errors. func NewReactorCoordinator(config *viper.Viper, errCh chan error) (*ReactorCoordinator, error) { rmap := make(map[int]*ReactorManager) rm := &ReactorManagers{ Directory: rmap, Config: config, } return &ReactorCoordinator{ Err: errCh, ReactorManagers: rm, }, nil } // Start starts the reactor coordinator and kicks off // registering the gRPC service func (c *ReactorCoordinator) Start() error { logging.Debug(logging.DStart, "RCO 01 Starting!") return c.Register() } // ClientHandler takes in a client and finds or creates the correct // manager for said client. func (c *ReactorCoordinator) ClientHandler(cl *Client) { if err := c.UpdateReactorManager(cl, c.Err); err != nil { c.Err <- err } } // GetReactorManager attempts to locate a reactor manager for a given id. // Returns either the associated reactor manager, or an error if // a manager does not exist for the given id. func (m *ReactorManagers) GetReactorManager(id int) (*ReactorManager, error) { m.RLock() defer m.RUnlock() rm, exists := m.Directory[id] if !exists { return &ReactorManager{}, fmt.Errorf("no manager for reactor %d", id) } return rm, nil } // UpdateReactorManager takes in a client and error channel and passes the // client to the associate reactor manager. // If the client does not have an existing reactor manager, it will create one // , start it, and add it to the map for future calls. // The function then calls UpdateClient on the reactor manager and returns // any errors generated by this function. func (m *ReactorManagers) UpdateReactorManager(cl *Client, errCh chan error) error { m.RLock() defer m.RUnlock() var err error rm, exists := m.Directory[cl.Id] if !exists { // reactor manager does not exist, creating new one logging.Debug( logging.DClient, "RCO 01 creating manager for %v", cl.Id, ) if rm, err = NewReactorManager(cl, m.Config, errCh); err != nil { return err } if err = rm.Start(); err != nil { return err } m.Directory[cl.Id] = rm } return rm.UpdateClient(cl) } // Register attaches to the servers port and attempts to bind // a gRPC server to it. func (r *ReactorCoordinator) Register() error { lis, err := net.Listen("tcp", fmt.Sprintf(":%v", r.Port)) if err != nil { return err } grpcServer := grpc.NewServer() pb.RegisterMonitoringServer(grpcServer, r) go grpcServer.Serve(lis) logging.Debug(logging.DClient, "RCO 01 ready") return nil } // ReactorStatusHandler is a gRPC handler used to handle incoming // reactor requests containing information about said reactor. // It will get the associate reactor manager and pass the // request device information before returning an acknowledgement. func (r *ReactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { rm, err := r.GetReactorManager(int(req.GetId())) if err != nil { return &pb.ReactorStatusResponse{}, err } go rm.ReactorDeviceHandler(req.GetDevices()) return &pb.ReactorStatusResponse{Id: int32(rm.Id)}, nil }