Skip to content

Commit

Permalink
fix: default publishers not working (#4646)
Browse files Browse the repository at this point in the history
We are still having issues with using server side default publishers
where the client will fail any job submission where outputs are defined,
but no publisher is explicitly selected. The root cause is that we are
still calling `job.Validate()` in the client instead of just
`job.ValidateSubmission()`. `Validate()` is a more thorough validation
and should only be called after all transformations and defaults have
been applied.

This bug slipped through because we use builders on the client side
to translate from CLI flags to job and task models instead of directly
instantiating the models. The builders were calling `Validate()`
internally, which wasn't obvious.

In addition to fixing the bug, this PR also removes all of these
unnecessary builders, which are mainly used in a few tests, in favour of using
the models directly for better clarity.


### Current Behaviour
```
# set default publisher
→ bacalhau config set  jobdefaults.batch.task.publisher.type=local
Writing config to /Users/walid/.bacalhau/config.yaml

# run a job with outputs but no publisher defined
→ bacalhau docker run --output outputs:/outputs ubuntu -- /bin/sh -c "echo hello > outputs/text"

Error: building job spec: failed to create job: publisher must be set if result paths are set
```

### Expected Behaviour
```
→ bacalhau docker run --output outputs:/outputs ubuntu -- /bin/sh -c "echo hello > outputs/text"
Job successfully submitted. Job ID: j-aba087a0-0516-4b58-aff0-9824af583723
Checking job status... (Enter Ctrl+C to exit at any time, your job will continue running):

 TIME          EXEC. ID    TOPIC            EVENT
 14:55:19.550              Submission       Job submitted
 14:55:19.592  e-13e6591c  Scheduling       Requested execution on n-a1af1cf4
 14:55:19.761  e-13e6591c  Execution        Running
 14:55:23.323  e-13e6591c  Execution        Completed successfully

To get more details about the run, execute:
	bacalhau job describe j-aba087a0-0516-4b58-aff0-9824af583723

To get more details about the run executions, execute:
	bacalhau job executions j-aba087a0-0516-4b58-aff0-9824af583723

To download the results, execute:
	bacalhau job get j-aba087a0-0516-4b58-aff0-9824af583723

```

### Testing Done:
- Fixed and added more coverage to `DefaultPublisherSuite`
  • Loading branch information
wdbaruni authored Oct 23, 2024
1 parent b63ba60 commit 12402d3
Show file tree
Hide file tree
Showing 21 changed files with 296 additions and 376 deletions.
14 changes: 1 addition & 13 deletions cmd/cli/docker/docker_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels"
clientv2 "github.com/bacalhau-project/bacalhau/pkg/publicapi/client/v2"
"github.com/bacalhau-project/bacalhau/pkg/userstrings"
)

var (
Expand Down Expand Up @@ -182,16 +181,5 @@ func build(args []string, opts *DockerRunOptions) (*models.Job, error) {
return nil, err
}

job, err := helpers.BuildJobFromFlags(engineSpec, opts.JobSettings, opts.TaskSettings)
if err != nil {
return nil, fmt.Errorf("building job spec: %w", err)
}

// Normalize and validate the job spec
job.Normalize()
if err := job.ValidateSubmission(); err != nil {
return nil, fmt.Errorf("%s: %w", userstrings.JobSpecBad, err)
}

return job, nil
return helpers.BuildJobFromFlags(engineSpec, opts.JobSettings, opts.TaskSettings)
}
37 changes: 20 additions & 17 deletions cmd/cli/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/bacalhau-project/bacalhau/cmd/util/flags/cliflags"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/userstrings"
)

func JobToYaml(job *models.Job) (string, error) {
Expand All @@ -30,31 +31,27 @@ func BuildJobFromFlags(
jobSettings *cliflags.JobSettings,
taskSettings *cliflags.TaskSettings,
) (*models.Job, error) {
t, err := models.NewTaskBuilder().
Name(taskSettings.Name).
Engine(engineSpec).
Publisher(taskSettings.Publisher.Value()).
ResourcesConfig(&models.ResourcesConfig{
task := &models.Task{
Name: taskSettings.Name,
Engine: engineSpec,
Publisher: taskSettings.Publisher.Value(),
ResourcesConfig: &models.ResourcesConfig{
CPU: taskSettings.Resources.CPU,
Memory: taskSettings.Resources.Memory,
Disk: taskSettings.Resources.Disk,
GPU: taskSettings.Resources.GPU,
}).
InputSources(taskSettings.InputSources.Values()...).
ResultPaths(taskSettings.ResultPaths...).
Network(&models.NetworkConfig{
},
InputSources: taskSettings.InputSources.Values(),
ResultPaths: taskSettings.ResultPaths,
Network: &models.NetworkConfig{
Type: taskSettings.Network.Network,
Domains: taskSettings.Network.Domains,
}).
Timeouts(&models.TimeoutConfig{
},
Timeouts: &models.TimeoutConfig{
TotalTimeout: taskSettings.Timeout,
QueueTimeout: taskSettings.QueueTimeout,
}).
Build()
if err != nil {
return nil, fmt.Errorf("failed to create job: %w", err)
},
}

constraints, err := jobSettings.Constraints()
if err != nil {
return nil, fmt.Errorf("failed to parse job constraints: %w", err)
Expand All @@ -72,7 +69,13 @@ func BuildJobFromFlags(
Count: jobSettings.Count(),
Constraints: constraints,
Labels: labels,
Tasks: []*models.Task{t},
Tasks: []*models.Task{task},
}

// Normalize and validate the job spec
job.Normalize()
if err := job.ValidateSubmission(); err != nil {
return nil, fmt.Errorf("%s: %w", userstrings.JobSpecBad, err)
}

return job, nil
Expand Down
14 changes: 1 addition & 13 deletions cmd/cli/wasm/wasm_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
clientv2 "github.com/bacalhau-project/bacalhau/pkg/publicapi/client/v2"
"github.com/bacalhau-project/bacalhau/pkg/storage/inline"
storage_ipfs "github.com/bacalhau-project/bacalhau/pkg/storage/ipfs"
"github.com/bacalhau-project/bacalhau/pkg/userstrings"
"github.com/bacalhau-project/bacalhau/pkg/util/closer"
)

Expand Down Expand Up @@ -205,18 +204,7 @@ func build(ctx context.Context, args []string, opts *WasmRunOptions) (*models.Jo
return nil, err
}

job, err := helpers.BuildJobFromFlags(engineSpec, opts.JobSettings, opts.TaskSettings)
if err != nil {
return nil, fmt.Errorf("building job spec: %w", err)
}

// Normalize and validate the job spec
job.Normalize()
if err := job.ValidateSubmission(); err != nil {
return nil, fmt.Errorf("%s: %w", userstrings.JobSpecBad, err)
}

return job, nil
return helpers.BuildJobFromFlags(engineSpec, opts.JobSettings, opts.TaskSettings)
}

func parseWasmEntryModule(ctx context.Context, in string) (*models.InputSource, error) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/util/flags/cliflags/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Examples:
-i src=s3://bucket/key,dst=/my/input/path,opt=endpoint=https://s3.example.com,opt=region=us-east-1
`

ResultPathUsageMsg = "name=path of the output data volumes"
ResultPathUsageMsg = "name:path of the output data volumes"

PublisherUsageMsg = `Where to publish the result of the job`

Expand Down
11 changes: 11 additions & 0 deletions cmd/util/printer/progress_printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,17 @@ func (j *JobProgressPrinter) printJobDetailsInstructions(cmd *cobra.Command, job
if j.isQuiet() {
return
}

// query the server for the job spec to get any server side defaults and transformations,
// such as if a default publisher was applied
resp, err := j.client.Jobs().Get(cmd.Context(), &apimodels.GetJobRequest{JobID: job.ID})
if err != nil {
// just log and continue with the existing job details
PrintWarning(cmd, fmt.Sprintf("Failed to get updated job details: %v", err))
} else {
job = resp.Job
}

cmd.Println()
cmd.Println("To get more details about the run, execute:")
cmd.Printf("\t%s job describe %s\n", os.Args[0], job.ID)
Expand Down
125 changes: 50 additions & 75 deletions pkg/executor/docker/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,11 @@ func (s *ExecutorTestSuite) TestDockerResourceLimitsCPU() {
WithEntrypoint("bash", "-c", "cat /sys/fs/cgroup/cpu.max").
Build()
s.Require().NoError(err)
task := mock.TaskBuilder().
Engine(es).
ResourcesConfig(models.NewResourcesConfigBuilder().CPU(CPU_LIMIT).Memory(MEBIBYTE_MEMORY_LIMIT).BuildOrDie()).
BuildOrDie()

task := mock.Task()
task.Engine = es
task.ResourcesConfig = &models.ResourcesConfig{CPU: CPU_LIMIT, Memory: MEBIBYTE_MEMORY_LIMIT}
task.Normalize()

result, err := s.runJobGetStdout(task, uuid.New().String())
require.NoError(s.T(), err)
Expand Down Expand Up @@ -264,10 +265,11 @@ func (s *ExecutorTestSuite) TestDockerResourceLimitsMemory() {
WithEntrypoint("bash", "-c", "cat /sys/fs/cgroup/memory.max").
Build()
s.Require().NoError(err)
task := mock.TaskBuilder().
Engine(es).
ResourcesConfig(models.NewResourcesConfigBuilder().CPU(CPU_LIMIT).Memory(p.in).BuildOrDie()).
BuildOrDie()

task := mock.Task()
task.Engine = es
task.ResourcesConfig = &models.ResourcesConfig{CPU: CPU_LIMIT, Memory: p.in}
task.Normalize()

result, err := s.runJobGetStdout(task, uuid.New().String())
require.NoError(s.T(), err)
Expand All @@ -289,12 +291,9 @@ func (s *ExecutorTestSuite) TestDockerResourceLimitsMemory() {
}

func (s *ExecutorTestSuite) TestDockerNetworkingFull() {
task := mock.TaskBuilder().
Network(models.NewNetworkConfigBuilder().
Type(models.NetworkFull).
BuildOrDie()).
Engine(s.curlTask()).
BuildOrDie()
task := mock.Task()
task.Engine = s.curlTask()
task.Network = &models.NetworkConfig{Type: models.NetworkFull}

result, err := s.runJob(task, uuid.New().String())
require.NoError(s.T(), err, result.STDERR)
Expand All @@ -303,12 +302,9 @@ func (s *ExecutorTestSuite) TestDockerNetworkingFull() {
}

func (s *ExecutorTestSuite) TestDockerNetworkingNone() {
task := mock.TaskBuilder().
Network(models.NewNetworkConfigBuilder().
Type(models.NetworkNone).
BuildOrDie()).
Engine(s.curlTask()).
BuildOrDie()
task := mock.Task()
task.Engine = s.curlTask()
task.Network = &models.NetworkConfig{Type: models.NetworkNone}

result, err := s.runJob(task, uuid.New().String())
require.NoError(s.T(), err)
Expand All @@ -318,13 +314,9 @@ func (s *ExecutorTestSuite) TestDockerNetworkingNone() {
}

func (s *ExecutorTestSuite) TestDockerNetworkingHTTP() {
task := mock.TaskBuilder().
Network(models.NewNetworkConfigBuilder().
Type(models.NetworkHTTP).
Domains(s.containerHttpURL().Hostname()).
BuildOrDie()).
Engine(s.curlTask()).
BuildOrDie()
task := mock.Task()
task.Engine = s.curlTask()
task.Network = &models.NetworkConfig{Type: models.NetworkHTTP, Domains: []string{s.containerHttpURL().Hostname()}}

result, err := s.runJob(task, uuid.New().String())
require.NoError(s.T(), err, result.STDERR)
Expand All @@ -333,13 +325,10 @@ func (s *ExecutorTestSuite) TestDockerNetworkingHTTP() {
}

func (s *ExecutorTestSuite) TestDockerNetworkingHTTPWithMultipleDomains() {
task := mock.TaskBuilder().
Network(models.NewNetworkConfigBuilder().
Type(models.NetworkHTTP).
Domains(s.containerHttpURL().Hostname(), "bacalhau.org").
BuildOrDie()).
Engine(s.curlTask()).
BuildOrDie()
task := mock.Task()
task.Engine = s.curlTask()
task.Network = &models.NetworkConfig{Type: models.NetworkHTTP, Domains: []string{
s.containerHttpURL().Hostname(), "bacalhau.org"}}

result, err := s.runJob(task, uuid.New().String())
require.NoError(s.T(), err, result.STDERR)
Expand All @@ -352,13 +341,9 @@ func (s *ExecutorTestSuite) TestDockerNetworkingWithSubdomains() {
hostname := s.containerHttpURL().Hostname()
hostroot := strings.Join(strings.SplitN(hostname, ".", 2)[:1], ".")

task := mock.TaskBuilder().
Network(models.NewNetworkConfigBuilder().
Type(models.NetworkHTTP).
Domains(hostname, hostroot).
BuildOrDie()).
Engine(s.curlTask()).
BuildOrDie()
task := mock.Task()
task.Engine = s.curlTask()
task.Network = &models.NetworkConfig{Type: models.NetworkHTTP, Domains: []string{hostname, hostroot}}

result, err := s.runJob(task, uuid.New().String())
require.NoError(s.T(), err, result.STDERR)
Expand All @@ -367,13 +352,9 @@ func (s *ExecutorTestSuite) TestDockerNetworkingWithSubdomains() {
}

func (s *ExecutorTestSuite) TestDockerNetworkingFiltersHTTP() {
task := mock.TaskBuilder().
Network(models.NewNetworkConfigBuilder().
Type(models.NetworkHTTP).
Domains("bacalhau.org").
BuildOrDie()).
Engine(s.curlTask()).
BuildOrDie()
task := mock.Task()
task.Engine = s.curlTask()
task.Network = &models.NetworkConfig{Type: models.NetworkHTTP, Domains: []string{"bacalhau.org"}}

result, err := s.runJob(task, uuid.New().String())
// The curl will succeed but should return a non-zero exit code and error page.
Expand All @@ -388,13 +369,9 @@ func (s *ExecutorTestSuite) TestDockerNetworkingFiltersHTTPS() {
Build()
s.Require().NoError(err)

task := mock.TaskBuilder().
Network(models.NewNetworkConfigBuilder().
Type(models.NetworkHTTP).
Domains(s.containerHttpURL().Hostname()).
BuildOrDie()).
Engine(es).
BuildOrDie()
task := mock.Task()
task.Engine = es
task.Network = &models.NetworkConfig{Type: models.NetworkHTTP, Domains: []string{s.containerHttpURL().Hostname()}}

result, err := s.runJob(task, uuid.New().String())

Expand All @@ -415,9 +392,8 @@ func (s *ExecutorTestSuite) TestDockerExecutionCancellation() {

s.Require().NoError(err)

task := mock.TaskBuilder().
Engine(es).
BuildOrDie()
task := mock.Task()
task.Engine = es

jobCtx := context.Background()
go func() {
Expand Down Expand Up @@ -456,10 +432,10 @@ func (s *ExecutorTestSuite) TestDockerNetworkingAppendsHTTPHeader() {
_, err := w.Write([]byte(r.Header.Get("X-Bacalhau-Job-ID")))
s.Require().NoError(err)
})
task := mock.TaskBuilder().
Network(models.NewNetworkConfigBuilder().Type(models.NetworkHTTP).Domains(s.containerHttpURL().Hostname()).BuildOrDie()).
Engine(s.curlTask()).
BuildOrDie()

task := mock.Task()
task.Engine = s.curlTask()
task.Network = &models.NetworkConfig{Type: models.NetworkHTTP, Domains: []string{s.containerHttpURL().Hostname()}}

executionID := uuid.New().String()
result, err := s.runJob(task, executionID)
Expand All @@ -476,9 +452,8 @@ func (s *ExecutorTestSuite) TestTimesOutCorrectly() {
WithEntrypoint("bash", "-c", fmt.Sprintf(`sleep 1 && echo "%s" && sleep 20`, expected)).
Build()
s.Require().NoError(err)
task := mock.TaskBuilder().
Engine(es).
BuildOrDie()
task := mock.Task()
task.Engine = es

name := "timeout"
resultDir := s.T().TempDir()
Expand Down Expand Up @@ -533,10 +508,10 @@ func (s *ExecutorTestSuite) TestDockerStreamsAlreadyComplete() {
WithEntrypoint("bash", "cat /sys/fs/cgroup/cpu.max").
Build()
s.Require().NoError(err)
task := mock.TaskBuilder().
Engine(es).
ResourcesConfig(models.NewResourcesConfigBuilder().CPU(CPU_LIMIT).Memory(MEBIBYTE_MEMORY_LIMIT).BuildOrDie()).
BuildOrDie()
task := mock.Task()
task.Engine = es
task.ResourcesConfig = &models.ResourcesConfig{CPU: CPU_LIMIT, Memory: MEBIBYTE_MEMORY_LIMIT}
task.Normalize()

go func() {
_, _ = s.runJobWithContext(ctx, task, id)
Expand All @@ -561,11 +536,10 @@ func (s *ExecutorTestSuite) TestDockerStreamsSlowTask() {
Build()
s.Require().NoError(err)

task := mock.TaskBuilder().
Engine(es).
ResourcesConfig(models.NewResourcesConfigBuilder().CPU(CPU_LIMIT).Memory(MEBIBYTE_MEMORY_LIMIT).BuildOrDie()).
BuildOrDie()

task := mock.Task()
task.Engine = es
task.ResourcesConfig = &models.ResourcesConfig{CPU: CPU_LIMIT, Memory: MEBIBYTE_MEMORY_LIMIT}
task.Normalize()
s.startJob(task, id)

reader, err := s.executor.GetLogStream(context.Background(), executor.LogStreamRequest{
Expand Down Expand Up @@ -595,7 +569,8 @@ func (s *ExecutorTestSuite) TestDockerOOM() {
WithEntrypoint("tail", "/dev/zero").
Build()
s.Require().NoError(err)
task := mock.TaskBuilder().Engine(es).BuildOrDie()
task := mock.Task()
task.Engine = es

result, err := s.runJob(task, uuid.New().String())
require.NoError(s.T(), err)
Expand Down
Loading

0 comments on commit 12402d3

Please sign in to comment.