Newer
Older
## common/helpers.py
##
## Copyright (C) 2003-2006 Yann Le Boulanger <asterix@lagaule.org>
## Copyright (C) 2005-2006 Nikos Kouremenos <kourem@gmail.com>
## Copyright (C) 2005
## Dimitur Kirov <dkirov@gmail.com>
## Travis Shirk <travis@pobox.com>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published
## by the Free Software Foundation; version 2 only.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##

Yann Leboulanger
committed
import errno
import sys
import stat
from pysqlite2 import dbapi2 as sqlite
import logger
import i18n
from xmpp_stringprep import nodeprep, resourceprep, nameprep

nkour
committed
import win32api
import win32con
Q_ = i18n.Q_
class InvalidFormat(Exception):
pass
def parse_jid(jidstring):
'''Perform stringprep on all JID fragments from a string
and return the full jid'''
# This function comes from http://svn.twistedmatrix.com/cvs/trunk/twisted/words/protocols/jabber/jid.py
user = None
server = None
resource = None
# Search for delimiters
user_sep = jidstring.find('@')
if res_sep == -1:
# host
server = jidstring
else:
# host/resource
server = jidstring[0:res_sep]
resource = jidstring[res_sep + 1:] or None
else:
if res_sep == -1:
# user@host
user = jidstring[0:user_sep] or None
server = jidstring[user_sep + 1:]
else:
if user_sep < res_sep:
# user@host/resource
user = jidstring[0:user_sep] or None
server = jidstring[user_sep + 1:user_sep + (res_sep - user_sep)]
resource = jidstring[res_sep + 1:] or None
else:
# server/resource (with an @ in resource)
server = jidstring[0:res_sep]
resource = jidstring[res_sep + 1:] or None
return prep(user, server, resource)
def parse_resource(resource):
'''Perform stringprep on resource and return it'''
if resource:
try:
return resourceprep.prepare(unicode(resource))
except UnicodeError:
raise InvalidFormat, 'Invalid character in resource.'
def prep(user, server, resource):
'''Perform stringprep on all JID fragments and return the full jid'''
# This function comes from
#http://svn.twistedmatrix.com/cvs/trunk/twisted/words/protocols/jabber/jid.py
if user:
try:
user = nodeprep.prepare(unicode(user))
except UnicodeError:
raise InvalidFormat, _('Invalid character in username.')
else:
user = None
if not server:
else:
try:
server = nameprep.prepare(unicode(server))
except UnicodeError:
raise InvalidFormat, _('Invalid character in hostname.')
if resource:
try:
resource = resourceprep.prepare(unicode(resource))
except UnicodeError:
raise InvalidFormat, _('Invalid character in resource.')
else:
resource = None
if user:
if resource:
return '%s@%s/%s' % (user, server, resource)
return '%s@%s' % (user, server)
else:
if resource:
return '%s/%s' % (server, resource)
else:
return server

Yann Leboulanger
committed
def temp_failure_retry(func, *args, **kwargs):
while True:
try:
return func(*args, **kwargs)
except (os.error, IOError, select.error), ex:
if ex.errno == errno.EINTR:
continue
else:
raise

