From 1a325833c36ee433363423e7c757904216b1b76f Mon Sep 17 00:00:00 2001
From: Dimitur Kirov <dkirov@gmail.com>
Date: Sun, 17 Sep 2006 22:57:41 +0000
Subject: [PATCH] add listening server for incomming messages

---
 src/common/xmpp/dispatcher_nb.py              |   4 +-
 src/common/zeroconf/client_zeroconf.py        | 305 +++++++++++++++++-
 .../zeroconf/connection_handlers_zeroconf.py  | 104 +++++-
 src/common/zeroconf/connection_zeroconf.py    |  14 +-
 src/common/zeroconf/zeroconf.py               |   9 +-
 5 files changed, 423 insertions(+), 13 deletions(-)

diff --git a/src/common/xmpp/dispatcher_nb.py b/src/common/xmpp/dispatcher_nb.py
index ca13af1843..7d57fdc191 100644
--- a/src/common/xmpp/dispatcher_nb.py
+++ b/src/common/xmpp/dispatcher_nb.py
@@ -400,7 +400,7 @@ class Dispatcher(PlugIn):
 		''' Serialise stanza and put it on the wire. Assign an unique ID to it before send.
 			Returns assigned ID.'''
 		if type(stanza) in [type(''), type(u'')]: 
-			return self._owner.Connection.send(stanza)
+			return self._owner.send_stanza(stanza)
 		if not isinstance(stanza, Protocol): 
 			_ID=None
 		elif not stanza.getID():
@@ -423,7 +423,7 @@ class Dispatcher(PlugIn):
 			stanza=route
 		stanza.setNamespace(self._owner.Namespace)
 		stanza.setParent(self._metastream)
-		self._owner.Connection.send(stanza)
+		self._owner.send_stanza(stanza)
 		return _ID
 	
 	def disconnect(self):
diff --git a/src/common/zeroconf/client_zeroconf.py b/src/common/zeroconf/client_zeroconf.py
index e487ec5e68..452d16324e 100644
--- a/src/common/zeroconf/client_zeroconf.py
+++ b/src/common/zeroconf/client_zeroconf.py
@@ -1,6 +1,7 @@
 ##      common/zeroconf/client_zeroconf.py
 ##
 ## Copyright (C) 2006 Stefan Bethge <stefan@lanpartei.de>
+## 				2006 Dimitur Kirov <dkirov@gmail.com>
 ##
 ## This program is free software; you can redistribute it and/or modify
 ## it under the terms of the GNU General Public License as published
@@ -11,14 +12,314 @@
 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 ## GNU General Public License for more details.
 ##
-
+from common import gajim
+from common.xmpp.idlequeue import IdleObject
+from common.xmpp import dispatcher_nb, debug
+from common.xmpp.client import *
+from common.xmpp.simplexml import ustr
+from dialogs import BindPortError
+import socket
+import errno
 
 from common.zeroconf import roster_zeroconf
 
