Commit b1cb21dd authored by Philipp Hörist's avatar Philipp Hörist
Browse files

PubSub: Rewrite and use Tasks

parent 1b5bfca9
......@@ -25,6 +25,11 @@ from nbxmpp.modules.base import BaseModule
class Activity(BaseModule):
_depends = {
'publish': 'PubSub'
}
def __init__(self, client):
BaseModule.__init__(self, client)
......@@ -91,6 +96,4 @@ class Activity(BaseModule):
if data.text:
item.addChild('text', payload=data.text)
jid = self._client.get_bound_jid().bare
self._client.get_module('PubSub').publish(
jid, Namespace.ACTIVITY, item, id_='current')
self.publish(Namespace.ACTIVITY, item, id_='current')
......@@ -30,11 +30,9 @@ from nbxmpp.util import to_xs_boolean
from nbxmpp.util import call_on_response
from nbxmpp.util import callback
from nbxmpp.util import raise_error
from nbxmpp.util import is_error_result
from nbxmpp.modules.pubsub import get_pubsub_item
from nbxmpp.modules.pubsub import get_pubsub_items
from nbxmpp.modules.pubsub import get_pubsub_request
from nbxmpp.modules.pubsub import get_publish_options
from nbxmpp.modules.base import BaseModule
......@@ -47,13 +45,19 @@ BOOKMARK_2_OPTIONS = {
'pubsub#notify_delete': 'true',
'pubsub#notify_retract': 'true',
'pubsub#persist_items': 'true',
'pubsub#max_items': '128',
'pubsub#max_items': 'max',
'pubsub#access_model': 'whitelist',
'pubsub#send_last_published_item': 'never',
}
class Bookmarks(BaseModule):
_depends = {
'retract': 'PubSub',
'publish': 'PubSub',
}
def __init__(self, client):
BaseModule.__init__(self, client)
......@@ -71,8 +75,6 @@ class Bookmarks(BaseModule):
self._bookmark_2_queue = {}
self._bookmark_1_queue = []
self._node_configuration_in_progress = False
self._node_configuration_not_possible = False
def _process_pubsub_bookmarks(self, _client, stanza, properties):
if not properties.is_pubsub_event:
......@@ -315,115 +317,44 @@ class Bookmarks(BaseModule):
def retract_bookmark(self, bookmark_jid):
self._log.info('Retract Bookmark: %s', bookmark_jid)
jid = self._client.get_bound_jid().bare
self._client.get_module('PubSub').retract(jid,
Namespace.BOOKMARKS_2,
str(bookmark_jid))
self.retract(Namespace.BOOKMARKS_2, str(bookmark_jid))
def _store_bookmark_1(self, bookmarks):
self._log.info('Store Bookmarks 1 (PubSub)')
jid = self._client.get_bound_jid().bare
self._bookmark_1_queue = bookmarks
item = self._build_storage_node(bookmarks)
options = get_publish_options(BOOKMARK_1_OPTIONS)
self._client.get_module('PubSub').publish(
jid,
Namespace.BOOKMARKS,
item,
id_='current',
options=options,
callback=self._on_store_bookmark_result,
user_data=Namespace.BOOKMARKS)
self.publish(Namespace.BOOKMARKS,
item,
id_='current',
options=BOOKMARK_1_OPTIONS,
force_node_options=True,
callback=self._on_store_bookmark_result,
user_data=Namespace.BOOKMARKS)
def _store_bookmark_2(self, bookmarks):
if self._node_configuration_not_possible:
self._log.warning('Node configuration not possible')
return
self._log.info('Store Bookmarks 2 (PubSub)')
jid = self._client.get_bound_jid().bare
for bookmark in bookmarks:
self._bookmark_2_queue[bookmark.jid] = bookmark
item = self._build_conference_node(bookmark)
options = get_publish_options(BOOKMARK_2_OPTIONS)
self._client.get_module('PubSub').publish(
jid,
Namespace.BOOKMARKS_2,
item,
id_=str(bookmark.jid),
options=options,
callback=self._on_store_bookmark_result,
user_data=Namespace.BOOKMARKS_2)
def _on_store_bookmark_result(self, result, node):
if not is_error_result(result):
self._bookmark_1_queue = []
self._bookmark_2_queue.pop(result.id, None)
return
if (result.condition == 'conflict' and
result.app_condition == 'precondition-not-met'):
if self._node_configuration_in_progress:
return
self._node_configuration_in_progress = True
jid = self._client.get_bound_jid().bare
self._client.get_module('PubSub').get_node_configuration(
jid,
node,
callback=self._on_node_configuration_received)
else:
self._bookmark_1_queue = []
self._bookmark_2_queue.pop(result.id, None)
self._log.warning(result)
def _on_node_configuration_received(self, result):
if is_error_result(result):
self._log.warning(result)
self._bookmark_1_queue = []
self._bookmark_2_queue.clear()
return
if result.node == Namespace.BOOKMARKS:
config = BOOKMARK_1_OPTIONS
else:
config = BOOKMARK_2_OPTIONS
self._apply_config(result.form, config)
self._client.get_module('PubSub').set_node_configuration(
result.jid,
result.node,
result.form,
callback=self._on_node_configuration_finished)
def _on_node_configuration_finished(self, result):
self._node_configuration_in_progress = False
if is_error_result(result):
self._log.warning(result)
self._bookmark_2_queue.clear()
self._bookmark_1_queue = []
self._node_configuration_not_possible = True
return
self._log.info('Republish bookmarks')
if self._bookmark_2_queue:
bookmarks = self._bookmark_2_queue.copy()
self._bookmark_2_queue.clear()
self._store_bookmark_2(bookmarks.values())
else:
bookmarks = self._bookmark_1_queue.copy()
self._bookmark_1_queue.clear()
self._store_bookmark_1(bookmarks)
self.publish(Namespace.BOOKMARKS_2,
item,
id_=str(bookmark.jid),
options=BOOKMARK_2_OPTIONS,
force_node_options=True,
callback=self._on_store_bookmark_result,
user_data=Namespace.BOOKMARKS_2)
def _on_store_bookmark_result(self, task):
try:
result = task.finish()
except Exception as error:
self._log.warning(error)
@staticmethod
def _apply_config(form, config):
for var, value in config.items():
try:
field = form[var]
except KeyError:
pass
else:
field.value = value
self._bookmark_1_queue = []
self._bookmark_2_queue.pop(result.id, None)
@call_on_response('_on_private_store_result')
def _store_with_private(self, bookmarks):
......
......@@ -24,6 +24,11 @@ from nbxmpp.modules.base import BaseModule
class Location(BaseModule):
_depends = {
'publish': 'PubSub'
}
def __init__(self, client):
BaseModule.__init__(self, client)
......@@ -70,6 +75,4 @@ class Location(BaseModule):
if value is not None:
item.addChild(tag, payload=value)
jid = self._client.get_bound_jid().bare
self._client.get_module('PubSub').publish(
jid, Namespace.LOCATION, item, id_='current')
self.publish(Namespace.LOCATION, item, id_='current')
......@@ -25,6 +25,11 @@ from nbxmpp.modules.base import BaseModule
class Mood(BaseModule):
_depends = {
'publish': 'PubSub'
}
def __init__(self, client):
BaseModule.__init__(self, client)
......@@ -80,6 +85,4 @@ class Mood(BaseModule):
if data.text:
item.addChild('text', payload=data.text)
jid = self._client.get_bound_jid().bare
self._client.get_module('PubSub').publish(
jid, Namespace.MOOD, item, id_='current')
self.publish(Namespace.MOOD, item, id_='current')
......@@ -23,6 +23,11 @@ from nbxmpp.modules.base import BaseModule
class Nickname(BaseModule):
_depends = {
'publish': 'PubSub'
}
def __init__(self, client):
BaseModule.__init__(self, client)
......@@ -87,6 +92,5 @@ class Nickname(BaseModule):
item = Node('nick', {'xmlns': Namespace.NICK})
if nickname is not None:
item.addData(nickname)
jid = self._client.get_bound_jid().bare
self._client.get_module('PubSub').publish(
jid, Namespace.NICK, item, id_='current')
self.publish(Namespace.NICK, item, id_='current')
......@@ -35,6 +35,11 @@ from nbxmpp.modules.base import BaseModule
class OMEMO(BaseModule):
_depends = {
'publish': 'PubSub'
}
def __init__(self, client):
BaseModule.__init__(self, client)
......@@ -192,9 +197,8 @@ class OMEMO(BaseModule):
item.addChild('device').setAttr('id', device)
self._log.info('Set devicelist: %s', devicelist)
jid = self._client.get_bound_jid().bare
self._client.get_module('PubSub').publish(
jid, Namespace.OMEMO_TEMP_DL, item, id_='current')
self.publish(Namespace.OMEMO_TEMP_DL, item, id_='current')
@call_on_response('_devicelist_received')
def request_devicelist(self, jid=None):
......@@ -225,9 +229,8 @@ class OMEMO(BaseModule):
self._log.info('Set bundle')
node = '%s:%s' % (Namespace.OMEMO_TEMP_BUNDLE, device_id)
jid = self._client.get_bound_jid().bare
self._client.get_module('PubSub').publish(
jid, node, item, id_='current')
self.publish(node, item, id_='current')
@staticmethod
def _create_bundle(bundle):
......
......@@ -38,6 +38,11 @@ from nbxmpp.modules.base import BaseModule
class OpenPGP(BaseModule):
_depends = {
'publish': 'PubSub'
}
def __init__(self, client):
BaseModule.__init__(self, client)
......@@ -154,9 +159,8 @@ class OpenPGP(BaseModule):
item.addChild('pubkey-metadata', attrs=attrs)
self._log.info('Set keylist: %s', keylist)
jid = self._client.get_bound_jid().bare
self._client.get_module('PubSub').publish(
jid, Namespace.OPENPGP_PK, item, id_='current')
self.publish(Namespace.OPENPGP_PK, item, id_='current')
def set_public_key(self, key, fingerprint, date):
date = time.strftime(
......@@ -168,9 +172,8 @@ class OpenPGP(BaseModule):
node = '%s:%s' % (Namespace.OPENPGP_PK, fingerprint)
self._log.info('Set public key')
jid = self._client.get_bound_jid().bare
self._client.get_module('PubSub').publish(
jid, node, item, id_='current')
self.publish(node, item, id_='current')
@call_on_response('_public_key_received')
def request_public_key(self, jid, fingerprint):
......@@ -274,9 +277,8 @@ class OpenPGP(BaseModule):
item.setData(b64encode(secret_key))
self._log.info('Set secret key')
jid = self._client.get_bound_jid().bare
self._client.get_module('PubSub').publish(
jid, Namespace.OPENPGP_SK, item, id_='current')
self.publish(Namespace.OPENPGP_SK, item, id_='current')
def parse_signcrypt(stanza):
......
# Copyright (C) 2018 Philipp Hörist <philipp AT hoerist.com>
# Copyright (C) 2020 Philipp Hörist <philipp AT hoerist.com>
#
# This file is part of nbxmpp.
#
......@@ -15,20 +15,20 @@
# You should have received a copy of the GNU General Public License
# along with this program; If not, see <http://www.gnu.org/licenses/>.
from nbxmpp.namespaces import Namespace
from nbxmpp.protocol import Node
from nbxmpp.protocol import Iq
from nbxmpp.protocol import isResultNode
from collections import namedtuple
from nbxmpp.task import iq_request_task
from nbxmpp.errors import is_error
from nbxmpp.errors import PubSubStanzaError
from nbxmpp.errors import MalformedStanzaError
from nbxmpp.structs import StanzaHandler
from nbxmpp.structs import PubSubEventData
from nbxmpp.structs import CommonResult
from nbxmpp.structs import PubSubConfigResult
from nbxmpp.structs import PubSubPublishResult
from nbxmpp.modules.dataforms import extend_form
from nbxmpp.util import call_on_response
from nbxmpp.util import callback
from nbxmpp.util import raise_error
from nbxmpp.protocol import Iq
from nbxmpp.protocol import Node
from nbxmpp.namespaces import Namespace
from nbxmpp.modules.base import BaseModule
from nbxmpp.modules.util import process_response
from nbxmpp.modules.dataforms import extend_form
class PubSub(BaseModule):
......@@ -84,107 +84,101 @@ class PubSub(BaseModule):
id_ = item.getAttr('id')
properties.pubsub_event = PubSubEventData(node, id_, item)
@call_on_response('_publish_result_received')
def publish(self, jid, node, item, id_=None, options=None):
query = Iq('set', to=jid)
pubsub = query.addChild('pubsub', namespace=Namespace.PUBSUB)
publish = pubsub.addChild('publish', {'node': node})
attrs = {}
if id_ is not None:
attrs = {'id': id_}
publish.addChild('item', attrs, [item])
if options:
publish = pubsub.addChild('publish-options')
publish.addChild(node=options)
return query
@callback
def _publish_result_received(self, stanza):
if not isResultNode(stanza):
return raise_error(self._log.warning, stanza)
jid = stanza.getFrom()
pubsub = stanza.getTag('pubsub', namespace=Namespace.PUBSUB)
if pubsub is None:
# XEP-0060: IQ payload is not mandatory on result
return PubSubPublishResult(jid, None, None)
publish = pubsub.getTag('publish')
if publish is None:
return raise_error(self._log.warning, stanza, 'stanza-malformed')
node = publish.getAttr('node')
item = publish.getTag('item')
if item is None:
return raise_error(self._log.warning, stanza, 'stanza-malformed')
id_ = item.getAttr('id')
return PubSubPublishResult(jid, node, id_)
@call_on_response('_default_response')
def retract(self, jid, node, id_, notify=True):
query = Iq('set', to=jid)
pubsub = query.addChild('pubsub', namespace=Namespace.PUBSUB)
attrs = {'node': node}
if notify:
attrs['notify'] = 'true'
retract = pubsub.addChild('retract', attrs=attrs)
retract.addChild('item', {'id': id_})
return query
@call_on_response('_default_response')
def set_node_configuration(self, jid, node, form):
self._log.info('Set configuration for %s %s', node, jid)
query = Iq('set', to=jid)
pubsub = query.addChild('pubsub', namespace=Namespace.PUBSUB_OWNER)
configure = pubsub.addChild('configure', {'node': node})
form.setAttr('type', 'submit')
configure.addChild(node=form)
return query
@call_on_response('_node_configuration_received')
def get_node_configuration(self, jid, node):
self._log.info('Request node configuration')
query = Iq('get', to=jid)
pubsub = query.addChild('pubsub', namespace=Namespace.PUBSUB_OWNER)
pubsub.addChild('configure', {'node': node})
return query
@callback
def _node_configuration_received(self, stanza):
if not isResultNode(stanza):
return raise_error(self._log.warning, stanza)
jid = stanza.getFrom()
pubsub = stanza.getTag('pubsub', namespace=Namespace.PUBSUB_OWNER)
if pubsub is None:
return raise_error(self._log.warning, stanza, 'stanza-malformed',
'No pubsub node found')
configure = pubsub.getTag('configure')
if configure is None:
return raise_error(self._log.warning, stanza, 'stanza-malformed',
'No configure node found')
node = configure.getAttr('node')
forms = configure.getTags('x', namespace=Namespace.DATA)
for form in forms:
dataform = extend_form(node=form)
form_type = dataform.vars.get('FORM_TYPE')
if form_type is None or form_type.value != Namespace.PUBSUB_CONFIG:
continue
self._log.info('Node configuration received from: %s', jid)
return PubSubConfigResult(jid=jid, node=node, form=dataform)
return raise_error(self._log.warning, stanza, 'stanza-malformed',
'No valid form type found')
@callback
def _default_response(self, stanza):
if not isResultNode(stanza):
return raise_error(self._log.info, stanza)
return CommonResult(jid=stanza.getFrom())
@iq_request_task
def request_item(self, node, id_, jid=None):
task = yield
response = yield _make_pubsub_request(node, id_=id_, jid=jid)
if response.isError():
raise PubSubStanzaError(response)
item = _get_pubsub_item(response, node, id_)
yield task.set_result(item)
@iq_request_task
def request_items(self, node, max_items=None, jid=None):
_task = yield
response = yield _make_pubsub_request(node,
max_items=max_items,
jid=jid)
if response.isError():
raise PubSubStanzaError(response)
yield _get_pubsub_items(response, node)
@iq_request_task
def publish(self,
node,
item,
id_=None,
options=None,
jid=None,
force_node_options=False):
_task = yield
request = _make_publish_request(node, item, id_, options, jid)
response = yield request
if response.isError():
error = PubSubStanzaError(response)
if (not force_node_options or
error.app_condition != 'precondition-not-met'):
raise error
result = yield self.reconfigure_node(node, options, jid)
if is_error(result):
raise result
response = yield request
if response.isError():
raise PubSubStanzaError(response)
jid = response.getFrom()
item_id = _get_published_item_id(response, node, id_)
yield PubSubPublishResult(jid, node, item_id)
@iq_request_task
def retract(self, node, id_, jid=None, notify=True):
_task = yield
response = yield _make_retract_request(node, id_, jid, notify)
yield process_response(response)
@iq_request_task
def reconfigure_node(self, node, options, jid=None):
_task = yield
result = yield self.get_node_configuration(node, jid)
if is_error(result):
raise result
_apply_options(result.form, options)
result = yield self.set_node_configuration(node, result.form, jid)
yield result
@iq_request_task
def set_node_configuration(self, node, form, jid=None):
_task = yield
response = yield _make_node_configuration(node, form, jid)
yield process_response(response)
@iq_request_task
def get_node_configuration(self, node, jid=None):
_task = yield
response = yield _make_node_configuration_request(node, jid)
if response.isError():
raise PubSubStanzaError(response)
jid = response.getFrom()
form = _get_configure_form(response, node)
yield PubSubNodeConfigurationResult(jid=jid, node=node, form=form)
def get_pubsub_request(jid, node, id_=None, max_items=None):
......@@ -225,3 +219,171 @@ def get_publish_options(config):
field = options.addChild('field', attrs={'var': var})
field.setTagData('value', value)
return options
def _get_pubsub_items(response, node):
pubsub_node = response.getTag('pubsub', namespace=Namespace.PUBSUB)
if pubsub_node is None:
raise MalformedStanzaError('pubsub node missing', response)
items_node = pubsub_node.getTag('items')
if items_node is None:
raise MalformedStanzaError('items node missing', response)
if items_node.getAttr('node') != node:
raise MalformedStanzaError('invalid node attr', response)
return items_node.getTags('item')
def _get_pubsub_item(response, node, id_):
items = _get_pubsub_items(response, node)