-
Notifications
You must be signed in to change notification settings - Fork 0
/
send_posts.py
283 lines (253 loc) · 10.5 KB
/
send_posts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
import asyncio
import json
import logging
from random import choice
from sys import argv
from typing import List
import requests
import urllib3
from fake_useragent import UserAgent
from exceptions import AccountBannedException, PostDeletedException
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class SendPost:
    """Build and send automated replies to forum floors that match a set of queries.

    Floors to consider are read from ``data/<sub_name>/floors.json``. For every
    floor whose content contains one of ``queries`` a reply is generated
    according to ``reply_type`` ("keyword", "comparison" or "licking_dog") and
    posted to ``post_url``. Floors that were answered are persisted to
    ``data/global/replied_floors.json`` so they are skipped on later runs.
    """

    post_url = "https://bbs.hupu.com/post.php?action=reply"
    # Both attributes below are evaluated once, at class-definition time, and
    # are therefore shared by every instance.
    user_agent = UserAgent().random
    session = requests.Session()
    signature = '本回复由<a href="https://bbs.hupu.com/43452253.html">虎扑非官方机器人</a>自动发送。如果你对这个回复有什么问题或建议,请回复或私信。'

    def __init__(self, sub_name: str, queries: List[str], reply_type: str):
        """Load the cookie, headers and the persisted skip lists.

        Args:
            sub_name: Name of the data sub-directory under ``data/``.
            queries: Substrings to look for in floor contents.
            reply_type: Key selecting the reply generator
                (see ``_get_reply_content``).
        """
        self.sub_name = sub_name
        self.queries = queries
        self.reply_type = reply_type
        self.cookie = self._get_cookie()
        self.headers = self._get_headers()
        # Filled in while sending; used to skip further replies to the same
        # deleted post / banned sub-forum within this run.
        self.deleted_post_ids = []
        self.banned_sub_ids = []
        self.do_not_reply_users = self.get_do_not_reply_users()
        self.previously_replied_floors = self.get_previously_replied_floors()
        self.recently_replied_floors = []

    @staticmethod
    def _get_cookie():
        """Return the raw contents of ``cookie.txt``."""
        with open("cookie.txt", "r", encoding="utf-8") as f:
            return f.read()

    def _get_headers(self):
        """Return the HTTP headers sent with every request."""
        return {
            "content-type": "application/x-www-form-urlencoded",
            "user-agent": self.user_agent,
            "cookie": self.cookie,
            "charset": "utf-8",
        }

    def get_do_not_reply_users(self):
        """Return the ``"users"`` entry of ``data/global/do_not_reply.json``."""
        with open("data/global/do_not_reply.json", encoding="utf-8") as f:
            return json.loads(f.read())["users"]

    def get_previously_replied_floors(self):
        """Return the persisted list of replied floors, or ``[]`` if unavailable.

        A missing file or invalid JSON is treated as "nothing replied yet".
        """
        try:
            with open("data/global/replied_floors.json", encoding="utf-8") as f:
                return json.loads(f.read())
        except (FileNotFoundError, ValueError):
            return []

    def get_stats_for_pairs(self, a, b):
        """Return ``{stat_name: {a: value, b: value}}`` from ``stats.json``.

        Raises:
            KeyError: If ``a`` or ``b`` is missing from any stat entry.
        """
        with open(f"data/{self.sub_name}/stats.json", encoding="utf-8") as f:
            stats = json.loads(f.read())
        return {
            stat_name: {a: stat[a], b: stat[b]} for stat_name, stat in stats.items()
        }

    @staticmethod
    def _get_comparison_pairs(query, content):
        """Extract the two names around the first ``vs`` in ``content``.

        The ``#query#`` tag is removed first; the left name is the last
        space-separated token before ``vs`` and the right name is the first
        token after it.

        Raises:
            ValueError: If ``content`` contains no ``vs``.
        """
        content = content.replace(f"#{query}#", "").strip()
        a, b = content.split("vs", maxsplit=1)
        a, b = a.split(" ")[-1], b.split(" ")[0]
        return a, b

    @staticmethod
    def format_stats(stats):
        """Render a ``{stat_name: {item: value}}`` mapping as indented text."""
        string_builder = []
        for stat_name, stat in stats.items():
            string_builder.append(f"{stat_name}:\n")
            for item, value in stat.items():
                string_builder.append(f"\t{item}: {value}\n")
        return "".join(string_builder)

    @staticmethod
    def format_newlines(input: str):
        """Replace newline characters with ``<br/>`` tags."""
        return input.replace("\n", "<br/>")

    def get_comparison_reply_content(self, query: str, quote_content: str) -> str:
        """Return formatted stats for the pair named in ``quote_content``.

        Falls back to ``"comparison not found"`` when the text has no ``vs``
        (``ValueError``) or when a name is missing from the stats (``KeyError``).
        """
        try:
            pairs = self._get_comparison_pairs(query, quote_content)
            result = self.format_stats(self.get_stats_for_pairs(*pairs))
        except (ValueError, KeyError):
            # KeyError: one of the extracted names has no stats entry. Without
            # this a single unknown name would abort the whole run.
            result = "comparison not found"
        return result

    def get_keyword_reply_content(self, query: str) -> str:
        """Return a reply for ``query`` from ``keyword_reply.json``.

        A value of the form ``"%other%"`` is an alias for the entry stored
        under ``other``. A list value yields one randomly chosen element; a
        plain string value is returned as is. Unknown keywords (or empty
        lists) yield ``"keyword not found"``.
        """
        try:
            with open(
                f"data/{self.sub_name}/input/keyword_reply.json", encoding="utf-8"
            ) as f:
                keyword_mapper = json.loads(f.read())
            result = keyword_mapper[query]
            if (
                isinstance(result, str)
                and result.startswith("%")
                and result.endswith("%")
            ):
                result = keyword_mapper[result.strip("%")]
            if isinstance(result, str):
                # choice() on a string would return a single random character.
                return result
            return choice(result)
        except (KeyError, IndexError):
            # IndexError: choice() on an empty list of candidate replies.
            return "keyword not found"

    def get_licking_dog_reply_content(self, query: str) -> str:
        """Fetch a reply text from the remote API; ``query`` is unused.

        Returns a fixed fallback message if the request fails.
        """
        try:
            # NOTE(review): verify=False disables TLS certificate verification.
            # A timeout keeps an unresponsive API from hanging the run; timeout
            # errors are RequestException subclasses and hit the fallback below.
            return self.session.get(
                "https://api.ixiaowai.cn/tgrj/index.php", verify=False, timeout=10
            ).text
        except requests.exceptions.RequestException:
            return "机器人出问题了,再试试吧?"

    def _get_reply_content(self, reply_type, **kw):
        """Dispatch to the reply generator for ``reply_type``.

        Only the keyword arguments that the selected generator accepts are
        forwarded; extra entries in ``kw`` are ignored.

        Raises:
            KeyError: If ``reply_type`` is unknown or a required argument is
                missing from ``kw``.
        """
        reply_type_function_map = {
            "keyword": self.get_keyword_reply_content,
            "comparison": self.get_comparison_reply_content,
            "licking_dog": self.get_licking_dog_reply_content,
        }
        reply_type_args_map = {
            "keyword": ["query"],
            "comparison": ["query", "quote_content"],
            "licking_dog": ["query"],
        }
        reply_type_function = reply_type_function_map[reply_type]
        reply_type_kwargs = {arg: kw[arg] for arg in reply_type_args_map[reply_type]}
        return reply_type_function(**reply_type_kwargs)

    def get_replies_metadata(self, queries, reply_type):
        """Build one reply descriptor per (floor, matching query) pair.

        Floors written by users in ``do_not_reply_users`` and floors already
        listed in ``previously_replied_floors`` are skipped.

        Returns:
            A list of dicts with keys ``quote_floor_id``, ``content``,
            ``sub_id`` and ``post_id``.
        """
        with open(f"data/{self.sub_name}/floors.json", encoding="utf-8") as f:
            floors_to_reply = json.loads(f.read())
        reply_metadata = []
        for post_id, post in floors_to_reply.items():
            sub_id = post["meta"]["sub_id"]
            for floor in post["floors"].values():
                floor_id = floor["floor_id"]
                if (
                    floor["username"] in self.do_not_reply_users
                    or {"post_id": post_id, "floor_id": floor_id}
                    in self.previously_replied_floors
                ):
                    continue
                quote_content = floor["content"]
                for query in queries:
                    if query not in quote_content:
                        continue
                    content = self._get_reply_content(
                        reply_type,
                        query=query,
                        quote_content=quote_content,
                    )
                    reply_metadata.append(
                        {
                            "quote_floor_id": floor_id,
                            "content": f"{content}\n\n{self.signature}",
                            "sub_id": sub_id,
                            "post_id": post_id,
                        }
                    )
        return reply_metadata

    def get_all_replies(self):
        """Compute all replies, dump them to ``replies.json`` and return them."""
        replies = self.get_replies_metadata(self.queries, self.reply_type)
        with open(f"data/{self.sub_name}/replies.json", "w", encoding="utf-8") as f:
            f.write(json.dumps(replies, indent=4, ensure_ascii=False))
        return replies

    def mark_replied_floors(self):
        """Persist previously and newly replied floors, without duplicates."""
        # Build the list before opening the file: mode "w" truncates it.
        replied_floors = list(self.previously_replied_floors)
        for floor in self.recently_replied_floors:
            if floor not in replied_floors:
                replied_floors.append(floor)
        with open("data/global/replied_floors.json", "w", encoding="utf-8") as f:
            f.write(json.dumps(replied_floors, indent=4, ensure_ascii=False))

    def test_account_banned(self, sub_id, post_id):
        """Fetch the reply form and raise if it reports a ban.

        Raises:
            AccountBannedException: If the response text contains the ban marker.
        """
        response = requests.get(
            f"https://bbs.hupu.com/post.php?fid={sub_id}&tid={post_id}",
            headers=self.headers,
            timeout=10,
        )
        if "您在该板块封禁中" in response.text:
            raise AccountBannedException("Account banned")

    async def try_replying(self, url, headers, payload, times=1):
        """POST ``payload`` to ``url``, retrying up to three attempts in total.

        Returns:
            The ``requests`` response on success, or ``-1`` once all attempts
            failed with an HTTP error or read timeout.

        Raises:
            PostDeletedException: If the response says the page does not exist.
            AccountBannedException: If the error page is caused by a ban.
        """
        try:
            # NOTE: requests is synchronous, so this call blocks the event loop
            # until the server answers or the timeout expires.
            response = requests.post(
                url,
                headers=headers,
                data=payload,
                timeout=10,
            )
            if "页面不存在" in response.text:
                raise PostDeletedException()
            if "出错" in response.text:
                # doesn't actually post something
                self.test_account_banned(sub_id=payload["fid"], post_id=payload["tid"])
                raise requests.exceptions.HTTPError("嗯,出错了。")
            response.raise_for_status()
        except (requests.exceptions.HTTPError, requests.exceptions.ReadTimeout) as e:
            logging.info(e)
            await asyncio.sleep(3)
            if times < 3:
                # Await the retry directly. Returning an un-awaited task would
                # let the caller finish before the retry completes and would
                # hide exceptions raised inside it.
                return await self.try_replying(url, headers, payload, times=times + 1)
            return -1
        else:
            replied_floor = {
                "post_id": payload["tid"],
                # "quotepid" is absent when replying to the original post;
                # such floors are identified by the id "tpc".
                "floor_id": payload.get("quotepid", "tpc"),
            }
            print(f"Success! {replied_floor}")
            logging.info(f"Success! {replied_floor}")
            self.recently_replied_floors.append(replied_floor)
            return response

    async def send_reply(self, metadata):
        """Send the reply described by ``metadata``.

        Returns:
            The response on success, or ``-1`` if the reply was skipped
            (post deleted / sub-forum banned) or could not be sent.
        """
        post_id = metadata["post_id"]
        sub_id = metadata["sub_id"]
        if sub_id in self.banned_sub_ids or post_id in self.deleted_post_ids:
            return -1
        quote_floor_id = metadata["quote_floor_id"]
        content = metadata["content"]
        payload = {
            "atc_content": self.format_newlines(content),
            "step": 2,
            "action": "reply",
            "fid": sub_id,
            "tid": post_id,
            "atc_html": 1,
        }
        if quote_floor_id != "tpc":  # it's not OP who sent it.
            payload["quotepid"] = quote_floor_id
        try:
            return await self.try_replying(self.post_url, self.headers, payload)
        except PostDeletedException:
            logging.error(f"Deleted:{post_id}")
            self.deleted_post_ids.append(post_id)
        except AccountBannedException:
            logging.error(f"Banned:{sub_id}")
            self.banned_sub_ids.append(sub_id)
        return -1

    async def send_all_replies(self, debug=False):
        """Compute all replies and, unless ``debug`` is true, send them.

        The list of replied floors is written even if one of the send tasks
        raises, so replies that did go out are not repeated on the next run.
        """
        replies = self.get_all_replies()
        if debug:
            return
        tasks = [asyncio.create_task(self.send_reply(reply)) for reply in replies]
        try:
            for task in tasks:
                await task
        finally:
            self.mark_replied_floors()
async def send_posts(sub_name, reply_type, debug):
    """Select the queries for ``reply_type`` and run a ``SendPost`` pass.

    Args:
        sub_name: Name of the data sub-directory under ``data/``.
        reply_type: ``"keyword"`` (queries are the keys of
            ``data/<sub_name>/input/keyword_reply.json``) or ``"licking_dog"``
            (a single fixed hashtag query).
        debug: Forwarded to ``SendPost.send_all_replies``.

    Raises:
        ValueError: If ``reply_type`` has no query source defined here.
    """
    if reply_type == "keyword":
        with open(f"data/{sub_name}/input/keyword_reply.json", encoding="utf-8") as f:
            queries = list(json.load(f))
    elif reply_type == "licking_dog":
        queries = ["#舔狗日记#"]
    else:
        # Fail with a clear message instead of an UnboundLocalError on `queries`.
        raise ValueError(f"Unsupported reply_type: {reply_type!r}")
    send_post = SendPost(sub_name, queries=queries, reply_type=reply_type)
    # send_all_replies() computes and writes the replies itself, so no separate
    # get_all_replies() call is needed beforehand.
    await send_post.send_all_replies(debug)
if __name__ == "__main__":
    # Script entry point: the optional first command-line argument selects the
    # data sub-directory; with no argument "bxj" is used.
    sub_name = "bxj" if len(argv) == 1 else argv[1]
    asyncio.run(send_posts(sub_name, reply_type="licking_dog", debug=True))