Commit d595d93b authored by Philipp Hörist's avatar Philipp Hörist
Browse files

imprv: HTTP: Better handle finish of messages

- Always use content sniffer
- Connect `got-body` to determine if body was fully received
- Connect `content-sniffed` and handle header checks there
- Add more logging
parent 0da8888d
......@@ -23,6 +23,7 @@ from packaging.version import Version
from gi.repository import Gio
from gi.repository import GLib
from gi.repository import Soup
GLIB_VERSION = Version(
......@@ -706,3 +707,13 @@ NOT_ALLOWED_XML_CHARS = {
'\x0C': '',
'\x1B': ''
}
# Soup.Encoding members that convert_soup_encoding() tests a raw encoding
# value against.
# NOTE(review): Soup.Encoding looks like a plain enumeration rather than a
# flags type - confirm that consumers of this set compare members by
# equality and not with bitwise operators.
SOUP_ENCODING: set[Soup.Encoding] = {
Soup.Encoding.UNRECOGNIZED,
Soup.Encoding.NONE,
Soup.Encoding.CONTENT_LENGTH,
Soup.Encoding.EOF,
Soup.Encoding.CHUNKED,
Soup.Encoding.BYTERANGES,
}
......@@ -18,7 +18,6 @@ from __future__ import annotations
from pathlib import Path
from typing import Any
from typing import cast
from typing import Literal
from typing import Callable
from typing import Optional
......@@ -31,6 +30,7 @@ from gi.repository import GLib
from gi.repository import GObject
import nbxmpp
from .util import convert_soup_encoding
from .const import HTTPRequestError
......@@ -59,16 +59,11 @@ class HTTPLogAdapter(logging.LoggerAdapter):
class HTTPSession:
def __init__(self,
user_agent: str = DEFAULT_USER_AGENT,
sniffer: bool = False
) -> None:
def __init__(self, user_agent: str = DEFAULT_USER_AGENT) -> None:
self._session = Soup.Session()
self._session.set_user_agent(user_agent)
if sniffer:
self._session.add_feature_by_type(Soup.ContentSniffer)
self._session.add_feature_by_type(Soup.ContentSniffer)
def get_soup_session(self) -> Soup.Session:
return self._session
......@@ -88,6 +83,7 @@ class HTTPRequest(GObject.GObject):
__gtype_name__ = "HTTPRequest"
__gsignals__ = {
'content-sniffed': (SIGNAL_ACTIONS, None, (int, str)),
'starting-response-body': (SIGNAL_ACTIONS, None, ()),
'response-progress': (SIGNAL_ACTIONS, None, (float,)),
'request-progress': (SIGNAL_ACTIONS, None, (float,)),
......@@ -106,16 +102,18 @@ class HTTPRequest(GObject.GObject):
self._sent_size = 0
self._cancellable = Gio.Cancellable()
self._input_stream = cast(Gio.InputStream, None)
self._input_stream: Optional[Gio.InputStream] = None
self._output_stream: Optional[Gio.OutputStream] = None
self._is_finished = False
self._error: Optional[HTTPRequestError] = None
self._is_complete = False
self._timeout_reached = False
self._timeout_id = None
self._no_content_length_set = False
self._response_body_file: Optional[Gio.File] = None
self._response_body_data = b''
self._body_received = False
self._request_body_file: Optional[Gio.File] = None
self._request_body_data: Optional[bytes] = None
......@@ -146,7 +144,7 @@ class HTTPRequest(GObject.GObject):
def get_error_string(self) -> str:
if self._error == HTTPRequestError.STATUS_NOT_OK:
return self._message.get_reason_phrase()
return f'{self._error}'
return repr(self._error)
def get_response_headers(self) -> Soup.MessageHeaders:
return self._message.get_response_headers()
......@@ -178,6 +176,7 @@ class HTTPRequest(GObject.GObject):
if self._is_finished:
raise ValueError('Session already finished')
self._log.info('Cancel requested')
self._cancellable.cancel()
def set_request_body_from_path(self, content_type: str, path: Path) -> None:
......@@ -277,6 +276,8 @@ class HTTPRequest(GObject.GObject):
self._message.connect('wrote-body-data',
self._on_request_body_progress)
self._message.connect('content-sniffed', self._on_content_sniffed)
self._message.connect('got-body', self._on_got_body)
self._message.connect('finished', self._on_finished)
soup_session = self._session.get_soup_session()
......@@ -312,23 +313,21 @@ class HTTPRequest(GObject.GObject):
try:
self._input_stream = session.send_finish(result)
except GLib.Error as error:
self._log.error(error)
quark = GLib.quark_try_string('g-io-error-quark')
if error.matches(quark, Gio.IOErrorEnum.CANCELLED):
self._set_failed(HTTPRequestError.CANCELLED)
else:
self._set_failed(HTTPRequestError.UNKNOWN)
return
if self._no_content_length_set:
self._set_failed(HTTPRequestError.MISSING_CONTENT_LENGTH)
else:
self._set_failed(HTTPRequestError.CANCELLED)
return
if self._message.get_status() not in (Soup.Status.OK, Soup.Status.CREATED):
self._set_failed(HTTPRequestError.STATUS_NOT_OK)
self._log.error(error)
self._set_failed(HTTPRequestError.UNKNOWN)
return
headers = self.get_response_headers()
self._response_content_length = headers.get_content_length()
self._response_content_type, _params = headers.get_content_type()
if self._response_content_length == 0:
self._set_failed(HTTPRequestError.MISSING_CONTENT_LENGTH)
if self._message.get_status() not in (Soup.Status.OK,
Soup.Status.CREATED):
self._set_failed(HTTPRequestError.STATUS_NOT_OK)
return
self._log.info('Start downloading response body')
......@@ -337,6 +336,7 @@ class HTTPRequest(GObject.GObject):
self._read_async()
def _read_async(self) -> None:
assert self._input_stream is not None
self._input_stream.read_bytes_async(CHUNK_SIZE,
GLib.PRIORITY_LOW,
self._cancellable,
......@@ -399,26 +399,54 @@ class HTTPRequest(GObject.GObject):
self._set_failed(error)
def _on_finished(self, _message: Soup.Message) -> None:
self._set_finished()
def _on_content_sniffed(self,
message: Soup.Message,
content_type: str,
_params: GLib.HashTable,
) -> None:
headers = message.get_response_headers()
encoding = headers.get_encoding()
if Soup.Encoding.CONTENT_LENGTH not in convert_soup_encoding(encoding):
self._log.warning('No content-length in response')
self._no_content_length_set = True
self.cancel()
return
def _set_finished(self) -> None:
status = self._message.get_status()
if status == Soup.Status.NONE:
# Message has not been sent, can happen when we cancel the message
# before it is sent. The finished signal triggers before the
# cancelled exception.
self._response_content_length = headers.get_content_length()
if self._response_content_length == 0:
self._log.warning('No content-length in response')
self._no_content_length_set = True
self.cancel()
return
if self._is_finished:
self._response_content_type = content_type
self.emit('content-sniffed',
self._response_content_length,
self._response_content_type)
def _on_got_body(self, _message: Soup.Message) -> None:
# This signal tells us that the full body was received.
# The `finished` signal is not a sure indicator if the message body
# was received in full, as its also triggered when a message is
# cancelled.
self._log.info('Body received')
self._body_received = True
def _on_finished(self, _message: Soup.Message) -> None:
self._log.info('Message finished')
if not self._body_received:
# This can happen when the message is cancelled. The `finished`
# signal is raised whenever the input stream is closed.
# In the case the message was cancelled other parts of the code
# will call set_failed().
return
headers = self._message.get_response_headers()
content_length = headers.get_content_length()
if self._received_size != content_length:
if self._received_size != self._response_content_length:
self._set_failed(HTTPRequestError.INCOMPLETE)
return
status = self._message.get_status()
if status not in (Soup.Status.OK, Soup.Status.CREATED):
self._set_failed(HTTPRequestError.STATUS_NOT_OK)
return
......@@ -426,6 +454,7 @@ class HTTPRequest(GObject.GObject):
self._set_complete()
def _set_failed(self, error: HTTPRequestError) -> None:
self._log.info('Set Failed: %s', error)
self._is_finished = True
if self._timeout_reached:
self._timeout_id = None
......@@ -437,6 +466,7 @@ class HTTPRequest(GObject.GObject):
self._cleanup()
def _set_complete(self) -> None:
self._log.info('Set Complete')
self._is_finished = True
self._is_complete = True
self._close_all_streams()
......@@ -460,12 +490,14 @@ class HTTPRequest(GObject.GObject):
del self._cancellable
del self._session
del self._input_stream
del self._output_stream
del self._user_data
self._input_stream = None
self._output_stream = None
if self._timeout_id is not None:
GLib.source_remove(self._timeout_id)
self._timeout_id = None
self.emit('destroy')
self.run_dispose()
......
......@@ -36,9 +36,11 @@ from functools import lru_cache
from packaging.version import Version
from gi.repository import Gio
from gi.repository import Soup
from nbxmpp.protocol import DiscoInfoMalformed
from nbxmpp.const import GIO_TLS_ERRORS
from nbxmpp.const import SOUP_ENCODING
from nbxmpp.const import GLIB_VERSION
from nbxmpp.namespaces import Namespace
from nbxmpp.protocol import StanzaMalformed
......@@ -400,6 +402,10 @@ def convert_tls_error_flags(flags):
return set(filter(lambda error: error & flags, GIO_TLS_ERRORS.keys()))
def convert_soup_encoding(flags: int) -> set[Soup.Encoding]:
    """Return the set of Soup.Encoding members that *flags* represents.

    Soup.Encoding is a sequential enumeration (its members are not
    independent bits), so the value is matched by equality. A bitwise
    test would report unrelated members: for example an EOF-encoded
    response would also be reported as CONTENT_LENGTH, hiding a missing
    Content-Length header from callers.

    :param flags: Raw encoding value, e.g. the result of
        ``Soup.MessageHeaders.get_encoding()``. The parameter name is
        kept for backward compatibility.
    :returns: A set holding the matching member, or an empty set if the
        value matches none of the members in SOUP_ENCODING.
    """
    value = int(flags)
    return {enc for enc in SOUP_ENCODING if int(enc) == value}
def get_websocket_close_string(websocket: Any) -> str:
data = websocket.get_close_data()
code = websocket.get_close_code()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment