Skip to content
Snippets Groups Projects
Commit c0e0d3b5 authored by Daniel Brötzmann's avatar Daniel Brötzmann
Browse files

chore: MUC: Add type annotations

parent 4ea4d916
No related branches found
No related tags found
No related merge requests found
......@@ -15,24 +15,37 @@
# XEP-0045: Multi-User Chat
# XEP-0249: Direct MUC Invitations
from __future__ import annotations
from typing import Any
from typing import Optional
import logging
from collections import defaultdict
import time
import nbxmpp
from nbxmpp.namespaces import Namespace
from nbxmpp.const import InviteType
from nbxmpp.const import PresenceType
from nbxmpp.const import StatusCode
from nbxmpp.errors import StanzaError
from nbxmpp.modules.dataforms import SimpleDataForm
from nbxmpp.namespaces import Namespace
from nbxmpp.protocol import JID
from nbxmpp.protocol import Message
from nbxmpp.protocol import Presence
from nbxmpp.structs import DiscoInfo
from nbxmpp.structs import MessageProperties
from nbxmpp.structs import PresenceProperties
from nbxmpp.structs import StanzaHandler
from nbxmpp.errors import StanzaError
from nbxmpp.task import Task
from gi.repository import GLib
from gajim.common import app
from gajim.common import helpers
from gajim.common.const import KindConstant
from gajim.common import types
from gajim.common.const import ClientState, KindConstant
from gajim.common.const import MUCJoinedState
from gajim.common.events import MessageModerated
from gajim.common.events import MucAdded
......@@ -46,7 +59,6 @@
from gajim.common.modules.bits_of_binary import store_bob_data
from gajim.common.modules.base import BaseModule
log = logging.getLogger('gajim.c.m.muc')
......@@ -72,7 +84,7 @@ class MUC(BaseModule):
'retract_message'
]
def __init__(self, con):
def __init__(self, con: types.Client) -> None:
BaseModule.__init__(self, con)
self.handlers = [
......@@ -121,30 +133,38 @@ def __init__(self, con):
self._con.connect_signal('resume-failed',
self._on_client_resume_failed)
self._rejoin_muc = set()
self._join_timeouts = {}
self._rejoin_timeouts = {}
self._rejoin_muc: set[str] = set()
self._join_timeouts: dict[str, int] = {}
self._rejoin_timeouts: dict[str, int] = {}
self._muc_service_jid = None
self._joined_users = defaultdict(dict)
self._mucs = {}
self._joined_users: defaultdict[
str, dict[str, MUCPresenceData]] = defaultdict(dict)
self._mucs: dict[str, MUCData] = {}
self._muc_nicknames = {}
def _on_resume_failed(self, _client, _signal_name):
def _on_resume_failed(self,
_client: types.Client,
_signal_name: str
) -> None:
self._reset_presence()
def _on_state_changed(self, _client, _signal_name, state):
def _on_state_changed(self,
_client: types.Client,
_signal_name: str,
state: ClientState
) -> None:
if state.is_disconnected:
self._reset_presence()
@property
def supported(self):
def supported(self) -> bool:
return self._muc_service_jid is not None
@property
def service_jid(self):
return self._muc_service_jid
def pass_disco(self, info):
def pass_disco(self, info: DiscoInfo) -> None:
if info.is_gateway:
return
......@@ -158,17 +178,17 @@ def pass_disco(self, info):
self._muc_service_jid = info.jid
raise nbxmpp.NodeProcessed
def get_muc_data(self, room_jid):
def get_muc_data(self, room_jid: str) -> Optional[MUCData]:
return self._mucs.get(room_jid)
def set_password(self, room_jid, password):
def set_password(self, room_jid: str, password: str) -> None:
muc_data = self.get_muc_data(room_jid)
muc_data.password = password
def _get_mucs_with_state(self, states):
def _get_mucs_with_state(self, states: list[MUCJoinedState]):
return [muc for muc in self._mucs.values() if muc.state in states]
def _set_muc_state(self, room_jid, state):
def _set_muc_state(self, room_jid: str, state: MUCJoinedState) -> None:
try:
muc = self._mucs[room_jid]
except KeyError:
......@@ -184,7 +204,7 @@ def _set_muc_state(self, room_jid, state):
contact = self._get_contact(room_jid, groupchat=True)
contact.notify('state-changed')
def _reset_state(self):
def _reset_state(self) -> None:
for room_jid in list(self._rejoin_timeouts.keys()):
self._remove_rejoin_timeout(room_jid)
......@@ -200,7 +220,12 @@ def _reset_state(self):
self._joined_users.clear()
def _create_muc_data(self, room_jid, nick, password, config):
def _create_muc_data(self,
room_jid: str,
nick: Optional[str],
password: Optional[str],
config: Optional[dict[str, Any]]
) -> MUCData:
if not nick:
nick = get_group_chat_nick(self._account, room_jid)
......@@ -212,7 +237,12 @@ def _create_muc_data(self, room_jid, nick, password, config):
return MUCData(room_jid, nick, password, config)
def join(self, jid, nick=None, password=None, config=None):
def join(self,
jid: str,
nick: Optional[str] = None,
password: Optional[str] = None,
config: Optional[dict[str, Any]] = None
) -> None:
if not app.account_is_available(self._account):
return
......@@ -245,7 +275,7 @@ def join(self, jid, nick=None, password=None, config=None):
else:
self._join(muc_data)
def create(self, jid, config):
def create(self, jid: str, config: dict[str, Any]) -> None:
if not app.account_is_available(self._account):
return
......@@ -259,7 +289,7 @@ def _push_muc_added_event(self, jid: str) -> None:
app.ged.raise_event(MucAdded(account=self._account,
jid=JID.from_string(jid)))
def _on_disco_result(self, task):
def _on_disco_result(self, task: Task) -> None:
try:
result = task.finish()
except StanzaError as error:
......@@ -275,7 +305,7 @@ def _on_disco_result(self, task):
return
self._join(muc_data)
def _join(self, muc_data):
def _join(self, muc_data: MUCData) -> None:
presence = self._con.get_module('Presence').get_presence(
muc_data.occupant_jid,
show=self._con.status,
......@@ -291,14 +321,14 @@ def _join(self, muc_data):
self._set_muc_state(muc_data.jid, MUCJoinedState.JOINING)
self._con.send_stanza(presence)
def _rejoin(self, room_jid):
def _rejoin(self, room_jid: str) -> bool:
muc_data = self._mucs[room_jid]
if muc_data.state.is_not_joined:
self._log.info('Rejoin %s', room_jid)
self._join(muc_data)
return True
def _create(self, muc_data):
def _create(self, muc_data: MUCData) -> None:
presence = self._con.get_module('Presence').get_presence(
muc_data.occupant_jid,
show=self._con.status,
......@@ -310,7 +340,10 @@ def _create(self, muc_data):
self._set_muc_state(muc_data.jid, MUCJoinedState.CREATING)
self._con.send_stanza(presence)
def leave(self, room_jid, reason=None):
def leave(self,
room_jid: str,
reason: Optional[str] = None
) -> None:
self._log.info('Leave MUC: %s', room_jid)
self._con.get_module('Bookmarks').modify(room_jid, autojoin=False)
......@@ -336,11 +369,11 @@ def leave(self, room_jid, reason=None):
room.set_not_joined()
room.notify('room-left')
def configure_room(self, room_jid):
def configure_room(self, room_jid: str) -> None:
self._nbxmpp('MUC').request_config(room_jid,
callback=self._on_room_config)
def _on_room_config(self, task):
def _on_room_config(self, task: Task) -> None:
try:
result = task.finish()
except StanzaError as error:
......@@ -359,7 +392,9 @@ def _on_room_config(self, task):
callback=self._on_config_result)
@staticmethod
def _apply_config(form, config=None):
def _apply_config(form: SimpleDataForm,
config: Optional[dict[str, Any]] = None
) -> None:
default_config = get_default_muc_config()
if config is not None:
default_config.update(config)
......@@ -371,7 +406,7 @@ def _apply_config(form, config=None):
else:
field.value = value
def _on_config_result(self, task):
def _on_config_result(self, task: Task) -> None:
try:
result = task.finish()
except StanzaError as error:
......@@ -398,7 +433,7 @@ def _on_config_result(self, task):
for jid in invites:
self.invite(result.jid, jid)
def _on_disco_result_after_config(self, task):
def _on_disco_result_after_config(self, task: Task) -> None:
try:
result = task.finish()
except StanzaError as error:
......@@ -414,7 +449,7 @@ def _on_disco_result_after_config(self, task):
room = self._get_contact(jid.bare)
room.notify('room-config-finished')
def update_presence(self):
def update_presence(self) -> None:
mucs = self._get_mucs_with_state([MUCJoinedState.JOINED,
MUCJoinedState.JOINING])
......@@ -426,14 +461,18 @@ def update_presence(self):
status=message,
idle_time=idle)
def change_nick(self, room_jid, new_nick):
def change_nick(self, room_jid: str, new_nick: str) -> None:
status, message, _idle = self._con.get_presence_state()
self._con.get_module('Presence').send_presence(
'%s/%s' % (room_jid, new_nick),
f'{room_jid}/{new_nick}',
show=status,
status=message)
def _on_error_presence(self, _con, stanza, properties):
def _on_error_presence(self,
_con: types.xmppClient,
stanza: Presence,
properties: PresenceProperties
) -> None:
room_jid = properties.jid.bare
muc_data = self._mucs.get(room_jid)
if muc_data is None:
......@@ -479,7 +518,11 @@ def _on_error_presence(self, _con, stanza, properties):
else:
room.notify('room-presence-error', properties)
def _on_muc_user_presence(self, _con, stanza, properties):
def _on_muc_user_presence(self,
_con: types.xmppClient,
stanza: Presence,
properties: PresenceProperties
) -> None:
if properties.type == PresenceType.ERROR:
return
......@@ -569,7 +612,9 @@ def _on_muc_user_presence(self, _con, stanza, properties):
presence = self._process_user_presence(properties)
occupant.update_presence(presence, properties)
def _process_user_presence(self, properties):
def _process_user_presence(self,
properties: PresenceProperties
) -> MUCPresenceData:
jid = properties.jid
muc_presence = MUCPresenceData.from_presence(properties)
if not muc_presence.available:
......@@ -578,24 +623,24 @@ def _process_user_presence(self, properties):
self._joined_users[jid.bare][jid.resource] = muc_presence
return muc_presence
def _is_user_joined(self, jid):
def _is_user_joined(self, jid: Optional[JID]) -> bool:
try:
self._joined_users[jid.bare][jid.resource]
except KeyError:
return False
return True
def get_joined_users(self, jid):
def get_joined_users(self, jid: str) -> list[str]:
return list(self._joined_users[jid].keys())
def _start_rejoin_timeout(self, room_jid):
def _start_rejoin_timeout(self, room_jid: str) -> None:
self._remove_rejoin_timeout(room_jid)
self._rejoin_muc.add(room_jid)
self._log.info('Start rejoin timeout for: %s', room_jid)
id_ = GLib.timeout_add_seconds(2, self._rejoin, room_jid)
self._rejoin_timeouts[room_jid] = id_
def _remove_rejoin_timeout(self, room_jid):
def _remove_rejoin_timeout(self, room_jid: str) -> None:
self._rejoin_muc.discard(room_jid)
id_ = self._rejoin_timeouts.get(room_jid)
if id_ is not None:
......@@ -603,21 +648,24 @@ def _remove_rejoin_timeout(self, room_jid):
GLib.source_remove(id_)
del self._rejoin_timeouts[room_jid]
def _start_join_timeout(self, room_jid):
def _start_join_timeout(self, room_jid: str) -> None:
self._remove_join_timeout(room_jid)
self._log.info('Start join timeout for: %s', room_jid)
id_ = GLib.timeout_add_seconds(
10, self._fake_subject_change, room_jid)
self._join_timeouts[room_jid] = id_
def _remove_join_timeout(self, room_jid):
def _remove_join_timeout(self, room_jid: str) -> None:
id_ = self._join_timeouts.get(room_jid)
if id_ is not None:
self._log.info('Remove join timeout for: %s', room_jid)
GLib.source_remove(id_)
del self._join_timeouts[room_jid]
def _log_muc_event(self, event_name, properties):
def _log_muc_event(self,
event_name: str,
properties: PresenceProperties
) -> None:
# TODO CURRENTLY NOT USED
if event_name not in ['muc-user-joined',
'muc-user-left',
......@@ -650,7 +698,11 @@ def _log_muc_event(self, event_name, properties):
show=show,
additional_data=additional_data)
def _on_subject_change(self, _con, _stanza, properties):
def _on_subject_change(self,
_con: types.xmppClient,
_stanza: Message,
properties: MessageProperties
) -> None:
if not properties.is_muc_subject:
return
......@@ -675,7 +727,11 @@ def _on_subject_change(self, _con, _stanza, properties):
raise nbxmpp.NodeProcessed
def _on_moderation(self, _con, _stanza, properties):
def _on_moderation(self,
_con: types.xmppClient,
_stanza: Message,
properties: MessageProperties
) -> None:
if not properties.is_moderation:
return
......@@ -689,7 +745,7 @@ def _on_moderation(self, _con, _stanza, properties):
raise nbxmpp.NodeProcessed
def _fake_subject_change(self, room_jid):
def _fake_subject_change(self, room_jid: str) -> None:
# This is for servers which don’t send empty subjects as part of the
# event order on joining a MUC. For example jabber.ru
self._log.warning('Fake subject received for %s', room_jid)
......@@ -697,7 +753,7 @@ def _fake_subject_change(self, room_jid):
room = self._get_contact(room_jid)
room.notify('room-joined')
def _room_join_complete(self, muc_data):
def _room_join_complete(self, muc_data: MUCData):
self._remove_join_timeout(muc_data.jid)
self._set_muc_state(muc_data.jid, MUCJoinedState.JOINED)
self._remove_rejoin_timeout(muc_data.jid)
......@@ -714,7 +770,11 @@ def _room_join_complete(self, muc_data):
self._con.get_module('MAM').request_archive_on_muc_join(
muc_data.jid)
def _on_voice_request(self, _con, _stanza, properties):
def _on_voice_request(self,
_con: types.xmppClient,
_stanza: Message,
properties: MessageProperties
) -> None:
if not properties.is_voice_request:
return
......@@ -723,7 +783,11 @@ def _on_voice_request(self, _con, _stanza, properties):
raise nbxmpp.NodeProcessed
def _on_captcha_challenge(self, _con, _stanza, properties):
def _on_captcha_challenge(self,
_con: types.xmppClient,
_stanza: Message,
properties: MessageProperties
) -> None:
if not properties.is_captcha_challenge:
return
......@@ -753,7 +817,7 @@ def _on_captcha_challenge(self, _con, _stanza, properties):
raise nbxmpp.NodeProcessed
def cancel_captcha(self, room_jid):
def cancel_captcha(self, room_jid: str) -> None:
muc_data = self._mucs.get(room_jid)
if muc_data is None:
return
......@@ -765,13 +829,16 @@ def cancel_captcha(self, room_jid):
self._set_muc_state(room_jid, MUCJoinedState.CAPTCHA_FAILED)
self._set_muc_state(room_jid, MUCJoinedState.NOT_JOINED)
def send_captcha(self, room_jid, form_node):
def send_captcha(self,
room_jid: str,
form_node: SimpleDataForm
) -> None:
self._set_muc_state(room_jid, MUCJoinedState.JOINING)
self._nbxmpp('MUC').send_captcha(room_jid,
form_node,
callback=self._on_captcha_result)
def _on_captcha_result(self, task):
def _on_captcha_result(self, task: Task) -> None:
try:
task.finish()
except StanzaError as error:
......@@ -782,7 +849,11 @@ def _on_captcha_result(self, task):
room = self._get_contact(error.jid)
room.notify('room-captcha-error', error)
def _on_config_change(self, _con, _stanza, properties):
def _on_config_change(self,
_con: types.xmppClient,
_stanza: Message,
properties: MessageProperties
) -> None:
if not properties.is_muc_config_change:
return
......@@ -795,7 +866,11 @@ def _on_config_change(self, _con, _stanza, properties):
raise nbxmpp.NodeProcessed
def _on_invite_or_decline(self, _con, _stanza, properties):
def _on_invite_or_decline(self,
_con: types.xmppClient,
_stanza: Message,
properties: MessageProperties
) -> None:
if properties.muc_decline is not None:
data = properties.muc_decline
if helpers.ignore_contact(self._account, data.from_):
......@@ -830,7 +905,7 @@ def _on_invite_or_decline(self, _con, _stanza, properties):
raise nbxmpp.NodeProcessed
def _on_disco_result_after_invite(self, task):
def _on_disco_result_after_invite(self, task: Task) -> None:
try:
result = task.finish()
except StanzaError as error:
......@@ -843,7 +918,12 @@ def _on_disco_result_after_invite(self, task):
info=result.info,
**invite_data._asdict()))
def invite(self, room, jid, reason=None, continue_=False):
def invite(self,
room: str,
jid: str,
reason: Optional[str] = None,
continue_: bool = False
) -> str:
type_ = InviteType.MEDIATED
contact = self._get_contact(jid)
if contact and contact.supports(Namespace.CONFERENCE):
......@@ -851,12 +931,19 @@ def invite(self, room, jid, reason=None, continue_=False):
password = self._mucs[room].password
self._log.info('Invite %s to %s', jid, room)
return self._nbxmpp('MUC').invite(room, jid, reason, password,
continue_, type_)
def _on_client_state_changed(self, _client, _signal_name, state):
return self._nbxmpp('MUC').invite(
room, jid, reason, password, continue_, type_)
def _on_client_state_changed(self,
_client: types.Client,
_signal_name: str,
state: ClientState
) -> None:
if state.is_disconnected:
self._reset_state()
def _on_client_resume_failed(self, _client, _signal_name):
def _on_client_resume_failed(self,
_client: types.Client,
_signal_name: str
) -> None:
self._reset_state()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment