From fdfc9c90a1654e974070115c59cc8b5d7cb2ecdb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20H=C3=B6rist?= <philipp@hoerist.com> Date: Sun, 19 May 2019 15:16:37 +0200 Subject: [PATCH] Move IBB code to new IBB module --- gajim/common/connection_handlers.py | 8 +- gajim/common/jingle_ftstates.py | 5 +- gajim/common/modules/__init__.py | 1 + gajim/common/modules/ibb.py | 227 +++++++++++++++++++++++++ gajim/common/protocol/bytestream.py | 255 ---------------------------- gajim/gtk/filetransfer.py | 2 +- 6 files changed, 233 insertions(+), 265 deletions(-) create mode 100644 gajim/common/modules/ibb.py diff --git a/gajim/common/connection_handlers.py b/gajim/common/connection_handlers.py index 6491cc4fcc..a61bcb8e59 100644 --- a/gajim/common/connection_handlers.py +++ b/gajim/common/connection_handlers.py @@ -34,7 +34,6 @@ from gajim.common import helpers from gajim.common import jingle_xtls from gajim.common.jingle import ConnectionJingle from gajim.common.protocol.bytestream import ConnectionSocks5Bytestream -from gajim.common.protocol.bytestream import ConnectionIBBytestream from gajim.common.connection_handlers_events import StreamReceivedEvent from gajim.common.connection_handlers_events import PresenceReceivedEvent from gajim.common.connection_handlers_events import StreamConflictReceivedEvent @@ -185,10 +184,9 @@ class ConnectionHandlersBase: class ConnectionHandlers(ConnectionSocks5Bytestream, ConnectionHandlersBase, - ConnectionJingle, ConnectionIBBytestream): + ConnectionJingle): def __init__(self): ConnectionSocks5Bytestream.__init__(self) - ConnectionIBBytestream.__init__(self) ConnectionJingle.__init__(self) ConnectionHandlersBase.__init__(self) @@ -223,10 +221,6 @@ class ConnectionHandlers(ConnectionSocks5Bytestream, nbxmpp.NS_BYTESTREAM) con.RegisterHandler('iq', self._bytestreamErrorCB, 'error', nbxmpp.NS_BYTESTREAM) - con.RegisterHandlerOnce('iq', self.IBBAllIqHandler) - con.RegisterHandler('iq', self.IBBIqHandler, ns=nbxmpp.NS_IBB) - con.RegisterHandler('message', self.IBBMessageHandler, ns=nbxmpp.NS_IBB) - con.RegisterHandler('iq', self._JingleCB, 'result') con.RegisterHandler('iq', self._JingleCB, 'error') con.RegisterHandler('iq', self._JingleCB, 'set', nbxmpp.NS_JINGLE) diff --git a/gajim/common/jingle_ftstates.py b/gajim/common/jingle_ftstates.py index 767b636dc7..9657e0997e 100644 --- a/gajim/common/jingle_ftstates.py +++ b/gajim/common/jingle_ftstates.py @@ -146,8 +146,9 @@ class StateTransfering(JingleFileTransferStates): def _start_ibb_transfer(self, con): self.jft.file_props.transport_sid = self.jft.transport.sid fp = open(self.jft.file_props.file_name, 'rb') - con.OpenStream(self.jft.file_props.sid, self.jft.session.peerjid, fp, - blocksize=4096) + con.get_module('IBB').send_open(self.jft.session.peerjid, + self.jft.file_props.sid, + fp) def _start_sock5_transfer(self): # It tells wether we start the transfer as client or server diff --git a/gajim/common/modules/__init__.py b/gajim/common/modules/__init__.py index 6840db04de..1300edc6fc 100644 --- a/gajim/common/modules/__init__.py +++ b/gajim/common/modules/__init__.py @@ -74,6 +74,7 @@ MODULES = [ 'vcard_avatars', 'vcard_temp', 'announce', + 'ibb', ] _imported_modules = [] # type: List[tuple] diff --git a/gajim/common/modules/ibb.py b/gajim/common/modules/ibb.py new file mode 100644 index 0000000000..4e7f4b80fe --- /dev/null +++ b/gajim/common/modules/ibb.py @@ -0,0 +1,227 @@ +# This file is part of Gajim. +# +# Gajim is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published +# by the Free Software Foundation; version 3 only. +# +# Gajim is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Gajim. If not, see <http://www.gnu.org/licenses/>. + +# XEP-0047: In-Band Bytestreams + +import time + +import nbxmpp +from nbxmpp.protocol import NodeProcessed +from nbxmpp.structs import StanzaHandler +from nbxmpp.util import is_error_result + +from gajim.common import app +from gajim.common.modules.base import BaseModule +from gajim.common.file_props import FilesProp + + +class IBB(BaseModule): + + _nbxmpp_extends = 'IBB' + _nbxmpp_methods = [ + 'send_open', + 'send_close', + 'send_data', + 'send_reply', + ] + + def __init__(self, con): + BaseModule.__init__(self, con) + + self.handlers = [ + StanzaHandler(name='iq', + callback=self._ibb_received, + ns=nbxmpp.NS_IBB), + ] + + def _ibb_received(self, _con, stanza, properties): + if not properties.is_ibb: + return + + if properties.ibb.type == 'data': + self._log.info('Data received, sid: %s, seq: %s', + properties.ibb.sid, properties.ibb.seq) + file_props = FilesProp.getFilePropByTransportSid(self._account, + properties.ibb.sid) + if not file_props: + self.send_reply(stanza, nbxmpp.ERR_ITEM_NOT_FOUND) + raise NodeProcessed + + if file_props.connected: + self._on_data_received(stanza, file_props, properties) + self.send_reply(stanza) + + elif properties.ibb.type == 'open': + self._log.info('Open received, sid: %s, blocksize: %s', + properties.ibb.sid, properties.ibb.block_size) + + file_props = FilesProp.getFilePropByTransportSid(self._account, + properties.ibb.sid) + if not file_props: + self.send_reply(stanza, nbxmpp.ERR_ITEM_NOT_FOUND) + raise NodeProcessed + + file_props.block_size = properties.ibb.block_size + file_props.direction = '<' + file_props.seq = 0 + file_props.received_len = 0 + file_props.last_time = time.time() + file_props.error = 0 + file_props.paused = False + file_props.connected = True + file_props.completed = False + file_props.disconnect_cb = None + file_props.continue_cb = None + file_props.syn_id = stanza.getID() + file_props.fp = open(file_props.file_name, 'wb') + self.send_reply(stanza) + + elif properties.ibb.type == 'close': + self._log.info('Close received, sid: %s', properties.ibb.sid) + file_props = FilesProp.getFilePropByTransportSid(self._account, + properties.ibb.sid) + if not file_props: + self.send_reply(stanza, nbxmpp.ERR_ITEM_NOT_FOUND) + raise NodeProcessed + + self.send_reply(stanza) + file_props.fp.close() + file_props.completed = file_props.received_len >= file_props.size + if not file_props.completed: + file_props.error = -1 + app.socks5queue.complete_transfer_cb(self._account, file_props) + + raise NodeProcessed + + def _on_data_received(self, stanza, file_props, properties): + ibb = properties.ibb + if ibb.seq != file_props.seq: + self.send_reply(stanza, nbxmpp.ERR_UNEXPECTED_REQUEST) + self.send_close(file_props) + raise NodeProcessed + + self._log.debug('Data received: sid: %s, %s+%s bytes', + ibb.sid, file_props.fp.tell(), len(ibb.data)) + + file_props.seq += 1 + file_props.started = True + file_props.fp.write(ibb.data) + current_time = time.time() + file_props.elapsed_time += current_time - file_props.last_time + file_props.last_time = current_time + file_props.received_len += len(ibb.data) + app.socks5queue.progress_transfer_cb(self._account, file_props) + if file_props.received_len >= file_props.size: + file_props.completed = True + + def send_open(self, to, sid, fp): + self._log.info('Send open to %s, sid: %s', to, sid) + file_props = FilesProp.getFilePropBySid(sid) + file_props.direction = '>' + file_props.block_size = 4096 + file_props.fp = fp + file_props.seq = -1 + file_props.error = 0 + file_props.paused = False + file_props.received_len = 0 + file_props.last_time = time.time() + file_props.connected = True + file_props.completed = False + file_props.disconnect_cb = None + file_props.continue_cb = None + self._nbxmpp('IBB').send_open(to, + file_props.transport_sid, + 4096, + callback=self._on_open_result, + user_data=file_props) + return file_props + + def _on_open_result(self, result, file_props): + if is_error_result(result): + app.socks5queue.error_cb('Error', str(result)) + self._log.warning('Error: %s', result) + return + self.send_data(file_props) + + def send_close(self, file_props): + file_props.connected = False + file_props.fp.close() + file_props.stopped = True + to = file_props.receiver + if file_props.direction == '<': + to = file_props.sender + + self._log.info('Send close to %s, sid: %s', + to, file_props.transport_sid) + self._nbxmpp('IBB').send_close(to, file_props.transport_sid, + callback=self._on_close_result) + + if file_props.completed: + app.socks5queue.complete_transfer_cb(self._account, file_props) + else: + if file_props.type_ == 's': + peerjid = file_props.receiver + else: + peerjid = file_props.sender + session = self._con.get_jingle_session( + peerjid, file_props.sid, 'file') + # According to the xep, the initiator also cancels + # the jingle session if there are no more files to send using IBB + if session.weinitiate: + session.cancel_session() + + def _on_close_result(self, result): + if is_error_result(result): + app.socks5queue.error_cb('Error', str(result)) + self._log.warning('Error: %s', result) + return + + def send_data(self, file_props): + if file_props.completed: + self.send_close(file_props) + return + + chunk = file_props.fp.read(file_props.block_size) + if chunk: + file_props.seq += 1 + file_props.started = True + if file_props.seq == 65536: + file_props.seq = 0 + + self._log.info('Send data to %s, sid: %s', + file_props.receiver, file_props.transport_sid) + self._nbxmpp('IBB').send_data(file_props.receiver, + file_props.transport_sid, + file_props.seq, + chunk, + callback=self._on_data_result, + user_data=file_props) + current_time = time.time() + file_props.elapsed_time += current_time - file_props.last_time + file_props.last_time = current_time + file_props.received_len += len(chunk) + if file_props.size == file_props.received_len: + file_props.completed = True + app.socks5queue.progress_transfer_cb(self._account, file_props) + + def _on_data_result(self, result, file_props): + if is_error_result(result): + app.socks5queue.error_cb('Error', str(result)) + self._log.warning('Error: %s', result) + return + self.send_data(file_props) + + +def get_instance(*args, **kwargs): + return IBB(*args, **kwargs), 'IBB' diff --git a/gajim/common/protocol/bytestream.py b/gajim/common/protocol/bytestream.py index 22f288c699..f2eedba3b6 100644 --- a/gajim/common/protocol/bytestream.py +++ b/gajim/common/protocol/bytestream.py @@ -25,8 +25,6 @@ # along with Gajim. If not, see <http://www.gnu.org/licenses/>. import socket -import base64 -import time import logging import nbxmpp @@ -633,259 +631,6 @@ class ConnectionSocks5Bytestream(ConnectionBytestream): raise nbxmpp.NodeProcessed -class ConnectionIBBytestream(ConnectionBytestream): - - def __init__(self): - ConnectionBytestream.__init__(self) - self._streams = {} - - def IBBIqHandler(self, conn, stanza): - """ - Handles streams state change. Used internally. - """ - typ = stanza.getType() - log.debug('IBBIqHandler called typ->%s', typ) - if typ == 'set' and stanza.getTag('open'): - self.StreamOpenHandler(conn, stanza) - elif typ == 'set' and stanza.getTag('close'): - self.StreamCloseHandler(conn, stanza) - elif typ == 'set' and stanza.getTag('data'): - sid = stanza.getTagAttr('data', 'sid') - file_props = FilesProp.getFilePropByTransportSid(self.name, sid) - if not file_props: - conn.send(nbxmpp.Error(stanza, nbxmpp.ERR_ITEM_NOT_FOUND)) - elif file_props.connected and self.IBBMessageHandler(conn, - stanza): - reply = stanza.buildReply('result') - reply.delChild('data') - conn.send(reply) - elif not file_props.connected: - log.debug('Received IQ for closed filetransfer, IQ dropped') - elif typ == 'error': - app.socks5queue.error_cb() - else: - conn.send(nbxmpp.Error(stanza, nbxmpp.ERR_BAD_REQUEST)) - raise nbxmpp.NodeProcessed - - def StreamOpenHandler(self, conn, stanza): - """ - Handles opening of new incoming stream. Used internally. - """ - err = None - sid = stanza.getTagAttr('open', 'sid') - blocksize = stanza.getTagAttr('open', 'block-size') - log.debug('StreamOpenHandler called sid->%s blocksize->%s', - sid, blocksize) - file_props = FilesProp.getFilePropByTransportSid(self.name, sid) - try: - blocksize = int(blocksize) - except Exception: - err = nbxmpp.ERR_BAD_REQUEST - if not sid or not blocksize: - err = nbxmpp.ERR_BAD_REQUEST - elif not file_props: - err = nbxmpp.ERR_UNEXPECTED_REQUEST - if err: - rep = nbxmpp.Error(stanza, err) - else: - log.debug("Opening stream: id %s, block-size %s", - sid, blocksize) - rep = nbxmpp.Protocol('iq', stanza.getFrom(), 'result', - stanza.getTo(), {'id': stanza.getID()}) - file_props.block_size = blocksize - file_props.direction = '<' - file_props.seq = 0 - file_props.received_len = 0 - file_props.last_time = time.time() - file_props.error = 0 - file_props.paused = False - file_props.connected = True - file_props.completed = False - file_props.disconnect_cb = None - file_props.continue_cb = None - file_props.syn_id = stanza.getID() - file_props.fp = open(file_props.file_name, 'wb') - conn.send(rep) - - def CloseIBBStream(self, file_props): - file_props.connected = False - file_props.fp.close() - file_props.stopped = True - to = file_props.receiver - if file_props.direction == '<': - to = file_props.sender - self.connection.send( - nbxmpp.Protocol('iq', to, 'set', - payload=[nbxmpp.Node(nbxmpp.NS_IBB + ' close', - {'sid':file_props.transport_sid})])) - if file_props.completed: - app.socks5queue.complete_transfer_cb(self.name, file_props) - elif file_props.session_type == 'jingle': - peerjid = \ - file_props.receiver if file_props.type_ == 's' else file_props.sender - session = self.get_jingle_session(peerjid, file_props.sid, 'file') - # According to the xep, the initiator also cancels the jingle session - # if there are no more files to send using IBB - if session.weinitiate: - session.cancel_session() - - def OpenStream(self, sid, to, fp, blocksize=4096): - """ - Start new stream. You should provide stream id 'sid', the endpoint jid - 'to', the file object containing info for send 'fp'. Also the desired - blocksize can be specified. - Take into account that recommended stanza size is 4k and IBB uses - base64 encoding that increases size of data by 1/3. - """ - file_props = FilesProp.getFilePropBySid(sid) - file_props.direction = '>' - file_props.block_size = blocksize - file_props.fp = fp - file_props.seq = 0 - file_props.error = 0 - file_props.paused = False - file_props.received_len = 0 - file_props.last_time = time.time() - file_props.connected = True - file_props.completed = False - file_props.disconnect_cb = None - file_props.continue_cb = None - syn = nbxmpp.Protocol('iq', to, 'set', payload=[nbxmpp.Node( - nbxmpp.NS_IBB + ' open', {'sid': file_props.transport_sid, - 'block-size': blocksize, 'stanza': 'iq'})]) - self.connection.send(syn) - file_props.syn_id = syn.getID() - return file_props - - def SendHandler(self, file_props): - """ - Send next portion of data if it is time to do it. Used internally. - """ - log.debug('SendHandler called') - if file_props.completed: - self.CloseIBBStream(file_props) - if file_props.paused: - return - if not file_props.connected: - #TODO: Reply with out of order error - return - chunk = file_props.fp.read(file_props.block_size) - if chunk: - datanode = nbxmpp.Node(nbxmpp.NS_IBB + ' data', { - 'sid': file_props.transport_sid, - 'seq': file_props.seq}, - base64.b64encode(chunk).decode('ascii')) - file_props.seq += 1 - file_props.started = True - if file_props.seq == 65536: - file_props.seq = 0 - file_props.syn_id = self.connection.send( - nbxmpp.Protocol(name='iq', to=file_props.receiver, - typ='set', payload=[datanode])) - current_time = time.time() - file_props.elapsed_time += current_time - file_props.last_time - file_props.last_time = current_time - file_props.received_len += len(chunk) - if file_props.size == file_props.received_len: - file_props.completed = True - app.socks5queue.progress_transfer_cb(self.name, - file_props) - else: - log.debug('Nothing to read, but file not completed') - - def IBBMessageHandler(self, conn, stanza): - """ - Receive next portion of incoming datastream and store it write - it to temporary file. Used internally. - """ - sid = stanza.getTagAttr('data', 'sid') - seq = stanza.getTagAttr('data', 'seq') - data = stanza.getTagData('data') - log.debug('ReceiveHandler called sid->%s seq->%s', sid, seq) - try: - seq = int(seq) - data = base64.b64decode(data.encode('utf-8')) - except Exception: - seq = '' - data = b'' - err = None - file_props = FilesProp.getFilePropByTransportSid(self.name, sid) - if file_props is None: - err = nbxmpp.ERR_ITEM_NOT_FOUND - else: - if not data: - err = nbxmpp.ERR_BAD_REQUEST - elif seq != file_props.seq: - err = nbxmpp.ERR_UNEXPECTED_REQUEST - else: - log.debug('Successfully received sid->%s %s+%s bytes', - sid, file_props.fp.tell(), len(data)) - file_props.seq += 1 - file_props.started = True - file_props.fp.write(data) - current_time = time.time() - file_props.elapsed_time += current_time - file_props.last_time - file_props.last_time = current_time - file_props.received_len += len(data) - app.socks5queue.progress_transfer_cb(self.name, file_props) - if file_props.received_len >= file_props.size: - file_props.completed = True - if err: - log.debug('Error on receive: %s', err) - conn.send(nbxmpp.Error(nbxmpp.Iq(to=stanza.getFrom(), - frm=stanza.getTo(), - payload=[nbxmpp.Node(nbxmpp.NS_IBB + ' close')]), err, reply=0)) - else: - return True - - def StreamCloseHandler(self, conn, stanza): - """ - Handle stream closure due to all data transmitted. - Raise xmpppy event specifying successful data receive. - """ - sid = stanza.getTagAttr('close', 'sid') - log.debug('StreamCloseHandler called sid->%s', sid) - # look in sending files - file_props = FilesProp.getFilePropByTransportSid(self.name, sid) - if file_props: - reply = stanza.buildReply('result') - reply.delChild('close') - conn.send(reply) - # look in receiving files - file_props.fp.close() - file_props.completed = file_props.received_len >= file_props.size - if not file_props.completed: - file_props.error = -1 - app.socks5queue.complete_transfer_cb(self.name, file_props) - else: - conn.send(nbxmpp.Error(stanza, nbxmpp.ERR_ITEM_NOT_FOUND)) - - - def IBBAllIqHandler(self, conn, stanza): - """ - Handle remote side reply about if it agree or not to receive our - datastream. - Used internally. Raises xmpppy event specifying if the data transfer - is agreed upon. - """ - syn_id = stanza.getID() - log.debug('IBBAllIqHandler called syn_id->%s', syn_id) - for file_props in FilesProp.getAllFileProp(): - if not file_props.direction or not file_props.connected: - # It's socks5 bytestream - # Or we closed the IBB stream - continue - if file_props.syn_id == syn_id: - if stanza.getType() == 'error': - if file_props.direction[0] == '<': - conn.Event('IBB', 'ERROR ON RECEIVE', file_props) - else: - conn.Event('IBB', 'ERROR ON SEND', file_props) - elif stanza.getType() == 'result': - self.SendHandler(file_props) - break - - class ConnectionSocks5BytestreamZeroconf(ConnectionSocks5Bytestream): def _ft_get_from(self, iq_obj): diff --git a/gajim/gtk/filetransfer.py b/gajim/gtk/filetransfer.py index d87a44cdf7..aa0e0ad122 100644 --- a/gajim/gtk/filetransfer.py +++ b/gajim/gtk/filetransfer.py @@ -882,7 +882,7 @@ class FileTransfersWindow: con = app.connections[account] # Check if we are in a IBB transfer if file_props.direction: - con.CloseIBBStream(file_props) + con.get_module('IBB').send_close(file_props) con.disconnect_transfer(file_props) self.set_status(file_props, 'stop') -- GitLab