Skip to content

Commit

Permalink
Use KafkaSourceDescriptor as cache key and log entry
Browse files Browse the repository at this point in the history
  • Loading branch information
sjvanrossum committed Nov 12, 2024
1 parent 3f01ea0 commit 4df08e6
Showing 1 changed file with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ private ReadFromKafkaDoFn(
private transient @Nullable LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
offsetEstimatorCache;

private transient @Nullable LoadingCache<TopicPartition, AverageRecordSize> avgRecordSizeCache;
private transient @Nullable LoadingCache<KafkaSourceDescriptor, AverageRecordSize>
avgRecordSizeCache;
private static final long DEFAULT_KAFKA_POLL_TIMEOUT = 2L;
@VisibleForTesting final long consumerPollingTimeout;
@VisibleForTesting final DeserializerProvider<K> keyDeserializerProvider;
Expand Down Expand Up @@ -293,7 +294,7 @@ public OffsetRange initialRestriction(@Element KafkaSourceDescriptor kafkaSource
Map<String, Object> updatedConsumerConfig =
overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
TopicPartition partition = kafkaSourceDescriptor.getTopicPartition();
LOG.info("Creating Kafka consumer for initial restriction for {}", partition);
LOG.info("Creating Kafka consumer for initial restriction for {}", kafkaSourceDescriptor);
try (Consumer<byte[], byte[]> offsetConsumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
ConsumerSpEL.evaluateAssign(offsetConsumer, ImmutableList.of(partition));
long startOffset;
Expand Down Expand Up @@ -343,10 +344,10 @@ public double getSize(
throws ExecutionException {
// If present, estimates the record size to offset gap ratio. Compacted topics may hold less
// records than the estimated offset range due to record deletion within a partition.
final LoadingCache<TopicPartition, AverageRecordSize> avgRecordSizeCache =
final LoadingCache<KafkaSourceDescriptor, AverageRecordSize> avgRecordSizeCache =
Preconditions.checkStateNotNull(this.avgRecordSizeCache);
final @Nullable AverageRecordSize avgRecordSize =
avgRecordSizeCache.getIfPresent(kafkaSourceDescriptor.getTopicPartition());
avgRecordSizeCache.getIfPresent(kafkaSourceDescriptor);
// The tracker estimates the offset range by subtracting the last claimed position from the
// currently observed end offset for the partition belonging to this split.
double estimatedOffsetRange =
Expand Down Expand Up @@ -403,7 +404,7 @@ public ProcessContinuation processElement(
WatermarkEstimator<Instant> watermarkEstimator,
MultiOutputReceiver receiver)
throws Exception {
final LoadingCache<TopicPartition, AverageRecordSize> avgRecordSizeCache =
final LoadingCache<KafkaSourceDescriptor, AverageRecordSize> avgRecordSizeCache =
Preconditions.checkStateNotNull(this.avgRecordSizeCache);
final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator> offsetEstimatorCache =
Preconditions.checkStateNotNull(this.offsetEstimatorCache);
Expand All @@ -412,7 +413,7 @@ public ProcessContinuation processElement(
final Deserializer<V> valueDeserializerInstance =
Preconditions.checkStateNotNull(this.valueDeserializerInstance);
final TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
final AverageRecordSize avgRecordSize = avgRecordSizeCache.get(topicPartition);
final AverageRecordSize avgRecordSize = avgRecordSizeCache.get(kafkaSourceDescriptor);
// TODO: Metrics should be reported per split instead of partition, add bootstrap server hash?
final Distribution rawSizes =
Metrics.distribution(METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + topicPartition.toString());
Expand All @@ -439,9 +440,7 @@ public ProcessContinuation processElement(
topicPartition, Optional.ofNullable(watermarkEstimator.currentWatermark()));
}

LOG.info(
"Creating Kafka consumer for process continuation for {}",
kafkaSourceDescriptor.getTopicPartition());
LOG.info("Creating Kafka consumer for process continuation for {}", kafkaSourceDescriptor);
try (Consumer<byte[], byte[]> consumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
ConsumerSpEL.evaluateAssign(
consumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
Expand Down Expand Up @@ -515,7 +514,7 @@ public ProcessContinuation processElement(
(rawRecord.key() == null ? 0 : rawRecord.key().length)
+ (rawRecord.value() == null ? 0 : rawRecord.value().length);
avgRecordSizeCache
.getUnchecked(kafkaSourceDescriptor.getTopicPartition())
.getUnchecked(kafkaSourceDescriptor)
.update(recordSize, rawRecord.offset() - expectedOffset);
rawSizes.update(recordSize);
expectedOffset = rawRecord.offset() + 1;
Expand Down Expand Up @@ -630,9 +629,10 @@ public void setup() throws Exception {
CacheBuilder.newBuilder()
.maximumSize(1000L)
.build(
new CacheLoader<TopicPartition, AverageRecordSize>() {
new CacheLoader<KafkaSourceDescriptor, AverageRecordSize>() {
@Override
public AverageRecordSize load(TopicPartition topicPartition) throws Exception {
public AverageRecordSize load(KafkaSourceDescriptor kafkaSourceDescriptor)
throws Exception {
return new AverageRecordSize();
}
});
Expand Down

0 comments on commit 4df08e6

Please sign in to comment.