1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/media/mtransport/transportflow.cpp Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,255 @@ 1.4 +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 1.5 +/* vim: set ts=2 et sw=2 tw=80: */ 1.6 +/* This Source Code Form is subject to the terms of the Mozilla Public 1.7 + * License, v. 2.0. If a copy of the MPL was not distributed with this file, 1.8 + * You can obtain one at http://mozilla.org/MPL/2.0/. */ 1.9 + 1.10 +// Original author: ekr@rtfm.com 1.11 +#include <deque> 1.12 + 1.13 +#include "logging.h" 1.14 +#include "runnable_utils.h" 1.15 +#include "transportflow.h" 1.16 +#include "transportlayer.h" 1.17 + 1.18 +namespace mozilla { 1.19 + 1.20 +MOZ_MTLOG_MODULE("mtransport") 1.21 + 1.22 +NS_IMPL_ISUPPORTS0(TransportFlow) 1.23 + 1.24 +// There are some hacks here to allow destruction off of 1.25 +// the main thread. 1.26 +TransportFlow::~TransportFlow() { 1.27 + // Make sure that if we are off the right thread, we have 1.28 + // no more attached signals. 1.29 + if (!CheckThreadInt()) { 1.30 + MOZ_ASSERT(SignalStateChange.is_empty()); 1.31 + MOZ_ASSERT(SignalPacketReceived.is_empty()); 1.32 + } 1.33 + 1.34 + // Push the destruction onto the STS thread. Note that there 1.35 + // is still some possibility that someone is accessing this 1.36 + // object simultaneously, but as long as smart pointer discipline 1.37 + // is maintained, it shouldn't be possible to access and 1.38 + // destroy it simultaneously. The conversion to an nsAutoPtr 1.39 + // ensures automatic destruction of the queue at exit of 1.40 + // DestroyFinal. 1.41 + nsAutoPtr<std::deque<TransportLayer*> > layers_tmp(layers_.forget()); 1.42 + RUN_ON_THREAD(target_, 1.43 + WrapRunnableNM(&TransportFlow::DestroyFinal, layers_tmp), 1.44 + NS_DISPATCH_NORMAL); 1.45 +} 1.46 + 1.47 +void TransportFlow::DestroyFinal(nsAutoPtr<std::deque<TransportLayer *> > layers) { 1.48 + ClearLayers(layers); 1.49 +} 1.50 + 1.51 +void TransportFlow::ClearLayers(std::queue<TransportLayer *>* layers) { 1.52 + while (!layers->empty()) { 1.53 + delete layers->front(); 1.54 + layers->pop(); 1.55 + } 1.56 +} 1.57 + 1.58 +void TransportFlow::ClearLayers(std::deque<TransportLayer *>* layers) { 1.59 + while (!layers->empty()) { 1.60 + delete layers->front(); 1.61 + layers->pop_front(); 1.62 + } 1.63 +} 1.64 + 1.65 +nsresult TransportFlow::PushLayer(TransportLayer *layer) { 1.66 + CheckThread(); 1.67 + ScopedDeletePtr<TransportLayer> layer_tmp(layer); // Destroy on failure. 1.68 + 1.69 + // Don't allow pushes once we are in error state. 1.70 + if (state_ == TransportLayer::TS_ERROR) { 1.71 + MOZ_MTLOG(ML_ERROR, id_ + ": Can't call PushLayer in error state for flow"); 1.72 + return NS_ERROR_FAILURE; 1.73 + } 1.74 + 1.75 + nsresult rv = layer->Init(); 1.76 + if (!NS_SUCCEEDED(rv)) { 1.77 + // Destroy the rest of the flow, because it's no longer in an acceptable 1.78 + // state. 1.79 + ClearLayers(layers_.get()); 1.80 + 1.81 + // Set ourselves to have failed. 1.82 + MOZ_MTLOG(ML_ERROR, id_ << ": Layer initialization failed; invalidating"); 1.83 + StateChangeInt(TransportLayer::TS_ERROR); 1.84 + 1.85 + return rv; 1.86 + } 1.87 + EnsureSameThread(layer); 1.88 + 1.89 + TransportLayer *old_layer = layers_->empty() ? nullptr : layers_->front(); 1.90 + 1.91 + // Re-target my signals to the new layer 1.92 + if (old_layer) { 1.93 + old_layer->SignalStateChange.disconnect(this); 1.94 + old_layer->SignalPacketReceived.disconnect(this); 1.95 + } 1.96 + layers_->push_front(layer_tmp.forget()); 1.97 + layer->Inserted(this, old_layer); 1.98 + 1.99 + layer->SignalStateChange.connect(this, &TransportFlow::StateChange); 1.100 + layer->SignalPacketReceived.connect(this, &TransportFlow::PacketReceived); 1.101 + StateChangeInt(layer->state()); 1.102 + 1.103 + return NS_OK; 1.104 +} 1.105 + 1.106 +// This is all-or-nothing. 1.107 +nsresult TransportFlow::PushLayers(nsAutoPtr<std::queue<TransportLayer *> > layers) { 1.108 + CheckThread(); 1.109 + 1.110 + MOZ_ASSERT(!layers->empty()); 1.111 + if (layers->empty()) { 1.112 + MOZ_MTLOG(ML_ERROR, id_ << ": Can't call PushLayers with empty layers"); 1.113 + return NS_ERROR_INVALID_ARG; 1.114 + } 1.115 + 1.116 + // Don't allow pushes once we are in error state. 1.117 + if (state_ == TransportLayer::TS_ERROR) { 1.118 + MOZ_MTLOG(ML_ERROR, 1.119 + id_ << ": Can't call PushLayers in error state for flow "); 1.120 + ClearLayers(layers.get()); 1.121 + return NS_ERROR_FAILURE; 1.122 + } 1.123 + 1.124 + nsresult rv = NS_OK; 1.125 + 1.126 + // Disconnect all the old signals. 1.127 + disconnect_all(); 1.128 + 1.129 + TransportLayer *layer; 1.130 + 1.131 + while (!layers->empty()) { 1.132 + TransportLayer *old_layer = layers_->empty() ? nullptr : layers_->front(); 1.133 + layer = layers->front(); 1.134 + 1.135 + rv = layer->Init(); 1.136 + if (NS_FAILED(rv)) { 1.137 + MOZ_MTLOG(ML_ERROR, 1.138 + id_ << ": Layer initialization failed; invalidating flow "); 1.139 + break; 1.140 + } 1.141 + 1.142 + EnsureSameThread(layer); 1.143 + 1.144 + // Push the layer onto the queue. 1.145 + layers_->push_front(layer); 1.146 + layers->pop(); 1.147 + layer->Inserted(this, old_layer); 1.148 + } 1.149 + 1.150 + if (NS_FAILED(rv)) { 1.151 + // Destroy any layers we could not push. 1.152 + ClearLayers(layers); 1.153 + 1.154 + // Now destroy the rest of the flow, because it's no longer 1.155 + // in an acceptable state. 1.156 + ClearLayers(layers_); 1.157 + 1.158 + // Set ourselves to have failed. 1.159 + StateChangeInt(TransportLayer::TS_ERROR); 1.160 + 1.161 + // Return failure. 1.162 + return rv; 1.163 + } 1.164 + 1.165 + // Finally, attach ourselves to the top layer. 1.166 + layer->SignalStateChange.connect(this, &TransportFlow::StateChange); 1.167 + layer->SignalPacketReceived.connect(this, &TransportFlow::PacketReceived); 1.168 + StateChangeInt(layer->state()); // Signals if the state changes. 1.169 + 1.170 + return NS_OK; 1.171 +} 1.172 + 1.173 +TransportLayer *TransportFlow::top() const { 1.174 + CheckThread(); 1.175 + 1.176 + return layers_->empty() ? nullptr : layers_->front(); 1.177 +} 1.178 + 1.179 +TransportLayer *TransportFlow::GetLayer(const std::string& id) const { 1.180 + CheckThread(); 1.181 + 1.182 + for (std::deque<TransportLayer *>::const_iterator it = layers_->begin(); 1.183 + it != layers_->end(); ++it) { 1.184 + if ((*it)->id() == id) 1.185 + return *it; 1.186 + } 1.187 + 1.188 + return nullptr; 1.189 +} 1.190 + 1.191 +TransportLayer::State TransportFlow::state() { 1.192 + CheckThread(); 1.193 + 1.194 + return state_; 1.195 +} 1.196 + 1.197 +TransportResult TransportFlow::SendPacket(const unsigned char *data, 1.198 + size_t len) { 1.199 + CheckThread(); 1.200 + 1.201 + if (state_ != TransportLayer::TS_OPEN) { 1.202 + return TE_ERROR; 1.203 + } 1.204 + return top() ? top()->SendPacket(data, len) : TE_ERROR; 1.205 +} 1.206 + 1.207 +bool TransportFlow::Contains(TransportLayer *layer) const { 1.208 + if (layers_) { 1.209 + for (auto l = layers_->begin(); l != layers_->end(); ++l) { 1.210 + if (*l == layer) { 1.211 + return true; 1.212 + } 1.213 + } 1.214 + } 1.215 + return false; 1.216 +} 1.217 + 1.218 +void TransportFlow::EnsureSameThread(TransportLayer *layer) { 1.219 + // Enforce that if any of the layers have a thread binding, 1.220 + // they all have the same binding. 1.221 + if (target_) { 1.222 + const nsCOMPtr<nsIEventTarget>& lthread = layer->GetThread(); 1.223 + 1.224 + if (lthread && (lthread != target_)) 1.225 + MOZ_CRASH(); 1.226 + } 1.227 + else { 1.228 + target_ = layer->GetThread(); 1.229 + } 1.230 +} 1.231 + 1.232 +void TransportFlow::StateChangeInt(TransportLayer::State state) { 1.233 + CheckThread(); 1.234 + 1.235 + if (state == state_) { 1.236 + return; 1.237 + } 1.238 + 1.239 + state_ = state; 1.240 + SignalStateChange(this, state_); 1.241 +} 1.242 + 1.243 +void TransportFlow::StateChange(TransportLayer *layer, 1.244 + TransportLayer::State state) { 1.245 + CheckThread(); 1.246 + 1.247 + StateChangeInt(state); 1.248 +} 1.249 + 1.250 +void TransportFlow::PacketReceived(TransportLayer* layer, 1.251 + const unsigned char *data, 1.252 + size_t len) { 1.253 + CheckThread(); 1.254 + 1.255 + SignalPacketReceived(this, data, len); 1.256 +} 1.257 + 1.258 +} // close namespace