Commit c7597a9e authored by Philipp Hörist's avatar Philipp Hörist

Refactor validating JIDs

- Validate while parsing into JID
- Add dedicated methods for the different JID parts
- Follow RTC7622 exactly
- Add unit tests
parent 54d8fea2
......@@ -20,9 +20,13 @@ sub- stanzas) handling routines
import time
import hashlib
import socket
from base64 import b64encode
from .simplexml import Node, NodeBuilder
from precis_i18n import get_profile
from nbxmpp.stringprepare import nameprep
from nbxmpp.simplexml import Node
from nbxmpp.simplexml import NodeBuilder
def ascii_upper(s):
return s.upper()
......@@ -503,6 +507,8 @@ _status_conditions = {
'removed-shutdown': 332,
}
_localpart_disallowed_chars = set('"&\'/:<>@')
STREAM_NOT_AUTHORIZED = 'urn:ietf:params:xml:ns:xmpp-streams not-authorized'
STREAM_REMOTE_CONNECTION_FAILED = 'urn:ietf:params:xml:ns:xmpp-streams remote-connection-failed'
......@@ -661,6 +667,30 @@ class InvalidStanza(Exception):
class InvalidJid(Exception):
pass
class LocalpartByteLimit(InvalidJid):
def __init__(self):
InvalidJid.__init__(self, 'Localpart must be between 1 and 1023 bytes')
class LocalpartNotAllowedChar(InvalidJid):
def __init__(self):
InvalidJid.__init__(self, 'Not allowed character in localpart')
class ResourcepartByteLimit(InvalidJid):
def __init__(self):
InvalidJid.__init__(self, 'Resourcepart must be between 1 and 1023 bytes')
class ResourcepartNotAllowedChar(InvalidJid):
def __init__(self):
InvalidJid.__init__(self, 'Not allowed character in resourcepart')
class DomainpartByteLimit(InvalidJid):
def __init__(self):
InvalidJid.__init__(self, 'Domainpart must be between 1 and 1023 bytes')
class DomainpartNotAllowedChar(InvalidJid):
def __init__(self):
InvalidJid.__init__(self, 'Not allowed character in domainpart')
class StanzaMalformed(Exception):
pass
......@@ -693,6 +723,84 @@ stream_exceptions = {'bad-format': BadFormat,
'xml-not-well-formed': XMLNotWellFormed}
def parse_jid(jid):
# https://tools.ietf.org/html/rfc7622#section-3.2
# Remove any portion from the first '/' character to the end of the
# string (if there is a '/' character present).
# Remove any portion from the beginning of the string to the first
# '@' character (if there is an '@' character present).
if jid.find('/') != -1:
rest, resourcepart = jid.split('/', 1)
resourcepart = validate_resourcepart(resourcepart)
else:
rest, resourcepart = jid, ''
if rest.find('@') != -1:
localpart, domainpart = rest.split('@', 1)
localpart = validate_localpart(localpart)
else:
localpart, domainpart = '', rest
domainpart = validate_domainpart(domainpart)
return localpart, domainpart, resourcepart
def validate_localpart(localpart):
if not localpart or len(localpart.encode()) > 1023:
raise LocalpartByteLimit
if _localpart_disallowed_chars & set(localpart):
raise LocalpartNotAllowedChar
try:
username = get_profile('UsernameCaseMapped')
return username.enforce(localpart)
except Exception:
raise LocalpartNotAllowedChar
def validate_resourcepart(resourcepart):
if not resourcepart or len(resourcepart.encode()) > 1023:
raise ResourcepartByteLimit
try:
opaque = get_profile('OpaqueString')
return opaque.enforce(resourcepart)
except Exception:
raise ResourcepartNotAllowedChar
def validate_domainpart(domainpart):
# Check if this is a IPV4 address
try:
socket.inet_aton(domainpart)
return domainpart
except Exception:
pass
# Check if this is a IPV6 address
if domainpart.startswith('[') and domainpart.endswith(']'):
try:
socket.inet_pton(socket.AF_INET6, domainpart.strip('[]'))
return domainpart
except Exception:
pass
if not domainpart or len(domainpart.encode()) > 1023:
raise DomainpartByteLimit
if domainpart.endswith('.'): # RFC7622, 3.2
domainpart = domainpart[:-1]
try:
return nameprep.prepare(domainpart)
except Exception:
raise DomainpartNotAllowedChar
class JID:
"""
JID can be built from string, modified, compared, serialised into string
......@@ -714,14 +822,7 @@ class JID:
elif domain:
self.node, self.domain, self.resource = node, domain, resource
else:
if jid.find('@') + 1:
self.node, jid = jid.split('@', 1)
else:
self.node = ''
if jid.find('/')+1:
self.domain, self.resource = jid.split('/', 1)
else:
self.domain, self.resource = jid, ''
self.node, self.domain, self.resource = parse_jid(jid)
def getNode(self):
"""
......
import unittest
from nbxmpp.protocol import LocalpartByteLimit
from nbxmpp.protocol import LocalpartNotAllowedChar
from nbxmpp.protocol import ResourcepartByteLimit
from nbxmpp.protocol import ResourcepartNotAllowedChar
from nbxmpp.protocol import DomainpartByteLimit
from nbxmpp.protocol import DomainpartNotAllowedChar
from nbxmpp.protocol import JID
class JIDParsing(unittest.TestCase):
def test_valid_jids(self):
tests = [
'juliet@example.com',
'juliet@example.com/foo',
'juliet@example.com/foo bar',
'juliet@example.com/foo@bar',
'foo\\20bar@example.com',
'fussball@example.com',
'fu\U000000DFball@example.com',
'\U000003C0@example.com',
'\U000003A3@example.com/foo',
'\U000003C3@example.com/foo',
'\U000003C2@example.com/foo',
'king@example.com/\U0000265A',
'example.com',
'example.com/foobar',
'a.example.com/b@example.net',
]
for jid in tests:
JID(jid)
def test_invalid_jids(self):
tests = [
('"juliet"@example.com', LocalpartNotAllowedChar),
('foo bar@example.com', LocalpartNotAllowedChar),
('henry\U00002163@example.com', LocalpartNotAllowedChar),
('@example.com', LocalpartByteLimit),
('user@example.com/', ResourcepartByteLimit),
('user@example.com/\U00000001', ResourcepartNotAllowedChar),
('\U0000265A@example.com', LocalpartNotAllowedChar),
('user@host@example.com', DomainpartNotAllowedChar),
('juliet@', DomainpartByteLimit),
('/foobar', DomainpartByteLimit),
]
for jid, exception in tests:
with self.assertRaises(exception):
JID(jid)
def test_ip_literals(self):
tests = [
('juliet@[2002:4559:1FE2::4559:1FE2]/res'),
('juliet@123.123.123.123/res'),
]
for jid in tests:
JID(jid)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment