diff --git a/cmd/reactor/main.go b/cmd/reactor/main.go index 8e41609..18170cc 100644 --- a/cmd/reactor/main.go +++ b/cmd/reactor/main.go @@ -10,7 +10,6 @@ import ( "os/signal" - flag "github.com/spf13/pflag" "github.com/spf13/viper" ) @@ -36,19 +35,6 @@ func main() { // load any stored configs conf := NewConfig("reactor") - flag.String("ip", "192.168.100.2", "server ip") - flag.Int("port", 2022, "server port") - flag.String("name", "", "human readable name") - - // bind flags - conf.BindPFlag("reactor_ip", flag.Lookup("ip")) - conf.BindPFlag("reactor_port", flag.Lookup("port")) - conf.BindPFlag("reactor_name", flag.Lookup("name")) - - flag.Parse() - - conf.WriteConfig() - ch := make(chan error) rlc := NewCoordinator(conf, ch) // passing conf and err go rlc.Start() diff --git a/cmd/server/main.go b/cmd/server/main.go index 3cc8386..a067300 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -10,7 +10,6 @@ import ( "FRMS/internal/pkg/server" "os" - flag "github.com/spf13/pflag" "github.com/spf13/viper" ) @@ -35,18 +34,6 @@ func main() { // config file conf := NewConfig("server") - // flags - flag.String("name", "", "human readable name") - flag.Int("lis_port", 2022, "port for listener") - flag.Int("db_port", 2022, "port for database") - - // bind flags - conf.BindPFlag("ports_lis", flag.Lookup("lis_port")) - conf.BindPFlag("ports_db", flag.Lookup("db_port")) - conf.BindPFlag("server_name", flag.Lookup("name")) - - flag.Parse() - errCh := make(chan error) c := NewCoordinator(conf, errCh) diff --git a/internal/pkg/influxdb/client.go b/internal/pkg/influxdb/client.go index 12b615a..c0b56a9 100644 --- a/internal/pkg/influxdb/client.go +++ b/internal/pkg/influxdb/client.go @@ -1,7 +1,6 @@ package influxdb import ( - "errors" _ "fmt" _ "github.com/influxdata/influxdb-client-go/v2" @@ -74,6 +73,9 @@ func (d *DBAdmin) GetReactorClient(id int) (url, bucket, org, token string, err */ url = d.URL org = d.Org - err = errors.New("Unimpl") + token = "" + bucket = "" + //err = errors.New("Unimpl") + err = nil return } diff --git a/internal/pkg/server/coordinator.go b/internal/pkg/server/coordinator.go index 5dadba5..e00d70a 100644 --- a/internal/pkg/server/coordinator.go +++ b/internal/pkg/server/coordinator.go @@ -14,16 +14,11 @@ import ( ) // this package creates the central coordiantor and sub coordiantors for clients -// interfaces // db client interface type DB interface { // getters (all create if doesnt exist) - //GetToken() (string, error) // returns admin token (Creates if it doesnt exist) GetReactorClient(int) (string, string, string, string, error) // returns (url, org, bucket, token, err) - // delete - // DeleteReactorClient(string) error // removes client token but maintains bucket - // PurgeReactorClientData(string) error // perm deletes all assocaited reactor data (token, bucket etc) } func NewDBAdmin(config *viper.Viper) (DB, error) { @@ -31,9 +26,10 @@ func NewDBAdmin(config *viper.Viper) (DB, error) { } type CentralCoordinator struct { + // main coordinator ClientConnections *ClientPacket *SubCoordinators - *SystemViewer + //*SystemViewer DB Config *viper.Viper // from config @@ -47,18 +43,23 @@ type SubCoordinators struct { } func NewCentralCoordinator(config *viper.Viper, ch chan error) *CentralCoordinator { - c := &CentralCoordinator{Err: ch, Config: config} - - if err := config.UnmarshalKey("server", c); err != nil { - // report error + // create a central coordinator to manage requests + db, err := NewDBAdmin(config) + if err != nil { ch <- err } - fmt.Printf("%+v\n", c) - c.SystemViewer = NewSystemViewer() - go c.SystemViewer.Start() + c := &CentralCoordinator{Err: ch, Config: config, DB: db} + // grab config settings + if err = config.UnmarshalKey("server", c); err != nil { + ch <- err + } + // spawn a systemviewer DECOMIS + //c.SystemViewer = NewSystemViewer() + //.go c.SystemViewer.Start() + // subcoord map s := make(map[string]*SubCoordinator) - sub := &SubCoordinators{Directory: s} - c.SubCoordinators = sub + c.SubCoordinators = &SubCoordinators{Directory: s} + // return init coordinator return c } @@ -66,7 +67,10 @@ func (c *CentralCoordinator) Start() { // starts up associated funcs clientChan := make(chan *ClientPacket) l := NewListener(clientChan, c.Err) + + // grabs lis port c.Config.UnmarshalKey("server.ports", l) + // starts client listener routines go l.Start() go c.ClientListener(clientChan) } @@ -74,36 +78,48 @@ func (c *CentralCoordinator) Start() { func (c *CentralCoordinator) ClientListener(ch chan *ClientPacket) { for client := range ch { // basically loops until channel is closed - cr := c.ClientHandler(client.Client) - client.Response <- cr + fmt.Printf("Incoming client: +%v\n", client) + client.Response <- c.ClientHandler(client.Client) // respond with cred } } func (c *CentralCoordinator) ClientHandler(cl *Client) *ClientResponse { + // look for sub coord c.SubCoordinators.Lock() - defer c.SubCoordinators.Unlock() + subcoord, ok := c.SubCoordinators.Directory[cl.Type] if !ok { - // Sub Coordinator does not exists - logging.Debug(logging.DSpawn, "CC0 01 Created %v Coordinator", cl.Type) - subcoord = NewSubCoordinator(cl.Type, c.SystemViewer, c.Err) + // Sub Coordinator does not exists, creating + fmt.Printf("Cl type: %s, Port: %d\n", cl.Type, c.Ports[cl.Type]) + subcoord = NewSubCoordinator(cl.Type, c.Ports[cl.Type], c.Err) c.SubCoordinators.Directory[cl.Type] = subcoord + fmt.Printf("Creating subcord for %s on %d\n", cl.Type, c.Ports[cl.Type]) + logging.Debug(logging.DSpawn, "CC0 01 Created %v Coordinator", cl.Type) } + // unlocking + c.SubCoordinators.Unlock() + + // starts sub coord with client credentials + fmt.Printf("Starting subcoord client handler\n") go subcoord.ClientHandler(cl) + + fmt.Printf("Getting db info\n") // setting up client response - url, org, token, bucket, err := c.DB.GetReactorClient(int(cl.Id)) + url, org, token, bucket, err := c.DB.GetReactorClient(cl.Id) + fmt.Printf("Got URL: %s, Org: %s, Token: %s, Bucket: %b\n", url, org, token, bucket) if err != nil { c.Err <- err } - cr := &ClientResponse{URL: url, Org: org, Token: token, Bucket: bucket, Port: c.Ports[cl.Type]} - return cr + + // returning info + return &ClientResponse{URL: url, Org: org, Token: token, Bucket: bucket, Port: c.Ports[cl.Type]} } type ManagerInterface interface { Start() - NewManager(*Client, *SystemViewer, chan error) GeneralManager - GetManager(uint32) (GeneralManager, bool) - AddManager(uint32, GeneralManager) + NewManager(*Client, chan error) GeneralManager + GetManager(int) (GeneralManager, bool) + AddManager(int, GeneralManager) Register() } @@ -116,22 +132,22 @@ type GeneralManager interface { type SubCoordinator struct { Port int // port that we set up gRPC endpoint on ManagerInterface // embed an interface to create/manager managers - *SystemViewer + //*SystemViewer Err chan error } type Managers struct { - Directory map[uint32]interface{} // support for either manager - sync.RWMutex // potential perf + Directory map[int]interface{} // support for either manager + sync.RWMutex // potential perf } // interface stuff -func NewSubCoordinator(clientType string, sys *SystemViewer, err chan error) *SubCoordinator { - c := &SubCoordinator{Err: err} - c.SystemViewer = sys - man, errs := NewCoordinatorType(clientType, err) - if errs != nil { - err <- errs +func NewSubCoordinator(clientType string, port int, errCh chan error) *SubCoordinator { + c := &SubCoordinator{Err: errCh} + //c.SystemViewer = sys + man, err := NewCoordinatorType(clientType, errCh) + if err != nil { + errCh <- err } c.ManagerInterface = man go man.Start() @@ -141,15 +157,16 @@ func NewSubCoordinator(clientType string, sys *SystemViewer, err chan error) *Su func (c *SubCoordinator) ClientHandler(cl *Client) { // (creates and) notifies manager of client connection - c.UpdateManager(cl) } func (c *SubCoordinator) UpdateManager(cl *Client) { // shouldn't happen all that often so should be fine to lock + fmt.Printf("Grabbing Manager\n") m, exists := c.GetManager(cl.Id) if !exists { - m = c.NewManager(cl, c.SystemViewer, c.Err) + fmt.Printf("Creating Manager\n") + m = c.NewManager(cl, c.Err) m.UpdateClient(cl) go c.AddManager(cl.Id, m) go m.Start() @@ -157,13 +174,13 @@ func (c *SubCoordinator) UpdateManager(cl *Client) { go m.UpdateClient(cl) } -func (m *Managers) AddManager(id uint32, man GeneralManager) { +func (m *Managers) AddManager(id int, man GeneralManager) { m.Lock() defer m.Unlock() m.Directory[id] = man } -func (m *Managers) GetManager(id uint32) (GeneralManager, bool) { +func (m *Managers) GetManager(id int) (GeneralManager, bool) { // just read locks and reuturns m.RLock() defer m.RUnlock() @@ -176,17 +193,17 @@ func (m *Managers) GetManager(id uint32) (GeneralManager, bool) { func NewCoordinatorType(clientType string, err chan error) (ManagerInterface, error) { - m := make(map[uint32]interface{}) + m := make(map[int]interface{}) if clientType == "reactor" { c := &reactorCoordinator{} //m := make(map[uint32]*ReactorManager) c.Managers = &Managers{Directory: m} return c, nil } else if clientType == "tui" { - c := &tuiCoordinator{} + c := &reactorCoordinator{} //m := make(map[uint32]*TUIManager) - c.Managers = &Managers{Directory: m} - return c, nil + //c.Managers = &Managers{Directory: m} + return c, errors.New(fmt.Sprint("error, TUI Not impl")) } return &reactorCoordinator{}, errors.New("Unrecognized client type") } @@ -202,9 +219,9 @@ func (r *reactorCoordinator) Start() { logging.Debug(logging.DStart, "RCO 01 Starting!") } -func (r *reactorCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan error) GeneralManager { +func (r *reactorCoordinator) NewManager(cl *Client, err chan error) GeneralManager { logging.Debug(logging.DClient, "RCO 01 starting manager for %v client %v", cl.Type, cl.Id) - return NewReactorManager(cl, sys, err) + return NewReactorManager(cl, err) } func (r *reactorCoordinator) Register() { @@ -226,7 +243,7 @@ func (r *reactorCoordinator) Register() { } func (r *reactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { - m, exists := r.GetManager(req.GetId()) + m, exists := r.GetManager(int(req.GetId())) if !exists { return &pb.ReactorStatusResponse{}, errors.New("Manager doesn't exists for that client") } @@ -238,6 +255,7 @@ func (r *reactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.R } // tui coordinator +/* type tuiCoordinator struct { *Managers // by embedding general struct we allow coordinator to still call general funcs pb.UnimplementedManagementServer @@ -247,9 +265,9 @@ func (t *tuiCoordinator) Start() { logging.Debug(logging.DStart, "TCO 01 Starting!") } -func (t *tuiCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan error) GeneralManager { +func (t *tuiCoordinator) NewManager(cl *Client, err chan error) GeneralManager { logging.Debug(logging.DClient, "TCO 01 starting manager for %v client %v", cl.Type, cl.Id) - return NewTUIManager(cl, sys, err) + return NewTUIManager(cl, err) } func (t *tuiCoordinator) Register() { @@ -267,12 +285,13 @@ func (t *tuiCoordinator) Register() { pb.RegisterManagementServer(grpcServer, t) go grpcServer.Serve(lis) logging.Debug(logging.DClient, "TCO ready for client requests") - */ +*/ +/* } func (t *tuiCoordinator) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) { // grpc handler to fwd to manager - m, exists := t.GetManager(req.GetClientId()) + m, exists := t.GetManager(int(req.GetClientId())) if !exists { // doesnt exist for some reason return &pb.GetDevicesResponse{}, errors.New("Manager doesn't exists for client") @@ -294,3 +313,4 @@ func (t *tuiCoordinator) DeleteReactorDevice(ctx context.Context, req *pb.Delete // TODO return &pb.DeleteReactorDeviceResponse{}, nil } +*/ diff --git a/internal/pkg/server/listener.go b/internal/pkg/server/listener.go index 0daaf8b..4223b54 100644 --- a/internal/pkg/server/listener.go +++ b/internal/pkg/server/listener.go @@ -13,6 +13,7 @@ import ( /* Listens on a supplied port and sends incoming clients over a supplied channel +Waits for a response on that channel to send back to the client with DB credentials */ type Listener struct { // exporting for easy use in the short term @@ -28,10 +29,9 @@ type ClientPacket struct { } type Client struct { - // general client struct to store reqs from reactors/tui Ip string Port int - Id uint32 + Id int Model string Type string } @@ -71,15 +71,16 @@ func (l *Listener) Register() error { } func (l *Listener) ClientDiscoveryHandler(ctx context.Context, ping *pb.ClientRequest) (*pb.ClientResponse, error) { - // incoming reactor ping need to spawn coord - c := &Client{Id: ping.GetClientId(), Type: ping.GetClientType()} + // incoming client ping, notify coord and wait for DB credentials to respond + c := &Client{Id: int(ping.GetClientId()), Type: ping.GetClientType()} logging.Debug(logging.DClient, "LIS %v %v has connected\n", c.Type, c.Id) + // prepare packet to send to coordinator ch := make(chan *ClientResponse) - p := &ClientPacket{Response: ch} - p.Client = c + p := &ClientPacket{Client: c, Response: ch} + // blocking l.ClientConnections <- p resp := <-ch - // return the port for the incoming requests + // prepare object to return to client db := &pb.Database{URL: resp.URL, ORG: resp.Org, Token: resp.Token, Bucket: resp.Bucket} - return &pb.ClientResponse{ClientId: c.Id, ServerPort: uint32(resp.Port), Database: db}, nil + return &pb.ClientResponse{ClientId: uint32(c.Id), ServerPort: uint32(resp.Port), Database: db}, nil } diff --git a/internal/pkg/server/manager.go b/internal/pkg/server/manager.go index a9f2627..793a103 100644 --- a/internal/pkg/server/manager.go +++ b/internal/pkg/server/manager.go @@ -1,108 +1,109 @@ package server import ( - //"log" - "time" - "math" - "sync" - "errors" - _ "context" - "FRMS/internal/pkg/logging" + //"log" + "FRMS/internal/pkg/logging" + _ "context" + "errors" + "math" + "sync" + "time" ) // this package will implement a boilerplate manager // manager connects to client on start and returns the gRPC connection to make gRPC clients type Manager struct { - *Client // gives access to c.Ip c.Id etc - Hb time.Duration // used for managing hb timer for client - Active active - Sig chan bool - Err chan error + *Client // gives access to c.Ip c.Id etc + Hb time.Duration // used for managing hb timer for client + Active active + Sig chan bool + Err chan error } -type active struct{ - sync.Mutex - bool - int +type active struct { + sync.Mutex + bool + int } func NewManager(err chan error) *Manager { - hb := time.Duration(1 * time.Second) //hb to - m := &Manager{Hb:hb,Err:err} - return m + hb := time.Duration(5 * time.Second) //hb to + m := &Manager{Hb: hb, Err: err} + return m } func (m *Manager) Start() { - if !m.Activate() { - // manager already running - m.Err <-errors.New("Manager already running!") - } // if we get here, manager is atomically activated and we can ensure start wont run again + if !m.Activate() { + // manager already running + m.Err <- errors.New("Manager already running!") + } // if we get here, manager is atomically activated and we can ensure start wont run again } func (m *Manager) Exit() { - // exit function to eventually allow saving to configs - if !m.Deactivate() { - m.Err <-errors.New("Manager already disabled!") - } + // exit function to eventually allow saving to configs + if !m.Deactivate() { + m.Err <- errors.New("Manager already disabled!") + } } func (m *Manager) UpdateClient(cl *Client) { - logging.Debug(logging.DClient,"MAN Updating client %v",cl.Id) - m.Client = cl + logging.Debug(logging.DClient, "MAN Updating client %v", cl.Id) + m.Client = cl } // reactor manager atomic operations func (m *Manager) IsActive() bool { - m.Active.Lock() - defer m.Active.Unlock() - return m.Active.bool + m.Active.Lock() + defer m.Active.Unlock() + return m.Active.bool } func (m *Manager) Activate() bool { - // slightly confusing but returns result of trying to activate - m.Active.Lock() - defer m.Active.Unlock() - alive := m.Active.bool - if alive { - return false - } else { - m.Active.bool = true - m.Active.int = 0 - return m.Active.bool - } + // slightly confusing but returns result of trying to activate + m.Active.Lock() + defer m.Active.Unlock() + alive := m.Active.bool + if alive { + return false + } else { + m.Active.bool = true + m.Active.int = 0 + return m.Active.bool + } } func (m *Manager) Deactivate() bool { - // result of trying to deactivate - m.Active.Lock() - defer m.Active.Unlock() - alive := m.Active.bool - if alive { - m.Active.bool = false - return true - } else { - return m.Active.bool - } + // result of trying to deactivate + m.Active.Lock() + defer m.Active.Unlock() + alive := m.Active.bool + if alive { + m.Active.bool = false + return true + } else { + return m.Active.bool + } } // connection stuff func (m *Manager) Timeout() int { - // keeps track of and generates timeout [0-1.2s) over span of ~2.5s - // returns 0 on TO elapse - m.Active.Lock() - defer m.Active.Unlock() - if m.Active.int < 9 { - v := int(5 * math.Pow(float64(2), float64(m.Active.int))) - m.Active.int += 1 - return v - } else { - // exceeded retries - return 0 - } + // keeps track of and generates timeout [0-1.2s) over span of ~2.5s + // returns 0 on TO elapse + m.Active.Lock() + defer m.Active.Unlock() + if m.Active.int < 9 { + v := int(5 * math.Pow(float64(2), float64(m.Active.int))) + m.Active.int += 1 + return v + } else { + // exceeded retries + return 0 + } } + /* shouldnt be nessecary anymore diff --git a/internal/pkg/server/reactormanager.go b/internal/pkg/server/reactormanager.go index 6f967b1..9d026cd 100644 --- a/internal/pkg/server/reactormanager.go +++ b/internal/pkg/server/reactormanager.go @@ -1,149 +1,153 @@ package server import ( - "fmt" - "time" - _ "log" - "context" - "sync" - "FRMS/internal/pkg/logging" - "google.golang.org/grpc" - "google.golang.org/grpc/status" - "google.golang.org/grpc/credentials/insecure" - pb "FRMS/internal/pkg/grpc" + pb "FRMS/internal/pkg/grpc" + "FRMS/internal/pkg/logging" + "context" + "fmt" + _ "log" + "sync" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" ) -// this package will implement a reactor coordinator and associated go routines +// this package will implement a reactor manager and associated go routines type ReactorManager struct { - *Manager - StatusMon *StatusMonitor - *devstatus + *Manager + // StatusMon *StatusMonitor putting on pause + *devstatus } type devstatus struct { - sync.Mutex - Devs map[uint32]*DeviceInfo + // keeping this around but not using it to create status for status mon + sync.Mutex + Devs map[int]*DeviceInfo } -func NewReactorManager(c *Client,sys *SystemViewer,err chan error) GeneralManager { - r := &ReactorManager{} - di := make(map[uint32]*DeviceInfo) - r.devstatus = &devstatus{Devs:di} - r.Manager = NewManager(err) - r.StatusMon = NewStatusMonitor("Reactor",c.Id,sys) - return r +func NewReactorManager(c *Client, err chan error) GeneralManager { + r := &ReactorManager{} + di := make(map[int]*DeviceInfo) + r.devstatus = &devstatus{Devs: di} + r.Manager = NewManager(err) + //r.StatusMon = NewStatusMonitor("Reactor", c.Id, sys) + return r } func (r *ReactorManager) Start() { - r.Manager.Start() - logging.Debug(logging.DStart,"RMA %v starting", r.Id) - go r.StatusMon.Send(&DeviceInfo{Id:r.Id,Type:"Reactor",Status:"[green]ONLINE[white]"},"Reactor") - //conn := r.Connect() - //empty := &grpc.ClientConn{} - //if conn != empty { - //} + r.Manager.Start() + logging.Debug(logging.DStart, "RMA %v starting", r.Id) + //go r.StatusMon.Send(&DeviceInfo{Id: r.Id, Type: "Reactor", Status: "[green]ONLINE[white]"}, "Reactor") + //conn := r.Connect() + //empty := &grpc.ClientConn{} + //if conn != empty { + //} } func (r *ReactorManager) Exit() { - r.Manager.Exit() - logging.Debug(logging.DExit, "RMA %v exiting", r.Id) - go r.StatusMon.Send(&DeviceInfo{Id:r.Id,Type:"Reactor",Status:"[red]OFFLINE[white]",Data:fmt.Sprintf("Last Seen %v",time.Now().Format("Mon at 03:04:05pm MST"))},"Reactor") - r.devstatus.Lock() - defer r.devstatus.Unlock() - for _, d := range r.Devs { - newd := d - newd.Status = "[yellow]UNKOWN[white]" - r.Devs[newd.Id] = newd - go r.StatusMon.Send(newd,"Device") - } + r.Manager.Exit() + logging.Debug(logging.DExit, "RMA %v exiting", r.Id) + //go r.StatusMon.Send(&DeviceInfo{Id: r.Id, Type: "Reactor", Status: "[red]OFFLINE[white]", Data: fmt.Sprintf("Last Seen %v", time.Now().Format("Mon at 03:04:05pm MST"))}, "Reactor") + r.devstatus.Lock() + defer r.devstatus.Unlock() + // keeping this because it **COULD** be useful, maybe + for _, d := range r.Devs { + newd := d + newd.Status = "UNKOWN" + r.Devs[newd.Id] = newd + //go r.StatusMon.Send(newd, "Device") + } } func (r *ReactorManager) Connect() *grpc.ClientConn { - // establish gRPC conection with reactor - var opts []grpc.DialOption - var conn *grpc.ClientConn - opts = append(opts,grpc.WithTransportCredentials(insecure.NewCredentials())) + // establish gRPC conection with reactor + // this seems pretty stupid, seems like reactor should communicate up the chain to avoid unnessecary comms. + var opts []grpc.DialOption + var conn *grpc.ClientConn + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) - for { - if !r.IsActive() { - logging.Debug(logging.DClient,"RMA %v No longer active, aborting connection attempt",r.Id) - return &grpc.ClientConn{} - } - var err error - conn, err = grpc.Dial(fmt.Sprintf("%v:%v",r.Ip,r.Port),opts...) - // error handling - code := status.Code(err) - if code != 0 { // != OK - if code == (5 | 14) { // unavailable or not found - to := r.Timeout() - if to == 0 { - logging.Debug(logging.DClient,"RMA %v Client not responding",r.Id) - return &grpc.ClientConn{} - } - logging.Debug(logging.DClient,"RMA %v Client currently down, retrying in %v ms",r.Id, to) - time.Sleep(time.Duration(to) * time.Millisecond) + for { + if !r.IsActive() { + logging.Debug(logging.DClient, "RMA %v No longer active, aborting connection attempt", r.Id) + return &grpc.ClientConn{} + } + var err error + conn, err = grpc.Dial(fmt.Sprintf("%v:%v", r.Ip, r.Port), opts...) + // error handling + code := status.Code(err) + if code != 0 { // != OK + if code == (5 | 14) { // unavailable or not found + to := r.Timeout() + if to == 0 { + logging.Debug(logging.DClient, "RMA %v Client not responding", r.Id) + return &grpc.ClientConn{} + } + logging.Debug(logging.DClient, "RMA %v Client currently down, retrying in %v ms", r.Id, to) + time.Sleep(time.Duration(to) * time.Millisecond) - } else { - logging.Debug(logging.DError,"RMA %v GRPC ERROR: %v",r.Id, code) - r.Err <- err - } - } - break; - } - return conn + } else { + logging.Debug(logging.DError, "RMA %v GRPC ERROR: %v", r.Id, code) + r.Err <- err + } + } + break + } + return conn } func (r *ReactorManager) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { - // function client will call to update reactor information - //go r.PingReset() - for _, dev := range req.GetDevices() { - d := &DeviceInfo{Id:uint32(dev.GetAddr()),Type:dev.GetType(),Status:dev.GetStatus(),Data:dev.GetData()} - go r.UpdateDevice(d) - } - return &pb.ReactorStatusResponse{Id:r.Id}, nil + // function client will call to update reactor information + //go r.PingReset() + for _, dev := range req.GetDevices() { + d := &DeviceInfo{Id: int(dev.GetAddr()), Type: dev.GetType(), Status: dev.GetStatus(), Data: dev.GetData()} + go r.UpdateDevice(d) + } + return &pb.ReactorStatusResponse{Id: uint32(r.Id)}, nil } - /* -func (r *ReactorManager) Monitor(conn *grpc.ClientConn) { - defer conn.Close() - client := pb.NewMonitoringClient(conn) - for r.IsActive() { - req := &pb.ReactorStatusRequest{Id:r.Id} - resp, err := client.GetReactorStatus(context.Background(),req) - code := status.Code(err) - if code != 0 { // if != OK - logging.Debug(logging.DClient,"RMA %v Reactor not responding! Code: %v\n", r.Id,code) - r.devstatus.Lock() - for _, d := range r.Devs { - newd := d - newd.Status = "[yellow]UNKOWN[white]" - r.Devs[newd.Id] = newd - go r.StatusMon.Send(newd,"Device") - } - r.devstatus.Unlock() - r.Exit() - break; - } - for _,v := range resp.GetDevices() { - d := &DeviceInfo{Id:uint32(v.GetAddr()),Type:v.GetType(),Status:v.GetStatus(),Data:v.GetData()} - go r.UpdateDevice(d) - } - time.Sleep(r.Hb) // time between sensor pings - } -} + func (r *ReactorManager) Monitor(conn *grpc.ClientConn) { + defer conn.Close() + client := pb.NewMonitoringClient(conn) + for r.IsActive() { + req := &pb.ReactorStatusRequest{Id:r.Id} + resp, err := client.GetReactorStatus(context.Background(),req) + code := status.Code(err) + if code != 0 { // if != OK + logging.Debug(logging.DClient,"RMA %v Reactor not responding! Code: %v\n", r.Id,code) + r.devstatus.Lock() + for _, d := range r.Devs { + newd := d + newd.Status = "[yellow]UNKOWN[white]" + r.Devs[newd.Id] = newd + go r.StatusMon.Send(newd,"Device") + } + r.devstatus.Unlock() + r.Exit() + break; + } + for _,v := range resp.GetDevices() { + d := &DeviceInfo{Id:uint32(v.GetAddr()),Type:v.GetType(),Status:v.GetStatus(),Data:v.GetData()} + go r.UpdateDevice(d) + } + time.Sleep(r.Hb) // time between sensor pings + } + } */ + func (r *ReactorManager) UpdateDevice(d *DeviceInfo) { - r.devstatus.Lock() - defer r.devstatus.Unlock() - if olddev, ok := r.Devs[d.Id]; !ok { - // new device - r.Devs[d.Id] = d - go r.StatusMon.Send(d,"Device") - } else if olddev.Status != d.Status || olddev.Data != d.Data { - // dev status or data has changed - r.Devs[d.Id] = d - go r.StatusMon.Send(d,"Device") - } + r.devstatus.Lock() + defer r.devstatus.Unlock() + if olddev, ok := r.Devs[d.Id]; !ok { + // new device + r.Devs[d.Id] = d + //go r.StatusMon.Send(d, "Device") + } else if olddev.Status != d.Status || olddev.Data != d.Data { + // dev status or data has changed + r.Devs[d.Id] = d + //go r.StatusMon.Send(d, "Device") + } } diff --git a/internal/pkg/server/system.go b/internal/pkg/server/system.go index 8a0afc8..e30ade3 100644 --- a/internal/pkg/server/system.go +++ b/internal/pkg/server/system.go @@ -1,315 +1,318 @@ package server import ( - "sync" - _ "fmt" - "FRMS/internal/pkg/logging" + _ "fmt" ) + // allows for multiple readers/writers type DeviceInfo struct { - Id uint32 - Type string - Status string - Data string - Index uint32 - TransactionId uint32 + Id int + Type string + Status string + Data string + Index int + TransactionId uint32 } +/* + type StatusMonitor struct { - // allows for embedding into managers - TransactionId chan uint32 // monotonically increases to track outdated reqs - DevChan chan *DeviceInfo // channel for device status - ReactorChan chan *DeviceInfo // channel for reactor status - *SystemViewer - DevBuf *devbuf - sync.Mutex + // allows for embedding into managers + TransactionId chan uint32 // monotonically increases to track outdated reqs + DevChan chan *DeviceInfo // channel for device status + ReactorChan chan *DeviceInfo // channel for reactor status + *SystemViewer + DevBuf *devbuf + sync.Mutex } type devbuf struct { - ReactorId uint32 // reactor we are looking at, if any - Buf map[string]map[uint32]*DeviceInfo // convienent way to store/seperate device data - sync.Mutex -} - -func NewBuffer() map[string]map[uint32]*DeviceInfo { - rbuf := make(map[uint32]*DeviceInfo) - dbuf := make(map[uint32]*DeviceInfo) - sbuf := make(map[string]map[uint32]*DeviceInfo) - sbuf["Reactor"] = rbuf - sbuf["Device"] = dbuf - return sbuf -} - -func NewStatusMonitor(t string, id uint32, sys *SystemViewer) *StatusMonitor { - tid := make(chan uint32) - sm := &StatusMonitor{TransactionId:tid} - sm.SystemViewer = sys - logging.Debug(logging.DClient,"SYS Creating new status monitor") - if t == "Reactor" { - // reactor status monitor - sm.ReactorChan = sys.AddReactorSender() - sm.DevChan = sys.AddDeviceSender(id) - go sm.GenerateIds() - } else { - // tui status monitor - sbuf := NewBuffer() - //sm.ReactorChan, sbuf["Reactor"] = sys.AddListener(id,0) - sm.DevBuf = &devbuf{Buf:sbuf} // makes it easier to work with - go sm.UpdateListener(id,0) - } - return sm + ReactorId int // reactor we are looking at, if any + Buf map[string]map[int]*DeviceInfo // convienent way to store/seperate device data + sync.Mutex +} + +func NewBuffer() map[string]map[int]*DeviceInfo { + rbuf := make(map[int]*DeviceInfo) + dbuf := make(map[int]*DeviceInfo) + sbuf := make(map[string]map[int]*DeviceInfo) + sbuf["Reactor"] = rbuf + sbuf["Device"] = dbuf + return sbuf +} + +func NewStatusMonitor(t string, id int, sys *SystemViewer) *StatusMonitor { + tid := make(chan uint32) + sm := &StatusMonitor{TransactionId: tid} + sm.SystemViewer = sys + logging.Debug(logging.DClient, "SYS Creating new status monitor") + if t == "Reactor" { + // reactor status monitor + sm.ReactorChan = sys.AddReactorSender() + sm.DevChan = sys.AddDeviceSender(id) + go sm.GenerateIds() + } else { + // tui status monitor + sbuf := NewBuffer() + //sm.ReactorChan, sbuf["Reactor"] = sys.AddListener(id,0) + sm.DevBuf = &devbuf{Buf: sbuf} // makes it easier to work with + go sm.UpdateListener(id, 0) + } + return sm } func (s *StatusMonitor) GenerateIds() { - var id uint32 - id = 0 - for { - s.TransactionId <-id - id += 1 - } + var id uint32 + id = 0 + for { + s.TransactionId <- id + id += 1 + } } func (s *StatusMonitor) Send(d *DeviceInfo, dtype string) { - d.TransactionId = <-s.TransactionId - logging.Debug(logging.DClient,"SYS 01 Sending update for: %s (%s)", d.Type, d.Status) - if dtype == "Reactor" { - s.ReactorChan <-d - } else { - s.DevChan <-d - } + d.TransactionId = <-s.TransactionId + logging.Debug(logging.DClient, "SYS 01 Sending update for: %s (%s)", d.Type, d.Status) + if dtype == "Reactor" { + s.ReactorChan <- d + } else { + s.DevChan <- d + } } func (s *StatusMonitor) GetBuffer() []*DeviceInfo { - // also clears buffer - s.DevBuf.Lock() - defer s.DevBuf.Unlock() - res := []*DeviceInfo{} - if len(s.DevBuf.Buf["Reactor"]) != 0 || len(s.DevBuf.Buf["Device"]) != 0 { - logging.Debug(logging.DClient,"Clearing buff %v", s.DevBuf.Buf) - } - for _, devs := range s.DevBuf.Buf { - for _, dev := range devs { - // loops over reactors then devices - res = append(res,dev) - } - } - s.DevBuf.Buf = NewBuffer() // clearing old buffer - return res + // also clears buffer + s.DevBuf.Lock() + defer s.DevBuf.Unlock() + res := []*DeviceInfo{} + if len(s.DevBuf.Buf["Reactor"]) != 0 || len(s.DevBuf.Buf["Device"]) != 0 { + logging.Debug(logging.DClient, "Clearing buff %v", s.DevBuf.Buf) + } + for _, devs := range s.DevBuf.Buf { + for _, dev := range devs { + // loops over reactors then devices + res = append(res, dev) + } + } + s.DevBuf.Buf = NewBuffer() // clearing old buffer + return res } func (s *StatusMonitor) UpdateListener(tid, reactorId uint32) { - s.DevBuf.Lock() - defer s.DevBuf.Unlock() - // clearing proper buffer - if reactorId == 0 { - logging.Debug(logging.DClient,"SYS 01 Adding %v as reactor listener", tid) - s.DevBuf.Buf["Reactor"] = make(map[uint32]*DeviceInfo) - s.ReactorChan, s.DevBuf.Buf["Reactor"] = s.SystemViewer.AddListener(tid, reactorId) - go s.Listen(s.ReactorChan) - } else { - logging.Debug(logging.DClient,"SYS 01 Adding %v as reactor %v listener", tid, reactorId) - s.DevBuf.Buf["Device"] = make(map[uint32]*DeviceInfo) // clearing old devices - if s.DevBuf.ReactorId != reactorId && s.DevBuf.ReactorId != 0{ - go s.SystemViewer.RemoveListener(s.DevBuf.ReactorId, tid) - } - s.DevBuf.ReactorId = reactorId - s.DevChan, s.DevBuf.Buf["Device"] = s.SystemViewer.AddListener(tid, reactorId) - go s.Listen(s.DevChan) - } + s.DevBuf.Lock() + defer s.DevBuf.Unlock() + // clearing proper buffer + if reactorId == 0 { + logging.Debug(logging.DClient, "SYS 01 Adding %v as reactor listener", tid) + s.DevBuf.Buf["Reactor"] = make(map[uint32]*DeviceInfo) + s.ReactorChan, s.DevBuf.Buf["Reactor"] = s.SystemViewer.AddListener(tid, reactorId) + go s.Listen(s.ReactorChan) + } else { + logging.Debug(logging.DClient, "SYS 01 Adding %v as reactor %v listener", tid, reactorId) + s.DevBuf.Buf["Device"] = make(map[uint32]*DeviceInfo) // clearing old devices + if s.DevBuf.ReactorId != reactorId && s.DevBuf.ReactorId != 0 { + go s.SystemViewer.RemoveListener(s.DevBuf.ReactorId, tid) + } + s.DevBuf.ReactorId = reactorId + s.DevChan, s.DevBuf.Buf["Device"] = s.SystemViewer.AddListener(tid, reactorId) + go s.Listen(s.DevChan) + } } func (s *StatusMonitor) UpdateBuffer(d *DeviceInfo, dtype string, ch chan *DeviceInfo) { - s.DevBuf.Lock() - defer s.DevBuf.Unlock() - logging.Debug(logging.DClient,"SYS 01 Dev %v update requested", d) - if dev, exists := s.DevBuf.Buf[dtype][d.Id]; exists { - // already a device in the buffer - if dev.TransactionId > d.TransactionId { - logging.Debug(logging.DClient,"SYS 01 Update Processed. Old: %v, New: %v \n", dev, d) - d = dev // not sure if i can do this lol - } - } - if ch == s.ReactorChan || ch == s.DevChan { - // hacky way to check if the device came from a listener of a current channel - s.DevBuf.Buf[dtype][d.Id] = d - } else { - logging.Debug(logging.DClient,"Dev out of date!") - } + s.DevBuf.Lock() + defer s.DevBuf.Unlock() + logging.Debug(logging.DClient, "SYS 01 Dev %v update requested", d) + if dev, exists := s.DevBuf.Buf[dtype][d.Id]; exists { + // already a device in the buffer + if dev.TransactionId > d.TransactionId { + logging.Debug(logging.DClient, "SYS 01 Update Processed. Old: %v, New: %v \n", dev, d) + d = dev // not sure if i can do this lol + } + } + if ch == s.ReactorChan || ch == s.DevChan { + // hacky way to check if the device came from a listener of a current channel + s.DevBuf.Buf[dtype][d.Id] = d + } else { + logging.Debug(logging.DClient, "Dev out of date!") + } } func (s *StatusMonitor) Listen(ch chan *DeviceInfo) { - for dev := range ch { - if dev.Type == "Reactor" { - go s.UpdateBuffer(dev,"Reactor", ch) - } else { - go s.UpdateBuffer(dev, "Device", ch) - } - } + for dev := range ch { + if dev.Type == "Reactor" { + go s.UpdateBuffer(dev, "Reactor", ch) + } else { + go s.UpdateBuffer(dev, "Device", ch) + } + } } type InfoStream struct { - // base stream for any reactor/device - // NewSender embedds the channel to send updates on - // NewListener will add the statusmon to the list of devs to echo to - Stream chan *DeviceInfo - Layout *syslayout - *listeners + // base stream for any reactor/device + // NewSender embedds the channel to send updates on + // NewListener will add the statusmon to the list of devs to echo to + Stream chan *DeviceInfo + Layout *syslayout + *listeners } type listeners struct { - sync.RWMutex - Listeners map[uint32]*lischan + sync.RWMutex + Listeners map[uint32]*lischan } type lischan struct { - sync.WaitGroup - StatusChan chan *DeviceInfo + sync.WaitGroup + StatusChan chan *DeviceInfo } type syslayout struct { - Devs map[uint32]*DeviceInfo - uint32 //index - sync.RWMutex + Devs map[uint32]*DeviceInfo + uint32 //index + sync.RWMutex } func NewLisChan(ch chan *DeviceInfo) *lischan { - l := &lischan{StatusChan:ch} - return l + l := &lischan{StatusChan: ch} + return l } func NewInfoStream() *InfoStream { - dch := make(chan *DeviceInfo) - s := &InfoStream{Stream:dch} - m := make(map[uint32]*DeviceInfo) - s.Layout = &syslayout{Devs:m} - s.listeners = &listeners{Listeners:make(map[uint32]*lischan)} - return s + dch := make(chan *DeviceInfo) + s := &InfoStream{Stream: dch} + m := make(map[uint32]*DeviceInfo) + s.Layout = &syslayout{Devs: m} + s.listeners = &listeners{Listeners: make(map[uint32]*lischan)} + return s } func (s *InfoStream) Start() { - // consistency - go s.Listen() + // consistency + go s.Listen() } + // goal is to hook every new manager into the reactor status chan func (s *InfoStream) AddSender() chan *DeviceInfo { - return s.Stream + return s.Stream } func (s *InfoStream) Listen() { - for deviceInfo := range s.Stream { - go s.Update(deviceInfo) - } + for deviceInfo := range s.Stream { + go s.Update(deviceInfo) + } } func (s *InfoStream) Update(d *DeviceInfo) { - s.Layout.Lock() - defer s.Layout.Unlock() - if dev, ok := s.Layout.Devs[d.Id]; !ok { - d.Index = s.Layout.uint32 - s.Layout.uint32 += 1 - } else { - d.Index = dev.Index - } - s.Layout.Devs[d.Id] = d - go s.Echo(d) + s.Layout.Lock() + defer s.Layout.Unlock() + if dev, ok := s.Layout.Devs[d.Id]; !ok { + d.Index = s.Layout.uint32 + s.Layout.uint32 += 1 + } else { + d.Index = dev.Index + } + s.Layout.Devs[d.Id] = d + go s.Echo(d) } func (l *listeners) Echo(d *DeviceInfo) { - l.RLock() - defer l.RUnlock() - // read only lock - for _, lis := range l.Listeners { - lis.Add(1) - go func(listener *lischan, dev *DeviceInfo){ - defer listener.Done() - listener.StatusChan <-dev - }(lis,d) - } -} - -func (s *InfoStream) AddListener(id uint32, ch chan *DeviceInfo) map[uint32]*DeviceInfo { - // if i get a memory leak ill eat my shoe - s.listeners.Lock() - defer s.listeners.Unlock() - if _, ok := s.listeners.Listeners[id]; ok { - // exists - go s.RemoveListener(id) - } - s.listeners.Listeners[id] = NewLisChan(ch) - logging.Debug(logging.DClient,"SYS 01 Getting Devices") - //s.Layout.RLock() - //defer s.Layout.RUnlock() - logging.Debug(logging.DClient,"SYS 01 Returning Devices %v", s.Layout.Devs) - return s.Layout.Devs -} - -func (l *listeners) RemoveListener(id uint32) { - l.Lock() - defer l.Unlock() - if lis, ok := l.Listeners[id]; ok { - delete(l.Listeners,id) - go func(ls *lischan){ - ls.Wait() - close(ls.StatusChan) - }(lis) - } + l.RLock() + defer l.RUnlock() + // read only lock + for _, lis := range l.Listeners { + lis.Add(1) + go func(listener *lischan, dev *DeviceInfo) { + defer listener.Done() + listener.StatusChan <- dev + }(lis, d) + } +} + +func (s *InfoStream) AddListener(id int, ch chan *DeviceInfo) map[uint32]*DeviceInfo { + // if i get a memory leak ill eat my shoe + s.listeners.Lock() + defer s.listeners.Unlock() + if _, ok := s.listeners.Listeners[id]; ok { + // exists + go s.RemoveListener(id) + } + s.listeners.Listeners[id] = NewLisChan(ch) + logging.Debug(logging.DClient, "SYS 01 Getting Devices") + //s.Layout.RLock() + //defer s.Layout.RUnlock() + logging.Debug(logging.DClient, "SYS 01 Returning Devices %v", s.Layout.Devs) + return s.Layout.Devs +} + +func (l *listeners) RemoveListener(id int) { + l.Lock() + defer l.Unlock() + if lis, ok := l.Listeners[id]; ok { + delete(l.Listeners, id) + go func(ls *lischan) { + ls.Wait() + close(ls.StatusChan) + }(lis) + } } // status buffer maintaince type SystemViewer struct { - // stores system directory and provide methods to be embedded in managers - ReactorStream *InfoStream // can add itself as a listener to provide methods - DeviceStream *ds + // stores system directory and provide methods to be embedded in managers + ReactorStream *InfoStream // can add itself as a listener to provide methods + DeviceStream *ds } type ds struct { - Reactors map[uint32]*InfoStream //map from reactor id to its device info stream - sync.Mutex + Reactors map[uint32]*InfoStream //map from reactor id to its device info stream + sync.Mutex } func NewSystemViewer() *SystemViewer { - rs := NewInfoStream() - s := &SystemViewer{ReactorStream:rs} - m := make(map[uint32]*InfoStream) - s.DeviceStream = &ds{Reactors:m} - return s + rs := NewInfoStream() + s := &SystemViewer{ReactorStream: rs} + m := make(map[uint32]*InfoStream) + s.DeviceStream = &ds{Reactors: m} + return s } func (s *SystemViewer) Start() { - go s.ReactorStream.Start() + go s.ReactorStream.Start() } func (s *SystemViewer) AddReactorSender() chan *DeviceInfo { - return s.ReactorStream.AddSender() + return s.ReactorStream.AddSender() } func (s *SystemViewer) AddDeviceSender(reactorId uint32) chan *DeviceInfo { - s.DeviceStream.Lock() - defer s.DeviceStream.Unlock() - var ds *InfoStream - var ok bool - if ds, ok = s.DeviceStream.Reactors[reactorId]; !ok { - ds = NewInfoStream() - s.DeviceStream.Reactors[reactorId] = ds - go ds.Start() - } - return ds.AddSender() -} - -func (s *SystemViewer) AddListener(id, rid uint32) (chan *DeviceInfo, map[uint32]*DeviceInfo) { - // returns a listener for that chan - ch := make(chan *DeviceInfo) - if rid != 0 { - return ch, s.DeviceStream.Reactors[rid].AddListener(id, ch) - } else { - return ch, s.ReactorStream.AddListener(id, ch) - } -} - -func (s *SystemViewer) RemoveListener(rid, tid uint32) { - // removes chan for specific tid and rid - s.DeviceStream.Lock() - defer s.DeviceStream.Unlock() - go s.DeviceStream.Reactors[rid].RemoveListener(tid) -} + s.DeviceStream.Lock() + defer s.DeviceStream.Unlock() + var ds *InfoStream + var ok bool + if ds, ok = s.DeviceStream.Reactors[reactorId]; !ok { + ds = NewInfoStream() + s.DeviceStream.Reactors[reactorId] = ds + go ds.Start() + } + return ds.AddSender() +} + +func (s *SystemViewer) AddListener(id, rid int) (chan *DeviceInfo, map[uint32]*DeviceInfo) { + // returns a listener for that chan + ch := make(chan *DeviceInfo) + if rid != 0 { + return ch, s.DeviceStream.Reactors[rid].AddListener(id, ch) + } else { + return ch, s.ReactorStream.AddListener(id, ch) + } +} + +func (s *SystemViewer) RemoveListener(rid, tid int) { + // removes chan for specific tid and rid + s.DeviceStream.Lock() + defer s.DeviceStream.Unlock() + go s.DeviceStream.Reactors[rid].RemoveListener(tid) +} +*/ diff --git a/internal/pkg/server/tuimanager.go b/internal/pkg/server/tuimanager.go index bb37596..9d59b7d 100644 --- a/internal/pkg/server/tuimanager.go +++ b/internal/pkg/server/tuimanager.go @@ -1,5 +1,7 @@ package server +/* + import ( // "fmt" "time" @@ -33,7 +35,7 @@ func NewTUIManager(c *Client, sys *SystemViewer, err chan error) GeneralManager m := NewManager(err) t := &TUIManager{Err: err} alert := make(chan bool) - t.Timeout = &Timeout{Alert:alert,TO:time.Duration(2500*time.Millisecond)} // short time outs are fine because we will just rejoin + t.Timeout = &Timeout{Alert:alert,TO:time.Duration(2500*time.Millisecond)} // short time outs are fine because we will just rejoin t.Manager = m t.StatusMon = NewStatusMonitor("TUI",c.Id,sys) t.Manager.UpdateClient(c) @@ -118,4 +120,4 @@ func (t *TUIManager) DeleteReactorDevice(ctx context.Context, req *pb.DeleteReac // return &pb.DeleteReactorDeviceResponse{}, nil } - +*/ diff --git a/notes.md b/notes.md index 3a46adf..156a594 100644 --- a/notes.md +++ b/notes.md @@ -34,3 +34,7 @@ #### 12/06 TODO - I think I can completely remove the old config way and just pass the viper object directly. I think its not worth the hassle of trying to keep track of a million interfaces +#### 12/07 TODO +- I concede, I will just remove flags as most people will never use them anyway and instead rely on env vars and config files. To hell with the flags. +- I am ripping out all of the TUI and status manager stuff, its convoluted and harder than just pulling info from database. + - I can eventaully rework TUI to pull from DB which is fine, there will never be that many clients anyway and a lot of them are only 1 time calls with refreshes which aren't that slow anyway. diff --git a/notes_old b/old_notes similarity index 100% rename from notes_old rename to old_notes