diff --git a/.dockerignore b/.dockerignore index 8d54bcf..5bba498 100644 --- a/.dockerignore +++ b/.dockerignore @@ -10,3 +10,4 @@ !go.sum !server !reactor +!.env diff --git a/.gitignore b/.gitignore index 1566793..375a86c 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,4 @@ cmd/tui/tui # machine dependent tokens/ logs/ +influxdb/config diff --git a/Dockerfile.server b/Dockerfile.server index df78994..4c29de6 100644 --- a/Dockerfile.server +++ b/Dockerfile.server @@ -12,8 +12,6 @@ RUN CGO_ENABLED=0 go build -o /server ./cmd/server/main.go FROM alpine COPY --from=builder /server . -COPY --from=builder /app/internal/configs/ ./configs -COPY --from=builder /app/tokens/ ./tokens EXPOSE 2022 EXPOSE 2023 diff --git a/cmd/server/main.go b/cmd/server/main.go index eaef3c2..29fc237 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -4,6 +4,7 @@ import ( _"net/http" _ "net/http/pprof" "strconv" + "strings" //"flag" //"log" "os" @@ -17,12 +18,17 @@ type coordinator interface { Start() } -func NewCoordinator(port int, ch chan error) coordinator { - return server.NewCentralCoordinator(port, ch) +func NewCoordinator(ch chan error) coordinator { + return server.NewCentralCoordinator(ch) } -func LoadConfig(fname string) { +func LoadConfig(fname string) Config { config.Load(fname) + return config.LoadConfig() +} + +type Config interface { + UpdatePort(string, int) error } func main() { @@ -31,24 +37,34 @@ func main() { // go func() { // fmt.Println(http.ListenAndServe("localhost:6060",nil)) // }() - LoadConfig("server") + conf := LoadConfig("server") ch := make(chan error) - var port int - var err error - - if p := os.Getenv("gRPC_PORT"); p == "" { - port = 2022 // default docker port - } else { - if port, err = strconv.Atoi(p); err != nil { - panic(err) + // checking env + envVars := os.Environ() + for _, envString := range envVars { + // looping over set ports + initSplt := strings.Split(envString,"=") + key := initSplt[0] + val := initSplt[1] + if strings.Contains(key,"PORT") { + // parsing out correct port to update + splt := strings.Split(key,"_") // LIS_PORT -> LIS, PORT + portName := strings.ToLower(splt[0]) // LIS -> lis + port, err := strconv.Atoi(val) + if err != nil { + panic(err) + } + if err := conf.UpdatePort(portName,port); err != nil { + panic(err) + } } } //fmt.Printf("Listening on %v\n", lport) - c := NewCoordinator(port, ch) + c := NewCoordinator(ch) go c.Start() fmt.Println("Server Active!") logging.Debug(logging.DStart, "CCO 01 Server started") - err = <-ch // blocking to wait for any errors and keep alive otherwise + err := <-ch // blocking to wait for any errors and keep alive otherwise panic(err) } diff --git a/docker-compose.yml b/docker-compose.yml index 0f96204..ca17e39 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,6 +10,7 @@ services: - "2023:2023" volumes: - ./logs:/log + - server-config:/configs environment: - LOGTYPE=SERVER - VERBOSE=1 @@ -21,13 +22,16 @@ services: - "8086:8086" volumes: - influx-data:/var/lib/influxdb2 - - influx-config:/etc/influxdb2 + - ./influxdb/startup:/docker-entrypoint-initdb.d + - server-config:/server-config + env_file: + - ./internal/configs/db.env environment: - DOCKER_INFLUXDB_INIT_MODE=setup - - DOCKER_INFLUXDB_INIT_USERNAME=${INFLUXDB_USERNAME} - - DOCKER_INFLUXDB_INIT_PASSWORD=${INFLUXDB_PASSWORD} - - DOCKER_INFLUXDB_INIT_ORG=${INFLUXDB_ORG} - - DOCKER_INFLUXDB_INIT_BUCKET=${INFLUXDB_BUCKET} + - DOCKER_INFLUXDB_INIT_USERNAME=admin + - DOCKER_INFLUXDB_INIT_PASSWORD=admin + - DOCKER_INFLUXDB_INIT_ORG=ForeLight + - DOCKER_INFLUXDB_INIT_BUCKET=default grafana: image: grafana/grafana-oss:latest ports: @@ -37,4 +41,4 @@ services: volumes: grafana-data: influx-data: - influx-config: + server-config: diff --git a/influxdb/startup/influxsetup.sh b/influxdb/startup/influxsetup.sh new file mode 100755 index 0000000..90474c8 --- /dev/null +++ b/influxdb/startup/influxsetup.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +TOKEN=$(influx auth list --user ${DOCKER_INFLUXDB_INIT_USER_ID} --hide-headers | cut -f 3) + +export db_url=$INFLUX_HOST +export db_org=$DOCKER_INFLUXDB_INIT_ORG_ID +export db_token=$TOKEN + +rm -f temp.yaml +( echo "cat <final.yaml"; + cat template.yaml; + echo "EOF"; +) >temp.yaml +. temp.yaml +cat final.yaml +mv final.yaml /server-config/server.yaml diff --git a/influxdb/startup/template.yaml b/influxdb/startup/template.yaml new file mode 100644 index 0000000..d832594 --- /dev/null +++ b/influxdb/startup/template.yaml @@ -0,0 +1,6 @@ +---- +# ${gen_statement} +server: + db-url: "${db_url}" + db-org: "${db_org}" + db-token: "${db_token}" diff --git a/internal/configs/db.env b/internal/configs/db.env new file mode 100644 index 0000000..dbc19f5 --- /dev/null +++ b/internal/configs/db.env @@ -0,0 +1,5 @@ +INFLUXDB_USERNAME=admin +INFLUXDB_PASSWORD=admin +INFLUXDB_ORG=ForeLight +INFLUXDB_BUCKET=default + diff --git a/internal/pkg/config/server.go b/internal/pkg/config/server.go index 640e4bc..4443a7d 100644 --- a/internal/pkg/config/server.go +++ b/internal/pkg/config/server.go @@ -8,23 +8,25 @@ import ( "github.com/spf13/viper" "FRMS/internal/pkg/logging" "errors" + "sync" + "strings" //"os" //"log" //"os/exec" //"bytes" - //"strings" ) type Config struct { Server ServerConfig `mapstructure:"server"` Reactors map[string]ReactorConfig `mapstructure:"reactors"` + sync.RWMutex } type ServerConfig struct { URL string `mapstructure:"db-url"` Token string `mapstructure:"db-token"` Orginization string `mapstructure:"db-org"` - Ports map[string]string `mapstructure:"ports"` + Ports map[string]int `mapstructure:"ports"` // changed from map[string]string to map[string]int Name string `mapstructure:"name"` } @@ -56,30 +58,111 @@ func Load(fname string) { panic(err) } fmt.Printf("Outcome: %#v\n \n",C) - fmt.Printf("%v\n",C) + //fmt.Printf("%v\n",C) // unmarshalled at this point } -func LoadStruct() *Config { +func LoadConfig() *Config { return C } func (c *Config) GetURL() (string, error) { + c.RLock() + defer c.RUnlock() return C.Server.URL, nil } func (c *Config) GetOrg() (string, error) { + c.RLock() + defer c.RUnlock() return c.Server.Orginization, nil } +func (c *Config) GetPort(port string) (int, error) { + c.RLock() + defer c.RUnlock() + portString, ok := c.Server.Ports[port] + if !ok { + portEnv := strings.ToUpper(port) + "_PORT" + return 0, fmt.Errorf("%s port doesnt exist! Please set using env %s=####",port, portEnv) + } + // returns int, err + //return strconv.Atoi(portString) + return portString, nil +} + func (c *Config) GetServerToken() (string, error) { + c.RLock() + defer c.RUnlock() return c.Server.Token, nil } -func (c *Config) GetReactorToken(id uint32) (string, error) { +func (c *Config) GetReactorClient(id uint32) (string, string, error) { + c.RLock() + defer c.RUnlock() idString := strconv.FormatUint(uint64(id),10) if r, ok := c.Reactors[idString]; ok { - return r.Token, nil + return r.Bucket, r.Token, nil + } + return "", "", fmt.Errorf("Reactor %v config doesnt exist!",id) +} + +// setters +func (c *Config) UpdateURL(url string) error { + c.Lock() + defer c.Unlock() + if url == "" { + return errors.New("String cannot be empty!") + } + c.Server.URL = url + return viper.WriteConfig() +} + +func (c *Config) UpdateOrg(org string) error { + c.Lock() + defer c.Unlock() + if org == "" { + return errors.New("String cannot be empty!") + } + c.Server.Orginization = org + return viper.WriteConfig() +} + +func (c *Config) UpdatePort(pName string, port int) error { + c.Lock() + defer c.Unlock() + if port < 1024 || port > 65535 { + // OOB + return fmt.Errorf("Port %d out of bounds! [1024,65535]",port) + } + c.Server.Ports[pName] = port + return nil +} + +func (c *Config) UpdateServerToken(token string) error { + c.Lock() + defer c.Unlock() + if token == "" { + return errors.New("String cannot be empty!") } - return "", errors.New(fmt.Sprintf("Reactor %v config not found!",id)) + c.Server.Token = token + return viper.WriteConfig() } + +func (c *Config) UpdateReactorClient(id uint32, bucket, token string) error { + c.Lock() + c.Unlock() + sid := strconv.FormatUint(uint64(id), 10) + if token == "" || bucket == "" { + return errors.New("String cannot be empty!") + } + if reactor, ok := c.Reactors[sid]; !ok { + c.Reactors[sid] = ReactorConfig{Token:token,Bucket:bucket,Id:id} + } else { + reactor.Bucket = bucket + reactor.Token = token + c.Reactors[sid] = reactor + } + return viper.WriteConfig() +} + diff --git a/internal/pkg/grpc/server.pb.go b/internal/pkg/grpc/server.pb.go index cf2c410..20b821c 100644 --- a/internal/pkg/grpc/server.pb.go +++ b/internal/pkg/grpc/server.pb.go @@ -80,8 +80,9 @@ type ClientResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - ClientId uint32 `protobuf:"varint,1,opt,name=clientId,proto3" json:"clientId,omitempty"` - ServerPort uint32 `protobuf:"varint,2,opt,name=serverPort,proto3" json:"serverPort,omitempty"` + ClientId uint32 `protobuf:"varint,1,opt,name=clientId,proto3" json:"clientId,omitempty"` + ServerPort uint32 `protobuf:"varint,2,opt,name=serverPort,proto3" json:"serverPort,omitempty"` + Database *Database `protobuf:"bytes,3,opt,name=database,proto3" json:"database,omitempty"` } func (x *ClientResponse) Reset() { @@ -130,6 +131,84 @@ func (x *ClientResponse) GetServerPort() uint32 { return 0 } +func (x *ClientResponse) GetDatabase() *Database { + if x != nil { + return x.Database + } + return nil +} + +type Database struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + URL string `protobuf:"bytes,1,opt,name=URL,proto3" json:"URL,omitempty"` + ORG string `protobuf:"bytes,2,opt,name=ORG,proto3" json:"ORG,omitempty"` + Token string `protobuf:"bytes,3,opt,name=token,proto3" json:"token,omitempty"` + Bucket string `protobuf:"bytes,4,opt,name=bucket,proto3" json:"bucket,omitempty"` +} + +func (x *Database) Reset() { + *x = Database{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_pkg_grpc_server_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Database) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Database) ProtoMessage() {} + +func (x *Database) ProtoReflect() protoreflect.Message { + mi := &file_internal_pkg_grpc_server_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Database.ProtoReflect.Descriptor instead. +func (*Database) Descriptor() ([]byte, []int) { + return file_internal_pkg_grpc_server_proto_rawDescGZIP(), []int{2} +} + +func (x *Database) GetURL() string { + if x != nil { + return x.URL + } + return "" +} + +func (x *Database) GetORG() string { + if x != nil { + return x.ORG + } + return "" +} + +func (x *Database) GetToken() string { + if x != nil { + return x.Token + } + return "" +} + +func (x *Database) GetBucket() string { + if x != nil { + return x.Bucket + } + return "" +} + var File_internal_pkg_grpc_server_proto protoreflect.FileDescriptor var file_internal_pkg_grpc_server_proto_rawDesc = []byte{ @@ -140,18 +219,27 @@ var file_internal_pkg_grpc_server_proto_rawDesc = []byte{ 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x54, - 0x79, 0x70, 0x65, 0x22, 0x4c, 0x0a, 0x0e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, + 0x79, 0x70, 0x65, 0x22, 0x78, 0x0a, 0x0e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x50, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x50, 0x6f, 0x72, - 0x74, 0x32, 0x50, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x43, - 0x0a, 0x16, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, - 0x79, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x13, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, - 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, - 0x67, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x42, 0x13, 0x5a, 0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, - 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x74, 0x12, 0x2a, 0x0a, 0x08, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x62, + 0x61, 0x73, 0x65, 0x52, 0x08, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x22, 0x5c, 0x0a, + 0x08, 0x44, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x55, 0x52, 0x4c, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x55, 0x52, 0x4c, 0x12, 0x10, 0x0a, 0x03, 0x4f, + 0x52, 0x47, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x4f, 0x52, 0x47, 0x12, 0x14, 0x0a, + 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, + 0x6b, 0x65, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x18, 0x04, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x32, 0x50, 0x0a, 0x09, 0x68, + 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x43, 0x0a, 0x16, 0x43, 0x6c, 0x69, 0x65, + 0x6e, 0x74, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x48, 0x61, 0x6e, 0x64, 0x6c, + 0x65, 0x72, 0x12, 0x13, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x43, + 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x13, 0x5a, + 0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, + 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -166,19 +254,21 @@ func file_internal_pkg_grpc_server_proto_rawDescGZIP() []byte { return file_internal_pkg_grpc_server_proto_rawDescData } -var file_internal_pkg_grpc_server_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_internal_pkg_grpc_server_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_internal_pkg_grpc_server_proto_goTypes = []interface{}{ (*ClientRequest)(nil), // 0: grpc.ClientRequest (*ClientResponse)(nil), // 1: grpc.ClientResponse + (*Database)(nil), // 2: grpc.Database } var file_internal_pkg_grpc_server_proto_depIdxs = []int32{ - 0, // 0: grpc.handshake.ClientDiscoveryHandler:input_type -> grpc.ClientRequest - 1, // 1: grpc.handshake.ClientDiscoveryHandler:output_type -> grpc.ClientResponse - 1, // [1:2] is the sub-list for method output_type - 0, // [0:1] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name + 2, // 0: grpc.ClientResponse.database:type_name -> grpc.Database + 0, // 1: grpc.handshake.ClientDiscoveryHandler:input_type -> grpc.ClientRequest + 1, // 2: grpc.handshake.ClientDiscoveryHandler:output_type -> grpc.ClientResponse + 2, // [2:3] is the sub-list for method output_type + 1, // [1:2] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name } func init() { file_internal_pkg_grpc_server_proto_init() } @@ -211,6 +301,18 @@ func file_internal_pkg_grpc_server_proto_init() { return nil } } + file_internal_pkg_grpc_server_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Database); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -218,7 +320,7 @@ func file_internal_pkg_grpc_server_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_internal_pkg_grpc_server_proto_rawDesc, NumEnums: 0, - NumMessages: 2, + NumMessages: 3, NumExtensions: 0, NumServices: 1, }, diff --git a/internal/pkg/grpc/server.proto b/internal/pkg/grpc/server.proto index 4ad5135..23e999b 100644 --- a/internal/pkg/grpc/server.proto +++ b/internal/pkg/grpc/server.proto @@ -22,4 +22,5 @@ message Database { string URL = 1; string ORG = 2; string token = 3; + string bucket = 4; } diff --git a/internal/pkg/influxdb/client.go b/internal/pkg/influxdb/client.go index edac4cd..48cc862 100644 --- a/internal/pkg/influxdb/client.go +++ b/internal/pkg/influxdb/client.go @@ -4,3 +4,31 @@ import ( _ "fmt" _ "github.com/influxdata/influxdb-client-go/v2" ) + +type DBClient struct { + URL string + Bucket string + Token string + // Client *influxdb2.Client +} + +type DBAdmin struct { + // struct for admin methods + *DBClient +} + +func NewDBClient(url, bucket, token string) *DBClient { + db := &DBClient{URL:url, Bucket:bucket, Token:token} + return db +} + +func NewDBAdmin(url, bucket, token string) *DBAdmin { + admin := &DBAdmin{} + admin.DBClient = NewDBClient(url, bucket, token) + return admin +} +// base level funcs +func (d *DBClient) Start() { + // connect to DB + // d.Client = influxdb2.NewClient(d.URL,d.Token) +} diff --git a/internal/pkg/server/coordinator.go b/internal/pkg/server/coordinator.go index 3ddfefd..50054da 100644 --- a/internal/pkg/server/coordinator.go +++ b/internal/pkg/server/coordinator.go @@ -2,7 +2,7 @@ package server import ( "sync" - //"fmt" + "fmt" "net" "context" "errors" @@ -19,21 +19,34 @@ import ( // config interface func LoadConfig() Config { - return config.LoadStruct() + // returns a ServerConfig that we can query and update + return config.LoadConfig() } -type Config interface { - GetURL() (string, error) - GetOrg() (string, error) - GetServerToken() (string,error) - GetReactorToken(uint32) (string, error) +type Config interface { // PROPOSED RENAMING: ServerConfig to avoid confusion w/ reactor variant + + // getters + GetURL() (string, error) + GetOrg() (string, error) + GetPort(string) (int, error) + GetServerToken() (string, error) + GetReactorClient(uint32) (string, string, error) // ret (bucket, token, err) + // setters + // save on write + UpdateURL(string) error + UpdateOrg(string) error + UpdateServerToken(string) error + UpdateReactorClient(uint32, string, string) error // call (id, bucket, token) } // db client interface -type DBClient interface{ - GetBucket(uint32) (string, error) // will create if it doesn't exist - GetToken(uint32) (string, error) // will create if it doesn't exist - GetURL() (string, error) // returns the ipaddres + dbport +type DB interface{ + // getters (all create if doesnt exist) + GetToken() (string, error) // returns admin token (Creates if it doesnt exist) + GetReactorClient(uint32) (string, string, error) // returns (bucket, token, err) + // delete + DeleteReactorClient(uint32) error // removes client token but maintains bucket + PurgeReactorClientData(uint32) error // perm deletes all assocaited reactor data (token, bucket etc) } /*func NewDBClient() DBClient { @@ -43,9 +56,10 @@ type DBClient interface{ type CentralCoordinator struct { ClientConnections *ClientPacket - CLisPort int + //CLisPort int *SubCoordinators *SystemViewer + DB Config Err chan error } @@ -55,9 +69,8 @@ type SubCoordinators struct { sync.Mutex } -func NewCentralCoordinator(port int, ch chan error) *CentralCoordinator { - c := &CentralCoordinator{CLisPort: port, Err: ch} - c.Config = LoadConfig() +func NewCentralCoordinator(ch chan error) *CentralCoordinator { + c := &CentralCoordinator{Err: ch} c.SystemViewer = NewSystemViewer() go c.SystemViewer.Start() s := make(map[string]*SubCoordinator) @@ -68,22 +81,56 @@ func NewCentralCoordinator(port int, ch chan error) *CentralCoordinator { func (c *CentralCoordinator) Start() { // starts up associated funcs + // begin with config and client + c.LoadCfg() clientChan := make(chan *ClientPacket) - l := NewListener(c.CLisPort,clientChan,c.Err) + l := NewListener(clientChan,c.Err) go l.Start() go c.ClientListener(clientChan) } +func (c *CentralCoordinator) LoadCfg() { + // loads db client info and updates if anything is missing + c.Config = LoadConfig() + _, err := c.Config.GetURL() + if err != nil { + logging.Debug(logging.DError,"CCO 01 Err: %v", err) + c.Err <-err + } + token, err := c.Config.GetServerToken() + if err != nil { + logging.Debug(logging.DError,"CCO 01 Err: %v", err) + c.Err <-err + } else if token == "" { + if token, err = c.DB.GetToken(); err != nil { + logging.Debug(logging.DError,"CCO 01 Err: %v", err) + c.Err <-err + } + c.Config.UpdateServerToken(token) + } + org, err := c.Config.GetOrg() + if err != nil { + logging.Debug(logging.DError,"CCO 01 Err: %v", err) + c.Err <-err + } else if token == "" { + if token, err = c.DB.GetToken(); err != nil { + logging.Debug(logging.DError,"CCO 01 Err: %v", err) + c.Err <-err + } + c.Config.UpdateOrg(org) + } +} + func (c *CentralCoordinator) ClientListener(ch chan *ClientPacket) { for client := range ch { // basically loops until channel is closed - port := c.ClientHandler(client.Client) - client.Port <-port + cr := c.ClientHandler(client.Client) + client.Response <-cr } } -func (c *CentralCoordinator) ClientHandler(cl *Client) int { +func (c *CentralCoordinator) ClientHandler(cl *Client) *ClientResponse { c.SubCoordinators.Lock() defer c.SubCoordinators.Unlock() subcoord, ok := c.SubCoordinators.Directory[cl.Type] @@ -94,7 +141,25 @@ func (c *CentralCoordinator) ClientHandler(cl *Client) int { c.SubCoordinators.Directory[cl.Type] = subcoord } go subcoord.ClientHandler(cl) - return subcoord.Port + // setting up client response + var url, org, token, bucket string + var port int + var err error + if url, err = c.Config.GetURL(); err != nil { + logging.Debug(logging.DError,"Error: %v", err) + c.Err <-err + } else if org, err = c.Config.GetOrg(); err != nil { + logging.Debug(logging.DError,"Error: %v", err) + c.Err <-err + } else if bucket, token, err = c.Config.GetReactorClient(cl.Id); err != nil { + logging.Debug(logging.DError,"Error: %v", err) + c.Err <-err + } else if port, err = c.Config.GetPort(cl.Type); err != nil { + logging.Debug(logging.DError,"Error: %v", err) + c.Err <-err + } + cr := &ClientResponse{URL:url, Org:org, Token:token, Bucket:bucket, Port:port} + return cr } type ManagerInterface interface { @@ -128,21 +193,20 @@ type Managers struct { func NewSubCoordinator(clientType string, sys *SystemViewer, err chan error) *SubCoordinator { c := &SubCoordinator{Err:err} c.SystemViewer = sys - man, port, errs := NewCoordinatorType(clientType, err) + man, errs := NewCoordinatorType(clientType, err) if errs != nil { err <-errs } - c.Port = port c.ManagerInterface = man go man.Start() go man.Register() return c } -func (c *SubCoordinator) ClientHandler(cl *Client) int { +func (c *SubCoordinator) ClientHandler(cl *Client) { // (creates and) notifies manager of client connection - go c.UpdateManager(cl) - return c.Port + + c.UpdateManager(cl) } func (c *SubCoordinator) UpdateManager(cl *Client) { @@ -174,21 +238,21 @@ func (m *Managers) GetManager(id uint32) (GeneralManager, bool) { return man.(GeneralManager), exists } -func NewCoordinatorType(clientType string, err chan error) (ManagerInterface, int, error) { +func NewCoordinatorType(clientType string, err chan error) (ManagerInterface, error) { m := make(map[uint32]interface{}) if clientType == "reactor" { c := &reactorCoordinator{} //m := make(map[uint32]*ReactorManager) c.Managers = &Managers{Directory:m} - return c, 2023, nil + return c, nil } else if clientType == "tui" { c := &tuiCoordinator{} //m := make(map[uint32]*TUIManager) c.Managers = &Managers{Directory:m} - return c, 2024, nil + return c, nil } - return &reactorCoordinator{}, 0, errors.New("Unrecognized client type") + return &reactorCoordinator{}, errors.New("Unrecognized client type") } // creating sub coordinators for associated gRPC handlers @@ -208,9 +272,14 @@ func (r *reactorCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan } func (r *reactorCoordinator) Register() { - lis, err := net.Listen("tcp", ":2023") + conf := LoadConfig() + port, err := conf.GetPort("reactor") if err != nil { - // rip + panic(err) + } + lis, err := net.Listen("tcp", fmt.Sprintf(":%v",port)) + if err != nil { + panic(err) } grpcServer := grpc.NewServer() pb.RegisterMonitoringServer(grpcServer,r) @@ -246,7 +315,12 @@ func (t *tuiCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan erro } func (t *tuiCoordinator) Register() { - lis, err := net.Listen("tcp", ":2024") + conf := LoadConfig() + port, err := conf.GetPort("tui") + if err != nil { + panic(err) + } + lis, err := net.Listen("tcp", fmt.Sprintf(":%v",port)) if err != nil { // rip } diff --git a/internal/pkg/server/listener.go b/internal/pkg/server/listener.go index f575404..c531a7b 100644 --- a/internal/pkg/server/listener.go +++ b/internal/pkg/server/listener.go @@ -16,7 +16,6 @@ Listens on a supplied port and sends incoming clients over a supplied channel */ type Listener struct { // exporting for easy use in the short term - Port int ClientConnections chan *ClientPacket Err chan error pb.UnimplementedHandshakeServer @@ -24,7 +23,7 @@ type Listener struct { // exporting for easy use in the short term type ClientPacket struct { *Client - Port chan int + Response chan *ClientResponse } type Client struct { @@ -36,9 +35,16 @@ type Client struct { Type string } -func NewListener(port int, cch chan *ClientPacket, ech chan error) *Listener { +type ClientResponse struct { + Port int + URL string + Org string + Token string + Bucket string +} + +func NewListener(cch chan *ClientPacket, ech chan error) *Listener { l := &Listener{Err:ech,ClientConnections:cch} - l.Port = port return l } @@ -47,19 +53,24 @@ func (l *Listener) Start() { if err := l.Register(); err != nil { l.Err <- err } - logging.Debug(logging.DStart,"Started client listener on port %v\n",l.Port) + logging.Debug(logging.DStart,"LIS 01 Started client listener") } func (l *Listener) Register() error { // creates a gRPC service and binds it to our handler - lis, err := net.Listen("tcp", fmt.Sprintf(":%v",l.Port)) // either binding to supplied port or binding to docker default + conf := LoadConfig() + port, err := conf.GetPort("lis") + if err != nil { + return err + } + lis, err := net.Listen("tcp", fmt.Sprintf(":%v",port)) // either binding to supplied port or binding to docker default if err != nil { return err } grpcServer := grpc.NewServer() pb.RegisterHandshakeServer(grpcServer, l) go grpcServer.Serve(lis) - logging.Debug(logging.DStart, "LIS Registered on port %v", l.Port) + logging.Debug(logging.DStart, "LIS 01 Registered on port %v", port) return nil } @@ -67,21 +78,12 @@ func (l *Listener) ClientDiscoveryHandler(ctx context.Context, ping *pb.ClientRe // incoming reactor ping need to spawn coord c := &Client{Id:ping.GetClientId(),Type:ping.GetClientType()} logging.Debug(logging.DClient, "LIS %v %v has connected\n",c.Type,c.Id) - ch := make(chan int) - p := &ClientPacket{Port:ch} + ch := make(chan *ClientResponse) + p := &ClientPacket{Response:ch} p.Client = c l.ClientConnections <-p - port := <-ch - /* - coord, ok := l.Coordinators[c.Type] - if !ok { - logging.Debug(logging.DSpawn,"CCO 01 Created Coordinator") - coord = NewCoordinator(c.Type, l.Sys, l.Err) - l.Coordinators[c.Type] = coord - go coord.Start() - } - port := coord.ClientHandler(c) - */ + resp := <-ch // return the port for the incoming requests - return &pb.ClientResponse{ClientId:c.Id,ServerPort:uint32(port)}, nil + db := &pb.Database{URL:resp.URL,ORG:resp.Org,Token:resp.Token,Bucket:resp.Bucket} + return &pb.ClientResponse{ClientId:c.Id,ServerPort:uint32(resp.Port),Database:db}, nil } diff --git a/notes b/notes index 9980903..6a4f4ce 100644 --- a/notes +++ b/notes @@ -897,3 +897,28 @@ with an embedded listener and database and shit so SERVER will call NewServer which will take care of subsequents + +# TODO 8/5 +Config storing time +going to probably have to add admin database client(aka server client which makes 0 sense) +can abstract all operations through interface and plugable package + +I just reliazed I coupled my mechanism with influxs token thing because it wokrs well but I am going to have to completely rebuild that if its properietary or we transition to a new DB + - hopefully null point due to the nature of OSS and time series + +CONFIG (and by extension DB) + +config.UpdateReactor(id, key, value) +config.UpdateSelf(key, value) + +should just be a basic way to update a given entry for an reactor +seperating server and reactor methods should lead to less finicky behaviour +should also probably wrap these in a seperate struct + - methods? + - who can call? + - who stores ptr? + - do we even need a ptr? can configs be stored and loaded statically or is that a bitch on the FS + +does it make more sensor to load different configs for each entity or justhave one monolithic config (probably load for each one and then let it update itself) + +going to have the init script set up the