forked from waggle-sensor/beehive-server
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutilitiesprocess.py
91 lines (75 loc) · 3.36 KB
/
utilitiesprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# utilitiesprocess.py
import sys
sys.path.append("..")
sys.path.append("/usr/lib/waggle/")
from multiprocessing import Process, Manager
from config import *
import pika
from waggle_protocol.protocol.PacketHandler import *
import logging
#logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.CRITICAL)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Let DEBUG and more severe records through this logger; where they end up
# depends on handlers configured elsewhere (none are set up in this file).
logger.setLevel(logging.DEBUG)
class UtilProcess(Process):
    """
    Worker process that answers utility requests sent by nodes.

    Requests are consumed from the "util" queue. Two request types are
    answered, each by publishing a reply to the "waggle_in" exchange with
    routing key "in":

      * ping request (major type 'p', minor type 'r') -> payload "Pong!"
      * time request (major type 't', minor type 'r') -> payload str(time.time())

    Every delivered message is acknowledged, whether or not it was answered.
    """

    def __init__(self):
        """
        Connect to RabbitMQ and register the consumer on the "util" queue.

        If the connection cannot be established the error is logged and
        sys.exit(1) is called.
        """
        super(UtilProcess, self).__init__()

        # Make the connection to RabbitMQ
        try:
            self.connection = pika.BlockingConnection(pika_params)
        except Exception as e:
            logger.error("Could not connect to RabbitMQ server \"%s\": %s", pika_params.host, e)
            sys.exit(1)

        logger.info("Connected to RabbitMQ server \"%s\"", pika_params.host)
        self.channel = self.connection.channel()
        # Hand this consumer at most one unacknowledged message at a time.
        self.channel.basic_qos(prefetch_count=1)

        # Declare the relevant incoming queue and exchange
        self.channel.queue_declare("util")
        self.channel.exchange_declare("waggle_in")
        self.channel.basic_consume(self._callback, queue='util')

    def _callback(self, ch, method, props, body):
        """
        Called when a packet containing a utility request is received.

        ch     -- channel the message arrived on; used to acknowledge it
        method -- delivery information; its delivery_tag is acknowledged
        props  -- message properties (unused)
        body   -- the full packet: header, message and CRC
        """
        # `time` is not imported explicitly at the top of this file; import it
        # here so the time reply does not depend on a star import leaking it.
        import time

        # body is the complete packet, so it could technically be thrown back
        # into waggle_in and get routed again.
        header = get_header(body)
        logger.debug("message from %d for %d", header['s_uniqid'], header['r_uniqid'])

        if header['msg_mj_type'] == ord('p') and header["msg_mi_type"] == ord('r'):
            # It's a ping request
            self._send_response(header, 'p', "Pong!")
        elif header['msg_mj_type'] == ord('t') and header["msg_mi_type"] == ord('r'):
            # It's a time request: stuff the current time in a packet
            self._send_response(header, 't', str(time.time()))

        ch.basic_ack(delivery_tag=method.delivery_tag)

    def _send_response(self, request_header, major_type, payload):
        """
        Pack `payload` as an answer to `request_header` and send it to the router.

        request_header -- header of the request being answered; its sender id
                          and session become the reply's recipient id and
                          response session
        major_type     -- one-character major message type of the reply
        payload        -- reply message body handed to pack()
        """
        resp_header = {
            "msg_mj_type": ord(major_type),
            "msg_mi_type": ord('a'),
            "r_uniqid": request_header["s_uniqid"],
            "resp_session": request_header["snd_session"],
        }
        # pack() yields packets rather than returning them, so iterate over
        # the generator and publish every packet it produces.
        for packet in pack(resp_header, payload):
            self.channel.basic_publish(exchange='waggle_in', routing_key="in", body=packet)

    def run(self):
        """Process entry point: consume from the channel until stopped."""
        self.channel.start_consuming()

    def join(self, timeout=None):
        """
        Stop the process and close the RabbitMQ connection.

        timeout -- optional number of seconds to wait for the terminated
                   process to exit, as in multiprocessing.Process.join

        A process that was never started is not terminated or joined; the
        connection is closed in every case.
        """
        try:
            # pid is None until start() has been called; terminate()/join()
            # are only valid on a started process.
            if self.pid is not None:
                super(UtilProcess, self).terminate()
                super(UtilProcess, self).join(timeout)
        finally:
            self.connection.close()