Add topicPattern property to KafkaIO.Read to match topics using a regex #26948

Merged: 3 commits, Jun 30, 2023

Conversation

Contributor

@sjvanrossum sjvanrossum commented May 31, 2023

This addresses #19217 and #21338, matching Apache Flink's KafkaSource property topicPattern.



@@ -1578,15 +1604,30 @@ static class GenerateKafkaSourceDescriptor extends DoFn<byte[], KafkaSourceDescr

@VisibleForTesting final @Nullable List<String> topics;

private final @Nullable Pattern topicPattern;
Contributor Author

Not annotated with @VisibleForTesting because the property is not accessed directly in tests, although the same is true of the properties that do carry @VisibleForTesting.

@github-actions
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @bvolpato for label java.
R: @pabloem for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

    }
  } else {
    for (String topic : topics) {
      for (PartitionInfo p : consumer.partitionsFor(topic)) {
Contributor

Any chance this is null at this point? In the split below you have null checks, but not here.

Contributor Author

@sjvanrossum sjvanrossum Jun 1, 2023

Is this referring to topics? Both topicPartitions and topics are initialized as empty lists in the builder by default and are only replaced through .withTopics() and .withTopicPartitions(), so topics, which was previously guarded by Preconditions.checkStateNotNull(topics) in the for loop, should not be null under any circumstance. Special care should be taken to carry that guarantee forward when we add support for this property in KafkaIO's ExternalTransformRegistrar though, since it doesn't guarantee the same object state the builder guarantees.
Regarding topicPattern: if both topicPartitions and topics are empty, then topicPattern must be non-null, since the PTransform's expansion checks that at least one of those properties is set and the .withX() builder methods check that none of the others was previously set.

As far as Kafka's topic metadata goes, .partitionsFor() will throw an exception if an unauthorized topic is requested and .listTopics() will list only authorized topics. Both methods return initialized objects and ensure that potential null responses from the server are translated to empty collections (as far back as org.apache.kafka:kafka-clients:0.11.0.3) or throw an exception in the case of an authorization failure. I'd say that the existing check on partitionInfoList seems superfluous and could be considered for deletion:

List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
checkState(
    partitionInfoList != null,
    "Could not find any partitions info. Please check Kafka configuration and make sure "
        + "that provided topics exist.");
for (PartitionInfo p : partitionInfoList) {
  partitions.add(new TopicPartition(p.topic(), p.partition()));
}
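
For illustration only, the pattern-based branch discussed above can be sketched without a Kafka dependency. In this sketch a plain `Map<String, List<Integer>>` stands in for the result of `consumer.listTopics()`, and `TopicPatternSelection` and `select` are hypothetical names, not part of KafkaIO. It also assumes that the whole topic name must match the pattern (`Matcher.matches()`), which this thread does not state explicitly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

final class TopicPatternSelection {
  // Hypothetical helper: topicMetadata stands in for consumer.listTopics(),
  // with partition numbers in place of PartitionInfo objects.
  static List<String> select(Pattern topicPattern, Map<String, List<Integer>> topicMetadata) {
    List<String> selected = new ArrayList<>();
    for (Map.Entry<String, List<Integer>> entry : topicMetadata.entrySet()) {
      // Assumption: the full topic name must match (matches(), not find()).
      if (topicPattern.matcher(entry.getKey()).matches()) {
        for (int partition : entry.getValue()) {
          selected.add(entry.getKey() + "-" + partition);
        }
      }
    }
    // An empty list (never null) is returned when nothing matches.
    return selected;
  }

  public static void main(String[] args) {
    Map<String, List<Integer>> metadata = new LinkedHashMap<>();
    metadata.put("orders", Arrays.asList(0, 1));
    metadata.put("orders-dlq", Arrays.asList(0));
    metadata.put("payments", Arrays.asList(0));
    System.out.println(select(Pattern.compile("orders.*"), metadata));
  }
}
```

Because the helper always returns an initialized list, callers need no null check on the result, which mirrors the argument made above about the metadata methods.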

@github-actions
Contributor

github-actions bot commented Jun 8, 2023

Reminder, please take a look at this PR: @bvolpato @pabloem

@github-actions
Contributor

Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @robertwb for label java.
R: @chamikaramj for label io.


@bvolpato
Contributor

@pabloem @johnjcasey could you please take a look and merge? Thanks!

@github-actions
Contributor

Reminder, please take a look at this PR: @robertwb @chamikaramj

@@ -723,6 +724,31 @@ public void testUnboundedSourceWithExplicitPartitions() {
p.run();
}

@Test
Contributor

Can you add a test, or update this one, so that the pattern doesn't match all the topics?

Contributor Author

Added one for partial matches and one for no matches.
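
As a rough, dependency-free illustration of the three cases (all topics matched, some matched, none matched), the sketch below filters a fixed list of topic names. `TopicPatternCases`, `matching`, and the topic names are made up for this example and are not the tests added in this PR; full-match semantics (`Matcher.matches()`) are assumed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

final class TopicPatternCases {
  // Hypothetical helper: returns the topics whose full name matches the regex.
  static List<String> matching(String regex, List<String> topics) {
    Pattern pattern = Pattern.compile(regex);
    List<String> matched = new ArrayList<>();
    for (String topic : topics) {
      if (pattern.matcher(topic).matches()) {
        matched.add(topic);
      }
    }
    return matched;
  }

  public static void main(String[] args) {
    List<String> topics = Arrays.asList("topic_a", "topic_b", "other");
    System.out.println(matching(".*", topics));         // every topic
    System.out.println(matching("topic_.*", topics));   // a subset of the topics
    System.out.println(matching("missing_.*", topics)); // no topics
  }
}
```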

@sjvanrossum
Contributor Author

@johnjcasey Unit tests are failing early on a missing dependency.
The Avro plugin version in use for Beam seems to have been yanked and all versions of the published artifact com.commercehub.gradle.plugin:gradle-avro-plugin are obsolete.

It seems to have been superseded by com.github.davidmc24.gradle.plugin.avro; see the changelog.
I'm not sure what the compatibility story is between <1.0.0 and >=1.0.0, but upgrading seems prudent.

@github-actions
Contributor

Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @Abacn for label java.
R: @Abacn for label io.


@johnjcasey
Contributor

run RAT PreCommit

@johnjcasey
Contributor

Tests appear to be building fine to me

* <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the
* partitions are distributed among the splits.
*/
public Read<K, V> withTopicPattern(Pattern topicPattern) {
Contributor

Can we change this API to use a portable type (for example, a string regex) so that it can be supported via cross-language wrappers?

Contributor Author

Read.External.Configuration has fields for keyDeserializer and valueDeserializer, which are resolved from String to Class during external transform construction. Does it make sense to provide a mapping there instead?

Contributor

Ideally we don't want to use the external read configuration as a pattern. It makes the configuration of an IO diverge between Python and Java users.

Contributor Author

Got it. Would an additional method overload with String topicPattern help at all?
Otherwise I'll change the method signature.

Contributor

I think we can have both

Contributor

Is there a significant advantage to using "Pattern" over a string regex?

If it's a perf issue, we could just build the Pattern object once within "withTopicPattern" and use that?

Contributor Author

Mostly to make sure that pattern flags can be specified by users, but nearly all (except LITERAL and CANON_EQ) can be specified in the expression as well.
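
A small sketch of that point, using only `java.util.regex`: a flag such as `CASE_INSENSITIVE` can be written inline as `(?i)`, so a plain String carries it. `LITERAL` has no inline form, although `Pattern.quote` yields a String regex with similar literal behaviour; `CANON_EQ` is not covered here. The class name `EmbeddedFlagDemo` and the topic names are illustrative assumptions.

```java
import java.util.regex.Pattern;

final class EmbeddedFlagDemo {
  // Flag passed through the Pattern API.
  static final Pattern VIA_FLAG = Pattern.compile("orders-.*", Pattern.CASE_INSENSITIVE);
  // Same flag embedded in the expression, so a plain String is enough.
  static final Pattern VIA_STRING = Pattern.compile("(?i)orders-.*");
  // LITERAL cannot be embedded; the dot is matched as a literal character.
  static final Pattern LITERAL_FLAG = Pattern.compile("orders.v1", Pattern.LITERAL);
  // A String-only approximation of LITERAL using Pattern.quote.
  static final Pattern QUOTED = Pattern.compile(Pattern.quote("orders.v1"));

  static boolean fullMatch(Pattern pattern, String topic) {
    return pattern.matcher(topic).matches();
  }

  public static void main(String[] args) {
    System.out.println(fullMatch(VIA_FLAG, "ORDERS-EU"));
    System.out.println(fullMatch(VIA_STRING, "ORDERS-EU"));
    System.out.println(fullMatch(LITERAL_FLAG, "ordersXv1"));
    System.out.println(fullMatch(QUOTED, "ordersXv1"));
  }
}
```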

Contributor

Thanks. If there is no significant advantage for specifying a Pattern object I would just support specifying a String regex to keep the API simple.

Contributor Author

Done.
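
To make the agreed shape concrete, here is a minimal sketch of a String-based `withTopicPattern` that compiles the regex once and refuses to be combined with an explicit topic list. `ReadSpecSketch` is a hypothetical stand-in written for this note, not KafkaIO.Read itself; the real builder, its checks, and its error messages may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

final class ReadSpecSketch {
  private final List<String> topics;  // empty by default, never null
  private final Pattern topicPattern; // null until withTopicPattern is called

  private ReadSpecSketch(List<String> topics, Pattern topicPattern) {
    this.topics = topics;
    this.topicPattern = topicPattern;
  }

  static ReadSpecSketch create() {
    return new ReadSpecSketch(new ArrayList<>(), null);
  }

  ReadSpecSketch withTopics(List<String> newTopics) {
    if (topicPattern != null) {
      throw new IllegalStateException("Only one of topics or topicPattern may be set");
    }
    return new ReadSpecSketch(new ArrayList<>(newTopics), null);
  }

  // Takes a portable String; the Pattern is compiled once, here, so an
  // invalid regex fails at construction time (PatternSyntaxException).
  ReadSpecSketch withTopicPattern(String regex) {
    if (!topics.isEmpty()) {
      throw new IllegalStateException("Only one of topics or topicPattern may be set");
    }
    return new ReadSpecSketch(topics, Pattern.compile(regex));
  }

  // Assumption: the full topic name must match the pattern.
  boolean accepts(String topic) {
    return topicPattern != null
        ? topicPattern.matcher(topic).matches()
        : topics.contains(topic);
  }

  public static void main(String[] args) {
    ReadSpecSketch spec = ReadSpecSketch.create().withTopicPattern("logs-.*");
    System.out.println(spec.accepts("logs-eu"));
    System.out.println(spec.accepts("metrics-eu"));
  }
}
```

Accepting a String keeps the parameter usable from cross-language wrappers, while compiling inside the method means the regex is parsed only once rather than on every match.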

Contributor

@chamikaramj chamikaramj left a comment

Thanks. LGTM.

@chamikaramj chamikaramj merged commit c36f0f1 into apache:master Jun 30, 2023
aleksandr-dudko pushed a commit to aleksandr-dudko/beam that referenced this pull request Jul 7, 2023
…ex (apache#26948)

* Add topicPattern property to KafkaIO.Read to match topics using a regex

* Add partially matched and unmatched topic pattern test

* Change method signature of withTopicPattern to use String