diff --git a/gajim/common/modules/muc.py b/gajim/common/modules/muc.py index d6681728371e2efa5e6c9a9f9ce6ca7c56c74320..b9989c8c42f085e0d17f738800680dd27c273d90 100644 --- a/gajim/common/modules/muc.py +++ b/gajim/common/modules/muc.py @@ -15,24 +15,37 @@ # XEP-0045: Multi-User Chat # XEP-0249: Direct MUC Invitations +from __future__ import annotations + +from typing import Any +from typing import Optional + import logging from collections import defaultdict import time import nbxmpp -from nbxmpp.namespaces import Namespace from nbxmpp.const import InviteType from nbxmpp.const import PresenceType from nbxmpp.const import StatusCode +from nbxmpp.errors import StanzaError +from nbxmpp.modules.dataforms import SimpleDataForm +from nbxmpp.namespaces import Namespace from nbxmpp.protocol import JID +from nbxmpp.protocol import Message +from nbxmpp.protocol import Presence +from nbxmpp.structs import DiscoInfo +from nbxmpp.structs import MessageProperties +from nbxmpp.structs import PresenceProperties from nbxmpp.structs import StanzaHandler -from nbxmpp.errors import StanzaError +from nbxmpp.task import Task from gi.repository import GLib from gajim.common import app from gajim.common import helpers -from gajim.common.const import KindConstant +from gajim.common import types +from gajim.common.const import ClientState, KindConstant from gajim.common.const import MUCJoinedState from gajim.common.events import MessageModerated from gajim.common.events import MucAdded @@ -46,7 +59,6 @@ from gajim.common.modules.bits_of_binary import store_bob_data from gajim.common.modules.base import BaseModule - log = logging.getLogger('gajim.c.m.muc') @@ -72,7 +84,7 @@ class MUC(BaseModule): 'retract_message' ] - def __init__(self, con): + def __init__(self, con: types.Client) -> None: BaseModule.__init__(self, con) self.handlers = [ @@ -121,30 +133,38 @@ def __init__(self, con): self._con.connect_signal('resume-failed', self._on_client_resume_failed) - self._rejoin_muc = set() - self._join_timeouts = {} - self._rejoin_timeouts = {} + self._rejoin_muc: set[str] = set() + self._join_timeouts: dict[str, int] = {} + self._rejoin_timeouts: dict[str, int] = {} self._muc_service_jid = None - self._joined_users = defaultdict(dict) - self._mucs = {} + self._joined_users: defaultdict[ + str, dict[str, MUCPresenceData]] = defaultdict(dict) + self._mucs: dict[str, MUCData] = {} self._muc_nicknames = {} - def _on_resume_failed(self, _client, _signal_name): + def _on_resume_failed(self, + _client: types.Client, + _signal_name: str + ) -> None: self._reset_presence() - def _on_state_changed(self, _client, _signal_name, state): + def _on_state_changed(self, + _client: types.Client, + _signal_name: str, + state: ClientState + ) -> None: if state.is_disconnected: self._reset_presence() @property - def supported(self): + def supported(self) -> bool: return self._muc_service_jid is not None @property def service_jid(self): return self._muc_service_jid - def pass_disco(self, info): + def pass_disco(self, info: DiscoInfo) -> None: if info.is_gateway: return @@ -158,17 +178,17 @@ def pass_disco(self, info): self._muc_service_jid = info.jid raise nbxmpp.NodeProcessed - def get_muc_data(self, room_jid): + def get_muc_data(self, room_jid: str) -> Optional[MUCData]: return self._mucs.get(room_jid) - def set_password(self, room_jid, password): + def set_password(self, room_jid: str, password: str) -> None: muc_data = self.get_muc_data(room_jid) muc_data.password = password - def _get_mucs_with_state(self, states): + def _get_mucs_with_state(self, states: list[MUCJoinedState]): return [muc for muc in self._mucs.values() if muc.state in states] - def _set_muc_state(self, room_jid, state): + def _set_muc_state(self, room_jid: str, state: MUCJoinedState) -> None: try: muc = self._mucs[room_jid] except KeyError: @@ -184,7 +204,7 @@ def _set_muc_state(self, room_jid, state): contact = self._get_contact(room_jid, groupchat=True) contact.notify('state-changed') - def _reset_state(self): + def _reset_state(self) -> None: for room_jid in list(self._rejoin_timeouts.keys()): self._remove_rejoin_timeout(room_jid) @@ -200,7 +220,12 @@ def _reset_state(self): self._joined_users.clear() - def _create_muc_data(self, room_jid, nick, password, config): + def _create_muc_data(self, + room_jid: str, + nick: Optional[str], + password: Optional[str], + config: Optional[dict[str, Any]] + ) -> MUCData: if not nick: nick = get_group_chat_nick(self._account, room_jid) @@ -212,7 +237,12 @@ def _create_muc_data(self, room_jid, nick, password, config): return MUCData(room_jid, nick, password, config) - def join(self, jid, nick=None, password=None, config=None): + def join(self, + jid: str, + nick: Optional[str] = None, + password: Optional[str] = None, + config: Optional[dict[str, Any]] = None + ) -> None: if not app.account_is_available(self._account): return @@ -245,7 +275,7 @@ def join(self, jid, nick=None, password=None, config=None): else: self._join(muc_data) - def create(self, jid, config): + def create(self, jid: str, config: dict[str, Any]) -> None: if not app.account_is_available(self._account): return @@ -259,7 +289,7 @@ def _push_muc_added_event(self, jid: str) -> None: app.ged.raise_event(MucAdded(account=self._account, jid=JID.from_string(jid))) - def _on_disco_result(self, task): + def _on_disco_result(self, task: Task) -> None: try: result = task.finish() except StanzaError as error: @@ -275,7 +305,7 @@ def _on_disco_result(self, task): return self._join(muc_data) - def _join(self, muc_data): + def _join(self, muc_data: MUCData) -> None: presence = self._con.get_module('Presence').get_presence( muc_data.occupant_jid, show=self._con.status, @@ -291,14 +321,14 @@ def _join(self, muc_data): self._set_muc_state(muc_data.jid, MUCJoinedState.JOINING) self._con.send_stanza(presence) - def _rejoin(self, room_jid): + def _rejoin(self, room_jid: str) -> bool: muc_data = self._mucs[room_jid] if muc_data.state.is_not_joined: self._log.info('Rejoin %s', room_jid) self._join(muc_data) return True - def _create(self, muc_data): + def _create(self, muc_data: MUCData) -> None: presence = self._con.get_module('Presence').get_presence( muc_data.occupant_jid, show=self._con.status, @@ -310,7 +340,10 @@ def _create(self, muc_data): self._set_muc_state(muc_data.jid, MUCJoinedState.CREATING) self._con.send_stanza(presence) - def leave(self, room_jid, reason=None): + def leave(self, + room_jid: str, + reason: Optional[str] = None + ) -> None: self._log.info('Leave MUC: %s', room_jid) self._con.get_module('Bookmarks').modify(room_jid, autojoin=False) @@ -336,11 +369,11 @@ def leave(self, room_jid, reason=None): room.set_not_joined() room.notify('room-left') - def configure_room(self, room_jid): + def configure_room(self, room_jid: str) -> None: self._nbxmpp('MUC').request_config(room_jid, callback=self._on_room_config) - def _on_room_config(self, task): + def _on_room_config(self, task: Task) -> None: try: result = task.finish() except StanzaError as error: @@ -359,7 +392,9 @@ def _on_room_config(self, task): callback=self._on_config_result) @staticmethod - def _apply_config(form, config=None): + def _apply_config(form: SimpleDataForm, + config: Optional[dict[str, Any]] = None + ) -> None: default_config = get_default_muc_config() if config is not None: default_config.update(config) @@ -371,7 +406,7 @@ def _apply_config(form, config=None): else: field.value = value - def _on_config_result(self, task): + def _on_config_result(self, task: Task) -> None: try: result = task.finish() except StanzaError as error: @@ -398,7 +433,7 @@ def _on_config_result(self, task): for jid in invites: self.invite(result.jid, jid) - def _on_disco_result_after_config(self, task): + def _on_disco_result_after_config(self, task: Task) -> None: try: result = task.finish() except StanzaError as error: @@ -414,7 +449,7 @@ def _on_disco_result_after_config(self, task): room = self._get_contact(jid.bare) room.notify('room-config-finished') - def update_presence(self): + def update_presence(self) -> None: mucs = self._get_mucs_with_state([MUCJoinedState.JOINED, MUCJoinedState.JOINING]) @@ -426,14 +461,18 @@ def update_presence(self): status=message, idle_time=idle) - def change_nick(self, room_jid, new_nick): + def change_nick(self, room_jid: str, new_nick: str) -> None: status, message, _idle = self._con.get_presence_state() self._con.get_module('Presence').send_presence( - '%s/%s' % (room_jid, new_nick), + f'{room_jid}/{new_nick}', show=status, status=message) - def _on_error_presence(self, _con, stanza, properties): + def _on_error_presence(self, + _con: types.xmppClient, + stanza: Presence, + properties: PresenceProperties + ) -> None: room_jid = properties.jid.bare muc_data = self._mucs.get(room_jid) if muc_data is None: @@ -479,7 +518,11 @@ def _on_error_presence(self, _con, stanza, properties): else: room.notify('room-presence-error', properties) - def _on_muc_user_presence(self, _con, stanza, properties): + def _on_muc_user_presence(self, + _con: types.xmppClient, + stanza: Presence, + properties: PresenceProperties + ) -> None: if properties.type == PresenceType.ERROR: return @@ -569,7 +612,9 @@ def _on_muc_user_presence(self, _con, stanza, properties): presence = self._process_user_presence(properties) occupant.update_presence(presence, properties) - def _process_user_presence(self, properties): + def _process_user_presence(self, + properties: PresenceProperties + ) -> MUCPresenceData: jid = properties.jid muc_presence = MUCPresenceData.from_presence(properties) if not muc_presence.available: @@ -578,24 +623,24 @@ def _process_user_presence(self, properties): self._joined_users[jid.bare][jid.resource] = muc_presence return muc_presence - def _is_user_joined(self, jid): + def _is_user_joined(self, jid: Optional[JID]) -> bool: try: self._joined_users[jid.bare][jid.resource] except KeyError: return False return True - def get_joined_users(self, jid): + def get_joined_users(self, jid: str) -> list[str]: return list(self._joined_users[jid].keys()) - def _start_rejoin_timeout(self, room_jid): + def _start_rejoin_timeout(self, room_jid: str) -> None: self._remove_rejoin_timeout(room_jid) self._rejoin_muc.add(room_jid) self._log.info('Start rejoin timeout for: %s', room_jid) id_ = GLib.timeout_add_seconds(2, self._rejoin, room_jid) self._rejoin_timeouts[room_jid] = id_ - def _remove_rejoin_timeout(self, room_jid): + def _remove_rejoin_timeout(self, room_jid: str) -> None: self._rejoin_muc.discard(room_jid) id_ = self._rejoin_timeouts.get(room_jid) if id_ is not None: @@ -603,21 +648,24 @@ def _remove_rejoin_timeout(self, room_jid): GLib.source_remove(id_) del self._rejoin_timeouts[room_jid] - def _start_join_timeout(self, room_jid): + def _start_join_timeout(self, room_jid: str) -> None: self._remove_join_timeout(room_jid) self._log.info('Start join timeout for: %s', room_jid) id_ = GLib.timeout_add_seconds( 10, self._fake_subject_change, room_jid) self._join_timeouts[room_jid] = id_ - def _remove_join_timeout(self, room_jid): + def _remove_join_timeout(self, room_jid: str) -> None: id_ = self._join_timeouts.get(room_jid) if id_ is not None: self._log.info('Remove join timeout for: %s', room_jid) GLib.source_remove(id_) del self._join_timeouts[room_jid] - def _log_muc_event(self, event_name, properties): + def _log_muc_event(self, + event_name: str, + properties: PresenceProperties + ) -> None: # TODO CURRENTLY NOT USED if event_name not in ['muc-user-joined', 'muc-user-left', @@ -650,7 +698,11 @@ def _log_muc_event(self, event_name, properties): show=show, additional_data=additional_data) - def _on_subject_change(self, _con, _stanza, properties): + def _on_subject_change(self, + _con: types.xmppClient, + _stanza: Message, + properties: MessageProperties + ) -> None: if not properties.is_muc_subject: return @@ -675,7 +727,11 @@ def _on_subject_change(self, _con, _stanza, properties): raise nbxmpp.NodeProcessed - def _on_moderation(self, _con, _stanza, properties): + def _on_moderation(self, + _con: types.xmppClient, + _stanza: Message, + properties: MessageProperties + ) -> None: if not properties.is_moderation: return @@ -689,7 +745,7 @@ def _on_moderation(self, _con, _stanza, properties): raise nbxmpp.NodeProcessed - def _fake_subject_change(self, room_jid): + def _fake_subject_change(self, room_jid: str) -> None: # This is for servers which don’t send empty subjects as part of the # event order on joining a MUC. For example jabber.ru self._log.warning('Fake subject received for %s', room_jid) @@ -697,7 +753,7 @@ def _fake_subject_change(self, room_jid): room = self._get_contact(room_jid) room.notify('room-joined') - def _room_join_complete(self, muc_data): + def _room_join_complete(self, muc_data: MUCData): self._remove_join_timeout(muc_data.jid) self._set_muc_state(muc_data.jid, MUCJoinedState.JOINED) self._remove_rejoin_timeout(muc_data.jid) @@ -714,7 +770,11 @@ def _room_join_complete(self, muc_data): self._con.get_module('MAM').request_archive_on_muc_join( muc_data.jid) - def _on_voice_request(self, _con, _stanza, properties): + def _on_voice_request(self, + _con: types.xmppClient, + _stanza: Message, + properties: MessageProperties + ) -> None: if not properties.is_voice_request: return @@ -723,7 +783,11 @@ def _on_voice_request(self, _con, _stanza, properties): raise nbxmpp.NodeProcessed - def _on_captcha_challenge(self, _con, _stanza, properties): + def _on_captcha_challenge(self, + _con: types.xmppClient, + _stanza: Message, + properties: MessageProperties + ) -> None: if not properties.is_captcha_challenge: return @@ -753,7 +817,7 @@ def _on_captcha_challenge(self, _con, _stanza, properties): raise nbxmpp.NodeProcessed - def cancel_captcha(self, room_jid): + def cancel_captcha(self, room_jid: str) -> None: muc_data = self._mucs.get(room_jid) if muc_data is None: return @@ -765,13 +829,16 @@ def cancel_captcha(self, room_jid): self._set_muc_state(room_jid, MUCJoinedState.CAPTCHA_FAILED) self._set_muc_state(room_jid, MUCJoinedState.NOT_JOINED) - def send_captcha(self, room_jid, form_node): + def send_captcha(self, + room_jid: str, + form_node: SimpleDataForm + ) -> None: self._set_muc_state(room_jid, MUCJoinedState.JOINING) self._nbxmpp('MUC').send_captcha(room_jid, form_node, callback=self._on_captcha_result) - def _on_captcha_result(self, task): + def _on_captcha_result(self, task: Task) -> None: try: task.finish() except StanzaError as error: @@ -782,7 +849,11 @@ def _on_captcha_result(self, task): room = self._get_contact(error.jid) room.notify('room-captcha-error', error) - def _on_config_change(self, _con, _stanza, properties): + def _on_config_change(self, + _con: types.xmppClient, + _stanza: Message, + properties: MessageProperties + ) -> None: if not properties.is_muc_config_change: return @@ -795,7 +866,11 @@ def _on_config_change(self, _con, _stanza, properties): raise nbxmpp.NodeProcessed - def _on_invite_or_decline(self, _con, _stanza, properties): + def _on_invite_or_decline(self, + _con: types.xmppClient, + _stanza: Message, + properties: MessageProperties + ) -> None: if properties.muc_decline is not None: data = properties.muc_decline if helpers.ignore_contact(self._account, data.from_): @@ -830,7 +905,7 @@ def _on_invite_or_decline(self, _con, _stanza, properties): raise nbxmpp.NodeProcessed - def _on_disco_result_after_invite(self, task): + def _on_disco_result_after_invite(self, task: Task) -> None: try: result = task.finish() except StanzaError as error: @@ -843,7 +918,12 @@ def _on_disco_result_after_invite(self, task): info=result.info, **invite_data._asdict())) - def invite(self, room, jid, reason=None, continue_=False): + def invite(self, + room: str, + jid: str, + reason: Optional[str] = None, + continue_: bool = False + ) -> str: type_ = InviteType.MEDIATED contact = self._get_contact(jid) if contact and contact.supports(Namespace.CONFERENCE): @@ -851,12 +931,19 @@ def invite(self, room, jid, reason=None, continue_=False): password = self._mucs[room].password self._log.info('Invite %s to %s', jid, room) - return self._nbxmpp('MUC').invite(room, jid, reason, password, - continue_, type_) - - def _on_client_state_changed(self, _client, _signal_name, state): + return self._nbxmpp('MUC').invite( + room, jid, reason, password, continue_, type_) + + def _on_client_state_changed(self, + _client: types.Client, + _signal_name: str, + state: ClientState + ) -> None: if state.is_disconnected: self._reset_state() - def _on_client_resume_failed(self, _client, _signal_name): + def _on_client_resume_failed(self, + _client: types.Client, + _signal_name: str + ) -> None: self._reset_state()