-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathplugin.py
274 lines (231 loc) · 8.69 KB
/
plugin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
import json
import random
import re
import graia
from urllib.parse import quote
import requests
from bs4 import BeautifulSoup
from graia.broadcast import ExecutionStop
from graia.application import GroupMessage
from graia.application.message.elements.internal import *
from graia.application.group import Group
from MsgObj import Msg
from init_bot import *
import nest_asyncio
from command_session import *
# HTTP request headers passed as `headers=` to requests.get by the lookup
# helpers in this file; the user-agent string is that of a desktop browser.
headers = {
    'user-agent': (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/81.0.4044.138 Safari/537.36 Edg/81.0.416.77 '
    ),
}

# Base URL of a Baidu Baike entry; the URL-quoted entry name is appended to it.
BaiDuWiKi = 'https://baike.baidu.com/item/'
def say_loving(message: GroupMessage, group: Group):
    """Send a random line from get_love() to the group, mentioning the sender.

    Nothing is sent unless ``group.id`` is listed in ``start_baiDu_group``.
    The coroutine returned by ``app.sendGroupMessage`` is driven to completion
    synchronously through ``loop.run_until_complete``.

    Args:
        message: The incoming group message; its chain factory builds the
            reply and its sender is the one mentioned.
        group: The group the reply is sent to.

    Returns:
        Always ``None``.
    """
    if group.id not in start_baiDu_group:
        return None

    # Reply layout: the text line first, then the @-mention of the sender.
    reply_chain = message.messageChain.create([
        Plain(get_love()),
        At(message.sender.id),
    ])
    loop.run_until_complete(app.sendGroupMessage(group, reply_chain))
    return None
def get_love() -> str:
    """Return one uniformly chosen entry of ``LoveTalkList``.

    Raises:
        ValueError: If ``LoveTalkList`` is empty (raised by the random draw).
    """
    position = random.randrange(len(LoveTalkList))
    return LoveTalkList[position]
async def read_love():
    """Load the lines of ``love.txt`` into ``LoveTalkList``, skipping duplicates.

    Each line is appended exactly as read (including its trailing newline)
    unless an equal entry is already in the list. Loading is best-effort: if
    the file is missing, unreadable or cannot be decoded, the list keeps
    whatever was appended so far and no error is raised.
    """
    try:
        # The context manager guarantees the handle is closed, which the
        # previous open()/readline() loop never did.
        with open('love.txt', 'r') as love_file:
            for line in love_file:
                if line not in LoveTalkList:
                    LoveTalkList.append(line)
    except (OSError, UnicodeError):
        # Deliberately best-effort: a missing or unreadable file simply
        # leaves LoveTalkList unchanged instead of aborting start-up.
        return
def getACGKnowledge(txt: str) -> str:
    """Look up *txt* on zh.moegirl.org.cn and return a short introduction.

    The page is fetched and stripped of ``<script>``/``<style>`` tags, then
    the introduction is extracted in three fallback steps:

    1. the text between the entry name and the table-of-contents marker,
       extended with the first section's text when shorter than 200
       characters;
    2. if that fails, the first ``<p>`` of the article body;
    3. if that fails too (or the page cannot be fetched), a fixed
       "not found" message.

    Args:
        txt: The entry name as typed by the user (not URL-quoted).

    Returns:
        The extracted text followed by the page URL, or the "not found"
        message followed by the page URL.
    """
    entry = txt
    moeGirlWiki = f'https://zh.moegirl.org.cn/{quote(txt)}'
    not_found = "很抱歉没有找到相关信息或找到多个词条" + '\n' + moeGirlWiki

    try:
        # A timeout keeps a stalled connection from blocking the bot forever.
        data = requests.get(moeGirlWiki, headers=headers, timeout=10).text
    except requests.RequestException:
        # Same user-facing outcome as any other failed lookup.
        return not_found

    content = BeautifulSoup(data, 'html.parser')
    for tag_name in ('script', 'style'):
        for tag in content(tag_name):
            tag.extract()

    try:
        # Prefer the second 'mw-parser-output' container when there is one;
        # an empty result raises IndexError and ends in the not-found reply.
        outputs = content.find_all(class_='mw-parser-output')
        datas = outputs[1] if len(outputs) > 1 else outputs[0]
        try:
            # Collapse blank lines and runs of spaces, then drop site banners.
            bs = re.sub('\n+', '\n', datas.text)
            bs = re.sub(' +', ' ', bs)
            bs = ''.join(line for line in bs.splitlines(True) if line.strip())
            bs = bs. \
                replace("萌娘百科欢迎您参与完善本条目☆欢迎有兴趣编辑讨论的朋友加入萌百Bilibili UP主专题编辑团队:338917445 欢迎正在阅读这个条目的您协助.", '') \
                .replace("编辑前请阅读Wiki入门或条目编辑规范,并查找相关资料。萌娘百科祝您在本站度过愉快的时光。", '')

            # re.escape: the entry name and headings are literal text, not
            # regex syntax (e.g. "C++" or "(" would otherwise break or
            # change the pattern).
            lead_matches = re.findall(re.escape(entry) + r'([\s\S]*?)目录', bs)
            firstIntroduce: str = lead_matches[0] if lead_matches else ''

            if len(firstIntroduce) < 200:
                introduce = datas.find_all(class_='toclevel-1 tocsection-1')[0] \
                    .find_all(class_='toctext')[0].get_text()
                end = datas.find_all(class_='toclevel-1 tocsection-2')[0] \
                    .find_all(class_='toctext')[0].text
                section_pattern = (
                    re.escape(introduce) + r'([\s\S]*?)' + re.escape(end)
                )
                # Index 1 is kept from the original code: the second span
                # between the two headings is used (presumably the first is
                # the table of contents itself — confirm).
                firstIntroduce += re.findall(section_pattern, bs)[1]
            # NOTE(review): indentation was lost in the reviewed copy; this
            # return is assumed to sit outside the length check — confirm.
            return firstIntroduce + moeGirlWiki
        except Exception:
            # Structured extraction failed: fall back to the first paragraph.
            return datas.find_all('p')[0].get_text() + '\n' + moeGirlWiki
    except Exception:
        return not_found
def getBaiduKnowledge(text: str) -> str:
    """Return the first paragraph of the Baidu Baike entry for *text*.

    Args:
        text: The entry name as typed by the user (not URL-quoted).

    Returns:
        The text of the first element with class ``para`` followed directly
        by the entry URL, or a fixed "not found" message when the page cannot
        be fetched or contains no such element.
    """
    url = BaiDuWiKi + quote(text)
    try:
        # A timeout keeps a stalled connection from blocking the bot forever.
        data = requests.get(url, headers=headers, timeout=10).text
        first_paragraph = BeautifulSoup(data, 'html.parser').find_all(class_='para')[0]
        return first_paragraph.get_text() + url
    except Exception:
        # Any network or parsing failure ends in the same user-facing reply;
        # unlike a bare except, KeyboardInterrupt/SystemExit still propagate.
        return "很抱歉没有找到相关结果"
def add_temp_talk(id: int, type: str, isFirstRun: bool, Question: str):
    """Register (or overwrite) the pending session stored under *id*.

    Args:
        id: Key of the session in ``temp_talk``; other functions in this file
            look sessions up by the message sender's id.
        type: Session kind; ``session_manager`` dispatches on ``'Add'`` and
            ``'Change'``.
        isFirstRun: Whether the first stage of the session is still pending.
        Question: The question text the session is about.
    """
    session_record = dict(type=type, isFirstRun=isFirstRun, Q=Question)
    temp_talk[id] = session_record
async def session_manager(message: GroupMessage, group: Group):
    """Wrapped command parser: advance the sender's pending session, if any.

    When ``temp_talk`` holds a session for the sender, its ``isFirstRun``
    flag is cleared, the handler matching the session type (``AddQA`` for
    ``'Add'``, ``change`` for ``'Change'``) is awaited, the session is
    removed, and any message chain the handler returned is sent to the group.

    Args:
        message: The incoming group message.
        group: The group the message was sent in.
    """
    sender_id = message.sender.id
    # Only act when the sender has an unfinished session.
    if not temp_talk.get(sender_id):
        return

    pending = temp_talk[sender_id]
    if pending['isFirstRun']:
        pending['isFirstRun'] = False

    session_kind = pending['type']
    outgoing = None
    if session_kind == 'Add':
        outgoing = await AddQA(message, group)
    elif session_kind == 'Change':
        outgoing = await change(group, message)

    # The session is over, so release it.
    # NOTE(review): indentation was lost in the reviewed copy; the release is
    # assumed to apply to every session type, not only 'Change' — confirm.
    temp_talk.pop(sender_id)

    if outgoing is not None:
        await app.sendGroupMessage(group, outgoing)
def list_refresh(group_id: int):
    """Rebuild the quick-index question list of one group.

    The questions (keys of ``GroupQA[group_id]``) are stored in
    ``quick_find_question_list[group_id]`` ordered from shortest to longest;
    questions of equal length keep their dictionary order.

    Raises:
        KeyError: If *group_id* has no entry in ``GroupQA``.
    """
    questions = GroupQA[group_id]
    quick_find_question_list[group_id] = sorted(questions, key=len)
def FQA_list(message: GroupMessage, group: Group):
    """Get the question list of a group as a message chain.

    When the group has at least one stored question, every entry of its
    quick-index list is rendered as ``#<index>.<question>`` on its own line,
    followed by a usage hint. If the group has questions but no quick-index
    list, a single empty entry is rendered. Otherwise a fixed "no questions"
    text is used. The text is also printed to stdout.

    Args:
        message: The incoming group message; its chain factory builds the reply.
        group: The group whose questions are listed.

    Returns:
        A message chain holding one ``Plain`` element with the text.
    """
    has_questions = group.id in GroupQA and len(GroupQA[group.id]) >= 1
    if has_questions:
        ordered_questions = quick_find_question_list.get(group.id, [''])
        numbered_lines = [
            f"#{index}.{question}\n"
            for index, question in enumerate(ordered_questions)
        ]
        send_txt = "".join(numbered_lines) + "使用快速索引:#+问题序号"
    else:
        send_txt = "本群暂时没有问题哦"

    send_msg = message.messageChain.create([Plain(send_txt)])
    print(send_txt)
    return send_msg
def parser(message: GroupMessage, txt: str) -> bool:
    """Command parser: tell whether the message chain is a valid command.

    The check is a plain prefix test on the chain's display text. The
    original author notes that it currently has a bug and does not serve its
    purpose well, so functions calling it add extra checks of their own.

    Args:
        message: The incoming group message.
        txt: The command prefix to look for.

    Returns:
        ``True`` when the display text starts with *txt*, else ``False``.
    """
    display_text = message.messageChain.asDisplay()
    return display_text.startswith(txt)
def judge(message: MessageChain) -> bool:
    """Tell whether *message* is a "修改迎新词" command.

    Returns:
        ``True`` only when the chain contains a ``Plain`` element and the
        display text of its sendable form starts with "修改迎新词".
    """
    if not message.has(Plain):
        return False
    display_text = message.asSendable().asDisplay()
    return display_text.startswith("修改迎新词")
def judge_depend_target(message: MessageChain):
    """Stop execution unless *message* passes ``judge``.

    Raises:
        ExecutionStop: When ``judge(message)`` is false.
    """
    if judge(message):
        return
    raise ExecutionStop()
def deleteQA(Q: str, group: Group) -> bool:
    """Delete a question from a group's Q&A table.

    On success the group's quick-index list is rebuilt via ``list_refresh``.

    Args:
        Q: The question text to remove.
        group: The group whose table is modified.

    Returns:
        ``True`` if the question existed and was removed, else ``False``.
    """
    if group.id not in GroupQA:
        return False

    group_table: dict = GroupQA[group.id]
    if Q not in group_table:
        return False

    del group_table[Q]
    list_refresh(group.id)
    return True
def search(Q: str, group: Group) -> Msg:
    """Look up a question and return the object stored as its answer.

    Args:
        Q: The question text to look for.
        group: The group whose Q&A table is searched.

    Returns:
        The value stored under *Q* for this group, or ``None`` when the group
        has no table or the question is not in it.
    """
    if group.id not in GroupQA:
        return None
    group_table: dict = GroupQA[group.id]
    return group_table[Q] if Q in group_table else None
def get_change(Q: str, group: Group, GM: GroupMessage) -> bool:
    """Replace the stored answer of an existing question.

    Args:
        Q: The question whose answer is replaced.
        group: The group whose Q&A table is modified. (It was previously
            annotated with the ``GroupQA`` store itself, which is the dict
            indexed below, not a type.)
        GM: The group message that becomes the new answer, wrapped in ``Msg``.

    Returns:
        ``True`` if the question existed and its answer was replaced,
        ``False`` if the group has no table or the question is unknown.
    """
    if group.id not in GroupQA:
        return False

    group_table: dict = GroupQA[group.id]
    if Q not in group_table:
        return False

    group_table[Q] = Msg(GM)
    return True
async def change(group: Group, GM: GroupMessage) -> MessageChain:
    """Run one stage of the "change answer" session of the message sender.

    The session is read from ``temp_talk`` under ``GM.sender.id``.

    * First stage (``isFirstRun`` true): report whether the question exists;
      when it does not, the session is removed.
    * Second stage: store *GM* as the new answer through ``get_change`` and
      persist with ``saveQA`` on success.

    Args:
        group: The group whose Q&A table is consulted. (It was previously
            annotated with the ``GroupQA`` store itself rather than the
            ``Group`` type.)
        GM: The incoming group message.

    Returns:
        A message chain with the reply text, or ``None`` when the second
        stage could not apply the change.

    Raises:
        KeyError: If the sender has no session in ``temp_talk``.
    """
    pending = temp_talk[GM.sender.id]
    isFirstRun = pending['isFirstRun']
    Question = pending['Q']

    # Check with the session manager state whether this session has started.
    if isFirstRun:  # not started yet
        if search(Question, group) is not None:
            reply = f"问题{Question}已找到,请问如何回答?"
        else:
            reply = f"问题{Question}不存在!"
            # Nothing to change, so the session ends here.
            temp_talk.pop(GM.sender.id)
    else:  # already in progress
        if get_change(Question, group, GM):
            reply = "修改回答成功!"
            await saveQA()
        else:
            reply = None

    # NOTE(review): indentation was lost in the reviewed copy; the chain is
    # assumed to be returned only when there is a reply — confirm.
    if reply is None:
        return None
    return GM.messageChain.create([
        Plain(reply)
    ])
async def AddQA(groupMsg: GroupMessage, group: Group) -> MessageChain:
    """Run one stage of the "add question" session of the message sender.

    The session is read from ``temp_talk`` under ``groupMsg.sender.id``.

    * First stage (``isFirstRun`` true): a sender listed in ``BlackUser`` is
      told so directly, the session is removed and ``None`` is returned.
      Otherwise the group's table is created when missing; an already known
      question ends the session and its current answer is returned, a new
      question yields a prompt asking for the answer.
    * Second stage: the message itself is stored as the answer, the
      quick-index list is rebuilt and the data is persisted via ``saveQA``.

    Args:
        groupMsg: The incoming group message.
        group: The group whose Q&A table is modified.

    Returns:
        The message chain to send, or ``None`` when nothing is left to send.
    """
    pending = temp_talk[groupMsg.sender.id]
    first_stage = pending['isFirstRun']
    question = pending['Q']
    session = Msg(groupMsg)

    if not first_stage:
        # Second stage: this message is the answer to the pending question.
        group_table = GroupQA[group.id]
        group_table[question] = Msg(groupMsg)
        confirmation = session.msg_chain.create([
            Plain("录入成功")
        ])
        list_refresh(group.id)
        await saveQA()
        return confirmation

    if session.user_id in BlackUser:
        refusal = session.msg_chain.create([
            At(session.user_id),
            Plain("你已经被拉入小黑屋")
        ])
        temp_talk.pop(session.user_id)
        # Sent here directly; returning None means the caller has nothing
        # further to send.
        await app.sendGroupMessage(group, refusal)
        return None

    if not GroupQA.get(group.id):
        GroupQA[group.id] = dict()
    group_table: dict = GroupQA[group.id]

    if question is None:
        return None

    if question in group_table:
        # Known question: show the current answer and end the session.
        existing = groupMsg.messageChain.create([
            Plain("问题已存在,当前回答为:")
        ]).plusWith(group_table[question].get_msg_list())
        temp_talk.pop(session.user_id)
        return existing

    return session.msg_chain.create([
        Plain("问题已被录入,请问如何回答?")
    ])