##
## Copyright (C) 2006 Stefan Bethge <stefan@lanpartei.de>
## 2006 Dimitur Kirov <dkirov@gmail.com>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published
## by the Free Software Foundation; version 2 only.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
from common.xmpp.idlequeue import IdleObject
from common.xmpp.client import *
from common.xmpp.simplexml import ustr
# maximum buffer length (64 KiB)
MAX_BUFF_LEN = 64 * 1024
# event names handed to Dispatcher.Event() when data was received / sent
DATA_RECEIVED = 'DATA RECEIVED'
DATA_SENT = 'DATA SENT'
# seconds to wait for a connection to be established
CONNECT_TIMEOUT_SECONDS = 30
# seconds without activity after which the stream is closed
ACTIVITY_TIMEOUT_SECONDS = 3 * 60
def __init__(self, port, conn_holder):
    ''' Prepare a listener for all incoming connections on ('0.0.0.0', port).

    port -- TCP port that bind() will try to listen on
    conn_holder -- object that owns the connections; its `caller` attribute
        is remembered here and the holder itself is kept for later use
    '''
    self.conn_holder = conn_holder
    self.caller = conn_holder.caller
    self.port = port
    # nothing is open yet: no socket, no descriptor, not started
    self._sock = None
    self.fd = -1
    self.queue_idx = -1
    self.started = False
def bind(self):
    ''' Open the listening socket on ('0.0.0.0', self.port) and plug it
    into the idle queue.

    Always returns None. Success is reported through self.started, which is
    set to True only once the socket is bound, listening and registered.
    When binding fails the socket is closed again and self.started is left
    as it was.
    '''
    self._serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self._serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self._serv.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    self._serv.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    try:
        self._serv.bind(('0.0.0.0', self.port))
    except Exception:
        # unable to bind (e.g. the port is taken): release the descriptor
        # instead of leaking it; TODO show an error dialog
        self._serv.close()
        return None
    self._serv.listen(socket.SOMAXCONN)
    self._serv.setblocking(False)
    self.fd = self._serv.fileno()
    # plug_idle(self, False, True): presumably (writable, readable) -- we only
    # wait for incoming connections
    gajim.idlequeue.plug_idle(self, False, True)
    self.started = True
def pollend(self):
    ''' Called when we stop listening on (host, port). '''
    # NOTE(review): no statement besides the docstring is visible here --
    # confirm whether cleanup (e.g. a disconnect() call) belongs in this hook.
def pollin(self):
    ''' Accept a new incoming connection and hand it to a new P2PClient. '''
    client_sock, address = self.accept_conn()
    # address is the pair reported by accept(): host first, then port
    P2PClient(client_sock, address[0], address[1], self.conn_holder)
def disconnect(self):
    ''' Free all resources, we are not listening anymore.

    Unregisters the descriptor from the idle queue, closes the listening
    socket (best effort) and asks the connection holder to kill every
    connection.
    '''
    gajim.idlequeue.remove_timeout(self.fd)
    gajim.idlequeue.unplug_idle(self.fd)
    self.fd = -1
    self.started = False
    try:
        self._serv.close()
    except Exception:
        # best effort: the socket may never have been created, or is
        # already closed; real interrupts are no longer swallowed
        pass
    self.conn_holder.kill_all_connections()
def accept_conn(self):
    ''' Accept a new incoming connection.

    Returns whatever accept() on the listening socket returned (the
    connection socket comes first), after switching that socket to
    non-blocking mode.
    '''
    accepted = self._serv.accept()
    accepted[0].setblocking(False)
    return accepted
def __init__(self, _sock, host, port, conn_holder, messagequeue = [], to = None):
    ''' Set up one peer-to-peer client on top of a new P2PConnection.

    _sock -- socket of an accepted connection (we act as server), or a
        false value when we are the side that opens the connection
    host, port -- peer address, handed on to P2PConnection
    conn_holder -- connection registry; its `caller` is remembered and this
        client registers itself with it
    messagequeue, to -- not used by the statements of this method
    '''
    # NOTE(review): self.DBG, self.Server and self.to are read below but not
    # assigned here, and messagequeue / to are never stored although other
    # methods use self.messagequeue -- presumably they are set elsewhere
    # (class attributes or lines not visible here); confirm.
    # NOTE(review): messagequeue has a mutable default ([]); that is harmless
    # only as long as the default list itself is never stored or mutated.
    self._owner = self
    self.Namespace = 'jabber:client'
    self.defaultNamespace = self.Namespace
    self._caller = conn_holder.caller
    self.conn_holder = conn_holder

    # verbose mode switches on the 'always' and 'nodebuilder' debug flags
    if gajim.verbose:
        debug = ['always', 'nodebuilder']
    else:
        debug = []
    self._DEBUG = Debug.Debug(debug)
    self.DEBUG = self._DEBUG.Show
    self.debug_flags = self._DEBUG.debug_flags
    self.debug_flags.append(self.DBG)

    # a socket handed in means the peer connected to us
    if _sock:
        self.sock_type = TYPE_SERVER
    else:
        self.sock_type = TYPE_CLIENT
    conn = P2PConnection('', _sock, host, port, self._caller, self.on_connect)
    # the socket's bound __hash__ method (not its result) serves as the key
    self.sock_hash = conn._sock.__hash__
    self.conn_holder.add_connection(self, self.Server, self.to)
def add_message(self, message):
    ''' Send `message` right away, or queue it while there is no connection.

    Returns False when the connection is already closed (state == -1) so
    that the caller can pick another route, and True once the message has
    been sent or queued. Callers test the result for truth, so the success
    paths must not fall through with None.
    '''
    if not self.Connection:
        # not connected yet: keep the message for later
        self.messagequeue.append(message)
        return True
    if self.Connection.state == -1:
        return False
    self.send(message)
    return True
def on_connect(self, conn):
    ''' Callback that receives the connection object `conn`.

    Plugs this client into the connection and into a new dispatcher; when
    we are the client side, every queued message is sent, oldest first.
    '''
    self.Connection = conn
    self.Connection.PlugIn(self)
    dispatcher_nb.Dispatcher().PlugIn(self)
    if self.sock_type == TYPE_CLIENT:
        while self.messagequeue:
            self.send(self.messagequeue.pop(0))
def StreamInit(self):
    ''' Install a fresh stream parser on the dispatcher and, when we are
    the client side, send the initial stream header. '''
    self.Dispatcher.Stream = simplexml.NodeBuilder()
    parser = self.Dispatcher.Stream
    parser._dispatch_depth = 2
    parser.dispatch = self.Dispatcher.dispatch
    # the parser reports the peer's stream header to _check_stream_start
    parser.stream_header_received = self._check_stream_start
    self.debug_flags.append(simplexml.DBG_NODEBUILDER)
    parser.DEBUG = self.DEBUG
    parser.features = None
    if self.sock_type == TYPE_CLIENT:
        self.send_stream_header()
def send_stream_header(self):
    ''' Send the XML declaration followed by the opening stream:stream tag. '''
    self.Dispatcher._metastream = Node('stream:stream')
    self.Dispatcher._metastream.setNamespace(self.Namespace)
    #~ self._metastream.setAttr('version', '1.0')
    self.Dispatcher._metastream.setAttr('xmlns:stream', NS_STREAMS)
    # the last two characters of the rendered node are cut off and replaced
    # by '>' -- presumably str(node) ends in '/>' and the tag must stay open
    opening_tag = str(self.Dispatcher._metastream)[:-2]
    self.Dispatcher.send("<?xml version='1.0'?>%s>" % opening_tag)
def _check_stream_start(self, ns, tag, attrs):
    ''' Validate the opening tag of the incoming stream.

    Anything other than a 'stream' element in the NS_STREAMS namespace is
    logged and terminates the connection. On the server side a valid
    stream start is answered with our own stream header, after which the
    queued messages are sent, oldest first.

    attrs is accepted but not inspected.
    '''
    if ns != NS_STREAMS or tag != 'stream':
        self.Connection.DEBUG('Incorrect stream start: (%s,%s).Terminating! ' % (tag, ns), 'error')
        self.Connection.disconnect()
        return
    if self.sock_type == TYPE_SERVER:
        self.send_stream_header()
        while self.messagequeue:
            # deliver each queued message instead of merely discarding it
            self.send(self.messagequeue.pop(0))
def on_disconnect(self):
    ''' Clean up after the connection is gone.

    Removes this client from the connection holder (if any), plugs out the
    Dispatcher and P2PConnection plugins when they are set on this
    instance, and drops the references to connection, caller and holder.
    '''
    if self.conn_holder:
        self.conn_holder.remove_connection(self.sock_hash)
    # only attributes stored on the instance itself count, hence __dict__
    if 'Dispatcher' in self.__dict__:
        self.Dispatcher.PlugOut()
    if 'P2PConnection' in self.__dict__:
        self.P2PConnection.PlugOut()
    self.Connection = None
    self._caller = None
    self.conn_holder = None
def force_disconnect(self):
    ''' Tear this client down whether or not it has a connection. '''
    if not self.Connection:
        # nothing to close: run the cleanup directly
        self.on_disconnect()
        return
    self.disconnect()
def _on_receive_document_attrs(self, data):
    ''' Receive handler that feeds `data` to the dispatcher until the
    attributes of the peer's stream header are known.

    Returns None while there is no Dispatcher or the stream attributes have
    not been parsed yet, and also when the peer announces version '1.0'.
    Returns True otherwise; the receive handler is reset through
    onreceive(None) once the attributes are known.
    '''
    if data:
        self.Dispatcher.ProcessNonBlocking(data)
    if not hasattr(self, 'Dispatcher') or \
            self.Dispatcher.Stream._document_attrs is None:
        return
    self.onreceive(None)
    document_attrs = self.Dispatcher.Stream._document_attrs
    if 'version' in document_attrs and document_attrs['version'] == '1.0':
        #~ self.onreceive(self._on_receive_stream_features)
        #XXX continue with TLS
        return
    self.onreceive(None)
    return True
def _register_handlers(self):
    ''' Register the caller's callbacks for incoming stanzas: messages,
    and iq stanzas in the SI and bytestream namespaces. '''
    self.RegisterHandler('message',
        lambda conn, data: self._caller._messageCB(self.Server, conn, data))
    caller = self._caller
    # (callback, iq type, namespace), registered in this order
    iq_handlers = (
        (caller._siSetCB, 'set', common.xmpp.NS_SI),
        (caller._siErrorCB, 'error', common.xmpp.NS_SI),
        (caller._siResultCB, 'result', common.xmpp.NS_SI),
        (caller._bytestreamSetCB, 'set', common.xmpp.NS_BYTESTREAM),
        (caller._bytestreamResultCB, 'result', common.xmpp.NS_BYTESTREAM),
        (caller._bytestreamErrorCB, 'error', common.xmpp.NS_BYTESTREAM),
    )
    for callback, iq_type, namespace in iq_handlers:
        self.RegisterHandler('iq', callback, iq_type, namespace)
