Commit f5a9acc0 authored by Philipp Hörist's avatar Philipp Hörist
Browse files

Refactor b64decode()

parent b48f0f6c
......@@ -288,7 +288,7 @@ class GSSAPI:
self._client.send_nonza(node)
def response(self, server_message, *args, **kwargs):
server_message = b64decode(server_message, bytes)
server_message = b64decode(server_message)
try:
if not self.ctx.complete:
output_token = self.ctx.step(server_message)
......@@ -350,14 +350,14 @@ class SCRAM:
self._client.send_nonza(node)
def response(self, server_first_message):
server_first_message = b64decode(server_first_message)
server_first_message = b64decode(server_first_message).decode()
challenge = self._scram_parse(server_first_message)
client_nonce = challenge['r'][:self.nonce_length]
if client_nonce != self._client_nonce:
raise AuthFail('Invalid client nonce received from server')
salt = b64decode(challenge['s'], bytes)
salt = b64decode(challenge['s'])
iteration_count = int(challenge['i'])
if iteration_count < 4096:
......@@ -397,9 +397,9 @@ class SCRAM:
self._client.send_nonza(node)
def success(self, server_last_message):
server_last_message = b64decode(server_last_message)
server_last_message = b64decode(server_last_message).decode()
success = self._scram_parse(server_last_message)
server_signature = b64decode(success['v'], bytes)
server_signature = b64decode(success['v'])
if server_signature != self._server_signature:
raise AuthFail('Invalid server signature')
......
......@@ -60,7 +60,7 @@ def parse_bob_data(stanza: Node) -> Optional[BobData]:
return None
try:
bob_data = b64decode(bob_data, return_type=bytes)
bob_data = b64decode(bob_data)
except Exception:
log.warning('Unable to decode data')
log.exception(stanza)
......
......@@ -113,7 +113,7 @@ class IBB(BaseModule):
raise NodeProcessed
try:
decoded_data = b64decode(data.getData(), return_type=bytes)
decoded_data = b64decode(data.getData())
except Exception:
self._log.exception('Failed to decode IBB data')
self._client.send_stanza(ErrorStanza(stanza, ERR_BAD_REQUEST))
......
......@@ -194,7 +194,7 @@ def _parse_omemo_message(stanza):
iv_node = header.getTag('iv')
try:
iv = b64decode(iv_node.getData(), bytes)
iv = b64decode(iv_node.getData())
except Exception as error:
raise MalformedStanzaError('failed to decode iv: %s' % error, stanza)
......@@ -202,7 +202,7 @@ def _parse_omemo_message(stanza):
payload_node = encrypted.getTag('payload')
if payload_node is not None:
try:
payload = b64decode(payload_node.getData(), bytes)
payload = b64decode(payload_node.getData())
except Exception as error:
raise MalformedStanzaError('failed to decode payload: %s' % error,
stanza)
......@@ -227,7 +227,7 @@ def _parse_omemo_message(stanza):
raise MalformedStanzaError(error, stanza)
try:
keys[int(rid)] = (b64decode(kn.getData(), bytes), prekey)
keys[int(rid)] = (b64decode(kn.getData()), prekey)
except Exception as error:
raise MalformedStanzaError('failed to decode key: %s' % error,
stanza)
......@@ -273,8 +273,7 @@ def _parse_bundle(item):
result = {}
signed_prekey_node = bundle.getTag('signedPreKeyPublic')
try:
result['spk'] = {'key': b64decode(signed_prekey_node.getData(),
bytes)}
result['spk'] = {'key': b64decode(signed_prekey_node.getData())}
except Exception as error:
error = 'Failed to decode signedPreKeyPublic: %s' % error
raise MalformedStanzaError(error, item)
......@@ -287,15 +286,14 @@ def _parse_bundle(item):
signed_signature_node = bundle.getTag('signedPreKeySignature')
try:
result['spk_signature'] = b64decode(signed_signature_node.getData(),
bytes)
result['spk_signature'] = b64decode(signed_signature_node.getData())
except Exception as error:
error = 'Failed to decode signedPreKeySignature: %s' % error
raise MalformedStanzaError(error, item)
identity_key_node = bundle.getTag('identityKey')
try:
result['ik'] = b64decode(identity_key_node.getData(), bytes)
result['ik'] = b64decode(identity_key_node.getData())
except Exception as error:
error = 'Failed to decode IdentityKey: %s' % error
raise MalformedStanzaError(error, item)
......@@ -312,7 +310,7 @@ def _parse_bundle(item):
raise MalformedStanzaError('Invalid prekey: %s' % error, item)
try:
key = b64decode(prekey.getData(), bytes)
key = b64decode(prekey.getData())
except Exception as error:
raise MalformedStanzaError(
'Failed to decode preKeyPublic: %s' % error, item)
......
......@@ -74,7 +74,7 @@ class OpenPGP(BaseModule):
self._log.info('Encrypted message received')
try:
properties.openpgp = b64decode(data, return_type=bytes)
properties.openpgp = b64decode(data)
except Exception:
self._log.warning('b64decode failed')
self._log.warning(stanza)
......@@ -390,7 +390,7 @@ def _parse_public_key(jid, item):
raise ValueError('data node missing')
try:
key = b64decode(data.getData(), return_type=bytes)
key = b64decode(data.getData())
except Exception as error:
raise ValueError(f'decoding error: {error}')
......@@ -432,7 +432,7 @@ def _parse_secret_key(item):
raise ValueError('secretkey data missing')
try:
key = b64decode(data, return_type=bytes)
key = b64decode(data)
except Exception as error:
raise ValueError(f'decoding error: {error}')
......
......@@ -220,7 +220,7 @@ def _get_avatar_data(item, id_):
raise MalformedStanzaError('data node empty', item)
try:
avatar = b64decode(data, return_type=bytes)
avatar = b64decode(data)
except Exception as error:
raise MalformedStanzaError(f'decoding error: {error}', item)
......
......@@ -138,6 +138,6 @@ class VCard:
if not avatar:
return None, None
avatar = b64decode(avatar, return_type=bytes)
avatar = b64decode(avatar)
avatar_sha = hashlib.sha1(avatar).hexdigest()
return avatar, avatar_sha
......@@ -16,6 +16,7 @@
# along with this program; If not, see <http://www.gnu.org/licenses/>.
from typing import Literal
from typing import Union
import base64
import hashlib
......@@ -50,15 +51,14 @@ from nbxmpp.third_party.hsluv import hsluv_to_rgb
log = logging.getLogger('nbxmpp.util')
def b64decode(data, return_type=str):
def b64decode(data: Union[str, bytes]) -> bytes:
if not data:
raise ValueError('No data to decode')
if isinstance(data, str):
data = data.encode()
result = base64.b64decode(data)
if return_type == bytes:
return result
return result.decode()
return base64.b64decode(data)
def b64encode(data, return_type=str):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment