From ebd87863bccfb8d9c17bba90eb62492013b26232 Mon Sep 17 00:00:00 2001 From: KeeganForelight Date: Mon, 5 Dec 2022 18:25:53 -0500 Subject: [PATCH] starting to reconfig config --- cmd/server/main.go | 54 ++------ internal/configs/server.yaml | 31 +++-- internal/pkg/config/load.go | 165 +++++------------------- internal/pkg/config/{ => old}/def.go | 0 internal/pkg/config/old/flags.go | 5 + internal/pkg/config/old/load.go | 173 +++++++++++++++++++++++++ internal/pkg/config/old/server.go | 180 +++++++++++++++++++++++++++ internal/pkg/config/server.go | 149 +++++++++------------- internal/pkg/influxdb/client.go | 53 ++++---- internal/pkg/server/coordinator.go | 75 +++++------ internal/pkg/server/listener.go | 113 +++++++++-------- notes.md | 6 +- 12 files changed, 604 insertions(+), 400 deletions(-) rename internal/pkg/config/{ => old}/def.go (100%) create mode 100644 internal/pkg/config/old/flags.go create mode 100644 internal/pkg/config/old/load.go create mode 100644 internal/pkg/config/old/server.go diff --git a/cmd/server/main.go b/cmd/server/main.go index 9c9e0a2..514dac6 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -5,8 +5,6 @@ import ( _ "net/http" _ "net/http/pprof" "os/signal" - "strconv" - "strings" "syscall" //"flag" @@ -21,61 +19,34 @@ type coordinator interface { Start() } -// NewCoordinator creates a new coordinator that runs on the central server -// The coordinator is given an error channel to feed errors back to the main process should any arise -// The zero value for a new coordinator is ready to start +type Config interface { + //UpdatePort(string, int) error + Store() error +} + func NewCoordinator(ch chan error) coordinator { return server.NewCentralCoordinator(ch) } -// LoadConfig loads a given config based on a string lookup -// Used to load the associated settings for a coordinator such as port and IP address as well as database settings -// LoadConfig expects the returned config to satisfy the interface +func NewConfig(fname string) Config { + return config.NewConfig(fname) +} + +/* func LoadConfig(fname string) Config { if err := config.Load(fname); err != nil { panic(err) } return config.LoadConfig() -} - -// Basic functions expected to be provided by the config structure -type Config interface { - UpdatePort(string, int) error - Store() error -} +} */ func main() { // lets get this bread - // go func() { - // fmt.Println(http.ListenAndServe("localhost:6060",nil)) - // }() gracefulShutdown := make(chan os.Signal, 1) signal.Notify(gracefulShutdown, syscall.SIGINT, syscall.SIGTERM) - conf := LoadConfig("server") + conf := NewConfig("server") errCh := make(chan error) - // checking env - envVars := os.Environ() - - // I can put this is a seperate func - for _, envString := range envVars { - // looping over set ports - initSplt := strings.Split(envString, "=") - key := initSplt[0] - val := initSplt[1] - if strings.Contains(key, "PORT") { - // parsing out correct port to update - splt := strings.Split(key, "_") // LIS_PORT -> LIS, PORT - portName := strings.ToLower(splt[0]) // LIS -> lis - port, err := strconv.Atoi(val) - if err != nil { - panic(err) - } - if err := conf.UpdatePort(portName, port); err != nil { - panic(err) - } - } - } //fmt.Printf("Listening on %v\n", lport) c := NewCoordinator(errCh) @@ -85,6 +56,7 @@ func main() { case err := <-errCh: // blocking to wait for any errors and keep alive otherwise panic(err) case <-gracefulShutdown: + // Shutdown via INT err := conf.Store() if err != nil { panic(err) diff --git a/internal/configs/server.yaml b/internal/configs/server.yaml index f09fcc1..531c4ec 100644 --- a/internal/configs/server.yaml +++ b/internal/configs/server.yaml @@ -1,17 +1,16 @@ -server: - name: "Rack Server" - db-url: "http://192.168.100.2:8086" - db-org: "ForeLight" - db-token: "" - ports: - lis: 2022 - reactor: 2023 - tui: 2024 - db: 8086 +db: + org: ForeLight + token: "" + url: http://192.168.100.2:8086 +name: Rack Server +ports: + db: 8086 + lis: 20000 + reactor: 2023 + tui: 2024 reactors: - 10002123: - name: "Beaglebone Black" - db-token: "" - db-bucket: "" - - + "10002123": + db: + bucket: test + token: "" + name: Beaglebone Black diff --git a/internal/pkg/config/load.go b/internal/pkg/config/load.go index d0ffb78..2c9c7d6 100644 --- a/internal/pkg/config/load.go +++ b/internal/pkg/config/load.go @@ -8,166 +8,57 @@ import ( "FRMS/internal/pkg/logging" "errors" "fmt" - "strconv" - "strings" "github.com/spf13/viper" - //"os" - //"log" - //"os/exec" - //"bytes" ) -func NewConfig(name string) (Config, error) { - // returns a Config Structure of assocaited name - return C, Load(name) +type Config interface { + //rip + Store() error } -type ConfigStruct interface { - // structure to do demarshall config into - LoadFile(string) error +func NewConfig(name string) Config { + // returns a Config Structure of assocaited name + return LoadConfig(name) } -func LoadConfigFile(fname string, strct ConfigStruct) error { +func LoadConfig(fname string) Config { // Demarshalls a given filename into the struct // returns nil if successful + var C Config + logging.Debug(logging.DStart, "Loading config for %s", fname) viper.SetConfigName(fname) viper.SetConfigType("yaml") - viper.AddConfigPath("/etc/frms/config") + //viper.AddConfigPath("/etc/frms/config") + viper.AddConfigPath("$HOME/FRMS/internal/configs") + // struct and env vars + switch fname { + case "server": + C = &ServerConfig{} + case "reactor": + panic(errors.New("f")) + //C = &ReactorConf{} + default: + panic(errors.New(fmt.Sprintf("%s not recognized", fname))) + } + + // Sets env vars + viper.AutomaticEnv() // unmarshalling if err := viper.ReadInConfig(); err != nil { - return err + panic(err) } logging.Debug(logging.DStart, "CON Loaded configs from %v", viper.ConfigFileUsed()) - if err = viper.Unmarshal(strct); err != nil { + + if err := viper.Unmarshal(C); err != nil { logging.Debug(logging.DError, "Cannot unmarshall %v", err) - return err + panic(err) } fmt.Printf("Outcome: %#v\n \n", C) // unmarshalled at this point -} - -//// - -func LoadConfig() Config { return C -} - -func (c *ServerConf) GetURL() (string, error) { - c.RLock() - defer c.RUnlock() - return C.Server.URL, nil -} - -func (c *ServerConf) GetOrg() (string, error) { - c.RLock() - defer c.RUnlock() - return c.Server.Orginization, nil -} - -func (c *ServerConf) GetPort(port string) (int, error) { - c.RLock() - defer c.RUnlock() - portString, ok := c.Server.Ports[port] - if !ok { - portEnv := strings.ToUpper(port) + "_PORT" - return 0, fmt.Errorf("%s port doesnt exist! Please set using env %s=####", port, portEnv) - } - // returns int, err - //return strconv.Atoi(portString) - return portString, nil -} - -func (c *ServerConf) GetServerToken() (string, error) { - c.RLock() - defer c.RUnlock() - return c.Server.Token, nil -} - -func (c *ServerConf) GetReactorClient(id uint32) (string, string, error) { - c.RLock() - defer c.RUnlock() - idString := strconv.FormatUint(uint64(id), 10) - if r, ok := c.Reactors[idString]; ok { - return r.Bucket, r.Token, nil - } - return "", "", fmt.Errorf("reactor %v config doesnt exist", id) -} - -// setters -func (c *ServerConf) UpdateURL(url string) error { - c.Lock() - defer c.Unlock() - if url == "" { - return errors.New("string cannot be empty") - } - c.Server.URL = url - viper.Set("server.db-url", url) - return viper.WriteConfigAs(viper.ConfigFileUsed()) -} - -func (c *ServerConf) UpdateOrg(org string) error { - c.Lock() - defer c.Unlock() - if org == "" { - return errors.New("string cannot be empty") - } - c.Server.Orginization = org - viper.Set("server.db-org", org) - return viper.WriteConfigAs(viper.ConfigFileUsed()) -} - -func (c *ServerConf) UpdatePort(pName string, port int) error { - c.Lock() - defer c.Unlock() - if port < 1024 || port > 65535 { - // OOB - return fmt.Errorf("Port %d out of bounds! [1024,65535]", port) - } - if c.Server.Ports == nil { - c.Server.Ports = make(map[string]int) - } - c.Server.Ports[pName] = port - pname := fmt.Sprintf("server.ports.%s", pName) - viper.Set(pname, port) - return viper.WriteConfigAs(viper.ConfigFileUsed()) -} - -func (c *ServerConf) UpdateServerToken(token string) error { - c.Lock() - defer c.Unlock() - if token == "" { - return errors.New("String cannot be empty!") - } - c.Server.Token = token - viper.Set("server.token", token) - return viper.WriteConfigAs(viper.ConfigFileUsed()) -} - -func (c *ServerConf) UpdateReactorClient(id uint32, bucket, token string) error { - c.Lock() - c.Unlock() - sid := strconv.FormatUint(uint64(id), 10) - if token == "" || bucket == "" { - return errors.New("String cannot be empty!") - } - if reactor, ok := c.Reactors[sid]; !ok { - c.Reactors[sid] = ReactorConfig{Token: token, Bucket: bucket, Id: id} - } else { - reactor.Bucket = bucket - reactor.Token = token - c.Reactors[sid] = reactor - } - reactorbucket := fmt.Sprintf("%s.db-bucket", id) - reactortoken := fmt.Sprintf("%s.db-token", id) - viper.Set(reactorbucket, bucket) - viper.Set(reactortoken, token) - return viper.WriteConfigAs(viper.ConfigFileUsed()) -} -func (c *ServerConf) Store() error { - return viper.WriteConfigAs(viper.ConfigFileUsed()) } diff --git a/internal/pkg/config/def.go b/internal/pkg/config/old/def.go similarity index 100% rename from internal/pkg/config/def.go rename to internal/pkg/config/old/def.go diff --git a/internal/pkg/config/old/flags.go b/internal/pkg/config/old/flags.go new file mode 100644 index 0000000..7490a17 --- /dev/null +++ b/internal/pkg/config/old/flags.go @@ -0,0 +1,5 @@ +package config + +/* +Package provides a way to update current config based on values passed in flags +*/ diff --git a/internal/pkg/config/old/load.go b/internal/pkg/config/old/load.go new file mode 100644 index 0000000..d0ffb78 --- /dev/null +++ b/internal/pkg/config/old/load.go @@ -0,0 +1,173 @@ +package config + +/* +Load.go contains methods to load values from config, flags and env. +*/ + +import ( + "FRMS/internal/pkg/logging" + "errors" + "fmt" + "strconv" + "strings" + + "github.com/spf13/viper" + //"os" + //"log" + //"os/exec" + //"bytes" +) + +func NewConfig(name string) (Config, error) { + // returns a Config Structure of assocaited name + return C, Load(name) +} + +type ConfigStruct interface { + // structure to do demarshall config into + LoadFile(string) error +} + +func LoadConfigFile(fname string, strct ConfigStruct) error { + // Demarshalls a given filename into the struct + // returns nil if successful + logging.Debug(logging.DStart, "Loading config for %s", fname) + viper.SetConfigName(fname) + viper.SetConfigType("yaml") + viper.AddConfigPath("/etc/frms/config") + + // unmarshalling + if err := viper.ReadInConfig(); err != nil { + return err + } + + logging.Debug(logging.DStart, "CON Loaded configs from %v", viper.ConfigFileUsed()) + if err = viper.Unmarshal(strct); err != nil { + logging.Debug(logging.DError, "Cannot unmarshall %v", err) + return err + } + fmt.Printf("Outcome: %#v\n \n", C) + // unmarshalled at this point +} + +//// + +func LoadConfig() Config { + return C +} + +func (c *ServerConf) GetURL() (string, error) { + c.RLock() + defer c.RUnlock() + return C.Server.URL, nil +} + +func (c *ServerConf) GetOrg() (string, error) { + c.RLock() + defer c.RUnlock() + return c.Server.Orginization, nil +} + +func (c *ServerConf) GetPort(port string) (int, error) { + c.RLock() + defer c.RUnlock() + portString, ok := c.Server.Ports[port] + if !ok { + portEnv := strings.ToUpper(port) + "_PORT" + return 0, fmt.Errorf("%s port doesnt exist! Please set using env %s=####", port, portEnv) + } + // returns int, err + //return strconv.Atoi(portString) + return portString, nil +} + +func (c *ServerConf) GetServerToken() (string, error) { + c.RLock() + defer c.RUnlock() + return c.Server.Token, nil +} + +func (c *ServerConf) GetReactorClient(id uint32) (string, string, error) { + c.RLock() + defer c.RUnlock() + idString := strconv.FormatUint(uint64(id), 10) + if r, ok := c.Reactors[idString]; ok { + return r.Bucket, r.Token, nil + } + return "", "", fmt.Errorf("reactor %v config doesnt exist", id) +} + +// setters +func (c *ServerConf) UpdateURL(url string) error { + c.Lock() + defer c.Unlock() + if url == "" { + return errors.New("string cannot be empty") + } + c.Server.URL = url + viper.Set("server.db-url", url) + return viper.WriteConfigAs(viper.ConfigFileUsed()) +} + +func (c *ServerConf) UpdateOrg(org string) error { + c.Lock() + defer c.Unlock() + if org == "" { + return errors.New("string cannot be empty") + } + c.Server.Orginization = org + viper.Set("server.db-org", org) + return viper.WriteConfigAs(viper.ConfigFileUsed()) +} + +func (c *ServerConf) UpdatePort(pName string, port int) error { + c.Lock() + defer c.Unlock() + if port < 1024 || port > 65535 { + // OOB + return fmt.Errorf("Port %d out of bounds! [1024,65535]", port) + } + if c.Server.Ports == nil { + c.Server.Ports = make(map[string]int) + } + c.Server.Ports[pName] = port + pname := fmt.Sprintf("server.ports.%s", pName) + viper.Set(pname, port) + return viper.WriteConfigAs(viper.ConfigFileUsed()) +} + +func (c *ServerConf) UpdateServerToken(token string) error { + c.Lock() + defer c.Unlock() + if token == "" { + return errors.New("String cannot be empty!") + } + c.Server.Token = token + viper.Set("server.token", token) + return viper.WriteConfigAs(viper.ConfigFileUsed()) +} + +func (c *ServerConf) UpdateReactorClient(id uint32, bucket, token string) error { + c.Lock() + c.Unlock() + sid := strconv.FormatUint(uint64(id), 10) + if token == "" || bucket == "" { + return errors.New("String cannot be empty!") + } + if reactor, ok := c.Reactors[sid]; !ok { + c.Reactors[sid] = ReactorConfig{Token: token, Bucket: bucket, Id: id} + } else { + reactor.Bucket = bucket + reactor.Token = token + c.Reactors[sid] = reactor + } + reactorbucket := fmt.Sprintf("%s.db-bucket", id) + reactortoken := fmt.Sprintf("%s.db-token", id) + viper.Set(reactorbucket, bucket) + viper.Set(reactortoken, token) + return viper.WriteConfigAs(viper.ConfigFileUsed()) +} + +func (c *ServerConf) Store() error { + return viper.WriteConfigAs(viper.ConfigFileUsed()) +} diff --git a/internal/pkg/config/old/server.go b/internal/pkg/config/old/server.go new file mode 100644 index 0000000..9d600b1 --- /dev/null +++ b/internal/pkg/config/old/server.go @@ -0,0 +1,180 @@ +package config + +// package serves to implement config interface for server + +import ( + "FRMS/internal/pkg/logging" + "errors" + "fmt" + "strconv" + "strings" + + "github.com/spf13/viper" + //"os" + //"log" + //"os/exec" + //"bytes" +) + +type Config interface { + Store() error +} + +var C Config + +func Load(fname string) error { + // read stored configs + //C = &ServerConf{} + switch fname { + case "server": + C = &ServerConf{} + case "reactor": + C = &ReactorConf{} + default: + return fmt.Errorf("%s not recognized", fname) + } + logging.Debug(logging.DStart, "Loading config for %s", fname) + viper.SetConfigName(fname) + viper.SetConfigType("yaml") + viper.AddConfigPath("/etc/frms/config") + // defaults which we don't want to set nessecarily + //viper.SetDefault("server.db-org", "ForeLight") + //viper.SetDefault("server.db-url", "http://192.168.100.2:8086") + //viper.SetDefault("reactor.db-url", "http://192.168.100.2:8086") + // unmarshalling + err := viper.ReadInConfig() // the fact i never did this is infuriating + if err != nil { + panic(err) + } + logging.Debug(logging.DStart, "CON Loaded configs from %v", viper.ConfigFileUsed()) + err = viper.Unmarshal(C) + if err != nil { + logging.Debug(logging.DError, "Cannot unmarshall Server! %v", err) + panic(err) + } + fmt.Printf("Outcome: %#v\n \n", C) + //fmt.Printf("%v\n",C) + // unmarshalled at this point +} + +func LoadConfig() Config { + return C +} + +func (c *ServerConf) Load() + +func (c *ServerConf) GetURL() (string, error) { + c.RLock() + defer c.RUnlock() + return C.Server.URL, nil +} + +func (c *ServerConf) GetOrg() (string, error) { + c.RLock() + defer c.RUnlock() + return c.Server.Orginization, nil +} + +func (c *ServerConf) GetPort(port string) (int, error) { + c.RLock() + defer c.RUnlock() + portString, ok := c.Server.Ports[port] + if !ok { + portEnv := strings.ToUpper(port) + "_PORT" + return 0, fmt.Errorf("%s port doesnt exist! Please set using env %s=####", port, portEnv) + } + // returns int, err + //return strconv.Atoi(portString) + return portString, nil +} + +func (c *ServerConf) GetServerToken() (string, error) { + c.RLock() + defer c.RUnlock() + return c.Server.Token, nil +} + +func (c *ServerConf) GetReactorClient(id uint32) (string, string, error) { + c.RLock() + defer c.RUnlock() + idString := strconv.FormatUint(uint64(id), 10) + if r, ok := c.Reactors[idString]; ok { + return r.Bucket, r.Token, nil + } + return "", "", fmt.Errorf("reactor %v config doesnt exist", id) +} + +// setters +func (c *ServerConf) UpdateURL(url string) error { + c.Lock() + defer c.Unlock() + if url == "" { + return errors.New("string cannot be empty") + } + c.Server.URL = url + viper.Set("server.db-url", url) + return viper.WriteConfigAs(viper.ConfigFileUsed()) +} + +func (c *ServerConf) UpdateOrg(org string) error { + c.Lock() + defer c.Unlock() + if org == "" { + return errors.New("string cannot be empty") + } + c.Server.Orginization = org + viper.Set("server.db-org", org) + return viper.WriteConfigAs(viper.ConfigFileUsed()) +} + +func (c *ServerConf) UpdatePort(pName string, port int) error { + c.Lock() + defer c.Unlock() + if port < 1024 || port > 65535 { + // OOB + return fmt.Errorf("Port %d out of bounds! [1024,65535]", port) + } + if c.Server.Ports == nil { + c.Server.Ports = make(map[string]int) + } + c.Server.Ports[pName] = port + pname := fmt.Sprintf("server.ports.%s", pName) + viper.Set(pname, port) + return viper.WriteConfigAs(viper.ConfigFileUsed()) +} + +func (c *ServerConf) UpdateServerToken(token string) error { + c.Lock() + defer c.Unlock() + if token == "" { + return errors.New("String cannot be empty!") + } + c.Server.Token = token + viper.Set("server.token", token) + return viper.WriteConfigAs(viper.ConfigFileUsed()) +} + +func (c *ServerConf) UpdateReactorClient(id uint32, bucket, token string) error { + c.Lock() + c.Unlock() + sid := strconv.FormatUint(uint64(id), 10) + if token == "" || bucket == "" { + return errors.New("String cannot be empty!") + } + if reactor, ok := c.Reactors[sid]; !ok { + c.Reactors[sid] = ReactorConfig{Token: token, Bucket: bucket, Id: id} + } else { + reactor.Bucket = bucket + reactor.Token = token + c.Reactors[sid] = reactor + } + reactorbucket := fmt.Sprintf("%s.db-bucket", id) + reactortoken := fmt.Sprintf("%s.db-token", id) + viper.Set(reactorbucket, bucket) + viper.Set(reactortoken, token) + return viper.WriteConfigAs(viper.ConfigFileUsed()) +} + +func (c *ServerConf) Store() error { + return viper.WriteConfigAs(viper.ConfigFileUsed()) +} diff --git a/internal/pkg/config/server.go b/internal/pkg/config/server.go index 9d600b1..3e48a20 100644 --- a/internal/pkg/config/server.go +++ b/internal/pkg/config/server.go @@ -3,82 +3,52 @@ package config // package serves to implement config interface for server import ( - "FRMS/internal/pkg/logging" "errors" "fmt" "strconv" "strings" + "sync" "github.com/spf13/viper" - //"os" - //"log" - //"os/exec" - //"bytes" ) -type Config interface { - Store() error +type ServerConfig struct { + DB DatabaseInfo `mapstructure:"db"` + Ports map[string]int `mapstructure:"ports"` + Name string `mapstructure:"name"` + Reactors map[string]ServerReactorConfig `mapstructure:"reactors"` + sync.RWMutex } -var C Config - -func Load(fname string) error { - // read stored configs - //C = &ServerConf{} - switch fname { - case "server": - C = &ServerConf{} - case "reactor": - C = &ReactorConf{} - default: - return fmt.Errorf("%s not recognized", fname) - } - logging.Debug(logging.DStart, "Loading config for %s", fname) - viper.SetConfigName(fname) - viper.SetConfigType("yaml") - viper.AddConfigPath("/etc/frms/config") - // defaults which we don't want to set nessecarily - //viper.SetDefault("server.db-org", "ForeLight") - //viper.SetDefault("server.db-url", "http://192.168.100.2:8086") - //viper.SetDefault("reactor.db-url", "http://192.168.100.2:8086") - // unmarshalling - err := viper.ReadInConfig() // the fact i never did this is infuriating - if err != nil { - panic(err) - } - logging.Debug(logging.DStart, "CON Loaded configs from %v", viper.ConfigFileUsed()) - err = viper.Unmarshal(C) - if err != nil { - logging.Debug(logging.DError, "Cannot unmarshall Server! %v", err) - panic(err) - } - fmt.Printf("Outcome: %#v\n \n", C) - //fmt.Printf("%v\n",C) - // unmarshalled at this point +type DatabaseInfo struct { + URL string `mapstructure:"url"` + Token string `mapstructure:"token"` + Org string `mapstructure:"org,omitempty"` + Bucket string `mapstructure:"url,omitempty"` } -func LoadConfig() Config { - return C +type ServerReactorConfig struct { + DB DatabaseInfo `mapstructure:"db"` + Name string `mapstructure:"name"` + Id string } -func (c *ServerConf) Load() - -func (c *ServerConf) GetURL() (string, error) { +func (c *ServerConfig) GetURL() (string, error) { c.RLock() defer c.RUnlock() - return C.Server.URL, nil + return c.DB.URL, nil } -func (c *ServerConf) GetOrg() (string, error) { +func (c *ServerConfig) GetOrg() (string, error) { c.RLock() defer c.RUnlock() - return c.Server.Orginization, nil + return c.DB.Org, nil } -func (c *ServerConf) GetPort(port string) (int, error) { +func (c *ServerConfig) GetPort(port string) (int, error) { c.RLock() defer c.RUnlock() - portString, ok := c.Server.Ports[port] + portString, ok := c.Ports[port] if !ok { portEnv := strings.ToUpper(port) + "_PORT" return 0, fmt.Errorf("%s port doesnt exist! Please set using env %s=####", port, portEnv) @@ -88,93 +58,96 @@ func (c *ServerConf) GetPort(port string) (int, error) { return portString, nil } -func (c *ServerConf) GetServerToken() (string, error) { +func (c *ServerConfig) GetServerToken() (string, error) { c.RLock() defer c.RUnlock() - return c.Server.Token, nil + return c.DB.Token, nil } -func (c *ServerConf) GetReactorClient(id uint32) (string, string, error) { +func (c *ServerConfig) GetReactorClient(id uint32) (string, string, error) { c.RLock() defer c.RUnlock() idString := strconv.FormatUint(uint64(id), 10) if r, ok := c.Reactors[idString]; ok { - return r.Bucket, r.Token, nil + return r.DB.Bucket, r.DB.Token, nil } return "", "", fmt.Errorf("reactor %v config doesnt exist", id) } // setters -func (c *ServerConf) UpdateURL(url string) error { +func (c *ServerConfig) UpdateURL(url string) error { c.Lock() defer c.Unlock() if url == "" { return errors.New("string cannot be empty") } - c.Server.URL = url - viper.Set("server.db-url", url) + c.DB.URL = url + viper.Set("db.url", url) return viper.WriteConfigAs(viper.ConfigFileUsed()) } -func (c *ServerConf) UpdateOrg(org string) error { +func (c *ServerConfig) UpdateOrg(org string) error { c.Lock() defer c.Unlock() if org == "" { return errors.New("string cannot be empty") } - c.Server.Orginization = org - viper.Set("server.db-org", org) + c.DB.Org = org + viper.Set("db.org", org) return viper.WriteConfigAs(viper.ConfigFileUsed()) } -func (c *ServerConf) UpdatePort(pName string, port int) error { +func (c *ServerConfig) UpdatePort(pName string, port int) error { c.Lock() defer c.Unlock() if port < 1024 || port > 65535 { // OOB return fmt.Errorf("Port %d out of bounds! [1024,65535]", port) } - if c.Server.Ports == nil { - c.Server.Ports = make(map[string]int) + if c.Ports == nil { + c.Ports = make(map[string]int) } - c.Server.Ports[pName] = port - pname := fmt.Sprintf("server.ports.%s", pName) + c.Ports[pName] = port + pname := fmt.Sprintf("ports.%s", pName) viper.Set(pname, port) return viper.WriteConfigAs(viper.ConfigFileUsed()) } -func (c *ServerConf) UpdateServerToken(token string) error { +func (c *ServerConfig) UpdateServerToken(token string) error { c.Lock() defer c.Unlock() if token == "" { return errors.New("String cannot be empty!") } - c.Server.Token = token - viper.Set("server.token", token) + c.DB.Token = token + viper.Set("db.token", token) return viper.WriteConfigAs(viper.ConfigFileUsed()) } -func (c *ServerConf) UpdateReactorClient(id uint32, bucket, token string) error { +func (c *ServerConfig) UpdateReactorClient(id uint32, bucket, token string) error { c.Lock() c.Unlock() - sid := strconv.FormatUint(uint64(id), 10) - if token == "" || bucket == "" { - return errors.New("String cannot be empty!") - } - if reactor, ok := c.Reactors[sid]; !ok { - c.Reactors[sid] = ReactorConfig{Token: token, Bucket: bucket, Id: id} - } else { - reactor.Bucket = bucket - reactor.Token = token - c.Reactors[sid] = reactor - } - reactorbucket := fmt.Sprintf("%s.db-bucket", id) - reactortoken := fmt.Sprintf("%s.db-token", id) - viper.Set(reactorbucket, bucket) - viper.Set(reactortoken, token) - return viper.WriteConfigAs(viper.ConfigFileUsed()) + /* + sid := strconv.FormatUint(uint64(id), 10) + if token == "" || bucket == "" { + return errors.New("String cannot be empty!") + } + if reactor, ok := c.Reactors[sid]; !ok { + c.Reactors[sid] = ServerReactorConfig{Token: token, Bucket: bucket, Id: id} + } else { + reactor.Bucket = bucket + reactor.Token = token + c.Reactors[sid] = reactor + } + //reactorbucket := fmt.Sprintf("reactors.%s.db-bucket", id) + //reactortoken := fmt.Sprintf("reactors.%s.db-token", id) + viper.Set(fmt.Sprintf("reactors.%s.db-bucket", id), bucket) + viper.Set(fmt.Sprintf("reactors.%s.db-token", id), token) + return viper.WriteConfigAs(viper.ConfigFileUsed()) + */ + return errors.New("Unimpl") } -func (c *ServerConf) Store() error { +func (c *ServerConfig) Store() error { return viper.WriteConfigAs(viper.ConfigFileUsed()) } diff --git a/internal/pkg/influxdb/client.go b/internal/pkg/influxdb/client.go index 055ef87..21b5f5e 100644 --- a/internal/pkg/influxdb/client.go +++ b/internal/pkg/influxdb/client.go @@ -1,46 +1,53 @@ package influxdb import ( - _ "fmt" - _ "github.com/influxdata/influxdb-client-go/v2" + "errors" + _ "fmt" + + _ "github.com/influxdata/influxdb-client-go/v2" ) type DBClient struct { - URL string - Org string - Bucket string - Token string - // Client *influxdb2.Client + URL string + Org string + Bucket string + Token string + // Client *influxdb2.Client } type DBAdmin struct { - // struct for admin methods - *DBClient + // struct for admin methods + *DBClient } func NewDBClient(url, org, token string) *DBClient { - db := &DBClient{URL:url, Org:org, Token:token} - return db + db := &DBClient{URL: url, Org: org, Token: token} + return db } func NewDBAdmin(url, org, token string) *DBAdmin { - admin := &DBAdmin{} - admin.DBClient = NewDBClient(url, org, token) - return admin + admin := &DBAdmin{} + admin.DBClient = NewDBClient(url, org, token) + return admin } // base level funcs func (d *DBClient) Start() { - // connect to DB + // connect to DB } func (d *DBAdmin) GetReactorClient(id string) (string, string, error) { // given an id returns associated token and bucket - client := influxdb2.NewClient(d.URL,d.Token) - defer client.Close() - bucket, err := client.BucketsAPI().FindBucketByName(context.Background(),id) - if err != nil { - return "", "", err - } - if d.ReactorExists(id) { - // get corresponding reactor token and bucket + /* + client := influxdb2.NewClient(d.URL, d.Token) + defer client.Close() + bucket, err := client.BucketsAPI().FindBucketByName(context.Background(), id) + if err != nil { + return "", "", err + } + if d.ReactorExists(id) { + // get corresponding reactor token and bucket + } + */ + return "", "", errors.New("Unimpl") +} diff --git a/internal/pkg/server/coordinator.go b/internal/pkg/server/coordinator.go index 9e3999a..aaa33a7 100644 --- a/internal/pkg/server/coordinator.go +++ b/internal/pkg/server/coordinator.go @@ -1,28 +1,25 @@ package server import ( - "FRMS/internal/pkg/config" pb "FRMS/internal/pkg/grpc" "FRMS/internal/pkg/influxdb" _ "FRMS/internal/pkg/influxdb" "FRMS/internal/pkg/logging" "context" "errors" - "fmt" - "net" "sync" - - "google.golang.org/grpc" ) // this package creates the central coordiantor and sub coordiantors for clients // interfaces // config interface +/* func LoadConfig() Config { // returns a ServerConfig that we can query and update - return config.LoadConfig() + //return config.LoadConfig() } +*/ type Config interface { // PROPOSED RENAMING: ServerConfig to avoid confusion w/ reactor variant @@ -46,8 +43,8 @@ type DB interface { //GetToken() (string, error) // returns admin token (Creates if it doesnt exist) GetReactorClient(string) (string, string, error) // returns (bucket, token, err) // delete - DeleteReactorClient(string) error // removes client token but maintains bucket - PurgeReactorClientData(string) error // perm deletes all assocaited reactor data (token, bucket etc) + // DeleteReactorClient(string) error // removes client token but maintains bucket + // PurgeReactorClientData(string) error // perm deletes all assocaited reactor data (token, bucket etc) } func NewDBAdmin(url, org, token string) DB { @@ -82,7 +79,7 @@ func NewCentralCoordinator(ch chan error) *CentralCoordinator { func (c *CentralCoordinator) Start() { // starts up associated funcs // begin with config and client - c.LoadCfg() + //c.LoadCfg() clientChan := make(chan *ClientPacket) l := NewListener(clientChan, c.Err) go l.Start() @@ -90,6 +87,7 @@ func (c *CentralCoordinator) Start() { } +/* func (c *CentralCoordinator) LoadCfg() { // loads db client info and updates if anything is missing c.Config = LoadConfig() @@ -109,6 +107,7 @@ func (c *CentralCoordinator) LoadCfg() { c.Err <- err } } +*/ func (c *CentralCoordinator) ClientListener(ch chan *ClientPacket) { for client := range ch { @@ -259,19 +258,21 @@ func (r *reactorCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan } func (r *reactorCoordinator) Register() { - conf := LoadConfig() - port, err := conf.GetPort("reactor") - if err != nil { - panic(err) - } - lis, err := net.Listen("tcp", fmt.Sprintf(":%v", port)) - if err != nil { - panic(err) - } - grpcServer := grpc.NewServer() - pb.RegisterMonitoringServer(grpcServer, r) - go grpcServer.Serve(lis) - logging.Debug(logging.DClient, "RCO ready for client requests") + //conf := LoadConfig() + /* + port, err := conf.GetPort("reactor") + if err != nil { + panic(err) + } + lis, err := net.Listen("tcp", fmt.Sprintf(":%v", port)) + if err != nil { + panic(err) + } + grpcServer := grpc.NewServer() + pb.RegisterMonitoringServer(grpcServer, r) + go grpcServer.Serve(lis) + logging.Debug(logging.DClient, "RCO ready for client requests") + */ } func (r *reactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { @@ -286,7 +287,7 @@ func (r *reactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.R return rm.ReactorStatusHandler(ctx, req) } -//tui coordinator +// tui coordinator type tuiCoordinator struct { *Managers // by embedding general struct we allow coordinator to still call general funcs pb.UnimplementedManagementServer @@ -302,19 +303,21 @@ func (t *tuiCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan erro } func (t *tuiCoordinator) Register() { - conf := LoadConfig() - port, err := conf.GetPort("tui") - if err != nil { - panic(err) - } - lis, err := net.Listen("tcp", fmt.Sprintf(":%v", port)) - if err != nil { - // rip - } - grpcServer := grpc.NewServer() - pb.RegisterManagementServer(grpcServer, t) - go grpcServer.Serve(lis) - logging.Debug(logging.DClient, "TCO ready for client requests") + /* + conf := LoadConfig() + port, err := conf.GetPort("tui") + if err != nil { + panic(err) + } + lis, err := net.Listen("tcp", fmt.Sprintf(":%v", port)) + if err != nil { + // rip + } + grpcServer := grpc.NewServer() + pb.RegisterManagementServer(grpcServer, t) + go grpcServer.Serve(lis) + logging.Debug(logging.DClient, "TCO ready for client requests") + */ } func (t *tuiCoordinator) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) { diff --git a/internal/pkg/server/listener.go b/internal/pkg/server/listener.go index c531a7b..c56a226 100644 --- a/internal/pkg/server/listener.go +++ b/internal/pkg/server/listener.go @@ -1,14 +1,11 @@ package server import ( - //"log" - "fmt" - "net" - "context" - // "FRMS/internal/pkg/system" - "FRMS/internal/pkg/logging" - "google.golang.org/grpc" - pb "FRMS/internal/pkg/grpc" + //"log" + "context" + // "FRMS/internal/pkg/system" + pb "FRMS/internal/pkg/grpc" + "FRMS/internal/pkg/logging" ) /* @@ -16,74 +13,76 @@ Listens on a supplied port and sends incoming clients over a supplied channel */ type Listener struct { // exporting for easy use in the short term - ClientConnections chan *ClientPacket - Err chan error - pb.UnimplementedHandshakeServer + ClientConnections chan *ClientPacket + Err chan error + pb.UnimplementedHandshakeServer } type ClientPacket struct { - *Client - Response chan *ClientResponse + *Client + Response chan *ClientResponse } type Client struct { - // general client struct to store reqs from reactors/tui - Ip string - Port int - Id uint32 - Model string - Type string + // general client struct to store reqs from reactors/tui + Ip string + Port int + Id uint32 + Model string + Type string } type ClientResponse struct { - Port int - URL string - Org string - Token string - Bucket string + Port int + URL string + Org string + Token string + Bucket string } func NewListener(cch chan *ClientPacket, ech chan error) *Listener { - l := &Listener{Err:ech,ClientConnections:cch} - return l + l := &Listener{Err: ech, ClientConnections: cch} + return l } func (l *Listener) Start() { - // start grpc server and implement reciever - if err := l.Register(); err != nil { - l.Err <- err - } - logging.Debug(logging.DStart,"LIS 01 Started client listener") + // start grpc server and implement reciever + if err := l.Register(); err != nil { + l.Err <- err + } + logging.Debug(logging.DStart, "LIS 01 Started client listener") } func (l *Listener) Register() error { - // creates a gRPC service and binds it to our handler - conf := LoadConfig() - port, err := conf.GetPort("lis") - if err != nil { - return err - } - lis, err := net.Listen("tcp", fmt.Sprintf(":%v",port)) // either binding to supplied port or binding to docker default - if err != nil { - return err - } - grpcServer := grpc.NewServer() - pb.RegisterHandshakeServer(grpcServer, l) - go grpcServer.Serve(lis) - logging.Debug(logging.DStart, "LIS 01 Registered on port %v", port) - return nil + // creates a gRPC service and binds it to our handler + /* + conf := LoadConfig() + port, err := conf.GetPort("lis") + if err != nil { + return err + } + lis, err := net.Listen("tcp", fmt.Sprintf(":%v",port)) // either binding to supplied port or binding to docker default + if err != nil { + return err + } + grpcServer := grpc.NewServer() + pb.RegisterHandshakeServer(grpcServer, l) + go grpcServer.Serve(lis) + logging.Debug(logging.DStart, "LIS 01 Registered on port %v", port) + */ + return nil } func (l *Listener) ClientDiscoveryHandler(ctx context.Context, ping *pb.ClientRequest) (*pb.ClientResponse, error) { - // incoming reactor ping need to spawn coord - c := &Client{Id:ping.GetClientId(),Type:ping.GetClientType()} - logging.Debug(logging.DClient, "LIS %v %v has connected\n",c.Type,c.Id) - ch := make(chan *ClientResponse) - p := &ClientPacket{Response:ch} - p.Client = c - l.ClientConnections <-p - resp := <-ch - // return the port for the incoming requests - db := &pb.Database{URL:resp.URL,ORG:resp.Org,Token:resp.Token,Bucket:resp.Bucket} - return &pb.ClientResponse{ClientId:c.Id,ServerPort:uint32(resp.Port),Database:db}, nil + // incoming reactor ping need to spawn coord + c := &Client{Id: ping.GetClientId(), Type: ping.GetClientType()} + logging.Debug(logging.DClient, "LIS %v %v has connected\n", c.Type, c.Id) + ch := make(chan *ClientResponse) + p := &ClientPacket{Response: ch} + p.Client = c + l.ClientConnections <- p + resp := <-ch + // return the port for the incoming requests + db := &pb.Database{URL: resp.URL, ORG: resp.Org, Token: resp.Token, Bucket: resp.Bucket} + return &pb.ClientResponse{ClientId: c.Id, ServerPort: uint32(resp.Port), Database: db}, nil } diff --git a/notes.md b/notes.md index 6b0f7d5..5d32cf5 100644 --- a/notes.md +++ b/notes.md @@ -16,8 +16,7 @@ *I need to put the whole docker thing on the back burner for now. It isn't that important when it comes to immediate goals.* -**12/05** -#### TODO +#### 12/05 TODO - Cleanup server side config stuff to make it coherent - Reflect changes to reactor side startup - Boil down interface to address core issues @@ -26,6 +25,9 @@ 2) Overwrite any previous settings with the flags 3) Intelligently translate config into action 4) launch coordinator and start up existing reactor managers +- Config Structure: + - Wrap viper functions in config struct methods to be used thrtugh interfaces + - minimize the reliance on viper so we can sub in othermethods - is it even important to launch reactor managers? Wont they just be started on connection?