forked from wardellbagby/HangoutsBot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
handlers.py
217 lines (185 loc) · 9.14 KB
/
handlers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
import logging, shlex, unicodedata, asyncio
from cleverbot import ChatterBotFactory, ChatterBotType
import hangups
from commands import command
# Load the word list once at import time: one entry per line of wordlist.txt,
# with the trailing newline removed. The ``with`` block closes the file handle,
# which the previous version left open for the life of the process.
# NOTE(review): ``list`` shadows the builtin of the same name for this whole
# module. The name is kept because other modules may import it from here;
# consider renaming it once all users are known.
with open("wordlist.txt") as words:
    list = [line.strip('\n') for line in words]
class MessageHandler(object):
    """Handle Hangups conversation events

    handle() is the entry point. For every event that is not from the bot
    itself or from a blocked user it first sends at most one free-text
    reaction (hashtag splitting, haiku, canned replies, Cleverbot) and then
    dispatches to handle_command(), or to handle_forward() followed by
    handle_autoreply().
    """

    # Cleverbot session shared by all handler instances. None means "no
    # session yet"; this is the sentinel speakup() and handle() test for.
    cleversession = None
    # Not read anywhere in this class.
    dotalk = False
    # Users whose events handle() ignores; reset to empty by __init__.
    blocked_list = []

    def __init__(self, bot, bot_command='/'):
        """Store the bot, create the UtilBot helper and open a Cleverbot session

        bot_command is stored on the instance, but the methods of this class
        compare against a literal '/' rather than reading it back.
        """
        # Imported here rather than at module level, as in the original code.
        from UtilBot import UtilBot

        self.bot = bot
        self.bot_command = bot_command
        self.util_bot = UtilBot()
        MessageHandler.cleversession = MessageHandler._create_clever_session()
        MessageHandler.blocked_list = []

    @staticmethod
    def _create_clever_session():
        """Return a new Cleverbot chat session"""
        factory = ChatterBotFactory()
        cleverbotter = factory.create(ChatterBotType.CLEVERBOT)
        return cleverbotter.create_session()

    @staticmethod
    def _set_clever(bot, event, enabled):
        """Store the 'clever' flag in the settings of the event's conversation"""
        conv_id = event.conv_id
        # A conversation that was never seen has no entry at all; indexing it
        # directly (as the old code did) raises KeyError on a plain mapping.
        if conv_id not in bot.conv_settings or bot.conv_settings[conv_id] is None:
            bot.conv_settings[conv_id] = {}
        # Work on a copy and assign it back instead of mutating in place.
        # Presumably conv_settings only notices changes on assignment - TODO confirm.
        settings = dict(bot.conv_settings[conv_id])
        settings['clever'] = enabled
        bot.conv_settings[conv_id] = settings

    @staticmethod
    def shutup(bot, event):
        """Turn Cleverbot replies off for the event's conversation"""
        MessageHandler._set_clever(bot, event, False)

    @staticmethod
    def speakup(bot, event):
        """Turn Cleverbot replies on for the event's conversation

        A Cleverbot session is created first if none exists yet.
        """
        if MessageHandler.cleversession is None:
            MessageHandler.cleversession = MessageHandler._create_clever_session()
        MessageHandler._set_clever(bot, event, True)

    @staticmethod
    def _to_plain_lower(value):
        """Transliterate unicode characters to ASCII and make everything lowercase"""
        return unicodedata.normalize('NFKD', value).encode('ascii', 'ignore').decode().lower()

    @staticmethod
    def word_in_text(word, text):
        """Return True if word is in text

        Both arguments are reduced to lowercase ASCII and the delimiters
        . , : ; ! ? in text are treated as whitespace. The match is against
        whole whitespace-separated tokens, not substrings.
        """
        word = MessageHandler._to_plain_lower(word)
        text = MessageHandler._to_plain_lower(text)
        # Replace delimiters in text with whitespace
        for delim in '.,:;!?':
            text = text.replace(delim, ' ')
        return word in text.split()

    def _react_to_text(self, event, clever):
        """Send at most one automatic reaction to a non-command message

        The branches are tried in order and only the first match answers.
        """
        from UtilBot import UtilBot

        textuppers = str(event.text).upper()
        if event.text.startswith('#'):
            unhashtagged = self.util_bot.unhashtag(event.text)
            if unhashtagged is not None:
                # Newlines must be sent as explicit LINE_BREAK segments.
                segments = [hangups.ChatMessageSegment('\n', segment_type=hangups.SegmentType.LINE_BREAK)
                            if x == '\n' else hangups.ChatMessageSegment(x)
                            for x in unhashtagged]
                self.bot.send_message_segments(event.conv, segments)
        elif UtilBot.is_haiku(textuppers):
            segments = [hangups.ChatMessageSegment('Haiku: ', is_bold=True),
                        hangups.ChatMessageSegment('\n', hangups.SegmentType.LINE_BREAK)]
            lines = UtilBot.convert_to_haiku(event.text)
            if lines is not None:
                for line in lines.split('\n'):
                    segments.append(hangups.ChatMessageSegment(line))
                    segments.append(hangups.ChatMessageSegment('\n', hangups.SegmentType.LINE_BREAK))
                # Drop the line break that follows the last haiku line.
                segments.pop()
                self.bot.send_message_segments(event.conv, segments)
        elif "🚮" in str(event.text):
            self.bot.send_message(event.conv, "🚮")
        elif textuppers.endswith('?!'):
            self.bot.send_message(event.conv, "I agree with {}.".format(event.user.full_name))
        elif "AMERICA" in textuppers:
            self.bot.send_message(event.conv, "MURICA!!!!!!!")
        elif "MURICA" in textuppers:
            self.bot.send_message(event.conv, "Fuck yeah!")
        elif (clever or self.util_bot.nameregex.search(textuppers)) and MessageHandler.cleversession is not None:
            # NOTE(review): the first five characters are always dropped,
            # presumably to strip the bot's name - this also happens when
            # 'clever' is on and the name was never mentioned. Confirm intent.
            self.bot.send_message(event.conv, MessageHandler.cleversession.think(str(event.text[5:])))

    @asyncio.coroutine
    def handle(self, event):
        """Handle conversation event"""
        # Never react to our own messages or to blocked users.
        if event.user.is_self or event.user_id in MessageHandler.blocked_list:
            return

        if event.conv_id not in self.bot.conv_settings:
            self.bot.conv_settings[event.conv_id] = {}

        # When a flag is missing, fall back to False and call the matching
        # command. Presumably that command stores the default - confirm in commands.py.
        try:
            muted = self.bot.conv_settings[event.conv_id]['muted']
        except KeyError:
            muted = False
            import commands
            commands.unmute(self.bot, event)

        try:
            clever = self.bot.conv_settings[event.conv_id]['clever']
        except KeyError:
            clever = False
            import commands
            commands.shutup(self.bot, event)

        # Treat non-breaking spaces as ordinary spaces.
        event.text = event.text.replace('\xa0', ' ')

        # Use this to add commands that are based off of what text the user inputs when it isn't a command.
        # Empty text (e.g. an attachment-only message) is skipped: the old
        # code indexed event.text[0] here and raised IndexError.
        if event.text and not muted and not event.text.startswith('/'):
            self._react_to_text(event, clever)

        if logging.root.level == logging.DEBUG:
            event.print_debug()

        if event.text:
            if event.text.startswith('/'):
                # Run command; anything beginning with '/?' is a shortcut for /help.
                # startswith() also copes with a bare '/', where text[1] raised IndexError.
                if event.text.startswith('/?'):
                    event.text = "/help"
                yield from self.handle_command(event)
            else:
                # Forward messages
                yield from self.handle_forward(event)

                # Send automatic replies
                yield from self.handle_autoreply(event)

    @asyncio.coroutine
    def handle_command(self, event):
        """Handle command messages"""
        # Test if command handling is enabled
        if not self.bot.get_config_suboption(event.conv_id, 'commands_enabled'):
            return

        # Parse message. shlex raises ValueError on an unbalanced quote; treat
        # that as "no usable command" instead of letting it escape.
        try:
            raw_args = shlex.split(event.text, posix=False)
        except ValueError:
            raw_args = []
        line_args = [arg for arg in (raw.strip() for raw in raw_args) if arg]

        # Test if command length is sufficient
        if not line_args:
            self.bot.send_message(event.conv,
                                  '{}: Not a valid command.'.format(event.user.full_name))
            return

        # Test if user has permissions for running command
        commands_admin_list = self.bot.get_config_suboption(event.conv_id, 'commands_admin')
        if commands_admin_list and line_args[0].lower().replace('/', '') in commands_admin_list:
            # An unset 'admins' option means nobody is an admin.
            admins_list = self.bot.get_config_suboption(event.conv_id, 'admins') or []
            if event.user_id.chat_id not in admins_list:
                # NOTE(review): the source this was restored from had lost its
                # indentation, so it is unclear whether dev mode should bypass
                # the check or only silence the reply. The conservative reading
                # is used: non-admins are always refused.
                if not self.bot.dev:
                    self.bot.send_message(event.conv,
                                          '{}: I\'m sorry, Dave. I\'m afraid I can\'t do that.'.format(
                                              event.user.full_name))
                return

        # Run command
        yield from command.run(self.bot, event, *line_args)

    @staticmethod
    def _build_forward_segments(event):
        """Return the segments of event prefixed with a bold link to the sender"""
        # Prepend forwarded message with name of sender
        link = 'https://plus.google.com/u/0/{}/about'.format(event.user_id.chat_id)
        segments = [hangups.ChatMessageSegment(event.user.full_name, hangups.SegmentType.LINK,
                                               link_target=link, is_bold=True),
                    hangups.ChatMessageSegment(': ', is_bold=True)]
        # Copy original message segments
        segments.extend(event.conv_event.segments)
        # Append links to attachments (G+ photos) to forwarded message
        if event.conv_event.attachments:
            segments.append(hangups.ChatMessageSegment('\n', hangups.SegmentType.LINE_BREAK))
            segments.extend([hangups.ChatMessageSegment(url, hangups.SegmentType.LINK, link_target=url)
                             for url in event.conv_event.attachments])
        return segments

    @asyncio.coroutine
    def handle_forward(self, event):
        """Forward the message to every conversation listed in 'forward_to'"""
        # Test if message forwarding is enabled
        if not self.bot.get_config_suboption(event.conv_id, 'forwarding_enabled'):
            return

        forward_to_list = self.bot.get_config_suboption(event.conv_id, 'forward_to')
        if not forward_to_list:
            return

        for dst in forward_to_list:
            try:
                conv = self.bot._conv_list.get(dst)
            except KeyError:
                # Skip destinations the lookup cannot resolve.
                continue
            self.bot.send_message_segments(conv, self._build_forward_segments(event))

    @asyncio.coroutine
    def handle_autoreply(self, event):
        """Handle autoreplies to keywords in messages"""
        # Test if autoreplies are enabled
        if not self.bot.get_config_suboption(event.conv_id, 'autoreplies_enabled'):
            return

        autoreplies_list = self.bot.get_config_suboption(event.conv_id, 'autoreplies')
        if not autoreplies_list:
            return

        for kwds, sentence in autoreplies_list:
            for kw in kwds:
                # "*" matches every message; otherwise the keyword must
                # appear as a whole word.
                if kw == "*" or self.word_in_text(kw, event.text):
                    self.bot.send_message(event.conv, sentence)
                    # One reply per keyword group is enough.
                    break