added more standard server structs, in process of making system only send new info
parent
12cd29ba18
commit
5cd34e89b6
Binary file not shown.
@ -0,0 +1,66 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"log"
|
||||||
|
)
|
||||||
|
|
||||||
|
// this package creates coordinators responsible for keeping track of active clients and invoking managers
|
||||||
|
|
||||||
|
type Coordinator struct {
|
||||||
|
Type string // ["reactor","tui"]
|
||||||
|
IncomingClients <-chan *Client
|
||||||
|
Managers
|
||||||
|
Err chan error
|
||||||
|
}
|
||||||
|
|
||||||
|
type Managers struct {
|
||||||
|
Directory map[uint32](chan<- bool)
|
||||||
|
sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewCoordinator(t string,ch chan *Client, err chan error) *Coordinator {
|
||||||
|
return &Coordinator{Type: t,IncomingClients: ch,Err:err}
|
||||||
|
}
|
||||||
|
|
||||||
|
func FindNewManager(c *Client,ch chan bool, err chan error) {
|
||||||
|
switch c.Type {
|
||||||
|
case "reactor":
|
||||||
|
NewReactorManager(c,ch,err)
|
||||||
|
case "tui":
|
||||||
|
NewTUIManager(c,ch,err)
|
||||||
|
default:
|
||||||
|
log.Fatal(fmt.Sprintf("ERROR %v NOT FOUND",c.Type))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Coordinator) Start() {
|
||||||
|
// on start we need to create channel listener
|
||||||
|
// on each new connection we want to check its id against our mapping
|
||||||
|
go c.Listen()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Coordinator) Listen() {
|
||||||
|
for {
|
||||||
|
cl := <-c.IncomingClients
|
||||||
|
go c.ClientHandler(cl)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Coordinator) ClientHandler(cl *Client) {
|
||||||
|
// (creates and) notifies manager of client connection
|
||||||
|
c.Managers.Lock()
|
||||||
|
defer c.Managers.Unlock()
|
||||||
|
if m, exists := c.Managers.Directory[cl.Id]; exists {
|
||||||
|
// manager in memory
|
||||||
|
m <-true
|
||||||
|
} else {
|
||||||
|
// create channel and manager
|
||||||
|
ch := make(chan bool)
|
||||||
|
FindNewManager(cl, ch,c.Err)
|
||||||
|
c.Managers.Directory[cl.Id] = ch
|
||||||
|
// will block until manager is ready
|
||||||
|
ch <-true
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,110 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"log"
|
||||||
|
"sync"
|
||||||
|
"context"
|
||||||
|
"FRMS/internal/pkg/system"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
pb "FRMS/internal/pkg/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
// the goal here is to set up a gRPC server to respond to client pings with their IP and to establish a new manager for that specific client
|
||||||
|
|
||||||
|
// going to rename shit to be more general
|
||||||
|
type Listener struct { // exporting for easy use in the short term
|
||||||
|
// Reactor map[uint32]*ReactorManager this will go in eventual "coordinator" struct
|
||||||
|
Ip string
|
||||||
|
Port int
|
||||||
|
*Coordinators
|
||||||
|
Err chan error
|
||||||
|
pb.UnimplementedHandshakeServer
|
||||||
|
}
|
||||||
|
|
||||||
|
type Coordinators struct {
|
||||||
|
Channel map[string](chan<- *Client)
|
||||||
|
sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
type Client struct {
|
||||||
|
Ip string
|
||||||
|
Port int
|
||||||
|
Id uint32
|
||||||
|
Model string
|
||||||
|
Type string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewClient(ip, model, t string, port int, id uint32) *Client {
|
||||||
|
return &Client{Ip:ip,Port:port,Id:id,Model:model,Type:t}
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetIp(e string) (string, error) {
|
||||||
|
return system.GetIp(e)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewListener(ifconfig string,ch chan error) (*Listener, error) {
|
||||||
|
//m := make(map[uint32]*ReactorManager)
|
||||||
|
var ip string
|
||||||
|
var err error
|
||||||
|
if ip, err = GetIp(ifconfig); err != nil {
|
||||||
|
return &Listener{}, err
|
||||||
|
}
|
||||||
|
m := make(map[string](chan<- *Client))
|
||||||
|
c := &Coordinators{Channel:m}
|
||||||
|
l := &Listener{Ip:ip,Err:ch}
|
||||||
|
l.Coordinators = c
|
||||||
|
return l, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Listener) Start() {
|
||||||
|
// start grpc server and implement reciever
|
||||||
|
if err := l.Register(); err != nil {
|
||||||
|
l.Err <- err
|
||||||
|
}
|
||||||
|
// listener started and grpc handler registered
|
||||||
|
fmt.Printf("Started listener on %v:%v\n",l.Ip,l.Port)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Listener) Register() error {
|
||||||
|
// creates a gRPC service and binds it to our handler
|
||||||
|
lis, err := net.Listen("tcp", fmt.Sprintf("%v:0",l.Ip)) // by binding to :0 we should get assigned an empty port
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
l.Port = lis.Addr().(*net.TCPAddr).Port // getting the port we were assigned
|
||||||
|
grpcServer := grpc.NewServer()
|
||||||
|
pb.RegisterHandshakeServer(grpcServer, l)
|
||||||
|
go grpcServer.Serve(lis)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Listener) ClientDiscoveryHandler(ctx context.Context, ping *pb.ClientDiscoveryRequest) (*pb.ClientDiscoveryResponse, error) {
|
||||||
|
// incoming reactor ping need to spawn coord
|
||||||
|
c := NewClient(ping.GetIp(),ping.GetModel(),ping.GetClientType(),int(ping.GetPort()),ping.GetId())
|
||||||
|
fmt.Printf("%v Client %v has connected from %v:%v\n",c.Type,c.Id,c.Ip,c.Port)
|
||||||
|
go l.ConnectClient(c)
|
||||||
|
// we dont handle any actual logic about the creation so we just respon true if the request was received
|
||||||
|
return &pb.ClientDiscoveryResponse{Success:true}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Listener) ConnectClient(c *Client){
|
||||||
|
// send to reactor coordinator for ease
|
||||||
|
l.Coordinators.Lock()
|
||||||
|
defer l.Coordinators.Unlock()
|
||||||
|
switch c.Type {
|
||||||
|
case "reactor","tui":
|
||||||
|
if ch, exists := l.Coordinators.Channel[c.Type]; exists {
|
||||||
|
ch <-c
|
||||||
|
} else {
|
||||||
|
ch := make(chan *Client)
|
||||||
|
newC := NewCoordinator(c.Type, ch, l.Err)
|
||||||
|
go newC.Start()
|
||||||
|
l.Coordinators.Channel[c.Type] = ch
|
||||||
|
ch <-c
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
log.Fatal("Error! client %v not supported!",c.Type)
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,135 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
"math"
|
||||||
|
"sync"
|
||||||
|
"errors"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
|
)
|
||||||
|
|
||||||
|
// this package will implement a boilerplate manager
|
||||||
|
// manager connects to client on start and returns the gRPC connection to make gRPC clients
|
||||||
|
|
||||||
|
type Manager struct {
|
||||||
|
*Client // gives access to c.Ip c.Id etc
|
||||||
|
Hb time.Duration
|
||||||
|
Active active
|
||||||
|
Err chan error
|
||||||
|
}
|
||||||
|
|
||||||
|
type active struct{
|
||||||
|
sync.Mutex
|
||||||
|
bool
|
||||||
|
int
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewManager(c *Client, err chan error) *Manager {
|
||||||
|
hb := time.Duration(5) //hb to
|
||||||
|
m := &Manager{Hb:hb,Err:err}
|
||||||
|
m.Client = c
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) Start() {
|
||||||
|
// establish connection with client and start pinging at set intervals
|
||||||
|
if !m.Activate() {
|
||||||
|
// manager already running
|
||||||
|
m.Err <-errors.New("Manager already running!")
|
||||||
|
} // if we get here, manager is atomically activated and we can ensure start wont run again
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) Exit() {
|
||||||
|
// exit function to eventually allow saving to configs
|
||||||
|
if !m.Deactivate() {
|
||||||
|
m.Err <-errors.New("Manager already disabled!")
|
||||||
|
}
|
||||||
|
fmt.Printf("Manager %v exiting", m.Id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// reactor manager atomic operations
|
||||||
|
|
||||||
|
func (m *Manager) IsActive() bool {
|
||||||
|
m.Active.Lock()
|
||||||
|
defer m.Active.Unlock()
|
||||||
|
return m.Active.bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) Activate() bool {
|
||||||
|
m.Active.Lock()
|
||||||
|
defer m.Active.Unlock()
|
||||||
|
alive := m.Active.bool
|
||||||
|
if alive {
|
||||||
|
return false
|
||||||
|
} else {
|
||||||
|
m.Active.bool = true
|
||||||
|
m.Active.int = 0
|
||||||
|
return m.Active.bool
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) Deactivate() bool {
|
||||||
|
m.Active.Lock()
|
||||||
|
defer m.Active.Unlock()
|
||||||
|
alive := m.Active.bool
|
||||||
|
if alive {
|
||||||
|
m.Active.bool = false
|
||||||
|
return true
|
||||||
|
} else {
|
||||||
|
return m.Active.bool
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// connection stuff
|
||||||
|
|
||||||
|
func (m *Manager) Timeout() int {
|
||||||
|
// keeps track of and generates timeout [0-1.2s) over span of ~2.5s
|
||||||
|
// returns 0 on TO elapse
|
||||||
|
m.Active.Lock()
|
||||||
|
defer m.Active.Unlock()
|
||||||
|
if m.Active.int < 9 {
|
||||||
|
v := int(5 * math.Pow(float64(2), float64(m.Active.int)))
|
||||||
|
m.Active.int += 1
|
||||||
|
return v
|
||||||
|
} else {
|
||||||
|
// exceeded retries
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) Connect() *grpc.ClientConn{
|
||||||
|
// establish initial gRPC connection with client
|
||||||
|
var opts []grpc.DialOption
|
||||||
|
var conn *grpc.ClientConn
|
||||||
|
opts = append(opts,grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
|
for {
|
||||||
|
if !m.IsActive() {
|
||||||
|
fmt.Printf("Aborting connection attempt\n")
|
||||||
|
return &grpc.ClientConn{}
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
conn, err = grpc.Dial(fmt.Sprintf("%v:%v",m.Ip,m.Port),opts...)
|
||||||
|
// begin error handling
|
||||||
|
code := status.Code(err)
|
||||||
|
if code != 0 { // != OK
|
||||||
|
if code == (5 | 14) { // == unavailable or not found
|
||||||
|
to := m.Timeout()
|
||||||
|
if to == 0 {
|
||||||
|
fmt.Printf("Client not responding, exiting...\n")
|
||||||
|
m.Exit()
|
||||||
|
return&grpc.ClientConn{}
|
||||||
|
}
|
||||||
|
fmt.Printf("gRPC endpoint currently unavailable, retrying in %v ms",to)
|
||||||
|
time.Sleep(time.Duration(to) * time.Millisecond)
|
||||||
|
} else {
|
||||||
|
fmt.Printf("ERR GRPC: %v\n",code)
|
||||||
|
m.Err <-err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return conn
|
||||||
|
}
|
@ -0,0 +1,80 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// implement tui specific manager to be called for each client conn
|
||||||
|
|
||||||
|
type TUIManager struct {
|
||||||
|
*Manager // embedded manager for access to methods and client
|
||||||
|
Sys *KnownReactors
|
||||||
|
ClientConnections <-chan bool
|
||||||
|
Err chan error
|
||||||
|
Hb time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
type KnownReactors struct {
|
||||||
|
Reactors map[uint32]*Reactor
|
||||||
|
sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
type Reactor struct {
|
||||||
|
Devices map[string]*Device
|
||||||
|
}
|
||||||
|
|
||||||
|
type Device struct {
|
||||||
|
Status DeviceStatus
|
||||||
|
Type string
|
||||||
|
Addr int
|
||||||
|
}
|
||||||
|
|
||||||
|
type DeviceStatus uint32
|
||||||
|
|
||||||
|
const (
|
||||||
|
READY DeviceStatus = iota
|
||||||
|
ACTIVE
|
||||||
|
DISABLED
|
||||||
|
)
|
||||||
|
|
||||||
|
func (d DeviceStatus) String() string {
|
||||||
|
return [...]string{"Ready","Active","Disabled"}[d]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d Device) String() string {
|
||||||
|
return fmt.Sprintf("%v is %v at %x",d.Type,d.Status,d.Addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
type DeviceManager interface{
|
||||||
|
// GetStatus() uint32 UNSUPPORTED: arguable memory benifit but until we support 100s of sensors across 10s of tui clients im not implementing it
|
||||||
|
PrintSatus() string
|
||||||
|
GetType() string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTUIManager(c *Client,ch chan bool, err chan error) {
|
||||||
|
k := new(KnownReactors)
|
||||||
|
m := NewManager(c, err)
|
||||||
|
hb := time.Duration(5)
|
||||||
|
t := &TUIManager{Hb: hb,Sys: k,Err: err, ClientConnections: ch}
|
||||||
|
t.Manager = m
|
||||||
|
go t.Listen()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TUIManager) Listen() {
|
||||||
|
for {
|
||||||
|
c := <-t.ClientConnections
|
||||||
|
if c {
|
||||||
|
t.Start()
|
||||||
|
} else {
|
||||||
|
t.Exit()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TUIManager) Start() {
|
||||||
|
t.Manager.Start()
|
||||||
|
//conn := t.Conn()
|
||||||
|
//go t.Monitor(conn)
|
||||||
|
}
|
Loading…
Reference in New Issue