super crude grafana display but works with hardcoded sensor manager and database client

main
Keegan 2 years ago
parent c5b9953313
commit 44fc6eb74a

2
.gitignore vendored

@ -22,4 +22,4 @@ bin/*
# logs # logs
*.log *.log
influxdb tokens

@ -1 +0,0 @@
Zrtg0Q9u65HbFaK4KPWbl9y1xofJwsRHVwuWcIq3xvSOstVbjshDoRNjPiwsz31vIoP-GwDuGL8gzouEHqMuYg==

@ -41,14 +41,14 @@ func main() {
ch := make(chan error) ch := make(chan error)
// creating listener // creating listener
var lport int var lport int
var dbport int //var dbport int
if port := os.Getenv("gRPC_PORT"); port == "" { if port := os.Getenv("gRPC_PORT"); port == "" {
lport = 2022 // default docker port lport = 2022 // default docker port
} }
if port := os.Getenv("DATABASE_PORT"); port == "" { //if port := os.Getenv("DATABASE_PORT"); port == "" {
dbport = 8086 //dbport = 8086
} //}
fmt.Printf("DBPORT %d\n", dbport) //fmt.Printf("DBPORT %d\n", dbport)
conf := ReadConfig() conf := ReadConfig()
fmt.Printf("Found %v %v %v %v\n",conf.GetUrl(),conf.GetBucket(),conf.GetOrg(),conf.GetToken()) fmt.Printf("Found %v %v %v %v\n",conf.GetUrl(),conf.GetBucket(),conf.GetOrg(),conf.GetToken())
fmt.Printf("Listening on %v\n", lport) fmt.Printf("Listening on %v\n", lport)

@ -2,10 +2,16 @@ version: "2.1"
services: services:
server: server:
image: server image: server
build:
context: .
dockerfile: Dockerfile.server
ports: ports:
- "2022:2022" - "2022:2022"
- "2023:2023" - "2023:2023"
volumes:
- ./logs:/app/log
environment: environment:
- LOGTYPE=SERVER
- VERBOSE=1 - VERBOSE=1
depends_on: depends_on:
- db - db
@ -14,11 +20,21 @@ services:
ports: ports:
- "8086:8086" - "8086:8086"
volumes: volumes:
- ./influxdb/data:/var/lib/influxdb2 - influx-data:/var/lib/influxdb2
- ./influxdb/config:/etc/influxdb2 - influx-config:/etc/influxdb2
environment: environment:
- DOCKER_INFLUXDB_INIT_MODE=setup - DOCKER_INFLUXDB_INIT_MODE=setup
- DOCKER_INFLUXDB_INIT_USERNAME=${INFLUXDB_USERNAME} - DOCKER_INFLUXDB_INIT_USERNAME=${INFLUXDB_USERNAME}
- DOCKER_INFLUXDB_INIT_PASSWORD=${INFLUXDB_PASSWORD} - DOCKER_INFLUXDB_INIT_PASSWORD=${INFLUXDB_PASSWORD}
- DOCKER_INFLUXDB_INIT_ORG=${INFLUXDB_ORG} - DOCKER_INFLUXDB_INIT_ORG=${INFLUXDB_ORG}
- DOCKER_INFLUXDB_INIT_BUCKET=${INFLUXDB_BUCKET} - DOCKER_INFLUXDB_INIT_BUCKET=${INFLUXDB_BUCKET}
grafana:
image: grafana/grafana-oss:latest
ports:
- "3000:3000"
volumes:
- grafana-data:/var/lib/grafana
volumes:
grafana-data:
influx-data:
influx-config:

@ -3,8 +3,9 @@ package I2C
// file has general wrappers to interact with i2c-tools // file has general wrappers to interact with i2c-tools
import ( import (
_ "fmt" "fmt"
_ "log" _ "log"
"encoding/hex"
"os/exec" "os/exec"
"bytes" "bytes"
"strings" "strings"
@ -37,7 +38,7 @@ func (b *I2CBus) Scan() map[int]bool {
cmd.Stderr = &errs cmd.Stderr = &errs
cmd.Stdout = &out cmd.Stdout = &out
if err := cmd.Run(); err != nil { if err := cmd.Run(); err != nil {
logging.Debug(logging.DError, "I2C ERROR: %v", errs.String()) logging.Debug(logging.DError, "I2C error performing scan. %v", errs.String())
} }
outString := out.String() outString := out.String()
@ -82,9 +83,11 @@ func (b *I2CBus) GetStatus(addr int) bool {
a := strconv.Itoa(addr) a := strconv.Itoa(addr)
cmd := exec.Command("i2cdetect","-y","-r",bus,a,a) cmd := exec.Command("i2cdetect","-y","-r",bus,a,a)
var out bytes.Buffer var out bytes.Buffer
var errs bytes.Buffer
cmd.Stderr = &errs
cmd.Stdout = &out cmd.Stdout = &out
if err := cmd.Run(); err != nil { if err := cmd.Run(); err != nil {
logging.Debug(logging.DError,"I2C ERROR: %v", err) logging.Debug(logging.DError,"I2C error getting status! %v", errs.String())
} }
outString := out.String() outString := out.String()
@ -101,3 +104,35 @@ func (b *I2CBus) GetStatus(addr int) bool {
return true return true
} }
} }
func (b *I2CBus) GetData(addr int) string {
b.Lock()
defer b.Unlock()
bus := strconv.Itoa(b.int)
a := strconv.FormatInt(int64(addr),16)
cmd := exec.Command("i2ctransfer","-y",bus,fmt.Sprintf("r40@0x%s",a))
var out bytes.Buffer
var errs bytes.Buffer
cmd.Stderr = &errs
cmd.Stdout = &out
if err := cmd.Run(); err != nil {
logging.Debug(logging.DError,"I2C error getting data! %v", errs.String())
}
outString := out.String()
split := strings.SplitAfter(outString," ") //getting chars 0x12 0x2f etc
var final string
for _,v := range split {
trimmed := strings.TrimLeft(v, "0x ") // trimming extra bs in front of num
trimmed = strings.TrimRight(trimmed," \n") // trimming back
if trimmed != "ff" {
final += trimmed
}
}
ret, err := hex.DecodeString(final)
if err != nil {
panic(err)
}
return string(ret)
}

@ -20,7 +20,7 @@ type data struct {
} }
func (d I2CDevice) String() string { func (d I2CDevice) String() string {
t := map[int]string{97:"DO Sensor",99:"pH Sensor",102:"Temperature Sensor"} t := map[int]string{97:"DO Sensor",99:"pH Sensor",102:"Temperature Sensor",64:"DHT11 Sensor"}
return t[d.int] return t[d.int]
} }
@ -56,6 +56,7 @@ func (d *I2CDevice) GetType() string {
func (d *I2CDevice) GetData() string { func (d *I2CDevice) GetData() string {
d.Data.Lock() d.Data.Lock()
defer d.Data.Unlock() defer d.Data.Unlock()
d.Data.string = d.I2CBus.GetData(d.int)
return d.Data.string return d.Data.string
} }

@ -5,6 +5,7 @@ package config
import ( import (
_ "fmt" _ "fmt"
"github.com/spf13/viper" "github.com/spf13/viper"
"FRMS/internal/pkg/logging"
"log" "log"
"os/exec" "os/exec"
"bytes" "bytes"
@ -22,22 +23,25 @@ func ReadServerConfig() *serverconfig {
viper.SetConfigName("database") viper.SetConfigName("database")
viper.SetConfigType("yaml") viper.SetConfigType("yaml")
viper.AddConfigPath("../../internal/configs") viper.AddConfigPath("./internal/configs")
viper.SetDefault("Orginization","ForeLight") viper.SetDefault("Orginization","ForeLight")
viper.SetDefault("URL","http://localhost:8086") viper.SetDefault("URL","http://localhost:8086")
var C serverconfig var C serverconfig
err := viper.Unmarshal(&C) err := viper.Unmarshal(&C)
if err != nil { if err != nil {
logging.Debug(logging.DError,"Cannot unmarshal! %v",err)
log.Fatal(err) log.Fatal(err)
} }
if C.Token == "" { if C.Token == "" {
// token unset // token unset
cmd := exec.Command("cat","admin_token") logging.Debug(logging.DClient,"CON Grabbing adming token")
cmd := exec.Command("cat","tokens/admin_token")
var out bytes.Buffer var out bytes.Buffer
var stderr bytes.Buffer var stderr bytes.Buffer
cmd.Stdout = &out cmd.Stdout = &out
cmd.Stderr = &stderr cmd.Stderr = &stderr
if err := cmd.Run(); err != nil { if err := cmd.Run(); err != nil {
logging.Debug(logging.DError,"CON Error grabbing token %v",err)
log.Fatal(err) log.Fatal(err)
} }
outstring := out.String() outstring := out.String()

@ -0,0 +1,6 @@
package influxdb
import (
"fmt"
"github.com/influxdata/influxdb-client-go/v2"
)

@ -3,6 +3,10 @@ package reactor
import ( import (
"sync" "sync"
"context" "context"
"strings"
"github.com/influxdata/influxdb-client-go/v2"
"strconv"
"time"
//"log" //"log"
//"fmt" //"fmt"
//"net" //"net"
@ -28,7 +32,9 @@ func (c *Coordinator) DevStatus(ch chan *DeviceStatus, a int, dm DeviceManager)
ch <-d ch <-d
} }
func (c *Coordinator) GetStatus() []*pb.Device { func (c *Coordinator) GetStatus(client influxdb2.Client) []*pb.Device {
// db stuff
api := client.WriteAPIBlocking(c.Org,c.Bucket)
var wg sync.WaitGroup var wg sync.WaitGroup
devs := []*pb.Device{} devs := []*pb.Device{}
statusChan := make(chan *DeviceStatus) statusChan := make(chan *DeviceStatus)
@ -47,6 +53,24 @@ func (c *Coordinator) GetStatus() []*pb.Device {
select{ select{
case s:= <-statusChan: case s:= <-statusChan:
//fmt.Printf("%v is %v\n",s.Type,s.Status) //fmt.Printf("%v is %v\n",s.Type,s.Status)
data := strings.Split(s.Data,",") // T:10C,H:102% -> T:10C H:10%
for _, m := range data {
var meas string
splt := strings.Split(m,":") // T 10C or H 10%
if splt[0] == "T" {
meas = "Temperature"
} else if splt[0] == "H" {
meas = "Humidity"
}
val, err := strconv.ParseFloat(strings.Trim(splt[1]," %C\n"), 64)
if err != nil {
panic(err)
}
p := influxdb2.NewPoint("measurements",map[string]string{"type":meas},map[string]interface{}{"val":val},time.Now())
if err := api.WritePoint(context.Background(), p); err != nil {
panic(err)
}
}
devs = append(devs,&pb.Device{Addr:int32(s.Addr),Type:s.Type,Status:s.Status,Data:s.Data}) devs = append(devs,&pb.Device{Addr:int32(s.Addr),Type:s.Type,Status:s.Status,Data:s.Data})
wg.Done() wg.Done()
case <-allDone: case <-allDone:
@ -56,9 +80,9 @@ func (c *Coordinator) GetStatus() []*pb.Device {
} }
// grpc status update handler // grpc status update handler
func (c *Coordinator) Ping() { func (c *Coordinator) Ping(client influxdb2.Client) {
// sends all device status to central coordinator // sends all device status to central coordinator
devs := c.GetStatus() devs := c.GetStatus(client)
req := &pb.ReactorStatusPing{Id:c.Id,Devices:devs} req := &pb.ReactorStatusPing{Id:c.Id,Devices:devs}
_, err := c.MonitoringClient.ReactorStatusHandler(context.Background(),req) _, err := c.MonitoringClient.ReactorStatusHandler(context.Background(),req)
if err != nil { if err != nil {

@ -16,6 +16,7 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"github.com/influxdata/influxdb-client-go/v2"
pb "FRMS/internal/pkg/grpc" pb "FRMS/internal/pkg/grpc"
) )
@ -31,9 +32,18 @@ type Coordinator struct {
mu sync.Mutex mu sync.Mutex
HB time.Duration HB time.Duration
PingTimer chan struct{} PingTimer chan struct{}
*DB
Active active Active active
} }
type DB struct {
// struct to hold db connection info
Org string
Bucket string
Token string
URL string
}
type active struct { type active struct {
bool bool
int int
@ -89,6 +99,10 @@ func NewCoordinator(ip string,port int,ch chan error) *Coordinator {
c.hw = &hw{} c.hw = &hw{}
c.HB = time.Duration(1 * time.Second) c.HB = time.Duration(1 * time.Second)
c.PingTimer = make(chan struct{}) c.PingTimer = make(chan struct{})
// this is going to be scuffed
url := fmt.Sprintf("http://%s:8086",ip)
fmt.Println(url)
c.DB = &DB{Bucket:"bb",Org:"ForeLight",URL:url,Token:"S1UZssBu6KPfHaQCt34pZFpyc5lzbH9XanYJWCkOI5FqLY7gq205C6FTH-CmugiPH6o2WoKlTkEuPgIfaJjAhw=="}
return c return c
} }
@ -110,6 +124,9 @@ func (c *Coordinator) Start() {
func (c *Coordinator) Monitor() { func (c *Coordinator) Monitor() {
// function to automatically create and destroy sm // function to automatically create and destroy sm
// scuffedaf
client := influxdb2.NewClient(c.URL,c.Token)
defer client.Close()
dch := make(chan int) dch := make(chan int)
im := NewI2CMonitor(c.Bus,dch) im := NewI2CMonitor(c.Bus,dch)
go im.Monitor() go im.Monitor()
@ -119,7 +136,7 @@ func (c *Coordinator) Monitor() {
i := im.GetDevice(d) i := im.GetDevice(d)
go c.DeviceConnect(i) go c.DeviceConnect(i)
case <-c.PingTimer: case <-c.PingTimer:
go c.Ping() go c.Ping(client)
} }
} }
} }

Loading…
Cancel
Save