Skip to content
Snippets Groups Projects
Commit c3d11637 authored by Daniel Brötzmann's avatar Daniel Brötzmann
Browse files

chore: Discovery: Add type annotations

parent d690d291
No related branches found
No related tags found
No related merge requests found
......@@ -14,13 +14,24 @@
# XEP-0030: Service Discovery
from __future__ import annotations
from typing import Optional
from typing import Union
import nbxmpp
from nbxmpp.namespaces import Namespace
from nbxmpp.structs import StanzaHandler
from nbxmpp.errors import StanzaError
from nbxmpp.errors import is_error
from nbxmpp.namespaces import Namespace
from nbxmpp.protocol import Iq
from nbxmpp.protocol import JID
from nbxmpp.structs import DiscoInfo
from nbxmpp.structs import IqProperties
from nbxmpp.structs import StanzaHandler
from nbxmpp.task import Task
from gajim.common import app
from gajim.common import types
from gajim.common.events import ServerDiscoReceived
from gajim.common.events import MucDiscoUpdate
from gajim.common.modules.util import as_task
......@@ -35,7 +46,7 @@ class Discovery(BaseModule):
'disco_items',
]
def __init__(self, con):
def __init__(self, con: types.Client) -> None:
BaseModule.__init__(self, con)
self.handlers = [
......@@ -49,22 +60,22 @@ def __init__(self, con):
ns=Namespace.DISCO_ITEMS),
]
self._account_info = None
self._server_info = None
self._account_info: Optional[DiscoInfo] = None
self._server_info: Optional[DiscoInfo] = None
@property
def account_info(self):
def account_info(self) -> Optional[DiscoInfo]:
return self._account_info
@property
def server_info(self):
def server_info(self) -> Optional[DiscoInfo]:
return self._server_info
def discover_server_items(self):
def discover_server_items(self) -> None:
server = self._con.get_own_jid().domain
self.disco_items(server, callback=self._server_items_received)
def _server_items_received(self, task):
def _server_items_received(self, task: Task) -> None:
try:
result = task.finish()
except StanzaError as error:
......@@ -80,7 +91,7 @@ def _server_items_received(self, task):
continue
self.disco_info(item.jid, callback=self._server_items_info_received)
def _server_items_info_received(self, task):
def _server_items_info_received(self, task: Task) -> None:
try:
result = task.finish()
except StanzaError as error:
......@@ -99,11 +110,11 @@ def _server_items_info_received(self, task):
app.ged.raise_event(ServerDiscoReceived())
def discover_account_info(self):
def discover_account_info(self) -> None:
own_jid = self._con.get_own_jid().bare
self.disco_info(own_jid, callback=self._account_info_received)
def _account_info_received(self, task):
def _account_info_received(self, task: Task) -> None:
try:
result = task.finish()
except StanzaError as error:
......@@ -123,12 +134,12 @@ def _account_info_received(self, task):
self._con.get_module('Caps').update_caps()
def discover_server_info(self):
def discover_server_info(self) -> None:
# Calling this method starts the connect_maschine()
server = self._con.get_own_jid().domain
self.disco_info(server, callback=self._server_info_received)
def _server_info_received(self, task):
def _server_info_received(self, task: Task) -> None:
try:
result = task.finish()
except StanzaError as error:
......@@ -149,7 +160,7 @@ def _server_info_received(self, task):
self._con.connect_machine(restart=True)
def _parse_transports(self, info):
def _parse_transports(self, info: DiscoInfo) -> None:
for identity in info.identities:
if identity.category not in ('gateway', 'headline'):
continue
......@@ -166,7 +177,11 @@ def _parse_transports(self, info):
else:
self._con.available_transports[identity.type] = [jid]
def _answer_disco_items(self, _con, stanza, _properties):
def _answer_disco_items(self,
_con: types.xmppClient,
stanza: Iq,
_properties: IqProperties
) -> None:
from_ = stanza.getFrom()
self._log.info('Answer disco items to %s', from_)
......@@ -183,7 +198,11 @@ def _answer_disco_items(self, _con, stanza, _properties):
self._con.get_module('AdHocCommands').command_list_query(stanza)
raise nbxmpp.NodeProcessed
def _answer_disco_info(self, _con, stanza, _properties):
def _answer_disco_info(self,
_con: types.xmppClient,
stanza: Iq,
_properties: IqProperties
) -> None:
from_ = stanza.getFrom()
self._log.info('Answer disco info %s', from_)
if str(from_).startswith('echo.'):
......@@ -195,9 +214,10 @@ def _answer_disco_info(self, _con, stanza, _properties):
@as_task
def disco_muc(self,
jid,
request_vcard=False,
allow_redirect=False):
jid: Union[JID, str],
request_vcard: bool = False,
allow_redirect: bool = False
):
_task = yield
......@@ -236,7 +256,7 @@ def disco_muc(self,
yield result
@as_task
def disco_contact(self, contact):
def disco_contact(self, contact: types.ContactT):
_task = yield
result = yield self.disco_info(contact.jid)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment