This repository has been archived by the owner on Oct 25, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
option.go
242 lines (188 loc) · 5.87 KB
/
option.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
package quark
import (
"context"
"time"
)
// Option is a unit of configuration of a Broker
type Option interface {
apply(*options)
}
type options struct {
providerConfig interface{}
cluster []string
errHandler ErrorHandler
publisher Publisher
eventMux EventMux
eventWriter EventWriter
poolSize int
maxRetries int
maxConnRetries int
retryBackoff time.Duration
connRetryBackoff time.Duration
messageIDFactory IDFactory
workerFactory WorkerFactory
baseMessageSource string
baseMessageContentType string
baseContext context.Context
}
type clusterOption []string
func (o clusterOption) apply(opts *options) {
opts.cluster = o
}
// WithCluster defines a set of addresses a Broker will use
func WithCluster(addr ...string) Option {
return clusterOption(addr)
}
type providerConfigOption struct {
ProviderConfig interface{}
}
func (o providerConfigOption) apply(opts *options) {
opts.providerConfig = o.ProviderConfig
}
// WithProviderConfiguration defines an specific Provider configuration
func WithProviderConfiguration(cfg interface{}) Option {
return providerConfigOption{ProviderConfig: cfg}
}
type errHandlerOption struct {
Handler ErrorHandler
}
func (o errHandlerOption) apply(opts *options) {
opts.errHandler = o.Handler
}
// WithErrorHandler defines an error hook/middleware that will be executed when an error occurs inside
// Quark low-level internals
func WithErrorHandler(handler ErrorHandler) Option {
return errHandlerOption{Handler: handler}
}
type publisherOption struct {
Publisher Publisher
}
func (o publisherOption) apply(opts *options) {
opts.publisher = o.Publisher
}
// WithPublisher defines a global Publisher that will be used to push messages to the ecosystem through
// the EventWriter
func WithPublisher(p Publisher) Option {
return publisherOption{Publisher: p}
}
type eventMuxOption struct {
Mux EventMux
}
func (o eventMuxOption) apply(opts *options) {
opts.eventMux = o.Mux
}
// WithEventMux defines the EventMux that will be used for Broker's operations
func WithEventMux(mux EventMux) Option {
return eventMuxOption{Mux: mux}
}
type eventWriterOption struct {
EventWriter EventWriter
}
func (o eventWriterOption) apply(opts *options) {
opts.eventWriter = o.EventWriter
}
// WithEventWriter defines the global event writer
func WithEventWriter(w EventWriter) Option {
return eventWriterOption{EventWriter: w}
}
type poolSizeOption int
func (o poolSizeOption) apply(opts *options) {
opts.poolSize = int(o)
}
// WithPoolSize defines the global pool size of total Workers
func WithPoolSize(size int) Option {
if size <= 0 {
return poolSizeOption(defaultPoolSize)
}
return poolSizeOption(size)
}
type maxRetriesOption int
func (o maxRetriesOption) apply(opts *options) {
opts.maxRetries = int(o)
}
// WithMaxRetries defines the global maximum ammount of times a publish operations will retry an Event operation
func WithMaxRetries(t int) Option {
if t <= 0 {
return maxRetriesOption(defaultMaxRetries)
}
return maxRetriesOption(t)
}
type maxConnRetriesOption int
func (o maxConnRetriesOption) apply(opts *options) {
opts.maxConnRetries = int(o)
}
// WithMaxConnRetries defines the global maximum ammount of times a consumer worker will retry its operations
func WithMaxConnRetries(t int) Option {
if t <= 0 {
return maxConnRetriesOption(defaultConnRetries)
}
return maxConnRetriesOption(t)
}
type retryBackoffOption time.Duration
func (o retryBackoffOption) apply(opts *options) {
opts.retryBackoff = time.Duration(o)
}
// WithRetryBackoff defines a time duration an EventWriter will wait to execute a write operation
func WithRetryBackoff(backoff time.Duration) Option {
if backoff <= 0 {
return retryBackoffOption(defaultRetryBackoff)
}
return retryBackoffOption(backoff)
}
type connRetryBackoffOption time.Duration
func (o connRetryBackoffOption) apply(opts *options) {
opts.connRetryBackoff = time.Duration(o)
}
// WithConnRetryBackoff defines a time duration a Worker will wait to connect to the specified infrastructure
func WithConnRetryBackoff(backoff time.Duration) Option {
if backoff <= 0 {
return connRetryBackoffOption(defaultRetryBackoff)
}
return connRetryBackoffOption(backoff)
}
type idFactoryOption struct {
Factory IDFactory
}
func (o idFactoryOption) apply(opts *options) {
opts.messageIDFactory = o.Factory
}
// WithMessageIDFactory defines the global Message ID Factory
func WithMessageIDFactory(factory IDFactory) Option {
return idFactoryOption{Factory: factory}
}
type workerFactoryOption struct {
Factory WorkerFactory
}
func (o workerFactoryOption) apply(opts *options) {
opts.workerFactory = o.Factory
}
// WithWorkerFactory defines the global Worker Factory Quark's Supervisor(s) will use to schedule background I/O tasks
func WithWorkerFactory(factory WorkerFactory) Option {
return workerFactoryOption{Factory: factory}
}
type messageSourceOption string
func (o messageSourceOption) apply(opts *options) {
opts.baseMessageSource = string(o)
}
// WithBaseMessageSource defines the global base Message Source to comply with the CNCF CloudEvents specification
func WithBaseMessageSource(s string) Option {
return messageSourceOption(s)
}
type messageTypeOption string
func (o messageTypeOption) apply(opts *options) {
opts.baseMessageContentType = string(o)
}
// WithBaseMessageContentType defines the global base Message Content Type to comply with the CNCF CloudEvents specification
func WithBaseMessageContentType(s string) Option {
return messageTypeOption(s)
}
type baseContextOption struct {
Ctx context.Context
}
func (o baseContextOption) apply(opts *options) {
opts.baseContext = o.Ctx
}
// WithBaseContext defines the context a Broker will use to executes its operations
func WithBaseContext(ctx context.Context) Option {
return baseContextOption{Ctx: ctx}
}