added correct manager/coordinator impl with support for method extending should be set to test tui

main
K D 3 years ago
parent e045f3046e
commit 24f7c39263

Binary file not shown.

Binary file not shown.

Binary file not shown.

@ -1,59 +1,45 @@
package server
import (
"fmt"
"sync"
"log"
)
// this package creates coordinators responsible for keeping track of active clients and invoking managers
type CreateManager interface {
NewManager(*Client, *System, chan error) GeneralManager
}
type GeneralManager interface {
Start()
GetPort() int
}
type Coordinator struct {
Type string // ["reactor","tui"]
IncomingClients <-chan *Client
*Managers
Sys *System
CreateManager
Err chan error
Pc chan int
}
type Managers struct {
Directory map[uint32](chan<- *Client)
Directory map[uint32]GeneralManager
sync.Mutex
}
func NewCoordinator(t string,ch chan *Client, sys *System,pc chan int, err chan error) *Coordinator {
d := make(map[uint32](chan<- *Client))
// interface stuff
func NewCoordinator(manager CreateManager, sys *System,err chan error) *Coordinator {
d := make(map[uint32]GeneralManager)
m := &Managers{Directory:d}
c := &Coordinator{Type: t,IncomingClients: ch,Err:err}
c.Managers = m
c := &Coordinator{Err:err}
c.CreateManager = manager
c.Sys = sys
c.Pc = pc
c.Managers = m
return c
}
func FindNewManager(c *Client,ch chan *Client, sys *System, pc chan int, err chan error) {
switch c.Type {
case "reactor":
NewReactorManager(c,ch,sys,err)
case "tui":
NewTUIManager(c,"192.1.168.136",ch,sys,pc,err)
default:
log.Fatal(fmt.Sprintf("ERROR %v NOT FOUND",c.Type))
}
}
func (c *Coordinator) Start() {
// on start we need to create channel listener
// on each new connection we want to check its id against our mapping
go c.Listen()
}
func (c *Coordinator) Listen() {
for {
cl := <-c.IncomingClients
go c.ClientHandler(cl)
}
}
func (c *Coordinator) ClientHandler(cl *Client) {
@ -62,13 +48,46 @@ func (c *Coordinator) ClientHandler(cl *Client) {
defer c.Managers.Unlock()
if m, exists := c.Managers.Directory[cl.Id]; exists {
// manager in memory
m <-cl
go m.Start()
} else {
// create channel and manager
ch := make(chan *Client)
FindNewManager(cl, ch, c.Sys, c.Pc, c.Err)
c.Managers.Directory[cl.Id] = ch
// will block until manager is ready
ch <-cl
m := c.NewManager(cl, c.Sys, c.Err)
c.Managers.Directory[cl.Id] = m
go m.Start()
}
}
// tui port grabber
// reactor coordinator
type reactorCoordinator struct {
//empty unexported for method
}
func (r *reactorCoordinator) NewManager(cl *Client, sys *System, err chan error) GeneralManager {
return NewReactorManager(cl,sys,err)
}
func NewReactorCoordinator(sys *System, err chan error) *Coordinator {
return NewCoordinator(&reactorCoordinator{}, sys, err)
}
//tui coordinator
type tuiCoordinator struct {
//can add fields as needed
Ip string
Port map[uint32]int
}
func (t *tuiCoordinator) NewManager(cl *Client, sys *System, err chan error) GeneralManager {
return NewTUIManager(t.Ip,cl,sys,err)
}
func NewTUICoordinator(ip string, sys *System, err chan error) *Coordinator {
p := make(map[uint32]int)
return NewCoordinator(&tuiCoordinator{Ip:ip,Port:p}, sys, err)
}
func (c *Coordinator) GetTUIPort(cl *Client) int {
m := c.Managers.Directory[cl.Id]
return m.GetPort()
}

@ -3,7 +3,6 @@ package server
import (
"fmt"
"net"
"sync"
"context"
"FRMS/internal/pkg/system"
"google.golang.org/grpc"
@ -17,17 +16,12 @@ type Listener struct { // exporting for easy use in the short term
// Reactor map[uint32]*ReactorManager this will go in eventual "coordinator" struct
Ip string
Port int
*Coordinators
Coordinators map[string]*Coordinator
Sys *System
Err chan error
pb.UnimplementedHandshakeServer
}
type Coordinators struct {
Channel map[string](chan<- *Client)
sync.Mutex
}
type Client struct {
// can use general client and leave unset fields nil
Ip string
@ -49,8 +43,7 @@ func NewListener(ifconfig string,ch chan error) (*Listener, error) {
if ip, err = GetIp(ifconfig); err != nil {
return &Listener{}, err
}
m := make(map[string](chan<- *Client))
c := &Coordinators{Channel:m}
c := make(map[string]*Coordinator)
l := &Listener{Ip:ip,Err:ch}
l.Coordinators = c
l.Sys = NewSystemStruct()
@ -83,32 +76,29 @@ func (l *Listener) ReactorClientDiscoveryHandler(ctx context.Context, ping *pb.R
// incoming reactor ping need to spawn coord
c := &Client{Ip:ping.GetClientIp(),Model:ping.GetClientModel(),Type:"reactor",Port:int(ping.GetClientPort()),Id:ping.GetClientId()}
fmt.Printf("%v Client %v has connected from %v:%v\n",c.Type,c.Id,c.Ip,c.Port)
ch := make(chan int)
go l.ConnectClient(c,ch)
coord, ok := l.Coordinators["reactor"]
if !ok {
coord = NewReactorCoordinator(l.Sys, l.Err)
l.Coordinators["reactor"] = coord
coord.Start()
}
go coord.ClientHandler(c)
// we dont handle any actual logic about the creation so we just respon true if the request was received
return &pb.ReactorClientResponse{ClientId:c.Id,Success:true}, nil
}
func (l *Listener) TUIClientDiscoveryHandler(ctx context.Context, ping *pb.TUIClientRequest) (*pb.TUIClientResponse, error) {
t := &Client{Type:"tui",Id:ping.GetClientId()}
ch := make(chan int)
go l.ConnectClient(t,ch)
port := <-ch
coord, ok := l.Coordinators["tui"]
if !ok {
coord := NewTUICoordinator(l.Ip, l.Sys, l.Err)
l.Coordinators["tui"] = coord
coord.Start()
}
go coord.ClientHandler(t)
port := coord.GetTUIPort(t)
r := &pb.TUIClientResponse{ClientId:t.Id,ServerIp:l.Ip,ServerPort:int32(port)}
return r, nil
}
func (l *Listener) ConnectClient(c *Client, portchan chan int){
// send to reactor coordinator for ease
l.Coordinators.Lock()
defer l.Coordinators.Unlock()
if ch, exists := l.Coordinators.Channel[c.Type]; exists {
ch <-c
} else {
ch := make(chan *Client)
newC := NewCoordinator(c.Type, ch, l.Sys, portchan, l.Err)
go newC.Start()
l.Coordinators.Channel[c.Type] = ch
ch <-c
}
}

@ -12,7 +12,6 @@ import (
type Manager struct {
*Client // gives access to c.Ip c.Id etc
ClientReconnect chan *Client // chan for client reconnect
Hb time.Duration
Active active
Sig chan bool
@ -25,28 +24,19 @@ type active struct{
int
}
func NewManager(c *Client, ch chan *Client, sig chan bool, err chan error) *Manager {
func NewManager(c *Client, err chan error) *Manager {
hb := time.Duration(1) //hb to
m := &Manager{Hb:hb,ClientReconnect:ch,Err:err}
m := &Manager{Hb:hb,Err:err}
m.Client = c
m.Sig = sig
go m.Reconnect()
return m
}
func (m *Manager) Reconnect() {
c := <-m.ClientReconnect
m.Client = c // could contain new ip or port
m.Start()
}
func (m *Manager) Start() {
// establish connection with client and start pinging at set intervals
if !m.Activate() {
// manager already running
m.Err <-errors.New("Manager already running!")
} // if we get here, manager is atomically activated and we can ensure start wont run again
m.Sig <-true
}
func (m *Manager) Exit() {
@ -54,8 +44,6 @@ func (m *Manager) Exit() {
if !m.Deactivate() {
m.Err <-errors.New("Manager already disabled!")
}
go m.Reconnect()
m.Sig <-false
}
// reactor manager atomic operations

@ -25,25 +25,16 @@ type Devices struct {
D map[int]Device
}
func NewReactorManager(c *Client,ch chan *Client,sys *System,err chan error) {
func NewReactorManager(c *Client,sys *System,err chan error) GeneralManager {
d := new(Devices)
r := &ReactorManager{Devs:d}
start := make(chan bool)
r.Manager = NewManager(c, ch, start, err)
r.Manager = NewManager(c, err)
r.System = sys
go r.Listen(start)
}
func (r *ReactorManager) Listen(ch chan bool) {
for {
sig := <-ch
if sig {
r.Start()
}
}
return r
}
func (r *ReactorManager) Start() {
r.Manager.Start()
conn := r.Connect()
empty := &grpc.ClientConn{}
if conn != empty {
@ -51,6 +42,10 @@ func (r *ReactorManager) Start() {
}
}
func (r *ReactorManager) GetPort() int {
return 0
}
func (r *ReactorManager) Connect() *grpc.ClientConn {
// establish gRPC conection with reactor
var opts []grpc.DialOption

@ -19,34 +19,21 @@ type TUIManager struct {
Ip string
Port int
Err chan error
Hb time.Duration
Pc chan int
*pb.UnimplementedManagementServer
}
func NewTUIManager(c *Client, ip string, ch chan *Client,sys *System, pc chan int, err chan error) {
sig := make(chan bool)
m := NewManager(c, ch, sig, err)
hb := time.Duration(5)
t := &TUIManager{Hb: hb,Err: err}
func NewTUIManager(ip string, c *Client, sys *System, err chan error) GeneralManager {
m := NewManager(c, err)
t := &TUIManager{Err: err}
t.Manager = m
t.System = sys
t.Ip = ip
t.Pc = pc
go t.Listen(sig)
}
func (t *TUIManager) Listen(sig chan bool) {
for {
c := <-sig
if c {
t.Start()
}
}
return t
}
func (t *TUIManager) Start() {
//
t.Manager.Start()
go t.Register() // begin tui server to respond to tui client reqs
//go t.Monitor(conn)
}
@ -61,7 +48,13 @@ func (t *TUIManager) Register() {
pb.RegisterManagementServer(grpcServer,t)
go grpcServer.Serve(lis)
log.Printf("TUI %v Endpoint active on %v:%v\n",t.Id, t.Ip, t.Port)
t.Pc <-t.Port
}
func (t *TUIManager) GetPort() int {
for t.Port == 0 {
time.Sleep(10 * time.Millisecond)
}
return t.Port
}
func (t *TUIManager) GetReactors(ctx context.Context, req *pb.GetReactorsRequest) (*pb.GetReactorsResponse, error) {

Loading…
Cancel
Save