feat(distributors): Use a pool of workers to push to ingesters. #14245

Merged: 5 commits, Sep 26, 2024
Changes from 1 commit
4 changes: 4 additions & 0 deletions docs/sources/shared/configuration.md
@@ -2248,6 +2248,10 @@ ring:
# CLI flag: -distributor.ring.instance-interface-names
[instance_interface_names: <list of strings> | default = [<private network interfaces>]]

# Number of workers to push batches to ingesters.
# CLI flag: -distributor.push-worker-count
[push_worker_count: <int> | default = 256]

rate_store:
# The max number of concurrent requests to make to ingester stream apis
# CLI flag: -distributor.rate-store.max-request-parallelism
58 changes: 46 additions & 12 deletions pkg/distributor/distributor.go
@@ -10,6 +10,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"
"unicode"
"unsafe"
@@ -79,6 +80,7 @@ var allowedLabelsForLevel = map[string]struct{}{
type Config struct {
// Distributors ring
DistributorRing RingConfig `yaml:"ring,omitempty"`
PushWorkerCount int `yaml:"push_worker_count"`

// For testing.
factory ring_client.PoolFactory `yaml:"-"`
@@ -102,7 +104,7 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
cfg.DistributorRing.RegisterFlags(fs)
cfg.RateStore.RegisterFlagsWithPrefix("distributor.rate-store", fs)
cfg.WriteFailuresLogging.RegisterFlagsWithPrefix("distributor.write-failures-logging", fs)

fs.IntVar(&cfg.PushWorkerCount, "distributor.push-worker-count", 256, "Number of workers to push batches to ingesters.")
fs.BoolVar(&cfg.KafkaEnabled, "distributor.kafka-writes-enabled", false, "Enable writes to Kafka during Push requests.")
fs.BoolVar(&cfg.IngesterEnabled, "distributor.ingester-writes-enabled", true, "Enable writes to Ingesters during Push requests. Defaults to true.")
}
@@ -166,7 +168,9 @@ type Distributor struct {
replicationFactor prometheus.Gauge
streamShardCount prometheus.Counter

usageTracker push.UsageTracker
usageTracker push.UsageTracker
ingesterTasks chan pushIngesterTask
ingesterTaskWg sync.WaitGroup

// kafka
kafkaWriter KafkaProducer
@@ -253,6 +257,7 @@ func New(
rateLimitStrat: rateLimitStrat,
tee: tee,
usageTracker: usageTracker,
ingesterTasks: make(chan pushIngesterTask),
ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "distributor_ingester_appends_total",
@@ -350,10 +355,18 @@ }
}

func (d *Distributor) starting(ctx context.Context) error {
d.ingesterTaskWg.Add(d.cfg.PushWorkerCount)
for i := 0; i < d.cfg.PushWorkerCount; i++ {
go d.pushIngesterWorker()
}
return services.StartManagerAndAwaitHealthy(ctx, d.subservices)
}

func (d *Distributor) running(ctx context.Context) error {
defer func() {
close(d.ingesterTasks)
Member:

I am not 100% sure, but I think it might be possible for this to cause a panic 🤔 If we are in the middle of a Distributor.Push() call when the context gets cancelled, we might close d.ingesterTasks here right before we try to add to it here:

d.ingesterTasks <- pushIngesterTask{
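
A minimal, self-contained sketch of the failure mode being described (invented names, not Loki code): once a channel has been closed, any later send on it panics, so closing d.ingesterTasks while an in-flight Push() may still queue work can take the process down.

```go
package main

// Illustrative only: reproduces the "send on closed channel" panic mentioned
// above. The goroutine drains the channel the way a pool worker would, but
// the producer sends after the shutdown path has already closed the channel.
func main() {
	tasks := make(chan int)
	done := make(chan struct{})

	go func() {
		defer close(done)
		for range tasks { // worker draining tasks, like pushIngesterWorker
		}
	}()

	close(tasks) // shutdown closes the channel ...
	<-done       // ... the worker drains and exits cleanly ...
	tasks <- 1   // ... but a late producer panics: "send on closed channel"
}
```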

Contributor Author:

Changed the shutdown logic a bit.
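
For reference, below is one common way to structure such a pool so this race cannot happen: never close the task channel at all, and let both the workers and the producers watch the same cancellation signal. The names (`pool`, `submit`, and so on) are invented for the sketch; this is an illustration, not necessarily the shutdown logic the follow-up commit uses.

```go
package main

import (
	"context"
	"sync"
)

// task and pool are hypothetical stand-ins for pushIngesterTask and the
// distributor's worker pool.
type task struct{ payload string }

type pool struct {
	tasks chan task
	wg    sync.WaitGroup
}

// newPool starts a fixed number of workers. The task channel is never
// closed, so a concurrent submit can never hit a closed channel; shutdown
// is signalled purely through ctx.
func newPool(ctx context.Context, workers int) *pool {
	p := &pool{tasks: make(chan task)}
	p.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer p.wg.Done()
			for {
				select {
				case <-ctx.Done():
					return // stop draining once shutdown is requested
				case t := <-p.tasks:
					_ = t // process the task (e.g. push a batch to an ingester)
				}
			}
		}()
	}
	return p
}

// submit blocks until a worker accepts the task or shutdown begins.
func (p *pool) submit(ctx context.Context, t task) error {
	select {
	case p.tasks <- t:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	p := newPool(ctx, 4)

	_ = p.submit(ctx, task{payload: "batch-1"})

	cancel()    // request shutdown; no channel is ever closed
	p.wg.Wait() // wait for every worker to exit
}
```

With this shape, cancel() plus wg.Wait() replaces the close(channel) step entirely, so an in-flight submit either hands its task to a worker or returns ctx.Err().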

d.ingesterTaskWg.Wait()
}()
select {
case <-ctx.Done():
return nil
@@ -630,15 +643,20 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}

for ingester, streams := range streamsByIngester {
go func(ingester ring.InstanceDesc, samples []*streamTracker) {
func(ingester ring.InstanceDesc, samples []*streamTracker) {
// Use a background context to make sure all ingesters get samples even if we return early
localCtx, cancel := context.WithTimeout(context.Background(), d.clientCfg.RemoteTimeout)
defer cancel()
localCtx = user.InjectOrgID(localCtx, tenantID)
if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}
d.sendStreams(localCtx, ingester, samples, &tracker)
d.ingesterTasks <- pushIngesterTask{
ingester: ingester,
streamTracker: samples,
pushTracker: &tracker,
ctx: localCtx,
cancel: cancel,
}
}(ingesterDescs[ingester], streams)
}
}
@@ -830,9 +848,25 @@ func (d *Distributor) truncateLines(vContext validationContext, stream *logproto
validation.MutatedBytes.WithLabelValues(validation.LineTooLong, vContext.userID).Add(float64(truncatedBytes))
}

type pushIngesterTask struct {
streamTracker []*streamTracker
pushTracker *pushTracker
ingester ring.InstanceDesc
ctx context.Context
cancel context.CancelFunc
}

func (d *Distributor) pushIngesterWorker() {
defer d.ingesterTaskWg.Done()
for task := range d.ingesterTasks {
d.sendStreams(task)
}
}

// TODO taken from Cortex, see if we can refactor out a usable interface.
func (d *Distributor) sendStreams(ctx context.Context, ingester ring.InstanceDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) {
err := d.sendStreamsErr(ctx, ingester, streamTrackers)
func (d *Distributor) sendStreams(task pushIngesterTask) {
defer task.cancel()
err := d.sendStreamsErr(task.ctx, task.ingester, task.streamTracker)

// If we succeed, decrement each stream's pending count by one.
// If we reach the required number of successful puts on this stream, then
@@ -843,17 +877,17 @@ //
//
// The use of atomic increments here guarantees only a single sendStreams
// goroutine will write to either channel.
for i := range streamTrackers {
for i := range task.streamTracker {
if err != nil {
if streamTrackers[i].failed.Inc() <= int32(streamTrackers[i].maxFailures) {
if task.streamTracker[i].failed.Inc() <= int32(task.streamTracker[i].maxFailures) {
continue
}
pushTracker.doneWithResult(err)
task.pushTracker.doneWithResult(err)
} else {
if streamTrackers[i].succeeded.Inc() != int32(streamTrackers[i].minSuccess) {
if task.streamTracker[i].succeeded.Inc() != int32(task.streamTracker[i].minSuccess) {
continue
}
pushTracker.doneWithResult(nil)
task.pushTracker.doneWithResult(nil)
}
}
}