# Copyright (C) 2019 Philipp Hörist <philipp AT hoerist.com>
#
# This file is part of the PGP Gajim Plugin.
#
# PGP Gajim Plugin is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 3 only.
#
# PGP Gajim Plugin is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PGP Gajim Plugin. If not, see <http://www.gnu.org/licenses/>.

import os
import time
import threading

import nbxmpp
from nbxmpp.namespaces import Namespace
from nbxmpp.structs import StanzaHandler
from gi.repository import GLib

from gajim.common import app
from gajim.common.nec import NetworkEvent
from gajim.common.const import EncryptionData
from gajim.common.modules.base import BaseModule

from gajim.plugins.plugins_i18n import _

from pgp.backend.python_gnupg import PGP
from pgp.modules.util import prepare_stanza
from pgp.backend.store import KeyStore
from pgp.exceptions import SignError
from pgp.exceptions import KeyMismatch
from pgp.exceptions import NoKeyIdFound


# Module name
name = 'PGPLegacy'
# NOTE(review): presumably read by Gajim's module loader to decide whether
# this module is also offered for zeroconf accounts - confirm.
zeroconf = True

# (tag name, namespace) pairs of the child elements that
# PGPLegacy._cleanup_stanza() copies into the rebuilt message stanza;
# every other child of the original stanza is dropped.
ALLOWED_TAGS = [('request', Namespace.RECEIPTS),
                ('active', Namespace.CHATSTATES),
                ('gone', Namespace.CHATSTATES),
                ('inactive', Namespace.CHATSTATES),
                ('paused', Namespace.CHATSTATES),
                ('composing', Namespace.CHATSTATES),
                ('no-store', Namespace.HINTS),
                ('store', Namespace.HINTS),
                ('no-copy', Namespace.HINTS),
                ('no-permanent-store', Namespace.HINTS),
                ('replace', Namespace.CORRECT),
                ('origin-id', Namespace.SID),
                ]


class PGPLegacy(BaseModule):
    def __init__(self, con):
        """Register the stanza handlers and create backend and key store.

        :param con: connection object, passed on to ``BaseModule.__init__``.
        """
        BaseModule.__init__(self, con, plugin=True)

        # Encrypted messages are routed to _message_received, signed
        # presences to _on_presence_received.
        self.handlers = [
            StanzaHandler(name='message',
                          callback=self._message_received,
                          ns=Namespace.ENCRYPTED,
                          priority=9),
            StanzaHandler(name='presence',
                          callback=self._on_presence_received,
                          ns=Namespace.SIGNED,
                          priority=48),
        ]

        self.own_jid = self._con.get_own_jid()

        self._pgp = PGP()
        self._store = KeyStore(self._account, self.own_jid, self._log,
                               self._pgp.list_keys)
        # Key ids for which the user confirmed "always trust" after a
        # NOT_TRUSTED encryption error (see _handle_encrypt_error).
        self._always_trust = []
        # Bare JID -> fingerprint of the last successfully verified
        # signed presence (filled by _on_presence_received).
        self._presence_fingerprint_store = {}

    @property
    def pgp_backend(self):
        """Return the ``PGP`` backend object created in ``__init__``."""
        return self._pgp

    def set_own_key_data(self, *args, **kwargs):
        """Forward all arguments to ``KeyStore.set_own_key_data``.

        :returns: whatever the key store method returns.
        """
        return self._store.set_own_key_data(*args, **kwargs)

    def get_own_key_data(self, *args, **kwargs):
        """Forward all arguments to ``KeyStore.get_own_key_data``.

        :returns: whatever the key store method returns; callers in this
            class treat ``None`` as "no own key configured" and otherwise
            read the ``'key_id'`` entry.
        """
        return self._store.get_own_key_data(*args, **kwargs)

    def set_contact_key_data(self, *args, **kwargs):
        """Forward all arguments to ``KeyStore.set_contact_key_data``.

        :returns: whatever the key store method returns.
        """
        return self._store.set_contact_key_data(*args, **kwargs)

    def get_contact_key_data(self, *args, **kwargs):
        """Forward all arguments to ``KeyStore.get_contact_key_data``.

        :returns: whatever the key store method returns; callers in this
            class treat ``None`` as "no key assigned" and otherwise read
            the ``'key_id'`` entry.
        """
        return self._store.get_contact_key_data(*args, **kwargs)

    def has_valid_key_assigned(self, jid):
        key_data = self.get_contact_key_data(jid)
        if key_data is None:
            return False
        key_id = key_data['key_id']
104 105 106

        announced_fingerprint = self._presence_fingerprint_store.get(jid)
        if announced_fingerprint is None:
107
            return True
108 109

        if announced_fingerprint == key_id:
110
            return True
111 112

        raise KeyMismatch(announced_fingerprint)
113 114 115 116

    def _on_presence_received(self, _con, _stanza, properties):
        if properties.signed is None:
            return
Philipp Hörist's avatar
Philipp Hörist committed
117
        jid = properties.jid.bare
118

119 120 121 122
        fingerprint = self._pgp.verify(properties.status, properties.signed)
        if fingerprint is None:
            self._log.info('Presence from %s was signed but no corresponding '
                           'key was found', jid)
123 124
            return

125 126 127
        self._presence_fingerprint_store[jid] = fingerprint
        self._log.info('Presence from %s was verified successfully, '
                       'fingerprint: %s', jid, fingerprint)
128 129

        key_data = self.get_contact_key_data(jid)
130 131
        if key_data is None:
            self._log.info('No key assigned for contact: %s', jid)
132 133
            return

134 135 136 137 138
        if key_data['key_id'] != fingerprint:
            self._log.warning('Fingerprint mismatch, '
                              'Presence was signed with fingerprint: %s, '
                              'Assigned key fingerprint: %s',
                              fingerprint, key_data['key_id'])
139 140 141 142 143 144
            return

    def _message_received(self, _con, stanza, properties):
        if not properties.is_pgp_legacy or properties.from_muc:
            return

Philipp Hörist's avatar
Philipp Hörist committed
145
        from_jid = properties.jid.bare
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
        self._log.info('Message received from: %s', from_jid)

        payload = self._pgp.decrypt(properties.pgp_legacy)
        prepare_stanza(stanza, payload)

        properties.encrypted = EncryptionData({'name': 'PGP'})

    def encrypt_message(self, con, event, callback):
        if not event.message:
            callback(event)
            return

        to_jid = app.get_jid_without_resource(event.jid)
        try:
            key_id, own_key_id = self._get_key_ids(to_jid)
        except NoKeyIdFound as error:
            self._log.warning(error)
            return

        always_trust = key_id in self._always_trust
        self._encrypt(con, event, [key_id, own_key_id], callback, always_trust)

    def _encrypt(self, con, event, keys, callback, always_trust):
        result = self._pgp.encrypt(event.message, keys, always_trust)
        encrypted_payload, error = result
        if error:
            self._handle_encrypt_error(con, error, event, keys, callback)
            return

        self._cleanup_stanza(event)
176
        self._create_pgp_legacy_message(event.stanza, encrypted_payload)
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213

        event.xhtml = None
        event.encrypted = 'PGP'
        event.additional_data['encrypted'] = {'name': 'PGP'}

        callback(event)

    def _handle_encrypt_error(self, con, error, event, keys, callback):
        if error.startswith('NOT_TRUSTED'):
            def on_yes(checked):
                if checked:
                    self._always_trust.append(keys[0])
                self._encrypt(con, event, keys, callback, True)

            def on_no():
                self._raise_message_not_sent(con, event, error)

            app.nec.push_incoming_event(
                NetworkEvent('pgp-not-trusted', on_yes=on_yes, on_no=on_no))

        else:
            self._raise_message_not_sent(con, event, error)

    @staticmethod
    def _raise_message_not_sent(con, event, error):
        """Push a ``message-not-sent`` event describing an encryption error.

        :param con: connection the message was to be sent on.
        :param event: outgoing message event; ``jid``, ``message`` and,
            if present, ``session`` are read.
        :param error: error text inserted into the user-facing message.
        """
        not_sent = NetworkEvent('message-not-sent',
                                conn=con,
                                jid=event.jid,
                                message=event.message,
                                error=_('Encryption error: %s') % error,
                                time_=time.time(),
                                # Not every event carries a session
                                session=getattr(event, 'session', None))
        app.nec.push_incoming_event(not_sent)

    def _create_pgp_legacy_message(self, stanza, payload):
        """Fill *stanza* with the encrypted payload and its markers.

        The body is replaced by the informational text from
        ``_get_info_message``, the payload goes into an ``x`` element in
        the ``Namespace.ENCRYPTED`` namespace, and an ``encryption``
        element (``Namespace.EME``) naming that namespace is appended.

        :param stanza: message stanza to modify in place.
        :param payload: encrypted payload as returned by the backend.
        """
        stanza.setBody(self._get_info_message())
        stanza.setTag('x', namespace=Namespace.ENCRYPTED).setData(payload)
        eme_node = nbxmpp.Node('encryption',
                               attrs={'xmlns': Namespace.EME,
                                      'namespace': Namespace.ENCRYPTED})
        stanza.addChild(node=eme_node)

    def sign_presence(self, presence, status):
        key_data = self.get_own_key_data()
        if key_data is None:
            self._log.warning('No own key id found, cant sign presence')
            return

        try:
            result = self._pgp.sign(status, key_data['key_id'])
        except SignError as error:
            self._log.warning('Sign Error: %s', error)
            return
        # self._log.debug(self._pgp.sign.cache_info())
        self._log.info('Presence signed')
