|
|
@ -45,12 +45,17 @@ func NewCentralCoordinator(config *viper.Viper, ch chan error) *CentralCoordinat
|
|
|
|
ch <- err
|
|
|
|
ch <- err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
rc, err := NewReactorCoordinator(ch)
|
|
|
|
rc, err := NewReactorCoordinator(config, ch)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
ch <- err
|
|
|
|
ch <- err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
config.UnmarshalKey("server.ports", rc) // get reactor port
|
|
|
|
config.UnmarshalKey("server.ports", rc) // get reactor port
|
|
|
|
c := &CentralCoordinator{Err: ch, Config: config, Database: db, ReactorCoordinator: rc}
|
|
|
|
c := &CentralCoordinator{
|
|
|
|
|
|
|
|
Err: ch,
|
|
|
|
|
|
|
|
Config: config,
|
|
|
|
|
|
|
|
Database: db,
|
|
|
|
|
|
|
|
ReactorCoordinator: rc,
|
|
|
|
|
|
|
|
}
|
|
|
|
// grab config settings
|
|
|
|
// grab config settings
|
|
|
|
if err = config.UnmarshalKey("server", c); err != nil {
|
|
|
|
if err = config.UnmarshalKey("server", c); err != nil {
|
|
|
|
ch <- err
|
|
|
|
ch <- err
|
|
|
@ -114,13 +119,14 @@ type ReactorCoordinator struct {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
type ReactorManagers struct {
|
|
|
|
type ReactorManagers struct {
|
|
|
|
|
|
|
|
Config *viper.Viper
|
|
|
|
Directory map[int]*ReactorManager
|
|
|
|
Directory map[int]*ReactorManager
|
|
|
|
sync.RWMutex
|
|
|
|
sync.RWMutex
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func NewReactorCoordinator(errCh chan error) (*ReactorCoordinator, error) {
|
|
|
|
func NewReactorCoordinator(config *viper.Viper, errCh chan error) (*ReactorCoordinator, error) {
|
|
|
|
rmap := make(map[int]*ReactorManager)
|
|
|
|
rmap := make(map[int]*ReactorManager)
|
|
|
|
rm := &ReactorManagers{Directory: rmap}
|
|
|
|
rm := &ReactorManagers{Directory: rmap, Config: config}
|
|
|
|
c := &ReactorCoordinator{Err: errCh, ReactorManagers: rm}
|
|
|
|
c := &ReactorCoordinator{Err: errCh, ReactorManagers: rm}
|
|
|
|
return c, nil
|
|
|
|
return c, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -144,7 +150,7 @@ func (m *ReactorManagers) GetReactorManager(id int) (*ReactorManager, error) {
|
|
|
|
|
|
|
|
|
|
|
|
rm, exists := m.Directory[id]
|
|
|
|
rm, exists := m.Directory[id]
|
|
|
|
if !exists {
|
|
|
|
if !exists {
|
|
|
|
return &ReactorManager{ID: id}, errors.New(fmt.Sprintf("No manager for reactor %d!", id))
|
|
|
|
return &ReactorManager{}, errors.New(fmt.Sprintf("No manager for reactor %d!", id))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return rm, nil
|
|
|
|
return rm, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -160,7 +166,7 @@ func (m *ReactorManagers) UpdateReactorManager(cl *Client, errCh chan error) err
|
|
|
|
if !exists {
|
|
|
|
if !exists {
|
|
|
|
logging.Debug(logging.DClient, "RCO creating manager for reactor client %v", cl.Id)
|
|
|
|
logging.Debug(logging.DClient, "RCO creating manager for reactor client %v", cl.Id)
|
|
|
|
// creating
|
|
|
|
// creating
|
|
|
|
rm = NewReactorManager(errCh)
|
|
|
|
rm = NewReactorManager(cl, m.Config, errCh)
|
|
|
|
// starting
|
|
|
|
// starting
|
|
|
|
if err = rm.Start(); err != nil {
|
|
|
|
if err = rm.Start(); err != nil {
|
|
|
|
return err
|
|
|
|
return err
|
|
|
|