Commit afef11e8 authored by Philipp Hörist's avatar Philipp Hörist

Caps: Use TaskManager for requesting disco info

parent 07a89305
Pipeline #5649 failed with stages
in 25 seconds
......@@ -11,6 +11,8 @@ from gajim.common.i18n import Q_
EncryptionData = namedtuple('EncryptionData', 'additional_data')
EncryptionData.__new__.__defaults__ = (None,) # type: ignore
Entity = namedtuple('Entity', 'jid node hash method')
class AvatarSize(IntEnum):
TAB = 16
......
......@@ -17,6 +17,9 @@
# XEP-0115: Entity Capabilities
import weakref
from collections import defaultdict
from nbxmpp.namespaces import Namespace
from nbxmpp.structs import StanzaHandler
from nbxmpp.structs import DiscoIdentity
......@@ -25,8 +28,10 @@ from nbxmpp.util import compute_caps_hash
from gajim.common import app
from gajim.common.const import COMMON_FEATURES
from gajim.common.const import Entity
from gajim.common.helpers import get_optional_features
from gajim.common.nec import NetworkEvent
from gajim.common.task_manager import Task
from gajim.common.modules.base import BaseModule
......@@ -52,6 +57,36 @@ class Caps(BaseModule):
DiscoIdentity(category='client', type='pc', name='Gajim')
]
self._queued_tasks_by_hash = defaultdict(set)
self._queued_tasks_by_jid = {}
def _queue_task(self, task):
old_task = self._get_task(task.entity.jid)
if old_task is not None:
self._remove_task(old_task)
self._log.info('Queue query for hash %s', task.entity.hash)
self._queued_tasks_by_hash[task.entity.hash].add(task)
self._queued_tasks_by_jid[task.entity.jid] = task
app.task_manager.add_task(task)
def _get_task(self, jid):
return self._queued_tasks_by_jid.get(jid)
def _get_similar_tasks(self, task):
return self._queued_tasks_by_hash.pop(task.entity.hash)
def _remove_task(self, task):
task.set_obsolete()
del self._queued_tasks_by_jid[task.entity.jid]
self._queued_tasks_by_hash[task.entity.hash].discard(task)
def _remove_all_tasks(self):
for task in self._queued_tasks_by_jid.values():
task.set_obsolete()
self._queued_tasks_by_jid.clear()
self._queued_tasks_by_hash.clear()
def _entity_caps(self, _con, _stanza, properties):
if properties.type.is_error or properties.type.is_unavailable:
return
......@@ -62,57 +97,70 @@ class Caps(BaseModule):
if properties.entity_caps is None:
return
jid = str(properties.jid)
task = EntityCapsTask(self._account, properties, self._execute_task)
hash_method = properties.entity_caps.hash
node = properties.entity_caps.node
caps_hash = properties.entity_caps.ver
self._log.info('Received %s', task.entity)
self._log.info(
'Received from %s, type: %s, method: %s, node: %s, hash: %s',
jid, properties.type, hash_method, node, caps_hash)
disco_info = app.logger.get_caps_entry(hash_method, caps_hash)
disco_info = app.logger.get_caps_entry(task.entity.method,
task.entity.hash)
if disco_info is None:
self._con.get_module('Discovery').disco_info(
jid,
'%s#%s' % (node, caps_hash),
callback=self._on_disco_info,
user_data=hash_method)
else:
app.logger.set_last_disco_info(jid, disco_info, cache_only=True)
app.nec.push_incoming_event(
NetworkEvent('caps-update',
account=self._account,
fjid=jid,
jid=properties.jid.getBare()))
self._queue_task(task)
return
jid = str(properties.jid)
app.logger.set_last_disco_info(jid, disco_info, cache_only=True)
app.nec.push_incoming_event(
NetworkEvent('caps-update',
account=self._account,
fjid=jid,
jid=properties.jid.getBare()))
def _execute_task(self, task):
self._log.info('Request %s from %s', task.entity.hash, task.entity.jid)
self._con.get_module('Discovery').disco_info(
task.entity.jid,
node=f'{task.entity.node}#{task.entity.hash}',
callback=self._on_disco_info)
def _on_disco_info(self, disco_info):
task = self._get_task(disco_info.jid)
if task is None:
self._log.info('Task not found for %s', disco_info.jid)
return
def _on_disco_info(self, disco_info, hash_method):
if is_error_result(disco_info):
self._remove_task(task)
self._log.info(disco_info)
return
bare_jid = disco_info.jid.getBare()
self._log.info('Disco Info received: %s', disco_info.jid)
try:
compute_caps_hash(disco_info)
except Exception as error:
self._remove_task(task)
self._log.warning('Disco info malformed: %s %s',
disco_info.jid, error)
return
app.logger.add_caps_entry(
str(disco_info.jid),
hash_method,
task.entity.method,
disco_info.get_caps_hash(),
disco_info)
app.nec.push_incoming_event(
NetworkEvent('caps-update',
account=self._account,
fjid=str(disco_info.jid),
jid=bare_jid))
self._log.info('Finished query for %s', task.entity.hash)
tasks = self._get_similar_tasks(task)
for task in tasks:
self._remove_task(task)
self._log.info('Update %s', task.entity.jid)
app.nec.push_incoming_event(
NetworkEvent('caps-update',
account=self._account,
fjid=str(task.entity.jid),
jid=task.entity.jid.getBare()))
def update_caps(self):
if not app.account_is_connected(self._account):
......@@ -130,6 +178,52 @@ class Caps(BaseModule):
app.connections[self._account].status,
app.connections[self._account].status_message)
def cleanup(self):
self._remove_all_tasks()
BaseModule.cleanup(self)
class EntityCapsTask(Task):
def __init__(self, account, properties, callback):
Task.__init__(self)
self._account = account
self._callback = weakref.WeakMethod(callback)
self.entity = Entity(jid=properties.jid,
node=properties.entity_caps.node,
hash=properties.entity_caps.ver,
method=properties.entity_caps.hash)
self._from_muc = properties.from_muc
def execute(self):
callback = self._callback()
if callback is not None:
callback(self)
def preconditions_met(self):
try:
client = app.get_client(self._account)
except Exception:
return False
if self._from_muc:
muc = client.get_module('MUC').get_manager().get(
self.entity.jid.getBare())
if muc is None or not muc.state.is_joined:
self.set_obsolete()
return False
return True
return client.state.is_available
def __repr__(self):
return f'Entity Caps ({self.entity.jid} {self.entity.hash})'
def __hash__(self):
return hash(self.entity)
def get_instance(*args, **kwargs):
return Caps(*args, **kwargs), 'Caps'
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment