Skip to content
Snippets Groups Projects
Commit 9bc1a686 authored by Daniel Brötzmann's avatar Daniel Brötzmann
Browse files

refactor: HistorySyncAssistant: Port to Gajim’s Assistant class

Fixes #10763
parent 9c80e5ff
No related branches found
No related tags found
No related merge requests found
......@@ -12,285 +12,291 @@
# You should have received a copy of the GNU General Public License
# along with Gajim. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations
from typing import cast
from typing import Optional
import logging
from enum import IntEnum
from datetime import datetime, timedelta
from datetime import datetime
from datetime import timedelta
from gi.repository import Gtk
from gi.repository import GLib
from nbxmpp.errors import StanzaError
from nbxmpp.errors import MalformedStanzaError
from nbxmpp.task import Task
from gajim.common import app
from gajim.common import ged
from gajim.common.i18n import _
from gajim.common.const import ArchiveState
from gajim.common.events import ArchivingIntervalFinished
from gajim.common.events import RawMamMessageReceived
from gajim.common.helpers import event_filter
from .assistant import Assistant
from .assistant import Page
from .assistant import SuccessPage
from .util import load_icon_surface
from .util import EventHelper
log = logging.getLogger('gajim.gui.history_sync')
class Pages(IntEnum):
TIME = 0
SYNC = 1
SUMMARY = 2
class HistorySyncAssistant(Gtk.Assistant, EventHelper):
def __init__(self, account):
Gtk.Assistant.__init__(self)
class HistorySyncAssistant(Assistant, EventHelper):
def __init__(self, account: str) -> None:
Assistant.__init__(self, width=600, transient_for=app.window)
EventHelper.__init__(self)
self.set_application(app.app)
self.set_position(Gtk.WindowPosition.CENTER)
self.set_name('HistorySyncAssistant')
self.set_default_size(300, -1)
self.set_resizable(False)
self.set_transient_for(app.window)
self.account = account
self.con = app.connections[self.account]
self.timedelta = None
self.now = datetime.utcnow()
self.query_id = None
self.start = None
self.end = None
self.next = None
self._hide_buttons()
self._client = app.get_client(account)
own_jid = self.con.get_own_jid().bare
self._timedelta: Optional[timedelta] = None
self._now = datetime.utcnow()
self._query_id: Optional[str] = None
self._start: Optional[datetime] = None
self._end: Optional[datetime] = None
mam_start = ArchiveState.NEVER
archive = app.storage.archive.get_archive_infos(own_jid)
archive = app.storage.archive.get_archive_infos(
self._client.get_own_jid().bare)
if archive is not None and archive.oldest_mam_timestamp is not None:
mam_start = int(float(archive.oldest_mam_timestamp))
if mam_start == ArchiveState.NEVER:
self.current_start = self.now
self._current_start = self._now
elif mam_start == ArchiveState.ALL:
self.current_start = datetime.utcfromtimestamp(0)
self._current_start = datetime.utcfromtimestamp(0)
else:
self.current_start = datetime.fromtimestamp(mam_start)
self._current_start = datetime.fromtimestamp(mam_start)
self.add_button('synchronize', _('Synchronize'), 'suggested-action')
self.add_button('close', _('Close'))
self.set_button_visible_func(self._visible_func)
self.select_time = SelectTimePage(self)
self.append_page(self.select_time)
self.set_page_type(self.select_time, Gtk.AssistantPageType.INTRO)
self.add_pages({
'select': SelectTime(self._now, self._current_start),
'progress': Progress(),
})
self.download_history = DownloadHistoryPage(self)
self.append_page(self.download_history)
self.set_page_type(self.download_history,
Gtk.AssistantPageType.PROGRESS)
self.set_page_complete(self.download_history, True)
self.add_default_page('success')
success_page = cast(SuccessPage, self.get_page('success'))
success_page.set_title(_('Synchronize Chat History'))
success_page.set_heading(_('Finished'))
self.summary = SummaryPage(self)
self.append_page(self.summary)
self.set_page_type(self.summary, Gtk.AssistantPageType.SUMMARY)
self.set_page_complete(self.summary, True)
self.connect('button-clicked', self._on_button_clicked)
# pylint: disable=line-too-long
self.register_events([
('archiving-count-received', ged.GUI1, self._received_count),
('archiving-interval-finished', ged.GUI1, self._received_finished),
('raw-mam-message-received', ged.PRECORE, self._nec_mam_message_received),
('raw-mam-message-received', ged.PRECORE, self._mam_message_received),
])
# pylint: enable=line-too-long
self.connect('prepare', self._on_page_change)
self.connect('cancel', self._on_close_clicked)
self.connect('close', self._on_close_clicked)
if mam_start == ArchiveState.ALL:
self.set_current_page(Pages.SUMMARY)
self.summary.nothing_to_do()
success_page.set_text(
_('Gajim is fully synchronised with the archive.'))
self.show_page('success')
self.show_all()
self.set_title(_('Synchronise History'))
def _hide_buttons(self):
'''
Hide some of the standard buttons that are included in Gtk.Assistant
'''
if self.get_property('use-header-bar'):
action_area = self.get_children()[1]
else:
box = self.get_children()[0]
content_box = box.get_children()[1]
action_area = content_box.get_children()[1]
for button in action_area.get_children():
button_name = Gtk.Buildable.get_name(button)
if button_name == 'back':
button.connect('show', self._on_show_button)
elif button_name == 'forward':
self.next = button
button.connect('show', self._on_show_button)
@staticmethod
def _on_show_button(button):
button.hide()
def _visible_func(_assistant: Assistant, page_name: str) -> list[str]:
if page_name == 'select':
return ['close', 'synchronize']
def _prepare_query(self):
if self.timedelta:
self.start = self.now - self.timedelta
self.end = self.current_start
if page_name == 'progress':
return ['close']
log.info('Get mam_start_date: %s', self.current_start)
log.info('Now: %s', self.now)
log.info('Start: %s', self.start)
log.info('End: %s', self.end)
if page_name == 'success':
return ['close']
jid = self.con.get_own_jid().bare
raise ValueError(f'page {page_name} unknown')
self.con.get_module('MAM').make_query(jid,
start=self.start,
end=self.end,
max_=0,
callback=self._received_count)
def _on_button_clicked(self,
_assistant: Assistant,
button_name: str
) -> None:
if button_name == 'synchronize':
self._prepare_query()
self.show_page('progress', Gtk.StackTransitionType.SLIDE_LEFT)
return
def _received_count(self, task):
if button_name == 'close':
self.destroy()
def _prepare_query(self) -> None:
select_time_page = cast(SelectTime, self.get_page('select'))
self._timedelta = select_time_page.get_timedelta()
if self._timedelta is not None:
self._start = self._now - self._timedelta
self._end = self._current_start
log.info('Get mam_start_date: %s', self._current_start)
log.info('Now: %s', self._now)
log.info('Start: %s', self._start)
log.info('End: %s', self._end)
self._client.get_module('MAM').make_query(
self._client.get_own_jid().bare,
start=self._start,
end=self._end,
max_=0,
callback=self._received_count)
def _received_count(self, task: Task) -> None:
try:
result = task.finish()
except (StanzaError, MalformedStanzaError):
return
if result.rsm.count is not None:
self.download_history.count = int(result.rsm.count)
self.query_id = self.con.get_module('MAM').request_archive_interval(
self.start, self.end)
progress_page = cast(Progress, self.get_page('progress'))
progress_page.set_count(int(result.rsm.count))
mam_module = self._client.get_module('MAM')
self._query_id = mam_module.request_archive_interval(
self._start, self._end)
@event_filter(['account'])
def _received_finished(self, event):
if event.query_id != self.query_id:
def _received_finished(self, event: ArchivingIntervalFinished) -> None:
if event.query_id != self._query_id:
return
self.query_id = None
self._query_id = None
log.info('Query finished')
GLib.idle_add(self.download_history.finished)
self.set_current_page(Pages.SUMMARY)
self.summary.finished()
progress_page = cast(Progress, self.get_page('progress'))
GLib.idle_add(progress_page.set_finished)
received_count = progress_page.get_received_count()
success_page = cast(SuccessPage, self.get_page('success'))
success_page.set_text(_('Finished synchronising chat history:\n'
'%s messages downloaded') % received_count)
self.show_page('success')
@event_filter(['account'])
def _nec_mam_message_received(self, event):
if self.query_id != event.properties.mam.query_id:
def _mam_message_received(self, event: RawMamMessageReceived) -> None:
if self._query_id != event.properties.mam.query_id:
return
log.debug('Received message')
GLib.idle_add(self.download_history.set_fraction)
progress_page = cast(Progress, self.get_page('progress'))
GLib.idle_add(progress_page.set_fraction)
def on_row_selected(self, _listbox, row):
self.timedelta = row.get_child().get_delta()
if row:
self.set_page_complete(self.select_time, True)
else:
self.set_page_complete(self.select_time, False)
def _on_page_change(self, _assistant, page):
if page == self.download_history:
self.next.hide()
self._prepare_query()
self.set_title(_('Synchronise History'))
class SelectTime(Page):
def __init__(self,
now: datetime,
current_start: datetime
) -> None:
Page.__init__(self)
self.title = _('Synchronize Chat History')
def _on_close_clicked(self, *args):
self.destroy()
self.complete = False
self._timedelta: Optional[timedelta] = None
heading = Gtk.Label()
heading.get_style_context().add_class('large-header')
heading.set_max_width_chars(30)
heading.set_line_wrap(True)
heading.set_halign(Gtk.Align.CENTER)
heading.set_justify(Gtk.Justification.CENTER)
heading.set_text(_('Synchronize Chat History'))
class SelectTimePage(Gtk.Box):
def __init__(self, assistant):
super().__init__(orientation=Gtk.Orientation.VERTICAL)
self.set_spacing(18)
self.assistant = assistant
label = Gtk.Label(
label=_('How far back should the chat history be synchronised?'))
label.set_halign(Gtk.Align.CENTER)
label.set_line_wrap(True)
label.set_max_width_chars(40)
listbox = Gtk.ListBox()
listbox.set_hexpand(False)
listbox.set_halign(Gtk.Align.CENTER)
listbox.add(TimeOption(_('One Month'), 1))
listbox.add(TimeOption(_('Three Months'), 3))
listbox.add(TimeOption(_('One Year'), 12))
listbox.add(TimeOption(_('One Month'), timedelta(days=30)))
listbox.add(TimeOption(_('Three Months'), timedelta(days=90)))
listbox.add(TimeOption(_('One Year'), timedelta(days=365)))
listbox.add(TimeOption(_('Everything')))
listbox.connect('row-selected', assistant.on_row_selected)
listbox.connect('row-selected', self._on_row_selected)
for row in listbox.get_children():
option = row.get_child()
if not option.get_delta():
for row in cast(list[TimeOption], listbox.get_children()):
delta = row.get_timedelta()
if delta is None:
continue
if assistant.now - option.get_delta() > assistant.current_start:
if now - delta > current_start:
row.set_activatable(False)
row.set_selectable(False)
self.pack_start(label, True, True, 0)
self.pack_start(listbox, False, False, 0)
self.add(heading)
self.add(label)
self.add(listbox)
self.show_all()
class DownloadHistoryPage(Gtk.Box):
def __init__(self, assistant):
super().__init__(orientation=Gtk.Orientation.VERTICAL)
self.set_spacing(18)
self.assistant = assistant
self.count = 0
self.received = 0
def _on_row_selected(self, _listbox: Gtk.ListBox, row: TimeOption) -> None:
self._timedelta = row.get_timedelta()
self.complete = row is not None
self.update_page_complete()
surface = load_icon_surface('folder-download-symbolic', size=64)
image = Gtk.Image.new_from_surface(surface)
def get_timedelta(self) -> Optional[timedelta]:
return self._timedelta
self.progress = Gtk.ProgressBar()
self.progress.set_show_text(True)
self.progress.set_text(_('Connecting...'))
self.progress.set_pulse_step(0.1)
self.progress.set_vexpand(True)
self.progress.set_valign(Gtk.Align.CENTER)
self.pack_start(image, False, False, 0)
self.pack_start(self.progress, False, False, 0)
def set_fraction(self):
self.received += 1
if self.count:
self.progress.set_fraction(self.received / self.count)
self.progress.set_text(_('%(received)s of %(max)s') % {
'received': self.received, 'max': self.count})
else:
self.progress.pulse()
self.progress.set_text(_('Downloaded %s messages') % self.received)
def get_default_button(self) -> str:
return 'synchronize'
def finished(self):
self.progress.set_fraction(1)
class Progress(Page):
def __init__(self) -> None:
Page.__init__(self)
self.title = _('Synchronizing Chat History…')
class SummaryPage(Gtk.Box):
def __init__(self, assistant):
super().__init__(orientation=Gtk.Orientation.VERTICAL)
self.set_spacing(18)
self.assistant = assistant
self._count = 0
self._received = 0
self.label = Gtk.Label()
self.label.set_name('FinishedLabel')
self.label.set_valign(Gtk.Align.CENTER)
surface = load_icon_surface('folder-download-symbolic', size=64)
image = Gtk.Image.new_from_surface(surface)
self._progress_bar = Gtk.ProgressBar()
self._progress_bar.set_show_text(True)
self._progress_bar.set_text(_('Connecting...'))
self._progress_bar.set_pulse_step(0.1)
self._progress_bar.set_vexpand(True)
self._progress_bar.set_valign(Gtk.Align.CENTER)
self.add(image)
self.add(self._progress_bar)
self.show_all()
def set_fraction(self) -> None:
self._received += 1
if self._count:
self._progress_bar.set_fraction(self._received / self._count)
self._progress_bar.set_text(
_('%(received)s of %(max)s') % {
'received': self._received,
'max': self._count})
else:
self._progress_bar.pulse()
self._progress_bar.set_text(
_('Downloaded %s messages') % self._received)
self.pack_start(self.label, True, True, 0)
def set_count(self, count: int) -> None:
self._count = count
def finished(self):
received = self.assistant.download_history.received
self.label.set_text(_('Finished synchronising chat history:\n'
'%s messages downloaded') % received)
def set_finished(self) -> None:
self._progress_bar.set_fraction(1)
def nothing_to_do(self):
self.label.set_text(_('Gajim is fully synchronised with the archive.'))
def get_received_count(self) -> int:
return self._received
def query_already_running(self):
self.label.set_text(_('There is already a synchronisation in '
'progress. Please try again later.'))
class TimeOption(Gtk.ListBoxRow):
def __init__(self, text: str, months: Optional[timedelta] = None) -> None:
Gtk.ListBoxRow.__init__(self)
label = Gtk.Label(label=text)
self.add(label)
class TimeOption(Gtk.Label):
def __init__(self, label, months=None):
super().__init__(label=label)
self.date = months
if months:
self.date = timedelta(days=30 * months)
self._timedelta = months
def get_delta(self):
return self.date
def get_timedelta(self) -> Optional[timedelta]:
return self._timedelta
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment