From 4df08e695ac426b9b01ba65d80838bd961a170f9 Mon Sep 17 00:00:00 2001
From: Steven van Rossum
Date: Thu, 24 Oct 2024 16:56:12 +0200
Subject: [PATCH] Use KafkaSourceDescriptor as cache key and log entry

---
 .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 24 +++++++++----------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index e0e9062d4e0..eed04d485b6 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -230,7 +230,8 @@ private ReadFromKafkaDoFn(
   private transient @Nullable LoadingCache<TopicPartition, KafkaLatestOffsetEstimator>
       offsetEstimatorCache;
 
-  private transient @Nullable LoadingCache<TopicPartition, AverageRecordSize> avgRecordSizeCache;
+  private transient @Nullable LoadingCache<KafkaSourceDescriptor, AverageRecordSize>
+      avgRecordSizeCache;
   private static final long DEFAULT_KAFKA_POLL_TIMEOUT = 2L;
   @VisibleForTesting final long consumerPollingTimeout;
   @VisibleForTesting final DeserializerProvider<K> keyDeserializerProvider;
@@ -293,7 +294,7 @@ public OffsetRange initialRestriction(@Element KafkaSourceDescriptor kafkaSource
     Map<String, Object> updatedConsumerConfig =
         overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
     TopicPartition partition = kafkaSourceDescriptor.getTopicPartition();
-    LOG.info("Creating Kafka consumer for initial restriction for {}", partition);
+    LOG.info("Creating Kafka consumer for initial restriction for {}", kafkaSourceDescriptor);
     try (Consumer<byte[], byte[]> offsetConsumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
       ConsumerSpEL.evaluateAssign(offsetConsumer, ImmutableList.of(partition));
       long startOffset;
@@ -343,10 +344,10 @@ public double getSize(
       throws ExecutionException {
     // If present, estimates the record size to offset gap ratio. Compacted topics may hold less
     // records than the estimated offset range due to record deletion within a partition.
-    final LoadingCache<TopicPartition, AverageRecordSize> avgRecordSizeCache =
+    final LoadingCache<KafkaSourceDescriptor, AverageRecordSize> avgRecordSizeCache =
         Preconditions.checkStateNotNull(this.avgRecordSizeCache);
     final @Nullable AverageRecordSize avgRecordSize =
-        avgRecordSizeCache.getIfPresent(kafkaSourceDescriptor.getTopicPartition());
+        avgRecordSizeCache.getIfPresent(kafkaSourceDescriptor);
     // The tracker estimates the offset range by subtracting the last claimed position from the
     // currently observed end offset for the partition belonging to this split.
     double estimatedOffsetRange =
@@ -403,7 +404,7 @@ public ProcessContinuation processElement(
       WatermarkEstimator<Instant> watermarkEstimator,
       MultiOutputReceiver receiver)
       throws Exception {
-    final LoadingCache<TopicPartition, AverageRecordSize> avgRecordSizeCache =
+    final LoadingCache<KafkaSourceDescriptor, AverageRecordSize> avgRecordSizeCache =
         Preconditions.checkStateNotNull(this.avgRecordSizeCache);
     final LoadingCache<TopicPartition, KafkaLatestOffsetEstimator> offsetEstimatorCache =
         Preconditions.checkStateNotNull(this.offsetEstimatorCache);
@@ -412,7 +413,7 @@ public ProcessContinuation processElement(
     final Deserializer<V> valueDeserializerInstance =
         Preconditions.checkStateNotNull(this.valueDeserializerInstance);
     final TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
-    final AverageRecordSize avgRecordSize = avgRecordSizeCache.get(topicPartition);
+    final AverageRecordSize avgRecordSize = avgRecordSizeCache.get(kafkaSourceDescriptor);
     // TODO: Metrics should be reported per split instead of partition, add bootstrap server hash?
     final Distribution rawSizes =
         Metrics.distribution(METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + topicPartition.toString());
@@ -439,9 +440,7 @@ public ProcessContinuation processElement(
               topicPartition, Optional.ofNullable(watermarkEstimator.currentWatermark()));
     }
 
-    LOG.info(
-        "Creating Kafka consumer for process continuation for {}",
-        kafkaSourceDescriptor.getTopicPartition());
+    LOG.info("Creating Kafka consumer for process continuation for {}", kafkaSourceDescriptor);
     try (Consumer<byte[], byte[]> consumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
       ConsumerSpEL.evaluateAssign(
           consumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
@@ -515,7 +514,7 @@ public ProcessContinuation processElement(
                 (rawRecord.key() == null ? 0 : rawRecord.key().length)
                     + (rawRecord.value() == null ? 0 : rawRecord.value().length);
             avgRecordSizeCache
-                .getUnchecked(kafkaSourceDescriptor.getTopicPartition())
+                .getUnchecked(kafkaSourceDescriptor)
                 .update(recordSize, rawRecord.offset() - expectedOffset);
             rawSizes.update(recordSize);
             expectedOffset = rawRecord.offset() + 1;
@@ -630,9 +629,10 @@ public void setup() throws Exception {
         CacheBuilder.newBuilder()
             .maximumSize(1000L)
             .build(
-                new CacheLoader<TopicPartition, AverageRecordSize>() {
+                new CacheLoader<KafkaSourceDescriptor, AverageRecordSize>() {
                   @Override
-                  public AverageRecordSize load(TopicPartition topicPartition) throws Exception {
+                  public AverageRecordSize load(KafkaSourceDescriptor kafkaSourceDescriptor)
+                      throws Exception {
                     return new AverageRecordSize();
                   }
                 });
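
A note on why swapping the cache key is safe: Guava's LoadingCache looks entries up by equals()/hashCode(), and Beam's KafkaSourceDescriptor is a value type (an AutoValue), so two descriptor instances with the same topic, partition, and bootstrap servers resolve to the same entry. Keying by the full descriptor rather than by TopicPartition alone also keeps splits apart when two clusters host an identically named topic-partition, the collision hinted at by the patch's TODO ("add bootstrap server hash?"). The sketch below is a minimal illustration, not Beam code: SourceDescriptor is a hypothetical stand-in for KafkaSourceDescriptor, and a long[] accumulator stands in for AverageRecordSize.

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.Objects;

public class DescriptorCacheKeySketch {

  // Hypothetical stand-in for KafkaSourceDescriptor: a value type whose
  // equals()/hashCode() cover every field that identifies a source.
  static final class SourceDescriptor {
    final String topic;
    final int partition;
    final String bootstrapServers;

    SourceDescriptor(String topic, int partition, String bootstrapServers) {
      this.topic = topic;
      this.partition = partition;
      this.bootstrapServers = bootstrapServers;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof SourceDescriptor)) {
        return false;
      }
      SourceDescriptor that = (SourceDescriptor) o;
      return partition == that.partition
          && topic.equals(that.topic)
          && bootstrapServers.equals(that.bootstrapServers);
    }

    @Override
    public int hashCode() {
      return Objects.hash(topic, partition, bootstrapServers);
    }
  }

  public static void main(String[] args) {
    // Same shape as the patched cache: bounded, loading a fresh accumulator per key.
    LoadingCache<SourceDescriptor, long[]> avgRecordSizeCache =
        CacheBuilder.newBuilder()
            .maximumSize(1000L)
            .build(
                new CacheLoader<SourceDescriptor, long[]>() {
                  @Override
                  public long[] load(SourceDescriptor descriptor) {
                    return new long[] {0L};
                  }
                });

    // Same topic-partition on two clusters: distinct keys, so no collision.
    SourceDescriptor a = new SourceDescriptor("topic", 0, "cluster-a:9092");
    SourceDescriptor b = new SourceDescriptor("topic", 0, "cluster-b:9092");
    avgRecordSizeCache.getUnchecked(a)[0] += 100;
    System.out.println(avgRecordSizeCache.getUnchecked(b)[0]); // prints 0

    // An equal (but not identical) descriptor instance hits the existing entry.
    SourceDescriptor aCopy = new SourceDescriptor("topic", 0, "cluster-a:9092");
    System.out.println(avgRecordSizeCache.getUnchecked(aCopy)[0]); // prints 100
  }
}

Keyed by TopicPartition, a and b above would share one accumulator and skew the size estimate for both splits; the descriptor key trades a slightly larger key per entry for per-split accuracy.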