You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

202 lines
4.8 KiB
Go

package server
import (
pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/influxdb"
_ "FRMS/internal/pkg/influxdb"
"FRMS/internal/pkg/logging"
"context"
"errors"
"fmt"
"net"
"sync"
"github.com/spf13/viper"
"google.golang.org/grpc"
)
// this package creates the central coordinator and sub-coordinators to route clients
// Database is the db client interface the coordinators depend on.
type Database interface {
	// GetReactorClient is a getter for the reactor with the given id; getters
	// create the entry if it doesn't exist. The results are, in order, the
	// url, org, bucket and token, followed by an error.
	GetReactorClient(id int) (url, org, bucket, token string, err error)
}
// NewDatabaseAdmin builds the Database implementation backed by the influxdb
// package, configured from config. The result and error of
// influxdb.NewDBAdmin are passed through unchanged.
func NewDatabaseAdmin(config *viper.Viper) (Database, error) {
	admin, err := influxdb.NewDBAdmin(config)
	return admin, err
}
// CentralCoordinator is the main coordinator. It embeds a *ReactorCoordinator
// and a Database, so it can forward reactor clients and look up their
// database credentials directly.
type CentralCoordinator struct {
// ClientConnections holds a client packet.
// NOTE(review): not read or written in the visible part of this file — confirm it is still needed.
ClientConnections *ClientPacket
// Embedded reactor coordinator; reactor clients are handed to it.
*ReactorCoordinator
// Embedded database handle used to fetch reactor credentials.
Database
// Config is the viper configuration passed to NewCentralCoordinator.
Config *viper.Viper
// Ports maps a client type to the port returned to that client. It is
// filled from the "ports" key when the "server" config section is
// unmarshalled into this struct.
Ports map[string]int `mapstructure:"ports"`
// Err is the channel errors are reported on.
Err chan error
}
// NewCentralCoordinator creates a central coordinator to manage requests.
//
// It builds the database admin and the reactor coordinator, decodes the
// "server.ports" config section into the reactor coordinator (to get the
// reactor port) and the "server" section into the central coordinator itself.
//
// Every error encountered is sent on ch and construction continues, so a
// coordinator is always returned. Each send blocks until it is received unless
// ch is buffered.
func NewCentralCoordinator(config *viper.Viper, ch chan error) *CentralCoordinator {
	db, err := NewDatabaseAdmin(config)
	if err != nil {
		ch <- err
	}

	rc, err := NewReactorCoordinator(config, ch)
	if err != nil {
		ch <- err
	}

	// get reactor port; a decode failure is reported like every other error
	// here instead of being silently dropped
	if err = config.UnmarshalKey("server.ports", rc); err != nil {
		ch <- err
	}

	c := &CentralCoordinator{
		Err:                ch,
		Config:             config,
		Database:           db,
		ReactorCoordinator: rc,
	}

	// grab config settings
	if err = config.UnmarshalKey("server", c); err != nil {
		ch <- err
	}
	return c
}
// Start brings up everything the central coordinator depends on, in order:
// the reactor coordinator, the listener, and finally the goroutine that
// answers clients arriving from the listener.
//
// Errors are sent on c.Err and start-up continues with the next step.
func (c *CentralCoordinator) Start() {
	clientChan := make(chan *ClientPacket)
	l := NewListener(clientChan, c.Err)

	// grabs lis port; a decode failure is reported rather than ignored
	if err := c.Config.UnmarshalKey("server.ports", l); err != nil {
		c.Err <- err
	}

	// starting reactor coordinator
	if err := c.ReactorCoordinator.Start(); err != nil {
		c.Err <- err
	}

	// starting listener
	if err := l.Start(); err != nil {
		c.Err <- err
	}

	// lastly start client listener
	go c.ClientListener(clientChan)
}
// ClientListener answers every client packet received on ch with the result
// of ClientHandler, written to the packet's Response channel. It returns once
// ch has been closed.
func (c *CentralCoordinator) ClientListener(ch chan *ClientPacket) {
	for {
		packet, open := <-ch
		if !open {
			// channel closed, nothing left to serve
			return
		}
		// respond with cred
		packet.Response <- c.ClientHandler(packet.Client)
	}
}
// ClientHandler builds the response sent back to a connecting client.
//
// The response port is looked up in c.Ports by client type. For "reactor"
// clients the reactor coordinator is notified in a separate goroutine and the
// database connection details are filled in. Any other client type produces
// an error.
//
// Errors are sent on c.Err; the response is returned either way and may be
// only partially filled.
func (c *CentralCoordinator) ClientHandler(cl *Client) *ClientResponse {
	var err error
	cr := &ClientResponse{Port: c.Ports[cl.Type]}

	if cl.Type == "reactor" {
		// get reactor info
		go c.ReactorCoordinator.ClientHandler(cl)
		// db info: GetReactorClient is documented to return
		// (url, org, bucket, token, err), so the targets follow that order.
		// NOTE(review): bucket and token were previously assigned swapped;
		// confirm against the influxdb implementation.
		cr.URL, cr.Org, cr.Bucket, cr.Token, err = c.Database.GetReactorClient(cl.Id)
	} else {
		// throw error
		err = fmt.Errorf("Client type %s not recognized!", cl.Type)
	}

	if err != nil {
		c.Err <- err
	}
	return cr
}
// ReactorCoordinator serves the gRPC Monitoring service and routes requests
// to the manager of the reactor they concern.
type ReactorCoordinator struct {
// Port is the TCP port Register listens on; decoded from the "reactor" key
// of the config section unmarshalled into this struct.
Port int `mapstructure:"reactor"`
// Embedded directory of per-reactor managers.
*ReactorManagers
// Err is the channel errors are reported on.
Err chan error
// Embedded gRPC stub for the Monitoring service registered in Register.
pb.UnimplementedMonitoringServer
}
// ReactorManagers is a directory of reactor managers keyed by reactor id.
type ReactorManagers struct {
// Config is passed to NewReactorManager when a manager is created.
Config *viper.Viper
// Directory maps a reactor id to its manager.
Directory map[int]*ReactorManager
// Embedded lock taken by GetReactorManager and UpdateReactorManager around
// Directory access.
sync.RWMutex
}
// NewReactorCoordinator returns a reactor coordinator with an empty manager
// directory that reports errors on errCh. The returned error is always nil.
func NewReactorCoordinator(config *viper.Viper, errCh chan error) (*ReactorCoordinator, error) {
	managers := &ReactorManagers{
		Config:    config,
		Directory: make(map[int]*ReactorManager),
	}
	coordinator := &ReactorCoordinator{
		ReactorManagers: managers,
		Err:             errCh,
	}
	return coordinator, nil
}
// Start logs the start-up message and registers the gRPC service, returning
// whatever error Register reports.
func (c *ReactorCoordinator) Start() error {
	logging.Debug(logging.DStart, "RCO 01 Starting!")
	if err := c.Register(); err != nil {
		return err
	}
	return nil
}
// ClientHandler updates the reactor manager for cl, creating it if necessary,
// and sends any resulting error on c.Err.
func (c *ReactorCoordinator) ClientHandler(cl *Client) {
	err := c.UpdateReactorManager(cl, c.Err)
	if err == nil {
		return
	}
	c.Err <- err
}
// GetReactorManager looks up the manager registered for reactor id while
// holding the read lock. When none is registered it returns an empty
// ReactorManager together with an error naming the id.
func (m *ReactorManagers) GetReactorManager(id int) (*ReactorManager, error) {
	m.RLock()
	defer m.RUnlock()

	if manager, found := m.Directory[id]; found {
		return manager, nil
	}

	msg := fmt.Sprintf("No manager for reactor %d!", id)
	return &ReactorManager{}, errors.New(msg)
}
// UpdateReactorManager makes sure a started manager exists for the reactor
// client cl, creating and registering one if needed, and then passes cl to
// the manager's UpdateClient.
//
// It returns the first error from NewReactorManager, Start or UpdateClient.
// A manager that fails to be created or started is not added to the
// directory.
func (m *ReactorManagers) UpdateReactorManager(cl *Client, errCh chan error) error {
	rm, err := func() (*ReactorManager, error) {
		// The directory may be written below, so the exclusive lock is
		// required: a read lock does not protect the map insert, nor does it
		// stop two callers from both creating a manager for the same id.
		m.Lock()
		defer m.Unlock()

		if existing, ok := m.Directory[cl.Id]; ok {
			return existing, nil
		}

		logging.Debug(logging.DClient, "RCO creating manager for reactor client %v", cl.Id)
		// creating
		created, err := NewReactorManager(cl, m.Config, errCh)
		if err != nil {
			return nil, err
		}
		// starting
		if err = created.Start(); err != nil {
			return nil, err
		}
		m.Directory[cl.Id] = created
		return created, nil
	}()
	if err != nil {
		return err
	}

	// The client update runs outside the exclusive lock so that it does not
	// block lookups or updates for other reactors.
	return rm.UpdateClient(cl)
}
// Register opens a TCP listener on c.Port, registers the coordinator as the
// Monitoring gRPC service, and starts serving in a background goroutine.
//
// It returns an error only when the listener cannot be opened. A non-nil
// error from the serve goroutine is sent on c.Err instead of being dropped.
func (c *ReactorCoordinator) Register() error {
	lis, err := net.Listen("tcp", fmt.Sprintf(":%v", c.Port))
	if err != nil {
		return err
	}

	grpcServer := grpc.NewServer()
	pb.RegisterMonitoringServer(grpcServer, c)

	go func() {
		// Serve blocks for the lifetime of the server; surface its failure
		if serveErr := grpcServer.Serve(lis); serveErr != nil {
			c.Err <- serveErr
		}
	}()

	logging.Debug(logging.DClient, "RCO ready for client requests")
	return nil
}
// ReactorStatusHandler handles a reactor status ping by delegating to the
// manager registered for the reactor id carried in req. When no manager is
// registered it returns an empty response together with the lookup error.
func (r *ReactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) {
	reactorID := int(req.GetId())
	manager, lookupErr := r.GetReactorManager(reactorID)
	if lookupErr == nil {
		return manager.ReactorStatusHandler(ctx, req)
	}
	return &pb.ReactorStatusResponse{}, lookupErr
}