michael@0: /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this file, michael@0: * You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: // Original author: ekr@rtfm.com michael@0: michael@0: #ifndef mediapipeline_h__ michael@0: #define mediapipeline_h__ michael@0: michael@0: #include "sigslot.h" michael@0: michael@0: #ifdef USE_FAKE_MEDIA_STREAMS michael@0: #include "FakeMediaStreams.h" michael@0: #else michael@0: #include "DOMMediaStream.h" michael@0: #include "MediaStreamGraph.h" michael@0: #include "VideoUtils.h" michael@0: #endif michael@0: #include "MediaConduitInterface.h" michael@0: #include "MediaPipelineFilter.h" michael@0: #include "AudioSegment.h" michael@0: #include "mozilla/ReentrantMonitor.h" michael@0: #include "SrtpFlow.h" michael@0: #include "databuffer.h" michael@0: #include "runnable_utils.h" michael@0: #include "transportflow.h" michael@0: michael@0: #ifdef MOZILLA_INTERNAL_API michael@0: #include "VideoSegment.h" michael@0: #endif michael@0: michael@0: #include "webrtc/modules/rtp_rtcp/interface/rtp_header_parser.h" michael@0: michael@0: namespace mozilla { michael@0: michael@0: // A class that represents the pipeline of audio and video michael@0: // The dataflow looks like: michael@0: // michael@0: // TRANSMIT michael@0: // CaptureDevice -> stream -> [us] -> conduit -> [us] -> transport -> network michael@0: // michael@0: // RECEIVE michael@0: // network -> transport -> [us] -> conduit -> [us] -> stream -> Playout michael@0: // michael@0: // The boxes labeled [us] are just bridge logic implemented in this class michael@0: // michael@0: // We have to deal with a number of threads: michael@0: // michael@0: // GSM: michael@0: // * Assembles the pipeline michael@0: // SocketTransportService michael@0: // * Receives notification that ICE and DTLS have completed michael@0: // * Processes incoming network data and passes it to the conduit michael@0: // * Processes outgoing RTP and RTCP michael@0: // MediaStreamGraph michael@0: // * Receives outgoing data from the MediaStreamGraph michael@0: // * Receives pull requests for more data from the michael@0: // MediaStreamGraph michael@0: // One or another GIPS threads michael@0: // * Receives RTCP messages to send to the other side michael@0: // * Processes video frames GIPS wants to render michael@0: // michael@0: // For a transmitting conduit, "output" is RTP and "input" is RTCP. michael@0: // For a receiving conduit, "input" is RTP and "output" is RTCP. michael@0: // michael@0: class MediaPipeline : public sigslot::has_slots<> { michael@0: public: michael@0: enum Direction { TRANSMIT, RECEIVE }; michael@0: enum State { MP_CONNECTING, MP_OPEN, MP_CLOSED }; michael@0: MediaPipeline(const std::string& pc, michael@0: Direction direction, michael@0: nsCOMPtr main_thread, michael@0: nsCOMPtr sts_thread, michael@0: MediaStream *stream, michael@0: TrackID track_id, michael@0: int level, michael@0: RefPtr conduit, michael@0: RefPtr rtp_transport, michael@0: RefPtr rtcp_transport) michael@0: : direction_(direction), michael@0: stream_(stream), michael@0: track_id_(track_id), michael@0: level_(level), michael@0: conduit_(conduit), michael@0: rtp_(rtp_transport, rtcp_transport ? RTP : MUX), michael@0: rtcp_(rtcp_transport ? rtcp_transport : rtp_transport, michael@0: rtcp_transport ? RTCP : MUX), michael@0: main_thread_(main_thread), michael@0: sts_thread_(sts_thread), michael@0: rtp_packets_sent_(0), michael@0: rtcp_packets_sent_(0), michael@0: rtp_packets_received_(0), michael@0: rtcp_packets_received_(0), michael@0: rtp_bytes_sent_(0), michael@0: rtp_bytes_received_(0), michael@0: pc_(pc), michael@0: description_() { michael@0: // To indicate rtcp-mux rtcp_transport should be nullptr. michael@0: // Therefore it's an error to send in the same flow for michael@0: // both rtp and rtcp. michael@0: MOZ_ASSERT(rtp_transport != rtcp_transport); michael@0: michael@0: // PipelineTransport() will access this->sts_thread_; moved here for safety michael@0: transport_ = new PipelineTransport(this); michael@0: } michael@0: michael@0: virtual ~MediaPipeline(); michael@0: michael@0: // Must be called on the STS thread. Must be called after ShutdownMedia_m(). michael@0: void ShutdownTransport_s(); michael@0: michael@0: // Must be called on the main thread. michael@0: void ShutdownMedia_m() { michael@0: ASSERT_ON_THREAD(main_thread_); michael@0: michael@0: if (stream_) { michael@0: DetachMediaStream(); michael@0: } michael@0: } michael@0: michael@0: virtual nsresult Init(); michael@0: michael@0: // When we have offered bundle, the MediaPipelines are created in an michael@0: // indeterminate state; we do not know whether the answerer will take us michael@0: // up on our offer. In the meantime, we need to behave in a manner that michael@0: // errs on the side of packet loss when it is unclear whether an arriving michael@0: // packet is meant for us. We want to get out of this indeterminate state michael@0: // ASAP, which is what this function can be used for. michael@0: void SetUsingBundle_s(bool decision); michael@0: MediaPipelineFilter* UpdateFilterFromRemoteDescription_s( michael@0: nsAutoPtr filter); michael@0: michael@0: virtual Direction direction() const { return direction_; } michael@0: virtual TrackID trackid() const { return track_id_; } michael@0: virtual int level() const { return level_; } michael@0: michael@0: bool IsDoingRtcpMux() const { michael@0: return (rtp_.type_ == MUX); michael@0: } michael@0: michael@0: int32_t rtp_packets_sent() const { return rtp_packets_sent_; } michael@0: int64_t rtp_bytes_sent() const { return rtp_bytes_sent_; } michael@0: int32_t rtcp_packets_sent() const { return rtcp_packets_sent_; } michael@0: int32_t rtp_packets_received() const { return rtp_packets_received_; } michael@0: int64_t rtp_bytes_received() const { return rtp_bytes_received_; } michael@0: int32_t rtcp_packets_received() const { return rtcp_packets_received_; } michael@0: michael@0: MediaSessionConduit *Conduit() const { return conduit_; } michael@0: michael@0: // Thread counting michael@0: NS_INLINE_DECL_THREADSAFE_REFCOUNTING(MediaPipeline) michael@0: michael@0: typedef enum { michael@0: RTP, michael@0: RTCP, michael@0: MUX, michael@0: MAX_RTP_TYPE michael@0: } RtpType; michael@0: michael@0: protected: michael@0: virtual void DetachMediaStream() {} michael@0: michael@0: // Separate class to allow ref counting michael@0: class PipelineTransport : public TransportInterface { michael@0: public: michael@0: // Implement the TransportInterface functions michael@0: explicit PipelineTransport(MediaPipeline *pipeline) michael@0: : pipeline_(pipeline), michael@0: sts_thread_(pipeline->sts_thread_) {} michael@0: michael@0: void Detach() { pipeline_ = nullptr; } michael@0: MediaPipeline *pipeline() const { return pipeline_; } michael@0: michael@0: virtual nsresult SendRtpPacket(const void* data, int len); michael@0: virtual nsresult SendRtcpPacket(const void* data, int len); michael@0: michael@0: private: michael@0: virtual nsresult SendRtpPacket_s(nsAutoPtr data); michael@0: virtual nsresult SendRtcpPacket_s(nsAutoPtr data); michael@0: michael@0: MediaPipeline *pipeline_; // Raw pointer to avoid cycles michael@0: nsCOMPtr sts_thread_; michael@0: }; michael@0: friend class PipelineTransport; michael@0: michael@0: class TransportInfo { michael@0: public: michael@0: TransportInfo(RefPtr flow, RtpType type) : michael@0: transport_(flow), michael@0: state_(MP_CONNECTING), michael@0: type_(type) { michael@0: MOZ_ASSERT(flow); michael@0: } michael@0: michael@0: RefPtr transport_; michael@0: State state_; michael@0: RefPtr send_srtp_; michael@0: RefPtr recv_srtp_; michael@0: RtpType type_; michael@0: }; michael@0: michael@0: // The transport is down michael@0: virtual nsresult TransportFailed_s(TransportInfo &info); michael@0: // The transport is ready michael@0: virtual nsresult TransportReady_s(TransportInfo &info); michael@0: void UpdateRtcpMuxState(TransportInfo &info); michael@0: michael@0: // Unhooks from signals michael@0: void DisconnectTransport_s(TransportInfo &info); michael@0: nsresult ConnectTransport_s(TransportInfo &info); michael@0: michael@0: TransportInfo* GetTransportInfo_s(TransportFlow *flow); michael@0: michael@0: void increment_rtp_packets_sent(int bytes); michael@0: void increment_rtcp_packets_sent(); michael@0: void increment_rtp_packets_received(int bytes); michael@0: void increment_rtcp_packets_received(); michael@0: michael@0: virtual nsresult SendPacket(TransportFlow *flow, const void *data, int len); michael@0: michael@0: // Process slots on transports michael@0: void StateChange(TransportFlow *flow, TransportLayer::State); michael@0: void RtpPacketReceived(TransportLayer *layer, const unsigned char *data, michael@0: size_t len); michael@0: void RtcpPacketReceived(TransportLayer *layer, const unsigned char *data, michael@0: size_t len); michael@0: void PacketReceived(TransportLayer *layer, const unsigned char *data, michael@0: size_t len); michael@0: michael@0: Direction direction_; michael@0: RefPtr stream_; // A pointer to the stream we are servicing. michael@0: // Written on the main thread. michael@0: // Used on STS and MediaStreamGraph threads. michael@0: TrackID track_id_; // The track on the stream. michael@0: // Written and used as the stream_; michael@0: int level_; // The m-line index (starting at 1, to match convention) michael@0: RefPtr conduit_; // Our conduit. Written on the main michael@0: // thread. Read on STS thread. michael@0: michael@0: // The transport objects. Read/written on STS thread. michael@0: TransportInfo rtp_; michael@0: TransportInfo rtcp_; michael@0: // These are for bundle. We have a separate set because when we have offered michael@0: // bundle, we do not know whether we will receive traffic on the transport michael@0: // in this pipeline's m-line, or the transport in the "master" m-line for michael@0: // the bundle. We need to be ready for either. Once this ambiguity is michael@0: // resolved, the transport we know that we'll be using will be set in michael@0: // rtp_transport_ and rtcp_transport_, and these will be unset. michael@0: // TODO(bcampen@mozilla.com): I'm pretty sure this could be leveraged for michael@0: // re-offer with a new address on an m-line too, with a little work. michael@0: nsAutoPtr possible_bundle_rtp_; michael@0: nsAutoPtr possible_bundle_rtcp_; michael@0: michael@0: // Pointers to the threads we need. Initialized at creation michael@0: // and used all over the place. michael@0: nsCOMPtr main_thread_; michael@0: nsCOMPtr sts_thread_; michael@0: michael@0: // Created on Init. Referenced by the conduit and eventually michael@0: // destroyed on the STS thread. michael@0: RefPtr transport_; michael@0: michael@0: // Only safe to access from STS thread. michael@0: // Build into TransportInfo? michael@0: int32_t rtp_packets_sent_; michael@0: int32_t rtcp_packets_sent_; michael@0: int32_t rtp_packets_received_; michael@0: int32_t rtcp_packets_received_; michael@0: int64_t rtp_bytes_sent_; michael@0: int64_t rtp_bytes_received_; michael@0: michael@0: // Written on Init. Read on STS thread. michael@0: std::string pc_; michael@0: std::string description_; michael@0: michael@0: // Written on Init, all following accesses are on the STS thread. michael@0: nsAutoPtr filter_; michael@0: nsAutoPtr rtp_parser_; michael@0: michael@0: private: michael@0: nsresult Init_s(); michael@0: michael@0: bool IsRtp(const unsigned char *data, size_t len); michael@0: }; michael@0: michael@0: class GenericReceiveListener : public MediaStreamListener michael@0: { michael@0: public: michael@0: GenericReceiveListener(SourceMediaStream *source, TrackID track_id, michael@0: TrackRate track_rate) michael@0: : source_(source), michael@0: track_id_(track_id), michael@0: track_rate_(track_rate), michael@0: played_ticks_(0) {} michael@0: michael@0: virtual ~GenericReceiveListener() {} michael@0: michael@0: void AddSelf(MediaSegment* segment); michael@0: michael@0: void SetPlayedTicks(TrackTicks time) { michael@0: played_ticks_ = time; michael@0: } michael@0: michael@0: void EndTrack() { michael@0: source_->EndTrack(track_id_); michael@0: } michael@0: michael@0: protected: michael@0: SourceMediaStream *source_; michael@0: TrackID track_id_; michael@0: TrackRate track_rate_; michael@0: TrackTicks played_ticks_; michael@0: }; michael@0: michael@0: class TrackAddedCallback { michael@0: public: michael@0: virtual void TrackAdded(TrackTicks current_ticks) = 0; michael@0: michael@0: NS_INLINE_DECL_THREADSAFE_REFCOUNTING(TrackAddedCallback); michael@0: michael@0: protected: michael@0: virtual ~TrackAddedCallback() {} michael@0: }; michael@0: michael@0: class GenericReceiveListener; michael@0: michael@0: class GenericReceiveCallback : public TrackAddedCallback michael@0: { michael@0: public: michael@0: GenericReceiveCallback(GenericReceiveListener* listener) michael@0: : listener_(listener) {} michael@0: michael@0: void TrackAdded(TrackTicks time) { michael@0: listener_->SetPlayedTicks(time); michael@0: } michael@0: michael@0: private: michael@0: RefPtr listener_; michael@0: }; michael@0: michael@0: class ConduitDeleteEvent: public nsRunnable michael@0: { michael@0: public: michael@0: ConduitDeleteEvent(TemporaryRef aConduit) : michael@0: mConduit(aConduit) {} michael@0: michael@0: /* we exist solely to proxy release of the conduit */ michael@0: NS_IMETHOD Run() { return NS_OK; } michael@0: private: michael@0: RefPtr mConduit; michael@0: }; michael@0: michael@0: // A specialization of pipeline for reading from an input device michael@0: // and transmitting to the network. michael@0: class MediaPipelineTransmit : public MediaPipeline { michael@0: public: michael@0: // Set rtcp_transport to nullptr to use rtcp-mux michael@0: MediaPipelineTransmit(const std::string& pc, michael@0: nsCOMPtr main_thread, michael@0: nsCOMPtr sts_thread, michael@0: DOMMediaStream *domstream, michael@0: TrackID track_id, michael@0: int level, michael@0: RefPtr conduit, michael@0: RefPtr rtp_transport, michael@0: RefPtr rtcp_transport) : michael@0: MediaPipeline(pc, TRANSMIT, main_thread, sts_thread, michael@0: domstream->GetStream(), track_id, level, michael@0: conduit, rtp_transport, rtcp_transport), michael@0: listener_(new PipelineListener(conduit)), michael@0: domstream_(domstream) michael@0: {} michael@0: michael@0: // Initialize (stuff here may fail) michael@0: virtual nsresult Init(); michael@0: michael@0: // Called on the main thread. michael@0: virtual void DetachMediaStream() { michael@0: ASSERT_ON_THREAD(main_thread_); michael@0: domstream_->RemoveDirectListener(listener_); michael@0: domstream_ = nullptr; michael@0: stream_->RemoveListener(listener_); michael@0: // Let the listener be destroyed with the pipeline (or later). michael@0: stream_ = nullptr; michael@0: } michael@0: michael@0: // Override MediaPipeline::TransportReady. michael@0: virtual nsresult TransportReady_s(TransportInfo &info); michael@0: michael@0: // Separate class to allow ref counting michael@0: class PipelineListener : public MediaStreamDirectListener { michael@0: friend class MediaPipelineTransmit; michael@0: public: michael@0: PipelineListener(const RefPtr& conduit) michael@0: : conduit_(conduit), michael@0: active_(false), michael@0: direct_connect_(false), michael@0: samples_10ms_buffer_(nullptr), michael@0: buffer_current_(0), michael@0: samplenum_10ms_(0) michael@0: #ifdef MOZILLA_INTERNAL_API michael@0: , last_img_(-1) michael@0: #endif // MOZILLA_INTERNAL_API michael@0: { michael@0: } michael@0: michael@0: ~PipelineListener() michael@0: { michael@0: // release conduit on mainthread. Must use forget()! michael@0: nsresult rv = NS_DispatchToMainThread(new michael@0: ConduitDeleteEvent(conduit_.forget()), NS_DISPATCH_NORMAL); michael@0: MOZ_ASSERT(!NS_FAILED(rv),"Could not dispatch conduit shutdown to main"); michael@0: if (NS_FAILED(rv)) { michael@0: MOZ_CRASH(); michael@0: } michael@0: } michael@0: michael@0: michael@0: // XXX. This is not thread-safe but the hazard is just michael@0: // that active_ = true takes a while to propagate. Revisit michael@0: // when 823600 lands. michael@0: void SetActive(bool active) { active_ = active; } michael@0: michael@0: // Implement MediaStreamListener michael@0: virtual void NotifyQueuedTrackChanges(MediaStreamGraph* graph, TrackID tid, michael@0: TrackRate rate, michael@0: TrackTicks offset, michael@0: uint32_t events, michael@0: const MediaSegment& queued_media) MOZ_OVERRIDE; michael@0: virtual void NotifyPull(MediaStreamGraph* aGraph, StreamTime aDesiredTime) MOZ_OVERRIDE {} michael@0: michael@0: // Implement MediaStreamDirectListener michael@0: virtual void NotifyRealtimeData(MediaStreamGraph* graph, TrackID tid, michael@0: TrackRate rate, michael@0: TrackTicks offset, michael@0: uint32_t events, michael@0: const MediaSegment& media) MOZ_OVERRIDE; michael@0: michael@0: private: michael@0: void NewData(MediaStreamGraph* graph, TrackID tid, michael@0: TrackRate rate, michael@0: TrackTicks offset, michael@0: uint32_t events, michael@0: const MediaSegment& media); michael@0: michael@0: virtual void ProcessAudioChunk(AudioSessionConduit *conduit, michael@0: TrackRate rate, AudioChunk& chunk); michael@0: #ifdef MOZILLA_INTERNAL_API michael@0: virtual void ProcessVideoChunk(VideoSessionConduit *conduit, michael@0: TrackRate rate, VideoChunk& chunk); michael@0: #endif michael@0: RefPtr conduit_; michael@0: volatile bool active_; michael@0: bool direct_connect_; michael@0: michael@0: // These vars handle breaking audio samples into exact 10ms chunks: michael@0: // The buffer of 10ms audio samples that we will send once full michael@0: // (can be carried over from one call to another). michael@0: nsAutoArrayPtr samples_10ms_buffer_; michael@0: // The location of the pointer within that buffer (in units of samples). michael@0: int64_t buffer_current_; michael@0: // The number of samples in a 10ms audio chunk. michael@0: int64_t samplenum_10ms_; michael@0: michael@0: #ifdef MOZILLA_INTERNAL_API michael@0: int32_t last_img_; // serial number of last Image michael@0: #endif // MOZILLA_INTERNAL_API michael@0: }; michael@0: michael@0: private: michael@0: RefPtr listener_; michael@0: DOMMediaStream *domstream_; michael@0: }; michael@0: michael@0: michael@0: // A specialization of pipeline for reading from the network and michael@0: // rendering video. michael@0: class MediaPipelineReceive : public MediaPipeline { michael@0: public: michael@0: // Set rtcp_transport to nullptr to use rtcp-mux michael@0: MediaPipelineReceive(const std::string& pc, michael@0: nsCOMPtr main_thread, michael@0: nsCOMPtr sts_thread, michael@0: MediaStream *stream, michael@0: TrackID track_id, michael@0: int level, michael@0: RefPtr conduit, michael@0: RefPtr rtp_transport, michael@0: RefPtr rtcp_transport, michael@0: RefPtr bundle_rtp_transport, michael@0: RefPtr bundle_rtcp_transport, michael@0: nsAutoPtr filter) : michael@0: MediaPipeline(pc, RECEIVE, main_thread, sts_thread, michael@0: stream, track_id, level, conduit, rtp_transport, michael@0: rtcp_transport), michael@0: segments_added_(0) { michael@0: filter_ = filter; michael@0: rtp_parser_ = webrtc::RtpHeaderParser::Create(); michael@0: if (bundle_rtp_transport) { michael@0: if (bundle_rtcp_transport) { michael@0: MOZ_ASSERT(bundle_rtp_transport != bundle_rtcp_transport); michael@0: possible_bundle_rtp_ = new TransportInfo(bundle_rtp_transport, RTP); michael@0: possible_bundle_rtcp_ = new TransportInfo(bundle_rtcp_transport, RTCP); michael@0: } else { michael@0: possible_bundle_rtp_ = new TransportInfo(bundle_rtp_transport, MUX); michael@0: possible_bundle_rtcp_ = new TransportInfo(bundle_rtp_transport, MUX); michael@0: } michael@0: } michael@0: } michael@0: michael@0: int segments_added() const { return segments_added_; } michael@0: michael@0: protected: michael@0: int segments_added_; michael@0: michael@0: private: michael@0: }; michael@0: michael@0: michael@0: // A specialization of pipeline for reading from the network and michael@0: // rendering audio. michael@0: class MediaPipelineReceiveAudio : public MediaPipelineReceive { michael@0: public: michael@0: MediaPipelineReceiveAudio(const std::string& pc, michael@0: nsCOMPtr main_thread, michael@0: nsCOMPtr sts_thread, michael@0: MediaStream *stream, michael@0: TrackID track_id, michael@0: int level, michael@0: RefPtr conduit, michael@0: RefPtr rtp_transport, michael@0: RefPtr rtcp_transport, michael@0: RefPtr bundle_rtp_transport, michael@0: RefPtr bundle_rtcp_transport, michael@0: nsAutoPtr filter) : michael@0: MediaPipelineReceive(pc, main_thread, sts_thread, michael@0: stream, track_id, level, conduit, rtp_transport, michael@0: rtcp_transport, bundle_rtp_transport, michael@0: bundle_rtcp_transport, filter), michael@0: listener_(new PipelineListener(stream->AsSourceStream(), michael@0: track_id, conduit)) { michael@0: } michael@0: michael@0: virtual void DetachMediaStream() { michael@0: ASSERT_ON_THREAD(main_thread_); michael@0: listener_->EndTrack(); michael@0: stream_->RemoveListener(listener_); michael@0: stream_ = nullptr; michael@0: } michael@0: michael@0: virtual nsresult Init(); michael@0: michael@0: private: michael@0: // Separate class to allow ref counting michael@0: class PipelineListener : public GenericReceiveListener { michael@0: public: michael@0: PipelineListener(SourceMediaStream * source, TrackID track_id, michael@0: const RefPtr& conduit); michael@0: michael@0: ~PipelineListener() michael@0: { michael@0: // release conduit on mainthread. Must use forget()! michael@0: nsresult rv = NS_DispatchToMainThread(new michael@0: ConduitDeleteEvent(conduit_.forget()), NS_DISPATCH_NORMAL); michael@0: MOZ_ASSERT(!NS_FAILED(rv),"Could not dispatch conduit shutdown to main"); michael@0: if (NS_FAILED(rv)) { michael@0: MOZ_CRASH(); michael@0: } michael@0: } michael@0: michael@0: // Implement MediaStreamListener michael@0: virtual void NotifyQueuedTrackChanges(MediaStreamGraph* graph, TrackID tid, michael@0: TrackRate rate, michael@0: TrackTicks offset, michael@0: uint32_t events, michael@0: const MediaSegment& queued_media) MOZ_OVERRIDE {} michael@0: virtual void NotifyPull(MediaStreamGraph* graph, StreamTime desired_time) MOZ_OVERRIDE; michael@0: michael@0: private: michael@0: RefPtr conduit_; michael@0: }; michael@0: michael@0: RefPtr listener_; michael@0: }; michael@0: michael@0: michael@0: // A specialization of pipeline for reading from the network and michael@0: // rendering video. michael@0: class MediaPipelineReceiveVideo : public MediaPipelineReceive { michael@0: public: michael@0: MediaPipelineReceiveVideo(const std::string& pc, michael@0: nsCOMPtr main_thread, michael@0: nsCOMPtr sts_thread, michael@0: MediaStream *stream, michael@0: TrackID track_id, michael@0: int level, michael@0: RefPtr conduit, michael@0: RefPtr rtp_transport, michael@0: RefPtr rtcp_transport, michael@0: RefPtr bundle_rtp_transport, michael@0: RefPtr bundle_rtcp_transport, michael@0: nsAutoPtr filter) : michael@0: MediaPipelineReceive(pc, main_thread, sts_thread, michael@0: stream, track_id, level, conduit, rtp_transport, michael@0: rtcp_transport, bundle_rtp_transport, michael@0: bundle_rtcp_transport, filter), michael@0: renderer_(new PipelineRenderer(MOZ_THIS_IN_INITIALIZER_LIST())), michael@0: listener_(new PipelineListener(stream->AsSourceStream(), track_id)) { michael@0: } michael@0: michael@0: // Called on the main thread. michael@0: virtual void DetachMediaStream() { michael@0: ASSERT_ON_THREAD(main_thread_); michael@0: michael@0: listener_->EndTrack(); michael@0: // stop generating video and thus stop invoking the PipelineRenderer michael@0: // and PipelineListener - the renderer has a raw ptr to the Pipeline to michael@0: // avoid cycles, and the render callbacks are invoked from a different michael@0: // thread so simple null-checks would cause TSAN bugs without locks. michael@0: static_cast(conduit_.get())->DetachRenderer(); michael@0: stream_->RemoveListener(listener_); michael@0: stream_ = nullptr; michael@0: } michael@0: michael@0: virtual nsresult Init(); michael@0: michael@0: private: michael@0: class PipelineRenderer : public VideoRenderer { michael@0: public: michael@0: PipelineRenderer(MediaPipelineReceiveVideo *pipeline) : michael@0: pipeline_(pipeline) {} michael@0: michael@0: void Detach() { pipeline_ = nullptr; } michael@0: michael@0: // Implement VideoRenderer michael@0: virtual void FrameSizeChange(unsigned int width, michael@0: unsigned int height, michael@0: unsigned int number_of_streams) { michael@0: pipeline_->listener_->FrameSizeChange(width, height, number_of_streams); michael@0: } michael@0: michael@0: virtual void RenderVideoFrame(const unsigned char* buffer, michael@0: unsigned int buffer_size, michael@0: uint32_t time_stamp, michael@0: int64_t render_time, michael@0: const ImageHandle& handle) { michael@0: pipeline_->listener_->RenderVideoFrame(buffer, buffer_size, time_stamp, michael@0: render_time, michael@0: handle.GetImage()); michael@0: } michael@0: michael@0: private: michael@0: MediaPipelineReceiveVideo *pipeline_; // Raw pointer to avoid cycles michael@0: }; michael@0: michael@0: // Separate class to allow ref counting michael@0: class PipelineListener : public GenericReceiveListener { michael@0: public: michael@0: PipelineListener(SourceMediaStream * source, TrackID track_id); michael@0: michael@0: // Implement MediaStreamListener michael@0: virtual void NotifyQueuedTrackChanges(MediaStreamGraph* graph, TrackID tid, michael@0: TrackRate rate, michael@0: TrackTicks offset, michael@0: uint32_t events, michael@0: const MediaSegment& queued_media) MOZ_OVERRIDE {} michael@0: virtual void NotifyPull(MediaStreamGraph* graph, StreamTime desired_time) MOZ_OVERRIDE; michael@0: michael@0: // Accessors for external writes from the renderer michael@0: void FrameSizeChange(unsigned int width, michael@0: unsigned int height, michael@0: unsigned int number_of_streams) { michael@0: ReentrantMonitorAutoEnter enter(monitor_); michael@0: michael@0: width_ = width; michael@0: height_ = height; michael@0: } michael@0: michael@0: void RenderVideoFrame(const unsigned char* buffer, michael@0: unsigned int buffer_size, michael@0: uint32_t time_stamp, michael@0: int64_t render_time, michael@0: const RefPtr& video_image); michael@0: michael@0: private: michael@0: int width_; michael@0: int height_; michael@0: #ifdef MOZILLA_INTERNAL_API michael@0: nsRefPtr image_container_; michael@0: nsRefPtr image_; michael@0: #endif michael@0: mozilla::ReentrantMonitor monitor_; // Monitor for processing WebRTC frames. michael@0: // Protects image_ against: michael@0: // - Writing from the GIPS thread michael@0: // - Reading from the MSG thread michael@0: }; michael@0: michael@0: friend class PipelineRenderer; michael@0: michael@0: RefPtr renderer_; michael@0: RefPtr listener_; michael@0: }; michael@0: michael@0: michael@0: } // end namespace michael@0: #endif