|
|
|
@ -1,7 +1,6 @@
|
|
|
|
|
package server
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
//"log"
|
|
|
|
|
pb "FRMS/internal/pkg/grpc"
|
|
|
|
|
"FRMS/internal/pkg/logging"
|
|
|
|
|
"context"
|
|
|
|
@ -11,23 +10,24 @@ import (
|
|
|
|
|
"google.golang.org/grpc"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
Listens on a supplied port and sends incoming clients over a supplied channel
|
|
|
|
|
Waits for a response on that channel to send back to the client with DB credentials
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
type Listener struct { // exporting for easy use in the short term
|
|
|
|
|
// Listener is a struct that listens for incoming clients on a given port
|
|
|
|
|
// and passes them the central coordinator.
|
|
|
|
|
// Implements the gRPC handshake server for clients.
|
|
|
|
|
type Listener struct {
|
|
|
|
|
Port int `mapstructure:"lis"`
|
|
|
|
|
ClientConnections chan *ClientPacket
|
|
|
|
|
Err chan error
|
|
|
|
|
pb.UnimplementedHandshakeServer
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ClientPacket is a uniform type to pass on a channel to the server.
|
|
|
|
|
type ClientPacket struct {
|
|
|
|
|
*Client
|
|
|
|
|
Response chan *ClientResponse
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Client is a struct containing information about the client on
|
|
|
|
|
// the incoming connection.
|
|
|
|
|
type Client struct {
|
|
|
|
|
//Ip string
|
|
|
|
|
//Port int
|
|
|
|
@ -36,6 +36,8 @@ type Client struct {
|
|
|
|
|
Type string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ClientResponse is the database credentials returned from the central
|
|
|
|
|
// coordinator for the given client.
|
|
|
|
|
type ClientResponse struct {
|
|
|
|
|
Port int
|
|
|
|
|
URL string
|
|
|
|
@ -44,41 +46,75 @@ type ClientResponse struct {
|
|
|
|
|
Bucket string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewListener(cch chan *ClientPacket, ech chan error) *Listener {
|
|
|
|
|
l := &Listener{Err: ech, ClientConnections: cch}
|
|
|
|
|
return l
|
|
|
|
|
// NewListener createsa new listener with the given client and error channels
|
|
|
|
|
func NewListener(
|
|
|
|
|
cch chan *ClientPacket,
|
|
|
|
|
ech chan error,
|
|
|
|
|
) (*Listener, error) {
|
|
|
|
|
|
|
|
|
|
return &Listener{
|
|
|
|
|
Err: ech,
|
|
|
|
|
ClientConnections: cch,
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Start activates the listener and kicks off the gRPC binding process
|
|
|
|
|
func (l *Listener) Start() error {
|
|
|
|
|
// start grpc server and implement reciever
|
|
|
|
|
logging.Debug(logging.DStart, "LIS 01 Started client listener")
|
|
|
|
|
return l.Register()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Register creates a net listener on the port and binds a grpc server to it
|
|
|
|
|
// before registering a handshake server.
|
|
|
|
|
func (l *Listener) Register() error {
|
|
|
|
|
// creates a gRPC service and binds it to our handler
|
|
|
|
|
|
|
|
|
|
lis, err := net.Listen("tcp", fmt.Sprintf(":%v", l.Port))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
grpcServer := grpc.NewServer()
|
|
|
|
|
pb.RegisterHandshakeServer(grpcServer, l)
|
|
|
|
|
|
|
|
|
|
go grpcServer.Serve(lis)
|
|
|
|
|
|
|
|
|
|
logging.Debug(logging.DStart, "LIS 01 Registered on port %v", l.Port)
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ClientDiscoveryHandler implements the grpc method which can be called
|
|
|
|
|
// by incoming clients to first make connection to the central
|
|
|
|
|
// coordinator and receive database credentials.
|
|
|
|
|
func (l *Listener) ClientDiscoveryHandler(ctx context.Context, ping *pb.ClientRequest) (*pb.ClientResponse, error) {
|
|
|
|
|
// incoming client ping, notify coord and wait for DB credentials to respond
|
|
|
|
|
c := &Client{Id: int(ping.GetClientId()), Type: ping.GetClientType()}
|
|
|
|
|
logging.Debug(logging.DClient, "LIS %v %v has connected\n", c.Type, c.Id)
|
|
|
|
|
// prepare packet to send to coordinator
|
|
|
|
|
|
|
|
|
|
c := &Client{
|
|
|
|
|
Id: int(ping.GetClientId()),
|
|
|
|
|
Type: ping.GetClientType(),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logging.Debug(logging.DClient, "LIS 01 %v %v has connected\n", c.Type, c.Id)
|
|
|
|
|
|
|
|
|
|
ch := make(chan *ClientResponse)
|
|
|
|
|
p := &ClientPacket{Client: c, Response: ch}
|
|
|
|
|
// blocking
|
|
|
|
|
p := &ClientPacket{
|
|
|
|
|
Client: c,
|
|
|
|
|
Response: ch,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
l.ClientConnections <- p
|
|
|
|
|
|
|
|
|
|
resp := <-ch
|
|
|
|
|
// prepare object to return to client
|
|
|
|
|
db := &pb.Database{URL: resp.URL, ORG: resp.Org, Token: resp.Token, Bucket: resp.Bucket}
|
|
|
|
|
return &pb.ClientResponse{ClientId: uint32(c.Id), ServerPort: uint32(resp.Port), Database: db}, nil
|
|
|
|
|
|
|
|
|
|
db := &pb.Database{
|
|
|
|
|
URL: resp.URL,
|
|
|
|
|
ORG: resp.Org,
|
|
|
|
|
Token: resp.Token,
|
|
|
|
|
Bucket: resp.Bucket,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return &pb.ClientResponse{
|
|
|
|
|
ClientId: uint32(c.Id),
|
|
|
|
|
ServerPort: uint32(resp.Port),
|
|
|
|
|
Database: db,
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|