Skip to content

Commit

Permalink
Merge branch 'master' into brian/zendesk_support_upgrade_to_concurrent
Browse files Browse the repository at this point in the history
  • Loading branch information
maxi297 committed Nov 8, 2024
2 parents dcf0f07 + c589cb5 commit 947fc88
Show file tree
Hide file tree
Showing 326 changed files with 29,913 additions and 12,100 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/connectors_version_increment_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
name: Connectors Version Increment Check
runs-on: connector-test-large
if: github.event.pull_request.head.repo.fork != true
timeout-minutes: 12
timeout-minutes: 22
steps:
- name: Checkout Airbyte
uses: actions/checkout@v4
Expand Down
4 changes: 4 additions & 0 deletions airbyte-cdk/bulk/core/base/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,7 @@ dependencies {
testFixturesApi 'io.micronaut.test:micronaut-test-junit5:4.5.0'
testFixturesApi 'io.github.deblockt:json-diff:1.1.0'
}

test {
environment "PRESENT", "present-value"
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ import io.airbyte.cdk.ssh.SshTunnelMethodConfiguration
interface SshTunnelConfiguration {
val realHost: String
val realPort: Int
val sshTunnel: SshTunnelMethodConfiguration
val sshTunnel: SshTunnelMethodConfiguration?
val sshConnectionOptions: SshConnectionOptions
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ internal constructor(
/** Creates an open [TunnelSession]. */
fun createTunnelSession(
remote: SshdSocketAddress,
sshTunnel: SshTunnelMethodConfiguration,
sshTunnel: SshTunnelMethodConfiguration?,
connectionOptions: SshConnectionOptions,
): TunnelSession {
if (sshTunnel is SshNoTunnelMethod) {
Expand All @@ -62,7 +62,8 @@ fun createTunnelSession(
log.info { "Creating SSH client session." }
val connectFuture: ConnectFuture =
when (sshTunnel) {
SshNoTunnelMethod -> TODO("unreachable code")
SshNoTunnelMethod,
null -> TODO("unreachable code")
is SshKeyAuthTunnelMethod ->
client.connect(sshTunnel.user.trim(), sshTunnel.host.trim(), sshTunnel.port)
is SshPasswordAuthTunnelMethod ->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
airbyte:
file-transfer:
enabled: ${USE_FILE_TRANSFER:false}
staging-path: ${AIRBYTE_STAGING_DIRECTORY:/staging/files}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.initialization

import io.micronaut.context.annotation.Bean
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Value
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import jakarta.inject.Inject
import kotlin.test.assertEquals
import org.junit.jupiter.api.Test

@MicronautTest
class TestApplicationYaml {
@Inject lateinit var defaultValueBean: DefaultValueBean

@Test
fun testMainDefaultValue() {
assertEquals("/staging/files", defaultValueBean.stagingFolder)
assertEquals(false, defaultValueBean.fileTransferEnable)
}
}

data class DefaultValueBean(
val stagingFolder: String,
val fileTransferEnable: Boolean,
)

@Factory
class TestFactory {
@Bean
fun defaultValueBean(
@Value("\${airbyte.file-transfer.staging-path}") stagingFolder: String,
@Value("\${airbyte.file-transfer.enabled}") fileTransferEnable: Boolean,
): DefaultValueBean {
return DefaultValueBean(stagingFolder, fileTransferEnable)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
stream.generationId,
it.data as ObjectValue,
OutputRecord.Meta(
changes = it.meta?.changes ?: mutableListOf(),
changes = it.meta?.changes ?: listOf(),
syncId = stream.syncId
),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package io.airbyte.cdk.load.check
import io.airbyte.cdk.load.command.Append
import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.NullType
import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema

/**
* A check operation that is run before the destination is used.
Expand All @@ -25,7 +25,7 @@ interface DestinationChecker<C : DestinationConfiguration> {
DestinationStream(
descriptor = DestinationStream.Descriptor("testing", "test"),
importType = Append,
schema = NullType,
schema = ObjectTypeWithoutSchema,
generationId = 1,
minimumGenerationId = 0,
syncId = 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,8 @@
package io.airbyte.cdk.load.command

import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.ArrayType
import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.load.data.IntegerType
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.json.AirbyteTypeToJsonSchema
import io.airbyte.cdk.load.data.json.JsonSchemaToAirbyteType
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.DestinationSyncMode
Expand Down Expand Up @@ -51,59 +45,6 @@ data class DestinationStream(
* what actually exists, as many destinations have legacy data from before this schema was
* adopted.
*/
val schemaWithMeta: ObjectType
get() =
ObjectType(
linkedMapOf(
DestinationRecord.Meta.COLUMN_NAME_AB_RAW_ID to
FieldType(StringType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_AB_EXTRACTED_AT to
FieldType(IntegerType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_AB_META to
FieldType(
nullable = false,
type =
ObjectType(
linkedMapOf(
"sync_id" to FieldType(IntegerType, nullable = false),
"changes" to
FieldType(
nullable = false,
type =
ArrayType(
FieldType(
nullable = false,
type =
ObjectType(
linkedMapOf(
"field" to
FieldType(
StringType,
nullable = false
),
"change" to
FieldType(
StringType,
nullable = false
),
"reason" to
FieldType(
StringType,
nullable = false
),
)
)
)
)
)
)
)
),
DestinationRecord.Meta.COLUMN_NAME_AB_GENERATION_ID to
FieldType(IntegerType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_DATA to FieldType(schema, nullable = false),
)
)

/**
* This is not fully round-trippable. Destinations don't care about most of the stuff in an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@

package io.airbyte.cdk.load.data

interface AirbyteSchemaIdentityMapper {
fun map(schema: AirbyteType): AirbyteType =
interface AirbyteSchemaMapper {
fun map(schema: AirbyteType): AirbyteType
}

class AirbyteSchemaNoopMapper : AirbyteSchemaMapper {
override fun map(schema: AirbyteType): AirbyteType = schema
}

interface AirbyteSchemaIdentityMapper : AirbyteSchemaMapper {
override fun map(schema: AirbyteType): AirbyteType =
when (schema) {
is NullType -> mapNull(schema)
is StringType -> mapString(schema)
is BooleanType -> mapBoolean(schema)
is IntegerType -> mapInteger(schema)
Expand All @@ -26,7 +33,6 @@ interface AirbyteSchemaIdentityMapper {
is UnknownType -> mapUnknown(schema)
}

fun mapNull(schema: NullType): AirbyteType = schema
fun mapString(schema: StringType): AirbyteType = schema
fun mapBoolean(schema: BooleanType): AirbyteType = schema
fun mapInteger(schema: IntegerType): AirbyteType = schema
Expand All @@ -43,7 +49,7 @@ interface AirbyteSchemaIdentityMapper {
fun mapObjectWithoutSchema(schema: ObjectTypeWithoutSchema): AirbyteType = schema
fun mapObjectWithEmptySchema(schema: ObjectTypeWithEmptySchema): AirbyteType = schema
fun mapUnion(schema: UnionType): AirbyteType {
return UnionType(schema.options.map { map(it) })
return UnionType.of(schema.options.map { map(it) })
}
fun mapDate(schema: DateType): AirbyteType = schema
fun mapTimeTypeWithTimezone(schema: TimeTypeWithTimezone): AirbyteType = schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

package io.airbyte.cdk.load.data

sealed interface AirbyteType
import com.fasterxml.jackson.databind.JsonNode

data object NullType : AirbyteType
sealed interface AirbyteType

data object StringType : AirbyteType

Expand Down Expand Up @@ -36,8 +36,21 @@ data object ObjectTypeWithEmptySchema : AirbyteType

data object ObjectTypeWithoutSchema : AirbyteType

data class UnionType(val options: List<AirbyteType>) : AirbyteType
data class UnionType(val options: Set<AirbyteType>) : AirbyteType {
companion object {
fun of(options: Set<AirbyteType>): AirbyteType {
if (options.size == 1) {
return options.first()
}
return UnionType(options)
}

fun of(options: List<AirbyteType>): AirbyteType = of(options.toSet())

fun of(vararg options: AirbyteType): AirbyteType = of(options.toSet())
}
}

data class UnknownType(val what: String) : AirbyteType
data class UnknownType(val schema: JsonNode) : AirbyteType

data class FieldType(val type: AirbyteType, val nullable: Boolean)
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

import io.airbyte.cdk.load.message.DestinationRecord

class AirbyteTypeToAirbyteTypeWithMeta(private val flatten: Boolean) {
fun convert(schema: AirbyteType): ObjectType {
val properties =
linkedMapOf(
DestinationRecord.Meta.COLUMN_NAME_AB_RAW_ID to
FieldType(StringType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_AB_EXTRACTED_AT to
FieldType(IntegerType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_AB_META to
FieldType(
nullable = false,
type =
ObjectType(
linkedMapOf(
"sync_id" to FieldType(IntegerType, nullable = false),
"changes" to
FieldType(
nullable = false,
type =
ArrayType(
FieldType(
nullable = false,
type =
ObjectType(
linkedMapOf(
"field" to
FieldType(
StringType,
nullable = false
),
"change" to
FieldType(
StringType,
nullable = false
),
"reason" to
FieldType(
StringType,
nullable = false
),
)
)
)
)
)
)
)
),
DestinationRecord.Meta.COLUMN_NAME_AB_GENERATION_ID to
FieldType(IntegerType, nullable = false)
)
if (flatten) {
(schema as ObjectType).properties.forEach { (name, field) -> properties[name] = field }
} else {
properties[DestinationRecord.Meta.COLUMN_NAME_DATA] =
FieldType(schema, nullable = false)
}
return ObjectType(properties)
}
}

fun AirbyteType.withAirbyteMeta(flatten: Boolean = false): ObjectType =
AirbyteTypeToAirbyteTypeWithMeta(flatten).convert(this)
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.cdk.load.data

import com.fasterxml.jackson.databind.JsonNode
import java.math.BigDecimal
import java.time.LocalDate
import java.time.LocalDateTime
Expand Down Expand Up @@ -161,4 +162,4 @@ value class ObjectValue(val values: LinkedHashMap<String, AirbyteValue>) : Airby
}
}

@JvmInline value class UnknownValue(val what: String) : AirbyteValue
@JvmInline value class UnknownValue(val value: JsonNode) : AirbyteValue
Loading

0 comments on commit 947fc88

Please sign in to comment.