diff --git a/README.md b/README.md
index 3b6bacf..5d91636 100644
--- a/README.md
+++ b/README.md
@@ -25,8 +25,8 @@ docker pull cricketeerone/apache-kafka-connect:latest-confluent-hub
 - [Image Details](#image-details)
 - [Build it locally](#build-it-locally)
 - [Tutorial](#tutorial)
-  - [Without Docker](#without-docker)
-  - [Starting Kafka in Docker](#start-kafka-cluster-in-docker)
+  - [Without Docker](#without-docker)
+  - [Starting Kafka in Docker](#start-kafka-cluster-in-docker)
 - Extra
   - [Scaling Up](#scaling-up)
   - [Scaling Out](#scaling-out)
@@ -155,9 +155,11 @@ curl -XPUT http://localhost:8083/connectors/console-sink/config -H 'Content-Type
     "connector.class": "FileStreamSink",
     "tasks.max": 1,
     "topics": "input",
-    "transforms": "MakeMap",
+    "transforms": "MakeMap,AddPartition",
     "transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value",
     "transforms.MakeMap.field" : "line",
+    "transforms.AddPartition.type": "org.apache.kafka.connect.transforms.InsertField$Value",
+    "transforms.AddPartition.partition.field" : "partition!",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "value.converter": "org.apache.kafka.connect.storage.StringConverter"
 }'
@@ -166,12 +168,18 @@ curl -XPUT http://localhost:8083/connectors/console-sink/config -H 'Content-Type
 In the output of _Terminal 2_, you should see something similar to the following.
 
 ```text
-connect-jib_1    | Struct{line=Morbi eu pharetra dolor. ....}
-connect-jib_1    | Struct{line=}
-connect-jib_1    | Struct{line=Nullam mauris sapien, vestibulum ....}
+connect-jib_1    | Struct{line=Morbi eu pharetra dolor. ....,partition=1}
+connect-jib_1    | Struct{line=,partition=1}
+connect-jib_1    | Struct{line=Nullam mauris sapien, vestibulum ....,partition=1}
 ```
 
-This is the `toString()` representation of Kafka Connect's internal `Struct` class. Since we added a `HoistField$Value` transform, then there is a Structured Object with a field of `line` set to the value of the Kafka message that was read from the lines of the `lipsum.txt` file that was produced in the third step above.
+This is the `toString()` representation of Kafka Connect's internal `Struct` class. The `HoistField$Value` transform wraps each Kafka message read from the lines of the `lipsum.txt` file (produced in the third step above) into a Structured Object with a single `line` field, and the `InsertField$Value` transform then adds a `partition` field holding the partition of the consumed record (the `!` suffix on `partition!` marks the inserted field as required). The topic was only created with one partition.
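+
+To confirm how many partitions the `input` topic has, you can describe it from the broker container (a quick sanity check, assuming the Bitnami image keeps the Kafka CLI scripts on the `PATH`):
+
+```shell
+docker compose exec kafka kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic input
+```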
 
 To repeat that process, we delete the connector and reset the consumer group.
 
@@ -188,16 +196,33 @@ Re-run above console-producer and `curl -XPUT ...` command, but this time, there
 ### Scaling up
 
-Redo the tutorial with more input data and partitions and increase `max.tasks` of the connector.
+Redo the tutorial with more input data and partitions, and increase `tasks.max` of the connector. Notice that the `partition` field in the output may change (you may need to produce data multiple times before records land on more than one partition).
 
 ### Scaling out
 
-Scaling the workers will require more variables related to the `listeners` properties. Ex. sending a request to one of the worker in the group that is not the leader will return this. (TODO: Document the fix.)
-
-```shell
-{"error_code":500,"message":"Error trying to forward REST request: Error trying to forward REST request: Cannot complete request because of a conflicting operation (e.g. worker rebalance)"}%
+Scaling out the workers requires adding another container with a unique `CONNECT_REST_ADVERTISED_HOST_NAME` variable, e.g.
+
+```yml
+connect-jib-2:
+  image: *connect-image
+  hostname: connect-jib-2
+  depends_on:
+    - kafka
+  ports:
+    - '8183:8083'
+  environment:
+    <<: *connect-vars
+    CONNECT_REST_ADVERTISED_HOST_NAME: connect-jib-2
 ```
+See [`docker-compose.cluster.yml`](./docker-compose.cluster.yml). It can be run via `docker compose -f docker-compose.cluster.yml up`.
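+
+Once both workers are up, either one can serve REST requests (writes are forwarded to the group's leader internally). As a quick check, re-create the tutorial's `console-sink` connector and inspect its status on the one published port; the `worker_id` of each task shows which container it landed on:
+
+```shell
+curl -s http://localhost:8083/connectors/console-sink/status
+```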
+
 
 ## Extending with new Connectors
 
 > ***Disclaimer*** It is best to think of this image as a base upon which you can add your own Connectors. Below is the output of the default connector plugins, as provided by Apache Kafka project.
 
diff --git a/docker-compose.cluster.yml b/docker-compose.cluster.yml
new file mode 100644
index 0000000..5fa823d
--- /dev/null
+++ b/docker-compose.cluster.yml
@@ -0,0 +1,83 @@
+# Originated from https://github.com/bitnami/bitnami-docker-kafka/blob/0b1b18843b8a5c754a4c6e52a49ac5cf992fa5ed/docker-compose.yml
+version: '3'
+
+x-connect-image: &connect-image cricketeerone/apache-kafka-connect:3.2.0
+
+x-connect: &connect-vars
+  CONNECT_BOOTSTRAP_SERVERS: kafka:29092
+
+  CONNECT_GROUP_ID: cg_connect
+  CONNECT_CONFIG_STORAGE_TOPIC: connect-jib_config
+  CONNECT_OFFSET_STORAGE_TOPIC: connect-jib_offsets
+  CONNECT_STATUS_STORAGE_TOPIC: connect-jib_status
+  # Cannot be higher than the number of brokers in the Kafka cluster
+  CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
+  CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
+  CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
+
+  CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
+  CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
+
+  CONNECT_PLUGIN_PATH: /app/libs
+
+  # Connect client overrides
+  CONNECT_TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS: 30000
+  CONNECT_OFFSET_FLUSH_INTERVAL_MS: 900000
+  # CONNECT_CONSUMER_MAX_POLL_RECORDS: 500 default==500
+
+services:
+  zookeeper:
+    image: bitnami/zookeeper:3.8-debian-11
+    ports:
+      - '2181:2181'
+    environment:
+      - ALLOW_ANONYMOUS_LOGIN=yes
+    volumes:
+      - 'zookeeper_data:/bitnami/zookeeper'
+  kafka:
+    image: bitnami/kafka:3.2.0
+    depends_on:
+      - zookeeper
+    restart: unless-stopped
+    ports:
+      - '9092:9092'
+      - '29092:29092'
+    volumes:
+      - 'kafka_data:/bitnami/kafka'
+      - $PWD/lipsum.txt:/data/lipsum.txt:ro # Some data to produce
+    environment:
+      - ALLOW_PLAINTEXT_LISTENER=yes
+      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
+      - KAFKA_CFG_DELETE_TOPIC_ENABLE=true
+      - KAFKA_CFG_LOG_RETENTION_HOURS=48 # 2 days of retention for demo purposes
+      # https://rmoff.net/2018/08/02/kafka-listeners-explained/
+      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      - KAFKA_CFG_LISTENERS=PLAINTEXT://:29092,PLAINTEXT_HOST://0.0.0.0:9092
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
+
+  # Jib app
+  connect-jib-1:
+    image: *connect-image
+    hostname: connect-jib-1
+    depends_on:
+      - kafka
+    ports:
+      - '8083:8083' # full cluster info accessible from one instance
+    environment:
+      <<: *connect-vars
+      CONNECT_REST_ADVERTISED_HOST_NAME: connect-jib-1
+
+  connect-jib-2:
+    image: *connect-image
+    hostname: connect-jib-2
+    depends_on:
+      - kafka
+    environment:
+      <<: *connect-vars
+      CONNECT_REST_ADVERTISED_HOST_NAME: connect-jib-2
+
+volumes:
+  zookeeper_data:
+    driver: local
+  kafka_data:
+    driver: local
diff --git a/docker-compose.yml b/docker-compose.yml
index cc29e5f..83996d0 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,6 +1,30 @@
 # Originated from https://github.com/bitnami/bitnami-docker-kafka/blob/0b1b18843b8a5c754a4c6e52a49ac5cf992fa5ed/docker-compose.yml
 version: '3'
 
+x-connect-image: &connect-image cricketeerone/apache-kafka-connect:3.2.0
+
+x-connect: &connect-vars
+  CONNECT_BOOTSTRAP_SERVERS: kafka:29092
+
+  CONNECT_GROUP_ID: cg_connect
+  CONNECT_CONFIG_STORAGE_TOPIC: connect-jib_config
+  CONNECT_OFFSET_STORAGE_TOPIC: connect-jib_offsets
+  CONNECT_STATUS_STORAGE_TOPIC: connect-jib_status
+  # Cannot be higher than the number of brokers in the Kafka cluster
+  CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
+  CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
+  CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
+
+  CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
+  CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
+
+  CONNECT_PLUGIN_PATH: /app/libs
+
+  # Connect client overrides
+  CONNECT_TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS: 30000
+  CONNECT_OFFSET_FLUSH_INTERVAL_MS: 900000
+  # CONNECT_CONSUMER_MAX_POLL_RECORDS: 500 default==500
+
 services:
   zookeeper:
     image: bitnami/zookeeper:3.8-debian-11
@@ -32,35 +56,16 @@ services:
       - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
 
   # Jib app
-  connect-jib:
-    image: cricketeerone/apache-kafka-connect:3.2.0
+  connect-jib-1:
+    image: *connect-image
+    hostname: connect-jib-1
     depends_on:
       - kafka
     ports:
       - '8083:8083'
     environment:
-      CONNECT_REST_ADVERTISED_HOST_NAME: connect-jib
-      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
-
-      CONNECT_GROUP_ID: cg_connect-idea
-      CONNECT_CONFIG_STORAGE_TOPIC: connect-jib_config
-      CONNECT_OFFSET_STORAGE_TOPIC: connect-jib_offsets
-      CONNECT_STATUS_STORAGE_TOPIC: connect-jib_status
-      # Cannot be higher than the number of brokers in the Kafka cluster
-      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
-      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
-      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
-
-      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
-      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
-
-      CONNECT_PLUGIN_PATH: /app/libs
-
-      # Logging
-      # Connect client overrides
-      CONNECT_TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS: 30000
-      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 900000
-      CONNECT_CONSUMER_MAX_POLL_RECORDS: 500
+      <<: *connect-vars
+      CONNECT_REST_ADVERTISED_HOST_NAME: connect-jib-1
 
 volumes:
   zookeeper_data: