Commit 5c2c0b22 authored by Yann Leboulanger's avatar Yann Leboulanger

Merge branch 'smacks_rev_1.5' into 'master'

Implemented smacks 3 (xep revision 1.5)

See merge request !1
parents ace6e018 bb4cb499
......@@ -734,9 +734,9 @@ class NonBlockingBind(PlugIn):
self._owner.User = jid.getNode()
self._owner.Resource = jid.getResource()
# Only negociate stream management after bounded
sm = self._owner._caller.sm
if self.supports_sm:
# starts negociation
sm = self._owner._caller.sm
sm.supports_sm = True
sm.set_owner(self._owner)
sm.negociate()
......@@ -745,6 +745,7 @@ class NonBlockingBind(PlugIn):
if hasattr(self, 'session') and self.session == -1:
# Server don't want us to initialize a session
log.info('No session required.')
self._owner._caller.sm.resend_queue() #resend old messages still in the smacks queue
self.on_bound('ok')
else:
self._owner.SendAndWaitForResponse(Protocol('iq', typ='set',
......@@ -763,6 +764,7 @@ class NonBlockingBind(PlugIn):
if isResultNode(resp):
log.info('Successfully opened session.')
self.session = 1
self._owner._caller.sm.resend_queue() #resend old messages still in the smacks queue
self.on_bound('ok')
else:
log.error('Session open failed.')
......
......@@ -23,12 +23,13 @@ different handlers to different XMPP stanzas and namespaces
from __future__ import unicode_literals
from . import simplexml
import sys
import time
import locale
import re
import uuid
from xml.parsers.expat import ExpatError
from .plugin import PlugIn
from .protocol import (NS_STREAMS, NS_XMPP_STREAMS, NS_HTTP_BIND, Iq, Presence,
from .protocol import (NS_DELAY2, NS_STREAMS, NS_XMPP_STREAMS, NS_HTTP_BIND, Iq, Presence,
Message, Protocol, Node, Error, ERR_FEATURE_NOT_IMPLEMENTED, StreamError)
import logging
......@@ -387,7 +388,7 @@ class XMPPDispatcher(PlugIn):
def UnregisterCycleHandler(self, handler):
"""
Unregister handler that will is called on every Dispatcher.Process() call
Unregister handler that will be called on every Dispatcher.Process() call
"""
if handler in self._cycleHandlers:
self._cycleHandlers.remove(handler)
......@@ -571,12 +572,21 @@ class XMPPDispatcher(PlugIn):
# If no ID then it is a whitespace
if self.sm and self.sm.enabled and ID:
# add timestamp to message stanza in queue
if stanza.getName() == 'message' and \
(stanza.getType() == 'chat' or stanza.getType() == 'groupchat'):
our_jid = stanza.getAttr('from')
timestamp = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(None))
stanza.addChild('delay', namespace=NS_DELAY2,
attrs={'from': our_jid, 'stamp': timestamp})
self.sm.uqueue.append(stanza)
self.sm.out_h += 1
if len(self.sm.uqueue) > self.sm.max_queue:
self.sm.request_ack()
self._owner.Connection.send(stanza, now)
if self.sm and self.sm.enabled and ID and len(self.sm.uqueue) > self.sm.max_queue:
self.sm.request_ack()
return ID
......
......@@ -169,7 +169,7 @@ NS_RECEIPTS = 'urn:xmpp:receipts'
NS_PUBKEY_PUBKEY = 'urn:xmpp:pubkey:2' # XEP-0189
NS_PUBKEY_REVOKE = 'urn:xmpp:revoke:2'
NS_PUBKEY_ATTEST = 'urn:xmpp:attest:2'
NS_STREAM_MGMT = 'urn:xmpp:sm:2' # XEP-198
NS_STREAM_MGMT = 'urn:xmpp:sm:3' # XEP-198
NS_HASHES = 'urn:xmpp:hashes:1' # XEP-300
NS_HASHES_MD5 = 'urn:xmpp:hash-function-textual-names:md5'
NS_HASHES_SHA1 = 'urn:xmpp:hash-function-textual-names:sha-1'
......
......@@ -22,7 +22,8 @@ class Smacks(object):
self.session_id = None
self.resumption = False # If server supports resume
# Max number of stanzas in queue before making a request
self.max_queue = 5
#be more agressive here (every message must be acked), this creates fewer message duplicates on failed resume)
self.max_queue = 0
self._owner = None
self.resuming = False
self.enabled = False # If SM is enabled
......@@ -47,6 +48,7 @@ class Smacks(object):
def _neg_response(self, disp, stanza):
r = stanza.getAttr('resume')
log.info("Session resumption: %s" % r)
if r == 'true' or r == 'True' or r == '1':
self.resumption = True
self.session_id = stanza.getAttr('id')
......@@ -62,8 +64,8 @@ class Smacks(object):
def negociate(self, resume=True):
# Every time we attempt to negociate, we must erase all previous info
# about any previous session
log.debug("Clearing smacks uqueue")
self.uqueue = []
self.old_uqueue = []
self.in_h = 0
self.out_h = 0
self.session_id = None
......@@ -73,12 +75,26 @@ class Smacks(object):
stanza.buildEnable(resume)
self._owner.Connection.send(stanza, now=True)
def resend_queue(self):
"""
Resends unsent stanzas when a new session is established.
This way there won't be any lost outgoing messages even on failed smacks resumes (but message duplicates are possible).
If your server supports revision 1.5 of smacks then even message duplicates are eliminated here :)
"""
if self.old_uqueue:
log.info('Session resumption failed, replaying %s stanzas anyways...' % len(self.old_uqueue))
for i in self.old_uqueue:
self._owner.Dispatcher.send(i, False) #use this send so that our stanzas actually increment out_h
self.old_uqueue = []
def resume_request(self):
if not self.session_id:
self.resuming = False
log.error('Attempted to resume without a valid session id ')
return
self.old_uqueue = self.uqueue #save old messages in an extra "queue" to avoid race conditions
#save old messages in an extra "queue" to avoid race conditions and to make it possible to replay stanzas even when resuming fails
#add messages here (instead of overwriting) so that repeated connection errors don't delete unacked stanzas (uqueue should be empty in this case anyways)
self.old_uqueue += self.uqueue
self.uqueue = []
resume = Acks()
resume.buildResume(self.in_h, self.session_id)
......@@ -124,6 +140,7 @@ class Smacks(object):
number of stanzas received by the server. Resends stanzas not received
by the server in the last session.
"""
log.info("Session resumption succeeded")
h = stanza.getAttr('h')
if not h:
log.error('Server did not send h attribute')
......@@ -137,7 +154,7 @@ class Smacks(object):
elif len(self.old_uqueue) < diff:
log.error('Server and client number of stanzas handled mismatch on session resumption (our h: %d, server h: %d)' % (self.out_h, h))
else:
log.info('Removing %d already received stanzas from old outgoing queue (our h: %d, server h: %d, remaining in queue: %d)' % (len(self.old_uqueue) - diff, self.out_h, h, diff))
log.info('Removing %d already acked stanzas from old outgoing queue (our h: %d, server h: %d, remaining in queue: %d)' % (len(self.old_uqueue) - diff, self.out_h, h, diff))
while (len(self.old_uqueue) > diff):
self.old_uqueue.pop(0)
......@@ -160,14 +177,33 @@ class Smacks(object):
self._owner.NonBlockingBind.resuming = False
self._owner._on_auth_bind(None)
self.failed_resume = True
h = stanza.getTag('item-not-found').getAttr('h')
log.info('Session resumption failed (item-not-found), server h: %s' % str(h))
if not h:
return
#prepare old_queue to contain only unacked stanzas for later resend (which is happening after our session is established properly)
h = int(h)
diff = self.out_h - h
if diff < 0:
log.error('Server and client number of stanzas handled mismatch on session resumption (our h: %d, server h: %d)' % (self.out_h, h))
self.old_uqueue = [] #that's weird, but we don't resend this stanzas if the server says we don't need to
elif len(self.old_uqueue) < diff:
log.error('Server and client number of stanzas handled mismatch on session resumption (our h: %d, server h: %d)' % (self.out_h, h))
else:
log.info('Removing %d already acked stanzas from old outgoing queue (our h: %d, server h: %d, remaining in queue: %d)' % (len(self.old_uqueue) - diff, self.out_h, h, diff))
while (len(self.old_uqueue) > diff):
self.old_uqueue.pop(0)
return
# Doesn't support resumption
if stanza.getTag('feature-not-implemented'):
log.info('Session resumption failed (feature-not-implemented)')
self.negociate(False)
return
if stanza.getTag('unexpected-request'):
self.enabled = False
log.error('Gajim failed to negociate Stream Management')
self.enabled = False
return
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment