Commit b749795f authored by André's avatar André Committed by Philipp Hörist

configpaths: Port to pathlib

parent 96454134
Pipeline #6634 passed with stages
in 4 minutes and 4 seconds
......@@ -36,7 +36,6 @@
import time
import sys
from datetime import datetime
from pathlib import Path
from urllib.parse import unquote
from nbxmpp.namespaces import Namespace
......@@ -384,7 +383,7 @@ def warn_with_traceback(message, category, filename, lineno,
@staticmethod
def _redirect_output():
debug_folder = Path(configpaths.get('DEBUG'))
debug_folder = configpaths.get('DEBUG')
date = datetime.today().strftime('%d%m%Y-%H%M%S')
filename = '%s-debug.log' % date
fd = open(debug_folder / filename, 'a')
......@@ -392,7 +391,7 @@ def _redirect_output():
@staticmethod
def _cleanup_debug_logs():
debug_folder = Path(configpaths.get('DEBUG'))
debug_folder = configpaths.get('DEBUG')
debug_files = list(debug_folder.glob('*-debug.log*'))
now = time.time()
for file in debug_files:
......
......@@ -34,7 +34,6 @@
import sys
import logging
import uuid
from pathlib import Path
from collections import namedtuple
from collections import defaultdict
......@@ -631,7 +630,7 @@ def load_css_config():
css_config = CSSConfig()
def set_debug_mode(enable: bool) -> None:
debug_folder = Path(configpaths.get('DEBUG'))
debug_folder = configpaths.get('DEBUG')
debug_enabled = debug_folder / 'debug-enabled'
if enable:
debug_enabled.touch()
......@@ -640,7 +639,7 @@ def set_debug_mode(enable: bool) -> None:
debug_enabled.unlink()
def get_debug_mode() -> bool:
debug_folder = Path(configpaths.get('DEBUG'))
debug_folder = configpaths.get('DEBUG')
debug_enabled = debug_folder / 'debug-enabled'
return debug_enabled.exists()
......@@ -648,7 +647,7 @@ def get_stored_bob_data(algo_hash: str) -> Optional[bytes]:
try:
return bob_cache[algo_hash]
except KeyError:
filepath = Path(configpaths.get('BOB')) / algo_hash
filepath = configpaths.get('BOB') / algo_hash
if filepath.exists():
with open(str(filepath), 'r+b') as file:
data = file.read()
......
......@@ -14,7 +14,6 @@
import logging
from pathlib import Path
from gi.repository import GLib
from gi.repository import Gio
......@@ -30,7 +29,7 @@
class CertificateStore:
def __init__(self):
self._path = Path(configpaths.get('CERT_STORE'))
self._path = configpaths.get('CERT_STORE')
self._certs = []
self._load_certificates()
......
......@@ -38,7 +38,7 @@
from gajim.common.types import PathTuple
def get(key: str) -> str:
def get(key: str) -> Path:
return _paths[key]
......@@ -50,7 +50,7 @@ def get_plugin_dirs() -> List[Path]:
Path(_paths['PLUGINS_USER'])]
def get_paths(type_: PathType) -> Generator[str, None, None]:
def get_paths(type_: PathType) -> Generator[Path, None, None]:
for key, value in _paths.items():
path_type = value[2]
if type_ != path_type:
......@@ -71,7 +71,7 @@ def set_profile(profile: str) -> None:
def set_config_root(config_root: str) -> None:
_paths.custom_config_root = str(Path(config_root).resolve())
_paths.custom_config_root = Path(config_root).resolve()
def init() -> None:
......@@ -80,24 +80,21 @@ def init() -> None:
def create_paths() -> None:
for path in get_paths(PathType.FOLDER):
if not isinstance(path, Path):
path_ = Path(path)
if path_.is_file():
print(_('%s is a file but it should be a directory') % path_)
if path.is_file():
print(_('%s is a file but it should be a directory') % path)
print(_('Gajim will now exit'))
sys.exit()
if not path_.exists():
for parent_path in reversed(path_.parents):
if not path.exists():
for parent_path in reversed(path.parents):
# Create all parent folders
# don't use mkdir(parent=True), as it ignores `mode`
# when creating the parents
if not parent_path.exists():
print(('creating %s directory') % parent_path)
parent_path.mkdir(mode=0o700)
print(('creating %s directory') % path_)
path_.mkdir(mode=0o700)
print(('creating %s directory') % path)
path.mkdir(mode=0o700)
class ConfigPaths:
......@@ -105,67 +102,66 @@ def __init__(self) -> None:
self._paths = {} # type: Dict[str, PathTuple]
self.profile = ''
self.profile_separation = False
self.custom_config_root = None # type: Optional[str]
self.custom_config_root = None # type: Optional[Path]
if os.name == 'nt':
try:
# Documents and Settings\[User Name]\Application Data\Gajim
self.config_root = self.cache_root = self.data_root = \
os.path.join(os.environ['appdata'], 'Gajim')
Path(os.environ['appdata']) / 'Gajim'
except KeyError:
# win9x, in cwd
self.config_root = self.cache_root = self.data_root = '.'
self.config_root = self.cache_root = self.data_root = Path('.')
else:
self.config_root = os.path.join(GLib.get_user_config_dir(),
'gajim')
self.cache_root = os.path.join(GLib.get_user_cache_dir(), 'gajim')
self.data_root = os.path.join(GLib.get_user_data_dir(), 'gajim')
self.config_root = Path(GLib.get_user_config_dir()) / 'gajim'
self.cache_root = Path(GLib.get_user_cache_dir()) / 'gajim'
self.data_root = Path(GLib.get_user_data_dir()) / 'gajim'
import pkg_resources
basedir = pkg_resources.resource_filename("gajim", ".")
basedir = Path(pkg_resources.resource_filename("gajim", "."))
source_paths = [
('DATA', os.path.join(basedir, 'data')),
('STYLE', os.path.join(basedir, 'data', 'style')),
('EMOTICONS', os.path.join(basedir, 'data', 'emoticons')),
('GUI', os.path.join(basedir, 'data', 'gui')),
('ICONS', os.path.join(basedir, 'data', 'icons')),
('HOME', os.path.expanduser('~')),
('PLUGINS_BASE', os.path.join(basedir, 'data', 'plugins')),
('DATA', basedir / 'data'),
('STYLE', basedir / 'data' / 'style'),
('EMOTICONS', basedir / 'data' / 'emoticons'),
('GUI', basedir / 'data' / 'gui'),
('ICONS', basedir / 'data' / 'icons'),
('HOME', Path.home()),
('PLUGINS_BASE', basedir / 'data' / 'plugins'),
]
for path in source_paths:
self.add(*path)
def __getitem__(self, key: str) -> str:
def __getitem__(self, key: str) -> Path:
location, path, _ = self._paths[key]
if location == PathLocation.CONFIG:
return os.path.join(self.config_root, path)
return self.config_root / path
if location == PathLocation.CACHE:
return os.path.join(self.cache_root, path)
return self.cache_root / path
if location == PathLocation.DATA:
return os.path.join(self.data_root, path)
return self.data_root / path
return path
def items(self) -> Generator[Tuple[str, PathTuple], None, None]:
for key, value in self._paths.items():
yield (key, value)
def _prepare(self, path: str, unique: bool) -> str:
def _prepare(self, path: Path, unique: bool) -> Path:
if os.name == 'nt':
path = path.capitalize()
path = Path(str(path).capitalize())
if self.profile:
if unique or self.profile_separation:
return '%s.%s' % (path, self.profile)
return Path(f'{path}.{self.profile}')
return path
def add(self,
name: str,
path: str,
path: Path,
location: PathLocation = None,
path_type: PathType = None,
unique: bool = False) -> None:
if path and location is not None:
if location is not None:
path = self._prepare(path, unique)
self._paths[name] = (location, path, path_type)
......@@ -175,10 +171,10 @@ def init(self):
self.cache_root = self.data_root = self.custom_config_root
user_dir_paths = [
('TMP', tempfile.gettempdir()),
('MY_CONFIG', '', PathLocation.CONFIG, PathType.FOLDER),
('MY_CACHE', '', PathLocation.CACHE, PathType.FOLDER),
('MY_DATA', '', PathLocation.DATA, PathType.FOLDER),
('TMP', Path(tempfile.gettempdir())),
('MY_CONFIG', Path(), PathLocation.CONFIG, PathType.FOLDER),
('MY_CACHE', Path(), PathLocation.CACHE, PathType.FOLDER),
('MY_DATA', Path(), PathLocation.DATA, PathType.FOLDER),
]
for path in user_dir_paths:
......
......@@ -51,6 +51,7 @@
from urllib.parse import unquote
from encodings.punycode import punycode_encode
from functools import wraps
from pathlib import Path
from packaging.version import Version as V
from nbxmpp.namespaces import Namespace
......@@ -447,18 +448,18 @@ def check_soundfile_path(file_, dirs=None):
(eg: ~/.gajim/sounds/, DATADIR/sounds...).
:return the path to file or None if it doesn't exists.
"""
if not file_:
return None
if Path(file_).exists():
return Path(file_)
if dirs is None:
dirs = [configpaths.get('MY_DATA'),
configpaths.get('DATA')]
if not file_:
return None
if os.path.exists(file_):
return file_
for dir_ in dirs:
dir_ = os.path.join(dir_, 'sounds', file_)
if os.path.exists(dir_):
dir_ = dir_ / 'sounds' / file_
if dir_.exists():
return dir_
return None
......@@ -482,11 +483,12 @@ def strip_soundfile_path(file_, dirs=None, abs_=True):
dirs = [configpaths.get('MY_DATA'),
configpaths.get('DATA')]
name = os.path.basename(file_)
file_ = Path(file_)
name = file_.name
for dir_ in dirs:
dir_ = os.path.join(dir_, 'sounds', name)
dir_ = dir_ / 'sounds' / name
if abs_:
dir_ = os.path.abspath(dir_)
dir_ = dir_.absolute()
if file_ == dir_:
return name
return file_
......@@ -496,6 +498,7 @@ def play_sound_file(path_to_soundfile):
if path_to_soundfile is None:
return
path_to_soundfile = str(path_to_soundfile)
if sys.platform == 'win32':
import winsound
try:
......@@ -854,25 +857,19 @@ def version_condition(current_version, required_version):
return True
def get_available_emoticon_themes():
emoticons_themes = ['font']
files = []
dir_iterator = os.scandir(configpaths.get('EMOTICONS'))
for folder in dir_iterator:
for folder in configpaths.get('EMOTICONS').iterdir():
if not folder.is_dir():
continue
file_iterator = os.scandir(folder.path)
for theme in file_iterator:
if theme.is_file():
files.append(theme.name)
if os.path.isdir(configpaths.get('MY_EMOTS')):
files += os.listdir(configpaths.get('MY_EMOTS'))
for file in files:
if file.endswith('.png'):
emoticons_themes.append(file[:-4])
emoticons_themes.sort()
return emoticons_themes
files += [theme for theme in folder.iterdir() if theme.is_file()]
my_emots = configpaths.get('MY_EMOTS')
if my_emots.is_dir():
files += list(my_emots.iterdir())
emoticons_themes = ['font']
emoticons_themes += [file.stem for file in files if file.suffix == '.png']
return sorted(emoticons_themes)
def call_counter(func):
def helper(self, restart=False):
......
......@@ -19,8 +19,6 @@
from typing import Any # pylint: disable=unused-import
from typing import Dict # pylint: disable=unused-import
import os
import nbxmpp
from nbxmpp.namespaces import Namespace
......@@ -227,8 +225,8 @@ def _fill_content(self, content):
if self.use_security:
security = nbxmpp.simplexml.Node(
tag=Namespace.JINGLE_XTLS + ' security')
certpath = os.path.join(
configpaths.get('MY_CERT'), SELF_SIGNED_CERTIFICATE) + '.cert'
certpath = configpaths.get('MY_CERT') / (SELF_SIGNED_CERTIFICATE
+ '.cert')
cert = load_cert_file(certpath)
if cert:
digest_algo = (cert.get_signature_algorithm()
......
......@@ -18,7 +18,6 @@
import hashlib
import logging
import os
import threading
from enum import IntEnum, unique
......@@ -119,10 +118,10 @@ def __init__(self, session, transport=None, file_props=None,
State.CAND_SENT_AND_RECEIVED : StateCandSentAndRecv(self)
}
cert_name = os.path.join(configpaths.get('MY_CERT'),
jingle_xtls.SELF_SIGNED_CERTIFICATE)
if not (os.path.exists(cert_name + '.cert')
and os.path.exists(cert_name + '.pkey')):
cert_name = (configpaths.get('MY_CERT') /
jingle_xtls.SELF_SIGNED_CERTIFICATE)
if not (cert_name.with_suffix('.cert').exists()
and cert_name.with_suffix('.pkey').exists()):
jingle_xtls.make_certs(cert_name, 'gajim')
def __state_changed(self, nextstate, args=None):
......
......@@ -13,7 +13,7 @@
# along with Gajim. If not, see <http://www.gnu.org/licenses/>.
import logging
import os
from pathlib import Path
from OpenSSL import SSL, crypto
......@@ -52,7 +52,7 @@ def load_cert_file(cert_path, cert_store=None):
"""
This is almost identical to the one in nbxmpp.tls_nb
"""
if not os.path.isfile(cert_path):
if not cert_path.is_file():
return None
try:
f = open(cert_path)
......@@ -100,22 +100,22 @@ def get_context(fingerprint, verify_cb=None, remote_jid=None):
elif fingerprint == 'client':
ctx.set_verify(SSL.VERIFY_PEER, verify_cb or default_callback)
cert_name = os.path.join(configpaths.get('MY_CERT'), SELF_SIGNED_CERTIFICATE)
ctx.use_privatekey_file((cert_name + '.pkey').encode('utf-8'))
ctx.use_certificate_file((cert_name + '.cert').encode('utf-8'))
cert_name = configpaths.get('MY_CERT') / SELF_SIGNED_CERTIFICATE
ctx.use_privatekey_file(str(cert_name.with_suffix('.pkey')).encode('utf-8'))
ctx.use_certificate_file(str(cert_name.with_suffix('.cert')).encode('utf-8'))
# Try to load Diffie-Hellman parameters.
# First try user DH parameters, if this fails load the default DH parameters
dh_params_name = os.path.join(configpaths.get('MY_CERT'), DH_PARAMS)
dh_params_name = configpaths.get('MY_CERT') / DH_PARAMS
try:
with open(dh_params_name, "r"):
ctx.load_tmp_dh(dh_params_name.encode('utf-8'))
except FileNotFoundError as err:
default_dh_params_name = os.path.join(configpaths.get('DATA'),
'other', DEFAULT_DH_PARAMS)
default_dh_params_name = (configpaths.get('DATA') / 'other' /
DEFAULT_DH_PARAMS)
try:
with open(default_dh_params_name, "r"):
ctx.load_tmp_dh(default_dh_params_name.encode('utf-8'))
ctx.load_tmp_dh(str(default_dh_params_name).encode('utf-8'))
except FileNotFoundError as err:
log.error('Unable to load default DH parameter file: %s, %s',
default_dh_params_name, err)
......@@ -123,9 +123,9 @@ def get_context(fingerprint, verify_cb=None, remote_jid=None):
if remote_jid:
store = ctx.get_cert_store()
path = os.path.join(os.path.expanduser(configpaths.get('MY_PEER_CERTS')),
remote_jid) + '.cert'
if os.path.exists(path):
path = configpaths.get('MY_PEER_CERTS').expanduser() / (remote_jid
+ '.cert')
if path.exists():
load_cert_file(path, cert_store=store)
log.debug('certificate file %s loaded fingerprint %s',
path, fingerprint)
......@@ -140,8 +140,8 @@ def read_cert(certpath):
return certificate
def send_cert(con, jid_from, sid):
certpath = os.path.join(configpaths.get('MY_CERT'), SELF_SIGNED_CERTIFICATE) + \
'.cert'
certpath = configpaths.get('MY_CERT') / (SELF_SIGNED_CERTIFICATE
+ '.cert')
certificate = read_cert(certpath)
iq = nbxmpp.Iq('result', to=jid_from)
iq.setAttr('id', sid)
......@@ -159,8 +159,7 @@ def send_cert(con, jid_from, sid):
def handle_new_cert(con, obj, jid_from):
jid = app.get_jid_without_resource(jid_from)
certpath = os.path.join(os.path.expanduser(configpaths.get('MY_PEER_CERTS')), jid)
certpath += '.cert'
certpath = configpaths.get('MY_PEER_CERTS').expanduser() / (jid + '.cert')
id_ = obj.getAttr('id')
......@@ -177,9 +176,8 @@ def handle_new_cert(con, obj, jid_from):
approve_pending_content(id_)
def check_cert(jid, fingerprint):
certpath = os.path.join(os.path.expanduser(configpaths.get('MY_PEER_CERTS')), jid)
certpath += '.cert'
if os.path.exists(certpath):
certpath = configpaths.get('MY_PEER_CERTS').expanduser() / (jid + '.cert')
if certpath.exists():
cert = load_cert_file(certpath)
if cert:
digest_algo = cert.get_signature_algorithm().decode('utf-8')\
......@@ -273,12 +271,12 @@ def make_certs(filepath, CN):
key = createKeyPair(TYPE_RSA, 4096)
req = createCertRequest(key, CN=CN)
cert = createCertificate(req, req, key, 0, 0, 60*60*24*365*5) # five years
with open(filepath + '.pkey', 'wb') as f:
os.chmod(filepath + '.pkey', 0o600)
with open(filepath.with_suffix('.pkey'), 'wb') as f:
filepath.with_suffix('.pkey').chmod(0o600)
f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, key))
with open(filepath + '.cert', 'wb') as f:
with open(filepath.with_suffix('.cert'), 'wb') as f:
f.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
if __name__ == '__main__':
make_certs('./selfcert', 'gajim')
make_certs(Path('selfcert'), 'gajim')
......@@ -17,7 +17,6 @@
import logging
import hashlib
from base64 import b64decode
from pathlib import Path
import nbxmpp
from nbxmpp.namespaces import Namespace
......@@ -133,7 +132,7 @@ def parse_bob_data(stanza):
log.warning('No data found: %s', stanza)
return None
filepath = Path(configpaths.get('BOB')) / algo_hash
filepath = configpaths.get('BOB') / algo_hash
if algo_hash in app.bob_cache or filepath.exists():
log.info('BoB data already cached')
return None
......@@ -182,7 +181,7 @@ def store_bob_data(bob_data):
algo_hash = '%s+%s' % (bob_data.algo, bob_data.hash_)
filepath = Path(configpaths.get('BOB')) / algo_hash
filepath = configpaths.get('BOB') / algo_hash
if algo_hash in app.bob_cache or filepath.exists():
log.info('BoB data already cached')
return None
......
......@@ -170,7 +170,7 @@ def _namedtuple_factory(cursor: Any, row: Any) -> Any:
return namedtuple("Row", fields)(*row)
def _connect_database(self) -> None:
path = Path(configpaths.get('SETTINGS'))
path = configpaths.get('SETTINGS')
if path.is_dir():
log.error('%s is a directory but should be a file', path)
sys.exit()
......@@ -237,7 +237,7 @@ def _migrate(self) -> None:
pass
def _migrate_old_config(self) -> None:
config_file = Path(configpaths.get('CONFIG_FILE'))
config_file = configpaths.get('CONFIG_FILE')
if not config_file.exists():
return
......
......@@ -27,7 +27,6 @@
import json
import logging
import sqlite3 as sqlite
from pathlib import Path
from collections import namedtuple
from gajim.common import app
......@@ -93,7 +92,7 @@ class MessageArchiveStorage(SqliteStorage):
def __init__(self):
SqliteStorage.__init__(self,
log,
Path(configpaths.get('LOG_DB')),
configpaths.get('LOG_DB'),
ARCHIVE_SQL_STATEMENT)
self._jid_ids = {}
......
......@@ -16,7 +16,6 @@
import time
import sqlite3
import logging
from pathlib import Path
from collections import namedtuple
from gajim.common import configpaths
......@@ -56,7 +55,7 @@ class CacheStorage(SqliteStorage):
def __init__(self):
SqliteStorage.__init__(self,
log,
Path(configpaths.get('CACHE_DB')),
configpaths.get('CACHE_DB'),
CACHE_SQL_STATEMENT)
self._entity_caps_cache = {}
......
......@@ -22,6 +22,7 @@
from typing import Union
from typing import TYPE_CHECKING
from pathlib import Path
import nbxmpp
from gajim.common.const import PathType, PathLocation
......@@ -53,7 +54,7 @@
PEPHandlersDict = Dict[str, List[PEPNotifyCallback]]
# Configpaths
PathTuple = Tuple[Optional[PathLocation], str, Optional[PathType]]
PathTuple = Tuple[Optional[PathLocation], Path, Optional[PathType]]
# Plugins
PluginExtensionPoints = Dict[str, Tuple[Optional[Callable[..., None]],
......
......@@ -13,7 +13,6 @@
# along with Gajim. If not, see <http://www.gnu.org/licenses/>.
import logging
from pathlib import Path
from gi.repository import Gdk
from gi.repository import GLib
......@@ -559,7 +558,7 @@ def _on_signup(self, *args):
def _create_server_completion(self):
# Parse servers.json
file_path = Path(configpaths.get('DATA')) / 'other' / 'servers.json'
file_path = configpaths.get('DATA') / 'other' / 'servers.json'
self._servers = helpers.load_json(file_path, 'servers', [])
# Create a separate model for the address entry, because it will
......@@ -658,7 +657,7 @@ def focus(self):
def _create_server_completion(self):
# Parse servers.json
file_path = Path(configpaths.get('DATA')) / 'other' / 'servers.json'
file_path = configpaths.get('DATA') / 'other' / 'servers.json'
servers = helpers.load_json(file_path, 'servers', [])
# Create servers_model for comboboxes and entries
......
......@@ -12,7 +12,6 @@
# You should have received a copy of the GNU General Public License
# along with Gajim. If not, see <http://www.gnu.org/licenses/>.
import os
import logging
import hashlib
from math import pi
......@@ -295,7 +294,7 @@ def save_avatar(data):
return None