Commit 383ebd0b authored by Alexander's avatar Alexander

[stickers] Big rework!

Move much of the functionality into modules/stickers_module.py
parent f74b010d
......@@ -25,6 +25,7 @@ from gajim.gui.dialogs import ConfirmationDialog
from gajim.gui.dialogs import DialogButton
from stickers.utils import sticker_data_path
from stickers.gtk.stickers import StickerStorage
class StickersConfigDialog(GajimPluginConfigDialog):
def init(self):
......@@ -35,7 +36,7 @@ class StickersConfigDialog(GajimPluginConfigDialog):
box.pack_start(self._ui.stickers_config_dialog, True, True, 0)
self._ui.connect_signals(self)
self._list_model = self.plugin.sticker_pack_model
self._list_model = StickerStorage().get_model()
self._ui.sticker_width.set_range(0, 400)
self._ui.sticker_width.set_increments(1, -1)
self._ui.sticker_packs_list.bind_model(self._list_model, self._create_sticker_pack_row)
......
......@@ -15,6 +15,8 @@
# along with Gajim. If not, see <http://www.gnu.org/licenses/>..check_file_before_transfer(sticker)
#
import weakref
from gi.repository import Gio
from gi.repository import GObject
......@@ -83,3 +85,53 @@ class StickerPackObject(GObject.GObject):
strings,
pack.uploaded,
model)
class FilterListModel(GObject.GObject, Gio.ListModel):
'''
Since GTK (or GIO) do not provide us with a filterable proxy model
for ListStores, we just implement it ourselves until it somehow moves
into GTK (or GIO).
TODO: Remove once in GTK (or GIO)
'''
def __init__(self, parent):
super().__init__()
self._parent = parent
self._items_cache = []
self._n_items = 0
self._func = lambda x: True
self._parent.connect('items-changed', self._on_parent_changed)
def _refresh_cache(self):
before = self._n_items
self._items_cache = []
for index in range(self._parent.get_n_items()):
item = self._parent.get_item(index)
if self._func(item):
self._items_cache.append(weakref.ref(item))
self._n_items = len(self._items_cache)
self.items_changed(0, before, self._n_items)
def _on_parent_changed(self, model, position, removed, added):
self._refresh_cache()
def set_filter_func(self, func):
self._func = func
self.invalidate_filter()
def do_get_item(self, position):
if position >= self._n_items:
return None
return self._items_cache[position]()
def do_get_item_type(self):
return GObject.GType.children
def do_get_n_items(self):
return self._n_items
def invalidate_filter(self):
self._refresh_cache()
#
# Copyright (C) 2020 Alexander "PapaTutuWawa" <papatutuwawa AT polynom.me>
# This file is part of the Stickers plugin for Gajim.
#
# Gajim is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 3 only.
#
# Gajim is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Gajim. If not, see <http://www.gnu.org/licenses/>.
#
import os
import logging
import json
import shutil
from gi.repository import Gio
from gajim.common.helpers import Singleton
from stickers.gtk.models import StickerPackObject
from stickers.common import dump_hashes
from stickers.utils import sticker_path
from stickers.utils import sticker_data_path
from stickers.utils import detect_path_escape
log = logging.getLogger('gajim.p.stickers')
class StickerStorage(metaclass=Singleton):
def __init__(self):
self._sticker_packs = {} # Sticker pack ID -> StickerPack object
self._sticker_pack_model = Gio.ListStore()
self._in_progress = [] # List of pack IDs that are currently being worked on
super().__init__()
def _model_append(self, pack):
'''
Appends the StickerPackObject pack to the model.
'''
self._sticker_pack_model.append(pack)
def _model_remove(self, pack_id):
'''
Remove the item with the id_ attribute of pack_id from the model.
Returns the index of this item, or -1 if not found.
'''
# Find the correct index in the ListStore
for index in range(self._sticker_pack_model.get_n_items()):
if self._sticker_pack_model.get_item(index).id_ == pack_id:
self._sticker_pack_model.remove(index)
return index
log.warning('Cannot remove %s from sticker_pack_model: Not found', pack_id)
return -1
def _model_replace(self, pack):
'''
Replaces the item with the id_ attribute of pack.id_ with the
StickerPackObject pack.
'''
index = self._model_remove(pack.id_)
if index >= 0:
self._sticker_pack_model.insert(index, pack)
def get_model(self):
return self._sticker_pack_model
def has_sticker_pack(self, id_):
'''
Returns True if we have the sticker pack with the ID of id_. False
otherwise.
'''
return id_ in self._sticker_packs
def get_sticker_pack(self, id_):
'''
Returns the sticker pack with the ID of id_. If not found, returns
None.
'''
if not self.has_sticker_pack(id_):
return None
return self._sticker_packs[id_]
def add_sticker_pack(self, pack, update_model=True):
'''
Add the sticker pack to the model, if update_model is True, and the
cache.
'''
self._sticker_packs[pack.id_] = pack
if update_model:
self.add_sticker_pack_to_model(pack.id_)
def add_sticker_pack_to_model(self, id_):
self._model_append(StickerPackObject.from_sticker_pack(self._sticker_packs[id_]))
def save_sticker_pack(self, id_):
'''
A folder is detected as being a sticker pack if it contains a
info.json. This is just a Gajim specific representation of all
data inside XEP-0449 that is preferrable to cache.
This function writes this based on a StickerPack object
'''
pack = self._sticker_packs[id_]
stickers = [{
'type': sticker.type,
'hashes': dump_hashes(sticker.hashes),
'size': sticker.size,
'dimension': sticker.dimension,
'desc': sticker.desc,
'suggests': sticker.suggests,
'filename': sticker.filename,
'url': sticker.url
} for sticker in pack.stickers]
obj = {
'id': pack.id_,
'name': pack.name,
'summary': pack.summary,
'stickers': stickers,
'hash': dump_hashes([pack.hash])[0]
}
with open(sticker_path(pack.id_, 'info.json'), 'w') as file_:
file_.write(json.dumps(obj, indent=4))
def remove_sticker_pack(self, id_):
'''
Removes the sticker pack with the ID of id_ from both the model and the
cache.
'''
self._model_remove(id_)
del self._sticker_packs[id_]
def delete_sticker_pack(self, pack_id):
# Check for possible path escapes
path = os.path.join(sticker_data_path(), pack_id)
if detect_path_escape(sticker_data_path(), path):
log.error('Possible path escape detected! %s', path)
log.error('Not removing path.')
return
shutil.rmtree(path)
self.remove_sticker_pack(pack_id)
def replace_sticker_pack(self, pack, write=False):
'''
Replaces the sticker pack with the ID of pack.id_ from both the model
and the cache. If write is True, then the new pack is also written
to disk.
'''
self._sticker_packs[pack.id_] = pack
self._model_replace(StickerPackObject.from_sticker_pack(pack))
if write:
self.save_sticker_pack(pack.id_)
def replace_sticker_attrs(self, pack_id, func, **kwargs):
'''
Replaces the attributes of a sticker inside a sticker pack. Note
that this does not update the model.
pack_id: The ID of the sticker pack
func: A function (Sticker -> Boolean) that matches the stciker
**kwargs: Attributes to replace
'''
for i, sticker in enumerate(self._sticker_packs[pack_id].stickers):
if not func(sticker):
continue
self._sticker_packs[pack_id].stickers[i] = sticker._replace(**kwargs)
return
def find_sticker(self, pack_id, func):
'''
Returns the first sticker in the sticker pack with ID pack_id for which
func(sticker) returns True. If none is found, None is returned
'''
for sticker in self._sticker_packs[pack_id].stickers:
if func(sticker):
return sticker
return None
def add_in_progress(self, id_):
'''
Add a sticker pack ID as being "in progress", meaning that we're in the
process of fetching it via PubSub and then downloading it.
'''
self._in_progress.append(id_)
def is_in_progress(self, id_):
'''
Returns True if the sticker pack ID id_ is currently being worked on.
'''
return id_ in self._in_progress
def remove_in_progress(self, id_):
'''
Removes a sticker pack ID from the list of worked on sticer packs.
'''
self._in_progress.remove(id_)
def sticker_pack_ids(self):
return self._sticker_packs.keys()
......@@ -15,57 +15,60 @@
# along with Gajim. If not, see <http://www.gnu.org/licenses/>.
#
import weakref
from gi.repository import GdkPixbuf
from gi.repository import Gio
from gi.repository import GObject
try:
from PIL import Image
except ImportError as err:
ERROR_MSG = str(err)
class FilterListModel(GObject.GObject, Gio.ListModel):
'''
Since GTK (or GIO) do not provide us with a filterable proxy model
for ListStores, we just implement it ourselves until it somehow moves
into GTK (or GIO).
from stickers.utils import sticker_path
from stickers.common import ANIMATED_MIME_TYPES
TODO: Remove once in GTK (or GIO)
def load_sticker_pixbuf(sticker_filename, pack_id, type_, preferred_width, show_animations):
'''
def __init__(self, parent):
super().__init__()
self._parent = parent
self._items_cache = []
self._n_items = 0
self._func = lambda x: True
self._parent.connect('items-changed', self._on_parent_changed)
def _refresh_cache(self):
before = self._n_items
self._items_cache = []
for index in range(self._parent.get_n_items()):
item = self._parent.get_item(index)
if self._func(item):
self._items_cache.append(weakref.ref(item))
self._n_items = len(self._items_cache)
self.items_changed(0, before, self._n_items)
def _on_parent_changed(self, model, position, removed, added):
self._refresh_cache()
Load sticker into a Pixbuf or PixbufSimpleAnim depending on the MIME type.
def set_filter_func(self, func):
self._func = func
self.invalidate_filter()
def do_get_item(self, position):
if position >= self._n_items:
return None
return self._items_cache[position]()
def do_get_item_type(self):
return GObject.GType.children
def do_get_n_items(self):
return self._n_items
sticker_filename: The filename of the sticker within its sticker pack
pack_id: The ID of the sticker pack
type_: The MIME type of the sticker
preferred_width: The width to scale the sticker to
show_animations: Should a possibly animated sticker be imported as an
animation (True) or a single Pixbuf (False)
'''
spath = sticker_path(pack_id, sticker_filename)
if type_ in ANIMATED_MIME_TYPES:
img = Image.open(spath)
# We can't use -1 for the height here, so we have to scale
# accordingly
scaled_height = (preferred_width / img.width) * img.height
pixbuf = GdkPixbuf.PixbufSimpleAnim.new(preferred_width, scaled_height, 20)
pixbuf.set_loop(True)
while True:
try:
img.seek(img.tell() + 1)
data = img.convert(mode='RGB', palette=img.getpalette()).tobytes()
frame_pixbuf = GdkPixbuf.Pixbuf.new_from_data(data,
GdkPixbuf.Colorspace.RGB,
False,
8,
img.width,
img.height,
img.width * 3)
scaled_pixbuf = frame_pixbuf.scale_simple(preferred_width,
scaled_height,
GdkPixbuf.InterpType.NEAREST)
if not show_animations:
img.close()
return scaled_pixbuf
def invalidate_filter(self):
self._refresh_cache()
pixbuf.add_frame(scaled_pixbuf)
except EOFError:
break
img.close()
else:
pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale(spath,
preferred_width,
-1,
True)
return pixbuf
......@@ -14,30 +14,48 @@
# along with Gajim. If not, see <http://www.gnu.org/licenses/>.
#
from collections import namedtuple
from functools import partial
import logging
import os
from gi.repository import Soup
from gi.repository import Gtk
from nbxmpp.errors import PubSubStanzaError
from nbxmpp.namespaces import Namespace
from nbxmpp.simplexml import Node
from nbxmpp.structs import StanzaHandler
from gajim.common import app
from gajim.common.nec import NetworkEvent
from gajim.common.modules.base import BaseModule
from gajim.common.const import FTState
from gajim.gui.dialogs import ErrorDialog
from gajim.gui.dialogs import DialogButton
from gajim.plugins.plugins_i18n import _
from stickers.gtk.stickers import StickerStorage
from stickers.gtk.utils import load_sticker_pixbuf
from stickers.utils import find_one
from stickers.utils import sticker_path
from stickers.utils import detect_path_escape
from stickers.utils import body
from stickers.utils import verify_sticker_hashes
from stickers.common import Sticker
from stickers.common import StickerPack
from stickers.common import Hash
from stickers.common import PUBSUB_MAX_FEATURE
from stickers.utils import sticker_data_path
log = logging.getLogger('gajim.p.stickers.module')
name = 'Stickers'
zeroconf = False
DownloadCallbackData = namedtuple('DownloadCallbackData',
['sticker', 'sticker_pack', 'finished'])
def parse_hash_node(node):
'''
Parse a <hash> urn:xmpp:hashes:2 node.
......@@ -91,26 +109,177 @@ class StickersModule(BaseModule):
BaseModule.__init__(self, conn, plugin=True)
self._client = conn
self._plugin = find_one(lambda x: x.short_name == 'stickers', app.plugin_manager.plugins)
self._session = Soup.Session()
self._pack_upload_state = {} # Sticker pack ID -> (filename -> uploaded?)
self._account_pubsub_max = {} # Account name -> Max PubSub items
self._sticker_storage = StickerStorage()
self.handlers = [
StanzaHandler(name='message',
callback=self._message_received,
ns=Namespace.STICKERS,
priority=30)
]
def _message_received(self, _conn, stanza, properties):
log.debug('Received a message %s', str(stanza))
pack_id = stanza.getTag('sticker').getAttr('id')
fs = stanza.getTag('file-sharing')
url = fs.getTag('sources').getTag('url-data').getAttr('target')
desc = body(fs.getTag('file'), 'desc')
jid = stanza.getAttr('from').bare
properties.sticker = {
'pack_id': pack_id,
'url': url,
'desc': desc,
'account': self._account,
'from': jid,
}
# Download the sticker only if we don't have it and the user enabled
# automatic download
if (not self._sticker_storage.has_sticker_pack(pack_id) and
self._plugin.should_download_sticker_pack() and
not self._sticker_storage.is_in_progress(pack_id)):
# TODO: Add a callback for the rest
self.request_sticker_pack(pack_id, jid)
def _download_sticker_pack(self, pack, publish=False):
def download_callback(session, msg, data):
# pylint: disable=unused-argument
fname = os.path.basename(data.sticker.url)
fpath = sticker_path(pack.id_, fname)
with open(fpath, 'wb') as file_:
file_.write(msg.props.response_body_data.get_data())
# Update the sticker
sticker = self._sticker_storage.find_sticker(pack.id_,
lambda x: x.url == data.sticker.url)
pixbuf = load_sticker_pixbuf(fname,
pack.id_,
sticker.type,
self._plugin.sticker_width,
self._plugin.show_animations)
self._sticker_storage.replace_sticker_attrs(pack.id_,
lambda x: x.url == data.sticker.url,
filename=fname,
pixbuf=pixbuf,
url=msg.get_uri().to_string(False))
if not verify_sticker_hashes(sticker, msg.props.response_body_data.get_data()):
log.warning('One or more sticker hashes mismatch')
if data.finished:
self._sticker_storage.remove_in_progress(pack.id_)
self._plugin.process_sticker_mentions(pack.id_)
self._sticker_storage.save_sticker_pack(pack.id_)
# Publish for each account, if wanted
if publish:
pack_ = self._sticker_storage.get_sticker_pack(pack.id_)
for account in app.connections:
app.connections[account].get_module('Stickers').publish_pack(pack_,
upload=False)
# Finally, update the model
# TODO: Maybe only after all accounts have the pack published?
self._sticker_storage.add_sticker_pack_to_model(pack.id_)
self._sticker_storage.add_sticker_pack(pack, update_model=False)
# First, some checks
pack_path = os.path.join(sticker_data_path(), pack.id_)
if detect_path_escape(sticker_data_path(), pack_path):
log.error('Path escape detected: Attempted path "%s"', pack_path)
log.error('Not proceeding to download!')
return
# Create the sticker pack directory
if not os.path.exists(pack_path):
os.mkdir(pack_path)
else:
log.warning('Sticker pack path %s already exists. Stickers will be overwritten', pack_path)
# Download the stickers
# NOTE: Maybe some stickers do not have a URL, so we don't even bother with them
downloadable_stickers = list(filter(lambda x: x.url, pack.stickers))
if not downloadable_stickers:
log.error('No downloadable stickers found for %s', pack.id_)
return
self._sticker_storage.add_in_progress(pack.id_)
log.debug('Downloading stickers of stickerpack %s', pack.id_)
for i, sticker in enumerate(downloadable_stickers):
msg = Soup.Message.new('GET', sticker.url)
finished = i == (len(downloadable_stickers) - 1)
self._session.queue_message(msg,
download_callback,
DownloadCallbackData(sticker, pack, finished))
@property
def _sticker_packs(self):
def download_sticker_pack(self, id_, jid):
'''
For quick access to the plugins sticker_packs dict
A wrapper around internal functions. Requests, downloads, processes
and publishes a sticker pack.
'''
return self._plugin.sticker_packs
def request_sticker_packs(self, jid=None):
def on_sticker_pack_received(pack, error):
if error:
dialog = ErrorDialog(_('Sticker Pack Download'),
_(error))
self._sticker_storage.remove_in_progress(id_)
return
self._download_sticker_pack(pack, publish=True)
self._sticker_storage.add_in_progress(id_)
self.request_sticker_pack(id_,
callback=on_sticker_pack_received,
jid=jid)
def on_signed_in(self):
def got_sticker_packs(packs):
# We now received all sticker packs __we__ published on
# the PubSub node
ids_remote = [x.id_ for x in packs]
ids_local = self._sticker_storage.sticker_pack_ids()
non_local_ids = set(ids_remote).difference(ids_local)
non_remote_ids = set(ids_local).difference(ids_remote)
if self._plugin.upload_new_signin:
log.debug('Non-local: %s, Non-remote: %s',
non_local_ids,
non_remote_ids)
for local_id in non_remote_ids:
pack = self._sticker_storage.get_sticker_pack(local_id)
log.debug('Uploading pack %s', local_id)
# Don't upload files if all stickers already have a URL
upload = not pack.uploaded
log.debug('Uploading via HTTP Upload: %s', upload)
self.publish_pack(pack, upload=upload)