Commit d2b60a7d authored by Yann Leboulanger's avatar Yann Leboulanger

add test suite

parent dbe3e6e6
import sys
import os
import getopt
root = os.path.join(os.path.abspath(os.path.dirname(__file__)), '../..')
# look for modules in the CWD, then gajim/test/lib, then gajim/src,
# then everywhere else
sys.path.insert(1, root)
sys.path.insert(1, root + '/test/lib')
def setup_env():
pass
This diff is collapsed.
'''
Module with dummy classes for unit testing of XMPP and related code.
'''
import threading, time
from mock import Mock
from nbxmpp import idlequeue
IDLEQUEUE_INTERVAL = 0.2 # polling interval. 200ms is used in Gajim as default
IDLEMOCK_TIMEOUT = 30 # how long we wait for an event
class IdleQueueThread(threading.Thread):
'''
Thread for regular processing of idlequeue.
'''
def __init__(self):
self.iq = idlequeue.IdleQueue()
self.stop = threading.Event() # Event to stop the thread main loop.
self.stop.clear()
threading.Thread.__init__(self)
def run(self):
while not self.stop.isSet():
self.iq.process()
time.sleep(IDLEQUEUE_INTERVAL)
def stop_thread(self):
self.stop.set()
class IdleMock:
'''
Serves as template for testing objects that are normally controlled by GUI.
Allows to wait for asynchronous callbacks with wait() method.
'''
def __init__(self):
self._event = threading.Event()
self._event.clear()
def wait(self):
'''
Block until some callback sets the event and clearing the event
subsequently.
Returns True if event was set, False on timeout
'''
self._event.wait(IDLEMOCK_TIMEOUT)
if self._event.isSet():
self._event.clear()
return True
else:
return False
def set_event(self):
self._event.set()
class MockConnection(IdleMock, Mock):
'''
Class simulating Connection class from src/common/connection.py
It is derived from Mock in order to avoid defining all methods
from real Connection that are called from NBClient or Dispatcher
( _event_dispatcher for example)
'''
def __init__(self, *args):
self.connect_succeeded = True
IdleMock.__init__(self)
Mock.__init__(self, *args)
def on_connect(self, success, *args):
'''
Method called after connecting - after receiving <stream:features>
from server (NOT after TLS stream restart) or connect failure
'''
self.connect_succeeded = success
self.set_event()
def on_auth(self, con, auth):
'''
Method called after authentication, regardless of the result.
:Parameters:
con : NonBlockingClient
reference to authenticated object
auth : string
type of authetication in case of success ('old_auth', 'sasl') or
None in case of auth failure
'''
self.auth_connection = con
self.auth = auth
self.set_event()
#!/usr/bin/env python
'''
Runs python-nbxmpp's Test Suite
Unit tests tests will be run on each commit.
'''
import sys
import unittest
import getopt
verbose = 1
try:
shortargs = 'hv:'
longargs = 'help verbose='
opts, args = getopt.getopt(sys.argv[1:], shortargs, longargs.split())
except getopt.error, msg:
print msg
print 'for help use --help'
sys.exit(2)
for o, a in opts:
if o in ('-h', '--help'):
print 'runtests [--help] [--verbose level]'
sys.exit()
elif o in ('-v', '--verbose'):
try:
verbose = int(a)
except Exception:
print 'verbose must be a number >= 0'
sys.exit(2)
# new test modules need to be added manually
modules = ( 'unit.test_xmpp_dispatcher_nb',
'unit.test_xmpp_transports_nb',
'unit.test_xmpp_smacks',
#'unit.test_xmpp_client_nb', gajim.org only supports TLS/SSL connections
'unit.test_xmpp_transports_nb2',
)
nb_errors = 0
nb_failures = 0
for mod in modules:
suite = unittest.defaultTestLoader.loadTestsFromName(mod)
result = unittest.TextTestRunner(verbosity=verbose).run(suite)
nb_errors += len(result.errors)
nb_failures += len(result.failures)
sys.exit(nb_errors + nb_failures)
'''
This package just contains plain unit tests
'''
'''
Testing script for NonBlockingClient class (src/common/xmpp/client_nb.py)
It actually connects to a xmpp server.
'''
import unittest
import lib
lib.setup_env()
from xmpp_mocks import MockConnection, IdleQueueThread
from mock import Mock
from nbxmpp import client_nb
# (XMPP server hostname, c2s port). Script will connect to the machine.
xmpp_server_port = ('gajim.org', 5222)
# [username, password, resource]. Script will authenticate to server above
credentials = ['unittest', 'testtest', 'res']
class TestNonBlockingClient(unittest.TestCase):
'''
Test Cases class for NonBlockingClient.
'''
def setUp(self):
''' IdleQueue thread is run and dummy connection is created. '''
self.idlequeue_thread = IdleQueueThread()
self.connection = MockConnection() # for dummy callbacks
self.idlequeue_thread.start()
def tearDown(self):
''' IdleQueue thread is stopped. '''
self.idlequeue_thread.stop_thread()
self.idlequeue_thread.join()
del self.connection
self.client = None
def open_stream(self, server_port, wrong_pass=False):
'''
Method opening the XMPP connection. It returns when <stream:features>
is received from server.
:param server_port: tuple of (hostname, port) for where the client should
connect.
'''
class TempConnection():
def get_password(self, cb, mechanism):
if wrong_pass:
cb('wrong pass')
else:
cb(credentials[1])
def on_connect_failure(self):
pass
self.client = client_nb.NonBlockingClient(
domain=server_port[0],
idlequeue=self.idlequeue_thread.iq,
caller=Mock(realClass=TempConnection))
self.client.connect(
hostname=server_port[0],
port=server_port[1],
on_connect=lambda *args: self.connection.on_connect(True, *args),
on_connect_failure=lambda *args: self.connection.on_connect(
False, *args))
self.assert_(self.connection.wait(),
msg='waiting for callback from client constructor')
# if on_connect was called, client has to be connected and vice versa
if self.connection.connect_succeeded:
self.assert_(self.client.get_connect_type())
else:
self.assert_(not self.client.get_connect_type())
def client_auth(self, username, password, resource, sasl):
'''
Method authenticating connected client with supplied credentials. Returns
when authentication is over.
:param sasl: whether to use sasl (sasl=1) or old (sasl=0) authentication
:todo: to check and be more specific about when it returns
(bind, session..)
'''
self.client.auth(username, password, resource, sasl,
on_auth=self.connection.on_auth)
self.assert_(self.connection.wait(), msg='waiting for authentication')
def do_disconnect(self):
'''
Does disconnecting of connected client. Returns when TCP connection is
closed.
'''
self.client.RegisterDisconnectHandler(self.connection.set_event)
self.client.disconnect()
self.assertTrue(self.connection.wait(), msg='waiting for disconnecting')
def test_proper_connect_sasl(self):
'''
The ideal testcase - client is connected, authenticated with SASL and
then disconnected.
'''
self.open_stream(xmpp_server_port)
# if client is not connected, lets raise the AssertionError
self.assert_(self.client.get_connect_type())
# client.disconnect() is already called from NBClient via
# _on_connected_failure, no need to call it here
self.client_auth(credentials[0], credentials[1], credentials[2], sasl=1)
self.assert_(self.connection.con)
self.assert_(self.connection.auth=='sasl', msg='Unable to auth via SASL')
self.do_disconnect()
def test_proper_connect_oldauth(self):
'''
The ideal testcase - client is connected, authenticated with old auth and
then disconnected.
'''
self.open_stream(xmpp_server_port)
self.assert_(self.client.get_connect_type())
self.client_auth(credentials[0], credentials[1], credentials[2], sasl=0)
self.assert_(self.connection.con)
features = self.client.Dispatcher.Stream.features
if not features.getTag('auth'):
print "Server doesn't support old authentication type, ignoring test"
else:
self.assert_(self.connection.auth=='old_auth',
msg='Unable to auth via old_auth')
self.do_disconnect()
def test_connect_to_nonexisting_host(self):
'''
Connect to nonexisting host. DNS request for A records should return
nothing.
'''
self.open_stream(('fdsfsdf.fdsf.fss', 5222))
self.assert_(not self.client.get_connect_type())
def test_connect_to_wrong_port(self):
'''
Connect to nonexisting server. DNS request for A records should return an
IP but there shouldn't be XMPP server running on specified port.
'''
self.open_stream((xmpp_server_port[0], 31337))
self.assert_(not self.client.get_connect_type())
def test_connect_with_wrong_creds(self):
'''
Connecting with invalid password.
'''
self.open_stream(xmpp_server_port, wrong_pass=True)
self.assert_(self.client.get_connect_type())
self.client_auth(credentials[0], 'wrong pass', credentials[2], sasl=1)
self.assert_(self.connection.auth is None)
self.do_disconnect()
'''
Tests for dispatcher_nb.py
'''
import unittest
import lib
lib.setup_env()
from mock import Mock
from nbxmpp import dispatcher_nb
from nbxmpp import protocol
class TestDispatcherNB(unittest.TestCase):
'''
Test class for NonBlocking dispatcher. Tested dispatcher will be plugged
into a mock client
'''
def setUp(self):
self.dispatcher = dispatcher_nb.XMPPDispatcher()
# Setup mock client
self.client = Mock()
self.client.__str__ = lambda: 'Mock' # FIXME: why do I need this one?
self.client._caller = Mock()
self.client.defaultNamespace = protocol.NS_CLIENT
self.client.Connection = Mock() # mock transport
self.con = self.client.Connection
def tearDown(self):
# Unplug if needed
if hasattr(self.dispatcher, '_owner'):
self.dispatcher.PlugOut()
def _simulate_connect(self):
self.dispatcher.PlugIn(self.client) # client is owner
# Simulate that we have established a connection
self.dispatcher.StreamInit()
self.dispatcher.ProcessNonBlocking("<stream:stream xmlns:stream='http://etherx.jabber.org/streams' xmlns='jabber:client'>")
def test_unbound_namespace_prefix(self):
'''tests our handling of a message with an unbound namespace prefix'''
self._simulate_connect()
msgs = []
def _got_message(conn, msg):
msgs.append(msg)
self.dispatcher.RegisterHandler('message', _got_message)
# should be able to parse a normal message
self.dispatcher.ProcessNonBlocking('<message><body>hello</body></message>')
self.assertEqual(1, len(msgs))
self.dispatcher.ProcessNonBlocking('<message><x:y/></message>')
self.assertEqual(2, len(msgs))
# we should not have been disconnected after that message
self.assertEqual(0, len(self.con.mockGetNamedCalls('pollend')))
self.assertEqual(0, len(self.con.mockGetNamedCalls('disconnect')))
# we should be able to keep parsing
self.dispatcher.ProcessNonBlocking('<message><body>still here?</body></message>')
self.assertEqual(3, len(msgs))
def test_process_non_blocking(self):
''' Check for ProcessNonBlocking return types '''
self._simulate_connect()
process = self.dispatcher.ProcessNonBlocking
# length of data expected
data = "Please don't fail"
result = process(data)
self.assertEqual(result, len(data))
# no data processed, link shall still be active
result = process('')
self.assertEqual(result, '0')
self.assertEqual(0, len(self.con.mockGetNamedCalls('pollend')) +
len(self.con.mockGetNamedCalls('disconnect')))
# simulate disconnect
result = process('</stream:stream>')
self.assertEqual(1, len(self.client.mockGetNamedCalls('disconnect')))
def test_return_stanza_handler(self):
''' Test sasl_error_conditions transformation in protocol.py '''
# quick'n dirty...I wasn't aware of it existance and thought it would
# always fail :-)
self._simulate_connect()
stanza = "<iq type='get' />"
def send(data):
self.assertEqual(str(data), '<iq xmlns="jabber:client" type="error"><error code="501" type="cancel"><feature-not-implemented xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" /><text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">The feature requested is not implemented by the recipient or server and therefore cannot be processed.</text></error></iq>')
self.client.send = send
self.dispatcher.ProcessNonBlocking(stanza)
if __name__ == '__main__':
unittest.main()
'''
Tests for smacks.py Stream Management
'''
import unittest
import lib
lib.setup_env()
from mock import Mock
from nbxmpp import dispatcher_nb
from nbxmpp import protocol
from nbxmpp import smacks
class TestDispatcherNB(unittest.TestCase):
'''
Test class for NonBlocking dispatcher. Tested dispatcher will be plugged
into a mock client
'''
def setUp(self):
self.dispatcher = dispatcher_nb.XMPPDispatcher()
# Setup mock client
self.client = Mock()
self.client.__str__ = lambda: 'Mock' # FIXME: why do I need this one?
self.client._caller = Mock()
self.client.defaultNamespace = protocol.NS_CLIENT
self.client.Connection = Mock() # mock transport
self.con = self.client.Connection
self.con.sm = smacks.Smacks(self.con)
def tearDown(self):
# Unplug if needed
if hasattr(self.dispatcher, '_owner'):
self.dispatcher.PlugOut()
def _simulate_connect(self):
self.dispatcher.PlugIn(self.client) # client is owner
self.con.sm.set_owner(self.client)
self.dispatcher.sm = self.con.sm
# Simulate that we have established a connection
self.dispatcher.StreamInit()
self.dispatcher.ProcessNonBlocking("<stream:stream "
"xmlns:stream='http://etherx.jabber.org/streams' "
"xmlns='jabber:client'>")
self.dispatcher.ProcessNonBlocking("<stream:features> "
"<sm xmlns='urn:xmpp:sm:2'> <optional/> </sm> </stream:features>")
self.con.sm.negociate()
self.dispatcher.ProcessNonBlocking("<enabled xmlns='urn:xmpp:sm:2' "
"id='some-long-sm-id' resume='true'/>")
assert(self.con.sm.enabled)
def _simulate_resume(self):
self.con.sm.resume_request()
# Resuming acknowledging 5 stanzas
self.dispatcher.ProcessNonBlocking("<resumed xmlns='urn:xmpp:sm:2' "
"id='some-long-sm-id' h='5'/>")
assert(self.con.sm.resuming)
def _send(self, send, r, stanza):
for i in range(r):
send(stanza)
def test_messages(self):
message = '<message><body>Helloo </body></message>'
iq = '''<iq from='proxy.jabber.ru' to='j.xxxxxxxx.org/Gajim' type='error' id='18'>
<query xmlns='http://jabber.org/protocol/bytestreams'/>
<error code='403' type='auth'>
<forbidden xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/>
</error>
</iq>'''
presence = '''<presence from='xxxxxxxxx.com/Talk.v1044194B1E2' to='j.xxxxxxxx.org'>
<priority>24</priority>
<c node="http://www.google.com/xmpp/client/caps" ver="1.0.0.104" ext="share-v1 voice-v1" xmlns="http://jabber.org/protocol/caps"/>
<x stamp="20110614T23:17:51" xmlns="jabber:x:delay"/>
<status>In love Kakashi Sensei :P</status>
<x xmlns="vcard-temp:x:update">
<photo>db4b7c52e39ba28562c74542d5988d47f09108a3</photo>
</x>
</presence> '''
self._simulate_connect()
uqueue = self.con.sm.uqueue
self.assertEqual(self.con.sm.out_h, 0)
self.assertEqual(self.con.sm.in_h, 0)
# The server sends 10 stanzas
self._send(self.dispatcher.ProcessNonBlocking, 5, message)
self._send(self.dispatcher.ProcessNonBlocking, 4, iq)
self._send(self.dispatcher.ProcessNonBlocking, 1, presence)
# The client has recieved 10 stanzas and sent none
self.assertEqual(self.con.sm.in_h, 10)
self.assertEqual(self.con.sm.out_h, 0)
m = protocol.Message()
# The client sends 10 stanzas
for i in range(10):
m = protocol.Message(body=str(i))
self.dispatcher.send(m)
# Client sends 10 stanzas and put them in the queue
self.assertEqual(self.con.sm.out_h, 10)
self.assertEqual(len(uqueue), 10)
# The server acknowledges that it recieved 5 stanzas
self.dispatcher.ProcessNonBlocking("<a xmlns='urn:xmpp:sm:2' h='5'/>")
# 5 stanzas are removed from the queue, only 5 stanzas are left
self.assertEqual(len(uqueue), 5)
# Check for the right order of stanzas in the queue
l = ['5', '6', '7', '8', '9']
for i in uqueue:
self.assertEqual(i.getBody(), l[0])
l.pop(0)
def test_resumption(self):
self._simulate_connect()
m = protocol.Message()
# The client sends 5 stanzas
for i in range(5):
m = protocol.Message(body=str(i))
self.dispatcher.send(m)
self._simulate_resume()
# No stanzas left
self.assertEqual(len(self.con.sm.uqueue), 0)
if __name__ == '__main__':
unittest.main()
'''
Unit test for tranports classes.
'''
import unittest
import lib
lib.setup_env()
from nbxmpp import transports_nb
class TestModuleLevelFunctions(unittest.TestCase):
'''
Test class for functions defined at module level
'''
def test_urisplit(self):
def check_uri(uri, proto, host, port, path):
_proto, _host, _port, _path = transports_nb.urisplit(uri)
self.assertEqual(proto, _proto)
self.assertEqual(host, _host)
self.assertEqual(path, _path)
self.assertEqual(port, _port)
check_uri('http://httpcm.jabber.org:5280/webclient', proto='http',
host='httpcm.jabber.org', port=5280, path='/webclient')
check_uri('http://httpcm.jabber.org/webclient', proto='http',
host='httpcm.jabber.org', port=80, path='/webclient')
check_uri('https://httpcm.jabber.org/webclient', proto='https',
host='httpcm.jabber.org', port=443, path='/webclient')
def test_get_proxy_data_from_dict(self):
def check_dict(proxy_dict, host, port, user, passwd):
_host, _port, _user, _passwd = transports_nb.get_proxy_data_from_dict(
proxy_dict)
self.assertEqual(_host, host)
self.assertEqual(_port, port)
self.assertEqual(_user, user)
self.assertEqual(_passwd, passwd)
bosh_dict = {'bosh_content': u'text/xml; charset=utf-8',
'bosh_hold': 2,
'bosh_http_pipelining': False,
'bosh_uri': u'http://gajim.org:5280/http-bind',
'bosh_useproxy': False,
'bosh_wait': 30,
'bosh_wait_for_restart_response': False,
'host': u'172.16.99.11',
'pass': u'pass',
'port': 3128,
'type': u'bosh',
'useauth': True,
'user': u'user'}
check_dict(bosh_dict, host=u'gajim.org', port=5280, user=u'user',
passwd=u'pass')
proxy_dict = {'bosh_content': u'text/xml; charset=utf-8',
'bosh_hold': 2,
'bosh_http_pipelining': False,
'bosh_port': 5280,
'bosh_uri': u'',
'bosh_useproxy': True,
'bosh_wait': 30,
'bosh_wait_for_restart_response': False,
'host': u'172.16.99.11',
'pass': u'pass',
'port': 3128,
'type': 'socks5',
'useauth': True,
'user': u'user'}
check_dict(proxy_dict, host=u'172.16.99.11', port=3128, user=u'user',
passwd=u'pass')
if __name__ == '__main__':
unittest.main()
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment