-
Notifications
You must be signed in to change notification settings - Fork 7
/
consumer.go
193 lines (155 loc) · 4.04 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
package kafkaavro
import (
"encoding/binary"
"encoding/json"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/pkg/errors"
)
type Consumer struct {
consumer *kafka.Consumer
schemaRegistryClient SchemaRegistryClient
stopChan chan struct{}
pollTimeout int
topics []string
}
type ConsumerMessage struct {
*kafka.Message
Error error
// Message Value parsed into maps/structs
Parsed interface{}
// JSON representation of the message
Textual []byte
}
// NewConsumer is a basic consumer to interact with schema registry, avro and kafka
func NewConsumer(topics []string, consumer *kafka.Consumer, schemaRegistryClient SchemaRegistryClient) (*Consumer, error) {
if topics != nil {
if err := consumer.SubscribeTopics(topics, nil); err != nil {
return nil, err
}
}
return &Consumer{
consumer: consumer,
schemaRegistryClient: schemaRegistryClient,
pollTimeout: 100,
topics: topics,
}, nil
}
func (ac *Consumer) SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) error {
ac.topics = topics
return ac.consumer.SubscribeTopics(topics, rebalanceCb)
}
// Messages returns the ConsumerMessage channel (that contains decoded messages)
// and other events channel for events like kafka.PartitionEOF, kafka.Stats
func (ac *Consumer) Messages(stopChan chan struct{}) (chan ConsumerMessage, chan kafka.Event) {
output := make(chan ConsumerMessage)
other := make(chan kafka.Event)
if ac.stopChan != nil {
// stop channel already open
close(ac.stopChan)
}
ac.stopChan = stopChan
go func() {
run := true
for run {
select {
case <-stopChan:
run = false
default:
ev := ac.consumer.Poll(ac.pollTimeout)
if ev == nil {
continue
}
switch e := ev.(type) {
case *kafka.Message:
parsed, textual, err := ac.decodeAvroBinary(e.Value)
if err != nil {
output <- ConsumerMessage{
Message: e,
Error: err,
}
continue
}
msg := ConsumerMessage{
Message: e,
Parsed: parsed,
Textual: textual,
}
if _, err = json.Marshal(msg.Parsed); err != nil {
msg.Error = err
output <- msg
continue
}
if e.TopicPartition.Topic == nil {
msg.Error = errors.New("null topic")
output <- msg
continue
}
if msg.Parsed == nil {
msg.Error = errors.New("missing parsed data")
output <- msg
continue
}
if _, ok := msg.Parsed.(map[string]interface{}); !ok {
msg.Error = errors.New("parsed data is wrong type")
output <- msg
continue
}
output <- ConsumerMessage{
Message: e,
Parsed: parsed,
Textual: textual,
}
default:
other <- e
}
}
}
}()
return output, other
}
func (ac *Consumer) CommitMessage(msg ConsumerMessage) ([]kafka.TopicPartition, error) {
return ac.consumer.CommitMessage(msg.Message)
}
func (ac *Consumer) Close() {
ac.consumer.Close()
close(ac.stopChan)
}
func (ac *Consumer) decodeAvroBinary(data []byte) (interface{}, []byte, error) {
if data[0] != 0 {
return nil, nil, errors.New("invalid magic byte")
}
schemaId := binary.BigEndian.Uint32(data[1:5])
codec, err := ac.schemaRegistryClient.GetSchemaByID(int(schemaId))
if err != nil {
return nil, nil, err
}
// Convert binary Avro data back to native Go form
native, _, err := codec.NativeFromBinary(data[5:])
if err != nil {
return nil, nil, err
}
textual, err := codec.TextualFromNative(nil, native)
if err != nil {
return nil, nil, err
}
return native, textual, err
}
// EnsureTopics returns error if one of the consumed topics
// was not found on the server.
func (ac *Consumer) EnsureTopics() error {
notFound := make([]string, 0)
meta, err := ac.consumer.GetMetadata(nil, true, 6000)
if err != nil {
return err
}
for _, topic := range ac.topics {
if _, ok := meta.Topics[topic]; !ok {
notFound = append(notFound, topic)
}
}
if len(notFound) > 0 {
return fmt.Errorf("topics not found: %v", notFound)
}
return nil
}