Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: incorrect side inputs watch logic #1164

Merged
merged 4 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,5 @@ func init() {
rootCmd.AddCommand(NewWebhookCommand())
rootCmd.AddCommand(NewSideInputsInitCommand())
rootCmd.AddCommand(NewSideInputsManagerCommand())
rootCmd.AddCommand(NewSideInputsWatcherCommand())
rootCmd.AddCommand(NewSideInputsSynchronizerCommand())
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ import (
"github.com/numaproj/numaflow/pkg/sideinputs/synchronizer"
)

func NewSideInputsWatcherCommand() *cobra.Command {
func NewSideInputsSynchronizerCommand() *cobra.Command {
var (
isbSvcType string
sideInputsStore string
sideInputs []string
)
command := &cobra.Command{
Use: "side-inputs-watcher",
Short: "Start the Side Inputs Watcher",
Use: "side-inputs-synchronizer",
Short: "Start the Side Inputs Synchronizer",
RunE: func(cmd *cobra.Command, args []string) error {
pipelineName, defined := os.LookupEnv(dfv1.EnvPipelineName)
if !defined {
Expand All @@ -47,7 +47,7 @@ func NewSideInputsWatcherCommand() *cobra.Command {
return fmt.Errorf("no side inputs are defined for this vertex")
}

logger := logging.NewLogger().Named("side-inputs-watcher").With("pipeline", pipelineName)
logger := logging.NewLogger().Named("side-inputs-synchronizer").With("pipeline", pipelineName)
ctx := logging.WithLogger(signals.SetupSignalHandler(), logger)
sideInputsWatcher := synchronizer.NewSideInputsSynchronizer(dfv1.ISBSvcType(isbSvcType), pipelineName, sideInputsStore, sideInputs)
return sideInputsWatcher.Start(ctx)
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ const (
CtrUdtransformer = "transformer"
CtrUdSideInput = "udsi"
CtrInitSideInputs = "init-side-inputs"
CtrSideInputsWatcher = "side-inputs-watcher"
CtrSideInputsWatcher = "side-inputs-synchronizer"

// components
ComponentISBSvc = "isbsvc"
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (v Vertex) GetPodSpec(req GetVertexPodSpecReq) (*corev1.PodSpec, error) {
Image: req.Image,
ImagePullPolicy: req.PullPolicy,
Resources: standardResources,
Args: []string{"side-inputs-watcher", "--isbsvc-type=" + string(req.ISBSvcType), "--side-inputs-store=" + req.SideInputsStoreName, "--side-inputs=" + strings.Join(v.Spec.SideInputs, ",")},
Args: []string{"side-inputs-synchronizer", "--isbsvc-type=" + string(req.ISBSvcType), "--side-inputs-store=" + req.SideInputsStoreName, "--side-inputs=" + strings.Join(v.Spec.SideInputs, ",")},
}
sideInputsWatcher.Env = append(sideInputsWatcher.Env, v.commonEnvs()...)
if x := v.Spec.SideInputsContainerTemplate; x != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/shared/kvs/jetstream/kv_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (jss *jetStreamStore) Watch(ctx context.Context) (<-chan kvs.KVEntry, <-cha
continue
}
if value == nil {
jss.log.Infow("watcher initialization and subscription got nil value")
jss.log.Infow("Watcher initialization and subscription got nil value")
continue
}
jss.previousFetchTime = value.Created()
Expand Down
7 changes: 2 additions & 5 deletions pkg/sideinputs/initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ func startSideInputInitializer(ctx context.Context, store kvs.KVStorer, mountPat
m[value.Key()] = value.Value()
// Wait for the data to be ready in the side input store, and then copy it to the disk
if gotAllSideInputVals(sideInputs, m) {
for sideInput := range m {
for _, sideInput := range sideInputs {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should only populate the side inputs that current vertex is watching.

p := path.Join(mountPath, sideInput)
log.Info("Initializing Side Input data for %q", p)
log.Infof("Initializing Side Input data for %q", sideInput)
err := utils.UpdateSideInputFile(ctx, p, m[sideInput])
if err != nil {
return fmt.Errorf("failed to update Side Input value, %w", err)
Expand All @@ -132,9 +132,6 @@ func startSideInputInitializer(ctx context.Context, store kvs.KVStorer, mountPat
// gotAllSideInputVals checks if values for all side-inputs
// have been received from the KV bucket
func gotAllSideInputVals(sideInputs []string, m map[string][]byte) bool {
if len(sideInputs) != len(m) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

len(m) is total number of side inputs defined in the pipeline, len(sideInputs) is the number of side inputs current vertex is watching.

return false
}
for _, sideInput := range sideInputs {
if _, ok := m[sideInput]; !ok {
return false
Expand Down
4 changes: 2 additions & 2 deletions pkg/sideinputs/synchronizer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (sis *sideInputsSynchronizer) Start(ctx context.Context) error {
)

log := logging.FromContext(ctx)
log.Infow("Starting Side Inputs Watcher", zap.Strings("sideInputs", sis.sideInputs))
log.Infow("Starting Side Inputs Synchronizer", zap.Strings("sideInputs", sis.sideInputs))
ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand Down Expand Up @@ -98,7 +98,7 @@ func startSideInputSynchronizer(ctx context.Context, watch kvs.KVStorer, mountPa
for {
select {
case <-stopped:
log.Info("Side Input watcher stopped")
log.Info("Side Input Synchronizer stopped")
return
case value := <-watchCh:
if value == nil {
Expand Down
Loading