Add Debezium 2.x docs #1149

Merged
merged 35 commits into from
Oct 8, 2024
Commits
21dd6c8
New Debezium docs
TomaszGaweda Jun 27, 2024
1595164
Example changed
TomaszGaweda Jun 27, 2024
0c5e07b
Function overview
TomaszGaweda Jun 27, 2024
bb480f1
More info
TomaszGaweda Jun 27, 2024
17f6ebb
CDC-join
TomaszGaweda Jun 27, 2024
d39cd98
Merge branch 'main' into 5.5/debezium-docs
TomaszGaweda Jun 27, 2024
5e18cce
Fix links
TomaszGaweda Jun 28, 2024
4e0825f
Apply suggestions from code review
TomaszGaweda Jul 8, 2024
4fd775f
Apply suggestions from code review
TomaszGaweda Jul 8, 2024
a25adae
Review comments
TomaszGaweda Jul 9, 2024
3642046
CRs
TomaszGaweda Jul 9, 2024
941f476
Merge branch 'refs/heads/main' into 5.5/debezium-docs
TomaszGaweda Jul 9, 2024
9d68f3c
Post-merge
TomaszGaweda Jul 9, 2024
cf480cb
Merge branch 'main' into 5.5/debezium-docs
TomaszGaweda Jul 16, 2024
28df78e
Remove jar-with-dependencies
TomaszGaweda Jul 17, 2024
f140c77
Merge remote-tracking branch 'origin/5.5/debezium-docs' into 5.5/debe…
TomaszGaweda Jul 17, 2024
1313239
Changed package name
TomaszGaweda Jul 19, 2024
75bf12e
Merge branch 'main' into 5.5/debezium-docs
oliverhowell Jul 29, 2024
47a11bf
CR
TomaszGaweda Aug 20, 2024
d561f2c
Merge remote-tracking branch 'origin/5.5/debezium-docs' into 5.5/debe…
TomaszGaweda Aug 20, 2024
d03fbb3
Added migration guide
TomaszGaweda Aug 22, 2024
e562794
CR Suggestions
TomaszGaweda Sep 5, 2024
14ba5e9
CR
TomaszGaweda Sep 5, 2024
1ecd47d
Merge branch 'main' into 5.5/debezium-docs
TomaszGaweda Sep 5, 2024
9daaafb
Update docs/modules/integrate/pages/cdc-connectors.adoc
TomaszGaweda Sep 9, 2024
8870d99
Update docs/modules/integrate/pages/cdc-connectors.adoc
TomaszGaweda Sep 9, 2024
31448f4
Few changes
TomaszGaweda Sep 12, 2024
0c61046
Merge remote-tracking branch 'origin/5.5/debezium-docs' into 5.5/debe…
TomaszGaweda Sep 12, 2024
a7c0691
Everything can be string after all
TomaszGaweda Sep 12, 2024
5c1f269
Added formatting
TomaszGaweda Sep 12, 2024
e7c65e7
Merge branch 'main' into 5.5/debezium-docs
TomaszGaweda Sep 20, 2024
c2afb09
Apply suggestions from code review
TomaszGaweda Oct 7, 2024
f85e53c
Apply suggestions from code review
TomaszGaweda Oct 8, 2024
2cfdd2d
Apply suggestions from code review
TomaszGaweda Oct 8, 2024
2b8a37b
Merge branch 'main' into 5.5/debezium-docs
oliverhowell Oct 8, 2024
93 changes: 80 additions & 13 deletions docs/modules/integrate/pages/cdc-connectors.adoc
@@ -1,4 +1,5 @@
= CDC Connector
[.enterprise]*Enterprise*

Change Data Capture (CDC) refers to the process of observing changes
made to a database and extracting them in a form usable by other
@@ -8,15 +9,49 @@ Change Data Capture is especially important to Hazelcast, because it allows
for the _streaming of changes from databases_, which can be efficiently
processed by the Jet engine.

Implementation of CDC in Hazelcast is based on
link:https://debezium.io/[Debezium]. Hazelcast offers a generic Debezium source
which can handle CDC events from link:https://debezium.io/documentation/reference/stable/connectors/index.html[any database supported by Debezium],
Implementation of CDC in Hazelcast Enterprise is based on
link:https://debezium.io/[Debezium 2.x]. Hazelcast offers a generic Debezium source
which can handle CDC events from link:https://debezium.io/documentation/reference/2.7/connectors/index.html[any database supported by Debezium],
but we're also striving to make CDC sources first class citizens in Hazelcast.
The ones for MySQL and PostgreSQL already are.

== Installing the Connector

This connector is included in the full and slim distributions of Hazelcast.
This connector is included in the full distribution of Hazelcast Enterprise.

=== Maven
To use this connector in a Maven project, add the following entries to the `<dependencies>` section of your `pom.xml`:

Generic connector:
```xml
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-enterprise-cdc-debezium</artifactId>
<version>{full-version}</version>
<classifier>jar-with-dependencies</classifier>
</dependency>
```

MySQL-specific connector:
```xml
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-enterprise-cdc-mysql</artifactId>
<version>{full-version}</version>
<classifier>jar-with-dependencies</classifier>
</dependency>
```
Note: The MySQL connector does not include the MySQL driver as a dependency.

PostgreSQL-specific connector:
```xml
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-enterprise-cdc-postgres</artifactId>
<version>{full-version}</version>
<classifier>jar-with-dependencies</classifier>
</dependency>
```

== CDC as a Source

@@ -39,28 +74,62 @@ For the setting up a streaming source of CDC data is just the matter of pointing
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(
MySqlCdcSources.mysql("customers")
.setDatabaseAddress("127.0.0.1")
.setDatabasePort(3306)
.setDatabaseUser("debezium")
.setDatabasePassword("dbz")
.setDatabaseAddress("127.0.0.1", 3306)
.setDatabaseCredentials("debezium", "dbz")
.setClusterName("dbserver1")
.setDatabaseWhitelist("inventory")
.setTableWhitelist("inventory.customers")
.setDatabaseIncludeList("inventory")
.setTableIncludeList("inventory.customers")
.build())
.withNativeTimestamps(0)
.writeTo(Sinks.logger());
```

The MySQL- and PostgreSQL-specific source builders provide methods for all major configuration settings and guard against
invalid combinations, for example mutually exclusive options being used together. With the generic source builder, you must rely on Debezium's documentation
to provide all necessary options.

For an example of how to use CDC data see xref:pipelines:cdc.adoc[our tutorial].

=== Common source builder functions
[cols="m,a"]
|===
|Method name|Description

|changeRecord()
|Sets the output type to `ChangeRecord`, a wrapper that provides most of the fields in
a strongly-typed manner.

|json()
|Sets the output type to JSON. The resulting stage has `Map<String, String>` as its type,
where the key is the `SourceRecord` key in JSON format and the value is the whole `SourceRecord` value as a JSON string.

|customMapping(RecordMappingFunction<T>)
|Sets the output type to an arbitrary user type `T`. The mapping from `SourceRecord` to `T` is
done using the provided function.

|withDefaultEngine()
|Sets the preferred engine to the default (non-async) one. This engine is single-threaded,
but also older and more thoroughly tested. Prefer this engine for the most stable results (for example, no async offset restore). It makes the most sense for MySQL and PostgreSQL in particular, because the MySQL and PostgreSQL Debezium connectors are single-threaded only.

|withAsyncEngine()
|Sets the preferred engine to the async one. This engine is multithreaded (if the connector supports
it), but you must be aware of its async nature: for example, the offset restore after a restart is also done
asynchronously, which can sometimes lead to confusing results.

|setProperty(String, String)
|Sets a connector property to the given value. There are multiple overloads, allowing you to
set the value as a `long`, `String`, or `boolean`.
Review comment (Contributor):
I'm not sure about this one, "overloads" are not referred to elsewhere.

I have offered an alternative, but if I have not understood correctly, let me know!


|===
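
To illustrate what "overloads" means for `setProperty`, here is a minimal plain-Java sketch. The class `PropertyOverloadsSketch` is hypothetical and is not the Hazelcast builder; storing every value as a string is an assumption made only for this illustration. The property names are ordinary Debezium connector options used as sample keys.

```java
import java.util.Properties;

// Hypothetical stand-in for a source builder; NOT the Hazelcast API.
class PropertyOverloadsSketch {

    private final Properties properties = new Properties();

    // Base variant: the value is already a String.
    PropertyOverloadsSketch setProperty(String key, String value) {
        properties.setProperty(key, value);
        return this;
    }

    // Overload for numeric values (assumption: converted to text).
    PropertyOverloadsSketch setProperty(String key, long value) {
        return setProperty(key, Long.toString(value));
    }

    // Overload for flags (assumption: converted to text).
    PropertyOverloadsSketch setProperty(String key, boolean value) {
        return setProperty(key, Boolean.toString(value));
    }

    Properties build() {
        return properties;
    }

    public static void main(String[] args) {
        Properties props = new PropertyOverloadsSketch()
                .setProperty("snapshot.mode", "initial")    // String variant
                .setProperty("max.batch.size", 2048)        // long variant
                .setProperty("tombstones.on.delete", false) // boolean variant
                .build();
        System.out.println(props.getProperty("snapshot.mode"));
        System.out.println(props.getProperty("max.batch.size"));
        System.out.println(props.getProperty("tombstones.on.delete"));
    }
}
```

The caller picks the variant simply by passing a value of the matching Java type; the compiler selects the overload.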

=== Fault Tolerance

CDC sources offer at-least-once processing guarantees. The source
periodically saves the database write-ahead log offset up to which it has
dispatched events, and in case of a failure or restart it replays all
events since the last successfully saved offset.

Unfortunately, however, there is no guarantee that the last saved offset
is still in the database changelog. Such logs are always finite and
depending on the DB configuration can be relatively short, so if the CDC
source has to replay data for a long period of inactivity, then there
@@ -93,6 +162,4 @@ p.readFrom(source)
The key and value functions have certain limitations. They can be used to map only to objects which the Hazelcast member can deserialize, which unfortunately doesn't include user code submitted as a part of the job. So in the above example it's OK to have `String` email values, but we wouldn't be able to use `Customer` directly.

If user code has to be used, then the problem can be solved with the help of the User Code Deployment feature. Example configs for that can be seen in our xref:pipelines:cdc-join.adoc#7-start-hazelcast-jet[CDC Join tutorial].

Although User Code Deployment has been deprecated, the replacement User Code Namespaces feature does not yet support Jet jobs or pipelines. For now, continue to use the User Code Deployment solution in this scenario.
====
43 changes: 31 additions & 12 deletions docs/modules/integrate/pages/connectors.adoc
@@ -115,11 +115,36 @@ The Jet API supports more connectors than SQL.
|batch
|N/A

|xref:integrate:cdc-connectors.adoc[DebeziumCdcSources.debezium]
|xref:integrate:legacy-cdc-connectors.adoc[DebeziumCdcSources.debezium] (Legacy)
|hazelcast-jet-cdc-debezium
|streaming
|at-least-once

|xref:integrate:legacy-cdc-connectors.adoc[MySqlCdcSources.mysql] (Legacy)
|hazelcast-jet-cdc-mysql
|streaming
|exactly-once

|xref:integrate:legacy-cdc-connectors.adoc[PostgresCdcSources.postgres] (Legacy)
|hazelcast-jet-cdc-postgres
|streaming
|exactly-once

|xref:integrate:cdc-connectors.adoc[DebeziumCdcSources.debezium] ([.enterprise]*Enterprise*)
|hazelcast-enterprise-cdc-debezium
|streaming
|at-least-once

|xref:integrate:cdc-connectors.adoc[MySqlCdcSources.mysql]
|hazelcast-enterprise-cdc-mysql
|streaming
|exactly-once

|xref:integrate:cdc-connectors.adoc[PostgresCdcSources.postgres]
|hazelcast-enterprise-cdc-postgres
|streaming
|exactly-once

|xref:integrate:elasticsearch-connector.adoc[ElasticSources.elastic]
|hazelcast-jet-elasticsearch-7
|batch
@@ -150,16 +175,6 @@ The Jet API supports more connectors than SQL.
|streaming
|exactly-once

|xref:integrate:cdc-connectors.adoc[MySqlCdcSources.mysql]
|hazelcast-jet-cdc-mysql
|streaming
|exactly-once

|xref:integrate:cdc-connectors.adoc[PostgresCdcSources.postgres]
|hazelcast-jet-cdc-postgres
|streaming
|exactly-once

|xref:integrate:pulsar-connector.adoc[PulsarSources.pulsarConsumer]
|hazelcast-jet-contrib-pulsar
|streaming
@@ -258,7 +273,11 @@ The Jet API supports more connectors than SQL.
|N/A

|xref:integrate:cdc-connectors.adoc[CdcSinks.map]
|hazelcast-jet-cdc-debezium
|hazelcast-jet-cdc-debezium (legacy, OS)

or

hazelcast-enterprise-cdc-debezium (EE)
|streaming
|at-least-once

98 changes: 98 additions & 0 deletions docs/modules/integrate/pages/legacy-cdc-connectors.adoc
@@ -0,0 +1,98 @@
= Legacy CDC Connector

Change Data Capture (CDC) refers to the process of observing changes
made to a database and extracting them in a form usable by other
systems, for the purposes of replication, analysis and many more.

Change Data Capture is especially important to Hazelcast, because it allows
for the _streaming of changes from databases_, which can be efficiently
processed by the Jet engine.

Implementation of CDC in Hazelcast is based on
link:https://debezium.io/[Debezium]. Hazelcast offers a generic Debezium source
which can handle CDC events from link:https://debezium.io/documentation/reference/stable/connectors/index.html[any database supported by Debezium],
but we're also striving to make CDC sources first class citizens in Hazelcast.
The ones for MySQL and PostgreSQL already are.

== Installing the Connector

This connector is included in the full distribution of Open Source Hazelcast.

== CDC as a Source

We have the following types of CDC sources:

* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/DebeziumCdcSources.html[DebeziumCdcSources]:
generic source for all databases supported by Debezium
* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/mysql/MySqlCdcSources.html[MySqlCdcSources]:
specific, first-class Jet CDC source for MySQL databases (also based
on Debezium, but benefiting from the full range of conveniences Jet can
additionally provide)
* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/postgres/PostgresCdcSources.html[PostgresCdcSources]:
specific, first-class CDC source for PostgreSQL databases (also based
on Debezium, but benefiting from the full range of conveniences Hazelcast can
additionally provide)

Setting up a streaming source of CDC data is just a matter of pointing it at the right database via configuration:

```java
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(
MySqlCdcSources.mysql("customers")
.setDatabaseAddress("127.0.0.1")
.setDatabasePort(3306)
.setDatabaseUser("debezium")
.setDatabasePassword("dbz")
.setClusterName("dbserver1")
.setDatabaseWhitelist("inventory")
.setTableWhitelist("inventory.customers")
.build())
.withNativeTimestamps(0)
.writeTo(Sinks.logger());
```

For an example of how to use CDC data see xref:pipelines:cdc.adoc[our tutorial].

=== Fault Tolerance

CDC sources offer at-least-once processing guarantees. The source
periodically saves the database write-ahead log offset up to which it has
dispatched events, and in case of a failure or restart it replays all
events since the last successfully saved offset.

Unfortunately, however, there is no guarantee that the last saved offset
is still in the database changelog. Such logs are always finite and
depending on the DB configuration can be relatively short, so if the CDC
source has to replay data for a long period of inactivity, then there
can be data loss. With careful management, though, an
at-least-once guarantee can be provided in practice.
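
The replay behavior described in this section can be sketched in plain Java. This is a simulation under stated assumptions, not Hazelcast or Debezium code: the class `ReplaySketch`, the method `replay`, and the use of simple numeric offsets are all hypothetical. It shows that events dispatched after the last saved offset are emitted again on restart (hence at-least-once), and that replay becomes impossible once the database has discarded the part of the log that the saved offset points into.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of offset-based replay; not Hazelcast code.
class ReplaySketch {

    /**
     * Returns the offsets that are dispatched again after a restart.
     *
     * @param lastSavedOffset      last offset that was successfully saved
     * @param oldestRetainedOffset oldest offset the database still keeps in its log
     * @param newestOffset         newest offset currently in the log
     */
    static List<Long> replay(long lastSavedOffset, long oldestRetainedOffset, long newestOffset) {
        long resumeAt = lastSavedOffset + 1;
        if (resumeAt < oldestRetainedOffset) {
            // The log was truncated past the saved position: events are gone.
            throw new IllegalStateException("Offsets " + resumeAt + ".."
                    + (oldestRetainedOffset - 1) + " are no longer in the log");
        }
        List<Long> replayed = new ArrayList<>();
        for (long offset = resumeAt; offset <= newestOffset; offset++) {
            replayed.add(offset);
        }
        return replayed;
    }

    public static void main(String[] args) {
        // Offset 1 was saved; offset 2 had already been dispatched before the crash,
        // so downstream stages see it a second time after the restart.
        System.out.println("Replayed: " + replay(1, 0, 3));

        try {
            // After a long period of inactivity the log only goes back to offset 5.
            replay(1, 5, 9);
        } catch (IllegalStateException e) {
            System.out.println("Data loss: " + e.getMessage());
        }
    }
}
```

Keeping the database log retention longer than the longest expected connector downtime is what avoids the second case.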

== CDC as a Sink

Change data capture is a source-side functionality in Jet, but we also
offer some specialized sinks that simplify applying CDC events to a map, which gives you the ability to reconstruct the contents of the
original database table. The sinks expect to receive `ChangeRecord`
objects and apply your custom functions to them that extract the key and
the value that will be applied to the target map.

For example, a sink mapping CDC data to a `Customer` class and
maintaining a map view of latest known email addresses per customer
(identified by ID) would look like this:

```java
Pipeline p = Pipeline.create();
p.readFrom(source)
.withoutTimestamps()
.writeTo(CdcSinks.map("customers",
r -> r.key().toMap().get("id"),
r -> r.value().toObject(Customer.class).email));
```
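
To make the idea of "applying CDC events to a map" concrete, the following plain-Java sketch mimics what such a sink conceptually does with each event. It is an illustration only: `CdcMapSinkSketch`, `Change`, and `Operation` are hypothetical stand-ins and not the Hazelcast `ChangeRecord` or `CdcSinks` types, and the rule that inserts and updates write the value while deletes remove the key is stated here as an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a CDC map sink; not the Hazelcast implementation.
class CdcMapSinkSketch {

    enum Operation { INSERT, UPDATE, DELETE }

    // Minimal stand-in for a change event on the customers table.
    static final class Change {
        final Operation operation;
        final int customerId;
        final String email;

        Change(Operation operation, int customerId, String email) {
            this.operation = operation;
            this.customerId = customerId;
            this.email = email;
        }
    }

    // Applies one event: deletes remove the key, everything else overwrites the value.
    static void apply(Map<Integer, String> target, Change change) {
        if (change.operation == Operation.DELETE) {
            target.remove(change.customerId);
        } else {
            target.put(change.customerId, change.email);
        }
    }

    public static void main(String[] args) {
        Map<Integer, String> customers = new HashMap<>();
        apply(customers, new Change(Operation.INSERT, 1001, "sally@example.com"));
        apply(customers, new Change(Operation.UPDATE, 1001, "sally.t@example.com"));
        apply(customers, new Change(Operation.INSERT, 1002, "george@example.com"));
        apply(customers, new Change(Operation.DELETE, 1002, null));
        // The map now mirrors the latest known state of the table.
        System.out.println(customers);
    }
}
```

Replaying the full stream of events in order is what lets the map converge to the current contents of the source table.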

[NOTE]
====
The key and value functions have certain limitations. They can be used to map only to objects which the Hazelcast member can deserialize, which unfortunately doesn't include user code submitted as a part of the job. So in the above example it's OK to have `String` email values, but we wouldn't be able to use `Customer` directly.

If user code has to be used, then the problem can be solved with the help of the User Code Deployment feature. Example configs for that can be seen in our xref:pipelines:cdc-join.adoc#7-start-hazelcast-jet[CDC Join tutorial].

Although User Code Deployment has been deprecated, the replacement User Code Namespaces feature does not yet support Jet jobs or pipelines. For now, continue to use the User Code Deployment solution in this scenario.
====
1 change: 1 addition & 0 deletions docs/modules/integrate/partials/nav.adoc
@@ -22,6 +22,7 @@
*** xref:integrate:database-connectors.adoc[Overview]
*** xref:integrate:jdbc-connector.adoc[]
*** xref:integrate:cdc-connectors.adoc[]
*** xref:integrate:legacy-cdc-connectors.adoc[]
*** xref:integrate:elasticsearch-connector.adoc[]
*** xref:integrate:mongodb-connector.adoc[]
*** xref:integrate:influxdb-connector.adoc[]
27 changes: 16 additions & 11 deletions docs/modules/pipelines/pages/cdc-database-setup.adoc
@@ -39,13 +39,14 @@ config file. See MySQL Reference Manual on how to do that
link:https://dev.mysql.com/doc/refman/8.0/en/option-files.html[8.0]). For
example:

```
[source]
----
server-id = 223344
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 10
```
----

The semantics of these options are as follows:

@@ -62,9 +63,10 @@ link:https://dev.mysql.com/doc/refman/8.0/en/show-variables.html[8.0]).
It's worth pointing out that the names of the options sometimes differ
from the names of the MySQL system variables they set. For example:

```
[source]
----
SHOW VARIABLES LIKE 'server_id';
```
----

=== Configure Session Timeouts

@@ -121,15 +123,16 @@ configuration options to be set accordingly. This can be done either by

The important properties to set are:

```properties
[source,properties]
----
# MODULES
shared_preload_libraries = 'decoderbufs,wal2json'

# REPLICATION
wal_level = logical
max_wal_senders = 1
max_replication_slots = 1
```
----

`shared_preload_libraries` contains a comma separated list of installed
output plug-ins. `wal_level` is used to tell the server to use logical
@@ -152,9 +155,10 @@ permissions. The permissions needed are `REPLICATION` and `LOGIN`.
For setting up database users/roles see the link:https://www.postgresql.org/docs/9.6/user-manag.html[PostgreSQL documentation], but
basically the essential command is:

```
[source,sql]
----
CREATE ROLE name REPLICATION LOGIN;
```
----

Note: database super-users already have all the permissions needed by
replication too.
@@ -166,11 +170,12 @@ PostgreSQL server needs to allow access from the host the CDC connector
is running on. To specify such link:https://www.postgresql.org/docs/9.6/auth-pg-hba-conf.html[client authentication]
options, add the following lines to the end of the `pg_hba.conf` file:

```
[source]
----
local replication user trust
host replication user 127.0.0.1/32 trust
host replication user ::1/128 trust
```
----

This example tells the server to allow replication for the specified
user locally or on `localhost`, using IPv4 or IPv6.
@@ -252,7 +257,7 @@ shouldn't affect normal operations too severely.

==== Failure Tolerance

PostgreSQL failure tolerance associated with replication slots is
PostgreSQL's failure tolerance associated with replication slots is
somewhat lacking in certain aspects. The CDC connector can quite nicely
deal with its own restart or connection loss to the primary database,
but only as long as replication slots remain intact. Replication