michael@0: /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this file, michael@0: * You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: // Original author: ekr@rtfm.com michael@0: michael@0: #include "logging.h" michael@0: #include "MediaPipeline.h" michael@0: michael@0: #ifndef USE_FAKE_MEDIA_STREAMS michael@0: #include "MediaStreamGraphImpl.h" michael@0: #endif michael@0: michael@0: #include michael@0: michael@0: #include "nspr.h" michael@0: #include "srtp.h" michael@0: michael@0: #ifdef MOZILLA_INTERNAL_API michael@0: #include "VideoSegment.h" michael@0: #include "Layers.h" michael@0: #include "ImageTypes.h" michael@0: #include "ImageContainer.h" michael@0: #include "VideoUtils.h" michael@0: #ifdef WEBRTC_GONK michael@0: #include "GrallocImages.h" michael@0: #include "mozilla/layers/GrallocTextureClient.h" michael@0: #endif michael@0: #endif michael@0: michael@0: #include "nsError.h" michael@0: #include "AudioSegment.h" michael@0: #include "MediaSegment.h" michael@0: #include "databuffer.h" michael@0: #include "transportflow.h" michael@0: #include "transportlayer.h" michael@0: #include "transportlayerdtls.h" michael@0: #include "transportlayerice.h" michael@0: #include "runnable_utils.h" michael@0: #include "libyuv/convert.h" michael@0: #include "mozilla/gfx/Point.h" michael@0: #include "mozilla/gfx/Types.h" michael@0: michael@0: #include "webrtc/modules/interface/module_common_types.h" michael@0: #include "webrtc/modules/rtp_rtcp/interface/rtp_header_parser.h" michael@0: michael@0: using namespace mozilla; michael@0: using namespace mozilla::gfx; michael@0: michael@0: // Logging context michael@0: MOZ_MTLOG_MODULE("mediapipeline") michael@0: michael@0: namespace mozilla { michael@0: michael@0: static char kDTLSExporterLabel[] = "EXTRACTOR-dtls_srtp"; michael@0: michael@0: MediaPipeline::~MediaPipeline() { michael@0: ASSERT_ON_THREAD(main_thread_); michael@0: MOZ_ASSERT(!stream_); // Check that we have shut down already. michael@0: MOZ_MTLOG(ML_INFO, "Destroying MediaPipeline: " << description_); michael@0: } michael@0: michael@0: nsresult MediaPipeline::Init() { michael@0: ASSERT_ON_THREAD(main_thread_); michael@0: michael@0: RUN_ON_THREAD(sts_thread_, michael@0: WrapRunnable( michael@0: nsRefPtr(this), michael@0: &MediaPipeline::Init_s), michael@0: NS_DISPATCH_NORMAL); michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: nsresult MediaPipeline::Init_s() { michael@0: ASSERT_ON_THREAD(sts_thread_); michael@0: conduit_->AttachTransport(transport_); michael@0: michael@0: nsresult res; michael@0: MOZ_ASSERT(rtp_.transport_); michael@0: MOZ_ASSERT(rtcp_.transport_); michael@0: res = ConnectTransport_s(rtp_); michael@0: if (NS_FAILED(res)) { michael@0: return res; michael@0: } michael@0: michael@0: if (rtcp_.transport_ != rtp_.transport_) { michael@0: res = ConnectTransport_s(rtcp_); michael@0: if (NS_FAILED(res)) { michael@0: return res; michael@0: } michael@0: } michael@0: michael@0: if (possible_bundle_rtp_) { michael@0: MOZ_ASSERT(possible_bundle_rtcp_); michael@0: MOZ_ASSERT(possible_bundle_rtp_->transport_); michael@0: MOZ_ASSERT(possible_bundle_rtcp_->transport_); michael@0: michael@0: res = ConnectTransport_s(*possible_bundle_rtp_); michael@0: if (NS_FAILED(res)) { michael@0: return res; michael@0: } michael@0: michael@0: if (possible_bundle_rtcp_->transport_ != possible_bundle_rtp_->transport_) { michael@0: res = ConnectTransport_s(*possible_bundle_rtcp_); michael@0: if (NS_FAILED(res)) { michael@0: return res; michael@0: } michael@0: } michael@0: } michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: michael@0: // Disconnect us from the transport so that we can cleanly destruct the michael@0: // pipeline on the main thread. ShutdownMedia_m() must have already been michael@0: // called michael@0: void MediaPipeline::ShutdownTransport_s() { michael@0: ASSERT_ON_THREAD(sts_thread_); michael@0: MOZ_ASSERT(!stream_); // verifies that ShutdownMedia_m() has run michael@0: michael@0: disconnect_all(); michael@0: transport_->Detach(); michael@0: rtp_.transport_ = nullptr; michael@0: rtcp_.transport_ = nullptr; michael@0: possible_bundle_rtp_ = nullptr; michael@0: possible_bundle_rtcp_ = nullptr; michael@0: } michael@0: michael@0: void MediaPipeline::StateChange(TransportFlow *flow, TransportLayer::State state) { michael@0: TransportInfo* info = GetTransportInfo_s(flow); michael@0: MOZ_ASSERT(info); michael@0: michael@0: if (state == TransportLayer::TS_OPEN) { michael@0: MOZ_MTLOG(ML_INFO, "Flow is ready"); michael@0: TransportReady_s(*info); michael@0: } else if (state == TransportLayer::TS_CLOSED || michael@0: state == TransportLayer::TS_ERROR) { michael@0: TransportFailed_s(*info); michael@0: } michael@0: } michael@0: michael@0: static bool MakeRtpTypeToStringArray(const char** array) { michael@0: static const char* RTP_str = "RTP"; michael@0: static const char* RTCP_str = "RTCP"; michael@0: static const char* MUX_str = "RTP/RTCP mux"; michael@0: array[MediaPipeline::RTP] = RTP_str; michael@0: array[MediaPipeline::RTCP] = RTCP_str; michael@0: array[MediaPipeline::MUX] = MUX_str; michael@0: return true; michael@0: } michael@0: michael@0: static const char* ToString(MediaPipeline::RtpType type) { michael@0: static const char* array[(int)MediaPipeline::MAX_RTP_TYPE] = {nullptr}; michael@0: // Dummy variable to cause init to happen only on first call michael@0: static bool dummy = MakeRtpTypeToStringArray(array); michael@0: (void)dummy; michael@0: return array[type]; michael@0: } michael@0: michael@0: nsresult MediaPipeline::TransportReady_s(TransportInfo &info) { michael@0: MOZ_ASSERT(!description_.empty()); michael@0: michael@0: // TODO(ekr@rtfm.com): implement some kind of notification on michael@0: // failure. bug 852665. michael@0: if (info.state_ != MP_CONNECTING) { michael@0: MOZ_MTLOG(ML_ERROR, "Transport ready for flow in wrong state:" << michael@0: description_ << ": " << ToString(info.type_)); michael@0: return NS_ERROR_FAILURE; michael@0: } michael@0: michael@0: MOZ_MTLOG(ML_INFO, "Transport ready for pipeline " << michael@0: static_cast(this) << " flow " << description_ << ": " << michael@0: ToString(info.type_)); michael@0: michael@0: // TODO(bcampen@mozilla.com): Should we disconnect from the flow on failure? michael@0: nsresult res; michael@0: michael@0: // Now instantiate the SRTP objects michael@0: TransportLayerDtls *dtls = static_cast( michael@0: info.transport_->GetLayer(TransportLayerDtls::ID())); michael@0: MOZ_ASSERT(dtls); // DTLS is mandatory michael@0: michael@0: uint16_t cipher_suite; michael@0: res = dtls->GetSrtpCipher(&cipher_suite); michael@0: if (NS_FAILED(res)) { michael@0: MOZ_MTLOG(ML_ERROR, "Failed to negotiate DTLS-SRTP. This is an error"); michael@0: info.state_ = MP_CLOSED; michael@0: UpdateRtcpMuxState(info); michael@0: return res; michael@0: } michael@0: michael@0: // SRTP Key Exporter as per RFC 5764 S 4.2 michael@0: unsigned char srtp_block[SRTP_TOTAL_KEY_LENGTH * 2]; michael@0: res = dtls->ExportKeyingMaterial(kDTLSExporterLabel, false, "", michael@0: srtp_block, sizeof(srtp_block)); michael@0: if (NS_FAILED(res)) { michael@0: MOZ_MTLOG(ML_ERROR, "Failed to compute DTLS-SRTP keys. This is an error"); michael@0: info.state_ = MP_CLOSED; michael@0: UpdateRtcpMuxState(info); michael@0: MOZ_CRASH(); // TODO: Remove once we have enough field experience to michael@0: // know it doesn't happen. bug 798797. Note that the michael@0: // code after this never executes. michael@0: return res; michael@0: } michael@0: michael@0: // Slice and dice as per RFC 5764 S 4.2 michael@0: unsigned char client_write_key[SRTP_TOTAL_KEY_LENGTH]; michael@0: unsigned char server_write_key[SRTP_TOTAL_KEY_LENGTH]; michael@0: int offset = 0; michael@0: memcpy(client_write_key, srtp_block + offset, SRTP_MASTER_KEY_LENGTH); michael@0: offset += SRTP_MASTER_KEY_LENGTH; michael@0: memcpy(server_write_key, srtp_block + offset, SRTP_MASTER_KEY_LENGTH); michael@0: offset += SRTP_MASTER_KEY_LENGTH; michael@0: memcpy(client_write_key + SRTP_MASTER_KEY_LENGTH, michael@0: srtp_block + offset, SRTP_MASTER_SALT_LENGTH); michael@0: offset += SRTP_MASTER_SALT_LENGTH; michael@0: memcpy(server_write_key + SRTP_MASTER_KEY_LENGTH, michael@0: srtp_block + offset, SRTP_MASTER_SALT_LENGTH); michael@0: offset += SRTP_MASTER_SALT_LENGTH; michael@0: MOZ_ASSERT(offset == sizeof(srtp_block)); michael@0: michael@0: unsigned char *write_key; michael@0: unsigned char *read_key; michael@0: michael@0: if (dtls->role() == TransportLayerDtls::CLIENT) { michael@0: write_key = client_write_key; michael@0: read_key = server_write_key; michael@0: } else { michael@0: write_key = server_write_key; michael@0: read_key = client_write_key; michael@0: } michael@0: michael@0: MOZ_ASSERT(!info.send_srtp_ && !info.recv_srtp_); michael@0: info.send_srtp_ = SrtpFlow::Create(cipher_suite, false, write_key, michael@0: SRTP_TOTAL_KEY_LENGTH); michael@0: info.recv_srtp_ = SrtpFlow::Create(cipher_suite, true, read_key, michael@0: SRTP_TOTAL_KEY_LENGTH); michael@0: if (!info.send_srtp_ || !info.recv_srtp_) { michael@0: MOZ_MTLOG(ML_ERROR, "Couldn't create SRTP flow for " michael@0: << ToString(info.type_)); michael@0: info.state_ = MP_CLOSED; michael@0: UpdateRtcpMuxState(info); michael@0: return NS_ERROR_FAILURE; michael@0: } michael@0: michael@0: MOZ_MTLOG(ML_INFO, "Listening for " << ToString(info.type_) michael@0: << " packets received on " << michael@0: static_cast(dtls->downward())); michael@0: michael@0: switch (info.type_) { michael@0: case RTP: michael@0: dtls->downward()->SignalPacketReceived.connect( michael@0: this, michael@0: &MediaPipeline::RtpPacketReceived); michael@0: break; michael@0: case RTCP: michael@0: dtls->downward()->SignalPacketReceived.connect( michael@0: this, michael@0: &MediaPipeline::RtcpPacketReceived); michael@0: break; michael@0: case MUX: michael@0: dtls->downward()->SignalPacketReceived.connect( michael@0: this, michael@0: &MediaPipeline::PacketReceived); michael@0: break; michael@0: default: michael@0: MOZ_CRASH(); michael@0: } michael@0: michael@0: info.state_ = MP_OPEN; michael@0: UpdateRtcpMuxState(info); michael@0: return NS_OK; michael@0: } michael@0: michael@0: nsresult MediaPipeline::TransportFailed_s(TransportInfo &info) { michael@0: ASSERT_ON_THREAD(sts_thread_); michael@0: michael@0: info.state_ = MP_CLOSED; michael@0: UpdateRtcpMuxState(info); michael@0: michael@0: MOZ_MTLOG(ML_INFO, "Transport closed for flow " << ToString(info.type_)); michael@0: michael@0: NS_WARNING( michael@0: "MediaPipeline Transport failed. This is not properly cleaned up yet"); michael@0: michael@0: // TODO(ekr@rtfm.com): SECURITY: Figure out how to clean up if the michael@0: // connection was good and now it is bad. michael@0: // TODO(ekr@rtfm.com): Report up so that the PC knows we michael@0: // have experienced an error. michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: void MediaPipeline::UpdateRtcpMuxState(TransportInfo &info) { michael@0: if (info.type_ == MUX) { michael@0: if (info.transport_ == rtcp_.transport_) { michael@0: rtcp_.state_ = info.state_; michael@0: if (!rtcp_.send_srtp_) { michael@0: rtcp_.send_srtp_ = info.send_srtp_; michael@0: rtcp_.recv_srtp_ = info.recv_srtp_; michael@0: } michael@0: } else if (possible_bundle_rtcp_ && michael@0: info.transport_ == possible_bundle_rtcp_->transport_) { michael@0: possible_bundle_rtcp_->state_ = info.state_; michael@0: if (!possible_bundle_rtcp_->send_srtp_) { michael@0: possible_bundle_rtcp_->send_srtp_ = info.send_srtp_; michael@0: possible_bundle_rtcp_->recv_srtp_ = info.recv_srtp_; michael@0: } michael@0: } michael@0: } michael@0: } michael@0: michael@0: nsresult MediaPipeline::SendPacket(TransportFlow *flow, const void *data, michael@0: int len) { michael@0: ASSERT_ON_THREAD(sts_thread_); michael@0: michael@0: // Note that we bypass the DTLS layer here michael@0: TransportLayerDtls *dtls = static_cast( michael@0: flow->GetLayer(TransportLayerDtls::ID())); michael@0: MOZ_ASSERT(dtls); michael@0: michael@0: TransportResult res = dtls->downward()-> michael@0: SendPacket(static_cast(data), len); michael@0: michael@0: if (res != len) { michael@0: // Ignore blocking indications michael@0: if (res == TE_WOULDBLOCK) michael@0: return NS_OK; michael@0: michael@0: MOZ_MTLOG(ML_ERROR, "Failed write on stream"); michael@0: return NS_BASE_STREAM_CLOSED; michael@0: } michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: void MediaPipeline::increment_rtp_packets_sent(int32_t bytes) { michael@0: ++rtp_packets_sent_; michael@0: rtp_bytes_sent_ += bytes; michael@0: michael@0: if (!(rtp_packets_sent_ % 100)) { michael@0: MOZ_MTLOG(ML_INFO, "RTP sent packet count for " << description_ michael@0: << " Pipeline " << static_cast(this) michael@0: << " Flow : " << static_cast(rtp_.transport_) michael@0: << ": " << rtp_packets_sent_ michael@0: << " (" << rtp_bytes_sent_ << " bytes)"); michael@0: } michael@0: } michael@0: michael@0: void MediaPipeline::increment_rtcp_packets_sent() { michael@0: ++rtcp_packets_sent_; michael@0: if (!(rtcp_packets_sent_ % 100)) { michael@0: MOZ_MTLOG(ML_INFO, "RTCP sent packet count for " << description_ michael@0: << " Pipeline " << static_cast(this) michael@0: << " Flow : " << static_cast(rtcp_.transport_) michael@0: << ": " << rtcp_packets_sent_); michael@0: } michael@0: } michael@0: michael@0: void MediaPipeline::increment_rtp_packets_received(int32_t bytes) { michael@0: ++rtp_packets_received_; michael@0: rtp_bytes_received_ += bytes; michael@0: if (!(rtp_packets_received_ % 100)) { michael@0: MOZ_MTLOG(ML_INFO, "RTP received packet count for " << description_ michael@0: << " Pipeline " << static_cast(this) michael@0: << " Flow : " << static_cast(rtp_.transport_) michael@0: << ": " << rtp_packets_received_ michael@0: << " (" << rtp_bytes_received_ << " bytes)"); michael@0: } michael@0: } michael@0: michael@0: void MediaPipeline::increment_rtcp_packets_received() { michael@0: ++rtcp_packets_received_; michael@0: if (!(rtcp_packets_received_ % 100)) { michael@0: MOZ_MTLOG(ML_INFO, "RTCP received packet count for " << description_ michael@0: << " Pipeline " << static_cast(this) michael@0: << " Flow : " << static_cast(rtcp_.transport_) michael@0: << ": " << rtcp_packets_received_); michael@0: } michael@0: } michael@0: michael@0: void MediaPipeline::RtpPacketReceived(TransportLayer *layer, michael@0: const unsigned char *data, michael@0: size_t len) { michael@0: if (!transport_->pipeline()) { michael@0: MOZ_MTLOG(ML_ERROR, "Discarding incoming packet; transport disconnected"); michael@0: return; michael@0: } michael@0: michael@0: if (!conduit_) { michael@0: MOZ_MTLOG(ML_DEBUG, "Discarding incoming packet; media disconnected"); michael@0: return; michael@0: } michael@0: michael@0: TransportInfo* info = &rtp_; michael@0: michael@0: if (possible_bundle_rtp_ && michael@0: possible_bundle_rtp_->transport_->Contains(layer)) { michael@0: // Received this on our possible bundle transport. Override info. michael@0: info = possible_bundle_rtp_; michael@0: } michael@0: michael@0: // TODO(bcampen@mozilla.com): Can either of these actually happen? If not, michael@0: // the info variable can be removed, and this function gets simpler. michael@0: if (info->state_ != MP_OPEN) { michael@0: MOZ_MTLOG(ML_ERROR, "Discarding incoming packet; pipeline not open"); michael@0: return; michael@0: } michael@0: michael@0: if (info->transport_->state() != TransportLayer::TS_OPEN) { michael@0: MOZ_MTLOG(ML_ERROR, "Discarding incoming packet; transport not open"); michael@0: return; michael@0: } michael@0: michael@0: // This should never happen. michael@0: MOZ_ASSERT(info->recv_srtp_); michael@0: michael@0: if (direction_ == TRANSMIT) { michael@0: return; michael@0: } michael@0: michael@0: if (possible_bundle_rtp_ && (info == &rtp_)) { michael@0: // We were not sure we would be using rtp_ or possible_bundle_rtp_, but we michael@0: // have just received traffic that clears this up. michael@0: // Don't let our filter prevent us from noticing this, if the filter is michael@0: // incomplete (ie; no SSRCs in remote SDP, or no remote SDP at all). michael@0: SetUsingBundle_s(false); michael@0: MOZ_MTLOG(ML_INFO, "Ruled out the possibility that we're receiving bundle " michael@0: "for " << description_); michael@0: // TODO(bcampen@mozilla.com): Might be nice to detect when every michael@0: // MediaPipeline but the master has determined that it isn't doing bundle, michael@0: // since that means the master isn't doing bundle either. We could maybe michael@0: // do this by putting some refcounted dummy variable in the filters, and michael@0: // checking the value of the refcount. It is not clear whether this is michael@0: // going to be useful in practice. michael@0: } michael@0: michael@0: if (!len) { michael@0: return; michael@0: } michael@0: michael@0: // Filter out everything but RTP/RTCP michael@0: if (data[0] < 128 || data[0] > 191) { michael@0: return; michael@0: } michael@0: michael@0: if (filter_) { michael@0: webrtc::RTPHeader header; michael@0: if (!rtp_parser_->Parse(data, len, &header) || michael@0: !filter_->Filter(header)) { michael@0: return; michael@0: } michael@0: } michael@0: michael@0: if (possible_bundle_rtp_) { michael@0: // Just got traffic that passed our filter on the potential bundle michael@0: // transport. Must be doing bundle. michael@0: SetUsingBundle_s(true); michael@0: MOZ_MTLOG(ML_INFO, "Confirmed the possibility that we're receiving bundle"); michael@0: } michael@0: michael@0: // Everything is decided now; just use rtp_ michael@0: MOZ_ASSERT(!possible_bundle_rtp_); michael@0: MOZ_ASSERT(!possible_bundle_rtcp_); michael@0: michael@0: // Make a copy rather than cast away constness michael@0: ScopedDeletePtr inner_data( michael@0: new unsigned char[len]); michael@0: memcpy(inner_data, data, len); michael@0: int out_len = 0; michael@0: nsresult res = rtp_.recv_srtp_->UnprotectRtp(inner_data, michael@0: len, len, &out_len); michael@0: if (!NS_SUCCEEDED(res)) { michael@0: char tmp[16]; michael@0: michael@0: PR_snprintf(tmp, sizeof(tmp), "%.2x %.2x %.2x %.2x", michael@0: inner_data[0], michael@0: inner_data[1], michael@0: inner_data[2], michael@0: inner_data[3]); michael@0: michael@0: MOZ_MTLOG(ML_NOTICE, "Error unprotecting RTP in " << description_ michael@0: << "len= " << len << "[" << tmp << "...]"); michael@0: michael@0: return; michael@0: } michael@0: increment_rtp_packets_received(out_len); michael@0: michael@0: (void)conduit_->ReceivedRTPPacket(inner_data, out_len); // Ignore error codes michael@0: } michael@0: michael@0: void MediaPipeline::RtcpPacketReceived(TransportLayer *layer, michael@0: const unsigned char *data, michael@0: size_t len) { michael@0: if (!transport_->pipeline()) { michael@0: MOZ_MTLOG(ML_DEBUG, "Discarding incoming packet; transport disconnected"); michael@0: return; michael@0: } michael@0: michael@0: if (!conduit_) { michael@0: MOZ_MTLOG(ML_DEBUG, "Discarding incoming packet; media disconnected"); michael@0: return; michael@0: } michael@0: michael@0: TransportInfo* info = &rtcp_; michael@0: if (possible_bundle_rtcp_ && michael@0: possible_bundle_rtcp_->transport_->Contains(layer)) { michael@0: info = possible_bundle_rtcp_; michael@0: } michael@0: michael@0: if (info->state_ != MP_OPEN) { michael@0: MOZ_MTLOG(ML_DEBUG, "Discarding incoming packet; pipeline not open"); michael@0: return; michael@0: } michael@0: michael@0: if (info->transport_->state() != TransportLayer::TS_OPEN) { michael@0: MOZ_MTLOG(ML_ERROR, "Discarding incoming packet; transport not open"); michael@0: return; michael@0: } michael@0: michael@0: if (possible_bundle_rtp_ && (info == &rtcp_)) { michael@0: // We have offered bundle, and received our first packet on a non-bundle michael@0: // address. We are definitely not using the bundle address. michael@0: SetUsingBundle_s(false); michael@0: } michael@0: michael@0: if (!len) { michael@0: return; michael@0: } michael@0: michael@0: // Filter out everything but RTP/RTCP michael@0: if (data[0] < 128 || data[0] > 191) { michael@0: return; michael@0: } michael@0: michael@0: MediaPipelineFilter::Result filter_result = MediaPipelineFilter::PASS; michael@0: if (filter_) { michael@0: filter_result = filter_->FilterRTCP(data, len); michael@0: if (filter_result == MediaPipelineFilter::FAIL) { michael@0: return; michael@0: } michael@0: } michael@0: michael@0: if (filter_result == MediaPipelineFilter::PASS && possible_bundle_rtp_) { michael@0: // Just got traffic that passed our filter on the potential bundle michael@0: // transport. Must be doing bundle. michael@0: SetUsingBundle_s(true); michael@0: } michael@0: michael@0: // We continue using info here, since it is possible that the filter did not michael@0: // support the payload type (ie; returned MediaPipelineFilter::UNSUPPORTED). michael@0: // In this case, we just let it pass, and hope the webrtc.org code does michael@0: // something sane. michael@0: increment_rtcp_packets_received(); michael@0: michael@0: MOZ_ASSERT(info->recv_srtp_); // This should never happen michael@0: michael@0: // Make a copy rather than cast away constness michael@0: ScopedDeletePtr inner_data( michael@0: new unsigned char[len]); michael@0: memcpy(inner_data, data, len); michael@0: int out_len; michael@0: michael@0: nsresult res = info->recv_srtp_->UnprotectRtcp(inner_data, michael@0: len, michael@0: len, michael@0: &out_len); michael@0: michael@0: if (!NS_SUCCEEDED(res)) michael@0: return; michael@0: michael@0: (void)conduit_->ReceivedRTCPPacket(inner_data, out_len); // Ignore error codes michael@0: } michael@0: michael@0: bool MediaPipeline::IsRtp(const unsigned char *data, size_t len) { michael@0: if (len < 2) michael@0: return false; michael@0: michael@0: // Check if this is a RTCP packet. Logic based on the types listed in michael@0: // media/webrtc/trunk/src/modules/rtp_rtcp/source/rtp_utility.cc michael@0: michael@0: // Anything outside this range is RTP. michael@0: if ((data[1] < 192) || (data[1] > 207)) michael@0: return true; michael@0: michael@0: if (data[1] == 192) // FIR michael@0: return false; michael@0: michael@0: if (data[1] == 193) // NACK, but could also be RTP. This makes us sad michael@0: return true; // but it's how webrtc.org behaves. michael@0: michael@0: if (data[1] == 194) michael@0: return true; michael@0: michael@0: if (data[1] == 195) // IJ. michael@0: return false; michael@0: michael@0: if ((data[1] > 195) && (data[1] < 200)) // the > 195 is redundant michael@0: return true; michael@0: michael@0: if ((data[1] >= 200) && (data[1] <= 207)) // SR, RR, SDES, BYE, michael@0: return false; // APP, RTPFB, PSFB, XR michael@0: michael@0: MOZ_ASSERT(false); // Not reached, belt and suspenders. michael@0: return true; michael@0: } michael@0: michael@0: void MediaPipeline::PacketReceived(TransportLayer *layer, michael@0: const unsigned char *data, michael@0: size_t len) { michael@0: if (!transport_->pipeline()) { michael@0: MOZ_MTLOG(ML_DEBUG, "Discarding incoming packet; transport disconnected"); michael@0: return; michael@0: } michael@0: michael@0: if (IsRtp(data, len)) { michael@0: RtpPacketReceived(layer, data, len); michael@0: } else { michael@0: RtcpPacketReceived(layer, data, len); michael@0: } michael@0: } michael@0: michael@0: nsresult MediaPipelineTransmit::Init() { michael@0: char track_id_string[11]; michael@0: ASSERT_ON_THREAD(main_thread_); michael@0: michael@0: // We can replace this when we are allowed to do streams or std::to_string michael@0: PR_snprintf(track_id_string, sizeof(track_id_string), "%d", track_id_); michael@0: michael@0: description_ = pc_ + "| "; michael@0: description_ += conduit_->type() == MediaSessionConduit::AUDIO ? michael@0: "Transmit audio[" : "Transmit video["; michael@0: description_ += track_id_string; michael@0: description_ += "]"; michael@0: michael@0: // TODO(ekr@rtfm.com): Check for errors michael@0: MOZ_MTLOG(ML_DEBUG, "Attaching pipeline to stream " michael@0: << static_cast(stream_) << " conduit type=" << michael@0: (conduit_->type() == MediaSessionConduit::AUDIO ?"audio":"video")); michael@0: michael@0: stream_->AddListener(listener_); michael@0: michael@0: // Is this a gUM mediastream? If so, also register the Listener directly with michael@0: // the SourceMediaStream that's attached to the TrackUnion so we can get direct michael@0: // unqueued (and not resampled) data michael@0: if (domstream_->AddDirectListener(listener_)) { michael@0: listener_->direct_connect_ = true; michael@0: } michael@0: michael@0: return MediaPipeline::Init(); michael@0: } michael@0: michael@0: nsresult MediaPipelineTransmit::TransportReady_s(TransportInfo &info) { michael@0: ASSERT_ON_THREAD(sts_thread_); michael@0: // Call base ready function. michael@0: MediaPipeline::TransportReady_s(info); michael@0: michael@0: // Should not be set for a transmitter michael@0: MOZ_ASSERT(!possible_bundle_rtp_); michael@0: if (&info == &rtp_) { michael@0: // TODO(ekr@rtfm.com): Move onto MSG thread. michael@0: listener_->SetActive(true); michael@0: } michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: void MediaPipeline::DisconnectTransport_s(TransportInfo &info) { michael@0: MOZ_ASSERT(info.transport_); michael@0: ASSERT_ON_THREAD(sts_thread_); michael@0: michael@0: info.transport_->SignalStateChange.disconnect(this); michael@0: // We do this even if we're a transmitter, since we are still possibly michael@0: // registered to receive RTCP. michael@0: TransportLayerDtls *dtls = static_cast( michael@0: info.transport_->GetLayer(TransportLayerDtls::ID())); michael@0: MOZ_ASSERT(dtls); // DTLS is mandatory michael@0: MOZ_ASSERT(dtls->downward()); michael@0: dtls->downward()->SignalPacketReceived.disconnect(this); michael@0: } michael@0: michael@0: nsresult MediaPipeline::ConnectTransport_s(TransportInfo &info) { michael@0: MOZ_ASSERT(info.transport_); michael@0: ASSERT_ON_THREAD(sts_thread_); michael@0: michael@0: // Look to see if the transport is ready michael@0: if (info.transport_->state() == TransportLayer::TS_OPEN) { michael@0: nsresult res = TransportReady_s(info); michael@0: if (NS_FAILED(res)) { michael@0: MOZ_MTLOG(ML_ERROR, "Error calling TransportReady(); res=" michael@0: << static_cast(res) << " in " << __FUNCTION__); michael@0: return res; michael@0: } michael@0: } else if (info.transport_->state() == TransportLayer::TS_ERROR) { michael@0: MOZ_MTLOG(ML_ERROR, ToString(info.type_) michael@0: << "transport is already in error state"); michael@0: TransportFailed_s(info); michael@0: return NS_ERROR_FAILURE; michael@0: } michael@0: michael@0: info.transport_->SignalStateChange.connect(this, michael@0: &MediaPipeline::StateChange); michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: MediaPipeline::TransportInfo* MediaPipeline::GetTransportInfo_s( michael@0: TransportFlow *flow) { michael@0: ASSERT_ON_THREAD(sts_thread_); michael@0: if (flow == rtp_.transport_) { michael@0: return &rtp_; michael@0: } michael@0: michael@0: if (flow == rtcp_.transport_) { michael@0: return &rtcp_; michael@0: } michael@0: michael@0: if (possible_bundle_rtp_) { michael@0: if (flow == possible_bundle_rtp_->transport_) { michael@0: return possible_bundle_rtp_; michael@0: } michael@0: michael@0: if (flow == possible_bundle_rtcp_->transport_) { michael@0: return possible_bundle_rtcp_; michael@0: } michael@0: } michael@0: michael@0: return nullptr; michael@0: } michael@0: michael@0: void MediaPipeline::SetUsingBundle_s(bool decision) { michael@0: ASSERT_ON_THREAD(sts_thread_); michael@0: // Note: This can be called either because of events on the STS thread, or michael@0: // by events on the main thread (ie; receiving a remote description). It is michael@0: // best to be careful of races here, so don't assume that transports are open. michael@0: if (!possible_bundle_rtp_) { michael@0: if (!decision) { michael@0: // This can happen on the master pipeline. michael@0: filter_ = nullptr; michael@0: } michael@0: return; michael@0: } michael@0: michael@0: if (direction_ == RECEIVE) { michael@0: if (decision) { michael@0: MOZ_MTLOG(ML_INFO, "Non-master pipeline confirmed bundle for " michael@0: << description_); michael@0: // We're doing bundle. Release the unused flows, and copy the ones we michael@0: // are using into the less wishy-washy members. michael@0: DisconnectTransport_s(rtp_); michael@0: DisconnectTransport_s(rtcp_); michael@0: rtp_ = *possible_bundle_rtp_; michael@0: rtcp_ = *possible_bundle_rtcp_; michael@0: } else { michael@0: MOZ_MTLOG(ML_INFO, "Non-master pipeline confirmed no bundle for " michael@0: << description_); michael@0: // We are not doing bundle michael@0: DisconnectTransport_s(*possible_bundle_rtp_); michael@0: DisconnectTransport_s(*possible_bundle_rtcp_); michael@0: filter_ = nullptr; michael@0: } michael@0: michael@0: // We are no longer in an ambiguous state. michael@0: possible_bundle_rtp_ = nullptr; michael@0: possible_bundle_rtcp_ = nullptr; michael@0: } michael@0: } michael@0: michael@0: MediaPipelineFilter* MediaPipeline::UpdateFilterFromRemoteDescription_s( michael@0: nsAutoPtr filter) { michael@0: ASSERT_ON_THREAD(sts_thread_); michael@0: // This is only supposed to relax the filter. Relaxing a missing filter is michael@0: // not possible. michael@0: MOZ_ASSERT(filter_); michael@0: michael@0: if (!filter) { michael@0: filter_ = nullptr; michael@0: } else { michael@0: filter_->IncorporateRemoteDescription(*filter); michael@0: } michael@0: michael@0: return filter_.get(); michael@0: } michael@0: michael@0: nsresult MediaPipeline::PipelineTransport::SendRtpPacket( michael@0: const void *data, int len) { michael@0: michael@0: nsAutoPtr buf(new DataBuffer(static_cast(data), michael@0: len)); michael@0: michael@0: RUN_ON_THREAD(sts_thread_, michael@0: WrapRunnable( michael@0: RefPtr(this), michael@0: &MediaPipeline::PipelineTransport::SendRtpPacket_s, michael@0: buf), michael@0: NS_DISPATCH_NORMAL); michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: nsresult MediaPipeline::PipelineTransport::SendRtpPacket_s( michael@0: nsAutoPtr data) { michael@0: ASSERT_ON_THREAD(sts_thread_); michael@0: if (!pipeline_) michael@0: return NS_OK; // Detached michael@0: michael@0: if (!pipeline_->rtp_.send_srtp_) { michael@0: MOZ_MTLOG(ML_DEBUG, "Couldn't write RTP packet; SRTP not set up yet"); michael@0: return NS_OK; michael@0: } michael@0: michael@0: MOZ_ASSERT(pipeline_->rtp_.transport_); michael@0: NS_ENSURE_TRUE(pipeline_->rtp_.transport_, NS_ERROR_NULL_POINTER); michael@0: michael@0: // libsrtp enciphers in place, so we need a new, big enough michael@0: // buffer. michael@0: // XXX. allocates and deletes one buffer per packet sent. michael@0: // Bug 822129 michael@0: int max_len = data->len() + SRTP_MAX_EXPANSION; michael@0: ScopedDeletePtr inner_data( michael@0: new unsigned char[max_len]); michael@0: memcpy(inner_data, data->data(), data->len()); michael@0: michael@0: int out_len; michael@0: nsresult res = pipeline_->rtp_.send_srtp_->ProtectRtp(inner_data, michael@0: data->len(), michael@0: max_len, michael@0: &out_len); michael@0: if (!NS_SUCCEEDED(res)) michael@0: return res; michael@0: michael@0: pipeline_->increment_rtp_packets_sent(out_len); michael@0: return pipeline_->SendPacket(pipeline_->rtp_.transport_, inner_data, michael@0: out_len); michael@0: } michael@0: michael@0: nsresult MediaPipeline::PipelineTransport::SendRtcpPacket( michael@0: const void *data, int len) { michael@0: michael@0: nsAutoPtr buf(new DataBuffer(static_cast(data), michael@0: len)); michael@0: michael@0: RUN_ON_THREAD(sts_thread_, michael@0: WrapRunnable( michael@0: RefPtr(this), michael@0: &MediaPipeline::PipelineTransport::SendRtcpPacket_s, michael@0: buf), michael@0: NS_DISPATCH_NORMAL); michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: nsresult MediaPipeline::PipelineTransport::SendRtcpPacket_s( michael@0: nsAutoPtr data) { michael@0: ASSERT_ON_THREAD(sts_thread_); michael@0: if (!pipeline_) michael@0: return NS_OK; // Detached michael@0: michael@0: if (!pipeline_->rtcp_.send_srtp_) { michael@0: MOZ_MTLOG(ML_DEBUG, "Couldn't write RTCP packet; SRTCP not set up yet"); michael@0: return NS_OK; michael@0: } michael@0: michael@0: MOZ_ASSERT(pipeline_->rtcp_.transport_); michael@0: NS_ENSURE_TRUE(pipeline_->rtcp_.transport_, NS_ERROR_NULL_POINTER); michael@0: michael@0: // libsrtp enciphers in place, so we need a new, big enough michael@0: // buffer. michael@0: // XXX. allocates and deletes one buffer per packet sent. michael@0: // Bug 822129. michael@0: int max_len = data->len() + SRTP_MAX_EXPANSION; michael@0: ScopedDeletePtr inner_data( michael@0: new unsigned char[max_len]); michael@0: memcpy(inner_data, data->data(), data->len()); michael@0: michael@0: int out_len; michael@0: nsresult res = pipeline_->rtcp_.send_srtp_->ProtectRtcp(inner_data, michael@0: data->len(), michael@0: max_len, michael@0: &out_len); michael@0: michael@0: if (!NS_SUCCEEDED(res)) michael@0: return res; michael@0: michael@0: pipeline_->increment_rtcp_packets_sent(); michael@0: return pipeline_->SendPacket(pipeline_->rtcp_.transport_, inner_data, michael@0: out_len); michael@0: } michael@0: michael@0: // Called if we're attached with AddDirectListener() michael@0: void MediaPipelineTransmit::PipelineListener:: michael@0: NotifyRealtimeData(MediaStreamGraph* graph, TrackID tid, michael@0: TrackRate rate, michael@0: TrackTicks offset, michael@0: uint32_t events, michael@0: const MediaSegment& media) { michael@0: MOZ_MTLOG(ML_DEBUG, "MediaPipeline::NotifyRealtimeData()"); michael@0: michael@0: NewData(graph, tid, rate, offset, events, media); michael@0: } michael@0: michael@0: void MediaPipelineTransmit::PipelineListener:: michael@0: NotifyQueuedTrackChanges(MediaStreamGraph* graph, TrackID tid, michael@0: TrackRate rate, michael@0: TrackTicks offset, michael@0: uint32_t events, michael@0: const MediaSegment& queued_media) { michael@0: MOZ_MTLOG(ML_DEBUG, "MediaPipeline::NotifyQueuedTrackChanges()"); michael@0: michael@0: // ignore non-direct data if we're also getting direct data michael@0: if (!direct_connect_) { michael@0: NewData(graph, tid, rate, offset, events, queued_media); michael@0: } michael@0: } michael@0: michael@0: void MediaPipelineTransmit::PipelineListener:: michael@0: NewData(MediaStreamGraph* graph, TrackID tid, michael@0: TrackRate rate, michael@0: TrackTicks offset, michael@0: uint32_t events, michael@0: const MediaSegment& media) { michael@0: if (!active_) { michael@0: MOZ_MTLOG(ML_DEBUG, "Discarding packets because transport not ready"); michael@0: return; michael@0: } michael@0: michael@0: // TODO(ekr@rtfm.com): For now assume that we have only one michael@0: // track type and it's destined for us michael@0: // See bug 784517 michael@0: if (media.GetType() == MediaSegment::AUDIO) { michael@0: if (conduit_->type() != MediaSessionConduit::AUDIO) { michael@0: // Ignore data in case we have a muxed stream michael@0: return; michael@0: } michael@0: AudioSegment* audio = const_cast( michael@0: static_cast(&media)); michael@0: michael@0: AudioSegment::ChunkIterator iter(*audio); michael@0: while(!iter.IsEnded()) { michael@0: ProcessAudioChunk(static_cast(conduit_.get()), michael@0: rate, *iter); michael@0: iter.Next(); michael@0: } michael@0: } else if (media.GetType() == MediaSegment::VIDEO) { michael@0: #ifdef MOZILLA_INTERNAL_API michael@0: if (conduit_->type() != MediaSessionConduit::VIDEO) { michael@0: // Ignore data in case we have a muxed stream michael@0: return; michael@0: } michael@0: VideoSegment* video = const_cast( michael@0: static_cast(&media)); michael@0: michael@0: VideoSegment::ChunkIterator iter(*video); michael@0: while(!iter.IsEnded()) { michael@0: ProcessVideoChunk(static_cast(conduit_.get()), michael@0: rate, *iter); michael@0: iter.Next(); michael@0: } michael@0: #endif michael@0: } else { michael@0: // Ignore michael@0: } michael@0: } michael@0: michael@0: void MediaPipelineTransmit::PipelineListener::ProcessAudioChunk( michael@0: AudioSessionConduit *conduit, michael@0: TrackRate rate, michael@0: AudioChunk& chunk) { michael@0: // TODO(ekr@rtfm.com): Do more than one channel michael@0: nsAutoArrayPtr samples(new int16_t[chunk.mDuration]); michael@0: michael@0: if (chunk.mBuffer) { michael@0: switch (chunk.mBufferFormat) { michael@0: case AUDIO_FORMAT_FLOAT32: michael@0: { michael@0: const float* buf = static_cast(chunk.mChannelData[0]); michael@0: ConvertAudioSamplesWithScale(buf, static_cast(samples), michael@0: chunk.mDuration, chunk.mVolume); michael@0: } michael@0: break; michael@0: case AUDIO_FORMAT_S16: michael@0: { michael@0: const short* buf = static_cast(chunk.mChannelData[0]); michael@0: ConvertAudioSamplesWithScale(buf, samples, chunk.mDuration, chunk.mVolume); michael@0: } michael@0: break; michael@0: case AUDIO_FORMAT_SILENCE: michael@0: memset(samples, 0, chunk.mDuration * sizeof(samples[0])); michael@0: break; michael@0: default: michael@0: MOZ_ASSERT(PR_FALSE); michael@0: return; michael@0: break; michael@0: } michael@0: } else { michael@0: // This means silence. michael@0: memset(samples, 0, chunk.mDuration * sizeof(samples[0])); michael@0: } michael@0: michael@0: MOZ_ASSERT(!(rate%100)); // rate should be a multiple of 100 michael@0: michael@0: // Check if the rate has changed since the last time we came through michael@0: // I realize it may be overkill to check if the rate has changed, but michael@0: // I believe it is possible (e.g. if we change sources) and it costs us michael@0: // very little to handle this case michael@0: michael@0: if (samplenum_10ms_ != rate/100) { michael@0: // Determine number of samples in 10 ms from the rate: michael@0: samplenum_10ms_ = rate/100; michael@0: // If we switch sample rates (e.g. if we switch codecs), michael@0: // we throw away what was in the sample_10ms_buffer at the old rate michael@0: samples_10ms_buffer_ = new int16_t[samplenum_10ms_]; michael@0: buffer_current_ = 0; michael@0: } michael@0: michael@0: // Vars to handle the non-sunny-day case (where the audio chunks michael@0: // we got are not multiples of 10ms OR there were samples left over michael@0: // from the last run) michael@0: int64_t chunk_remaining; michael@0: int64_t tocpy; michael@0: int16_t *samples_tmp = samples.get(); michael@0: michael@0: chunk_remaining = chunk.mDuration; michael@0: michael@0: MOZ_ASSERT(chunk_remaining >= 0); michael@0: michael@0: if (buffer_current_) { michael@0: tocpy = std::min(chunk_remaining, samplenum_10ms_ - buffer_current_); michael@0: memcpy(&samples_10ms_buffer_[buffer_current_], samples_tmp, tocpy * sizeof(int16_t)); michael@0: buffer_current_ += tocpy; michael@0: samples_tmp += tocpy; michael@0: chunk_remaining -= tocpy; michael@0: michael@0: if (buffer_current_ == samplenum_10ms_) { michael@0: // Send out the audio buffer we just finished filling michael@0: conduit->SendAudioFrame(samples_10ms_buffer_, samplenum_10ms_, rate, 0); michael@0: buffer_current_ = 0; michael@0: } else { michael@0: // We still don't have enough data to send a buffer michael@0: return; michael@0: } michael@0: } michael@0: michael@0: // Now send (more) frames if there is more than 10ms of input left michael@0: tocpy = (chunk_remaining / samplenum_10ms_) * samplenum_10ms_; michael@0: if (tocpy > 0) { michael@0: conduit->SendAudioFrame(samples_tmp, tocpy, rate, 0); michael@0: samples_tmp += tocpy; michael@0: chunk_remaining -= tocpy; michael@0: } michael@0: // Copy what remains for the next run michael@0: michael@0: MOZ_ASSERT(chunk_remaining < samplenum_10ms_); michael@0: michael@0: if (chunk_remaining) { michael@0: memcpy(samples_10ms_buffer_, samples_tmp, chunk_remaining * sizeof(int16_t)); michael@0: buffer_current_ = chunk_remaining; michael@0: } michael@0: michael@0: } michael@0: michael@0: #ifdef MOZILLA_INTERNAL_API michael@0: void MediaPipelineTransmit::PipelineListener::ProcessVideoChunk( michael@0: VideoSessionConduit* conduit, michael@0: TrackRate rate, michael@0: VideoChunk& chunk) { michael@0: layers::Image *img = chunk.mFrame.GetImage(); michael@0: michael@0: // We now need to send the video frame to the other side michael@0: if (!img) { michael@0: // segment.AppendFrame() allows null images, which show up here as null michael@0: return; michael@0: } michael@0: michael@0: gfx::IntSize size = img->GetSize(); michael@0: if ((size.width & 1) != 0 || (size.height & 1) != 0) { michael@0: MOZ_ASSERT(false, "Can't handle odd-sized images"); michael@0: return; michael@0: } michael@0: michael@0: if (chunk.mFrame.GetForceBlack()) { michael@0: uint32_t yPlaneLen = size.width*size.height; michael@0: uint32_t cbcrPlaneLen = yPlaneLen/2; michael@0: uint32_t length = yPlaneLen + cbcrPlaneLen; michael@0: michael@0: // Send a black image. michael@0: nsAutoArrayPtr pixelData; michael@0: static const fallible_t fallible = fallible_t(); michael@0: pixelData = new (fallible) uint8_t[length]; michael@0: if (pixelData) { michael@0: memset(pixelData, 0x10, yPlaneLen); michael@0: // Fill Cb/Cr planes michael@0: memset(pixelData + yPlaneLen, 0x80, cbcrPlaneLen); michael@0: michael@0: MOZ_MTLOG(ML_DEBUG, "Sending a black video frame"); michael@0: conduit->SendVideoFrame(pixelData, length, size.width, size.height, michael@0: mozilla::kVideoI420, 0); michael@0: } michael@0: return; michael@0: } michael@0: michael@0: // We get passed duplicate frames every ~10ms even if there's no frame change! michael@0: int32_t serial = img->GetSerial(); michael@0: if (serial == last_img_) { michael@0: return; michael@0: } michael@0: last_img_ = serial; michael@0: michael@0: ImageFormat format = img->GetFormat(); michael@0: #ifdef WEBRTC_GONK michael@0: if (format == ImageFormat::GRALLOC_PLANAR_YCBCR) { michael@0: layers::GrallocImage *nativeImage = static_cast(img); michael@0: android::sp graphicBuffer = nativeImage->GetGraphicBuffer(); michael@0: void *basePtr; michael@0: graphicBuffer->lock(android::GraphicBuffer::USAGE_SW_READ_MASK, &basePtr); michael@0: conduit->SendVideoFrame(static_cast(basePtr), michael@0: (graphicBuffer->getWidth() * graphicBuffer->getHeight() * 3) / 2, michael@0: graphicBuffer->getWidth(), michael@0: graphicBuffer->getHeight(), michael@0: mozilla::kVideoNV21, 0); michael@0: graphicBuffer->unlock(); michael@0: } else michael@0: #endif michael@0: if (format == ImageFormat::PLANAR_YCBCR) { michael@0: // Cast away constness b/c some of the accessors are non-const michael@0: layers::PlanarYCbCrImage* yuv = michael@0: const_cast( michael@0: static_cast(img)); michael@0: // Big-time assumption here that this is all contiguous data coming michael@0: // from getUserMedia or other sources. michael@0: const layers::PlanarYCbCrData *data = yuv->GetData(); michael@0: michael@0: uint8_t *y = data->mYChannel; michael@0: #ifdef DEBUG michael@0: uint8_t *cb = data->mCbChannel; michael@0: uint8_t *cr = data->mCrChannel; michael@0: #endif michael@0: uint32_t width = yuv->GetSize().width; michael@0: uint32_t height = yuv->GetSize().height; michael@0: uint32_t length = yuv->GetDataSize(); michael@0: michael@0: // SendVideoFrame only supports contiguous YCrCb 4:2:0 buffers michael@0: // Verify it's contiguous and in the right order michael@0: MOZ_ASSERT(cb == (y + width*height) && michael@0: cr == (cb + width*height/4)); michael@0: // XXX Consider making this a non-debug-only check if we ever implement michael@0: // any subclasses of PlanarYCbCrImage that allow disjoint buffers such michael@0: // that y+3(width*height)/2 might go outside the allocation. michael@0: // GrallocImage can have wider strides, and so in some cases michael@0: // would encode as garbage. If we need to encode it we'll either want to michael@0: // modify SendVideoFrame or copy/move the data in the buffer. michael@0: michael@0: // OK, pass it on to the conduit michael@0: MOZ_MTLOG(ML_DEBUG, "Sending a video frame"); michael@0: // Not much for us to do with an error michael@0: conduit->SendVideoFrame(y, length, width, height, mozilla::kVideoI420, 0); michael@0: } else if(format == ImageFormat::CAIRO_SURFACE) { michael@0: layers::CairoImage* rgb = michael@0: const_cast( michael@0: static_cast(img)); michael@0: michael@0: gfx::IntSize size = rgb->GetSize(); michael@0: int half_width = (size.width + 1) >> 1; michael@0: int half_height = (size.height + 1) >> 1; michael@0: int c_size = half_width * half_height; michael@0: int buffer_size = size.width * size.height + 2 * c_size; michael@0: uint8* yuv = (uint8*) malloc(buffer_size); michael@0: if (!yuv) michael@0: return; michael@0: michael@0: int cb_offset = size.width * size.height; michael@0: int cr_offset = cb_offset + c_size; michael@0: RefPtr tempSurf = rgb->GetAsSourceSurface(); michael@0: RefPtr surf = tempSurf->GetDataSurface(); michael@0: michael@0: switch (surf->GetFormat()) { michael@0: case gfx::SurfaceFormat::B8G8R8A8: michael@0: case gfx::SurfaceFormat::B8G8R8X8: michael@0: libyuv::ARGBToI420(static_cast(surf->GetData()), surf->Stride(), michael@0: yuv, size.width, michael@0: yuv + cb_offset, half_width, michael@0: yuv + cr_offset, half_width, michael@0: size.width, size.height); michael@0: break; michael@0: case gfx::SurfaceFormat::R5G6B5: michael@0: libyuv::RGB565ToI420(static_cast(surf->GetData()), surf->Stride(), michael@0: yuv, size.width, michael@0: yuv + cb_offset, half_width, michael@0: yuv + cr_offset, half_width, michael@0: size.width, size.height); michael@0: break; michael@0: default: michael@0: MOZ_MTLOG(ML_ERROR, "Unsupported RGB video format"); michael@0: MOZ_ASSERT(PR_FALSE); michael@0: } michael@0: conduit->SendVideoFrame(yuv, buffer_size, size.width, size.height, mozilla::kVideoI420, 0); michael@0: } else { michael@0: MOZ_MTLOG(ML_ERROR, "Unsupported video format"); michael@0: MOZ_ASSERT(PR_FALSE); michael@0: return; michael@0: } michael@0: } michael@0: #endif michael@0: michael@0: nsresult MediaPipelineReceiveAudio::Init() { michael@0: char track_id_string[11]; michael@0: ASSERT_ON_THREAD(main_thread_); michael@0: MOZ_MTLOG(ML_DEBUG, __FUNCTION__); michael@0: michael@0: // We can replace this when we are allowed to do streams or std::to_string michael@0: PR_snprintf(track_id_string, sizeof(track_id_string), "%d", track_id_); michael@0: michael@0: description_ = pc_ + "| Receive audio["; michael@0: description_ += track_id_string; michael@0: description_ += "]"; michael@0: michael@0: listener_->AddSelf(new AudioSegment()); michael@0: michael@0: return MediaPipelineReceive::Init(); michael@0: } michael@0: michael@0: michael@0: // Add a track and listener on the MSG thread using the MSG command queue michael@0: static void AddTrackAndListener(MediaStream* source, michael@0: TrackID track_id, TrackRate track_rate, michael@0: MediaStreamListener* listener, MediaSegment* segment, michael@0: const RefPtr& completed) { michael@0: // This both adds the listener and the track michael@0: #ifdef MOZILLA_INTERNAL_API michael@0: class Message : public ControlMessage { michael@0: public: michael@0: Message(MediaStream* stream, TrackID track, TrackRate rate, michael@0: MediaSegment* segment, MediaStreamListener* listener, michael@0: const RefPtr& completed) michael@0: : ControlMessage(stream), michael@0: track_id_(track), michael@0: track_rate_(rate), michael@0: segment_(segment), michael@0: listener_(listener), michael@0: completed_(completed) {} michael@0: michael@0: virtual void Run() MOZ_OVERRIDE { michael@0: StreamTime current_end = mStream->GetBufferEnd(); michael@0: TrackTicks current_ticks = TimeToTicksRoundUp(track_rate_, current_end); michael@0: michael@0: mStream->AddListenerImpl(listener_.forget()); michael@0: michael@0: // Add a track 'now' to avoid possible underrun, especially if we add michael@0: // a track "later". michael@0: michael@0: if (current_end != 0L) { michael@0: MOZ_MTLOG(ML_DEBUG, "added track @ " << current_end << michael@0: " -> " << MediaTimeToSeconds(current_end)); michael@0: } michael@0: michael@0: // To avoid assertions, we need to insert a dummy segment that covers up michael@0: // to the "start" time for the track michael@0: segment_->AppendNullData(current_ticks); michael@0: mStream->AsSourceStream()->AddTrack(track_id_, track_rate_, michael@0: current_ticks, segment_); michael@0: // AdvanceKnownTracksTicksTime(HEAT_DEATH_OF_UNIVERSE) means that in michael@0: // theory per the API, we can't add more tracks before that michael@0: // time. However, the impl actually allows it, and it avoids a whole michael@0: // bunch of locking that would be required (and potential blocking) michael@0: // if we used smaller values and updated them on each NotifyPull. michael@0: mStream->AsSourceStream()->AdvanceKnownTracksTime(STREAM_TIME_MAX); michael@0: michael@0: // We need to know how much has been "inserted" because we're given absolute michael@0: // times in NotifyPull. michael@0: completed_->TrackAdded(current_ticks); michael@0: } michael@0: private: michael@0: TrackID track_id_; michael@0: TrackRate track_rate_; michael@0: MediaSegment* segment_; michael@0: nsRefPtr listener_; michael@0: const RefPtr completed_; michael@0: }; michael@0: michael@0: MOZ_ASSERT(listener); michael@0: michael@0: source->GraphImpl()->AppendMessage(new Message(source, track_id, track_rate, segment, listener, completed)); michael@0: #else michael@0: source->AddListener(listener); michael@0: source->AsSourceStream()->AddTrack(track_id, track_rate, 0, segment); michael@0: #endif michael@0: } michael@0: michael@0: void GenericReceiveListener::AddSelf(MediaSegment* segment) { michael@0: RefPtr callback = new GenericReceiveCallback(this); michael@0: AddTrackAndListener(source_, track_id_, track_rate_, this, segment, callback); michael@0: } michael@0: michael@0: MediaPipelineReceiveAudio::PipelineListener::PipelineListener( michael@0: SourceMediaStream * source, TrackID track_id, michael@0: const RefPtr& conduit) michael@0: : GenericReceiveListener(source, track_id, 16000), // XXX rate assumption michael@0: conduit_(conduit) michael@0: { michael@0: MOZ_ASSERT(track_rate_%100 == 0); michael@0: } michael@0: michael@0: void MediaPipelineReceiveAudio::PipelineListener:: michael@0: NotifyPull(MediaStreamGraph* graph, StreamTime desired_time) { michael@0: MOZ_ASSERT(source_); michael@0: if (!source_) { michael@0: MOZ_MTLOG(ML_ERROR, "NotifyPull() called from a non-SourceMediaStream"); michael@0: return; michael@0: } michael@0: michael@0: // This comparison is done in total time to avoid accumulated roundoff errors. michael@0: while (TicksToTimeRoundDown(track_rate_, played_ticks_) < desired_time) { michael@0: // TODO(ekr@rtfm.com): Is there a way to avoid mallocating here? Or reduce the size? michael@0: // Max size given mono is 480*2*1 = 960 (48KHz) michael@0: #define AUDIO_SAMPLE_BUFFER_MAX 1000 michael@0: MOZ_ASSERT((track_rate_/100)*sizeof(uint16_t) <= AUDIO_SAMPLE_BUFFER_MAX); michael@0: michael@0: nsRefPtr samples = SharedBuffer::Create(AUDIO_SAMPLE_BUFFER_MAX); michael@0: int16_t *samples_data = static_cast(samples->Data()); michael@0: int samples_length; michael@0: michael@0: // This fetches 10ms of data michael@0: MediaConduitErrorCode err = michael@0: static_cast(conduit_.get())->GetAudioFrame( michael@0: samples_data, michael@0: track_rate_, michael@0: 0, // TODO(ekr@rtfm.com): better estimate of "capture" (really playout) delay michael@0: samples_length); michael@0: MOZ_ASSERT(samples_length < AUDIO_SAMPLE_BUFFER_MAX); michael@0: michael@0: if (err != kMediaConduitNoError) { michael@0: // Insert silence on conduit/GIPS failure (extremely unlikely) michael@0: MOZ_MTLOG(ML_ERROR, "Audio conduit failed (" << err michael@0: << ") to return data @ " << played_ticks_ michael@0: << " (desired " << desired_time << " -> " michael@0: << MediaTimeToSeconds(desired_time) << ")"); michael@0: MOZ_ASSERT(err == kMediaConduitNoError); michael@0: samples_length = (track_rate_/100)*sizeof(uint16_t); // if this is not enough we'll loop and provide more michael@0: memset(samples_data, '\0', samples_length); michael@0: } michael@0: michael@0: MOZ_MTLOG(ML_DEBUG, "Audio conduit returned buffer of length " michael@0: << samples_length); michael@0: michael@0: AudioSegment segment; michael@0: nsAutoTArray channels; michael@0: channels.AppendElement(samples_data); michael@0: segment.AppendFrames(samples.forget(), channels, samples_length); michael@0: michael@0: // Handle track not actually added yet or removed/finished michael@0: if (source_->AppendToTrack(track_id_, &segment)) { michael@0: played_ticks_ += track_rate_/100; // 10ms in TrackTicks michael@0: } else { michael@0: MOZ_MTLOG(ML_ERROR, "AppendToTrack failed"); michael@0: // we can't un-read the data, but that's ok since we don't want to michael@0: // buffer - but don't i-loop! michael@0: return; michael@0: } michael@0: } michael@0: } michael@0: michael@0: nsresult MediaPipelineReceiveVideo::Init() { michael@0: char track_id_string[11]; michael@0: ASSERT_ON_THREAD(main_thread_); michael@0: MOZ_MTLOG(ML_DEBUG, __FUNCTION__); michael@0: michael@0: // We can replace this when we are allowed to do streams or std::to_string michael@0: PR_snprintf(track_id_string, sizeof(track_id_string), "%d", track_id_); michael@0: michael@0: description_ = pc_ + "| Receive video["; michael@0: description_ += track_id_string; michael@0: description_ += "]"; michael@0: michael@0: #ifdef MOZILLA_INTERNAL_API michael@0: listener_->AddSelf(new VideoSegment()); michael@0: #endif michael@0: michael@0: // Always happens before we can DetachMediaStream() michael@0: static_cast(conduit_.get())-> michael@0: AttachRenderer(renderer_); michael@0: michael@0: return MediaPipelineReceive::Init(); michael@0: } michael@0: michael@0: MediaPipelineReceiveVideo::PipelineListener::PipelineListener( michael@0: SourceMediaStream* source, TrackID track_id) michael@0: : GenericReceiveListener(source, track_id, USECS_PER_S), michael@0: width_(640), michael@0: height_(480), michael@0: #ifdef MOZILLA_INTERNAL_API michael@0: image_container_(), michael@0: image_(), michael@0: #endif michael@0: monitor_("Video PipelineListener") { michael@0: #ifdef MOZILLA_INTERNAL_API michael@0: image_container_ = layers::LayerManager::CreateImageContainer(); michael@0: #endif michael@0: } michael@0: michael@0: void MediaPipelineReceiveVideo::PipelineListener::RenderVideoFrame( michael@0: const unsigned char* buffer, michael@0: unsigned int buffer_size, michael@0: uint32_t time_stamp, michael@0: int64_t render_time, michael@0: const RefPtr& video_image) { michael@0: #ifdef MOZILLA_INTERNAL_API michael@0: ReentrantMonitorAutoEnter enter(monitor_); michael@0: michael@0: if (buffer) { michael@0: // Create a video frame using |buffer|. michael@0: #ifdef MOZ_WIDGET_GONK michael@0: ImageFormat format = ImageFormat::GRALLOC_PLANAR_YCBCR; michael@0: #else michael@0: ImageFormat format = ImageFormat::PLANAR_YCBCR; michael@0: #endif michael@0: nsRefPtr image = image_container_->CreateImage(format); michael@0: layers::PlanarYCbCrImage* yuvImage = static_cast(image.get()); michael@0: uint8_t* frame = const_cast(static_cast (buffer)); michael@0: const uint8_t lumaBpp = 8; michael@0: const uint8_t chromaBpp = 4; michael@0: michael@0: layers::PlanarYCbCrData yuvData; michael@0: yuvData.mYChannel = frame; michael@0: yuvData.mYSize = IntSize(width_, height_); michael@0: yuvData.mYStride = width_ * lumaBpp/ 8; michael@0: yuvData.mCbCrStride = width_ * chromaBpp / 8; michael@0: yuvData.mCbChannel = frame + height_ * yuvData.mYStride; michael@0: yuvData.mCrChannel = yuvData.mCbChannel + height_ * yuvData.mCbCrStride / 2; michael@0: yuvData.mCbCrSize = IntSize(width_/ 2, height_/ 2); michael@0: yuvData.mPicX = 0; michael@0: yuvData.mPicY = 0; michael@0: yuvData.mPicSize = IntSize(width_, height_); michael@0: yuvData.mStereoMode = StereoMode::MONO; michael@0: michael@0: yuvImage->SetData(yuvData); michael@0: michael@0: image_ = image.forget(); michael@0: } michael@0: #ifdef WEBRTC_GONK michael@0: else { michael@0: // Decoder produced video frame that can be appended to the track directly. michael@0: MOZ_ASSERT(video_image); michael@0: image_ = video_image; michael@0: } michael@0: #endif // WEBRTC_GONK michael@0: #endif // MOZILLA_INTERNAL_API michael@0: } michael@0: michael@0: void MediaPipelineReceiveVideo::PipelineListener:: michael@0: NotifyPull(MediaStreamGraph* graph, StreamTime desired_time) { michael@0: ReentrantMonitorAutoEnter enter(monitor_); michael@0: michael@0: #ifdef MOZILLA_INTERNAL_API michael@0: nsRefPtr image = image_; michael@0: TrackTicks target = TimeToTicksRoundUp(USECS_PER_S, desired_time); michael@0: TrackTicks delta = target - played_ticks_; michael@0: michael@0: // Don't append if we've already provided a frame that supposedly michael@0: // goes past the current aDesiredTime Doing so means a negative michael@0: // delta and thus messes up handling of the graph michael@0: if (delta > 0) { michael@0: VideoSegment segment; michael@0: segment.AppendFrame(image.forget(), delta, IntSize(width_, height_)); michael@0: // Handle track not actually added yet or removed/finished michael@0: if (source_->AppendToTrack(track_id_, &segment)) { michael@0: played_ticks_ = target; michael@0: } else { michael@0: MOZ_MTLOG(ML_ERROR, "AppendToTrack failed"); michael@0: return; michael@0: } michael@0: } michael@0: #endif michael@0: } michael@0: michael@0: michael@0: } // end namespace