Commit 7776ae0a authored by Philipp Hörist's avatar Philipp Hörist

[openpgp] Inital commit

parent bd1b120b
from .pgpplugin import OpenPGPPlugin
# Copyright (C) 2018 Philipp Hörist <philipp AT hoerist.com>
#
# This file is part of Gajim.
#
# Gajim is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 3 only.
#
# Gajim is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Gajim. If not, see <http://www.gnu.org/licenses/>.
# XEP-0373: OpenPGP for XMPP
import io
from collections import namedtuple
import logging
import gpg
from gajim.common import app
KeyringItem = namedtuple('KeyringItem',
'type keyid userid fingerprint')
log = logging.getLogger('gajim.plugin_system.openpgp.pgpme')
class PGPContext():
def __init__(self, jid, gnuhome):
self.context = gpg.Context(home_dir=str(gnuhome))
# self.create_new_key()
# self.get_key_by_name()
# self.get_key_by_fingerprint()
self.export_public_key()
def create_new_key(self):
parms = """<GnupgKeyParms format="internal">
Key-Type: RSA
Key-Length: 2048
Subkey-Type: RSA
Subkey-Length: 2048
Name-Real: Joe Tester
Name-Comment: with stupid passphrase
Name-Email: test@example.org
Passphrase: Crypt0R0cks
Expire-Date: 2020-12-31
</GnupgKeyParms>
"""
with self.context as c:
c.set_engine_info(gpg.constants.PROTOCOL_OpenPGP, None, app.gajimpaths['MY_DATA'])
c.set_progress_cb(gpg.callbacks.progress_stdout)
c.op_genkey(parms, None, None)
print("Generated key with fingerprint {0}.".format(
c.op_genkey_result().fpr))
def get_all_keys(self):
c = gpg.Context()
for key in c.keylist():
user = key.uids[0]
print("Keys for %s (%s):" % (user.name, user.email))
for subkey in key.subkeys:
features = []
if subkey.can_authenticate:
features.append('auth')
if subkey.can_certify:
features.append('cert')
if subkey.can_encrypt:
features.append('encrypt')
if subkey.can_sign:
features.append('sign')
print(' %s %s' % (subkey.fpr, ','.join(features)))
def get_key_by_name(self):
c = gpg.Context()
for key in c.keylist('john'):
print(key.subkeys[0].fpr)
def get_key_by_fingerprint(self):
c = gpg.Context()
fingerprint = 'key fingerprint to search for'
try:
key = c.get_key(fingerprint)
print('%s (%s)' % (key.uids[0].name, key.uids[0].email))
except gpg.errors.KeyNotFound:
print("No key for fingerprint '%s'." % fingerprint)
def get_secret_key(self):
'''
Key(can_authenticate=1,
can_certify=1,
can_encrypt=1,
can_sign=1,
chain_id=None,
disabled=0,
expired=0,
fpr='7ECE1F88BAFCA37F168E1556A4DBDD1BA55FE3CE',
invalid=0,
is_qualified=0,
issuer_name=None,
issuer_serial=None,
keylist_mode=1,
last_update=0,
origin=0,
owner_trust=5,
protocol=0,
revoked=0,
secret=1,
subkeys=[
SubKey(can_authenticate=1,
can_certify=1,
can_encrypt=1,
can_sign=1,
card_number=None
curve=None,
disabled=0,
expired=0,
expires=0,
fpr='7ECE1F88BAFCA37F168E1556A4DBDD1BA55FE3CE',
invalid=0,
is_cardkey=0,
is_de_vs=1,
is_qualified=0,
keygrip='15BECB77301E4810ABB9CA6A9391158E575DABEC',
keyid='A4DBDD1BA55FE3CE',
length=2048,
pubkey_algo=1,
revoked=0,
secret=1,
timestamp=1525006759)],
uids=[
UID(address=None,
comment='',
email='',
invalid=0,
last_update=0,
name='xmpp:philw@jabber.at',
origin=0,
revoked=0,
signatures=[],
tofu=[],
uid='xmpp:philw@jabber.at',
validity=5)])
'''
for key in self.context.keylist(secret=True):
break
return key.fpr, key.fpr[-16:]
def get_keys(self, secret=False):
keys = []
for key in self.context.keylist():
for uid in key.uids:
if uid.uid.startswith('xmpp:'):
keys.append((key, uid.uid[5:]))
break
return keys
def export_public_key(self):
# print(dir(self.context))
result = self.context.key_export_minimal()
print(result)
def encrypt_decrypt_files(self):
c = gpg.Context()
recipient = c.get_key("fingerprint of recipient's key")
# Encrypt
with open('foo.txt', 'r') as input_file:
with open('foo.txt.gpg', 'wb') as output_file:
c.encrypt([recipient], 0, input_file, output_file)
# Decrypt
with open('foo.txt.gpg', 'rb') as input_file:
with open('foo2.txt', 'w') as output_file:
c.decrypt(input_file, output_file)
def encrypt(self):
c = gpg.Context()
recipient = c.get_key("fingerprint of recipient's key")
plaintext_string = u'plain text data'
plaintext_bytes = io.BytesIO(plaintext_string.encode('utf8'))
encrypted_bytes = io.BytesIO()
c.encrypt([recipient], 0, plaintext_bytes, encrypted_bytes)
def decrypt(self):
c = gpg.Context()
decrypted_bytes = io.BytesIO()
c.decrypt(encrypted_bytes, decrypted_bytes)
decrypted_string = decrypted_bytes.getvalue().decode('utf8')
# Copyright (C) 2018 Philipp Hörist <philipp AT hoerist.com>
#
# This file is part of Gajim.
#
# Gajim is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 3 only.
#
# Gajim is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Gajim. If not, see <http://www.gnu.org/licenses/>.
# XEP-0373: OpenPGP for XMPP
import os
import logging
from collections import namedtuple
import gnupg
from gajim.common import app
from openpgp.modules.util import DecryptionFailed
log = logging.getLogger('gajim.plugin_system.openpgp.pygnupg')
# gnupg.logger = log
KeyringItem = namedtuple('KeyringItem', 'jid keyid fingerprint')
class PGPContext(gnupg.GPG):
def __init__(self, jid, gnupghome):
gnupg.GPG.__init__(
self, gpgbinary=app.get_gpg_binary(), gnupghome=str(gnupghome))
self._passphrase = 'gajimopenpgppassphrase'
self._jid = jid
self._own_fingerprint = None
def _get_key_params(self, jid, passphrase):
'''
Generate --gen-key input
'''
params = {
'Key-Type': 'RSA',
'Key-Length': 2048,
'Name-Real': 'xmpp:%s' % jid,
'Passphrase': passphrase,
}
out = "Key-Type: %s\n" % params.pop('Key-Type')
for key, val in list(params.items()):
out += "%s: %s\n" % (key, val)
out += "%commit\n"
return out
def generate_key(self):
super().gen_key(self._get_key_params(self._jid, self._passphrase))
def encrypt(self, payload, keys):
recipients = [key.fingerprint for key in keys]
log.info('encrypt to:')
for fingerprint in recipients:
log.info(fingerprint)
result = super().encrypt(str(payload).encode('utf8'),
recipients,
sign=self._own_fingerprint,
always_trust=True,
passphrase=self._passphrase)
if result.ok:
error = ''
else:
error = result.status
return str(result), error
def decrypt(self, payload):
result = super().decrypt(payload,
always_trust=True,
passphrase=self._passphrase)
if not result.ok:
raise DecryptionFailed(result.status)
return result.data.decode('utf8')
def get_key(self, fingerprint):
return super().list_keys(keys=[fingerprint])
def get_keys(self, secret=False):
result = super().list_keys(secret=secret)
keys = []
for key in result:
item = self._make_keyring_item(key)
if item is None:
continue
keys.append(self._make_keyring_item(key))
return keys
@staticmethod
def _make_keyring_item(key):
userid = key['uids'][0]
if not userid.startswith('xmpp:'):
log.warning('Incorrect userid: %s found for key, '
'key will be ignored', userid)
return
jid = userid[5:]
return KeyringItem(jid, key['keyid'], key['fingerprint'])
def import_key(self, data, jid):
log.info('Import key from %s', jid)
result = super().import_keys(data)
if not result:
log.error('Could not import key')
log.error(result.results[0])
return
if not self.validate_key(data, jid):
return None
key = self.get_key(result.results[0]['fingerprint'])
return self._make_keyring_item(key[0])
def validate_key(self, public_key, jid):
import tempfile
temppath = os.path.join(tempfile.gettempdir(), 'temp_pubkey')
with open(temppath, 'wb') as tempfile:
tempfile.write(public_key)
result = self.scan_keys(temppath)
if result:
for uid in result.uids:
if uid.startswith('xmpp:'):
if uid[5:] == jid:
key_found = True
else:
log.warning('Found wrong userid in key: %s != %s',
uid[5:], jid)
log.debug(result)
os.remove(temppath)
return False
if not key_found:
log.warning('No valid userid found in key')
log.debug(result)
os.remove(temppath)
return False
log.info('Key validation succesful')
os.remove(temppath)
return True
log.warning('Invalid key data: %s')
log.debug(result)
os.remove(temppath)
return False
def get_own_key_details(self):
result = super().list_keys(secret=True)
if not result:
return None, None
if len(result) > 1:
log.error('More than one secret key found')
return None, None
self._own_fingerprint = result[0]['fingerprint']
return self._own_fingerprint, int(result[0]['date'])
def export_key(self, fingerprint):
key = super().export_keys(
fingerprint, secret=False, armor=False, minimal=False,
passphrase=self._passphrase)
return key
def delete_key(self, fingerprint):
log.info('Delete Key: %s', fingerprint)
super().delete_keys(fingerprint, passphrase=self._passphrase)
# Copyright (C) 2018 Philipp Hörist <philipp AT hoerist.com>
#
# This file is part of Gajim.
#
# Gajim is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 3 only.
#
# Gajim is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Gajim. If not, see <http://www.gnu.org/licenses/>.
# XEP-0373: OpenPGP for XMPP
import sqlite3
import logging
from collections import namedtuple
log = logging.getLogger('gajim.plugin_system.openpgp.sql')
TABLE_LAYOUT = '''
CREATE TABLE contacts (
jid TEXT,
fingerprint TEXT,
active BOOLEAN,
trust INTEGER,
timestamp INTEGER,
comment TEXT
);
CREATE UNIQUE INDEX jid_fingerprint ON contacts (jid, fingerprint);'''
class Storage:
def __init__(self, folder_path):
self._con = sqlite3.connect(folder_path / 'contacts.db',
detect_types=sqlite3.PARSE_DECLTYPES)
self._con.row_factory = self._namedtuple_factory
self._create_database()
self._migrate_database()
self._con.execute("PRAGMA synchronous=FULL;")
self._con.commit()
@staticmethod
def _namedtuple_factory(cursor, row):
fields = [col[0] for col in cursor.description]
Row = namedtuple("Row", fields)
named_row = Row(*row)
return named_row
def _user_version(self):
return self._con.execute('PRAGMA user_version').fetchone()[0]
def _create_database(self):
if not self._user_version():
log.info('Create contacts.db')
self._execute_query(TABLE_LAYOUT)
def _execute_query(self, query):
transaction = """
BEGIN TRANSACTION;
%s
PRAGMA user_version=1;
END TRANSACTION;
""" % (query)
self._con.executescript(transaction)
def _migrate_database(self):
pass
def load_contacts(self):
sql = 'SELECT * from contacts'
rows = self._con.execute(sql).fetchall()
if rows is not None:
return rows
def save_contact(self, db_values):
sql = '''REPLACE INTO
contacts(jid, fingerprint, active, trust, timestamp, comment)
VALUES(?, ?, ?, ?, ?, ?)'''
for values in db_values:
log.info('Store key: %s', values)
self._con.execute(sql, values)
self._con.commit()
def set_trust(self, jid, fingerprint, trust):
sql = 'UPDATE contacts SET trust = ? WHERE jid = ? AND fingerprint = ?'
log.info('Set Trust: %s %s %s', trust, jid, fingerprint)
self._con.execute(sql, (trust, jid, fingerprint))
self._con.commit()
def delete_key(self, jid, fingerprint):
sql = 'DELETE from contacts WHERE jid = ? AND fingerprint = ?'
log.info('Delete Key: %s %s', jid, fingerprint)
self._con.execute(sql, (jid, fingerprint))
self._con.commit()
def cleanup(self):
self._con.close()
# Copyright (C) 2018 Philipp Hörist <philipp AT hoerist.com>
#
# This file is part of Gajim.
#
# Gajim is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 3 only.
#
# Gajim is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Gajim. If not, see <http://www.gnu.org/licenses/>.
# XEP-0373: OpenPGP for XMPP
import logging
import time
from gi.repository import Gtk
from gajim.common import app
from gajim.common.const import DialogButton, ButtonAction
from gajim.gtk import NewConfirmationDialog
from openpgp.modules.util import Trust
log = logging.getLogger('gajim.plugin_system.openpgp.keydialog')
TRUST_DATA = {
Trust.NOT_TRUSTED: ('dialog-error-symbolic',
_('Not Trusted'),
'error-color'),
Trust.UNKNOWN: ('security-low-symbolic',
_('Not Decided'),
'warning-color'),
Trust.BLIND: ('security-medium-symbolic',
_('Blind Trust'),
'openpgp-dark-success-color'),
Trust.VERIFIED: ('security-high-symbolic',
_('Verified'),
'success-color')
}
class KeyDialog(Gtk.Dialog):
def __init__(self, account, jid, transient):
flags = Gtk.DialogFlags.DESTROY_WITH_PARENT
super().__init__(_('Public Keys for %s') % jid, None, flags)
self.set_transient_for(transient)
self.set_resizable(True)
self.set_default_size(500, 300)
self.get_style_context().add_class('openpgp-key-dialog')
self.con = app.connections[account]
self._listbox = Gtk.ListBox()
self._listbox.set_selection_mode(Gtk.SelectionMode.NONE)
self._scrolled = Gtk.ScrolledWindow()
self._scrolled.set_policy(Gtk.PolicyType.NEVER,
Gtk.PolicyType.AUTOMATIC)
self._scrolled.add(self._listbox)
box = self.get_content_area()
box.pack_start(self._scrolled, True, True, 0)
keys = self.con.get_module('OpenPGP').get_keys(jid, only_trusted=False)
for key in keys:
log.info('Load: %s', key.fingerprint)
self._listbox.add(KeyRow(key))
self.show_all()
class KeyRow(Gtk.ListBoxRow):
def __init__(self, key):
Gtk.ListBoxRow.__init__(self)
self.set_activatable(False)
self._dialog = self.get_toplevel()
self.key = key
box = Gtk.Box()
box.set_spacing(12)
self._trust_button = TrustButton(self)
box.add(self._trust_button)
label_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
fingerprint = Gtk.Label(
label=self._format_fingerprint(key.fingerprint))
fingerprint.get_style_context().add_class('openpgp-mono')
if not key.active:
fingerprint.get_style_context().add_class('openpgp-inactive-color')
fingerprint.set_selectable(True)
fingerprint.set_halign(Gtk.Align.START)
fingerprint.set_valign(Gtk.Align.START)
fingerprint.set_hexpand(True)
label_box.add(fingerprint)
date = Gtk.Label(label=self._format_timestamp(key.timestamp))
date.set_halign(Gtk.Align.START)
date.get_style_context().add_class('openpgp-mono')
if not key.active:
date.get_style_context().add_class('openpgp-inactive-color')
label_box.add(date)
box.add(label_box)
self.add(box)
self.show_all()
def delete_fingerprint(self, *args):
def _remove():
self.get_parent().remove(self)
self.key.delete()
self.destroy()
buttons = {
Gtk.ResponseType.CANCEL: DialogButton('Cancel'),
Gtk.ResponseType.OK: DialogButton('Delete',
_remove,
ButtonAction.DESTRUCTIVE),
}
NewConfirmationDialog(
_('Delete Public Key'),
_('This will permanently delete this public key'),
buttons,
transient_for=self.get_toplevel())
def set_trust(self, trust):
icon_name, tooltip, css_class = TRUST_DATA[trust]
image = self._trust_button.get_child()
image.set_from_icon_name(icon_name, Gtk.IconSize.MENU)
image.get_style_context().add_class(css_class)
@staticmethod
def _format_fingerprint(fingerprint):
fplen = len(fingerprint)
wordsize = fplen // 8
buf = ''
for w in range(0, fplen, wordsize):
buf += '{0} '.format(fingerprint[w:w + wordsize])
return buf.rstrip()
@staticmethod
def _format_timestamp(timestamp):
return time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(timestamp))
class TrustButton(Gtk.MenuButton):
def __init__(self, row):
Gtk.MenuButton.__init__(self)
self._row = row
self._css_class = ''
self.set_popover(TrustPopver(row))
self.update()
def update(self):
icon_name, tooltip, css_class = TRUST_DATA[self._row.key.trust]
image = self.get_child()
image.set_from_icon_name(icon_name, Gtk.IconSize.MENU)
# remove old color from icon
image.get_style_context().remove_class(self._css_class)
if not self._row.key.active:
css_class = 'openpgp-inactive-color'
tooltip = '%s - %s' % (_('Inactive'), tooltip)
image.get_style_context().add_class(css_class)
self._css_class = css_class
self.set_tooltip_text(tooltip)
class TrustPopver(Gtk.Popover):
def __init__(self, row):
Gtk.Popover.__init__(self)
self._row = row
self._listbox = Gtk.ListBox()
self._listbox.set_selection_mode(Gtk.SelectionMode.NONE)
if row.key.trust != Trust.VERIFIED:
self._listbox.add(VerifiedOption())
if row.key.trust != Trust.NOT_TRUSTED:
self._listbox.add(NotTrustedOption())
self._listbox.add(DeleteOption())
self.add(self._listbox)
self._listbox.show_all()
self._listbox.connect('row-activated', self._activated)
self.get_style_context().add_class('openpgp-trust-popover')
def _activated(self, listbox, row):
self.popdown()
if row.type_ is None:
self._row.delete_fingerprint()
else:
self._row.key.trust = row.type_
self.get_relative_to().update()
self.update()
def update(self):
self._listbox.foreach(lambda row: self._listbox.remove(row))
if self._row.key.trust != Trust.VERIFIED:
self._listbox.add(VerifiedOption())
if self._row.key.trust != Trust.NOT_TRUSTED:
self._listbox.add(NotTrustedOption())
self._listbox.add(DeleteOption())
class MenuOption(Gtk.ListBoxRow):
def __init__(self):
Gtk.ListBoxRow.__init__(self)
box = Gtk.Box()
box.set_spacing(6)
image = Gtk.Image.new_from_icon_name(self.icon,
Gtk.IconSize.MENU)
label = Gtk.Label(label=self.label)
image.get_style_context().add_class(self.color)
box.add(image)
box.add(label)
self.add(box)
self.show_all()