You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
358 lines
11 KiB
Go
358 lines
11 KiB
Go
package server
|
|
|
|
import (
|
|
"sync"
|
|
"fmt"
|
|
"net"
|
|
"context"
|
|
"errors"
|
|
"FRMS/internal/pkg/logging"
|
|
"google.golang.org/grpc"
|
|
pb "FRMS/internal/pkg/grpc"
|
|
"FRMS/internal/pkg/config"
|
|
_ "FRMS/internal/pkg/influxdb"
|
|
|
|
)
|
|
|
|
// This package creates the central coordinator and the sub coordinators for clients.
|
|
// interfaces
|
|
|
|
// config interface
|
|
func LoadConfig() Config {
|
|
// returns a ServerConfig that we can query and update
|
|
return config.LoadConfig()
|
|
}
|
|
|
|
// Config is the set of configuration getters and setters the coordinator
// relies on.
// PROPOSED RENAMING: ServerConfig to avoid confusion w/ reactor variant.
type Config interface {
	// Getters.
	GetURL() (string, error)
	GetOrg() (string, error)
	GetPort(string) (int, error) // callers in this file pass the client type ("reactor", "tui")
	GetServerToken() (string, error)
	GetReactorClient(uint32) (string, string, error) // returns (bucket, token, err)

	// Setters; the original note says these save on write.
	UpdateURL(string) error
	UpdateOrg(string) error
	UpdateServerToken(string) error
	UpdateReactorClient(uint32, string, string) error // called as (id, bucket, token)
}
|
|
|
|
// db client interface
|
|
// DB is the database client interface used by the coordinator.
type DB interface {
	// Getters (per the original notes, each creates the resource if it
	// does not exist yet).
	GetToken() (string, error)                       // returns the admin token
	GetReactorClient(uint32) (string, string, error) // returns (bucket, token, err)

	// Deletion.
	DeleteReactorClient(uint32) error    // removes the client token but keeps the bucket
	PurgeReactorClientData(uint32) error // permanently deletes all associated reactor data (token, bucket, etc.)
}
|
|
|
|
/*func NewDBClient() DBClient {
|
|
return influxdb.NewServerClient()
|
|
}*/
|
|
|
|
|
|
type CentralCoordinator struct {
|
|
ClientConnections *ClientPacket
|
|
//CLisPort int
|
|
*SubCoordinators
|
|
*SystemViewer
|
|
DB
|
|
Config
|
|
Err chan error
|
|
}
|
|
|
|
type SubCoordinators struct {
|
|
Directory map [string]*SubCoordinator
|
|
sync.Mutex
|
|
}
|
|
|
|
func NewCentralCoordinator(ch chan error) *CentralCoordinator {
|
|
c := &CentralCoordinator{Err: ch}
|
|
c.SystemViewer = NewSystemViewer()
|
|
go c.SystemViewer.Start()
|
|
s := make(map[string]*SubCoordinator)
|
|
sub := &SubCoordinators{Directory:s}
|
|
c.SubCoordinators = sub
|
|
return c
|
|
}
|
|
|
|
func (c *CentralCoordinator) Start() {
|
|
// starts up associated funcs
|
|
// begin with config and client
|
|
c.LoadCfg()
|
|
clientChan := make(chan *ClientPacket)
|
|
l := NewListener(clientChan,c.Err)
|
|
go l.Start()
|
|
go c.ClientListener(clientChan)
|
|
|
|
}
|
|
|
|
func (c *CentralCoordinator) LoadCfg() {
|
|
// loads db client info and updates if anything is missing
|
|
c.Config = LoadConfig()
|
|
_, err := c.Config.GetURL()
|
|
if err != nil {
|
|
logging.Debug(logging.DError,"CCO 01 Err: %v", err)
|
|
c.Err <-err
|
|
}
|
|
token, err := c.Config.GetServerToken()
|
|
if err != nil {
|
|
logging.Debug(logging.DError,"CCO 01 Err: %v", err)
|
|
c.Err <-err
|
|
} else if token == "" {
|
|
if token, err = c.DB.GetToken(); err != nil {
|
|
logging.Debug(logging.DError,"CCO 01 Err: %v", err)
|
|
c.Err <-err
|
|
}
|
|
c.Config.UpdateServerToken(token)
|
|
}
|
|
org, err := c.Config.GetOrg()
|
|
if err != nil {
|
|
logging.Debug(logging.DError,"CCO 01 Err: %v", err)
|
|
c.Err <-err
|
|
} else if token == "" {
|
|
if token, err = c.DB.GetToken(); err != nil {
|
|
logging.Debug(logging.DError,"CCO 01 Err: %v", err)
|
|
c.Err <-err
|
|
}
|
|
c.Config.UpdateOrg(org)
|
|
}
|
|
}
|
|
|
|
func (c *CentralCoordinator) ClientListener(ch chan *ClientPacket) {
|
|
for client := range ch {
|
|
// basically loops until channel is closed
|
|
cr := c.ClientHandler(client.Client)
|
|
client.Response <-cr
|
|
}
|
|
}
|
|
|
|
func (c *CentralCoordinator) ClientHandler(cl *Client) *ClientResponse {
|
|
c.SubCoordinators.Lock()
|
|
defer c.SubCoordinators.Unlock()
|
|
subcoord, ok := c.SubCoordinators.Directory[cl.Type]
|
|
if !ok {
|
|
// Sub Coordinator does not exists
|
|
logging.Debug(logging.DSpawn,"CC0 01 Created %v Coordinator",cl.Type)
|
|
subcoord = NewSubCoordinator(cl.Type,c.SystemViewer, c.Err)
|
|
c.SubCoordinators.Directory[cl.Type] = subcoord
|
|
}
|
|
go subcoord.ClientHandler(cl)
|
|
// setting up client response
|
|
var url, org, token, bucket string
|
|
var port int
|
|
var err error
|
|
if url, err = c.Config.GetURL(); err != nil {
|
|
logging.Debug(logging.DError,"Error: %v", err)
|
|
c.Err <-err
|
|
} else if org, err = c.Config.GetOrg(); err != nil {
|
|
logging.Debug(logging.DError,"Error: %v", err)
|
|
c.Err <-err
|
|
} else if bucket, token, err = c.Config.GetReactorClient(cl.Id); err != nil {
|
|
logging.Debug(logging.DError,"Error: %v", err)
|
|
c.Err <-err
|
|
} else if port, err = c.Config.GetPort(cl.Type); err != nil {
|
|
logging.Debug(logging.DError,"Error: %v", err)
|
|
c.Err <-err
|
|
}
|
|
cr := &ClientResponse{URL:url, Org:org, Token:token, Bucket:bucket, Port:port}
|
|
return cr
|
|
}
|
|
|
|
type ManagerInterface interface {
|
|
Start()
|
|
NewManager(*Client,*SystemViewer, chan error) GeneralManager
|
|
GetManager(uint32) (GeneralManager, bool)
|
|
AddManager(uint32, GeneralManager)
|
|
Register()
|
|
}
|
|
|
|
|
|
type GeneralManager interface {
|
|
// used by sub coordinator to interact with manager
|
|
Start()
|
|
UpdateClient(*Client)
|
|
}
|
|
|
|
type SubCoordinator struct {
|
|
Port int // port that we set up gRPC endpoint on
|
|
ManagerInterface // embed an interface to create/manager managers
|
|
*SystemViewer
|
|
Err chan error
|
|
}
|
|
|
|
// Managers is a lock-guarded directory of managers keyed by client id.
// Values are stored as interface{} so either manager kind can be held.
type Managers struct {
	Directory    map[uint32]interface{} // support for either manager
	sync.RWMutex                        // read/write lock; potential perf win for lookups
}
|
|
|
|
// interface stuff
|
|
func NewSubCoordinator(clientType string, sys *SystemViewer, err chan error) *SubCoordinator {
|
|
c := &SubCoordinator{Err:err}
|
|
c.SystemViewer = sys
|
|
man, errs := NewCoordinatorType(clientType, err)
|
|
if errs != nil {
|
|
err <-errs
|
|
}
|
|
c.ManagerInterface = man
|
|
go man.Start()
|
|
go man.Register()
|
|
return c
|
|
}
|
|
|
|
func (c *SubCoordinator) ClientHandler(cl *Client) {
|
|
// (creates and) notifies manager of client connection
|
|
|
|
c.UpdateManager(cl)
|
|
}
|
|
|
|
func (c *SubCoordinator) UpdateManager(cl *Client) {
|
|
// shouldn't happen all that often so should be fine to lock
|
|
m, exists := c.GetManager(cl.Id)
|
|
if !exists {
|
|
m = c.NewManager(cl, c.SystemViewer, c.Err)
|
|
m.UpdateClient(cl)
|
|
go c.AddManager(cl.Id, m)
|
|
go m.Start()
|
|
}
|
|
go m.UpdateClient(cl)
|
|
}
|
|
|
|
func (m *Managers) AddManager(id uint32, man GeneralManager) {
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
m.Directory[id] = man
|
|
}
|
|
|
|
func (m *Managers) GetManager(id uint32) (GeneralManager, bool) {
|
|
// just read locks and reuturns
|
|
m.RLock()
|
|
defer m.RUnlock()
|
|
man, exists := m.Directory[id]
|
|
if !exists {
|
|
return nil, exists
|
|
}
|
|
return man.(GeneralManager), exists
|
|
}
|
|
|
|
func NewCoordinatorType(clientType string, err chan error) (ManagerInterface, error) {
|
|
|
|
m := make(map[uint32]interface{})
|
|
if clientType == "reactor" {
|
|
c := &reactorCoordinator{}
|
|
//m := make(map[uint32]*ReactorManager)
|
|
c.Managers = &Managers{Directory:m}
|
|
return c, nil
|
|
} else if clientType == "tui" {
|
|
c := &tuiCoordinator{}
|
|
//m := make(map[uint32]*TUIManager)
|
|
c.Managers = &Managers{Directory:m}
|
|
return c, nil
|
|
}
|
|
return &reactorCoordinator{}, errors.New("Unrecognized client type")
|
|
}
|
|
|
|
// creating sub coordinators for associated gRPC handlers
|
|
// reactor coordinator
|
|
type reactorCoordinator struct {
|
|
*Managers
|
|
pb.UnimplementedMonitoringServer
|
|
}
|
|
|
|
func (r *reactorCoordinator) Start() {
|
|
logging.Debug(logging.DStart,"RCO 01 Starting!")
|
|
}
|
|
|
|
func (r *reactorCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan error) GeneralManager {
|
|
logging.Debug(logging.DClient, "RCO 01 starting manager for %v client %v",cl.Type,cl.Id)
|
|
return NewReactorManager(cl,sys,err)
|
|
}
|
|
|
|
func (r *reactorCoordinator) Register() {
|
|
conf := LoadConfig()
|
|
port, err := conf.GetPort("reactor")
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
lis, err := net.Listen("tcp", fmt.Sprintf(":%v",port))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
grpcServer := grpc.NewServer()
|
|
pb.RegisterMonitoringServer(grpcServer,r)
|
|
go grpcServer.Serve(lis)
|
|
logging.Debug(logging.DClient, "RCO ready for client requests")
|
|
}
|
|
|
|
func (r *reactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) {
|
|
m, exists := r.GetManager(req.GetId())
|
|
if !exists {
|
|
return &pb.ReactorStatusResponse{}, errors.New("Manager doesn't exists for that client")
|
|
}
|
|
rm, ok := m.(*ReactorManager)
|
|
if !ok {
|
|
return &pb.ReactorStatusResponse{}, errors.New("Manager is not a reactor manager!")
|
|
}
|
|
return rm.ReactorStatusHandler(ctx, req)
|
|
}
|
|
|
|
//tui coordinator
|
|
type tuiCoordinator struct {
|
|
*Managers // by embedding general struct we allow coordinator to still call general funcs
|
|
pb.UnimplementedManagementServer
|
|
}
|
|
|
|
func (t *tuiCoordinator) Start() {
|
|
logging.Debug(logging.DStart,"TCO 01 Starting!")
|
|
}
|
|
|
|
func (t *tuiCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan error) GeneralManager {
|
|
logging.Debug(logging.DClient, "TCO 01 starting manager for %v client %v",cl.Type,cl.Id)
|
|
return NewTUIManager(cl,sys,err)
|
|
}
|
|
|
|
func (t *tuiCoordinator) Register() {
|
|
conf := LoadConfig()
|
|
port, err := conf.GetPort("tui")
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
lis, err := net.Listen("tcp", fmt.Sprintf(":%v",port))
|
|
if err != nil {
|
|
// rip
|
|
}
|
|
grpcServer := grpc.NewServer()
|
|
pb.RegisterManagementServer(grpcServer,t)
|
|
go grpcServer.Serve(lis)
|
|
logging.Debug(logging.DClient, "TCO ready for client requests")
|
|
}
|
|
|
|
func (t *tuiCoordinator) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) {
|
|
// grpc handler to fwd to manager
|
|
m, exists := t.GetManager(req.GetClientId())
|
|
if !exists {
|
|
// doesnt exist for some reason
|
|
return &pb.GetDevicesResponse{}, errors.New("Manager doesn't exists for client")
|
|
}
|
|
tm, ok := m.(*TUIManager)
|
|
if !ok {
|
|
return &pb.GetDevicesResponse{}, errors.New("Manager is not of type TUI")
|
|
}
|
|
return tm.GetDevices(ctx,req)
|
|
}
|
|
|
|
// gRPC handlers that are not implemented yet (stubs returning empty responses)
|
|
func (t *tuiCoordinator) DeleteReactor(ctx context.Context, req *pb.DeleteReactorRequest) (*pb.DeleteReactorResponse, error) {
|
|
// TODO
|
|
return &pb.DeleteReactorResponse{}, nil
|
|
}
|
|
|
|
func (t *tuiCoordinator) DeleteReactorDevice(ctx context.Context, req *pb.DeleteReactorDeviceRequest) (*pb.DeleteReactorDeviceResponse, error) {
|
|
// TODO
|
|
return &pb.DeleteReactorDeviceResponse{}, nil
|
|
}
|
|
|