michael@0: /* michael@0: * Copyright (c) 2009-2012 Niels Provos, Nick Mathewson michael@0: * michael@0: * Redistribution and use in source and binary forms, with or without michael@0: * modification, are permitted provided that the following conditions michael@0: * are met: michael@0: * 1. Redistributions of source code must retain the above copyright michael@0: * notice, this list of conditions and the following disclaimer. michael@0: * 2. Redistributions in binary form must reproduce the above copyright michael@0: * notice, this list of conditions and the following disclaimer in the michael@0: * documentation and/or other materials provided with the distribution. michael@0: * 3. The name of the author may not be used to endorse or promote products michael@0: * derived from this software without specific prior written permission. michael@0: * michael@0: * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR michael@0: * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES michael@0: * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. michael@0: * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, michael@0: * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT michael@0: * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, michael@0: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY michael@0: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT michael@0: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF michael@0: * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. michael@0: */ michael@0: michael@0: #include michael@0: michael@0: #ifdef WIN32 michael@0: #include michael@0: #endif michael@0: michael@0: #include "event2/event-config.h" michael@0: michael@0: #include "event2/util.h" michael@0: #include "event2/buffer.h" michael@0: #include "event2/bufferevent.h" michael@0: #include "event2/bufferevent_struct.h" michael@0: #include "event2/event.h" michael@0: #include "defer-internal.h" michael@0: #include "bufferevent-internal.h" michael@0: #include "mm-internal.h" michael@0: #include "util-internal.h" michael@0: michael@0: struct bufferevent_pair { michael@0: struct bufferevent_private bev; michael@0: struct bufferevent_pair *partner; michael@0: }; michael@0: michael@0: michael@0: /* Given a bufferevent that's really a bev part of a bufferevent_pair, michael@0: * return that bufferevent_filtered. Returns NULL otherwise.*/ michael@0: static inline struct bufferevent_pair * michael@0: upcast(struct bufferevent *bev) michael@0: { michael@0: struct bufferevent_pair *bev_p; michael@0: if (bev->be_ops != &bufferevent_ops_pair) michael@0: return NULL; michael@0: bev_p = EVUTIL_UPCAST(bev, struct bufferevent_pair, bev.bev); michael@0: EVUTIL_ASSERT(bev_p->bev.bev.be_ops == &bufferevent_ops_pair); michael@0: return bev_p; michael@0: } michael@0: michael@0: #define downcast(bev_pair) (&(bev_pair)->bev.bev) michael@0: michael@0: static inline void michael@0: incref_and_lock(struct bufferevent *b) michael@0: { michael@0: struct bufferevent_pair *bevp; michael@0: _bufferevent_incref_and_lock(b); michael@0: bevp = upcast(b); michael@0: if (bevp->partner) michael@0: _bufferevent_incref_and_lock(downcast(bevp->partner)); michael@0: } michael@0: michael@0: static inline void michael@0: decref_and_unlock(struct bufferevent *b) michael@0: { michael@0: struct bufferevent_pair *bevp = upcast(b); michael@0: if (bevp->partner) michael@0: _bufferevent_decref_and_unlock(downcast(bevp->partner)); michael@0: _bufferevent_decref_and_unlock(b); michael@0: } michael@0: michael@0: /* XXX Handle close */ michael@0: michael@0: static void be_pair_outbuf_cb(struct evbuffer *, michael@0: const struct evbuffer_cb_info *, void *); michael@0: michael@0: static struct bufferevent_pair * michael@0: bufferevent_pair_elt_new(struct event_base *base, michael@0: int options) michael@0: { michael@0: struct bufferevent_pair *bufev; michael@0: if (! (bufev = mm_calloc(1, sizeof(struct bufferevent_pair)))) michael@0: return NULL; michael@0: if (bufferevent_init_common(&bufev->bev, base, &bufferevent_ops_pair, michael@0: options)) { michael@0: mm_free(bufev); michael@0: return NULL; michael@0: } michael@0: if (!evbuffer_add_cb(bufev->bev.bev.output, be_pair_outbuf_cb, bufev)) { michael@0: bufferevent_free(downcast(bufev)); michael@0: return NULL; michael@0: } michael@0: michael@0: _bufferevent_init_generic_timeout_cbs(&bufev->bev.bev); michael@0: michael@0: return bufev; michael@0: } michael@0: michael@0: int michael@0: bufferevent_pair_new(struct event_base *base, int options, michael@0: struct bufferevent *pair[2]) michael@0: { michael@0: struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL; michael@0: int tmp_options; michael@0: michael@0: options |= BEV_OPT_DEFER_CALLBACKS; michael@0: tmp_options = options & ~BEV_OPT_THREADSAFE; michael@0: michael@0: bufev1 = bufferevent_pair_elt_new(base, options); michael@0: if (!bufev1) michael@0: return -1; michael@0: bufev2 = bufferevent_pair_elt_new(base, tmp_options); michael@0: if (!bufev2) { michael@0: bufferevent_free(downcast(bufev1)); michael@0: return -1; michael@0: } michael@0: michael@0: if (options & BEV_OPT_THREADSAFE) { michael@0: /*XXXX check return */ michael@0: bufferevent_enable_locking(downcast(bufev2), bufev1->bev.lock); michael@0: } michael@0: michael@0: bufev1->partner = bufev2; michael@0: bufev2->partner = bufev1; michael@0: michael@0: evbuffer_freeze(downcast(bufev1)->input, 0); michael@0: evbuffer_freeze(downcast(bufev1)->output, 1); michael@0: evbuffer_freeze(downcast(bufev2)->input, 0); michael@0: evbuffer_freeze(downcast(bufev2)->output, 1); michael@0: michael@0: pair[0] = downcast(bufev1); michael@0: pair[1] = downcast(bufev2); michael@0: michael@0: return 0; michael@0: } michael@0: michael@0: static void michael@0: be_pair_transfer(struct bufferevent *src, struct bufferevent *dst, michael@0: int ignore_wm) michael@0: { michael@0: size_t src_size, dst_size; michael@0: size_t n; michael@0: michael@0: evbuffer_unfreeze(src->output, 1); michael@0: evbuffer_unfreeze(dst->input, 0); michael@0: michael@0: if (dst->wm_read.high) { michael@0: dst_size = evbuffer_get_length(dst->input); michael@0: if (dst_size < dst->wm_read.high) { michael@0: n = dst->wm_read.high - dst_size; michael@0: evbuffer_remove_buffer(src->output, dst->input, n); michael@0: } else { michael@0: if (!ignore_wm) michael@0: goto done; michael@0: n = evbuffer_get_length(src->output); michael@0: evbuffer_add_buffer(dst->input, src->output); michael@0: } michael@0: } else { michael@0: n = evbuffer_get_length(src->output); michael@0: evbuffer_add_buffer(dst->input, src->output); michael@0: } michael@0: michael@0: if (n) { michael@0: BEV_RESET_GENERIC_READ_TIMEOUT(dst); michael@0: michael@0: if (evbuffer_get_length(dst->output)) michael@0: BEV_RESET_GENERIC_WRITE_TIMEOUT(dst); michael@0: else michael@0: BEV_DEL_GENERIC_WRITE_TIMEOUT(dst); michael@0: } michael@0: michael@0: src_size = evbuffer_get_length(src->output); michael@0: dst_size = evbuffer_get_length(dst->input); michael@0: michael@0: if (dst_size >= dst->wm_read.low) { michael@0: _bufferevent_run_readcb(dst); michael@0: } michael@0: if (src_size <= src->wm_write.low) { michael@0: _bufferevent_run_writecb(src); michael@0: } michael@0: done: michael@0: evbuffer_freeze(src->output, 1); michael@0: evbuffer_freeze(dst->input, 0); michael@0: } michael@0: michael@0: static inline int michael@0: be_pair_wants_to_talk(struct bufferevent_pair *src, michael@0: struct bufferevent_pair *dst) michael@0: { michael@0: return (downcast(src)->enabled & EV_WRITE) && michael@0: (downcast(dst)->enabled & EV_READ) && michael@0: !dst->bev.read_suspended && michael@0: evbuffer_get_length(downcast(src)->output); michael@0: } michael@0: michael@0: static void michael@0: be_pair_outbuf_cb(struct evbuffer *outbuf, michael@0: const struct evbuffer_cb_info *info, void *arg) michael@0: { michael@0: struct bufferevent_pair *bev_pair = arg; michael@0: struct bufferevent_pair *partner = bev_pair->partner; michael@0: michael@0: incref_and_lock(downcast(bev_pair)); michael@0: michael@0: if (info->n_added > info->n_deleted && partner) { michael@0: /* We got more data. If the other side's reading, then michael@0: hand it over. */ michael@0: if (be_pair_wants_to_talk(bev_pair, partner)) { michael@0: be_pair_transfer(downcast(bev_pair), downcast(partner), 0); michael@0: } michael@0: } michael@0: michael@0: decref_and_unlock(downcast(bev_pair)); michael@0: } michael@0: michael@0: static int michael@0: be_pair_enable(struct bufferevent *bufev, short events) michael@0: { michael@0: struct bufferevent_pair *bev_p = upcast(bufev); michael@0: struct bufferevent_pair *partner = bev_p->partner; michael@0: michael@0: incref_and_lock(bufev); michael@0: michael@0: if (events & EV_READ) { michael@0: BEV_RESET_GENERIC_READ_TIMEOUT(bufev); michael@0: } michael@0: if ((events & EV_WRITE) && evbuffer_get_length(bufev->output)) michael@0: BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev); michael@0: michael@0: /* We're starting to read! Does the other side have anything to write?*/ michael@0: if ((events & EV_READ) && partner && michael@0: be_pair_wants_to_talk(partner, bev_p)) { michael@0: be_pair_transfer(downcast(partner), bufev, 0); michael@0: } michael@0: /* We're starting to write! Does the other side want to read? */ michael@0: if ((events & EV_WRITE) && partner && michael@0: be_pair_wants_to_talk(bev_p, partner)) { michael@0: be_pair_transfer(bufev, downcast(partner), 0); michael@0: } michael@0: decref_and_unlock(bufev); michael@0: return 0; michael@0: } michael@0: michael@0: static int michael@0: be_pair_disable(struct bufferevent *bev, short events) michael@0: { michael@0: if (events & EV_READ) { michael@0: BEV_DEL_GENERIC_READ_TIMEOUT(bev); michael@0: } michael@0: if (events & EV_WRITE) michael@0: BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); michael@0: return 0; michael@0: } michael@0: michael@0: static void michael@0: be_pair_destruct(struct bufferevent *bev) michael@0: { michael@0: struct bufferevent_pair *bev_p = upcast(bev); michael@0: michael@0: if (bev_p->partner) { michael@0: bev_p->partner->partner = NULL; michael@0: bev_p->partner = NULL; michael@0: } michael@0: michael@0: _bufferevent_del_generic_timeout_cbs(bev); michael@0: } michael@0: michael@0: static int michael@0: be_pair_flush(struct bufferevent *bev, short iotype, michael@0: enum bufferevent_flush_mode mode) michael@0: { michael@0: struct bufferevent_pair *bev_p = upcast(bev); michael@0: struct bufferevent *partner; michael@0: incref_and_lock(bev); michael@0: if (!bev_p->partner) michael@0: return -1; michael@0: michael@0: partner = downcast(bev_p->partner); michael@0: michael@0: if (mode == BEV_NORMAL) michael@0: return 0; michael@0: michael@0: if ((iotype & EV_READ) != 0) michael@0: be_pair_transfer(partner, bev, 1); michael@0: michael@0: if ((iotype & EV_WRITE) != 0) michael@0: be_pair_transfer(bev, partner, 1); michael@0: michael@0: if (mode == BEV_FINISHED) { michael@0: _bufferevent_run_eventcb(partner, iotype|BEV_EVENT_EOF); michael@0: } michael@0: decref_and_unlock(bev); michael@0: return 0; michael@0: } michael@0: michael@0: struct bufferevent * michael@0: bufferevent_pair_get_partner(struct bufferevent *bev) michael@0: { michael@0: struct bufferevent_pair *bev_p; michael@0: struct bufferevent *partner; michael@0: bev_p = upcast(bev); michael@0: if (! bev_p) michael@0: return NULL; michael@0: michael@0: incref_and_lock(bev); michael@0: partner = downcast(bev_p->partner); michael@0: decref_and_unlock(bev); michael@0: return partner; michael@0: } michael@0: michael@0: const struct bufferevent_ops bufferevent_ops_pair = { michael@0: "pair_elt", michael@0: evutil_offsetof(struct bufferevent_pair, bev.bev), michael@0: be_pair_enable, michael@0: be_pair_disable, michael@0: be_pair_destruct, michael@0: _bufferevent_generic_adj_timeouts, michael@0: be_pair_flush, michael@0: NULL, /* ctrl */ michael@0: };