diff --git a/sdks/java/io/jdbc/build.gradle b/sdks/java/io/jdbc/build.gradle index 2015bf173978..8c5fa685fdad 100644 --- a/sdks/java/io/jdbc/build.gradle +++ b/sdks/java/io/jdbc/build.gradle @@ -48,6 +48,8 @@ dependencies { testImplementation library.java.testcontainers_mysql testImplementation library.java.testcontainers_postgresql testImplementation 'mysql:mysql-connector-java:8.0.22' + // TODO(https://github.com/apache/beam/issues/31678) HikariCP 5.x requires Java11+ + testImplementation 'com.zaxxer:HikariCP:4.0.3' testRuntimeOnly library.java.slf4j_jdk14 testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java index 946c07f55763..ab2e3e07e817 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java @@ -39,6 +39,7 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -55,6 +56,7 @@ import org.apache.beam.sdk.io.jdbc.JdbcUtil.PartitioningFn; import org.apache.beam.sdk.io.jdbc.SchemaUtil.FieldWithIndex; import org.apache.beam.sdk.metrics.Distribution; +import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; @@ -93,6 +95,7 @@ import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.commons.dbcp2.BasicDataSource; import org.apache.commons.dbcp2.DataSourceConnectionFactory; import org.apache.commons.dbcp2.PoolableConnectionFactory; @@ -1608,6 +1611,7 @@ private static class ReadFn extends DoFn dataSourceProviderFn, @@ -1630,10 +1634,26 @@ public void setup() throws Exception { } private Connection getConnection() throws SQLException { - if (this.connection == null) { - this.connection = checkStateNotNull(this.dataSource).getConnection(); + Connection connection = this.connection; + if (connection == null) { + DataSource validSource = checkStateNotNull(this.dataSource); + connection = checkStateNotNull(validSource).getConnection(); + this.connection = connection; + + // report Lineage if not haven't done so + String table = JdbcUtil.extractTableFromReadQuery(query.get()); + if (!table.equals(reportedLineage)) { + JdbcUtil.FQNComponents fqn = JdbcUtil.FQNComponents.of(validSource); + if (fqn == null) { + fqn = JdbcUtil.FQNComponents.of(connection); + } + if (fqn != null) { + fqn.reportLineage(Lineage.getSources(), table); + reportedLineage = table; + } + } } - return this.connection; + return connection; } @ProcessElement @@ -2645,6 +2665,7 @@ abstract Builder setMaxBatchBufferingDuration( private @Nullable DataSource dataSource; private @Nullable Connection connection; private @Nullable PreparedStatement preparedStatement; + private @Nullable String reportedLineage; private static @Nullable FluentBackoff retryBackOff; public WriteFn(WriteFnSpec spec) { @@ -2677,11 +2698,28 @@ public void setup() { private Connection getConnection() throws SQLException { Connection connection = this.connection; if (connection == null) { - connection = checkStateNotNull(dataSource).getConnection(); + DataSource validSource = checkStateNotNull(dataSource); + connection = validSource.getConnection(); connection.setAutoCommit(false); preparedStatement = connection.prepareStatement(checkStateNotNull(spec.getStatement()).get()); this.connection = connection; + + // report Lineage if haven't done so + String table = spec.getTable(); + if (Strings.isNullOrEmpty(table) && spec.getStatement() != null) { + table = JdbcUtil.extractTableFromWriteQuery(spec.getStatement().get()); + } + if (!Objects.equals(table, reportedLineage)) { + JdbcUtil.FQNComponents fqn = JdbcUtil.FQNComponents.of(validSource); + if (fqn == null) { + fqn = JdbcUtil.FQNComponents.of(connection); + } + if (fqn != null) { + fqn.reportLineage(Lineage.getSinks(), table); + reportedLineage = table; + } + } } return connection; } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java index b3f46492f745..c0f7d68899b3 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java @@ -19,12 +19,18 @@ import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import com.google.auto.value.AutoValue; import java.io.File; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.URI; import java.net.URL; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DatabaseMetaData; import java.sql.Date; import java.sql.JDBCType; import java.sql.PreparedStatement; @@ -33,6 +39,7 @@ import java.sql.Time; import java.sql.Timestamp; import java.util.ArrayList; +import java.util.Arrays; import java.util.Calendar; import java.util.Collection; import java.util.EnumMap; @@ -40,12 +47,17 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Properties; import java.util.TimeZone; import java.util.UUID; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; +import javax.sql.DataSource; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric; import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; @@ -57,6 +69,8 @@ import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams; @@ -563,4 +577,251 @@ public KV> mapRow(ResultSet resultSet) throws Excep } } }); + + @AutoValue + abstract static class JdbcUrl { + abstract String getScheme(); + + abstract @Nullable String getHostAndPort(); + + abstract String getDatabase(); + + /** + * Parse Jdbc Url String and return an {@link JdbcUrl} object, or return null for unsupported + * formats. + * + *

Example of supported format: + * + *

    + *
  • "jdbc:postgresql://localhost:5432/postgres" + *
  • "jdbc:mysql://127.0.0.1:3306/db" + *
  • "jdbc:oracle:thin:HR/hr@localhost:5221:orcl" + *
  • "jdbc:derby:memory:testDB;create=true" + *
  • "jdbc:oracle:thin:@//myhost.example.com:1521/my_service" + *
  • "jdbc:mysql:///cloud_sql" (GCP CloudSQL, supported if Connection name setup via + * HikariDataSource) + *
+ */ + static @Nullable JdbcUrl of(String url) { + if (Strings.isNullOrEmpty(url) || !url.startsWith("jdbc:")) { + return null; + } + String cleanUri = url.substring(5); + + // 1. Resolve the scheme + // handle sub-schemes e.g. oracle:thin (RAC) + int start = cleanUri.indexOf("//"); + if (start != -1) { + List subschemes = Splitter.on(':').splitToList(cleanUri.substring(0, start)); + cleanUri = subschemes.get(0) + ":" + cleanUri.substring(start); + } else { + // not a URI format e.g. oracle:thin (non-RAC); derby in memory + if (cleanUri.startsWith("derby:")) { + String scheme = "derby"; + int endUrl = cleanUri.indexOf(";"); + if (endUrl == -1) { + endUrl = cleanUri.length(); + } + List components = + Splitter.on(':').splitToList(cleanUri.substring("derby:".length(), endUrl)); + if (components.size() < 2) { + return null; + } + return new AutoValue_JdbcUtil_JdbcUrl(scheme, components.get(0), components.get(1)); + } else if (cleanUri.startsWith("oracle:thin:")) { + String scheme = "oracle"; + + int startHost = cleanUri.indexOf("@"); + if (startHost == -1) { + return null; + } + List components = Splitter.on(':').splitToList(cleanUri.substring(startHost + 1)); + if (components.size() < 3) { + return null; + } + return new AutoValue_JdbcUtil_JdbcUrl( + scheme, components.get(0) + ":" + components.get(1), components.get(2)); + } else { + return null; + } + } + + URI uri = URI.create(cleanUri); + String scheme = uri.getScheme(); + + // 2. resolve database + @Nullable String path = uri.getPath(); + if (path != null && path.startsWith("/")) { + path = path.substring(1); + } + if (path == null) { + return null; + } + + // 3. resolve host and port + // treat as self-managed SQL instance + @Nullable String hostAndPort = null; + @Nullable String host = uri.getHost(); + if (host != null) { + int port = uri.getPort(); + hostAndPort = port != -1 ? host + ":" + port : null; + } + return new AutoValue_JdbcUtil_JdbcUrl(scheme, hostAndPort, path); + } + } + + /** Jdbc fully qualified name components. */ + @AutoValue + abstract static class FQNComponents { + abstract String getScheme(); + + abstract Iterable getSegments(); + + void reportLineage(Lineage lineage, @Nullable String table) { + ImmutableList.Builder builder = ImmutableList.builder().addAll(getSegments()); + if (table != null && !table.isEmpty()) { + builder.add(table); + } + lineage.add(getScheme(), builder.build()); + } + + /** Fail-safely extract FQN from supported DataSource. Return null if failed. */ + static @Nullable FQNComponents of(DataSource dataSource) { + // Supported case CloudSql using HikariDataSource + // Had to retrieve properties via Reflection to avoid introduce mandatory Hikari dependencies + String maybeSqlInstance; + String url; + try { + Class hikariClass = Class.forName("com.zaxxer.hikari.HikariDataSource"); + if (!hikariClass.isInstance(dataSource)) { + return null; + } + Method getProperties = hikariClass.getMethod("getDataSourceProperties"); + Properties properties = (Properties) getProperties.invoke(dataSource); + if (properties == null) { + return null; + } + maybeSqlInstance = properties.getProperty("cloudSqlInstance"); + if (maybeSqlInstance == null) { + // not a cloudSqlInstance + return null; + } + Method getUrl = hikariClass.getMethod("getJdbcUrl"); + url = (String) getUrl.invoke(dataSource); + if (url == null) { + return null; + } + } catch (ClassNotFoundException + | InvocationTargetException + | IllegalAccessException + | NoSuchMethodException e) { + return null; + } + + JdbcUrl jdbcUrl = JdbcUrl.of(url); + if (jdbcUrl == null) { + LOG.info("Failed to parse JdbcUrl {}. Lineage will not be reported.", url); + return null; + } + + String scheme = "cloudsql_" + jdbcUrl.getScheme(); + ImmutableList.Builder segments = ImmutableList.builder(); + List sqlInstance = Arrays.asList(maybeSqlInstance.split(":")); + if (sqlInstance.size() > 3) { + // project name contains ":" + segments + .add(String.join(":", sqlInstance.subList(0, sqlInstance.size() - 2))) + .add(sqlInstance.get(sqlInstance.size() - 2)) + .add(sqlInstance.get(sqlInstance.size() - 1)); + } else { + segments.addAll(Arrays.asList(maybeSqlInstance.split(":"))); + } + segments.add(jdbcUrl.getDatabase()); + return new AutoValue_JdbcUtil_FQNComponents(scheme, segments.build()); + } + + /** Fail-safely extract FQN from an active connection. Return null if failed. */ + static @Nullable FQNComponents of(Connection connection) { + try { + DatabaseMetaData metadata = connection.getMetaData(); + if (metadata == null) { + // usually not-null, but can be null when running a mock + return null; + } + String url = metadata.getURL(); + if (url == null) { + // usually not-null, but can be null when running a mock + return null; + } + return of(url); + } catch (Exception e) { + // suppressed + return null; + } + } + + /** + * Fail-safely parse FQN from a Jdbc URL. Return null if failed. + * + *

e.g. + * + *

jdbc:postgresql://localhost:5432/postgres -> (postgresql, [localhost:5432, postgres]) + * + *

jdbc:mysql://127.0.0.1:3306/db -> (mysql, [127.0.0.1:3306, db]) + */ + @VisibleForTesting + static @Nullable FQNComponents of(String url) { + JdbcUrl jdbcUrl = JdbcUrl.of(url); + if (jdbcUrl == null || jdbcUrl.getHostAndPort() == null) { + LOG.info("Failed to parse JdbcUrl {}. Lineage will not be reported.", url); + return null; + } + String hostAndPort = jdbcUrl.getHostAndPort(); + if (hostAndPort == null) { + LOG.info("Failed to parse host/port from JdbcUrl {}. Lineage will not be reported.", url); + return null; + } + + return new AutoValue_JdbcUtil_FQNComponents( + jdbcUrl.getScheme(), ImmutableList.of(hostAndPort, jdbcUrl.getDatabase())); + } + } + + private static final Pattern READ_STATEMENT_PATTERN = + Pattern.compile( + "SELECT\\s+.+?\\s+FROM\\s+\\[?(?[^\\s\\[\\]]+)\\]?", Pattern.CASE_INSENSITIVE); + + private static final Pattern WRITE_STATEMENT_PATTERN = + Pattern.compile( + "INSERT\\s+INTO\\s+\\[?(?[^\\s\\[\\]]+)\\]?", Pattern.CASE_INSENSITIVE); + + /** Extract table name a SELECT statement. Return empty string if fail to extract. */ + static String extractTableFromReadQuery(@Nullable String query) { + if (query == null) { + return ""; + } + Matcher matchRead = READ_STATEMENT_PATTERN.matcher(query); + if (matchRead.find()) { + String matched = matchRead.group("tableName"); + if (matched != null) { + return matched; + } + } + return ""; + } + + /** Extract table name from an INSERT statement. Return empty string if fail to extract. */ + static String extractTableFromWriteQuery(@Nullable String query) { + if (query == null) { + return ""; + } + Matcher matchRead = WRITE_STATEMENT_PATTERN.matcher(query); + if (matchRead.find()) { + String matched = matchRead.group("tableName"); + if (matched != null) { + return matched; + } + } + return ""; + } } diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java index 013fc7996a95..a04f8c4e762f 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; @@ -71,6 +72,7 @@ import org.apache.beam.sdk.io.jdbc.JdbcIO.DataSourceConfiguration; import org.apache.beam.sdk.io.jdbc.JdbcIO.PoolableDataSourceProvider; import org.apache.beam.sdk.io.jdbc.JdbcUtil.PartitioningFn; +import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric; @@ -243,7 +245,10 @@ public void testRead() { Iterable expectedValues = TestRow.getExpectedValues(0, EXPECTED_ROW_COUNT); PAssert.that(rows).containsInAnyOrder(expectedValues); - pipeline.run(); + PipelineResult result = pipeline.run(); + assertThat( + Lineage.query(result.metrics(), Lineage.Type.SOURCE), + hasItem(Lineage.getFqName("derby", ImmutableList.of("memory", "testDB", READ_TABLE_NAME)))); } @Test @@ -263,7 +268,10 @@ public void testReadWithSingleStringParameter() { Iterable expectedValues = Collections.singletonList(TestRow.fromSeed(1)); PAssert.that(rows).containsInAnyOrder(expectedValues); - pipeline.run(); + PipelineResult result = pipeline.run(); + assertThat( + Lineage.query(result.metrics(), Lineage.Type.SOURCE), + hasItem(Lineage.getFqName("derby", ImmutableList.of("memory", "testDB", READ_TABLE_NAME)))); } @Test @@ -531,9 +539,11 @@ public void testWrite() throws Exception { ArrayList> data = getDataToWrite(EXPECTED_ROW_COUNT); pipeline.apply(Create.of(data)).apply(getJdbcWrite(tableName)); - pipeline.run(); - + PipelineResult result = pipeline.run(); assertRowCount(DATA_SOURCE, tableName, EXPECTED_ROW_COUNT); + assertThat( + Lineage.query(result.metrics(), Lineage.Type.SINK), + hasItem(Lineage.getFqName("derby", ImmutableList.of("memory", "testDB", tableName)))); } finally { DatabaseTestHelper.deleteTable(DATA_SOURCE, tableName); } diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcUtilTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcUtilTest.java index 5b2e9f27f0a8..356d6c7f8de7 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcUtilTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcUtilTest.java @@ -22,7 +22,10 @@ import static org.hamcrest.number.IsCloseTo.closeTo; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; import java.io.File; import java.io.IOException; import java.net.URL; @@ -34,12 +37,17 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.List; +import java.util.Map.Entry; import java.util.Random; +import javax.sql.DataSource; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; import org.junit.Rule; import org.junit.Test; @@ -264,4 +272,97 @@ public void testSavesFilesAsExpected() throws IOException { expectedContent2, new String(Files.readAllBytes(Paths.get(urls[1].getFile())), StandardCharsets.UTF_8)); } + + @Test + public void testJdbcUrl() { + ImmutableMap> testCases = + ImmutableMap.>builder() + .put( + "jdbc:postgresql://localhost:5432/postgres", + ImmutableList.of("postgresql", "localhost:5432", "postgres")) + .put( + "jdbc:mysql://127.0.0.1:3306/db", ImmutableList.of("mysql", "127.0.0.1:3306", "db")) + .put( + "jdbc:oracle:thin:HR/hr@localhost:5221:orcl", + ImmutableList.of("oracle", "localhost:5221", "orcl")) + .put( + "jdbc:derby:memory:testDB;create=true", + ImmutableList.of("derby", "memory", "testDB")) + .put( + "jdbc:oracle:thin:@//myhost.example.com:1521/my_service", + ImmutableList.of("oracle", "myhost.example.com:1521", "my_service")) + .put("jdbc:mysql:///cloud_sql", ImmutableList.of("mysql", "", "cloud_sql")) + .put("invalid", ImmutableList.of()) + .build(); + for (Entry> entry : testCases.entrySet()) { + JdbcUtil.JdbcUrl jdbcUrl = JdbcUtil.JdbcUrl.of(entry.getKey()); + + System.out.println(entry.getKey()); + if (entry.getValue().equals(ImmutableList.of())) { + assertNull(jdbcUrl); + } else { + assertEquals(entry.getValue().get(0), jdbcUrl.getScheme()); + assertEquals( + entry.getValue().get(1), + jdbcUrl.getHostAndPort() == null ? "" : jdbcUrl.getHostAndPort()); + assertEquals(entry.getValue().get(2), jdbcUrl.getDatabase()); + } + } + } + + @Test + public void testFqnFromHikariDataSourcePostgreSql() { + HikariConfig config = new HikariConfig(); + config.setJdbcUrl("jdbc:postgresql:///postgres"); + config.setUsername("postgres"); + config.addDataSourceProperty( + "cloudSqlInstance", "example.com:project:some-region:instance-name"); + // instead of `new HikariDataSource(config)`, initialize an empty source to avoid creation + // of actual connection pool + DataSource dataSource = new HikariDataSource(); + config.validate(); + config.copyStateTo((HikariConfig) dataSource); + JdbcUtil.FQNComponents components = JdbcUtil.FQNComponents.of(dataSource); + assertEquals("cloudsql_postgresql", components.getScheme()); + assertEquals( + ImmutableList.of("example.com:project", "some-region", "instance-name", "postgres"), + components.getSegments()); + } + + @Test + public void testFqnFromHikariDataSourceMySql() { + HikariConfig config = new HikariConfig(); + config.setJdbcUrl("jdbc:mysql:///db"); + config.setUsername("root"); + config.addDataSourceProperty("cloudSqlInstance", "some-project:US:instance-name"); + // instead of `new HikariDataSource(config)`, initialize an empty source to avoid creation + // of actual connection pool + DataSource dataSource = new HikariDataSource(); + config.validate(); + config.copyStateTo((HikariConfig) dataSource); + JdbcUtil.FQNComponents components = JdbcUtil.FQNComponents.of(dataSource); + assertEquals("cloudsql_mysql", components.getScheme()); + assertEquals( + ImmutableList.of("some-project", "US", "instance-name", "db"), components.getSegments()); + } + + @Test + public void testExtractTableFromQuery() { + ImmutableList> readCases = + ImmutableList.of( + KV.of("select * from table_1", "table_1"), + KV.of("SELECT a, b FROM [table-2]", "table-2"), + KV.of("drop table not-select", "")); + for (KV testCase : readCases) { + assertEquals(testCase.getValue(), JdbcUtil.extractTableFromReadQuery(testCase.getKey())); + } + ImmutableList> writeCases = + ImmutableList.of( + KV.of("insert into table_1 values ...", "table_1"), + KV.of("INSERT INTO [table-2] values ...", "table-2"), + KV.of("drop table not-select", "")); + for (KV testCase : writeCases) { + assertEquals(testCase.getValue(), JdbcUtil.extractTableFromWriteQuery(testCase.getKey())); + } + } }