Yann Leboulanger
committed
def convert_bytes(string):
suffix = ''
# IEC standard says KiB = 1024 bytes KB = 1000 bytes
use_kib_mib = gajim.config.get('use_kib_mib')
align = 1024.
bytes = float(string)
if bytes >= align:
bytes = round(bytes/align, 1)
if bytes >= align:
bytes = round(bytes/align, 1)
if bytes >= align:
bytes = round(bytes/align, 1)
if use_kib_mib:
#GiB means gibibyte
else:
#GB means gigabyte
suffix = _('%s GB')
else:
if use_kib_mib:
#MiB means mibibyte
suffix = _('%s MiB')
else:
#MB means megabyte
suffix = _('%s MB')
else:
if use_kib_mib:
#KiB means kibibyte
suffix = _('%s KiB')
else:
#KB means kilo bytes
suffix = _('%s KB')
else:
return suffix % unicode(bytes)
def get_uf_show(show, use_mnemonic = False):
'''returns a userfriendly string for dnd/xa/chat
and makes all strings translatable
if use_mnemonic is True, it adds _ so GUI should call with True
for accessibility issues'''
if use_mnemonic:
uf_show = _('_Busy')
else:
uf_show = _('Busy')
if use_mnemonic:
uf_show = _('_Not Available')
else:
uf_show = _('Not Available')
if use_mnemonic:
uf_show = _('_Free for Chat')
else:
uf_show = _('Free for Chat')
if use_mnemonic:
uf_show = _('_Available')
else:
uf_show = _('Available')
if use_mnemonic:
uf_show = _('A_way')
else:
uf_show = _('Away')
if use_mnemonic:
uf_show = _('_Offline')
else:
uf_show = _('Offline')
if use_mnemonic:
uf_show = _('_Invisible')
else:
uf_show = _('Invisible')
elif show == 'not in roster':
return unicode(uf_show)
def get_uf_sub(sub):
if sub == 'none':
uf_sub = Q_('?Subscription we already have:None')
elif sub == 'to':
uf_sub = _('To')
elif sub == 'from':
uf_sub = _('From')
elif sub == 'both':
uf_sub = _('Both')
else:
uf_sub = sub
return unicode(uf_sub)
def get_uf_ask(ask):
if ask is None:
uf_ask = Q_('?Ask (for Subscription):None')
elif ask == 'subscribe':
uf_ask = _('Subscribe')
else:
uf_ask = ask
return unicode(uf_ask)
''' plural determines if you get Moderators or Moderator'''
if role == 'none':
role_name = Q_('?Group Chat Contact Role:None')
elif role == 'moderator':
if plural:
role_name = _('Moderators')
else:
role_name = _('Moderator')
elif role == 'participant':
if plural:
role_name = _('Participants')
else:
role_name = _('Participant')
elif role == 'visitor':
if plural:
role_name = _('Visitors')
else:
role_name = _('Visitor')
return role_name
def get_sorted_keys(adict):
keys = adict.keys()
keys.sort()
return keys
def to_one_line(msg):
msg = msg.replace('\\', '\\\\')
msg = msg.replace('\n', '\\n')
# s1 = 'test\ntest\\ntest'
# s11 = s1.replace('\\', '\\\\')
# s12 = s11.replace('\n', '\\n')
# s12
# 'test\\ntest\\\\ntest'
return msg
def from_one_line(msg):
# (?<!\\) is a lookbehind assertion which asks anything but '\'
# to match the regexp that follows it
# So here match '\\n' but not if you have a '\' before that
re = sre.compile(r'(?<!\\)\\n')
msg = re.sub('\n', msg)
msg = msg.replace('\\\\', '\\')
# s12 = 'test\\ntest\\\\ntest'
# s13 = re.sub('\n', s12)
# s14 s13.replace('\\\\', '\\')
# s14
# 'test\ntest\\ntest'
return msg
def get_uf_chatstate(chatstate):
'''removes chatstate jargon and returns user friendly messages'''
if chatstate == 'active':
return _('is paying attention to the conversation')
elif chatstate == 'inactive':
return _('is doing something else')
elif chatstate == 'composing':
return _('is composing a message...')
elif chatstate == 'paused':
#paused means he or she was compoing but has stopped for a while
return _('paused composing a message')
elif chatstate == 'gone':
def is_in_path(name_of_command, return_abs_path = False):
# if return_abs_path is True absolute path will be returned
# for name_of_command
# on failures False is returned
is_in_dir = False
found_in_which_dir = None
path = os.getenv('PATH').split(':')
for path_to_directory in path:
try:
contents = os.listdir(path_to_directory)
except OSError: # user can have something in PATH that is not a dir
pass
if is_in_dir:
if return_abs_path:
found_in_which_dir = path_to_directory
break
if found_in_which_dir:
abs_path = os.path.join(path_to_directory, name_of_command)
return abs_path
else:
return is_in_dir
#kind = 'url' or 'mail'
if os.name == 'nt':
try:
os.startfile(uri) # if pywin32 is installed we open
except:
pass
else:
if kind == 'mail' and not uri.startswith('mailto:'):
uri = 'mailto:' + uri
if gajim.config.get('openwith') == 'gnome-open':
command = 'gnome-open'
elif gajim.config.get('openwith') == 'kfmclient exec':
command = 'kfmclient exec'
elif gajim.config.get('openwith') == 'custom':
if kind == 'url':
command = gajim.config.get('custombrowser')
if kind == 'mail':
command = gajim.config.get('custommailapp')
if command == '': # if no app is configured
return
# we add the uri in "" so we have good parsing from shell
command = command + ' "' + uri + '" &'
try: #FIXME: when we require python2.4+ use subprocess module
os.system(command)
except:
pass
if os.name == 'nt':
try:
os.startfile(path_to_open) # if pywin32 is installed we open
except:
pass
else:
if gajim.config.get('openwith') == 'gnome-open':
command = 'gnome-open'
elif gajim.config.get('openwith') == 'kfmclient exec':
command = 'kfmclient exec'
elif gajim.config.get('openwith') == 'custom':
command = gajim.config.get('custom_file_manager')
if command == '': # if no app is configured
return
# we add the path in "" so we have good parsing from shell
path_to_open = path_to_open.replace('"', '\\"') # escape "
command = command + ' "' + path_to_open + '" &'
try: #FIXME: when we require python2.4+ use subprocess module
os.system(command)
except:
pass
if not gajim.config.get('sounds_on'):
return
path_to_soundfile = gajim.config.get_per('soundevents', event, 'path')
if path_to_soundfile == 'beep':
print '\a' # make a speaker beep
return

