michael@0: // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. michael@0: // Use of this source code is governed by a BSD-style license that can be michael@0: // found in the LICENSE file. michael@0: michael@0: #include "base/message_loop.h" michael@0: #include "base/thread.h" michael@0: #include "chrome/common/ipc_channel_proxy.h" michael@0: #include "chrome/common/ipc_logging.h" michael@0: #include "chrome/common/ipc_message_utils.h" michael@0: michael@0: namespace IPC { michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: ChannelProxy::Context::Context(Channel::Listener* listener, michael@0: MessageFilter* filter, michael@0: MessageLoop* ipc_message_loop) michael@0: : listener_message_loop_(MessageLoop::current()), michael@0: listener_(listener), michael@0: ipc_message_loop_(ipc_message_loop), michael@0: channel_(NULL), michael@0: peer_pid_(0), michael@0: channel_connected_called_(false) { michael@0: if (filter) michael@0: filters_.push_back(filter); michael@0: } michael@0: michael@0: void ChannelProxy::Context::CreateChannel(const std::wstring& id, michael@0: const Channel::Mode& mode) { michael@0: DCHECK(channel_ == NULL); michael@0: channel_id_ = id; michael@0: channel_ = new Channel(id, mode, this); michael@0: } michael@0: michael@0: bool ChannelProxy::Context::TryFilters(const Message& message) { michael@0: #ifdef IPC_MESSAGE_LOG_ENABLED michael@0: Logging* logger = Logging::current(); michael@0: if (logger->Enabled()) michael@0: logger->OnPreDispatchMessage(message); michael@0: #endif michael@0: michael@0: for (size_t i = 0; i < filters_.size(); ++i) { michael@0: if (filters_[i]->OnMessageReceived(message)) { michael@0: #ifdef IPC_MESSAGE_LOG_ENABLED michael@0: if (logger->Enabled()) michael@0: logger->OnPostDispatchMessage(message, channel_id_); michael@0: #endif michael@0: return true; michael@0: } michael@0: } michael@0: return false; michael@0: } michael@0: michael@0: // Called on the IPC::Channel thread michael@0: void ChannelProxy::Context::OnMessageReceived(const Message& message) { michael@0: // First give a chance to the filters to process this message. michael@0: if (!TryFilters(message)) michael@0: OnMessageReceivedNoFilter(message); michael@0: } michael@0: michael@0: // Called on the IPC::Channel thread michael@0: void ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) { michael@0: // NOTE: This code relies on the listener's message loop not going away while michael@0: // this thread is active. That should be a reasonable assumption, but it michael@0: // feels risky. We may want to invent some more indirect way of referring to michael@0: // a MessageLoop if this becomes a problem. michael@0: listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( michael@0: this, &Context::OnDispatchMessage, message)); michael@0: } michael@0: michael@0: // Called on the IPC::Channel thread michael@0: void ChannelProxy::Context::OnChannelConnected(int32_t peer_pid) { michael@0: peer_pid_ = peer_pid; michael@0: for (size_t i = 0; i < filters_.size(); ++i) michael@0: filters_[i]->OnChannelConnected(peer_pid); michael@0: michael@0: // See above comment about using listener_message_loop_ here. michael@0: listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( michael@0: this, &Context::OnDispatchConnected)); michael@0: } michael@0: michael@0: // Called on the IPC::Channel thread michael@0: void ChannelProxy::Context::OnChannelError() { michael@0: for (size_t i = 0; i < filters_.size(); ++i) michael@0: filters_[i]->OnChannelError(); michael@0: michael@0: // See above comment about using listener_message_loop_ here. michael@0: listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( michael@0: this, &Context::OnDispatchError)); michael@0: } michael@0: michael@0: // Called on the IPC::Channel thread michael@0: void ChannelProxy::Context::OnChannelOpened() { michael@0: DCHECK(channel_ != NULL); michael@0: michael@0: // Assume a reference to ourselves on behalf of this thread. This reference michael@0: // will be released when we are closed. michael@0: AddRef(); michael@0: michael@0: if (!channel_->Connect()) { michael@0: OnChannelError(); michael@0: return; michael@0: } michael@0: michael@0: for (size_t i = 0; i < filters_.size(); ++i) michael@0: filters_[i]->OnFilterAdded(channel_); michael@0: } michael@0: michael@0: // Called on the IPC::Channel thread michael@0: void ChannelProxy::Context::OnChannelClosed() { michael@0: // It's okay for IPC::ChannelProxy::Close to be called more than once, which michael@0: // would result in this branch being taken. michael@0: if (!channel_) michael@0: return; michael@0: michael@0: for (size_t i = 0; i < filters_.size(); ++i) { michael@0: filters_[i]->OnChannelClosing(); michael@0: filters_[i]->OnFilterRemoved(); michael@0: } michael@0: michael@0: // We don't need the filters anymore. michael@0: filters_.clear(); michael@0: michael@0: delete channel_; michael@0: channel_ = NULL; michael@0: michael@0: // Balance with the reference taken during startup. This may result in michael@0: // self-destruction. michael@0: Release(); michael@0: } michael@0: michael@0: // Called on the IPC::Channel thread michael@0: void ChannelProxy::Context::OnSendMessage(Message* message) { michael@0: if (!channel_->Send(message)) michael@0: OnChannelError(); michael@0: } michael@0: michael@0: // Called on the IPC::Channel thread michael@0: void ChannelProxy::Context::OnAddFilter(MessageFilter* filter) { michael@0: filters_.push_back(filter); michael@0: michael@0: // If the channel has already been created, then we need to send this message michael@0: // so that the filter gets access to the Channel. michael@0: if (channel_) michael@0: filter->OnFilterAdded(channel_); michael@0: michael@0: // Balances the AddRef in ChannelProxy::AddFilter. michael@0: filter->Release(); michael@0: } michael@0: michael@0: // Called on the IPC::Channel thread michael@0: void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) { michael@0: for (size_t i = 0; i < filters_.size(); ++i) { michael@0: if (filters_[i].get() == filter) { michael@0: filter->OnFilterRemoved(); michael@0: filters_.erase(filters_.begin() + i); michael@0: return; michael@0: } michael@0: } michael@0: michael@0: NOTREACHED() << "filter to be removed not found"; michael@0: } michael@0: michael@0: // Called on the listener's thread michael@0: void ChannelProxy::Context::OnDispatchMessage(const Message& message) { michael@0: if (!listener_) michael@0: return; michael@0: michael@0: OnDispatchConnected(); michael@0: michael@0: #ifdef IPC_MESSAGE_LOG_ENABLED michael@0: Logging* logger = Logging::current(); michael@0: if (message.type() == IPC_LOGGING_ID) { michael@0: logger->OnReceivedLoggingMessage(message); michael@0: return; michael@0: } michael@0: michael@0: if (logger->Enabled()) michael@0: logger->OnPreDispatchMessage(message); michael@0: #endif michael@0: michael@0: listener_->OnMessageReceived(message); michael@0: michael@0: #ifdef IPC_MESSAGE_LOG_ENABLED michael@0: if (logger->Enabled()) michael@0: logger->OnPostDispatchMessage(message, channel_id_); michael@0: #endif michael@0: } michael@0: michael@0: // Called on the listener's thread michael@0: void ChannelProxy::Context::OnDispatchConnected() { michael@0: if (channel_connected_called_) michael@0: return; michael@0: michael@0: channel_connected_called_ = true; michael@0: if (listener_) michael@0: listener_->OnChannelConnected(peer_pid_); michael@0: } michael@0: michael@0: // Called on the listener's thread michael@0: void ChannelProxy::Context::OnDispatchError() { michael@0: if (listener_) michael@0: listener_->OnChannelError(); michael@0: } michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: ChannelProxy::ChannelProxy(const std::wstring& channel_id, Channel::Mode mode, michael@0: Channel::Listener* listener, MessageFilter* filter, michael@0: MessageLoop* ipc_thread) michael@0: : context_(new Context(listener, filter, ipc_thread)) { michael@0: Init(channel_id, mode, ipc_thread, true); michael@0: } michael@0: michael@0: ChannelProxy::ChannelProxy(const std::wstring& channel_id, Channel::Mode mode, michael@0: MessageLoop* ipc_thread, Context* context, michael@0: bool create_pipe_now) michael@0: : context_(context) { michael@0: Init(channel_id, mode, ipc_thread, create_pipe_now); michael@0: } michael@0: michael@0: void ChannelProxy::Init(const std::wstring& channel_id, Channel::Mode mode, michael@0: MessageLoop* ipc_thread_loop, bool create_pipe_now) { michael@0: if (create_pipe_now) { michael@0: // Create the channel immediately. This effectively sets up the michael@0: // low-level pipe so that the client can connect. Without creating michael@0: // the pipe immediately, it is possible for a listener to attempt michael@0: // to connect and get an error since the pipe doesn't exist yet. michael@0: context_->CreateChannel(channel_id, mode); michael@0: } else { michael@0: #if defined(OS_POSIX) michael@0: // TODO(playmobil): On POSIX, IPC::Channel uses a socketpair(), one side of michael@0: // which needs to be mapped into the child process' address space. michael@0: // To know the value of the client side FD we need to have already michael@0: // created a socketpair which currently occurs in IPC::Channel's michael@0: // constructor. michael@0: // If we lazilly construct the IPC::Channel then the caller has no way michael@0: // of knowing the FD #. michael@0: // michael@0: // We can solve this either by having the Channel's creation launch the michael@0: // subprocess itself or by creating the socketpair() externally. michael@0: NOTIMPLEMENTED(); michael@0: #endif // defined(OS_POSIX) michael@0: context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod( michael@0: context_.get(), &Context::CreateChannel, channel_id, mode)); michael@0: } michael@0: michael@0: // complete initialization on the background thread michael@0: context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod( michael@0: context_.get(), &Context::OnChannelOpened)); michael@0: } michael@0: michael@0: void ChannelProxy::Close() { michael@0: // Clear the backpointer to the listener so that any pending calls to michael@0: // Context::OnDispatchMessage or OnDispatchError will be ignored. It is michael@0: // possible that the channel could be closed while it is receiving messages! michael@0: context_->Clear(); michael@0: michael@0: if (context_->ipc_message_loop()) { michael@0: context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod( michael@0: context_.get(), &Context::OnChannelClosed)); michael@0: } michael@0: } michael@0: michael@0: bool ChannelProxy::Send(Message* message) { michael@0: #ifdef IPC_MESSAGE_LOG_ENABLED michael@0: Logging::current()->OnSendMessage(message, context_->channel_id()); michael@0: #endif michael@0: michael@0: context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod( michael@0: context_.get(), &Context::OnSendMessage, message)); michael@0: return true; michael@0: } michael@0: michael@0: void ChannelProxy::AddFilter(MessageFilter* filter) { michael@0: // We want to addref the filter to prevent it from michael@0: // being destroyed before the OnAddFilter call is invoked. michael@0: filter->AddRef(); michael@0: context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod( michael@0: context_.get(), &Context::OnAddFilter, filter)); michael@0: } michael@0: michael@0: void ChannelProxy::RemoveFilter(MessageFilter* filter) { michael@0: context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod( michael@0: context_.get(), &Context::OnRemoveFilter, filter)); michael@0: } michael@0: michael@0: #if defined(OS_POSIX) michael@0: // See the TODO regarding lazy initialization of the channel in michael@0: // ChannelProxy::Init(). michael@0: // We assume that IPC::Channel::GetClientFileDescriptorMapping() is thread-safe. michael@0: void ChannelProxy::GetClientFileDescriptorMapping(int *src_fd, michael@0: int *dest_fd) const { michael@0: Channel *channel = context_.get()->channel_; michael@0: DCHECK(channel); // Channel must have been created first. michael@0: channel->GetClientFileDescriptorMapping(src_fd, dest_fd); michael@0: } michael@0: #endif michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: } // namespace IPC