chore(v2): compactor concurrency (#3628)
* chore(v2): compactor concurrency

* review fixes

* make fmt

* automaxprocs
korniltsev authored Oct 17, 2024
1 parent e43348d commit fc7fad4
Showing 3 changed files with 45 additions and 30 deletions.
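The heart of the change: instead of a single goroutine consuming the job queue, the worker now starts runtime.GOMAXPROCS(-1) * JobConcurrency goroutines, each draining the shared channel, and running() waits for all of them to exit on shutdown. Below is a minimal, self-contained sketch of that fan-out pattern, with integer jobs standing in for *compactorv1.CompactionJob; it illustrates the shape of the diff that follows, not the Pyroscope implementation itself.

package main

import (
	"context"
	"fmt"
	"runtime"
	"sync"
	"time"
)

func main() {
	// Pool size mirrors the diff: one worker per CPU core times the
	// configured per-core job concurrency.
	jobConcurrency := 1
	workers := runtime.GOMAXPROCS(-1) * jobConcurrency

	queue := make(chan int, workers)
	ctx, cancel := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	for i := 0; i < cap(queue); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case job := <-queue:
					fmt.Println("processing job", job)
				}
			}
		}()
	}

	for j := 0; j < 8; j++ {
		queue <- j // blocks once all workers are busy and the buffer is full
	}
	time.Sleep(100 * time.Millisecond)

	cancel()  // signal shutdown, as ctx.Done() does in running()
	wg.Wait() // wait for every worker to exit before returning
}

Sizing the channel buffer to the pool size also keeps the queue's free slots a usable measure of spare capacity, which the last hunk of the diff takes advantage of.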
1 change: 1 addition & 0 deletions go.mod
@@ -78,6 +78,7 @@ require (
 	go.opentelemetry.io/otel v1.30.0
 	go.opentelemetry.io/proto/otlp v1.1.0
 	go.uber.org/atomic v1.11.0
+	go.uber.org/automaxprocs v1.5.3
 	go.uber.org/goleak v1.3.0
 	golang.org/x/exp v0.0.0-20240119083558-1b970713d09a
 	golang.org/x/mod v0.17.0
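go.mod now pins go.uber.org/automaxprocs v1.5.3. A blank import of the package (added to compaction_worker.go below) adjusts GOMAXPROCS to the container's cgroup CPU quota as an init side effect, so runtime.GOMAXPROCS(-1) reflects the CPUs the compactor is actually allowed to use. For reference, the library also exposes an explicit entry point that can log and undo the adjustment; a sketch of that alternative, which is not what this commit uses:

package main

import (
	"log"

	"go.uber.org/automaxprocs/maxprocs"
)

func main() {
	// maxprocs.Set applies the cgroup CPU quota to GOMAXPROCS and returns
	// an undo function; the Logger option routes its message through a
	// printf-style function.
	undo, err := maxprocs.Set(maxprocs.Logger(log.Printf))
	defer undo()
	if err != nil {
		log.Printf("failed to set GOMAXPROCS: %v", err)
	}
	// ... start the service ...
}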
4 changes: 4 additions & 0 deletions go.sum
@@ -646,6 +646,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
 github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
+github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
+github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
 github.com/prometheus/alertmanager v0.27.0 h1:V6nTa2J5V4s8TG4C4HtrBP/WNSebCCTYGGv4qecA/+I=
 github.com/prometheus/alertmanager v0.27.0/go.mod h1:8Ia/R3urPmbzJ8OsdvmZvIprDwvwmYCmUbwBL+jlPOE=
 github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
@@ -796,6 +798,8 @@ go.opentelemetry.io/proto/otlp v1.1.0/go.mod h1:GpBHCBWiqvVLDqmHZsoMM3C5ySeKTC7e
 go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
 go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
+go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
+go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
 go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
 go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
 go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
70 changes: 40 additions & 30 deletions pkg/experiment/compactor/compaction_worker.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"runtime"
 	"runtime/debug"
 	"sync"
 	"time"
@@ -22,6 +23,7 @@ import (
 	"github.com/grafana/pyroscope/pkg/experiment/metastore/client"
 	"github.com/grafana/pyroscope/pkg/experiment/query_backend/block"
 	"github.com/grafana/pyroscope/pkg/objstore"
+	_ "go.uber.org/automaxprocs"
 )
 
 type Worker struct {
@@ -39,10 +41,11 @@ type Worker struct {
 	completedJobs map[string]*compactorv1.CompactionJobStatus
 
 	queue chan *compactorv1.CompactionJob
+	wg    sync.WaitGroup
 }
 
 type Config struct {
-	JobCapacity     int           `yaml:"job_capacity"`
+	JobConcurrency  int           `yaml:"job_capacity"`
 	JobPollInterval time.Duration `yaml:"job_poll_interval"`
 	SmallObjectSize int           `yaml:"small_object_size_bytes"`
 	TempDir         string        `yaml:"temp_dir"`
@@ -51,7 +54,7 @@ type Config struct {
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	const prefix = "compaction-worker."
 	tempdir := filepath.Join(os.TempDir(), "pyroscope-compactor")
-	f.IntVar(&cfg.JobCapacity, prefix+"job-capacity", 3, "How many concurrent jobs will a compaction worker run at most.")
+	f.IntVar(&cfg.JobConcurrency, prefix+"job-concurrency", 1, "How many concurrent jobs will a compaction worker run at most.")
 	f.DurationVar(&cfg.JobPollInterval, prefix+"job-poll-interval", 5*time.Second, "How often will a compaction worker poll for jobs.")
 	f.IntVar(&cfg.SmallObjectSize, prefix+"small-object-size-bytes", 8<<20, "Size of the object that can be loaded in memory.")
 	f.StringVar(&cfg.TempDir, prefix+"temp-dir", tempdir, "Temporary directory for compaction jobs.")
@@ -63,6 +66,7 @@ func (cfg *Config) Validate() error {
 }
 
 func New(config Config, logger log.Logger, metastoreClient *metastoreclient.Client, storage objstore.Bucket, reg prometheus.Registerer) (*Worker, error) {
+	workers := runtime.GOMAXPROCS(-1) * config.JobConcurrency
 	w := &Worker{
 		config:        config,
 		logger:        logger,
@@ -72,7 +76,7 @@ func New(config Config, logger log.Logger, metastoreClient *metastoreclient.Clie
 		activeJobs:    make(map[string]*compactorv1.CompactionJob),
 		completedJobs: make(map[string]*compactorv1.CompactionJobStatus),
 		metrics:       newMetrics(reg),
-		queue:         make(chan *compactorv1.CompactionJob, 2*config.JobCapacity),
+		queue:         make(chan *compactorv1.CompactionJob, workers),
 	}
 	w.BasicService = services.NewBasicService(w.starting, w.running, w.stopping)
 	return w, nil
@@ -85,41 +89,50 @@ func (w *Worker) starting(ctx context.Context) (err error) {
 func (w *Worker) running(ctx context.Context) error {
 	ticker := time.NewTicker(w.config.JobPollInterval)
 	defer ticker.Stop()
-	go func() {
-		for {
-			select {
-			case <-ctx.Done():
-				return
-
-			case job := <-w.queue:
-				w.jobMutex.Lock()
-				delete(w.pendingJobs, job.Name)
-				w.activeJobs[job.Name] = job
-				w.jobMutex.Unlock()
-
-				_ = level.Info(w.logger).Log("msg", "starting compaction job", "job", job.Name)
-				status := w.startJob(ctx, job)
-				_ = level.Info(w.logger).Log("msg", "compaction job finished", "job", job.Name)
-
-				w.jobMutex.Lock()
-				delete(w.activeJobs, job.Name)
-				w.completedJobs[job.Name] = status
-				w.jobMutex.Unlock()
-			}
-		}
-	}()
+	for i := 0; i < cap(w.queue); i++ {
+		w.wg.Add(1)
+		go func() {
+			defer w.wg.Done()
+			w.jobsLoop(ctx)
+		}()
+	}
 
 	for {
 		select {
 		case <-ticker.C:
 			w.poll(ctx)
 
 		case <-ctx.Done():
+			w.wg.Wait()
 			return nil
 		}
 	}
 }
 
+func (w *Worker) jobsLoop(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+
+		case job := <-w.queue:
+			w.jobMutex.Lock()
+			delete(w.pendingJobs, job.Name)
+			w.activeJobs[job.Name] = job
+			w.jobMutex.Unlock()
+
+			_ = level.Info(w.logger).Log("msg", "starting compaction job", "job", job.Name)
+			status := w.startJob(ctx, job)
+			_ = level.Info(w.logger).Log("msg", "compaction job finished", "job", job.Name)
+
+			w.jobMutex.Lock()
+			delete(w.activeJobs, job.Name)
+			w.completedJobs[job.Name] = status
+			w.jobMutex.Unlock()
+		}
+	}
+}
+
 func (w *Worker) poll(ctx context.Context) {
 	w.jobMutex.Lock()
 	level.Debug(w.logger).Log(
@@ -146,10 +159,7 @@ func (w *Worker) poll(ctx context.Context) {
 		pendingStatusUpdates = append(pendingStatusUpdates, update)
 	}
 
-	jobCapacity := w.config.JobCapacity - len(w.activeJobs) - len(w.pendingJobs)
-	if jobCapacity < 0 {
-		jobCapacity = 0
-	}
+	jobCapacity := cap(w.queue) - len(w.queue)
 	w.jobMutex.Unlock()
 
 	if len(pendingStatusUpdates) > 0 || jobCapacity > 0 {
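The rewritten capacity calculation leans on Go's channel introspection: len(ch) reports the number of buffered elements and cap(ch) the buffer size, so cap(w.queue) - len(w.queue) is the number of jobs the worker can still accept without blocking, replacing the hand-maintained JobCapacity arithmetic and its clamp at zero. A trivial sketch of the idea:

package main

import "fmt"

func main() {
	// The buffer size plays the role of the worker pool size in the diff.
	queue := make(chan int, 4)
	queue <- 1
	queue <- 2
	// Free slots = capacity minus queued elements; poll() uses this
	// difference as jobCapacity.
	fmt.Println(cap(queue) - len(queue)) // prints 2
}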
