Commit 5876bdf3 authored by Thilo Molitor's avatar Thilo Molitor

Implemented smacks 3 (xep revision 1.5), the location attribute is not implemented yet

parent 95254655
......@@ -734,9 +734,9 @@ class NonBlockingBind(PlugIn):
self._owner.User = jid.getNode()
self._owner.Resource = jid.getResource()
# Only negociate stream management after bounded
sm = self._owner._caller.sm
if self.supports_sm:
# starts negociation
sm = self._owner._caller.sm
sm.supports_sm = True
sm.set_owner(self._owner)
sm.negociate()
......@@ -745,6 +745,7 @@ class NonBlockingBind(PlugIn):
if hasattr(self, 'session') and self.session == -1:
# Server don't want us to initialize a session
log.info('No session required.')
self._owner._caller.sm.resend_queue() #resend old messages still in the smacks queue
self.on_bound('ok')
else:
self._owner.SendAndWaitForResponse(Protocol('iq', typ='set',
......@@ -763,6 +764,7 @@ class NonBlockingBind(PlugIn):
if isResultNode(resp):
log.info('Successfully opened session.')
self.session = 1
self._owner._caller.sm.resend_queue() #resend old messages still in the smacks queue
self.on_bound('ok')
else:
log.error('Session open failed.')
......
......@@ -387,7 +387,7 @@ class XMPPDispatcher(PlugIn):
def UnregisterCycleHandler(self, handler):
"""
Unregister handler that will is called on every Dispatcher.Process() call
Unregister handler that will be called on every Dispatcher.Process() call
"""
if handler in self._cycleHandlers:
self._cycleHandlers.remove(handler)
......
......@@ -164,7 +164,7 @@ NS_RECEIPTS = 'urn:xmpp:receipts'
NS_PUBKEY_PUBKEY = 'urn:xmpp:pubkey:2' # XEP-0189
NS_PUBKEY_REVOKE = 'urn:xmpp:revoke:2'
NS_PUBKEY_ATTEST = 'urn:xmpp:attest:2'
NS_STREAM_MGMT = 'urn:xmpp:sm:2' # XEP-198
NS_STREAM_MGMT = 'urn:xmpp:sm:3' # XEP-198
NS_HASHES = 'urn:xmpp:hashes:1' # XEP-300
NS_HASHES_MD5 = 'urn:xmpp:hash-function-textual-names:md5'
NS_HASHES_SHA1 = 'urn:xmpp:hash-function-textual-names:sha-1'
......
......@@ -22,7 +22,8 @@ class Smacks(object):
self.session_id = None
self.resumption = False # If server supports resume
# Max number of stanzas in queue before making a request
self.max_queue = 5
#be more agressive here (every message must be acked), this creates fewer message duplicates on failed resume)
self.max_queue = 0
self._owner = None
self.resuming = False
self.enabled = False # If SM is enabled
......@@ -63,7 +64,6 @@ class Smacks(object):
# Every time we attempt to negociate, we must erase all previous info
# about any previous session
self.uqueue = []
self.old_uqueue = []
self.in_h = 0
self.out_h = 0
self.session_id = None
......@@ -73,12 +73,26 @@ class Smacks(object):
stanza.buildEnable(resume)
self._owner.Connection.send(stanza, now=True)
def resend_queue(self):
"""
Resends unsent stanzas when a new session is established.
This way there won't be any lost outgoing messages even on failed smacks resumes (but message duplicates are possible).
If your server supports revision 1.5 of smacks then even message duplicates are eliminated here :)
"""
if self.old_uqueue:
log.info('Session resumption failed, replaying %s stanzas anyways...' % len(self.old_uqueue))
for i in self.old_uqueue:
self._owner.Dispatcher.send(i, False) #use this send so that our stanzas actually increment out_h
self.old_uqueue = []
def resume_request(self):
if not self.session_id:
self.resuming = False
log.error('Attempted to resume without a valid session id ')
return
self.old_uqueue = self.uqueue #save old messages in an extra "queue" to avoid race conditions
#save old messages in an extra "queue" to avoid race conditions and to make it possible to replay stanzas even when resuming fails
#add messages here (instead of overwriting) so that repeated connection errors don't delete unacked stanzas (uqueue should be empty in this case anyways)
self.old_uqueue += self.uqueue
self.uqueue = []
resume = Acks()
resume.buildResume(self.in_h, self.session_id)
......@@ -160,6 +174,23 @@ class Smacks(object):
self._owner.NonBlockingBind.resuming = False
self._owner._on_auth_bind(None)
self.failed_resume = True
h = stanza.getTag('item-not-found').getAttr('h')
if not h:
return
#prepare old_queue to contain only unacked stanzas for later resend (which is happening after our session is established properly)
h = int(h)
diff = self.out_h - h
if diff < 0:
log.error('Server and client number of stanzas handled mismatch on session resumption (our h: %d, server h: %d)' % (self.out_h, h))
self.old_uqueue = [] #that's weird, but we don't resend this stanzas if the server says we don't need to
elif len(self.old_uqueue) < diff:
log.error('Server and client number of stanzas handled mismatch on session resumption (our h: %d, server h: %d)' % (self.out_h, h))
else:
log.info('Removing %d already received stanzas from old outgoing queue (our h: %d, server h: %d, remaining in queue: %d)' % (len(self.old_uqueue) - diff, self.out_h, h, diff))
while (len(self.old_uqueue) > diff):
self.old_uqueue.pop(0)
return
# Doesn't support resumption
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment