diff --git a/src/common/connection.py b/src/common/connection.py index 865cda853801fe708d47af65af57006171c34cc2..edda989b6343daf88e7abc3783d6ac49a1cad5d2 100644 --- a/src/common/connection.py +++ b/src/common/connection.py @@ -57,9 +57,9 @@ from common import gajim from common import gpg from common import passwords from common import exceptions - from connection_handlers import * +from xmpp import Smacks from string import Template import logging log = logging.getLogger('gajim.c.connection') @@ -717,6 +717,9 @@ class Connection(CommonConnection, ConnectionHandlers): self.privacy_rules_requested = False self.streamError = '' self.secret_hmac = str(random.random())[2:] + + self.sm = Smacks(self) # Stream Management + gajim.ged.register_event_handler('privacy-list-received', ged.CORE, self._nec_privacy_list_received) gajim.ged.register_event_handler('agent-info-error-received', ged.CORE, @@ -780,6 +783,8 @@ class Connection(CommonConnection, ConnectionHandlers): self.connected = 0 self.time_to_reconnect = None self.privacy_rules_supported = False + if on_purpose: + self.sm = Smacks(self) if self.connection: # make sure previous connection is completely closed gajim.proxy65_manager.disconnect(self.connection) @@ -788,6 +793,15 @@ class Connection(CommonConnection, ConnectionHandlers): self.connection.disconnect() self.last_connection = None self.connection = None + def set_oldst(self): # Set old state + if self.old_show: + self.connected = gajim.SHOW_LIST.index(self.old_show) + gajim.nec.push_incoming_event(OurShowEvent(None, conn=self, + show=self.connected)) + else: # we default to online + self.connected = 2 + gajim.nec.push_incoming_event(OurShowEvent(None, conn=self, + show=gajim.SHOW_LIST[self.connected])) def _disconnectedReconnCB(self): """ @@ -800,8 +814,9 @@ class Connection(CommonConnection, ConnectionHandlers): self.old_show = gajim.SHOW_LIST[self.connected] self.connected = 0 if not self.on_purpose: - gajim.nec.push_incoming_event(OurShowEvent(None, conn=self, - show='offline')) + if not (self.sm and self.sm.resumption): + gajim.nec.push_incoming_event(OurShowEvent(None, conn=self, + show='offline')) self.disconnect() if gajim.config.get_per('accounts', self.name, 'autoreconnect'): self.connected = -1 @@ -989,7 +1004,14 @@ class Connection(CommonConnection, ConnectionHandlers): if self.connection: return self.connection, '' - if data: + if self.sm.resuming and self.sm.location: + # If resuming and server gave a location, connect from there + hostname = self.sm.location + self.try_connecting_for_foo_secs = gajim.config.get_per('accounts', + self.name, 'try_connecting_for_foo_secs') + use_custom = False + + elif data: hostname = data['hostname'] self.try_connecting_for_foo_secs = 45 p = data['proxy'] @@ -1607,12 +1629,16 @@ class Connection(CommonConnection, ConnectionHandlers): self.connection.set_send_timeout2(self.pingalives, self.sendPing) self.connection.onreceive(None) - self.request_message_archiving_preferences() - self.privacy_rules_requested = False - self.discoverInfo(gajim.config.get_per('accounts', self.name, 'hostname'), - id_prefix='Gajim_') + # If we are not resuming, we ask for discovery info + # and archiving preferences + if not self.sm.resuming: + self.request_message_archiving_preferences() + self.discoverInfo(gajim.config.get_per('accounts', self.name, + 'hostname'), id_prefix='Gajim_') + + self.sm.resuming = False # back to previous state # Discover Stun server(s) gajim.resolver.resolve('_stun._udp.' + helpers.idn_to_ascii( self.connected_hostname), self._on_stun_resolved) diff --git a/src/common/xmpp/__init__.py b/src/common/xmpp/__init__.py index 4ebc4cfa3d250f2c5c3b6bc1621adb6e2a968eb2..46037a1cb94e4ed5da9025e9a7583fb19b1282ae 100644 --- a/src/common/xmpp/__init__.py +++ b/src/common/xmpp/__init__.py @@ -15,3 +15,4 @@ import simplexml, protocol, auth_nb, transports_nb, roster_nb import dispatcher_nb, features_nb, idlequeue, bosh, tls_nb, proxy_connectors from client_nb import NonBlockingClient from plugin import PlugIn +from smacks import Smacks diff --git a/src/common/xmpp/auth_nb.py b/src/common/xmpp/auth_nb.py index fbe30aef8e5fa86142a5c5cb246379b9b5b2040f..3f4fcb0155f09a032966031b37db4313c1939fcf 100644 --- a/src/common/xmpp/auth_nb.py +++ b/src/common/xmpp/auth_nb.py @@ -22,8 +22,10 @@ See client_nb.py """ from protocol import NS_SASL, NS_SESSION, NS_STREAMS, NS_BIND, NS_AUTH +from protocol import NS_STREAM_MGMT from protocol import Node, NodeProcessed, isResultNode, Iq, Protocol, JID from plugin import PlugIn +from smacks import Smacks import base64 import random import itertools @@ -142,7 +144,7 @@ class SASL(PlugIn): elif self._owner.Dispatcher.Stream.features: try: self.FeaturesHandler(self._owner.Dispatcher, - self._owner.Dispatcher.Stream.features) + self._owner.Dispatcher.Stream.features) except NodeProcessed: pass else: @@ -154,16 +156,16 @@ class SASL(PlugIn): """ if 'features' in self._owner.__dict__: self._owner.UnregisterHandler('features', self.FeaturesHandler, - xmlns=NS_STREAMS) + xmlns=NS_STREAMS) if 'challenge' in self._owner.__dict__: self._owner.UnregisterHandler('challenge', self.SASLHandler, - xmlns=NS_SASL) + xmlns=NS_SASL) if 'failure' in self._owner.__dict__: self._owner.UnregisterHandler('failure', self.SASLHandler, - xmlns=NS_SASL) + xmlns=NS_SASL) if 'success' in self._owner.__dict__: self._owner.UnregisterHandler('success', self.SASLHandler, - xmlns=NS_SASL) + xmlns=NS_SASL) def auth(self): """ @@ -178,12 +180,12 @@ class SASL(PlugIn): elif self._owner.Dispatcher.Stream.features: try: self.FeaturesHandler(self._owner.Dispatcher, - self._owner.Dispatcher.Stream.features) + self._owner.Dispatcher.Stream.features) except NodeProcessed: pass else: self._owner.RegisterHandler('features', - self.FeaturesHandler, xmlns=NS_STREAMS) + self.FeaturesHandler, xmlns=NS_STREAMS) def FeaturesHandler(self, conn, feats): """ @@ -198,7 +200,8 @@ class SASL(PlugIn): 'mechanism'): self.mecs.append(mec.getData()) - self._owner.RegisterHandler('challenge', self.SASLHandler, xmlns=NS_SASL) + self._owner.RegisterHandler('challenge', self.SASLHandler, + xmlns=NS_SASL) self._owner.RegisterHandler('failure', self.SASLHandler, xmlns=NS_SASL) self._owner.RegisterHandler('success', self.SASLHandler, xmlns=NS_SASL) self.MechanismHandler() @@ -206,7 +209,8 @@ class SASL(PlugIn): def MechanismHandler(self): if 'ANONYMOUS' in self.mecs and self.username is None: self.mecs.remove('ANONYMOUS') - node = Node('auth', attrs={'xmlns': NS_SASL, 'mechanism': 'ANONYMOUS'}) + node = Node('auth', attrs={'xmlns': NS_SASL, + 'mechanism': 'ANONYMOUS'}) self.mechanism = 'ANONYMOUS' self.startsasl = SASL_IN_PROCESS self._owner.send(str(node)) @@ -226,11 +230,11 @@ class SASL(PlugIn): self.mecs.remove('GSSAPI') try: self.gss_vc = kerberos.authGSSClientInit('xmpp@' + \ - self._owner.xmpp_hostname)[1] + self._owner.xmpp_hostname)[1] kerberos.authGSSClientStep(self.gss_vc, '') response = kerberos.authGSSClientResponse(self.gss_vc) - node=Node('auth', attrs={'xmlns': NS_SASL, 'mechanism': 'GSSAPI'}, - payload=(response or '')) + node=Node('auth', attrs={'xmlns': NS_SASL, + 'mechanism': 'GSSAPI'}, payload=(response or '')) self.mechanism = 'GSSAPI' self.gss_step = GSS_STATE_STEP self.startsasl = SASL_IN_PROCESS @@ -247,7 +251,8 @@ class SASL(PlugIn): raise NodeProcessed if 'DIGEST-MD5' in self.mecs: self.mecs.remove('DIGEST-MD5') - node = Node('auth', attrs={'xmlns': NS_SASL, 'mechanism': 'DIGEST-MD5'}) + node = Node('auth', attrs={'xmlns': NS_SASL, + 'mechanism': 'DIGEST-MD5'}) self.mechanism = 'DIGEST-MD5' self.startsasl = SASL_IN_PROCESS self._owner.send(str(node)) @@ -307,13 +312,13 @@ class SASL(PlugIn): handlers = self._owner.Dispatcher.dumpHandlers() # Bosh specific dispatcher replugging - # save old features. They will be used in case we won't get response on - # stream restart after SASL auth (happens with XMPP over BOSH with - # Openfire) + # save old features. They will be used in case we won't get response + # on stream restart after SASL auth (happens with XMPP over BOSH + # with Openfire) old_features = self._owner.Dispatcher.Stream.features self._owner.Dispatcher.PlugOut() dispatcher_nb.Dispatcher.get_instance().PlugIn(self._owner, - after_SASL=True, old_features=old_features) + after_SASL=True, old_features=old_features) self._owner.Dispatcher.restoreHandlers(handlers) self._owner.User = self.username @@ -333,12 +338,12 @@ class SASL(PlugIn): rc = kerberos.authGSSClientUnwrap(self.gss_vc, incoming_data) response = kerberos.authGSSClientResponse(self.gss_vc) rc = kerberos.authGSSClientWrap(self.gss_vc, response, - kerberos.authGSSClientUserName(self.gss_vc)) + kerberos.authGSSClientUserName(self.gss_vc)) response = kerberos.authGSSClientResponse(self.gss_vc) if not response: response = '' self._owner.send(Node('response', attrs={'xmlns': NS_SASL}, - payload=response).__str__()) + payload=response).__str__()) raise NodeProcessed if self.mechanism == 'SCRAM-SHA-1': hashfn = hashlib.sha1 @@ -416,8 +421,8 @@ class SASL(PlugIn): else: self.resp['realm'] = self._owner.Server self.resp['nonce'] = chal['nonce'] - self.resp['cnonce'] = ''.join("%x" % randint(0, 2**28) for randint in - itertools.repeat(random.randint, 7)) + self.resp['cnonce'] = ''.join("%x" % randint(0, 2**28) for randint \ + in itertools.repeat(random.randint, 7)) self.resp['nc'] = ('00000001') self.resp['qop'] = 'auth' self.resp['digest-uri'] = 'xmpp/' + self._owner.Server @@ -477,14 +482,15 @@ class SASL(PlugIn): else: sasl_data += u'%s="%s",' % (key, self.resp[key]) sasl_data = sasl_data[:-1].encode('utf-8').encode('base64').replace( - '\r', '').replace('\n', '') - node = Node('response', attrs={'xmlns':NS_SASL}, payload=[sasl_data]) + '\r', '').replace('\n', '') + node = Node('response', attrs={'xmlns': NS_SASL}, + payload=[sasl_data]) elif self.mechanism == 'PLAIN': sasl_data = u'\x00%s\x00%s' % (self.username, self.password) sasl_data = sasl_data.encode('utf-8').encode('base64').replace( - '\n', '') + '\n', '') node = Node('auth', attrs={'xmlns': NS_SASL, 'mechanism': 'PLAIN'}, - payload=[sasl_data]) + payload=[sasl_data]) self._owner.send(str(node)) @@ -516,8 +522,8 @@ class NonBlockingNonSASL(PlugIn): self.owner = owner owner.Dispatcher.SendAndWaitForResponse( - Iq('get', NS_AUTH, payload=[Node('username', payload=[self.user])]), - func=self._on_username) + Iq('get', NS_AUTH, payload=[Node('username', payload=[self.user])]), + func=self._on_username) def _on_username(self, resp): if not isResultNode(resp): @@ -532,8 +538,8 @@ class NonBlockingNonSASL(PlugIn): if query.getTag('digest'): log.info("Performing digest authentication") query.setTagData('digest', - hashlib.sha1(self.owner.Dispatcher.Stream._document_attrs['id'] - + self.password).hexdigest()) + hashlib.sha1(self.owner.Dispatcher.Stream._document_attrs['id'] + + self.password).hexdigest()) if query.getTag('password'): query.delChild('password') self._method = 'digest' @@ -548,23 +554,25 @@ class NonBlockingNonSASL(PlugIn): def hash_n_times(s, count): return count and hasher(hash_n_times(s, count-1)) or s - hash_ = hash_n_times(hasher(hasher(self.password) + token), int(seq)) + hash_ = hash_n_times(hasher(hasher(self.password) + token), + int(seq)) query.setTagData('hash', hash_) self._method='0k' else: log.warn("Secure methods unsupported, performing plain text \ - authentication") + authentication") query.setTagData('password', self.password) self._method = 'plain' - resp = self.owner.Dispatcher.SendAndWaitForResponse(iq, func=self._on_auth) + resp = self.owner.Dispatcher.SendAndWaitForResponse(iq, + func=self._on_auth) def _on_auth(self, resp): if isResultNode(resp): log.info('Sucessfully authenticated with remote host.') self.owner.User = self.user self.owner.Resource = self.resource - self.owner._registered_name = self.owner.User+'@'+self.owner.Server+\ - '/'+self.owner.Resource + self.owner._registered_name = self.owner.User + '@' + \ + self.owner.Server+ '/' + self.owner.Resource return self.on_auth(self._method) log.info('Authentication failed!') return self.on_auth(None) @@ -579,24 +587,34 @@ class NonBlockingBind(PlugIn): def __init__(self): PlugIn.__init__(self) self.bound = None + self.supports_sm = False + self.resuming = False def plugin(self, owner): ''' Start resource binding, if allowed at this time. Used internally. ''' if self._owner.Dispatcher.Stream.features: try: self.FeaturesHandler(self._owner.Dispatcher, - self._owner.Dispatcher.Stream.features) + self._owner.Dispatcher.Stream.features) except NodeProcessed: pass else: self._owner.RegisterHandler('features', self.FeaturesHandler, - xmlns=NS_STREAMS) + xmlns=NS_STREAMS) def FeaturesHandler(self, conn, feats): """ Determine if server supports resource binding and set some internal - attributes accordingly + attributes accordingly. + + It also checks if server supports stream management """ + + if feats.getTag('sm', namespace=NS_STREAM_MGMT): + self.supports_sm = True # server supports stream management + if self.resuming: + self._owner._caller.sm.resume_request() + if not feats.getTag('bind', namespace=NS_BIND): log.info('Server does not requested binding.') # we try to bind resource anyway @@ -614,12 +632,14 @@ class NonBlockingBind(PlugIn): Remove Bind handler from owner's dispatcher. Used internally """ self._owner.UnregisterHandler('features', self.FeaturesHandler, - xmlns=NS_STREAMS) + xmlns=NS_STREAMS) def NonBlockingBind(self, resource=None, on_bound=None): """ Perform binding. Use provided resource name or random (if not provided). """ + if self.resuming: # We don't bind if we resume the stream + return self.on_bound = on_bound self._resource = resource if self._resource: @@ -629,8 +649,9 @@ class NonBlockingBind(PlugIn): self._owner.onreceive(None) self._owner.Dispatcher.SendAndWaitForResponse( - Protocol('iq', typ='set', payload=[Node('bind', attrs={'xmlns':NS_BIND}, - payload=self._resource)]), func=self._on_bound) + Protocol('iq', typ='set', payload=[Node('bind', + attrs={'xmlns': NS_BIND}, payload=self._resource)]), + func=self._on_bound) def _on_bound(self, resp): if isResultNode(resp): @@ -640,14 +661,22 @@ class NonBlockingBind(PlugIn): jid = JID(resp.getTag('bind').getTagData('jid')) self._owner.User = jid.getNode() self._owner.Resource = jid.getResource() + # Only negociate stream management after bounded + sm = self._owner._caller.sm + if self.supports_sm: + # starts negociation + sm.set_owner(self._owner) + sm.negociate() + self._owner.Dispatcher.sm = sm + if hasattr(self, 'session') and self.session == -1: # Server don't want us to initialize a session log.info('No session required.') self.on_bound('ok') else: self._owner.SendAndWaitForResponse(Protocol('iq', typ='set', - payload=[Node('session', attrs={'xmlns':NS_SESSION})]), - func=self._on_session) + payload=[Node('session', attrs={'xmlns':NS_SESSION})]), + func=self._on_session) return if resp: log.info('Binding failed: %s.' % resp.getTag('error')) diff --git a/src/common/xmpp/client_nb.py b/src/common/xmpp/client_nb.py index dc4c4476b8381e00f28104ca8a5098f01032278d..92b182765611b8ef566e4363d8835b61371bf1d5 100644 --- a/src/common/xmpp/client_nb.py +++ b/src/common/xmpp/client_nb.py @@ -521,7 +521,16 @@ class NonBlockingClient: self.connected = None # FIXME: is this intended? We use ''elsewhere self._on_sasl_auth(None) elif self.SASL.startsasl == 'success': - auth_nb.NonBlockingBind.get_instance().PlugIn(self) + nb_bind = auth_nb.NonBlockingBind.get_instance() + sm = self._caller.sm + if sm._owner and sm.resumption: + nb_bind.resuming = True + sm.set_owner(self) + self.Dispatcher.sm = sm + nb_bind.PlugIn(self) + return + + nb_bind.PlugIn(self) self.onreceive(self._on_auth_bind) return True diff --git a/src/common/xmpp/dispatcher_nb.py b/src/common/xmpp/dispatcher_nb.py index cca56f33ab7ee848e042fe67269471d2d372f0e2..9d54822f33ccfbcbc3f330315b9f7aee9a7f3ae6 100644 --- a/src/common/xmpp/dispatcher_nb.py +++ b/src/common/xmpp/dispatcher_nb.py @@ -90,6 +90,9 @@ class XMPPDispatcher(PlugIn): self.SendAndWaitForResponse, self.SendAndCallForResponse, self.getAnID, self.Event, self.send] + # Let the dispatcher know if there is support for stream management + self.sm = None + def getAnID(self): global outgoingID outgoingID += 1 @@ -417,6 +420,12 @@ class XMPPDispatcher(PlugIn): stanza.props = stanza.getProperties() ID = stanza.getID() + # If server supports stream management + if self.sm and self.sm.enabled and (stanza.getName() != 'r' and + stanza.getName() != 'a' and stanza.getName() != 'enabled' and + stanza.getName() != 'resumed'): + # increments the number of stanzas that has been handled + self.sm.in_h = self.sm.in_h + 1 list_ = ['default'] # we will use all handlers: if typ in self.handlers[xmlns][name]: list_.append(typ) # from very common... @@ -525,6 +534,14 @@ class XMPPDispatcher(PlugIn): ID = stanza.getID() if self._owner._registered_name and not stanza.getAttr('from'): stanza.setAttr('from', self._owner._registered_name) + + # If no ID then it is a whitespace + if self.sm and self.sm.enabled and ID: + self.sm.uqueue.append(stanza) + self.sm.out_h = self.sm.out_h + 1 + if len(self.sm.uqueue) > self.sm.max_queue: + self.sm.request_ack() + self._owner.Connection.send(stanza, now) return ID diff --git a/src/common/xmpp/protocol.py b/src/common/xmpp/protocol.py index 45453310e8f65ac0e4bc059e39dfa80b1f5846e8..58c99c60841c44f4ae2554c810f36c004bc548ed 100644 --- a/src/common/xmpp/protocol.py +++ b/src/common/xmpp/protocol.py @@ -149,6 +149,7 @@ NS_DATA_LAYOUT = 'http://jabber.org/protocol/xdata-layout' # XEP-0141 NS_DATA_VALIDATE = 'http://jabber.org/protocol/xdata-validate' # XEP-0122 NS_XMPP_STREAMS = 'urn:ietf:params:xml:ns:xmpp-streams' NS_RECEIPTS = 'urn:xmpp:receipts' +NS_STREAM_MGMT = 'urn:xmpp:sm:2' # XEP-198 xmpp_stream_error_conditions = ''' bad-format -- -- -- The entity has sent XML that cannot be processed. @@ -984,6 +985,34 @@ class Iq(Protocol): iq.setQueryNS(self.getQueryNS()) return iq +class Acks(Node): + """ + Acknowledgement elements for Stream Management + """ + def __init__(self, nsp=NS_STREAM_MGMT): + Node.__init__(self, None, {}, [], None, None,False, None) + self.setNamespace(nsp) + + def buildAnswer(self, handled): + """ + handled is the number of stanzas handled + """ + self.setName('a') + self.setAttr('h', handled) + + def buildRequest(self): + self.setName('r') + + def buildEnable(self, resume=False): + self.setName('enable') + if resume: + self.setAttr('resume', 'true') + + def buildResume(self, handled, previd): + self.setName('resume') + self.setAttr('h', handled) + self.setAttr('previd', previd) + class ErrorNode(Node): """ XMPP-style error element diff --git a/src/common/xmpp/smacks.py b/src/common/xmpp/smacks.py new file mode 100644 index 0000000000000000000000000000000000000000..c26fb1c0560ca70de378659468a8a909a02f2e2d --- /dev/null +++ b/src/common/xmpp/smacks.py @@ -0,0 +1,129 @@ +from protocol import Acks +from protocol import NS_STREAM_MGMT +import logging +log = logging.getLogger('gajim.c.x.smacks') + +class Smacks(): + ''' + This is Smacks is the Stream Management class. It takes care of requesting + and sending acks. Also, it keeps track of the unhandled outgoing stanzas. + + The dispatcher has to be able to access this class to increment the + number of handled stanzas + ''' + + def __init__(self, con): + self.con = con # Connection object + self.out_h = 0 # Outgoing stanzas handled + self.in_h = 0 # Incoming stanzas handled + self.uqueue = [] # Unhandled stanzas queue + self.session_id = None + self.resumption = False # If server supports resume + # Max number of stanzas in queue before making a request + self.max_queue = 5 + self._owner = None + self.resuming = False + self.enabled = False # If SM is enabled + self.location = None + + def set_owner(self, owner): + self._owner = owner + + # Register handlers + owner.Dispatcher.RegisterNamespace(NS_STREAM_MGMT) + owner.Dispatcher.RegisterHandler('enabled', self._neg_response, + xmlns=NS_STREAM_MGMT) + owner.Dispatcher.RegisterHandler('r', self.send_ack, + xmlns=NS_STREAM_MGMT) + owner.Dispatcher.RegisterHandler('a', self.check_ack, + xmlns=NS_STREAM_MGMT) + owner.Dispatcher.RegisterHandler('resumed', self.check_ack, + xmlns=NS_STREAM_MGMT) + owner.Dispatcher.RegisterHandler('failed', self.error_handling, + xmlns=NS_STREAM_MGMT) + + def _neg_response(self, disp, stanza): + r = stanza.getAttr('resume') + if r == 'true' or r == 'True' or r == '1': + self.resumption = True + self.session_id = stanza.getAttr('id') + + if r == 'false' or r == 'False' or r == '0': + self.negociate(False) + + l = stanza.getAttr('location') + if l: + self.location = l + + def negociate(self, resume=True): + # Every time we attempt to negociate, we must erase all previous info + # about any previous session + self.uqueue = [] + self.in_h = 0 + self.out_h = 0 + self.session_id = None + self.enabled = True + + stanza = Acks() + stanza.buildEnable(resume) + self._owner.Connection.send(stanza, now=True) + + def resume_request(self): + if not self.session_id: + self.resuming = False + log.error('Attempted to resume without a valid session id ') + return + resume = Acks() + resume.buildResume(self.in_h, self.session_id) + self._owner.Connection.send(resume, False) + + def send_ack(self, disp, stanza): + ack = Acks() + ack.buildAnswer(self.in_h) + self._owner.Connection.send(ack, False) + + def request_ack(self): + r = Acks() + r.buildRequest() + self._owner.Connection.send(r, False) + + def check_ack(self, disp, stanza): + ''' + Checks if the number of stanzas sent are the same as the + number of stanzas received by the server. Pops stanzas that were + handled by the server from the queue. + ''' + h = int(stanza.getAttr('h')) + diff = self.out_h - h + + if len(self.uqueue) < diff or diff < 0: + log.error('Server and client number of stanzas handled mismatch ') + else: + while (len(self.uqueue) > diff): + self.uqueue.pop(0) + + if stanza.getName() == 'resumed': + self.resuming = True + self.con.set_oldst() + if self.uqueue != []: + for i in self.uqueue: + self._owner.Connection.send(i, False) + + def error_handling(self, disp, stanza): + # If the server doesn't recognize previd, forget about resuming + # Ask for service discovery, etc.. + if stanza.getTag('item-not-found'): + self.resuming = False + self.negociate() + self.con._discover_server_at_connection(self.con.connection) + return + + # Doesn't support resumption + if stanza.getTag('feature-not-implemented'): + self.negociate(False) + return + + if stanza.getTag('unexpected-request'): + self.enabled = False + log.error('Gajim failed to negociate Stream Management') + return diff --git a/test/unit/test_xmpp_smacks.py b/test/unit/test_xmpp_smacks.py new file mode 100644 index 0000000000000000000000000000000000000000..df6ab412268ba08ab407448f2f18958326c62291 --- /dev/null +++ b/test/unit/test_xmpp_smacks.py @@ -0,0 +1,133 @@ +''' +Tests for smacks.py Stream Management +''' +import unittest + +import lib +lib.setup_env() + +from mock import Mock + +from common.xmpp import dispatcher_nb +from common.xmpp import protocol +from common.xmpp import smacks + +class TestDispatcherNB(unittest.TestCase): + ''' + Test class for NonBlocking dispatcher. Tested dispatcher will be plugged + into a mock client + ''' + def setUp(self): + self.dispatcher = dispatcher_nb.XMPPDispatcher() + + # Setup mock client + self.client = Mock() + self.client.__str__ = lambda: 'Mock' # FIXME: why do I need this one? + self.client._caller = Mock() + self.client.defaultNamespace = protocol.NS_CLIENT + self.client.Connection = Mock() # mock transport + self.con = self.client.Connection + self.con.sm = smacks.Smacks(self.con) + + def tearDown(self): + # Unplug if needed + if hasattr(self.dispatcher, '_owner'): + self.dispatcher.PlugOut() + + def _simulate_connect(self): + self.dispatcher.PlugIn(self.client) # client is owner + self.con.sm.set_owner(self.client) + self.dispatcher.sm = self.con.sm + # Simulate that we have established a connection + self.dispatcher.StreamInit() + self.dispatcher.ProcessNonBlocking("<stream:stream " + "xmlns:stream='http://etherx.jabber.org/streams' " + "xmlns='jabber:client'>") + self.dispatcher.ProcessNonBlocking("<stream:features> " + "<sm xmlns='urn:xmpp:sm:2'> <optional/> </sm> </stream:features>") + self.con.sm.negociate() + self.dispatcher.ProcessNonBlocking("<enabled xmlns='urn:xmpp:sm:2' " + "id='some-long-sm-id' resume='true'/>") + assert(self.con.sm.enabled) + + def _simulate_resume(self): + self.con.sm.resume_request() + # Resuming acknowledging 5 stanzas + self.dispatcher.ProcessNonBlocking("<resumed xmlns='urn:xmpp:sm:2' " + "id='some-long-sm-id' h='5'/>") + assert(self.con.sm.resuming) + + def _send(self, send, r, stanza): + for i in range(r): + send(stanza) + def test_messages(self): + message = '<message><body>Helloo </body></message>' + iq = '''<iq from='proxy.jabber.ru' to='j.xxxxxxxx.org/Gajim' type='error' id='18'> + <query xmlns='http://jabber.org/protocol/bytestreams'/> + <error code='403' type='auth'> + <forbidden xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/> + </error> + </iq>''' + presence = '''<presence from='xxxxxxxxx.com/Talk.v1044194B1E2' to='j.xxxxxxxx.org'> + <priority>24</priority> + <c node="http://www.google.com/xmpp/client/caps" ver="1.0.0.104" ext="share-v1 voice-v1" xmlns="http://jabber.org/protocol/caps"/> + <x stamp="20110614T23:17:51" xmlns="jabber:x:delay"/> + <status>In love Kakashi Sensei :P</status> + <x xmlns="vcard-temp:x:update"> + <photo>db4b7c52e39ba28562c74542d5988d47f09108a3</photo> + </x> + </presence> ''' + + self._simulate_connect() + uqueue = self.con.sm.uqueue + self.assertEqual(self.con.sm.out_h, 0) + self.assertEqual(self.con.sm.in_h, 0) + + # The server sends 10 stanzas + self._send(self.dispatcher.ProcessNonBlocking, 5, message) + self._send(self.dispatcher.ProcessNonBlocking, 4, iq) + self._send(self.dispatcher.ProcessNonBlocking, 1, presence) + + # The client has recieved 10 stanzas and sent none + self.assertEqual(self.con.sm.in_h, 10) + self.assertEqual(self.con.sm.out_h, 0) + + m = protocol.Message() + + # The client sends 10 stanzas + for i in range(10): + m = protocol.Message(body=str(i)) + self.dispatcher.send(m) + + # Client sends 10 stanzas and put them in the queue + self.assertEqual(self.con.sm.out_h, 10) + self.assertEqual(len(uqueue), 10) + + # The server acknowledges that it recieved 5 stanzas + self.dispatcher.ProcessNonBlocking("<a xmlns='urn:xmpp:sm:2' h='5'/>") + # 5 stanzas are removed from the queue, only 5 stanzas are left + + self.assertEqual(len(uqueue), 5) + + # Check for the right order of stanzas in the queue + l = ['5', '6', '7', '8', '9'] + for i in uqueue: + self.assertEqual(i.getBody(), l[0]) + l.pop(0) + + def test_resumption(self): + self._simulate_connect() + + m = protocol.Message() + + # The client sends 5 stanzas + for i in range(5): + m = protocol.Message(body=str(i)) + self.dispatcher.send(m) + + self._simulate_resume() + # No stanzas left + self.assertEqual(len(self.con.sm.uqueue), 0) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file