# robot.py — forked from lich0821/WeChatRobot
# -*- coding: utf-8 -*-
import logging
import re
import time
import xml.etree.ElementTree as ET
from queue import Empty
from threading import Thread
from wcferry import Wcf, WxMsg
# from base.func_zhipu import ZhiPu
# from base.func_bard import BardAssistant
# from base.func_chatglm import ChatGLM
# from base.func_chatgpt import ChatGPT
# from base.func_chengyu import cy
# from base.func_news import News
# from base.func_tigerbot import TigerBot
# from base.func_xinghuo_web import XinghuoWeb
from configuration import Config
from constants import ChatType
from job_mgmt import Job
# Version identifier exposed by this module.
__version__ = "39.0.10.1"
class Robot(Job):
    """Customize your own robot.

    Wraps a ``Wcf`` client: receives messages on a background thread,
    dispatches them by origin (group / direct) and type, and sends replies.
    """

    def __init__(self, config: Config, wcf: Wcf, chat_type: int) -> None:
        """Store the collaborators and cache the account id and contact list.

        :param config: configuration object; this class reads ``GROUPS`` and
            ``NEWS`` and calls ``reload()`` on it
        :param wcf: WeChat client used for all receiving and sending
        :param chat_type: requested chat backend id; currently unused because
            the backend selection below is disabled
        """
        self.wcf = wcf
        self.config = config
        self.LOG = logging.getLogger("Robot")
        self.wxid = self.wcf.get_self_wxid()
        self.allContacts = self.getAllContacts()
        # The backend selection below is commented out, so without this
        # default ``toChitchat`` would raise AttributeError on ``self.chat``.
        self.chat = None
        # TODO: add a long-term memory store (vector database) that persists
        # the chat history of every WeChat user.
        # if ChatType.is_in_chat_types(chat_type):
        #     if chat_type == ChatType.TIGER_BOT.value and TigerBot.value_check(self.config.TIGERBOT):
        #         self.chat = TigerBot(self.config.TIGERBOT)
        #     elif chat_type == ChatType.CHATGPT.value and ChatGPT.value_check(self.config.CHATGPT):
        #         self.chat = ChatGPT(self.config.CHATGPT)
        #     elif chat_type == ChatType.XINGHUO_WEB.value and XinghuoWeb.value_check(self.config.XINGHUO_WEB):
        #         self.chat = XinghuoWeb(self.config.XINGHUO_WEB)
        #     elif chat_type == ChatType.CHATGLM.value and ChatGLM.value_check(self.config.CHATGLM):
        #         self.chat = ChatGLM(self.config.CHATGLM)
        #     elif chat_type == ChatType.BardAssistant.value and BardAssistant.value_check(self.config.BardAssistant):
        #         self.chat = BardAssistant(self.config.BardAssistant)
        #     elif chat_type == ChatType.ZhiPu.value and ZhiPu.value_check(self.config.ZHIPU):
        #         self.chat = ZhiPu(self.config.ZHIPU)
        #     else:
        #         self.LOG.warning("未配置模型")
        #         self.chat = None
        # else:
        #     if TigerBot.value_check(self.config.TIGERBOT):
        #         self.chat = TigerBot(self.config.TIGERBOT)
        #     elif ChatGPT.value_check(self.config.CHATGPT):
        #         self.chat = ChatGPT(self.config.CHATGPT)
        #     elif XinghuoWeb.value_check(self.config.XINGHUO_WEB):
        #         self.chat = XinghuoWeb(self.config.XINGHUO_WEB)
        #     elif ChatGLM.value_check(self.config.CHATGLM):
        #         self.chat = ChatGLM(self.config.CHATGLM)
        #     elif BardAssistant.value_check(self.config.BardAssistant):
        #         self.chat = BardAssistant(self.config.BardAssistant)
        #     elif ZhiPu.value_check(self.config.ZhiPu):
        #         self.chat = ZhiPu(self.config.ZhiPu)
        #     else:
        #         self.LOG.warning("未配置模型")
        #         self.chat = None
        # self.LOG.info(f"已选择: {self.chat}")

    @staticmethod
    def value_check(args: dict) -> bool:
        """Check that a settings mapping is fully populated.

        :param args: settings mapping (may be empty or ``None``)
        :return: ``True`` when ``args`` is non-empty and every value other
            than the one under the ``'proxy'`` key is not ``None``
        """
        if args:
            return all(value is not None for key, value in args.items() if key != 'proxy')
        return False

    def toAt(self, msg: WxMsg) -> bool:
        """Handle a message in which the robot was @-mentioned.

        :param msg: WeChat message structure
        :return: handling status, ``True`` on success, ``False`` on failure
        """
        return self.toChitchat(msg)

    # Disabled: idiom lookup / idiom-chain handler (depends on ``cy``, whose
    # import is commented out at the top of the file).
    # def toChengyu(self, msg: WxMsg) -> bool:
    #     """
    #     Handle idiom lookup / idiom-chain messages
    #     :param msg: WeChat message structure
    #     :return: handling status, `True` on success, `False` on failure
    #     """
    #     status = False
    #     texts = re.findall(r"^([#|?|?])(.*)$", msg.content)
    #     # [('#', '天天向上')]
    #     if texts:
    #         flag = texts[0][0]
    #         text = texts[0][1]
    #         if flag == "#":  # idiom chain
    #             if cy.isChengyu(text):
    #                 rsp = cy.getNext(text)
    #                 if rsp:
    #                     self.sendTextMsg(rsp, msg.roomid)
    #                     status = True
    #         elif flag in ["?", "?"]:  # look up meaning
    #             if cy.isChengyu(text):
    #                 rsp = cy.getMeaning(text)
    #                 if rsp:
    #                     self.sendTextMsg(rsp, msg.roomid)
    #                     status = True
    #     return status

    def toChitchat(self, msg: WxMsg) -> bool:
        """Reply to a chat message, using the chat backend when one is set.

        Without a backend (``self.chat`` is falsy) a fixed reply is sent.
        Otherwise the @-mention prefix and ASCII spaces are stripped from the
        message and the remainder is passed to ``self.chat.get_answer``,
        keyed by the room id for group messages or the sender id otherwise.

        :param msg: WeChat message structure
        :return: ``True`` if a reply was sent, ``False`` if no answer was
            obtained
        """
        if not self.chat:  # no chat backend: fixed reply
            rsp = "你@我干嘛?"
        else:  # chat backend available: ask it
            # Remove "@nickname" up to the first whitespace. The previous
            # class ``[\u2005|\s]`` also treated a literal "|" as a
            # terminator, cutting nicknames that contain a pipe.
            q = re.sub(r"@.*?[\u2005\s]", "", msg.content).replace(" ", "")
            rsp = self.chat.get_answer(q, (msg.roomid if msg.from_group() else msg.sender))

        if rsp:
            if msg.from_group():
                # Reply in the room and @ the original sender.
                self.sendTextMsg(rsp, msg.roomid, msg.sender)
            else:
                self.sendTextMsg(rsp, msg.sender)
            return True

        self.LOG.error("无法从获得答案")
        return False

    def processMsg(self, msg: WxMsg) -> None:
        """Dispatch one received message.

        Group messages are handled only for rooms listed in
        ``self.config.GROUPS``. Direct messages are dispatched on
        ``msg.type``. Custom replies can be added here, e.g. look at
        ``msg.content`` and answer in the room while @-ing the sender::

            content = "weather for xx: ..."
            receivers = msg.roomid
            self.sendTextMsg(content, receivers, msg.sender)

        Room id: ``msg.roomid``; sender wxid: ``msg.sender``; text:
        ``msg.content``.

        :param msg: WeChat message structure
        """
        # Group messages
        if msg.from_group():
            if msg.roomid not in self.config.GROUPS:
                return  # room is not in the configured list: ignore
            if msg.is_at(self.wxid):  # robot was @-mentioned
                self.toAt(msg)
            else:  # any other group message
                # ``toChengyu`` is currently commented out; calling it
                # unconditionally raised AttributeError for every such
                # message. Dispatch only if the handler exists.
                handler = getattr(self, "toChengyu", None)
                if handler is not None:
                    handler(msg)
            return  # group message fully handled

        # Direct messages: dispatch by message type
        if msg.type == 37:  # friend request
            self.autoAcceptFriendRequest(msg)
        elif msg.type == 10000:  # system message
            self.sayHiToNewFriend(msg)
        elif msg.type == 0x01:  # text message
            # Lets the account owner reload the configuration on demand
            # (a scheduled job could do the same).
            if msg.from_self():
                # NOTE(review): this is an exact comparison against the
                # literal text "^更新$", anchors included — confirm whether
                # a regex match on "更新" was intended.
                if msg.content == "^更新$":
                    self.config.reload()
                    self.LOG.info("已更新")
            else:
                self.toChitchat(msg)  # chitchat

    # Disabled: callback-style receiving.
    # def onMsg(self, msg: WxMsg) -> int:
    #     try:
    #         self.LOG.info(msg)  # print the message
    #         self.processMsg(msg)
    #     except Exception as e:
    #         self.LOG.error(e)
    #     return 0
    # def enableRecvMsg(self) -> None:
    #     self.wcf.enable_recv_msg(self.onMsg)

    def enableReceivingMsg(self) -> None:
        """Enable message receiving and start the daemon polling thread.

        The thread (named ``GetMessage``) loops while
        ``wcf.is_receiving_msg()`` is true, logging each message and passing
        it to :meth:`processMsg`.
        """
        def innerProcessMsg(wcf: Wcf):
            while wcf.is_receiving_msg():
                try:
                    msg = wcf.get_msg()
                    self.LOG.info(msg)
                    self.processMsg(msg)
                except Empty:
                    continue  # nothing queued yet
                except Exception as e:
                    # Keep the loop alive: one bad message must not stop
                    # the receiver thread.
                    self.LOG.error(f"Receiving message error: {e}")

        self.wcf.enable_receiving_msg()
        Thread(target=innerProcessMsg, name="GetMessage", args=(self.wcf,), daemon=True).start()

    def sendTextMsg(self, msg: str, receiver: str, at_list: str = "") -> None:
        """Send a text message, optionally @-mentioning room members.

        :param msg: message text
        :param receiver: wxid of the recipient, or the room id
        :param at_list: comma-separated wxids to @; use ``notify@all`` to
            @ everyone
        """
        # The text must contain one "@name" per entry in ``at_list``.
        ats = ""
        if at_list:
            if at_list == "notify@all":  # @ everyone
                ats = " @所有人"
            else:
                # Resolve each wxid to its alias inside the target room.
                ats = "".join(
                    f" @{self.wcf.get_alias_in_chatroom(wxid, receiver)}"
                    for wxid in at_list.split(",")
                )

        if ats == "":
            self.LOG.info(f"To {receiver}: {msg}")
            self.wcf.send_text(f"{msg}", receiver, at_list)
        else:
            # Mentions go first, then a blank line, then the message body.
            self.LOG.info(f"To {receiver}: {ats}\r{msg}")
            self.wcf.send_text(f"{ats}\n\n{msg}", receiver, at_list)

    def getAllContacts(self) -> dict:
        """Load all contacts from the ``Contact`` table of ``MicroMsg.db``.

        Covers friends, official accounts, service accounts, group
        members, etc.

        :return: mapping of the form ``{"wxid": "NickName"}``
        """
        contacts = self.wcf.query_sql("MicroMsg.db", "SELECT UserName, NickName FROM Contact;")
        return {contact["UserName"]: contact["NickName"] for contact in contacts}

    def keepRunningAndBlockProcess(self) -> None:
        """Keep the robot running: run pending jobs once per second, forever."""
        while True:
            self.runPendingJobs()
            time.sleep(1)

    def autoAcceptFriendRequest(self, msg: WxMsg) -> None:
        """Accept the friend request carried by ``msg``.

        Reads the ``encryptusername``, ``ticket`` and ``scene`` attributes
        from the root element of the XML in ``msg.content``. Any failure is
        logged and swallowed (best effort).

        :param msg: WeChat message structure
        """
        try:
            xml = ET.fromstring(msg.content)
            v3 = xml.attrib["encryptusername"]
            v4 = xml.attrib["ticket"]
            scene = int(xml.attrib["scene"])
            self.wcf.accept_new_friend(v3, v4, scene)
        except Exception as e:
            self.LOG.error(f"同意好友出错:{e}")

    def sayHiToNewFriend(self, msg: WxMsg) -> None:
        """Greet a newly added friend announced by a system message.

        Does nothing unless ``msg.content`` matches the "friend added"
        notice pattern.

        :param msg: WeChat message structure
        """
        nickName = re.findall(r"你已添加了(.*),现在可以开始聊天了。", msg.content)
        if nickName:
            # A friend was added: update the cached contact list.
            self.allContacts[msg.sender] = nickName[0]
            self.sendTextMsg(f"Hi {nickName[0]},我自动通过了你的好友请求。", msg.sender)

    def newsReport(self) -> None:
        """Send the important-news digest to every receiver in ``config.NEWS``.

        Does nothing when no receivers are configured, or when the ``News``
        provider is not available in this module.
        """
        receivers = self.config.NEWS
        if not receivers:
            return

        # The ``News`` import is commented out at the top of the file, so a
        # direct reference raised NameError. Look the name up so the job
        # degrades to a warning until the import is restored.
        news_cls = globals().get("News")
        if news_cls is None:
            self.LOG.warning("News provider is not imported; skipping news report")
            return

        news = news_cls().get_important_news()
        for r in receivers:
            self.sendTextMsg(news, r)