-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathipfs_pubsub.py
157 lines (135 loc) · 6 KB
/
ipfs_pubsub.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import json
import logging
from threading import Thread
from time import sleep
from time import time
from urllib import request
from urllib.parse import quote

import requests
# Paths of the pubsub commands, keyed by command name. PubSub appends one
# of these to "http://<host>:<port>" to build each request URL.
api_urls = dict(
    ls="/api/v0/pubsub/ls",
    peers="/api/v0/pubsub/peers",
    pub="/api/v0/pubsub/pub",
    sub="/api/v0/pubsub/sub",
)
class PubSub:
    """Small client for the pubsub commands of an IPFS node's HTTP API.

    Requests go to ``http://<host>:<port>`` plus one of the paths in
    ``api_urls``.  Each subscription keeps a streaming HTTP response open
    and reads it from its own background thread; received messages are
    queued per topic and handed to the callbacks registered for the topic.
    """

    def __init__(self, host="localhost", port=5001, update_interval=0.5):
        """Store the connection settings; no request is made here.

        update_interval is the pause, in seconds, a subscription thread
        takes each time its line iterator runs dry.
        """
        self.host = host
        self.port = port
        # topic -> {"socket": [response, line_iterator],
        #           "thread": Thread reading the iterator,
        #           "msg": list of received message dicts (oldest first),
        #           "on_receive": list of callbacks}
        self.subscriptions = {}
        self.update_thread = None
        self.sub_update_interval = update_interval

    def _endpoint(self, name):
        """Return the absolute URL of the pubsub command called name."""
        return "http://{}:{}{}".format(self.host, self.port, api_urls[name])

    def topic_ls(self):
        """List topics by name.

        Returns the "Strings" field of the node's JSON reply.
        """
        with request.urlopen(self._endpoint("ls")) as response:
            return json.loads(response.read().decode("utf-8"))["Strings"]

    def topic_peers(self, topic=""):
        """List the peers subscribed to topic.

        Returns the "Strings" field of the node's JSON reply.
        """
        # Percent-encode the topic so characters such as "&", "+", "#" or
        # spaces cannot alter or invalidate the query string.
        url = "{}?arg={}".format(self._endpoint("peers"),
                                 quote(str(topic), safe=""))
        with request.urlopen(url) as response:
            return json.loads(response.read().decode("utf-8"))["Strings"]

    def topic_peer_number(self, topic):
        """Return how many peers topic_peers reports (0 if none)."""
        peers = self.topic_peers(topic)
        return len(peers) if peers else 0

    def topic_pub(self, topic, payload):
        """Publish payload (a str) to topic.

        The payload is sent both as the second ``arg`` query parameter and
        as the UTF-8 encoded request body.  Returns True when the node
        answers with status 200, False for any other status that urlopen
        does not raise on.
        """
        url = "{}?arg={}&arg={}".format(self._endpoint("pub"),
                                        quote(str(topic), safe=""),
                                        quote(str(payload), safe=""))
        with request.urlopen(url, payload.encode("utf-8")) as response:
            return response.getcode() == 200

    @staticmethod
    def _parse_message(decoded_msg):
        """Turn one decoded stream line into a message dict, or None.

        Lines equal to "{}" or without the text "data" are ignored.  The
        sender is the text between the first and second ":" of the first
        comma-separated field, the content the same part of the second
        field; nothing is unquoted or decoded (the original author's note
        says the content is base64).  The timestamp is the local receive
        time.  A line that lacks those fields is logged and skipped
        instead of raising.
        """
        if decoded_msg == "{}" or "data" not in decoded_msg:
            return None
        fields = decoded_msg.split(",")
        try:
            return {"content": fields[1].split(":")[1],
                    "sender": fields[0].split(":")[1],
                    "timestamp": time()}
        except IndexError:
            logging.warning("Skipping malformed pubsub message: %s",
                            decoded_msg)
            return None

    def _subscription_fetch_data(self, topic):
        """Thread body: read topic's stream forever and store its messages.

        Every parsed message is passed to each callback registered for the
        topic and then appended to the topic's queue.
        """
        while True:
            # subscriptions[topic]["socket"][1] is the line iterator of the
            # streaming response.
            for raw in self.subscriptions[topic]["socket"][1]:
                if not raw:
                    continue
                data = self._parse_message(raw.decode("utf-8"))
                if data is None:
                    continue
                logging.debug("Received data: %s", data)
                for callback in self.subscriptions[topic]["on_receive"]:
                    # A failing callback must neither drop the message nor
                    # kill the thread that serves the whole subscription.
                    try:
                        callback(data)
                    except Exception:
                        logging.exception(
                            "Callback %r failed for topic %s",
                            callback, topic)
                self.subscriptions[topic]["msg"].append(data)
            # Reached only once the iterator is exhausted; the pause keeps
            # this loop from spinning at full speed.
            sleep(self.sub_update_interval)

    def topic_data(self, topic):
        """Return the live list of queued messages for topic.

        Raises KeyError if topic is not subscribed.
        """
        return self.subscriptions[topic]["msg"]

    def topic_sub(self, topic, discover_peers=False):
        """Subscribe to topic and start the thread that reads its stream.

        Returns True if the subscription was set up or already existed,
        False if setting it up raised (the error is logged).
        """
        if topic in self.subscriptions:
            return True
        url = "{}?arg={}&discover={}".format(self._endpoint("sub"),
                                             quote(str(topic), safe=""),
                                             discover_peers)
        try:
            socket = requests.get(url, stream=True)
            iterator = socket.iter_lines()
            update_thread = Thread(target=self._subscription_fetch_data,
                                   args=(topic,))
            # on_receive is the list of callbacks for this topic
            self.subscriptions[topic] = {"socket": [socket, iterator],
                                         "thread": update_thread,
                                         "msg": [],
                                         "on_receive": []}
            update_thread.start()
        except Exception as exc:
            logging.error(exc)
            return False
        return True

    def topic_set_callback(self, topic, func):
        """Register func to be called for every message received on topic.

        func is called as func(data), where data is a dict with the keys
        ['content', 'sender', 'timestamp'].  Raises KeyError if topic is
        not subscribed.
        """
        self.subscriptions[topic]["on_receive"].append(func)

    def topic_pop_message(self, topic):
        """Remove and return the oldest queued message of topic.

        Returns None when the queue is empty; raises KeyError if topic is
        not subscribed.
        """
        queue = self.subscriptions[topic]["msg"]
        return queue.pop(0) if queue else None

    def topic_pop_messages_from_sender(self, topic, sender):
        """Remove every queued message of topic that was sent by sender.

        Returns the set of their "content" values (so duplicates collapse
        and order is not kept); the set is empty when nothing matches.
        Raises KeyError if topic is not subscribed.
        """
        queue = self.subscriptions[topic]["msg"]
        matching = [msg for msg in queue if msg["sender"] == sender]
        for msg in matching:
            # Remove one by one rather than rebuilding the list, so that a
            # message appended meanwhile by the fetch thread is not lost.
            try:
                queue.remove(msg)
            except ValueError:
                # Already taken out by another consumer; nothing to do.
                pass
        return {msg["content"] for msg in matching}

    def topic_message_num(self, topic):
        """Return the number of messages in the queue of topic."""
        return len(self.subscriptions[topic]["msg"])
# This file is meant to be imported as a library; running it directly does
# nothing except log the error below.
if __name__ == "__main__":
    logging.error("Don't execute this module")