Commit f9405ab8 authored by Philipp Hörist's avatar Philipp Hörist

Task: Add timeout

parent 6dde8442
......@@ -82,7 +82,6 @@ from nbxmpp.modules.vcard4 import VCard4
from nbxmpp.modules.ping import Ping
from nbxmpp.modules.misc import unwrap_carbon
from nbxmpp.modules.misc import unwrap_mam
from nbxmpp.structs import StanzaTimeoutError
from nbxmpp.util import get_properties_struct
from nbxmpp.util import get_invalid_xml_regex
from nbxmpp.util import is_websocket_close
......@@ -494,7 +493,7 @@ class StanzaDispatcher(Observable):
if timeout < time.monotonic():
self._id_callbacks.pop(id_)
func(self._client, StanzaTimeoutError(id_), **user_data)
func(self._client, None, **user_data)
return True
def _remove_timeout_source(self):
......
......@@ -137,6 +137,15 @@ class CancelledError(BaseError):
self.text = 'Task has been cancelled'
class TimeoutStanzaError(BaseError):
log_level = logging.INFO
def __init__(self, id_):
BaseError.__init__(self, is_fatal=True)
self.text = 'IQ with id %s reached timeout' % id_
class RegisterStanzaError(StanzaError):
def __init__(self, stanza, data):
StanzaError.__init__(self, stanza)
......
......@@ -559,22 +559,6 @@ class StanzaMalformedError(CommonError):
raise NotImplementedError
class StanzaTimeoutError(CommonError):
def __init__(self, id_):
self.condition = 'timeout'
self.id = id_
@classmethod
def from_string(cls, node_string):
raise NotImplementedError
def __str__(self):
return 'IQ with id %s reached timeout' % self.id
def serialize(self):
raise NotImplementedError
class StreamError(CommonError):
def __init__(self, stanza):
self.condition = stanza.getError()
......
......@@ -25,6 +25,7 @@ from gi.repository import Soup
from nbxmpp.errors import is_error
from nbxmpp.errors import CancelledError
from nbxmpp.errors import TimeoutStanzaError
from nbxmpp.simplexml import Node
......@@ -76,10 +77,11 @@ def _setup_task(task, client, callback, user_data):
def iq_request_task(func):
@wraps(func)
def func_wrapper(self, *args, callback=None, user_data=None, **kwargs):
def func_wrapper(self, *args, timeout=None, callback=None, user_data=None, **kwargs):
task = IqRequestTask(func(self, *args, **kwargs),
self._log,
self._client)
task.set_timeout(timeout)
return _setup_task(task, self._client, callback, user_data)
return func_wrapper
......@@ -125,6 +127,7 @@ class Task:
self._result = None
self._error = None
self._user_data = None
self._timeout = None
self._finalize_func = None
self._finalize_context = None
self._state = TaskState.INIT
......@@ -147,6 +150,9 @@ class Task:
self._done_callbacks.append(callback)
def set_timeout(self, timeout):
self._timeout = timeout
def start(self):
if not self._state.is_init:
raise RuntimeError('Task already started')
......@@ -291,15 +297,27 @@ class IqRequestTask(Task):
def __init__(self, gen, logger, client):
super().__init__(gen, logger)
self._client = client
self._iq_id = None
def _run_async(self, stanza):
self._client.send_stanza(stanza, callback=self._async_finished)
self._iq_id = self._client.send_stanza(stanza,
callback=self._async_finished,
timeout=self._timeout)
def _async_finished(self, _client, result, *args, **kwargs):
if self._state != TaskState.CANCELLED:
self._next_step(result)
if self._state == TaskState.CANCELLED:
return
if result is None:
self._error = TimeoutStanzaError(self._iq_id)
self._set_finished()
return
self._next_step(result)
def _finalize(self):
if self._iq_id is not None:
self._client._dispatcher.remove_iq_callback(self._iq_id)
self._client = None
super()._finalize()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment