went back to monitoring via pings; starting terminal interface over the weekend

main
KeeganForelight 3 years ago
parent 530413e9a1
commit 37ad511131

@ -5,7 +5,6 @@ import (
"os"
"flag"
"log"
"strings"
"strconv"
"FRMS/internal/pkg/reactor"
)
@ -25,26 +24,24 @@ func main() {
flag.Usage = func() {
w := flag.CommandLine.Output()
fmt.Fprintf(w, "Usage: %s ip_addr port \n",os.Args[0])
fmt.Fprintf(w, "Usage: %s port \n",os.Args[0])
}
iptr := flag.String("i","192.1.168.136","ip address of server")
flag.Parse()
if flag.NArg() != 2 {
if flag.NArg() != 1 {
flag.Usage()
os.Exit(1)
}
args := flag.Args()
if s := strings.Split(args[0],"."); len(s) != 4 {
flag.Usage()
log.Fatal("second arguement must be an ip address of type x.x.x.x")
}
if p, err := strconv.Atoi(args[1]);p < 1024 || p > 65535 {
if p, err := strconv.Atoi(args[0]);p < 1024 || p > 65535 {
flag.Usage()
log.Fatal("Port must be between [1023,65535]")
} else if err != nil {
log.Fatal(err)
}
ip = args[0]
port, err := strconv.Atoi(args[1])
ip = *iptr
port, err := strconv.Atoi(args[0])
if err != nil {
log.Fatal(err)
}

Binary file not shown.

@ -1,6 +1,8 @@
package main
import (
"net/http"
_ "net/http/pprof"
"flag"
"log"
"os"
@ -19,6 +21,9 @@ func NewListener(s string,ch chan error) (listener, error) {
func main() {
// lets get this bread
// all we need to do is call the reactor coordinator and thats it
go func() {
fmt.Println(http.ListenAndServe("localhost:6060",nil))
}()
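// the blank pprof import above registers its handlers on http.DefaultServeMux,
// so this goroutine serves profiles at http://localhost:6060/debug/pprof/
// (e.g. `go tool pprof http://localhost:6060/debug/pprof/profile`)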
flag.Usage = func() {
w := flag.CommandLine.Output()
fmt.Fprintf(w,"Usage: %s [eth*,wlan*,etc.]\n",os.Args[0])

Binary file not shown.

@ -9,14 +9,17 @@ require (
)
require (
github.com/chzyer/readline v1.5.0 // indirect
github.com/gdamore/encoding v1.0.0 // indirect
github.com/gdamore/tcell/v2 v2.4.1-0.20210905002822-f057f0a857a1 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/pprof v0.0.0-20220608213341-c488b8fa1db3 // indirect
github.com/ianlancetaylor/demangle v0.0.0-20220517205856-0058ec4f073c // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
golang.org/x/net v0.0.0-20201021035429-f5854403a974 // indirect
golang.org/x/sys v0.0.0-20210309074719-68d13333faf2 // indirect
golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b // indirect
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d // indirect
golang.org/x/text v0.3.6 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect

@ -4,6 +4,10 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.2.0/go.mod h1:9+9sk7u7pGNWYMkh0hdiL++6OeibzJccyQU4p4MedaY=
github.com/chzyer/readline v1.5.0 h1:lSwwFrbNviGePhkewF1az4oLmcwqCZijQ2/Wi3BGHAI=
github.com/chzyer/readline v1.5.0/go.mod h1:x22KAscuvRqlLoK9CsoYsmxoXZMMFVyOl86cAH8qUic=
github.com/chzyer/test v0.0.0-20210722231415-061457976a23/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
@ -47,8 +51,12 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/pprof v0.0.0-20220608213341-c488b8fa1db3 h1:mpL/HvfIgIejhVwAfxBQkwEjlhP5o0O9RAeTAjpwzxc=
github.com/google/pprof v0.0.0-20220608213341-c488b8fa1db3/go.mod h1:gSuNB+gJaOiQKLEZ+q+PK9Mq3SOzhRcw2GsGS/FhYDk=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/ianlancetaylor/demangle v0.0.0-20220517205856-0058ec4f073c h1:rwmN+hgiyp8QyBqzdEX43lTjKAxaqCrYHaU5op5P9J8=
github.com/ianlancetaylor/demangle v0.0.0-20220517205856-0058ec4f073c/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY=
github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU=
@ -94,6 +102,9 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210309074719-68d13333faf2 h1:46ULzRKLh1CwgRq2dC5SlBzEqqNCi8rreOZnNrbqcIY=
golang.org/x/sys v0.0.0-20210309074719-68d13333faf2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b h1:2n253B2r0pYSmEV+UNCQoPfU/FiaizQEK5Gu4Bq4JE8=
golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d h1:SZxvLBoTP5yHO3Frd4z4vrF+DBX9vMVanchswa69toE=
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=

@ -70,3 +70,31 @@ func (b *I2CBus) Scan() map[int]bool {
}
return devices
}
func (b *I2CBus) GetStatus(addr int) bool {
b.Lock()
defer b.Unlock()
bus := strconv.Itoa(b.int)
a := strconv.Itoa(addr)
cmd := exec.Command("i2cdetect","-y","-r",bus,a,a)
var out bytes.Buffer
cmd.Stdout = &out
if err := cmd.Run(); err != nil {
log.Fatal(err)
}
outString := out.String()
split := strings.SplitAfter(outString,":")
split = split[1:] // remove garbage header
val := int(addr/16) // row index: e.g. addr = 90 -> 90/16 = 5 (integer division), so it sits in the 5th row
dev := split[val]
lst := strings.Index(dev,"\n")
dev = dev[:lst]
trimmed := strings.Trim(dev," \n")
if strings.Contains(trimmed,"--") {
return false
} else {
return true
}
}

@ -1,7 +1,7 @@
package I2C
import (
_ "fmt"
"fmt"
_ "sync"
)
@ -11,6 +11,11 @@ type I2CDevice struct {
int // addr
}
func (d I2CDevice) String() string {
t := map[int]string{97:"DO Sensor",99:"pH Sensor",102:"Temperature Sensor"}
return t[d.int]
}
func NewDevice(addr int,bus *I2CBus) *I2CDevice {
d := &I2CDevice{}
d.I2CBus = bus
@ -24,10 +29,15 @@ func (d *I2CDevice) GetAddr() int {
func (d *I2CDevice) GetStatus() string {
// TODO
return "Unknown"
s := d.I2CBus.GetStatus(d.int)
if s {
return "ACTIVE"
} else {
return "KILLED"
}
}
func (d *I2CDevice) GetType() string {
// TODO
return "Unknown"
return fmt.Sprint(d)
}

@ -130,6 +130,7 @@ type Device struct {
Addr int32 `protobuf:"varint,1,opt,name=addr,proto3" json:"addr,omitempty"`
Type string `protobuf:"bytes,2,opt,name=type,proto3" json:"type,omitempty"`
Status string `protobuf:"bytes,3,opt,name=status,proto3" json:"status,omitempty"`
Data string `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"`
}
func (x *Device) Reset() {
@ -185,6 +186,13 @@ func (x *Device) GetStatus() string {
return ""
}
func (x *Device) GetData() string {
if x != nil {
return x.Data
}
return ""
}
var File_internal_pkg_grpc_monitoring_proto protoreflect.FileDescriptor
var file_internal_pkg_grpc_monitoring_proto_rawDesc = []byte{
@ -198,14 +206,15 @@ var file_internal_pkg_grpc_monitoring_proto_rawDesc = []byte{
0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x26, 0x0a, 0x07, 0x64,
0x65, 0x76, 0x69, 0x63, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x67,
0x72, 0x70, 0x63, 0x2e, 0x44, 0x65, 0x76, 0x69, 0x63, 0x65, 0x52, 0x07, 0x64, 0x65, 0x76, 0x69,
0x63, 0x65, 0x73, 0x22, 0x48, 0x0a, 0x06, 0x44, 0x65, 0x76, 0x69, 0x63, 0x65, 0x12, 0x12, 0x0a,
0x63, 0x65, 0x73, 0x22, 0x5c, 0x0a, 0x06, 0x44, 0x65, 0x76, 0x69, 0x63, 0x65, 0x12, 0x12, 0x0a,
0x04, 0x61, 0x64, 0x64, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x61, 0x64, 0x64,
0x72, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18,
0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x32, 0x5d, 0x0a,
0x0a, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x4f, 0x0a, 0x14, 0x52,
0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x48, 0x61, 0x6e, 0x64,
0x6c, 0x65, 0x72, 0x12, 0x1a, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74,
0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x12, 0x0a,
0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x61, 0x74,
0x61, 0x32, 0x59, 0x0a, 0x0a, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x12,
0x4b, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74, 0x61,
0x74, 0x75, 0x73, 0x12, 0x1a, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74,
0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
0x1b, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x74,
0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x13, 0x5a, 0x11,
@ -233,8 +242,8 @@ var file_internal_pkg_grpc_monitoring_proto_goTypes = []interface{}{
}
var file_internal_pkg_grpc_monitoring_proto_depIdxs = []int32{
2, // 0: grpc.ReactorStatusResponse.devices:type_name -> grpc.Device
0, // 1: grpc.monitoring.ReactorStatusHandler:input_type -> grpc.ReactorStatusRequest
1, // 2: grpc.monitoring.ReactorStatusHandler:output_type -> grpc.ReactorStatusResponse
0, // 1: grpc.monitoring.GetReactorStatus:input_type -> grpc.ReactorStatusRequest
1, // 2: grpc.monitoring.GetReactorStatus:output_type -> grpc.ReactorStatusResponse
2, // [2:3] is the sub-list for method output_type
1, // [1:2] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name

@ -4,7 +4,7 @@ package grpc;
option go_package = "internal/pkg/grpc";
service monitoring {
rpc ReactorStatusHandler(ReactorStatusRequest) returns (ReactorStatusResponse);
rpc GetReactorStatus(ReactorStatusRequest) returns (ReactorStatusResponse);
}
message ReactorStatusRequest {
@ -20,5 +20,5 @@ message Device {
int32 addr = 1;
string type = 2;
string status = 3;
string data = 4;
}

@ -22,7 +22,7 @@ const _ = grpc.SupportPackageIsVersion7
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type MonitoringClient interface {
ReactorStatusHandler(ctx context.Context, in *ReactorStatusRequest, opts ...grpc.CallOption) (*ReactorStatusResponse, error)
GetReactorStatus(ctx context.Context, in *ReactorStatusRequest, opts ...grpc.CallOption) (*ReactorStatusResponse, error)
}
type monitoringClient struct {
@ -33,9 +33,9 @@ func NewMonitoringClient(cc grpc.ClientConnInterface) MonitoringClient {
return &monitoringClient{cc}
}
func (c *monitoringClient) ReactorStatusHandler(ctx context.Context, in *ReactorStatusRequest, opts ...grpc.CallOption) (*ReactorStatusResponse, error) {
func (c *monitoringClient) GetReactorStatus(ctx context.Context, in *ReactorStatusRequest, opts ...grpc.CallOption) (*ReactorStatusResponse, error) {
out := new(ReactorStatusResponse)
err := c.cc.Invoke(ctx, "/grpc.monitoring/ReactorStatusHandler", in, out, opts...)
err := c.cc.Invoke(ctx, "/grpc.monitoring/GetReactorStatus", in, out, opts...)
if err != nil {
return nil, err
}
@ -46,7 +46,7 @@ func (c *monitoringClient) ReactorStatusHandler(ctx context.Context, in *Reactor
// All implementations must embed UnimplementedMonitoringServer
// for forward compatibility
type MonitoringServer interface {
ReactorStatusHandler(context.Context, *ReactorStatusRequest) (*ReactorStatusResponse, error)
GetReactorStatus(context.Context, *ReactorStatusRequest) (*ReactorStatusResponse, error)
mustEmbedUnimplementedMonitoringServer()
}
@ -54,8 +54,8 @@ type MonitoringServer interface {
type UnimplementedMonitoringServer struct {
}
func (UnimplementedMonitoringServer) ReactorStatusHandler(context.Context, *ReactorStatusRequest) (*ReactorStatusResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ReactorStatusHandler not implemented")
func (UnimplementedMonitoringServer) GetReactorStatus(context.Context, *ReactorStatusRequest) (*ReactorStatusResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetReactorStatus not implemented")
}
func (UnimplementedMonitoringServer) mustEmbedUnimplementedMonitoringServer() {}
@ -70,20 +70,20 @@ func RegisterMonitoringServer(s grpc.ServiceRegistrar, srv MonitoringServer) {
s.RegisterService(&Monitoring_ServiceDesc, srv)
}
func _Monitoring_ReactorStatusHandler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _Monitoring_GetReactorStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ReactorStatusRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(MonitoringServer).ReactorStatusHandler(ctx, in)
return srv.(MonitoringServer).GetReactorStatus(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/grpc.monitoring/ReactorStatusHandler",
FullMethod: "/grpc.monitoring/GetReactorStatus",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MonitoringServer).ReactorStatusHandler(ctx, req.(*ReactorStatusRequest))
return srv.(MonitoringServer).GetReactorStatus(ctx, req.(*ReactorStatusRequest))
}
return interceptor(ctx, in, info, handler)
}
@ -96,8 +96,8 @@ var Monitoring_ServiceDesc = grpc.ServiceDesc{
HandlerType: (*MonitoringServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "ReactorStatusHandler",
Handler: _Monitoring_ReactorStatusHandler_Handler,
MethodName: "GetReactorStatus",
Handler: _Monitoring_GetReactorStatus_Handler,
},
},
Streams: []grpc.StreamDesc{},

@ -2,28 +2,83 @@ package reactor
import (
"sync"
"context"
"log"
"fmt"
"net"
"google.golang.org/grpc"
pb "FRMS/internal/pkg/grpc"
)
// implements grpc handler and device update handler
type SystemUpdates struct {
sync.Mutex
pending map[int]*Dev
}
type Dev struct {
// implements grpc handler and device data aggregator
type DeviceStatus struct {
Addr int
Status string
Type string
Data string
}
// get reactor/device status
func (c *Coordinator) DevStatus(ch chan *DeviceStatus, a int, dm DeviceManager) {
d := &DeviceStatus{Addr:a}
d.Type = dm.GetType()
d.Status = dm.GetStatus()
d.Data = dm.GetData()
ch <-d
}
func (c *Coordinator) GetStatus() []*DeviceStatus {
var wg sync.WaitGroup
devs := []*DeviceStatus{}
statusChan := make(chan *DeviceStatus)
c.Devices.Lock()
for a,dm := range c.Devices.Managers {
wg.Add(1)
go c.DevStatus(statusChan,a,dm)
}
c.Devices.Unlock()
allDone := make(chan struct{})
go func(){
wg.Wait()
allDone <-struct{}{}
}() // once all the statuses are sent we signal all done on the chan
for {
select{
case s:= <-statusChan:
fmt.Printf("%v is %v, ",s.Type,s.Status)
devs = append(devs,s)
wg.Done()
case <-allDone:
fmt.Printf("\n")
return devs
}
}
}
// grpc status update handler
func (c *Coordinator) Register() {
ip := c.hwinfo.Ip
func (s *SystemUpdates) DeviceUpdateHandler(a int, st, t string) {
s.Lock()
defer s.Unlock()
s.pending[a] = &Dev{Status:st,Type:t}
if lis, err := net.Listen("tcp", fmt.Sprintf("%v:0",ip)); err != nil {
log.Fatal(err)
} else {
c.hwinfo.Port = lis.Addr().(*net.TCPAddr).Port
grpcServer := grpc.NewServer()
pb.RegisterMonitoringServer(grpcServer,c)
go grpcServer.Serve(lis)
}
log.Printf("Listening for pings on %v:%v\n",ip,c.hwinfo.Port)
}
func (s *SystemUpdates) GetPendingChanges() (map[int]*Dev, chan bool) {
// calls chld routine to block and return w/ buffer contents
// chan is used to send grpc res
// true = empty buffer
// false = release lock unchanged
res := s.LockAndWait()
func (c *Coordinator) GetReactorStatus(ctx context.Context, ping *pb.ReactorStatusRequest) (*pb.ReactorStatusResponse, error) {
// status request handler
devs := []*pb.Device{}
resp := &pb.ReactorStatusResponse{Id:c.Id,Devices:devs}
devStatus := c.GetStatus()
for _,v := range devStatus {
d := &pb.Device{Addr:int32(v.Addr),Type:v.Type,Status:v.Status,Data:v.Data}
resp.Devices = append(resp.Devices,d)
}
return resp, nil
}

@ -5,7 +5,6 @@ package reactor
import (
"fmt"
"sync"
//"net"
"time"
"math"
"FRMS/internal/pkg/system"
@ -28,7 +27,6 @@ type Coordinator struct {
Err chan error
mu sync.Mutex
Active active
*SystemUpdates
pb.UnimplementedMonitoringServer
}
@ -56,11 +54,15 @@ type hwinfo struct {
type DeviceManagers struct {
Managers map[int]DeviceManager
mu sync.Mutex
sync.Mutex
}
// basic devicemanager struct manipulations
type DeviceManager interface {
Start()
GetType() string
GetStatus() string
GetData() string
}
type I2CDev interface {
@ -69,8 +71,8 @@ type I2CDev interface {
GetType() string
}
func NewDeviceManager(i2c I2CDev, sys *SystemUpdates) DeviceManager {
return sensor.NewDeviceManager(i2c,sys)
func NewDeviceManager(i2c I2CDev) DeviceManager {
return sensor.NewDeviceManager(i2c)
}
type I2CMonitor interface {
@ -90,9 +92,6 @@ func NewCoordinator(ip string,port int,ch chan error) *Coordinator {
c.server = serv
c.hwinfo = &hwinfo{}
c.Type = "reactor" // explicit for client stuff
p := make(map[int]*Dev)
sys := &SystemUpdates{pending:p}
c.SystemUpdates = sys
return c
}
@ -117,10 +116,10 @@ func (c *Coordinator) Start() {
}
// setting up hw stuff
c.hwinfo.Ip = hw.GetIp() //get should prevent empty data
c.hwinfo.Port = hw.GetPort()
c.Id = hw.GetId()
c.Model = hw.GetModel()
c.Bus = hw.GetBus()
c.Register()
go c.Monitor()
go c.Connect()
}
@ -138,11 +137,11 @@ func (c *Coordinator) Monitor() {
}
func (c *Coordinator) DeviceConnect(i2c I2CDev) {
c.Devices.mu.Lock()
defer c.Devices.mu.Unlock()
c.Devices.Lock()
defer c.Devices.Unlock()
addr := i2c.GetAddr()
if dm, exists := c.Devices.Managers[addr]; !exists{
dm := NewDeviceManager(i2c,c.SystemUpdates)
dm := NewDeviceManager(i2c)
c.Devices.Managers[addr] = dm
go dm.Start()
} else {
@ -187,33 +186,7 @@ func (c *Coordinator) Connect() {
c.Err <-errors.New("Failed to reach central server!")
}
}
/*
we shouldnt register any services to coordinator. we can do that later
func (c *Coordinator) Register() error {
fmt.Printf("Listening for pings on %v:%v\n",c.hwinfo.Ip,c.hwinfo.Port)
lis, err := net.Listen("tcp", fmt.Sprintf("%v:%v",c.hwinfo.Ip,c.hwinfo.Port))
if err != nil {
return err
}
grpcServer := grpc.NewServer()
pb.RegisterMonitoringServer(grpcServer, c)
go grpcServer.Serve(lis)
return nil
}
// sensor status stuff
func (c *Coordinator) ReactorStatusHandler(ctx context.Context, ping *pb.ReactorStatusRequest) (*pb.ReactorStatusResponse,error) {
// handler returns reactor id and any changes in devices
chng := c.PendingChanges() // returns map[int]sensor
var devs []*pb.Device
resp := &pb.ReactorStatusResponse{Id:c.Id,Devices:devs}
for a, s := range chng {
resp.Devices = append(resp.Devices,&pb.Device{Addr:int32(a), Type:s.GetType(), Status:s.GetStatus()})
}
return resp, nil
}
*/
func (c *Coordinator) Timeout() int {
c.Active.Lock()
defer c.Active.Unlock()

@ -11,15 +11,10 @@ import (
type Manager struct {
*Dev
I2CDevice
SystemUpdates
*Active
Hb time.Duration
}
type SystemUpdates interface {
DeviceUpdateHandler(int, string, string)
}
type Active struct {
sync.Mutex
bool
@ -40,10 +35,9 @@ type I2CDevice interface {
GetType() string
}
func NewDeviceManager(i2c I2CDevice,sys SystemUpdates) *Manager {
func NewDeviceManager(i2c I2CDevice) *Manager {
m := &Manager{Hb:time.Duration(1*time.Second)}
m.I2CDevice = i2c
m.SystemUpdates = sys
m.Active = &Active{}
m.Dev = &Dev{Addr:i2c.GetAddr(),Type:i2c.GetType(),Status:i2c.GetStatus()}
return m
@ -65,17 +59,28 @@ func (m *Manager) Exit() {
func (m *Manager) Monitor() {
for m.IsActive() {
go m.DeviceStatus()
if m.Status == "KILLED" {
m.Exit()
}
time.Sleep(m.Hb)
}
}
func (m *Manager) DeviceStatus() {
status := m.GetStatus()
if status != m.Status { // changed
go m.DeviceUpdateHandler(m.Addr,status,m.Type)
m.Status = status
func (m *Manager) GetType() string {
return m.Type
}
func (m *Manager) GetStatus() string {
m.Status = m.I2CDevice.GetStatus()
return m.Status
}
func (m *Manager) GetData() string {
return ""
}
func (m *Manager) GetAddr() int {
return m.Addr
}
// atomic activation and deactivation

@ -16,19 +16,19 @@ type Coordinator struct {
}
type Managers struct {
Directory map[uint32](chan<- bool)
Directory map[uint32](chan<- *Client)
sync.Mutex
}
func NewCoordinator(t string,ch chan *Client, err chan error) *Coordinator {
d := make(map[uint32](chan<- bool))
d := make(map[uint32](chan<- *Client))
m := &Managers{Directory:d}
c := &Coordinator{Type: t,IncomingClients: ch,Err:err}
c.Managers = m
return c
}
func FindNewManager(c *Client,ch chan bool, err chan error) {
func FindNewManager(c *Client,ch chan *Client, err chan error) {
switch c.Type {
case "reactor":
NewReactorManager(c,ch,err)
@ -58,13 +58,13 @@ func (c *Coordinator) ClientHandler(cl *Client) {
defer c.Managers.Unlock()
if m, exists := c.Managers.Directory[cl.Id]; exists {
// manager in memory
m <-true
m <-cl
} else {
// create channel and manager
ch := make(chan bool)
ch := make(chan *Client)
FindNewManager(cl, ch,c.Err)
c.Managers.Directory[cl.Id] = ch
// will block until manager is ready
ch <-true
ch <-cl
}
}

@ -16,8 +16,10 @@ import (
type Manager struct {
*Client // gives access to c.Ip c.Id etc
ClientReconnect chan *Client // chan for client reconnect
Hb time.Duration
Active active
Sig chan bool
Err chan error
}
@ -27,19 +29,28 @@ type active struct{
int
}
func NewManager(c *Client, err chan error) *Manager {
hb := time.Duration(5) //hb to
m := &Manager{Hb:hb,Err:err}
func NewManager(c *Client, ch chan *Client, sig chan bool, err chan error) *Manager {
hb := time.Duration(1) //hb to
m := &Manager{Hb:hb,ClientReconnect:ch,Err:err}
m.Client = c
m.Sig = sig
go m.Reconnect()
return m
}
func (m *Manager) Reconnect() {
c := <-m.ClientReconnect
m.Client = c // could contain new ip or port
m.Start()
}
func (m *Manager) Start() {
// establish connection with client and start pinging at set intervals
if !m.Activate() {
// manager already running
m.Err <-errors.New("Manager already running!")
} // if we get here, manager is atomically activated and we can ensure start wont run again
m.Sig <-true
}
func (m *Manager) Exit() {
@ -47,7 +58,8 @@ func (m *Manager) Exit() {
if !m.Deactivate() {
m.Err <-errors.New("Manager already disabled!")
}
fmt.Printf("Manager %v exiting", m.Id)
go m.Reconnect()
m.Sig <-false
}
// reactor manager atomic operations
@ -118,11 +130,11 @@ func (m *Manager) Connect() *grpc.ClientConn{
if code == (5 | 14) { // == unavailable or not found
to := m.Timeout()
if to == 0 {
fmt.Printf("Client not responding, exiting...\n")
fmt.Printf("Client not responding\n")
m.Exit()
return&grpc.ClientConn{}
}
fmt.Printf("gRPC endpoint currently unavailable, retrying in %v ms",to)
fmt.Printf("gRPC endpoint currently unavailable, retrying in %v ms\n",to)
time.Sleep(time.Duration(to) * time.Millisecond)
} else {
fmt.Printf("ERR GRPC: %v\n",code)

@ -15,7 +15,6 @@ import (
type ReactorManager struct {
*Manager
Devs *Devices
ClientConnections <-chan bool
}
type Devices struct {
@ -23,26 +22,24 @@ type Devices struct {
D map[int]Device
}
func NewReactorManager(c *Client,ch chan bool,err chan error) {
func NewReactorManager(c *Client,ch chan *Client,err chan error) {
d := new(Devices)
r := &ReactorManager{Devs:d,ClientConnections:ch}
r.Manager = NewManager(c, err)
go r.Listen()
r := &ReactorManager{Devs:d}
start := make(chan bool)
r.Manager = NewManager(c, ch, start, err)
go r.Listen(start)
}
func (r *ReactorManager) Listen() {
func (r *ReactorManager) Listen(ch chan bool) {
for {
c := <-r.ClientConnections
if c {
sig := <-ch
if sig {
r.Start()
} else {
r.Exit()
}
}
}
func (r *ReactorManager) Start() {
r.Manager.Start()
conn := r.Connect()
go r.Monitor(conn)
}
@ -52,16 +49,16 @@ func (r *ReactorManager) Monitor(conn *grpc.ClientConn) {
client := pb.NewMonitoringClient(conn)
for r.IsActive() {
req := &pb.ReactorStatusRequest{Id:r.Id}
resp, err := client.ReactorStatusHandler(context.Background(),req)
resp, err := client.GetReactorStatus(context.Background(),req)
code := status.Code(err)
if code != 0 { // if != OK
fmt.Printf("Reactor %v down! Exiting manager...", r.Id)
fmt.Printf("Reactor %v down! ", r.Id)
r.Exit()
}
//for _,v := range resp.GetDevices() {
// TODO changed sensors
//}
fmt.Printf("Reactor %v online\n", resp.GetId())
for _,v := range resp.GetDevices() {
fmt.Printf("%v is %v, ",v.GetType(),v.GetStatus())
}
fmt.Print("\n")
time.Sleep(r.Hb * time.Second) // time between sensor pings
}
}

@ -11,7 +11,6 @@ import (
type TUIManager struct {
*Manager // embedded manager for access to methods and client
Sys *KnownReactors
ClientConnections <-chan bool
Err chan error
Hb time.Duration
}
@ -53,28 +52,27 @@ type DeviceManager interface{
GetType() string
}
func NewTUIManager(c *Client,ch chan bool, err chan error) {
func NewTUIManager(c *Client,ch chan *Client, err chan error) {
k := new(KnownReactors)
m := NewManager(c, err)
sig := make(chan bool)
m := NewManager(c, ch, sig, err)
hb := time.Duration(5)
t := &TUIManager{Hb: hb,Sys: k,Err: err, ClientConnections: ch}
t := &TUIManager{Hb: hb,Sys: k,Err: err}
t.Manager = m
go t.Listen()
go t.Listen(sig)
}
func (t *TUIManager) Listen() {
func (t *TUIManager) Listen(sig chan bool) {
for {
c := <-t.ClientConnections
c := <-sig
if c {
t.Start()
} else {
t.Exit()
}
}
}
func (t *TUIManager) Start() {
t.Manager.Start()
//
//conn := t.Conn()
//go t.Monitor(conn)
}

@ -0,0 +1,25 @@
general outline
want:
- reserved ips
- only really matters for server itself
- convenient for the devices but not extremely necessary
- seems to be that now the local network addresses stay the same
- vlan for the server and embedded devices
- server/devices can still access http(s?) port for updates
- just want to close devices ssh port
- way to connect to vlan remotely (vpn tunnel)
- ssh into any device once connected to the server
- subdomain/static ip for server to test with autoconnect?
- can test with reserved ips
- basically endpoint.forelight.com would forward to the provided ip:port combo to allow remote client testing and hardcoded urls for dns resolution
- local dns option?
- ask about laptop dual boot
- fedora on like 50 gigs with a barebones i3wm
- any auth issues?
reserve static ip for the server
mac address
ip address request


@ -452,3 +452,161 @@ I2CCoordinator
- send the i2cdevice struct to embed in rm
- can call interface funcs on the embedded interface
Eureka part 2?
we are writing all of the software libraries which means we should (with some basic cleansing) be able to just send direct database queries
this means some things which could be pros or cons
- each sensor realistically will have its own table for each reactor.
- we can group entries by reactor and look across time stamps (tidy?)
- we can log sql entries as text based backups
- we can use basic string struct with time stamps
- each sensor library will use a common struct and probably just use string formatting (rough sketch below)
- there are some efficiency benefits if we used custom gRPC calls for each db entry
- but we can just leverage a boilerplate call with some extra overhead?
- we still need a way of representing state of components
- reactor is easy and could be kept server side
- sensor needs to be transmitted across rlc
- should default to down if the reactor goes offline (unknown?)
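rough sketch of the common struct + string formatting idea; the names (Reading, insertStmt, the per-reactor table naming) are made up for illustration, and the values would still need the basic cleansing mentioned above before hitting the db:
package sensor

import (
    "fmt"
    "time"
)

// common per-reading struct each sensor library could share
type Reading struct {
    Taken time.Time
    Value string
}

// insertStmt builds the per-reactor, per-sensor table insert with plain string
// formatting; real placeholders/escaping would replace this once auth/cleansing land
func insertStmt(reactorID uint32, sensor string, r Reading) string {
    table := fmt.Sprintf("r%d_%s", reactorID, sensor)
    return fmt.Sprintf("INSERT INTO %s (ts, value) VALUES ('%s', '%s');",
        table, r.Taken.Format(time.RFC3339), r.Value)
}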
direct query approach
pros
- easy to implement
- easy to use interfaces for common libs (compiling efficiency)
- easy to add sensors (use common libs and just make custom string in wrapper)
- can develop logging and db parts as manager funcs
cons
- need unique daemon to parse data on server for state struct
- trusting each sensor to maintain itself
- still need a way of translating state
state problem
it just should be an enumeration
its efficient (could be as little as 4 bits but probably 1 byte) as opposed to a string (len(s) bytes, e.g. "ONLINE" = 6); sketched below
- is this all worth ~1-10? bytes of savings per dev?
- 100 reactors @ ~45 sensors = 45*100 = ~4.5 kb of state or ~36 kb if we use strings
- so maybe?
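minimal sketch of the enum version, reusing status names already floating around this commit (uint8 and the UNKNOWN value are assumptions):
package server

// one byte of state per device instead of a string like "ONLINE"
type DeviceStatus uint8

const (
    UNKNOWN DeviceStatus = iota
    READY
    ACTIVE
    KILLED
)

func (d DeviceStatus) String() string {
    return [...]string{"UNKNOWN", "READY", "ACTIVE", "KILLED"}[d]
}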
more important than memory are network calls
need to update on tui:
- state changes (hopefully less frequent)
- current value (~5 seconds - ~30 minutes)
store both client and server side
- only store actively view data client side to prevent leaks
- store full struct but only serve as request response to prevent extra copies
system struct (rough sketch after the notes below)
- mapping of reactor ids to "Reactor" structs
- reactor is mapping of addr to sensor structs
- sensor struct is basic info
- device type (enum vs string)
- device status (enum vs string)
- device most recent value (int? vs string?)
- if offline last seen time
notes on struct
- should ideally have locks at reactor and sensor level
- use func to return sensor list via lock on reactor
- use func to update sensor list via lock on reactor
- use returned list to parse and request value from each sensor
- use goroutines and channels for efficient operation
- build response via returned structs
- respond to client
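rough sketch of the struct described above, keeping string fields for now (enum vs string still open) and locks at both levels; names are placeholders:
package server

import (
    "sync"
    "time"
)

type System struct {
    sync.Mutex
    Reactors map[uint32]*Reactor // reactor id -> reactor
}

type Reactor struct {
    sync.Mutex
    Sensors map[int]*Sensor // i2c addr -> sensor
}

type Sensor struct {
    Type     string    // device type (enum vs string)
    Status   string    // device status (enum vs string)
    Data     string    // most recent value
    LastSeen time.Time // only meaningful once offline
}

// SensorList copies the sensor pointers out under the reactor lock so callers
// can fan out value requests with goroutines without holding the lock
func (r *Reactor) SensorList() []*Sensor {
    r.Lock()
    defer r.Unlock()
    l := make([]*Sensor, 0, len(r.Sensors))
    for _, s := range r.Sensors {
        l = append(l, s)
    }
    return l
}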
note on tui manager
- ideally should keep simplified current client struct to spawn copies of the update daemons for each sensor
- each daemon should be EXTREMELY lightweight and just feed new data values to the manager
- tuimanager will be responsible for efficiently buffering for tui client requests (sketch below)
- tui pings should be frequent and response should be any data to update
- client side we should be able to essentially overwrite any entries on our response
- simplifies interface
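hypothetical sketch of that buffering: newest value per device wins, and a tui ping just drains whatever changed since the last one (DevKey/UpdateBuffer are made-up names):
package server

import "sync"

type DevKey struct {
    Reactor uint32
    Addr    int
}

type UpdateBuffer struct {
    sync.Mutex
    pending map[DevKey]string // overwritten in place by the per-sensor daemons
}

func NewUpdateBuffer() *UpdateBuffer {
    return &UpdateBuffer{pending: make(map[DevKey]string)}
}

func (u *UpdateBuffer) Push(k DevKey, val string) {
    u.Lock()
    defer u.Unlock()
    u.pending[k] = val // later values simply overwrite earlier ones
}

// Drain hands the pending updates to the ping response and resets the buffer
func (u *UpdateBuffer) Drain() map[DevKey]string {
    u.Lock()
    defer u.Unlock()
    out := u.pending
    u.pending = make(map[DevKey]string)
    return out
}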
data aggregation outline
Starting from sensor
1) specific sensor manager initiates a read of the embedded i2c dev
2) on success read gets logged with the time to the internal txt log (json)
RLC loop:
3) rlc has long running method with sub routines reading each log and adding pending entries to the buffer
- buffer is bounded and routines block when it fills (used to limit rpc reply length; sketched below)
4) on ping buffer is parsed into rpc reply
- send buffered log ids to cleanup routine but dont delete from log yet
5) next req has transaction ids of previous data that have been acked
6) send ids to cleanup process
7) respond with new buffer repeat
RM side:
received data from rlc
1) send reply to data parsing goroutine
parser loop:
1) start a completion listener
2) read each device in reply
3) start goroutine of db daemon for each dev with completion chan
4) once reply is empty can end
db daemon loop:
1) loop over device data entries
2) initiate db connection
3) parse through each data entry and send to db
4) if it was successful, send the transaction id to the completion channel
monitoring rpc loop:
1) listen for completed transaction entries
2) append entries to ack
3) send to rm on ping timer
Data is now in database for all intents and purposes
process ensures that the data is collected
now the data is on the server
6) server sends grpc reply results to a parsing goroutine
7) the parser loops over reply and spawns db daemons to enter info
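minimal sketch of the bounded buffer + ack flow on the rlc side; entry and field names are made up for illustration:
package reactor

import "sync"

type LogEntry struct {
    TxID uint64
    Addr int
    Data string
}

type DataBuffer struct {
    sync.Mutex
    pending map[uint64]LogEntry // keyed by transaction id
    slots   chan struct{}       // bounds what we hold between pings
}

func NewDataBuffer(size int) *DataBuffer {
    return &DataBuffer{
        pending: make(map[uint64]LogEntry),
        slots:   make(chan struct{}, size),
    }
}

// Add blocks when the buffer is full, which caps the rpc reply length
func (b *DataBuffer) Add(e LogEntry) {
    b.slots <- struct{}{}
    b.Lock()
    defer b.Unlock()
    b.pending[e.TxID] = e
}

// Flush copies pending entries into the next reply; nothing is deleted yet
func (b *DataBuffer) Flush() []LogEntry {
    b.Lock()
    defer b.Unlock()
    out := make([]LogEntry, 0, len(b.pending))
    for _, e := range b.pending {
        out = append(out, e)
    }
    return out
}

// Ack drops entries whose ids came back acked in the following request
func (b *DataBuffer) Ack(ids []uint64) {
    b.Lock()
    defer b.Unlock()
    for _, id := range ids {
        if _, ok := b.pending[id]; ok {
            delete(b.pending, id)
            <-b.slots
        }
    }
}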
should we even aggregate data? why would we not just write a db client as part of the rlc and let the sensor managers themselves log
need to focus:
2 major things going on
rlc can do data stuff on the reactor itself and just use the db client
- relies on exposed db endpoint but can just handle auth stuff
- can log locally
rlc also responds to status requests
- queries sensors for status
- adds to and sends reply
- receives these pings <= 5 seconds apart
- should have down detection to kill db actions (sketch below)
- optionally include a "data" string of the most recent reading
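hypothetical sketch of that down detection: the grpc handler stamps every ping and a watchdog cancels the db context if the server goes quiet (PingWatch is a made-up name):
package reactor

import (
    "context"
    "sync"
    "time"
)

type PingWatch struct {
    sync.Mutex
    last time.Time
}

func NewPingWatch() *PingWatch {
    return &PingWatch{last: time.Now()}
}

// Seen would be called at the top of the status request handler
func (p *PingWatch) Seen() {
    p.Lock()
    defer p.Unlock()
    p.last = time.Now()
}

// Watch fires the cancel func once pings stop arriving, which kills any
// db actions derived from that context
func (p *PingWatch) Watch(cancel context.CancelFunc, limit time.Duration) {
    for {
        time.Sleep(limit / 2)
        p.Lock()
        quiet := time.Since(p.last)
        p.Unlock()
        if quiet > limit {
            cancel()
            return
        }
    }
}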
going to focus on status
want system to
init reactors
poll for status
respond with sensor info
view and manage on tui
how?
all structs only in status context
rlc struct
- knows
- connected devs and managers
- is able to
- poll managers for state info
- relies on
- managers for accurate and fast data
- implements data aggregation for rm
dm struct
- knows
- underlying i2c dev interface
- basic device info
- is able to
- respond to rlc status requests
- relies on
- rlc to ask for status
- implements
- status response

Binary file not shown.


Binary file not shown.


@ -0,0 +1,70 @@
package server
import (
"fmt"
"sync"
"log"
)
// this package creates coordinators responsible for keeping track of active clients and invoking managers
type Coordinator struct {
Type string // ["reactor","tui"]
IncomingClients <-chan *Client
*Managers
Err chan error
}
type Managers struct {
Directory map[uint32](chan<- bool)
sync.Mutex
}
func NewCoordinator(t string,ch chan *Client, err chan error) *Coordinator {
d := make(map[uint32](chan<- bool))
m := &Managers{Directory:d}
c := &Coordinator{Type: t,IncomingClients: ch,Err:err}
c.Managers = m
return c
}
func FindNewManager(c *Client,ch chan bool, err chan error) {
switch c.Type {
case "reactor":
NewReactorManager(c,ch,err)
case "tui":
NewTUIManager(c,ch,err)
default:
log.Fatal(fmt.Sprintf("ERROR %v NOT FOUND",c.Type))
}
}
func (c *Coordinator) Start() {
// on start we need to create channel listener
// on each new connection we want to check its id against our mapping
go c.Listen()
}
func (c *Coordinator) Listen() {
for {
cl := <-c.IncomingClients
go c.ClientHandler(cl)
}
}
func (c *Coordinator) ClientHandler(cl *Client) {
// (creates and) notifies manager of client connection
c.Managers.Lock()
defer c.Managers.Unlock()
if m, exists := c.Managers.Directory[cl.Id]; exists {
// manager in memory
m <-true
} else {
// create channel and manager
ch := make(chan bool)
FindNewManager(cl, ch,c.Err)
c.Managers.Directory[cl.Id] = ch
// will block until manager is ready
ch <-true
}
}

@ -0,0 +1,110 @@
package server
import (
"fmt"
"net"
"log"
"sync"
"context"
"FRMS/internal/pkg/system"
"google.golang.org/grpc"
pb "FRMS/internal/pkg/grpc"
)
// the goal here is to set up a gRPC server to respond to client pings with their IP and to establish a new manager for that specific client
// going to rename shit to be more general
type Listener struct { // exporting for easy use in the short term
// Reactor map[uint32]*ReactorManager this will go in eventual "coordinator" struct
Ip string
Port int
*Coordinators
Err chan error
pb.UnimplementedHandshakeServer
}
type Coordinators struct {
Channel map[string](chan<- *Client)
sync.Mutex
}
type Client struct {
Ip string
Port int
Id uint32
Model string
Type string
}
func NewClient(ip, model, t string, port int, id uint32) *Client {
return &Client{Ip:ip,Port:port,Id:id,Model:model,Type:t}
}
func GetIp(e string) (string, error) {
return system.GetIp(e)
}
func NewListener(ifconfig string,ch chan error) (*Listener, error) {
//m := make(map[uint32]*ReactorManager)
var ip string
var err error
if ip, err = GetIp(ifconfig); err != nil {
return &Listener{}, err
}
m := make(map[string](chan<- *Client))
c := &Coordinators{Channel:m}
l := &Listener{Ip:ip,Err:ch}
l.Coordinators = c
return l, nil
}
func (l *Listener) Start() {
// start grpc server and implement receiver
if err := l.Register(); err != nil {
l.Err <- err
}
// listener started and grpc handler registered
fmt.Printf("Started listener on %v:%v\n",l.Ip,l.Port)
}
func (l *Listener) Register() error {
// creates a gRPC service and binds it to our handler
lis, err := net.Listen("tcp", fmt.Sprintf("%v:0",l.Ip)) // by binding to :0 we should get assigned an open port
if err != nil {
return err
}
l.Port = lis.Addr().(*net.TCPAddr).Port // getting the port we were assigned
grpcServer := grpc.NewServer()
pb.RegisterHandshakeServer(grpcServer, l)
go grpcServer.Serve(lis)
return nil
}
func (l *Listener) ClientDiscoveryHandler(ctx context.Context, ping *pb.ClientDiscoveryRequest) (*pb.ClientDiscoveryResponse, error) {
// incoming client ping; may need to spawn a coordinator
c := NewClient(ping.GetIp(),ping.GetModel(),ping.GetClientType(),int(ping.GetPort()),ping.GetId())
fmt.Printf("%v Client %v has connected from %v:%v\n",c.Type,c.Id,c.Ip,c.Port)
go l.ConnectClient(c)
// we dont handle any of the creation logic here, so we just respond true if the request was received
return &pb.ClientDiscoveryResponse{Success:true}, nil
}
func (l *Listener) ConnectClient(c *Client){
// send to reactor coordinator for ease
l.Coordinators.Lock()
defer l.Coordinators.Unlock()
switch c.Type {
case "reactor","tui":
if ch, exists := l.Coordinators.Channel[c.Type]; exists {
ch <-c
} else {
ch := make(chan *Client)
newC := NewCoordinator(c.Type, ch, l.Err)
go newC.Start()
l.Coordinators.Channel[c.Type] = ch
ch <-c
}
default:
log.Fatal("Error! client %v not supported!",c.Type)
}
}

@ -0,0 +1,135 @@
package server
import (
"fmt"
"time"
"math"
"sync"
"errors"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
"google.golang.org/grpc/credentials/insecure"
)
// this package will implement a boilerplate manager
// manager connects to client on start and returns the gRPC connection to make gRPC clients
type Manager struct {
*Client // gives access to c.Ip c.Id etc
Hb time.Duration
Active active
Err chan error
}
type active struct{
sync.Mutex
bool
int
}
func NewManager(c *Client, err chan error) *Manager {
hb := time.Duration(1) //hb to
m := &Manager{Hb:hb,Err:err}
m.Client = c
return m
}
func (m *Manager) Start() {
// establish connection with client and start pinging at set intervals
if !m.Activate() {
// manager already running
m.Err <-errors.New("Manager already running!")
} // if we get here, manager is atomically activated and we can ensure start wont run again
}
func (m *Manager) Exit() {
// exit function to eventually allow saving to configs
if !m.Deactivate() {
m.Err <-errors.New("Manager already disabled!")
}
fmt.Printf("Manager %v exiting\n", m.Id)
}
// reactor manager atomic operations
func (m *Manager) IsActive() bool {
m.Active.Lock()
defer m.Active.Unlock()
return m.Active.bool
}
func (m *Manager) Activate() bool {
m.Active.Lock()
defer m.Active.Unlock()
alive := m.Active.bool
if alive {
return false
} else {
m.Active.bool = true
m.Active.int = 0
return m.Active.bool
}
}
func (m *Manager) Deactivate() bool {
m.Active.Lock()
defer m.Active.Unlock()
alive := m.Active.bool
if alive {
m.Active.bool = false
return true
} else {
return m.Active.bool
}
}
// connection stuff
func (m *Manager) Timeout() int {
// keeps track of and generates a backoff timeout, 5 ms doubling up to ~1.3 s (~2.5 s cumulative)
// returns 0 once the retries are exhausted
m.Active.Lock()
defer m.Active.Unlock()
if m.Active.int < 9 {
v := int(5 * math.Pow(float64(2), float64(m.Active.int)))
m.Active.int += 1
return v
} else {
// exceeded retries
return 0
}
}
func (m *Manager) Connect() *grpc.ClientConn{
// establish initial gRPC connection with client
var opts []grpc.DialOption
var conn *grpc.ClientConn
opts = append(opts,grpc.WithTransportCredentials(insecure.NewCredentials()))
for {
if !m.IsActive() {
fmt.Printf("Aborting connection attempt\n")
return &grpc.ClientConn{}
}
var err error
conn, err = grpc.Dial(fmt.Sprintf("%v:%v",m.Ip,m.Port),opts...)
// begin error handling
code := status.Code(err)
if code != 0 { // != OK
if code == (5 | 14) { // == unavailable or not found
to := m.Timeout()
if to == 0 {
fmt.Printf("Client not responding\n")
m.Exit()
return&grpc.ClientConn{}
}
fmt.Printf("gRPC endpoint currently unavailable, retrying in %v ms\n",to)
time.Sleep(time.Duration(to) * time.Millisecond)
} else {
fmt.Printf("ERR GRPC: %v\n",code)
m.Err <-err
}
}
break;
}
return conn
}

@ -0,0 +1,67 @@
package server
import (
"fmt"
"time"
"context"
"sync"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
pb "FRMS/internal/pkg/grpc"
)
// this package will implement a reactor coordinator and associated go routines
type ReactorManager struct {
*Manager
Devs *Devices
ClientConnections <-chan bool
}
type Devices struct {
mu sync.Mutex
D map[int]Device
}
func NewReactorManager(c *Client,ch chan bool,err chan error) {
d := new(Devices)
r := &ReactorManager{Devs:d,ClientConnections:ch}
r.Manager = NewManager(c, err)
go r.Listen()
}
func (r *ReactorManager) Listen() {
for {
c := <-r.ClientConnections
if c {
r.Start()
} else {
r.Exit()
}
}
}
func (r *ReactorManager) Start() {
r.Manager.Start()
conn := r.Connect()
go r.Monitor(conn)
}
func (r *ReactorManager) Monitor(conn *grpc.ClientConn) {
defer conn.Close()
client := pb.NewMonitoringClient(conn)
for r.IsActive() {
req := &pb.ReactorStatusRequest{Id:r.Id}
resp, err := client.GetReactorStatus(context.Background(),req)
code := status.Code(err)
if code != 0 { // if != OK
fmt.Printf("Reactor %v down! ", r.Id)
r.Exit()
}
for _,v := range resp.GetDevices() {
fmt.Printf("%v is %v, ",v.GetType(),v.GetStatus())
}
fmt.Print("\n")
time.Sleep(r.Hb * time.Second) // time between sensor pings
}
}

@ -0,0 +1,80 @@
package server
import (
"fmt"
"time"
"sync"
)
// implement tui specific manager to be called for each client conn
type TUIManager struct {
*Manager // embedded manager for access to methods and client
Sys *KnownReactors
ClientConnections <-chan bool
Err chan error
Hb time.Duration
}
type KnownReactors struct {
Reactors map[uint32]*Reactor
sync.Mutex
}
type Reactor struct {
Devices map[string]*Device
}
type Device struct {
Status DeviceStatus
Type string
Addr int
}
type DeviceStatus uint32
const (
READY DeviceStatus = iota
ACTIVE
DISABLED
)
func (d DeviceStatus) String() string {
return [...]string{"Ready","Active","Disabled"}[d]
}
func (d Device) String() string {
return fmt.Sprintf("%v is %v at %x",d.Type,d.Status,d.Addr)
}
type DeviceManager interface{
// GetStatus() uint32 UNSUPPORTED: arguable memory benefit, but until we support 100s of sensors across 10s of tui clients im not implementing it
PrintSatus() string
GetType() string
}
func NewTUIManager(c *Client,ch chan bool, err chan error) {
k := new(KnownReactors)
m := NewManager(c, err)
hb := time.Duration(5)
t := &TUIManager{Hb: hb,Sys: k,Err: err, ClientConnections: ch}
t.Manager = m
go t.Listen()
}
func (t *TUIManager) Listen() {
for {
c := <-t.ClientConnections
if c {
t.Start()
} else {
t.Exit()
}
}
}
func (t *TUIManager) Start() {
t.Manager.Start()
//conn := t.Conn()
//go t.Monitor(conn)
}