Skip to content

Commit

Permalink
Destination postgres: add option for cascade drop (#36974)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored Apr 11, 2024
1 parent 426b65a commit 9ac5233
Show file tree
Hide file tree
Showing 16 changed files with 101 additions and 28 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ Maven and Gradle will automatically reference the correct (pinned) version of th

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.30.0 | 2024-04-11 | [\#36974](https://github.com/airbytehq/airbyte/pull/36974) | Destinations: Pass config to jdbc sqlgenerator; allow cascade drop |
| 0.29.13 | 2024-04-10 | [\#36981](https://github.com/airbytehq/airbyte/pull/36981) | DB sources : Emit analytics for data type serialization errors. |
| 0.29.12 | 2024-04-10 | [\#36973](https://github.com/airbytehq/airbyte/pull/36973) | Destinations: Make flush batch size configurable for JdbcInsertFlush |
| 0.29.11 | 2024-04-10 | [\#36865](https://github.com/airbytehq/airbyte/pull/36865) | Sources : Remove noisy log line. |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.29.13
version=0.30.0
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat

abstract fun toJdbcConfig(config: JsonNode): JsonNode

protected abstract val sqlGenerator: JdbcSqlGenerator
protected abstract fun getSqlGenerator(config: JsonNode): JdbcSqlGenerator

protected abstract fun getDestinationHandler(
databaseName: String,
Expand Down Expand Up @@ -281,7 +281,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
database: JdbcDatabase,
defaultNamespace: String
): SerializedAirbyteMessageConsumer {
val sqlGenerator = sqlGenerator
val sqlGenerator = getSqlGenerator(config)
val rawNamespaceOverride = getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
val parsedCatalog =
rawNamespaceOverride
Expand Down Expand Up @@ -314,6 +314,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
database: JdbcDatabase,
parsedCatalog: ParsedCatalog,
): TyperDeduper {
val sqlGenerator = getSqlGenerator(config)
val databaseName = getDatabaseName(config)
val v2TableMigrator = NoopV2TableMigrator()
val migrator = JdbcV1V2Migrator(namingResolver, database, databaseName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ import org.jooq.conf.ParamType
import org.jooq.impl.DSL
import org.jooq.impl.SQLDataType

abstract class JdbcSqlGenerator(protected val namingTransformer: NamingConventionTransformer) :
SqlGenerator {
abstract class JdbcSqlGenerator
@JvmOverloads
constructor(
protected val namingTransformer: NamingConventionTransformer,
private val cascadeDrop: Boolean = false
) : SqlGenerator {
protected val cdcDeletedAtColumn: ColumnId = buildColumnId("_ab_cdc_deleted_at")

override fun buildStreamId(
Expand Down Expand Up @@ -86,20 +90,16 @@ abstract class JdbcSqlGenerator(protected val namingTransformer: NamingConventio
}

protected abstract val structType: DataType<*>
get

protected abstract val arrayType: DataType<*>?
get

@get:VisibleForTesting
val timestampWithTimeZoneType: DataType<*>
get() = toDialectType(AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE)

protected abstract val widestType: DataType<*>?
get

protected abstract val dialect: SQLDialect?
get

/**
* @param columns from the schema to be extracted from _airbyte_data column. Use the destination
Expand Down Expand Up @@ -265,13 +265,17 @@ abstract class JdbcSqlGenerator(protected val namingTransformer: NamingConventio
.toList()
)
}

val dropTableStep =
DSL.dropTableIfExists(DSL.quotedName(stream.id.finalNamespace, finalTableIdentifier))
if (cascadeDrop) {
dropTableStep.cascade()
}

return transactionally(
Stream.concat(
Stream.of(
DSL.dropTableIfExists(
DSL.quotedName(stream.id.finalNamespace, finalTableIdentifier)
)
.getSQL(ParamType.INLINED),
dropTableStep.getSQL(ParamType.INLINED),
createTableSql(
stream.id.finalNamespace,
finalTableIdentifier,
Expand All @@ -285,25 +289,28 @@ abstract class JdbcSqlGenerator(protected val namingTransformer: NamingConventio
}

override fun updateTable(
streamConfig: StreamConfig,
stream: StreamConfig,
finalSuffix: String,
minRawTimestamp: Optional<Instant>,
useExpensiveSaferCasting: Boolean
): Sql {
// TODO: Add flag to use merge vs insert/delete

return insertAndDeleteTransaction(
streamConfig,
stream,
finalSuffix,
minRawTimestamp,
useExpensiveSaferCasting
)
}

override fun overwriteFinalTable(stream: StreamId, finalSuffix: String): Sql {
val dropTableStep = DSL.dropTableIfExists(DSL.name(stream.finalNamespace, stream.finalName))
if (cascadeDrop) {
dropTableStep.cascade()
}
return transactionally(
DSL.dropTableIfExists(DSL.name(stream.finalNamespace, stream.finalName))
.getSQL(ParamType.INLINED),
dropTableStep.getSQL(ParamType.INLINED),
DSL.alterTable(DSL.name(stream.finalNamespace, stream.finalName + finalSuffix))
.renameTo(DSL.name(stream.finalName))
.sql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class AbstractJdbcDestinationTest {
return config
}

override val sqlGenerator: JdbcSqlGenerator = mock()
override fun getSqlGenerator(config: JsonNode): JdbcSqlGenerator = mock()

override fun getDestinationHandler(
databaseName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.29.10'
cdkVersionRequired = '0.30.0'
features = ['db-destinations', 'typing-deduping', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 2.0.8
dockerImageTag: 2.0.9
dockerRepository: airbyte/destination-postgres-strict-encrypt
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,13 @@
}
}
]
},
"drop_cascade": {
"type": "boolean",
"default": false,
"description": "Drop tables with CASCADE. WARNING! This will delete all data in all dependent objects (views, etc.). Use with caution. This option is intended for usecases which can easily rebuild the dependent objects.",
"title": "Drop tables with CASCADE. (WARNING! Risk of unrecoverable data loss)",
"order": 11
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.29.10'
cdkVersionRequired = '0.30.0'
features = ['db-destinations', 'datastore-postgres', 'typing-deduping']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 2.0.8
dockerImageTag: 2.0.9
dockerRepository: airbyte/destination-postgres
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class PostgresDestination extends AbstractJdbcDestination<PostgresState>

public static final String DRIVER_CLASS = DatabaseDriver.POSTGRESQL.getDriverClassName();

private static final String DROP_CASCADE_OPTION = "drop_cascade";

public static Destination sshWrappedDestination() {
return new SshWrappedDestination(new PostgresDestination(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY);
}
Expand Down Expand Up @@ -132,8 +134,10 @@ public JsonNode toJdbcConfig(final JsonNode config) {
}

@Override
protected JdbcSqlGenerator getSqlGenerator() {
return new PostgresSqlGenerator(new PostgresSQLNameTransformer());
protected JdbcSqlGenerator getSqlGenerator(final JsonNode config) {
final JsonNode dropCascadeNode = config.get(DROP_CASCADE_OPTION);
final boolean dropCascade = dropCascadeNode != null && dropCascadeNode.asBoolean();
return new PostgresSqlGenerator(new PostgresSQLNameTransformer(), dropCascade);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ public class PostgresSqlGenerator extends JdbcSqlGenerator {
private static final String AB_META_CHANGES_CHANGE_KEY = "change";
private static final String AB_META_CHANGES_REASON_KEY = "reason";

public PostgresSqlGenerator(final NamingConventionTransformer namingTransformer) {
super(namingTransformer);
public PostgresSqlGenerator(final NamingConventionTransformer namingTransformer, final boolean cascadeDrop) {
super(namingTransformer, cascadeDrop);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,13 @@
"description": "Disable Writing Final Tables. WARNING! The data format in _airbyte_data is likely stable but there are no guarantees that other metadata columns will remain the same in future versions",
"title": "Disable Final Tables. (WARNING! Unstable option; Columns in raw table schema might change between versions)",
"order": 10
},
"drop_cascade": {
"type": "boolean",
"default": false,
"description": "Drop tables with CASCADE. WARNING! This will delete all data in all dependent objects (views, etc.). Use with caution. This option is intended for usecases which can easily rebuild the dependent objects.",
"title": "Drop tables with CASCADE. (WARNING! Risk of unrecoverable data loss)",
"order": 11
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
package io.airbyte.integrations.destination.postgres.typing_deduping;

import static io.airbyte.integrations.destination.postgres.typing_deduping.PostgresSqlGenerator.JSONB_TYPE;
import static org.jooq.impl.DSL.createView;
import static org.jooq.impl.DSL.quotedName;
import static org.jooq.impl.DSL.select;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -26,6 +30,7 @@
import org.jooq.DataType;
import org.jooq.Field;
import org.jooq.SQLDialect;
import org.jooq.conf.ParamType;
import org.jooq.impl.DSL;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -71,7 +76,7 @@ protected DataType<?> getStructType() {

@Override
protected JdbcSqlGenerator getSqlGenerator() {
return new PostgresSqlGenerator(new PostgresSQLNameTransformer());
return new PostgresSqlGenerator(new PostgresSQLNameTransformer(), false);
}

@Override
Expand Down Expand Up @@ -102,4 +107,23 @@ public void testCreateTableIncremental() throws Exception {
assertFalse(initialStatus.isSchemaMismatch());
}

/**
* Verify that we correctly DROP...CASCADE the final table when cascadeDrop is enabled.
*/
@Test
public void testCascadeDrop() throws Exception {
// Explicitly create a sqlgenerator with cascadeDrop=true
final PostgresSqlGenerator generator = new PostgresSqlGenerator(new PostgresSQLNameTransformer(), true);
// Create a table, then create a view referencing it
getDestinationHandler().execute(generator.createTable(getIncrementalAppendStream(), "", false));
database.execute(createView(quotedName(getIncrementalAppendStream().getId().getFinalNamespace(), "example_view"))
.as(select().from(quotedName(getIncrementalAppendStream().getId().getFinalNamespace(), getIncrementalAppendStream().getId().getFinalName())))
.getSQL(ParamType.INLINED));
// Create a "soft reset" table
getDestinationHandler().execute(generator.createTable(getIncrementalDedupStream(), "_soft_reset", false));

// Overwriting the first table with the second table should succeed.
assertDoesNotThrow(() -> getDestinationHandler().execute(generator.overwriteFinalTable(getIncrementalDedupStream().getId(), "_soft_reset")));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ private String generateBigString() {

@Override
protected SqlGenerator getSqlGenerator() {
return new PostgresSqlGenerator(new PostgresSQLNameTransformer());
return new PostgresSqlGenerator(new PostgresSQLNameTransformer(), false);
}

@Override
Expand Down
22 changes: 22 additions & 0 deletions docs/integrations/destinations/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,27 @@ will contain 3 columns:
| time_without_timezone | TIME |
| date | DATE |

## Creating dependent objects

:::caution

This section involves running `DROP ... CASCADE` on the tables that Airbyte produces. Make sure you
fully understand the consequences before enabling this option. **Permanent** data loss is possible
with this option!

:::

You may want to create objects that depend on the tables generated by Airbyte, such as views. If you
do so, we strongly recommend:
* Using a tool like `dbt` to automate the creation
* And using an orchestrator to trigger `dbt`.

This is because you will need to enable the "Drop tables with CASCADE" option. The connector
sometimes needs to recreate the tables; if you have created dependent objects, Postgres will require
the connector to run drop statements with CASCADE enabled. However, this will cause the connector to
**also drop the dependent objects**. Therefore, you MUST have a way to recreate those dependent
objects from scratch.

## Tutorials

Now that you have set up the Postgres destination connector, check out the following tutorials:
Expand All @@ -236,6 +257,7 @@ Now that you have set up the Postgres destination connector, check out the follo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------|
| 2.0.9 | 2024-04-11 | [\#36974](https://github.com/airbytehq/airbyte/pull/36974) | Add option to drop with `CASCADE` |
| 2.0.8 | 2024-04-10 | [\#36805](https://github.com/airbytehq/airbyte/pull/36805) | Adopt CDK 0.29.10 to improve long column name handling |
| 2.0.7 | 2024-04-08 | [\#36768](https://github.com/airbytehq/airbyte/pull/36768) | Adopt CDK 0.29.7 to improve destination state handling |
| 2.0.6 | 2024-04-05 | [\#36620](https://github.com/airbytehq/airbyte/pull/36620) | Adopt CDK 0.29.3 to use Kotlin CDK |
Expand Down

0 comments on commit 9ac5233

Please sign in to comment.