diff --git a/cmd/reactor/reactor b/cmd/reactor/reactor index e1ebf82..efb8b05 100755 Binary files a/cmd/reactor/reactor and b/cmd/reactor/reactor differ diff --git a/cmd/server/server b/cmd/server/server index 8006ce6..7edb90a 100755 Binary files a/cmd/server/server and b/cmd/server/server differ diff --git a/cmd/tui/main.go b/cmd/tui/main.go index 3a77058..fefee53 100644 --- a/cmd/tui/main.go +++ b/cmd/tui/main.go @@ -13,8 +13,8 @@ type TUI interface { Start() } -func NewTUI(ip string, port int, ch chan error) TUI { - return tui.NewTUI(ip, port, ch) +func NewTUI(ip string, port int, ifconfig string, ch chan error) TUI { + return tui.NewTUI(ip, port, ifconfig, ch) } func main() { @@ -22,11 +22,11 @@ func main() { var err error flag.Usage = func() { w := flag.CommandLine.Output() - fmt.Fprintf(w,"Usage: %s port \n", os.Args[0]) + fmt.Fprintf(w,"Usage: %s port [eth*, wlan*, etc.]\n", os.Args[0]) } iptr := flag.String("i","192.168.100.2","ip address of listener") flag.Parse() - if flag.NArg() != 1 { + if flag.NArg() != 2 { flag.Usage() os.Exit(1) } @@ -37,9 +37,10 @@ func main() { } else if err != nil { log.Fatal(err) } + ifconfig := string(args[1]) ip := *iptr ch := make(chan error) - t := NewTUI(ip,port,ch) + t := NewTUI(ip,port,ifconfig,ch) go t.Start() err = <-ch if err != nil { diff --git a/cmd/tui/tui b/cmd/tui/tui index bbf5018..8c2917b 100755 Binary files a/cmd/tui/tui and b/cmd/tui/tui differ diff --git a/internal/pkg/I2C/device.go b/internal/pkg/I2C/device.go index 69296e9..7981bd9 100644 --- a/internal/pkg/I2C/device.go +++ b/internal/pkg/I2C/device.go @@ -2,13 +2,21 @@ package I2C import ( "fmt" - _ "sync" + "sync" + "time" ) type I2CDevice struct { *I2CBus // embeds bus bool // stores whether dev is currently connected int // addr + Data *data +} + +type data struct { + string + bool + sync.Mutex } func (d I2CDevice) String() string { @@ -20,6 +28,7 @@ func NewDevice(addr int,bus *I2CBus) *I2CDevice { d := &I2CDevice{} d.I2CBus = bus d.int = addr + d.Data = &data{} return d } @@ -31,9 +40,11 @@ func (d *I2CDevice) GetStatus() string { // TODO s := d.I2CBus.GetStatus(d.int) if s { - return "ACTIVE" + d.Data.Active() + return "[green]ACTIVE[white]" } else { - return "KILLED" + d.Data.Killed() + return "[red]KILLED[white]" } } @@ -41,3 +52,28 @@ func (d *I2CDevice) GetType() string { // TODO return fmt.Sprint(d) } + +func (d *I2CDevice) GetData() string { + d.Data.Lock() + defer d.Data.Unlock() + return d.Data.string +} + +func (d *data) Active() { + d.Lock() + defer d.Unlock() + if !d.bool { + d.string = "" + d.bool = true + } +} + +func (d *data) Killed() { + d.Lock() + defer d.Unlock() + if d.bool { + fmt.Println(time.Now().Location()) + d.string = time.Now().Format("Mon at 03:04:05pm MST") + d.bool = false + } +} diff --git a/internal/pkg/I2C/monitor.go b/internal/pkg/I2C/monitor.go index 720820b..07bb585 100644 --- a/internal/pkg/I2C/monitor.go +++ b/internal/pkg/I2C/monitor.go @@ -63,7 +63,7 @@ func (m *I2CMonitor) ConnectDevice(addr int) { m.DevChan <-addr } -func (m *I2CMonitor) GetDevice(addr int) interface{GetAddr() int; GetStatus() string; GetType() string} { +func (m *I2CMonitor) GetDevice(addr int) interface{ GetAddr() int; GetData() string; GetStatus() string; GetType() string } { m.Devices.Lock() defer m.Devices.Unlock() return m.Devices.m[addr] diff --git a/internal/pkg/reactor/monitoring.go b/internal/pkg/reactor/monitoring.go index 1e94135..f8f9612 100644 --- a/internal/pkg/reactor/monitoring.go +++ b/internal/pkg/reactor/monitoring.go @@ -49,7 +49,6 @@ func (c *Coordinator) GetStatus() []*DeviceStatus { devs = append(devs,s) wg.Done() case <-allDone: - fmt.Printf("Devices scaned\n") return devs } } diff --git a/internal/pkg/reactor/rlcoordinator.go b/internal/pkg/reactor/rlcoordinator.go index 4487db8..5fe2e41 100644 --- a/internal/pkg/reactor/rlcoordinator.go +++ b/internal/pkg/reactor/rlcoordinator.go @@ -66,6 +66,7 @@ type DeviceManager interface { type I2CDev interface { GetAddr() int + GetData() string GetStatus() string GetType() string } @@ -76,7 +77,7 @@ func NewDeviceManager(i2c I2CDev) DeviceManager { type I2CMonitor interface { Monitor() - GetDevice(int) interface{ GetStatus() string; GetType() string;GetAddr() int } + GetDevice(int) interface{ GetAddr() int; GetStatus() string; GetData() string; GetType() string} } func NewI2CMonitor(b int,ch chan int) I2CMonitor { diff --git a/internal/pkg/sensor/manager.go b/internal/pkg/sensor/manager.go index f5cd8b4..5350a8e 100644 --- a/internal/pkg/sensor/manager.go +++ b/internal/pkg/sensor/manager.go @@ -4,6 +4,7 @@ import ( _"fmt" "time" "sync" + "strings" _ "FRMS/internal/pkg/I2C" "log" ) @@ -26,6 +27,7 @@ type Dev struct { Addr int Type string Status string // could be more efficient but to hell with it + Data string } type I2CDevice interface { @@ -33,13 +35,14 @@ type I2CDevice interface { GetAddr() int GetStatus() string GetType() string + GetData() string } func NewDeviceManager(i2c I2CDevice) *Manager { m := &Manager{Hb:time.Duration(1*time.Second)} m.I2CDevice = i2c m.Active = &Active{} - m.Dev = &Dev{Addr:i2c.GetAddr(),Type:i2c.GetType(),Status:i2c.GetStatus()} + m.Dev = &Dev{Addr:i2c.GetAddr(),Type:i2c.GetType(),Status:i2c.GetStatus(),Data:i2c.GetData()} return m } @@ -48,7 +51,7 @@ func (m *Manager) Start() { if !m.Activate() { log.Fatal("Manager already running!") } // atomically activated if this runs - go m.Monitor() + // go m.Monitor() } func (m *Manager) Exit() { @@ -57,26 +60,21 @@ func (m *Manager) Exit() { } } -func (m *Manager) Monitor() { - for m.IsActive() { - if m.Status == "KILLED" { - m.Exit() - } - time.Sleep(m.Hb) - } -} - func (m *Manager) GetType() string { return m.Type } func (m *Manager) GetStatus() string { m.Status = m.I2CDevice.GetStatus() + if m.IsActive() && strings.Contains(m.Status,"KILLED") { + m.Exit() + } return m.Status } func (m *Manager) GetData() string { - return "" + m.Data = m.I2CDevice.GetData() + return m.Data } func (m *Manager) GetAddr() int { diff --git a/internal/pkg/server/coordinator.go b/internal/pkg/server/coordinator.go index c49819f..e069862 100644 --- a/internal/pkg/server/coordinator.go +++ b/internal/pkg/server/coordinator.go @@ -2,6 +2,7 @@ package server import ( "sync" + "fmt" ) // this package creates coordinators responsible for keeping track of active clients and invoking managers @@ -52,6 +53,7 @@ func (c *Coordinator) GetManager(cl *Client) GeneralManager { defer c.Managers.Unlock() var exists bool var m GeneralManager + fmt.Printf("client %v\n",cl) if m, exists = c.Managers.Directory[cl.Id]; !exists { // manager in memory m = c.NewManager(cl, c.Err) diff --git a/internal/pkg/server/listener.go b/internal/pkg/server/listener.go index ce2132f..1ee7352 100644 --- a/internal/pkg/server/listener.go +++ b/internal/pkg/server/listener.go @@ -59,6 +59,7 @@ func (l *Listener) Start() { go l.Sys.Start() // listener started and grpc handler registered log.Printf("Started listener on %v:%v\n",l.Ip,l.Port) + fmt.Printf("=========================\n PORT: %v\n=========================\n",l.Port) } func (l *Listener) Register() error { diff --git a/internal/pkg/server/manager.go b/internal/pkg/server/manager.go index 44349a7..1432ea4 100644 --- a/internal/pkg/server/manager.go +++ b/internal/pkg/server/manager.go @@ -1,6 +1,7 @@ package server import ( + "log" "time" "math" "sync" @@ -37,6 +38,7 @@ func (m *Manager) Start(cl *Client) { // manager already running m.Err <-errors.New("Manager already running!") } // if we get here, manager is atomically activated and we can ensure start wont run again + log.Printf("%v Manager (%v) starting!\n",m.Type,m.Id) } func (m *Manager) Exit() { @@ -44,6 +46,7 @@ func (m *Manager) Exit() { if !m.Deactivate() { m.Err <-errors.New("Manager already disabled!") } + log.Printf("%v Manager (%v) exiting!\n",m.Type,m.Id) } // reactor manager atomic operations diff --git a/internal/pkg/server/reactormanager.go b/internal/pkg/server/reactormanager.go index 7e9df4f..6cbee2a 100644 --- a/internal/pkg/server/reactormanager.go +++ b/internal/pkg/server/reactormanager.go @@ -5,7 +5,7 @@ import ( "time" "log" "context" - _ "sync" + "sync" "google.golang.org/grpc" "google.golang.org/grpc/status" "google.golang.org/grpc/credentials/insecure" @@ -18,10 +18,18 @@ type ReactorManager struct { *Manager StatusMon *StatusMonitor DevsMon *StatusMonitor + *devstatus +} + +type devstatus struct { + sync.Mutex + Devs map[uint32]*DeviceInfo } func NewReactorManager(c *Client,sys *SystemViewer,err chan error) GeneralManager { r := &ReactorManager{} + di := make(map[uint32]*DeviceInfo) + r.devstatus = &devstatus{Devs:di} r.Manager = NewManager(err) r.StatusMon = sys.AddReactorSender() r.DevsMon = sys.AddDeviceSender(c.Id) @@ -42,7 +50,7 @@ func (r *ReactorManager) Start(cl *Client) { func (r *ReactorManager) Exit() { r.Manager.Exit() - go r.StatusMon.Send(&DeviceInfo{Id:r.Id,Type:"Reactor",Status:"[red]OFFLINE[white]",Data:fmt.Sprintf(": Last Seen %v",time.Now().Format("Mon at 03:04:05pm"))}) + go r.StatusMon.Send(&DeviceInfo{Id:r.Id,Type:"Reactor",Status:"[red]OFFLINE[white]",Data:fmt.Sprintf("Last Seen %v",time.Now().Format("Mon at 03:04:05pm MST"))}) } func (r *ReactorManager) GetPort() int { @@ -93,13 +101,23 @@ func (r *ReactorManager) Monitor(conn *grpc.ClientConn) { code := status.Code(err) if code != 0 { // if != OK fmt.Printf("Reactor %v down! Code: %v\n", r.Id,code) + r.devstatus.Lock() + for _, d := range r.Devs { + newd := d + newd.Status = "[yellow]UNKOWN[white]" + go r.DevsMon.Send(newd) + } + r.devstatus.Unlock() r.Exit() break; } + r.devstatus.Lock() for _,v := range resp.GetDevices() { d := &DeviceInfo{Id:uint32(v.GetAddr()),Type:v.GetType(),Status:v.GetStatus(),Data:v.GetData()} go r.DevsMon.Send(d) + r.Devs[d.Id] = d } + r.devstatus.Unlock() time.Sleep(r.Hb) // time between sensor pings } } diff --git a/internal/pkg/server/tuimanager.go b/internal/pkg/server/tuimanager.go index 00a36c0..b367199 100644 --- a/internal/pkg/server/tuimanager.go +++ b/internal/pkg/server/tuimanager.go @@ -2,8 +2,8 @@ package server import ( "fmt" - _ "time" - _ "sync" + "time" + "sync" "net" "log" "context" @@ -19,9 +19,17 @@ type TUIManager struct { *SystemViewer Port *port Err chan error + *Timeout *pb.UnimplementedManagementServer } +type Timeout struct { + Alert chan bool + LastSeen time.Time + TO time.Duration + sync.Mutex +} + type port struct { Chan chan int int @@ -31,6 +39,8 @@ func NewTUIManager(ip string, c *Client, sys *SystemViewer, err chan error) Gene m := NewManager(err) t := &TUIManager{Err: err} t.Port = &port{Chan:make(chan int)} + alert := make(chan bool) + t.Timeout = &Timeout{Alert:alert,TO:time.Duration(2500*time.Millisecond)} // short time outs are fine because we will just rejoin t.Manager = m t.SystemViewer = sys t.Ip = ip @@ -39,11 +49,43 @@ func NewTUIManager(ip string, c *Client, sys *SystemViewer, err chan error) Gene func (t *TUIManager) Start(cl *Client) { // + t.PingReset() t.Manager.Start(cl) + go t.Timeoutd() go t.Register() // begin tui server to respond to tui client reqs //go t.Monitor(conn) } +func (t *Timeout) PingReset() { + t.Lock() + defer t.Unlock() + t.LastSeen = time.Now() +} + +func (t *TUIManager) Timeoutd() { + for t.IsActive() { + if sleep, elapsed := t.Elapsed(); elapsed { + // timeout elapsed + t.Exit() + } else { + time.Sleep(sleep) + } + } +} + +func (t *Timeout) Elapsed() (time.Duration, bool) { + t.Lock() + defer t.Unlock() + now := time.Now() + if now.After(t.LastSeen.Add(t.TO)) { + // timeout expired + return 0 * time.Second, true + } else { + sleep := t.LastSeen.Add(t.TO).Sub(now) + return sleep, false + } +} + func (t *TUIManager) Register() { lis, err := net.Listen("tcp", fmt.Sprintf("%v:0",t.Ip)) if err != nil { @@ -66,6 +108,7 @@ func (t *TUIManager) GetPort() int { } func (t *TUIManager) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) (*pb.GetDevicesResponse, error) { + go t.PingReset() rid := req.GetReactorId() devices := []*pb.Dev{} resp := &pb.GetDevicesResponse{ClientId:t.Id,ReactorId:rid,Devices:devices} @@ -85,11 +128,13 @@ func (t *TUIManager) GetDevices(ctx context.Context, req *pb.GetDevicesRequest) } func (t *TUIManager) DeleteReactors(ctx context.Context, req *pb.DeleteReactorRequest) (*pb.DeleteReactorResponse, error) { + go t.PingReset() // return &pb.DeleteReactorResponse{}, nil } func (t *TUIManager) DeleteReactorDevice(ctx context.Context, req *pb.DeleteReactorDeviceRequest) (*pb.DeleteReactorDeviceResponse, error) { + go t.PingReset() // return &pb.DeleteReactorDeviceResponse{}, nil } diff --git a/internal/pkg/tui/client.go b/internal/pkg/tui/client.go index e40d5ba..5c325f4 100644 --- a/internal/pkg/tui/client.go +++ b/internal/pkg/tui/client.go @@ -30,8 +30,8 @@ type Active struct { int } -func NewTUIClient(ip string, port int) *TUIClient { - id, err := system.GetId("eth2") +func NewTUIClient(ip string, port int, ifconfig string) *TUIClient { + id, err := system.GetId(ifconfig) if err != nil { log.Fatal(err) } @@ -120,7 +120,7 @@ func (t *TUIClient) Connect() { func (t *TUIClient) GetDevices(id ...uint32) (map[uint32]*Device, error) { // returns var rid uint32 - if len(id) > 0 { + if len(id) > 0 && id[0] != 0 { rid = id[0] } req := &pb.GetDevicesRequest{ClientId:t.Id,ReactorId:rid} diff --git a/internal/pkg/tui/tui.go b/internal/pkg/tui/tui.go index a6c6633..420beb7 100644 --- a/internal/pkg/tui/tui.go +++ b/internal/pkg/tui/tui.go @@ -5,9 +5,11 @@ import ( "log" "sync" "strconv" + "strings" "time" "github.com/rivo/tview" _ "github.com/gdamore/tcell/v2" + "os" ) type Device struct { @@ -27,14 +29,14 @@ type TUI struct { Err chan error } -func NewTUI(ip string, port int, ch chan error) *TUI { +func NewTUI(ip string, port int, ifconfig string, ch chan error) *TUI { //r := make(map[uint32]*Reactor) t := &TUI{} //l := new(LocalView) //l.Reactors = r //t.LocalView = l t.Err = ch - client := NewTUIClient(ip, port) + client := NewTUIClient(ip, port, ifconfig) t.TUIClient = client return t } @@ -74,16 +76,11 @@ func (t *TUI) Monitor() { select { case reactor := <-t.SelectedReactor: // reactor has been selected in tui, grabbing devs - fmt.Printf("ReactorSelected %v\n",reactor) t.App.QueueUpdateDraw(func() { t.UpdateDevices(reactor) }) -<<<<<<< HEAD - case device : <-t.SelectedDevice: - fmt.Println("DeviceSelected") + case <-t.SelectedDevice: // TODO -======= ->>>>>>> 03b99d8d16e9ca33d05553b749613e30ab9be204 case <-timer: // time to ping for status t.App.QueueUpdateDraw(func() { @@ -96,6 +93,15 @@ func (t *TUI) Monitor() { func (t *TUI) UpdateDevices(r ...uint32) { // get devices for the reactor and update the tui var id uint32 + // see if there is a page being displayed + if name, _ := t.Display.DevicePages.GetFrontPage(); name != "" { + if tmp, err := strconv.ParseUint(name, 10, 32); err != nil { + log.Fatal(err) + } else { + id = uint32(tmp) + } + } + // overwrite if called as a func if len(r) > 0 { id = r[0] } @@ -103,7 +109,7 @@ func (t *TUI) UpdateDevices(r ...uint32) { if err != nil { log.Fatal(err) } - if len(r) > 0 { + if id != 0 { // reactor specificed split devs reactors := make(map[uint32]*Device) devices := make(map[uint32]*Device) @@ -139,9 +145,14 @@ func NewDisplay(rc,dc chan uint32) *Display { d.Flex = tview.NewFlex() lists := make(map[string]*tview.List) d.DeviceList = lists - d.ReactorList = tview.NewList().ShowSecondaryText(false) + d.ReactorList = tview.NewList()//.ShowSecondaryText(false) + d.ReactorList.AddItem("Quit","Press (q) to quit",113,func() { + d.App.Stop() + os.Exit(0) + }) d.DevicePages = tview.NewPages() d.ReactorList.SetTitle("Reactors").SetBorder(true) + d.ReactorList.SetSelectedFunc(d.SelectReactor) d.DevicePages.SetTitle("Devices").SetBorder(true) d.SelectedReactor = rc d.SelectedDevice = dc @@ -160,49 +171,53 @@ func (d *Display) DisplayReactors(r map[uint32]*Device) { //d.Lock() //defer d.Unlock() // locking may break the hell out of this gonna trust tview - for id, reactor := range r { - txt := fmt.Sprintf("%v %v%v", reactor.Id, reactor.Status,reactor.Data) - if d.ReactorList.GetItemCount() > int(reactor.Index) { + for _, reactor := range r { + txt := fmt.Sprintf("%v %v", reactor.Id, reactor.Status) + if d.ReactorList.GetItemCount() > int(reactor.Index) + 1 { d.ReactorList.RemoveItem(int(reactor.Index)) } - d.ReactorList.InsertItem(int(reactor.Index),txt,string(id),rune(49+reactor.Index),nil) + d.ReactorList.InsertItem(int(reactor.Index),txt,reactor.Data,rune(49+reactor.Index),nil) } } func (d *Display) DisplayDevices(devs map[uint32]*Device, rid uint32) { //d.Lock() - reactorPage := string(rid) + reactorPage := strconv.FormatUint(uint64(rid), 10) var reactorList *tview.List - if reactorList, ok := d.DeviceList[reactorPage]; !ok { - reactorList = tview.NewList() + var ok bool + if reactorList, ok = d.DeviceList[reactorPage]; !ok { + reactorList = tview.NewList().ShowSecondaryText(false) d.DeviceList[reactorPage] = reactorList + d.DevicePages.AddPage(reactorPage, reactorList, true, false) } //d.Unlock() for _, dev := range devs { - txt := fmt.Sprintf("%v %v at %x%x",dev.Type,dev.Status,dev.Id,dev.Data) // sensor alive at 0x0 data + txt := fmt.Sprintf("0x%x %v %v",dev.Id,dev.Status,dev.Type) // sensor alive at 0x0 data if reactorList.GetItemCount() > int(dev.Index) { reactorList.RemoveItem(int(dev.Index)) } - reactorList.InsertItem(int(dev.Index),txt,string(dev.Id),rune(49+dev.Index),nil) + reactorList.InsertItem(int(dev.Index),txt,dev.Data,0,nil) } d.DevicePages.SwitchToPage(reactorPage) } -func (d *Display) SelectReactor(index int, main, id string, r rune) { +func (d *Display) SelectReactor(index int, main, data string, r rune) { // called when reactor in list in selected - fmt.Println("SELECTED") - if id, err := strconv.Atoi(id); err != nil { - log.Fatal(err) - } else { - d.SelectedReactor <-uint32(id) + if main != "Quit" { + maintxt := strings.Split(main," ") + id := maintxt[0] + if id, err := strconv.ParseUint(id, 10, 32); err != nil { + log.Fatal(err) + } else { + d.SelectedReactor <-uint32(id) + } } } func (d *Display) SelectDevice(index int, main, id string, r rune) { // called when device is selected in sub menu - fmt.Println("SELECTED DEV") - if id, err := strconv.Atoi(id); err != nil { + if id, err := strconv.ParseUint(id, 10, 32); err != nil { log.Fatal(err) } else { d.SelectedDevice <-uint32(id)