1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/ipc/chromium/src/third_party/libevent/evrpc.c Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,1174 @@ 1.4 +/* 1.5 + * Copyright (c) 2000-2007 Niels Provos <provos@citi.umich.edu> 1.6 + * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 1.7 + * 1.8 + * Redistribution and use in source and binary forms, with or without 1.9 + * modification, are permitted provided that the following conditions 1.10 + * are met: 1.11 + * 1. Redistributions of source code must retain the above copyright 1.12 + * notice, this list of conditions and the following disclaimer. 1.13 + * 2. Redistributions in binary form must reproduce the above copyright 1.14 + * notice, this list of conditions and the following disclaimer in the 1.15 + * documentation and/or other materials provided with the distribution. 1.16 + * 3. The name of the author may not be used to endorse or promote products 1.17 + * derived from this software without specific prior written permission. 1.18 + * 1.19 + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 1.20 + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 1.21 + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 1.22 + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 1.23 + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 1.24 + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 1.25 + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 1.26 + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 1.27 + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 1.28 + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 1.29 + */ 1.30 +#include "event2/event-config.h" 1.31 + 1.32 +#ifdef WIN32 1.33 +#define WIN32_LEAN_AND_MEAN 1.34 +#include <winsock2.h> 1.35 +#include <windows.h> 1.36 +#undef WIN32_LEAN_AND_MEAN 1.37 +#endif 1.38 + 1.39 +#include <sys/types.h> 1.40 +#ifndef WIN32 1.41 +#include <sys/socket.h> 1.42 +#endif 1.43 +#ifdef _EVENT_HAVE_SYS_TIME_H 1.44 +#include <sys/time.h> 1.45 +#endif 1.46 +#include <sys/queue.h> 1.47 +#include <stdio.h> 1.48 +#include <stdlib.h> 1.49 +#ifndef WIN32 1.50 +#include <unistd.h> 1.51 +#endif 1.52 +#include <errno.h> 1.53 +#include <signal.h> 1.54 +#include <string.h> 1.55 + 1.56 +#include <sys/queue.h> 1.57 + 1.58 +#include "event2/event.h" 1.59 +#include "event2/event_struct.h" 1.60 +#include "event2/rpc.h" 1.61 +#include "event2/rpc_struct.h" 1.62 +#include "evrpc-internal.h" 1.63 +#include "event2/http.h" 1.64 +#include "event2/buffer.h" 1.65 +#include "event2/tag.h" 1.66 +#include "event2/http_struct.h" 1.67 +#include "event2/http_compat.h" 1.68 +#include "event2/util.h" 1.69 +#include "util-internal.h" 1.70 +#include "log-internal.h" 1.71 +#include "mm-internal.h" 1.72 + 1.73 +struct evrpc_base * 1.74 +evrpc_init(struct evhttp *http_server) 1.75 +{ 1.76 + struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base)); 1.77 + if (base == NULL) 1.78 + return (NULL); 1.79 + 1.80 + /* we rely on the tagging sub system */ 1.81 + evtag_init(); 1.82 + 1.83 + TAILQ_INIT(&base->registered_rpcs); 1.84 + TAILQ_INIT(&base->input_hooks); 1.85 + TAILQ_INIT(&base->output_hooks); 1.86 + 1.87 + TAILQ_INIT(&base->paused_requests); 1.88 + 1.89 + base->http_server = http_server; 1.90 + 1.91 + return (base); 1.92 +} 1.93 + 1.94 +void 1.95 +evrpc_free(struct evrpc_base *base) 1.96 +{ 1.97 + struct evrpc *rpc; 1.98 + struct evrpc_hook *hook; 1.99 + struct evrpc_hook_ctx *pause; 1.100 + int r; 1.101 + 1.102 + while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) { 1.103 + r = evrpc_unregister_rpc(base, rpc->uri); 1.104 + EVUTIL_ASSERT(r == 0); 1.105 + } 1.106 + while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) { 1.107 + TAILQ_REMOVE(&base->paused_requests, pause, next); 1.108 + mm_free(pause); 1.109 + } 1.110 + while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) { 1.111 + r = evrpc_remove_hook(base, EVRPC_INPUT, hook); 1.112 + EVUTIL_ASSERT(r); 1.113 + } 1.114 + while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) { 1.115 + r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook); 1.116 + EVUTIL_ASSERT(r); 1.117 + } 1.118 + mm_free(base); 1.119 +} 1.120 + 1.121 +void * 1.122 +evrpc_add_hook(void *vbase, 1.123 + enum EVRPC_HOOK_TYPE hook_type, 1.124 + int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *), 1.125 + void *cb_arg) 1.126 +{ 1.127 + struct _evrpc_hooks *base = vbase; 1.128 + struct evrpc_hook_list *head = NULL; 1.129 + struct evrpc_hook *hook = NULL; 1.130 + switch (hook_type) { 1.131 + case EVRPC_INPUT: 1.132 + head = &base->in_hooks; 1.133 + break; 1.134 + case EVRPC_OUTPUT: 1.135 + head = &base->out_hooks; 1.136 + break; 1.137 + default: 1.138 + EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); 1.139 + } 1.140 + 1.141 + hook = mm_calloc(1, sizeof(struct evrpc_hook)); 1.142 + EVUTIL_ASSERT(hook != NULL); 1.143 + 1.144 + hook->process = cb; 1.145 + hook->process_arg = cb_arg; 1.146 + TAILQ_INSERT_TAIL(head, hook, next); 1.147 + 1.148 + return (hook); 1.149 +} 1.150 + 1.151 +static int 1.152 +evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle) 1.153 +{ 1.154 + struct evrpc_hook *hook = NULL; 1.155 + TAILQ_FOREACH(hook, head, next) { 1.156 + if (hook == handle) { 1.157 + TAILQ_REMOVE(head, hook, next); 1.158 + mm_free(hook); 1.159 + return (1); 1.160 + } 1.161 + } 1.162 + 1.163 + return (0); 1.164 +} 1.165 + 1.166 +/* 1.167 + * remove the hook specified by the handle 1.168 + */ 1.169 + 1.170 +int 1.171 +evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle) 1.172 +{ 1.173 + struct _evrpc_hooks *base = vbase; 1.174 + struct evrpc_hook_list *head = NULL; 1.175 + switch (hook_type) { 1.176 + case EVRPC_INPUT: 1.177 + head = &base->in_hooks; 1.178 + break; 1.179 + case EVRPC_OUTPUT: 1.180 + head = &base->out_hooks; 1.181 + break; 1.182 + default: 1.183 + EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); 1.184 + } 1.185 + 1.186 + return (evrpc_remove_hook_internal(head, handle)); 1.187 +} 1.188 + 1.189 +static int 1.190 +evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx, 1.191 + struct evhttp_request *req, struct evbuffer *evbuf) 1.192 +{ 1.193 + struct evrpc_hook *hook; 1.194 + TAILQ_FOREACH(hook, head, next) { 1.195 + int res = hook->process(ctx, req, evbuf, hook->process_arg); 1.196 + if (res != EVRPC_CONTINUE) 1.197 + return (res); 1.198 + } 1.199 + 1.200 + return (EVRPC_CONTINUE); 1.201 +} 1.202 + 1.203 +static void evrpc_pool_schedule(struct evrpc_pool *pool); 1.204 +static void evrpc_request_cb(struct evhttp_request *, void *); 1.205 + 1.206 +/* 1.207 + * Registers a new RPC with the HTTP server. The evrpc object is expected 1.208 + * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn 1.209 + * calls this function. 1.210 + */ 1.211 + 1.212 +static char * 1.213 +evrpc_construct_uri(const char *uri) 1.214 +{ 1.215 + char *constructed_uri; 1.216 + size_t constructed_uri_len; 1.217 + 1.218 + constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1; 1.219 + if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL) 1.220 + event_err(1, "%s: failed to register rpc at %s", 1.221 + __func__, uri); 1.222 + memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX)); 1.223 + memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri)); 1.224 + constructed_uri[constructed_uri_len - 1] = '\0'; 1.225 + 1.226 + return (constructed_uri); 1.227 +} 1.228 + 1.229 +int 1.230 +evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc, 1.231 + void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg) 1.232 +{ 1.233 + char *constructed_uri = evrpc_construct_uri(rpc->uri); 1.234 + 1.235 + rpc->base = base; 1.236 + rpc->cb = cb; 1.237 + rpc->cb_arg = cb_arg; 1.238 + 1.239 + TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next); 1.240 + 1.241 + evhttp_set_cb(base->http_server, 1.242 + constructed_uri, 1.243 + evrpc_request_cb, 1.244 + rpc); 1.245 + 1.246 + mm_free(constructed_uri); 1.247 + 1.248 + return (0); 1.249 +} 1.250 + 1.251 +int 1.252 +evrpc_unregister_rpc(struct evrpc_base *base, const char *name) 1.253 +{ 1.254 + char *registered_uri = NULL; 1.255 + struct evrpc *rpc; 1.256 + int r; 1.257 + 1.258 + /* find the right rpc; linear search might be slow */ 1.259 + TAILQ_FOREACH(rpc, &base->registered_rpcs, next) { 1.260 + if (strcmp(rpc->uri, name) == 0) 1.261 + break; 1.262 + } 1.263 + if (rpc == NULL) { 1.264 + /* We did not find an RPC with this name */ 1.265 + return (-1); 1.266 + } 1.267 + TAILQ_REMOVE(&base->registered_rpcs, rpc, next); 1.268 + 1.269 + registered_uri = evrpc_construct_uri(name); 1.270 + 1.271 + /* remove the http server callback */ 1.272 + r = evhttp_del_cb(base->http_server, registered_uri); 1.273 + EVUTIL_ASSERT(r == 0); 1.274 + 1.275 + mm_free(registered_uri); 1.276 + 1.277 + mm_free((char *)rpc->uri); 1.278 + mm_free(rpc); 1.279 + return (0); 1.280 +} 1.281 + 1.282 +static int evrpc_pause_request(void *vbase, void *ctx, 1.283 + void (*cb)(void *, enum EVRPC_HOOK_RESULT)); 1.284 +static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT); 1.285 + 1.286 +static void 1.287 +evrpc_request_cb(struct evhttp_request *req, void *arg) 1.288 +{ 1.289 + struct evrpc *rpc = arg; 1.290 + struct evrpc_req_generic *rpc_state = NULL; 1.291 + 1.292 + /* let's verify the outside parameters */ 1.293 + if (req->type != EVHTTP_REQ_POST || 1.294 + evbuffer_get_length(req->input_buffer) <= 0) 1.295 + goto error; 1.296 + 1.297 + rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic)); 1.298 + if (rpc_state == NULL) 1.299 + goto error; 1.300 + rpc_state->rpc = rpc; 1.301 + rpc_state->http_req = req; 1.302 + rpc_state->rpc_data = NULL; 1.303 + 1.304 + if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) { 1.305 + int hook_res; 1.306 + 1.307 + evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon); 1.308 + 1.309 + /* 1.310 + * allow hooks to modify the outgoing request 1.311 + */ 1.312 + hook_res = evrpc_process_hooks(&rpc->base->input_hooks, 1.313 + rpc_state, req, req->input_buffer); 1.314 + switch (hook_res) { 1.315 + case EVRPC_TERMINATE: 1.316 + goto error; 1.317 + case EVRPC_PAUSE: 1.318 + evrpc_pause_request(rpc->base, rpc_state, 1.319 + evrpc_request_cb_closure); 1.320 + return; 1.321 + case EVRPC_CONTINUE: 1.322 + break; 1.323 + default: 1.324 + EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 1.325 + hook_res == EVRPC_CONTINUE || 1.326 + hook_res == EVRPC_PAUSE); 1.327 + } 1.328 + } 1.329 + 1.330 + evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE); 1.331 + return; 1.332 + 1.333 +error: 1.334 + if (rpc_state != NULL) 1.335 + evrpc_reqstate_free(rpc_state); 1.336 + evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 1.337 + return; 1.338 +} 1.339 + 1.340 +static void 1.341 +evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 1.342 +{ 1.343 + struct evrpc_req_generic *rpc_state = arg; 1.344 + struct evrpc *rpc; 1.345 + struct evhttp_request *req; 1.346 + 1.347 + EVUTIL_ASSERT(rpc_state); 1.348 + rpc = rpc_state->rpc; 1.349 + req = rpc_state->http_req; 1.350 + 1.351 + if (hook_res == EVRPC_TERMINATE) 1.352 + goto error; 1.353 + 1.354 + /* let's check that we can parse the request */ 1.355 + rpc_state->request = rpc->request_new(rpc->request_new_arg); 1.356 + if (rpc_state->request == NULL) 1.357 + goto error; 1.358 + 1.359 + if (rpc->request_unmarshal( 1.360 + rpc_state->request, req->input_buffer) == -1) { 1.361 + /* we failed to parse the request; that's a bummer */ 1.362 + goto error; 1.363 + } 1.364 + 1.365 + /* at this point, we have a well formed request, prepare the reply */ 1.366 + 1.367 + rpc_state->reply = rpc->reply_new(rpc->reply_new_arg); 1.368 + if (rpc_state->reply == NULL) 1.369 + goto error; 1.370 + 1.371 + /* give the rpc to the user; they can deal with it */ 1.372 + rpc->cb(rpc_state, rpc->cb_arg); 1.373 + 1.374 + return; 1.375 + 1.376 +error: 1.377 + if (rpc_state != NULL) 1.378 + evrpc_reqstate_free(rpc_state); 1.379 + evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 1.380 + return; 1.381 +} 1.382 + 1.383 + 1.384 +void 1.385 +evrpc_reqstate_free(struct evrpc_req_generic* rpc_state) 1.386 +{ 1.387 + struct evrpc *rpc; 1.388 + EVUTIL_ASSERT(rpc_state != NULL); 1.389 + rpc = rpc_state->rpc; 1.390 + 1.391 + /* clean up all memory */ 1.392 + if (rpc_state->hook_meta != NULL) 1.393 + evrpc_hook_context_free(rpc_state->hook_meta); 1.394 + if (rpc_state->request != NULL) 1.395 + rpc->request_free(rpc_state->request); 1.396 + if (rpc_state->reply != NULL) 1.397 + rpc->reply_free(rpc_state->reply); 1.398 + if (rpc_state->rpc_data != NULL) 1.399 + evbuffer_free(rpc_state->rpc_data); 1.400 + mm_free(rpc_state); 1.401 +} 1.402 + 1.403 +static void 1.404 +evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT); 1.405 + 1.406 +void 1.407 +evrpc_request_done(struct evrpc_req_generic *rpc_state) 1.408 +{ 1.409 + struct evhttp_request *req; 1.410 + struct evrpc *rpc; 1.411 + 1.412 + EVUTIL_ASSERT(rpc_state); 1.413 + 1.414 + req = rpc_state->http_req; 1.415 + rpc = rpc_state->rpc; 1.416 + 1.417 + if (rpc->reply_complete(rpc_state->reply) == -1) { 1.418 + /* the reply was not completely filled in. error out */ 1.419 + goto error; 1.420 + } 1.421 + 1.422 + if ((rpc_state->rpc_data = evbuffer_new()) == NULL) { 1.423 + /* out of memory */ 1.424 + goto error; 1.425 + } 1.426 + 1.427 + /* serialize the reply */ 1.428 + rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply); 1.429 + 1.430 + if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) { 1.431 + int hook_res; 1.432 + 1.433 + evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon); 1.434 + 1.435 + /* do hook based tweaks to the request */ 1.436 + hook_res = evrpc_process_hooks(&rpc->base->output_hooks, 1.437 + rpc_state, req, rpc_state->rpc_data); 1.438 + switch (hook_res) { 1.439 + case EVRPC_TERMINATE: 1.440 + goto error; 1.441 + case EVRPC_PAUSE: 1.442 + if (evrpc_pause_request(rpc->base, rpc_state, 1.443 + evrpc_request_done_closure) == -1) 1.444 + goto error; 1.445 + return; 1.446 + case EVRPC_CONTINUE: 1.447 + break; 1.448 + default: 1.449 + EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 1.450 + hook_res == EVRPC_CONTINUE || 1.451 + hook_res == EVRPC_PAUSE); 1.452 + } 1.453 + } 1.454 + 1.455 + evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE); 1.456 + return; 1.457 + 1.458 +error: 1.459 + if (rpc_state != NULL) 1.460 + evrpc_reqstate_free(rpc_state); 1.461 + evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 1.462 + return; 1.463 +} 1.464 + 1.465 +void * 1.466 +evrpc_get_request(struct evrpc_req_generic *req) 1.467 +{ 1.468 + return req->request; 1.469 +} 1.470 + 1.471 +void * 1.472 +evrpc_get_reply(struct evrpc_req_generic *req) 1.473 +{ 1.474 + return req->reply; 1.475 +} 1.476 + 1.477 +static void 1.478 +evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 1.479 +{ 1.480 + struct evrpc_req_generic *rpc_state = arg; 1.481 + struct evhttp_request *req; 1.482 + EVUTIL_ASSERT(rpc_state); 1.483 + req = rpc_state->http_req; 1.484 + 1.485 + if (hook_res == EVRPC_TERMINATE) 1.486 + goto error; 1.487 + 1.488 + /* on success, we are going to transmit marshaled binary data */ 1.489 + if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) { 1.490 + evhttp_add_header(req->output_headers, 1.491 + "Content-Type", "application/octet-stream"); 1.492 + } 1.493 + evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data); 1.494 + 1.495 + evrpc_reqstate_free(rpc_state); 1.496 + 1.497 + return; 1.498 + 1.499 +error: 1.500 + if (rpc_state != NULL) 1.501 + evrpc_reqstate_free(rpc_state); 1.502 + evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 1.503 + return; 1.504 +} 1.505 + 1.506 + 1.507 +/* Client implementation of RPC site */ 1.508 + 1.509 +static int evrpc_schedule_request(struct evhttp_connection *connection, 1.510 + struct evrpc_request_wrapper *ctx); 1.511 + 1.512 +struct evrpc_pool * 1.513 +evrpc_pool_new(struct event_base *base) 1.514 +{ 1.515 + struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool)); 1.516 + if (pool == NULL) 1.517 + return (NULL); 1.518 + 1.519 + TAILQ_INIT(&pool->connections); 1.520 + TAILQ_INIT(&pool->requests); 1.521 + 1.522 + TAILQ_INIT(&pool->paused_requests); 1.523 + 1.524 + TAILQ_INIT(&pool->input_hooks); 1.525 + TAILQ_INIT(&pool->output_hooks); 1.526 + 1.527 + pool->base = base; 1.528 + pool->timeout = -1; 1.529 + 1.530 + return (pool); 1.531 +} 1.532 + 1.533 +static void 1.534 +evrpc_request_wrapper_free(struct evrpc_request_wrapper *request) 1.535 +{ 1.536 + if (request->hook_meta != NULL) 1.537 + evrpc_hook_context_free(request->hook_meta); 1.538 + mm_free(request->name); 1.539 + mm_free(request); 1.540 +} 1.541 + 1.542 +void 1.543 +evrpc_pool_free(struct evrpc_pool *pool) 1.544 +{ 1.545 + struct evhttp_connection *connection; 1.546 + struct evrpc_request_wrapper *request; 1.547 + struct evrpc_hook_ctx *pause; 1.548 + struct evrpc_hook *hook; 1.549 + int r; 1.550 + 1.551 + while ((request = TAILQ_FIRST(&pool->requests)) != NULL) { 1.552 + TAILQ_REMOVE(&pool->requests, request, next); 1.553 + evrpc_request_wrapper_free(request); 1.554 + } 1.555 + 1.556 + while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) { 1.557 + TAILQ_REMOVE(&pool->paused_requests, pause, next); 1.558 + mm_free(pause); 1.559 + } 1.560 + 1.561 + while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) { 1.562 + TAILQ_REMOVE(&pool->connections, connection, next); 1.563 + evhttp_connection_free(connection); 1.564 + } 1.565 + 1.566 + while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) { 1.567 + r = evrpc_remove_hook(pool, EVRPC_INPUT, hook); 1.568 + EVUTIL_ASSERT(r); 1.569 + } 1.570 + 1.571 + while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) { 1.572 + r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook); 1.573 + EVUTIL_ASSERT(r); 1.574 + } 1.575 + 1.576 + mm_free(pool); 1.577 +} 1.578 + 1.579 +/* 1.580 + * Add a connection to the RPC pool. A request scheduled on the pool 1.581 + * may use any available connection. 1.582 + */ 1.583 + 1.584 +void 1.585 +evrpc_pool_add_connection(struct evrpc_pool *pool, 1.586 + struct evhttp_connection *connection) 1.587 +{ 1.588 + EVUTIL_ASSERT(connection->http_server == NULL); 1.589 + TAILQ_INSERT_TAIL(&pool->connections, connection, next); 1.590 + 1.591 + /* 1.592 + * associate an event base with this connection 1.593 + */ 1.594 + if (pool->base != NULL) 1.595 + evhttp_connection_set_base(connection, pool->base); 1.596 + 1.597 + /* 1.598 + * unless a timeout was specifically set for a connection, 1.599 + * the connection inherits the timeout from the pool. 1.600 + */ 1.601 + if (connection->timeout == -1) 1.602 + connection->timeout = pool->timeout; 1.603 + 1.604 + /* 1.605 + * if we have any requests pending, schedule them with the new 1.606 + * connections. 1.607 + */ 1.608 + 1.609 + if (TAILQ_FIRST(&pool->requests) != NULL) { 1.610 + struct evrpc_request_wrapper *request = 1.611 + TAILQ_FIRST(&pool->requests); 1.612 + TAILQ_REMOVE(&pool->requests, request, next); 1.613 + evrpc_schedule_request(connection, request); 1.614 + } 1.615 +} 1.616 + 1.617 +void 1.618 +evrpc_pool_remove_connection(struct evrpc_pool *pool, 1.619 + struct evhttp_connection *connection) 1.620 +{ 1.621 + TAILQ_REMOVE(&pool->connections, connection, next); 1.622 +} 1.623 + 1.624 +void 1.625 +evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs) 1.626 +{ 1.627 + struct evhttp_connection *evcon; 1.628 + TAILQ_FOREACH(evcon, &pool->connections, next) { 1.629 + evcon->timeout = timeout_in_secs; 1.630 + } 1.631 + pool->timeout = timeout_in_secs; 1.632 +} 1.633 + 1.634 + 1.635 +static void evrpc_reply_done(struct evhttp_request *, void *); 1.636 +static void evrpc_request_timeout(evutil_socket_t, short, void *); 1.637 + 1.638 +/* 1.639 + * Finds a connection object associated with the pool that is currently 1.640 + * idle and can be used to make a request. 1.641 + */ 1.642 +static struct evhttp_connection * 1.643 +evrpc_pool_find_connection(struct evrpc_pool *pool) 1.644 +{ 1.645 + struct evhttp_connection *connection; 1.646 + TAILQ_FOREACH(connection, &pool->connections, next) { 1.647 + if (TAILQ_FIRST(&connection->requests) == NULL) 1.648 + return (connection); 1.649 + } 1.650 + 1.651 + return (NULL); 1.652 +} 1.653 + 1.654 +/* 1.655 + * Prototypes responsible for evrpc scheduling and hooking 1.656 + */ 1.657 + 1.658 +static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT); 1.659 + 1.660 +/* 1.661 + * We assume that the ctx is no longer queued on the pool. 1.662 + */ 1.663 +static int 1.664 +evrpc_schedule_request(struct evhttp_connection *connection, 1.665 + struct evrpc_request_wrapper *ctx) 1.666 +{ 1.667 + struct evhttp_request *req = NULL; 1.668 + struct evrpc_pool *pool = ctx->pool; 1.669 + struct evrpc_status status; 1.670 + 1.671 + if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL) 1.672 + goto error; 1.673 + 1.674 + /* serialize the request data into the output buffer */ 1.675 + ctx->request_marshal(req->output_buffer, ctx->request); 1.676 + 1.677 + /* we need to know the connection that we might have to abort */ 1.678 + ctx->evcon = connection; 1.679 + 1.680 + /* if we get paused we also need to know the request */ 1.681 + ctx->req = req; 1.682 + 1.683 + if (TAILQ_FIRST(&pool->output_hooks) != NULL) { 1.684 + int hook_res; 1.685 + 1.686 + evrpc_hook_associate_meta(&ctx->hook_meta, connection); 1.687 + 1.688 + /* apply hooks to the outgoing request */ 1.689 + hook_res = evrpc_process_hooks(&pool->output_hooks, 1.690 + ctx, req, req->output_buffer); 1.691 + 1.692 + switch (hook_res) { 1.693 + case EVRPC_TERMINATE: 1.694 + goto error; 1.695 + case EVRPC_PAUSE: 1.696 + /* we need to be explicitly resumed */ 1.697 + if (evrpc_pause_request(pool, ctx, 1.698 + evrpc_schedule_request_closure) == -1) 1.699 + goto error; 1.700 + return (0); 1.701 + case EVRPC_CONTINUE: 1.702 + /* we can just continue */ 1.703 + break; 1.704 + default: 1.705 + EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 1.706 + hook_res == EVRPC_CONTINUE || 1.707 + hook_res == EVRPC_PAUSE); 1.708 + } 1.709 + } 1.710 + 1.711 + evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE); 1.712 + return (0); 1.713 + 1.714 +error: 1.715 + memset(&status, 0, sizeof(status)); 1.716 + status.error = EVRPC_STATUS_ERR_UNSTARTED; 1.717 + (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 1.718 + evrpc_request_wrapper_free(ctx); 1.719 + return (-1); 1.720 +} 1.721 + 1.722 +static void 1.723 +evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 1.724 +{ 1.725 + struct evrpc_request_wrapper *ctx = arg; 1.726 + struct evhttp_connection *connection = ctx->evcon; 1.727 + struct evhttp_request *req = ctx->req; 1.728 + struct evrpc_pool *pool = ctx->pool; 1.729 + struct evrpc_status status; 1.730 + char *uri = NULL; 1.731 + int res = 0; 1.732 + 1.733 + if (hook_res == EVRPC_TERMINATE) 1.734 + goto error; 1.735 + 1.736 + uri = evrpc_construct_uri(ctx->name); 1.737 + if (uri == NULL) 1.738 + goto error; 1.739 + 1.740 + if (pool->timeout > 0) { 1.741 + /* 1.742 + * a timeout after which the whole rpc is going to be aborted. 1.743 + */ 1.744 + struct timeval tv; 1.745 + evutil_timerclear(&tv); 1.746 + tv.tv_sec = pool->timeout; 1.747 + evtimer_add(&ctx->ev_timeout, &tv); 1.748 + } 1.749 + 1.750 + /* start the request over the connection */ 1.751 + res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri); 1.752 + mm_free(uri); 1.753 + 1.754 + if (res == -1) 1.755 + goto error; 1.756 + 1.757 + return; 1.758 + 1.759 +error: 1.760 + memset(&status, 0, sizeof(status)); 1.761 + status.error = EVRPC_STATUS_ERR_UNSTARTED; 1.762 + (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 1.763 + evrpc_request_wrapper_free(ctx); 1.764 +} 1.765 + 1.766 +/* we just queue the paused request on the pool under the req object */ 1.767 +static int 1.768 +evrpc_pause_request(void *vbase, void *ctx, 1.769 + void (*cb)(void *, enum EVRPC_HOOK_RESULT)) 1.770 +{ 1.771 + struct _evrpc_hooks *base = vbase; 1.772 + struct evrpc_hook_ctx *pause = mm_malloc(sizeof(*pause)); 1.773 + if (pause == NULL) 1.774 + return (-1); 1.775 + 1.776 + pause->ctx = ctx; 1.777 + pause->cb = cb; 1.778 + 1.779 + TAILQ_INSERT_TAIL(&base->pause_requests, pause, next); 1.780 + return (0); 1.781 +} 1.782 + 1.783 +int 1.784 +evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res) 1.785 +{ 1.786 + struct _evrpc_hooks *base = vbase; 1.787 + struct evrpc_pause_list *head = &base->pause_requests; 1.788 + struct evrpc_hook_ctx *pause; 1.789 + 1.790 + TAILQ_FOREACH(pause, head, next) { 1.791 + if (pause->ctx == ctx) 1.792 + break; 1.793 + } 1.794 + 1.795 + if (pause == NULL) 1.796 + return (-1); 1.797 + 1.798 + (*pause->cb)(pause->ctx, res); 1.799 + TAILQ_REMOVE(head, pause, next); 1.800 + mm_free(pause); 1.801 + return (0); 1.802 +} 1.803 + 1.804 +int 1.805 +evrpc_make_request(struct evrpc_request_wrapper *ctx) 1.806 +{ 1.807 + struct evrpc_pool *pool = ctx->pool; 1.808 + 1.809 + /* initialize the event structure for this rpc */ 1.810 + evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx); 1.811 + 1.812 + /* we better have some available connections on the pool */ 1.813 + EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL); 1.814 + 1.815 + /* 1.816 + * if no connection is available, we queue the request on the pool, 1.817 + * the next time a connection is empty, the rpc will be send on that. 1.818 + */ 1.819 + TAILQ_INSERT_TAIL(&pool->requests, ctx, next); 1.820 + 1.821 + evrpc_pool_schedule(pool); 1.822 + 1.823 + return (0); 1.824 +} 1.825 + 1.826 + 1.827 +struct evrpc_request_wrapper * 1.828 +evrpc_make_request_ctx( 1.829 + struct evrpc_pool *pool, void *request, void *reply, 1.830 + const char *rpcname, 1.831 + void (*req_marshal)(struct evbuffer*, void *), 1.832 + void (*rpl_clear)(void *), 1.833 + int (*rpl_unmarshal)(void *, struct evbuffer *), 1.834 + void (*cb)(struct evrpc_status *, void *, void *, void *), 1.835 + void *cbarg) 1.836 +{ 1.837 + struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *) 1.838 + mm_malloc(sizeof(struct evrpc_request_wrapper)); 1.839 + if (ctx == NULL) 1.840 + return (NULL); 1.841 + 1.842 + ctx->pool = pool; 1.843 + ctx->hook_meta = NULL; 1.844 + ctx->evcon = NULL; 1.845 + ctx->name = mm_strdup(rpcname); 1.846 + if (ctx->name == NULL) { 1.847 + mm_free(ctx); 1.848 + return (NULL); 1.849 + } 1.850 + ctx->cb = cb; 1.851 + ctx->cb_arg = cbarg; 1.852 + ctx->request = request; 1.853 + ctx->reply = reply; 1.854 + ctx->request_marshal = req_marshal; 1.855 + ctx->reply_clear = rpl_clear; 1.856 + ctx->reply_unmarshal = rpl_unmarshal; 1.857 + 1.858 + return (ctx); 1.859 +} 1.860 + 1.861 +static void 1.862 +evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT); 1.863 + 1.864 +static void 1.865 +evrpc_reply_done(struct evhttp_request *req, void *arg) 1.866 +{ 1.867 + struct evrpc_request_wrapper *ctx = arg; 1.868 + struct evrpc_pool *pool = ctx->pool; 1.869 + int hook_res = EVRPC_CONTINUE; 1.870 + 1.871 + /* cancel any timeout we might have scheduled */ 1.872 + event_del(&ctx->ev_timeout); 1.873 + 1.874 + ctx->req = req; 1.875 + 1.876 + /* we need to get the reply now */ 1.877 + if (req == NULL) { 1.878 + evrpc_reply_done_closure(ctx, EVRPC_CONTINUE); 1.879 + return; 1.880 + } 1.881 + 1.882 + if (TAILQ_FIRST(&pool->input_hooks) != NULL) { 1.883 + evrpc_hook_associate_meta(&ctx->hook_meta, ctx->evcon); 1.884 + 1.885 + /* apply hooks to the incoming request */ 1.886 + hook_res = evrpc_process_hooks(&pool->input_hooks, 1.887 + ctx, req, req->input_buffer); 1.888 + 1.889 + switch (hook_res) { 1.890 + case EVRPC_TERMINATE: 1.891 + case EVRPC_CONTINUE: 1.892 + break; 1.893 + case EVRPC_PAUSE: 1.894 + /* 1.895 + * if we get paused we also need to know the 1.896 + * request. unfortunately, the underlying 1.897 + * layer is going to free it. we need to 1.898 + * request ownership explicitly 1.899 + */ 1.900 + if (req != NULL) 1.901 + evhttp_request_own(req); 1.902 + 1.903 + evrpc_pause_request(pool, ctx, 1.904 + evrpc_reply_done_closure); 1.905 + return; 1.906 + default: 1.907 + EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 1.908 + hook_res == EVRPC_CONTINUE || 1.909 + hook_res == EVRPC_PAUSE); 1.910 + } 1.911 + } 1.912 + 1.913 + evrpc_reply_done_closure(ctx, hook_res); 1.914 + 1.915 + /* http request is being freed by underlying layer */ 1.916 +} 1.917 + 1.918 +static void 1.919 +evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 1.920 +{ 1.921 + struct evrpc_request_wrapper *ctx = arg; 1.922 + struct evhttp_request *req = ctx->req; 1.923 + struct evrpc_pool *pool = ctx->pool; 1.924 + struct evrpc_status status; 1.925 + int res = -1; 1.926 + 1.927 + memset(&status, 0, sizeof(status)); 1.928 + status.http_req = req; 1.929 + 1.930 + /* we need to get the reply now */ 1.931 + if (req == NULL) { 1.932 + status.error = EVRPC_STATUS_ERR_TIMEOUT; 1.933 + } else if (hook_res == EVRPC_TERMINATE) { 1.934 + status.error = EVRPC_STATUS_ERR_HOOKABORTED; 1.935 + } else { 1.936 + res = ctx->reply_unmarshal(ctx->reply, req->input_buffer); 1.937 + if (res == -1) 1.938 + status.error = EVRPC_STATUS_ERR_BADPAYLOAD; 1.939 + } 1.940 + 1.941 + if (res == -1) { 1.942 + /* clear everything that we might have written previously */ 1.943 + ctx->reply_clear(ctx->reply); 1.944 + } 1.945 + 1.946 + (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 1.947 + 1.948 + evrpc_request_wrapper_free(ctx); 1.949 + 1.950 + /* the http layer owned the original request structure, but if we 1.951 + * got paused, we asked for ownership and need to free it here. */ 1.952 + if (req != NULL && evhttp_request_is_owned(req)) 1.953 + evhttp_request_free(req); 1.954 + 1.955 + /* see if we can schedule another request */ 1.956 + evrpc_pool_schedule(pool); 1.957 +} 1.958 + 1.959 +static void 1.960 +evrpc_pool_schedule(struct evrpc_pool *pool) 1.961 +{ 1.962 + struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests); 1.963 + struct evhttp_connection *evcon; 1.964 + 1.965 + /* if no requests are pending, we have no work */ 1.966 + if (ctx == NULL) 1.967 + return; 1.968 + 1.969 + if ((evcon = evrpc_pool_find_connection(pool)) != NULL) { 1.970 + TAILQ_REMOVE(&pool->requests, ctx, next); 1.971 + evrpc_schedule_request(evcon, ctx); 1.972 + } 1.973 +} 1.974 + 1.975 +static void 1.976 +evrpc_request_timeout(evutil_socket_t fd, short what, void *arg) 1.977 +{ 1.978 + struct evrpc_request_wrapper *ctx = arg; 1.979 + struct evhttp_connection *evcon = ctx->evcon; 1.980 + EVUTIL_ASSERT(evcon != NULL); 1.981 + 1.982 + evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT); 1.983 +} 1.984 + 1.985 +/* 1.986 + * frees potential meta data associated with a request. 1.987 + */ 1.988 + 1.989 +static void 1.990 +evrpc_meta_data_free(struct evrpc_meta_list *meta_data) 1.991 +{ 1.992 + struct evrpc_meta *entry; 1.993 + EVUTIL_ASSERT(meta_data != NULL); 1.994 + 1.995 + while ((entry = TAILQ_FIRST(meta_data)) != NULL) { 1.996 + TAILQ_REMOVE(meta_data, entry, next); 1.997 + mm_free(entry->key); 1.998 + mm_free(entry->data); 1.999 + mm_free(entry); 1.1000 + } 1.1001 +} 1.1002 + 1.1003 +static struct evrpc_hook_meta * 1.1004 +evrpc_hook_meta_new(void) 1.1005 +{ 1.1006 + struct evrpc_hook_meta *ctx; 1.1007 + ctx = mm_malloc(sizeof(struct evrpc_hook_meta)); 1.1008 + EVUTIL_ASSERT(ctx != NULL); 1.1009 + 1.1010 + TAILQ_INIT(&ctx->meta_data); 1.1011 + ctx->evcon = NULL; 1.1012 + 1.1013 + return (ctx); 1.1014 +} 1.1015 + 1.1016 +static void 1.1017 +evrpc_hook_associate_meta(struct evrpc_hook_meta **pctx, 1.1018 + struct evhttp_connection *evcon) 1.1019 +{ 1.1020 + struct evrpc_hook_meta *ctx = *pctx; 1.1021 + if (ctx == NULL) 1.1022 + *pctx = ctx = evrpc_hook_meta_new(); 1.1023 + ctx->evcon = evcon; 1.1024 +} 1.1025 + 1.1026 +static void 1.1027 +evrpc_hook_context_free(struct evrpc_hook_meta *ctx) 1.1028 +{ 1.1029 + evrpc_meta_data_free(&ctx->meta_data); 1.1030 + mm_free(ctx); 1.1031 +} 1.1032 + 1.1033 +/* Adds meta data */ 1.1034 +void 1.1035 +evrpc_hook_add_meta(void *ctx, const char *key, 1.1036 + const void *data, size_t data_size) 1.1037 +{ 1.1038 + struct evrpc_request_wrapper *req = ctx; 1.1039 + struct evrpc_hook_meta *store = NULL; 1.1040 + struct evrpc_meta *meta = NULL; 1.1041 + 1.1042 + if ((store = req->hook_meta) == NULL) 1.1043 + store = req->hook_meta = evrpc_hook_meta_new(); 1.1044 + 1.1045 + meta = mm_malloc(sizeof(struct evrpc_meta)); 1.1046 + EVUTIL_ASSERT(meta != NULL); 1.1047 + meta->key = mm_strdup(key); 1.1048 + EVUTIL_ASSERT(meta->key != NULL); 1.1049 + meta->data_size = data_size; 1.1050 + meta->data = mm_malloc(data_size); 1.1051 + EVUTIL_ASSERT(meta->data != NULL); 1.1052 + memcpy(meta->data, data, data_size); 1.1053 + 1.1054 + TAILQ_INSERT_TAIL(&store->meta_data, meta, next); 1.1055 +} 1.1056 + 1.1057 +int 1.1058 +evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size) 1.1059 +{ 1.1060 + struct evrpc_request_wrapper *req = ctx; 1.1061 + struct evrpc_meta *meta = NULL; 1.1062 + 1.1063 + if (req->hook_meta == NULL) 1.1064 + return (-1); 1.1065 + 1.1066 + TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) { 1.1067 + if (strcmp(meta->key, key) == 0) { 1.1068 + *data = meta->data; 1.1069 + *data_size = meta->data_size; 1.1070 + return (0); 1.1071 + } 1.1072 + } 1.1073 + 1.1074 + return (-1); 1.1075 +} 1.1076 + 1.1077 +struct evhttp_connection * 1.1078 +evrpc_hook_get_connection(void *ctx) 1.1079 +{ 1.1080 + struct evrpc_request_wrapper *req = ctx; 1.1081 + return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL); 1.1082 +} 1.1083 + 1.1084 +int 1.1085 +evrpc_send_request_generic(struct evrpc_pool *pool, 1.1086 + void *request, void *reply, 1.1087 + void (*cb)(struct evrpc_status *, void *, void *, void *), 1.1088 + void *cb_arg, 1.1089 + const char *rpcname, 1.1090 + void (*req_marshal)(struct evbuffer *, void *), 1.1091 + void (*rpl_clear)(void *), 1.1092 + int (*rpl_unmarshal)(void *, struct evbuffer *)) 1.1093 +{ 1.1094 + struct evrpc_status status; 1.1095 + struct evrpc_request_wrapper *ctx; 1.1096 + ctx = evrpc_make_request_ctx(pool, request, reply, 1.1097 + rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg); 1.1098 + if (ctx == NULL) 1.1099 + goto error; 1.1100 + return (evrpc_make_request(ctx)); 1.1101 +error: 1.1102 + memset(&status, 0, sizeof(status)); 1.1103 + status.error = EVRPC_STATUS_ERR_UNSTARTED; 1.1104 + (*(cb))(&status, request, reply, cb_arg); 1.1105 + return (-1); 1.1106 +} 1.1107 + 1.1108 +/** Takes a request object and fills it in with the right magic */ 1.1109 +static struct evrpc * 1.1110 +evrpc_register_object(const char *name, 1.1111 + void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *), 1.1112 + int (*req_unmarshal)(void *, struct evbuffer *), 1.1113 + void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *), 1.1114 + int (*rpl_complete)(void *), 1.1115 + void (*rpl_marshal)(struct evbuffer *, void *)) 1.1116 +{ 1.1117 + struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc)); 1.1118 + if (rpc == NULL) 1.1119 + return (NULL); 1.1120 + rpc->uri = mm_strdup(name); 1.1121 + if (rpc->uri == NULL) { 1.1122 + mm_free(rpc); 1.1123 + return (NULL); 1.1124 + } 1.1125 + rpc->request_new = req_new; 1.1126 + rpc->request_new_arg = req_new_arg; 1.1127 + rpc->request_free = req_free; 1.1128 + rpc->request_unmarshal = req_unmarshal; 1.1129 + rpc->reply_new = rpl_new; 1.1130 + rpc->reply_new_arg = rpl_new_arg; 1.1131 + rpc->reply_free = rpl_free; 1.1132 + rpc->reply_complete = rpl_complete; 1.1133 + rpc->reply_marshal = rpl_marshal; 1.1134 + return (rpc); 1.1135 +} 1.1136 + 1.1137 +int 1.1138 +evrpc_register_generic(struct evrpc_base *base, const char *name, 1.1139 + void (*callback)(struct evrpc_req_generic *, void *), void *cbarg, 1.1140 + void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *), 1.1141 + int (*req_unmarshal)(void *, struct evbuffer *), 1.1142 + void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *), 1.1143 + int (*rpl_complete)(void *), 1.1144 + void (*rpl_marshal)(struct evbuffer *, void *)) 1.1145 +{ 1.1146 + struct evrpc* rpc = 1.1147 + evrpc_register_object(name, req_new, req_new_arg, req_free, req_unmarshal, 1.1148 + rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal); 1.1149 + if (rpc == NULL) 1.1150 + return (-1); 1.1151 + evrpc_register_rpc(base, rpc, 1.1152 + (void (*)(struct evrpc_req_generic*, void *))callback, cbarg); 1.1153 + return (0); 1.1154 +} 1.1155 + 1.1156 +/** accessors for obscure and undocumented functionality */ 1.1157 +struct evrpc_pool * 1.1158 +evrpc_request_get_pool(struct evrpc_request_wrapper *ctx) 1.1159 +{ 1.1160 + return (ctx->pool); 1.1161 +} 1.1162 + 1.1163 +void 1.1164 +evrpc_request_set_pool(struct evrpc_request_wrapper *ctx, 1.1165 + struct evrpc_pool *pool) 1.1166 +{ 1.1167 + ctx->pool = pool; 1.1168 +} 1.1169 + 1.1170 +void 1.1171 +evrpc_request_set_cb(struct evrpc_request_wrapper *ctx, 1.1172 + void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg), 1.1173 + void *cb_arg) 1.1174 +{ 1.1175 + ctx->cb = cb; 1.1176 + ctx->cb_arg = cb_arg; 1.1177 +}