Skip to content
Snippets Groups Projects

New plugin: Stickers

Open Alexander requested to merge PapaTutuWawa/gajim-plugins:feat/stickers-plugin into master
7 unresolved threads
+ 656
0
#
# Copyright (C) 2020 Alexander "PapaTutuWawa" <papatutuwawa AT polynom.me>
# This file is part of the Stickers plugin for Gajim.
#
# Gajim is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 3 only.
#
# Gajim is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with Gajim. If not, see <http://www.gnu.org/licenses/>.
#
'''
Send stickers according to XEP-0449 version 0.1.0.
:author: Alexander "PapaTutuWawa" <papatutuwawa@polynom.me>
:since: 28 July 2020
:copyright: Copyright (2020) Alexander "PapaTutuWawa" <papatutuwawa@polynom.me>
:license: GPL
'''
from collections import namedtuple
import json
import os
import logging
from gi.repository import Gtk
from gi.repository import GdkPixbuf
from nbxmpp.namespaces import Namespace
from nbxmpp.simplexml import Node
from gajim.chat_control import ChatControl
from gajim.groupchat_control import GroupchatControl
from gajim.common import app
from gajim.common import ged
from gajim.common.structs import OutgoingMessage
from gajim.gui.dialogs import ConfirmationDialog
from gajim.gui.dialogs import InformationDialog
from gajim.gui.dialogs import DialogButton
from gajim.plugins import GajimPlugin
from gajim.plugins.helpers import get_builder
from gajim.plugins.plugins_i18n import _
from stickers.modules import stickers_module
from stickers.gtk.config import StickersConfigDialog
from stickers.gtk.utils import load_sticker_pixbuf
from stickers.gtk.models import FilterListModel
from stickers.gtk.stickers import StickerStorage
from stickers.utils import sticker_data_path
from stickers.utils import dict_append_or_set
from stickers.utils import print_widget
from stickers.common import parse_hashes
from stickers.common import sticker_id
from stickers.common import Sticker
from stickers.common import Hash
from stickers.common import StickerPack
log = logging.getLogger('gajim.p.stickers')
StickerMention = namedtuple('StickerMention',
['start', 'end', 'desc', 'tv', 'id_'])
def image_from_pixbuf(pixbuf):
'''
This is a wrapper function to create a Gtk.Image from a given Pixbuf, not
caring whether it's an animation or a static Pixbuf
'''
# NOTE: We only return one of these two, so it should be fine
if isinstance(pixbuf, GdkPixbuf.Pixbuf):
return Gtk.Image.new_from_pixbuf(pixbuf)
#elif isinstance(pixbuf, GdkPixbuf.PixbufSimpleAnim):
return Gtk.Image.new_from_animation(pixbuf)
class StickersPlugin(GajimPlugin):
def init(self):
self.controls = {}
self.sticker_mentions = {} # Requested stickers that are mentioned
self._sticker_storage = StickerStorage()
self.description = _('Send stickers. Note that stickers are currently always sent unencrypted. Requires python-pillow.')
self.config_default_values = {
'DOWNLOAD_NEW': (True, 'Request new stickers'),
'DOWNLOAD_NEW_SIGNIN': (True, 'Download new stickers'),
'UPLOAD_NEW_SIGNIN': (True, 'Upload new stickers'),
'STICKER_WIDTH': (100, 'Sticker width'),
'SHOW_ANIMATED_STICKERS': (True, 'Show animated stickers'),
'STICKER_ACCESS_MODEL': ('open', 'Access model of the stickers node')
}
self.config_dialog = StickersConfigDialog(self)
self.events_handlers = {
'signed-in': (
ged.PREGUI, self._on_signed_in),
'decrypted-message-received': (
ged.PREGUI, self._on_message_received),
'gc-message-received': (
ged.PREGUI, self._on_message_received),
}
self.gui_extension_points = {
'chat_control': (self.connect_with_chat_control,
self.disconnect_from_chat_control),
'groupchat_control': (self.connect_with_chat_control,
self.disconnect_from_chat_control),
'print_real_text': (self._print_real_text, None),
}
self.modules = [stickers_module]
self._load_sticker_packs()
self._load_icon()
def _on_message_received(self, event):
# TODO: This is a hack to get this data from the module into the
# event's additional_data.
if hasattr(event.properties, 'sticker'):
event.additional_data['sticker'] = event.properties.sticker
def reload_sticker_packs(self):
'''
Searches again for sticker packs and loads them.
'''
log.debug('Reloading sticker packs')
# To prevent duplicate sticker packs, first remove them all
self._sticker_storage.get_model().remove_all()
self._load_sticker_packs()
def retract_sticker_pack(self, id_):
'''
Starts the process to remove sticker packs from both
the accounts' PubSub nodes and the disk.
id_: The ID of the sticker pack to remove
'''
accounts = list(app.connections.keys())
def on_retract(account):
accounts.remove(account)
if accounts:
return
self._sticker_storage.delete_sticker_pack(id_)
self._propagate_sticker_retraction(id_)
for account in app.connections:
app.connections[account].get_module('Stickers').retract_sticker_pack(id_, on_retract)
def upload_sticker_pack(self, pack_id, on_use=False):
'''
Asks the user if the sticker pack should be uploaded.
pack_id: The ID of the sticker pack in question
on_use: Whether this function is called from the context of the attempt
to send a sticker (True).
'''
def on_confirm():
log.debug('Manually uploading sticker pack %s', pack_id)
pack = self._sticker_storage.get_sticker_pack(pack_id)
for account in app.connections:
app.connections[account].get_module('Stickers').publish_pack(pack, upload=True)
if on_use:
text = _('Sending this sticker requires the upload of the sticker pack. Are you sure you want to upload the sticker pack?')
else:
text = _('Are you sure you want to upload the sticker pack?')
ConfirmationDialog(
_('Upload Sticker Pack'),
text,
_('This will also publish the sticker pack on your account(s).'),
[DialogButton.make('Cancel'),
DialogButton.make('Accept',
callback=on_confirm)]).show()
def _load_icon(self):
# pylint: disable=attribute-defined-outside-init
icon_path = self.local_file_path('stickers.png')
pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale(icon_path,
24,
24,
True)
self._button_icon_pixbuf = pixbuf
@property
def should_download_sticker_pack(self):
'''
Returns True when the user allows requesting new stickers. False
otherwise.
'''
return self.config['DOWNLOAD_NEW']
@property
def sticker_width(self):
'''
Returns the preferred sticker width
'''
return self.config['STICKER_WIDTH']
@property
def show_animations(self):
'''
Returns True when animated stickers should be shown as animations.
False otherwise.
'''
return self.config['SHOW_ANIMATED_STICKERS']
@property
def upload_new_signin(self):
return self.config['UPLOAD_NEW_SIGNIN']
@property
def download_new_signin(self):
return self.config['UPLOAD_NEW_SIGNIN']
def _on_signed_in(self, event):
if not (self.upload_new_signin or self.download_new_signin):
log.info('Signin up- and download disabled. Skipping self-check')
return
app.connections[event.account].get_module('Stickers').on_signed_in()
def _load_sticker_packs(self):
path = sticker_data_path()
if not os.path.exists(path):
os.mkdir(path)
def filter_path(path, filter_):
'''
Filter out paths based on the function f. Ignores
., / and returns an array of full paths.
'''
tmp = []
for item in os.listdir(path):
full_path = os.path.join(path, item)
if (filter_(full_path) and
not item.startswith('.') and
not item.startswith('/')):
tmp.append(full_path)
return tmp
sticker_packs = filter_path(path, os.path.isdir)
for pack in sticker_packs:
self._load_sticker_pack(pack)
def _load_sticker_pack(self, path):
'''
Load a sticker pack
'''
if not os.path.exists(os.path.join(path, 'info.json')):
log.debug('%s not a sticker pack', path)
return
try:
with open(os.path.join(path, 'info.json'), 'r') as file_:
info = json.loads(file_.read())
except Exception as err:
log.error('Failed to load sticker pack %s: %s', path, err)
return
id_ = info['id']
stickers = {}
for sticker in info['stickers']:
pixbuf = load_sticker_pixbuf(sticker['filename'],
id_,
sticker['type'],
self.sticker_width,
self.show_animations)
hashes = parse_hashes(sticker['hashes'])
sid = sticker_id(hashes)
stickers[sid] = Sticker(type=sticker['type'],
hashes=hashes,
size=sticker['size'],
dimension=sticker['dimension'],
desc=sticker['desc'],
suggests=sticker['suggests'],
filename=sticker['filename'],
url=sticker['url'],
pixbuf=pixbuf,
id_=sid)
self._sticker_storage.add_sticker_pack(StickerPack(id_=id_,
hash=Hash(algo=info['hash']['algo'],
value=info['hash']['value']),
name=info['name'],
summary=info['summary'],
stickers=stickers))
log.debug('Sticker pack %s loaded', info['name'])
def process_sticker_mentions(self, pack_id):
'''
Replaces all mentions of stickers within the sticker pack with the ID
of pack_id with the corresponding image. Does nothing if no mentions
for this sticker pack exist.
'''
if pack_id not in self.sticker_mentions:
return
pack = self._sticker_storage.get_sticker_pack(pack_id)
for mention in self.sticker_mentions[pack.id_]:
sticker = self._sticker_storage.get_sticker(pack_id, mention.id_)
buf = mention.start.get_buffer()
self._print_sticker(buf.get_iter_at_mark(mention.start),
mention.tv,
sticker,
mention.end)
del self.sticker_mentions[pack.id_]
def _propagate_sticker_retraction(self, pack_id):
for btn in self.controls.values():
btn.remove_sticker_pack(pack_id)
# Taken from url_image_preview
@staticmethod
def _print_text(buffer_, iter_, text):
if not iter_:
iter_ = buffer_.get_end_iter()
start_mark = buffer_.create_mark(None, iter_, True)
buffer_.insert(iter_, text, len(text))
end_mark = buffer_.create_mark(None, iter_, True)
return start_mark, end_mark
def _on_manual_sticker_download(self, pack_id, jid, account):
def download_sticker_pack():
log.debug('Manually requested download of sticker pack %s from %s',
pack_id,
jid)
app.connections[account].get_module('Stickers').download_sticker_pack(pack_id, jid)
ConfirmationDialog(
_('Download Sticker Pack'),
_('Are you sure you want to do download this sticker pack?'),
_('This will create files on your computer and publish it on your account.'),
[DialogButton.make('Cancel'),
DialogButton.make('Accept',
callback=download_sticker_pack)]).show()
def _print_non_download_mention(self, iter_, tv, pack_id, jid, account, text):
'''
Print a button to allow manual downloading of a sticker pack. Returns
the start and end marks of the button.
iter_: The TextView's iterator
tv: The TextView
pack_id: The ID of the sticker pack
jid: The bare JID of the sender
account: The name of the account that received the message
text: The text of the message
'''
button = Gtk.Button.new_with_label(text)
button.connect('clicked', lambda x: self._on_manual_sticker_download(pack_id, jid, account))
button.set_tooltip_text('Download this sticker pack')
start_mark, end_mark = print_widget(button, iter_, tv, True)
if tv.autoscroll:
tv.scroll_to_end()
return start_mark, end_mark
def _print_sticker(self, iter_, tv, sticker, end=None):
'''
Print a sticker.
iter_: The TextView's iterator
tv: The TextView
sticker: The Sticker object
end: If end is a TextMark, remove everything from iter_ to the end mark.
'''
img = image_from_pixbuf(sticker.pixbuf)
img.set_tooltip_text(sticker.desc)
img.show_all()
print_widget(img, iter_, tv, False)
if end:
buf = tv.tv.get_buffer()
buf.delete(iter_, buf.get_iter_at_mark(end))
if tv.autoscroll:
tv.scroll_to_end()
def _print_real_text(self, tv, text, _text_tags, _graphics, iter_, additional_data):
# It's there but not needed
# pylint: disable=unused-argument
if 'sticker' not in additional_data:
return
# Find the correct sticker pack
pack_id = additional_data['sticker']['pack_id']
sid = additional_data['sticker']['id']
if not self._sticker_storage.has_sticker_pack(pack_id):
if not self.should_download_sticker_pack:
from_ = additional_data['sticker']['from']
account = additional_data['sticker']['account']
start, end = self._print_non_download_mention(iter_,
tv,
pack_id,
from_,
account,
text)
else:
# We have requested it during _on_message_received
start, end = self._print_text(tv.get_buffer(),
iter_,
text)
dict_append_or_set(self.sticker_mentions,
pack_id,
StickerMention(start=start,
end=end,
desc=text,
tv=tv,
id_=sid))
tv.plugin_modified = True
log.debug('Sticker pack %s not found. Added mention', pack_id)
return
sticker = self._sticker_storage.get_sticker(pack_id, sid)
if not sticker:
self._print_text(tv.tv.get_buffer(), iter_, text)
log.warning('Sticker "%s" of "%s" not found!', text, pack_id)
return
self._print_sticker(iter_,
tv,
sticker)
def connect_with_chat_control(self, control):
# NOTE: Based on our gui_extension_points we *should*
# only get either ChatControl or GroupchatControl, so
# we *should* be fine.
if isinstance(control, GroupchatControl):
chat_type = 'groupchat'
elif isinstance(control, ChatControl):
chat_type = 'chat'
btn = StickersButton(app.connections[control.account],
control.contact,
control,
chat_type,
self.local_file_path,
self.upload_sticker_pack,
self._button_icon_pixbuf)
self.controls[control.control_id] = btn
actions_hbox = control.xml.get_object('hbox')
actions_hbox.pack_start(btn, False, False, 0)
actions_hbox.reorder_child(
btn, len(actions_hbox.get_children()) - 2)
btn.show()
def disconnect_from_chat_control(self, control):
btn = self.controls.get(control.control_id)
btn.destroy()
self.controls.pop(control.control_id, None)
class StickersButton(Gtk.Button):
def __init__(self, conn, contact, chat_control, chat_type, local_file_path, upload_sticker_pack, icon_pixbuf):
Gtk.Button.__init__(self)
icon = Gtk.Image.new_from_pixbuf(icon_pixbuf)
self.set_image(icon)
self.set_relief(Gtk.ReliefStyle.NONE)
self.set_tooltip_text('Stickers')
self._chat_control = chat_control
self._chat_type = chat_type
self._conn = conn
self._contact = contact
self._popover = None
self._model = FilterListModel(StickerStorage().get_model())
self._submodels = {} # Sticker pack ID -> FilterListModel
self._local_file_path = local_file_path
self._upload_sticker_pack = upload_sticker_pack
self._query = ''
self._model.set_filter_func(self._listbox_filter_func)
self._create_popover()
self.connect('clicked', self._on_clicked)
def remove_sticker_pack(self, id_):
'''
Since we keep an additional model per sticker pack, we need a way to
tell when we can dispose of it.
'''
log.debug('StickersButton: Removed submodel for %s', id_)
del self._submodels[id_]
def _create_sticker_button(self, sticker, *user_data):
sticker_pack = user_data[0]
img = image_from_pixbuf(sticker.pixbuf)
img.set_tooltip_text(sticker.desc)
img.show()
box = Gtk.Button.new()
box.add(img)
box.set_relief(Gtk.ReliefStyle.NONE)
box.connect('clicked',
self._send_sticker_lambda(sticker,
sticker_pack.id_,
sticker_pack.uploaded))
return box
def _create_listitem(self, sticker_pack, *user_data):
# pylint: disable=unused-argument
item = get_builder(self._local_file_path('gtk/popover_listitem.ui'))
item.pack_name.set_markup(f'<big>{sticker_pack.name}</big>')
model = FilterListModel(sticker_pack.stickers_model)
model.set_filter_func(self._flowbox_filter_func)
self._submodels[sticker_pack.id_] = model
item.stickers.bind_model(self._submodels[sticker_pack.id_],
self._create_sticker_button,
sticker_pack)
return item.sticker_pack_row
def _create_popover(self):
self._popover = Gtk.Popover.new()
scroll = Gtk.ScrolledWindow.new()
listbox = Gtk.ListBox.new()
listbox.set_selection_mode(Gtk.SelectionMode.NONE)
listbox.bind_model(self._model, self._create_listitem, None)
listbox.show_all()
listbox.set_vexpand(True)
self._listbox = listbox
scroll.add(listbox)
popover_box = Gtk.Box.new(Gtk.Orientation.VERTICAL, 0)
search = Gtk.SearchEntry.new()
search.connect('search-changed', self._on_search_changed)
popover_box.pack_start(search, False, True, 0)
popover_box.pack_end(scroll, True, True, 0)
self._popover.add(popover_box)
self._popover.set_size_request(150 * 4, 150 * 3)
def _on_search_changed(self, entry):
self._query = entry.get_text()
self._model.invalidate_filter()
def _flowbox_filter_func(self, sticker):
'''
Used to perform filtering on a single sticker pack
'''
return (not self._query or
any(x.startswith(self._query) for x in sticker.sticker_strings))
def _listbox_filter_func(self, pack):
'''
Used to perform filtering on all sticker packs
'''
if (not self._query or
any(x.startswith(self._query) for x in pack.sticker_strings)):
# NOTE: This function gets called the first time before we have
# set up the "submodels". In that case, we just ignore them.
if pack.id_ in self._submodels:
# Update the search within the sticker pack
self._submodels[pack.id_].invalidate_filter()
return True
return False
def _send_sticker_lambda(self, sticker, pack_id, uploaded):
'''
In order to react to clicks on stickers, we capture some metadata
in the closure.
sticker: The sticker that will be sent
pack_id: The ID of the sticker pack that the sticker belongs to
uploaded: True if the sticker pack has been uploaded
Returns a function (button -> None) that can be bound to the button's
'clicked' event. It will close the popover and send the sticker.
'''
def wrapper(argument):
# pylint: disable=unused-argument
if not uploaded:
self._upload_sticker_pack(pack_id, on_use=True)
return
self._popover.popdown()
self._send_sticker(sticker, pack_id)
return wrapper
def _send_sticker(self, sticker, pack_id):
sticker_node = Node(tag='sticker',
attrs={
'id': pack_id,
'xmlns': Namespace.STICKERS,
})
sfs_node = Node(tag='file-sharing',
attrs={
'xmlns': Namespace.SFS
})
file_node = Node(tag='file',
attrs={
'xmlns': Namespace.FILE_METADATA
})
file_node.setTagData('media-type', sticker.type)
file_node.setTagData('desc', sticker.desc)
file_node.setTagData('size', sticker.size)
file_node.setTagData('dimension', sticker.dimension)
for hash_ in sticker.hashes:
file_node.addChild('hash',
attrs={
'algo': hash_.algo,
},
namespace=Namespace.HASHES_2,
payload=[hash_.value])
sfs_node.addChild(node=file_node)
sources_node = Node(tag='sources')
sources_node.addChild(name='url-data',
namespace=Namespace.URL_DATA,
attrs={
'target': sticker.url
})
sfs_node.addChild(node=sources_node)
outgoing = OutgoingMessage(account=self._chat_control.account,
contact=self._contact,
message=sticker.desc,
type_=self._chat_type,
nodes=[sticker_node, sfs_node])
outgoing.additional_data['sticker'] = {
'pack_id': pack_id,
'url': sticker.url,
'desc': sticker.desc,
'id': sticker.id_
}
self._conn.send_message(outgoing)
def _on_clicked(self, btn):
# pylint: disable=unused-argument
# TODO: Remove once we can send <sticker> and <file-sharing> with
# encryption.
if self._chat_control.encryption:
InformationDialog(_('Cannot Send Stickers'),
_('Stickers are unavailable when using encryption.')).show()
return
self._popover.set_relative_to(self)
self._popover.show_all()
self._popover.popup()
Loading