refactored a lot of server, creating actual database package

main
KeeganForelight 2 years ago
parent 72fe309a25
commit f209980e8b

@ -11,6 +11,11 @@ tasks:
cmds:
- bin/gotest.py
proto:
desc: "Rebuilds protobuf for gRPC"
cmds:
- protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/pkg/grpc/*.proto
all:
desc: "builds arm reactor binaries and arm/amd server binaries"
deps: [arm32-reactor, arm64-reactor, arm64-server, amd64-server]

@ -25,10 +25,6 @@ func NewReactorCoordinator(
return reactor.NewCoordinator(config, ch)
}
func NewConfig(file, path, ext string) (*viper.Viper, error) {
return config.LoadConfig(file, path, ext)
}
func main() {
// shutdown
gracefulShutdown := make(chan os.Signal, 1)
@ -44,7 +40,7 @@ func main() {
configFile := "reactor"
configExt := "yaml"
conf, err := NewConfig(configFile, configPath, configExt)
conf, err := config.Load(configFile, configPath, configExt)
if err != nil {
panic(err)
}

@ -6,7 +6,8 @@ import (
"syscall"
"time"
"FRMS/internal/pkg/config"
conf "FRMS/internal/pkg/config"
"FRMS/internal/pkg/database"
"FRMS/internal/pkg/logging"
"FRMS/internal/pkg/server"
"os"
@ -15,17 +16,13 @@ import (
)
type coordinator interface {
Start()
Start() error
}
func NewCoordinator(config *viper.Viper, ch chan error) coordinator {
return server.NewCentralCoordinator(config, ch)
}
func LoadConfig(file, path, ext string) (*viper.Viper, error) {
return config.LoadConfig(file, path, ext)
}
func main() {
fmt.Printf("starting server... ")
@ -43,18 +40,23 @@ func main() {
configPath := fmt.Sprintf("%s/.config/FRMS", userHome)
configFile := "server"
configExt := "yaml"
conf, err := LoadConfig(configFile, configPath, configExt)
config, err := conf.Load(configFile, configPath, configExt)
if err != nil {
panic(err)
}
database.Connect(config)
errCh := make(chan error)
c := NewCoordinator(conf, errCh)
go c.Start()
logging.Debug(logging.DStart, "CCO 01 %s started", conf.Get("name"))
c := NewCoordinator(config, errCh)
if err := c.Start(); err != nil {
panic(err)
}
logging.Debug(logging.DStart, "CCO 01 %s started", config.Get("name"))
elapsed := time.Now().Sub(start)
elapsed := time.Since(start)
fmt.Printf("done %v\n", elapsed.Round(time.Microsecond))
select {
@ -63,11 +65,11 @@ func main() {
case <-gracefulShutdown:
fmt.Printf("\nstopping server... ")
start := time.Now()
if err := conf.WriteConfig(); err != nil {
if err := config.WriteConfig(); err != nil {
panic(err)
}
logging.Debug(logging.DExit, "CON wrote %s", conf.ConfigFileUsed())
elapsed := time.Now().Sub(start)
logging.Debug(logging.DExit, "CON wrote %s", config.ConfigFileUsed())
elapsed := time.Since(start)
fmt.Printf("done %v\n", elapsed.Round(time.Microsecond))
os.Exit(0)
}

@ -9,15 +9,15 @@ import (
"github.com/spf13/viper"
)
// LoadConfig loads the file at path/file into a viper object
// Expects config file to be yaml
func LoadConfig(file, path, ext string) (*viper.Viper, error) {
// Load the file at path/file into a viper object.
// Expects config file to be yaml.
func Load(file, path, ext string) (*viper.Viper, error) {
logging.Debug(logging.DStart, "CON Loading config for %s", file)
logging.Debug(logging.DStart, "CON loading %s", file)
config := viper.New()
configFile := fmt.Sprintf("%s/%s.%s", path, file, ext)
//configFile := fmt.Sprintf("%s/%s.%s", path, file, ext)
config.SetConfigName(file)
config.AddConfigPath(path)
@ -32,12 +32,12 @@ func LoadConfig(file, path, ext string) (*viper.Viper, error) {
}
// attempt to create an empty config incase it doesn't exist
if err := config.SafeWriteConfigAs(configFile); err != nil {
// if error thrown because file exists, fine to ignore
if _, ok := err.(viper.ConfigFileAlreadyExistsError); !ok {
return config, err
}
}
// if err := config.SafeWriteConfigAs(configFile); err != nil {
// // if error thrown because file exists, fine to ignore
// if _, ok := err.(viper.ConfigFileAlreadyExistsError); !ok {
// return config, err
// }
// }
if err := config.ReadInConfig(); err != nil {
fmt.Printf("read error %v\n", config)

@ -0,0 +1,64 @@
// package Database wraps some influx db methods to provide functionality.
package database
import (
"context"
"errors"
"fmt"
influx "github.com/influxdata/influxdb-client-go/v2"
"github.com/spf13/viper"
)
var (
ErrDBConnection = errors.New("connection to database failed")
ErrNoURLFound = errors.New("database url not found")
)
var db influx.Client
// Connect takes in a config and attempts to create a client for influxdb.
// Will automatically write changes back to config for future attempts
func Connect(config *viper.Viper) error {
url := config.GetString("db.url")
token := config.GetString("db.token")
if url == "" {
return ErrNoURLFound
}
db = influx.NewClient(url, token)
if token == "" {
// try setup
fmt.Printf("attempting to setup database at %v\n", url)
user := config.GetString("db.username")
password := config.GetString("db.password")
org := config.GetString("db.org")
bucket := config.GetString("db.bucket")
Setup(user, pass, org, bucket
}
db = influx.NewClient(url, token)
return nil
}
func Setup(user, pass, org, bucket string, ret int) (string, error) {
resp, err := db.Setup(context.Background(), user, pass, org, bucket, ret)
return "", nil
}
func GetBucket(id int) (string, error) {
return "", nil
}
func GetToken(id int) (string, error) {
// bucket, err := client.BucketsAPI().FindBucketByName(context.Background(), id)
return "", nil
}

@ -1,42 +0,0 @@
syntax = "proto3";
package grpc;
option go_package = "internal/pkg/grpc";
service device {
// groups basic device interactions
// get/set name based on request
rpc Name(NameRequest) returns (NameResponse)
}
message NameRequest {
// empty for future expansion
string Name = 1;
}
message NameResponse {
string Name = 1;
}
service sensor {
// sensor specific functions
rpc Reading(ReadingRequest) returns (ReadingResponse)
rpc SampleRate(SampleRateRequest) returns (SampleRateResponse)
}
message ReadingRequest {
// empty
}
message ReadingResponse {
string Reading = 1; // formatted reading "9.7 pH"
int64 Timestamp = 2; // when the reading was taken
}
message SampleRateRequest {
int32 SampleRate = 1; // 0 to return current sample rate, value in seconds
}
message SampleRateResponse {
int32 SampleRate = 1; // returns the set sample rate
}

@ -0,0 +1,260 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc v3.21.12
// source: internal/pkg/grpc/handshake.proto
package grpc
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type ReactorClientRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
Port uint32 `protobuf:"varint,2,opt,name=port,proto3" json:"port,omitempty"` // client gRPC port
}
func (x *ReactorClientRequest) Reset() {
*x = ReactorClientRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ReactorClientRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ReactorClientRequest) ProtoMessage() {}
func (x *ReactorClientRequest) ProtoReflect() protoreflect.Message {
mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ReactorClientRequest.ProtoReflect.Descriptor instead.
func (*ReactorClientRequest) Descriptor() ([]byte, []int) {
return file_internal_pkg_grpc_handshake_proto_rawDescGZIP(), []int{0}
}
func (x *ReactorClientRequest) GetId() uint32 {
if x != nil {
return x.Id
}
return 0
}
func (x *ReactorClientRequest) GetPort() uint32 {
if x != nil {
return x.Port
}
return 0
}
type ReactorClientResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
Url string `protobuf:"bytes,2,opt,name=url,proto3" json:"url,omitempty"`
Org string `protobuf:"bytes,3,opt,name=org,proto3" json:"org,omitempty"`
Token string `protobuf:"bytes,4,opt,name=token,proto3" json:"token,omitempty"`
Bucket string `protobuf:"bytes,5,opt,name=bucket,proto3" json:"bucket,omitempty"`
}
func (x *ReactorClientResponse) Reset() {
*x = ReactorClientResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ReactorClientResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ReactorClientResponse) ProtoMessage() {}
func (x *ReactorClientResponse) ProtoReflect() protoreflect.Message {
mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ReactorClientResponse.ProtoReflect.Descriptor instead.
func (*ReactorClientResponse) Descriptor() ([]byte, []int) {
return file_internal_pkg_grpc_handshake_proto_rawDescGZIP(), []int{1}
}
func (x *ReactorClientResponse) GetId() uint32 {
if x != nil {
return x.Id
}
return 0
}
func (x *ReactorClientResponse) GetUrl() string {
if x != nil {
return x.Url
}
return ""
}
func (x *ReactorClientResponse) GetOrg() string {
if x != nil {
return x.Org
}
return ""
}
func (x *ReactorClientResponse) GetToken() string {
if x != nil {
return x.Token
}
return ""
}
func (x *ReactorClientResponse) GetBucket() string {
if x != nil {
return x.Bucket
}
return ""
}
var File_internal_pkg_grpc_handshake_proto protoreflect.FileDescriptor
var file_internal_pkg_grpc_handshake_proto_rawDesc = []byte{
0x0a, 0x21, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67,
0x72, 0x70, 0x63, 0x2f, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0x3a, 0x0a, 0x14, 0x52, 0x65, 0x61,
0x63, 0x74, 0x6f, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69,
0x64, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52,
0x04, 0x70, 0x6f, 0x72, 0x74, 0x22, 0x79, 0x0a, 0x15, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72,
0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e,
0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x10,
0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c,
0x12, 0x10, 0x0a, 0x03, 0x6f, 0x72, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6f,
0x72, 0x67, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28,
0x09, 0x52, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x75, 0x63, 0x6b,
0x65, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74,
0x32, 0x5c, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x4f, 0x0a,
0x14, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x48, 0x61,
0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x1a, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61,
0x63, 0x74, 0x6f, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x1b, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72,
0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x13,
0x5a, 0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67,
0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_internal_pkg_grpc_handshake_proto_rawDescOnce sync.Once
file_internal_pkg_grpc_handshake_proto_rawDescData = file_internal_pkg_grpc_handshake_proto_rawDesc
)
func file_internal_pkg_grpc_handshake_proto_rawDescGZIP() []byte {
file_internal_pkg_grpc_handshake_proto_rawDescOnce.Do(func() {
file_internal_pkg_grpc_handshake_proto_rawDescData = protoimpl.X.CompressGZIP(file_internal_pkg_grpc_handshake_proto_rawDescData)
})
return file_internal_pkg_grpc_handshake_proto_rawDescData
}
var file_internal_pkg_grpc_handshake_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_internal_pkg_grpc_handshake_proto_goTypes = []interface{}{
(*ReactorClientRequest)(nil), // 0: grpc.ReactorClientRequest
(*ReactorClientResponse)(nil), // 1: grpc.ReactorClientResponse
}
var file_internal_pkg_grpc_handshake_proto_depIdxs = []int32{
0, // 0: grpc.handshake.ReactorClientHandler:input_type -> grpc.ReactorClientRequest
1, // 1: grpc.handshake.ReactorClientHandler:output_type -> grpc.ReactorClientResponse
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_internal_pkg_grpc_handshake_proto_init() }
func file_internal_pkg_grpc_handshake_proto_init() {
if File_internal_pkg_grpc_handshake_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_internal_pkg_grpc_handshake_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReactorClientRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_internal_pkg_grpc_handshake_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReactorClientResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_internal_pkg_grpc_handshake_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_internal_pkg_grpc_handshake_proto_goTypes,
DependencyIndexes: file_internal_pkg_grpc_handshake_proto_depIdxs,
MessageInfos: file_internal_pkg_grpc_handshake_proto_msgTypes,
}.Build()
File_internal_pkg_grpc_handshake_proto = out.File
file_internal_pkg_grpc_handshake_proto_rawDesc = nil
file_internal_pkg_grpc_handshake_proto_goTypes = nil
file_internal_pkg_grpc_handshake_proto_depIdxs = nil
}

@ -0,0 +1,21 @@
syntax = "proto3";
package grpc;
option go_package = "internal/pkg/grpc";
service handshake {
rpc ReactorClientHandler(ReactorClientRequest) returns (ReactorClientResponse);
}
message ReactorClientRequest {
uint32 id = 1;
uint32 port = 2; // client gRPC port
}
message ReactorClientResponse {
uint32 id = 1;
string url = 2;
string org = 3;
string token = 4;
string bucket = 5;
}

@ -1,8 +1,8 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.6.1
// source: internal/pkg/grpc/server.proto
// - protoc v3.21.12
// source: internal/pkg/grpc/handshake.proto
package grpc
@ -22,7 +22,7 @@ const _ = grpc.SupportPackageIsVersion7
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type HandshakeClient interface {
ClientDiscoveryHandler(ctx context.Context, in *ClientRequest, opts ...grpc.CallOption) (*ClientResponse, error)
ReactorClientHandler(ctx context.Context, in *ReactorClientRequest, opts ...grpc.CallOption) (*ReactorClientResponse, error)
}
type handshakeClient struct {
@ -33,9 +33,9 @@ func NewHandshakeClient(cc grpc.ClientConnInterface) HandshakeClient {
return &handshakeClient{cc}
}
func (c *handshakeClient) ClientDiscoveryHandler(ctx context.Context, in *ClientRequest, opts ...grpc.CallOption) (*ClientResponse, error) {
out := new(ClientResponse)
err := c.cc.Invoke(ctx, "/grpc.handshake/ClientDiscoveryHandler", in, out, opts...)
func (c *handshakeClient) ReactorClientHandler(ctx context.Context, in *ReactorClientRequest, opts ...grpc.CallOption) (*ReactorClientResponse, error) {
out := new(ReactorClientResponse)
err := c.cc.Invoke(ctx, "/grpc.handshake/ReactorClientHandler", in, out, opts...)
if err != nil {
return nil, err
}
@ -46,7 +46,7 @@ func (c *handshakeClient) ClientDiscoveryHandler(ctx context.Context, in *Client
// All implementations must embed UnimplementedHandshakeServer
// for forward compatibility
type HandshakeServer interface {
ClientDiscoveryHandler(context.Context, *ClientRequest) (*ClientResponse, error)
ReactorClientHandler(context.Context, *ReactorClientRequest) (*ReactorClientResponse, error)
mustEmbedUnimplementedHandshakeServer()
}
@ -54,8 +54,8 @@ type HandshakeServer interface {
type UnimplementedHandshakeServer struct {
}
func (UnimplementedHandshakeServer) ClientDiscoveryHandler(context.Context, *ClientRequest) (*ClientResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ClientDiscoveryHandler not implemented")
func (UnimplementedHandshakeServer) ReactorClientHandler(context.Context, *ReactorClientRequest) (*ReactorClientResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ReactorClientHandler not implemented")
}
func (UnimplementedHandshakeServer) mustEmbedUnimplementedHandshakeServer() {}
@ -70,20 +70,20 @@ func RegisterHandshakeServer(s grpc.ServiceRegistrar, srv HandshakeServer) {
s.RegisterService(&Handshake_ServiceDesc, srv)
}
func _Handshake_ClientDiscoveryHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ClientRequest)
func _Handshake_ReactorClientHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ReactorClientRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(HandshakeServer).ClientDiscoveryHandler(ctx, in)
return srv.(HandshakeServer).ReactorClientHandler(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/grpc.handshake/ClientDiscoveryHandler",
FullMethod: "/grpc.handshake/ReactorClientHandler",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(HandshakeServer).ClientDiscoveryHandler(ctx, req.(*ClientRequest))
return srv.(HandshakeServer).ReactorClientHandler(ctx, req.(*ReactorClientRequest))
}
return interceptor(ctx, in, info, handler)
}
@ -96,10 +96,10 @@ var Handshake_ServiceDesc = grpc.ServiceDesc{
HandlerType: (*HandshakeServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "ClientDiscoveryHandler",
Handler: _Handshake_ClientDiscoveryHandler_Handler,
MethodName: "ReactorClientHandler",
Handler: _Handshake_ReactorClientHandler_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "internal/pkg/grpc/server.proto",
Metadata: "internal/pkg/grpc/handshake.proto",
}

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.0
// protoc v3.12.4
// protoc-gen-go v1.28.1
// protoc v3.21.12
// source: internal/pkg/grpc/monitoring.proto
package grpc
@ -20,56 +20,7 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type Status int32
const (
Status_DEAD Status = 0
Status_ALIVE Status = 1
Status_UNKOWN Status = 2
)
// Enum value maps for Status.
var (
Status_name = map[int32]string{
0: "DEAD",
1: "ALIVE",
2: "UNKOWN",
}
Status_value = map[string]int32{
"DEAD": 0,
"ALIVE": 1,
"UNKOWN": 2,
}
)
func (x Status) Enum() *Status {
p := new(Status)
*p = x
return p
}
func (x Status) String() string {
return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}
func (Status) Descriptor() protoreflect.EnumDescriptor {
return file_internal_pkg_grpc_monitoring_proto_enumTypes[0].Descriptor()
}
func (Status) Type() protoreflect.EnumType {
return &file_internal_pkg_grpc_monitoring_proto_enumTypes[0]
}
func (x Status) Number() protoreflect.EnumNumber {
return protoreflect.EnumNumber(x)
}
// Deprecated: Use Status.Descriptor instead.
func (Status) EnumDescriptor() ([]byte, []int) {
return file_internal_pkg_grpc_monitoring_proto_rawDescGZIP(), []int{0}
}
type ReactorStatusResponse struct {
type ReactorAck struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
@ -77,8 +28,8 @@ type ReactorStatusResponse struct {
Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
}
func (x *ReactorStatusResponse) Reset() {
*x = ReactorStatusResponse{}
func (x *ReactorAck) Reset() {
*x = ReactorAck{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@ -86,13 +37,13 @@ func (x *ReactorStatusResponse) Reset() {
}
}
func (x *ReactorStatusResponse) String() string {
func (x *ReactorAck) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ReactorStatusResponse) ProtoMessage() {}
func (*ReactorAck) ProtoMessage() {}
func (x *ReactorStatusResponse) ProtoReflect() protoreflect.Message {
func (x *ReactorAck) ProtoReflect() protoreflect.Message {
mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@ -104,30 +55,28 @@ func (x *ReactorStatusResponse) ProtoReflect() protoreflect.Message {
return mi.MessageOf(x)
}
// Deprecated: Use ReactorStatusResponse.ProtoReflect.Descriptor instead.
func (*ReactorStatusResponse) Descriptor() ([]byte, []int) {
// Deprecated: Use ReactorAck.ProtoReflect.Descriptor instead.
func (*ReactorAck) Descriptor() ([]byte, []int) {
return file_internal_pkg_grpc_monitoring_proto_rawDescGZIP(), []int{0}
}
func (x *ReactorStatusResponse) GetId() int32 {
func (x *ReactorAck) GetId() int32 {
if x != nil {
return x.Id
}
return 0
}
type ReactorStatusPing struct {
type ReactorPing struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
// new devices
Devices []*Device `protobuf:"bytes,2,rep,name=devices,proto3" json:"devices,omitempty"`
}
func (x *ReactorStatusPing) Reset() {
*x = ReactorStatusPing{}
func (x *ReactorPing) Reset() {
*x = ReactorPing{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@ -135,13 +84,13 @@ func (x *ReactorStatusPing) Reset() {
}
}
func (x *ReactorStatusPing) String() string {
func (x *ReactorPing) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ReactorStatusPing) ProtoMessage() {}
func (*ReactorPing) ProtoMessage() {}
func (x *ReactorStatusPing) ProtoReflect() protoreflect.Message {
func (x *ReactorPing) ProtoReflect() protoreflect.Message {
mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@ -153,108 +102,34 @@ func (x *ReactorStatusPing) ProtoReflect() protoreflect.Message {
return mi.MessageOf(x)
}
// Deprecated: Use ReactorStatusPing.ProtoReflect.Descriptor instead.
func (*ReactorStatusPing) Descriptor() ([]byte, []int) {
// Deprecated: Use ReactorPing.ProtoReflect.Descriptor instead.
func (*ReactorPing) Descriptor() ([]byte, []int) {
return file_internal_pkg_grpc_monitoring_proto_rawDescGZIP(), []int{1}
}
func (x *ReactorStatusPing) GetId() int32 {
func (x *ReactorPing) GetId() int32 {
if x != nil {
return x.Id
}
return 0
}
func (x *ReactorStatusPing) GetDevices() []*Device {
if x != nil {
return x.Devices
}
return nil
}
type Device struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Addr int32 `protobuf:"varint,1,opt,name=addr,proto3" json:"addr,omitempty"` // i2c addr
Status Status `protobuf:"varint,2,opt,name=status,proto3,enum=grpc.Status" json:"status,omitempty"` // most recent status
}
func (x *Device) Reset() {
*x = Device{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Device) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Device) ProtoMessage() {}
func (x *Device) ProtoReflect() protoreflect.Message {
mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Device.ProtoReflect.Descriptor instead.
func (*Device) Descriptor() ([]byte, []int) {
return file_internal_pkg_grpc_monitoring_proto_rawDescGZIP(), []int{2}
}
func (x *Device) GetAddr() int32 {
if x != nil {
return x.Addr
}
return 0
}
func (x *Device) GetStatus() Status {
if x != nil {
return x.Status
}
return Status_DEAD
}
var File_internal_pkg_grpc_monitoring_proto protoreflect.FileDescriptor
var file_internal_pkg_grpc_monitoring_proto_rawDesc = []byte{
0x0a, 0x22, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67,
0x72, 0x70, 0x63, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0x27, 0x0a, 0x15, 0x52, 0x65,
0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52,
0x02, 0x69, 0x64, 0x22, 0x4b, 0x0a, 0x11, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74,
0x61, 0x74, 0x75, 0x73, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01,
0x20, 0x01, 0x28, 0x05, 0x52, 0x02, 0x69, 0x64, 0x12, 0x26, 0x0a, 0x07, 0x64, 0x65, 0x76, 0x69,
0x63, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x67, 0x72, 0x70, 0x63,
0x2e, 0x44, 0x65, 0x76, 0x69, 0x63, 0x65, 0x52, 0x07, 0x64, 0x65, 0x76, 0x69, 0x63, 0x65, 0x73,
0x22, 0x42, 0x0a, 0x06, 0x44, 0x65, 0x76, 0x69, 0x63, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x61, 0x64,
0x64, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x61, 0x64, 0x64, 0x72, 0x12, 0x24,
0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0c,
0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74,
0x61, 0x74, 0x75, 0x73, 0x2a, 0x29, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x08,
0x0a, 0x04, 0x44, 0x45, 0x41, 0x44, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x41, 0x4c, 0x49, 0x56,
0x45, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x55, 0x4e, 0x4b, 0x4f, 0x57, 0x4e, 0x10, 0x02, 0x32,
0x5a, 0x0a, 0x0a, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x4c, 0x0a,
0x14, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x48, 0x61,
0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x17, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61,
0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x50, 0x69, 0x6e, 0x67, 0x1a, 0x1b,
0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61,
0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x13, 0x5a, 0x11, 0x69,
0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63,
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0x1c, 0x0a, 0x0a, 0x52, 0x65,
0x61, 0x63, 0x74, 0x6f, 0x72, 0x41, 0x63, 0x6b, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01,
0x20, 0x01, 0x28, 0x05, 0x52, 0x02, 0x69, 0x64, 0x22, 0x1d, 0x0a, 0x0b, 0x52, 0x65, 0x61, 0x63,
0x74, 0x6f, 0x72, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20,
0x01, 0x28, 0x05, 0x52, 0x02, 0x69, 0x64, 0x32, 0x49, 0x0a, 0x0a, 0x6d, 0x6f, 0x6e, 0x69, 0x74,
0x6f, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x3b, 0x0a, 0x12, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72,
0x50, 0x69, 0x6e, 0x67, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x11, 0x2e, 0x67, 0x72,
0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x50, 0x69, 0x6e, 0x67, 0x1a, 0x10,
0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x41, 0x63, 0x6b,
0x28, 0x01, 0x42, 0x13, 0x5a, 0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70,
0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -269,24 +144,19 @@ func file_internal_pkg_grpc_monitoring_proto_rawDescGZIP() []byte {
return file_internal_pkg_grpc_monitoring_proto_rawDescData
}
var file_internal_pkg_grpc_monitoring_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_internal_pkg_grpc_monitoring_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_internal_pkg_grpc_monitoring_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_internal_pkg_grpc_monitoring_proto_goTypes = []interface{}{
(Status)(0), // 0: grpc.Status
(*ReactorStatusResponse)(nil), // 1: grpc.ReactorStatusResponse
(*ReactorStatusPing)(nil), // 2: grpc.ReactorStatusPing
(*Device)(nil), // 3: grpc.Device
(*ReactorAck)(nil), // 0: grpc.ReactorAck
(*ReactorPing)(nil), // 1: grpc.ReactorPing
}
var file_internal_pkg_grpc_monitoring_proto_depIdxs = []int32{
3, // 0: grpc.ReactorStatusPing.devices:type_name -> grpc.Device
0, // 1: grpc.Device.status:type_name -> grpc.Status
2, // 2: grpc.monitoring.ReactorStatusHandler:input_type -> grpc.ReactorStatusPing
1, // 3: grpc.monitoring.ReactorStatusHandler:output_type -> grpc.ReactorStatusResponse
3, // [3:4] is the sub-list for method output_type
2, // [2:3] is the sub-list for method input_type
2, // [2:2] is the sub-list for extension type_name
2, // [2:2] is the sub-list for extension extendee
0, // [0:2] is the sub-list for field type_name
1, // 0: grpc.monitoring.ReactorPingHandler:input_type -> grpc.ReactorPing
0, // 1: grpc.monitoring.ReactorPingHandler:output_type -> grpc.ReactorAck
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_internal_pkg_grpc_monitoring_proto_init() }
@ -296,7 +166,7 @@ func file_internal_pkg_grpc_monitoring_proto_init() {
}
if !protoimpl.UnsafeEnabled {
file_internal_pkg_grpc_monitoring_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReactorStatusResponse); i {
switch v := v.(*ReactorAck); i {
case 0:
return &v.state
case 1:
@ -308,19 +178,7 @@ func file_internal_pkg_grpc_monitoring_proto_init() {
}
}
file_internal_pkg_grpc_monitoring_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReactorStatusPing); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_internal_pkg_grpc_monitoring_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Device); i {
switch v := v.(*ReactorPing); i {
case 0:
return &v.state
case 1:
@ -337,14 +195,13 @@ func file_internal_pkg_grpc_monitoring_proto_init() {
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_internal_pkg_grpc_monitoring_proto_rawDesc,
NumEnums: 1,
NumMessages: 3,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_internal_pkg_grpc_monitoring_proto_goTypes,
DependencyIndexes: file_internal_pkg_grpc_monitoring_proto_depIdxs,
EnumInfos: file_internal_pkg_grpc_monitoring_proto_enumTypes,
MessageInfos: file_internal_pkg_grpc_monitoring_proto_msgTypes,
}.Build()
File_internal_pkg_grpc_monitoring_proto = out.File

@ -4,26 +4,13 @@ package grpc;
option go_package = "internal/pkg/grpc";
service monitoring {
rpc ReactorStatusHandler(ReactorStatusPing) returns (ReactorStatusResponse);
rpc ReactorPingHandler(stream ReactorPing) returns (ReactorAck);
}
message ReactorStatusResponse {
message ReactorAck {
int32 id = 1;
}
message ReactorStatusPing {
message ReactorPing {
int32 id = 1;
// new devices
repeated Device devices = 2;
}
enum Status {
DEAD = 0;
ALIVE = 1;
UNKOWN = 2;
}
message Device {
int32 addr = 1; // i2c addr
Status status = 2; // most recent status
}

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.12.4
// - protoc v3.21.12
// source: internal/pkg/grpc/monitoring.proto
package grpc
@ -22,7 +22,7 @@ const _ = grpc.SupportPackageIsVersion7
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type MonitoringClient interface {
ReactorStatusHandler(ctx context.Context, in *ReactorStatusPing, opts ...grpc.CallOption) (*ReactorStatusResponse, error)
ReactorPingHandler(ctx context.Context, opts ...grpc.CallOption) (Monitoring_ReactorPingHandlerClient, error)
}
type monitoringClient struct {
@ -33,20 +33,45 @@ func NewMonitoringClient(cc grpc.ClientConnInterface) MonitoringClient {
return &monitoringClient{cc}
}
func (c *monitoringClient) ReactorStatusHandler(ctx context.Context, in *ReactorStatusPing, opts ...grpc.CallOption) (*ReactorStatusResponse, error) {
out := new(ReactorStatusResponse)
err := c.cc.Invoke(ctx, "/grpc.monitoring/ReactorStatusHandler", in, out, opts...)
func (c *monitoringClient) ReactorPingHandler(ctx context.Context, opts ...grpc.CallOption) (Monitoring_ReactorPingHandlerClient, error) {
stream, err := c.cc.NewStream(ctx, &Monitoring_ServiceDesc.Streams[0], "/grpc.monitoring/ReactorPingHandler", opts...)
if err != nil {
return nil, err
}
return out, nil
x := &monitoringReactorPingHandlerClient{stream}
return x, nil
}
type Monitoring_ReactorPingHandlerClient interface {
Send(*ReactorPing) error
CloseAndRecv() (*ReactorAck, error)
grpc.ClientStream
}
type monitoringReactorPingHandlerClient struct {
grpc.ClientStream
}
func (x *monitoringReactorPingHandlerClient) Send(m *ReactorPing) error {
return x.ClientStream.SendMsg(m)
}
func (x *monitoringReactorPingHandlerClient) CloseAndRecv() (*ReactorAck, error) {
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
m := new(ReactorAck)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// MonitoringServer is the server API for Monitoring service.
// All implementations must embed UnimplementedMonitoringServer
// for forward compatibility
type MonitoringServer interface {
ReactorStatusHandler(context.Context, *ReactorStatusPing) (*ReactorStatusResponse, error)
ReactorPingHandler(Monitoring_ReactorPingHandlerServer) error
mustEmbedUnimplementedMonitoringServer()
}
@ -54,8 +79,8 @@ type MonitoringServer interface {
type UnimplementedMonitoringServer struct {
}
func (UnimplementedMonitoringServer) ReactorStatusHandler(context.Context, *ReactorStatusPing) (*ReactorStatusResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ReactorStatusHandler not implemented")
func (UnimplementedMonitoringServer) ReactorPingHandler(Monitoring_ReactorPingHandlerServer) error {
return status.Errorf(codes.Unimplemented, "method ReactorPingHandler not implemented")
}
func (UnimplementedMonitoringServer) mustEmbedUnimplementedMonitoringServer() {}
@ -70,22 +95,30 @@ func RegisterMonitoringServer(s grpc.ServiceRegistrar, srv MonitoringServer) {
s.RegisterService(&Monitoring_ServiceDesc, srv)
}
func _Monitoring_ReactorStatusHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ReactorStatusPing)
if err := dec(in); err != nil {
func _Monitoring_ReactorPingHandler_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(MonitoringServer).ReactorPingHandler(&monitoringReactorPingHandlerServer{stream})
}
type Monitoring_ReactorPingHandlerServer interface {
SendAndClose(*ReactorAck) error
Recv() (*ReactorPing, error)
grpc.ServerStream
}
type monitoringReactorPingHandlerServer struct {
grpc.ServerStream
}
func (x *monitoringReactorPingHandlerServer) SendAndClose(m *ReactorAck) error {
return x.ServerStream.SendMsg(m)
}
func (x *monitoringReactorPingHandlerServer) Recv() (*ReactorPing, error) {
m := new(ReactorPing)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(MonitoringServer).ReactorStatusHandler(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/grpc.monitoring/ReactorStatusHandler",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MonitoringServer).ReactorStatusHandler(ctx, req.(*ReactorStatusPing))
}
return interceptor(ctx, in, info, handler)
return m, nil
}
// Monitoring_ServiceDesc is the grpc.ServiceDesc for Monitoring service.
@ -94,12 +127,13 @@ func _Monitoring_ReactorStatusHandler_Handler(srv interface{}, ctx context.Conte
var Monitoring_ServiceDesc = grpc.ServiceDesc{
ServiceName: "grpc.monitoring",
HandlerType: (*MonitoringServer)(nil),
Methods: []grpc.MethodDesc{
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
MethodName: "ReactorStatusHandler",
Handler: _Monitoring_ReactorStatusHandler_Handler,
StreamName: "ReactorPingHandler",
Handler: _Monitoring_ReactorPingHandler_Handler,
ClientStreams: true,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "internal/pkg/grpc/monitoring.proto",
}

@ -1,335 +0,0 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.0
// protoc v3.6.1
// source: internal/pkg/grpc/server.proto
package grpc
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type ClientRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
ClientId uint32 `protobuf:"varint,1,opt,name=clientId,proto3" json:"clientId,omitempty"`
ClientType string `protobuf:"bytes,2,opt,name=clientType,proto3" json:"clientType,omitempty"`
}
func (x *ClientRequest) Reset() {
*x = ClientRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_pkg_grpc_server_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ClientRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ClientRequest) ProtoMessage() {}
func (x *ClientRequest) ProtoReflect() protoreflect.Message {
mi := &file_internal_pkg_grpc_server_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ClientRequest.ProtoReflect.Descriptor instead.
func (*ClientRequest) Descriptor() ([]byte, []int) {
return file_internal_pkg_grpc_server_proto_rawDescGZIP(), []int{0}
}
func (x *ClientRequest) GetClientId() uint32 {
if x != nil {
return x.ClientId
}
return 0
}
func (x *ClientRequest) GetClientType() string {
if x != nil {
return x.ClientType
}
return ""
}
type ClientResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
ClientId uint32 `protobuf:"varint,1,opt,name=clientId,proto3" json:"clientId,omitempty"`
ServerPort uint32 `protobuf:"varint,2,opt,name=serverPort,proto3" json:"serverPort,omitempty"`
Database *Database `protobuf:"bytes,3,opt,name=database,proto3" json:"database,omitempty"`
}
func (x *ClientResponse) Reset() {
*x = ClientResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_pkg_grpc_server_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ClientResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ClientResponse) ProtoMessage() {}
func (x *ClientResponse) ProtoReflect() protoreflect.Message {
mi := &file_internal_pkg_grpc_server_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ClientResponse.ProtoReflect.Descriptor instead.
func (*ClientResponse) Descriptor() ([]byte, []int) {
return file_internal_pkg_grpc_server_proto_rawDescGZIP(), []int{1}
}
func (x *ClientResponse) GetClientId() uint32 {
if x != nil {
return x.ClientId
}
return 0
}
func (x *ClientResponse) GetServerPort() uint32 {
if x != nil {
return x.ServerPort
}
return 0
}
func (x *ClientResponse) GetDatabase() *Database {
if x != nil {
return x.Database
}
return nil
}
type Database struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
URL string `protobuf:"bytes,1,opt,name=URL,proto3" json:"URL,omitempty"`
ORG string `protobuf:"bytes,2,opt,name=ORG,proto3" json:"ORG,omitempty"`
Token string `protobuf:"bytes,3,opt,name=token,proto3" json:"token,omitempty"`
Bucket string `protobuf:"bytes,4,opt,name=bucket,proto3" json:"bucket,omitempty"`
}
func (x *Database) Reset() {
*x = Database{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_pkg_grpc_server_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Database) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Database) ProtoMessage() {}
func (x *Database) ProtoReflect() protoreflect.Message {
mi := &file_internal_pkg_grpc_server_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Database.ProtoReflect.Descriptor instead.
func (*Database) Descriptor() ([]byte, []int) {
return file_internal_pkg_grpc_server_proto_rawDescGZIP(), []int{2}
}
func (x *Database) GetURL() string {
if x != nil {
return x.URL
}
return ""
}
func (x *Database) GetORG() string {
if x != nil {
return x.ORG
}
return ""
}
func (x *Database) GetToken() string {
if x != nil {
return x.Token
}
return ""
}
func (x *Database) GetBucket() string {
if x != nil {
return x.Bucket
}
return ""
}
var File_internal_pkg_grpc_server_proto protoreflect.FileDescriptor
var file_internal_pkg_grpc_server_proto_rawDesc = []byte{
0x0a, 0x1e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67,
0x72, 0x70, 0x63, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0x4b, 0x0a, 0x0d, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e,
0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e,
0x74, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70,
0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x54,
0x79, 0x70, 0x65, 0x22, 0x78, 0x0a, 0x0e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49,
0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49,
0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x50, 0x6f, 0x72, 0x74, 0x18,
0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x50, 0x6f, 0x72,
0x74, 0x12, 0x2a, 0x0a, 0x08, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x18, 0x03, 0x20,
0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x62,
0x61, 0x73, 0x65, 0x52, 0x08, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x22, 0x5c, 0x0a,
0x08, 0x44, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x55, 0x52, 0x4c,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x55, 0x52, 0x4c, 0x12, 0x10, 0x0a, 0x03, 0x4f,
0x52, 0x47, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x4f, 0x52, 0x47, 0x12, 0x14, 0x0a,
0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f,
0x6b, 0x65, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x18, 0x04, 0x20,
0x01, 0x28, 0x09, 0x52, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x32, 0x50, 0x0a, 0x09, 0x68,
0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x43, 0x0a, 0x16, 0x43, 0x6c, 0x69, 0x65,
0x6e, 0x74, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x48, 0x61, 0x6e, 0x64, 0x6c,
0x65, 0x72, 0x12, 0x13, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x43,
0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x13, 0x5a,
0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72,
0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_internal_pkg_grpc_server_proto_rawDescOnce sync.Once
file_internal_pkg_grpc_server_proto_rawDescData = file_internal_pkg_grpc_server_proto_rawDesc
)
func file_internal_pkg_grpc_server_proto_rawDescGZIP() []byte {
file_internal_pkg_grpc_server_proto_rawDescOnce.Do(func() {
file_internal_pkg_grpc_server_proto_rawDescData = protoimpl.X.CompressGZIP(file_internal_pkg_grpc_server_proto_rawDescData)
})
return file_internal_pkg_grpc_server_proto_rawDescData
}
var file_internal_pkg_grpc_server_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_internal_pkg_grpc_server_proto_goTypes = []interface{}{
(*ClientRequest)(nil), // 0: grpc.ClientRequest
(*ClientResponse)(nil), // 1: grpc.ClientResponse
(*Database)(nil), // 2: grpc.Database
}
var file_internal_pkg_grpc_server_proto_depIdxs = []int32{
2, // 0: grpc.ClientResponse.database:type_name -> grpc.Database
0, // 1: grpc.handshake.ClientDiscoveryHandler:input_type -> grpc.ClientRequest
1, // 2: grpc.handshake.ClientDiscoveryHandler:output_type -> grpc.ClientResponse
2, // [2:3] is the sub-list for method output_type
1, // [1:2] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
}
func init() { file_internal_pkg_grpc_server_proto_init() }
func file_internal_pkg_grpc_server_proto_init() {
if File_internal_pkg_grpc_server_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_internal_pkg_grpc_server_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ClientRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_internal_pkg_grpc_server_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ClientResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_internal_pkg_grpc_server_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Database); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_internal_pkg_grpc_server_proto_rawDesc,
NumEnums: 0,
NumMessages: 3,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_internal_pkg_grpc_server_proto_goTypes,
DependencyIndexes: file_internal_pkg_grpc_server_proto_depIdxs,
MessageInfos: file_internal_pkg_grpc_server_proto_msgTypes,
}.Build()
File_internal_pkg_grpc_server_proto = out.File
file_internal_pkg_grpc_server_proto_rawDesc = nil
file_internal_pkg_grpc_server_proto_goTypes = nil
file_internal_pkg_grpc_server_proto_depIdxs = nil
}

@ -1,28 +0,0 @@
syntax = "proto3";
package grpc;
option go_package = "internal/pkg/grpc";
service handshake {
rpc ClientDiscoveryHandler(ClientRequest) returns (ClientResponse);
}
message ClientRequest {
uint32 clientId = 1;
string clientType = 2;
string ip = 3; // client ip
uint32 port = 4; // client port for gRPC server
}
message ClientResponse {
uint32 clientId = 1;
uint32 serverPort = 2;
Database database = 3;
}
message Database {
string URL = 1;
string ORG = 2;
string token = 3;
string bucket = 4;
}

@ -1,81 +0,0 @@
package influxdb
import (
_ "fmt"
_ "github.com/influxdata/influxdb-client-go/v2"
"github.com/spf13/viper"
)
type DBInfo struct {
URL string `mapstructure:"url"`
Org string `mapstructure:"org,omitempty`
Bucket string `mapstructure:"bucket,omitempty"`
Token string `mapstructure:"token,omitempty"`
// Client *influxdb2.Client
}
type DBAdmin struct {
// struct for admin methods
*DBInfo
Config *viper.Viper
}
type DBClient struct {
// struct for client methods
*DBInfo
Config *viper.Viper
}
func NewDBInfo(config *viper.Viper) (*DBInfo, error) {
db := &DBInfo{}
// grabbing config vals
err := config.UnmarshalKey("db", db)
return db, err
}
func NewDBClient(config *viper.Viper) (*DBClient, error) {
client := &DBClient{Config: config}
// grabbing config vals
var err error
client.DBInfo, err = NewDBInfo(config)
return client, err
}
func NewDBAdmin(config *viper.Viper) (*DBAdmin, error) {
admin := &DBAdmin{Config: config}
var err error
// creating client
admin.DBInfo, err = NewDBInfo(config)
return admin, err
}
// base level funcs
func (d *DBInfo) Start() error {
// connect to DB based w/ info
return nil
}
func (d *DBAdmin) GetReactorClient(id int) (url, bucket, org, token string, err error) {
// given an id returns
// (url, org, bucket, token, error) for said id
/*
client := influxdb2.NewClient(d.URL, d.Token)
defer client.Close()
bucket, err := client.BucketsAPI().FindBucketByName(context.Background(), id)
if err != nil {
return "", "", err
}
if d.ReactorExists(id) {
// get corresponding reactor token and bucket
}
*/
url = d.URL
org = d.Org
token = ""
bucket = ""
//err = errors.New("Unimpl")
err = nil
return
}

