Skip to content

Commit

Permalink
[DB sources] : Debezium properties accept list of streams (#39405)
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk authored Jun 13, 2024
1 parent e915758 commit fcc2940
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 34 deletions.
9 changes: 5 additions & 4 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,11 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.37.1 | 2024-06-10 | [\#38075](https://github.com/airbytehq/airbyte/pull/38075) | Destinations: Track stream statuses in async framework |
| 0.37.0 | 2024-06-10 | [\#38121](https://github.com/airbytehq/airbyte/pull/38121) | Destinations: Set default namespace via CatalogParser |
| 0.36.8 | 2024-06-07 | [\#38763](https://github.com/airbytehq/airbyte/pull/38763) | Increase Jackson message length limit |
| 0.36.7 | 2024-06-06 | [\#39220](https://github.com/airbytehq/airbyte/pull/39220) | Handle null messages in ConnectorExceptionUtil |
| 0.38.0 | 2024-06-11 | [\#39405](https://github.com/airbytehq/airbyte/pull/39405) | Sources: Debezium properties manager interface changed to accept a list of streams to scope to |
| 0.37.1 | 2024-06-10 | [\#38075](https://github.com/airbytehq/airbyte/pull/38075) | Destinations: Track stream statuses in async framework |
| 0.37.0 | 2024-06-10 | [\#38121](https://github.com/airbytehq/airbyte/pull/38121) | Destinations: Set default namespace via CatalogParser |
| 0.36.8 | 2024-06-07 | [\#38763](https://github.com/airbytehq/airbyte/pull/38763) | Increase Jackson message length limit |
| 0.36.7 | 2024-06-06 | [\#39220](https://github.com/airbytehq/airbyte/pull/39220) | Handle null messages in ConnectorExceptionUtil |
| 0.36.6 | 2024-06-05 | [\#39106](https://github.com/airbytehq/airbyte/pull/39106) | Skip write to storage with 0 byte file |
| 0.36.5 | 2024-06-01 | [\#38792](https://github.com/airbytehq/airbyte/pull/38792) | Throw config exception if no selectable table exists in user provided schemas |
| 0.36.4 | 2024-05-31 | [\#38824](https://github.com/airbytehq/airbyte/pull/38824) | Param marked as non-null to nullable in JdbcDestinationHandler for NPE fix |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.37.3
version=0.38.0
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import java.util.*
abstract class DebeziumPropertiesManager(
private val properties: Properties,
private val config: JsonNode,
private val catalog: ConfiguredAirbyteCatalog
private val catalog: ConfiguredAirbyteCatalog,
private val streamNames: List<String>
) {
fun getDebeziumProperties(offsetManager: AirbyteFileOffsetBackingStore): Properties {
return getDebeziumProperties(offsetManager, Optional.empty())
Expand Down Expand Up @@ -73,7 +74,7 @@ abstract class DebeziumPropertiesManager(
// following
props.setProperty("value.converter.replace.null.with.default", "false")
// includes
props.putAll(getIncludeConfiguration(catalog, config))
props.putAll(getIncludeConfiguration(catalog, config, streamNames))

return props
}
Expand All @@ -84,7 +85,8 @@ abstract class DebeziumPropertiesManager(

protected abstract fun getIncludeConfiguration(
catalog: ConfiguredAirbyteCatalog,
config: JsonNode?
config: JsonNode?,
streamNames: List<String>
): Properties

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ import org.codehaus.plexus.util.StringUtils
class RelationalDbDebeziumPropertiesManager(
properties: Properties,
config: JsonNode,
catalog: ConfiguredAirbyteCatalog
) : DebeziumPropertiesManager(properties, config, catalog) {
catalog: ConfiguredAirbyteCatalog,
completedStreamNames: List<String>
) : DebeziumPropertiesManager(properties, config, catalog, completedStreamNames) {
override fun getConnectionConfiguration(config: JsonNode): Properties {
val properties = Properties()

Expand All @@ -42,20 +43,24 @@ class RelationalDbDebeziumPropertiesManager(

override fun getIncludeConfiguration(
catalog: ConfiguredAirbyteCatalog,
config: JsonNode?
config: JsonNode?,
streamNames: List<String>
): Properties {
val properties = Properties()

// table selection
properties.setProperty("table.include.list", getTableIncludelist(catalog))
properties.setProperty("table.include.list", getTableIncludelist(catalog, streamNames))
// column selection
properties.setProperty("column.include.list", getColumnIncludeList(catalog))
properties.setProperty("column.include.list", getColumnIncludeList(catalog, streamNames))

return properties
}

companion object {
fun getTableIncludelist(catalog: ConfiguredAirbyteCatalog): String {
fun getTableIncludelist(
catalog: ConfiguredAirbyteCatalog,
completedStreamNames: List<String>
): String {
// Turn "stream": {
// "namespace": "schema1"
// "name": "table1"
Expand All @@ -69,13 +74,17 @@ class RelationalDbDebeziumPropertiesManager(
.filter { s: ConfiguredAirbyteStream -> s.syncMode == SyncMode.INCREMENTAL }
.map { obj: ConfiguredAirbyteStream -> obj.stream }
.map { stream: AirbyteStream -> stream.namespace + "." + stream.name }
.filter { streamName: String -> completedStreamNames.contains(streamName) }
// debezium needs commas escaped to split properly
.joinToString(",") { x: String ->
StringUtils.escape(Pattern.quote(x), ",".toCharArray(), "\\,")
}
}

fun getColumnIncludeList(catalog: ConfiguredAirbyteCatalog): String {
fun getColumnIncludeList(
catalog: ConfiguredAirbyteCatalog,
completedStreamNames: List<String>
): String {
// Turn "stream": {
// "namespace": "schema1"
// "name": "table1"
Expand All @@ -92,6 +101,9 @@ class RelationalDbDebeziumPropertiesManager(
return catalog.streams
.filter { s: ConfiguredAirbyteStream -> s.syncMode == SyncMode.INCREMENTAL }
.map { obj: ConfiguredAirbyteStream -> obj.stream }
.filter { stream: AirbyteStream ->
completedStreamNames.contains(stream.namespace + "." + stream.name)
}
.map { s: AirbyteStream ->
val fields = parseFields(s.jsonSchema["properties"].fieldNames())
Pattern.quote(s.namespace + "." + s.name) +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.airbyte.protocol.models.Field
import io.airbyte.protocol.models.JsonSchemaType
import io.airbyte.protocol.models.v0.CatalogHelpers
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.SyncMode
import java.util.regex.Pattern
import org.junit.jupiter.api.Assertions
Expand All @@ -26,13 +27,17 @@ internal class DebeziumRecordPublisherTest {
CatalogHelpers.createConfiguredAirbyteStream("id_,something", "public")
.withSyncMode(SyncMode.INCREMENTAL),
CatalogHelpers.createConfiguredAirbyteStream("n\"aMéS", "public")
.withSyncMode(SyncMode.INCREMENTAL)
)
.withSyncMode(SyncMode.INCREMENTAL),
),
)

val expectedWhitelist =
"\\Qpublic.id_and_name\\E,\\Qpublic.id_\\,something\\E,\\Qpublic.n\"aMéS\\E"
val actualWhitelist = RelationalDbDebeziumPropertiesManager.getTableIncludelist(catalog)
val actualWhitelist =
RelationalDbDebeziumPropertiesManager.getTableIncludelist(
catalog,
getCdcStreamListFromCatalog(catalog)
)

Assertions.assertEquals(expectedWhitelist, actualWhitelist)
}
Expand All @@ -46,12 +51,16 @@ internal class DebeziumRecordPublisherTest {
CatalogHelpers.createConfiguredAirbyteStream("id_and_name", "public")
.withSyncMode(SyncMode.INCREMENTAL),
CatalogHelpers.createConfiguredAirbyteStream("id_and_name2", "public")
.withSyncMode(SyncMode.FULL_REFRESH)
)
.withSyncMode(SyncMode.FULL_REFRESH),
),
)

val expectedWhitelist = "\\Qpublic.id_and_name\\E"
val actualWhitelist = RelationalDbDebeziumPropertiesManager.getTableIncludelist(catalog)
val actualWhitelist =
RelationalDbDebeziumPropertiesManager.getTableIncludelist(
catalog,
getCdcStreamListFromCatalog(catalog)
)

Assertions.assertEquals(expectedWhitelist, actualWhitelist)
}
Expand All @@ -66,21 +75,95 @@ internal class DebeziumRecordPublisherTest {
"id_and_name",
"public",
Field.of("fld1", JsonSchemaType.NUMBER),
Field.of("fld2", JsonSchemaType.STRING)
Field.of("fld2", JsonSchemaType.STRING),
)
.withSyncMode(SyncMode.INCREMENTAL),
CatalogHelpers.createConfiguredAirbyteStream("id_,something", "public")
.withSyncMode(SyncMode.INCREMENTAL),
CatalogHelpers.createConfiguredAirbyteStream("id_and_name2", "public")
.withSyncMode(SyncMode.FULL_REFRESH),
CatalogHelpers.createConfiguredAirbyteStream("n\"aMéS", "public")
.withSyncMode(SyncMode.INCREMENTAL)
)
.withSyncMode(SyncMode.INCREMENTAL),
),
)

val expectedWhitelist =
"\\Qpublic.id_and_name\\E\\.(\\Qfld2\\E|\\Qfld1\\E),\\Qpublic.id_\\,something\\E,\\Qpublic.n\"aMéS\\E"
val actualWhitelist = RelationalDbDebeziumPropertiesManager.getColumnIncludeList(catalog)
val actualWhitelist =
RelationalDbDebeziumPropertiesManager.getColumnIncludeList(
catalog,
getCdcStreamListFromCatalog(catalog)
)

Assertions.assertEquals(expectedWhitelist, actualWhitelist)
}

/**
 * Verifies that [RelationalDbDebeziumPropertiesManager.getTableIncludelist] only emits tables
 * whose `namespace.name` appears in the supplied stream-name list.
 *
 * Both configured streams are INCREMENTAL, so the sync-mode filter keeps both of them and the
 * stream-name list is the only thing that can drop `public.id_and_name2` from the result.
 */
@Test
fun testTableIncludelistFiltersStreamList() {
    val includedStream =
        CatalogHelpers.createConfiguredAirbyteStream("id_and_name", "public")
            .withSyncMode(SyncMode.INCREMENTAL)
    val excludedStream =
        CatalogHelpers.createConfiguredAirbyteStream("id_and_name2", "public")
            .withSyncMode(SyncMode.INCREMENTAL)
    val catalog =
        ConfiguredAirbyteCatalog().withStreams(ImmutableList.of(includedStream, excludedStream))

    // Only "public.id_and_name" is handed over as a stream to scope to.
    val cdcStreamList: List<String> =
        catalog.streams
            .filter { configured -> configured.stream.name == "id_and_name" }
            .map { configured -> "${configured.stream.namespace}.${configured.stream.name}" }

    val actualWhitelist =
        RelationalDbDebeziumPropertiesManager.getTableIncludelist(catalog, cdcStreamList)
    val expectedWhitelist = "\\Qpublic.id_and_name\\E"

    Assertions.assertEquals(expectedWhitelist, actualWhitelist)
}

/**
 * Verifies that [RelationalDbDebeziumPropertiesManager.getColumnIncludeList] only emits column
 * patterns for streams whose `namespace.name` appears in the supplied stream-name list.
 *
 * Every stream in the catalog is INCREMENTAL, including `id_and_name2`, which is the one left
 * out of the stream-name list. That way the sync-mode filter cannot be what removes it: the
 * assertion only holds if the stream-name filter itself is applied.
 */
@Test
fun testColumnIncludelistFiltersStreamList() {
    val catalog =
        ConfiguredAirbyteCatalog()
            .withStreams(
                ImmutableList.of(
                    CatalogHelpers.createConfiguredAirbyteStream(
                            "id_and_name",
                            "public",
                            Field.of("fld1", JsonSchemaType.NUMBER),
                            Field.of("fld2", JsonSchemaType.STRING),
                        )
                        .withSyncMode(SyncMode.INCREMENTAL),
                    CatalogHelpers.createConfiguredAirbyteStream("id_,something", "public")
                        .withSyncMode(SyncMode.INCREMENTAL),
                    // INCREMENTAL on purpose: this stream must be excluded by the stream-name
                    // list, not by the sync-mode filter.
                    CatalogHelpers.createConfiguredAirbyteStream("id_and_name2", "public")
                        .withSyncMode(SyncMode.INCREMENTAL),
                    CatalogHelpers.createConfiguredAirbyteStream("n\"aMéS", "public")
                        .withSyncMode(SyncMode.INCREMENTAL),
                ),
            )

    val expectedWhitelist =
        "\\Qpublic.id_and_name\\E\\.(\\Qfld2\\E|\\Qfld1\\E),\\Qpublic.id_\\,something\\E,\\Qpublic.n\"aMéS\\E"

    // Scope to every stream except "id_and_name2".
    val cdcStreamList: List<String> =
        catalog.streams
            .filter { configured -> configured.stream.name != "id_and_name2" }
            .map { configured -> "${configured.stream.namespace}.${configured.stream.name}" }

    val actualWhitelist =
        RelationalDbDebeziumPropertiesManager.getColumnIncludeList(catalog, cdcStreamList)

    Assertions.assertEquals(expectedWhitelist, actualWhitelist)
}
Expand All @@ -101,14 +184,19 @@ internal class DebeziumRecordPublisherTest {
"id_and_name",
"public",
Field.of("fld1", JsonSchemaType.NUMBER),
Field.of("fld2", JsonSchemaType.STRING)
Field.of("fld2", JsonSchemaType.STRING),
)
.withSyncMode(SyncMode.INCREMENTAL)
)
.withSyncMode(SyncMode.INCREMENTAL),
),
)

val anchored =
"^" + RelationalDbDebeziumPropertiesManager.getColumnIncludeList(catalog) + "$"
"^" +
RelationalDbDebeziumPropertiesManager.getColumnIncludeList(
catalog,
getCdcStreamListFromCatalog(catalog)
) +
"$"
val pattern = Pattern.compile(anchored)

Assertions.assertTrue(pattern.matcher("public.id_and_name.fld1").find())
Expand All @@ -117,4 +205,14 @@ internal class DebeziumRecordPublisherTest {
Assertions.assertFalse(pattern.matcher("ppppublic.id_and_name.fld2333").find())
Assertions.assertFalse(pattern.matcher("public.id_and_name.fld_wrong_wrong").find())
}

/**
 * Builds the list of fully-qualified stream names (`namespace.name`) for every stream in
 * [catalog] whose sync mode is [SyncMode.INCREMENTAL].
 *
 * @param catalog the configured catalog to read streams from
 * @return one `namespace.name` entry per INCREMENTAL stream, in catalog order
 */
fun getCdcStreamListFromCatalog(catalog: ConfiguredAirbyteCatalog): List<String> =
    catalog.streams
        .filter { configured -> configured.syncMode == SyncMode.INCREMENTAL }
        .map { configured -> "${configured.stream.namespace}.${configured.stream.name}" }
}
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.36.3'
cdkVersionRequired = '0.38.0'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.4.8
dockerImageTag: 3.4.9
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -171,7 +172,8 @@ public Optional<MysqlDebeziumStateAttributes> savedOffset(final Properties baseP
}

final var offsetManager = AirbyteFileOffsetBackingStore.initializeState(cdcOffset, Optional.empty());
final DebeziumPropertiesManager debeziumPropertiesManager = new RelationalDbDebeziumPropertiesManager(baseProperties, config, catalog);
final DebeziumPropertiesManager debeziumPropertiesManager = new RelationalDbDebeziumPropertiesManager(baseProperties, config, catalog,
new ArrayList<String>());
final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties(offsetManager);
return parseSavedOffset(debeziumProperties);
}
Expand Down Expand Up @@ -247,7 +249,8 @@ public JsonNode constructInitialDebeziumState(final Properties properties,
final AirbyteSchemaHistoryStorage schemaHistoryStorage =
AirbyteSchemaHistoryStorage.initializeDBHistory(new SchemaHistory<>(Optional.empty(), false), COMPRESSION_ENABLED);
final LinkedBlockingQueue<ChangeEvent<String, String>> queue = new LinkedBlockingQueue<>();
final var debeziumPropertiesManager = new RelationalDbDebeziumPropertiesManager(properties, database.getSourceConfig(), catalog);
final var debeziumPropertiesManager =
new RelationalDbDebeziumPropertiesManager(properties, database.getSourceConfig(), catalog, new ArrayList<String>());

try (final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(debeziumPropertiesManager)) {
publisher.start(queue, offsetManager, Optional.of(schemaHistoryStorage));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,11 @@ public static List<AutoCloseableIterator<AirbyteMessage>> getCdcReadIterators(fi
firstRecordWaitTime,
AirbyteDebeziumHandler.QUEUE_CAPACITY,
false);
final var cdcStreamList = catalog.getStreams().stream()
.filter(stream -> stream.getSyncMode() == SyncMode.INCREMENTAL)
.map(stream -> stream.getStream().getNamespace() + "." + stream.getStream().getName()).toList();
final var propertiesManager = new RelationalDbDebeziumPropertiesManager(
MySqlCdcProperties.getDebeziumProperties(database), sourceConfig, catalog);
MySqlCdcProperties.getDebeziumProperties(database), sourceConfig, catalog, cdcStreamList);
final var eventConverter = new RelationalDbDebeziumEventConverter(metadataInjector, emittedAt);

final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorSupplier = () -> handler.getIncrementalIterators(
Expand Down
Loading

0 comments on commit fcc2940

Please sign in to comment.