
[KafkaIO] Fix per-split metric updates for KafkaUnboundedReader and ReadFromKafkaDoFn (#32921)

* Revert "Set backlog in gauge metric (#31137)"

* Revert "Add Backlog Metrics to  Kafka Splittable DoFn Implementation (#31281)"

This reverts commit fd4368f.

* Call reportBacklog in nextBatch to report split metrics more often

* Report SDF metrics for the active split/partition after processing a record batch

* Use KafkaSourceDescriptor as cache key and log entry
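
The last point above is the core of the SDF-side change: offset estimators are now cached per KafkaSourceDescriptor (which also carries any bootstrap-server override) rather than per TopicPartition. The standalone sketch below condenses the caching pattern visible in the ReadFromKafkaDoFn hunks further down; it uses plain Guava rather than Beam's vendored copy, and SourceDescriptor / LatestOffsetEstimator are simplified stand-ins for KafkaSourceDescriptor and KafkaLatestOffsetEstimator, with a placeholder loader body.

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.concurrent.TimeUnit;

public class OffsetEstimatorCacheSketch {

  // Simplified stand-in for Beam's KafkaSourceDescriptor. A record (Java 16+)
  // gives value-based equals/hashCode, which a cache key needs.
  record SourceDescriptor(String topic, int partition, String bootstrapServers) {}

  // Simplified stand-in for KafkaLatestOffsetEstimator, which wraps a dedicated
  // offset consumer and memoizes the partition's latest end offset.
  static class LatestOffsetEstimator {
    long estimate() {
      return 0L; // placeholder; the real estimator queries Kafka end offsets
    }
  }

  public static void main(String[] args) throws Exception {
    LoadingCache<SourceDescriptor, LatestOffsetEstimator> offsetEstimatorCache =
        CacheBuilder.newBuilder()
            .weakValues() // estimators become collectible once no tracker holds them
            .expireAfterAccess(1, TimeUnit.MINUTES) // drop estimators for idle splits
            .build(
                new CacheLoader<SourceDescriptor, LatestOffsetEstimator>() {
                  @Override
                  public LatestOffsetEstimator load(SourceDescriptor descriptor) {
                    // The real loader builds an offset consumer for the
                    // descriptor's topic-partition before wrapping it.
                    return new LatestOffsetEstimator();
                  }
                });

    SourceDescriptor split = new SourceDescriptor("events", 0, "broker-a:9092");
    System.out.println("estimated end offset: " + offsetEstimatorCache.get(split).estimate());
  }
}
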
sjvanrossum authored Nov 13, 2024
1 parent f25ac69 commit b62e8c4
Showing 2 changed files with 79 additions and 78 deletions.
KafkaUnboundedReader.java
@@ -23,7 +23,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -227,10 +226,6 @@ public boolean advance() throws IOException {
METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + pState.topicPartition.toString());
rawSizes.update(recordSize);

for (Map.Entry<String, Long> backlogSplit : perPartitionBacklogMetrics.entrySet()) {
backlogBytesOfSplit.set(backlogSplit.getValue());
}

// Pass metrics to container.
kafkaResults.updateKafkaMetrics();
return true;
@@ -349,7 +344,6 @@ public long getSplitBacklogBytes() {
private final Counter bytesReadBySplit;
private final Gauge backlogBytesOfSplit;
private final Gauge backlogElementsOfSplit;
private HashMap<String, Long> perPartitionBacklogMetrics = new HashMap<String, Long>();;
private final Counter checkpointMarkCommitsEnqueued =
Metrics.counter(METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC);
// Checkpoint marks skipped in favor of newer mark (only the latest needs to be committed).
@@ -506,10 +500,6 @@ Instant updateAndGetWatermark() {
lastWatermark = timestampPolicy.getWatermark(mkTimestampPolicyContext());
return lastWatermark;
}

String name() {
return this.topicPartition.toString();
}
}

KafkaUnboundedReader(
@@ -554,16 +544,14 @@ String name() {
prevWatermark = Optional.of(new Instant(ckptMark.getWatermarkMillis()));
}

PartitionState<K, V> state =
new PartitionState<K, V>(
states.add(
new PartitionState<>(
tp,
nextOffset,
source
.getSpec()
.getTimestampPolicyFactory()
.createTimestampPolicy(tp, prevWatermark));
states.add(state);
perPartitionBacklogMetrics.put(state.name(), 0L);
.createTimestampPolicy(tp, prevWatermark)));
}

partitionStates = ImmutableList.copyOf(states);
@@ -680,6 +668,8 @@ private void nextBatch() throws IOException {

partitionStates.forEach(p -> p.recordIter = records.records(p.topicPartition).iterator());

reportBacklog();

// cycle through the partitions in order to interleave records from each.
curBatch = Iterators.cycle(new ArrayList<>(partitionStates));
}
@@ -758,7 +748,6 @@ private long getSplitBacklogMessageCount() {
if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
return UnboundedReader.BACKLOG_UNKNOWN;
}
perPartitionBacklogMetrics.put(p.name(), pBacklog);
backlogCount += pBacklog;
}

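The reportBacklog() call added to nextBatch() above is what keeps the split-level gauges fresh; the helper's body sits outside the diff context shown here. A hypothetical sketch of what such a helper plausibly does, using the gauge fields and backlog accessors named in the hunks above (the namespace constant, metric names, and accessor bodies are placeholders):

import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Metrics;

class BacklogReportingSketch {
  private static final String METRIC_NAMESPACE = "KafkaIOReader"; // placeholder namespace

  // Placeholder metric names; the reader's real gauges are backlogBytesOfSplit
  // and backlogElementsOfSplit.
  private final Gauge backlogBytesOfSplit = Metrics.gauge(METRIC_NAMESPACE, "backlogBytesOfSplit");
  private final Gauge backlogElementsOfSplit = Metrics.gauge(METRIC_NAMESPACE, "backlogElementsOfSplit");

  // Publishes the current split backlog every time a batch is fetched, so the
  // gauges always describe the split this reader actually owns.
  private void reportBacklog() {
    long splitBacklogBytes = getSplitBacklogBytes();
    if (splitBacklogBytes < 0) {
      splitBacklogBytes = UnboundedReader.BACKLOG_UNKNOWN;
    }
    backlogBytesOfSplit.set(splitBacklogBytes);

    long splitBacklogMessages = getSplitBacklogMessageCount();
    if (splitBacklogMessages < 0) {
      splitBacklogMessages = UnboundedReader.BACKLOG_UNKNOWN;
    }
    backlogElementsOfSplit.set(splitBacklogMessages);
  }

  // Placeholders for the reader's real per-split backlog accessors.
  private long getSplitBacklogBytes() { return UnboundedReader.BACKLOG_UNKNOWN; }
  private long getSplitBacklogMessageCount() { return UnboundedReader.BACKLOG_UNKNOWN; }
}
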
ReadFromKafkaDoFn.java
@@ -19,11 +19,14 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.math.BigDecimal;
import java.math.MathContext;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
@@ -222,13 +225,12 @@ private ReadFromKafkaDoFn(
// Valid between bundle start and bundle finish.
private transient @Nullable Deserializer<K> keyDeserializerInstance = null;
private transient @Nullable Deserializer<V> valueDeserializerInstance = null;
private transient @Nullable Map<TopicPartition, KafkaLatestOffsetEstimator> offsetEstimatorCache;
private transient @Nullable LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
offsetEstimatorCache;

private transient @Nullable LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize;
private transient @Nullable LoadingCache<KafkaSourceDescriptor, AverageRecordSize>
avgRecordSizeCache;
private static final long DEFAULT_KAFKA_POLL_TIMEOUT = 2L;

private HashMap<String, Long> perPartitionBacklogMetrics = new HashMap<String, Long>();;

@VisibleForTesting final long consumerPollingTimeout;
@VisibleForTesting final DeserializerProvider<K> keyDeserializerProvider;
@VisibleForTesting final DeserializerProvider<V> valueDeserializerProvider;
@@ -290,7 +292,7 @@ public OffsetRange initialRestriction(@Element KafkaSourceDescriptor kafkaSource
Map<String, Object> updatedConsumerConfig =
overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
TopicPartition partition = kafkaSourceDescriptor.getTopicPartition();
LOG.info("Creating Kafka consumer for initial restriction for {}", partition);
LOG.info("Creating Kafka consumer for initial restriction for {}", kafkaSourceDescriptor);
try (Consumer<byte[], byte[]> offsetConsumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
ConsumerSpEL.evaluateAssign(offsetConsumer, ImmutableList.of(partition));
long startOffset;
@@ -337,63 +339,42 @@ public WatermarkEstimator<Instant> newWatermarkEstimator(
@GetSize
public double getSize(
@Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange offsetRange)
throws Exception {
throws ExecutionException {
// If present, estimates the record size to offset gap ratio. Compacted topics may hold less
// records than the estimated offset range due to record deletion within a partition.
final LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize =
Preconditions.checkStateNotNull(this.avgRecordSize);
final LoadingCache<KafkaSourceDescriptor, AverageRecordSize> avgRecordSizeCache =
Preconditions.checkStateNotNull(this.avgRecordSizeCache);
final @Nullable AverageRecordSize avgRecordSize =
avgRecordSizeCache.getIfPresent(kafkaSourceDescriptor);
// The tracker estimates the offset range by subtracting the last claimed position from the
// currently observed end offset for the partition belonging to this split.
double estimatedOffsetRange =
restrictionTracker(kafkaSourceDescriptor, offsetRange).getProgress().getWorkRemaining();
// Before processing elements, we don't have a good estimated size of records and offset gap.
// Return the estimated offset range without scaling by a size to gap ratio.
if (!avgRecordSize.asMap().containsKey(kafkaSourceDescriptor.getTopicPartition())) {
if (avgRecordSize == null) {
return estimatedOffsetRange;
}
if (offsetEstimatorCache != null) {
for (Map.Entry<TopicPartition, KafkaLatestOffsetEstimator> tp :
offsetEstimatorCache.entrySet()) {
perPartitionBacklogMetrics.put(tp.getKey().toString(), tp.getValue().estimate());
}
}

// When processing elements, a moving average estimates the size of records and offset gap.
// Return the estimated offset range scaled by the estimated size to gap ratio.
return estimatedOffsetRange
* avgRecordSize
.get(kafkaSourceDescriptor.getTopicPartition())
.estimateRecordByteSizeToOffsetCountRatio();
return estimatedOffsetRange * avgRecordSize.estimateRecordByteSizeToOffsetCountRatio();
}

@NewTracker
public OffsetRangeTracker restrictionTracker(
@Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange restriction) {
@Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange restriction)
throws ExecutionException {
if (restriction.getTo() < Long.MAX_VALUE) {
return new OffsetRangeTracker(restriction);
}

// OffsetEstimators are cached for each topic-partition because they hold a stateful connection,
// so we want to minimize the amount of connections that we start and track with Kafka. Another
// point is that it has a memoized backlog, and this should make that more reusable estimations.
final Map<TopicPartition, KafkaLatestOffsetEstimator> offsetEstimatorCacheInstance =
final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator> offsetEstimatorCache =
Preconditions.checkStateNotNull(this.offsetEstimatorCache);

TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
KafkaLatestOffsetEstimator offsetEstimator = offsetEstimatorCacheInstance.get(topicPartition);
if (offsetEstimator == null) {
Map<String, Object> updatedConsumerConfig =
overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);

LOG.info("Creating Kafka consumer for offset estimation for {}", topicPartition);

Consumer<byte[], byte[]> offsetConsumer =
consumerFactoryFn.apply(
KafkaIOUtils.getOffsetConsumerConfig(
"tracker-" + topicPartition, offsetConsumerConfig, updatedConsumerConfig));
offsetEstimator = new KafkaLatestOffsetEstimator(offsetConsumer, topicPartition);
offsetEstimatorCacheInstance.put(topicPartition, offsetEstimator);
}
final KafkaLatestOffsetEstimator offsetEstimator =
offsetEstimatorCache.get(kafkaSourceDescriptor);

return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetEstimator);
}
@@ -405,22 +386,22 @@ public ProcessContinuation processElement(
WatermarkEstimator<Instant> watermarkEstimator,
MultiOutputReceiver receiver)
throws Exception {
final LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize =
Preconditions.checkStateNotNull(this.avgRecordSize);
final LoadingCache<KafkaSourceDescriptor, AverageRecordSize> avgRecordSizeCache =
Preconditions.checkStateNotNull(this.avgRecordSizeCache);
final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator> offsetEstimatorCache =
Preconditions.checkStateNotNull(this.offsetEstimatorCache);
final Deserializer<K> keyDeserializerInstance =
Preconditions.checkStateNotNull(this.keyDeserializerInstance);
final Deserializer<V> valueDeserializerInstance =
Preconditions.checkStateNotNull(this.valueDeserializerInstance);
final TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
final AverageRecordSize avgRecordSize = avgRecordSizeCache.get(kafkaSourceDescriptor);
// TODO: Metrics should be reported per split instead of partition, add bootstrap server hash?
final Distribution rawSizes =
Metrics.distribution(
METRIC_NAMESPACE,
RAW_SIZE_METRIC_PREFIX + kafkaSourceDescriptor.getTopicPartition().toString());
for (Map.Entry<String, Long> backlogSplit : perPartitionBacklogMetrics.entrySet()) {
Gauge backlog =
Metrics.gauge(
METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + "backlogBytes_" + backlogSplit.getKey());
backlog.set(backlogSplit.getValue());
}
Metrics.distribution(METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + topicPartition.toString());
final Gauge backlogBytes =
Metrics.gauge(
METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + "backlogBytes_" + topicPartition.toString());

// Stop processing current TopicPartition when it's time to stop.
if (checkStopReadingFn != null
Expand All @@ -438,13 +419,10 @@ public ProcessContinuation processElement(
if (timestampPolicyFactory != null) {
timestampPolicy =
timestampPolicyFactory.createTimestampPolicy(
kafkaSourceDescriptor.getTopicPartition(),
Optional.ofNullable(watermarkEstimator.currentWatermark()));
topicPartition, Optional.ofNullable(watermarkEstimator.currentWatermark()));
}

LOG.info(
"Creating Kafka consumer for process continuation for {}",
kafkaSourceDescriptor.getTopicPartition());
LOG.info("Creating Kafka consumer for process continuation for {}", kafkaSourceDescriptor);
try (Consumer<byte[], byte[]> consumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
ConsumerSpEL.evaluateAssign(
consumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
@@ -518,8 +496,8 @@ public ProcessContinuation processElement(
int recordSize =
(rawRecord.key() == null ? 0 : rawRecord.key().length)
+ (rawRecord.value() == null ? 0 : rawRecord.value().length);
avgRecordSize
.getUnchecked(kafkaSourceDescriptor.getTopicPartition())
avgRecordSizeCache
.getUnchecked(kafkaSourceDescriptor)
.update(recordSize, rawRecord.offset() - expectedOffset);
rawSizes.update(recordSize);
expectedOffset = rawRecord.offset() + 1;
@@ -551,6 +529,15 @@ public ProcessContinuation processElement(
}
}
}

backlogBytes.set(
(long)
(BigDecimal.valueOf(
Preconditions.checkStateNotNull(
offsetEstimatorCache.get(kafkaSourceDescriptor).estimate()))
.subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
.doubleValue()
* avgRecordSize.estimateRecordByteSizeToOffsetCountRatio()));
}
}
}
@@ -611,19 +598,44 @@ public Coder<OffsetRange> restrictionCoder() {
@Setup
public void setup() throws Exception {
// Start to track record size and offset gap per bundle.
avgRecordSize =
avgRecordSizeCache =
CacheBuilder.newBuilder()
.maximumSize(1000L)
.build(
new CacheLoader<TopicPartition, AverageRecordSize>() {
new CacheLoader<KafkaSourceDescriptor, AverageRecordSize>() {
@Override
public AverageRecordSize load(TopicPartition topicPartition) throws Exception {
public AverageRecordSize load(KafkaSourceDescriptor kafkaSourceDescriptor)
throws Exception {
return new AverageRecordSize();
}
});
keyDeserializerInstance = keyDeserializerProvider.getDeserializer(consumerConfig, true);
valueDeserializerInstance = valueDeserializerProvider.getDeserializer(consumerConfig, false);
offsetEstimatorCache = new HashMap<>();
offsetEstimatorCache =
CacheBuilder.newBuilder()
.weakValues()
.expireAfterAccess(1, TimeUnit.MINUTES)
.build(
new CacheLoader<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>() {
@Override
public KafkaLatestOffsetEstimator load(
KafkaSourceDescriptor kafkaSourceDescriptor) throws Exception {
LOG.info(
"Creating Kafka consumer for offset estimation for {}",
kafkaSourceDescriptor);

TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
Map<String, Object> updatedConsumerConfig =
overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
Consumer<byte[], byte[]> offsetConsumer =
consumerFactoryFn.apply(
KafkaIOUtils.getOffsetConsumerConfig(
"tracker-" + topicPartition,
offsetConsumerConfig,
updatedConsumerConfig));
return new KafkaLatestOffsetEstimator(offsetConsumer, topicPartition);
}
});
if (checkStopReadingFn != null) {
checkStopReadingFn.setup();
}
@@ -645,7 +657,7 @@ public void teardown() throws Exception {
}

if (offsetEstimatorCache != null) {
offsetEstimatorCache.clear();
offsetEstimatorCache.invalidateAll();
}
if (checkStopReadingFn != null) {
checkStopReadingFn.teardown();
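
For reference, the gauge update added at the end of processElement works out to: backlog bytes = (estimated end offset - next expected offset) * average bytes per offset. The standalone sketch below restates that arithmetic; the parameter names are stand-ins for KafkaLatestOffsetEstimator.estimate(), the offset the reader expects to process next, and AverageRecordSize.estimateRecordByteSizeToOffsetCountRatio().

import java.math.BigDecimal;
import java.math.MathContext;

public class BacklogBytesSketch {

  // Backlog in bytes is the remaining offsets in the split scaled by the
  // observed bytes-per-offset ratio. BigDecimal keeps the offset subtraction
  // exact even when the two long values are far apart.
  static long backlogBytes(long estimatedEndOffset, long nextExpectedOffset, double avgBytesPerOffset) {
    return (long)
        (BigDecimal.valueOf(estimatedEndOffset)
                .subtract(BigDecimal.valueOf(nextExpectedOffset), MathContext.DECIMAL128)
                .doubleValue()
            * avgBytesPerOffset);
  }

  public static void main(String[] args) {
    // Example: end offset 1_000_000, reader has processed up to 999_000,
    // records average ~250 bytes per offset -> 250_000 bytes of backlog.
    System.out.println(backlogBytes(1_000_000L, 999_000L, 250.0));
  }
}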
