You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

136 lines
3.4 KiB
Go

package server
import (
"fmt"
"time"
"math"
"sync"
"errors"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
"google.golang.org/grpc/credentials/insecure"
)
// this package will implement a boilerplate manager
// manager connects to client on start and returns the gRPC connection to make gRPC clients
type Manager struct {
*Client // gives access to c.Ip c.Id etc
Hb time.Duration
Active active
Err chan error
}
type active struct{
sync.Mutex
bool
int
}
func NewManager(c *Client, err chan error) *Manager {
hb := time.Duration(1) //hb to
m := &Manager{Hb:hb,Err:err}
m.Client = c
return m
}
func (m *Manager) Start() {
// establish connection with client and start pinging at set intervals
if !m.Activate() {
// manager already running
m.Err <-errors.New("Manager already running!")
} // if we get here, manager is atomically activated and we can ensure start wont run again
}
func (m *Manager) Exit() {
// exit function to eventually allow saving to configs
if !m.Deactivate() {
m.Err <-errors.New("Manager already disabled!")
}
fmt.Printf("Manager %v exiting\n", m.Id)
}
// reactor manager atomic operations
func (m *Manager) IsActive() bool {
m.Active.Lock()
defer m.Active.Unlock()
return m.Active.bool
}
func (m *Manager) Activate() bool {
m.Active.Lock()
defer m.Active.Unlock()
alive := m.Active.bool
if alive {
return false
} else {
m.Active.bool = true
m.Active.int = 0
return m.Active.bool
}
}
func (m *Manager) Deactivate() bool {
m.Active.Lock()
defer m.Active.Unlock()
alive := m.Active.bool
if alive {
m.Active.bool = false
return true
} else {
return m.Active.bool
}
}
// connection stuff
func (m *Manager) Timeout() int {
// keeps track of and generates timeout [0-1.2s) over span of ~2.5s
// returns 0 on TO elapse
m.Active.Lock()
defer m.Active.Unlock()
if m.Active.int < 9 {
v := int(5 * math.Pow(float64(2), float64(m.Active.int)))
m.Active.int += 1
return v
} else {
// exceeded retries
return 0
}
}
func (m *Manager) Connect() *grpc.ClientConn{
// establish initial gRPC connection with client
var opts []grpc.DialOption
var conn *grpc.ClientConn
opts = append(opts,grpc.WithTransportCredentials(insecure.NewCredentials()))
for {
if !m.IsActive() {
fmt.Printf("Aborting connection attempt\n")
return &grpc.ClientConn{}
}
var err error
conn, err = grpc.Dial(fmt.Sprintf("%v:%v",m.Ip,m.Port),opts...)
// begin error handling
code := status.Code(err)
if code != 0 { // != OK
if code == (5 | 14) { // == unavailable or not found
to := m.Timeout()
if to == 0 {
fmt.Printf("Client not responding\n")
m.Exit()
return&grpc.ClientConn{}
}
fmt.Printf("gRPC endpoint currently unavailable, retrying in %v ms\n",to)
time.Sleep(time.Duration(to) * time.Millisecond)
} else {
fmt.Printf("ERR GRPC: %v\n",code)
m.Err <-err
}
}
break;
}
return conn
}