Introduce File Extension Filtering Logic for GCP Storage and AWS S3 Connectors (#64)

This PR introduces new filtering logic to the GCP Storage Source and AWS S3 Source connectors, allowing users to include or exclude files based on their extensions during the source file search process. This enhancement provides finer control over which files are processed, improving the flexibility and efficiency of data ingestion.

GCP Storage Source Connector

New Properties:

connect.gcpstorage.source.extension.excludes:
Description: A comma-separated list of file extensions to exclude from the source file search. If this property is not configured, no files are excluded.
Default: null (No filtering is enabled by default; all files are considered)

connect.gcpstorage.source.extension.includes:
Description: A comma-separated list of file extensions to include in the source file search. If this property is not configured, all files are considered.
Default: null (All extensions are included by default)
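For example, to ingest only CSV and JSON files, the include list could be set as below. This is an illustrative sketch, shown as a Scala map of connector properties rather than a configuration taken from this PR; per the parsing logic added in this change, extensions may be given with or without a leading dot and are matched case-insensitively.

    // Illustrative properties only; the remaining connector settings are omitted.
    // "csv" and ".json" are both normalised to ".csv" / ".json" before matching.
    val gcpExtensionIncludeProps: Map[String, String] = Map(
      "connect.gcpstorage.source.extension.includes" -> "csv,.json"
    )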

AWS S3 Source Connector

New Properties:

connect.s3.source.extension.excludes:
Description: A comma-separated list of file extensions to exclude from the source file search. If this property is not configured, no files are excluded.
Default: null (No filtering is enabled by default; all files are considered)

connect.s3.source.extension.includes:
Description: A comma-separated list of file extensions to include in the source file search. If this property is not configured, all files are considered.
Default: null (All extensions are included by default)
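Similarly, an S3 source that should skip temporary and backup objects might set only the exclude list; again a sketch with illustrative values:

    // Illustrative properties only; objects ending in .tmp or .bak are skipped
    // during the source file search, and every other file is considered.
    val s3ExtensionExcludeProps: Map[String, String] = Map(
      "connect.s3.source.extension.excludes" -> "tmp,bak"
    )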

How It Works

Include Filtering: If the source.extension.includes property is set, only files with extensions listed in this property will be considered for processing.
Exclude Filtering: If the source.extension.excludes property is set, files with extensions listed in this property will be ignored during processing.
Combined Use: When both properties are set, the connector will only include files that match the includes property and do not match the excludes property.
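The combined rule can be sketched directly against the ExtensionFilter class introduced by this PR (the object keys below are hypothetical):

    import io.lenses.streamreactor.connect.cloud.common.storage.ExtensionFilter

    // Include only .csv and .json, but never .bak, mirroring the combined-use rule above.
    val filter = new ExtensionFilter(Set(".csv", ".json"), Set(".bak"))

    filter.filter("orders/2024/08/data.csv") // true: extension is in the include list
    filter.filter("orders/2024/08/data.xml") // false: extension is not in the include list
    filter.filter("orders/2024/08/data.bak") // false: extension is in the exclude list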

Use Cases:

Inclusion: Users can specify certain file types to process (e.g., .csv, .json), ensuring that only relevant files are ingested.
Exclusion: Users can exclude files with extensions that should not be processed (e.g., temporary files like .tmp or backup files like .bak).


* Source extension filters: part 1
* Wiring in
* Addressing review comments
* Making documentation more specific
davidsloan authored Aug 12, 2024
1 parent e902f7f commit e9df8d2
Showing 21 changed files with 460 additions and 43 deletions.
@@ -1,8 +1,8 @@
package io.lenses.streamreactor.connect.aws.s3.utils
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.aws.s3.auth.AwsS3ClientCreator
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings._
import io.lenses.streamreactor.connect.aws.s3.config.AuthMode
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings._
import io.lenses.streamreactor.connect.aws.s3.config.S3ConnectionConfig
import io.lenses.streamreactor.connect.aws.s3.sink.S3SinkTask
import io.lenses.streamreactor.connect.aws.s3.sink.config.S3SinkConfig
@@ -44,7 +44,13 @@ trait S3ProxyContainerTest
override val prefix: String = "connect.s3"

override def createStorageInterface(client: S3Client): Either[Throwable, AwsS3StorageInterface] =
Try(new AwsS3StorageInterface(connectorTaskId, client, true)).toEither
Try(
new AwsS3StorageInterface(connectorTaskId = connectorTaskId,
s3Client = client,
batchDelete = true,
extensionFilter = Option.empty,
),
).toEither

override def createClient(): Either[Throwable, S3Client] = {

@@ -50,7 +50,11 @@ class S3SinkTask
config: S3SinkConfig,
cloudClient: S3Client,
): AwsS3StorageInterface =
new AwsS3StorageInterface(connectorTaskId, cloudClient, config.batchDelete)
new AwsS3StorageInterface(connectorTaskId = connectorTaskId,
s3Client = cloudClient,
batchDelete = config.batchDelete,
extensionFilter = Option.empty,
)

override def createClient(config: S3ConnectionConfig): Either[Throwable, S3Client] =
AwsS3ClientCreator.make(config)
@@ -43,7 +43,11 @@ class S3SourceTask
config: S3SourceConfig,
s3Client: S3Client,
): AwsS3StorageInterface =
new AwsS3StorageInterface(connectorTaskId, s3Client, config.batchDelete)
new AwsS3StorageInterface(connectorTaskId = connectorTaskId,
s3Client = s3Client,
batchDelete = config.batchDelete,
extensionFilter = config.extensionFilter,
)

override def createClient(config: S3SourceConfig): Either[Throwable, S3Client] =
AwsS3ClientCreator.make(config.connectionConfig)
@@ -25,6 +25,7 @@ import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator
import io.lenses.streamreactor.connect.cloud.common.source.config.CloudSourceBucketOptions
import io.lenses.streamreactor.connect.cloud.common.source.config.PartitionSearcherOptions
import io.lenses.streamreactor.connect.cloud.common.storage.ExtensionFilter

object S3SourceConfig extends PropsToConfigConverter[S3SourceConfig] {

@@ -52,6 +53,7 @@ object S3SourceConfig extends PropsToConfigConverter[S3SourceConfig] {
s3ConfigDefBuilder.getCompressionCodec(),
s3ConfigDefBuilder.getPartitionSearcherOptions(parsedValues),
s3ConfigDefBuilder.batchDelete(),
s3ConfigDefBuilder.getSourceExtensionFilter,
)

}
@@ -64,4 +66,5 @@ case class S3SourceConfig(
compressionCodec: CompressionCodec,
partitionSearcher: PartitionSearcherOptions,
batchDelete: Boolean,
extensionFilter: Option[ExtensionFilter],
) extends CloudSourceConfig[S3FileMetadata]
@@ -30,5 +30,6 @@ object S3SourceConfigDef extends S3CommonConfigDef with CloudSourceSettingsKeys
addSourceOrderingSettings(settings)
addSourcePartitionSearcherSettings(settings)
addSourcePartitionExtractorSettings(settings)
addSourceFilteringSettings(settings)
}
}
@@ -20,6 +20,7 @@ import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
import io.lenses.streamreactor.connect.cloud.common.config.ObjectMetadata
import io.lenses.streamreactor.connect.cloud.common.model.UploadableFile
import io.lenses.streamreactor.connect.cloud.common.model.UploadableString
import io.lenses.streamreactor.connect.cloud.common.storage.ExtensionFilter
import io.lenses.streamreactor.connect.cloud.common.storage.FileCreateError
import io.lenses.streamreactor.connect.cloud.common.storage.FileDeleteError
import io.lenses.streamreactor.connect.cloud.common.storage.FileListError
@@ -49,6 +50,7 @@ class AwsS3StorageInterface(
connectorTaskId: ConnectorTaskId,
s3Client: S3Client,
batchDelete: Boolean,
extensionFilter: Option[ExtensionFilter],
) extends StorageInterface[S3FileMetadata]
with LazyLogging {

@@ -74,6 +76,7 @@ class AwsS3StorageInterface(
.asScala
.filterNot(AwsS3StorageFilter.filterOut)
.map(o => S3FileMetadata(o.key(), o.lastModified()))
.filter(md => extensionFilter.forall(_.filter(md)))

processAsKey(
bucket,
@@ -121,7 +124,7 @@ class AwsS3StorageInterface(
pagReq.iterator().asScala.flatMap(
_.contents().asScala.filterNot(AwsS3StorageFilter.filterOut).toSeq.map(o =>
S3FileMetadata(o.key(), o.lastModified()),
),
).filter(md => extensionFilter.forall(_.filter(md))),
).toSeq,
)
}.toEither.leftMap {
@@ -35,7 +35,7 @@ class S3ConfigSettingsTest extends AnyFlatSpec with Matchers with LazyLogging {
val configKeys =
S3SinkConfigDef.config.configKeys().keySet().asScala ++ S3SourceConfigDef.config.configKeys().keySet().asScala

configKeys.size shouldBe 51
configKeys.size shouldBe 53
configKeys.foreach {
k => k.toLowerCase should be(k)
}
@@ -22,6 +22,7 @@ import io.lenses.streamreactor.connect.cloud.common.sink.config.CloudSinkBucketO
import io.lenses.streamreactor.connect.cloud.common.sink.config.IndexOptions
import io.lenses.streamreactor.connect.cloud.common.source.config.CloudSourceBucketOptions
import io.lenses.streamreactor.connect.cloud.common.source.config.PartitionSearcherOptions
import io.lenses.streamreactor.connect.cloud.common.storage.ExtensionFilter
import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata

/**
@@ -99,4 +100,15 @@ trait CloudSourceConfig[MD <: FileMetadata] extends CloudConfig {
* @return The partition searcher options for the cloud source.
*/
def partitionSearcher: PartitionSearcherOptions

/**
* Retrieves the extension filter for the cloud source, if configured.
*
* The extension filter is used to include or exclude files
* based on their extensions when reading from the cloud source.
*
* @return Option containing the extension filter for the cloud source.
*/
def extensionFilter: Option[ExtensionFilter]

}
@@ -19,6 +19,7 @@ import io.lenses.streamreactor.common.config.base.traits.BaseSettings
import io.lenses.streamreactor.connect.cloud.common.config.ConfigParse
import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEntry
import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnum
import io.lenses.streamreactor.connect.cloud.common.storage.ExtensionFilter
import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata
import io.lenses.streamreactor.connect.config.kcqlprops.KcqlProperties

@@ -52,4 +53,33 @@ trait CloudSourceSettings extends BaseSettings with CloudSourceSettingsKeys {
wildcardExcludes = getString(PARTITION_SEARCH_INDEX_EXCLUDES).split(',').toSet[String].map(_.trim),
)

/**
* Retrieves the extension filter for the source.
*
* The extension filter is used to include or exclude files
* based on their extensions when reading from the source.
*
* @return The extension filter for the source.
*/
def getSourceExtensionFilter: Option[ExtensionFilter] = {

val includes = extractSetFromProperty(SOURCE_EXTENSION_INCLUDES)
val excludes = extractSetFromProperty(SOURCE_EXTENSION_EXCLUDES)
Option.when(includes.nonEmpty || excludes.nonEmpty)(new ExtensionFilter(includes.getOrElse(Set.empty),
excludes.getOrElse(Set.empty),
))
}

/**
* Extracts the property value from the configuration and transforms it into a set of strings.
*
* Each string in the set represents a file extension. If the extension does not start with a dot, one is added.
*
* @param propertyName The name of the property to extract.
* @return An Option containing a set of strings if the property exists, None otherwise.
*/
private def extractSetFromProperty(propertyName: String): Option[Set[String]] =
Option(getString(propertyName)).map(_.split(",").map(_.toLowerCase).map(s =>
if (s.startsWith(".")) s else s".$s",
).toSet)
}
@@ -39,6 +39,49 @@ trait CloudSourceSettingsKeys extends WithConnectorPrefix {
"Comma-separated list of directory prefixes to exclude from the partition search"
private val PARTITION_SEARCH_INDEX_EXCLUDES_DEFAULT: String = ".indexes"

protected val SOURCE_EXTENSION_EXCLUDES: String = s"$connectorPrefix.source.extension.excludes"
private val SOURCE_EXTENSION_EXCLUDES_DOC: String =
"Comma-separated list of file extensions to exclude from the source file search. If not configured, no files will be excluded. When used in conjunction with 'source.extension.includes', files must match the includes list and not match the excludes list to be considered."
private val SOURCE_EXTENSION_EXCLUDES_DEFAULT: String = null

protected val SOURCE_EXTENSION_INCLUDES: String = s"$connectorPrefix.source.extension.includes"
private val SOURCE_EXTENSION_INCLUDES_DOC: String =
"Comma-separated list of file extensions to include in the source file search. If not configured, all files are considered. When used in conjunction with 'source.extension.excludes', files must match the includes list and not match the excludes list to be considered."
private val SOURCE_EXTENSION_INCLUDES_DEFAULT: String = null

/**
* Adds source filtering settings to the provided ConfigDef.
*
* The settings include the file extensions to include and exclude when searching for source files.
*
* @param configDef The ConfigDef to which the settings are added.
* @return The ConfigDef with the added settings.
*/
def addSourceFilteringSettings(configDef: ConfigDef): ConfigDef =
configDef
.define(
SOURCE_EXTENSION_EXCLUDES,
Type.STRING,
SOURCE_EXTENSION_EXCLUDES_DEFAULT,
Importance.LOW,
SOURCE_EXTENSION_EXCLUDES_DOC,
"Source Filtering",
2,
ConfigDef.Width.LONG,
SOURCE_EXTENSION_EXCLUDES,
)
.define(
SOURCE_EXTENSION_INCLUDES,
Type.STRING,
SOURCE_EXTENSION_INCLUDES_DEFAULT,
Importance.LOW,
SOURCE_EXTENSION_INCLUDES_DOC,
"Source Filtering",
1,
ConfigDef.Width.LONG,
SOURCE_EXTENSION_INCLUDES,
)

def addSourceOrderingSettings(configDef: ConfigDef): ConfigDef =
configDef
.define(
@@ -100,15 +143,15 @@ trait CloudSourceSettingsKeys extends WithConnectorPrefix {
)

val SOURCE_PARTITION_EXTRACTOR_TYPE = s"$connectorPrefix.source.partition.extractor.type"
val SOURCE_PARTITION_EXTRACTOR_TYPE_DOC =
private val SOURCE_PARTITION_EXTRACTOR_TYPE_DOC =
"If you want to read to specific partitions when running the source. Options are 'hierarchical' (to match the sink's hierarchical file storage pattern) and 'regex' (supply a custom regex). Any other value will ignore original partitions and they should be evenly distributed through available partitions (Kafka dependent)."

val SOURCE_PARTITION_EXTRACTOR_REGEX = s"$connectorPrefix.source.partition.extractor.regex"
val SOURCE_PARTITION_EXTRACTOR_REGEX_DOC = "If reading filename from regex, supply the regex here."
val SOURCE_PARTITION_EXTRACTOR_REGEX = s"$connectorPrefix.source.partition.extractor.regex"
private val SOURCE_PARTITION_EXTRACTOR_REGEX_DOC = "If reading filename from regex, supply the regex here."

val SOURCE_ORDERING_TYPE: String = s"$connectorPrefix.ordering.type"
val SOURCE_ORDERING_TYPE_DOC: String = "AlphaNumeric (the default)"
val SOURCE_ORDERING_TYPE_DEFAULT: String = "AlphaNumeric"
val SOURCE_ORDERING_TYPE: String = s"$connectorPrefix.ordering.type"
private val SOURCE_ORDERING_TYPE_DOC: String = "AlphaNumeric (the default)"
private val SOURCE_ORDERING_TYPE_DEFAULT: String = "AlphaNumeric"

def addSourcePartitionExtractorSettings(configDef: ConfigDef): ConfigDef = configDef.define(
SOURCE_PARTITION_EXTRACTOR_TYPE,

This file was deleted.

@@ -0,0 +1,63 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.cloud.common.storage

/**
* A class used to filter files based on their extensions.
* It allows files with certain extensions to be included or excluded.
*
* @constructor create a new ExtensionFilter with allowed and excluded extensions.
* @param allowedExtensions the set of extensions that are allowed.
* @param excludedExtensions the set of extensions that are excluded.
*/
class ExtensionFilter(
val allowedExtensions: Set[String],
val excludedExtensions: Set[String],
) {

/**
* Filters the metadata of a file based on its extension.
*
* @param metadata the metadata of the file to be filtered.
* @return true if the file passes the filter, false otherwise.
*/
def filter[MD <: FileMetadata](metadata: MD): Boolean =
ExtensionFilter.performFilterLogic(metadata.file.toLowerCase, allowedExtensions, excludedExtensions)

/**
* Filters a file based on its name.
*
* @param fileName the name of the file to be filtered.
* @return true if the file passes the filter, false otherwise.
*/
def filter(fileName: String): Boolean =
ExtensionFilter.performFilterLogic(fileName.toLowerCase, allowedExtensions, excludedExtensions)

}

object ExtensionFilter {

def performFilterLogic(
fileName: String,
allowedExtensions: Set[String],
excludedExtensions: Set[String],
): Boolean = {
val allowedContainsEx = allowedExtensions.exists(ext => fileName.endsWith(ext))
val excludedNotContainsEx = excludedExtensions.forall(ext => !fileName.endsWith(ext))
(allowedExtensions.isEmpty || allowedContainsEx) && excludedNotContainsEx
}

}