diff --git a/cmd/server/main.go b/cmd/server/main.go index 5aa32e7..fa6497d 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -3,63 +3,48 @@ package main import ( _"net/http" _ "net/http/pprof" + "strconv" //"flag" - "log" + //"log" "os" "fmt" "FRMS/internal/pkg/logging" - "FRMS/internal/pkg/config" + //"FRMS/internal/pkg/config" "FRMS/internal/pkg/server" ) -type listener interface { +type coordinator interface { Start() } -func NewListener(ch chan error, port int) listener { - return server.NewListener(ch, port) -} - -type dbconfig interface { - GetUrl() string - GetOrg() string - GetBucket() string - GetToken() string -} - -func ReadConfig() dbconfig { - return config.ReadServerConfig() +func NewCoordinator(port int, ch chan error) coordinator { + return server.NewCentralCoordinator(port, ch) } func main() { // lets get this bread - // all we need to do is call the reactor coordinator and thats it - // removing os flags in favor of env vars + // go func() { // fmt.Println(http.ListenAndServe("localhost:6060",nil)) // }() + ch := make(chan error) - // creating listener - var lport int - //var dbport int - if port := os.Getenv("gRPC_PORT"); port == "" { - lport = 2022 // default docker port + var port int + var err error + + if p := os.Getenv("gRPC_PORT"); p == "" { + port = 2022 // default docker port + } else { + if port, err = strconv.Atoi(p); err != nil { + panic(err) + } } - //if port := os.Getenv("DATABASE_PORT"); port == "" { - //dbport = 8086 - //} - //fmt.Printf("DBPORT %d\n", dbport) - conf := ReadConfig() - fmt.Printf("Found %v %v %v %v\n",conf.GetUrl(),conf.GetBucket(),conf.GetOrg(),conf.GetToken()) - fmt.Printf("Listening on %v\n", lport) - l := NewListener(ch,lport) - //db := os.Getenv("DATABASE_URL") // database url - - go l.Start() + //fmt.Printf("Listening on %v\n", lport) + c := NewCoordinator(port, ch) + go c.Start() + fmt.Println("Server Active!") logging.Debug(logging.DStart, "CCO 01 Server started") - err := <-ch // blocking to wait for any errors and keep alive otherwise - if err != nil { - log.Fatal(err) - } + err = <-ch // blocking to wait for any errors and keep alive otherwise + panic(err) } diff --git a/docker-compose.yml b/docker-compose.yml index 75b43f6..0f96204 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,7 +9,7 @@ services: - "2022:2022" - "2023:2023" volumes: - - ./logs:/app/log + - ./logs:/log environment: - LOGTYPE=SERVER - VERBOSE=1 diff --git a/internal/configs/boards.yaml b/internal/configs/boards.yaml index d281607..c20aacf 100644 --- a/internal/configs/boards.yaml +++ b/internal/configs/boards.yaml @@ -1,4 +1,21 @@ -model: raspberrypi - bus: 1 -model: beagleboard - bus: 2 +server: + id: 1000213123 + name: "Rack Server" + db-url: "http://192.168.100.2:3000" + db-token: "" + l-port: 2022 + r-port: 2023 + t-port: 2024 +reactor: + id: 102233 + name: "Beaglebone Black" + i2c-bus: 2 + db-token: "" + db-bucket: "" + id: 220123123 + name: "Raspberry Pi" +tui: + id: 10000 + name: "kdeppe" + db-token: "" + db-bucket: "" diff --git a/internal/pkg/server/coordinator.go b/internal/pkg/server/coordinator.go index 53319e2..dc1a215 100644 --- a/internal/pkg/server/coordinator.go +++ b/internal/pkg/server/coordinator.go @@ -12,8 +12,62 @@ import ( ) -// this package creates coordinators responsible for keeping track of active clients and invoking managers -type SubCoordinator interface { +// this package creates the central coordiantor and sub coordiantors for clients +type CentralCoordinator struct { + ClientConnections *ClientPacket + CLisPort int + *SubCoordinators + *SystemViewer + Err chan error +} + +type SubCoordinators struct { + Directory map [string]*SubCoordinator + sync.Mutex +} + +func NewCentralCoordinator(port int, ch chan error) *CentralCoordinator { + c := &CentralCoordinator{CLisPort: port, Err: ch} + c.SystemViewer = NewSystemViewer() + go c.SystemViewer.Start() + s := make(map[string]*SubCoordinator) + sub := &SubCoordinators{Directory:s} + c.SubCoordinators = sub + return c +} + +func (c *CentralCoordinator) Start() { + // starts up associated funcs + clientChan := make(chan *ClientPacket) + l := NewListener(c.CLisPort,clientChan,c.Err) + go l.Start() + go c.ClientListener(clientChan) + +} + +func (c *CentralCoordinator) ClientListener(ch chan *ClientPacket) { + for client := range ch { + // basically loops until channel is closed + port := c.ClientHandler(client.Client) + client.Port <-port + } +} + +func (c *CentralCoordinator) ClientHandler(cl *Client) int { + c.SubCoordinators.Lock() + defer c.SubCoordinators.Unlock() + subcoord, ok := c.SubCoordinators.Directory[cl.Type] + if !ok { + // Sub Coordinator does not exists + logging.Debug(logging.DSpawn,"CC0 01 Created %v Coordinator",cl.Type) + subcoord = NewSubCoordinator(cl.Type,c.SystemViewer, c.Err) + c.SubCoordinators.Directory[cl.Type] = subcoord + } + go subcoord.ClientHandler(cl) + return subcoord.Port +} + +type ManagerInterface interface { Start() NewManager(*Client,*SystemViewer, chan error) GeneralManager GetManager(uint32) (GeneralManager, bool) @@ -21,56 +75,47 @@ type SubCoordinator interface { Register() } + type GeneralManager interface { + // used by sub coordinator to interact with manager Start() UpdateClient(*Client) - ReactorStatusHandler(context.Context,*pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) - GetDevices(context.Context, *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) } -type Coordinator struct { +type SubCoordinator struct { Port int // port that we set up gRPC endpoint on - //*Managers going to embed this in subcoordinator - SubCoordinator + ManagerInterface // embed an interface to create/manager managers *SystemViewer Err chan error } type Managers struct { - Directory map[uint32]GeneralManager + Directory map[uint32]interface{} // support for either manager sync.RWMutex // potential perf } // interface stuff -func NewCoordinator(clientType string, sys *SystemViewer, err chan error) *Coordinator { - d := make(map[uint32]GeneralManager) - m := &Managers{Directory:d} - c := &Coordinator{Err:err} - c.Port = 2023 - sub, errs := NewSubCoordinator(clientType, m, err) +func NewSubCoordinator(clientType string, sys *SystemViewer, err chan error) *SubCoordinator { + c := &SubCoordinator{Err:err} + c.SystemViewer = sys + man, port, errs := NewCoordinatorType(clientType, err) if errs != nil { err <-errs } - c.SubCoordinator = sub - c.SystemViewer = sys - //c.Managers = m - go c.Register() + c.Port = port + c.ManagerInterface = man + go man.Start() + go man.Register() return c } -func (c *Coordinator) Start() { - // on start we need to create channel listener - // on each new connection we want to check its id against our mapping - c.SubCoordinator.Start() -} - -func (c *Coordinator) ClientHandler(cl *Client) int { +func (c *SubCoordinator) ClientHandler(cl *Client) int { // (creates and) notifies manager of client connection go c.UpdateManager(cl) return c.Port } -func (c *Coordinator) UpdateManager(cl *Client) { +func (c *SubCoordinator) UpdateManager(cl *Client) { // shouldn't happen all that often so should be fine to lock m, exists := c.GetManager(cl.Id) if !exists { @@ -93,21 +138,27 @@ func (m *Managers) GetManager(id uint32) (GeneralManager, bool) { m.RLock() defer m.RUnlock() man, exists := m.Directory[id] - return man, exists + if !exists { + return nil, exists + } + return man.(GeneralManager), exists } -func NewSubCoordinator(clientType string, m *Managers, err chan error) (SubCoordinator, error) { +func NewCoordinatorType(clientType string, err chan error) (ManagerInterface, int, error) { + m := make(map[uint32]interface{}) if clientType == "reactor" { c := &reactorCoordinator{} - c.Managers = m - return c, nil + //m := make(map[uint32]*ReactorManager) + c.Managers = &Managers{Directory:m} + return c, 2023, nil } else if clientType == "tui" { c := &tuiCoordinator{} - c.Managers = m - return c, nil + //m := make(map[uint32]*TUIManager) + c.Managers = &Managers{Directory:m} + return c, 2024, nil } - return &reactorCoordinator{}, errors.New("Unrecognized client type") + return &reactorCoordinator{}, 0, errors.New("Unrecognized client type") } // creating sub coordinators for associated gRPC handlers @@ -142,12 +193,16 @@ func (r *reactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.R if !exists { return &pb.ReactorStatusResponse{}, errors.New("Manager doesn't exists for that client") } - return m.ReactorStatusHandler(ctx, req) + rm, ok := m.(*ReactorManager) + if !ok { + return &pb.ReactorStatusResponse{}, errors.New("Manager is not a reactor manager!") + } + return rm.ReactorStatusHandler(ctx, req) } //tui coordinator type tuiCoordinator struct { - *Managers + *Managers // by embedding general struct we allow coordinator to still call general funcs pb.UnimplementedManagementServer } @@ -178,7 +233,11 @@ func (t *tuiCoordinator) GetDevices(ctx context.Context, req *pb.GetDevicesReque // doesnt exist for some reason return &pb.GetDevicesResponse{}, errors.New("Manager doesn't exists for client") } - return m.GetDevices(ctx,req) + tm, ok := m.(*TUIManager) + if !ok { + return &pb.GetDevicesResponse{}, errors.New("Manager is not of type TUI") + } + return tm.GetDevices(ctx,req) } // unimplemented bs for grpc diff --git a/internal/pkg/server/listener.go b/internal/pkg/server/listener.go index ee58d73..f575404 100644 --- a/internal/pkg/server/listener.go +++ b/internal/pkg/server/listener.go @@ -12,20 +12,21 @@ import ( ) /* -Originally this package served as a client listener to route requests -I am going to repurpose this to serve as a listener for all gRPC requests -should simplify interfaces +Listens on a supplied port and sends incoming clients over a supplied channel */ type Listener struct { // exporting for easy use in the short term Port int - Coordinators map[string]*Coordinator - CLis *grpc.Server - Sys *SystemViewer + ClientConnections chan *ClientPacket Err chan error pb.UnimplementedHandshakeServer } +type ClientPacket struct { + *Client + Port chan int +} + type Client struct { // general client struct to store reqs from reactors/tui Ip string @@ -35,11 +36,8 @@ type Client struct { Type string } -func NewListener(ch chan error, port int) *Listener { - c := make(map[string]*Coordinator) - l := &Listener{Err:ch} - l.Coordinators = c - l.Sys = NewSystemViewer() +func NewListener(port int, cch chan *ClientPacket, ech chan error) *Listener { + l := &Listener{Err:ech,ClientConnections:cch} l.Port = port return l } @@ -49,10 +47,7 @@ func (l *Listener) Start() { if err := l.Register(); err != nil { l.Err <- err } - go l.Sys.Start() - // listener started and grpc handler registered logging.Debug(logging.DStart,"Started client listener on port %v\n",l.Port) - //fmt.Printf("==========================\n PORT: %v\n==========================\n",l.Port) } func (l *Listener) Register() error { @@ -65,23 +60,19 @@ func (l *Listener) Register() error { pb.RegisterHandshakeServer(grpcServer, l) go grpcServer.Serve(lis) logging.Debug(logging.DStart, "LIS Registered on port %v", l.Port) - - //lis, err = net.Listen("tcp", fmt.Sprintf(":%v",l.Port+1)) // either binding to supplied port or binding to docker default - //if err != nil { - //return err - //} - //grpcServer = grpc.NewServer() - //l.CLis = grpcServer - //go grpcServer.Serve(lis) - //logging.Debug(logging.DStart, "LIS Coordinator server registered on port %v", l.Port + 1) - return nil } func (l *Listener) ClientDiscoveryHandler(ctx context.Context, ping *pb.ClientRequest) (*pb.ClientResponse, error) { // incoming reactor ping need to spawn coord c := &Client{Id:ping.GetClientId(),Type:ping.GetClientType()} - logging.Debug(logging.DClient, "%v %v has connected\n",c.Type,c.Id) + logging.Debug(logging.DClient, "LIS %v %v has connected\n",c.Type,c.Id) + ch := make(chan int) + p := &ClientPacket{Port:ch} + p.Client = c + l.ClientConnections <-p + port := <-ch + /* coord, ok := l.Coordinators[c.Type] if !ok { logging.Debug(logging.DSpawn,"CCO 01 Created Coordinator") @@ -90,6 +81,7 @@ func (l *Listener) ClientDiscoveryHandler(ctx context.Context, ping *pb.ClientRe go coord.Start() } port := coord.ClientHandler(c) + */ // return the port for the incoming requests return &pb.ClientResponse{ClientId:c.Id,ServerPort:uint32(port)}, nil } diff --git a/internal/pkg/server/manager.go b/internal/pkg/server/manager.go index 41e092c..a9f2627 100644 --- a/internal/pkg/server/manager.go +++ b/internal/pkg/server/manager.go @@ -6,9 +6,8 @@ import ( "math" "sync" "errors" - "context" + _ "context" "FRMS/internal/pkg/logging" - pb "FRMS/internal/pkg/grpc" // unimplemented base methods ) // this package will implement a boilerplate manager @@ -104,6 +103,8 @@ func (m *Manager) Timeout() int { return 0 } } +/* +shouldnt be nessecary anymore func (m *Manager) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) { return &pb.GetDevicesResponse{}, errors.New("Get Devices not implemented!") @@ -112,3 +113,4 @@ func (m *Manager) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) (*p func (m *Manager) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { return &pb.ReactorStatusResponse{}, errors.New("Reactor Status Handler not implemented!") } +*/ diff --git a/internal/pkg/server/system.go b/internal/pkg/server/system.go index a569bf2..8a0afc8 100644 --- a/internal/pkg/server/system.go +++ b/internal/pkg/server/system.go @@ -44,6 +44,7 @@ func NewStatusMonitor(t string, id uint32, sys *SystemViewer) *StatusMonitor { tid := make(chan uint32) sm := &StatusMonitor{TransactionId:tid} sm.SystemViewer = sys + logging.Debug(logging.DClient,"SYS Creating new status monitor") if t == "Reactor" { // reactor status monitor sm.ReactorChan = sys.AddReactorSender() diff --git a/notes b/notes index 82da71d..9980903 100644 --- a/notes +++ b/notes @@ -876,3 +876,24 @@ Refactoring server code now is there ever a situation where I would need to run this not on docker? - can i just hardcode for docker and then rely on nginx for routing etc? + + +ALRIGHT TIME TO LOCK TF IN +#TODO 8/1 +Time to set up proper config loading and storing + +Doing it all through interfaces and tagged structs + +On start up +Server needs to load up its own config + - take action on that config +wait for client connections + - load client config and reply with associated data +on client disconnect + - store any updates and return to idle state +restructing away from "listener" and coordiantor and stuff +going to just have central server +with an embedded listener +and database and shit +so +SERVER will call NewServer which will take care of subsequents