Skip to content

Commit

Permalink
Merge branch 'master' into agpapa/braintree-subscription-date-types
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristoGrab authored Jul 3, 2024
2 parents 3b4231b + 4a62ae7 commit 6fece13
Show file tree
Hide file tree
Showing 275 changed files with 22,077 additions and 7,346 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.63.3
current_version = 0.63.4
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/connectors_version_increment_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
uses: ./.github/actions/run-airbyte-ci
with:
context: "pull_request"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
# dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }} Commenting this out as we believe Dagger cloud caching is causing excessively long jobs for such a small check
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
Expand Down
7 changes: 6 additions & 1 deletion airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,18 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.40.9 | 2024-07-01 | [\#39473](https://github.com/airbytehq/airbyte/pull/39473) | minor changes around error logging and testing |
| 0.40.8 | 2024-07-01 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator |
| 0.40.7 | 2024-07-01 | [\#40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz heartbeat. |
| ~~0.40.6~~ | | | (this version does not exist) |
| 0.40.5 | 2024-06-26 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
| 0.35.16 | 2024-06-25 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | (backport) JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
| 0.40.4 | 2024-06-18 | [\#40254](https://github.com/airbytehq/airbyte/pull/40254) | Destinations: Do not throw on unrecognized airbyte message type (ignore message instead) |
| 0.40.3 | 2024-06-18 | [\#39526](https://github.com/airbytehq/airbyte/pull/39526) | Destinations: INCOMPLETE stream status is a TRANSIENT error rather than SYSTEM |
| 0.40.2 | 2024-06-18 | [\#39552](https://github.com/airbytehq/airbyte/pull/39552) | Destinations: Throw error if the ConfiguredCatalog has no streams |
| 0.40.1 | 2024-06-14 | [\#39349](https://github.com/airbytehq/airbyte/pull/39349) | Source stats for full refresh streams |
| 0.40.0 | 2024-06-17 | [\#38622](https://github.com/airbytehq/airbyte/pull/38622) | Destinations: Implement refreshes logic in AbstractStreamOperation |
| 0.39.0 | 2024-06-17 | [\#38067](https://github.com/airbytehq/airbyte/pull/38067) | Destinations: Breaking changes for refreshes (fail on INCOMPLETE stream status; ignore OVERWRITE sync mode) |
| 0.38.3 | 2024-06-25 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | (backport) Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator |
| 0.38.2 | 2024-06-14 | [\#39460](https://github.com/airbytehq/airbyte/pull/39460) | Bump postgres JDBC driver version |
| 0.38.1 | 2024-06-13 | [\#39445](https://github.com/airbytehq/airbyte/pull/39445) | Sources: More CDK changes to handle big initial snapshots. |
| 0.38.0 | 2024-06-11 | [\#39405](https://github.com/airbytehq/airbyte/pull/39405) | Sources: Debezium properties manager interface changed to accept a list of streams to scope to |
Expand All @@ -194,6 +198,7 @@ corresponds to that version.
| 0.36.4 | 2024-05-31 | [\#38824](https://github.com/airbytehq/airbyte/pull/38824) | Param marked as non-null to nullable in JdbcDestinationHandler for NPE fix |
| 0.36.2 | 2024-05-29 | [\#38357](https://github.com/airbytehq/airbyte/pull/38357) | Exit connector when encountering a config error. |
| 0.36.0 | 2024-05-29 | [\#38358](https://github.com/airbytehq/airbyte/pull/38358) | Plumb generation_id / sync_id to destinations code |
| 0.35.16 | 2024-06-25 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | (backport) JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
| 0.35.15 | 2024-05-31 | [\#38824](https://github.com/airbytehq/airbyte/pull/38824) | Param marked as non-null to nullable in JdbcDestinationHandler for NPE fix |
| 0.35.14 | 2024-05-28 | [\#38738](https://github.com/airbytehq/airbyte/pull/38738) | make ThreadCreationInfo cast as nullable |
| 0.35.13 | 2024-05-28 | [\#38632](https://github.com/airbytehq/airbyte/pull/38632) | minor changes to allow conversion of snowflake tests to kotlin |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ object DbAnalyticsUtils {
const val CDC_CURSOR_INVALID_KEY: String = "db-sources-cdc-cursor-invalid"
const val DATA_TYPES_SERIALIZATION_ERROR_KEY = "db-sources-data-serialization-error"
const val CDC_SNAPSHOT_FORCE_SHUTDOWN_KEY = "db-sources-snapshot-force-shutdown"
const val DEBEZIUM_CLOSE_REASON_KEY = "db-sources-debezium-close-reason"

@JvmStatic
fun cdcCursorInvalidMessage(): AirbyteAnalyticsTraceMessage {
Expand All @@ -33,4 +34,9 @@ object DbAnalyticsUtils {
.withType(CDC_SNAPSHOT_FORCE_SHUTDOWN_KEY)
.withValue("1")
}

@JvmStatic
fun debeziumCloseReasonMessage(reason: String): AirbyteAnalyticsTraceMessage {
return AirbyteAnalyticsTraceMessage().withType(DEBEZIUM_CLOSE_REASON_KEY).withValue(reason)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ internal constructor(
}
}
} catch (e: Exception) {
LOGGER.error(e) { "caught exception!" }
// Many of the exceptions thrown are nested inside layers of RuntimeExceptions. An
// attempt is made
// to
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.40.5
version=0.40.9
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns
import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.getResultsOrLogAndThrowFirst
Expand Down Expand Up @@ -50,7 +51,8 @@ abstract class JdbcDestinationHandler<DestinationState>(
protected val catalogName: String?,
protected val jdbcDatabase: JdbcDatabase,
protected val rawTableNamespace: String,
private val dialect: SQLDialect
private val dialect: SQLDialect,
private val columns: DestinationColumns = DestinationColumns.V2_WITH_GENERATION,
) : DestinationHandler<DestinationState> {
protected val dslContext: DSLContext
get() = DSL.using(dialect)
Expand Down Expand Up @@ -363,6 +365,14 @@ abstract class JdbcDestinationHandler<DestinationState>(
)
}

protected open fun isAirbyteGenerationColumnMatch(existingTable: TableDefinition): Boolean {
return toJdbcTypeName(AirbyteProtocolType.INTEGER)
.equals(
existingTable.columns.getValue(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID).type,
ignoreCase = true,
)
}

open protected fun existingSchemaMatchesStreamConfig(
stream: StreamConfig?,
existingTable: TableDefinition
Expand All @@ -375,7 +385,11 @@ abstract class JdbcDestinationHandler<DestinationState>(
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT
) && isAirbyteExtractedAtColumnMatch(existingTable)) ||
!(existingTable.columns.containsKey(JavaBaseConstants.COLUMN_NAME_AB_META) &&
isAirbyteMetaColumnMatch(existingTable))
isAirbyteMetaColumnMatch(existingTable)) ||
(columns == DestinationColumns.V2_WITH_GENERATION &&
!(existingTable.columns.containsKey(
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID
) && isAirbyteGenerationColumnMatch(existingTable)))
) {
// Missing AB meta columns from final table, we need them to do proper T+D so trigger
// soft-reset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package io.airbyte.cdk.integrations.destination.jdbc.typing_deduping

import com.google.common.annotations.VisibleForTesting
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType
Expand All @@ -23,12 +24,11 @@ import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
import io.airbyte.protocol.models.v0.DestinationSyncMode
import java.sql.Timestamp
import java.time.Instant
import java.util.*
import kotlin.Any
import kotlin.Boolean
import kotlin.IllegalArgumentException
import java.util.Locale
import java.util.Optional
import kotlin.Int
import org.jooq.Condition
import org.jooq.CreateTableColumnStep
import org.jooq.DSLContext
import org.jooq.DataType
import org.jooq.Field
Expand All @@ -37,6 +37,7 @@ import org.jooq.Name
import org.jooq.Record
import org.jooq.SQLDialect
import org.jooq.SelectConditionStep
import org.jooq.SelectFieldOrAsterisk
import org.jooq.conf.ParamType
import org.jooq.impl.DSL
import org.jooq.impl.SQLDataType
Expand All @@ -45,7 +46,9 @@ abstract class JdbcSqlGenerator
@JvmOverloads
constructor(
protected val namingTransformer: NamingConventionTransformer,
private val cascadeDrop: Boolean = false
private val cascadeDrop: Boolean = false,
@VisibleForTesting
internal val columns: DestinationColumns = DestinationColumns.V2_WITH_GENERATION,
) : SqlGenerator {
protected val cdcDeletedAtColumn: ColumnId = buildColumnId("_ab_cdc_deleted_at")

Expand Down Expand Up @@ -199,6 +202,9 @@ constructor(
SQLDataType.VARCHAR(36).nullable(false)
metaColumns[JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT] =
timestampWithTimeZoneType.nullable(false)
if (columns == DestinationColumns.V2_WITH_GENERATION) {
metaColumns[JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID] = SQLDataType.BIGINT
}
if (includeMetaColumn)
metaColumns[JavaBaseConstants.COLUMN_NAME_AB_META] = structType.nullable(false)
return metaColumns
Expand Down Expand Up @@ -332,38 +338,50 @@ constructor(
rawTableName: Name,
namespace: String,
tableName: String
) =
dslContext
.createTable(rawTableName)
.column(
JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
SQLDataType.VARCHAR(36).nullable(false),
)
.column(
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
timestampWithTimeZoneType.nullable(false),
)
.column(
JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT,
timestampWithTimeZoneType.nullable(true),
)
.column(JavaBaseConstants.COLUMN_NAME_DATA, structType.nullable(false))
.column(JavaBaseConstants.COLUMN_NAME_AB_META, structType.nullable(true))
.`as`(
DSL.select(
DSL.field(JavaBaseConstants.COLUMN_NAME_AB_ID)
.`as`(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID),
DSL.field(JavaBaseConstants.COLUMN_NAME_EMITTED_AT)
.`as`(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT),
DSL.cast(null, timestampWithTimeZoneType)
.`as`(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT),
DSL.field(JavaBaseConstants.COLUMN_NAME_DATA)
.`as`(JavaBaseConstants.COLUMN_NAME_DATA),
DSL.cast(null, structType).`as`(JavaBaseConstants.COLUMN_NAME_AB_META),
)
.from(DSL.table(DSL.name(namespace, tableName))),
): String {
val hasGenerationId = columns == DestinationColumns.V2_WITH_GENERATION

val createTable: CreateTableColumnStep =
dslContext
.createTable(rawTableName)
.column(
JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
SQLDataType.VARCHAR(36).nullable(false),
)
.column(
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
timestampWithTimeZoneType.nullable(false),
)
.column(
JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT,
timestampWithTimeZoneType.nullable(true),
)
.column(JavaBaseConstants.COLUMN_NAME_DATA, structType.nullable(false))
.column(JavaBaseConstants.COLUMN_NAME_AB_META, structType.nullable(true))
if (hasGenerationId) {
createTable.column(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, SQLDataType.BIGINT)
}

val selectColumns: MutableList<SelectFieldOrAsterisk> =
mutableListOf(
DSL.field(JavaBaseConstants.COLUMN_NAME_AB_ID)
.`as`(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID),
DSL.field(JavaBaseConstants.COLUMN_NAME_EMITTED_AT)
.`as`(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT),
DSL.cast(null, timestampWithTimeZoneType)
.`as`(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT),
DSL.field(JavaBaseConstants.COLUMN_NAME_DATA)
.`as`(JavaBaseConstants.COLUMN_NAME_DATA),
DSL.cast(null, structType).`as`(JavaBaseConstants.COLUMN_NAME_AB_META),
)
if (hasGenerationId) {
selectColumns += DSL.value(0).`as`(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID)
}

return createTable
.`as`(DSL.select(selectColumns).from(DSL.table(DSL.name(namespace, tableName))))
.getSQL(ParamType.INLINED)
}

override fun clearLoadedAt(streamId: StreamId): Sql {
return of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_ID
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_RAW_ID
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_DATA
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_EMITTED_AT
import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns
import io.airbyte.cdk.integrations.base.JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
Expand Down Expand Up @@ -90,16 +92,18 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina

@Throws(Exception::class)
override fun createRawTable(streamId: StreamId) {
database.execute(
val columns =
dslContext
.createTable(DSL.name(streamId.rawNamespace, streamId.rawName))
.column(COLUMN_NAME_AB_RAW_ID, SQLDataType.VARCHAR(36).nullable(false))
.column(COLUMN_NAME_AB_EXTRACTED_AT, timestampWithTimeZoneType.nullable(false))
.column(COLUMN_NAME_AB_LOADED_AT, timestampWithTimeZoneType)
.column(COLUMN_NAME_DATA, structType.nullable(false))
.column(COLUMN_NAME_AB_META, structType.nullable(true))
.getSQL(ParamType.INLINED)
)
if (sqlGenerator.columns == DestinationColumns.V2_WITH_GENERATION) {
columns.column(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, SQLDataType.BIGINT)
}
database.execute(columns.getSQL(ParamType.INLINED))
}

@Throws(Exception::class)
Expand All @@ -118,7 +122,7 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
public override fun insertRawTableRecords(streamId: StreamId, records: List<JsonNode>) {
insertRecords(
DSL.name(streamId.rawNamespace, streamId.rawName),
JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES,
sqlGenerator.columns.rawColumns,
records,
COLUMN_NAME_DATA,
COLUMN_NAME_AB_META
Expand All @@ -143,9 +147,12 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
records: List<JsonNode>,
generationId: Long,
) {
// TODO handle generation ID
val columnNames =
if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES
(if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES)
.toMutableList()
if (sqlGenerator.columns == DestinationColumns.V2_WITH_GENERATION) {
columnNames += COLUMN_NAME_AB_GENERATION_ID
}
insertRecords(
DSL.name(streamId.finalNamespace, streamId.finalName + suffix),
columnNames,
Expand Down
Loading

0 comments on commit 6fece13

Please sign in to comment.