server launches with proper config for itself and grafana, added storing config on exit or changes

main
Keegan 2 years ago
parent 429ce92207
commit 1c2868daf8

@ -1,70 +1,84 @@
package main package main
import ( import (
_"net/http" "fmt"
_ "net/http/pprof" _ "net/http"
"strconv" _ "net/http/pprof"
"strings" "os/signal"
//"flag" "strconv"
//"log" "strings"
"os" "syscall"
"fmt"
"FRMS/internal/pkg/logging" //"flag"
"FRMS/internal/pkg/config" //"log"
"FRMS/internal/pkg/server" "FRMS/internal/pkg/config"
"FRMS/internal/pkg/logging"
"FRMS/internal/pkg/server"
"os"
) )
type coordinator interface { type coordinator interface {
Start() Start()
} }
func NewCoordinator(ch chan error) coordinator { func NewCoordinator(ch chan error) coordinator {
return server.NewCentralCoordinator(ch) return server.NewCentralCoordinator(ch)
} }
func LoadConfig(fname string) Config { func LoadConfig(fname string) Config {
config.Load(fname) config.Load(fname)
return config.LoadConfig() return config.LoadConfig()
} }
type Config interface { type Config interface {
UpdatePort(string, int) error UpdatePort(string, int) error
Store() error
} }
func main() { func main() {
// lets get this bread // lets get this bread
// go func() { // go func() {
// fmt.Println(http.ListenAndServe("localhost:6060",nil)) // fmt.Println(http.ListenAndServe("localhost:6060",nil))
// }() // }()
conf := LoadConfig("server") gracefulShutdown := make(chan os.Signal, 1)
ch := make(chan error) signal.Notify(gracefulShutdown, syscall.SIGINT, syscall.SIGTERM)
// checking env conf := LoadConfig("server")
envVars := os.Environ() errCh := make(chan error)
for _, envString := range envVars { // checking env
// looping over set ports envVars := os.Environ()
initSplt := strings.Split(envString,"=") for _, envString := range envVars {
key := initSplt[0] // looping over set ports
val := initSplt[1] initSplt := strings.Split(envString, "=")
if strings.Contains(key,"PORT") { key := initSplt[0]
// parsing out correct port to update val := initSplt[1]
splt := strings.Split(key,"_") // LIS_PORT -> LIS, PORT if strings.Contains(key, "PORT") {
portName := strings.ToLower(splt[0]) // LIS -> lis // parsing out correct port to update
port, err := strconv.Atoi(val) splt := strings.Split(key, "_") // LIS_PORT -> LIS, PORT
if err != nil { portName := strings.ToLower(splt[0]) // LIS -> lis
panic(err) port, err := strconv.Atoi(val)
} if err != nil {
if err := conf.UpdatePort(portName,port); err != nil { panic(err)
panic(err) }
} if err := conf.UpdatePort(portName, port); err != nil {
} panic(err)
} }
}
}
//fmt.Printf("Listening on %v\n", lport) //fmt.Printf("Listening on %v\n", lport)
c := NewCoordinator(ch) c := NewCoordinator(errCh)
go c.Start() go c.Start()
fmt.Println("Server Active!") logging.Debug(logging.DStart, "CCO 01 Server started")
logging.Debug(logging.DStart, "CCO 01 Server started") select {
err := <-ch // blocking to wait for any errors and keep alive otherwise case err := <-errCh: // blocking to wait for any errors and keep alive otherwise
panic(err) panic(err)
case <-gracefulShutdown:
err := conf.Store()
if err != nil {
panic(err)
}
fmt.Println("Stored config successfully. Exiting...")
os.Exit(0)
}
} }

@ -8,12 +8,16 @@ services:
ports: ports:
- "2022:2022" - "2022:2022"
- "2023:2023" - "2023:2023"
- "2024:2024"
volumes: volumes:
- ./logs:/log - ./logs:/log
- server-config:/configs - server-config:/etc/frms/config
environment: environment:
- LOGTYPE=SERVER - LOGTYPE=SERVER
- VERBOSE=1 - VERBOSE=1
- LIS_PORT=2022
- REACTOR_PORT=2023
- TUI_PORT=2024
depends_on: depends_on:
- db - db
db: db:
@ -32,7 +36,7 @@ services:
- DOCKER_INFLUXDB_INIT_USERNAME=admin - DOCKER_INFLUXDB_INIT_USERNAME=admin
- DOCKER_INFLUXDB_INIT_PASSWORD=F0r3l1ght - DOCKER_INFLUXDB_INIT_PASSWORD=F0r3l1ght
- DOCKER_INFLUXDB_INIT_ORG=ForeLight - DOCKER_INFLUXDB_INIT_ORG=ForeLight
- DOCKER_INFLUXDB_INIT_BUCKET=default - DOCKER_INFLUXDB_INIT_BUCKET=test
grafana: grafana:
image: grafana/grafana-oss:latest image: grafana/grafana-oss:latest
ports: ports:
@ -40,6 +44,8 @@ services:
volumes: volumes:
- grafana-provisioning:/etc/grafana/provisioning - grafana-provisioning:/etc/grafana/provisioning
- grafana-data:/var/lib/grafana - grafana-data:/var/lib/grafana
depends_on:
- db
volumes: volumes:
grafana-data: grafana-data:
grafana-provisioning: grafana-provisioning:

