gutted tui and made it solely focused on reactor management

main
KeeganForelight 2 years ago
parent 62f37b5f80
commit d9493a0820

@@ -8,17 +8,46 @@ service monitoring {
}
message ReactorStatusResponse {
uint32 id = 1;
int32 id = 1;
repeated DeviceResponse devices = 2;
repeated SensorResponse sensors = 3;
}
message DeviceResponse {
int32 id = 1;
string name = 2;
map<string,string> updates = 3;
}
message SensorResponse {
int32 id = 1;
string name = 2;
int32 sampleRate = 3;
}
message ReactorStatusPing {
uint32 id = 1;
repeated Device devices = 2;
int32 id = 1;
repeated Sensor sensors = 2;
repeated Device devices = 3;
}
enum Status {
DEAD = 0;
ALIVE = 1;
UNKNOWN = 2;
}
message Sensor {
int32 addr = 1;
string name = 2;
Status status = 3;
int32 sampleRate = 4;
string data = 5;
}
message Device {
int32 addr = 1;
string type = 2;
string status = 3;
string name = 2;
Status status = 3;
string data = 4;
}

@@ -8,58 +8,54 @@ import (
"context"
"errors"
"fmt"
"net"
"sync"
"github.com/spf13/viper"
"google.golang.org/grpc"
)
// this package creates the central coordinator and sub coordinators for clients
// this package creates the central coordinator and sub coordinators to route clients
// db client interface
type DB interface {
type Database interface {
// getters (all create if they don't exist)
GetReactorClient(int) (string, string, string, string, error) // returns (url, org, bucket, token, err)
}
func NewDBAdmin(config *viper.Viper) (DB, error) {
func NewDatabaseAdmin(config *viper.Viper) (Database, error) {
return influxdb.NewDBAdmin(config)
}
type CentralCoordinator struct {
// main coordinator
ClientConnections *ClientPacket
*SubCoordinators
//*SystemViewer
DB
*ReactorCoordinator
Database
Config *viper.Viper
// from config
Ports map[string]int `mapstructure:"ports"`
Err chan error
}
type SubCoordinators struct {
Directory map[string]*SubCoordinator
sync.Mutex
}
func NewCentralCoordinator(config *viper.Viper, ch chan error) *CentralCoordinator {
// create a central coordinator to manage requests
db, err := NewDBAdmin(config)
db, err := NewDatabaseAdmin(config)
if err != nil {
ch <- err
}
c := &CentralCoordinator{Err: ch, Config: config, DB: db}
rc, err := NewReactorCoordinator(ch)
if err != nil {
ch <- err
}
config.UnmarshalKey("server.ports", rc) // get reactor port
c := &CentralCoordinator{Err: ch, Config: config, Database: db, ReactorCoordinator: rc}
// grab config settings
if err = config.UnmarshalKey("server", c); err != nil {
ch <- err
}
// spawn a systemviewer DECOMIS
//c.SystemViewer = NewSystemViewer()
//.go c.SystemViewer.Start()
// subcoord map
s := make(map[string]*SubCoordinator)
c.SubCoordinators = &SubCoordinators{Directory: s}
// return init coordinator
return c
}
@@ -67,250 +63,126 @@ func (c *CentralCoordinator) Start() {
// starts up associated funcs
clientChan := make(chan *ClientPacket)
l := NewListener(clientChan, c.Err)
// grabs lis port
c.Config.UnmarshalKey("server.ports", l)
// starts client listener routines
go l.Start()
// starting reactor coordinator
if err := c.ReactorCoordinator.Start(); err != nil {
c.Err <- err
}
// starting listener
if err := l.Start(); err != nil {
c.Err <- err
}
// lastly start client listener
go c.ClientListener(clientChan)
}
func (c *CentralCoordinator) ClientListener(ch chan *ClientPacket) {
for client := range ch {
// basically loops until channel is closed
fmt.Printf("Incoming client: +%v\n", client)
client.Response <- c.ClientHandler(client.Client) // respond with cred
}
}
func (c *CentralCoordinator) ClientHandler(cl *Client) *ClientResponse {
// look for sub coord
c.SubCoordinators.Lock()
subcoord, ok := c.SubCoordinators.Directory[cl.Type]
if !ok {
// Sub Coordinator does not exists, creating
fmt.Printf("Cl type: %s, Port: %d\n", cl.Type, c.Ports[cl.Type])
subcoord = NewSubCoordinator(cl.Type, c.Ports[cl.Type], c.Err)
c.SubCoordinators.Directory[cl.Type] = subcoord
fmt.Printf("Creating subcord for %s on %d\n", cl.Type, c.Ports[cl.Type])
logging.Debug(logging.DSpawn, "CC0 01 Created %v Coordinator", cl.Type)
// returns reactor db info
var err error
cr := &ClientResponse{Port: c.Ports[cl.Type]}
if cl.Type == "reactor" {
// get reactor info
go c.ReactorCoordinator.ClientHandler(cl)
// db info
cr.URL, cr.Org, cr.Token, cr.Bucket, err = c.Database.GetReactorClient(cl.Id)
} else {
// throw error
err = errors.New(fmt.Sprintf("Client type %s not recognized!", cl.Type))
}
// unlocking
c.SubCoordinators.Unlock()
// starts sub coord with client credentials
fmt.Printf("Starting subcoord client handler\n")
go subcoord.ClientHandler(cl)
fmt.Printf("Getting db info\n")
// setting up client response
url, org, token, bucket, err := c.DB.GetReactorClient(cl.Id)
fmt.Printf("Got URL: %s, Org: %s, Token: %s, Bucket: %b\n", url, org, token, bucket)
// returns based on cl type
if err != nil {
c.Err <- err
}
// returning info
return &ClientResponse{URL: url, Org: org, Token: token, Bucket: bucket, Port: c.Ports[cl.Type]}
return cr
}
type ManagerInterface interface {
Start()
NewManager(*Client, chan error) GeneralManager
GetManager(int) (GeneralManager, bool)
AddManager(int, GeneralManager)
Register()
type ReactorCoordinator struct {
Port int `mapstructure:"reactor"`
*ReactorManagers
Err chan error
pb.UnimplementedMonitoringServer
}
type GeneralManager interface {
// used by sub coordinator to interact with manager
Start()
UpdateClient(*Client)
type ReactorManagers struct {
Directory map[int]*ReactorManager
sync.RWMutex
}
type SubCoordinator struct {
Port int // port that we set up gRPC endpoint on
ManagerInterface // embed an interface to create/manager managers
//*SystemViewer
Err chan error
func NewReactorCoordinator(errCh chan error) (*ReactorCoordinator, error) {
rmap := make(map[int]*ReactorManager)
rm := &ReactorManagers{Directory: rmap}
c := &ReactorCoordinator{Err: errCh, ReactorManagers: rm}
return c, nil
}
type Managers struct {
Directory map[int]interface{} // support for either manager
sync.RWMutex // potential perf
func (c *ReactorCoordinator) Start() error {
logging.Debug(logging.DStart, "RCO 01 Starting!")
// register grpc service
return c.Register()
}
// interface stuff
func NewSubCoordinator(clientType string, port int, errCh chan error) *SubCoordinator {
c := &SubCoordinator{Err: errCh}
//c.SystemViewer = sys
man, err := NewCoordinatorType(clientType, errCh)
if err != nil {
errCh <- err
func (c *ReactorCoordinator) ClientHandler(cl *Client) {
// updates clients if necessary
if err := c.UpdateManager(cl, c.Err); err != nil {
c.Err <- err
}
c.ManagerInterface = man
go man.Start()
go man.Register()
return c
}
func (c *SubCoordinator) ClientHandler(cl *Client) {
// (creates and) notifies manager of client connection
c.UpdateManager(cl)
}
func (m *ReactorManagers) GetManager(id int) (*ReactorManager, error) {
m.RLock()
defer m.RUnlock()
func (c *SubCoordinator) UpdateManager(cl *Client) {
// shouldn't happen all that often so should be fine to lock
fmt.Printf("Grabbing Manager\n")
m, exists := c.GetManager(cl.Id)
rm, exists := m.Directory[id]
if !exists {
fmt.Printf("Creating Manager\n")
m = c.NewManager(cl, c.Err)
m.UpdateClient(cl)
go c.AddManager(cl.Id, m)
go m.Start()
return &ReactorManager{}, errors.New(fmt.Sprintf("No manager for reactor %d!", id))
}
go m.UpdateClient(cl)
}
func (m *Managers) AddManager(id int, man GeneralManager) {
m.Lock()
defer m.Unlock()
m.Directory[id] = man
return rm, nil
}
func (m *Managers) GetManager(id int) (GeneralManager, bool) {
// just read locks and reuturns
func (m *ReactorManagers) UpdateManager(cl *Client, errCh chan error) error {
// locking
m.RLock()
defer m.RUnlock()
man, exists := m.Directory[id]
rm, exists := m.Directory[cl.Id]
if !exists {
return nil, exists
logging.Debug(logging.DClient, "RCO 01 starting manager for reactor client %v", cl.Id)
rm = NewReactorManager(errCh)
if err := rm.Start(); err != nil {
return err
}
m.Directory[cl.Id] = rm
}
return man.(GeneralManager), exists
return rm.UpdateClient(cl)
}
func NewCoordinatorType(clientType string, err chan error) (ManagerInterface, error) {
m := make(map[int]interface{})
if clientType == "reactor" {
c := &reactorCoordinator{}
//m := make(map[uint32]*ReactorManager)
c.Managers = &Managers{Directory: m}
return c, nil
} else if clientType == "tui" {
c := &reactorCoordinator{}
//m := make(map[uint32]*TUIManager)
//c.Managers = &Managers{Directory: m}
return c, errors.New(fmt.Sprint("error, TUI Not impl"))
func (r *ReactorCoordinator) Register() error {
lis, err := net.Listen("tcp", fmt.Sprintf(":%v", r.Port))
if err != nil {
return err
}
return &reactorCoordinator{}, errors.New("Unrecognized client type")
grpcServer := grpc.NewServer()
pb.RegisterMonitoringServer(grpcServer, r)
go grpcServer.Serve(lis)
logging.Debug(logging.DClient, "RCO ready for client requests")
return nil
}
// creating sub coordinators for associated gRPC handlers
// reactor coordinator
type reactorCoordinator struct {
*Managers
pb.UnimplementedMonitoringServer
}
func (r *reactorCoordinator) Start() {
logging.Debug(logging.DStart, "RCO 01 Starting!")
}
func (r *reactorCoordinator) NewManager(cl *Client, err chan error) GeneralManager {
logging.Debug(logging.DClient, "RCO 01 starting manager for %v client %v", cl.Type, cl.Id)
return NewReactorManager(cl, err)
}
func (r *reactorCoordinator) Register() {
//conf := LoadConfig()
/*
port, err := conf.GetPort("reactor")
if err != nil {
panic(err)
}
lis, err := net.Listen("tcp", fmt.Sprintf(":%v", port))
if err != nil {
panic(err)
}
grpcServer := grpc.NewServer()
pb.RegisterMonitoringServer(grpcServer, r)
go grpcServer.Serve(lis)
logging.Debug(logging.DClient, "RCO ready for client requests")
*/
}
func (r *reactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) {
m, exists := r.GetManager(int(req.GetId()))
if !exists {
return &pb.ReactorStatusResponse{}, errors.New("Manager doesn't exists for that client")
}
rm, ok := m.(*ReactorManager)
if !ok {
return &pb.ReactorStatusResponse{}, errors.New("Manager is not a reactor manager!")
func (r *ReactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) {
rm, err := r.GetManager(int(req.GetId()))
// error checking
if err != nil {
return &pb.ReactorStatusResponse{}, err
}
return rm.ReactorStatusHandler(ctx, req)
}
// tui coordinator
/*
type tuiCoordinator struct {
*Managers // by embedding general struct we allow coordinator to still call general funcs
pb.UnimplementedManagementServer
}
func (t *tuiCoordinator) Start() {
logging.Debug(logging.DStart, "TCO 01 Starting!")
}
func (t *tuiCoordinator) NewManager(cl *Client, err chan error) GeneralManager {
logging.Debug(logging.DClient, "TCO 01 starting manager for %v client %v", cl.Type, cl.Id)
return NewTUIManager(cl, err)
}
func (t *tuiCoordinator) Register() {
/*
conf := LoadConfig()
port, err := conf.GetPort("tui")
if err != nil {
panic(err)
}
lis, err := net.Listen("tcp", fmt.Sprintf(":%v", port))
if err != nil {
// rip
}
grpcServer := grpc.NewServer()
pb.RegisterManagementServer(grpcServer, t)
go grpcServer.Serve(lis)
logging.Debug(logging.DClient, "TCO ready for client requests")
*/
/*
}
func (t *tuiCoordinator) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) {
// grpc handler to fwd to manager
m, exists := t.GetManager(int(req.GetClientId()))
if !exists {
// doesnt exist for some reason
return &pb.GetDevicesResponse{}, errors.New("Manager doesn't exists for client")
}
tm, ok := m.(*TUIManager)
if !ok {
return &pb.GetDevicesResponse{}, errors.New("Manager is not of type TUI")
}
return tm.GetDevices(ctx, req)
}
// unimplemented bs for grpc
func (t *tuiCoordinator) DeleteReactor(ctx context.Context, req *pb.DeleteReactorRequest) (*pb.DeleteReactorResponse, error) {
// TODO
return &pb.DeleteReactorResponse{}, nil
}
func (t *tuiCoordinator) DeleteReactorDevice(ctx context.Context, req *pb.DeleteReactorDeviceRequest) (*pb.DeleteReactorDeviceResponse, error) {
// TODO
return &pb.DeleteReactorDeviceResponse{}, nil
}
*/

@@ -49,12 +49,10 @@ func NewListener(cch chan *ClientPacket, ech chan error) *Listener {
return l
}
func (l *Listener) Start() {
func (l *Listener) Start() error {
// start grpc server and implement receiver
if err := l.Register(); err != nil {
l.Err <- err
}
logging.Debug(logging.DStart, "LIS 01 Started client listener")
return l.Register()
}
func (l *Listener) Register() error {

@@ -33,11 +33,12 @@ func NewManager(err chan error) *Manager {
return m
}
func (m *Manager) Start() {
func (m *Manager) Start() error {
if !m.Activate() {
// manager already running
m.Err <- errors.New("Manager already running!")
return errors.New("Manager already running!")
} // if we get here, manager is atomically activated and we can ensure start wont run again
return nil
}
func (m *Manager) Exit() {
@@ -47,9 +48,10 @@ func (m *Manager) Exit() {
}
}
func (m *Manager) UpdateClient(cl *Client) {
func (m *Manager) UpdateClient(cl *Client) error {
logging.Debug(logging.DClient, "MAN Updating client %v", cl.Id)
m.Client = cl
return nil
}
// reactor manager atomic operations

@@ -4,14 +4,8 @@ import (
pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/logging"
"context"
"fmt"
_ "log"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
)
// this package will implement a reactor manager and associated go routines
@@ -28,7 +22,7 @@ type devstatus struct {
Devs map[int]*DeviceInfo
}
func NewReactorManager(c *Client, err chan error) GeneralManager {
func NewReactorManager(err chan error) *ReactorManager {
r := &ReactorManager{}
di := make(map[int]*DeviceInfo)
r.devstatus = &devstatus{Devs: di}
@@ -37,14 +31,9 @@ func NewReactorManager(c *Client, err chan error) GeneralManager {
return r
}
func (r *ReactorManager) Start() {
r.Manager.Start()
logging.Debug(logging.DStart, "RMA %v starting", r.Id)
func (r *ReactorManager) Start() error {
return r.Manager.Start()
//go r.StatusMon.Send(&DeviceInfo{Id: r.Id, Type: "Reactor", Status: "[green]ONLINE[white]"}, "Reactor")
//conn := r.Connect()
//empty := &grpc.ClientConn{}
//if conn != empty {
//}
}
func (r *ReactorManager) Exit() {
@@ -62,42 +51,6 @@ func (r *ReactorManager) Exit() {
}
}
func (r *ReactorManager) Connect() *grpc.ClientConn {
// establish gRPC conection with reactor
// this seems pretty stupid, seems like reactor should communicate up the chain to avoid unnessecary comms.
var opts []grpc.DialOption
var conn *grpc.ClientConn
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
for {
if !r.IsActive() {
logging.Debug(logging.DClient, "RMA %v No longer active, aborting connection attempt", r.Id)
return &grpc.ClientConn{}
}
var err error
conn, err = grpc.Dial(fmt.Sprintf("%v:%v", r.Ip, r.Port), opts...)
// error handling
code := status.Code(err)
if code != 0 { // != OK
if code == (5 | 14) { // unavailable or not found
to := r.Timeout()
if to == 0 {
logging.Debug(logging.DClient, "RMA %v Client not responding", r.Id)
return &grpc.ClientConn{}
}
logging.Debug(logging.DClient, "RMA %v Client currently down, retrying in %v ms", r.Id, to)
time.Sleep(time.Duration(to) * time.Millisecond)
} else {
logging.Debug(logging.DError, "RMA %v GRPC ERROR: %v", r.Id, code)
r.Err <- err
}
}
break
}
return conn
}
func (r *ReactorManager) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) {
// function client will call to update reactor information
//go r.PingReset()
@@ -108,36 +61,6 @@ func (r *ReactorManager) ReactorStatusHandler(ctx context.Context, req *pb.React
return &pb.ReactorStatusResponse{Id: int32(r.Id)}, nil
}
/*
func (r *ReactorManager) Monitor(conn *grpc.ClientConn) {
defer conn.Close()
client := pb.NewMonitoringClient(conn)
for r.IsActive() {
req := &pb.ReactorStatusRequest{Id:r.Id}
resp, err := client.GetReactorStatus(context.Background(),req)
code := status.Code(err)
if code != 0 { // if != OK
logging.Debug(logging.DClient,"RMA %v Reactor not responding! Code: %v\n", r.Id,code)
r.devstatus.Lock()
for _, d := range r.Devs {
newd := d
newd.Status = "[yellow]UNKOWN[white]"
r.Devs[newd.Id] = newd
go r.StatusMon.Send(newd,"Device")
}
r.devstatus.Unlock()
r.Exit()
break;
}
for _,v := range resp.GetDevices() {
d := &DeviceInfo{Id:uint32(v.GetAddr()),Type:v.GetType(),Status:v.GetStatus(),Data:v.GetData()}
go r.UpdateDevice(d)
}
time.Sleep(r.Hb) // time between sensor pings
}
}
*/
func (r *ReactorManager) UpdateDevice(d *DeviceInfo) {
r.devstatus.Lock()
defer r.devstatus.Unlock()

@@ -38,3 +38,23 @@
- I concede, I will just remove flags as most people will never use them anyway and instead rely on env vars and config files. To hell with the flags.
- I am ripping out all of the TUI and status manager stuff; it's convoluted and harder than just pulling info from the database.
- I can eventually rework the TUI to pull from the DB, which is fine; there will never be that many clients anyway, and a lot of them are one-time calls with refreshes that aren't that slow anyway.
- Alright, I gutted the TUI and system viewer and reworked the sub coordinator to launch at start. That way there is a listener active.
- Time to boil the functionality down a LOT; right now it's clumsy and inefficient, and there needs to be a better way to keep everything straight.
- Moving the DB responsibilities to the reactor itself seems to be the best way to do it in the short term. It reduces network load and keeps things efficient overall. May lead to duplicate copies of data? Not the end of the world; the logging system can make sure we are maintaining entries. (See the sketch below.)
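A minimal sketch of what that reactor-side logging could look like, assuming the reactor uses the influxdb-client-go v2 library with the url/org/token/bucket it gets from GetReactorClient on boot; logSample and its signature are illustrative, not code in this commit:

```go
package reactor

import (
	"context"
	"strconv"
	"time"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

// logSample is a hypothetical helper: the reactor writes its own readings
// straight to the bucket it was handed on boot, rather than shipping raw
// samples up through the server.
func logSample(url, org, token, bucket, sensor string, addr int, value float64) error {
	client := influxdb2.NewClient(url, token)
	defer client.Close()

	write := client.WriteAPIBlocking(org, bucket)
	point := influxdb2.NewPoint(
		sensor, // measurement, e.g. "ph"
		map[string]string{"addr": strconv.Itoa(addr)}, // tag the sensor address
		map[string]interface{}{"value": value},        // the reading itself
		time.Now(),
	)
	return write.WritePoint(context.Background(), point)
}
```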
**IDEA**
Reactors log data themselves and send periodic status updates over gRPC to enable monitoring faster than the sample rate.
*This could work!*
Outline:
- Reactors reach out to the server on boot to get DB info
  - Compare this against what they have internally to ensure they are up to date and to allow for migrations
  - Maybe not even save the DB info because we don't need to??
- Reactors also receive the port for their specific manager
  - Can be dynamically given out to allow the load to be spread out
- Reactors then reach out with sensor and device info periodically (5s?), which can be used for live monitoring (see the ping sketch at the end of these notes)
  - The RM responds with any potential updates to the device settings, e.g. a PWM duty changed on the web interface gets passed on to the reactor
  - Allows for a live view with current readings as well as historical data at differing intervals via Grafana (e.g. 5s live view with a 10 min sample interval)
Need to differentiate sensors vs devices that can be changed
- Sensors have a variable sample rate and, eventually, a name/address
- Devices have more, and more widely varying, parameters: could be a PWM with freq/duty/on-off or a pH pump with on/time/off, etc.
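To make the periodic ping concrete, here is a rough reactor-side sketch. It assumes the monitoring service exposes the ReactorStatusHandler RPC that the ReactorCoordinator implements above, and that the generated types follow the proto at the top of this commit; pingLoop and the 5s interval are illustrative:

```go
package reactor

import (
	"context"
	"log"
	"time"

	pb "FRMS/internal/pkg/grpc"
	"google.golang.org/grpc"
)

// pingLoop is a sketch: every interval the reactor sends its current sensor
// and device info and logs whatever setting updates the RM sends back.
func pingLoop(ctx context.Context, conn *grpc.ClientConn, id int32, sensors []*pb.Sensor, devices []*pb.Device) error {
	client := pb.NewMonitoringClient(conn)
	ticker := time.NewTicker(5 * time.Second) // 5s live-view interval from the outline
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			resp, err := client.ReactorStatusHandler(ctx, &pb.ReactorStatusPing{
				Id:      id,
				Sensors: sensors,
				Devices: devices,
			})
			if err != nil {
				return err // let the caller decide whether to retry or bail
			}
			// device setting changes (e.g. a new pwm duty set on the web
			// interface) come back in the response
			for _, dev := range resp.GetDevices() {
				log.Printf("device %d: pending updates %v", dev.GetId(), dev.GetUpdates())
			}
		}
	}
}
```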
