michael@0: // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. michael@0: // Use of this source code is governed by a BSD-style license that can be michael@0: // found in the LICENSE file. michael@0: michael@0: #include "chrome/common/ipc_channel_win.h" michael@0: michael@0: #include michael@0: #include michael@0: michael@0: #include "base/compiler_specific.h" michael@0: #include "base/logging.h" michael@0: #include "base/non_thread_safe.h" michael@0: #include "base/stats_counters.h" michael@0: #include "base/win_util.h" michael@0: #include "chrome/common/ipc_logging.h" michael@0: #include "chrome/common/ipc_message_utils.h" michael@0: #include "mozilla/ipc/ProtocolUtils.h" michael@0: michael@0: namespace IPC { michael@0: //------------------------------------------------------------------------------ michael@0: michael@0: Channel::ChannelImpl::State::State(ChannelImpl* channel) : is_pending(false) { michael@0: memset(&context.overlapped, 0, sizeof(context.overlapped)); michael@0: context.handler = channel; michael@0: } michael@0: michael@0: Channel::ChannelImpl::State::~State() { michael@0: COMPILE_ASSERT(!offsetof(Channel::ChannelImpl::State, context), michael@0: starts_with_io_context); michael@0: } michael@0: michael@0: //------------------------------------------------------------------------------ michael@0: michael@0: Channel::ChannelImpl::ChannelImpl(const std::wstring& channel_id, Mode mode, michael@0: Listener* listener) michael@0: : ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)), michael@0: ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)), michael@0: ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) { michael@0: Init(mode, listener); michael@0: michael@0: if (!CreatePipe(channel_id, mode)) { michael@0: // The pipe may have been closed already. michael@0: CHROMIUM_LOG(WARNING) << "Unable to create pipe named \"" << channel_id << michael@0: "\" in " << (mode == 0 ? "server" : "client") << " mode."; michael@0: } michael@0: } michael@0: michael@0: Channel::ChannelImpl::ChannelImpl(const std::wstring& channel_id, michael@0: HANDLE server_pipe, michael@0: Mode mode, Listener* listener) michael@0: : ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)), michael@0: ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)), michael@0: ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) { michael@0: Init(mode, listener); michael@0: michael@0: if (mode == MODE_SERVER) { michael@0: // Use the existing handle that was dup'd to us michael@0: pipe_ = server_pipe; michael@0: EnqueueHelloMessage(); michael@0: } else { michael@0: // Take the normal init path to connect to the server pipe michael@0: CreatePipe(channel_id, mode); michael@0: } michael@0: } michael@0: michael@0: void Channel::ChannelImpl::Init(Mode mode, Listener* listener) { michael@0: pipe_ = INVALID_HANDLE_VALUE; michael@0: listener_ = listener; michael@0: waiting_connect_ = (mode == MODE_SERVER); michael@0: processing_incoming_ = false; michael@0: closed_ = false; michael@0: output_queue_length_ = 0; michael@0: } michael@0: michael@0: void Channel::ChannelImpl::OutputQueuePush(Message* msg) michael@0: { michael@0: output_queue_.push(msg); michael@0: output_queue_length_++; michael@0: } michael@0: michael@0: void Channel::ChannelImpl::OutputQueuePop() michael@0: { michael@0: output_queue_.pop(); michael@0: output_queue_length_--; michael@0: } michael@0: michael@0: HANDLE Channel::ChannelImpl::GetServerPipeHandle() const { michael@0: return pipe_; michael@0: } michael@0: michael@0: void Channel::ChannelImpl::Close() { michael@0: if (thread_check_.get()) { michael@0: DCHECK(thread_check_->CalledOnValidThread()); michael@0: } michael@0: michael@0: bool waited = false; michael@0: if (input_state_.is_pending || output_state_.is_pending) { michael@0: CancelIo(pipe_); michael@0: waited = true; michael@0: } michael@0: michael@0: // Closing the handle at this point prevents us from issuing more requests michael@0: // form OnIOCompleted(). michael@0: if (pipe_ != INVALID_HANDLE_VALUE) { michael@0: CloseHandle(pipe_); michael@0: pipe_ = INVALID_HANDLE_VALUE; michael@0: } michael@0: michael@0: while (input_state_.is_pending || output_state_.is_pending) { michael@0: MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this); michael@0: } michael@0: michael@0: while (!output_queue_.empty()) { michael@0: Message* m = output_queue_.front(); michael@0: OutputQueuePop(); michael@0: delete m; michael@0: } michael@0: michael@0: if (thread_check_.get()) michael@0: thread_check_.reset(); michael@0: michael@0: closed_ = true; michael@0: } michael@0: michael@0: bool Channel::ChannelImpl::Send(Message* message) { michael@0: if (thread_check_.get()) { michael@0: DCHECK(thread_check_->CalledOnValidThread()); michael@0: } michael@0: #ifdef IPC_MESSAGE_DEBUG_EXTRA michael@0: DLOG(INFO) << "sending message @" << message << " on channel @" << this michael@0: << " with type " << message->type() michael@0: << " (" << output_queue_.size() << " in queue)"; michael@0: #endif michael@0: michael@0: #ifdef IPC_MESSAGE_LOG_ENABLED michael@0: Logging::current()->OnSendMessage(message, L""); michael@0: #endif michael@0: michael@0: if (closed_) { michael@0: if (mozilla::ipc::LoggingEnabled()) { michael@0: fprintf(stderr, "Can't send message %s, because this channel is closed.\n", michael@0: message->name()); michael@0: } michael@0: delete message; michael@0: return false; michael@0: } michael@0: michael@0: OutputQueuePush(message); michael@0: // ensure waiting to write michael@0: if (!waiting_connect_) { michael@0: if (!output_state_.is_pending) { michael@0: if (!ProcessOutgoingMessages(NULL, 0)) michael@0: return false; michael@0: } michael@0: } michael@0: michael@0: return true; michael@0: } michael@0: michael@0: const std::wstring Channel::ChannelImpl::PipeName( michael@0: const std::wstring& channel_id) const { michael@0: std::wostringstream ss; michael@0: // XXX(darin): get application name from somewhere else michael@0: ss << L"\\\\.\\pipe\\chrome." << channel_id; michael@0: return ss.str(); michael@0: } michael@0: michael@0: bool Channel::ChannelImpl::CreatePipe(const std::wstring& channel_id, michael@0: Mode mode) { michael@0: DCHECK(pipe_ == INVALID_HANDLE_VALUE); michael@0: const std::wstring pipe_name = PipeName(channel_id); michael@0: if (mode == MODE_SERVER) { michael@0: pipe_ = CreateNamedPipeW(pipe_name.c_str(), michael@0: PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | michael@0: FILE_FLAG_FIRST_PIPE_INSTANCE, michael@0: PIPE_TYPE_BYTE | PIPE_READMODE_BYTE, michael@0: 1, // number of pipe instances michael@0: // output buffer size (XXX tune) michael@0: Channel::kReadBufferSize, michael@0: // input buffer size (XXX tune) michael@0: Channel::kReadBufferSize, michael@0: 5000, // timeout in milliseconds (XXX tune) michael@0: NULL); michael@0: } else { michael@0: pipe_ = CreateFileW(pipe_name.c_str(), michael@0: GENERIC_READ | GENERIC_WRITE, michael@0: 0, michael@0: NULL, michael@0: OPEN_EXISTING, michael@0: SECURITY_SQOS_PRESENT | SECURITY_IDENTIFICATION | michael@0: FILE_FLAG_OVERLAPPED, michael@0: NULL); michael@0: } michael@0: if (pipe_ == INVALID_HANDLE_VALUE) { michael@0: // If this process is being closed, the pipe may be gone already. michael@0: CHROMIUM_LOG(WARNING) << "failed to create pipe: " << GetLastError(); michael@0: return false; michael@0: } michael@0: michael@0: // Create the Hello message to be sent when Connect is called michael@0: return EnqueueHelloMessage(); michael@0: } michael@0: michael@0: bool Channel::ChannelImpl::EnqueueHelloMessage() { michael@0: scoped_ptr m(new Message(MSG_ROUTING_NONE, michael@0: HELLO_MESSAGE_TYPE, michael@0: IPC::Message::PRIORITY_NORMAL)); michael@0: if (!m->WriteInt(GetCurrentProcessId())) { michael@0: CloseHandle(pipe_); michael@0: pipe_ = INVALID_HANDLE_VALUE; michael@0: return false; michael@0: } michael@0: michael@0: OutputQueuePush(m.release()); michael@0: return true; michael@0: } michael@0: michael@0: bool Channel::ChannelImpl::Connect() { michael@0: if (!thread_check_.get()) michael@0: thread_check_.reset(new NonThreadSafe()); michael@0: michael@0: if (pipe_ == INVALID_HANDLE_VALUE) michael@0: return false; michael@0: michael@0: MessageLoopForIO::current()->RegisterIOHandler(pipe_, this); michael@0: michael@0: // Check to see if there is a client connected to our pipe... michael@0: if (waiting_connect_) michael@0: ProcessConnection(); michael@0: michael@0: if (!input_state_.is_pending) { michael@0: // Complete setup asynchronously. By not setting input_state_.is_pending michael@0: // to true, we indicate to OnIOCompleted that this is the special michael@0: // initialization signal. michael@0: MessageLoopForIO::current()->PostTask(FROM_HERE, factory_.NewRunnableMethod( michael@0: &Channel::ChannelImpl::OnIOCompleted, &input_state_.context, 0, 0)); michael@0: } michael@0: michael@0: if (!waiting_connect_) michael@0: ProcessOutgoingMessages(NULL, 0); michael@0: return true; michael@0: } michael@0: michael@0: bool Channel::ChannelImpl::ProcessConnection() { michael@0: DCHECK(thread_check_->CalledOnValidThread()); michael@0: if (input_state_.is_pending) michael@0: input_state_.is_pending = false; michael@0: michael@0: // Do we have a client connected to our pipe? michael@0: if (INVALID_HANDLE_VALUE == pipe_) michael@0: return false; michael@0: michael@0: BOOL ok = ConnectNamedPipe(pipe_, &input_state_.context.overlapped); michael@0: michael@0: DWORD err = GetLastError(); michael@0: if (ok) { michael@0: // Uhm, the API documentation says that this function should never michael@0: // return success when used in overlapped mode. michael@0: NOTREACHED(); michael@0: return false; michael@0: } michael@0: michael@0: switch (err) { michael@0: case ERROR_IO_PENDING: michael@0: input_state_.is_pending = true; michael@0: break; michael@0: case ERROR_PIPE_CONNECTED: michael@0: waiting_connect_ = false; michael@0: break; michael@0: default: michael@0: NOTREACHED(); michael@0: return false; michael@0: } michael@0: michael@0: return true; michael@0: } michael@0: michael@0: bool Channel::ChannelImpl::ProcessIncomingMessages( michael@0: MessageLoopForIO::IOContext* context, michael@0: DWORD bytes_read) { michael@0: DCHECK(thread_check_->CalledOnValidThread()); michael@0: if (input_state_.is_pending) { michael@0: input_state_.is_pending = false; michael@0: DCHECK(context); michael@0: michael@0: if (!context || !bytes_read) michael@0: return false; michael@0: } else { michael@0: // This happens at channel initialization. michael@0: DCHECK(!bytes_read && context == &input_state_.context); michael@0: } michael@0: michael@0: for (;;) { michael@0: if (bytes_read == 0) { michael@0: if (INVALID_HANDLE_VALUE == pipe_) michael@0: return false; michael@0: michael@0: // Read from pipe... michael@0: BOOL ok = ReadFile(pipe_, michael@0: input_buf_, michael@0: Channel::kReadBufferSize, michael@0: &bytes_read, michael@0: &input_state_.context.overlapped); michael@0: if (!ok) { michael@0: DWORD err = GetLastError(); michael@0: if (err == ERROR_IO_PENDING) { michael@0: input_state_.is_pending = true; michael@0: return true; michael@0: } michael@0: CHROMIUM_LOG(ERROR) << "pipe error: " << err; michael@0: return false; michael@0: } michael@0: input_state_.is_pending = true; michael@0: return true; michael@0: } michael@0: DCHECK(bytes_read); michael@0: michael@0: // Process messages from input buffer. michael@0: michael@0: const char* p, *end; michael@0: if (input_overflow_buf_.empty()) { michael@0: p = input_buf_; michael@0: end = p + bytes_read; michael@0: } else { michael@0: if (input_overflow_buf_.size() > (kMaximumMessageSize - bytes_read)) { michael@0: input_overflow_buf_.clear(); michael@0: CHROMIUM_LOG(ERROR) << "IPC message is too big"; michael@0: return false; michael@0: } michael@0: input_overflow_buf_.append(input_buf_, bytes_read); michael@0: p = input_overflow_buf_.data(); michael@0: end = p + input_overflow_buf_.size(); michael@0: } michael@0: michael@0: while (p < end) { michael@0: const char* message_tail = Message::FindNext(p, end); michael@0: if (message_tail) { michael@0: int len = static_cast(message_tail - p); michael@0: const Message m(p, len); michael@0: #ifdef IPC_MESSAGE_DEBUG_EXTRA michael@0: DLOG(INFO) << "received message on channel @" << this << michael@0: " with type " << m.type(); michael@0: #endif michael@0: if (m.routing_id() == MSG_ROUTING_NONE && michael@0: m.type() == HELLO_MESSAGE_TYPE) { michael@0: // The Hello message contains only the process id. michael@0: listener_->OnChannelConnected(MessageIterator(m).NextInt()); michael@0: } else { michael@0: listener_->OnMessageReceived(m); michael@0: } michael@0: p = message_tail; michael@0: } else { michael@0: // Last message is partial. michael@0: break; michael@0: } michael@0: } michael@0: input_overflow_buf_.assign(p, end - p); michael@0: michael@0: bytes_read = 0; // Get more data. michael@0: } michael@0: michael@0: return true; michael@0: } michael@0: michael@0: bool Channel::ChannelImpl::ProcessOutgoingMessages( michael@0: MessageLoopForIO::IOContext* context, michael@0: DWORD bytes_written) { michael@0: DCHECK(!waiting_connect_); // Why are we trying to send messages if there's michael@0: // no connection? michael@0: DCHECK(thread_check_->CalledOnValidThread()); michael@0: michael@0: if (output_state_.is_pending) { michael@0: DCHECK(context); michael@0: output_state_.is_pending = false; michael@0: if (!context || bytes_written == 0) { michael@0: DWORD err = GetLastError(); michael@0: CHROMIUM_LOG(ERROR) << "pipe error: " << err; michael@0: return false; michael@0: } michael@0: // Message was sent. michael@0: DCHECK(!output_queue_.empty()); michael@0: Message* m = output_queue_.front(); michael@0: OutputQueuePop(); michael@0: delete m; michael@0: } michael@0: michael@0: if (output_queue_.empty()) michael@0: return true; michael@0: michael@0: if (INVALID_HANDLE_VALUE == pipe_) michael@0: return false; michael@0: michael@0: // Write to pipe... michael@0: Message* m = output_queue_.front(); michael@0: BOOL ok = WriteFile(pipe_, michael@0: m->data(), michael@0: m->size(), michael@0: &bytes_written, michael@0: &output_state_.context.overlapped); michael@0: if (!ok) { michael@0: DWORD err = GetLastError(); michael@0: if (err == ERROR_IO_PENDING) { michael@0: output_state_.is_pending = true; michael@0: michael@0: #ifdef IPC_MESSAGE_DEBUG_EXTRA michael@0: DLOG(INFO) << "sent pending message @" << m << " on channel @" << michael@0: this << " with type " << m->type(); michael@0: #endif michael@0: michael@0: return true; michael@0: } michael@0: CHROMIUM_LOG(ERROR) << "pipe error: " << err; michael@0: return false; michael@0: } michael@0: michael@0: #ifdef IPC_MESSAGE_DEBUG_EXTRA michael@0: DLOG(INFO) << "sent message @" << m << " on channel @" << this << michael@0: " with type " << m->type(); michael@0: #endif michael@0: michael@0: output_state_.is_pending = true; michael@0: return true; michael@0: } michael@0: michael@0: void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context, michael@0: DWORD bytes_transfered, DWORD error) { michael@0: bool ok; michael@0: DCHECK(thread_check_->CalledOnValidThread()); michael@0: if (context == &input_state_.context) { michael@0: if (waiting_connect_) { michael@0: if (!ProcessConnection()) michael@0: return; michael@0: // We may have some messages queued up to send... michael@0: if (!output_queue_.empty() && !output_state_.is_pending) michael@0: ProcessOutgoingMessages(NULL, 0); michael@0: if (input_state_.is_pending) michael@0: return; michael@0: // else, fall-through and look for incoming messages... michael@0: } michael@0: // we don't support recursion through OnMessageReceived yet! michael@0: DCHECK(!processing_incoming_); michael@0: processing_incoming_ = true; michael@0: ok = ProcessIncomingMessages(context, bytes_transfered); michael@0: processing_incoming_ = false; michael@0: } else { michael@0: DCHECK(context == &output_state_.context); michael@0: ok = ProcessOutgoingMessages(context, bytes_transfered); michael@0: } michael@0: if (!ok && INVALID_HANDLE_VALUE != pipe_) { michael@0: // We don't want to re-enter Close(). michael@0: Close(); michael@0: listener_->OnChannelError(); michael@0: } michael@0: } michael@0: michael@0: bool Channel::ChannelImpl::Unsound_IsClosed() const michael@0: { michael@0: return closed_; michael@0: } michael@0: michael@0: uint32_t Channel::ChannelImpl::Unsound_NumQueuedMessages() const michael@0: { michael@0: return output_queue_length_; michael@0: } michael@0: michael@0: //------------------------------------------------------------------------------ michael@0: // Channel's methods simply call through to ChannelImpl. michael@0: Channel::Channel(const std::wstring& channel_id, Mode mode, michael@0: Listener* listener) michael@0: : channel_impl_(new ChannelImpl(channel_id, mode, listener)) { michael@0: } michael@0: michael@0: Channel::Channel(const std::wstring& channel_id, void* server_pipe, michael@0: Mode mode, Listener* listener) michael@0: : channel_impl_(new ChannelImpl(channel_id, server_pipe, mode, listener)) { michael@0: } michael@0: michael@0: Channel::~Channel() { michael@0: delete channel_impl_; michael@0: } michael@0: michael@0: bool Channel::Connect() { michael@0: return channel_impl_->Connect(); michael@0: } michael@0: michael@0: void Channel::Close() { michael@0: channel_impl_->Close(); michael@0: } michael@0: michael@0: void* Channel::GetServerPipeHandle() const { michael@0: return channel_impl_->GetServerPipeHandle(); michael@0: } michael@0: michael@0: Channel::Listener* Channel::set_listener(Listener* listener) { michael@0: return channel_impl_->set_listener(listener); michael@0: } michael@0: michael@0: bool Channel::Send(Message* message) { michael@0: return channel_impl_->Send(message); michael@0: } michael@0: michael@0: bool Channel::Unsound_IsClosed() const { michael@0: return channel_impl_->Unsound_IsClosed(); michael@0: } michael@0: michael@0: uint32_t Channel::Unsound_NumQueuedMessages() const { michael@0: return channel_impl_->Unsound_NumQueuedMessages(); michael@0: } michael@0: michael@0: } // namespace IPC