From f83bf25645d18d59c25ec888cd6ce2355e2157d4 Mon Sep 17 00:00:00 2001 From: Keegan Date: Mon, 25 Jul 2022 17:55:43 -0400 Subject: [PATCH] working with hardcoded ports, need a cleaner way for those to be assigned --- :wq | 18 --- Dockerfile.server | 1 - cmd/server/main.go | 19 +-- docker-compose.yml | 3 +- internal/pkg/grpc/monitoring.pb.go | 96 +++++++------- internal/pkg/grpc/monitoring.proto | 6 +- internal/pkg/grpc/monitoring_grpc.pb.go | 12 +- internal/pkg/reactor/monitoring.go | 31 +++-- internal/pkg/reactor/rlcoordinator.go | 161 +++++++++++++----------- internal/pkg/server/coordinator.go | 143 ++++++++++++++------- internal/pkg/server/listener.go | 27 ++-- internal/pkg/server/manager.go | 29 +++-- internal/pkg/server/reactormanager.go | 21 ++-- internal/pkg/server/tuimanager.go | 25 ++-- internal/pkg/system/hwinfo.go | 62 +++------ internal/pkg/tui/client.go | 2 +- 16 files changed, 341 insertions(+), 315 deletions(-) delete mode 100644 :wq diff --git a/:wq b/:wq deleted file mode 100644 index 8272d9b..0000000 --- a/:wq +++ /dev/null @@ -1,18 +0,0 @@ -syntax = "proto3"; -package grpc; - -option go_package = "internal/pkg/grpc"; - -service handshake { - rpc ClientDiscoveryHandler(ClientRequest) returns (ClientResponse); -} - -message ClientRequest { - uint32 clientId = 1; - string clientType = 2; -} - -message ClientResponse { - uint32 clientId = 1; - uint32 serverPort = 2; -} diff --git a/Dockerfile.server b/Dockerfile.server index abec284..b09f64a 100644 --- a/Dockerfile.server +++ b/Dockerfile.server @@ -11,7 +11,6 @@ RUN go build -o /server ./cmd/server/main.go EXPOSE 2022 EXPOSE 2023 -EXPOSE 2024 CMD [ "/server" ] diff --git a/cmd/server/main.go b/cmd/server/main.go index cecb068..bac90f2 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -3,7 +3,7 @@ package main import ( _"net/http" _ "net/http/pprof" - "flag" + //"flag" "log" "os" "fmt" @@ -15,8 +15,8 @@ type listener interface { Start() } -func NewListener(ch chan error) listener { - return server.NewListener(ch) +func NewListener(ch chan error, port int) listener { + return server.NewListener(ch, port) } func main() { @@ -28,17 +28,18 @@ func main() { // }() ch := make(chan error) // creating listener - l := NewListener() - - if port := os.Getenv("gRPC_PORT"); port == 0 { - l.Port = 2022 // default docker port + var lport int + if port := os.Getenv("gRPC_PORT"); port == "" { + lport = 2022 // default docker port } + fmt.Printf("Listening on %v\n", lport) + l := NewListener(ch,lport) - db := os.Getenv("DATABASE_URL") // database url + //db := os.Getenv("DATABASE_URL") // database url go l.Start() logging.Debug(logging.DStart, "CCO 01 Server started") - err = <-ch // blocking to wait for any errors and keep alive otherwise + err := <-ch // blocking to wait for any errors and keep alive otherwise if err != nil { log.Fatal(err) } diff --git a/docker-compose.yml b/docker-compose.yml index 25f52cc..3573449 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,6 +5,5 @@ services: ports: - "2022:2022" - "2023:2023" - - "2024:2024" environment: - - DOCKER=true + - VERBOSE=1 diff --git a/internal/pkg/grpc/monitoring.pb.go b/internal/pkg/grpc/monitoring.pb.go index 84943fa..4b8a331 100644 --- a/internal/pkg/grpc/monitoring.pb.go +++ b/internal/pkg/grpc/monitoring.pb.go @@ -20,7 +20,7 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -type ReactorStatusRequest struct { +type ReactorStatusResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields @@ -28,8 +28,8 @@ type ReactorStatusRequest struct { Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` } -func (x *ReactorStatusRequest) Reset() { - *x = ReactorStatusRequest{} +func (x *ReactorStatusResponse) Reset() { + *x = ReactorStatusResponse{} if protoimpl.UnsafeEnabled { mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -37,13 +37,13 @@ func (x *ReactorStatusRequest) Reset() { } } -func (x *ReactorStatusRequest) String() string { +func (x *ReactorStatusResponse) String() string { return protoimpl.X.MessageStringOf(x) } -func (*ReactorStatusRequest) ProtoMessage() {} +func (*ReactorStatusResponse) ProtoMessage() {} -func (x *ReactorStatusRequest) ProtoReflect() protoreflect.Message { +func (x *ReactorStatusResponse) ProtoReflect() protoreflect.Message { mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -55,19 +55,19 @@ func (x *ReactorStatusRequest) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use ReactorStatusRequest.ProtoReflect.Descriptor instead. -func (*ReactorStatusRequest) Descriptor() ([]byte, []int) { +// Deprecated: Use ReactorStatusResponse.ProtoReflect.Descriptor instead. +func (*ReactorStatusResponse) Descriptor() ([]byte, []int) { return file_internal_pkg_grpc_monitoring_proto_rawDescGZIP(), []int{0} } -func (x *ReactorStatusRequest) GetId() uint32 { +func (x *ReactorStatusResponse) GetId() uint32 { if x != nil { return x.Id } return 0 } -type ReactorStatusResponse struct { +type ReactorStatusPing struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields @@ -76,8 +76,8 @@ type ReactorStatusResponse struct { Devices []*Device `protobuf:"bytes,2,rep,name=devices,proto3" json:"devices,omitempty"` } -func (x *ReactorStatusResponse) Reset() { - *x = ReactorStatusResponse{} +func (x *ReactorStatusPing) Reset() { + *x = ReactorStatusPing{} if protoimpl.UnsafeEnabled { mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -85,13 +85,13 @@ func (x *ReactorStatusResponse) Reset() { } } -func (x *ReactorStatusResponse) String() string { +func (x *ReactorStatusPing) String() string { return protoimpl.X.MessageStringOf(x) } -func (*ReactorStatusResponse) ProtoMessage() {} +func (*ReactorStatusPing) ProtoMessage() {} -func (x *ReactorStatusResponse) ProtoReflect() protoreflect.Message { +func (x *ReactorStatusPing) ProtoReflect() protoreflect.Message { mi := &file_internal_pkg_grpc_monitoring_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -103,19 +103,19 @@ func (x *ReactorStatusResponse) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use ReactorStatusResponse.ProtoReflect.Descriptor instead. -func (*ReactorStatusResponse) Descriptor() ([]byte, []int) { +// Deprecated: Use ReactorStatusPing.ProtoReflect.Descriptor instead. +func (*ReactorStatusPing) Descriptor() ([]byte, []int) { return file_internal_pkg_grpc_monitoring_proto_rawDescGZIP(), []int{1} } -func (x *ReactorStatusResponse) GetId() uint32 { +func (x *ReactorStatusPing) GetId() uint32 { if x != nil { return x.Id } return 0 } -func (x *ReactorStatusResponse) GetDevices() []*Device { +func (x *ReactorStatusPing) GetDevices() []*Device { if x != nil { return x.Devices } @@ -198,28 +198,28 @@ var File_internal_pkg_grpc_monitoring_proto protoreflect.FileDescriptor var file_internal_pkg_grpc_monitoring_proto_rawDesc = []byte{ 0x0a, 0x22, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0x26, 0x0a, 0x14, 0x52, 0x65, - 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, - 0x69, 0x64, 0x22, 0x4f, 0x0a, 0x15, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, - 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x26, 0x0a, 0x07, 0x64, - 0x65, 0x76, 0x69, 0x63, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x67, - 0x72, 0x70, 0x63, 0x2e, 0x44, 0x65, 0x76, 0x69, 0x63, 0x65, 0x52, 0x07, 0x64, 0x65, 0x76, 0x69, - 0x63, 0x65, 0x73, 0x22, 0x5c, 0x0a, 0x06, 0x44, 0x65, 0x76, 0x69, 0x63, 0x65, 0x12, 0x12, 0x0a, - 0x04, 0x61, 0x64, 0x64, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x61, 0x64, 0x64, - 0x72, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x12, 0x0a, - 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x61, 0x74, - 0x61, 0x32, 0x5d, 0x0a, 0x0a, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x12, - 0x4f, 0x0a, 0x14, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, - 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x1a, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, - 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, - 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x42, 0x13, 0x5a, 0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, - 0x2f, 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x67, 0x72, 0x70, 0x63, 0x22, 0x27, 0x0a, 0x15, 0x52, 0x65, + 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, + 0x02, 0x69, 0x64, 0x22, 0x4b, 0x0a, 0x11, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x26, 0x0a, 0x07, 0x64, 0x65, 0x76, 0x69, + 0x63, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x67, 0x72, 0x70, 0x63, + 0x2e, 0x44, 0x65, 0x76, 0x69, 0x63, 0x65, 0x52, 0x07, 0x64, 0x65, 0x76, 0x69, 0x63, 0x65, 0x73, + 0x22, 0x5c, 0x0a, 0x06, 0x44, 0x65, 0x76, 0x69, 0x63, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x61, 0x64, + 0x64, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x61, 0x64, 0x64, 0x72, 0x12, 0x12, + 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, + 0x70, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, + 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x32, 0x5a, + 0x0a, 0x0a, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x4c, 0x0a, 0x14, + 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x48, 0x61, 0x6e, + 0x64, 0x6c, 0x65, 0x72, 0x12, 0x17, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, + 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x50, 0x69, 0x6e, 0x67, 0x1a, 0x1b, 0x2e, + 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x13, 0x5a, 0x11, 0x69, 0x6e, + 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -236,14 +236,14 @@ func file_internal_pkg_grpc_monitoring_proto_rawDescGZIP() []byte { var file_internal_pkg_grpc_monitoring_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_internal_pkg_grpc_monitoring_proto_goTypes = []interface{}{ - (*ReactorStatusRequest)(nil), // 0: grpc.ReactorStatusRequest - (*ReactorStatusResponse)(nil), // 1: grpc.ReactorStatusResponse + (*ReactorStatusResponse)(nil), // 0: grpc.ReactorStatusResponse + (*ReactorStatusPing)(nil), // 1: grpc.ReactorStatusPing (*Device)(nil), // 2: grpc.Device } var file_internal_pkg_grpc_monitoring_proto_depIdxs = []int32{ - 2, // 0: grpc.ReactorStatusResponse.devices:type_name -> grpc.Device - 0, // 1: grpc.monitoring.ReactorStatusHandler:input_type -> grpc.ReactorStatusRequest - 1, // 2: grpc.monitoring.ReactorStatusHandler:output_type -> grpc.ReactorStatusResponse + 2, // 0: grpc.ReactorStatusPing.devices:type_name -> grpc.Device + 1, // 1: grpc.monitoring.ReactorStatusHandler:input_type -> grpc.ReactorStatusPing + 0, // 2: grpc.monitoring.ReactorStatusHandler:output_type -> grpc.ReactorStatusResponse 2, // [2:3] is the sub-list for method output_type 1, // [1:2] is the sub-list for method input_type 1, // [1:1] is the sub-list for extension type_name @@ -258,7 +258,7 @@ func file_internal_pkg_grpc_monitoring_proto_init() { } if !protoimpl.UnsafeEnabled { file_internal_pkg_grpc_monitoring_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ReactorStatusRequest); i { + switch v := v.(*ReactorStatusResponse); i { case 0: return &v.state case 1: @@ -270,7 +270,7 @@ func file_internal_pkg_grpc_monitoring_proto_init() { } } file_internal_pkg_grpc_monitoring_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ReactorStatusResponse); i { + switch v := v.(*ReactorStatusPing); i { case 0: return &v.state case 1: diff --git a/internal/pkg/grpc/monitoring.proto b/internal/pkg/grpc/monitoring.proto index 794da85..b2d9e27 100644 --- a/internal/pkg/grpc/monitoring.proto +++ b/internal/pkg/grpc/monitoring.proto @@ -4,14 +4,14 @@ package grpc; option go_package = "internal/pkg/grpc"; service monitoring { - rpc ReactorStatusHandler(ReactorStatusRequest) returns (ReactorStatusResponse); + rpc ReactorStatusHandler(ReactorStatusPing) returns (ReactorStatusResponse); } -message ReactorStatusRequest { +message ReactorStatusResponse { uint32 id = 1; } -message ReactorStatusResponse { +message ReactorStatusPing { uint32 id = 1; repeated Device devices = 2; } diff --git a/internal/pkg/grpc/monitoring_grpc.pb.go b/internal/pkg/grpc/monitoring_grpc.pb.go index ff4293b..302a222 100644 --- a/internal/pkg/grpc/monitoring_grpc.pb.go +++ b/internal/pkg/grpc/monitoring_grpc.pb.go @@ -22,7 +22,7 @@ const _ = grpc.SupportPackageIsVersion7 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type MonitoringClient interface { - ReactorStatusHandler(ctx context.Context, in *ReactorStatusRequest, opts ...grpc.CallOption) (*ReactorStatusResponse, error) + ReactorStatusHandler(ctx context.Context, in *ReactorStatusPing, opts ...grpc.CallOption) (*ReactorStatusResponse, error) } type monitoringClient struct { @@ -33,7 +33,7 @@ func NewMonitoringClient(cc grpc.ClientConnInterface) MonitoringClient { return &monitoringClient{cc} } -func (c *monitoringClient) ReactorStatusHandler(ctx context.Context, in *ReactorStatusRequest, opts ...grpc.CallOption) (*ReactorStatusResponse, error) { +func (c *monitoringClient) ReactorStatusHandler(ctx context.Context, in *ReactorStatusPing, opts ...grpc.CallOption) (*ReactorStatusResponse, error) { out := new(ReactorStatusResponse) err := c.cc.Invoke(ctx, "/grpc.monitoring/ReactorStatusHandler", in, out, opts...) if err != nil { @@ -46,7 +46,7 @@ func (c *monitoringClient) ReactorStatusHandler(ctx context.Context, in *Reactor // All implementations must embed UnimplementedMonitoringServer // for forward compatibility type MonitoringServer interface { - ReactorStatusHandler(context.Context, *ReactorStatusRequest) (*ReactorStatusResponse, error) + ReactorStatusHandler(context.Context, *ReactorStatusPing) (*ReactorStatusResponse, error) mustEmbedUnimplementedMonitoringServer() } @@ -54,7 +54,7 @@ type MonitoringServer interface { type UnimplementedMonitoringServer struct { } -func (UnimplementedMonitoringServer) ReactorStatusHandler(context.Context, *ReactorStatusRequest) (*ReactorStatusResponse, error) { +func (UnimplementedMonitoringServer) ReactorStatusHandler(context.Context, *ReactorStatusPing) (*ReactorStatusResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ReactorStatusHandler not implemented") } func (UnimplementedMonitoringServer) mustEmbedUnimplementedMonitoringServer() {} @@ -71,7 +71,7 @@ func RegisterMonitoringServer(s grpc.ServiceRegistrar, srv MonitoringServer) { } func _Monitoring_ReactorStatusHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ReactorStatusRequest) + in := new(ReactorStatusPing) if err := dec(in); err != nil { return nil, err } @@ -83,7 +83,7 @@ func _Monitoring_ReactorStatusHandler_Handler(srv interface{}, ctx context.Conte FullMethod: "/grpc.monitoring/ReactorStatusHandler", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(MonitoringServer).ReactorStatusHandler(ctx, req.(*ReactorStatusRequest)) + return srv.(MonitoringServer).ReactorStatusHandler(ctx, req.(*ReactorStatusPing)) } return interceptor(ctx, in, info, handler) } diff --git a/internal/pkg/reactor/monitoring.go b/internal/pkg/reactor/monitoring.go index 07cbfd8..44dcfd9 100644 --- a/internal/pkg/reactor/monitoring.go +++ b/internal/pkg/reactor/monitoring.go @@ -3,11 +3,11 @@ package reactor import ( "sync" "context" - "log" - "fmt" - "net" - "FRMS/internal/pkg/logging" - "google.golang.org/grpc" + //"log" + //"fmt" + //"net" + //"FRMS/internal/pkg/logging" + //"google.golang.org/grpc" pb "FRMS/internal/pkg/grpc" ) @@ -28,9 +28,9 @@ func (c *Coordinator) DevStatus(ch chan *DeviceStatus, a int, dm DeviceManager) ch <-d } -func (c *Coordinator) GetStatus() []*DeviceStatus { +func (c *Coordinator) GetStatus() []*pb.Device { var wg sync.WaitGroup - devs := []*DeviceStatus{} + devs := []*pb.Device{} statusChan := make(chan *DeviceStatus) c.Devices.Lock() for a,dm := range c.Devices.Managers { @@ -46,8 +46,8 @@ func (c *Coordinator) GetStatus() []*DeviceStatus { for { select{ case s:= <-statusChan: - //fmt.Printf("%v is %v, ",s.Type,s.Status) - devs = append(devs,s) + //fmt.Printf("%v is %v\n",s.Type,s.Status) + devs = append(devs,&pb.Device{Addr:int32(s.Addr),Type:s.Type,Status:s.Status,Data:s.Data}) wg.Done() case <-allDone: return devs @@ -56,7 +56,17 @@ func (c *Coordinator) GetStatus() []*DeviceStatus { } // grpc status update handler - +func (c *Coordinator) Ping() { + // sends all device status to central coordinator + devs := c.GetStatus() + req := &pb.ReactorStatusPing{Id:c.Id,Devices:devs} + _, err := c.MonitoringClient.ReactorStatusHandler(context.Background(),req) + if err != nil { + c.Err <-err + go c.Exit() + } +} +/* func (c *Coordinator) Register() { ip := c.hwinfo.Ip @@ -70,3 +80,4 @@ func (c *Coordinator) Register() { } logging.Debug(logging.DStart, "Listening for pings on %v:%v\n",ip,c.hwinfo.Port) } +*/ diff --git a/internal/pkg/reactor/rlcoordinator.go b/internal/pkg/reactor/rlcoordinator.go index d5ebc74..ee3ef82 100644 --- a/internal/pkg/reactor/rlcoordinator.go +++ b/internal/pkg/reactor/rlcoordinator.go @@ -22,21 +22,16 @@ import ( // Coordinator == Reactor Level Coordinator type Coordinator struct { - *server + Ip string + Port int // listener port + MonitoringClient pb.MonitoringClient *hw Devices *DeviceManagers // struct for fine grain locking Err chan error mu sync.Mutex - *Timeout + HB time.Duration + PingTimer chan struct{} Active active - pb.UnimplementedMonitoringServer -} - -type Timeout struct { - Alert chan bool - LastSeen time.Time - TO time.Duration - sync.Mutex } type active struct { @@ -45,16 +40,8 @@ type active struct { sync.Mutex } -type server struct { - // store central server endpoint - Ip string - Port int -} - type hw struct { // store reactor info - Ip string - Port int Model string Bus int Id uint32 @@ -94,42 +81,54 @@ func NewI2CMonitor(b int,ch chan int) I2CMonitor { } func NewCoordinator(ip string,port int,ch chan error) *Coordinator { - serv := &server{Ip:ip,Port:port} sen := new(DeviceManagers) sen.Managers = make(map[int]DeviceManager) c := &Coordinator{Err:ch,Devices:sen} - c.server = serv + c.Ip = ip + c.Port = port c.hw = &hw{} + c.HB = time.Duration(1 * time.Second) + c.PingTimer = make(chan struct{}) return c } -func GetHWInfo() (Hardware, error) { - return system.NewHWinfo() -} - func (c *Coordinator) Start() { // should discover hwinfo and sensors on its own // now setting up sensor managers // setting up hw stuff - c.hw.Ip = system.GetIp() //get should prevent empty data - c.Id = system.GetId() - c.Model = system.GetModel() - c.Bus = system.GetBus() - c.Register() + c.Activate() + var err error + c.Id, err = system.GetId("eth0") + c.Model, err = system.GetModel() + c.Bus, err = system.GetBus() + if err != nil { + c.Err <-err + } go c.Monitor() - go c.Connect() - logging.Debug(logging.DStart, "%v RLC Starting", c.Id) + go c.Discover() } func (c *Coordinator) Monitor() { // function to automatically create and destroy sm - ch := make(chan int) - im := NewI2CMonitor(c.Bus,ch) + dch := make(chan int) + im := NewI2CMonitor(c.Bus,dch) go im.Monitor() - for { - d := <-ch - i := im.GetDevice(d) - go c.DeviceConnect(i) + for c.IsActive() { + select { + case d := <-dch: + i := im.GetDevice(d) + go c.DeviceConnect(i) + case <-c.PingTimer: + go c.Ping() + } + } +} + +func (c *Coordinator) HeartBeat() { + for c.IsActive() { + c.PingTimer <-struct{}{} + logging.Debug(logging.DClient,"RLC Pinging server") + time.Sleep(c.HB) } } @@ -146,41 +145,56 @@ func (c *Coordinator) DeviceConnect(i2c I2CDev) { } } -func (c *Coordinator) Connect() { +func (c *Coordinator) Discover() { + // sets up connection to central coordiantor + conn, err := c.Connect(c.Ip, c.Port) + if err != nil { + c.Err <-err + } + defer conn.Close() + client := pb.NewHandshakeClient(conn) + req := &pb.ClientRequest{ClientId:c.Id,ClientType:"reactor"} + resp, err := client.ClientDiscoveryHandler(context.Background(), req) + if err != nil { + c.Err <-err + } + c.Port = int(resp.GetServerPort()) // updating server port + logging.Debug(logging.DClient,"RLC Central server reached, supplied port %v",c.Port) + // connecting to manager now + clientConn, err := c.Connect(c.Ip, c.Port) + if err != nil { + c.Err <-err + } + c.MonitoringClient = pb.NewMonitoringClient(clientConn) + go c.HeartBeat() + +} + +func (c *Coordinator) Connect(ip string, port int) (*grpc.ClientConn, error) { // function connects to central server and passes hwinfo var opts []grpc.DialOption opts = append(opts,grpc.WithTransportCredentials(insecure.NewCredentials())) var conn *grpc.ClientConn var err error for { - conn, err = grpc.Dial(fmt.Sprintf("%v:%v",c.server.Ip,c.server.Port),opts...) + conn, err = grpc.Dial(fmt.Sprintf("%v:%v",ip,port),opts...) code := status.Code(err) if code != 0 { // != OK if code == (5 | 14) { // service temp down to := c.Timeout() if to == 0 { err = errors.New("Failed to connect to central server") - c.Err <-err + return &grpc.ClientConn{}, err } logging.Debug(logging.DClient,"Server currently unavailable, retrying in %v ms", to) time.Sleep(time.Duration(to) * time.Millisecond) } else { - c.Err <-err + return &grpc.ClientConn{}, err } } break; } - defer conn.Close() - client := pb.NewHandshakeClient(conn) - req := &pb.ClientRequest{ClientId:c.Id,ClientType:"reactor"} - resp, err := client.ClientDiscoveryHandler(context.Background(), req) - if err != nil { - c.Err <-err - } - c.Port = resp.GetPort() - logging.Debug(logging.DClient,"RLC Central server reached, supplied port %v",c.Port) - c.PingReset() - go c.Timeoutd() + return conn, nil } func (c *Coordinator) Timeout() int { @@ -196,31 +210,34 @@ func (c *Coordinator) Timeout() int { } } -func (t *Timeout) PingReset() { - t.Lock() - defer t.Unlock() - t.LastSeen = time.Now() +func (c *Coordinator) IsActive() bool { + c.Active.Lock() + defer c.Active.Unlock() + return c.Active.bool } -func (c *Coordinator) Timeoutd() { - for c.IsActive() { - if sleep, elapsed := t.Elapsed(); elapsed { - logging.Debug(logging.DClient, "RCO Not responding") - break; - } else { - time.Sleep(sleep) - } +func (c *Coordinator) Exit() bool { + c.Active.Lock() + defer c.Active.Unlock() + if c.Active.bool { + c.Active.bool = false + logging.Debug(logging.DClient,"RLC Exiting...") + return true + } else { + logging.Debug(logging.DError, "RLC Already Dead!") + return false } } -func (t *Timeout) Elapsed() (time.Duration, bool) { - t.Lock() - defer t.Unlock() - now := time.Now() - if now.After(t.LastSeen.Add(t.TO)) { - return 0 * time.Second, true +func (c *Coordinator) Activate() bool { + c.Active.Lock() + defer c.Active.Unlock() + if c.Active.bool { + logging.Debug(logging.DError,"RLC Already Started!") + return false } else { - sleep := t.LastSeen.Add(t.To).Sub(now) - return sleep, false + logging.Debug(logging.DClient, "RLC Starting") + c.Active.bool = true + return c.Active.bool } } diff --git a/internal/pkg/server/coordinator.go b/internal/pkg/server/coordinator.go index 7d916e4..53319e2 100644 --- a/internal/pkg/server/coordinator.go +++ b/internal/pkg/server/coordinator.go @@ -3,23 +3,34 @@ package server import ( "sync" //"fmt" + "net" + "context" + "errors" "FRMS/internal/pkg/logging" + "google.golang.org/grpc" + pb "FRMS/internal/pkg/grpc" + ) // this package creates coordinators responsible for keeping track of active clients and invoking managers type SubCoordinator interface { Start() - NewManager(*Client, chan error) GeneralManager + NewManager(*Client,*SystemViewer, chan error) GeneralManager + GetManager(uint32) (GeneralManager, bool) + AddManager(uint32, GeneralManager) + Register() } type GeneralManager interface { - Start(*Client) - GetPort() int + Start() + UpdateClient(*Client) + ReactorStatusHandler(context.Context,*pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) + GetDevices(context.Context, *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) } type Coordinator struct { Port int // port that we set up gRPC endpoint on - *Managers + //*Managers going to embed this in subcoordinator SubCoordinator *SystemViewer Err chan error @@ -31,76 +42,78 @@ type Managers struct { } // interface stuff -func NewCoordinator(clientType string, sys *SystemViewer,grpcServer *grpc.Server, err chan error) *Coordinator { +func NewCoordinator(clientType string, sys *SystemViewer, err chan error) *Coordinator { d := make(map[uint32]GeneralManager) m := &Managers{Directory:d} c := &Coordinator{Err:err} - c.SubCoordinator = NewSubCoordinator(clientType, err) + c.Port = 2023 + sub, errs := NewSubCoordinator(clientType, m, err) + if errs != nil { + err <-errs + } + c.SubCoordinator = sub c.SystemViewer = sys - c.Managers = m - go c.Register(clientType, grpcServer) + //c.Managers = m + go c.Register() return c } func (c *Coordinator) Start() { // on start we need to create channel listener // on each new connection we want to check its id against our mapping - c.CreateManager.Start() + c.SubCoordinator.Start() } func (c *Coordinator) ClientHandler(cl *Client) int { // (creates and) notifies manager of client connection - m := c.LoadManager(cl) - go func (mn GeneralManager, c *client) { - mn.IncomingClient <-c - }(m, cl) + go c.UpdateManager(cl) return c.Port } -func (c *Coordinator) LoadManager(cl *Client) GeneralManager { +func (c *Coordinator) UpdateManager(cl *Client) { // shouldn't happen all that often so should be fine to lock - c.Managers.Lock() - defer c.Managers.Unlock() - var exists bool - var m GeneralManager - if m, exists = c.Managers.Directory[cl.Id]; !exists { - // manager in memory + m, exists := c.GetManager(cl.Id) + if !exists { m = c.NewManager(cl, c.SystemViewer, c.Err) - c.Managers.Directory[cl.Id] = m + m.UpdateClient(cl) + go c.AddManager(cl.Id, m) go m.Start() } - return m + go m.UpdateClient(cl) +} + +func (m *Managers) AddManager(id uint32, man GeneralManager) { + m.Lock() + defer m.Unlock() + m.Directory[id] = man } -func (c *Coordinator) GetManager(id uint32) GeneralManager { +func (m *Managers) GetManager(id uint32) (GeneralManager, bool) { // just read locks and reuturns - c.Managers.RLock() - defer c.Managers.RUnlock() - return c.Managers.Directory[id] + m.RLock() + defer m.RUnlock() + man, exists := m.Directory[id] + return man, exists } -func NewSubCoordinator(clientType string, err chan error) SubCoordinator { +func NewSubCoordinator(clientType string, m *Managers, err chan error) (SubCoordinator, error) { if clientType == "reactor" { - return &reactorCoordinator{Sys:sys} + c := &reactorCoordinator{} + c.Managers = m + return c, nil } else if clientType == "tui" { - return &tuiCoordinator{Sys:sys} + c := &tuiCoordinator{} + c.Managers = m + return c, nil } - return SubCoordinator{} + return &reactorCoordinator{}, errors.New("Unrecognized client type") } -func (c *Coordinator) Register(clientType string, grpcServer *grpc.Server) { - if clientType == "reactor" { - pb.RegisterManagementServer(grpcServer,c) - logging.Debug(logging.DClient, "RCO ready for client requests") - } else if clientType == "tui" { - pb.RegisterMonitoringServer(grpcServer,c) - logging.Debug(logging.DClient, "TCO ready for client requests") - } -} // creating sub coordinators for associated gRPC handlers // reactor coordinator type reactorCoordinator struct { + *Managers pb.UnimplementedMonitoringServer } @@ -113,17 +126,28 @@ func (r *reactorCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan return NewReactorManager(cl,sys,err) } -func (c *Coordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusRequest) (*pb.ReactorStatusResponse, error) { - m := c.GetManager(req.GetId()) - if m == GeneralManager{} { - return &pb.ReactorStatusResponse, errors.New("Manager doesn't exists for that client") +func (r *reactorCoordinator) Register() { + lis, err := net.Listen("tcp", ":2023") + if err != nil { + // rip } - return m.ReactorStatusHandler(ctx, req) + grpcServer := grpc.NewServer() + pb.RegisterMonitoringServer(grpcServer,r) + go grpcServer.Serve(lis) + logging.Debug(logging.DClient, "RCO ready for client requests") } +func (r *reactorCoordinator) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { + m, exists := r.GetManager(req.GetId()) + if !exists { + return &pb.ReactorStatusResponse{}, errors.New("Manager doesn't exists for that client") + } + return m.ReactorStatusHandler(ctx, req) +} //tui coordinator type tuiCoordinator struct { + *Managers pb.UnimplementedManagementServer } @@ -136,12 +160,35 @@ func (t *tuiCoordinator) NewManager(cl *Client, sys *SystemViewer, err chan erro return NewTUIManager(cl,sys,err) } -func (c *Coordinator) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) { +func (t *tuiCoordinator) Register() { + lis, err := net.Listen("tcp", ":2024") + if err != nil { + // rip + } + grpcServer := grpc.NewServer() + pb.RegisterManagementServer(grpcServer,t) + go grpcServer.Serve(lis) + logging.Debug(logging.DClient, "TCO ready for client requests") +} + +func (t *tuiCoordinator) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) { // grpc handler to fwd to manager - m := c.GetManager(req.GetId()) - if m == GeneralManager{} { + m, exists := t.GetManager(req.GetClientId()) + if !exists { // doesnt exist for some reason - return &pb.GetDevicesRequest{}, errors.New("Manager doesn't exists for client") + return &pb.GetDevicesResponse{}, errors.New("Manager doesn't exists for client") } return m.GetDevices(ctx,req) } + +// unimplemented bs for grpc +func (t *tuiCoordinator) DeleteReactor(ctx context.Context, req *pb.DeleteReactorRequest) (*pb.DeleteReactorResponse, error) { + // TODO + return &pb.DeleteReactorResponse{}, nil +} + +func (t *tuiCoordinator) DeleteReactorDevice(ctx context.Context, req *pb.DeleteReactorDeviceRequest) (*pb.DeleteReactorDeviceResponse, error) { + // TODO + return &pb.DeleteReactorDeviceResponse{}, nil +} + diff --git a/internal/pkg/server/listener.go b/internal/pkg/server/listener.go index cd1ef65..ee58d73 100644 --- a/internal/pkg/server/listener.go +++ b/internal/pkg/server/listener.go @@ -5,7 +5,7 @@ import ( "fmt" "net" "context" - "FRMS/internal/pkg/system" + // "FRMS/internal/pkg/system" "FRMS/internal/pkg/logging" "google.golang.org/grpc" pb "FRMS/internal/pkg/grpc" @@ -19,7 +19,7 @@ should simplify interfaces type Listener struct { // exporting for easy use in the short term Port int - Coordinators map[string]chan *Client + Coordinators map[string]*Coordinator CLis *grpc.Server Sys *SystemViewer Err chan error @@ -35,12 +35,13 @@ type Client struct { Type string } -func NewListener(ch chan error) *Listener { +func NewListener(ch chan error, port int) *Listener { c := make(map[string]*Coordinator) l := &Listener{Err:ch} l.Coordinators = c l.Sys = NewSystemViewer() - return l, nil + l.Port = port + return l } func (l *Listener) Start() { @@ -65,14 +66,14 @@ func (l *Listener) Register() error { go grpcServer.Serve(lis) logging.Debug(logging.DStart, "LIS Registered on port %v", l.Port) - lis, err = net.Listen("tcp", fmt.Sprintf(":%v",l.Port+1)) // either binding to supplied port or binding to docker default - if err != nil { - return err - } - grpcServer = grpc.NewServer() - l.CLis = grpcServer - go grpcServer.Serve(lis) - logging.Debug(logging.DStart, "LIS Coordinator server registered on port %v", l.Port + 1) + //lis, err = net.Listen("tcp", fmt.Sprintf(":%v",l.Port+1)) // either binding to supplied port or binding to docker default + //if err != nil { + //return err + //} + //grpcServer = grpc.NewServer() + //l.CLis = grpcServer + //go grpcServer.Serve(lis) + //logging.Debug(logging.DStart, "LIS Coordinator server registered on port %v", l.Port + 1) return nil } @@ -84,7 +85,7 @@ func (l *Listener) ClientDiscoveryHandler(ctx context.Context, ping *pb.ClientRe coord, ok := l.Coordinators[c.Type] if !ok { logging.Debug(logging.DSpawn,"CCO 01 Created Coordinator") - coord = NewCoordinator(c.Type, l.Sys, l.CLis, l.Err) + coord = NewCoordinator(c.Type, l.Sys, l.Err) l.Coordinators[c.Type] = coord go coord.Start() } diff --git a/internal/pkg/server/manager.go b/internal/pkg/server/manager.go index 4af6810..41e092c 100644 --- a/internal/pkg/server/manager.go +++ b/internal/pkg/server/manager.go @@ -6,7 +6,9 @@ import ( "math" "sync" "errors" - //"FRMS/internal/pkg/logging" + "context" + "FRMS/internal/pkg/logging" + pb "FRMS/internal/pkg/grpc" // unimplemented base methods ) // this package will implement a boilerplate manager @@ -14,7 +16,6 @@ import ( type Manager struct { *Client // gives access to c.Ip c.Id etc - IncomingClient chan *Client Hb time.Duration // used for managing hb timer for client Active active Sig chan bool @@ -27,9 +28,9 @@ type active struct{ int } -func NewManager(cl chan *Client, err chan error) *Manager { +func NewManager(err chan error) *Manager { hb := time.Duration(1 * time.Second) //hb to - m := &Manager{Hb:hb,Err:err,IncomingClient:cl} + m := &Manager{Hb:hb,Err:err} return m } @@ -38,7 +39,6 @@ func (m *Manager) Start() { // manager already running m.Err <-errors.New("Manager already running!") } // if we get here, manager is atomically activated and we can ensure start wont run again - go m.ClientConnections() } func (m *Manager) Exit() { @@ -48,14 +48,9 @@ func (m *Manager) Exit() { } } -func (m *Manager) ClientConnections() { - for m.IsActive { - cl := <-m.IncomingClient - if m.IsActive { - logging.Debug(logging.DClient,"MAN Updating client old (%v) new (%v)",m.Id,cl.Id) - m.Client = cl - } - } +func (m *Manager) UpdateClient(cl *Client) { + logging.Debug(logging.DClient,"MAN Updating client %v",cl.Id) + m.Client = cl } // reactor manager atomic operations @@ -109,3 +104,11 @@ func (m *Manager) Timeout() int { return 0 } } + +func (m *Manager) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) { + return &pb.GetDevicesResponse{}, errors.New("Get Devices not implemented!") +} + +func (m *Manager) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { + return &pb.ReactorStatusResponse{}, errors.New("Reactor Status Handler not implemented!") +} diff --git a/internal/pkg/server/reactormanager.go b/internal/pkg/server/reactormanager.go index 9cd68ea..6f967b1 100644 --- a/internal/pkg/server/reactormanager.go +++ b/internal/pkg/server/reactormanager.go @@ -35,23 +35,22 @@ func NewReactorManager(c *Client,sys *SystemViewer,err chan error) GeneralManage return r } -func (r *ReactorManager) Start(cl *Client) { - r.Manager.Start(cl) +func (r *ReactorManager) Start() { + r.Manager.Start() logging.Debug(logging.DStart,"RMA %v starting", r.Id) go r.StatusMon.Send(&DeviceInfo{Id:r.Id,Type:"Reactor",Status:"[green]ONLINE[white]"},"Reactor") - conn := r.Connect() - empty := &grpc.ClientConn{} - if conn != empty { - go r.Monitor(conn) - } + //conn := r.Connect() + //empty := &grpc.ClientConn{} + //if conn != empty { + //} } func (r *ReactorManager) Exit() { r.Manager.Exit() logging.Debug(logging.DExit, "RMA %v exiting", r.Id) go r.StatusMon.Send(&DeviceInfo{Id:r.Id,Type:"Reactor",Status:"[red]OFFLINE[white]",Data:fmt.Sprintf("Last Seen %v",time.Now().Format("Mon at 03:04:05pm MST"))},"Reactor") - r.Devs.Lock() - defer r.Devs.Unlock() + r.devstatus.Lock() + defer r.devstatus.Unlock() for _, d := range r.Devs { newd := d newd.Status = "[yellow]UNKOWN[white]" @@ -95,9 +94,9 @@ func (r *ReactorManager) Connect() *grpc.ClientConn { return conn } -func (r *ReactorManager) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusRequest) (*pb.ReactorStatusResponse, error) { +func (r *ReactorManager) ReactorStatusHandler(ctx context.Context, req *pb.ReactorStatusPing) (*pb.ReactorStatusResponse, error) { // function client will call to update reactor information - go r.PingReset() + //go r.PingReset() for _, dev := range req.GetDevices() { d := &DeviceInfo{Id:uint32(dev.GetAddr()),Type:dev.GetType(),Status:dev.GetStatus(),Data:dev.GetData()} go r.UpdateDevice(d) diff --git a/internal/pkg/server/tuimanager.go b/internal/pkg/server/tuimanager.go index 646b1e9..bb37596 100644 --- a/internal/pkg/server/tuimanager.go +++ b/internal/pkg/server/tuimanager.go @@ -1,14 +1,14 @@ package server import ( - "fmt" + // "fmt" "time" "sync" - "net" - "log" + // "net" + // "log" "context" "FRMS/internal/pkg/logging" - "google.golang.org/grpc" + // "google.golang.org/grpc" pb "FRMS/internal/pkg/grpc" ) @@ -16,8 +16,6 @@ import ( type TUIManager struct { *Manager // embedded manager for access to methods and client - Ip string - Port *port StatusMon *StatusMonitor // use it for all devs coming in Err chan error *Timeout @@ -31,30 +29,23 @@ type Timeout struct { sync.Mutex } -type port struct { - Chan chan int - int -} - -func NewTUIManager(ip string, c *Client, sys *SystemViewer, err chan error) GeneralManager { +func NewTUIManager(c *Client, sys *SystemViewer, err chan error) GeneralManager { m := NewManager(err) t := &TUIManager{Err: err} - t.Port = &port{Chan:make(chan int)} alert := make(chan bool) t.Timeout = &Timeout{Alert:alert,TO:time.Duration(2500*time.Millisecond)} // short time outs are fine because we will just rejoin t.Manager = m - t.Ip = ip t.StatusMon = NewStatusMonitor("TUI",c.Id,sys) + t.Manager.UpdateClient(c) return t } -func (t *TUIManager) Start(cl *Client) { +func (t *TUIManager) Start() { // t.PingReset() - t.Manager.Start(cl) + t.Manager.Start() logging.Debug(logging.DStart,"TMA %v starting", t.Id) go t.Timeoutd() - go t.Register() // begin tui server to respond to tui client reqs //go t.Monitor(conn) } diff --git a/internal/pkg/system/hwinfo.go b/internal/pkg/system/hwinfo.go index 675c73d..bf9e1d1 100644 --- a/internal/pkg/system/hwinfo.go +++ b/internal/pkg/system/hwinfo.go @@ -7,6 +7,7 @@ import ( "strings" "net" "fmt" + "errors" ) // this package serves to add in wrappers for system commands to get hwinfo from the board @@ -21,41 +22,6 @@ import ( // lshw -C system 2>/dev/null | head -n 1 > hwinfo.txt && ifconfig eth0 | awk '/inet |ether / {print $2}' >> hwinfo.txt // *** will just replace info in file everytime - -type HWinfo struct { - Ip string - Id uint32 - Bus int - Model string - Port int -} - -func (h *HWinfo) GetIp() string { - return h.Ip -} - -func (h *HWinfo) GetId() uint32 { - return h.Id -} - -func (h *HWinfo) GetBus() int { - return h.Bus -} - -func (h *HWinfo) GetPort() int { - return h.Port -} - -func (h *HWinfo) GetModel() string { - return h.Model -} - -func NewHWinfo() (*HWinfo, error) { - h := &HWinfo{Port:2000} - err := h.Get() - return h, err -} - func GetId(eth string) (uint32, error) { maccmd := fmt.Sprintf("ifconfig %v | awk '/ether / {print $2}'", eth) var stderr bytes.Buffer @@ -109,17 +75,31 @@ func GetBus() (int, error) { return 0, err } b := out.String() - b = stings.Trim(b," \n") + b = strings.Trim(b," \n") if bs, ok := bus[b]; !ok { - return 0, errors.New("No bus for dev %v", b) + return 0, errors.New(fmt.Sprintf("No bus for dev %v", b)) } else { return bs, nil } } -func (h *HWinfo) Get() error { +func GetModel() (string, error) { + devname := "lshw -C system 2>/dev/null | head -n 1" + var stderr, out bytes.Buffer + cmd := exec.Command("bash","-c",devname) + cmd.Stdout = &out + cmd.Stderr = &stderr + if err := cmd.Run(); err != nil { + return "", err + } + b := out.String() + b = strings.Trim(b," \n") + return b, nil +} + +func Get() error { // responsible for filling out struct - bus := map[string]int{"raspberrypi":1,"beaglebone":2} // eventually will replace this with a config file + //bus := map[string]int{"raspberrypi":1,"beaglebone":2} // eventually will replace this with a config file ipcmd := "ifconfig eth0 | awk '/inet / {print $2}'" maccmd := "ifconfig eth0 | awk '/ether / {print $2}'" @@ -140,15 +120,11 @@ func (h *HWinfo) Get() error { // formatting ip := res[0].String() ip = strings.Trim(ip," \n") - h.Ip = ip hash := fnv.New32a() hash.Write(res[1].Bytes()) - h.Id = hash.Sum32() b := res[2].String() b = strings.Trim(b," \n") - h.Model = b - h.Bus = bus[b] return nil } diff --git a/internal/pkg/tui/client.go b/internal/pkg/tui/client.go index 78dbf07..eda350c 100644 --- a/internal/pkg/tui/client.go +++ b/internal/pkg/tui/client.go @@ -82,7 +82,7 @@ func (t *TUIClient) Connect() { } // handle handshake logic here client := pb.NewHandshakeClient(conn) - req := &pb.ClientRequest{ClientId:t.Id,Type:"tui"} + req := &pb.ClientRequest{ClientId:t.Id,ClientType:"tui"} resp, err := client.ClientDiscoveryHandler(context.Background(),req) if err != nil { log.Fatal(err)