diff --git a/internal/pkg/manager/manager.go b/internal/pkg/manager/manager.go index fe316c9..314f7f7 100644 --- a/internal/pkg/manager/manager.go +++ b/internal/pkg/manager/manager.go @@ -1,3 +1,4 @@ +// Package manager provides basic manager functions to embed into higher level managers package manager import ( @@ -9,23 +10,6 @@ import ( "time" ) -// max allowable connection attempts is 255 -const MaxConnAttempts = 0xFF - -// basic manager for starting/stopping checks plus built in heartbeat for downtime detection - -type Connection struct { - Attempts float64 // float for pow - MaxAttempts int // max allowed - sync.Mutex -} - -type Manager struct { - *Connection // embedded for timeout stuff - Active int32 // atomic checks -} - -// errors var ( ErrInvalidMaxConn = errors.New("invalid max connection attempts") ErrManagerInactive = errors.New("manager inactive") @@ -33,22 +17,48 @@ var ( ErrMaxAttemptsExceeded = errors.New("max connection attempts exceeded") ) +// ManagerStatus is used as an enum for the current status +// Could be expanded to include others such as killed, sleeping, etc. +type ManagerStatus int + +const ( + Inactive ManagerStatus = 0 + Active ManagerStatus = 1 +) + +// MaxConnAttempts is the maximum allowable connection attempts +// Limited to 255 to prevent excessive timeout scaling +const MaxConnAttempts = 0xFF + +// Manager is a general purpose structure to implement basic capabilities +// Tracks its state via Active, modifying it on starts and stops +// Embeds a connection to be used in generating timeouts +type Manager struct { + *connection // embedded for timeout stuff + active int32 // atomic checks +} + +// New creates a new manager with the maxConn maximum attempts +// Throws ErrInvalidMaxConn if maxConn is not in [0, MaxConnAttempts] func New(maxConn int) (*Manager, error) { if maxConn < 0 || maxConn > MaxConnAttempts { return &Manager{}, ErrInvalidMaxConn } - c := &Connection{MaxAttempts: maxConn} + c := &connection{MaxAttempts: maxConn} + m := &Manager{ - Connection: c, + connection: c, } return m, nil } +// Start attempts to start the manager +// Throws ErrManagerActive error if the manager is already active func (m *Manager) Start() error { - if atomic.CompareAndSwapInt32(&m.Active, 0, 1) { + if atomic.CompareAndSwapInt32(&m.active, 0, 1) { m.ResetConnections() return nil } @@ -56,57 +66,72 @@ func (m *Manager) Start() error { return ErrManagerActive } +// Stop attempts to stop the manager +// Throws ErrManagerInactive error if the manager is already inactive func (m *Manager) Stop() error { - if atomic.CompareAndSwapInt32(&m.Active, 1, 0) { + if atomic.CompareAndSwapInt32(&m.active, 1, 0) { return nil } return ErrManagerInactive } -func (m *Manager) IsActive() int { - return int(atomic.LoadInt32(&m.Active)) +// IsActive returns the current ManagerStatus +func (m *Manager) IsActive() ManagerStatus { + return ManagerStatus(atomic.LoadInt32(&m.active)) } -// Heartbeat tracker +// HeartBeat will send an empty struct over ping every hb (scale) +// The pings are sent ever (hb + rand(interval)) * scale +// Where scale is typically time.Millisecond, time.Second etc. +// Will close the channel on exit to prevent leaks +func (m *Manager) HeartBeat( + ping chan struct{}, + hb, interval int, + scale time.Duration) { -func (m *Manager) HeartBeat(ping chan struct{}, hb, interval int, scale time.Duration) { - // pings channel every (HB + randInterval) * time.Duration - // can be used anywhere a heartbeat is needed - // closes channel on exit + for m.IsActive() == Active { + ping <- struct{}{} - if interval > 0 { - rand.Seed(time.Now().UnixNano()) - } - - for atomic.LoadInt32(&m.Active) > 0 { - // atomoic read may cause memory leak, can revisit - ping <- struct{}{} // no mem sleep := time.Duration(hb-interval) * scale + if interval > 0 { sleep += time.Duration(rand.Intn(2*interval)) * scale } + time.Sleep(sleep) } - // exited, close chan + close(ping) } -// connection timeout generator +// connection keeps track of maximum and current number of connection attempts +// Concurrency safe as it is protected by a mutex +type connection struct { + Attempts float64 + MaxAttempts int + sync.Mutex +} + +// Timeout returns an exponentially decaying timeout based on attempts +// Returns timeout of type time.Duration in milliseconds +// Returns ErrMaxAttemptsExceeded if too many attempts are tried +func (c *connection) Timeout() (time.Duration, error) { -func (c *Connection) Timeout() (time.Duration, error) { - // exponential backoff c.Lock() defer c.Unlock() + if int(c.Attempts) < c.MaxAttempts { to := time.Duration(50*math.Pow(2, c.Attempts)) * time.Millisecond c.Attempts += 1 return to, nil } + return 0, ErrMaxAttemptsExceeded } -func (c *Connection) ResetConnections() { +// ResetConnections sets the current connection attempts back to 0 +func (c *connection) ResetConnections() { c.Lock() defer c.Unlock() c.Attempts = 0 diff --git a/internal/pkg/manager/manager_test.go b/internal/pkg/manager/manager_test.go index 5a7096c..017ad24 100644 --- a/internal/pkg/manager/manager_test.go +++ b/internal/pkg/manager/manager_test.go @@ -28,7 +28,7 @@ func newManager(conn int, want error, t *testing.T) *Manager { ) } - assert.Equal(manager.IsActive(), 0, "manager should start inactive") + assert.Equal(manager.IsActive(), Inactive, "manager should start inactive") return manager } @@ -72,10 +72,10 @@ func TestManagerLifeCycle(t *testing.T) { for i := 0; i < cycles; i++ { assert.NoError(manager.Start(), "starting manager failed") - assert.Equal(manager.IsActive(), 1, "manager is inactive after start") + assert.Equal(manager.IsActive(), Active, "manager inactive after start") assert.NoError(manager.Stop(), "stopping manager failed") - assert.Equal(manager.IsActive(), 0, "manager is active after stop") + assert.Equal(manager.IsActive(), Inactive, "manager active after stop") } } @@ -120,7 +120,7 @@ func TestManagerTimeout(t *testing.T) { manager = newManager(conn, nil, t) assert.NoError(manager.Start(), "starting manager failed") - assert.Equal(manager.IsActive(), 1, "manager is inactive") + assert.Equal(manager.IsActive(), Active, "manager is inactive") prevTimeout, err := manager.Timeout() @@ -155,7 +155,7 @@ func TestManagerHB(t *testing.T) { manager = newManager(conn, nil, t) assert.NoError(manager.Start(), "starting manager failed") - assert.Equal(manager.IsActive(), 1, "manager is inactive") + assert.Equal(manager.IsActive(), Active, "manager is inactive") ch := make(chan struct{}) @@ -166,7 +166,7 @@ func TestManagerHB(t *testing.T) { assert.NoError(manager.Stop(), "stopping manager failed") } - assert.Equal(manager.IsActive(), 0, "manager is active") + assert.Equal(manager.IsActive(), Inactive, "manager is active") } // TestManagerHBTiming tests the heartbeat channel timing is correct @@ -179,7 +179,7 @@ func TestManagerHBTiming(t *testing.T) { manager = newManager(conn, nil, t) assert.NoError(manager.Start(), "starting manager failed") - assert.Equal(manager.IsActive(), 1, "manager is inactive") + assert.Equal(manager.IsActive(), Active, "manager is inactive") ch := make(chan struct{}) hb := 100 @@ -201,6 +201,6 @@ func TestManagerHBTiming(t *testing.T) { } end := time.Now() - assert.Equal(manager.IsActive(), 0, "manager is active") + assert.Equal(manager.IsActive(), Inactive, "manager is active") assert.WithinDuration(start, end, expected, "inaccurate heartbeat") }