# NOTE(review): indentation is lost in this extract; the lines below are the
# class header, its docstring and the body of the constructor.
# NOTE(review): the docstring speaks of sending a file over socks5, while the
# statements here only set up a socket and send queue -- presumably a
# leftover; confirm.
class P2PConnection(IdleObject, PlugIn):
''' class for sending file to socket over socks5 '''
# sock_hash and caller are accepted but not used by the visible statements
def __init__(self, sock_hash, _sock, host = None, port = None, caller = None, on_connect = None):
# queue of outgoing data, and the chunk currently being written
self.sendqueue = []
self.sendbuff = None
self._sock = _sock
self.host, self.port = host, port
# callback, stored for later use
self.on_connect = on_connect
# read/write interest flags, compared against in _plug_idle()
self.writable = False
self.readable = False
self._exported_methods=[self.send, self.disconnect, self.onreceive]
self._sock.setblocking(False)
self.fd = self._sock.fileno()
# NOTE(review): as written, the socket passed in is replaced unconditionally
# by a new one, and self.state is never assigned although set_timeout()
# reads it. Lines that distinguish "socket given" from "socket to be
# created" appear to be missing -- confirm against the full file.
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.setblocking(False)
self.fd = self._sock.fileno()
gajim.idlequeue.plug_idle(self, True, False)
# arm the connect timeout
self.set_timeout(CONNECT_TIMEOUT_SECONDS)
def set_timeout(self, timeout):
    ''' Replace the pending timeout of this descriptor.

    The old timeout is always removed; a new read timeout of `timeout`
    is armed only while state >= 0.
    '''
    gajim.idlequeue.remove_timeout(self.fd)
    if self.state < 0:
        return
    gajim.idlequeue.set_read_timeout(self.fd, timeout)
def plugin(self, owner):
    ''' Install the owner's _on_receive_document_attrs as receive handler. '''
    self.onreceive(owner._on_receive_document_attrs)
def plugout(self):
    ''' Disconnect from the remote peer and drop the reference to the owner. '''
    self.disconnect()
    self._owner = None
def onreceive(self, recv_handler):
    ''' Choose the callback that is stored in self.on_receive.

    A false recv_handler resets the callback to the owner's
    Dispatcher.ProcessNonBlocking, or to None when the owner has no
    Dispatcher. Otherwise recv_handler is first called with None; it is
    installed only if that call returns a false value and did not itself
    change self.on_receive.
    '''
    if recv_handler:
        previous = self.on_receive
        # make sure this cb is not overridden by recursive calls
        if not recv_handler(None) and previous == self.on_receive:
            self.on_receive = recv_handler
        return
    if hasattr(self._owner, 'Dispatcher'):
        self.on_receive = self._owner.Dispatcher.ProcessNonBlocking
    else:
        self.on_receive = None
# NOTE(review): the `def` line of this method is not visible in this extract.
# Judging by the docstring and the `stanza` variable it is presumably
# `def send(self, stanza):` -- confirm against the full file.
'''Append stanza to the queue of messages to be send.
If supplied data is unicode string, encode it to utf-8.
'''
# nothing is queued unless state is positive
if self.state <= 0:
return
r = stanza
if isinstance(r, unicode):
r = r.encode('utf-8')
self.sendqueue.append(r)
# re-check the read/write interest now that there is data to send
self._plug_idle()
# NOTE(review): indentation is lost and this span looks like more than one
# method run together: connect logic follows the read_timeout header, and the
# last statements come after unconditional returns. The nesting of the
# if/elif/else chain relative to the try/except cannot be determined from
# this extract -- confirm against the full file before changing anything.
def read_timeout(self):
try:
# start (or continue) connecting to the peer
self._sock.connect((self.host, self.port))
self._sock.setblocking(False)
except Exception, ee:
# Python 2 idiom: unpack the exception into error number and message
(errnum, errstr) = ee
# the connect is still in progress: nothing more to do for now
if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK):
return
# win32 needs this
# (10056 is presumably the Winsock "already connected" code -- confirm)
elif errnum not in (0, 10056, errno.EISCONN) or self.state != 0:
self.disconnect()
return None
else: # socket is already connected
self._sock.setblocking(False)
self.state = 1 # connected
self.on_connect(self)
return 1 # we are connected
return
# NOTE(review): the two statements below follow a bare return; they are
# presumably the tail of another method whose `def` line is missing.
gajim.idlequeue.remove_timeout(self.fd)
self._do_send()
def pollend(self):
    ''' Mark the connection as closed by setting state to -1. '''
    self.state = -1
# NOTE(review): indentation is lost and statements are evidently missing from
# this method in the extract (see the notes below); the comments describe
# only what the visible lines show.
def pollin(self):
''' Reads all pending incoming data. Calls owner's disconnected() method if appropriate.'''
received = ''
errnum = 0
try:
# get as many bytes as possible, but not more than RECV_BUFSIZE
# NOTE(review): the statement that actually reads from the socket is not
# visible here, so as shown `received` always stays empty.
except Exception, e:
# keep the error number when the exception carries one as first argument
if len(e.args) > 0 and isinstance(e.args[0], int):
errnum = e[0]
sys.exc_clear()
# "received" will be empty anyhow
if errnum == socket.SSL_ERROR_WANT_READ:
pass
elif errnum in [errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN]:
self.pollend()
# don't process the result, because it will raise an error
return
elif not received :
if errnum != socket.SSL_ERROR_EOF:
# 8 EOF occurred in violation of protocol
self.pollend()
# NOTE(review): this branch only returns; presumably a statement between the
# `if` and the `return` is missing -- confirm.
if self.state >= 0:
return
# a negative state means the connection is closed: drop the data
if self.state < 0:
return
if self.on_receive:
if self._owner.sock_type == TYPE_CLIENT:
# activity seen: re-arm the inactivity timeout
self.set_timeout(ACTIVITY_TIMEOUT_SECONDS)
if hasattr(self._owner, 'Dispatcher'):
self._owner.Dispatcher.Event('', DATA_RECEIVED, received)
self.on_receive(received)
else:
# This should never happen, so we need the debug
self.DEBUG('Unhandled data received: %s' % received,'error')
return True
def onreceive(self, recv_handler):
    ''' Choose the callback that is stored in self.on_receive.

    A false recv_handler resets the callback to the owner's
    Dispatcher.ProcessNonBlocking (None when the owner has no Dispatcher).
    Any other handler is probed with a call recv_handler(None) and is
    installed only when that call returns a false value and left
    self.on_receive untouched.
    '''
    # NOTE(review): an identical definition of onreceive appears earlier in
    # this file -- presumably a duplicate; confirm which one should stay.
    if not recv_handler:
        if hasattr(self._owner, 'Dispatcher'):
            self.on_receive = self._owner.Dispatcher.ProcessNonBlocking
        else:
            self.on_receive = None
        return
    before = self.on_receive
    # make sure this cb is not overridden by recursive calls
    if not recv_handler(None) and before == self.on_receive:
        self.on_receive = recv_handler
# NOTE(review): the `def` line of this method is not visible in this extract;
# since other methods call self.disconnect() it is presumably
# `def disconnect(self):` -- confirm against the full file.
''' Closes the socket. '''
# stop watching the descriptor before it is closed
gajim.idlequeue.remove_timeout(self.fd)
gajim.idlequeue.unplug_idle(self.fd)
try:
self._sock.shutdown(socket.SHUT_RDWR)
self._sock.close()
except:
# socket is already closed
pass
# mark the connection as closed
self.fd = -1
self.state = -1
# let the owner clean up, if there still is one
if self._owner:
self._owner.on_disconnect()
# NOTE(review): indentation is lost and at least one statement is missing:
# `send_count` is read below but never assigned in the visible lines
# (presumably the result of the socket send call) -- confirm against the full
# file before changing anything.
def _do_send(self):
# pick the next chunk from the queue when nothing is pending
if not self.sendbuff:
if not self.sendqueue:
return None # nothing to send
self.sendbuff = self.sendqueue.pop(0)
# remembered so that _on_send() can report it
self.sent_data = self.sendbuff
try:
if send_count:
# keep only the part that has not been written yet
self.sendbuff = self.sendbuff[send_count:]
if not self.sendbuff and not self.sendqueue:
if self.state < 0:
gajim.idlequeue.unplug_idle(self.fd)
self._on_send()
return
# we are not waiting for write
self._plug_idle()
self._on_send()
except socket.error, e:
sys.exc_clear()
if e[0] == socket.SSL_ERROR_WANT_WRITE:
return True
if self.state < 0:
if self._owner.sock_type == TYPE_CLIENT:
self.set_timeout(ACTIVITY_TIMEOUT_SECONDS)
return True
def _plug_idle(self):
    ''' Re-register with the idle queue when the wanted read/write interest
    differs from self.readable / self.writable.

    Reading is wanted unless state is 0; writing is wanted while there is
    queued or partially sent data. This method does not update
    self.readable / self.writable itself.
    '''
    want_read = self.state != 0
    want_write = bool(self.sendqueue or self.sendbuff)
    if self.writable != want_write or self.readable != want_read:
        gajim.idlequeue.plug_idle(self, want_write, want_read)
