diff --git a/go.mod b/go.mod
index e9fe831cef..4fc3ca7cd2 100644
--- a/go.mod
+++ b/go.mod
@@ -78,6 +78,7 @@ require (
 	go.opentelemetry.io/otel v1.30.0
 	go.opentelemetry.io/proto/otlp v1.1.0
 	go.uber.org/atomic v1.11.0
+	go.uber.org/automaxprocs v1.5.3
 	go.uber.org/goleak v1.3.0
 	golang.org/x/exp v0.0.0-20240119083558-1b970713d09a
 	golang.org/x/mod v0.17.0
diff --git a/go.sum b/go.sum
index 0efa02c18c..e3a55bd982 100644
--- a/go.sum
+++ b/go.sum
@@ -646,6 +646,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
 github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
+github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
+github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
 github.com/prometheus/alertmanager v0.27.0 h1:V6nTa2J5V4s8TG4C4HtrBP/WNSebCCTYGGv4qecA/+I=
 github.com/prometheus/alertmanager v0.27.0/go.mod h1:8Ia/R3urPmbzJ8OsdvmZvIprDwvwmYCmUbwBL+jlPOE=
 github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
@@ -796,6 +798,8 @@ go.opentelemetry.io/proto/otlp v1.1.0/go.mod h1:GpBHCBWiqvVLDqmHZsoMM3C5ySeKTC7e
 go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
 go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
+go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
+go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
 go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
 go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
 go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
diff --git a/pkg/experiment/compactor/compaction_worker.go b/pkg/experiment/compactor/compaction_worker.go
index 4fac0eb004..bdf9dddce3 100644
--- a/pkg/experiment/compactor/compaction_worker.go
+++ b/pkg/experiment/compactor/compaction_worker.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"runtime"
 	"runtime/debug"
 	"sync"
 	"time"
@@ -22,6 +23,7 @@ import (
 	"github.com/grafana/pyroscope/pkg/experiment/metastore/client"
 	"github.com/grafana/pyroscope/pkg/experiment/query_backend/block"
 	"github.com/grafana/pyroscope/pkg/objstore"
+	_ "go.uber.org/automaxprocs"
 )
 
 type Worker struct {
@@ -39,10 +41,11 @@ type Worker struct {
 	completedJobs map[string]*compactorv1.CompactionJobStatus
 
 	queue chan *compactorv1.CompactionJob
+	wg    sync.WaitGroup
 }
 
 type Config struct {
-	JobCapacity     int           `yaml:"job_capacity"`
+	JobConcurrency  int           `yaml:"job_capacity"`
 	JobPollInterval time.Duration `yaml:"job_poll_interval"`
 	SmallObjectSize int           `yaml:"small_object_size_bytes"`
 	TempDir         string        `yaml:"temp_dir"`
@@ -51,7 +54,7 @@ type Config struct {
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	const prefix = "compaction-worker."
 	tempdir := filepath.Join(os.TempDir(), "pyroscope-compactor")
-	f.IntVar(&cfg.JobCapacity, prefix+"job-capacity", 3, "How many concurrent jobs will a compaction worker run at most.")
+	f.IntVar(&cfg.JobConcurrency, prefix+"job-concurrency", 1, "How many concurrent jobs will a compaction worker run at most.")
 	f.DurationVar(&cfg.JobPollInterval, prefix+"job-poll-interval", 5*time.Second, "How often will a compaction worker poll for jobs.")
 	f.IntVar(&cfg.SmallObjectSize, prefix+"small-object-size-bytes", 8<<20, "Size of the object that can be loaded in memory.")
 	f.StringVar(&cfg.TempDir, prefix+"temp-dir", tempdir, "Temporary directory for compaction jobs.")
@@ -63,6 +66,7 @@ func (cfg *Config) Validate() error {
 }
 
 func New(config Config, logger log.Logger, metastoreClient *metastoreclient.Client, storage objstore.Bucket, reg prometheus.Registerer) (*Worker, error) {
+	workers := runtime.GOMAXPROCS(-1) * config.JobConcurrency
 	w := &Worker{
 		config:          config,
 		logger:          logger,
@@ -72,7 +76,7 @@ func New(config Config, logger log.Logger, metastoreClient *metastoreclient.Clie
 		activeJobs:      make(map[string]*compactorv1.CompactionJob),
 		completedJobs:   make(map[string]*compactorv1.CompactionJobStatus),
 		metrics:         newMetrics(reg),
-		queue:           make(chan *compactorv1.CompactionJob, 2*config.JobCapacity),
+		queue:           make(chan *compactorv1.CompactionJob, workers),
 	}
 	w.BasicService = services.NewBasicService(w.starting, w.running, w.stopping)
 	return w, nil
@@ -85,29 +89,13 @@ func (w *Worker) starting(ctx context.Context) (err error) {
 func (w *Worker) running(ctx context.Context) error {
 	ticker := time.NewTicker(w.config.JobPollInterval)
 	defer ticker.Stop()
-	go func() {
-		for {
-			select {
-			case <-ctx.Done():
-				return
-
-			case job := <-w.queue:
-				w.jobMutex.Lock()
-				delete(w.pendingJobs, job.Name)
-				w.activeJobs[job.Name] = job
-				w.jobMutex.Unlock()
-
-				_ = level.Info(w.logger).Log("msg", "starting compaction job", "job", job.Name)
-				status := w.startJob(ctx, job)
-				_ = level.Info(w.logger).Log("msg", "compaction job finished", "job", job.Name)
-
-				w.jobMutex.Lock()
-				delete(w.activeJobs, job.Name)
-				w.completedJobs[job.Name] = status
-				w.jobMutex.Unlock()
-			}
-		}
-	}()
+	for i := 0; i < cap(w.queue); i++ {
+		w.wg.Add(1)
+		go func() {
+			defer w.wg.Done()
+			w.jobsLoop(ctx)
+		}()
+	}
 
 	for {
 		select {
@@ -115,11 +103,36 @@
 			w.poll(ctx)
 
 		case <-ctx.Done():
+			w.wg.Wait()
 			return nil
 		}
 	}
 }
 
+func (w *Worker) jobsLoop(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+
+		case job := <-w.queue:
+			w.jobMutex.Lock()
+			delete(w.pendingJobs, job.Name)
+			w.activeJobs[job.Name] = job
+			w.jobMutex.Unlock()
+
+			_ = level.Info(w.logger).Log("msg", "starting compaction job", "job", job.Name)
+			status := w.startJob(ctx, job)
+			_ = level.Info(w.logger).Log("msg", "compaction job finished", "job", job.Name)
+
+			w.jobMutex.Lock()
+			delete(w.activeJobs, job.Name)
+			w.completedJobs[job.Name] = status
+			w.jobMutex.Unlock()
+		}
+	}
+}
+
 func (w *Worker) poll(ctx context.Context) {
 	w.jobMutex.Lock()
 	level.Debug(w.logger).Log(
@@ -146,10 +159,7 @@
 		pendingStatusUpdates = append(pendingStatusUpdates, update)
 	}
 
-	jobCapacity := w.config.JobCapacity - len(w.activeJobs) - len(w.pendingJobs)
-	if jobCapacity < 0 {
-		jobCapacity = 0
-	}
+	jobCapacity := cap(w.queue) - len(w.queue)
 	w.jobMutex.Unlock()
 
 	if len(pendingStatusUpdates) > 0 || jobCapacity > 0 {
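Note on the concurrency model, separate from the patch itself: the worker pool is now sized as runtime.GOMAXPROCS(-1) * JobConcurrency, the blank go.uber.org/automaxprocs import makes GOMAXPROCS follow the container CPU quota, and free capacity is read off the job channel (cap(w.queue) - len(w.queue)) instead of being tracked against a configured cap. The sketch below is a minimal, standalone illustration of that shape; identifiers such as jobConcurrency, queue, and process are placeholders and not taken from the Pyroscope code.

// Standalone sketch of the pool shape used above, under the stated assumptions.
package main

import (
	"context"
	"fmt"
	"runtime"
	"sync"
	"time"

	// Blank import: adjusts GOMAXPROCS to the container CPU quota at init time.
	_ "go.uber.org/automaxprocs"
)

// process stands in for running a compaction job; it is a placeholder.
func process(job int) {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("processed job", job)
}

func main() {
	const jobConcurrency = 1 // jobs per core, mirroring the new default

	workers := runtime.GOMAXPROCS(-1) * jobConcurrency
	queue := make(chan int, workers) // bounded queue: capacity == number of workers

	ctx, cancel := context.WithCancel(context.Background())

	// One long-lived goroutine per worker slot, tracked by a WaitGroup so that
	// shutdown can wait for jobs that are already running.
	var wg sync.WaitGroup
	for i := 0; i < cap(queue); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case job := <-queue:
					process(job)
				}
			}
		}()
	}

	// Producer side: free capacity is derived from the channel itself,
	// the same way the patch computes cap(w.queue)-len(w.queue).
	for j := 0; j < 10; j++ {
		fmt.Println("free slots:", cap(queue)-len(queue))
		queue <- j
	}

	cancel()
	wg.Wait() // mirrors w.wg.Wait() before running() returns
}

With the default job-concurrency of 1, a worker runs at most one job per available core, and each poll can offer the metastore only as many new jobs as there are free slots in the queue.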