jingle_rtp.py 18.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13
# This file is part of Gajim.
#
# Gajim is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 3 only.
#
# Gajim is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Gajim. If not, see <http://www.gnu.org/licenses/>.
14

15 16 17
"""
Handles Jingle RTP sessions (XEP 0167)
"""
18

19
import logging
20
import socket
21
import nbxmpp
22
import gi
Philipp Hörist's avatar
Philipp Hörist committed
23 24
from gi.repository import Farstream

25 26 27
gi.require_version('Gst', '1.0')
from gi.repository import Gst
from gi.repository import GLib
28

29
from gajim.common import app
30

André's avatar
André committed
31 32 33 34
from gajim.common.jingle_transport import JingleTransportICEUDP
from gajim.common.jingle_content import contents, JingleContent, JingleContentSetupException
from gajim.common.connection_handlers_events import InformationEvent
from gajim.common.jingle_session import FailedApplication
35

Philipp Hörist's avatar
Philipp Hörist committed
36
from collections import deque
37

38 39 40
log = logging.getLogger('gajim.c.jingle_rtp')


41
class JingleRTPContent(JingleContent):
42 43
    def __init__(self, session, media, transport=None):
        if transport is None:
44
            transport = JingleTransportICEUDP(None)
45
        JingleContent.__init__(self, session, transport, None)
46 47
        self.media = media
        self._dtmf_running = False
48 49
        self.farstream_media = {
            'audio': Farstream.MediaType.AUDIO,
50
            'video': Farstream.MediaType.VIDEO}[media]
51

52 53 54 55
        self.pipeline = None
        self.src_bin = None
        self.stream_failed_once = False

56 57
        self.candidates_ready = False # True when local candidates are prepared

58 59 60 61 62 63
        # TODO
        self.conference = None
        self.funnel = None
        self.p2psession = None
        self.p2pstream = None

64 65 66
        self.callbacks['session-initiate'] += [self.__on_remote_codecs]
        self.callbacks['content-add'] += [self.__on_remote_codecs]
        self.callbacks['description-info'] += [self.__on_remote_codecs]
67 68
        self.callbacks['content-accept'] += [self.__on_remote_codecs]
        self.callbacks['session-accept'] += [self.__on_remote_codecs]
69 70 71
        self.callbacks['session-terminate'] += [self.__stop]
        self.callbacks['session-terminate-sent'] += [self.__stop]

72
    def setup_stream(self, on_src_pad_added):
73
        # pipeline and bus
74
        self.pipeline = Gst.Pipeline()
75 76 77 78 79
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect('message', self._on_gst_message)

        # conference
80
        self.conference = Gst.ElementFactory.make('fsrtpconference', None)
81 82 83
        self.pipeline.add(self.conference)
        self.funnel = None

84
        self.p2psession = self.conference.new_session(self.farstream_media)
85

86
        participant = self.conference.new_participant()
87 88 89
        # FIXME: Consider a workaround, here...
        # pidgin and telepathy-gabble don't follow the XEP, and it won't work
        # due to bad controlling-mode
90

91
        params = {'controlling-mode': self.session.weinitiate, 'debug': False}
92 93
        if app.config.get('use_stun_server'):
            stun_server = app.config.get('stun_server')
94 95 96 97 98
            if not stun_server and self.session.connection._stun_servers:
                stun_server = self.session.connection._stun_servers[0]['host']
            if stun_server:
                try:
                    ip = socket.getaddrinfo(stun_server, 0, socket.AF_UNSPEC,
99
                                            socket.SOCK_STREAM)[0][4][0]
100
                except socket.gaierror as e:
101
                    log.warning('Lookup of stun ip failed: %s', str(e))
102 103 104 105
                else:
                    params['stun-ip'] = ip

        self.p2pstream = self.p2psession.new_stream(participant,
106
                                                    Farstream.StreamDirection.BOTH)
107
        self.p2pstream.connect('src-pad-added', on_src_pad_added)
108
        self.p2pstream.set_transmitter_ht('nice', params)
109 110

    def is_ready(self):
111
        return JingleContent.is_ready(self) and self.candidates_ready
112 113

    def make_bin_from_config(self, config_key, pipeline, text):
114
        pipeline = pipeline % app.config.get(config_key)
115
        try:
116 117
            gst_bin = Gst.parse_bin_from_description(pipeline, True)
            return gst_bin
118
        except GLib.GError as e:
119
            app.nec.push_incoming_event(
120 121 122
                InformationEvent(
                    None, conn=self.session.connection, level='error',
                    pri_txt=_('%s configuration error') % text.capitalize(),
Yann Leboulanger's avatar
Yann Leboulanger committed
123
                    sec_txt=_('Couldn’t set up %(text)s. Check your '
124 125 126
                    'configuration.\n\nPipeline was:\n%(pipeline)s\n\n'
                    'Error was:\n%(error)s') % {'text': text,
                    'pipeline': pipeline, 'error': str(e)}))
127 128 129 130 131 132 133
            raise JingleContentSetupException

    def add_remote_candidates(self, candidates):
        JingleContent.add_remote_candidates(self, candidates)
        # FIXME: connectivity should not be etablished yet
        # Instead, it should be etablished after session-accept!
        if self.sent:
134
            self.p2pstream.add_remote_candidates(candidates)
135 136 137 138 139 140

    def batch_dtmf(self, events):
        """
        Send several DTMF tones
        """
        if self._dtmf_running:
141
            raise Exception("There is a DTMF batch already running")
142
        events = deque(events)
143
        self._dtmf_running = True
144
        self._start_dtmf(events.popleft())
Yann Leboulanger's avatar
Yann Leboulanger committed
145
        GLib.timeout_add(500, self._next_dtmf, events)
146 147 148 149

    def _next_dtmf(self, events):
        self._stop_dtmf()
        if events:
150
            self._start_dtmf(events.popleft())
Yann Leboulanger's avatar
Yann Leboulanger committed
151
            GLib.timeout_add(500, self._next_dtmf, events)
152 153 154 155 156
        else:
            self._dtmf_running = False

    def _start_dtmf(self, event):
        if event in ('*', '#'):
157
            event = {'*': Farstream.DTMFEvent.STAR,
158
                     '#': Farstream.DTMFEvent.POUND}[event]
159 160
        else:
            event = int(event)
161
        self.p2psession.start_telephony_event(event, 2)
162 163

    def _stop_dtmf(self):
164
        self.p2psession.stop_telephony_event()
165 166

    def _fill_content(self, content):
167
        content.addChild(nbxmpp.NS_JINGLE_RTP + ' description',
168 169
                         attrs={'media': self.media},
                         payload=list(self.iter_codecs()))
170 171

    def _setup_funnel(self):
172
        self.funnel = Gst.ElementFactory.make('funnel', None)
173 174
        self.pipeline.add(self.funnel)
        self.funnel.link(self.sink)
175 176
        self.sink.set_state(Gst.State.PLAYING)
        self.funnel.set_state(Gst.State.PLAYING)
177 178 179 180

    def _on_src_pad_added(self, stream, pad, codec):
        if not self.funnel:
            self._setup_funnel()
181
        pad.link(self.funnel.get_request_pad('sink_%u'))
182 183

    def _on_gst_message(self, bus, message):
184 185
        if message.type == Gst.MessageType.ELEMENT:
            name = message.get_structure().get_name()
186
            log.debug('gst element message: %s: %s', name, message)
187
            if name == 'farstream-new-active-candidate-pair':
188
                pass
189
            elif name == 'farstream-recv-codecs-changed':
190
                pass
191
            elif name == 'farstream-codecs-changed':
192
                if self.sent and self.p2psession.props.codecs_without_config:
Thibg's avatar
Thibg committed
193
                    self.send_description_info()
194 195 196 197 198 199 200
                    if self.transport.remote_candidates:
                        # those lines MUST be done after we get info on our
                        # codecs
                        self.p2pstream.add_remote_candidates(
                            self.transport.remote_candidates)
                        self.transport.remote_candidates = []
                        self.p2pstream.set_property('direction',
201
                                                    Farstream.StreamDirection.BOTH)
202

203
            elif name == 'farstream-local-candidates-prepared':
204 205 206
                self.candidates_ready = True
                if self.is_ready():
                    self.session.on_session_state_changed(self)
207
            elif name == 'farstream-new-local-candidate':
208
                candidate = self.p2pstream.parse_new_local_candidate(message)[1]
209
                self.transport.candidates.append(candidate)
210
                if self.sent:
211 212
                    # FIXME: Is this case even possible?
                    self.send_candidate(candidate)
213
            elif name == 'farstream-component-state-changed':
214 215
                state = message.get_structure().get_value('state')
                if state == Farstream.StreamState.FAILED:
216
                    reason = nbxmpp.Node('reason')
217
                    reason.setTag('failed-transport')
218
                    self.session.remove_content(self.creator, self.name, reason)
219
            elif name == 'farstream-error':
220 221 222
                log.error('Farstream error #%d!\nMessage: %s',
                          message.get_structure().get_value('error-no'),
                          message.get_structure().get_value('error-msg'))
223
        elif message.type == Gst.MessageType.ERROR:
224 225 226 227
            # TODO: Fix it to fallback to videotestsrc anytime an error occur,
            # or raise an error, Jingle way
            # or maybe one-sided stream?
            if not self.stream_failed_once:
228
                app.nec.push_incoming_event(
229
                    InformationEvent(
230 231 232
                        None, dialog_name='gstreamer-error',
                        kwargs={'error': message.get_structure().get_value('gerror'),
                                'debug': message.get_structure().get_value('debug')}))
233 234 235 236

            sink_pad = self.p2psession.get_property('sink-pad')

            # Remove old source
237 238
            self.src_bin.get_static_pad('src').unlink(sink_pad)
            self.src_bin.set_state(Gst.State.NULL)
239 240 241 242 243 244
            self.pipeline.remove(self.src_bin)

            if not self.stream_failed_once:
                # Add fallback source
                self.src_bin = self.get_fallback_src()
                self.pipeline.add(self.src_bin)
245
                self.src_bin.get_static_pad('src').link(sink_pad)
246 247
                self.stream_failed_once = True
            else:
248
                reason = nbxmpp.Node('reason')
249
                reason.setTag('failed-application')
250
                self.session.remove_content(self.creator, self.name, reason)
251 252

            # Start playing again
253
            self.pipeline.set_state(Gst.State.PLAYING)
254

255 256
    @staticmethod
    def get_fallback_src():
257
        return Gst.ElementFactory.make('fakesrc', None)
258

259
    def on_negotiated(self):
260
        if self.accepted:
261 262 263 264 265 266
            if self.p2psession.get_property('codecs'):
                # those lines MUST be done after we get info on our codecs
                if self.transport.remote_candidates:
                    self.p2pstream.add_remote_candidates(
                        self.transport.remote_candidates)
                    self.transport.remote_candidates = []
267 268 269
                    # TODO: Farstream.StreamDirection.BOTH only if senders='both'
#                    self.p2pstream.set_property('direction',
#                        Farstream.StreamDirection.BOTH)
270
        JingleContent.on_negotiated(self)
271 272 273 274 275 276 277 278

    def __on_remote_codecs(self, stanza, content, error, action):
        """
        Get peer codecs from what we get from peer
        """

        codecs = []
        for codec in content.getTag('description').iterTags('payload-type'):
279 280 281
            if not codec['id'] or not codec['name'] or not codec['clockrate']:
                # ignore invalid payload-types
                continue
282
            c = Farstream.Codec.new(int(codec['id']), codec['name'],
283
                                    self.farstream_media, int(codec['clockrate']))
284 285 286 287
            if 'channels' in codec:
                c.channels = int(codec['channels'])
            else:
                c.channels = 1
288 289
            for p in codec.iterTags('parameter'):
                c.add_optional_parameter(p['name'], str(p['value']))
290 291 292
            codecs.append(c)

        if codecs:
293 294 295 296
            try:
                self.p2pstream.set_remote_codecs(codecs)
            except GLib.Error:
                raise FailedApplication
297 298

    def iter_codecs(self):
299
        codecs = self.p2psession.props.codecs_without_config
300
        for codec in codecs:
301 302
            attrs = {
                'name': codec.encoding_name,
303
                'id': codec.id,
304 305
                'channels': codec.channels
            }
306 307 308
            if codec.clock_rate:
                attrs['clockrate'] = codec.clock_rate
            if codec.optional_params:
309 310 311
                payload = [nbxmpp.Node('parameter',
                                       {'name': p.name, 'value': p.value})
                           for p in codec.optional_params]
312
            else:
313
                payload = []
314
            yield nbxmpp.Node('payload-type', attrs, payload)
315 316

    def __stop(self, *things):
317
        self.pipeline.set_state(Gst.State.NULL)
318 319 320 321 322 323 324 325

    def __del__(self):
        self.__stop()

    def destroy(self):
        JingleContent.destroy(self)
        self.p2pstream.disconnect_by_func(self._on_src_pad_added)
        self.pipeline.get_bus().disconnect_by_func(self._on_gst_message)
326 327 328


class JingleAudio(JingleRTPContent):
329 330 331 332
    """
    Jingle VoIP sessions consist of audio content transported over an ICE UDP
    protocol
    """
333

334 335 336
    def __init__(self, session, transport=None):
        JingleRTPContent.__init__(self, session, 'audio', transport)
        self.setup_stream()
337

338 339 340 341 342
    def set_mic_volume(self, vol):
        """
        vol must be between 0 ans 1
        """
        self.mic_volume.set_property('volume', vol)
343

344 345 346 347 348
    def set_out_volume(self, vol):
        """
        vol must be between 0 ans 1
        """
        self.out_volume.set_property('volume', vol)
349

350
    def setup_stream(self):
351
        JingleRTPContent.setup_stream(self, self._on_src_pad_added)
352

353 354 355 356 357
        # Configure SPEEX
        # Workaround for psi (not needed since rev
        # 147aedcea39b43402fe64c533d1866a25449888a):
        #  place 16kHz before 8kHz, as buggy psi versions will take in
        #  account only the first codec
358

359 360 361
        codecs = [
            Farstream.Codec.new(Farstream.CODEC_ID_ANY, 'SPEEX',
                                Farstream.MediaType.AUDIO, 16000),
362
            Farstream.Codec.new(Farstream.CODEC_ID_ANY, 'SPEEX',
363
                                Farstream.MediaType.AUDIO, 8000)]
364
        self.p2psession.set_codec_preferences(codecs)
365

366 367
        # the local parts
        # TODO: Add queues?
368
        self.src_bin = self.make_bin_from_config('audio_input_device',
369 370
                                                 '%s ! audioconvert',
                                                 _("audio input"))
371

372
        self.sink = self.make_bin_from_config('audio_output_device',
373 374
                                              'audioconvert ! volume name=gajim_out_vol ! %s',
                                              _("audio output"))
375

376
        self.mic_volume = self.src_bin.get_by_name('gajim_vol')
377
        self.out_volume = self.sink.get_by_name('gajim_out_vol')
378

379
        # link gst elements
380 381
        self.pipeline.add(self.sink)
        self.pipeline.add(self.src_bin)
382

383 384
        self.src_bin.get_static_pad('src').link(self.p2psession.get_property(
            'sink-pad'))
385

386
        # The following is needed for farstream to process ICE requests:
387
        self.pipeline.set_state(Gst.State.PLAYING)
388 389 390


class JingleVideo(JingleRTPContent):
391
    def __init__(self, session, transport=None, in_xid=0, out_xid=0):
392
        JingleRTPContent.__init__(self, session, 'video', transport)
393 394 395
        self.in_xid = in_xid
        self.out_xid = out_xid
        self.out_xid_set = False
396
        self.setup_stream()
397

398 399 400 401
    def setup_stream(self):
        # TODO: Everything is not working properly:
        # sometimes, one window won't show up,
        # sometimes it'll freeze...
402
        JingleRTPContent.setup_stream(self, self._on_src_pad_added)
403
        bus = self.pipeline.get_bus()
404
        bus.enable_sync_message_emission()
405
        bus.connect('sync-message::element', self._on_sync_message)
406

407
        # the local parts
408
        if app.config.get('video_framerate'):
409
            framerate = 'videorate ! video/x-raw,framerate=%s ! ' % \
410
                app.config.get('video_framerate')
411 412 413
        else:
            framerate = ''
        try:
414
            w, h = app.config.get('video_size').split('x')
415
        except Exception:
416 417
            w = h = None
        if w and h:
418
            video_size = 'video/x-raw,width=%s,height=%s ! ' % (w, h)
419 420
        else:
            video_size = ''
421
        if app.config.get('video_see_self'):
422 423
            tee = '! tee name=t ! queue ! videoscale ! ' + \
                'video/x-raw,width=160,height=120 ! videoconvert ! ' + \
424
                '%s t. ! queue ' % app.config.get(
425 426 427 428
                    'video_output_device')
        else:
            tee = ''

429
        self.src_bin = self.make_bin_from_config('video_input_device',
430 431 432
                                                 '%%s %s! %svideoscale ! %svideoconvert' %
                                                 (tee, framerate, video_size),
                                                 _("video input"))
433

434
        self.pipeline.add(self.src_bin)
435
        self.pipeline.set_state(Gst.State.PLAYING)
436

437
        self.sink = self.make_bin_from_config('video_output_device',
438 439
                                              'videoscale ! videoconvert ! %s',
                                              _("video output"))
440

441
        self.pipeline.add(self.sink)
442

443
        self.src_bin.get_static_pad('src').link(self.p2psession.get_property(
444
            'sink-pad'))
445

446
        # The following is needed for farstream to process ICE requests:
447
        self.pipeline.set_state(Gst.State.PLAYING)
448

449
    def _on_sync_message(self, bus, message):
450
        if message.get_structure() is None:
451
            return False
452
        if message.get_structure().get_name() == 'prepare-window-handle':
453 454
            message.src.set_property('force-aspect-ratio', True)
            imagesink = message.src
455
            if app.config.get('video_see_self') and not self.out_xid_set:
456
                imagesink.set_window_handle(self.out_xid)
457 458
                self.out_xid_set = True
            else:
459
                imagesink.set_window_handle(self.in_xid)
460

461 462
    def get_fallback_src(self):
        # TODO: Use avatar?
463 464
        pipeline = 'videotestsrc is-live=true ! video/x-raw,framerate=10/1 ! videoconvert'
        return Gst.parse_bin_from_description(pipeline, True)
465

466 467 468 469
    def destroy(self):
        JingleRTPContent.destroy(self)
        self.pipeline.get_bus().disconnect_by_func(self._on_sync_message)

470
def get_content(desc):
471 472
    if desc['media'] == 'audio':
        return JingleAudio
473
    if desc['media'] == 'video':
474
        return JingleVideo
475

476
contents[nbxmpp.NS_JINGLE_RTP] = get_content