diff --git a/cmd/reactor/reactor b/cmd/reactor/reactor index 85afa70..12fda16 100755 Binary files a/cmd/reactor/reactor and b/cmd/reactor/reactor differ diff --git a/cmd/server/server b/cmd/server/server index 977d8e9..aa74c00 100755 Binary files a/cmd/server/server and b/cmd/server/server differ diff --git a/cmd/tui/tui b/cmd/tui/tui index cbd7438..1c98c8e 100755 Binary files a/cmd/tui/tui and b/cmd/tui/tui differ diff --git a/internal/pkg/server/coordinator.go b/internal/pkg/server/coordinator.go index dba8c05..d5c4b64 100644 --- a/internal/pkg/server/coordinator.go +++ b/internal/pkg/server/coordinator.go @@ -1,59 +1,45 @@ package server import ( - "fmt" "sync" - "log" ) // this package creates coordinators responsible for keeping track of active clients and invoking managers +type CreateManager interface { + NewManager(*Client, *System, chan error) GeneralManager +} + +type GeneralManager interface { + Start() + GetPort() int +} type Coordinator struct { - Type string // ["reactor","tui"] - IncomingClients <-chan *Client *Managers Sys *System + CreateManager Err chan error Pc chan int } type Managers struct { - Directory map[uint32](chan<- *Client) + Directory map[uint32]GeneralManager sync.Mutex } - -func NewCoordinator(t string,ch chan *Client, sys *System,pc chan int, err chan error) *Coordinator { - d := make(map[uint32](chan<- *Client)) +// interface stuff +func NewCoordinator(manager CreateManager, sys *System,err chan error) *Coordinator { + d := make(map[uint32]GeneralManager) m := &Managers{Directory:d} - c := &Coordinator{Type: t,IncomingClients: ch,Err:err} - c.Managers = m + c := &Coordinator{Err:err} + c.CreateManager = manager c.Sys = sys - c.Pc = pc + c.Managers = m return c } -func FindNewManager(c *Client,ch chan *Client, sys *System, pc chan int, err chan error) { - switch c.Type { - case "reactor": - NewReactorManager(c,ch,sys,err) - case "tui": - NewTUIManager(c,"192.1.168.136",ch,sys,pc,err) - default: - log.Fatal(fmt.Sprintf("ERROR %v NOT FOUND",c.Type)) - } -} - func (c *Coordinator) Start() { // on start we need to create channel listener // on each new connection we want to check its id against our mapping - go c.Listen() -} - -func (c *Coordinator) Listen() { - for { - cl := <-c.IncomingClients - go c.ClientHandler(cl) - } } func (c *Coordinator) ClientHandler(cl *Client) { @@ -62,13 +48,46 @@ func (c *Coordinator) ClientHandler(cl *Client) { defer c.Managers.Unlock() if m, exists := c.Managers.Directory[cl.Id]; exists { // manager in memory - m <-cl + go m.Start() } else { // create channel and manager - ch := make(chan *Client) - FindNewManager(cl, ch, c.Sys, c.Pc, c.Err) - c.Managers.Directory[cl.Id] = ch - // will block until manager is ready - ch <-cl + m := c.NewManager(cl, c.Sys, c.Err) + c.Managers.Directory[cl.Id] = m + go m.Start() } } +// tui port grabber + +// reactor coordinator +type reactorCoordinator struct { + //empty unexported for method +} + +func (r *reactorCoordinator) NewManager(cl *Client, sys *System, err chan error) GeneralManager { + return NewReactorManager(cl,sys,err) +} + +func NewReactorCoordinator(sys *System, err chan error) *Coordinator { + return NewCoordinator(&reactorCoordinator{}, sys, err) +} + +//tui coordinator +type tuiCoordinator struct { + //can add fields as needed + Ip string + Port map[uint32]int +} + +func (t *tuiCoordinator) NewManager(cl *Client, sys *System, err chan error) GeneralManager { + return NewTUIManager(t.Ip,cl,sys,err) +} + +func NewTUICoordinator(ip string, sys *System, err chan error) *Coordinator { + p := make(map[uint32]int) + return NewCoordinator(&tuiCoordinator{Ip:ip,Port:p}, sys, err) +} + +func (c *Coordinator) GetTUIPort(cl *Client) int { + m := c.Managers.Directory[cl.Id] + return m.GetPort() +} diff --git a/internal/pkg/server/listener.go b/internal/pkg/server/listener.go index e166fe1..adb7dce 100644 --- a/internal/pkg/server/listener.go +++ b/internal/pkg/server/listener.go @@ -3,7 +3,6 @@ package server import ( "fmt" "net" - "sync" "context" "FRMS/internal/pkg/system" "google.golang.org/grpc" @@ -17,17 +16,12 @@ type Listener struct { // exporting for easy use in the short term // Reactor map[uint32]*ReactorManager this will go in eventual "coordinator" struct Ip string Port int - *Coordinators + Coordinators map[string]*Coordinator Sys *System Err chan error pb.UnimplementedHandshakeServer } -type Coordinators struct { - Channel map[string](chan<- *Client) - sync.Mutex -} - type Client struct { // can use general client and leave unset fields nil Ip string @@ -49,8 +43,7 @@ func NewListener(ifconfig string,ch chan error) (*Listener, error) { if ip, err = GetIp(ifconfig); err != nil { return &Listener{}, err } - m := make(map[string](chan<- *Client)) - c := &Coordinators{Channel:m} + c := make(map[string]*Coordinator) l := &Listener{Ip:ip,Err:ch} l.Coordinators = c l.Sys = NewSystemStruct() @@ -83,32 +76,29 @@ func (l *Listener) ReactorClientDiscoveryHandler(ctx context.Context, ping *pb.R // incoming reactor ping need to spawn coord c := &Client{Ip:ping.GetClientIp(),Model:ping.GetClientModel(),Type:"reactor",Port:int(ping.GetClientPort()),Id:ping.GetClientId()} fmt.Printf("%v Client %v has connected from %v:%v\n",c.Type,c.Id,c.Ip,c.Port) - ch := make(chan int) - go l.ConnectClient(c,ch) + + coord, ok := l.Coordinators["reactor"] + if !ok { + coord = NewReactorCoordinator(l.Sys, l.Err) + l.Coordinators["reactor"] = coord + coord.Start() + } + go coord.ClientHandler(c) // we dont handle any actual logic about the creation so we just respon true if the request was received return &pb.ReactorClientResponse{ClientId:c.Id,Success:true}, nil } func (l *Listener) TUIClientDiscoveryHandler(ctx context.Context, ping *pb.TUIClientRequest) (*pb.TUIClientResponse, error) { t := &Client{Type:"tui",Id:ping.GetClientId()} - ch := make(chan int) - go l.ConnectClient(t,ch) - port := <-ch + coord, ok := l.Coordinators["tui"] + if !ok { + coord := NewTUICoordinator(l.Ip, l.Sys, l.Err) + l.Coordinators["tui"] = coord + coord.Start() + } + go coord.ClientHandler(t) + port := coord.GetTUIPort(t) r := &pb.TUIClientResponse{ClientId:t.Id,ServerIp:l.Ip,ServerPort:int32(port)} return r, nil } -func (l *Listener) ConnectClient(c *Client, portchan chan int){ - // send to reactor coordinator for ease - l.Coordinators.Lock() - defer l.Coordinators.Unlock() - if ch, exists := l.Coordinators.Channel[c.Type]; exists { - ch <-c - } else { - ch := make(chan *Client) - newC := NewCoordinator(c.Type, ch, l.Sys, portchan, l.Err) - go newC.Start() - l.Coordinators.Channel[c.Type] = ch - ch <-c - } -} diff --git a/internal/pkg/server/manager.go b/internal/pkg/server/manager.go index 6c2a9d7..142251c 100644 --- a/internal/pkg/server/manager.go +++ b/internal/pkg/server/manager.go @@ -12,7 +12,6 @@ import ( type Manager struct { *Client // gives access to c.Ip c.Id etc - ClientReconnect chan *Client // chan for client reconnect Hb time.Duration Active active Sig chan bool @@ -25,28 +24,19 @@ type active struct{ int } -func NewManager(c *Client, ch chan *Client, sig chan bool, err chan error) *Manager { +func NewManager(c *Client, err chan error) *Manager { hb := time.Duration(1) //hb to - m := &Manager{Hb:hb,ClientReconnect:ch,Err:err} + m := &Manager{Hb:hb,Err:err} m.Client = c - m.Sig = sig - go m.Reconnect() return m } -func (m *Manager) Reconnect() { - c := <-m.ClientReconnect - m.Client = c // could contain new ip or port - m.Start() -} - func (m *Manager) Start() { // establish connection with client and start pinging at set intervals if !m.Activate() { // manager already running m.Err <-errors.New("Manager already running!") } // if we get here, manager is atomically activated and we can ensure start wont run again - m.Sig <-true } func (m *Manager) Exit() { @@ -54,8 +44,6 @@ func (m *Manager) Exit() { if !m.Deactivate() { m.Err <-errors.New("Manager already disabled!") } - go m.Reconnect() - m.Sig <-false } // reactor manager atomic operations diff --git a/internal/pkg/server/reactormanager.go b/internal/pkg/server/reactormanager.go index c2f408f..a51f493 100644 --- a/internal/pkg/server/reactormanager.go +++ b/internal/pkg/server/reactormanager.go @@ -25,25 +25,16 @@ type Devices struct { D map[int]Device } -func NewReactorManager(c *Client,ch chan *Client,sys *System,err chan error) { +func NewReactorManager(c *Client,sys *System,err chan error) GeneralManager { d := new(Devices) r := &ReactorManager{Devs:d} - start := make(chan bool) - r.Manager = NewManager(c, ch, start, err) + r.Manager = NewManager(c, err) r.System = sys - go r.Listen(start) -} - -func (r *ReactorManager) Listen(ch chan bool) { - for { - sig := <-ch - if sig { - r.Start() - } - } + return r } func (r *ReactorManager) Start() { + r.Manager.Start() conn := r.Connect() empty := &grpc.ClientConn{} if conn != empty { @@ -51,6 +42,10 @@ func (r *ReactorManager) Start() { } } +func (r *ReactorManager) GetPort() int { + return 0 +} + func (r *ReactorManager) Connect() *grpc.ClientConn { // establish gRPC conection with reactor var opts []grpc.DialOption diff --git a/internal/pkg/server/tuimanager.go b/internal/pkg/server/tuimanager.go index 9beb5e5..f17fef0 100644 --- a/internal/pkg/server/tuimanager.go +++ b/internal/pkg/server/tuimanager.go @@ -19,34 +19,21 @@ type TUIManager struct { Ip string Port int Err chan error - Hb time.Duration - Pc chan int *pb.UnimplementedManagementServer } -func NewTUIManager(c *Client, ip string, ch chan *Client,sys *System, pc chan int, err chan error) { - sig := make(chan bool) - m := NewManager(c, ch, sig, err) - hb := time.Duration(5) - t := &TUIManager{Hb: hb,Err: err} +func NewTUIManager(ip string, c *Client, sys *System, err chan error) GeneralManager { + m := NewManager(c, err) + t := &TUIManager{Err: err} t.Manager = m t.System = sys t.Ip = ip - t.Pc = pc - go t.Listen(sig) -} - -func (t *TUIManager) Listen(sig chan bool) { - for { - c := <-sig - if c { - t.Start() - } - } + return t } func (t *TUIManager) Start() { // + t.Manager.Start() go t.Register() // begin tui server to respond to tui client reqs //go t.Monitor(conn) } @@ -61,7 +48,13 @@ func (t *TUIManager) Register() { pb.RegisterManagementServer(grpcServer,t) go grpcServer.Serve(lis) log.Printf("TUI %v Endpoint active on %v:%v\n",t.Id, t.Ip, t.Port) - t.Pc <-t.Port +} + +func (t *TUIManager) GetPort() int { + for t.Port == 0 { + time.Sleep(10 * time.Millisecond) + } + return t.Port } func (t *TUIManager) GetReactors(ctx context.Context, req *pb.GetReactorsRequest) (*pb.GetReactorsResponse, error) {