-
Notifications
You must be signed in to change notification settings - Fork 1
/
judo.go
133 lines (124 loc) · 3.19 KB
/
judo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package judo
import (
"errors"
"fmt"
"github.com/amagimedia/judo/v3/client"
judoMsg "github.com/amagimedia/judo/v3/message"
amagiPub "github.com/amagimedia/judo/v3/protocols/pub/amagipub"
pubnubPub "github.com/amagimedia/judo/v3/protocols/pub/pubnub"
redispub "github.com/amagimedia/judo/v3/protocols/pub/redis"
sidekiqpub "github.com/amagimedia/judo/v3/protocols/pub/sidekiq"
stanpub "github.com/amagimedia/judo/v3/protocols/pub/stan"
judoReply "github.com/amagimedia/judo/v3/protocols/reply"
nanoreq "github.com/amagimedia/judo/v3/protocols/req/nano"
judoSub "github.com/amagimedia/judo/v3/protocols/sub"
"github.com/amagimedia/judo/v3/publisher"
)
func NewSubscriber(protocol, method, primarySubProtocol, backupSubProtocol string) (client.JudoClient, error) {
var sub client.JudoClient
switch protocol {
case "amqp":
switch method {
case "sub":
sub = judoSub.NewAmqpSub()
case "reply":
sub = judoReply.NewAmqpReply()
default:
return sub, errors.New("Invalid Parameters, method: " + method)
}
case "nano":
switch method {
case "sub":
sub = judoSub.NewNanoSub()
case "reply":
sub = judoReply.NewNanoReply()
default:
return sub, errors.New("Invalid Parameters, method: " + method)
}
case "nats":
switch method {
case "sub":
sub = judoSub.NewNatsSub()
case "reply":
sub = judoReply.NewNatsReply()
default:
return sub, errors.New("Invalid Parameters, method: " + method)
}
case "nats-streaming":
switch method {
case "sub":
sub = judoSub.NewNatsStreamSub()
default:
return sub, errors.New("Invalid Parameters, method: " + method)
}
case "redis":
switch method {
case "sub":
sub = judoSub.NewRedisSub()
default:
return sub, errors.New("Invalid Parameters, method: " + method)
}
case "pubnub":
switch method {
case "sub":
sub = judoSub.NewPubnubSub()
default:
return sub, errors.New("Invalid Parameters, method: " + method)
}
case "amagi":
switch method {
case "sub":
sub = judoSub.NewAmagiSub(primarySubProtocol, backupSubProtocol)
default:
return sub, errors.New("Invalid Parameters, method: " + method)
}
default:
return sub, errors.New("Invalid Protocol: " + protocol)
}
sub.OnMessage(func(msg judoMsg.Message) {
fmt.Println("Received : ", msg.GetMessage())
})
return sub, nil
}
func NewPublisher(pubType, pubMethod, primaryPubProtocol, backupPubProtocol string) (publisher.JudoPub, error) {
// Switch on PubType
// Pass Config to Connect and get Publisher Object
// Return Publisher Object
var pub publisher.JudoPub
var err error
switch pubType + "-" + pubMethod {
case "redis-publish":
pub, err = redispub.New()
if err != nil {
return pub, err
}
case "sidekiq-publish":
pub, err = sidekiqpub.New()
if err != nil {
return pub, err
}
case "nano-req":
pub, err = nanoreq.New()
if err != nil {
return pub, err
}
case "nats-publish":
pub, err = stanpub.New()
if err != nil {
return pub, err
}
case "pubnub-publish":
pub, err = pubnubPub.New()
if err != nil {
return pub, err
}
case "amagi-publish":
pub, err = amagiPub.New(primaryPubProtocol, backupPubProtocol)
if err != nil {
return pub, err
}
default:
return nil, nil
}
return pub, err
}