You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

167 lines
4.2 KiB
Go

3 years ago
package reactor
// file describes reactor level coordinator and associated implementation
import (
"fmt"
3 years ago
"sync"
"net"
3 years ago
"FRMS/internal/pkg/system"
"FRMS/internal/pkg/I2C"
3 years ago
"FRMS/internal/pkg/sensor"
"errors"
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "FRMS/internal/pkg/grpc"
3 years ago
)
type Hardware struct {
Id uint32
Ip string
Bus int
Model string
Port int
3 years ago
}
type HWinfo interface {
Get() (*Hardware, error)
3 years ago
}
type Coordinator struct {
Server string
HW Hardware
Connected <-chan int
Disconnected <-chan int
3 years ago
Sensors map[int]SensorManager
Err chan error
mu sync.Mutex
pb.UnimplementedMonitoringServer
3 years ago
}
3 years ago
type SensorManager interface {
Start()
GetStatus() bool
GetType() string
}
func NewSensorManager(addr int) SensorManager {
return sensor.NewSensorManager(addr)
}
3 years ago
3 years ago
func NewCoordinator(s string,ch chan error) *Coordinator {
c := make(Coordinator{Server:s,Err:ch}
c.Connected = make(<-chan int)
c.Disconnected = make(<-chan int)
return c
3 years ago
}
func NewI2CMonitor(c, dc <-chan int, b int) error {
3 years ago
return I2C.NewMonitor(b)
}
func GetHWInfo() Hwinfo {
3 years ago
return system.NewHWMonitor()
}
func (c *Coordinator) Start() error {
// should discover hwinfo and sensors on its own
// now setting up sensor managers
c.HW, err = GetHWInfo()
if err != nil {
return err
}
err = NewI2CMonitor(c.Connect,c.Disconnect,c.Bus)
if err != nil {
return err
}
fmt.Printf("Reactor at IP addr %v using I2C bus %v\n",c.Ip,c.Bus)
response := c.EstablishConnection()
3 years ago
for response != nil {
fmt.Println("Connection failed!, Retrying")
response = c.EstablishConnection()
3 years ago
}
3 years ago
c.Register()
return nil
}
func (c *Coordinator) Monitor() {
// function to automatically create and destroy sm
for {
select {
case addr := <-c.Connect:
//check if sm exists and add sm for addr
c.Connected(addr)
case addr := <-c.Disconnect:
// kill sm for addr
c.Disconnected(addr)
}
}
}
func (c *Coordinator) Connected(addr int) {
c.mu.Lock()
3 years ago
defer c.mu.Unlock()
sm, exists := c.Sensors[addr]
if !exists {
sm := NewSensorManager(addr)
c.Sensors[addr] = sm
3 years ago
go sm.Start()
} // ignoring case of existing sm
3 years ago
}
func (c *Coordinator) Disconnected(addr int) {
c.mu.Lock()
defer c.mu.Unlock()
sm, exists := c.Sensors[addr]
if exists {
c.Sensors = delete(c.Sensors,addr)
sm.Kill()
} // not dealing with kill requests for not existing sm
func (c *Coordinator) EstablishConnection() error {
3 years ago
// function connects to central server and passes hwinfo
3 years ago
var opts []grpc.DialOption
opts = append(opts,grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.Dial(c.Server,opts...)
if err != nil {
return err
}
defer conn.Close()
client := pb.NewHandshakeClient(conn)
req := &pb.ReactorDiscoveryRequest{Id:c.Id,Ip:c.Ip,Port:c.Port,Model:c.Model}
3 years ago
resp, err := client.ReactorDiscoveryHandler(context.Background(), req)
if err != nil {
return err
}
if resp.GetSuccess() {
fmt.Println("Central server reached")
return nil
} else {
return errors.New("Failed to reach central server!")
}
}
3 years ago
3 years ago
func (c *Coordinator) Register() error {
fmt.Printf("Listing for pings on %v:%v\n",c.Ip,c.Port)
lis, err := net.Listen("tcp", fmt.Sprintf("%v:%v",c.Ip(),c.Port))
3 years ago
if err != nil {
return err
}
grpcServer := grpc.NewServer()
pb.RegisterMonitoringServer(grpcServer, c)
go grpcServer.Serve(lis)
3 years ago
return nil
}
3 years ago
func (c *Coordinator) SensorStatusHandler(ctx context.Context, ping *pb.SensorStatusRequest) (*pb.SensorStatusResponse,error) {
c.mu.Lock()
defer c.mu.Unlock()
var sensors []*pb.SensorStatus
resp := &pb.SensorStatusResponse{Id:c.Id,Sensors:sensors}
3 years ago
for a, s := range c.Sensors {
resp.Sensors = append(resp.Sensors,&pb.SensorStatus{Addr:int32(a), Type:s.GetType(), Status:s.GetStatus()})
}
return resp, nil
}