Commit bd7195cb authored by Philipp Hörist's avatar Philipp Hörist

Muclumbus: Use tasks

parent 3c507751
......@@ -22,15 +22,16 @@ from gi.repository import Soup
from nbxmpp.namespaces import Namespace
from nbxmpp.protocol import Node
from nbxmpp.protocol import Iq
from nbxmpp.protocol import isResultNode
from nbxmpp.structs import MuclumbusItem
from nbxmpp.structs import MuclumbusResult
from nbxmpp.const import AnonymityMode
from nbxmpp.modules.dataforms import extend_form
from nbxmpp.util import call_on_response
from nbxmpp.util import callback
from nbxmpp.util import raise_error
from nbxmpp.errors import StanzaError
from nbxmpp.errors import MalformedStanzaError
from nbxmpp.task import iq_request_task
from nbxmpp.task import http_request_task
from nbxmpp.modules.base import BaseModule
from nbxmpp.modules.util import finalize
# API Documentation
......@@ -52,84 +53,58 @@ class Muclumbus(BaseModule):
self._proxy_resolver = proxy.get_resolver()
self._soup_session.props.proxy_resolver = self._proxy_resolver
@call_on_response('_parameters_received')
@iq_request_task
def request_parameters(self, jid):
query = Iq(to=jid, typ='get')
query.addChild(node=Node('search',
attrs={'xmlns': Namespace.MUCLUMBUS}))
return query
task = yield
@callback
def _parameters_received(self, stanza):
if not isResultNode(stanza):
return raise_error(self._log.info, stanza)
response = yield _make_parameter_request(jid)
if response.isError():
raise StanzaError(response)
search = stanza.getTag('search', namespace=Namespace.MUCLUMBUS)
search = response.getTag('search', namespace=Namespace.MUCLUMBUS)
if search is None:
return raise_error(self._log.warning, stanza, 'stanza-malformed')
raise MalformedStanzaError('search node missing', response)
dataform = search.getTag('x', namespace=Namespace.DATA)
if dataform is None:
return raise_error(self._log.warning, stanza, 'stanza-malformed')
raise MalformedStanzaError('dataform node missing', response)
self._log.info('Muclumbus parameters received')
return extend_form(node=dataform)
yield finalize(task, extend_form(node=dataform))
@call_on_response('_search_received')
@iq_request_task
def set_search(self, jid, dataform, items_per_page=50, after=None):
search = Node('search', attrs={'xmlns': Namespace.MUCLUMBUS})
search.addChild(node=dataform)
rsm = search.addChild('set', namespace=Namespace.RSM)
rsm.addChild('max').setData(items_per_page)
if after is not None:
rsm.addChild('after').setData(after)
query = Iq(to=jid, typ='get')
query.addChild(node=search)
return query
def set_http_search(self, uri, keywords, after=None,
callback=None, user_data=None):
search = {'keywords': keywords}
if after is not None:
search['after'] = after
message = Soup.Message.new('POST', uri)
message.set_request('application/json',
Soup.MemoryUse.COPY,
json.dumps(search).encode())
_task = yield
self._soup_session.queue_message(message,
self._http_search_received,
callback,
user_data)
response = yield _make_search_query(jid,
dataform,
items_per_page,
after)
if response.isError():
raise StanzaError(response)
@callback
def _search_received(self, stanza):
if not isResultNode(stanza):
return raise_error(self._log.info, stanza)
result = stanza.getTag('result', namespace=Namespace.MUCLUMBUS)
result = response.getTag('result', namespace=Namespace.MUCLUMBUS)
if result is None:
return raise_error(self._log.warning, stanza, 'stanza-malformed')
raise MalformedStanzaError('result node missing', response)
items = result.getTags('item')
if not items:
return MuclumbusResult(first=None,
last=None,
max=None,
end=True,
items=[])
yield MuclumbusResult(first=None,
last=None,
max=None,
end=True,
items=[])
set_ = result.getTag('set', namespace=Namespace.RSM)
if set_ is None:
return raise_error(self._log.warning, stanza, 'stanza-malformed')
raise MalformedStanzaError('set node missing', response)
first = set_.getTagData('first')
last = set_.getTagData('last')
try:
max_ = int(set_.getTagData('max'))
except Exception:
return raise_error(self._log.warning, stanza, 'stanza-malformed')
raise MalformedStanzaError('invalid max value', response)
results = []
for item in items:
......@@ -152,40 +127,47 @@ class Muclumbus(BaseModule):
language=language or '',
is_open=is_open,
anonymity_mode=anonymity_mode))
return MuclumbusResult(first=first,
last=last,
max=max_,
end=len(items) < max_,
items=results)
yield MuclumbusResult(first=first,
last=last,
max=max_,
end=len(items) < max_,
items=results)
@http_request_task
def set_http_search(self, uri, keywords, after=None):
_task = yield
search = {'keywords': keywords}
if after is not None:
search['after'] = after
def _http_search_received(self, _session, message, callback, user_data):
message = Soup.Message.new('POST', uri)
message.set_request('application/json',
Soup.MemoryUse.COPY,
json.dumps(search).encode())
def exec_callback(muclumbus_result):
if user_data is None:
callback(muclumbus_result)
else:
callback(muclumbus_result, user_data)
response_message = yield message
soup_body = message.get_property('response-body')
soup_body = response_message.get_property('response-body')
if message.status_code != 200:
if response_message.status_code != 200:
self._log.warning(soup_body.data)
exec_callback(MuclumbusResult(first=None,
last=None,
max=None,
end=True,
items=[]))
yield MuclumbusResult(first=None,
last=None,
max=None,
end=True,
items=[])
response = json.loads(soup_body.data)
result = response['result']
items = result.get('items')
if items is None:
exec_callback(MuclumbusResult(first=None,
last=None,
max=None,
end=True,
items=[]))
yield MuclumbusResult(first=None,
last=None,
max=None,
end=True,
items=[])
results = []
for item in items:
......@@ -203,8 +185,27 @@ class Muclumbus(BaseModule):
is_open=item['is_open'],
anonymity_mode=anonymity_mode))
exec_callback(MuclumbusResult(first=None,
last=result['last'],
max=None,
end=not result['more'],
items=results))
yield MuclumbusResult(first=None,
last=result['last'],
max=None,
end=not result['more'],
items=results)
def _make_parameter_request(jid):
query = Iq(to=jid, typ='get')
query.addChild(node=Node('search',
attrs={'xmlns': Namespace.MUCLUMBUS}))
return query
def _make_search_query(jid, dataform, items_per_page=50, after=None):
search = Node('search', attrs={'xmlns': Namespace.MUCLUMBUS})
search.addChild(node=dataform)
rsm = search.addChild('set', namespace=Namespace.RSM)
rsm.addChild('max').setData(items_per_page)
if after is not None:
rsm.addChild('after').setData(after)
query = Iq(to=jid, typ='get')
query.addChild(node=search)
return query
......@@ -21,6 +21,8 @@ import logging
from enum import IntEnum
from functools import wraps
from gi.repository import Soup
from nbxmpp.errors import is_error
from nbxmpp.errors import CancelledError
from nbxmpp.simplexml import Node
......@@ -62,20 +64,33 @@ class TaskState(IntEnum):
return self == TaskState.CANCELLED
def _setup_task(task, client, callback, user_data):
client.add_task(task)
task.set_finalize_func(client.remove_task)
task.set_user_data(user_data)
if callback is not None:
task.add_done_callback(callback)
task.start()
return task
def iq_request_task(func):
@wraps(func)
def func_wrapper(self, *args,
callback=None, user_data=None, **kwargs):
def func_wrapper(self, *args, callback=None, user_data=None, **kwargs):
task = IqRequestTask(func(self, *args, **kwargs),
self._log,
self._client)
self._client.add_task(task)
task.set_finalize_func(self._client.remove_task)
task.set_user_data(user_data)
if callback is not None:
task.add_done_callback(callback)
task.start()
return task
return _setup_task(task, self._client, callback, user_data)
return func_wrapper
def http_request_task(func):
@wraps(func)
def func_wrapper(self, *args, callback=None, user_data=None, **kwargs):
task = HTTPRequestTask(func(self, *args, **kwargs),
self._log,
self._soup_session)
return _setup_task(task, self._client, callback, user_data)
return func_wrapper
......@@ -287,3 +302,28 @@ class IqRequestTask(Task):
def _finalize(self):
self._client = None
super()._finalize()
class HTTPRequestTask(Task):
'''
A Task for running HTTP requests
'''
_process_types = (Soup.Message,)
def __init__(self, gen, logger, session):
super().__init__(gen, logger)
self._session = session
def _run_async(self, message):
self._session.queue_message(message, self._async_finished, None)
def _async_finished(self, _session, message, _user_data):
if self._state != TaskState.CANCELLED:
self._next_step(message)
def _finalize(self):
self._session = None
super()._finalize()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment