ipc/chromium/src/third_party/libevent/evrpc.c

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purposes.

michael@0 1 /*
michael@0 2 * Copyright (c) 2000-2007 Niels Provos <provos@citi.umich.edu>
michael@0 3 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
michael@0 4 *
michael@0 5 * Redistribution and use in source and binary forms, with or without
michael@0 6 * modification, are permitted provided that the following conditions
michael@0 7 * are met:
michael@0 8 * 1. Redistributions of source code must retain the above copyright
michael@0 9 * notice, this list of conditions and the following disclaimer.
michael@0 10 * 2. Redistributions in binary form must reproduce the above copyright
michael@0 11 * notice, this list of conditions and the following disclaimer in the
michael@0 12 * documentation and/or other materials provided with the distribution.
michael@0 13 * 3. The name of the author may not be used to endorse or promote products
michael@0 14 * derived from this software without specific prior written permission.
michael@0 15 *
michael@0 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
michael@0 17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
michael@0 18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
michael@0 19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
michael@0 20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
michael@0 21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
michael@0 22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
michael@0 23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
michael@0 24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
michael@0 25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
michael@0 26 */
michael@0 27 #include "event2/event-config.h"
michael@0 28
michael@0 29 #ifdef WIN32
michael@0 30 #define WIN32_LEAN_AND_MEAN
michael@0 31 #include <winsock2.h>
michael@0 32 #include <windows.h>
michael@0 33 #undef WIN32_LEAN_AND_MEAN
michael@0 34 #endif
michael@0 35
michael@0 36 #include <sys/types.h>
michael@0 37 #ifndef WIN32
michael@0 38 #include <sys/socket.h>
michael@0 39 #endif
michael@0 40 #ifdef _EVENT_HAVE_SYS_TIME_H
michael@0 41 #include <sys/time.h>
michael@0 42 #endif
michael@0 43 #include <sys/queue.h>
michael@0 44 #include <stdio.h>
michael@0 45 #include <stdlib.h>
michael@0 46 #ifndef WIN32
michael@0 47 #include <unistd.h>
michael@0 48 #endif
michael@0 49 #include <errno.h>
michael@0 50 #include <signal.h>
michael@0 51 #include <string.h>
michael@0 52
michael@0 53 #include <sys/queue.h>
michael@0 54
michael@0 55 #include "event2/event.h"
michael@0 56 #include "event2/event_struct.h"
michael@0 57 #include "event2/rpc.h"
michael@0 58 #include "event2/rpc_struct.h"
michael@0 59 #include "evrpc-internal.h"
michael@0 60 #include "event2/http.h"
michael@0 61 #include "event2/buffer.h"
michael@0 62 #include "event2/tag.h"
michael@0 63 #include "event2/http_struct.h"
michael@0 64 #include "event2/http_compat.h"
michael@0 65 #include "event2/util.h"
michael@0 66 #include "util-internal.h"
michael@0 67 #include "log-internal.h"
michael@0 68 #include "mm-internal.h"
michael@0 69
michael@0 70 struct evrpc_base *
michael@0 71 evrpc_init(struct evhttp *http_server)
michael@0 72 {
michael@0 73 struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base));
michael@0 74 if (base == NULL)
michael@0 75 return (NULL);
michael@0 76
michael@0 77 /* we rely on the tagging sub system */
michael@0 78 evtag_init();
michael@0 79
michael@0 80 TAILQ_INIT(&base->registered_rpcs);
michael@0 81 TAILQ_INIT(&base->input_hooks);
michael@0 82 TAILQ_INIT(&base->output_hooks);
michael@0 83
michael@0 84 TAILQ_INIT(&base->paused_requests);
michael@0 85
michael@0 86 base->http_server = http_server;
michael@0 87
michael@0 88 return (base);
michael@0 89 }
michael@0 90
michael@0 91 void
michael@0 92 evrpc_free(struct evrpc_base *base)
michael@0 93 {
michael@0 94 struct evrpc *rpc;
michael@0 95 struct evrpc_hook *hook;
michael@0 96 struct evrpc_hook_ctx *pause;
michael@0 97 int r;
michael@0 98
michael@0 99 while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
michael@0 100 r = evrpc_unregister_rpc(base, rpc->uri);
michael@0 101 EVUTIL_ASSERT(r == 0);
michael@0 102 }
michael@0 103 while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) {
michael@0 104 TAILQ_REMOVE(&base->paused_requests, pause, next);
michael@0 105 mm_free(pause);
michael@0 106 }
michael@0 107 while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
michael@0 108 r = evrpc_remove_hook(base, EVRPC_INPUT, hook);
michael@0 109 EVUTIL_ASSERT(r);
michael@0 110 }
michael@0 111 while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
michael@0 112 r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook);
michael@0 113 EVUTIL_ASSERT(r);
michael@0 114 }
michael@0 115 mm_free(base);
michael@0 116 }
michael@0 117
michael@0 118 void *
michael@0 119 evrpc_add_hook(void *vbase,
michael@0 120 enum EVRPC_HOOK_TYPE hook_type,
michael@0 121 int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *),
michael@0 122 void *cb_arg)
michael@0 123 {
michael@0 124 struct _evrpc_hooks *base = vbase;
michael@0 125 struct evrpc_hook_list *head = NULL;
michael@0 126 struct evrpc_hook *hook = NULL;
michael@0 127 switch (hook_type) {
michael@0 128 case EVRPC_INPUT:
michael@0 129 head = &base->in_hooks;
michael@0 130 break;
michael@0 131 case EVRPC_OUTPUT:
michael@0 132 head = &base->out_hooks;
michael@0 133 break;
michael@0 134 default:
michael@0 135 EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
michael@0 136 }
michael@0 137
michael@0 138 hook = mm_calloc(1, sizeof(struct evrpc_hook));
michael@0 139 EVUTIL_ASSERT(hook != NULL);
michael@0 140
michael@0 141 hook->process = cb;
michael@0 142 hook->process_arg = cb_arg;
michael@0 143 TAILQ_INSERT_TAIL(head, hook, next);
michael@0 144
michael@0 145 return (hook);
michael@0 146 }
michael@0 147
michael@0 148 static int
michael@0 149 evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
michael@0 150 {
michael@0 151 struct evrpc_hook *hook = NULL;
michael@0 152 TAILQ_FOREACH(hook, head, next) {
michael@0 153 if (hook == handle) {
michael@0 154 TAILQ_REMOVE(head, hook, next);
michael@0 155 mm_free(hook);
michael@0 156 return (1);
michael@0 157 }
michael@0 158 }
michael@0 159
michael@0 160 return (0);
michael@0 161 }
michael@0 162
michael@0 163 /*
michael@0 164 * remove the hook specified by the handle
michael@0 165 */
michael@0 166
michael@0 167 int
michael@0 168 evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
michael@0 169 {
michael@0 170 struct _evrpc_hooks *base = vbase;
michael@0 171 struct evrpc_hook_list *head = NULL;
michael@0 172 switch (hook_type) {
michael@0 173 case EVRPC_INPUT:
michael@0 174 head = &base->in_hooks;
michael@0 175 break;
michael@0 176 case EVRPC_OUTPUT:
michael@0 177 head = &base->out_hooks;
michael@0 178 break;
michael@0 179 default:
michael@0 180 EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
michael@0 181 }
michael@0 182
michael@0 183 return (evrpc_remove_hook_internal(head, handle));
michael@0 184 }
michael@0 185
michael@0 186 static int
michael@0 187 evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx,
michael@0 188 struct evhttp_request *req, struct evbuffer *evbuf)
michael@0 189 {
michael@0 190 struct evrpc_hook *hook;
michael@0 191 TAILQ_FOREACH(hook, head, next) {
michael@0 192 int res = hook->process(ctx, req, evbuf, hook->process_arg);
michael@0 193 if (res != EVRPC_CONTINUE)
michael@0 194 return (res);
michael@0 195 }
michael@0 196
michael@0 197 return (EVRPC_CONTINUE);
michael@0 198 }
michael@0 199
michael@0 200 static void evrpc_pool_schedule(struct evrpc_pool *pool);
michael@0 201 static void evrpc_request_cb(struct evhttp_request *, void *);
michael@0 202
michael@0 203 /*
michael@0 204 * Registers a new RPC with the HTTP server. The evrpc object is expected
michael@0 205 * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn
michael@0 206 * calls this function.
michael@0 207 */
michael@0 208
michael@0 209 static char *
michael@0 210 evrpc_construct_uri(const char *uri)
michael@0 211 {
michael@0 212 char *constructed_uri;
michael@0 213 size_t constructed_uri_len;
michael@0 214
michael@0 215 constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
michael@0 216 if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL)
michael@0 217 event_err(1, "%s: failed to register rpc at %s",
michael@0 218 __func__, uri);
michael@0 219 memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
michael@0 220 memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
michael@0 221 constructed_uri[constructed_uri_len - 1] = '\0';
michael@0 222
michael@0 223 return (constructed_uri);
michael@0 224 }
michael@0 225
michael@0 226 int
michael@0 227 evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
michael@0 228 void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
michael@0 229 {
michael@0 230 char *constructed_uri = evrpc_construct_uri(rpc->uri);
michael@0 231
michael@0 232 rpc->base = base;
michael@0 233 rpc->cb = cb;
michael@0 234 rpc->cb_arg = cb_arg;
michael@0 235
michael@0 236 TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
michael@0 237
michael@0 238 evhttp_set_cb(base->http_server,
michael@0 239 constructed_uri,
michael@0 240 evrpc_request_cb,
michael@0 241 rpc);
michael@0 242
michael@0 243 mm_free(constructed_uri);
michael@0 244
michael@0 245 return (0);
michael@0 246 }
michael@0 247
michael@0 248 int
michael@0 249 evrpc_unregister_rpc(struct evrpc_base *base, const char *name)
michael@0 250 {
michael@0 251 char *registered_uri = NULL;
michael@0 252 struct evrpc *rpc;
michael@0 253 int r;
michael@0 254
michael@0 255 /* find the right rpc; linear search might be slow */
michael@0 256 TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
michael@0 257 if (strcmp(rpc->uri, name) == 0)
michael@0 258 break;
michael@0 259 }
michael@0 260 if (rpc == NULL) {
michael@0 261 /* We did not find an RPC with this name */
michael@0 262 return (-1);
michael@0 263 }
michael@0 264 TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
michael@0 265
michael@0 266 registered_uri = evrpc_construct_uri(name);
michael@0 267
michael@0 268 /* remove the http server callback */
michael@0 269 r = evhttp_del_cb(base->http_server, registered_uri);
michael@0 270 EVUTIL_ASSERT(r == 0);
michael@0 271
michael@0 272 mm_free(registered_uri);
michael@0 273
michael@0 274 mm_free((char *)rpc->uri);
michael@0 275 mm_free(rpc);
michael@0 276 return (0);
michael@0 277 }
michael@0 278
michael@0 279 static int evrpc_pause_request(void *vbase, void *ctx,
michael@0 280 void (*cb)(void *, enum EVRPC_HOOK_RESULT));
michael@0 281 static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT);
michael@0 282
michael@0 283 static void
michael@0 284 evrpc_request_cb(struct evhttp_request *req, void *arg)
michael@0 285 {
michael@0 286 struct evrpc *rpc = arg;
michael@0 287 struct evrpc_req_generic *rpc_state = NULL;
michael@0 288
michael@0 289 /* let's verify the outside parameters */
michael@0 290 if (req->type != EVHTTP_REQ_POST ||
michael@0 291 evbuffer_get_length(req->input_buffer) <= 0)
michael@0 292 goto error;
michael@0 293
michael@0 294 rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic));
michael@0 295 if (rpc_state == NULL)
michael@0 296 goto error;
michael@0 297 rpc_state->rpc = rpc;
michael@0 298 rpc_state->http_req = req;
michael@0 299 rpc_state->rpc_data = NULL;
michael@0 300
michael@0 301 if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) {
michael@0 302 int hook_res;
michael@0 303
michael@0 304 evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon);
michael@0 305
michael@0 306 /*
michael@0 307 * allow hooks to modify the outgoing request
michael@0 308 */
michael@0 309 hook_res = evrpc_process_hooks(&rpc->base->input_hooks,
michael@0 310 rpc_state, req, req->input_buffer);
michael@0 311 switch (hook_res) {
michael@0 312 case EVRPC_TERMINATE:
michael@0 313 goto error;
michael@0 314 case EVRPC_PAUSE:
michael@0 315 evrpc_pause_request(rpc->base, rpc_state,
michael@0 316 evrpc_request_cb_closure);
michael@0 317 return;
michael@0 318 case EVRPC_CONTINUE:
michael@0 319 break;
michael@0 320 default:
michael@0 321 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
michael@0 322 hook_res == EVRPC_CONTINUE ||
michael@0 323 hook_res == EVRPC_PAUSE);
michael@0 324 }
michael@0 325 }
michael@0 326
michael@0 327 evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE);
michael@0 328 return;
michael@0 329
michael@0 330 error:
michael@0 331 if (rpc_state != NULL)
michael@0 332 evrpc_reqstate_free(rpc_state);
michael@0 333 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
michael@0 334 return;
michael@0 335 }
michael@0 336
michael@0 337 static void
michael@0 338 evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
michael@0 339 {
michael@0 340 struct evrpc_req_generic *rpc_state = arg;
michael@0 341 struct evrpc *rpc;
michael@0 342 struct evhttp_request *req;
michael@0 343
michael@0 344 EVUTIL_ASSERT(rpc_state);
michael@0 345 rpc = rpc_state->rpc;
michael@0 346 req = rpc_state->http_req;
michael@0 347
michael@0 348 if (hook_res == EVRPC_TERMINATE)
michael@0 349 goto error;
michael@0 350
michael@0 351 /* let's check that we can parse the request */
michael@0 352 rpc_state->request = rpc->request_new(rpc->request_new_arg);
michael@0 353 if (rpc_state->request == NULL)
michael@0 354 goto error;
michael@0 355
michael@0 356 if (rpc->request_unmarshal(
michael@0 357 rpc_state->request, req->input_buffer) == -1) {
michael@0 358 /* we failed to parse the request; that's a bummer */
michael@0 359 goto error;
michael@0 360 }
michael@0 361
michael@0 362 /* at this point, we have a well formed request, prepare the reply */
michael@0 363
michael@0 364 rpc_state->reply = rpc->reply_new(rpc->reply_new_arg);
michael@0 365 if (rpc_state->reply == NULL)
michael@0 366 goto error;
michael@0 367
michael@0 368 /* give the rpc to the user; they can deal with it */
michael@0 369 rpc->cb(rpc_state, rpc->cb_arg);
michael@0 370
michael@0 371 return;
michael@0 372
michael@0 373 error:
michael@0 374 if (rpc_state != NULL)
michael@0 375 evrpc_reqstate_free(rpc_state);
michael@0 376 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
michael@0 377 return;
michael@0 378 }
michael@0 379
michael@0 380
michael@0 381 void
michael@0 382 evrpc_reqstate_free(struct evrpc_req_generic* rpc_state)
michael@0 383 {
michael@0 384 struct evrpc *rpc;
michael@0 385 EVUTIL_ASSERT(rpc_state != NULL);
michael@0 386 rpc = rpc_state->rpc;
michael@0 387
michael@0 388 /* clean up all memory */
michael@0 389 if (rpc_state->hook_meta != NULL)
michael@0 390 evrpc_hook_context_free(rpc_state->hook_meta);
michael@0 391 if (rpc_state->request != NULL)
michael@0 392 rpc->request_free(rpc_state->request);
michael@0 393 if (rpc_state->reply != NULL)
michael@0 394 rpc->reply_free(rpc_state->reply);
michael@0 395 if (rpc_state->rpc_data != NULL)
michael@0 396 evbuffer_free(rpc_state->rpc_data);
michael@0 397 mm_free(rpc_state);
michael@0 398 }
michael@0 399
michael@0 400 static void
michael@0 401 evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT);
michael@0 402
michael@0 403 void
michael@0 404 evrpc_request_done(struct evrpc_req_generic *rpc_state)
michael@0 405 {
michael@0 406 struct evhttp_request *req;
michael@0 407 struct evrpc *rpc;
michael@0 408
michael@0 409 EVUTIL_ASSERT(rpc_state);
michael@0 410
michael@0 411 req = rpc_state->http_req;
michael@0 412 rpc = rpc_state->rpc;
michael@0 413
michael@0 414 if (rpc->reply_complete(rpc_state->reply) == -1) {
michael@0 415 /* the reply was not completely filled in. error out */
michael@0 416 goto error;
michael@0 417 }
michael@0 418
michael@0 419 if ((rpc_state->rpc_data = evbuffer_new()) == NULL) {
michael@0 420 /* out of memory */
michael@0 421 goto error;
michael@0 422 }
michael@0 423
michael@0 424 /* serialize the reply */
michael@0 425 rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply);
michael@0 426
michael@0 427 if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) {
michael@0 428 int hook_res;
michael@0 429
michael@0 430 evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon);
michael@0 431
michael@0 432 /* do hook based tweaks to the request */
michael@0 433 hook_res = evrpc_process_hooks(&rpc->base->output_hooks,
michael@0 434 rpc_state, req, rpc_state->rpc_data);
michael@0 435 switch (hook_res) {
michael@0 436 case EVRPC_TERMINATE:
michael@0 437 goto error;
michael@0 438 case EVRPC_PAUSE:
michael@0 439 if (evrpc_pause_request(rpc->base, rpc_state,
michael@0 440 evrpc_request_done_closure) == -1)
michael@0 441 goto error;
michael@0 442 return;
michael@0 443 case EVRPC_CONTINUE:
michael@0 444 break;
michael@0 445 default:
michael@0 446 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
michael@0 447 hook_res == EVRPC_CONTINUE ||
michael@0 448 hook_res == EVRPC_PAUSE);
michael@0 449 }
michael@0 450 }
michael@0 451
michael@0 452 evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE);
michael@0 453 return;
michael@0 454
michael@0 455 error:
michael@0 456 if (rpc_state != NULL)
michael@0 457 evrpc_reqstate_free(rpc_state);
michael@0 458 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
michael@0 459 return;
michael@0 460 }
michael@0 461
michael@0 462 void *
michael@0 463 evrpc_get_request(struct evrpc_req_generic *req)
michael@0 464 {
michael@0 465 return req->request;
michael@0 466 }
michael@0 467
michael@0 468 void *
michael@0 469 evrpc_get_reply(struct evrpc_req_generic *req)
michael@0 470 {
michael@0 471 return req->reply;
michael@0 472 }
michael@0 473
michael@0 474 static void
michael@0 475 evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
michael@0 476 {
michael@0 477 struct evrpc_req_generic *rpc_state = arg;
michael@0 478 struct evhttp_request *req;
michael@0 479 EVUTIL_ASSERT(rpc_state);
michael@0 480 req = rpc_state->http_req;
michael@0 481
michael@0 482 if (hook_res == EVRPC_TERMINATE)
michael@0 483 goto error;
michael@0 484
michael@0 485 /* on success, we are going to transmit marshaled binary data */
michael@0 486 if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) {
michael@0 487 evhttp_add_header(req->output_headers,
michael@0 488 "Content-Type", "application/octet-stream");
michael@0 489 }
michael@0 490 evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data);
michael@0 491
michael@0 492 evrpc_reqstate_free(rpc_state);
michael@0 493
michael@0 494 return;
michael@0 495
michael@0 496 error:
michael@0 497 if (rpc_state != NULL)
michael@0 498 evrpc_reqstate_free(rpc_state);
michael@0 499 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
michael@0 500 return;
michael@0 501 }
michael@0 502
michael@0 503
michael@0 504 /* Client-side implementation of RPC */
michael@0 505
michael@0 506 static int evrpc_schedule_request(struct evhttp_connection *connection,
michael@0 507 struct evrpc_request_wrapper *ctx);
michael@0 508
michael@0 509 struct evrpc_pool *
michael@0 510 evrpc_pool_new(struct event_base *base)
michael@0 511 {
michael@0 512 struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool));
michael@0 513 if (pool == NULL)
michael@0 514 return (NULL);
michael@0 515
michael@0 516 TAILQ_INIT(&pool->connections);
michael@0 517 TAILQ_INIT(&pool->requests);
michael@0 518
michael@0 519 TAILQ_INIT(&pool->paused_requests);
michael@0 520
michael@0 521 TAILQ_INIT(&pool->input_hooks);
michael@0 522 TAILQ_INIT(&pool->output_hooks);
michael@0 523
michael@0 524 pool->base = base;
michael@0 525 pool->timeout = -1;
michael@0 526
michael@0 527 return (pool);
michael@0 528 }
michael@0 529
michael@0 530 static void
michael@0 531 evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
michael@0 532 {
michael@0 533 if (request->hook_meta != NULL)
michael@0 534 evrpc_hook_context_free(request->hook_meta);
michael@0 535 mm_free(request->name);
michael@0 536 mm_free(request);
michael@0 537 }
michael@0 538
michael@0 539 void
michael@0 540 evrpc_pool_free(struct evrpc_pool *pool)
michael@0 541 {
michael@0 542 struct evhttp_connection *connection;
michael@0 543 struct evrpc_request_wrapper *request;
michael@0 544 struct evrpc_hook_ctx *pause;
michael@0 545 struct evrpc_hook *hook;
michael@0 546 int r;
michael@0 547
michael@0 548 while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
michael@0 549 TAILQ_REMOVE(&pool->requests, request, next);
michael@0 550 evrpc_request_wrapper_free(request);
michael@0 551 }
michael@0 552
michael@0 553 while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) {
michael@0 554 TAILQ_REMOVE(&pool->paused_requests, pause, next);
michael@0 555 mm_free(pause);
michael@0 556 }
michael@0 557
michael@0 558 while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
michael@0 559 TAILQ_REMOVE(&pool->connections, connection, next);
michael@0 560 evhttp_connection_free(connection);
michael@0 561 }
michael@0 562
michael@0 563 while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
michael@0 564 r = evrpc_remove_hook(pool, EVRPC_INPUT, hook);
michael@0 565 EVUTIL_ASSERT(r);
michael@0 566 }
michael@0 567
michael@0 568 while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
michael@0 569 r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook);
michael@0 570 EVUTIL_ASSERT(r);
michael@0 571 }
michael@0 572
michael@0 573 mm_free(pool);
michael@0 574 }
michael@0 575
michael@0 576 /*
michael@0 577 * Add a connection to the RPC pool. A request scheduled on the pool
michael@0 578 * may use any available connection.
michael@0 579 */
michael@0 580
michael@0 581 void
michael@0 582 evrpc_pool_add_connection(struct evrpc_pool *pool,
michael@0 583 struct evhttp_connection *connection)
michael@0 584 {
michael@0 585 EVUTIL_ASSERT(connection->http_server == NULL);
michael@0 586 TAILQ_INSERT_TAIL(&pool->connections, connection, next);
michael@0 587
michael@0 588 /*
michael@0 589 * associate an event base with this connection
michael@0 590 */
michael@0 591 if (pool->base != NULL)
michael@0 592 evhttp_connection_set_base(connection, pool->base);
michael@0 593
michael@0 594 /*
michael@0 595 * unless a timeout was specifically set for a connection,
michael@0 596 * the connection inherits the timeout from the pool.
michael@0 597 */
michael@0 598 if (connection->timeout == -1)
michael@0 599 connection->timeout = pool->timeout;
michael@0 600
michael@0 601 /*
michael@0 602 * if we have any requests pending, schedule them with the new
michael@0 603 * connections.
michael@0 604 */
michael@0 605
michael@0 606 if (TAILQ_FIRST(&pool->requests) != NULL) {
michael@0 607 struct evrpc_request_wrapper *request =
michael@0 608 TAILQ_FIRST(&pool->requests);
michael@0 609 TAILQ_REMOVE(&pool->requests, request, next);
michael@0 610 evrpc_schedule_request(connection, request);
michael@0 611 }
michael@0 612 }
michael@0 613
michael@0 614 void
michael@0 615 evrpc_pool_remove_connection(struct evrpc_pool *pool,
michael@0 616 struct evhttp_connection *connection)
michael@0 617 {
michael@0 618 TAILQ_REMOVE(&pool->connections, connection, next);
michael@0 619 }
michael@0 620
michael@0 621 void
michael@0 622 evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
michael@0 623 {
michael@0 624 struct evhttp_connection *evcon;
michael@0 625 TAILQ_FOREACH(evcon, &pool->connections, next) {
michael@0 626 evcon->timeout = timeout_in_secs;
michael@0 627 }
michael@0 628 pool->timeout = timeout_in_secs;
michael@0 629 }
michael@0 630
michael@0 631
michael@0 632 static void evrpc_reply_done(struct evhttp_request *, void *);
michael@0 633 static void evrpc_request_timeout(evutil_socket_t, short, void *);
michael@0 634
michael@0 635 /*
michael@0 636 * Finds a connection object associated with the pool that is currently
michael@0 637 * idle and can be used to make a request.
michael@0 638 */
michael@0 639 static struct evhttp_connection *
michael@0 640 evrpc_pool_find_connection(struct evrpc_pool *pool)
michael@0 641 {
michael@0 642 struct evhttp_connection *connection;
michael@0 643 TAILQ_FOREACH(connection, &pool->connections, next) {
michael@0 644 if (TAILQ_FIRST(&connection->requests) == NULL)
michael@0 645 return (connection);
michael@0 646 }
michael@0 647
michael@0 648 return (NULL);
michael@0 649 }
michael@0 650
michael@0 651 /*
michael@0 652 * Prototypes responsible for evrpc scheduling and hooking
michael@0 653 */
michael@0 654
michael@0 655 static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT);
michael@0 656
michael@0 657 /*
michael@0 658 * We assume that the ctx is no longer queued on the pool.
michael@0 659 */
michael@0 660 static int
michael@0 661 evrpc_schedule_request(struct evhttp_connection *connection,
michael@0 662 struct evrpc_request_wrapper *ctx)
michael@0 663 {
michael@0 664 struct evhttp_request *req = NULL;
michael@0 665 struct evrpc_pool *pool = ctx->pool;
michael@0 666 struct evrpc_status status;
michael@0 667
michael@0 668 if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
michael@0 669 goto error;
michael@0 670
michael@0 671 /* serialize the request data into the output buffer */
michael@0 672 ctx->request_marshal(req->output_buffer, ctx->request);
michael@0 673
michael@0 674 /* we need to know the connection that we might have to abort */
michael@0 675 ctx->evcon = connection;
michael@0 676
michael@0 677 /* if we get paused we also need to know the request */
michael@0 678 ctx->req = req;
michael@0 679
michael@0 680 if (TAILQ_FIRST(&pool->output_hooks) != NULL) {
michael@0 681 int hook_res;
michael@0 682
michael@0 683 evrpc_hook_associate_meta(&ctx->hook_meta, connection);
michael@0 684
michael@0 685 /* apply hooks to the outgoing request */
michael@0 686 hook_res = evrpc_process_hooks(&pool->output_hooks,
michael@0 687 ctx, req, req->output_buffer);
michael@0 688
michael@0 689 switch (hook_res) {
michael@0 690 case EVRPC_TERMINATE:
michael@0 691 goto error;
michael@0 692 case EVRPC_PAUSE:
michael@0 693 /* we need to be explicitly resumed */
michael@0 694 if (evrpc_pause_request(pool, ctx,
michael@0 695 evrpc_schedule_request_closure) == -1)
michael@0 696 goto error;
michael@0 697 return (0);
michael@0 698 case EVRPC_CONTINUE:
michael@0 699 /* we can just continue */
michael@0 700 break;
michael@0 701 default:
michael@0 702 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
michael@0 703 hook_res == EVRPC_CONTINUE ||
michael@0 704 hook_res == EVRPC_PAUSE);
michael@0 705 }
michael@0 706 }
michael@0 707
michael@0 708 evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE);
michael@0 709 return (0);
michael@0 710
michael@0 711 error:
michael@0 712 memset(&status, 0, sizeof(status));
michael@0 713 status.error = EVRPC_STATUS_ERR_UNSTARTED;
michael@0 714 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
michael@0 715 evrpc_request_wrapper_free(ctx);
michael@0 716 return (-1);
michael@0 717 }
michael@0 718
michael@0 719 static void
michael@0 720 evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
michael@0 721 {
michael@0 722 struct evrpc_request_wrapper *ctx = arg;
michael@0 723 struct evhttp_connection *connection = ctx->evcon;
michael@0 724 struct evhttp_request *req = ctx->req;
michael@0 725 struct evrpc_pool *pool = ctx->pool;
michael@0 726 struct evrpc_status status;
michael@0 727 char *uri = NULL;
michael@0 728 int res = 0;
michael@0 729
michael@0 730 if (hook_res == EVRPC_TERMINATE)
michael@0 731 goto error;
michael@0 732
michael@0 733 uri = evrpc_construct_uri(ctx->name);
michael@0 734 if (uri == NULL)
michael@0 735 goto error;
michael@0 736
michael@0 737 if (pool->timeout > 0) {
michael@0 738 /*
michael@0 739 * a timeout after which the whole rpc is going to be aborted.
michael@0 740 */
michael@0 741 struct timeval tv;
michael@0 742 evutil_timerclear(&tv);
michael@0 743 tv.tv_sec = pool->timeout;
michael@0 744 evtimer_add(&ctx->ev_timeout, &tv);
michael@0 745 }
michael@0 746
michael@0 747 /* start the request over the connection */
michael@0 748 res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
michael@0 749 mm_free(uri);
michael@0 750
michael@0 751 if (res == -1)
michael@0 752 goto error;
michael@0 753
michael@0 754 return;
michael@0 755
michael@0 756 error:
michael@0 757 memset(&status, 0, sizeof(status));
michael@0 758 status.error = EVRPC_STATUS_ERR_UNSTARTED;
michael@0 759 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
michael@0 760 evrpc_request_wrapper_free(ctx);
michael@0 761 }
michael@0 762
michael@0 763 /* we just queue the paused request on the pool under the req object */
michael@0 764 static int
michael@0 765 evrpc_pause_request(void *vbase, void *ctx,
michael@0 766 void (*cb)(void *, enum EVRPC_HOOK_RESULT))
michael@0 767 {
michael@0 768 struct _evrpc_hooks *base = vbase;
michael@0 769 struct evrpc_hook_ctx *pause = mm_malloc(sizeof(*pause));
michael@0 770 if (pause == NULL)
michael@0 771 return (-1);
michael@0 772
michael@0 773 pause->ctx = ctx;
michael@0 774 pause->cb = cb;
michael@0 775
michael@0 776 TAILQ_INSERT_TAIL(&base->pause_requests, pause, next);
michael@0 777 return (0);
michael@0 778 }
michael@0 779
michael@0 780 int
michael@0 781 evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res)
michael@0 782 {
michael@0 783 struct _evrpc_hooks *base = vbase;
michael@0 784 struct evrpc_pause_list *head = &base->pause_requests;
michael@0 785 struct evrpc_hook_ctx *pause;
michael@0 786
michael@0 787 TAILQ_FOREACH(pause, head, next) {
michael@0 788 if (pause->ctx == ctx)
michael@0 789 break;
michael@0 790 }
michael@0 791
michael@0 792 if (pause == NULL)
michael@0 793 return (-1);
michael@0 794
michael@0 795 (*pause->cb)(pause->ctx, res);
michael@0 796 TAILQ_REMOVE(head, pause, next);
michael@0 797 mm_free(pause);
michael@0 798 return (0);
michael@0 799 }
michael@0 800
michael@0 801 int
michael@0 802 evrpc_make_request(struct evrpc_request_wrapper *ctx)
michael@0 803 {
michael@0 804 struct evrpc_pool *pool = ctx->pool;
michael@0 805
michael@0 806 /* initialize the event structure for this rpc */
michael@0 807 evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx);
michael@0 808
michael@0 809 /* we better have some available connections on the pool */
michael@0 810 EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL);
michael@0 811
michael@0 812 /*
michael@0 813 * if no connection is available, we queue the request on the pool,
michael@0 814 * the next time a connection is empty, the rpc will be send on that.
michael@0 815 */
michael@0 816 TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
michael@0 817
michael@0 818 evrpc_pool_schedule(pool);
michael@0 819
michael@0 820 return (0);
michael@0 821 }
michael@0 822
michael@0 823
michael@0 824 struct evrpc_request_wrapper *
michael@0 825 evrpc_make_request_ctx(
michael@0 826 struct evrpc_pool *pool, void *request, void *reply,
michael@0 827 const char *rpcname,
michael@0 828 void (*req_marshal)(struct evbuffer*, void *),
michael@0 829 void (*rpl_clear)(void *),
michael@0 830 int (*rpl_unmarshal)(void *, struct evbuffer *),
michael@0 831 void (*cb)(struct evrpc_status *, void *, void *, void *),
michael@0 832 void *cbarg)
michael@0 833 {
michael@0 834 struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *)
michael@0 835 mm_malloc(sizeof(struct evrpc_request_wrapper));
michael@0 836 if (ctx == NULL)
michael@0 837 return (NULL);
michael@0 838
michael@0 839 ctx->pool = pool;
michael@0 840 ctx->hook_meta = NULL;
michael@0 841 ctx->evcon = NULL;
michael@0 842 ctx->name = mm_strdup(rpcname);
michael@0 843 if (ctx->name == NULL) {
michael@0 844 mm_free(ctx);
michael@0 845 return (NULL);
michael@0 846 }
michael@0 847 ctx->cb = cb;
michael@0 848 ctx->cb_arg = cbarg;
michael@0 849 ctx->request = request;
michael@0 850 ctx->reply = reply;
michael@0 851 ctx->request_marshal = req_marshal;
michael@0 852 ctx->reply_clear = rpl_clear;
michael@0 853 ctx->reply_unmarshal = rpl_unmarshal;
michael@0 854
michael@0 855 return (ctx);
michael@0 856 }
michael@0 857
michael@0 858 static void
michael@0 859 evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT);
michael@0 860
michael@0 861 static void
michael@0 862 evrpc_reply_done(struct evhttp_request *req, void *arg)
michael@0 863 {
michael@0 864 struct evrpc_request_wrapper *ctx = arg;
michael@0 865 struct evrpc_pool *pool = ctx->pool;
michael@0 866 int hook_res = EVRPC_CONTINUE;
michael@0 867
michael@0 868 /* cancel any timeout we might have scheduled */
michael@0 869 event_del(&ctx->ev_timeout);
michael@0 870
michael@0 871 ctx->req = req;
michael@0 872
michael@0 873 /* we need to get the reply now */
michael@0 874 if (req == NULL) {
michael@0 875 evrpc_reply_done_closure(ctx, EVRPC_CONTINUE);
michael@0 876 return;
michael@0 877 }
michael@0 878
michael@0 879 if (TAILQ_FIRST(&pool->input_hooks) != NULL) {
michael@0 880 evrpc_hook_associate_meta(&ctx->hook_meta, ctx->evcon);
michael@0 881
michael@0 882 /* apply hooks to the incoming request */
michael@0 883 hook_res = evrpc_process_hooks(&pool->input_hooks,
michael@0 884 ctx, req, req->input_buffer);
michael@0 885
michael@0 886 switch (hook_res) {
michael@0 887 case EVRPC_TERMINATE:
michael@0 888 case EVRPC_CONTINUE:
michael@0 889 break;
michael@0 890 case EVRPC_PAUSE:
michael@0 891 /*
michael@0 892 * if we get paused we also need to know the
michael@0 893 * request. unfortunately, the underlying
michael@0 894 * layer is going to free it. we need to
michael@0 895 * request ownership explicitly
michael@0 896 */
michael@0 897 if (req != NULL)
michael@0 898 evhttp_request_own(req);
michael@0 899
michael@0 900 evrpc_pause_request(pool, ctx,
michael@0 901 evrpc_reply_done_closure);
michael@0 902 return;
michael@0 903 default:
michael@0 904 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
michael@0 905 hook_res == EVRPC_CONTINUE ||
michael@0 906 hook_res == EVRPC_PAUSE);
michael@0 907 }
michael@0 908 }
michael@0 909
michael@0 910 evrpc_reply_done_closure(ctx, hook_res);
michael@0 911
michael@0 912 /* http request is being freed by underlying layer */
michael@0 913 }
michael@0 914
michael@0 915 static void
michael@0 916 evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
michael@0 917 {
michael@0 918 struct evrpc_request_wrapper *ctx = arg;
michael@0 919 struct evhttp_request *req = ctx->req;
michael@0 920 struct evrpc_pool *pool = ctx->pool;
michael@0 921 struct evrpc_status status;
michael@0 922 int res = -1;
michael@0 923
michael@0 924 memset(&status, 0, sizeof(status));
michael@0 925 status.http_req = req;
michael@0 926
michael@0 927 /* we need to get the reply now */
michael@0 928 if (req == NULL) {
michael@0 929 status.error = EVRPC_STATUS_ERR_TIMEOUT;
michael@0 930 } else if (hook_res == EVRPC_TERMINATE) {
michael@0 931 status.error = EVRPC_STATUS_ERR_HOOKABORTED;
michael@0 932 } else {
michael@0 933 res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
michael@0 934 if (res == -1)
michael@0 935 status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
michael@0 936 }
michael@0 937
michael@0 938 if (res == -1) {
michael@0 939 /* clear everything that we might have written previously */
michael@0 940 ctx->reply_clear(ctx->reply);
michael@0 941 }
michael@0 942
michael@0 943 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
michael@0 944
michael@0 945 evrpc_request_wrapper_free(ctx);
michael@0 946
michael@0 947 /* the http layer owned the original request structure, but if we
michael@0 948 * got paused, we asked for ownership and need to free it here. */
michael@0 949 if (req != NULL && evhttp_request_is_owned(req))
michael@0 950 evhttp_request_free(req);
michael@0 951
michael@0 952 /* see if we can schedule another request */
michael@0 953 evrpc_pool_schedule(pool);
michael@0 954 }
michael@0 955
michael@0 956 static void
michael@0 957 evrpc_pool_schedule(struct evrpc_pool *pool)
michael@0 958 {
michael@0 959 struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
michael@0 960 struct evhttp_connection *evcon;
michael@0 961
michael@0 962 /* if no requests are pending, we have no work */
michael@0 963 if (ctx == NULL)
michael@0 964 return;
michael@0 965
michael@0 966 if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
michael@0 967 TAILQ_REMOVE(&pool->requests, ctx, next);
michael@0 968 evrpc_schedule_request(evcon, ctx);
michael@0 969 }
michael@0 970 }
michael@0 971
michael@0 972 static void
michael@0 973 evrpc_request_timeout(evutil_socket_t fd, short what, void *arg)
michael@0 974 {
michael@0 975 struct evrpc_request_wrapper *ctx = arg;
michael@0 976 struct evhttp_connection *evcon = ctx->evcon;
michael@0 977 EVUTIL_ASSERT(evcon != NULL);
michael@0 978
michael@0 979 evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
michael@0 980 }
michael@0 981
michael@0 982 /*
michael@0 983 * frees potential meta data associated with a request.
michael@0 984 */
michael@0 985
michael@0 986 static void
michael@0 987 evrpc_meta_data_free(struct evrpc_meta_list *meta_data)
michael@0 988 {
michael@0 989 struct evrpc_meta *entry;
michael@0 990 EVUTIL_ASSERT(meta_data != NULL);
michael@0 991
michael@0 992 while ((entry = TAILQ_FIRST(meta_data)) != NULL) {
michael@0 993 TAILQ_REMOVE(meta_data, entry, next);
michael@0 994 mm_free(entry->key);
michael@0 995 mm_free(entry->data);
michael@0 996 mm_free(entry);
michael@0 997 }
michael@0 998 }
michael@0 999
michael@0 1000 static struct evrpc_hook_meta *
michael@0 1001 evrpc_hook_meta_new(void)
michael@0 1002 {
michael@0 1003 struct evrpc_hook_meta *ctx;
michael@0 1004 ctx = mm_malloc(sizeof(struct evrpc_hook_meta));
michael@0 1005 EVUTIL_ASSERT(ctx != NULL);
michael@0 1006
michael@0 1007 TAILQ_INIT(&ctx->meta_data);
michael@0 1008 ctx->evcon = NULL;
michael@0 1009
michael@0 1010 return (ctx);
michael@0 1011 }
michael@0 1012
michael@0 1013 static void
michael@0 1014 evrpc_hook_associate_meta(struct evrpc_hook_meta **pctx,
michael@0 1015 struct evhttp_connection *evcon)
michael@0 1016 {
michael@0 1017 struct evrpc_hook_meta *ctx = *pctx;
michael@0 1018 if (ctx == NULL)
michael@0 1019 *pctx = ctx = evrpc_hook_meta_new();
michael@0 1020 ctx->evcon = evcon;
michael@0 1021 }
michael@0 1022
michael@0 1023 static void
michael@0 1024 evrpc_hook_context_free(struct evrpc_hook_meta *ctx)
michael@0 1025 {
michael@0 1026 evrpc_meta_data_free(&ctx->meta_data);
michael@0 1027 mm_free(ctx);
michael@0 1028 }
michael@0 1029
michael@0 1030 /* Adds meta data */
michael@0 1031 void
michael@0 1032 evrpc_hook_add_meta(void *ctx, const char *key,
michael@0 1033 const void *data, size_t data_size)
michael@0 1034 {
michael@0 1035 struct evrpc_request_wrapper *req = ctx;
michael@0 1036 struct evrpc_hook_meta *store = NULL;
michael@0 1037 struct evrpc_meta *meta = NULL;
michael@0 1038
michael@0 1039 if ((store = req->hook_meta) == NULL)
michael@0 1040 store = req->hook_meta = evrpc_hook_meta_new();
michael@0 1041
michael@0 1042 meta = mm_malloc(sizeof(struct evrpc_meta));
michael@0 1043 EVUTIL_ASSERT(meta != NULL);
michael@0 1044 meta->key = mm_strdup(key);
michael@0 1045 EVUTIL_ASSERT(meta->key != NULL);
michael@0 1046 meta->data_size = data_size;
michael@0 1047 meta->data = mm_malloc(data_size);
michael@0 1048 EVUTIL_ASSERT(meta->data != NULL);
michael@0 1049 memcpy(meta->data, data, data_size);
michael@0 1050
michael@0 1051 TAILQ_INSERT_TAIL(&store->meta_data, meta, next);
michael@0 1052 }
michael@0 1053
michael@0 1054 int
michael@0 1055 evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size)
michael@0 1056 {
michael@0 1057 struct evrpc_request_wrapper *req = ctx;
michael@0 1058 struct evrpc_meta *meta = NULL;
michael@0 1059
michael@0 1060 if (req->hook_meta == NULL)
michael@0 1061 return (-1);
michael@0 1062
michael@0 1063 TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) {
michael@0 1064 if (strcmp(meta->key, key) == 0) {
michael@0 1065 *data = meta->data;
michael@0 1066 *data_size = meta->data_size;
michael@0 1067 return (0);
michael@0 1068 }
michael@0 1069 }
michael@0 1070
michael@0 1071 return (-1);
michael@0 1072 }
michael@0 1073
michael@0 1074 struct evhttp_connection *
michael@0 1075 evrpc_hook_get_connection(void *ctx)
michael@0 1076 {
michael@0 1077 struct evrpc_request_wrapper *req = ctx;
michael@0 1078 return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL);
michael@0 1079 }
michael@0 1080
michael@0 1081 int
michael@0 1082 evrpc_send_request_generic(struct evrpc_pool *pool,
michael@0 1083 void *request, void *reply,
michael@0 1084 void (*cb)(struct evrpc_status *, void *, void *, void *),
michael@0 1085 void *cb_arg,
michael@0 1086 const char *rpcname,
michael@0 1087 void (*req_marshal)(struct evbuffer *, void *),
michael@0 1088 void (*rpl_clear)(void *),
michael@0 1089 int (*rpl_unmarshal)(void *, struct evbuffer *))
michael@0 1090 {
michael@0 1091 struct evrpc_status status;
michael@0 1092 struct evrpc_request_wrapper *ctx;
michael@0 1093 ctx = evrpc_make_request_ctx(pool, request, reply,
michael@0 1094 rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg);
michael@0 1095 if (ctx == NULL)
michael@0 1096 goto error;
michael@0 1097 return (evrpc_make_request(ctx));
michael@0 1098 error:
michael@0 1099 memset(&status, 0, sizeof(status));
michael@0 1100 status.error = EVRPC_STATUS_ERR_UNSTARTED;
michael@0 1101 (*(cb))(&status, request, reply, cb_arg);
michael@0 1102 return (-1);
michael@0 1103 }
michael@0 1104
michael@0 1105 /** Takes a request object and fills it in with the right magic */
michael@0 1106 static struct evrpc *
michael@0 1107 evrpc_register_object(const char *name,
michael@0 1108 void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *),
michael@0 1109 int (*req_unmarshal)(void *, struct evbuffer *),
michael@0 1110 void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *),
michael@0 1111 int (*rpl_complete)(void *),
michael@0 1112 void (*rpl_marshal)(struct evbuffer *, void *))
michael@0 1113 {
michael@0 1114 struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc));
michael@0 1115 if (rpc == NULL)
michael@0 1116 return (NULL);
michael@0 1117 rpc->uri = mm_strdup(name);
michael@0 1118 if (rpc->uri == NULL) {
michael@0 1119 mm_free(rpc);
michael@0 1120 return (NULL);
michael@0 1121 }
michael@0 1122 rpc->request_new = req_new;
michael@0 1123 rpc->request_new_arg = req_new_arg;
michael@0 1124 rpc->request_free = req_free;
michael@0 1125 rpc->request_unmarshal = req_unmarshal;
michael@0 1126 rpc->reply_new = rpl_new;
michael@0 1127 rpc->reply_new_arg = rpl_new_arg;
michael@0 1128 rpc->reply_free = rpl_free;
michael@0 1129 rpc->reply_complete = rpl_complete;
michael@0 1130 rpc->reply_marshal = rpl_marshal;
michael@0 1131 return (rpc);
michael@0 1132 }
michael@0 1133
michael@0 1134 int
michael@0 1135 evrpc_register_generic(struct evrpc_base *base, const char *name,
michael@0 1136 void (*callback)(struct evrpc_req_generic *, void *), void *cbarg,
michael@0 1137 void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *),
michael@0 1138 int (*req_unmarshal)(void *, struct evbuffer *),
michael@0 1139 void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *),
michael@0 1140 int (*rpl_complete)(void *),
michael@0 1141 void (*rpl_marshal)(struct evbuffer *, void *))
michael@0 1142 {
michael@0 1143 struct evrpc* rpc =
michael@0 1144 evrpc_register_object(name, req_new, req_new_arg, req_free, req_unmarshal,
michael@0 1145 rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal);
michael@0 1146 if (rpc == NULL)
michael@0 1147 return (-1);
michael@0 1148 evrpc_register_rpc(base, rpc,
michael@0 1149 (void (*)(struct evrpc_req_generic*, void *))callback, cbarg);
michael@0 1150 return (0);
michael@0 1151 }
michael@0 1152
michael@0 1153 /** accessors for obscure and undocumented functionality */
michael@0 1154 struct evrpc_pool *
michael@0 1155 evrpc_request_get_pool(struct evrpc_request_wrapper *ctx)
michael@0 1156 {
michael@0 1157 return (ctx->pool);
michael@0 1158 }
michael@0 1159
michael@0 1160 void
michael@0 1161 evrpc_request_set_pool(struct evrpc_request_wrapper *ctx,
michael@0 1162 struct evrpc_pool *pool)
michael@0 1163 {
michael@0 1164 ctx->pool = pool;
michael@0 1165 }
michael@0 1166
michael@0 1167 void
michael@0 1168 evrpc_request_set_cb(struct evrpc_request_wrapper *ctx,
michael@0 1169 void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg),
michael@0 1170 void *cb_arg)
michael@0 1171 {
michael@0 1172 ctx->cb = cb;
michael@0 1173 ctx->cb_arg = cb_arg;
michael@0 1174 }

mercurial