Tue, 06 Jan 2015 21:39:09 +0100
Conditionally force memory storage according to privacy.thirdparty.isolate;
This solves Tor bug #9701, complying with disk avoidance documented in
https://www.torproject.org/projects/torbrowser/design/#disk-avoidance.
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 /***********************************************************************
7 **
8 ** Name: thrpool.c
9 **
10 ** Description: Test threadpool functionality.
11 **
12 ** Modification History:
13 */
14 #include "primpl.h"
16 #include "plgetopt.h"
18 #include <stdio.h>
19 #include <string.h>
20 #include <errno.h>
21 #ifdef XP_UNIX
22 #include <sys/mman.h>
23 #endif
24 #if defined(_PR_PTHREADS) && !defined(_PR_DCETHREADS)
25 #include <pthread.h>
26 #endif
28 /* for getcwd */
29 #if defined(XP_UNIX) || defined (XP_OS2) || defined(XP_BEOS)
30 #include <unistd.h>
31 #elif defined(XP_PC)
32 #include <direct.h>
33 #endif
35 #ifdef WIN32
36 #include <process.h>
37 #endif
/* Non-zero once -d has been seen on the command line; gates DPRINTF. */
static int _debug_on = 0;

/* argv[0]; used as the prefix of diagnostic messages. */
static char *program_name = NULL;

/* Defined after serve_client_read, which queues it as a write job. */
static void serve_client_write(void *arg);
43 #include "obsolete/prsem.h"
#ifdef XP_PC
#define mode_t int
#endif

/*
 * DPRINTF((fmt, ...))
 *    Passes the parenthesized argument list to printf when _debug_on is
 *    non-zero; does nothing otherwise.
 *
 *    The body is wrapped in do { } while (0) so that the macro expands to
 *    exactly one statement.  A bare `if` here would silently capture the
 *    `else` branch of an enclosing if/else at the call site.
 */
#define DPRINTF(arg) do { if (_debug_on) printf arg; } while (0)
/* Capacity, in bytes, of the data array inside struct buffer. */
#define BUF_DATA_SIZE                   (2 * 1024)

/* Default for tcp_mesg_size: bytes transferred per message. */
#define TCP_MESG_SIZE                   1024

/* Default for num_tcp_clients. */
#define NUM_TCP_CLIENTS                 10

/* Default for num_tcp_connections_per_client. */
#define NUM_TCP_CONNECTIONS_PER_CLIENT  10

/* Default for num_tcp_mesgs_per_connection. */
#define NUM_TCP_MESGS_PER_CONNECTION    10

/* First port the server tries to bind. */
#define TCP_SERVER_PORT                 10000

