Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move queue-related commands into queue subcommand tree #324

Merged
merged 1 commit into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 0 additions & 22 deletions cmd/subcommands/enqueue.go

This file was deleted.

24 changes: 24 additions & 0 deletions cmd/subcommands/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//go:build !(full || observe)

package subcommands

import (
"github.com/spf13/cobra"

"lunchpail.io/cmd/subcommands/queue"
)

func init() {
cmd := &cobra.Command{
Use: "queue",
Short: "Commands related to the queue",
}
rootCmd.AddCommand(cmd)

// Currently components rely on these operations, and in
// Kubernetes, we currently use the "uncompiled" raw
// `lunchpail` executable for these operations:
cmd.AddCommand(queue.Add())
cmd.AddCommand(queue.Done())
cmd.AddCommand(queue.Download())
}
20 changes: 20 additions & 0 deletions cmd/subcommands/queue/add.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package queue

import (
"github.com/spf13/cobra"

"lunchpail.io/cmd/subcommands/queue/add"
)

func Add() *cobra.Command {
cmd := &cobra.Command{
Use: "add",
Short: "Commands that help with enqueueing work tasks",
Long: "Commands that help with enqueueing work tasks",
}

cmd.AddCommand(add.File())
cmd.AddCommand(add.S3())

return cmd
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package enqueue
package add

import (
"context"
Expand All @@ -11,7 +11,7 @@ import (
"lunchpail.io/pkg/runtime/queue"
)

func NewEnqueueFileCmd() *cobra.Command {
func File() *cobra.Command {
var cmd = &cobra.Command{
Use: "file <file>",
Short: "Enqueue a single file as a work task",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package enqueue
package add

import (
"context"
Expand All @@ -10,7 +10,7 @@ import (
"lunchpail.io/pkg/runtime/queue"
)

func NewEnqueueFromS3Cmd() *cobra.Command {
func S3() *cobra.Command {
var cmd = &cobra.Command{
Use: "s3 <path> <envVarPrefix>",
Short: "Enqueue a files in a given S3 path",
Expand Down
13 changes: 3 additions & 10 deletions cmd/subcommands/qcat.go → cmd/subcommands/queue/cat.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//go:build full || observe

package subcommands
package queue

import (
"context"
Expand All @@ -9,13 +9,12 @@ import (

"lunchpail.io/cmd/options"
"lunchpail.io/pkg/be"
"lunchpail.io/pkg/compilation"
"lunchpail.io/pkg/runtime/queue"
)

func newQcatCmd() *cobra.Command {
func Cat() *cobra.Command {
var cmd = &cobra.Command{
Use: "qcat <file>",
Use: "cat <file>",
Short: "Show the contents of a file in the queue",
Long: "Show the contents of a file in the queue",
Args: cobra.MatchAll(cobra.ExactArgs(1), cobra.OnlyValidArgs),
Expand All @@ -41,9 +40,3 @@ func newQcatCmd() *cobra.Command {

return cmd
}

func init() {
if compilation.IsCompiled() {
rootCmd.AddCommand(newQcatCmd())
}
}
10 changes: 3 additions & 7 deletions cmd/subcommands/qdone.go → cmd/subcommands/queue/done.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package subcommands
package queue

import (
"context"
Expand All @@ -8,9 +8,9 @@ import (
"lunchpail.io/pkg/runtime/queue"
)

func newQdoneCmd() *cobra.Command {
func Done() *cobra.Command {
var cmd = &cobra.Command{
Use: "qdone",
Use: "done",
Short: "Indicate that dispatching is done",
Long: "Indicate that dispatching is done",
Args: cobra.MatchAll(cobra.OnlyValidArgs),
Expand All @@ -22,7 +22,3 @@ func newQdoneCmd() *cobra.Command {

return cmd
}

func init() {
rootCmd.AddCommand(newQdoneCmd())
}
10 changes: 3 additions & 7 deletions cmd/subcommands/qout.go → cmd/subcommands/queue/download.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package subcommands
package queue

import (
"context"
Expand All @@ -8,9 +8,9 @@ import (
"lunchpail.io/pkg/runtime/queue"
)

func newQcopyoutCmd() *cobra.Command {
func Download() *cobra.Command {
var cmd = &cobra.Command{
Use: "qout <bucket/path> <localDir>",
Use: "download <bucket/path> <localDir>",
Short: "Copy data out of queue",
Long: "Copy data out of queue",
Args: cobra.MatchAll(cobra.ExactArgs(2), cobra.OnlyValidArgs),
Expand All @@ -22,7 +22,3 @@ func newQcopyoutCmd() *cobra.Command {

return cmd
}

func init() {
rootCmd.AddCommand(newQcopyoutCmd())
}
13 changes: 3 additions & 10 deletions cmd/subcommands/qlast.go → cmd/subcommands/queue/last.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//go:build full || observe

package subcommands
package queue

import (
"context"
Expand All @@ -10,13 +10,12 @@ import (

"lunchpail.io/cmd/options"
"lunchpail.io/pkg/be"
"lunchpail.io/pkg/compilation"
"lunchpail.io/pkg/observe/qstat"
)

func newQlastCommand() *cobra.Command {
func Last() *cobra.Command {
var cmd = &cobra.Command{
Use: "qlast",
Use: "last",
Short: "Stream queue statistics to console",
Args: cobra.MatchAll(cobra.MinimumNArgs(1), cobra.OnlyValidArgs),
}
Expand Down Expand Up @@ -52,9 +51,3 @@ func newQlastCommand() *cobra.Command {

return cmd
}

func init() {
if compilation.IsCompiled() {
rootCmd.AddCommand(newQlastCommand())
}
}
13 changes: 3 additions & 10 deletions cmd/subcommands/qls.go → cmd/subcommands/queue/ls.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//go:build full || observe

package subcommands
package queue

import (
"context"
Expand All @@ -9,13 +9,12 @@ import (

"lunchpail.io/cmd/options"
"lunchpail.io/pkg/be"
"lunchpail.io/pkg/compilation"
"lunchpail.io/pkg/runtime/queue"
)

func newQlsCmd() *cobra.Command {
func Ls() *cobra.Command {
cmd := &cobra.Command{
Use: "qls [path]",
Use: "ls [path]",
Short: "List queue path",
Long: "List queue path",
Args: cobra.MatchAll(cobra.MaximumNArgs(1), cobra.OnlyValidArgs),
Expand Down Expand Up @@ -46,9 +45,3 @@ func newQlsCmd() *cobra.Command {

return cmd
}

func init() {
if compilation.IsCompiled() {
rootCmd.AddCommand(newQlsCmd())
}
}
13 changes: 3 additions & 10 deletions cmd/subcommands/qstat.go → cmd/subcommands/queue/stat.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//go:build full || observe

package subcommands
package queue

import (
"context"
Expand All @@ -9,17 +9,16 @@ import (

"lunchpail.io/cmd/options"
"lunchpail.io/pkg/be"
"lunchpail.io/pkg/compilation"
"lunchpail.io/pkg/observe/qstat"
)

func newQstatCommand() *cobra.Command {
func Stat() *cobra.Command {
var tailFlag int64
var followFlag bool
var quietFlag bool

var cmd = &cobra.Command{
Use: "qstat",
Use: "stat",
Short: "Stream queue statistics to console",
}

Expand Down Expand Up @@ -52,9 +51,3 @@ func newQstatCommand() *cobra.Command {

return cmd
}

func init() {
if compilation.IsCompiled() {
rootCmd.AddCommand(newQstatCommand())
}
}
11 changes: 2 additions & 9 deletions cmd/subcommands/upload.go → cmd/subcommands/queue/upload.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//go:build full || observe

package subcommands
package queue

import (
"context"
Expand All @@ -9,12 +9,11 @@ import (

"lunchpail.io/cmd/options"
"lunchpail.io/pkg/be"
"lunchpail.io/pkg/compilation"
"lunchpail.io/pkg/runtime/queue"
"lunchpail.io/pkg/runtime/queue/upload"
)

func newUploadCmd() *cobra.Command {
func Upload() *cobra.Command {
var cmd = &cobra.Command{
Use: "upload <srcDir> <bucket>",
Short: "Copy data into queue",
Expand Down Expand Up @@ -44,9 +43,3 @@ func newUploadCmd() *cobra.Command {

return cmd
}

func init() {
if compilation.IsCompiled() {
rootCmd.AddCommand(newUploadCmd())
}
}
33 changes: 33 additions & 0 deletions cmd/subcommands/queue_full.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//go:build full || observe

package subcommands

import (
"github.com/spf13/cobra"

"lunchpail.io/cmd/subcommands/queue"
"lunchpail.io/pkg/compilation"
)

func init() {
cmd := &cobra.Command{
Use: "queue",
Short: "Commands related to the queue",
}
rootCmd.AddCommand(cmd)

if compilation.IsCompiled() {
cmd.AddCommand(queue.Cat())
cmd.AddCommand(queue.Last())
cmd.AddCommand(queue.Ls())
cmd.AddCommand(queue.Stat())
cmd.AddCommand(queue.Upload())
}

// Currently components rely on these operations, and in
// Kubernetes, we currently use the "uncompiled" raw
// `lunchpail` executable for these operations:
cmd.AddCommand(queue.Add())
cmd.AddCommand(queue.Done())
cmd.AddCommand(queue.Download())
}
2 changes: 1 addition & 1 deletion pkg/be/kubernetes/shell/chart/templates/prestop.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ terminationGracePeriodSeconds: 5 # give time for the preStop in the container
lifecycle:
preStop:
exec:
command: ["lunchpail", "qdone"]
command: ["lunchpail", "queue", "done"]
{{- end }}
{{- end }}
4 changes: 2 additions & 2 deletions pkg/fe/transformer/api/dispatch/parametersweep/main.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ do
then debugflag="--debug"
fi

# If we were asked to wait, then `enqueue file` will exit with the
# If we were asked to wait, then `queue add file` will exit with the
# exit code of the underlying worker. Here, we intentionally
# ignore any errors from the task.
$LUNCHPAIL_EXE enqueue file $task $waitflag $verboseflag $debugflag
$LUNCHPAIL_EXE queue add file $task $waitflag $verboseflag $debugflag
rm -f "$task"
done
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func transpile(sweep hlir.ParameterSweep) (hlir.Application, error) {
app.Spec.Role = "dispatcher"

app.Spec.Command = strings.Join([]string{
`trap "$LUNCHPAIL_EXE qdone" EXIT`,
`trap "$LUNCHPAIL_EXE queue done" EXIT`,
"./main.sh",
}, "\n")

Expand Down
4 changes: 2 additions & 2 deletions pkg/fe/transformer/api/dispatch/s3/transpile.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func transpile(s3 hlir.ProcessS3Objects) (hlir.Application, error) {

envPrefix := "LUNCHPAIL_PROCESS_S3_OBJECTS_"
app.Spec.Command = strings.Join([]string{
`trap "$LUNCHPAIL_EXE qdone" EXIT`,
fmt.Sprintf("$LUNCHPAIL_EXE enqueue s3 --repeat %d %s %s %s %s", repeat, verbose, debug, s3.Spec.Path, envPrefix),
`trap "$LUNCHPAIL_EXE queue done" EXIT`,
fmt.Sprintf("$LUNCHPAIL_EXE queue add s3 --repeat %d %s %s %s %s", repeat, verbose, debug, s3.Spec.Path, envPrefix),
}, "\n")

app.Spec.Env = hlir.Env{}
Expand Down
Loading