+MAX_BUFF_LEN = 65536
+DATA_RECEIVED='DATA RECEIVED'
+DATA_SENT='DATA SENT'
+
+
+class ZeroconfListener(IdleObject):
+	def __init__(self, port, caller = None):
+		''' handle all incomming connections on ('0.0.0.0', port)'''
+		self.port = port
+		self.queue_idx = -1	
+		#~ self.queue = None
+		self.started = False
+		self._sock = None
+		self.fd = -1
+		self.caller = caller
+		
+	def bind(self):
+		self._serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+		self._serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+		self._serv.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+		self._serv.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+		# will fail when port as busy, or we don't have rights to bind
+		try:
+			self._serv.bind(('0.0.0.0', self.port))
+		except Exception, e:
+			# unable to bind, show error dialog
+			return None
+		self._serv.listen(socket.SOMAXCONN)
+		self._serv.setblocking(False)
+		self.fd = self._serv.fileno()
+		gajim.idlequeue.plug_idle(self, False, True)
+		self.started = True
+	
+	def pollend(self):
+		''' called when we stop listening on (host, port) '''
+		self.disconnect2()
+	
+	def pollin(self):
+		''' accept a new incomming connection and notify queue'''
+		sock = self.accept_conn()
+		P2PConnection('', sock[0], sock[1][0], sock[1][1], self.caller)
+	
+	def disconnect(self):
+		''' free all resources, we are not listening anymore '''
+		gajim.idlequeue.remove_timeout(self.fd)
+		gajim.idlequeue.unplug_idle(self.fd)
+		self.fd = -1
+		self.started = False
+		try:
+			self._serv.close()
+		except:
+			pass
+	
+	def accept_conn(self):
+		''' accepts a new incomming connection '''
+		_sock  = self._serv.accept()
+		_sock[0].setblocking(False)
+		return _sock
+
+
+class P2PConnection(IdleObject, PlugIn):
+	''' class for sending file to socket over socks5 '''
+	def __init__(self, sock_hash, _sock, host = None, port = None, caller = None):
+		PlugIn.__init__(self)
+		self.sendqueue = []
+		self.sendbuff = None
+		self._sock = _sock
+		self._sock.setblocking(False)
+		self.fd = _sock.fileno()
+		self._recv = _sock.recv
+		self._send = _sock.send
+		self.connected = True
+		self.state = 1 
+		self.writable = False
+		self.readable = False
+		# waiting for first bytes
+		# start waiting for data
+		self.Namespace = 'jabber:client'
+		self.defaultNamespace = self.Namespace
+		self._component=0
+		self._caller = caller
+		self.Server = host
+		self.Connection = self
+		self._registered_name = None
+		self.DBG = 'client'
+		debug = ['always', 'nodebuilder']
+		self._DEBUG = Debug.Debug(debug)
+		self.DEBUG = self._DEBUG.Show
+		self.debug_flags = self._DEBUG.debug_flags
+		self.debug_flags.append(self.DBG)
+		self._owner = self
+		self._exported_methods=[self.send_stanza, self.disconnect2, self.pollend]	
+		self.on_receive = None
+		gajim.idlequeue.plug_idle(self, False, True)
+		self.onreceive(self._on_receive_document_attrs)
+		dispatcher_nb.Dispatcher().PlugIn(self)
+		self.RegisterHandler('message', self._messageCB)
+	
+		
+	def _messageCB(self, conn, data):
+		self._caller._messageCB(self.Server, conn, data)
+	
+	def onreceive(self, recv_handler):
+		if not recv_handler:
+			if hasattr(self._owner, 'Dispatcher'):
+				self.on_receive = self._owner.Dispatcher.ProcessNonBlocking
+			else:
+				self.on_receive = None
+			return
+		_tmp = self.on_receive
+		# make sure this cb is not overriden by recursive calls
+		if not recv_handler(None) and _tmp == self.on_receive:
+			self.on_receive = recv_handler
+	
+	def _on_receive_document_attrs(self, data):
+		if data:
+			self.Dispatcher.ProcessNonBlocking(data)
+		if not hasattr(self, 'Dispatcher') or \
+			self.Dispatcher.Stream._document_attrs is None:
+			return
+		self.onreceive(None)
+		if self.Dispatcher.Stream._document_attrs.has_key('version') and \
+			self.Dispatcher.Stream._document_attrs['version'] == '1.0':
+				#~ self.onreceive(self._on_receive_stream_features)
+				#XXX continue with TLS
+				return
+		self.onreceive(None)
+		return True
+	
+	def send_stanza(self, stanza):
+		'''Append stanza to the queue of messages to be send. 
+		If supplied data is unicode string, encode it to utf-8.
+		'''
+		if self.state <= 0:
+			return
+		r = stanza
+		if isinstance(r, unicode): 
+			r = r.encode('utf-8')
+		elif not isinstance(r, str): 
+			r = ustr(r).encode('utf-8')
+		self.sendqueue.append(r)
+		self._plug_idle()
+		
+	def read_timeout(self):
+		gajim.idlequeue.remove_timeout(self.fd)
+		# no activity for foo seconds
+		# self.pollend()
+	
+	def pollout(self):
+		if not self.connected:
+			self.disconnect2()
+			return
+		gajim.idlequeue.remove_timeout(self.fd)
+		self._do_send()
+		# self.idlequeue.plug_idle(self, False, True)
+	
+	def pollend(self):
+		self.state = -1
+		self.disconnect2()
+	
+	def pollin(self):
+		''' Reads all pending incoming data. Calls owner's disconnected() method if appropriate.'''
+		received = ''
+		errnum = 0
+		try: 
+			# get as many bites, as possible, but not more than RECV_BUFSIZE
+			received = self._recv(MAX_BUFF_LEN)
+		except Exception, e:
+			if len(e.args)  > 0 and isinstance(e.args[0], int):
+				errnum = e[0]
+			sys.exc_clear()
+			# "received" will be empty anyhow 
+		if errnum == socket.SSL_ERROR_WANT_READ:
+			pass
+		elif errnum in [errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN]:
+			self.pollend()
+			# don't proccess result, cas it will raise error
+			return
+		elif not received :
+			if errnum != socket.SSL_ERROR_EOF: 
+				# 8 EOF occurred in violation of protocol
+				self.pollend()
+			if self.state >= 0:
+				self.disconnect2()
+			return
+		
+		if self.state < 0:
+			return
+		if self.on_receive:
+			if hasattr(self._owner, 'Dispatcher'):
+				self._owner.Dispatcher.Event('', DATA_RECEIVED, received)
+			self.on_receive(received)
+		else:
+			# This should never happed, so we need the debug
+			self.DEBUG('Unhandled data received: %s' % received,'got')
+			self.disconnect2()
+			if self.on_connect_failure:
+				self.on_connect_failure()
+		return True
+	
+	def onreceive(self, recv_handler):
+		if not recv_handler:
+			if hasattr(self._owner, 'Dispatcher'):
+				self.on_receive = self._owner.Dispatcher.ProcessNonBlocking
+			else:
+				self.on_receive = None
+			return
+		_tmp = self.on_receive
+		# make sure this cb is not overriden by recursive calls
+		if not recv_handler(None) and _tmp == self.on_receive:
+			self.on_receive = recv_handler
+	
+	def disconnect2(self):
+		''' Closes the socket. '''
+		gajim.idlequeue.remove_timeout(self.fd)
+		gajim.idlequeue.unplug_idle(self.fd)
+		try:
+			self._sock.shutdown(socket.SHUT_RDWR)
+			self._sock.close()
+		except:
+			# socket is already closed
+			pass
+		self.connected = False
+		self.fd = -1
+		self.state = -1
+
+	def _do_send(self):
+		if not self.sendbuff:
+			if not self.sendqueue:
+				return None # nothing to send
+			self.sendbuff = self.sendqueue.pop(0)
+			self.sent_data = self.sendbuff
+		try:
+			send_count = self._send(self.sendbuff)
+			if send_count:
+				self.sendbuff = self.sendbuff[send_count:]
+				if not self.sendbuff and not self.sendqueue:
+					if self.state < 0:
+						gajim.idlequeue.unplug_idle(self.fd)
+						self._on_send()
+						self.disconnect2()
+						return
+					# we are not waiting for write 
+					self._plug_idle()
+				self._on_send()
+		except socket.error, e:
+			sys.exc_clear()
+			if e[0] == socket.SSL_ERROR_WANT_WRITE:
+				return True		
+			if self.state < 0:
+				self.disconnect2()
+				return
+			if self._on_send_failure:
+				self._on_send_failure()
+				return
+		return True
+	
+	def _plug_idle(self):
+		readable = self.state != 0
+		if self.sendqueue or self.sendbuff:
+			writable = True
+		else:
+			writable = False
+		if self.writable != writable or self.readable != readable:
+			gajim.idlequeue.plug_idle(self, writable, readable)
+	
+	
+	def _on_send(self):
+		if self.sent_data and self.sent_data.strip():
+			#~ self.DEBUG(self.sent_data,'sent')
+			if hasattr(self._owner, 'Dispatcher'):
+				self._owner.Dispatcher.Event('', DATA_SENT, self.sent_data)
+		self.sent_data  = None
+	
+	def _on_send_failure(self):
+		self.DEBUG("Socket error while sending data",'error')
+		self._owner.disconnected()
+		self.sent_data = None
+
+
 class ClientZeroconf:
-	def __init__(self, zeroconf):
+	def __init__(self, zeroconf, caller):
 		self.roster = roster_zeroconf.Roster(zeroconf)
+		self.caller = caller
+		self.start_listener(zeroconf.port)
+		
 		
+	def start_listener(self, port):
+		self.listener = ZeroconfListener(port, self.caller)
+		self.listener.bind()
+		if self.listener.started is False:
+			self.listener = None
+			# We cannot bind port, call error 
+			# dialog from dialogs.py and fail
+			BindPortError(port)
+			return None
+		#~ self.connected += 1
 	def getRoster(self):
 		return self.roster.getRoster()
 
diff --git a/src/common/zeroconf/connection_handlers_zeroconf.py b/src/common/zeroconf/connection_handlers_zeroconf.py
index 758a181e2e..8a4937e071 100644
--- a/src/common/zeroconf/connection_handlers_zeroconf.py
+++ b/src/common/zeroconf/connection_handlers_zeroconf.py
@@ -33,7 +33,7 @@ import common.xmpp
 from common import GnuPG
 from common import helpers
 from common import gajim
-
+from common.zeroconf import zeroconf
 STATUS_LIST = ['offline', 'connecting', 'online', 'chat', 'away', 'xa', 'dnd',
 	'invisible']
 # kind of events we can wait for an answer
@@ -227,6 +227,108 @@ class ConnectionHandlersZeroconf(ConnectionVcard):
 			idle.init()
 		except:
 			HAS_IDLE = False
+	def _messageCB(self, ip, con, msg):
+		'''Called when we receive a message'''
+		msgtxt = msg.getBody()
+		mtype = msg.getType()
+		subject = msg.getSubject() # if not there, it's None
+		tim = msg.getTimestamp()
+		tim = time.strptime(tim, '%Y%m%dT%H:%M:%S')
+		tim = time.localtime(timegm(tim))
+		frm = helpers.get_full_jid_from_iq(msg)
+		if frm == 'none':
+			for key in self.zeroconf.contacts:
+				if ip == self.zeroconf.contacts[key][zeroconf.C_ADDRESS]:
+					frm = key
+		jid = helpers.get_jid_from_iq(msg)
+		print 'jid', jid
+		no_log_for = gajim.config.get_per('accounts', self.name,
+			'no_log_for').split()
+		encrypted = False
+		chatstate = None
+		encTag = msg.getTag('x', namespace = common.xmpp.NS_ENCRYPTED)
+		decmsg = ''
+		# invitations
+		invite = None
+		if not encTag:
+			invite = msg.getTag('x', namespace = common.xmpp.NS_MUC_USER)
+			if invite and not invite.getTag('invite'):
+				invite = None
+		delayed = msg.getTag('x', namespace = common.xmpp.NS_DELAY) != None
+		msg_id = None
+		composing_jep = None
+		# FIXME: Msn transport (CMSN1.2.1 and PyMSN0.10) do NOT RECOMMENDED
+		# invitation
+		# stanza (MUC JEP) remove in 2007, as we do not do NOT RECOMMENDED
+		xtags = msg.getTags('x')
+		# chatstates - look for chatstate tags in a message if not delayed
+		if not delayed:
+			composing_jep = False
+			children = msg.getChildren()
+			for child in children:
+				if child.getNamespace() == 'http://jabber.org/protocol/chatstates':
+					chatstate = child.getName()
+					composing_jep = 'JEP-0085'
+					break
+			# No JEP-0085 support, fallback to JEP-0022
+			if not chatstate:
+				chatstate_child = msg.getTag('x', namespace = common.xmpp.NS_EVENT)
+				if chatstate_child:
+					chatstate = 'active'
+					composing_jep = 'JEP-0022'
+					if not msgtxt and chatstate_child.getTag('composing'):
+						chatstate = 'composing'
+		# JEP-0172 User Nickname
+		user_nick = msg.getTagData('nick')
+		if not user_nick:
+			user_nick = ''
+
+		if encTag and GnuPG.USE_GPG:
+			#decrypt
+			encmsg = encTag.getData()
+			
+			keyID = gajim.config.get_per('accounts', self.name, 'keyid')
+			if keyID:
+				decmsg = self.gpg.decrypt(encmsg, keyID)
+		if decmsg:
+			msgtxt = decmsg
+			encrypted = True
+		if mtype == 'error':
+			error_msg = msg.getError()
+			if not error_msg:
+				error_msg = msgtxt
+				msgtxt = None
+			if self.name not in no_log_for:
+				gajim.logger.write('error', frm, error_msg, tim = tim,
+					subject = subject)
+			self.dispatch('MSGERROR', (frm, msg.getErrorCode(), error_msg, msgtxt,
+				tim))
+		elif mtype == 'chat': # it's type 'chat'
+			if not msg.getTag('body') and chatstate is None: #no <body>
+				return
+			if msg.getTag('body') and self.name not in no_log_for and jid not in\
+				no_log_for and msgtxt:
+				msg_id = gajim.logger.write('chat_msg_recv', frm, msgtxt, tim = tim,
+					subject = subject)
+			self.dispatch('MSG', (frm, msgtxt, tim, encrypted, mtype, subject,
+				chatstate, msg_id, composing_jep, user_nick))
+		else: # it's single message
+			if self.name not in no_log_for and jid not in no_log_for and msgtxt:
+				gajim.logger.write('single_msg_recv', frm, msgtxt, tim = tim,
+					subject = subject)
+			if invite is not None:
+				item = invite.getTag('invite')
+				jid_from = item.getAttr('from')
+				if jid_from == None:
+					jid_from = frm
+				reason = item.getTagData('reason')
+				item = invite.getTag('password')
+				password = invite.getTagData('password')
+				self.dispatch('GC_INVITATION',(frm, jid_from, reason, password))
+			else:
+				self.dispatch('MSG', (frm, msgtxt, tim, encrypted, 'normal',
+					subject, chatstate, msg_id, composing_jep, user_nick))
+	# END messageCB
 	'''
 	def build_http_auth_answer(self, iq_obj, answer):
 		if answer == 'yes':
diff --git a/src/common/zeroconf/connection_zeroconf.py b/src/common/zeroconf/connection_zeroconf.py
index 1b2ee78963..8ab243490e 100644
--- a/src/common/zeroconf/connection_zeroconf.py
+++ b/src/common/zeroconf/connection_zeroconf.py
@@ -184,7 +184,7 @@ class ConnectionZeroconf(ConnectionHandlersZeroconf):
 			return self.connection, ''
 		
 		if self.zeroconf.connect():
-			self.connection = client_zeroconf.ClientZeroconf(self.zeroconf)
+			self.connection = client_zeroconf.ClientZeroconf(self.zeroconf, self)
 			self.roster = self.connection.getRoster()
 			self.dispatch('ROSTER', self.roster)
 
@@ -197,7 +197,7 @@ class ConnectionZeroconf(ConnectionHandlersZeroconf):
 
 			# refresh all contacts data every second
 			self.call_resolve_timeout = True
-			gobject.timeout_add(1000, self._on_resolve_timeout)
+			gobject.timeout_add(10000, self._on_resolve_timeout)
 		else:
 			pass
 			#TODO: display visual notification that we could not connect to avahi
@@ -487,7 +487,13 @@ class ConnectionZeroconf(ConnectionHandlersZeroconf):
 		
 	def send_keepalive(self):
 		# nothing received for the last foo seconds (60 secs by default)
-		if self.connection:
-			self.connection.send(' ')
+		pass
 		
+	def _event_dispatcher(self, realm, event, data):
+		if realm == '':
+			if event == common.xmpp.transports.DATA_RECEIVED:
+				self.dispatch('STANZA_ARRIVED', unicode(data, errors = 'ignore'))
+			elif event == common.xmpp.transports.DATA_SENT:
+				self.dispatch('STANZA_SENT', unicode(data))
+
 # END ConnectionZeroconf
diff --git a/src/common/zeroconf/zeroconf.py b/src/common/zeroconf/zeroconf.py
index 9aea0a97ee..3b95977842 100755
--- a/src/common/zeroconf/zeroconf.py
+++ b/src/common/zeroconf/zeroconf.py
@@ -300,7 +300,7 @@ class Zeroconf:
 			return False
 
 	def send (self, msg, sock):
-		print 'send:'+msg
+		print 'send:', msg
 		totalsent = 0
 		while totalsent < len(msg):
 			sent = sock.send(msg[totalsent:])
@@ -309,13 +309,14 @@ class Zeroconf:
 			totalsent = totalsent + sent
 
 	def send_message(self, jid, msg, type = 'chat'):
-		print 'zeroconf.py: send_message:'+ msg
-		
+		print 'zeroconf.py: send_message:', msg
+		if not msg :
+			return
 		sock = socket.socket ( socket.AF_INET, socket.SOCK_STREAM )
 		#sock.setblocking(False)
 		sock.connect ( ( self.contacts[jid][C_ADDRESS], self.contacts[jid][C_PORT] ) )
 		
-		#print (self.txt_array_to_dict(self.contacts[jid][C_TXT]))['port.p2pj']
+		print (self.txt_array_to_dict(self.contacts[jid][C_TXT]))['port.p2pj']
 
 		#was for adium which uses the txt record
 		#sock.connect ( ( self.contacts[jid][5], int((self.txt_array_to_dict(self.contacts[jid][7]))['port.p2pj']) ) )
-- 
GitLab