You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

139 lines
3.4 KiB
Go

// Package manager provides basic manager functions to embed into higher level managers.
package manager
import (
"errors"
"math"
"math/rand"
"sync"
"sync/atomic"
"time"
)
var (
ErrInvalidMaxConn = errors.New("invalid max connection attempts")
ErrManagerInactive = errors.New("manager inactive")
ErrManagerActive = errors.New("manager active")
ErrMaxAttemptsExceeded = errors.New("max connection attempts exceeded")
)
// ManagerStatus is used as an enum for the current status.
// Could be expanded to include others such as killed, sleeping, etc.
type ManagerStatus int
const (
Inactive ManagerStatus = 0
Active ManagerStatus = 1
)
// MaxConnAttempts is the maximum allowable connection attempts.
// Limited to 255 to prevent excessive timeout scaling.
const MaxConnAttempts = 0xFF
// Manager is a general purpose structure to implement basic capabilities.
// Stores state in active variable, modified through atomic swaps.
// Embeds a connection to be used in generating timeouts.
type Manager struct {
*connection
active int32
}
// New creates a new manager with the maxConn maximum attempts.
// Throws ErrInvalidMaxConn if maxConn is not in [0, MaxConnAttempts].
func New(maxConn int) (*Manager, error) {
if maxConn < 0 || maxConn > MaxConnAttempts {
return &Manager{}, ErrInvalidMaxConn
}
c := &connection{MaxAttempts: maxConn}
m := &Manager{
connection: c,
}
return m, nil
}
// Start attempts to start the manager.
// Throws ErrManagerActive error if the manager is already active.
func (m *Manager) Start() error {
if atomic.CompareAndSwapInt32(&m.active, 0, 1) {
m.ResetConnections()
return nil
}
return ErrManagerActive
}
// Stop attempts to stop the manager.
// Throws ErrManagerInactive error if the manager is already inactive.
func (m *Manager) Stop() error {
if atomic.CompareAndSwapInt32(&m.active, 1, 0) {
return nil
}
return ErrManagerInactive
}
// IsActive returns the current ManagerStatus.
func (m *Manager) IsActive() ManagerStatus {
return ManagerStatus(atomic.LoadInt32(&m.active))
}
// HeartBeat will send an empty struct over ping every hb (scale).
// The pings are sent ever (hb + rand(interval)) * scale.
// Where scale is typically time.Millisecond, time.Second etc.
// Will close the channel on exit to prevent leaks.
func (m *Manager) HeartBeat(
ping chan struct{},
hb, interval int,
scale time.Duration) {
for m.IsActive() == Active {
ping <- struct{}{}
sleep := time.Duration(hb-interval) * scale
if interval > 0 {
sleep += time.Duration(rand.Intn(2*interval)) * scale
}
time.Sleep(sleep)
}
close(ping)
}
// connection keeps track of maximum and current number of connection attempts.
// Concurrency safe as it is protected by a mutex.
type connection struct {
Attempts float64
MaxAttempts int
sync.Mutex
}
// Timeout returns an exponentially decaying timeout based on attempts.
// Returns timeout of type time.Duration in milliseconds.
// Returns ErrMaxAttemptsExceeded if too many attempts are tried.
func (c *connection) Timeout() (time.Duration, error) {
c.Lock()
defer c.Unlock()
if int(c.Attempts) < c.MaxAttempts {
to := time.Duration(50*math.Pow(2, c.Attempts)) * time.Millisecond
c.Attempts += 1
return to, nil
}
return 0, ErrMaxAttemptsExceeded
}
// ResetConnections sets the current connection attempts back to 0.
func (c *connection) ResetConnections() {
c.Lock()
defer c.Unlock()
c.Attempts = 0
}