@ -4,9 +4,7 @@ package server
import (
pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/influxdb"
"FRMS/internal/pkg/logging"
"context"
"errors"
"fmt"
"net"
"sync"
@ -15,264 +13,68 @@ import (
"google.golang.org/grpc"
)
// Database is an interface to interact with the server database.
// Used mainly to find existing credentials for
// incoming reactor client connections.
type Database interface {
GetReactorClient(int) (string, string, string, string, error) // returns (url, org, bucket, token, err)
}
// NewDatabaseAdmin creates a new database admin that implements the
// Database interface.
// Allows access to the database to find/create reactor credentials.
// Implemented via the influxdb package.
func NewDatabaseAdmin(config *viper.Viper) (Database, error) {
return influxdb.NewDBAdmin(config)
}
var (
ErrMissingPort = errors.New("port not set")
)
// CentralCoordinator is the main coordinator struct that runs on the server.
// Used to oversee the reactor managers as well as process incoming
// client connections.
// Also interacts with the database and global config.
type CentralCoordinator struct {
ClientConnections *ClientPacket
*ReactorCoordinator
Database
// Coordinator is runs on the server and is used to oversee
// the reactor managers as well as process incoming client connections.
type Coordinator struct {
Config *viper.Viper
// from config
Ports map[string]int `mapstructure:"ports"`
Err chan error
}
// NewCentralCoordinator creates a central coordinator with the given global
// config and error channel.
// It will create a new reactor coordinator and database admin.
// It will also try to load the existing configuration information.
func NewCentralCoordinator(config *viper.Viper, ch chan error) *CentralCoordinator {
// create a central coordinator to manage requests
db, err := NewDatabaseAdmin(config)
if err != nil {
ch <- err
}
rc, err := NewReactorCoordinator(config, ch)
if err != nil {
ch <- err
}
config.UnmarshalKey("server.ports", rc)
c := &CentralCoordinator{
Err: ch,
Config: config,
Database: db,
ReactorCoordinator: rc,
}
// grab config settings
if err = config.UnmarshalKey("server", c); err != nil {
ch <- err
}
return c
}
// Start activates the central coordinator and ensures it is ready for
// new clients.
// Creates a listener and starts both reactor coordinator and listener.
func (c *CentralCoordinator) Start() {
clientChan := make(chan *ClientPacket)
l, err := NewListener(clientChan, c.Err)
if err != nil {
c.Err <- err
}
c.Config.UnmarshalKey("server.ports", l)
if err := c.ReactorCoordinator.Start(); err != nil {
c.Err <- err
}
if err := l.Start(); err != nil {
c.Err <- err
}
go c.ClientListener(clientChan)
}
// ClientListener listens on the given channel for clients that are sent
// over from the listener.
// The clients are then passed to the handler before returning the response.
func (c *CentralCoordinator) ClientListener(ch chan *ClientPacket) {
for client := range ch {
client.Response <- c.ClientHandler(client.Client) // respond with cred
}
}
// ClientHandler takes in a client and retrieves the associated
// database credentials.
// Currently only handles reactor type clients, can be modified
// to support others.
func (c *CentralCoordinator) ClientHandler(cl *Client) *ClientResponse {
// returns reactor db info
var err error
cr := &ClientResponse{Port: c.Ports[cl.Type]}
if cl.Type != "reactor" {
c.Err <- fmt.Errorf("client type %s not recognized", cl.Type)
}
go c.ReactorCoordinator.ClientHandler(cl)
listener net.Listener
grpcServer *grpc.Server
// db info
cr.URL, cr.Org, cr.Token, cr.Bucket, err = c.Database.GetReactorClient(cl.Id)
DatabasePort int `mapstructure:"database_port"`
GRPCPort int `mapstructure:"grpc_port"`
if err != nil {
c.Err <- err
}
return cr
}
directory map[int]*ReactorManager
managerMu sync.RWMutex
// ReactorCoordinator is a strucutre used to store reactor managers for
// clients that have connected at some point.
type ReactorCoordinator struct {
Port int `mapstructure:"reactor"`
*ReactorManagers
Err chan error
pb.UnimplementedMonitoringServer
}
// ReactorManagers is a structure that stores a concurrent map of the
// reactor managers as well as the global config.
type ReactorManagers struct {
Config *viper.Viper
Directory map[int]*ReactorManager
sync.RWMutex
// grpc
pb.UnimplementedHandshakeServer
pb.UnimplementedMonitoringServer
}
// NewReactorCoordinator takes the global config and error channel and returns
// a pointer to a ReactorCoordinator as well as any errors.
func NewReactorCoordinator(config *viper.Viper, errCh chan error) (*ReactorCoordinator, error) {
// NewCentralCoordinator creates a central coordinator with the given global
// config and error channel.
func NewCentralCoordinator(config *viper.Viper, ch chan error) *Coordinator {
rmap := make(map[int]*ReactorManager)
rm := &ReactorManagers{
Directory: rmap,
return &Coordinator{
Err: ch,
Config: config,
directory: rmap,
}
return &ReactorCoordinator{
Err: errCh,
ReactorManagers: rm,
}, nil
}
// Start starts the reactor coordinator and kicks off
// registering the gRPC service
func (c *ReactorCoordinator) Start() error {
logging.Debug(logging.DStart, "RCO 01 Starting!")
return c.Register()
}
// ClientHandler takes in a client and finds or creates the correct
// manager for said client.
func (c *ReactorCoordinator) ClientHandler(cl *Client) {
if err := c.UpdateReactorManager(cl, c.Err); err != nil {
c.Err <- err
}
}
// GetReactorManager attempts to locate a reactor manager for a given id.
// Returns either the associated reactor manager, or an error if
// a manager does not exist for the given id.
func (m *ReactorManagers) GetReactorManager(id int) (*ReactorManager, error) {
m.RLock()
defer m.RUnlock()
rm, exists := m.Directory[id]
if !exists {
return &ReactorManager{}, fmt.Errorf("no manager for reactor %d", id)
}
return rm, nil
}
// UpdateReactorManager takes in a client and error channel and passes the
// client to the associate reactor manager.
// If the client does not have an existing reactor manager, it will create one
// , start it, and add it to the map for future calls.
// The function then calls UpdateClient on the reactor manager and returns
// any errors generated by this function.
func (m *ReactorManagers) UpdateReactorManager(cl *Client, errCh chan error) error {
m.RLock()
defer m.RUnlock()
var err error
rm, exists := m.Directory[cl.Id]
// Start loads config, starts network listener and registers grpc handlers.
// Ready for new clients on return.
func (c *Coordinator) Start() error {
if !exists {
// reactor manager does not exist, creating new one
logging.Debug(
logging.DClient,
"RCO 01 creating manager for %v",
cl.Id,
)
if rm, err = NewReactorManager(cl, m.Config, errCh); err != nil {
if err := c.Config.Unmarshal(c); err != nil {
return err
}
if err = rm.Start(); err != nil {
return err
}
// ensure it shows up as missing
if c.GRPCPort == 0 {
c.Config.Set("grpc_port", 0)
c.Config.WriteConfig()
m.Directory[cl.Id] = rm
return ErrMissingPort
}
return rm.UpdateClient(cl)
}
// Register attaches to the servers port and attempts to bind
// a gRPC server to it.
func (r *ReactorCoordinator) Register() error {
lis, err := net.Listen("tcp", fmt.Sprintf(":%v", r.Port))
lis, err := net.Listen("tcp", fmt.Sprintf(":%v", c.GRPCPort))
if err != nil {
return err
}
grpcServer := grpc.NewServer()
pb.RegisterMonitoringServer(grpcServer, r)
go grpcServer.Serve(lis)
logging.Debug(logging.DClient, "RCO 01 ready")
return nil
}
// ReactorStatusHandler is a gRPC handler used to handle incoming
// reactor requests containing information about said reactor.
// It will get the associate reactor manager and pass the
// request device information before returning an acknowledgement.
func (r *ReactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) {
rm, err := r.GetReactorManager(int(req.GetId()))
c.listener = lis
c.grpcServer = grpcServer
if err != nil {
return &pb.ReactorStatusResponse{}, err
}
go rm.ReactorDeviceHandler(req.GetDevices())
return &pb.ReactorStatusResponse{Id: int32(rm.Id)}, nil
return c.Register()
}

@ -0,0 +1,13 @@
package server
type dbinfo struct {
url string
org string
token string
bucket string
}
func (c *Coordinator) getReactorDatabaseCred(id int) (*dbinfo, error) {
return &dbinfo{}, nil
}

@ -0,0 +1,48 @@
package server
import (
pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/logging"
"context"
)
// ClientDiscoveryHandler implements the grpc method which can be called
// by incoming clients to first make connection to the central
// coordinator and receive database credentials.
func (c *Coordinator) ReactorClientHandler(
ctx context.Context,
req *pb.ReactorClientRequest,
) (*pb.ReactorClientResponse, error) {
id := int(req.GetId())
logging.Debug(
logging.DClient,
"LIS 00 reactor %v has connected\n",
id,
)
db, err := c.getReactorDatabaseCred(id)
if err != nil {
return &pb.ReactorClientResponse{}, err
}
return &pb.ReactorClientResponse{
Id: id,
Url: db.url,
Org: db.org,
Token: db.token,
Bucket: db.bucket,
}, err
}
// ReactorStatusHandler is a gRPC handler used to handle incoming
// reactor requests containing information about said reactor.
// It will get the associate reactor manager and pass the
// request device information before returning an acknowledgement.
func (c *Coordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) {
// rm, err := c.LoadReactorManager(int(req.GetId()))
return &pb.ReactorStatusResponse{}, nil
}

@ -1,120 +0,0 @@
package server
import (
pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/logging"
"context"
"fmt"
"net"
"google.golang.org/grpc"
)
// Listener is a struct that listens for incoming clients on a given port
// and passes them the central coordinator.
// Implements the gRPC handshake server for clients.
type Listener struct {
Port int `mapstructure:"lis"`
ClientConnections chan *ClientPacket
Err chan error
pb.UnimplementedHandshakeServer
}
// ClientPacket is a uniform type to pass on a channel to the server.
type ClientPacket struct {
*Client
Response chan *ClientResponse
}
// Client is a struct containing information about the client on
// the incoming connection.
type Client struct {
//Ip string
//Port int
Id int
Model string
Type string
}
// ClientResponse is the database credentials returned from the central
// coordinator for the given client.
type ClientResponse struct {
Port int
URL string
Org string
Token string
Bucket string
}
// NewListener createsa new listener with the given client and error channels
func NewListener(
cch chan *ClientPacket,
ech chan error,
) (*Listener, error) {
return &Listener{
Err: ech,
ClientConnections: cch,
}, nil
}
// Start activates the listener and kicks off the gRPC binding process
func (l *Listener) Start() error {
logging.Debug(logging.DStart, "LIS 01 Started client listener")
return l.Register()
}
// Register creates a net listener on the port and binds a grpc server to it
// before registering a handshake server.
func (l *Listener) Register() error {
lis, err := net.Listen("tcp", fmt.Sprintf(":%v", l.Port))
if err != nil {
return err
}
grpcServer := grpc.NewServer()
pb.RegisterHandshakeServer(grpcServer, l)
go grpcServer.Serve(lis)
logging.Debug(logging.DStart, "LIS 01 Registered on port %v", l.Port)
return nil
}
// ClientDiscoveryHandler implements the grpc method which can be called
// by incoming clients to first make connection to the central
// coordinator and receive database credentials.
func (l *Listener) ClientDiscoveryHandler(ctx context.Context, ping *pb.ClientRequest) (*pb.ClientResponse, error) {
c := &Client{
Id: int(ping.GetClientId()),
Type: ping.GetClientType(),
}
logging.Debug(logging.DClient, "LIS 01 %v %v has connected\n", c.Type, c.Id)
ch := make(chan *ClientResponse)
p := &ClientPacket{
Client: c,
Response: ch,
}
l.ClientConnections <- p
resp := <-ch
db := &pb.Database{
URL: resp.URL,
ORG: resp.Org,
Token: resp.Token,
Bucket: resp.Bucket,
}
return &pb.ClientResponse{
ClientId: uint32(c.Id),
ServerPort: uint32(resp.Port),
Database: db,
}, nil
}

@ -1,17 +0,0 @@
package server
import (
"testing"
"github.com/stretchr/testify/assert"
)
// TestNewListener tries to create a new listener
func TestNewListener(t *testing.T) {
assert := assert.New(t)
cch := make(chan *ClientPacket)
ech := make(chan error)
_, err := NewListener(cch, ech)
assert.Equal(err, nil, "creating listener failed")
}

@ -0,0 +1,121 @@
package server
import (
"FRMS/internal/pkg/logging"
"FRMS/internal/pkg/manager"
"errors"
"time"
)
var (
ErrNoReactorManager = errors.New("no reactor manager found")
)
// ReactorManager can be started/stopped as clients connect/disconnect.
type ReactorManager struct {
Manager // base manager interface
}
// Manager is an interface requiring a structure that can be started
// and stopped as well as provide timeouts in milliseconds.
type Manager interface {
Start() error // status checks
Stop() error
Timeout() (time.Duration, error) // TO Generator
}
// NewManager returns a manager fulfilling the Manager interface as well as
// any potential errors.
func NewManager(max int) (Manager, error) {
return manager.New(max)
}
// GetReactorManager returns a reactor manager for passed id.
// Throws error if manager not found for id.
func (c *Coordinator) LoadReactorManager(id int) (*ReactorManager, error) {
c.managerMu.RLock()
defer c.managerMu.RUnlock()
rm, exists := c.directory[id]
if !exists {
logging.Debug(
logging.DClient,
"RCO 00 creating manager for %v",
id,
)
m, err := NewManager(0)
rm = &ReactorManager{
Manager: m,
}
if err = rm.Start(); err != nil {
return rm, err
}
c.directory[id] = rm
}
return rm, nil
}
// // NewReactorManager takes in a client, config and channel to pass errors on.
// // Returns a new reactor manager as well as any errors that occured during
// // creation.
// // Uses MaxConnectionAttempts which defaults to 10 to prevent
// // unnessecary network load and/or timeout lengths.
// func NewReactorManager(
// ) (*ReactorManager, error) {
// m, err := NewManager(MaxConnectionAttempts)
// if err != nil {
// return &ReactorManager{}, err
// }
// return r, err
// }
// Start logs the start and calls start on the embedded manager.
func (r *ReactorManager) Start() error {
// logging.Debug(logging.DStart, "RMA starting", r.Id)
return r.Manager.Start()
}
// Stop logs the stop and calls stop on the embedded manager.
func (r *ReactorManager) Stop() error {
// logging.Debug(logging.DExit, "RMA %v stopping", r.Id)
return r.Manager.Stop()
}
// UpdateClient is used to change the underlying manager client if there
// changes to its data.
//
// BUG(Keegan): Client is not protected by a lock and may lead to races
// func (r *ReactorManager) UpdateClient(cl *Client) error {
// logging.Debug(logging.DClient, "RMA %v updating client", r.Id)
// r.Client = cl
// return nil
// }
// // ReactorDeviceHandler processes incoming device information and
// // updates the manager accordingly.
// func (r *ReactorManager) ReactorDeviceHandler(devs []*pb.Device) error {
// logging.Debug(logging.DClient, "CCO recieved ping from %v", r.Id)
// for _, dev := range devs {
// logging.Debug(
// logging.DClient,
// "CCO %v device %v is %v",
// r.Id,
// dev.GetAddr(),
// dev.GetStatus().String(),
// )
// }
// return nil
// }

@ -1,108 +0,0 @@
package server
import (
pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/logging"
"FRMS/internal/pkg/manager"
"time"
"github.com/spf13/viper"
)
// MaxConnectionAttempts is the max number of tries to allow
// when connecting to a reactor.
const MaxConnectionAttempts = 10
// Manager is an interface requiring a structure that can be started
// and stopped as well as provide timeouts in milliseconds.
type Manager interface {
Start() error // status checks
Stop() error
Timeout() (time.Duration, error) // TO Generator
}
// NewManager returns a manager fulfilling the Manager interface as well as
// any potential errors.
func NewManager(max int) (Manager, error) {
return manager.New(max)
}
// ReactorManager contains a base manager, client, global
// config, and error channel.
// The ReactorManager can be started/stopped as clients connect/disconnect.
// Also serves as handler for gRPC requests from reactors.
// Can be extended to write changes to config.
type ReactorManager struct {
Manager // base manager interface
*Client
Config *viper.Viper // global config to maintain
Err chan error
}
// NewReactorManager takes in a client, config and channel to pass errors on.
// Returns a new reactor manager as well as any errors that occured during
// creation.
// Uses MaxConnectionAttempts which defaults to 10 to prevent
// unnessecary network load and/or timeout lengths.
func NewReactorManager(
cl *Client,
config *viper.Viper,
errCh chan error,
) (*ReactorManager, error) {
m, err := NewManager(MaxConnectionAttempts)
if err != nil {
return &ReactorManager{}, err
}
r := &ReactorManager{
Manager: m,
Client: cl,
Config: config,
Err: errCh,
}
return r, err
}
// Start logs the start and calls start on the embedded manager.
func (r *ReactorManager) Start() error {
logging.Debug(logging.DStart, "RMA %v starting", r.Id)
return r.Manager.Start()
}
// Stop logs the stop and calls stop on the embedded manager.
func (r *ReactorManager) Stop() error {
logging.Debug(logging.DExit, "RMA %v stopping", r.Id)
return r.Manager.Stop()
}
// UpdateClient is used to change the underlying manager client if there
// changes to its data.
//
// BUG(Keegan): Client is not protected by a lock and may lead to races
func (r *ReactorManager) UpdateClient(cl *Client) error {
logging.Debug(logging.DClient, "RMA %v updating client", r.Id)
r.Client = cl
return nil
}
// ReactorDeviceHandler processes incoming device information and
// updates the manager accordingly.
func (r *ReactorManager) ReactorDeviceHandler(devs []*pb.Device) error {
logging.Debug(logging.DClient, "RMA %v recieved ping", r.Id)
for _, dev := range devs {
logging.Debug(
logging.DClient,
"RMA %v device %v is %v",
r.Id,
dev.GetAddr(),
dev.GetStatus().String(),
)
}
return nil
}

@ -1,85 +0,0 @@
package server
import (
pb "FRMS/internal/pkg/grpc"
"math/rand"
"testing"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
)
// dummyClient creates a dummy client for testing.
func dummyClient() *Client {
return &Client{
Id: rand.Int(),
Model: "dummy",
Type: "dummy",
}
}
func dummyDevices() []*pb.Device {
numDevs := 10
devs := make([]*pb.Device, numDevs)
for i := 0; i < numDevs; i++ {
dev := &pb.Device{
Addr: int32(rand.Intn(255)),
Status: pb.Status(rand.Intn(2)),
}
devs = append(devs, dev)
}
return devs
}
// dummyReactorManager creates a dummy reactor manager for testing.
func dummyReactorManager() (*ReactorManager, error) {
ch := make(chan error)
cl := dummyClient()
return NewReactorManager(cl, viper.New(), ch)
}
// TestNewReactorManager tries to create a new reactor manager.
func TestNewReactorManager(t *testing.T) {
assert := assert.New(t)
_, err := dummyReactorManager()
assert.Equal(err, nil, "failed to create reactor manager")
}
// TestReactorManager tries to start/stop reactor manager
func TestReactorManager(t *testing.T) {
assert := assert.New(t)
rm, err := dummyReactorManager()
assert.Equal(err, nil, "failed to create reactor manager")
cycles := 10
for i := 0; i < cycles; i++ {
assert.NoError(rm.Start(), "failed to start")
assert.NoError(rm.Stop(), "failed to start")
}
}
// TestUpdateClient tries to update a reactor managers embedded client.
func TestUpdateClient(t *testing.T) {
assert := assert.New(t)
rm, err := dummyReactorManager()
assert.Equal(err, nil, "failed to create reactor manager")
cl := dummyClient()
assert.NoError(rm.UpdateClient(cl), "failed to update client")
}
// TestReactorDeviceHandler ensures that a list of devices can be processed.
func TestReactorDeviceHandler(t *testing.T) {
assert := assert.New(t)
rm, err := dummyReactorManager()
assert.Equal(err, nil, "failed to create reactor manager")
devs := dummyDevices()
assert.NoError(rm.ReactorDeviceHandler(devs), "failed to handle devices")
}

@ -0,0 +1,19 @@
package server
import (
pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/logging"
)
func (c *Coordinator) Register() error {
// register services
pb.RegisterHandshakeServer(c.grpcServer, c)
go c.grpcServer.Serve(c.listener)
// testing
pb.RegisterMonitoringServer(c.grpcServer, c)
logging.Debug(logging.DStart, "CCO 00 registered grpc")
return nil
}
Loading…
Cancel
Save