Add topicPattern property to KafkaIO.Read to match topics using a regex #26948
Changes from 2 commits
@@ -37,6 +37,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
@@ -350,10 +351,11 @@
  * href="https://beam.apache.org/blog/splittable-do-fn/">blog post</a> and <a
  * href="https://s.apache.org/beam-fn-api">design doc</a>. The major difference from {@link
  * KafkaIO.Read} is, {@link ReadSourceDescriptors} doesn't require source descriptions(e.g., {@link
- * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
- * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction time. Instead, the
- * pipeline can populate these source descriptions during runtime. For example, the pipeline can
- * query Kafka topics from a BigQuery table and read these topics via {@link ReadSourceDescriptors}.
+ * KafkaIO.Read#getTopicPattern()}, {@link KafkaIO.Read#getTopicPartitions()}, {@link
+ * KafkaIO.Read#getTopics()}, {@link KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline
+ * construction time. Instead, the pipeline can populate these source descriptions during runtime.
+ * For example, the pipeline can query Kafka topics from a BigQuery table and read these topics via
+ * {@link ReadSourceDescriptors}.
  *
  * <h3>Common Kafka Consumer Configurations</h3>
  *
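As a reading aid (not part of this PR's diff), a minimal sketch of the runtime-populated pattern the Javadoc describes. The broker address, topic name, and descriptor values are placeholders, and the exact `KafkaSourceDescriptor.of` factory signature may vary across Beam versions:

```java
// Sketch: build source descriptors as pipeline data and read them with
// ReadSourceDescriptors, instead of fixing topics at construction time.
PCollection<KafkaSourceDescriptor> descriptors =
    pipeline.apply(
        Create.of(
            KafkaSourceDescriptor.of(
                new TopicPartition("logs", 0), // placeholder topic/partition
                null, // startReadOffset
                null, // startReadTime
                null, // stopReadOffset
                null, // stopReadTime
                null))); // bootstrapServers (falls back to consumer config)

PCollection<KafkaRecord<byte[], byte[]>> records =
    descriptors.apply(
        KafkaIO.<byte[], byte[]>readSourceDescriptors()
            .withBootstrapServers("broker:9092")
            .withKeyDeserializer(ByteArrayDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class));
```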
@@ -633,6 +635,9 @@ public abstract static class Read<K, V>
     @Pure
     abstract @Nullable List<TopicPartition> getTopicPartitions();

+    @Pure
+    abstract @Nullable Pattern getTopicPattern();
+
     @Pure
     abstract @Nullable Coder<K> getKeyCoder();
@@ -692,6 +697,8 @@ abstract static class Builder<K, V> {

       abstract Builder<K, V> setTopicPartitions(List<TopicPartition> topicPartitions);

+      abstract Builder<K, V> setTopicPattern(Pattern topicPattern);
+
       abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);

       abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
@@ -922,8 +929,9 @@ public Read<K, V> withTopic(String topic) {
      */
     public Read<K, V> withTopics(List<String> topics) {
       checkState(
-          getTopicPartitions() == null || getTopicPartitions().isEmpty(),
-          "Only topics or topicPartitions can be set, not both");
+          (getTopicPartitions() == null || getTopicPartitions().isEmpty())
+              && getTopicPattern() == null,
+          "Only one of topics, topicPartitions or topicPattern can be set");
       return toBuilder().setTopics(ImmutableList.copyOf(topics)).build();
     }

@@ -936,11 +944,26 @@ public Read<K, V> withTopics(List<String> topics) {
      */
     public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
       checkState(
-          getTopics() == null || getTopics().isEmpty(),
-          "Only topics or topicPartitions can be set, not both");
+          (getTopics() == null || getTopics().isEmpty()) && getTopicPattern() == null,
+          "Only one of topics, topicPartitions or topicPattern can be set");
       return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
     }

+    /**
+     * Sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions from each
+     * of the matching topics are read.
+     *
+     * <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the
+     * partitions are distributed among the splits.
+     */
+    public Read<K, V> withTopicPattern(Pattern topicPattern) {
+      checkState(
+          (getTopics() == null || getTopics().isEmpty())
+              && (getTopicPartitions() == null || getTopicPartitions().isEmpty()),
+          "Only one of topics, topicPartitions or topicPattern can be set");
+      return toBuilder().setTopicPattern(topicPattern).build();
+    }
+
     /**
      * Sets a Kafka {@link Deserializer} to interpret key bytes read from Kafka.
      *
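To show the new method in context, a hypothetical pipeline fragment (not from this PR); the broker address and regex are placeholders. With the signature at this commit the caller compiles the regex, and combining it with `withTopics()` or `withTopicPartitions()` fails the `checkState` above with an `IllegalStateException`:

```java
// Fragment: read every topic whose name matches "user_events_.*".
// Partitions of the matching topics are resolved at runtime.
PCollection<KafkaRecord<String, String>> records =
    pipeline.apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")
            .withTopicPattern(Pattern.compile("user_events_.*"))
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class));
```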
@@ -1274,8 +1297,9 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
       if (!isDynamicRead()) {
         checkArgument(
             (getTopics() != null && getTopics().size() > 0)
-                || (getTopicPartitions() != null && getTopicPartitions().size() > 0),
-            "Either withTopic(), withTopics() or withTopicPartitions() is required");
+                || (getTopicPartitions() != null && getTopicPartitions().size() > 0)
+                || getTopicPattern() != null,
+            "Either withTopic(), withTopics(), withTopicPartitions() or withTopicPattern() is required");
       } else {
         checkArgument(
             ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api"),
@@ -1537,6 +1561,7 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
                 kafkaRead.getConsumerConfig(),
                 kafkaRead.getCheckStopReadingFn(),
                 topics,
+                kafkaRead.getTopicPattern(),
                 kafkaRead.getStartReadTime(),
                 kafkaRead.getStopReadTime()));
       } else {
@@ -1561,6 +1586,7 @@ static class GenerateKafkaSourceDescriptor extends DoFn<byte[], KafkaSourceDescr
       this.consumerFactoryFn = read.getConsumerFactoryFn();
       this.topics = read.getTopics();
       this.topicPartitions = read.getTopicPartitions();
+      this.topicPattern = read.getTopicPattern();
       this.startReadTime = read.getStartReadTime();
       this.stopReadTime = read.getStopReadTime();
     }
@@ -1578,15 +1604,30 @@ static class GenerateKafkaSourceDescriptor extends DoFn<byte[], KafkaSourceDescr

     @VisibleForTesting final @Nullable List<String> topics;

+    private final @Nullable Pattern topicPattern;
+

Review comment (on the new topicPattern field): Not annotated with […]
     @ProcessElement
     public void processElement(OutputReceiver<KafkaSourceDescriptor> receiver) {
       List<TopicPartition> partitions =
           new ArrayList<>(Preconditions.checkStateNotNull(topicPartitions));
       if (partitions.isEmpty()) {
         try (Consumer<?, ?> consumer = consumerFactoryFn.apply(consumerConfig)) {
-          for (String topic : Preconditions.checkStateNotNull(topics)) {
-            for (PartitionInfo p : consumer.partitionsFor(topic)) {
-              partitions.add(new TopicPartition(p.topic(), p.partition()));
+          List<String> topics = Preconditions.checkStateNotNull(this.topics);
+          if (topics.isEmpty()) {
+            Pattern pattern = Preconditions.checkStateNotNull(topicPattern);
+            for (Map.Entry<String, List<PartitionInfo>> entry :
+                consumer.listTopics().entrySet()) {
+              if (pattern.matcher(entry.getKey()).matches()) {
+                for (PartitionInfo p : entry.getValue()) {
+                  partitions.add(new TopicPartition(p.topic(), p.partition()));
+                }
+              }
+            }
+          } else {
+            for (String topic : topics) {
+              for (PartitionInfo p : consumer.partitionsFor(topic)) {
+                partitions.add(new TopicPartition(p.topic(), p.partition()));
+              }
             }
           }
         }

Review comment (on consumer.partitionsFor(topic)): Any chance this is null at this point? In the split below you have null checks, but not here.

Reply: Is this referring to […]? As far as Kafka's topic metadata goes, […]
@@ -1634,12 +1675,16 @@ public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       List<String> topics = Preconditions.checkStateNotNull(getTopics());
       List<TopicPartition> topicPartitions = Preconditions.checkStateNotNull(getTopicPartitions());
+      Pattern topicPattern = getTopicPattern();
       if (topics.size() > 0) {
         builder.add(DisplayData.item("topics", Joiner.on(",").join(topics)).withLabel("Topic/s"));
       } else if (topicPartitions.size() > 0) {
         builder.add(
             DisplayData.item("topicPartitions", Joiner.on(",").join(topicPartitions))
                 .withLabel("Topic Partition/s"));
+      } else if (topicPattern != null) {
+        builder.add(
+            DisplayData.item("topicPattern", topicPattern.pattern()).withLabel("Topic Pattern"));
       }
       Set<String> disallowedConsumerPropertiesKeys =
           KafkaIOUtils.DISALLOWED_CONSUMER_PROPERTIES.keySet();
Review comment: Can we change this API to use a portable type (for example, a string regex) so that this can be supported via cross-language wrappers?

Reply: Read.External.Configuration has fields for keyDeserializer and valueDeserializer which are resolved from String to Class during external transform construction; does it make sense to provide a mapping there instead?

Reply: Ideally we don't want to be using the external read configuration as a pattern. It makes the configurations for an IO diverge for Python vs. Java users.

Reply: Got it. Would an additional method overload with String topicPattern help at all? Otherwise I'll change the method signature.

Reply: I think we can have both.

Reply: Is there a significant advantage to using Pattern over a string regex? If it's a perf issue, we could just build the Pattern object once within withTopicPattern and use that.

Reply: Mostly to make sure that pattern flags can be specified by users, but nearly all (except LITERAL and CANON_EQ) can be specified in the expression as well.

Reply: Thanks. If there is no significant advantage to specifying a Pattern object, I would just support specifying a String regex to keep the API simple.

Reply: Done.
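For reference, a sketch of the String-based variant the thread converges on; the merged signature may differ. It reuses the checkState from this PR and compiles the pattern once at construction time, as suggested above:

```java
// Sketch: accept a regex string so cross-language wrappers can pass a plain
// string; compile it to a Pattern once when the transform is configured.
public Read<K, V> withTopicPattern(String topicPattern) {
  checkState(
      (getTopics() == null || getTopics().isEmpty())
          && (getTopicPartitions() == null || getTopicPartitions().isEmpty()),
      "Only one of topics, topicPartitions or topicPattern can be set");
  return toBuilder().setTopicPattern(Pattern.compile(topicPattern)).build();
}
```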