Skip to content

Commit

Permalink
Merge pull request #5 from lensesio/feat/timezone_conversion
Browse files Browse the repository at this point in the history
Timezone support
  • Loading branch information
stheppi committed Feb 26, 2024
2 parents e5e85df + 4cbd0de commit 93dbeb6
Show file tree
Hide file tree
Showing 17 changed files with 625 additions and 217 deletions.
65 changes: 3 additions & 62 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ name: CI/CD

on:
push:
branches:
- main
branches: [ "*" ]
pull_request:
branches: [ "*" ]

create:
tags:
- 'v*'

jobs:
build:
Expand All @@ -24,69 +22,12 @@ jobs:
java-version: '11'
distribution: 'temurin'

- name: Extract tag name
id: extract_tag
run: echo ::set-output name=TAG_NAME::${GITHUB_REF#refs/tags/}

- name: Update Maven version
run: mvn versions:set -DnewVersion=${{ steps.extract_tag.outputs.TAG_NAME }}

- name: Check License
run: mvn license:check

- name: Checkstyle
run: mvn checkstyle:checkstyle

- name: Build
run: mvn clean package -B

- name: Create JAR
run: mvn jar:jar

release:
name: Create Release
needs: build
if: startsWith(github.ref, 'refs/tags/v')

runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v3

- name: Set up JDK
uses: actions/setup-java@v3
with:
java-version: '11' # Or the desired Java version
distribution: 'temurin'

- name: Extract tag name
id: extract_tag
run: echo ::set-output name=TAG_NAME::${GITHUB_REF#refs/tags/}

- name: Update Maven version
run: mvn versions:set -DnewVersion=${{ steps.extract_tag.outputs.TAG_NAME }}

- name: Build Jar
run: mvn -B package --file pom.xml -DskipTests

- name: Create Release
id: create_release
uses: actions/create-release@v1
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
tag_name: ${{ github.ref }}
release_name: Release ${{ github.ref }}
draft: false
prerelease: false

- name: Upload JAR
uses: actions/upload-release-asset@v1
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
upload_url: ${{ steps.create_release.outputs.upload_url }}
asset_path: ./target/kafka-connect-smt-${{ steps.extract_tag.outputs.TAG_NAME }}.jar
asset_name: kafka-connect-smt-${{ steps.extract_tag.outputs.TAG_NAME }}.jar
asset_content_type: application/java-archive
88 changes: 88 additions & 0 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
name: CI/CD

on:
create:
tags:
- 'v*'

jobs:
build:
name: Build
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v3

- name: Set up JDK
uses: actions/setup-java@v3
with:
java-version: '11'
distribution: 'temurin'

- name: Extract tag name
id: extract_tag
run: echo ::set-output name=TAG_NAME::${GITHUB_REF#refs/tags/}

- name: Update Maven version
run: mvn versions:set -DnewVersion=${{ steps.extract_tag.outputs.TAG_NAME }}

- name: Check License
run: mvn license:check

- name: Checkstyle
run: mvn checkstyle:checkstyle

- name: Build
run: mvn clean package -B

- name: Create JAR
run: mvn jar:jar

release:
name: Create Release
needs: build
if: startsWith(github.ref, 'refs/tags/v')

runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v3

- name: Set up JDK
uses: actions/setup-java@v3
with:
java-version: '11' # Or the desired Java version
distribution: 'temurin'

- name: Extract tag name
id: extract_tag
run: echo ::set-output name=TAG_NAME::${GITHUB_REF#refs/tags/}

- name: Update Maven version
run: mvn versions:set -DnewVersion=${{ steps.extract_tag.outputs.TAG_NAME }}

- name: Build Jar
run: mvn -B package --file pom.xml -DskipTests

- name: Create Release
id: create_release
uses: actions/create-release@v1
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
tag_name: ${{ github.ref }}
release_name: Release ${{ github.ref }}
draft: false
prerelease: false

- name: Upload JAR
uses: actions/upload-release-asset@v1
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
upload_url: ${{ steps.create_release.outputs.upload_url }}
asset_path: ./target/kafka-connect-smt-${{ steps.extract_tag.outputs.TAG_NAME }}.jar
asset_name: kafka-connect-smt-${{ steps.extract_tag.outputs.TAG_NAME }}.jar
asset_content_type: application/java-archive
16 changes: 15 additions & 1 deletion InsertRollingWallclock.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The value inserted is stored as a STRING, and it holds either a string represent
| `format` | Sets the format of the header value inserted if the type was set to string. It can be any valid java date format. | String | | | High |
| `rolling.window.type` | Sets the window type. It can be fixed or rolling. | String | minutes | hours, minutes, seconds | High |
| `rolling.window.size` | Sets the window size. It can be any positive integer, and depending on the `window.type` it has an upper bound, 60 for seconds and minutes, and 24 for hours. | Int | 15 | | High |
| `timezone` | Sets the timezone. It can be any valid java timezone. Overwrite it when `value.type` is set to `format`, otherwise it will raise an exception. | String | UTC | | High |

## Example

Expand All @@ -36,8 +37,21 @@ To store a string representation of the date and time in the format `yyyy-MM-dd
transforms=InsertRollingWallclock
transforms.InsertRollingWallclock.type=io.lenses.connect.smt.header.InsertRollingWallclock
transforms.InsertRollingWallclock.header.name=wallclock
transforms.InsertRollingWallclock.value.type=string
transforms.InsertRollingWallclock.value.type=format
transforms.InsertRollingWallclock.format=yyyy-MM-dd HH:mm:ss.SSS
transforms.InsertRollingWallclock.rolling.window.type=minutes
transforms.InsertRollingWallclock.rolling.window.size=15
```

To use the timezone `Asia/Kolkoata`, use the following:

```properties
transforms=InsertRollingWallclock
transforms.InsertRollingWallclock.type=io.lenses.connect.smt.header.InsertRollingWallclock
transforms.InsertRollingWallclock.header.name=wallclock
transforms.InsertRollingWallclock.value.type=format
transforms.InsertRollingWallclock.format=yyyy-MM-dd HH:mm:ss.SSS
transforms.InsertRollingWallclock.rolling.window.type=minutes
transforms.InsertRollingWallclock.rolling.window.size=15
transforms.InsertRollingWallclock.timezone=Asia/Kolkata
```
26 changes: 18 additions & 8 deletions InsertWallclock.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@ for example `yyyy-MM-dd HH:mm:ss.SSS`.

## Configuration

| Name | Description | Type | Default | Valid Values | Importance |
|---------------|-----------------------------------------------------------------------------------------------------------------------|--------|---------|--------------|------------|
| `header.name` | The name of the header to insert the timestamp into. | String | | | High |
| `value.type` | Sets the header value inserted. It can be epoch or string. If string is used, then the 'format' setting is required." | String | format | epoch,format | High |
| `format` | Sets the format of the header value inserted if the type was set to string. It can be any valid java date format. | String | | | High |


| Name | Description | Type | Default | Valid Values | Importance |
|---------------|------------------------------------------------------------------------------------------------------------------------------------------------|--------|---------|--------------|------------|
| `header.name` | The name of the header to insert the timestamp into. | String | | | High |
| `value.type` | Sets the header value inserted. It can be epoch or string. If string is used, then the 'format' setting is required." | String | format | epoch,format | High |
| `format` | Sets the format of the header value inserted if the type was set to string. It can be any valid java date format. | String | | | High |
| `timezone` | Sets the timezone. It can be any valid java timezone. Overwrite it when `value.type` is set to `format`, otherwise it will raise an exception. | String | UTC | | High |

## Example

Expand All @@ -35,6 +34,17 @@ To store a string representation of the date and time in the format `yyyy-MM-dd
transforms=InsertWallclock
transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.InsertWallclock.header.name=wallclock
transforms.InsertWallclock.value.type=string
transforms.InsertWallclock.value.type=format
transforms.InsertWallclock.format=yyyy-MM-dd HH:mm:ss.SSS
```

To use the timezone `Asia/Kolkoata`, use the following:

```properties
transforms=InsertWallclock
transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.InsertWallclock.header.name=wallclock
transforms.InsertWallclock.value.type=format
transforms.InsertWallclock.format=yyyy-MM-dd HH:mm:ss.SSS
transforms.InsertWallclock.timezone=Asia/Kolkata
```
23 changes: 16 additions & 7 deletions InsertWallclockDateTimePart.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@

## Description

A Kafka Connect Single Message Transform (SMT) that inserts the system clock year, month, day, minute, or seconds as a message header, with a value of type STRING.
A Kafka Connect Single Message Transform (SMT) that inserts the system clock year, month, day, minute, or seconds as a
message header, with a value of type STRING.

## Configuration


| Name | Description | Type | Default | Valid Values | Importance |
|------------------|------------------------------------------------------|--------|---------|---------------------------------------|------------|
| `header.name` | The name of the header to insert the timestamp into. | String | | | High |
| `date.time.part` | The date time part to insert. | String | | year, month, day, hour,minute, second | High |

| Name | Description | Type | Default | Valid Values | Importance |
|------------------|-------------------------------------------------------|--------|---------|---------------------------------------|------------|
| `header.name` | The name of the header to insert the timestamp into. | String | | | High |
| `date.time.part` | The date time part to insert. | String | | year, month, day, hour,minute, second | High |
| `timezone` | Sets the timezone. It can be any valid java timezone. | String | UTC | | High |

## Example

Expand Down Expand Up @@ -50,6 +50,15 @@ transforms.InsertWallclockDateTimePart.type=io.lenses.connect.smt.header.InsertW
transforms.InsertWallclockDateTimePart.header.name=wallclock
transforms.InsertWallclockDateTimePart.date.time.part=hour
```
To store the hour, and apply a timezone, use the following configuration:

```properties
transforms=InsertWallclockDateTimePart
transforms.InsertWallclockDateTimePart.type=io.lenses.connect.smt.header.InsertWallclockDateTimePart
transforms.InsertWallclockDateTimePart.header.name=wallclock
transforms.InsertWallclockDateTimePart.date.time.part=hour
transforms.InsertWallclockDateTimePart.timezone=Asia/Kolkata
```

To store the minute, use the following configuration:

Expand Down
28 changes: 20 additions & 8 deletions TimestampConverter.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

## Description

An adapted version of the [TimestampConverter](https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java#L50) SMT, that allows the user to specify the format of the timestamp inserted as a header.
An adapted version of
the [TimestampConverter](https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java#L50)
SMT, that allows the user to specify the format of the timestamp inserted as a header.
It also avoids the synchronization block requirement for converting to a string representation of the timestamp.

The SMT adds a few more features to the original:
Expand All @@ -12,10 +14,8 @@ The SMT adds a few more features to the original:
* allows conversion from one string representation to another (e.g. `yyyy-MM-dd HH:mm:ss` to `yyyy-MM-dd`)
* allows conversion using a rolling window boundary (e.g. every 15 minutes, or one hour)


## Configuration


| Name | Description | Type | Default | Valid Values |
|-----------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|--------------|--------------------------------------------------|
| `header.name` | The name of the header to insert the timestamp into. | String | | |
Expand All @@ -26,11 +26,12 @@ The SMT adds a few more features to the original:
| `rolling.window.type` | An optional parameter for the rolling time window type. When set it will adjust the output value according to the time window boundary. | String | none | none, hours, minutes, seconds |
| `rolling.window.size` | An optional positive integer parameter for the rolling time window size. When `rolling.window.type` is defined this setting is required. The value is bound by the `rolling.window.type` configuration. If type is `minutes` or `seconds` then the value cannot bigger than 60, and if the type is `hours` then the max value is 24. | Int | 15 | |
| `unix.precision` | The desired Unix precision for the timestamp. Used to generate the output when type=unix or used to parse the input if the input is a Long. This SMT will cause precision loss during conversions from, and to, values with sub-millisecond components. | String | milliseconds | seconds, milliseconds, microseconds, nanoseconds |

| `timezone` | Sets the timezone. It can be any valid java timezone. Overwrite it when `target.type` is set to `date, time, or string`, otherwise it will raise an exception. | String | UTC | |

## Example

To convert to and from a string representation of the date and time in the format `yyyy-MM-dd HH:mm:ss.SSS`, use the following configuration:
To convert to and from a string representation of the date and time in the format `yyyy-MM-dd HH:mm:ss.SSS`, use the
following configuration:

```properties
transforms=TimestampConverter
Expand All @@ -44,7 +45,6 @@ transforms.TimestampConverter.format.to.pattern=yyyy-MM-dd HH:mm:ss.SSS

To convert to and from a string representation while applying an hourly rolling window:


```properties
transforms=TimestampConverter
transforms.TimestampConverter.type=io.lenses.connect.smt.header.TimestampConverter
Expand All @@ -57,8 +57,21 @@ transforms.TimestampConverter.rolling.window.type=hours
transforms.TimestampConverter.rolling.window.size=1
```

To convert to and from a string representation while applying a 15 minutes rolling window:
To convert to and from a string representation while applying an hourly rolling window and timezone:

```properties
transforms=TimestampConverter
transforms.TimestampConverter.type=io.lenses.connect.smt.header.TimestampConverter
transforms.TimestampConverter.header.name=wallclock
transforms.TimestampConverter.field=_value.ts
transforms.TimestampConverter.target.type=string
transforms.TimestampConverter.format.from.pattern=yyyyMMddHHmmssSSS
transforms.TimestampConverter.format.to.pattern=yyyy-MM-dd-HH
transforms.TimestampConverter.rolling.window.type=hours
transforms.TimestampConverter.rolling.window.size=1
transforms.TimestampConverter.timezone=Asia/Kolkata
```
To convert to and from a string representation while applying a 15 minutes rolling window:

```properties
transforms=TimestampConverter
Expand All @@ -72,7 +85,6 @@ transforms.TimestampConverter.rolling.window.type=minutes
transforms.TimestampConverter.rolling.window.size=15
```


To convert to and from a Unix timestamp, use the following:

```properties
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
</execution>
</executions>
</plugin>
<plugin>
<!--<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.1.2</version>
Expand Down Expand Up @@ -114,7 +114,7 @@
<version>10.12.1</version>
</dependency>
</dependencies>
</plugin>
</plugin>-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
Expand Down
Loading

0 comments on commit 93dbeb6

Please sign in to comment.