Skip to content
Snippets Groups Projects
Commit d45aaa85 authored by dkirov's avatar dkirov
Browse files

FT for zeroconf

parent 109ae1a3
No related branches found
No related tags found
No related merge requests found
......@@ -13,6 +13,7 @@
## GNU General Public License for more details.
##
from common import gajim
import common.xmpp
from common.xmpp.idlequeue import IdleObject
from common.xmpp import dispatcher_nb, simplexml
from common.xmpp.client import *
......@@ -135,7 +136,7 @@ def on_connect(self, conn):
self.Connection = conn
self.Connection.PlugIn(self)
dispatcher_nb.Dispatcher().PlugIn(self)
self.RegisterHandler('message', self._messageCB)
self._register_handlers()
if self.sock_type == TYPE_CLIENT:
while self.messagequeue:
message = self.messagequeue.pop(0)
......@@ -206,10 +207,22 @@ def _on_receive_document_attrs(self, data):
return True
def _messageCB(self, conn, data):
self._caller._messageCB(self.Server, conn, data)
def _register_handlers(self):
self.RegisterHandler('message', lambda conn, data:self._caller._messageCB(self.Server, conn, data))
self.RegisterHandler('iq', self._caller._siSetCB, 'set',
common.xmpp.NS_SI)
self.RegisterHandler('iq', self._caller._siErrorCB, 'error',
common.xmpp.NS_SI)
self.RegisterHandler('iq', self._caller._siResultCB, 'result',
common.xmpp.NS_SI)
self.RegisterHandler('iq', self._caller._bytestreamSetCB, 'set',
common.xmpp.NS_BYTESTREAM)
self.RegisterHandler('iq', self._caller._bytestreamResultCB, 'result',
common.xmpp.NS_BYTESTREAM)
self.RegisterHandler('iq', self._caller._bytestreamErrorCB, 'error',
common.xmpp.NS_BYTESTREAM)
class P2PConnection(IdleObject, PlugIn):
''' class for sending file to socket over socks5 '''
def __init__(self, sock_hash, _sock, host = None, port = None, caller = None, on_connect = None):
......
......@@ -50,6 +50,479 @@
gajim.log.debug(_('Unable to load idle module'))
HAS_IDLE = False
class ConnectionBytestream:
def __init__(self):
self.files_props = {}
def is_transfer_stoped(self, file_props):
if file_props.has_key('error') and file_props['error'] != 0:
return True
if file_props.has_key('completed') and file_props['completed']:
return True
if file_props.has_key('connected') and file_props['connected'] == False:
return True
if not file_props.has_key('stopped') or not file_props['stopped']:
return False
return True
def send_success_connect_reply(self, streamhost):
''' send reply to the initiator of FT that we
made a connection
'''
if streamhost is None:
return None
iq = common.xmpp.Iq(to = streamhost['initiator'], typ = 'result',
frm = streamhost['target'])
iq.setAttr('id', streamhost['id'])
query = iq.setTag('query')
query.setNamespace(common.xmpp.NS_BYTESTREAM)
stream_tag = query.setTag('streamhost-used')
stream_tag.setAttr('jid', streamhost['jid'])
self.connection.send(iq)
def remove_transfers_for_contact(self, contact):
''' stop all active transfer for contact '''
for file_props in self.files_props.values():
if self.is_transfer_stoped(file_props):
continue
receiver_jid = unicode(file_props['receiver']).split('/')[0]
if contact.jid == receiver_jid:
file_props['error'] = -5
self.remove_transfer(file_props)
self.dispatch('FILE_REQUEST_ERROR', (contact.jid, file_props, ''))
sender_jid = unicode(file_props['sender'])
if contact.jid == sender_jid:
file_props['error'] = -3
self.remove_transfer(file_props)
def remove_all_transfers(self):
''' stops and removes all active connections from the socks5 pool '''
for file_props in self.files_props.values():
self.remove_transfer(file_props, remove_from_list = False)
del(self.files_props)
self.files_props = {}
def remove_transfer(self, file_props, remove_from_list = True):
if file_props is None:
return
self.disconnect_transfer(file_props)
sid = file_props['sid']
gajim.socks5queue.remove_file_props(self.name, sid)
if remove_from_list:
if self.files_props.has_key('sid'):
del(self.files_props['sid'])
def disconnect_transfer(self, file_props):
if file_props is None:
return
if file_props.has_key('hash'):
gajim.socks5queue.remove_sender(file_props['hash'])
if file_props.has_key('streamhosts'):
for host in file_props['streamhosts']:
if host.has_key('idx') and host['idx'] > 0:
gajim.socks5queue.remove_receiver(host['idx'])
gajim.socks5queue.remove_sender(host['idx'])
def send_socks5_info(self, file_props, fast = True, receiver = None,
sender = None):
''' send iq for the present streamhosts and proxies '''
if type(self.peerhost) != tuple:
return
port = gajim.config.get('file_transfers_port')
ft_override_host_to_send = gajim.config.get('ft_override_host_to_send')
if receiver is None:
receiver = file_props['receiver']
if sender is None:
sender = file_props['sender']
proxyhosts = []
sha_str = helpers.get_auth_sha(file_props['sid'], sender,
receiver)
file_props['sha_str'] = sha_str
if not ft_override_host_to_send:
ft_override_host_to_send = self.peerhost[0]
try:
ft_override_host_to_send = socket.gethostbyname(
ft_override_host_to_send)
except socket.gaierror:
self.dispatch('ERROR', (_('Wrong host'), _('The host you configured as the ft_override_host_to_send advanced option is not valid, so ignored.')))
ft_override_host_to_send = self.peerhost[0]
listener = gajim.socks5queue.start_listener(self.peerhost[0], port,
sha_str, self._result_socks5_sid, file_props['sid'])
if listener == None:
file_props['error'] = -5
self.dispatch('FILE_REQUEST_ERROR', (unicode(receiver), file_props,
''))
self._connect_error(unicode(receiver), file_props['sid'],
file_props['sid'], code = 406)
return
iq = common.xmpp.Protocol(name = 'iq', to = unicode(receiver),
typ = 'set')
file_props['request-id'] = 'id_' + file_props['sid']
iq.setID(file_props['request-id'])
query = iq.setTag('query')
query.setNamespace(common.xmpp.NS_BYTESTREAM)
query.setAttr('mode', 'tcp')
query.setAttr('sid', file_props['sid'])
streamhost = query.setTag('streamhost')
streamhost.setAttr('port', unicode(port))
streamhost.setAttr('host', ft_override_host_to_send)
streamhost.setAttr('jid', sender)
self.connection.send(iq)
def send_file_rejection(self, file_props):
''' informs sender that we refuse to download the file '''
# user response to ConfirmationDialog may come after we've disconneted
if not self.connection or self.connected < 2:
return
iq = common.xmpp.Protocol(name = 'iq',
to = unicode(file_props['sender']), typ = 'error')
iq.setAttr('id', file_props['request-id'])
err = common.xmpp.ErrorNode(code = '403', typ = 'cancel', name =
'forbidden', text = 'Offer Declined')
iq.addChild(node=err)
self.connection.send(iq)
def send_file_approval(self, file_props):
''' send iq, confirming that we want to download the file '''
# user response to ConfirmationDialog may come after we've disconneted
if not self.connection or self.connected < 2:
return
iq = common.xmpp.Protocol(name = 'iq',
to = unicode(file_props['sender']), typ = 'result')
iq.setAttr('id', file_props['request-id'])
si = iq.setTag('si')
si.setNamespace(common.xmpp.NS_SI)
if file_props.has_key('offset') and file_props['offset']:
file_tag = si.setTag('file')
file_tag.setNamespace(common.xmpp.NS_FILE)
range_tag = file_tag.setTag('range')
range_tag.setAttr('offset', file_props['offset'])
feature = si.setTag('feature')
feature.setNamespace(common.xmpp.NS_FEATURE)
_feature = common.xmpp.DataForm(typ='submit')
feature.addChild(node=_feature)
field = _feature.setField('stream-method')
field.delAttr('type')
field.setValue(common.xmpp.NS_BYTESTREAM)
self.connection.send(iq)
def send_file_request(self, file_props):
''' send iq for new FT request '''
if not self.connection or self.connected < 2:
return
our_jid = gajim.get_jid_from_account(self.name)
frm = our_jid
file_props['sender'] = frm
fjid = file_props['receiver'].jid
iq = common.xmpp.Protocol(name = 'iq', to = fjid,
typ = 'set')
iq.setID(file_props['sid'])
self.files_props[file_props['sid']] = file_props
si = iq.setTag('si')
si.setNamespace(common.xmpp.NS_SI)
si.setAttr('profile', common.xmpp.NS_FILE)
si.setAttr('id', file_props['sid'])
file_tag = si.setTag('file')
file_tag.setNamespace(common.xmpp.NS_FILE)
file_tag.setAttr('name', file_props['name'])
file_tag.setAttr('size', file_props['size'])
desc = file_tag.setTag('desc')
if file_props.has_key('desc'):
desc.setData(file_props['desc'])
file_tag.setTag('range')
feature = si.setTag('feature')
feature.setNamespace(common.xmpp.NS_FEATURE)
_feature = common.xmpp.DataForm(typ='form')
feature.addChild(node=_feature)
field = _feature.setField('stream-method')
field.setAttr('type', 'list-single')
field.addOption(common.xmpp.NS_BYTESTREAM)
self.connection.send(iq)
def _result_socks5_sid(self, sid, hash_id):
''' store the result of sha message from auth. '''
if not self.files_props.has_key(sid):
return
file_props = self.files_props[sid]
file_props['hash'] = hash_id
return
def _connect_error(self, to, _id, sid, code = 404):
''' cb, when there is an error establishing BS connection, or
when connection is rejected'''
msg_dict = {
404: 'Could not connect to given hosts',
405: 'Cancel',
406: 'Not acceptable',
}
msg = msg_dict[code]
iq = None
iq = common.xmpp.Protocol(name = 'iq', to = to,
typ = 'error')
iq.setAttr('id', _id)
err = iq.setTag('error')
err.setAttr('code', unicode(code))
err.setData(msg)
self.connection.send(iq)
if code == 404:
file_props = gajim.socks5queue.get_file_props(self.name, sid)
if file_props is not None:
self.disconnect_transfer(file_props)
file_props['error'] = -3
self.dispatch('FILE_REQUEST_ERROR', (to, file_props, msg))
def _proxy_auth_ok(self, proxy):
'''cb, called after authentication to proxy server '''
file_props = self.files_props[proxy['sid']]
iq = common.xmpp.Protocol(name = 'iq', to = proxy['initiator'],
typ = 'set')
auth_id = "au_" + proxy['sid']
iq.setID(auth_id)
query = iq.setTag('query')
query.setNamespace(common.xmpp.NS_BYTESTREAM)
query.setAttr('sid', proxy['sid'])
activate = query.setTag('activate')
activate.setData(file_props['proxy_receiver'])
iq.setID(auth_id)
self.connection.send(iq)
# register xmpppy handlers for bytestream and FT stanzas
def _bytestreamErrorCB(self, con, iq_obj):
gajim.log.debug('_bytestreamErrorCB')
id = unicode(iq_obj.getAttr('id'))
frm = unicode(iq_obj.getFrom())
query = iq_obj.getTag('query')
gajim.proxy65_manager.error_cb(frm, query)
jid = unicode(iq_obj.getFrom())
id = id[3:]
if not self.files_props.has_key(id):
return
file_props = self.files_props[id]
file_props['error'] = -4
self.dispatch('FILE_REQUEST_ERROR', (jid, file_props, ''))
raise common.xmpp.NodeProcessed
def _bytestreamSetCB(self, con, iq_obj):
gajim.log.debug('_bytestreamSetCB')
target = unicode(iq_obj.getAttr('to'))
id = unicode(iq_obj.getAttr('id'))
query = iq_obj.getTag('query')
sid = unicode(query.getAttr('sid'))
file_props = gajim.socks5queue.get_file_props(
self.name, sid)
streamhosts=[]
for item in query.getChildren():
if item.getName() == 'streamhost':
host_dict={
'state': 0,
'target': target,
'id': id,
'sid': sid,
'initiator': unicode(iq_obj.getFrom())
}
for attr in item.getAttrs():
host_dict[attr] = item.getAttr(attr)
streamhosts.append(host_dict)
if file_props is None:
if self.files_props.has_key(sid):
file_props = self.files_props[sid]
file_props['fast'] = streamhosts
if file_props['type'] == 's':
if file_props.has_key('streamhosts'):
file_props['streamhosts'].extend(streamhosts)
else:
file_props['streamhosts'] = streamhosts
if not gajim.socks5queue.get_file_props(self.name, sid):
gajim.socks5queue.add_file_props(self.name, file_props)
gajim.socks5queue.connect_to_hosts(self.name, sid,
self.send_success_connect_reply, None)
raise common.xmpp.NodeProcessed
file_props['streamhosts'] = streamhosts
if file_props['type'] == 'r':
gajim.socks5queue.connect_to_hosts(self.name, sid,
self.send_success_connect_reply, self._connect_error)
raise common.xmpp.NodeProcessed
def _ResultCB(self, con, iq_obj):
gajim.log.debug('_ResultCB')
# if we want to respect jep-0065 we have to check for proxy
# activation result in any result iq
real_id = unicode(iq_obj.getAttr('id'))
if real_id[:3] != 'au_':
return
frm = unicode(iq_obj.getFrom())
id = real_id[3:]
if self.files_props.has_key(id):
file_props = self.files_props[id]
if file_props['streamhost-used']:
for host in file_props['proxyhosts']:
if host['initiator'] == frm and host.has_key('idx'):
gajim.socks5queue.activate_proxy(host['idx'])
raise common.xmpp.NodeProcessed
def _bytestreamResultCB(self, con, iq_obj):
gajim.log.debug('_bytestreamResultCB')
frm = unicode(iq_obj.getFrom())
real_id = unicode(iq_obj.getAttr('id'))
query = iq_obj.getTag('query')
gajim.proxy65_manager.resolve_result(frm, query)
try:
streamhost = query.getTag('streamhost-used')
except: # this bytestream result is not what we need
pass
id = real_id[3:]
if self.files_props.has_key(id):
file_props = self.files_props[id]
else:
raise common.xmpp.NodeProcessed
if streamhost is None:
# proxy approves the activate query
if real_id[:3] == 'au_':
id = real_id[3:]
if not file_props.has_key('streamhost-used') or \
file_props['streamhost-used'] is False:
raise common.xmpp.NodeProcessed
if not file_props.has_key('proxyhosts'):
raise common.xmpp.NodeProcessed
for host in file_props['proxyhosts']:
if host['initiator'] == frm and \
unicode(query.getAttr('sid')) == file_props['sid']:
gajim.socks5queue.activate_proxy(host['idx'])
break
raise common.xmpp.NodeProcessed
jid = streamhost.getAttr('jid')
if file_props.has_key('streamhost-used') and \
file_props['streamhost-used'] is True:
raise common.xmpp.NodeProcessed
if real_id[:3] == 'au_':
gajim.socks5queue.send_file(file_props, self.name)
raise common.xmpp.NodeProcessed
proxy = None
if file_props.has_key('proxyhosts'):
for proxyhost in file_props['proxyhosts']:
if proxyhost['jid'] == jid:
proxy = proxyhost
if proxy != None:
file_props['streamhost-used'] = True
if not file_props.has_key('streamhosts'):
file_props['streamhosts'] = []
file_props['streamhosts'].append(proxy)
file_props['is_a_proxy'] = True
receiver = socks5.Socks5Receiver(gajim.idlequeue, proxy, file_props['sid'], file_props)
gajim.socks5queue.add_receiver(self.name, receiver)
proxy['idx'] = receiver.queue_idx
gajim.socks5queue.on_success = self._proxy_auth_ok
raise common.xmpp.NodeProcessed
else:
gajim.socks5queue.send_file(file_props, self.name)
if file_props.has_key('fast'):
fasts = file_props['fast']
if len(fasts) > 0:
self._connect_error(frm, fasts[0]['id'], file_props['sid'],
code = 406)
raise common.xmpp.NodeProcessed
def _siResultCB(self, con, iq_obj):
gajim.log.debug('_siResultCB')
self.peerhost = con._owner.Connection._sock.getsockname()
id = iq_obj.getAttr('id')
if not self.files_props.has_key(id):
# no such jid
return
file_props = self.files_props[id]
if file_props is None:
# file properties for jid is none
return
if file_props.has_key('request-id'):
# we have already sent streamhosts info
return
file_props['receiver'] = unicode(iq_obj.getFrom())
si = iq_obj.getTag('si')
file_tag = si.getTag('file')
range_tag = None
if file_tag:
range_tag = file_tag.getTag('range')
if range_tag:
offset = range_tag.getAttr('offset')
if offset:
file_props['offset'] = int(offset)
length = range_tag.getAttr('length')
if length:
file_props['length'] = int(length)
feature = si.setTag('feature')
if feature.getNamespace() != common.xmpp.NS_FEATURE:
return
form_tag = feature.getTag('x')
form = common.xmpp.DataForm(node=form_tag)
field = form.getField('stream-method')
if field.getValue() != common.xmpp.NS_BYTESTREAM:
return
self.send_socks5_info(file_props, fast = True)
raise common.xmpp.NodeProcessed
def _siSetCB(self, con, iq_obj):
gajim.log.debug('_siSetCB')
jid = unicode(iq_obj.getFrom())
si = iq_obj.getTag('si')
profile = si.getAttr('profile')
mime_type = si.getAttr('mime-type')
if profile != common.xmpp.NS_FILE:
return
file_tag = si.getTag('file')
file_props = {'type': 'r'}
for attribute in file_tag.getAttrs():
if attribute in ('name', 'size', 'hash', 'date'):
val = file_tag.getAttr(attribute)
if val is None:
continue
file_props[attribute] = val
file_desc_tag = file_tag.getTag('desc')
if file_desc_tag is not None:
file_props['desc'] = file_desc_tag.getData()
if mime_type is not None:
file_props['mime-type'] = mime_type
our_jid = gajim.get_jid_from_account(self.name)
file_props['receiver'] = our_jid
file_props['sender'] = unicode(iq_obj.getFrom())
file_props['request-id'] = unicode(iq_obj.getAttr('id'))
file_props['sid'] = unicode(si.getAttr('id'))
gajim.socks5queue.add_file_props(self.name, file_props)
self.dispatch('FILE_REQUEST', (jid, file_props))
raise common.xmpp.NodeProcessed
def _siErrorCB(self, con, iq_obj):
gajim.log.debug('_siErrorCB')
si = iq_obj.getTag('si')
profile = si.getAttr('profile')
if profile != common.xmpp.NS_FILE:
return
id = iq_obj.getAttr('id')
if not self.files_props.has_key(id):
# no such jid
return
file_props = self.files_props[id]
if file_props is None:
# file properties for jid is none
return
jid = unicode(iq_obj.getFrom())
file_props['error'] = -3
self.dispatch('FILE_REQUEST_ERROR', (jid, file_props, ''))
raise common.xmpp.NodeProcessed
class ConnectionVcard:
def __init__(self):
self.vcard_sha = None
......@@ -212,9 +685,10 @@ def send_vcard(self, vcard):
'''
pass
class ConnectionHandlersZeroconf(ConnectionVcard):
class ConnectionHandlersZeroconf(ConnectionVcard, ConnectionBytestream):
def __init__(self):
ConnectionVcard.__init__(self)
ConnectionBytestream.__init__(self)
# List of IDs we are waiting answers for {id: (type_of_request, data), }
self.awaiting_answers = {}
# List of IDs that will produce a timeout is answer doesn't arrive
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment