Added more config methods for server interface and started working on docker compose. Need to finish influx startup and grafana provisioning

main
Keegan 2 years ago
parent f404056bd2
commit 56512b3086

@ -10,3 +10,4 @@
!go.sum
!server
!reactor
!.env

1
.gitignore vendored

@ -29,3 +29,4 @@ cmd/tui/tui
# machine dependent
tokens/
logs/
influxdb/config

@ -12,8 +12,6 @@ RUN CGO_ENABLED=0 go build -o /server ./cmd/server/main.go
FROM alpine
COPY --from=builder /server .
COPY --from=builder /app/internal/configs/ ./configs
COPY --from=builder /app/tokens/ ./tokens
EXPOSE 2022
EXPOSE 2023

@ -4,6 +4,7 @@ import (
_"net/http"
_ "net/http/pprof"
"strconv"
"strings"
//"flag"
//"log"
"os"
@ -17,12 +18,17 @@ type coordinator interface {
Start()
}
func NewCoordinator(port int, ch chan error) coordinator {
return server.NewCentralCoordinator(port, ch)
func NewCoordinator(ch chan error) coordinator {
return server.NewCentralCoordinator(ch)
}
func LoadConfig(fname string) {
func LoadConfig(fname string) Config {
config.Load(fname)
return config.LoadConfig()
}
type Config interface {
UpdatePort(string, int) error
}
func main() {
@ -31,24 +37,34 @@ func main() {
// go func() {
// fmt.Println(http.ListenAndServe("localhost:6060",nil))
// }()
LoadConfig("server")
conf := LoadConfig("server")
ch := make(chan error)
var port int
var err error
if p := os.Getenv("gRPC_PORT"); p == "" {
port = 2022 // default docker port
} else {
if port, err = strconv.Atoi(p); err != nil {
// checking env
envVars := os.Environ()
for _, envString := range envVars {
// looping over set ports
initSplt := strings.Split(envString,"=")
key := initSplt[0]
val := initSplt[1]
if strings.Contains(key,"PORT") {
// parsing out correct port to update
splt := strings.Split(key,"_") // LIS_PORT -> LIS, PORT
portName := strings.ToLower(splt[0]) // LIS -> lis
port, err := strconv.Atoi(val)
if err != nil {
panic(err)
}
if err := conf.UpdatePort(portName,port); err != nil {
panic(err)
}
}
}
//fmt.Printf("Listening on %v\n", lport)
c := NewCoordinator(port, ch)
c := NewCoordinator(ch)
go c.Start()
fmt.Println("Server Active!")
logging.Debug(logging.DStart, "CCO 01 Server started")
err = <-ch // blocking to wait for any errors and keep alive otherwise
err := <-ch // blocking to wait for any errors and keep alive otherwise
panic(err)
}

@ -10,6 +10,7 @@ services:
- "2023:2023"
volumes:
- ./logs:/log
- server-config:/configs
environment:
- LOGTYPE=SERVER
- VERBOSE=1
@ -21,13 +22,16 @@ services:
- "8086:8086"
volumes:
- influx-data:/var/lib/influxdb2
- influx-config:/etc/influxdb2
- ./influxdb/startup:/docker-entrypoint-initdb.d
- server-config:/server-config
env_file:
- ./internal/configs/db.env
environment:
- DOCKER_INFLUXDB_INIT_MODE=setup
- DOCKER_INFLUXDB_INIT_USERNAME=${INFLUXDB_USERNAME}
- DOCKER_INFLUXDB_INIT_PASSWORD=${INFLUXDB_PASSWORD}
- DOCKER_INFLUXDB_INIT_ORG=${INFLUXDB_ORG}
- DOCKER_INFLUXDB_INIT_BUCKET=${INFLUXDB_BUCKET}
- DOCKER_INFLUXDB_INIT_USERNAME=admin
- DOCKER_INFLUXDB_INIT_PASSWORD=admin
- DOCKER_INFLUXDB_INIT_ORG=ForeLight
- DOCKER_INFLUXDB_INIT_BUCKET=default
grafana:
image: grafana/grafana-oss:latest
ports:
@ -37,4 +41,4 @@ services:
volumes:
grafana-data:
influx-data:
influx-config:
server-config:

@ -0,0 +1,16 @@
#!/bin/bash
TOKEN=$(influx auth list --user ${DOCKER_INFLUXDB_INIT_USER_ID} --hide-headers | cut -f 3)
export db_url=$INFLUX_HOST
export db_org=$DOCKER_INFLUXDB_INIT_ORG_ID
export db_token=$TOKEN
rm -f temp.yaml
( echo "cat <<EOF >final.yaml";
cat template.yaml;
echo "EOF";
) >temp.yaml
. temp.yaml
cat final.yaml
mv final.yaml /server-config/server.yaml

@ -0,0 +1,6 @@
----
# ${gen_statement}
server:
db-url: "${db_url}"
db-org: "${db_org}"
db-token: "${db_token}"

@ -0,0 +1,5 @@
INFLUXDB_USERNAME=admin
INFLUXDB_PASSWORD=admin
INFLUXDB_ORG=ForeLight
INFLUXDB_BUCKET=default

@ -8,23 +8,25 @@ import (
"github.com/spf13/viper"
"FRMS/internal/pkg/logging"
"errors"
"sync"
"strings"
//"os"
//"log"
//"os/exec"
//"bytes"
//"strings"
)
type Config struct {
Server ServerConfig `mapstructure:"server"`
Reactors map[string]ReactorConfig `mapstructure:"reactors"`
sync.RWMutex
}
type ServerConfig struct {
URL string `mapstructure:"db-url"`
Token string `mapstructure:"db-token"`
Orginization string `mapstructure:"db-org"`
Ports map[string]string `mapstructure:"ports"`
Ports map[string]int `mapstructure:"ports"` // changed from map[string]string to map[string]int
Name string `mapstructure:"name"`
}
@ -56,30 +58,111 @@ func Load(fname string) {
panic(err)
}
fmt.Printf("Outcome: %#v\n \n",C)
fmt.Printf("%v\n",C)
//fmt.Printf("%v\n",C)
// unmarshalled at this point
}
func LoadStruct() *Config {
func LoadConfig() *Config {
return C
}
func (c *Config) GetURL() (string, error) {
c.RLock()
defer c.RUnlock()
return C.Server.URL, nil
}
func (c *Config) GetOrg() (string, error) {
c.RLock()
defer c.RUnlock()
return c.Server.Orginization, nil
}
func (c *Config) GetPort(port string) (int, error) {
c.RLock()
defer c.RUnlock()
portString, ok := c.Server.Ports[port]
if !ok {
portEnv := strings.ToUpper(port) + "_PORT"
return 0, fmt.Errorf("%s port doesnt exist! Please set using env %s=####",port, portEnv)
}
// returns int, err
//return strconv.Atoi(portString)
return portString, nil
}
func (c *Config) GetServerToken() (string, error) {
c.RLock()
defer c.RUnlock()
return c.Server.Token, nil
}
func (c *Config) GetReactorToken(id uint32) (string, error) {
func (c *Config) GetReactorClient(id uint32) (string, string, error) {
c.RLock()
defer c.RUnlock()
idString := strconv.FormatUint(uint64(id),10)
if r, ok := c.Reactors[idString]; ok {
return r.Token, nil
return r.Bucket, r.Token, nil
}
return "", "", fmt.Errorf("Reactor %v config doesnt exist!",id)
}
// setters
func (c *Config) UpdateURL(url string) error {
c.Lock()
defer c.Unlock()
if url == "" {
return errors.New("String cannot be empty!")
}
c.Server.URL = url
return viper.WriteConfig()
}
func (c *Config) UpdateOrg(org string) error {
c.Lock()
defer c.Unlock()
if org == "" {
return errors.New("String cannot be empty!")
}
c.Server.Orginization = org
return viper.WriteConfig()
}
func (c *Config) UpdatePort(pName string, port int) error {
c.Lock()
defer c.Unlock()
if port < 1024 || port > 65535 {
// OOB
return fmt.Errorf("Port %d out of bounds! [1024,65535]",port)
}
c.Server.Ports[pName] = port
return nil
}
func (c *Config) UpdateServerToken(token string) error {
c.Lock()
defer c.Unlock()
if token == "" {
return errors.New("String cannot be empty!")
}
return "", errors.New(fmt.Sprintf("Reactor %v config not found!",id))
c.Server.Token = token
return viper.WriteConfig()
}
func (c *Config) UpdateReactorClient(id uint32, bucket, token string) error {
c.Lock()
c.Unlock()
sid := strconv.FormatUint(uint64(id), 10)
if token == "" || bucket == "" {
return errors.New("String cannot be empty!")
}
if reactor, ok := c.Reactors[sid]; !ok {
c.Reactors[sid] = ReactorConfig{Token:token,Bucket:bucket,Id:id}
} else {
reactor.Bucket = bucket
reactor.Token = token
c.Reactors[sid] = reactor
}
return viper.WriteConfig()
}

@ -82,6 +82,7 @@ type ClientResponse struct {
ClientId uint32 `protobuf:"varint,1,opt,name=clientId,proto3" json:"clientId,omitempty"`
ServerPort uint32 `protobuf:"varint,2,opt,name=serverPort,proto3" json:"serverPort,omitempty"`
Database *Database `protobuf:"bytes,3,opt,name=database,proto3" json:"database,omitempty"`
}
func (x *ClientResponse) Reset() {
@ -130,6 +131,84 @@ func (x *ClientResponse) GetServerPort() uint32 {
return 0
}
func (x *ClientResponse) GetDatabase() *Database {
if x != nil {
return x.Database
}
return nil
}
type Database struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
URL string `protobuf:"bytes,1,opt,name=URL,proto3" json:"URL,omitempty"`
ORG string `protobuf:"bytes,2,opt,name=ORG,proto3" json:"ORG,omitempty"`
Token string `protobuf:"bytes,3,opt,name=token,proto3" json:"token,omitempty"`
Bucket string `protobuf:"bytes,4,opt,name=bucket,proto3" json:"bucket,omitempty"`
}
func (x *Database) Reset() {
*x = Database{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_pkg_grpc_server_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Database) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Database) ProtoMessage() {}
func (x *Database) ProtoReflect() protoreflect.Message {
mi := &file_internal_pkg_grpc_server_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Database.ProtoReflect.Descriptor instead.
func (*Database) Descriptor() ([]byte, []int) {
return file_internal_pkg_grpc_server_proto_rawDescGZIP(), []int{2}
}
func (x *Database) GetURL() string {
if x != nil {
return x.URL
}
return ""
}
func (x *Database) GetORG() string {
if x != nil {
return x.ORG
}
return ""
}
func (x *Database) GetToken() string {
if x != nil {
return x.Token
}
return ""
}
func (x *Database) GetBucket() string {
if x != nil {
return x.Bucket
}
return ""
}
var File_internal_pkg_grpc_server_proto protoreflect.FileDescriptor
var file_internal_pkg_grpc_server_proto_rawDesc = []byte{
@ -140,18 +219,27 @@ var file_internal_pkg_grpc_server_proto_rawDesc = []byte{
0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e,
0x74, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70,
0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x54,
0x79, 0x70, 0x65, 0x22, 0x4c, 0x0a, 0x0e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73,
0x79, 0x70, 0x65, 0x22, 0x78, 0x0a, 0x0e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49,
0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49,
0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x50, 0x6f, 0x72, 0x74, 0x18,
0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x50, 0x6f, 0x72,
0x74, 0x32, 0x50, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x43,
0x0a, 0x16, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72,
0x79, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x13, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e,
0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e,
0x67, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x42, 0x13, 0x5a, 0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f,
0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x74, 0x12, 0x2a, 0x0a, 0x08, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x18, 0x03, 0x20,
0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x62,
0x61, 0x73, 0x65, 0x52, 0x08, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x22, 0x5c, 0x0a,
0x08, 0x44, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x55, 0x52, 0x4c,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x55, 0x52, 0x4c, 0x12, 0x10, 0x0a, 0x03, 0x4f,
0x52, 0x47, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x4f, 0x52, 0x47, 0x12, 0x14, 0x0a,
0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f,
0x6b, 0x65, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x18, 0x04, 0x20,
0x01, 0x28, 0x09, 0x52, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x32, 0x50, 0x0a, 0x09, 0x68,
0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x43, 0x0a, 0x16, 0x43, 0x6c, 0x69, 0x65,
0x6e, 0x74, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x48, 0x61, 0x6e, 0x64, 0x6c,
0x65, 0x72, 0x12, 0x13, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x43,
0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x13, 0x5a,
0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72,
0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -166,19 +254,21 @@ func file_internal_pkg_grpc_server_proto_rawDescGZIP() []byte {
return file_internal_pkg_grpc_server_proto_rawDescData
}
var file_internal_pkg_grpc_server_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_internal_pkg_grpc_server_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_internal_pkg_grpc_server_proto_goTypes = []interface{}{
(*ClientRequest)(nil), // 0: grpc.ClientRequest
(*ClientResponse)(nil), // 1: grpc.ClientResponse
(*Database)(nil), // 2: grpc.Database
}
var file_internal_pkg_grpc_server_proto_depIdxs = []int32{
0, // 0: grpc.handshake.ClientDiscoveryHandler:input_type -> grpc.ClientRequest
1, // 1: grpc.handshake.ClientDiscoveryHandler:output_type -> grpc.ClientResponse
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
2, // 0: grpc.ClientResponse.database:type_name -> grpc.Database
0, // 1: grpc.handshake.ClientDiscoveryHandler:input_type -> grpc.ClientRequest
1, // 2: grpc.handshake.ClientDiscoveryHandler:output_type -> grpc.ClientResponse
2, // [2:3] is the sub-list for method output_type
1, // [1:2] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
}
func init() { file_internal_pkg_grpc_server_proto_init() }
@ -211,6 +301,18 @@ func file_internal_pkg_grpc_server_proto_init() {
return nil
}
}
file_internal_pkg_grpc_server_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Database); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
@ -218,7 +320,7 @@ func file_internal_pkg_grpc_server_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_internal_pkg_grpc_server_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumMessages: 3,
NumExtensions: 0,
NumServices: 1,
},

@ -22,4 +22,5 @@ message Database {
string URL = 1;
string ORG = 2;
string token = 3;
string bucket = 4;
}

@ -4,3 +4,31 @@ import (
_ "fmt"
_ "github.com/influxdata/influxdb-client-go/v2"
)
type DBClient struct {
URL string
Bucket string
Token string
// Client *influxdb2.Client
}
type DBAdmin struct {
// struct for admin methods
*DBClient
}
func NewDBClient(url, bucket, token string) *DBClient {
db := &DBClient{URL:url, Bucket:bucket, Token:token}
return db
}
func NewDBAdmin(url, bucket, token string) *DBAdmin {
admin := &DBAdmin{}
admin.DBClient = NewDBClient(url, bucket, token)
return admin
}
// base level funcs
func (d *DBClient) Start() {
// connect to DB
// d.Client = influxdb2.NewClient(d.URL,d.Token)
}

@ -2,7 +2,7 @@ package server
import (
"sync"
//"fmt"
"fmt"
"net"
"context"
"errors"
@ -19,21 +19,34 @@ import (
// config interface
func LoadConfig() Config {
return config.LoadStruct()
// returns a ServerConfig that we can query and update
return config.LoadConfig()
}
type Config interface {
type Config interface { // PROPOSED RENAMING: ServerConfig to avoid confusion w/ reactor variant
// getters
GetURL() (string, error)
GetOrg() (string, error)
GetServerToken() (string,error)
GetReactorToken(uint32) (string, error)
GetPort(string) (int, error)
GetServerToken() (string, error)
GetReactorClient(uint32) (string, string, error) // ret (bucket, token, err)
// setters
// save on write
UpdateURL(string) error
UpdateOrg(string) error
UpdateServerToken(string) error
UpdateReactorClient(uint32, string, string) error // call (id, bucket, token)
}
// db client interface
type DBClient interface{
GetBucket(uint32) (string, error) // will create if it doesn't exist
GetToken(uint32) (string, error) // will create if it doesn't exist
GetURL() (string, error) // returns the ipaddres + dbport
type DB interface{
// getters (all create if doesnt exist)
GetToken() (string, error) // returns admin token (Creates if it doesnt exist)
GetReactorClient(uint32) (string, string, error) // returns (bucket, token, err)
// delete
DeleteReactorClient(uint32) error // removes client token but maintains bucket
PurgeReactorClientData(uint32) error // perm deletes all assocaited reactor data (token, bucket etc)
}
/*func NewDBClient() DBClient {
@ -43,9 +56,10 @@ type DBClient interface{
type CentralCoordinator struct {
ClientConnections *ClientPacket
CLisPort int
//CLisPort int
*SubCoordinators
*SystemViewer
DB
Config
Err chan error
}
@ -55,9 +69,8 @@ type SubCoordinators struct {
sync.Mutex
}
func NewCentralCoordinator(port int, ch chan error) *CentralCoordinator {
c := &CentralCoordinator{CLisPort: port, Err: ch}
c.Config = LoadConfig()
func NewCentralCoordinator(ch chan error) *CentralCoordinator {
c := &CentralCoordinator{Err: ch}
c.SystemViewer = NewSystemViewer()
go c.SystemViewer.Start()
s := make(map[string]*SubCoordinator)
@ -68,22 +81,56 @@ func NewCentralCoordinator(port int, ch chan error) *CentralCoordinator {
func (c *CentralCoordinator) Start() {
// starts up associated funcs
// begin with config and client
c.LoadCfg()
clientChan := make(chan *ClientPacket)
l := NewListener(c.CLisPort,clientChan,c.Err)
l := NewListener(clientChan,c.Err)
go l.Start()
go c.ClientListener(clientChan)
}
func (c *CentralCoordinator) LoadCfg() {
// loads db client info and updates if anything is missing
c.Config = LoadConfig()
_, err := c.Config.GetURL()
if err != nil {
logging.Debug(logging.DError,"CCO 01 Err: %v", err)
c.Err <-err
}
token, err := c.Config.GetServerToken()
if err != nil {
logging.Debug(logging.DError,"CCO 01 Err: %v", err)
c.Err <-err
} else if token == "" {
if token, err = c.DB.GetToken(); err != nil {
logging.Debug(logging.DError,"CCO 01 Err: %v", err)
c.Err <-err
}
c.Config.UpdateServerToken(token)
}
org, err := c.Config.GetOrg()
if err != nil {
logging.Debug(logging.DError,"CCO 01 Err: %v", err)
c.Err <-err
} else if token == "" {
if token, err = c.DB.GetToken(); err != nil {
logging.Debug(logging.DError,"CCO 01 Err: %v", err)
c.Err <-err
}
c.Config.UpdateOrg(org)
}
}
func (c *CentralCoordinator) ClientListener(ch chan *ClientPacket) {
for client := range ch {
// basically loops until channel is closed
port := c.ClientHandler(client.Client)
client.Port <-port
cr := c.ClientHandler(client.Client)
client.Response <-cr
}
}
func (c *CentralCoordinator) ClientHandler(cl *Client) int {
func (c *CentralCoordinator) ClientHandler(cl *Client) *ClientResponse {
c.SubCoordinators.Lock()
defer c.SubCoordinators.Unlock()
subcoord, ok := c.SubCoordinators.Directory[cl.Type]
@ -94,7 +141,25 @@ func (c *CentralCoordinator) ClientHandler(cl *Client) int {
c.SubCoordinators.Directory[cl.Type] = subcoord
}
go subcoord.ClientHandler(cl)
return subcoord.Port
// setting up client response
var url, org, token, bucket string
var port int
var err error
if url, err = c.Config.GetURL(); err != nil {
logging.Debug(logging.DError,"Error: %v", err)
c.Err <-err
} else if org, err = c.Config.GetOrg(); err != nil {
logging.Debug(logging.DError,"Error: %v", err)
c.Err <-err
} else if bucket, token, err = c.Config.GetReactorClient(cl.Id); err != nil {
logging.Debug(logging.DError,"Error: %v", err)
c.Err <-err
} else if port, err = c.Config.GetPort(cl.Type); err != nil {
logging.Debug(logging.DError,"Error: %v", err)
c.Err <-err
}
cr := &ClientResponse{URL:url, Org:org, Token:token, Bucket:bucket, Port:port}
return cr
}
type ManagerInterface interface {
@ -128,21 +193,20 @@ type Managers struct {
func NewSubCoordinator(clientType string, sys *SystemViewer, err chan error) *SubCoordinator {
c := &SubCoordinator{Err:err}
c.SystemViewer = sys
man, port, errs := NewCoordinatorType(clientType, err)
man, errs := NewCoordinatorType(clientType, err)
if errs != nil {
err <-errs
}
c.Port = port
c.ManagerInterface = man
go man.Start()
go man.Register()
return c
}
func (c *SubCoordinator) ClientHandler(cl *Client) int {
func (c *SubCoordinator) ClientHandler(cl *Client) {
// (creates and) notifies manager of client connection
go c.UpdateManager(cl)
return c.Port
c.UpdateManager(cl)
}
func (c *SubCoordinator) UpdateManager(cl *Client) {
@ -174,21 +238,21 @@ func (m *Managers) GetManager(id uint32) (GeneralManager, bool) {
return man.(GeneralManager), exists
}
func NewCoordinatorType(clientType string, err chan error) (ManagerInterface, int, error) {
func NewCoordinatorType(clientType string, err chan error) (ManagerInterface, error) {
m := make(map[uint32]interface{})
if clientType == "reactor" {
c := &reactorCoordinator{}
//m := make(map[uint32]*ReactorManager)
c.Managers = &Managers{Directory:m}
return c, 2023, nil
return c, nil
} else if clientType == "tui" {
c := &tuiCoordinator{}
//m := make(map[uint32]*TUIManager)
c.Managers = &Managers{Directory:m}
return c, 2024, nil
return c, nil
}
return &reactorCoordinator{}, 0, errors.New("Unrecognized client type")
return &reactorCoordinator{}, errors.New("Unrecognized client type")
}
// creating sub coordinators for associated gRPC handlers
@ -208,9 +272,14 @@ func (r *reactorCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan
}
func (r *reactorCoordinator) Register() {
lis, err := net.Listen("tcp", ":2023")
conf := LoadConfig()
port, err := conf.GetPort("reactor")
if err != nil {
// rip
panic(err)
}
lis, err := net.Listen("tcp", fmt.Sprintf(":%v",port))
if err != nil {
panic(err)
}
grpcServer := grpc.NewServer()
pb.RegisterMonitoringServer(grpcServer,r)
@ -246,7 +315,12 @@ func (t *tuiCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan erro
}
func (t *tuiCoordinator) Register() {
lis, err := net.Listen("tcp", ":2024")
conf := LoadConfig()
port, err := conf.GetPort("tui")
if err != nil {
panic(err)
}
lis, err := net.Listen("tcp", fmt.Sprintf(":%v",port))
if err != nil {
// rip
}

@ -16,7 +16,6 @@ Listens on a supplied port and sends incoming clients over a supplied channel
*/
type Listener struct { // exporting for easy use in the short term
Port int
ClientConnections chan *ClientPacket
Err chan error
pb.UnimplementedHandshakeServer
@ -24,7 +23,7 @@ type Listener struct { // exporting for easy use in the short term
type ClientPacket struct {
*Client
Port chan int
Response chan *ClientResponse
}
type Client struct {
@ -36,9 +35,16 @@ type Client struct {
Type string
}
func NewListener(port int, cch chan *ClientPacket, ech chan error) *Listener {
type ClientResponse struct {
Port int
URL string
Org string
Token string
Bucket string
}
func NewListener(cch chan *ClientPacket, ech chan error) *Listener {
l := &Listener{Err:ech,ClientConnections:cch}
l.Port = port
return l
}
@ -47,19 +53,24 @@ func (l *Listener) Start() {
if err := l.Register(); err != nil {
l.Err <- err
}
logging.Debug(logging.DStart,"Started client listener on port %v\n",l.Port)
logging.Debug(logging.DStart,"LIS 01 Started client listener")
}
func (l *Listener) Register() error {
// creates a gRPC service and binds it to our handler
lis, err := net.Listen("tcp", fmt.Sprintf(":%v",l.Port)) // either binding to supplied port or binding to docker default
conf := LoadConfig()
port, err := conf.GetPort("lis")
if err != nil {
return err
}
lis, err := net.Listen("tcp", fmt.Sprintf(":%v",port)) // either binding to supplied port or binding to docker default
if err != nil {
return err
}
grpcServer := grpc.NewServer()
pb.RegisterHandshakeServer(grpcServer, l)
go grpcServer.Serve(lis)
logging.Debug(logging.DStart, "LIS Registered on port %v", l.Port)
logging.Debug(logging.DStart, "LIS 01 Registered on port %v", port)
return nil
}
@ -67,21 +78,12 @@ func (l *Listener) ClientDiscoveryHandler(ctx context.Context, ping *pb.ClientRe
// incoming reactor ping need to spawn coord
c := &Client{Id:ping.GetClientId(),Type:ping.GetClientType()}
logging.Debug(logging.DClient, "LIS %v %v has connected\n",c.Type,c.Id)
ch := make(chan int)
p := &ClientPacket{Port:ch}
ch := make(chan *ClientResponse)
p := &ClientPacket{Response:ch}
p.Client = c
l.ClientConnections <-p
port := <-ch
/*
coord, ok := l.Coordinators[c.Type]
if !ok {
logging.Debug(logging.DSpawn,"CCO 01 Created Coordinator")
coord = NewCoordinator(c.Type, l.Sys, l.Err)
l.Coordinators[c.Type] = coord
go coord.Start()
}
port := coord.ClientHandler(c)
*/
resp := <-ch
// return the port for the incoming requests
return &pb.ClientResponse{ClientId:c.Id,ServerPort:uint32(port)}, nil
db := &pb.Database{URL:resp.URL,ORG:resp.Org,Token:resp.Token,Bucket:resp.Bucket}
return &pb.ClientResponse{ClientId:c.Id,ServerPort:uint32(resp.Port),Database:db}, nil
}

25
notes

@ -897,3 +897,28 @@ with an embedded listener
and database and shit
so
SERVER will call NewServer which will take care of subsequents
# TODO 8/5
Config storing time
going to probably have to add admin database client(aka server client which makes 0 sense)
can abstract all operations through interface and plugable package
I just reliazed I coupled my mechanism with influxs token thing because it wokrs well but I am going to have to completely rebuild that if its properietary or we transition to a new DB
- hopefully null point due to the nature of OSS and time series
CONFIG (and by extension DB)
config.UpdateReactor(id, key, value)
config.UpdateSelf(key, value)
should just be a basic way to update a given entry for an reactor
seperating server and reactor methods should lead to less finicky behaviour
should also probably wrap these in a seperate struct
- methods?
- who can call?
- who stores ptr?
- do we even need a ptr? can configs be stored and loaded statically or is that a bitch on the FS
does it make more sensor to load different configs for each entity or justhave one monolithic config (probably load for each one and then let it update itself)
going to have the init script set up the

Loading…
Cancel
Save