@ -1,12 +1,14 @@
#!/bin/bash #!/bin/bash
#DB_URL=$(cat "$INFLUX_CONFIGS_PATH" | awk '/url/ {print $3}' | head -n 1)
DB_URL="frms-db-1:8086"
TOKEN=$(influx auth list --user ${DOCKER_INFLUXDB_INIT_USER_ID} --hide-headers | cut -f 3) TOKEN=$(influx auth list --user ${DOCKER_INFLUXDB_INIT_USER_ID} --hide-headers | cut -f 3)
ORG=$(influx org list | grep ${DOCKER_INFLUXDB_INIT_ORG_ID} | awk '{print $2}') ORG=$(influx org list | grep ${DOCKER_INFLUXDB_INIT_ORG_ID} | awk '{print $2}')
# creating starting server YAML # creating starting server YAML
echo -e "server:\n\tdb-url:${INFLUX_HOST}\n\tdb-org:${ORG}\n\tdb-token:${TOKEN}" >/configs/server.yaml; echo -e "server:\n db-url: ${DB_URL}\n db-org: ${ORG}\n db-token: ${TOKEN}" >/configs/server.yaml;
# creating grafana yaml # creating grafana yaml
influx user create -n grafana -o ${ORG} influx user create -n grafana -o ${ORG}
GRAFANA_USER_ID=$(influx user list --hide-headers --name grafana) GRAFANA_TOKEN=$(influx auth list --user grafana --hide-headers | cut -f 3)
GRAFANA_TOKEN=$(influx auth list --user ${GRAFANA_USER_ID} --hide-headers | cut -f 3) echo -e "apiVersion: 1\n\ndeleteDatasources:\n\ndatasources:\n - name: INFLUXDB\n type: influxdb\n access: proxy\n url: ${DB_URL}\n jsonData:\n httpMode: GET\n httpHeaderName1: 'Authorization'\n secureJsonData:\n httpHeaderValue1: 'Token ${GRAFANA_TOKEN}'" >/grafana/datasources/datasource.yaml
echo -e "datasources:\n\t- name: INFLUXDB\n\ttype: influxdb\n\turl:${INFLUX_HOST}\n\tdatabase: test\n\t jsonData:\n\t\thttpMode: GET\n\t\thttpHeaderName1: 'Authorization'\n\tsecureJsonData:\n\t\thttpHeaderValue1: 'Token ${GRAFANA_TOKEN}'"

