merging wrapup

spinach 2 years ago
commit 6ad458ff3d

.gitignore vendored

@ -13,23 +13,9 @@
# swap files
# Dependency directories (remove the comment below to include it)
# vendor/
# binaries
# logs
# binaries generated in testing
# task related
# machine dependent

@ -1 +0,0 @@

@ -1,3 +1,64 @@
Distributed Monitoring and Control
## Distributed Monitoring and Control
This branch will serve as the staging ground for adding unit tests and documentation in order to finalize v0.1.0-alpha
## Table of Contents
* [Introduction](#introduction)
* [Getting Started](#getting-started)
* [Installation](#installation)
* [Usage](#usage)
* [Wiki](wiki/
* [Overview](wiki/
* [Server](wiki/
* [Reactor](wiki/
* [Hardware](wiki/
* [Networking](wiki/
* [GUI](wiki/
* [API](wiki/
* [Future Work](wiki/
## Introduction
FRMS serves as both an internal framework for testing reactor designs as well as a scalable customer facing application for monitoring and control.
The project makes heavy use of low-cost yet powerful embedded systems capable of running full Linux kernels.
Examples include the [BeagleBone Black]( which was heavily used in development of FRMS as well as the popular [Raspberry Pi 4](
For more information about the hardware used in the reactors see [here](wiki/
In its current state, FRMS is very bare bones and exists mostly as a proof of concept.
Quickly navigate to:
- [Getting started](#getting-started)
- [Improving the project](wiki/
- [More information](wiki/
- [Bugs/questions](
## Getting Started
For specific information about decisions made in development see [here](wiki/
### Installation
The project uses a make alternative called [task]( written in go for building and testing.
After using `git clone` to clone the repository, you can then build binaries of the two commands `server` and `reactor` for testing.
The binaries will be put into the `bin/` folder and will be labeled with the platform and architecture they were built for.
**WARNING**: The reactor binary currently relies on the Linux [i2c-tools]( to interact with the i2c bus.
This may cause undefined behavior when run on a device without the tools installed. More information about this design choice can be found [here](wiki/
### Usage
## Technical Information
### Overview
### Reactor
### Networking
### GUI
### API
### Future Work

@ -4,14 +4,24 @@ tasks:
desc: "clean all of the old binaries"
- rm -v bin/* 2>/dev/null
- rm -vf bin/frms_* 2>/dev/null
desc: "Runs the full test suite"
- bin/
desc: "Rebuilds protobuf for gRPC"
- protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/pkg/grpc/*.proto
desc: "cleans and builds all"
deps: [clean, bb, server]
desc: "builds arm reactor binaries and arm/amd server binaries"
deps: [arm32-reactor, arm64-reactor, arm64-server, amd64-server]
desc: "Builds and sends to the beaglebone"
desc: "Builds reactor binary for 32 bit arm linux device"
- task: go-build
@ -19,26 +29,43 @@ tasks:
GOARCH: "arm"
GOOS: "linux"
BUILD_DIR: "reactor"
- scp bin/reactor_linux_arm debian:~/
desc: "Builds server binary"
desc: "Builds reactor binary for 64 bit arm linux device"
- task: go-build
GOARCH: "arm64"
GOOS: "linux"
BUILD_DIR: "reactor"
desc: "Builds server binary for 64 bit arm linux device"
- task: go-build
GOARCH: "arm64"
GOOS: "linux"
BUILD_DIR: "server"
desc: "Builds server binary for amd linux machine"
- task: go-build
GOARCH: "amd64"
GOOS: "linux"
BUILD_DIR: "server"
GOOS: "{{OS}}"
internal: true
- go build -o bin/{{.BUILD_DIR}}_{{.GOOS}}_{{.GOARCH}} cmd/{{.BUILD_DIR}}/main.go
- go build -o bin/frms_{{.BUILD_DIR}}_{{.GOOS}}_{{.GOARCH}} cmd/{{.BUILD_DIR}}/main.go
- internal/pkg/**/*.go
- cmd/{{.BUILD_DIR}}/main.go
- bin/{{.BUILD_DIR}}_{{.GOOS}}_{{.GOARCH}}
- bin/frms_{{.BUILD_DIR}}_{{.GOOS}}_{{.GOARCH}}

@ -0,0 +1,89 @@
#!/usr/bin/env python
import json
import subprocess
class PackageTest:
def __init__(self):
self.status = ""
self.tests = []
self.totaltime = 0
res = {}
output =["go","test","-count=1","-json","./..."], capture_output=True, text=True)
output = str(output.stdout)
output = output.split('\n')
for line in output[:-1]:
# parse the json
parsed = json.loads(line)
action = parsed["Action"]
# skip
if action in ["start", "output", "run"]:
# create blank if doesn't exist
if parsed["Package"] not in res:
res[parsed["Package"]] = PackageTest()
pkg = res[parsed["Package"]]
if "Test" not in parsed:
# top level package result
pkg.status = action
if "Elapsed" in parsed:
pkg.totaltime = parsed["Elapsed"]
# individual test
totalRan = 0
totalPassed = 0
totalTime = 0
# generating output from parsed json
for name, info in res.items():
pkgname = name.split('/')
pkgname = '/'.join(name.split('/')[1:])
if info.status == "skip":
print("Skipped %s" % (pkgname))
print("\nTesting %s:" % (pkgname))
passed = 0
total = 0
for test in info.tests:
total += 1
out = []
if test[1] == "pass":
passed += 1
out = [" " + test[0] + ":",'\033[32mpass\033[0m ',str(test[2]) + 's']
elif test[1] == "fail":
out = [" " + test[0] + ":",'\033[31mfail\033[0m ',str(test[2]) + 's']
print(f"{out[0] : <30}{out[1] : >5}{out[2] : >8}")
result = ""
if info.status == "pass":
result = "\033[32mPASSED\033[0m"
result = "\033[31mFAILED\033[0m"
# keep track of grand totals
totalRan += total
totalPassed += passed
totalTime += info.totaltime
print(" %s %d/%d in %.3fs" % (result, passed, total, info.totaltime))
# output overall test statistics
if totalRan == totalPassed:
result = "\033[32mPASSED\033[0m"
result = "\033[31mFAILED\033[0m"
print("\nSUMMARY:\n\t%s %d/%d in %.3fs" % (result, totalPassed, totalRan, totalTime))

@ -1,153 +0,0 @@
# adding commands
usage() {
# how to use this build script
cat <<EOF
usage: $0 [-c][-l][-i s] s1 [s2....]
s1, s2, etc. the systems to build for (see -l)
-c, --clean cleans the bin folder of any existing builds
-f, --force same as clean but skips prompt
-l, --list list available systems to build for
-s, --scp will attempt to scp to aplicable devices
-h, --help display this message
list_systems() {
# list available systems to build for
cat <<EOF
Name (shorthand) SCP available? (y/n)
$0 Name or $0 (shorthand) will build for the device
RaspberryPi (rpi) y
BeagleBone (bb) y
Desktop (d) n
Server (s) n
clean_builds() {
# cleans old builds
if [[ "$FORCE"=true ]] ; then
printf 'Cleaning old builds... \n'
rm -v bin/* 2>/dev/null
read -p "Clean old builds?(y/n) " -n 1 -r
if [[ $REPLY =~ ^[Yy]$ ]] ; then
rm -v bin/* 2>/dev/null
printf 'Clean!\n'
create_build() {
# create build for $1
case $1 in
'rpi' )
echo "NOT IMPL">&2 && exit 1
printf 'Building for Raspberry Pi!\n'
echo "NOT IMPL">&2 && exit 1
printf 'Building for BeagleBone!\n'
printf 'Building for Server!\n'
printf 'Building for Desktop!\n'
* )
printf 'ERROR: %s type unrecognized!\n' "$1"
exit 1
# setting up build
OUTFILE=$(printf '%s_linux_%s' "$PLATFORM" "$GARCH")
# building
( cd server; env GOOS=linux GOARCH="$GARCH" $GARM go build -o "$OUTFILE")
mv server/"$OUTFILE" bin/"$OUTFILE"
echo "Finished"
# scp
if [[ -n "$SCP" ]] ; then
printf 'Attempting to transfer to %s\n' "$2"
if [[ "$1" == "bb" ]] ; then
printf 'Copying to %s\n' ""
scp "$HOME/FRMS/bin/$OUTFILE" debian:~/
printf 'SCP Not available!\n'
# handle long form
for arg in "$@"; do
case "$arg" in
'--help') set -- "$@" "-h" ;;
'--list') set -- "$@" "-l" ;;
'--scp') set -- "$@" "-s" ;;
'--clean') set -- "$@" "-c" ;;
'--force') set -- "$@" "-f" ;;
*) set -- "$@" "$arg" ;;
# handle args
while getopts "lcsfh" opt ; do
case "$opt" in
'h' )
exit 0
'c' )
'f' )
's' )
'?' )
exit 1
shift $(($OPTIND - 1))
for dev in "$@"; do
case "$dev" in
'RaspberryPi') dev='rpi' ;;
'BeagleBone') dev='bb' ;;
'Server') dev='s' ;;
'Desktop') dev='d' ;;
create_build "$dev"
printf 'Nothing else to do!\n'
# tar -czf pireactor.tar.gz -C bin reactor_linux_arm64
# tar -czf bbreactor.tar.gz -C bin reactor_linux_arm
# tar -czf server.tar.gz -C bin server_linux_amd64
# tar -czf tui.tar.gz -C bin tui_linux_amd64 tui_linux_arm tui_linux_arm64

@ -0,0 +1,72 @@
package main
import (
type reactorCoordinator interface {
func NewReactorCoordinator(
config *viper.Viper,
ch chan error,
) (reactorCoordinator, error) {
// allows interface checking as opposed to calling directly
return reactor.NewCoordinator(config, ch)
func main() {
// shutdown
gracefulShutdown := make(chan os.Signal, 1)
signal.Notify(gracefulShutdown, syscall.SIGINT, syscall.SIGTERM)
// load any stored configs
home, err := os.UserHomeDir()
if err != nil {
configPath := fmt.Sprintf("%s/.config/FRMS", home)
configFile := "reactor"
configExt := "yaml"
conf, err := config.Load(configFile, configPath, configExt)
if err != nil {
ch := make(chan error)
rlc, err := NewReactorCoordinator(conf, ch) // passing conf and err
if err != nil {
go rlc.Start()
logging.Debug(logging.DStart, "Reactor Started")
// check for errors
select {
case err := <-ch:
if err != nil {
conf.WriteConfig() // save changes
case <-gracefulShutdown:
// sigint
fmt.Printf("\nStoring config to %s\n", conf.ConfigFileUsed())
if err := conf.WriteConfig(); err != nil {

@ -3,7 +3,7 @@ FROM --platform=$BUILDPLATFORM golang:1.18-alpine as builder
COPY . .
COPY ../ .
RUN go mod download

@ -1,14 +0,0 @@
#DB_URL=$(cat "$INFLUX_CONFIGS_PATH" | awk '/url/ {print $3}' | head -n 1)
TOKEN=$(influx auth list --user ${DOCKER_INFLUXDB_INIT_USER_ID} --hide-headers | cut -f 3)
ORG=$(influx org list | grep ${DOCKER_INFLUXDB_INIT_ORG_ID} | awk '{print $2}')
# creating starting server YAML
echo -e "server:\n db-url: ${DB_URL}\n db-org: ${ORG}\n db-token: ${TOKEN}" >/configs/server.yaml;
# creating grafana yaml
influx user create -n grafana -o ${ORG}
GRAFANA_TOKEN=$(influx auth list --user grafana --hide-headers | cut -f 3)
echo -e "apiVersion: 1\n\ndeleteDatasources:\n\ndatasources:\n - name: INFLUXDB\n type: influxdb\n access: proxy\n url: ${DB_URL}\n jsonData:\n httpMode: GET\n httpHeaderName1: 'Authorization'\n secureJsonData:\n httpHeaderValue1: 'Token ${GRAFANA_TOKEN}'" >/grafana/datasources/datasource.yaml

@ -1,6 +0,0 @@
# ${gen_statement}
db-url: "${db_url}"
db-org: "${db_org}"
db-token: "${db_token}"

@ -0,0 +1,51 @@
// package Config wraps the viper library to setup/manage files for FRMS
package config
import (
// Load the file at path/file into a viper object.
// Expects config file to be yaml.
func Load(file, path, ext string) (*viper.Viper, error) {
logging.Debug(logging.DStart, "CON loading %s", file)
config := viper.New()
//configFile := fmt.Sprintf("%s/%s.%s", path, file, ext)
// Sets env vars
// create config directory if it doesn't exist
if err := os.MkdirAll(path, 0750); err != nil && !os.IsExist(err) {
return config, err
// attempt to create an empty config incase it doesn't exist
// if err := config.SafeWriteConfigAs(configFile); err != nil {
// // if error thrown because file exists, fine to ignore
// if _, ok := err.(viper.ConfigFileAlreadyExistsError); !ok {
// return config, err
// }
// }
if err := config.ReadInConfig(); err != nil {
fmt.Printf("read error %v\n", config)
return config, err
logging.Debug(logging.DStart, "CON Loaded configs from %#V", config.ConfigFileUsed())
// returning config object
return config, nil

@ -0,0 +1,64 @@
// package Database wraps some influx db methods to provide functionality.
package database
import (
influx ""
var (
ErrDBConnection = errors.New("connection to database failed")
ErrNoURLFound = errors.New("database url not found")
var db influx.Client
// Connect takes in a config and attempts to create a client for influxdb.
// Will automatically write changes back to config for future attempts
func Connect(config *viper.Viper) error {
url := config.GetString("db.url")
token := config.GetString("db.token")
if url == "" {
return ErrNoURLFound
db = influx.NewClient(url, token)
if token == "" {
// try setup
fmt.Printf("attempting to setup database at %v\n", url)
user := config.GetString("db.username")
password := config.GetString("db.password")
org := config.GetString("")
bucket := config.GetString("db.bucket")
Setup(user, pass, org, bucket
db = influx.NewClient(url, token)
return nil
func Setup(user, pass, org, bucket string, ret int) (string, error) {
resp, err := db.Setup(context.Background(), user, pass, org, bucket, ret)
return "", nil
func GetBucket(id int) (string, error) {
return "", nil
func GetToken(id int) (string, error) {
// bucket, err := client.BucketsAPI().FindBucketByName(context.Background(), id)
return "", nil

@ -22,12 +22,12 @@ import (
// I dont think I actually need this interface, package manager has a point
type Manager interface {
Start() error
Exit() error
Stop() error
Timeout() (time.Duration, error)
HeartBeat(chan struct{}, int, int, time.Duration) // creates a hb
func NewManager(max int) Manager {
func NewManager(max int) (Manager, error) {
return manager.New(max)
@ -71,10 +71,22 @@ type ReactorCoordinator struct {
Err chan error
func NewCoordinator(config *viper.Viper, errCh chan error) *ReactorCoordinator {
func NewCoordinator(
config *viper.Viper,
errCh chan error,
) (*ReactorCoordinator, error) {
m := NewManager(6) // max 6 attempts
dc := NewDeviceCoordinator(config)
m, err := NewManager(6) // max 6 attempts
if err != nil {
return &ReactorCoordinator{}, err
dc, err := NewDeviceCoordinator(config)
if err != nil {
return &ReactorCoordinator{}, err
c := &ReactorCoordinator{
Manager: m,
@ -83,10 +95,7 @@ func NewCoordinator(config *viper.Viper, errCh chan error) *ReactorCoordinator {
Err: errCh,
// this is going to be scuffed
//c.DB = &DB{Bucket: "bb", Org: "ForeLight", URL: url, Token: "S1UZssBu6KPfHaQCt34pZFpyc5lzbH9XanYJWCkOI5FqLY7gq205C6FTH-CmugiPH6o2WoKlTkEuPgIfaJjAhw=="}
return c
return c, nil
func (c *ReactorCoordinator) Start() {
@ -129,7 +138,7 @@ func (c *ReactorCoordinator) LoadConfig() error {
if !c.Config.IsSet("") {
// get from hw
var id int
if id, err = system.GetId("eth0"); err != nil {
if id, err = system.GetID(); err != nil {
return err
c.Config.Set("", id)

@ -4,6 +4,7 @@ import (
pb "FRMS/internal/pkg/grpc"
@ -16,8 +17,8 @@ import (
// device manager
type DeviceManager interface {
Start() error
Exit() error
IsActive() int
Stop() error
IsActive() manager.Status
func NewDeviceManager(bus, addr int, config *viper.Viper) (DeviceManager, error) {
@ -35,15 +36,16 @@ type DeviceCoordinator struct {
DeviceManagers map[int]DeviceManager
func NewDeviceCoordinator(config *viper.Viper) *DeviceCoordinator {
func NewDeviceCoordinator(config *viper.Viper) (*DeviceCoordinator, error) {
dm := make(map[int]DeviceManager)
m := NewManager(0)
c := &DeviceCoordinator{
m, err := NewManager(0)
return &DeviceCoordinator{
Manager: m,
DeviceManagers: dm,
Config: config,
return c
}, err
func (c *DeviceCoordinator) Start(bus int) error {
@ -83,14 +85,14 @@ func (c *DeviceCoordinator) UpdateManagers(active map[int]bool) {
for addr, dm := range c.DeviceManagers {
_, ok := active[addr]
if ok && dm.IsActive() == 0 {
if ok && dm.IsActive() == manager.Inactive {
// active and dm not
if err := dm.Start(); err != nil {
} else if !ok && dm.IsActive() == 1 {
} else if !ok && dm.IsActive() == manager.Active {
// not active and dm is
if err := dm.Exit(); err != nil {
if err := dm.Stop(); err != nil {

@ -13,8 +13,8 @@ type ControllerManager struct {
Enabled bool // turn controller on or off
func NewControllerManager() *ControllerManager {
return &ControllerManager{}
func NewControllerManager() (*ControllerManager, error) {
return &ControllerManager{}, nil
func (c *ControllerManager) SetDeviceManager(d *DeviceManager) {

@ -14,7 +14,7 @@ type DOManager struct {
func NewDOManager() *DOManager {
func NewDOManager() (*DOManager, error) {
// atlas delays
a := &Atlas{
CalDelay: 1300,
@ -23,11 +23,10 @@ func NewDOManager() *DOManager {
sm := NewSensorManager()
m := &DOManager{
return &DOManager{
Atlas: a,
SensorManager: sm,
return m
}, nil
func (m *DOManager) Start() error {

@ -14,12 +14,12 @@ import (
type Manager interface {
// core manager
Start() error
Exit() error
IsActive() int
Stop() error
IsActive() manager.Status
HeartBeat(chan struct{}, int, int, time.Duration)
func NewManager() Manager {
func NewManager() (Manager, error) {
// no timeouts needed
return manager.New(0)
@ -36,21 +36,26 @@ type DeviceManager struct {
// config
Config *viper.Viper
// gRPC server
func NewDeviceManager(bus, addr int, config *viper.Viper, defaultName string) *DeviceManager {
func NewDeviceManager(
bus, addr int,
config *viper.Viper,
defaultName string,
) (*DeviceManager, error) {
// new base dm
m := NewManager()
dm := &DeviceManager{
m, err := NewManager()
if err != nil {
return &DeviceManager{}, err
return &DeviceManager{
Address: addr,
Bus: bus,
defaultName: defaultName,
Manager: m,
Config: config,
return dm
}, nil
func (m *DeviceManager) LoadConfig() error {

@ -1,6 +1,7 @@
package device
import (
@ -10,8 +11,8 @@ import (
// Returns the correct manager for sensor/controller
type Device interface {
Start() error
Exit() error
IsActive() int
Stop() error
IsActive() manager.Status
@ -25,26 +26,33 @@ func New(bus, addr int, config *viper.Viper) (Device, error) {
case 97:
// DO
defaultName = "DO Sensor"
m = NewDOManager()
m, err = NewDOManager()
case 99:
// pH
defaultName = "pH Sensor"
m = NewPHManager()
m, err = NewPHManager()
case 102:
// RTD
defaultName = "RTD Sensor"
m = NewRTDManager()
m, err = NewRTDManager()
case 256:
// PWM
defaultName = "PWM Controller"
m = NewPWMManager()
m, err = NewPWMManager()
err = errors.New(fmt.Sprintf("Error: device id %d unrecognized!", addr))
// setting device manager
dm := NewDeviceManager(bus, addr, config, defaultName)
if err != nil {
return m, err
dm, err := NewDeviceManager(bus, addr, config, defaultName)
if err != nil {
return m, err
// setting up gRPC server functionality
return m, err

@ -14,7 +14,7 @@ type PHManager struct {
func NewPHManager() *PHManager {
func NewPHManager() (*PHManager, error) {
// atlas delays
a := &Atlas{
CalDelay: 900,
@ -22,11 +22,10 @@ func NewPHManager() *PHManager {
sm := NewSensorManager()
m := &PHManager{
return &PHManager{
Atlas: a,
SensorManager: sm,
return m
}, nil
func (m *PHManager) Start() error {

@ -14,9 +14,12 @@ type PWMManager struct {
DutyCycle int
func NewPWMManager() *PWMManager {
cm := NewControllerManager()
return &PWMManager{ControllerManager: cm}
func NewPWMManager() (*PWMManager, error) {
cm, err := NewControllerManager()
return &PWMManager{
ControllerManager: cm,
}, err
// freq changing

@ -13,7 +13,7 @@ type RTDManager struct {
func NewRTDManager() *RTDManager {
func NewRTDManager() (*RTDManager, error) {
// atlas delays
a := &Atlas{
CalDelay: 600,
@ -21,11 +21,10 @@ func NewRTDManager() *RTDManager {
sm := NewSensorManager()
m := &RTDManager{
return &RTDManager{
Atlas: a,
SensorManager: sm,
return m
}, nil
func (m *RTDManager) Start() error {

@ -16,9 +16,6 @@ type SensorManager struct {
SampleTimestamp int64
*DeviceManager `mapstructure:",squash"`
// gRPC server
func NewSensorManager() *SensorManager {
@ -86,7 +83,7 @@ func (s *SensorManager) Monitor(f takeReading) {
fmt.Printf("Got %f\n", reading)
s.LatestSample = float32(reading)
s.SampleTimestamp = time.Now.Unix()
s.SampleTimestamp = time.Now().Unix()

@ -1,7 +1,9 @@
// package i2c wraps the [i2c package] to interact with the i2c
// with devices on the bus
// [i2c package]:
package i2c
// file has general wrappers to interact with i2c-tools
import (
@ -12,56 +14,75 @@ import (
// GetConnected returns a map of each device address and its current
// connection status.
func GetConnected(b int) (map[int]bool, error) {
// Returns all the connected devices by address
// might just do this in bash and make it easier
bus := strconv.Itoa(b)
devices := make(map[int]bool) // only keys
cmd := exec.Command("i2cdetect", "-y", "-r", bus)
var out bytes.Buffer
var errs bytes.Buffer
cmd.Stderr = &errs
cmd.Stdout = &out
if err := cmd.Run(); err != nil {
logging.Debug(logging.DError, "I2C error performing scan. %v", errs.String())
"I2C scan error %v",
return devices, err
// parsing the command output
outString := out.String()
// could split by \n too
split := strings.SplitAfter(outString, ":")
// 1st entry is garbage headers and ending is always \n##:
// 1st entry is reserved and ending is always \n##:
split = split[1:]
// create empty slice for all the devices
for i, v := range split {
lst := strings.Index(v, "\n")
trimmed := v[:lst]
lastDevice := strings.Index(v, "\n")
trimmed := v[:lastDevice]
trimmed = strings.Trim(trimmed, " ")
// trimmed now holds just possible sensor addresses
count := strings.Split(trimmed, " ")
for j, d := range count {
// the first row has to be offset by 3 but after its just i*16 + j
offset := 0
offset := j
if i == 0 {
offset = 3
offset += 3
addr := i*16 + j + offset
addr := i*16 + offset
if !strings.Contains(d, "--") && !strings.Contains(d, "UU") {
// active
devices[addr] = true
return devices, nil
// SendCmd sends an arbitrary command string to the device at addr on i2c bus b.
// Command will be converted from a string to bytes before
// attempting to be sent.
func SendCmd(b, addr int, command string) (string, error) {
// sends an arbituary commnd over specified bus to int
// might make a base script for this too
var cmd *exec.Cmd
bus := strconv.Itoa(b)
operation := "r20" // default read
frmt_cmd := "" // empty cmd
// default to an empty read
operation := "r20"
frmt_cmd := ""
if command != "" {
// command, do write
operation = fmt.Sprintf("w%d", len(command)) // write
@ -75,14 +96,19 @@ func SendCmd(b, addr int, command string) (string, error) {
// reading
cmd = exec.Command("i2ctransfer", "-y", bus, fmt.Sprintf("%s@0x%x", operation, addr))
// exec command
// execute command
var out bytes.Buffer
var errs bytes.Buffer
cmd.Stderr = &errs
cmd.Stdout = &out
if err := cmd.Run(); err != nil {
logging.Debug(logging.DError, "I2C error getting data! %v", err)
logging.Debug(logging.DError, "I2C command error %v", err)
return "", err
return out.String(), nil

@ -0,0 +1,114 @@
// package i2c wraps the [i2c package] to interact with the i2c
// with devices on the bus
// [i2c package]:
package i2c
import (
_ "log"
// GetConnected returns a map of each device address and its current
// connection status.
func GetConnected(b int) (map[int]bool, error) {
bus := strconv.Itoa(b)
devices := make(map[int]bool) // only keys
cmd := exec.Command("i2cdetect", "-y", "-r", bus)
var out bytes.Buffer
var errs bytes.Buffer
cmd.Stderr = &errs
cmd.Stdout = &out
if err := cmd.Run(); err != nil {
"I2C scan error %v",
return devices, err
// parsing the command output
outString := out.String()
split := strings.SplitAfter(outString, ":")
// 1st entry is reserved and ending is always \n##:
split = split[1:]
// create empty slice for all the devices
for i, v := range split {
lastDevice := strings.Index(v, "\n")
trimmed := v[:lastDevice]
trimmed = strings.Trim(trimmed, " ")
count := strings.Split(trimmed, " ")
for j, d := range count {
// the first row has to be offset by 3 but after its just i*16 + j
offset := j
if i == 0 {
offset += 3
addr := i*16 + offset
if !strings.Contains(d, "--") && !strings.Contains(d, "UU") {
devices[addr] = true
return devices, nil
// SendCmd sends an arbitrary command string to the device at addr on i2c bus b.
// Command will be converted from a string to bytes before
// attempting to be sent.
func SendCmd(b, addr int, command string) (string, error) {
var cmd *exec.Cmd
bus := strconv.Itoa(b)
// default to an empty read
operation := "r20"
frmt_cmd := ""
if command != "" {
// command, do write
operation = fmt.Sprintf("w%d", len(command)) // write
// formatting cmd
for _, char := range command {
// loop over string
frmt_cmd += fmt.Sprintf("0x%x", char)
cmd = exec.Command("i2ctransfer", "-y", bus, fmt.Sprintf("%s@0x%x", operation, addr), frmt_cmd)
} else {
// reading
cmd = exec.Command("i2ctransfer", "-y", bus, fmt.Sprintf("%s@0x%x", operation, addr))
// execute command
var out bytes.Buffer
var errs bytes.Buffer
cmd.Stderr = &errs
cmd.Stdout = &out
if err := cmd.Run(); err != nil {
logging.Debug(logging.DError, "I2C command error %v", err)
return "", err
return out.String(), nil

@ -1,118 +1,149 @@
<<<<<<< HEAD:reactor/needs_port/system/hwinfo.go
// package system uses linux commands to get hardware info for identifation
// package system uses linux ip command to get hardware info from devices
>>>>>>> origin/wrapup:internal/pkg/system/hwinfo.go
package system
import (
func GetId() (int, error) {
// gets the mac address and hashes into consistent id
maccmd := fmt.Sprintf("ifconfig %v | awk '/ether / {print $2}'", et)
var stderr bytes.Buffer
var out bytes.Buffer
cmd := exec.Command("bash", "-c", maccmd)
cmd.Stdout = &out
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
return 0, err
hash := fnv.New32a()
id := hash.Sum32()
return int(id), nil
var (
ErrBusNotFound = errors.New("bus not found for device")
ErrNoNetworkInterface = errors.New("no default network found")
var HardwareInfo = &hardwareInfo{}
type hardwareInfo struct {
MAC string
IP string
ID int
Model string
// NetInterfaces is a struct to unmarshal the ip command json.
type netInterfaces []network
// Networks holds relevant information for each network interface.
// Used to unmarshal ip command json.
type network struct {
Subnets []netInfo `json:"addr_info"`
Mac string `json:"address"`
Group string `json:"group"`
State string `json:"operstate"`
type netInfo struct {
Family string `json:"family"`
Ip string `json:"local"`
func GetIp() (string, error) {
ipcmd := "ip route get 1 | sed 's/^.*src \([^ ]*\).*$/\1/;q'"
var stderr bytes.Buffer
var out bytes.Buffer
cmd := exec.Command("bash", "-c", ipcmd)
cmd.Stdout = &out
func getInfo() error {
var stderr, stdout bytes.Buffer
cmd := exec.Command("ip", "-j", "a")
cmd.Stdout = &stdout
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
return "", err
return err
ip := strings.Trim(out.String(), " \n")
return ip, nil
var networks netInterfaces
func GetPort() (int, error) {
// obsolete
if addr, err := net.ResolveTCPAddr("tcp", ":0"); err != nil {
return 0, err
} else if lis, err := net.ListenTCP("tcp", addr); err != nil {
return 0, err
} else {
defer lis.Close()
return lis.Addr().(*net.TCPAddr).Port, nil
if err := json.Unmarshal(stdout.Bytes(), &networks); err != nil {
return err
// loop over found networks finding first default, IPv4 network currently
// UP (active).
// Eventually need to look only at wg interface which simplifies
// implementation.
for _, network := range networks {
if network.Group == "default" && network.State == "UP" {
for _, subnet := range network.Subnets {
if subnet.Family == "inet" {
hash := fnv.New32a()
id := hash.Sum32()
HardwareInfo.MAC = network.Mac
HardwareInfo.IP = subnet.Ip
HardwareInfo.ID = int(id)
return nil
return nil
func GetBus() (int, error) {
// preset busses
busList := map[string]int{"raspberrypi": 1, "beaglebone": 2}
// vars
var bus int
var ok bool
func GetID() (int, error) {
if name, err =: GetModel(); err != nil {
return bus, err
} else if bus, ok = busList[b]; !ok {
return 0, errors.New(fmt.Sprintf("No bus for dev %s", b))
if HardwareInfo.ID == 0 {
if err := getInfo(); err != nil {
return 0, err
// returns correct bus
return bus, nil
return HardwareInfo.ID, nil
func GetModel() (string, error) {
var stderr, out bytes.Buffer
cmd := exec.Command("bash", "-c", "hostname")
cmd.Stdout = &out
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
return "", err
func GetIP() (string, error) {
if HardwareInfo.IP == "" {
if err := getInfo(); err != nil {
return "", err
b := out.String()
b = strings.Trim(b, " \n")
return b, nil
return HardwareInfo.IP, nil
func Get() error {
// responsible for filling out struct
//bus := map[string]int{"raspberrypi":1,"beaglebone":2} // eventually will replace this with a config file
func GetModel() (string, error) {
ipcmd := "ifconfig eth0 | awk '/inet / {print $2}'"
maccmd := "ifconfig eth0 | awk '/ether / {print $2}'"
devcmd := "lshw -C system 2>/dev/null | head -n 1"
if HardwareInfo.Model == "" {
res := [3]bytes.Buffer{}
var stderr bytes.Buffer
cmds := []string{ipcmd, maccmd, devcmd}
for i, c := range cmds {
cmd := exec.Command("bash", "-c", c)
cmd.Stdout = &res[i]
var stderr, out bytes.Buffer
cmd := exec.Command("uname", "-n")
cmd.Stdout = &out
cmd.Stderr = &stderr
err := cmd.Run()
if err != nil {
return err
if err := cmd.Run(); err != nil {
return "", err
b := out.String()
HardwareInfo.Model = strings.Trim(b, " \n")
// formatting
ip := res[0].String()
ip = strings.Trim(ip, " \n")
hash := fnv.New32a()
return HardwareInfo.Model, nil
b := res[2].String()
b = strings.Trim(b, " \n")
return nil
func GetBus() (int, error) {
// preset busses
busList := map[string]int{"raspberrypi": 1, "beaglebone": 2}
// vars
var bus int
var ok bool
if name, err := GetModel(); err != nil {
return bus, err
} else if bus, ok = busList[name]; !ok {
return 0, ErrBusNotFound
return bus, nil

@ -0,0 +1,41 @@
package system
import (
// TestGetInfo ensures that the array can be populated with device info.
func TestGetInfo(t *testing.T) {
if err := getInfo(); err != nil {
t.Fatalf(`getInfo() failed %v`, err)
// TestGetIP tests that the IP is returned without error and not empty.
func TestGetIP(t *testing.T) {
if ip, err := GetIP(); err != nil || ip == "" {
t.Fatalf(`GetIP() failed, got %s, err: %v`, ip, err)
// TestGetID tests that the ID is returned without error and not empty.
func TestGetID(t *testing.T) {
if id, err := GetID(); err != nil || id == 0 {
t.Fatalf(`GetID() failed, got %d %v`, id, err)
// TestGetModel tests that the Model is returned without error and not empty.
func TestGetModel(t *testing.T) {
if model, err := GetModel(); err != nil || model == "" {
t.Fatalf(`GetModel() failed, got %s %v`, model, err)
// TestGetModel tests that the correct error is thrown as the bus does not exist on the test rig.
func TestGetBus(t *testing.T) {
if bus, err := GetBus(); err != ErrBusNotFound {
t.Fatalf(`GetBus() should fail, got %d %v`, bus, err)

@ -3,14 +3,19 @@ module FRMS
go 1.18
require (
<<<<<<< HEAD:server/go.mod v1.5.0
>>>>>>> origin/wrapup:go.mod v2.9.1 v1.12.0 v1.7.1 v1.47.0 v1.28.0
require ( v1.1.1 // indirect v1.8.2 // indirect v1.5.4 // indirect v1.5.2 // indirect
@ -21,14 +26,24 @@ require ( v1.9.5 // indirect v2.0.1 // indirect v0.9.1 // indirect
<<<<<<< HEAD:server/go.mod
======= v1.0.0 // indirect
>>>>>>> origin/wrapup:go.mod v1.8.2 // indirect v1.5.0 // indirect v1.1.0 // indirect v1.0.5 // indirect v1.3.0 // indirect
<<<<<<< HEAD:server/go.mod v0.0.0-20220520000938-2e3eb7b945c2 // indirect v0.0.0-20220627191245-f75cf1eec38b // indirect v0.3.7 // indirect
======= v0.10.0 // indirect v0.8.0 // indirect v0.9.0 // indirect
>>>>>>> origin/wrapup:go.mod v0.0.0-20220519153652-3a47de7e79bd // indirect v1.66.4 // indirect v2.4.0 // indirect

@ -139,8 +139,6 @@ v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+ v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
@ -299,8 +297,8 @@ v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= v0.0.0-20220520000938-2e3eb7b945c2 h1:NWy5+hlRbC7HK+PmcXVUmW1IMyFce7to56IUvhUFm7Y= v0.0.0-20220520000938-2e3eb7b945c2/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@ -363,8 +361,8 @@ v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7w v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= v0.0.0-20220627191245-f75cf1eec38b h1:2n253B2r0pYSmEV+UNCQoPfU/FiaizQEK5Gu4Bq4JE8= v0.0.0-20220627191245-f75cf1eec38b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -374,8 +372,13 @@ v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
<<<<<<< HEAD:server/go.sum v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
======= v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
>>>>>>> origin/wrapup:go.sum v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=

@ -0,0 +1,260 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc v3.21.12
// source: internal/pkg/grpc/handshake.proto
package grpc
import (
protoreflect ""
protoimpl ""
reflect "reflect"
sync "sync"
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
type ReactorClientRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
Port uint32 `protobuf:"varint,2,opt,name=port,proto3" json:"port,omitempty"` // client gRPC port
func (x *ReactorClientRequest) Reset() {
*x = ReactorClientRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
func (x *ReactorClientRequest) String() string {
return protoimpl.X.MessageStringOf(x)
func (*ReactorClientRequest) ProtoMessage() {}
func (x *ReactorClientRequest) ProtoReflect() protoreflect.Message {
mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
return ms
return mi.MessageOf(x)
// Deprecated: Use ReactorClientRequest.ProtoReflect.Descriptor instead.
func (*ReactorClientRequest) Descriptor() ([]byte, []int) {
return file_internal_pkg_grpc_handshake_proto_rawDescGZIP(), []int{0}
func (x *ReactorClientRequest) GetId() uint32 {
if x != nil {
return x.Id
return 0
func (x *ReactorClientRequest) GetPort() uint32 {
if x != nil {
return x.Port
return 0
type ReactorClientResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
Url string `protobuf:"bytes,2,opt,name=url,proto3" json:"url,omitempty"`
Org string `protobuf:"bytes,3,opt,name=org,proto3" json:"org,omitempty"`
Token string `protobuf:"bytes,4,opt,name=token,proto3" json:"token,omitempty"`
Bucket string `protobuf:"bytes,5,opt,name=bucket,proto3" json:"bucket,omitempty"`
func (x *ReactorClientResponse) Reset() {
*x = ReactorClientResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
func (x *ReactorClientResponse) String() string {
return protoimpl.X.MessageStringOf(x)
func (*ReactorClientResponse) ProtoMessage() {}
func (x *ReactorClientResponse) ProtoReflect() protoreflect.Message {
mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
return ms
return mi.MessageOf(x)
// Deprecated: Use ReactorClientResponse.ProtoReflect.Descriptor instead.
func (*ReactorClientResponse) Descriptor() ([]byte, []int) {
return file_internal_pkg_grpc_handshake_proto_rawDescGZIP(), []int{1}
func (x *ReactorClientResponse) GetId() uint32 {
if x != nil {
return x.Id
return 0
func (x *ReactorClientResponse) GetUrl() string {
if x != nil {
return x.Url
return ""
func (x *ReactorClientResponse) GetOrg() string {
if x != nil {
return x.Org
return ""
func (x *ReactorClientResponse) GetToken() string {
if x != nil {
return x.Token
return ""
func (x *ReactorClientResponse) GetBucket() string {
if x != nil {
return x.Bucket
return ""
var File_internal_pkg_grpc_handshake_proto protoreflect.FileDescriptor
var file_internal_pkg_grpc_handshake_proto_rawDesc = []byte{
0x0a, 0x21, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67,
0x72, 0x70, 0x63, 0x2f, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0x3a, 0x0a, 0x14, 0x52, 0x65, 0x61,
0x63, 0x74, 0x6f, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69,
0x64, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52,
0x04, 0x70, 0x6f, 0x72, 0x74, 0x22, 0x79, 0x0a, 0x15, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72,
0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e,
0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x10,
0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c,
0x12, 0x10, 0x0a, 0x03, 0x6f, 0x72, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6f,
0x72, 0x67, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28,
0x09, 0x52, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x75, 0x63, 0x6b,
0x65, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74,
0x32, 0x5c, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x4f, 0x0a,
0x14, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x48, 0x61,
0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x1a, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61,
0x63, 0x74, 0x6f, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x1b, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72,
0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x13,
0x5a, 0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67,
0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
var (
file_internal_pkg_grpc_handshake_proto_rawDescOnce sync.Once
file_internal_pkg_grpc_handshake_proto_rawDescData = file_internal_pkg_grpc_handshake_proto_rawDesc
func file_internal_pkg_grpc_handshake_proto_rawDescGZIP() []byte {
file_internal_pkg_grpc_handshake_proto_rawDescOnce.Do(func() {
file_internal_pkg_grpc_handshake_proto_rawDescData = protoimpl.X.CompressGZIP(file_internal_pkg_grpc_handshake_proto_rawDescData)
return file_internal_pkg_grpc_handshake_proto_rawDescData
var file_internal_pkg_grpc_handshake_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_internal_pkg_grpc_handshake_proto_goTypes = []interface{}{
(*ReactorClientRequest)(nil), // 0: grpc.ReactorClientRequest
(*ReactorClientResponse)(nil), // 1: grpc.ReactorClientResponse
var file_internal_pkg_grpc_handshake_proto_depIdxs = []int32{
0, // 0: grpc.handshake.ReactorClientHandler:input_type -> grpc.ReactorClientRequest
1, // 1: grpc.handshake.ReactorClientHandler:output_type -> grpc.ReactorClientResponse
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
func init() { file_internal_pkg_grpc_handshake_proto_init() }
func file_internal_pkg_grpc_handshake_proto_init() {
if File_internal_pkg_grpc_handshake_proto != nil {
if !protoimpl.UnsafeEnabled {
file_internal_pkg_grpc_handshake_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReactorClientRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
return nil
file_internal_pkg_grpc_handshake_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReactorClientResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
return nil
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_internal_pkg_grpc_handshake_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
GoTypes: file_internal_pkg_grpc_handshake_proto_goTypes,
DependencyIndexes: file_internal_pkg_grpc_handshake_proto_depIdxs,
MessageInfos: file_internal_pkg_grpc_handshake_proto_msgTypes,
File_internal_pkg_grpc_handshake_proto = out.File
file_internal_pkg_grpc_handshake_proto_rawDesc = nil
file_internal_pkg_grpc_handshake_proto_goTypes = nil
file_internal_pkg_grpc_handshake_proto_depIdxs = nil

@ -0,0 +1,21 @@
syntax = "proto3";
package grpc;
option go_package = "internal/pkg/grpc";
service handshake {
rpc ReactorClientHandler(ReactorClientRequest) returns (ReactorClientResponse);
message ReactorClientRequest {
uint32 id = 1;
uint32 port = 2; // client gRPC port
message ReactorClientResponse {
uint32 id = 1;
string url = 2;
string org = 3;
string token = 4;
string bucket = 5;

@ -0,0 +1,105 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.21.12
// source: internal/pkg/grpc/handshake.proto
package grpc
import (
context "context"
grpc ""
codes ""
status ""
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// HandshakeClient is the client API for Handshake service.
// For semantics around ctx use and closing/ending streaming RPCs, please refer to
type HandshakeClient interface {
ReactorClientHandler(ctx context.Context, in *ReactorClientRequest, opts ...grpc.CallOption) (*ReactorClientResponse, error)
type handshakeClient struct {
cc grpc.ClientConnInterface
func NewHandshakeClient(cc grpc.ClientConnInterface) HandshakeClient {
return &handshakeClient{cc}
func (c *handshakeClient) ReactorClientHandler(ctx context.Context, in *ReactorClientRequest, opts ...grpc.CallOption) (*ReactorClientResponse, error) {
out := new(ReactorClientResponse)
err :=, "/grpc.handshake/ReactorClientHandler", in, out, opts...)
if err != nil {
return nil, err
return out, nil
// HandshakeServer is the server API for Handshake service.
// All implementations must embed UnimplementedHandshakeServer
// for forward compatibility
type HandshakeServer interface {
ReactorClientHandler(context.Context, *ReactorClientRequest) (*ReactorClientResponse, error)
// UnimplementedHandshakeServer must be embedded to have forward compatible implementations.
type UnimplementedHandshakeServer struct {
func (UnimplementedHandshakeServer) ReactorClientHandler(context.Context, *ReactorClientRequest) (*ReactorClientResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ReactorClientHandler not implemented")
func (UnimplementedHandshakeServer) mustEmbedUnimplementedHandshakeServer() {}
// UnsafeHandshakeServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to HandshakeServer will
// result in compilation errors.
type UnsafeHandshakeServer interface {
func RegisterHandshakeServer(s grpc.ServiceRegistrar, srv HandshakeServer) {
s.RegisterService(&Handshake_ServiceDesc, srv)
func _Handshake_ReactorClientHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ReactorClientRequest)
if err := dec(in); err != nil {
return nil, err
if interceptor == nil {
return srv.(HandshakeServer).ReactorClientHandler(ctx, in)
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/grpc.handshake/ReactorClientHandler",
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(HandshakeServer).ReactorClientHandler(ctx, req.(*ReactorClientRequest))
return interceptor(ctx, in, info, handler)
// Handshake_ServiceDesc is the grpc.ServiceDesc for Handshake service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Handshake_ServiceDesc = grpc.ServiceDesc{
ServiceName: "grpc.handshake",
HandlerType: (*HandshakeServer)(nil),
Methods: []grpc.MethodDesc{
MethodName: "ReactorClientHandler",
Handler: _Handshake_ReactorClientHandler_Handler,
Streams: []grpc.StreamDesc{},
Metadata: "internal/pkg/grpc/handshake.proto",

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.0
// protoc v3.12.4
// protoc-gen-go v1.28.1
// protoc v3.21.12
// source: internal/pkg/grpc/monitoring.proto
package grpc
@ -20,56 +20,7 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
type Status int32
const (
Status_DEAD Status = 0
Status_ALIVE Status = 1
Status_UNKOWN Status = 2
// Enum value maps for Status.
var (
Status_name = map[int32]string{
0: "DEAD",
1: "ALIVE",
2: "UNKOWN",
Status_value = map[string]int32{
"DEAD": 0,
"ALIVE": 1,
"UNKOWN": 2,
func (x Status) Enum() *Status {
p := new(Status)
*p = x
return p
func (x Status) String() string {
return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
func (Status) Descriptor() protoreflect.EnumDescriptor {
return file_internal_pkg_grpc_monitoring_proto_enumTypes[0].Descriptor()
func (Status) Type() protoreflect.EnumType {
return &file_internal_pkg_grpc_monitoring_proto_enumTypes[0]
func (x Status) Number() protoreflect.EnumNumber {
return protoreflect.EnumNumber(x)
// Deprecated: Use Status.Descriptor instead.
func (Status) EnumDescriptor() ([]byte, []int) {
return file_internal_pkg_grpc_monitoring_proto_rawDescGZIP(), []int{0}
type ReactorStatusResponse struct {
type ReactorAck struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
@ -77,8 +28,8 @@ type ReactorStatusResponse struct {
Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
func (x *ReactorStatusResponse) Reset() {
*x = ReactorStatusResponse{}
func (x *ReactorAck) Reset() {
*x = ReactorAck{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@ -86,13 +37,13 @@ func (x *ReactorStatusResponse) Reset() {
func (x *ReactorStatusResponse) String() string {
func (x *ReactorAck) String() string {
return protoimpl.X.MessageStringOf(x)
func (*ReactorStatusResponse) ProtoMessage() {}
func (*ReactorAck) ProtoMessage() {}
func (x *ReactorStatusResponse) ProtoReflect() protoreflect.Message {
func (x *ReactorAck) ProtoReflect() protoreflect.Message {
mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@ -104,30 +55,28 @@ func (x *ReactorStatusResponse) ProtoReflect() protoreflect.Message {
return mi.MessageOf(x)
// Deprecated: Use ReactorStatusResponse.ProtoReflect.Descriptor instead.
func (*ReactorStatusResponse) Descriptor() ([]byte, []int) {
// Deprecated: Use ReactorAck.ProtoReflect.Descriptor instead.
func (*ReactorAck) Descriptor() ([]byte, []int) {
return file_internal_pkg_grpc_monitoring_proto_rawDescGZIP(), []int{0}
func (x *ReactorStatusResponse) GetId() int32 {
func (x *ReactorAck) GetId() int32 {
if x != nil {
return x.Id
return 0
type ReactorStatusPing struct {
type ReactorPing struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
// new devices
Devices []*Device `protobuf:"bytes,2,rep,name=devices,proto3" json:"devices,omitempty"`
func (x *ReactorStatusPing) Reset() {
*x = ReactorStatusPing{}
func (x *ReactorPing) Reset() {
*x = ReactorPing{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@ -135,13 +84,13 @@ func (x *ReactorStatusPing) Reset() {
func (x *ReactorStatusPing) String() string {
func (x *ReactorPing) String() string {
return protoimpl.X.MessageStringOf(x)
func (*ReactorStatusPing) ProtoMessage() {}
func (*ReactorPing) ProtoMessage() {}
func (x *ReactorStatusPing) ProtoReflect() protoreflect.Message {
func (x *ReactorPing) ProtoReflect() protoreflect.Message {
mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@ -153,108 +102,34 @@ func (x *ReactorStatusPing) ProtoReflect() protoreflect.Message {
return mi.MessageOf(x)
// Deprecated: Use ReactorStatusPing.ProtoReflect.Descriptor instead.
func (*ReactorStatusPing) Descriptor() ([]byte, []int) {
// Deprecated: Use ReactorPing.ProtoReflect.Descriptor instead.
func (*ReactorPing) Descriptor() ([]byte, []int) {
return file_internal_pkg_grpc_monitoring_proto_rawDescGZIP(), []int{1}
func (x *ReactorStatusPing) GetId() int32 {
func (x *ReactorPing) GetId() int32 {
if x != nil {
return x.Id
return 0
func (x *ReactorStatusPing) GetDevices() []*Device {
if x != nil {
return x.Devices
return nil
type Device struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Addr int32 `protobuf:"varint,1,opt,name=addr,proto3" json:"addr,omitempty"` // i2c addr
Status Status `protobuf:"varint,2,opt,name=status,proto3,enum=grpc.Status" json:"status,omitempty"` // most recent status
func (x *Device) Reset() {
*x = Device{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
func (x *Device) String() string {
return protoimpl.X.MessageStringOf(x)
func (*Device) ProtoMessage() {}
func (x *Device) ProtoReflect() protoreflect.Message {
mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
return ms
return mi.MessageOf(x)
// Deprecated: Use Device.ProtoReflect.Descriptor instead.
func (*Device) Descriptor() ([]byte, []int) {
return file_internal_pkg_grpc_monitoring_proto_rawDescGZIP(), []int{2}
func (x *Device) GetAddr() int32 {
if x != nil {
return x.Addr
return 0
func (x *Device) GetStatus() Status {
if x != nil {
return x.Status
return Status_DEAD
var File_internal_pkg_grpc_monitoring_proto protoreflect.FileDescriptor
var file_internal_pkg_grpc_monitoring_proto_rawDesc = []byte{
0x0a, 0x22, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67,
0x72, 0x70, 0x63, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0x27, 0x0a, 0x15, 0x52, 0x65,
0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52,
0x02, 0x69, 0x64, 0x22, 0x4b, 0x0a, 0x11, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74,
0x61, 0x74, 0x75, 0x73, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01,
0x20, 0x01, 0x28, 0x05, 0x52, 0x02, 0x69, 0x64, 0x12, 0x26, 0x0a, 0x07, 0x64, 0x65, 0x76, 0x69,
0x63, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x67, 0x72, 0x70, 0x63,
0x2e, 0x44, 0x65, 0x76, 0x69, 0x63, 0x65, 0x52, 0x07, 0x64, 0x65, 0x76, 0x69, 0x63, 0x65, 0x73,
0x22, 0x42, 0x0a, 0x06, 0x44, 0x65, 0x76, 0x69, 0x63, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x61, 0x64,
0x64, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x61, 0x64, 0x64, 0x72, 0x12, 0x24,
0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0c,
0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74,
0x61, 0x74, 0x75, 0x73, 0x2a, 0x29, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x08,
0x0a, 0x04, 0x44, 0x45, 0x41, 0x44, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x41, 0x4c, 0x49, 0x56,
0x45, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x55, 0x4e, 0x4b, 0x4f, 0x57, 0x4e, 0x10, 0x02, 0x32,
0x5a, 0x0a, 0x0a, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x4c, 0x0a,
0x14, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x48, 0x61,
0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x17, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61,
0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x50, 0x69, 0x6e, 0x67, 0x1a, 0x1b,
0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61,
0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x13, 0x5a, 0x11, 0x69,
0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63,
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0x1c, 0x0a, 0x0a, 0x52, 0x65,
0x61, 0x63, 0x74, 0x6f, 0x72, 0x41, 0x63, 0x6b, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01,
0x20, 0x01, 0x28, 0x05, 0x52, 0x02, 0x69, 0x64, 0x22, 0x1d, 0x0a, 0x0b, 0x52, 0x65, 0x61, 0x63,
0x74, 0x6f, 0x72, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20,
0x01, 0x28, 0x05, 0x52, 0x02, 0x69, 0x64, 0x32, 0x49, 0x0a, 0x0a, 0x6d, 0x6f, 0x6e, 0x69, 0x74,
0x6f, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x3b, 0x0a, 0x12, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72,
0x50, 0x69, 0x6e, 0x67, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x11, 0x2e, 0x67, 0x72,
0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x50, 0x69, 0x6e, 0x67, 0x1a, 0x10,
0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x41, 0x63, 0x6b,
0x28, 0x01, 0x42, 0x13, 0x5a, 0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70,
0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
var (
@ -269,24 +144,19 @@ func file_internal_pkg_grpc_monitoring_proto_rawDescGZIP() []byte {
return file_internal_pkg_grpc_monitoring_proto_rawDescData
var file_internal_pkg_grpc_monitoring_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_internal_pkg_grpc_monitoring_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_internal_pkg_grpc_monitoring_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_internal_pkg_grpc_monitoring_proto_goTypes = []interface{}{
(Status)(0), // 0: grpc.Status
(*ReactorStatusResponse)(nil), // 1: grpc.ReactorStatusResponse
(*ReactorStatusPing)(nil), // 2: grpc.ReactorStatusPing
(*Device)(nil), // 3: grpc.Device
(*ReactorAck)(nil), // 0: grpc.ReactorAck
(*ReactorPing)(nil), // 1: grpc.ReactorPing
var file_internal_pkg_grpc_monitoring_proto_depIdxs = []int32{
3, // 0: grpc.ReactorStatusPing.devices:type_name -> grpc.Device
0, // 1: grpc.Device.status:type_name -> grpc.Status
2, // 2: grpc.monitoring.ReactorStatusHandler:input_type -> grpc.ReactorStatusPing
1, // 3: grpc.monitoring.ReactorStatusHandler:output_type -> grpc.ReactorStatusResponse
3, // [3:4] is the sub-list for method output_type
2, // [2:3] is the sub-list for method input_type
2, // [2:2] is the sub-list for extension type_name
2, // [2:2] is the sub-list for extension extendee
0, // [0:2] is the sub-list for field type_name
1, // 0: grpc.monitoring.ReactorPingHandler:input_type -> grpc.ReactorPing
0, // 1: grpc.monitoring.ReactorPingHandler:output_type -> grpc.ReactorAck
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
func init() { file_internal_pkg_grpc_monitoring_proto_init() }
@ -296,7 +166,7 @@ func file_internal_pkg_grpc_monitoring_proto_init() {
if !protoimpl.UnsafeEnabled {
file_internal_pkg_grpc_monitoring_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReactorStatusResponse); i {
switch v := v.(*ReactorAck); i {
case 0:
return &v.state
case 1:
@ -308,19 +178,7 @@ func file_internal_pkg_grpc_monitoring_proto_init() {
file_internal_pkg_grpc_monitoring_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReactorStatusPing); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
return nil
file_internal_pkg_grpc_monitoring_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Device); i {
switch v := v.(*ReactorPing); i {
case 0:
return &v.state
case 1:
@ -337,14 +195,13 @@ func file_internal_pkg_grpc_monitoring_proto_init() {
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_internal_pkg_grpc_monitoring_proto_rawDesc,
NumEnums: 1,
NumMessages: 3,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
GoTypes: file_internal_pkg_grpc_monitoring_proto_goTypes,
DependencyIndexes: file_internal_pkg_grpc_monitoring_proto_depIdxs,
EnumInfos: file_internal_pkg_grpc_monitoring_proto_enumTypes,
MessageInfos: file_internal_pkg_grpc_monitoring_proto_msgTypes,
File_internal_pkg_grpc_monitoring_proto = out.File

@ -4,26 +4,13 @@ package grpc;
option go_package = "internal/pkg/grpc";
service monitoring {
rpc ReactorStatusHandler(ReactorStatusPing) returns (ReactorStatusResponse);
rpc ReactorPingHandler(stream ReactorPing) returns (ReactorAck);
message ReactorStatusResponse {
message ReactorAck {
int32 id = 1;
message ReactorStatusPing {
message ReactorPing {
int32 id = 1;
// new devices
repeated Device devices = 2;
enum Status {
DEAD = 0;
ALIVE = 1;
message Device {
int32 addr = 1; // i2c addr
Status status = 2; // most recent status

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.12.4
// - protoc v3.21.12
// source: internal/pkg/grpc/monitoring.proto
package grpc
@ -22,7 +22,7 @@ const _ = grpc.SupportPackageIsVersion7
// For semantics around ctx use and closing/ending streaming RPCs, please refer to
type MonitoringClient interface {
ReactorStatusHandler(ctx context.Context, in *ReactorStatusPing, opts ...grpc.CallOption) (*ReactorStatusResponse, error)
ReactorPingHandler(ctx context.Context, opts ...grpc.CallOption) (Monitoring_ReactorPingHandlerClient, error)
type monitoringClient struct {
@ -33,20 +33,45 @@ func NewMonitoringClient(cc grpc.ClientConnInterface) MonitoringClient {
return &monitoringClient{cc}
func (c *monitoringClient) ReactorStatusHandler(ctx context.Context, in *ReactorStatusPing, opts ...grpc.CallOption) (*ReactorStatusResponse, error) {
out := new(ReactorStatusResponse)
err :=, "/grpc.monitoring/ReactorStatusHandler", in, out, opts...)
func (c *monitoringClient) ReactorPingHandler(ctx context.Context, opts ...grpc.CallOption) (Monitoring_ReactorPingHandlerClient, error) {
stream, err :=, &Monitoring_ServiceDesc.Streams[0], "/grpc.monitoring/ReactorPingHandler", opts...)
if err != nil {
return nil, err
return out, nil
x := &monitoringReactorPingHandlerClient{stream}
return x, nil
type Monitoring_ReactorPingHandlerClient interface {
Send(*ReactorPing) error
CloseAndRecv() (*ReactorAck, error)
type monitoringReactorPingHandlerClient struct {
func (x *monitoringReactorPingHandlerClient) Send(m *ReactorPing) error {
return x.ClientStream.SendMsg(m)
func (x *monitoringReactorPingHandlerClient) CloseAndRecv() (*ReactorAck, error) {
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
m := new(ReactorAck)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
return m, nil
// MonitoringServer is the server API for Monitoring service.
// All implementations must embed UnimplementedMonitoringServer
// for forward compatibility
type MonitoringServer interface {
ReactorStatusHandler(context.Context, *ReactorStatusPing) (*ReactorStatusResponse, error)
ReactorPingHandler(Monitoring_ReactorPingHandlerServer) error
@ -54,8 +79,8 @@ type MonitoringServer interface {
type UnimplementedMonitoringServer struct {
func (UnimplementedMonitoringServer) ReactorStatusHandler(context.Context, *ReactorStatusPing) (*ReactorStatusResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ReactorStatusHandler not implemented")
func (UnimplementedMonitoringServer) ReactorPingHandler(Monitoring_ReactorPingHandlerServer) error {
return status.Errorf(codes.Unimplemented, "method ReactorPingHandler not implemented")
func (UnimplementedMonitoringServer) mustEmbedUnimplementedMonitoringServer() {}
@ -70,22 +95,30 @@ func RegisterMonitoringServer(s grpc.ServiceRegistrar, srv MonitoringServer) {
s.RegisterService(&Monitoring_ServiceDesc, srv)
func _Monitoring_ReactorStatusHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ReactorStatusPing)
if err := dec(in); err != nil {
func _Monitoring_ReactorPingHandler_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(MonitoringServer).ReactorPingHandler(&monitoringReactorPingHandlerServer{stream})
type Monitoring_ReactorPingHandlerServer interface {
SendAndClose(*ReactorAck) error
Recv() (*ReactorPing, error)
type monitoringReactorPingHandlerServer struct {
func (x *monitoringReactorPingHandlerServer) SendAndClose(m *ReactorAck) error {
return x.ServerStream.SendMsg(m)
func (x *monitoringReactorPingHandlerServer) Recv() (*ReactorPing, error) {
m := new(ReactorPing)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
if interceptor == nil {
return srv.(MonitoringServer).ReactorStatusHandler(ctx, in)
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/grpc.monitoring/ReactorStatusHandler",
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MonitoringServer).ReactorStatusHandler(ctx, req.(*ReactorStatusPing))
return interceptor(ctx, in, info, handler)
return m, nil
// Monitoring_ServiceDesc is the grpc.ServiceDesc for Monitoring service.
@ -94,12 +127,13 @@ func _Monitoring_ReactorStatusHandler_Handler(srv interface{}, ctx context.Conte
var Monitoring_ServiceDesc = grpc.ServiceDesc{
ServiceName: "grpc.monitoring",
HandlerType: (*MonitoringServer)(nil),
Methods: []grpc.MethodDesc{
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
MethodName: "ReactorStatusHandler",
Handler: _Monitoring_ReactorStatusHandler_Handler,
StreamName: "ReactorPingHandler",
Handler: _Monitoring_ReactorPingHandler_Handler,
ClientStreams: true,
Streams: []grpc.StreamDesc{},
Metadata: "internal/pkg/grpc/monitoring.proto",

@ -1,8 +1,8 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.6.1
// source: internal/pkg/grpc/server.proto
// - protoc v3.21.12
// source: internal/pkg/grpc/handshake.proto
package grpc
@ -22,7 +22,7 @@ const _ = grpc.SupportPackageIsVersion7
// For semantics around ctx use and closing/ending streaming RPCs, please refer to
type HandshakeClient interface {
ClientDiscoveryHandler(ctx context.Context, in *ClientRequest, opts ...grpc.CallOption) (*ClientResponse, error)
ReactorClientHandler(ctx context.Context, in *ReactorClientRequest, opts ...grpc.CallOption) (*ReactorClientResponse, error)
type handshakeClient struct {
@ -33,9 +33,9 @@ func NewHandshakeClient(cc grpc.ClientConnInterface) HandshakeClient {
return &handshakeClient{cc}
func (c *handshakeClient) ClientDiscoveryHandler(ctx context.Context, in *ClientRequest, opts ...grpc.CallOption) (*ClientResponse, error) {
out := new(ClientResponse)
err :=, "/grpc.handshake/ClientDiscoveryHandler", in, out, opts...)
func (c *handshakeClient) ReactorClientHandler(ctx context.Context, in *ReactorClientRequest, opts ...grpc.CallOption) (*ReactorClientResponse, error) {
out := new(ReactorClientResponse)
err :=, "/grpc.handshake/ReactorClientHandler", in, out, opts...)
if err != nil {
return nil, err
@ -46,7 +46,7 @@ func (c *handshakeClient) ClientDiscoveryHandler(ctx context.Context, in *Client
// All implementations must embed UnimplementedHandshakeServer
// for forward compatibility
type HandshakeServer interface {
ClientDiscoveryHandler(context.Context, *ClientRequest) (*ClientResponse, error)
ReactorClientHandler(context.Context, *ReactorClientRequest) (*ReactorClientResponse, error)
@ -54,8 +54,8 @@ type HandshakeServer interface {
type UnimplementedHandshakeServer struct {
func (UnimplementedHandshakeServer) ClientDiscoveryHandler(context.Context, *ClientRequest) (*ClientResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ClientDiscoveryHandler not implemented")
func (UnimplementedHandshakeServer) ReactorClientHandler(context.Context, *ReactorClientRequest) (*ReactorClientResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ReactorClientHandler not implemented")
func (UnimplementedHandshakeServer) mustEmbedUnimplementedHandshakeServer() {}
@ -70,20 +70,20 @@ func RegisterHandshakeServer(s grpc.ServiceRegistrar, srv HandshakeServer) {
s.RegisterService(&Handshake_ServiceDesc, srv)
func _Handshake_ClientDiscoveryHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ClientRequest)
func _Handshake_ReactorClientHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ReactorClientRequest)
if err := dec(in); err != nil {
return nil, err
if interceptor == nil {
return srv.(HandshakeServer).ClientDiscoveryHandler(ctx, in)
return srv.(HandshakeServer).ReactorClientHandler(ctx, in)
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/grpc.handshake/ClientDiscoveryHandler",
FullMethod: "/grpc.handshake/ReactorClientHandler",
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(HandshakeServer).ClientDiscoveryHandler(ctx, req.(*ClientRequest))
return srv.(HandshakeServer).ReactorClientHandler(ctx, req.(*ReactorClientRequest))
return interceptor(ctx, in, info, handler)
@ -96,10 +96,10 @@ var Handshake_ServiceDesc = grpc.ServiceDesc{
HandlerType: (*HandshakeServer)(nil),
Methods: []grpc.MethodDesc{
MethodName: "ClientDiscoveryHandler",
Handler: _Handshake_ClientDiscoveryHandler_Handler,
MethodName: "ReactorClientHandler",
Handler: _Handshake_ReactorClientHandler_Handler,
Streams: []grpc.StreamDesc{},
Metadata: "internal/pkg/grpc/server.proto",
Metadata: "internal/pkg/grpc/handshake.proto",

@ -1,3 +1,4 @@
// Package manager provides basic manager functions to embed into higher level managers.
package manager
import (
@ -9,91 +10,128 @@ import (
// basic manager for starting/stopping checks plus built in heartbeat for downtime detection
// used across server/reactor
var (
ErrInvalidMaxConn = errors.New("invalid max connection attempts")
ErrManagerInactive = errors.New("manager inactive")
ErrManagerActive = errors.New("manager active")
ErrMaxAttemptsExceeded = errors.New("max connection attempts exceeded")
type Connection struct {
Attempts float64 // float for pow
MaxAttempts int // max allowed
// Status is used as an enum for the current status.
// Could be expanded to include others such as killed, sleeping, etc.
type Status int
const (
Inactive Status = 0
Active Status = 1
// MaxConnAttempts is the maximum allowable connection attempts.
// Limited to 255 to prevent excessive timeout scaling.
const MaxConnAttempts = 0xFF
// Manager is a general purpose structure to implement basic capabilities.
// Stores state in active variable, modified through atomic swaps.
// Embeds a connection to be used in generating timeouts.
type Manager struct {
*Connection // embedded for timeout stuff
Active int32 // atomic checks
active int32
func New(maxCon int) *Manager {
// New creates a new manager with the maxConn maximum attempts.
// Throws ErrInvalidMaxConn if maxConn is not in [0, MaxConnAttempts].
func New(maxConn int) (*Manager, error) {
if maxConn < 0 || maxConn > MaxConnAttempts {
return &Manager{}, ErrInvalidMaxConn
c := &connection{MaxAttempts: maxConn}
c := &Connection{MaxAttempts: maxCon}
m := &Manager{
Connection: c,
connection: c,
return m
return m, nil
// Start attempts to start the manager.
// Throws ErrManagerActive error if the manager is already active.
func (m *Manager) Start() error {
// atomically checks/updates status
if atomic.CompareAndSwapInt32(&m.Active, 0, 1) {
if atomic.CompareAndSwapInt32(&, 0, 1) {
return nil
// already running
return errors.New("Manager already started!")
return ErrManagerActive
func (m *Manager) Exit() error {
if atomic.CompareAndSwapInt32(&m.Active, 1, 0) {
// Stop attempts to stop the manager.
// Throws ErrManagerInactive error if the manager is already inactive.
func (m *Manager) Stop() error {
if atomic.CompareAndSwapInt32(&, 1, 0) {
return nil
return errors.New("Manager not active!")
return ErrManagerInactive
func (m *Manager) IsActive() int {
return int(atomic.LoadInt32(&m.Active))
// IsActive returns the current ManagerStatus.
func (m *Manager) IsActive() Status {
return Status(atomic.LoadInt32(&
// Heartbeat tracker
// HeartBeat will send an empty struct over ping every hb (scale).
// The pings are sent ever (hb + rand(interval)) * scale.
// Where scale is typically time.Millisecond, time.Second etc.
// Will close the channel on exit to prevent leaks.
func (m *Manager) HeartBeat(
ping chan struct{},
hb, interval int,
scale time.Duration) {
func (m *Manager) HeartBeat(ping chan struct{}, hb, interval int, scale time.Duration) {
// pings channel every (HB + randInterval) * time.Duration
// can be used anywhere a heartbeat is needed
// closes channel on exit
for m.IsActive() == Active {
ping <- struct{}{}
if interval > 0 {
for atomic.LoadInt32(&m.Active) > 0 {
// atomoic read may cause memory leak, can revisit
ping <- struct{}{} // no mem
sleep := time.Duration(hb-interval) * scale
if interval > 0 {
sleep += time.Duration(rand.Intn(2*interval)) * scale
// exited, close chan
// connection timeout generator
// connection keeps track of maximum and current number of connection attempts.
// Concurrency safe as it is protected by a mutex.
type connection struct {
Attempts float64
MaxAttempts int
// Timeout returns an exponentially decaying timeout based on attempts.
// Returns timeout of type time.Duration in milliseconds.
// Returns ErrMaxAttemptsExceeded if too many attempts are tried.
func (c *connection) Timeout() (time.Duration, error) {
func (c *Connection) Timeout() (time.Duration, error) {
// exponential backoff
defer c.Unlock()
if int(c.Attempts) < c.MaxAttempts {
c.Attempts += 1
// 50, 100, 200...
to := time.Duration(50*math.Pow(2, c.Attempts)) * time.Millisecond
c.Attempts += 1
return to, nil
return 0, errors.New("Connection Failed")
return 0, ErrMaxAttemptsExceeded
func (c *Connection) ResetConnections() {
// ResetConnections sets the current connection attempts back to 0.
func (c *connection) ResetConnections() {
defer c.Unlock()
c.Attempts = 0

@ -0,0 +1,206 @@
package manager
import (
// creating, starting and stopping tests
// newManager is a helper for creating new managers for tests
func newManager(conn int, want error, t *testing.T) *Manager {
var manager *Manager
var err error
assert := assert.New(t)
manager, err = New(conn)
if err != want {
`New(%d) = %v, %v, %d max connections failed`,
assert.Equal(manager.IsActive(), Inactive, "manager should start inactive")
return manager
// TestEmptyManager creates a new manager with 0 max connections
func TestEmptyManager(t *testing.T) {
conn := 0
newManager(conn, nil, t)
// TestPostiveManager creates a new manager with valid max connections
func TestPositiveManager(t *testing.T) {
conn := rand.Intn(MaxConnAttempts)
newManager(conn, nil, t)
// TestNegativeManager creates a new manager with negative max connections
func TestNegativeManager(t *testing.T) {
conn := -1 * rand.Intn(MaxConnAttempts)
newManager(conn, ErrInvalidMaxConn, t)
// TestInvalidManager creates a new manager with large max connections
func TestInvalidManager(t *testing.T) {
conn := MaxConnAttempts + 0xf
newManager(conn, ErrInvalidMaxConn, t)
// TestManagerLifeCycle tests that a manager can start and exit several times
func TestManagerLifeCycle(t *testing.T) {
var manager *Manager
assert := assert.New(t)
conn := rand.Intn(MaxConnAttempts)
manager = newManager(conn, nil, t)
cycles := 10
// starting and stopping sequentially
for i := 0; i < cycles; i++ {
assert.NoError(manager.Start(), "starting manager failed")
assert.Equal(manager.IsActive(), Active, "manager inactive after start")
assert.NoError(manager.Stop(), "stopping manager failed")
assert.Equal(manager.IsActive(), Inactive, "manager active after stop")
// TestManagerStopFail tests that stopping an inactive manager will error
func TestManagerStopFail(t *testing.T) {
var manager *Manager
assert := assert.New(t)
conn := rand.Intn(MaxConnAttempts)
manager = newManager(conn, nil, t)
assert.NoError(manager.Start(), "starting manager failed")
// stopping sequentially
assert.NoError(manager.Stop(), "stopping manager failed")
assert.Error(manager.Stop(), "stopping inactive manager should fail")
// TestManagerStartFail tests that starting an active manager will error
func TestManagerStartFail(t *testing.T) {
var manager *Manager
assert := assert.New(t)
conn := rand.Intn(MaxConnAttempts)
manager = newManager(conn, nil, t)
// starting sequentially
assert.NoError(manager.Start(), "starting manager failed")
assert.Error(manager.Start(), "starting active manager should fail")
// auxiliary tests
// TestManagerTimeout checks that timeouts exponentially backoff
func TestManagerTimeout(t *testing.T) {
var manager *Manager
assert := assert.New(t)
conn := 10
manager = newManager(conn, nil, t)
assert.NoError(manager.Start(), "starting manager failed")
assert.Equal(manager.IsActive(), Active, "manager is inactive")
prevTimeout, err := manager.Timeout()
for i := 1; i <= conn; i++ {
assert.NoError(err, "generating timeout failed")
assert.True(prevTimeout > 0, "invalid timeout")
timeout, err := manager.Timeout()
if i == conn {
assert.Error(err, "allowed exceeding max attempts")
} else {
assert.NoError(err, "generating timeout failed")
timeout == 2*prevTimeout,
"incorrect timeout %d, expected %d",
timeout, 2*prevTimeout,
prevTimeout = timeout
// TestManagerHB tests the heartbeat channel opens and closes
func TestManagerHB(t *testing.T) {
var manager *Manager
assert := assert.New(t)
conn := rand.Intn(MaxConnAttempts)
manager = newManager(conn, nil, t)
assert.NoError(manager.Start(), "starting manager failed")
assert.Equal(manager.IsActive(), Active, "manager is inactive")
ch := make(chan struct{})
go manager.HeartBeat(ch, 10, 0, time.Millisecond)
for range ch {
// close on first ping
assert.NoError(manager.Stop(), "stopping manager failed")
assert.Equal(manager.IsActive(), Inactive, "manager is active")
// TestManagerHBTiming tests the heartbeat channel timing is correct
func TestManagerHBTiming(t *testing.T) {
var manager *Manager
assert := assert.New(t)
conn := rand.Intn(MaxConnAttempts)
manager = newManager(conn, nil, t)
assert.NoError(manager.Start(), "starting manager failed")
assert.Equal(manager.IsActive(), Active, "manager is inactive")
ch := make(chan struct{})
hb := 100
pings := 10
// expected time with tolerance for other logic and worst case rand timeout
expected := time.Duration(pings*hb+15) * time.Millisecond
go manager.HeartBeat(ch, hb, 1, time.Millisecond)
iter := 0
start := time.Now()
for range ch {
// close after 10 pings
iter += 1
if iter >= pings {
assert.NoError(manager.Stop(), "stopping manager failed")
end := time.Now()
assert.Equal(manager.IsActive(), Inactive, "manager is active")
assert.WithinDuration(start, end, expected, "inaccurate heartbeat")

@ -1,10 +1,15 @@
// package Server provides a way to listen for incoming connections
// and manage multiple reactor clients.
package server
import (
pb "FRMS/internal/pkg/grpc"
<<<<<<< HEAD:server/internal/pkg/server/coordinator.go
>>>>>>> origin/wrapup:internal/pkg/server/coordinator.go
@ -14,184 +19,68 @@ import (
// this package creates the central coordiantor and sub coordiantors to route clients
// db client interface
type Database interface {
// getters (all create if doesnt exist)
GetReactorClient(int) (string, string, string, string, error) // returns (url, org, bucket, token, err)
func NewDatabaseAdmin(config *viper.Viper) (Database, error) {
return influxdb.NewDBAdmin(config)
type CentralCoordinator struct {
// main coordinator
ClientConnections *ClientPacket
Config *viper.Viper
// from config
Ports map[string]int `mapstructure:"ports"`
Err chan error
func NewCentralCoordinator(config *viper.Viper, ch chan error) *CentralCoordinator {
// create a central coordinator to manage requests
db, err := NewDatabaseAdmin(config)
if err != nil {
ch <- err
rc, err := NewReactorCoordinator(config, ch)
if err != nil {
ch <- err
config.UnmarshalKey("server.ports", rc) // get reactor port
c := &CentralCoordinator{
Err: ch,
Config: config,
Database: db,
ReactorCoordinator: rc,
// grab config settings
if err = config.UnmarshalKey("server", c); err != nil {
ch <- err
var (
ErrMissingPort = errors.New("port not set")
return c
// Coordinator is runs on the server and is used to oversee
// the reactor managers as well as process incoming client connections.
type Coordinator struct {
Config *viper.Viper
listener net.Listener
grpcServer *grpc.Server
func (c *CentralCoordinator) Start() {
// starts up associated funcs
clientChan := make(chan *ClientPacket)
l := NewListener(clientChan, c.Err)
// grabs lis port
c.Config.UnmarshalKey("server.ports", l)
DatabasePort int `mapstructure:"database_port"`
GRPCPort int `mapstructure:"grpc_port"`
// starting reactor coordinator
if err := c.ReactorCoordinator.Start(); err != nil {
c.Err <- err
// starting listener
if err := l.Start(); err != nil {
c.Err <- err
// lastly start client listener
go c.ClientListener(clientChan)
func (c *CentralCoordinator) ClientListener(ch chan *ClientPacket) {
for client := range ch {
// basically loops until channel is closed
client.Response <- c.ClientHandler(client.Client) // respond with cred
directory map[int]*ReactorManager
managerMu sync.RWMutex
func (c *CentralCoordinator) ClientHandler(cl *Client) *ClientResponse {
// returns reactor db info
var err error
cr := &ClientResponse{Port: c.Ports[cl.Type]}
if cl.Type == "reactor" {
// get reactor info
go c.ReactorCoordinator.ClientHandler(cl)
// db info
cr.URL, cr.Org, cr.Token, cr.Bucket, err = c.Database.GetReactorClient(cl.Id)
} else {
// throw error
err = errors.New(fmt.Sprintf("Client type %s not recognized!"))
// returns based on cl type
if err != nil {
c.Err <- err
return cr
type ReactorCoordinator struct {
Port int `mapstructure:"reactor"`
Err chan error
// grpc
type ReactorManagers struct {
Config *viper.Viper
Directory map[int]*ReactorManager
// NewCentralCoordinator creates a central coordinator with the given global
// config and error channel.
func NewCentralCoordinator(config *viper.Viper, ch chan error) *Coordinator {
func NewReactorCoordinator(config *viper.Viper, errCh chan error) (*ReactorCoordinator, error) {
rmap := make(map[int]*ReactorManager)
rm := &ReactorManagers{Directory: rmap, Config: config}
c := &ReactorCoordinator{Err: errCh, ReactorManagers: rm}
return c, nil
func (c *ReactorCoordinator) Start() error {
logging.Debug(logging.DStart, "RCO 01 Starting!")
// register grpc service
return c.Register()
func (c *ReactorCoordinator) ClientHandler(cl *Client) {
// updates clients if nessecary
if err := c.UpdateReactorManager(cl, c.Err); err != nil {
c.Err <- err
return &Coordinator{
Err: ch,
Config: config,
directory: rmap,
func (m *ReactorManagers) GetReactorManager(id int) (*ReactorManager, error) {
defer m.RUnlock()
// Start loads config, starts network listener and registers grpc handlers.
// Ready for new clients on return.
func (c *Coordinator) Start() error {
rm, exists := m.Directory[id]
if !exists {
return &ReactorManager{}, errors.New(fmt.Sprintf("No manager for reactor %d!", id))
if err := c.Config.Unmarshal(c); err != nil {
return err
return rm, nil
func (m *ReactorManagers) UpdateReactorManager(cl *Client, errCh chan error) error {
// locking
defer m.RUnlock()
var err error
rm, exists := m.Directory[cl.Id]
if !exists {
logging.Debug(logging.DClient, "RCO creating manager for reactor client %v", cl.Id)
// creating
rm = NewReactorManager(cl, m.Config, errCh)
// starting
if err = rm.Start(); err != nil {
return err
m.Directory[cl.Id] = rm
// ensure it shows up as missing
if c.GRPCPort == 0 {
c.Config.Set("grpc_port", 0)
return ErrMissingPort
return rm.UpdateClient(cl)
func (r *ReactorCoordinator) Register() error {
lis, err := net.Listen("tcp", fmt.Sprintf(":%v", r.Port))
lis, err := net.Listen("tcp", fmt.Sprintf(":%v", c.GRPCPort))
if err != nil {
return err
grpcServer := grpc.NewServer()
pb.RegisterMonitoringServer(grpcServer, r)
go grpcServer.Serve(lis)
logging.Debug(logging.DClient, "RCO ready for client requests")
return nil
func (r *ReactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) {
rm, err := r.GetReactorManager(int(req.GetId()))
// error checking
if err != nil {
return &pb.ReactorStatusResponse{}, err
return rm.ReactorStatusHandler(ctx, req)
c.listener = lis
c.grpcServer = grpcServer
return c.Register()

@ -0,0 +1,13 @@
package server
type dbinfo struct {
url string
org string
token string
bucket string
func (c *Coordinator) getReactorDatabaseCred(id int) (*dbinfo, error) {
return &dbinfo{}, nil

@ -0,0 +1,48 @@
package server
import (
pb "FRMS/internal/pkg/grpc"
// ClientDiscoveryHandler implements the grpc method which can be called
// by incoming clients to first make connection to the central
// coordinator and receive database credentials.
func (c *Coordinator) ReactorClientHandler(
ctx context.Context,
req *pb.ReactorClientRequest,
) (*pb.ReactorClientResponse, error) {
id := int(req.GetId())
"LIS 00 reactor %v has connected\n",
db, err := c.getReactorDatabaseCred(id)
if err != nil {
return &pb.ReactorClientResponse{}, err
return &pb.ReactorClientResponse{
Id: id,
Url: db.url,
Token: db.token,
Bucket: db.bucket,
}, err
// ReactorStatusHandler is a gRPC handler used to handle incoming
// reactor requests containing information about said reactor.
// It will get the associate reactor manager and pass the
// request device information before returning an acknowledgement.
func (c *Coordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) {
// rm, err := c.LoadReactorManager(int(req.GetId()))
return &pb.ReactorStatusResponse{}, nil

@ -0,0 +1,121 @@
package server
import (
var (
ErrNoReactorManager = errors.New("no reactor manager found")
// ReactorManager can be started/stopped as clients connect/disconnect.
type ReactorManager struct {
Manager // base manager interface
// Manager is an interface requiring a structure that can be started
// and stopped as well as provide timeouts in milliseconds.
type Manager interface {
Start() error // status checks
Stop() error
Timeout() (time.Duration, error) // TO Generator
// NewManager returns a manager fulfilling the Manager interface as well as
// any potential errors.
func NewManager(max int) (Manager, error) {
return manager.New(max)
// GetReactorManager returns a reactor manager for passed id.
// Throws error if manager not found for id.
func (c *Coordinator) LoadReactorManager(id int) (*ReactorManager, error) {
defer c.managerMu.RUnlock()
rm, exists :=[id]
if !exists {
"RCO 00 creating manager for %v",
m, err := NewManager(0)
rm = &ReactorManager{
Manager: m,
if err = rm.Start(); err != nil {
return rm, err
}[id] = rm
return rm, nil
// // NewReactorManager takes in a client, config and channel to pass errors on.
// // Returns a new reactor manager as well as any errors that occured during
// // creation.
// // Uses MaxConnectionAttempts which defaults to 10 to prevent
// // unnessecary network load and/or timeout lengths.
// func NewReactorManager(
// ) (*ReactorManager, error) {
// m, err := NewManager(MaxConnectionAttempts)
// if err != nil {
// return &ReactorManager{}, err
// }
// return r, err
// }
// Start logs the start and calls start on the embedded manager.
func (r *ReactorManager) Start() error {
// logging.Debug(logging.DStart, "RMA starting", r.Id)
return r.Manager.Start()
// Stop logs the stop and calls stop on the embedded manager.
func (r *ReactorManager) Stop() error {
// logging.Debug(logging.DExit, "RMA %v stopping", r.Id)
return r.Manager.Stop()
// UpdateClient is used to change the underlying manager client if there
// changes to its data.
// BUG(Keegan): Client is not protected by a lock and may lead to races
// func (r *ReactorManager) UpdateClient(cl *Client) error {
// logging.Debug(logging.DClient, "RMA %v updating client", r.Id)
// r.Client = cl
// return nil
// }
// // ReactorDeviceHandler processes incoming device information and
// // updates the manager accordingly.
// func (r *ReactorManager) ReactorDeviceHandler(devs []*pb.Device) error {
// logging.Debug(logging.DClient, "CCO recieved ping from %v", r.Id)
// for _, dev := range devs {
// logging.Debug(
// logging.DClient,
// "CCO %v device %v is %v",
// r.Id,
// dev.GetAddr(),
// dev.GetStatus().String(),
// )
// }
// return nil
// }

@ -0,0 +1,19 @@
package server
import (
pb "FRMS/internal/pkg/grpc"
func (c *Coordinator) Register() error {
// register services
pb.RegisterHandshakeServer(c.grpcServer, c)
go c.grpcServer.Serve(c.listener)
// testing
pb.RegisterMonitoringServer(c.grpcServer, c)
logging.Debug(logging.DStart, "CCO 00 registered grpc")
return nil

@ -4,24 +4,26 @@ import (
conf "FRMS/internal/pkg/config"
type coordinator interface {
Start() error
func NewCoordinator(config *viper.Viper, ch chan error) coordinator {
return server.NewCentralCoordinator(config, ch)
<<<<<<< HEAD:server/main.go
func NewConfig(filename string) (*viper.Viper, error) {
return config.LoadConfig(filename)
@ -34,11 +36,17 @@ func NewWebSocket() ws {
return websocket.New()
>>>>>>> origin/wrapup:cmd/server/main.go
func main() {
// lets get this bread
fmt.Printf("starting server... ")
start := time.Now()
gracefulShutdown := make(chan os.Signal, 1)
signal.Notify(gracefulShutdown, syscall.SIGINT, syscall.SIGTERM)
<<<<<<< HEAD:server/main.go
// config file
conf, err := NewConfig("server")
if err != nil {
@ -72,6 +80,48 @@ func main() {
logging.Debug(logging.DStop, "CON stored successfully", conf.ConfigFileUsed())
userHome, err := os.UserHomeDir()
if err != nil {
configPath := fmt.Sprintf("%s/.config/FRMS", userHome)
configFile := "server"
configExt := "yaml"
config, err := conf.Load(configFile, configPath, configExt)
if err != nil {
errCh := make(chan error)
c := NewCoordinator(config, errCh)
if err := c.Start(); err != nil {
logging.Debug(logging.DStart, "CCO 01 %s started", config.Get("name"))
elapsed := time.Since(start)
fmt.Printf("done %v\n", elapsed.Round(time.Microsecond))
select {
case err := <-errCh:
case <-gracefulShutdown:
fmt.Printf("\nstopping server... ")
start := time.Now()
if err := config.WriteConfig(); err != nil {
logging.Debug(logging.DExit, "CON wrote %s", config.ConfigFileUsed())
elapsed := time.Since(start)
fmt.Printf("done %v\n", elapsed.Round(time.Microsecond))
>>>>>>> origin/wrapup:cmd/server/main.go

@ -0,0 +1,34 @@
### Documentation
- [ ] API
- [ ] Config
- [ ] Device
- [ ] gRPC
- [ ] I2C
- [ ] DB
- [ ] logging
- [ ] reactor
- [ ] server
- [ ] system?
### Testing
- [ ] Config
- [ ] Device
- [ ] i2c
- [ ] logging
- [ ] reactor
- [O] server
- [ ] gRPC?
- [ ] db?
- [ ] system?
### Misc
- [ ] i2c -> lib
- [ ] strip device pkg
- [ ] future work writeup
- [ ] strip reactor down
- [ ] websocket -> web-gRPC

@ -1,2 +0,0 @@
echo $(git ls-files | grep .go | awk '!/pb/' | xargs wc -l | tail -n 1)

@ -0,0 +1,3 @@
This will describe the idea for the API

@ -0,0 +1,3 @@
# Future Work
This will describe where to take the project from here

@ -0,0 +1,3 @@
This will describe the basic outline for the front-end gui

@ -0,0 +1,3 @@
# Networking
This will describe how the reactor/server/client talk to each other

@ -0,0 +1,10 @@
# Reactor
This will talk about the reactor setup
## Hardware
This will describe the hardware used
## I2C

@ -0,0 +1,3 @@
# Server Information
## This is a sample

@ -0,0 +1,14 @@
# Wiki
## Overview
This is an example of the overview section of the wiki
## Table of Contents
* [Server](
* [Reactor](
* [Networking](
* [GUI](
* [API](
* [Future Work](