Skip to content

Commit

Permalink
xxxx
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy committed Jul 17, 2023
1 parent ec2d395 commit 1f99e60
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 21 deletions.
21 changes: 14 additions & 7 deletions cmd/commands/side_inputs_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,42 @@ package commands
import (
"context"
"fmt"
"os"

"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/daemon/server"
dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/sideinputs/initializer"
"github.com/spf13/cobra"
)

func NewSideInputsInitCommand() *cobra.Command {
var (
isbSvcType string
sideInputsStore string
sideInputs []string
)
command := &cobra.Command{
Use: "side-inputs-init",
Short: "Start the Side Inputs init service",
RunE: func(cmd *cobra.Command, args []string) error {
logger := logging.NewLogger().Named("side-inputs-init")

pl, err := decodePipeline()
if err != nil {
return fmt.Errorf("failed to decode the pipeline spec: %v", err)
pipelineName, defined := os.LookupEnv(dfv1.EnvPipelineName)
if !defined {
return fmt.Errorf("environment %q is not defined", dfv1.EnvPipelineName)
}

if len(sideInputs) == 0 {
return fmt.Errorf("no side inputs are defined for this vertex")
}

ctx := logging.WithLogger(context.Background(), logger)
server := server.NewDaemonServer(pl, v1alpha1.ISBSvcType(isbSvcType))
return server.Run(ctx)
sideInputsInitializer := initializer.NewSideInputsInitializer(dfv1.ISBSvcType(isbSvcType), pipelineName, sideInputsStore, sideInputs)
return sideInputsInitializer.Run(ctx)
},
}
command.Flags().StringVar(&isbSvcType, "isbsvc-type", "jetstream", "ISB Service type, e.g. jetstream")
command.Flags().StringVar(&sideInputsStore, "side-inputs-store", "", "Name of the side inputs store")
command.Flags().StringSliceVar(&sideInputs, "side-inputs", []string{}, "Side Input names") // --side-inputs=si1,si2 --side-inputs=si3
return command
}
28 changes: 18 additions & 10 deletions cmd/commands/side_inputs_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,45 @@ limitations under the License.
package commands

import (
"context"
"fmt"
"os"

"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/daemon/server"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/spf13/cobra"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/sideinputs/watcher"
)

func NewSideInputsWatcherCommand() *cobra.Command {
var (
isbSvcType string
sideInputsStore string
sideInputs []string
)
command := &cobra.Command{
Use: "side-inputs-watcher",
Short: "Start the Side Inputs Watcher",
RunE: func(cmd *cobra.Command, args []string) error {
logger := logging.NewLogger().Named("side-inputs-watcher")

pl, err := decodePipeline()
if err != nil {
return fmt.Errorf("failed to decode the pipeline spec: %v", err)
pipelineName, defined := os.LookupEnv(dfv1.EnvPipelineName)
if !defined {
return fmt.Errorf("environment %q is not defined", dfv1.EnvPipelineName)
}

if len(sideInputs) == 0 {
return fmt.Errorf("no side inputs are defined for this vertex")
}

ctx := logging.WithLogger(context.Background(), logger)
server := server.NewDaemonServer(pl, v1alpha1.ISBSvcType(isbSvcType))
return server.Run(ctx)
ctx := logging.WithLogger(signals.SetupSignalHandler(), logger)
sideInputsWatcher := watcher.NewSideInputsWatcher(dfv1.ISBSvcType(isbSvcType), pipelineName, sideInputsStore, sideInputs)
return sideInputsWatcher.Start(ctx)
},
}
command.Flags().StringVar(&isbSvcType, "isbsvc-type", "jetstream", "ISB Service type, e.g. jetstream")
command.Flags().StringVar(&sideInputsStore, "side-inputs-store", "", "Name of the side inputs store")
command.Flags().StringSliceVar(&sideInputs, "side-inputs", []string{}, "Side Input names") // --side-inputs=si1,si2 --side-inputs=si3
return command
}
3 changes: 1 addition & 2 deletions pkg/apis/numaflow/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,10 @@ func (p Pipeline) GetSideInputManagerDeployments(req GetSideInputDeploymentReq)
return nil, err
}
for i := 0; i < len(deployment.Spec.Template.Spec.Containers); i++ {
deployment.Spec.Template.Spec.Containers[0].Env = append(deployment.Spec.Template.Spec.Containers[0].Env, commonEnvVars...)
deployment.Spec.Template.Spec.Containers[i].Env = append(deployment.Spec.Template.Spec.Containers[i].Env, commonEnvVars...)
}
deployment.Spec.Template.Spec.InitContainers[0].Env = append(deployment.Spec.Template.Spec.InitContainers[0].Env, corev1.EnvVar{Name: EnvGoDebug, Value: os.Getenv(EnvGoDebug)})
deployment.Spec.Template.Spec.Containers[0].Env = append(deployment.Spec.Template.Spec.Containers[0].Env, corev1.EnvVar{Name: EnvGoDebug, Value: os.Getenv(EnvGoDebug)})
deployment.Spec.Template.Spec.Containers[1].Env = append(deployment.Spec.Template.Spec.Containers[1].Env, commonEnvVars...)
deployments = append(deployments, deployment)
}
return deployments, nil
Expand Down
59 changes: 59 additions & 0 deletions pkg/sideinputs/initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,62 @@ limitations under the License.
*/

package initializer

import (
"context"
"fmt"
"path"

"go.uber.org/zap"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isbsvc"
jsclient "github.com/numaproj/numaflow/pkg/shared/clients/nats"
"github.com/numaproj/numaflow/pkg/shared/logging"
)

type sideInputsInitializer struct {
isbSvcType dfv1.ISBSvcType
pipelineName string
sideInputsStore string
sideInputs []string
}

func NewSideInputsInitializer(isbSvcType dfv1.ISBSvcType, pipelineName, sideInputsStore string, sideInputs []string) *sideInputsInitializer {
return &sideInputsInitializer{
isbSvcType: isbSvcType,
sideInputsStore: sideInputsStore,
pipelineName: pipelineName,
sideInputs: sideInputs,
}
}

func (sii *sideInputsInitializer) Run(ctx context.Context) error {
log := logging.FromContext(ctx)
log.Infow("Starting Side Inputs Initializer", zap.Strings("sideInputs", sii.sideInputs))

var isbSvcClient isbsvc.ISBService
var err error
switch sii.isbSvcType {
case dfv1.ISBSvcTypeRedis:
return fmt.Errorf("unsupported isbsvc type %q", sii.isbSvcType)
case dfv1.ISBSvcTypeJetStream:
isbSvcClient, err = isbsvc.NewISBJetStreamSvc(sii.pipelineName, isbsvc.WithJetStreamClient(jsclient.NewInClusterJetStreamClient()))
if err != nil {
log.Errorw("Failed to get an ISB Service client.", zap.Error(err))
return err
}
default:
return fmt.Errorf("unrecognized isbsvc type %q", sii.isbSvcType)
}

// TODO(SI): do something
// Wait for the data is ready in the side input store, and then copy the data to the disk
fmt.Printf("ISB Svc Client nil: %v\n", isbSvcClient == nil)
for _, sideInput := range sii.sideInputs {
p := path.Join(dfv1.PathSideInputsMount, sideInput)
fmt.Printf("Initializing side input data for %q\n", p)
}

return nil
}
6 changes: 4 additions & 2 deletions pkg/sideinputs/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewSideInputManager(isbSvcType dfv1.ISBSvcType, pipelineName, sideInputsSto

func (sim *sideInputManager) Start(ctx context.Context) error {
log := logging.FromContext(ctx)
log.Infof("starting side input manager for %q", sim.sideInput.Name)
log.Infof("Starting Side Input Manager for %q", sim.sideInput.Name)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand All @@ -66,7 +66,9 @@ func (sim *sideInputManager) Start(ctx context.Context) error {
}

// TODO(SI): do something
fmt.Println(isbSvcClient == nil)
// Periodically call the ud container and write data to the store.
fmt.Printf("ISB Svc Client nil: %v\n", isbSvcClient == nil)
fmt.Printf("Pipeline: %s, SideInput: %s\n", sim.pipelineName, sim.sideInput.Name)

<-ctx.Done()
return nil
Expand Down
62 changes: 62 additions & 0 deletions pkg/sideinputs/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,65 @@ limitations under the License.
*/

package watcher

import (
"context"
"fmt"
"path"

"go.uber.org/zap"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isbsvc"
jsclient "github.com/numaproj/numaflow/pkg/shared/clients/nats"
"github.com/numaproj/numaflow/pkg/shared/logging"
)

type sideInputsWatcher struct {
isbSvcType dfv1.ISBSvcType
pipelineName string
sideInputsStore string
sideInputs []string
}

func NewSideInputsWatcher(isbSvcType dfv1.ISBSvcType, pipelineName, sideInputsStore string, sideInputs []string) *sideInputsWatcher {
return &sideInputsWatcher{
isbSvcType: isbSvcType,
sideInputsStore: sideInputsStore,
pipelineName: pipelineName,
sideInputs: sideInputs,
}
}

func (siw *sideInputsWatcher) Start(ctx context.Context) error {
log := logging.FromContext(ctx)
log.Infow("Starting Side Inputs Watcher", zap.Strings("sideInputs", siw.sideInputs))
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var isbSvcClient isbsvc.ISBService
var err error
switch siw.isbSvcType {
case dfv1.ISBSvcTypeRedis:
return fmt.Errorf("unsupported isbsvc type %q", siw.isbSvcType)
case dfv1.ISBSvcTypeJetStream:
isbSvcClient, err = isbsvc.NewISBJetStreamSvc(siw.pipelineName, isbsvc.WithJetStreamClient(jsclient.NewInClusterJetStreamClient()))
if err != nil {
log.Errorw("Failed to get an ISB Service client.", zap.Error(err))
return err
}
default:
return fmt.Errorf("unrecognized isbsvc type %q", siw.isbSvcType)
}

// TODO(SI): do something
// Watch the side inputs store for changes, and write to disk
fmt.Printf("ISB Svc Client nil: %v\n", isbSvcClient == nil)
for _, sideInput := range siw.sideInputs {
p := path.Join(dfv1.PathSideInputsMount, sideInput)
fmt.Printf("Initializing side input data for %q\n", p)
}

<-ctx.Done()
return nil
}

0 comments on commit 1f99e60

Please sign in to comment.