diff --git a/.gitignore b/.gitignore index 18e7f1d..f50a924 100644 --- a/.gitignore +++ b/.gitignore @@ -22,4 +22,4 @@ bin/* # logs *.log -influxdb +tokens diff --git a/cmd/server/admin_token b/cmd/server/admin_token deleted file mode 100644 index 00e2490..0000000 --- a/cmd/server/admin_token +++ /dev/null @@ -1 +0,0 @@ -Zrtg0Q9u65HbFaK4KPWbl9y1xofJwsRHVwuWcIq3xvSOstVbjshDoRNjPiwsz31vIoP-GwDuGL8gzouEHqMuYg== diff --git a/cmd/server/main.go b/cmd/server/main.go index 687de1f..5aa32e7 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -41,14 +41,14 @@ func main() { ch := make(chan error) // creating listener var lport int - var dbport int + //var dbport int if port := os.Getenv("gRPC_PORT"); port == "" { lport = 2022 // default docker port } - if port := os.Getenv("DATABASE_PORT"); port == "" { - dbport = 8086 - } - fmt.Printf("DBPORT %d\n", dbport) + //if port := os.Getenv("DATABASE_PORT"); port == "" { + //dbport = 8086 + //} + //fmt.Printf("DBPORT %d\n", dbport) conf := ReadConfig() fmt.Printf("Found %v %v %v %v\n",conf.GetUrl(),conf.GetBucket(),conf.GetOrg(),conf.GetToken()) fmt.Printf("Listening on %v\n", lport) diff --git a/docker-compose.yml b/docker-compose.yml index c4e1101..75b43f6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,10 +2,16 @@ version: "2.1" services: server: image: server + build: + context: . + dockerfile: Dockerfile.server ports: - "2022:2022" - "2023:2023" + volumes: + - ./logs:/app/log environment: + - LOGTYPE=SERVER - VERBOSE=1 depends_on: - db @@ -14,11 +20,21 @@ services: ports: - "8086:8086" volumes: - - ./influxdb/data:/var/lib/influxdb2 - - ./influxdb/config:/etc/influxdb2 + - influx-data:/var/lib/influxdb2 + - influx-config:/etc/influxdb2 environment: - DOCKER_INFLUXDB_INIT_MODE=setup - DOCKER_INFLUXDB_INIT_USERNAME=${INFLUXDB_USERNAME} - DOCKER_INFLUXDB_INIT_PASSWORD=${INFLUXDB_PASSWORD} - DOCKER_INFLUXDB_INIT_ORG=${INFLUXDB_ORG} - DOCKER_INFLUXDB_INIT_BUCKET=${INFLUXDB_BUCKET} + grafana: + image: grafana/grafana-oss:latest + ports: + - "3000:3000" + volumes: + - grafana-data:/var/lib/grafana +volumes: + grafana-data: + influx-data: + influx-config: diff --git a/internal/pkg/I2C/bus.go b/internal/pkg/I2C/bus.go index 843d1b0..dd95a69 100644 --- a/internal/pkg/I2C/bus.go +++ b/internal/pkg/I2C/bus.go @@ -3,8 +3,9 @@ package I2C // file has general wrappers to interact with i2c-tools import ( - _ "fmt" + "fmt" _ "log" + "encoding/hex" "os/exec" "bytes" "strings" @@ -37,7 +38,7 @@ func (b *I2CBus) Scan() map[int]bool { cmd.Stderr = &errs cmd.Stdout = &out if err := cmd.Run(); err != nil { - logging.Debug(logging.DError, "I2C ERROR: %v", errs.String()) + logging.Debug(logging.DError, "I2C error performing scan. %v", errs.String()) } outString := out.String() @@ -82,9 +83,11 @@ func (b *I2CBus) GetStatus(addr int) bool { a := strconv.Itoa(addr) cmd := exec.Command("i2cdetect","-y","-r",bus,a,a) var out bytes.Buffer + var errs bytes.Buffer + cmd.Stderr = &errs cmd.Stdout = &out if err := cmd.Run(); err != nil { - logging.Debug(logging.DError,"I2C ERROR: %v", err) + logging.Debug(logging.DError,"I2C error getting status! %v", errs.String()) } outString := out.String() @@ -101,3 +104,35 @@ func (b *I2CBus) GetStatus(addr int) bool { return true } } + +func (b *I2CBus) GetData(addr int) string { + b.Lock() + defer b.Unlock() + + bus := strconv.Itoa(b.int) + a := strconv.FormatInt(int64(addr),16) + cmd := exec.Command("i2ctransfer","-y",bus,fmt.Sprintf("r40@0x%s",a)) + var out bytes.Buffer + var errs bytes.Buffer + cmd.Stderr = &errs + cmd.Stdout = &out + if err := cmd.Run(); err != nil { + logging.Debug(logging.DError,"I2C error getting data! %v", errs.String()) + } + + outString := out.String() + split := strings.SplitAfter(outString," ") //getting chars 0x12 0x2f etc + var final string + for _,v := range split { + trimmed := strings.TrimLeft(v, "0x ") // trimming extra bs in front of num + trimmed = strings.TrimRight(trimmed," \n") // trimming back + if trimmed != "ff" { + final += trimmed + } + } + ret, err := hex.DecodeString(final) + if err != nil { + panic(err) + } + return string(ret) +} diff --git a/internal/pkg/I2C/device.go b/internal/pkg/I2C/device.go index 34663c6..0171c7e 100644 --- a/internal/pkg/I2C/device.go +++ b/internal/pkg/I2C/device.go @@ -20,7 +20,7 @@ type data struct { } func (d I2CDevice) String() string { - t := map[int]string{97:"DO Sensor",99:"pH Sensor",102:"Temperature Sensor"} + t := map[int]string{97:"DO Sensor",99:"pH Sensor",102:"Temperature Sensor",64:"DHT11 Sensor"} return t[d.int] } @@ -56,6 +56,7 @@ func (d *I2CDevice) GetType() string { func (d *I2CDevice) GetData() string { d.Data.Lock() defer d.Data.Unlock() + d.Data.string = d.I2CBus.GetData(d.int) return d.Data.string } diff --git a/internal/pkg/config/server.go b/internal/pkg/config/server.go index c5edf3a..1f37ccb 100644 --- a/internal/pkg/config/server.go +++ b/internal/pkg/config/server.go @@ -5,6 +5,7 @@ package config import ( _ "fmt" "github.com/spf13/viper" + "FRMS/internal/pkg/logging" "log" "os/exec" "bytes" @@ -22,22 +23,25 @@ func ReadServerConfig() *serverconfig { viper.SetConfigName("database") viper.SetConfigType("yaml") - viper.AddConfigPath("../../internal/configs") + viper.AddConfigPath("./internal/configs") viper.SetDefault("Orginization","ForeLight") viper.SetDefault("URL","http://localhost:8086") var C serverconfig err := viper.Unmarshal(&C) if err != nil { + logging.Debug(logging.DError,"Cannot unmarshal! %v",err) log.Fatal(err) } if C.Token == "" { // token unset - cmd := exec.Command("cat","admin_token") + logging.Debug(logging.DClient,"CON Grabbing adming token") + cmd := exec.Command("cat","tokens/admin_token") var out bytes.Buffer var stderr bytes.Buffer cmd.Stdout = &out cmd.Stderr = &stderr if err := cmd.Run(); err != nil { + logging.Debug(logging.DError,"CON Error grabbing token %v",err) log.Fatal(err) } outstring := out.String() diff --git a/internal/pkg/influxdb/client.go b/internal/pkg/influxdb/client.go new file mode 100644 index 0000000..36a1f03 --- /dev/null +++ b/internal/pkg/influxdb/client.go @@ -0,0 +1,6 @@ +package influxdb + +import ( + "fmt" + "github.com/influxdata/influxdb-client-go/v2" +) diff --git a/internal/pkg/reactor/monitoring.go b/internal/pkg/reactor/monitoring.go index 44dcfd9..b36707c 100644 --- a/internal/pkg/reactor/monitoring.go +++ b/internal/pkg/reactor/monitoring.go @@ -3,6 +3,10 @@ package reactor import ( "sync" "context" + "strings" + "github.com/influxdata/influxdb-client-go/v2" + "strconv" + "time" //"log" //"fmt" //"net" @@ -28,7 +32,9 @@ func (c *Coordinator) DevStatus(ch chan *DeviceStatus, a int, dm DeviceManager) ch <-d } -func (c *Coordinator) GetStatus() []*pb.Device { +func (c *Coordinator) GetStatus(client influxdb2.Client) []*pb.Device { + // db stuff + api := client.WriteAPIBlocking(c.Org,c.Bucket) var wg sync.WaitGroup devs := []*pb.Device{} statusChan := make(chan *DeviceStatus) @@ -47,6 +53,24 @@ func (c *Coordinator) GetStatus() []*pb.Device { select{ case s:= <-statusChan: //fmt.Printf("%v is %v\n",s.Type,s.Status) + data := strings.Split(s.Data,",") // T:10C,H:102% -> T:10C H:10% + for _, m := range data { + var meas string + splt := strings.Split(m,":") // T 10C or H 10% + if splt[0] == "T" { + meas = "Temperature" + } else if splt[0] == "H" { + meas = "Humidity" + } + val, err := strconv.ParseFloat(strings.Trim(splt[1]," %C\n"), 64) + if err != nil { + panic(err) + } + p := influxdb2.NewPoint("measurements",map[string]string{"type":meas},map[string]interface{}{"val":val},time.Now()) + if err := api.WritePoint(context.Background(), p); err != nil { + panic(err) + } + } devs = append(devs,&pb.Device{Addr:int32(s.Addr),Type:s.Type,Status:s.Status,Data:s.Data}) wg.Done() case <-allDone: @@ -56,9 +80,9 @@ func (c *Coordinator) GetStatus() []*pb.Device { } // grpc status update handler -func (c *Coordinator) Ping() { +func (c *Coordinator) Ping(client influxdb2.Client) { // sends all device status to central coordinator - devs := c.GetStatus() + devs := c.GetStatus(client) req := &pb.ReactorStatusPing{Id:c.Id,Devices:devs} _, err := c.MonitoringClient.ReactorStatusHandler(context.Background(),req) if err != nil { diff --git a/internal/pkg/reactor/rlcoordinator.go b/internal/pkg/reactor/rlcoordinator.go index ee3ef82..a39e6e4 100644 --- a/internal/pkg/reactor/rlcoordinator.go +++ b/internal/pkg/reactor/rlcoordinator.go @@ -16,6 +16,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/status" "google.golang.org/grpc/credentials/insecure" + "github.com/influxdata/influxdb-client-go/v2" pb "FRMS/internal/pkg/grpc" ) @@ -31,9 +32,18 @@ type Coordinator struct { mu sync.Mutex HB time.Duration PingTimer chan struct{} + *DB Active active } +type DB struct { + // struct to hold db connection info + Org string + Bucket string + Token string + URL string +} + type active struct { bool int @@ -89,6 +99,10 @@ func NewCoordinator(ip string,port int,ch chan error) *Coordinator { c.hw = &hw{} c.HB = time.Duration(1 * time.Second) c.PingTimer = make(chan struct{}) + // this is going to be scuffed + url := fmt.Sprintf("http://%s:8086",ip) + fmt.Println(url) + c.DB = &DB{Bucket:"bb",Org:"ForeLight",URL:url,Token:"S1UZssBu6KPfHaQCt34pZFpyc5lzbH9XanYJWCkOI5FqLY7gq205C6FTH-CmugiPH6o2WoKlTkEuPgIfaJjAhw=="} return c } @@ -110,6 +124,9 @@ func (c *Coordinator) Start() { func (c *Coordinator) Monitor() { // function to automatically create and destroy sm + // scuffedaf + client := influxdb2.NewClient(c.URL,c.Token) + defer client.Close() dch := make(chan int) im := NewI2CMonitor(c.Bus,dch) go im.Monitor() @@ -119,7 +136,7 @@ func (c *Coordinator) Monitor() { i := im.GetDevice(d) go c.DeviceConnect(i) case <-c.PingTimer: - go c.Ping() + go c.Ping(client) } } }