Commit 22a6bd0

Share AvgRecordSize and KafkaLatestOffsetEstimator caches among DoFns
sjvanrossum committed Nov 12, 2024
1 parent 180f70c commit 22a6bd0
Showing 1 changed file with 75 additions and 38 deletions.
@@ -28,8 +28,10 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.sdk.coders.Coder;
@@ -64,6 +66,7 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closeables;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -209,6 +212,23 @@ private ReadFromKafkaDoFn(
 
   private static final Logger LOG = LoggerFactory.getLogger(ReadFromKafkaDoFn.class);
 
+  /**
+   * A holder class for all construction time unique instances of {@link ReadFromKafkaDoFn}. Caches
+   * must run clean up tasks when {@link #teardown()} is called.
+   */
+  private static final class SharedStateHolder {
+
+    private static final Map<Long, LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>>
+        OFFSET_ESTIMATOR_CACHE = new ConcurrentHashMap<>();
+    private static final Map<Long, LoadingCache<KafkaSourceDescriptor, AverageRecordSize>>
+        AVG_RECORD_SIZE_CACHE = new ConcurrentHashMap<>();
+  }
+
+  private static final AtomicLong FN_ID = new AtomicLong();
+
+  // A unique identifier for the instance. Generally unique unless the ID generator overflows.
+  private final long fnId = FN_ID.getAndIncrement();
+
   private final @Nullable Map<String, Object> offsetConsumerConfig;
 
   private final @Nullable CheckStopReadingFn checkStopReadingFn;
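
Reviewer note on the hunk above (not part of the commit): Beam serializes a DoFn at pipeline construction time and deserializes it into many copies on workers, so plain instance fields cannot be shared between those copies. Keying a static ConcurrentHashMap by an ID drawn once at construction lets every runtime copy of the same logical DoFn resolve to a single shared cache. A minimal standalone sketch of the pattern under those assumptions; SharingFn and the String-keyed inner map are illustrative stand-ins, not Beam API:

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch of the SharedStateHolder pattern; names are illustrative.
class SharingFn implements Serializable {

  // Static, so it lives once per JVM rather than once per instance: every
  // deserialized copy of a given construction-time instance sees the same map.
  private static final class SharedState {
    private static final Map<Long, Map<String, Object>> CACHES = new ConcurrentHashMap<>();
  }

  private static final AtomicLong FN_ID = new AtomicLong();

  // Drawn once at construction; Java serialization preserves the field value,
  // so all runtime copies of this instance carry the same id.
  private final long fnId = FN_ID.getAndIncrement();

  private transient Map<String, Object> cache;

  // Analogous to @Setup: the first copy to arrive creates the shared cache,
  // later copies of the same construction-time instance reuse it.
  public void setup() {
    cache = SharedState.CACHES.computeIfAbsent(fnId, k -> new ConcurrentHashMap<>());
  }
}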
@@ -610,50 +630,67 @@ public Coder<OffsetRange> restrictionCoder() {
   public void setup() throws Exception {
     // Start to track record size and offset gap per bundle.
     avgRecordSizeCache =
-        CacheBuilder.newBuilder()
-            .maximumSize(1000L)
-            .build(
-                new CacheLoader<KafkaSourceDescriptor, AverageRecordSize>() {
-                  @Override
-                  public AverageRecordSize load(KafkaSourceDescriptor kafkaSourceDescriptor)
-                      throws Exception {
-                    return new AverageRecordSize();
-                  }
-                });
+        SharedStateHolder.AVG_RECORD_SIZE_CACHE.computeIfAbsent(
+            fnId,
+            k -> {
+              return CacheBuilder.newBuilder()
+                  .maximumSize(1000L)
+                  .build(
+                      new CacheLoader<KafkaSourceDescriptor, AverageRecordSize>() {
+                        @Override
+                        public AverageRecordSize load(KafkaSourceDescriptor kafkaSourceDescriptor)
+                            throws Exception {
+                          return new AverageRecordSize();
+                        }
+                      });
+            });
     keyDeserializerInstance = keyDeserializerProvider.getDeserializer(consumerConfig, true);
     valueDeserializerInstance = valueDeserializerProvider.getDeserializer(consumerConfig, false);
     offsetEstimatorCache =
-        CacheBuilder.newBuilder()
-            .weakValues()
-            .expireAfterAccess(1, TimeUnit.MINUTES)
-            .build(
-                new CacheLoader<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>() {
-                  @Override
-                  public KafkaLatestOffsetEstimator load(
-                      KafkaSourceDescriptor kafkaSourceDescriptor) throws Exception {
-                    LOG.info(
-                        "Creating Kafka consumer for offset estimation for {}",
-                        kafkaSourceDescriptor);
-
-                    TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
-                    Map<String, Object> updatedConsumerConfig =
-                        overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
-                    Consumer<byte[], byte[]> offsetConsumer =
-                        consumerFactoryFn.apply(
-                            KafkaIOUtils.getOffsetConsumerConfig(
-                                "tracker-" + topicPartition,
-                                offsetConsumerConfig,
-                                updatedConsumerConfig));
-                    return new KafkaLatestOffsetEstimator(offsetConsumer, topicPartition);
-                  }
-                });
+        SharedStateHolder.OFFSET_ESTIMATOR_CACHE.computeIfAbsent(
+            fnId,
+            k -> {
+              final Map<String, Object> consumerConfig = ImmutableMap.copyOf(this.consumerConfig);
+              final @Nullable Map<String, Object> offsetConsumerConfig =
+                  this.offsetConsumerConfig == null
+                      ? null
+                      : ImmutableMap.copyOf(this.offsetConsumerConfig);
+              return CacheBuilder.newBuilder()
+                  .weakValues()
+                  .expireAfterAccess(1, TimeUnit.MINUTES)
+                  .build(
+                      new CacheLoader<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>() {
+                        @Override
+                        public KafkaLatestOffsetEstimator load(
+                            KafkaSourceDescriptor kafkaSourceDescriptor) throws Exception {
+                          LOG.info(
+                              "Creating Kafka consumer for offset estimation for {}",
+                              kafkaSourceDescriptor);
+
+                          TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
+                          Map<String, Object> updatedConsumerConfig =
+                              overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
+                          Consumer<byte[], byte[]> offsetConsumer =
+                              consumerFactoryFn.apply(
+                                  KafkaIOUtils.getOffsetConsumerConfig(
+                                      "tracker-" + topicPartition,
+                                      offsetConsumerConfig,
+                                      updatedConsumerConfig));
+                          return new KafkaLatestOffsetEstimator(offsetConsumer, topicPartition);
+                        }
+                      });
+            });
     if (checkStopReadingFn != null) {
       checkStopReadingFn.setup();
     }
   }
 
   @Teardown
   public void teardown() throws Exception {
+    final LoadingCache<KafkaSourceDescriptor, AverageRecordSize> avgRecordSizeCache =
+        Preconditions.checkStateNotNull(this.avgRecordSizeCache);
+    final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator> offsetEstimatorCache =
+        Preconditions.checkStateNotNull(this.offsetEstimatorCache);
     try {
       if (valueDeserializerInstance != null) {
         Closeables.close(valueDeserializerInstance, true);
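
For reference, a self-contained sketch of the Guava cache configuration that setup() now builds once per fnId: weakValues() lets an estimator (and the Kafka consumer it wraps) become collectible as soon as nothing references it, and expireAfterAccess() marks idle entries evictable. Plain Guava imports stand in for Beam's vendored copies, and Estimator is a placeholder for KafkaLatestOffsetEstimator:

import java.util.concurrent.TimeUnit;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

final class OffsetEstimatorCacheSketch {
  static final class Estimator {}

  static LoadingCache<String, Estimator> build() {
    return CacheBuilder.newBuilder()
        .weakValues()                            // entries vanish once values are unreferenced
        .expireAfterAccess(1, TimeUnit.MINUTES)  // idle entries become evictable
        .build(
            new CacheLoader<String, Estimator>() {
              @Override
              public Estimator load(String partition) {
                // In the real DoFn this creates a Kafka consumer for the
                // partition; a plain object stands in here.
                return new Estimator();
              }
            });
  }

  public static void main(String[] args) throws Exception {
    LoadingCache<String, Estimator> cache = build();
    Estimator a = cache.get("topic-0"); // loads on first access
    Estimator b = cache.get("topic-0"); // served from cache while strongly referenced
    System.out.println(a == b);         // true
  }
}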
@@ -666,13 +703,13 @@ public void teardown() throws Exception {
     } catch (Exception anyException) {
       LOG.warn("Fail to close resource during finishing bundle.", anyException);
     }
-
-    if (offsetEstimatorCache != null) {
-      offsetEstimatorCache.invalidateAll();
-    }
     if (checkStopReadingFn != null) {
       checkStopReadingFn.teardown();
     }
+
+    // Allow the cache to perform clean up tasks when this instance is about to be deleted.
+    avgRecordSizeCache.cleanUp();
+    offsetEstimatorCache.cleanUp();
   }
 
   private Map<String, Object> overrideBootstrapServersConfig(
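
A note on the teardown() change: with the caches now shared among copies of the same DoFn, invalidateAll() would discard entries other copies may still be using, whereas cleanUp() only runs pending maintenance such as evicting expired entries and firing removal notifications. A small illustrative sketch of that behavior with plain Guava (the listener and sleep exist only to make the eviction visible):

import java.util.concurrent.TimeUnit;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

final class CleanUpSketch {
  public static void main(String[] args) throws InterruptedException {
    Cache<String, byte[]> cache =
        CacheBuilder.newBuilder()
            .expireAfterAccess(100, TimeUnit.MILLISECONDS)
            .removalListener(n -> System.out.println("evicted " + n.getKey()))
            .build();
    cache.put("k", new byte[0]);
    Thread.sleep(200);
    // Guava caches do maintenance opportunistically during reads and writes;
    // cleanUp() forces it, firing the removal listener for the expired entry
    // without touching live entries (unlike invalidateAll()).
    cache.cleanUp();
  }
}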
