Skip to content
Snippets Groups Projects
Commit c66cb370 authored by zimio's avatar zimio
Browse files

file sharing. Fix request; Include unit testing.

parent 82c86859
No related branches found
No related tags found
No related merge requests found
from common import xmpp
from common import helpers
from common import gajim
from common import XMPPDispatcher
from common.xmpp import Hashes
import nbxmpp
from nbxmpp import Hashes
try:
from common import helpers
from common import gajim
except ImportError:
print "Import Error: Ignore if we are testing"
# Namespace for file sharing
NS_FILE_SHARING = 'http://gajim.org/protocol/filesharing'
class Protocol():
'''
Creates and extracts information from stanzas
'''
def __init__(self, ourjid):
# set our jid with resource
self.ourjid = ourjid
def request(self, contact, stanzaID, path=None):
iq = nbxmpp.Iq(typ='get', to=contact, frm=self.ourjid)
iq.setID(stanzaID)
query = iq.setQuery()
query.setNamespace(NS_FILE_SHARING)
query.setAttr('node', path)
return iq
def buildReply(self, typ, stanza):
iq = nbxmpp.Iq(typ, to=stanza.getFrom(), frm=stanza.getTo(),
attrs={'id': stanza.getID()})
iq.addChild(name='match', namespace=NS_FILE_SHARING)
return iq
def offer(self, id_, contact, items):
iq = nbxmpp.Iq(typ='result', to=contact, frm=self.ourjid,
attrs={'id': id_})
match = iq.addChild(name='match', namespace=NS_FILE_SHARING)
offer = match.addChild(name='offer')
if len(items) == 0:
offer.addChild(name='directory')
else:
for i in items:
# if it is a directory
if i[5] == True:
item = offer.addChild(name='directory')
name = item.addChild('name')
name.setData('/' + i[0])
else:
item = offer.addChild(name='file')
item.addChild('name').setData('/' + i[0])
if i[1] != '':
h = Hashes()
h.addHash(i[1], 'sha-1')
item.addChild(node=h)
item.addChild('size').setData(i[2])
item.addChild('desc').setData(i[3])
item.addChild('date').setData(i[4])
return iq
class ProtocolDispatcher():
'''
Sends and receives stanzas
'''
def __init__(self, account, plugin):
self.account = account
......@@ -16,27 +75,24 @@ class Protocol():
self.ourjid = gajim.get_jid_from_account(self.account)
self.fsw = None
def set_window(self, fsw):
self.fsw = fsw
def set_window(self, window):
self.fsw = window
def request(self, contact, name=None, isFile=False):
iq = xmpp.Iq(typ='get', to=contact, frm=self.ourjid)
match = iq.addChild(name='match', namespace=NS_FILE_SHARING)
request = match.addChild(name='request')
if not isFile and name is None:
request.addChild(name='directory')
elif not isFile and name is not None:
dir_ = request.addChild(name='directory')
dir_.addChild(name='name').addData('/' + name)
elif isFile:
pass
return iq
def __buildReply(self, typ, stanza):
iq = xmpp.Iq(typ, to=stanza.getFrom(), frm=stanza.getTo(),
attrs={'id': stanza.getID()})
iq.addChild(name='match', namespace=NS_FILE_SHARING)
return iq
def handler(self, stanza):
# handles incoming match stanza
if stanza.getTag('match').getTag('offer'):
self.on_offer(stanza)
elif stanza.getTag('match').getTag('request'):
self.on_request(stanza)
else:
# TODO: reply with malformed stanza error
pass
def on_request(self, stanza):
try:
......@@ -98,44 +154,6 @@ class Protocol():
row = self.fsw.ts_search.append(parent, ('',))
self.fsw.empty_row_child[dir_] = row
def handler(self, stanza):
# handles incoming match stanza
if stanza.getTag('match').getTag('offer'):
self.on_offer(stanza)
elif stanza.getTag('match').getTag('request'):
self.on_request(stanza)
else:
# TODO: reply with malformed stanza error
pass
def offer(self, id_, contact, items):
iq = xmpp.Iq(typ='result', to=contact, frm=self.ourjid,
attrs={'id': id_})
match = iq.addChild(name='match', namespace=NS_FILE_SHARING)
offer = match.addChild(name='offer')
if len(items) == 0:
offer.addChild(name='directory')
else:
for i in items:
# if it is a directory
if i[5] == True:
item = offer.addChild(name='directory')
name = item.addChild('name')
name.setData('/' + i[0])
else:
item = offer.addChild(name='file')
item.addChild('name').setData('/' + i[0])
if i[1] != '':
h = Hashes()
h.addHash(i[1], 'sha-1')
item.addChild(node=h)
item.addChild('size').setData(i[2])
item.addChild('desc').setData(i[3])
item.addChild('date').setData(i[4])
return iq
def set_window(self, fsw):
self.fsw = fsw
def get_files_info(stanza):
......
......@@ -4,6 +4,6 @@ short_name: fshare
#version: 0.1.1
description: This plugin allows you to share folders with your peers using jingle file transfer.
authors: Jefry Lagrange <jefry.reyes@gmail.com>
homepage: www.google.com
homepage: www.gajim.org
max_gajim_version: 0.16.9
#/usr/bin/python
import unittest
import sys, os
sys.path.append(os.path.abspath(sys.path[0]) + '/../')
import fshare_protocol
class TestProtocol(unittest.TestCase):
def setUp(self):
self.protocol = fshare_protocol.Protocol('test@gajim.org/test')
def test_request(self):
iq = self.protocol.request('peer@gajim.org/test', '1234', 'documents/test2.txt')
self.assertEqual(iq.getType(), 'get')
self.assertNotEqual(iq.getID(), None)
self.assertEqual(iq.getQuery().getName(), 'query')
self.assertEqual(iq.getQuery().getNamespace(), fshare_protocol.NS_FILE_SHARING)
self.assertEqual(iq.getQuery().getAttr('node'), 'documents/test2.txt')
def test_offer(self):
pass
def test_reply(self):
pass
if __name__ == '__main__':
unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment