Commit f57bd1ae authored by Philipp Hörist's avatar Philipp Hörist

Add method and tests for entity caps hash computation

parent f9532825
......@@ -49,37 +49,7 @@ class Discovery:
def _disco_info_received(self, stanza):
if not isResultNode(stanza):
return raise_error(log.info, stanza)
idenities = []
features = []
dataforms = []
query = stanza.getQuery()
for node in query.getTags('identity'):
attrs = node.getAttrs()
try:
idenities.append(
DiscoIdentity(category=attrs['category'],
type=attrs['type'],
name=attrs.get('name'),
lang=attrs.get('xml:lang')))
except Exception:
return raise_error(log.warning, stanza, 'stanza-malformed')
for node in query.getTags('feature'):
try:
features.append(node.getAttr('var'))
except Exception:
return raise_error(log.warning, stanza, 'stanza-malformed')
for node in query.getTags('x', namespace=NS_DATA):
dataforms.append(extend_form(node))
return DiscoInfo(jid=stanza.getFrom(),
node=query.getAttr('node'),
identities=idenities,
features=features,
dataforms=dataforms)
return parse_disco_info(stanza)
@call_on_response('_disco_items_received')
def disco_items(self, jid, node=None):
......@@ -90,24 +60,59 @@ class Discovery:
def _disco_items_received(self, stanza):
if not isResultNode(stanza):
return raise_error(log.info, stanza)
return parse_disco_items(stanza)
items = []
query = stanza.getQuery()
for node in query.getTags('item'):
attrs = node.getAttrs()
try:
items.append(
DiscoItem(jid=attrs['jid'],
name=attrs.get('name'),
node=attrs.get('node')))
except Exception:
return raise_error(log.warning, stanza, 'stanza-malformed')
return DiscoItems(jid=stanza.getFrom(),
node=query.getAttr('node'),
items=items)
def parse_disco_info(stanza):
idenities = []
features = []
dataforms = []
query = stanza.getQuery()
for node in query.getTags('identity'):
attrs = node.getAttrs()
try:
idenities.append(
DiscoIdentity(category=attrs['category'],
type=attrs['type'],
name=attrs.get('name'),
lang=attrs.get('xml:lang')))
except Exception:
return raise_error(log.warning, stanza, 'stanza-malformed')
for node in query.getTags('feature'):
try:
features.append(node.getAttr('var'))
except Exception:
return raise_error(log.warning, stanza, 'stanza-malformed')
for node in query.getTags('x', namespace=NS_DATA):
dataforms.append(extend_form(node))
return DiscoInfo(jid=stanza.getFrom(),
node=query.getAttr('node'),
identities=idenities,
features=features,
dataforms=dataforms)
def parse_disco_items(stanza):
items = []
query = stanza.getQuery()
for node in query.getTags('item'):
attrs = node.getAttrs()
try:
items.append(
DiscoItem(jid=attrs['jid'],
name=attrs.get('name'),
node=attrs.get('node')))
except Exception:
return raise_error(log.warning, stanza, 'stanza-malformed')
return DiscoItems(jid=stanza.getFrom(),
node=query.getAttr('node'),
items=items)
def get_disco_request(namespace, jid, node=None):
......
......@@ -664,6 +664,9 @@ class InvalidJid(Exception):
class StanzaMalformed(Exception):
pass
class DiscoInfoMalformed(Exception):
pass
stream_exceptions = {'bad-format': BadFormat,
'bad-namespace-prefix': BadNamespacePrefix,
'conflict': Conflict,
......
......@@ -113,14 +113,35 @@ IBBData = namedtuple('IBBData', 'block_size sid seq type data')
IBBData.__new__.__defaults__ = (None, None, None, None, None)
DiscoInfo = namedtuple('DiscoInfo', 'jid node identities features dataforms')
DiscoIdentity = namedtuple('DiscoIdentity', 'category type name lang')
DiscoIdentity.__new__.__defaults__ = (None, None)
DiscoItems = namedtuple('DiscoItems', 'jid node items')
DiscoItem = namedtuple('DiscoItem', 'jid name node')
DiscoItem.__new__.__defaults__ = (None, None)
class DiscoIdentity(namedtuple('DiscoIdentity', 'category type name lang')):
__slots__ = []
def __new__(cls, category, type, name=None, lang=None):
return super(DiscoIdentity, cls).__new__(cls, category, type, name, lang)
def __eq__(self, other):
return str(self) == str(other)
def __ne__(self, other):
return not self.__eq__(other)
def __str__(self):
return '%s/%s/%s/%s' % (self.category,
self.type,
self.lang or '',
self.name or '')
def __hash__(self):
return hash(str(self))
class AdHocCommand(namedtuple('AdHocCommand', 'jid node name sessionid status data actions notes')):
__slots__ = []
......
......@@ -27,6 +27,7 @@ import precis_i18n.codec
from nbxmpp.protocol import JID
from nbxmpp.protocol import InvalidJid
from nbxmpp.protocol import DiscoInfoMalformed
from nbxmpp.stringprepare import nameprep
from nbxmpp.structs import Properties
from nbxmpp.structs import IqProperties
......@@ -254,3 +255,116 @@ def text_to_color(text, background_color):
bc = 0.2 * bb_inv + 0.8 * blue
return rc, gc, bc
def compute_caps_hash(info):
"""
Compute caps hash according to XEP-0115, V1.5
https://xmpp.org/extensions/xep-0115.html#ver-proc
:param: info DiscoInfo
"""
# Initialize an empty string S.
string_ = ''
# Sort the service discovery identities by category and then by type and
# then by xml:lang (if it exists), formatted as
# CATEGORY '/' [TYPE] '/' [LANG] '/' [NAME]. Note that each slash is
# included even if the LANG or NAME is not included (in accordance with
# XEP-0030, the category and type MUST be included).
# For each identity, append the 'category/type/lang/name' to S, followed by
# the '<' character.
# Sort the supported service discovery features.
def sort_identities_key(i):
return (i.category, i.type, i.lang or '')
identities = sorted(info.identities, key=sort_identities_key)
for identity in identities:
string_ += '%s<' % str(identity)
# If the response includes more than one service discovery identity with
# the same category/type/lang/name, consider the entire response
# to be ill-formed.
if len(set(identities)) != len(identities):
raise DiscoInfoMalformed('Non-unique identity found')
# Sort the supported service discovery features.
# For each feature, append the feature to S, followed by the '<' character.
features = sorted(info.features)
for feature in features:
string_ += '%s<' % feature
# If the response includes more than one service discovery feature with the
# same XML character data, consider the entire response to be ill-formed.
if len(set(features)) != len(features):
raise DiscoInfoMalformed('Non-unique feature found')
# If the response includes more than one extended service discovery
# information form with the same FORM_TYPE or the FORM_TYPE field contains
# more than one <value/> element with different XML character data,
# consider the entire response to be ill-formed.
# If the response includes an extended service discovery information form
# where the FORM_TYPE field is not of type "hidden" or the form does not
# include a FORM_TYPE field, ignore the form but continue processing.
dataforms = []
form_type_values = []
for dataform in info.dataforms:
form_type = dataform.vars.get('FORM_TYPE')
if form_type is None:
# Ignore dataform because of missing FORM_TYPE
continue
if form_type.type_ != 'hidden':
# Ignore dataform because of wrong type
continue
values = form_type.getTags('value')
if len(values) != 1:
raise DiscoInfoMalformed('Form should have exactly '
'one FORM_TYPE value')
value = values[0].getData()
dataforms.append(dataform)
form_type_values.append(value)
if len(set(form_type_values)) != len(form_type_values):
raise DiscoInfoMalformed('Non-unique FORM_TYPE value found')
# If the service discovery information response includes XEP-0128 data
# forms, sort the forms by the FORM_TYPE (i.e., by the XML character data
# of the <value/> element).
# For each extended service discovery information form:
# - Append the XML character data of the FORM_TYPE field's <value/>
# element, followed by the '<' character.
# - Sort the fields by the value of the "var" attribute.
# - For each field other than FORM_TYPE:
# - Append the value of the "var" attribute, followed by the
# '<' character.
# - Sort values by the XML character data of the <value/> element.
# - For each <value/> element, append the XML character data,
# followed by the '<' character.
def sort_dataforms_key(dataform):
return dataform['FORM_TYPE'].getTagData('value')
dataforms = sorted(dataforms, key=sort_dataforms_key)
for dataform in dataforms:
string_ += '%s<' % dataform['FORM_TYPE'].getTagData('value')
fields = {}
for field in dataform.iter_fields():
if field.var == 'FORM_TYPE':
continue
values = field.getTags('value')
fields[field.var] = sorted([value.getData() for value in values])
for var in sorted(fields.keys()):
string_ += '%s<' % var
for value in fields[var]:
string_ += '%s<' % value
hash_ = hashlib.sha1(string_.encode())
return b64encode(hash_.digest())
import unittest
from nbxmpp.util import compute_caps_hash
from nbxmpp.modules.discovery import parse_disco_info
from nbxmpp.protocol import Iq
from nbxmpp.protocol import DiscoInfoMalformed
class EntityCaps(unittest.TestCase):
def test_multiple_field_values(self):
node = """
<iq from='benvolio@capulet.lit/230193' id='disco1' to='juliet@capulet.lit/chamber' type='result'>
<query xmlns='http://jabber.org/protocol/disco#info' node='http://psi-im.org#q07IKJEyjvHSyhy//CH0CxmKi8w='>
<identity xml:lang='en' category='client' name='Psi 0.11' type='pc'/>
<identity xml:lang='el' category='client' name='Ψ 0.11' type='pc'/>
<feature var='http://jabber.org/protocol/caps'/>
<feature var='http://jabber.org/protocol/disco#info'/>
<feature var='http://jabber.org/protocol/disco#items'/>
<feature var='http://jabber.org/protocol/muc'/>
<x xmlns='jabber:x:data' type='result'>
<field var='FORM_TYPE' type='hidden'>
<value>urn:xmpp:dataforms:softwareinfo</value>
</field>
<field var='ip_version'>
<value>ipv4</value>
<value>ipv6</value>
</field>
<field var='os'>
<value>Mac</value>
</field>
<field var='os_version'>
<value>10.5.1</value>
</field>
<field var='software'>
<value>Psi</value>
</field>
<field var='software_version'>
<value>0.11</value>
</field>
</x>
</query>
</iq>"""
info = parse_disco_info(Iq(node=node))
hash_ = compute_caps_hash(info)
self.assertEqual(hash_, 'q07IKJEyjvHSyhy//CH0CxmKi8w=')
def test_ignore_invalid_forms(self):
node = """
<iq from='benvolio@capulet.lit/230193' id='disco1' to='juliet@capulet.lit/chamber' type='result'>
<query xmlns='http://jabber.org/protocol/disco#info' node='http://psi-im.org#q07IKJEyjvHSyhy//CH0CxmKi8w='>
<identity xml:lang='en' category='client' name='Psi 0.11' type='pc'/>
<identity xml:lang='el' category='client' name='Ψ 0.11' type='pc'/>
<feature var='http://jabber.org/protocol/caps'/>
<feature var='http://jabber.org/protocol/disco#info'/>
<feature var='http://jabber.org/protocol/disco#items'/>
<feature var='http://jabber.org/protocol/muc'/>
<x xmlns='jabber:x:data' type='result'>
<field var='FORM_TYPE' type='hidden'>
<value>urn:xmpp:dataforms:softwareinfo</value>
</field>
<field var='ip_version'>
<value>ipv4</value>
<value>ipv6</value>
</field>
<field var='os'>
<value>Mac</value>
</field>
<field var='os_version'>
<value>10.5.1</value>
</field>
<field var='software'>
<value>Psi</value>
</field>
<field var='software_version'>
<value>0.11</value>
</field>
</x>
<x xmlns='jabber:x:data' type='result'>
<field var='FORM_TYPE'>
<value>urn:xmpp:dataforms:softwareinfo</value>
</field>
<field var='ip_version'>
<value>ipv4</value>
<value>ipv6</value>
</field>
</x>
<x xmlns='jabber:x:data' type='result'>
<field var='ip_version'>
<value>ipv4</value>
<value>ipv6</value>
</field>
</x>
</query>
</iq>"""
info = parse_disco_info(Iq(node=node))
hash_ = compute_caps_hash(info)
self.assertEqual(hash_, 'q07IKJEyjvHSyhy//CH0CxmKi8w=')
def test_multiple_form_type_values(self):
node = """
<iq from='benvolio@capulet.lit/230193' id='disco1' to='juliet@capulet.lit/chamber' type='result'>
<query xmlns='http://jabber.org/protocol/disco#info' node='http://psi-im.org#q07IKJEyjvHSyhy//CH0CxmKi8w='>
<identity xml:lang='en' category='client' name='Psi 0.11' type='pc'/>
<identity xml:lang='el' category='client' name='Ψ 0.11' type='pc'/>
<feature var='http://jabber.org/protocol/caps'/>
<feature var='http://jabber.org/protocol/disco#info'/>
<feature var='http://jabber.org/protocol/disco#items'/>
<feature var='http://jabber.org/protocol/muc'/>
<x xmlns='jabber:x:data' type='result'>
<field var='FORM_TYPE' type='hidden'>
<value>urn:xmpp:dataforms:softwareinfo</value>
<value>urn:xmpp:dataforms:softwareinfo_test</value>
</field>
<field var='ip_version'>
<value>ipv4</value>
<value>ipv6</value>
</field>
</x>
</query>
</iq>"""
info = parse_disco_info(Iq(node=node))
with self.assertRaises(DiscoInfoMalformed):
hash_ = compute_caps_hash(info)
def test_non_unique_form_type_value(self):
node = """
<iq from='benvolio@capulet.lit/230193' id='disco1' to='juliet@capulet.lit/chamber' type='result'>
<query xmlns='http://jabber.org/protocol/disco#info' node='http://psi-im.org#q07IKJEyjvHSyhy//CH0CxmKi8w='>
<identity xml:lang='en' category='client' name='Psi 0.11' type='pc'/>
<identity xml:lang='el' category='client' name='Ψ 0.11' type='pc'/>
<feature var='http://jabber.org/protocol/caps'/>
<feature var='http://jabber.org/protocol/disco#info'/>
<feature var='http://jabber.org/protocol/disco#items'/>
<feature var='http://jabber.org/protocol/muc'/>
<x xmlns='jabber:x:data' type='result'>
<field var='FORM_TYPE' type='hidden'>
<value>urn:xmpp:dataforms:softwareinfo</value>
</field>
<field var='ip_version'>
<value>ipv4</value>
<value>ipv6</value>
</field>
</x>
<x xmlns='jabber:x:data' type='result'>
<field var='FORM_TYPE' type='hidden'>
<value>urn:xmpp:dataforms:softwareinfo</value>
</field>
<field var='ip_version'>
<value>ipv4</value>
<value>ipv6</value>
</field>
</x>
</query>
</iq>"""
info = parse_disco_info(Iq(node=node))
with self.assertRaises(DiscoInfoMalformed):
hash_ = compute_caps_hash(info)
def test_non_unique_feature(self):
node = """
<iq from='benvolio@capulet.lit/230193' id='disco1' to='juliet@capulet.lit/chamber' type='result'>
<query xmlns='http://jabber.org/protocol/disco#info' node='http://psi-im.org#q07IKJEyjvHSyhy//CH0CxmKi8w='>
<identity xml:lang='en' category='client' name='Psi 0.11' type='pc'/>
<identity xml:lang='el' category='client' name='Ψ 0.11' type='pc'/>
<feature var='http://jabber.org/protocol/caps'/>
<feature var='http://jabber.org/protocol/muc'/>
<feature var='http://jabber.org/protocol/disco#info'/>
<feature var='http://jabber.org/protocol/disco#items'/>
<feature var='http://jabber.org/protocol/muc'/>
</query>
</iq>"""
info = parse_disco_info(Iq(node=node))
with self.assertRaises(DiscoInfoMalformed):
hash_ = compute_caps_hash(info)
def test_non_unique_identity(self):
node = """
<iq from='benvolio@capulet.lit/230193' id='disco1' to='juliet@capulet.lit/chamber' type='result'>
<query xmlns='http://jabber.org/protocol/disco#info' node='http://psi-im.org#q07IKJEyjvHSyhy//CH0CxmKi8w='>
<identity xml:lang='en' category='client' name='Psi 0.11' type='pc'/>
<identity xml:lang='en' category='client' name='Psi 0.11' type='pc'/>
<feature var='http://jabber.org/protocol/caps'/>
<feature var='http://jabber.org/protocol/muc'/>
<feature var='http://jabber.org/protocol/disco#info'/>
<feature var='http://jabber.org/protocol/disco#items'/>
<feature var='http://jabber.org/protocol/muc'/>
</query>
</iq>"""
info = parse_disco_info(Iq(node=node))
with self.assertRaises(DiscoInfoMalformed):
hash_ = compute_caps_hash(info)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment