Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
P
python-nbxmpp
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
8
Issues
8
List
Boards
Labels
Service Desk
Milestones
Merge Requests
3
Merge Requests
3
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Operations
Operations
Incidents
Environments
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
gajim
python-nbxmpp
Commits
562a26fd
Commit
562a26fd
authored
Oct 26, 2020
by
Philipp Hörist
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
OpenPGP: Use tasks
parent
5e63a080
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
191 additions
and
126 deletions
+191
-126
nbxmpp/modules/openpgp.py
nbxmpp/modules/openpgp.py
+191
-126
No files found.
nbxmpp/modules/openpgp.py
View file @
562a26fd
...
...
@@ -22,25 +22,25 @@ import string
from
nbxmpp.namespaces
import
Namespace
from
nbxmpp.protocol
import
NodeProcessed
from
nbxmpp.protocol
import
Node
from
nbxmpp.protocol
import
isResultNode
from
nbxmpp.protocol
import
StanzaMalformed
from
nbxmpp.util
import
call_on_response
from
nbxmpp.util
import
callback
from
nbxmpp.util
import
b64decode
from
nbxmpp.util
import
b64encode
from
nbxmpp.util
import
raise_error
from
nbxmpp.structs
import
StanzaHandler
from
nbxmpp.structs
import
PGPKeyMetadata
from
nbxmpp.structs
import
PGPPublicKey
from
nbxmpp.errors
import
MalformedStanzaError
from
nbxmpp.task
import
iq_request_task
from
nbxmpp.modules.date_and_time
import
parse_datetime
from
nbxmpp.modules.pubsub
import
get_pubsub_request
from
nbxmpp.modules.base
import
BaseModule
from
nbxmpp.modules.util
import
finalize
from
nbxmpp.modules.util
import
raise_if_error
class
OpenPGP
(
BaseModule
):
_depends
=
{
'publish'
:
'PubSub'
'publish'
:
'PubSub'
,
'request_items'
:
'PubSub'
,
}
def
__init__
(
self
,
client
):
...
...
@@ -107,8 +107,8 @@ class OpenPGP(BaseModule):
return
try
:
data
=
self
.
_parse_keylist
(
properties
.
jid
,
item
)
except
StanzaMalformed
as
error
:
data
=
_parse_keylist
(
properties
.
jid
,
item
)
except
ValueError
as
error
:
self
.
_log
.
warning
(
error
)
self
.
_log
.
warning
(
stanza
)
raise
NodeProcessed
...
...
@@ -123,162 +123,137 @@ class OpenPGP(BaseModule):
properties
.
pubsub_event
=
pubsub_event
@
staticmethod
def
_parse_keylist
(
jid
,
item
):
keylist_node
=
item
.
getTag
(
'public-keys-list'
,
namespace
=
Namespace
.
OPENPGP
)
if
keylist_node
is
None
:
raise
StanzaMalformed
(
'No public-keys-list node found'
)
metadata
=
keylist_node
.
getTags
(
'pubkey-metadata'
)
if
not
metadata
:
return
None
data
=
[]
for
key
in
metadata
:
fingerprint
=
key
.
getAttr
(
'v4-fingerprint'
)
date
=
key
.
getAttr
(
'date'
)
if
fingerprint
is
None
or
date
is
None
:
raise
StanzaMalformed
(
'Invalid metadata node'
)
timestamp
=
parse_datetime
(
date
,
epoch
=
True
)
if
timestamp
is
None
:
raise
StanzaMalformed
(
'Invalid date timestamp: %s'
%
date
)
data
.
append
(
PGPKeyMetadata
(
jid
,
fingerprint
,
timestamp
))
return
data
def
set_keylist
(
self
,
keylist
):
item
=
Node
(
'public-keys-list'
,
{
'xmlns'
:
Namespace
.
OPENPGP
})
if
keylist
is
not
None
:
for
key
in
keylist
:
date
=
time
.
strftime
(
'%Y-%m-%dT%H:%M:%SZ'
,
time
.
gmtime
(
key
.
date
))
attrs
=
{
'v4-fingerprint'
:
key
.
fingerprint
,
'date'
:
date
}
item
.
addChild
(
'pubkey-metadata'
,
attrs
=
attrs
)
@
iq_request_task
def
set_keylist
(
self
,
keylist
,
public
=
True
):
task
=
yield
access_model
=
'open'
if
public
else
'presence'
options
=
{
'pubsub#persist_items'
:
'true'
,
'pubsub#access_model'
:
access_model
,
}
self
.
_log
.
info
(
'Set keylist: %s'
,
keylist
)
self
.
publish
(
Namespace
.
OPENPGP_PK
,
item
,
id_
=
'current'
)
result
=
yield
self
.
publish
(
Namespace
.
OPENPGP_PK
,
_make_keylist
(
keylist
),
id_
=
'current'
,
options
=
options
,
force_node_options
=
True
)
yield
finalize
(
task
,
result
)
def
set_public_key
(
self
,
key
,
fingerprint
,
date
):
date
=
time
.
strftime
(
'%Y-%m-%dT%H:%M:%SZ'
,
time
.
gmtime
(
date
))
item
=
Node
(
'pubkey'
,
attrs
=
{
'xmlns'
:
Namespace
.
OPENPGP
,
'date'
:
date
})
data
=
item
.
addChild
(
'data'
)
data
.
addData
(
b64encode
(
key
))
node
=
'%s:%s'
%
(
Namespace
.
OPENPGP_PK
,
fingerprint
)
@
iq_request_task
def
set_public_key
(
self
,
key
,
fingerprint
,
date
,
public
=
True
):
task
=
yield
self
.
_log
.
info
(
'Set public key'
)
self
.
publish
(
node
,
item
,
id_
=
'current'
)
access_model
=
'open'
if
public
else
'presence'
@
call_on_response
(
'_public_key_received'
)
def
request_public_key
(
self
,
jid
,
fingerprint
):
self
.
_log
.
info
(
'Request public key from: %s %s'
,
jid
,
fingerprint
)
node
=
'%s:%s'
%
(
Namespace
.
OPENPGP_PK
,
fingerprint
)
return
get_pubsub_request
(
jid
,
node
,
max_items
=
1
)
options
=
{
'pubsub#persist_items'
:
'true'
,
'pubsub#access_model'
:
access_model
,
}
@
callback
def
_public_key_received
(
self
,
stanza
):
jid
=
stanza
.
getFrom
().
bare
result
=
yield
self
.
publish
(
f'
{
Namespace
.
OPENPGP_PK
}
:
{
fingerprint
}
'
,
_make_public_key
(
key
,
date
),
id_
=
'current'
,
options
=
options
,
force_node_options
=
True
)
if
not
isResultNode
(
stanza
):
return
raise_error
(
self
.
_log
.
info
,
stanza
)
yield
finalize
(
task
,
result
)
pubsub_node
=
stanza
.
getTag
(
'pubsub'
)
items_node
=
pubsub_node
.
getTag
(
'items'
)
item
=
items_node
.
getTag
(
'item'
)
@
iq_request_task
def
request_public_key
(
self
,
jid
,
fingerprint
):
task
=
yield
pub_key
=
item
.
getTag
(
'pubkey'
,
namespace
=
Namespace
.
OPENPGP
)
if
pub_key
is
None
:
return
raise_error
(
self
.
_log
.
warning
,
stanza
,
'stanza-malformed'
,
'PGP public key has no pubkey node'
)
self
.
_log
.
info
(
'Request public key from: %s %s'
,
jid
,
fingerprint
)
date
=
parse_datetime
(
pub_key
.
getAttr
(
'date'
),
epoch
=
True
)
items
=
yield
self
.
request_items
(
f'
{
Namespace
.
OPENPGP_PK
}
:
{
fingerprint
}
'
,
max_items
=
1
,
jid
=
jid
)
data
=
pub_key
.
getTag
(
'data'
)
if
data
is
None
:
return
raise_error
(
self
.
_log
.
warning
,
stanza
,
'stanza-malformed'
,
'PGP public key has no data node'
)
raise_if_error
(
items
)
if
not
items
:
yield
task
.
set_result
(
None
)
try
:
key
=
b64decode
(
data
.
getData
(),
return_type
=
bytes
)
except
Exception
as
error
:
return
raise_error
(
self
.
_log
.
warning
,
stanza
,
'stanza-malformed'
,
str
(
error
))
key
=
_parse_public_key
(
jid
,
items
[
0
])
except
ValueError
as
error
:
raise
MalformedStanzaError
(
str
(
error
),
items
)
key
=
PGPPublicKey
(
jid
,
key
,
date
)
self
.
_log
.
info
(
'Received public key: %s %s'
,
key
.
jid
,
key
.
date
)
return
key
yield
key
@
iq_request_task
def
request_keylist
(
self
,
jid
=
None
):
task
=
yield
@
call_on_response
(
'_keylist_received'
)
def
request_keylist
(
self
,
jid
):
self
.
_log
.
info
(
'Request keylist from: %s'
,
jid
)
return
get_pubsub_request
(
jid
,
Namespace
.
OPENPGP_PK
,
max_items
=
1
)
@
callback
def
_keylist_received
(
self
,
stanza
):
jid
=
stanza
.
getFrom
().
bare
items
=
yield
self
.
request_items
(
Namespace
.
OPENPGP_PK
,
max_items
=
1
,
jid
=
jid
)
if
not
isResultNode
(
stanza
):
return
raise_error
(
self
.
_log
.
info
,
stanza
)
raise_if_error
(
items
)
pubsub_node
=
stanza
.
getTag
(
'pubsub'
)
items_node
=
pubsub_node
.
getTag
(
'items'
)
item
=
items_node
.
getTag
(
'item'
)
if
not
items
:
yield
task
.
set_result
(
None
)
try
:
keylist
=
self
.
_parse_keylist
(
jid
,
item
)
except
StanzaMalformed
as
error
:
r
eturn
raise_error
(
self
.
_log
.
warning
,
stanza
,
'stanza-malformed'
,
str
(
error
))
keylist
=
_parse_keylist
(
jid
,
items
[
0
]
)
except
ValueError
as
error
:
r
aise
MalformedStanzaError
(
str
(
error
),
items
)
self
.
_log
.
info
(
'Received keylist: %s'
,
keylist
)
return
keylist
yield
keylist
@
call_on_response
(
'_secret_key_received'
)
@
iq_request_task
def
request_secret_key
(
self
):
self
.
_log
.
info
(
'Request secret key'
)
jid
=
self
.
_client
.
get_bound_jid
().
bare
return
get_pubsub_request
(
jid
,
Namespace
.
OPENPGP_SK
,
max_items
=
1
)
task
=
yield
@
callback
def
_secret_key_received
(
self
,
stanza
):
if
not
isResultNode
(
stanza
):
return
raise_error
(
self
.
_log
.
info
,
stanza
)
self
.
_log
.
info
(
'Request secret key'
)
pubsub_node
=
stanza
.
getTag
(
'pubsub'
)
items_node
=
pubsub_node
.
getTag
(
'items'
)
item
=
items_node
.
getTag
(
'item'
)
items
=
yield
self
.
request_items
(
Namespace
.
OPENPGP_SK
,
max_items
=
1
)
sec_key
=
item
.
getTag
(
'secretkey'
,
namespace
=
Namespace
.
OPENPGP
)
if
sec_key
is
None
:
return
raise_error
(
self
.
_log
.
warning
,
stanza
,
'stanza-malformed'
,
'PGP secretkey node not found'
)
raise_if_error
(
items
)
data
=
sec_key
.
getData
()
if
not
data
:
return
raise_error
(
self
.
_log
.
warning
,
stanza
,
'stanza-malformed'
,
'PGP secretkey has no data'
)
if
not
items
:
yield
task
.
set_result
(
None
)
try
:
key
=
b64decode
(
data
,
return_type
=
bytes
)
except
Exception
as
error
:
return
raise_error
(
self
.
_log
.
warning
,
stanza
,
'stanza-malformed'
,
str
(
error
))
self
.
_log
.
info
(
'Received secret key'
)
return
key
secret_key
=
_parse_secret_key
(
items
[
0
])
except
ValueError
as
error
:
raise
MalformedStanzaError
(
str
(
error
),
items
)
yield
secret_key
@
iq_request_task
def
set_secret_key
(
self
,
secret_key
):
item
=
Node
(
'secretkey'
,
{
'xmlns'
:
Namespace
.
OPENPGP
})
if
secret_key
is
not
None
:
item
.
setData
(
b64encode
(
secret_key
))
task
=
yield
self
.
_log
.
info
(
'Set public key'
)
options
=
{
'pubsub#persist_items'
:
'true'
,
'pubsub#access_model'
:
'whitelist'
,
}
self
.
_log
.
info
(
'Set secret key'
)
self
.
publish
(
Namespace
.
OPENPGP_SK
,
item
,
id_
=
'current'
)
result
=
yield
self
.
publish
(
Namespace
.
OPENPGP_SK
,
_make_secret_key
(
secret_key
),
id_
=
'current'
,
options
=
options
,
force_node_options
=
True
)
yield
finalize
(
task
,
result
)
def
parse_signcrypt
(
stanza
):
...
...
@@ -374,3 +349,93 @@ def create_message_stanza(stanza, encrypted_payload, with_fallback_text):
if
with_fallback_text
:
stanza
.
setBody
(
'[This message is *encrypted* with OpenPGP (See :XEP:`0373`]'
)
def
_make_keylist
(
keylist
):
item
=
Node
(
'public-keys-list'
,
{
'xmlns'
:
Namespace
.
OPENPGP
})
if
keylist
is
not
None
:
for
key
in
keylist
:
date
=
time
.
strftime
(
'%Y-%m-%dT%H:%M:%SZ'
,
time
.
gmtime
(
key
.
date
))
attrs
=
{
'v4-fingerprint'
:
key
.
fingerprint
,
'date'
:
date
}
item
.
addChild
(
'pubkey-metadata'
,
attrs
=
attrs
)
return
item
def
_make_public_key
(
key
,
date
):
date
=
time
.
strftime
(
'%Y-%m-%dT%H:%M:%SZ'
,
time
.
gmtime
(
date
))
item
=
Node
(
'pubkey'
,
attrs
=
{
'xmlns'
:
Namespace
.
OPENPGP
,
'date'
:
date
})
data
=
item
.
addChild
(
'data'
)
data
.
addData
(
b64encode
(
key
))
return
item
def
_make_secret_key
(
secret_key
):
item
=
Node
(
'secretkey'
,
{
'xmlns'
:
Namespace
.
OPENPGP
})
if
secret_key
is
not
None
:
item
.
setData
(
b64encode
(
secret_key
))
return
item
def
_parse_public_key
(
jid
,
item
):
pub_key
=
item
.
getTag
(
'pubkey'
,
namespace
=
Namespace
.
OPENPGP
)
if
pub_key
is
None
:
raise
ValueError
(
'pubkey node missing'
)
date
=
parse_datetime
(
pub_key
.
getAttr
(
'date'
),
epoch
=
True
)
data
=
pub_key
.
getTag
(
'data'
)
if
data
is
None
:
raise
ValueError
(
'data node missing'
)
try
:
key
=
b64decode
(
data
.
getData
(),
return_type
=
bytes
)
except
Exception
as
error
:
raise
ValueError
(
f'decoding error:
{
error
}
'
)
return
PGPPublicKey
(
jid
,
key
,
date
)
def
_parse_keylist
(
jid
,
item
):
keylist_node
=
item
.
getTag
(
'public-keys-list'
,
namespace
=
Namespace
.
OPENPGP
)
if
keylist_node
is
None
:
raise
ValueError
(
'public-keys-list node missing'
)
metadata
=
keylist_node
.
getTags
(
'pubkey-metadata'
)
if
not
metadata
:
return
None
data
=
[]
for
key
in
metadata
:
fingerprint
=
key
.
getAttr
(
'v4-fingerprint'
)
date
=
key
.
getAttr
(
'date'
)
if
fingerprint
is
None
or
date
is
None
:
raise
ValueError
(
'Invalid metadata node'
)
timestamp
=
parse_datetime
(
date
,
epoch
=
True
)
if
timestamp
is
None
:
raise
ValueError
(
'Invalid date timestamp: %s'
%
date
)
data
.
append
(
PGPKeyMetadata
(
jid
,
fingerprint
,
timestamp
))
return
data
def
_parse_secret_key
(
item
):
sec_key
=
item
.
getTag
(
'secretkey'
,
namespace
=
Namespace
.
OPENPGP
)
if
sec_key
is
None
:
raise
ValueError
(
'secretkey node missing'
)
data
=
sec_key
.
getData
()
if
not
data
:
raise
ValueError
(
'secretkey data missing'
)
try
:
key
=
b64decode
(
data
,
return_type
=
bytes
)
except
Exception
as
error
:
raise
ValueError
(
f'decoding error:
{
error
}
'
)
return
key
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment