Skip to content

Commit

Permalink
Merge branch 'master' into christo/auth-cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristoGrab authored Jul 1, 2024
2 parents 7fc1dd1 + 779d363 commit 5de4607
Show file tree
Hide file tree
Showing 212 changed files with 8,818 additions and 5,278 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.63.3
current_version = 0.63.4
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
Expand Down
4 changes: 4 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.40.8 | 2024-06-25 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator |
| 0.38.3 | 2024-06-25 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | (backport) Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator |
| 0.40.7 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz hearbeat. |
| ~~0.40.6~~ | | | (this version does not exist) |
| 0.40.5 | 2024-06-26 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
| 0.35.16 | 2024-06-25 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | (backport) JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
| 0.40.4 | 2024-06-18 | [\#40254](https://github.com/airbytehq/airbyte/pull/40254) | Destinations: Do not throw on unrecognized airbyte message type (ignore message instead) |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ object DbAnalyticsUtils {
const val CDC_CURSOR_INVALID_KEY: String = "db-sources-cdc-cursor-invalid"
const val DATA_TYPES_SERIALIZATION_ERROR_KEY = "db-sources-data-serialization-error"
const val CDC_SNAPSHOT_FORCE_SHUTDOWN_KEY = "db-sources-snapshot-force-shutdown"
const val DEBEZIUM_CLOSE_REASON_KEY = "db-sources-debezium-close-reason"

@JvmStatic
fun cdcCursorInvalidMessage(): AirbyteAnalyticsTraceMessage {
Expand All @@ -33,4 +34,9 @@ object DbAnalyticsUtils {
.withType(CDC_SNAPSHOT_FORCE_SHUTDOWN_KEY)
.withValue("1")
}

@JvmStatic
fun debeziumCloseReasonMessage(reason: String): AirbyteAnalyticsTraceMessage {
return AirbyteAnalyticsTraceMessage().withType(DEBEZIUM_CLOSE_REASON_KEY).withValue(reason)
}
}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.40.5
version=0.40.8
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns
import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.getResultsOrLogAndThrowFirst
Expand Down Expand Up @@ -50,7 +51,8 @@ abstract class JdbcDestinationHandler<DestinationState>(
protected val catalogName: String?,
protected val jdbcDatabase: JdbcDatabase,
protected val rawTableNamespace: String,
private val dialect: SQLDialect
private val dialect: SQLDialect,
private val columns: DestinationColumns = DestinationColumns.V2_WITH_GENERATION,
) : DestinationHandler<DestinationState> {
protected val dslContext: DSLContext
get() = DSL.using(dialect)
Expand Down Expand Up @@ -363,6 +365,14 @@ abstract class JdbcDestinationHandler<DestinationState>(
)
}

protected open fun isAirbyteGenerationColumnMatch(existingTable: TableDefinition): Boolean {
return toJdbcTypeName(AirbyteProtocolType.INTEGER)
.equals(
existingTable.columns.getValue(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID).type,
ignoreCase = true,
)
}

open protected fun existingSchemaMatchesStreamConfig(
stream: StreamConfig?,
existingTable: TableDefinition
Expand All @@ -375,7 +385,11 @@ abstract class JdbcDestinationHandler<DestinationState>(
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT
) && isAirbyteExtractedAtColumnMatch(existingTable)) ||
!(existingTable.columns.containsKey(JavaBaseConstants.COLUMN_NAME_AB_META) &&
isAirbyteMetaColumnMatch(existingTable))
isAirbyteMetaColumnMatch(existingTable)) ||
(columns == DestinationColumns.V2_WITH_GENERATION &&
!(existingTable.columns.containsKey(
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID
) && isAirbyteGenerationColumnMatch(existingTable)))
) {
// Missing AB meta columns from final table, we need them to do proper T+D so trigger
// soft-reset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package io.airbyte.cdk.integrations.destination.jdbc.typing_deduping

import com.google.common.annotations.VisibleForTesting
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType
Expand All @@ -23,12 +24,11 @@ import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
import io.airbyte.protocol.models.v0.DestinationSyncMode
import java.sql.Timestamp
import java.time.Instant
import java.util.*
import kotlin.Any
import kotlin.Boolean
import kotlin.IllegalArgumentException
import java.util.Locale
import java.util.Optional
import kotlin.Int
import org.jooq.Condition
import org.jooq.CreateTableColumnStep
import org.jooq.DSLContext
import org.jooq.DataType
import org.jooq.Field
Expand All @@ -37,6 +37,7 @@ import org.jooq.Name
import org.jooq.Record
import org.jooq.SQLDialect
import org.jooq.SelectConditionStep
import org.jooq.SelectFieldOrAsterisk
import org.jooq.conf.ParamType
import org.jooq.impl.DSL
import org.jooq.impl.SQLDataType
Expand All @@ -45,7 +46,9 @@ abstract class JdbcSqlGenerator
@JvmOverloads
constructor(
protected val namingTransformer: NamingConventionTransformer,
private val cascadeDrop: Boolean = false
private val cascadeDrop: Boolean = false,
@VisibleForTesting
internal val columns: DestinationColumns = DestinationColumns.V2_WITH_GENERATION,
) : SqlGenerator {
protected val cdcDeletedAtColumn: ColumnId = buildColumnId("_ab_cdc_deleted_at")

Expand Down Expand Up @@ -199,6 +202,9 @@ constructor(
SQLDataType.VARCHAR(36).nullable(false)
metaColumns[JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT] =
timestampWithTimeZoneType.nullable(false)
if (columns == DestinationColumns.V2_WITH_GENERATION) {
metaColumns[JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID] = SQLDataType.BIGINT
}
if (includeMetaColumn)
metaColumns[JavaBaseConstants.COLUMN_NAME_AB_META] = structType.nullable(false)
return metaColumns
Expand Down Expand Up @@ -332,38 +338,50 @@ constructor(
rawTableName: Name,
namespace: String,
tableName: String
) =
dslContext
.createTable(rawTableName)
.column(
JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
SQLDataType.VARCHAR(36).nullable(false),
)
.column(
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
timestampWithTimeZoneType.nullable(false),
)
.column(
JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT,
timestampWithTimeZoneType.nullable(true),
)
.column(JavaBaseConstants.COLUMN_NAME_DATA, structType.nullable(false))
.column(JavaBaseConstants.COLUMN_NAME_AB_META, structType.nullable(true))
.`as`(
DSL.select(
DSL.field(JavaBaseConstants.COLUMN_NAME_AB_ID)
.`as`(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID),
DSL.field(JavaBaseConstants.COLUMN_NAME_EMITTED_AT)
.`as`(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT),
DSL.cast(null, timestampWithTimeZoneType)
.`as`(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT),
DSL.field(JavaBaseConstants.COLUMN_NAME_DATA)
.`as`(JavaBaseConstants.COLUMN_NAME_DATA),
DSL.cast(null, structType).`as`(JavaBaseConstants.COLUMN_NAME_AB_META),
)
.from(DSL.table(DSL.name(namespace, tableName))),
): String {
val hasGenerationId = columns == DestinationColumns.V2_WITH_GENERATION

val createTable: CreateTableColumnStep =
dslContext
.createTable(rawTableName)
.column(
JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
SQLDataType.VARCHAR(36).nullable(false),
)
.column(
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
timestampWithTimeZoneType.nullable(false),
)
.column(
JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT,
timestampWithTimeZoneType.nullable(true),
)
.column(JavaBaseConstants.COLUMN_NAME_DATA, structType.nullable(false))
.column(JavaBaseConstants.COLUMN_NAME_AB_META, structType.nullable(true))
if (hasGenerationId) {
createTable.column(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, SQLDataType.BIGINT)
}

val selectColumns: MutableList<SelectFieldOrAsterisk> =
mutableListOf(
DSL.field(JavaBaseConstants.COLUMN_NAME_AB_ID)
.`as`(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID),
DSL.field(JavaBaseConstants.COLUMN_NAME_EMITTED_AT)
.`as`(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT),
DSL.cast(null, timestampWithTimeZoneType)
.`as`(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT),
DSL.field(JavaBaseConstants.COLUMN_NAME_DATA)
.`as`(JavaBaseConstants.COLUMN_NAME_DATA),
DSL.cast(null, structType).`as`(JavaBaseConstants.COLUMN_NAME_AB_META),
)
if (hasGenerationId) {
selectColumns += DSL.value(0).`as`(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID)
}

return createTable
.`as`(DSL.select(selectColumns).from(DSL.table(DSL.name(namespace, tableName))))
.getSQL(ParamType.INLINED)
}

override fun clearLoadedAt(streamId: StreamId): Sql {
return of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_ID
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_RAW_ID
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_DATA
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_EMITTED_AT
import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns
import io.airbyte.cdk.integrations.base.JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
Expand Down Expand Up @@ -90,16 +92,18 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina

@Throws(Exception::class)
override fun createRawTable(streamId: StreamId) {
database.execute(
val columns =
dslContext
.createTable(DSL.name(streamId.rawNamespace, streamId.rawName))
.column(COLUMN_NAME_AB_RAW_ID, SQLDataType.VARCHAR(36).nullable(false))
.column(COLUMN_NAME_AB_EXTRACTED_AT, timestampWithTimeZoneType.nullable(false))
.column(COLUMN_NAME_AB_LOADED_AT, timestampWithTimeZoneType)
.column(COLUMN_NAME_DATA, structType.nullable(false))
.column(COLUMN_NAME_AB_META, structType.nullable(true))
.getSQL(ParamType.INLINED)
)
if (sqlGenerator.columns == DestinationColumns.V2_WITH_GENERATION) {
columns.column(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, SQLDataType.BIGINT)
}
database.execute(columns.getSQL(ParamType.INLINED))
}

@Throws(Exception::class)
Expand All @@ -118,7 +122,7 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
public override fun insertRawTableRecords(streamId: StreamId, records: List<JsonNode>) {
insertRecords(
DSL.name(streamId.rawNamespace, streamId.rawName),
JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES,
sqlGenerator.columns.rawColumns,
records,
COLUMN_NAME_DATA,
COLUMN_NAME_AB_META
Expand All @@ -143,9 +147,12 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
records: List<JsonNode>,
generationId: Long,
) {
// TODO handle generation ID
val columnNames =
if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES
(if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES)
.toMutableList()
if (sqlGenerator.columns == DestinationColumns.V2_WITH_GENERATION) {
columnNames += COLUMN_NAME_AB_GENERATION_ID
}
insertRecords(
DSL.name(streamId.finalNamespace, streamId.finalName + suffix),
columnNames,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,21 @@ class AirbyteDebeziumHandler<T>(
cdcSavedInfoFetcher.savedOffset,
if (addDbNameToOffsetState)
Optional.ofNullable<String>(config[JdbcUtils.DATABASE_KEY].asText())
else Optional.empty<String>()
else Optional.empty<String>(),
)
val schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage> =
if (trackSchemaHistory)
Optional.of<AirbyteSchemaHistoryStorage>(
AirbyteSchemaHistoryStorage.Companion.initializeDBHistory(
cdcSavedInfoFetcher.savedSchemaHistory,
cdcStateHandler.compressSchemaHistoryForState()
)
cdcStateHandler.compressSchemaHistoryForState(),
),
)
else Optional.empty<AirbyteSchemaHistoryStorage>()
val publisher = DebeziumRecordPublisher(debeziumPropertiesManager)
val queue: CapacityReportingBlockingQueue<ChangeEvent<String?, String?>> =
CapacityReportingBlockingQueue(queueSize)

publisher.start(queue, offsetManager, schemaHistoryManager)
// handle state machine around pub/sub logic.
val eventIterator: AutoCloseableIterator<ChangeEventWithMetadata> =
Expand All @@ -102,13 +103,14 @@ class AirbyteDebeziumHandler<T>(
targetPosition,
{ publisher.hasClosed() },
DebeziumShutdownProcedure(queue, { publisher.close() }, { publisher.hasClosed() }),
firstRecordWaitTime
firstRecordWaitTime,
config
)

val syncCheckpointDuration =
if (config.has(DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION_PROPERTY))
Duration.ofSeconds(
config[DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION_PROPERTY].asLong()
config[DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION_PROPERTY].asLong(),
)
else DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION
val syncCheckpointRecords =
Expand All @@ -122,7 +124,7 @@ class AirbyteDebeziumHandler<T>(
targetPosition,
eventConverter,
offsetManager,
schemaHistoryManager
schemaHistoryManager,
)

// Usually sourceStateIterator requires airbyteStream as input. For DBZ iterator, stream is
Expand All @@ -133,7 +135,7 @@ class AirbyteDebeziumHandler<T>(
eventIterator,
null,
messageProducer,
StateEmitFrequency(syncCheckpointRecords, syncCheckpointDuration)
StateEmitFrequency(syncCheckpointRecords, syncCheckpointDuration),
)
return AutoCloseableIterators.fromIterator(iterator)
}
Expand Down
Loading

0 comments on commit 5de4607

Please sign in to comment.