diff --git a/debian b/debian new file mode 100755 index 0000000..bba4ca5 Binary files /dev/null and b/debian differ diff --git a/internal/configs/server.yaml b/internal/configs/server.yaml index a53d2cc..e19ac35 100644 --- a/internal/configs/server.yaml +++ b/internal/configs/server.yaml @@ -16,4 +16,3 @@ server: lis: 2022 reactor: 2023 tui: 2024 -server_name: "" diff --git a/internal/pkg/I2C/bus.go b/internal/pkg/I2C/bus.go index dd95a69..5bd755d 100644 --- a/internal/pkg/I2C/bus.go +++ b/internal/pkg/I2C/bus.go @@ -3,136 +3,118 @@ package I2C // file has general wrappers to interact with i2c-tools import ( - "fmt" - _ "log" - "encoding/hex" - "os/exec" - "bytes" - "strings" - "sync" - "strconv" - "FRMS/internal/pkg/logging" -) + "FRMS/internal/pkg/logging" + "FRMS/internal/pkg/system" + "bytes" + "errors" + "fmt" + _ "log" + "os/exec" + "strconv" + "strings" + "sync" -type I2CBus struct { - int - sync.Mutex -} + "github.com/spf13/viper" +) -func NewBus(bus int) *I2CBus { - b := &I2CBus{} - b.int = bus - return b +type I2CClient struct { + Bus int `mapstructure:"bus"` + sync.Mutex } -func (b *I2CBus) Scan() map[int]bool { - /* - Returns all the connected devices - */ - b.Lock() - defer b.Unlock() - bus := strconv.Itoa(b.int) - cmd := exec.Command("i2cdetect", "-y", "-r", bus) - var out bytes.Buffer - var errs bytes.Buffer - cmd.Stderr = &errs - cmd.Stdout = &out - if err := cmd.Run(); err != nil { - logging.Debug(logging.DError, "I2C error performing scan. %v", errs.String()) - } - - outString := out.String() - // could split by \n too - split := strings.SplitAfter(outString,":") - // 1st entry is garbage headers and ending is always \n##: - split = split[1:] - // create empty slice for all the devices - //var devices []i2cdev - devices := map[int]bool{} //maps device addresses to active bool - for i,v := range split { - lst := strings.Index(v,"\n") - trimmed := v[:lst] - trimmed = strings.Trim(trimmed," ") - // trimmed now holds just possible sensor addresses - count := strings.Split(trimmed," ") - for j,d := range count { - // the first row has to be offset by 3 but after its just i*16 + j - offset := 0 - if i == 0 { - offset = 3 - } - addr := i*16 + j + offset - if strings.Contains(d,"--") || strings.Contains(d,"UU") { - // address is unconnected or reserved - //devices = append(devices, I2Cdev{Addr:addr,Active:false}) - devices[addr] = false - } else { - //devices = append(devices, I2Cdev{Addr:addr,Active:true,LastSeen:now}) - devices[addr] = true - } - } - } - return devices +func NewI2CClient(config *viper.Viper) (*I2CClient, error) { + var err error + var bus int + client := &I2CClient{} + if !config.IsSet("i2c.bus") { + // no bus + if bus, err = system.GetBus(); err != nil { + return client, err + } + config.Set("i2c.bus", bus) + } + err = config.UnmarshalKey("i2c", client) + return client, err } -func (b *I2CBus) GetStatus(addr int) bool { - b.Lock() - defer b.Unlock() - - bus := strconv.Itoa(b.int) - a := strconv.Itoa(addr) - cmd := exec.Command("i2cdetect","-y","-r",bus,a,a) - var out bytes.Buffer - var errs bytes.Buffer - cmd.Stderr = &errs - cmd.Stdout = &out - if err := cmd.Run(); err != nil { - logging.Debug(logging.DError,"I2C error getting status! %v", errs.String()) - } +func (b *I2CClient) GetConnected() (map[int]bool, error) { + /* + Returns all the connected devices by address + I can def improve this + */ + b.Lock() + defer b.Unlock() + devices := make(map[int]bool) // only keys + bus := strconv.Itoa(b.Bus) + cmd := exec.Command("i2cdetect", "-y", "-r", bus) + var out bytes.Buffer + var errs bytes.Buffer + cmd.Stderr = &errs + cmd.Stdout = &out + if err := cmd.Run(); err != nil { + logging.Debug(logging.DError, "I2C error performing scan. %v", errs.String()) + return devices, err + } - outString := out.String() - split := strings.SplitAfter(outString,":") - split = split[1:] // remove garbage header - val := int(addr/16) // if addr = 90 90/16 = int(5.6) = 5 will be in 5th row - dev := split[val] - lst := strings.Index(dev,"\n") - dev = dev[:lst] - trimmed := strings.Trim(dev," \n") - if strings.Contains(trimmed,"--") { - return false - } else { - return true - } + outString := out.String() + // could split by \n too + split := strings.SplitAfter(outString, ":") + // 1st entry is garbage headers and ending is always \n##: + split = split[1:] + // create empty slice for all the devices + for i, v := range split { + lst := strings.Index(v, "\n") + trimmed := v[:lst] + trimmed = strings.Trim(trimmed, " ") + // trimmed now holds just possible sensor addresses + count := strings.Split(trimmed, " ") + for j, d := range count { + // the first row has to be offset by 3 but after its just i*16 + j + offset := 0 + if i == 0 { + offset = 3 + } + addr := i*16 + j + offset + if !strings.Contains(d, "--") && !strings.Contains(d, "UU") { + // active + fmt.Printf("Found %d(%x)\n", addr, addr) + devices[addr] = true + } + } + } + return devices, nil } -func (b *I2CBus) GetData(addr int) string { - b.Lock() - defer b.Unlock() - - bus := strconv.Itoa(b.int) - a := strconv.FormatInt(int64(addr),16) - cmd := exec.Command("i2ctransfer","-y",bus,fmt.Sprintf("r40@0x%s",a)) - var out bytes.Buffer - var errs bytes.Buffer - cmd.Stderr = &errs - cmd.Stdout = &out - if err := cmd.Run(); err != nil { - logging.Debug(logging.DError,"I2C error getting data! %v", errs.String()) - } +func (b *I2CClient) SendCmd(addr int, cmd string) (string, error) { + b.Lock() + defer b.Unlock() + /* + bus := strconv.Itoa(b.int) + a := strconv.FormatInt(int64(addr), 16) + cmd := exec.Command("i2ctransfer", "-y", bus, fmt.Sprintf("r40@0x%s", a)) + var out bytes.Buffer + var errs bytes.Buffer + cmd.Stderr = &errs + cmd.Stdout = &out + if err := cmd.Run(); err != nil { + logging.Debug(logging.DError, "I2C error getting data! %v", errs.String()) + } - outString := out.String() - split := strings.SplitAfter(outString," ") //getting chars 0x12 0x2f etc - var final string - for _,v := range split { - trimmed := strings.TrimLeft(v, "0x ") // trimming extra bs in front of num - trimmed = strings.TrimRight(trimmed," \n") // trimming back - if trimmed != "ff" { - final += trimmed - } - } - ret, err := hex.DecodeString(final) - if err != nil { - panic(err) - } - return string(ret) + outString := out.String() + split := strings.SplitAfter(outString, " ") //getting chars 0x12 0x2f etc + var final string + for _, v := range split { + trimmed := strings.TrimLeft(v, "0x ") // trimming extra bs in front of num + trimmed = strings.TrimRight(trimmed, " \n") // trimming back + if trimmed != "ff" { + final += trimmed + } + } + ret, err := hex.DecodeString(final) + if err != nil { + panic(err) + } + return string(ret) + */ + return "", errors.New("NOT IMPLM") } diff --git a/internal/pkg/I2C/device.go b/internal/pkg/I2C/device.go deleted file mode 100644 index a24501e..0000000 --- a/internal/pkg/I2C/device.go +++ /dev/null @@ -1,72 +0,0 @@ -package I2C - -import ( - "fmt" - "sync" - "time" -) - -type I2CDevice struct { - *I2CBus // embeds bus - bool // stores whether dev is currently connected - int // addr - Data *data -} - -type data struct { - string - bool - sync.Mutex -} - -func (d I2CDevice) String() string { - t := map[int]string{97: "DO Sensor", 99: "pH Sensor", 102: "Temperature Sensor", 64: "DHT11 Sensor"} - return t[d.int] -} - -func NewDevice(addr int, bus *I2CBus) *I2CDevice { - d := &I2CDevice{} - d.I2CBus = bus - d.int = addr - d.Data = &data{} - return d -} - -func (d *I2CDevice) GetAddr() int { - return d.int -} - -func (d *I2CDevice) GetStatus() bool { - // TODO - return d.I2CBus.GetStatus(d.int) -} - -func (d *I2CDevice) GetType() string { - // TODO - return fmt.Sprint(d) -} - -func (d *I2CDevice) GetData() string { - d.Data.Lock() - defer d.Data.Unlock() - d.Data.string = d.I2CBus.GetData(d.int) - return d.Data.string -} - -func (d *data) Active() { - d.Lock() - defer d.Unlock() - if !d.bool { - d.string = "" - d.bool = true - } -} - -func (d *data) Killed() { - d.Lock() - defer d.Unlock() - if d.bool { - d.string = time.Now().Format("Mon at 03:04:05pm MST") - d.bool = false - } -} diff --git a/internal/pkg/I2C/monitor.go b/internal/pkg/I2C/monitor.go deleted file mode 100644 index 7aacd32..0000000 --- a/internal/pkg/I2C/monitor.go +++ /dev/null @@ -1,102 +0,0 @@ -package I2C - -import ( - _ "fmt" - "sync" - "time" -) - -/* - i2c monitor implements a long running monitor responsible for sending active devices to the rlc -*/ - -type I2CMonitor struct { - *I2CBus - Devices *devs - DevChan chan int -} - -type devs struct { - sync.Mutex - m map[int]*I2CDevice -} - -func NewMonitor(bus int, ch chan int) *I2CMonitor { - m := &I2CMonitor{} - b := NewBus(bus) - m.I2CBus = b - d := make(map[int]*I2CDevice) - m.Devices = &devs{m: d} - m.DevChan = ch - return m -} - -func (m *I2CMonitor) Update() { - /* - scans bus and adds new active devices - */ - devs := m.Scan() - chng := m.Devices.Parse(m.I2CBus, devs) - for _, d := range chng { - go m.ConnectDevice(d) - } -} - -func (m *I2CMonitor) Monitor() { - // functon that updates the device list and notifies rlc of any changes to sensor composition - s := make(chan struct{}) - t := 5 * time.Second - go func(signal chan struct{}, to time.Duration) { // simple signal func to init scan - for { - signal <- struct{}{} - time.Sleep(to) - } - }(s, t) - - for { - <-s - m.Update() - } -} - -func (m *I2CMonitor) ConnectDevice(addr int) { - m.DevChan <- addr -} - -func (m *I2CMonitor) GetDevice(addr int) interface { - GetAddr() int - GetData() string - GetStatus() bool - GetType() string -} { - m.Devices.Lock() - defer m.Devices.Unlock() - return m.Devices.m[addr] -} - -func (d *devs) Parse(bus *I2CBus, devices map[int]bool) []int { - d.Lock() - defer d.Unlock() - newdevs := []int{} - for addr, status := range devices { - if dev, exists := d.m[addr]; exists { - // device seen - if status != dev.bool { // if device state changed - dev.bool = status - if status { - newdevs = append(newdevs, dev.GetAddr()) - } - } - } else { - // device not seen yet - if status { - // active - newd := NewDevice(addr, bus) - newd.bool = status - d.m[addr] = newd - newdevs = append(newdevs, newd.GetAddr()) - } - } - } - return newdevs -} diff --git a/internal/pkg/grpc/monitoring.proto b/internal/pkg/grpc/monitoring.proto index a96692b..4c9a1f1 100644 --- a/internal/pkg/grpc/monitoring.proto +++ b/internal/pkg/grpc/monitoring.proto @@ -13,8 +13,7 @@ message ReactorStatusResponse { message ReactorStatusPing { int32 id = 1; - repeated Sensor sensors = 2; - repeated Device devices = 3; + repeated Device devices = 2; } enum Status { @@ -23,17 +22,9 @@ enum Status { UNKOWN = 2; // Disconnected } -message Sensor { +message Device { int32 addr = 1; // i2c addr string name = 2; // use readable name, changable Status status = 3; - int32 sampleRate = 4; // in seconds - string data = 5; // open for any sort of format -} - -message Device { - int32 addr = 1; // i2c addr used for ID - string name = 2; // user readable name, changable - Status status = 3; - string data = 4; // any format + map data = 4; // k=v, format } diff --git a/internal/pkg/reactor/devices.go b/internal/pkg/reactor/devices.go new file mode 100644 index 0000000..fca09c2 --- /dev/null +++ b/internal/pkg/reactor/devices.go @@ -0,0 +1,75 @@ +package reactor + +import ( + pb "FRMS/internal/pkg/grpc" + "FRMS/internal/pkg/sensor" + "sync" +) + +type DeviceManager interface { + GetInfo() (*pb.Device, error) + SetStatus(int) error +} + +func NewDeviceManager(addr int, status int) (DeviceManager, error) { + dev := &pb.Device{Addr: int32(addr), Status: pb.Status(int32(status))} + return sensor.NewDeviceManager(dev) +} + +type DeviceCoordinator struct { + Managers map[int]DeviceManager + sync.RWMutex +} + +func NewDeviceCoordinator() *DeviceCoordinator { + dm := &DeviceCoordinator{} + dm.Managers = make(map[int]DeviceManager) + return dm +} + +func (c *DeviceCoordinator) UpdateDevices(active map[int]bool) error { + // update dev status, add new ones + c.Lock() + defer c.Unlock() + + var err error + + for addr, _ := range active { + // loop over devs + if _, ok := c.Managers[addr]; !ok { + // no device + if c.Managers[addr], err = NewDeviceManager(addr, 1); err != nil { + return err + } + } + } + // all devs accounted for + for addr, dm := range c.Managers { + if active[addr] { + err = dm.SetStatus(1) + } else { + err = dm.SetStatus(0) + } + if err != nil { + return err + } + } + return err +} + +func (c *DeviceCoordinator) GetDevices() ([]*pb.Device, error) { + // TODO + c.RLock() + defer c.RUnlock() + + var err error + var devices []*pb.Device + var dev *pb.Device + + for _, dm := range c.Managers { + dev, err = dm.GetInfo() + devices = append(devices, dev) + } + + return devices, err +} diff --git a/internal/pkg/reactor/monitoring.go b/internal/pkg/reactor/monitoring.go index b6cca24..728a800 100644 --- a/internal/pkg/reactor/monitoring.go +++ b/internal/pkg/reactor/monitoring.go @@ -10,101 +10,18 @@ import ( ) // implements grpc handler and device data aggregater handler -type DeviceStatus struct { - Addr int - Status pb.Status // 0 = Dead, 1 = alive, 2 = unkown - Type string - Data string -} - -// get reactor/device status -func (c *Coordinator) DevStatus(ch chan *DeviceStatus, a int, dm DeviceManager) { - d := &DeviceStatus{Addr: a} - d.Type = dm.GetType() - //d.Status = dm.GetStatus() - d.Status = 0 - d.Data = dm.GetData() - ch <- d -} - -func (c *Coordinator) GetStatus() []*pb.Device { - // db stuff - //api := client.WriteAPIBlocking(c.Org, c.Bucket) - //var wg sync.WaitGroup - devs := []*pb.Device{} - return devs - /* - statusChan := make(chan *DeviceStatus) - c.Devices.Lock() - for a, dm := range c.Devices.Managers { - wg.Add(1) - go c.DevStatus(statusChan, a, dm) - } - c.Devices.Unlock() - allDone := make(chan struct{}) - go func() { - wg.Wait() - allDone <- struct{}{} - }() // once all the status are sent we send all done on the chan - for { - select { - case s := <-statusChan: - fmt.Printf("%v is %v\n", s.Type, s.Status) - /* - data := strings.Split(s.Data,",") // T:10C,H:102% -> T:10C H:10% - for _, m := range data { - var meas string - splt := strings.Split(m,":") // T 10C or H 10% - if splt[0] == "T" { - meas = "Temperature" - } else if splt[0] == "H" { - meas = "Humidity" - } - val, err := strconv.ParseFloat(strings.Trim(splt[1]," %C\n"), 64) - if err != nil { - panic(err) - } - p := influxdb2.NewPoint("measurements",map[string]string{"type":meas},map[string]interface{}{"val":val},time.Now()) - if err := api.WritePoint(context.Background(), p); err != nil { - panic(err) - } - } - devs = append(devs,&pb.Device{Addr:int32(s.Addr),Type:s.Type,Status:s.Status,Data:s.Data}) - */ - /* - wg.Done() - case <-allDone: - return devs - } - } - */ -} - // grpc status update handler func (c *Coordinator) Ping() { // sends all device status to central coordinator fmt.Printf("Pinging cc\n") - devs := c.GetStatus() - req := &pb.ReactorStatusPing{Id: int32(c.ID), Devices: devs} - _, err := c.MonitoringClient.ReactorStatusHandler(context.Background(), req) + // get devices + devices, err := c.GetDevices() if err != nil { c.Err <- err - go c.Exit() } -} - -/* -func (c *Coordinator) Register() { - ip := c.hwinfo.Ip - if lis, err := net.Listen("tcp", fmt.Sprintf("%v:0",ip)); err != nil { - log.Fatal(err) - } else { - c.hwinfo.Port = lis.Addr().(*net.TCPAddr).Port - grpcServer := grpc.NewServer() - pb.RegisterMonitoringServer(grpcServer,c) - go grpcServer.Serve(lis) - } - logging.Debug(logging.DStart, "Listening for pings on %v:%v\n",ip,c.hwinfo.Port) + req := &pb.ReactorStatusPing{Id: int32(c.ID), Devices: devices} + if _, err := c.MonitoringClient.ReactorStatusHandler(context.Background(), req); err != nil { + c.Err <- err + } } -*/ diff --git a/internal/pkg/reactor/rlcoordinator.go b/internal/pkg/reactor/rlcoordinator.go index 1816b42..cdec290 100644 --- a/internal/pkg/reactor/rlcoordinator.go +++ b/internal/pkg/reactor/rlcoordinator.go @@ -31,7 +31,15 @@ func NewDBClient(config *viper.Viper) (DBClient, error) { return influxdb.NewDBClient(config) } -// Coordinator == Reactor Level Coordinator +type I2CClient interface { + // simple client to push responsibilites to sensor + GetConnected() (map[int]bool, error) // gets all connected addr + SendCmd(int, string) (string, error) // send cmd, string is return +} + +func NewI2CClient(config *viper.Viper) (I2CClient, error) { + return I2C.NewI2CClient(config) +} type Server struct { // embed @@ -39,26 +47,28 @@ type Server struct { Port int `mapstructure:"port"` } +// Coordinator == Reactor Level Coordinator + type Coordinator struct { Name string `mapstructure:"name,omitempty"` ID int `mapstructure:"id,omitempty"` - Bus int `mapstructure:"bus,omitempty"` Model string `mapstructure:"model,omitempty"` - // server info + // server info embedded Server + // database + Database DBClient + I2C I2CClient // config Config *viper.Viper MonitoringClient pb.MonitoringClient // connected devices - *Devices // struct for locking - *Sensors + *DeviceCoordinator // struct for locking // other stuff and things Err chan error mu sync.Mutex HB time.Duration PingTimer chan struct{} // db client - DB DBClient Active active } @@ -68,72 +78,22 @@ type active struct { sync.Mutex } -type Sensors struct { - Managers map[int]SensorManager - sync.Mutex -} - -type SensorManager interface { - Start() - GetType() string - GetStatus() string - GetData() string -} - -type Devices struct { - Managers map[int]DeviceManager - sync.Mutex -} - -// basic devicemanager struct manipulations - -type DeviceManager interface { - Start() - GetType() string - GetStatus() string - GetData() string -} - -type I2CDev interface { - GetAddr() int - GetData() string - GetStatus() bool - GetType() string -} - -//func NewDeviceManager(i2c I2CDev) DeviceManager { -//return sensor.NewDeviceManager(i2c) -//} - -type I2CMonitor interface { - Monitor() - GetDevice(int) interface { - GetAddr() int - GetStatus() bool - GetData() string - GetType() string - } -} - -func NewI2CMonitor(b int, ch chan int) I2CMonitor { - return I2C.NewMonitor(b, ch) -} - func NewCoordinator(config *viper.Viper, ch chan error) *Coordinator { - // sensor/device manager struct - dm := new(Devices) - dm.Managers = make(map[int]DeviceManager) - sm := new(Sensors) - sm.Managers = make(map[int]SensorManager) - - c := &Coordinator{Err: ch, Devices: dm, Sensors: sm, Config: config} + // coord + c := &Coordinator{Err: ch, Config: config} + c.DeviceCoordinator = NewDeviceCoordinator() + // hb defaults to 5 c.HB = time.Duration(5 * time.Second) // this is going to be scuffed //c.DB = &DB{Bucket: "bb", Org: "ForeLight", URL: url, Token: "S1UZssBu6KPfHaQCt34pZFpyc5lzbH9XanYJWCkOI5FqLY7gq205C6FTH-CmugiPH6o2WoKlTkEuPgIfaJjAhw=="} // setup db var err error - if c.DB, err = NewDBClient(config); err != nil { + if c.Database, err = NewDBClient(config); err != nil { + ch <- err + } + + if c.I2C, err = NewI2CClient(config); err != nil { ch <- err } @@ -153,7 +113,7 @@ func (c *Coordinator) Start() { c.Config.UnmarshalKey("reactor", c) go c.Monitor() go c.Discover() - go c.DB.Start() + go c.Database.Start() } func (c *Coordinator) LoadInfo() error { @@ -178,32 +138,21 @@ func (c *Coordinator) LoadInfo() error { } c.Config.Set("reactor.model", model) } - - // check Bus - if !c.Config.IsSet("reactor.bus") { - // get bus - var bus int - if bus, err = system.GetBus(); err != nil { - return err - } - c.Config.Set("reactor.bus", bus) - } // all good return err } func (c *Coordinator) Monitor() { - // function to automatically create and destroy sm - // scuffedaf - dch := make(chan int) - im := NewI2CMonitor(c.Bus, dch) - go im.Monitor() + // periodically grabs connected devs and updates list for c.IsActive() { select { - case d := <-dch: - i := im.GetDevice(d) - go c.DeviceConnect(i) case <-c.PingTimer: + // check devs and ping + active, err := c.I2C.GetConnected() + if err != nil { + c.Err <- err + } + go c.UpdateDevices(active) go c.Ping() } } @@ -217,20 +166,6 @@ func (c *Coordinator) HeartBeat() { } } -func (c *Coordinator) DeviceConnect(i2c I2CDev) { - c.Devices.Lock() - defer c.Devices.Unlock() - addr := i2c.GetAddr() - fmt.Printf("Device %d (%x) found!\n", addr, addr) - //if dm, exists := c.Devices.Managers[addr]; !exists { - //dm := NewDeviceManager(i2c) - //c.Devices.Managers[addr] = dm - //go dm.Start() - //} else { - //go dm.Start() - //} -} - func (c *Coordinator) Discover() { // sets up connection to central coordiantor conn, err := c.Connect(c.Ip, c.Port) diff --git a/internal/pkg/sensor/do_sensor.go b/internal/pkg/sensor/do_sensor.go index b937220..fb12a9e 100644 --- a/internal/pkg/sensor/do_sensor.go +++ b/internal/pkg/sensor/do_sensor.go @@ -4,7 +4,6 @@ package sensor import ( pb "FRMS/internal/pkg/grpc" - "errors" "sync" "github.com/spf13/viper" @@ -12,31 +11,13 @@ import ( type DOSensorManager struct { // do sensor manager - Sensor *pb.Sensor // for sending/updating + *Manager sync.RWMutex } -func (s *DOSensorManager) SetName(name string) error { - // #TODO - return errors.New("UNIMPL") -} - -func (s *DOSensorManager) SetStatus(status int) error { - // #TODO - return errors.New("UNIMPL") -} - -func (s *DOSensorManager) SetSampleRate(rate int) error { - // #TODO - return errors.New("UNIMPL") -} - -func (s *DOSensorManager) Update(sensor *pb.Sensor, config *viper.Viper) error { +func (s *DOSensorManager) Update(sensor *pb.Device, config *viper.Viper) error { // updates info - s.Lock() - defer s.Unlock() - s.Sensor = sensor - return errors.New("UNIMPL") + return s.Manager.Update(sensor) } func (s *DOSensorManager) String() string { diff --git a/internal/pkg/sensor/manager.go b/internal/pkg/sensor/manager.go index 58de1da..fb5bac1 100644 --- a/internal/pkg/sensor/manager.go +++ b/internal/pkg/sensor/manager.go @@ -1,10 +1,53 @@ package sensor import ( - _ "FRMS/internal/pkg/I2C" - _ "fmt" + pb "FRMS/internal/pkg/grpc" + "errors" + "sync" ) +// base device manager + +type Manager struct { + // base dm + Device *pb.Device // for sending/updating + sync.RWMutex +} + +func (m *Manager) SetName(name string) error { + // #TODO + return errors.New("UNIMPL") +} + +func (m *Manager) SetStatus(status int) error { + // #TODO + return errors.New("UNIMPL") +} + +func (m *Manager) SetParameter(key, val string) error { + // #TODO + return errors.New("UNIMPL") +} + +func (m *Manager) GetParameter(key string) (string, error) { + // #TODO + return "", errors.New("UNIMPL") +} + +func (m *Manager) Update(device *pb.Device) error { + // updates info + m.Lock() + defer m.Unlock() + m.Device = device + return errors.New("UNIMPL") +} + +func (m *Manager) GetInfo() (*pb.Device, error) { + m.RLock() + defer m.RUnlock() + return m.Device, nil +} + /* // I think most of this is unnessecary as hell type Manager struct { diff --git a/internal/pkg/sensor/mappings.go b/internal/pkg/sensor/mappings.go index cfca596..4e65bc8 100644 --- a/internal/pkg/sensor/mappings.go +++ b/internal/pkg/sensor/mappings.go @@ -11,54 +11,37 @@ import ( /* Returns the correct manager for sensor/device */ -type SensorManager interface { - // basic sensor stuff - SetName(string) error // change display name - SetStatus(int) error // update status - SetSampleRate(int) error // update sample rate - Update(*pb.Sensor, *viper.Viper) error // write updates - String() string // printable -} type DeviceManager interface { // basic device stuff SetName(string) error // change displayed name SetStatus(int) error // status change SetParameter(string, string) error // key, val + GetParameter(string) (string, error) // key, val Update(*pb.Device, *viper.Viper) error // write updates + GetInfo() (*pb.Device, error) // gets info String() string // printable } -func NewSensorManager(sensor *pb.Sensor) (SensorManager, error) { - // returns correct sensor manager by ID - var sm SensorManager +func NewDeviceManager(device *pb.Device) (DeviceManager, error) { + // returns correct device manager by ID + m := &Manager{Device: device} + var dm DeviceManager var err error - switch id := sensor.GetAddr(); id { + switch id := device.GetAddr(); id { case 97: // DO - sm = &DOSensorManager{Sensor: sensor} + dm = &DOSensorManager{Manager: m} case 99: // pH - sm = &PHSensorManager{Sensor: sensor} + dm = &PHSensorManager{Manager: m} case 102: // RTD - sm = &RTDSensorManager{Sensor: sensor} - default: - err = errors.New(fmt.Sprintf("Error: sensor id %v unrecognized!", id)) - } - return sm, err -} - -func NewDeviceManager(device *pb.Device) (DeviceManager, error) { - // returns correct device manager by ID - var dm DeviceManager - var err error - - switch id := device.GetAddr(); id { + dm = &RTDSensorManager{Manager: m} case 256: // PWM - dm = &PWMDeviceManager{Device: device} + dm = &PWMDeviceManager{Manager: m} default: err = errors.New(fmt.Sprintf("Error: device id %v unrecognized!", id)) } diff --git a/internal/pkg/sensor/ph_sensor.go b/internal/pkg/sensor/ph_sensor.go index f5bb1aa..03d6588 100644 --- a/internal/pkg/sensor/ph_sensor.go +++ b/internal/pkg/sensor/ph_sensor.go @@ -4,7 +4,6 @@ package sensor import ( pb "FRMS/internal/pkg/grpc" - "errors" "sync" "github.com/spf13/viper" @@ -12,31 +11,13 @@ import ( type PHSensorManager struct { // do sensor manager - Sensor *pb.Sensor // for sending/updating + *Manager sync.RWMutex } -func (s *PHSensorManager) SetName(name string) error { - // #TODO - return errors.New("UNIMPL") -} - -func (s *PHSensorManager) SetStatus(status int) error { - // #TODO - return errors.New("UNIMPL") -} - -func (s *PHSensorManager) SetSampleRate(rate int) error { - // #TODO - return errors.New("UNIMPL") -} - -func (s *PHSensorManager) Update(sensor *pb.Sensor, config *viper.Viper) error { +func (s *PHSensorManager) Update(sensor *pb.Device, config *viper.Viper) error { // updates info - s.Lock() - defer s.Unlock() - s.Sensor = sensor - return errors.New("UNIMPL") + return s.Manager.Update(sensor) } func (s *PHSensorManager) String() string { diff --git a/internal/pkg/sensor/pwm_device.go b/internal/pkg/sensor/pwm_device.go index 619faac..5a6f791 100644 --- a/internal/pkg/sensor/pwm_device.go +++ b/internal/pkg/sensor/pwm_device.go @@ -12,20 +12,10 @@ import ( type PWMDeviceManager struct { // do sensor manager - Device *pb.Device // for sending/updating + *Manager sync.RWMutex } -func (s *PWMDeviceManager) SetName(name string) error { - // #TODO - return errors.New("UNIMPL") -} - -func (s *PWMDeviceManager) SetStatus(status int) error { - // #TODO - return errors.New("UNIMPL") -} - func (s *PWMDeviceManager) SetParameter(key, val string) error { // #TODO return errors.New("UNIMPL") @@ -33,10 +23,7 @@ func (s *PWMDeviceManager) SetParameter(key, val string) error { func (s *PWMDeviceManager) Update(device *pb.Device, config *viper.Viper) error { // updates info - s.Lock() - defer s.Unlock() - s.Device = device - return errors.New("UNIMPL") + return s.Manager.Update(device) } func (s *PWMDeviceManager) String() string { diff --git a/internal/pkg/sensor/rtd_sensor.go b/internal/pkg/sensor/rtd_sensor.go index 2bb40c3..738b4ef 100644 --- a/internal/pkg/sensor/rtd_sensor.go +++ b/internal/pkg/sensor/rtd_sensor.go @@ -4,7 +4,6 @@ package sensor import ( pb "FRMS/internal/pkg/grpc" - "errors" "sync" "github.com/spf13/viper" @@ -12,31 +11,13 @@ import ( type RTDSensorManager struct { // do sensor manager - Sensor *pb.Sensor // for sending/updating + *Manager sync.RWMutex } -func (s *RTDSensorManager) SetName(name string) error { - // #TODO - return errors.New("UNIMPL") -} - -func (s *RTDSensorManager) SetStatus(status int) error { - // #TODO - return errors.New("UNIMPL") -} - -func (s *RTDSensorManager) SetSampleRate(rate int) error { - // #TODO - return errors.New("UNIMPL") -} - -func (s *RTDSensorManager) Update(sensor *pb.Sensor, config *viper.Viper) error { +func (s *RTDSensorManager) Update(sensor *pb.Device, config *viper.Viper) error { // updates info - s.Lock() - defer s.Unlock() - s.Sensor = sensor - return errors.New("UNIMPL") + return s.Manager.Update(sensor) } func (s *RTDSensorManager) String() string { diff --git a/internal/pkg/server/reactormanager.go b/internal/pkg/server/reactormanager.go index a469501..8d610cc 100644 --- a/internal/pkg/server/reactormanager.go +++ b/internal/pkg/server/reactormanager.go @@ -17,17 +17,10 @@ import ( type ReactorManager struct { *Manager // StatusMon *StatusMonitor putting on pause - *ReactorSensors *ReactorDevices Config *viper.Viper // config to update } -type ReactorSensors struct { - // sensor struct - Sensors map[int]SensorManager - sync.RWMutex -} - type ReactorDevices struct { // device struct Devices map[int]DeviceManager @@ -38,9 +31,7 @@ func NewReactorManager(err chan error) *ReactorManager { r := &ReactorManager{} // sub managers dm := make(map[int]DeviceManager) - sm := make(map[int]SensorManager) r.ReactorDevices = &ReactorDevices{Devices: dm} - r.ReactorSensors = &ReactorSensors{Sensors: sm} // core manager r.Manager = NewManager(err) @@ -63,33 +54,21 @@ func (r *ReactorManager) ReactorStatusHandler(ctx context.Context, req *pb.React //go r.PingReset() fmt.Printf("Recieved ping from %v!\n", req.GetId()) // update devices/sensors - go r.UpdateSensors(req.GetSensors()) go r.UpdateDevices(req.GetDevices()) return &pb.ReactorStatusResponse{Id: int32(r.Id)}, nil } -// sensor/device stuff - -type SensorManager interface { - SetName(string) error // change name - SetSampleRate(int) error // change sample rate - Update(*pb.Sensor, *viper.Viper) error // write updates - String() string // printing -} +// device stuff type DeviceManager interface { SetName(string) error // change name + GetParameter(string) (string, error) // key ret val SetParameter(string, string) error // key, val Update(*pb.Device, *viper.Viper) error // write updates String() string // printing } -func NewSensorManager(sens *pb.Sensor) (SensorManager, error) { - // returns a manager struct - return sensor.NewSensorManager(sens) -} - func NewDeviceManager(device *pb.Device) (DeviceManager, error) { // returns a manager struct return sensor.NewDeviceManager(device) @@ -121,29 +100,3 @@ func (r *ReactorDevices) AddDevice(dev *pb.Device, errCh chan error) { } r.Devices[int(dev.GetAddr())] = dm } - -func (r *ReactorManager) UpdateSensors(sensors []*pb.Sensor) { - // pass updates to correct manager - r.ReactorSensors.RLock() // read lock - defer r.ReactorSensors.RUnlock() - for _, sens := range sensors { - // looping over sensors - if sm, ok := r.ReactorSensors.Sensors[int(sens.GetAddr())]; ok { - // sensor manager found - go sm.Update(sens, r.Config) // update sm - } else { - // not found - go r.AddSensor(sens, r.Err) - } - } -} - -func (r *ReactorSensors) AddSensor(sensor *pb.Sensor, errCh chan error) { - r.Lock() // write lock - defer r.Unlock() - sm, err := NewSensorManager(sensor) - if err != nil { - errCh <- err - } - r.Sensors[int(sensor.GetAddr())] = sm -}