From 1c2868daf858bfcf94c10c91f554f84bac748698 Mon Sep 17 00:00:00 2001 From: Keegan Date: Tue, 16 Aug 2022 16:28:01 -0400 Subject: [PATCH] server launches with proper config for itself and grafana, added storing config on exit or changes --- cmd/server/main.go | 112 ++++--- docker-compose.yml | 10 +- influxdb/startup/influxsetup.sh | 10 +- internal/pkg/config/server.go | 244 ++++++++------- internal/pkg/influxdb/client.go | 22 +- internal/pkg/server/coordinator.go | 472 ++++++++++++++--------------- 6 files changed, 454 insertions(+), 416 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index 29fc237..94d7260 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -1,70 +1,84 @@ package main import ( - _"net/http" - _ "net/http/pprof" - "strconv" - "strings" - //"flag" - //"log" - "os" - "fmt" - "FRMS/internal/pkg/logging" - "FRMS/internal/pkg/config" - "FRMS/internal/pkg/server" + "fmt" + _ "net/http" + _ "net/http/pprof" + "os/signal" + "strconv" + "strings" + "syscall" + + //"flag" + //"log" + "FRMS/internal/pkg/config" + "FRMS/internal/pkg/logging" + "FRMS/internal/pkg/server" + "os" ) type coordinator interface { - Start() + Start() } func NewCoordinator(ch chan error) coordinator { - return server.NewCentralCoordinator(ch) + return server.NewCentralCoordinator(ch) } func LoadConfig(fname string) Config { - config.Load(fname) - return config.LoadConfig() + config.Load(fname) + return config.LoadConfig() } type Config interface { - UpdatePort(string, int) error + UpdatePort(string, int) error + Store() error } func main() { - // lets get this bread + // lets get this bread - // go func() { - // fmt.Println(http.ListenAndServe("localhost:6060",nil)) - // }() - conf := LoadConfig("server") - ch := make(chan error) - // checking env - envVars := os.Environ() - for _, envString := range envVars { - // looping over set ports - initSplt := strings.Split(envString,"=") - key := initSplt[0] - val := initSplt[1] - if strings.Contains(key,"PORT") { - // parsing out correct port to update - splt := strings.Split(key,"_") // LIS_PORT -> LIS, PORT - portName := strings.ToLower(splt[0]) // LIS -> lis - port, err := strconv.Atoi(val) - if err != nil { - panic(err) - } - if err := conf.UpdatePort(portName,port); err != nil { - panic(err) - } - } - } + // go func() { + // fmt.Println(http.ListenAndServe("localhost:6060",nil)) + // }() + gracefulShutdown := make(chan os.Signal, 1) + signal.Notify(gracefulShutdown, syscall.SIGINT, syscall.SIGTERM) + conf := LoadConfig("server") + errCh := make(chan error) + // checking env + envVars := os.Environ() + for _, envString := range envVars { + // looping over set ports + initSplt := strings.Split(envString, "=") + key := initSplt[0] + val := initSplt[1] + if strings.Contains(key, "PORT") { + // parsing out correct port to update + splt := strings.Split(key, "_") // LIS_PORT -> LIS, PORT + portName := strings.ToLower(splt[0]) // LIS -> lis + port, err := strconv.Atoi(val) + if err != nil { + panic(err) + } + if err := conf.UpdatePort(portName, port); err != nil { + panic(err) + } + } + } - //fmt.Printf("Listening on %v\n", lport) - c := NewCoordinator(ch) - go c.Start() - fmt.Println("Server Active!") - logging.Debug(logging.DStart, "CCO 01 Server started") - err := <-ch // blocking to wait for any errors and keep alive otherwise - panic(err) + //fmt.Printf("Listening on %v\n", lport) + c := NewCoordinator(errCh) + go c.Start() + logging.Debug(logging.DStart, "CCO 01 Server started") + select { + case err := <-errCh: // blocking to wait for any errors and keep alive otherwise + panic(err) + case <-gracefulShutdown: + err := conf.Store() + if err != nil { + panic(err) + } + fmt.Println("Stored config successfully. Exiting...") + os.Exit(0) + } } diff --git a/docker-compose.yml b/docker-compose.yml index e7a7795..3bebcdc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,12 +8,16 @@ services: ports: - "2022:2022" - "2023:2023" + - "2024:2024" volumes: - ./logs:/log - - server-config:/configs + - server-config:/etc/frms/config environment: - LOGTYPE=SERVER - VERBOSE=1 + - LIS_PORT=2022 + - REACTOR_PORT=2023 + - TUI_PORT=2024 depends_on: - db db: @@ -32,7 +36,7 @@ services: - DOCKER_INFLUXDB_INIT_USERNAME=admin - DOCKER_INFLUXDB_INIT_PASSWORD=F0r3l1ght - DOCKER_INFLUXDB_INIT_ORG=ForeLight - - DOCKER_INFLUXDB_INIT_BUCKET=default + - DOCKER_INFLUXDB_INIT_BUCKET=test grafana: image: grafana/grafana-oss:latest ports: @@ -40,6 +44,8 @@ services: volumes: - grafana-provisioning:/etc/grafana/provisioning - grafana-data:/var/lib/grafana + depends_on: + - db volumes: grafana-data: grafana-provisioning: diff --git a/influxdb/startup/influxsetup.sh b/influxdb/startup/influxsetup.sh index 222a838..d78f709 100755 --- a/influxdb/startup/influxsetup.sh +++ b/influxdb/startup/influxsetup.sh @@ -1,12 +1,14 @@ #!/bin/bash +#DB_URL=$(cat "$INFLUX_CONFIGS_PATH" | awk '/url/ {print $3}' | head -n 1) +DB_URL="frms-db-1:8086" + TOKEN=$(influx auth list --user ${DOCKER_INFLUXDB_INIT_USER_ID} --hide-headers | cut -f 3) ORG=$(influx org list | grep ${DOCKER_INFLUXDB_INIT_ORG_ID} | awk '{print $2}') # creating starting server YAML -echo -e "server:\n\tdb-url:${INFLUX_HOST}\n\tdb-org:${ORG}\n\tdb-token:${TOKEN}" >/configs/server.yaml; +echo -e "server:\n db-url: ${DB_URL}\n db-org: ${ORG}\n db-token: ${TOKEN}" >/configs/server.yaml; # creating grafana yaml influx user create -n grafana -o ${ORG} -GRAFANA_USER_ID=$(influx user list --hide-headers --name grafana) -GRAFANA_TOKEN=$(influx auth list --user ${GRAFANA_USER_ID} --hide-headers | cut -f 3) -echo -e "datasources:\n\t- name: INFLUXDB\n\ttype: influxdb\n\turl:${INFLUX_HOST}\n\tdatabase: test\n\t jsonData:\n\t\thttpMode: GET\n\t\thttpHeaderName1: 'Authorization'\n\tsecureJsonData:\n\t\thttpHeaderValue1: 'Token ${GRAFANA_TOKEN}'" +GRAFANA_TOKEN=$(influx auth list --user grafana --hide-headers | cut -f 3) +echo -e "apiVersion: 1\n\ndeleteDatasources:\n\ndatasources:\n - name: INFLUXDB\n type: influxdb\n access: proxy\n url: ${DB_URL}\n jsonData:\n httpMode: GET\n httpHeaderName1: 'Authorization'\n secureJsonData:\n httpHeaderValue1: 'Token ${GRAFANA_TOKEN}'" >/grafana/datasources/datasource.yaml diff --git a/internal/pkg/config/server.go b/internal/pkg/config/server.go index 4443a7d..27da7e6 100644 --- a/internal/pkg/config/server.go +++ b/internal/pkg/config/server.go @@ -3,166 +3,184 @@ package config // package serves to store/load config files for server import ( - "fmt" - "strconv" - "github.com/spf13/viper" - "FRMS/internal/pkg/logging" - "errors" - "sync" - "strings" - //"os" - //"log" - //"os/exec" - //"bytes" + "FRMS/internal/pkg/logging" + "errors" + "fmt" + "strconv" + "strings" + "sync" + + "github.com/spf13/viper" + //"os" + //"log" + //"os/exec" + //"bytes" ) type Config struct { - Server ServerConfig `mapstructure:"server"` - Reactors map[string]ReactorConfig `mapstructure:"reactors"` - sync.RWMutex + Server ServerConfig `mapstructure:"server"` + Reactors map[string]ReactorConfig `mapstructure:"reactors"` + sync.RWMutex } type ServerConfig struct { - URL string `mapstructure:"db-url"` - Token string `mapstructure:"db-token"` - Orginization string `mapstructure:"db-org"` - Ports map[string]int `mapstructure:"ports"` // changed from map[string]string to map[string]int - Name string `mapstructure:"name"` + URL string `mapstructure:"db-url"` + Token string `mapstructure:"db-token"` + Orginization string `mapstructure:"db-org"` + Ports map[string]int `mapstructure:"ports"` // changed from map[string]string to map[string]int + Name string `mapstructure:"name"` } type ReactorConfig struct { - Token string `mapstructure:"db-token"` - Bucket string `mapstructure:"db-bucket"` - Name string - Id uint32 + Token string `mapstructure:"db-token"` + Bucket string `mapstructure:"db-bucket"` + Name string + Id uint32 } var C *Config func Load(fname string) { - // read stored configs - C = &Config{} - viper.SetConfigName(fname) - viper.SetConfigType("yaml") - viper.AddConfigPath("./configs") - //viper.AddConfigPath("../../internal/configs") - // defaults - viper.SetDefault("server.db-org", "ForeLight") - viper.SetDefault("server.db-url", "http://192.168.100.2:8086") - // unmarshalling - viper.ReadInConfig() // the fact i never did this is infuriating - logging.Debug(logging.DStart,"CON Loaded configs from %v", viper.ConfigFileUsed()) - err := viper.Unmarshal(C) - if err != nil { - logging.Debug(logging.DError,"Cannot unmarshall Server! %v",err) - panic(err) - } - fmt.Printf("Outcome: %#v\n \n",C) - //fmt.Printf("%v\n",C) - // unmarshalled at this point + // read stored configs + C = &Config{} + viper.SetConfigName(fname) + viper.SetConfigType("yaml") + viper.AddConfigPath("/etc/frms/config") // + // defaults + viper.SetDefault("server.db-org", "ForeLight") + viper.SetDefault("server.db-url", "http://192.168.100.2:8086") + // unmarshalling + err := viper.ReadInConfig() // the fact i never did this is infuriating + if err != nil { + panic(err) + } + logging.Debug(logging.DStart, "CON Loaded configs from %v", viper.ConfigFileUsed()) + err = viper.Unmarshal(C) + if err != nil { + logging.Debug(logging.DError, "Cannot unmarshall Server! %v", err) + panic(err) + } + fmt.Printf("Outcome: %#v\n \n", C) + //fmt.Printf("%v\n",C) + // unmarshalled at this point } func LoadConfig() *Config { - return C + return C } func (c *Config) GetURL() (string, error) { - c.RLock() - defer c.RUnlock() - return C.Server.URL, nil + c.RLock() + defer c.RUnlock() + return C.Server.URL, nil } func (c *Config) GetOrg() (string, error) { - c.RLock() - defer c.RUnlock() - return c.Server.Orginization, nil + c.RLock() + defer c.RUnlock() + return c.Server.Orginization, nil } func (c *Config) GetPort(port string) (int, error) { - c.RLock() - defer c.RUnlock() - portString, ok := c.Server.Ports[port] - if !ok { - portEnv := strings.ToUpper(port) + "_PORT" - return 0, fmt.Errorf("%s port doesnt exist! Please set using env %s=####",port, portEnv) - } - // returns int, err - //return strconv.Atoi(portString) - return portString, nil + c.RLock() + defer c.RUnlock() + portString, ok := c.Server.Ports[port] + if !ok { + portEnv := strings.ToUpper(port) + "_PORT" + return 0, fmt.Errorf("%s port doesnt exist! Please set using env %s=####", port, portEnv) + } + // returns int, err + //return strconv.Atoi(portString) + return portString, nil } func (c *Config) GetServerToken() (string, error) { - c.RLock() - defer c.RUnlock() - return c.Server.Token, nil + c.RLock() + defer c.RUnlock() + return c.Server.Token, nil } func (c *Config) GetReactorClient(id uint32) (string, string, error) { - c.RLock() - defer c.RUnlock() - idString := strconv.FormatUint(uint64(id),10) - if r, ok := c.Reactors[idString]; ok { - return r.Bucket, r.Token, nil - } - return "", "", fmt.Errorf("Reactor %v config doesnt exist!",id) + c.RLock() + defer c.RUnlock() + idString := strconv.FormatUint(uint64(id), 10) + if r, ok := c.Reactors[idString]; ok { + return r.Bucket, r.Token, nil + } + return "", "", fmt.Errorf("Reactor %v config doesnt exist!", id) } // setters func (c *Config) UpdateURL(url string) error { - c.Lock() - defer c.Unlock() - if url == "" { - return errors.New("String cannot be empty!") - } - c.Server.URL = url - return viper.WriteConfig() + c.Lock() + defer c.Unlock() + if url == "" { + return errors.New("String cannot be empty!") + } + c.Server.URL = url + viper.Set("server.db-url", url) + return viper.WriteConfigAs(viper.ConfigFileUsed()) } func (c *Config) UpdateOrg(org string) error { - c.Lock() - defer c.Unlock() - if org == "" { - return errors.New("String cannot be empty!") - } - c.Server.Orginization = org - return viper.WriteConfig() + c.Lock() + defer c.Unlock() + if org == "" { + return errors.New("String cannot be empty!") + } + c.Server.Orginization = org + viper.Set("server.db-org", org) + return viper.WriteConfigAs(viper.ConfigFileUsed()) } func (c *Config) UpdatePort(pName string, port int) error { - c.Lock() - defer c.Unlock() - if port < 1024 || port > 65535 { - // OOB - return fmt.Errorf("Port %d out of bounds! [1024,65535]",port) - } - c.Server.Ports[pName] = port - return nil + c.Lock() + defer c.Unlock() + if port < 1024 || port > 65535 { + // OOB + return fmt.Errorf("Port %d out of bounds! [1024,65535]", port) + } + if c.Server.Ports == nil { + c.Server.Ports = make(map[string]int) + } + c.Server.Ports[pName] = port + pname := fmt.Sprintf("server.ports.%s", pName) + viper.Set(pname, port) + return viper.WriteConfigAs(viper.ConfigFileUsed()) } func (c *Config) UpdateServerToken(token string) error { - c.Lock() - defer c.Unlock() - if token == "" { - return errors.New("String cannot be empty!") - } - c.Server.Token = token - return viper.WriteConfig() + c.Lock() + defer c.Unlock() + if token == "" { + return errors.New("String cannot be empty!") + } + c.Server.Token = token + viper.Set("server.token", token) + return viper.WriteConfigAs(viper.ConfigFileUsed()) } func (c *Config) UpdateReactorClient(id uint32, bucket, token string) error { - c.Lock() - c.Unlock() - sid := strconv.FormatUint(uint64(id), 10) - if token == "" || bucket == "" { - return errors.New("String cannot be empty!") - } - if reactor, ok := c.Reactors[sid]; !ok { - c.Reactors[sid] = ReactorConfig{Token:token,Bucket:bucket,Id:id} - } else { - reactor.Bucket = bucket - reactor.Token = token - c.Reactors[sid] = reactor - } - return viper.WriteConfig() + c.Lock() + c.Unlock() + sid := strconv.FormatUint(uint64(id), 10) + if token == "" || bucket == "" { + return errors.New("String cannot be empty!") + } + if reactor, ok := c.Reactors[sid]; !ok { + c.Reactors[sid] = ReactorConfig{Token: token, Bucket: bucket, Id: id} + } else { + reactor.Bucket = bucket + reactor.Token = token + c.Reactors[sid] = reactor + } + reactorbucket := fmt.Sprintf("%s.db-bucket", id) + reactortoken := fmt.Sprintf("%s.db-token", id) + viper.Set(reactorbucket, bucket) + viper.Set(reactortoken, token) + return viper.WriteConfigAs(viper.ConfigFileUsed()) } +func (c *Config) Store() error { + return viper.WriteConfigAs(viper.ConfigFileUsed()) +} diff --git a/internal/pkg/influxdb/client.go b/internal/pkg/influxdb/client.go index 48cc862..055ef87 100644 --- a/internal/pkg/influxdb/client.go +++ b/internal/pkg/influxdb/client.go @@ -7,6 +7,7 @@ import ( type DBClient struct { URL string + Org string Bucket string Token string // Client *influxdb2.Client @@ -17,18 +18,29 @@ type DBAdmin struct { *DBClient } -func NewDBClient(url, bucket, token string) *DBClient { - db := &DBClient{URL:url, Bucket:bucket, Token:token} +func NewDBClient(url, org, token string) *DBClient { + db := &DBClient{URL:url, Org:org, Token:token} return db } -func NewDBAdmin(url, bucket, token string) *DBAdmin { +func NewDBAdmin(url, org, token string) *DBAdmin { admin := &DBAdmin{} - admin.DBClient = NewDBClient(url, bucket, token) + admin.DBClient = NewDBClient(url, org, token) return admin } + // base level funcs func (d *DBClient) Start() { // connect to DB - // d.Client = influxdb2.NewClient(d.URL,d.Token) } + +func (d *DBAdmin) GetReactorClient(id string) (string, string, error) { + // given an id returns associated token and bucket + client := influxdb2.NewClient(d.URL,d.Token) + defer client.Close() + bucket, err := client.BucketsAPI().FindBucketByName(context.Background(),id) + if err != nil { + return "", "", err + } + if d.ReactorExists(id) { + // get corresponding reactor token and bucket diff --git a/internal/pkg/server/coordinator.go b/internal/pkg/server/coordinator.go index 50054da..9e3999a 100644 --- a/internal/pkg/server/coordinator.go +++ b/internal/pkg/server/coordinator.go @@ -1,17 +1,18 @@ package server import ( - "sync" - "fmt" - "net" - "context" - "errors" - "FRMS/internal/pkg/logging" - "google.golang.org/grpc" - pb "FRMS/internal/pkg/grpc" - "FRMS/internal/pkg/config" - _ "FRMS/internal/pkg/influxdb" - + "FRMS/internal/pkg/config" + pb "FRMS/internal/pkg/grpc" + "FRMS/internal/pkg/influxdb" + _ "FRMS/internal/pkg/influxdb" + "FRMS/internal/pkg/logging" + "context" + "errors" + "fmt" + "net" + "sync" + + "google.golang.org/grpc" ) // this package creates the central coordiantor and sub coordiantors for clients @@ -19,339 +20,324 @@ import ( // config interface func LoadConfig() Config { - // returns a ServerConfig that we can query and update - return config.LoadConfig() + // returns a ServerConfig that we can query and update + return config.LoadConfig() } type Config interface { // PROPOSED RENAMING: ServerConfig to avoid confusion w/ reactor variant - // getters - GetURL() (string, error) - GetOrg() (string, error) - GetPort(string) (int, error) - GetServerToken() (string, error) - GetReactorClient(uint32) (string, string, error) // ret (bucket, token, err) - // setters - // save on write - UpdateURL(string) error - UpdateOrg(string) error - UpdateServerToken(string) error - UpdateReactorClient(uint32, string, string) error // call (id, bucket, token) + // getters + GetURL() (string, error) + GetOrg() (string, error) + GetPort(string) (int, error) + GetServerToken() (string, error) + GetReactorClient(uint32) (string, string, error) // ret (bucket, token, err) + // setters + // save on write + //UpdateURL(string) error + //UpdateOrg(string) error + //UpdateServerToken(string) error + UpdateReactorClient(uint32, string, string) error // call (id, bucket, token) } // db client interface -type DB interface{ - // getters (all create if doesnt exist) - GetToken() (string, error) // returns admin token (Creates if it doesnt exist) - GetReactorClient(uint32) (string, string, error) // returns (bucket, token, err) - // delete - DeleteReactorClient(uint32) error // removes client token but maintains bucket - PurgeReactorClientData(uint32) error // perm deletes all assocaited reactor data (token, bucket etc) +type DB interface { + // getters (all create if doesnt exist) + //GetToken() (string, error) // returns admin token (Creates if it doesnt exist) + GetReactorClient(string) (string, string, error) // returns (bucket, token, err) + // delete + DeleteReactorClient(string) error // removes client token but maintains bucket + PurgeReactorClientData(string) error // perm deletes all assocaited reactor data (token, bucket etc) } -/*func NewDBClient() DBClient { - return influxdb.NewServerClient() -}*/ - +func NewDBAdmin(url, org, token string) DB { + return influxdb.NewDBAdmin(token, org, url) +} type CentralCoordinator struct { - ClientConnections *ClientPacket - //CLisPort int - *SubCoordinators - *SystemViewer - DB - Config - Err chan error + ClientConnections *ClientPacket + //CLisPort int + *SubCoordinators + *SystemViewer + DB + Config + Err chan error } type SubCoordinators struct { - Directory map [string]*SubCoordinator - sync.Mutex + Directory map[string]*SubCoordinator + sync.Mutex } func NewCentralCoordinator(ch chan error) *CentralCoordinator { - c := &CentralCoordinator{Err: ch} - c.SystemViewer = NewSystemViewer() - go c.SystemViewer.Start() - s := make(map[string]*SubCoordinator) - sub := &SubCoordinators{Directory:s} - c.SubCoordinators = sub - return c + c := &CentralCoordinator{Err: ch} + c.SystemViewer = NewSystemViewer() + go c.SystemViewer.Start() + s := make(map[string]*SubCoordinator) + sub := &SubCoordinators{Directory: s} + c.SubCoordinators = sub + return c } func (c *CentralCoordinator) Start() { - // starts up associated funcs - // begin with config and client - c.LoadCfg() - clientChan := make(chan *ClientPacket) - l := NewListener(clientChan,c.Err) - go l.Start() - go c.ClientListener(clientChan) + // starts up associated funcs + // begin with config and client + c.LoadCfg() + clientChan := make(chan *ClientPacket) + l := NewListener(clientChan, c.Err) + go l.Start() + go c.ClientListener(clientChan) } func (c *CentralCoordinator) LoadCfg() { - // loads db client info and updates if anything is missing - c.Config = LoadConfig() - _, err := c.Config.GetURL() - if err != nil { - logging.Debug(logging.DError,"CCO 01 Err: %v", err) - c.Err <-err - } - token, err := c.Config.GetServerToken() - if err != nil { - logging.Debug(logging.DError,"CCO 01 Err: %v", err) - c.Err <-err - } else if token == "" { - if token, err = c.DB.GetToken(); err != nil { - logging.Debug(logging.DError,"CCO 01 Err: %v", err) - c.Err <-err - } - c.Config.UpdateServerToken(token) - } - org, err := c.Config.GetOrg() - if err != nil { - logging.Debug(logging.DError,"CCO 01 Err: %v", err) - c.Err <-err - } else if token == "" { - if token, err = c.DB.GetToken(); err != nil { - logging.Debug(logging.DError,"CCO 01 Err: %v", err) - c.Err <-err - } - c.Config.UpdateOrg(org) - } + // loads db client info and updates if anything is missing + c.Config = LoadConfig() + _, err := c.Config.GetURL() + if err != nil { + logging.Debug(logging.DError, "CCO 01 Err: %v", err) + c.Err <- err + } + token, err := c.Config.GetServerToken() + if err != nil { + logging.Debug(logging.DError, "CCO 01 Err: %v", err) + c.Err <- err + } + org, err := c.Config.GetOrg() + if err != nil { + logging.Debug(logging.DError, "CCO 01 Err: %v", err) + c.Err <- err + } } func (c *CentralCoordinator) ClientListener(ch chan *ClientPacket) { - for client := range ch { - // basically loops until channel is closed - cr := c.ClientHandler(client.Client) - client.Response <-cr - } + for client := range ch { + // basically loops until channel is closed + cr := c.ClientHandler(client.Client) + client.Response <- cr + } } func (c *CentralCoordinator) ClientHandler(cl *Client) *ClientResponse { - c.SubCoordinators.Lock() - defer c.SubCoordinators.Unlock() - subcoord, ok := c.SubCoordinators.Directory[cl.Type] - if !ok { - // Sub Coordinator does not exists - logging.Debug(logging.DSpawn,"CC0 01 Created %v Coordinator",cl.Type) - subcoord = NewSubCoordinator(cl.Type,c.SystemViewer, c.Err) - c.SubCoordinators.Directory[cl.Type] = subcoord - } - go subcoord.ClientHandler(cl) - // setting up client response - var url, org, token, bucket string - var port int - var err error - if url, err = c.Config.GetURL(); err != nil { - logging.Debug(logging.DError,"Error: %v", err) - c.Err <-err - } else if org, err = c.Config.GetOrg(); err != nil { - logging.Debug(logging.DError,"Error: %v", err) - c.Err <-err - } else if bucket, token, err = c.Config.GetReactorClient(cl.Id); err != nil { - logging.Debug(logging.DError,"Error: %v", err) - c.Err <-err - } else if port, err = c.Config.GetPort(cl.Type); err != nil { - logging.Debug(logging.DError,"Error: %v", err) - c.Err <-err - } - cr := &ClientResponse{URL:url, Org:org, Token:token, Bucket:bucket, Port:port} - return cr + c.SubCoordinators.Lock() + defer c.SubCoordinators.Unlock() + subcoord, ok := c.SubCoordinators.Directory[cl.Type] + if !ok { + // Sub Coordinator does not exists + logging.Debug(logging.DSpawn, "CC0 01 Created %v Coordinator", cl.Type) + subcoord = NewSubCoordinator(cl.Type, c.SystemViewer, c.Err) + c.SubCoordinators.Directory[cl.Type] = subcoord + } + go subcoord.ClientHandler(cl) + // setting up client response + var url, org, token, bucket string + var port int + var err error + if url, err = c.Config.GetURL(); err != nil { + logging.Debug(logging.DError, "Error: %v", err) + c.Err <- err + } else if org, err = c.Config.GetOrg(); err != nil { + logging.Debug(logging.DError, "Error: %v", err) + c.Err <- err + } else if bucket, token, err = c.Config.GetReactorClient(cl.Id); err != nil { + logging.Debug(logging.DError, "Error: %v", err) + c.Err <- err + } else if port, err = c.Config.GetPort(cl.Type); err != nil { + logging.Debug(logging.DError, "Error: %v", err) + c.Err <- err + } + cr := &ClientResponse{URL: url, Org: org, Token: token, Bucket: bucket, Port: port} + return cr } type ManagerInterface interface { - Start() - NewManager(*Client,*SystemViewer, chan error) GeneralManager - GetManager(uint32) (GeneralManager, bool) - AddManager(uint32, GeneralManager) - Register() + Start() + NewManager(*Client, *SystemViewer, chan error) GeneralManager + GetManager(uint32) (GeneralManager, bool) + AddManager(uint32, GeneralManager) + Register() } - type GeneralManager interface { - // used by sub coordinator to interact with manager - Start() - UpdateClient(*Client) + // used by sub coordinator to interact with manager + Start() + UpdateClient(*Client) } type SubCoordinator struct { - Port int // port that we set up gRPC endpoint on - ManagerInterface // embed an interface to create/manager managers - *SystemViewer - Err chan error + Port int // port that we set up gRPC endpoint on + ManagerInterface // embed an interface to create/manager managers + *SystemViewer + Err chan error } type Managers struct { - Directory map[uint32]interface{} // support for either manager - sync.RWMutex // potential perf + Directory map[uint32]interface{} // support for either manager + sync.RWMutex // potential perf } // interface stuff func NewSubCoordinator(clientType string, sys *SystemViewer, err chan error) *SubCoordinator { - c := &SubCoordinator{Err:err} - c.SystemViewer = sys - man, errs := NewCoordinatorType(clientType, err) - if errs != nil { - err <-errs - } - c.ManagerInterface = man - go man.Start() - go man.Register() - return c + c := &SubCoordinator{Err: err} + c.SystemViewer = sys + man, errs := NewCoordinatorType(clientType, err) + if errs != nil { + err <- errs + } + c.ManagerInterface = man + go man.Start() + go man.Register() + return c } func (c *SubCoordinator) ClientHandler(cl *Client) { - // (creates and) notifies manager of client connection + // (creates and) notifies manager of client connection - c.UpdateManager(cl) + c.UpdateManager(cl) } func (c *SubCoordinator) UpdateManager(cl *Client) { - // shouldn't happen all that often so should be fine to lock - m, exists := c.GetManager(cl.Id) - if !exists { - m = c.NewManager(cl, c.SystemViewer, c.Err) - m.UpdateClient(cl) - go c.AddManager(cl.Id, m) - go m.Start() - } - go m.UpdateClient(cl) + // shouldn't happen all that often so should be fine to lock + m, exists := c.GetManager(cl.Id) + if !exists { + m = c.NewManager(cl, c.SystemViewer, c.Err) + m.UpdateClient(cl) + go c.AddManager(cl.Id, m) + go m.Start() + } + go m.UpdateClient(cl) } func (m *Managers) AddManager(id uint32, man GeneralManager) { - m.Lock() - defer m.Unlock() - m.Directory[id] = man + m.Lock() + defer m.Unlock() + m.Directory[id] = man } func (m *Managers) GetManager(id uint32) (GeneralManager, bool) { - // just read locks and reuturns - m.RLock() - defer m.RUnlock() - man, exists := m.Directory[id] - if !exists { - return nil, exists - } - return man.(GeneralManager), exists + // just read locks and reuturns + m.RLock() + defer m.RUnlock() + man, exists := m.Directory[id] + if !exists { + return nil, exists + } + return man.(GeneralManager), exists } func NewCoordinatorType(clientType string, err chan error) (ManagerInterface, error) { - m := make(map[uint32]interface{}) - if clientType == "reactor" { - c := &reactorCoordinator{} - //m := make(map[uint32]*ReactorManager) - c.Managers = &Managers{Directory:m} - return c, nil - } else if clientType == "tui" { - c := &tuiCoordinator{} - //m := make(map[uint32]*TUIManager) - c.Managers = &Managers{Directory:m} - return c, nil - } - return &reactorCoordinator{}, errors.New("Unrecognized client type") + m := make(map[uint32]interface{}) + if clientType == "reactor" { + c := &reactorCoordinator{} + //m := make(map[uint32]*ReactorManager) + c.Managers = &Managers{Directory: m} + return c, nil + } else if clientType == "tui" { + c := &tuiCoordinator{} + //m := make(map[uint32]*TUIManager) + c.Managers = &Managers{Directory: m} + return c, nil + } + return &reactorCoordinator{}, errors.New("Unrecognized client type") } // creating sub coordinators for associated gRPC handlers // reactor coordinator type reactorCoordinator struct { - *Managers - pb.UnimplementedMonitoringServer + *Managers + pb.UnimplementedMonitoringServer } func (r *reactorCoordinator) Start() { - logging.Debug(logging.DStart,"RCO 01 Starting!") + logging.Debug(logging.DStart, "RCO 01 Starting!") } func (r *reactorCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan error) GeneralManager { - logging.Debug(logging.DClient, "RCO 01 starting manager for %v client %v",cl.Type,cl.Id) - return NewReactorManager(cl,sys,err) + logging.Debug(logging.DClient, "RCO 01 starting manager for %v client %v", cl.Type, cl.Id) + return NewReactorManager(cl, sys, err) } func (r *reactorCoordinator) Register() { - conf := LoadConfig() - port, err := conf.GetPort("reactor") - if err != nil { - panic(err) - } - lis, err := net.Listen("tcp", fmt.Sprintf(":%v",port)) - if err != nil { - panic(err) - } - grpcServer := grpc.NewServer() - pb.RegisterMonitoringServer(grpcServer,r) - go grpcServer.Serve(lis) - logging.Debug(logging.DClient, "RCO ready for client requests") + conf := LoadConfig() + port, err := conf.GetPort("reactor") + if err != nil { + panic(err) + } + lis, err := net.Listen("tcp", fmt.Sprintf(":%v", port)) + if err != nil { + panic(err) + } + grpcServer := grpc.NewServer() + pb.RegisterMonitoringServer(grpcServer, r) + go grpcServer.Serve(lis) + logging.Debug(logging.DClient, "RCO ready for client requests") } func (r *reactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { - m, exists := r.GetManager(req.GetId()) - if !exists { - return &pb.ReactorStatusResponse{}, errors.New("Manager doesn't exists for that client") - } - rm, ok := m.(*ReactorManager) - if !ok { - return &pb.ReactorStatusResponse{}, errors.New("Manager is not a reactor manager!") - } - return rm.ReactorStatusHandler(ctx, req) + m, exists := r.GetManager(req.GetId()) + if !exists { + return &pb.ReactorStatusResponse{}, errors.New("Manager doesn't exists for that client") + } + rm, ok := m.(*ReactorManager) + if !ok { + return &pb.ReactorStatusResponse{}, errors.New("Manager is not a reactor manager!") + } + return rm.ReactorStatusHandler(ctx, req) } //tui coordinator type tuiCoordinator struct { - *Managers // by embedding general struct we allow coordinator to still call general funcs - pb.UnimplementedManagementServer + *Managers // by embedding general struct we allow coordinator to still call general funcs + pb.UnimplementedManagementServer } func (t *tuiCoordinator) Start() { - logging.Debug(logging.DStart,"TCO 01 Starting!") + logging.Debug(logging.DStart, "TCO 01 Starting!") } func (t *tuiCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan error) GeneralManager { - logging.Debug(logging.DClient, "TCO 01 starting manager for %v client %v",cl.Type,cl.Id) - return NewTUIManager(cl,sys,err) + logging.Debug(logging.DClient, "TCO 01 starting manager for %v client %v", cl.Type, cl.Id) + return NewTUIManager(cl, sys, err) } func (t *tuiCoordinator) Register() { - conf := LoadConfig() - port, err := conf.GetPort("tui") - if err != nil { - panic(err) - } - lis, err := net.Listen("tcp", fmt.Sprintf(":%v",port)) - if err != nil { - // rip - } - grpcServer := grpc.NewServer() - pb.RegisterManagementServer(grpcServer,t) - go grpcServer.Serve(lis) - logging.Debug(logging.DClient, "TCO ready for client requests") + conf := LoadConfig() + port, err := conf.GetPort("tui") + if err != nil { + panic(err) + } + lis, err := net.Listen("tcp", fmt.Sprintf(":%v", port)) + if err != nil { + // rip + } + grpcServer := grpc.NewServer() + pb.RegisterManagementServer(grpcServer, t) + go grpcServer.Serve(lis) + logging.Debug(logging.DClient, "TCO ready for client requests") } func (t *tuiCoordinator) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) { - // grpc handler to fwd to manager - m, exists := t.GetManager(req.GetClientId()) - if !exists { - // doesnt exist for some reason - return &pb.GetDevicesResponse{}, errors.New("Manager doesn't exists for client") - } - tm, ok := m.(*TUIManager) - if !ok { - return &pb.GetDevicesResponse{}, errors.New("Manager is not of type TUI") - } - return tm.GetDevices(ctx,req) + // grpc handler to fwd to manager + m, exists := t.GetManager(req.GetClientId()) + if !exists { + // doesnt exist for some reason + return &pb.GetDevicesResponse{}, errors.New("Manager doesn't exists for client") + } + tm, ok := m.(*TUIManager) + if !ok { + return &pb.GetDevicesResponse{}, errors.New("Manager is not of type TUI") + } + return tm.GetDevices(ctx, req) } // unimplemented bs for grpc func (t *tuiCoordinator) DeleteReactor(ctx context.Context, req *pb.DeleteReactorRequest) (*pb.DeleteReactorResponse, error) { - // TODO - return &pb.DeleteReactorResponse{}, nil + // TODO + return &pb.DeleteReactorResponse{}, nil } func (t *tuiCoordinator) DeleteReactorDevice(ctx context.Context, req *pb.DeleteReactorDeviceRequest) (*pb.DeleteReactorDeviceResponse, error) { - // TODO - return &pb.DeleteReactorDeviceResponse{}, nil + // TODO + return &pb.DeleteReactorDeviceResponse{}, nil } -