diff --git a/:wq b/:wq new file mode 100644 index 0000000..8272d9b --- /dev/null +++ b/:wq @@ -0,0 +1,18 @@ +syntax = "proto3"; +package grpc; + +option go_package = "internal/pkg/grpc"; + +service handshake { + rpc ClientDiscoveryHandler(ClientRequest) returns (ClientResponse); +} + +message ClientRequest { + uint32 clientId = 1; + string clientType = 2; +} + +message ClientResponse { + uint32 clientId = 1; + uint32 serverPort = 2; +} diff --git a/Dockerfile.server b/Dockerfile.server new file mode 100644 index 0000000..abec284 --- /dev/null +++ b/Dockerfile.server @@ -0,0 +1,19 @@ +# syntax=docker/dockerfile:1 +FROM golang:1.18 + +WORKDIR /app + +COPY . . + +RUN go mod download + +RUN go build -o /server ./cmd/server/main.go + +EXPOSE 2022 +EXPOSE 2023 +EXPOSE 2024 + +CMD [ "/server" ] + + + diff --git a/cmd/server/main.go b/cmd/server/main.go index 8c585d7..cecb068 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -1,7 +1,7 @@ package main import ( - "net/http" + _"net/http" _ "net/http/pprof" "flag" "log" @@ -15,37 +15,31 @@ type listener interface { Start() } -func NewListener(s string,ch chan error) (listener, error) { - return server.NewListener(s, ch) +func NewListener(ch chan error) listener { + return server.NewListener(ch) } func main() { // lets get this bread // all we need to do is call the reactor coordinator and thats it - go func() { - fmt.Println(http.ListenAndServe("localhost:6060",nil)) - }() - flag.Usage = func() { - w := flag.CommandLine.Output() - fmt.Fprintf(w,"Usage: %s [eth*,wlan*,etc.]\n",os.Args[0]) - } - flag.Parse() - if flag.NArg() != 1 { - flag.Usage() - fmt.Println("Specify ifconfig interface. See man ifconfig for further information") - os.Exit(1) - } - ifconfig := string(flag.Arg(0)) + // removing os flags in favor of env vars + // go func() { + // fmt.Println(http.ListenAndServe("localhost:6060",nil)) + // }() ch := make(chan error) - l, err := NewListener(ifconfig,ch) - if err != nil { - log.Fatal(err) + // creating listener + l := NewListener() + + if port := os.Getenv("gRPC_PORT"); port == 0 { + l.Port = 2022 // default docker port } + + db := os.Getenv("DATABASE_URL") // database url + go l.Start() logging.Debug(logging.DStart, "CCO 01 Server started") err = <-ch // blocking to wait for any errors and keep alive otherwise if err != nil { - //fmt.Printf("ERROR: %v\n",err) log.Fatal(err) } } diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..25f52cc --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,10 @@ +version: "2.1" +services: + frms: + image: server + ports: + - "2022:2022" + - "2023:2023" + - "2024:2024" + environment: + - DOCKER=true diff --git a/internal/pkg/grpc/monitoring.pb.go b/internal/pkg/grpc/monitoring.pb.go index 8596741..84943fa 100644 --- a/internal/pkg/grpc/monitoring.pb.go +++ b/internal/pkg/grpc/monitoring.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.27.1 +// protoc-gen-go v1.28.0 // protoc v3.6.1 // source: internal/pkg/grpc/monitoring.proto @@ -212,14 +212,14 @@ var file_internal_pkg_grpc_monitoring_proto_rawDesc = []byte{ 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x61, 0x74, - 0x61, 0x32, 0x59, 0x0a, 0x0a, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x12, - 0x4b, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x12, 0x1a, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, - 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x1b, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, - 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x13, 0x5a, 0x11, - 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, - 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x61, 0x32, 0x5d, 0x0a, 0x0a, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x12, + 0x4f, 0x0a, 0x14, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, + 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x1a, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, + 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, + 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x42, 0x13, 0x5a, 0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, + 0x2f, 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -242,8 +242,8 @@ var file_internal_pkg_grpc_monitoring_proto_goTypes = []interface{}{ } var file_internal_pkg_grpc_monitoring_proto_depIdxs = []int32{ 2, // 0: grpc.ReactorStatusResponse.devices:type_name -> grpc.Device - 0, // 1: grpc.monitoring.GetReactorStatus:input_type -> grpc.ReactorStatusRequest - 1, // 2: grpc.monitoring.GetReactorStatus:output_type -> grpc.ReactorStatusResponse + 0, // 1: grpc.monitoring.ReactorStatusHandler:input_type -> grpc.ReactorStatusRequest + 1, // 2: grpc.monitoring.ReactorStatusHandler:output_type -> grpc.ReactorStatusResponse 2, // [2:3] is the sub-list for method output_type 1, // [1:2] is the sub-list for method input_type 1, // [1:1] is the sub-list for extension type_name diff --git a/internal/pkg/grpc/monitoring.proto b/internal/pkg/grpc/monitoring.proto index fef5d26..794da85 100644 --- a/internal/pkg/grpc/monitoring.proto +++ b/internal/pkg/grpc/monitoring.proto @@ -4,7 +4,7 @@ package grpc; option go_package = "internal/pkg/grpc"; service monitoring { - rpc GetReactorStatus(ReactorStatusRequest) returns (ReactorStatusResponse); + rpc ReactorStatusHandler(ReactorStatusRequest) returns (ReactorStatusResponse); } message ReactorStatusRequest { diff --git a/internal/pkg/grpc/monitoring_grpc.pb.go b/internal/pkg/grpc/monitoring_grpc.pb.go index b8becba..ff4293b 100644 --- a/internal/pkg/grpc/monitoring_grpc.pb.go +++ b/internal/pkg/grpc/monitoring_grpc.pb.go @@ -22,7 +22,7 @@ const _ = grpc.SupportPackageIsVersion7 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type MonitoringClient interface { - GetReactorStatus(ctx context.Context, in *ReactorStatusRequest, opts ...grpc.CallOption) (*ReactorStatusResponse, error) + ReactorStatusHandler(ctx context.Context, in *ReactorStatusRequest, opts ...grpc.CallOption) (*ReactorStatusResponse, error) } type monitoringClient struct { @@ -33,9 +33,9 @@ func NewMonitoringClient(cc grpc.ClientConnInterface) MonitoringClient { return &monitoringClient{cc} } -func (c *monitoringClient) GetReactorStatus(ctx context.Context, in *ReactorStatusRequest, opts ...grpc.CallOption) (*ReactorStatusResponse, error) { +func (c *monitoringClient) ReactorStatusHandler(ctx context.Context, in *ReactorStatusRequest, opts ...grpc.CallOption) (*ReactorStatusResponse, error) { out := new(ReactorStatusResponse) - err := c.cc.Invoke(ctx, "/grpc.monitoring/GetReactorStatus", in, out, opts...) + err := c.cc.Invoke(ctx, "/grpc.monitoring/ReactorStatusHandler", in, out, opts...) if err != nil { return nil, err } @@ -46,7 +46,7 @@ func (c *monitoringClient) GetReactorStatus(ctx context.Context, in *ReactorStat // All implementations must embed UnimplementedMonitoringServer // for forward compatibility type MonitoringServer interface { - GetReactorStatus(context.Context, *ReactorStatusRequest) (*ReactorStatusResponse, error) + ReactorStatusHandler(context.Context, *ReactorStatusRequest) (*ReactorStatusResponse, error) mustEmbedUnimplementedMonitoringServer() } @@ -54,8 +54,8 @@ type MonitoringServer interface { type UnimplementedMonitoringServer struct { } -func (UnimplementedMonitoringServer) GetReactorStatus(context.Context, *ReactorStatusRequest) (*ReactorStatusResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetReactorStatus not implemented") +func (UnimplementedMonitoringServer) ReactorStatusHandler(context.Context, *ReactorStatusRequest) (*ReactorStatusResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ReactorStatusHandler not implemented") } func (UnimplementedMonitoringServer) mustEmbedUnimplementedMonitoringServer() {} @@ -70,20 +70,20 @@ func RegisterMonitoringServer(s grpc.ServiceRegistrar, srv MonitoringServer) { s.RegisterService(&Monitoring_ServiceDesc, srv) } -func _Monitoring_GetReactorStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _Monitoring_ReactorStatusHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(ReactorStatusRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(MonitoringServer).GetReactorStatus(ctx, in) + return srv.(MonitoringServer).ReactorStatusHandler(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/grpc.monitoring/GetReactorStatus", + FullMethod: "/grpc.monitoring/ReactorStatusHandler", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(MonitoringServer).GetReactorStatus(ctx, req.(*ReactorStatusRequest)) + return srv.(MonitoringServer).ReactorStatusHandler(ctx, req.(*ReactorStatusRequest)) } return interceptor(ctx, in, info, handler) } @@ -96,8 +96,8 @@ var Monitoring_ServiceDesc = grpc.ServiceDesc{ HandlerType: (*MonitoringServer)(nil), Methods: []grpc.MethodDesc{ { - MethodName: "GetReactorStatus", - Handler: _Monitoring_GetReactorStatus_Handler, + MethodName: "ReactorStatusHandler", + Handler: _Monitoring_ReactorStatusHandler_Handler, }, }, Streams: []grpc.StreamDesc{}, diff --git a/internal/pkg/grpc/server.pb.go b/internal/pkg/grpc/server.pb.go index d75d198..cf2c410 100644 --- a/internal/pkg/grpc/server.pb.go +++ b/internal/pkg/grpc/server.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.27.1 +// protoc-gen-go v1.28.0 // protoc v3.6.1 // source: internal/pkg/grpc/server.proto @@ -20,20 +20,17 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -type ReactorClientRequest struct { +type ClientRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - ClientId uint32 `protobuf:"varint,1,opt,name=clientId,proto3" json:"clientId,omitempty"` - ClientIp string `protobuf:"bytes,2,opt,name=clientIp,proto3" json:"clientIp,omitempty"` - ClientPort int32 `protobuf:"varint,3,opt,name=clientPort,proto3" json:"clientPort,omitempty"` - ClientModel string `protobuf:"bytes,4,opt,name=clientModel,proto3" json:"clientModel,omitempty"` - ClientType string `protobuf:"bytes,5,opt,name=clientType,proto3" json:"clientType,omitempty"` + ClientId uint32 `protobuf:"varint,1,opt,name=clientId,proto3" json:"clientId,omitempty"` + ClientType string `protobuf:"bytes,2,opt,name=clientType,proto3" json:"clientType,omitempty"` } -func (x *ReactorClientRequest) Reset() { - *x = ReactorClientRequest{} +func (x *ClientRequest) Reset() { + *x = ClientRequest{} if protoimpl.UnsafeEnabled { mi := &file_internal_pkg_grpc_server_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -41,13 +38,13 @@ func (x *ReactorClientRequest) Reset() { } } -func (x *ReactorClientRequest) String() string { +func (x *ClientRequest) String() string { return protoimpl.X.MessageStringOf(x) } -func (*ReactorClientRequest) ProtoMessage() {} +func (*ClientRequest) ProtoMessage() {} -func (x *ReactorClientRequest) ProtoReflect() protoreflect.Message { +func (x *ClientRequest) ProtoReflect() protoreflect.Message { mi := &file_internal_pkg_grpc_server_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -59,57 +56,36 @@ func (x *ReactorClientRequest) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use ReactorClientRequest.ProtoReflect.Descriptor instead. -func (*ReactorClientRequest) Descriptor() ([]byte, []int) { +// Deprecated: Use ClientRequest.ProtoReflect.Descriptor instead. +func (*ClientRequest) Descriptor() ([]byte, []int) { return file_internal_pkg_grpc_server_proto_rawDescGZIP(), []int{0} } -func (x *ReactorClientRequest) GetClientId() uint32 { +func (x *ClientRequest) GetClientId() uint32 { if x != nil { return x.ClientId } return 0 } -func (x *ReactorClientRequest) GetClientIp() string { - if x != nil { - return x.ClientIp - } - return "" -} - -func (x *ReactorClientRequest) GetClientPort() int32 { - if x != nil { - return x.ClientPort - } - return 0 -} - -func (x *ReactorClientRequest) GetClientModel() string { - if x != nil { - return x.ClientModel - } - return "" -} - -func (x *ReactorClientRequest) GetClientType() string { +func (x *ClientRequest) GetClientType() string { if x != nil { return x.ClientType } return "" } -type ReactorClientResponse struct { +type ClientResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - ClientId uint32 `protobuf:"varint,1,opt,name=clientId,proto3" json:"clientId,omitempty"` - Success bool `protobuf:"varint,2,opt,name=Success,proto3" json:"Success,omitempty"` + ClientId uint32 `protobuf:"varint,1,opt,name=clientId,proto3" json:"clientId,omitempty"` + ServerPort uint32 `protobuf:"varint,2,opt,name=serverPort,proto3" json:"serverPort,omitempty"` } -func (x *ReactorClientResponse) Reset() { - *x = ReactorClientResponse{} +func (x *ClientResponse) Reset() { + *x = ClientResponse{} if protoimpl.UnsafeEnabled { mi := &file_internal_pkg_grpc_server_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -117,13 +93,13 @@ func (x *ReactorClientResponse) Reset() { } } -func (x *ReactorClientResponse) String() string { +func (x *ClientResponse) String() string { return protoimpl.X.MessageStringOf(x) } -func (*ReactorClientResponse) ProtoMessage() {} +func (*ClientResponse) ProtoMessage() {} -func (x *ReactorClientResponse) ProtoReflect() protoreflect.Message { +func (x *ClientResponse) ProtoReflect() protoreflect.Message { mi := &file_internal_pkg_grpc_server_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -135,129 +111,19 @@ func (x *ReactorClientResponse) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use ReactorClientResponse.ProtoReflect.Descriptor instead. -func (*ReactorClientResponse) Descriptor() ([]byte, []int) { +// Deprecated: Use ClientResponse.ProtoReflect.Descriptor instead. +func (*ClientResponse) Descriptor() ([]byte, []int) { return file_internal_pkg_grpc_server_proto_rawDescGZIP(), []int{1} } -func (x *ReactorClientResponse) GetClientId() uint32 { +func (x *ClientResponse) GetClientId() uint32 { if x != nil { return x.ClientId } return 0 } -func (x *ReactorClientResponse) GetSuccess() bool { - if x != nil { - return x.Success - } - return false -} - -type TUIClientRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - ClientId uint32 `protobuf:"varint,1,opt,name=clientId,proto3" json:"clientId,omitempty"` -} - -func (x *TUIClientRequest) Reset() { - *x = TUIClientRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_internal_pkg_grpc_server_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *TUIClientRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*TUIClientRequest) ProtoMessage() {} - -func (x *TUIClientRequest) ProtoReflect() protoreflect.Message { - mi := &file_internal_pkg_grpc_server_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use TUIClientRequest.ProtoReflect.Descriptor instead. -func (*TUIClientRequest) Descriptor() ([]byte, []int) { - return file_internal_pkg_grpc_server_proto_rawDescGZIP(), []int{2} -} - -func (x *TUIClientRequest) GetClientId() uint32 { - if x != nil { - return x.ClientId - } - return 0 -} - -type TUIClientResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - ClientId uint32 `protobuf:"varint,1,opt,name=clientId,proto3" json:"clientId,omitempty"` - ServerIp string `protobuf:"bytes,2,opt,name=serverIp,proto3" json:"serverIp,omitempty"` - ServerPort int32 `protobuf:"varint,3,opt,name=serverPort,proto3" json:"serverPort,omitempty"` -} - -func (x *TUIClientResponse) Reset() { - *x = TUIClientResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_internal_pkg_grpc_server_proto_msgTypes[3] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *TUIClientResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*TUIClientResponse) ProtoMessage() {} - -func (x *TUIClientResponse) ProtoReflect() protoreflect.Message { - mi := &file_internal_pkg_grpc_server_proto_msgTypes[3] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use TUIClientResponse.ProtoReflect.Descriptor instead. -func (*TUIClientResponse) Descriptor() ([]byte, []int) { - return file_internal_pkg_grpc_server_proto_rawDescGZIP(), []int{3} -} - -func (x *TUIClientResponse) GetClientId() uint32 { - if x != nil { - return x.ClientId - } - return 0 -} - -func (x *TUIClientResponse) GetServerIp() string { - if x != nil { - return x.ServerIp - } - return "" -} - -func (x *TUIClientResponse) GetServerPort() int32 { +func (x *ClientResponse) GetServerPort() uint32 { if x != nil { return x.ServerPort } @@ -269,46 +135,23 @@ var File_internal_pkg_grpc_server_proto protoreflect.FileDescriptor var file_internal_pkg_grpc_server_proto_rawDesc = []byte{ 0x0a, 0x1e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0xb0, 0x01, 0x0a, 0x14, 0x52, 0x65, 0x61, 0x63, 0x74, - 0x6f, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, - 0x1a, 0x0a, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0d, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x63, - 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63, - 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x70, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6c, 0x69, 0x65, 0x6e, - 0x74, 0x50, 0x6f, 0x72, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x63, 0x6c, 0x69, - 0x65, 0x6e, 0x74, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x20, 0x0a, 0x0b, 0x63, 0x6c, 0x69, 0x65, 0x6e, - 0x74, 0x4d, 0x6f, 0x64, 0x65, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x63, 0x6c, - 0x69, 0x65, 0x6e, 0x74, 0x4d, 0x6f, 0x64, 0x65, 0x6c, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6c, 0x69, - 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, - 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x22, 0x4d, 0x0a, 0x15, 0x52, 0x65, 0x61, - 0x63, 0x74, 0x6f, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x18, - 0x0a, 0x07, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, - 0x07, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x22, 0x2e, 0x0a, 0x10, 0x54, 0x55, 0x49, 0x43, - 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, - 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, - 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x22, 0x6b, 0x0a, 0x11, 0x54, 0x55, 0x49, 0x43, - 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, - 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, - 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x72, - 0x76, 0x65, 0x72, 0x49, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, - 0x76, 0x65, 0x72, 0x49, 0x70, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x50, - 0x6f, 0x72, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x73, 0x65, 0x72, 0x76, 0x65, - 0x72, 0x50, 0x6f, 0x72, 0x74, 0x32, 0xb3, 0x01, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, - 0x61, 0x6b, 0x65, 0x12, 0x58, 0x0a, 0x1d, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x43, 0x6c, - 0x69, 0x65, 0x6e, 0x74, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x48, 0x61, 0x6e, - 0x64, 0x6c, 0x65, 0x72, 0x12, 0x1a, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, - 0x74, 0x6f, 0x72, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x1b, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x43, - 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4c, 0x0a, - 0x19, 0x54, 0x55, 0x49, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, - 0x65, 0x72, 0x79, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x16, 0x2e, 0x67, 0x72, 0x70, - 0x63, 0x2e, 0x54, 0x55, 0x49, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x54, 0x55, 0x49, 0x43, 0x6c, 0x69, - 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x13, 0x5a, 0x11, 0x69, - 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0x4b, 0x0a, 0x0d, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, + 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, + 0x74, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x54, + 0x79, 0x70, 0x65, 0x22, 0x4c, 0x0a, 0x0e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, + 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x50, 0x6f, 0x72, 0x74, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x50, 0x6f, 0x72, + 0x74, 0x32, 0x50, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x43, + 0x0a, 0x16, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, + 0x79, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x13, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, + 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, + 0x67, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x42, 0x13, 0x5a, 0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, + 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -323,20 +166,16 @@ func file_internal_pkg_grpc_server_proto_rawDescGZIP() []byte { return file_internal_pkg_grpc_server_proto_rawDescData } -var file_internal_pkg_grpc_server_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_internal_pkg_grpc_server_proto_msgTypes = make([]protoimpl.MessageInfo, 2) var file_internal_pkg_grpc_server_proto_goTypes = []interface{}{ - (*ReactorClientRequest)(nil), // 0: grpc.ReactorClientRequest - (*ReactorClientResponse)(nil), // 1: grpc.ReactorClientResponse - (*TUIClientRequest)(nil), // 2: grpc.TUIClientRequest - (*TUIClientResponse)(nil), // 3: grpc.TUIClientResponse + (*ClientRequest)(nil), // 0: grpc.ClientRequest + (*ClientResponse)(nil), // 1: grpc.ClientResponse } var file_internal_pkg_grpc_server_proto_depIdxs = []int32{ - 0, // 0: grpc.handshake.ReactorClientDiscoveryHandler:input_type -> grpc.ReactorClientRequest - 2, // 1: grpc.handshake.TUIClientDiscoveryHandler:input_type -> grpc.TUIClientRequest - 1, // 2: grpc.handshake.ReactorClientDiscoveryHandler:output_type -> grpc.ReactorClientResponse - 3, // 3: grpc.handshake.TUIClientDiscoveryHandler:output_type -> grpc.TUIClientResponse - 2, // [2:4] is the sub-list for method output_type - 0, // [0:2] is the sub-list for method input_type + 0, // 0: grpc.handshake.ClientDiscoveryHandler:input_type -> grpc.ClientRequest + 1, // 1: grpc.handshake.ClientDiscoveryHandler:output_type -> grpc.ClientResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name @@ -349,7 +188,7 @@ func file_internal_pkg_grpc_server_proto_init() { } if !protoimpl.UnsafeEnabled { file_internal_pkg_grpc_server_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ReactorClientRequest); i { + switch v := v.(*ClientRequest); i { case 0: return &v.state case 1: @@ -361,31 +200,7 @@ func file_internal_pkg_grpc_server_proto_init() { } } file_internal_pkg_grpc_server_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ReactorClientResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_internal_pkg_grpc_server_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*TUIClientRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_internal_pkg_grpc_server_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*TUIClientResponse); i { + switch v := v.(*ClientResponse); i { case 0: return &v.state case 1: @@ -403,7 +218,7 @@ func file_internal_pkg_grpc_server_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_internal_pkg_grpc_server_proto_rawDesc, NumEnums: 0, - NumMessages: 4, + NumMessages: 2, NumExtensions: 0, NumServices: 1, }, diff --git a/internal/pkg/grpc/server.proto b/internal/pkg/grpc/server.proto index 1366128..8272d9b 100644 --- a/internal/pkg/grpc/server.proto +++ b/internal/pkg/grpc/server.proto @@ -4,28 +4,15 @@ package grpc; option go_package = "internal/pkg/grpc"; service handshake { - rpc ReactorClientDiscoveryHandler(ReactorClientRequest) returns (ReactorClientResponse); - rpc TUIClientDiscoveryHandler(TUIClientRequest) returns (TUIClientResponse); + rpc ClientDiscoveryHandler(ClientRequest) returns (ClientResponse); } -message ReactorClientRequest { +message ClientRequest { uint32 clientId = 1; - string clientIp = 2; - int32 clientPort = 3; - string clientModel = 4; + string clientType = 2; } -message ReactorClientResponse { +message ClientResponse { uint32 clientId = 1; - bool Success = 2; -} - -message TUIClientRequest { - uint32 clientId = 1; -} - -message TUIClientResponse { - uint32 clientId = 1; - string serverIp = 2; - int32 serverPort = 3; + uint32 serverPort = 2; } diff --git a/internal/pkg/grpc/server_grpc.pb.go b/internal/pkg/grpc/server_grpc.pb.go index 3f262ec..eaa3c5d 100644 --- a/internal/pkg/grpc/server_grpc.pb.go +++ b/internal/pkg/grpc/server_grpc.pb.go @@ -22,8 +22,7 @@ const _ = grpc.SupportPackageIsVersion7 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type HandshakeClient interface { - ReactorClientDiscoveryHandler(ctx context.Context, in *ReactorClientRequest, opts ...grpc.CallOption) (*ReactorClientResponse, error) - TUIClientDiscoveryHandler(ctx context.Context, in *TUIClientRequest, opts ...grpc.CallOption) (*TUIClientResponse, error) + ClientDiscoveryHandler(ctx context.Context, in *ClientRequest, opts ...grpc.CallOption) (*ClientResponse, error) } type handshakeClient struct { @@ -34,18 +33,9 @@ func NewHandshakeClient(cc grpc.ClientConnInterface) HandshakeClient { return &handshakeClient{cc} } -func (c *handshakeClient) ReactorClientDiscoveryHandler(ctx context.Context, in *ReactorClientRequest, opts ...grpc.CallOption) (*ReactorClientResponse, error) { - out := new(ReactorClientResponse) - err := c.cc.Invoke(ctx, "/grpc.handshake/ReactorClientDiscoveryHandler", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *handshakeClient) TUIClientDiscoveryHandler(ctx context.Context, in *TUIClientRequest, opts ...grpc.CallOption) (*TUIClientResponse, error) { - out := new(TUIClientResponse) - err := c.cc.Invoke(ctx, "/grpc.handshake/TUIClientDiscoveryHandler", in, out, opts...) +func (c *handshakeClient) ClientDiscoveryHandler(ctx context.Context, in *ClientRequest, opts ...grpc.CallOption) (*ClientResponse, error) { + out := new(ClientResponse) + err := c.cc.Invoke(ctx, "/grpc.handshake/ClientDiscoveryHandler", in, out, opts...) if err != nil { return nil, err } @@ -56,8 +46,7 @@ func (c *handshakeClient) TUIClientDiscoveryHandler(ctx context.Context, in *TUI // All implementations must embed UnimplementedHandshakeServer // for forward compatibility type HandshakeServer interface { - ReactorClientDiscoveryHandler(context.Context, *ReactorClientRequest) (*ReactorClientResponse, error) - TUIClientDiscoveryHandler(context.Context, *TUIClientRequest) (*TUIClientResponse, error) + ClientDiscoveryHandler(context.Context, *ClientRequest) (*ClientResponse, error) mustEmbedUnimplementedHandshakeServer() } @@ -65,11 +54,8 @@ type HandshakeServer interface { type UnimplementedHandshakeServer struct { } -func (UnimplementedHandshakeServer) ReactorClientDiscoveryHandler(context.Context, *ReactorClientRequest) (*ReactorClientResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method ReactorClientDiscoveryHandler not implemented") -} -func (UnimplementedHandshakeServer) TUIClientDiscoveryHandler(context.Context, *TUIClientRequest) (*TUIClientResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method TUIClientDiscoveryHandler not implemented") +func (UnimplementedHandshakeServer) ClientDiscoveryHandler(context.Context, *ClientRequest) (*ClientResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ClientDiscoveryHandler not implemented") } func (UnimplementedHandshakeServer) mustEmbedUnimplementedHandshakeServer() {} @@ -84,38 +70,20 @@ func RegisterHandshakeServer(s grpc.ServiceRegistrar, srv HandshakeServer) { s.RegisterService(&Handshake_ServiceDesc, srv) } -func _Handshake_ReactorClientDiscoveryHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ReactorClientRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(HandshakeServer).ReactorClientDiscoveryHandler(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/grpc.handshake/ReactorClientDiscoveryHandler", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(HandshakeServer).ReactorClientDiscoveryHandler(ctx, req.(*ReactorClientRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Handshake_TUIClientDiscoveryHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(TUIClientRequest) +func _Handshake_ClientDiscoveryHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ClientRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(HandshakeServer).TUIClientDiscoveryHandler(ctx, in) + return srv.(HandshakeServer).ClientDiscoveryHandler(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/grpc.handshake/TUIClientDiscoveryHandler", + FullMethod: "/grpc.handshake/ClientDiscoveryHandler", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(HandshakeServer).TUIClientDiscoveryHandler(ctx, req.(*TUIClientRequest)) + return srv.(HandshakeServer).ClientDiscoveryHandler(ctx, req.(*ClientRequest)) } return interceptor(ctx, in, info, handler) } @@ -128,12 +96,8 @@ var Handshake_ServiceDesc = grpc.ServiceDesc{ HandlerType: (*HandshakeServer)(nil), Methods: []grpc.MethodDesc{ { - MethodName: "ReactorClientDiscoveryHandler", - Handler: _Handshake_ReactorClientDiscoveryHandler_Handler, - }, - { - MethodName: "TUIClientDiscoveryHandler", - Handler: _Handshake_TUIClientDiscoveryHandler_Handler, + MethodName: "ClientDiscoveryHandler", + Handler: _Handshake_ClientDiscoveryHandler_Handler, }, }, Streams: []grpc.StreamDesc{}, diff --git a/internal/pkg/reactor/monitoring.go b/internal/pkg/reactor/monitoring.go index d82005f..07cbfd8 100644 --- a/internal/pkg/reactor/monitoring.go +++ b/internal/pkg/reactor/monitoring.go @@ -70,15 +70,3 @@ func (c *Coordinator) Register() { } logging.Debug(logging.DStart, "Listening for pings on %v:%v\n",ip,c.hwinfo.Port) } - -func (c *Coordinator) GetReactorStatus(ctx context.Context, ping *pb.ReactorStatusRequest) (*pb.ReactorStatusResponse, error) { - // status request handler - devs := []*pb.Device{} - resp := &pb.ReactorStatusResponse{Id:c.Id,Devices:devs} - devStatus := c.GetStatus() - for _,v := range devStatus { - d := &pb.Device{Addr:int32(v.Addr),Type:v.Type,Status:v.Status,Data:v.Data} - resp.Devices = append(resp.Devices,d) - } - return resp, nil -} diff --git a/internal/pkg/reactor/rlcoordinator.go b/internal/pkg/reactor/rlcoordinator.go index eb7bf0e..d5ebc74 100644 --- a/internal/pkg/reactor/rlcoordinator.go +++ b/internal/pkg/reactor/rlcoordinator.go @@ -23,14 +23,22 @@ import ( type Coordinator struct { *server - *hwinfo + *hw Devices *DeviceManagers // struct for fine grain locking Err chan error mu sync.Mutex + *Timeout Active active pb.UnimplementedMonitoringServer } +type Timeout struct { + Alert chan bool + LastSeen time.Time + TO time.Duration + sync.Mutex +} + type active struct { bool int @@ -43,7 +51,7 @@ type server struct { Port int } -type hwinfo struct { +type hw struct { // store reactor info Ip string Port int @@ -91,18 +99,10 @@ func NewCoordinator(ip string,port int,ch chan error) *Coordinator { sen.Managers = make(map[int]DeviceManager) c := &Coordinator{Err:ch,Devices:sen} c.server = serv - c.hwinfo = &hwinfo{} + c.hw = &hw{} return c } -type Hardware interface { - GetId() uint32 - GetIp() string - GetBus() int - GetModel() string - GetPort() int -} - func GetHWInfo() (Hardware, error) { return system.NewHWinfo() } @@ -110,15 +110,11 @@ func GetHWInfo() (Hardware, error) { func (c *Coordinator) Start() { // should discover hwinfo and sensors on its own // now setting up sensor managers - hw, err := GetHWInfo() // locking provided by struct is only useful on init - if err != nil { - c.Err <-err - } // setting up hw stuff - c.hwinfo.Ip = hw.GetIp() //get should prevent empty data - c.Id = hw.GetId() - c.Model = hw.GetModel() - c.Bus = hw.GetBus() + c.hw.Ip = system.GetIp() //get should prevent empty data + c.Id = system.GetId() + c.Model = system.GetModel() + c.Bus = system.GetBus() c.Register() go c.Monitor() go c.Connect() @@ -176,16 +172,15 @@ func (c *Coordinator) Connect() { } defer conn.Close() client := pb.NewHandshakeClient(conn) - req := &pb.ReactorClientRequest{ClientId:c.Id,ClientIp:c.hwinfo.Ip,ClientPort:int32(c.hwinfo.Port),ClientModel:c.Model} - resp, err := client.ReactorClientDiscoveryHandler(context.Background(), req) + req := &pb.ClientRequest{ClientId:c.Id,ClientType:"reactor"} + resp, err := client.ClientDiscoveryHandler(context.Background(), req) if err != nil { c.Err <-err } - if resp.GetSuccess() { - logging.Debug(logging.DClient,"Central server reached") - } else { - c.Err <-errors.New("Failed to reach central server!") - } + c.Port = resp.GetPort() + logging.Debug(logging.DClient,"RLC Central server reached, supplied port %v",c.Port) + c.PingReset() + go c.Timeoutd() } func (c *Coordinator) Timeout() int { @@ -200,3 +195,32 @@ func (c *Coordinator) Timeout() int { return 0 } } + +func (t *Timeout) PingReset() { + t.Lock() + defer t.Unlock() + t.LastSeen = time.Now() +} + +func (c *Coordinator) Timeoutd() { + for c.IsActive() { + if sleep, elapsed := t.Elapsed(); elapsed { + logging.Debug(logging.DClient, "RCO Not responding") + break; + } else { + time.Sleep(sleep) + } + } +} + +func (t *Timeout) Elapsed() (time.Duration, bool) { + t.Lock() + defer t.Unlock() + now := time.Now() + if now.After(t.LastSeen.Add(t.TO)) { + return 0 * time.Second, true + } else { + sleep := t.LastSeen.Add(t.To).Sub(now) + return sleep, false + } +} diff --git a/internal/pkg/server/coordinator.go b/internal/pkg/server/coordinator.go index 3faed9d..7d916e4 100644 --- a/internal/pkg/server/coordinator.go +++ b/internal/pkg/server/coordinator.go @@ -7,7 +7,7 @@ import ( ) // this package creates coordinators responsible for keeping track of active clients and invoking managers -type CreateManager interface { +type SubCoordinator interface { Start() NewManager(*Client, chan error) GeneralManager } @@ -18,23 +18,27 @@ type GeneralManager interface { } type Coordinator struct { + Port int // port that we set up gRPC endpoint on *Managers - CreateManager + SubCoordinator + *SystemViewer Err chan error - Pc chan int } type Managers struct { Directory map[uint32]GeneralManager - sync.Mutex + sync.RWMutex // potential perf } + // interface stuff -func NewCoordinator(manager CreateManager, err chan error) *Coordinator { +func NewCoordinator(clientType string, sys *SystemViewer,grpcServer *grpc.Server, err chan error) *Coordinator { d := make(map[uint32]GeneralManager) m := &Managers{Directory:d} c := &Coordinator{Err:err} - c.CreateManager = manager + c.SubCoordinator = NewSubCoordinator(clientType, err) + c.SystemViewer = sys c.Managers = m + go c.Register(clientType, grpcServer) return c } @@ -46,58 +50,98 @@ func (c *Coordinator) Start() { func (c *Coordinator) ClientHandler(cl *Client) int { // (creates and) notifies manager of client connection - m := c.GetManager(cl) - go m.Start(cl) - return m.GetPort() + m := c.LoadManager(cl) + go func (mn GeneralManager, c *client) { + mn.IncomingClient <-c + }(m, cl) + return c.Port } -func (c *Coordinator) GetManager(cl *Client) GeneralManager { +func (c *Coordinator) LoadManager(cl *Client) GeneralManager { + // shouldn't happen all that often so should be fine to lock c.Managers.Lock() defer c.Managers.Unlock() var exists bool var m GeneralManager if m, exists = c.Managers.Directory[cl.Id]; !exists { // manager in memory - m = c.NewManager(cl, c.Err) + m = c.NewManager(cl, c.SystemViewer, c.Err) c.Managers.Directory[cl.Id] = m + go m.Start() } return m } + +func (c *Coordinator) GetManager(id uint32) GeneralManager { + // just read locks and reuturns + c.Managers.RLock() + defer c.Managers.RUnlock() + return c.Managers.Directory[id] +} + +func NewSubCoordinator(clientType string, err chan error) SubCoordinator { + + if clientType == "reactor" { + return &reactorCoordinator{Sys:sys} + } else if clientType == "tui" { + return &tuiCoordinator{Sys:sys} + } + return SubCoordinator{} +} + +func (c *Coordinator) Register(clientType string, grpcServer *grpc.Server) { + if clientType == "reactor" { + pb.RegisterManagementServer(grpcServer,c) + logging.Debug(logging.DClient, "RCO ready for client requests") + } else if clientType == "tui" { + pb.RegisterMonitoringServer(grpcServer,c) + logging.Debug(logging.DClient, "TCO ready for client requests") + } +} +// creating sub coordinators for associated gRPC handlers // reactor coordinator type reactorCoordinator struct { - //empty unexported for method - Sys *SystemViewer + pb.UnimplementedMonitoringServer } func (r *reactorCoordinator) Start() { logging.Debug(logging.DStart,"RCO 01 Starting!") } -func (r *reactorCoordinator) NewManager(cl *Client, err chan error) GeneralManager { +func (r *reactorCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan error) GeneralManager { logging.Debug(logging.DClient, "RCO 01 starting manager for %v client %v",cl.Type,cl.Id) - return NewReactorManager(cl,r.Sys,err) + return NewReactorManager(cl,sys,err) } -func NewReactorCoordinator(sys *SystemViewer, err chan error) *Coordinator { - return NewCoordinator(&reactorCoordinator{Sys:sys}, err) +func (c *Coordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusRequest) (*pb.ReactorStatusResponse, error) { + m := c.GetManager(req.GetId()) + if m == GeneralManager{} { + return &pb.ReactorStatusResponse, errors.New("Manager doesn't exists for that client") + } + return m.ReactorStatusHandler(ctx, req) } + //tui coordinator type tuiCoordinator struct { - //can add fields as needed - Ip string - Sys *SystemViewer + pb.UnimplementedManagementServer } func (t *tuiCoordinator) Start() { logging.Debug(logging.DStart,"TCO 01 Starting!") } -func (t *tuiCoordinator) NewManager(cl *Client, err chan error) GeneralManager { +func (t *tuiCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan error) GeneralManager { logging.Debug(logging.DClient, "TCO 01 starting manager for %v client %v",cl.Type,cl.Id) - return NewTUIManager(t.Ip,cl,t.Sys,err) + return NewTUIManager(cl,sys,err) } -func NewTUICoordinator(ip string, sys *SystemViewer, err chan error) *Coordinator { - return NewCoordinator(&tuiCoordinator{Ip:ip,Sys:sys}, err) +func (c *Coordinator) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) { + // grpc handler to fwd to manager + m := c.GetManager(req.GetId()) + if m == GeneralManager{} { + // doesnt exist for some reason + return &pb.GetDevicesRequest{}, errors.New("Manager doesn't exists for client") + } + return m.GetDevices(ctx,req) } diff --git a/internal/pkg/server/listener.go b/internal/pkg/server/listener.go index 7de6dcf..cd1ef65 100644 --- a/internal/pkg/server/listener.go +++ b/internal/pkg/server/listener.go @@ -11,21 +11,23 @@ import ( pb "FRMS/internal/pkg/grpc" ) -// the goal here is to set up a gRPC server to respond to client pings with their IP and to establish a new manager for that specific client +/* +Originally this package served as a client listener to route requests +I am going to repurpose this to serve as a listener for all gRPC requests +should simplify interfaces +*/ -// going to rename shit to be more general type Listener struct { // exporting for easy use in the short term - // Reactor map[uint32]*ReactorManager this will go in eventual "coordinator" struct - Ip string Port int - Coordinators map[string]*Coordinator + Coordinators map[string]chan *Client + CLis *grpc.Server Sys *SystemViewer Err chan error pb.UnimplementedHandshakeServer } type Client struct { - // can use general client and leave unset fields nil + // general client struct to store reqs from reactors/tui Ip string Port int Id uint32 @@ -33,20 +35,9 @@ type Client struct { Type string } - -func GetIp(e string) (string, error) { - return system.GetIp(e) -} - -func NewListener(ifconfig string,ch chan error) (*Listener, error) { - //m := make(map[uint32]*ReactorManager) - var ip string - var err error - if ip, err = GetIp(ifconfig); err != nil { - return &Listener{}, err - } +func NewListener(ch chan error) *Listener { c := make(map[string]*Coordinator) - l := &Listener{Ip:ip,Err:ch} + l := &Listener{Err:ch} l.Coordinators = c l.Sys = NewSystemViewer() return l, nil @@ -59,54 +50,45 @@ func (l *Listener) Start() { } go l.Sys.Start() // listener started and grpc handler registered - logging.Debug(logging.DStart,"Started listener on %v:%v\n",l.Ip,l.Port) - fmt.Printf("==========================\n IP: %v\n",l.Ip) - fmt.Printf("==========================\n PORT: %v\n==========================\n",l.Port) + logging.Debug(logging.DStart,"Started client listener on port %v\n",l.Port) + //fmt.Printf("==========================\n PORT: %v\n==========================\n",l.Port) } func (l *Listener) Register() error { // creates a gRPC service and binds it to our handler - lis, err := net.Listen("tcp", fmt.Sprintf("%v:0",l.Ip)) // by binding to :0 we should get assigned an empty port + lis, err := net.Listen("tcp", fmt.Sprintf(":%v",l.Port)) // either binding to supplied port or binding to docker default if err != nil { return err } - l.Port = lis.Addr().(*net.TCPAddr).Port // getting the port we were assigned grpcServer := grpc.NewServer() pb.RegisterHandshakeServer(grpcServer, l) go grpcServer.Serve(lis) - return nil -} + logging.Debug(logging.DStart, "LIS Registered on port %v", l.Port) -func (l *Listener) ReactorClientDiscoveryHandler(ctx context.Context, ping *pb.ReactorClientRequest) (*pb.ReactorClientResponse, error) { - // incoming reactor ping need to spawn coord - c := &Client{Ip:ping.GetClientIp(),Model:ping.GetClientModel(),Type:"reactor",Port:int(ping.GetClientPort()),Id:ping.GetClientId()} - logging.Debug(logging.DClient, "%v %v has connected from %v:%v\n",c.Type,c.Id,c.Ip,c.Port) - coord, ok := l.Coordinators["reactor"] - if !ok { - logging.Debug(logging.DSpawn,"CCO 01 Created RCO") - coord = NewReactorCoordinator(l.Sys, l.Err) - l.Coordinators["reactor"] = coord - coord.Start() + lis, err = net.Listen("tcp", fmt.Sprintf(":%v",l.Port+1)) // either binding to supplied port or binding to docker default + if err != nil { + return err } - go coord.ClientHandler(c) - // we dont handle any actual logic about the creation so we just respon true if the request was received - return &pb.ReactorClientResponse{ClientId:c.Id,Success:true}, nil + grpcServer = grpc.NewServer() + l.CLis = grpcServer + go grpcServer.Serve(lis) + logging.Debug(logging.DStart, "LIS Coordinator server registered on port %v", l.Port + 1) + + return nil } -func (l *Listener) TUIClientDiscoveryHandler(ctx context.Context, ping *pb.TUIClientRequest) (*pb.TUIClientResponse, error) { - c := &Client{Type:"tui",Id:ping.GetClientId()} - var coord *Coordinator - var ok bool - coord, ok = l.Coordinators["tui"] +func (l *Listener) ClientDiscoveryHandler(ctx context.Context, ping *pb.ClientRequest) (*pb.ClientResponse, error) { + // incoming reactor ping need to spawn coord + c := &Client{Id:ping.GetClientId(),Type:ping.GetClientType()} + logging.Debug(logging.DClient, "%v %v has connected\n",c.Type,c.Id) + coord, ok := l.Coordinators[c.Type] if !ok { - logging.Debug(logging.DSpawn,"CCO 01 Created TCO") - coord = NewTUICoordinator(l.Ip, l.Sys, l.Err) - l.Coordinators["tui"] = coord - coord.Start() + logging.Debug(logging.DSpawn,"CCO 01 Created Coordinator") + coord = NewCoordinator(c.Type, l.Sys, l.CLis, l.Err) + l.Coordinators[c.Type] = coord + go coord.Start() } port := coord.ClientHandler(c) - logging.Debug(logging.DClient,"%v %v has connected from %v:%v\n",c.Type,c.Id,l.Ip,port) - r := &pb.TUIClientResponse{ClientId:c.Id,ServerIp:l.Ip,ServerPort:int32(port)} - return r, nil + // return the port for the incoming requests + return &pb.ClientResponse{ClientId:c.Id,ServerPort:uint32(port)}, nil } - diff --git a/internal/pkg/server/manager.go b/internal/pkg/server/manager.go index 1f76415..4af6810 100644 --- a/internal/pkg/server/manager.go +++ b/internal/pkg/server/manager.go @@ -14,7 +14,8 @@ import ( type Manager struct { *Client // gives access to c.Ip c.Id etc - Hb time.Duration + IncomingClient chan *Client + Hb time.Duration // used for managing hb timer for client Active active Sig chan bool Err chan error @@ -26,19 +27,18 @@ type active struct{ int } -func NewManager(err chan error) *Manager { +func NewManager(cl chan *Client, err chan error) *Manager { hb := time.Duration(1 * time.Second) //hb to - m := &Manager{Hb:hb,Err:err} + m := &Manager{Hb:hb,Err:err,IncomingClient:cl} return m } -func (m *Manager) Start(cl *Client) { - // establish connection with client and start pinging at set intervals - m.Client = cl +func (m *Manager) Start() { if !m.Activate() { // manager already running m.Err <-errors.New("Manager already running!") } // if we get here, manager is atomically activated and we can ensure start wont run again + go m.ClientConnections() } func (m *Manager) Exit() { @@ -48,6 +48,16 @@ func (m *Manager) Exit() { } } +func (m *Manager) ClientConnections() { + for m.IsActive { + cl := <-m.IncomingClient + if m.IsActive { + logging.Debug(logging.DClient,"MAN Updating client old (%v) new (%v)",m.Id,cl.Id) + m.Client = cl + } + } +} + // reactor manager atomic operations func (m *Manager) IsActive() bool { diff --git a/internal/pkg/server/reactormanager.go b/internal/pkg/server/reactormanager.go index 68836b3..9cd68ea 100644 --- a/internal/pkg/server/reactormanager.go +++ b/internal/pkg/server/reactormanager.go @@ -50,10 +50,14 @@ func (r *ReactorManager) Exit() { r.Manager.Exit() logging.Debug(logging.DExit, "RMA %v exiting", r.Id) go r.StatusMon.Send(&DeviceInfo{Id:r.Id,Type:"Reactor",Status:"[red]OFFLINE[white]",Data:fmt.Sprintf("Last Seen %v",time.Now().Format("Mon at 03:04:05pm MST"))},"Reactor") -} - -func (r *ReactorManager) GetPort() int { - return 0 + r.Devs.Lock() + defer r.Devs.Unlock() + for _, d := range r.Devs { + newd := d + newd.Status = "[yellow]UNKOWN[white]" + r.Devs[newd.Id] = newd + go r.StatusMon.Send(newd,"Device") + } } func (r *ReactorManager) Connect() *grpc.ClientConn { @@ -91,6 +95,18 @@ func (r *ReactorManager) Connect() *grpc.ClientConn { return conn } +func (r *ReactorManager) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusRequest) (*pb.ReactorStatusResponse, error) { + // function client will call to update reactor information + go r.PingReset() + for _, dev := range req.GetDevices() { + d := &DeviceInfo{Id:uint32(dev.GetAddr()),Type:dev.GetType(),Status:dev.GetStatus(),Data:dev.GetData()} + go r.UpdateDevice(d) + } + return &pb.ReactorStatusResponse{Id:r.Id}, nil +} + + +/* func (r *ReactorManager) Monitor(conn *grpc.ClientConn) { defer conn.Close() client := pb.NewMonitoringClient(conn) @@ -118,7 +134,7 @@ func (r *ReactorManager) Monitor(conn *grpc.ClientConn) { time.Sleep(r.Hb) // time between sensor pings } } - +*/ func (r *ReactorManager) UpdateDevice(d *DeviceInfo) { r.devstatus.Lock() defer r.devstatus.Unlock() diff --git a/internal/pkg/server/tuimanager.go b/internal/pkg/server/tuimanager.go index dbda3f7..646b1e9 100644 --- a/internal/pkg/server/tuimanager.go +++ b/internal/pkg/server/tuimanager.go @@ -94,28 +94,6 @@ func (t *Timeout) Elapsed() (time.Duration, bool) { } } -func (t *TUIManager) Register() { - lis, err := net.Listen("tcp", fmt.Sprintf("%v:0",t.Ip)) - if err != nil { - log.Fatal(err) - } - grpcServer := grpc.NewServer() - pb.RegisterManagementServer(grpcServer,t) - go grpcServer.Serve(lis) - // send port now that server is up - t.Port.int = lis.Addr().(*net.TCPAddr).Port - go func(ch chan int,p int) { - ch <-p - }(t.Port.Chan, t.Port.int) - logging.Debug(logging.DClient, "TMA %v ready for client conn", t.Id) - // up and running -} - -func (t *TUIManager) GetPort() int { - port := <-t.Port.Chan - return port -} - // tui client requests and logic will be down here func (t *TUIManager) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) { diff --git a/internal/pkg/system/hwinfo.go b/internal/pkg/system/hwinfo.go index da58735..675c73d 100644 --- a/internal/pkg/system/hwinfo.go +++ b/internal/pkg/system/hwinfo.go @@ -98,6 +98,25 @@ func GetPort() (int, error) { } } +func GetBus() (int, error) { + bus := map[string]int{"raspberrypi":1,"beaglebone":2} + devname := "lshw -C system 2>/dev/null | head -n 1" + var stderr, out bytes.Buffer + cmd := exec.Command("bash","-c",devname) + cmd.Stdout = &out + cmd.Stderr = &stderr + if err := cmd.Run(); err != nil { + return 0, err + } + b := out.String() + b = stings.Trim(b," \n") + if bs, ok := bus[b]; !ok { + return 0, errors.New("No bus for dev %v", b) + } else { + return bs, nil + } +} + func (h *HWinfo) Get() error { // responsible for filling out struct bus := map[string]int{"raspberrypi":1,"beaglebone":2} // eventually will replace this with a config file diff --git a/internal/pkg/tui/client.go b/internal/pkg/tui/client.go index eb5ffff..78dbf07 100644 --- a/internal/pkg/tui/client.go +++ b/internal/pkg/tui/client.go @@ -82,14 +82,13 @@ func (t *TUIClient) Connect() { } // handle handshake logic here client := pb.NewHandshakeClient(conn) - req := &pb.TUIClientRequest{ClientId:t.Id} - resp, err := client.TUIClientDiscoveryHandler(context.Background(),req) + req := &pb.ClientRequest{ClientId:t.Id,Type:"tui"} + resp, err := client.ClientDiscoveryHandler(context.Background(),req) if err != nil { log.Fatal(err) } conn.Close() // closing old connection // setting up server connection with provided port - t.Ip = resp.GetServerIp() t.Port = int(resp.GetServerPort()) for { conn, err = grpc.Dial(fmt.Sprintf("%v:%v",t.Ip,t.Port),opts...) diff --git a/notes b/notes index 0a12be5..82da71d 100644 --- a/notes +++ b/notes @@ -864,3 +864,15 @@ general implementation details - request for devices gets you the current state and adds your listener to the echo chain so that you recieve any updates - need to ensure sends can complete even if the manager is dead - close the channel? + + + +docker time +Need to refactor code so its eaisier to run in both envs +Refactoring server code now + - bind all gRPC services to the same IP:port to make it efficent + - funnel all rpcs through the one external port + - could even use nginx to route from default :80 +is there ever a situation where I would need to run this not on docker? + - can i just hardcode for docker and then rely on nginx for routing etc? + diff --git a/notes2 b/notes2 deleted file mode 100644 index e0175e9..0000000 --- a/notes2 +++ /dev/null @@ -1,42 +0,0 @@ -alright time to do this correctly because this is brutal - -7/18 -DM -> RLC -> RM -> Sys -> TM -> TC -this basic struct will guide us - -Work backwards to start from what the TC wants and work to how to deliver - -TC -- wants to know what the overall reactor status are and the associated device status on selection -- its OK if this is a bit out of date, but probably want a synchronized view between clients - - could be a performance hit but f it - -TM -Needs to: -- provide up to date information about the reactors connected and know which reactor is selected and provide updates for it -Relies on -- Needs a central struct, 2 methods for data - - call on request to query for ir - - keep its own copy and empty its buffer on request - - this is probably the best route - -So TM really needs an intermediate struct a sort of device buffer -- I like the thing we had before with the channels and embedding but it needs to be fleshed out - -All of this resides on the server which we can use to our advantage -Lets flesh out the TM a bit more - -TM sys struct -this struct will attach itself to a channel and buffer updates for that channel -has to have a few methods - - GetBuffer() returns any new entries and empties the buffer - - ListenTo(reactorID) starts listening to a reactor and returns any devices on that branch so the client can refresh its list - - Refresh() forces the client to get all reactors and devices if listening to any branch -NewSysMonitor() has to return struct and current client status to load fortui client - -this client will be given to a TUI at creation - -TUI can then call associated commands to get updates for the client - -TM sys struct just relies on underlying sys implementation which seems to be working south of the RM so i am just going to work on this TM sys struct for now -