diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md
index 15426e54d088..ce2c8d359d75 100644
--- a/docs/sources/shared/configuration.md
+++ b/docs/sources/shared/configuration.md
@@ -3813,6 +3813,12 @@ otlp_config:
 # status code (260) is returned to the client along with an error message.
 # CLI flag: -limits.block-ingestion-status-code
 [block_ingestion_status_code: <int> | default = 260]
+
+# The number of partitions a tenant's data should be sharded to when using kafka
+# ingestion. Tenants are sharded across partitions using shuffle-sharding. 0
+# disables shuffle sharding and tenant is sharded across all partitions.
+# CLI flag: -limits.ingestion-partition-tenant-shard-size
+[ingestion_partitions_tenant_shard_size: <int> | default = 0]
 ```
 
 ### local_storage_config
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 476bad507ea0..01dae3ee6e0c 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -604,8 +604,12 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
 	tracker.streamsPending.Store(int32(streamsToWrite))
 
 	if d.cfg.KafkaEnabled {
+		subring, err := d.partitionRing.PartitionRing().ShuffleShard(tenantID, d.validator.IngestionPartitionsTenantShardSize(tenantID))
+		if err != nil {
+			return nil, err
+		}
 		// We don't need to create a new context like the ingester writes, because we don't return unless all writes have succeeded.
-		d.sendStreamsToKafka(ctx, streams, tenantID, &tracker)
+		d.sendStreamsToKafka(ctx, streams, tenantID, &tracker, subring)
 	}
 
 	if d.cfg.IngesterEnabled {
@@ -931,10 +935,10 @@ func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.Instance
 	return err
 }
 
-func (d *Distributor) sendStreamsToKafka(ctx context.Context, streams []KeyedStream, tenant string, tracker *pushTracker) {
+func (d *Distributor) sendStreamsToKafka(ctx context.Context, streams []KeyedStream, tenant string, tracker *pushTracker, subring *ring.PartitionRing) {
 	for _, s := range streams {
 		go func(s KeyedStream) {
-			err := d.sendStreamToKafka(ctx, s, tenant)
+			err := d.sendStreamToKafka(ctx, s, tenant, subring)
 			if err != nil {
 				err = fmt.Errorf("failed to write stream to kafka: %w", err)
 			}
@@ -943,11 +947,11 @@ func (d *Distributor) sendStreamsToKafka(ctx context.Context, streams []KeyedStr
 	}
 }
 
-func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream, tenant string) error {
+func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream, tenant string, subring *ring.PartitionRing) error {
 	if len(stream.Stream.Entries) == 0 {
 		return nil
 	}
-	partitionID, err := d.partitionRing.PartitionRing().ActivePartitionForKey(stream.HashKey)
+	partitionID, err := subring.ActivePartitionForKey(stream.HashKey)
 	if err != nil {
 		d.kafkaAppends.WithLabelValues("kafka", "fail").Inc()
 		return fmt.Errorf("failed to find active partition for stream: %w", err)
diff --git a/pkg/distributor/limits.go b/pkg/distributor/limits.go
index 7e7e6050d612..c72eb1939a3b 100644
--- a/pkg/distributor/limits.go
+++ b/pkg/distributor/limits.go
@@ -36,4 +36,6 @@ type Limits interface {
 
 	BlockIngestionUntil(userID string) time.Time
 	BlockIngestionStatusCode(userID string) int
+
+	IngestionPartitionsTenantShardSize(userID string) int
 }
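The distributor change above replaces the single lookup against the full partition ring with a two-step lookup: build the tenant's shuffle-shard subring (sized by the new per-tenant limit), then hash each stream into one of that subring's active partitions. The sketch below only illustrates that selection order with a toy ring; `toyPartitionRing`, its hashing, and `main` are illustrative assumptions, not Loki or dskit code (dskit's real shuffle sharding is ring-aware and more involved).

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// toyPartitionRing is an illustrative stand-in for dskit's PartitionRing.
// It holds a set of active partition IDs.
type toyPartitionRing struct {
	partitions []int32
}

// shuffleShard deterministically picks shardSize partitions for a tenant.
// shardSize <= 0 means "use all partitions", matching the semantics of the new
// ingestion_partitions_tenant_shard_size limit (0 disables shuffle sharding).
func (r *toyPartitionRing) shuffleShard(tenantID string, shardSize int) *toyPartitionRing {
	if shardSize <= 0 || shardSize >= len(r.partitions) {
		return r
	}
	// Rank partitions by a tenant-seeded hash and keep the first shardSize.
	// The real dskit implementation uses a different, ring-aware algorithm.
	ranked := append([]int32(nil), r.partitions...)
	sort.Slice(ranked, func(i, j int) bool {
		return tenantHash(tenantID, ranked[i]) < tenantHash(tenantID, ranked[j])
	})
	return &toyPartitionRing{partitions: ranked[:shardSize]}
}

// activePartitionForKey maps a stream hash key onto one of the ring's partitions.
func (r *toyPartitionRing) activePartitionForKey(key uint32) int32 {
	return r.partitions[int(key)%len(r.partitions)]
}

func tenantHash(tenantID string, partition int32) uint32 {
	h := fnv.New32a()
	fmt.Fprintf(h, "%s/%d", tenantID, partition)
	return h.Sum32()
}

func main() {
	ring := &toyPartitionRing{partitions: []int32{0, 1, 2, 3, 4, 5}}

	// Mirrors the distributor path: tenant subring first, then per-stream partition.
	subring := ring.shuffleShard("tenant-a", 2) // e.g. shard size of 2 for this tenant
	streamHashKey := uint32(12345)              // stands in for KeyedStream.HashKey
	fmt.Println("tenant-a subring:", subring.partitions)
	fmt.Println("stream goes to partition:", subring.activePartitionForKey(streamHashKey))
}
```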
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 529336a58561..8672d5f4cad7 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -410,7 +410,7 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
 	var ownedStreamsStrategy ownershipStrategy
 	if i.cfg.KafkaIngestion.Enabled {
-		ownedStreamsStrategy = newOwnedStreamsPartitionStrategy(i.ingestPartitionID, partitionRingWatcher, util_log.Logger)
+		ownedStreamsStrategy = newOwnedStreamsPartitionStrategy(i.ingestPartitionID, partitionRingWatcher, limits.IngestionPartitionsTenantShardSize, util_log.Logger)
 	} else {
 		ownedStreamsStrategy = newOwnedStreamsIngesterStrategy(i.lifecycler.ID, i.readRing, util_log.Logger)
 	}
 
diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go
index 1ed3a3ea2716..a9ddd2ba3ba3 100644
--- a/pkg/ingester/limiter.go
+++ b/pkg/ingester/limiter.go
@@ -31,6 +31,7 @@ type Limits interface {
 	MaxGlobalStreamsPerUser(userID string) int
 	PerStreamRateLimit(userID string) validation.RateLimit
 	ShardStreams(userID string) shardstreams.Config
+	IngestionPartitionsTenantShardSize(userID string) int
 }
 
 // Limiter implements primitives to get the maximum number of streams
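Both the distributor and ingester `Limits` interfaces gain `IngestionPartitionsTenantShardSize(userID string) int`, which is backed by the per-tenant overrides added further down in this diff. Below is a minimal sketch of that plumbing, assuming a simplified `tenantLimits` stand-in for the real `validation.Overrides` (the names here are hypothetical): an unset tenant falls back to the default of 0, i.e. no shuffle sharding.

```go
package main

import "fmt"

// limits mirrors the method added to the distributor and ingester Limits
// interfaces in this diff.
type limits interface {
	IngestionPartitionsTenantShardSize(userID string) int
}

// tenantLimits is an assumed, simplified stand-in for Loki's per-tenant
// runtime overrides; it is not the real pkg/validation.Overrides type.
type tenantLimits struct {
	defaults  int
	perTenant map[string]int
}

func (l *tenantLimits) IngestionPartitionsTenantShardSize(userID string) int {
	if v, ok := l.perTenant[userID]; ok {
		return v
	}
	return l.defaults
}

func main() {
	var lim limits = &tenantLimits{
		defaults:  0, // matches the flag default: 0 = shard across all partitions
		perTenant: map[string]int{"tenant-a": 4},
	}
	fmt.Println(lim.IngestionPartitionsTenantShardSize("tenant-a")) // 4
	fmt.Println(lim.IngestionPartitionsTenantShardSize("tenant-b")) // 0
}
```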
diff --git a/pkg/ingester/recalculate_owned_streams.go b/pkg/ingester/recalculate_owned_streams.go
index b1f6bd62ebfc..c521e55e2d43 100644
--- a/pkg/ingester/recalculate_owned_streams.go
+++ b/pkg/ingester/recalculate_owned_streams.go
@@ -160,11 +160,12 @@ type ownedStreamsPartitionStrategy struct {
 	getPartitionShardSize func(user string) int
 }
 
-func newOwnedStreamsPartitionStrategy(partitionID int32, ring ring.PartitionRingReader, logger log.Logger) *ownedStreamsPartitionStrategy {
+func newOwnedStreamsPartitionStrategy(partitionID int32, ring ring.PartitionRingReader, getPartitionShardSize func(user string) int, logger log.Logger) *ownedStreamsPartitionStrategy {
 	return &ownedStreamsPartitionStrategy{
-		partitionID:          partitionID,
-		partitionRingWatcher: ring,
-		logger:               logger,
+		partitionID:           partitionID,
+		partitionRingWatcher:  ring,
+		logger:                logger,
+		getPartitionShardSize: getPartitionShardSize,
 	}
 }
 
@@ -174,7 +175,7 @@ func (s *ownedStreamsPartitionStrategy) checkRingForChanges() (bool, error) {
 	if r.PartitionsCount() == 0 {
 		return false, ring.ErrEmptyRing
 	}
-
+	// todo(ctovena): We might need to consider partition shard size changes as well.
 	activePartitions := r.ActivePartitionIDs()
 	ringChanged := !slices.Equal(s.previousActivePartitions, activePartitions)
 	s.previousActivePartitions = activePartitions
@@ -182,7 +183,11 @@
 }
 
 func (s *ownedStreamsPartitionStrategy) isOwnedStream(str *stream) (bool, error) {
-	partitionForStream, err := s.partitionRingWatcher.PartitionRing().ActivePartitionForKey(lokiring.TokenFor(str.tenant, str.labelsString))
+	subring, err := s.partitionRingWatcher.PartitionRing().ShuffleShard(str.tenant, s.getPartitionShardSize(str.tenant))
+	if err != nil {
+		return false, fmt.Errorf("failed to get shuffle shard for stream: %w", err)
+	}
+	partitionForStream, err := subring.ActivePartitionForKey(lokiring.TokenFor(str.tenant, str.labelsString))
 	if err != nil {
 		return false, fmt.Errorf("failed to find active partition for stream: %w", err)
 	}
diff --git a/pkg/ingester/recalculate_owned_streams_test.go b/pkg/ingester/recalculate_owned_streams_test.go
index 82a733e593d6..d5dce8599287 100644
--- a/pkg/ingester/recalculate_owned_streams_test.go
+++ b/pkg/ingester/recalculate_owned_streams_test.go
@@ -108,7 +108,7 @@ func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T)
 			strategy := newOwnedStreamsIngesterStrategy(currentIngesterName, mockRing, log.NewNopLogger())
 			service := newRecalculateOwnedStreamsSvc(mockTenantsSupplier.get, strategy, 50*time.Millisecond, log.NewNopLogger())
 
-			//change the limit to assert that fixed limit is updated after the recalculation
+			// change the limit to assert that fixed limit is updated after the recalculation
 			limits.DefaultLimits().MaxGlobalStreamsPerUser = 50
 
 			service.recalculate()
@@ -120,7 +120,6 @@ func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T)
 			require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, testData.expectedNotOwnedStreamCount)
 		})
 	}
-
 }
 
 type mockStreamsOwnershipRing struct {
@@ -203,7 +202,7 @@ func Test_ownedStreamsPartitionStrategy_checkRingForChanges(t *testing.T) {
 	ringReader := &mockPartitionRingReader{
 		ring: newMockPartitionRingWithActivePartitions(1),
 	}
-	service := newOwnedStreamsPartitionStrategy(1, ringReader, log.NewNopLogger())
+	service := newOwnedStreamsPartitionStrategy(1, ringReader, func(string) int { return 1 }, log.NewNopLogger())
 
 	ringChanged, err := service.checkRingForChanges()
 	require.NoError(t, err)
@@ -226,12 +225,12 @@ func Test_ownedStreamsPartitionStrategy_isOwnedStream(t *testing.T) {
 	}
 	stream := &stream{tenant: "test1", labelsString: "mock=1"} // has a hashkey mapping to partition 1
 
-	service1 := newOwnedStreamsPartitionStrategy(1, ringReader, log.NewNopLogger())
+	service1 := newOwnedStreamsPartitionStrategy(1, ringReader, func(string) int { return 1 }, log.NewNopLogger())
 	owned, err := service1.isOwnedStream(stream)
 	require.NoError(t, err)
 	require.True(t, owned)
 
-	service2 := newOwnedStreamsPartitionStrategy(2, ringReader, log.NewNopLogger())
+	service2 := newOwnedStreamsPartitionStrategy(2, ringReader, func(string) int { return 1 }, log.NewNopLogger())
 	owned, err = service2.isOwnedStream(stream)
 	require.NoError(t, err)
 	require.False(t, owned)
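With the constructor change above, `ownedStreamsPartitionStrategy` receives an injected `getPartitionShardSize func(user string) int` and checks ownership against the tenant's shuffle-sharded subring rather than the whole ring, which is why the tests now pass a `func(string) int { return 1 }` stub. A rough, self-contained sketch of that predicate under the same toy-ring assumption as earlier (none of these types are the real dskit/Loki ones):

```go
package main

import "fmt"

// toyStrategy is an illustrative stand-in for ownedStreamsPartitionStrategy:
// an ingester consuming partitionID owns a stream only if the stream maps to
// that partition within the tenant's shuffle shard.
type toyStrategy struct {
	partitionID           int32
	activePartitions      []int32               // stand-in for the partition ring
	getPartitionShardSize func(user string) int // injected, as in the diff
}

func (s *toyStrategy) isOwnedStream(tenant string, streamHash uint32) bool {
	shard := s.activePartitions
	if size := s.getPartitionShardSize(tenant); size > 0 && size < len(shard) {
		// Toy shuffle shard: take the first `size` partitions. The real code
		// asks the dskit partition ring for a tenant-seeded subring.
		shard = shard[:size]
	}
	owner := shard[int(streamHash)%len(shard)]
	return owner == s.partitionID
}

func main() {
	shardSize := func(string) int { return 1 } // same stub the tests use
	ring := []int32{1, 2, 3}

	s1 := &toyStrategy{partitionID: 1, activePartitions: ring, getPartitionShardSize: shardSize}
	s2 := &toyStrategy{partitionID: 2, activePartitions: ring, getPartitionShardSize: shardSize}

	// With a shard size of 1, every stream of this tenant lands on partition 1,
	// so only the consumer of partition 1 owns it.
	fmt.Println(s1.isOwnedStream("test1", 42)) // true
	fmt.Println(s2.isOwnedStream("test1", 42)) // false
}
```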
diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go
index 9b362f20704f..5da6bc9cfc61 100644
--- a/pkg/validation/limits.go
+++ b/pkg/validation/limits.go
@@ -222,6 +222,8 @@ type Limits struct {
 
 	BlockIngestionUntil      dskit_flagext.Time `yaml:"block_ingestion_until" json:"block_ingestion_until"`
 	BlockIngestionStatusCode int                `yaml:"block_ingestion_status_code" json:"block_ingestion_status_code"`
+
+	IngestionPartitionsTenantShardSize int `yaml:"ingestion_partitions_tenant_shard_size" json:"ingestion_partitions_tenant_shard_size" category:"experimental"`
 }
 
 type StreamRetention struct {
@@ -412,6 +414,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 
 	f.Var(&l.BlockIngestionUntil, "limits.block-ingestion-until", "Block ingestion until the configured date. The time should be in RFC3339 format.")
 	f.IntVar(&l.BlockIngestionStatusCode, "limits.block-ingestion-status-code", defaultBlockedIngestionStatusCode, "HTTP status code to return when ingestion is blocked. If 200, the ingestion will be blocked without returning an error to the client. By Default, a custom status code (260) is returned to the client along with an error message.")
+
+	f.IntVar(&l.IngestionPartitionsTenantShardSize, "limits.ingestion-partition-tenant-shard-size", 0, "The number of partitions a tenant's data should be sharded to when using kafka ingestion. Tenants are sharded across partitions using shuffle-sharding. 0 disables shuffle sharding and tenant is sharded across all partitions.")
 }
 
 // SetGlobalOTLPConfig set GlobalOTLPConfig which is used while unmarshaling per-tenant otlp config to use the default list of resource attributes picked as index labels.
@@ -778,6 +782,10 @@ func (o *Overrides) RulerTenantShardSize(userID string) int {
 	return o.getOverridesForUser(userID).RulerTenantShardSize
 }
 
+func (o *Overrides) IngestionPartitionsTenantShardSize(userID string) int {
+	return o.getOverridesForUser(userID).IngestionPartitionsTenantShardSize
+}
+
 // RulerMaxRulesPerRuleGroup returns the maximum number of rules per rule group for a given user.
 func (o *Overrides) RulerMaxRulesPerRuleGroup(userID string) int {
 	return o.getOverridesForUser(userID).RulerMaxRulesPerRuleGroup
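The new limit is wired up like the other per-tenant limits: a `Limits` struct field, a CLI flag, and an `Overrides` getter. The snippet below only demonstrates the flag's name, default, and "0 means all partitions" semantics against a plain standard-library `FlagSet`; it is a sketch, not Loki's `RegisterFlags` code.

```go
package main

import (
	"flag"
	"fmt"
)

func main() {
	// Mirrors the registration added in pkg/validation/limits.go, but against a
	// stand-alone FlagSet rather than Loki's Limits struct.
	fs := flag.NewFlagSet("limits", flag.ExitOnError)
	shardSize := fs.Int(
		"limits.ingestion-partition-tenant-shard-size",
		0,
		"Number of partitions a tenant's data is sharded to with Kafka ingestion; 0 disables shuffle sharding.",
	)

	// e.g. -limits.ingestion-partition-tenant-shard-size=4
	_ = fs.Parse([]string{"-limits.ingestion-partition-tenant-shard-size=4"})

	if *shardSize == 0 {
		fmt.Println("shuffle sharding disabled: tenant uses all partitions")
	} else {
		fmt.Printf("tenant limited to %d partitions\n", *shardSize)
	}
}
```

In practice, per-tenant values would be set through the runtime overrides (the `category:"experimental"` struct tag above marks the field as experimental); the flag only sets the default that applies when a tenant has no override.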