This repository has been archived by the owner on Jan 26, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 7
/
protocolBase.py
115 lines (90 loc) · 3.92 KB
/
protocolBase.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from re import compile
from txzmq import ZmqEndpoint, ZmqFactory, ZmqSubConnection, ZmqEndpointType
import json
import requests
_zmqFactory = ZmqFactory()
class PushjetProtocolBase(object):
_uuidRe = compile(r'^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$')
_errorTemplate = '{"error":{"id":%i,"message":"%s"}}'
def __init__(self, apiUri, pubUri):
self.api = apiUri.rstrip('/')
self.zmqEndpoint = ZmqEndpoint(ZmqEndpointType.connect, pubUri)
self.zmq = None
self.uuid = None
self.subscriptions = []
@staticmethod
def isUuid(s):
return bool(PushjetProtocolBase._uuidRe.match(s))
def onZmqMessage(self, data):
tag, message = data.split(' ', 1)
self.sendMessage(message)
decoded = json.loads(message)
if 'message' in decoded:
self.markReadAsync()
if 'subscription' in decoded:
token = decoded['subscription']['service']['public']
if token in self.subscriptions:
self.zmq.unsubscribe(token)
else:
self.zmq.subscribe(token)
def markReadAsync(self):
self.factory.reactor.callFromThread(self.markRead)
def markRead(self):
url = "%s/message?uuid=%s" % (self.api, self.uuid)
data = requests.delete(url).json()
if 'error' in data:
print "Could mark messages read for %s got error %i: %s" % (
self.uuid, data['error']['id'], data['error']['message']
)
def getMessages(self):
url = "%s/message?uuid=%s" % (self.api, self.uuid)
data = requests.get(url).json()
if 'error' in data:
print "Could fetch messages for %s got error %i: %s" % (
self.uuid, data['error']['id'], data['error']['message']
)
return []
return data['messages']
def updateSubscriptionsAsync(self):
self.factory.reactor.callFromThread(self.updateSubscriptions)
def updateSubscriptions(self):
url = "%s/subscription?uuid=%s" % (self.api, self.uuid)
subscriptions = requests.get(url).json()
if 'error' in subscriptions:
print "Could not fetch subscriptions for %s got error %i: %s" % (
self.uuid, subscriptions['error']['id'], subscriptions['error']['message']
)
else:
tokens = [x['service']['public'] for x in subscriptions['subscriptions']]
# Make sure we are always subscriptioning to messages that are meant
# for our client
tokens.append(self.uuid)
unsubscribe = [self.toAscii(x) for x in self.subscriptions if x not in tokens]
subscribe = [self.toAscii(x) for x in tokens if x not in self.subscriptions]
self.subscriptions = tokens
map(self.zmq.unsubscribe, unsubscribe)
map(self.zmq.subscribe, subscribe)
print "Successfully updated subscriptions for %s" % self.uuid
def onClientMessage(self, payload, binary=False):
if binary:
message = self._errorTemplate % (-1, 'Expected text got binary data')
self.sendMessage(message)
elif self.uuid: # Already initialized
return
elif not self.isUuid(payload):
message = self._errorTemplate % (1, 'Invalid client uuid')
self.sendMessage(message)
else: # Initialize ZMQ
self.uuid = payload
self.zmq = ZmqSubConnection(_zmqFactory, self.zmqEndpoint)
self.zmq.gotMessage = self.onZmqMessage
self.updateSubscriptionsAsync()
self.sendMessage('{"status": "ok"}')
msg = self.getMessages()
for m in msg:
self.sendMessage(json.dumps({'message': m}))
@staticmethod
def toAscii(s):
return s.encode('ascii', 'ignore')
def sendMessage(self, message):
raise NotImplementedError()