diff --git a/src/common/xmpp/dispatcher_nb.py b/src/common/xmpp/dispatcher_nb.py index ca13af184327e9ff6c0091dfbb6d6bd7f0ca0674..7d57fdc19121e1a92bb1b772e633573f40fa3b6c 100644 --- a/src/common/xmpp/dispatcher_nb.py +++ b/src/common/xmpp/dispatcher_nb.py @@ -400,7 +400,7 @@ class Dispatcher(PlugIn): ''' Serialise stanza and put it on the wire. Assign an unique ID to it before send. Returns assigned ID.''' if type(stanza) in [type(''), type(u'')]: - return self._owner.Connection.send(stanza) + return self._owner.send_stanza(stanza) if not isinstance(stanza, Protocol): _ID=None elif not stanza.getID(): @@ -423,7 +423,7 @@ class Dispatcher(PlugIn): stanza=route stanza.setNamespace(self._owner.Namespace) stanza.setParent(self._metastream) - self._owner.Connection.send(stanza) + self._owner.send_stanza(stanza) return _ID def disconnect(self): diff --git a/src/common/zeroconf/client_zeroconf.py b/src/common/zeroconf/client_zeroconf.py index e487ec5e68ace0eea67250821c3499309bcca910..452d16324e78f4f804e7ef4e80a28c507393f99f 100644 --- a/src/common/zeroconf/client_zeroconf.py +++ b/src/common/zeroconf/client_zeroconf.py @@ -1,6 +1,7 @@ ## common/zeroconf/client_zeroconf.py ## ## Copyright (C) 2006 Stefan Bethge <stefan@lanpartei.de> +## 2006 Dimitur Kirov <dkirov@gmail.com> ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published @@ -11,14 +12,314 @@ ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details. ## - +from common import gajim +from common.xmpp.idlequeue import IdleObject +from common.xmpp import dispatcher_nb, debug +from common.xmpp.client import * +from common.xmpp.simplexml import ustr +from dialogs import BindPortError +import socket +import errno from common.zeroconf import roster_zeroconf +MAX_BUFF_LEN = 65536 +DATA_RECEIVED='DATA RECEIVED' +DATA_SENT='DATA SENT' + + +class ZeroconfListener(IdleObject): + def __init__(self, port, caller = None): + ''' handle all incomming connections on ('0.0.0.0', port)''' + self.port = port + self.queue_idx = -1 + #~ self.queue = None + self.started = False + self._sock = None + self.fd = -1 + self.caller = caller + + def bind(self): + self._serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self._serv.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + self._serv.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + # will fail when port as busy, or we don't have rights to bind + try: + self._serv.bind(('0.0.0.0', self.port)) + except Exception, e: + # unable to bind, show error dialog + return None + self._serv.listen(socket.SOMAXCONN) + self._serv.setblocking(False) + self.fd = self._serv.fileno() + gajim.idlequeue.plug_idle(self, False, True) + self.started = True + + def pollend(self): + ''' called when we stop listening on (host, port) ''' + self.disconnect2() + + def pollin(self): + ''' accept a new incomming connection and notify queue''' + sock = self.accept_conn() + P2PConnection('', sock[0], sock[1][0], sock[1][1], self.caller) + + def disconnect(self): + ''' free all resources, we are not listening anymore ''' + gajim.idlequeue.remove_timeout(self.fd) + gajim.idlequeue.unplug_idle(self.fd) + self.fd = -1 + self.started = False + try: + self._serv.close() + except: + pass + + def accept_conn(self): + ''' accepts a new incomming connection ''' + _sock = self._serv.accept() + _sock[0].setblocking(False) + return _sock + + +class P2PConnection(IdleObject, PlugIn): + ''' class for sending file to socket over socks5 ''' + def __init__(self, sock_hash, _sock, host = None, port = None, caller = None): + PlugIn.__init__(self) + self.sendqueue = [] + self.sendbuff = None + self._sock = _sock + self._sock.setblocking(False) + self.fd = _sock.fileno() + self._recv = _sock.recv + self._send = _sock.send + self.connected = True + self.state = 1 + self.writable = False + self.readable = False + # waiting for first bytes + # start waiting for data + self.Namespace = 'jabber:client' + self.defaultNamespace = self.Namespace + self._component=0 + self._caller = caller + self.Server = host + self.Connection = self + self._registered_name = None + self.DBG = 'client' + debug = ['always', 'nodebuilder'] + self._DEBUG = Debug.Debug(debug) + self.DEBUG = self._DEBUG.Show + self.debug_flags = self._DEBUG.debug_flags + self.debug_flags.append(self.DBG) + self._owner = self + self._exported_methods=[self.send_stanza, self.disconnect2, self.pollend] + self.on_receive = None + gajim.idlequeue.plug_idle(self, False, True) + self.onreceive(self._on_receive_document_attrs) + dispatcher_nb.Dispatcher().PlugIn(self) + self.RegisterHandler('message', self._messageCB) + + + def _messageCB(self, conn, data): + self._caller._messageCB(self.Server, conn, data) + + def onreceive(self, recv_handler): + if not recv_handler: + if hasattr(self._owner, 'Dispatcher'): + self.on_receive = self._owner.Dispatcher.ProcessNonBlocking + else: + self.on_receive = None + return + _tmp = self.on_receive + # make sure this cb is not overriden by recursive calls + if not recv_handler(None) and _tmp == self.on_receive: + self.on_receive = recv_handler + + def _on_receive_document_attrs(self, data): + if data: + self.Dispatcher.ProcessNonBlocking(data) + if not hasattr(self, 'Dispatcher') or \ + self.Dispatcher.Stream._document_attrs is None: + return + self.onreceive(None) + if self.Dispatcher.Stream._document_attrs.has_key('version') and \ + self.Dispatcher.Stream._document_attrs['version'] == '1.0': + #~ self.onreceive(self._on_receive_stream_features) + #XXX continue with TLS + return + self.onreceive(None) + return True + + def send_stanza(self, stanza): + '''Append stanza to the queue of messages to be send. + If supplied data is unicode string, encode it to utf-8. + ''' + if self.state <= 0: + return + r = stanza + if isinstance(r, unicode): + r = r.encode('utf-8') + elif not isinstance(r, str): + r = ustr(r).encode('utf-8') + self.sendqueue.append(r) + self._plug_idle() + + def read_timeout(self): + gajim.idlequeue.remove_timeout(self.fd) + # no activity for foo seconds + # self.pollend() + + def pollout(self): + if not self.connected: + self.disconnect2() + return + gajim.idlequeue.remove_timeout(self.fd) + self._do_send() + # self.idlequeue.plug_idle(self, False, True) + + def pollend(self): + self.state = -1 + self.disconnect2() + + def pollin(self): + ''' Reads all pending incoming data. Calls owner's disconnected() method if appropriate.''' + received = '' + errnum = 0 + try: + # get as many bites, as possible, but not more than RECV_BUFSIZE + received = self._recv(MAX_BUFF_LEN) + except Exception, e: + if len(e.args) > 0 and isinstance(e.args[0], int): + errnum = e[0] + sys.exc_clear() + # "received" will be empty anyhow + if errnum == socket.SSL_ERROR_WANT_READ: + pass + elif errnum in [errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN]: + self.pollend() + # don't proccess result, cas it will raise error + return + elif not received : + if errnum != socket.SSL_ERROR_EOF: + # 8 EOF occurred in violation of protocol + self.pollend() + if self.state >= 0: + self.disconnect2() + return + + if self.state < 0: + return + if self.on_receive: + if hasattr(self._owner, 'Dispatcher'): + self._owner.Dispatcher.Event('', DATA_RECEIVED, received) + self.on_receive(received) + else: + # This should never happed, so we need the debug + self.DEBUG('Unhandled data received: %s' % received,'got') + self.disconnect2() + if self.on_connect_failure: + self.on_connect_failure() + return True + + def onreceive(self, recv_handler): + if not recv_handler: + if hasattr(self._owner, 'Dispatcher'): + self.on_receive = self._owner.Dispatcher.ProcessNonBlocking + else: + self.on_receive = None + return + _tmp = self.on_receive + # make sure this cb is not overriden by recursive calls + if not recv_handler(None) and _tmp == self.on_receive: + self.on_receive = recv_handler + + def disconnect2(self): + ''' Closes the socket. ''' + gajim.idlequeue.remove_timeout(self.fd) + gajim.idlequeue.unplug_idle(self.fd) + try: + self._sock.shutdown(socket.SHUT_RDWR) + self._sock.close() + except: + # socket is already closed + pass + self.connected = False + self.fd = -1 + self.state = -1 + + def _do_send(self): + if not self.sendbuff: + if not self.sendqueue: + return None # nothing to send + self.sendbuff = self.sendqueue.pop(0) + self.sent_data = self.sendbuff + try: + send_count = self._send(self.sendbuff) + if send_count: + self.sendbuff = self.sendbuff[send_count:] + if not self.sendbuff and not self.sendqueue: + if self.state < 0: + gajim.idlequeue.unplug_idle(self.fd) + self._on_send() + self.disconnect2() + return + # we are not waiting for write + self._plug_idle() + self._on_send() + except socket.error, e: + sys.exc_clear() + if e[0] == socket.SSL_ERROR_WANT_WRITE: + return True + if self.state < 0: + self.disconnect2() + return + if self._on_send_failure: + self._on_send_failure() + return + return True + + def _plug_idle(self): + readable = self.state != 0 + if self.sendqueue or self.sendbuff: + writable = True + else: + writable = False + if self.writable != writable or self.readable != readable: + gajim.idlequeue.plug_idle(self, writable, readable) + + + def _on_send(self): + if self.sent_data and self.sent_data.strip(): + #~ self.DEBUG(self.sent_data,'sent') + if hasattr(self._owner, 'Dispatcher'): + self._owner.Dispatcher.Event('', DATA_SENT, self.sent_data) + self.sent_data = None + + def _on_send_failure(self): + self.DEBUG("Socket error while sending data",'error') + self._owner.disconnected() + self.sent_data = None + + class ClientZeroconf: - def __init__(self, zeroconf): + def __init__(self, zeroconf, caller): self.roster = roster_zeroconf.Roster(zeroconf) + self.caller = caller + self.start_listener(zeroconf.port) + + def start_listener(self, port): + self.listener = ZeroconfListener(port, self.caller) + self.listener.bind() + if self.listener.started is False: + self.listener = None + # We cannot bind port, call error + # dialog from dialogs.py and fail + BindPortError(port) + return None + #~ self.connected += 1 def getRoster(self): return self.roster.getRoster() diff --git a/src/common/zeroconf/connection_handlers_zeroconf.py b/src/common/zeroconf/connection_handlers_zeroconf.py index 758a181e2e11fb358affc74f540f7b647b9890e2..8a4937e071f5134da2cdeed9725667c4c78d3c5f 100644 --- a/src/common/zeroconf/connection_handlers_zeroconf.py +++ b/src/common/zeroconf/connection_handlers_zeroconf.py @@ -33,7 +33,7 @@ import common.xmpp from common import GnuPG from common import helpers from common import gajim - +from common.zeroconf import zeroconf STATUS_LIST = ['offline', 'connecting', 'online', 'chat', 'away', 'xa', 'dnd', 'invisible'] # kind of events we can wait for an answer @@ -227,6 +227,108 @@ class ConnectionHandlersZeroconf(ConnectionVcard): idle.init() except: HAS_IDLE = False + def _messageCB(self, ip, con, msg): + '''Called when we receive a message''' + msgtxt = msg.getBody() + mtype = msg.getType() + subject = msg.getSubject() # if not there, it's None + tim = msg.getTimestamp() + tim = time.strptime(tim, '%Y%m%dT%H:%M:%S') + tim = time.localtime(timegm(tim)) + frm = helpers.get_full_jid_from_iq(msg) + if frm == 'none': + for key in self.zeroconf.contacts: + if ip == self.zeroconf.contacts[key][zeroconf.C_ADDRESS]: + frm = key + jid = helpers.get_jid_from_iq(msg) + print 'jid', jid + no_log_for = gajim.config.get_per('accounts', self.name, + 'no_log_for').split() + encrypted = False + chatstate = None + encTag = msg.getTag('x', namespace = common.xmpp.NS_ENCRYPTED) + decmsg = '' + # invitations + invite = None + if not encTag: + invite = msg.getTag('x', namespace = common.xmpp.NS_MUC_USER) + if invite and not invite.getTag('invite'): + invite = None + delayed = msg.getTag('x', namespace = common.xmpp.NS_DELAY) != None + msg_id = None + composing_jep = None + # FIXME: Msn transport (CMSN1.2.1 and PyMSN0.10) do NOT RECOMMENDED + # invitation + # stanza (MUC JEP) remove in 2007, as we do not do NOT RECOMMENDED + xtags = msg.getTags('x') + # chatstates - look for chatstate tags in a message if not delayed + if not delayed: + composing_jep = False + children = msg.getChildren() + for child in children: + if child.getNamespace() == 'http://jabber.org/protocol/chatstates': + chatstate = child.getName() + composing_jep = 'JEP-0085' + break + # No JEP-0085 support, fallback to JEP-0022 + if not chatstate: + chatstate_child = msg.getTag('x', namespace = common.xmpp.NS_EVENT) + if chatstate_child: + chatstate = 'active' + composing_jep = 'JEP-0022' + if not msgtxt and chatstate_child.getTag('composing'): + chatstate = 'composing' + # JEP-0172 User Nickname + user_nick = msg.getTagData('nick') + if not user_nick: + user_nick = '' + + if encTag and GnuPG.USE_GPG: + #decrypt + encmsg = encTag.getData() + + keyID = gajim.config.get_per('accounts', self.name, 'keyid') + if keyID: + decmsg = self.gpg.decrypt(encmsg, keyID) + if decmsg: + msgtxt = decmsg + encrypted = True + if mtype == 'error': + error_msg = msg.getError() + if not error_msg: + error_msg = msgtxt + msgtxt = None + if self.name not in no_log_for: + gajim.logger.write('error', frm, error_msg, tim = tim, + subject = subject) + self.dispatch('MSGERROR', (frm, msg.getErrorCode(), error_msg, msgtxt, + tim)) + elif mtype == 'chat': # it's type 'chat' + if not msg.getTag('body') and chatstate is None: #no <body> + return + if msg.getTag('body') and self.name not in no_log_for and jid not in\ + no_log_for and msgtxt: + msg_id = gajim.logger.write('chat_msg_recv', frm, msgtxt, tim = tim, + subject = subject) + self.dispatch('MSG', (frm, msgtxt, tim, encrypted, mtype, subject, + chatstate, msg_id, composing_jep, user_nick)) + else: # it's single message + if self.name not in no_log_for and jid not in no_log_for and msgtxt: + gajim.logger.write('single_msg_recv', frm, msgtxt, tim = tim, + subject = subject) + if invite is not None: + item = invite.getTag('invite') + jid_from = item.getAttr('from') + if jid_from == None: + jid_from = frm + reason = item.getTagData('reason') + item = invite.getTag('password') + password = invite.getTagData('password') + self.dispatch('GC_INVITATION',(frm, jid_from, reason, password)) + else: + self.dispatch('MSG', (frm, msgtxt, tim, encrypted, 'normal', + subject, chatstate, msg_id, composing_jep, user_nick)) + # END messageCB ''' def build_http_auth_answer(self, iq_obj, answer): if answer == 'yes': diff --git a/src/common/zeroconf/connection_zeroconf.py b/src/common/zeroconf/connection_zeroconf.py index 1b2ee789635943e07db98ef12e0e4eec746f38ab..8ab243490e4d69d8408c31e8f7ed0a4b3797356d 100644 --- a/src/common/zeroconf/connection_zeroconf.py +++ b/src/common/zeroconf/connection_zeroconf.py @@ -184,7 +184,7 @@ class ConnectionZeroconf(ConnectionHandlersZeroconf): return self.connection, '' if self.zeroconf.connect(): - self.connection = client_zeroconf.ClientZeroconf(self.zeroconf) + self.connection = client_zeroconf.ClientZeroconf(self.zeroconf, self) self.roster = self.connection.getRoster() self.dispatch('ROSTER', self.roster) @@ -197,7 +197,7 @@ class ConnectionZeroconf(ConnectionHandlersZeroconf): # refresh all contacts data every second self.call_resolve_timeout = True - gobject.timeout_add(1000, self._on_resolve_timeout) + gobject.timeout_add(10000, self._on_resolve_timeout) else: pass #TODO: display visual notification that we could not connect to avahi @@ -487,7 +487,13 @@ class ConnectionZeroconf(ConnectionHandlersZeroconf): def send_keepalive(self): # nothing received for the last foo seconds (60 secs by default) - if self.connection: - self.connection.send(' ') + pass + def _event_dispatcher(self, realm, event, data): + if realm == '': + if event == common.xmpp.transports.DATA_RECEIVED: + self.dispatch('STANZA_ARRIVED', unicode(data, errors = 'ignore')) + elif event == common.xmpp.transports.DATA_SENT: + self.dispatch('STANZA_SENT', unicode(data)) + # END ConnectionZeroconf diff --git a/src/common/zeroconf/zeroconf.py b/src/common/zeroconf/zeroconf.py index 9aea0a97ee4d66c72c1c941e73e3b97f319679b3..3b95977842b92185cd84b5de5479e4ab4c8bc8e9 100755 --- a/src/common/zeroconf/zeroconf.py +++ b/src/common/zeroconf/zeroconf.py @@ -300,7 +300,7 @@ class Zeroconf: return False def send (self, msg, sock): - print 'send:'+msg + print 'send:', msg totalsent = 0 while totalsent < len(msg): sent = sock.send(msg[totalsent:]) @@ -309,13 +309,14 @@ class Zeroconf: totalsent = totalsent + sent def send_message(self, jid, msg, type = 'chat'): - print 'zeroconf.py: send_message:'+ msg - + print 'zeroconf.py: send_message:', msg + if not msg : + return sock = socket.socket ( socket.AF_INET, socket.SOCK_STREAM ) #sock.setblocking(False) sock.connect ( ( self.contacts[jid][C_ADDRESS], self.contacts[jid][C_PORT] ) ) - #print (self.txt_array_to_dict(self.contacts[jid][C_TXT]))['port.p2pj'] + print (self.txt_array_to_dict(self.contacts[jid][C_TXT]))['port.p2pj'] #was for adium which uses the txt record #sock.connect ( ( self.contacts[jid][5], int((self.txt_array_to_dict(self.contacts[jid][7]))['port.p2pj']) ) )