|
160
|
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
|
|
|
2 *
|
|
|
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
|
4 * of this software and associated documentation files (the "Software"), to
|
|
|
5 * deal in the Software without restriction, including without limitation the
|
|
|
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
|
|
7 * sell copies of the Software, and to permit persons to whom the Software is
|
|
|
8 * furnished to do so, subject to the following conditions:
|
|
|
9 *
|
|
|
10 * The above copyright notice and this permission notice shall be included in
|
|
|
11 * all copies or substantial portions of the Software.
|
|
|
12 *
|
|
|
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
|
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
|
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
|
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
|
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
|
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
|
|
19 * IN THE SOFTWARE.
|
|
|
20 */
|
|
|
21
|
|
|
22 #include "task.h"
|
|
|
23 #include "uv.h"
|
|
|
24
|
|
|
/* Name of the pipe the main thread listens on and the workers connect to. */
#define IPC_PIPE_NAME TEST_PIPENAME
/* Connection budget of the benchmark; it is divided among the clients. */
#define NUM_CONNECTS  (250 * 1000)
|
|
|
27
|
|
|
28 union stream_handle {
|
|
|
29 uv_pipe_t pipe;
|
|
|
30 uv_tcp_t tcp;
|
|
|
31 };
|
|
|
32
|
|
|
33 /* Use as (uv_stream_t *) &handle_storage -- it's kind of clunky but it
|
|
|
34 * avoids aliasing warnings.
|
|
|
35 */
|
|
|
36 typedef unsigned char handle_storage_t[sizeof(union stream_handle)];
|
|
|
37
|
|
|
38 /* Used for passing around the listen handle, not part of the benchmark proper.
|
|
|
39 * We have an overabundance of server types here. It works like this:
|
|
|
40 *
|
|
|
41 * 1. The main thread starts an IPC pipe server.
|
|
|
42 * 2. The worker threads connect to the IPC server and obtain a listen handle.
|
|
|
43 * 3. The worker threads start accepting requests on the listen handle.
|
|
|
44 * 4. The main thread starts connecting repeatedly.
|
|
|
45 *
|
|
|
46 * Step #4 should perhaps be farmed out over several threads.
|
|
|
47 */
|
|
|
48 struct ipc_server_ctx {
|
|
|
49 handle_storage_t server_handle;
|
|
|
50 unsigned int num_connects;
|
|
|
51 uv_pipe_t ipc_pipe;
|
|
|
52 };
|
|
|
53
|
|
|
54 struct ipc_peer_ctx {
|
|
|
55 handle_storage_t peer_handle;
|
|
|
56 uv_write_t write_req;
|
|
|
57 };
|
|
|
58
|
|
|
59 struct ipc_client_ctx {
|
|
|
60 uv_connect_t connect_req;
|
|
|
61 uv_stream_t* server_handle;
|
|
|
62 uv_pipe_t ipc_pipe;
|
|
|
63 char scratch[16];
|
|
|
64 };
|
|
|
65
|
|
|
66 /* Used in the actual benchmark. */
|
|
|
67 struct server_ctx {
|
|
|
68 handle_storage_t server_handle;
|
|
|
69 unsigned int num_connects;
|
|
|
70 uv_async_t async_handle;
|
|
|
71 uv_thread_t thread_id;
|
|
|
72 uv_sem_t semaphore;
|
|
|
73 };
|
|
|
74
|
|
|
75 struct client_ctx {
|
|
|
76 handle_storage_t client_handle;
|
|
|
77 unsigned int num_connects;
|
|
|
78 uv_connect_t connect_req;
|
|
|
79 uv_idle_t idle_handle;
|
|
|
80 };
|
|
|
81
|
|
|
82 static void ipc_connection_cb(uv_stream_t* ipc_pipe, int status);
|
|
|
83 static void ipc_write_cb(uv_write_t* req, int status);
|
|
|
84 static void ipc_close_cb(uv_handle_t* handle);
|
|
|
85 static void ipc_connect_cb(uv_connect_t* req, int status);
|
|
|
86 static void ipc_read_cb(uv_stream_t* handle,
|
|
|
87 ssize_t nread,
|
|
|
88 const uv_buf_t* buf);
|
|
|
89 static void ipc_alloc_cb(uv_handle_t* handle,
|
|
|
90 size_t suggested_size,
|
|
|
91 uv_buf_t* buf);
|
|
|
92
|
|
|
93 static void sv_async_cb(uv_async_t* handle);
|
|
|
94 static void sv_connection_cb(uv_stream_t* server_handle, int status);
|
|
|
95 static void sv_read_cb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf);
|
|
|
96 static void sv_alloc_cb(uv_handle_t* handle,
|
|
|
97 size_t suggested_size,
|
|
|
98 uv_buf_t* buf);
|
|
|
99
|
|
|
100 static void cl_connect_cb(uv_connect_t* req, int status);
|
|
|
101 static void cl_idle_cb(uv_idle_t* handle);
|
|
|
102 static void cl_close_cb(uv_handle_t* handle);
|
|
|
103
|
|
|
104 static struct sockaddr_in listen_addr;
|
|
|
105
|
|
|
106
|
|
|
107 static void ipc_connection_cb(uv_stream_t* ipc_pipe, int status) {
|
|
|
108 struct ipc_server_ctx* sc;
|
|
|
109 struct ipc_peer_ctx* pc;
|
|
|
110 uv_loop_t* loop;
|
|
|
111 uv_buf_t buf;
|
|
|
112
|
|
|
113 loop = ipc_pipe->loop;
|
|
|
114 buf = uv_buf_init("PING", 4);
|
|
|
115 sc = container_of(ipc_pipe, struct ipc_server_ctx, ipc_pipe);
|
|
|
116 pc = calloc(1, sizeof(*pc));
|
|
|
117 ASSERT_NOT_NULL(pc);
|
|
|
118
|
|
|
119 if (ipc_pipe->type == UV_TCP)
|
|
|
120 ASSERT_OK(uv_tcp_init(loop, (uv_tcp_t*) &pc->peer_handle));
|
|
|
121 else if (ipc_pipe->type == UV_NAMED_PIPE)
|
|
|
122 ASSERT_OK(uv_pipe_init(loop, (uv_pipe_t*) &pc->peer_handle, 1));
|
|
|
123 else
|
|
|
124 ASSERT(0);
|
|
|
125
|
|
|
126 ASSERT_OK(uv_accept(ipc_pipe, (uv_stream_t*) &pc->peer_handle));
|
|
|
127 ASSERT_OK(uv_write2(&pc->write_req,
|
|
|
128 (uv_stream_t*) &pc->peer_handle,
|
|
|
129 &buf,
|
|
|
130 1,
|
|
|
131 (uv_stream_t*) &sc->server_handle,
|
|
|
132 ipc_write_cb));
|
|
|
133
|
|
|
134 if (--sc->num_connects == 0)
|
|
|
135 uv_close((uv_handle_t*) ipc_pipe, NULL);
|
|
|
136 }
|
|
|
137
|
|
|
138
|
|
|
139 static void ipc_write_cb(uv_write_t* req, int status) {
|
|
|
140 struct ipc_peer_ctx* ctx;
|
|
|
141 ctx = container_of(req, struct ipc_peer_ctx, write_req);
|
|
|
142 uv_close((uv_handle_t*) &ctx->peer_handle, ipc_close_cb);
|
|
|
143 }
|
|
|
144
|
|
|
145
|
|
|
146 static void ipc_close_cb(uv_handle_t* handle) {
|
|
|
147 struct ipc_peer_ctx* ctx;
|
|
|
148 ctx = container_of(handle, struct ipc_peer_ctx, peer_handle);
|
|
|
149 free(ctx);
|
|
|
150 }
|
|
|
151
|
|
|
152
|
|
|
153 static void ipc_connect_cb(uv_connect_t* req, int status) {
|
|
|
154 struct ipc_client_ctx* ctx;
|
|
|
155 ctx = container_of(req, struct ipc_client_ctx, connect_req);
|
|
|
156 ASSERT_OK(status);
|
|
|
157 ASSERT_OK(uv_read_start((uv_stream_t*) &ctx->ipc_pipe,
|
|
|
158 ipc_alloc_cb,
|
|
|
159 ipc_read_cb));
|
|
|
160 }
|
|
|
161
|
|
|
162
|
|
|
163 static void ipc_alloc_cb(uv_handle_t* handle,
|
|
|
164 size_t suggested_size,
|
|
|
165 uv_buf_t* buf) {
|
|
|
166 struct ipc_client_ctx* ctx;
|
|
|
167 ctx = container_of(handle, struct ipc_client_ctx, ipc_pipe);
|
|
|
168 buf->base = ctx->scratch;
|
|
|
169 buf->len = sizeof(ctx->scratch);
|
|
|
170 }
|
|
|
171
|
|
|
172
|
|
|
173 static void ipc_read_cb(uv_stream_t* handle,
|
|
|
174 ssize_t nread,
|
|
|
175 const uv_buf_t* buf) {
|
|
|
176 struct ipc_client_ctx* ctx;
|
|
|
177 uv_loop_t* loop;
|
|
|
178 uv_handle_type type;
|
|
|
179 uv_pipe_t* ipc_pipe;
|
|
|
180
|
|
|
181 ipc_pipe = (uv_pipe_t*) handle;
|
|
|
182 ctx = container_of(ipc_pipe, struct ipc_client_ctx, ipc_pipe);
|
|
|
183 loop = ipc_pipe->loop;
|
|
|
184
|
|
|
185 ASSERT_EQ(1, uv_pipe_pending_count(ipc_pipe));
|
|
|
186 type = uv_pipe_pending_type(ipc_pipe);
|
|
|
187 if (type == UV_TCP)
|
|
|
188 ASSERT_OK(uv_tcp_init(loop, (uv_tcp_t*) ctx->server_handle));
|
|
|
189 else if (type == UV_NAMED_PIPE)
|
|
|
190 ASSERT_OK(uv_pipe_init(loop, (uv_pipe_t*) ctx->server_handle, 0));
|
|
|
191 else
|
|
|
192 ASSERT(0);
|
|
|
193
|
|
|
194 ASSERT_OK(uv_accept(handle, ctx->server_handle));
|
|
|
195 uv_close((uv_handle_t*) &ctx->ipc_pipe, NULL);
|
|
|
196 }
|
|
|
197
|
|
|
198
|
|
|
199 /* Set up an IPC pipe server that hands out listen sockets to the worker
|
|
|
200 * threads. It's kind of cumbersome for such a simple operation, maybe we
|
|
|
201 * should revive uv_import() and uv_export().
|
|
|
202 */
|
|
|
203 static void send_listen_handles(uv_handle_type type,
|
|
|
204 unsigned int num_servers,
|
|
|
205 struct server_ctx* servers) {
|
|
|
206 struct ipc_server_ctx ctx;
|
|
|
207 uv_loop_t* loop;
|
|
|
208 unsigned int i;
|
|
|
209
|
|
|
210 loop = uv_default_loop();
|
|
|
211 ctx.num_connects = num_servers;
|
|
|
212
|
|
|
213 if (type == UV_TCP) {
|
|
|
214 ASSERT_OK(uv_tcp_init(loop, (uv_tcp_t*) &ctx.server_handle));
|
|
|
215 ASSERT_OK(uv_tcp_bind((uv_tcp_t*) &ctx.server_handle,
|
|
|
216 (const struct sockaddr*) &listen_addr,
|
|
|
217 0));
|
|
|
218 }
|
|
|
219 else
|
|
|
220 ASSERT(0);
|
|
|
221 /* We need to initialize this pipe with ipc=0 - this is not a uv_pipe we'll
|
|
|
222 * be sending handles over, it's just for listening for new connections.
|
|
|
223 * If we accept a connection then the connected pipe must be initialized
|
|
|
224 * with ipc=1.
|
|
|
225 */
|
|
|
226 ASSERT_OK(uv_pipe_init(loop, &ctx.ipc_pipe, 0));
|
|
|
227 ASSERT_OK(uv_pipe_bind(&ctx.ipc_pipe, IPC_PIPE_NAME));
|
|
|
228 ASSERT_OK(uv_listen((uv_stream_t*) &ctx.ipc_pipe, 128, ipc_connection_cb));
|
|
|
229
|
|
|
230 for (i = 0; i < num_servers; i++)
|
|
|
231 uv_sem_post(&servers[i].semaphore);
|
|
|
232
|
|
|
233 ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
|
|
|
234 uv_close((uv_handle_t*) &ctx.server_handle, NULL);
|
|
|
235 ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
|
|
|
236
|
|
|
237 for (i = 0; i < num_servers; i++)
|
|
|
238 uv_sem_wait(&servers[i].semaphore);
|
|
|
239 }
|
|
|
240
|
|
|
241
|
|
|
242 static void get_listen_handle(uv_loop_t* loop, uv_stream_t* server_handle) {
|
|
|
243 struct ipc_client_ctx ctx;
|
|
|
244
|
|
|
245 ctx.server_handle = server_handle;
|
|
|
246 ctx.server_handle->data = "server handle";
|
|
|
247
|
|
|
248 ASSERT_OK(uv_pipe_init(loop, &ctx.ipc_pipe, 1));
|
|
|
249 uv_pipe_connect(&ctx.connect_req,
|
|
|
250 &ctx.ipc_pipe,
|
|
|
251 IPC_PIPE_NAME,
|
|
|
252 ipc_connect_cb);
|
|
|
253 ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
|
|
|
254 }
|
|
|
255
|
|
|
256
|
|
|
257 static void server_cb(void *arg) {
|
|
|
258 struct server_ctx *ctx;
|
|
|
259 uv_loop_t loop;
|
|
|
260
|
|
|
261 ctx = arg;
|
|
|
262 ASSERT_OK(uv_loop_init(&loop));
|
|
|
263
|
|
|
264 ASSERT_OK(uv_async_init(&loop, &ctx->async_handle, sv_async_cb));
|
|
|
265 uv_unref((uv_handle_t*) &ctx->async_handle);
|
|
|
266
|
|
|
267 /* Wait until the main thread is ready. */
|
|
|
268 uv_sem_wait(&ctx->semaphore);
|
|
|
269 get_listen_handle(&loop, (uv_stream_t*) &ctx->server_handle);
|
|
|
270 uv_sem_post(&ctx->semaphore);
|
|
|
271
|
|
|
272 /* Now start the actual benchmark. */
|
|
|
273 ASSERT_OK(uv_listen((uv_stream_t*) &ctx->server_handle,
|
|
|
274 128,
|
|
|
275 sv_connection_cb));
|
|
|
276 ASSERT_OK(uv_run(&loop, UV_RUN_DEFAULT));
|
|
|
277
|
|
|
278 uv_loop_close(&loop);
|
|
|
279 }
|
|
|
280
|
|
|
281
|
|
|
282 static void sv_async_cb(uv_async_t* handle) {
|
|
|
283 struct server_ctx* ctx;
|
|
|
284 ctx = container_of(handle, struct server_ctx, async_handle);
|
|
|
285 uv_close((uv_handle_t*) &ctx->server_handle, NULL);
|
|
|
286 uv_close((uv_handle_t*) &ctx->async_handle, NULL);
|
|
|
287 }
|
|
|
288
|
|
|
289
|
|
|
290 static void sv_connection_cb(uv_stream_t* server_handle, int status) {
|
|
|
291 handle_storage_t* storage;
|
|
|
292 struct server_ctx* ctx;
|
|
|
293
|
|
|
294 ctx = container_of(server_handle, struct server_ctx, server_handle);
|
|
|
295 ASSERT_OK(status);
|
|
|
296
|
|
|
297 storage = malloc(sizeof(*storage));
|
|
|
298 ASSERT_NOT_NULL(storage);
|
|
|
299
|
|
|
300 if (server_handle->type == UV_TCP)
|
|
|
301 ASSERT_OK(uv_tcp_init(server_handle->loop, (uv_tcp_t*) storage));
|
|
|
302 else if (server_handle->type == UV_NAMED_PIPE)
|
|
|
303 ASSERT_OK(uv_pipe_init(server_handle->loop, (uv_pipe_t*) storage, 0));
|
|
|
304 else
|
|
|
305 ASSERT(0);
|
|
|
306
|
|
|
307 ASSERT_OK(uv_accept(server_handle, (uv_stream_t*) storage));
|
|
|
308 ASSERT_OK(uv_read_start((uv_stream_t*) storage, sv_alloc_cb, sv_read_cb));
|
|
|
309 ctx->num_connects++;
|
|
|
310 }
|
|
|
311
|
|
|
312
|
|
|
313 static void sv_alloc_cb(uv_handle_t* handle,
|
|
|
314 size_t suggested_size,
|
|
|
315 uv_buf_t* buf) {
|
|
|
316 static char slab[32];
|
|
|
317 buf->base = slab;
|
|
|
318 buf->len = sizeof(slab);
|
|
|
319 }
|
|
|
320
|
|
|
321
|
|
|
322 static void sv_read_cb(uv_stream_t* handle,
|
|
|
323 ssize_t nread,
|
|
|
324 const uv_buf_t* buf) {
|
|
|
325 ASSERT_EQ(nread, UV_EOF);
|
|
|
326 uv_close((uv_handle_t*) handle, (uv_close_cb) free);
|
|
|
327 }
|
|
|
328
|
|
|
329
|
|
|
330 static void cl_connect_cb(uv_connect_t* req, int status) {
|
|
|
331 struct client_ctx* ctx = container_of(req, struct client_ctx, connect_req);
|
|
|
332 uv_idle_start(&ctx->idle_handle, cl_idle_cb);
|
|
|
333 ASSERT_OK(status);
|
|
|
334 }
|
|
|
335
|
|
|
336
|
|
|
337 static void cl_idle_cb(uv_idle_t* handle) {
|
|
|
338 struct client_ctx* ctx = container_of(handle, struct client_ctx, idle_handle);
|
|
|
339 uv_close((uv_handle_t*) &ctx->client_handle, cl_close_cb);
|
|
|
340 uv_idle_stop(&ctx->idle_handle);
|
|
|
341 }
|
|
|
342
|
|
|
343
|
|
|
344 static void cl_close_cb(uv_handle_t* handle) {
|
|
|
345 struct client_ctx* ctx;
|
|
|
346
|
|
|
347 ctx = container_of(handle, struct client_ctx, client_handle);
|
|
|
348
|
|
|
349 if (--ctx->num_connects == 0) {
|
|
|
350 uv_close((uv_handle_t*) &ctx->idle_handle, NULL);
|
|
|
351 return;
|
|
|
352 }
|
|
|
353
|
|
|
354 ASSERT_OK(uv_tcp_init(handle->loop, (uv_tcp_t*) &ctx->client_handle));
|
|
|
355 ASSERT_OK(uv_tcp_connect(&ctx->connect_req,
|
|
|
356 (uv_tcp_t*) &ctx->client_handle,
|
|
|
357 (const struct sockaddr*) &listen_addr,
|
|
|
358 cl_connect_cb));
|
|
|
359 }
|
|
|
360
|
|
|
361
|
|
|
362 static int test_tcp(unsigned int num_servers, unsigned int num_clients) {
|
|
|
363 struct server_ctx* servers;
|
|
|
364 struct client_ctx* clients;
|
|
|
365 uv_loop_t* loop;
|
|
|
366 uv_tcp_t* handle;
|
|
|
367 unsigned int i;
|
|
|
368 double time;
|
|
|
369
|
|
|
370 ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &listen_addr));
|
|
|
371 loop = uv_default_loop();
|
|
|
372
|
|
|
373 servers = calloc(num_servers, sizeof(servers[0]));
|
|
|
374 clients = calloc(num_clients, sizeof(clients[0]));
|
|
|
375 ASSERT_NOT_NULL(servers);
|
|
|
376 ASSERT_NOT_NULL(clients);
|
|
|
377
|
|
|
378 /* We're making the assumption here that from the perspective of the
|
|
|
379 * OS scheduler, threads are functionally equivalent to and interchangeable
|
|
|
380 * with full-blown processes.
|
|
|
381 */
|
|
|
382 for (i = 0; i < num_servers; i++) {
|
|
|
383 struct server_ctx* ctx = servers + i;
|
|
|
384 ASSERT_OK(uv_sem_init(&ctx->semaphore, 0));
|
|
|
385 ASSERT_OK(uv_thread_create(&ctx->thread_id, server_cb, ctx));
|
|
|
386 }
|
|
|
387
|
|
|
388 send_listen_handles(UV_TCP, num_servers, servers);
|
|
|
389
|
|
|
390 for (i = 0; i < num_clients; i++) {
|
|
|
391 struct client_ctx* ctx = clients + i;
|
|
|
392 ctx->num_connects = NUM_CONNECTS / num_clients;
|
|
|
393 handle = (uv_tcp_t*) &ctx->client_handle;
|
|
|
394 handle->data = "client handle";
|
|
|
395 ASSERT_OK(uv_tcp_init(loop, handle));
|
|
|
396 ASSERT_OK(uv_tcp_connect(&ctx->connect_req,
|
|
|
397 handle,
|
|
|
398 (const struct sockaddr*) &listen_addr,
|
|
|
399 cl_connect_cb));
|
|
|
400 ASSERT_OK(uv_idle_init(loop, &ctx->idle_handle));
|
|
|
401 }
|
|
|
402
|
|
|
403 {
|
|
|
404 uint64_t t = uv_hrtime();
|
|
|
405 ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
|
|
|
406 t = uv_hrtime() - t;
|
|
|
407 time = t / 1e9;
|
|
|
408 }
|
|
|
409
|
|
|
410 for (i = 0; i < num_servers; i++) {
|
|
|
411 struct server_ctx* ctx = servers + i;
|
|
|
412 uv_async_send(&ctx->async_handle);
|
|
|
413 ASSERT_OK(uv_thread_join(&ctx->thread_id));
|
|
|
414 uv_sem_destroy(&ctx->semaphore);
|
|
|
415 }
|
|
|
416
|
|
|
417 printf("accept%u: %.0f accepts/sec (%u total)\n",
|
|
|
418 num_servers,
|
|
|
419 NUM_CONNECTS / time,
|
|
|
420 NUM_CONNECTS);
|
|
|
421
|
|
|
422 for (i = 0; i < num_servers; i++) {
|
|
|
423 struct server_ctx* ctx = servers + i;
|
|
|
424 printf(" thread #%u: %.0f accepts/sec (%u total, %.1f%%)\n",
|
|
|
425 i,
|
|
|
426 ctx->num_connects / time,
|
|
|
427 ctx->num_connects,
|
|
|
428 ctx->num_connects * 100.0 / NUM_CONNECTS);
|
|
|
429 }
|
|
|
430
|
|
|
431 free(clients);
|
|
|
432 free(servers);
|
|
|
433
|
|
|
434 MAKE_VALGRIND_HAPPY(loop);
|
|
|
435 return 0;
|
|
|
436 }
|
|
|
437
|
|
|
438
|
|
|
/* Benchmark entry: 2 accepting threads, 40 clients. */
BENCHMARK_IMPL(tcp_multi_accept2) {
  return test_tcp(2, 40);
}
|
|
|
442
|
|
|
443
|
|
|
/* Benchmark entry: 4 accepting threads, 40 clients. */
BENCHMARK_IMPL(tcp_multi_accept4) {
  return test_tcp(4, 40);
}
|
|
|
447
|
|
|
448
|
|
|
/* Benchmark entry: 8 accepting threads, 40 clients. */
BENCHMARK_IMPL(tcp_multi_accept8) {
  return test_tcp(8, 40);
}
|