Skip to content

Commit

Permalink
fix: incorrect side inputs initialization ready check logic
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy committed Oct 5, 2023
1 parent 26b6472 commit 85eddd1
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 12 deletions.
2 changes: 1 addition & 1 deletion cmd/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,5 @@ func init() {
rootCmd.AddCommand(NewWebhookCommand())
rootCmd.AddCommand(NewSideInputsInitCommand())
rootCmd.AddCommand(NewSideInputsManagerCommand())
rootCmd.AddCommand(NewSideInputsWatcherCommand())
rootCmd.AddCommand(NewSideInputsSynchronizerCommand())
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ import (
"github.com/numaproj/numaflow/pkg/sideinputs/synchronizer"
)

func NewSideInputsWatcherCommand() *cobra.Command {
func NewSideInputsSynchronizerCommand() *cobra.Command {
var (
isbSvcType string
sideInputsStore string
sideInputs []string
)
command := &cobra.Command{
Use: "side-inputs-watcher",
Short: "Start the Side Inputs Watcher",
Use: "side-inputs-synchronizer",
Short: "Start the Side Inputs Synchronizer",
RunE: func(cmd *cobra.Command, args []string) error {
pipelineName, defined := os.LookupEnv(dfv1.EnvPipelineName)
if !defined {
Expand All @@ -47,7 +47,7 @@ func NewSideInputsWatcherCommand() *cobra.Command {
return fmt.Errorf("no side inputs are defined for this vertex")
}

logger := logging.NewLogger().Named("side-inputs-watcher").With("pipeline", pipelineName)
logger := logging.NewLogger().Named("side-inputs-synchronizer").With("pipeline", pipelineName)
ctx := logging.WithLogger(signals.SetupSignalHandler(), logger)
sideInputsWatcher := synchronizer.NewSideInputsSynchronizer(dfv1.ISBSvcType(isbSvcType), pipelineName, sideInputsStore, sideInputs)
return sideInputsWatcher.Start(ctx)
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ const (
CtrUdtransformer = "transformer"
CtrUdSideInput = "udsi"
CtrInitSideInputs = "init-side-inputs"
CtrSideInputsWatcher = "side-inputs-watcher"
CtrSideInputsWatcher = "side-inputs-synchronizer"

// components
ComponentISBSvc = "isbsvc"
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (v Vertex) GetPodSpec(req GetVertexPodSpecReq) (*corev1.PodSpec, error) {
Image: req.Image,
ImagePullPolicy: req.PullPolicy,
Resources: standardResources,
Args: []string{"side-inputs-watcher", "--isbsvc-type=" + string(req.ISBSvcType), "--side-inputs-store=" + req.SideInputsStoreName, "--side-inputs=" + strings.Join(v.Spec.SideInputs, ",")},
Args: []string{"side-inputs-synchronizer", "--isbsvc-type=" + string(req.ISBSvcType), "--side-inputs-store=" + req.SideInputsStoreName, "--side-inputs=" + strings.Join(v.Spec.SideInputs, ",")},
}
sideInputsWatcher.Env = append(sideInputsWatcher.Env, v.commonEnvs()...)
if x := v.Spec.SideInputsContainerTemplate; x != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/shared/kvs/jetstream/kv_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (jss *jetStreamStore) Watch(ctx context.Context) (<-chan kvs.KVEntry, <-cha
continue
}
if value == nil {
jss.log.Infow("watcher initialization and subscription got nil value")
jss.log.Infow("Watcher initialization and subscription got nil value")
continue
}
jss.previousFetchTime = value.Created()
Expand Down
5 changes: 1 addition & 4 deletions pkg/sideinputs/initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func startSideInputInitializer(ctx context.Context, store kvs.KVStorer, mountPat
if gotAllSideInputVals(sideInputs, m) {
for sideInput := range m {
p := path.Join(mountPath, sideInput)
log.Info("Initializing Side Input data for %q", p)
log.Infof("Initializing Side Input data for %q", sideInput)
err := utils.UpdateSideInputFile(ctx, p, m[sideInput])
if err != nil {
return fmt.Errorf("failed to update Side Input value, %w", err)
Expand All @@ -132,9 +132,6 @@ func startSideInputInitializer(ctx context.Context, store kvs.KVStorer, mountPat
// gotAllSideInputVals checks if values for all side-inputs
// have been received from the KV bucket
func gotAllSideInputVals(sideInputs []string, m map[string][]byte) bool {
if len(sideInputs) != len(m) {
return false
}
for _, sideInput := range sideInputs {
if _, ok := m[sideInput]; !ok {
return false
Expand Down

0 comments on commit 85eddd1

Please sign in to comment.