Commit 4f5d18b4 authored by Philipp Hörist's avatar Philipp Hörist
Browse files

Fix tests

parent 78a8dd4c
import gi
gi.require_version('Soup', '2.4')
from .protocol import *
__version__ = "0.9.93"
import logging
# Prevents logging output in tests
log = logging.getLogger('nbxmpp')
log.setLevel(logging.CRITICAL)
'''
Tests for smacks.py Stream Management
'''
import unittest
from unittest.mock import Mock
from nbxmpp import dispatcher
from nbxmpp import protocol
from nbxmpp import smacks
class TestDispatcherNB(unittest.TestCase):
'''
Test class for NonBlocking dispatcher. Tested dispatcher will be plugged
into a mock client
'''
def setUp(self):
self.dispatcher = dispatcher.XMPPDispatcher()
# Setup mock client
self.client = Mock()
self.client.defaultNamespace = protocol.NS_CLIENT
self.client.Connection = Mock() # mock transport
self.con = self.client.Connection
self.con.sm = smacks.Smacks()
def tearDown(self):
# Unplug if needed
if hasattr(self.dispatcher, '_owner'):
self.dispatcher.PlugOut()
def _simulate_connect(self):
self.dispatcher.PlugIn(self.client) # client is owner
self.con.sm.PlugIn(self.client)
# Simulate that we have established a connection
self.dispatcher.StreamInit()
self.dispatcher.ProcessNonBlocking("<stream:stream "
"xmlns:stream='http://etherx.jabber.org/streams' "
"xmlns='jabber:client'>")
self.dispatcher.ProcessNonBlocking("<stream:features> "
"<sm xmlns='urn:xmpp:sm:3'> <optional/> </sm> </stream:features>")
self.con.sm.negociate()
self.dispatcher.ProcessNonBlocking("<enabled xmlns='urn:xmpp:sm:3' "
"id='some-long-sm-id' resume='true'/>")
assert(self.con.sm.enabled)
def _simulate_resume(self):
self.con.sm.resume_request()
# Resuming acknowledging 5 stanzas
self.dispatcher.ProcessNonBlocking("<resumed xmlns='urn:xmpp:sm:3' "
"id='some-long-sm-id' h='5'/>")
assert(self.con.sm.resuming)
def _send(self, send, r, stanza):
for i in range(r):
send(stanza)
def test_messages(self):
message = '<message><body>Helloo </body></message>'
iq = '''<iq from='proxy.jabber.ru' to='j.xxxxxxxx.org/Gajim' type='error' id='18'>
<query xmlns='http://jabber.org/protocol/bytestreams'/>
<error code='403' type='auth'>
<forbidden xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/>
</error>
</iq>'''
presence = '''<presence from='xxxxxxxxx.com/Talk.v1044194B1E2' to='j.xxxxxxxx.org'>
<priority>24</priority>
<c node="http://www.google.com/xmpp/client/caps" ver="1.0.0.104" ext="share-v1 voice-v1" xmlns="http://jabber.org/protocol/caps"/>
<x stamp="20110614T23:17:51" xmlns="jabber:x:delay"/>
<status>In love Kakashi Sensei :P</status>
<x xmlns="vcard-temp:x:update">
<photo>db4b7c52e39ba28562c74542d5988d47f09108a3</photo>
</x>
</presence> '''
self._simulate_connect()
uqueue = self.con.sm.uqueue
self.assertEqual(self.con.sm.out_h, 0)
self.assertEqual(self.con.sm.in_h, 0)
# The server sends 10 stanzas
self._send(self.dispatcher.ProcessNonBlocking, 5, message)
self._send(self.dispatcher.ProcessNonBlocking, 4, iq)
self._send(self.dispatcher.ProcessNonBlocking, 1, presence)
# The client has recieved 10 stanzas and sent none
self.assertEqual(self.con.sm.in_h, 10)
self.assertEqual(self.con.sm.out_h, 0)
m = protocol.Message()
# The client sends 10 stanzas
for i in range(10):
m = protocol.Message(body=str(i))
self.dispatcher.send(m)
# Client sends 10 stanzas and put them in the queue
self.assertEqual(self.con.sm.out_h, 10)
self.assertEqual(len(uqueue), 10)
# The server acknowledges that it recieved 5 stanzas
self.dispatcher.ProcessNonBlocking("<a xmlns='urn:xmpp:sm:3' h='5'/>")
# 5 stanzas are removed from the queue, only 5 stanzas are left
self.assertEqual(len(uqueue), 5)
# Check for the right order of stanzas in the queue
l = ['5', '6', '7', '8', '9']
for i in uqueue:
self.assertEqual(i.getBody(), l[0])
l.pop(0)
def test_resumption(self):
self._simulate_connect()
m = protocol.Message()
# The client sends 5 stanzas
for i in range(5):
m = protocol.Message(body=str(i))
self.dispatcher.send(m)
self._simulate_resume()
# No stanzas left
self.assertEqual(len(self.con.sm.uqueue), 0)
if __name__ == '__main__':
unittest.main()
......@@ -3,24 +3,19 @@ from unittest.mock import Mock
from test.lib.const import STREAM_START
from nbxmpp import dispatcher
from nbxmpp.dispatcher import StanzaDispatcher
from nbxmpp.protocol import NS_CLIENT
from nbxmpp.protocol import JID
class StanzaHandlerTest(unittest.TestCase):
def setUp(self):
self.dispatcher = dispatcher.XMPPDispatcher()
# Setup mock client
self.client = Mock()
self.client.get_bound_jid.return_value = JID('test@test.test')
self.client.defaultNamespace = NS_CLIENT
self.client.Connection = Mock() # mock transport
self.con = self.client.Connection
self.client.is_websocket = False
self.dispatcher = StanzaDispatcher(self.client)
self.dispatcher.PlugIn(self.client)
self.client.get_bound_jid.return_value = JID('test@test.test')
# Simulate that we have established a connection
self.dispatcher.StreamInit()
self.dispatcher.ProcessNonBlocking(STREAM_START)
self.dispatcher.reset_parser()
self.dispatcher.process_data(STREAM_START)
......@@ -45,9 +45,9 @@ class ActivityTest(StanzaHandlerTest):
</message>
'''
self.dispatcher.RegisterHandler(
self.dispatcher.register_handler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT))
self.dispatcher.ProcessNonBlocking(event)
self.dispatcher.process_data(event)
......@@ -50,9 +50,9 @@ class AvatarTest(StanzaHandlerTest):
</message>
'''
self.dispatcher.RegisterHandler(
self.dispatcher.register_handler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT))
self.dispatcher.ProcessNonBlocking(event)
self.dispatcher.process_data(event)
......@@ -61,12 +61,12 @@ class BookmarkTest(StanzaHandlerTest):
</message>
'''
self.dispatcher.RegisterHandler(
self.dispatcher.register_handler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT))
self.dispatcher.ProcessNonBlocking(event)
self.dispatcher.process_data(event)
def test_bookmark_2_parsing(self):
def _on_message(_con, _stanza, properties):
......@@ -106,9 +106,9 @@ class BookmarkTest(StanzaHandlerTest):
</message>
'''
self.dispatcher.RegisterHandler(
self.dispatcher.register_handler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT))
self.dispatcher.ProcessNonBlocking(event)
self.dispatcher.process_data(event)
......@@ -97,3 +97,7 @@ class TestDateTime(unittest.TestCase):
result = parse_datetime(
time_string, check_utc=True, epoch=True)
self.assertEqual(result, expected_value)
if __name__ == '__main__':
unittest.main()
......@@ -27,3 +27,7 @@ class TestHelpers(unittest.TestCase):
timestamp = parse_delay(message, not_from=['romeo.com'])
self.assertEqual(timestamp, 1031699305.0)
if __name__ == '__main__':
unittest.main()
......@@ -86,9 +86,9 @@ class LocationTest(StanzaHandlerTest):
</message>
'''
self.dispatcher.RegisterHandler(
self.dispatcher.register_handler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT))
self.dispatcher.ProcessNonBlocking(event)
self.dispatcher.process_data(event)
......@@ -41,9 +41,9 @@ class MoodTest(StanzaHandlerTest):
</message>
'''
self.dispatcher.RegisterHandler(
self.dispatcher.register_handler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT))
self.dispatcher.ProcessNonBlocking(event)
self.dispatcher.process_data(event)
......@@ -29,13 +29,13 @@ class PubsubTest(StanzaHandlerTest):
</message>
'''
self.dispatcher.RegisterHandler(
self.dispatcher.register_handler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT,
priority=16))
self.dispatcher.ProcessNonBlocking(event)
self.dispatcher.process_data(event)
def test_delete_event(self):
def _on_message(_con, _stanza, properties):
......@@ -59,13 +59,13 @@ class PubsubTest(StanzaHandlerTest):
</message>
'''
self.dispatcher.RegisterHandler(
self.dispatcher.register_handler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT,
priority=16))
self.dispatcher.ProcessNonBlocking(event)
self.dispatcher.process_data(event)
def test_retracted_event(self):
......@@ -90,10 +90,10 @@ class PubsubTest(StanzaHandlerTest):
</message>
'''
self.dispatcher.RegisterHandler(
self.dispatcher.register_handler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT,
priority=16))
self.dispatcher.ProcessNonBlocking(event)
self.dispatcher.process_data(event)
......@@ -2,27 +2,33 @@ import unittest
from unittest.mock import Mock
from nbxmpp.auth import SCRAM_SHA_1
from nbxmpp.util import b64encode
# Test vector from https://wiki.xmpp.org/web/SASL_and_SCRAM-SHA-1
class SCRAM(unittest.TestCase):
def setUp(self):
self.con = Mock()
self._method = SCRAM_SHA_1(self.con, None)
self._method._client_nonce = '4691d8f313ddb02d2eed511d5617a5c6f72efa671613c598'
self._method._client_nonce = 'fyko+d2lbbFgONRv9qkxdawL'
self.maxDiff = None
self._username = 'user'
self._password = 'pencil'
self._username = 'philw'
self._password = 'testtest123'
self.auth = '<auth xmlns="urn:ietf:params:xml:ns:xmpp-sasl" mechanism="SCRAM-SHA-1">eSwsbj1waGlsdyxyPTQ2OTFkOGYzMTNkZGIwMmQyZWVkNTExZDU2MTdhNWM2ZjcyZWZhNjcxNjEzYzU5OA==</auth>'
self.challenge = 'cj00NjkxZDhmMzEzZGRiMDJkMmVlZDUxMWQ1NjE3YTVjNmY3MmVmYTY3MTYxM2M1OThDaEJpZGEyb0NJeks5S25QdGsxSUZnPT0scz1iZFkrbkRjdUhuVGFtNzgyaG9neHNnPT0saT00MDk2'
self.response = '<response xmlns="urn:ietf:params:xml:ns:xmpp-sasl">Yz1lU3dzLHI9NDY5MWQ4ZjMxM2RkYjAyZDJlZWQ1MTFkNTYxN2E1YzZmNzJlZmE2NzE2MTNjNTk4Q2hCaWRhMm9DSXpLOUtuUHRrMUlGZz09LHA9NUd5a09hWCtSWlllR3E2L2U3YTE2UDVBeFVrPQ==</response>'
self.success = 'dj1qMGtuNlVvT1FjTmJ0MGFlYnEwV1QzYWNkSW89'
self.auth = '<auth xmlns="urn:ietf:params:xml:ns:xmpp-sasl" mechanism="SCRAM-SHA-1">%s</auth>' % b64encode('n,,n=user,r=fyko+d2lbbFgONRv9qkxdawL')
self.challenge = b64encode('r=fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j,s=QSXCR+Q6sek8bf92,i=4096')
self.response = '<response xmlns="urn:ietf:params:xml:ns:xmpp-sasl">%s</response>' % b64encode('c=biws,r=fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j,p=v0X8v3Bz2T0CJGbJQyF0X+HI4Ts=')
self.success = b64encode('v=rmF9pqV8S7suAoZWja4dJRkFsKQ=')
def test_auth(self):
self._method.initiate(self._username, self._password)
self.assertEqual(self.auth, str(self.con.send.call_args[0][0]))
self.assertEqual(self.auth, str(self.con.send_nonza.call_args[0][0]))
self._method.response(self.challenge)
self.assertEqual(self.response, str(self.con.send.call_args[0][0]))
self.assertEqual(self.response, str(self.con.send_nonza.call_args[0][0]))
self._method.success(self.success)
if __name__ == '__main__':
unittest.main()
......@@ -52,9 +52,9 @@ class TuneTest(StanzaHandlerTest):
</message>
'''
self.dispatcher.RegisterHandler(
self.dispatcher.register_handler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT))
self.dispatcher.ProcessNonBlocking(event)
self.dispatcher.process_data(event)
......@@ -7,15 +7,12 @@ from nbxmpp import dispatcher
class XMLVulnerability(unittest.TestCase):
def setUp(self):
self.client = Mock()
self.client.Connection = Mock()
self.dispatcher = dispatcher.XMPPDispatcher()
self.dispatcher.PlugIn(self.client)
self.dispatcher.StreamInit()
def tearDown(self):
self.dispatcher = None
self.client = None
self.stream = Mock()
self.stream.is_websocket = False
self.dispatcher = dispatcher.StanzaDispatcher(self.stream)
self._error_handler = Mock()
self.dispatcher.subscribe('parsing-error', self._error_handler)
self.dispatcher.reset_parser()
def test_exponential_entity_expansion(self):
bomb = """<?xml version="1.0" encoding="utf-8"?>
......@@ -26,8 +23,8 @@ class XMLVulnerability(unittest.TestCase):
]>
<bomb>&c;</bomb>"""
self.dispatcher.ProcessNonBlocking(bomb)
self.client.Connection.disconnect.assert_called()
self.dispatcher.process_data(bomb)
self._error_handler.assert_called()
def test_quadratic_blowup(self):
bomb = """<?xml version="1.0" encoding="utf-8"?>
......@@ -36,8 +33,8 @@ class XMLVulnerability(unittest.TestCase):
]>
<bomb>&a;&a;&a;... repeat</bomb>"""
self.dispatcher.ProcessNonBlocking(bomb)
self.client.Connection.disconnect.assert_called()
self.dispatcher.process_data(bomb)
self._error_handler.assert_called()
def test_external_entity_expansion(self):
bomb = """<?xml version="1.0" encoding="utf-8"?>
......@@ -46,8 +43,8 @@ class XMLVulnerability(unittest.TestCase):
]>
<root>&ee;</root>"""
self.dispatcher.ProcessNonBlocking(bomb)
self.client.Connection.disconnect.assert_called()
self.dispatcher.process_data(bomb)
self._error_handler.assert_called()
def test_external_local_entity_expansion(self):
bomb = """<?xml version="1.0" encoding="utf-8"?>
......@@ -57,8 +54,8 @@ class XMLVulnerability(unittest.TestCase):
]>
<root>&ee;</root>"""
self.dispatcher.ProcessNonBlocking(bomb)
self.client.Connection.disconnect.assert_called()
self.dispatcher.process_data(bomb)
self._error_handler.assert_called()
def test_dtd_retrival(self):
bomb = """<?xml version="1.0" encoding="utf-8"?>
......@@ -69,5 +66,9 @@ class XMLVulnerability(unittest.TestCase):
<body>text</body>
</html>"""
self.dispatcher.ProcessNonBlocking(bomb)
self.client.Connection.disconnect.assert_called()
self.dispatcher.process_data(bomb)
self._error_handler.assert_called()
if __name__ == '__main__':
unittest.main()
'''
Testing script for NonBlockingClient class (nbxmpp/client.py)
It actually connects to a xmpp server.
'''
import unittest
from unittest.mock import Mock
from test.lib.xmpp_mocks import MockConnection, IdleQueueThread
from nbxmpp import client
# (XMPP server hostname, c2s port). Script will connect to the machine.
xmpp_server_port = ('gajim.org', 5222)
# [username, password, resource]. Script will authenticate to server above
credentials = ['unittest', 'testtest', 'res']
@unittest.skip("gajim.org only supports TLS/SSL connections")
class TestNonBlockingClient(unittest.TestCase):
'''
Test Cases class for NonBlockingClient.
'''
def setUp(self):
''' IdleQueue thread is run and dummy connection is created. '''
self.idlequeue_thread = IdleQueueThread()
self.connection = MockConnection() # for dummy callbacks
self.idlequeue_thread.start()
def tearDown(self):
''' IdleQueue thread is stopped. '''
self.idlequeue_thread.stop_thread()
self.idlequeue_thread.join()
del self.connection
self.client = None
def open_stream(self, server_port, wrong_pass=False):
'''
Method opening the XMPP connection. It returns when <stream:features>
is received from server.
:param server_port: tuple of (hostname, port) for where the client should
connect.
'''
class TempConnection():
def get_password(self, cb, mechanism):
if wrong_pass:
cb('wrong pass')
else:
cb(credentials[1])
def on_connect_failure(self):
pass
self.client = client.NonBlockingClient(
domain=server_port[0],
idlequeue=self.idlequeue_thread.iq,
caller=Mock(spec=TempConnection))
self.client.connect(
hostname=server_port[0],
port=server_port[1],
on_connect=lambda *args: self.connection.on_connect(True, *args),
on_connect_failure=lambda *args: self.connection.on_connect(
False, *args))
self.assertTrue(self.connection.wait(),
msg='waiting for callback from client constructor')
# if on_connect was called, client has to be connected and vice versa
if self.connection.connect_succeeded:
self.assertTrue(self.client.get_connect_type())
else:
self.assertTrue(not self.client.get_connect_type())
def client_auth(self, username, password, resource, sasl):
'''
Method authenticating connected client with supplied credentials. Returns
when authentication is over.
:param sasl: whether to use sasl (sasl=1) or old (sasl=0) authentication
:todo: to check and be more specific about when it returns
(bind, session..)
'''
self.client.auth(username, password, resource, sasl,
on_auth=self.connection.on_auth)
self.assertTrue(self.connection.wait(), msg='waiting for authentication')
def do_disconnect(self):
'''
Does disconnecting of connected client. Returns when TCP connection is
closed.
'''
self.client.RegisterDisconnectHandler(self.connection.set_event)
self.client.disconnect()
self.assertTrue(self.connection.wait(), msg='waiting for disconnecting')
def test_proper_connect_sasl(self):
'''
The ideal testcase - client is connected, authenticated with SASL and
then disconnected.
'''
self.open_stream(xmpp_server_port)
# if client is not connected, lets raise the AssertionError
self.assertTrue(self.client.get_connect_type())
# client.disconnect() is already called from NBClient via
# _on_connected_failure, no need to call it here
self.client_auth(credentials[0], credentials[1], credentials[2], sasl=1)
self.assertTrue(self.connection.con)
self.assertTrue(self.connection.auth=='sasl', msg='Unable to auth via SASL')
self.do_disconnect()
def test_proper_connect_oldauth(self):
'''
The ideal testcase - client is connected, authenticated with old auth and
then disconnected.
'''
self.open_stream(xmpp_server_port)
self.assertTrue(self.client.get_connect_type())
self.client_auth(credentials[0], credentials[1], credentials[2], sasl=0)
self.assertTrue(self.connection.con)
features = self.client.Dispatcher.Stream.features
if not features.getTag('auth'):
print("Server doesn't support old authentication type, ignoring test")
else:
self.assertTrue(self.connection.auth=='old_auth',
msg='Unable to auth via old_auth')
self.do_disconnect()
def test_connect_to_nonexisting_host(self):
'''
Connect to nonexisting host. DNS request for A records should return
nothing.
'''
self.open_stream(('fdsfsdf.fdsf.fss', 5222))
self.assertTrue(not self.client.get_connect_type())
def test_connect_to_wrong_port(self):
'''
Connect to nonexisting server. DNS request for A records should return an
IP but there shouldn't be XMPP server running on specified port.
'''
self.open_stream((xmpp_server_port[0], 31337))
self.assertTrue(not self.client.get_connect_type())
def test_connect_with_wrong_creds(self):
'''
Connecting with invalid password.
'''
self.open_stream(xmpp_server_port, wrong_pass=True)