diff --git a/src/common/connection.py b/src/common/connection.py index fa9fdb78c20e67cf7f6e296717102679d4409728..d9ab311819f72cdaa9ebf77bdcde272146967b1d 100644 --- a/src/common/connection.py +++ b/src/common/connection.py @@ -800,6 +800,11 @@ class Connection(ConnectionHandlers): return common.xmpp.features_nb.getPrivacyLists(self.connection) + def send_keepalive(self): + # nothing received for the last foo seconds + if self.connection: + self.connection.send(' ') + def sendPing(self, pingTo=None): '''Send XMPP Ping (XEP-0199) request. If pingTo is not set, ping is sent to server to detect connection failure at application level.''' @@ -1018,7 +1023,8 @@ class Connection(ConnectionHandlers): self.connection = con if not self.connection: return - self.connection.set_send_timeout(self.keepalives, self.sendPing) + self.connection.set_send_timeout(self.keepalives, self.send_keepalive) + self.connection.set_send_timeout2(self.keepalives * 2, self.sendPing) self.connection.onreceive(None) iq = common.xmpp.Iq('get', common.xmpp.NS_PRIVACY, xmlns = '') id_ = self.connection.getAnID() diff --git a/src/common/xmpp/idlequeue.py b/src/common/xmpp/idlequeue.py index 378e27e57ac8bc77691457afd82ec743d48de4e0..19a64c20e749e981189298cf4781736b54974b89 100644 --- a/src/common/xmpp/idlequeue.py +++ b/src/common/xmpp/idlequeue.py @@ -205,6 +205,9 @@ class IdleQueue: # when there is a timeout it executes obj.read_timeout() # timeout is not removed automatically! + # {fd1: {timeout1: func1, timeout2: func2}} + # timout are unique (timeout1 must be != timeout2) + # If func1 is None, read_time function is called self.read_timeouts = {} # cb, which are executed after XX sec., alarms are removed automatically @@ -247,39 +250,57 @@ class IdleQueue: else: return False - def remove_timeout(self, fd): + def remove_timeout(self, fd, timeout=None): ''' Removes the read timeout ''' log.info('read timeout removed for fd %s' % fd) if fd in self.read_timeouts: - del(self.read_timeouts[fd]) + if timeout: + if timeout in self.read_timeouts[fd]: + del(self.read_timeouts[fd][timeout]) + if len(self.read_timeouts[fd]) == 0: + del(self.read_timeouts[fd]) + else: + del(self.read_timeouts[fd]) - def set_read_timeout(self, fd, seconds): + def set_read_timeout(self, fd, seconds, func=None): ''' Sets a new timeout. If it is not removed after specified seconds, - obj.read_timeout() will be called. + func or obj.read_timeout() will be called. - A filedescriptor fd can have only one timeout. + A filedescriptor fd can have several timeouts. ''' - log.info('read timeout set for fd %s on %s seconds' % (fd, seconds)) + log_txt = 'read timeout set for fd %s on %s seconds' % (fd, seconds) + if func: + log_txt += ' with function ' + str(func) + log.info(log_txt) timeout = self.current_time() + seconds - self.read_timeouts[fd] = timeout + if fd in self.read_timeouts: + self.read_timeouts[fd][timeout] = func + else: + self.read_timeouts[fd] = {timeout: func} def _check_time_events(self): ''' - Execute and remove alarm callbacks and execute read_timeout() for plugged - objects if specified time has ellapsed. + Execute and remove alarm callbacks and execute func() or read_timeout() + for plugged objects if specified time has ellapsed. ''' log.info('check time evs') current_time = self.current_time() - for fd, timeout in self.read_timeouts.items(): - if timeout > current_time: - continue - if fd in self.queue: - log.debug('Calling read_timeout for fd %s' % fd) - self.queue[fd].read_timeout() - else: + for fd, timeouts in self.read_timeouts.items(): + if fd not in self.queue: self.remove_timeout(fd) + continue + for timeout, func in timeouts.items(): + if timeout > current_time: + continue + if func: + log.debug('Calling %s for fd %s' % (func, fd)) + func() + else: + log.debug('Calling read_timeout for fd %s' % fd) + self.queue[fd].read_timeout() + self.remove_timeout(fd, timeout) times = self.alarms.keys() for alarm_time in times: diff --git a/src/common/xmpp/transports_nb.py b/src/common/xmpp/transports_nb.py index 71f21075bb4a8b91bbe6c20dbe21ded21c4ad39e..ad49eb6cb5dbbef5cbe7cae87c59c111a722be6c 100644 --- a/src/common/xmpp/transports_nb.py +++ b/src/common/xmpp/transports_nb.py @@ -127,13 +127,15 @@ class NonBlockingTransport(PlugIn): # type of used ssl lib (if any) will be assigned to this member var self.ssl_lib = None self._exported_methods=[self.onreceive, self.set_send_timeout, - self.set_timeout, self.remove_timeout, self.start_disconnect] + self.set_send_timeout2, self.set_timeout, self.remove_timeout, + self.start_disconnect] # time to wait for SOME stanza to come and then send keepalive self.sendtimeout = 0 # in case we want to something different than sending keepalives self.on_timeout = None + self.on_timeout2 = None def plugin(self, owner): owner.Connection = self @@ -218,15 +220,26 @@ class NonBlockingTransport(PlugIn): self.on_timeout() self.renew_send_timeout() + def read_timeout2(self): + ''' called when there's no response from server in defined timeout ''' + if self.on_timeout2: + self.on_timeout2() + self.renew_send_timeout2() + def renew_send_timeout(self): if self.on_timeout and self.sendtimeout > 0: self.set_timeout(self.sendtimeout) - else: - self.remove_timeout() + + def renew_send_timeout2(self): + if self.on_timeout2 and self.sendtimeout2 > 0: + self.set_timeout2(self.sendtimeout2) def set_timeout(self, timeout): self.idlequeue.set_read_timeout(self.fd, timeout) + def set_timeout2(self, timeout2): + self.idlequeue.set_read_timeout(self.fd, timeout2, self.read_timeout2) + def get_fd(self): pass @@ -240,6 +253,13 @@ class NonBlockingTransport(PlugIn): else: self.on_timeout = None + def set_send_timeout2(self, timeout2, on_timeout2): + self.sendtimeout2 = timeout2 + if self.sendtimeout2 > 0: + self.on_timeout2 = on_timeout2 + else: + self.on_timeout2 = None + # FIXME: where and why does this need to be called def start_disconnect(self): self.set_state(DISCONNECTING) @@ -541,7 +561,9 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): return # we have received some bytes, stop the timeout! + self.remove_timeout() self.renew_send_timeout() + self.renew_send_timeout2() # pass received data to owner if self.on_receive: self.raise_event(DATA_RECEIVED, received)