fixing refs

main
spinach 1 year ago
parent d0201b23f1
commit 6acf06f21b

@ -0,0 +1,54 @@
// Package config wraps viper to load/store configs
// using the XDG standard ($HOME/.config/FRMS) as the base directory
//
// WARNING: only built for Linux
import (
"dmac/pkg/logging"
"FRMS/internal/pkg/logging"
"fmt"
"os"
"github.com/spf13/viper"
)
// Load the file at path/file into a viper object.
// Expects config file to be yaml.
func Load(file, path, ext string) (*viper.Viper, error) {
logging.Debug(logging.DStart, "CON loading %s", file)
config := viper.New()
//configFile := fmt.Sprintf("%s/%s.%s", path, file, ext)
config.SetConfigName(file)
config.AddConfigPath(path)
config.SetConfigType(ext)
// Sets env vars
config.AutomaticEnv()
// create config directory if it doesn't exist
if err := os.MkdirAll(path, 0750); err != nil && !os.IsExist(err) {
return config, err
}
// attempt to create an empty config incase it doesn't exist
// if err := config.SafeWriteConfigAs(configFile); err != nil {
// // if error thrown because file exists, fine to ignore
// if _, ok := err.(viper.ConfigFileAlreadyExistsError); !ok {
// return config, err
// }
// }
if err := config.ReadInConfig(); err != nil {
fmt.Printf("read error %v\n", config)
return config, err
}
logging.Debug(logging.DStart, "CON Loaded configs from %#V", config.ConfigFileUsed())
// returning config object
return config, nil
}

@ -0,0 +1,64 @@
// package Database wraps some influx db methods to provide functionality.
package database
import (
"context"
"errors"
"fmt"
influx "github.com/influxdata/influxdb-client-go/v2"
"github.com/spf13/viper"
)
var (
ErrDBConnection = errors.New("connection to database failed")
ErrNoURLFound = errors.New("database url not found")
)
var db influx.Client
// Connect takes in a config and attempts to create a client for influxdb.
// Will automatically write changes back to config for future attempts
func Connect(config *viper.Viper) error {
url := config.GetString("db.url")
token := config.GetString("db.token")
if url == "" {
return ErrNoURLFound
}
db = influx.NewClient(url, token)
if token == "" {
// try setup
fmt.Printf("attempting to setup database at %v\n", url)
user := config.GetString("db.username")
password := config.GetString("db.password")
org := config.GetString("db.org")
bucket := config.GetString("db.bucket")
Setup(user, pass, org, bucket
}
db = influx.NewClient(url, token)
return nil
}
func Setup(user, pass, org, bucket string, ret int) (string, error) {
resp, err := db.Setup(context.Background(), user, pass, org, bucket, ret)
return "", nil
}
func GetBucket(id int) (string, error) {
return "", nil
}
func GetToken(id int) (string, error) {
// bucket, err := client.BucketsAPI().FindBucketByName(context.Background(), id)
return "", nil
}

@ -0,0 +1,42 @@
syntax = "proto3";
package grpc;
option go_package = "internal/pkg/grpc";
service device {
// groups basic device interactions
// get/set name based on request
rpc Name(NameRequest) returns (NameResponse)
}
message NameRequest {
// empty for future expansion
string Name = 1;
}
message NameResponse {
string Name = 1;
}
service sensor {
// sensor specific functions
rpc Reading(ReadingRequest) returns (ReadingResponse)
rpc SampleRate(SampleRateRequest) returns (SampleRateResponse)
}
message ReadingRequest {
// empty
}
message ReadingResponse {
string Reading = 1; // formatted reading "9.7 pH"
int64 Timestamp = 2; // when the reading was taken
}
message SampleRateRequest {
int32 SampleRate = 1; // 0 to return current sample rate, value in seconds
}
message SampleRateResponse {
int32 SampleRate = 1; // returns the set sample rate
}

@ -0,0 +1,260 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc v3.21.12
// source: internal/pkg/grpc/handshake.proto
package grpc
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type ReactorClientRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
Port uint32 `protobuf:"varint,2,opt,name=port,proto3" json:"port,omitempty"` // client gRPC port
}
func (x *ReactorClientRequest) Reset() {
*x = ReactorClientRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ReactorClientRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ReactorClientRequest) ProtoMessage() {}
func (x *ReactorClientRequest) ProtoReflect() protoreflect.Message {
mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ReactorClientRequest.ProtoReflect.Descriptor instead.
func (*ReactorClientRequest) Descriptor() ([]byte, []int) {
return file_internal_pkg_grpc_handshake_proto_rawDescGZIP(), []int{0}
}
func (x *ReactorClientRequest) GetId() uint32 {
if x != nil {
return x.Id
}
return 0
}
func (x *ReactorClientRequest) GetPort() uint32 {
if x != nil {
return x.Port
}
return 0
}
type ReactorClientResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
Url string `protobuf:"bytes,2,opt,name=url,proto3" json:"url,omitempty"`
Org string `protobuf:"bytes,3,opt,name=org,proto3" json:"org,omitempty"`
Token string `protobuf:"bytes,4,opt,name=token,proto3" json:"token,omitempty"`
Bucket string `protobuf:"bytes,5,opt,name=bucket,proto3" json:"bucket,omitempty"`
}
func (x *ReactorClientResponse) Reset() {
*x = ReactorClientResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ReactorClientResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ReactorClientResponse) ProtoMessage() {}
func (x *ReactorClientResponse) ProtoReflect() protoreflect.Message {
mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ReactorClientResponse.ProtoReflect.Descriptor instead.
func (*ReactorClientResponse) Descriptor() ([]byte, []int) {
return file_internal_pkg_grpc_handshake_proto_rawDescGZIP(), []int{1}
}
func (x *ReactorClientResponse) GetId() uint32 {
if x != nil {
return x.Id
}
return 0
}
func (x *ReactorClientResponse) GetUrl() string {
if x != nil {
return x.Url
}
return ""
}
func (x *ReactorClientResponse) GetOrg() string {
if x != nil {
return x.Org
}
return ""
}
func (x *ReactorClientResponse) GetToken() string {
if x != nil {
return x.Token
}
return ""
}
func (x *ReactorClientResponse) GetBucket() string {
if x != nil {
return x.Bucket
}
return ""
}
var File_internal_pkg_grpc_handshake_proto protoreflect.FileDescriptor
var file_internal_pkg_grpc_handshake_proto_rawDesc = []byte{
0x0a, 0x21, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67,
0x72, 0x70, 0x63, 0x2f, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0x3a, 0x0a, 0x14, 0x52, 0x65, 0x61,
0x63, 0x74, 0x6f, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69,
0x64, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52,
0x04, 0x70, 0x6f, 0x72, 0x74, 0x22, 0x79, 0x0a, 0x15, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72,
0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e,
0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x10,
0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c,
0x12, 0x10, 0x0a, 0x03, 0x6f, 0x72, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6f,
0x72, 0x67, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28,
0x09, 0x52, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x75, 0x63, 0x6b,
0x65, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74,
0x32, 0x5c, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x4f, 0x0a,
0x14, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x48, 0x61,
0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x1a, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61,
0x63, 0x74, 0x6f, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x1b, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72,
0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x13,
0x5a, 0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67,
0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_internal_pkg_grpc_handshake_proto_rawDescOnce sync.Once
file_internal_pkg_grpc_handshake_proto_rawDescData = file_internal_pkg_grpc_handshake_proto_rawDesc
)
func file_internal_pkg_grpc_handshake_proto_rawDescGZIP() []byte {
file_internal_pkg_grpc_handshake_proto_rawDescOnce.Do(func() {
file_internal_pkg_grpc_handshake_proto_rawDescData = protoimpl.X.CompressGZIP(file_internal_pkg_grpc_handshake_proto_rawDescData)
})
return file_internal_pkg_grpc_handshake_proto_rawDescData
}
var file_internal_pkg_grpc_handshake_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_internal_pkg_grpc_handshake_proto_goTypes = []interface{}{
(*ReactorClientRequest)(nil), // 0: grpc.ReactorClientRequest
(*ReactorClientResponse)(nil), // 1: grpc.ReactorClientResponse
}
var file_internal_pkg_grpc_handshake_proto_depIdxs = []int32{
0, // 0: grpc.handshake.ReactorClientHandler:input_type -> grpc.ReactorClientRequest
1, // 1: grpc.handshake.ReactorClientHandler:output_type -> grpc.ReactorClientResponse
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_internal_pkg_grpc_handshake_proto_init() }
func file_internal_pkg_grpc_handshake_proto_init() {
if File_internal_pkg_grpc_handshake_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_internal_pkg_grpc_handshake_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReactorClientRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_internal_pkg_grpc_handshake_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReactorClientResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_internal_pkg_grpc_handshake_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_internal_pkg_grpc_handshake_proto_goTypes,
DependencyIndexes: file_internal_pkg_grpc_handshake_proto_depIdxs,
MessageInfos: file_internal_pkg_grpc_handshake_proto_msgTypes,
}.Build()
File_internal_pkg_grpc_handshake_proto = out.File
file_internal_pkg_grpc_handshake_proto_rawDesc = nil
file_internal_pkg_grpc_handshake_proto_goTypes = nil
file_internal_pkg_grpc_handshake_proto_depIdxs = nil
}