def _on_send(self):
    ''' Report the data that was just sent as a DATA_SENT event on the
    owner's Dispatcher (if there is one), then forget it.

    Empty or whitespace-only data is not reported.
    '''
    data = self.sent_data
    if data and data.strip():
        if hasattr(self._owner, 'Dispatcher'):
            self._owner.Dispatcher.Event('', DATA_SENT, data)
    self.sent_data = None
def _on_send_failure(self):
    ''' Log the failed write, tell the owner it is disconnected and drop
    the data that could not be sent. '''
    self.DEBUG("Socket error while sending data",'error')
    self._owner.disconnected()
    self.sent_data = None
# NOTE(review): these assignments have lost their `class` and `def` lines in
# this extract. They look like the tail of a constructor, presumably of the
# connection holder class that owns the methods which follow -- confirm.
# Other attributes those methods read (self.caller, self.listener,
# self.connections) are not assigned in the visible lines.
self.zeroconf = None
self.roster = None
self.last_msg = ''
# lookup tables from recipient / peer address to the key of a connection
self.recipient_to_hash = {}
self.ip_to_hash = {}
def test_avahi(self):
    ''' Return True when the avahi python module can be imported,
    False otherwise. '''
    try:
        import avahi
    except ImportError:
        return False
    else:
        return True
# NOTE(review): indentation is lost and return statements appear to be
# missing. As shown, `if not self.port:` is directly followed by
# zeroconf_init(), which read literally would initialise zeroconf only when
# no port could be opened -- presumably an early return belongs there.
# Confirm against the full file before changing anything.
def connect(self, show, msg):
# start_listener() yields the port that could be bound, or a false value
self.port = self.start_listener(self.caller.port)
if not self.port:
self.zeroconf_init(show, msg)
# give up and release everything when zeroconf cannot connect
if not self.zeroconf.connect():
self.disconnect()
self.roster = roster_zeroconf.Roster(self.zeroconf)
# NOTE(review): the four fragments below have lost their `def` lines in this
# extract. reannounce() calls self.remove_announce() and self.announce(), so
# the first two are presumably those methods; the names and signatures of the
# other two are not visible (they use `show` and `msg`) -- confirm.
# Each fragment does nothing when there is no zeroconf object.
if self.zeroconf:
return self.zeroconf.remove_announce()
if self.zeroconf:
return self.zeroconf.announce()
# store the new message, remember it, then update the TXT record for `show`
if self.zeroconf:
self.zeroconf.txt['msg'] = msg
self.last_msg = msg
return self.zeroconf.update_txt(show)
if self.zeroconf:
self.zeroconf.resolve_all()
# (stray line numbers 508-543 left behind by the source viewer were removed here)
def reannounce(self, txt):
    ''' Withdraw the current announcement and announce again with `txt`
    as TXT record, using our current port and the caller's username.

    Returns the result of announce().
    '''
    self.remove_announce()
    service = self.zeroconf
    service.txt = txt
    service.port = self.port
    service.username = self.caller.username
    return self.announce()
def zeroconf_init(self, show, msg):
    ''' Create the Zeroconf object and fill in what it announces.

    show -- stored in the TXT record under 'status'
    msg -- stored in the TXT record under 'msg' and remembered as last_msg
    '''
    caller = self.caller
    self.zeroconf = zeroconf.Zeroconf(caller._on_new_service,
        caller._on_remove_service, caller._on_name_conflictCB,
        caller._on_disconnected, caller.username, caller.host,
        self.port)
    # TXT record entries, taken from the arguments and from the caller
    txt = self.zeroconf.txt
    txt['msg'] = msg
    txt['status'] = show
    txt['1st'] = caller.first
    txt['last'] = caller.last
    txt['jid'] = caller.jabber_id
    txt['email'] = caller.email
    self.zeroconf.username = caller.username
    self.zeroconf.host = caller.host
    self.zeroconf.port = self.port
    self.last_msg = msg
def disconnect(self):
    ''' Shut down the listener and zeroconf and detach the roster.

    Each part is skipped when it is not set, so calling this again after
    a disconnect does nothing.
    '''
    if self.listener:
        self.listener.disconnect()
        self.listener = None
    if self.zeroconf:
        self.zeroconf.disconnect()
        self.zeroconf = None
    if self.roster:
        # clear the roster's references before dropping it
        self.roster.zeroconf = None
        self.roster._data = None
        self.roster = None
def kill_all_connections(self):
    ''' Force every registered connection to disconnect. '''
    # iterate over a snapshot: a disconnecting client removes itself from
    # self.connections through remove_connection()
    for connection in list(self.connections.values()):
        connection.force_disconnect()
def add_connection(self, connection, ip, recipient):
    ''' Register `connection` under its sock_hash.

    An already registered connection with the same sock_hash is kept.
    The ip -> sock_hash mapping is always updated; the recipient ->
    sock_hash mapping only when a recipient is given.
    '''
    key = connection.sock_hash
    if key not in self.connections:
        self.connections[key] = connection
    self.ip_to_hash[ip] = key
    if recipient:
        self.recipient_to_hash[recipient] = key
def remove_connection(self, sock_hash):
    ''' Forget the connection registered under `sock_hash`.

    Removes the connection itself plus the first recipient entry and the
    first ip entry that point to it; unknown hashes are ignored.
    '''
    if sock_hash in self.connections:
        del self.connections[sock_hash]
    for table in (self.recipient_to_hash, self.ip_to_hash):
        # search a snapshot so that deleting from the table is always safe
        for key, value in list(table.items()):
            if value == sock_hash:
                del table[key]
                break
# NOTE(review): the `def` line of this method is not visible in this extract;
# connect() calls self.start_listener(self.caller.port), so it is presumably
# `def start_listener(self, port):` -- confirm against the full file.
# Try `port` and the four ports after it; the first one whose listener
# reports started is returned.
for p in range(port, port + 5):
self.listener = ZeroconfListener(p, self)
self.listener.bind()
if self.listener.started:
return p
# none of the ports could be used
self.listener = None
return False
# NOTE(review): the `def` line of this method is not visible in this extract;
# its name and signature cannot be determined from here.
# Hand out the roster's contents, or an empty dict when there is no roster.
if self.roster:
return self.roster.getRoster()
return {}
def send(self, msg_iq):
    ''' Deliver the stanza `msg_iq` to its recipient.

    The stanza's sender is set to our zeroconf name. Delivery is tried, in
    this order, over the connection already associated with the recipient,
    over a connection to the recipient's address, and finally over a newly
    created P2PClient. A recipient that is not in the roster makes the
    stanza be dropped silently.
    '''
    msg_iq.setFrom(self.roster.zeroconf.name)
    to = msg_iq.getTo()

    # 1. connection already known for this recipient
    if to in self.recipient_to_hash:
        conn = self.connections[self.recipient_to_hash[to]]
        if conn.add_message(msg_iq):
            return

    # 2. look the recipient up in the roster
    try:
        item = self.roster[to]
    except KeyError:
        #XXX invalid recipient, show some error maybe ?
        return
    address = item['address']
    if address in self.ip_to_hash:
        conn = self.connections[self.ip_to_hash[address]]
        if conn.add_message(msg_iq):
            return

    # 3. no usable connection: open a new one with the stanza queued on it
    P2PClient(None, address, item['port'], self, [msg_iq], to)