stripping all status monitor and TUI stuff out for now, focusing on core reactor impl

main
KeeganForelight 2 years ago
parent ef7bf9d665
commit 62f37b5f80

@ -10,7 +10,6 @@ import (
"os/signal"
flag "github.com/spf13/pflag"
"github.com/spf13/viper"
)
@ -36,19 +35,6 @@ func main() {
// load any stored configs
conf := NewConfig("reactor")
flag.String("ip", "192.168.100.2", "server ip")
flag.Int("port", 2022, "server port")
flag.String("name", "", "human readable name")
// bind flags
conf.BindPFlag("reactor_ip", flag.Lookup("ip"))
conf.BindPFlag("reactor_port", flag.Lookup("port"))
conf.BindPFlag("reactor_name", flag.Lookup("name"))
flag.Parse()
conf.WriteConfig()
ch := make(chan error)
rlc := NewCoordinator(conf, ch) // passing conf and err
go rlc.Start()
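For reference, if the flags do get dropped later (see the 12/07 TODO below), the same settings can come purely from a config file plus env vars via viper. A rough sketch of what a flag-free NewConfig could look like; the config path, env prefix, and defaults here are assumptions, not the current implementation:

package main

import (
	"strings"

	"github.com/spf13/viper"
)

// NewConfig sketch: resolve settings from <name>.yaml and FRMS_*-prefixed
// env vars instead of pflags. Path, prefix, and defaults are placeholders.
func NewConfig(name string) *viper.Viper {
	v := viper.New()
	v.SetConfigName(name)
	v.SetConfigType("yaml")
	v.AddConfigPath("$HOME/.config/FRMS")
	v.AddConfigPath(".")
	v.SetEnvPrefix("FRMS")
	v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
	v.AutomaticEnv()
	v.SetDefault("reactor_ip", "192.168.100.2")
	v.SetDefault("reactor_port", 2022)
	if err := v.ReadInConfig(); err != nil {
		// no file yet: write one out with the defaults so it can be edited later
		_ = v.SafeWriteConfig()
	}
	return v
}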

@ -10,7 +10,6 @@ import (
"FRMS/internal/pkg/server"
"os"
flag "github.com/spf13/pflag"
"github.com/spf13/viper"
)
@ -35,18 +34,6 @@ func main() {
// config file
conf := NewConfig("server")
// flags
flag.String("name", "", "human readable name")
flag.Int("lis_port", 2022, "port for listener")
flag.Int("db_port", 2022, "port for database")
// bind flags
conf.BindPFlag("ports_lis", flag.Lookup("lis_port"))
conf.BindPFlag("ports_db", flag.Lookup("db_port"))
conf.BindPFlag("server_name", flag.Lookup("name"))
flag.Parse()
errCh := make(chan error)
c := NewCoordinator(conf, errCh)

@ -1,7 +1,6 @@
package influxdb
import (
"errors"
_ "fmt"
_ "github.com/influxdata/influxdb-client-go/v2"
@ -74,6 +73,9 @@ func (d *DBAdmin) GetReactorClient(id int) (url, bucket, org, token string, err
*/
url = d.URL
org = d.Org
err = errors.New("Unimpl")
token = ""
bucket = ""
//err = errors.New("Unimpl")
err = nil
return
}
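Once GetReactorClient returns real values, the reactor side would plug them straight into influxdb-client-go v2. A minimal sketch of that consumer using a blocking write; the measurement, tag, and field names are placeholders:

package main

import (
	"context"
	"time"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

// writeSample shows how a client could use the (url, bucket, org, token)
// handed back by GetReactorClient to push a single data point.
func writeSample(url, bucket, org, token string) error {
	client := influxdb2.NewClient(url, token)
	defer client.Close()
	writeAPI := client.WriteAPIBlocking(org, bucket)
	p := influxdb2.NewPoint(
		"ph",                                  // placeholder measurement
		map[string]string{"reactor": "10101"}, // placeholder tag
		map[string]interface{}{"value": 7.2},  // placeholder field
		time.Now(),
	)
	return writeAPI.WritePoint(context.Background(), p)
}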

@ -14,16 +14,11 @@ import (
)
// this package creates the central coordinator and sub coordinators for clients
// interfaces
// db client interface
type DB interface {
// getters (all create if they don't exist)
//GetToken() (string, error) // returns admin token (creates if it doesn't exist)
GetReactorClient(int) (string, string, string, string, error) // returns (url, bucket, org, token, err)
// delete
// DeleteReactorClient(string) error // removes client token but maintains bucket
// PurgeReactorClientData(string) error // permanently deletes all associated reactor data (token, bucket, etc.)
}
func NewDBAdmin(config *viper.Viper) (DB, error) {
@ -31,9 +26,10 @@ func NewDBAdmin(config *viper.Viper) (DB, error) {
}
type CentralCoordinator struct {
// main coordinator
ClientConnections *ClientPacket
*SubCoordinators
*SystemViewer
//*SystemViewer
DB
Config *viper.Viper
// from config
@ -47,18 +43,23 @@ type SubCoordinators struct {
}
func NewCentralCoordinator(config *viper.Viper, ch chan error) *CentralCoordinator {
c := &CentralCoordinator{Err: ch, Config: config}
if err := config.UnmarshalKey("server", c); err != nil {
// report error
// create a central coordinator to manage requests
db, err := NewDBAdmin(config)
if err != nil {
ch <- err
}
fmt.Printf("%+v\n", c)
c.SystemViewer = NewSystemViewer()
go c.SystemViewer.Start()
c := &CentralCoordinator{Err: ch, Config: config, DB: db}
// grab config settings
if err = config.UnmarshalKey("server", c); err != nil {
ch <- err
}
// spawn a systemviewer DECOMMISSIONED
//c.SystemViewer = NewSystemViewer()
//go c.SystemViewer.Start()
// subcoord map
s := make(map[string]*SubCoordinator)
sub := &SubCoordinators{Directory: s}
c.SubCoordinators = sub
c.SubCoordinators = &SubCoordinators{Directory: s}
// return init coordinator
return c
}
@ -66,7 +67,10 @@ func (c *CentralCoordinator) Start() {
// starts up associated funcs
clientChan := make(chan *ClientPacket)
l := NewListener(clientChan, c.Err)
// grabs lis port
c.Config.UnmarshalKey("server.ports", l)
// starts client listener routines
go l.Start()
go c.ClientListener(clientChan)
}
@ -74,36 +78,48 @@ func (c *CentralCoordinator) Start() {
func (c *CentralCoordinator) ClientListener(ch chan *ClientPacket) {
for client := range ch {
// basically loops until channel is closed
cr := c.ClientHandler(client.Client)
client.Response <- cr
fmt.Printf("Incoming client: +%v\n", client)
client.Response <- c.ClientHandler(client.Client) // respond with cred
}
}
func (c *CentralCoordinator) ClientHandler(cl *Client) *ClientResponse {
// look for sub coord
c.SubCoordinators.Lock()
defer c.SubCoordinators.Unlock()
subcoord, ok := c.SubCoordinators.Directory[cl.Type]
if !ok {
// Sub Coordinator does not exists
logging.Debug(logging.DSpawn, "CC0 01 Created %v Coordinator", cl.Type)
subcoord = NewSubCoordinator(cl.Type, c.SystemViewer, c.Err)
// Sub Coordinator does not exist, creating one
fmt.Printf("Cl type: %s, Port: %d\n", cl.Type, c.Ports[cl.Type])
subcoord = NewSubCoordinator(cl.Type, c.Ports[cl.Type], c.Err)
c.SubCoordinators.Directory[cl.Type] = subcoord
fmt.Printf("Creating subcord for %s on %d\n", cl.Type, c.Ports[cl.Type])
logging.Debug(logging.DSpawn, "CC0 01 Created %v Coordinator", cl.Type)
}
// unlocking
c.SubCoordinators.Unlock()
// starts sub coord with client credentials
fmt.Printf("Starting subcoord client handler\n")
go subcoord.ClientHandler(cl)
fmt.Printf("Getting db info\n")
// setting up client response
url, org, token, bucket, err := c.DB.GetReactorClient(int(cl.Id))
url, bucket, org, token, err := c.DB.GetReactorClient(cl.Id) // order matches the DBAdmin signature
fmt.Printf("Got URL: %s, Org: %s, Token: %s, Bucket: %s\n", url, org, token, bucket)
if err != nil {
c.Err <- err
}
cr := &ClientResponse{URL: url, Org: org, Token: token, Bucket: bucket, Port: c.Ports[cl.Type]}
return cr
// returning info
return &ClientResponse{URL: url, Org: org, Token: token, Bucket: bucket, Port: c.Ports[cl.Type]}
}
type ManagerInterface interface {
Start()
NewManager(*Client, *SystemViewer, chan error) GeneralManager
GetManager(uint32) (GeneralManager, bool)
AddManager(uint32, GeneralManager)
NewManager(*Client, chan error) GeneralManager
GetManager(int) (GeneralManager, bool)
AddManager(int, GeneralManager)
Register()
}
@ -116,22 +132,22 @@ type GeneralManager interface {
type SubCoordinator struct {
Port int // port that we set up gRPC endpoint on
ManagerInterface // embed an interface to create/manager managers
*SystemViewer
//*SystemViewer
Err chan error
}
type Managers struct {
Directory map[uint32]interface{} // support for either manager
sync.RWMutex // potential perf
Directory map[int]interface{} // support for either manager
sync.RWMutex // potential perf
}
// interface stuff
func NewSubCoordinator(clientType string, sys *SystemViewer, err chan error) *SubCoordinator {
c := &SubCoordinator{Err: err}
c.SystemViewer = sys
man, errs := NewCoordinatorType(clientType, err)
if errs != nil {
err <- errs
func NewSubCoordinator(clientType string, port int, errCh chan error) *SubCoordinator {
c := &SubCoordinator{Err: errCh}
//c.SystemViewer = sys
man, err := NewCoordinatorType(clientType, errCh)
if err != nil {
errCh <- err
}
c.ManagerInterface = man
go man.Start()
@ -141,15 +157,16 @@ func NewSubCoordinator(clientType string, sys *SystemViewer, err chan error) *Su
func (c *SubCoordinator) ClientHandler(cl *Client) {
// (creates and) notifies manager of client connection
c.UpdateManager(cl)
}
func (c *SubCoordinator) UpdateManager(cl *Client) {
// shouldn't happen all that often so should be fine to lock
fmt.Printf("Grabbing Manager\n")
m, exists := c.GetManager(cl.Id)
if !exists {
m = c.NewManager(cl, c.SystemViewer, c.Err)
fmt.Printf("Creating Manager\n")
m = c.NewManager(cl, c.Err)
m.UpdateClient(cl)
go c.AddManager(cl.Id, m)
go m.Start()
@ -157,13 +174,13 @@ func (c *SubCoordinator) UpdateManager(cl *Client) {
go m.UpdateClient(cl)
}
func (m *Managers) AddManager(id uint32, man GeneralManager) {
func (m *Managers) AddManager(id int, man GeneralManager) {
m.Lock()
defer m.Unlock()
m.Directory[id] = man
}
func (m *Managers) GetManager(id uint32) (GeneralManager, bool) {
func (m *Managers) GetManager(id int) (GeneralManager, bool) {
// just read-locks and returns
m.RLock()
defer m.RUnlock()
@ -176,17 +193,17 @@ func (m *Managers) GetManager(id uint32) (GeneralManager, bool) {
func NewCoordinatorType(clientType string, err chan error) (ManagerInterface, error) {
m := make(map[uint32]interface{})
m := make(map[int]interface{})
if clientType == "reactor" {
c := &reactorCoordinator{}
//m := make(map[uint32]*ReactorManager)
c.Managers = &Managers{Directory: m}
return c, nil
} else if clientType == "tui" {
c := &tuiCoordinator{}
c := &reactorCoordinator{}
//m := make(map[uint32]*TUIManager)
c.Managers = &Managers{Directory: m}
return c, nil
//c.Managers = &Managers{Directory: m}
return c, errors.New("TUI not implemented")
}
return &reactorCoordinator{}, errors.New("Unrecognized client type")
}
@ -202,9 +219,9 @@ func (r *reactorCoordinator) Start() {
logging.Debug(logging.DStart, "RCO 01 Starting!")
}
func (r *reactorCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan error) GeneralManager {
func (r *reactorCoordinator) NewManager(cl *Client, err chan error) GeneralManager {
logging.Debug(logging.DClient, "RCO 01 starting manager for %v client %v", cl.Type, cl.Id)
return NewReactorManager(cl, sys, err)
return NewReactorManager(cl, err)
}
func (r *reactorCoordinator) Register() {
@ -226,7 +243,7 @@ func (r *reactorCoordinator) Register() {
}
func (r *reactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) {
m, exists := r.GetManager(req.GetId())
m, exists := r.GetManager(int(req.GetId()))
if !exists {
return &pb.ReactorStatusResponse{}, errors.New("Manager doesn't exist for that client")
}
@ -238,6 +255,7 @@ func (r *reactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.R
}
// tui coordinator
/*
type tuiCoordinator struct {
*Managers // by embedding general struct we allow coordinator to still call general funcs
pb.UnimplementedManagementServer
@ -247,9 +265,9 @@ func (t *tuiCoordinator) Start() {
logging.Debug(logging.DStart, "TCO 01 Starting!")
}
func (t *tuiCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan error) GeneralManager {
func (t *tuiCoordinator) NewManager(cl *Client, err chan error) GeneralManager {
logging.Debug(logging.DClient, "TCO 01 starting manager for %v client %v", cl.Type, cl.Id)
return NewTUIManager(cl, sys, err)
return NewTUIManager(cl, err)
}
func (t *tuiCoordinator) Register() {
@ -267,12 +285,13 @@ func (t *tuiCoordinator) Register() {
pb.RegisterManagementServer(grpcServer, t)
go grpcServer.Serve(lis)
logging.Debug(logging.DClient, "TCO ready for client requests")
*/
*/
/*
}
func (t *tuiCoordinator) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) {
// grpc handler to fwd to manager
m, exists := t.GetManager(req.GetClientId())
m, exists := t.GetManager(int(req.GetClientId()))
if !exists {
// doesnt exist for some reason
return &pb.GetDevicesResponse{}, errors.New("Manager doesn't exist for client")
@ -294,3 +313,4 @@ func (t *tuiCoordinator) DeleteReactorDevice(ctx context.Context, req *pb.Delete
// TODO
return &pb.DeleteReactorDeviceResponse{}, nil
}
*/

@ -13,6 +13,7 @@ import (
/*
Listens on a supplied port and sends incoming clients over a supplied channel
Waits for a response on that channel to send back to the client with DB credentials
*/
type Listener struct { // exporting for easy use in the short term
@ -28,10 +29,9 @@ type ClientPacket struct {
}
type Client struct {
// general client struct to store reqs from reactors/tui
Ip string
Port int
Id uint32
Id int
Model string
Type string
}
@ -71,15 +71,16 @@ func (l *Listener) Register() error {
}
func (l *Listener) ClientDiscoveryHandler(ctx context.Context, ping *pb.ClientRequest) (*pb.ClientResponse, error) {
// incoming reactor ping need to spawn coord
c := &Client{Id: ping.GetClientId(), Type: ping.GetClientType()}
// incoming client ping, notify coord and wait for DB credentials to respond
c := &Client{Id: int(ping.GetClientId()), Type: ping.GetClientType()}
logging.Debug(logging.DClient, "LIS %v %v has connected\n", c.Type, c.Id)
// prepare packet to send to coordinator
ch := make(chan *ClientResponse)
p := &ClientPacket{Response: ch}
p.Client = c
p := &ClientPacket{Client: c, Response: ch}
// blocking
l.ClientConnections <- p
resp := <-ch
// return the port for the incoming requests
// prepare object to return to client
db := &pb.Database{URL: resp.URL, ORG: resp.Org, Token: resp.Token, Bucket: resp.Bucket}
return &pb.ClientResponse{ClientId: c.Id, ServerPort: uint32(resp.Port), Database: db}, nil
return &pb.ClientResponse{ClientId: uint32(c.Id), ServerPort: uint32(resp.Port), Database: db}, nil
}
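Stripped of gRPC, the handshake the Listener doc comment describes is just a request/response pair over the ClientPacket channel. A bare-bones sketch within this package; the id and credential values are placeholders:

func handshakeSketch() {
	connections := make(chan *ClientPacket)
	// coordinator side: answer every packet with DB credentials
	go func() {
		for p := range connections {
			p.Response <- &ClientResponse{URL: "http://localhost:8086", Org: "frms", Port: 2023}
		}
	}()
	// listener side: wrap the incoming client, send it, block for the reply
	c := &Client{Id: 10101, Type: "reactor"}
	p := &ClientPacket{Client: c, Response: make(chan *ClientResponse)}
	connections <- p
	resp := <-p.Response
	_ = resp // packed into pb.ClientResponse / pb.Database in the real handler
}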

@ -1,108 +1,109 @@
package server
import (
//"log"
"time"
"math"
"sync"
"errors"
_ "context"
"FRMS/internal/pkg/logging"
//"log"
"FRMS/internal/pkg/logging"
_ "context"
"errors"
"math"
"sync"
"time"
)
// this package will implement a boilerplate manager
// manager connects to client on start and returns the gRPC connection to make gRPC clients
type Manager struct {
*Client // gives access to c.Ip c.Id etc
Hb time.Duration // used for managing hb timer for client
Active active
Sig chan bool
Err chan error
*Client // gives access to c.Ip c.Id etc
Hb time.Duration // used for managing hb timer for client
Active active
Sig chan bool
Err chan error
}
type active struct{
sync.Mutex
bool
int
type active struct {
sync.Mutex
bool
int
}
func NewManager(err chan error) *Manager {
hb := time.Duration(1 * time.Second) //hb to
m := &Manager{Hb:hb,Err:err}
return m
hb := time.Duration(5 * time.Second) //hb to
m := &Manager{Hb: hb, Err: err}
return m
}
func (m *Manager) Start() {
if !m.Activate() {
// manager already running
m.Err <-errors.New("Manager already running!")
} // if we get here, manager is atomically activated and we can ensure start wont run again
if !m.Activate() {
// manager already running
m.Err <- errors.New("Manager already running!")
} // if we get here, manager is atomically activated and we can ensure start wont run again
}
func (m *Manager) Exit() {
// exit function to eventually allow saving to configs
if !m.Deactivate() {
m.Err <-errors.New("Manager already disabled!")
}
// exit function to eventually allow saving to configs
if !m.Deactivate() {
m.Err <- errors.New("Manager already disabled!")
}
}
func (m *Manager) UpdateClient(cl *Client) {
logging.Debug(logging.DClient,"MAN Updating client %v",cl.Id)
m.Client = cl
logging.Debug(logging.DClient, "MAN Updating client %v", cl.Id)
m.Client = cl
}
// reactor manager atomic operations
func (m *Manager) IsActive() bool {
m.Active.Lock()
defer m.Active.Unlock()
return m.Active.bool
m.Active.Lock()
defer m.Active.Unlock()
return m.Active.bool
}
func (m *Manager) Activate() bool {
// slightly confusing but returns result of trying to activate
m.Active.Lock()
defer m.Active.Unlock()
alive := m.Active.bool
if alive {
return false
} else {
m.Active.bool = true
m.Active.int = 0
return m.Active.bool
}
// slightly confusing but returns result of trying to activate
m.Active.Lock()
defer m.Active.Unlock()
alive := m.Active.bool
if alive {
return false
} else {
m.Active.bool = true
m.Active.int = 0
return m.Active.bool
}
}
func (m *Manager) Deactivate() bool {
// result of trying to deactivate
m.Active.Lock()
defer m.Active.Unlock()
alive := m.Active.bool
if alive {
m.Active.bool = false
return true
} else {
return m.Active.bool
}
// result of trying to deactivate
m.Active.Lock()
defer m.Active.Unlock()
alive := m.Active.bool
if alive {
m.Active.bool = false
return true
} else {
return m.Active.bool
}
}
// connection stuff
func (m *Manager) Timeout() int {
// keeps track of and generates timeout [0-1.2s) over span of ~2.5s
// returns 0 on TO elapse
m.Active.Lock()
defer m.Active.Unlock()
if m.Active.int < 9 {
v := int(5 * math.Pow(float64(2), float64(m.Active.int)))
m.Active.int += 1
return v
} else {
// exceeded retries
return 0
}
// exponential backoff: 5·2^n ms per retry, 5 ms up to 1.28 s (~2.5 s total)
// returns 0 once retries are exhausted
m.Active.Lock()
defer m.Active.Unlock()
if m.Active.int < 9 {
v := int(5 * math.Pow(float64(2), float64(m.Active.int)))
m.Active.int += 1
return v
} else {
// exceeded retries
return 0
}
}
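The retry schedule Timeout produces works out to 5·2^n ms for n = 0..8, i.e. 5 ms doubling up to 1280 ms, which sums to 2555 ms (~2.5 s) before it gives up and returns 0. A quick sanity check:

func timeoutScheduleCheck() int {
	total := 0
	for n := 0; n < 9; n++ {
		total += int(5 * math.Pow(2, float64(n))) // 5, 10, 20, ..., 1280
	}
	return total // 2555 ms
}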
/*
shouldn't be necessary anymore

@ -1,149 +1,153 @@
package server
import (
"fmt"
"time"
_ "log"
"context"
"sync"
"FRMS/internal/pkg/logging"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
"google.golang.org/grpc/credentials/insecure"
pb "FRMS/internal/pkg/grpc"
pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/logging"
"context"
"fmt"
_ "log"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
)
// this package will implement a reactor coordinator and associated go routines
// this package will implement a reactor manager and associated go routines
type ReactorManager struct {
*Manager
StatusMon *StatusMonitor
*devstatus
*Manager
// StatusMon *StatusMonitor putting on pause
*devstatus
}
type devstatus struct {
sync.Mutex
Devs map[uint32]*DeviceInfo
// keeping this around but not using it to create status for status mon
sync.Mutex
Devs map[int]*DeviceInfo
}
func NewReactorManager(c *Client,sys *SystemViewer,err chan error) GeneralManager {
r := &ReactorManager{}
di := make(map[uint32]*DeviceInfo)
r.devstatus = &devstatus{Devs:di}
r.Manager = NewManager(err)
r.StatusMon = NewStatusMonitor("Reactor",c.Id,sys)
return r
func NewReactorManager(c *Client, err chan error) GeneralManager {
r := &ReactorManager{}
di := make(map[int]*DeviceInfo)
r.devstatus = &devstatus{Devs: di}
r.Manager = NewManager(err)
//r.StatusMon = NewStatusMonitor("Reactor", c.Id, sys)
return r
}
func (r *ReactorManager) Start() {
r.Manager.Start()
logging.Debug(logging.DStart,"RMA %v starting", r.Id)
go r.StatusMon.Send(&DeviceInfo{Id:r.Id,Type:"Reactor",Status:"[green]ONLINE[white]"},"Reactor")
//conn := r.Connect()
//empty := &grpc.ClientConn{}
//if conn != empty {
//}
r.Manager.Start()
logging.Debug(logging.DStart, "RMA %v starting", r.Id)
//go r.StatusMon.Send(&DeviceInfo{Id: r.Id, Type: "Reactor", Status: "[green]ONLINE[white]"}, "Reactor")
//conn := r.Connect()
//empty := &grpc.ClientConn{}
//if conn != empty {
//}
}
func (r *ReactorManager) Exit() {
r.Manager.Exit()
logging.Debug(logging.DExit, "RMA %v exiting", r.Id)
go r.StatusMon.Send(&DeviceInfo{Id:r.Id,Type:"Reactor",Status:"[red]OFFLINE[white]",Data:fmt.Sprintf("Last Seen %v",time.Now().Format("Mon at 03:04:05pm MST"))},"Reactor")
r.devstatus.Lock()
defer r.devstatus.Unlock()
for _, d := range r.Devs {
newd := d
newd.Status = "[yellow]UNKOWN[white]"
r.Devs[newd.Id] = newd
go r.StatusMon.Send(newd,"Device")
}
r.Manager.Exit()
logging.Debug(logging.DExit, "RMA %v exiting", r.Id)
//go r.StatusMon.Send(&DeviceInfo{Id: r.Id, Type: "Reactor", Status: "[red]OFFLINE[white]", Data: fmt.Sprintf("Last Seen %v", time.Now().Format("Mon at 03:04:05pm MST"))}, "Reactor")
r.devstatus.Lock()
defer r.devstatus.Unlock()
// keeping this because it **COULD** be useful, maybe
for _, d := range r.Devs {
newd := d
newd.Status = "UNKOWN"
r.Devs[newd.Id] = newd
//go r.StatusMon.Send(newd, "Device")
}
}
func (r *ReactorManager) Connect() *grpc.ClientConn {
// establish gRPC conection with reactor
var opts []grpc.DialOption
var conn *grpc.ClientConn
opts = append(opts,grpc.WithTransportCredentials(insecure.NewCredentials()))
// establish gRPC connection with reactor
// this seems pretty stupid, seems like the reactor should communicate up the chain to avoid unnecessary comms.
var opts []grpc.DialOption
var conn *grpc.ClientConn
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
for {
if !r.IsActive() {
logging.Debug(logging.DClient,"RMA %v No longer active, aborting connection attempt",r.Id)
return &grpc.ClientConn{}
}
var err error
conn, err = grpc.Dial(fmt.Sprintf("%v:%v",r.Ip,r.Port),opts...)
// error handling
code := status.Code(err)
if code != 0 { // != OK
if code == (5 | 14) { // unavailable or not found
to := r.Timeout()
if to == 0 {
logging.Debug(logging.DClient,"RMA %v Client not responding",r.Id)
return &grpc.ClientConn{}
}
logging.Debug(logging.DClient,"RMA %v Client currently down, retrying in %v ms",r.Id, to)
time.Sleep(time.Duration(to) * time.Millisecond)
for {
if !r.IsActive() {
logging.Debug(logging.DClient, "RMA %v No longer active, aborting connection attempt", r.Id)
return &grpc.ClientConn{}
}
var err error
conn, err = grpc.Dial(fmt.Sprintf("%v:%v", r.Ip, r.Port), opts...)
// error handling
code := status.Code(err)
if code != codes.OK {
if code == codes.Unavailable || code == codes.NotFound {
to := r.Timeout()
if to == 0 {
logging.Debug(logging.DClient, "RMA %v Client not responding", r.Id)
return &grpc.ClientConn{}
}
logging.Debug(logging.DClient, "RMA %v Client currently down, retrying in %v ms", r.Id, to)
time.Sleep(time.Duration(to) * time.Millisecond)
} else {
logging.Debug(logging.DError,"RMA %v GRPC ERROR: %v",r.Id, code)
r.Err <- err
}
}
break;
}
return conn
} else {
logging.Debug(logging.DError, "RMA %v GRPC ERROR: %v", r.Id, code)
r.Err <- err
}
}
break
}
return conn
}
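Worth noting: grpc.Dial is non-blocking by default, so the status-code check above will almost never see a down reactor; the dial only errors immediately for bad targets or options. If the retry loop should really gate on connectivity, one option is a blocking dial with a per-attempt deadline, sketched here (the 2-second timeout is arbitrary):

// sketch: blocking dial so connection failures surface to the backoff logic
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, fmt.Sprintf("%v:%v", r.Ip, r.Port),
	grpc.WithTransportCredentials(insecure.NewCredentials()),
	grpc.WithBlock(),
)
if err != nil {
	// treat as "client down" and fall back to r.Timeout() like above
}
_ = conn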
func (r *ReactorManager) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) {
// function client will call to update reactor information
//go r.PingReset()
for _, dev := range req.GetDevices() {
d := &DeviceInfo{Id:uint32(dev.GetAddr()),Type:dev.GetType(),Status:dev.GetStatus(),Data:dev.GetData()}
go r.UpdateDevice(d)
}
return &pb.ReactorStatusResponse{Id:r.Id}, nil
// function client will call to update reactor information
//go r.PingReset()
for _, dev := range req.GetDevices() {
d := &DeviceInfo{Id: int(dev.GetAddr()), Type: dev.GetType(), Status: dev.GetStatus(), Data: dev.GetData()}
go r.UpdateDevice(d)
}
return &pb.ReactorStatusResponse{Id: uint32(r.Id)}, nil
}
/*
func (r *ReactorManager) Monitor(conn *grpc.ClientConn) {
defer conn.Close()
client := pb.NewMonitoringClient(conn)
for r.IsActive() {
req := &pb.ReactorStatusRequest{Id:r.Id}
resp, err := client.GetReactorStatus(context.Background(),req)
code := status.Code(err)
if code != 0 { // if != OK
logging.Debug(logging.DClient,"RMA %v Reactor not responding! Code: %v\n", r.Id,code)
r.devstatus.Lock()
for _, d := range r.Devs {
newd := d
newd.Status = "[yellow]UNKOWN[white]"
r.Devs[newd.Id] = newd
go r.StatusMon.Send(newd,"Device")
}
r.devstatus.Unlock()
r.Exit()
break;
}
for _,v := range resp.GetDevices() {
d := &DeviceInfo{Id:uint32(v.GetAddr()),Type:v.GetType(),Status:v.GetStatus(),Data:v.GetData()}
go r.UpdateDevice(d)
}
time.Sleep(r.Hb) // time between sensor pings
}
}
func (r *ReactorManager) Monitor(conn *grpc.ClientConn) {
defer conn.Close()
client := pb.NewMonitoringClient(conn)
for r.IsActive() {
req := &pb.ReactorStatusRequest{Id:r.Id}
resp, err := client.GetReactorStatus(context.Background(),req)
code := status.Code(err)
if code != 0 { // if != OK
logging.Debug(logging.DClient,"RMA %v Reactor not responding! Code: %v\n", r.Id,code)
r.devstatus.Lock()
for _, d := range r.Devs {
newd := d
newd.Status = "[yellow]UNKOWN[white]"
r.Devs[newd.Id] = newd
go r.StatusMon.Send(newd,"Device")
}
r.devstatus.Unlock()
r.Exit()
break;
}
for _,v := range resp.GetDevices() {
d := &DeviceInfo{Id:uint32(v.GetAddr()),Type:v.GetType(),Status:v.GetStatus(),Data:v.GetData()}
go r.UpdateDevice(d)
}
time.Sleep(r.Hb) // time between sensor pings
}
}
*/
func (r *ReactorManager) UpdateDevice(d *DeviceInfo) {
r.devstatus.Lock()
defer r.devstatus.Unlock()
if olddev, ok := r.Devs[d.Id]; !ok {
// new device
r.Devs[d.Id] = d
go r.StatusMon.Send(d,"Device")
} else if olddev.Status != d.Status || olddev.Data != d.Data {
// dev status or data has changed
r.Devs[d.Id] = d
go r.StatusMon.Send(d,"Device")
}
r.devstatus.Lock()
defer r.devstatus.Unlock()
if olddev, ok := r.Devs[d.Id]; !ok {
// new device
r.Devs[d.Id] = d
//go r.StatusMon.Send(d, "Device")
} else if olddev.Status != d.Status || olddev.Data != d.Data {
// dev status or data has changed
r.Devs[d.Id] = d
//go r.StatusMon.Send(d, "Device")
}
}

@ -1,315 +1,318 @@
package server
import (
"sync"
_ "fmt"
"FRMS/internal/pkg/logging"
_ "fmt"
)
// allows for multiple readers/writers
type DeviceInfo struct {
Id uint32
Type string
Status string
Data string
Index uint32
TransactionId uint32
Id int
Type string
Status string
Data string
Index int
TransactionId uint32
}
/*
type StatusMonitor struct {
// allows for embedding into managers
TransactionId chan uint32 // monotonically increases to track outdated reqs
DevChan chan *DeviceInfo // channel for device status
ReactorChan chan *DeviceInfo // channel for reactor status
*SystemViewer
DevBuf *devbuf
sync.Mutex
// allows for embedding into managers
TransactionId chan uint32 // monotonically increases to track outdated reqs
DevChan chan *DeviceInfo // channel for device status
ReactorChan chan *DeviceInfo // channel for reactor status
*SystemViewer
DevBuf *devbuf
sync.Mutex
}
type devbuf struct {
ReactorId uint32 // reactor we are looking at, if any
Buf map[string]map[uint32]*DeviceInfo // convienent way to store/seperate device data
sync.Mutex
}
func NewBuffer() map[string]map[uint32]*DeviceInfo {
rbuf := make(map[uint32]*DeviceInfo)
dbuf := make(map[uint32]*DeviceInfo)
sbuf := make(map[string]map[uint32]*DeviceInfo)
sbuf["Reactor"] = rbuf
sbuf["Device"] = dbuf
return sbuf
}
func NewStatusMonitor(t string, id uint32, sys *SystemViewer) *StatusMonitor {
tid := make(chan uint32)
sm := &StatusMonitor{TransactionId:tid}
sm.SystemViewer = sys
logging.Debug(logging.DClient,"SYS Creating new status monitor")
if t == "Reactor" {
// reactor status monitor
sm.ReactorChan = sys.AddReactorSender()
sm.DevChan = sys.AddDeviceSender(id)
go sm.GenerateIds()
} else {
// tui status monitor
sbuf := NewBuffer()
//sm.ReactorChan, sbuf["Reactor"] = sys.AddListener(id,0)
sm.DevBuf = &devbuf{Buf:sbuf} // makes it easier to work with
go sm.UpdateListener(id,0)
}
return sm
ReactorId int // reactor we are looking at, if any
Buf map[string]map[int]*DeviceInfo // convenient way to store/separate device data
sync.Mutex
}
func NewBuffer() map[string]map[int]*DeviceInfo {
rbuf := make(map[int]*DeviceInfo)
dbuf := make(map[int]*DeviceInfo)
sbuf := make(map[string]map[int]*DeviceInfo)
sbuf["Reactor"] = rbuf
sbuf["Device"] = dbuf
return sbuf
}
func NewStatusMonitor(t string, id int, sys *SystemViewer) *StatusMonitor {
tid := make(chan uint32)
sm := &StatusMonitor{TransactionId: tid}
sm.SystemViewer = sys
logging.Debug(logging.DClient, "SYS Creating new status monitor")
if t == "Reactor" {
// reactor status monitor
sm.ReactorChan = sys.AddReactorSender()
sm.DevChan = sys.AddDeviceSender(id)
go sm.GenerateIds()
} else {
// tui status monitor
sbuf := NewBuffer()
//sm.ReactorChan, sbuf["Reactor"] = sys.AddListener(id,0)
sm.DevBuf = &devbuf{Buf: sbuf} // makes it easier to work with
go sm.UpdateListener(id, 0)
}
return sm
}
func (s *StatusMonitor) GenerateIds() {
var id uint32
id = 0
for {
s.TransactionId <-id
id += 1
}
var id uint32
id = 0
for {
s.TransactionId <- id
id += 1
}
}
func (s *StatusMonitor) Send(d *DeviceInfo, dtype string) {
d.TransactionId = <-s.TransactionId
logging.Debug(logging.DClient,"SYS 01 Sending update for: %s (%s)", d.Type, d.Status)
if dtype == "Reactor" {
s.ReactorChan <-d
} else {
s.DevChan <-d
}
d.TransactionId = <-s.TransactionId
logging.Debug(logging.DClient, "SYS 01 Sending update for: %s (%s)", d.Type, d.Status)
if dtype == "Reactor" {
s.ReactorChan <- d
} else {
s.DevChan <- d
}
}
func (s *StatusMonitor) GetBuffer() []*DeviceInfo {
// also clears buffer
s.DevBuf.Lock()
defer s.DevBuf.Unlock()
res := []*DeviceInfo{}
if len(s.DevBuf.Buf["Reactor"]) != 0 || len(s.DevBuf.Buf["Device"]) != 0 {
logging.Debug(logging.DClient,"Clearing buff %v", s.DevBuf.Buf)
}
for _, devs := range s.DevBuf.Buf {
for _, dev := range devs {
// loops over reactors then devices
res = append(res,dev)
}
}
s.DevBuf.Buf = NewBuffer() // clearing old buffer
return res
// also clears buffer
s.DevBuf.Lock()
defer s.DevBuf.Unlock()
res := []*DeviceInfo{}
if len(s.DevBuf.Buf["Reactor"]) != 0 || len(s.DevBuf.Buf["Device"]) != 0 {
logging.Debug(logging.DClient, "Clearing buff %v", s.DevBuf.Buf)
}
for _, devs := range s.DevBuf.Buf {
for _, dev := range devs {
// loops over reactors then devices
res = append(res, dev)
}
}
s.DevBuf.Buf = NewBuffer() // clearing old buffer
return res
}
func (s *StatusMonitor) UpdateListener(tid, reactorId uint32) {
s.DevBuf.Lock()
defer s.DevBuf.Unlock()
// clearing proper buffer
if reactorId == 0 {
logging.Debug(logging.DClient,"SYS 01 Adding %v as reactor listener", tid)
s.DevBuf.Buf["Reactor"] = make(map[uint32]*DeviceInfo)
s.ReactorChan, s.DevBuf.Buf["Reactor"] = s.SystemViewer.AddListener(tid, reactorId)
go s.Listen(s.ReactorChan)
} else {
logging.Debug(logging.DClient,"SYS 01 Adding %v as reactor %v listener", tid, reactorId)
s.DevBuf.Buf["Device"] = make(map[uint32]*DeviceInfo) // clearing old devices
if s.DevBuf.ReactorId != reactorId && s.DevBuf.ReactorId != 0{
go s.SystemViewer.RemoveListener(s.DevBuf.ReactorId, tid)
}
s.DevBuf.ReactorId = reactorId
s.DevChan, s.DevBuf.Buf["Device"] = s.SystemViewer.AddListener(tid, reactorId)
go s.Listen(s.DevChan)
}
s.DevBuf.Lock()
defer s.DevBuf.Unlock()
// clearing proper buffer
if reactorId == 0 {
logging.Debug(logging.DClient, "SYS 01 Adding %v as reactor listener", tid)
s.DevBuf.Buf["Reactor"] = make(map[uint32]*DeviceInfo)
s.ReactorChan, s.DevBuf.Buf["Reactor"] = s.SystemViewer.AddListener(tid, reactorId)
go s.Listen(s.ReactorChan)
} else {
logging.Debug(logging.DClient, "SYS 01 Adding %v as reactor %v listener", tid, reactorId)
s.DevBuf.Buf["Device"] = make(map[uint32]*DeviceInfo) // clearing old devices
if s.DevBuf.ReactorId != reactorId && s.DevBuf.ReactorId != 0 {
go s.SystemViewer.RemoveListener(s.DevBuf.ReactorId, tid)
}
s.DevBuf.ReactorId = reactorId
s.DevChan, s.DevBuf.Buf["Device"] = s.SystemViewer.AddListener(tid, reactorId)
go s.Listen(s.DevChan)
}
}
func (s *StatusMonitor) UpdateBuffer(d *DeviceInfo, dtype string, ch chan *DeviceInfo) {
s.DevBuf.Lock()
defer s.DevBuf.Unlock()
logging.Debug(logging.DClient,"SYS 01 Dev %v update requested", d)
if dev, exists := s.DevBuf.Buf[dtype][d.Id]; exists {
// already a device in the buffer
if dev.TransactionId > d.TransactionId {
logging.Debug(logging.DClient,"SYS 01 Update Processed. Old: %v, New: %v \n", dev, d)
d = dev // not sure if i can do this lol
}
}
if ch == s.ReactorChan || ch == s.DevChan {
// hacky way to check if the device came from a listener of a current channel
s.DevBuf.Buf[dtype][d.Id] = d
} else {
logging.Debug(logging.DClient,"Dev out of date!")
}
s.DevBuf.Lock()
defer s.DevBuf.Unlock()
logging.Debug(logging.DClient, "SYS 01 Dev %v update requested", d)
if dev, exists := s.DevBuf.Buf[dtype][d.Id]; exists {
// already a device in the buffer
if dev.TransactionId > d.TransactionId {
logging.Debug(logging.DClient, "SYS 01 Update Processed. Old: %v, New: %v \n", dev, d)
d = dev // not sure if i can do this lol
}
}
if ch == s.ReactorChan || ch == s.DevChan {
// hacky way to check if the device came from a listener of a current channel
s.DevBuf.Buf[dtype][d.Id] = d
} else {
logging.Debug(logging.DClient, "Dev out of date!")
}
}
func (s *StatusMonitor) Listen(ch chan *DeviceInfo) {
for dev := range ch {
if dev.Type == "Reactor" {
go s.UpdateBuffer(dev,"Reactor", ch)
} else {
go s.UpdateBuffer(dev, "Device", ch)
}
}
for dev := range ch {
if dev.Type == "Reactor" {
go s.UpdateBuffer(dev, "Reactor", ch)
} else {
go s.UpdateBuffer(dev, "Device", ch)
}
}
}
type InfoStream struct {
// base stream for any reactor/device
// NewSender embedds the channel to send updates on
// NewListener will add the statusmon to the list of devs to echo to
Stream chan *DeviceInfo
Layout *syslayout
*listeners
// base stream for any reactor/device
// NewSender embeds the channel to send updates on
// NewListener will add the statusmon to the list of devs to echo to
Stream chan *DeviceInfo
Layout *syslayout
*listeners
}
type listeners struct {
sync.RWMutex
Listeners map[uint32]*lischan
sync.RWMutex
Listeners map[uint32]*lischan
}
type lischan struct {
sync.WaitGroup
StatusChan chan *DeviceInfo
sync.WaitGroup
StatusChan chan *DeviceInfo
}
type syslayout struct {
Devs map[uint32]*DeviceInfo
uint32 //index
sync.RWMutex
Devs map[uint32]*DeviceInfo
uint32 //index
sync.RWMutex
}
func NewLisChan(ch chan *DeviceInfo) *lischan {
l := &lischan{StatusChan:ch}
return l
l := &lischan{StatusChan: ch}
return l
}
func NewInfoStream() *InfoStream {
dch := make(chan *DeviceInfo)
s := &InfoStream{Stream:dch}
m := make(map[uint32]*DeviceInfo)
s.Layout = &syslayout{Devs:m}
s.listeners = &listeners{Listeners:make(map[uint32]*lischan)}
return s
dch := make(chan *DeviceInfo)
s := &InfoStream{Stream: dch}
m := make(map[uint32]*DeviceInfo)
s.Layout = &syslayout{Devs: m}
s.listeners = &listeners{Listeners: make(map[uint32]*lischan)}
return s
}
func (s *InfoStream) Start() {
// consistency
go s.Listen()
// consistency
go s.Listen()
}
// goal is to hook every new manager into the reactor status chan
func (s *InfoStream) AddSender() chan *DeviceInfo {
return s.Stream
return s.Stream
}
func (s *InfoStream) Listen() {
for deviceInfo := range s.Stream {
go s.Update(deviceInfo)
}
for deviceInfo := range s.Stream {
go s.Update(deviceInfo)
}
}
func (s *InfoStream) Update(d *DeviceInfo) {
s.Layout.Lock()
defer s.Layout.Unlock()
if dev, ok := s.Layout.Devs[d.Id]; !ok {
d.Index = s.Layout.uint32
s.Layout.uint32 += 1
} else {
d.Index = dev.Index
}
s.Layout.Devs[d.Id] = d
go s.Echo(d)
s.Layout.Lock()
defer s.Layout.Unlock()
if dev, ok := s.Layout.Devs[d.Id]; !ok {
d.Index = s.Layout.uint32
s.Layout.uint32 += 1
} else {
d.Index = dev.Index
}
s.Layout.Devs[d.Id] = d
go s.Echo(d)
}
func (l *listeners) Echo(d *DeviceInfo) {
l.RLock()
defer l.RUnlock()
// read only lock
for _, lis := range l.Listeners {
lis.Add(1)
go func(listener *lischan, dev *DeviceInfo){
defer listener.Done()
listener.StatusChan <-dev
}(lis,d)
}
}
func (s *InfoStream) AddListener(id uint32, ch chan *DeviceInfo) map[uint32]*DeviceInfo {
// if i get a memory leak ill eat my shoe
s.listeners.Lock()
defer s.listeners.Unlock()
if _, ok := s.listeners.Listeners[id]; ok {
// exists
go s.RemoveListener(id)
}
s.listeners.Listeners[id] = NewLisChan(ch)
logging.Debug(logging.DClient,"SYS 01 Getting Devices")
//s.Layout.RLock()
//defer s.Layout.RUnlock()
logging.Debug(logging.DClient,"SYS 01 Returning Devices %v", s.Layout.Devs)
return s.Layout.Devs
}
func (l *listeners) RemoveListener(id uint32) {
l.Lock()
defer l.Unlock()
if lis, ok := l.Listeners[id]; ok {
delete(l.Listeners,id)
go func(ls *lischan){
ls.Wait()
close(ls.StatusChan)
}(lis)
}
l.RLock()
defer l.RUnlock()
// read only lock
for _, lis := range l.Listeners {
lis.Add(1)
go func(listener *lischan, dev *DeviceInfo) {
defer listener.Done()
listener.StatusChan <- dev
}(lis, d)
}
}
func (s *InfoStream) AddListener(id int, ch chan *DeviceInfo) map[uint32]*DeviceInfo {
// if i get a memory leak ill eat my shoe
s.listeners.Lock()
defer s.listeners.Unlock()
if _, ok := s.listeners.Listeners[id]; ok {
// exists
go s.RemoveListener(id)
}
s.listeners.Listeners[id] = NewLisChan(ch)
logging.Debug(logging.DClient, "SYS 01 Getting Devices")
//s.Layout.RLock()
//defer s.Layout.RUnlock()
logging.Debug(logging.DClient, "SYS 01 Returning Devices %v", s.Layout.Devs)
return s.Layout.Devs
}
func (l *listeners) RemoveListener(id int) {
l.Lock()
defer l.Unlock()
if lis, ok := l.Listeners[id]; ok {
delete(l.Listeners, id)
go func(ls *lischan) {
ls.Wait()
close(ls.StatusChan)
}(lis)
}
}
// status buffer maintenance
type SystemViewer struct {
// stores system directory and provide methods to be embedded in managers
ReactorStream *InfoStream // can add itself as a listener to provide methods
DeviceStream *ds
// stores system directory and provide methods to be embedded in managers
ReactorStream *InfoStream // can add itself as a listener to provide methods
DeviceStream *ds
}
type ds struct {
Reactors map[uint32]*InfoStream //map from reactor id to its device info stream
sync.Mutex
Reactors map[uint32]*InfoStream //map from reactor id to its device info stream
sync.Mutex
}
func NewSystemViewer() *SystemViewer {
rs := NewInfoStream()
s := &SystemViewer{ReactorStream:rs}
m := make(map[uint32]*InfoStream)
s.DeviceStream = &ds{Reactors:m}
return s
rs := NewInfoStream()
s := &SystemViewer{ReactorStream: rs}
m := make(map[uint32]*InfoStream)
s.DeviceStream = &ds{Reactors: m}
return s
}
func (s *SystemViewer) Start() {
go s.ReactorStream.Start()
go s.ReactorStream.Start()
}
func (s *SystemViewer) AddReactorSender() chan *DeviceInfo {
return s.ReactorStream.AddSender()
return s.ReactorStream.AddSender()
}
func (s *SystemViewer) AddDeviceSender(reactorId uint32) chan *DeviceInfo {
s.DeviceStream.Lock()
defer s.DeviceStream.Unlock()
var ds *InfoStream
var ok bool
if ds, ok = s.DeviceStream.Reactors[reactorId]; !ok {
ds = NewInfoStream()
s.DeviceStream.Reactors[reactorId] = ds
go ds.Start()
}
return ds.AddSender()
}
func (s *SystemViewer) AddListener(id, rid uint32) (chan *DeviceInfo, map[uint32]*DeviceInfo) {
// returns a listener for that chan
ch := make(chan *DeviceInfo)
if rid != 0 {
return ch, s.DeviceStream.Reactors[rid].AddListener(id, ch)
} else {
return ch, s.ReactorStream.AddListener(id, ch)
}
}
func (s *SystemViewer) RemoveListener(rid, tid uint32) {
// removes chan for specific tid and rid
s.DeviceStream.Lock()
defer s.DeviceStream.Unlock()
go s.DeviceStream.Reactors[rid].RemoveListener(tid)
}
s.DeviceStream.Lock()
defer s.DeviceStream.Unlock()
var ds *InfoStream
var ok bool
if ds, ok = s.DeviceStream.Reactors[reactorId]; !ok {
ds = NewInfoStream()
s.DeviceStream.Reactors[reactorId] = ds
go ds.Start()
}
return ds.AddSender()
}
func (s *SystemViewer) AddListener(id, rid int) (chan *DeviceInfo, map[uint32]*DeviceInfo) {
// returns a listener for that chan
ch := make(chan *DeviceInfo)
if rid != 0 {
return ch, s.DeviceStream.Reactors[rid].AddListener(id, ch)
} else {
return ch, s.ReactorStream.AddListener(id, ch)
}
}
func (s *SystemViewer) RemoveListener(rid, tid int) {
// removes chan for specific tid and rid
s.DeviceStream.Lock()
defer s.DeviceStream.Unlock()
go s.DeviceStream.Reactors[rid].RemoveListener(tid)
}
*/

@ -1,5 +1,7 @@
package server
/*
import (
// "fmt"
"time"
@ -33,7 +35,7 @@ func NewTUIManager(c *Client, sys *SystemViewer, err chan error) GeneralManager
m := NewManager(err)
t := &TUIManager{Err: err}
alert := make(chan bool)
t.Timeout = &Timeout{Alert:alert,TO:time.Duration(2500*time.Millisecond)} // short time outs are fine because we will just rejoin
t.Timeout = &Timeout{Alert:alert,TO:time.Duration(2500*time.Millisecond)} // short time outs are fine because we will just rejoin
t.Manager = m
t.StatusMon = NewStatusMonitor("TUI",c.Id,sys)
t.Manager.UpdateClient(c)
@ -118,4 +120,4 @@ func (t *TUIManager) DeleteReactorDevice(ctx context.Context, req *pb.DeleteReac
//
return &pb.DeleteReactorDeviceResponse{}, nil
}
*/

@ -34,3 +34,7 @@
#### 12/06 TODO
- I think I can completely remove the old config way and just pass the viper object directly. It's not worth the hassle of trying to keep track of a million interfaces.
#### 12/07 TODO
- I concede: I will just remove flags, since most people will never use them anyway, and rely on env vars and config files instead. To hell with the flags.
- I am ripping out all of the TUI and status manager stuff; it's convoluted and harder than just pulling info from the database.
- I can eventually rework the TUI to pull from the DB, which is fine; there will never be that many clients anyway, and a lot of the calls are one-time requests with refreshes that aren't that slow. A sketch of that query path is below.
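A rough sketch of what "pull from DB" could look like for the TUI, using the influxdb-client-go v2 query API; the bucket and measurement names are placeholders:

package main

import (
	"context"
	"fmt"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

// latestStatus queries the last status value per device instead of
// streaming updates through a status monitor.
func latestStatus(url, org, token string) error {
	client := influxdb2.NewClient(url, token)
	defer client.Close()
	queryAPI := client.QueryAPI(org)
	flux := `from(bucket: "reactor_10101")
		|> range(start: -1h)
		|> filter(fn: (r) => r._measurement == "status")
		|> last()`
	result, err := queryAPI.Query(context.Background(), flux)
	if err != nil {
		return err
	}
	for result.Next() {
		fmt.Printf("%v: %v\n", result.Record().Field(), result.Record().Value())
	}
	return result.Err()
}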
