diff --git a/internal/pkg/server/coordinator.go b/internal/pkg/server/coordinator.go index f4c8ea3..1df9b7e 100644 --- a/internal/pkg/server/coordinator.go +++ b/internal/pkg/server/coordinator.go @@ -83,7 +83,12 @@ func NewCentralCoordinator(config *viper.Viper, ch chan error) *CentralCoordinat func (c *CentralCoordinator) Start() { clientChan := make(chan *ClientPacket) - l := NewListener(clientChan, c.Err) + + l, err := NewListener(clientChan, c.Err) + + if err != nil { + c.Err <- err + } c.Config.UnmarshalKey("server.ports", l) diff --git a/internal/pkg/server/listener.go b/internal/pkg/server/listener.go index 3f9dc0d..c68356c 100644 --- a/internal/pkg/server/listener.go +++ b/internal/pkg/server/listener.go @@ -1,7 +1,6 @@ package server import ( - //"log" pb "FRMS/internal/pkg/grpc" "FRMS/internal/pkg/logging" "context" @@ -11,23 +10,24 @@ import ( "google.golang.org/grpc" ) -/* -Listens on a supplied port and sends incoming clients over a supplied channel -Waits for a response on that channel to send back to the client with DB credentials -*/ - -type Listener struct { // exporting for easy use in the short term +// Listener is a struct that listens for incoming clients on a given port +// and passes them the central coordinator. +// Implements the gRPC handshake server for clients. +type Listener struct { Port int `mapstructure:"lis"` ClientConnections chan *ClientPacket Err chan error pb.UnimplementedHandshakeServer } +// ClientPacket is a uniform type to pass on a channel to the server. type ClientPacket struct { *Client Response chan *ClientResponse } +// Client is a struct containing information about the client on +// the incoming connection. type Client struct { //Ip string //Port int @@ -36,6 +36,8 @@ type Client struct { Type string } +// ClientResponse is the database credentials returned from the central +// coordinator for the given client. type ClientResponse struct { Port int URL string @@ -44,41 +46,75 @@ type ClientResponse struct { Bucket string } -func NewListener(cch chan *ClientPacket, ech chan error) *Listener { - l := &Listener{Err: ech, ClientConnections: cch} - return l +// NewListener createsa new listener with the given client and error channels +func NewListener( + cch chan *ClientPacket, + ech chan error, +) (*Listener, error) { + + return &Listener{ + Err: ech, + ClientConnections: cch, + }, nil } +// Start activates the listener and kicks off the gRPC binding process func (l *Listener) Start() error { - // start grpc server and implement reciever logging.Debug(logging.DStart, "LIS 01 Started client listener") return l.Register() } +// Register creates a net listener on the port and binds a grpc server to it +// before registering a handshake server. func (l *Listener) Register() error { - // creates a gRPC service and binds it to our handler + lis, err := net.Listen("tcp", fmt.Sprintf(":%v", l.Port)) if err != nil { return err } + grpcServer := grpc.NewServer() pb.RegisterHandshakeServer(grpcServer, l) + go grpcServer.Serve(lis) + logging.Debug(logging.DStart, "LIS 01 Registered on port %v", l.Port) + return nil } +// ClientDiscoveryHandler implements the grpc method which can be called +// by incoming clients to first make connection to the central +// coordinator and receive database credentials. func (l *Listener) ClientDiscoveryHandler(ctx context.Context, ping *pb.ClientRequest) (*pb.ClientResponse, error) { - // incoming client ping, notify coord and wait for DB credentials to respond - c := &Client{Id: int(ping.GetClientId()), Type: ping.GetClientType()} - logging.Debug(logging.DClient, "LIS %v %v has connected\n", c.Type, c.Id) - // prepare packet to send to coordinator + + c := &Client{ + Id: int(ping.GetClientId()), + Type: ping.GetClientType(), + } + + logging.Debug(logging.DClient, "LIS 01 %v %v has connected\n", c.Type, c.Id) + ch := make(chan *ClientResponse) - p := &ClientPacket{Client: c, Response: ch} - // blocking + p := &ClientPacket{ + Client: c, + Response: ch, + } + l.ClientConnections <- p + resp := <-ch - // prepare object to return to client - db := &pb.Database{URL: resp.URL, ORG: resp.Org, Token: resp.Token, Bucket: resp.Bucket} - return &pb.ClientResponse{ClientId: uint32(c.Id), ServerPort: uint32(resp.Port), Database: db}, nil + + db := &pb.Database{ + URL: resp.URL, + ORG: resp.Org, + Token: resp.Token, + Bucket: resp.Bucket, + } + + return &pb.ClientResponse{ + ClientId: uint32(c.Id), + ServerPort: uint32(resp.Port), + Database: db, + }, nil } diff --git a/internal/pkg/server/listener_test.go b/internal/pkg/server/listener_test.go new file mode 100644 index 0000000..ae5d770 --- /dev/null +++ b/internal/pkg/server/listener_test.go @@ -0,0 +1,17 @@ +package server + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +// TestNewListener tries to create a new listener +func TestNewListener(t *testing.T) { + assert := assert.New(t) + + cch := make(chan *ClientPacket) + ech := make(chan error) + _, err := NewListener(cch, ech) + assert.Equal(err, nil, "creating listener failed") +}