diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index f2472b5a0540..95978158eb83 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -144,14 +144,15 @@ Maven and Gradle will automatically reference the correct (pinned) version of th | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 0.29.3 | 2024-04-04 | [\#36759](https://github.com/airbytehq/airbyte/pull/36759) | Minor fixes. | -| 0.29.3 | 2024-04-04 | [\#36706](https://github.com/airbytehq/airbyte/pull/36706) | Enabling spotbugs for s3-destination. | -| 0.29.3 | 2024-04-03 | [\#36705](https://github.com/airbytehq/airbyte/pull/36705) | Enabling spotbugs for db-sources. | -| 0.29.3 | 2024-04-03 | [\#36704](https://github.com/airbytehq/airbyte/pull/36704) | Enabling spotbugs for datastore-postgres. | -| 0.29.3 | 2024-04-03 | [\#36703](https://github.com/airbytehq/airbyte/pull/36703) | Enabling spotbugs for gcs-destination. | -| 0.29.3 | 2024-04-03 | [\#36702](https://github.com/airbytehq/airbyte/pull/36702) | Enabling spotbugs for db-destinations. | -| 0.29.3 | 2024-04-03 | [\#36701](https://github.com/airbytehq/airbyte/pull/36701) | Enabling spotbugs for typing_and_deduping. | -| 0.29.3 | 2024-04-03 | [\#36612](https://github.com/airbytehq/airbyte/pull/36612) | Enabling spotbugs for dependencies. | +| 0.29.5 | 2024-04-05 | [\#36620](https://github.com/airbytehq/airbyte/pull/36620) | Missed changes - open for extension for destination-postgres | +| 0.29.3 | 2024-04-04 | [\#36759](https://github.com/airbytehq/airbyte/pull/36759) | Minor fixes. | +| 0.29.3 | 2024-04-04 | [\#36706](https://github.com/airbytehq/airbyte/pull/36706) | Enabling spotbugs for s3-destination. | +| 0.29.3 | 2024-04-03 | [\#36705](https://github.com/airbytehq/airbyte/pull/36705) | Enabling spotbugs for db-sources. | +| 0.29.3 | 2024-04-03 | [\#36704](https://github.com/airbytehq/airbyte/pull/36704) | Enabling spotbugs for datastore-postgres. | +| 0.29.3 | 2024-04-03 | [\#36703](https://github.com/airbytehq/airbyte/pull/36703) | Enabling spotbugs for gcs-destination. | +| 0.29.3 | 2024-04-03 | [\#36702](https://github.com/airbytehq/airbyte/pull/36702) | Enabling spotbugs for db-destinations. | +| 0.29.3 | 2024-04-03 | [\#36701](https://github.com/airbytehq/airbyte/pull/36701) | Enabling spotbugs for typing_and_deduping. | +| 0.29.3 | 2024-04-03 | [\#36612](https://github.com/airbytehq/airbyte/pull/36612) | Enabling spotbugs for dependencies. | | 0.29.2 | 2024-04-04 | [\#36845](https://github.com/airbytehq/airbyte/pull/36772) | Changes to make source-mongo compileable | | 0.29.1 | 2024-04-03 | [\#36772](https://github.com/airbytehq/airbyte/pull/36772) | Changes to make source-mssql compileable | | 0.29.0 | 2024-04-02 | [\#36759](https://github.com/airbytehq/airbyte/pull/36759) | Build artifact publication changes and fixes. | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index c2a0c99b1b98..b442b07c2b31 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.29.4 +version=0.29.5 diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt index cd94fd11bb36..a852a456f78f 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt @@ -243,7 +243,7 @@ abstract class DestinationAcceptanceTest { return supportsDbt != null && supportsDbt.asBoolean(false) } - protected val destinationDefinitionKey: String + protected open val destinationDefinitionKey: String get() = imageNameWithoutTag protected open fun getNormalizationIntegrationType(): String? { diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/build.gradle b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/build.gradle index cd273c798e5d..4d7da11de69f 100644 --- a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/build.gradle +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/build.gradle @@ -3,7 +3,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.27.3' + cdkVersionRequired = '0.29.5' features = ['db-destinations', 'typing-deduping', 'datastore-postgres'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml index ece21b7bd519..b46a515cebbb 100644 --- a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 25c5221d-dce2-4163-ade9-739ef790f503 - dockerImageTag: 2.0.5 + dockerImageTag: 2.0.6 dockerRepository: airbyte/destination-postgres-strict-encrypt documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres githubIssueLabel: destination-postgres diff --git a/airbyte-integrations/connectors/destination-postgres/build.gradle b/airbyte-integrations/connectors/destination-postgres/build.gradle index f3211286fe97..83df5f2636c4 100644 --- a/airbyte-integrations/connectors/destination-postgres/build.gradle +++ b/airbyte-integrations/connectors/destination-postgres/build.gradle @@ -3,7 +3,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.27.3' + cdkVersionRequired = '0.29.5' features = ['db-destinations', 'datastore-postgres', 'typing-deduping'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/destination-postgres/metadata.yaml b/airbyte-integrations/connectors/destination-postgres/metadata.yaml index 5f16c4ce0d13..1e1abf1b094a 100644 --- a/airbyte-integrations/connectors/destination-postgres/metadata.yaml +++ b/airbyte-integrations/connectors/destination-postgres/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 25c5221d-dce2-4163-ade9-739ef790f503 - dockerImageTag: 2.0.5 + dockerImageTag: 2.0.6 dockerRepository: airbyte/destination-postgres documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres githubIssueLabel: destination-postgres diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java index deb6138cd34d..749115fa261a 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java +++ b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java @@ -8,7 +8,6 @@ import static io.airbyte.cdk.integrations.util.PostgresSslConnectionUtils.PARAM_MODE; import static io.airbyte.cdk.integrations.util.PostgresSslConnectionUtils.PARAM_SSL; import static io.airbyte.cdk.integrations.util.PostgresSslConnectionUtils.PARAM_SSL_MODE; -import static io.airbyte.cdk.integrations.util.PostgresSslConnectionUtils.obtainConnectionOptions; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; @@ -24,6 +23,7 @@ import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination; import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler; import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator; +import io.airbyte.cdk.integrations.util.PostgresSslConnectionUtils; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog; @@ -92,7 +92,7 @@ protected Map getDefaultConnectionProperties(final JsonNode conf if (DISABLE.equals(config.get(PARAM_SSL_MODE).get(PARAM_MODE).asText())) { additionalParameters.put("sslmode", DISABLE); } else { - additionalParameters.putAll(obtainConnectionOptions(config.get(PARAM_SSL_MODE))); + additionalParameters.putAll(PostgresSslConnectionUtils.obtainConnectionOptions(config.get(PARAM_SSL_MODE))); } } else { additionalParameters.put(JdbcUtils.SSL_KEY, "true"); diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.java b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.java index a179679fc3f4..933e9af80e8d 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.java +++ b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.java @@ -8,7 +8,7 @@ import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag; -import io.airbyte.cdk.integrations.destination.async.partial_messages.PartialAirbyteMessage; +import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage; import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations; import java.io.BufferedReader; import java.io.File; diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java index e4a2bc73c3de..8b803943e126 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java +++ b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java @@ -71,11 +71,11 @@ public StreamId buildStreamId(final String namespace, final String name, final S // To keep it consistent when querying raw table in T+D query, convert it to lowercase. // TODO: This logic should be unified across Raw and final table operations in a single class // operating on a StreamId. - final String streamName = namingTransformer.convertStreamName(StreamId.concatenateRawTableName(namespace, name)).toLowerCase(); + final String streamName = getNamingTransformer().convertStreamName(StreamId.concatenateRawTableName(namespace, name)).toLowerCase(); return new StreamId( - namingTransformer.getNamespace(namespace), - namingTransformer.convertStreamName(name), - namingTransformer.getNamespace(rawNamespaceOverride).toLowerCase(), + getNamingTransformer().getNamespace(namespace), + getNamingTransformer().convertStreamName(name), + getNamingTransformer().getNamespace(rawNamespaceOverride).toLowerCase(), streamName.length() > 63 ? streamName.substring(0, 63) : streamName, namespace, name); @@ -117,14 +117,14 @@ public DataType toDialectType(AirbyteProtocolType airbyteProtocolType) { @Override public Sql createTable(final StreamConfig stream, final String suffix, final boolean force) { final List statements = new ArrayList<>(); - final Name finalTableName = name(stream.id().finalNamespace(), stream.id().finalName() + suffix); + final Name finalTableName = name(stream.getId().getFinalNamespace(), stream.getId().getFinalName() + suffix); statements.add(super.createTable(stream, suffix, force)); - if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) { + if (stream.getDestinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) { // An index for our ROW_NUMBER() PARTITION BY pk ORDER BY cursor, extracted_at function - final List pkNames = stream.primaryKey().stream() - .map(pk -> quotedName(pk.name())) + final List pkNames = stream.getPrimaryKey().stream() + .map(pk -> quotedName(pk.getName())) .toList(); statements.add(Sql.of(getDslContext().createIndex().on( finalTableName, @@ -132,7 +132,7 @@ public Sql createTable(final StreamConfig stream, final String suffix, final boo pkNames.stream(), // if cursor is present, then a stream containing its name // but if no cursor, then empty stream - stream.cursor().stream().map(cursor -> quotedName(cursor.name())), + stream.getCursor().stream().map(cursor -> quotedName(cursor.getName())), Stream.of(name(COLUMN_NAME_AB_EXTRACTED_AT))).flatMap(Function.identity()).toList()) .getSQL())); } @@ -151,12 +151,12 @@ public Sql createTable(final StreamConfig stream, final String suffix, final boo @Override protected List createIndexSql(final StreamConfig stream, final String suffix) { - if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP && !stream.primaryKey().isEmpty()) { + if (stream.getDestinationSyncMode() == DestinationSyncMode.APPEND_DEDUP && !stream.getPrimaryKey().isEmpty()) { return List.of( getDslContext().createIndex().on( - name(stream.id().finalNamespace(), stream.id().finalName() + suffix), - stream.primaryKey().stream() - .map(pk -> quotedName(pk.name())) + name(stream.getId().getFinalNamespace(), stream.getId().getFinalName() + suffix), + stream.getPrimaryKey().stream() + .map(pk -> quotedName(pk.getName())) .toList()) .getSQL()); } else { @@ -172,7 +172,7 @@ protected List> extractRawDataFields(final LinkedHashMap castedField( extractColumnAsJson(column.getKey()), column.getValue(), - column.getKey().name(), + column.getKey().getName(), useExpensiveSaferCasting)) .collect(Collectors.toList()); } @@ -269,11 +269,11 @@ private Field toCastingErrorCaseStmt(final ColumnId column, final Airbyt return switch (type) { case Struct ignored -> field(CASE_STATEMENT_SQL_TEMPLATE, extract.isNotNull().and(jsonTypeof(extract).notIn("object", "null")), - nulledChangeObject(column.originalName()), + nulledChangeObject(column.getOriginalName()), cast(val((Object) null), JSONB_TYPE)); case Array ignored -> field(CASE_STATEMENT_SQL_TEMPLATE, extract.isNotNull().and(jsonTypeof(extract).notIn("array", "null")), - nulledChangeObject(column.originalName()), + nulledChangeObject(column.getOriginalName()), cast(val((Object) null), JSONB_TYPE)); // Unknown types require no casting, so there's never an error. // Similarly, everything can cast to string without error. @@ -284,7 +284,7 @@ private Field toCastingErrorCaseStmt(final ColumnId column, final Airbyt extract.isNotNull() .and(jsonTypeof(extract).ne("null")) .and(castedField(extract, type, true).isNull()), - nulledChangeObject(column.originalName()), + nulledChangeObject(column.getOriginalName()), cast(val((Object) null), JSONB_TYPE)); }; } @@ -292,7 +292,7 @@ private Field toCastingErrorCaseStmt(final ColumnId column, final Airbyt @Override protected Condition cdcDeletedAtNotNullCondition() { return field(name(COLUMN_NAME_AB_LOADED_AT)).isNotNull() - .and(jsonTypeof(extractColumnAsJson(cdcDeletedAtColumn)).ne("null")); + .and(jsonTypeof(extractColumnAsJson(getCdcDeletedAtColumn())).ne("null")); } @Override @@ -300,12 +300,12 @@ protected Field getRowNumber(final List primaryKeys, final Op // literally identical to redshift's getRowNumber implementation, changes here probably should // be reflected there final List> primaryKeyFields = - primaryKeys != null ? primaryKeys.stream().map(columnId -> field(quotedName(columnId.name()))).collect(Collectors.toList()) + primaryKeys != null ? primaryKeys.stream().map(columnId -> field(quotedName(columnId.getName()))).collect(Collectors.toList()) : new ArrayList<>(); final List> orderedFields = new ArrayList<>(); // We can still use Jooq's field to get the quoted name with raw sql templating. // jooq's .desc returns SortField instead of Field and NULLS LAST doesn't work with it - cursor.ifPresent(columnId -> orderedFields.add(field("{0} desc NULLS LAST", field(quotedName(columnId.name()))))); + cursor.ifPresent(columnId -> orderedFields.add(field("{0} desc NULLS LAST", field(quotedName(columnId.getName()))))); orderedFields.add(field("{0} desc", quotedName(COLUMN_NAME_AB_EXTRACTED_AT))); return rowNumber() .over() @@ -317,7 +317,7 @@ protected Field getRowNumber(final List primaryKeys, final Op * Extract a raw field, leaving it as jsonb */ private Field extractColumnAsJson(final ColumnId column) { - return field("{0} -> {1}", name(COLUMN_NAME_DATA), val(column.originalName())); + return field("{0} -> {1}", name(COLUMN_NAME_DATA), val(column.getOriginalName())); } private Field jsonTypeof(final Field field) { diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGeneratorIntegrationTest.java index 23cbda8a5b05..e46abc05ecf3 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGeneratorIntegrationTest.java @@ -76,7 +76,7 @@ protected JdbcSqlGenerator getSqlGenerator() { @Override protected DestinationHandler getDestinationHandler() { - return new PostgresDestinationHandler(databaseName, database, namespace); + return new PostgresDestinationHandler(databaseName, database, getNamespace()); } @Override @@ -92,10 +92,10 @@ protected Field toJsonValue(final String valueAsString) { @Test @Override public void testCreateTableIncremental() throws Exception { - final Sql sql = generator.createTable(incrementalDedupStream, "", false); - destinationHandler.execute(sql); + final Sql sql = getGenerator().createTable(getIncrementalDedupStream(), "", false); + getDestinationHandler().execute(sql); - List> initialStatuses = destinationHandler.gatherInitialState(List.of(incrementalDedupStream)); + List> initialStatuses = getDestinationHandler().gatherInitialState(List.of(getIncrementalDedupStream())); assertEquals(1, initialStatuses.size()); final DestinationInitialStatus initialStatus = initialStatuses.getFirst(); assertTrue(initialStatus.isFinalTablePresent()); diff --git a/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/PostgresTestDatabase.java b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/PostgresTestDatabase.java index 6849af062f50..afc0ea8d59c6 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/PostgresTestDatabase.java +++ b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/PostgresTestDatabase.java @@ -190,9 +190,9 @@ public PostgresConfigBuilder withCdcReplication(String LsnCommitBehaviour) { .with("is_test", true) .with("replication_method", Jsons.jsonNode(ImmutableMap.builder() .put("method", "CDC") - .put("replication_slot", testDatabase.getReplicationSlotName()) - .put("publication", testDatabase.getPublicationName()) - .put("initial_waiting_seconds", DEFAULT_CDC_REPLICATION_INITIAL_WAIT.getSeconds()) + .put("replication_slot", getTestDatabase().getReplicationSlotName()) + .put("publication", getTestDatabase().getPublicationName()) + .put("initial_waiting_seconds", ConfigBuilder.DEFAULT_CDC_REPLICATION_INITIAL_WAIT.getSeconds()) .put("lsn_commit_behaviour", LsnCommitBehaviour) .build())); } diff --git a/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/AbstractPostgresTypingDedupingTest.java b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/AbstractPostgresTypingDedupingTest.java index 26a658e310f0..c460a1d337a2 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/AbstractPostgresTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/AbstractPostgresTypingDedupingTest.java @@ -57,15 +57,15 @@ protected JdbcCompatibleSourceOperations getSourceOperations() { @Test public void testMixedCasedSchema() throws Exception { - streamName = "MixedCaseSchema" + streamName; + setStreamName("MixedCaseSchema" + getStreamName()); final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( new ConfiguredAirbyteStream() .withSyncMode(SyncMode.FULL_REFRESH) .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) .withStream(new AirbyteStream() - .withNamespace(streamNamespace) - .withName(streamName) - .withJsonSchema(SCHEMA)))); + .withNamespace(getStreamNamespace()) + .withName(getStreamName()) + .withJsonSchema(getSchema())))); // First sync final List messages1 = readMessages("dat/sync1_messages.jsonl"); @@ -79,23 +79,23 @@ public void testMixedCasedSchema() throws Exception { @Test public void testMixedCaseRawTableV1V2Migration() throws Exception { - streamName = "Mixed Case Table" + streamName; + setStreamName("Mixed Case Table" + getStreamName()); final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( new ConfiguredAirbyteStream() .withSyncMode(SyncMode.FULL_REFRESH) .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream(new AirbyteStream() - .withNamespace(streamNamespace) - .withName(streamName) - .withJsonSchema(SCHEMA)))); + .withNamespace(getStreamNamespace()) + .withName(getStreamName()) + .withJsonSchema(getSchema())))); // First sync final List messages1 = readMessages("dat/sync1_messages.jsonl"); runSync(catalog, messages1, "airbyte/destination-postgres:0.6.3"); // Special case to retrieve raw records pre DV2 using the same logic as actual code. - final List rawActualRecords = database.queryJsons( - DSL.selectFrom(DSL.name(streamNamespace, "_airbyte_raw_" + Names.toAlphanumericAndUnderscore(streamName).toLowerCase())).getSQL()); + final List rawActualRecords = getDatabase().queryJsons( + DSL.selectFrom(DSL.name(getStreamNamespace(), "_airbyte_raw_" + Names.toAlphanumericAndUnderscore(getStreamName()).toLowerCase())).getSQL()); // Just verify the size of raw pre DV2, postgres was lower casing the MixedCaseSchema so above // retrieval should give 5 records from sync1 assertEquals(5, rawActualRecords.size()); @@ -113,9 +113,9 @@ public void testRawTableMetaMigration_append() throws Exception { .withSyncMode(SyncMode.FULL_REFRESH) .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream(new AirbyteStream() - .withNamespace(streamNamespace) - .withName(streamName) - .withJsonSchema(SCHEMA)))); + .withNamespace(getStreamNamespace()) + .withName(getStreamName()) + .withJsonSchema(getSchema())))); // First sync without _airbyte_meta final List messages1 = readMessages("dat/sync1_messages.jsonl"); @@ -138,9 +138,9 @@ public void testRawTableMetaMigration_incrementalDedupe() throws Exception { .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP) .withPrimaryKey(List.of(List.of("id1"), List.of("id2"))) .withStream(new AirbyteStream() - .withNamespace(streamNamespace) - .withName(streamName) - .withJsonSchema(SCHEMA)))); + .withNamespace(getStreamNamespace()) + .withName(getStreamName()) + .withJsonSchema(getSchema())))); // First sync without _airbyte_meta final List messages1 = readMessages("dat/sync1_messages.jsonl"); @@ -166,9 +166,9 @@ public void testVarcharLimitOver64K() throws Exception { .withSyncMode(SyncMode.FULL_REFRESH) .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) .withStream(new AirbyteStream() - .withNamespace(streamNamespace) - .withName(streamName) - .withJsonSchema(SCHEMA)))); + .withNamespace(getStreamNamespace()) + .withName(getStreamName()) + .withJsonSchema(getSchema())))); final AirbyteMessage message = new AirbyteMessage(); final String largeString = generateBigString(); @@ -179,8 +179,8 @@ public void testVarcharLimitOver64K() throws Exception { "name", largeString); message.setType(Type.RECORD); message.setRecord(new AirbyteRecordMessage() - .withNamespace(streamNamespace) - .withStream(streamName) + .withNamespace(getStreamNamespace()) + .withStream(getStreamName()) .withData(Jsons.jsonNode(data)) .withEmittedAt(1000L)); final List messages1 = new ArrayList<>(); @@ -189,7 +189,7 @@ public void testVarcharLimitOver64K() throws Exception { // Only assert on the large varchar string landing in final table. // Rest of the fields' correctness is tested by other means in other tests. - final List actualFinalRecords = dumpFinalTableRecords(streamNamespace, streamName); + final List actualFinalRecords = dumpFinalTableRecords(getStreamNamespace(), getStreamName()); assertEquals(1, actualFinalRecords.size()); assertEquals(largeString, actualFinalRecords.get(0).get("name").asText()); diff --git a/docs/integrations/destinations/postgres.md b/docs/integrations/destinations/postgres.md index cf6c1bd8f081..722bcdce84ad 100644 --- a/docs/integrations/destinations/postgres.md +++ b/docs/integrations/destinations/postgres.md @@ -236,6 +236,7 @@ Now that you have set up the Postgres destination connector, check out the follo | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------| +| 2.0.6 | 2024-04-05 | [\#36620](https://github.com/airbytehq/airbyte/pull/36620) | Adopt CDK 0.29.3 to use Kotlin CDK | | 2.0.5 | 2024-03-07 | [\#35899](https://github.com/airbytehq/airbyte/pull/35899) | Adopt CDK 0.27.3; Bugfix for case-senstive table names in v1-v2 migration, `_airbyte_meta` in raw tables | | 2.0.4 | 2024-03-07 | [\#35899](https://github.com/airbytehq/airbyte/pull/35899) | Adopt CDK 0.23.18; Null safety check in state parsing | | 2.0.3 | 2024-03-01 | [\#35528](https://github.com/airbytehq/airbyte/pull/35528) | Adopt CDK 0.23.11; Use Migration framework |