@ -7,58 +7,38 @@ import (
"FRMS/internal/pkg/logging"
"context"
"errors"
"fmt"
"sync"
"github.com/spf13/viper"
)
// this package creates the central coordiantor and sub coordiantors for clients
// interfaces
// config interface
/ *
func LoadConfig ( ) Config {
// returns a ServerConfig that we can query and update
//return config.LoadConfig()
}
* /
type Config interface { // PROPOSED RENAMING: ServerConfig to avoid confusion w/ reactor variant
// getters
GetURL ( ) ( string , error )
GetOrg ( ) ( string , error )
GetPort ( string ) ( int , error )
GetServerToken ( ) ( string , error )
GetReactorClient ( uint32 ) ( string , string , error ) // ret (bucket, token, err)
// setters
// save on write
//UpdateURL(string) error
//UpdateOrg(string) error
//UpdateServerToken(string) error
UpdateReactorClient ( uint32 , string , string ) error // call (id, bucket, token)
}
// db client interface
type DB interface {
// getters (all create if doesnt exist)
//GetToken() (string, error) // returns admin token (Creates if it doesnt exist)
GetReactorClient ( string) ( string , string , error ) // returns ( bucket, token, err)
GetReactorClient ( int ) ( string , string , string , string , error ) // returns (url, org, bucket, token, err)
// delete
// DeleteReactorClient(string) error // removes client token but maintains bucket
// PurgeReactorClientData(string) error // perm deletes all assocaited reactor data (token, bucket etc)
}
func NewDBAdmin ( url, org , token string ) DB {
return influxdb . NewDBAdmin ( token, org , url )
func NewDBAdmin ( config * viper . Viper ) ( DB , error ) {
return influxdb . NewDBAdmin ( config )
}
type CentralCoordinator struct {
ClientConnections * ClientPacket
//CLisPort int
* SubCoordinators
* SystemViewer
DB
Config
Err chan error
Config * viper . Viper
// from config
Ports map [ string ] int ` mapstructure:"ports" `
Err chan error
}
type SubCoordinators struct {
@ -66,8 +46,13 @@ type SubCoordinators struct {
sync . Mutex
}
func NewCentralCoordinator ( ch chan error ) * CentralCoordinator {
c := & CentralCoordinator { Err : ch }
func NewCentralCoordinator ( config * viper . Viper , ch chan error ) * CentralCoordinator {
c := & CentralCoordinator { Err : ch , Config : config }
if err := config . UnmarshalKey ( "server" , c ) ; err != nil {
// report error
ch <- err
}
fmt . Printf ( "%+v\n" , c )
c . SystemViewer = NewSystemViewer ( )
go c . SystemViewer . Start ( )
s := make ( map [ string ] * SubCoordinator )
@ -78,36 +63,11 @@ func NewCentralCoordinator(ch chan error) *CentralCoordinator {
func ( c * CentralCoordinator ) Start ( ) {
// starts up associated funcs
// begin with config and client
//c.LoadCfg()
clientChan := make ( chan * ClientPacket )
l := NewListener ( clientChan , c . Err )
go l . Start ( )
go c . ClientListener ( clientChan )
}
/ *
func ( c * CentralCoordinator ) LoadCfg ( ) {
// loads db client info and updates if anything is missing
c . Config = LoadConfig ( )
_ , err := c . Config . GetURL ( )
if err != nil {
logging . Debug ( logging . DError , "CCO 01 Err: %v" , err )
c . Err <- err
}
token , err := c . Config . GetServerToken ( )
if err != nil {
logging . Debug ( logging . DError , "CCO 01 Err: %v" , err )
c . Err <- err
}
org , err := c . Config . GetOrg ( )
if err != nil {
logging . Debug ( logging . DError , "CCO 01 Err: %v" , err )
c . Err <- err
}
}
* /
func ( c * CentralCoordinator ) ClientListener ( ch chan * ClientPacket ) {
for client := range ch {
@ -129,23 +89,11 @@ func (c *CentralCoordinator) ClientHandler(cl *Client) *ClientResponse {
}
go subcoord . ClientHandler ( cl )
// setting up client response
var url , org , token , bucket string
var port int
var err error
if url , err = c . Config . GetURL ( ) ; err != nil {
logging . Debug ( logging . DError , "Error: %v" , err )
c . Err <- err
} else if org , err = c . Config . GetOrg ( ) ; err != nil {
logging . Debug ( logging . DError , "Error: %v" , err )
c . Err <- err
} else if bucket , token , err = c . Config . GetReactorClient ( cl . Id ) ; err != nil {
logging . Debug ( logging . DError , "Error: %v" , err )
c . Err <- err
} else if port , err = c . Config . GetPort ( cl . Type ) ; err != nil {
logging . Debug ( logging . DError , "Error: %v" , err )
url , org , token , bucket , err := c . DB . GetReactorClient ( int ( cl . Id ) )
if err != nil {
c . Err <- err
}
cr := & ClientResponse { URL : url , Org : org , Token : token , Bucket : bucket , Port : port }
cr := & ClientResponse { URL : url , Org : org , Token : token , Bucket : bucket , Port : c . Ports [ cl . Type ] }
return cr
}