A wrapper for Confluent's libraries for Apache Kafka and Schema Registry.
First install the dependencies:

    go get github.com/confluentinc/confluent-kafka-go github.com/landoop/schema-registry
To install, use go get:

    go get github.com/mycujoo/go-kafka-avro/v2
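The examples below assume imports along these lines; the kafkaavro alias is illustrative, and the srURL value used when constructing the consumer is assumed to hold the location of your schema registry:

    import (
        "log"

        "github.com/confluentinc/confluent-kafka-go/kafka"
        kafkaavro "github.com/mycujoo/go-kafka-avro/v2"
    )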
By default, this library fetches configuration from environment variables, but you can customize everything using options.
Create a consumer by passing the topics to subscribe to, a value factory that returns the value each message should be decoded into, and any options:

    c, err := kafkaavro.NewConsumer(
        []string{"topic1"},
        // Value factory: returns the value a message for this topic is decoded into.
        func(topic string) interface{} {
            return val{}
        },
        kafkaavro.WithKafkaConfig(&kafka.ConfigMap{
            "bootstrap.servers":        "localhost:29092",
            "security.protocol":        "ssl",
            "socket.keepalive.enable":  true,
            "enable.auto.commit":       false,
            "ssl.key.location":         "/path/to/service.key",
            "ssl.certificate.location": "/path/to/service.cert",
            "ssl.ca.location":          "/path/to/ca.pem",
            "group.id":                 "some-group-id",
            "session.timeout.ms":       6000,
            "auto.offset.reset":        "earliest",
        }),
        kafkaavro.WithSchemaRegistryURL(srURL), // srURL: location of your schema registry
        kafkaavro.WithEventHandler(func(event kafka.Event) {
            log.Println(event)
        }),
    )
    if err != nil {
        log.Fatal(err)
    }

Read and handle messages in a loop:

    for {
        msg, err := c.ReadMessage(5000)
        if err != nil {
            log.Println("Error", err)
            continue
        }
        if msg == nil {
            continue
        }
        // msg.Value holds the decoded value created by the value factory.
        switch v := msg.Value.(type) {
        case val:
            log.Println(v)
        }
    }
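The value factory above returns a val type that is not defined in the snippet. A minimal sketch, assuming the Avro record has a single int field named val and that the decoder maps record fields to struct fields via avro tags (the exact tag convention depends on the Avro library used for decoding):

    // val is a hypothetical Go shape for the consumed Avro record.
    // The avro struct tags are an assumption about the decoder's field mapping.
    type val struct {
        Val int `avro:"val"`
    }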
Create a producer by passing the topic name and the Avro schemas for the message key and value:

    producer, err := kafkaavro.NewProducer(
        "topic",
        `"string"`,
        `{"type": "record", "name": "test", "fields": [{"name": "val", "type": "int", "default": 0}]}`,
    )
    if err != nil {
        log.Fatal(err)
    }
Publish a message using the Produce method:

    // A nil delivery channel makes the call block until delivery.
    err = producer.Produce("key", "value", nil)
If you provide a delivery channel, the call does not block waiting for delivery; with a nil channel (as above) it blocks until the message is delivered.
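A minimal sketch of asynchronous publishing, assuming the delivery channel carries confluent-kafka-go kafka.Event values and that delivery reports arrive as *kafka.Message (these assumptions mirror the underlying producer's delivery reports, not a documented guarantee of this wrapper):

    // Assumption: the third argument to Produce is a chan kafka.Event that
    // receives a delivery report once the broker acknowledges the message.
    deliveryChan := make(chan kafka.Event, 1)

    if err := producer.Produce("key", "value", deliveryChan); err != nil {
        log.Println("produce failed:", err)
    }

    // Wait for the delivery report (this could run in a separate goroutine).
    e := <-deliveryChan
    if m, ok := e.(*kafka.Message); ok && m.TopicPartition.Error != nil {
        log.Println("delivery failed:", m.TopicPartition.Error)
    }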
Some of the code for the cached schema registry client was based on the https://github.com/dangkaka/go-kafka-avro implementation.