Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Debezium 2.x docs #1149

Open
wants to merge 31 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
21dd6c8
New Debezium docs
TomaszGaweda Jun 27, 2024
1595164
Example changed
TomaszGaweda Jun 27, 2024
0c5e07b
Function overview
TomaszGaweda Jun 27, 2024
bb480f1
More info
TomaszGaweda Jun 27, 2024
17f6ebb
CDC-join
TomaszGaweda Jun 27, 2024
d39cd98
Merge branch 'main' into 5.5/debezium-docs
TomaszGaweda Jun 27, 2024
5e18cce
Fix links
TomaszGaweda Jun 28, 2024
4e0825f
Apply suggestions from code review
TomaszGaweda Jul 8, 2024
4fd775f
Apply suggestions from code review
TomaszGaweda Jul 8, 2024
a25adae
Review comments
TomaszGaweda Jul 9, 2024
3642046
CRs
TomaszGaweda Jul 9, 2024
941f476
Merge branch 'refs/heads/main' into 5.5/debezium-docs
TomaszGaweda Jul 9, 2024
9d68f3c
Post-merge
TomaszGaweda Jul 9, 2024
cf480cb
Merge branch 'main' into 5.5/debezium-docs
TomaszGaweda Jul 16, 2024
28df78e
Remove jar-with-dependencies
TomaszGaweda Jul 17, 2024
f140c77
Merge remote-tracking branch 'origin/5.5/debezium-docs' into 5.5/debe…
TomaszGaweda Jul 17, 2024
1313239
Changed package name
TomaszGaweda Jul 19, 2024
75bf12e
Merge branch 'main' into 5.5/debezium-docs
oliverhowell Jul 29, 2024
47a11bf
CR
TomaszGaweda Aug 20, 2024
d561f2c
Merge remote-tracking branch 'origin/5.5/debezium-docs' into 5.5/debe…
TomaszGaweda Aug 20, 2024
d03fbb3
Added migration guide
TomaszGaweda Aug 22, 2024
e562794
CR Suggestions
TomaszGaweda Sep 5, 2024
14ba5e9
CR
TomaszGaweda Sep 5, 2024
1ecd47d
Merge branch 'main' into 5.5/debezium-docs
TomaszGaweda Sep 5, 2024
9daaafb
Update docs/modules/integrate/pages/cdc-connectors.adoc
TomaszGaweda Sep 9, 2024
8870d99
Update docs/modules/integrate/pages/cdc-connectors.adoc
TomaszGaweda Sep 9, 2024
31448f4
Few changes
TomaszGaweda Sep 12, 2024
0c61046
Merge remote-tracking branch 'origin/5.5/debezium-docs' into 5.5/debe…
TomaszGaweda Sep 12, 2024
a7c0691
Everything can be string after all
TomaszGaweda Sep 12, 2024
5c1f269
Added formatting
TomaszGaweda Sep 12, 2024
e7c65e7
Merge branch 'main' into 5.5/debezium-docs
TomaszGaweda Sep 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/modules/ROOT/nav.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ include::wan:partial$nav.adoc[]
** xref:integrate:database-connectors.adoc[Overview]
** xref:integrate:jdbc-connector.adoc[]
** xref:integrate:cdc-connectors.adoc[]
** xref:integrate:legacy-cdc-connectors.adoc[]
** xref:integrate:elasticsearch-connector.adoc[]
** xref:integrate:mongodb-connector.adoc[]
** xref:integrate:influxdb-connector.adoc[]
Expand Down
130 changes: 99 additions & 31 deletions docs/modules/integrate/pages/cdc-connectors.adoc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
= CDC Connector
[.enterprise]*Enterprise*

Change Data Capture (CDC) refers to the process of observing changes
made to a database and extracting them in a form usable by other
Expand All @@ -8,50 +9,118 @@ Change Data Capture is especially important to Hazelcast, because it allows
for the _streaming of changes from databases_, which can be efficiently
processed by the Jet engine.

Implementation of CDC in Hazelcast is based on
link:https://debezium.io/[Debezium]. Hazelcast offers a generic Debezium source
which can handle CDC events from link:https://debezium.io/documentation/reference/stable/connectors/index.html[any database supported by Debezium],
but we're also striving to make CDC sources first class citizens in Hazelcast.
The ones for MySQL and PostgreSQL already are.
Implementation of CDC in Hazelcast {enterprise-product-name} is based on
TomaszGaweda marked this conversation as resolved.
Show resolved Hide resolved
link:https://debezium.io/[Debezium 2.x, window=_blank]. Hazelcast offers a generic Debezium source
which can handle CDC events from link:https://debezium.io/documentation/reference/2.7/connectors/index.html[any database supported by Debezium, window=_blank],
However, we're also striving to make CDC sources first class citizens in Hazelcast,
as we have done already for MySQL and PostgreSQL already are.
TomaszGaweda marked this conversation as resolved.
Show resolved Hide resolved

