This repository has been archived by the owner on Apr 16, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 64
/
Copy pathnopm.py
163 lines (140 loc) · 7.73 KB
/
nopm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# -*- coding: future_fstrings -*-
# Friendly Telegram (telegram userbot)
# Copyright (C) 2018-2019 The Authors
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from .. import loader, utils
import logging
from telethon import functions, types
logger = logging.getLogger(__name__)
def register(cb):
    """Create the Anti PM module instance and pass it to the callback *cb*."""
    module = AntiPMMod()
    cb(module)
@loader.tds
class AntiPMMod(loader.Module):
    """Prevents people sending you unsolicited private messages"""
    # NOTE(review): the class docstring and the docstrings of the *cmd methods
    # are kept verbatim; they look user-facing (the class is wrapped by
    # loader.tds) - confirm before rewording any of them.
    strings = {"name": "Anti PM",
               "limit_cfg_doc": "Max number of PMs before user is blocked, or None",
               "who_to_block": "<b>Specify whom to block</b>",
               "blocked": ("<b>I don't want any PM from</b> <a href='tg://user?id={}'>you</a>, "
                           "<b>so you have been blocked!</b>"),
               "who_to_unblock": "<b>Specify whom to unblock</b>",
               "unblocked": ("<b>Alright fine! I'll forgive them this time. PM has been unblocked for </b> "
                             "<a href='tg://user?id={}'>this user</a>"),
               "who_to_allow": "<b>Who shall I allow to PM?</b>",
               "allowed": "<b>I have allowed</b> <a href='tg://user?id={}'>you</a> <b>to PM now</b>",
               "who_to_report": "<b>Who shall I report?</b>",
               "reported": "<b>You just got reported spam!</b>",
               "who_to_deny": "<b>Who shall I deny to PM?</b>",
               "denied": ("<b>I have denied</b> <a href='tg://user?id={}'>you</a> "
                          "<b>of your PM permissions.</b>"),
               "notif_off": "<b>Notifications from denied PMs are silenced.</b>",
               "notif_on": "<b>Notifications from denied PMs are now activated.</b>",
               "go_away": ("Hey there! Unfortunately, I don't accept private messages from "
                           "strangers.\n\nPlease contact me in a group, or <b>wait</b> "
                           "for me to approve you."),
               "triggered": ("Hey! I don't appreciate you barging into my PM like this! "
                             "Did you even ask me for approving you to PM? No? Goodbye then."
                             "\n\nPS: you've been reported as spam already.")}

    def __init__(self):
        """Set up the module config and the in-memory state used by watcher()."""
        # PM_BLOCK_LIMIT defaults to None; watcher() only enforces it when it is an int.
        self.config = loader.ModuleConfig("PM_BLOCK_LIMIT", None, lambda: self.strings["limit_cfg_doc"])
        # Filled in by client_ready() with the result of client.get_me(True).
        self._me = None
        # Sender ids whose next incoming PM will be skipped by watcher().
        self._ratelimit = []

    def config_complete(self):
        """Publish the module name taken from the strings table."""
        self.name = self.strings["name"]

    async def client_ready(self, client, db):
        """Remember the database and client handles and look up our own identity."""
        self._db = db
        self._client = client
        self._me = await client.get_me(True)

    def _set_allowed(self, user, allowed):
        """Add *user* to the stored "allow" list, or remove it when *allowed* is false.

        The list is read from the database, deduplicated through a set and
        written back as a list, so its element order is not preserved.
        """
        permitted = set(self._db.get(__name__, "allow", []))
        if allowed:
            permitted.add(user)
        else:
            permitted.discard(user)
        self._db.set(__name__, "allow", list(permitted))

    async def blockcmd(self, message):
        """Block this user to PM without being warned"""
        user = await utils.get_target(message)
        if not user:
            await utils.answer(message, self.strings["who_to_block"])
            return
        await message.client(functions.contacts.BlockRequest(user))
        await utils.answer(message, self.strings["blocked"].format(user))

    async def unblockcmd(self, message):
        """Unblock this user to PM"""
        user = await utils.get_target(message)
        if not user:
            await utils.answer(message, self.strings["who_to_unblock"])
            return
        await message.client(functions.contacts.UnblockRequest(user))
        await utils.answer(message, self.strings["unblocked"].format(user))

    async def allowcmd(self, message):
        """Allow this user to PM"""
        user = await utils.get_target(message)
        if not user:
            await utils.answer(message, self.strings["who_to_allow"])
            return
        self._set_allowed(user, True)
        await utils.answer(message, self.strings["allowed"].format(user))

    async def reportcmd(self, message):
        """Report the user spam. Use only in PM"""
        user = await utils.get_target(message)
        if not user:
            await utils.answer(message, self.strings["who_to_report"])
            return
        # A reported user also loses any PM permission granted earlier.
        self._set_allowed(user, False)
        if message.is_reply and isinstance(message.to_id, types.PeerChannel):
            # Report the message
            await message.client(functions.messages.ReportRequest(peer=message.chat_id,
                                                                  id=[message.reply_to_msg_id],
                                                                  reason=types.InputReportReasonSpam()))
        else:
            # Otherwise report the whole peer the command was sent to.
            await message.client(functions.messages.ReportSpamRequest(peer=message.to_id))
        await utils.answer(message, self.strings["reported"])

    async def denycmd(self, message):
        """Deny this user to PM without being warned"""
        user = await utils.get_target(message)
        if not user:
            await utils.answer(message, self.strings["who_to_deny"])
            return
        self._set_allowed(user, False)
        await utils.answer(message, self.strings["denied"].format(user))

    async def notifoffcmd(self, message):
        """Disable the notifications from denied PMs"""
        # "notif" == True makes watcher() send a read acknowledge for denied PMs.
        self._db.set(__name__, "notif", True)
        await utils.answer(message, self.strings["notif_off"])

    async def notifoncmd(self, message):
        """Enable the notifications from denied PMs"""
        self._db.set(__name__, "notif", False)
        await utils.answer(message, self.strings["notif_on"])

    async def watcher(self, message):
        """Answer, count and possibly block private messages from non-allowed users.

        Only messages whose ``to_id.user_id`` equals our own user id are
        handled. Senders that are ourselves, bots, verified accounts or on the
        allow list are left alone. Everyone else receives the "go_away" text;
        when PM_BLOCK_LIMIT is an int, a per-sender counter kept in the
        database is incremented, and once it has reached the limit the sender
        gets the "triggered" text, is blocked and reported as spam, and the
        counter is dropped.
        """
        if getattr(message.to_id, "user_id", None) != self._me.user_id:
            return
        logger.debug("pm'd!")
        sender = message.from_id

        # Toggle: a sender already in the list is removed and this message is
        # skipped, so only every other PM from the same sender is processed.
        # NOTE(review): presumably a crude rate limit - confirm the intent.
        if sender in self._ratelimit:
            self._ratelimit.remove(sender)
            return
        self._ratelimit.append(sender)

        user = await utils.get_user(message)
        if user.is_self or user.bot or user.verified:
            logger.debug("User is self, bot or verified.")
            return
        if self.get_allowed(sender):
            logger.debug("Authorised pm detected")
            return

        await utils.answer(message, self.strings["go_away"])

        block_limit = self.config["PM_BLOCK_LIMIT"]
        if isinstance(block_limit, int):
            limit = self._db.get(__name__, "limit", {})
            seen = limit.get(sender, 0)
            if seen >= block_limit:
                await utils.answer(message, self.strings["triggered"])
                await message.client(functions.contacts.BlockRequest(sender))
                await message.client(functions.messages.ReportSpamRequest(peer=sender))
                # pop() instead of del: with a limit of 0 (or below) the sender
                # is blocked on the first message, before any counter exists,
                # and del would raise KeyError after the block was already sent.
                limit.pop(sender, None)
                self._db.set(__name__, "limit", limit)
            else:
                self._db.set(__name__, "limit", {**limit, sender: seen + 1})

        # Silenced notifications: mark the denied chat as read.
        if self._db.get(__name__, "notif", False):
            await message.client.send_read_acknowledge(message.chat_id)

    def get_allowed(self, id):
        """Return True when *id* is present in the stored "allow" list."""
        return id in self._db.get(__name__, "allow", [])