diff --git a/Taskfile.dist.yml b/Taskfile.dist.yml index 780f4be..4f0d97f 100644 --- a/Taskfile.dist.yml +++ b/Taskfile.dist.yml @@ -11,6 +11,11 @@ tasks: cmds: - bin/gotest.py + proto: + desc: "Rebuilds protobuf for gRPC" + cmds: + - protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/pkg/grpc/*.proto + all: desc: "builds arm reactor binaries and arm/amd server binaries" deps: [arm32-reactor, arm64-reactor, arm64-server, amd64-server] diff --git a/cmd/reactor/main.go b/cmd/reactor/main.go index a7b4bdb..eb7e6a3 100644 --- a/cmd/reactor/main.go +++ b/cmd/reactor/main.go @@ -25,10 +25,6 @@ func NewReactorCoordinator( return reactor.NewCoordinator(config, ch) } -func NewConfig(file, path, ext string) (*viper.Viper, error) { - return config.LoadConfig(file, path, ext) -} - func main() { // shutdown gracefulShutdown := make(chan os.Signal, 1) @@ -44,7 +40,7 @@ func main() { configFile := "reactor" configExt := "yaml" - conf, err := NewConfig(configFile, configPath, configExt) + conf, err := config.Load(configFile, configPath, configExt) if err != nil { panic(err) } diff --git a/cmd/server/main.go b/cmd/server/main.go index f5109db..9de6f69 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -6,7 +6,8 @@ import ( "syscall" "time" - "FRMS/internal/pkg/config" + conf "FRMS/internal/pkg/config" + "FRMS/internal/pkg/database" "FRMS/internal/pkg/logging" "FRMS/internal/pkg/server" "os" @@ -15,17 +16,13 @@ import ( ) type coordinator interface { - Start() + Start() error } func NewCoordinator(config *viper.Viper, ch chan error) coordinator { return server.NewCentralCoordinator(config, ch) } -func LoadConfig(file, path, ext string) (*viper.Viper, error) { - return config.LoadConfig(file, path, ext) -} - func main() { fmt.Printf("starting server... ") @@ -43,18 +40,23 @@ func main() { configPath := fmt.Sprintf("%s/.config/FRMS", userHome) configFile := "server" configExt := "yaml" - conf, err := LoadConfig(configFile, configPath, configExt) + config, err := conf.Load(configFile, configPath, configExt) if err != nil { panic(err) } + database.Connect(config) + errCh := make(chan error) - c := NewCoordinator(conf, errCh) - go c.Start() - logging.Debug(logging.DStart, "CCO 01 %s started", conf.Get("name")) + c := NewCoordinator(config, errCh) + if err := c.Start(); err != nil { + panic(err) + } + + logging.Debug(logging.DStart, "CCO 01 %s started", config.Get("name")) - elapsed := time.Now().Sub(start) + elapsed := time.Since(start) fmt.Printf("done %v\n", elapsed.Round(time.Microsecond)) select { @@ -63,11 +65,11 @@ func main() { case <-gracefulShutdown: fmt.Printf("\nstopping server... ") start := time.Now() - if err := conf.WriteConfig(); err != nil { + if err := config.WriteConfig(); err != nil { panic(err) } - logging.Debug(logging.DExit, "CON wrote %s", conf.ConfigFileUsed()) - elapsed := time.Now().Sub(start) + logging.Debug(logging.DExit, "CON wrote %s", config.ConfigFileUsed()) + elapsed := time.Since(start) fmt.Printf("done %v\n", elapsed.Round(time.Microsecond)) os.Exit(0) } diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go index 0321571..54e7907 100644 --- a/internal/pkg/config/config.go +++ b/internal/pkg/config/config.go @@ -9,15 +9,15 @@ import ( "github.com/spf13/viper" ) -// LoadConfig loads the file at path/file into a viper object -// Expects config file to be yaml -func LoadConfig(file, path, ext string) (*viper.Viper, error) { +// Load the file at path/file into a viper object. +// Expects config file to be yaml. +func Load(file, path, ext string) (*viper.Viper, error) { - logging.Debug(logging.DStart, "CON Loading config for %s", file) + logging.Debug(logging.DStart, "CON loading %s", file) config := viper.New() - configFile := fmt.Sprintf("%s/%s.%s", path, file, ext) + //configFile := fmt.Sprintf("%s/%s.%s", path, file, ext) config.SetConfigName(file) config.AddConfigPath(path) @@ -32,12 +32,12 @@ func LoadConfig(file, path, ext string) (*viper.Viper, error) { } // attempt to create an empty config incase it doesn't exist - if err := config.SafeWriteConfigAs(configFile); err != nil { - // if error thrown because file exists, fine to ignore - if _, ok := err.(viper.ConfigFileAlreadyExistsError); !ok { - return config, err - } - } + // if err := config.SafeWriteConfigAs(configFile); err != nil { + // // if error thrown because file exists, fine to ignore + // if _, ok := err.(viper.ConfigFileAlreadyExistsError); !ok { + // return config, err + // } + // } if err := config.ReadInConfig(); err != nil { fmt.Printf("read error %v\n", config) diff --git a/internal/pkg/database/reactor.go b/internal/pkg/database/reactor.go new file mode 100644 index 0000000..84df903 --- /dev/null +++ b/internal/pkg/database/reactor.go @@ -0,0 +1,64 @@ +// package Database wraps some influx db methods to provide functionality. +package database + +import ( + "context" + "errors" + "fmt" + + influx "github.com/influxdata/influxdb-client-go/v2" + "github.com/spf13/viper" +) + +var ( + ErrDBConnection = errors.New("connection to database failed") + ErrNoURLFound = errors.New("database url not found") +) + +var db influx.Client + +// Connect takes in a config and attempts to create a client for influxdb. +// Will automatically write changes back to config for future attempts +func Connect(config *viper.Viper) error { + + url := config.GetString("db.url") + token := config.GetString("db.token") + + if url == "" { + return ErrNoURLFound + } + + db = influx.NewClient(url, token) + + if token == "" { + // try setup + fmt.Printf("attempting to setup database at %v\n", url) + + user := config.GetString("db.username") + password := config.GetString("db.password") + org := config.GetString("db.org") + bucket := config.GetString("db.bucket") + + Setup(user, pass, org, bucket + } + + db = influx.NewClient(url, token) + + return nil +} + +func Setup(user, pass, org, bucket string, ret int) (string, error) { + + resp, err := db.Setup(context.Background(), user, pass, org, bucket, ret) + + return "", nil +} + +func GetBucket(id int) (string, error) { + return "", nil +} + +func GetToken(id int) (string, error) { + // bucket, err := client.BucketsAPI().FindBucketByName(context.Background(), id) + return "", nil +} diff --git a/internal/pkg/grpc/device.proto b/internal/pkg/grpc/device.proto deleted file mode 100644 index 69ec364..0000000 --- a/internal/pkg/grpc/device.proto +++ /dev/null @@ -1,42 +0,0 @@ -syntax = "proto3"; -package grpc; - -option go_package = "internal/pkg/grpc"; - -service device { - // groups basic device interactions - // get/set name based on request - rpc Name(NameRequest) returns (NameResponse) -} - -message NameRequest { - // empty for future expansion - string Name = 1; -} - -message NameResponse { - string Name = 1; -} - -service sensor { - // sensor specific functions - rpc Reading(ReadingRequest) returns (ReadingResponse) - rpc SampleRate(SampleRateRequest) returns (SampleRateResponse) -} - -message ReadingRequest { - // empty -} - -message ReadingResponse { - string Reading = 1; // formatted reading "9.7 pH" - int64 Timestamp = 2; // when the reading was taken -} - -message SampleRateRequest { - int32 SampleRate = 1; // 0 to return current sample rate, value in seconds -} - -message SampleRateResponse { - int32 SampleRate = 1; // returns the set sample rate -} diff --git a/internal/pkg/grpc/handshake.pb.go b/internal/pkg/grpc/handshake.pb.go new file mode 100644 index 0000000..3444896 --- /dev/null +++ b/internal/pkg/grpc/handshake.pb.go @@ -0,0 +1,260 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.12 +// source: internal/pkg/grpc/handshake.proto + +package grpc + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type ReactorClientRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Port uint32 `protobuf:"varint,2,opt,name=port,proto3" json:"port,omitempty"` // client gRPC port +} + +func (x *ReactorClientRequest) Reset() { + *x = ReactorClientRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReactorClientRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReactorClientRequest) ProtoMessage() {} + +func (x *ReactorClientRequest) ProtoReflect() protoreflect.Message { + mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReactorClientRequest.ProtoReflect.Descriptor instead. +func (*ReactorClientRequest) Descriptor() ([]byte, []int) { + return file_internal_pkg_grpc_handshake_proto_rawDescGZIP(), []int{0} +} + +func (x *ReactorClientRequest) GetId() uint32 { + if x != nil { + return x.Id + } + return 0 +} + +func (x *ReactorClientRequest) GetPort() uint32 { + if x != nil { + return x.Port + } + return 0 +} + +type ReactorClientResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Url string `protobuf:"bytes,2,opt,name=url,proto3" json:"url,omitempty"` + Org string `protobuf:"bytes,3,opt,name=org,proto3" json:"org,omitempty"` + Token string `protobuf:"bytes,4,opt,name=token,proto3" json:"token,omitempty"` + Bucket string `protobuf:"bytes,5,opt,name=bucket,proto3" json:"bucket,omitempty"` +} + +func (x *ReactorClientResponse) Reset() { + *x = ReactorClientResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReactorClientResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReactorClientResponse) ProtoMessage() {} + +func (x *ReactorClientResponse) ProtoReflect() protoreflect.Message { + mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReactorClientResponse.ProtoReflect.Descriptor instead. +func (*ReactorClientResponse) Descriptor() ([]byte, []int) { + return file_internal_pkg_grpc_handshake_proto_rawDescGZIP(), []int{1} +} + +func (x *ReactorClientResponse) GetId() uint32 { + if x != nil { + return x.Id + } + return 0 +} + +func (x *ReactorClientResponse) GetUrl() string { + if x != nil { + return x.Url + } + return "" +} + +func (x *ReactorClientResponse) GetOrg() string { + if x != nil { + return x.Org + } + return "" +} + +func (x *ReactorClientResponse) GetToken() string { + if x != nil { + return x.Token + } + return "" +} + +func (x *ReactorClientResponse) GetBucket() string { + if x != nil { + return x.Bucket + } + return "" +} + +var File_internal_pkg_grpc_handshake_proto protoreflect.FileDescriptor + +var file_internal_pkg_grpc_handshake_proto_rawDesc = []byte{ + 0x0a, 0x21, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, + 0x72, 0x70, 0x63, 0x2f, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0x3a, 0x0a, 0x14, 0x52, 0x65, 0x61, + 0x63, 0x74, 0x6f, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, + 0x64, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, + 0x04, 0x70, 0x6f, 0x72, 0x74, 0x22, 0x79, 0x0a, 0x15, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, + 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, + 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x10, + 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, + 0x12, 0x10, 0x0a, 0x03, 0x6f, 0x72, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6f, + 0x72, 0x67, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x75, 0x63, 0x6b, + 0x65, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, + 0x32, 0x5c, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x4f, 0x0a, + 0x14, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x48, 0x61, + 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x1a, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, + 0x63, 0x74, 0x6f, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x1b, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, + 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x13, + 0x5a, 0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, + 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_internal_pkg_grpc_handshake_proto_rawDescOnce sync.Once + file_internal_pkg_grpc_handshake_proto_rawDescData = file_internal_pkg_grpc_handshake_proto_rawDesc +) + +func file_internal_pkg_grpc_handshake_proto_rawDescGZIP() []byte { + file_internal_pkg_grpc_handshake_proto_rawDescOnce.Do(func() { + file_internal_pkg_grpc_handshake_proto_rawDescData = protoimpl.X.CompressGZIP(file_internal_pkg_grpc_handshake_proto_rawDescData) + }) + return file_internal_pkg_grpc_handshake_proto_rawDescData +} + +var file_internal_pkg_grpc_handshake_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_internal_pkg_grpc_handshake_proto_goTypes = []interface{}{ + (*ReactorClientRequest)(nil), // 0: grpc.ReactorClientRequest + (*ReactorClientResponse)(nil), // 1: grpc.ReactorClientResponse +} +var file_internal_pkg_grpc_handshake_proto_depIdxs = []int32{ + 0, // 0: grpc.handshake.ReactorClientHandler:input_type -> grpc.ReactorClientRequest + 1, // 1: grpc.handshake.ReactorClientHandler:output_type -> grpc.ReactorClientResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_internal_pkg_grpc_handshake_proto_init() } +func file_internal_pkg_grpc_handshake_proto_init() { + if File_internal_pkg_grpc_handshake_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_internal_pkg_grpc_handshake_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReactorClientRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_pkg_grpc_handshake_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReactorClientResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_internal_pkg_grpc_handshake_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_internal_pkg_grpc_handshake_proto_goTypes, + DependencyIndexes: file_internal_pkg_grpc_handshake_proto_depIdxs, + MessageInfos: file_internal_pkg_grpc_handshake_proto_msgTypes, + }.Build() + File_internal_pkg_grpc_handshake_proto = out.File + file_internal_pkg_grpc_handshake_proto_rawDesc = nil + file_internal_pkg_grpc_handshake_proto_goTypes = nil + file_internal_pkg_grpc_handshake_proto_depIdxs = nil +} diff --git a/internal/pkg/grpc/handshake.proto b/internal/pkg/grpc/handshake.proto new file mode 100644 index 0000000..716477e --- /dev/null +++ b/internal/pkg/grpc/handshake.proto @@ -0,0 +1,21 @@ +syntax = "proto3"; +package grpc; + +option go_package = "internal/pkg/grpc"; + +service handshake { + rpc ReactorClientHandler(ReactorClientRequest) returns (ReactorClientResponse); +} + +message ReactorClientRequest { + uint32 id = 1; + uint32 port = 2; // client gRPC port +} + +message ReactorClientResponse { + uint32 id = 1; + string url = 2; + string org = 3; + string token = 4; + string bucket = 5; +} diff --git a/internal/pkg/grpc/server_grpc.pb.go b/internal/pkg/grpc/handshake_grpc.pb.go similarity index 64% rename from internal/pkg/grpc/server_grpc.pb.go rename to internal/pkg/grpc/handshake_grpc.pb.go index eaa3c5d..a78492f 100644 --- a/internal/pkg/grpc/server_grpc.pb.go +++ b/internal/pkg/grpc/handshake_grpc.pb.go @@ -1,8 +1,8 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v3.6.1 -// source: internal/pkg/grpc/server.proto +// - protoc v3.21.12 +// source: internal/pkg/grpc/handshake.proto package grpc @@ -22,7 +22,7 @@ const _ = grpc.SupportPackageIsVersion7 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type HandshakeClient interface { - ClientDiscoveryHandler(ctx context.Context, in *ClientRequest, opts ...grpc.CallOption) (*ClientResponse, error) + ReactorClientHandler(ctx context.Context, in *ReactorClientRequest, opts ...grpc.CallOption) (*ReactorClientResponse, error) } type handshakeClient struct { @@ -33,9 +33,9 @@ func NewHandshakeClient(cc grpc.ClientConnInterface) HandshakeClient { return &handshakeClient{cc} } -func (c *handshakeClient) ClientDiscoveryHandler(ctx context.Context, in *ClientRequest, opts ...grpc.CallOption) (*ClientResponse, error) { - out := new(ClientResponse) - err := c.cc.Invoke(ctx, "/grpc.handshake/ClientDiscoveryHandler", in, out, opts...) +func (c *handshakeClient) ReactorClientHandler(ctx context.Context, in *ReactorClientRequest, opts ...grpc.CallOption) (*ReactorClientResponse, error) { + out := new(ReactorClientResponse) + err := c.cc.Invoke(ctx, "/grpc.handshake/ReactorClientHandler", in, out, opts...) if err != nil { return nil, err } @@ -46,7 +46,7 @@ func (c *handshakeClient) ClientDiscoveryHandler(ctx context.Context, in *Client // All implementations must embed UnimplementedHandshakeServer // for forward compatibility type HandshakeServer interface { - ClientDiscoveryHandler(context.Context, *ClientRequest) (*ClientResponse, error) + ReactorClientHandler(context.Context, *ReactorClientRequest) (*ReactorClientResponse, error) mustEmbedUnimplementedHandshakeServer() } @@ -54,8 +54,8 @@ type HandshakeServer interface { type UnimplementedHandshakeServer struct { } -func (UnimplementedHandshakeServer) ClientDiscoveryHandler(context.Context, *ClientRequest) (*ClientResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method ClientDiscoveryHandler not implemented") +func (UnimplementedHandshakeServer) ReactorClientHandler(context.Context, *ReactorClientRequest) (*ReactorClientResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ReactorClientHandler not implemented") } func (UnimplementedHandshakeServer) mustEmbedUnimplementedHandshakeServer() {} @@ -70,20 +70,20 @@ func RegisterHandshakeServer(s grpc.ServiceRegistrar, srv HandshakeServer) { s.RegisterService(&Handshake_ServiceDesc, srv) } -func _Handshake_ClientDiscoveryHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ClientRequest) +func _Handshake_ReactorClientHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ReactorClientRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(HandshakeServer).ClientDiscoveryHandler(ctx, in) + return srv.(HandshakeServer).ReactorClientHandler(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/grpc.handshake/ClientDiscoveryHandler", + FullMethod: "/grpc.handshake/ReactorClientHandler", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(HandshakeServer).ClientDiscoveryHandler(ctx, req.(*ClientRequest)) + return srv.(HandshakeServer).ReactorClientHandler(ctx, req.(*ReactorClientRequest)) } return interceptor(ctx, in, info, handler) } @@ -96,10 +96,10 @@ var Handshake_ServiceDesc = grpc.ServiceDesc{ HandlerType: (*HandshakeServer)(nil), Methods: []grpc.MethodDesc{ { - MethodName: "ClientDiscoveryHandler", - Handler: _Handshake_ClientDiscoveryHandler_Handler, + MethodName: "ReactorClientHandler", + Handler: _Handshake_ReactorClientHandler_Handler, }, }, Streams: []grpc.StreamDesc{}, - Metadata: "internal/pkg/grpc/server.proto", + Metadata: "internal/pkg/grpc/handshake.proto", } diff --git a/internal/pkg/grpc/monitoring.pb.go b/internal/pkg/grpc/monitoring.pb.go index f47a538..59cd1d3 100644 --- a/internal/pkg/grpc/monitoring.pb.go +++ b/internal/pkg/grpc/monitoring.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.0 -// protoc v3.12.4 +// protoc-gen-go v1.28.1 +// protoc v3.21.12 // source: internal/pkg/grpc/monitoring.proto package grpc @@ -20,56 +20,7 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -type Status int32 - -const ( - Status_DEAD Status = 0 - Status_ALIVE Status = 1 - Status_UNKOWN Status = 2 -) - -// Enum value maps for Status. -var ( - Status_name = map[int32]string{ - 0: "DEAD", - 1: "ALIVE", - 2: "UNKOWN", - } - Status_value = map[string]int32{ - "DEAD": 0, - "ALIVE": 1, - "UNKOWN": 2, - } -) - -func (x Status) Enum() *Status { - p := new(Status) - *p = x - return p -} - -func (x Status) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (Status) Descriptor() protoreflect.EnumDescriptor { - return file_internal_pkg_grpc_monitoring_proto_enumTypes[0].Descriptor() -} - -func (Status) Type() protoreflect.EnumType { - return &file_internal_pkg_grpc_monitoring_proto_enumTypes[0] -} - -func (x Status) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Use Status.Descriptor instead. -func (Status) EnumDescriptor() ([]byte, []int) { - return file_internal_pkg_grpc_monitoring_proto_rawDescGZIP(), []int{0} -} - -type ReactorStatusResponse struct { +type ReactorAck struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields @@ -77,8 +28,8 @@ type ReactorStatusResponse struct { Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` } -func (x *ReactorStatusResponse) Reset() { - *x = ReactorStatusResponse{} +func (x *ReactorAck) Reset() { + *x = ReactorAck{} if protoimpl.UnsafeEnabled { mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -86,13 +37,13 @@ func (x *ReactorStatusResponse) Reset() { } } -func (x *ReactorStatusResponse) String() string { +func (x *ReactorAck) String() string { return protoimpl.X.MessageStringOf(x) } -func (*ReactorStatusResponse) ProtoMessage() {} +func (*ReactorAck) ProtoMessage() {} -func (x *ReactorStatusResponse) ProtoReflect() protoreflect.Message { +func (x *ReactorAck) ProtoReflect() protoreflect.Message { mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -104,30 +55,28 @@ func (x *ReactorStatusResponse) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use ReactorStatusResponse.ProtoReflect.Descriptor instead. -func (*ReactorStatusResponse) Descriptor() ([]byte, []int) { +// Deprecated: Use ReactorAck.ProtoReflect.Descriptor instead. +func (*ReactorAck) Descriptor() ([]byte, []int) { return file_internal_pkg_grpc_monitoring_proto_rawDescGZIP(), []int{0} } -func (x *ReactorStatusResponse) GetId() int32 { +func (x *ReactorAck) GetId() int32 { if x != nil { return x.Id } return 0 } -type ReactorStatusPing struct { +type ReactorPing struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` - // new devices - Devices []*Device `protobuf:"bytes,2,rep,name=devices,proto3" json:"devices,omitempty"` } -func (x *ReactorStatusPing) Reset() { - *x = ReactorStatusPing{} +func (x *ReactorPing) Reset() { + *x = ReactorPing{} if protoimpl.UnsafeEnabled { mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -135,13 +84,13 @@ func (x *ReactorStatusPing) Reset() { } } -func (x *ReactorStatusPing) String() string { +func (x *ReactorPing) String() string { return protoimpl.X.MessageStringOf(x) } -func (*ReactorStatusPing) ProtoMessage() {} +func (*ReactorPing) ProtoMessage() {} -func (x *ReactorStatusPing) ProtoReflect() protoreflect.Message { +func (x *ReactorPing) ProtoReflect() protoreflect.Message { mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -153,108 +102,34 @@ func (x *ReactorStatusPing) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use ReactorStatusPing.ProtoReflect.Descriptor instead. -func (*ReactorStatusPing) Descriptor() ([]byte, []int) { +// Deprecated: Use ReactorPing.ProtoReflect.Descriptor instead. +func (*ReactorPing) Descriptor() ([]byte, []int) { return file_internal_pkg_grpc_monitoring_proto_rawDescGZIP(), []int{1} } -func (x *ReactorStatusPing) GetId() int32 { +func (x *ReactorPing) GetId() int32 { if x != nil { return x.Id } return 0 } -func (x *ReactorStatusPing) GetDevices() []*Device { - if x != nil { - return x.Devices - } - return nil -} - -type Device struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Addr int32 `protobuf:"varint,1,opt,name=addr,proto3" json:"addr,omitempty"` // i2c addr - Status Status `protobuf:"varint,2,opt,name=status,proto3,enum=grpc.Status" json:"status,omitempty"` // most recent status -} - -func (x *Device) Reset() { - *x = Device{} - if protoimpl.UnsafeEnabled { - mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Device) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Device) ProtoMessage() {} - -func (x *Device) ProtoReflect() protoreflect.Message { - mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Device.ProtoReflect.Descriptor instead. -func (*Device) Descriptor() ([]byte, []int) { - return file_internal_pkg_grpc_monitoring_proto_rawDescGZIP(), []int{2} -} - -func (x *Device) GetAddr() int32 { - if x != nil { - return x.Addr - } - return 0 -} - -func (x *Device) GetStatus() Status { - if x != nil { - return x.Status - } - return Status_DEAD -} - var File_internal_pkg_grpc_monitoring_proto protoreflect.FileDescriptor var file_internal_pkg_grpc_monitoring_proto_rawDesc = []byte{ 0x0a, 0x22, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0x27, 0x0a, 0x15, 0x52, 0x65, - 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, - 0x02, 0x69, 0x64, 0x22, 0x4b, 0x0a, 0x11, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, - 0x61, 0x74, 0x75, 0x73, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x05, 0x52, 0x02, 0x69, 0x64, 0x12, 0x26, 0x0a, 0x07, 0x64, 0x65, 0x76, 0x69, - 0x63, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x67, 0x72, 0x70, 0x63, - 0x2e, 0x44, 0x65, 0x76, 0x69, 0x63, 0x65, 0x52, 0x07, 0x64, 0x65, 0x76, 0x69, 0x63, 0x65, 0x73, - 0x22, 0x42, 0x0a, 0x06, 0x44, 0x65, 0x76, 0x69, 0x63, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x61, 0x64, - 0x64, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x61, 0x64, 0x64, 0x72, 0x12, 0x24, - 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0c, - 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, - 0x61, 0x74, 0x75, 0x73, 0x2a, 0x29, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x08, - 0x0a, 0x04, 0x44, 0x45, 0x41, 0x44, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x41, 0x4c, 0x49, 0x56, - 0x45, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x55, 0x4e, 0x4b, 0x4f, 0x57, 0x4e, 0x10, 0x02, 0x32, - 0x5a, 0x0a, 0x0a, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x4c, 0x0a, - 0x14, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x48, 0x61, - 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x17, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, - 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x50, 0x69, 0x6e, 0x67, 0x1a, 0x1b, - 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x13, 0x5a, 0x11, 0x69, - 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0x1c, 0x0a, 0x0a, 0x52, 0x65, + 0x61, 0x63, 0x74, 0x6f, 0x72, 0x41, 0x63, 0x6b, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x05, 0x52, 0x02, 0x69, 0x64, 0x22, 0x1d, 0x0a, 0x0b, 0x52, 0x65, 0x61, 0x63, + 0x74, 0x6f, 0x72, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x05, 0x52, 0x02, 0x69, 0x64, 0x32, 0x49, 0x0a, 0x0a, 0x6d, 0x6f, 0x6e, 0x69, 0x74, + 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x3b, 0x0a, 0x12, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, + 0x50, 0x69, 0x6e, 0x67, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x11, 0x2e, 0x67, 0x72, + 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x50, 0x69, 0x6e, 0x67, 0x1a, 0x10, + 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x41, 0x63, 0x6b, + 0x28, 0x01, 0x42, 0x13, 0x5a, 0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, + 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -269,24 +144,19 @@ func file_internal_pkg_grpc_monitoring_proto_rawDescGZIP() []byte { return file_internal_pkg_grpc_monitoring_proto_rawDescData } -var file_internal_pkg_grpc_monitoring_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_internal_pkg_grpc_monitoring_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_internal_pkg_grpc_monitoring_proto_msgTypes = make([]protoimpl.MessageInfo, 2) var file_internal_pkg_grpc_monitoring_proto_goTypes = []interface{}{ - (Status)(0), // 0: grpc.Status - (*ReactorStatusResponse)(nil), // 1: grpc.ReactorStatusResponse - (*ReactorStatusPing)(nil), // 2: grpc.ReactorStatusPing - (*Device)(nil), // 3: grpc.Device + (*ReactorAck)(nil), // 0: grpc.ReactorAck + (*ReactorPing)(nil), // 1: grpc.ReactorPing } var file_internal_pkg_grpc_monitoring_proto_depIdxs = []int32{ - 3, // 0: grpc.ReactorStatusPing.devices:type_name -> grpc.Device - 0, // 1: grpc.Device.status:type_name -> grpc.Status - 2, // 2: grpc.monitoring.ReactorStatusHandler:input_type -> grpc.ReactorStatusPing - 1, // 3: grpc.monitoring.ReactorStatusHandler:output_type -> grpc.ReactorStatusResponse - 3, // [3:4] is the sub-list for method output_type - 2, // [2:3] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 1, // 0: grpc.monitoring.ReactorPingHandler:input_type -> grpc.ReactorPing + 0, // 1: grpc.monitoring.ReactorPingHandler:output_type -> grpc.ReactorAck + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name } func init() { file_internal_pkg_grpc_monitoring_proto_init() } @@ -296,7 +166,7 @@ func file_internal_pkg_grpc_monitoring_proto_init() { } if !protoimpl.UnsafeEnabled { file_internal_pkg_grpc_monitoring_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ReactorStatusResponse); i { + switch v := v.(*ReactorAck); i { case 0: return &v.state case 1: @@ -308,19 +178,7 @@ func file_internal_pkg_grpc_monitoring_proto_init() { } } file_internal_pkg_grpc_monitoring_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ReactorStatusPing); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_internal_pkg_grpc_monitoring_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Device); i { + switch v := v.(*ReactorPing); i { case 0: return &v.state case 1: @@ -337,14 +195,13 @@ func file_internal_pkg_grpc_monitoring_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_internal_pkg_grpc_monitoring_proto_rawDesc, - NumEnums: 1, - NumMessages: 3, + NumEnums: 0, + NumMessages: 2, NumExtensions: 0, NumServices: 1, }, GoTypes: file_internal_pkg_grpc_monitoring_proto_goTypes, DependencyIndexes: file_internal_pkg_grpc_monitoring_proto_depIdxs, - EnumInfos: file_internal_pkg_grpc_monitoring_proto_enumTypes, MessageInfos: file_internal_pkg_grpc_monitoring_proto_msgTypes, }.Build() File_internal_pkg_grpc_monitoring_proto = out.File diff --git a/internal/pkg/grpc/monitoring.proto b/internal/pkg/grpc/monitoring.proto index 128d9ad..8f307b2 100644 --- a/internal/pkg/grpc/monitoring.proto +++ b/internal/pkg/grpc/monitoring.proto @@ -4,26 +4,13 @@ package grpc; option go_package = "internal/pkg/grpc"; service monitoring { - rpc ReactorStatusHandler(ReactorStatusPing) returns (ReactorStatusResponse); + rpc ReactorPingHandler(stream ReactorPing) returns (ReactorAck); } -message ReactorStatusResponse { +message ReactorAck { int32 id = 1; } -message ReactorStatusPing { +message ReactorPing { int32 id = 1; - // new devices - repeated Device devices = 2; -} - -enum Status { - DEAD = 0; - ALIVE = 1; - UNKOWN = 2; -} - -message Device { - int32 addr = 1; // i2c addr - Status status = 2; // most recent status } diff --git a/internal/pkg/grpc/monitoring_grpc.pb.go b/internal/pkg/grpc/monitoring_grpc.pb.go index cc3461b..75a39d6 100644 --- a/internal/pkg/grpc/monitoring_grpc.pb.go +++ b/internal/pkg/grpc/monitoring_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v3.12.4 +// - protoc v3.21.12 // source: internal/pkg/grpc/monitoring.proto package grpc @@ -22,7 +22,7 @@ const _ = grpc.SupportPackageIsVersion7 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type MonitoringClient interface { - ReactorStatusHandler(ctx context.Context, in *ReactorStatusPing, opts ...grpc.CallOption) (*ReactorStatusResponse, error) + ReactorPingHandler(ctx context.Context, opts ...grpc.CallOption) (Monitoring_ReactorPingHandlerClient, error) } type monitoringClient struct { @@ -33,20 +33,45 @@ func NewMonitoringClient(cc grpc.ClientConnInterface) MonitoringClient { return &monitoringClient{cc} } -func (c *monitoringClient) ReactorStatusHandler(ctx context.Context, in *ReactorStatusPing, opts ...grpc.CallOption) (*ReactorStatusResponse, error) { - out := new(ReactorStatusResponse) - err := c.cc.Invoke(ctx, "/grpc.monitoring/ReactorStatusHandler", in, out, opts...) +func (c *monitoringClient) ReactorPingHandler(ctx context.Context, opts ...grpc.CallOption) (Monitoring_ReactorPingHandlerClient, error) { + stream, err := c.cc.NewStream(ctx, &Monitoring_ServiceDesc.Streams[0], "/grpc.monitoring/ReactorPingHandler", opts...) if err != nil { return nil, err } - return out, nil + x := &monitoringReactorPingHandlerClient{stream} + return x, nil +} + +type Monitoring_ReactorPingHandlerClient interface { + Send(*ReactorPing) error + CloseAndRecv() (*ReactorAck, error) + grpc.ClientStream +} + +type monitoringReactorPingHandlerClient struct { + grpc.ClientStream +} + +func (x *monitoringReactorPingHandlerClient) Send(m *ReactorPing) error { + return x.ClientStream.SendMsg(m) +} + +func (x *monitoringReactorPingHandlerClient) CloseAndRecv() (*ReactorAck, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(ReactorAck) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil } // MonitoringServer is the server API for Monitoring service. // All implementations must embed UnimplementedMonitoringServer // for forward compatibility type MonitoringServer interface { - ReactorStatusHandler(context.Context, *ReactorStatusPing) (*ReactorStatusResponse, error) + ReactorPingHandler(Monitoring_ReactorPingHandlerServer) error mustEmbedUnimplementedMonitoringServer() } @@ -54,8 +79,8 @@ type MonitoringServer interface { type UnimplementedMonitoringServer struct { } -func (UnimplementedMonitoringServer) ReactorStatusHandler(context.Context, *ReactorStatusPing) (*ReactorStatusResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method ReactorStatusHandler not implemented") +func (UnimplementedMonitoringServer) ReactorPingHandler(Monitoring_ReactorPingHandlerServer) error { + return status.Errorf(codes.Unimplemented, "method ReactorPingHandler not implemented") } func (UnimplementedMonitoringServer) mustEmbedUnimplementedMonitoringServer() {} @@ -70,22 +95,30 @@ func RegisterMonitoringServer(s grpc.ServiceRegistrar, srv MonitoringServer) { s.RegisterService(&Monitoring_ServiceDesc, srv) } -func _Monitoring_ReactorStatusHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ReactorStatusPing) - if err := dec(in); err != nil { +func _Monitoring_ReactorPingHandler_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(MonitoringServer).ReactorPingHandler(&monitoringReactorPingHandlerServer{stream}) +} + +type Monitoring_ReactorPingHandlerServer interface { + SendAndClose(*ReactorAck) error + Recv() (*ReactorPing, error) + grpc.ServerStream +} + +type monitoringReactorPingHandlerServer struct { + grpc.ServerStream +} + +func (x *monitoringReactorPingHandlerServer) SendAndClose(m *ReactorAck) error { + return x.ServerStream.SendMsg(m) +} + +func (x *monitoringReactorPingHandlerServer) Recv() (*ReactorPing, error) { + m := new(ReactorPing) + if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err } - if interceptor == nil { - return srv.(MonitoringServer).ReactorStatusHandler(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/grpc.monitoring/ReactorStatusHandler", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(MonitoringServer).ReactorStatusHandler(ctx, req.(*ReactorStatusPing)) - } - return interceptor(ctx, in, info, handler) + return m, nil } // Monitoring_ServiceDesc is the grpc.ServiceDesc for Monitoring service. @@ -94,12 +127,13 @@ func _Monitoring_ReactorStatusHandler_Handler(srv interface{}, ctx context.Conte var Monitoring_ServiceDesc = grpc.ServiceDesc{ ServiceName: "grpc.monitoring", HandlerType: (*MonitoringServer)(nil), - Methods: []grpc.MethodDesc{ + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ { - MethodName: "ReactorStatusHandler", - Handler: _Monitoring_ReactorStatusHandler_Handler, + StreamName: "ReactorPingHandler", + Handler: _Monitoring_ReactorPingHandler_Handler, + ClientStreams: true, }, }, - Streams: []grpc.StreamDesc{}, Metadata: "internal/pkg/grpc/monitoring.proto", } diff --git a/internal/pkg/grpc/server.pb.go b/internal/pkg/grpc/server.pb.go deleted file mode 100644 index 20b821c..0000000 --- a/internal/pkg/grpc/server.pb.go +++ /dev/null @@ -1,335 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.28.0 -// protoc v3.6.1 -// source: internal/pkg/grpc/server.proto - -package grpc - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type ClientRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - ClientId uint32 `protobuf:"varint,1,opt,name=clientId,proto3" json:"clientId,omitempty"` - ClientType string `protobuf:"bytes,2,opt,name=clientType,proto3" json:"clientType,omitempty"` -} - -func (x *ClientRequest) Reset() { - *x = ClientRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_internal_pkg_grpc_server_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ClientRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ClientRequest) ProtoMessage() {} - -func (x *ClientRequest) ProtoReflect() protoreflect.Message { - mi := &file_internal_pkg_grpc_server_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ClientRequest.ProtoReflect.Descriptor instead. -func (*ClientRequest) Descriptor() ([]byte, []int) { - return file_internal_pkg_grpc_server_proto_rawDescGZIP(), []int{0} -} - -func (x *ClientRequest) GetClientId() uint32 { - if x != nil { - return x.ClientId - } - return 0 -} - -func (x *ClientRequest) GetClientType() string { - if x != nil { - return x.ClientType - } - return "" -} - -type ClientResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - ClientId uint32 `protobuf:"varint,1,opt,name=clientId,proto3" json:"clientId,omitempty"` - ServerPort uint32 `protobuf:"varint,2,opt,name=serverPort,proto3" json:"serverPort,omitempty"` - Database *Database `protobuf:"bytes,3,opt,name=database,proto3" json:"database,omitempty"` -} - -func (x *ClientResponse) Reset() { - *x = ClientResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_internal_pkg_grpc_server_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ClientResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ClientResponse) ProtoMessage() {} - -func (x *ClientResponse) ProtoReflect() protoreflect.Message { - mi := &file_internal_pkg_grpc_server_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ClientResponse.ProtoReflect.Descriptor instead. -func (*ClientResponse) Descriptor() ([]byte, []int) { - return file_internal_pkg_grpc_server_proto_rawDescGZIP(), []int{1} -} - -func (x *ClientResponse) GetClientId() uint32 { - if x != nil { - return x.ClientId - } - return 0 -} - -func (x *ClientResponse) GetServerPort() uint32 { - if x != nil { - return x.ServerPort - } - return 0 -} - -func (x *ClientResponse) GetDatabase() *Database { - if x != nil { - return x.Database - } - return nil -} - -type Database struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - URL string `protobuf:"bytes,1,opt,name=URL,proto3" json:"URL,omitempty"` - ORG string `protobuf:"bytes,2,opt,name=ORG,proto3" json:"ORG,omitempty"` - Token string `protobuf:"bytes,3,opt,name=token,proto3" json:"token,omitempty"` - Bucket string `protobuf:"bytes,4,opt,name=bucket,proto3" json:"bucket,omitempty"` -} - -func (x *Database) Reset() { - *x = Database{} - if protoimpl.UnsafeEnabled { - mi := &file_internal_pkg_grpc_server_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Database) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Database) ProtoMessage() {} - -func (x *Database) ProtoReflect() protoreflect.Message { - mi := &file_internal_pkg_grpc_server_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Database.ProtoReflect.Descriptor instead. -func (*Database) Descriptor() ([]byte, []int) { - return file_internal_pkg_grpc_server_proto_rawDescGZIP(), []int{2} -} - -func (x *Database) GetURL() string { - if x != nil { - return x.URL - } - return "" -} - -func (x *Database) GetORG() string { - if x != nil { - return x.ORG - } - return "" -} - -func (x *Database) GetToken() string { - if x != nil { - return x.Token - } - return "" -} - -func (x *Database) GetBucket() string { - if x != nil { - return x.Bucket - } - return "" -} - -var File_internal_pkg_grpc_server_proto protoreflect.FileDescriptor - -var file_internal_pkg_grpc_server_proto_rawDesc = []byte{ - 0x0a, 0x1e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, - 0x72, 0x70, 0x63, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0x4b, 0x0a, 0x0d, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, - 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, - 0x74, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x54, - 0x79, 0x70, 0x65, 0x22, 0x78, 0x0a, 0x0e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, - 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, - 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x50, 0x6f, 0x72, 0x74, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x50, 0x6f, 0x72, - 0x74, 0x12, 0x2a, 0x0a, 0x08, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x62, - 0x61, 0x73, 0x65, 0x52, 0x08, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x22, 0x5c, 0x0a, - 0x08, 0x44, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x55, 0x52, 0x4c, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x55, 0x52, 0x4c, 0x12, 0x10, 0x0a, 0x03, 0x4f, - 0x52, 0x47, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x4f, 0x52, 0x47, 0x12, 0x14, 0x0a, - 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, - 0x6b, 0x65, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x18, 0x04, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x32, 0x50, 0x0a, 0x09, 0x68, - 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x43, 0x0a, 0x16, 0x43, 0x6c, 0x69, 0x65, - 0x6e, 0x74, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x48, 0x61, 0x6e, 0x64, 0x6c, - 0x65, 0x72, 0x12, 0x13, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x43, - 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x13, 0x5a, - 0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, - 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_internal_pkg_grpc_server_proto_rawDescOnce sync.Once - file_internal_pkg_grpc_server_proto_rawDescData = file_internal_pkg_grpc_server_proto_rawDesc -) - -func file_internal_pkg_grpc_server_proto_rawDescGZIP() []byte { - file_internal_pkg_grpc_server_proto_rawDescOnce.Do(func() { - file_internal_pkg_grpc_server_proto_rawDescData = protoimpl.X.CompressGZIP(file_internal_pkg_grpc_server_proto_rawDescData) - }) - return file_internal_pkg_grpc_server_proto_rawDescData -} - -var file_internal_pkg_grpc_server_proto_msgTypes = make([]protoimpl.MessageInfo, 3) -var file_internal_pkg_grpc_server_proto_goTypes = []interface{}{ - (*ClientRequest)(nil), // 0: grpc.ClientRequest - (*ClientResponse)(nil), // 1: grpc.ClientResponse - (*Database)(nil), // 2: grpc.Database -} -var file_internal_pkg_grpc_server_proto_depIdxs = []int32{ - 2, // 0: grpc.ClientResponse.database:type_name -> grpc.Database - 0, // 1: grpc.handshake.ClientDiscoveryHandler:input_type -> grpc.ClientRequest - 1, // 2: grpc.handshake.ClientDiscoveryHandler:output_type -> grpc.ClientResponse - 2, // [2:3] is the sub-list for method output_type - 1, // [1:2] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name -} - -func init() { file_internal_pkg_grpc_server_proto_init() } -func file_internal_pkg_grpc_server_proto_init() { - if File_internal_pkg_grpc_server_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_internal_pkg_grpc_server_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ClientRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_internal_pkg_grpc_server_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ClientResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_internal_pkg_grpc_server_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Database); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_internal_pkg_grpc_server_proto_rawDesc, - NumEnums: 0, - NumMessages: 3, - NumExtensions: 0, - NumServices: 1, - }, - GoTypes: file_internal_pkg_grpc_server_proto_goTypes, - DependencyIndexes: file_internal_pkg_grpc_server_proto_depIdxs, - MessageInfos: file_internal_pkg_grpc_server_proto_msgTypes, - }.Build() - File_internal_pkg_grpc_server_proto = out.File - file_internal_pkg_grpc_server_proto_rawDesc = nil - file_internal_pkg_grpc_server_proto_goTypes = nil - file_internal_pkg_grpc_server_proto_depIdxs = nil -} diff --git a/internal/pkg/grpc/server.proto b/internal/pkg/grpc/server.proto deleted file mode 100644 index 27f336f..0000000 --- a/internal/pkg/grpc/server.proto +++ /dev/null @@ -1,28 +0,0 @@ -syntax = "proto3"; -package grpc; - -option go_package = "internal/pkg/grpc"; - -service handshake { - rpc ClientDiscoveryHandler(ClientRequest) returns (ClientResponse); -} - -message ClientRequest { - uint32 clientId = 1; - string clientType = 2; - string ip = 3; // client ip - uint32 port = 4; // client port for gRPC server -} - -message ClientResponse { - uint32 clientId = 1; - uint32 serverPort = 2; - Database database = 3; -} - -message Database { - string URL = 1; - string ORG = 2; - string token = 3; - string bucket = 4; -} diff --git a/internal/pkg/influxdb/client.go b/internal/pkg/influxdb/client.go deleted file mode 100644 index c0b56a9..0000000 --- a/internal/pkg/influxdb/client.go +++ /dev/null @@ -1,81 +0,0 @@ -package influxdb - -import ( - _ "fmt" - - _ "github.com/influxdata/influxdb-client-go/v2" - "github.com/spf13/viper" -) - -type DBInfo struct { - URL string `mapstructure:"url"` - Org string `mapstructure:"org,omitempty` - Bucket string `mapstructure:"bucket,omitempty"` - Token string `mapstructure:"token,omitempty"` - // Client *influxdb2.Client -} - -type DBAdmin struct { - // struct for admin methods - *DBInfo - Config *viper.Viper -} - -type DBClient struct { - // struct for client methods - *DBInfo - Config *viper.Viper -} - -func NewDBInfo(config *viper.Viper) (*DBInfo, error) { - db := &DBInfo{} - // grabbing config vals - err := config.UnmarshalKey("db", db) - return db, err -} - -func NewDBClient(config *viper.Viper) (*DBClient, error) { - - client := &DBClient{Config: config} - // grabbing config vals - var err error - client.DBInfo, err = NewDBInfo(config) - return client, err -} - -func NewDBAdmin(config *viper.Viper) (*DBAdmin, error) { - admin := &DBAdmin{Config: config} - var err error - // creating client - admin.DBInfo, err = NewDBInfo(config) - return admin, err -} - -// base level funcs -func (d *DBInfo) Start() error { - // connect to DB based w/ info - return nil -} - -func (d *DBAdmin) GetReactorClient(id int) (url, bucket, org, token string, err error) { - // given an id returns - // (url, org, bucket, token, error) for said id - /* - client := influxdb2.NewClient(d.URL, d.Token) - defer client.Close() - bucket, err := client.BucketsAPI().FindBucketByName(context.Background(), id) - if err != nil { - return "", "", err - } - if d.ReactorExists(id) { - // get corresponding reactor token and bucket - } - */ - url = d.URL - org = d.Org - token = "" - bucket = "" - //err = errors.New("Unimpl") - err = nil - return -} diff --git a/internal/pkg/server/coordinator.go b/internal/pkg/server/coordinator.go index 1df9b7e..5b89e8a 100644 --- a/internal/pkg/server/coordinator.go +++ b/internal/pkg/server/coordinator.go @@ -4,9 +4,7 @@ package server import ( pb "FRMS/internal/pkg/grpc" - "FRMS/internal/pkg/influxdb" - "FRMS/internal/pkg/logging" - "context" + "errors" "fmt" "net" "sync" @@ -15,264 +13,68 @@ import ( "google.golang.org/grpc" ) -// Database is an interface to interact with the server database. -// Used mainly to find existing credentials for -// incoming reactor client connections. -type Database interface { - GetReactorClient(int) (string, string, string, string, error) // returns (url, org, bucket, token, err) -} - -// NewDatabaseAdmin creates a new database admin that implements the -// Database interface. -// Allows access to the database to find/create reactor credentials. -// Implemented via the influxdb package. -func NewDatabaseAdmin(config *viper.Viper) (Database, error) { - return influxdb.NewDBAdmin(config) -} - -// CentralCoordinator is the main coordinator struct that runs on the server. -// Used to oversee the reactor managers as well as process incoming -// client connections. -// Also interacts with the database and global config. -type CentralCoordinator struct { - ClientConnections *ClientPacket - *ReactorCoordinator - Database - Config *viper.Viper - // from config - Ports map[string]int `mapstructure:"ports"` - Err chan error -} - -// NewCentralCoordinator creates a central coordinator with the given global -// config and error channel. -// It will create a new reactor coordinator and database admin. -// It will also try to load the existing configuration information. -func NewCentralCoordinator(config *viper.Viper, ch chan error) *CentralCoordinator { - // create a central coordinator to manage requests - db, err := NewDatabaseAdmin(config) - if err != nil { - ch <- err - } - - rc, err := NewReactorCoordinator(config, ch) - if err != nil { - ch <- err - } - - config.UnmarshalKey("server.ports", rc) - - c := &CentralCoordinator{ - Err: ch, - Config: config, - Database: db, - ReactorCoordinator: rc, - } - - // grab config settings - if err = config.UnmarshalKey("server", c); err != nil { - ch <- err - } - - return c -} - -// Start activates the central coordinator and ensures it is ready for -// new clients. -// Creates a listener and starts both reactor coordinator and listener. -func (c *CentralCoordinator) Start() { - - clientChan := make(chan *ClientPacket) - - l, err := NewListener(clientChan, c.Err) - - if err != nil { - c.Err <- err - } - - c.Config.UnmarshalKey("server.ports", l) - - if err := c.ReactorCoordinator.Start(); err != nil { - c.Err <- err - } - - if err := l.Start(); err != nil { - c.Err <- err - } - - go c.ClientListener(clientChan) -} - -// ClientListener listens on the given channel for clients that are sent -// over from the listener. -// The clients are then passed to the handler before returning the response. -func (c *CentralCoordinator) ClientListener(ch chan *ClientPacket) { - - for client := range ch { - client.Response <- c.ClientHandler(client.Client) // respond with cred - } -} - -// ClientHandler takes in a client and retrieves the associated -// database credentials. -// Currently only handles reactor type clients, can be modified -// to support others. -func (c *CentralCoordinator) ClientHandler(cl *Client) *ClientResponse { - // returns reactor db info - var err error - cr := &ClientResponse{Port: c.Ports[cl.Type]} - - if cl.Type != "reactor" { - c.Err <- fmt.Errorf("client type %s not recognized", cl.Type) - } - - go c.ReactorCoordinator.ClientHandler(cl) +var ( + ErrMissingPort = errors.New("port not set") +) - // db info - cr.URL, cr.Org, cr.Token, cr.Bucket, err = c.Database.GetReactorClient(cl.Id) +// Coordinator is runs on the server and is used to oversee +// the reactor managers as well as process incoming client connections. +type Coordinator struct { + Config *viper.Viper + listener net.Listener + grpcServer *grpc.Server - if err != nil { - c.Err <- err - } + DatabasePort int `mapstructure:"database_port"` + GRPCPort int `mapstructure:"grpc_port"` - return cr -} + directory map[int]*ReactorManager + managerMu sync.RWMutex -// ReactorCoordinator is a strucutre used to store reactor managers for -// clients that have connected at some point. -type ReactorCoordinator struct { - Port int `mapstructure:"reactor"` - *ReactorManagers Err chan error - pb.UnimplementedMonitoringServer -} -// ReactorManagers is a structure that stores a concurrent map of the -// reactor managers as well as the global config. -type ReactorManagers struct { - Config *viper.Viper - Directory map[int]*ReactorManager - sync.RWMutex + // grpc + pb.UnimplementedHandshakeServer + pb.UnimplementedMonitoringServer } -// NewReactorCoordinator takes the global config and error channel and returns -// a pointer to a ReactorCoordinator as well as any errors. -func NewReactorCoordinator(config *viper.Viper, errCh chan error) (*ReactorCoordinator, error) { +// NewCentralCoordinator creates a central coordinator with the given global +// config and error channel. +func NewCentralCoordinator(config *viper.Viper, ch chan error) *Coordinator { rmap := make(map[int]*ReactorManager) - rm := &ReactorManagers{ - Directory: rmap, + return &Coordinator{ + Err: ch, Config: config, - } - - return &ReactorCoordinator{ - Err: errCh, - ReactorManagers: rm, - }, nil -} - -// Start starts the reactor coordinator and kicks off -// registering the gRPC service -func (c *ReactorCoordinator) Start() error { - - logging.Debug(logging.DStart, "RCO 01 Starting!") - - return c.Register() -} - -// ClientHandler takes in a client and finds or creates the correct -// manager for said client. -func (c *ReactorCoordinator) ClientHandler(cl *Client) { - - if err := c.UpdateReactorManager(cl, c.Err); err != nil { - c.Err <- err + directory: rmap, } } -// GetReactorManager attempts to locate a reactor manager for a given id. -// Returns either the associated reactor manager, or an error if -// a manager does not exist for the given id. -func (m *ReactorManagers) GetReactorManager(id int) (*ReactorManager, error) { - m.RLock() - defer m.RUnlock() - - rm, exists := m.Directory[id] +// Start loads config, starts network listener and registers grpc handlers. +// Ready for new clients on return. +func (c *Coordinator) Start() error { - if !exists { - return &ReactorManager{}, fmt.Errorf("no manager for reactor %d", id) + if err := c.Config.Unmarshal(c); err != nil { + return err } - return rm, nil -} - -// UpdateReactorManager takes in a client and error channel and passes the -// client to the associate reactor manager. -// If the client does not have an existing reactor manager, it will create one -// , start it, and add it to the map for future calls. -// The function then calls UpdateClient on the reactor manager and returns -// any errors generated by this function. -func (m *ReactorManagers) UpdateReactorManager(cl *Client, errCh chan error) error { - m.RLock() - defer m.RUnlock() - - var err error - - rm, exists := m.Directory[cl.Id] - if !exists { - // reactor manager does not exist, creating new one - logging.Debug( - logging.DClient, - "RCO 01 creating manager for %v", - cl.Id, - ) + // ensure it shows up as missing + if c.GRPCPort == 0 { + c.Config.Set("grpc_port", 0) + c.Config.WriteConfig() - if rm, err = NewReactorManager(cl, m.Config, errCh); err != nil { - return err - } - - if err = rm.Start(); err != nil { - return err - } - - m.Directory[cl.Id] = rm + return ErrMissingPort } - return rm.UpdateClient(cl) -} - -// Register attaches to the servers port and attempts to bind -// a gRPC server to it. -func (r *ReactorCoordinator) Register() error { - - lis, err := net.Listen("tcp", fmt.Sprintf(":%v", r.Port)) + lis, err := net.Listen("tcp", fmt.Sprintf(":%v", c.GRPCPort)) if err != nil { return err } grpcServer := grpc.NewServer() - pb.RegisterMonitoringServer(grpcServer, r) - go grpcServer.Serve(lis) + c.listener = lis + c.grpcServer = grpcServer - logging.Debug(logging.DClient, "RCO 01 ready") - - return nil -} - -// ReactorStatusHandler is a gRPC handler used to handle incoming -// reactor requests containing information about said reactor. -// It will get the associate reactor manager and pass the -// request device information before returning an acknowledgement. -func (r *ReactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { - - rm, err := r.GetReactorManager(int(req.GetId())) - - if err != nil { - return &pb.ReactorStatusResponse{}, err - } - - go rm.ReactorDeviceHandler(req.GetDevices()) - - return &pb.ReactorStatusResponse{Id: int32(rm.Id)}, nil + return c.Register() } diff --git a/internal/pkg/server/database.go b/internal/pkg/server/database.go new file mode 100644 index 0000000..4195fad --- /dev/null +++ b/internal/pkg/server/database.go @@ -0,0 +1,13 @@ +package server + +type dbinfo struct { + url string + org string + token string + bucket string +} + +func (c *Coordinator) getReactorDatabaseCred(id int) (*dbinfo, error) { + + return &dbinfo{}, nil +} diff --git a/internal/pkg/server/handler.go b/internal/pkg/server/handler.go new file mode 100644 index 0000000..5e855a5 --- /dev/null +++ b/internal/pkg/server/handler.go @@ -0,0 +1,48 @@ +package server + +import ( + pb "FRMS/internal/pkg/grpc" + "FRMS/internal/pkg/logging" + "context" +) + +// ClientDiscoveryHandler implements the grpc method which can be called +// by incoming clients to first make connection to the central +// coordinator and receive database credentials. +func (c *Coordinator) ReactorClientHandler( + ctx context.Context, + req *pb.ReactorClientRequest, +) (*pb.ReactorClientResponse, error) { + + id := int(req.GetId()) + + logging.Debug( + logging.DClient, + "LIS 00 reactor %v has connected\n", + id, + ) + + db, err := c.getReactorDatabaseCred(id) + if err != nil { + return &pb.ReactorClientResponse{}, err + } + + return &pb.ReactorClientResponse{ + Id: id, + Url: db.url, + Org: db.org, + Token: db.token, + Bucket: db.bucket, + }, err +} + +// ReactorStatusHandler is a gRPC handler used to handle incoming +// reactor requests containing information about said reactor. +// It will get the associate reactor manager and pass the +// request device information before returning an acknowledgement. +func (c *Coordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { + + // rm, err := c.LoadReactorManager(int(req.GetId())) + + return &pb.ReactorStatusResponse{}, nil +} diff --git a/internal/pkg/server/listener.go b/internal/pkg/server/listener.go deleted file mode 100644 index c68356c..0000000 --- a/internal/pkg/server/listener.go +++ /dev/null @@ -1,120 +0,0 @@ -package server - -import ( - pb "FRMS/internal/pkg/grpc" - "FRMS/internal/pkg/logging" - "context" - "fmt" - "net" - - "google.golang.org/grpc" -) - -// Listener is a struct that listens for incoming clients on a given port -// and passes them the central coordinator. -// Implements the gRPC handshake server for clients. -type Listener struct { - Port int `mapstructure:"lis"` - ClientConnections chan *ClientPacket - Err chan error - pb.UnimplementedHandshakeServer -} - -// ClientPacket is a uniform type to pass on a channel to the server. -type ClientPacket struct { - *Client - Response chan *ClientResponse -} - -// Client is a struct containing information about the client on -// the incoming connection. -type Client struct { - //Ip string - //Port int - Id int - Model string - Type string -} - -// ClientResponse is the database credentials returned from the central -// coordinator for the given client. -type ClientResponse struct { - Port int - URL string - Org string - Token string - Bucket string -} - -// NewListener createsa new listener with the given client and error channels -func NewListener( - cch chan *ClientPacket, - ech chan error, -) (*Listener, error) { - - return &Listener{ - Err: ech, - ClientConnections: cch, - }, nil -} - -// Start activates the listener and kicks off the gRPC binding process -func (l *Listener) Start() error { - logging.Debug(logging.DStart, "LIS 01 Started client listener") - return l.Register() -} - -// Register creates a net listener on the port and binds a grpc server to it -// before registering a handshake server. -func (l *Listener) Register() error { - - lis, err := net.Listen("tcp", fmt.Sprintf(":%v", l.Port)) - if err != nil { - return err - } - - grpcServer := grpc.NewServer() - pb.RegisterHandshakeServer(grpcServer, l) - - go grpcServer.Serve(lis) - - logging.Debug(logging.DStart, "LIS 01 Registered on port %v", l.Port) - - return nil -} - -// ClientDiscoveryHandler implements the grpc method which can be called -// by incoming clients to first make connection to the central -// coordinator and receive database credentials. -func (l *Listener) ClientDiscoveryHandler(ctx context.Context, ping *pb.ClientRequest) (*pb.ClientResponse, error) { - - c := &Client{ - Id: int(ping.GetClientId()), - Type: ping.GetClientType(), - } - - logging.Debug(logging.DClient, "LIS 01 %v %v has connected\n", c.Type, c.Id) - - ch := make(chan *ClientResponse) - p := &ClientPacket{ - Client: c, - Response: ch, - } - - l.ClientConnections <- p - - resp := <-ch - - db := &pb.Database{ - URL: resp.URL, - ORG: resp.Org, - Token: resp.Token, - Bucket: resp.Bucket, - } - - return &pb.ClientResponse{ - ClientId: uint32(c.Id), - ServerPort: uint32(resp.Port), - Database: db, - }, nil -} diff --git a/internal/pkg/server/listener_test.go b/internal/pkg/server/listener_test.go deleted file mode 100644 index ae5d770..0000000 --- a/internal/pkg/server/listener_test.go +++ /dev/null @@ -1,17 +0,0 @@ -package server - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -// TestNewListener tries to create a new listener -func TestNewListener(t *testing.T) { - assert := assert.New(t) - - cch := make(chan *ClientPacket) - ech := make(chan error) - _, err := NewListener(cch, ech) - assert.Equal(err, nil, "creating listener failed") -} diff --git a/internal/pkg/server/reactor.go b/internal/pkg/server/reactor.go new file mode 100644 index 0000000..12dc3ad --- /dev/null +++ b/internal/pkg/server/reactor.go @@ -0,0 +1,121 @@ +package server + +import ( + "FRMS/internal/pkg/logging" + "FRMS/internal/pkg/manager" + "errors" + "time" +) + +var ( + ErrNoReactorManager = errors.New("no reactor manager found") +) + +// ReactorManager can be started/stopped as clients connect/disconnect. +type ReactorManager struct { + Manager // base manager interface +} + +// Manager is an interface requiring a structure that can be started +// and stopped as well as provide timeouts in milliseconds. +type Manager interface { + Start() error // status checks + Stop() error + Timeout() (time.Duration, error) // TO Generator +} + +// NewManager returns a manager fulfilling the Manager interface as well as +// any potential errors. +func NewManager(max int) (Manager, error) { + return manager.New(max) +} + +// GetReactorManager returns a reactor manager for passed id. +// Throws error if manager not found for id. +func (c *Coordinator) LoadReactorManager(id int) (*ReactorManager, error) { + + c.managerMu.RLock() + defer c.managerMu.RUnlock() + + rm, exists := c.directory[id] + + if !exists { + logging.Debug( + logging.DClient, + "RCO 00 creating manager for %v", + id, + ) + + m, err := NewManager(0) + + rm = &ReactorManager{ + Manager: m, + } + + if err = rm.Start(); err != nil { + return rm, err + } + + c.directory[id] = rm + } + + return rm, nil +} + +// // NewReactorManager takes in a client, config and channel to pass errors on. +// // Returns a new reactor manager as well as any errors that occured during +// // creation. +// // Uses MaxConnectionAttempts which defaults to 10 to prevent +// // unnessecary network load and/or timeout lengths. +// func NewReactorManager( +// ) (*ReactorManager, error) { + +// m, err := NewManager(MaxConnectionAttempts) + +// if err != nil { +// return &ReactorManager{}, err +// } + +// return r, err +// } + +// Start logs the start and calls start on the embedded manager. +func (r *ReactorManager) Start() error { + // logging.Debug(logging.DStart, "RMA starting", r.Id) + return r.Manager.Start() +} + +// Stop logs the stop and calls stop on the embedded manager. +func (r *ReactorManager) Stop() error { + // logging.Debug(logging.DExit, "RMA %v stopping", r.Id) + return r.Manager.Stop() +} + +// UpdateClient is used to change the underlying manager client if there +// changes to its data. +// +// BUG(Keegan): Client is not protected by a lock and may lead to races +// func (r *ReactorManager) UpdateClient(cl *Client) error { +// logging.Debug(logging.DClient, "RMA %v updating client", r.Id) +// r.Client = cl +// return nil +// } + +// // ReactorDeviceHandler processes incoming device information and +// // updates the manager accordingly. +// func (r *ReactorManager) ReactorDeviceHandler(devs []*pb.Device) error { + +// logging.Debug(logging.DClient, "CCO recieved ping from %v", r.Id) + +// for _, dev := range devs { +// logging.Debug( +// logging.DClient, +// "CCO %v device %v is %v", +// r.Id, +// dev.GetAddr(), +// dev.GetStatus().String(), +// ) +// } + +// return nil +// } diff --git a/internal/pkg/server/reactormanager.go b/internal/pkg/server/reactormanager.go deleted file mode 100644 index ed113f7..0000000 --- a/internal/pkg/server/reactormanager.go +++ /dev/null @@ -1,108 +0,0 @@ -package server - -import ( - pb "FRMS/internal/pkg/grpc" - "FRMS/internal/pkg/logging" - "FRMS/internal/pkg/manager" - "time" - - "github.com/spf13/viper" -) - -// MaxConnectionAttempts is the max number of tries to allow -// when connecting to a reactor. -const MaxConnectionAttempts = 10 - -// Manager is an interface requiring a structure that can be started -// and stopped as well as provide timeouts in milliseconds. -type Manager interface { - Start() error // status checks - Stop() error - Timeout() (time.Duration, error) // TO Generator -} - -// NewManager returns a manager fulfilling the Manager interface as well as -// any potential errors. -func NewManager(max int) (Manager, error) { - return manager.New(max) -} - -// ReactorManager contains a base manager, client, global -// config, and error channel. -// The ReactorManager can be started/stopped as clients connect/disconnect. -// Also serves as handler for gRPC requests from reactors. -// Can be extended to write changes to config. -type ReactorManager struct { - Manager // base manager interface - *Client - Config *viper.Viper // global config to maintain - Err chan error -} - -// NewReactorManager takes in a client, config and channel to pass errors on. -// Returns a new reactor manager as well as any errors that occured during -// creation. -// Uses MaxConnectionAttempts which defaults to 10 to prevent -// unnessecary network load and/or timeout lengths. -func NewReactorManager( - cl *Client, - config *viper.Viper, - errCh chan error, -) (*ReactorManager, error) { - - m, err := NewManager(MaxConnectionAttempts) - - if err != nil { - return &ReactorManager{}, err - } - - r := &ReactorManager{ - Manager: m, - Client: cl, - Config: config, - Err: errCh, - } - - return r, err -} - -// Start logs the start and calls start on the embedded manager. -func (r *ReactorManager) Start() error { - logging.Debug(logging.DStart, "RMA %v starting", r.Id) - return r.Manager.Start() -} - -// Stop logs the stop and calls stop on the embedded manager. -func (r *ReactorManager) Stop() error { - logging.Debug(logging.DExit, "RMA %v stopping", r.Id) - return r.Manager.Stop() -} - -// UpdateClient is used to change the underlying manager client if there -// changes to its data. -// -// BUG(Keegan): Client is not protected by a lock and may lead to races -func (r *ReactorManager) UpdateClient(cl *Client) error { - logging.Debug(logging.DClient, "RMA %v updating client", r.Id) - r.Client = cl - return nil -} - -// ReactorDeviceHandler processes incoming device information and -// updates the manager accordingly. -func (r *ReactorManager) ReactorDeviceHandler(devs []*pb.Device) error { - - logging.Debug(logging.DClient, "RMA %v recieved ping", r.Id) - - for _, dev := range devs { - logging.Debug( - logging.DClient, - "RMA %v device %v is %v", - r.Id, - dev.GetAddr(), - dev.GetStatus().String(), - ) - } - - return nil -} diff --git a/internal/pkg/server/reactormanager_test.go b/internal/pkg/server/reactormanager_test.go deleted file mode 100644 index 9f3b321..0000000 --- a/internal/pkg/server/reactormanager_test.go +++ /dev/null @@ -1,85 +0,0 @@ -package server - -import ( - pb "FRMS/internal/pkg/grpc" - "math/rand" - "testing" - - "github.com/spf13/viper" - "github.com/stretchr/testify/assert" -) - -// dummyClient creates a dummy client for testing. -func dummyClient() *Client { - return &Client{ - Id: rand.Int(), - Model: "dummy", - Type: "dummy", - } -} - -func dummyDevices() []*pb.Device { - numDevs := 10 - - devs := make([]*pb.Device, numDevs) - - for i := 0; i < numDevs; i++ { - dev := &pb.Device{ - Addr: int32(rand.Intn(255)), - Status: pb.Status(rand.Intn(2)), - } - devs = append(devs, dev) - } - - return devs -} - -// dummyReactorManager creates a dummy reactor manager for testing. -func dummyReactorManager() (*ReactorManager, error) { - - ch := make(chan error) - cl := dummyClient() - return NewReactorManager(cl, viper.New(), ch) -} - -// TestNewReactorManager tries to create a new reactor manager. -func TestNewReactorManager(t *testing.T) { - assert := assert.New(t) - _, err := dummyReactorManager() - assert.Equal(err, nil, "failed to create reactor manager") -} - -// TestReactorManager tries to start/stop reactor manager -func TestReactorManager(t *testing.T) { - assert := assert.New(t) - rm, err := dummyReactorManager() - assert.Equal(err, nil, "failed to create reactor manager") - - cycles := 10 - for i := 0; i < cycles; i++ { - assert.NoError(rm.Start(), "failed to start") - assert.NoError(rm.Stop(), "failed to start") - } -} - -// TestUpdateClient tries to update a reactor managers embedded client. -func TestUpdateClient(t *testing.T) { - - assert := assert.New(t) - rm, err := dummyReactorManager() - assert.Equal(err, nil, "failed to create reactor manager") - - cl := dummyClient() - - assert.NoError(rm.UpdateClient(cl), "failed to update client") -} - -// TestReactorDeviceHandler ensures that a list of devices can be processed. -func TestReactorDeviceHandler(t *testing.T) { - assert := assert.New(t) - rm, err := dummyReactorManager() - assert.Equal(err, nil, "failed to create reactor manager") - - devs := dummyDevices() - assert.NoError(rm.ReactorDeviceHandler(devs), "failed to handle devices") -} diff --git a/internal/pkg/server/register.go b/internal/pkg/server/register.go new file mode 100644 index 0000000..7be0111 --- /dev/null +++ b/internal/pkg/server/register.go @@ -0,0 +1,19 @@ +package server + +import ( + pb "FRMS/internal/pkg/grpc" + "FRMS/internal/pkg/logging" +) + +func (c *Coordinator) Register() error { + // register services + pb.RegisterHandshakeServer(c.grpcServer, c) + + go c.grpcServer.Serve(c.listener) + // testing + pb.RegisterMonitoringServer(c.grpcServer, c) + + logging.Debug(logging.DStart, "CCO 00 registered grpc") + + return nil +}