michael@0: /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ michael@0: /* vim: set ts=2 et sw=2 tw=80: */ michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this file, michael@0: * You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: // Original author: ekr@rtfm.com michael@0: #include michael@0: michael@0: #include "logging.h" michael@0: #include "runnable_utils.h" michael@0: #include "transportflow.h" michael@0: #include "transportlayer.h" michael@0: michael@0: namespace mozilla { michael@0: michael@0: MOZ_MTLOG_MODULE("mtransport") michael@0: michael@0: NS_IMPL_ISUPPORTS0(TransportFlow) michael@0: michael@0: // There are some hacks here to allow destruction off of michael@0: // the main thread. michael@0: TransportFlow::~TransportFlow() { michael@0: // Make sure that if we are off the right thread, we have michael@0: // no more attached signals. michael@0: if (!CheckThreadInt()) { michael@0: MOZ_ASSERT(SignalStateChange.is_empty()); michael@0: MOZ_ASSERT(SignalPacketReceived.is_empty()); michael@0: } michael@0: michael@0: // Push the destruction onto the STS thread. Note that there michael@0: // is still some possibility that someone is accessing this michael@0: // object simultaneously, but as long as smart pointer discipline michael@0: // is maintained, it shouldn't be possible to access and michael@0: // destroy it simultaneously. The conversion to an nsAutoPtr michael@0: // ensures automatic destruction of the queue at exit of michael@0: // DestroyFinal. michael@0: nsAutoPtr > layers_tmp(layers_.forget()); michael@0: RUN_ON_THREAD(target_, michael@0: WrapRunnableNM(&TransportFlow::DestroyFinal, layers_tmp), michael@0: NS_DISPATCH_NORMAL); michael@0: } michael@0: michael@0: void TransportFlow::DestroyFinal(nsAutoPtr > layers) { michael@0: ClearLayers(layers); michael@0: } michael@0: michael@0: void TransportFlow::ClearLayers(std::queue* layers) { michael@0: while (!layers->empty()) { michael@0: delete layers->front(); michael@0: layers->pop(); michael@0: } michael@0: } michael@0: michael@0: void TransportFlow::ClearLayers(std::deque* layers) { michael@0: while (!layers->empty()) { michael@0: delete layers->front(); michael@0: layers->pop_front(); michael@0: } michael@0: } michael@0: michael@0: nsresult TransportFlow::PushLayer(TransportLayer *layer) { michael@0: CheckThread(); michael@0: ScopedDeletePtr layer_tmp(layer); // Destroy on failure. michael@0: michael@0: // Don't allow pushes once we are in error state. michael@0: if (state_ == TransportLayer::TS_ERROR) { michael@0: MOZ_MTLOG(ML_ERROR, id_ + ": Can't call PushLayer in error state for flow"); michael@0: return NS_ERROR_FAILURE; michael@0: } michael@0: michael@0: nsresult rv = layer->Init(); michael@0: if (!NS_SUCCEEDED(rv)) { michael@0: // Destroy the rest of the flow, because it's no longer in an acceptable michael@0: // state. michael@0: ClearLayers(layers_.get()); michael@0: michael@0: // Set ourselves to have failed. michael@0: MOZ_MTLOG(ML_ERROR, id_ << ": Layer initialization failed; invalidating"); michael@0: StateChangeInt(TransportLayer::TS_ERROR); michael@0: michael@0: return rv; michael@0: } michael@0: EnsureSameThread(layer); michael@0: michael@0: TransportLayer *old_layer = layers_->empty() ? nullptr : layers_->front(); michael@0: michael@0: // Re-target my signals to the new layer michael@0: if (old_layer) { michael@0: old_layer->SignalStateChange.disconnect(this); michael@0: old_layer->SignalPacketReceived.disconnect(this); michael@0: } michael@0: layers_->push_front(layer_tmp.forget()); michael@0: layer->Inserted(this, old_layer); michael@0: michael@0: layer->SignalStateChange.connect(this, &TransportFlow::StateChange); michael@0: layer->SignalPacketReceived.connect(this, &TransportFlow::PacketReceived); michael@0: StateChangeInt(layer->state()); michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: // This is all-or-nothing. michael@0: nsresult TransportFlow::PushLayers(nsAutoPtr > layers) { michael@0: CheckThread(); michael@0: michael@0: MOZ_ASSERT(!layers->empty()); michael@0: if (layers->empty()) { michael@0: MOZ_MTLOG(ML_ERROR, id_ << ": Can't call PushLayers with empty layers"); michael@0: return NS_ERROR_INVALID_ARG; michael@0: } michael@0: michael@0: // Don't allow pushes once we are in error state. michael@0: if (state_ == TransportLayer::TS_ERROR) { michael@0: MOZ_MTLOG(ML_ERROR, michael@0: id_ << ": Can't call PushLayers in error state for flow "); michael@0: ClearLayers(layers.get()); michael@0: return NS_ERROR_FAILURE; michael@0: } michael@0: michael@0: nsresult rv = NS_OK; michael@0: michael@0: // Disconnect all the old signals. michael@0: disconnect_all(); michael@0: michael@0: TransportLayer *layer; michael@0: michael@0: while (!layers->empty()) { michael@0: TransportLayer *old_layer = layers_->empty() ? nullptr : layers_->front(); michael@0: layer = layers->front(); michael@0: michael@0: rv = layer->Init(); michael@0: if (NS_FAILED(rv)) { michael@0: MOZ_MTLOG(ML_ERROR, michael@0: id_ << ": Layer initialization failed; invalidating flow "); michael@0: break; michael@0: } michael@0: michael@0: EnsureSameThread(layer); michael@0: michael@0: // Push the layer onto the queue. michael@0: layers_->push_front(layer); michael@0: layers->pop(); michael@0: layer->Inserted(this, old_layer); michael@0: } michael@0: michael@0: if (NS_FAILED(rv)) { michael@0: // Destroy any layers we could not push. michael@0: ClearLayers(layers); michael@0: michael@0: // Now destroy the rest of the flow, because it's no longer michael@0: // in an acceptable state. michael@0: ClearLayers(layers_); michael@0: michael@0: // Set ourselves to have failed. michael@0: StateChangeInt(TransportLayer::TS_ERROR); michael@0: michael@0: // Return failure. michael@0: return rv; michael@0: } michael@0: michael@0: // Finally, attach ourselves to the top layer. michael@0: layer->SignalStateChange.connect(this, &TransportFlow::StateChange); michael@0: layer->SignalPacketReceived.connect(this, &TransportFlow::PacketReceived); michael@0: StateChangeInt(layer->state()); // Signals if the state changes. michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: TransportLayer *TransportFlow::top() const { michael@0: CheckThread(); michael@0: michael@0: return layers_->empty() ? nullptr : layers_->front(); michael@0: } michael@0: michael@0: TransportLayer *TransportFlow::GetLayer(const std::string& id) const { michael@0: CheckThread(); michael@0: michael@0: for (std::deque::const_iterator it = layers_->begin(); michael@0: it != layers_->end(); ++it) { michael@0: if ((*it)->id() == id) michael@0: return *it; michael@0: } michael@0: michael@0: return nullptr; michael@0: } michael@0: michael@0: TransportLayer::State TransportFlow::state() { michael@0: CheckThread(); michael@0: michael@0: return state_; michael@0: } michael@0: michael@0: TransportResult TransportFlow::SendPacket(const unsigned char *data, michael@0: size_t len) { michael@0: CheckThread(); michael@0: michael@0: if (state_ != TransportLayer::TS_OPEN) { michael@0: return TE_ERROR; michael@0: } michael@0: return top() ? top()->SendPacket(data, len) : TE_ERROR; michael@0: } michael@0: michael@0: bool TransportFlow::Contains(TransportLayer *layer) const { michael@0: if (layers_) { michael@0: for (auto l = layers_->begin(); l != layers_->end(); ++l) { michael@0: if (*l == layer) { michael@0: return true; michael@0: } michael@0: } michael@0: } michael@0: return false; michael@0: } michael@0: michael@0: void TransportFlow::EnsureSameThread(TransportLayer *layer) { michael@0: // Enforce that if any of the layers have a thread binding, michael@0: // they all have the same binding. michael@0: if (target_) { michael@0: const nsCOMPtr& lthread = layer->GetThread(); michael@0: michael@0: if (lthread && (lthread != target_)) michael@0: MOZ_CRASH(); michael@0: } michael@0: else { michael@0: target_ = layer->GetThread(); michael@0: } michael@0: } michael@0: michael@0: void TransportFlow::StateChangeInt(TransportLayer::State state) { michael@0: CheckThread(); michael@0: michael@0: if (state == state_) { michael@0: return; michael@0: } michael@0: michael@0: state_ = state; michael@0: SignalStateChange(this, state_); michael@0: } michael@0: michael@0: void TransportFlow::StateChange(TransportLayer *layer, michael@0: TransportLayer::State state) { michael@0: CheckThread(); michael@0: michael@0: StateChangeInt(state); michael@0: } michael@0: michael@0: void TransportFlow::PacketReceived(TransportLayer* layer, michael@0: const unsigned char *data, michael@0: size_t len) { michael@0: CheckThread(); michael@0: michael@0: SignalPacketReceived(this, data, len); michael@0: } michael@0: michael@0: } // close namespace