@ -0,0 +1,21 @@
syntax = "proto3";
package grpc;
option go_package = "internal/pkg/grpc";
service handshake {
rpc ReactorClientHandler(ReactorClientRequest) returns (ReactorClientResponse);
}
message ReactorClientRequest {
uint32 id = 1;
uint32 port = 2; // client gRPC port
}
message ReactorClientResponse {
uint32 id = 1;
string url = 2;
string org = 3;
string token = 4;
string bucket = 5;
}

@ -0,0 +1,105 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.21.12
// source: internal/pkg/grpc/handshake.proto
package grpc
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// HandshakeClient is the client API for Handshake service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type HandshakeClient interface {
ReactorClientHandler(ctx context.Context, in *ReactorClientRequest, opts ...grpc.CallOption) (*ReactorClientResponse, error)
}
type handshakeClient struct {
cc grpc.ClientConnInterface
}
func NewHandshakeClient(cc grpc.ClientConnInterface) HandshakeClient {
return &handshakeClient{cc}
}
func (c *handshakeClient) ReactorClientHandler(ctx context.Context, in *ReactorClientRequest, opts ...grpc.CallOption) (*ReactorClientResponse, error) {
out := new(ReactorClientResponse)
err := c.cc.Invoke(ctx, "/grpc.handshake/ReactorClientHandler", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// HandshakeServer is the server API for Handshake service.
// All implementations must embed UnimplementedHandshakeServer
// for forward compatibility
type HandshakeServer interface {
ReactorClientHandler(context.Context, *ReactorClientRequest) (*ReactorClientResponse, error)
mustEmbedUnimplementedHandshakeServer()
}
// UnimplementedHandshakeServer must be embedded to have forward compatible implementations.
type UnimplementedHandshakeServer struct {
}
func (UnimplementedHandshakeServer) ReactorClientHandler(context.Context, *ReactorClientRequest) (*ReactorClientResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ReactorClientHandler not implemented")
}
func (UnimplementedHandshakeServer) mustEmbedUnimplementedHandshakeServer() {}
// UnsafeHandshakeServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to HandshakeServer will
// result in compilation errors.
type UnsafeHandshakeServer interface {
mustEmbedUnimplementedHandshakeServer()
}
func RegisterHandshakeServer(s grpc.ServiceRegistrar, srv HandshakeServer) {
s.RegisterService(&Handshake_ServiceDesc, srv)
}
func _Handshake_ReactorClientHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ReactorClientRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(HandshakeServer).ReactorClientHandler(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/grpc.handshake/ReactorClientHandler",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(HandshakeServer).ReactorClientHandler(ctx, req.(*ReactorClientRequest))
}
return interceptor(ctx, in, info, handler)
}
// Handshake_ServiceDesc is the grpc.ServiceDesc for Handshake service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Handshake_ServiceDesc = grpc.ServiceDesc{
ServiceName: "grpc.handshake",
HandlerType: (*HandshakeServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "ReactorClientHandler",
Handler: _Handshake_ReactorClientHandler_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "internal/pkg/grpc/handshake.proto",
}

@ -0,0 +1,211 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc v3.21.12
// source: internal/pkg/grpc/monitoring.proto
package grpc
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type ReactorAck struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
}
func (x *ReactorAck) Reset() {
*x = ReactorAck{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ReactorAck) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ReactorAck) ProtoMessage() {}
func (x *ReactorAck) ProtoReflect() protoreflect.Message {
mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ReactorAck.ProtoReflect.Descriptor instead.
func (*ReactorAck) Descriptor() ([]byte, []int) {
return file_internal_pkg_grpc_monitoring_proto_rawDescGZIP(), []int{0}
}
func (x *ReactorAck) GetId() int32 {
if x != nil {
return x.Id
}
return 0
}
type ReactorPing struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
}
func (x *ReactorPing) Reset() {
*x = ReactorPing{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ReactorPing) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ReactorPing) ProtoMessage() {}
func (x *ReactorPing) ProtoReflect() protoreflect.Message {
mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ReactorPing.ProtoReflect.Descriptor instead.
func (*ReactorPing) Descriptor() ([]byte, []int) {
return file_internal_pkg_grpc_monitoring_proto_rawDescGZIP(), []int{1}
}
func (x *ReactorPing) GetId() int32 {
if x != nil {
return x.Id
}
return 0
}
var File_internal_pkg_grpc_monitoring_proto protoreflect.FileDescriptor
var file_internal_pkg_grpc_monitoring_proto_rawDesc = []byte{
0x0a, 0x22, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67,
0x72, 0x70, 0x63, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0x1c, 0x0a, 0x0a, 0x52, 0x65,
0x61, 0x63, 0x74, 0x6f, 0x72, 0x41, 0x63, 0x6b, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01,
0x20, 0x01, 0x28, 0x05, 0x52, 0x02, 0x69, 0x64, 0x22, 0x1d, 0x0a, 0x0b, 0x52, 0x65, 0x61, 0x63,
0x74, 0x6f, 0x72, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20,
0x01, 0x28, 0x05, 0x52, 0x02, 0x69, 0x64, 0x32, 0x49, 0x0a, 0x0a, 0x6d, 0x6f, 0x6e, 0x69, 0x74,
0x6f, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x3b, 0x0a, 0x12, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72,
0x50, 0x69, 0x6e, 0x67, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x11, 0x2e, 0x67, 0x72,
0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x50, 0x69, 0x6e, 0x67, 0x1a, 0x10,
0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x41, 0x63, 0x6b,
0x28, 0x01, 0x42, 0x13, 0x5a, 0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70,
0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_internal_pkg_grpc_monitoring_proto_rawDescOnce sync.Once
file_internal_pkg_grpc_monitoring_proto_rawDescData = file_internal_pkg_grpc_monitoring_proto_rawDesc
)
func file_internal_pkg_grpc_monitoring_proto_rawDescGZIP() []byte {
file_internal_pkg_grpc_monitoring_proto_rawDescOnce.Do(func() {
file_internal_pkg_grpc_monitoring_proto_rawDescData = protoimpl.X.CompressGZIP(file_internal_pkg_grpc_monitoring_proto_rawDescData)
})
return file_internal_pkg_grpc_monitoring_proto_rawDescData
}
var file_internal_pkg_grpc_monitoring_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_internal_pkg_grpc_monitoring_proto_goTypes = []interface{}{
(*ReactorAck)(nil), // 0: grpc.ReactorAck
(*ReactorPing)(nil), // 1: grpc.ReactorPing
}
var file_internal_pkg_grpc_monitoring_proto_depIdxs = []int32{
1, // 0: grpc.monitoring.ReactorPingHandler:input_type -> grpc.ReactorPing
0, // 1: grpc.monitoring.ReactorPingHandler:output_type -> grpc.ReactorAck
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_internal_pkg_grpc_monitoring_proto_init() }
func file_internal_pkg_grpc_monitoring_proto_init() {
if File_internal_pkg_grpc_monitoring_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_internal_pkg_grpc_monitoring_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReactorAck); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_internal_pkg_grpc_monitoring_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReactorPing); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_internal_pkg_grpc_monitoring_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_internal_pkg_grpc_monitoring_proto_goTypes,
DependencyIndexes: file_internal_pkg_grpc_monitoring_proto_depIdxs,
MessageInfos: file_internal_pkg_grpc_monitoring_proto_msgTypes,
}.Build()
File_internal_pkg_grpc_monitoring_proto = out.File
file_internal_pkg_grpc_monitoring_proto_rawDesc = nil
file_internal_pkg_grpc_monitoring_proto_goTypes = nil
file_internal_pkg_grpc_monitoring_proto_depIdxs = nil
}

@ -0,0 +1,16 @@
syntax = "proto3";
package grpc;
option go_package = "internal/pkg/grpc";
service monitoring {
rpc ReactorPingHandler(stream ReactorPing) returns (ReactorAck);
}
message ReactorAck {
int32 id = 1;
}
message ReactorPing {
int32 id = 1;
}

