Commit f3469e7d authored by Philipp Hörist's avatar Philipp Hörist

Refactor IQ callbacks

- Remove SendAndWaitForResponse(), there is no need for a timeout that
ends the stream. We have other means of checking if the connection to
the server was lost.

- Don't wrap ProcessNonBlocking(), a server can send multiple stanzas
in one packet, wraping this methods means we process callbacks after
processing all stanzas in the packet. Which can lead to processing stanzas
not in arrival order. Instead process callbacks inside dispatch().
parent 7e700683
......@@ -87,7 +87,7 @@ class NonBlockingBind(PlugIn):
payload = Node('bind', attrs={'xmlns': NS_BIND}, payload=resource)
node = Protocol('iq', typ='set', payload=[payload])
self._owner.Dispatcher.SendAndWaitForResponse(node, func=self._on_bind)
self._owner.Dispatcher.SendAndCallForResponse(node, func=self._on_bind)
def _on_bind(self, stanza):
if isResultNode(stanza):
......@@ -104,7 +104,7 @@ class NonBlockingBind(PlugIn):
else:
node = Node('session', attrs={'xmlns':NS_SESSION})
iq = Protocol('iq', typ='set', payload=[node])
self._owner.SendAndWaitForResponse(
self._owner.SendAndCallForResponse(
iq, func=self._on_session)
return
if stanza:
......
......@@ -145,7 +145,7 @@ class XMPPDispatcher(PlugIn):
self.RegisterEventHandler, self.UnregisterCycleHandler,
self.RegisterCycleHandler, self.RegisterHandlerOnce,
self.UnregisterHandler, self.RegisterProtocol,
self.SendAndWaitForResponse, self.SendAndCallForResponse,
self.SendAndCallForResponse,
self.getAnID, self.Event, self.send, self.get_module]
# \ufddo -> \ufdef range
......@@ -245,7 +245,6 @@ class XMPPDispatcher(PlugIn):
self.RegisterDefaultHandler(self.returnStanzaHandler)
self.RegisterEventHandler(self._owner._caller._event_dispatcher)
self._register_modules()
self.on_responses = {}
def plugin(self, owner):
"""
......@@ -608,21 +607,18 @@ class XMPPDispatcher(PlugIn):
_id = stanza.getID()
processed = False
if _id in session._expected:
if isinstance(session._expected[_id], tuple):
cb, args = session._expected[_id]
log.debug('Expected stanza arrived. Callback %s(%s) found',
cb, args)
try:
cb(session, stanza, **args)
except NodeProcessed:
pass
except Exception:
raise
else:
log.debug('Expected stanza arrived')
session._expected[_id] = stanza
processed = True
if _id in self._expected:
cb, args = self._expected[_id]
log.debug('Expected stanza arrived. Callback %s(%s) found',
cb, args)
try:
if args is None:
cb(stanza)
else:
cb(self, stanza, **args)
except NodeProcessed:
pass
return
# Gather specifics depending on stanza properties
specifics = ['default']
......@@ -664,60 +660,14 @@ class XMPPDispatcher(PlugIn):
if not processed and self._defaultHandler:
self._defaultHandler(session, stanza)
def _WaitForData(self, data):
"""
Internal wrapper around ProcessNonBlocking. Will check for
"""
if data is None:
return
res = self.ProcessNonBlocking(data)
# 0 result indicates that we have closed the connection, e.g.
# we have released dispatcher, so self._owner has no methods
if not res:
return
for (_id, _iq) in list(self._expected.items()):
if _iq is None:
# If the expected Stanza would have arrived, ProcessNonBlocking
# would have placed the reply stanza in there
continue
if _id in self.on_responses:
if len(self._expected) == 1:
if hasattr(self._owner, 'onreceive'):
# With BOSH we get a terminating body with multiple stanzas
# in it, we unplug BOSH before we parse the stanzas
self._owner.onreceive(None)
resp, args = self.on_responses[_id]
del self.on_responses[_id]
if args is None:
resp(_iq)
else:
resp(self._owner, _iq, **args)
del self._expected[_id]
def SendAndWaitForResponse(self, stanza, timeout=None, func=None, args=None):
"""
Send stanza and wait for recipient's response to it. Will call transports
on_timeout callback if response is not retrieved in time
Be aware: Only timeout of latest call of SendAndWait is active.
"""
if timeout is None:
timeout = DEFAULT_TIMEOUT_SECONDS
_waitid = self.send(stanza)
if func:
self.on_responses[_waitid] = (func, args)
if timeout:
self._owner.set_timeout(timeout)
self._owner.onreceive(self._WaitForData)
self._expected[_waitid] = None
return _waitid
def SendAndCallForResponse(self, stanza, func=None, args=None):
"""
Put stanza on the wire and call back when recipient replies. Additional
callback arguments can be specified in args
"""
self.SendAndWaitForResponse(stanza, 0, func, args)
_waitid = self.send(stanza)
self._expected[_waitid] = (func, args)
return _waitid
def send(self, stanza, now=False):
"""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment