Skip to content

Commit

Permalink
Merge latest changes from main branch
Browse files Browse the repository at this point in the history
  • Loading branch information
bala-ceg authored Nov 8, 2024
2 parents 8c608ed + 2a45a15 commit e943910
Show file tree
Hide file tree
Showing 1,106 changed files with 118,478 additions and 38,704 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/connectors_up_to_date.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@ jobs:
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
subcommand: 'connectors --concurrency=10 --language=python --language=low-code --support-level=community --support-level=certified --metadata-query="\"source-declarative-manifest\" not in data.dockerRepository" --metadata-query="\"-rc.\" not in data.dockerImageTag" up-to-date --ignore-connector=source-declarative-manifest --create-prs --auto-merge'
subcommand: 'connectors --concurrency=10 --language=python --language=low-code --support-level=community --support-level=certified --metadata-query="\"source-declarative-manifest\" not in data.dockerRepository" --metadata-query="\"-rc.\" not in data.dockerImageTag" up-to-date --create-prs --auto-merge'
2 changes: 1 addition & 1 deletion .github/workflows/connectors_version_increment_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
name: Connectors Version Increment Check
runs-on: connector-test-large
if: github.event.pull_request.head.repo.fork != true
timeout-minutes: 12
timeout-minutes: 22
steps:
- name: Checkout Airbyte
uses: actions/checkout@v4
Expand Down
4 changes: 4 additions & 0 deletions airbyte-cdk/bulk/core/base/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,7 @@ dependencies {
testFixturesApi 'io.micronaut.test:micronaut-test-junit5:4.5.0'
testFixturesApi 'io.github.deblockt:json-diff:1.1.0'
}

test {
environment "PRESENT", "present-value"
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ import io.airbyte.cdk.ssh.SshTunnelMethodConfiguration
interface SshTunnelConfiguration {
val realHost: String
val realPort: Int
val sshTunnel: SshTunnelMethodConfiguration
val sshTunnel: SshTunnelMethodConfiguration?
val sshConnectionOptions: SshConnectionOptions
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ internal constructor(
/** Creates an open [TunnelSession]. */
fun createTunnelSession(
remote: SshdSocketAddress,
sshTunnel: SshTunnelMethodConfiguration,
sshTunnel: SshTunnelMethodConfiguration?,
connectionOptions: SshConnectionOptions,
): TunnelSession {
if (sshTunnel is SshNoTunnelMethod) {
Expand All @@ -62,7 +62,8 @@ fun createTunnelSession(
log.info { "Creating SSH client session." }
val connectFuture: ConnectFuture =
when (sshTunnel) {
SshNoTunnelMethod -> TODO("unreachable code")
SshNoTunnelMethod,
null -> TODO("unreachable code")
is SshKeyAuthTunnelMethod ->
client.connect(sshTunnel.user.trim(), sshTunnel.host.trim(), sshTunnel.port)
is SshPasswordAuthTunnelMethod ->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
airbyte:
file-transfer:
enabled: ${USE_FILE_TRANSFER:false}
staging-path: ${AIRBYTE_STAGING_DIRECTORY:/staging/files}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.initialization

import io.micronaut.context.annotation.Bean
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Value
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import jakarta.inject.Inject
import kotlin.test.assertEquals
import org.junit.jupiter.api.Test

@MicronautTest
class TestApplicationYaml {
@Inject lateinit var defaultValueBean: DefaultValueBean

@Test
fun testMainDefaultValue() {
assertEquals("/staging/files", defaultValueBean.stagingFolder)
assertEquals(false, defaultValueBean.fileTransferEnable)
}
}

data class DefaultValueBean(
val stagingFolder: String,
val fileTransferEnable: Boolean,
)

@Factory
class TestFactory {
@Bean
fun defaultValueBean(
@Value("\${airbyte.file-transfer.staging-path}") stagingFolder: String,
@Value("\${airbyte.file-transfer.enabled}") fileTransferEnable: Boolean,
): DefaultValueBean {
return DefaultValueBean(stagingFolder, fileTransferEnable)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.load.test.util.NoopNameMapper
import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
import io.airbyte.cdk.load.write.Untyped
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test

Expand All @@ -21,6 +22,11 @@ class MockBasicFunctionalityIntegrationTest :
NoopNameMapper,
isStreamSchemaRetroactive = false,
supportsDedup = true,
stringifySchemalessObjects = false,
promoteUnionToObject = false,
preserveUndeclaredFields = true,
commitDataIncrementally = false,
allTypesBehavior = Untyped,
) {
@Test
override fun testBasicWrite() {
Expand Down Expand Up @@ -48,6 +54,16 @@ class MockBasicFunctionalityIntegrationTest :
super.testTruncateRefresh()
}

@Test
override fun testInterruptedTruncateWithPriorData() {
super.testInterruptedTruncateWithPriorData()
}

@Test
override fun resumeAfterCancelledTruncate() {
super.resumeAfterCancelledTruncate()
}

@Test
override fun testAppend() {
super.testAppend()
Expand All @@ -62,4 +78,19 @@ class MockBasicFunctionalityIntegrationTest :
override fun testDedup() {
super.testDedup()
}

@Test
override fun testContainerTypes() {
super.testContainerTypes()
}

@Test
override fun testUnions() {
super.testUnions()
}

@Test
override fun testAllTypes() {
super.testAllTypes()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,23 @@ object MockDestinationBackend {
}
}

fun commitFrom(srcFilename: String, dstFilename: String) {
val src = getFile(srcFilename)
insert(dstFilename, *src.toTypedArray())
src.clear()
}

fun commitAndDedupeFrom(
srcFilename: String,
dstFilename: String,
primaryKey: List<List<String>>,
cursor: List<String>,
) {
val src = getFile(srcFilename)
upsert(dstFilename, primaryKey, cursor, *src.toTypedArray())
src.clear()
}

fun readFile(filename: String): List<OutputRecord> {
return getFile(filename)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@

package io.airbyte.cdk.load.mock_integration_test

import io.airbyte.cdk.load.command.Append
import io.airbyte.cdk.load.command.Dedupe
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.SimpleBatch
import io.airbyte.cdk.load.mock_integration_test.MockStreamLoader.Companion.getFilename
import io.airbyte.cdk.load.state.StreamIncompleteResult
import io.airbyte.cdk.load.test.util.OutputRecord
import io.airbyte.cdk.load.write.DestinationWriter
import io.airbyte.cdk.load.write.StreamLoader
Expand All @@ -33,11 +34,30 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
override val state = Batch.State.PERSISTED
}

override suspend fun start() {
MockDestinationBackend.deleteOldRecords(
getFilename(stream.descriptor),
stream.minimumGenerationId
)
override suspend fun close(streamFailure: StreamIncompleteResult?) {
if (streamFailure == null) {
when (val importType = stream.importType) {
is Append -> {
MockDestinationBackend.commitFrom(
getFilename(stream.descriptor, staging = true),
getFilename(stream.descriptor)
)
}
is Dedupe -> {
MockDestinationBackend.commitAndDedupeFrom(
getFilename(stream.descriptor, staging = true),
getFilename(stream.descriptor),
importType.primaryKey,
importType.cursor,
)
}
else -> throw IllegalArgumentException("Unsupported import type $importType")
}
MockDestinationBackend.deleteOldRecords(
getFilename(stream.descriptor),
stream.minimumGenerationId
)
}
}

override suspend fun processRecords(
Expand All @@ -51,7 +71,7 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
return when (batch) {
is LocalBatch -> {
batch.records.forEach {
val filename = getFilename(it.stream)
val filename = getFilename(it.stream, staging = true)
val record =
OutputRecord(
UUID.randomUUID(),
Expand All @@ -60,21 +80,12 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
stream.generationId,
it.data as ObjectValue,
OutputRecord.Meta(
changes = it.meta?.changes ?: mutableListOf(),
changes = it.meta?.changes ?: listOf(),
syncId = stream.syncId
),
)
val importType = stream.importType
if (importType is Dedupe) {
MockDestinationBackend.upsert(
filename,
importType.primaryKey,
importType.cursor,
record
)
} else {
MockDestinationBackend.insert(filename, record)
}
// blind insert into the staging area. We'll dedupe on commit.
MockDestinationBackend.insert(filename, record)
}
PersistedBatch(batch.records)
}
Expand All @@ -84,8 +95,13 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
}

companion object {
fun getFilename(stream: DestinationStream.Descriptor) =
getFilename(stream.namespace, stream.name)
fun getFilename(namespace: String?, name: String) = "(${namespace},${name})"
fun getFilename(stream: DestinationStream.Descriptor, staging: Boolean = false) =
getFilename(stream.namespace, stream.name, staging)
fun getFilename(namespace: String?, name: String, staging: Boolean = false) =
if (staging) {
"(${namespace},${name},staging)"
} else {
"(${namespace},${name})"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package io.airbyte.cdk.load.check
import io.airbyte.cdk.load.command.Append
import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.NullType
import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema

/**
* A check operation that is run before the destination is used.
Expand All @@ -25,7 +25,7 @@ interface DestinationChecker<C : DestinationConfiguration> {
DestinationStream(
descriptor = DestinationStream.Descriptor("testing", "test"),
importType = Append,
schema = NullType,
schema = ObjectTypeWithoutSchema,
generationId = 1,
minimumGenerationId = 0,
syncId = 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,8 @@
package io.airbyte.cdk.load.command

import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.ArrayType
import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.load.data.IntegerType
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.json.AirbyteTypeToJsonSchema
import io.airbyte.cdk.load.data.json.JsonSchemaToAirbyteType
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.DestinationSyncMode
Expand Down Expand Up @@ -51,59 +45,6 @@ data class DestinationStream(
* what actually exists, as many destinations have legacy data from before this schema was
* adopted.
*/
val schemaWithMeta: ObjectType
get() =
ObjectType(
linkedMapOf(
DestinationRecord.Meta.COLUMN_NAME_AB_RAW_ID to
FieldType(StringType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_AB_EXTRACTED_AT to
FieldType(IntegerType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_AB_META to
FieldType(
nullable = false,
type =
ObjectType(
linkedMapOf(
"sync_id" to FieldType(IntegerType, nullable = false),
"changes" to
FieldType(
nullable = false,
type =
ArrayType(
FieldType(
nullable = false,
type =
ObjectType(
linkedMapOf(
"field" to
FieldType(
StringType,
nullable = false
),
"change" to
FieldType(
StringType,
nullable = false
),
"reason" to
FieldType(
StringType,
nullable = false
),
)
)
)
)
)
)
)
),
DestinationRecord.Meta.COLUMN_NAME_AB_GENERATION_ID to
FieldType(IntegerType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_DATA to FieldType(schema, nullable = false),
)
)

/**
* This is not fully round-trippable. Destinations don't care about most of the stuff in an
Expand Down
Loading

0 comments on commit e943910

Please sign in to comment.