Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid committed Aug 21, 2023
1 parent 4c86453 commit fbb589e
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 42 deletions.
17 changes: 9 additions & 8 deletions pkg/sideinputs/initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ func NewSideInputsInitializer(isbSvcType dfv1.ISBSvcType, pipelineName, sideInpu
// and update the values on the disk. This would exit once all the side inputs are initialized.
func (sii *sideInputsInitializer) Run(ctx context.Context) error {
var (
natsClient *jsclient.NATSClient
err error
natsClient *jsclient.NATSClient
err error
sideInputWatcher kvs.KVWatcher
)

log := logging.FromContext(ctx)
Expand All @@ -75,15 +76,15 @@ func (sii *sideInputsInitializer) Run(ctx context.Context) error {
return err
}
defer natsClient.Close()
// Load the required KV bucket and create a sideInputWatcher for it
kvName := isbsvc.JetStreamSideInputsStoreKVName(sii.sideInputsStore)
sideInputWatcher, err = jetstream.NewKVJetStreamKVWatch(ctx, kvName, natsClient)
if err != nil {
return fmt.Errorf("failed to create a sideInputWatcher, %w", err)
}
default:
return fmt.Errorf("unrecognized isbsvc type %q", sii.isbSvcType)
}
// Load the required KV bucket and create a sideInputWatcher for it
kvName := isbsvc.JetStreamSideInputsStoreKVName(sii.sideInputsStore)
sideInputWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, kvName, natsClient)
if err != nil {
return fmt.Errorf("failed to create a sideInputWatcher, %w", err)
}
return startSideInputInitializer(ctx, sideInputWatcher, dfv1.PathSideInputsMount, sii.sideInputs)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sideinputs/initializer/initializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ func TestSideInputsInitializer_Success(t *testing.T) {

for x, sideInput := range sideInputs {
p := path.Join(mountPath, sideInput)
fileData, err := utils.FetchSideInputFile(p)
fileData, err := utils.FetchSideInputFileValue(p)
for err != nil {
select {
case <-ctx.Done():
t.Fatalf("Context timeout")
default:
time.Sleep(10 * time.Millisecond)
fileData, err = utils.FetchSideInputFile(p)
fileData, err = utils.FetchSideInputFileValue(p)
}
}
assert.Equal(t, dataTest[x], string(fileData))
Expand Down
15 changes: 8 additions & 7 deletions pkg/sideinputs/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func (sim *sideInputsManager) Start(ctx context.Context) error {

var natsClient *jsclient.NATSClient
var err error
var siStore kvs.KVStorer
switch sim.isbSvcType {
case dfv1.ISBSvcTypeRedis:
return fmt.Errorf("unsupported isbsvc type %q", sim.isbSvcType)
Expand All @@ -66,6 +67,12 @@ func (sim *sideInputsManager) Start(ctx context.Context) error {
log.Errorw("Failed to get a NATS client.", zap.Error(err))
return err
}
sideInputBucketName := isbsvc.JetStreamSideInputsStoreKVName(sim.sideInputsStore)
siStore, err = jetstream.NewKVJetStreamKVStore(ctx, sim.pipelineName, sideInputBucketName, natsClient)
if err != nil {
return fmt.Errorf("failed to create a new KVStore: %w", err)
}

default:
return fmt.Errorf("unrecognized isbsvc type %q", sim.isbSvcType)
}
Expand All @@ -86,12 +93,6 @@ func (sim *sideInputsManager) Start(ctx context.Context) error {
}
}()

sideInputBucketName := isbsvc.JetStreamSideInputsStoreBucket(sim.sideInputsStore)
siStore, err := jetstream.NewKVJetStreamKVStore(ctx, sim.pipelineName, sideInputBucketName, natsClient)
if err != nil {
return fmt.Errorf("failed to create a new KVStore: %w", err)
}

f := func() {
if err := sim.execute(ctx, sideInputClient, siStore); err != nil {
log.Errorw("Failed to execute the call to fetch Side Inputs.", zap.Error(err))
Expand All @@ -115,7 +116,7 @@ func (sim *sideInputsManager) Start(ctx context.Context) error {

func (sim *sideInputsManager) execute(ctx context.Context, sideInputClient *udsideinput.UDSgRPCBasedUDSideinput, siStore kvs.KVStorer) error {
log := logging.FromContext(ctx)
log.Info("Executing Side Input manager cron ...")
log.Info("Executing Side Inputs manager cron ...")
resp, err := sideInputClient.Apply(ctx)
if err != nil {
return fmt.Errorf("failed to retrieve side input: %w", err)
Expand Down
31 changes: 13 additions & 18 deletions pkg/sideinputs/synchronizer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package synchronizer

import (
"bytes"
"context"
"fmt"
"path"
Expand Down Expand Up @@ -57,8 +56,9 @@ func NewSideInputsSynchronizer(isbSvcType dfv1.ISBSvcType, pipelineName, sideInp
// and keeps on watching for updates for all the side inputs while writing the new values to the disk.
func (sis *sideInputsSynchronizer) Start(ctx context.Context) error {
var (
natsClient *jsclient.NATSClient
err error
natsClient *jsclient.NATSClient
err error
sideInputWatcher kvs.KVWatcher
)

log := logging.FromContext(ctx)
Expand All @@ -76,15 +76,15 @@ func (sis *sideInputsSynchronizer) Start(ctx context.Context) error {
log.Errorw("Failed to get a NATS client.", zap.Error(err))
return err
}
// Create a new watcher for the side input KV store
kvName := isbsvc.JetStreamSideInputsStoreKVName(sis.sideInputsStore)
sideInputWatcher, err = jetstream.NewKVJetStreamKVWatch(ctx, kvName, natsClient)
if err != nil {
return fmt.Errorf("failed to create a sideInputWatcher, %w", err)
}
default:
return fmt.Errorf("unrecognized isbsvc type %q", sis.isbSvcType)
}
// Create a new watcher for the side input KV store
kvName := isbsvc.JetStreamSideInputsStoreKVName(sis.sideInputsStore)
sideInputWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, kvName, natsClient)
if err != nil {
return fmt.Errorf("failed to create a sideInputWatcher, %w", err)
}
go startSideInputSynchronizer(ctx, sideInputWatcher, dfv1.PathSideInputsMount)
<-ctx.Done()
return nil
Expand All @@ -95,7 +95,6 @@ func (sis *sideInputsSynchronizer) Start(ctx context.Context) error {
func startSideInputSynchronizer(ctx context.Context, watch kvs.KVWatcher, mountPath string) {
log := logging.FromContext(ctx)
watchCh, stopped := watch.Watch(ctx)
var prevValue []byte
for {
select {
case <-stopped:
Expand All @@ -109,14 +108,10 @@ func startSideInputSynchronizer(ctx context.Context, watch kvs.KVWatcher, mountP
log.Infow("Side Input value received ",
zap.String("key", value.Key()), zap.String("value", string(value.Value())))
p := path.Join(mountPath, value.Key())
// Check if the value has changed from the last update
if prevValue == nil || bytes.Equal(prevValue, value.Value()) == false {
// Write changes to disk
err := utils.UpdateSideInputFile(ctx, p, value.Value())
if err != nil {
log.Errorw("Failed to update Side Input value %s", zap.Error(err))
}
prevValue = value.Value()
// Write changes to disk
err := utils.UpdateSideInputFile(ctx, p, value.Value())
if err != nil {
log.Errorw("Failed to update Side Input value %s", zap.Error(err))
}
continue
case <-ctx.Done():
Expand Down
4 changes: 2 additions & 2 deletions pkg/sideinputs/synchronizer/synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ func TestSideInputsValueUpdates(t *testing.T) {

for x, sideInput := range sideInputs {
p := path.Join(mountPath, sideInput)
fileData, err := utils.FetchSideInputFile(p)
fileData, err := utils.FetchSideInputFileValue(p)
for err != nil {
select {
case <-ctx.Done():
t.Fatalf("Context timeout")
default:
time.Sleep(10 * time.Millisecond)
fileData, err = utils.FetchSideInputFile(p)
fileData, err = utils.FetchSideInputFileValue(p)
}
}
assert.Equal(t, dataTest[x], string(fileData))
Expand Down
22 changes: 18 additions & 4 deletions pkg/sideinputs/utils/utils.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package utils

import (
"bytes"
"context"
"fmt"
"os"
"time"

"go.uber.org/zap"

"github.com/numaproj/numaflow/pkg/shared/logging"
)

Expand All @@ -24,8 +27,20 @@ func UpdateSideInputFile(ctx context.Context, fileSymLink string, value []byte)
timestamp := time.Now().UnixNano()
newFileName := fmt.Sprintf("%s_%d", fileSymLink, timestamp)

// Fetch the current side input value from the file
currentValue, err := FetchSideInputFileValue(fileSymLink)

// Check if the current value is same as the new value
// If true then don't update file again and return
if err == nil && bytes.Equal(currentValue, value) {
log.Debugw("Side Input value is same as current value, "+
"skipping update", zap.String("Side Input", fileSymLink))
return nil
}

// Write the side input value to the new file
err := os.WriteFile(newFileName, value, 0666)
// A New file is created with the given name if it doesn't exist
err = os.WriteFile(newFileName, value, 0666)
if err != nil {
return fmt.Errorf("failed to write Side Input file %s : %w", newFileName, err)
}
Expand Down Expand Up @@ -58,9 +73,8 @@ func UpdateSideInputFile(ctx context.Context, fileSymLink string, value []byte)
return nil
}

// FetchSideInputFile reads a given file and returns the value in bytes
// Used as utility for unit tests
func FetchSideInputFile(filePath string) ([]byte, error) {
// FetchSideInputFileValue reads a given file and returns the value in bytes
func FetchSideInputFileValue(filePath string) ([]byte, error) {
b, err := os.ReadFile(filePath)
if err != nil {
return nil, fmt.Errorf("failed to read Side Input %s file: %w", filePath, err)
Expand Down
47 changes: 46 additions & 1 deletion pkg/sideinputs/utils/utils_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package utils

import (
"bytes"
"context"
"os"
"testing"
Expand Down Expand Up @@ -38,13 +39,15 @@ func TestSymLinkUpdate(t *testing.T) {
err := os.Mkdir(mountPath, 0777)
assert.NoError(t, err)
}
byteArray = []byte("test")
// Write data to the link
err = UpdateSideInputFile(ctx, filePath, byteArray)
assert.NoError(t, err)
// Get the target file from the symlink
file1, err := os.Readlink(filePath)
assert.NoError(t, err)
// Write data to the link again
byteArray = []byte("test-new")
// Write data to the link again with new value
err = UpdateSideInputFile(ctx, filePath, byteArray)
assert.NoError(t, err)
// Get the new target file from the symlink
Expand Down Expand Up @@ -84,3 +87,45 @@ func TestSymLinkFileDelete(t *testing.T) {
// The older file should have been deleted
assert.False(t, CheckFileExists(file1))
}

// TestUpdateSideInputFileNoUpdate tests if the new value is same as the current
// value then new file isn't created and file is not updated.
func TestUpdateSideInputFileNoUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
var (
size = int64(10 * 1024 * 1024) // 100 MB
byteArray = make([]byte, size)
)
mountPath, err := os.MkdirTemp("", "side-input")
assert.NoError(t, err)
// Clean up
defer cleanup(mountPath)

filePath, err := os.CreateTemp(mountPath, "unit-test")
assert.NoError(t, err)
fileName := filePath.Name()

byteArray = []byte("test")
// Write data to the link
err = UpdateSideInputFile(ctx, fileName, byteArray)
assert.NoError(t, err)
// Get the target file from the symlink
file1, err := os.Readlink(fileName)
assert.NoError(t, err)
data1, err := FetchSideInputFileValue(fileName)
assert.NoError(t, err)
// Write data to the link again with same value
err = UpdateSideInputFile(ctx, fileName, byteArray)
assert.NoError(t, err)
// Get the new target file from the symlink
file2, err := os.Readlink(fileName)
assert.NoError(t, err)
data2, err := FetchSideInputFileValue(fileName)
assert.NoError(t, err)
// We expect the target to be same file
assert.Equal(t, file1, file2)
// We expect the target to have the same data
assert.True(t, bytes.Equal(data1, data2))

}

0 comments on commit fbb589e

Please sign in to comment.