Commit cbbc8928 authored by Emmanuel Gil Peyrot's avatar Emmanuel Gil Peyrot
Browse files

Add typing hints to simplexml

parent 442eae7e
Pipeline #5409 failed with stages
in 4 seconds
......@@ -18,14 +18,19 @@ nodes and XML streams. I'm personally using it in many other separate
projects. It is designed to be as standalone as possible
from __future__ import annotations
import logging
import xml.parsers.expat
from xml.parsers.expat import ExpatError
from copy import deepcopy
from typing import Dict, List, Optional, Union, Iterator, Callable, Any
Attrs = Dict[str, str]
log = logging.getLogger('nbxmpp.simplexml')
def XMLescape(txt):
def XMLescape(txt: str) -> str:
Return provided string with symbols & < > " replaced by their respective XML
......@@ -54,10 +59,19 @@ class Node:
on the classes tree).
name: str
namespace: str
attrs: Attrs
data: List[str]
kids: List[Union[Node, str]]
parent: Optional[Node]
nsd: Dict[str, str]
nsp_cache: Dict[Any, Any]
def __init__(self, tag=None, attrs=None, payload=None, parent=None, nsp=None,
node_built=False, node=None):
def __init__(self, tag: Optional[str] = None, attrs: Optional[Attrs] = None, payload: Optional[Union[Node, str, List[Union[Node, str]]]] = None, parent: Optional[Node] = None, nsp: Optional[Dict[Any, Any]] = None,
node_built: bool = False, node: Optional[Union[Node, Any]] = None) -> None:
Takes "tag" argument as the name of node (prepended by namespace, if
needed and separated from it by a space), attrs dictionary as the set of
......@@ -121,7 +135,7 @@ class Node:
def lookup_nsp(self, pfx=''):
def lookup_nsp(self, pfx: str = '') -> str:
ns = self.nsd.get(pfx, None)
if ns is None:
ns = self.nsp_cache.get(pfx, None)
......@@ -133,7 +147,7 @@ class Node:
return ''
return ns
def __str__(self, fancy=0):
def __str__(self, fancy: int = 0) -> str:
Method used to dump node into textual representation. If "fancy" argument
is set to True produces indented output for readability
......@@ -178,7 +192,7 @@ class Node:
s += "\n"
return s
def addChild(self, name=None, attrs=None, payload=None, namespace=None, node=None):
def addChild(self, name: Optional[str] = None, attrs: Optional[Attrs] = None, payload: Optional[List[Any]] = None, namespace: Optional[str] = None, node: Optional[Node] = None) -> Node:
If "node" argument is provided, adds it as child node. Else creates new
node from the other arguments' values and adds it as well
......@@ -199,25 +213,25 @@ class Node:
return newnode
def addData(self, data):
def addData(self, data: Any) -> None:
Add some CDATA to node
def clearData(self):
def clearData(self) -> None:
Remove all CDATA from the node
""" = []
def delAttr(self, key):
def delAttr(self, key: str) -> None:
Delete an attribute "key"
del self.attrs[key]
def delChild(self, node, attrs=None):
def delChild(self, node: Union[Node, str], attrs: Optional[Attrs] = None) -> Optional[Node]:
Delete the "node" from the node's childs list, if "node" is an instance.
Else delete the first node that have specified name and (optionally)
......@@ -225,10 +239,11 @@ class Node:
if not isinstance(node, Node):
node = self.getTag(node, attrs)
assert isinstance(node, Node)
return node
def getAttrs(self, copy=False):
def getAttrs(self, copy: bool = False) -> Attrs:
Return all node's attributes as dictionary
......@@ -236,49 +251,49 @@ class Node:
return deepcopy(self.attrs)
return self.attrs
def getAttr(self, key):
def getAttr(self, key: str) -> Optional[str]:
Return value of specified attribute
return self.attrs.get(key)
def getChildren(self):
def getChildren(self) -> List[Union[Node, str]]:
Return all node's child nodes as list
def getData(self):
def getData(self) -> str:
Return all node CDATA as string (concatenated)
return ''.join(
def getName(self):
def getName(self) -> str:
Return the name of node
def getNamespace(self):
def getNamespace(self) -> str:
Return the namespace of node
return self.namespace
def getParent(self):
def getParent(self) -> Optional[Node]:
Returns the parent of node (if present)
return self.parent
def getPayload(self):
def getPayload(self) -> List[Union[Node, str]]:
Return the payload of node i.e. list of child nodes and CDATA entries.
F.e. for "<node>text1<nodea/><nodeb/> text2</node>" will be returned
list: ['text1', <nodea instance>, <nodeb instance>, ' text2']
ret = []
ret: List[Union[Node, str]] = []
for i in range(len(
......@@ -291,33 +306,35 @@ class Node:
return ret
def getTag(self, name, attrs=None, namespace=None):
def getTag(self, name: str, attrs: Optional[Attrs] = None, namespace: Optional[str] = None) -> Optional[Node]:
Filter all child nodes using specified arguments as filter. Return the
first found or None if not found
return self.getTags(name, attrs, namespace, one=1)
tag = self.getTags(name, attrs, namespace, one=True)
assert not isinstance(tag, list)
return tag
def getTagAttr(self, tag, attr, namespace=None):
def getTagAttr(self, tag: str, attr: str, namespace: Optional[str] = None) -> Optional[str]:
Return attribute value of the child with specified name (or None if no
such attribute)
return self.getTag(tag, namespace=namespace).attrs[attr]
except Exception:
node = self.getTag(tag, namespace=namespace)
if node is None:
return None
return node.getAttr(attr)
def getTagData(self, tag):
def getTagData(self, tag: str) -> Optional[str]:
Return cocatenated CDATA of the child with specified name
return self.getTag(tag).getData()
except Exception:
node = self.getTag(tag)
if node is None:
return None
return node.getData()
def getTags(self, name, attrs=None, namespace=None, one=0):
def getTags(self, name: str, attrs: Optional[Attrs] = None, namespace: Optional[str] = None, one: bool = False) -> Union[List[Node], Node, None]:
Filter all child nodes using specified arguments as filter. Returns the
list of nodes found
......@@ -340,7 +357,7 @@ class Node:
return nodes
return None
def iterTags(self, name, attrs=None, namespace=None):
def iterTags(self, name: str, attrs: Optional[Attrs] = None, namespace: Optional[str] = None) -> Iterator[Node]:
Iterate over all children using specified arguments as filter
......@@ -357,31 +374,31 @@ class Node:
yield node
def setAttr(self, key, val):
def setAttr(self, key: str, val: str) -> None:
Set attribute "key" with the value "val"
self.attrs[key] = val
def setData(self, data):
def setData(self, data: Any) -> None:
Set node's CDATA to provided string. Resets all previous CDATA!
""" = [str(data)]
def setName(self, val):
def setName(self, val: str) -> None:
Change the node name
""" = val
def setNamespace(self, namespace):
def setNamespace(self, namespace: str) -> None:
Changes the node namespace
self.namespace = namespace
def setParent(self, node):
def setParent(self, node: Node) -> None:
Set node's parent to "node". WARNING: do not checks if the parent already
present and not removes the node from the list of childs of previous
......@@ -389,7 +406,7 @@ class Node:
self.parent = node
def setPayload(self, payload, add=0):
def setPayload(self, payload: Union[List[Union[Node, str]], Node, str], add: bool = False) -> None:
Set node payload according to the list specified. WARNING: completely
replaces all node's previous content. If you wish just to add child or
......@@ -402,17 +419,17 @@ class Node:
else: = payload
def setTag(self, name, attrs=None, namespace=None):
def setTag(self, name: str, attrs: Optional[Attrs] = None, namespace: Optional[str] = None) -> Node:
Same as getTag but if the node with specified namespace/attributes not
found, creates such node and returns it
node = self.getTags(name, attrs, namespace=namespace, one=1)
node = self.getTags(name, attrs, namespace=namespace, one=True)
if node:
return node
return self.addChild(name, attrs, namespace=namespace)
def setTagAttr(self, tag, attr, val, namespace=None):
def setTagAttr(self, tag: str, attr: str, val: str, namespace: Optional[str] = None) -> None:
Create new node (if not already present) with name "tag" and set it's
attribute "attr" to value "val"
......@@ -422,7 +439,7 @@ class Node:
except Exception:
self.addChild(tag, namespace=namespace, attrs={attr: val})
def setTagData(self, tag, val, attrs=None):
def setTagData(self, tag: str, val: str, attrs: Optional[Attrs] = None) -> None:
Creates new node (if not already present) with name "tag" and
(optionally) attributes "attrs" and sets it's CDATA to string "val"
......@@ -432,7 +449,7 @@ class Node:
except Exception:
self.addChild(tag, attrs, payload = [str(val)])
def getXmlLang(self):
def getXmlLang(self) -> Optional[str]:
lang = self.attrs.get('xml:lang')
if lang is not None:
return lang
......@@ -441,37 +458,37 @@ class Node:
return self.parent.getXmlLang()
return None
def has_attr(self, key):
def has_attr(self, key: str) -> bool:
Check if node have attribute "key"
return key in self.attrs
def __getitem__(self, item):
def __getitem__(self, item: str) -> Optional[str]:
Return node's attribute "item" value
return self.getAttr(item)
def __setitem__(self, item, val):
def __setitem__(self, item: str, val: str) -> None:
Set node's attribute "item" value
return self.setAttr(item, val)
self.setAttr(item, val)
def __delitem__(self, item):
def __delitem__(self, item: str) -> None:
Delete node's attribute "item"
return self.delAttr(item)
def __contains__(self, item):
def __contains__(self, item: str) -> bool:
Check if node has attribute "item"
return self.has_attr(item)
def __getattr__(self, attr):
def __getattr__(self, attr: str) -> Union['T', 'NT']:
Reduce memory usage caused by T/NT classes - use memory only when needed
......@@ -533,8 +550,23 @@ class NodeBuilder:
XML handler
def __init__(self, data=None, initial_node=None,
dispatch_depth=1, finished=True):
_parser: Any
Parse: Callable[[str, bool], None]
__depth: int
__last_depth: int
__max_depth: int
_dispatch_depth: int
_document_attrs: Optional[Attrs]
_document_nsp: Optional[Dict[str, str]]
_mini_dom: Optional[Node]
last_is_data: bool
_ptr: Optional[Node]
data_buffer: Optional[List[str]]
streamError: str
_is_stream: bool
def __init__(self, data: Optional[str] = None, initial_node: Optional[Node] = None,
dispatch_depth: int = 1, finished: bool = True) -> None:
Take two optional parameters: "data" and "initial_node"
......@@ -565,7 +597,7 @@ class NodeBuilder:
self._document_attrs = None
self._document_nsp = None
self._mini_dom = initial_node
self.last_is_data = 1
self.last_is_data = True
self._ptr = None
self.data_buffer = None
self.streamError = ''
......@@ -573,13 +605,13 @@ class NodeBuilder:
if data:
self._parser.Parse(data, finished)
def check_data_buffer(self):
def check_data_buffer(self) -> None:
if self.data_buffer:''.join(self.data_buffer))
del self.data_buffer[:]
self.data_buffer = None
def destroy(self):
def destroy(self) -> None:
Method used to allow class instance to be garbage-collected
......@@ -589,7 +621,7 @@ class NodeBuilder:
self._parser.CharacterDataHandler = None
self._parser.StartNamespaceDeclHandler = None
def starttag(self, tag, attrs):
def starttag(self, tag: str, attrs: Attrs) -> None:
XML Parser callback. Used internally
......@@ -627,9 +659,9 @@ class NodeBuilder:
raise ValueError(str(e))
if not self.last_is_data and self._ptr.parent:'')
self.last_is_data = 0
self.last_is_data = False
def _check_stream_start(self, ns, tag):
def _check_stream_start(self, ns: str, tag: str) -> None:
if self._is_stream:
if ns != '' or tag != 'stream':
raise ValueError('Incorrect stream start: (%s,%s). Terminating.'
......@@ -637,7 +669,7 @@ class NodeBuilder:
def endtag(self, tag):
def endtag(self, tag: str) -> None:
XML Parser callback. Used internally
......@@ -656,70 +688,70 @@ class NodeBuilder:
log.debug("Got higher than dispatch level. Stream terminated?")
self.last_is_data = 0
self.last_is_data = False
if self.__depth == 0:
def handle_cdata(self, data):
def handle_cdata(self, data: str) -> None:
if self.last_is_data:
if self.data_buffer:
elif self._ptr:
self.data_buffer = [data]
self.last_is_data = 1
self.last_is_data = True
def handle_invalid_xmpp_element(*args):
def handle_invalid_xmpp_element(*args: Any) -> None:
raise ExpatError('Found invalid xmpp stream element: %s' % str(args))
def handle_namespace_start(self, _prefix, _uri):
def handle_namespace_start(self, _prefix: str, _uri: str) -> None:
XML Parser callback. Used internally
def getDom(self):
def getDom(self) -> Optional[Node]:
Return just built Node
return self._mini_dom
def dispatch(self, stanza):
def dispatch(self, stanza: Any) -> None:
Get called when the NodeBuilder reaches some level of depth on it's way
up with the built node as argument. Can be redefined to convert incoming
XML stanzas to program events
def stream_header_received(self):
def stream_header_received(self) -> None:
Method called when stream just opened
def stream_footer_received(self):
def stream_footer_received(self) -> None:
Method called when stream just closed
def has_received_endtag(self, level=0):
def has_received_endtag(self, level: int = 0) -> bool:
Return True if at least one end tag was seen (at level)
return self.__depth <= level < self.__max_depth
def _inc_depth(self):
def _inc_depth(self) -> None:
self.__last_depth = self.__depth
self.__depth += 1
self.__max_depth = max(self.__depth, self.__max_depth)
def _dec_depth(self):
def _dec_depth(self) -> None:
self.__last_depth = self.__depth
self.__depth -= 1
def XML2Node(xml):
def XML2Node(xml: str) -> Optional[Node]:
Convert supplied textual string into XML node. Handy f.e. for reading
configuration file. Raises xml.parser.expat.parsererror if provided string
......@@ -727,7 +759,7 @@ def XML2Node(xml):
return NodeBuilder(xml).getDom()
def BadXML2Node(xml):
def BadXML2Node(xml: str) -> Optional[Node]:
Convert supplied textual string into XML node. Survives if xml data is
cutted half way round. I.e. "<html>some text <br>some more text". Will raise
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment