From 6acf06f21b3f42f41a23ec3c1dec2f36c33da6eb Mon Sep 17 00:00:00 2001 From: spinach <19keegandeppe@gmail.com> Date: Fri, 23 Jun 2023 15:42:23 -0400 Subject: [PATCH] fixing refs --- server/pkg/config/config.go | 54 +++++ server/pkg/database/connect.go | 64 +++++ server/pkg/grpc/device.proto | 42 ++++ server/pkg/grpc/handshake.pb.go | 260 ++++++++++++++++++++ server/pkg/grpc/handshake.proto | 21 ++ server/pkg/grpc/handshake_grpc.pb.go | 105 ++++++++ server/pkg/grpc/monitoring.pb.go | 211 ++++++++++++++++ server/pkg/grpc/monitoring.proto | 16 ++ server/pkg/grpc/monitoring_grpc.pb.go | 139 +++++++++++ server/pkg/grpc/server.pb.go | 335 ++++++++++++++++++++++++++ server/pkg/grpc/server.proto | 28 +++ server/pkg/grpc/server_grpc.pb.go | 105 ++++++++ server/pkg/logging/dslogs | 112 +++++++++ server/pkg/logging/logging.go | 86 +++++++ server/pkg/system/coordinator.go | 80 ++++++ server/pkg/system/database.go | 13 + server/pkg/system/handler.go | 48 ++++ server/pkg/system/hwinfo.go | 118 +++++++++ server/pkg/system/listener.go | 84 +++++++ server/pkg/system/manager.go | 29 +++ server/pkg/system/manager_test.go | 206 ++++++++++++++++ server/pkg/system/reactor.go | 121 ++++++++++ server/pkg/system/reactormanager.go | 149 ++++++++++++ server/pkg/system/register.go | 19 ++ server/pkg/system/system.go | 309 ++++++++++++++++++++++++ server/pkg/websocket/connect.go | 90 +++++++ 26 files changed, 2844 insertions(+) create mode 100644 server/pkg/config/config.go create mode 100644 server/pkg/database/connect.go create mode 100644 server/pkg/grpc/device.proto create mode 100644 server/pkg/grpc/handshake.pb.go create mode 100644 server/pkg/grpc/handshake.proto create mode 100644 server/pkg/grpc/handshake_grpc.pb.go create mode 100644 server/pkg/grpc/monitoring.pb.go create mode 100644 server/pkg/grpc/monitoring.proto create mode 100644 server/pkg/grpc/monitoring_grpc.pb.go create mode 100644 server/pkg/grpc/server.pb.go create mode 100644 server/pkg/grpc/server.proto create mode 100644 server/pkg/grpc/server_grpc.pb.go create mode 100755 server/pkg/logging/dslogs create mode 100644 server/pkg/logging/logging.go create mode 100644 server/pkg/system/coordinator.go create mode 100644 server/pkg/system/database.go create mode 100644 server/pkg/system/handler.go create mode 100644 server/pkg/system/hwinfo.go create mode 100644 server/pkg/system/listener.go create mode 100644 server/pkg/system/manager.go create mode 100644 server/pkg/system/manager_test.go create mode 100644 server/pkg/system/reactor.go create mode 100644 server/pkg/system/reactormanager.go create mode 100644 server/pkg/system/register.go create mode 100644 server/pkg/system/system.go create mode 100644 server/pkg/websocket/connect.go diff --git a/server/pkg/config/config.go b/server/pkg/config/config.go new file mode 100644 index 0000000..7cc23c7 --- /dev/null +++ b/server/pkg/config/config.go @@ -0,0 +1,54 @@ +// Package config wraps viper to load/store configs +// using the XDG standard ($HOME/.config/FRMS) as the base directory +// +// WARNING: only built for Linux + +import ( + "dmac/pkg/logging" + "FRMS/internal/pkg/logging" + "fmt" + "os" + + "github.com/spf13/viper" +) + +// Load the file at path/file into a viper object. +// Expects config file to be yaml. +func Load(file, path, ext string) (*viper.Viper, error) { + + logging.Debug(logging.DStart, "CON loading %s", file) + + config := viper.New() + + //configFile := fmt.Sprintf("%s/%s.%s", path, file, ext) + + config.SetConfigName(file) + config.AddConfigPath(path) + config.SetConfigType(ext) + + // Sets env vars + config.AutomaticEnv() + + // create config directory if it doesn't exist + if err := os.MkdirAll(path, 0750); err != nil && !os.IsExist(err) { + return config, err + } + + // attempt to create an empty config incase it doesn't exist + // if err := config.SafeWriteConfigAs(configFile); err != nil { + // // if error thrown because file exists, fine to ignore + // if _, ok := err.(viper.ConfigFileAlreadyExistsError); !ok { + // return config, err + // } + // } + + if err := config.ReadInConfig(); err != nil { + fmt.Printf("read error %v\n", config) + return config, err + } + + logging.Debug(logging.DStart, "CON Loaded configs from %#V", config.ConfigFileUsed()) + + // returning config object + return config, nil +} diff --git a/server/pkg/database/connect.go b/server/pkg/database/connect.go new file mode 100644 index 0000000..84df903 --- /dev/null +++ b/server/pkg/database/connect.go @@ -0,0 +1,64 @@ +// package Database wraps some influx db methods to provide functionality. +package database + +import ( + "context" + "errors" + "fmt" + + influx "github.com/influxdata/influxdb-client-go/v2" + "github.com/spf13/viper" +) + +var ( + ErrDBConnection = errors.New("connection to database failed") + ErrNoURLFound = errors.New("database url not found") +) + +var db influx.Client + +// Connect takes in a config and attempts to create a client for influxdb. +// Will automatically write changes back to config for future attempts +func Connect(config *viper.Viper) error { + + url := config.GetString("db.url") + token := config.GetString("db.token") + + if url == "" { + return ErrNoURLFound + } + + db = influx.NewClient(url, token) + + if token == "" { + // try setup + fmt.Printf("attempting to setup database at %v\n", url) + + user := config.GetString("db.username") + password := config.GetString("db.password") + org := config.GetString("db.org") + bucket := config.GetString("db.bucket") + + Setup(user, pass, org, bucket + } + + db = influx.NewClient(url, token) + + return nil +} + +func Setup(user, pass, org, bucket string, ret int) (string, error) { + + resp, err := db.Setup(context.Background(), user, pass, org, bucket, ret) + + return "", nil +} + +func GetBucket(id int) (string, error) { + return "", nil +} + +func GetToken(id int) (string, error) { + // bucket, err := client.BucketsAPI().FindBucketByName(context.Background(), id) + return "", nil +} diff --git a/server/pkg/grpc/device.proto b/server/pkg/grpc/device.proto new file mode 100644 index 0000000..69ec364 --- /dev/null +++ b/server/pkg/grpc/device.proto @@ -0,0 +1,42 @@ +syntax = "proto3"; +package grpc; + +option go_package = "internal/pkg/grpc"; + +service device { + // groups basic device interactions + // get/set name based on request + rpc Name(NameRequest) returns (NameResponse) +} + +message NameRequest { + // empty for future expansion + string Name = 1; +} + +message NameResponse { + string Name = 1; +} + +service sensor { + // sensor specific functions + rpc Reading(ReadingRequest) returns (ReadingResponse) + rpc SampleRate(SampleRateRequest) returns (SampleRateResponse) +} + +message ReadingRequest { + // empty +} + +message ReadingResponse { + string Reading = 1; // formatted reading "9.7 pH" + int64 Timestamp = 2; // when the reading was taken +} + +message SampleRateRequest { + int32 SampleRate = 1; // 0 to return current sample rate, value in seconds +} + +message SampleRateResponse { + int32 SampleRate = 1; // returns the set sample rate +} diff --git a/server/pkg/grpc/handshake.pb.go b/server/pkg/grpc/handshake.pb.go new file mode 100644 index 0000000..3444896 --- /dev/null +++ b/server/pkg/grpc/handshake.pb.go @@ -0,0 +1,260 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.12 +// source: internal/pkg/grpc/handshake.proto + +package grpc + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type ReactorClientRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Port uint32 `protobuf:"varint,2,opt,name=port,proto3" json:"port,omitempty"` // client gRPC port +} + +func (x *ReactorClientRequest) Reset() { + *x = ReactorClientRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReactorClientRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReactorClientRequest) ProtoMessage() {} + +func (x *ReactorClientRequest) ProtoReflect() protoreflect.Message { + mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReactorClientRequest.ProtoReflect.Descriptor instead. +func (*ReactorClientRequest) Descriptor() ([]byte, []int) { + return file_internal_pkg_grpc_handshake_proto_rawDescGZIP(), []int{0} +} + +func (x *ReactorClientRequest) GetId() uint32 { + if x != nil { + return x.Id + } + return 0 +} + +func (x *ReactorClientRequest) GetPort() uint32 { + if x != nil { + return x.Port + } + return 0 +} + +type ReactorClientResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Url string `protobuf:"bytes,2,opt,name=url,proto3" json:"url,omitempty"` + Org string `protobuf:"bytes,3,opt,name=org,proto3" json:"org,omitempty"` + Token string `protobuf:"bytes,4,opt,name=token,proto3" json:"token,omitempty"` + Bucket string `protobuf:"bytes,5,opt,name=bucket,proto3" json:"bucket,omitempty"` +} + +func (x *ReactorClientResponse) Reset() { + *x = ReactorClientResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReactorClientResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReactorClientResponse) ProtoMessage() {} + +func (x *ReactorClientResponse) ProtoReflect() protoreflect.Message { + mi := &file_internal_pkg_grpc_handshake_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReactorClientResponse.ProtoReflect.Descriptor instead. +func (*ReactorClientResponse) Descriptor() ([]byte, []int) { + return file_internal_pkg_grpc_handshake_proto_rawDescGZIP(), []int{1} +} + +func (x *ReactorClientResponse) GetId() uint32 { + if x != nil { + return x.Id + } + return 0 +} + +func (x *ReactorClientResponse) GetUrl() string { + if x != nil { + return x.Url + } + return "" +} + +func (x *ReactorClientResponse) GetOrg() string { + if x != nil { + return x.Org + } + return "" +} + +func (x *ReactorClientResponse) GetToken() string { + if x != nil { + return x.Token + } + return "" +} + +func (x *ReactorClientResponse) GetBucket() string { + if x != nil { + return x.Bucket + } + return "" +} + +var File_internal_pkg_grpc_handshake_proto protoreflect.FileDescriptor + +var file_internal_pkg_grpc_handshake_proto_rawDesc = []byte{ + 0x0a, 0x21, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, + 0x72, 0x70, 0x63, 0x2f, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0x3a, 0x0a, 0x14, 0x52, 0x65, 0x61, + 0x63, 0x74, 0x6f, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, + 0x64, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, + 0x04, 0x70, 0x6f, 0x72, 0x74, 0x22, 0x79, 0x0a, 0x15, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, + 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, + 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x10, + 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, + 0x12, 0x10, 0x0a, 0x03, 0x6f, 0x72, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6f, + 0x72, 0x67, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x75, 0x63, 0x6b, + 0x65, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, + 0x32, 0x5c, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x4f, 0x0a, + 0x14, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x48, 0x61, + 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x1a, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, + 0x63, 0x74, 0x6f, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x1b, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, + 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x13, + 0x5a, 0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, + 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_internal_pkg_grpc_handshake_proto_rawDescOnce sync.Once + file_internal_pkg_grpc_handshake_proto_rawDescData = file_internal_pkg_grpc_handshake_proto_rawDesc +) + +func file_internal_pkg_grpc_handshake_proto_rawDescGZIP() []byte { + file_internal_pkg_grpc_handshake_proto_rawDescOnce.Do(func() { + file_internal_pkg_grpc_handshake_proto_rawDescData = protoimpl.X.CompressGZIP(file_internal_pkg_grpc_handshake_proto_rawDescData) + }) + return file_internal_pkg_grpc_handshake_proto_rawDescData +} + +var file_internal_pkg_grpc_handshake_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_internal_pkg_grpc_handshake_proto_goTypes = []interface{}{ + (*ReactorClientRequest)(nil), // 0: grpc.ReactorClientRequest + (*ReactorClientResponse)(nil), // 1: grpc.ReactorClientResponse +} +var file_internal_pkg_grpc_handshake_proto_depIdxs = []int32{ + 0, // 0: grpc.handshake.ReactorClientHandler:input_type -> grpc.ReactorClientRequest + 1, // 1: grpc.handshake.ReactorClientHandler:output_type -> grpc.ReactorClientResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_internal_pkg_grpc_handshake_proto_init() } +func file_internal_pkg_grpc_handshake_proto_init() { + if File_internal_pkg_grpc_handshake_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_internal_pkg_grpc_handshake_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReactorClientRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_pkg_grpc_handshake_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReactorClientResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_internal_pkg_grpc_handshake_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_internal_pkg_grpc_handshake_proto_goTypes, + DependencyIndexes: file_internal_pkg_grpc_handshake_proto_depIdxs, + MessageInfos: file_internal_pkg_grpc_handshake_proto_msgTypes, + }.Build() + File_internal_pkg_grpc_handshake_proto = out.File + file_internal_pkg_grpc_handshake_proto_rawDesc = nil + file_internal_pkg_grpc_handshake_proto_goTypes = nil + file_internal_pkg_grpc_handshake_proto_depIdxs = nil +} diff --git a/server/pkg/grpc/handshake.proto b/server/pkg/grpc/handshake.proto new file mode 100644 index 0000000..716477e --- /dev/null +++ b/server/pkg/grpc/handshake.proto @@ -0,0 +1,21 @@ +syntax = "proto3"; +package grpc; + +option go_package = "internal/pkg/grpc"; + +service handshake { + rpc ReactorClientHandler(ReactorClientRequest) returns (ReactorClientResponse); +} + +message ReactorClientRequest { + uint32 id = 1; + uint32 port = 2; // client gRPC port +} + +message ReactorClientResponse { + uint32 id = 1; + string url = 2; + string org = 3; + string token = 4; + string bucket = 5; +} diff --git a/server/pkg/grpc/handshake_grpc.pb.go b/server/pkg/grpc/handshake_grpc.pb.go new file mode 100644 index 0000000..a78492f --- /dev/null +++ b/server/pkg/grpc/handshake_grpc.pb.go @@ -0,0 +1,105 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.21.12 +// source: internal/pkg/grpc/handshake.proto + +package grpc + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// HandshakeClient is the client API for Handshake service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type HandshakeClient interface { + ReactorClientHandler(ctx context.Context, in *ReactorClientRequest, opts ...grpc.CallOption) (*ReactorClientResponse, error) +} + +type handshakeClient struct { + cc grpc.ClientConnInterface +} + +func NewHandshakeClient(cc grpc.ClientConnInterface) HandshakeClient { + return &handshakeClient{cc} +} + +func (c *handshakeClient) ReactorClientHandler(ctx context.Context, in *ReactorClientRequest, opts ...grpc.CallOption) (*ReactorClientResponse, error) { + out := new(ReactorClientResponse) + err := c.cc.Invoke(ctx, "/grpc.handshake/ReactorClientHandler", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// HandshakeServer is the server API for Handshake service. +// All implementations must embed UnimplementedHandshakeServer +// for forward compatibility +type HandshakeServer interface { + ReactorClientHandler(context.Context, *ReactorClientRequest) (*ReactorClientResponse, error) + mustEmbedUnimplementedHandshakeServer() +} + +// UnimplementedHandshakeServer must be embedded to have forward compatible implementations. +type UnimplementedHandshakeServer struct { +} + +func (UnimplementedHandshakeServer) ReactorClientHandler(context.Context, *ReactorClientRequest) (*ReactorClientResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ReactorClientHandler not implemented") +} +func (UnimplementedHandshakeServer) mustEmbedUnimplementedHandshakeServer() {} + +// UnsafeHandshakeServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to HandshakeServer will +// result in compilation errors. +type UnsafeHandshakeServer interface { + mustEmbedUnimplementedHandshakeServer() +} + +func RegisterHandshakeServer(s grpc.ServiceRegistrar, srv HandshakeServer) { + s.RegisterService(&Handshake_ServiceDesc, srv) +} + +func _Handshake_ReactorClientHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ReactorClientRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HandshakeServer).ReactorClientHandler(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/grpc.handshake/ReactorClientHandler", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HandshakeServer).ReactorClientHandler(ctx, req.(*ReactorClientRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// Handshake_ServiceDesc is the grpc.ServiceDesc for Handshake service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Handshake_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "grpc.handshake", + HandlerType: (*HandshakeServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "ReactorClientHandler", + Handler: _Handshake_ReactorClientHandler_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "internal/pkg/grpc/handshake.proto", +} diff --git a/server/pkg/grpc/monitoring.pb.go b/server/pkg/grpc/monitoring.pb.go new file mode 100644 index 0000000..59cd1d3 --- /dev/null +++ b/server/pkg/grpc/monitoring.pb.go @@ -0,0 +1,211 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.12 +// source: internal/pkg/grpc/monitoring.proto + +package grpc + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type ReactorAck struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *ReactorAck) Reset() { + *x = ReactorAck{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReactorAck) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReactorAck) ProtoMessage() {} + +func (x *ReactorAck) ProtoReflect() protoreflect.Message { + mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReactorAck.ProtoReflect.Descriptor instead. +func (*ReactorAck) Descriptor() ([]byte, []int) { + return file_internal_pkg_grpc_monitoring_proto_rawDescGZIP(), []int{0} +} + +func (x *ReactorAck) GetId() int32 { + if x != nil { + return x.Id + } + return 0 +} + +type ReactorPing struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *ReactorPing) Reset() { + *x = ReactorPing{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReactorPing) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReactorPing) ProtoMessage() {} + +func (x *ReactorPing) ProtoReflect() protoreflect.Message { + mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReactorPing.ProtoReflect.Descriptor instead. +func (*ReactorPing) Descriptor() ([]byte, []int) { + return file_internal_pkg_grpc_monitoring_proto_rawDescGZIP(), []int{1} +} + +func (x *ReactorPing) GetId() int32 { + if x != nil { + return x.Id + } + return 0 +} + +var File_internal_pkg_grpc_monitoring_proto protoreflect.FileDescriptor + +var file_internal_pkg_grpc_monitoring_proto_rawDesc = []byte{ + 0x0a, 0x22, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, + 0x72, 0x70, 0x63, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0x1c, 0x0a, 0x0a, 0x52, 0x65, + 0x61, 0x63, 0x74, 0x6f, 0x72, 0x41, 0x63, 0x6b, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x05, 0x52, 0x02, 0x69, 0x64, 0x22, 0x1d, 0x0a, 0x0b, 0x52, 0x65, 0x61, 0x63, + 0x74, 0x6f, 0x72, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x05, 0x52, 0x02, 0x69, 0x64, 0x32, 0x49, 0x0a, 0x0a, 0x6d, 0x6f, 0x6e, 0x69, 0x74, + 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x3b, 0x0a, 0x12, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, + 0x50, 0x69, 0x6e, 0x67, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x11, 0x2e, 0x67, 0x72, + 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x50, 0x69, 0x6e, 0x67, 0x1a, 0x10, + 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x41, 0x63, 0x6b, + 0x28, 0x01, 0x42, 0x13, 0x5a, 0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, + 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_internal_pkg_grpc_monitoring_proto_rawDescOnce sync.Once + file_internal_pkg_grpc_monitoring_proto_rawDescData = file_internal_pkg_grpc_monitoring_proto_rawDesc +) + +func file_internal_pkg_grpc_monitoring_proto_rawDescGZIP() []byte { + file_internal_pkg_grpc_monitoring_proto_rawDescOnce.Do(func() { + file_internal_pkg_grpc_monitoring_proto_rawDescData = protoimpl.X.CompressGZIP(file_internal_pkg_grpc_monitoring_proto_rawDescData) + }) + return file_internal_pkg_grpc_monitoring_proto_rawDescData +} + +var file_internal_pkg_grpc_monitoring_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_internal_pkg_grpc_monitoring_proto_goTypes = []interface{}{ + (*ReactorAck)(nil), // 0: grpc.ReactorAck + (*ReactorPing)(nil), // 1: grpc.ReactorPing +} +var file_internal_pkg_grpc_monitoring_proto_depIdxs = []int32{ + 1, // 0: grpc.monitoring.ReactorPingHandler:input_type -> grpc.ReactorPing + 0, // 1: grpc.monitoring.ReactorPingHandler:output_type -> grpc.ReactorAck + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_internal_pkg_grpc_monitoring_proto_init() } +func file_internal_pkg_grpc_monitoring_proto_init() { + if File_internal_pkg_grpc_monitoring_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_internal_pkg_grpc_monitoring_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReactorAck); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_pkg_grpc_monitoring_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReactorPing); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_internal_pkg_grpc_monitoring_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_internal_pkg_grpc_monitoring_proto_goTypes, + DependencyIndexes: file_internal_pkg_grpc_monitoring_proto_depIdxs, + MessageInfos: file_internal_pkg_grpc_monitoring_proto_msgTypes, + }.Build() + File_internal_pkg_grpc_monitoring_proto = out.File + file_internal_pkg_grpc_monitoring_proto_rawDesc = nil + file_internal_pkg_grpc_monitoring_proto_goTypes = nil + file_internal_pkg_grpc_monitoring_proto_depIdxs = nil +} diff --git a/server/pkg/grpc/monitoring.proto b/server/pkg/grpc/monitoring.proto new file mode 100644 index 0000000..8f307b2 --- /dev/null +++ b/server/pkg/grpc/monitoring.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; +package grpc; + +option go_package = "internal/pkg/grpc"; + +service monitoring { + rpc ReactorPingHandler(stream ReactorPing) returns (ReactorAck); +} + +message ReactorAck { + int32 id = 1; +} + +message ReactorPing { + int32 id = 1; +} diff --git a/server/pkg/grpc/monitoring_grpc.pb.go b/server/pkg/grpc/monitoring_grpc.pb.go new file mode 100644 index 0000000..75a39d6 --- /dev/null +++ b/server/pkg/grpc/monitoring_grpc.pb.go @@ -0,0 +1,139 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.21.12 +// source: internal/pkg/grpc/monitoring.proto + +package grpc + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// MonitoringClient is the client API for Monitoring service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type MonitoringClient interface { + ReactorPingHandler(ctx context.Context, opts ...grpc.CallOption) (Monitoring_ReactorPingHandlerClient, error) +} + +type monitoringClient struct { + cc grpc.ClientConnInterface +} + +func NewMonitoringClient(cc grpc.ClientConnInterface) MonitoringClient { + return &monitoringClient{cc} +} + +func (c *monitoringClient) ReactorPingHandler(ctx context.Context, opts ...grpc.CallOption) (Monitoring_ReactorPingHandlerClient, error) { + stream, err := c.cc.NewStream(ctx, &Monitoring_ServiceDesc.Streams[0], "/grpc.monitoring/ReactorPingHandler", opts...) + if err != nil { + return nil, err + } + x := &monitoringReactorPingHandlerClient{stream} + return x, nil +} + +type Monitoring_ReactorPingHandlerClient interface { + Send(*ReactorPing) error + CloseAndRecv() (*ReactorAck, error) + grpc.ClientStream +} + +type monitoringReactorPingHandlerClient struct { + grpc.ClientStream +} + +func (x *monitoringReactorPingHandlerClient) Send(m *ReactorPing) error { + return x.ClientStream.SendMsg(m) +} + +func (x *monitoringReactorPingHandlerClient) CloseAndRecv() (*ReactorAck, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(ReactorAck) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// MonitoringServer is the server API for Monitoring service. +// All implementations must embed UnimplementedMonitoringServer +// for forward compatibility +type MonitoringServer interface { + ReactorPingHandler(Monitoring_ReactorPingHandlerServer) error + mustEmbedUnimplementedMonitoringServer() +} + +// UnimplementedMonitoringServer must be embedded to have forward compatible implementations. +type UnimplementedMonitoringServer struct { +} + +func (UnimplementedMonitoringServer) ReactorPingHandler(Monitoring_ReactorPingHandlerServer) error { + return status.Errorf(codes.Unimplemented, "method ReactorPingHandler not implemented") +} +func (UnimplementedMonitoringServer) mustEmbedUnimplementedMonitoringServer() {} + +// UnsafeMonitoringServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to MonitoringServer will +// result in compilation errors. +type UnsafeMonitoringServer interface { + mustEmbedUnimplementedMonitoringServer() +} + +func RegisterMonitoringServer(s grpc.ServiceRegistrar, srv MonitoringServer) { + s.RegisterService(&Monitoring_ServiceDesc, srv) +} + +func _Monitoring_ReactorPingHandler_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(MonitoringServer).ReactorPingHandler(&monitoringReactorPingHandlerServer{stream}) +} + +type Monitoring_ReactorPingHandlerServer interface { + SendAndClose(*ReactorAck) error + Recv() (*ReactorPing, error) + grpc.ServerStream +} + +type monitoringReactorPingHandlerServer struct { + grpc.ServerStream +} + +func (x *monitoringReactorPingHandlerServer) SendAndClose(m *ReactorAck) error { + return x.ServerStream.SendMsg(m) +} + +func (x *monitoringReactorPingHandlerServer) Recv() (*ReactorPing, error) { + m := new(ReactorPing) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// Monitoring_ServiceDesc is the grpc.ServiceDesc for Monitoring service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Monitoring_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "grpc.monitoring", + HandlerType: (*MonitoringServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "ReactorPingHandler", + Handler: _Monitoring_ReactorPingHandler_Handler, + ClientStreams: true, + }, + }, + Metadata: "internal/pkg/grpc/monitoring.proto", +} diff --git a/server/pkg/grpc/server.pb.go b/server/pkg/grpc/server.pb.go new file mode 100644 index 0000000..20b821c --- /dev/null +++ b/server/pkg/grpc/server.pb.go @@ -0,0 +1,335 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.0 +// protoc v3.6.1 +// source: internal/pkg/grpc/server.proto + +package grpc + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type ClientRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ClientId uint32 `protobuf:"varint,1,opt,name=clientId,proto3" json:"clientId,omitempty"` + ClientType string `protobuf:"bytes,2,opt,name=clientType,proto3" json:"clientType,omitempty"` +} + +func (x *ClientRequest) Reset() { + *x = ClientRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_pkg_grpc_server_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ClientRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ClientRequest) ProtoMessage() {} + +func (x *ClientRequest) ProtoReflect() protoreflect.Message { + mi := &file_internal_pkg_grpc_server_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ClientRequest.ProtoReflect.Descriptor instead. +func (*ClientRequest) Descriptor() ([]byte, []int) { + return file_internal_pkg_grpc_server_proto_rawDescGZIP(), []int{0} +} + +func (x *ClientRequest) GetClientId() uint32 { + if x != nil { + return x.ClientId + } + return 0 +} + +func (x *ClientRequest) GetClientType() string { + if x != nil { + return x.ClientType + } + return "" +} + +type ClientResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ClientId uint32 `protobuf:"varint,1,opt,name=clientId,proto3" json:"clientId,omitempty"` + ServerPort uint32 `protobuf:"varint,2,opt,name=serverPort,proto3" json:"serverPort,omitempty"` + Database *Database `protobuf:"bytes,3,opt,name=database,proto3" json:"database,omitempty"` +} + +func (x *ClientResponse) Reset() { + *x = ClientResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_pkg_grpc_server_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ClientResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ClientResponse) ProtoMessage() {} + +func (x *ClientResponse) ProtoReflect() protoreflect.Message { + mi := &file_internal_pkg_grpc_server_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ClientResponse.ProtoReflect.Descriptor instead. +func (*ClientResponse) Descriptor() ([]byte, []int) { + return file_internal_pkg_grpc_server_proto_rawDescGZIP(), []int{1} +} + +func (x *ClientResponse) GetClientId() uint32 { + if x != nil { + return x.ClientId + } + return 0 +} + +func (x *ClientResponse) GetServerPort() uint32 { + if x != nil { + return x.ServerPort + } + return 0 +} + +func (x *ClientResponse) GetDatabase() *Database { + if x != nil { + return x.Database + } + return nil +} + +type Database struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + URL string `protobuf:"bytes,1,opt,name=URL,proto3" json:"URL,omitempty"` + ORG string `protobuf:"bytes,2,opt,name=ORG,proto3" json:"ORG,omitempty"` + Token string `protobuf:"bytes,3,opt,name=token,proto3" json:"token,omitempty"` + Bucket string `protobuf:"bytes,4,opt,name=bucket,proto3" json:"bucket,omitempty"` +} + +func (x *Database) Reset() { + *x = Database{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_pkg_grpc_server_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Database) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Database) ProtoMessage() {} + +func (x *Database) ProtoReflect() protoreflect.Message { + mi := &file_internal_pkg_grpc_server_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Database.ProtoReflect.Descriptor instead. +func (*Database) Descriptor() ([]byte, []int) { + return file_internal_pkg_grpc_server_proto_rawDescGZIP(), []int{2} +} + +func (x *Database) GetURL() string { + if x != nil { + return x.URL + } + return "" +} + +func (x *Database) GetORG() string { + if x != nil { + return x.ORG + } + return "" +} + +func (x *Database) GetToken() string { + if x != nil { + return x.Token + } + return "" +} + +func (x *Database) GetBucket() string { + if x != nil { + return x.Bucket + } + return "" +} + +var File_internal_pkg_grpc_server_proto protoreflect.FileDescriptor + +var file_internal_pkg_grpc_server_proto_rawDesc = []byte{ + 0x0a, 0x1e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, + 0x72, 0x70, 0x63, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0x4b, 0x0a, 0x0d, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, + 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, + 0x74, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x54, + 0x79, 0x70, 0x65, 0x22, 0x78, 0x0a, 0x0e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, + 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x50, 0x6f, 0x72, 0x74, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x50, 0x6f, 0x72, + 0x74, 0x12, 0x2a, 0x0a, 0x08, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x62, + 0x61, 0x73, 0x65, 0x52, 0x08, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x22, 0x5c, 0x0a, + 0x08, 0x44, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x55, 0x52, 0x4c, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x55, 0x52, 0x4c, 0x12, 0x10, 0x0a, 0x03, 0x4f, + 0x52, 0x47, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x4f, 0x52, 0x47, 0x12, 0x14, 0x0a, + 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, + 0x6b, 0x65, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x18, 0x04, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x32, 0x50, 0x0a, 0x09, 0x68, + 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x43, 0x0a, 0x16, 0x43, 0x6c, 0x69, 0x65, + 0x6e, 0x74, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x48, 0x61, 0x6e, 0x64, 0x6c, + 0x65, 0x72, 0x12, 0x13, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x43, + 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x13, 0x5a, + 0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, + 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_internal_pkg_grpc_server_proto_rawDescOnce sync.Once + file_internal_pkg_grpc_server_proto_rawDescData = file_internal_pkg_grpc_server_proto_rawDesc +) + +func file_internal_pkg_grpc_server_proto_rawDescGZIP() []byte { + file_internal_pkg_grpc_server_proto_rawDescOnce.Do(func() { + file_internal_pkg_grpc_server_proto_rawDescData = protoimpl.X.CompressGZIP(file_internal_pkg_grpc_server_proto_rawDescData) + }) + return file_internal_pkg_grpc_server_proto_rawDescData +} + +var file_internal_pkg_grpc_server_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_internal_pkg_grpc_server_proto_goTypes = []interface{}{ + (*ClientRequest)(nil), // 0: grpc.ClientRequest + (*ClientResponse)(nil), // 1: grpc.ClientResponse + (*Database)(nil), // 2: grpc.Database +} +var file_internal_pkg_grpc_server_proto_depIdxs = []int32{ + 2, // 0: grpc.ClientResponse.database:type_name -> grpc.Database + 0, // 1: grpc.handshake.ClientDiscoveryHandler:input_type -> grpc.ClientRequest + 1, // 2: grpc.handshake.ClientDiscoveryHandler:output_type -> grpc.ClientResponse + 2, // [2:3] is the sub-list for method output_type + 1, // [1:2] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_internal_pkg_grpc_server_proto_init() } +func file_internal_pkg_grpc_server_proto_init() { + if File_internal_pkg_grpc_server_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_internal_pkg_grpc_server_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ClientRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_pkg_grpc_server_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ClientResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_pkg_grpc_server_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Database); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_internal_pkg_grpc_server_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_internal_pkg_grpc_server_proto_goTypes, + DependencyIndexes: file_internal_pkg_grpc_server_proto_depIdxs, + MessageInfos: file_internal_pkg_grpc_server_proto_msgTypes, + }.Build() + File_internal_pkg_grpc_server_proto = out.File + file_internal_pkg_grpc_server_proto_rawDesc = nil + file_internal_pkg_grpc_server_proto_goTypes = nil + file_internal_pkg_grpc_server_proto_depIdxs = nil +} diff --git a/server/pkg/grpc/server.proto b/server/pkg/grpc/server.proto new file mode 100644 index 0000000..27f336f --- /dev/null +++ b/server/pkg/grpc/server.proto @@ -0,0 +1,28 @@ +syntax = "proto3"; +package grpc; + +option go_package = "internal/pkg/grpc"; + +service handshake { + rpc ClientDiscoveryHandler(ClientRequest) returns (ClientResponse); +} + +message ClientRequest { + uint32 clientId = 1; + string clientType = 2; + string ip = 3; // client ip + uint32 port = 4; // client port for gRPC server +} + +message ClientResponse { + uint32 clientId = 1; + uint32 serverPort = 2; + Database database = 3; +} + +message Database { + string URL = 1; + string ORG = 2; + string token = 3; + string bucket = 4; +} diff --git a/server/pkg/grpc/server_grpc.pb.go b/server/pkg/grpc/server_grpc.pb.go new file mode 100644 index 0000000..a78492f --- /dev/null +++ b/server/pkg/grpc/server_grpc.pb.go @@ -0,0 +1,105 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.21.12 +// source: internal/pkg/grpc/handshake.proto + +package grpc + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// HandshakeClient is the client API for Handshake service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type HandshakeClient interface { + ReactorClientHandler(ctx context.Context, in *ReactorClientRequest, opts ...grpc.CallOption) (*ReactorClientResponse, error) +} + +type handshakeClient struct { + cc grpc.ClientConnInterface +} + +func NewHandshakeClient(cc grpc.ClientConnInterface) HandshakeClient { + return &handshakeClient{cc} +} + +func (c *handshakeClient) ReactorClientHandler(ctx context.Context, in *ReactorClientRequest, opts ...grpc.CallOption) (*ReactorClientResponse, error) { + out := new(ReactorClientResponse) + err := c.cc.Invoke(ctx, "/grpc.handshake/ReactorClientHandler", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// HandshakeServer is the server API for Handshake service. +// All implementations must embed UnimplementedHandshakeServer +// for forward compatibility +type HandshakeServer interface { + ReactorClientHandler(context.Context, *ReactorClientRequest) (*ReactorClientResponse, error) + mustEmbedUnimplementedHandshakeServer() +} + +// UnimplementedHandshakeServer must be embedded to have forward compatible implementations. +type UnimplementedHandshakeServer struct { +} + +func (UnimplementedHandshakeServer) ReactorClientHandler(context.Context, *ReactorClientRequest) (*ReactorClientResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ReactorClientHandler not implemented") +} +func (UnimplementedHandshakeServer) mustEmbedUnimplementedHandshakeServer() {} + +// UnsafeHandshakeServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to HandshakeServer will +// result in compilation errors. +type UnsafeHandshakeServer interface { + mustEmbedUnimplementedHandshakeServer() +} + +func RegisterHandshakeServer(s grpc.ServiceRegistrar, srv HandshakeServer) { + s.RegisterService(&Handshake_ServiceDesc, srv) +} + +func _Handshake_ReactorClientHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ReactorClientRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HandshakeServer).ReactorClientHandler(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/grpc.handshake/ReactorClientHandler", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HandshakeServer).ReactorClientHandler(ctx, req.(*ReactorClientRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// Handshake_ServiceDesc is the grpc.ServiceDesc for Handshake service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Handshake_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "grpc.handshake", + HandlerType: (*HandshakeServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "ReactorClientHandler", + Handler: _Handshake_ReactorClientHandler_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "internal/pkg/grpc/handshake.proto", +} diff --git a/server/pkg/logging/dslogs b/server/pkg/logging/dslogs new file mode 100755 index 0000000..98a98f4 --- /dev/null +++ b/server/pkg/logging/dslogs @@ -0,0 +1,112 @@ +#!/usr/bin/env python3 +import sys +import shutil +from typing import Optional, List, Tuple, Dict + +import typer +from rich import print +from rich.columns import Columns +from rich.console import Console +from rich.traceback import install + +# fmt: off +# Mapping from topics to colors +TOPICS = { + "EXIT": "#9a9a99", + "STRT": "#67a0b2", + "PING": "#d0b343", + "SCAN": "#70c43f", + "SPWN": "#4878bc", + "STOP": "#ffff00", + #"LOG2": "#398280", + #"CMIT": "#98719f", + #"PERS": "#d08341", + #"SNAP": "#FD971F", + #"DROP": "#ff615c", + "CLNT": "#00813c", + #"TEST": "#fe2c79", + #"INFO": "#ffffff", + #"WARN": "#d08341", + "ERRO": "#fe2626", +} +# fmt: on + + +def list_topics(value: Optional[str]): + if value is None: + return value + topics = value.split(",") + for topic in topics: + if topic not in TOPICS: + raise typer.BadParameter(f"topic {topic} not recognized") + return topics + + +def main( + file: typer.FileText = typer.Argument(None, help="File to read, stdin otherwise"), + colorize: bool = typer.Option(True, "--no-color"), + n_columns: Optional[int] = typer.Option(None, "--columns", "-c"), + ignore: Optional[str] = typer.Option(None, "--ignore", "-i", callback=list_topics), + just: Optional[str] = typer.Option(None, "--just", "-j", callback=list_topics), +): + topics = list(TOPICS) + + # We can take input from a stdin (pipes) or from a file + input_ = file if file else sys.stdin + # Print just some topics or exclude some topics (good for avoiding verbose ones) + if just: + topics = just + if ignore: + topics = [lvl for lvl in topics if lvl not in set(ignore)] + + topics = set(topics) + console = Console() + width = console.size.width + + panic = False + for line in input_: + try: + time, topic, *msg = line.strip().split(" ") + # To ignore some topics + if topic not in topics: + continue + + msg = " ".join(msg) + + # Debug calls from the test suite aren't associated with + # any particular peer. Otherwise we can treat second column + # as peer id + if topic != "TEST": + i = int(msg[1]) + + # Colorize output by using rich syntax when needed + if colorize and topic in TOPICS: + color = TOPICS[topic] + msg = f"[{color}]{msg}[/{color}]" + + # Single column printing. Always the case for debug stmts in tests + if n_columns is None or topic == "TEST": + print(time, msg) + # Multi column printing, timing is dropped to maximize horizontal + # space. Heavylifting is done through rich.column.Columns object + else: + cols = ["" for _ in range(n_columns)] + msg = "" + msg + cols[i] = msg + col_width = int(width / n_columns) + cols = Columns(cols, width=col_width - 1, equal=True, expand=True) + print(cols) + except: + # Code from tests or panics does not follow format + # so we print it as is + if line.startswith("panic"): + panic = True + # Output from tests is usually important so add a + # horizontal line with hashes to make it more obvious + if not panic: + print("#" * console.width) + print(line, end="") + + +if __name__ == "__main__": + typer.run(main) diff --git a/server/pkg/logging/logging.go b/server/pkg/logging/logging.go new file mode 100644 index 0000000..aa28511 --- /dev/null +++ b/server/pkg/logging/logging.go @@ -0,0 +1,86 @@ +package logging + +import ( + "errors" + "fmt" + "log" + "os" + "strconv" + "time" +) + +func getLogType() string { + if t, ok := os.LookupEnv("LOGTYPE"); ok { + return t + } + return "DEFAULT" +} + +func getVerbosity() int { + v := os.Getenv("VERBOSE") + level := 0 + if v != "" { + var err error + level, err = strconv.Atoi(v) + if err != nil { + log.Fatalf("Invalid Verbosity %v", v) + } + } + return level +} + +type logTopic string + +const ( + // define 4 character topic abbreviations for coloring + DError logTopic = "ERRO" + DClient logTopic = "CLNT" + DStart logTopic = "STRT" + DExit logTopic = "EXIT" + DPing logTopic = "PING" + DScan logTopic = "SCAN" + DSpawn logTopic = "SPWN" + DStop logTopic = "STOP" +) + +// the list can grow + +var debugStart time.Time +var debugVerbosity int + +func init() { + + debugVerbosity = getVerbosity() + debugStart = time.Now() + if debugVerbosity > 0 { + path := "log/" + if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) { + err := os.Mkdir(path, os.ModePerm) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + } + logtype := getLogType() // start with "REACTOR" etc + timestamp := time.Now().Format("Mon-15:04:05") + filename := fmt.Sprintf("%s-%s.log", logtype, timestamp) + f, err := os.OpenFile(path+filename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664) + if err != nil { + log.Fatal(err) + } + log.SetOutput(f) + } + + log.SetFlags(log.Flags() &^ (log.Ldate | log.Ltime)) // turns off date and time so we can set manually +} + +// example call Debug(dClient, "R%d connecting to client %d", r.Id, c.Id) +func Debug(topic logTopic, format string, a ...interface{}) { + if debugVerbosity >= 1 { + time := time.Since(debugStart).Microseconds() + time /= 100 + prefix := fmt.Sprintf("%06d %v ", time, string(topic)) + format = prefix + format + log.Printf(format, a...) + } +} diff --git a/server/pkg/system/coordinator.go b/server/pkg/system/coordinator.go new file mode 100644 index 0000000..7980aef --- /dev/null +++ b/server/pkg/system/coordinator.go @@ -0,0 +1,80 @@ +// package System provides a way to listen for incoming connections +// and manage multiple reactor clients. +package system + +import ( + pb "dmac/pkg/grpc" + "errors" + "fmt" + "net" + "sync" + + "github.com/spf13/viper" + "google.golang.org/grpc" +) + +var ( + ErrMissingPort = errors.New("port not set") +) + +// Coordinator is runs on the server and is used to oversee +// the reactor managers as well as process incoming client connections. +type Coordinator struct { + Config *viper.Viper + listener net.Listener + grpcServer *grpc.Server + + DatabasePort int `mapstructure:"database_port"` + GRPCPort int `mapstructure:"grpc_port"` + + directory map[int]*ReactorManager + managerMu sync.RWMutex + + Err chan error + + // grpc + pb.UnimplementedHandshakeServer + pb.UnimplementedMonitoringServer +} + +// NewCentralCoordinator creates a central coordinator with the given global +// config and error channel. +func NewCentralCoordinator(config *viper.Viper, ch chan error) *Coordinator { + + rmap := make(map[int]*ReactorManager) + + return &Coordinator{ + Err: ch, + Config: config, + directory: rmap, + } +} + +// Start loads config, starts network listener and registers grpc handlers. +// Ready for new clients on return. +func (c *Coordinator) Start() error { + + if err := c.Config.Unmarshal(c); err != nil { + return err + } + + // ensure it shows up as missing + if c.GRPCPort == 0 { + c.Config.Set("grpc_port", 0) + c.Config.WriteConfig() + + return ErrMissingPort + } + + lis, err := net.Listen("tcp", fmt.Sprintf(":%v", c.GRPCPort)) + if err != nil { + return err + } + + grpcServer := grpc.NewServer() + + c.listener = lis + c.grpcServer = grpcServer + + return c.Register() +} diff --git a/server/pkg/system/database.go b/server/pkg/system/database.go new file mode 100644 index 0000000..4195fad --- /dev/null +++ b/server/pkg/system/database.go @@ -0,0 +1,13 @@ +package server + +type dbinfo struct { + url string + org string + token string + bucket string +} + +func (c *Coordinator) getReactorDatabaseCred(id int) (*dbinfo, error) { + + return &dbinfo{}, nil +} diff --git a/server/pkg/system/handler.go b/server/pkg/system/handler.go new file mode 100644 index 0000000..1f8d46a --- /dev/null +++ b/server/pkg/system/handler.go @@ -0,0 +1,48 @@ +package system + +import ( + "context" + pb "dmac/pkg/grpc" + "dmac/pkg/logging" +) + +// ClientDiscoveryHandler implements the grpc method which can be called +// by incoming clients to first make connection to the central +// coordinator and receive database credentials. +func (c *Coordinator) ReactorClientHandler( + ctx context.Context, + req *pb.ReactorClientRequest, +) (*pb.ReactorClientResponse, error) { + + id := int(req.GetId()) + + logging.Debug( + logging.DClient, + "LIS 00 reactor %v has connected\n", + id, + ) + + db, err := c.getReactorDatabaseCred(id) + if err != nil { + return &pb.ReactorClientResponse{}, err + } + + return &pb.ReactorClientResponse{ + Id: id, + Url: db.url, + Org: db.org, + Token: db.token, + Bucket: db.bucket, + }, err +} + +// ReactorStatusHandler is a gRPC handler used to handle incoming +// reactor requests containing information about said reactor. +// It will get the associate reactor manager and pass the +// request device information before returning an acknowledgement. +func (c *Coordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { + + // rm, err := c.LoadReactorManager(int(req.GetId())) + + return &pb.ReactorStatusResponse{}, nil +} diff --git a/server/pkg/system/hwinfo.go b/server/pkg/system/hwinfo.go new file mode 100644 index 0000000..08715b7 --- /dev/null +++ b/server/pkg/system/hwinfo.go @@ -0,0 +1,118 @@ +// package system uses linux commands to get hardware info for identifation +package system + +import ( + "bytes" + "errors" + "fmt" + "hash/fnv" + "net" + "os/exec" + "strings" +) + +func GetId() (int, error) { + // gets the mac address and hashes into consistent id + maccmd := fmt.Sprintf("ifconfig %v | awk '/ether / {print $2}'", et) + var stderr bytes.Buffer + var out bytes.Buffer + cmd := exec.Command("bash", "-c", maccmd) + cmd.Stdout = &out + cmd.Stderr = &stderr + if err := cmd.Run(); err != nil { + return 0, err + } + hash := fnv.New32a() + hash.Write(out.Bytes()) + id := hash.Sum32() + return int(id), nil +} + +func GetIp() (string, error) { + ipcmd := "ip route get 1 | sed 's/^.*src \([^ ]*\).*$/\1/;q'" + var stderr bytes.Buffer + var out bytes.Buffer + cmd := exec.Command("bash", "-c", ipcmd) + cmd.Stdout = &out + cmd.Stderr = &stderr + if err := cmd.Run(); err != nil { + return "", err + } + ip := strings.Trim(out.String(), " \n") + return ip, nil + +} + +func GetPort() (int, error) { + // obsolete + if addr, err := net.ResolveTCPAddr("tcp", ":0"); err != nil { + return 0, err + } else if lis, err := net.ListenTCP("tcp", addr); err != nil { + return 0, err + } else { + defer lis.Close() + return lis.Addr().(*net.TCPAddr).Port, nil + } +} + +func GetBus() (int, error) { + // preset busses + busList := map[string]int{"raspberrypi": 1, "beaglebone": 2} + // vars + var bus int + var ok bool + + if name, err =: GetModel(); err != nil { + return bus, err + } else if bus, ok = busList[b]; !ok { + return 0, errors.New(fmt.Sprintf("No bus for dev %s", b)) + } + + // returns correct bus + return bus, nil +} + +func GetModel() (string, error) { + var stderr, out bytes.Buffer + cmd := exec.Command("bash", "-c", "hostname") + cmd.Stdout = &out + cmd.Stderr = &stderr + if err := cmd.Run(); err != nil { + return "", err + } + b := out.String() + b = strings.Trim(b, " \n") + return b, nil +} + +func Get() error { + // responsible for filling out struct + //bus := map[string]int{"raspberrypi":1,"beaglebone":2} // eventually will replace this with a config file + + ipcmd := "ifconfig eth0 | awk '/inet / {print $2}'" + maccmd := "ifconfig eth0 | awk '/ether / {print $2}'" + devcmd := "lshw -C system 2>/dev/null | head -n 1" + + res := [3]bytes.Buffer{} + var stderr bytes.Buffer + cmds := []string{ipcmd, maccmd, devcmd} + for i, c := range cmds { + cmd := exec.Command("bash", "-c", c) + cmd.Stdout = &res[i] + cmd.Stderr = &stderr + err := cmd.Run() + if err != nil { + return err + } + } + // formatting + ip := res[0].String() + ip = strings.Trim(ip, " \n") + + hash := fnv.New32a() + hash.Write(res[1].Bytes()) + + b := res[2].String() + b = strings.Trim(b, " \n") + return nil +} diff --git a/server/pkg/system/listener.go b/server/pkg/system/listener.go new file mode 100644 index 0000000..db4de5c --- /dev/null +++ b/server/pkg/system/listener.go @@ -0,0 +1,84 @@ +package system + +import ( + //"log" + pb "FRMS/internal/pkg/grpc" + "FRMS/internal/pkg/logging" + "context" + "fmt" + "net" + + "google.golang.org/grpc" +) + +/* +Listens on a supplied port and sends incoming clients over a supplied channel +Waits for a response on that channel to send back to the client with DB credentials +*/ + +type Listener struct { // exporting for easy use in the short term + Port int `mapstructure:"lis"` + ClientConnections chan *ClientPacket + Err chan error + pb.UnimplementedHandshakeServer +} + +type ClientPacket struct { + *Client + Response chan *ClientResponse +} + +type Client struct { + //Ip string + //Port int + Id int + Model string + Type string +} + +type ClientResponse struct { + Port int + URL string + Org string + Token string + Bucket string +} + +func NewListener(cch chan *ClientPacket, ech chan error) *Listener { + l := &Listener{Err: ech, ClientConnections: cch} + return l +} + +func (l *Listener) Start() error { + // start grpc server and implement reciever + logging.Debug(logging.DStart, "LIS 01 Started client listener") + return l.Register() +} + +func (l *Listener) Register() error { + // creates a gRPC service and binds it to our handler + lis, err := net.Listen("tcp", fmt.Sprintf(":%v", l.Port)) + if err != nil { + return err + } + grpcServer := grpc.NewServer() + pb.RegisterHandshakeServer(grpcServer, l) + go grpcServer.Serve(lis) + logging.Debug(logging.DStart, "LIS 01 Registered on port %v", l.Port) + return nil +} + +func (l *Listener) ClientDiscoveryHandler(ctx context.Context, ping *pb.ClientRequest) (*pb.ClientResponse, error) { + // incoming client ping, notify coord and wait for DB credentials to respond + c := &Client{Id: int(ping.GetClientId()), Type: ping.GetClientType()} + logging.Debug(logging.DClient, "LIS %v %v has connected\n", c.Type, c.Id) + // prepare packet to send to coordinator + ch := make(chan *ClientResponse) + p := &ClientPacket{Client: c, Response: ch} + // blocking + l.ClientConnections <- p + resp := <-ch + // prepare object to return to client + db := &pb.Database{URL: resp.URL, ORG: resp.Org, Token: resp.Token, Bucket: resp.Bucket} + return &pb.ClientResponse{ClientId: uint32(c.Id), ServerPort: uint32(resp.Port), Database: db}, nil +} diff --git a/server/pkg/system/manager.go b/server/pkg/system/manager.go new file mode 100644 index 0000000..8ba3270 --- /dev/null +++ b/server/pkg/system/manager.go @@ -0,0 +1,29 @@ +package system + +import ( + //"log" + + _ "context" +) + +// will condense into the rm soon enough +// manager connects to client on start and returns the gRPC connection to make gRPC clients + +// type ClientManager struct { +// *Client // gives access to c.Ip c.Id etc +// Hb time.Duration // used for managing hb timer for client +// Sig chan bool +// sync.Mutex +// } + +// func NewClientManager(cl *Client) *ClientManager { +// return &ClientManager{Client: cl} +// } + +// func (m *ClientManager) UpdateClient(cl *Client) error { +// m.Lock() +// defer m.Unlock() +// logging.Debug(logging.DClient, "MAN Updating client %v", cl.Id) +// m.Client = cl +// return nil +// } diff --git a/server/pkg/system/manager_test.go b/server/pkg/system/manager_test.go new file mode 100644 index 0000000..1a3a2d4 --- /dev/null +++ b/server/pkg/system/manager_test.go @@ -0,0 +1,206 @@ +package system + +import ( + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +// creating, starting and stopping tests + +// newManager is a helper for creating new managers for tests +func newManager(conn int, want error, t *testing.T) *Manager { + var manager *Manager + var err error + assert := assert.New(t) + + manager, err = New(conn) + + if err != want { + t.Fatalf( + `New(%d) = %v, %v, %d max connections failed`, + conn, + manager, + err, + conn, + ) + } + + assert.Equal(manager.IsActive(), Inactive, "manager should start inactive") + + return manager +} + +// TestEmptyManager creates a new manager with 0 max connections +func TestEmptyManager(t *testing.T) { + conn := 0 + newManager(conn, nil, t) +} + +// TestPostiveManager creates a new manager with valid max connections +func TestPositiveManager(t *testing.T) { + conn := rand.Intn(MaxConnAttempts) + newManager(conn, nil, t) +} + +// TestNegativeManager creates a new manager with negative max connections +func TestNegativeManager(t *testing.T) { + conn := -1 * rand.Intn(MaxConnAttempts) + newManager(conn, ErrInvalidMaxConn, t) +} + +// TestInvalidManager creates a new manager with large max connections +func TestInvalidManager(t *testing.T) { + conn := MaxConnAttempts + 0xf + newManager(conn, ErrInvalidMaxConn, t) +} + +// TestManagerLifeCycle tests that a manager can start and exit several times +func TestManagerLifeCycle(t *testing.T) { + + var manager *Manager + assert := assert.New(t) + + conn := rand.Intn(MaxConnAttempts) + manager = newManager(conn, nil, t) + + cycles := 10 + + // starting and stopping sequentially + for i := 0; i < cycles; i++ { + + assert.NoError(manager.Start(), "starting manager failed") + assert.Equal(manager.IsActive(), Active, "manager inactive after start") + + assert.NoError(manager.Stop(), "stopping manager failed") + assert.Equal(manager.IsActive(), Inactive, "manager active after stop") + } +} + +// TestManagerStopFail tests that stopping an inactive manager will error +func TestManagerStopFail(t *testing.T) { + + var manager *Manager + assert := assert.New(t) + + conn := rand.Intn(MaxConnAttempts) + manager = newManager(conn, nil, t) + + assert.NoError(manager.Start(), "starting manager failed") + + // stopping sequentially + assert.NoError(manager.Stop(), "stopping manager failed") + assert.Error(manager.Stop(), "stopping inactive manager should fail") +} + +// TestManagerStartFail tests that starting an active manager will error +func TestManagerStartFail(t *testing.T) { + + var manager *Manager + assert := assert.New(t) + + conn := rand.Intn(MaxConnAttempts) + manager = newManager(conn, nil, t) + + // starting sequentially + assert.NoError(manager.Start(), "starting manager failed") + assert.Error(manager.Start(), "starting active manager should fail") +} + +// auxiliary tests + +// TestManagerTimeout checks that timeouts exponentially backoff +func TestManagerTimeout(t *testing.T) { + var manager *Manager + assert := assert.New(t) + + conn := 10 + manager = newManager(conn, nil, t) + + assert.NoError(manager.Start(), "starting manager failed") + assert.Equal(manager.IsActive(), Active, "manager is inactive") + + prevTimeout, err := manager.Timeout() + + for i := 1; i <= conn; i++ { + assert.NoError(err, "generating timeout failed") + assert.True(prevTimeout > 0, "invalid timeout") + + timeout, err := manager.Timeout() + + if i == conn { + assert.Error(err, "allowed exceeding max attempts") + } else { + assert.NoError(err, "generating timeout failed") + assert.True( + timeout == 2*prevTimeout, + "incorrect timeout %d, expected %d", + timeout, 2*prevTimeout, + ) + } + + prevTimeout = timeout + } +} + +// TestManagerHB tests the heartbeat channel opens and closes +func TestManagerHB(t *testing.T) { + + var manager *Manager + assert := assert.New(t) + + conn := rand.Intn(MaxConnAttempts) + manager = newManager(conn, nil, t) + + assert.NoError(manager.Start(), "starting manager failed") + assert.Equal(manager.IsActive(), Active, "manager is inactive") + + ch := make(chan struct{}) + + go manager.HeartBeat(ch, 10, 0, time.Millisecond) + + for range ch { + // close on first ping + assert.NoError(manager.Stop(), "stopping manager failed") + } + + assert.Equal(manager.IsActive(), Inactive, "manager is active") +} + +// TestManagerHBTiming tests the heartbeat channel timing is correct +func TestManagerHBTiming(t *testing.T) { + + var manager *Manager + assert := assert.New(t) + + conn := rand.Intn(MaxConnAttempts) + manager = newManager(conn, nil, t) + + assert.NoError(manager.Start(), "starting manager failed") + assert.Equal(manager.IsActive(), Active, "manager is inactive") + + ch := make(chan struct{}) + hb := 100 + pings := 10 + + // expected time with tolerance for other logic and worst case rand timeout + expected := time.Duration(pings*hb+15) * time.Millisecond + + go manager.HeartBeat(ch, hb, 1, time.Millisecond) + + iter := 0 + start := time.Now() + for range ch { + // close after 10 pings + iter += 1 + if iter >= pings { + assert.NoError(manager.Stop(), "stopping manager failed") + } + } + end := time.Now() + + assert.Equal(manager.IsActive(), Inactive, "manager is active") + assert.WithinDuration(start, end, expected, "inaccurate heartbeat") +} diff --git a/server/pkg/system/reactor.go b/server/pkg/system/reactor.go new file mode 100644 index 0000000..12dc3ad --- /dev/null +++ b/server/pkg/system/reactor.go @@ -0,0 +1,121 @@ +package server + +import ( + "FRMS/internal/pkg/logging" + "FRMS/internal/pkg/manager" + "errors" + "time" +) + +var ( + ErrNoReactorManager = errors.New("no reactor manager found") +) + +// ReactorManager can be started/stopped as clients connect/disconnect. +type ReactorManager struct { + Manager // base manager interface +} + +// Manager is an interface requiring a structure that can be started +// and stopped as well as provide timeouts in milliseconds. +type Manager interface { + Start() error // status checks + Stop() error + Timeout() (time.Duration, error) // TO Generator +} + +// NewManager returns a manager fulfilling the Manager interface as well as +// any potential errors. +func NewManager(max int) (Manager, error) { + return manager.New(max) +} + +// GetReactorManager returns a reactor manager for passed id. +// Throws error if manager not found for id. +func (c *Coordinator) LoadReactorManager(id int) (*ReactorManager, error) { + + c.managerMu.RLock() + defer c.managerMu.RUnlock() + + rm, exists := c.directory[id] + + if !exists { + logging.Debug( + logging.DClient, + "RCO 00 creating manager for %v", + id, + ) + + m, err := NewManager(0) + + rm = &ReactorManager{ + Manager: m, + } + + if err = rm.Start(); err != nil { + return rm, err + } + + c.directory[id] = rm + } + + return rm, nil +} + +// // NewReactorManager takes in a client, config and channel to pass errors on. +// // Returns a new reactor manager as well as any errors that occured during +// // creation. +// // Uses MaxConnectionAttempts which defaults to 10 to prevent +// // unnessecary network load and/or timeout lengths. +// func NewReactorManager( +// ) (*ReactorManager, error) { + +// m, err := NewManager(MaxConnectionAttempts) + +// if err != nil { +// return &ReactorManager{}, err +// } + +// return r, err +// } + +// Start logs the start and calls start on the embedded manager. +func (r *ReactorManager) Start() error { + // logging.Debug(logging.DStart, "RMA starting", r.Id) + return r.Manager.Start() +} + +// Stop logs the stop and calls stop on the embedded manager. +func (r *ReactorManager) Stop() error { + // logging.Debug(logging.DExit, "RMA %v stopping", r.Id) + return r.Manager.Stop() +} + +// UpdateClient is used to change the underlying manager client if there +// changes to its data. +// +// BUG(Keegan): Client is not protected by a lock and may lead to races +// func (r *ReactorManager) UpdateClient(cl *Client) error { +// logging.Debug(logging.DClient, "RMA %v updating client", r.Id) +// r.Client = cl +// return nil +// } + +// // ReactorDeviceHandler processes incoming device information and +// // updates the manager accordingly. +// func (r *ReactorManager) ReactorDeviceHandler(devs []*pb.Device) error { + +// logging.Debug(logging.DClient, "CCO recieved ping from %v", r.Id) + +// for _, dev := range devs { +// logging.Debug( +// logging.DClient, +// "CCO %v device %v is %v", +// r.Id, +// dev.GetAddr(), +// dev.GetStatus().String(), +// ) +// } + +// return nil +// } diff --git a/server/pkg/system/reactormanager.go b/server/pkg/system/reactormanager.go new file mode 100644 index 0000000..d31108f --- /dev/null +++ b/server/pkg/system/reactormanager.go @@ -0,0 +1,149 @@ +package server + +import ( + pb "FRMS/internal/pkg/grpc" + "FRMS/internal/pkg/logging" + "FRMS/internal/pkg/manager" + "time" + + //"FRMS/internal/pkg/device" + "context" + "fmt" + _ "log" + + "github.com/spf13/viper" +) + +// manager stuff + +type Manager interface { + Start() error // status checks + Exit() error + Timeout() (time.Duration, error) // TO Generator +} + +func NewManager(max int) Manager { + // takes a heartbeat and max connection attempts + return manager.New(max) +} + +type ReactorManager struct { + Manager // base manager interface + // *ClientManager // client manager (OUTDATED) + *Client // access to ID etc + // StatusMon *StatusMonitor putting on pause + // *ReactorDevices + Config *viper.Viper // config to update + Err chan error +} + +// type ReactorDevices struct { +// // device struct +// Devices map[int]DeviceManager +// sync.RWMutex +// } + +func NewReactorManager(cl *Client, config *viper.Viper, errCh chan error) *ReactorManager { + // making managers + m := NewManager(6) + r := &ReactorManager{ + Manager: m, + Client: cl, + Config: config, + Err: errCh, + } + return r +} + +func (r *ReactorManager) Start() error { + // allows for extra stuff + logging.Debug(logging.DStart, "RMA %v starting", r.Id) + return r.Manager.Start() + //go r.StatusMon.Send(&DeviceInfo{Id: r.Id, Type: "Reactor", Status: "[green]ONLINE[white]"}, "Reactor") +} + +func (r *ReactorManager) Exit() error { + // allows for extra stuff + logging.Debug(logging.DExit, "RMA %v exiting", r.Id) + return r.Manager.Exit() + //go r.StatusMon.Send(&DeviceInfo{Id: r.Id, Type: "Reactor", Status: "[red]OFFLINE[white]", Data: fmt.Sprintf("Last Seen %v", time.Now().Format("Mon at 03:04:05pm MST"))}, "Reactor") +} + +func (r *ReactorManager) UpdateClient(cl *Client) error { + // this is probably unnessecary + fmt.Printf("Reactor Manager %d updating client!\n", r.Id) + r.Client = cl + return nil +} + +func (r *ReactorManager) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { + // function client will call to update reactor information + //go r.PingReset() + fmt.Printf("Recieved ping from %d!\n", req.GetId()) + // update devices/sensors + for _, dev := range req.GetDevices() { + fmt.Printf("Device %d is %s ", dev.GetAddr(), dev.GetStatus().String()) + } + fmt.Printf("\n") + // go r.UpdateDevices(req.GetDevices()) + + return &pb.ReactorStatusResponse{Id: int32(r.Id)}, nil +} + +// // device stuff + +// type DeviceManager interface { +// LoadConfig() error +// UpdateStatus(string) error +// String() string // printing +// } + +// func NewDeviceManager(addr int, config *viper.Viper, prefix string) (DeviceManager, error) { +// // returns a manager struct +// return device.NewDeviceManager(addr, config, prefix) +// } + +//func (r *ReactorManager) UpdateDevices(devs []*pb.Device) { +// // pass updates to correct manager +// r.ReactorDevices.RLock() // read lock only +// defer r.ReactorDevices.RUnlock() + +// for _, dev := range devs { +// // looping over devs +// if dm, ok := r.ReactorDevices.Devices[int(dev.GetAddr())]; ok { +// // device manager found +// go dm.UpdateStatus(dev.GetStatus().String()) +// //fmt.Println(dm) +// } else { +// // not found +// go r.AddDevice(dev, r.Id, r.Config, r.Err) +// } +// } +//} + +// func (r *ReactorDevices) AddDevice(dev *pb.Device, id int, config *viper.Viper, errCh chan error) { + +// // setting vars +// prefix := fmt.Sprintf("reactors.%d.", id) +// addr := int(dev.GetAddr()) +// var dm DeviceManager +// var err error +// // write locking +// r.Lock() +// defer r.Unlock() + +// if dm, err = NewDeviceManager(addr, config, prefix); err != nil { +// errCh <- err +// } + +// // setting status +// if err = dm.UpdateStatus(dev.GetStatus().String()); err != nil { +// errCh <- err +// } + +// // loading config +// if err = dm.LoadConfig(); err != nil { +// errCh <- err +// } +// r.Devices[int(addr)] = dm +// } diff --git a/server/pkg/system/register.go b/server/pkg/system/register.go new file mode 100644 index 0000000..7be0111 --- /dev/null +++ b/server/pkg/system/register.go @@ -0,0 +1,19 @@ +package server + +import ( + pb "FRMS/internal/pkg/grpc" + "FRMS/internal/pkg/logging" +) + +func (c *Coordinator) Register() error { + // register services + pb.RegisterHandshakeServer(c.grpcServer, c) + + go c.grpcServer.Serve(c.listener) + // testing + pb.RegisterMonitoringServer(c.grpcServer, c) + + logging.Debug(logging.DStart, "CCO 00 registered grpc") + + return nil +} diff --git a/server/pkg/system/system.go b/server/pkg/system/system.go new file mode 100644 index 0000000..11d1c3f --- /dev/null +++ b/server/pkg/system/system.go @@ -0,0 +1,309 @@ +package server + +import ( + _ "fmt" + // sensor components +) + +/* + +type StatusMonitor struct { + // allows for embedding into managers + TransactionId chan uint32 // monotonically increases to track outdated reqs + DevChan chan *DeviceInfo // channel for device status + ReactorChan chan *DeviceInfo // channel for reactor status + *SystemViewer + DevBuf *devbuf + sync.Mutex +} + +type devbuf struct { + ReactorId int // reactor we are looking at, if any + Buf map[string]map[int]*DeviceInfo // convienent way to store/seperate device data + sync.Mutex +} + +func NewBuffer() map[string]map[int]*DeviceInfo { + rbuf := make(map[int]*DeviceInfo) + dbuf := make(map[int]*DeviceInfo) + sbuf := make(map[string]map[int]*DeviceInfo) + sbuf["Reactor"] = rbuf + sbuf["Device"] = dbuf + return sbuf +} + +func NewStatusMonitor(t string, id int, sys *SystemViewer) *StatusMonitor { + tid := make(chan uint32) + sm := &StatusMonitor{TransactionId: tid} + sm.SystemViewer = sys + logging.Debug(logging.DClient, "SYS Creating new status monitor") + if t == "Reactor" { + // reactor status monitor + sm.ReactorChan = sys.AddReactorSender() + sm.DevChan = sys.AddDeviceSender(id) + go sm.GenerateIds() + } else { + // tui status monitor + sbuf := NewBuffer() + //sm.ReactorChan, sbuf["Reactor"] = sys.AddListener(id,0) + sm.DevBuf = &devbuf{Buf: sbuf} // makes it easier to work with + go sm.UpdateListener(id, 0) + } + return sm +} + +func (s *StatusMonitor) GenerateIds() { + var id uint32 + id = 0 + for { + s.TransactionId <- id + id += 1 + } +} + +func (s *StatusMonitor) Send(d *DeviceInfo, dtype string) { + d.TransactionId = <-s.TransactionId + logging.Debug(logging.DClient, "SYS 01 Sending update for: %s (%s)", d.Type, d.Status) + if dtype == "Reactor" { + s.ReactorChan <- d + } else { + s.DevChan <- d + } +} + +func (s *StatusMonitor) GetBuffer() []*DeviceInfo { + // also clears buffer + s.DevBuf.Lock() + defer s.DevBuf.Unlock() + res := []*DeviceInfo{} + if len(s.DevBuf.Buf["Reactor"]) != 0 || len(s.DevBuf.Buf["Device"]) != 0 { + logging.Debug(logging.DClient, "Clearing buff %v", s.DevBuf.Buf) + } + for _, devs := range s.DevBuf.Buf { + for _, dev := range devs { + // loops over reactors then devices + res = append(res, dev) + } + } + s.DevBuf.Buf = NewBuffer() // clearing old buffer + return res +} + +func (s *StatusMonitor) UpdateListener(tid, reactorId uint32) { + s.DevBuf.Lock() + defer s.DevBuf.Unlock() + // clearing proper buffer + if reactorId == 0 { + logging.Debug(logging.DClient, "SYS 01 Adding %v as reactor listener", tid) + s.DevBuf.Buf["Reactor"] = make(map[uint32]*DeviceInfo) + s.ReactorChan, s.DevBuf.Buf["Reactor"] = s.SystemViewer.AddListener(tid, reactorId) + go s.Listen(s.ReactorChan) + } else { + logging.Debug(logging.DClient, "SYS 01 Adding %v as reactor %v listener", tid, reactorId) + s.DevBuf.Buf["Device"] = make(map[uint32]*DeviceInfo) // clearing old devices + if s.DevBuf.ReactorId != reactorId && s.DevBuf.ReactorId != 0 { + go s.SystemViewer.RemoveListener(s.DevBuf.ReactorId, tid) + } + s.DevBuf.ReactorId = reactorId + s.DevChan, s.DevBuf.Buf["Device"] = s.SystemViewer.AddListener(tid, reactorId) + go s.Listen(s.DevChan) + } +} + +func (s *StatusMonitor) UpdateBuffer(d *DeviceInfo, dtype string, ch chan *DeviceInfo) { + s.DevBuf.Lock() + defer s.DevBuf.Unlock() + logging.Debug(logging.DClient, "SYS 01 Dev %v update requested", d) + if dev, exists := s.DevBuf.Buf[dtype][d.Id]; exists { + // already a device in the buffer + if dev.TransactionId > d.TransactionId { + logging.Debug(logging.DClient, "SYS 01 Update Processed. Old: %v, New: %v \n", dev, d) + d = dev // not sure if i can do this lol + } + } + if ch == s.ReactorChan || ch == s.DevChan { + // hacky way to check if the device came from a listener of a current channel + s.DevBuf.Buf[dtype][d.Id] = d + } else { + logging.Debug(logging.DClient, "Dev out of date!") + } +} + +func (s *StatusMonitor) Listen(ch chan *DeviceInfo) { + for dev := range ch { + if dev.Type == "Reactor" { + go s.UpdateBuffer(dev, "Reactor", ch) + } else { + go s.UpdateBuffer(dev, "Device", ch) + } + } +} + +type InfoStream struct { + // base stream for any reactor/device + // NewSender embedds the channel to send updates on + // NewListener will add the statusmon to the list of devs to echo to + Stream chan *DeviceInfo + Layout *syslayout + *listeners +} + +type listeners struct { + sync.RWMutex + Listeners map[uint32]*lischan +} + +type lischan struct { + sync.WaitGroup + StatusChan chan *DeviceInfo +} + +type syslayout struct { + Devs map[uint32]*DeviceInfo + uint32 //index + sync.RWMutex +} + +func NewLisChan(ch chan *DeviceInfo) *lischan { + l := &lischan{StatusChan: ch} + return l +} + +func NewInfoStream() *InfoStream { + dch := make(chan *DeviceInfo) + s := &InfoStream{Stream: dch} + m := make(map[uint32]*DeviceInfo) + s.Layout = &syslayout{Devs: m} + s.listeners = &listeners{Listeners: make(map[uint32]*lischan)} + return s +} + +func (s *InfoStream) Start() { + // consistency + go s.Listen() +} + +// goal is to hook every new manager into the reactor status chan +func (s *InfoStream) AddSender() chan *DeviceInfo { + return s.Stream +} + +func (s *InfoStream) Listen() { + for deviceInfo := range s.Stream { + go s.Update(deviceInfo) + } +} + +func (s *InfoStream) Update(d *DeviceInfo) { + s.Layout.Lock() + defer s.Layout.Unlock() + if dev, ok := s.Layout.Devs[d.Id]; !ok { + d.Index = s.Layout.uint32 + s.Layout.uint32 += 1 + } else { + d.Index = dev.Index + } + s.Layout.Devs[d.Id] = d + go s.Echo(d) +} + +func (l *listeners) Echo(d *DeviceInfo) { + l.RLock() + defer l.RUnlock() + // read only lock + for _, lis := range l.Listeners { + lis.Add(1) + go func(listener *lischan, dev *DeviceInfo) { + defer listener.Done() + listener.StatusChan <- dev + }(lis, d) + } +} + +func (s *InfoStream) AddListener(id int, ch chan *DeviceInfo) map[uint32]*DeviceInfo { + // if i get a memory leak ill eat my shoe + s.listeners.Lock() + defer s.listeners.Unlock() + if _, ok := s.listeners.Listeners[id]; ok { + // exists + go s.RemoveListener(id) + } + s.listeners.Listeners[id] = NewLisChan(ch) + logging.Debug(logging.DClient, "SYS 01 Getting Devices") + //s.Layout.RLock() + //defer s.Layout.RUnlock() + logging.Debug(logging.DClient, "SYS 01 Returning Devices %v", s.Layout.Devs) + return s.Layout.Devs +} + +func (l *listeners) RemoveListener(id int) { + l.Lock() + defer l.Unlock() + if lis, ok := l.Listeners[id]; ok { + delete(l.Listeners, id) + go func(ls *lischan) { + ls.Wait() + close(ls.StatusChan) + }(lis) + } +} + +// status buffer maintaince + +type SystemViewer struct { + // stores system directory and provide methods to be embedded in managers + ReactorStream *InfoStream // can add itself as a listener to provide methods + DeviceStream *ds +} + +type ds struct { + Reactors map[uint32]*InfoStream //map from reactor id to its device info stream + sync.Mutex +} + +func NewSystemViewer() *SystemViewer { + rs := NewInfoStream() + s := &SystemViewer{ReactorStream: rs} + m := make(map[uint32]*InfoStream) + s.DeviceStream = &ds{Reactors: m} + return s +} + +func (s *SystemViewer) Start() { + go s.ReactorStream.Start() +} + +func (s *SystemViewer) AddReactorSender() chan *DeviceInfo { + return s.ReactorStream.AddSender() +} + +func (s *SystemViewer) AddDeviceSender(reactorId uint32) chan *DeviceInfo { + s.DeviceStream.Lock() + defer s.DeviceStream.Unlock() + var ds *InfoStream + var ok bool + if ds, ok = s.DeviceStream.Reactors[reactorId]; !ok { + ds = NewInfoStream() + s.DeviceStream.Reactors[reactorId] = ds + go ds.Start() + } + return ds.AddSender() +} + +func (s *SystemViewer) AddListener(id, rid int) (chan *DeviceInfo, map[uint32]*DeviceInfo) { + // returns a listener for that chan + ch := make(chan *DeviceInfo) + if rid != 0 { + return ch, s.DeviceStream.Reactors[rid].AddListener(id, ch) + } else { + return ch, s.ReactorStream.AddListener(id, ch) + } +} + +func (s *SystemViewer) RemoveListener(rid, tid int) { + // removes chan for specific tid and rid + s.DeviceStream.Lock() + defer s.DeviceStream.Unlock() + go s.DeviceStream.Reactors[rid].RemoveListener(tid) +} +*/ diff --git a/server/pkg/websocket/connect.go b/server/pkg/websocket/connect.go new file mode 100644 index 0000000..70127af --- /dev/null +++ b/server/pkg/websocket/connect.go @@ -0,0 +1,90 @@ +// Package websocket sets up websocket connections with clients and allows live reactor readouts. +package websocket + +// creates websocket server and upgrades incoming connections +import ( + "encoding/json" + "fmt" + "net/http" + + ws "github.com/gorilla/websocket" +) + +type ReactorTest struct { + Id int `json:"id"` + Name string `json:"name"` +} + +type WebSocket struct { + // dummy struct for interface + N string +} + +func New() *WebSocket { + return &WebSocket{} +} + +func (s *WebSocket) Start() { + fmt.Println("Starting ws server!") + setupRoutes() + http.ListenAndServe(":8080", nil) +} + +// default opts allow all origins +var upgrader = ws.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, +} + +// reader +func reader(conn *ws.Conn) { + + for { + // read forever + //messageType, p, err := conn.ReadMessage() + _, p, err := conn.ReadMessage() + + if err != nil { + if ws.IsCloseError(err, ws.CloseNormalClosure, ws.CloseGoingAway) { + // normally closed + return + } + panic(err) + } + fmt.Printf("Msg: %s\n", string(p)) + } +} + +func serverWs(w http.ResponseWriter, r *http.Request) { + fmt.Println(r.Host) + + websocket, err := upgrader.Upgrade(w, r, nil) + + if err != nil { + panic(err) + } + + // try sending reactor + t1 := &ReactorTest{Id: 1111, Name: "test1"} + t2 := &ReactorTest{Id: 1112, Name: "test2"} + t3 := &ReactorTest{Id: 1113, Name: "test3"} + n := []*ReactorTest{t1, t2, t3} + msg, err := json.Marshal(n) + if err != nil { + panic(err) + } + // pass to connection + if err := websocket.WriteMessage(ws.TextMessage, msg); err != nil { + panic(err) + } + + // pass to reader + reader(websocket) +} + +func setupRoutes() { + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "Simple Server") + }) + + http.HandleFunc("/ws", serverWs) +}