Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add side input sdkclient and grpc #953

Merged
merged 15 commits into from
Aug 25, 2023
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.9.19
github.com/nats-io/nats.go v1.27.1
github.com/numaproj/numaflow-go v0.4.6-0.20230822054239-88190e94e727
github.com/numaproj/numaflow-go v0.4.6-0.20230824220200-630a5eba1f54
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.37.0
github.com/redis/go-redis/v9 v9.0.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -670,8 +670,8 @@ github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5s
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/numaproj/numaflow-go v0.4.6-0.20230822054239-88190e94e727 h1:m+2sl0pbBvhiiLEXyyslBv0GeWXm/1wpR4PUg0C2xY8=
github.com/numaproj/numaflow-go v0.4.6-0.20230822054239-88190e94e727/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U=
github.com/numaproj/numaflow-go v0.4.6-0.20230824220200-630a5eba1f54 h1:nx77VKeseDKPFHhY4AMecvzhJw8oSEVeisAROufT5dU=
github.com/numaproj/numaflow-go v0.4.6-0.20230824220200-630a5eba1f54/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
Expand Down
75 changes: 75 additions & 0 deletions pkg/sdkclient/sideinput/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package sideinput

import (
"context"
"fmt"
"time"

sideinputpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sideinput/v1"
"github.com/numaproj/numaflow-go/pkg/shared"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/emptypb"
)

// client contains the grpc connection and the grpc client.
type client struct {
conn *grpc.ClientConn
grpcClt sideinputpb.SideInputClient
}

var _ Client = (*client)(nil)

// New creates a new client object.
func New(inputOptions ...Option) (*client, error) {
var opts = &options{
sockAddr: shared.SideInputAddr,
maxMessageSize: 1024 * 1024 * 64, // 64 MB
}
for _, inputOption := range inputOptions {
inputOption(opts)
}
_, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()
c := new(client)
sockAddr := fmt.Sprintf("%s:%s", shared.UDS, opts.sockAddr)
conn, err := grpc.Dial(sockAddr, grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(opts.maxMessageSize), grpc.MaxCallSendMsgSize(opts.maxMessageSize)))
if err != nil {
return nil, fmt.Errorf("failed to execute grpc.Dial(%q): %w", sockAddr, err)
}
c.conn = conn
c.grpcClt = sideinputpb.NewSideInputClient(conn)
return c, nil
}

// NewFromClient creates a new client object from a grpc client. This is used for testing.
func NewFromClient(c sideinputpb.SideInputClient) (Client, error) {
whynowy marked this conversation as resolved.
Show resolved Hide resolved
return &client{
grpcClt: c,
}, nil
}

// CloseConn closes the grpc connection.
func (c client) CloseConn(ctx context.Context) error {
return c.conn.Close()
}

// IsReady checks if the grpc connection is ready to use.
func (c client) IsReady(ctx context.Context, in *emptypb.Empty) (bool, error) {
resp, err := c.grpcClt.IsReady(ctx, in)
if err != nil {
return false, err
}
return resp.GetReady(), nil
}

// RetrieveSideInput retrieves the side input value and returns the updated payload.
func (c client) RetrieveSideInput(ctx context.Context, in *emptypb.Empty) (*sideinputpb.SideInputResponse, error) {
retrieveResponse, err := c.grpcClt.RetrieveSideInput(ctx, in)
// TODO check which error to use
if err != nil {
return nil, fmt.Errorf("failed to execute c.grpcClt.RetrieveSideInput(): %w", err)
}
return retrieveResponse, nil
}
15 changes: 15 additions & 0 deletions pkg/sdkclient/sideinput/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package sideinput

import (
"context"

sideinputpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sideinput/v1"
"google.golang.org/protobuf/types/known/emptypb"
)

// Client contains methods to call a gRPC client for side input.
type Client interface {
CloseConn(ctx context.Context) error
IsReady(ctx context.Context, in *emptypb.Empty) (bool, error)
RetrieveSideInput(ctx context.Context, in *emptypb.Empty) (*sideinputpb.SideInputResponse, error)
}
49 changes: 49 additions & 0 deletions pkg/sdkclient/sideinput/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
Copyright 2022 The Numaproj Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sideinput

import "time"

type options struct {
sockAddr string
maxMessageSize int
sideInputTimeout time.Duration
}

// Option is the interface to apply options.
type Option func(*options)

// WithSockAddr start the client with the given sock addr. This is mainly used for testing purpose.
func WithSockAddr(addr string) Option {
return func(opts *options) {
opts.sockAddr = addr
}
}

// WithMaxMessageSize sets the max message size to the given size.
func WithMaxMessageSize(size int) Option {
return func(o *options) {
o.maxMessageSize = size
}
}

// WithSideInputTimeout sets the side input timeout to the given timeout.
func WithSideInputTimeout(t time.Duration) Option {
return func(o *options) {
o.sideInputTimeout = t
}
}
17 changes: 9 additions & 8 deletions pkg/sideinputs/initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ func NewSideInputsInitializer(isbSvcType dfv1.ISBSvcType, pipelineName, sideInpu
// and update the values on the disk. This would exit once all the side inputs are initialized.
func (sii *sideInputsInitializer) Run(ctx context.Context) error {
var (
natsClient *jsclient.NATSClient
err error
natsClient *jsclient.NATSClient
err error
sideInputWatcher kvs.KVWatcher
)

log := logging.FromContext(ctx)
Expand All @@ -75,15 +76,15 @@ func (sii *sideInputsInitializer) Run(ctx context.Context) error {
return err
}
defer natsClient.Close()
// Load the required KV bucket and create a sideInputWatcher for it
kvName := isbsvc.JetStreamSideInputsStoreKVName(sii.sideInputsStore)
sideInputWatcher, err = jetstream.NewKVJetStreamKVWatch(ctx, kvName, natsClient)
if err != nil {
return fmt.Errorf("failed to create a sideInputWatcher, %w", err)
}
default:
return fmt.Errorf("unrecognized isbsvc type %q", sii.isbSvcType)
}
// Load the required KV bucket and create a sideInputWatcher for it
kvName := isbsvc.JetStreamSideInputsStoreKVName(sii.sideInputsStore)
sideInputWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, kvName, natsClient)
if err != nil {
return fmt.Errorf("failed to create a sideInputWatcher, %w", err)
}
return startSideInputInitializer(ctx, sideInputWatcher, dfv1.PathSideInputsMount, sii.sideInputs)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sideinputs/initializer/initializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,14 @@ func TestSideInputsInitializer_Success(t *testing.T) {

for x, sideInput := range sideInputs {
p := path.Join(mountPath, sideInput)
fileData, err := utils.FetchSideInputFile(p)
fileData, err := utils.FetchSideInputFileValue(p)
for err != nil {
select {
case <-ctx.Done():
t.Fatalf("Context timeout")
default:
time.Sleep(10 * time.Millisecond)
fileData, err = utils.FetchSideInputFile(p)
fileData, err = utils.FetchSideInputFileValue(p)
}
}
assert.Equal(t, dataTest[x], string(fileData))
Expand Down
61 changes: 50 additions & 11 deletions pkg/sideinputs/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@ import (

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isbsvc"
"github.com/numaproj/numaflow/pkg/sdkclient/sideinput"
jsclient "github.com/numaproj/numaflow/pkg/shared/clients/nats"
"github.com/numaproj/numaflow/pkg/shared/kvs"
"github.com/numaproj/numaflow/pkg/shared/kvs/jetstream"
"github.com/numaproj/numaflow/pkg/shared/logging"
udsideinput "github.com/numaproj/numaflow/pkg/sideinputs/udsideinput"
)

type sideInputsManager struct {
Expand All @@ -52,31 +56,53 @@ func (sim *sideInputsManager) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var isbSvcClient isbsvc.ISBService
var natsClient *jsclient.NATSClient
var err error
var siStore kvs.KVStorer
switch sim.isbSvcType {
case dfv1.ISBSvcTypeRedis:
return fmt.Errorf("unsupported isbsvc type %q", sim.isbSvcType)
case dfv1.ISBSvcTypeJetStream:
natsClient, err := jsclient.NewNATSClient(ctx)
natsClient, err = jsclient.NewNATSClient(ctx)
if err != nil {
log.Errorw("Failed to get a NATS client.", zap.Error(err))
return err
}
kohlisid marked this conversation as resolved.
Show resolved Hide resolved
isbSvcClient, err = isbsvc.NewISBJetStreamSvc(sim.pipelineName, isbsvc.WithJetStreamClient(natsClient))
defer natsClient.Close()
// Load the required KV bucket and create a sideInputWatcher for it
sideInputBucketName := isbsvc.JetStreamSideInputsStoreKVName(sim.sideInputsStore)
siStore, err = jetstream.NewKVJetStreamKVStore(ctx, sideInputBucketName, natsClient)
if err != nil {
log.Errorw("Failed to get an ISB Service client.", zap.Error(err))
return err
return fmt.Errorf("failed to create a new KVStore: %w", err)
}

default:
return fmt.Errorf("unrecognized isbsvc type %q", sim.isbSvcType)
}

// TODO(SI): remove it.
fmt.Printf("ISB Svc Client nil: %v\n", isbSvcClient == nil)
// Create a new gRPC client for UDSideInput
c, err := sideinput.New()
if err != nil {
return fmt.Errorf("failed to create a new gRPC client: %w", err)
}
sideInputClient := udsideinput.NewUDSgRPCBasedUDSideinput(c)
if err != nil {
return fmt.Errorf("failed to create a new gRPC client: %w", err)
}
// Readiness check
if err = sideInputClient.WaitUntilReady(ctx); err != nil {
return fmt.Errorf("failed on UDSideInput readiness check, %w", err)
}
// close the connection when the function exits
defer func() {
kohlisid marked this conversation as resolved.
Show resolved Hide resolved
err = sideInputClient.CloseConn(ctx)
if err != nil {
log.Warnw("Failed to close gRPC client conn", zap.Error(err))
}
}()

f := func() {
if err := sim.execute(ctx); err != nil {
if err := sim.execute(ctx, sideInputClient, siStore); err != nil {
log.Errorw("Failed to execute the call to fetch Side Inputs.", zap.Error(err))
}
}
Expand All @@ -96,10 +122,23 @@ func (sim *sideInputsManager) Start(ctx context.Context) error {
return nil
}

func (sim *sideInputsManager) execute(ctx context.Context) error {
func (sim *sideInputsManager) execute(ctx context.Context, sideInputClient *udsideinput.UDSgRPCBasedUDSideinput, siStore kvs.KVStorer) error {
log := logging.FromContext(ctx)
// TODO(SI): call ud container to fetch data and write to store.
log.Info("Executing ...")
log.Info("Executing Side Inputs manager cron ...")
resp, err := sideInputClient.Apply(ctx)
if err != nil {
return fmt.Errorf("failed to retrieve side input: %w", err)
}
// If the NoBroadcast flag is True, skip writing to the store.
if resp.NoBroadcast {
log.Info("Side input is not broadcasted, skipping ...")
return nil
}
// Write the side input value to the store.
err = siStore.PutKV(ctx, sim.sideInput.Name, resp.Value)
if err != nil {
return fmt.Errorf("failed to write side input %q to store: %w", sim.sideInput.Name, err)
}
return nil
}

Expand Down
19 changes: 10 additions & 9 deletions pkg/sideinputs/synchronizer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ func NewSideInputsSynchronizer(isbSvcType dfv1.ISBSvcType, pipelineName, sideInp
// and keeps on watching for updates for all the side inputs while writing the new values to the disk.
func (sis *sideInputsSynchronizer) Start(ctx context.Context) error {
var (
natsClient *jsclient.NATSClient
err error
natsClient *jsclient.NATSClient
err error
sideInputWatcher kvs.KVWatcher
)

log := logging.FromContext(ctx)
Expand All @@ -75,15 +76,15 @@ func (sis *sideInputsSynchronizer) Start(ctx context.Context) error {
log.Errorw("Failed to get a NATS client.", zap.Error(err))
return err
}
// Create a new watcher for the side input KV store
kvName := isbsvc.JetStreamSideInputsStoreKVName(sis.sideInputsStore)
sideInputWatcher, err = jetstream.NewKVJetStreamKVWatch(ctx, kvName, natsClient)
if err != nil {
return fmt.Errorf("failed to create a sideInputWatcher, %w", err)
}
default:
return fmt.Errorf("unrecognized isbsvc type %q", sis.isbSvcType)
}
// Create a new watcher for the side input KV store
kvName := isbsvc.JetStreamSideInputsStoreKVName(sis.sideInputsStore)
sideInputWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, kvName, natsClient)
if err != nil {
return fmt.Errorf("failed to create a sideInputWatcher, %w", err)
}
go startSideInputSynchronizer(ctx, sideInputWatcher, dfv1.PathSideInputsMount)
<-ctx.Done()
return nil
Expand All @@ -104,7 +105,7 @@ func startSideInputSynchronizer(ctx context.Context, watch kvs.KVWatcher, mountP
log.Warnw("nil value received from Side Input watcher")
continue
}
log.Debug("Side Input value received ",
log.Infow("Side Input value received ",
zap.String("key", value.Key()), zap.String("value", string(value.Value())))
p := path.Join(mountPath, value.Key())
// Write changes to disk
Expand Down
4 changes: 2 additions & 2 deletions pkg/sideinputs/synchronizer/synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,14 @@ func TestSideInputsValueUpdates(t *testing.T) {

for x, sideInput := range sideInputs {
p := path.Join(mountPath, sideInput)
fileData, err := utils.FetchSideInputFile(p)
fileData, err := utils.FetchSideInputFileValue(p)
for err != nil {
select {
case <-ctx.Done():
t.Fatalf("Context timeout")
default:
time.Sleep(10 * time.Millisecond)
fileData, err = utils.FetchSideInputFile(p)
fileData, err = utils.FetchSideInputFileValue(p)
}
}
assert.Equal(t, dataTest[x], string(fileData))
Expand Down
Loading
Loading