|
|
|
package manager
|
|
|
|
|
|
|
|
import (
|
|
|
|
"errors"
|
|
|
|
"math"
|
|
|
|
"math/rand"
|
|
|
|
"sync"
|
|
|
|
"sync/atomic"
|
|
|
|
"time"
|
|
|
|
)
|
|
|
|
|
|
|
|
// MaxConnAttempts is the largest number of connection attempts that may be
// configured (255).
const MaxConnAttempts = 255
|
|
|
|
|
|
|
|
// A basic manager for starting and stopping checks, with a built-in heartbeat for downtime detection.
|
|
|
|
|
|
|
|
// Connection records how many connection attempts have been made and the
// ceiling on them. The embedded mutex is held by the methods that read or
// write Attempts.
type Connection struct {
	// Attempts is the number of attempts made so far. It is a float64 so it
	// can be fed directly to math.Pow when computing the backoff.
	Attempts float64
	// MaxAttempts is the maximum number of attempts allowed.
	MaxAttempts int
	sync.Mutex
}
|
|
|
|
|
|
|
|
type Manager struct {
|
|
|
|
*Connection // embedded for timeout stuff
|
|
|
|
Active int32 // atomic checks
|
|
|
|
}
|
|
|
|
|
|
|
|
// Sentinel errors returned by this package.
var (
	// ErrInvalidMaxConn reports a maximum attempt count outside the allowed range.
	ErrInvalidMaxConn = errors.New("invalid max connection attempts")
	// ErrManagerInactive reports an attempt to stop a manager that is not running.
	ErrManagerInactive = errors.New("manager inactive")
	// ErrManagerActive reports an attempt to start a manager that is already running.
	ErrManagerActive = errors.New("manager active")
	// ErrMaxAttemptsExceeded reports that no further connection attempts are allowed.
	ErrMaxAttemptsExceeded = errors.New("max connection attempts exceeded")
)
|
|
|
|
|
|
|
|
func New(maxConn int) (*Manager, error) {
|
|
|
|
|
|
|
|
if maxConn < 0 || maxConn > MaxConnAttempts {
|
|
|
|
return &Manager{}, ErrInvalidMaxConn
|
|
|
|
}
|
|
|
|
|
|
|
|
c := &Connection{MaxAttempts: maxConn}
|
|
|
|
m := &Manager{
|
|
|
|
Connection: c,
|
|
|
|
}
|
|
|
|
|
|
|
|
return m, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Manager) Start() error {
|
|
|
|
if atomic.CompareAndSwapInt32(&m.Active, 0, 1) {
|
|
|
|
m.ResetConnections()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return ErrManagerActive
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Manager) Stop() error {
|
|
|
|
if atomic.CompareAndSwapInt32(&m.Active, 1, 0) {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return ErrManagerInactive
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Manager) IsActive() int {
|
|
|
|
return int(atomic.LoadInt32(&m.Active))
|
|
|
|
}
|
|
|
|
|
|
|
|
// Heartbeat tracker
|
|
|
|
|
|
|
|
func (m *Manager) HeartBeat(ping chan struct{}, hb, interval int, scale time.Duration) {
|
|
|
|
// pings channel every (HB + randInterval) * time.Duration
|
|
|
|
// can be used anywhere a heartbeat is needed
|
|
|
|
// closes channel on exit
|
|
|
|
|
|
|
|
if interval > 0 {
|
|
|
|
rand.Seed(time.Now().UnixNano())
|
|
|
|
}
|
|
|
|
|
|
|
|
for atomic.LoadInt32(&m.Active) > 0 {
|
|
|
|
// atomoic read may cause memory leak, can revisit
|
|
|
|
ping <- struct{}{} // no mem
|
|
|
|
sleep := time.Duration(hb-interval) * scale
|
|
|
|
if interval > 0 {
|
|
|
|
sleep += time.Duration(rand.Intn(2*interval)) * scale
|
|
|
|
}
|
|
|
|
time.Sleep(sleep)
|
|
|
|
}
|
|
|
|
// exited, close chan
|
|
|
|
close(ping)
|
|
|
|
}
|
|
|
|
|
|
|
|
// connection timeout generator
|
|
|
|
|
|
|
|
func (c *Connection) Timeout() (time.Duration, error) {
|
|
|
|
// exponential backoff
|
|
|
|
c.Lock()
|
|
|
|
defer c.Unlock()
|
|
|
|
if int(c.Attempts) < c.MaxAttempts {
|
|
|
|
to := time.Duration(50*math.Pow(2, c.Attempts)) * time.Millisecond
|
|
|
|
c.Attempts += 1
|
|
|
|
return to, nil
|
|
|
|
}
|
|
|
|
return 0, ErrMaxAttemptsExceeded
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Connection) ResetConnections() {
|
|
|
|
c.Lock()
|
|
|
|
defer c.Unlock()
|
|
|
|
c.Attempts = 0
|
|
|
|
}
|