Commit a1589a5e authored by Philipp Hörist's avatar Philipp Hörist

Tests: Add tests for pubsub notification parsing

parent 024d1363
Pipeline #4705 passed with stages
in 28 seconds
STREAM_START = "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' xmlns='jabber:client'>"
import unittest
from unittest.mock import Mock
from test.lib.const import STREAM_START
from nbxmpp import dispatcher
from nbxmpp.protocol import NS_CLIENT
from nbxmpp.protocol import JID
class StanzaHandlerTest(unittest.TestCase):
def setUp(self):
self.dispatcher = dispatcher.XMPPDispatcher()
# Setup mock client
self.client = Mock()
self.client.get_bound_jid.return_value = JID('test@test.test')
self.client.defaultNamespace = NS_CLIENT
self.client.Connection = Mock() # mock transport
self.con = self.client.Connection
self.dispatcher.PlugIn(self.client)
# Simulate that we have established a connection
self.dispatcher.StreamInit()
self.dispatcher.ProcessNonBlocking(STREAM_START)
from test.lib.util import StanzaHandlerTest
from nbxmpp.protocol import NS_PUBSUB_EVENT
from nbxmpp.structs import StanzaHandler
from nbxmpp.structs import ActivityData
from nbxmpp.structs import PubSubEventData
class ActivityTest(StanzaHandlerTest):
def test_activity_parsing(self):
def _on_message(_con, _stanza, properties):
data = ActivityData(activity='relaxing',
subactivity='partying',
text='My nurse\'s birthday!')
pubsub_event = PubSubEventData(
node='http://jabber.org/protocol/activity',
id='b5ac48d0-0f9c-11dc-8754-001143d5d5db',
item=None,
data=data,
deleted=False,
retracted=False,
purged=False)
# We cant compare Node objects
pubsub_event_ = properties.pubsub_event._replace(item=None)
self.assertEqual(pubsub_event, pubsub_event_)
event = '''
<message from='test@test.test'>
<event xmlns='http://jabber.org/protocol/pubsub#event'>
<items node='http://jabber.org/protocol/activity'>
<item id='b5ac48d0-0f9c-11dc-8754-001143d5d5db'>
<activity xmlns='http://jabber.org/protocol/activity'>
<relaxing>
<partying/>
</relaxing>
<text xml:lang='en'>My nurse&apos;s birthday!</text>
</activity>
</item>
</items>
</event>
</message>
'''
self.dispatcher.RegisterHandler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT))
self.dispatcher.ProcessNonBlocking(event)
from test.lib.util import StanzaHandlerTest
from nbxmpp.protocol import NS_PUBSUB_EVENT
from nbxmpp.structs import StanzaHandler
from nbxmpp.structs import AvatarMetaData
from nbxmpp.structs import PubSubEventData
class AvatarTest(StanzaHandlerTest):
def test_avatar_parsing(self):
def _on_message(_con, _stanza, properties):
data = AvatarMetaData(bytes='12345',
height='64',
width='64',
id='111f4b3c50d7b0df729d299bc6f8e9ef9066971f',
type='image/png',
url='http://avatars.example.org/happy.gif')
pubsub_event = PubSubEventData(
node='urn:xmpp:avatar:metadata',
id='111f4b3c50d7b0df729d299bc6f8e9ef9066971f',
item=None,
data=data,
deleted=False,
retracted=False,
purged=False)
# We cant compare Node objects
pubsub_event_ = properties.pubsub_event._replace(item=None)
self.assertEqual(pubsub_event, pubsub_event_)
event = '''
<message from='test@test.test'>
<event xmlns='http://jabber.org/protocol/pubsub#event'>
<items node='urn:xmpp:avatar:metadata'>
<item id='111f4b3c50d7b0df729d299bc6f8e9ef9066971f'>
<metadata xmlns='urn:xmpp:avatar:metadata'>
<info bytes='12345'
height='64'
id='111f4b3c50d7b0df729d299bc6f8e9ef9066971f'
type='image/png'
width='64'
url='http://avatars.example.org/happy.gif'/>
</metadata>
</item>
</items>
</event>
</message>
'''
self.dispatcher.RegisterHandler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT))
self.dispatcher.ProcessNonBlocking(event)
from test.lib.util import StanzaHandlerTest
from nbxmpp.protocol import NS_PUBSUB_EVENT
from nbxmpp.protocol import JID
from nbxmpp.structs import StanzaHandler
from nbxmpp.structs import BookmarkData
from nbxmpp.structs import PubSubEventData
class BookmarkTest(StanzaHandlerTest):
def test_bookmark_1_parsing(self):
def _on_message(_con, _stanza, properties):
data = [
BookmarkData(jid=JID('theplay@conference.shakespeare.lit'),
name='The Play\'s the Thing',
autojoin=True,
password='pass',
nick='JC'),
BookmarkData(jid=JID('second@conference.shakespeare.lit'),
name='Second room',
autojoin=False,
password=None,
nick=None)
]
pubsub_event = PubSubEventData(
node='storage:bookmarks',
id='current',
item=None,
data=data,
deleted=False,
retracted=False,
purged=False)
# We cant compare Node objects
pubsub_event_ = properties.pubsub_event._replace(item=None)
self.assertEqual(pubsub_event, pubsub_event_)
event = '''
<message from='test@test.test'>
<event xmlns='http://jabber.org/protocol/pubsub#event'>
<items node='storage:bookmarks'>
<item id='current'>
<storage xmlns='storage:bookmarks'>
<conference name='The Play&apos;s the Thing'
autojoin='true'
jid='theplay@conference.shakespeare.lit'>
<password>pass</password>
<nick>JC</nick>
</conference>
<conference name='Second room'
autojoin='0'
jid='second@conference.shakespeare.lit'>
</conference>
</storage>
</item>
</items>
</event>
</message>
'''
self.dispatcher.RegisterHandler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT))
self.dispatcher.ProcessNonBlocking(event)
def test_bookmark_2_parsing(self):
def _on_message(_con, _stanza, properties):
data = BookmarkData(jid=JID('theplay@conference.shakespeare.lit'),
name='The Play\'s the Thing',
autojoin=True,
password=None,
nick='JC')
pubsub_event = PubSubEventData(
node='urn:xmpp:bookmarks:0',
id='theplay@conference.shakespeare.lit',
item=None,
data=data,
deleted=False,
retracted=False,
purged=False)
# We cant compare Node objects
pubsub_event_ = properties.pubsub_event._replace(item=None)
self.assertEqual(pubsub_event, pubsub_event_)
event = '''
<message from='test@test.test'>
<event xmlns='http://jabber.org/protocol/pubsub#event'>
<items node='urn:xmpp:bookmarks:0'>
<item id='theplay@conference.shakespeare.lit'>
<conference xmlns='urn:xmpp:bookmarks:0'
name='The Play&apos;s the Thing'
autojoin='1'>
<nick>JC</nick>
</conference>
</item>
</items>
</event>
</message>
'''
self.dispatcher.RegisterHandler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT))
self.dispatcher.ProcessNonBlocking(event)
from test.lib.util import StanzaHandlerTest
from nbxmpp.protocol import NS_PUBSUB_EVENT
from nbxmpp.structs import StanzaHandler
from nbxmpp.structs import LocationData
from nbxmpp.structs import PubSubEventData
class LocationTest(StanzaHandlerTest):
def test_location_parsing(self):
def _on_message(_con, _stanza, properties):
data = LocationData(accuracy='20',
alt='1609',
altaccuracy='10',
area='Central Park',
bearing='12.33',
building='The Empire State Building',
country='United States',
countrycode='US',
datum='Some datum',
description='Bill\'s house',
error='290.8882087',
floor='102',
lat='39.75',
locality='New York City',
lon='-104.99',
postalcode='10118',
region='New York',
room='Observatory',
speed='52.69',
street='350 Fifth Avenue / 34th and Broadway',
text='Northwest corner of the lobby',
timestamp='2004-02-19T21:12Z',
tzo='-07:00',
uri='http://www.nyc.com/')
pubsub_event = PubSubEventData(
node='http://jabber.org/protocol/geoloc',
id='d81a52b8-0f9c-11dc-9bc8-001143d5d5db',
item=None,
data=data,
deleted=False,
retracted=False,
purged=False)
# We cant compare Node objects
pubsub_event_ = properties.pubsub_event._replace(item=None)
self.assertEqual(pubsub_event, pubsub_event_)
event = '''
<message from='test@test.test'>
<event xmlns='http://jabber.org/protocol/pubsub#event'>
<items node='http://jabber.org/protocol/geoloc'>
<item id='d81a52b8-0f9c-11dc-9bc8-001143d5d5db'>
<geoloc xmlns='http://jabber.org/protocol/geoloc' xml:lang='en'>
<accuracy>20</accuracy>
<alt>1609</alt>
<altaccuracy>10</altaccuracy>
<area>Central Park</area>
<bearing>12.33</bearing>
<building>The Empire State Building</building>
<country>United States</country>
<countrycode>US</countrycode>
<datum>Some datum</datum>
<description>Bill's house</description>
<error>290.8882087</error>
<floor>102</floor>
<lat>39.75</lat>
<locality>New York City</locality>
<lon>-104.99</lon>
<postalcode>10118</postalcode>
<region>New York</region>
<room>Observatory</room>
<speed>52.69</speed>
<street>350 Fifth Avenue / 34th and Broadway</street>
<text>Northwest corner of the lobby</text>
<timestamp>2004-02-19T21:12Z</timestamp>
<tzo>-07:00</tzo>
<uri>http://www.nyc.com/</uri>
</geoloc>
</item>
</items>
</event>
</message>
'''
self.dispatcher.RegisterHandler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT))
self.dispatcher.ProcessNonBlocking(event)
from test.lib.util import StanzaHandlerTest
from nbxmpp.protocol import NS_PUBSUB_EVENT
from nbxmpp.structs import StanzaHandler
from nbxmpp.structs import MoodData
from nbxmpp.structs import PubSubEventData
class MoodTest(StanzaHandlerTest):
def test_mood_parsing(self):
def _on_message(_con, _stanza, properties):
data = MoodData(mood='annoyed', text='curse my nurse!')
pubsub_event = PubSubEventData(
node='http://jabber.org/protocol/mood',
id='a475804a-0f9c-11dc-98a8-001143d5d5db',
item=None,
data=data,
deleted=False,
retracted=False,
purged=False)
# We cant compare Node objects
pubsub_event_ = properties.pubsub_event._replace(item=None)
self.assertEqual(pubsub_event, pubsub_event_)
event = '''
<message from='test@test.test'>
<event xmlns='http://jabber.org/protocol/pubsub#event'>
<items node='http://jabber.org/protocol/mood'>
<item id='a475804a-0f9c-11dc-98a8-001143d5d5db'>
<mood xmlns='http://jabber.org/protocol/mood'>
<annoyed/>
<text>curse my nurse!</text>
</mood>
</item>
</items>
</event>
</message>
'''
self.dispatcher.RegisterHandler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT))
self.dispatcher.ProcessNonBlocking(event)
from test.lib.util import StanzaHandlerTest
from nbxmpp.protocol import NS_PUBSUB_EVENT
from nbxmpp.structs import StanzaHandler
from nbxmpp.structs import PubSubEventData
class PubsubTest(StanzaHandlerTest):
def test_purge_event(self):
def _on_message(_con, _stanza, properties):
pubsub_event = PubSubEventData(
node='princely_musings',
id=None,
item=None,
data=None,
deleted=False,
retracted=False,
purged=True)
self.assertEqual(pubsub_event, properties.pubsub_event)
event = '''
<message from='test@test.test' id='b5ac48d0-0f9c-11dc-8754-001143d5d5db'>
<event xmlns='http://jabber.org/protocol/pubsub#event'>
<purge node='princely_musings'/>
</event>
</message>
'''
self.dispatcher.RegisterHandler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT,
priority=16))
self.dispatcher.ProcessNonBlocking(event)
def test_delete_event(self):
def _on_message(_con, _stanza, properties):
pubsub_event = PubSubEventData(
node='princely_musings',
id=None,
item=None,
data=None,
deleted=True,
retracted=False,
purged=False)
self.assertEqual(pubsub_event, properties.pubsub_event)
event = '''
<message from='test@test.test' id='b5ac48d0-0f9c-11dc-8754-001143d5d5db'>
<delete node='princely_musings'>
<redirect uri='xmpp:hamlet@denmark.lit?;node=blog'/>
</delete>
</message>
'''
self.dispatcher.RegisterHandler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT,
priority=16))
self.dispatcher.ProcessNonBlocking(event)
def test_retracted_event(self):
def _on_message(_con, _stanza, properties):
pubsub_event = PubSubEventData(
node='princely_musings',
id='ae890ac52d0df67ed7cfdf51b644e901',
item=None,
data=None,
deleted=False,
retracted=True,
purged=False)
self.assertEqual(pubsub_event, properties.pubsub_event)
event = '''
<message from='test@test.test' id='b5ac48d0-0f9c-11dc-8754-001143d5d5db'>
<items node='princely_musings'>
<retract id='ae890ac52d0df67ed7cfdf51b644e901'/>
</items>
</message>
'''
self.dispatcher.RegisterHandler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT,
priority=16))
self.dispatcher.ProcessNonBlocking(event)
from test.lib.util import StanzaHandlerTest
from nbxmpp.protocol import NS_PUBSUB_EVENT
from nbxmpp.structs import StanzaHandler
from nbxmpp.structs import TuneData
from nbxmpp.structs import PubSubEventData
class TuneTest(StanzaHandlerTest):
def test_tune_parsing(self):
def _on_message(_con, _stanza, properties):
data = TuneData(artist='Yes',
length='686',
rating='8',
source='Yessongs',
title='Heart of the Sunrise',
track='3',
uri='https://www.artist.com')
pubsub_event = PubSubEventData(
node='http://jabber.org/protocol/tune',
id='bffe6584-0f9c-11dc-84ba-001143d5d5db',
item=None,
data=data,
deleted=False,
retracted=False,
purged=False)
# We cant compare Node objects
pubsub_event_ = properties.pubsub_event._replace(item=None)
self.assertEqual(pubsub_event, pubsub_event_)
event = '''
<message from='test@test.test'>
<event xmlns='http://jabber.org/protocol/pubsub#event'>
<items node='http://jabber.org/protocol/tune'>
<item id='bffe6584-0f9c-11dc-84ba-001143d5d5db'>
<tune xmlns='http://jabber.org/protocol/tune'>
<artist>Yes</artist>
<length>686</length>
<rating>8</rating>
<source>Yessongs</source>
<title>Heart of the Sunrise</title>
<track>3</track>
<uri>https://www.artist.com</uri>
</tune>
</item>
</items>
</event>
</message>
'''
self.dispatcher.RegisterHandler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT))
self.dispatcher.ProcessNonBlocking(event)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment