##      common/zeroconf/client_zeroconf.py
##
## Copyright (C) 2006 Stefan Bethge <stefan@lanpartei.de>
## 				2006 Dimitur Kirov <dkirov@gmail.com>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published
## by the Free Software Foundation; version 2 only.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##

from common import gajim
from common.xmpp.idlequeue import IdleObject
dkirov's avatar
dkirov committed
17
from common.xmpp import dispatcher_nb, simplexml
18
19
20
from common.xmpp.client import *
from common.xmpp.simplexml import ustr
from dialogs import BindPortError
dkirov's avatar
dkirov committed
21
from  common.xmpp.protocol import *
22
23
import socket
import errno
sb's avatar
sb committed
24

sb's avatar
sb committed
25
from common.zeroconf import roster_zeroconf
sb's avatar
sb committed
26

27
28
29
# upper bound on the number of bytes requested from a socket per recv() call
MAX_BUFF_LEN = 65536
# event names passed to Dispatcher.Event() for raw traffic
DATA_RECEIVED = 'DATA RECEIVED'
DATA_SENT = 'DATA SENT'
# which side created the socket wrapped by a P2PConnection
TYPE_SERVER = 0
TYPE_CLIENT = 1
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

class ZeroconfListener(IdleObject):
	''' Listening TCP socket on ('0.0.0.0', port). Every accepted
	connection is wrapped in a new P2PClient. '''

	def __init__(self, port, caller = None):
		''' Store the settings; no socket is opened until bind() is called.

		port   -- TCP port to listen on
		caller -- object passed on to every P2PClient created in pollin()
		'''
		self.port = port
		self.queue_idx = -1
		#~ self.queue = None
		self.started = False
		self._sock = None
		self.fd = -1
		self.caller = caller

	def bind(self):
		''' Open the listening socket and plug it into gajim.idlequeue.

		Always returns None. Success is reported through self.started,
		which stays False when the port cannot be bound. '''
		serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		self._serv = serv
		serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
		serv.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
		serv.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
		# will fail when the port is busy, or we don't have rights to bind
		try:
			serv.bind(('0.0.0.0', self.port))
		except Exception:
			# unable to bind: release the descriptor instead of leaking it;
			# the caller sees that self.started is still False
			serv.close()
			return None
		serv.listen(socket.SOMAXCONN)
		serv.setblocking(False)
		self.fd = serv.fileno()
		# plug_idle(obj, writable, readable): we only wait for readability
		gajim.idlequeue.plug_idle(self, False, True)
		self.started = True

	def pollend(self):
		''' called when we stop listening on (host, port) '''
		self.disconnect()

	def pollin(self):
		''' accept a new incoming connection and wrap it in a P2PClient '''
		try:
			conn, address = self.accept_conn()
		except socket.error:
			# a non-blocking accept() can still fail after the socket was
			# reported readable (the pending connection may be gone again);
			# there is nothing to hand over in that case
			return
		# address is the (host, port) pair of the remote peer
		P2PClient(conn, address[0], address[1], self.caller)

	def disconnect(self):
		''' free all resources, we are not listening anymore '''
		gajim.idlequeue.remove_timeout(self.fd)
		gajim.idlequeue.unplug_idle(self.fd)
		self.fd = -1
		self.started = False
		try:
			self._serv.close()
		except (AttributeError, socket.error):
			# bind() was never called, or the socket could not be closed
			pass
		#XXX kill all active connection

	def accept_conn(self):
		''' Accept a new incoming connection.

		Returns the (socket, address) pair from accept(); the new socket
		is switched to non-blocking mode. '''
		accepted = self._serv.accept()
		accepted[0].setblocking(False)
		return accepted


dkirov's avatar
dkirov committed
88
89
90
91
92
93

class P2PClient(IdleObject):
	''' Owner object of one peer-to-peer stream: holds the P2PConnection
	and the dispatcher plugged into it and forwards received messages
	to the caller. '''

	def __init__(self, _sock, host, port, caller):
		''' Set up debugging and create the P2PConnection for _sock.

		_sock  -- socket of the peer, handed to P2PConnection
		host   -- address of the peer, also stored as self.Server
		port   -- port of the peer
		caller -- object whose _messageCB() receives incoming messages
		'''
		self._owner = self
		self.Namespace = 'jabber:client'
		self.defaultNamespace = self.Namespace
		self._component = 0
		self._registered_name = None
		self._caller = caller
		self.Server = host
		self.DBG = 'client'
		debug = ['always', 'nodebuilder']
		self._DEBUG = Debug.Debug(debug)
		self.DEBUG = self._DEBUG.Show
		self.debug_flags = self._DEBUG.debug_flags
		self.debug_flags.append(self.DBG)
		# the connection reports back through self.on_connect
		P2PConnection('', _sock, host, port, caller, self.on_connect)

	def on_connect(self, conn):
		''' Callback for P2PConnection: plug the connection and a new
		dispatcher into this client and subscribe to message stanzas. '''
		self.Connection = conn
		self.Connection.PlugIn(self)
		dispatcher_nb.Dispatcher().PlugIn(self)
		# presumably RegisterHandler is exported to us by the dispatcher
		# that was plugged in on the previous line
		self.RegisterHandler('message', self._messageCB)

	def StreamInit(self):
		''' Send an initial stream header. '''
		dispatcher = self.Dispatcher
		dispatcher.Stream = simplexml.NodeBuilder()
		stream = dispatcher.Stream
		stream._dispatch_depth = 2
		stream.dispatch = dispatcher.dispatch
		stream.stream_header_received = dispatcher._check_stream_start
		self.debug_flags.append(simplexml.DBG_NODEBUILDER)
		stream.DEBUG = self.DEBUG
		stream.features = None
		dispatcher._metastream = Node('stream:stream')
		dispatcher._metastream.setNamespace(self.Namespace)
		#~ self._metastream.setAttr('version', '1.0')
		dispatcher._metastream.setAttr('xmlns:stream', NS_STREAMS)
		#~ self._metastream.setAttr('to', self._owner.Server)
		# drop the last two characters of the serialized node and close the
		# tag with '>' instead (assumes str() renders the childless node as
		# a self-closing tag), so that the stream element stays open
		header = str(dispatcher._metastream)[:-2]
		dispatcher.send("<?xml version='1.0'?>%s>" % header)

	def disconnected(self):
		''' Called by the connection when the socket is closed: plug out
		whichever of Dispatcher and P2PConnection are still plugged in. '''
		# NOTE(review): if PlugOut() of the connection ends up calling its
		# disconnect(), this method is entered again -- confirm that this
		# cannot recurse without bound
		if 'Dispatcher' in self.__dict__:
			self.Dispatcher.PlugOut()
		if 'P2PConnection' in self.__dict__:
			self.P2PConnection.PlugOut()

	def _on_receive_document_attrs(self, data):
		''' Receive handler used until the stream header has been parsed.

		Returns True when the header has been seen and its version is not
		'1.0', otherwise None. '''
		# the Dispatcher is plugged in after the Connection (see on_connect),
		# so this handler can be invoked before it exists
		if not hasattr(self, 'Dispatcher'):
			return
		if data:
			self.Dispatcher.ProcessNonBlocking(data)
			if not hasattr(self, 'Dispatcher'):
				# the Dispatcher may have been plugged out while processing
				return
		attrs = self.Dispatcher.Stream._document_attrs
		if attrs is None:
			# stream header not seen yet, keep waiting
			return
		# header received: switch back to the default receive handler
		self.onreceive(None)
		if attrs.has_key('version') and attrs['version'] == '1.0':
			#~ self.onreceive(self._on_receive_stream_features)
			#XXX continue with TLS
			return
		return True

	def _messageCB(self, conn, data):
		''' Forward a received message stanza to the caller, together with
		the address of the peer it came from. '''
		self._caller._messageCB(self.Server, conn, data)
		
		
154
155
class P2PConnection(IdleObject, PlugIn):
	''' class for sending file to socket over socks5 '''
dkirov's avatar
dkirov committed
156
	def __init__(self, sock_hash, _sock, host = None, port = None, caller = None, on_connect = None):
dkirov's avatar
dkirov committed
157
		IdleObject.__init__(self)
158
		PlugIn.__init__(self)
dkirov's avatar
dkirov committed
159
		self.DBG_LINE='socket'
160
161
162
		self.sendqueue = []
		self.sendbuff = None
		self._sock = _sock
dkirov's avatar
dkirov committed
163
164
		self.host, self.port = host, port
		self.on_connect = on_connect
165
166
		self.writable = False
		self.readable = False
dkirov's avatar
dkirov committed
167
		self._exported_methods=[self.send, self.disconnect, self.onreceive]
168
		self.on_receive = None
dkirov's avatar
dkirov committed
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
		if _sock:
			self.sock_type = TYPE_SERVER
			self.connected = True
			self.state = 1 
			_sock.setblocking(False)
			self.fd = _sock.fileno()
			self.on_connect(self)
		else:
			self.sock_type = TYPE_CLIENT
			self.connected = False
			self.state = 0
			self.idlequeue.plug_idle(self, True, False)
			self.do_connect()
			
		
dkirov's avatar
dkirov committed
184
185
186
187
		
		
	def plugin(self, owner):
		self.onreceive(owner._on_receive_document_attrs)
dkirov's avatar
dkirov committed
188
		self._plug_idle()
dkirov's avatar
dkirov committed
189
		return True
190
	
dkirov's avatar
dkirov committed
191
192
193
194
195
196
	def plugout(self):
		''' Disconnect from the remote server and unregister self.disconnected method from
			the owner's dispatcher. '''
		self.disconnect()
		self._owner.Connection = None
		self._owner = None
197
198
199
200
201
202
203
204
205
206
207
208
209
210
	
	def onreceive(self, recv_handler):
		if not recv_handler:
			if hasattr(self._owner, 'Dispatcher'):
				self.on_receive = self._owner.Dispatcher.ProcessNonBlocking
			else:
				self.on_receive = None
			return
		_tmp = self.on_receive
		# make sure this cb is not overriden by recursive calls
		if not recv_handler(None) and _tmp == self.on_receive:
			self.on_receive = recv_handler
	
	
dkirov's avatar
dkirov committed
211
212
	
	def send(self, stanza):
213
214
215
216
217
218
219
220
		'''Append stanza to the queue of messages to be send. 
		If supplied data is unicode string, encode it to utf-8.
		'''
		if self.state <= 0:
			return
		r = stanza
		if isinstance(r, unicode): 
			r = r.encode('utf-8')
dkirov's avatar
dkirov committed
221
222
		elif not isinstance(r, str): 
			r = ustr(r).encode('utf-8')
223
224
225
226
227
228
229
230
		self.sendqueue.append(r)
		self._plug_idle()
		
	def read_timeout(self):
		gajim.idlequeue.remove_timeout(self.fd)
		# no activity for foo seconds
		# self.pollend()
	
dkirov's avatar
dkirov committed
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
	
	def do_connect(self):
		try:
			self._sock.connect((self.host, self.port))
			self._sock.setblocking(False)
		except Exception, ee:
			(errnum, errstr) = ee
			if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK): 
				return
			# win32 needs this
			elif errnum not in  (10056, errno.EISCONN) or self.state != 0:
				self.disconnect()
				return None
			else: # socket is already connected
				self._sock.setblocking(False)
		self.connected = True
		self.state = 1 # connected
		self.on_connect(self)
		return 1 # we are connected
	
	
252
253
	def pollout(self):
		if not self.connected:
dkirov's avatar
dkirov committed
254
255
256
257
			self.disconnect()
			return
		if self.state == 0:
			self.do_connect()
258
259
260
261
262
263
			return
		gajim.idlequeue.remove_timeout(self.fd)
		self._do_send()
	
	def pollend(self):
		self.state = -1
dkirov's avatar
dkirov committed
264
		self.disconnect()
265
266
267
268
269
270
271
	
	def pollin(self):
		''' Reads all pending incoming data. Calls owner's disconnected() method if appropriate.'''
		received = ''
		errnum = 0
		try: 
			# get as many bites, as possible, but not more than RECV_BUFSIZE
dkirov's avatar
dkirov committed
272
			received = self._sock.recv(MAX_BUFF_LEN)
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
		except Exception, e:
			if len(e.args)  > 0 and isinstance(e.args[0], int):
				errnum = e[0]
			sys.exc_clear()
			# "received" will be empty anyhow 
		if errnum == socket.SSL_ERROR_WANT_READ:
			pass
		elif errnum in [errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN]:
			self.pollend()
			# don't proccess result, cas it will raise error
			return
		elif not received :
			if errnum != socket.SSL_ERROR_EOF: 
				# 8 EOF occurred in violation of protocol
				self.pollend()
			if self.state >= 0:
dkirov's avatar
dkirov committed
289
				self.disconnect()
290
291
292
293
294
			return
		
		if self.state < 0:
			return
		if self.on_receive:
dkirov's avatar
dkirov committed
295
296
			if received.strip():
				self.DEBUG(received, 'got')
297
298
299
300
301
302
			if hasattr(self._owner, 'Dispatcher'):
				self._owner.Dispatcher.Event('', DATA_RECEIVED, received)
			self.on_receive(received)
		else:
			# This should never happed, so we need the debug
			self.DEBUG('Unhandled data received: %s' % received,'got')
dkirov's avatar
dkirov committed
303
			self.disconnect()
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
			if self.on_connect_failure:
				self.on_connect_failure()
		return True
	
	def onreceive(self, recv_handler):
		if not recv_handler:
			if hasattr(self._owner, 'Dispatcher'):
				self.on_receive = self._owner.Dispatcher.ProcessNonBlocking
			else:
				self.on_receive = None
			return
		_tmp = self.on_receive
		# make sure this cb is not overriden by recursive calls
		if not recv_handler(None) and _tmp == self.on_receive:
			self.on_receive = recv_handler
	
dkirov's avatar
dkirov committed
320
	def disconnect(self):
321
322
323
324
325
326
327
328
329
330
331
332
		''' Closes the socket. '''
		gajim.idlequeue.remove_timeout(self.fd)
		gajim.idlequeue.unplug_idle(self.fd)
		try:
			self._sock.shutdown(socket.SHUT_RDWR)
			self._sock.close()
		except:
			# socket is already closed
			pass
		self.connected = False
		self.fd = -1
		self.state = -1
dkirov's avatar
dkirov committed
333
		self._owner.disconnected()
334
335
336
337
338
339
340
341

	def _do_send(self):
		if not self.sendbuff:
			if not self.sendqueue:
				return None # nothing to send
			self.sendbuff = self.sendqueue.pop(0)
			self.sent_data = self.sendbuff
		try:
dkirov's avatar
dkirov committed
342
			send_count = self._sock.send(self.sendbuff)
343
344
345
346
347
348
			if send_count:
				self.sendbuff = self.sendbuff[send_count:]
				if not self.sendbuff and not self.sendqueue:
					if self.state < 0:
						gajim.idlequeue.unplug_idle(self.fd)
						self._on_send()
dkirov's avatar
dkirov committed
349
						self.disconnect()
350
351
352
353
354
355
356
357
358
						return
					# we are not waiting for write 
					self._plug_idle()
				self._on_send()
		except socket.error, e:
			sys.exc_clear()
			if e[0] == socket.SSL_ERROR_WANT_WRITE:
				return True		
			if self.state < 0:
dkirov's avatar
dkirov committed
359
				self.disconnect()
360
				return
dkirov's avatar
dkirov committed
361
362
			self._on_send_failure()
			return
363
364
365
366
367
368
369
370
371
372
373
374
375
376
		return True
	
	def _plug_idle(self):
		readable = self.state != 0
		if self.sendqueue or self.sendbuff:
			writable = True
		else:
			writable = False
		if self.writable != writable or self.readable != readable:
			gajim.idlequeue.plug_idle(self, writable, readable)
	
	
	def _on_send(self):
		if self.sent_data and self.sent_data.strip():
dkirov's avatar
dkirov committed
377
			self.DEBUG(self.sent_data,'sent')
378
379
380
381
382
383
384
385
386
387
			if hasattr(self._owner, 'Dispatcher'):
				self._owner.Dispatcher.Event('', DATA_SENT, self.sent_data)
		self.sent_data  = None
	
	def _on_send_failure(self):
		self.DEBUG("Socket error while sending data",'error')
		self._owner.disconnected()
		self.sent_data = None


sb's avatar
sb committed
388
class ClientZeroconf:
	''' Entry point of the zeroconf client: owns the roster and the
	listener for incoming peer-to-peer connections. '''

	def __init__(self, zeroconf, caller):
		''' Build the roster for zeroconf and start listening on
		zeroconf.port; caller is handed on to the listener. '''
		self.roster = roster_zeroconf.Roster(zeroconf)
		self.caller = caller
		self.start_listener(zeroconf.port)

	def start_listener(self, port):
		''' Create a ZeroconfListener for port and bind it.

		When the port cannot be bound, self.listener is reset to None and
		BindPortError(port) from dialogs.py is called. Returns None. '''
		self.listener = ZeroconfListener(port, self.caller)
		self.listener.bind()
		if self.listener.started is not False:
			#~ self.connected += 1
			return
		# We cannot bind the port, call the error dialog and fail
		self.listener = None
		BindPortError(port)
		return None

	def getRoster(self):
		''' Return the result of getRoster() of the zeroconf roster. '''
		return self.roster.getRoster()

	def send(self, str):
		''' Placeholder, does nothing. '''
		pass