You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
139 lines
3.3 KiB
Go
139 lines
3.3 KiB
Go
// Package manager provides basic manager functions to embed into higher level managers.
|
|
package manager
|
|
|
|
import (
|
|
"errors"
|
|
"math"
|
|
"math/rand"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// Sentinel errors reported by the manager and its connection tracker.
var (
	// ErrInvalidMaxConn is returned by New when the requested maximum
	// number of connection attempts is outside [0, MaxConnAttempts].
	ErrInvalidMaxConn = errors.New("invalid max connection attempts")

	// ErrManagerInactive is returned by Stop when the manager is not active.
	ErrManagerInactive = errors.New("manager inactive")

	// ErrManagerActive is returned by Start when the manager is already active.
	ErrManagerActive = errors.New("manager active")

	// ErrMaxAttemptsExceeded is returned by Timeout once the configured
	// number of connection attempts has been used up.
	ErrMaxAttemptsExceeded = errors.New("max connection attempts exceeded")
)
|
|
|
|
// Status enumerates the states a manager can report.
// Only two states exist today; further states (killed, sleeping, ...)
// could be appended later.
type Status int

const (
	// Inactive is the zero value: the manager is not running.
	Inactive Status = iota
	// Active means the manager has been started and not yet stopped.
	Active
)
|
|
|
|
// MaxConnAttempts is the upper bound New accepts for the number of
// connection attempts. It is capped at 255 to keep timeout scaling in check.
const MaxConnAttempts = 255
|
|
|
|
// Manager is a general purpose structure to implement basic capabilities.
|
|
// Stores state in active variable, modified through atomic swaps.
|
|
// Embeds a connection to be used in generating timeouts.
|
|
type Manager struct {
|
|
*connection
|
|
active int32
|
|
}
|
|
|
|
// New creates a new manager with the maxConn maximum attempts.
|
|
// Throws ErrInvalidMaxConn if maxConn is not in [0, MaxConnAttempts].
|
|
func New(maxConn int) (*Manager, error) {
|
|
|
|
if maxConn < 0 || maxConn > MaxConnAttempts {
|
|
return &Manager{}, ErrInvalidMaxConn
|
|
}
|
|
|
|
c := &connection{MaxAttempts: maxConn}
|
|
|
|
m := &Manager{
|
|
connection: c,
|
|
}
|
|
|
|
return m, nil
|
|
}
|
|
|
|
// Start attempts to start the manager.
|
|
// Throws ErrManagerActive error if the manager is already active.
|
|
func (m *Manager) Start() error {
|
|
if atomic.CompareAndSwapInt32(&m.active, 0, 1) {
|
|
m.ResetConnections()
|
|
return nil
|
|
}
|
|
|
|
return ErrManagerActive
|
|
}
|
|
|
|
// Stop attempts to stop the manager.
|
|
// Throws ErrManagerInactive error if the manager is already inactive.
|
|
func (m *Manager) Stop() error {
|
|
if atomic.CompareAndSwapInt32(&m.active, 1, 0) {
|
|
return nil
|
|
}
|
|
|
|
return ErrManagerInactive
|
|
}
|
|
|
|
// IsActive returns the current ManagerStatus.
|
|
func (m *Manager) IsActive() Status {
|
|
return Status(atomic.LoadInt32(&m.active))
|
|
}
|
|
|
|
// HeartBeat will send an empty struct over ping every hb (scale).
|
|
// The pings are sent ever (hb + rand(interval)) * scale.
|
|
// Where scale is typically time.Millisecond, time.Second etc.
|
|
// Will close the channel on exit to prevent leaks.
|
|
func (m *Manager) HeartBeat(
|
|
ping chan struct{},
|
|
hb, interval int,
|
|
scale time.Duration) {
|
|
|
|
for m.IsActive() == Active {
|
|
ping <- struct{}{}
|
|
|
|
sleep := time.Duration(hb-interval) * scale
|
|
|
|
if interval > 0 {
|
|
sleep += time.Duration(rand.Intn(2*interval)) * scale
|
|
}
|
|
|
|
time.Sleep(sleep)
|
|
}
|
|
|
|
close(ping)
|
|
}
|
|
|
|
// connection tracks how many connection attempts have been made and how
// many are allowed. Its methods take the embedded mutex before touching
// the counters, which makes them safe for concurrent use.
type connection struct {
	// Attempts is the number of attempts handed out since the last reset.
	// It is a float64 so it can be fed to math.Pow directly.
	Attempts float64

	// MaxAttempts is the number of attempts allowed before Timeout fails.
	MaxAttempts int

	// Mutex guards Attempts.
	sync.Mutex
}
|
|
|
|
// Timeout returns an exponentially decaying timeout based on attempts.
|
|
// Returns timeout of type time.Duration in milliseconds.
|
|
// Returns ErrMaxAttemptsExceeded if too many attempts are tried.
|
|
func (c *connection) Timeout() (time.Duration, error) {
|
|
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
|
|
if int(c.Attempts) < c.MaxAttempts {
|
|
to := time.Duration(50*math.Pow(2, c.Attempts)) * time.Millisecond
|
|
c.Attempts += 1
|
|
return to, nil
|
|
}
|
|
|
|
return 0, ErrMaxAttemptsExceeded
|
|
}
|
|
|
|
// ResetConnections sets the current connection attempts back to 0.
|
|
func (c *connection) ResetConnections() {
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
c.Attempts = 0
|
|
}
|