Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
gajim
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Erik Huelsmann
gajim
Commits
2236aef0
Commit
2236aef0
authored
10 years ago
by
Yann Leboulanger
Browse files
Options
Downloads
Patches
Plain Diff
update to latest gnupg.py
parent
51dc297c
No related branches found
No related tags found
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
src/common/gnupg.py
+323
-106
323 additions, 106 deletions
src/common/gnupg.py
with
323 additions
and
106 deletions
src/common/gnupg.py
+
323
−
106
View file @
2236aef0
...
...
@@ -31,11 +31,10 @@ Modifications Copyright (C) 2008-2014 Vinay Sajip. All rights reserved.
A unittest harness (test_gnupg.py) has also been added.
"""
import
locale
__version__
=
"
0.3.
4
"
__version__
=
"
0.3.
8.dev0
"
__author__
=
"
Vinay Sajip
"
__date__
=
"
$0
5-Jun
-2014
09:48:54
$
"
__date__
=
"
$0
7-Dec
-2014
18:46:17
$
"
try
:
from
io
import
StringIO
...
...
@@ -53,6 +52,13 @@ from subprocess import PIPE
import
sys
import
threading
STARTUPINFO
=
None
if
os
.
name
==
'
nt
'
:
try
:
from
subprocess
import
STARTUPINFO
,
STARTF_USESHOWWINDOW
,
SW_HIDE
except
ImportError
:
STARTUPINFO
=
None
try
:
import
logging.NullHandler
as
NullHandler
except
ImportError
:
...
...
@@ -73,6 +79,50 @@ logger = logging.getLogger('gajim.c.gnupg')
if
not
logger
.
handlers
:
logger
.
addHandler
(
NullHandler
())
# We use the test below because it works for Jython as well as CPython
if
os
.
path
.
__name__
==
'
ntpath
'
:
# On Windows, we don't need shell quoting, other than worrying about
# paths with spaces in them.
def
shell_quote
(
s
):
return
'"
%s
"'
%
s
else
:
# Section copied from sarge
# This regex determines which shell input needs quoting
# because it may be unsafe
UNSAFE
=
re
.
compile
(
r
'
[^\w%+,./:=@-]
'
)
def
shell_quote
(
s
):
"""
Quote text so that it is safe for Posix command shells.
For example,
"
*.py
"
would be converted to
"'
*.py
'"
. If the text is
considered safe it is returned unquoted.
:param s: The value to quote
:type s: str (or unicode on 2.x)
:return: A safe version of the input, from the point of view of Posix
command shells
:rtype: The passed-in type
"""
if
not
isinstance
(
s
,
string_types
):
raise
TypeError
(
'
Expected string type, got %s
'
%
type
(
s
))
if
not
s
:
result
=
"''"
elif
not
UNSAFE
.
search
(
s
):
result
=
s
else
:
result
=
"'
%s
'"
%
s
.
replace
(
"'"
,
r
"'
\'
'"
)
return
result
# end of sarge code
# Now that we use shell=False, we shouldn't need to quote arguments.
# Use no_quote instead of shell_quote to remind us of where quoting
# was needed.
def
no_quote
(
s
):
return
s
def
_copy_data
(
instream
,
outstream
):
# Copy one stream to another
sent
=
0
...
...
@@ -112,11 +162,19 @@ def _write_passphrase(stream, passphrase, encoding):
passphrase
=
'
%s
\n
'
%
passphrase
passphrase
=
passphrase
.
encode
(
encoding
)
stream
.
write
(
passphrase
)
logger
.
debug
(
"
Wrote passphrase
: %r
"
,
passphrase
)
logger
.
debug
(
'
Wrote passphrase
'
)
def
_is_sequence
(
instance
):
return
isinstance
(
instance
,
(
list
,
tuple
,
set
,
frozenset
))
def
_make_memory_stream
(
s
):
try
:
from
io
import
BytesIO
rv
=
BytesIO
(
s
)
except
ImportError
:
rv
=
StringIO
(
s
)
return
rv
def
_make_binary_stream
(
s
,
encoding
):
if
_py3k
:
if
isinstance
(
s
,
str
):
...
...
@@ -124,12 +182,7 @@ def _make_binary_stream(s, encoding):
else
:
if
type
(
s
)
is
not
str
:
s
=
s
.
encode
(
encoding
)
try
:
from
io
import
BytesIO
rv
=
BytesIO
(
s
)
except
ImportError
:
rv
=
StringIO
(
s
)
return
rv
return
_make_memory_stream
(
s
)
class
Verify
(
object
):
"
Handle status messages for --verify
"
...
...
@@ -175,7 +228,8 @@ class Verify(object):
"
PLAINTEXT_LENGTH
"
,
"
POLICY_URL
"
,
"
DECRYPTION_INFO
"
,
"
DECRYPTION_OKAY
"
,
"
INV_SGNR
"
,
"
FILE_START
"
,
"
FILE_ERROR
"
,
"
FILE_DONE
"
,
"
PKA_TRUST_GOOD
"
,
"
PKA_TRUST_BAD
"
,
"
BADMDC
"
,
"
GOODMDC
"
,
"
NO_SGNR
"
,
"
NOTATION_NAME
"
,
"
NOTATION_DATA
"
):
"
GOODMDC
"
,
"
NO_SGNR
"
,
"
NOTATION_NAME
"
,
"
NOTATION_DATA
"
,
"
PROGRESS
"
):
pass
elif
key
==
"
BADSIG
"
:
self
.
valid
=
False
...
...
@@ -230,6 +284,10 @@ class Verify(object):
else
:
self
.
key_status
=
'
signing key was revoked
'
self
.
status
=
self
.
key_status
elif
key
==
"
UNEXPECTED
"
:
self
.
valid
=
False
self
.
key_id
=
value
self
.
status
=
'
unexpected data
'
else
:
raise
ValueError
(
"
Unknown status message: %r
"
%
key
)
...
...
@@ -298,8 +356,8 @@ class ImportResult(object):
'
problem
'
:
reason
,
'
text
'
:
self
.
problem_reason
[
reason
]})
elif
key
==
"
IMPORT_RES
"
:
import_res
=
value
.
split
()
for
i
in
range
(
len
(
self
.
counts
)
)
:
setattr
(
self
,
self
.
count
s
[
i
]
,
int
(
import_res
[
i
]))
for
i
,
count
in
enumerate
(
self
.
counts
):
setattr
(
self
,
count
,
int
(
import_res
[
i
]))
elif
key
==
"
KEYEXPIRED
"
:
self
.
results
.
append
({
'
fingerprint
'
:
None
,
'
problem
'
:
'
0
'
,
'
text
'
:
'
Key expired
'
})
...
...
@@ -326,7 +384,53 @@ BASIC_ESCAPES = {
r
'
\0
'
:
'
\0
'
,
}
class
ListKeys
(
list
):
class
SendResult
(
object
):
def
__init__
(
self
,
gpg
):
self
.
gpg
=
gpg
def
handle_status
(
self
,
key
,
value
):
logger
.
debug
(
'
SendResult: %s: %s
'
,
key
,
value
)
class
SearchKeys
(
list
):
'''
Handle status messages for --search-keys.
Handle pub and uid (relating the latter to the former).
Don
'
t care about the rest
'''
UID_INDEX
=
1
FIELDS
=
'
type keyid algo length date expires
'
.
split
()
def
__init__
(
self
,
gpg
):
self
.
gpg
=
gpg
self
.
curkey
=
None
self
.
fingerprints
=
[]
self
.
uids
=
[]
def
get_fields
(
self
,
args
):
result
=
{}
for
i
,
var
in
enumerate
(
self
.
FIELDS
):
result
[
var
]
=
args
[
i
]
result
[
'
uids
'
]
=
[]
return
result
def
pub
(
self
,
args
):
self
.
curkey
=
curkey
=
self
.
get_fields
(
args
)
self
.
append
(
curkey
)
def
uid
(
self
,
args
):
uid
=
args
[
self
.
UID_INDEX
]
uid
=
ESCAPE_PATTERN
.
sub
(
lambda
m
:
chr
(
int
(
m
.
group
(
1
),
16
)),
uid
)
for
k
,
v
in
BASIC_ESCAPES
.
items
():
uid
=
uid
.
replace
(
k
,
v
)
self
.
curkey
[
'
uids
'
].
append
(
uid
)
self
.
uids
.
append
(
uid
)
def
handle_status
(
self
,
key
,
value
):
pass
class
ListKeys
(
SearchKeys
):
'''
Handle status messages for --list-keys.
Handle pub and uid (relating the latter to the former).
...
...
@@ -343,25 +447,17 @@ class ListKeys(list):
grp = reserved for gpgsm
rvk = revocation key
'''
def
__init__
(
self
,
gpg
):
self
.
gpg
=
gpg
self
.
curkey
=
None
self
.
fingerprints
=
[]
self
.
uids
=
[]
UID_INDEX
=
9
FIELDS
=
'
type trust length algo keyid date expires dummy ownertrust uid
'
.
split
()
def
key
(
self
,
args
):
vars
=
(
"""
type trust length algo keyid date expires dummy ownertrust uid
"""
).
split
()
self
.
curkey
=
{}
for
i
in
range
(
len
(
vars
)):
self
.
curkey
[
vars
[
i
]]
=
args
[
i
]
self
.
curkey
[
'
uids
'
]
=
[]
if
self
.
curkey
[
'
uid
'
]:
self
.
curkey
[
'
uids
'
].
append
(
self
.
curkey
[
'
uid
'
])
del
self
.
curkey
[
'
uid
'
]
self
.
curkey
[
'
subkeys
'
]
=
[]
self
.
append
(
self
.
curkey
)
self
.
curkey
=
curkey
=
self
.
get_fields
(
args
)
if
curkey
[
'
uid
'
]:
curkey
[
'
uids
'
].
append
(
curkey
[
'
uid
'
])
del
curkey
[
'
uid
'
]
curkey
[
'
subkeys
'
]
=
[]
self
.
append
(
curkey
)
pub
=
sec
=
key
...
...
@@ -369,22 +465,34 @@ class ListKeys(list):
self
.
curkey
[
'
fingerprint
'
]
=
args
[
9
]
self
.
fingerprints
.
append
(
args
[
9
])
def
uid
(
self
,
args
):
uid
=
args
[
9
]
uid
=
ESCAPE_PATTERN
.
sub
(
lambda
m
:
chr
(
int
(
m
.
group
(
1
),
16
)),
uid
)
for
k
,
v
in
BASIC_ESCAPES
.
items
():
uid
=
uid
.
replace
(
k
,
v
)
self
.
curkey
[
'
uids
'
].
append
(
uid
)
self
.
uids
.
append
(
uid
)
def
sub
(
self
,
args
):
subkey
=
[
args
[
4
],
args
[
11
]]
self
.
curkey
[
'
subkeys
'
].
append
(
subkey
)
def
handle_status
(
self
,
key
,
value
):
pass
class
Crypt
(
Verify
):
class
ScanKeys
(
ListKeys
):
'''
Handle status messages for --with-fingerprint.
'''
def
sub
(
self
,
args
):
# --with-fingerprint --with-colons somehow outputs fewer colons,
# use the last value args[-1] instead of args[11]
subkey
=
[
args
[
4
],
args
[
-
1
]]
self
.
curkey
[
'
subkeys
'
].
append
(
subkey
)
class
TextHandler
(
object
):
def
_as_text
(
self
):
return
self
.
data
.
decode
(
self
.
gpg
.
encoding
,
self
.
gpg
.
decode_errors
)
if
_py3k
:
__str__
=
_as_text
else
:
__unicode__
=
_as_text
def
__str__
(
self
):
return
self
.
data
class
Crypt
(
Verify
,
TextHandler
):
"
Handle status messages for --encrypt and --decrypt
"
def
__init__
(
self
,
gpg
):
Verify
.
__init__
(
self
,
gpg
)
...
...
@@ -398,19 +506,16 @@ class Crypt(Verify):
__bool__
=
__nonzero__
def
__str__
(
self
):
return
self
.
data
.
decode
(
self
.
gpg
.
encoding
,
self
.
gpg
.
decode_errors
)
def
handle_status
(
self
,
key
,
value
):
if
key
in
(
"
ENC_TO
"
,
"
USERID_HINT
"
,
"
GOODMDC
"
,
"
END_DECRYPTION
"
,
"
BEGIN_SIGNING
"
,
"
NO_SECKEY
"
,
"
ERROR
"
,
"
NODATA
"
,
"
BEGIN_SIGNING
"
,
"
NO_SECKEY
"
,
"
ERROR
"
,
"
NODATA
"
,
"
PROGRESS
"
,
"
CARDCTRL
"
,
"
BADMDC
"
,
"
SC_OP_FAILURE
"
,
"
SC_OP_SUCCESS
"
):
# in the case of ERROR, this is because a more specific error
# message will have come first
pass
elif
key
in
(
"
NEED_PASSPHRASE
"
,
"
BAD_PASSPHRASE
"
,
"
GOOD_PASSPHRASE
"
,
"
MISSING_PASSPHRASE
"
,
"
DECRYPTION_FAILED
"
,
"
KEY_NOT_CREATED
"
):
"
KEY_NOT_CREATED
"
,
"
NEED_PASSPHRASE_PIN
"
):
self
.
status
=
key
.
replace
(
"
_
"
,
"
"
).
lower
()
elif
key
==
"
NEED_PASSPHRASE_SYM
"
:
self
.
status
=
'
need symmetric passphrase
'
...
...
@@ -487,31 +592,29 @@ class DeleteResult(object):
__bool__
=
__nonzero__
class
Sign
(
object
):
class
Sign
(
TextHandler
):
"
Handle status messages for --sign
"
def
__init__
(
self
,
gpg
):
self
.
gpg
=
gpg
self
.
type
=
None
self
.
hash_algo
=
None
self
.
fingerprint
=
None
self
.
status
=
''
def
__nonzero__
(
self
):
return
self
.
fingerprint
is
not
None
__bool__
=
__nonzero__
def
__str__
(
self
):
return
self
.
data
.
decode
(
self
.
gpg
.
encoding
,
self
.
gpg
.
decode_errors
)
def
handle_status
(
self
,
key
,
value
):
if
key
in
(
"
USERID_HINT
"
,
"
NEED_PASSPHRASE
"
,
"
BAD_PASSPHRASE
"
,
"
GOOD_PASSPHRASE
"
,
"
BEGIN_SIGNING
"
,
"
CARDCTRL
"
,
"
INV_SGNR
"
,
"
KEYREVOKED
"
,
"
NO_SGNR
"
,
"
MISSING_PASSPHRASE
"
,
"
SC_OP_FAILURE
"
,
"
SC_OP_SUCCESS
"
):
"
NO_SGNR
"
,
"
MISSING_PASSPHRASE
"
,
"
NEED_PASSPHRASE_PIN
"
,
"
SC_OP_FAILURE
"
,
"
SC_OP_SUCCESS
"
,
"
PROGRESS
"
):
pass
elif
key
in
(
"
KEYEXPIRED
"
,
"
SIGEXPIRED
"
):
self
.
status
=
'
key expired
'
elif
key
==
"
KEYREVOKED
"
:
self
.
status
=
'
key revoked
'
elif
key
==
"
SIG_CREATED
"
:
(
self
.
type
,
algo
,
self
.
hash_algo
,
cls
,
...
...
@@ -520,7 +623,8 @@ class Sign(object):
else
:
raise
ValueError
(
"
Unknown status message: %r
"
%
key
)
VERSION_RE
=
re
.
compile
(
r
'
gpg \(GnuPG\) (\d+(\.\d+)*)
'
.
encode
(
'
utf-8
'
),
re
.
I
)
VERSION_RE
=
re
.
compile
(
r
'
gpg \(GnuPG\) (\d+(\.\d+)*)
'
.
encode
(
'
ascii
'
),
re
.
I
)
HEX_DIGITS_RE
=
re
.
compile
(
r
'
[0-9a-f]+$
'
,
re
.
I
)
class
GPG
(
object
):
...
...
@@ -531,7 +635,10 @@ class GPG(object):
'
delete
'
:
DeleteResult
,
'
generate
'
:
GenKey
,
'
import
'
:
ImportResult
,
'
send
'
:
SendResult
,
'
list
'
:
ListKeys
,
'
scan
'
:
ScanKeys
,
'
search
'
:
SearchKeys
,
'
sign
'
:
Sign
,
'
verify
'
:
Verify
,
}
...
...
@@ -571,9 +678,11 @@ class GPG(object):
if
isinstance
(
options
,
str
):
options
=
[
options
]
self
.
options
=
options
self
.
encoding
=
locale
.
getpreferredencoding
()
if
self
.
encoding
is
None
:
# This happens on Jython!
self
.
encoding
=
sys
.
stdin
.
encoding
# Changed in 0.3.7 to use Latin-1 encoding rather than
# locale.getpreferredencoding falling back to sys.stdin.encoding
# falling back to utf-8, because gpg itself uses latin-1 as the default
# encoding.
self
.
encoding
=
'
latin-1
'
if
gnupghome
and
not
os
.
path
.
isdir
(
self
.
gnupghome
):
os
.
makedirs
(
self
.
gnupghome
,
0x1C0
)
p
=
self
.
_open_subprocess
([
"
--version
"
])
...
...
@@ -586,7 +695,7 @@ class GPG(object):
if
not
m
:
self
.
version
=
None
else
:
dot
=
'
.
'
.
encode
(
'
utf-8
'
)
dot
=
'
.
'
.
encode
(
'
ascii
'
)
self
.
version
=
tuple
([
int
(
s
)
for
s
in
m
.
groups
()[
0
].
split
(
dot
)])
def
make_args
(
self
,
args
,
passphrase
):
...
...
@@ -595,18 +704,18 @@ class GPG(object):
will be appended. The ``passphrase`` argument needs to be True if
a passphrase will be sent to GPG, else False.
"""
cmd
=
[
self
.
gpgbinary
,
'
--status-fd
2
--no-tty
'
]
cmd
=
[
self
.
gpgbinary
,
'
--status-fd
'
,
'
2
'
,
'
--no-tty
'
]
if
self
.
gnupghome
:
cmd
.
app
end
(
'
--homedir
"
%s
"'
%
self
.
gnupghome
)
cmd
.
ext
end
(
[
'
--homedir
'
,
no_quote
(
self
.
gnupghome
)
])
if
self
.
keyring
:
cmd
.
append
(
'
--no-default-keyring
'
)
for
fn
in
self
.
keyring
:
cmd
.
app
end
(
'
--keyring
"
%s
"'
%
fn
)
cmd
.
ext
end
(
[
'
--keyring
'
,
no_quote
(
fn
)
])
if
self
.
secret_keyring
:
for
fn
in
self
.
secret_keyring
:
cmd
.
app
end
(
'
--secret-keyring
"
%s
"'
%
fn
)
cmd
.
ext
end
(
[
'
--secret-keyring
'
,
no_quote
(
fn
)
])
if
passphrase
:
cmd
.
app
end
(
'
--batch
--passphrase-fd
0
'
)
cmd
.
ext
end
(
[
'
--batch
'
,
'
--passphrase-fd
'
,
'
0
'
]
)
if
self
.
use_agent
:
cmd
.
append
(
'
--use-agent
'
)
if
self
.
options
:
...
...
@@ -617,11 +726,19 @@ class GPG(object):
def
_open_subprocess
(
self
,
args
,
passphrase
=
False
):
# Internal method: open a pipe to a GPG subprocess and return
# the file objects for communicating with it.
cmd
=
'
'
.
join
(
self
.
make_args
(
args
,
passphrase
)
)
cmd
=
self
.
make_args
(
args
,
passphrase
)
if
self
.
verbose
:
print
(
cmd
)
pcmd
=
'
'
.
join
(
cmd
)
print
(
pcmd
)
logger
.
debug
(
"
%s
"
,
cmd
)
return
Popen
(
cmd
,
shell
=
True
,
stdin
=
PIPE
,
stdout
=
PIPE
,
stderr
=
PIPE
)
if
not
STARTUPINFO
:
si
=
None
else
:
si
=
STARTUPINFO
()
si
.
dwFlags
=
STARTF_USESHOWWINDOW
si
.
wShowWindow
=
SW_HIDE
return
Popen
(
cmd
,
shell
=
False
,
stdin
=
PIPE
,
stdout
=
PIPE
,
stderr
=
PIPE
,
startupinfo
=
si
)
def
_read_response
(
self
,
stream
,
result
):
# Internal method: reads all the stderr output from GPG, taking notice
...
...
@@ -723,8 +840,15 @@ class GPG(object):
f
.
close
()
return
result
def
set_output_without_confirmation
(
self
,
args
,
output
):
"
If writing to a file which exists, avoid a confirmation message.
"
if
os
.
path
.
exists
(
output
):
# We need to avoid an overwrite confirmation message
args
.
extend
([
'
--batch
'
,
'
--yes
'
])
args
.
extend
([
'
--output
'
,
output
])
def
sign_file
(
self
,
file
,
keyid
=
None
,
passphrase
=
None
,
clearsign
=
True
,
detach
=
False
,
binary
=
False
):
detach
=
False
,
binary
=
False
,
output
=
None
):
"""
sign file
"""
logger
.
debug
(
"
sign_file: %s
"
,
file
)
if
binary
:
...
...
@@ -738,7 +862,10 @@ class GPG(object):
elif
clearsign
:
args
.
append
(
"
--clearsign
"
)
if
keyid
:
args
.
append
(
'
--default-key
"
%s
"'
%
keyid
)
args
.
extend
([
'
--default-key
'
,
no_quote
(
keyid
)])
if
output
:
# write the output to a file with the specified name
self
.
set_output_without_confirmation
(
args
,
output
)
result
=
self
.
result_map
[
'
sign
'
](
self
)
#We could use _handle_io here except for the fact that if the
#passphrase is bad, gpg bails and you can't write the message.
...
...
@@ -790,8 +917,8 @@ class GPG(object):
logger
.
debug
(
'
Wrote to temp file: %r
'
,
s
)
os
.
write
(
fd
,
s
)
os
.
close
(
fd
)
args
.
append
(
fn
)
args
.
append
(
'"
%s
"'
%
data_filename
)
args
.
append
(
no_quote
(
fn
)
)
args
.
append
(
no_quote
(
data_filename
)
)
try
:
p
=
self
.
_open_subprocess
(
args
)
self
.
_collect_output
(
p
,
result
,
stdin
=
p
.
stdin
)
...
...
@@ -799,6 +926,15 @@ class GPG(object):
os
.
unlink
(
fn
)
return
result
def
verify_data
(
self
,
sig_filename
,
data
):
"
Verify the signature in sig_filename against data in memory
"
logger
.
debug
(
'
verify_data: %r, %r ...
'
,
sig_filename
,
data
[:
16
])
result
=
self
.
result_map
[
'
verify
'
](
self
)
args
=
[
'
--verify
'
,
no_quote
(
sig_filename
),
'
-
'
]
stream
=
_make_memory_stream
(
data
)
self
.
_handle_io
(
args
,
stream
,
result
,
binary
=
True
)
return
result
#
# KEY MANAGEMENT
#
...
...
@@ -859,9 +995,6 @@ class GPG(object):
def
recv_keys
(
self
,
keyserver
,
*
keyids
):
"""
Import a key from a keyserver
The doctest assertion is skipped in Jython because security permissions
may prevent the recv_keys from succeeding.
>>>
import
shutil
>>>
shutil
.
rmtree
(
"
keys
"
)
>>>
gpg
=
GPG
(
gnupghome
=
"
keys
"
)
...
...
@@ -874,33 +1007,60 @@ class GPG(object):
logger
.
debug
(
'
recv_keys: %r
'
,
keyids
)
data
=
_make_binary_stream
(
""
,
self
.
encoding
)
#data = ""
args
=
[
'
--keyserver
'
,
keyserver
,
'
--recv-keys
'
]
args
.
extend
(
keyids
)
args
=
[
'
--keyserver
'
,
no_quote
(
keyserver
)
,
'
--recv-keys
'
]
args
.
extend
(
[
no_quote
(
k
)
for
k
in
keyids
]
)
self
.
_handle_io
(
args
,
data
,
result
,
binary
=
True
)
logger
.
debug
(
'
recv_keys result: %r
'
,
result
.
__dict__
)
data
.
close
()
return
result
def
send_keys
(
self
,
keyserver
,
*
keyids
):
"""
Send a key to a keyserver.
Note: it
'
s not practical to test this function without sending
arbitrary data to live keyservers.
"""
result
=
self
.
result_map
[
'
send
'
](
self
)
logger
.
debug
(
'
send_keys: %r
'
,
keyids
)
data
=
_make_binary_stream
(
''
,
self
.
encoding
)
#data = ""
args
=
[
'
--keyserver
'
,
no_quote
(
keyserver
),
'
--send-keys
'
]
args
.
extend
([
no_quote
(
k
)
for
k
in
keyids
])
self
.
_handle_io
(
args
,
data
,
result
,
binary
=
True
)
logger
.
debug
(
'
send_keys result: %r
'
,
result
.
__dict__
)
data
.
close
()
return
result
def
delete_keys
(
self
,
fingerprints
,
secret
=
False
):
which
=
'
key
'
if
secret
:
which
=
'
secret-key
'
if
_is_sequence
(
fingerprints
):
fingerprints
=
'
'
.
join
(
fingerprints
)
args
=
[
'
--batch --delete-%s
"
%s
"'
%
(
which
,
fingerprints
)]
fingerprints
=
[
no_quote
(
s
)
for
s
in
fingerprints
]
else
:
fingerprints
=
[
no_quote
(
fingerprints
)]
args
=
[
'
--batch
'
,
'
--delete-%s
'
%
which
]
args
.
extend
(
fingerprints
)
result
=
self
.
result_map
[
'
delete
'
](
self
)
p
=
self
.
_open_subprocess
(
args
)
self
.
_collect_output
(
p
,
result
,
stdin
=
p
.
stdin
)
return
result
def
export_keys
(
self
,
keyids
,
secret
=
False
):
def
export_keys
(
self
,
keyids
,
secret
=
False
,
armor
=
True
,
minimal
=
False
):
"
export the indicated keys.
'
keyid
'
is anything gpg accepts
"
which
=
''
if
secret
:
which
=
'
-secret-key
'
if
_is_sequence
(
keyids
):
keyids
=
'
'
.
join
([
'"
%s
"'
%
k
for
k
in
keyids
])
args
=
[
"
--armor --export%s %s
"
%
(
which
,
keyids
)]
keyids
=
[
no_quote
(
k
)
for
k
in
keyids
]
else
:
keyids
=
[
no_quote
(
keyids
)]
args
=
[
'
--export%s
'
%
which
]
if
armor
:
args
.
append
(
'
--armor
'
)
if
minimal
:
args
.
extend
([
'
--export-options
'
,
'
export-minimal
'
])
args
.
extend
(
keyids
)
p
=
self
.
_open_subprocess
(
args
)
# gpg --export produces no status-fd output; stdout will be
# empty in case of failure
...
...
@@ -910,6 +1070,27 @@ class GPG(object):
logger
.
debug
(
'
export_keys result: %r
'
,
result
.
data
)
return
result
.
data
.
decode
(
self
.
encoding
,
self
.
decode_errors
)
def
_get_list_output
(
self
,
p
,
kind
):
# Get the response information
result
=
self
.
result_map
[
kind
](
self
)
self
.
_collect_output
(
p
,
result
,
stdin
=
p
.
stdin
)
lines
=
result
.
data
.
decode
(
self
.
encoding
,
self
.
decode_errors
).
splitlines
()
valid_keywords
=
'
pub uid sec fpr sub
'
.
split
()
for
line
in
lines
:
if
self
.
verbose
:
print
(
line
)
logger
.
debug
(
"
line: %r
"
,
line
.
rstrip
())
if
not
line
:
break
L
=
line
.
strip
().
split
(
'
:
'
)
if
not
L
:
continue
keyword
=
L
[
0
]
if
keyword
in
valid_keywords
:
getattr
(
result
,
keyword
)(
L
)
return
result
def
list_keys
(
self
,
secret
=
False
):
"""
list the keys currently in the keyring
...
...
@@ -930,25 +1111,58 @@ class GPG(object):
which
=
'
keys
'
if
secret
:
which
=
'
secret-keys
'
args
=
"
--list-%s
--fixed-list-mode
--fingerprint
--with-colons
"
%
(
which
,)
args
=
[
args
]
args
=
[
"
--list-%s
"
%
which
,
"
--fixed-list-mode
"
,
"
--fingerprint
"
,
"
--with-colons
"
]
p
=
self
.
_open_subprocess
(
args
)
return
self
.
_get_list_output
(
p
,
'
list
'
)
# there might be some status thingumy here I should handle... (amk)
# ...nope, unless you care about expired sigs or keys (stevegt)
def
scan_keys
(
self
,
filename
):
"""
List details of an ascii armored or binary key file
without first importing it to the local keyring.
The function achieves this by running:
$ gpg --with-fingerprint --with-colons filename
"""
args
=
[
'
--with-fingerprint
'
,
'
--with-colons
'
]
args
.
append
(
no_quote
(
filename
))
p
=
self
.
_open_subprocess
(
args
)
return
self
.
_get_list_output
(
p
,
'
scan
'
)
def
search_keys
(
self
,
query
,
keyserver
=
'
pgp.mit.edu
'
):
"""
search keyserver by query (using --search-keys option)
>>>
import
shutil
>>>
shutil
.
rmtree
(
'
keys
'
)
>>>
gpg
=
GPG
(
gnupghome
=
'
keys
'
)
>>>
os
.
chmod
(
'
keys
'
,
0x1C0
)
>>>
result
=
gpg
.
search_keys
(
'
<vinay_sajip@hotmail.com>
'
)
>>>
assert
result
,
'
Failed using default keyserver
'
>>>
keyserver
=
'
keyserver.ubuntu.com
'
>>>
result
=
gpg
.
search_keys
(
'
<vinay_sajip@hotmail.com>
'
,
keyserver
)
>>>
assert
result
,
'
Failed using keyserver.ubuntu.com
'
"""
query
=
query
.
strip
()
if
HEX_DIGITS_RE
.
match
(
query
):
query
=
'
0x
'
+
query
args
=
[
'
--fixed-list-mode
'
,
'
--fingerprint
'
,
'
--with-colons
'
,
'
--keyserver
'
,
no_quote
(
keyserver
),
'
--search-keys
'
,
no_quote
(
query
)]
p
=
self
.
_open_subprocess
(
args
)
# Get the response information
result
=
self
.
result_map
[
'
list
'
](
self
)
result
=
self
.
result_map
[
'
search
'
](
self
)
self
.
_collect_output
(
p
,
result
,
stdin
=
p
.
stdin
)
lines
=
result
.
data
.
decode
(
self
.
encoding
,
self
.
decode_errors
).
splitlines
()
valid_keywords
=
'
pub
uid sec fpr sub
'
.
split
()
valid_keywords
=
[
'
pub
'
,
'
uid
'
]
for
line
in
lines
:
if
self
.
verbose
:
print
(
line
)
logger
.
debug
(
"
line: %r
"
,
line
.
rstrip
())
if
not
line
:
break
logger
.
debug
(
'
line: %r
'
,
line
.
rstrip
())
if
not
line
:
# sometimes get blank lines on Windows
continue
L
=
line
.
strip
().
split
(
'
:
'
)
if
not
L
:
continue
...
...
@@ -969,7 +1183,7 @@ class GPG(object):
>>>
assert
not
result
"""
args
=
[
"
--gen-key
--batch
"
]
args
=
[
"
--gen-key
"
,
"
--batch
"
]
result
=
self
.
result_map
[
'
generate
'
](
self
)
f
=
_make_binary_stream
(
input
,
self
.
encoding
)
self
.
_handle_io
(
args
,
f
,
result
,
binary
=
True
)
...
...
@@ -986,9 +1200,8 @@ class GPG(object):
if
str
(
val
).
strip
():
# skip empty strings
parms
[
key
]
=
val
parms
.
setdefault
(
'
Key-Type
'
,
'
RSA
'
)
parms
.
setdefault
(
'
Key-Length
'
,
1024
)
parms
.
setdefault
(
'
Key-Length
'
,
2048
)
parms
.
setdefault
(
'
Name-Real
'
,
"
Autogenerated Key
"
)
parms
.
setdefault
(
'
Name-Comment
'
,
"
Generated by gnupg.py
"
)
try
:
logname
=
os
.
environ
[
'
LOGNAME
'
]
except
KeyError
:
...
...
@@ -1033,23 +1246,30 @@ class GPG(object):
"
Encrypt the message read from the file-like object
'
file
'"
args
=
[
'
--encrypt
'
]
if
symmetric
:
# can't be False or None - could be True or a cipher algo value
# such as AES256
args
=
[
'
--symmetric
'
]
if
symmetric
is
not
True
:
args
.
extend
([
'
--cipher-algo
'
,
no_quote
(
symmetric
)])
# else use the default, currently CAST5
else
:
args
=
[
'
--encrypt
'
]
if
not
recipients
:
raise
ValueError
(
'
No recipients specified with asymmetric
'
'
encryption
'
)
if
not
_is_sequence
(
recipients
):
recipients
=
(
recipients
,)
for
recipient
in
recipients
:
args
.
app
end
(
'
--recipient
"
%s
"'
%
recipient
)
if
armor
:
# create ascii-armored output -
set to
False for binary output
args
.
ext
end
(
[
'
--recipient
'
,
no_quote
(
recipient
)
])
if
armor
:
# create ascii-armored output - False for binary output
args
.
append
(
'
--armor
'
)
if
output
:
# write the output to a file with the specified name
if
os
.
path
.
exists
(
output
)
:
os
.
remove
(
output
)
# to avoid overwrite confirmation message
args
.
append
(
'
--
output
"
%s
"'
%
output
)
if
sign
:
args
.
app
end
(
'
--sign
--default-key
"
%s
"'
%
sign
)
self
.
set_output_without_confirmation
(
args
,
output
)
if
sign
is
True
:
args
.
append
(
'
--
sign
'
)
el
if
sign
:
args
.
ext
end
(
[
'
--sign
'
,
'
--default-key
'
,
no_quote
(
sign
)
])
if
always_trust
:
args
.
append
(
"
--always-trust
"
)
args
.
append
(
'
--always-trust
'
)
result
=
self
.
result_map
[
'
crypt
'
](
self
)
self
.
_handle_io
(
args
,
file
,
result
,
passphrase
=
passphrase
,
binary
=
True
)
logger
.
debug
(
'
encrypt result: %r
'
,
result
.
data
)
...
...
@@ -1111,13 +1331,10 @@ class GPG(object):
output
=
None
):
args
=
[
"
--decrypt
"
]
if
output
:
# write the output to a file with the specified name
if
os
.
path
.
exists
(
output
):
os
.
remove
(
output
)
# to avoid overwrite confirmation message
args
.
append
(
'
--output
"
%s
"'
%
output
)
self
.
set_output_without_confirmation
(
args
,
output
)
if
always_trust
:
args
.
append
(
"
--always-trust
"
)
result
=
self
.
result_map
[
'
crypt
'
](
self
)
self
.
_handle_io
(
args
,
file
,
result
,
passphrase
,
binary
=
True
)
logger
.
debug
(
'
decrypt result: %r
'
,
result
.
data
)
return
result
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment