From e9df8d2ed0645acf25833beea6ca2f218979bc69 Mon Sep 17 00:00:00 2001 From: David Sloan <33483659+davidsloan@users.noreply.github.com> Date: Mon, 12 Aug 2024 10:01:36 +0100 Subject: [PATCH] Introduce File Extension Filtering Logic for GCP Storage and AWS S3 Connectors (#64) This PR introduces new filtering logic to the GCP Storage Source and AWS S3 Source connectors, allowing users to include or exclude files based on their extensions during the source file search process. This enhancement provides finer control over which files are processed, improving the flexibility and efficiency of data ingestion. GCP Storage Source Connector New Properties: connect.gcpstorage.source.extension.excludes: Description: A comma-separated list of file extensions to exclude from the source file search. If this property is not configured, all files are considered. Default: null (No filtering is enabled by default; all files are considered) connect.gcpstorage.source.extension.includes: Description: A comma-separated list of file extensions to include in the source file search. If this property is not configured, all files are considered. Default: null (All extensions are included by default) AWS S3 Source Connector New Properties: connect.s3.source.extension.excludes: Description: A comma-separated list of file extensions to exclude from the source file search. If this property is not configured, all files are considered. Default: null (No filtering is enabled by default; all files are considered) connect.s3.source.extension.includes: Description: A comma-separated list of file extensions to include in the source file search. If this property is not configured, all files are considered. Default: null (All extensions are included by default) How It Works Include Filtering: If the source.extension.includes property is set, only files with extensions listed in this property will be considered for processing. 
Exclude Filtering: If the source.extension.excludes property is set, files with extensions listed in this property will be ignored during processing. Combined Use: When both properties are set, the connector will only include files that match the includes property and do not match the excludes property. Use Cases: Inclusion: Users can specify certain file types to process (e.g., .csv, .json), ensuring that only relevant files are ingested. Exclusion: Users can exclude files with extensions that should not be processed (e.g., temporary files like .tmp or backup files like .bak). * Source extension filters: part 1 * Wiring in * Addressing review comments * Making documentation more specific --- .../aws/s3/utils/S3ProxyContainerTest.scala | 10 +- .../connect/aws/s3/sink/S3SinkTask.scala | 6 +- .../connect/aws/s3/source/S3SourceTask.scala | 6 +- .../aws/s3/source/config/S3SourceConfig.scala | 3 + .../s3/source/config/S3SourceConfigDef.scala | 1 + .../s3/storage/AwsS3StorageInterface.scala | 5 +- .../aws/s3/config/S3ConfigSettingsTest.scala | 2 +- .../common/config/traits/CloudConfig.scala | 12 ++ .../source/config/CloudSourceSettings.scala | 30 ++++ .../config/CloudSourceSettingsKeys.scala | 55 ++++++- .../source/config/S3SourceBucketOptions.scala | 15 -- .../common/storage/ExtensionFilter.scala | 63 ++++++++ .../config/CloudSourceSettingsTest.scala | 69 ++++++++ .../common/storage/ExtensionFilterTest.scala | 150 ++++++++++++++++++ .../storage/utils/GCPProxyContainerTest.scala | 4 +- .../gcp/storage/sink/GCPStorageSinkTask.scala | 6 +- .../storage/source/GCPStorageSourceTask.scala | 6 +- .../config/GCPStorageSourceConfig.scala | 3 + .../config/GCPStorageSourceConfigDef.scala | 1 + .../storage/GCPStorageStorageInterface.scala | 53 +++++-- .../GCPStorageStorageInterfaceTest.scala | 3 +- 21 files changed, 460 insertions(+), 43 deletions(-) delete mode 100644 
kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/S3SourceBucketOptions.scala create mode 100644 kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/storage/ExtensionFilter.scala create mode 100644 kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/source/config/CloudSourceSettingsTest.scala create mode 100644 kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/storage/ExtensionFilterTest.scala diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/utils/S3ProxyContainerTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/utils/S3ProxyContainerTest.scala index d85998041..6730ad3f1 100644 --- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/utils/S3ProxyContainerTest.scala +++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/utils/S3ProxyContainerTest.scala @@ -1,8 +1,8 @@ package io.lenses.streamreactor.connect.aws.s3.utils import com.typesafe.scalalogging.LazyLogging import io.lenses.streamreactor.connect.aws.s3.auth.AwsS3ClientCreator -import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings._ import io.lenses.streamreactor.connect.aws.s3.config.AuthMode +import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings._ import io.lenses.streamreactor.connect.aws.s3.config.S3ConnectionConfig import io.lenses.streamreactor.connect.aws.s3.sink.S3SinkTask import io.lenses.streamreactor.connect.aws.s3.sink.config.S3SinkConfig @@ -44,7 +44,13 @@ trait S3ProxyContainerTest override val prefix: String = "connect.s3" override def createStorageInterface(client: S3Client): Either[Throwable, AwsS3StorageInterface] = - Try(new AwsS3StorageInterface(connectorTaskId, client, true)).toEither + Try( + new AwsS3StorageInterface(connectorTaskId = connectorTaskId, + s3Client = client, + 
batchDelete = true, + extensionFilter = Option.empty, + ), + ).toEither override def createClient(): Either[Throwable, S3Client] = { diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTask.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTask.scala index bc0a1c028..e992b50cd 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTask.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTask.scala @@ -50,7 +50,11 @@ class S3SinkTask config: S3SinkConfig, cloudClient: S3Client, ): AwsS3StorageInterface = - new AwsS3StorageInterface(connectorTaskId, cloudClient, config.batchDelete) + new AwsS3StorageInterface(connectorTaskId = connectorTaskId, + s3Client = cloudClient, + batchDelete = config.batchDelete, + extensionFilter = Option.empty, + ) override def createClient(config: S3ConnectionConfig): Either[Throwable, S3Client] = AwsS3ClientCreator.make(config) diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceTask.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceTask.scala index b5a762284..3d8111cbb 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceTask.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceTask.scala @@ -43,7 +43,11 @@ class S3SourceTask config: S3SourceConfig, s3Client: S3Client, ): AwsS3StorageInterface = - new AwsS3StorageInterface(connectorTaskId, s3Client, config.batchDelete) + new AwsS3StorageInterface(connectorTaskId = connectorTaskId, + s3Client = s3Client, + batchDelete = config.batchDelete, + extensionFilter = config.extensionFilter, + ) override def createClient(config: S3SourceConfig): Either[Throwable, S3Client] = AwsS3ClientCreator.make(config.connectionConfig) 
diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfig.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfig.scala index 5f64167e9..64b7af0c3 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfig.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfig.scala @@ -25,6 +25,7 @@ import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator import io.lenses.streamreactor.connect.cloud.common.source.config.CloudSourceBucketOptions import io.lenses.streamreactor.connect.cloud.common.source.config.PartitionSearcherOptions +import io.lenses.streamreactor.connect.cloud.common.storage.ExtensionFilter object S3SourceConfig extends PropsToConfigConverter[S3SourceConfig] { @@ -52,6 +53,7 @@ object S3SourceConfig extends PropsToConfigConverter[S3SourceConfig] { s3ConfigDefBuilder.getCompressionCodec(), s3ConfigDefBuilder.getPartitionSearcherOptions(parsedValues), s3ConfigDefBuilder.batchDelete(), + s3ConfigDefBuilder.getSourceExtensionFilter, ) } @@ -64,4 +66,5 @@ case class S3SourceConfig( compressionCodec: CompressionCodec, partitionSearcher: PartitionSearcherOptions, batchDelete: Boolean, + extensionFilter: Option[ExtensionFilter], ) extends CloudSourceConfig[S3FileMetadata] diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfigDef.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfigDef.scala index 22c2c9aca..df432c999 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfigDef.scala +++ 
b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfigDef.scala @@ -30,5 +30,6 @@ object S3SourceConfigDef extends S3CommonConfigDef with CloudSourceSettingsKeys addSourceOrderingSettings(settings) addSourcePartitionSearcherSettings(settings) addSourcePartitionExtractorSettings(settings) + addSourceFilteringSettings(settings) } } diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3StorageInterface.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3StorageInterface.scala index 6b6cf5175..881023d75 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3StorageInterface.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3StorageInterface.scala @@ -20,6 +20,7 @@ import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId import io.lenses.streamreactor.connect.cloud.common.config.ObjectMetadata import io.lenses.streamreactor.connect.cloud.common.model.UploadableFile import io.lenses.streamreactor.connect.cloud.common.model.UploadableString +import io.lenses.streamreactor.connect.cloud.common.storage.ExtensionFilter import io.lenses.streamreactor.connect.cloud.common.storage.FileCreateError import io.lenses.streamreactor.connect.cloud.common.storage.FileDeleteError import io.lenses.streamreactor.connect.cloud.common.storage.FileListError @@ -49,6 +50,7 @@ class AwsS3StorageInterface( connectorTaskId: ConnectorTaskId, s3Client: S3Client, batchDelete: Boolean, + extensionFilter: Option[ExtensionFilter], ) extends StorageInterface[S3FileMetadata] with LazyLogging { @@ -74,6 +76,7 @@ class AwsS3StorageInterface( .asScala .filterNot(AwsS3StorageFilter.filterOut) .map(o => S3FileMetadata(o.key(), o.lastModified())) + .filter(md => extensionFilter.forall(_.filter(md))) processAsKey( bucket, @@ -121,7 +124,7 @@ class 
AwsS3StorageInterface( pagReq.iterator().asScala.flatMap( _.contents().asScala.filterNot(AwsS3StorageFilter.filterOut).toSeq.map(o => S3FileMetadata(o.key(), o.lastModified()), - ), + ).filter(md => extensionFilter.forall(_.filter(md))), ).toSeq, ) }.toEither.leftMap { diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettingsTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettingsTest.scala index cbab1e27d..ccab6d4f0 100644 --- a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettingsTest.scala +++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettingsTest.scala @@ -35,7 +35,7 @@ class S3ConfigSettingsTest extends AnyFlatSpec with Matchers with LazyLogging { val configKeys = S3SinkConfigDef.config.configKeys().keySet().asScala ++ S3SourceConfigDef.config.configKeys().keySet().asScala - configKeys.size shouldBe 51 + configKeys.size shouldBe 53 configKeys.foreach { k => k.toLowerCase should be(k) } diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/traits/CloudConfig.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/traits/CloudConfig.scala index 3bf141922..a52bae388 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/traits/CloudConfig.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/traits/CloudConfig.scala @@ -22,6 +22,7 @@ import io.lenses.streamreactor.connect.cloud.common.sink.config.CloudSinkBucketO import io.lenses.streamreactor.connect.cloud.common.sink.config.IndexOptions import io.lenses.streamreactor.connect.cloud.common.source.config.CloudSourceBucketOptions import 
io.lenses.streamreactor.connect.cloud.common.source.config.PartitionSearcherOptions +import io.lenses.streamreactor.connect.cloud.common.storage.ExtensionFilter import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata /** @@ -99,4 +100,15 @@ trait CloudSourceConfig[MD <: FileMetadata] extends CloudConfig { * @return The partition searcher options for the cloud source. */ def partitionSearcher: PartitionSearcherOptions + + /** + * Retrieves the extension filter for the cloud source, if configured. + * + * The extension filter is used to include or exclude files + * based on their extensions when reading from the cloud source. + * + * @return Option containing the extension filter for the cloud source. + */ + def extensionFilter: Option[ExtensionFilter] + } diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/CloudSourceSettings.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/CloudSourceSettings.scala index 830b57238..add34b2e6 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/CloudSourceSettings.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/CloudSourceSettings.scala @@ -19,6 +19,7 @@ import io.lenses.streamreactor.common.config.base.traits.BaseSettings import io.lenses.streamreactor.connect.cloud.common.config.ConfigParse import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEntry import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnum +import io.lenses.streamreactor.connect.cloud.common.storage.ExtensionFilter import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata import io.lenses.streamreactor.connect.config.kcqlprops.KcqlProperties @@ -52,4 +53,33 @@ trait CloudSourceSettings extends BaseSettings with CloudSourceSettingsKeys { 
wildcardExcludes = getString(PARTITION_SEARCH_INDEX_EXCLUDES).split(',').toSet[String].map(_.trim), ) + /** + * Retrieves the extension filter for the source. + * + * The extension filter is used to include or exclude files + * based on their extensions when reading from the source. + * + * @return An Option containing the extension filter for the source, or None if neither the includes nor the excludes property is configured. + */ + def getSourceExtensionFilter: Option[ExtensionFilter] = { + + val includes = extractSetFromProperty(SOURCE_EXTENSION_INCLUDES) + val excludes = extractSetFromProperty(SOURCE_EXTENSION_EXCLUDES) + Option.when(includes.nonEmpty || excludes.nonEmpty)(new ExtensionFilter(includes.getOrElse(Set.empty), + excludes.getOrElse(Set.empty), + )) + } + + /** + * Extracts the property value from the configuration and transforms it into a set of strings. + * + * Each string in the set represents a file extension. If the extension does not start with a dot, one is added. + * + * @param propertyName The name of the property to extract. + * @return An Option containing a set of strings if the property exists, None otherwise. 
+ */ + private def extractSetFromProperty(propertyName: String): Option[Set[String]] = + Option(getString(propertyName)).map(_.split(",").map(_.toLowerCase).map(s => + if (s.startsWith(".")) s else s".$s", + ).toSet) } diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/CloudSourceSettingsKeys.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/CloudSourceSettingsKeys.scala index 98682c86e..6df64b4db 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/CloudSourceSettingsKeys.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/CloudSourceSettingsKeys.scala @@ -39,6 +39,49 @@ trait CloudSourceSettingsKeys extends WithConnectorPrefix { "Comma-separated list of directory prefixes to exclude from the partition search" private val PARTITION_SEARCH_INDEX_EXCLUDES_DEFAULT: String = ".indexes" + protected val SOURCE_EXTENSION_EXCLUDES: String = s"$connectorPrefix.source.extension.excludes" + private val SOURCE_EXTENSION_EXCLUDES_DOC: String = + "Comma-separated list of file extensions to exclude from the source file search. If not configured, no files will be excluded. When used in conjunction with 'source.extension.includes', files must match the includes list and not match the excludes list to be considered." + private val SOURCE_EXTENSION_EXCLUDES_DEFAULT: String = null + + protected val SOURCE_EXTENSION_INCLUDES: String = s"$connectorPrefix.source.extension.includes" + private val SOURCE_EXTENSION_INCLUDES_DOC: String = + "Comma-separated list of file extensions to include in the source file search. If not configured, all files are considered. When used in conjunction with 'source.extension.excludes', files must match the includes list and not match the excludes list to be considered." 
+ private val SOURCE_EXTENSION_INCLUDES_DEFAULT: String = null + + /** + * Adds source filtering settings to the provided ConfigDef. + * + * The settings include the file extensions to include and exclude when searching for source files. + * + * @param configDef The ConfigDef to which the settings are added. + * @return The ConfigDef with the added settings. + */ + def addSourceFilteringSettings(configDef: ConfigDef): ConfigDef = + configDef + .define( + SOURCE_EXTENSION_EXCLUDES, + Type.STRING, + SOURCE_EXTENSION_EXCLUDES_DEFAULT, + Importance.LOW, + SOURCE_EXTENSION_EXCLUDES_DOC, + "Source Filtering", + 2, + ConfigDef.Width.LONG, + SOURCE_EXTENSION_EXCLUDES, + ) + .define( + SOURCE_EXTENSION_INCLUDES, + Type.STRING, + SOURCE_EXTENSION_INCLUDES_DEFAULT, + Importance.LOW, + SOURCE_EXTENSION_INCLUDES_DOC, + "Source Filtering", + 1, + ConfigDef.Width.LONG, + SOURCE_EXTENSION_INCLUDES, + ) + def addSourceOrderingSettings(configDef: ConfigDef): ConfigDef = configDef .define( @@ -100,15 +143,15 @@ trait CloudSourceSettingsKeys extends WithConnectorPrefix { ) val SOURCE_PARTITION_EXTRACTOR_TYPE = s"$connectorPrefix.source.partition.extractor.type" - val SOURCE_PARTITION_EXTRACTOR_TYPE_DOC = + private val SOURCE_PARTITION_EXTRACTOR_TYPE_DOC = "If you want to read to specific partitions when running the source. Options are 'hierarchical' (to match the sink's hierarchical file storage pattern) and 'regex' (supply a custom regex). Any other value will ignore original partitions and they should be evenly distributed through available partitions (Kafka dependent)." - val SOURCE_PARTITION_EXTRACTOR_REGEX = s"$connectorPrefix.source.partition.extractor.regex" - val SOURCE_PARTITION_EXTRACTOR_REGEX_DOC = "If reading filename from regex, supply the regex here." + val SOURCE_PARTITION_EXTRACTOR_REGEX = s"$connectorPrefix.source.partition.extractor.regex" + private val SOURCE_PARTITION_EXTRACTOR_REGEX_DOC = "If reading filename from regex, supply the regex here." 
- val SOURCE_ORDERING_TYPE: String = s"$connectorPrefix.ordering.type" - val SOURCE_ORDERING_TYPE_DOC: String = "AlphaNumeric (the default)" - val SOURCE_ORDERING_TYPE_DEFAULT: String = "AlphaNumeric" + val SOURCE_ORDERING_TYPE: String = s"$connectorPrefix.ordering.type" + private val SOURCE_ORDERING_TYPE_DOC: String = "AlphaNumeric (the default)" + private val SOURCE_ORDERING_TYPE_DEFAULT: String = "AlphaNumeric" def addSourcePartitionExtractorSettings(configDef: ConfigDef): ConfigDef = configDef.define( SOURCE_PARTITION_EXTRACTOR_TYPE, diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/S3SourceBucketOptions.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/S3SourceBucketOptions.scala deleted file mode 100644 index 861a5e2ce..000000000 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/S3SourceBucketOptions.scala +++ /dev/null @@ -1,15 +0,0 @@ -/* - * Copyright 2017-2024 Lenses.io Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/storage/ExtensionFilter.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/storage/ExtensionFilter.scala new file mode 100644 index 000000000..0ce9a8722 --- /dev/null +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/storage/ExtensionFilter.scala @@ -0,0 +1,63 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.cloud.common.storage + +/** + * A class used to filter files based on their extensions. + * It allows files with certain extensions to be included or excluded. + * + * @constructor create a new ExtensionFilter with allowed and excluded extensions. + * @param allowedExtensions the set of extensions that are allowed. + * @param excludedExtensions the set of extensions that are excluded. + */ +class ExtensionFilter( + val allowedExtensions: Set[String], + val excludedExtensions: Set[String], +) { + + /** + * Filters the metadata of a file based on its extension. + * + * @param metadata the metadata of the file to be filtered. + * @return true if the file passes the filter, false otherwise. 
+ */ + def filter[MD <: FileMetadata](metadata: MD): Boolean = + ExtensionFilter.performFilterLogic(metadata.file.toLowerCase, allowedExtensions, excludedExtensions) + + /** + * Filters a file based on its name. + * + * @param fileName the name of the file to be filtered. + * @return true if the file passes the filter, false otherwise. + */ + def filter(fileName: String): Boolean = + ExtensionFilter.performFilterLogic(fileName.toLowerCase, allowedExtensions, excludedExtensions) + +} + +object ExtensionFilter { + + def performFilterLogic( + fileName: String, + allowedExtensions: Set[String], + excludedExtensions: Set[String], + ): Boolean = { + val allowedContainsEx = allowedExtensions.exists(ext => fileName.endsWith(ext)) + val excludedNotContainsEx = excludedExtensions.forall(ext => !fileName.endsWith(ext)) + (allowedExtensions.isEmpty || allowedContainsEx) && excludedNotContainsEx + } + +} diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/source/config/CloudSourceSettingsTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/source/config/CloudSourceSettingsTest.scala new file mode 100644 index 000000000..d24da6410 --- /dev/null +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/source/config/CloudSourceSettingsTest.scala @@ -0,0 +1,69 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.cloud.common.source.config + +import org.apache.kafka.common.config.types.Password +import org.scalatest.OptionValues +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import java.lang +import java.util + +class CloudSourceSettingsTest extends AnyFlatSpec with Matchers with OptionValues { + + "getSourceExtensionFilter" should "return an ExtensionFilter with correct includes and excludes" in { + val settings: CloudSourceSettings = mockSettingsObject( + includes = ".txt,.csv", + excludes = ".log", + ) + + val filter = settings.getSourceExtensionFilter.value + filter.allowedExtensions should be(Set(".txt", ".csv")) + filter.excludedExtensions should be(Set(".log")) + } + + "getSourceExtensionFilter" should "return an ExtensionFilter with correct includes and if no dots used" in { + val settings: CloudSourceSettings = mockSettingsObject( + includes = "txt", + excludes = "log,csv", + ) + + val filter = settings.getSourceExtensionFilter.value + filter.allowedExtensions should be(Set(".txt")) + filter.excludedExtensions should be(Set(".log", ".csv")) + } + + private def mockSettingsObject(includes: String, excludes: String) = new CloudSourceSettings { + override def getString(key: String): String = key match { + case SOURCE_EXTENSION_INCLUDES => includes + case SOURCE_EXTENSION_EXCLUDES => excludes + case _ => "" + } + + override def getInt(key: String): Integer = ??? + + override def getLong(key: String): lang.Long = ??? + + override def getBoolean(key: String): lang.Boolean = ??? + + override def getPassword(key: String): Password = ??? + + override def getList(key: String): util.List[String] = ??? 
+ + override def connectorPrefix: String = "my.connector" + } +} diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/storage/ExtensionFilterTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/storage/ExtensionFilterTest.scala new file mode 100644 index 000000000..513630961 --- /dev/null +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/storage/ExtensionFilterTest.scala @@ -0,0 +1,150 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.cloud.common.storage + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import java.time.Instant + +class ExtensionFilterTest extends AnyFlatSpec with Matchers { + + case class TestFileMetadata(file: String) extends FileMetadata { + override def lastModified: Instant = Instant.now() + } + + "ExtensionFilter" should "work with extensions with multiple dot characters" in { + val filter = new ExtensionFilter(Set(".tar.gz"), Set.empty) + + val metadata = TestFileMetadata("myfile.tar.gz") + + filter.filter(metadata) should be(true) + } + + "ExtensionFilter" should "allow all extensions if allowedExtensions is None" in { + val filter = new ExtensionFilter(Set.empty, Set.empty) + + val metadata = TestFileMetadata("file.txt") + + filter.filter(metadata) should be(true) + } + + it should "allow only specified extensions when allowedExtensions is defined" in { + val filter = new ExtensionFilter(Set(".txt", ".md"), Set.empty) + + val txtMetadata = TestFileMetadata("file.txt") + val mdMetadata = TestFileMetadata("file.md") + val csvMetadata = TestFileMetadata("file.csv") + + filter.filter(txtMetadata) should be(true) + filter.filter(mdMetadata) should be(true) + filter.filter(csvMetadata) should be(false) + } + + it should "exclude specified extensions when excludedExtensions is defined" in { + val filter = new ExtensionFilter(Set.empty, Set(".exe", ".bin")) + + val txtMetadata = TestFileMetadata("file.txt") + val exeMetadata = TestFileMetadata("file.exe") + + filter.filter(txtMetadata) should be(true) + filter.filter(exeMetadata) should be(false) + } + + it should "allow only allowed extensions and not excluded extensions" in { + val filter = new ExtensionFilter(Set(".txt", ".md"), Set(".exe", ".bin")) + + val txtMetadata = TestFileMetadata("file.txt") + val mdMetadata = TestFileMetadata("file.md") + val exeMetadata = TestFileMetadata("file.exe") + val binMetadata = 
TestFileMetadata("file.bin") + + filter.filter(txtMetadata) should be(true) + filter.filter(mdMetadata) should be(true) + filter.filter(exeMetadata) should be(false) + filter.filter(binMetadata) should be(false) + } + + it should "not allow any extensions if all are excluded" in { + val filter = new ExtensionFilter(Set(".txt", ".md"), Set(".txt", ".md", ".exe", ".bin")) + + val txtMetadata = TestFileMetadata("file.txt") + val mdMetadata = TestFileMetadata("file.md") + + filter.filter(txtMetadata) should be(false) + filter.filter(mdMetadata) should be(false) + } + + it should "allow all extensions if allowedExtensions and excludedExtensions are both None" in { + val filter = new ExtensionFilter(Set.empty, Set.empty) + + val txtMetadata = TestFileMetadata("file.txt") + val mdMetadata = TestFileMetadata("file.md") + val exeMetadata = TestFileMetadata("file.exe") + + filter.filter(txtMetadata) should be(true) + filter.filter(mdMetadata) should be(true) + filter.filter(exeMetadata) should be(true) + } + + it should "skip files with no extension if extension filter is configured" in { + val filter = new ExtensionFilter(Set(".txt"), Set.empty) + + val horseMetadata = TestFileMetadata("horses") + val catMetadata = TestFileMetadata("cats") + val donkeyMetadata = TestFileMetadata("donkey") + val txtMetadata = TestFileMetadata("file.txt") + + filter.filter(horseMetadata) should be(false) + filter.filter(catMetadata) should be(false) + filter.filter(donkeyMetadata) should be(false) + + filter.filter(txtMetadata) should be(true) + + } + + "performFilterLogic" should "return true when filename matches allowed extension and not in excluded extensions" in { + val fileName = "test.txt" + val maybeAllowedExtensions = Set(".txt") + val maybeExcludedExtensions = Set(".exe") + val result = ExtensionFilter.performFilterLogic(fileName, maybeAllowedExtensions, maybeExcludedExtensions) + result should be(true) + } + + it should "return false when filename matches excluded extension" in { + 
val fileName = "test.exe" + val maybeAllowedExtensions = Set(".txt") + val maybeExcludedExtensions = Set(".exe") + val result = ExtensionFilter.performFilterLogic(fileName, maybeAllowedExtensions, maybeExcludedExtensions) + result should be(false) + } + + it should "return false when filename does not match allowed extension" in { + val fileName = "test.doc" + val maybeAllowedExtensions = Set(".txt") + val maybeExcludedExtensions = Set(".exe") + val result = ExtensionFilter.performFilterLogic(fileName, maybeAllowedExtensions, maybeExcludedExtensions) + result should be(false) + } + + it should "return true when no extensions are specified" in { + val fileName = "test.doc" + val maybeAllowedExtensions = Set.empty[String] + val maybeExcludedExtensions = Set.empty[String] + val result = ExtensionFilter.performFilterLogic(fileName, maybeAllowedExtensions, maybeExcludedExtensions) + result should be(true) + } +} diff --git a/kafka-connect-gcp-storage/src/it/scala/io/lenses/streamreactor/connect/gcp/storage/utils/GCPProxyContainerTest.scala b/kafka-connect-gcp-storage/src/it/scala/io/lenses/streamreactor/connect/gcp/storage/utils/GCPProxyContainerTest.scala index 7333b1d32..95e4cb9bf 100644 --- a/kafka-connect-gcp-storage/src/it/scala/io/lenses/streamreactor/connect/gcp/storage/utils/GCPProxyContainerTest.scala +++ b/kafka-connect-gcp-storage/src/it/scala/io/lenses/streamreactor/connect/gcp/storage/utils/GCPProxyContainerTest.scala @@ -49,7 +49,9 @@ trait GCPProxyContainerTest override val prefix: String = "connect.gcpstorage" override def createStorageInterface(client: Storage): Either[Throwable, GCPStorageStorageInterface] = - Try(new GCPStorageStorageInterface(connectorTaskId, client, true)).toEither + Try( + new GCPStorageStorageInterface(connectorTaskId, client, true, Option.empty), + ).toEither override def createClient(): Either[Throwable, Storage] = { diff --git 
a/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/sink/GCPStorageSinkTask.scala b/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/sink/GCPStorageSinkTask.scala index 7205c7efd..89dc67687 100644 --- a/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/sink/GCPStorageSinkTask.scala +++ b/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/sink/GCPStorageSinkTask.scala @@ -50,7 +50,11 @@ class GCPStorageSinkTask config: GCPStorageSinkConfig, cloudClient: Storage, ): StorageInterface[GCPStorageFileMetadata] = - new GCPStorageStorageInterface(connectorTaskId, cloudClient, avoidReumableUpload = config.avoidResumableUpload) + new GCPStorageStorageInterface(connectorTaskId, + storage = cloudClient, + avoidReumableUpload = config.avoidResumableUpload, + extensionFilter = Option.empty, + ) override def convertPropsToConfig( connectorTaskId: ConnectorTaskId, diff --git a/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/source/GCPStorageSourceTask.scala b/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/source/GCPStorageSourceTask.scala index f4c129eb6..698c878b3 100644 --- a/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/source/GCPStorageSourceTask.scala +++ b/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/source/GCPStorageSourceTask.scala @@ -44,7 +44,11 @@ class GCPStorageSourceTask config: GCPStorageSourceConfig, storage: Storage, ): GCPStorageStorageInterface = - new GCPStorageStorageInterface(connectorTaskId, storage = storage, avoidReumableUpload = false) + new GCPStorageStorageInterface(connectorTaskId, + storage = storage, + avoidReumableUpload = false, + extensionFilter = config.extensionFilter, + ) override def createClient(config: GCPStorageSourceConfig): Either[Throwable, Storage] 
= GCPStorageClientCreator.make(config.connectionConfig) diff --git a/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/source/config/GCPStorageSourceConfig.scala b/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/source/config/GCPStorageSourceConfig.scala index 45cbca230..e3ccda27c 100644 --- a/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/source/config/GCPStorageSourceConfig.scala +++ b/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/source/config/GCPStorageSourceConfig.scala @@ -23,6 +23,7 @@ import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator import io.lenses.streamreactor.connect.cloud.common.source.config.CloudSourceBucketOptions import io.lenses.streamreactor.connect.cloud.common.source.config.PartitionSearcherOptions +import io.lenses.streamreactor.connect.cloud.common.storage.ExtensionFilter import io.lenses.streamreactor.connect.gcp.common.auth.GCPConnectionConfig import io.lenses.streamreactor.connect.gcp.storage.model.location.GCPStorageLocationValidator import io.lenses.streamreactor.connect.gcp.storage.storage.GCPStorageFileMetadata @@ -56,6 +57,7 @@ object GCPStorageSourceConfig extends PropsToConfigConverter[GCPStorageSourceCon sbo, gcpConfigDefBuilder.getCompressionCodec(), gcpConfigDefBuilder.getPartitionSearcherOptions(parsedValues), + gcpConfigDefBuilder.getSourceExtensionFilter, ) } @@ -67,4 +69,5 @@ case class GCPStorageSourceConfig( bucketOptions: Seq[CloudSourceBucketOptions[GCPStorageFileMetadata]] = Seq.empty, compressionCodec: CompressionCodec, partitionSearcher: PartitionSearcherOptions, + extensionFilter: Option[ExtensionFilter], ) extends CloudSourceConfig[GCPStorageFileMetadata] diff --git 
a/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/source/config/GCPStorageSourceConfigDef.scala b/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/source/config/GCPStorageSourceConfigDef.scala index 24a4f4697..9f62cb0b4 100644 --- a/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/source/config/GCPStorageSourceConfigDef.scala +++ b/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/source/config/GCPStorageSourceConfigDef.scala @@ -31,6 +31,7 @@ object GCPStorageSourceConfigDef extends CommonConfigDef with CloudSourceSetting addSourceOrderingSettings(settings) addSourcePartitionSearcherSettings(settings) addSourcePartitionExtractorSettings(settings) + addSourceFilteringSettings(settings) } } diff --git a/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/storage/GCPStorageStorageInterface.scala b/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/storage/GCPStorageStorageInterface.scala index 341cdc9cc..31f7c1bb2 100644 --- a/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/storage/GCPStorageStorageInterface.scala +++ b/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/storage/GCPStorageStorageInterface.scala @@ -29,6 +29,7 @@ import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId import io.lenses.streamreactor.connect.cloud.common.config.ObjectMetadata import io.lenses.streamreactor.connect.cloud.common.model.UploadableFile import io.lenses.streamreactor.connect.cloud.common.model.UploadableString +import io.lenses.streamreactor.connect.cloud.common.storage.ExtensionFilter import io.lenses.streamreactor.connect.cloud.common.storage.FileCreateError import io.lenses.streamreactor.connect.cloud.common.storage.FileDeleteError import 
io.lenses.streamreactor.connect.cloud.common.storage.FileListError @@ -48,8 +49,12 @@ import scala.collection.immutable.Seq import scala.jdk.CollectionConverters.IterableHasAsScala import scala.util.Try -class GCPStorageStorageInterface(connectorTaskId: ConnectorTaskId, storage: Storage, avoidReumableUpload: Boolean) - extends StorageInterface[GCPStorageFileMetadata] +class GCPStorageStorageInterface( + connectorTaskId: ConnectorTaskId, + storage: Storage, + avoidReumableUpload: Boolean, + extensionFilter: Option[ExtensionFilter], +) extends StorageInterface[GCPStorageFileMetadata] with LazyLogging { override def uploadFile(source: UploadableFile, bucket: String, path: String): Either[UploadError, Unit] = { logger.debug(s"[{}] GCP Uploading file from local {} to Storage {}:{}", connectorTaskId.show, source, bucket, path) @@ -189,7 +194,10 @@ class GCPStorageStorageInterface(connectorTaskId: ConnectorTaskId, storage: Stor Option.when(accumulatedKeys.nonEmpty)(fnResultWrapperCreate(accumulatedKeys, latestCreated)) case Some(page: Page[Blob]) => val pages = page.getValues.asScala - val newKeys: Seq[E] = accumulatedKeys ++ pages.map(p => fnListElementCreate(p)) + val newKeys: Seq[E] = + accumulatedKeys ++ pages.filter(p => extensionFilter.forall(_.filter(p.getName))).map(p => + fnListElementCreate(p), + ) val newLatestCreated = pages.lastOption.map(_.getCreateTimeOffsetDateTime.toInstant).orElse(latestCreated) listKeysRecursiveHelper(Option(page.getNextPage), newKeys, newLatestCreated) } @@ -207,13 +215,15 @@ class GCPStorageStorageInterface(connectorTaskId: ConnectorTaskId, storage: Stor listFilesRecursive( bucket, prefix, - (accumulatedKeys: Seq[String], latestCreated: Option[Instant]) => + (accumulatedKeys: Seq[String], latestCreated: Option[Instant]) => { + val filteredKeys = filterKeys(accumulatedKeys) ListOfKeysResponse[GCPStorageFileMetadata]( bucket, prefix, - accumulatedKeys, - GCPStorageFileMetadata(accumulatedKeys.last, latestCreated.get), - ), + 
filteredKeys, + GCPStorageFileMetadata(filteredKeys.last, latestCreated.get), + ) + }, _.getName, ) @@ -224,16 +234,35 @@ class GCPStorageStorageInterface(connectorTaskId: ConnectorTaskId, storage: Stor listFilesRecursive( bucket, prefix, - (accumulatedKeys: Seq[GCPStorageFileMetadata], _: Option[Instant]) => + (accumulatedKeys: Seq[GCPStorageFileMetadata], _: Option[Instant]) => { + val filteredKeys: Seq[GCPStorageFileMetadata] = filterKeys(accumulatedKeys) ListOfMetadataResponse[GCPStorageFileMetadata]( bucket, prefix, - accumulatedKeys, - accumulatedKeys.last, - ), + filteredKeys, + filteredKeys.last, + ) + }, p => GCPStorageFileMetadata(p.getName, p.getCreateTimeOffsetDateTime.toInstant), ) + trait Filterable[T] { + def filter(extensionFilter: ExtensionFilter, t: T): Boolean + } + + implicit val stringFilterable: Filterable[String] = (extensionFilter: ExtensionFilter, t: String) => + extensionFilter.filter(t) + + implicit val metadataFilterable: Filterable[GCPStorageFileMetadata] = + (extensionFilter: ExtensionFilter, t: GCPStorageFileMetadata) => extensionFilter.filter(t.file) + + def filterKeys[T: Filterable](accumulatedKeys: Seq[T]): Seq[T] = { + val filterable = implicitly[Filterable[T]] + extensionFilter + .map(ex => accumulatedKeys.filter(filterable.filter(ex, _))) + .getOrElse(accumulatedKeys) + } + override def list( bucket: String, prefix: Option[String], @@ -250,7 +279,7 @@ class GCPStorageStorageInterface(connectorTaskId: ConnectorTaskId, storage: Stor case Left(ex) => FileListError(ex, bucket, prefix).asLeft case Right(page) => val pageValues = page.getValues.asScala - val keys = pageValues.map(_.getName).filterNot(f => lastFile.map(_.file).contains(f)).toSeq + val keys = filterKeys(pageValues.map(_.getName).toSeq).filterNot(f => lastFile.map(_.file).contains(f)) logger.trace( s"[${connectorTaskId.show}] Last file: $lastFile, Prefix: $prefix Page: ${pageValues.map(_.getName)}, Keys: $keys", ) diff --git 
a/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/GCPStorageStorageInterfaceTest.scala b/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/GCPStorageStorageInterfaceTest.scala index 9ec80aa8f..7915fe9fa 100644 --- a/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/GCPStorageStorageInterfaceTest.scala +++ b/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/GCPStorageStorageInterfaceTest.scala @@ -85,7 +85,8 @@ class GCPStorageStorageInterfaceTest private val connectorTaskId: ConnectorTaskId = ConnectorTaskId("connector", 1, 1) - private val storageInterface = new GCPStorageStorageInterface(connectorTaskId, client, false) + private val storageInterface = + new GCPStorageStorageInterface(connectorTaskId, client, false, Option.empty) private val bucket = "test-bucket" //private val prefix = "test-prefix".some