SMT for Source Partition/Offset Value

davidsloan committed Jun 18, 2024 · 1 parent 0e622f7 · commit 95f6c9f

Showing 2 changed files with 135 additions and 14 deletions.
InsertSourcePartitionOrOffsetValue.md (new file, 88 additions)
# InsertSourcePartitionOrOffsetValue

## Description

The `InsertSourcePartitionOrOffsetValue` transformation in Kafka Connect allows you to insert headers into SourceRecords based on partition or offset values. This is useful for adding metadata to your data records before they are sent to destinations like AWS S3, Azure Datalake, or GCP Storage.

## Note

This SMT only works with source connectors.

## Configuration

To use this transformation, you need to configure it in your Kafka Connect connector properties.

### Configuration Properties

| Configuration Property | Description | Optionality | Default Value |
|------------------------|---------------------------------------------------------------|-------------|----------------|
| `offset.fields` | Comma-separated list of fields to retrieve from the offset | Optional | Empty list |
| `offset.prefix` | Optional prefix for offset keys | Optional | `"offset."` |
| `partition.fields`     | Comma-separated list of fields to retrieve from the partition | Optional    | Empty list     |
| `partition.prefix` | Optional prefix for partition keys | Optional | `"partition."` |

- **Default Value**: Specifies the default value assigned if no value is explicitly provided in the configuration.

These properties allow you to customize which fields from the offset and partition of a SourceRecord are added as headers, along with specifying optional prefixes for the header keys. Adjust these configurations based on your specific use case and data requirements.

### Example Configuration

```properties
transforms=InsertSourcePartitionOrOffsetValue
transforms.InsertSourcePartitionOrOffsetValue.type=io.lenses.connect.smt.header.InsertSourcePartitionOrOffsetValue
transforms.InsertSourcePartitionOrOffsetValue.offset.fields=path,line,ts
transforms.InsertSourcePartitionOrOffsetValue.partition.fields=container,prefix
```

### Explanation of Configuration

- `transforms`: This property lists the transformations to be applied to the records.
- `transforms.InsertSourcePartitionOrOffsetValue.type`: Specifies the class implementing the transformation (`InsertSourcePartitionOrOffsetValue` in this case).
- `transforms.InsertSourcePartitionOrOffsetValue.offset.fields`: Defines the fields from the offset to be inserted as headers in the SourceRecord. Replace `path,line,ts` with the actual field names you want to extract from the offset.
- `transforms.InsertSourcePartitionOrOffsetValue.partition.fields`: Defines the fields from the partition to be inserted as headers in the SourceRecord. Replace `container,prefix` with the actual field names you want to extract from the partition.
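Conceptually, the transformation looks up each configured field in the record's source offset (or partition) map and emits a header named `<prefix><field>`. The following standalone sketch illustrates that mapping; the class `HeaderSketch`, the helper `toHeaders`, and the sample offset values are hypothetical, not the connector's actual code:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: shows how selected offset/partition fields
// conceptually become header entries of the form "<prefix><field>" -> value.
public class HeaderSketch {

  // For each requested field present in the source map, emit "<prefix><field>" -> value.
  static Map<String, String> toHeaders(Map<String, ?> source, List<String> fields, String prefix) {
    Map<String, String> headers = new LinkedHashMap<>();
    for (String field : fields) {
      Object value = source.get(field);
      if (value != null) {
        headers.put(prefix + field, value.toString());
      }
    }
    return headers;
  }

  public static void main(String[] args) {
    // A file-based source connector might track its position like this (sample values):
    Map<String, Object> offset = Map.of("path", "data/in.csv", "line", 42, "ts", 1718700000000L);
    System.out.println(toHeaders(offset, List.of("path", "line", "ts"), "offset."));
    // {offset.path=data/in.csv, offset.line=42, offset.ts=1718700000000}
  }
}
```

Fields listed in the configuration but absent from the offset or partition map simply produce no header in this sketch.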

## Example Usage with Cloud Connectors

### AWS S3, Azure Datalake or GCP Storage

When using this transformation with the AWS S3, Azure Datalake or GCP Storage source connectors, you can configure your Kafka Connect connector as follows:

```properties
transforms=InsertSourcePartitionOrOffsetValue
transforms.InsertSourcePartitionOrOffsetValue.type=io.lenses.connect.smt.header.InsertSourcePartitionOrOffsetValue
transforms.InsertSourcePartitionOrOffsetValue.offset.fields=path,line,ts
transforms.InsertSourcePartitionOrOffsetValue.partition.fields=container,prefix
```

Replace `path,line,ts` and `container,prefix` with the actual field names you want to extract from the offset and partition. To customise the header key prefixes, set the `offset.prefix` and `partition.prefix` properties, as described in the next section.

By using the `InsertSourcePartitionOrOffsetValue` transformation, you can enrich your data records with additional metadata headers based on partition or offset values before they are delivered to your cloud storage destinations.


### Using the Prefix Feature in InsertSourcePartitionOrOffsetValue Transformation

The prefix feature in `InsertSourcePartitionOrOffsetValue` allows you to prepend a consistent identifier to each header key added based on partition or offset values from SourceRecords.

#### Configuration

Configure the transformation in your Kafka Connect connector properties:

```properties
transforms=InsertSourcePartitionOrOffsetValue
transforms.InsertSourcePartitionOrOffsetValue.type=io.lenses.connect.smt.header.InsertSourcePartitionOrOffsetValue
transforms.InsertSourcePartitionOrOffsetValue.offset.fields=path,line,ts
transforms.InsertSourcePartitionOrOffsetValue.offset.prefix=offset.
transforms.InsertSourcePartitionOrOffsetValue.partition.fields=container,prefix
transforms.InsertSourcePartitionOrOffsetValue.partition.prefix=partition.
```

- `offset.prefix`: Specifies the prefix for headers derived from offset values. Default is `"offset."`.
- `partition.prefix`: Specifies the prefix for headers derived from partition values. Default is `"partition."`.

#### Example Usage

By setting `offset.prefix=offset.` and `partition.prefix=partition.`, headers added based on offset and partition fields will have keys prefixed accordingly in the SourceRecord headers.

This configuration ensures clarity and organization when inserting metadata headers into your Kafka records, distinguishing them based on their source (offset or partition). Adjust prefixes (`offset.prefix` and `partition.prefix`) as per your naming conventions or requirements.
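To make the effect of custom prefixes concrete, here is a hypothetical fragment (the prefix values `src.offset.` and `src.partition.` and the field names are examples, not defaults) together with the header keys it would produce:

```properties
transforms.InsertSourcePartitionOrOffsetValue.offset.fields=path,line
transforms.InsertSourcePartitionOrOffsetValue.offset.prefix=src.offset.
transforms.InsertSourcePartitionOrOffsetValue.partition.fields=container
transforms.InsertSourcePartitionOrOffsetValue.partition.prefix=src.partition.
# Resulting header keys: src.offset.path, src.offset.line, src.partition.container
```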
io.lenses.connect.smt.header.InsertSourcePartitionOrOffsetValue (updated)

```java
// … license header …

package io.lenses.connect.smt.header;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
// … further imports elided …
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Transformation;

/**
 * A Kafka Connect transformation that inserts headers based on partition or offset values from
 * SourceRecords.
 */
public class InsertSourcePartitionOrOffsetValue implements Transformation<SourceRecord> {

  /** Default prefix for offset headers. */
  public static final String DEFAULT_PREFIX_OFFSET = "offset.";

  /** Default prefix for partition headers. */
  public static final String DEFAULT_PREFIX_PARTITION = "partition.";

  Configuration offsetConfig;
  Configuration partitionConfig;

  /** Internal class to hold configuration details for fields and prefixes. */
  static class Configuration {

    private final List<String> fields;
    private final String prefix;

    /**
     * Constructs a Configuration instance.
     *
     * @param fields List of fields to retrieve.
     * @param prefix Prefix to prepend to each field.
     */
    public Configuration(final List<String> fields, final String prefix) {
      this.fields = fields;
      this.prefix = prefix;
    }

    /**
     * Retrieves the list of fields.
     *
     * @return List of fields.
     */
    public List<String> getFields() {
      return fields;
    }

    /**
     * Retrieves the prefix.
     *
     * @return Prefix.
     */
    public String getPrefix() {
      return prefix;
    }
  }

  // … other members (including close()) elided …

  @Override
  public void configure(Map<String, ?> map) {
    offsetConfig =
        new Configuration(
            getFields(map, KEY_OFFSET_FIELDS),
            getPrefix(map, KEY_OFFSET_PREFIX, DEFAULT_PREFIX_OFFSET));
    partitionConfig =
        new Configuration(
            getFields(map, KEY_PARTITION_FIELDS),
            getPrefix(map, KEY_PARTITION_PREFIX, DEFAULT_PREFIX_PARTITION));
  }

  private static String getPrefix(Map<String, ?> map, String prefix, String defaultPrefix) {
    return Optional.ofNullable((String) map.get(prefix)).orElse(defaultPrefix);
  }

  private List<String> getFields(Map<String, ?> map, String offsetFields) {
    return Optional.ofNullable(map.get(offsetFields)).stream()
        .map(p -> extractList(offsetFields, p))
        .flatMap(p -> p.stream().map(Object::toString))
        .collect(Collectors.toList());
  }

  private static List<?> extractList(String offsetFields, Object p) {
    if (p instanceof List) {
      return (List<?>) p;
    } else if (p instanceof String) {
      var split = ((String) p).split(",");
      return Arrays.asList(split);
    } else {
      throw new IllegalStateException(
          offsetFields + " should be a List but is a " + p.getClass().getName());
    }
  }
}
```
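The `extractList` helper is worth noting: Kafka Connect may hand the configured fields to the transformation either as a `List` or as a raw comma-separated `String`, and this commit adds handling for both. A minimal standalone sketch of that behavior (the class name `ExtractListSketch` is hypothetical; the method body mirrors the helper above):

```java
import java.util.Arrays;
import java.util.List;

// Standalone sketch of the extractList behavior: the configured fields may
// arrive either as a List or as a comma-separated String.
public class ExtractListSketch {

  static List<?> extractList(String key, Object p) {
    if (p instanceof List) {
      return (List<?>) p;
    } else if (p instanceof String) {
      return Arrays.asList(((String) p).split(","));
    } else {
      throw new IllegalStateException(key + " should be a List but is a " + p.getClass().getName());
    }
  }

  public static void main(String[] args) {
    System.out.println(extractList("offset.fields", "path,line,ts")); // [path, line, ts]
    System.out.println(extractList("offset.fields", List.of("container"))); // [container]
  }
}
```

Any other type (for example a number) raises an `IllegalStateException` naming the offending property and its actual class.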
