
jdbc sink doesn't write data to target table and no errors #1415

Open

sunzhaoyang opened this issue Jul 3, 2024 · 0 comments
Environment

kafka:2.4
zookeeper:2.4
confluentinc-kafka-connect-jdbc-10.7.6

Test

producer

bin/kafka-console-producer.sh --broker-list 172.17.0.229:9092 --topic ob7 --property "parse.key=true" --property "key.separator=:"

send simple message : 5:{"id":5}

watcher

Start a watcher; the message is visible:

docker run -it --net host --rm --name watcher -e ZOOKEEPER_CONNECT=172.17.0.229:2181 -e KAFKA_BROKER=172.17.0.229:9092 -e KAFKA_LISTENERS=PLAINTEXT://172.17.0.229:9092 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.229:9092 quay.io/debezium/kafka:2.4 watch-topic -a -k ob7

Using KAFKA_LISTENERS=PLAINTEXT://172.17.0.229:9092 and KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.229:9092
Using KAFKA_BROKER=172.17.0.229:9092
Contents of topic ob7:
5	{"id":5}

sink jdbc

config

{
    "name": "connect-mysql-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:mysql://172.17.0.229:2810/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
        "connection.user": "root",
        "connection.password": "xxx",
        "insert.mode": "upsert",
        "delete.enabled": "true",
        "pk.mode": "record_key",
        "pk.fields": "id",
        "auto.create": "true",
        "auto.evolve": "true",
        "topics": "ob7",
        "table.name.format": "es4",
        "transforms": "ExtractField",
        "transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
        "transforms.ExtractField.field": "after",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
    }
}
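
Note that with value.converter.schemas.enable=false the sink receives schemaless JSON. For comparison only, this is a minimal sketch of the envelope JsonConverter parses into a schema'd record when schemas.enable=true (the int32 field type is an assumption based on the id column seen later in the logs):

{
    "schema": {
        "type": "struct",
        "fields": [
            { "type": "int32", "optional": false, "field": "id" }
        ],
        "optional": false
    },
    "payload": { "id": 5 }
}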

After POSTing the config, the logs suggest the connector reached the target database successfully and already fetched the table schema:

[2024-07-03 10:16:36,923] INFO [connect-mysql-sink|task-0] [Consumer clientId=connector-consumer-connect-mysql-sink-0, groupId=connect-connect-mysql-sink] Adding newly assigned partitions: ob7-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:319)
[2024-07-03 10:16:36,934] INFO [connect-mysql-sink|task-0] [Consumer clientId=connector-consumer-connect-mysql-sink-0, groupId=connect-connect-mysql-sink] Setting offset for partition ob7-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[172.17.0.229:9092 (id: 1 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:975)
[2024-07-03 10:17:00,651] INFO [connect-mysql-sink|task-0] JdbcDbWriter Connected (io.confluent.connect.jdbc.sink.JdbcDbWriter:56)
[2024-07-03 10:17:00,721] INFO [connect-mysql-sink|task-0] Checking MySql dialect for existence of TABLE "es4" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:595)
[2024-07-03 10:17:00,778] INFO [connect-mysql-sink|task-0] Using MySql dialect TABLE "es4" present (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:603)
[2024-07-03 10:17:00,824] INFO [connect-mysql-sink|task-0] Checking MySql dialect for type of TABLE "es4" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:889)
[2024-07-03 10:17:00,831] INFO [connect-mysql-sink|task-0] Setting metadata for table "es4" to Table{name='"es4"', type=TABLE columns=[Column{'id', isPrimaryKey=true, allowsNull=false, sqlType=INT}]} (io.confluent.connect.jdbc.util.TableDefinitions:64)
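At this point the task state can be checked through the Connect REST API (host and port are assumptions; adjust to the worker's REST listener):

curl -s http://localhost:8083/connectors/connect-mysql-sink/status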

But there is no data in the target table, and no more logs appear until about 4 minutes later:

[2024-07-03 10:21:26,117] INFO [AdminClient clientId=connect-cluster--shared-admin] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:977)
[2024-07-03 10:25:26,827] INFO [Producer clientId=connect-cluster--offsets] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:977)
[2024-07-03 10:25:27,182] INFO [Consumer clientId=connect-cluster--offsets, groupId=connect-cluster] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:977)
[2024-07-03 10:25:27,671] INFO [Producer clientId=connect-cluster--statuses] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:977)
[2024-07-03 10:25:27,786] INFO [Consumer clientId=connect-cluster--statuses, groupId=connect-cluster] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:977)
[2024-07-03 10:25:28,460] INFO [Producer clientId=connect-cluster--configs] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:977)
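
One way to see whether the sink task actually consumed the record is to check the lag of its consumer group (group id taken from the log lines above):

bin/kafka-consumer-groups.sh --bootstrap-server 172.17.0.229:9092 --describe --group connect-connect-mysql-sink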