Adding option to process without exactly-once semantics (#75)
* Adding option to process without writing an index

* Renaming exactly once property
davidsloan committed Aug 19, 2024
1 parent 2866a69 commit 79e33fa
Showing 20 changed files with 379 additions and 117 deletions.
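For orientation: the sink's exactly-once guarantee works by writing index files under an .indexes prefix in the destination bucket, and this commit makes that machinery optional. As a rough, illustrative sketch only (the connect.s3 prefix is an assumption, and the key simply mirrors the $connectorPrefix.exactly.once.enable definition added further down), opting out from a sink configuration might look like this:

    // Hypothetical sink properties, written as a Scala map purely for illustration.
    // Setting the new flag to false skips the .indexes bookkeeping and falls back to
    // Kafka's own offset tracking, trading exactly-once for at-least-once delivery.
    val sinkProps: Map[String, String] = Map(
      "connect.s3.kcql"                -> "INSERT INTO my-bucket:prefix SELECT * FROM payments",
      "connect.s3.exactly.once.enable" -> "false",
    )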
@@ -104,7 +104,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
dataStorage = DataStorageSettings.disabled,
),
),
- indexOptions = IndexOptions(5, ".indexes"),
+ indexOptions = IndexOptions(5, ".indexes").some,
compressionCodec = compressionCodec,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
@@ -116,7 +116,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
val sink = writerManagerCreator.from(avroConfig(new OffsetFileNamer(
identity[String],
AvroFormatSelection.extension,
- )))
+ )))._2
firstUsers.zipWithIndex.foreach {
case (struct: Struct, index: Int) =>
val writeRes = sink.write(
@@ -152,7 +152,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
val sink = writerManagerCreator.from(avroConfig(new OffsetFileNamer(
identity[String],
AvroFormatSelection.extension,
- )))
+ )))._2
firstUsers.zip(List(0 -> 100, 1 -> 99, 2 -> 101, 3 -> 102)).foreach {
case (struct: Struct, (index: Int, timestamp: Int)) =>
val writeRes = sink.write(
@@ -187,7 +187,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
val sink = writerManagerCreator.from(avroConfig(new OffsetFileNamer(
identity[String],
AvroFormatSelection.extension,
- )))
+ )))._2
val usersWithDecimal1 =
new Struct(UsersSchemaDecimal)
.put("name", "sam")
@@ -266,7 +266,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
val sink = writerManagerCreator.from(avroConfig(new OffsetFileNamer(
identity[String],
AvroFormatSelection.extension,
- )))
+ )))._2
firstUsers.concat(usersWithNewSchema).zipWithIndex.foreach {
case (user, index) =>
sink.write(
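The recurring ._2 in the tests above reflects a change in the return type of writerManagerCreator.from: it now yields a pair of an optional IndexManager and the WriterManager, rather than the WriterManager alone (see the CloudSinkTask changes further down). A hedged sketch of the new calling convention, with avroConfig standing in for whatever config the test builds:

    // Sketch only: the pair shape follows the createWriterMan change later in this commit.
    val (maybeIndexManager, writerManager) = writerManagerCreator.from(avroConfig)
    // The tests above only need the writer, hence ._2; maybeIndexManager is None
    // whenever exactly-once has been disabled, so index-dependent behaviour must be guarded.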
@@ -105,15 +105,15 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
dataStorage = DataStorageSettings.disabled,
), // JsonS3Format
),
- indexOptions = IndexOptions(5, ".indexes"),
+ indexOptions = IndexOptions(5, ".indexes").some,
compressionCodec,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L, 1.0),
logMetrics = false,
)

- val sink = writerManagerCreator.from(config)
+ val sink = writerManagerCreator.from(config)._2
val topic = Topic(TopicName)
val offset = Offset(1)
sink.write(
@@ -169,15 +169,15 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
dataStorage = DataStorageSettings.disabled,
),
),
- indexOptions = IndexOptions(5, ".indexes"),
+ indexOptions = IndexOptions(5, ".indexes").some,
compressionCodec,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L, 1.0),
logMetrics = false,
)

- val sink = writerManagerCreator.from(config)
+ val sink = writerManagerCreator.from(config)._2
firstUsers.zipWithIndex.foreach {
case (struct: Struct, index: Int) =>
val topic = Topic(TopicName)
@@ -237,15 +237,15 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
dataStorage = DataStorageSettings.disabled,
),
),
- indexOptions = IndexOptions(5, ".indexes"),
+ indexOptions = IndexOptions(5, ".indexes").some,
compressionCodec,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L, 1.0),
logMetrics = false,
)

- val sink = writerManagerCreator.from(config)
+ val sink = writerManagerCreator.from(config)._2

val topic = Topic(TopicName)
val offset = Offset(1L)
@@ -311,15 +311,15 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
dataStorage = DataStorageSettings.disabled,
),
),
- indexOptions = IndexOptions(5, ".indexes"),
+ indexOptions = IndexOptions(5, ".indexes").some,
compressionCodec,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L, 1.0),
logMetrics = false,
)

- val sink = writerManagerCreator.from(config)
+ val sink = writerManagerCreator.from(config)._2
val topic = Topic(TopicName)
val offset = Offset(1)
val listOfPojo: java.util.List[Pojo] = List(
@@ -102,7 +102,7 @@ class S3ParquetWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyC
dataStorage = DataStorageSettings.disabled,
),
),
- indexOptions = IndexOptions(5, ".indexes"),
+ indexOptions = IndexOptions(5, ".indexes").some,
compressionCodec,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
@@ -112,7 +112,7 @@

"parquet sink" should "write 2 records to parquet format in s3" in {

- val sink = writerManagerCreator.from(parquetConfig)
+ val sink = writerManagerCreator.from(parquetConfig)._2
firstUsers.zipWithIndex.foreach {
case (struct: Struct, index: Int) =>
val topic = Topic(TopicName)
@@ -158,7 +158,7 @@ class S3ParquetWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyC
new Struct(secondSchema).put("name", "coco").put("designation", null).put("salary", 395.44),
)

- val sink = writerManagerCreator.from(parquetConfig)
+ val sink = writerManagerCreator.from(parquetConfig)._2
firstUsers.concat(usersWithNewSchema).zipWithIndex.foreach {
case (user, index) =>
val topic = Topic(TopicName)
@@ -1,11 +1,14 @@
package io.lenses.streamreactor.connect.aws.s3.sink

import cats.implicits.catsSyntaxEitherId
+ import cats.implicits.catsSyntaxOptionId
import io.lenses.streamreactor.connect.aws.s3.model.location.S3LocationValidator
import io.lenses.streamreactor.connect.aws.s3.storage.S3FileMetadata
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import io.lenses.streamreactor.connect.cloud.common.formats.writer.FormatWriter
+ import io.lenses.streamreactor.connect.cloud.common.model.Offset
+ import io.lenses.streamreactor.connect.cloud.common.model.Topic
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator
import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy
@@ -14,7 +17,7 @@ import io.lenses.streamreactor.connect.cloud.common.sink.commit.FileSize
import io.lenses.streamreactor.connect.cloud.common.sink.commit.Interval
import io.lenses.streamreactor.connect.cloud.common.sink.naming.CloudKeyNamer
import io.lenses.streamreactor.connect.cloud.common.sink.naming.ObjectKeyBuilder
- import io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManager
+ import io.lenses.streamreactor.connect.cloud.common.sink.writer.WriterIndexer
import io.lenses.streamreactor.connect.cloud.common.sink.writer.WriterManager
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.mockito.MockitoSugar
@@ -39,8 +42,9 @@ class WriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyContainerT
stagingFilenameFn = (_, _) => new File("blah.csv").asRight,
objKeyBuilderFn = (_, _) => mock[ObjectKeyBuilder],
formatWriterFn = (_, _) => mock[FormatWriter].asRight,
- indexManager = mock[IndexManager[S3FileMetadata]],
+ writerIndexer = mock[WriterIndexer[S3FileMetadata]],
_.asRight,
+ ((_: TopicPartition) => Offset(45).some).some,
)

val result = wm.preCommit(Map(topicPartition -> new OffsetAndMetadata(999)))
@@ -70,7 +70,7 @@ object S3SinkConfig extends PropsToConfigConverter[S3SinkConfig] {
case class S3SinkConfig(
connectionConfig: S3ConnectionConfig,
bucketOptions: Seq[CloudSinkBucketOptions] = Seq.empty,
- indexOptions: IndexOptions,
+ indexOptions: Option[IndexOptions],
compressionCodec: CompressionCodec,
batchDelete: Boolean,
errorPolicy: ErrorPolicy,
@@ -35,7 +35,7 @@ class S3ConfigSettingsTest extends AnyFlatSpec with Matchers with LazyLogging {
val configKeys =
S3SinkConfigDef.config.configKeys().keySet().asScala ++ S3SourceConfigDef.config.configKeys().keySet().asScala

- configKeys.size shouldBe 53
+ configKeys.size shouldBe 54
configKeys.foreach {
k => k.toLowerCase should be(k)
}
@@ -65,7 +65,7 @@ object DatalakeSinkConfig extends PropsToConfigConverter[DatalakeSinkConfig] {
case class DatalakeSinkConfig(
connectionConfig: AzureConnectionConfig,
bucketOptions: Seq[CloudSinkBucketOptions] = Seq.empty,
- indexOptions: IndexOptions,
+ indexOptions: Option[IndexOptions],
compressionCodec: CompressionCodec,
errorPolicy: ErrorPolicy,
connectorRetryConfig: RetryConfig,
@@ -34,6 +34,11 @@ trait IndexConfigKeys extends WithConnectorPrefix {
s"Name of the indexes directory"
private val INDEXES_DIRECTORY_NAME_DEFAULT = ".indexes"

+ val ENABLE_EXACTLY_ONCE = s"$connectorPrefix.exactly.once.enable"
+ private val ENABLE_EXACTLY_ONCE_DOC =
+   s"Exactly once is enabled by default. It works by keeping an .indexes directory at the root of your bucket with subdirectories for indexes. Exactly once support can be disabled and the default offset tracking from kafka can be used instead by setting this to false."
+ private val ENABLE_EXACTLY_ONCE_DEFAULT = true

def addIndexSettingsToConfigDef(configDef: ConfigDef): ConfigDef =
configDef
.define(
@@ -58,11 +63,22 @@
ConfigDef.Width.LONG,
INDEXES_DIRECTORY_NAME,
)
+ .define(
+   ENABLE_EXACTLY_ONCE,
+   Type.BOOLEAN,
+   ENABLE_EXACTLY_ONCE_DEFAULT,
+   Importance.LOW,
+   ENABLE_EXACTLY_ONCE_DOC,
+   "Sink Seek",
+   3,
+   ConfigDef.Width.NONE,
+   ENABLE_EXACTLY_ONCE,
+ )
}
trait IndexSettings extends BaseSettings with IndexConfigKeys {
- def getIndexSettings: IndexOptions =
-   IndexOptions(
+ def getIndexSettings: Option[IndexOptions] =
+   Option.when(getBoolean(ENABLE_EXACTLY_ONCE))(IndexOptions(
getInt(SEEK_MAX_INDEX_FILES),
getString(INDEXES_DIRECTORY_NAME),
- )
+ ))
}
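The getIndexSettings rewrite above leans on Option.when from the Scala standard library: when the new flag is false, the whole IndexOptions value collapses to None, which is exactly what the sink config case classes now carry. A small, self-contained illustration of the pattern (the case class here is a stand-in for the real one):

    // Stand-in for the connector's IndexOptions, just to show the Option.when pattern.
    final case class IndexOptions(maxIndexFiles: Int, indexesDirectoryName: String)

    def indexOptionsFor(exactlyOnceEnabled: Boolean): Option[IndexOptions] =
      Option.when(exactlyOnceEnabled)(IndexOptions(5, ".indexes"))

    indexOptionsFor(exactlyOnceEnabled = true)  // Some(IndexOptions(5, ".indexes")): index files are written
    indexOptionsFor(exactlyOnceEnabled = false) // None: no .indexes directory, Kafka offsets are the source of truth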
@@ -56,7 +56,7 @@ trait CloudSinkConfig[CC] extends CloudConfig {
*
* @return The offset seeker options for the cloud sink.
*/
- def indexOptions: IndexOptions
+ def indexOptions: Option[IndexOptions]

/**
* Retrieves the compression codec for the cloud sink.
@@ -32,6 +32,7 @@ import io.lenses.streamreactor.connect.cloud.common.model.Topic
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
import io.lenses.streamreactor.connect.cloud.common.sink.conversion.HeaderToStringConverter
import io.lenses.streamreactor.connect.cloud.common.sink.conversion.ValueToSinkDataConverter
+ import io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManager
import io.lenses.streamreactor.connect.cloud.common.sink.writer.WriterManager
import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata
import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
@@ -69,8 +70,10 @@ abstract class CloudSinkTask[MD <: FileMetadata, C <: CloudSinkConfig[CC], CC <:
private val writerManagerCreator = new WriterManagerCreator[MD, C]()

private var logMetrics = false
- private var writerManager: WriterManager[MD] = _
- implicit var connectorTaskId: ConnectorTaskId = _
+ private var writerManager: WriterManager[MD] = _
+ private var maybeIndexManager: Option[IndexManager[MD]] = _
+
+ implicit var connectorTaskId: ConnectorTaskId = _

override def version(): String = manifest.getVersion()

@@ -89,7 +92,11 @@
val props = MapUtils.mergeProps(contextProps, fallbackProps.asScala.toMap)
val errOrWriterMan = createWriterMan(props)

- errOrWriterMan.leftMap(throw _).foreach(writerManager = _)
+ errOrWriterMan.leftMap(throw _).foreach {
+   case (mim, wm) =>
+     maybeIndexManager = mim
+     writerManager = wm
+ }
}

private def rollback(topicPartitions: Set[TopicPartition]): Unit =
@@ -217,29 +224,30 @@
actualOffsets
}

- override def open(partitions: util.Collection[KafkaTopicPartition]): Unit = {
- val partitionsDebug = partitions.asScala.map(tp => s"${tp.topic()}-${tp.partition()}").mkString(",")
- logger.debug(s"[{}] Open partitions", connectorTaskId.show, partitionsDebug: Any)
-
- val topicPartitions = partitions.asScala
- .map(tp => TopicPartition(Topic(tp.topic), tp.partition))
- .toSet
-
- handleErrors(
- for {
- tpoMap <- writerManager.open(topicPartitions)
- } yield {
- tpoMap.foreach {
- case (topicPartition, offset) =>
- logger.debug(
- s"[${connectorTaskId.show}] Seeking to ${topicPartition.topic.value}-${topicPartition.partition}:${offset.value}",
- )
- context.offset(topicPartition.toKafka, offset.value)
- }
- },
- )
-
- }
+ override def open(partitions: util.Collection[KafkaTopicPartition]): Unit =
+ maybeIndexManager.foreach {
+ indexManager =>
+ val partitionsDebug = partitions.asScala.map(tp => s"${tp.topic()}-${tp.partition()}").mkString(",")
+ logger.debug(s"[{}] Open partitions", connectorTaskId.show, partitionsDebug: Any)
+
+ val topicPartitions = partitions.asScala
+ .map(tp => TopicPartition(Topic(tp.topic), tp.partition))
+ .toSet
+
+ handleErrors(
+ for {
+ tpoMap <- indexManager.open(topicPartitions)
+ } yield {
+ tpoMap.foreach {
+ case (topicPartition, offset) =>
+ logger.debug(
+ s"[${connectorTaskId.show}] Seeking to ${topicPartition.topic.value}-${topicPartition.partition}:${offset.value}",
+ )
+ context.offset(topicPartition.toKafka, offset.value)
+ }
+ },
+ )
+ }

/**
* Whenever close is called, the topics and partitions assigned to this task
@@ -269,17 +277,21 @@

def convertPropsToConfig(connectorTaskId: ConnectorTaskId, props: Map[String, String]): Either[Throwable, C]

- private def createWriterMan(props: Map[String, String]): Either[Throwable, WriterManager[MD]] =
+ private def createWriterMan(
+   props: Map[String, String],
+ ): Either[Throwable, (Option[IndexManager[MD]], WriterManager[MD])] =
for {
config <- convertPropsToConfig(connectorTaskId, props)
s3Client <- createClient(config.connectionConfig)
storageInterface = createStorageInterface(connectorTaskId, config, s3Client)
_ <- setRetryInterval(config)
- writerManager <- Try(writerManagerCreator.from(config)(connectorTaskId, storageInterface)).toEither
- _ <- initializeFromConfig(config)
+ (maybeIndexManager, writerManager) <- Try(
+   writerManagerCreator.from(config)(connectorTaskId, storageInterface),
+ ).toEither
+ _ <- initializeFromConfig(config)
} yield {
logMetrics = config.logMetrics
- writerManager
+ (maybeIndexManager, writerManager)
}

private def initializeFromConfig(config: C): Either[Throwable, Unit] =
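With the index manager now optional, open only seeks when there is an index to consult; when exactly-once is disabled the task does nothing here and Kafka Connect resumes from its own committed offsets. A simplified sketch of that control flow (the types below are stand-ins for the real model classes, and the Either-based error handling is omitted):

    // Simplified stand-ins; the real code wraps this in handleErrors and uses the cloud model types.
    final case class TopicPartition(topic: String, partition: Int)

    trait IndexManager {
      def open(partitions: Set[TopicPartition]): Map[TopicPartition, Long]
    }

    def openPartitions(
      maybeIndexManager: Option[IndexManager],
      partitions:        Set[TopicPartition],
      seekTo:            (TopicPartition, Long) => Unit // context.offset(...) in the real task
    ): Unit =
      maybeIndexManager.foreach { indexManager =>
        // Exactly-once: replay the offsets recorded under the .indexes prefix.
        indexManager.open(partitions).foreach { case (tp, offset) => seekTo(tp, offset) }
      }
    // When maybeIndexManager is None (exactly-once disabled), nothing is seeked and the
    // framework's committed offsets determine where consumption resumes.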