Philipp Hörist's avatar
Philipp Hörist committed
233
        presence.setTag(Namespace.SIGNED + ' x').setData(result)
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260

    @staticmethod
    def _get_info_message():
        msg = '[This message is *encrypted* (See :XEP:`27`]'
        lang = os.getenv('LANG')
        if lang is not None and not lang.startswith('en'):
            # we're not english: one in locale and one en
            msg = _('[This message is *encrypted* (See :XEP:`27`]') + \
                    ' (' + msg + ')'
        return msg

    def _get_key_ids(self, jid):
        key_data = self.get_contact_key_data(jid)
        if key_data is None:
            raise NoKeyIdFound('No key id found for %s' % jid)
        key_id = key_data['key_id']

        own_key_data = self.get_own_key_data()
        if own_key_data is None:
            raise NoKeyIdFound('Own key id not found')
        own_key_id = own_key_data['key_id']
        return key_id, own_key_id

    @staticmethod
    def _cleanup_stanza(obj):
        """Replace ``obj.stanza`` with a copy holding only allowed tags.

        A fresh ``nbxmpp.Message`` is built with the original recipient,
        type, id and thread; of the child elements only those listed in
        ``ALLOWED_TAGS`` are carried over.

        :param obj: event object whose ``stanza`` attribute is rebuilt.
        """
        stanza = nbxmpp.Message(
            to=obj.stanza.getTo(),
            typ=obj.stanza.getType())
        stanza.setID(obj.stanza.getID())
        stanza.setThread(obj.stanza.getThread())
        for tag, ns in ALLOWED_TAGS:
            node = obj.stanza.getTag(tag, namespace=ns)
            if node:
                stanza.addChild(node=node)
        obj.stanza = stanza

    def encrypt_file(self, file, callback):
        thread = threading.Thread(target=self._encrypt_file_thread,
                                  args=(file, callback))
        thread.daemon = True
        thread.start()

    def _encrypt_file_thread(self, file, callback):
        try:
            key_id, own_key_id = self._get_key_ids(file.contact.jid)
        except NoKeyIdFound as error:
            self._log.warning(error)
            return

Kamay's avatar
Kamay committed
284 285
        stream = open(file.path, "rb")
        encrypted = self._pgp.encrypt_file(stream,
286
                                           [key_id, own_key_id])
Kamay's avatar
Kamay committed
287 288
        stream.close()

289 290 291 292 293
        if not encrypted:
            GLib.idle_add(self._on_file_encryption_error, encrypted.status)
            return

        file.size = len(encrypted.data)
294 295
        file.set_uri_transform_func(lambda uri: '%s.pgp' % uri)
        file.set_encrypted_data(encrypted.data)
296 297 298 299 300 301 302 303 304
        GLib.idle_add(callback, file)

    @staticmethod
    def _on_file_encryption_error(error):
        """Push a ``pgp-file-encryption-error`` event carrying *error*.

        :param error: error information passed along as the event's
            ``error`` attribute.
        """
        failure = NetworkEvent('pgp-file-encryption-error', error=error)
        app.nec.push_incoming_event(failure)

def get_instance(*args, **kwargs):
    """Create the module instance.

    All arguments are passed on to ``PGPLegacy``.

    :returns: tuple of the new ``PGPLegacy`` instance and the module
        name ``'PGPLegacy'``.
    """
    module = PGPLegacy(*args, **kwargs)
    return module, 'PGPLegacy'