-
Notifications
You must be signed in to change notification settings - Fork 1
/
consumer.go
62 lines (50 loc) · 1.52 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package que
import (
"strconv"
"strings"
"github.com/jcoene/env"
"github.com/nsqio/go-nsq"
)
// A Consumer receives messages on a given topic and channel and hands them to
// its handler wrapper, which is registered with the underlying nsq consumer as
// Count concurrent handlers when ConnectToNSQLookupd is called.
type Consumer struct {
	// Topic is the topic name passed to nsq.NewConsumer; Id also returns it.
	Topic string
	// Channel is the channel name passed to nsq.NewConsumer.
	Channel string
	// Count is the number of concurrent handlers registered on connect.
	Count int
	// Config is the nsq configuration used to create the underlying consumer.
	Config *nsq.Config
	// wrapper is the handler registered with the underlying nsq consumer.
	wrapper *wrapper
	// consumer is the underlying nsq consumer; it is assigned by
	// ConnectToNSQLookupd and is nil before that method has been called.
	consumer *nsq.Consumer
}
// NewConsumer builds a Consumer for the given topic and channel whose handler
// wrapper is constructed from topic, channel and generator.
//
// The handler count is getConcurrency(topic, defaultCount): defaultCount unless
// a valid per-topic override is found. The same value is applied to the nsq
// max_in_flight setting of a freshly created nsq.Config, which stays reachable
// through the returned Consumer's Config field so it can be adjusted before
// connecting.
//
// No connection is made here; call ConnectToNSQLookupd to start consuming.
func NewConsumer(topic string, channel string, defaultCount int, generator HandlerGenerator) (c *Consumer) {
	concurrency := getConcurrency(topic, defaultCount)

	cfg := nsq.NewConfig()
	// The constructor has no error return, so a rejected max_in_flight value is
	// deliberately ignored and the config keeps its previous setting.
	_ = cfg.Set("max_in_flight", concurrency)

	c = &Consumer{
		Topic:   topic,
		Channel: channel,
		Count:   concurrency,
		Config:  cfg,
		wrapper: &wrapper{topic, channel, generator},
	}
	return c
}
// Id returns the identifier of the consumer, which is its topic name.
func (c *Consumer) Id() string {
	return c.Topic
}
// ConnectToNSQLookupd creates the underlying nsq consumer from the Consumer's
// Topic, Channel and Config, registers the handler wrapper as Count concurrent
// handlers, and then connects to the nsqlookupd instance at lookupdAddr.
//
// It returns the error from nsq.NewConsumer if the consumer cannot be created,
// otherwise the result of connecting to lookupdAddr.
//
// Every call creates a new nsq consumer and overwrites the one stored by any
// earlier call.
func (c *Consumer) ConnectToNSQLookupd(lookupdAddr string) (err error) {
	c.consumer, err = nsq.NewConsumer(c.Topic, c.Channel, c.Config)
	if err != nil {
		return err
	}

	// Handlers are registered before the connection is made.
	c.consumer.AddConcurrentHandlers(c.wrapper, c.Count)

	err = c.consumer.ConnectToNSQLookupd(lookupdAddr)
	return err
}
// getConcurrency returns the concurrency configured for key, or fallback when
// no usable override exists.
//
// The override is read from the environment variable named
// "TOPIC_CONCURRENCY_" followed by the upper-cased key. It is used only when it
// is non-empty, parses as a base-10 integer and is greater than zero; in every
// other case fallback is returned unchanged.
func getConcurrency(key string, fallback int) int {
	raw := env.Get("TOPIC_CONCURRENCY_" + strings.ToUpper(key))
	if raw == "" {
		return fallback
	}

	parsed, err := strconv.Atoi(raw)
	if err != nil || parsed <= 0 {
		return fallback
	}
	return parsed
}