diff --git a/gajim/common/modules/adhoc_commands.py b/gajim/common/modules/adhoc_commands.py index 6284d19e9bedaaf2a4a1f6b791a42fe2c38bc0aa..efdd5da64a15939f6d018c1694bc07595e12063d 100644 --- a/gajim/common/modules/adhoc_commands.py +++ b/gajim/common/modules/adhoc_commands.py @@ -19,14 +19,23 @@ # You should have received a copy of the GNU General Public License # along with Gajim. If not, see <http://www.gnu.org/licenses/>. +from __future__ import annotations + +from typing import Callable +from typing import Optional +from typing import Type + import nbxmpp from nbxmpp.namespaces import Namespace +from nbxmpp.protocol import Iq +from nbxmpp.structs import IqProperties from nbxmpp.structs import StanzaHandler from nbxmpp.modules import dataforms from nbxmpp.util import generate_id from gajim.common import app from gajim.common import helpers +from gajim.common import types from gajim.common.events import AdHocCommandError from gajim.common.events import AdHocCommandActionResponse from gajim.common.i18n import _ @@ -39,7 +48,7 @@ class AdHocCommand: commandfeatures = (Namespace.DATA,) @staticmethod - def is_visible_for(_samejid): + def is_visible_for(_samejid: bool) -> bool: """ This returns True if that command should be visible and invocable for others @@ -49,13 +58,21 @@ def is_visible_for(_samejid): """ return True - def __init__(self, conn, jid, sessionid): + def __init__(self, + conn: types.Client, + jid: str, + sessionid: str + ) -> None: self.connection = conn self.jid = jid self.sessionid = sessionid - def build_response(self, request, status='executing', defaultaction=None, - actions=None): + def build_response(self, + request: Iq, + status: str = 'executing', + defaultaction: Optional[str] = None, + actions: Optional[list[str]] = None + ) -> tuple[Iq, nbxmpp.Node]: assert status in ('executing', 'completed', 'canceled') response = request.buildReply('result') @@ -74,11 +91,11 @@ def build_response(self, request, status='executing', defaultaction=None, cmd.addChild('actions', attrs, actions) return response, cmd - def bad_request(self, stanza): + def bad_request(self, stanza: Iq) -> None: self.connection.connection.send( nbxmpp.Error(stanza, Namespace.STANZAS + ' bad-request')) - def cancel(self, request): + def cancel(self, request: Iq) -> bool: response = self.build_response(request, status='canceled')[0] self.connection.connection.send(response) return False # finish the session @@ -88,21 +105,25 @@ class ChangeStatusCommand(AdHocCommand): commandnode = 'change-status' commandname = _('Change status information') - def __init__(self, conn, jid, sessionid): + def __init__(self, + conn: types.Client, + jid: str, + sessionid: str + ) -> None: AdHocCommand.__init__(self, conn, jid, sessionid) - self._callback = self.first_step + self._callback: Callable[..., bool] = self.first_step @staticmethod - def is_visible_for(samejid): + def is_visible_for(samejid: bool) -> bool: """ Change status is visible only if the entity has the same bare jid """ return samejid - def execute(self, request): + def execute(self, request: Iq): return self._callback(request) - def first_step(self, request): + def first_step(self, request: Iq) -> bool: # first query... response, cmd = self.build_response(request, defaultaction='execute', @@ -141,7 +162,7 @@ def first_step(self, request): return True # keep the session - def second_step(self, request): + def second_step(self, request: Iq) -> bool: # check if the data is correct try: form = dataforms.SimpleDataForm( @@ -190,7 +211,7 @@ class AdHocCommands(BaseModule): 'execute_command', ] - def __init__(self, con): + def __init__(self, con: types.Client) -> None: BaseModule.__init__(self, con) self.handlers = [ @@ -201,24 +222,24 @@ def __init__(self, con): ] # a list of all commands exposed: node -> command class - self._commands = {} + self._commands: dict[str, Type[AdHocCommand]] = {} if app.settings.get('remote_commands'): for cmdobj in (ChangeStatusCommand,): self._commands[cmdobj.commandnode] = cmdobj # a list of sessions; keys are tuples (jid, sessionid, node) - self._sessions = {} + self._sessions: dict[tuple[str, str, str], AdHocCommand] = {} - def get_own_bare_jid(self): + def get_own_bare_jid(self) -> str: return self._con.get_own_jid().bare - def is_same_jid(self, jid): + def is_same_jid(self, jid: str) -> bool: """ Test if the bare jid given is the same as our bare jid """ return nbxmpp.JID.from_string(jid).bare == self.get_own_bare_jid() - def command_list_query(self, stanza): + def command_list_query(self, stanza: Iq) -> None: iq = stanza.buildReply('result') jid = helpers.get_full_jid_from_iq(stanza) query = iq.getTag('query') @@ -235,7 +256,7 @@ def command_list_query(self, stanza): self._con.connection.send(iq) - def command_info_query(self, stanza): + def command_info_query(self, stanza: Iq) -> bool: """ Send disco#info result for query for command (XEP-0050, example 6.). Return True if the result was sent, False if not @@ -268,7 +289,7 @@ def command_info_query(self, stanza): return False - def command_items_query(self, stanza): + def command_items_query(self, stanza: Iq) -> bool: """ Send disco#items result for query for command. Return True if the result was sent, False if not. @@ -287,7 +308,11 @@ def command_items_query(self, stanza): return False - def _execute_command_received(self, _con, stanza, _properties): + def _execute_command_received(self, + _con: types.xmppClient, + stanza: Iq, + _properties: IqProperties + ) -> None: jid = helpers.get_full_jid_from_iq(stanza) cmd = stanza.getTag('command') @@ -364,8 +389,13 @@ def _execute_command_received(self, _con, stanza, _properties): raise nbxmpp.NodeProcessed - def send_command(self, jid, node, session_id, - form, action='execute'): + def send_command(self, + jid: str, + node: str, + session_id: str, + form: dataforms.SimpleDataForm, + action: str = 'execute' + ) -> None: """ Send the command with data form. Wait for reply """ @@ -386,7 +416,10 @@ def send_command(self, jid, node, session_id, self._con.connection.SendAndCallForResponse( stanza, self._action_response_received) - def _action_response_received(self, _nbxmpp_client, stanza): + def _action_response_received(self, + _nbxmpp_client: types.xmppClient, + stanza: Iq + ) -> None: if not nbxmpp.isResultNode(stanza): self._log.info('Error: %s', stanza.getError()) @@ -399,7 +432,11 @@ def _action_response_received(self, _nbxmpp_client, stanza): app.ged.raise_event( AdHocCommandActionResponse(conn=self._con, command=command)) - def send_cancel(self, jid, node, session_id): + def send_cancel(self, + jid: str, + node: str, + session_id: str + ) -> None: """ Send the command with action='cancel' """ @@ -415,7 +452,10 @@ def send_cancel(self, jid, node, session_id): self._con.connection.SendAndCallForResponse( stanza, self._cancel_result_received) - def _cancel_result_received(self, _nbxmpp_client, stanza): + def _cancel_result_received(self, + _nbxmpp_client: types.xmppClient, + stanza: Iq + ) -> None: if not nbxmpp.isResultNode(stanza): self._log.warning('Error: %s', stanza.getError()) else: