documented manager

main
KeeganForelight 2 years ago
parent 8936570c59
commit 4914b93408

@ -1,3 +1,4 @@
// Package manager provides basic manager functions to embed into higher level managers
package manager
import (
@ -9,23 +10,6 @@ import (
"time"
)
// max allowable connection attempts is 255
const MaxConnAttempts = 0xFF
// basic manager for starting/stopping checks plus built in heartbeat for downtime detection
type Connection struct {
Attempts float64 // float for pow
MaxAttempts int // max allowed
sync.Mutex
}
type Manager struct {
*Connection // embedded for timeout stuff
Active int32 // atomic checks
}
// errors
var (
ErrInvalidMaxConn = errors.New("invalid max connection attempts")
ErrManagerInactive = errors.New("manager inactive")
@ -33,22 +17,48 @@ var (
ErrMaxAttemptsExceeded = errors.New("max connection attempts exceeded")
)
// ManagerStatus is used as an enum for the current status
// Could be expanded to include others such as killed, sleeping, etc.
type ManagerStatus int
const (
Inactive ManagerStatus = 0
Active ManagerStatus = 1
)
// MaxConnAttempts is the maximum allowable connection attempts
// Limited to 255 to prevent excessive timeout scaling
const MaxConnAttempts = 0xFF
// Manager is a general purpose structure to implement basic capabilities
// Tracks its state via Active, modifying it on starts and stops
// Embeds a connection to be used in generating timeouts
type Manager struct {
*connection // embedded for timeout stuff
active int32 // atomic checks
}
// New creates a new manager with the maxConn maximum attempts
// Throws ErrInvalidMaxConn if maxConn is not in [0, MaxConnAttempts]
func New(maxConn int) (*Manager, error) {
if maxConn < 0 || maxConn > MaxConnAttempts {
return &Manager{}, ErrInvalidMaxConn
}
c := &Connection{MaxAttempts: maxConn}
c := &connection{MaxAttempts: maxConn}
m := &Manager{
Connection: c,
connection: c,
}
return m, nil
}
// Start attempts to start the manager
// Throws ErrManagerActive error if the manager is already active
func (m *Manager) Start() error {
if atomic.CompareAndSwapInt32(&m.Active, 0, 1) {
if atomic.CompareAndSwapInt32(&m.active, 0, 1) {
m.ResetConnections()
return nil
}
@ -56,57 +66,72 @@ func (m *Manager) Start() error {
return ErrManagerActive
}
// Stop attempts to stop the manager
// Throws ErrManagerInactive error if the manager is already inactive
func (m *Manager) Stop() error {
if atomic.CompareAndSwapInt32(&m.Active, 1, 0) {
if atomic.CompareAndSwapInt32(&m.active, 1, 0) {
return nil
}
return ErrManagerInactive
}
func (m *Manager) IsActive() int {
return int(atomic.LoadInt32(&m.Active))
// IsActive returns the current ManagerStatus
func (m *Manager) IsActive() ManagerStatus {
return ManagerStatus(atomic.LoadInt32(&m.active))
}
// Heartbeat tracker
// HeartBeat will send an empty struct over ping every hb (scale)
// The pings are sent ever (hb + rand(interval)) * scale
// Where scale is typically time.Millisecond, time.Second etc.
// Will close the channel on exit to prevent leaks
func (m *Manager) HeartBeat(
ping chan struct{},
hb, interval int,
scale time.Duration) {
func (m *Manager) HeartBeat(ping chan struct{}, hb, interval int, scale time.Duration) {
// pings channel every (HB + randInterval) * time.Duration
// can be used anywhere a heartbeat is needed
// closes channel on exit
for m.IsActive() == Active {
ping <- struct{}{}
if interval > 0 {
rand.Seed(time.Now().UnixNano())
}
for atomic.LoadInt32(&m.Active) > 0 {
// atomoic read may cause memory leak, can revisit
ping <- struct{}{} // no mem
sleep := time.Duration(hb-interval) * scale
if interval > 0 {
sleep += time.Duration(rand.Intn(2*interval)) * scale
}
time.Sleep(sleep)
}
// exited, close chan
close(ping)
}
// connection timeout generator
// connection keeps track of maximum and current number of connection attempts
// Concurrency safe as it is protected by a mutex
type connection struct {
Attempts float64
MaxAttempts int
sync.Mutex
}
// Timeout returns an exponentially decaying timeout based on attempts
// Returns timeout of type time.Duration in milliseconds
// Returns ErrMaxAttemptsExceeded if too many attempts are tried
func (c *connection) Timeout() (time.Duration, error) {
func (c *Connection) Timeout() (time.Duration, error) {
// exponential backoff
c.Lock()
defer c.Unlock()
if int(c.Attempts) < c.MaxAttempts {
to := time.Duration(50*math.Pow(2, c.Attempts)) * time.Millisecond
c.Attempts += 1
return to, nil
}
return 0, ErrMaxAttemptsExceeded
}
func (c *Connection) ResetConnections() {
// ResetConnections sets the current connection attempts back to 0
func (c *connection) ResetConnections() {
c.Lock()
defer c.Unlock()
c.Attempts = 0

@ -28,7 +28,7 @@ func newManager(conn int, want error, t *testing.T) *Manager {
)
}
assert.Equal(manager.IsActive(), 0, "manager should start inactive")
assert.Equal(manager.IsActive(), Inactive, "manager should start inactive")
return manager
}
@ -72,10 +72,10 @@ func TestManagerLifeCycle(t *testing.T) {
for i := 0; i < cycles; i++ {
assert.NoError(manager.Start(), "starting manager failed")
assert.Equal(manager.IsActive(), 1, "manager is inactive after start")
assert.Equal(manager.IsActive(), Active, "manager inactive after start")
assert.NoError(manager.Stop(), "stopping manager failed")
assert.Equal(manager.IsActive(), 0, "manager is active after stop")
assert.Equal(manager.IsActive(), Inactive, "manager active after stop")
}
}
@ -120,7 +120,7 @@ func TestManagerTimeout(t *testing.T) {
manager = newManager(conn, nil, t)
assert.NoError(manager.Start(), "starting manager failed")
assert.Equal(manager.IsActive(), 1, "manager is inactive")
assert.Equal(manager.IsActive(), Active, "manager is inactive")
prevTimeout, err := manager.Timeout()
@ -155,7 +155,7 @@ func TestManagerHB(t *testing.T) {
manager = newManager(conn, nil, t)
assert.NoError(manager.Start(), "starting manager failed")
assert.Equal(manager.IsActive(), 1, "manager is inactive")
assert.Equal(manager.IsActive(), Active, "manager is inactive")
ch := make(chan struct{})
@ -166,7 +166,7 @@ func TestManagerHB(t *testing.T) {
assert.NoError(manager.Stop(), "stopping manager failed")
}
assert.Equal(manager.IsActive(), 0, "manager is active")
assert.Equal(manager.IsActive(), Inactive, "manager is active")
}
// TestManagerHBTiming tests the heartbeat channel timing is correct
@ -179,7 +179,7 @@ func TestManagerHBTiming(t *testing.T) {
manager = newManager(conn, nil, t)
assert.NoError(manager.Start(), "starting manager failed")
assert.Equal(manager.IsActive(), 1, "manager is inactive")
assert.Equal(manager.IsActive(), Active, "manager is inactive")
ch := make(chan struct{})
hb := 100
@ -201,6 +201,6 @@ func TestManagerHBTiming(t *testing.T) {
}
end := time.Now()
assert.Equal(manager.IsActive(), 0, "manager is active")
assert.Equal(manager.IsActive(), Inactive, "manager is active")
assert.WithinDuration(start, end, expected, "inaccurate heartbeat")
}

Loading…
Cancel
Save