== Installing the Connector

This connector is included in the full and slim distributions of Hazelcast.
This connector is included in the full distribution of Hazelcast {enterprise-product-name}.

=== Maven
To use this connector in a Maven project, add the following entries to the `<dependency>` section of your `pom.xml` file:

Generic connector:

[source,xml]
----
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-enterprise-cdc-debezium</artifactId>
<version>{full-version}</version>
<classifier>jar-with-dependencies</classifier>
oliverhowell marked this conversation as resolved.
Show resolved Hide resolved
</dependency>
----

MySQL-specific connector:

[source,xml]
----
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-enterprise-cdc-mysql</artifactId>
<version>{full-version}</version>
<classifier>jar-with-dependencies</classifier>
</dependency>
----
NOTE: MySQL connector does not include the MySQL driver as a dependency.
frant-hartm marked this conversation as resolved.
Show resolved Hide resolved

PostgreSQL-specific connector:

[source,xml]
----
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-enterprise-cdc-postgres</artifactId>
<version>{full-version}</version>
<classifier>jar-with-dependencies</classifier>
</dependency>
----

== CDC as a Source

We have the following types of CDC sources:
The Java API supports the following types of CDC source:

* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/DebeziumCdcSources.html[DebeziumCdcSources]:
generic source for all databases supported by Debezium
* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/mysql/MySqlCdcSources.html[MySqlCdcSources]:
specific, first class Jet CDC source for MySQL databases (also based
on Debezium, but benefiting the full range of convenience Jet can
additionally provide)
* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/postgres/PostgresCdcSources.html[PostgresCdcSources]:
specific, first class CDC source for PostgreSQL databases (also based
on Debezium, but benefiting the full range of convenience Hazelcast can
additionally provide)
* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/DebeziumCdcSources.html[DebeziumCdcSources, window=_blank]:
a generic source for all databases supported by Debezium
* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/mysql/MySqlCdcSources.html[MySqlCdcSources, window=_blank]:
a specific, first class Jet CDC source for MySQL databases (also based
on Debezium, but with the additional benefits provided by Hazelcast
* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/postgres/PostgresCdcSources.html[PostgresCdcSources, window=_blank]:
a specific, first class CDC source for PostgreSQL databases (also based
on Debezium, but with the additional benefits provided by Hazelcast

For the setting up a streaming source of CDC data is just the matter of pointing it at the right database via configuration:
To set up a streaming source of CDC data, define it using the following configuration:
TomaszGaweda marked this conversation as resolved.
Show resolved Hide resolved

```java
[source,java]
----
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(
MySqlCdcSources.mysql("customers")
.setDatabaseAddress("127.0.0.1")
.setDatabasePort(3306)
.setDatabaseUser("debezium")
.setDatabasePassword("dbz")
.setDatabaseAddress("127.0.0.1", 3306)
.setDatabaseCredentials("debezium", "dbz")
.setClusterName("dbserver1")
.setDatabaseWhitelist("inventory")
.setTableWhitelist("inventory.customers")
.setDatabaseIncludeList("inventory")
.setTableIncludeList("inventory.customers")
.build())
.withNativeTimestamps(0)
.writeTo(Sinks.logger());
```
----

For an example of how to use CDC data see xref:pipelines:cdc.adoc[our tutorial].
MySQL- and PostgreSQL-specific source builders contain methods for all major configuration settings with protection if, for example, mutually exclusive options are not used. If using a generic source builder, refer to the link:https://debezium.io/documentation/reference/stable/index.html[Debezium, window=_blank] documentation
frant-hartm marked this conversation as resolved.
Show resolved Hide resolved

Follow the provided xref:pipelines:cdc.adoc[] tutorial to see how CDC processes change events from a MySQL database.

=== Common source builder functions
[cols="m,a"]
|===
|Method name|Description

|changeRecord()
| Sets output type to `ChangeRecord` - a wrapper, which provides most of the fields in
strongly-typed manner.
TomaszGaweda marked this conversation as resolved.
Show resolved Hide resolved

| json()
| Sets output type to `JSON` - in the result stage, the type will be set to `Map<String, String>`,
where map entry's key is the key of `SourceRecord` in JSON format and value is whole `SourceRecord`'s value in JSON format.
TomaszGaweda marked this conversation as resolved.
Show resolved Hide resolved

|customMapping(RecordMappingFunction<T>)
| Sets the output type to an arbitrary user type, `T`. Mapping from `SourceRecord` to `T` is done using provided function by the connector.
TomaszGaweda marked this conversation as resolved.
Show resolved Hide resolved

|withDefaultEngine()
|Sets the preferred engine to the default (non-async) one. This engine is single-threaded,
but also older and more tested. Use this engine for most stable results (for example, no async offset restore). For MySQL and PostgreSQL especially this engine makes the most sense, as MySQL and PostgreSQL Debezium connectors are single-threaded only.
TomaszGaweda marked this conversation as resolved.
Show resolved Hide resolved

|withAsyncEngine()
|Sets the preferred engine to the async one. This engine is multithreaded (if supported by the connector), but you must be aware of the async nature; for example, offset restore may occur asynchronously after the restart is done, leading to sometimes confusing results.

|setProperty(String, String)
|Sets connector property to given value. There are multiple overloads, allowing to
TomaszGaweda marked this conversation as resolved.
Show resolved Hide resolved
set the value to `long`, `String` or `boolean`.

|===

=== Fault Tolerance

Expand Down Expand Up @@ -79,20 +148,19 @@ For example, a sink mapping CDC data to a `Customer` class and
maintaining a map view of latest known email addresses per customer
(identified by ID) would look like this:

```java
[source,java]
----
Pipeline p = Pipeline.create();
p.readFrom(source)
.withoutTimestamps()
.writeTo(CdcSinks.map("customers",
r -> r.key().toMap().get("id"),
r -> r.value().toObject(Customer.class).email));
```
----

[NOTE]
====
The key and value functions have certain limitations. They can be used to map only to objects which the Hazelcast member can deserialize, which unfortunately doesn't include user code submitted as a part of the job. So in the above example it's OK to have `String` email values, but we wouldn't be able to use `Customer` directly.

If user code has to be used, then the problem can be solved with the help of the User Code Deployment feature. Example configs for that can be seen in our xref:pipelines:cdc-join.adoc#7-start-hazelcast-jet[CDC Join tutorial].

Although User Code Deployment has been deprecated, the replacement User Code Namespaces feature does not yet support Jet jobs or pipelines. For now, continue to use the User Code Deployment solution in this scenario.
====
43 changes: 31 additions & 12 deletions docs/modules/integrate/pages/connectors.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,36 @@ The Jet API supports more connectors than SQL.
|batch
|N/A

|xref:integrate:cdc-connectors.adoc[DebeziumCdcSources.debezium]
|xref:integrate:legacy-cdc-connectors.adoc[DebeziumCdcSources.debezium] (Legacy)
|hazelcast-jet-cdc-debezium
|streaming
|at-least-once

|xref:integrate:legacy-cdc-connectors.adoc[MySqlCdcSources.mysql] (Legacy)
|hazelcast-jet-cdc-mysql
|streaming
|exactly-once

|xref:integrate:legacy-cdc-connectors.adoc[PostgresCdcSources.postgres] (Legacy)
|hazelcast-jet-cdc-postgres
|streaming
|exactly-once

|xref:integrate:cdc-connectors.adoc[DebeziumCdcSources.debezium] ([.enterprise]*Enterprise*)
|hazelcast-enterprise-cdc-debezium
|streaming
|at-least-once

|xref:integrate:cdc-connectors.adoc[MySqlCdcSources.mysql]
|hazelcast-enterprise-cdc-mysql
|streaming
|exactly-once

|xref:integrate:cdc-connectors.adoc[PostgresCdcSources.postgres]
|hazelcast-enterprise-cdc-postgres
|streaming
|exactly-once

|xref:integrate:elasticsearch-connector.adoc[ElasticSources.elastic]
|hazelcast-jet-elasticsearch-7
|batch
Expand Down Expand Up @@ -150,16 +175,6 @@ The Jet API supports more connectors than SQL.
|streaming
|exactly-once

|xref:integrate:cdc-connectors.adoc[MySqlCdcSources.mysql]
|hazelcast-jet-cdc-mysql
|streaming
|exactly-once

|xref:integrate:cdc-connectors.adoc[PostgresCdcSources.postgres]
|hazelcast-jet-cdc-postgres
|streaming
|exactly-once

|xref:integrate:pulsar-connector.adoc[PulsarSources.pulsarConsumer]
|hazelcast-jet-contrib-pulsar
|streaming
Expand Down Expand Up @@ -270,7 +285,11 @@ The Jet API supports more connectors than SQL.
|N/A

|xref:integrate:cdc-connectors.adoc[CdcSinks.map]
|hazelcast-jet-cdc-debezium
|hazelcast-jet-cdc-debezium (legacy, {open-source-product-name})

or

hazelcast-enterprise-cdc-debezium ({enterprise-product-name})
|streaming
|at-least-once

Expand Down
98 changes: 98 additions & 0 deletions docs/modules/integrate/pages/legacy-cdc-connectors.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
= Legacy CDC Connector

Change Data Capture (CDC) refers to the process of observing changes
made to a database and extracting them in a form usable by other
systems, for the purposes of replication, analysis and many more.

Change Data Capture is especially important to Hazelcast, because it allows
for the _streaming of changes from databases_, which can be efficiently
processed by the Jet engine.

Implementation of CDC in Hazelcast {open-source-product-name} is based on
link:https://debezium.io/[Debezium, window=_blank]. Hazelcast offers a generic Debezium source
which can handle CDC events from link:https://debezium.io/documentation/reference/stable/connectors/index.html[any database supported by Debezium, window=_blank].
However, we're also striving to make CDC sources first class citizens in Hazelcast,
as we have done already for MySQL and PostgreSQL already are.
TomaszGaweda marked this conversation as resolved.
Show resolved Hide resolved

== Installing the Connector

This connector is included in the full distribution of Open Source Hazelcast.
TomaszGaweda marked this conversation as resolved.
Show resolved Hide resolved

== CDC as a Source

We have the following types of CDC sources:

* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/DebeziumCdcSources.html[DebeziumCdcSources, window=_blank]:
a generic source for all databases supported by Debezium
* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/mysql/MySqlCdcSources.html[MySqlCdcSources, window=_blank]:
a specific, first class Jet CDC source for MySQL databases (also based
on Debezium, but with the additional benefits provided by Hazelcast
* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/postgres/PostgresCdcSources.html[PostgresCdcSources, window=_blank]:
a specific, first class CDC source for PostgreSQL databases (also based
on Debezium, but with the additional benefits provided by Hazelcast

To set up a streaming source of CDC data, define it using the following configuration:

[source,java]
----
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(
MySqlCdcSources.mysql("customers")
.setDatabaseAddress("127.0.0.1")
.setDatabasePort(3306)
.setDatabaseUser("debezium")
.setDatabasePassword("dbz")
.setClusterName("dbserver1")
.setDatabaseWhitelist("inventory")
.setTableWhitelist("inventory.customers")
.build())
.withNativeTimestamps(0)
.writeTo(Sinks.logger());
----

For an example of how to use CDC data see xref:pipelines:cdc.adoc[our tutorial].
TomaszGaweda marked this conversation as resolved.
Show resolved Hide resolved

=== Fault Tolerance

CDC sources offer at least-once processing guarantees. The source
periodically saves the database write ahead log offset for which it had
dispatched events and in case of a failure/restart it will replay all
events since the last successfully saved offset.
TomaszGaweda marked this conversation as resolved.
Show resolved Hide resolved

Unfortunately, however, there is no guarantee that the last saved offset
is still in the database changelog. Such logs are always finite and
depending on the DB configuration can be relatively short, so if the CDC
source has to replay data for a long period of inactivity, then there
can be a data loss. With careful management though we can say that
at-least once guarantee can practically be provided.
TomaszGaweda marked this conversation as resolved.
Show resolved Hide resolved

== CDC as a Sink

Change data capture is a source-side functionality in Jet, but we also
offer some specialized sinks that simplify applying CDC events to a map, which gives you the ability to reconstruct the contents of the
original database table. The sinks expect to receive `ChangeRecord`
objects and apply your custom functions to them that extract the key and
the value that will be applied to the target map.
TomaszGaweda marked this conversation as resolved.
Show resolved Hide resolved

For example, a sink mapping CDC data to a `Customer` class and
maintaining a map view of latest known email addresses per customer
(identified by ID) would look like this:

[source,java]
----
Pipeline p = Pipeline.create();
p.readFrom(source)
.withoutTimestamps()
.writeTo(CdcSinks.map("customers",
r -> r.key().toMap().get("id"),
r -> r.value().toObject(Customer.class).email));
----

[NOTE]
====
The key and value functions have certain limitations. They can be used to map only to objects which the Hazelcast member can deserialize, which unfortunately doesn't include user code submitted as a part of the job. So in the above example it's OK to have `String` email values, but we wouldn't be able to use `Customer` directly.

If user code has to be used, then the problem can be solved with the help of the User Code Deployment feature. Example configs for that can be seen in our xref:pipelines:cdc-join.adoc#7-start-hazelcast-jet[CDC Join tutorial].

Although User Code Deployment has been deprecated, the replacement User Code Namespaces feature does not yet support Jet jobs or pipelines. For now, continue to use the User Code Deployment solution in this scenario.
====
Loading