idlequeue.py 17.7 KB
Newer Older
Yann Leboulanger's avatar
Yann Leboulanger committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
##   idlequeue.py
##
##   Copyright (C) 2006 Dimitur Kirov <dkirov@gmail.com>
##
##   This program is free software; you can redistribute it and/or modify
##   it under the terms of the GNU General Public License as published by
##   the Free Software Foundation; either version 2, or (at your option)
##   any later version.
##
##   This program is distributed in the hope that it will be useful,
##   but WITHOUT ANY WARRANTY; without even the implied warranty of
##   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
##   GNU General Public License for more details.


"""
Idlequeues are Gajim's network heartbeat. Transports can be plugged as idle
objects and be informed about possible IO
"""

import os
22
import errno
Yann Leboulanger's avatar
Yann Leboulanger committed
23 24 25 26 27 28
import select
import logging
log = logging.getLogger('nbxmpp.idlequeue')

# needed for get_idleqeue
try:
Philipp Hörist's avatar
Philipp Hörist committed
29
    from gi.repository import GLib
Yann Leboulanger's avatar
Yann Leboulanger committed
30
    HAVE_GLIB = True
Yann Leboulanger's avatar
Yann Leboulanger committed
31
except ImportError:
Yann Leboulanger's avatar
Yann Leboulanger committed
32
    HAVE_GLIB = False
Yann Leboulanger's avatar
Yann Leboulanger committed
33 34 35 36 37 38 39

# needed for idlecommand
if os.name == 'nt':
    from subprocess import * # python24 only. we ask this for Windows
elif os.name == 'posix':
    import fcntl

40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
if HAVE_GLIB:
    FLAG_WRITE = GLib.IOCondition.OUT | GLib.IOCondition.HUP
    FLAG_READ  = GLib.IOCondition.IN  | GLib.IOCondition.PRI | \
                 GLib.IOCondition.HUP
    FLAG_READ_WRITE = GLib.IOCondition.OUT | GLib.IOCondition.IN | \
                      GLib.IOCondition.PRI | GLib.IOCondition.HUP
    FLAG_CLOSE     = GLib.IOCondition.HUP
    PENDING_READ   = GLib.IOCondition.IN  # There is data to read.
    PENDING_WRITE  = GLib.IOCondition.OUT # Data CAN be written without blocking.
    IS_CLOSED      = GLib.IOCondition.HUP # Hung up (connection broken)
else:
    FLAG_WRITE      = 20 # write only           10100
    FLAG_READ       = 19 # read only            10011
    FLAG_READ_WRITE = 23 # read and write       10111
    FLAG_CLOSE      = 16 # wait for close       10000
    PENDING_READ    =  3 # waiting read event      11
    PENDING_WRITE   =  4 # waiting write event    100
    IS_CLOSED       = 16 # channel closed       10000

Yann Leboulanger's avatar
Yann Leboulanger committed
59 60 61 62 63 64 65 66 67

def get_idlequeue():
    """
    Get an appropriate idlequeue
    """
    if os.name == 'nt':
        # gobject.io_add_watch does not work on windows
        return SelectIdleQueue()
    else:
Yann Leboulanger's avatar
Yann Leboulanger committed
68
        if HAVE_GLIB:
Yann Leboulanger's avatar
Yann Leboulanger committed
69 70 71 72 73 74 75
            # Gajim's default Idlequeue
            return GlibIdleQueue()
        else:
            # GUI less implementation
            return SelectIdleQueue()


Philipp Hörist's avatar
Philipp Hörist committed
76
class IdleObject:
Yann Leboulanger's avatar
Yann Leboulanger committed
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
    """
    Idle listener interface. Listed methods are called by IdleQueue.
    """

    def __init__(self):
        self.fd = -1 #: filedescriptor, must be unique for each IdleObject

    def pollend(self):
        """
        Called on stream failure
        """
        pass

    def pollin(self):
        """
        Called on new read event
        """
        pass

    def pollout(self):
        """
        Called on new write event (connect in sockets is a pollout)
        """
        pass

    def read_timeout(self):
        """
        Called when timeout happened
        """
        pass


class IdleCommand(IdleObject):
    """
    Can be subclassed to execute commands asynchronously by the idlequeue.
    Result will be optained via file descriptor of created pipe
    """

    def __init__(self, on_result):
        IdleObject.__init__(self)
        # how long (sec.) to wait for result ( 0 - forever )
        # it is a class var, instead of a constant and we can override it.
        self.commandtimeout = 0
        # when we have some kind of result (valid, ot not) we call this handler
        self.result_handler = on_result
        # if it is True, we can safetely execute the command
        self.canexecute = True
        self.idlequeue = None
        self.result =''

    def set_idlequeue(self, idlequeue):
        self.idlequeue = idlequeue

    def _return_result(self):
        if self.result_handler:
            self.result_handler(self.result)
        self.result_handler = None

    def _compose_command_args(self):
        return ['echo', 'da']

    def _compose_command_line(self):
        """
        Return one line representation of command and its arguments
        """
Yann Leboulanger's avatar
Yann Leboulanger committed
142
        return ' '.join(self._compose_command_args())
Yann Leboulanger's avatar
Yann Leboulanger committed
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174

    def wait_child(self):
        if self.pipe.poll() is None:
            # result timeout
            if self.endtime < self.idlequeue.current_time():
                self._return_result()
                self.pipe.stdout.close()
                self.pipe.stdin.close()
            else:
                # child is still active, continue to wait
                self.idlequeue.set_alarm(self.wait_child, 0.1)
        else:
            # child has quit
            self.result = self.pipe.stdout.read()
            self._return_result()
            self.pipe.stdout.close()
            self.pipe.stdin.close()

    def start(self):
        if not self.canexecute:
            self.result = ''
            self._return_result()
            return
        if os.name == 'nt':
            self._start_nt()
        elif os.name == 'posix':
            self._start_posix()

    def _start_nt(self):
        # if program is started from noninteraactive shells stdin is closed and
        # cannot be forwarded, so we have to keep it open
        self.pipe = Popen(self._compose_command_args(), stdout=PIPE,
175
            bufsize=1024, shell=True, stderr=STDOUT, stdin=PIPE)
Yann Leboulanger's avatar
Yann Leboulanger committed
176
        if self.commandtimeout >= 0:
177 178
            self.endtime = self.idlequeue.current_time() + \
                (self.commandtimeout * 1e6)
Yann Leboulanger's avatar
Yann Leboulanger committed
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
            self.idlequeue.set_alarm(self.wait_child, 0.1)

    def _start_posix(self):
        self.pipe = os.popen(self._compose_command_line())
        self.fd = self.pipe.fileno()
        fcntl.fcntl(self.pipe, fcntl.F_SETFL, os.O_NONBLOCK)
        self.idlequeue.plug_idle(self, False, True)
        if self.commandtimeout >= 0:
            self.idlequeue.set_read_timeout(self.fd, self.commandtimeout)

    def end(self):
        self.idlequeue.unplug_idle(self.fd)
        try:
            self.pipe.close()
        except:
            pass

    def pollend(self):
        self.idlequeue.remove_timeout(self.fd)
        self.end()
        self._return_result()

    def pollin(self):
        try:
            res = self.pipe.read()
Yann Leboulanger's avatar
Yann Leboulanger committed
204
        except Exception as e:
Yann Leboulanger's avatar
Yann Leboulanger committed
205 206 207 208 209 210 211 212 213 214 215
            res = ''
        if res == '':
            return self.pollend()
        else:
            self.result += res

    def read_timeout(self):
        self.end()
        self._return_result()


Philipp Hörist's avatar
Philipp Hörist committed
216
class IdleQueue:
Yann Leboulanger's avatar
Yann Leboulanger committed
217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255
    """
    IdleQueue provide three distinct time based features. Uses select.poll()

        1. Alarm timeout: Execute a callback after foo seconds
        2. Timeout event: Call read_timeout() of an plugged object if a timeout
           has been set, but not removed in time.
        3. Check file descriptor of plugged objects for read, write and error
           events

    """

    # (timeout, boolean)
    # Boolean is True if timeout is specified in seconds, False means miliseconds
    PROCESS_TIMEOUT = (100, False)

    def __init__(self):
        self.queue = {}

        # when there is a timeout it executes obj.read_timeout()
        # timeout is not removed automatically!
        # {fd1: {timeout1: func1, timeout2: func2}}
        # timout are unique (timeout1 must be != timeout2)
        # If func1 is None, read_time function is called
        self.read_timeouts = {}

        # cb, which are executed after XX sec., alarms are removed automatically
        self.alarms = {}
        self._init_idle()

    def _init_idle(self):
        """
        Hook method for subclassed. Will be called by __init__
        """
        self.selector = select.poll()

    def set_alarm(self, alarm_cb, seconds):
        """
        Set up a new alarm. alarm_cb will be called after specified seconds.
        """
256
        alarm_time = self.current_time() + (seconds * 1e6)
Yann Leboulanger's avatar
Yann Leboulanger committed
257 258 259 260 261 262 263 264 265 266 267 268
        # almost impossible, but in case we have another alarm_cb at this time
        if alarm_time in self.alarms:
            self.alarms[alarm_time].append(alarm_cb)
        else:
            self.alarms[alarm_time] = [alarm_cb]
        return alarm_time

    def remove_alarm(self, alarm_cb, alarm_time):
        """
        Remove alarm callback alarm_cb scheduled on alarm_time. Returns True if
        it was removed sucessfully, otherwise False
        """
269
        if alarm_time not in self.alarms:
Yann Leboulanger's avatar
Yann Leboulanger committed
270 271 272 273 274 275 276 277
            return False
        i = -1
        for i in range(len(self.alarms[alarm_time])):
            # let's not modify the list inside the loop
            if self.alarms[alarm_time][i] is alarm_cb:
                break
        if i != -1:
            del self.alarms[alarm_time][i]
278
            if not self.alarms[alarm_time]:
Yann Leboulanger's avatar
Yann Leboulanger committed
279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
                del self.alarms[alarm_time]
            return True
        else:
            return False

    def remove_timeout(self, fd, timeout=None):
        """
        Remove the read timeout
        """
        log.info('read timeout removed for fd %s' % fd)
        if fd in self.read_timeouts:
            if timeout:
                if timeout in self.read_timeouts[fd]:
                    del(self.read_timeouts[fd][timeout])
                if len(self.read_timeouts[fd]) == 0:
                    del(self.read_timeouts[fd])
            else:
                del(self.read_timeouts[fd])

    def set_read_timeout(self, fd, seconds, func=None):
        """
        Seta a new timeout. If it is not removed after specified seconds,
        func or obj.read_timeout() will be called

        A filedescriptor fd can have several timeouts.
        """
        log_txt = 'read timeout set for fd %s on %s seconds' % (fd, seconds)
        if func:
            log_txt += ' with function ' + str(func)
        log.info(log_txt)
309
        timeout = self.current_time() + (seconds * 1e6)
Yann Leboulanger's avatar
Yann Leboulanger committed
310 311 312 313 314 315 316 317 318 319 320 321
        if fd in self.read_timeouts:
            self.read_timeouts[fd][timeout] = func
        else:
            self.read_timeouts[fd] = {timeout: func}

    def _check_time_events(self):
        """
        Execute and remove alarm callbacks and execute func() or read_timeout()
        for plugged objects if specified time has ellapsed
        """
        current_time = self.current_time()

322
        for fd, timeouts in list(self.read_timeouts.items()):
Yann Leboulanger's avatar
Yann Leboulanger committed
323 324 325
            if fd not in self.queue:
                self.remove_timeout(fd)
                continue
326
            for timeout, func in list(timeouts.items()):
Yann Leboulanger's avatar
Yann Leboulanger committed
327 328 329 330 331 332 333 334 335 336
                if timeout > current_time:
                    continue
                if func:
                    log.debug('Calling %s for fd %s' % (func, fd))
                    func()
                else:
                    log.debug('Calling read_timeout for fd %s' % fd)
                    self.queue[fd].read_timeout()
                self.remove_timeout(fd, timeout)

337
        times = list(self.alarms.keys())
Yann Leboulanger's avatar
Yann Leboulanger committed
338 339 340 341 342 343 344 345 346 347 348 349 350 351 352
        for alarm_time in times:
            if alarm_time > current_time:
                continue
            if alarm_time in self.alarms:
                for callback in self.alarms[alarm_time]:
                    callback()
                if alarm_time in self.alarms:
                    del(self.alarms[alarm_time])

    def plug_idle(self, obj, writable=True, readable=True):
        """
        Plug an IdleObject into idlequeue. Filedescriptor fd must be set

        :param obj: the IdleObject
        :param writable: True if obj has data to sent
353
        :param readable: True if obj expects data to be received
Yann Leboulanger's avatar
Yann Leboulanger committed
354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388
        """
        if obj.fd == -1:
            return
        if obj.fd in self.queue:
            self.unplug_idle(obj.fd)
        self.queue[obj.fd] = obj
        if writable:
            if not readable:
                flags = FLAG_WRITE
            else:
                flags = FLAG_READ_WRITE
        else:
            if readable:
                flags = FLAG_READ
            else:
                # when we paused a FT, we expect only a close event
                flags = FLAG_CLOSE
        self._add_idle(obj.fd, flags)

    def _add_idle(self, fd, flags):
        """
        Hook method for subclasses, called by plug_idle
        """
        self.selector.register(fd, flags)

    def unplug_idle(self, fd):
        """
        Remove plugged IdleObject, specified by filedescriptor fd
        """
        if fd in self.queue:
            del(self.queue[fd])
            self._remove_idle(fd)

    def current_time(self):
        from time import time
389
        return time() * 1e6
Yann Leboulanger's avatar
Yann Leboulanger committed
390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425

    def _remove_idle(self, fd):
        """
        Hook method for subclassed, called by unplug_idle
        """
        self.selector.unregister(fd)

    def _process_events(self, fd, flags):
        obj = self.queue.get(fd)
        if obj is None:
            self.unplug_idle(fd)
            return False

        read_write = False
        if flags & PENDING_READ:
            #print 'waiting read on %d, flags are %d' % (fd, flags)
            obj.pollin()
            read_write = True

        elif flags & PENDING_WRITE and not flags & IS_CLOSED:
            obj.pollout()
            read_write = True

        if flags & IS_CLOSED:
            # io error, don't expect more events
            self.remove_timeout(obj.fd)
            self.unplug_idle(obj.fd)
            obj.pollend()
            return False

        if read_write:
            return True
        return False

    def process(self):
        """
426 427
        This function must be overridden by an implementation of the IdleQueue.

Yann Leboulanger's avatar
Yann Leboulanger committed
428 429 430 431 432 433
        Process idlequeue. Check for any pending timeout or alarm events.  Call
        IdleObjects on possible and requested read, write and error events on
        their file descriptors

        Call this in regular intervals.
        """
434
        raise NotImplementedError("You need to define a process() method.")
Yann Leboulanger's avatar
Yann Leboulanger committed
435 436 437 438 439 440 441 442 443

class SelectIdleQueue(IdleQueue):
    """
    Extends IdleQueue to use select.select() for polling

    This class exisists for the sake of gtk2.8 on windows, which doesn't seem to
    support io_add_watch properly (yet)
    """

444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462
    def checkQueue(self):
        """
        Iterates through all known file descriptors and uses os.stat to check if they're valid.
        Greatly improves performance if the caller hands us and expects notification on an invalid file handle.
        """
        bad_fds=[]
        union={}
        union.update(self.write_fds)
        union.update(self.read_fds)
        union.update(self.error_fds)
        for fd in (union.keys()):
            try:
                status = os.stat(fd)
            except OSError as e:
                # This file descriptor is invalid. Add to list for closure.
                bad_fds.append(fd)

        for fd in (bad_fds):
            obj = self.queue.get(fd)
463 464
            if obj is not None:
                self.remove_timeout(fd)
465 466
            self.unplug_idle(fd)

Yann Leboulanger's avatar
Yann Leboulanger committed
467 468 469 470 471 472 473 474 475 476
    def _init_idle(self):
        """
        Create a dict, which maps file/pipe/sock descriptor to glib event id
        """
        self.read_fds = {}
        self.write_fds = {}
        self.error_fds = {}

    def _add_idle(self, fd, flags):
        """
477
        This method is called when we plug a new idle object. Add descriptor
Yann Leboulanger's avatar
Yann Leboulanger committed
478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502
        to read/write/error lists, according flags
        """
        if flags & 3:
            self.read_fds[fd] = fd
        if flags & 4:
            self.write_fds[fd] = fd
        self.error_fds[fd] = fd

    def _remove_idle(self, fd):
        """
        This method is called when we unplug a new idle object. Remove descriptor
        from read/write/error lists
        """
        if fd in self.read_fds:
            del(self.read_fds[fd])
        if fd in self.write_fds:
            del(self.write_fds[fd])
        if fd in self.error_fds:
            del(self.error_fds[fd])

    def process(self):
        if not self.write_fds and not self.read_fds:
            self._check_time_events()
            return True
        try:
Yann Leboulanger's avatar
Yann Leboulanger committed
503 504
            waiting_descriptors = select.select(list(self.read_fds.keys()),
                    list(self.write_fds.keys()), list(self.error_fds.keys()), 0)
505
        except OSError as e:
Yann Leboulanger's avatar
Yann Leboulanger committed
506
            waiting_descriptors = ((), (), ())
507 508
            if e.errno != errno.EINTR:
                self.checkQueue()
Yann Leboulanger's avatar
Yann Leboulanger committed
509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546
                raise
        for fd in waiting_descriptors[0]:
            q = self.queue.get(fd)
            if q:
                q.pollin()
        for fd in waiting_descriptors[1]:
            q = self.queue.get(fd)
            if q:
                q.pollout()
        for fd in waiting_descriptors[2]:
            q = self.queue.get(fd)
            if q:
                q.pollend()
        self._check_time_events()
        return True


class GlibIdleQueue(IdleQueue):
    """
    Extends IdleQueue to use glib io_add_wath, instead of select/poll In another
    'non gui' implementation of Gajim IdleQueue can be used safetly
    """

    # (timeout, boolean)
    # Boolean is True if timeout is specified in seconds, False means miliseconds
    PROCESS_TIMEOUT = (2, True)

    def _init_idle(self):
        """
        Creates a dict, which maps file/pipe/sock descriptor to glib event id
        """
        self.events = {}

    def _add_idle(self, fd, flags):
        """
        This method is called when we plug a new idle object. Start listening for
        events from fd
        """
Philipp Hörist's avatar
Philipp Hörist committed
547 548 549
        res = GLib.io_add_watch(fd, GLib.PRIORITY_LOW, flags,
            self._process_events)

Yann Leboulanger's avatar
Yann Leboulanger committed
550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567
        # store the id of the watch, so that we can remove it on unplug
        self.events[fd] = res

    def _process_events(self, fd, flags):
        try:
            return IdleQueue._process_events(self, fd, flags)
        except Exception:
            self._remove_idle(fd)
            self._add_idle(fd, flags)
            raise

    def _remove_idle(self, fd):
        """
        This method is called when we unplug a new idle object. Stop listening
        for events from fd
        """
        if not fd in self.events:
            return
Philipp Hörist's avatar
Philipp Hörist committed
568 569

        GLib.source_remove(self.events[fd])
Yann Leboulanger's avatar
Yann Leboulanger committed
570 571 572 573
        del(self.events[fd])

    def process(self):
        self._check_time_events()
574

Philipp Hörist's avatar
Philipp Hörist committed
575
    current_time = GLib.get_real_time