@ -3,166 +3,184 @@ package config
// package serves to store/load config files for server // package serves to store/load config files for server
import ( import (
"fmt" "FRMS/internal/pkg/logging"
"strconv" "errors"
"github.com/spf13/viper" "fmt"
"FRMS/internal/pkg/logging" "strconv"
"errors" "strings"
"sync" "sync"
"strings"
//"os" "github.com/spf13/viper"
//"log" //"os"
//"os/exec" //"log"
//"bytes" //"os/exec"
//"bytes"
) )
type Config struct { type Config struct {
Server ServerConfig `mapstructure:"server"` Server ServerConfig `mapstructure:"server"`
Reactors map[string]ReactorConfig `mapstructure:"reactors"` Reactors map[string]ReactorConfig `mapstructure:"reactors"`
sync.RWMutex sync.RWMutex
} }
type ServerConfig struct { type ServerConfig struct {
URL string `mapstructure:"db-url"` URL string `mapstructure:"db-url"`
Token string `mapstructure:"db-token"` Token string `mapstructure:"db-token"`
Orginization string `mapstructure:"db-org"` Orginization string `mapstructure:"db-org"`
Ports map[string]int `mapstructure:"ports"` // changed from map[string]string to map[string]int Ports map[string]int `mapstructure:"ports"` // changed from map[string]string to map[string]int
Name string `mapstructure:"name"` Name string `mapstructure:"name"`
} }
type ReactorConfig struct { type ReactorConfig struct {
Token string `mapstructure:"db-token"` Token string `mapstructure:"db-token"`
Bucket string `mapstructure:"db-bucket"` Bucket string `mapstructure:"db-bucket"`
Name string Name string
Id uint32 Id uint32
} }
var C *Config var C *Config
func Load(fname string) { func Load(fname string) {
// read stored configs // read stored configs
C = &Config{} C = &Config{}
viper.SetConfigName(fname) viper.SetConfigName(fname)
viper.SetConfigType("yaml") viper.SetConfigType("yaml")
viper.AddConfigPath("./configs") viper.AddConfigPath("/etc/frms/config") //
//viper.AddConfigPath("../../internal/configs") // defaults
// defaults viper.SetDefault("server.db-org", "ForeLight")
viper.SetDefault("server.db-org", "ForeLight") viper.SetDefault("server.db-url", "http://192.168.100.2:8086")
viper.SetDefault("server.db-url", "http://192.168.100.2:8086") // unmarshalling
// unmarshalling err := viper.ReadInConfig() // the fact i never did this is infuriating
viper.ReadInConfig() // the fact i never did this is infuriating if err != nil {
logging.Debug(logging.DStart,"CON Loaded configs from %v", viper.ConfigFileUsed()) panic(err)
err := viper.Unmarshal(C) }
if err != nil { logging.Debug(logging.DStart, "CON Loaded configs from %v", viper.ConfigFileUsed())
logging.Debug(logging.DError,"Cannot unmarshall Server! %v",err) err = viper.Unmarshal(C)
panic(err) if err != nil {
} logging.Debug(logging.DError, "Cannot unmarshall Server! %v", err)
fmt.Printf("Outcome: %#v\n \n",C) panic(err)
//fmt.Printf("%v\n",C) }
// unmarshalled at this point fmt.Printf("Outcome: %#v\n \n", C)
//fmt.Printf("%v\n",C)
// unmarshalled at this point
} }
func LoadConfig() *Config { func LoadConfig() *Config {
return C return C
} }
func (c *Config) GetURL() (string, error) { func (c *Config) GetURL() (string, error) {
c.RLock() c.RLock()
defer c.RUnlock() defer c.RUnlock()
return C.Server.URL, nil return C.Server.URL, nil
} }
func (c *Config) GetOrg() (string, error) { func (c *Config) GetOrg() (string, error) {
c.RLock() c.RLock()
defer c.RUnlock() defer c.RUnlock()
return c.Server.Orginization, nil return c.Server.Orginization, nil
} }
func (c *Config) GetPort(port string) (int, error) { func (c *Config) GetPort(port string) (int, error) {
c.RLock() c.RLock()
defer c.RUnlock() defer c.RUnlock()
portString, ok := c.Server.Ports[port] portString, ok := c.Server.Ports[port]
if !ok { if !ok {
portEnv := strings.ToUpper(port) + "_PORT" portEnv := strings.ToUpper(port) + "_PORT"
return 0, fmt.Errorf("%s port doesnt exist! Please set using env %s=####",port, portEnv) return 0, fmt.Errorf("%s port doesnt exist! Please set using env %s=####", port, portEnv)
} }
// returns int, err // returns int, err
//return strconv.Atoi(portString) //return strconv.Atoi(portString)
return portString, nil return portString, nil
} }
func (c *Config) GetServerToken() (string, error) { func (c *Config) GetServerToken() (string, error) {
c.RLock() c.RLock()
defer c.RUnlock() defer c.RUnlock()
return c.Server.Token, nil return c.Server.Token, nil
} }
func (c *Config) GetReactorClient(id uint32) (string, string, error) { func (c *Config) GetReactorClient(id uint32) (string, string, error) {
c.RLock() c.RLock()
defer c.RUnlock() defer c.RUnlock()
idString := strconv.FormatUint(uint64(id),10) idString := strconv.FormatUint(uint64(id), 10)
if r, ok := c.Reactors[idString]; ok { if r, ok := c.Reactors[idString]; ok {
return r.Bucket, r.Token, nil return r.Bucket, r.Token, nil
} }
return "", "", fmt.Errorf("Reactor %v config doesnt exist!",id) return "", "", fmt.Errorf("Reactor %v config doesnt exist!", id)
} }
// setters // setters
func (c *Config) UpdateURL(url string) error { func (c *Config) UpdateURL(url string) error {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
if url == "" { if url == "" {
return errors.New("String cannot be empty!") return errors.New("String cannot be empty!")
} }
c.Server.URL = url c.Server.URL = url
return viper.WriteConfig() viper.Set("server.db-url", url)
return viper.WriteConfigAs(viper.ConfigFileUsed())
} }
func (c *Config) UpdateOrg(org string) error { func (c *Config) UpdateOrg(org string) error {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
if org == "" { if org == "" {
return errors.New("String cannot be empty!") return errors.New("String cannot be empty!")
} }
c.Server.Orginization = org c.Server.Orginization = org
return viper.WriteConfig() viper.Set("server.db-org", org)
return viper.WriteConfigAs(viper.ConfigFileUsed())
} }
func (c *Config) UpdatePort(pName string, port int) error { func (c *Config) UpdatePort(pName string, port int) error {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
if port < 1024 || port > 65535 { if port < 1024 || port > 65535 {
// OOB // OOB
return fmt.Errorf("Port %d out of bounds! [1024,65535]",port) return fmt.Errorf("Port %d out of bounds! [1024,65535]", port)
} }
c.Server.Ports[pName] = port if c.Server.Ports == nil {
return nil c.Server.Ports = make(map[string]int)
}
c.Server.Ports[pName] = port
pname := fmt.Sprintf("server.ports.%s", pName)
viper.Set(pname, port)
return viper.WriteConfigAs(viper.ConfigFileUsed())
} }
func (c *Config) UpdateServerToken(token string) error { func (c *Config) UpdateServerToken(token string) error {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
if token == "" { if token == "" {
return errors.New("String cannot be empty!") return errors.New("String cannot be empty!")
} }
c.Server.Token = token c.Server.Token = token
return viper.WriteConfig() viper.Set("server.token", token)
return viper.WriteConfigAs(viper.ConfigFileUsed())
} }
func (c *Config) UpdateReactorClient(id uint32, bucket, token string) error { func (c *Config) UpdateReactorClient(id uint32, bucket, token string) error {
c.Lock() c.Lock()
c.Unlock() c.Unlock()
sid := strconv.FormatUint(uint64(id), 10) sid := strconv.FormatUint(uint64(id), 10)
if token == "" || bucket == "" { if token == "" || bucket == "" {
return errors.New("String cannot be empty!") return errors.New("String cannot be empty!")
} }
if reactor, ok := c.Reactors[sid]; !ok { if reactor, ok := c.Reactors[sid]; !ok {
c.Reactors[sid] = ReactorConfig{Token:token,Bucket:bucket,Id:id} c.Reactors[sid] = ReactorConfig{Token: token, Bucket: bucket, Id: id}
} else { } else {
reactor.Bucket = bucket reactor.Bucket = bucket
reactor.Token = token reactor.Token = token
c.Reactors[sid] = reactor c.Reactors[sid] = reactor
} }
return viper.WriteConfig() reactorbucket := fmt.Sprintf("%s.db-bucket", id)
reactortoken := fmt.Sprintf("%s.db-token", id)
viper.Set(reactorbucket, bucket)
viper.Set(reactortoken, token)
return viper.WriteConfigAs(viper.ConfigFileUsed())
} }
func (c *Config) Store() error {
return viper.WriteConfigAs(viper.ConfigFileUsed())
}