@ -0,0 +1,139 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.21.12
// source: internal/pkg/grpc/monitoring.proto
package grpc
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// MonitoringClient is the client API for Monitoring service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type MonitoringClient interface {
ReactorPingHandler(ctx context.Context, opts ...grpc.CallOption) (Monitoring_ReactorPingHandlerClient, error)
}
type monitoringClient struct {
cc grpc.ClientConnInterface
}
func NewMonitoringClient(cc grpc.ClientConnInterface) MonitoringClient {
return &monitoringClient{cc}
}
func (c *monitoringClient) ReactorPingHandler(ctx context.Context, opts ...grpc.CallOption) (Monitoring_ReactorPingHandlerClient, error) {
stream, err := c.cc.NewStream(ctx, &Monitoring_ServiceDesc.Streams[0], "/grpc.monitoring/ReactorPingHandler", opts...)
if err != nil {
return nil, err
}
x := &monitoringReactorPingHandlerClient{stream}
return x, nil
}
type Monitoring_ReactorPingHandlerClient interface {
Send(*ReactorPing) error
CloseAndRecv() (*ReactorAck, error)
grpc.ClientStream
}
type monitoringReactorPingHandlerClient struct {
grpc.ClientStream
}
func (x *monitoringReactorPingHandlerClient) Send(m *ReactorPing) error {
return x.ClientStream.SendMsg(m)
}
func (x *monitoringReactorPingHandlerClient) CloseAndRecv() (*ReactorAck, error) {
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
m := new(ReactorAck)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// MonitoringServer is the server API for Monitoring service.
// All implementations must embed UnimplementedMonitoringServer
// for forward compatibility
type MonitoringServer interface {
ReactorPingHandler(Monitoring_ReactorPingHandlerServer) error
mustEmbedUnimplementedMonitoringServer()
}
// UnimplementedMonitoringServer must be embedded to have forward compatible implementations.
type UnimplementedMonitoringServer struct {
}
func (UnimplementedMonitoringServer) ReactorPingHandler(Monitoring_ReactorPingHandlerServer) error {
return status.Errorf(codes.Unimplemented, "method ReactorPingHandler not implemented")
}
func (UnimplementedMonitoringServer) mustEmbedUnimplementedMonitoringServer() {}
// UnsafeMonitoringServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to MonitoringServer will
// result in compilation errors.
type UnsafeMonitoringServer interface {
mustEmbedUnimplementedMonitoringServer()
}
func RegisterMonitoringServer(s grpc.ServiceRegistrar, srv MonitoringServer) {
s.RegisterService(&Monitoring_ServiceDesc, srv)
}
func _Monitoring_ReactorPingHandler_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(MonitoringServer).ReactorPingHandler(&monitoringReactorPingHandlerServer{stream})
}
type Monitoring_ReactorPingHandlerServer interface {
SendAndClose(*ReactorAck) error
Recv() (*ReactorPing, error)
grpc.ServerStream
}
type monitoringReactorPingHandlerServer struct {
grpc.ServerStream
}
func (x *monitoringReactorPingHandlerServer) SendAndClose(m *ReactorAck) error {
return x.ServerStream.SendMsg(m)
}
func (x *monitoringReactorPingHandlerServer) Recv() (*ReactorPing, error) {
m := new(ReactorPing)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// Monitoring_ServiceDesc is the grpc.ServiceDesc for Monitoring service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Monitoring_ServiceDesc = grpc.ServiceDesc{
ServiceName: "grpc.monitoring",
HandlerType: (*MonitoringServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "ReactorPingHandler",
Handler: _Monitoring_ReactorPingHandler_Handler,
ClientStreams: true,
},
},
Metadata: "internal/pkg/grpc/monitoring.proto",
}

@ -0,0 +1,335 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.0
// protoc v3.6.1
// source: internal/pkg/grpc/server.proto
package grpc
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type ClientRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
ClientId uint32 `protobuf:"varint,1,opt,name=clientId,proto3" json:"clientId,omitempty"`
ClientType string `protobuf:"bytes,2,opt,name=clientType,proto3" json:"clientType,omitempty"`
}
func (x *ClientRequest) Reset() {
*x = ClientRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_pkg_grpc_server_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ClientRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ClientRequest) ProtoMessage() {}
func (x *ClientRequest) ProtoReflect() protoreflect.Message {
mi := &file_internal_pkg_grpc_server_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ClientRequest.ProtoReflect.Descriptor instead.
func (*ClientRequest) Descriptor() ([]byte, []int) {
return file_internal_pkg_grpc_server_proto_rawDescGZIP(), []int{0}
}
func (x *ClientRequest) GetClientId() uint32 {
if x != nil {
return x.ClientId
}
return 0
}
func (x *ClientRequest) GetClientType() string {
if x != nil {
return x.ClientType
}
return ""
}
type ClientResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
ClientId uint32 `protobuf:"varint,1,opt,name=clientId,proto3" json:"clientId,omitempty"`
ServerPort uint32 `protobuf:"varint,2,opt,name=serverPort,proto3" json:"serverPort,omitempty"`
Database *Database `protobuf:"bytes,3,opt,name=database,proto3" json:"database,omitempty"`
}
func (x *ClientResponse) Reset() {
*x = ClientResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_pkg_grpc_server_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ClientResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ClientResponse) ProtoMessage() {}
func (x *ClientResponse) ProtoReflect() protoreflect.Message {
mi := &file_internal_pkg_grpc_server_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ClientResponse.ProtoReflect.Descriptor instead.
func (*ClientResponse) Descriptor() ([]byte, []int) {
return file_internal_pkg_grpc_server_proto_rawDescGZIP(), []int{1}
}
func (x *ClientResponse) GetClientId() uint32 {
if x != nil {
return x.ClientId
}
return 0
}
func (x *ClientResponse) GetServerPort() uint32 {
if x != nil {
return x.ServerPort
}
return 0
}
func (x *ClientResponse) GetDatabase() *Database {
if x != nil {
return x.Database
}
return nil
}
type Database struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
URL string `protobuf:"bytes,1,opt,name=URL,proto3" json:"URL,omitempty"`
ORG string `protobuf:"bytes,2,opt,name=ORG,proto3" json:"ORG,omitempty"`
Token string `protobuf:"bytes,3,opt,name=token,proto3" json:"token,omitempty"`
Bucket string `protobuf:"bytes,4,opt,name=bucket,proto3" json:"bucket,omitempty"`
}
func (x *Database) Reset() {
*x = Database{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_pkg_grpc_server_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Database) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Database) ProtoMessage() {}
func (x *Database) ProtoReflect() protoreflect.Message {
mi := &file_internal_pkg_grpc_server_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Database.ProtoReflect.Descriptor instead.
func (*Database) Descriptor() ([]byte, []int) {
return file_internal_pkg_grpc_server_proto_rawDescGZIP(), []int{2}
}
func (x *Database) GetURL() string {
if x != nil {
return x.URL
}
return ""
}
func (x *Database) GetORG() string {
if x != nil {
return x.ORG
}
return ""
}
func (x *Database) GetToken() string {
if x != nil {
return x.Token
}
return ""
}
func (x *Database) GetBucket() string {
if x != nil {
return x.Bucket
}
return ""
}
var File_internal_pkg_grpc_server_proto protoreflect.FileDescriptor
var file_internal_pkg_grpc_server_proto_rawDesc = []byte{
0x0a, 0x1e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67,
0x72, 0x70, 0x63, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0x4b, 0x0a, 0x0d, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e,
0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e,
0x74, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70,
0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x54,
0x79, 0x70, 0x65, 0x22, 0x78, 0x0a, 0x0e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49,
0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49,
0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x50, 0x6f, 0x72, 0x74, 0x18,
0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x50, 0x6f, 0x72,
0x74, 0x12, 0x2a, 0x0a, 0x08, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x18, 0x03, 0x20,
0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x62,
0x61, 0x73, 0x65, 0x52, 0x08, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x22, 0x5c, 0x0a,
0x08, 0x44, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x55, 0x52, 0x4c,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x55, 0x52, 0x4c, 0x12, 0x10, 0x0a, 0x03, 0x4f,
0x52, 0x47, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x4f, 0x52, 0x47, 0x12, 0x14, 0x0a,
0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f,
0x6b, 0x65, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x18, 0x04, 0x20,
0x01, 0x28, 0x09, 0x52, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x32, 0x50, 0x0a, 0x09, 0x68,
0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x43, 0x0a, 0x16, 0x43, 0x6c, 0x69, 0x65,
0x6e, 0x74, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x48, 0x61, 0x6e, 0x64, 0x6c,
0x65, 0x72, 0x12, 0x13, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x43,
0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x13, 0x5a,
0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72,
0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_internal_pkg_grpc_server_proto_rawDescOnce sync.Once
file_internal_pkg_grpc_server_proto_rawDescData = file_internal_pkg_grpc_server_proto_rawDesc
)
func file_internal_pkg_grpc_server_proto_rawDescGZIP() []byte {
file_internal_pkg_grpc_server_proto_rawDescOnce.Do(func() {
file_internal_pkg_grpc_server_proto_rawDescData = protoimpl.X.CompressGZIP(file_internal_pkg_grpc_server_proto_rawDescData)
})
return file_internal_pkg_grpc_server_proto_rawDescData
}
var file_internal_pkg_grpc_server_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_internal_pkg_grpc_server_proto_goTypes = []interface{}{
(*ClientRequest)(nil), // 0: grpc.ClientRequest
(*ClientResponse)(nil), // 1: grpc.ClientResponse
(*Database)(nil), // 2: grpc.Database
}
var file_internal_pkg_grpc_server_proto_depIdxs = []int32{
2, // 0: grpc.ClientResponse.database:type_name -> grpc.Database
0, // 1: grpc.handshake.ClientDiscoveryHandler:input_type -> grpc.ClientRequest
1, // 2: grpc.handshake.ClientDiscoveryHandler:output_type -> grpc.ClientResponse
2, // [2:3] is the sub-list for method output_type
1, // [1:2] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
}
func init() { file_internal_pkg_grpc_server_proto_init() }
func file_internal_pkg_grpc_server_proto_init() {
if File_internal_pkg_grpc_server_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_internal_pkg_grpc_server_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ClientRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_internal_pkg_grpc_server_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ClientResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_internal_pkg_grpc_server_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Database); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_internal_pkg_grpc_server_proto_rawDesc,
NumEnums: 0,
NumMessages: 3,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_internal_pkg_grpc_server_proto_goTypes,
DependencyIndexes: file_internal_pkg_grpc_server_proto_depIdxs,
MessageInfos: file_internal_pkg_grpc_server_proto_msgTypes,
}.Build()
File_internal_pkg_grpc_server_proto = out.File
file_internal_pkg_grpc_server_proto_rawDesc = nil
file_internal_pkg_grpc_server_proto_goTypes = nil
file_internal_pkg_grpc_server_proto_depIdxs = nil
}

@ -0,0 +1,28 @@
syntax = "proto3";
package grpc;
option go_package = "internal/pkg/grpc";
service handshake {
rpc ClientDiscoveryHandler(ClientRequest) returns (ClientResponse);
}
message ClientRequest {
uint32 clientId = 1;
string clientType = 2;
string ip = 3; // client ip
uint32 port = 4; // client port for gRPC server
}
message ClientResponse {
uint32 clientId = 1;
uint32 serverPort = 2;
Database database = 3;
}
message Database {
string URL = 1;
string ORG = 2;
string token = 3;
string bucket = 4;
}

@ -0,0 +1,105 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.21.12
// source: internal/pkg/grpc/handshake.proto
package grpc
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// HandshakeClient is the client API for Handshake service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type HandshakeClient interface {
ReactorClientHandler(ctx context.Context, in *ReactorClientRequest, opts ...grpc.CallOption) (*ReactorClientResponse, error)
}
type handshakeClient struct {
cc grpc.ClientConnInterface
}
func NewHandshakeClient(cc grpc.ClientConnInterface) HandshakeClient {
return &handshakeClient{cc}
}
func (c *handshakeClient) ReactorClientHandler(ctx context.Context, in *ReactorClientRequest, opts ...grpc.CallOption) (*ReactorClientResponse, error) {
out := new(ReactorClientResponse)
err := c.cc.Invoke(ctx, "/grpc.handshake/ReactorClientHandler", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// HandshakeServer is the server API for Handshake service.
// All implementations must embed UnimplementedHandshakeServer
// for forward compatibility
type HandshakeServer interface {
ReactorClientHandler(context.Context, *ReactorClientRequest) (*ReactorClientResponse, error)
mustEmbedUnimplementedHandshakeServer()
}
// UnimplementedHandshakeServer must be embedded to have forward compatible implementations.
type UnimplementedHandshakeServer struct {
}
func (UnimplementedHandshakeServer) ReactorClientHandler(context.Context, *ReactorClientRequest) (*ReactorClientResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ReactorClientHandler not implemented")
}
func (UnimplementedHandshakeServer) mustEmbedUnimplementedHandshakeServer() {}
// UnsafeHandshakeServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to HandshakeServer will
// result in compilation errors.
type UnsafeHandshakeServer interface {
mustEmbedUnimplementedHandshakeServer()
}
func RegisterHandshakeServer(s grpc.ServiceRegistrar, srv HandshakeServer) {
s.RegisterService(&Handshake_ServiceDesc, srv)
}
func _Handshake_ReactorClientHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ReactorClientRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(HandshakeServer).ReactorClientHandler(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/grpc.handshake/ReactorClientHandler",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(HandshakeServer).ReactorClientHandler(ctx, req.(*ReactorClientRequest))
}
return interceptor(ctx, in, info, handler)
}
// Handshake_ServiceDesc is the grpc.ServiceDesc for Handshake service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Handshake_ServiceDesc = grpc.ServiceDesc{
ServiceName: "grpc.handshake",
HandlerType: (*HandshakeServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "ReactorClientHandler",
Handler: _Handshake_ReactorClientHandler_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "internal/pkg/grpc/handshake.proto",
}

@ -0,0 +1,112 @@
#!/usr/bin/env python3
import sys
import shutil
from typing import Optional, List, Tuple, Dict
import typer
from rich import print
from rich.columns import Columns
from rich.console import Console
from rich.traceback import install
# fmt: off
# Mapping from topics to colors
TOPICS = {
"EXIT": "#9a9a99",
"STRT": "#67a0b2",
"PING": "#d0b343",
"SCAN": "#70c43f",
"SPWN": "#4878bc",
"STOP": "#ffff00",
#"LOG2": "#398280",
#"CMIT": "#98719f",
#"PERS": "#d08341",
#"SNAP": "#FD971F",
#"DROP": "#ff615c",
"CLNT": "#00813c",
#"TEST": "#fe2c79",
#"INFO": "#ffffff",
#"WARN": "#d08341",
"ERRO": "#fe2626",
}
# fmt: on
def list_topics(value: Optional[str]):
if value is None:
return value
topics = value.split(",")
for topic in topics:
if topic not in TOPICS:
raise typer.BadParameter(f"topic {topic} not recognized")
return topics
def main(
file: typer.FileText = typer.Argument(None, help="File to read, stdin otherwise"),
colorize: bool = typer.Option(True, "--no-color"),
n_columns: Optional[int] = typer.Option(None, "--columns", "-c"),
ignore: Optional[str] = typer.Option(None, "--ignore", "-i", callback=list_topics),
just: Optional[str] = typer.Option(None, "--just", "-j", callback=list_topics),
):
topics = list(TOPICS)
# We can take input from a stdin (pipes) or from a file
input_ = file if file else sys.stdin
# Print just some topics or exclude some topics (good for avoiding verbose ones)
if just:
topics = just
if ignore:
topics = [lvl for lvl in topics if lvl not in set(ignore)]
topics = set(topics)
console = Console()
width = console.size.width
panic = False
for line in input_:
try:
time, topic, *msg = line.strip().split(" ")
# To ignore some topics
if topic not in topics:
continue
msg = " ".join(msg)
# Debug calls from the test suite aren't associated with
# any particular peer. Otherwise we can treat second column
# as peer id
if topic != "TEST":
i = int(msg[1])
# Colorize output by using rich syntax when needed
if colorize and topic in TOPICS:
color = TOPICS[topic]
msg = f"[{color}]{msg}[/{color}]"
# Single column printing. Always the case for debug stmts in tests
if n_columns is None or topic == "TEST":
print(time, msg)
# Multi column printing, timing is dropped to maximize horizontal
# space. Heavylifting is done through rich.column.Columns object
else:
cols = ["" for _ in range(n_columns)]
msg = "" + msg
cols[i] = msg
col_width = int(width / n_columns)
cols = Columns(cols, width=col_width - 1, equal=True, expand=True)
print(cols)
except:
# Code from tests or panics does not follow format
# so we print it as is
if line.startswith("panic"):
panic = True
# Output from tests is usually important so add a
# horizontal line with hashes to make it more obvious
if not panic:
print("#" * console.width)
print(line, end="")
if __name__ == "__main__":
typer.run(main)

@ -0,0 +1,86 @@
package logging
import (
"errors"
"fmt"
"log"
"os"
"strconv"
"time"
)
func getLogType() string {
if t, ok := os.LookupEnv("LOGTYPE"); ok {
return t
}
return "DEFAULT"
}
func getVerbosity() int {
v := os.Getenv("VERBOSE")
level := 0
if v != "" {
var err error
level, err = strconv.Atoi(v)
if err != nil {
log.Fatalf("Invalid Verbosity %v", v)
}
}
return level
}
type logTopic string
const (
// define 4 character topic abbreviations for coloring
DError logTopic = "ERRO"
DClient logTopic = "CLNT"
DStart logTopic = "STRT"
DExit logTopic = "EXIT"
DPing logTopic = "PING"
DScan logTopic = "SCAN"
DSpawn logTopic = "SPWN"
DStop logTopic = "STOP"
)
// the list can grow
var debugStart time.Time
var debugVerbosity int
func init() {
debugVerbosity = getVerbosity()
debugStart = time.Now()
if debugVerbosity > 0 {
path := "log/"
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
err := os.Mkdir(path, os.ModePerm)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
}
logtype := getLogType() // start with "REACTOR" etc
timestamp := time.Now().Format("Mon-15:04:05")
filename := fmt.Sprintf("%s-%s.log", logtype, timestamp)
f, err := os.OpenFile(path+filename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664)
if err != nil {
log.Fatal(err)
}
log.SetOutput(f)
}
log.SetFlags(log.Flags() &^ (log.Ldate | log.Ltime)) // turns off date and time so we can set manually
}
// example call Debug(dClient, "R%d connecting to client %d", r.Id, c.Id)
func Debug(topic logTopic, format string, a ...interface{}) {
if debugVerbosity >= 1 {
time := time.Since(debugStart).Microseconds()
time /= 100
prefix := fmt.Sprintf("%06d %v ", time, string(topic))
format = prefix + format
log.Printf(format, a...)
}
}

@ -0,0 +1,80 @@
// package System provides a way to listen for incoming connections
// and manage multiple reactor clients.
package system
import (
pb "dmac/pkg/grpc"
"errors"
"fmt"
"net"
"sync"
"github.com/spf13/viper"
"google.golang.org/grpc"
)
var (
ErrMissingPort = errors.New("port not set")
)
// Coordinator is runs on the server and is used to oversee
// the reactor managers as well as process incoming client connections.
type Coordinator struct {
Config *viper.Viper
listener net.Listener
grpcServer *grpc.Server
DatabasePort int `mapstructure:"database_port"`
GRPCPort int `mapstructure:"grpc_port"`
directory map[int]*ReactorManager
managerMu sync.RWMutex
Err chan error
// grpc
pb.UnimplementedHandshakeServer
pb.UnimplementedMonitoringServer
}
// NewCentralCoordinator creates a central coordinator with the given global
// config and error channel.
func NewCentralCoordinator(config *viper.Viper, ch chan error) *Coordinator {
rmap := make(map[int]*ReactorManager)
return &Coordinator{
Err: ch,
Config: config,
directory: rmap,
}
}
// Start loads config, starts network listener and registers grpc handlers.
// Ready for new clients on return.
func (c *Coordinator) Start() error {
if err := c.Config.Unmarshal(c); err != nil {
return err
}
// ensure it shows up as missing
if c.GRPCPort == 0 {
c.Config.Set("grpc_port", 0)
c.Config.WriteConfig()
return ErrMissingPort
}
lis, err := net.Listen("tcp", fmt.Sprintf(":%v", c.GRPCPort))
if err != nil {
return err
}
grpcServer := grpc.NewServer()
c.listener = lis
c.grpcServer = grpcServer
return c.Register()
}

@ -0,0 +1,13 @@
package server
type dbinfo struct {
url string
org string
token string
bucket string
}
func (c *Coordinator) getReactorDatabaseCred(id int) (*dbinfo, error) {
return &dbinfo{}, nil
}

@ -0,0 +1,48 @@
package system
import (
"context"
pb "dmac/pkg/grpc"
"dmac/pkg/logging"
)
// ClientDiscoveryHandler implements the grpc method which can be called
// by incoming clients to first make connection to the central
// coordinator and receive database credentials.
func (c *Coordinator) ReactorClientHandler(
ctx context.Context,
req *pb.ReactorClientRequest,
) (*pb.ReactorClientResponse, error) {
id := int(req.GetId())
logging.Debug(
logging.DClient,
"LIS 00 reactor %v has connected\n",
id,
)
db, err := c.getReactorDatabaseCred(id)
if err != nil {
return &pb.ReactorClientResponse{}, err
}
return &pb.ReactorClientResponse{
Id: id,
Url: db.url,
Org: db.org,
Token: db.token,
Bucket: db.bucket,
}, err
}
// ReactorStatusHandler is a gRPC handler used to handle incoming
// reactor requests containing information about said reactor.
// It will get the associate reactor manager and pass the
// request device information before returning an acknowledgement.
func (c *Coordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) {
// rm, err := c.LoadReactorManager(int(req.GetId()))
return &pb.ReactorStatusResponse{}, nil
}

@ -0,0 +1,118 @@
// package system uses linux commands to get hardware info for identifation
package system
import (
"bytes"
"errors"
"fmt"
"hash/fnv"
"net"
"os/exec"
"strings"
)
func GetId() (int, error) {
// gets the mac address and hashes into consistent id
maccmd := fmt.Sprintf("ifconfig %v | awk '/ether / {print $2}'", et)
var stderr bytes.Buffer
var out bytes.Buffer
cmd := exec.Command("bash", "-c", maccmd)
cmd.Stdout = &out
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
return 0, err
}
hash := fnv.New32a()
hash.Write(out.Bytes())
id := hash.Sum32()
return int(id), nil
}
func GetIp() (string, error) {
ipcmd := "ip route get 1 | sed 's/^.*src \([^ ]*\).*$/\1/;q'"
var stderr bytes.Buffer
var out bytes.Buffer
cmd := exec.Command("bash", "-c", ipcmd)
cmd.Stdout = &out
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
return "", err
}
ip := strings.Trim(out.String(), " \n")
return ip, nil
}
func GetPort() (int, error) {
// obsolete
if addr, err := net.ResolveTCPAddr("tcp", ":0"); err != nil {
return 0, err
} else if lis, err := net.ListenTCP("tcp", addr); err != nil {
return 0, err
} else {
defer lis.Close()
return lis.Addr().(*net.TCPAddr).Port, nil
}
}
func GetBus() (int, error) {
// preset busses
busList := map[string]int{"raspberrypi": 1, "beaglebone": 2}
// vars
var bus int
var ok bool
if name, err =: GetModel(); err != nil {
return bus, err
} else if bus, ok = busList[b]; !ok {
return 0, errors.New(fmt.Sprintf("No bus for dev %s", b))
}
// returns correct bus
return bus, nil
}
func GetModel() (string, error) {
var stderr, out bytes.Buffer
cmd := exec.Command("bash", "-c", "hostname")
cmd.Stdout = &out
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
return "", err
}
b := out.String()
b = strings.Trim(b, " \n")
return b, nil
}
func Get() error {
// responsible for filling out struct
//bus := map[string]int{"raspberrypi":1,"beaglebone":2} // eventually will replace this with a config file
ipcmd := "ifconfig eth0 | awk '/inet / {print $2}'"
maccmd := "ifconfig eth0 | awk '/ether / {print $2}'"
devcmd := "lshw -C system 2>/dev/null | head -n 1"
res := [3]bytes.Buffer{}
var stderr bytes.Buffer
cmds := []string{ipcmd, maccmd, devcmd}
for i, c := range cmds {
cmd := exec.Command("bash", "-c", c)
cmd.Stdout = &res[i]
cmd.Stderr = &stderr
err := cmd.Run()
if err != nil {
return err
}
}
// formatting
ip := res[0].String()
ip = strings.Trim(ip, " \n")
hash := fnv.New32a()
hash.Write(res[1].Bytes())
b := res[2].String()
b = strings.Trim(b, " \n")
return nil
}

@ -0,0 +1,84 @@
package system
import (
//"log"
pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/logging"
"context"
"fmt"
"net"
"google.golang.org/grpc"
)
/*
Listens on a supplied port and sends incoming clients over a supplied channel
Waits for a response on that channel to send back to the client with DB credentials
*/
type Listener struct { // exporting for easy use in the short term
Port int `mapstructure:"lis"`
ClientConnections chan *ClientPacket
Err chan error
pb.UnimplementedHandshakeServer
}
type ClientPacket struct {
*Client
Response chan *ClientResponse
}
type Client struct {
//Ip string
//Port int
Id int
Model string
Type string
}
type ClientResponse struct {
Port int
URL string
Org string
Token string
Bucket string
}
func NewListener(cch chan *ClientPacket, ech chan error) *Listener {
l := &Listener{Err: ech, ClientConnections: cch}
return l
}
func (l *Listener) Start() error {
// start grpc server and implement reciever
logging.Debug(logging.DStart, "LIS 01 Started client listener")
return l.Register()
}
func (l *Listener) Register() error {
// creates a gRPC service and binds it to our handler
lis, err := net.Listen("tcp", fmt.Sprintf(":%v", l.Port))
if err != nil {
return err
}
grpcServer := grpc.NewServer()
pb.RegisterHandshakeServer(grpcServer, l)
go grpcServer.Serve(lis)
logging.Debug(logging.DStart, "LIS 01 Registered on port %v", l.Port)
return nil
}
func (l *Listener) ClientDiscoveryHandler(ctx context.Context, ping *pb.ClientRequest) (*pb.ClientResponse, error) {
// incoming client ping, notify coord and wait for DB credentials to respond
c := &Client{Id: int(ping.GetClientId()), Type: ping.GetClientType()}
logging.Debug(logging.DClient, "LIS %v %v has connected\n", c.Type, c.Id)
// prepare packet to send to coordinator
ch := make(chan *ClientResponse)
p := &ClientPacket{Client: c, Response: ch}
// blocking
l.ClientConnections <- p
resp := <-ch
// prepare object to return to client
db := &pb.Database{URL: resp.URL, ORG: resp.Org, Token: resp.Token, Bucket: resp.Bucket}
return &pb.ClientResponse{ClientId: uint32(c.Id), ServerPort: uint32(resp.Port), Database: db}, nil
}

@ -0,0 +1,29 @@
package system
import (
//"log"
_ "context"
)
// will condense into the rm soon enough
// manager connects to client on start and returns the gRPC connection to make gRPC clients
// type ClientManager struct {
// *Client // gives access to c.Ip c.Id etc
// Hb time.Duration // used for managing hb timer for client
// Sig chan bool
// sync.Mutex
// }
// func NewClientManager(cl *Client) *ClientManager {
// return &ClientManager{Client: cl}
// }
// func (m *ClientManager) UpdateClient(cl *Client) error {
// m.Lock()
// defer m.Unlock()
// logging.Debug(logging.DClient, "MAN Updating client %v", cl.Id)
// m.Client = cl
// return nil
// }

@ -0,0 +1,206 @@
package system
import (
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// creating, starting and stopping tests
// newManager is a helper for creating new managers for tests
func newManager(conn int, want error, t *testing.T) *Manager {
var manager *Manager
var err error
assert := assert.New(t)
manager, err = New(conn)
if err != want {
t.Fatalf(
`New(%d) = %v, %v, %d max connections failed`,
conn,
manager,
err,
conn,
)
}
assert.Equal(manager.IsActive(), Inactive, "manager should start inactive")
return manager
}
// TestEmptyManager creates a new manager with 0 max connections
func TestEmptyManager(t *testing.T) {
conn := 0
newManager(conn, nil, t)
}
// TestPostiveManager creates a new manager with valid max connections
func TestPositiveManager(t *testing.T) {
conn := rand.Intn(MaxConnAttempts)
newManager(conn, nil, t)
}
// TestNegativeManager creates a new manager with negative max connections
func TestNegativeManager(t *testing.T) {
conn := -1 * rand.Intn(MaxConnAttempts)
newManager(conn, ErrInvalidMaxConn, t)
}
// TestInvalidManager creates a new manager with large max connections
func TestInvalidManager(t *testing.T) {
conn := MaxConnAttempts + 0xf
newManager(conn, ErrInvalidMaxConn, t)
}
// TestManagerLifeCycle tests that a manager can start and exit several times
func TestManagerLifeCycle(t *testing.T) {
var manager *Manager
assert := assert.New(t)
conn := rand.Intn(MaxConnAttempts)
manager = newManager(conn, nil, t)
cycles := 10
// starting and stopping sequentially
for i := 0; i < cycles; i++ {
assert.NoError(manager.Start(), "starting manager failed")
assert.Equal(manager.IsActive(), Active, "manager inactive after start")
assert.NoError(manager.Stop(), "stopping manager failed")
assert.Equal(manager.IsActive(), Inactive, "manager active after stop")
}
}
// TestManagerStopFail tests that stopping an inactive manager will error
func TestManagerStopFail(t *testing.T) {
var manager *Manager
assert := assert.New(t)
conn := rand.Intn(MaxConnAttempts)
manager = newManager(conn, nil, t)
assert.NoError(manager.Start(), "starting manager failed")
// stopping sequentially
assert.NoError(manager.Stop(), "stopping manager failed")
assert.Error(manager.Stop(), "stopping inactive manager should fail")
}
// TestManagerStartFail tests that starting an active manager will error
func TestManagerStartFail(t *testing.T) {
var manager *Manager
assert := assert.New(t)
conn := rand.Intn(MaxConnAttempts)
manager = newManager(conn, nil, t)
// starting sequentially
assert.NoError(manager.Start(), "starting manager failed")
assert.Error(manager.Start(), "starting active manager should fail")
}
// auxiliary tests
// TestManagerTimeout checks that timeouts exponentially backoff
func TestManagerTimeout(t *testing.T) {
var manager *Manager
assert := assert.New(t)
conn := 10
manager = newManager(conn, nil, t)
assert.NoError(manager.Start(), "starting manager failed")
assert.Equal(manager.IsActive(), Active, "manager is inactive")
prevTimeout, err := manager.Timeout()
for i := 1; i <= conn; i++ {
assert.NoError(err, "generating timeout failed")
assert.True(prevTimeout > 0, "invalid timeout")
timeout, err := manager.Timeout()
if i == conn {
assert.Error(err, "allowed exceeding max attempts")
} else {
assert.NoError(err, "generating timeout failed")
assert.True(
timeout == 2*prevTimeout,
"incorrect timeout %d, expected %d",
timeout, 2*prevTimeout,
)
}
prevTimeout = timeout
}
}
// TestManagerHB tests the heartbeat channel opens and closes
func TestManagerHB(t *testing.T) {
var manager *Manager
assert := assert.New(t)
conn := rand.Intn(MaxConnAttempts)
manager = newManager(conn, nil, t)
assert.NoError(manager.Start(), "starting manager failed")
assert.Equal(manager.IsActive(), Active, "manager is inactive")
ch := make(chan struct{})
go manager.HeartBeat(ch, 10, 0, time.Millisecond)
for range ch {
// close on first ping
assert.NoError(manager.Stop(), "stopping manager failed")
}
assert.Equal(manager.IsActive(), Inactive, "manager is active")
}
// TestManagerHBTiming tests the heartbeat channel timing is correct
func TestManagerHBTiming(t *testing.T) {
var manager *Manager
assert := assert.New(t)
conn := rand.Intn(MaxConnAttempts)
manager = newManager(conn, nil, t)
assert.NoError(manager.Start(), "starting manager failed")
assert.Equal(manager.IsActive(), Active, "manager is inactive")
ch := make(chan struct{})
hb := 100
pings := 10
// expected time with tolerance for other logic and worst case rand timeout
expected := time.Duration(pings*hb+15) * time.Millisecond
go manager.HeartBeat(ch, hb, 1, time.Millisecond)
iter := 0
start := time.Now()
for range ch {
// close after 10 pings
iter += 1
if iter >= pings {
assert.NoError(manager.Stop(), "stopping manager failed")
}
}
end := time.Now()
assert.Equal(manager.IsActive(), Inactive, "manager is active")
assert.WithinDuration(start, end, expected, "inaccurate heartbeat")
}

@ -0,0 +1,121 @@
package server
import (
"FRMS/internal/pkg/logging"
"FRMS/internal/pkg/manager"
"errors"
"time"
)
var (
ErrNoReactorManager = errors.New("no reactor manager found")
)
// ReactorManager can be started/stopped as clients connect/disconnect.
type ReactorManager struct {
Manager // base manager interface
}
// Manager is an interface requiring a structure that can be started
// and stopped as well as provide timeouts in milliseconds.
type Manager interface {
Start() error // status checks
Stop() error
Timeout() (time.Duration, error) // TO Generator
}
// NewManager returns a manager fulfilling the Manager interface as well as
// any potential errors.
func NewManager(max int) (Manager, error) {
return manager.New(max)
}
// GetReactorManager returns a reactor manager for passed id.
// Throws error if manager not found for id.
func (c *Coordinator) LoadReactorManager(id int) (*ReactorManager, error) {
c.managerMu.RLock()
defer c.managerMu.RUnlock()
rm, exists := c.directory[id]
if !exists {
logging.Debug(
logging.DClient,
"RCO 00 creating manager for %v",
id,
)
m, err := NewManager(0)
rm = &ReactorManager{
Manager: m,
}
if err = rm.Start(); err != nil {
return rm, err
}
c.directory[id] = rm
}
return rm, nil
}
// // NewReactorManager takes in a client, config and channel to pass errors on.
// // Returns a new reactor manager as well as any errors that occured during
// // creation.
// // Uses MaxConnectionAttempts which defaults to 10 to prevent
// // unnessecary network load and/or timeout lengths.
// func NewReactorManager(
// ) (*ReactorManager, error) {
// m, err := NewManager(MaxConnectionAttempts)
// if err != nil {
// return &ReactorManager{}, err
// }
// return r, err
// }
// Start logs the start and calls start on the embedded manager.
func (r *ReactorManager) Start() error {
// logging.Debug(logging.DStart, "RMA starting", r.Id)
return r.Manager.Start()
}
// Stop logs the stop and calls stop on the embedded manager.
func (r *ReactorManager) Stop() error {
// logging.Debug(logging.DExit, "RMA %v stopping", r.Id)
return r.Manager.Stop()
}
// UpdateClient is used to change the underlying manager client if there
// changes to its data.
//
// BUG(Keegan): Client is not protected by a lock and may lead to races
// func (r *ReactorManager) UpdateClient(cl *Client) error {
// logging.Debug(logging.DClient, "RMA %v updating client", r.Id)
// r.Client = cl
// return nil
// }
// // ReactorDeviceHandler processes incoming device information and
// // updates the manager accordingly.
// func (r *ReactorManager) ReactorDeviceHandler(devs []*pb.Device) error {
// logging.Debug(logging.DClient, "CCO recieved ping from %v", r.Id)
// for _, dev := range devs {
// logging.Debug(
// logging.DClient,
// "CCO %v device %v is %v",
// r.Id,
// dev.GetAddr(),
// dev.GetStatus().String(),
// )
// }
// return nil
// }

@ -0,0 +1,149 @@
package server
import (
pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/logging"
"FRMS/internal/pkg/manager"
"time"
//"FRMS/internal/pkg/device"
"context"
"fmt"
_ "log"
"github.com/spf13/viper"
)
// manager stuff
type Manager interface {
Start() error // status checks
Exit() error
Timeout() (time.Duration, error) // TO Generator
}
func NewManager(max int) Manager {
// takes a heartbeat and max connection attempts
return manager.New(max)
}
type ReactorManager struct {
Manager // base manager interface
// *ClientManager // client manager (OUTDATED)
*Client // access to ID etc
// StatusMon *StatusMonitor putting on pause
// *ReactorDevices
Config *viper.Viper // config to update
Err chan error
}
// type ReactorDevices struct {
// // device struct
// Devices map[int]DeviceManager
// sync.RWMutex
// }
func NewReactorManager(cl *Client, config *viper.Viper, errCh chan error) *ReactorManager {
// making managers
m := NewManager(6)
r := &ReactorManager{
Manager: m,
Client: cl,
Config: config,
Err: errCh,
}
return r
}
func (r *ReactorManager) Start() error {
// allows for extra stuff
logging.Debug(logging.DStart, "RMA %v starting", r.Id)
return r.Manager.Start()
//go r.StatusMon.Send(&DeviceInfo{Id: r.Id, Type: "Reactor", Status: "[green]ONLINE[white]"}, "Reactor")
}
func (r *ReactorManager) Exit() error {
// allows for extra stuff
logging.Debug(logging.DExit, "RMA %v exiting", r.Id)
return r.Manager.Exit()
//go r.StatusMon.Send(&DeviceInfo{Id: r.Id, Type: "Reactor", Status: "[red]OFFLINE[white]", Data: fmt.Sprintf("Last Seen %v", time.Now().Format("Mon at 03:04:05pm MST"))}, "Reactor")
}
func (r *ReactorManager) UpdateClient(cl *Client) error {
// this is probably unnessecary
fmt.Printf("Reactor Manager %d updating client!\n", r.Id)
r.Client = cl
return nil
}
func (r *ReactorManager) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) {
// function client will call to update reactor information
//go r.PingReset()
fmt.Printf("Recieved ping from %d!\n", req.GetId())
// update devices/sensors
for _, dev := range req.GetDevices() {
fmt.Printf("Device %d is %s ", dev.GetAddr(), dev.GetStatus().String())
}
fmt.Printf("\n")
// go r.UpdateDevices(req.GetDevices())
return &pb.ReactorStatusResponse{Id: int32(r.Id)}, nil
}
// // device stuff
// type DeviceManager interface {
// LoadConfig() error
// UpdateStatus(string) error
// String() string // printing
// }
// func NewDeviceManager(addr int, config *viper.Viper, prefix string) (DeviceManager, error) {
// // returns a manager struct
// return device.NewDeviceManager(addr, config, prefix)
// }
//func (r *ReactorManager) UpdateDevices(devs []*pb.Device) {
// // pass updates to correct manager
// r.ReactorDevices.RLock() // read lock only
// defer r.ReactorDevices.RUnlock()
// for _, dev := range devs {
// // looping over devs
// if dm, ok := r.ReactorDevices.Devices[int(dev.GetAddr())]; ok {
// // device manager found
// go dm.UpdateStatus(dev.GetStatus().String())
// //fmt.Println(dm)
// } else {
// // not found
// go r.AddDevice(dev, r.Id, r.Config, r.Err)
// }
// }
//}
// func (r *ReactorDevices) AddDevice(dev *pb.Device, id int, config *viper.Viper, errCh chan error) {
// // setting vars
// prefix := fmt.Sprintf("reactors.%d.", id)
// addr := int(dev.GetAddr())
// var dm DeviceManager
// var err error
// // write locking
// r.Lock()
// defer r.Unlock()
// if dm, err = NewDeviceManager(addr, config, prefix); err != nil {
// errCh <- err
// }
// // setting status
// if err = dm.UpdateStatus(dev.GetStatus().String()); err != nil {
// errCh <- err
// }
// // loading config
// if err = dm.LoadConfig(); err != nil {
// errCh <- err
// }
// r.Devices[int(addr)] = dm
// }

@ -0,0 +1,19 @@
package server
import (
pb "FRMS/internal/pkg/grpc"
"FRMS/internal/pkg/logging"
)
func (c *Coordinator) Register() error {
// register services
pb.RegisterHandshakeServer(c.grpcServer, c)
go c.grpcServer.Serve(c.listener)
// testing
pb.RegisterMonitoringServer(c.grpcServer, c)
logging.Debug(logging.DStart, "CCO 00 registered grpc")
return nil
}

@ -0,0 +1,309 @@
package server
import (
_ "fmt"
// sensor components
)
/*
type StatusMonitor struct {
// allows for embedding into managers
TransactionId chan uint32 // monotonically increases to track outdated reqs
DevChan chan *DeviceInfo // channel for device status
ReactorChan chan *DeviceInfo // channel for reactor status
*SystemViewer
DevBuf *devbuf
sync.Mutex
}
type devbuf struct {
ReactorId int // reactor we are looking at, if any
Buf map[string]map[int]*DeviceInfo // convienent way to store/seperate device data
sync.Mutex
}
func NewBuffer() map[string]map[int]*DeviceInfo {
rbuf := make(map[int]*DeviceInfo)
dbuf := make(map[int]*DeviceInfo)
sbuf := make(map[string]map[int]*DeviceInfo)
sbuf["Reactor"] = rbuf
sbuf["Device"] = dbuf
return sbuf
}
func NewStatusMonitor(t string, id int, sys *SystemViewer) *StatusMonitor {
tid := make(chan uint32)
sm := &StatusMonitor{TransactionId: tid}
sm.SystemViewer = sys
logging.Debug(logging.DClient, "SYS Creating new status monitor")
if t == "Reactor" {
// reactor status monitor
sm.ReactorChan = sys.AddReactorSender()
sm.DevChan = sys.AddDeviceSender(id)
go sm.GenerateIds()
} else {
// tui status monitor
sbuf := NewBuffer()
//sm.ReactorChan, sbuf["Reactor"] = sys.AddListener(id,0)
sm.DevBuf = &devbuf{Buf: sbuf} // makes it easier to work with
go sm.UpdateListener(id, 0)
}
return sm
}
func (s *StatusMonitor) GenerateIds() {
var id uint32
id = 0
for {
s.TransactionId <- id
id += 1
}
}
func (s *StatusMonitor) Send(d *DeviceInfo, dtype string) {
d.TransactionId = <-s.TransactionId
logging.Debug(logging.DClient, "SYS 01 Sending update for: %s (%s)", d.Type, d.Status)
if dtype == "Reactor" {
s.ReactorChan <- d
} else {
s.DevChan <- d
}
}
func (s *StatusMonitor) GetBuffer() []*DeviceInfo {
// also clears buffer
s.DevBuf.Lock()
defer s.DevBuf.Unlock()
res := []*DeviceInfo{}
if len(s.DevBuf.Buf["Reactor"]) != 0 || len(s.DevBuf.Buf["Device"]) != 0 {
logging.Debug(logging.DClient, "Clearing buff %v", s.DevBuf.Buf)
}
for _, devs := range s.DevBuf.Buf {
for _, dev := range devs {
// loops over reactors then devices
res = append(res, dev)
}
}
s.DevBuf.Buf = NewBuffer() // clearing old buffer
return res
}
func (s *StatusMonitor) UpdateListener(tid, reactorId uint32) {
s.DevBuf.Lock()
defer s.DevBuf.Unlock()
// clearing proper buffer
if reactorId == 0 {
logging.Debug(logging.DClient, "SYS 01 Adding %v as reactor listener", tid)
s.DevBuf.Buf["Reactor"] = make(map[uint32]*DeviceInfo)
s.ReactorChan, s.DevBuf.Buf["Reactor"] = s.SystemViewer.AddListener(tid, reactorId)
go s.Listen(s.ReactorChan)
} else {
logging.Debug(logging.DClient, "SYS 01 Adding %v as reactor %v listener", tid, reactorId)
s.DevBuf.Buf["Device"] = make(map[uint32]*DeviceInfo) // clearing old devices
if s.DevBuf.ReactorId != reactorId && s.DevBuf.ReactorId != 0 {
go s.SystemViewer.RemoveListener(s.DevBuf.ReactorId, tid)
}
s.DevBuf.ReactorId = reactorId
s.DevChan, s.DevBuf.Buf["Device"] = s.SystemViewer.AddListener(tid, reactorId)
go s.Listen(s.DevChan)
}
}
func (s *StatusMonitor) UpdateBuffer(d *DeviceInfo, dtype string, ch chan *DeviceInfo) {
s.DevBuf.Lock()
defer s.DevBuf.Unlock()
logging.Debug(logging.DClient, "SYS 01 Dev %v update requested", d)
if dev, exists := s.DevBuf.Buf[dtype][d.Id]; exists {
// already a device in the buffer
if dev.TransactionId > d.TransactionId {
logging.Debug(logging.DClient, "SYS 01 Update Processed. Old: %v, New: %v \n", dev, d)
d = dev // not sure if i can do this lol
}
}
if ch == s.ReactorChan || ch == s.DevChan {
// hacky way to check if the device came from a listener of a current channel
s.DevBuf.Buf[dtype][d.Id] = d
} else {
logging.Debug(logging.DClient, "Dev out of date!")
}
}
func (s *StatusMonitor) Listen(ch chan *DeviceInfo) {
for dev := range ch {
if dev.Type == "Reactor" {
go s.UpdateBuffer(dev, "Reactor", ch)
} else {
go s.UpdateBuffer(dev, "Device", ch)
}
}
}
type InfoStream struct {
// base stream for any reactor/device
// NewSender embedds the channel to send updates on
// NewListener will add the statusmon to the list of devs to echo to
Stream chan *DeviceInfo
Layout *syslayout
*listeners
}
type listeners struct {
sync.RWMutex
Listeners map[uint32]*lischan
}
type lischan struct {
sync.WaitGroup
StatusChan chan *DeviceInfo
}
type syslayout struct {
Devs map[uint32]*DeviceInfo
uint32 //index
sync.RWMutex
}
func NewLisChan(ch chan *DeviceInfo) *lischan {
l := &lischan{StatusChan: ch}
return l
}
func NewInfoStream() *InfoStream {
dch := make(chan *DeviceInfo)
s := &InfoStream{Stream: dch}
m := make(map[uint32]*DeviceInfo)
s.Layout = &syslayout{Devs: m}
s.listeners = &listeners{Listeners: make(map[uint32]*lischan)}
return s
}
func (s *InfoStream) Start() {
// consistency
go s.Listen()
}
// goal is to hook every new manager into the reactor status chan
func (s *InfoStream) AddSender() chan *DeviceInfo {
return s.Stream
}
func (s *InfoStream) Listen() {
for deviceInfo := range s.Stream {
go s.Update(deviceInfo)
}
}
func (s *InfoStream) Update(d *DeviceInfo) {
s.Layout.Lock()
defer s.Layout.Unlock()
if dev, ok := s.Layout.Devs[d.Id]; !ok {
d.Index = s.Layout.uint32
s.Layout.uint32 += 1
} else {
d.Index = dev.Index
}
s.Layout.Devs[d.Id] = d
go s.Echo(d)
}
func (l *listeners) Echo(d *DeviceInfo) {
l.RLock()
defer l.RUnlock()
// read only lock
for _, lis := range l.Listeners {
lis.Add(1)
go func(listener *lischan, dev *DeviceInfo) {
defer listener.Done()
listener.StatusChan <- dev
}(lis, d)
}
}
func (s *InfoStream) AddListener(id int, ch chan *DeviceInfo) map[uint32]*DeviceInfo {
// if i get a memory leak ill eat my shoe
s.listeners.Lock()
defer s.listeners.Unlock()
if _, ok := s.listeners.Listeners[id]; ok {
// exists
go s.RemoveListener(id)
}
s.listeners.Listeners[id] = NewLisChan(ch)
logging.Debug(logging.DClient, "SYS 01 Getting Devices")
//s.Layout.RLock()
//defer s.Layout.RUnlock()
logging.Debug(logging.DClient, "SYS 01 Returning Devices %v", s.Layout.Devs)
return s.Layout.Devs
}
func (l *listeners) RemoveListener(id int) {
l.Lock()
defer l.Unlock()
if lis, ok := l.Listeners[id]; ok {
delete(l.Listeners, id)
go func(ls *lischan) {
ls.Wait()
close(ls.StatusChan)
}(lis)
}
}
// status buffer maintaince
type SystemViewer struct {
// stores system directory and provide methods to be embedded in managers
ReactorStream *InfoStream // can add itself as a listener to provide methods
DeviceStream *ds
}
type ds struct {
Reactors map[uint32]*InfoStream //map from reactor id to its device info stream
sync.Mutex
}
func NewSystemViewer() *SystemViewer {
rs := NewInfoStream()
s := &SystemViewer{ReactorStream: rs}
m := make(map[uint32]*InfoStream)
s.DeviceStream = &ds{Reactors: m}
return s
}
func (s *SystemViewer) Start() {
go s.ReactorStream.Start()
}
func (s *SystemViewer) AddReactorSender() chan *DeviceInfo {
return s.ReactorStream.AddSender()
}
func (s *SystemViewer) AddDeviceSender(reactorId uint32) chan *DeviceInfo {
s.DeviceStream.Lock()
defer s.DeviceStream.Unlock()
var ds *InfoStream
var ok bool
if ds, ok = s.DeviceStream.Reactors[reactorId]; !ok {
ds = NewInfoStream()
s.DeviceStream.Reactors[reactorId] = ds
go ds.Start()
}
return ds.AddSender()
}
func (s *SystemViewer) AddListener(id, rid int) (chan *DeviceInfo, map[uint32]*DeviceInfo) {
// returns a listener for that chan
ch := make(chan *DeviceInfo)
if rid != 0 {
return ch, s.DeviceStream.Reactors[rid].AddListener(id, ch)
} else {
return ch, s.ReactorStream.AddListener(id, ch)
}
}
func (s *SystemViewer) RemoveListener(rid, tid int) {
// removes chan for specific tid and rid
s.DeviceStream.Lock()
defer s.DeviceStream.Unlock()
go s.DeviceStream.Reactors[rid].RemoveListener(tid)
}
*/

