diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3AvroWriterManagerTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3AvroWriterManagerTest.scala index e562fa802..1dc0bc136 100644 --- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3AvroWriterManagerTest.scala +++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3AvroWriterManagerTest.scala @@ -104,7 +104,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont dataStorage = DataStorageSettings.disabled, ), ), - indexOptions = IndexOptions(5, ".indexes"), + indexOptions = IndexOptions(5, ".indexes").some, compressionCodec = compressionCodec, batchDelete = true, errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW), @@ -116,7 +116,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont val sink = writerManagerCreator.from(avroConfig(new OffsetFileNamer( identity[String], AvroFormatSelection.extension, - ))) + )))._2 firstUsers.zipWithIndex.foreach { case (struct: Struct, index: Int) => val writeRes = sink.write( @@ -152,7 +152,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont val sink = writerManagerCreator.from(avroConfig(new OffsetFileNamer( identity[String], AvroFormatSelection.extension, - ))) + )))._2 firstUsers.zip(List(0 -> 100, 1 -> 99, 2 -> 101, 3 -> 102)).foreach { case (struct: Struct, (index: Int, timestamp: Int)) => val writeRes = sink.write( @@ -187,7 +187,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont val sink = writerManagerCreator.from(avroConfig(new OffsetFileNamer( identity[String], AvroFormatSelection.extension, - ))) + )))._2 val usersWithDecimal1 = new Struct(UsersSchemaDecimal) .put("name", "sam") @@ -266,7 +266,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont val sink = writerManagerCreator.from(avroConfig(new OffsetFileNamer( identity[String], AvroFormatSelection.extension, - ))) + )))._2 firstUsers.concat(usersWithNewSchema).zipWithIndex.foreach { case (user, index) => sink.write( diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala index b466e8393..48853b001 100644 --- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala +++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala @@ -105,7 +105,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont dataStorage = DataStorageSettings.disabled, ), // JsonS3Format ), - indexOptions = IndexOptions(5, ".indexes"), + indexOptions = IndexOptions(5, ".indexes").some, compressionCodec, batchDelete = true, errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW), @@ -113,7 +113,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont logMetrics = false, ) - val sink = writerManagerCreator.from(config) + val sink = writerManagerCreator.from(config)._2 val topic = Topic(TopicName) val offset = Offset(1) sink.write( @@ -169,7 +169,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont dataStorage = DataStorageSettings.disabled, ), ), - indexOptions = IndexOptions(5, ".indexes"), + indexOptions = 
IndexOptions(5, ".indexes").some, compressionCodec, batchDelete = true, errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW), @@ -177,7 +177,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont logMetrics = false, ) - val sink = writerManagerCreator.from(config) + val sink = writerManagerCreator.from(config)._2 firstUsers.zipWithIndex.foreach { case (struct: Struct, index: Int) => val topic = Topic(TopicName) @@ -237,7 +237,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont dataStorage = DataStorageSettings.disabled, ), ), - indexOptions = IndexOptions(5, ".indexes"), + indexOptions = IndexOptions(5, ".indexes").some, compressionCodec, batchDelete = true, errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW), @@ -245,7 +245,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont logMetrics = false, ) - val sink = writerManagerCreator.from(config) + val sink = writerManagerCreator.from(config)._2 val topic = Topic(TopicName) val offset = Offset(1L) @@ -311,7 +311,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont dataStorage = DataStorageSettings.disabled, ), ), - indexOptions = IndexOptions(5, ".indexes"), + indexOptions = IndexOptions(5, ".indexes").some, compressionCodec, batchDelete = true, errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW), @@ -319,7 +319,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont logMetrics = false, ) - val sink = writerManagerCreator.from(config) + val sink = writerManagerCreator.from(config)._2 val topic = Topic(TopicName) val offset = Offset(1) val listOfPojo: java.util.List[Pojo] = List( diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ParquetWriterManagerTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ParquetWriterManagerTest.scala index 139ed1dd2..dee8cdc04 100644 --- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ParquetWriterManagerTest.scala +++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ParquetWriterManagerTest.scala @@ -102,7 +102,7 @@ class S3ParquetWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyC dataStorage = DataStorageSettings.disabled, ), ), - indexOptions = IndexOptions(5, ".indexes"), + indexOptions = IndexOptions(5, ".indexes").some, compressionCodec, batchDelete = true, errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW), @@ -112,7 +112,7 @@ class S3ParquetWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyC "parquet sink" should "write 2 records to parquet format in s3" in { - val sink = writerManagerCreator.from(parquetConfig) + val sink = writerManagerCreator.from(parquetConfig)._2 firstUsers.zipWithIndex.foreach { case (struct: Struct, index: Int) => val topic = Topic(TopicName) @@ -158,7 +158,7 @@ class S3ParquetWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyC new Struct(secondSchema).put("name", "coco").put("designation", null).put("salary", 395.44), ) - val sink = writerManagerCreator.from(parquetConfig) + val sink = writerManagerCreator.from(parquetConfig)._2 firstUsers.concat(usersWithNewSchema).zipWithIndex.foreach { case (user, index) => val topic = Topic(TopicName) diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/WriterManagerTest.scala 
b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/WriterManagerTest.scala index b80045f69..011a5b048 100644 --- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/WriterManagerTest.scala +++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/WriterManagerTest.scala @@ -1,11 +1,14 @@ package io.lenses.streamreactor.connect.aws.s3.sink import cats.implicits.catsSyntaxEitherId +import cats.implicits.catsSyntaxOptionId import io.lenses.streamreactor.connect.aws.s3.model.location.S3LocationValidator import io.lenses.streamreactor.connect.aws.s3.storage.S3FileMetadata import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest import io.lenses.streamreactor.connect.cloud.common.formats.writer.FormatWriter +import io.lenses.streamreactor.connect.cloud.common.model.Offset import io.lenses.streamreactor.connect.cloud.common.model.Topic +import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy @@ -14,7 +17,7 @@ import io.lenses.streamreactor.connect.cloud.common.sink.commit.FileSize import io.lenses.streamreactor.connect.cloud.common.sink.commit.Interval import io.lenses.streamreactor.connect.cloud.common.sink.naming.CloudKeyNamer import io.lenses.streamreactor.connect.cloud.common.sink.naming.ObjectKeyBuilder -import io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManager +import io.lenses.streamreactor.connect.cloud.common.sink.writer.WriterIndexer import io.lenses.streamreactor.connect.cloud.common.sink.writer.WriterManager import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.mockito.MockitoSugar @@ -39,8 +42,9 @@ class WriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyContainerT stagingFilenameFn = (_, _) => new File("blah.csv").asRight, objKeyBuilderFn = (_, _) => mock[ObjectKeyBuilder], formatWriterFn = (_, _) => mock[FormatWriter].asRight, - indexManager = mock[IndexManager[S3FileMetadata]], + writerIndexer = mock[WriterIndexer[S3FileMetadata]], _.asRight, + ((_: TopicPartition) => Offset(45).some).some, ) val result = wm.preCommit(Map(topicPartition -> new OffsetAndMetadata(999))) diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfig.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfig.scala index 7134b213c..593683002 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfig.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfig.scala @@ -70,7 +70,7 @@ object S3SinkConfig extends PropsToConfigConverter[S3SinkConfig] { case class S3SinkConfig( connectionConfig: S3ConnectionConfig, bucketOptions: Seq[CloudSinkBucketOptions] = Seq.empty, - indexOptions: IndexOptions, + indexOptions: Option[IndexOptions], compressionCodec: CompressionCodec, batchDelete: Boolean, errorPolicy: ErrorPolicy, diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettingsTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettingsTest.scala index ccab6d4f0..ea17c0810 100644 --- 
a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettingsTest.scala +++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettingsTest.scala @@ -35,7 +35,7 @@ class S3ConfigSettingsTest extends AnyFlatSpec with Matchers with LazyLogging { val configKeys = S3SinkConfigDef.config.configKeys().keySet().asScala ++ S3SourceConfigDef.config.configKeys().keySet().asScala - configKeys.size shouldBe 53 + configKeys.size shouldBe 54 configKeys.foreach { k => k.toLowerCase should be(k) } diff --git a/kafka-connect-azure-datalake/src/main/scala/io/lenses/streamreactor/connect/datalake/sink/config/DatalakeSinkConfig.scala b/kafka-connect-azure-datalake/src/main/scala/io/lenses/streamreactor/connect/datalake/sink/config/DatalakeSinkConfig.scala index b5a18b8f8..357f22066 100644 --- a/kafka-connect-azure-datalake/src/main/scala/io/lenses/streamreactor/connect/datalake/sink/config/DatalakeSinkConfig.scala +++ b/kafka-connect-azure-datalake/src/main/scala/io/lenses/streamreactor/connect/datalake/sink/config/DatalakeSinkConfig.scala @@ -65,7 +65,7 @@ object DatalakeSinkConfig extends PropsToConfigConverter[DatalakeSinkConfig] { case class DatalakeSinkConfig( connectionConfig: AzureConnectionConfig, bucketOptions: Seq[CloudSinkBucketOptions] = Seq.empty, - indexOptions: IndexOptions, + indexOptions: Option[IndexOptions], compressionCodec: CompressionCodec, errorPolicy: ErrorPolicy, connectorRetryConfig: RetryConfig, diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/IndexSettings.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/IndexSettings.scala index 31980e091..1b828e7f3 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/IndexSettings.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/IndexSettings.scala @@ -34,6 +34,11 @@ trait IndexConfigKeys extends WithConnectorPrefix { s"Name of the indexes directory" private val INDEXES_DIRECTORY_NAME_DEFAULT = ".indexes" + val ENABLE_EXACTLY_ONCE = s"$connectorPrefix.exactly.once.enable" + private val ENABLE_EXACTLY_ONCE_DOC = + s"Exactly once is enabled by default. It works by keeping an .indexes directory at the root of your bucket with subdirectories for indexes. Exactly once support can be disabled and the default offset tracking from kafka can be used instead by setting this to false." 
+ private val ENABLE_EXACTLY_ONCE_DEFAULT = true + def addIndexSettingsToConfigDef(configDef: ConfigDef): ConfigDef = configDef .define( @@ -58,11 +63,22 @@ trait IndexConfigKeys extends WithConnectorPrefix { ConfigDef.Width.LONG, INDEXES_DIRECTORY_NAME, ) + .define( + ENABLE_EXACTLY_ONCE, + Type.BOOLEAN, + ENABLE_EXACTLY_ONCE_DEFAULT, + Importance.LOW, + ENABLE_EXACTLY_ONCE_DOC, + "Sink Seek", + 3, + ConfigDef.Width.NONE, + ENABLE_EXACTLY_ONCE, + ) } trait IndexSettings extends BaseSettings with IndexConfigKeys { - def getIndexSettings: IndexOptions = - IndexOptions( + def getIndexSettings: Option[IndexOptions] = + Option.when(getBoolean(ENABLE_EXACTLY_ONCE))(IndexOptions( getInt(SEEK_MAX_INDEX_FILES), getString(INDEXES_DIRECTORY_NAME), - ) + )) } diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/traits/CloudConfig.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/traits/CloudConfig.scala index a52bae388..632617500 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/traits/CloudConfig.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/traits/CloudConfig.scala @@ -56,7 +56,7 @@ trait CloudSinkConfig[CC] extends CloudConfig { * * @return The offset seeker options for the cloud sink. */ - def indexOptions: IndexOptions + def indexOptions: Option[IndexOptions] /** * Retrieves the compression codec for the cloud sink. diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/CloudSinkTask.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/CloudSinkTask.scala index 51f3d256c..c910bcd85 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/CloudSinkTask.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/CloudSinkTask.scala @@ -32,6 +32,7 @@ import io.lenses.streamreactor.connect.cloud.common.model.Topic import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition import io.lenses.streamreactor.connect.cloud.common.sink.conversion.HeaderToStringConverter import io.lenses.streamreactor.connect.cloud.common.sink.conversion.ValueToSinkDataConverter +import io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManager import io.lenses.streamreactor.connect.cloud.common.sink.writer.WriterManager import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface @@ -69,8 +70,10 @@ abstract class CloudSinkTask[MD <: FileMetadata, C <: CloudSinkConfig[CC], CC <: private val writerManagerCreator = new WriterManagerCreator[MD, C]() private var logMetrics = false - private var writerManager: WriterManager[MD] = _ - implicit var connectorTaskId: ConnectorTaskId = _ + private var writerManager: WriterManager[MD] = _ + private var maybeIndexManager: Option[IndexManager[MD]] = _ + + implicit var connectorTaskId: ConnectorTaskId = _ override def version(): String = manifest.getVersion() @@ -89,7 +92,11 @@ abstract class CloudSinkTask[MD <: FileMetadata, C <: CloudSinkConfig[CC], CC <: val props = MapUtils.mergeProps(contextProps, fallbackProps.asScala.toMap) val errOrWriterMan = createWriterMan(props) - errOrWriterMan.leftMap(throw _).foreach(writerManager = _) + errOrWriterMan.leftMap(throw _).foreach 
{ + case (mim, wm) => + maybeIndexManager = mim + writerManager = wm + } } private def rollback(topicPartitions: Set[TopicPartition]): Unit = @@ -217,29 +224,30 @@ abstract class CloudSinkTask[MD <: FileMetadata, C <: CloudSinkConfig[CC], CC <: actualOffsets } - override def open(partitions: util.Collection[KafkaTopicPartition]): Unit = { - val partitionsDebug = partitions.asScala.map(tp => s"${tp.topic()}-${tp.partition()}").mkString(",") - logger.debug(s"[{}] Open partitions", connectorTaskId.show, partitionsDebug: Any) - - val topicPartitions = partitions.asScala - .map(tp => TopicPartition(Topic(tp.topic), tp.partition)) - .toSet - - handleErrors( - for { - tpoMap <- writerManager.open(topicPartitions) - } yield { - tpoMap.foreach { - case (topicPartition, offset) => - logger.debug( - s"[${connectorTaskId.show}] Seeking to ${topicPartition.topic.value}-${topicPartition.partition}:${offset.value}", - ) - context.offset(topicPartition.toKafka, offset.value) - } - }, - ) - - } + override def open(partitions: util.Collection[KafkaTopicPartition]): Unit = + maybeIndexManager.foreach { + indexManager => + val partitionsDebug = partitions.asScala.map(tp => s"${tp.topic()}-${tp.partition()}").mkString(",") + logger.debug(s"[{}] Open partitions", connectorTaskId.show, partitionsDebug: Any) + + val topicPartitions = partitions.asScala + .map(tp => TopicPartition(Topic(tp.topic), tp.partition)) + .toSet + + handleErrors( + for { + tpoMap <- indexManager.open(topicPartitions) + } yield { + tpoMap.foreach { + case (topicPartition, offset) => + logger.debug( + s"[${connectorTaskId.show}] Seeking to ${topicPartition.topic.value}-${topicPartition.partition}:${offset.value}", + ) + context.offset(topicPartition.toKafka, offset.value) + } + }, + ) + } /** * Whenever close is called, the topics and partitions assigned to this task @@ -269,17 +277,21 @@ abstract class CloudSinkTask[MD <: FileMetadata, C <: CloudSinkConfig[CC], CC <: def convertPropsToConfig(connectorTaskId: ConnectorTaskId, props: Map[String, String]): Either[Throwable, C] - private def createWriterMan(props: Map[String, String]): Either[Throwable, WriterManager[MD]] = + private def createWriterMan( + props: Map[String, String], + ): Either[Throwable, (Option[IndexManager[MD]], WriterManager[MD])] = for { config <- convertPropsToConfig(connectorTaskId, props) s3Client <- createClient(config.connectionConfig) storageInterface = createStorageInterface(connectorTaskId, config, s3Client) _ <- setRetryInterval(config) - writerManager <- Try(writerManagerCreator.from(config)(connectorTaskId, storageInterface)).toEither - _ <- initializeFromConfig(config) + (maybeIndexManager, writerManager) <- Try( + writerManagerCreator.from(config)(connectorTaskId, storageInterface), + ).toEither + _ <- initializeFromConfig(config) } yield { logMetrics = config.logMetrics - writerManager + (maybeIndexManager, writerManager) } private def initializeFromConfig(config: C): Either[Throwable, Unit] = diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/WriterManagerCreator.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/WriterManagerCreator.scala index 008b6ce9b..bf1261944 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/WriterManagerCreator.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/WriterManagerCreator.scala @@ -33,6 +33,7 @@ import 
io.lenses.streamreactor.connect.cloud.common.sink.naming.KeyNamer import io.lenses.streamreactor.connect.cloud.common.sink.naming.ObjectKeyBuilder import io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManager import io.lenses.streamreactor.connect.cloud.common.sink.transformers.TopicsTransformers +import io.lenses.streamreactor.connect.cloud.common.sink.writer.WriterIndexer import io.lenses.streamreactor.connect.cloud.common.sink.writer.WriterManager import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface @@ -48,7 +49,7 @@ class WriterManagerCreator[MD <: FileMetadata, SC <: CloudSinkConfig[_]] extends implicit connectorTaskId: ConnectorTaskId, storageInterface: StorageInterface[MD], - ): WriterManager[MD] = { + ): (Option[IndexManager[MD]], WriterManager[MD]) = { val bucketAndPrefixFn: TopicPartition => Either[SinkError, CloudLocation] = topicPartition => { bucketOptsForTopic(config, topicPartition.topic) match { @@ -120,22 +121,28 @@ class WriterManagerCreator[MD <: FileMetadata, SC <: CloudSinkConfig[_]] extends case None => FatalCloudSinkError("Can't find format choice in config", topicPartition).asLeft } - val indexManager = new IndexManager( - config.indexOptions.maxIndexFiles, - new IndexFilenames(config.indexOptions.indexesDirectoryName), + val indexManager = config.indexOptions.map(io => + new IndexManager( + io.maxIndexFiles, + new IndexFilenames(io.indexesDirectoryName), + bucketAndPrefixFn, + ), ) + val writerIndexer = new WriterIndexer[MD](indexManager) val transformers = TopicsTransformers.from(config.bucketOptions) - new WriterManager( + val writerManager = new WriterManager( commitPolicyFn, bucketAndPrefixFn, keyNamerBuilderFn, stagingFilenameFn, finalFilenameFn, formatWriterFn, - indexManager, + writerIndexer, transformers.transform, + indexManager.map(_.getSeekedOffsetForTopicPartition), ) + (indexManager, writerManager) } private def bucketOptsForTopic(config: CloudSinkConfig[_], topic: Topic): Option[CloudSinkBucketOptions] = diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/seek/IndexManager.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/seek/IndexManager.scala index 09696d5c3..29d40163b 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/seek/IndexManager.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/seek/IndexManager.scala @@ -18,9 +18,12 @@ package io.lenses.streamreactor.connect.cloud.common.sink.seek import cats.implicits._ import com.typesafe.scalalogging.LazyLogging import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId +import io.lenses.streamreactor.connect.cloud.common.model.Offset import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition import io.lenses.streamreactor.connect.cloud.common.model.TopicPartitionOffset import io.lenses.streamreactor.connect.cloud.common.model.UploadableString +import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation +import io.lenses.streamreactor.connect.cloud.common.sink.BatchCloudSinkError import io.lenses.streamreactor.connect.cloud.common.sink.FatalCloudSinkError import io.lenses.streamreactor.connect.cloud.common.sink.NonFatalCloudSinkError import io.lenses.streamreactor.connect.cloud.common.sink.SinkError @@ -29,15 +32,22 @@ import 
io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManagerErrors import io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManagerErrors.fileDeleteError import io.lenses.streamreactor.connect.cloud.common.storage._ +import scala.collection.mutable + class IndexManager[SM <: FileMetadata]( - maxIndexes: Int, - indexFilenames: IndexFilenames, + maxIndexes: Int, + indexFilenames: IndexFilenames, + bucketAndPrefixFn: TopicPartition => Either[SinkError, CloudLocation], )( implicit connectorTaskId: ConnectorTaskId, storageInterface: StorageInterface[SM], ) extends LazyLogging { + // A mutable map that stores the latest offset for each TopicPartition that was seeked during the initialization of the Kafka Connect SinkTask. + // The key is the TopicPartition and the value is the corresponding Offset. + private val seekedOffsets = mutable.Map.empty[TopicPartition, Offset] + /** * deletes all index files except for the one corresponding to topicPartitionOffset * @@ -160,7 +170,7 @@ class IndexManager[SM <: FileMetadata]( } } - private def seekAndClean( + def seekAndClean( topicPartition: TopicPartition, bucket: String, indexes: Seq[String], @@ -217,4 +227,61 @@ class IndexManager[SM <: FileMetadata]( case _ => result } } + + /** + * Opens the `IndexManager` for a set of `TopicPartition`s. + * + * This method is called at the start of a Kafka Connect SinkTask when it is first initialized. + * It seeks the filesystem to find the latest offsets for each `TopicPartition` in the provided set. + * The results are stored in the `seekedOffsets` map for later use. + * + * @param partitions A set of `TopicPartition`s for which to retrieve the offsets. + * @return Either a `SinkError` if an error occurred during the operation, or a `Map[TopicPartition, Offset]` containing the seeked offsets for each `TopicPartition`. + */ + def open(partitions: Set[TopicPartition]): Either[SinkError, Map[TopicPartition, Offset]] = { + logger.debug(s"[{}] Received call to WriterManager.open", connectorTaskId.show) + + partitions + .map(seekOffsetsForTopicPartition) + .partitionMap(identity) match { + case (throwables, _) if throwables.nonEmpty => BatchCloudSinkError(throwables).asLeft + case (_, offsets) => + val seeked = offsets.flatten.map( + _.toTopicPartitionOffsetTuple, + ).toMap + seekedOffsets ++= seeked + seeked.asRight + } + } + + /** + * Seeks the filesystem to find the latest offset for a specific `TopicPartition`. + * + * This method is used during the initialization of a Kafka Connect SinkTask to find the latest offset for a specific `TopicPartition`. + * The result is stored in the `seekedOffsets` map for later use. + * + * @param topicPartition The `TopicPartition` for which to retrieve the offset. + * @return Either a `SinkError` if an error occurred during the operation, or an `Option[TopicPartitionOffset]` containing the seeked offset for the `TopicPartition`. + */ + private def seekOffsetsForTopicPartition( + topicPartition: TopicPartition, + ): Either[SinkError, Option[TopicPartitionOffset]] = { + logger.debug(s"[{}] seekOffsetsForTopicPartition {}", connectorTaskId.show, topicPartition) + for { + bucketAndPrefix <- bucketAndPrefixFn(topicPartition) + offset <- initialSeek(topicPartition, bucketAndPrefix.bucket) + } yield offset + } + + /** + * Retrieves the latest offset for a specific `TopicPartition` that was seeked during the initialization of the Kafka Connect SinkTask. + * + * This method is used to get the latest offset for a specific `TopicPartition` from the `seekedOffsets` map. 
+ * + * @param topicPartition The `TopicPartition` for which to retrieve the offset. + * @return An `Option[Offset]` containing the seeked offset for the `TopicPartition` if it exists, or `None` if it does not. + */ + def getSeekedOffsetForTopicPartition(topicPartition: TopicPartition): Option[Offset] = + seekedOffsets.get(topicPartition) + } diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/Writer.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/Writer.scala index 972310fc0..7bdecf488 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/Writer.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/Writer.scala @@ -29,7 +29,6 @@ import io.lenses.streamreactor.connect.cloud.common.sink.SinkError import io.lenses.streamreactor.connect.cloud.common.sink.commit.CloudCommitContext import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy import io.lenses.streamreactor.connect.cloud.common.sink.naming.ObjectKeyBuilder -import io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManager import io.lenses.streamreactor.connect.cloud.common.storage._ import org.apache.kafka.connect.data.Schema @@ -40,7 +39,7 @@ import scala.util.Try class Writer[SM <: FileMetadata]( topicPartition: TopicPartition, commitPolicy: CommitPolicy, - indexManager: IndexManager[SM], + writerIndexer: WriterIndexer[SM], stagingFilenameFn: () => Either[SinkError, File], objectKeyBuilder: ObjectKeyBuilder, formatWriterFn: File => Either[SinkError, FormatWriter], @@ -117,10 +116,10 @@ class Writer[SM <: FileMetadata]( for { key <- objectKeyBuilder.build(uncommittedOffset, earliestRecordTimestamp, latestRecordTimestamp) path <- key.path.toRight(NonFatalCloudSinkError("No path exists within cloud location")) - indexFileName <- indexManager.write( - key.bucket, - path, - topicPartition.withOffset(uncommittedOffset), + maybeIndexFileName: Option[String] <- writerIndexer.writeIndex(topicPartition, + key.bucket, + uncommittedOffset, + path, ) _ <- storageInterface.uploadFile(UploadableFile(file), key.bucket, path) .recover { @@ -130,7 +129,7 @@ class Writer[SM <: FileMetadata]( .leftMap { case UploadFailedError(exception, _) => NonFatalCloudSinkError(exception.getMessage, exception.some) } - _ <- indexManager.clean(key.bucket, indexFileName, topicPartition) + _ <- writerIndexer.cleanIndex(topicPartition, key, maybeIndexFileName) stateReset <- Try { logger.debug(s"[{}] Writer.resetState: Resetting state $writeState", connectorTaskId.show) writeState = uploadState.toNoWriter diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/WriterIndexer.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/WriterIndexer.scala new file mode 100644 index 000000000..17a964b1a --- /dev/null +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/WriterIndexer.scala @@ -0,0 +1,92 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.cloud.common.sink.writer + +import cats.implicits.catsSyntaxOptionId +import cats.implicits.toTraverseOps +import io.lenses.streamreactor.connect.cloud.common.model.Offset +import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition +import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation +import io.lenses.streamreactor.connect.cloud.common.sink.SinkError +import io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManager +import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata + +/** + * The `WriterIndexer` class provides the indexing operations for a Writer. + * It handles the optional nature of the `IndexManager` and provides methods for writing and cleaning the index. + * + * @constructor create a new `WriterIndexer` with an optional `IndexManager`. + * @param maybeIndexManager An optional `IndexManager`. If provided, `WriterIndexer` will use it to perform index operations. + * @tparam SM The type of `FileMetadata` used by the `IndexManager`. + */ +class WriterIndexer[SM <: FileMetadata](maybeIndexManager: Option[IndexManager[SM]]) { + + /** + * Executes a function with the `IndexManager` and a provided value if both are present. + * + * @param option An optional value of type `A`. If present, the function `f` will be executed with this value and the `IndexManager`. + * @param f A function that takes an `IndexManager` and a value of type `A` and returns an `Either[SinkError, B]`. + * @tparam A The type of the optional value. + * @tparam B The type of the result of the function `f`. + * @return An `Either[SinkError, Option[B]]` that contains a `SinkError` if the operation failed, or an `Option[B]` with the result of the function `f` if the operation succeeded. + */ + private def withIndexManager[A, B]( + option: Option[A], + )(f: (IndexManager[SM], A) => Either[SinkError, B], + ): Either[SinkError, Option[B]] = { + for { + indexManager <- maybeIndexManager + value <- option + } yield f(indexManager, value) + }.sequence + + /** + * Writes an index entry. + * + * @param topicPartition The `TopicPartition` for which to write the index entry. + * @param bucket The bucket in which the index entry is to be written. + * @param uncommittedOffset The `Offset` to be written to the index. + * @param path The path where the index entry is to be written. + * @return An `Either` that contains a `SinkError` if the operation failed, or an `Option[String]` with the index entry if the operation succeeded. + */ + def writeIndex( + topicPartition: TopicPartition, + bucket: String, + uncommittedOffset: Offset, + path: String, + ): Either[SinkError, Option[String]] = + withIndexManager(path.some) { (indexManager, path) => + indexManager.write(bucket, path, topicPartition.withOffset(uncommittedOffset)) + } + + /** + * Cleans an index entry. + * + * @param topicPartition The `TopicPartition` for which to clean the index entry. + * @param key The `CloudLocation` of the index entry to be cleaned. + * @param maybeIndexFileName An optional index file name. 
If provided, the method will clean the index entry in this file. + * @return An `Either` that contains a `SinkError` if the operation failed, or an `Option[Int]` with the result of the clean operation if the operation succeeded. + */ + def cleanIndex( + topicPartition: TopicPartition, + key: CloudLocation, + maybeIndexFileName: Option[String], + ): Either[SinkError, Option[Int]] = + withIndexManager(maybeIndexFileName) { (indexManager, indexFileName) => + indexManager.clean(key.bucket, indexFileName, topicPartition) + } + +} diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/WriterManager.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/WriterManager.scala index 89e9aeec7..2fc99af77 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/WriterManager.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/WriterManager.scala @@ -18,21 +18,20 @@ package io.lenses.streamreactor.connect.cloud.common.sink.writer import cats.implicits._ import com.typesafe.scalalogging.StrictLogging import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId -import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail import io.lenses.streamreactor.connect.cloud.common.formats.writer.FormatWriter -import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation +import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail import io.lenses.streamreactor.connect.cloud.common.model.Offset import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition import io.lenses.streamreactor.connect.cloud.common.model.TopicPartitionOffset +import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation import io.lenses.streamreactor.connect.cloud.common.sink -import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy -import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionField -import io.lenses.streamreactor.connect.cloud.common.sink.naming.ObjectKeyBuilder -import io.lenses.streamreactor.connect.cloud.common.sink.naming.KeyNamer -import io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManager import io.lenses.streamreactor.connect.cloud.common.sink.BatchCloudSinkError import io.lenses.streamreactor.connect.cloud.common.sink.FatalCloudSinkError import io.lenses.streamreactor.connect.cloud.common.sink.SinkError +import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy +import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionField +import io.lenses.streamreactor.connect.cloud.common.sink.naming.KeyNamer +import io.lenses.streamreactor.connect.cloud.common.sink.naming.ObjectKeyBuilder import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface import org.apache.kafka.clients.consumer.OffsetAndMetadata @@ -61,8 +60,9 @@ class WriterManager[SM <: FileMetadata]( stagingFilenameFn: (TopicPartition, Map[PartitionField, String]) => Either[SinkError, File], objKeyBuilderFn: (TopicPartition, Map[PartitionField, String]) => ObjectKeyBuilder, formatWriterFn: (TopicPartition, File) => Either[SinkError, FormatWriter], - indexManager: IndexManager[SM], + writerIndexer: WriterIndexer[SM], transformerF: MessageDetail => Either[RuntimeException, 
MessageDetail], + getOffsetFn: Option[TopicPartition => Option[Offset]], )( implicit connectorTaskId: ConnectorTaskId, @@ -71,8 +71,6 @@ class WriterManager[SM <: FileMetadata]( private val writers = mutable.Map.empty[MapKey, Writer[SM]] - private val seekedOffsets = mutable.Map.empty[TopicPartition, Offset] - def commitAllWritersIfFlushRequired(): Either[BatchCloudSinkError, Unit] = if (writers.values.exists(_.shouldFlush)) { commitAllWriters() @@ -127,32 +125,6 @@ class WriterManager[SM <: FileMetadata]( } - def open(partitions: Set[TopicPartition]): Either[SinkError, Map[TopicPartition, Offset]] = { - logger.debug(s"[{}] Received call to WriterManager.open", connectorTaskId.show) - - partitions - .map(seekOffsetsForTopicPartition) - .partitionMap(identity) match { - case (throwables, _) if throwables.nonEmpty => BatchCloudSinkError(throwables).asLeft - case (_, offsets) => - val seeked = offsets.flatten.map( - _.toTopicPartitionOffsetTuple, - ).toMap - seekedOffsets ++= seeked - seeked.asRight - } - } - - private def seekOffsetsForTopicPartition( - topicPartition: TopicPartition, - ): Either[SinkError, Option[TopicPartitionOffset]] = { - logger.debug(s"[{}] seekOffsetsForTopicPartition {}", connectorTaskId.show, topicPartition) - for { - bucketAndPrefix <- bucketAndPrefixFn(topicPartition) - offset <- indexManager.initialSeek(topicPartition, bucketAndPrefix.bucket) - } yield offset - } - def close(): Unit = { logger.debug(s"[{}] Received call to WriterManager.close", connectorTaskId.show) writers.values.foreach(_.close()) @@ -244,11 +216,11 @@ class WriterManager[SM <: FileMetadata]( new Writer( topicPartition, commitPolicy, - indexManager, + writerIndexer, () => stagingFilenameFn(topicPartition, partitionValues), objKeyBuilderFn(topicPartition, partitionValues), formatWriterFn.curried(topicPartition), - seekedOffsets.get(topicPartition), + getOffsetFn.map(_(topicPartition)).orNull, ) } } diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/WriterManagerCreatorTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/WriterManagerCreatorTest.scala index 26a9fbb61..b3c954e21 100644 --- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/WriterManagerCreatorTest.scala +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/WriterManagerCreatorTest.scala @@ -15,6 +15,7 @@ */ package io.lenses.streamreactor.connect.cloud.common.sink +import cats.implicits.catsSyntaxOptionId import io.lenses.streamreactor.common.config.base.RetryConfig import io.lenses.streamreactor.common.config.base.intf.ConnectionConfig import io.lenses.streamreactor.common.errors.NoopErrorPolicy @@ -24,22 +25,24 @@ import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName import io.lenses.streamreactor.connect.cloud.common.sink.config.CloudSinkBucketOptions import io.lenses.streamreactor.connect.cloud.common.sink.config.IndexOptions +import io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManager import io.lenses.streamreactor.connect.cloud.common.sink.writer.WriterManager import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface import org.mockito.MockitoSugar +import org.scalatest.OptionValues import org.scalatest.funsuite.AnyFunSuite import 
org.scalatest.matchers.should.Matchers import java.time.Instant -class WriterManagerCreatorTest extends AnyFunSuite with Matchers with MockitoSugar { +class WriterManagerCreatorTest extends AnyFunSuite with Matchers with MockitoSugar with OptionValues { case class FakeConnectionConfig() extends ConnectionConfig case class FakeCloudSinkConfig( connectionConfig: FakeConnectionConfig, bucketOptions: Seq[CloudSinkBucketOptions], - indexOptions: IndexOptions, + indexOptions: Option[IndexOptions], compressionCodec: CompressionCodec, connectorRetryConfig: RetryConfig, errorPolicy: NoopErrorPolicy, @@ -57,15 +60,16 @@ class WriterManagerCreatorTest extends AnyFunSuite with Matchers with MockitoSug val config = FakeCloudSinkConfig( connectionConfig = FakeConnectionConfig(), bucketOptions = Seq.empty, - indexOptions = IndexOptions(maxIndexFiles = 10, ".indexes"), + indexOptions = IndexOptions(maxIndexFiles = 10, ".indexes").some, compressionCodec = CompressionCodecName.ZSTD.toCodec(), errorPolicy = NoopErrorPolicy(), connectorRetryConfig = new RetryConfig(1, 1L, 1.0), ) - val writerManagerCreator = new WriterManagerCreator[FakeFileMetadata, FakeCloudSinkConfig]() - val writerManager = writerManagerCreator.from(config) + val writerManagerCreator = new WriterManagerCreator[FakeFileMetadata, FakeCloudSinkConfig]() + val (indexManager, writerManager) = writerManagerCreator.from(config) writerManager shouldBe a[WriterManager[_]] + indexManager.value shouldBe a[IndexManager[_]] } } diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/seek/IndexManagerTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/seek/IndexManagerTest.scala index ec284146e..7593514b8 100644 --- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/seek/IndexManagerTest.scala +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/seek/IndexManagerTest.scala @@ -17,14 +17,17 @@ package io.lenses.streamreactor.connect.cloud.common.sink.seek import cats.implicits.catsSyntaxEitherId import cats.implicits.catsSyntaxOptionId +import cats.implicits.none import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId import io.lenses.streamreactor.connect.cloud.common.model.Offset import io.lenses.streamreactor.connect.cloud.common.model.Topic import io.lenses.streamreactor.connect.cloud.common.model.UploadableString +import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation import io.lenses.streamreactor.connect.cloud.common.sink.FatalCloudSinkError import io.lenses.streamreactor.connect.cloud.common.sink.NonFatalCloudSinkError import io.lenses.streamreactor.connect.cloud.common.sink.naming.IndexFilenames import io.lenses.streamreactor.connect.cloud.common.storage._ +import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.cloudLocationValidator import org.mockito.ArgumentMatchers.any import org.mockito.ArgumentMatchers.anyString import org.mockito.ArgumentMatchers.{ eq => eqTo } @@ -53,7 +56,11 @@ class IndexManagerTest extends AnyFlatSpec with MockitoSugar with EitherValues w private val maxIndexes = 5 - private val indexManager = new IndexManager(maxIndexes, new IndexFilenames(".indexes")) + private val indexManager = new IndexManager( + maxIndexes, + new IndexFilenames(".indexes"), + _ => CloudLocation(targetPath, none, none, none).asRight, + ) before { 
when(storageInterface.system()).thenReturn("TestaCloud") @@ -375,4 +382,5 @@ class IndexManagerTest extends AnyFlatSpec with MockitoSugar with EitherValues w val result = indexManager.handleSeekAndCleanErrors(fileNameParseError) result shouldBe a[NonFatalCloudSinkError] } + } diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/WriterIndexerTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/WriterIndexerTest.scala new file mode 100644 index 000000000..3ed09a9b1 --- /dev/null +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/WriterIndexerTest.scala @@ -0,0 +1,81 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.cloud.common.sink.writer + +import cats.data.Validated +import cats.implicits.catsSyntaxOptionId +import cats.implicits.none +import io.lenses.streamreactor.connect.cloud.common.model.Offset +import io.lenses.streamreactor.connect.cloud.common.model.Topic +import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition +import io.lenses.streamreactor.connect.cloud.common.model.TopicPartitionOffset +import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation +import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator +import io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManager +import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata +import org.mockito.ArgumentMatchers._ +import org.mockito.MockitoSugar +import org.scalatest.BeforeAndAfter +import org.scalatest.EitherValues +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +class WriterIndexerTest extends AnyFlatSpec with Matchers with MockitoSugar with EitherValues with BeforeAndAfter { + + private implicit val cloudLocationValidator: CloudLocationValidator = (location: CloudLocation) => + Validated.Valid(location) + + private val mockIndexManager: IndexManager[FileMetadata] = mock[IndexManager[FileMetadata]] + private val writerIndexer: WriterIndexer[FileMetadata] = new WriterIndexer(Some(mockIndexManager)) + + private val bucket = "myTestBucket" + private val topicPartition: TopicPartition = TopicPartition(Topic("testTopic"), 1) + private val cloudLocation: CloudLocation = CloudLocation(bucket, "testKey".some) + + before { + reset(mockIndexManager) + } + + "writeIndex" should "call IndexManager's write method with correct parameters" in { + val uncommittedOffset = Offset(100) + val path = "testPath" + + when(mockIndexManager.write(anyString, anyString, any[TopicPartitionOffset])).thenReturn(Right("index")) + + val result = writerIndexer.writeIndex(topicPartition, bucket, uncommittedOffset, path) + result.value should be(Some("index")) + + verify(mockIndexManager).write(bucket, path, topicPartition.withOffset(uncommittedOffset)) + } + 
+ "cleanIndex" should "call IndexManager's clean method with correct parameters" in { + val indexFileName = "indexFileName" + + when(mockIndexManager.clean(anyString, anyString, any[TopicPartition])).thenReturn(Right(1)) + + val result = writerIndexer.cleanIndex(topicPartition, cloudLocation, indexFileName.some) + result.value should be(Some(1)) + + verify(mockIndexManager).clean(bucket, indexFileName, topicPartition) + } + + "cleanIndex" should "not call IndexManager when indexFileName is None" in { + val result = writerIndexer.cleanIndex(topicPartition, cloudLocation, none) + result.value should be(None) + + verifyZeroInteractions(mockIndexManager) + } + +} diff --git a/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/sink/config/GCPStorageSinkConfig.scala b/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/sink/config/GCPStorageSinkConfig.scala index c87a21921..70cc0b13a 100644 --- a/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/sink/config/GCPStorageSinkConfig.scala +++ b/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/sink/config/GCPStorageSinkConfig.scala @@ -69,7 +69,7 @@ object GCPStorageSinkConfig extends PropsToConfigConverter[GCPStorageSinkConfig] case class GCPStorageSinkConfig( connectionConfig: GCPConnectionConfig, bucketOptions: Seq[CloudSinkBucketOptions] = Seq.empty, - indexOptions: IndexOptions, + indexOptions: Option[IndexOptions], compressionCodec: CompressionCodec, avoidResumableUpload: Boolean, connectorRetryConfig: RetryConfig, diff --git a/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/config/GCPConfigSettingsTest.scala b/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/config/GCPConfigSettingsTest.scala index ca481a53a..3f51ee6f5 100644 --- a/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/config/GCPConfigSettingsTest.scala +++ b/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/config/GCPConfigSettingsTest.scala @@ -29,7 +29,7 @@ class GCPConfigSettingsTest extends AnyFlatSpec with Matchers with LazyLogging { val configKeys = GCPStorageSinkConfigDef.config.configKeys().keySet().asScala - configKeys.size shouldBe 25 + configKeys.size shouldBe 26 configKeys.foreach { k => k.toLowerCase should be(k) }