Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add side input sdkclient and grpc #953

Merged
merged 15 commits into from
Aug 25, 2023
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.9.19
github.com/nats-io/nats.go v1.27.1
github.com/numaproj/numaflow-go v0.4.6-0.20230822054239-88190e94e727
github.com/numaproj/numaflow-go v0.4.6-0.20230824220200-630a5eba1f54
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.37.0
github.com/redis/go-redis/v9 v9.0.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -670,8 +670,8 @@ github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5s
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/numaproj/numaflow-go v0.4.6-0.20230822054239-88190e94e727 h1:m+2sl0pbBvhiiLEXyyslBv0GeWXm/1wpR4PUg0C2xY8=
github.com/numaproj/numaflow-go v0.4.6-0.20230822054239-88190e94e727/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U=
github.com/numaproj/numaflow-go v0.4.6-0.20230824220200-630a5eba1f54 h1:nx77VKeseDKPFHhY4AMecvzhJw8oSEVeisAROufT5dU=
github.com/numaproj/numaflow-go v0.4.6-0.20230824220200-630a5eba1f54/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
Expand Down
95 changes: 95 additions & 0 deletions pkg/sdkclient/sideinput/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package sideinput

import (
"context"
"fmt"
"time"

sideinputpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sideinput/v1"
"github.com/numaproj/numaflow-go/pkg/shared"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/emptypb"
)

// client contains the grpc connection and the grpc client.
type client struct {
conn *grpc.ClientConn
grpcClt sideinputpb.SideInputClient
}

var _ Client = (*client)(nil)

// New creates a new client object.
func New(inputOptions ...Option) (*client, error) {
var opts = &options{
sockAddr: shared.SideInputAddr,
maxMessageSize: 1024 * 1024 * 64, // 64 MB
}
for _, inputOption := range inputOptions {
inputOption(opts)
}
_, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()
c := new(client)
sockAddr := fmt.Sprintf("%s:%s", shared.UDS, opts.sockAddr)
conn, err := grpc.Dial(sockAddr, grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(opts.maxMessageSize), grpc.MaxCallSendMsgSize(opts.maxMessageSize)))
if err != nil {
return nil, fmt.Errorf("failed to execute grpc.Dial(%q): %w", sockAddr, err)
}
c.conn = conn
c.grpcClt = sideinputpb.NewSideInputClient(conn)
return c, nil
}

// NewFromClient creates a new client object from a grpc client. This is used for testing.
func NewFromClient(c sideinputpb.SideInputClient) (Client, error) {
whynowy marked this conversation as resolved.
Show resolved Hide resolved
return &client{
grpcClt: c,
}, nil
}

// CloseConn closes the grpc connection.
func (c client) CloseConn(ctx context.Context) error {
return c.conn.Close()
}

// IsReady checks if the grpc connection is ready to use.
func (c client) IsReady(ctx context.Context, in *emptypb.Empty) (bool, error) {
resp, err := c.grpcClt.IsReady(ctx, in)
if err != nil {
return false, err
}
return resp.GetReady(), nil
}

// RetrieveSideInput retrieves the side input value and returns the updated payload.
func (c client) RetrieveSideInput(ctx context.Context, in *emptypb.Empty) (*sideinputpb.SideInputResponse, error) {
retrieveResponse, err := c.grpcClt.RetrieveSideInput(ctx, in)
// TODO check which error to use
if err != nil {
return nil, fmt.Errorf("failed to execute c.grpcClt.RetrieveSideInput(): %w", err)
}
return retrieveResponse, nil
}

// IsHealthy checks if the client is healthy.
func (c client) IsHealthy(ctx context.Context) error {
return c.WaitUntilReady(ctx)
}

// WaitUntilReady waits until the client is connected.
func (c client) WaitUntilReady(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return fmt.Errorf("failed on readiness check: %w", ctx.Err())
default:
if _, err := c.IsReady(ctx, &emptypb.Empty{}); err == nil {
return nil
}
time.Sleep(1 * time.Second)
}
}
}
86 changes: 86 additions & 0 deletions pkg/sdkclient/sideinput/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package sideinput

import (
"bytes"
"context"
"fmt"
"reflect"
"testing"

"github.com/golang/mock/gomock"
sideinputpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sideinput/v1"
"github.com/numaproj/numaflow-go/pkg/apis/proto/sideinput/v1/sideinputmock"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
)

type rpcMsg struct {
msg proto.Message
}

func (r *rpcMsg) Matches(msg interface{}) bool {
m, ok := msg.(proto.Message)
if !ok {
return false
}
return proto.Equal(m, r.msg)
}

func (r *rpcMsg) String() string {
return fmt.Sprintf("is %s", r.msg)
}

func TestIsReady(t *testing.T) {
var ctx = context.Background()
LintCleanCall()

ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockClient := sideinputmock.NewMockSideInputClient(ctrl)
mockClient.EXPECT().IsReady(gomock.Any(), gomock.Any()).Return(&sideinputpb.ReadyResponse{Ready: true}, nil)
mockClient.EXPECT().IsReady(gomock.Any(), gomock.Any()).Return(&sideinputpb.ReadyResponse{Ready: false}, fmt.Errorf("mock connection refused"))

testClient, err := NewFromClient(mockClient)
assert.NoError(t, err)
reflect.DeepEqual(testClient, &client{
grpcClt: mockClient,
})

ready, err := testClient.IsReady(ctx, &emptypb.Empty{})
assert.True(t, ready)
assert.NoError(t, err)

ready, err = testClient.IsReady(ctx, &emptypb.Empty{})
assert.False(t, ready)
assert.EqualError(t, err, "mock connection refused")
}

func TestRetrieveFn(t *testing.T) {
var ctx = context.Background()

ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockSideInputClient := sideinputmock.NewMockSideInputClient(ctrl)
response := sideinputpb.SideInputResponse{Value: []byte("mock side input message")}
mockSideInputClient.EXPECT().RetrieveSideInput(gomock.Any(), gomock.Any()).Return(&sideinputpb.SideInputResponse{Value: []byte("mock side input message")}, nil)

testClient, err := NewFromClient(mockSideInputClient)
assert.NoError(t, err)
reflect.DeepEqual(testClient, &client{
grpcClt: mockSideInputClient,
})

got, err := testClient.RetrieveSideInput(ctx, &emptypb.Empty{})
assert.True(t, bytes.Equal(got.Value, response.Value))
assert.NoError(t, err)
}

// Check if there is a better way to resolve
func LintCleanCall() {
var m = rpcMsg{}
fmt.Println(m.Matches(m))
fmt.Println(m)
}
15 changes: 15 additions & 0 deletions pkg/sdkclient/sideinput/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package sideinput

import (
"context"

sideinputpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sideinput/v1"
"google.golang.org/protobuf/types/known/emptypb"
)

// Client contains methods to call a gRPC client for side input.
type Client interface {
CloseConn(ctx context.Context) error
IsReady(ctx context.Context, in *emptypb.Empty) (bool, error)
RetrieveSideInput(ctx context.Context, in *emptypb.Empty) (*sideinputpb.SideInputResponse, error)
}
49 changes: 49 additions & 0 deletions pkg/sdkclient/sideinput/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
Copyright 2022 The Numaproj Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sideinput

import "time"

type options struct {
sockAddr string
maxMessageSize int
sideInputTimeout time.Duration
}

// Option is the interface to apply options.
type Option func(*options)

// WithSockAddr start the client with the given sock addr. This is mainly used for testing purpose.
func WithSockAddr(addr string) Option {
return func(opts *options) {
opts.sockAddr = addr
}
}

// WithMaxMessageSize sets the max message size to the given size.
func WithMaxMessageSize(size int) Option {
return func(o *options) {
o.maxMessageSize = size
}
}

// WithSideInputTimeout sets the side input timeout to the given timeout.
func WithSideInputTimeout(t time.Duration) Option {
return func(o *options) {
o.sideInputTimeout = t
}
}
17 changes: 9 additions & 8 deletions pkg/sideinputs/initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ func NewSideInputsInitializer(isbSvcType dfv1.ISBSvcType, pipelineName, sideInpu
// and update the values on the disk. This would exit once all the side inputs are initialized.
func (sii *sideInputsInitializer) Run(ctx context.Context) error {
var (
natsClient *jsclient.NATSClient
err error
natsClient *jsclient.NATSClient
err error
sideInputWatcher kvs.KVWatcher
)

log := logging.FromContext(ctx)
Expand All @@ -75,15 +76,15 @@ func (sii *sideInputsInitializer) Run(ctx context.Context) error {
return err
}
defer natsClient.Close()
// Load the required KV bucket and create a sideInputWatcher for it
kvName := isbsvc.JetStreamSideInputsStoreKVName(sii.sideInputsStore)
sideInputWatcher, err = jetstream.NewKVJetStreamKVWatch(ctx, kvName, natsClient)
if err != nil {
return fmt.Errorf("failed to create a sideInputWatcher, %w", err)
}
default:
return fmt.Errorf("unrecognized isbsvc type %q", sii.isbSvcType)
}
// Load the required KV bucket and create a sideInputWatcher for it
kvName := isbsvc.JetStreamSideInputsStoreKVName(sii.sideInputsStore)
sideInputWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, kvName, natsClient)
if err != nil {
return fmt.Errorf("failed to create a sideInputWatcher, %w", err)
}
return startSideInputInitializer(ctx, sideInputWatcher, dfv1.PathSideInputsMount, sii.sideInputs)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sideinputs/initializer/initializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,14 @@ func TestSideInputsInitializer_Success(t *testing.T) {

for x, sideInput := range sideInputs {
p := path.Join(mountPath, sideInput)
fileData, err := utils.FetchSideInputFile(p)
fileData, err := utils.FetchSideInputFileValue(p)
for err != nil {
select {
case <-ctx.Done():
t.Fatalf("Context timeout")
default:
time.Sleep(10 * time.Millisecond)
fileData, err = utils.FetchSideInputFile(p)
fileData, err = utils.FetchSideInputFileValue(p)
}
}
assert.Equal(t, dataTest[x], string(fileData))
Expand Down
Loading
Loading