Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asterisk disconnection and timeout were not correctly handled #66

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
PYTHON?=python
PYTHON?=python3

test:
${PYTHON} -m unittest discover -s "tests/unit" -p "test_*.py"
Expand Down Expand Up @@ -27,4 +27,4 @@ clean:
rm -f $(shell find . -name "*.pyc")
rm -rf htmlcov/ coverage.xml .coverage
rm -rf dist/ build/
rm -rf *.egg-info
rm -rf *.egg-info
36 changes: 30 additions & 6 deletions asterisk/ami/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def __init__(self, address='127.0.0.1', port=5038,
self._buffer_size = buffer_size
self._port = port
self._socket = None
self._socket_alive = False
self._thread = None
self.finished = None
self._ami_version = None
Expand All @@ -83,6 +84,7 @@ def connect(self):
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.settimeout(self._timeout)
self._socket.connect((self._address, self._port))
self._socket_alive = True
self.finished = threading.Event()
self._thread = threading.Thread(target=self.listen)
self._thread.daemon = True
Expand Down Expand Up @@ -112,16 +114,22 @@ def _fire_on_unknown(self, **kwargs):
for listener in self._listeners:
listener.on_unknown(source=self, **kwargs)

def disconnect(self):
self.finished.set()
def _close(self):
try:
self._socket_alive = False
self._futures = {}
self._socket.close()
self._thread.join(self._timeout)
except:
pass

def disconnect(self):
self.finished.set()
self._close()

def login(self, username, secret, callback=None):
if self.finished is None or self.finished.is_set():
if self.finished is None or self.finished.is_set() or not self._socket_alive:
self._close()
self.connect()
return self.send_action(LoginAction(username, secret), callback)

Expand All @@ -140,6 +148,7 @@ def send_action(self, action, callback=None):
self._futures[action_id] = future
self._fire_on_action(action=action)
self.send(action)

return future

def send(self, pack):
Expand Down Expand Up @@ -173,7 +182,11 @@ def _next_pack(self):

def listen(self):
pack_generator = self._next_pack()
asterisk_start = next(pack_generator)
try:
asterisk_start = next(pack_generator)
except:
self._socket_alive = False
raise
match = AMIClient.asterisk_start_regex.match(asterisk_start)
if not match:
raise Exception()
Expand All @@ -185,9 +198,10 @@ def listen(self):
self.fire_recv_pack(pack)
self._fire_on_disconnect(error=None)
except Exception as ex:
self._socket_alive = False
self._fire_on_disconnect(error=ex)

def fire_recv_reponse(self, response):
def fire_recv_response(self, response):
self._fire_on_response(response=response)
if response.status.lower() == 'goodbye':
self.finished.set()
Expand All @@ -207,7 +221,7 @@ def fire_recv_event(self, event):
def fire_recv_pack(self, pack):
if Response.match(pack):
response = Response.read(pack)
self.fire_recv_reponse(response)
self.fire_recv_response(response)
return
if Event.match(pack):
event = Event.read(pack)
Expand Down Expand Up @@ -264,13 +278,16 @@ def __init__(self, ami_client, delay=0.5,
self._login_args = None
self._login = None
self._logoff = None
self._disconnect = None
self._prepare_client()

def _prepare_client(self):
self._login = self._ami_client.login
self._logoff = self._ami_client.logoff
self._disconnect = self._ami_client.disconnect
self._ami_client.login = self._login_wrapper
self._ami_client.logoff = self._logoff_wrapper
self._ami_client.disconnect = self._disconnect_wrapper

def _rollback_client(self):
self._ami_client.login = self._login
Expand All @@ -295,6 +312,12 @@ def _logoff_wrapper(self, *args, **kwargs):
self._rollback_client()
return self._logoff(*args, **kwargs)

def _disconnect_wrapper(self, *args, **kwargs):
if self.finished:
self.finished.set()
self._rollback_client()
return self._disconnect(*args, **kwargs)

def ping(self):
try:
f = self._ami_client.send_action(Action('Ping'))
Expand All @@ -303,6 +326,7 @@ def ping(self):
return True
self.on_disconnect(self._ami_client, response)
except Exception as ex:
self._socket_alive = False
self.on_disconnect(self._ami_client, ex)
return False

Expand Down
2 changes: 1 addition & 1 deletion asterisk/ami/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def set_response(self, response):
finally:
self._lock.acquire()
self._response = response
self._lock.notifyAll()
self._lock.notify_all()
self._lock.release()

def get_response(self):
Expand Down
2 changes: 1 addition & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
rx==1.6
rx==4.0.4
106 changes: 103 additions & 3 deletions tests/unit/mock_ami.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
import re
import socket
import threading
import time

from rx import Observable
from asterisk.ami.utils import unicode
from asterisk.ami import Action

import rx


class AMIMock(object):
thread = None
encoding = 'utf-8'
asterisk_line_regex = re.compile(b'\r\n', re.IGNORECASE | re.MULTILINE)
asterisk_pack_regex = re.compile(b'\r\n\r\n', re.IGNORECASE | re.MULTILINE)

def __init__(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
Expand All @@ -17,6 +25,33 @@ def start(self):
self.thread.start()
return self.socket.getsockname()

def parse_action(self, lines):
try:
kv = [l.decode(self.encoding).split(': ', 1) for l in lines]
(key, name) = kv.pop(0)
if key.lower() != 'action':
raise Exception()
variables = dict([v.split('=', 1) for (k ,v) in kv if k.lower() == 'variable'])
keys = dict([(k, v) for (k, v) in kv if k.lower() != 'variable'])
return Action(name, keys=keys, variables=variables)
except:
pass

def send_response(self, conn, status, **keys):
lines = [f'Response: {status.capitalize()}'] + [f'{k}: {v}' for k, v in keys.items()]
pack = '\r\n'.join(lines)
conn.send(bytearray(unicode(pack) + '\r\n\r\n', self.encoding))
return pack

def handle_action(self, conn, action):
pass

def handle_request(self, conn, data):
lines = self.asterisk_line_regex.split(data)
action = self.parse_action(lines)
if action:
self.handle_action(conn, action)

def run(self):
self.socket.listen(5)

Expand All @@ -28,11 +63,76 @@ def clients_iter():
pass

def send_start(c):
return c[0].send(b'Asterisk Call Manager/6.6.6\r\n\r\n')
try:
print(f"Starting connection: {c[0]}")
c[0].send(b'Asterisk Call Manager/6.6.6\r\n')
data = b''
while True:
recv = c[0].recv(1024)
if recv == b'':
break
data += recv
while self.asterisk_pack_regex.search(data):
(pack, data) = self.asterisk_pack_regex.split(data, 1)
self.handle_request(c[0], pack)
except:
pass
finally:
c[0].close()

Observable.from_(clients_iter()) \
rx.from_(clients_iter()) \
.subscribe(send_start)

def stop(self):
self.socket.close()
self.thread.join(1)


class AMIPingMock(AMIMock):
def __init__(self):
super().__init__()
self.ping_counter = 0

def send_response(self, conn, status, **keys):
response = super().send_response(conn, status, **keys)
print(f'Sending Response\r\n{response}\r\n')


def handle_action(self, conn, action):
print(f'Received Action\r\n{action}')

if action.name == 'Login':
self.send_response(conn, 'Success', ActionID=action.ActionID, Message='Authentication accepted')


class AMITimeoutMock(AMIPingMock):
"""
Simulate long server response.
"""
def handle_action(self, conn, action):
super().handle_action(conn, action)

if action.name == 'Ping':
self.ping_counter += 1
if self.ping_counter == 2:
print('Forcing connection timeout')
time.sleep(5.0)
self.send_response(conn, 'Success', ActionID=action.ActionID)



class AMIDisconnectionMock(AMIPingMock):
"""
Simulate server disconnection.
"""
def handle_action(self, conn, action):
super().handle_action(conn, action)

if action.name == 'Ping':
self.ping_counter += 1
if self.ping_counter == 2:
print('Forcing disconnection')
conn.close()
else:
self.send_response(conn, 'Success', ActionID=action.ActionID)

2 changes: 1 addition & 1 deletion tests/unit/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import unittest

from asterisk.ami import AMIClient
from mock_ami import AMIMock
from tests.unit.mock_ami import AMIMock
from tests.settings import login


Expand Down
51 changes: 51 additions & 0 deletions tests/unit/test_server_disconnection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import time
import unittest

from tests.settings import login
from tests.unit.mock_ami import AMIDisconnectionMock
from asterisk import ami


class ServerTimeoutTest(unittest.TestCase):
def setUp(self):
self.disconnected = 0
self.reconnected = 0
self.server = AMIDisconnectionMock()
self.server_address = self.server.start()
time.sleep(0.2)

def tearDown(self):
self.server.stop()

def on_disconnect(self, ami, reason):
print(f"Disconnected: {reason}")
self.disconnected += 1

def on_reconnect(self, ami, reason):
print(f"Reconnected: {reason}")
self.reconnected += 1

def test_timeout(self):
ami_client = ami.AMIClient(**dict(zip(('address', 'port'), self.server_address), timeout=1.0))
ami.AutoReconnect(
ami_client,
delay=0.5,
on_reconnect=self.on_reconnect,
on_disconnect=self.on_disconnect
)
ami_client.login(**login)
try:
time.sleep(10.0)
self.assertGreaterEqual(self.disconnected, 1)
self.assertGreaterEqual(self.reconnected, 1)
self.disconnected = 0
self.reconnected = 0
time.sleep(5.0)
self.assertEqual(self.disconnected, 0)
self.assertEqual(self.reconnected, 0)

future = ami_client.send_action(ami.Action('Ping'))
self.assertIsInstance(future.response, ami.Response)

finally:
ami_client.disconnect()
51 changes: 51 additions & 0 deletions tests/unit/test_server_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import time
import unittest

from tests.settings import login
from tests.unit.mock_ami import AMITimeoutMock
from asterisk import ami


class ServerTimeoutTest(unittest.TestCase):
def setUp(self):
self.disconnected = 0
self.reconnected = 0
self.server = AMITimeoutMock()
self.server_address = self.server.start()
time.sleep(0.2)

def tearDown(self):
self.server.stop()

def on_disconnect(self, ami, reason):
print(f"Disconnected: {reason}")
self.disconnected += 1

def on_reconnect(self, ami, reason):
print(f"Reconnected: {reason}")
self.reconnected += 1

def test_timeout(self):
ami_client = ami.AMIClient(**dict(zip(('address', 'port'), self.server_address), timeout=1.0))
ami.AutoReconnect(
ami_client,
delay=0.5,
on_reconnect=self.on_reconnect,
on_disconnect=self.on_disconnect
)
ami_client.login(**login)
try:
time.sleep(10.0)
self.assertGreaterEqual(self.disconnected, 1)
self.assertGreaterEqual(self.reconnected, 1)
self.disconnected = 0
self.reconnected = 0
time.sleep(5.0)
self.assertEqual(self.disconnected, 0)
self.assertEqual(self.reconnected, 0)

future = ami_client.send_action(ami.Action('Ping'))
self.assertIsInstance(future.response, ami.Response)

finally:
ami_client.disconnect()