@ -7,6 +7,7 @@ import (
type DBClient struct { type DBClient struct {
URL string URL string
Org string
Bucket string Bucket string
Token string Token string
// Client *influxdb2.Client // Client *influxdb2.Client
@ -17,18 +18,29 @@ type DBAdmin struct {
*DBClient *DBClient
} }
func NewDBClient(url, bucket, token string) *DBClient { func NewDBClient(url, org, token string) *DBClient {
db := &DBClient{URL:url, Bucket:bucket, Token:token} db := &DBClient{URL:url, Org:org, Token:token}
return db return db
} }
func NewDBAdmin(url, bucket, token string) *DBAdmin { func NewDBAdmin(url, org, token string) *DBAdmin {
admin := &DBAdmin{} admin := &DBAdmin{}
admin.DBClient = NewDBClient(url, bucket, token) admin.DBClient = NewDBClient(url, org, token)
return admin return admin
} }
// base level funcs // base level funcs
func (d *DBClient) Start() { func (d *DBClient) Start() {
// connect to DB // connect to DB
// d.Client = influxdb2.NewClient(d.URL,d.Token)
} }
func (d *DBAdmin) GetReactorClient(id string) (string, string, error) {
// given an id returns associated token and bucket
client := influxdb2.NewClient(d.URL,d.Token)
defer client.Close()
bucket, err := client.BucketsAPI().FindBucketByName(context.Background(),id)
if err != nil {
return "", "", err
}
if d.ReactorExists(id) {
// get corresponding reactor token and bucket

@ -1,17 +1,18 @@
package server package server
import ( import (
"sync" "FRMS/internal/pkg/config"
"fmt" pb "FRMS/internal/pkg/grpc"
"net" "FRMS/internal/pkg/influxdb"
"context" _ "FRMS/internal/pkg/influxdb"
"errors" "FRMS/internal/pkg/logging"
"FRMS/internal/pkg/logging" "context"
"google.golang.org/grpc" "errors"
pb "FRMS/internal/pkg/grpc" "fmt"
"FRMS/internal/pkg/config" "net"
_ "FRMS/internal/pkg/influxdb" "sync"
"google.golang.org/grpc"
) )
// this package creates the central coordiantor and sub coordiantors for clients // this package creates the central coordiantor and sub coordiantors for clients
@ -19,339 +20,324 @@ import (
// config interface // config interface
func LoadConfig() Config { func LoadConfig() Config {
// returns a ServerConfig that we can query and update // returns a ServerConfig that we can query and update
return config.LoadConfig() return config.LoadConfig()
} }
type Config interface { // PROPOSED RENAMING: ServerConfig to avoid confusion w/ reactor variant type Config interface { // PROPOSED RENAMING: ServerConfig to avoid confusion w/ reactor variant
// getters // getters
GetURL() (string, error) GetURL() (string, error)
GetOrg() (string, error) GetOrg() (string, error)
GetPort(string) (int, error) GetPort(string) (int, error)
GetServerToken() (string, error) GetServerToken() (string, error)
GetReactorClient(uint32) (string, string, error) // ret (bucket, token, err) GetReactorClient(uint32) (string, string, error) // ret (bucket, token, err)
// setters // setters
// save on write // save on write
UpdateURL(string) error //UpdateURL(string) error
UpdateOrg(string) error //UpdateOrg(string) error
UpdateServerToken(string) error //UpdateServerToken(string) error
UpdateReactorClient(uint32, string, string) error // call (id, bucket, token) UpdateReactorClient(uint32, string, string) error // call (id, bucket, token)
} }
// db client interface // db client interface
type DB interface{ type DB interface {
// getters (all create if doesnt exist) // getters (all create if doesnt exist)
GetToken() (string, error) // returns admin token (Creates if it doesnt exist) //GetToken() (string, error) // returns admin token (Creates if it doesnt exist)
GetReactorClient(uint32) (string, string, error) // returns (bucket, token, err) GetReactorClient(string) (string, string, error) // returns (bucket, token, err)
// delete // delete
DeleteReactorClient(uint32) error // removes client token but maintains bucket DeleteReactorClient(string) error // removes client token but maintains bucket
PurgeReactorClientData(uint32) error // perm deletes all assocaited reactor data (token, bucket etc) PurgeReactorClientData(string) error // perm deletes all assocaited reactor data (token, bucket etc)
} }
/*func NewDBClient() DBClient { func NewDBAdmin(url, org, token string) DB {
return influxdb.NewServerClient() return influxdb.NewDBAdmin(token, org, url)
}*/ }
type CentralCoordinator struct { type CentralCoordinator struct {
ClientConnections *ClientPacket ClientConnections *ClientPacket
//CLisPort int //CLisPort int
*SubCoordinators *SubCoordinators
*SystemViewer *SystemViewer
DB DB
Config Config
Err chan error Err chan error
} }
type SubCoordinators struct { type SubCoordinators struct {
Directory map [string]*SubCoordinator Directory map[string]*SubCoordinator
sync.Mutex sync.Mutex
} }
func NewCentralCoordinator(ch chan error) *CentralCoordinator { func NewCentralCoordinator(ch chan error) *CentralCoordinator {
c := &CentralCoordinator{Err: ch} c := &CentralCoordinator{Err: ch}
c.SystemViewer = NewSystemViewer() c.SystemViewer = NewSystemViewer()
go c.SystemViewer.Start() go c.SystemViewer.Start()
s := make(map[string]*SubCoordinator) s := make(map[string]*SubCoordinator)
sub := &SubCoordinators{Directory:s} sub := &SubCoordinators{Directory: s}
c.SubCoordinators = sub c.SubCoordinators = sub
return c return c
} }
func (c *CentralCoordinator) Start() { func (c *CentralCoordinator) Start() {
// starts up associated funcs // starts up associated funcs
// begin with config and client // begin with config and client
c.LoadCfg() c.LoadCfg()
clientChan := make(chan *ClientPacket) clientChan := make(chan *ClientPacket)
l := NewListener(clientChan,c.Err) l := NewListener(clientChan, c.Err)
go l.Start() go l.Start()
go c.ClientListener(clientChan) go c.ClientListener(clientChan)
} }
func (c *CentralCoordinator) LoadCfg() { func (c *CentralCoordinator) LoadCfg() {
// loads db client info and updates if anything is missing // loads db client info and updates if anything is missing
c.Config = LoadConfig() c.Config = LoadConfig()
_, err := c.Config.GetURL() _, err := c.Config.GetURL()
if err != nil { if err != nil {
logging.Debug(logging.DError,"CCO 01 Err: %v", err) logging.Debug(logging.DError, "CCO 01 Err: %v", err)
c.Err <-err c.Err <- err
} }
token, err := c.Config.GetServerToken() token, err := c.Config.GetServerToken()
if err != nil { if err != nil {
logging.Debug(logging.DError,"CCO 01 Err: %v", err) logging.Debug(logging.DError, "CCO 01 Err: %v", err)
c.Err <-err c.Err <- err
} else if token == "" { }
if token, err = c.DB.GetToken(); err != nil { org, err := c.Config.GetOrg()
logging.Debug(logging.DError,"CCO 01 Err: %v", err) if err != nil {
c.Err <-err logging.Debug(logging.DError, "CCO 01 Err: %v", err)
} c.Err <- err
c.Config.UpdateServerToken(token) }
}
org, err := c.Config.GetOrg()
if err != nil {
logging.Debug(logging.DError,"CCO 01 Err: %v", err)
c.Err <-err
} else if token == "" {
if token, err = c.DB.GetToken(); err != nil {
logging.Debug(logging.DError,"CCO 01 Err: %v", err)
c.Err <-err
}
c.Config.UpdateOrg(org)
}
} }
func (c *CentralCoordinator) ClientListener(ch chan *ClientPacket) { func (c *CentralCoordinator) ClientListener(ch chan *ClientPacket) {
for client := range ch { for client := range ch {
// basically loops until channel is closed // basically loops until channel is closed
cr := c.ClientHandler(client.Client) cr := c.ClientHandler(client.Client)
client.Response <-cr client.Response <- cr
} }
} }
func (c *CentralCoordinator) ClientHandler(cl *Client) *ClientResponse { func (c *CentralCoordinator) ClientHandler(cl *Client) *ClientResponse {
c.SubCoordinators.Lock() c.SubCoordinators.Lock()
defer c.SubCoordinators.Unlock() defer c.SubCoordinators.Unlock()
subcoord, ok := c.SubCoordinators.Directory[cl.Type] subcoord, ok := c.SubCoordinators.Directory[cl.Type]
if !ok { if !ok {
// Sub Coordinator does not exists // Sub Coordinator does not exists
logging.Debug(logging.DSpawn,"CC0 01 Created %v Coordinator",cl.Type) logging.Debug(logging.DSpawn, "CC0 01 Created %v Coordinator", cl.Type)
subcoord = NewSubCoordinator(cl.Type,c.SystemViewer, c.Err) subcoord = NewSubCoordinator(cl.Type, c.SystemViewer, c.Err)
c.SubCoordinators.Directory[cl.Type] = subcoord c.SubCoordinators.Directory[cl.Type] = subcoord
} }
go subcoord.ClientHandler(cl) go subcoord.ClientHandler(cl)
// setting up client response // setting up client response
var url, org, token, bucket string var url, org, token, bucket string
var port int var port int
var err error var err error
if url, err = c.Config.GetURL(); err != nil { if url, err = c.Config.GetURL(); err != nil {
logging.Debug(logging.DError,"Error: %v", err) logging.Debug(logging.DError, "Error: %v", err)
c.Err <-err c.Err <- err
} else if org, err = c.Config.GetOrg(); err != nil { } else if org, err = c.Config.GetOrg(); err != nil {
logging.Debug(logging.DError,"Error: %v", err) logging.Debug(logging.DError, "Error: %v", err)
c.Err <-err c.Err <- err
} else if bucket, token, err = c.Config.GetReactorClient(cl.Id); err != nil { } else if bucket, token, err = c.Config.GetReactorClient(cl.Id); err != nil {
logging.Debug(logging.DError,"Error: %v", err) logging.Debug(logging.DError, "Error: %v", err)
c.Err <-err c.Err <- err
} else if port, err = c.Config.GetPort(cl.Type); err != nil { } else if port, err = c.Config.GetPort(cl.Type); err != nil {
logging.Debug(logging.DError,"Error: %v", err) logging.Debug(logging.DError, "Error: %v", err)
c.Err <-err c.Err <- err
} }
cr := &ClientResponse{URL:url, Org:org, Token:token, Bucket:bucket, Port:port} cr := &ClientResponse{URL: url, Org: org, Token: token, Bucket: bucket, Port: port}
return cr return cr
} }
type ManagerInterface interface { type ManagerInterface interface {
Start() Start()
NewManager(*Client,*SystemViewer, chan error) GeneralManager NewManager(*Client, *SystemViewer, chan error) GeneralManager
GetManager(uint32) (GeneralManager, bool) GetManager(uint32) (GeneralManager, bool)
AddManager(uint32, GeneralManager) AddManager(uint32, GeneralManager)
Register() Register()
} }
type GeneralManager interface { type GeneralManager interface {
// used by sub coordinator to interact with manager // used by sub coordinator to interact with manager
Start() Start()
UpdateClient(*Client) UpdateClient(*Client)
} }
type SubCoordinator struct { type SubCoordinator struct {
Port int // port that we set up gRPC endpoint on Port int // port that we set up gRPC endpoint on
ManagerInterface // embed an interface to create/manager managers ManagerInterface // embed an interface to create/manager managers
*SystemViewer *SystemViewer
Err chan error Err chan error
} }
type Managers struct { type Managers struct {
Directory map[uint32]interface{} // support for either manager Directory map[uint32]interface{} // support for either manager
sync.RWMutex // potential perf sync.RWMutex // potential perf
} }
// interface stuff // interface stuff
func NewSubCoordinator(clientType string, sys *SystemViewer, err chan error) *SubCoordinator { func NewSubCoordinator(clientType string, sys *SystemViewer, err chan error) *SubCoordinator {
c := &SubCoordinator{Err:err} c := &SubCoordinator{Err: err}
c.SystemViewer = sys c.SystemViewer = sys
man, errs := NewCoordinatorType(clientType, err) man, errs := NewCoordinatorType(clientType, err)
if errs != nil { if errs != nil {
err <-errs err <- errs
} }
c.ManagerInterface = man c.ManagerInterface = man
go man.Start() go man.Start()
go man.Register() go man.Register()
return c return c
} }
func (c *SubCoordinator) ClientHandler(cl *Client) { func (c *SubCoordinator) ClientHandler(cl *Client) {
// (creates and) notifies manager of client connection // (creates and) notifies manager of client connection
c.UpdateManager(cl) c.UpdateManager(cl)
} }
func (c *SubCoordinator) UpdateManager(cl *Client) { func (c *SubCoordinator) UpdateManager(cl *Client) {
// shouldn't happen all that often so should be fine to lock // shouldn't happen all that often so should be fine to lock
m, exists := c.GetManager(cl.Id) m, exists := c.GetManager(cl.Id)
if !exists { if !exists {
m = c.NewManager(cl, c.SystemViewer, c.Err) m = c.NewManager(cl, c.SystemViewer, c.Err)
m.UpdateClient(cl) m.UpdateClient(cl)
go c.AddManager(cl.Id, m) go c.AddManager(cl.Id, m)
go m.Start() go m.Start()
} }
go m.UpdateClient(cl) go m.UpdateClient(cl)
} }
func (m *Managers) AddManager(id uint32, man GeneralManager) { func (m *Managers) AddManager(id uint32, man GeneralManager) {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
m.Directory[id] = man m.Directory[id] = man
} }
func (m *Managers) GetManager(id uint32) (GeneralManager, bool) { func (m *Managers) GetManager(id uint32) (GeneralManager, bool) {
// just read locks and reuturns // just read locks and reuturns
m.RLock() m.RLock()
defer m.RUnlock() defer m.RUnlock()
man, exists := m.Directory[id] man, exists := m.Directory[id]
if !exists { if !exists {
return nil, exists return nil, exists
} }
return man.(GeneralManager), exists return man.(GeneralManager), exists
} }
func NewCoordinatorType(clientType string, err chan error) (ManagerInterface, error) { func NewCoordinatorType(clientType string, err chan error) (ManagerInterface, error) {
m := make(map[uint32]interface{}) m := make(map[uint32]interface{})
if clientType == "reactor" { if clientType == "reactor" {
c := &reactorCoordinator{} c := &reactorCoordinator{}
//m := make(map[uint32]*ReactorManager) //m := make(map[uint32]*ReactorManager)
c.Managers = &Managers{Directory:m} c.Managers = &Managers{Directory: m}
return c, nil return c, nil
} else if clientType == "tui" { } else if clientType == "tui" {
c := &tuiCoordinator{} c := &tuiCoordinator{}
//m := make(map[uint32]*TUIManager) //m := make(map[uint32]*TUIManager)
c.Managers = &Managers{Directory:m} c.Managers = &Managers{Directory: m}
return c, nil return c, nil
} }
return &reactorCoordinator{}, errors.New("Unrecognized client type") return &reactorCoordinator{}, errors.New("Unrecognized client type")
} }
// creating sub coordinators for associated gRPC handlers // creating sub coordinators for associated gRPC handlers
// reactor coordinator // reactor coordinator
type reactorCoordinator struct { type reactorCoordinator struct {
*Managers *Managers
pb.UnimplementedMonitoringServer pb.UnimplementedMonitoringServer
} }
func (r *reactorCoordinator) Start() { func (r *reactorCoordinator) Start() {
logging.Debug(logging.DStart,"RCO 01 Starting!") logging.Debug(logging.DStart, "RCO 01 Starting!")
} }
func (r *reactorCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan error) GeneralManager { func (r *reactorCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan error) GeneralManager {
logging.Debug(logging.DClient, "RCO 01 starting manager for %v client %v",cl.Type,cl.Id) logging.Debug(logging.DClient, "RCO 01 starting manager for %v client %v", cl.Type, cl.Id)
return NewReactorManager(cl,sys,err) return NewReactorManager(cl, sys, err)
} }
func (r *reactorCoordinator) Register() { func (r *reactorCoordinator) Register() {
conf := LoadConfig() conf := LoadConfig()
port, err := conf.GetPort("reactor") port, err := conf.GetPort("reactor")
if err != nil { if err != nil {
panic(err) panic(err)
} }
lis, err := net.Listen("tcp", fmt.Sprintf(":%v",port)) lis, err := net.Listen("tcp", fmt.Sprintf(":%v", port))
if err != nil { if err != nil {
panic(err) panic(err)
} }
grpcServer := grpc.NewServer() grpcServer := grpc.NewServer()
pb.RegisterMonitoringServer(grpcServer,r) pb.RegisterMonitoringServer(grpcServer, r)
go grpcServer.Serve(lis) go grpcServer.Serve(lis)
logging.Debug(logging.DClient, "RCO ready for client requests") logging.Debug(logging.DClient, "RCO ready for client requests")
} }
func (r *reactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { func (r *reactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) {
m, exists := r.GetManager(req.GetId()) m, exists := r.GetManager(req.GetId())
if !exists { if !exists {
return &pb.ReactorStatusResponse{}, errors.New("Manager doesn't exists for that client") return &pb.ReactorStatusResponse{}, errors.New("Manager doesn't exists for that client")
} }
rm, ok := m.(*ReactorManager) rm, ok := m.(*ReactorManager)
if !ok { if !ok {
return &pb.ReactorStatusResponse{}, errors.New("Manager is not a reactor manager!") return &pb.ReactorStatusResponse{}, errors.New("Manager is not a reactor manager!")
} }
return rm.ReactorStatusHandler(ctx, req) return rm.ReactorStatusHandler(ctx, req)
} }
//tui coordinator //tui coordinator
type tuiCoordinator struct { type tuiCoordinator struct {
*Managers // by embedding general struct we allow coordinator to still call general funcs *Managers // by embedding general struct we allow coordinator to still call general funcs
pb.UnimplementedManagementServer pb.UnimplementedManagementServer
} }
func (t *tuiCoordinator) Start() { func (t *tuiCoordinator) Start() {
logging.Debug(logging.DStart,"TCO 01 Starting!") logging.Debug(logging.DStart, "TCO 01 Starting!")
} }
func (t *tuiCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan error) GeneralManager { func (t *tuiCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan error) GeneralManager {
logging.Debug(logging.DClient, "TCO 01 starting manager for %v client %v",cl.Type,cl.Id) logging.Debug(logging.DClient, "TCO 01 starting manager for %v client %v", cl.Type, cl.Id)
return NewTUIManager(cl,sys,err) return NewTUIManager(cl, sys, err)
} }
func (t *tuiCoordinator) Register() { func (t *tuiCoordinator) Register() {
conf := LoadConfig() conf := LoadConfig()
port, err := conf.GetPort("tui") port, err := conf.GetPort("tui")
if err != nil { if err != nil {
panic(err) panic(err)
} }
lis, err := net.Listen("tcp", fmt.Sprintf(":%v",port)) lis, err := net.Listen("tcp", fmt.Sprintf(":%v", port))
if err != nil { if err != nil {
// rip // rip
} }
grpcServer := grpc.NewServer() grpcServer := grpc.NewServer()
pb.RegisterManagementServer(grpcServer,t) pb.RegisterManagementServer(grpcServer, t)
go grpcServer.Serve(lis) go grpcServer.Serve(lis)
logging.Debug(logging.DClient, "TCO ready for client requests") logging.Debug(logging.DClient, "TCO ready for client requests")
} }
func (t *tuiCoordinator) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) { func (t *tuiCoordinator) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) {
// grpc handler to fwd to manager // grpc handler to fwd to manager
m, exists := t.GetManager(req.GetClientId()) m, exists := t.GetManager(req.GetClientId())
if !exists { if !exists {
// doesnt exist for some reason // doesnt exist for some reason
return &pb.GetDevicesResponse{}, errors.New("Manager doesn't exists for client") return &pb.GetDevicesResponse{}, errors.New("Manager doesn't exists for client")
} }
tm, ok := m.(*TUIManager) tm, ok := m.(*TUIManager)
if !ok { if !ok {
return &pb.GetDevicesResponse{}, errors.New("Manager is not of type TUI") return &pb.GetDevicesResponse{}, errors.New("Manager is not of type TUI")
} }
return tm.GetDevices(ctx,req) return tm.GetDevices(ctx, req)
} }
// unimplemented bs for grpc // unimplemented bs for grpc
func (t *tuiCoordinator) DeleteReactor(ctx context.Context, req *pb.DeleteReactorRequest) (*pb.DeleteReactorResponse, error) { func (t *tuiCoordinator) DeleteReactor(ctx context.Context, req *pb.DeleteReactorRequest) (*pb.DeleteReactorResponse, error) {
// TODO // TODO
return &pb.DeleteReactorResponse{}, nil return &pb.DeleteReactorResponse{}, nil
} }
func (t *tuiCoordinator) DeleteReactorDevice(ctx context.Context, req *pb.DeleteReactorDeviceRequest) (*pb.DeleteReactorDeviceResponse, error) { func (t *tuiCoordinator) DeleteReactorDevice(ctx context.Context, req *pb.DeleteReactorDeviceRequest) (*pb.DeleteReactorDeviceResponse, error) {
// TODO // TODO
return &pb.DeleteReactorDeviceResponse{}, nil return &pb.DeleteReactorDeviceResponse{}, nil
} }

Loading…
Cancel
Save