Skip to content

Library to work with Apache Kafka using Avro serialization format

License

Notifications You must be signed in to change notification settings

sjones94549/go-kafka-avro

 
 

Repository files navigation

go-kafka-avro

A wrapper for Confluent's libraries for Apache Kafka and Schema Registry.

Installation

First install dependencies:

go get github.com/confluentinc/confluent-kafka-go github.com/landoop/schema-registry

To install use go get:

go get github.com/mycujoo/go-kafka-avro/v2

Usage

By default this library would fetch configuration from environment variables. But you can customize everything using options.

Consumer

c, err := kafkaavro.NewConsumer(
    []string{"topic1"},
    func(topic string) interface{} {
        return val{}
    },
    kafkaavro.WithKafkaConfig(&kafka.ConfigMap{
        "bootstrap.servers":        "localhost:29092",
        "security.protocol":        "ssl",
        "socket.keepalive.enable":  true,
        "enable.auto.commit":       false,
        "ssl.key.location":         "/path/to/service.key",
        "ssl.certificate.location": "/path/to/service.cert",
        "ssl.ca.location":          "/path/to/ca.pem",
        "group.id":                 "some-group-id",
        "session.timeout.ms":       6000,
        "auto.offset.reset":        "earliest",
    }),
    kafkaavro.WithSchemaRegistryURL(srURL),
    kafkaavro.WithEventHandler(func(event kafka.Event) {
        log.Println(event)
    }),
)

for {
    msg, err := c.ReadMessage(5000)
    if err != nil {
        log.Println("Error", err)
        continue
    }
    if msg == nil {
        continue
    }
    switch v := msg.Value.(type) {
    case val:
        log.Println(v)
    }
}

Producer

producer, err := kafkaavro.NewProducer(
    "topic",
    `"string"`,
    `{"type": "record", "name": "test", "fields" : [{"name": "val", "type": "int", "default": 0}]}`,
)

Publish message using Produce method:

err = producer.Produce("key", "value", nil)

If you provide deliverChan then call will not be blocking until delivery.

Related

Some code for cached schema registry client was based on https://github.com/dangkaka/go-kafka-avro implementation.

About

Library to work with Apache Kafka using Avro serialization format

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Go 100.0%