diff --git a/.gitignore b/.gitignore index 2f90cba..44e7ac8 100644 --- a/.gitignore +++ b/.gitignore @@ -13,23 +13,9 @@ # swap files *.swp -# Dependency directories (remove the comment below to include it) -# vendor/ # binaries -bin -*.tar.gz -# logs -*.log -# binaries generated in testing -cmd/server/server -cmd/reactor/reactor -cmd/tui/tui +bin/frms* # task related -.task - -# machine dependent -tokens/ -logs/ -influxdb/config +.task/ diff --git a/.task/checksum/go-build b/.task/checksum/go-build deleted file mode 100644 index c7012f1..0000000 --- a/.task/checksum/go-build +++ /dev/null @@ -1 +0,0 @@ -b43ecff1fe53e18c4c9b756b32d38078 diff --git a/README.md b/README.md index 211b8d6..5d2700f 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,64 @@ # DMAC -Distributed Monitoring and Control +## Distributed Monitoring and Control + +This branch will serve as the staging ground for adding unit tests and documentation in order to finalize v0.1.0-alpha + +## Table of Contents + +* [Introduction](#introduction) +* [Getting Started](#getting-started) + * [Installation](#installation) + * [Usage](#usage) +* [Wiki](wiki/wiki.md) + * [Overview](wiki/wiki.md#overview) + * [Server](wiki/server.md) + * [Reactor](wiki/reactor.md) + * [Hardware](wiki/reactor.md#hardware) + * [Networking](wiki/networking.md) + * [GUI](wiki/gui.md) + * [API](wiki/api.md) + * [Future Work](wiki/future-work.md) + +## Introduction + +FRMS serves as both an internal framework for testing reactor designs as well as a scalable customer facing application for monitoring and control. +The project makes heavy use of low-cost yet powerful embedded systems capable of running full Linux kernels. +Examples include the [BeagleBone Black](https://beagleboard.org/black) which was heavily used in development of FRMS as well as the popular [Raspberry Pi 4](https://www.raspberrypi.com/products/raspberry-pi-4-model-b/). +For more information about the hardware used in the reactors see [here](wiki/reactor.md#hardware). + +In its current state, FRMS is very bare bones and exists mostly as a proof of concept. +Quickly navigate to: +- [Getting started](#getting-started) +- [Improving the project](wiki/future-work.md) +- [More information](wiki/wiki.md) +- [Bugs/questions](https://github.com/fl-src/FRMS/issues/new) + +## Getting Started + +For specific information about decisions made in development see [here](wiki/wiki.md). + +### Installation + +The project uses a make alternative called [task](https://github.com/go-task/task) written in go for building and testing. +After using `git clone git@github.com:fl-src/FRMS.git` to clone the repository, you can then build binaries of the two commands `server` and `reactor` for testing. +The binaries will be put into the `bin/` folder and will be labeled with the platform and architecture they were built for. + +**WARNING**: The reactor binary currently relies on the Linux [i2c-tools](https://archive.kernel.org/oldwiki/i2c.wiki.kernel.org/index.php/I2C_Tools.html) to interact with the i2c bus. +This may cause undefined behavior when run on a device without the tools installed. More information about this design choice can be found [here](wiki/reactor.md#i2c) + +### Usage + +## Technical Information + +### Overview + +### Reactor + +### Networking + +### GUI + +### API + +### Future Work diff --git a/Taskfile.dist.yml b/Taskfile.dist.yml index 6d48331..4f0d97f 100644 --- a/Taskfile.dist.yml +++ b/Taskfile.dist.yml @@ -4,14 +4,24 @@ tasks: clean: desc: "clean all of the old binaries" cmds: - - rm -v bin/* 2>/dev/null + - rm -vf bin/frms_* 2>/dev/null + + test: + desc: "Runs the full test suite" + cmds: + - bin/gotest.py + + proto: + desc: "Rebuilds protobuf for gRPC" + cmds: + - protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/pkg/grpc/*.proto all: - desc: "cleans and builds all" - deps: [clean, bb, server] + desc: "builds arm reactor binaries and arm/amd server binaries" + deps: [arm32-reactor, arm64-reactor, arm64-server, amd64-server] - bb: - desc: "Builds and sends to the beaglebone" + arm32-reactor: + desc: "Builds reactor binary for 32 bit arm linux device" cmds: - task: go-build vars: @@ -19,26 +29,43 @@ tasks: GOARCH: "arm" GOOS: "linux" BUILD_DIR: "reactor" - - scp bin/reactor_linux_arm debian:~/ - server: - desc: "Builds server binary" + arm64-reactor: + desc: "Builds reactor binary for 64 bit arm linux device" + cmds: + - task: go-build + vars: + GOARCH: "arm64" + GOOS: "linux" + BUILD_DIR: "reactor" + + arm64-server: + desc: "Builds server binary for 64 bit arm linux device" cmds: - task: go-build - vars: + vars: + GOARCH: "arm64" + GOOS: "linux" + BUILD_DIR: "server" + + amd64-server: + desc: "Builds server binary for amd linux machine" + cmds: + - task: go-build + vars: + GOARCH: "amd64" + GOOS: "linux" BUILD_DIR: "server" - GOOS: "{{OS}}" - GOARCH: "{{ARCH}}" go-build: internal: true cmds: - - go build -o bin/{{.BUILD_DIR}}_{{.GOOS}}_{{.GOARCH}} cmd/{{.BUILD_DIR}}/main.go + - go build -o bin/frms_{{.BUILD_DIR}}_{{.GOOS}}_{{.GOARCH}} cmd/{{.BUILD_DIR}}/main.go sources: - internal/pkg/**/*.go - cmd/{{.BUILD_DIR}}/main.go generates: - - bin/{{.BUILD_DIR}}_{{.GOOS}}_{{.GOARCH}} + - bin/frms_{{.BUILD_DIR}}_{{.GOOS}}_{{.GOARCH}} env: GOARM: "{{.GOARM}}" GOARCH: "{{.GOARCH}}" diff --git a/bin/gotest.py b/bin/gotest.py new file mode 100755 index 0000000..3a47f33 --- /dev/null +++ b/bin/gotest.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python +import json +import subprocess + +class PackageTest: + def __init__(self): + self.status = "" + self.tests = [] + self.totaltime = 0 + +res = {} + +output = subprocess.run(["go","test","-count=1","-json","./..."], capture_output=True, text=True) + +output = str(output.stdout) +output = output.split('\n') + +for line in output[:-1]: + # parse the json + parsed = json.loads(line) + action = parsed["Action"] + + # skip + if action in ["start", "output", "run"]: + continue + + # create blank if doesn't exist + if parsed["Package"] not in res: + res[parsed["Package"]] = PackageTest() + + pkg = res[parsed["Package"]] + + if "Test" not in parsed: + # top level package result + pkg.status = action + if "Elapsed" in parsed: + pkg.totaltime = parsed["Elapsed"] + else: + # individual test + pkg.tests.append((parsed["Test"],parsed["Action"],parsed["Elapsed"])) + +totalRan = 0 +totalPassed = 0 +totalTime = 0 +# generating output from parsed json +for name, info in res.items(): + pkgname = name.split('/') + pkgname = '/'.join(name.split('/')[1:]) + if info.status == "skip": + print("Skipped %s" % (pkgname)) + continue + + print("\nTesting %s:" % (pkgname)) + + passed = 0 + total = 0 + + for test in info.tests: + total += 1 + out = [] + if test[1] == "pass": + passed += 1 + out = [" " + test[0] + ":",'\033[32mpass\033[0m ',str(test[2]) + 's'] + elif test[1] == "fail": + out = [" " + test[0] + ":",'\033[31mfail\033[0m ',str(test[2]) + 's'] + + print(f"{out[0] : <30}{out[1] : >5}{out[2] : >8}") + + result = "" + if info.status == "pass": + result = "\033[32mPASSED\033[0m" + else: + result = "\033[31mFAILED\033[0m" + + # keep track of grand totals + totalRan += total + totalPassed += passed + totalTime += info.totaltime + + print(" %s %d/%d in %.3fs" % (result, passed, total, info.totaltime)) + + +# output overall test statistics +if totalRan == totalPassed: + result = "\033[32mPASSED\033[0m" +else: + result = "\033[31mFAILED\033[0m" + +print("\nSUMMARY:\n\t%s %d/%d in %.3fs" % (result, totalPassed, totalRan, totalTime)) diff --git a/build.sh b/build.sh deleted file mode 100755 index fa991da..0000000 --- a/build.sh +++ /dev/null @@ -1,153 +0,0 @@ -#!/bin/bash - -# adding commands -usage() { - # how to use this build script - cat </dev/null - else - read -p "Clean old builds?(y/n) " -n 1 -r - if [[ $REPLY =~ ^[Yy]$ ]] ; then - rm -v bin/* 2>/dev/null - fi - fi - printf 'Clean!\n' -} - -create_build() { - # create build for $1 - case $1 in - 'rpi' ) - echo "NOT IMPL">&2 && exit 1 - printf 'Building for Raspberry Pi!\n' - GARCH="arm64" - PLATFORM="reactor" - ;; - 'bb') - echo "NOT IMPL">&2 && exit 1 - printf 'Building for BeagleBone!\n' - GARCH="arm" - GARM="GOARM=7" - PLATFORM="reactor" - ;; - 's') - printf 'Building for Server!\n' - GARCH="amd64" - PLATFORM="server" - ;; - 'd') - printf 'Building for Desktop!\n' - GARCH="amd64" - PLATFORM="server" - ;; - * ) - printf 'ERROR: %s type unrecognized!\n' "$1" - usage - exit 1 - ;; - esac - # setting up build - OUTFILE=$(printf '%s_linux_%s' "$PLATFORM" "$GARCH") - # building - ( cd server; env GOOS=linux GOARCH="$GARCH" $GARM go build -o "$OUTFILE") - mv server/"$OUTFILE" bin/"$OUTFILE" - - echo "Finished" - - # scp - if [[ -n "$SCP" ]] ; then - printf 'Attempting to transfer to %s\n' "$2" - if [[ "$1" == "bb" ]] ; then - printf 'Copying to %s\n' "192.168.100.90" - scp "$HOME/FRMS/bin/$OUTFILE" debian:~/ - else - printf 'SCP Not available!\n' - fi - fi -} - -# handle long form -for arg in "$@"; do - shift - case "$arg" in - '--help') set -- "$@" "-h" ;; - '--list') set -- "$@" "-l" ;; - '--scp') set -- "$@" "-s" ;; - '--clean') set -- "$@" "-c" ;; - '--force') set -- "$@" "-f" ;; - *) set -- "$@" "$arg" ;; - esac -done - -# handle args -while getopts "lcsfh" opt ; do - case "$opt" in - 'h' ) - usage - exit 0 - ;; - 'c' ) - clean_builds - ;; - 'f' ) - FORCE=true - clean_builds - ;; - 's' ) - SCP=true - ;; - 'l') - list_systems - ;; - '?' ) - usage - exit 1 - ;; - esac -done - -shift $(($OPTIND - 1)) - -for dev in "$@"; do - case "$dev" in - 'RaspberryPi') dev='rpi' ;; - 'BeagleBone') dev='bb' ;; - 'Server') dev='s' ;; - 'Desktop') dev='d' ;; - esac - create_build "$dev" -done -printf 'Nothing else to do!\n' - -# tar -czf pireactor.tar.gz -C bin reactor_linux_arm64 -# tar -czf bbreactor.tar.gz -C bin reactor_linux_arm -# tar -czf server.tar.gz -C bin server_linux_amd64 -# tar -czf tui.tar.gz -C bin tui_linux_amd64 tui_linux_arm tui_linux_arm64 diff --git a/cmd/reactor/main.go b/cmd/reactor/main.go new file mode 100644 index 0000000..eb7e6a3 --- /dev/null +++ b/cmd/reactor/main.go @@ -0,0 +1,72 @@ +package main + +import ( + "FRMS/internal/pkg/config" + "FRMS/internal/pkg/logging" + "FRMS/internal/pkg/reactor" + "fmt" + "os" + "syscall" + + "os/signal" + + "github.com/spf13/viper" +) + +type reactorCoordinator interface { + Start() +} + +func NewReactorCoordinator( + config *viper.Viper, + ch chan error, +) (reactorCoordinator, error) { + // allows interface checking as opposed to calling directly + return reactor.NewCoordinator(config, ch) +} + +func main() { + // shutdown + gracefulShutdown := make(chan os.Signal, 1) + signal.Notify(gracefulShutdown, syscall.SIGINT, syscall.SIGTERM) + + // load any stored configs + home, err := os.UserHomeDir() + if err != nil { + panic(err) + } + + configPath := fmt.Sprintf("%s/.config/FRMS", home) + configFile := "reactor" + configExt := "yaml" + + conf, err := config.Load(configFile, configPath, configExt) + if err != nil { + panic(err) + } + + ch := make(chan error) + rlc, err := NewReactorCoordinator(conf, ch) // passing conf and err + if err != nil { + panic(err) + } + + go rlc.Start() + logging.Debug(logging.DStart, "Reactor Started") + + // check for errors + select { + case err := <-ch: + if err != nil { + conf.WriteConfig() // save changes + panic(err) + } + case <-gracefulShutdown: + // sigint + fmt.Printf("\nStoring config to %s\n", conf.ConfigFileUsed()) + if err := conf.WriteConfig(); err != nil { + panic(err) + } + os.Exit(0) + } +} diff --git a/.dockerignore b/docker/.dockerignore similarity index 100% rename from .dockerignore rename to docker/.dockerignore diff --git a/docker/Dockerfile.reactor b/docker/Dockerfile.reactor index 371d40e..43d7023 100644 --- a/docker/Dockerfile.reactor +++ b/docker/Dockerfile.reactor @@ -3,7 +3,7 @@ FROM --platform=$BUILDPLATFORM golang:1.18-alpine as builder WORKDIR /app -COPY . . +COPY ../ . RUN go mod download diff --git a/influxdb/startup/influxsetup.sh b/influxdb/startup/influxsetup.sh deleted file mode 100755 index d78f709..0000000 --- a/influxdb/startup/influxsetup.sh +++ /dev/null @@ -1,14 +0,0 @@ -#!/bin/bash - -#DB_URL=$(cat "$INFLUX_CONFIGS_PATH" | awk '/url/ {print $3}' | head -n 1) -DB_URL="frms-db-1:8086" - -TOKEN=$(influx auth list --user ${DOCKER_INFLUXDB_INIT_USER_ID} --hide-headers | cut -f 3) -ORG=$(influx org list | grep ${DOCKER_INFLUXDB_INIT_ORG_ID} | awk '{print $2}') -# creating starting server YAML -echo -e "server:\n db-url: ${DB_URL}\n db-org: ${ORG}\n db-token: ${TOKEN}" >/configs/server.yaml; - -# creating grafana yaml -influx user create -n grafana -o ${ORG} -GRAFANA_TOKEN=$(influx auth list --user grafana --hide-headers | cut -f 3) -echo -e "apiVersion: 1\n\ndeleteDatasources:\n\ndatasources:\n - name: INFLUXDB\n type: influxdb\n access: proxy\n url: ${DB_URL}\n jsonData:\n httpMode: GET\n httpHeaderName1: 'Authorization'\n secureJsonData:\n httpHeaderValue1: 'Token ${GRAFANA_TOKEN}'" >/grafana/datasources/datasource.yaml diff --git a/influxdb/startup/template.yaml b/influxdb/startup/template.yaml deleted file mode 100644 index d832594..0000000 --- a/influxdb/startup/template.yaml +++ /dev/null @@ -1,6 +0,0 @@ ----- -# ${gen_statement} -server: - db-url: "${db_url}" - db-org: "${db_org}" - db-token: "${db_token}" diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go new file mode 100644 index 0000000..54e7907 --- /dev/null +++ b/internal/pkg/config/config.go @@ -0,0 +1,51 @@ +// package Config wraps the viper library to setup/manage files for FRMS +package config + +import ( + "FRMS/internal/pkg/logging" + "fmt" + "os" + + "github.com/spf13/viper" +) + +// Load the file at path/file into a viper object. +// Expects config file to be yaml. +func Load(file, path, ext string) (*viper.Viper, error) { + + logging.Debug(logging.DStart, "CON loading %s", file) + + config := viper.New() + + //configFile := fmt.Sprintf("%s/%s.%s", path, file, ext) + + config.SetConfigName(file) + config.AddConfigPath(path) + config.SetConfigType(ext) + + // Sets env vars + config.AutomaticEnv() + + // create config directory if it doesn't exist + if err := os.MkdirAll(path, 0750); err != nil && !os.IsExist(err) { + return config, err + } + + // attempt to create an empty config incase it doesn't exist + // if err := config.SafeWriteConfigAs(configFile); err != nil { + // // if error thrown because file exists, fine to ignore + // if _, ok := err.(viper.ConfigFileAlreadyExistsError); !ok { + // return config, err + // } + // } + + if err := config.ReadInConfig(); err != nil { + fmt.Printf("read error %v\n", config) + return config, err + } + + logging.Debug(logging.DStart, "CON Loaded configs from %#V", config.ConfigFileUsed()) + + // returning config object + return config, nil +} diff --git a/internal/pkg/database/reactor.go b/internal/pkg/database/reactor.go new file mode 100644 index 0000000..84df903 --- /dev/null +++ b/internal/pkg/database/reactor.go @@ -0,0 +1,64 @@ +// package Database wraps some influx db methods to provide functionality. +package database + +import ( + "context" + "errors" + "fmt" + + influx "github.com/influxdata/influxdb-client-go/v2" + "github.com/spf13/viper" +) + +var ( + ErrDBConnection = errors.New("connection to database failed") + ErrNoURLFound = errors.New("database url not found") +) + +var db influx.Client + +// Connect takes in a config and attempts to create a client for influxdb. +// Will automatically write changes back to config for future attempts +func Connect(config *viper.Viper) error { + + url := config.GetString("db.url") + token := config.GetString("db.token") + + if url == "" { + return ErrNoURLFound + } + + db = influx.NewClient(url, token) + + if token == "" { + // try setup + fmt.Printf("attempting to setup database at %v\n", url) + + user := config.GetString("db.username") + password := config.GetString("db.password") + org := config.GetString("db.org") + bucket := config.GetString("db.bucket") + + Setup(user, pass, org, bucket + } + + db = influx.NewClient(url, token) + + return nil +} + +func Setup(user, pass, org, bucket string, ret int) (string, error) { + + resp, err := db.Setup(context.Background(), user, pass, org, bucket, ret) + + return "", nil +} + +func GetBucket(id int) (string, error) { + return "", nil +} + +func GetToken(id int) (string, error) { + // bucket, err := client.BucketsAPI().FindBucketByName(context.Background(), id) + return "", nil +} diff --git a/reactor/needs_port/core/coordinator.go b/reactor/needs_port/core/coordinator.go index 1d9dd8b..11b562c 100644 --- a/reactor/needs_port/core/coordinator.go +++ b/reactor/needs_port/core/coordinator.go @@ -22,12 +22,12 @@ import ( // I dont think I actually need this interface, package manager has a point type Manager interface { Start() error - Exit() error + Stop() error Timeout() (time.Duration, error) HeartBeat(chan struct{}, int, int, time.Duration) // creates a hb } -func NewManager(max int) Manager { +func NewManager(max int) (Manager, error) { return manager.New(max) } @@ -71,10 +71,22 @@ type ReactorCoordinator struct { Err chan error } -func NewCoordinator(config *viper.Viper, errCh chan error) *ReactorCoordinator { +func NewCoordinator( + config *viper.Viper, + errCh chan error, +) (*ReactorCoordinator, error) { - m := NewManager(6) // max 6 attempts - dc := NewDeviceCoordinator(config) + m, err := NewManager(6) // max 6 attempts + + if err != nil { + return &ReactorCoordinator{}, err + } + + dc, err := NewDeviceCoordinator(config) + + if err != nil { + return &ReactorCoordinator{}, err + } c := &ReactorCoordinator{ Manager: m, @@ -83,10 +95,7 @@ func NewCoordinator(config *viper.Viper, errCh chan error) *ReactorCoordinator { Err: errCh, } - // this is going to be scuffed - //c.DB = &DB{Bucket: "bb", Org: "ForeLight", URL: url, Token: "S1UZssBu6KPfHaQCt34pZFpyc5lzbH9XanYJWCkOI5FqLY7gq205C6FTH-CmugiPH6o2WoKlTkEuPgIfaJjAhw=="} - - return c + return c, nil } func (c *ReactorCoordinator) Start() { @@ -129,7 +138,7 @@ func (c *ReactorCoordinator) LoadConfig() error { if !c.Config.IsSet("reactor.id") { // get from hw var id int - if id, err = system.GetId("eth0"); err != nil { + if id, err = system.GetID(); err != nil { return err } c.Config.Set("reactor.id", id) diff --git a/reactor/needs_port/core/device.go b/reactor/needs_port/core/device.go index c2afc6c..76a2022 100644 --- a/reactor/needs_port/core/device.go +++ b/reactor/needs_port/core/device.go @@ -4,6 +4,7 @@ import ( "FRMS/internal/pkg/device" pb "FRMS/internal/pkg/grpc" "FRMS/internal/pkg/i2c" + "FRMS/internal/pkg/manager" "fmt" "sync" "time" @@ -16,8 +17,8 @@ import ( // device manager type DeviceManager interface { Start() error - Exit() error - IsActive() int + Stop() error + IsActive() manager.Status } func NewDeviceManager(bus, addr int, config *viper.Viper) (DeviceManager, error) { @@ -35,15 +36,16 @@ type DeviceCoordinator struct { DeviceManagers map[int]DeviceManager } -func NewDeviceCoordinator(config *viper.Viper) *DeviceCoordinator { +func NewDeviceCoordinator(config *viper.Viper) (*DeviceCoordinator, error) { dm := make(map[int]DeviceManager) - m := NewManager(0) - c := &DeviceCoordinator{ + + m, err := NewManager(0) + + return &DeviceCoordinator{ Manager: m, DeviceManagers: dm, Config: config, - } - return c + }, err } func (c *DeviceCoordinator) Start(bus int) error { @@ -83,14 +85,14 @@ func (c *DeviceCoordinator) UpdateManagers(active map[int]bool) { for addr, dm := range c.DeviceManagers { _, ok := active[addr] - if ok && dm.IsActive() == 0 { + if ok && dm.IsActive() == manager.Inactive { // active and dm not if err := dm.Start(); err != nil { panic(err) } - } else if !ok && dm.IsActive() == 1 { + } else if !ok && dm.IsActive() == manager.Active { // not active and dm is - if err := dm.Exit(); err != nil { + if err := dm.Stop(); err != nil { panic(err) } } diff --git a/reactor/needs_port/device/controller.go b/reactor/needs_port/device/controller.go index ca03bf1..feaef36 100644 --- a/reactor/needs_port/device/controller.go +++ b/reactor/needs_port/device/controller.go @@ -13,8 +13,8 @@ type ControllerManager struct { Enabled bool // turn controller on or off } -func NewControllerManager() *ControllerManager { - return &ControllerManager{} +func NewControllerManager() (*ControllerManager, error) { + return &ControllerManager{}, nil } func (c *ControllerManager) SetDeviceManager(d *DeviceManager) { diff --git a/reactor/needs_port/device/do.go b/reactor/needs_port/device/do.go index ade64e8..e64871b 100644 --- a/reactor/needs_port/device/do.go +++ b/reactor/needs_port/device/do.go @@ -14,7 +14,7 @@ type DOManager struct { sync.RWMutex } -func NewDOManager() *DOManager { +func NewDOManager() (*DOManager, error) { // atlas delays a := &Atlas{ CalDelay: 1300, @@ -23,11 +23,10 @@ func NewDOManager() *DOManager { sm := NewSensorManager() - m := &DOManager{ + return &DOManager{ Atlas: a, SensorManager: sm, - } - return m + }, nil } func (m *DOManager) Start() error { diff --git a/reactor/needs_port/device/manager.go b/reactor/needs_port/device/manager.go index 92c321e..77faaf4 100644 --- a/reactor/needs_port/device/manager.go +++ b/reactor/needs_port/device/manager.go @@ -14,12 +14,12 @@ import ( type Manager interface { // core manager Start() error - Exit() error - IsActive() int + Stop() error + IsActive() manager.Status HeartBeat(chan struct{}, int, int, time.Duration) } -func NewManager() Manager { +func NewManager() (Manager, error) { // no timeouts needed return manager.New(0) } @@ -36,21 +36,26 @@ type DeviceManager struct { Manager // config Config *viper.Viper - // gRPC server - pb.UnimplementedDeviceServer } -func NewDeviceManager(bus, addr int, config *viper.Viper, defaultName string) *DeviceManager { +func NewDeviceManager( + bus, addr int, + config *viper.Viper, + defaultName string, +) (*DeviceManager, error) { // new base dm - m := NewManager() - dm := &DeviceManager{ + m, err := NewManager() + if err != nil { + return &DeviceManager{}, err + } + + return &DeviceManager{ Address: addr, Bus: bus, defaultName: defaultName, Manager: m, Config: config, - } - return dm + }, nil } func (m *DeviceManager) LoadConfig() error { diff --git a/reactor/needs_port/device/mappings.go b/reactor/needs_port/device/mappings.go index 92c6ff7..143b27f 100644 --- a/reactor/needs_port/device/mappings.go +++ b/reactor/needs_port/device/mappings.go @@ -1,6 +1,7 @@ package device import ( + "FRMS/internal/pkg/manager" "errors" "fmt" @@ -10,8 +11,8 @@ import ( // Returns the correct manager for sensor/controller type Device interface { Start() error - Exit() error - IsActive() int + Stop() error + IsActive() manager.Status SetDeviceManager(*DeviceManager) } @@ -25,26 +26,33 @@ func New(bus, addr int, config *viper.Viper) (Device, error) { case 97: // DO defaultName = "DO Sensor" - m = NewDOManager() + m, err = NewDOManager() case 99: // pH defaultName = "pH Sensor" - m = NewPHManager() + m, err = NewPHManager() case 102: // RTD defaultName = "RTD Sensor" - m = NewRTDManager() + m, err = NewRTDManager() case 256: // PWM defaultName = "PWM Controller" - m = NewPWMManager() + m, err = NewPWMManager() default: err = errors.New(fmt.Sprintf("Error: device id %d unrecognized!", addr)) } - // setting device manager - dm := NewDeviceManager(bus, addr, config, defaultName) + + if err != nil { + return m, err + } + + dm, err := NewDeviceManager(bus, addr, config, defaultName) + if err != nil { + return m, err + } + m.SetDeviceManager(dm) - // setting up gRPC server functionality return m, err } diff --git a/reactor/needs_port/device/ph.go b/reactor/needs_port/device/ph.go index f6d8dde..f86646d 100644 --- a/reactor/needs_port/device/ph.go +++ b/reactor/needs_port/device/ph.go @@ -14,7 +14,7 @@ type PHManager struct { sync.RWMutex } -func NewPHManager() *PHManager { +func NewPHManager() (*PHManager, error) { // atlas delays a := &Atlas{ CalDelay: 900, @@ -22,11 +22,10 @@ func NewPHManager() *PHManager { } sm := NewSensorManager() - m := &PHManager{ + return &PHManager{ Atlas: a, SensorManager: sm, - } - return m + }, nil } func (m *PHManager) Start() error { diff --git a/reactor/needs_port/device/pwm.go b/reactor/needs_port/device/pwm.go index 130c17e..5840a3c 100644 --- a/reactor/needs_port/device/pwm.go +++ b/reactor/needs_port/device/pwm.go @@ -14,9 +14,12 @@ type PWMManager struct { DutyCycle int } -func NewPWMManager() *PWMManager { - cm := NewControllerManager() - return &PWMManager{ControllerManager: cm} +func NewPWMManager() (*PWMManager, error) { + cm, err := NewControllerManager() + + return &PWMManager{ + ControllerManager: cm, + }, err } // freq changing diff --git a/reactor/needs_port/device/rtd.go b/reactor/needs_port/device/rtd.go index 07ef3a4..eb82af4 100644 --- a/reactor/needs_port/device/rtd.go +++ b/reactor/needs_port/device/rtd.go @@ -13,7 +13,7 @@ type RTDManager struct { sync.RWMutex } -func NewRTDManager() *RTDManager { +func NewRTDManager() (*RTDManager, error) { // atlas delays a := &Atlas{ CalDelay: 600, @@ -21,11 +21,10 @@ func NewRTDManager() *RTDManager { } sm := NewSensorManager() - m := &RTDManager{ + return &RTDManager{ Atlas: a, SensorManager: sm, - } - return m + }, nil } func (m *RTDManager) Start() error { diff --git a/reactor/needs_port/device/sensor.go b/reactor/needs_port/device/sensor.go index 560b0da..b53393d 100644 --- a/reactor/needs_port/device/sensor.go +++ b/reactor/needs_port/device/sensor.go @@ -16,9 +16,6 @@ type SensorManager struct { SampleTimestamp int64 *DeviceManager `mapstructure:",squash"` - - // gRPC server - pb.UnimplementedSensorServer } func NewSensorManager() *SensorManager { @@ -86,7 +83,7 @@ func (s *SensorManager) Monitor(f takeReading) { fmt.Printf("Got %f\n", reading) s.sampleMu.Lock() s.LatestSample = float32(reading) - s.SampleTimestamp = time.Now.Unix() + s.SampleTimestamp = time.Now().Unix() s.sampleMu.Unlock() } } diff --git a/reactor/needs_port/i2c/bus.go b/reactor/needs_port/i2c/bus.go index 12a55d4..867168c 100644 --- a/reactor/needs_port/i2c/bus.go +++ b/reactor/needs_port/i2c/bus.go @@ -1,7 +1,9 @@ +// package i2c wraps the [i2c package] to interact with the i2c +// with devices on the bus +// +// [i2c package]: https://pkg.go.dev/periph.io/x/conn/v3/i2c#pkg-overview package i2c -// file has general wrappers to interact with i2c-tools - import ( "FRMS/internal/pkg/logging" "bytes" @@ -12,56 +14,75 @@ import ( "strings" ) +// GetConnected returns a map of each device address and its current +// connection status. func GetConnected(b int) (map[int]bool, error) { - // Returns all the connected devices by address - // might just do this in bash and make it easier + bus := strconv.Itoa(b) devices := make(map[int]bool) // only keys + cmd := exec.Command("i2cdetect", "-y", "-r", bus) + var out bytes.Buffer var errs bytes.Buffer cmd.Stderr = &errs cmd.Stdout = &out + if err := cmd.Run(); err != nil { - logging.Debug(logging.DError, "I2C error performing scan. %v", errs.String()) + + logging.Debug( + logging.DError, + "I2C scan error %v", + errs.String(), + ) + return devices, err } + // parsing the command output outString := out.String() - // could split by \n too split := strings.SplitAfter(outString, ":") - // 1st entry is garbage headers and ending is always \n##: + + // 1st entry is reserved and ending is always \n##: split = split[1:] + // create empty slice for all the devices for i, v := range split { - lst := strings.Index(v, "\n") - trimmed := v[:lst] + lastDevice := strings.Index(v, "\n") + + trimmed := v[:lastDevice] trimmed = strings.Trim(trimmed, " ") - // trimmed now holds just possible sensor addresses + count := strings.Split(trimmed, " ") + for j, d := range count { // the first row has to be offset by 3 but after its just i*16 + j - offset := 0 + offset := j if i == 0 { - offset = 3 + offset += 3 } - addr := i*16 + j + offset + + addr := i*16 + offset + if !strings.Contains(d, "--") && !strings.Contains(d, "UU") { - // active devices[addr] = true } } } + return devices, nil } +// SendCmd sends an arbitrary command string to the device at addr on i2c bus b. +// Command will be converted from a string to bytes before +// attempting to be sent. func SendCmd(b, addr int, command string) (string, error) { - // sends an arbituary commnd over specified bus to int - // might make a base script for this too + var cmd *exec.Cmd bus := strconv.Itoa(b) - operation := "r20" // default read - frmt_cmd := "" // empty cmd + // default to an empty read + operation := "r20" + frmt_cmd := "" if command != "" { // command, do write operation = fmt.Sprintf("w%d", len(command)) // write @@ -75,14 +96,19 @@ func SendCmd(b, addr int, command string) (string, error) { // reading cmd = exec.Command("i2ctransfer", "-y", bus, fmt.Sprintf("%s@0x%x", operation, addr)) } - // exec command + + // execute command var out bytes.Buffer var errs bytes.Buffer cmd.Stderr = &errs cmd.Stdout = &out + if err := cmd.Run(); err != nil { - logging.Debug(logging.DError, "I2C error getting data! %v", err) + + logging.Debug(logging.DError, "I2C command error %v", err) + return "", err } + return out.String(), nil } diff --git a/reactor/needs_port/i2c/i2c.go b/reactor/needs_port/i2c/i2c.go new file mode 100644 index 0000000..867168c --- /dev/null +++ b/reactor/needs_port/i2c/i2c.go @@ -0,0 +1,114 @@ +// package i2c wraps the [i2c package] to interact with the i2c +// with devices on the bus +// +// [i2c package]: https://pkg.go.dev/periph.io/x/conn/v3/i2c#pkg-overview +package i2c + +import ( + "FRMS/internal/pkg/logging" + "bytes" + "fmt" + _ "log" + "os/exec" + "strconv" + "strings" +) + +// GetConnected returns a map of each device address and its current +// connection status. +func GetConnected(b int) (map[int]bool, error) { + + bus := strconv.Itoa(b) + devices := make(map[int]bool) // only keys + + cmd := exec.Command("i2cdetect", "-y", "-r", bus) + + var out bytes.Buffer + var errs bytes.Buffer + cmd.Stderr = &errs + cmd.Stdout = &out + + if err := cmd.Run(); err != nil { + + logging.Debug( + logging.DError, + "I2C scan error %v", + errs.String(), + ) + + return devices, err + } + + // parsing the command output + outString := out.String() + split := strings.SplitAfter(outString, ":") + + // 1st entry is reserved and ending is always \n##: + split = split[1:] + + // create empty slice for all the devices + for i, v := range split { + lastDevice := strings.Index(v, "\n") + + trimmed := v[:lastDevice] + trimmed = strings.Trim(trimmed, " ") + + count := strings.Split(trimmed, " ") + + for j, d := range count { + // the first row has to be offset by 3 but after its just i*16 + j + offset := j + if i == 0 { + offset += 3 + } + + addr := i*16 + offset + + if !strings.Contains(d, "--") && !strings.Contains(d, "UU") { + devices[addr] = true + } + } + } + + return devices, nil +} + +// SendCmd sends an arbitrary command string to the device at addr on i2c bus b. +// Command will be converted from a string to bytes before +// attempting to be sent. +func SendCmd(b, addr int, command string) (string, error) { + + var cmd *exec.Cmd + bus := strconv.Itoa(b) + // default to an empty read + operation := "r20" + frmt_cmd := "" + if command != "" { + // command, do write + operation = fmt.Sprintf("w%d", len(command)) // write + // formatting cmd + for _, char := range command { + // loop over string + frmt_cmd += fmt.Sprintf("0x%x", char) + } + cmd = exec.Command("i2ctransfer", "-y", bus, fmt.Sprintf("%s@0x%x", operation, addr), frmt_cmd) + } else { + // reading + cmd = exec.Command("i2ctransfer", "-y", bus, fmt.Sprintf("%s@0x%x", operation, addr)) + } + + // execute command + var out bytes.Buffer + var errs bytes.Buffer + cmd.Stderr = &errs + cmd.Stdout = &out + + if err := cmd.Run(); err != nil { + + logging.Debug(logging.DError, "I2C command error %v", err) + + return "", err + } + + return out.String(), nil +} diff --git a/reactor/needs_port/system/hwinfo.go b/reactor/needs_port/system/hwinfo.go index 08715b7..9e10693 100644 --- a/reactor/needs_port/system/hwinfo.go +++ b/reactor/needs_port/system/hwinfo.go @@ -1,118 +1,149 @@ +<<<<<<< HEAD:reactor/needs_port/system/hwinfo.go // package system uses linux commands to get hardware info for identifation +======= +// package system uses linux ip command to get hardware info from devices +>>>>>>> origin/wrapup:internal/pkg/system/hwinfo.go package system import ( "bytes" + "encoding/json" "errors" - "fmt" "hash/fnv" - "net" "os/exec" "strings" ) -func GetId() (int, error) { - // gets the mac address and hashes into consistent id - maccmd := fmt.Sprintf("ifconfig %v | awk '/ether / {print $2}'", et) - var stderr bytes.Buffer - var out bytes.Buffer - cmd := exec.Command("bash", "-c", maccmd) - cmd.Stdout = &out - cmd.Stderr = &stderr - if err := cmd.Run(); err != nil { - return 0, err - } - hash := fnv.New32a() - hash.Write(out.Bytes()) - id := hash.Sum32() - return int(id), nil +var ( + ErrBusNotFound = errors.New("bus not found for device") + ErrNoNetworkInterface = errors.New("no default network found") +) + +var HardwareInfo = &hardwareInfo{} + +type hardwareInfo struct { + MAC string + IP string + ID int + Model string +} + +// NetInterfaces is a struct to unmarshal the ip command json. +type netInterfaces []network + +// Networks holds relevant information for each network interface. +// Used to unmarshal ip command json. +type network struct { + Subnets []netInfo `json:"addr_info"` + Mac string `json:"address"` + Group string `json:"group"` + State string `json:"operstate"` +} + +type netInfo struct { + Family string `json:"family"` + Ip string `json:"local"` } -func GetIp() (string, error) { - ipcmd := "ip route get 1 | sed 's/^.*src \([^ ]*\).*$/\1/;q'" - var stderr bytes.Buffer - var out bytes.Buffer - cmd := exec.Command("bash", "-c", ipcmd) - cmd.Stdout = &out +func getInfo() error { + + var stderr, stdout bytes.Buffer + cmd := exec.Command("ip", "-j", "a") + cmd.Stdout = &stdout cmd.Stderr = &stderr + if err := cmd.Run(); err != nil { - return "", err + return err } - ip := strings.Trim(out.String(), " \n") - return ip, nil -} + var networks netInterfaces -func GetPort() (int, error) { - // obsolete - if addr, err := net.ResolveTCPAddr("tcp", ":0"); err != nil { - return 0, err - } else if lis, err := net.ListenTCP("tcp", addr); err != nil { - return 0, err - } else { - defer lis.Close() - return lis.Addr().(*net.TCPAddr).Port, nil + if err := json.Unmarshal(stdout.Bytes(), &networks); err != nil { + return err } + + // loop over found networks finding first default, IPv4 network currently + // UP (active). + // Eventually need to look only at wg interface which simplifies + // implementation. + + for _, network := range networks { + if network.Group == "default" && network.State == "UP" { + + for _, subnet := range network.Subnets { + if subnet.Family == "inet" { + + hash := fnv.New32a() + hash.Write([]byte(network.Mac)) + id := hash.Sum32() + + HardwareInfo.MAC = network.Mac + HardwareInfo.IP = subnet.Ip + HardwareInfo.ID = int(id) + + return nil + } + } + } + } + + return nil } -func GetBus() (int, error) { - // preset busses - busList := map[string]int{"raspberrypi": 1, "beaglebone": 2} - // vars - var bus int - var ok bool +func GetID() (int, error) { - if name, err =: GetModel(); err != nil { - return bus, err - } else if bus, ok = busList[b]; !ok { - return 0, errors.New(fmt.Sprintf("No bus for dev %s", b)) + if HardwareInfo.ID == 0 { + if err := getInfo(); err != nil { + return 0, err + } } - // returns correct bus - return bus, nil + return HardwareInfo.ID, nil } -func GetModel() (string, error) { - var stderr, out bytes.Buffer - cmd := exec.Command("bash", "-c", "hostname") - cmd.Stdout = &out - cmd.Stderr = &stderr - if err := cmd.Run(); err != nil { - return "", err +func GetIP() (string, error) { + + if HardwareInfo.IP == "" { + if err := getInfo(); err != nil { + return "", err + } } - b := out.String() - b = strings.Trim(b, " \n") - return b, nil + + return HardwareInfo.IP, nil } -func Get() error { - // responsible for filling out struct - //bus := map[string]int{"raspberrypi":1,"beaglebone":2} // eventually will replace this with a config file +func GetModel() (string, error) { - ipcmd := "ifconfig eth0 | awk '/inet / {print $2}'" - maccmd := "ifconfig eth0 | awk '/ether / {print $2}'" - devcmd := "lshw -C system 2>/dev/null | head -n 1" + if HardwareInfo.Model == "" { - res := [3]bytes.Buffer{} - var stderr bytes.Buffer - cmds := []string{ipcmd, maccmd, devcmd} - for i, c := range cmds { - cmd := exec.Command("bash", "-c", c) - cmd.Stdout = &res[i] + var stderr, out bytes.Buffer + cmd := exec.Command("uname", "-n") + cmd.Stdout = &out cmd.Stderr = &stderr - err := cmd.Run() - if err != nil { - return err + + if err := cmd.Run(); err != nil { + return "", err } + + b := out.String() + HardwareInfo.Model = strings.Trim(b, " \n") } - // formatting - ip := res[0].String() - ip = strings.Trim(ip, " \n") - hash := fnv.New32a() - hash.Write(res[1].Bytes()) + return HardwareInfo.Model, nil +} - b := res[2].String() - b = strings.Trim(b, " \n") - return nil +func GetBus() (int, error) { + // preset busses + busList := map[string]int{"raspberrypi": 1, "beaglebone": 2} + // vars + var bus int + var ok bool + + if name, err := GetModel(); err != nil { + return bus, err + } else if bus, ok = busList[name]; !ok { + return 0, ErrBusNotFound + } + + return bus, nil } diff --git a/reactor/needs_port/system/hwinfo_test.go b/reactor/needs_port/system/hwinfo_test.go new file mode 100644 index 0000000..c06a6ca --- /dev/null +++ b/reactor/needs_port/system/hwinfo_test.go @@ -0,0 +1,41 @@ +package system + +import ( + "testing" +) + +// TestGetInfo ensures that the array can be populated with device info. +func TestGetInfo(t *testing.T) { + if err := getInfo(); err != nil { + t.Fatalf(`getInfo() failed %v`, err) + } +} + +// TestGetIP tests that the IP is returned without error and not empty. +func TestGetIP(t *testing.T) { + if ip, err := GetIP(); err != nil || ip == "" { + t.Fatalf(`GetIP() failed, got %s, err: %v`, ip, err) + } +} + +// TestGetID tests that the ID is returned without error and not empty. +func TestGetID(t *testing.T) { + if id, err := GetID(); err != nil || id == 0 { + t.Fatalf(`GetID() failed, got %d %v`, id, err) + } +} + +// TestGetModel tests that the Model is returned without error and not empty. +func TestGetModel(t *testing.T) { + if model, err := GetModel(); err != nil || model == "" { + t.Fatalf(`GetModel() failed, got %s %v`, model, err) + } +} + +// TestGetModel tests that the correct error is thrown as the bus does not exist on the test rig. +func TestGetBus(t *testing.T) { + if bus, err := GetBus(); err != ErrBusNotFound { + t.Fatalf(`GetBus() should fail, got %d %v`, bus, err) + } + +} diff --git a/server/go.mod b/server/go.mod index 8782978..7e05d41 100644 --- a/server/go.mod +++ b/server/go.mod @@ -3,14 +3,19 @@ module FRMS go 1.18 require ( +<<<<<<< HEAD:server/go.mod github.com/gorilla/websocket v1.5.0 +======= +>>>>>>> origin/wrapup:go.mod github.com/influxdata/influxdb-client-go/v2 v2.9.1 github.com/spf13/viper v1.12.0 + github.com/stretchr/testify v1.7.1 google.golang.org/grpc v1.47.0 google.golang.org/protobuf v1.28.0 ) require ( + github.com/davecgh/go-spew v1.1.1 // indirect github.com/deepmap/oapi-codegen v1.8.2 // indirect github.com/fsnotify/fsnotify v1.5.4 // indirect github.com/golang/protobuf v1.5.2 // indirect @@ -21,14 +26,24 @@ require ( github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.0.1 // indirect github.com/pkg/errors v0.9.1 // indirect +<<<<<<< HEAD:server/go.mod +======= + github.com/pmezard/go-difflib v1.0.0 // indirect +>>>>>>> origin/wrapup:go.mod github.com/spf13/afero v1.8.2 // indirect github.com/spf13/cast v1.5.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.3.0 // indirect +<<<<<<< HEAD:server/go.mod golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b // indirect golang.org/x/text v0.3.7 // indirect +======= + golang.org/x/net v0.10.0 // indirect + golang.org/x/sys v0.8.0 // indirect + golang.org/x/text v0.9.0 // indirect +>>>>>>> origin/wrapup:go.mod google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd // indirect gopkg.in/ini.v1 v1.66.4 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/server/go.sum b/server/go.sum index 63ed049..2e874c9 100644 --- a/server/go.sum +++ b/server/go.sum @@ -139,8 +139,6 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= -github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= -github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -299,8 +297,8 @@ golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 h1:NWy5+hlRbC7HK+PmcXVUmW1IMyFce7to56IUvhUFm7Y= -golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -363,8 +361,8 @@ golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b h1:2n253B2r0pYSmEV+UNCQoPfU/FiaizQEK5Gu4Bq4JE8= -golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -374,8 +372,13 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +<<<<<<< HEAD:server/go.sum golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +======= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +>>>>>>> origin/wrapup:go.sum golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/server/internal/pkg/grpc/handshake.pb.go b/server/internal/pkg/grpc/handshake.pb.go new file mode 100644 index 0000000..3444896 --- /dev/null +++ b/server/internal/pkg/grpc/handshake.pb.go @@ -0,0 +1,260 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.12 +// source: internal/pkg/grpc/handshake.proto + +package grpc + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type ReactorClientRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Port uint32 `protobuf:"varint,2,opt,name=port,proto3" json:"port,omitempty"` // client gRPC port +} + +func (x *ReactorClientRequest) Reset() { + *x = ReactorClientRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReactorClientRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReactorClientRequest) ProtoMessage() {} + +func (x *ReactorClientRequest) ProtoReflect() protoreflect.Message { + mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReactorClientRequest.ProtoReflect.Descriptor instead. +func (*ReactorClientRequest) Descriptor() ([]byte, []int) { + return file_internal_pkg_grpc_handshake_proto_rawDescGZIP(), []int{0} +} + +func (x *ReactorClientRequest) GetId() uint32 { + if x != nil { + return x.Id + } + return 0 +} + +func (x *ReactorClientRequest) GetPort() uint32 { + if x != nil { + return x.Port + } + return 0 +} + +type ReactorClientResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Url string `protobuf:"bytes,2,opt,name=url,proto3" json:"url,omitempty"` + Org string `protobuf:"bytes,3,opt,name=org,proto3" json:"org,omitempty"` + Token string `protobuf:"bytes,4,opt,name=token,proto3" json:"token,omitempty"` + Bucket string `protobuf:"bytes,5,opt,name=bucket,proto3" json:"bucket,omitempty"` +} + +func (x *ReactorClientResponse) Reset() { + *x = ReactorClientResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReactorClientResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReactorClientResponse) ProtoMessage() {} + +func (x *ReactorClientResponse) ProtoReflect() protoreflect.Message { + mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReactorClientResponse.ProtoReflect.Descriptor instead. +func (*ReactorClientResponse) Descriptor() ([]byte, []int) { + return file_internal_pkg_grpc_handshake_proto_rawDescGZIP(), []int{1} +} + +func (x *ReactorClientResponse) GetId() uint32 { + if x != nil { + return x.Id + } + return 0 +} + +func (x *ReactorClientResponse) GetUrl() string { + if x != nil { + return x.Url + } + return "" +} + +func (x *ReactorClientResponse) GetOrg() string { + if x != nil { + return x.Org + } + return "" +} + +func (x *ReactorClientResponse) GetToken() string { + if x != nil { + return x.Token + } + return "" +} + +func (x *ReactorClientResponse) GetBucket() string { + if x != nil { + return x.Bucket + } + return "" +} + +var File_internal_pkg_grpc_handshake_proto protoreflect.FileDescriptor + +var file_internal_pkg_grpc_handshake_proto_rawDesc = []byte{ + 0x0a, 0x21, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, + 0x72, 0x70, 0x63, 0x2f, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0x3a, 0x0a, 0x14, 0x52, 0x65, 0x61, + 0x63, 0x74, 0x6f, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, + 0x64, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, + 0x04, 0x70, 0x6f, 0x72, 0x74, 0x22, 0x79, 0x0a, 0x15, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, + 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, + 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x10, + 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, + 0x12, 0x10, 0x0a, 0x03, 0x6f, 0x72, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6f, + 0x72, 0x67, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x75, 0x63, 0x6b, + 0x65, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, + 0x32, 0x5c, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x4f, 0x0a, + 0x14, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x48, 0x61, + 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x1a, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, + 0x63, 0x74, 0x6f, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x1b, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, + 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x13, + 0x5a, 0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, + 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_internal_pkg_grpc_handshake_proto_rawDescOnce sync.Once + file_internal_pkg_grpc_handshake_proto_rawDescData = file_internal_pkg_grpc_handshake_proto_rawDesc +) + +func file_internal_pkg_grpc_handshake_proto_rawDescGZIP() []byte { + file_internal_pkg_grpc_handshake_proto_rawDescOnce.Do(func() { + file_internal_pkg_grpc_handshake_proto_rawDescData = protoimpl.X.CompressGZIP(file_internal_pkg_grpc_handshake_proto_rawDescData) + }) + return file_internal_pkg_grpc_handshake_proto_rawDescData +} + +var file_internal_pkg_grpc_handshake_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_internal_pkg_grpc_handshake_proto_goTypes = []interface{}{ + (*ReactorClientRequest)(nil), // 0: grpc.ReactorClientRequest + (*ReactorClientResponse)(nil), // 1: grpc.ReactorClientResponse +} +var file_internal_pkg_grpc_handshake_proto_depIdxs = []int32{ + 0, // 0: grpc.handshake.ReactorClientHandler:input_type -> grpc.ReactorClientRequest + 1, // 1: grpc.handshake.ReactorClientHandler:output_type -> grpc.ReactorClientResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_internal_pkg_grpc_handshake_proto_init() } +func file_internal_pkg_grpc_handshake_proto_init() { + if File_internal_pkg_grpc_handshake_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_internal_pkg_grpc_handshake_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReactorClientRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_pkg_grpc_handshake_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReactorClientResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_internal_pkg_grpc_handshake_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_internal_pkg_grpc_handshake_proto_goTypes, + DependencyIndexes: file_internal_pkg_grpc_handshake_proto_depIdxs, + MessageInfos: file_internal_pkg_grpc_handshake_proto_msgTypes, + }.Build() + File_internal_pkg_grpc_handshake_proto = out.File + file_internal_pkg_grpc_handshake_proto_rawDesc = nil + file_internal_pkg_grpc_handshake_proto_goTypes = nil + file_internal_pkg_grpc_handshake_proto_depIdxs = nil +} diff --git a/server/internal/pkg/grpc/handshake.proto b/server/internal/pkg/grpc/handshake.proto new file mode 100644 index 0000000..716477e --- /dev/null +++ b/server/internal/pkg/grpc/handshake.proto @@ -0,0 +1,21 @@ +syntax = "proto3"; +package grpc; + +option go_package = "internal/pkg/grpc"; + +service handshake { + rpc ReactorClientHandler(ReactorClientRequest) returns (ReactorClientResponse); +} + +message ReactorClientRequest { + uint32 id = 1; + uint32 port = 2; // client gRPC port +} + +message ReactorClientResponse { + uint32 id = 1; + string url = 2; + string org = 3; + string token = 4; + string bucket = 5; +} diff --git a/server/internal/pkg/grpc/handshake_grpc.pb.go b/server/internal/pkg/grpc/handshake_grpc.pb.go new file mode 100644 index 0000000..a78492f --- /dev/null +++ b/server/internal/pkg/grpc/handshake_grpc.pb.go @@ -0,0 +1,105 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.21.12 +// source: internal/pkg/grpc/handshake.proto + +package grpc + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// HandshakeClient is the client API for Handshake service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type HandshakeClient interface { + ReactorClientHandler(ctx context.Context, in *ReactorClientRequest, opts ...grpc.CallOption) (*ReactorClientResponse, error) +} + +type handshakeClient struct { + cc grpc.ClientConnInterface +} + +func NewHandshakeClient(cc grpc.ClientConnInterface) HandshakeClient { + return &handshakeClient{cc} +} + +func (c *handshakeClient) ReactorClientHandler(ctx context.Context, in *ReactorClientRequest, opts ...grpc.CallOption) (*ReactorClientResponse, error) { + out := new(ReactorClientResponse) + err := c.cc.Invoke(ctx, "/grpc.handshake/ReactorClientHandler", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// HandshakeServer is the server API for Handshake service. +// All implementations must embed UnimplementedHandshakeServer +// for forward compatibility +type HandshakeServer interface { + ReactorClientHandler(context.Context, *ReactorClientRequest) (*ReactorClientResponse, error) + mustEmbedUnimplementedHandshakeServer() +} + +// UnimplementedHandshakeServer must be embedded to have forward compatible implementations. +type UnimplementedHandshakeServer struct { +} + +func (UnimplementedHandshakeServer) ReactorClientHandler(context.Context, *ReactorClientRequest) (*ReactorClientResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ReactorClientHandler not implemented") +} +func (UnimplementedHandshakeServer) mustEmbedUnimplementedHandshakeServer() {} + +// UnsafeHandshakeServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to HandshakeServer will +// result in compilation errors. +type UnsafeHandshakeServer interface { + mustEmbedUnimplementedHandshakeServer() +} + +func RegisterHandshakeServer(s grpc.ServiceRegistrar, srv HandshakeServer) { + s.RegisterService(&Handshake_ServiceDesc, srv) +} + +func _Handshake_ReactorClientHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ReactorClientRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HandshakeServer).ReactorClientHandler(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/grpc.handshake/ReactorClientHandler", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HandshakeServer).ReactorClientHandler(ctx, req.(*ReactorClientRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// Handshake_ServiceDesc is the grpc.ServiceDesc for Handshake service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Handshake_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "grpc.handshake", + HandlerType: (*HandshakeServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "ReactorClientHandler", + Handler: _Handshake_ReactorClientHandler_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "internal/pkg/grpc/handshake.proto", +} diff --git a/server/internal/pkg/grpc/monitoring.pb.go b/server/internal/pkg/grpc/monitoring.pb.go index f47a538..59cd1d3 100644 --- a/server/internal/pkg/grpc/monitoring.pb.go +++ b/server/internal/pkg/grpc/monitoring.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.0 -// protoc v3.12.4 +// protoc-gen-go v1.28.1 +// protoc v3.21.12 // source: internal/pkg/grpc/monitoring.proto package grpc @@ -20,56 +20,7 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -type Status int32 - -const ( - Status_DEAD Status = 0 - Status_ALIVE Status = 1 - Status_UNKOWN Status = 2 -) - -// Enum value maps for Status. -var ( - Status_name = map[int32]string{ - 0: "DEAD", - 1: "ALIVE", - 2: "UNKOWN", - } - Status_value = map[string]int32{ - "DEAD": 0, - "ALIVE": 1, - "UNKOWN": 2, - } -) - -func (x Status) Enum() *Status { - p := new(Status) - *p = x - return p -} - -func (x Status) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (Status) Descriptor() protoreflect.EnumDescriptor { - return file_internal_pkg_grpc_monitoring_proto_enumTypes[0].Descriptor() -} - -func (Status) Type() protoreflect.EnumType { - return &file_internal_pkg_grpc_monitoring_proto_enumTypes[0] -} - -func (x Status) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Use Status.Descriptor instead. -func (Status) EnumDescriptor() ([]byte, []int) { - return file_internal_pkg_grpc_monitoring_proto_rawDescGZIP(), []int{0} -} - -type ReactorStatusResponse struct { +type ReactorAck struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields @@ -77,8 +28,8 @@ type ReactorStatusResponse struct { Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` } -func (x *ReactorStatusResponse) Reset() { - *x = ReactorStatusResponse{} +func (x *ReactorAck) Reset() { + *x = ReactorAck{} if protoimpl.UnsafeEnabled { mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -86,13 +37,13 @@ func (x *ReactorStatusResponse) Reset() { } } -func (x *ReactorStatusResponse) String() string { +func (x *ReactorAck) String() string { return protoimpl.X.MessageStringOf(x) } -func (*ReactorStatusResponse) ProtoMessage() {} +func (*ReactorAck) ProtoMessage() {} -func (x *ReactorStatusResponse) ProtoReflect() protoreflect.Message { +func (x *ReactorAck) ProtoReflect() protoreflect.Message { mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -104,30 +55,28 @@ func (x *ReactorStatusResponse) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use ReactorStatusResponse.ProtoReflect.Descriptor instead. -func (*ReactorStatusResponse) Descriptor() ([]byte, []int) { +// Deprecated: Use ReactorAck.ProtoReflect.Descriptor instead. +func (*ReactorAck) Descriptor() ([]byte, []int) { return file_internal_pkg_grpc_monitoring_proto_rawDescGZIP(), []int{0} } -func (x *ReactorStatusResponse) GetId() int32 { +func (x *ReactorAck) GetId() int32 { if x != nil { return x.Id } return 0 } -type ReactorStatusPing struct { +type ReactorPing struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` - // new devices - Devices []*Device `protobuf:"bytes,2,rep,name=devices,proto3" json:"devices,omitempty"` } -func (x *ReactorStatusPing) Reset() { - *x = ReactorStatusPing{} +func (x *ReactorPing) Reset() { + *x = ReactorPing{} if protoimpl.UnsafeEnabled { mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -135,13 +84,13 @@ func (x *ReactorStatusPing) Reset() { } } -func (x *ReactorStatusPing) String() string { +func (x *ReactorPing) String() string { return protoimpl.X.MessageStringOf(x) } -func (*ReactorStatusPing) ProtoMessage() {} +func (*ReactorPing) ProtoMessage() {} -func (x *ReactorStatusPing) ProtoReflect() protoreflect.Message { +func (x *ReactorPing) ProtoReflect() protoreflect.Message { mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -153,108 +102,34 @@ func (x *ReactorStatusPing) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use ReactorStatusPing.ProtoReflect.Descriptor instead. -func (*ReactorStatusPing) Descriptor() ([]byte, []int) { +// Deprecated: Use ReactorPing.ProtoReflect.Descriptor instead. +func (*ReactorPing) Descriptor() ([]byte, []int) { return file_internal_pkg_grpc_monitoring_proto_rawDescGZIP(), []int{1} } -func (x *ReactorStatusPing) GetId() int32 { +func (x *ReactorPing) GetId() int32 { if x != nil { return x.Id } return 0 } -func (x *ReactorStatusPing) GetDevices() []*Device { - if x != nil { - return x.Devices - } - return nil -} - -type Device struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Addr int32 `protobuf:"varint,1,opt,name=addr,proto3" json:"addr,omitempty"` // i2c addr - Status Status `protobuf:"varint,2,opt,name=status,proto3,enum=grpc.Status" json:"status,omitempty"` // most recent status -} - -func (x *Device) Reset() { - *x = Device{} - if protoimpl.UnsafeEnabled { - mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Device) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Device) ProtoMessage() {} - -func (x *Device) ProtoReflect() protoreflect.Message { - mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Device.ProtoReflect.Descriptor instead. -func (*Device) Descriptor() ([]byte, []int) { - return file_internal_pkg_grpc_monitoring_proto_rawDescGZIP(), []int{2} -} - -func (x *Device) GetAddr() int32 { - if x != nil { - return x.Addr - } - return 0 -} - -func (x *Device) GetStatus() Status { - if x != nil { - return x.Status - } - return Status_DEAD -} - var File_internal_pkg_grpc_monitoring_proto protoreflect.FileDescriptor var file_internal_pkg_grpc_monitoring_proto_rawDesc = []byte{ 0x0a, 0x22, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0x27, 0x0a, 0x15, 0x52, 0x65, - 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, - 0x02, 0x69, 0x64, 0x22, 0x4b, 0x0a, 0x11, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, - 0x61, 0x74, 0x75, 0x73, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x05, 0x52, 0x02, 0x69, 0x64, 0x12, 0x26, 0x0a, 0x07, 0x64, 0x65, 0x76, 0x69, - 0x63, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x67, 0x72, 0x70, 0x63, - 0x2e, 0x44, 0x65, 0x76, 0x69, 0x63, 0x65, 0x52, 0x07, 0x64, 0x65, 0x76, 0x69, 0x63, 0x65, 0x73, - 0x22, 0x42, 0x0a, 0x06, 0x44, 0x65, 0x76, 0x69, 0x63, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x61, 0x64, - 0x64, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x61, 0x64, 0x64, 0x72, 0x12, 0x24, - 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0c, - 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, - 0x61, 0x74, 0x75, 0x73, 0x2a, 0x29, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x08, - 0x0a, 0x04, 0x44, 0x45, 0x41, 0x44, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x41, 0x4c, 0x49, 0x56, - 0x45, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x55, 0x4e, 0x4b, 0x4f, 0x57, 0x4e, 0x10, 0x02, 0x32, - 0x5a, 0x0a, 0x0a, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x4c, 0x0a, - 0x14, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x48, 0x61, - 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x17, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, - 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x50, 0x69, 0x6e, 0x67, 0x1a, 0x1b, - 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x13, 0x5a, 0x11, 0x69, - 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0x1c, 0x0a, 0x0a, 0x52, 0x65, + 0x61, 0x63, 0x74, 0x6f, 0x72, 0x41, 0x63, 0x6b, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x05, 0x52, 0x02, 0x69, 0x64, 0x22, 0x1d, 0x0a, 0x0b, 0x52, 0x65, 0x61, 0x63, + 0x74, 0x6f, 0x72, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x05, 0x52, 0x02, 0x69, 0x64, 0x32, 0x49, 0x0a, 0x0a, 0x6d, 0x6f, 0x6e, 0x69, 0x74, + 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x3b, 0x0a, 0x12, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, + 0x50, 0x69, 0x6e, 0x67, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x11, 0x2e, 0x67, 0x72, + 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x50, 0x69, 0x6e, 0x67, 0x1a, 0x10, + 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x41, 0x63, 0x6b, + 0x28, 0x01, 0x42, 0x13, 0x5a, 0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, + 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -269,24 +144,19 @@ func file_internal_pkg_grpc_monitoring_proto_rawDescGZIP() []byte { return file_internal_pkg_grpc_monitoring_proto_rawDescData } -var file_internal_pkg_grpc_monitoring_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_internal_pkg_grpc_monitoring_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_internal_pkg_grpc_monitoring_proto_msgTypes = make([]protoimpl.MessageInfo, 2) var file_internal_pkg_grpc_monitoring_proto_goTypes = []interface{}{ - (Status)(0), // 0: grpc.Status - (*ReactorStatusResponse)(nil), // 1: grpc.ReactorStatusResponse - (*ReactorStatusPing)(nil), // 2: grpc.ReactorStatusPing - (*Device)(nil), // 3: grpc.Device + (*ReactorAck)(nil), // 0: grpc.ReactorAck + (*ReactorPing)(nil), // 1: grpc.ReactorPing } var file_internal_pkg_grpc_monitoring_proto_depIdxs = []int32{ - 3, // 0: grpc.ReactorStatusPing.devices:type_name -> grpc.Device - 0, // 1: grpc.Device.status:type_name -> grpc.Status - 2, // 2: grpc.monitoring.ReactorStatusHandler:input_type -> grpc.ReactorStatusPing - 1, // 3: grpc.monitoring.ReactorStatusHandler:output_type -> grpc.ReactorStatusResponse - 3, // [3:4] is the sub-list for method output_type - 2, // [2:3] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 1, // 0: grpc.monitoring.ReactorPingHandler:input_type -> grpc.ReactorPing + 0, // 1: grpc.monitoring.ReactorPingHandler:output_type -> grpc.ReactorAck + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name } func init() { file_internal_pkg_grpc_monitoring_proto_init() } @@ -296,7 +166,7 @@ func file_internal_pkg_grpc_monitoring_proto_init() { } if !protoimpl.UnsafeEnabled { file_internal_pkg_grpc_monitoring_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ReactorStatusResponse); i { + switch v := v.(*ReactorAck); i { case 0: return &v.state case 1: @@ -308,19 +178,7 @@ func file_internal_pkg_grpc_monitoring_proto_init() { } } file_internal_pkg_grpc_monitoring_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ReactorStatusPing); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_internal_pkg_grpc_monitoring_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Device); i { + switch v := v.(*ReactorPing); i { case 0: return &v.state case 1: @@ -337,14 +195,13 @@ func file_internal_pkg_grpc_monitoring_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_internal_pkg_grpc_monitoring_proto_rawDesc, - NumEnums: 1, - NumMessages: 3, + NumEnums: 0, + NumMessages: 2, NumExtensions: 0, NumServices: 1, }, GoTypes: file_internal_pkg_grpc_monitoring_proto_goTypes, DependencyIndexes: file_internal_pkg_grpc_monitoring_proto_depIdxs, - EnumInfos: file_internal_pkg_grpc_monitoring_proto_enumTypes, MessageInfos: file_internal_pkg_grpc_monitoring_proto_msgTypes, }.Build() File_internal_pkg_grpc_monitoring_proto = out.File diff --git a/server/internal/pkg/grpc/monitoring.proto b/server/internal/pkg/grpc/monitoring.proto index 128d9ad..8f307b2 100644 --- a/server/internal/pkg/grpc/monitoring.proto +++ b/server/internal/pkg/grpc/monitoring.proto @@ -4,26 +4,13 @@ package grpc; option go_package = "internal/pkg/grpc"; service monitoring { - rpc ReactorStatusHandler(ReactorStatusPing) returns (ReactorStatusResponse); + rpc ReactorPingHandler(stream ReactorPing) returns (ReactorAck); } -message ReactorStatusResponse { +message ReactorAck { int32 id = 1; } -message ReactorStatusPing { +message ReactorPing { int32 id = 1; - // new devices - repeated Device devices = 2; -} - -enum Status { - DEAD = 0; - ALIVE = 1; - UNKOWN = 2; -} - -message Device { - int32 addr = 1; // i2c addr - Status status = 2; // most recent status } diff --git a/server/internal/pkg/grpc/monitoring_grpc.pb.go b/server/internal/pkg/grpc/monitoring_grpc.pb.go index cc3461b..75a39d6 100644 --- a/server/internal/pkg/grpc/monitoring_grpc.pb.go +++ b/server/internal/pkg/grpc/monitoring_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v3.12.4 +// - protoc v3.21.12 // source: internal/pkg/grpc/monitoring.proto package grpc @@ -22,7 +22,7 @@ const _ = grpc.SupportPackageIsVersion7 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type MonitoringClient interface { - ReactorStatusHandler(ctx context.Context, in *ReactorStatusPing, opts ...grpc.CallOption) (*ReactorStatusResponse, error) + ReactorPingHandler(ctx context.Context, opts ...grpc.CallOption) (Monitoring_ReactorPingHandlerClient, error) } type monitoringClient struct { @@ -33,20 +33,45 @@ func NewMonitoringClient(cc grpc.ClientConnInterface) MonitoringClient { return &monitoringClient{cc} } -func (c *monitoringClient) ReactorStatusHandler(ctx context.Context, in *ReactorStatusPing, opts ...grpc.CallOption) (*ReactorStatusResponse, error) { - out := new(ReactorStatusResponse) - err := c.cc.Invoke(ctx, "/grpc.monitoring/ReactorStatusHandler", in, out, opts...) +func (c *monitoringClient) ReactorPingHandler(ctx context.Context, opts ...grpc.CallOption) (Monitoring_ReactorPingHandlerClient, error) { + stream, err := c.cc.NewStream(ctx, &Monitoring_ServiceDesc.Streams[0], "/grpc.monitoring/ReactorPingHandler", opts...) if err != nil { return nil, err } - return out, nil + x := &monitoringReactorPingHandlerClient{stream} + return x, nil +} + +type Monitoring_ReactorPingHandlerClient interface { + Send(*ReactorPing) error + CloseAndRecv() (*ReactorAck, error) + grpc.ClientStream +} + +type monitoringReactorPingHandlerClient struct { + grpc.ClientStream +} + +func (x *monitoringReactorPingHandlerClient) Send(m *ReactorPing) error { + return x.ClientStream.SendMsg(m) +} + +func (x *monitoringReactorPingHandlerClient) CloseAndRecv() (*ReactorAck, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(ReactorAck) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil } // MonitoringServer is the server API for Monitoring service. // All implementations must embed UnimplementedMonitoringServer // for forward compatibility type MonitoringServer interface { - ReactorStatusHandler(context.Context, *ReactorStatusPing) (*ReactorStatusResponse, error) + ReactorPingHandler(Monitoring_ReactorPingHandlerServer) error mustEmbedUnimplementedMonitoringServer() } @@ -54,8 +79,8 @@ type MonitoringServer interface { type UnimplementedMonitoringServer struct { } -func (UnimplementedMonitoringServer) ReactorStatusHandler(context.Context, *ReactorStatusPing) (*ReactorStatusResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method ReactorStatusHandler not implemented") +func (UnimplementedMonitoringServer) ReactorPingHandler(Monitoring_ReactorPingHandlerServer) error { + return status.Errorf(codes.Unimplemented, "method ReactorPingHandler not implemented") } func (UnimplementedMonitoringServer) mustEmbedUnimplementedMonitoringServer() {} @@ -70,22 +95,30 @@ func RegisterMonitoringServer(s grpc.ServiceRegistrar, srv MonitoringServer) { s.RegisterService(&Monitoring_ServiceDesc, srv) } -func _Monitoring_ReactorStatusHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ReactorStatusPing) - if err := dec(in); err != nil { +func _Monitoring_ReactorPingHandler_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(MonitoringServer).ReactorPingHandler(&monitoringReactorPingHandlerServer{stream}) +} + +type Monitoring_ReactorPingHandlerServer interface { + SendAndClose(*ReactorAck) error + Recv() (*ReactorPing, error) + grpc.ServerStream +} + +type monitoringReactorPingHandlerServer struct { + grpc.ServerStream +} + +func (x *monitoringReactorPingHandlerServer) SendAndClose(m *ReactorAck) error { + return x.ServerStream.SendMsg(m) +} + +func (x *monitoringReactorPingHandlerServer) Recv() (*ReactorPing, error) { + m := new(ReactorPing) + if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err } - if interceptor == nil { - return srv.(MonitoringServer).ReactorStatusHandler(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/grpc.monitoring/ReactorStatusHandler", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(MonitoringServer).ReactorStatusHandler(ctx, req.(*ReactorStatusPing)) - } - return interceptor(ctx, in, info, handler) + return m, nil } // Monitoring_ServiceDesc is the grpc.ServiceDesc for Monitoring service. @@ -94,12 +127,13 @@ func _Monitoring_ReactorStatusHandler_Handler(srv interface{}, ctx context.Conte var Monitoring_ServiceDesc = grpc.ServiceDesc{ ServiceName: "grpc.monitoring", HandlerType: (*MonitoringServer)(nil), - Methods: []grpc.MethodDesc{ + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ { - MethodName: "ReactorStatusHandler", - Handler: _Monitoring_ReactorStatusHandler_Handler, + StreamName: "ReactorPingHandler", + Handler: _Monitoring_ReactorPingHandler_Handler, + ClientStreams: true, }, }, - Streams: []grpc.StreamDesc{}, Metadata: "internal/pkg/grpc/monitoring.proto", } diff --git a/server/internal/pkg/grpc/server_grpc.pb.go b/server/internal/pkg/grpc/server_grpc.pb.go index eaa3c5d..a78492f 100644 --- a/server/internal/pkg/grpc/server_grpc.pb.go +++ b/server/internal/pkg/grpc/server_grpc.pb.go @@ -1,8 +1,8 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v3.6.1 -// source: internal/pkg/grpc/server.proto +// - protoc v3.21.12 +// source: internal/pkg/grpc/handshake.proto package grpc @@ -22,7 +22,7 @@ const _ = grpc.SupportPackageIsVersion7 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type HandshakeClient interface { - ClientDiscoveryHandler(ctx context.Context, in *ClientRequest, opts ...grpc.CallOption) (*ClientResponse, error) + ReactorClientHandler(ctx context.Context, in *ReactorClientRequest, opts ...grpc.CallOption) (*ReactorClientResponse, error) } type handshakeClient struct { @@ -33,9 +33,9 @@ func NewHandshakeClient(cc grpc.ClientConnInterface) HandshakeClient { return &handshakeClient{cc} } -func (c *handshakeClient) ClientDiscoveryHandler(ctx context.Context, in *ClientRequest, opts ...grpc.CallOption) (*ClientResponse, error) { - out := new(ClientResponse) - err := c.cc.Invoke(ctx, "/grpc.handshake/ClientDiscoveryHandler", in, out, opts...) +func (c *handshakeClient) ReactorClientHandler(ctx context.Context, in *ReactorClientRequest, opts ...grpc.CallOption) (*ReactorClientResponse, error) { + out := new(ReactorClientResponse) + err := c.cc.Invoke(ctx, "/grpc.handshake/ReactorClientHandler", in, out, opts...) if err != nil { return nil, err } @@ -46,7 +46,7 @@ func (c *handshakeClient) ClientDiscoveryHandler(ctx context.Context, in *Client // All implementations must embed UnimplementedHandshakeServer // for forward compatibility type HandshakeServer interface { - ClientDiscoveryHandler(context.Context, *ClientRequest) (*ClientResponse, error) + ReactorClientHandler(context.Context, *ReactorClientRequest) (*ReactorClientResponse, error) mustEmbedUnimplementedHandshakeServer() } @@ -54,8 +54,8 @@ type HandshakeServer interface { type UnimplementedHandshakeServer struct { } -func (UnimplementedHandshakeServer) ClientDiscoveryHandler(context.Context, *ClientRequest) (*ClientResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method ClientDiscoveryHandler not implemented") +func (UnimplementedHandshakeServer) ReactorClientHandler(context.Context, *ReactorClientRequest) (*ReactorClientResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ReactorClientHandler not implemented") } func (UnimplementedHandshakeServer) mustEmbedUnimplementedHandshakeServer() {} @@ -70,20 +70,20 @@ func RegisterHandshakeServer(s grpc.ServiceRegistrar, srv HandshakeServer) { s.RegisterService(&Handshake_ServiceDesc, srv) } -func _Handshake_ClientDiscoveryHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ClientRequest) +func _Handshake_ReactorClientHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ReactorClientRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(HandshakeServer).ClientDiscoveryHandler(ctx, in) + return srv.(HandshakeServer).ReactorClientHandler(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/grpc.handshake/ClientDiscoveryHandler", + FullMethod: "/grpc.handshake/ReactorClientHandler", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(HandshakeServer).ClientDiscoveryHandler(ctx, req.(*ClientRequest)) + return srv.(HandshakeServer).ReactorClientHandler(ctx, req.(*ReactorClientRequest)) } return interceptor(ctx, in, info, handler) } @@ -96,10 +96,10 @@ var Handshake_ServiceDesc = grpc.ServiceDesc{ HandlerType: (*HandshakeServer)(nil), Methods: []grpc.MethodDesc{ { - MethodName: "ClientDiscoveryHandler", - Handler: _Handshake_ClientDiscoveryHandler_Handler, + MethodName: "ReactorClientHandler", + Handler: _Handshake_ReactorClientHandler_Handler, }, }, Streams: []grpc.StreamDesc{}, - Metadata: "internal/pkg/grpc/server.proto", + Metadata: "internal/pkg/grpc/handshake.proto", } diff --git a/server/internal/pkg/manager/manager.go b/server/internal/pkg/manager/manager.go index ecdd3a0..eaf4376 100644 --- a/server/internal/pkg/manager/manager.go +++ b/server/internal/pkg/manager/manager.go @@ -1,3 +1,4 @@ +// Package manager provides basic manager functions to embed into higher level managers. package manager import ( @@ -9,91 +10,128 @@ import ( "time" ) -// basic manager for starting/stopping checks plus built in heartbeat for downtime detection -// used across server/reactor +var ( + ErrInvalidMaxConn = errors.New("invalid max connection attempts") + ErrManagerInactive = errors.New("manager inactive") + ErrManagerActive = errors.New("manager active") + ErrMaxAttemptsExceeded = errors.New("max connection attempts exceeded") +) -type Connection struct { - Attempts float64 // float for pow - MaxAttempts int // max allowed - sync.Mutex -} +// Status is used as an enum for the current status. +// Could be expanded to include others such as killed, sleeping, etc. +type Status int + +const ( + Inactive Status = 0 + Active Status = 1 +) + +// MaxConnAttempts is the maximum allowable connection attempts. +// Limited to 255 to prevent excessive timeout scaling. +const MaxConnAttempts = 0xFF +// Manager is a general purpose structure to implement basic capabilities. +// Stores state in active variable, modified through atomic swaps. +// Embeds a connection to be used in generating timeouts. type Manager struct { - *Connection // embedded for timeout stuff - Active int32 // atomic checks + *connection + active int32 } -func New(maxCon int) *Manager { +// New creates a new manager with the maxConn maximum attempts. +// Throws ErrInvalidMaxConn if maxConn is not in [0, MaxConnAttempts]. +func New(maxConn int) (*Manager, error) { + + if maxConn < 0 || maxConn > MaxConnAttempts { + return &Manager{}, ErrInvalidMaxConn + } + + c := &connection{MaxAttempts: maxConn} - c := &Connection{MaxAttempts: maxCon} m := &Manager{ - Connection: c, + connection: c, } - return m + return m, nil } +// Start attempts to start the manager. +// Throws ErrManagerActive error if the manager is already active. func (m *Manager) Start() error { - // atomically checks/updates status - if atomic.CompareAndSwapInt32(&m.Active, 0, 1) { + if atomic.CompareAndSwapInt32(&m.active, 0, 1) { m.ResetConnections() return nil } - // already running - return errors.New("Manager already started!") + + return ErrManagerActive } -func (m *Manager) Exit() error { - if atomic.CompareAndSwapInt32(&m.Active, 1, 0) { +// Stop attempts to stop the manager. +// Throws ErrManagerInactive error if the manager is already inactive. +func (m *Manager) Stop() error { + if atomic.CompareAndSwapInt32(&m.active, 1, 0) { return nil } - return errors.New("Manager not active!") + + return ErrManagerInactive } -func (m *Manager) IsActive() int { - return int(atomic.LoadInt32(&m.Active)) +// IsActive returns the current ManagerStatus. +func (m *Manager) IsActive() Status { + return Status(atomic.LoadInt32(&m.active)) } -// Heartbeat tracker +// HeartBeat will send an empty struct over ping every hb (scale). +// The pings are sent ever (hb + rand(interval)) * scale. +// Where scale is typically time.Millisecond, time.Second etc. +// Will close the channel on exit to prevent leaks. +func (m *Manager) HeartBeat( + ping chan struct{}, + hb, interval int, + scale time.Duration) { -func (m *Manager) HeartBeat(ping chan struct{}, hb, interval int, scale time.Duration) { - // pings channel every (HB + randInterval) * time.Duration - // can be used anywhere a heartbeat is needed - // closes channel on exit + for m.IsActive() == Active { + ping <- struct{}{} - if interval > 0 { - rand.Seed(time.Now().UnixNano()) - } - - for atomic.LoadInt32(&m.Active) > 0 { - // atomoic read may cause memory leak, can revisit - ping <- struct{}{} // no mem sleep := time.Duration(hb-interval) * scale + if interval > 0 { sleep += time.Duration(rand.Intn(2*interval)) * scale } + time.Sleep(sleep) } - // exited, close chan + close(ping) } -// connection timeout generator +// connection keeps track of maximum and current number of connection attempts. +// Concurrency safe as it is protected by a mutex. +type connection struct { + Attempts float64 + MaxAttempts int + sync.Mutex +} + +// Timeout returns an exponentially decaying timeout based on attempts. +// Returns timeout of type time.Duration in milliseconds. +// Returns ErrMaxAttemptsExceeded if too many attempts are tried. +func (c *connection) Timeout() (time.Duration, error) { -func (c *Connection) Timeout() (time.Duration, error) { - // exponential backoff c.Lock() defer c.Unlock() + if int(c.Attempts) < c.MaxAttempts { - c.Attempts += 1 - // 50, 100, 200... to := time.Duration(50*math.Pow(2, c.Attempts)) * time.Millisecond + c.Attempts += 1 return to, nil } - return 0, errors.New("Connection Failed") + + return 0, ErrMaxAttemptsExceeded } -func (c *Connection) ResetConnections() { +// ResetConnections sets the current connection attempts back to 0. +func (c *connection) ResetConnections() { c.Lock() defer c.Unlock() c.Attempts = 0 diff --git a/server/internal/pkg/manager/manager_test.go b/server/internal/pkg/manager/manager_test.go new file mode 100644 index 0000000..017ad24 --- /dev/null +++ b/server/internal/pkg/manager/manager_test.go @@ -0,0 +1,206 @@ +package manager + +import ( + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +// creating, starting and stopping tests + +// newManager is a helper for creating new managers for tests +func newManager(conn int, want error, t *testing.T) *Manager { + var manager *Manager + var err error + assert := assert.New(t) + + manager, err = New(conn) + + if err != want { + t.Fatalf( + `New(%d) = %v, %v, %d max connections failed`, + conn, + manager, + err, + conn, + ) + } + + assert.Equal(manager.IsActive(), Inactive, "manager should start inactive") + + return manager +} + +// TestEmptyManager creates a new manager with 0 max connections +func TestEmptyManager(t *testing.T) { + conn := 0 + newManager(conn, nil, t) +} + +// TestPostiveManager creates a new manager with valid max connections +func TestPositiveManager(t *testing.T) { + conn := rand.Intn(MaxConnAttempts) + newManager(conn, nil, t) +} + +// TestNegativeManager creates a new manager with negative max connections +func TestNegativeManager(t *testing.T) { + conn := -1 * rand.Intn(MaxConnAttempts) + newManager(conn, ErrInvalidMaxConn, t) +} + +// TestInvalidManager creates a new manager with large max connections +func TestInvalidManager(t *testing.T) { + conn := MaxConnAttempts + 0xf + newManager(conn, ErrInvalidMaxConn, t) +} + +// TestManagerLifeCycle tests that a manager can start and exit several times +func TestManagerLifeCycle(t *testing.T) { + + var manager *Manager + assert := assert.New(t) + + conn := rand.Intn(MaxConnAttempts) + manager = newManager(conn, nil, t) + + cycles := 10 + + // starting and stopping sequentially + for i := 0; i < cycles; i++ { + + assert.NoError(manager.Start(), "starting manager failed") + assert.Equal(manager.IsActive(), Active, "manager inactive after start") + + assert.NoError(manager.Stop(), "stopping manager failed") + assert.Equal(manager.IsActive(), Inactive, "manager active after stop") + } +} + +// TestManagerStopFail tests that stopping an inactive manager will error +func TestManagerStopFail(t *testing.T) { + + var manager *Manager + assert := assert.New(t) + + conn := rand.Intn(MaxConnAttempts) + manager = newManager(conn, nil, t) + + assert.NoError(manager.Start(), "starting manager failed") + + // stopping sequentially + assert.NoError(manager.Stop(), "stopping manager failed") + assert.Error(manager.Stop(), "stopping inactive manager should fail") +} + +// TestManagerStartFail tests that starting an active manager will error +func TestManagerStartFail(t *testing.T) { + + var manager *Manager + assert := assert.New(t) + + conn := rand.Intn(MaxConnAttempts) + manager = newManager(conn, nil, t) + + // starting sequentially + assert.NoError(manager.Start(), "starting manager failed") + assert.Error(manager.Start(), "starting active manager should fail") +} + +// auxiliary tests + +// TestManagerTimeout checks that timeouts exponentially backoff +func TestManagerTimeout(t *testing.T) { + var manager *Manager + assert := assert.New(t) + + conn := 10 + manager = newManager(conn, nil, t) + + assert.NoError(manager.Start(), "starting manager failed") + assert.Equal(manager.IsActive(), Active, "manager is inactive") + + prevTimeout, err := manager.Timeout() + + for i := 1; i <= conn; i++ { + assert.NoError(err, "generating timeout failed") + assert.True(prevTimeout > 0, "invalid timeout") + + timeout, err := manager.Timeout() + + if i == conn { + assert.Error(err, "allowed exceeding max attempts") + } else { + assert.NoError(err, "generating timeout failed") + assert.True( + timeout == 2*prevTimeout, + "incorrect timeout %d, expected %d", + timeout, 2*prevTimeout, + ) + } + + prevTimeout = timeout + } +} + +// TestManagerHB tests the heartbeat channel opens and closes +func TestManagerHB(t *testing.T) { + + var manager *Manager + assert := assert.New(t) + + conn := rand.Intn(MaxConnAttempts) + manager = newManager(conn, nil, t) + + assert.NoError(manager.Start(), "starting manager failed") + assert.Equal(manager.IsActive(), Active, "manager is inactive") + + ch := make(chan struct{}) + + go manager.HeartBeat(ch, 10, 0, time.Millisecond) + + for range ch { + // close on first ping + assert.NoError(manager.Stop(), "stopping manager failed") + } + + assert.Equal(manager.IsActive(), Inactive, "manager is active") +} + +// TestManagerHBTiming tests the heartbeat channel timing is correct +func TestManagerHBTiming(t *testing.T) { + + var manager *Manager + assert := assert.New(t) + + conn := rand.Intn(MaxConnAttempts) + manager = newManager(conn, nil, t) + + assert.NoError(manager.Start(), "starting manager failed") + assert.Equal(manager.IsActive(), Active, "manager is inactive") + + ch := make(chan struct{}) + hb := 100 + pings := 10 + + // expected time with tolerance for other logic and worst case rand timeout + expected := time.Duration(pings*hb+15) * time.Millisecond + + go manager.HeartBeat(ch, hb, 1, time.Millisecond) + + iter := 0 + start := time.Now() + for range ch { + // close after 10 pings + iter += 1 + if iter >= pings { + assert.NoError(manager.Stop(), "stopping manager failed") + } + } + end := time.Now() + + assert.Equal(manager.IsActive(), Inactive, "manager is active") + assert.WithinDuration(start, end, expected, "inaccurate heartbeat") +} diff --git a/server/internal/pkg/server/coordinator.go b/server/internal/pkg/server/coordinator.go index 95e6158..c312891 100644 --- a/server/internal/pkg/server/coordinator.go +++ b/server/internal/pkg/server/coordinator.go @@ -1,10 +1,15 @@ +// package Server provides a way to listen for incoming connections +// and manage multiple reactor clients. package server import ( pb "FRMS/internal/pkg/grpc" +<<<<<<< HEAD:server/internal/pkg/server/coordinator.go "FRMS/internal/pkg/influxdb" "FRMS/internal/pkg/logging" "context" +======= +>>>>>>> origin/wrapup:internal/pkg/server/coordinator.go "errors" "fmt" "net" @@ -14,184 +19,68 @@ import ( "google.golang.org/grpc" ) -// this package creates the central coordiantor and sub coordiantors to route clients - -// db client interface -type Database interface { - // getters (all create if doesnt exist) - GetReactorClient(int) (string, string, string, string, error) // returns (url, org, bucket, token, err) -} - -func NewDatabaseAdmin(config *viper.Viper) (Database, error) { - return influxdb.NewDBAdmin(config) -} - -type CentralCoordinator struct { - // main coordinator - ClientConnections *ClientPacket - *ReactorCoordinator - Database - Config *viper.Viper - // from config - Ports map[string]int `mapstructure:"ports"` - Err chan error -} - -func NewCentralCoordinator(config *viper.Viper, ch chan error) *CentralCoordinator { - // create a central coordinator to manage requests - db, err := NewDatabaseAdmin(config) - if err != nil { - ch <- err - } - - rc, err := NewReactorCoordinator(config, ch) - if err != nil { - ch <- err - } - config.UnmarshalKey("server.ports", rc) // get reactor port - c := &CentralCoordinator{ - Err: ch, - Config: config, - Database: db, - ReactorCoordinator: rc, - } - // grab config settings - if err = config.UnmarshalKey("server", c); err != nil { - ch <- err - } +var ( + ErrMissingPort = errors.New("port not set") +) - return c -} +// Coordinator is runs on the server and is used to oversee +// the reactor managers as well as process incoming client connections. +type Coordinator struct { + Config *viper.Viper + listener net.Listener + grpcServer *grpc.Server -func (c *CentralCoordinator) Start() { - // starts up associated funcs - clientChan := make(chan *ClientPacket) - l := NewListener(clientChan, c.Err) - // grabs lis port - c.Config.UnmarshalKey("server.ports", l) + DatabasePort int `mapstructure:"database_port"` + GRPCPort int `mapstructure:"grpc_port"` - // starting reactor coordinator - if err := c.ReactorCoordinator.Start(); err != nil { - c.Err <- err - } - // starting listener - if err := l.Start(); err != nil { - c.Err <- err - } - // lastly start client listener - go c.ClientListener(clientChan) -} - -func (c *CentralCoordinator) ClientListener(ch chan *ClientPacket) { - for client := range ch { - // basically loops until channel is closed - client.Response <- c.ClientHandler(client.Client) // respond with cred - } -} + directory map[int]*ReactorManager + managerMu sync.RWMutex -func (c *CentralCoordinator) ClientHandler(cl *Client) *ClientResponse { - // returns reactor db info - var err error - cr := &ClientResponse{Port: c.Ports[cl.Type]} - - if cl.Type == "reactor" { - // get reactor info - go c.ReactorCoordinator.ClientHandler(cl) - // db info - cr.URL, cr.Org, cr.Token, cr.Bucket, err = c.Database.GetReactorClient(cl.Id) - } else { - // throw error - err = errors.New(fmt.Sprintf("Client type %s not recognized!")) - } - // returns based on cl type - if err != nil { - c.Err <- err - } - return cr -} - -type ReactorCoordinator struct { - Port int `mapstructure:"reactor"` - *ReactorManagers Err chan error + + // grpc + pb.UnimplementedHandshakeServer pb.UnimplementedMonitoringServer } -type ReactorManagers struct { - Config *viper.Viper - Directory map[int]*ReactorManager - sync.RWMutex -} +// NewCentralCoordinator creates a central coordinator with the given global +// config and error channel. +func NewCentralCoordinator(config *viper.Viper, ch chan error) *Coordinator { -func NewReactorCoordinator(config *viper.Viper, errCh chan error) (*ReactorCoordinator, error) { rmap := make(map[int]*ReactorManager) - rm := &ReactorManagers{Directory: rmap, Config: config} - c := &ReactorCoordinator{Err: errCh, ReactorManagers: rm} - return c, nil -} - -func (c *ReactorCoordinator) Start() error { - logging.Debug(logging.DStart, "RCO 01 Starting!") - // register grpc service - return c.Register() -} -func (c *ReactorCoordinator) ClientHandler(cl *Client) { - // updates clients if nessecary - if err := c.UpdateReactorManager(cl, c.Err); err != nil { - c.Err <- err + return &Coordinator{ + Err: ch, + Config: config, + directory: rmap, } } -func (m *ReactorManagers) GetReactorManager(id int) (*ReactorManager, error) { - m.RLock() - defer m.RUnlock() +// Start loads config, starts network listener and registers grpc handlers. +// Ready for new clients on return. +func (c *Coordinator) Start() error { - rm, exists := m.Directory[id] - if !exists { - return &ReactorManager{}, errors.New(fmt.Sprintf("No manager for reactor %d!", id)) + if err := c.Config.Unmarshal(c); err != nil { + return err } - return rm, nil -} -func (m *ReactorManagers) UpdateReactorManager(cl *Client, errCh chan error) error { - // locking - m.RLock() - defer m.RUnlock() - - var err error - - rm, exists := m.Directory[cl.Id] - if !exists { - logging.Debug(logging.DClient, "RCO creating manager for reactor client %v", cl.Id) - // creating - rm = NewReactorManager(cl, m.Config, errCh) - // starting - if err = rm.Start(); err != nil { - return err - } - m.Directory[cl.Id] = rm + // ensure it shows up as missing + if c.GRPCPort == 0 { + c.Config.Set("grpc_port", 0) + c.Config.WriteConfig() + + return ErrMissingPort } - return rm.UpdateClient(cl) -} -func (r *ReactorCoordinator) Register() error { - lis, err := net.Listen("tcp", fmt.Sprintf(":%v", r.Port)) + lis, err := net.Listen("tcp", fmt.Sprintf(":%v", c.GRPCPort)) if err != nil { return err } + grpcServer := grpc.NewServer() - pb.RegisterMonitoringServer(grpcServer, r) - go grpcServer.Serve(lis) - logging.Debug(logging.DClient, "RCO ready for client requests") - return nil -} -func (r *ReactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { - rm, err := r.GetReactorManager(int(req.GetId())) - // error checking - if err != nil { - return &pb.ReactorStatusResponse{}, err - } - return rm.ReactorStatusHandler(ctx, req) + c.listener = lis + c.grpcServer = grpcServer + + return c.Register() } diff --git a/server/internal/pkg/server/database.go b/server/internal/pkg/server/database.go new file mode 100644 index 0000000..4195fad --- /dev/null +++ b/server/internal/pkg/server/database.go @@ -0,0 +1,13 @@ +package server + +type dbinfo struct { + url string + org string + token string + bucket string +} + +func (c *Coordinator) getReactorDatabaseCred(id int) (*dbinfo, error) { + + return &dbinfo{}, nil +} diff --git a/server/internal/pkg/server/handler.go b/server/internal/pkg/server/handler.go new file mode 100644 index 0000000..5e855a5 --- /dev/null +++ b/server/internal/pkg/server/handler.go @@ -0,0 +1,48 @@ +package server + +import ( + pb "FRMS/internal/pkg/grpc" + "FRMS/internal/pkg/logging" + "context" +) + +// ClientDiscoveryHandler implements the grpc method which can be called +// by incoming clients to first make connection to the central +// coordinator and receive database credentials. +func (c *Coordinator) ReactorClientHandler( + ctx context.Context, + req *pb.ReactorClientRequest, +) (*pb.ReactorClientResponse, error) { + + id := int(req.GetId()) + + logging.Debug( + logging.DClient, + "LIS 00 reactor %v has connected\n", + id, + ) + + db, err := c.getReactorDatabaseCred(id) + if err != nil { + return &pb.ReactorClientResponse{}, err + } + + return &pb.ReactorClientResponse{ + Id: id, + Url: db.url, + Org: db.org, + Token: db.token, + Bucket: db.bucket, + }, err +} + +// ReactorStatusHandler is a gRPC handler used to handle incoming +// reactor requests containing information about said reactor. +// It will get the associate reactor manager and pass the +// request device information before returning an acknowledgement. +func (c *Coordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { + + // rm, err := c.LoadReactorManager(int(req.GetId())) + + return &pb.ReactorStatusResponse{}, nil +} diff --git a/server/internal/pkg/server/reactor.go b/server/internal/pkg/server/reactor.go new file mode 100644 index 0000000..12dc3ad --- /dev/null +++ b/server/internal/pkg/server/reactor.go @@ -0,0 +1,121 @@ +package server + +import ( + "FRMS/internal/pkg/logging" + "FRMS/internal/pkg/manager" + "errors" + "time" +) + +var ( + ErrNoReactorManager = errors.New("no reactor manager found") +) + +// ReactorManager can be started/stopped as clients connect/disconnect. +type ReactorManager struct { + Manager // base manager interface +} + +// Manager is an interface requiring a structure that can be started +// and stopped as well as provide timeouts in milliseconds. +type Manager interface { + Start() error // status checks + Stop() error + Timeout() (time.Duration, error) // TO Generator +} + +// NewManager returns a manager fulfilling the Manager interface as well as +// any potential errors. +func NewManager(max int) (Manager, error) { + return manager.New(max) +} + +// GetReactorManager returns a reactor manager for passed id. +// Throws error if manager not found for id. +func (c *Coordinator) LoadReactorManager(id int) (*ReactorManager, error) { + + c.managerMu.RLock() + defer c.managerMu.RUnlock() + + rm, exists := c.directory[id] + + if !exists { + logging.Debug( + logging.DClient, + "RCO 00 creating manager for %v", + id, + ) + + m, err := NewManager(0) + + rm = &ReactorManager{ + Manager: m, + } + + if err = rm.Start(); err != nil { + return rm, err + } + + c.directory[id] = rm + } + + return rm, nil +} + +// // NewReactorManager takes in a client, config and channel to pass errors on. +// // Returns a new reactor manager as well as any errors that occured during +// // creation. +// // Uses MaxConnectionAttempts which defaults to 10 to prevent +// // unnessecary network load and/or timeout lengths. +// func NewReactorManager( +// ) (*ReactorManager, error) { + +// m, err := NewManager(MaxConnectionAttempts) + +// if err != nil { +// return &ReactorManager{}, err +// } + +// return r, err +// } + +// Start logs the start and calls start on the embedded manager. +func (r *ReactorManager) Start() error { + // logging.Debug(logging.DStart, "RMA starting", r.Id) + return r.Manager.Start() +} + +// Stop logs the stop and calls stop on the embedded manager. +func (r *ReactorManager) Stop() error { + // logging.Debug(logging.DExit, "RMA %v stopping", r.Id) + return r.Manager.Stop() +} + +// UpdateClient is used to change the underlying manager client if there +// changes to its data. +// +// BUG(Keegan): Client is not protected by a lock and may lead to races +// func (r *ReactorManager) UpdateClient(cl *Client) error { +// logging.Debug(logging.DClient, "RMA %v updating client", r.Id) +// r.Client = cl +// return nil +// } + +// // ReactorDeviceHandler processes incoming device information and +// // updates the manager accordingly. +// func (r *ReactorManager) ReactorDeviceHandler(devs []*pb.Device) error { + +// logging.Debug(logging.DClient, "CCO recieved ping from %v", r.Id) + +// for _, dev := range devs { +// logging.Debug( +// logging.DClient, +// "CCO %v device %v is %v", +// r.Id, +// dev.GetAddr(), +// dev.GetStatus().String(), +// ) +// } + +// return nil +// } diff --git a/server/internal/pkg/server/register.go b/server/internal/pkg/server/register.go new file mode 100644 index 0000000..7be0111 --- /dev/null +++ b/server/internal/pkg/server/register.go @@ -0,0 +1,19 @@ +package server + +import ( + pb "FRMS/internal/pkg/grpc" + "FRMS/internal/pkg/logging" +) + +func (c *Coordinator) Register() error { + // register services + pb.RegisterHandshakeServer(c.grpcServer, c) + + go c.grpcServer.Serve(c.listener) + // testing + pb.RegisterMonitoringServer(c.grpcServer, c) + + logging.Debug(logging.DStart, "CCO 00 registered grpc") + + return nil +} diff --git a/server/main.go b/server/main.go index ab8c64f..db419e2 100644 --- a/server/main.go +++ b/server/main.go @@ -4,24 +4,26 @@ import ( "fmt" "os/signal" "syscall" + "time" - "FRMS/internal/pkg/config" + conf "FRMS/internal/pkg/config" + "FRMS/internal/pkg/database" "FRMS/internal/pkg/logging" "FRMS/internal/pkg/server" - "FRMS/internal/pkg/websocket" "os" "github.com/spf13/viper" ) type coordinator interface { - Start() + Start() error } func NewCoordinator(config *viper.Viper, ch chan error) coordinator { return server.NewCentralCoordinator(config, ch) } +<<<<<<< HEAD:server/main.go func NewConfig(filename string) (*viper.Viper, error) { return config.LoadConfig(filename) } @@ -34,11 +36,17 @@ func NewWebSocket() ws { return websocket.New() } +======= +>>>>>>> origin/wrapup:cmd/server/main.go func main() { - // lets get this bread + + fmt.Printf("starting server... ") + start := time.Now() + gracefulShutdown := make(chan os.Signal, 1) signal.Notify(gracefulShutdown, syscall.SIGINT, syscall.SIGTERM) +<<<<<<< HEAD:server/main.go // config file conf, err := NewConfig("server") if err != nil { @@ -72,6 +80,48 @@ func main() { panic(err) } logging.Debug(logging.DStop, "CON stored successfully", conf.ConfigFileUsed()) +======= + userHome, err := os.UserHomeDir() + + if err != nil { + panic(err) + } + + configPath := fmt.Sprintf("%s/.config/FRMS", userHome) + configFile := "server" + configExt := "yaml" + config, err := conf.Load(configFile, configPath, configExt) + if err != nil { + panic(err) + } + + database.Connect(config) + + errCh := make(chan error) + + c := NewCoordinator(config, errCh) + if err := c.Start(); err != nil { + panic(err) + } + + logging.Debug(logging.DStart, "CCO 01 %s started", config.Get("name")) + + elapsed := time.Since(start) + fmt.Printf("done %v\n", elapsed.Round(time.Microsecond)) + + select { + case err := <-errCh: + panic(err) + case <-gracefulShutdown: + fmt.Printf("\nstopping server... ") + start := time.Now() + if err := config.WriteConfig(); err != nil { + panic(err) + } + logging.Debug(logging.DExit, "CON wrote %s", config.ConfigFileUsed()) + elapsed := time.Since(start) + fmt.Printf("done %v\n", elapsed.Round(time.Microsecond)) +>>>>>>> origin/wrapup:cmd/server/main.go os.Exit(0) } } diff --git a/todo.md b/todo.md new file mode 100644 index 0000000..37631fa --- /dev/null +++ b/todo.md @@ -0,0 +1,34 @@ +# TODO + +### Documentation + +- [ ] API +- [ ] Config +- [ ] Device +- [ ] gRPC +- [ ] I2C +- [ ] DB +- [ ] logging +- [ ] reactor +- [ ] server +- [ ] system? + +### Testing + +- [ ] Config +- [ ] Device +- [ ] i2c +- [ ] logging +- [ ] reactor +- [O] server +- [ ] gRPC? +- [ ] db? +- [ ] system? + +### Misc + +- [ ] i2c -> lib +- [ ] strip device pkg +- [ ] future work writeup +- [ ] strip reactor down +- [ ] websocket -> web-gRPC diff --git a/wc.sh b/wc.sh deleted file mode 100755 index 3777a06..0000000 --- a/wc.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/bash -echo $(git ls-files | grep .go | awk '!/pb/' | xargs wc -l | tail -n 1) diff --git a/wiki/api.md b/wiki/api.md new file mode 100644 index 0000000..cfb474b --- /dev/null +++ b/wiki/api.md @@ -0,0 +1,3 @@ +# API + +This will describe the idea for the API diff --git a/wiki/future-work.md b/wiki/future-work.md new file mode 100644 index 0000000..3b4eb14 --- /dev/null +++ b/wiki/future-work.md @@ -0,0 +1,3 @@ +# Future Work + +This will describe where to take the project from here diff --git a/wiki/gui.md b/wiki/gui.md new file mode 100644 index 0000000..0fc9611 --- /dev/null +++ b/wiki/gui.md @@ -0,0 +1,3 @@ +# GUI + +This will describe the basic outline for the front-end gui diff --git a/wiki/networking.md b/wiki/networking.md new file mode 100644 index 0000000..24352c4 --- /dev/null +++ b/wiki/networking.md @@ -0,0 +1,3 @@ +# Networking + +This will describe how the reactor/server/client talk to each other diff --git a/wiki/reactor.md b/wiki/reactor.md new file mode 100644 index 0000000..1b4cfaf --- /dev/null +++ b/wiki/reactor.md @@ -0,0 +1,10 @@ +# Reactor + +This will talk about the reactor setup + +## Hardware + +This will describe the hardware used + +## I2C + diff --git a/wiki/server.md b/wiki/server.md new file mode 100644 index 0000000..70b6706 --- /dev/null +++ b/wiki/server.md @@ -0,0 +1,3 @@ +# Server Information + +## This is a sample diff --git a/wiki/wiki.md b/wiki/wiki.md new file mode 100644 index 0000000..acc1d23 --- /dev/null +++ b/wiki/wiki.md @@ -0,0 +1,14 @@ +# Wiki + +## Overview + +This is an example of the overview section of the wiki + +## Table of Contents + +* [Server](server.md) +* [Reactor](reactor.md) +* [Networking](networking.md) +* [GUI](gui.md) +* [API](api.md) +* [Future Work](future-work.md)