diff --git a/internal/pkg/grpc/monitoring.proto b/internal/pkg/grpc/monitoring.proto index b2d9e27..4f05051 100644 --- a/internal/pkg/grpc/monitoring.proto +++ b/internal/pkg/grpc/monitoring.proto @@ -8,17 +8,46 @@ service monitoring { } message ReactorStatusResponse { - uint32 id = 1; + int32 id = 1; + repeated DeviceResponse = 2; + repeated SensorResponse = 3; } +message DeviceResponse { + int32 id = 1; + string name = 2; + map updates = 3; +} + +message SensorResponse { + int32 id = 1; + string name = 2; + int32 sampleRate = 3; +} + message ReactorStatusPing { - uint32 id = 1; - repeated Device devices = 2; + int32 id = 1; + repeated Sensor sensors = 2; + repeated Device devices = 3; +} + +enum Status { + DEAD = 0; + ALIVE = 1; + UNKOWN = 2; +} + +message Sensor { + int32 addr = 1; + string name = 2; + Status status = 3; + int32 sampleRate = 4; + string data = 5; } message Device { int32 addr = 1; - string type = 2; - string status = 3; + string name = 2; + Status status = 3; string data = 4; } diff --git a/internal/pkg/server/coordinator.go b/internal/pkg/server/coordinator.go index e00d70a..9a622fc 100644 --- a/internal/pkg/server/coordinator.go +++ b/internal/pkg/server/coordinator.go @@ -8,58 +8,54 @@ import ( "context" "errors" "fmt" + "net" "sync" "github.com/spf13/viper" + "google.golang.org/grpc" ) -// this package creates the central coordiantor and sub coordiantors for clients +// this package creates the central coordiantor and sub coordiantors to route clients // db client interface -type DB interface { +type Database interface { // getters (all create if doesnt exist) GetReactorClient(int) (string, string, string, string, error) // returns (url, org, bucket, token, err) } -func NewDBAdmin(config *viper.Viper) (DB, error) { +func NewDatabaseAdmin(config *viper.Viper) (Database, error) { return influxdb.NewDBAdmin(config) } type CentralCoordinator struct { // main coordinator ClientConnections *ClientPacket - *SubCoordinators - //*SystemViewer - DB + *ReactorCoordinator + Database Config *viper.Viper // from config Ports map[string]int `mapstructure:"ports"` Err chan error } -type SubCoordinators struct { - Directory map[string]*SubCoordinator - sync.Mutex -} - func NewCentralCoordinator(config *viper.Viper, ch chan error) *CentralCoordinator { // create a central coordinator to manage requests - db, err := NewDBAdmin(config) + db, err := NewDatabaseAdmin(config) if err != nil { ch <- err } - c := &CentralCoordinator{Err: ch, Config: config, DB: db} + + rc, err := NewReactorCoordinator(ch) + if err != nil { + ch <- err + } + config.UnmarshalKey("server.ports", rc) // get reactor port + c := &CentralCoordinator{Err: ch, Config: config, Database: db, ReactorCoordinator: rc} // grab config settings if err = config.UnmarshalKey("server", c); err != nil { ch <- err } - // spawn a systemviewer DECOMIS - //c.SystemViewer = NewSystemViewer() - //.go c.SystemViewer.Start() - // subcoord map - s := make(map[string]*SubCoordinator) - c.SubCoordinators = &SubCoordinators{Directory: s} - // return init coordinator + return c } @@ -67,250 +63,126 @@ func (c *CentralCoordinator) Start() { // starts up associated funcs clientChan := make(chan *ClientPacket) l := NewListener(clientChan, c.Err) - // grabs lis port c.Config.UnmarshalKey("server.ports", l) - // starts client listener routines - go l.Start() + + // starting reactor coordinator + if err := c.ReactorCoordinator.Start(); err != nil { + c.Err <- err + } + // starting listener + if err := l.Start(); err != nil { + c.Err <- err + } + // lastly start client listener go c.ClientListener(clientChan) } func (c *CentralCoordinator) ClientListener(ch chan *ClientPacket) { for client := range ch { // basically loops until channel is closed - fmt.Printf("Incoming client: +%v\n", client) client.Response <- c.ClientHandler(client.Client) // respond with cred } } func (c *CentralCoordinator) ClientHandler(cl *Client) *ClientResponse { - // look for sub coord - c.SubCoordinators.Lock() - - subcoord, ok := c.SubCoordinators.Directory[cl.Type] - if !ok { - // Sub Coordinator does not exists, creating - fmt.Printf("Cl type: %s, Port: %d\n", cl.Type, c.Ports[cl.Type]) - subcoord = NewSubCoordinator(cl.Type, c.Ports[cl.Type], c.Err) - c.SubCoordinators.Directory[cl.Type] = subcoord - fmt.Printf("Creating subcord for %s on %d\n", cl.Type, c.Ports[cl.Type]) - logging.Debug(logging.DSpawn, "CC0 01 Created %v Coordinator", cl.Type) + // returns reactor db info + var err error + cr := &ClientResponse{Port: c.Ports[cl.Type]} + + if cl.Type == "reactor" { + // get reactor info + go c.ReactorCoordinator.ClientHandler(cl) + // db info + cr.URL, cr.Org, cr.Token, cr.Bucket, err = c.Database.GetReactorClient(cl.Id) + } else { + // throw error + err = errors.New(fmt.Sprintf("Client type %s not recognized!")) } - // unlocking - c.SubCoordinators.Unlock() - - // starts sub coord with client credentials - fmt.Printf("Starting subcoord client handler\n") - go subcoord.ClientHandler(cl) - - fmt.Printf("Getting db info\n") - // setting up client response - url, org, token, bucket, err := c.DB.GetReactorClient(cl.Id) - fmt.Printf("Got URL: %s, Org: %s, Token: %s, Bucket: %b\n", url, org, token, bucket) + // returns based on cl type if err != nil { c.Err <- err } - - // returning info - return &ClientResponse{URL: url, Org: org, Token: token, Bucket: bucket, Port: c.Ports[cl.Type]} + return cr } -type ManagerInterface interface { - Start() - NewManager(*Client, chan error) GeneralManager - GetManager(int) (GeneralManager, bool) - AddManager(int, GeneralManager) - Register() +type ReactorCoordinator struct { + Port int `mapstructure:"reactor"` + *ReactorManagers + Err chan error + pb.UnimplementedMonitoringServer } -type GeneralManager interface { - // used by sub coordinator to interact with manager - Start() - UpdateClient(*Client) +type ReactorManagers struct { + Directory map[int]*ReactorManager + sync.RWMutex } -type SubCoordinator struct { - Port int // port that we set up gRPC endpoint on - ManagerInterface // embed an interface to create/manager managers - //*SystemViewer - Err chan error +func NewReactorCoordinator(errCh chan error) (*ReactorCoordinator, error) { + rmap := make(map[int]*ReactorManager) + rm := &ReactorManagers{Directory: rmap} + c := &ReactorCoordinator{Err: errCh, ReactorManagers: rm} + return c, nil } -type Managers struct { - Directory map[int]interface{} // support for either manager - sync.RWMutex // potential perf +func (c *ReactorCoordinator) Start() error { + logging.Debug(logging.DStart, "RCO 01 Starting!") + // register grpc service + return c.Register() } -// interface stuff -func NewSubCoordinator(clientType string, port int, errCh chan error) *SubCoordinator { - c := &SubCoordinator{Err: errCh} - //c.SystemViewer = sys - man, err := NewCoordinatorType(clientType, errCh) - if err != nil { - errCh <- err +func (c *ReactorCoordinator) ClientHandler(cl *Client) { + // updates clients if nessecary + if err := c.UpdateManager(cl, c.Err); err != nil { + c.Err <- err } - c.ManagerInterface = man - go man.Start() - go man.Register() - return c } -func (c *SubCoordinator) ClientHandler(cl *Client) { - // (creates and) notifies manager of client connection - c.UpdateManager(cl) -} +func (m *ReactorManagers) GetManager(id int) (*ReactorManager, error) { + m.RLock() + defer m.RUnlock() -func (c *SubCoordinator) UpdateManager(cl *Client) { - // shouldn't happen all that often so should be fine to lock - fmt.Printf("Grabbing Manager\n") - m, exists := c.GetManager(cl.Id) + rm, exists := m.Directory[id] if !exists { - fmt.Printf("Creating Manager\n") - m = c.NewManager(cl, c.Err) - m.UpdateClient(cl) - go c.AddManager(cl.Id, m) - go m.Start() + return &ReactorManager{}, errors.New(fmt.Sprintf("No manager for reactor %d!", id)) } - go m.UpdateClient(cl) -} - -func (m *Managers) AddManager(id int, man GeneralManager) { - m.Lock() - defer m.Unlock() - m.Directory[id] = man + return rm, nil } -func (m *Managers) GetManager(id int) (GeneralManager, bool) { - // just read locks and reuturns +func (m *ReactorManagers) UpdateManager(cl *Client, errCh chan error) error { + // locking m.RLock() defer m.RUnlock() - man, exists := m.Directory[id] + + rm, exists := m.Directory[cl.Id] if !exists { - return nil, exists + logging.Debug(logging.DClient, "RCO 01 starting manager for reactor client %v", cl.Id) + rm = NewReactorManager(errCh) + if err := rm.Start(); err != nil { + return err + } + m.Directory[cl.Id] = rm } - return man.(GeneralManager), exists + return rm.UpdateClient(cl) } -func NewCoordinatorType(clientType string, err chan error) (ManagerInterface, error) { - - m := make(map[int]interface{}) - if clientType == "reactor" { - c := &reactorCoordinator{} - //m := make(map[uint32]*ReactorManager) - c.Managers = &Managers{Directory: m} - return c, nil - } else if clientType == "tui" { - c := &reactorCoordinator{} - //m := make(map[uint32]*TUIManager) - //c.Managers = &Managers{Directory: m} - return c, errors.New(fmt.Sprint("error, TUI Not impl")) +func (r *ReactorCoordinator) Register() error { + lis, err := net.Listen("tcp", fmt.Sprintf(":%v", r.Port)) + if err != nil { + return err } - return &reactorCoordinator{}, errors.New("Unrecognized client type") + grpcServer := grpc.NewServer() + pb.RegisterMonitoringServer(grpcServer, r) + go grpcServer.Serve(lis) + logging.Debug(logging.DClient, "RCO ready for client requests") + return nil } -// creating sub coordinators for associated gRPC handlers -// reactor coordinator -type reactorCoordinator struct { - *Managers - pb.UnimplementedMonitoringServer -} - -func (r *reactorCoordinator) Start() { - logging.Debug(logging.DStart, "RCO 01 Starting!") -} - -func (r *reactorCoordinator) NewManager(cl *Client, err chan error) GeneralManager { - logging.Debug(logging.DClient, "RCO 01 starting manager for %v client %v", cl.Type, cl.Id) - return NewReactorManager(cl, err) -} - -func (r *reactorCoordinator) Register() { - //conf := LoadConfig() - /* - port, err := conf.GetPort("reactor") - if err != nil { - panic(err) - } - lis, err := net.Listen("tcp", fmt.Sprintf(":%v", port)) - if err != nil { - panic(err) - } - grpcServer := grpc.NewServer() - pb.RegisterMonitoringServer(grpcServer, r) - go grpcServer.Serve(lis) - logging.Debug(logging.DClient, "RCO ready for client requests") - */ -} - -func (r *reactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { - m, exists := r.GetManager(int(req.GetId())) - if !exists { - return &pb.ReactorStatusResponse{}, errors.New("Manager doesn't exists for that client") - } - rm, ok := m.(*ReactorManager) - if !ok { - return &pb.ReactorStatusResponse{}, errors.New("Manager is not a reactor manager!") +func (r *ReactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { + rm, err := r.GetManager(int(req.GetId())) + // error checking + if err != nil { + return &pb.ReactorStatusResponse{}, err } return rm.ReactorStatusHandler(ctx, req) } - -// tui coordinator -/* -type tuiCoordinator struct { - *Managers // by embedding general struct we allow coordinator to still call general funcs - pb.UnimplementedManagementServer -} - -func (t *tuiCoordinator) Start() { - logging.Debug(logging.DStart, "TCO 01 Starting!") -} - -func (t *tuiCoordinator) NewManager(cl *Client, err chan error) GeneralManager { - logging.Debug(logging.DClient, "TCO 01 starting manager for %v client %v", cl.Type, cl.Id) - return NewTUIManager(cl, err) -} - -func (t *tuiCoordinator) Register() { - /* - conf := LoadConfig() - port, err := conf.GetPort("tui") - if err != nil { - panic(err) - } - lis, err := net.Listen("tcp", fmt.Sprintf(":%v", port)) - if err != nil { - // rip - } - grpcServer := grpc.NewServer() - pb.RegisterManagementServer(grpcServer, t) - go grpcServer.Serve(lis) - logging.Debug(logging.DClient, "TCO ready for client requests") -*/ -/* -} - -func (t *tuiCoordinator) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) { - // grpc handler to fwd to manager - m, exists := t.GetManager(int(req.GetClientId())) - if !exists { - // doesnt exist for some reason - return &pb.GetDevicesResponse{}, errors.New("Manager doesn't exists for client") - } - tm, ok := m.(*TUIManager) - if !ok { - return &pb.GetDevicesResponse{}, errors.New("Manager is not of type TUI") - } - return tm.GetDevices(ctx, req) -} - -// unimplemented bs for grpc -func (t *tuiCoordinator) DeleteReactor(ctx context.Context, req *pb.DeleteReactorRequest) (*pb.DeleteReactorResponse, error) { - // TODO - return &pb.DeleteReactorResponse{}, nil -} - -func (t *tuiCoordinator) DeleteReactorDevice(ctx context.Context, req *pb.DeleteReactorDeviceRequest) (*pb.DeleteReactorDeviceResponse, error) { - // TODO - return &pb.DeleteReactorDeviceResponse{}, nil -} -*/ diff --git a/internal/pkg/server/listener.go b/internal/pkg/server/listener.go index 4223b54..3e1fde9 100644 --- a/internal/pkg/server/listener.go +++ b/internal/pkg/server/listener.go @@ -49,12 +49,10 @@ func NewListener(cch chan *ClientPacket, ech chan error) *Listener { return l } -func (l *Listener) Start() { +func (l *Listener) Start() error { // start grpc server and implement reciever - if err := l.Register(); err != nil { - l.Err <- err - } logging.Debug(logging.DStart, "LIS 01 Started client listener") + return l.Register() } func (l *Listener) Register() error { diff --git a/internal/pkg/server/manager.go b/internal/pkg/server/manager.go index 793a103..a4d3930 100644 --- a/internal/pkg/server/manager.go +++ b/internal/pkg/server/manager.go @@ -33,11 +33,12 @@ func NewManager(err chan error) *Manager { return m } -func (m *Manager) Start() { +func (m *Manager) Start() error { if !m.Activate() { // manager already running - m.Err <- errors.New("Manager already running!") + return errors.New("Manager already running!") } // if we get here, manager is atomically activated and we can ensure start wont run again + return nil } func (m *Manager) Exit() { @@ -47,9 +48,10 @@ func (m *Manager) Exit() { } } -func (m *Manager) UpdateClient(cl *Client) { +func (m *Manager) UpdateClient(cl *Client) error { logging.Debug(logging.DClient, "MAN Updating client %v", cl.Id) m.Client = cl + return nil } // reactor manager atomic operations diff --git a/internal/pkg/server/reactormanager.go b/internal/pkg/server/reactormanager.go index 9d026cd..d3165e0 100644 --- a/internal/pkg/server/reactormanager.go +++ b/internal/pkg/server/reactormanager.go @@ -4,14 +4,8 @@ import ( pb "FRMS/internal/pkg/grpc" "FRMS/internal/pkg/logging" "context" - "fmt" _ "log" "sync" - "time" - - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/status" ) // this package will implement a reactor manager and associated go routines @@ -28,7 +22,7 @@ type devstatus struct { Devs map[int]*DeviceInfo } -func NewReactorManager(c *Client, err chan error) GeneralManager { +func NewReactorManager(err chan error) *ReactorManager { r := &ReactorManager{} di := make(map[int]*DeviceInfo) r.devstatus = &devstatus{Devs: di} @@ -37,14 +31,9 @@ func NewReactorManager(c *Client, err chan error) GeneralManager { return r } -func (r *ReactorManager) Start() { - r.Manager.Start() - logging.Debug(logging.DStart, "RMA %v starting", r.Id) +func (r *ReactorManager) Start() error { + return r.Manager.Start() //go r.StatusMon.Send(&DeviceInfo{Id: r.Id, Type: "Reactor", Status: "[green]ONLINE[white]"}, "Reactor") - //conn := r.Connect() - //empty := &grpc.ClientConn{} - //if conn != empty { - //} } func (r *ReactorManager) Exit() { @@ -62,42 +51,6 @@ func (r *ReactorManager) Exit() { } } -func (r *ReactorManager) Connect() *grpc.ClientConn { - // establish gRPC conection with reactor - // this seems pretty stupid, seems like reactor should communicate up the chain to avoid unnessecary comms. - var opts []grpc.DialOption - var conn *grpc.ClientConn - opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) - - for { - if !r.IsActive() { - logging.Debug(logging.DClient, "RMA %v No longer active, aborting connection attempt", r.Id) - return &grpc.ClientConn{} - } - var err error - conn, err = grpc.Dial(fmt.Sprintf("%v:%v", r.Ip, r.Port), opts...) - // error handling - code := status.Code(err) - if code != 0 { // != OK - if code == (5 | 14) { // unavailable or not found - to := r.Timeout() - if to == 0 { - logging.Debug(logging.DClient, "RMA %v Client not responding", r.Id) - return &grpc.ClientConn{} - } - logging.Debug(logging.DClient, "RMA %v Client currently down, retrying in %v ms", r.Id, to) - time.Sleep(time.Duration(to) * time.Millisecond) - - } else { - logging.Debug(logging.DError, "RMA %v GRPC ERROR: %v", r.Id, code) - r.Err <- err - } - } - break - } - return conn -} - func (r *ReactorManager) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { // function client will call to update reactor information //go r.PingReset() @@ -108,36 +61,6 @@ func (r *ReactorManager) ReactorStatusHandler(ctx context.Context, req *pb.React return &pb.ReactorStatusResponse{Id: uint32(r.Id)}, nil } -/* - func (r *ReactorManager) Monitor(conn *grpc.ClientConn) { - defer conn.Close() - client := pb.NewMonitoringClient(conn) - for r.IsActive() { - req := &pb.ReactorStatusRequest{Id:r.Id} - resp, err := client.GetReactorStatus(context.Background(),req) - code := status.Code(err) - if code != 0 { // if != OK - logging.Debug(logging.DClient,"RMA %v Reactor not responding! Code: %v\n", r.Id,code) - r.devstatus.Lock() - for _, d := range r.Devs { - newd := d - newd.Status = "[yellow]UNKOWN[white]" - r.Devs[newd.Id] = newd - go r.StatusMon.Send(newd,"Device") - } - r.devstatus.Unlock() - r.Exit() - break; - } - for _,v := range resp.GetDevices() { - d := &DeviceInfo{Id:uint32(v.GetAddr()),Type:v.GetType(),Status:v.GetStatus(),Data:v.GetData()} - go r.UpdateDevice(d) - } - time.Sleep(r.Hb) // time between sensor pings - } - } -*/ - func (r *ReactorManager) UpdateDevice(d *DeviceInfo) { r.devstatus.Lock() defer r.devstatus.Unlock() diff --git a/notes.md b/notes.md index 156a594..69aa898 100644 --- a/notes.md +++ b/notes.md @@ -38,3 +38,23 @@ - I concede, I will just remove flags as most people will never use them anyway and instead rely on env vars and config files. To hell with the flags. - I am ripping out all of the TUI and status manager stuff, its convoluted and harder than just pulling info from database. - I can eventaully rework TUI to pull from DB which is fine, there will never be that many clients anyway and a lot of them are only 1 time calls with refreshes which aren't that slow anyway. +- alright I gutted the tui and system viewer, reworking sub coord to launch at start. That way there is a listener active +- time to boil down to functionality a LOT, right now its clumsy and inefficent, there needs to be a better way to keep everything straight +- Moving the DB responsibilites to the reactor itself seems to be the best way to do it in the short term. Reduce network load and overall keep things efficient. May lead to duplicte copies of data? Not the end of the world, logging system can make sure we are maintaining entries. + +**IDEA** +Reactors log data themselves, Send periodic status updates over grpc to enable monitoring faster than the sample rate +*This could work!* +Outline: +- Reactors reach out to server on boot to get DB info + - compare this against what they have internally to ensure they are up to date and allow for migrations + - Maybe not even save the db info because we don't need to?? +- Reactors also recieve port for their specific manager + - Can be dynamically given out to allow for spread out load +- Reactors then reach out with sensor and device info periodically (5s?) which can be used for live monitoring +- RM responds with any potential updates for the device settings i.e. change pwm duty on web interface, pass on to reactor +- Allows for a live view with current reading as well as historical data at differing interval via grafana. (i.e. 5s live view with 10 min sample interval) + +Need to differentiate sensors vs devices that can be changed +- Sensors have a variable sample rate and eventually name/address +- Devices have more and widley varying parameters, could be pwm with freq/duty/onoff or ph pump with on, time or off etc.