You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

344 lines
9.3 KiB
Go

package server
import (
"FRMS/internal/pkg/config"
pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/influxdb"
_ "FRMS/internal/pkg/influxdb"
"FRMS/internal/pkg/logging"
"context"
"errors"
"fmt"
"net"
"sync"
"google.golang.org/grpc"
)
// this package creates the central coordiantor and sub coordiantors for clients
// interfaces
// config interface
func LoadConfig() Config {
// returns a ServerConfig that we can query and update
return config.LoadConfig()
}
type Config interface { // PROPOSED RENAMING: ServerConfig to avoid confusion w/ reactor variant
// getters
GetURL() (string, error)
GetOrg() (string, error)
GetPort(string) (int, error)
GetServerToken() (string, error)
GetReactorClient(uint32) (string, string, error) // ret (bucket, token, err)
// setters
// save on write
//UpdateURL(string) error
//UpdateOrg(string) error
//UpdateServerToken(string) error
UpdateReactorClient(uint32, string, string) error // call (id, bucket, token)
}
// db client interface
type DB interface {
// getters (all create if doesnt exist)
//GetToken() (string, error) // returns admin token (Creates if it doesnt exist)
GetReactorClient(string) (string, string, error) // returns (bucket, token, err)
// delete
DeleteReactorClient(string) error // removes client token but maintains bucket
PurgeReactorClientData(string) error // perm deletes all assocaited reactor data (token, bucket etc)
}
func NewDBAdmin(url, org, token string) DB {
return influxdb.NewDBAdmin(token, org, url)
}
type CentralCoordinator struct {
ClientConnections *ClientPacket
//CLisPort int
*SubCoordinators
*SystemViewer
DB
Config
Err chan error
}
type SubCoordinators struct {
Directory map[string]*SubCoordinator
sync.Mutex
}
func NewCentralCoordinator(ch chan error) *CentralCoordinator {
c := &CentralCoordinator{Err: ch}
c.SystemViewer = NewSystemViewer()
go c.SystemViewer.Start()
s := make(map[string]*SubCoordinator)
sub := &SubCoordinators{Directory: s}
c.SubCoordinators = sub
return c
}
func (c *CentralCoordinator) Start() {
// starts up associated funcs
// begin with config and client
c.LoadCfg()
clientChan := make(chan *ClientPacket)
l := NewListener(clientChan, c.Err)
go l.Start()
go c.ClientListener(clientChan)
}
func (c *CentralCoordinator) LoadCfg() {
// loads db client info and updates if anything is missing
c.Config = LoadConfig()
_, err := c.Config.GetURL()
if err != nil {
logging.Debug(logging.DError, "CCO 01 Err: %v", err)
c.Err <- err
}
token, err := c.Config.GetServerToken()
if err != nil {
logging.Debug(logging.DError, "CCO 01 Err: %v", err)
c.Err <- err
}
org, err := c.Config.GetOrg()
if err != nil {
logging.Debug(logging.DError, "CCO 01 Err: %v", err)
c.Err <- err
}
}
func (c *CentralCoordinator) ClientListener(ch chan *ClientPacket) {
for client := range ch {
// basically loops until channel is closed
cr := c.ClientHandler(client.Client)
client.Response <- cr
}
}
func (c *CentralCoordinator) ClientHandler(cl *Client) *ClientResponse {
c.SubCoordinators.Lock()
defer c.SubCoordinators.Unlock()
subcoord, ok := c.SubCoordinators.Directory[cl.Type]
if !ok {
// Sub Coordinator does not exists
logging.Debug(logging.DSpawn, "CC0 01 Created %v Coordinator", cl.Type)
subcoord = NewSubCoordinator(cl.Type, c.SystemViewer, c.Err)
c.SubCoordinators.Directory[cl.Type] = subcoord
}
go subcoord.ClientHandler(cl)
// setting up client response
var url, org, token, bucket string
var port int
var err error
if url, err = c.Config.GetURL(); err != nil {
logging.Debug(logging.DError, "Error: %v", err)
c.Err <- err
} else if org, err = c.Config.GetOrg(); err != nil {
logging.Debug(logging.DError, "Error: %v", err)
c.Err <- err
} else if bucket, token, err = c.Config.GetReactorClient(cl.Id); err != nil {
logging.Debug(logging.DError, "Error: %v", err)
c.Err <- err
} else if port, err = c.Config.GetPort(cl.Type); err != nil {
logging.Debug(logging.DError, "Error: %v", err)
c.Err <- err
}
cr := &ClientResponse{URL: url, Org: org, Token: token, Bucket: bucket, Port: port}
return cr
}
type ManagerInterface interface {
Start()
NewManager(*Client, *SystemViewer, chan error) GeneralManager
GetManager(uint32) (GeneralManager, bool)
AddManager(uint32, GeneralManager)
Register()
}
type GeneralManager interface {
// used by sub coordinator to interact with manager
Start()
UpdateClient(*Client)
}
type SubCoordinator struct {
Port int // port that we set up gRPC endpoint on
ManagerInterface // embed an interface to create/manager managers
*SystemViewer
Err chan error
}
type Managers struct {
Directory map[uint32]interface{} // support for either manager
sync.RWMutex // potential perf
}
// interface stuff
func NewSubCoordinator(clientType string, sys *SystemViewer, err chan error) *SubCoordinator {
c := &SubCoordinator{Err: err}
c.SystemViewer = sys
man, errs := NewCoordinatorType(clientType, err)
if errs != nil {
err <- errs
}
c.ManagerInterface = man
go man.Start()
go man.Register()
return c
}
func (c *SubCoordinator) ClientHandler(cl *Client) {
// (creates and) notifies manager of client connection
c.UpdateManager(cl)
}
func (c *SubCoordinator) UpdateManager(cl *Client) {
// shouldn't happen all that often so should be fine to lock
m, exists := c.GetManager(cl.Id)
if !exists {
m = c.NewManager(cl, c.SystemViewer, c.Err)
m.UpdateClient(cl)
go c.AddManager(cl.Id, m)
go m.Start()
}
go m.UpdateClient(cl)
}
func (m *Managers) AddManager(id uint32, man GeneralManager) {
m.Lock()
defer m.Unlock()
m.Directory[id] = man
}
func (m *Managers) GetManager(id uint32) (GeneralManager, bool) {
// just read locks and reuturns
m.RLock()
defer m.RUnlock()
man, exists := m.Directory[id]
if !exists {
return nil, exists
}
return man.(GeneralManager), exists
}
func NewCoordinatorType(clientType string, err chan error) (ManagerInterface, error) {
m := make(map[uint32]interface{})
if clientType == "reactor" {
c := &reactorCoordinator{}
//m := make(map[uint32]*ReactorManager)
c.Managers = &Managers{Directory: m}
return c, nil
} else if clientType == "tui" {
c := &tuiCoordinator{}
//m := make(map[uint32]*TUIManager)
c.Managers = &Managers{Directory: m}
return c, nil
}
return &reactorCoordinator{}, errors.New("Unrecognized client type")
}
// creating sub coordinators for associated gRPC handlers
// reactor coordinator
type reactorCoordinator struct {
*Managers
pb.UnimplementedMonitoringServer
}
func (r *reactorCoordinator) Start() {
logging.Debug(logging.DStart, "RCO 01 Starting!")
}
func (r *reactorCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan error) GeneralManager {
logging.Debug(logging.DClient, "RCO 01 starting manager for %v client %v", cl.Type, cl.Id)
return NewReactorManager(cl, sys, err)
}
func (r *reactorCoordinator) Register() {
conf := LoadConfig()
port, err := conf.GetPort("reactor")
if err != nil {
panic(err)
}
lis, err := net.Listen("tcp", fmt.Sprintf(":%v", port))
if err != nil {
panic(err)
}
grpcServer := grpc.NewServer()
pb.RegisterMonitoringServer(grpcServer, r)
go grpcServer.Serve(lis)
logging.Debug(logging.DClient, "RCO ready for client requests")
}
func (r *reactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) {
m, exists := r.GetManager(req.GetId())
if !exists {
return &pb.ReactorStatusResponse{}, errors.New("Manager doesn't exists for that client")
}
rm, ok := m.(*ReactorManager)
if !ok {
return &pb.ReactorStatusResponse{}, errors.New("Manager is not a reactor manager!")
}
return rm.ReactorStatusHandler(ctx, req)
}
//tui coordinator
type tuiCoordinator struct {
*Managers // by embedding general struct we allow coordinator to still call general funcs
pb.UnimplementedManagementServer
}
func (t *tuiCoordinator) Start() {
logging.Debug(logging.DStart, "TCO 01 Starting!")
}
func (t *tuiCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan error) GeneralManager {
logging.Debug(logging.DClient, "TCO 01 starting manager for %v client %v", cl.Type, cl.Id)
return NewTUIManager(cl, sys, err)
}
func (t *tuiCoordinator) Register() {
conf := LoadConfig()
port, err := conf.GetPort("tui")
if err != nil {
panic(err)
}
lis, err := net.Listen("tcp", fmt.Sprintf(":%v", port))
if err != nil {
// rip
}
grpcServer := grpc.NewServer()
pb.RegisterManagementServer(grpcServer, t)
go grpcServer.Serve(lis)
logging.Debug(logging.DClient, "TCO ready for client requests")
}
func (t *tuiCoordinator) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) {
// grpc handler to fwd to manager
m, exists := t.GetManager(req.GetClientId())
if !exists {
// doesnt exist for some reason
return &pb.GetDevicesResponse{}, errors.New("Manager doesn't exists for client")
}
tm, ok := m.(*TUIManager)
if !ok {
return &pb.GetDevicesResponse{}, errors.New("Manager is not of type TUI")
}
return tm.GetDevices(ctx, req)
}
// unimplemented bs for grpc
func (t *tuiCoordinator) DeleteReactor(ctx context.Context, req *pb.DeleteReactorRequest) (*pb.DeleteReactorResponse, error) {
// TODO
return &pb.DeleteReactorResponse{}, nil
}
func (t *tuiCoordinator) DeleteReactorDevice(ctx context.Context, req *pb.DeleteReactorDeviceRequest) (*pb.DeleteReactorDeviceResponse, error) {
// TODO
return &pb.DeleteReactorDeviceResponse{}, nil
}