package server import ( "sync" //"fmt" "net" "context" "errors" "FRMS/internal/pkg/logging" "google.golang.org/grpc" pb "FRMS/internal/pkg/grpc" ) // this package creates coordinators responsible for keeping track of active clients and invoking managers type SubCoordinator interface { Start() NewManager(*Client,*SystemViewer, chan error) GeneralManager GetManager(uint32) (GeneralManager, bool) AddManager(uint32, GeneralManager) Register() } type GeneralManager interface { Start() UpdateClient(*Client) ReactorStatusHandler(context.Context,*pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) GetDevices(context.Context, *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) } type Coordinator struct { Port int // port that we set up gRPC endpoint on //*Managers going to embed this in subcoordinator SubCoordinator *SystemViewer Err chan error } type Managers struct { Directory map[uint32]GeneralManager sync.RWMutex // potential perf } // interface stuff func NewCoordinator(clientType string, sys *SystemViewer, err chan error) *Coordinator { d := make(map[uint32]GeneralManager) m := &Managers{Directory:d} c := &Coordinator{Err:err} c.Port = 2023 sub, errs := NewSubCoordinator(clientType, m, err) if errs != nil { err <-errs } c.SubCoordinator = sub c.SystemViewer = sys //c.Managers = m go c.Register() return c } func (c *Coordinator) Start() { // on start we need to create channel listener // on each new connection we want to check its id against our mapping c.SubCoordinator.Start() } func (c *Coordinator) ClientHandler(cl *Client) int { // (creates and) notifies manager of client connection go c.UpdateManager(cl) return c.Port } func (c *Coordinator) UpdateManager(cl *Client) { // shouldn't happen all that often so should be fine to lock m, exists := c.GetManager(cl.Id) if !exists { m = c.NewManager(cl, c.SystemViewer, c.Err) m.UpdateClient(cl) go c.AddManager(cl.Id, m) go m.Start() } go m.UpdateClient(cl) } func (m *Managers) AddManager(id uint32, man GeneralManager) { m.Lock() defer m.Unlock() m.Directory[id] = man } func (m *Managers) GetManager(id uint32) (GeneralManager, bool) { // just read locks and reuturns m.RLock() defer m.RUnlock() man, exists := m.Directory[id] return man, exists } func NewSubCoordinator(clientType string, m *Managers, err chan error) (SubCoordinator, error) { if clientType == "reactor" { c := &reactorCoordinator{} c.Managers = m return c, nil } else if clientType == "tui" { c := &tuiCoordinator{} c.Managers = m return c, nil } return &reactorCoordinator{}, errors.New("Unrecognized client type") } // creating sub coordinators for associated gRPC handlers // reactor coordinator type reactorCoordinator struct { *Managers pb.UnimplementedMonitoringServer } func (r *reactorCoordinator) Start() { logging.Debug(logging.DStart,"RCO 01 Starting!") } func (r *reactorCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan error) GeneralManager { logging.Debug(logging.DClient, "RCO 01 starting manager for %v client %v",cl.Type,cl.Id) return NewReactorManager(cl,sys,err) } func (r *reactorCoordinator) Register() { lis, err := net.Listen("tcp", ":2023") if err != nil { // rip } grpcServer := grpc.NewServer() pb.RegisterMonitoringServer(grpcServer,r) go grpcServer.Serve(lis) logging.Debug(logging.DClient, "RCO ready for client requests") } func (r *reactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { m, exists := r.GetManager(req.GetId()) if !exists { return &pb.ReactorStatusResponse{}, errors.New("Manager doesn't exists for that client") } return m.ReactorStatusHandler(ctx, req) } //tui coordinator type tuiCoordinator struct { *Managers pb.UnimplementedManagementServer } func (t *tuiCoordinator) Start() { logging.Debug(logging.DStart,"TCO 01 Starting!") } func (t *tuiCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan error) GeneralManager { logging.Debug(logging.DClient, "TCO 01 starting manager for %v client %v",cl.Type,cl.Id) return NewTUIManager(cl,sys,err) } func (t *tuiCoordinator) Register() { lis, err := net.Listen("tcp", ":2024") if err != nil { // rip } grpcServer := grpc.NewServer() pb.RegisterManagementServer(grpcServer,t) go grpcServer.Serve(lis) logging.Debug(logging.DClient, "TCO ready for client requests") } func (t *tuiCoordinator) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) { // grpc handler to fwd to manager m, exists := t.GetManager(req.GetClientId()) if !exists { // doesnt exist for some reason return &pb.GetDevicesResponse{}, errors.New("Manager doesn't exists for client") } return m.GetDevices(ctx,req) } // unimplemented bs for grpc func (t *tuiCoordinator) DeleteReactor(ctx context.Context, req *pb.DeleteReactorRequest) (*pb.DeleteReactorResponse, error) { // TODO return &pb.DeleteReactorResponse{}, nil } func (t *tuiCoordinator) DeleteReactorDevice(ctx context.Context, req *pb.DeleteReactorDeviceRequest) (*pb.DeleteReactorDeviceResponse, error) { // TODO return &pb.DeleteReactorDeviceResponse{}, nil }