diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 0e889da660f1..c32f5fa7b7ab 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -174,10 +174,11 @@ corresponds to that version. | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 0.37.1 | 2024-06-10 | [\#38075](https://github.com/airbytehq/airbyte/pull/38075) | Destinations: Track stream statuses in async framework | -| 0.37.0 | 2024-06-10 | [\#38121](https://github.com/airbytehq/airbyte/pull/38121) | Destinations: Set default namespace via CatalogParser | -| 0.36.8 | 2024-06-07 | [\#38763](https://github.com/airbytehq/airbyte/pull/38763) | Increase Jackson message length limit | -| 0.36.7 | 2024-06-06 | [\#39220](https://github.com/airbytehq/airbyte/pull/39220) | Handle null messages in ConnectorExceptionUtil | +| 0.38.0 | 2024-06-11 | [\#39405](https://github.com/airbytehq/airbyte/pull/39405) | Sources: Debezium properties manager interface changed to accept a list of streams to scope to | +| 0.37.1 | 2024-06-10 | [\#38075](https://github.com/airbytehq/airbyte/pull/38075) | Destinations: Track stream statuses in async framework | +| 0.37.0 | 2024-06-10 | [\#38121](https://github.com/airbytehq/airbyte/pull/38121) | Destinations: Set default namespace via CatalogParser | +| 0.36.8 | 2024-06-07 | [\#38763](https://github.com/airbytehq/airbyte/pull/38763) | Increase Jackson message length limit | +| 0.36.7 | 2024-06-06 | [\#39220](https://github.com/airbytehq/airbyte/pull/39220) | Handle null messages in ConnectorExceptionUtil | | 0.36.6 | 2024-06-05 | [\#39106](https://github.com/airbytehq/airbyte/pull/39106) | Skip write to storage with 0 byte file | | 0.36.5 | 2024-06-01 | [\#38792](https://github.com/airbytehq/airbyte/pull/38792) | Throw config exception if no selectable table exists in user provided schemas | | 0.36.4 | 2024-05-31 | [\#38824](https://github.com/airbytehq/airbyte/pull/38824) | Param marked as non-null to nullable in JdbcDestinationHandler for NPE fix | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 0baca54913ec..774491a859f9 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.37.3 +version=0.38.0 \ No newline at end of file diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumPropertiesManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumPropertiesManager.kt index 5edfd1656b45..513939047816 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumPropertiesManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumPropertiesManager.kt @@ -11,7 +11,8 @@ import java.util.* abstract class DebeziumPropertiesManager( private val properties: Properties, private val config: JsonNode, - private val catalog: ConfiguredAirbyteCatalog + private val catalog: ConfiguredAirbyteCatalog, + private val streamNames: List ) { fun getDebeziumProperties(offsetManager: AirbyteFileOffsetBackingStore): Properties { return getDebeziumProperties(offsetManager, Optional.empty()) @@ -73,7 +74,7 @@ abstract class DebeziumPropertiesManager( // following props.setProperty("value.converter.replace.null.with.default", "false") // includes - props.putAll(getIncludeConfiguration(catalog, config)) + props.putAll(getIncludeConfiguration(catalog, config, streamNames)) return props } @@ -84,7 +85,8 @@ abstract class DebeziumPropertiesManager( protected abstract fun getIncludeConfiguration( catalog: ConfiguredAirbyteCatalog, - config: JsonNode? + config: JsonNode?, + streamNames: List ): Properties companion object { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/RelationalDbDebeziumPropertiesManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/RelationalDbDebeziumPropertiesManager.kt index eba1ab42d338..d1ef37a27b4d 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/RelationalDbDebeziumPropertiesManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/RelationalDbDebeziumPropertiesManager.kt @@ -18,8 +18,9 @@ import org.codehaus.plexus.util.StringUtils class RelationalDbDebeziumPropertiesManager( properties: Properties, config: JsonNode, - catalog: ConfiguredAirbyteCatalog -) : DebeziumPropertiesManager(properties, config, catalog) { + catalog: ConfiguredAirbyteCatalog, + completedStreamNames: List +) : DebeziumPropertiesManager(properties, config, catalog, completedStreamNames) { override fun getConnectionConfiguration(config: JsonNode): Properties { val properties = Properties() @@ -42,20 +43,24 @@ class RelationalDbDebeziumPropertiesManager( override fun getIncludeConfiguration( catalog: ConfiguredAirbyteCatalog, - config: JsonNode? + config: JsonNode?, + streamNames: List ): Properties { val properties = Properties() // table selection - properties.setProperty("table.include.list", getTableIncludelist(catalog)) + properties.setProperty("table.include.list", getTableIncludelist(catalog, streamNames)) // column selection - properties.setProperty("column.include.list", getColumnIncludeList(catalog)) + properties.setProperty("column.include.list", getColumnIncludeList(catalog, streamNames)) return properties } companion object { - fun getTableIncludelist(catalog: ConfiguredAirbyteCatalog): String { + fun getTableIncludelist( + catalog: ConfiguredAirbyteCatalog, + completedStreamNames: List + ): String { // Turn "stream": { // "namespace": "schema1" // "name": "table1 @@ -69,13 +74,17 @@ class RelationalDbDebeziumPropertiesManager( .filter { s: ConfiguredAirbyteStream -> s.syncMode == SyncMode.INCREMENTAL } .map { obj: ConfiguredAirbyteStream -> obj.stream } .map { stream: AirbyteStream -> stream.namespace + "." + stream.name } + .filter { streamName: String -> completedStreamNames.contains(streamName) } // debezium needs commas escaped to split properly .joinToString(",") { x: String -> StringUtils.escape(Pattern.quote(x), ",".toCharArray(), "\\,") } } - fun getColumnIncludeList(catalog: ConfiguredAirbyteCatalog): String { + fun getColumnIncludeList( + catalog: ConfiguredAirbyteCatalog, + completedStreamNames: List + ): String { // Turn "stream": { // "namespace": "schema1" // "name": "table1" @@ -92,6 +101,9 @@ class RelationalDbDebeziumPropertiesManager( return catalog.streams .filter { s: ConfiguredAirbyteStream -> s.syncMode == SyncMode.INCREMENTAL } .map { obj: ConfiguredAirbyteStream -> obj.stream } + .filter { stream: AirbyteStream -> + completedStreamNames.contains(stream.namespace + "." + stream.name) + } .map { s: AirbyteStream -> val fields = parseFields(s.jsonSchema["properties"].fieldNames()) Pattern.quote(s.namespace + "." + s.name) + diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/debezium/DebeziumRecordPublisherTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/debezium/DebeziumRecordPublisherTest.kt index 8a23f58e748b..02c789a8d202 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/debezium/DebeziumRecordPublisherTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/debezium/DebeziumRecordPublisherTest.kt @@ -9,6 +9,7 @@ import io.airbyte.protocol.models.Field import io.airbyte.protocol.models.JsonSchemaType import io.airbyte.protocol.models.v0.CatalogHelpers import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog +import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream import io.airbyte.protocol.models.v0.SyncMode import java.util.regex.Pattern import org.junit.jupiter.api.Assertions @@ -26,13 +27,17 @@ internal class DebeziumRecordPublisherTest { CatalogHelpers.createConfiguredAirbyteStream("id_,something", "public") .withSyncMode(SyncMode.INCREMENTAL), CatalogHelpers.createConfiguredAirbyteStream("n\"aMéS", "public") - .withSyncMode(SyncMode.INCREMENTAL) - ) + .withSyncMode(SyncMode.INCREMENTAL), + ), ) val expectedWhitelist = "\\Qpublic.id_and_name\\E,\\Qpublic.id_\\,something\\E,\\Qpublic.n\"aMéS\\E" - val actualWhitelist = RelationalDbDebeziumPropertiesManager.getTableIncludelist(catalog) + val actualWhitelist = + RelationalDbDebeziumPropertiesManager.getTableIncludelist( + catalog, + getCdcStreamListFromCatalog(catalog) + ) Assertions.assertEquals(expectedWhitelist, actualWhitelist) } @@ -46,12 +51,16 @@ internal class DebeziumRecordPublisherTest { CatalogHelpers.createConfiguredAirbyteStream("id_and_name", "public") .withSyncMode(SyncMode.INCREMENTAL), CatalogHelpers.createConfiguredAirbyteStream("id_and_name2", "public") - .withSyncMode(SyncMode.FULL_REFRESH) - ) + .withSyncMode(SyncMode.FULL_REFRESH), + ), ) val expectedWhitelist = "\\Qpublic.id_and_name\\E" - val actualWhitelist = RelationalDbDebeziumPropertiesManager.getTableIncludelist(catalog) + val actualWhitelist = + RelationalDbDebeziumPropertiesManager.getTableIncludelist( + catalog, + getCdcStreamListFromCatalog(catalog) + ) Assertions.assertEquals(expectedWhitelist, actualWhitelist) } @@ -66,7 +75,7 @@ internal class DebeziumRecordPublisherTest { "id_and_name", "public", Field.of("fld1", JsonSchemaType.NUMBER), - Field.of("fld2", JsonSchemaType.STRING) + Field.of("fld2", JsonSchemaType.STRING), ) .withSyncMode(SyncMode.INCREMENTAL), CatalogHelpers.createConfiguredAirbyteStream("id_,something", "public") @@ -74,13 +83,87 @@ internal class DebeziumRecordPublisherTest { CatalogHelpers.createConfiguredAirbyteStream("id_and_name2", "public") .withSyncMode(SyncMode.FULL_REFRESH), CatalogHelpers.createConfiguredAirbyteStream("n\"aMéS", "public") - .withSyncMode(SyncMode.INCREMENTAL) - ) + .withSyncMode(SyncMode.INCREMENTAL), + ), ) val expectedWhitelist = "\\Qpublic.id_and_name\\E\\.(\\Qfld2\\E|\\Qfld1\\E),\\Qpublic.id_\\,something\\E,\\Qpublic.n\"aMéS\\E" - val actualWhitelist = RelationalDbDebeziumPropertiesManager.getColumnIncludeList(catalog) + val actualWhitelist = + RelationalDbDebeziumPropertiesManager.getColumnIncludeList( + catalog, + getCdcStreamListFromCatalog(catalog) + ) + + Assertions.assertEquals(expectedWhitelist, actualWhitelist) + } + + @Test + fun testTableIncludelistFiltersStreamList() { + val catalog = + ConfiguredAirbyteCatalog() + .withStreams( + ImmutableList.of( + CatalogHelpers.createConfiguredAirbyteStream("id_and_name", "public") + .withSyncMode(SyncMode.INCREMENTAL), + CatalogHelpers.createConfiguredAirbyteStream("id_and_name2", "public") + .withSyncMode(SyncMode.INCREMENTAL), + ), + ) + + val expectedWhitelist = "\\Qpublic.id_and_name\\E" + val cdcStreamList = + catalog.streams + .stream() + .filter { configuredStream: ConfiguredAirbyteStream -> + configuredStream.stream.name.equals("id_and_name") + } + .map { stream: ConfiguredAirbyteStream -> + stream.stream.namespace + "." + stream.stream.name + } + .toList() + val actualWhitelist = + RelationalDbDebeziumPropertiesManager.getTableIncludelist(catalog, cdcStreamList) + + Assertions.assertEquals(expectedWhitelist, actualWhitelist) + } + + @Test + fun testColumnIncludelistFiltersStreamList() { + val catalog = + ConfiguredAirbyteCatalog() + .withStreams( + ImmutableList.of( + CatalogHelpers.createConfiguredAirbyteStream( + "id_and_name", + "public", + Field.of("fld1", JsonSchemaType.NUMBER), + Field.of("fld2", JsonSchemaType.STRING), + ) + .withSyncMode(SyncMode.INCREMENTAL), + CatalogHelpers.createConfiguredAirbyteStream("id_,something", "public") + .withSyncMode(SyncMode.INCREMENTAL), + CatalogHelpers.createConfiguredAirbyteStream("id_and_name2", "public") + .withSyncMode(SyncMode.FULL_REFRESH), + CatalogHelpers.createConfiguredAirbyteStream("n\"aMéS", "public") + .withSyncMode(SyncMode.INCREMENTAL), + ), + ) + + val expectedWhitelist = + "\\Qpublic.id_and_name\\E\\.(\\Qfld2\\E|\\Qfld1\\E),\\Qpublic.id_\\,something\\E,\\Qpublic.n\"aMéS\\E" + val cdcStreamList = + catalog.streams + .stream() + .filter { configuredStream: ConfiguredAirbyteStream -> + !configuredStream.stream.name.equals("id_and_name2") + } + .map { stream: ConfiguredAirbyteStream -> + stream.stream.namespace + "." + stream.stream.name + } + .toList() + val actualWhitelist = + RelationalDbDebeziumPropertiesManager.getColumnIncludeList(catalog, cdcStreamList) Assertions.assertEquals(expectedWhitelist, actualWhitelist) } @@ -101,14 +184,19 @@ internal class DebeziumRecordPublisherTest { "id_and_name", "public", Field.of("fld1", JsonSchemaType.NUMBER), - Field.of("fld2", JsonSchemaType.STRING) + Field.of("fld2", JsonSchemaType.STRING), ) - .withSyncMode(SyncMode.INCREMENTAL) - ) + .withSyncMode(SyncMode.INCREMENTAL), + ), ) val anchored = - "^" + RelationalDbDebeziumPropertiesManager.getColumnIncludeList(catalog) + "$" + "^" + + RelationalDbDebeziumPropertiesManager.getColumnIncludeList( + catalog, + getCdcStreamListFromCatalog(catalog) + ) + + "$" val pattern = Pattern.compile(anchored) Assertions.assertTrue(pattern.matcher("public.id_and_name.fld1").find()) @@ -117,4 +205,14 @@ internal class DebeziumRecordPublisherTest { Assertions.assertFalse(pattern.matcher("ppppublic.id_and_name.fld2333").find()) Assertions.assertFalse(pattern.matcher("public.id_and_name.fld_wrong_wrong").find()) } + + fun getCdcStreamListFromCatalog(catalog: ConfiguredAirbyteCatalog): List { + return catalog.streams + .stream() + .filter { stream: ConfiguredAirbyteStream -> stream.syncMode == SyncMode.INCREMENTAL } + .map { stream: ConfiguredAirbyteStream -> + stream.stream.namespace + "." + stream.stream.name + } + .toList() + } } diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index b469513ea1f6..7d251f9beabf 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -6,7 +6,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.36.3' + cdkVersionRequired = '0.38.0' features = ['db-sources'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 408e329e9dd8..1b776f4d3e35 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.4.8 + dockerImageTag: 3.4.9 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumStateUtil.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumStateUtil.java index 3193e8e6de95..39f8ff8fe271 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumStateUtil.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumStateUtil.java @@ -36,6 +36,7 @@ import java.sql.SQLException; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -171,7 +172,8 @@ public Optional savedOffset(final Properties baseP } final var offsetManager = AirbyteFileOffsetBackingStore.initializeState(cdcOffset, Optional.empty()); - final DebeziumPropertiesManager debeziumPropertiesManager = new RelationalDbDebeziumPropertiesManager(baseProperties, config, catalog); + final DebeziumPropertiesManager debeziumPropertiesManager = new RelationalDbDebeziumPropertiesManager(baseProperties, config, catalog, + new ArrayList()); final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties(offsetManager); return parseSavedOffset(debeziumProperties); } @@ -247,7 +249,8 @@ public JsonNode constructInitialDebeziumState(final Properties properties, final AirbyteSchemaHistoryStorage schemaHistoryStorage = AirbyteSchemaHistoryStorage.initializeDBHistory(new SchemaHistory<>(Optional.empty(), false), COMPRESSION_ENABLED); final LinkedBlockingQueue> queue = new LinkedBlockingQueue<>(); - final var debeziumPropertiesManager = new RelationalDbDebeziumPropertiesManager(properties, database.getSourceConfig(), catalog); + final var debeziumPropertiesManager = + new RelationalDbDebeziumPropertiesManager(properties, database.getSourceConfig(), catalog, new ArrayList()); try (final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(debeziumPropertiesManager)) { publisher.start(queue, offsetManager, Optional.of(schemaHistoryStorage)); diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java index cdd2fd4dbb22..04a62a270073 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java @@ -243,8 +243,11 @@ public static List> getCdcReadIterators(fi firstRecordWaitTime, AirbyteDebeziumHandler.QUEUE_CAPACITY, false); + final var cdcStreamList = catalog.getStreams().stream() + .filter(stream -> stream.getSyncMode() == SyncMode.INCREMENTAL) + .map(stream -> stream.getStream().getNamespace() + "." + stream.getStream().getName()).toList(); final var propertiesManager = new RelationalDbDebeziumPropertiesManager( - MySqlCdcProperties.getDebeziumProperties(database), sourceConfig, catalog); + MySqlCdcProperties.getDebeziumProperties(database), sourceConfig, catalog, cdcStreamList); final var eventConverter = new RelationalDbDebeziumEventConverter(metadataInjector, emittedAt); final Supplier> incrementalIteratorSupplier = () -> handler.getIncrementalIterators( diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index aeccfc74a0bd..60b94293a90c 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -233,6 +233,7 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:--------|:-----------| :--------------------------------------------------------- |:------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.4.9 | 2024-06-11 | [39405](https://github.com/airbytehq/airbyte/pull/39405) | Adopt latest CDK. | | 3.4.8 | 2024-06-05 | [39144](https://github.com/airbytehq/airbyte/pull/39144) | Upgrade Debezium to 2.5.4 | | 3.4.7 | 2024-05-29 | [38584](https://github.com/airbytehq/airbyte/pull/38584) | Set is_resumable flag in discover. | | 3.4.6 | 2024-05-29 | [38538](https://github.com/airbytehq/airbyte/pull/38538) | Exit connector when encountering a config error. |