Skip to content

Commit

Permalink
Lineage support for JdbcIO (#33062)
Browse files Browse the repository at this point in the history
* Lineage support for JdbcIO

* Report the tables the pipeline reads from and writes to

* add logs and documentation
  • Loading branch information
Abacn authored Nov 13, 2024
1 parent 87e251a commit f25ac69
Show file tree
Hide file tree
Showing 5 changed files with 420 additions and 8 deletions.
2 changes: 2 additions & 0 deletions sdks/java/io/jdbc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ dependencies {
testImplementation library.java.testcontainers_mysql
testImplementation library.java.testcontainers_postgresql
testImplementation 'mysql:mysql-connector-java:8.0.22'
// TODO(https://github.com/apache/beam/issues/31678) HikariCP 5.x requires Java11+
testImplementation 'com.zaxxer:HikariCP:4.0.3'

testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
Expand All @@ -55,6 +56,7 @@
import org.apache.beam.sdk.io.jdbc.JdbcUtil.PartitioningFn;
import org.apache.beam.sdk.io.jdbc.SchemaUtil.FieldWithIndex;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
Expand Down Expand Up @@ -93,6 +95,7 @@
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.dbcp2.DataSourceConnectionFactory;
import org.apache.commons.dbcp2.PoolableConnectionFactory;
Expand Down Expand Up @@ -1608,6 +1611,7 @@ private static class ReadFn<ParameterT, OutputT> extends DoFn<ParameterT, Output

private @Nullable DataSource dataSource;
private @Nullable Connection connection;
private @Nullable String reportedLineage;

private ReadFn(
SerializableFunction<Void, DataSource> dataSourceProviderFn,
Expand All @@ -1630,10 +1634,26 @@ public void setup() throws Exception {
}

private Connection getConnection() throws SQLException {
if (this.connection == null) {
this.connection = checkStateNotNull(this.dataSource).getConnection();
Connection connection = this.connection;
if (connection == null) {
DataSource validSource = checkStateNotNull(this.dataSource);
connection = checkStateNotNull(validSource).getConnection();
this.connection = connection;

// report Lineage if haven't done so
String table = JdbcUtil.extractTableFromReadQuery(query.get());
if (!table.equals(reportedLineage)) {
JdbcUtil.FQNComponents fqn = JdbcUtil.FQNComponents.of(validSource);
if (fqn == null) {
fqn = JdbcUtil.FQNComponents.of(connection);
}
if (fqn != null) {
fqn.reportLineage(Lineage.getSources(), table);
reportedLineage = table;
}
}
}
return this.connection;
return connection;
}

@ProcessElement
Expand Down Expand Up @@ -2645,6 +2665,7 @@ abstract Builder<T, V> setMaxBatchBufferingDuration(
private @Nullable DataSource dataSource;
private @Nullable Connection connection;
private @Nullable PreparedStatement preparedStatement;
private @Nullable String reportedLineage;
private static @Nullable FluentBackoff retryBackOff;

public WriteFn(WriteFnSpec<T, V> spec) {
Expand Down Expand Up @@ -2677,11 +2698,28 @@ public void setup() {
private Connection getConnection() throws SQLException {
Connection connection = this.connection;
if (connection == null) {
connection = checkStateNotNull(dataSource).getConnection();
DataSource validSource = checkStateNotNull(dataSource);
connection = validSource.getConnection();
connection.setAutoCommit(false);
preparedStatement =
connection.prepareStatement(checkStateNotNull(spec.getStatement()).get());
this.connection = connection;

// report Lineage if haven't done so
String table = spec.getTable();
if (Strings.isNullOrEmpty(table) && spec.getStatement() != null) {
table = JdbcUtil.extractTableFromWriteQuery(spec.getStatement().get());
}
if (!Objects.equals(table, reportedLineage)) {
JdbcUtil.FQNComponents fqn = JdbcUtil.FQNComponents.of(validSource);
if (fqn == null) {
fqn = JdbcUtil.FQNComponents.of(connection);
}
if (fqn != null) {
fqn.reportLineage(Lineage.getSinks(), table);
reportedLineage = table;
}
}
}
return connection;
}
Expand Down
Loading

0 comments on commit f25ac69

Please sign in to comment.