Commit

doc: Scaling out solution (#15)

OneCricketeer authored Jan 7, 2023
1 parent b3f78e9 commit 4b82a31
Showing 3 changed files with 137 additions and 36 deletions.
37 changes: 25 additions & 12 deletions README.md
@@ -25,8 +25,8 @@ docker pull cricketeerone/apache-kafka-connect:latest-confluent-hub
- [Image Details](#image-details)
- [Build it locally](#build-it-locally)
- [Tutorial](#tutorial)
- [Without Docker](#without-docker)
- [Starting Kafka in Docker](#start-kafka-cluster-in-docker)
- [Without Docker](#without-docker)
- [Starting Kafka in Docker](#start-kafka-cluster-in-docker)
- Extra
- [Scaling Up](#scaling-up)
- [Scaling Out](#scaling-out)
@@ -155,9 +155,11 @@ curl -XPUT http://localhost:8083/connectors/console-sink/config -H 'Content-Type
"connector.class": "FileStreamSink",
"tasks.max": 1,
"topics": "input",
"transforms": "MakeMap",
"transforms": "MakeMap,AddPartition",
"transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.MakeMap.field" : "line",
"transforms.AddPartition.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddPartition.partition.field" : "partition!",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}'
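After submitting the config, the Connect REST API can confirm the connector started. A quick check (assuming the cluster from `docker-compose.yml` is up and the connector was created as above):

```shell
# Should report state RUNNING for the connector and its single task
curl -s http://localhost:8083/connectors/console-sink/status
```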
@@ -166,12 +168,12 @@ curl -XPUT http://localhost:8083/connectors/console-sink/config -H 'Content-Type
In the output of _Terminal 2_, you should see something similar to the following.

```text
connect-jib_1 | Struct{line=Morbi eu pharetra dolor. ....}
connect-jib_1 | Struct{line=}
connect-jib_1 | Struct{line=Nullam mauris sapien, vestibulum ....}
connect-jib_1 | Struct{line=Morbi eu pharetra dolor. ....,partition=1}
connect-jib_1 | Struct{line=,partition=1}
connect-jib_1 | Struct{line=Nullam mauris sapien, vestibulum ....,partition=1}
```

This is the `toString()` representation of Kafka Connect's internal `Struct` class. Since we added a `HoistField$Value` transform, then there is a Structured Object with a field of `line` set to the value of the Kafka message that was read from the lines of the `lipsum.txt` file that was produced in the third step above.
This is the `toString()` representation of Kafka Connect's internal `Struct` class. Since we added the `HoistField$Value` transform, each record is a structured object with a `line` field set to the value of the Kafka record read from the lines of the `lipsum.txt` file produced in the third step above, as well as a `partition` field (from `InsertField$Value`) set to the partition the record was consumed from. The topic was created with only one partition.
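To compare against the untransformed records, you could read the topic directly from the broker container (a sketch; assumes the bitnami `kafka` service from `docker-compose.yml` is running and its CLI scripts are on the `PATH`):

```shell
# Raw "input" records, before Connect applies the HoistField/InsertField transforms
docker exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic input --from-beginning
```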

To repeat that process, we delete the connector and reset the consumer group.

@@ -188,16 +190,27 @@ Re-run above console-producer and `curl -XPUT ...` command, but this time, there

### Scaling up

Redo the tutorial with more input data and partitions and increase `tasks.max` of the connector.
Redo the tutorial with more input data and partitions, and increase `tasks.max` of the connector. Notice that the `partition` field in the output may change (you may need to produce data multiple times to randomize the record batches).
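For example, after recreating the `input` topic with three partitions, the same `PUT` endpoint can raise the task count (a sketch; a `tasks.max` above the partition count leaves the extra tasks idle):

```shell
# Same connector config as the tutorial, but with tasks.max raised to 3
curl -XPUT http://localhost:8083/connectors/console-sink/config -H 'Content-Type: application/json' --data '{
    "connector.class": "FileStreamSink",
    "tasks.max": 3,
    "topics": "input",
    "transforms": "MakeMap,AddPartition",
    "transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value",
    "transforms.MakeMap.field" : "line",
    "transforms.AddPartition.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.AddPartition.partition.field" : "partition!",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
}'
```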

### Scaling out

Scaling the workers will require more variables related to the `listeners` properties. E.g. sending a request to a worker in the group that is not the leader will return this. (TODO: Document the fix.)

```shell
{"error_code":500,"message":"Error trying to forward REST request: Error trying to forward REST request: Cannot complete request because of a conflicting operation (e.g. worker rebalance)"}
```

Scaling the workers will require adding another container with a unique `CONNECT_REST_ADVERTISED_HOST_NAME` variable. I.e.

```yml
  connect-jib-2:
    image: *connect-image
    hostname: connect-jib-2
    depends_on:
      - kafka
    ports:
      - '8183:8083'
    environment:
      <<: *connect-vars
      CONNECT_REST_ADVERTISED_HOST_NAME: connect-jib-2
```

See [`docker-compose.cluster.yml`](./docker-compose.cluster.yml). It can be run via `docker compose -f docker-compose.cluster.yml up`.
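Once both workers are up and the connector is created, the REST API on the published port reports the whole group; the task assignments show which worker runs each task (a sketch; the `jq` filter is an optional convenience):

```shell
# worker_id values should reference connect-jib-1 or connect-jib-2
curl -s http://localhost:8083/connectors/console-sink/status | jq '.tasks[].worker_id'
```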

## Extending with new Connectors

> ***Disclaimer*** It is best to think of this image as a base upon which you can add your own Connectors. Below is the output of the default connector plugins, as provided by Apache Kafka project.
Expand Down
83 changes: 83 additions & 0 deletions docker-compose.cluster.yml
@@ -0,0 +1,83 @@
# Originated from https://github.com/bitnami/bitnami-docker-kafka/blob/0b1b18843b8a5c754a4c6e52a49ac5cf992fa5ed/docker-compose.yml
version: '3'

x-connect-image: &connect-image cricketeerone/apache-kafka-connect:3.2.0

x-connect: &connect-vars
  CONNECT_BOOTSTRAP_SERVERS: kafka:29092

  CONNECT_GROUP_ID: cg_connect
  CONNECT_CONFIG_STORAGE_TOPIC: connect-jib_config
  CONNECT_OFFSET_STORAGE_TOPIC: connect-jib_offsets
  CONNECT_STATUS_STORAGE_TOPIC: connect-jib_status
  # Cannot be higher than the number of brokers in the Kafka cluster
  CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
  CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
  CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1

  CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
  CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter

  CONNECT_PLUGIN_PATH: /app/libs

  # Connect client overrides
  CONNECT_TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS: 30000
  CONNECT_OFFSET_FLUSH_INTERVAL_MS: 900000
  # CONNECT_CONSUMER_MAX_POLL_RECORDS: 500 default==500

services:
  zookeeper:
    image: bitnami/zookeeper:3.8-debian-11
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    volumes:
      - 'zookeeper_data:/bitnami/zookeeper'
  kafka:
    image: bitnami/kafka:3.2.0
    depends_on:
      - zookeeper
    restart: unless-stopped
    ports:
      - '9092:9092'
      - '29092:29092'
    volumes:
      - 'kafka_data:/bitnami/kafka'
      - $PWD/lipsum.txt:/data/lipsum.txt:ro # Some data to produce
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_DELETE_TOPIC_ENABLE=true
      - KAFKA_CFG_LOG_RETENTION_HOURS=48 # 2 days of retention for demo purposes
      # https://rmoff.net/2018/08/02/kafka-listeners-explained/
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:29092,PLAINTEXT_HOST://0.0.0.0:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092

  # Jib app
  connect-jib-1:
    image: *connect-image
    hostname: connect-jib-1
    depends_on:
      - kafka
    ports:
      - '8083:8083' # full cluster info accessible from one instance
    environment:
      <<: *connect-vars
      CONNECT_REST_ADVERTISED_HOST_NAME: connect-jib-1

  connect-jib-2:
    image: *connect-image
    hostname: connect-jib-2
    depends_on:
      - kafka
    environment:
      <<: *connect-vars
      CONNECT_REST_ADVERTISED_HOST_NAME: connect-jib-2

volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local
53 changes: 29 additions & 24 deletions docker-compose.yml
@@ -1,6 +1,30 @@
# Originated from https://github.com/bitnami/bitnami-docker-kafka/blob/0b1b18843b8a5c754a4c6e52a49ac5cf992fa5ed/docker-compose.yml
version: '3'

x-connect-image: &connect-image cricketeerone/apache-kafka-connect:3.2.0

x-connect: &connect-vars
  CONNECT_BOOTSTRAP_SERVERS: kafka:29092

  CONNECT_GROUP_ID: cg_connect
  CONNECT_CONFIG_STORAGE_TOPIC: connect-jib_config
  CONNECT_OFFSET_STORAGE_TOPIC: connect-jib_offsets
  CONNECT_STATUS_STORAGE_TOPIC: connect-jib_status
  # Cannot be higher than the number of brokers in the Kafka cluster
  CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
  CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
  CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1

  CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
  CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter

  CONNECT_PLUGIN_PATH: /app/libs

  # Connect client overrides
  CONNECT_TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS: 30000
  CONNECT_OFFSET_FLUSH_INTERVAL_MS: 900000
  # CONNECT_CONSUMER_MAX_POLL_RECORDS: 500 default==500

services:
  zookeeper:
    image: bitnami/zookeeper:3.8-debian-11
@@ -32,35 +56,16 @@ services:
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092

  # Jib app
  connect-jib:
    image: cricketeerone/apache-kafka-connect:3.2.0
  connect-jib-1:
    image: *connect-image
    hostname: connect-jib-1
    depends_on:
      - kafka
    ports:
      - '8083:8083'
    environment:
      CONNECT_REST_ADVERTISED_HOST_NAME: connect-jib
      CONNECT_BOOTSTRAP_SERVERS: kafka:29092

      CONNECT_GROUP_ID: cg_connect-idea
      CONNECT_CONFIG_STORAGE_TOPIC: connect-jib_config
      CONNECT_OFFSET_STORAGE_TOPIC: connect-jib_offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-jib_status
      # Cannot be higher than the number of brokers in the Kafka cluster
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1

      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter

      CONNECT_PLUGIN_PATH: /app/libs

      # Logging
      # Connect client overrides
      CONNECT_TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS: 30000
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 900000
      CONNECT_CONSUMER_MAX_POLL_RECORDS: 500
      <<: *connect-vars
      CONNECT_REST_ADVERTISED_HOST_NAME: connect-jib-1

volumes:
  zookeeper_data:
