Merge pull request #6 from lensesio/feat/LC-134
Simplify S3/Azure/GCS Sink Configuration
stheppi committed Mar 12, 2024
2 parents 93dbeb6 + c5f1678 commit a34b803
Showing 18 changed files with 2,108 additions and 1 deletion.
101 changes: 101 additions & 0 deletions InsertRecordTimestampHeaders.md
@@ -0,0 +1,101 @@
# Insert Record Timestamp Headers

## Description

A Kafka Connect Single Message Transform (SMT) that inserts date, year, month, day, hour, minute and second headers using
the record timestamp. If the record timestamp is null, the SMT uses the current system time.

The inserted headers are of type STRING. With this single SMT you can partition the data by `yyyy-MM-dd/HH`
or `yyyy/MM/dd/HH`, for example, instead of chaining several SMTs.

The following headers are inserted:

* date
* year
* month
* day
* hour
* minute
* second

All headers can be prefixed with a custom prefix. For example, if the prefix is `wallclock_`, then the headers will be:

* wallclock_date
* wallclock_year
* wallclock_month
* wallclock_day
* wallclock_hour
* wallclock_minute
* wallclock_second

When used with the Lenses connectors for S3, GCS or Azure Data Lake, the headers can be used to partition the data.
Assuming the headers are prefixed with `_`, here are a few KCQL examples:

```
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._date, _header._hour
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._year, _header._month, _header._day, _header._hour
```

## Configuration

| Name | Description | Type | Default | Importance |
|----------------------|-----------------------------------------------------------------|--------|------------|------------|
| `header.prefix.name` | Optional header prefix. | String | | Low |
| `date.format` | Optional Java date time formatter. | String | yyyy-MM-dd | Low |
| `year.format` | Optional Java date time formatter for the year component. | String | yyyy | Low |
| `month.format` | Optional Java date time formatter for the month component. | String | MM | Low |
| `day.format` | Optional Java date time formatter for the day component. | String | dd | Low |
| `hour.format` | Optional Java date time formatter for the hour component. | String | HH | Low |
| `minute.format` | Optional Java date time formatter for the minute component. | String | mm | Low |
| `second.format` | Optional Java date time formatter for the second component. | String | ss | Low |
| `timezone` | Optional. Sets the timezone. It can be any valid Java timezone. | String | UTC | Low |
| `locale` | Optional. Sets the locale. It can be any valid Java locale. | String | en | Low |
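
To make the defaults in the table concrete, here is a minimal Java sketch of how the seven header values could be derived from one timestamp. It assumes the SMT applies `java.time.format.DateTimeFormatter` patterns in the configured timezone and locale; the class `HeaderFormatSketch` and its `headers` method are hypothetical names used only for illustration and are not part of the SMT.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical illustration only; not the SMT's implementation.
final class HeaderFormatSketch {

    // Builds the seven header values for one timestamp given in epoch milliseconds.
    public static Map<String, String> headers(long epochMillis, String prefix, String zone, Locale locale) {
        ZonedDateTime ts = Instant.ofEpochMilli(epochMillis).atZone(ZoneId.of(zone));
        // Header name -> default pattern, as listed in the configuration table.
        String[][] defaults = {
            {"date", "yyyy-MM-dd"}, {"year", "yyyy"}, {"month", "MM"}, {"day", "dd"},
            {"hour", "HH"}, {"minute", "mm"}, {"second", "ss"}
        };
        Map<String, String> out = new LinkedHashMap<>();
        for (String[] d : defaults) {
            out.put(prefix + d[0], DateTimeFormatter.ofPattern(d[1], locale).format(ts));
        }
        return out;
    }

    public static void main(String[] args) {
        long ts = 1710279605000L; // 2024-03-12T21:40:05Z
        System.out.println(headers(ts, "wallclock_", "UTC", Locale.ENGLISH));
        // The same instant falls on the next calendar day in Asia/Kolkata (UTC+05:30).
        System.out.println(headers(ts, "", "Asia/Kolkata", Locale.ENGLISH));
    }
}
```

The second call shows why the `timezone` setting matters for partitioning: the same record lands in the `2024-03-12` partition under UTC and in `2024-03-13` under `Asia/Kolkata`.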

## Example

To insert the headers with the default settings, use the following configuration:

```properties
transforms=InsertWallclock
transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
```

To prefix the headers with `wallclock_`, use the following:

```properties
transforms=InsertWallclock
transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.InsertWallclock.header.prefix.name=wallclock_
```

To change the date format, use the following:

```properties
transforms=InsertWallclock
transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.InsertWallclock.date.format=yyyy-MM-dd
```

To use the timezone `Asia/Kolkata`, use the following:

```properties
transforms=InsertWallclock
transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.InsertWallclock.timezone=Asia/Kolkata
```

To partition S3, GCS, or Azure Data Lake output using a Hive-like partition name format, such
as `date=yyyy-MM-dd/hour=HH`, use the following SMT configuration:

```properties
transforms=InsertWallclock
transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.InsertWallclock.date.format="'date='yyyy-MM-dd"
transforms.InsertWallclock.hour.format="'hour='HH"
```

Then, in the KCQL setting, use the headers as partitioning keys:

```properties
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour
```
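
A note on Hive-style patterns: in Java's `DateTimeFormatter`, every ASCII letter is reserved as a pattern letter, so literal text such as `date=` has to be wrapped in single quotes. The sketch below assumes the SMT hands the configured format strings to `DateTimeFormatter.ofPattern`, which this document does not confirm; `HivePatternSketch` is a hypothetical helper for illustration only.

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration only; not the SMT's implementation.
final class HivePatternSketch {

    // Formats a timestamp with the given DateTimeFormatter pattern.
    public static String format(String pattern, ZonedDateTime ts) {
        return DateTimeFormatter.ofPattern(pattern).format(ts);
    }

    // Returns true when DateTimeFormatter accepts the pattern.
    public static boolean isValid(String pattern) {
        try {
            DateTimeFormatter.ofPattern(pattern);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ZonedDateTime ts = ZonedDateTime.of(2024, 3, 12, 21, 40, 5, 0, ZoneOffset.UTC);
        System.out.println(format("'date='yyyy-MM-dd", ts)); // date=2024-03-12
        System.out.println(format("'hour='HH", ts));         // hour=21
        // Unquoted, the letters in "date=" are parsed as pattern letters and 't' is not one.
        System.out.println(isValid("date=yyyy-MM-dd"));      // false
    }
}
```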
118 changes: 118 additions & 0 deletions InsertRollingRecordTimestampHeaders.md
@@ -0,0 +1,118 @@
# Insert Rolling Record Timestamp Headers

## Description

A Kafka Connect Single Message Transform (SMT) that inserts date, year, month, day, hour, minute and second headers using
the record timestamp and a rolling time window configuration. If the record timestamp is null, the SMT uses the current
system time.

The inserted headers are of type STRING. With this single SMT you can partition the data by `yyyy-MM-dd/HH`
or `yyyy/MM/dd/HH`, for example, instead of chaining several SMTs.

The following headers are inserted:

* date
* year
* month
* day
* hour
* minute
* second

All headers can be prefixed with a custom prefix. For example, if the prefix is `wallclock_`, then the headers will be:

* wallclock_date
* wallclock_year
* wallclock_month
* wallclock_day
* wallclock_hour
* wallclock_minute
* wallclock_second

When used with the Lenses connectors for S3, GCS or Azure Data Lake, the headers can be used to partition the data.
Assuming the headers are prefixed with `_`, here are a few KCQL examples:

```
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._date, _header._hour
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._year, _header._month, _header._day, _header._hour
```

## Configuration

| Name | Description | Type | Default | Importance |
|-----------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|------------|-------------------------|
| `header.prefix.name` | Optional header prefix. | String | | Low |
| `date.format` | Optional Java date time formatter. | String | yyyy-MM-dd | Low |
| `year.format` | Optional Java date time formatter for the year component. | String | yyyy | Low |
| `month.format` | Optional Java date time formatter for the month component. | String | MM | Low |
| `day.format` | Optional Java date time formatter for the day component. | String | dd | Low |
| `hour.format` | Optional Java date time formatter for the hour component. | String | HH | Low |
| `minute.format` | Optional Java date time formatter for the minute component. | String | mm | Low |
| `second.format` | Optional Java date time formatter for the second component. | String | ss | Low |
| `timezone` | Optional. Sets the timezone. It can be any valid Java timezone. | String | UTC | Low |
| `locale` | Optional. Sets the locale. It can be any valid Java locale. | String | en | Low |
| `rolling.window.type` | Sets the window type. Valid values are `hours`, `minutes` and `seconds`. | String | minutes | High |
| `rolling.window.size` | Sets the window size. It can be any positive integer, with an upper bound that depends on `rolling.window.type`: 60 for seconds and minutes, and 24 for hours. | Int | 15 | High |
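
As a rough illustration of what a rolling window could do to a timestamp, the sketch below rounds a time down to the start of its window, so that with `rolling.window.type=minutes` and `rolling.window.size=15` a record stamped 10:37:42 would get the headers of 10:30:00. The rounding-down behaviour is an assumption made for this example, not something this document states, and `RollingWindowSketch` is a hypothetical class.

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical illustration only; assumes windows are aligned to the start of the hour or day.
final class RollingWindowSketch {

    // Rounds the timestamp down to the start of the window it falls into.
    public static ZonedDateTime windowStart(ZonedDateTime ts, String type, int size) {
        switch (type) {
            case "hours":
                return ts.truncatedTo(ChronoUnit.HOURS).withHour(ts.getHour() / size * size);
            case "minutes":
                return ts.truncatedTo(ChronoUnit.MINUTES).withMinute(ts.getMinute() / size * size);
            case "seconds":
                return ts.truncatedTo(ChronoUnit.SECONDS).withSecond(ts.getSecond() / size * size);
            default:
                throw new IllegalArgumentException("Unknown window type: " + type);
        }
    }

    public static void main(String[] args) {
        ZonedDateTime ts = ZonedDateTime.of(2024, 3, 12, 10, 37, 42, 0, ZoneOffset.UTC);
        System.out.println(windowStart(ts, "minutes", 15)); // window starting at 10:30:00
        System.out.println(windowStart(ts, "hours", 6));    // window starting at 06:00:00
        System.out.println(windowStart(ts, "seconds", 20)); // window starting at 10:37:40
    }
}
```

Under this assumption, all records within the same 15-minute window share identical `hour` and `minute` header values, which keeps the number of partitions bounded.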

## Example

To insert the headers using a 15-minute rolling window, use the following configuration:

```properties
transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingRecordTimestampHeaders
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15
```

To prefix the headers with `wallclock_`, use the following:

```properties
transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingRecordTimestampHeaders
transforms.rollingWindow.header.prefix.name=wallclock_
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15
```

To change the date format, use the following:

```properties
transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingRecordTimestampHeaders
transforms.rollingWindow.header.prefix.name=wallclock_
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15
transforms.rollingWindow.date.format="'date='yyyy-MM-dd"
```

To use the timezone `Asia/Kolkata`, use the following:

```properties
transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingRecordTimestampHeaders
transforms.rollingWindow.header.prefix.name=wallclock_
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15
transforms.rollingWindow.timezone=Asia/Kolkata
```

To partition S3, GCS, or Azure Data Lake output using a Hive-like partition name format, such
as `date=yyyy-MM-dd/hour=HH`, use the following SMT configuration:

```properties
transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingRecordTimestampHeaders
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15
transforms.rollingWindow.timezone=Asia/Kolkata
transforms.rollingWindow.date.format="'date='yyyy-MM-dd"
transforms.rollingWindow.hour.format="'hour='HH"
```

Then, in the KCQL setting, use the headers as partitioning keys:

```properties
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour
```
117 changes: 117 additions & 0 deletions InsertRollingWallclockHeaders.md
@@ -0,0 +1,117 @@
# Insert Rolling Wallclock Headers

## Description

A Kafka Connect Single Message Transform (SMT) that inserts date, year, month, day, hour, minute and second headers using
the system timestamp and a rolling time window configuration.

The inserted headers are of type STRING. With this single SMT you can partition the data by `yyyy-MM-dd/HH`
or `yyyy/MM/dd/HH`, for example, instead of chaining several SMTs.

The following headers are inserted:

* date
* year
* month
* day
* hour
* minute
* second

All headers can be prefixed with a custom prefix. For example, if the prefix is `wallclock_`, then the headers will be:

* wallclock_date
* wallclock_year
* wallclock_month
* wallclock_day
* wallclock_hour
* wallclock_minute
* wallclock_second

When used with the Lenses connectors for S3, GCS or Azure Data Lake, the headers can be used to partition the data.
Assuming the headers are prefixed with `_`, here are a few KCQL examples:

```
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._date, _header._hour
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._year, _header._month, _header._day, _header._hour
```

## Configuration

| Name | Description | Type | Default | Importance |
|-----------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|------------|-------------------------|
| `header.prefix.name` | Optional header prefix. | String | | Low |
| `date.format` | Optional Java date time formatter. | String | yyyy-MM-dd | Low |
| `year.format` | Optional Java date time formatter for the year component. | String | yyyy | Low |
| `month.format` | Optional Java date time formatter for the month component. | String | MM | Low |
| `day.format` | Optional Java date time formatter for the day component. | String | dd | Low |
| `hour.format` | Optional Java date time formatter for the hour component. | String | HH | Low |
| `minute.format` | Optional Java date time formatter for the minute component. | String | mm | Low |
| `second.format` | Optional Java date time formatter for the second component. | String | ss | Low |
| `timezone` | Optional. Sets the timezone. It can be any valid Java timezone. | String | UTC | Low |
| `locale` | Optional. Sets the locale. It can be any valid Java locale. | String | en | Low |
| `rolling.window.type` | Sets the window type. Valid values are `hours`, `minutes` and `seconds`. | String | minutes | High |
| `rolling.window.size` | Sets the window size. It can be any positive integer, with an upper bound that depends on `rolling.window.type`: 60 for seconds and minutes, and 24 for hours. | Int | 15 | High |
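
The bounds on `rolling.window.size` can be expressed as a small check. This is a sketch under stated assumptions: the table does not say whether the upper bound is inclusive, so the example treats it as inclusive, and `WindowConfigSketch` is a hypothetical helper rather than the SMT's own validation code.

```java
// Hypothetical illustration only; the inclusive upper bound is an assumption.
final class WindowConfigSketch {

    // Upper bound for the window size: 24 for hours, 60 for minutes and seconds.
    public static int maxSize(String type) {
        switch (type) {
            case "hours":
                return 24;
            case "minutes":
            case "seconds":
                return 60;
            default:
                throw new IllegalArgumentException("Unknown window type: " + type);
        }
    }

    // A size is accepted when it is positive and does not exceed the bound for its type.
    public static boolean isValid(String type, int size) {
        return size > 0 && size <= maxSize(type);
    }

    public static void main(String[] args) {
        System.out.println(isValid("minutes", 15)); // true
        System.out.println(isValid("hours", 25));   // false
        System.out.println(isValid("seconds", 0));  // false
    }
}
```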

## Example

To insert the headers using a 15-minute rolling window, use the following configuration:

```properties
transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingWallclockHeaders
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15
```

To prefix the headers with `wallclock_`, use the following:

```properties
transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingWallclockHeaders
transforms.rollingWindow.header.prefix.name=wallclock_
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15
```

To change the date format, use the following:

```properties
transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingWallclockHeaders
transforms.rollingWindow.header.prefix.name=wallclock_
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15
transforms.rollingWindow.date.format="'date='yyyy-MM-dd"
```

To use the timezone `Asia/Kolkata`, use the following:

```properties
transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingWallclockHeaders
transforms.rollingWindow.header.prefix.name=wallclock_
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15
transforms.rollingWindow.timezone=Asia/Kolkata
```

To partition S3, GCS, or Azure Data Lake output using a Hive-like partition name format, such
as `date=yyyy-MM-dd/hour=HH`, use the following SMT configuration:

```properties
transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingWallclockHeaders
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15
transforms.rollingWindow.timezone=Asia/Kolkata
transforms.rollingWindow.date.format="'date='yyyy-MM-dd"
transforms.rollingWindow.hour.format="'hour='HH"
```

Then, in the KCQL setting, use the headers as partitioning keys:

```properties
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour
```
5 changes: 5 additions & 0 deletions InsertWallclock.md
@@ -2,6 +2,11 @@

## Description

> **Note:** Use the [InsertWallclockHeaders](./InsertWallclockHeaders.md) SMT if you need more than one date-time
> part. It avoids chaining multiple SMTs and is more efficient. For example, to partition the data
> by `yyyy-MM-dd/HH`, use `InsertWallclockHeaders`, which inserts multiple headers: date, year, month, day, hour,
> minute and second.
>
A Kafka Connect Single Message Transform (SMT) that inserts the system clock as a message header.

Inserts the system clock as a message header, with a value of type STRING. The value can be either a string
4 changes: 4 additions & 0 deletions InsertWallclockDateTimePart.md
@@ -2,6 +2,9 @@

## Description

> **Note:** Use the [InsertWallclockHeaders](./InsertWallclockHeaders.md) SMT if you need more than one date-time
> part. It avoids chaining multiple SMTs and is more efficient.
>
A Kafka Connect Single Message Transform (SMT) that inserts the system clock year, month, day, minute, or seconds as a
message header, with a value of type STRING.

@@ -50,6 +53,7 @@ transforms.InsertWallclockDateTimePart.type=io.lenses.connect.smt.header.InsertW
transforms.InsertWallclockDateTimePart.header.name=wallclock
transforms.InsertWallclockDateTimePart.date.time.part=hour
```

To store the hour, and apply a timezone, use the following configuration:

```properties