Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
gajim
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
eta
gajim
Commits
03b06bd7
Commit
03b06bd7
authored
11 years ago
by
Yann Leboulanger
Browse files
Options
Downloads
Patches
Plain Diff
update gnupg to 0.3.4
parent
61b1d4f5
No related branches found
Branches containing commit
No related tags found
Tags containing commit
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
src/common/gnupg.py
+153
-56
153 additions, 56 deletions
src/common/gnupg.py
with
153 additions
and
56 deletions
src/common/gnupg.py
+
153
−
56
View file @
03b06bd7
...
...
@@ -27,15 +27,15 @@ Vinay Sajip to make use of the subprocess module (Steve's version uses os.fork()
and so does not work on Windows). Renamed to gnupg.py to avoid confusion with
the previous versions.
Modifications Copyright (C) 2008-201
2
Vinay Sajip. All rights reserved.
Modifications Copyright (C) 2008-201
3
Vinay Sajip. All rights reserved.
A unittest harness (test_gnupg.py) has also been added.
"""
import
locale
__version__
=
"
0.3.
0
"
__version__
=
"
0.3.
4
"
__author__
=
"
Vinay Sajip
"
__date__
=
"
$
12-May-2012 10:49:10
$
"
__date__
=
"
$
05-Jun-2013 09:48:54
$
"
try
:
from
io
import
StringIO
...
...
@@ -46,6 +46,7 @@ import codecs
import
locale
import
logging
import
os
import
re
import
socket
from
subprocess
import
Popen
from
subprocess
import
PIPE
...
...
@@ -61,8 +62,12 @@ except ImportError:
try
:
unicode
_py3k
=
False
string_types
=
basestring
text_type
=
unicode
except
NameError
:
_py3k
=
True
string_types
=
str
text_type
=
str
logger
=
logging
.
getLogger
(
'
gajim.c.gnupg
'
)
if
not
logger
.
handlers
:
...
...
@@ -77,7 +82,7 @@ def _copy_data(instream, outstream):
enc
=
'
ascii
'
while
True
:
data
=
instream
.
read
(
1024
)
if
len
(
data
)
==
0
:
if
not
data
:
break
sent
+=
len
(
data
)
logger
.
debug
(
"
sending chunk (%d): %r
"
,
sent
,
data
[:
256
])
...
...
@@ -110,16 +115,16 @@ def _write_passphrase(stream, passphrase, encoding):
logger
.
debug
(
"
Wrote passphrase: %r
"
,
passphrase
)
def
_is_sequence
(
instance
):
return
isinstance
(
instance
,
list
)
or
isinstance
(
instance
,
tuple
)
return
isinstance
(
instance
,
(
list
,
tuple
,
set
,
frozenset
)
)
def
_make_binary_stream
(
s
,
encoding
):
if
_py3k
:
if
isinstance
(
s
,
str
):
s
=
s
.
encode
(
encoding
)
else
:
if
type
(
s
)
is
not
str
:
s
=
s
.
encode
(
encoding
)
try
:
if
_py3k
:
if
isinstance
(
s
,
str
):
s
=
s
.
encode
(
encoding
)
else
:
if
type
(
s
)
is
not
str
:
s
=
s
.
encode
(
encoding
)
from
io
import
BytesIO
rv
=
BytesIO
(
s
)
except
ImportError
:
...
...
@@ -129,12 +134,33 @@ def _make_binary_stream(s, encoding):
class
Verify
(
object
):
"
Handle status messages for --verify
"
TRUST_UNDEFINED
=
0
TRUST_NEVER
=
1
TRUST_MARGINAL
=
2
TRUST_FULLY
=
3
TRUST_ULTIMATE
=
4
TRUST_LEVELS
=
{
"
TRUST_UNDEFINED
"
:
TRUST_UNDEFINED
,
"
TRUST_NEVER
"
:
TRUST_NEVER
,
"
TRUST_MARGINAL
"
:
TRUST_MARGINAL
,
"
TRUST_FULLY
"
:
TRUST_FULLY
,
"
TRUST_ULTIMATE
"
:
TRUST_ULTIMATE
,
}
def
__init__
(
self
,
gpg
):
self
.
gpg
=
gpg
self
.
valid
=
False
self
.
fingerprint
=
self
.
creation_date
=
self
.
timestamp
=
None
self
.
signature_id
=
self
.
key_id
=
None
self
.
username
=
None
self
.
key_status
=
None
self
.
status
=
None
self
.
pubkey_fingerprint
=
None
self
.
expire_timestamp
=
None
self
.
sig_timestamp
=
None
self
.
trust_text
=
None
self
.
trust_level
=
None
def
__nonzero__
(
self
):
return
self
.
valid
...
...
@@ -142,15 +168,30 @@ class Verify(object):
__bool__
=
__nonzero__
def
handle_status
(
self
,
key
,
value
):
if
key
in
(
"
TRUST_UNDEFINED
"
,
"
TRUST_NEVER
"
,
"
TRUST_MARGINAL
"
,
"
TRUST_FULLY
"
,
"
TRUST_ULTIMATE
"
,
"
RSA_OR_IDEA
"
,
"
NODATA
"
,
"
IMPORT_RES
"
,
"
PLAINTEXT
"
,
"
PLAINTEXT_LENGTH
"
,
"
POLICY_URL
"
,
"
DECRYPTION_INFO
"
,
"
DECRYPTION_OKAY
"
):
if
key
in
self
.
TRUST_LEVELS
:
self
.
trust_text
=
key
self
.
trust_level
=
self
.
TRUST_LEVELS
[
key
]
elif
key
in
(
"
RSA_OR_IDEA
"
,
"
NODATA
"
,
"
IMPORT_RES
"
,
"
PLAINTEXT
"
,
"
PLAINTEXT_LENGTH
"
,
"
POLICY_URL
"
,
"
DECRYPTION_INFO
"
,
"
DECRYPTION_OKAY
"
,
"
INV_SGNR
"
,
"
FILE_START
"
,
"
FILE_ERROR
"
,
"
FILE_DONE
"
,
"
PKA_TRUST_GOOD
"
,
"
PKA_TRUST_BAD
"
,
"
BADMDC
"
,
"
GOODMDC
"
,
"
NO_SGNR
"
):
pass
elif
key
==
"
BADSIG
"
:
self
.
valid
=
False
self
.
status
=
'
signature bad
'
self
.
key_id
,
self
.
username
=
value
.
split
(
None
,
1
)
elif
key
==
"
ERRSIG
"
:
self
.
valid
=
False
(
self
.
key_id
,
algo
,
hash_algo
,
cls
,
self
.
timestamp
)
=
value
.
split
()[:
5
]
self
.
status
=
'
signature error
'
elif
key
==
"
EXPSIG
"
:
self
.
valid
=
False
self
.
status
=
'
signature expired
'
self
.
key_id
,
self
.
username
=
value
.
split
(
None
,
1
)
elif
key
==
"
GOODSIG
"
:
self
.
valid
=
True
self
.
status
=
'
signature good
'
...
...
@@ -166,13 +207,6 @@ class Verify(object):
elif
key
==
"
SIG_ID
"
:
(
self
.
signature_id
,
self
.
creation_date
,
self
.
timestamp
)
=
value
.
split
()
elif
key
==
"
ERRSIG
"
:
self
.
valid
=
False
(
self
.
key_id
,
algo
,
hash_algo
,
cls
,
self
.
timestamp
)
=
value
.
split
()[:
5
]
self
.
status
=
'
signature error
'
elif
key
==
"
DECRYPTION_FAILED
"
:
self
.
valid
=
False
self
.
key_id
=
value
...
...
@@ -181,17 +215,21 @@ class Verify(object):
self
.
valid
=
False
self
.
key_id
=
value
self
.
status
=
'
no public key
'
elif
key
in
(
"
KEYEXPIRED
"
,
"
SIGEXPIRED
"
):
elif
key
in
(
"
KEYEXPIRED
"
,
"
SIGEXPIRED
"
,
"
KEYREVOKED
"
):
# these are useless in verify, since they are spit out for any
# pub/subkeys on the key, not just the one doing the signing.
# if we want to check for signatures with expired key,
# the relevant flag is EXPKEYSIG.
# the relevant flag is EXPKEYSIG
or REVKEYSIG
.
pass
elif
key
in
(
"
EXPKEYSIG
"
,
"
REVKEYSIG
"
):
# signed with expired or revoked key
self
.
valid
=
False
self
.
key_id
=
value
.
split
()[
0
]
self
.
status
=
((
'
%s %s
'
)
%
(
key
[:
3
],
key
[
3
:])).
lower
()
if
key
==
"
EXPKEYSIG
"
:
self
.
key_status
=
'
signing key has expired
'
else
:
self
.
key_status
=
'
signing key was revoked
'
self
.
status
=
self
.
key_status
else
:
raise
ValueError
(
"
Unknown status message: %r
"
%
key
)
...
...
@@ -273,11 +311,21 @@ class ImportResult(object):
def
summary
(
self
):
l
=
[]
l
.
append
(
'
%d imported
'
%
self
.
imported
)
l
.
append
(
'
%d imported
'
%
self
.
imported
)
if
self
.
not_imported
:
l
.
append
(
'
%d not imported
'
%
self
.
not_imported
)
l
.
append
(
'
%d not imported
'
%
self
.
not_imported
)
return
'
,
'
.
join
(
l
)
ESCAPE_PATTERN
=
re
.
compile
(
r
'
\\x([0-9a-f][0-9a-f])
'
,
re
.
I
)
BASIC_ESCAPES
=
{
r
'
\n
'
:
'
\n
'
,
r
'
\r
'
:
'
\r
'
,
r
'
\f
'
:
'
\f
'
,
r
'
\v
'
:
'
\v
'
,
r
'
\b
'
:
'
\b
'
,
r
'
\0
'
:
'
\0
'
,
}
class
ListKeys
(
list
):
'''
Handle status messages for --list-keys.
...
...
@@ -322,8 +370,12 @@ class ListKeys(list):
self
.
fingerprints
.
append
(
args
[
9
])
def
uid
(
self
,
args
):
self
.
curkey
[
'
uids
'
].
append
(
args
[
9
])
self
.
uids
.
append
(
args
[
9
])
uid
=
args
[
9
]
uid
=
ESCAPE_PATTERN
.
sub
(
lambda
m
:
chr
(
int
(
m
.
group
(
1
),
16
)),
uid
)
for
k
,
v
in
BASIC_ESCAPES
.
items
():
uid
=
uid
.
replace
(
k
,
v
)
self
.
curkey
[
'
uids
'
].
append
(
uid
)
self
.
uids
.
append
(
uid
)
def
sub
(
self
,
args
):
subkey
=
[
args
[
4
],
args
[
11
]]
...
...
@@ -352,7 +404,7 @@ class Crypt(Verify):
def
handle_status
(
self
,
key
,
value
):
if
key
in
(
"
ENC_TO
"
,
"
USERID_HINT
"
,
"
GOODMDC
"
,
"
END_DECRYPTION
"
,
"
BEGIN_SIGNING
"
,
"
NO_SECKEY
"
,
"
ERROR
"
,
"
NODATA
"
,
"
CARDCTRL
"
):
"
CARDCTRL
"
,
"
BADMDC
"
,
"
SC_OP_FAILURE
"
,
"
SC_OP_SUCCESS
"
):
# in the case of ERROR, this is because a more specific error
# message will have come first
pass
...
...
@@ -400,7 +452,7 @@ class GenKey(object):
return
self
.
fingerprint
or
''
def
handle_status
(
self
,
key
,
value
):
if
key
in
(
"
PROGRESS
"
,
"
GOOD_PASSPHRASE
"
,
"
NODATA
"
):
if
key
in
(
"
PROGRESS
"
,
"
GOOD_PASSPHRASE
"
,
"
NODATA
"
,
"
KEY_NOT_CREATED
"
):
pass
elif
key
==
"
KEY_CREATED
"
:
(
self
.
type
,
self
.
fingerprint
)
=
value
.
split
()
...
...
@@ -419,7 +471,7 @@ class DeleteResult(object):
problem_reason
=
{
'
1
'
:
'
No such key
'
,
'
2
'
:
'
Must delete secret key first
'
,
'
3
'
:
'
Ambig
i
ous specification
'
,
'
3
'
:
'
Ambig
u
ous specification
'
,
}
def
handle_status
(
self
,
key
,
value
):
...
...
@@ -429,13 +481,19 @@ class DeleteResult(object):
else
:
raise
ValueError
(
"
Unknown status message: %r
"
%
key
)
def
__nonzero__
(
self
):
return
self
.
status
==
'
ok
'
__bool__
=
__nonzero__
class
Sign
(
object
):
"
Handle status messages for --sign
"
def
__init__
(
self
,
gpg
):
self
.
gpg
=
gpg
self
.
type
=
None
self
.
hash_algo
=
None
self
.
fingerprint
=
None
self
.
status
=
''
def
__nonzero__
(
self
):
return
self
.
fingerprint
is
not
None
...
...
@@ -447,18 +505,19 @@ class Sign(object):
def
handle_status
(
self
,
key
,
value
):
if
key
in
(
"
USERID_HINT
"
,
"
NEED_PASSPHRASE
"
,
"
BAD_PASSPHRASE
"
,
"
GOOD_PASSPHRASE
"
,
"
BEGIN_SIGNING
"
,
"
CARDCTRL
"
,
"
MISSING_PASSPHRASE
"
):
"
GOOD_PASSPHRASE
"
,
"
BEGIN_SIGNING
"
,
"
CARDCTRL
"
,
"
INV_SGNR
"
,
"
KEYEXPIRED
"
,
"
SIGEXPIRED
"
,
"
KEYREVOKED
"
,
"
NO_SGNR
"
,
"
MISSING_PASSPHRASE
"
,
"
SC_OP_FAILURE
"
,
"
SC_OP_SUCCESS
"
):
pass
elif
key
in
(
"
KEYEXPIRED
"
,
"
SIGEXPIRED
"
):
self
.
status
=
'
key expired
'
elif
key
==
"
SIG_CREATED
"
:
(
self
.
type
,
algo
,
hashalgo
,
cls
,
algo
,
self
.
hash
_
algo
,
cls
,
self
.
timestamp
,
self
.
fingerprint
)
=
value
.
split
()
else
:
raise
ValueError
(
"
Unknown status message: %r
"
%
key
)
VERSION_RE
=
re
.
compile
(
r
'
gpg \(GnuPG\) (\d+(\.\d+)*)
'
.
encode
(
'
utf-8
'
),
re
.
I
)
class
GPG
(
object
):
...
...
@@ -476,21 +535,39 @@ class GPG(object):
"
Encapsulate access to the gpg executable
"
def
__init__
(
self
,
gpgbinary
=
'
gpg
'
,
gnupghome
=
None
,
verbose
=
False
,
use_agent
=
False
,
keyring
=
None
):
use_agent
=
False
,
keyring
=
None
,
options
=
None
,
secret_keyring
=
None
):
"""
Initialize a GPG process wrapper. Options are:
gpgbinary -- full pathname for GPG binary.
gnupghome -- full pathname to where we can find the public and
private keyrings. Default is whatever gpg defaults to.
keyring -- name of alternative keyring file to use. If specified,
the default keyring is not used.
keyring -- name of alternative keyring file to use, or list of such
keyrings. If specified, the default keyring is not used.
options =-- a list of additional options to pass to the GPG binary.
secret_keyring -- name of alternative secret keyring file to use, or
list of such keyrings.
"""
self
.
gpgbinary
=
gpgbinary
self
.
gnupghome
=
gnupghome
if
keyring
:
# Allow passing a string or another iterable. Make it uniformly
# a list of keyring filenames
if
isinstance
(
keyring
,
string_types
):
keyring
=
[
keyring
]
self
.
keyring
=
keyring
if
secret_keyring
:
# Allow passing a string or another iterable. Make it uniformly
# a list of keyring filenames
if
isinstance
(
secret_keyring
,
string_types
):
secret_keyring
=
[
secret_keyring
]
self
.
secret_keyring
=
secret_keyring
self
.
verbose
=
verbose
self
.
use_agent
=
use_agent
if
isinstance
(
options
,
str
):
options
=
[
options
]
self
.
options
=
options
self
.
encoding
=
locale
.
getpreferredencoding
()
if
self
.
encoding
is
None
:
# This happens on Jython!
self
.
encoding
=
sys
.
stdin
.
encoding
...
...
@@ -502,21 +579,42 @@ class GPG(object):
if
p
.
returncode
!=
0
:
raise
ValueError
(
"
Error invoking gpg: %s: %s
"
%
(
p
.
returncode
,
result
.
stderr
))
m
=
VERSION_RE
.
match
(
result
.
data
)
if
not
m
:
self
.
version
=
None
else
:
dot
=
'
.
'
.
encode
(
'
utf-8
'
)
self
.
version
=
tuple
([
int
(
s
)
for
s
in
m
.
groups
()[
0
].
split
(
dot
)])
def
_open_subprocess
(
self
,
args
,
passphrase
=
False
):
# Internal method: open a pipe to a GPG subprocess and return
# the file objects for communicating with it.
def
make_args
(
self
,
args
,
passphrase
):
"""
Make a list of command line elements for GPG. The value of ``args``
will be appended. The ``passphrase`` argument needs to be True if
a passphrase will be sent to GPG, else False.
"""
cmd
=
[
self
.
gpgbinary
,
'
--status-fd 2 --no-tty
'
]
if
self
.
gnupghome
:
cmd
.
append
(
'
--homedir
"
%s
"
'
%
self
.
gnupghome
)
cmd
.
append
(
'
--homedir
"
%s
"'
%
self
.
gnupghome
)
if
self
.
keyring
:
cmd
.
append
(
'
--no-default-keyring --keyring
"
%s
"
'
%
self
.
keyring
)
cmd
.
append
(
'
--no-default-keyring
'
)
for
fn
in
self
.
keyring
:
cmd
.
append
(
'
--keyring
"
%s
"'
%
fn
)
if
self
.
secret_keyring
:
for
fn
in
self
.
secret_keyring
:
cmd
.
append
(
'
--secret-keyring
"
%s
"'
%
fn
)
if
passphrase
:
cmd
.
append
(
'
--batch --passphrase-fd 0
'
)
if
self
.
use_agent
:
cmd
.
append
(
'
--use-agent
'
)
if
self
.
options
:
cmd
.
extend
(
self
.
options
)
cmd
.
extend
(
args
)
cmd
=
'
'
.
join
(
cmd
)
return
cmd
def
_open_subprocess
(
self
,
args
,
passphrase
=
False
):
# Internal method: open a pipe to a GPG subprocess and return
# the file objects for communicating with it.
cmd
=
'
'
.
join
(
self
.
make_args
(
args
,
passphrase
))
if
self
.
verbose
:
print
(
cmd
)
logger
.
debug
(
"
%s
"
,
cmd
)
...
...
@@ -597,7 +695,7 @@ class GPG(object):
stderr
.
close
()
stdout
.
close
()
def
_handle_io
(
self
,
args
,
file
,
result
,
passphrase
=
None
,
binary
=
False
):
def
_handle_io
(
self
,
args
,
file
obj
,
result
,
passphrase
=
None
,
binary
=
False
):
"
Handle a call to GPG - pass input data, collect output data
"
# Handle a basic data call - pass data to GPG, handle the output
# including status information. Garbage In, Garbage Out :)
...
...
@@ -608,7 +706,7 @@ class GPG(object):
stdin
=
p
.
stdin
if
passphrase
:
_write_passphrase
(
stdin
,
passphrase
,
self
.
encoding
)
writer
=
_threaded_copy_data
(
file
,
stdin
)
writer
=
_threaded_copy_data
(
file
obj
,
stdin
)
self
.
_collect_output
(
p
,
result
,
writer
,
stdin
)
return
result
...
...
@@ -758,10 +856,14 @@ class GPG(object):
def
recv_keys
(
self
,
keyserver
,
*
keyids
):
"""
Import a key from a keyserver
The doctest assertion is skipped in Jython because security permissions
may prevent the recv_keys from succeeding.
>>>
import
shutil
>>>
shutil
.
rmtree
(
"
keys
"
)
>>>
gpg
=
GPG
(
gnupghome
=
"
keys
"
)
>>>
result
=
gpg
.
recv_keys
(
'
pgp.mit.edu
'
,
'
3FF0DB166A7476EA
'
)
>>>
os
.
chmod
(
'
keys
'
,
0x1C0
)
>>>
result
=
gpg
.
recv_keys
(
'
keyserver.ubuntu.com
'
,
'
92905378
'
)
>>>
assert
result
"""
...
...
@@ -878,7 +980,8 @@ class GPG(object):
parms
=
{}
for
key
,
val
in
list
(
kwargs
.
items
()):
key
=
key
.
replace
(
'
_
'
,
'
-
'
).
title
()
parms
[
key
]
=
val
if
str
(
val
).
strip
():
# skip empty strings
parms
[
key
]
=
val
parms
.
setdefault
(
'
Key-Type
'
,
'
RSA
'
)
parms
.
setdefault
(
'
Key-Length
'
,
1024
)
parms
.
setdefault
(
'
Name-Real
'
,
"
Autogenerated Key
"
)
...
...
@@ -971,9 +1074,6 @@ class GPG(object):
'
hello
'
>>>
result
=
gpg
.
encrypt
(
"
hello again
"
,
print1
)
>>>
message
=
str
(
result
)
>>>
result
=
gpg
.
decrypt
(
message
)
>>>
result
.
status
==
'
need passphrase
'
True
>>>
result
=
gpg
.
decrypt
(
message
,
passphrase
=
'
bar
'
)
>>>
result
.
status
in
(
'
decryption failed
'
,
'
bad passphrase
'
)
True
...
...
@@ -983,9 +1083,6 @@ class GPG(object):
True
>>>
str
(
result
)
'
hello again
'
>>>
result
=
gpg
.
encrypt
(
"
signed hello
"
,
print2
,
sign
=
print1
)
>>>
result
.
status
==
'
need passphrase
'
True
>>>
result
=
gpg
.
encrypt
(
"
signed hello
"
,
print2
,
sign
=
print1
,
passphrase
=
'
foo
'
)
>>>
result
.
status
==
'
encryption ok
'
True
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment