Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Debezium 2.x docs #1149

Open
wants to merge 31 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
21dd6c8
New Debezium docs
TomaszGaweda Jun 27, 2024
1595164
Example changed
TomaszGaweda Jun 27, 2024
0c5e07b
Function overview
TomaszGaweda Jun 27, 2024
bb480f1
More info
TomaszGaweda Jun 27, 2024
17f6ebb
CDC-join
TomaszGaweda Jun 27, 2024
d39cd98
Merge branch 'main' into 5.5/debezium-docs
TomaszGaweda Jun 27, 2024
5e18cce
Fix links
TomaszGaweda Jun 28, 2024
4e0825f
Apply suggestions from code review
TomaszGaweda Jul 8, 2024
4fd775f
Apply suggestions from code review
TomaszGaweda Jul 8, 2024
a25adae
Review comments
TomaszGaweda Jul 9, 2024
3642046
CRs
TomaszGaweda Jul 9, 2024
941f476
Merge branch 'refs/heads/main' into 5.5/debezium-docs
TomaszGaweda Jul 9, 2024
9d68f3c
Post-merge
TomaszGaweda Jul 9, 2024
cf480cb
Merge branch 'main' into 5.5/debezium-docs
TomaszGaweda Jul 16, 2024
28df78e
Remove jar-with-dependencies
TomaszGaweda Jul 17, 2024
f140c77
Merge remote-tracking branch 'origin/5.5/debezium-docs' into 5.5/debe…
TomaszGaweda Jul 17, 2024
1313239
Changed package name
TomaszGaweda Jul 19, 2024
75bf12e
Merge branch 'main' into 5.5/debezium-docs
oliverhowell Jul 29, 2024
47a11bf
CR
TomaszGaweda Aug 20, 2024
d561f2c
Merge remote-tracking branch 'origin/5.5/debezium-docs' into 5.5/debe…
TomaszGaweda Aug 20, 2024
d03fbb3
Added migration guide
TomaszGaweda Aug 22, 2024
e562794
CR Suggestions
TomaszGaweda Sep 5, 2024
14ba5e9
CR
TomaszGaweda Sep 5, 2024
1ecd47d
Merge branch 'main' into 5.5/debezium-docs
TomaszGaweda Sep 5, 2024
9daaafb
Update docs/modules/integrate/pages/cdc-connectors.adoc
TomaszGaweda Sep 9, 2024
8870d99
Update docs/modules/integrate/pages/cdc-connectors.adoc
TomaszGaweda Sep 9, 2024
31448f4
Few changes
TomaszGaweda Sep 12, 2024
0c61046
Merge remote-tracking branch 'origin/5.5/debezium-docs' into 5.5/debe…
TomaszGaweda Sep 12, 2024
a7c0691
Everything can be string after all
TomaszGaweda Sep 12, 2024
5c1f269
Added formatting
TomaszGaweda Sep 12, 2024
e7c65e7
Merge branch 'main' into 5.5/debezium-docs
TomaszGaweda Sep 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/modules/ROOT/nav.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ include::wan:partial$nav.adoc[]
* xref:integrate:database-connectors.adoc[Database & CDC Connectors]
** xref:integrate:jdbc-connector.adoc[]
** xref:integrate:cdc-connectors.adoc[]
** xref:integrate:legacy-cdc-connectors.adoc[]
** xref:integrate:elasticsearch-connector.adoc[]
** xref:integrate:mongodb-connector.adoc[]
* File Connectors
Expand Down
227 changes: 196 additions & 31 deletions docs/modules/integrate/pages/cdc-connectors.adoc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
= CDC Connector
[.enterprise]*Enterprise*

Change Data Capture (CDC) refers to the process of observing changes
made to a database and extracting them in a form usable by other
Expand All @@ -8,50 +9,162 @@ Change Data Capture is especially important to Hazelcast, because it allows
for the _streaming of changes from databases_, which can be efficiently
processed by the Jet engine.

Implementation of CDC in Hazelcast is based on
link:https://debezium.io/[Debezium]. Hazelcast offers a generic Debezium source
which can handle CDC events from link:https://debezium.io/documentation/reference/stable/connectors/index.html[any database supported by Debezium],
but we're also striving to make CDC sources first class citizens in Hazelcast.
The ones for MySQL and PostgreSQL already are.
The implementation of CDC in Hazelcast {enterprise-product-name} is based on
link:https://debezium.io/[Debezium 2.x, window=_blank]. Hazelcast offers a generic Debezium source
which can handle CDC events from link:https://debezium.io/documentation/reference/2.7/connectors/index.html[any database supported by Debezium, window=_blank],
However, we're also striving to make CDC sources first class citizens in Hazelcast,
as we have done already for MySQL and PostgreSQL.

== Installing the Connector

This connector is included in the full and slim distributions of Hazelcast.
This connector is included in the full distribution of Hazelcast {enterprise-product-name}.

=== Maven
To use this connector in a Maven project, add the following entries to the `<dependency>` section of your `pom.xml` file:

Generic connector:

[source,xml]
----
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-enterprise-cdc-debezium</artifactId>
<version>{full-version}</version>
</dependency>
----

MySQL-specific connector:

[source,xml]
----
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-enterprise-cdc-mysql</artifactId>
<version>{full-version}</version>
</dependency>
----
NOTE: Due to licensing, MySQL connector does not include the MySQL driver as a dependency. You have to manually add the `com.mysql:mysql-connector-j` dependency to the classpath.

PostgreSQL-specific connector:

[source,xml]
----
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-enterprise-cdc-postgres</artifactId>
<version>{full-version}</version>
</dependency>
----

== CDC as a Source

We have the following types of CDC sources:
The Java API supports the following types of CDC source:

* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/DebeziumCdcSources.html[DebeziumCdcSources]:
generic source for all databases supported by Debezium
* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/mysql/MySqlCdcSources.html[MySqlCdcSources]:
specific, first class Jet CDC source for MySQL databases (also based
on Debezium, but benefiting the full range of convenience Jet can
additionally provide)
* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/postgres/PostgresCdcSources.html[PostgresCdcSources]:
specific, first class CDC source for PostgreSQL databases (also based
on Debezium, but benefiting the full range of convenience Hazelcast can
additionally provide)
* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/enterprise/jet/cdc/DebeziumCdcSources.html[DebeziumCdcSources, window=_blank]:
a generic source for all databases supported by Debezium
* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/enterprise/jet/cdc/mysql/MySqlCdcSources.html[MySqlCdcSources, window=_blank]:
a specific, first class Jet CDC source for MySQL databases (also based
on Debezium, but with the additional benefits provided by Hazelcast
* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/enterprise/jet/cdc/postgres/PostgresCdcSources.html[PostgresCdcSources, window=_blank]:
a specific, first class CDC source for PostgreSQL databases (also based
on Debezium, but with the additional benefits provided by Hazelcast

For the setting up a streaming source of CDC data is just the matter of pointing it at the right database via configuration:
To set up a CDC data streaming source, define it using the following configuration:

```java
[tabs]
====
MySQL::
+
--
[source,java]
----
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(
MySqlCdcSources.mysql("customers")
.setDatabaseAddress("127.0.0.1")
.setDatabasePort(3306)
.setDatabaseUser("debezium")
.setDatabasePassword("dbz")
.setDatabaseAddress("127.0.0.1", 3306)
.setDatabaseCredentials("debezium", "dbz")
.setClusterName("dbserver1")
.setDatabaseIncludeList("inventory")
.setTableIncludeList("inventory.customers")
.build())
.withNativeTimestamps(0)
.writeTo(Sinks.logger());
----
--
PostgreSQL::
+
--
[source,java]
----
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(
PostgresCdcSources.postgres("customers")
.setDatabaseAddress("127.0.0.1", 5432)
.setDatabaseCredentials("debezium", "dbz")
.setClusterName("dbserver1")
.setDatabaseWhitelist("inventory")
.setTableWhitelist("inventory.customers")
.setDatabaseIncludeList("inventory")
.setTableIncludeList("inventory.customers")
.build())
.withNativeTimestamps(0)
.writeTo(Sinks.logger());
```
----
--
MongoDB::
+
--
[source,java]
----
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(
DebeziumCdcSources.debezium("customers", MongoDbConnector.class)
.setProperty("mongodb.connection.string", "mongodb://localhost:27017")
.setDatabaseIncludeList("inventory")
.setProperty("collection.include.list", "customers")
.build())
.withNativeTimestamps(0)
.writeTo(Sinks.logger());
----
--

====

MySQL- and PostgreSQL-specific source builders contain methods for all major configuration settings with protection if, for example, mutually exclusive options are not used. If using a generic source builder, refer to the link:https://debezium.io/documentation/reference/stable/index.html[Debezium, window=_blank] documentation for the information about required or mutually exclusive fields.

Follow the provided xref:pipelines:cdc.adoc[] tutorial to see how CDC processes change events from a MySQL database.

[NOTE]
====
Remember you have to have database up and running before CDC job is started, including e.g. additional CDC agents required (like DB2 does require).
====

For an example of how to use CDC data see xref:pipelines:cdc.adoc[our tutorial].
=== Common source builder functions
[cols="m,a"]
|===
|Method name|Description

|changeRecord()
| Sets output type to `ChangeRecord` - a wrapper, which provides most of the fields in
a strongly-typed manner.

| json()
| Sets output type to `JSON` - in the result stage, the type will be set to `Map<String, String>`,
where the map entry's key is the key of `SourceRecord` in JSON format, and the value is the whole `SourceRecord`'s value in JSON format.

|customMapping(RecordMappingFunction<T>)
| Sets the output type to an arbitrary user type, `T`. Mapping from `SourceRecord` to `T` is done using the function provided by the connector.

|withDefaultEngine()
|Sets the preferred engine to the default (non-async) one. This engine is single-threaded,
but also more widely used and tested. Use this engine for the most stable results (for example, no async offset restore). For MySQL and PostgreSQL especially this engine makes the most sense, as MySQL and PostgreSQL Debezium connectors are single-threaded only.

|withAsyncEngine()
|Sets the preferred engine to the async one. This engine is multithreaded (if supported by the connector), but you must be aware of the async nature; for example, offset restore may occur asynchronously after the restart is done, leading to sometimes confusing results.

|setProperty(String, String)
|Sets connector property to given value. There are multiple overloads, allowing to
TomaszGaweda marked this conversation as resolved.
Show resolved Hide resolved
set the value to `long`, `String` or `boolean`.

|===

=== Fault Tolerance

Expand Down Expand Up @@ -79,20 +192,72 @@ For example, a sink mapping CDC data to a `Customer` class and
maintaining a map view of latest known email addresses per customer
(identified by ID) would look like this:

```java
[source,java]
----
Pipeline p = Pipeline.create();
p.readFrom(source)
.withoutTimestamps()
.writeTo(CdcSinks.map("customers",
r -> r.key().toMap().get("id"),
r -> r.value().toObject(Customer.class).email));
```
----

[NOTE]
====
The key and value functions have certain limitations. They can be used to map only to objects which the Hazelcast member can deserialize, which unfortunately doesn't include user code submitted as a part of the job. So in the above example it's OK to have `String` email values, but we wouldn't be able to use `Customer` directly.

If user code has to be used, then the problem can be solved with the help of the User Code Deployment feature. Example configs for that can be seen in our xref:pipelines:cdc-join.adoc#7-start-hazelcast-jet[CDC Join tutorial].
====

== Data types

Hazelcast relies on Debezium, which in turn uses Kafka Connect API such as `Struct` objects. Hazelcast makes conversion to `Map` and `POJO` s easier by providing abstractions such as `RecordPart`. Despite that, it's worth knowing how some database types can or will be mapped to Java types.

[NOTE]
====
Each database type has it's own database type-to-struct type mappings. For specific mappings of this type, please
check out Debezium documentation, for example: link:https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-data-types[MySQL], link:https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-data-types[PostgreSQL], link:https://debezium.io/documentation/reference/stable/connectors/db2.html#db2-data-types[DB2], etc..
====

=== Common datatypes mapping.
[cols="m,a,a"]
|===
|Struct type|Semantic type|Java type

.3+|INT32
|-|int/Integer
|io.debezium.time.Date|java.time.LocalDate / java.util.Date / String `yyyy-MM-dd`
|io.debezium.time.Time|java.time.Duration / String ISO-8601 `PnDTnHnMn.nS`

.5+|INT64
|-|long/Long
|io.debezium.time.Timestamp|java.time.Instant / String `yyyy-MM-dd HH:mm:ss.SSS`
|io.debezium.time.MicroTimestamp|java.time.Instant / String `yyyy-MM-dd HH:mm:ss.SSS`
|io.debezium.time.MicroTime|java.time.Duration / String ISO-8601 `PnDTnHnMn.nS`
|io.debezium.time.NanoTimestamp|java.time.Instant / String `yyyy-MM-dd HH:mm:ss.SSS`
|io.debezium.time.NanoTime|java.time.Duration / String ISO-8601 `PnDTnHnMn.nS`

|FLOAT32|-|float/Float / String
|FLOAT64|-|double/Double / String
|BOOLEAN|-|boolean/Boolean / String
|STRING|-|String

The `RecordPart#value` field contains Debezium's message in a JSON format. This JSON format uses string as date representation,
instead of ints, which are standard in Debezium, but harder to deal with.

[NOTE]
====
We strongly recommend using `time.precision.mode=adaptive` (default).
Using `time.precision.mode=connect` uses `java.util.Date` to represent dates, time, etc. and is less precise.
====

|===

== Migration Tips

Hazelcast {open-source-product-name} has a Debezium CDC connector, but it's based on an older version of Debezium.
Migration to the new connector is straightforward but be aware of the following changes:

Although User Code Deployment has been deprecated, the replacement User Code Namespaces feature does not yet support Jet jobs or pipelines. For now, continue to use the User Code Deployment solution in this scenario.
====
* You should use the `com.hazelcast.enterprise.jet.cdc` package instead of `com.hazelcast.jet.cdc`.
* Artifact names are now `hazelcast-enterprise-cdc-debezium`, `hazelcast-enterprise-cdc-mysql` and `hazelcast-enterprise-cdc-postgres` (instead of `hazelcast-jet-...`).
* Debezium replaced all `whitelist`s with `include list`s and `blacklist`s with `exclude list`s, which we have replicated in our naming; so, for example, use `setTableIncludeList` instead of `setTableWhitelist`. If you are not sure what are the new names the Debezium is using, you can check out their link:https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-connector-properties[MySQL] and link:https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-connector-properties[PostgreSQL] documentation.
43 changes: 31 additions & 12 deletions docs/modules/integrate/pages/connectors.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,36 @@ The Jet API supports more connectors than SQL.
|batch
|N/A

|xref:integrate:cdc-connectors.adoc[DebeziumCdcSources.debezium]
|xref:integrate:legacy-cdc-connectors.adoc[DebeziumCdcSources.debezium] (Legacy)
|hazelcast-jet-cdc-debezium
|streaming
|at-least-once

|xref:integrate:legacy-cdc-connectors.adoc[MySqlCdcSources.mysql] (Legacy)
|hazelcast-jet-cdc-mysql
|streaming
|exactly-once

|xref:integrate:legacy-cdc-connectors.adoc[PostgresCdcSources.postgres] (Legacy)
|hazelcast-jet-cdc-postgres
|streaming
|exactly-once

|xref:integrate:cdc-connectors.adoc[DebeziumCdcSources.debezium] ([.enterprise]*Enterprise*)
|hazelcast-enterprise-cdc-debezium
|streaming
|at-least-once

|xref:integrate:cdc-connectors.adoc[MySqlCdcSources.mysql]
|hazelcast-enterprise-cdc-mysql
|streaming
|exactly-once

|xref:integrate:cdc-connectors.adoc[PostgresCdcSources.postgres]
|hazelcast-enterprise-cdc-postgres
|streaming
|exactly-once

|xref:integrate:elasticsearch-connector.adoc[ElasticSources.elastic]
|hazelcast-jet-elasticsearch-7
|batch
Expand Down Expand Up @@ -150,16 +175,6 @@ The Jet API supports more connectors than SQL.
|streaming
|exactly-once

|xref:integrate:cdc-connectors.adoc[MySqlCdcSources.mysql]
|hazelcast-jet-cdc-mysql
|streaming
|exactly-once

|xref:integrate:cdc-connectors.adoc[PostgresCdcSources.postgres]
|hazelcast-jet-cdc-postgres
|streaming
|exactly-once

|xref:integrate:pulsar-connector.adoc[PulsarSources.pulsarConsumer]
|hazelcast-jet-contrib-pulsar
|streaming
Expand Down Expand Up @@ -270,7 +285,11 @@ The Jet API supports more connectors than SQL.
|N/A

|xref:integrate:cdc-connectors.adoc[CdcSinks.map]
|hazelcast-jet-cdc-debezium
|hazelcast-jet-cdc-debezium (legacy, {open-source-product-name})

or

hazelcast-enterprise-cdc-debezium ({enterprise-product-name})
|streaming
|at-least-once

Expand Down
Loading
Loading