-
Notifications
You must be signed in to change notification settings - Fork 0
/
conveyor.go
114 lines (97 loc) · 3.75 KB
/
conveyor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// Package conveyor provides an abstraction for message queues, brokers, buses and the sort. It is idiomatic and asynchronous because it uses Go channels everywhere
//
//In another repo there are implementations for redis, rabbitmq, ...
//
//This repository includes an in-memory message broker implementation that is useful for testing
//
// Please, check README.md for an overview https://github.com/leolara/conveyor/README.md
package conveyor
// Broker interface for message brokers
type Broker interface {
// Subscribe to a topic/queue
// + target is the name of what you are subscribing to
// + options are implementation dependant
// returns a Subscription object asynchronously
Subscribe(target string, options ...interface{}) <-chan Subscription
// Publish to a topic/queue
// + target is the name of what you are publishing to
// + msgs is a channel on which you will send SendEnvelop objects
// + options are implementation dependant
// After calling this method you can publish as many messages as necessary using msgs channel
Publish(target string, msgs <-chan SendEnvelop, options ...interface{})
}
// Message contains a body, both ReceiveEnvelope and SendEnvelop extend this
type Message interface {
Body() []byte
}
// ReceiveEnvelope encapsulates a message that is being received
type ReceiveEnvelope interface {
Message
// Ack MUST return a channel to which the message receiver must write once to ACK, writing nil means a to ACK in
// all implementations, other values are implementation dependent
Ack() chan<- interface{}
}
// SendEnvelop encapsulates a message that is being received
type SendEnvelop interface {
Message
// Error MUST return a channel, the broker will write nil on success or an error if failure
Error() chan<- error
}
// Subscription encapsulates a subscription to a topic/queue
type Subscription interface {
// Receive returns a channel to read and receive messages. The returned channel is always the same, so it is not necessary
// to call this method for every read.
Receive() <-chan ReceiveEnvelope
// Unsubscribe lives up to its name
Unsubscribe()
Error() error
}
type message struct {
body []byte
}
func (m message) Body() []byte {
return m.body
}
var _ Message = (*message)(nil)
type sendEnvelop struct {
message
err chan error
}
func (m sendEnvelop) Error() chan<- error {
return m.err
}
var _ SendEnvelop = (*sendEnvelop)(nil)
type receiveEnvelope struct {
message
ack chan<- interface{}
}
func (m receiveEnvelope) Ack() chan<- interface{} {
return m.ack
}
var _ ReceiveEnvelope = (*receiveEnvelope)(nil)
// NewSendEnvelop creates an immutable SendEnvelope.
// Useful when you are sending messages, you do not have to create your own implementation of SendEnvelope.
// It is immutable but contains *references* to body and err, so you should be aware of that.
func NewSendEnvelop(body []byte, err chan error) SendEnvelop {
return sendEnvelop{
message: message{body: body},
err: err,
}
}
// NewReceiveEnvelop creates an immutable ReceiveEnvelope.
// Useful when writing a broker implementation and need to send messages to subscribers, you do not have to create your
// own implementation of ReceiveEnvelope.
// It is immutable but contains *references* to body and ack, so you should be aware of that
func NewReceiveEnvelop(body []byte, ack chan<- interface{}) ReceiveEnvelope {
return receiveEnvelope{
message: message{body: body},
ack: ack,
}
}
// NewReceiveEnvelopCopy does like NewReceiveEnvelop but using a copy of body instead of keeping the reference
// It is immutable but contains a *reference* to ack, so you should be aware of that
func NewReceiveEnvelopCopy(body []byte, ack chan<- interface{}) ReceiveEnvelope {
copied := make([]byte, len(body))
copy(copied, body)
return NewReceiveEnvelop(copied, ack)
}