michael@0: /* michael@0: * Copyright (c) 2000-2007 Niels Provos michael@0: * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson michael@0: * michael@0: * Redistribution and use in source and binary forms, with or without michael@0: * modification, are permitted provided that the following conditions michael@0: * are met: michael@0: * 1. Redistributions of source code must retain the above copyright michael@0: * notice, this list of conditions and the following disclaimer. michael@0: * 2. Redistributions in binary form must reproduce the above copyright michael@0: * notice, this list of conditions and the following disclaimer in the michael@0: * documentation and/or other materials provided with the distribution. michael@0: * 3. The name of the author may not be used to endorse or promote products michael@0: * derived from this software without specific prior written permission. michael@0: * michael@0: * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR michael@0: * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES michael@0: * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. michael@0: * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, michael@0: * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT michael@0: * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, michael@0: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY michael@0: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT michael@0: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF michael@0: * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. michael@0: */ michael@0: #include "event2/event-config.h" michael@0: michael@0: #ifdef WIN32 michael@0: #define WIN32_LEAN_AND_MEAN michael@0: #include michael@0: #include michael@0: #undef WIN32_LEAN_AND_MEAN michael@0: #endif michael@0: michael@0: #include michael@0: #ifndef WIN32 michael@0: #include michael@0: #endif michael@0: #ifdef _EVENT_HAVE_SYS_TIME_H michael@0: #include michael@0: #endif michael@0: #include michael@0: #include michael@0: #include michael@0: #ifndef WIN32 michael@0: #include michael@0: #endif michael@0: #include michael@0: #include michael@0: #include michael@0: michael@0: #include michael@0: michael@0: #include "event2/event.h" michael@0: #include "event2/event_struct.h" michael@0: #include "event2/rpc.h" michael@0: #include "event2/rpc_struct.h" michael@0: #include "evrpc-internal.h" michael@0: #include "event2/http.h" michael@0: #include "event2/buffer.h" michael@0: #include "event2/tag.h" michael@0: #include "event2/http_struct.h" michael@0: #include "event2/http_compat.h" michael@0: #include "event2/util.h" michael@0: #include "util-internal.h" michael@0: #include "log-internal.h" michael@0: #include "mm-internal.h" michael@0: michael@0: struct evrpc_base * michael@0: evrpc_init(struct evhttp *http_server) michael@0: { michael@0: struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base)); michael@0: if (base == NULL) michael@0: return (NULL); michael@0: michael@0: /* we rely on the tagging sub system */ michael@0: evtag_init(); michael@0: michael@0: TAILQ_INIT(&base->registered_rpcs); michael@0: TAILQ_INIT(&base->input_hooks); michael@0: TAILQ_INIT(&base->output_hooks); michael@0: michael@0: TAILQ_INIT(&base->paused_requests); michael@0: michael@0: base->http_server = http_server; michael@0: michael@0: return (base); michael@0: } michael@0: michael@0: void michael@0: evrpc_free(struct evrpc_base *base) michael@0: { michael@0: struct evrpc *rpc; michael@0: struct evrpc_hook *hook; michael@0: struct evrpc_hook_ctx *pause; michael@0: int r; michael@0: michael@0: while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) { michael@0: r = evrpc_unregister_rpc(base, rpc->uri); michael@0: EVUTIL_ASSERT(r == 0); michael@0: } michael@0: while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) { michael@0: TAILQ_REMOVE(&base->paused_requests, pause, next); michael@0: mm_free(pause); michael@0: } michael@0: while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) { michael@0: r = evrpc_remove_hook(base, EVRPC_INPUT, hook); michael@0: EVUTIL_ASSERT(r); michael@0: } michael@0: while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) { michael@0: r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook); michael@0: EVUTIL_ASSERT(r); michael@0: } michael@0: mm_free(base); michael@0: } michael@0: michael@0: void * michael@0: evrpc_add_hook(void *vbase, michael@0: enum EVRPC_HOOK_TYPE hook_type, michael@0: int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *), michael@0: void *cb_arg) michael@0: { michael@0: struct _evrpc_hooks *base = vbase; michael@0: struct evrpc_hook_list *head = NULL; michael@0: struct evrpc_hook *hook = NULL; michael@0: switch (hook_type) { michael@0: case EVRPC_INPUT: michael@0: head = &base->in_hooks; michael@0: break; michael@0: case EVRPC_OUTPUT: michael@0: head = &base->out_hooks; michael@0: break; michael@0: default: michael@0: EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); michael@0: } michael@0: michael@0: hook = mm_calloc(1, sizeof(struct evrpc_hook)); michael@0: EVUTIL_ASSERT(hook != NULL); michael@0: michael@0: hook->process = cb; michael@0: hook->process_arg = cb_arg; michael@0: TAILQ_INSERT_TAIL(head, hook, next); michael@0: michael@0: return (hook); michael@0: } michael@0: michael@0: static int michael@0: evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle) michael@0: { michael@0: struct evrpc_hook *hook = NULL; michael@0: TAILQ_FOREACH(hook, head, next) { michael@0: if (hook == handle) { michael@0: TAILQ_REMOVE(head, hook, next); michael@0: mm_free(hook); michael@0: return (1); michael@0: } michael@0: } michael@0: michael@0: return (0); michael@0: } michael@0: michael@0: /* michael@0: * remove the hook specified by the handle michael@0: */ michael@0: michael@0: int michael@0: evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle) michael@0: { michael@0: struct _evrpc_hooks *base = vbase; michael@0: struct evrpc_hook_list *head = NULL; michael@0: switch (hook_type) { michael@0: case EVRPC_INPUT: michael@0: head = &base->in_hooks; michael@0: break; michael@0: case EVRPC_OUTPUT: michael@0: head = &base->out_hooks; michael@0: break; michael@0: default: michael@0: EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); michael@0: } michael@0: michael@0: return (evrpc_remove_hook_internal(head, handle)); michael@0: } michael@0: michael@0: static int michael@0: evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx, michael@0: struct evhttp_request *req, struct evbuffer *evbuf) michael@0: { michael@0: struct evrpc_hook *hook; michael@0: TAILQ_FOREACH(hook, head, next) { michael@0: int res = hook->process(ctx, req, evbuf, hook->process_arg); michael@0: if (res != EVRPC_CONTINUE) michael@0: return (res); michael@0: } michael@0: michael@0: return (EVRPC_CONTINUE); michael@0: } michael@0: michael@0: static void evrpc_pool_schedule(struct evrpc_pool *pool); michael@0: static void evrpc_request_cb(struct evhttp_request *, void *); michael@0: michael@0: /* michael@0: * Registers a new RPC with the HTTP server. The evrpc object is expected michael@0: * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn michael@0: * calls this function. michael@0: */ michael@0: michael@0: static char * michael@0: evrpc_construct_uri(const char *uri) michael@0: { michael@0: char *constructed_uri; michael@0: size_t constructed_uri_len; michael@0: michael@0: constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1; michael@0: if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL) michael@0: event_err(1, "%s: failed to register rpc at %s", michael@0: __func__, uri); michael@0: memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX)); michael@0: memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri)); michael@0: constructed_uri[constructed_uri_len - 1] = '\0'; michael@0: michael@0: return (constructed_uri); michael@0: } michael@0: michael@0: int michael@0: evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc, michael@0: void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg) michael@0: { michael@0: char *constructed_uri = evrpc_construct_uri(rpc->uri); michael@0: michael@0: rpc->base = base; michael@0: rpc->cb = cb; michael@0: rpc->cb_arg = cb_arg; michael@0: michael@0: TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next); michael@0: michael@0: evhttp_set_cb(base->http_server, michael@0: constructed_uri, michael@0: evrpc_request_cb, michael@0: rpc); michael@0: michael@0: mm_free(constructed_uri); michael@0: michael@0: return (0); michael@0: } michael@0: michael@0: int michael@0: evrpc_unregister_rpc(struct evrpc_base *base, const char *name) michael@0: { michael@0: char *registered_uri = NULL; michael@0: struct evrpc *rpc; michael@0: int r; michael@0: michael@0: /* find the right rpc; linear search might be slow */ michael@0: TAILQ_FOREACH(rpc, &base->registered_rpcs, next) { michael@0: if (strcmp(rpc->uri, name) == 0) michael@0: break; michael@0: } michael@0: if (rpc == NULL) { michael@0: /* We did not find an RPC with this name */ michael@0: return (-1); michael@0: } michael@0: TAILQ_REMOVE(&base->registered_rpcs, rpc, next); michael@0: michael@0: registered_uri = evrpc_construct_uri(name); michael@0: michael@0: /* remove the http server callback */ michael@0: r = evhttp_del_cb(base->http_server, registered_uri); michael@0: EVUTIL_ASSERT(r == 0); michael@0: michael@0: mm_free(registered_uri); michael@0: michael@0: mm_free((char *)rpc->uri); michael@0: mm_free(rpc); michael@0: return (0); michael@0: } michael@0: michael@0: static int evrpc_pause_request(void *vbase, void *ctx, michael@0: void (*cb)(void *, enum EVRPC_HOOK_RESULT)); michael@0: static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT); michael@0: michael@0: static void michael@0: evrpc_request_cb(struct evhttp_request *req, void *arg) michael@0: { michael@0: struct evrpc *rpc = arg; michael@0: struct evrpc_req_generic *rpc_state = NULL; michael@0: michael@0: /* let's verify the outside parameters */ michael@0: if (req->type != EVHTTP_REQ_POST || michael@0: evbuffer_get_length(req->input_buffer) <= 0) michael@0: goto error; michael@0: michael@0: rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic)); michael@0: if (rpc_state == NULL) michael@0: goto error; michael@0: rpc_state->rpc = rpc; michael@0: rpc_state->http_req = req; michael@0: rpc_state->rpc_data = NULL; michael@0: michael@0: if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) { michael@0: int hook_res; michael@0: michael@0: evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon); michael@0: michael@0: /* michael@0: * allow hooks to modify the outgoing request michael@0: */ michael@0: hook_res = evrpc_process_hooks(&rpc->base->input_hooks, michael@0: rpc_state, req, req->input_buffer); michael@0: switch (hook_res) { michael@0: case EVRPC_TERMINATE: michael@0: goto error; michael@0: case EVRPC_PAUSE: michael@0: evrpc_pause_request(rpc->base, rpc_state, michael@0: evrpc_request_cb_closure); michael@0: return; michael@0: case EVRPC_CONTINUE: michael@0: break; michael@0: default: michael@0: EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || michael@0: hook_res == EVRPC_CONTINUE || michael@0: hook_res == EVRPC_PAUSE); michael@0: } michael@0: } michael@0: michael@0: evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE); michael@0: return; michael@0: michael@0: error: michael@0: if (rpc_state != NULL) michael@0: evrpc_reqstate_free(rpc_state); michael@0: evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); michael@0: return; michael@0: } michael@0: michael@0: static void michael@0: evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) michael@0: { michael@0: struct evrpc_req_generic *rpc_state = arg; michael@0: struct evrpc *rpc; michael@0: struct evhttp_request *req; michael@0: michael@0: EVUTIL_ASSERT(rpc_state); michael@0: rpc = rpc_state->rpc; michael@0: req = rpc_state->http_req; michael@0: michael@0: if (hook_res == EVRPC_TERMINATE) michael@0: goto error; michael@0: michael@0: /* let's check that we can parse the request */ michael@0: rpc_state->request = rpc->request_new(rpc->request_new_arg); michael@0: if (rpc_state->request == NULL) michael@0: goto error; michael@0: michael@0: if (rpc->request_unmarshal( michael@0: rpc_state->request, req->input_buffer) == -1) { michael@0: /* we failed to parse the request; that's a bummer */ michael@0: goto error; michael@0: } michael@0: michael@0: /* at this point, we have a well formed request, prepare the reply */ michael@0: michael@0: rpc_state->reply = rpc->reply_new(rpc->reply_new_arg); michael@0: if (rpc_state->reply == NULL) michael@0: goto error; michael@0: michael@0: /* give the rpc to the user; they can deal with it */ michael@0: rpc->cb(rpc_state, rpc->cb_arg); michael@0: michael@0: return; michael@0: michael@0: error: michael@0: if (rpc_state != NULL) michael@0: evrpc_reqstate_free(rpc_state); michael@0: evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); michael@0: return; michael@0: } michael@0: michael@0: michael@0: void michael@0: evrpc_reqstate_free(struct evrpc_req_generic* rpc_state) michael@0: { michael@0: struct evrpc *rpc; michael@0: EVUTIL_ASSERT(rpc_state != NULL); michael@0: rpc = rpc_state->rpc; michael@0: michael@0: /* clean up all memory */ michael@0: if (rpc_state->hook_meta != NULL) michael@0: evrpc_hook_context_free(rpc_state->hook_meta); michael@0: if (rpc_state->request != NULL) michael@0: rpc->request_free(rpc_state->request); michael@0: if (rpc_state->reply != NULL) michael@0: rpc->reply_free(rpc_state->reply); michael@0: if (rpc_state->rpc_data != NULL) michael@0: evbuffer_free(rpc_state->rpc_data); michael@0: mm_free(rpc_state); michael@0: } michael@0: michael@0: static void michael@0: evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT); michael@0: michael@0: void michael@0: evrpc_request_done(struct evrpc_req_generic *rpc_state) michael@0: { michael@0: struct evhttp_request *req; michael@0: struct evrpc *rpc; michael@0: michael@0: EVUTIL_ASSERT(rpc_state); michael@0: michael@0: req = rpc_state->http_req; michael@0: rpc = rpc_state->rpc; michael@0: michael@0: if (rpc->reply_complete(rpc_state->reply) == -1) { michael@0: /* the reply was not completely filled in. error out */ michael@0: goto error; michael@0: } michael@0: michael@0: if ((rpc_state->rpc_data = evbuffer_new()) == NULL) { michael@0: /* out of memory */ michael@0: goto error; michael@0: } michael@0: michael@0: /* serialize the reply */ michael@0: rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply); michael@0: michael@0: if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) { michael@0: int hook_res; michael@0: michael@0: evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon); michael@0: michael@0: /* do hook based tweaks to the request */ michael@0: hook_res = evrpc_process_hooks(&rpc->base->output_hooks, michael@0: rpc_state, req, rpc_state->rpc_data); michael@0: switch (hook_res) { michael@0: case EVRPC_TERMINATE: michael@0: goto error; michael@0: case EVRPC_PAUSE: michael@0: if (evrpc_pause_request(rpc->base, rpc_state, michael@0: evrpc_request_done_closure) == -1) michael@0: goto error; michael@0: return; michael@0: case EVRPC_CONTINUE: michael@0: break; michael@0: default: michael@0: EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || michael@0: hook_res == EVRPC_CONTINUE || michael@0: hook_res == EVRPC_PAUSE); michael@0: } michael@0: } michael@0: michael@0: evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE); michael@0: return; michael@0: michael@0: error: michael@0: if (rpc_state != NULL) michael@0: evrpc_reqstate_free(rpc_state); michael@0: evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); michael@0: return; michael@0: } michael@0: michael@0: void * michael@0: evrpc_get_request(struct evrpc_req_generic *req) michael@0: { michael@0: return req->request; michael@0: } michael@0: michael@0: void * michael@0: evrpc_get_reply(struct evrpc_req_generic *req) michael@0: { michael@0: return req->reply; michael@0: } michael@0: michael@0: static void michael@0: evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) michael@0: { michael@0: struct evrpc_req_generic *rpc_state = arg; michael@0: struct evhttp_request *req; michael@0: EVUTIL_ASSERT(rpc_state); michael@0: req = rpc_state->http_req; michael@0: michael@0: if (hook_res == EVRPC_TERMINATE) michael@0: goto error; michael@0: michael@0: /* on success, we are going to transmit marshaled binary data */ michael@0: if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) { michael@0: evhttp_add_header(req->output_headers, michael@0: "Content-Type", "application/octet-stream"); michael@0: } michael@0: evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data); michael@0: michael@0: evrpc_reqstate_free(rpc_state); michael@0: michael@0: return; michael@0: michael@0: error: michael@0: if (rpc_state != NULL) michael@0: evrpc_reqstate_free(rpc_state); michael@0: evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); michael@0: return; michael@0: } michael@0: michael@0: michael@0: /* Client implementation of RPC site */ michael@0: michael@0: static int evrpc_schedule_request(struct evhttp_connection *connection, michael@0: struct evrpc_request_wrapper *ctx); michael@0: michael@0: struct evrpc_pool * michael@0: evrpc_pool_new(struct event_base *base) michael@0: { michael@0: struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool)); michael@0: if (pool == NULL) michael@0: return (NULL); michael@0: michael@0: TAILQ_INIT(&pool->connections); michael@0: TAILQ_INIT(&pool->requests); michael@0: michael@0: TAILQ_INIT(&pool->paused_requests); michael@0: michael@0: TAILQ_INIT(&pool->input_hooks); michael@0: TAILQ_INIT(&pool->output_hooks); michael@0: michael@0: pool->base = base; michael@0: pool->timeout = -1; michael@0: michael@0: return (pool); michael@0: } michael@0: michael@0: static void michael@0: evrpc_request_wrapper_free(struct evrpc_request_wrapper *request) michael@0: { michael@0: if (request->hook_meta != NULL) michael@0: evrpc_hook_context_free(request->hook_meta); michael@0: mm_free(request->name); michael@0: mm_free(request); michael@0: } michael@0: michael@0: void michael@0: evrpc_pool_free(struct evrpc_pool *pool) michael@0: { michael@0: struct evhttp_connection *connection; michael@0: struct evrpc_request_wrapper *request; michael@0: struct evrpc_hook_ctx *pause; michael@0: struct evrpc_hook *hook; michael@0: int r; michael@0: michael@0: while ((request = TAILQ_FIRST(&pool->requests)) != NULL) { michael@0: TAILQ_REMOVE(&pool->requests, request, next); michael@0: evrpc_request_wrapper_free(request); michael@0: } michael@0: michael@0: while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) { michael@0: TAILQ_REMOVE(&pool->paused_requests, pause, next); michael@0: mm_free(pause); michael@0: } michael@0: michael@0: while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) { michael@0: TAILQ_REMOVE(&pool->connections, connection, next); michael@0: evhttp_connection_free(connection); michael@0: } michael@0: michael@0: while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) { michael@0: r = evrpc_remove_hook(pool, EVRPC_INPUT, hook); michael@0: EVUTIL_ASSERT(r); michael@0: } michael@0: michael@0: while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) { michael@0: r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook); michael@0: EVUTIL_ASSERT(r); michael@0: } michael@0: michael@0: mm_free(pool); michael@0: } michael@0: michael@0: /* michael@0: * Add a connection to the RPC pool. A request scheduled on the pool michael@0: * may use any available connection. michael@0: */ michael@0: michael@0: void michael@0: evrpc_pool_add_connection(struct evrpc_pool *pool, michael@0: struct evhttp_connection *connection) michael@0: { michael@0: EVUTIL_ASSERT(connection->http_server == NULL); michael@0: TAILQ_INSERT_TAIL(&pool->connections, connection, next); michael@0: michael@0: /* michael@0: * associate an event base with this connection michael@0: */ michael@0: if (pool->base != NULL) michael@0: evhttp_connection_set_base(connection, pool->base); michael@0: michael@0: /* michael@0: * unless a timeout was specifically set for a connection, michael@0: * the connection inherits the timeout from the pool. michael@0: */ michael@0: if (connection->timeout == -1) michael@0: connection->timeout = pool->timeout; michael@0: michael@0: /* michael@0: * if we have any requests pending, schedule them with the new michael@0: * connections. michael@0: */ michael@0: michael@0: if (TAILQ_FIRST(&pool->requests) != NULL) { michael@0: struct evrpc_request_wrapper *request = michael@0: TAILQ_FIRST(&pool->requests); michael@0: TAILQ_REMOVE(&pool->requests, request, next); michael@0: evrpc_schedule_request(connection, request); michael@0: } michael@0: } michael@0: michael@0: void michael@0: evrpc_pool_remove_connection(struct evrpc_pool *pool, michael@0: struct evhttp_connection *connection) michael@0: { michael@0: TAILQ_REMOVE(&pool->connections, connection, next); michael@0: } michael@0: michael@0: void michael@0: evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs) michael@0: { michael@0: struct evhttp_connection *evcon; michael@0: TAILQ_FOREACH(evcon, &pool->connections, next) { michael@0: evcon->timeout = timeout_in_secs; michael@0: } michael@0: pool->timeout = timeout_in_secs; michael@0: } michael@0: michael@0: michael@0: static void evrpc_reply_done(struct evhttp_request *, void *); michael@0: static void evrpc_request_timeout(evutil_socket_t, short, void *); michael@0: michael@0: /* michael@0: * Finds a connection object associated with the pool that is currently michael@0: * idle and can be used to make a request. michael@0: */ michael@0: static struct evhttp_connection * michael@0: evrpc_pool_find_connection(struct evrpc_pool *pool) michael@0: { michael@0: struct evhttp_connection *connection; michael@0: TAILQ_FOREACH(connection, &pool->connections, next) { michael@0: if (TAILQ_FIRST(&connection->requests) == NULL) michael@0: return (connection); michael@0: } michael@0: michael@0: return (NULL); michael@0: } michael@0: michael@0: /* michael@0: * Prototypes responsible for evrpc scheduling and hooking michael@0: */ michael@0: michael@0: static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT); michael@0: michael@0: /* michael@0: * We assume that the ctx is no longer queued on the pool. michael@0: */ michael@0: static int michael@0: evrpc_schedule_request(struct evhttp_connection *connection, michael@0: struct evrpc_request_wrapper *ctx) michael@0: { michael@0: struct evhttp_request *req = NULL; michael@0: struct evrpc_pool *pool = ctx->pool; michael@0: struct evrpc_status status; michael@0: michael@0: if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL) michael@0: goto error; michael@0: michael@0: /* serialize the request data into the output buffer */ michael@0: ctx->request_marshal(req->output_buffer, ctx->request); michael@0: michael@0: /* we need to know the connection that we might have to abort */ michael@0: ctx->evcon = connection; michael@0: michael@0: /* if we get paused we also need to know the request */ michael@0: ctx->req = req; michael@0: michael@0: if (TAILQ_FIRST(&pool->output_hooks) != NULL) { michael@0: int hook_res; michael@0: michael@0: evrpc_hook_associate_meta(&ctx->hook_meta, connection); michael@0: michael@0: /* apply hooks to the outgoing request */ michael@0: hook_res = evrpc_process_hooks(&pool->output_hooks, michael@0: ctx, req, req->output_buffer); michael@0: michael@0: switch (hook_res) { michael@0: case EVRPC_TERMINATE: michael@0: goto error; michael@0: case EVRPC_PAUSE: michael@0: /* we need to be explicitly resumed */ michael@0: if (evrpc_pause_request(pool, ctx, michael@0: evrpc_schedule_request_closure) == -1) michael@0: goto error; michael@0: return (0); michael@0: case EVRPC_CONTINUE: michael@0: /* we can just continue */ michael@0: break; michael@0: default: michael@0: EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || michael@0: hook_res == EVRPC_CONTINUE || michael@0: hook_res == EVRPC_PAUSE); michael@0: } michael@0: } michael@0: michael@0: evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE); michael@0: return (0); michael@0: michael@0: error: michael@0: memset(&status, 0, sizeof(status)); michael@0: status.error = EVRPC_STATUS_ERR_UNSTARTED; michael@0: (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); michael@0: evrpc_request_wrapper_free(ctx); michael@0: return (-1); michael@0: } michael@0: michael@0: static void michael@0: evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) michael@0: { michael@0: struct evrpc_request_wrapper *ctx = arg; michael@0: struct evhttp_connection *connection = ctx->evcon; michael@0: struct evhttp_request *req = ctx->req; michael@0: struct evrpc_pool *pool = ctx->pool; michael@0: struct evrpc_status status; michael@0: char *uri = NULL; michael@0: int res = 0; michael@0: michael@0: if (hook_res == EVRPC_TERMINATE) michael@0: goto error; michael@0: michael@0: uri = evrpc_construct_uri(ctx->name); michael@0: if (uri == NULL) michael@0: goto error; michael@0: michael@0: if (pool->timeout > 0) { michael@0: /* michael@0: * a timeout after which the whole rpc is going to be aborted. michael@0: */ michael@0: struct timeval tv; michael@0: evutil_timerclear(&tv); michael@0: tv.tv_sec = pool->timeout; michael@0: evtimer_add(&ctx->ev_timeout, &tv); michael@0: } michael@0: michael@0: /* start the request over the connection */ michael@0: res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri); michael@0: mm_free(uri); michael@0: michael@0: if (res == -1) michael@0: goto error; michael@0: michael@0: return; michael@0: michael@0: error: michael@0: memset(&status, 0, sizeof(status)); michael@0: status.error = EVRPC_STATUS_ERR_UNSTARTED; michael@0: (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); michael@0: evrpc_request_wrapper_free(ctx); michael@0: } michael@0: michael@0: /* we just queue the paused request on the pool under the req object */ michael@0: static int michael@0: evrpc_pause_request(void *vbase, void *ctx, michael@0: void (*cb)(void *, enum EVRPC_HOOK_RESULT)) michael@0: { michael@0: struct _evrpc_hooks *base = vbase; michael@0: struct evrpc_hook_ctx *pause = mm_malloc(sizeof(*pause)); michael@0: if (pause == NULL) michael@0: return (-1); michael@0: michael@0: pause->ctx = ctx; michael@0: pause->cb = cb; michael@0: michael@0: TAILQ_INSERT_TAIL(&base->pause_requests, pause, next); michael@0: return (0); michael@0: } michael@0: michael@0: int michael@0: evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res) michael@0: { michael@0: struct _evrpc_hooks *base = vbase; michael@0: struct evrpc_pause_list *head = &base->pause_requests; michael@0: struct evrpc_hook_ctx *pause; michael@0: michael@0: TAILQ_FOREACH(pause, head, next) { michael@0: if (pause->ctx == ctx) michael@0: break; michael@0: } michael@0: michael@0: if (pause == NULL) michael@0: return (-1); michael@0: michael@0: (*pause->cb)(pause->ctx, res); michael@0: TAILQ_REMOVE(head, pause, next); michael@0: mm_free(pause); michael@0: return (0); michael@0: } michael@0: michael@0: int michael@0: evrpc_make_request(struct evrpc_request_wrapper *ctx) michael@0: { michael@0: struct evrpc_pool *pool = ctx->pool; michael@0: michael@0: /* initialize the event structure for this rpc */ michael@0: evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx); michael@0: michael@0: /* we better have some available connections on the pool */ michael@0: EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL); michael@0: michael@0: /* michael@0: * if no connection is available, we queue the request on the pool, michael@0: * the next time a connection is empty, the rpc will be send on that. michael@0: */ michael@0: TAILQ_INSERT_TAIL(&pool->requests, ctx, next); michael@0: michael@0: evrpc_pool_schedule(pool); michael@0: michael@0: return (0); michael@0: } michael@0: michael@0: michael@0: struct evrpc_request_wrapper * michael@0: evrpc_make_request_ctx( michael@0: struct evrpc_pool *pool, void *request, void *reply, michael@0: const char *rpcname, michael@0: void (*req_marshal)(struct evbuffer*, void *), michael@0: void (*rpl_clear)(void *), michael@0: int (*rpl_unmarshal)(void *, struct evbuffer *), michael@0: void (*cb)(struct evrpc_status *, void *, void *, void *), michael@0: void *cbarg) michael@0: { michael@0: struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *) michael@0: mm_malloc(sizeof(struct evrpc_request_wrapper)); michael@0: if (ctx == NULL) michael@0: return (NULL); michael@0: michael@0: ctx->pool = pool; michael@0: ctx->hook_meta = NULL; michael@0: ctx->evcon = NULL; michael@0: ctx->name = mm_strdup(rpcname); michael@0: if (ctx->name == NULL) { michael@0: mm_free(ctx); michael@0: return (NULL); michael@0: } michael@0: ctx->cb = cb; michael@0: ctx->cb_arg = cbarg; michael@0: ctx->request = request; michael@0: ctx->reply = reply; michael@0: ctx->request_marshal = req_marshal; michael@0: ctx->reply_clear = rpl_clear; michael@0: ctx->reply_unmarshal = rpl_unmarshal; michael@0: michael@0: return (ctx); michael@0: } michael@0: michael@0: static void michael@0: evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT); michael@0: michael@0: static void michael@0: evrpc_reply_done(struct evhttp_request *req, void *arg) michael@0: { michael@0: struct evrpc_request_wrapper *ctx = arg; michael@0: struct evrpc_pool *pool = ctx->pool; michael@0: int hook_res = EVRPC_CONTINUE; michael@0: michael@0: /* cancel any timeout we might have scheduled */ michael@0: event_del(&ctx->ev_timeout); michael@0: michael@0: ctx->req = req; michael@0: michael@0: /* we need to get the reply now */ michael@0: if (req == NULL) { michael@0: evrpc_reply_done_closure(ctx, EVRPC_CONTINUE); michael@0: return; michael@0: } michael@0: michael@0: if (TAILQ_FIRST(&pool->input_hooks) != NULL) { michael@0: evrpc_hook_associate_meta(&ctx->hook_meta, ctx->evcon); michael@0: michael@0: /* apply hooks to the incoming request */ michael@0: hook_res = evrpc_process_hooks(&pool->input_hooks, michael@0: ctx, req, req->input_buffer); michael@0: michael@0: switch (hook_res) { michael@0: case EVRPC_TERMINATE: michael@0: case EVRPC_CONTINUE: michael@0: break; michael@0: case EVRPC_PAUSE: michael@0: /* michael@0: * if we get paused we also need to know the michael@0: * request. unfortunately, the underlying michael@0: * layer is going to free it. we need to michael@0: * request ownership explicitly michael@0: */ michael@0: if (req != NULL) michael@0: evhttp_request_own(req); michael@0: michael@0: evrpc_pause_request(pool, ctx, michael@0: evrpc_reply_done_closure); michael@0: return; michael@0: default: michael@0: EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || michael@0: hook_res == EVRPC_CONTINUE || michael@0: hook_res == EVRPC_PAUSE); michael@0: } michael@0: } michael@0: michael@0: evrpc_reply_done_closure(ctx, hook_res); michael@0: michael@0: /* http request is being freed by underlying layer */ michael@0: } michael@0: michael@0: static void michael@0: evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) michael@0: { michael@0: struct evrpc_request_wrapper *ctx = arg; michael@0: struct evhttp_request *req = ctx->req; michael@0: struct evrpc_pool *pool = ctx->pool; michael@0: struct evrpc_status status; michael@0: int res = -1; michael@0: michael@0: memset(&status, 0, sizeof(status)); michael@0: status.http_req = req; michael@0: michael@0: /* we need to get the reply now */ michael@0: if (req == NULL) { michael@0: status.error = EVRPC_STATUS_ERR_TIMEOUT; michael@0: } else if (hook_res == EVRPC_TERMINATE) { michael@0: status.error = EVRPC_STATUS_ERR_HOOKABORTED; michael@0: } else { michael@0: res = ctx->reply_unmarshal(ctx->reply, req->input_buffer); michael@0: if (res == -1) michael@0: status.error = EVRPC_STATUS_ERR_BADPAYLOAD; michael@0: } michael@0: michael@0: if (res == -1) { michael@0: /* clear everything that we might have written previously */ michael@0: ctx->reply_clear(ctx->reply); michael@0: } michael@0: michael@0: (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); michael@0: michael@0: evrpc_request_wrapper_free(ctx); michael@0: michael@0: /* the http layer owned the original request structure, but if we michael@0: * got paused, we asked for ownership and need to free it here. */ michael@0: if (req != NULL && evhttp_request_is_owned(req)) michael@0: evhttp_request_free(req); michael@0: michael@0: /* see if we can schedule another request */ michael@0: evrpc_pool_schedule(pool); michael@0: } michael@0: michael@0: static void michael@0: evrpc_pool_schedule(struct evrpc_pool *pool) michael@0: { michael@0: struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests); michael@0: struct evhttp_connection *evcon; michael@0: michael@0: /* if no requests are pending, we have no work */ michael@0: if (ctx == NULL) michael@0: return; michael@0: michael@0: if ((evcon = evrpc_pool_find_connection(pool)) != NULL) { michael@0: TAILQ_REMOVE(&pool->requests, ctx, next); michael@0: evrpc_schedule_request(evcon, ctx); michael@0: } michael@0: } michael@0: michael@0: static void michael@0: evrpc_request_timeout(evutil_socket_t fd, short what, void *arg) michael@0: { michael@0: struct evrpc_request_wrapper *ctx = arg; michael@0: struct evhttp_connection *evcon = ctx->evcon; michael@0: EVUTIL_ASSERT(evcon != NULL); michael@0: michael@0: evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT); michael@0: } michael@0: michael@0: /* michael@0: * frees potential meta data associated with a request. michael@0: */ michael@0: michael@0: static void michael@0: evrpc_meta_data_free(struct evrpc_meta_list *meta_data) michael@0: { michael@0: struct evrpc_meta *entry; michael@0: EVUTIL_ASSERT(meta_data != NULL); michael@0: michael@0: while ((entry = TAILQ_FIRST(meta_data)) != NULL) { michael@0: TAILQ_REMOVE(meta_data, entry, next); michael@0: mm_free(entry->key); michael@0: mm_free(entry->data); michael@0: mm_free(entry); michael@0: } michael@0: } michael@0: michael@0: static struct evrpc_hook_meta * michael@0: evrpc_hook_meta_new(void) michael@0: { michael@0: struct evrpc_hook_meta *ctx; michael@0: ctx = mm_malloc(sizeof(struct evrpc_hook_meta)); michael@0: EVUTIL_ASSERT(ctx != NULL); michael@0: michael@0: TAILQ_INIT(&ctx->meta_data); michael@0: ctx->evcon = NULL; michael@0: michael@0: return (ctx); michael@0: } michael@0: michael@0: static void michael@0: evrpc_hook_associate_meta(struct evrpc_hook_meta **pctx, michael@0: struct evhttp_connection *evcon) michael@0: { michael@0: struct evrpc_hook_meta *ctx = *pctx; michael@0: if (ctx == NULL) michael@0: *pctx = ctx = evrpc_hook_meta_new(); michael@0: ctx->evcon = evcon; michael@0: } michael@0: michael@0: static void michael@0: evrpc_hook_context_free(struct evrpc_hook_meta *ctx) michael@0: { michael@0: evrpc_meta_data_free(&ctx->meta_data); michael@0: mm_free(ctx); michael@0: } michael@0: michael@0: /* Adds meta data */ michael@0: void michael@0: evrpc_hook_add_meta(void *ctx, const char *key, michael@0: const void *data, size_t data_size) michael@0: { michael@0: struct evrpc_request_wrapper *req = ctx; michael@0: struct evrpc_hook_meta *store = NULL; michael@0: struct evrpc_meta *meta = NULL; michael@0: michael@0: if ((store = req->hook_meta) == NULL) michael@0: store = req->hook_meta = evrpc_hook_meta_new(); michael@0: michael@0: meta = mm_malloc(sizeof(struct evrpc_meta)); michael@0: EVUTIL_ASSERT(meta != NULL); michael@0: meta->key = mm_strdup(key); michael@0: EVUTIL_ASSERT(meta->key != NULL); michael@0: meta->data_size = data_size; michael@0: meta->data = mm_malloc(data_size); michael@0: EVUTIL_ASSERT(meta->data != NULL); michael@0: memcpy(meta->data, data, data_size); michael@0: michael@0: TAILQ_INSERT_TAIL(&store->meta_data, meta, next); michael@0: } michael@0: michael@0: int michael@0: evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size) michael@0: { michael@0: struct evrpc_request_wrapper *req = ctx; michael@0: struct evrpc_meta *meta = NULL; michael@0: michael@0: if (req->hook_meta == NULL) michael@0: return (-1); michael@0: michael@0: TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) { michael@0: if (strcmp(meta->key, key) == 0) { michael@0: *data = meta->data; michael@0: *data_size = meta->data_size; michael@0: return (0); michael@0: } michael@0: } michael@0: michael@0: return (-1); michael@0: } michael@0: michael@0: struct evhttp_connection * michael@0: evrpc_hook_get_connection(void *ctx) michael@0: { michael@0: struct evrpc_request_wrapper *req = ctx; michael@0: return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL); michael@0: } michael@0: michael@0: int michael@0: evrpc_send_request_generic(struct evrpc_pool *pool, michael@0: void *request, void *reply, michael@0: void (*cb)(struct evrpc_status *, void *, void *, void *), michael@0: void *cb_arg, michael@0: const char *rpcname, michael@0: void (*req_marshal)(struct evbuffer *, void *), michael@0: void (*rpl_clear)(void *), michael@0: int (*rpl_unmarshal)(void *, struct evbuffer *)) michael@0: { michael@0: struct evrpc_status status; michael@0: struct evrpc_request_wrapper *ctx; michael@0: ctx = evrpc_make_request_ctx(pool, request, reply, michael@0: rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg); michael@0: if (ctx == NULL) michael@0: goto error; michael@0: return (evrpc_make_request(ctx)); michael@0: error: michael@0: memset(&status, 0, sizeof(status)); michael@0: status.error = EVRPC_STATUS_ERR_UNSTARTED; michael@0: (*(cb))(&status, request, reply, cb_arg); michael@0: return (-1); michael@0: } michael@0: michael@0: /** Takes a request object and fills it in with the right magic */ michael@0: static struct evrpc * michael@0: evrpc_register_object(const char *name, michael@0: void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *), michael@0: int (*req_unmarshal)(void *, struct evbuffer *), michael@0: void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *), michael@0: int (*rpl_complete)(void *), michael@0: void (*rpl_marshal)(struct evbuffer *, void *)) michael@0: { michael@0: struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc)); michael@0: if (rpc == NULL) michael@0: return (NULL); michael@0: rpc->uri = mm_strdup(name); michael@0: if (rpc->uri == NULL) { michael@0: mm_free(rpc); michael@0: return (NULL); michael@0: } michael@0: rpc->request_new = req_new; michael@0: rpc->request_new_arg = req_new_arg; michael@0: rpc->request_free = req_free; michael@0: rpc->request_unmarshal = req_unmarshal; michael@0: rpc->reply_new = rpl_new; michael@0: rpc->reply_new_arg = rpl_new_arg; michael@0: rpc->reply_free = rpl_free; michael@0: rpc->reply_complete = rpl_complete; michael@0: rpc->reply_marshal = rpl_marshal; michael@0: return (rpc); michael@0: } michael@0: michael@0: int michael@0: evrpc_register_generic(struct evrpc_base *base, const char *name, michael@0: void (*callback)(struct evrpc_req_generic *, void *), void *cbarg, michael@0: void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *), michael@0: int (*req_unmarshal)(void *, struct evbuffer *), michael@0: void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *), michael@0: int (*rpl_complete)(void *), michael@0: void (*rpl_marshal)(struct evbuffer *, void *)) michael@0: { michael@0: struct evrpc* rpc = michael@0: evrpc_register_object(name, req_new, req_new_arg, req_free, req_unmarshal, michael@0: rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal); michael@0: if (rpc == NULL) michael@0: return (-1); michael@0: evrpc_register_rpc(base, rpc, michael@0: (void (*)(struct evrpc_req_generic*, void *))callback, cbarg); michael@0: return (0); michael@0: } michael@0: michael@0: /** accessors for obscure and undocumented functionality */ michael@0: struct evrpc_pool * michael@0: evrpc_request_get_pool(struct evrpc_request_wrapper *ctx) michael@0: { michael@0: return (ctx->pool); michael@0: } michael@0: michael@0: void michael@0: evrpc_request_set_pool(struct evrpc_request_wrapper *ctx, michael@0: struct evrpc_pool *pool) michael@0: { michael@0: ctx->pool = pool; michael@0: } michael@0: michael@0: void michael@0: evrpc_request_set_cb(struct evrpc_request_wrapper *ctx, michael@0: void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg), michael@0: void *cb_arg) michael@0: { michael@0: ctx->cb = cb; michael@0: ctx->cb_arg = cb_arg; michael@0: }