shifted server impl. Working on loading/storing config for db connections

main
Keegan 3 years ago
parent 54815d16f1
commit a887028e7a

@ -3,63 +3,48 @@ package main
import (
_"net/http"
_ "net/http/pprof"
"strconv"
//"flag"
"log"
//"log"
"os"
"fmt"
"FRMS/internal/pkg/logging"
"FRMS/internal/pkg/config"
//"FRMS/internal/pkg/config"
"FRMS/internal/pkg/server"
)
type listener interface {
type coordinator interface {
Start()
}
func NewListener(ch chan error, port int) listener {
return server.NewListener(ch, port)
}
type dbconfig interface {
GetUrl() string
GetOrg() string
GetBucket() string
GetToken() string
}
func ReadConfig() dbconfig {
return config.ReadServerConfig()
func NewCoordinator(port int, ch chan error) coordinator {
return server.NewCentralCoordinator(port, ch)
}
func main() {
// lets get this bread
// all we need to do is call the reactor coordinator and thats it
// removing os flags in favor of env vars
// go func() {
// fmt.Println(http.ListenAndServe("localhost:6060",nil))
// }()
ch := make(chan error)
// creating listener
var lport int
//var dbport int
if port := os.Getenv("gRPC_PORT"); port == "" {
lport = 2022 // default docker port
var port int
var err error
if p := os.Getenv("gRPC_PORT"); p == "" {
port = 2022 // default docker port
} else {
if port, err = strconv.Atoi(p); err != nil {
panic(err)
}
}
//if port := os.Getenv("DATABASE_PORT"); port == "" {
//dbport = 8086
//}
//fmt.Printf("DBPORT %d\n", dbport)
conf := ReadConfig()
fmt.Printf("Found %v %v %v %v\n",conf.GetUrl(),conf.GetBucket(),conf.GetOrg(),conf.GetToken())
fmt.Printf("Listening on %v\n", lport)
l := NewListener(ch,lport)
//db := os.Getenv("DATABASE_URL") // database url
go l.Start()
//fmt.Printf("Listening on %v\n", lport)
c := NewCoordinator(port, ch)
go c.Start()
fmt.Println("Server Active!")
logging.Debug(logging.DStart, "CCO 01 Server started")
err := <-ch // blocking to wait for any errors and keep alive otherwise
if err != nil {
log.Fatal(err)
}
err = <-ch // blocking to wait for any errors and keep alive otherwise
panic(err)
}

@ -9,7 +9,7 @@ services:
- "2022:2022"
- "2023:2023"
volumes:
- ./logs:/app/log
- ./logs:/log
environment:
- LOGTYPE=SERVER
- VERBOSE=1

@ -1,4 +1,21 @@
model: raspberrypi
bus: 1
model: beagleboard
bus: 2
server:
id: 1000213123
name: "Rack Server"
db-url: "http://192.168.100.2:3000"
db-token: ""
l-port: 2022
r-port: 2023
t-port: 2024
reactor:
id: 102233
name: "Beaglebone Black"
i2c-bus: 2
db-token: ""
db-bucket: ""
id: 220123123
name: "Raspberry Pi"
tui:
id: 10000
name: "kdeppe"
db-token: ""
db-bucket: ""

@ -12,8 +12,62 @@ import (
)
// this package creates coordinators responsible for keeping track of active clients and invoking managers
type SubCoordinator interface {
// this package creates the central coordiantor and sub coordiantors for clients
type CentralCoordinator struct {
ClientConnections *ClientPacket
CLisPort int
*SubCoordinators
*SystemViewer
Err chan error
}
type SubCoordinators struct {
Directory map [string]*SubCoordinator
sync.Mutex
}
func NewCentralCoordinator(port int, ch chan error) *CentralCoordinator {
c := &CentralCoordinator{CLisPort: port, Err: ch}
c.SystemViewer = NewSystemViewer()
go c.SystemViewer.Start()
s := make(map[string]*SubCoordinator)
sub := &SubCoordinators{Directory:s}
c.SubCoordinators = sub
return c
}
func (c *CentralCoordinator) Start() {
// starts up associated funcs
clientChan := make(chan *ClientPacket)
l := NewListener(c.CLisPort,clientChan,c.Err)
go l.Start()
go c.ClientListener(clientChan)
}
func (c *CentralCoordinator) ClientListener(ch chan *ClientPacket) {
for client := range ch {
// basically loops until channel is closed
port := c.ClientHandler(client.Client)
client.Port <-port
}
}
func (c *CentralCoordinator) ClientHandler(cl *Client) int {
c.SubCoordinators.Lock()
defer c.SubCoordinators.Unlock()
subcoord, ok := c.SubCoordinators.Directory[cl.Type]
if !ok {
// Sub Coordinator does not exists
logging.Debug(logging.DSpawn,"CC0 01 Created %v Coordinator",cl.Type)
subcoord = NewSubCoordinator(cl.Type,c.SystemViewer, c.Err)
c.SubCoordinators.Directory[cl.Type] = subcoord
}
go subcoord.ClientHandler(cl)
return subcoord.Port
}
type ManagerInterface interface {
Start()
NewManager(*Client,*SystemViewer, chan error) GeneralManager
GetManager(uint32) (GeneralManager, bool)
@ -21,56 +75,47 @@ type SubCoordinator interface {
Register()
}
type GeneralManager interface {
// used by sub coordinator to interact with manager
Start()
UpdateClient(*Client)
ReactorStatusHandler(context.Context,*pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error)
GetDevices(context.Context, *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error)
}
type Coordinator struct {
type SubCoordinator struct {
Port int // port that we set up gRPC endpoint on
//*Managers going to embed this in subcoordinator
SubCoordinator
ManagerInterface // embed an interface to create/manager managers
*SystemViewer
Err chan error
}
type Managers struct {
Directory map[uint32]GeneralManager
Directory map[uint32]interface{} // support for either manager
sync.RWMutex // potential perf
}
// interface stuff
func NewCoordinator(clientType string, sys *SystemViewer, err chan error) *Coordinator {
d := make(map[uint32]GeneralManager)
m := &Managers{Directory:d}
c := &Coordinator{Err:err}
c.Port = 2023
sub, errs := NewSubCoordinator(clientType, m, err)
func NewSubCoordinator(clientType string, sys *SystemViewer, err chan error) *SubCoordinator {
c := &SubCoordinator{Err:err}
c.SystemViewer = sys
man, port, errs := NewCoordinatorType(clientType, err)
if errs != nil {
err <-errs
}
c.SubCoordinator = sub
c.SystemViewer = sys
//c.Managers = m
go c.Register()
c.Port = port
c.ManagerInterface = man
go man.Start()
go man.Register()
return c
}
func (c *Coordinator) Start() {
// on start we need to create channel listener
// on each new connection we want to check its id against our mapping
c.SubCoordinator.Start()
}
func (c *Coordinator) ClientHandler(cl *Client) int {
func (c *SubCoordinator) ClientHandler(cl *Client) int {
// (creates and) notifies manager of client connection
go c.UpdateManager(cl)
return c.Port
}
func (c *Coordinator) UpdateManager(cl *Client) {
func (c *SubCoordinator) UpdateManager(cl *Client) {
// shouldn't happen all that often so should be fine to lock
m, exists := c.GetManager(cl.Id)
if !exists {
@ -93,21 +138,27 @@ func (m *Managers) GetManager(id uint32) (GeneralManager, bool) {
m.RLock()
defer m.RUnlock()
man, exists := m.Directory[id]
return man, exists
if !exists {
return nil, exists
}
return man.(GeneralManager), exists
}
func NewSubCoordinator(clientType string, m *Managers, err chan error) (SubCoordinator, error) {
func NewCoordinatorType(clientType string, err chan error) (ManagerInterface, int, error) {
m := make(map[uint32]interface{})
if clientType == "reactor" {
c := &reactorCoordinator{}
c.Managers = m
return c, nil
//m := make(map[uint32]*ReactorManager)
c.Managers = &Managers{Directory:m}
return c, 2023, nil
} else if clientType == "tui" {
c := &tuiCoordinator{}
c.Managers = m
return c, nil
//m := make(map[uint32]*TUIManager)
c.Managers = &Managers{Directory:m}
return c, 2024, nil
}
return &reactorCoordinator{}, errors.New("Unrecognized client type")
return &reactorCoordinator{}, 0, errors.New("Unrecognized client type")
}
// creating sub coordinators for associated gRPC handlers
@ -142,12 +193,16 @@ func (r *reactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.R
if !exists {
return &pb.ReactorStatusResponse{}, errors.New("Manager doesn't exists for that client")
}
return m.ReactorStatusHandler(ctx, req)
rm, ok := m.(*ReactorManager)
if !ok {
return &pb.ReactorStatusResponse{}, errors.New("Manager is not a reactor manager!")
}
return rm.ReactorStatusHandler(ctx, req)
}
//tui coordinator
type tuiCoordinator struct {
*Managers
*Managers // by embedding general struct we allow coordinator to still call general funcs
pb.UnimplementedManagementServer
}
@ -178,7 +233,11 @@ func (t *tuiCoordinator) GetDevices(ctx context.Context, req *pb.GetDevicesReque
// doesnt exist for some reason
return &pb.GetDevicesResponse{}, errors.New("Manager doesn't exists for client")
}
return m.GetDevices(ctx,req)
tm, ok := m.(*TUIManager)
if !ok {
return &pb.GetDevicesResponse{}, errors.New("Manager is not of type TUI")
}
return tm.GetDevices(ctx,req)
}
// unimplemented bs for grpc

@ -12,20 +12,21 @@ import (
)
/*
Originally this package served as a client listener to route requests
I am going to repurpose this to serve as a listener for all gRPC requests
should simplify interfaces
Listens on a supplied port and sends incoming clients over a supplied channel
*/
type Listener struct { // exporting for easy use in the short term
Port int
Coordinators map[string]*Coordinator
CLis *grpc.Server
Sys *SystemViewer
ClientConnections chan *ClientPacket
Err chan error
pb.UnimplementedHandshakeServer
}
type ClientPacket struct {
*Client
Port chan int
}
type Client struct {
// general client struct to store reqs from reactors/tui
Ip string
@ -35,11 +36,8 @@ type Client struct {
Type string
}
func NewListener(ch chan error, port int) *Listener {
c := make(map[string]*Coordinator)
l := &Listener{Err:ch}
l.Coordinators = c
l.Sys = NewSystemViewer()
func NewListener(port int, cch chan *ClientPacket, ech chan error) *Listener {
l := &Listener{Err:ech,ClientConnections:cch}
l.Port = port
return l
}
@ -49,10 +47,7 @@ func (l *Listener) Start() {
if err := l.Register(); err != nil {
l.Err <- err
}
go l.Sys.Start()
// listener started and grpc handler registered
logging.Debug(logging.DStart,"Started client listener on port %v\n",l.Port)
//fmt.Printf("==========================\n PORT: %v\n==========================\n",l.Port)
}
func (l *Listener) Register() error {
@ -65,23 +60,19 @@ func (l *Listener) Register() error {
pb.RegisterHandshakeServer(grpcServer, l)
go grpcServer.Serve(lis)
logging.Debug(logging.DStart, "LIS Registered on port %v", l.Port)
//lis, err = net.Listen("tcp", fmt.Sprintf(":%v",l.Port+1)) // either binding to supplied port or binding to docker default
//if err != nil {
//return err
//}
//grpcServer = grpc.NewServer()
//l.CLis = grpcServer
//go grpcServer.Serve(lis)
//logging.Debug(logging.DStart, "LIS Coordinator server registered on port %v", l.Port + 1)
return nil
}
func (l *Listener) ClientDiscoveryHandler(ctx context.Context, ping *pb.ClientRequest) (*pb.ClientResponse, error) {
// incoming reactor ping need to spawn coord
c := &Client{Id:ping.GetClientId(),Type:ping.GetClientType()}
logging.Debug(logging.DClient, "%v %v has connected\n",c.Type,c.Id)
logging.Debug(logging.DClient, "LIS %v %v has connected\n",c.Type,c.Id)
ch := make(chan int)
p := &ClientPacket{Port:ch}
p.Client = c
l.ClientConnections <-p
port := <-ch
/*
coord, ok := l.Coordinators[c.Type]
if !ok {
logging.Debug(logging.DSpawn,"CCO 01 Created Coordinator")
@ -90,6 +81,7 @@ func (l *Listener) ClientDiscoveryHandler(ctx context.Context, ping *pb.ClientRe
go coord.Start()
}
port := coord.ClientHandler(c)
*/
// return the port for the incoming requests
return &pb.ClientResponse{ClientId:c.Id,ServerPort:uint32(port)}, nil
}

@ -6,9 +6,8 @@ import (
"math"
"sync"
"errors"
"context"
_ "context"
"FRMS/internal/pkg/logging"
pb "FRMS/internal/pkg/grpc" // unimplemented base methods
)
// this package will implement a boilerplate manager
@ -104,6 +103,8 @@ func (m *Manager) Timeout() int {
return 0
}
}
/*
shouldnt be nessecary anymore
func (m *Manager) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) {
return &pb.GetDevicesResponse{}, errors.New("Get Devices not implemented!")
@ -112,3 +113,4 @@ func (m *Manager) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) (*p
func (m *Manager) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) {
return &pb.ReactorStatusResponse{}, errors.New("Reactor Status Handler not implemented!")
}
*/

@ -44,6 +44,7 @@ func NewStatusMonitor(t string, id uint32, sys *SystemViewer) *StatusMonitor {
tid := make(chan uint32)
sm := &StatusMonitor{TransactionId:tid}
sm.SystemViewer = sys
logging.Debug(logging.DClient,"SYS Creating new status monitor")
if t == "Reactor" {
// reactor status monitor
sm.ReactorChan = sys.AddReactorSender()

21
notes

@ -876,3 +876,24 @@ Refactoring server code now
is there ever a situation where I would need to run this not on docker?
- can i just hardcode for docker and then rely on nginx for routing etc?
ALRIGHT TIME TO LOCK TF IN
#TODO 8/1
Time to set up proper config loading and storing
Doing it all through interfaces and tagged structs
On start up
Server needs to load up its own config
- take action on that config
wait for client connections
- load client config and reply with associated data
on client disconnect
- store any updates and return to idle state
restructing away from "listener" and coordiantor and stuff
going to just have central server
with an embedded listener
and database and shit
so
SERVER will call NewServer which will take care of subsequents

Loading…
Cancel
Save