@ -0,0 +1,90 @@
// Package websocket sets up websocket connections with clients and allows live reactor readouts.
package websocket
// creates websocket server and upgrades incoming connections
import (
"encoding/json"
"fmt"
"net/http"
ws "github.com/gorilla/websocket"
)
type ReactorTest struct {
Id int `json:"id"`
Name string `json:"name"`
}
type WebSocket struct {
// dummy struct for interface
N string
}
func New() *WebSocket {
return &WebSocket{}
}
func (s *WebSocket) Start() {
fmt.Println("Starting ws server!")
setupRoutes()
http.ListenAndServe(":8080", nil)
}
// default opts allow all origins
var upgrader = ws.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
// reader
func reader(conn *ws.Conn) {
for {
// read forever
//messageType, p, err := conn.ReadMessage()
_, p, err := conn.ReadMessage()
if err != nil {
if ws.IsCloseError(err, ws.CloseNormalClosure, ws.CloseGoingAway) {
// normally closed
return
}
panic(err)
}
fmt.Printf("Msg: %s\n", string(p))
}
}
func serverWs(w http.ResponseWriter, r *http.Request) {
fmt.Println(r.Host)
websocket, err := upgrader.Upgrade(w, r, nil)
if err != nil {
panic(err)
}
// try sending reactor
t1 := &ReactorTest{Id: 1111, Name: "test1"}
t2 := &ReactorTest{Id: 1112, Name: "test2"}
t3 := &ReactorTest{Id: 1113, Name: "test3"}
n := []*ReactorTest{t1, t2, t3}
msg, err := json.Marshal(n)
if err != nil {
panic(err)
}
// pass to connection
if err := websocket.WriteMessage(ws.TextMessage, msg); err != nil {
panic(err)
}
// pass to reader
reader(websocket)
}
func setupRoutes() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Simple Server")
})
http.HandleFunc("/ws", serverWs)
}
Loading…
Cancel
Save