nkour
committed
if path_to_soundfile is None or not os.path.exists(path_to_soundfile):
if os.name == 'nt':
try:
winsound.PlaySound(path_to_soundfile,
winsound.SND_FILENAME|winsound.SND_ASYNC)
except:
pass
elif os.name == 'posix':
if gajim.config.get('soundplayer') == '':
return
player = gajim.config.get('soundplayer')
# we add the path in "" so we have good parsing from shell
path_to_soundfile = path_to_soundfile.replace('"', '\\"') # escape "
command = player + ' "' + path_to_soundfile + '" &'
#FIXME: when we require 2.4+ use subprocess module
os.system(command)
def get_file_path_from_dnd_dropped_uri(uri):
path = urllib.url2pathname(uri) # escape special chars
path = path.strip('\r\n\x00') # remove \r\n and NULL
# get the path to file
if path.startswith('file:\\\\\\'): # windows
path = path[8:] # 8 is len('file:///')
elif path.startswith('file://'): # nautilus, rox
path = path[7:] # 7 is len('file://')
elif path.startswith('file:'): # xffm
path = path[5:] # 5 is len('file:')
return path
def from_xs_boolean_to_python_boolean(value):
# convert those to True/False (python booleans)
val = True
else: # '0', 'false' or anything else
val = False
return val
def ensure_unicode_string(s):
# py23 u'abc'.decode('utf-8') raises
# python24 does not. if python23 is ooold we can remove this func

Yann Leboulanger
committed
def one_account_connected():

Yann Leboulanger
committed
one_connected = False
accounts = gajim.connections.keys()
for acct in accounts:
if gajim.connections[acct].connected > 1:
one_connected = True
break
return one_connected
def get_output_of_command(command):
try:
child_stdin, child_stdout = os.popen2(command)
except ValueError:
return None
output = child_stdout.readlines()
child_stdout.close()
child_stdin.close()
def get_global_show():
maxi = 0
for account in gajim.connections:

Yann Leboulanger
committed
if not gajim.config.get_per('accounts', account,
'sync_with_global_status'):
continue
connected = gajim.connections[account].connected
if connected > maxi:
maxi = connected
return gajim.SHOW_LIST[maxi]
def get_global_status():
maxi = 0
for account in gajim.connections:
if not gajim.config.get_per('accounts', account,
'sync_with_global_status'):
continue
connected = gajim.connections[account].connected
if connected > maxi:
maxi = connected
status = gajim.connections[account].status
return status
'''Get the icon name to show in online, away, requested, ...'''
if account and gajim.awaiting_events[account].has_key(contact.jid):
if contact.jid.find('@') <= 0: # if not '@' or '@' starts the jid ==> agent
return contact.show
if contact.sub in ('both', 'to'):
return contact.show
if contact.ask == 'subscribe':
return 'requested'
if transport:
return contact.show
def decode_string(string):
'''try to decode (to make it Unicode instance) given string'''
# by the time we go to iso15 it better be the one else we show bad characters
encodings = (sys.getfilesystemencoding(), 'utf-8', 'iso-8859-15')
for encoding in encodings:
try:
string = string.decode(encoding)
except UnicodeError:
continue
break

nkour
committed
def get_windows_reg_env(varname, default=''):
'''asks for paths commonly used but not exposed as ENVs
in english Windows 2003 those are:

nkour
committed
'AppData' = %USERPROFILE%\Application Data (also an ENV)
'Desktop' = %USERPROFILE%\Desktop
'Favorites' = %USERPROFILE%\Favorites
'NetHood' = %USERPROFILE%\NetHood
'Personal' = D:\My Documents (PATH TO MY DOCUMENTS)
'PrintHood' = %USERPROFILE%\PrintHood
'Programs' = %USERPROFILE%\Start Menu\Programs
'Recent' = %USERPROFILE%\Recent
'SendTo' = %USERPROFILE%\SendTo
'Start Menu' = %USERPROFILE%\Start Menu
'Startup' = %USERPROFILE%\Start Menu\Programs\Startup
'Templates' = %USERPROFILE%\Templates
'My Pictures' = D:\My Documents\My Pictures
'Local Settings' = %USERPROFILE%\Local Settings
'Local AppData' = %USERPROFILE%\Local Settings\Application Data
'Cache' = %USERPROFILE%\Local Settings\Temporary Internet Files
'Cookies' = %USERPROFILE%\Cookies
'History' = %USERPROFILE%\Local Settings\History
'''
if os.name != 'nt':
return ''
val = default
try:
rkey = win32api.RegOpenKey(win32con.HKEY_CURRENT_USER,

nkour
committed
r'Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders')
try:
val = str(win32api.RegQueryValueEx(rkey, varname)[0])
val = win32api.ExpandEnvironmentStrings(val) # expand using environ

nkour
committed
except:
pass
finally:
win32api.RegCloseKey(rkey)
return val
def get_my_pictures_path():
'''windows-only atm. [Unix lives in the past]'''
return get_windows_reg_env('My Pictures')
def get_desktop_path():
if os.name == 'nt':
path = get_windows_reg_env('Desktop')
else:
path = os.path.join(os.path.expanduser('~'), 'Desktop')
return path
def get_documents_path():
if os.name == 'nt':
path = get_windows_reg_env('Personal')
else:
path = os.path.expanduser('~')
return path
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
# moved from connection.py
def get_full_jid_from_iq(iq_obj):
'''return the full jid (with resource) from an iq as unicode'''
return parse_jid(str(iq_obj.getFrom()))
def get_jid_from_iq(iq_obj):
'''return the jid (without resource) from an iq as unicode'''
jid = get_full_jid_from_iq(iq_obj)
return gajim.get_jid_without_resource(jid)
def get_auth_sha(sid, initiator, target):
''' return sha of sid + initiator + target used for proxy auth'''
return sha.new("%s%s%s" % (sid, initiator, target)).hexdigest()
distro_info = {
'Arch Linux': '/etc/arch-release',
'Aurox Linux': '/etc/aurox-release',
'Conectiva Linux': '/etc/conectiva-release',
'CRUX': '/usr/bin/crux',
'Debian GNU/Linux': '/etc/debian_release',
'Debian GNU/Linux': '/etc/debian_version',
'Fedora Linux': '/etc/fedora-release',
'Gentoo Linux': '/etc/gentoo-release',
'Linux from Scratch': '/etc/lfs-release',
'Mandrake Linux': '/etc/mandrake-release',
'Slackware Linux': '/etc/slackware-release',
'Slackware Linux': '/etc/slackware-version',
'Solaris/Sparc': '/etc/release',
'Source Mage': '/etc/sourcemage_version',
'SUSE Linux': '/etc/SuSE-release',
'Sun JDS': '/etc/sun-release',
'PLD Linux': '/etc/pld-release',
'Yellow Dog Linux': '/etc/yellowdog-release',
# many distros use the /etc/redhat-release for compatibility
# so Redhat is the last
'Redhat Linux': '/etc/redhat-release'
}
def get_random_string_16():
''' create random string of length 16'''
rng = range(65, 90)
rng.extend(range(48, 57))
char_sequence = map(lambda e:chr(e), rng)
from random import sample
return reduce(lambda e1, e2: e1 + e2,
sample(char_sequence, 16))
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
def get_os_info():
if os.name == 'nt':
ver = os.sys.getwindowsversion()
ver_format = ver[3], ver[0], ver[1]
win_version = {
(1, 4, 0): '95',
(1, 4, 10): '98',
(1, 4, 90): 'ME',
(2, 4, 0): 'NT',
(2, 5, 0): '2000',
(2, 5, 1): 'XP',
(2, 5, 2): '2003'
}
if win_version.has_key(ver_format):
return 'Windows' + ' ' + win_version[ver_format]
else:
return 'Windows'
elif os.name == 'posix':
executable = 'lsb_release'
params = ' --id --codename --release --short'
full_path_to_executable = is_in_path(executable, return_abs_path = True)
if full_path_to_executable:
command = executable + params
child_stdin, child_stdout = os.popen2(command)
output = temp_failure_retry(child_stdout.readline).strip()
child_stdout.close()
child_stdin.close()
# some distros put n/a in places so remove them
pattern = sre.compile(r' n/a', sre.IGNORECASE)
output = sre.sub(pattern, '', output)
return output
# lsb_release executable not available, so parse files
for distro_name in distro_info:
path_to_file = distro_info[distro_name]
if os.path.exists(path_to_file):
if os.access(path_to_file, os.X_OK):
# the file is executable (f.e. CRUX)
# yes, then run it and get the first line of output.
text = get_output_of_command(path_to_file)[0]
else:
fd = open(path_to_file)
text = fd.readline().strip() # get only first line
fd.close()
if path_to_file.endswith('version'):
# sourcemage_version has all the info we need
if not os.path.basename(path_to_file).startswith('sourcemage'):
text = distro_name + ' ' + text
elif path_to_file.endswith('aurox-release'):
# file doesn't have version
text = distro_name
elif path_to_file.endswith('lfs-release'): # file just has version
text = distro_name + ' ' + text
return text
# our last chance, ask uname and strip it
uname_output = get_output_of_command('uname -a | cut -d" " -f1,3')
if uname_output is not None:
return uname_output[0] # only first line
return 'N/A'