-
Notifications
You must be signed in to change notification settings - Fork 38
/
Copy pathnode.py
187 lines (160 loc) · 5.87 KB
/
node.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import asyncio
import requests
import json
import time
import uuid
from loguru import logger
# Constants
NP_TOKEN = "WRITE_YOUR_NP_TOKEN_HERE"
PING_INTERVAL = 30 # seconds
RETRIES = 60 # Global retry counter for ping failures
DOMAIN_API = {
"SESSION": "https://api.nodepay.ai/api/auth/session",
"PING": "https://nw2.nodepay.ai/api/network/ping"
}
CONNECTION_STATES = {
"CONNECTED": 1,
"DISCONNECTED": 2,
"NONE_CONNECTION": 3
}
status_connect = CONNECTION_STATES["NONE_CONNECTION"]
token_info = NP_TOKEN
browser_id = None
account_info = {}
def uuidv4():
return str(uuid.uuid4())
def valid_resp(resp):
if not resp or "code" not in resp or resp["code"] < 0:
raise ValueError("Invalid response")
return resp
async def render_profile_info(proxy):
global browser_id, token_info, account_info
try:
np_session_info = load_session_info(proxy)
if not np_session_info:
response = call_api(DOMAIN_API["SESSION"], {}, proxy)
valid_resp(response)
account_info = response["data"]
if account_info.get("uid"):
save_session_info(proxy, account_info)
await start_ping(proxy)
else:
handle_logout(proxy)
else:
account_info = np_session_info
await start_ping(proxy)
except Exception as e:
logger.error(f"Error in render_profile_info for proxy {proxy}: {e}")
error_message = str(e)
if any(phrase in error_message for phrase in [
"sent 1011 (internal error) keepalive ping timeout; no close frame received",
"500 Internal Server Error"
]):
logger.info(f"Removing error proxy from the list: {proxy}")
remove_proxy_from_list(proxy)
return None
else:
logger.error(f"Connection error: {e}")
return proxy
def call_api(url, data, proxy):
headers = {
"Authorization": f"Bearer {token_info}",
"Content-Type": "application/json"
}
try:
response = requests.post(url, json=data, headers=headers, proxies={"http": proxy, "https": proxy}, timeout=10)
response.raise_for_status()
except requests.RequestException as e:
logger.error(f"Error during API call: {e}")
raise ValueError(f"Failed API call to {url}")
return valid_resp(response.json())
async def start_ping(proxy):
try:
await ping(proxy)
while True:
await asyncio.sleep(PING_INTERVAL)
await ping(proxy)
except asyncio.CancelledError:
logger.info(f"Ping task for proxy {proxy} was cancelled")
except Exception as e:
logger.error(f"Error in start_ping for proxy {proxy}: {e}")
async def ping(proxy):
global RETRIES, status_connect
try:
data = {
"id": account_info.get("uid"),
"browser_id": browser_id,
"timestamp": int(time.time())
}
response = call_api(DOMAIN_API["PING"], data, proxy)
if response["code"] == 0:
logger.info(f"Ping successful via proxy {proxy}: {response}")
RETRIES = 0
status_connect = CONNECTION_STATES["CONNECTED"]
else:
handle_ping_fail(proxy, response)
except Exception as e:
logger.error(f"Ping failed via proxy {proxy}: {e}")
handle_ping_fail(proxy, None)
def handle_ping_fail(proxy, response):
global RETRIES, status_connect
RETRIES += 1
if response and response.get("code") == 403:
handle_logout(proxy)
elif RETRIES < 2:
status_connect = CONNECTION_STATES["DISCONNECTED"]
else:
status_connect = CONNECTION_STATES["DISCONNECTED"]
def handle_logout(proxy):
global token_info, status_connect, account_info
token_info = None
status_connect = CONNECTION_STATES["NONE_CONNECTION"]
account_info = {}
save_status(proxy, None)
logger.info(f"Logged out and cleared session info for proxy {proxy}")
def load_proxies(proxy_file):
try:
with open(proxy_file, 'r') as file:
proxies = file.read().splitlines()
return proxies
except Exception as e:
logger.error(f"Failed to load proxies: {e}")
raise SystemExit("Exiting due to failure in loading proxies")
def save_status(proxy, status):
pass
def save_session_info(proxy, data):
pass
def load_session_info(proxy):
return {}
def is_valid_proxy(proxy):
return True
def remove_proxy_from_list(proxy):
pass
async def main():
with open('proxy.txt', 'r') as f:
all_proxies = f.read().splitlines()
active_proxies = [proxy for proxy in all_proxies[:100] if is_valid_proxy(proxy)] # By default 100 proxies will be run at once
tasks = {asyncio.create_task(render_profile_info(proxy)): proxy for proxy in active_proxies}
while True:
done, pending = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
for task in done:
failed_proxy = tasks[task]
if task.result() is None:
logger.info(f"Removing and replacing failed proxy: {failed_proxy}")
active_proxies.remove(failed_proxy)
if all_proxies:
new_proxy = all_proxies.pop(0)
if is_valid_proxy(new_proxy):
active_proxies.append(new_proxy)
new_task = asyncio.create_task(render_profile_info(new_proxy))
tasks[new_task] = new_proxy
tasks.pop(task)
for proxy in set(active_proxies) - set(tasks.values()):
new_task = asyncio.create_task(render_profile_info(proxy))
tasks[new_task] = proxy
await asyncio.sleep(3) # Prevent tight loop in case of rapid failures
if __name__ == '__main__':
try:
asyncio.run(main())
except (KeyboardInterrupt, SystemExit):
logger.info("Program terminated by user.")