feat(ingester): implement partition shuffle sharding for ingester (#1…
cyriltovena authored Oct 2, 2024
1 parent 30528c9 commit 1a4436c
Showing 8 changed files with 42 additions and 17 deletions.
6 changes: 6 additions & 0 deletions docs/sources/shared/configuration.md
@@ -3813,6 +3813,12 @@ otlp_config:
# status code (260) is returned to the client along with an error message.
# CLI flag: -limits.block-ingestion-status-code
[block_ingestion_status_code: <int> | default = 260]

# The number of partitions a tenant's data should be sharded to when using kafka
# ingestion. Tenants are sharded across partitions using shuffle-sharding. 0
# disables shuffle sharding and tenant is sharded across all partitions.
# CLI flag: -limits.ingestion-partition-tenant-shard-size
[ingestion_partitions_tenant_shard_size: <int> | default = 0]
```
### local_storage_config
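For reference, a minimal sketch of how the new limit could be set globally in the Loki configuration, assuming the standard `limits_config` block; the value 4 is purely illustrative:

```yaml
limits_config:
  # Shard each tenant's streams across 4 Kafka partitions instead of all of them.
  # 0 (the default) keeps shuffle sharding disabled.
  ingestion_partitions_tenant_shard_size: 4
```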
14 changes: 9 additions & 5 deletions pkg/distributor/distributor.go
@@ -604,8 +604,12 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
tracker.streamsPending.Store(int32(streamsToWrite))

if d.cfg.KafkaEnabled {
subring, err := d.partitionRing.PartitionRing().ShuffleShard(tenantID, d.validator.IngestionPartitionsTenantShardSize(tenantID))
if err != nil {
return nil, err
}
// We don't need to create a new context like the ingester writes, because we don't return unless all writes have succeeded.
d.sendStreamsToKafka(ctx, streams, tenantID, &tracker)
d.sendStreamsToKafka(ctx, streams, tenantID, &tracker, subring)
}

if d.cfg.IngesterEnabled {
@@ -931,10 +935,10 @@ func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.Instance
return err
}

func (d *Distributor) sendStreamsToKafka(ctx context.Context, streams []KeyedStream, tenant string, tracker *pushTracker) {
func (d *Distributor) sendStreamsToKafka(ctx context.Context, streams []KeyedStream, tenant string, tracker *pushTracker, subring *ring.PartitionRing) {
for _, s := range streams {
go func(s KeyedStream) {
err := d.sendStreamToKafka(ctx, s, tenant)
err := d.sendStreamToKafka(ctx, s, tenant, subring)
if err != nil {
err = fmt.Errorf("failed to write stream to kafka: %w", err)
}
@@ -943,11 +947,11 @@ func (d *Distributor) sendStreamsToKafka(ctx context.Context, streams []KeyedStr
}
}

func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream, tenant string) error {
func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream, tenant string, subring *ring.PartitionRing) error {
if len(stream.Stream.Entries) == 0 {
return nil
}
partitionID, err := d.partitionRing.PartitionRing().ActivePartitionForKey(stream.HashKey)
partitionID, err := subring.ActivePartitionForKey(stream.HashKey)
if err != nil {
d.kafkaAppends.WithLabelValues("kafka", "fail").Inc()
return fmt.Errorf("failed to find active partition for stream: %w", err)
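The distributor change above boils down to deriving a per-tenant subring before choosing a partition. Below is a minimal sketch of that selection logic, assuming the dskit `ring.PartitionRing` API used in the diff (`ShuffleShard`, `ActivePartitionForKey`); the helper `partitionForStream` is hypothetical and not part of the commit:

```go
package example

import (
	"fmt"

	"github.com/grafana/dskit/ring"
)

// partitionForStream mirrors the distributor logic above: shuffle-shard the
// partition ring for the tenant, then map the stream's hash key to an active
// partition within that tenant-specific subring.
func partitionForStream(partitions *ring.PartitionRing, tenantID string, shardSize int, hashKey uint32) (int32, error) {
	// A shard size of 0 disables shuffle sharding, so the subring spans all partitions.
	subring, err := partitions.ShuffleShard(tenantID, shardSize)
	if err != nil {
		return 0, fmt.Errorf("failed to shuffle shard partition ring: %w", err)
	}
	partitionID, err := subring.ActivePartitionForKey(hashKey)
	if err != nil {
		return 0, fmt.Errorf("failed to find active partition for stream: %w", err)
	}
	return partitionID, nil
}
```

With a non-zero shard size each tenant writes to a small, stable subset of partitions, which bounds the blast radius of a single noisy tenant.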
2 changes: 2 additions & 0 deletions pkg/distributor/limits.go
@@ -36,4 +36,6 @@ type Limits interface {

BlockIngestionUntil(userID string) time.Time
BlockIngestionStatusCode(userID string) int

IngestionPartitionsTenantShardSize(userID string) int
}
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
@@ -410,7 +410,7 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con

var ownedStreamsStrategy ownershipStrategy
if i.cfg.KafkaIngestion.Enabled {
ownedStreamsStrategy = newOwnedStreamsPartitionStrategy(i.ingestPartitionID, partitionRingWatcher, util_log.Logger)
ownedStreamsStrategy = newOwnedStreamsPartitionStrategy(i.ingestPartitionID, partitionRingWatcher, limits.IngestionPartitionsTenantShardSize, util_log.Logger)
} else {
ownedStreamsStrategy = newOwnedStreamsIngesterStrategy(i.lifecycler.ID, i.readRing, util_log.Logger)
}
1 change: 1 addition & 0 deletions pkg/ingester/limiter.go
@@ -31,6 +31,7 @@ type Limits interface {
MaxGlobalStreamsPerUser(userID string) int
PerStreamRateLimit(userID string) validation.RateLimit
ShardStreams(userID string) shardstreams.Config
IngestionPartitionsTenantShardSize(userID string) int
}

// Limiter implements primitives to get the maximum number of streams
17 changes: 11 additions & 6 deletions pkg/ingester/recalculate_owned_streams.go
@@ -160,11 +160,12 @@ type ownedStreamsPartitionStrategy struct {
getPartitionShardSize func(user string) int
}

func newOwnedStreamsPartitionStrategy(partitionID int32, ring ring.PartitionRingReader, logger log.Logger) *ownedStreamsPartitionStrategy {
func newOwnedStreamsPartitionStrategy(partitionID int32, ring ring.PartitionRingReader, getPartitionShardSize func(user string) int, logger log.Logger) *ownedStreamsPartitionStrategy {
return &ownedStreamsPartitionStrategy{
partitionID: partitionID,
partitionRingWatcher: ring,
logger: logger,
partitionID: partitionID,
partitionRingWatcher: ring,
logger: logger,
getPartitionShardSize: getPartitionShardSize,
}
}

@@ -174,15 +175,19 @@ func (s *ownedStreamsPartitionStrategy) checkRingForChanges() (bool, error) {
if r.PartitionsCount() == 0 {
return false, ring.ErrEmptyRing
}

// todo(ctovena): We might need to consider partition shard size changes as well.
activePartitions := r.ActivePartitionIDs()
ringChanged := !slices.Equal(s.previousActivePartitions, activePartitions)
s.previousActivePartitions = activePartitions
return ringChanged, nil
}

func (s *ownedStreamsPartitionStrategy) isOwnedStream(str *stream) (bool, error) {
partitionForStream, err := s.partitionRingWatcher.PartitionRing().ActivePartitionForKey(lokiring.TokenFor(str.tenant, str.labelsString))
subring, err := s.partitionRingWatcher.PartitionRing().ShuffleShard(str.tenant, s.getPartitionShardSize(str.tenant))
if err != nil {
return false, fmt.Errorf("failed to get shuffle shard for stream: %w", err)
}
partitionForStream, err := subring.ActivePartitionForKey(lokiring.TokenFor(str.tenant, str.labelsString))
if err != nil {
return false, fmt.Errorf("failed to find active partition for stream: %w", err)
}
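The ingester-side ownership check applies the same idea in reverse: a stream is owned only if its token resolves, inside the tenant's shuffle-sharded subring, to the partition this ingester consumes. A minimal sketch, assuming the same dskit partition-ring API; `ownsStream` and the raw `token` parameter are illustrative (the real code derives the token with `lokiring.TokenFor(tenant, labels)`):

```go
package example

import (
	"fmt"

	"github.com/grafana/dskit/ring"
)

// ownsStream mirrors ownedStreamsPartitionStrategy.isOwnedStream: build the
// tenant's partition subring, resolve the stream token to an active partition,
// and compare it with the partition this ingester is assigned to.
func ownsStream(partitions *ring.PartitionRing, myPartitionID int32, tenant string, shardSize int, token uint32) (bool, error) {
	subring, err := partitions.ShuffleShard(tenant, shardSize)
	if err != nil {
		return false, fmt.Errorf("failed to get shuffle shard for stream: %w", err)
	}
	partitionForStream, err := subring.ActivePartitionForKey(token)
	if err != nil {
		return false, fmt.Errorf("failed to find active partition for stream: %w", err)
	}
	return partitionForStream == myPartitionID, nil
}
```

Where the distributor uses the subring to decide which partition to produce to, the ingester uses it to decide whether a stream should count against its owned-streams accounting.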
9 changes: 4 additions & 5 deletions pkg/ingester/recalculate_owned_streams_test.go
@@ -108,7 +108,7 @@ func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T)

strategy := newOwnedStreamsIngesterStrategy(currentIngesterName, mockRing, log.NewNopLogger())
service := newRecalculateOwnedStreamsSvc(mockTenantsSupplier.get, strategy, 50*time.Millisecond, log.NewNopLogger())
//change the limit to assert that fixed limit is updated after the recalculation
// change the limit to assert that fixed limit is updated after the recalculation
limits.DefaultLimits().MaxGlobalStreamsPerUser = 50

service.recalculate()
@@ -120,7 +120,6 @@ func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T)
require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, testData.expectedNotOwnedStreamCount)
})
}

}

type mockStreamsOwnershipRing struct {
@@ -203,7 +202,7 @@ func Test_ownedStreamsPartitionStrategy_checkRingForChanges(t *testing.T) {
ringReader := &mockPartitionRingReader{
ring: newMockPartitionRingWithActivePartitions(1),
}
service := newOwnedStreamsPartitionStrategy(1, ringReader, log.NewNopLogger())
service := newOwnedStreamsPartitionStrategy(1, ringReader, func(string) int { return 1 }, log.NewNopLogger())

ringChanged, err := service.checkRingForChanges()
require.NoError(t, err)
@@ -226,12 +225,12 @@ func Test_ownedStreamsPartitionStrategy_isOwnedStream(t *testing.T) {
}
stream := &stream{tenant: "test1", labelsString: "mock=1"} // has a hashkey mapping to partition 1

service1 := newOwnedStreamsPartitionStrategy(1, ringReader, log.NewNopLogger())
service1 := newOwnedStreamsPartitionStrategy(1, ringReader, func(string) int { return 1 }, log.NewNopLogger())
owned, err := service1.isOwnedStream(stream)
require.NoError(t, err)
require.True(t, owned)

service2 := newOwnedStreamsPartitionStrategy(2, ringReader, log.NewNopLogger())
service2 := newOwnedStreamsPartitionStrategy(2, ringReader, func(string) int { return 1 }, log.NewNopLogger())
owned, err = service2.isOwnedStream(stream)
require.NoError(t, err)
require.False(t, owned)
8 changes: 8 additions & 0 deletions pkg/validation/limits.go
@@ -222,6 +222,8 @@ type Limits struct {

BlockIngestionUntil dskit_flagext.Time `yaml:"block_ingestion_until" json:"block_ingestion_until"`
BlockIngestionStatusCode int `yaml:"block_ingestion_status_code" json:"block_ingestion_status_code"`

IngestionPartitionsTenantShardSize int `yaml:"ingestion_partitions_tenant_shard_size" json:"ingestion_partitions_tenant_shard_size" category:"experimental"`
}

type StreamRetention struct {
@@ -412,6 +414,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {

f.Var(&l.BlockIngestionUntil, "limits.block-ingestion-until", "Block ingestion until the configured date. The time should be in RFC3339 format.")
f.IntVar(&l.BlockIngestionStatusCode, "limits.block-ingestion-status-code", defaultBlockedIngestionStatusCode, "HTTP status code to return when ingestion is blocked. If 200, the ingestion will be blocked without returning an error to the client. By Default, a custom status code (260) is returned to the client along with an error message.")

f.IntVar(&l.IngestionPartitionsTenantShardSize, "limits.ingestion-partition-tenant-shard-size", 0, "The number of partitions a tenant's data should be sharded to when using kafka ingestion. Tenants are sharded across partitions using shuffle-sharding. 0 disables shuffle sharding and tenant is sharded across all partitions.")
}

// SetGlobalOTLPConfig set GlobalOTLPConfig which is used while unmarshaling per-tenant otlp config to use the default list of resource attributes picked as index labels.
@@ -778,6 +782,10 @@ func (o *Overrides) RulerTenantShardSize(userID string) int {
return o.getOverridesForUser(userID).RulerTenantShardSize
}

func (o *Overrides) IngestionPartitionsTenantShardSize(userID string) int {
return o.getOverridesForUser(userID).IngestionPartitionsTenantShardSize
}

// RulerMaxRulesPerRuleGroup returns the maximum number of rules per rule group for a given user.
func (o *Overrides) RulerMaxRulesPerRuleGroup(userID string) int {
return o.getOverridesForUser(userID).RulerMaxRulesPerRuleGroup
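Because the limit is exposed through the Overrides type, it can also be tuned per tenant at runtime. A sketch, assuming Loki's usual runtime-overrides file format (an `overrides:` map keyed by tenant ID); tenant names and values are illustrative:

```yaml
overrides:
  "tenant-a":
    ingestion_partitions_tenant_shard_size: 2
  "tenant-b":
    ingestion_partitions_tenant_shard_size: 8
```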
