
Commit

Destination Postgres: Adapt to kotlin CDK (#36620)
gisripa authored Apr 5, 2024
1 parent c5284c7 commit beda34b
Showing 14 changed files with 69 additions and 67 deletions.
17 changes: 9 additions & 8 deletions airbyte-cdk/java/airbyte-cdk/README.md
@@ -144,14 +144,15 @@ Maven and Gradle will automatically reference the correct (pinned) version of th

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
- | 0.29.3 | 2024-04-04 | [\#36759](https://github.com/airbytehq/airbyte/pull/36759) | Minor fixes. |
- | 0.29.3 | 2024-04-04 | [\#36706](https://github.com/airbytehq/airbyte/pull/36706) | Enabling spotbugs for s3-destination. |
- | 0.29.3 | 2024-04-03 | [\#36705](https://github.com/airbytehq/airbyte/pull/36705) | Enabling spotbugs for db-sources. |
- | 0.29.3 | 2024-04-03 | [\#36704](https://github.com/airbytehq/airbyte/pull/36704) | Enabling spotbugs for datastore-postgres. |
- | 0.29.3 | 2024-04-03 | [\#36703](https://github.com/airbytehq/airbyte/pull/36703) | Enabling spotbugs for gcs-destination. |
- | 0.29.3 | 2024-04-03 | [\#36702](https://github.com/airbytehq/airbyte/pull/36702) | Enabling spotbugs for db-destinations. |
- | 0.29.3 | 2024-04-03 | [\#36701](https://github.com/airbytehq/airbyte/pull/36701) | Enabling spotbugs for typing_and_deduping. |
- | 0.29.3 | 2024-04-03 | [\#36612](https://github.com/airbytehq/airbyte/pull/36612) | Enabling spotbugs for dependencies. |
+ | 0.29.5 | 2024-04-05 | [\#36620](https://github.com/airbytehq/airbyte/pull/36620) | Missed changes - open for extension for destination-postgres |
+ | 0.29.3 | 2024-04-04 | [\#36759](https://github.com/airbytehq/airbyte/pull/36759) | Minor fixes. |
+ | 0.29.3 | 2024-04-04 | [\#36706](https://github.com/airbytehq/airbyte/pull/36706) | Enabling spotbugs for s3-destination. |
+ | 0.29.3 | 2024-04-03 | [\#36705](https://github.com/airbytehq/airbyte/pull/36705) | Enabling spotbugs for db-sources. |
+ | 0.29.3 | 2024-04-03 | [\#36704](https://github.com/airbytehq/airbyte/pull/36704) | Enabling spotbugs for datastore-postgres. |
+ | 0.29.3 | 2024-04-03 | [\#36703](https://github.com/airbytehq/airbyte/pull/36703) | Enabling spotbugs for gcs-destination. |
+ | 0.29.3 | 2024-04-03 | [\#36702](https://github.com/airbytehq/airbyte/pull/36702) | Enabling spotbugs for db-destinations. |
+ | 0.29.3 | 2024-04-03 | [\#36701](https://github.com/airbytehq/airbyte/pull/36701) | Enabling spotbugs for typing_and_deduping. |
+ | 0.29.3 | 2024-04-03 | [\#36612](https://github.com/airbytehq/airbyte/pull/36612) | Enabling spotbugs for dependencies. |
| 0.29.2 | 2024-04-04 | [\#36845](https://github.com/airbytehq/airbyte/pull/36772) | Changes to make source-mongo compileable |
| 0.29.1 | 2024-04-03 | [\#36772](https://github.com/airbytehq/airbyte/pull/36772) | Changes to make source-mssql compileable |
| 0.29.0 | 2024-04-02 | [\#36759](https://github.com/airbytehq/airbyte/pull/36759) | Build artifact publication changes and fixes. |
@@ -1 +1 @@
- version=0.29.4
+ version=0.29.5
@@ -243,7 +243,7 @@ abstract class DestinationAcceptanceTest {
return supportsDbt != null && supportsDbt.asBoolean(false)
}

- protected val destinationDefinitionKey: String
+ protected open val destinationDefinitionKey: String
get() = imageNameWithoutTag

protected open fun getNormalizationIntegrationType(): String? {
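The one-word change above (`protected val` → `protected open val`) is the heart of the commit title: now that DestinationAcceptanceTest is part of the Kotlin CDK, its members are final by default, so anything a connector's test subclass needs to override must be declared `open`. A minimal Kotlin sketch of the pattern — the class bodies and names below are illustrative, not the actual CDK code:

```kotlin
// Base class on the CDK side: Kotlin members are final unless marked `open`.
abstract class BaseDestinationAcceptanceTest {
    protected abstract val imageNameWithoutTag: String

    // Without `open`, a subclass `override` of this property would not compile.
    protected open val destinationDefinitionKey: String
        get() = imageNameWithoutTag
}

// Hypothetical connector-side subclass; legal only because the base property is `open`.
class PostgresStrictEncryptAcceptanceTest : BaseDestinationAcceptanceTest() {
    override val imageNameWithoutTag = "airbyte/destination-postgres-strict-encrypt"

    override val destinationDefinitionKey: String
        get() = "airbyte/destination-postgres"
}
```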
@@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
- cdkVersionRequired = '0.27.3'
+ cdkVersionRequired = '0.29.5'
features = ['db-destinations', 'typing-deduping', 'datastore-postgres']
useLocalCdk = false
}
@@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
- dockerImageTag: 2.0.5
+ dockerImageTag: 2.0.6
dockerRepository: airbyte/destination-postgres-strict-encrypt
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
@@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
- cdkVersionRequired = '0.27.3'
+ cdkVersionRequired = '0.29.5'
features = ['db-destinations', 'datastore-postgres', 'typing-deduping']
useLocalCdk = false
}
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
- dockerImageTag: 2.0.5
+ dockerImageTag: 2.0.6
dockerRepository: airbyte/destination-postgres
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
@@ -8,7 +8,6 @@
import static io.airbyte.cdk.integrations.util.PostgresSslConnectionUtils.PARAM_MODE;
import static io.airbyte.cdk.integrations.util.PostgresSslConnectionUtils.PARAM_SSL;
import static io.airbyte.cdk.integrations.util.PostgresSslConnectionUtils.PARAM_SSL_MODE;
- import static io.airbyte.cdk.integrations.util.PostgresSslConnectionUtils.obtainConnectionOptions;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
@@ -24,6 +23,7 @@
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
+ import io.airbyte.cdk.integrations.util.PostgresSslConnectionUtils;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
@@ -92,7 +92,7 @@ protected Map<String, String> getDefaultConnectionProperties(final JsonNode conf
if (DISABLE.equals(config.get(PARAM_SSL_MODE).get(PARAM_MODE).asText())) {
additionalParameters.put("sslmode", DISABLE);
} else {
- additionalParameters.putAll(obtainConnectionOptions(config.get(PARAM_SSL_MODE)));
+ additionalParameters.putAll(PostgresSslConnectionUtils.obtainConnectionOptions(config.get(PARAM_SSL_MODE)));
}
} else {
additionalParameters.put(JdbcUtils.SSL_KEY, "true");
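The import shuffle above — dropping the static import of `obtainConnectionOptions` and calling it through `PostgresSslConnectionUtils` instead — is a typical seam when a Java utility class is rewritten in Kotlin: unless the member is exposed with `@JvmStatic`, Java callers cannot static-import it and must qualify it with the owning class. A hedged Kotlin sketch of how such a helper surfaces to Java; the class below is an illustrative stand-in, not the actual CDK source:

```kotlin
import com.fasterxml.jackson.databind.JsonNode

// Illustrative stand-in for a CDK helper rewritten as a Kotlin object.
object SslConnectionUtils {
    // Without @JvmStatic, Java reaches this only as SslConnectionUtils.INSTANCE.obtainConnectionOptions(cfg).
    // With @JvmStatic, Java can also use the class-qualified form SslConnectionUtils.obtainConnectionOptions(cfg),
    // which matches the call style adopted in the diff above.
    @JvmStatic
    fun obtainConnectionOptions(encryption: JsonNode): Map<String, String> =
        mapOf("sslmode" to encryption.get("mode").asText())
}
```

Even where `@JvmStatic` would keep the old static import compiling, qualifying the call makes the Kotlin ownership explicit, which appears to be the choice taken here.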
@@ -8,7 +8,7 @@

import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag;
- import io.airbyte.cdk.integrations.destination.async.partial_messages.PartialAirbyteMessage;
+ import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations;
import java.io.BufferedReader;
import java.io.File;
@@ -71,11 +71,11 @@ public StreamId buildStreamId(final String namespace, final String name, final S
// To keep it consistent when querying raw table in T+D query, convert it to lowercase.
// TODO: This logic should be unified across Raw and final table operations in a single class
// operating on a StreamId.
- final String streamName = namingTransformer.convertStreamName(StreamId.concatenateRawTableName(namespace, name)).toLowerCase();
+ final String streamName = getNamingTransformer().convertStreamName(StreamId.concatenateRawTableName(namespace, name)).toLowerCase();
return new StreamId(
- namingTransformer.getNamespace(namespace),
- namingTransformer.convertStreamName(name),
- namingTransformer.getNamespace(rawNamespaceOverride).toLowerCase(),
+ getNamingTransformer().getNamespace(namespace),
+ getNamingTransformer().convertStreamName(name),
+ getNamingTransformer().getNamespace(rawNamespaceOverride).toLowerCase(),
streamName.length() > 63 ? streamName.substring(0, 63) : streamName,
namespace,
name);
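Most of the churn in this SQL generator follows one mechanical rule: values that Java used to read as record-style accessors or inherited fields (`stream.id()`, `pk.name()`, `namingTransformer`) are now Kotlin properties in the CDK, and Java only sees a property through its generated getter (`stream.getId()`, `pk.getName()`, `getNamingTransformer()`). A hedged Kotlin sketch of why the call sites change — the types below are simplified stand-ins, not the actual CDK declarations:

```kotlin
// Simplified stand-ins for CDK types that moved from Java records/fields to Kotlin properties.
data class StreamId(val finalNamespace: String, val finalName: String)

data class ColumnId(val name: String, val originalName: String)

interface NamingConventionTransformer {
    fun getNamespace(namespace: String): String
    fun convertStreamName(name: String): String
}

abstract class JdbcSqlGenerator(
    // A constructor property: Kotlin callers write `namingTransformer`,
    // Java subclasses must call getNamingTransformer().
    protected val namingTransformer: NamingConventionTransformer,
)

// From Kotlin: streamId.finalName
// From Java:   streamId.getFinalName()  -- there is no finalName() accessor,
// which is why stream.id().finalName() becomes stream.getId().getFinalName() in this file.
```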
@@ -117,22 +117,22 @@ public DataType<?> toDialectType(AirbyteProtocolType airbyteProtocolType) {
@Override
public Sql createTable(final StreamConfig stream, final String suffix, final boolean force) {
final List<Sql> statements = new ArrayList<>();
- final Name finalTableName = name(stream.id().finalNamespace(), stream.id().finalName() + suffix);
+ final Name finalTableName = name(stream.getId().getFinalNamespace(), stream.getId().getFinalName() + suffix);

statements.add(super.createTable(stream, suffix, force));

- if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) {
+ if (stream.getDestinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) {
// An index for our ROW_NUMBER() PARTITION BY pk ORDER BY cursor, extracted_at function
- final List<Name> pkNames = stream.primaryKey().stream()
-     .map(pk -> quotedName(pk.name()))
+ final List<Name> pkNames = stream.getPrimaryKey().stream()
+     .map(pk -> quotedName(pk.getName()))
.toList();
statements.add(Sql.of(getDslContext().createIndex().on(
finalTableName,
Stream.of(
pkNames.stream(),
// if cursor is present, then a stream containing its name
// but if no cursor, then empty stream
- stream.cursor().stream().map(cursor -> quotedName(cursor.name())),
+ stream.getCursor().stream().map(cursor -> quotedName(cursor.getName())),
Stream.of(name(COLUMN_NAME_AB_EXTRACTED_AT))).flatMap(Function.identity()).toList())
.getSQL()));
}
@@ -151,12 +151,12 @@ public Sql createTable(final StreamConfig stream, final String suffix, final boo

@Override
protected List<String> createIndexSql(final StreamConfig stream, final String suffix) {
- if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP && !stream.primaryKey().isEmpty()) {
+ if (stream.getDestinationSyncMode() == DestinationSyncMode.APPEND_DEDUP && !stream.getPrimaryKey().isEmpty()) {
return List.of(
getDslContext().createIndex().on(
- name(stream.id().finalNamespace(), stream.id().finalName() + suffix),
- stream.primaryKey().stream()
-     .map(pk -> quotedName(pk.name()))
+ name(stream.getId().getFinalNamespace(), stream.getId().getFinalName() + suffix),
+ stream.getPrimaryKey().stream()
+     .map(pk -> quotedName(pk.getName()))
.toList())
.getSQL());
} else {
@@ -172,7 +172,7 @@ protected List<Field<?>> extractRawDataFields(final LinkedHashMap<ColumnId, Airb
.map(column -> castedField(
extractColumnAsJson(column.getKey()),
column.getValue(),
- column.getKey().name(),
+ column.getKey().getName(),
useExpensiveSaferCasting))
.collect(Collectors.toList());
}
@@ -269,11 +269,11 @@ private Field<Object> toCastingErrorCaseStmt(final ColumnId column, final Airbyt
return switch (type) {
case Struct ignored -> field(CASE_STATEMENT_SQL_TEMPLATE,
extract.isNotNull().and(jsonTypeof(extract).notIn("object", "null")),
- nulledChangeObject(column.originalName()),
+ nulledChangeObject(column.getOriginalName()),
cast(val((Object) null), JSONB_TYPE));
case Array ignored -> field(CASE_STATEMENT_SQL_TEMPLATE,
extract.isNotNull().and(jsonTypeof(extract).notIn("array", "null")),
- nulledChangeObject(column.originalName()),
+ nulledChangeObject(column.getOriginalName()),
cast(val((Object) null), JSONB_TYPE));
// Unknown types require no casting, so there's never an error.
// Similarly, everything can cast to string without error.
@@ -284,28 +284,28 @@ private Field<Object> toCastingErrorCaseStmt(final ColumnId column, final Airbyt
extract.isNotNull()
.and(jsonTypeof(extract).ne("null"))
.and(castedField(extract, type, true).isNull()),
- nulledChangeObject(column.originalName()),
+ nulledChangeObject(column.getOriginalName()),
cast(val((Object) null), JSONB_TYPE));
};
}

@Override
protected Condition cdcDeletedAtNotNullCondition() {
return field(name(COLUMN_NAME_AB_LOADED_AT)).isNotNull()
- .and(jsonTypeof(extractColumnAsJson(cdcDeletedAtColumn)).ne("null"));
+ .and(jsonTypeof(extractColumnAsJson(getCdcDeletedAtColumn())).ne("null"));
}

@Override
protected Field<Integer> getRowNumber(final List<ColumnId> primaryKeys, final Optional<ColumnId> cursor) {
// literally identical to redshift's getRowNumber implementation, changes here probably should
// be reflected there
final List<Field<?>> primaryKeyFields =
- primaryKeys != null ? primaryKeys.stream().map(columnId -> field(quotedName(columnId.name()))).collect(Collectors.toList())
+ primaryKeys != null ? primaryKeys.stream().map(columnId -> field(quotedName(columnId.getName()))).collect(Collectors.toList())
: new ArrayList<>();
final List<Field<?>> orderedFields = new ArrayList<>();
// We can still use Jooq's field to get the quoted name with raw sql templating.
// jooq's .desc returns SortField<?> instead of Field<?> and NULLS LAST doesn't work with it
- cursor.ifPresent(columnId -> orderedFields.add(field("{0} desc NULLS LAST", field(quotedName(columnId.name())))));
+ cursor.ifPresent(columnId -> orderedFields.add(field("{0} desc NULLS LAST", field(quotedName(columnId.getName())))));
orderedFields.add(field("{0} desc", quotedName(COLUMN_NAME_AB_EXTRACTED_AT)));
return rowNumber()
.over()
@@ -317,7 +317,7 @@ protected Field<Integer> getRowNumber(final List<ColumnId> primaryKeys, final Op
* Extract a raw field, leaving it as jsonb
*/
private Field<Object> extractColumnAsJson(final ColumnId column) {
return field("{0} -> {1}", name(COLUMN_NAME_DATA), val(column.originalName()));
return field("{0} -> {1}", name(COLUMN_NAME_DATA), val(column.getOriginalName()));
}

private Field<String> jsonTypeof(final Field<?> field) {
@@ -76,7 +76,7 @@ protected JdbcSqlGenerator getSqlGenerator() {

@Override
protected DestinationHandler<PostgresState> getDestinationHandler() {
- return new PostgresDestinationHandler(databaseName, database, namespace);
+ return new PostgresDestinationHandler(databaseName, database, getNamespace());
}

@Override
@@ -92,10 +92,10 @@ protected Field<?> toJsonValue(final String valueAsString) {
@Test
@Override
public void testCreateTableIncremental() throws Exception {
- final Sql sql = generator.createTable(incrementalDedupStream, "", false);
- destinationHandler.execute(sql);
+ final Sql sql = getGenerator().createTable(getIncrementalDedupStream(), "", false);
+ getDestinationHandler().execute(sql);

- List<DestinationInitialStatus<PostgresState>> initialStatuses = destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
+ List<DestinationInitialStatus<PostgresState>> initialStatuses = getDestinationHandler().gatherInitialState(List.of(getIncrementalDedupStream()));
assertEquals(1, initialStatuses.size());
final DestinationInitialStatus<PostgresState> initialStatus = initialStatuses.getFirst();
assertTrue(initialStatus.isFinalTablePresent());
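The same getter rule reaches the test scaffolding: state such as `generator`, `destinationHandler`, `namespace` and `incrementalDedupStream` that the Java test previously touched as inherited protected fields now lives as Kotlin properties on the abstract base test, so the Java subclass has to go through `getGenerator()`, `getDestinationHandler()` and so on. A small hedged sketch of the base-class side — names simplified, not the actual CDK test class:

```kotlin
// Simplified stand-in for a Kotlin abstract base test consumed by Java subclasses.
abstract class BaseSqlGeneratorIntegrationTest<State> {
    // Java subclasses see getNamespace()/getGenerator()/getIncrementalDedupStream(),
    // never bare field access like `namespace` or `generator`.
    protected lateinit var namespace: String
    protected lateinit var generator: JdbcSqlGenerator
    protected lateinit var incrementalDedupStream: StreamConfig

    protected abstract val destinationHandler: DestinationHandler<State>
}

// Placeholder types so the sketch is self-contained.
interface JdbcSqlGenerator
interface DestinationHandler<T>
class StreamConfig
```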
@@ -190,9 +190,9 @@ public PostgresConfigBuilder withCdcReplication(String LsnCommitBehaviour) {
.with("is_test", true)
.with("replication_method", Jsons.jsonNode(ImmutableMap.builder()
.put("method", "CDC")
.put("replication_slot", testDatabase.getReplicationSlotName())
.put("publication", testDatabase.getPublicationName())
.put("initial_waiting_seconds", DEFAULT_CDC_REPLICATION_INITIAL_WAIT.getSeconds())
.put("replication_slot", getTestDatabase().getReplicationSlotName())
.put("publication", getTestDatabase().getPublicationName())
.put("initial_waiting_seconds", ConfigBuilder.DEFAULT_CDC_REPLICATION_INITIAL_WAIT.getSeconds())
.put("lsn_commit_behaviour", LsnCommitBehaviour)
.build()));
}
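The last visible hunk shows the remaining flavour of the Kotlin/Java seam: a constant like `DEFAULT_CDC_REPLICATION_INITIAL_WAIT`, which previously resolved as an imported or inherited static field, now has to be qualified with the class that owns it, `ConfigBuilder`. In Kotlin such constants usually sit in a companion object; a hedged sketch of the declaration side and what Java then sees (illustrative only, not the actual CDK source):

```kotlin
import java.time.Duration

// Illustrative stand-in for a builder whose constants moved into a Kotlin companion object.
open class ConfigBuilder {
    companion object {
        // @JvmField exposes this to Java as a plain static field:
        //   ConfigBuilder.DEFAULT_CDC_REPLICATION_INITIAL_WAIT
        // Without it, Java would need ConfigBuilder.Companion.getDEFAULT_CDC_REPLICATION_INITIAL_WAIT().
        @JvmField
        val DEFAULT_CDC_REPLICATION_INITIAL_WAIT: Duration = Duration.ofSeconds(300) // placeholder value
    }
}
```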