/* Upper bound on bind retries when the address is already in use. */
#define SERVER_MAX_BIND_COUNT           100
62 #ifdef WINCE
63 char *getcwd(char *buf, size_t size)
64 {
65 wchar_t wpath[MAX_PATH];
66 _wgetcwd(wpath, MAX_PATH);
67 WideCharToMultiByte(CP_ACP, 0, wpath, -1, buf, size, 0, 0);
68 }
70 #define perror(s)
71 #endif
73 static PRInt32 num_tcp_clients = NUM_TCP_CLIENTS;
74 static PRInt32 num_tcp_connections_per_client = NUM_TCP_CONNECTIONS_PER_CLIENT;
75 static PRInt32 tcp_mesg_size = TCP_MESG_SIZE;
76 static PRInt32 num_tcp_mesgs_per_connection = NUM_TCP_MESGS_PER_CONNECTION;
77 static void TCP_Server_Accept(void *arg);
80 int failed_already=0;
81 typedef struct buffer {
82 char data[BUF_DATA_SIZE];
83 } buffer;
86 typedef struct Server_Param {
87 PRJobIoDesc iod; /* socket to read from/write to */
88 PRInt32 datalen; /* bytes of data transfered in each read/write */
89 PRNetAddr netaddr;
90 PRMonitor *exit_mon; /* monitor to signal on exit */
91 PRInt32 *job_counterp; /* counter to decrement, before exit */
92 PRInt32 conn_counter; /* counter to decrement, before exit */
93 PRThreadPool *tp;
94 } Server_Param;
96 typedef struct Serve_Client_Param {
97 PRJobIoDesc iod; /* socket to read from/write to */
98 PRInt32 datalen; /* bytes of data transfered in each read/write */
99 PRMonitor *exit_mon; /* monitor to signal on exit */
100 PRInt32 *job_counterp; /* counter to decrement, before exit */
101 PRThreadPool *tp;
102 } Serve_Client_Param;
104 typedef struct Session {
105 PRJobIoDesc iod; /* socket to read from/write to */
106 buffer *in_buf;
107 PRInt32 bytes;
108 PRInt32 msg_num;
109 PRInt32 bytes_read;
110 PRMonitor *exit_mon; /* monitor to signal on exit */
111 PRInt32 *job_counterp; /* counter to decrement, before exit */
112 PRThreadPool *tp;
113 } Session;
115 static void
116 serve_client_read(void *arg)
117 {
118 Session *sp = (Session *) arg;
119 int rem;
120 int bytes;
121 int offset;
122 PRFileDesc *sockfd;
123 char *buf;
124 PRJob *jobp;
126 PRIntervalTime timeout = PR_INTERVAL_NO_TIMEOUT;
128 sockfd = sp->iod.socket;
129 buf = sp->in_buf->data;
131 PR_ASSERT(sp->msg_num < num_tcp_mesgs_per_connection);
132 PR_ASSERT(sp->bytes_read < sp->bytes);
134 offset = sp->bytes_read;
135 rem = sp->bytes - offset;
136 bytes = PR_Recv(sockfd, buf + offset, rem, 0, timeout);
137 if (bytes < 0) {
138 return;
139 }
140 sp->bytes_read += bytes;
141 sp->iod.timeout = PR_SecondsToInterval(60);
142 if (sp->bytes_read < sp->bytes) {
143 jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp,
144 PR_FALSE);
145 PR_ASSERT(NULL != jobp);
146 return;
147 }
148 PR_ASSERT(sp->bytes_read == sp->bytes);
149 DPRINTF(("serve_client: read complete, msg(%d) \n", sp->msg_num));
151 sp->iod.timeout = PR_SecondsToInterval(60);
152 jobp = PR_QueueJob_Write(sp->tp, &sp->iod, serve_client_write, sp,
153 PR_FALSE);
154 PR_ASSERT(NULL != jobp);
156 return;
157 }
159 static void
160 serve_client_write(void *arg)
161 {
162 Session *sp = (Session *) arg;
163 int bytes;
164 PRFileDesc *sockfd;
165 char *buf;
166 PRJob *jobp;
168 sockfd = sp->iod.socket;
169 buf = sp->in_buf->data;
171 PR_ASSERT(sp->msg_num < num_tcp_mesgs_per_connection);
173 bytes = PR_Send(sockfd, buf, sp->bytes, 0, PR_INTERVAL_NO_TIMEOUT);
174 PR_ASSERT(bytes == sp->bytes);
176 if (bytes < 0) {
177 return;
178 }
179 DPRINTF(("serve_client: write complete, msg(%d) \n", sp->msg_num));
180 sp->msg_num++;
181 if (sp->msg_num < num_tcp_mesgs_per_connection) {
182 sp->bytes_read = 0;
183 sp->iod.timeout = PR_SecondsToInterval(60);
184 jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp,
185 PR_FALSE);
186 PR_ASSERT(NULL != jobp);
187 return;
188 }
190 DPRINTF(("serve_client: read/write complete, msg(%d) \n", sp->msg_num));
191 if (PR_Shutdown(sockfd, PR_SHUTDOWN_BOTH) < 0) {
192 fprintf(stderr,"%s: ERROR - PR_Shutdown\n", program_name);
193 }
195 PR_Close(sockfd);
196 PR_EnterMonitor(sp->exit_mon);
197 --(*sp->job_counterp);
198 PR_Notify(sp->exit_mon);
199 PR_ExitMonitor(sp->exit_mon);
201 PR_DELETE(sp->in_buf);
202 PR_DELETE(sp);
204 return;
205 }
207 /*
208 * Serve_Client
209 * Thread, started by the server, for serving a client connection.
210 * Reads data from socket and writes it back, unmodified, and
211 * closes the socket
212 */
213 static void PR_CALLBACK
214 Serve_Client(void *arg)
215 {
216 Serve_Client_Param *scp = (Serve_Client_Param *) arg;
217 buffer *in_buf;
218 Session *sp;
219 PRJob *jobp;
221 sp = PR_NEW(Session);
222 sp->iod = scp->iod;
224 in_buf = PR_NEW(buffer);
225 if (in_buf == NULL) {
226 fprintf(stderr,"%s: failed to alloc buffer struct\n",program_name);
227 failed_already=1;
228 return;
229 }
231 sp->in_buf = in_buf;
232 sp->bytes = scp->datalen;
233 sp->msg_num = 0;
234 sp->bytes_read = 0;
235 sp->tp = scp->tp;
236 sp->exit_mon = scp->exit_mon;
237 sp->job_counterp = scp->job_counterp;
239 sp->iod.timeout = PR_SecondsToInterval(60);
240 jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp,
241 PR_FALSE);
242 PR_ASSERT(NULL != jobp);
243 PR_DELETE(scp);
244 }
246 static void
247 print_stats(void *arg)
248 {
249 Server_Param *sp = (Server_Param *) arg;
250 PRThreadPool *tp = sp->tp;
251 PRInt32 counter;
252 PRJob *jobp;
254 PR_EnterMonitor(sp->exit_mon);
255 counter = (*sp->job_counterp);
256 PR_ExitMonitor(sp->exit_mon);
258 printf("PRINT_STATS: #client connections = %d\n",counter);
261 jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500),
262 print_stats, sp, PR_FALSE);
264 PR_ASSERT(NULL != jobp);
265 }
267 static int job_counter = 0;
268 /*
269 * TCP Server
270 * Server binds an address to a socket, starts a client process and
271 * listens for incoming connections.
272 * Each client connects to the server and sends a chunk of data
273 * Starts a Serve_Client job for each incoming connection, to read
274 * the data from the client and send it back to the client, unmodified.
275 * Each client checks that data received from server is same as the
276 * data it sent to the server.
277 * Finally, the threadpool is shutdown
278 */
279 static void PR_CALLBACK
280 TCP_Server(void *arg)
281 {
282 PRThreadPool *tp = (PRThreadPool *) arg;
283 Server_Param *sp;
284 PRFileDesc *sockfd;
285 PRNetAddr netaddr;
286 PRMonitor *sc_mon;
287 PRJob *jobp;
288 int i;
289 PRStatus rval;
291 /*
292 * Create a tcp socket
293 */
294 if ((sockfd = PR_NewTCPSocket()) == NULL) {
295 fprintf(stderr,"%s: PR_NewTCPSocket failed\n", program_name);
296 return;
297 }
298 memset(&netaddr, 0 , sizeof(netaddr));
299 netaddr.inet.family = PR_AF_INET;
300 netaddr.inet.port = PR_htons(TCP_SERVER_PORT);
301 netaddr.inet.ip = PR_htonl(PR_INADDR_ANY);
302 /*
303 * try a few times to bind server's address, if addresses are in
304 * use
305 */
306 i = 0;
307 while (PR_Bind(sockfd, &netaddr) < 0) {
308 if (PR_GetError() == PR_ADDRESS_IN_USE_ERROR) {
309 netaddr.inet.port += 2;
310 if (i++ < SERVER_MAX_BIND_COUNT)
311 continue;
312 }
313 fprintf(stderr,"%s: ERROR - PR_Bind failed\n", program_name);
314 perror("PR_Bind");
315 failed_already=1;
316 return;
317 }
319 if (PR_Listen(sockfd, 32) < 0) {
320 fprintf(stderr,"%s: ERROR - PR_Listen failed\n", program_name);
321 failed_already=1;
322 return;
323 }
325 if (PR_GetSockName(sockfd, &netaddr) < 0) {
326 fprintf(stderr,"%s: ERROR - PR_GetSockName failed\n", program_name);
327 failed_already=1;
328 return;
329 }
331 DPRINTF((
332 "TCP_Server: PR_BIND netaddr.inet.ip = 0x%lx, netaddr.inet.port = %d\n",
333 netaddr.inet.ip, netaddr.inet.port));
335 sp = PR_NEW(Server_Param);
336 if (sp == NULL) {
337 fprintf(stderr,"%s: PR_NEW failed\n", program_name);
338 failed_already=1;
339 return;
340 }
341 sp->iod.socket = sockfd;
342 sp->iod.timeout = PR_SecondsToInterval(60);
343 sp->datalen = tcp_mesg_size;
344 sp->exit_mon = sc_mon;
345 sp->job_counterp = &job_counter;
346 sp->conn_counter = 0;
347 sp->tp = tp;
348 sp->netaddr = netaddr;
350 /* create and cancel an io job */
351 jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,
352 PR_FALSE);
353 PR_ASSERT(NULL != jobp);
354 rval = PR_CancelJob(jobp);
355 PR_ASSERT(PR_SUCCESS == rval);
357 /*
358 * create the client process
359 */
360 {
361 #define MAX_ARGS 4
362 char *argv[MAX_ARGS + 1];
363 int index = 0;
364 char port[32];
365 char path[1024 + sizeof("/thrpool_client")];
367 getcwd(path, sizeof(path));
369 (void)strcat(path, "/thrpool_client");
370 #ifdef XP_PC
371 (void)strcat(path, ".exe");
372 #endif
373 argv[index++] = path;
374 sprintf(port,"%d",PR_ntohs(netaddr.inet.port));
375 if (_debug_on)
376 {
377 argv[index++] = "-d";
378 argv[index++] = "-p";
379 argv[index++] = port;
380 argv[index++] = NULL;
381 } else {
382 argv[index++] = "-p";
383 argv[index++] = port;
384 argv[index++] = NULL;
385 }
386 PR_ASSERT(MAX_ARGS >= (index - 1));
388 DPRINTF(("creating client process %s ...\n", path));
389 if (PR_FAILURE == PR_CreateProcessDetached(path, argv, NULL, NULL)) {
390 fprintf(stderr,
391 "thrpool_server: ERROR - PR_CreateProcessDetached failed\n");
392 failed_already=1;
393 return;
394 }
395 }
397 sc_mon = PR_NewMonitor();
398 if (sc_mon == NULL) {
399 fprintf(stderr,"%s: PR_NewMonitor failed\n", program_name);
400 failed_already=1;
401 return;
402 }
404 sp->iod.socket = sockfd;
405 sp->iod.timeout = PR_SecondsToInterval(60);
406 sp->datalen = tcp_mesg_size;
407 sp->exit_mon = sc_mon;
408 sp->job_counterp = &job_counter;
409 sp->conn_counter = 0;
410 sp->tp = tp;
411 sp->netaddr = netaddr;
413 /* create and cancel a timer job */
414 jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(5000),
415 print_stats, sp, PR_FALSE);
416 PR_ASSERT(NULL != jobp);
417 rval = PR_CancelJob(jobp);
418 PR_ASSERT(PR_SUCCESS == rval);
420 DPRINTF(("TCP_Server: Accepting connections \n"));
422 jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,
423 PR_FALSE);
424 PR_ASSERT(NULL != jobp);
425 return;
426 }
428 static void
429 TCP_Server_Accept(void *arg)
430 {
431 Server_Param *sp = (Server_Param *) arg;
432 PRThreadPool *tp = sp->tp;
433 Serve_Client_Param *scp;
434 PRFileDesc *newsockfd;
435 PRJob *jobp;
437 if ((newsockfd = PR_Accept(sp->iod.socket, &sp->netaddr,
438 PR_INTERVAL_NO_TIMEOUT)) == NULL) {
439 fprintf(stderr,"%s: ERROR - PR_Accept failed\n", program_name);
440 failed_already=1;
441 goto exit;
442 }
443 scp = PR_NEW(Serve_Client_Param);
444 if (scp == NULL) {
445 fprintf(stderr,"%s: PR_NEW failed\n", program_name);
446 failed_already=1;
447 goto exit;
448 }
450 /*
451 * Start a Serve_Client job for each incoming connection
452 */
453 scp->iod.socket = newsockfd;
454 scp->iod.timeout = PR_SecondsToInterval(60);
455 scp->datalen = tcp_mesg_size;
456 scp->exit_mon = sp->exit_mon;
457 scp->job_counterp = sp->job_counterp;
458 scp->tp = sp->tp;
460 PR_EnterMonitor(sp->exit_mon);
461 (*sp->job_counterp)++;
462 PR_ExitMonitor(sp->exit_mon);
463 jobp = PR_QueueJob(tp, Serve_Client, scp,
464 PR_FALSE);
466 PR_ASSERT(NULL != jobp);
467 DPRINTF(("TCP_Server: Created Serve_Client = 0x%lx\n", jobp));
469 /*
470 * single-threaded update; no lock needed
471 */
472 sp->conn_counter++;
473 if (sp->conn_counter <
474 (num_tcp_clients * num_tcp_connections_per_client)) {
475 jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,
476 PR_FALSE);
477 PR_ASSERT(NULL != jobp);
478 return;
479 }
480 jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500),
481 print_stats, sp, PR_FALSE);
483 PR_ASSERT(NULL != jobp);
484 DPRINTF(("TCP_Server: Created print_stats timer job = 0x%lx\n", jobp));
486 exit:
487 PR_EnterMonitor(sp->exit_mon);
488 /* Wait for server jobs to finish */
489 while (0 != *sp->job_counterp) {
490 PR_Wait(sp->exit_mon, PR_INTERVAL_NO_TIMEOUT);
491 DPRINTF(("TCP_Server: conn_counter = %d\n",
492 *sp->job_counterp));
493 }
495 PR_ExitMonitor(sp->exit_mon);
496 if (sp->iod.socket) {
497 PR_Close(sp->iod.socket);
498 }
499 PR_DestroyMonitor(sp->exit_mon);
500 printf("%30s","TCP_Socket_Client_Server_Test:");
501 printf("%2ld Server %2ld Clients %2ld connections_per_client\n",1l,
502 num_tcp_clients, num_tcp_connections_per_client);
503 printf("%30s %2ld messages_per_connection %4ld bytes_per_message\n",":",
504 num_tcp_mesgs_per_connection, tcp_mesg_size);
506 DPRINTF(("%s: calling PR_ShutdownThreadPool\n", program_name));
507 PR_ShutdownThreadPool(sp->tp);
508 PR_DELETE(sp);
509 }
511 /************************************************************************/
513 #define DEFAULT_INITIAL_THREADS 4
514 #define DEFAULT_MAX_THREADS 100
515 #define DEFAULT_STACKSIZE (512 * 1024)
517 int main(int argc, char **argv)
518 {
519 PRInt32 initial_threads = DEFAULT_INITIAL_THREADS;
520 PRInt32 max_threads = DEFAULT_MAX_THREADS;
521 PRInt32 stacksize = DEFAULT_STACKSIZE;
522 PRThreadPool *tp = NULL;
523 PRStatus rv;
524 PRJob *jobp;
526 /*
527 * -d debug mode
528 */
529 PLOptStatus os;
530 PLOptState *opt;
532 program_name = argv[0];
533 opt = PL_CreateOptState(argc, argv, "d");
534 while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
535 {
536 if (PL_OPT_BAD == os) continue;
537 switch (opt->option)
538 {
539 case 'd': /* debug mode */
540 _debug_on = 1;
541 break;
542 default:
543 break;
544 }
545 }
546 PL_DestroyOptState(opt);
548 PR_Init(PR_USER_THREAD, PR_PRIORITY_NORMAL, 0);
549 PR_STDIO_INIT();
551 PR_SetConcurrency(4);
553 tp = PR_CreateThreadPool(initial_threads, max_threads, stacksize);
554 if (NULL == tp) {
555 printf("PR_CreateThreadPool failed\n");
556 failed_already=1;
557 goto done;
558 }
559 jobp = PR_QueueJob(tp, TCP_Server, tp, PR_TRUE);
560 rv = PR_JoinJob(jobp);
561 PR_ASSERT(PR_SUCCESS == rv);
563 DPRINTF(("%s: calling PR_JoinThreadPool\n", program_name));
564 rv = PR_JoinThreadPool(tp);
565 PR_ASSERT(PR_SUCCESS == rv);
566 DPRINTF(("%s: returning from PR_JoinThreadPool\n", program_name));
568 done:
569 PR_Cleanup();
570 if (failed_already) return 1;
571 else return 0;
572 }