comparison third_party/wrk/src/wrk.c @ 178:94705b5986b3

[ThirdParty] Added WRK and luajit for load testing.
author MrJuneJune <me@mrjunejune.com>
date Thu, 22 Jan 2026 20:10:30 -0800
parents
children
comparison
equal deleted inserted replaced
177:24fe8ff94056 178:94705b5986b3
1 // Copyright (C) 2012 - Will Glozer. All rights reserved.
2
3 #include "wrk.h"
4 #include "script.h"
5 #include "main.h"
6
7 static struct config {
8 uint64_t connections;
9 uint64_t duration;
10 uint64_t threads;
11 uint64_t timeout;
12 uint64_t pipeline;
13 bool delay;
14 bool dynamic;
15 bool latency;
16 char *host;
17 char *script;
18 SSL_CTX *ctx;
19 } cfg;
20
21 static struct {
22 stats *latency;
23 stats *requests;
24 } statistics;
25
26 static struct sock sock = {
27 .connect = sock_connect,
28 .close = sock_close,
29 .read = sock_read,
30 .write = sock_write,
31 .readable = sock_readable
32 };
33
34 static struct http_parser_settings parser_settings = {
35 .on_message_complete = response_complete
36 };
37
38 static volatile sig_atomic_t stop = 0;
39
40 static void handler(int sig) {
41 stop = 1;
42 }
43
44 static void usage() {
45 printf("Usage: wrk <options> <url> \n"
46 " Options: \n"
47 " -c, --connections <N> Connections to keep open \n"
48 " -d, --duration <T> Duration of test \n"
49 " -t, --threads <N> Number of threads to use \n"
50 " \n"
51 " -s, --script <S> Load Lua script file \n"
52 " -H, --header <H> Add header to request \n"
53 " --latency Print latency statistics \n"
54 " --timeout <T> Socket/request timeout \n"
55 " -v, --version Print version details \n"
56 " \n"
57 " Numeric arguments may include a SI unit (1k, 1M, 1G)\n"
58 " Time arguments may include a time unit (2s, 2m, 2h)\n");
59 }
60
61 int main(int argc, char **argv) {
62 char *url, **headers = zmalloc(argc * sizeof(char *));
63 struct http_parser_url parts = {};
64
65 if (parse_args(&cfg, &url, &parts, headers, argc, argv)) {
66 usage();
67 exit(1);
68 }
69
70 char *schema = copy_url_part(url, &parts, UF_SCHEMA);
71 char *host = copy_url_part(url, &parts, UF_HOST);
72 char *port = copy_url_part(url, &parts, UF_PORT);
73 char *service = port ? port : schema;
74
75 if (!strncmp("https", schema, 5)) {
76 if ((cfg.ctx = ssl_init()) == NULL) {
77 fprintf(stderr, "unable to initialize SSL\n");
78 ERR_print_errors_fp(stderr);
79 exit(1);
80 }
81 sock.connect = ssl_connect;
82 sock.close = ssl_close;
83 sock.read = ssl_read;
84 sock.write = ssl_write;
85 sock.readable = ssl_readable;
86 }
87
88 signal(SIGPIPE, SIG_IGN);
89 signal(SIGINT, SIG_IGN);
90
91 statistics.latency = stats_alloc(cfg.timeout * 1000);
92 statistics.requests = stats_alloc(MAX_THREAD_RATE_S);
93 thread *threads = zcalloc(cfg.threads * sizeof(thread));
94
95 lua_State *L = script_create(cfg.script, url, headers);
96 if (!script_resolve(L, host, service)) {
97 char *msg = strerror(errno);
98 fprintf(stderr, "unable to connect to %s:%s %s\n", host, service, msg);
99 exit(1);
100 }
101
102 cfg.host = host;
103
104 for (uint64_t i = 0; i < cfg.threads; i++) {
105 thread *t = &threads[i];
106 t->loop = aeCreateEventLoop(10 + cfg.connections * 3);
107 t->connections = cfg.connections / cfg.threads;
108
109 t->L = script_create(cfg.script, url, headers);
110 script_init(L, t, argc - optind, &argv[optind]);
111
112 if (i == 0) {
113 cfg.pipeline = script_verify_request(t->L);
114 cfg.dynamic = !script_is_static(t->L);
115 cfg.delay = script_has_delay(t->L);
116 if (script_want_response(t->L)) {
117 parser_settings.on_header_field = header_field;
118 parser_settings.on_header_value = header_value;
119 parser_settings.on_body = response_body;
120 }
121 }
122
123 if (!t->loop || pthread_create(&t->thread, NULL, &thread_main, t)) {
124 char *msg = strerror(errno);
125 fprintf(stderr, "unable to create thread %"PRIu64": %s\n", i, msg);
126 exit(2);
127 }
128 }
129
130 struct sigaction sa = {
131 .sa_handler = handler,
132 .sa_flags = 0,
133 };
134 sigfillset(&sa.sa_mask);
135 sigaction(SIGINT, &sa, NULL);
136
137 char *time = format_time_s(cfg.duration);
138 printf("Running %s test @ %s\n", time, url);
139 printf(" %"PRIu64" threads and %"PRIu64" connections\n", cfg.threads, cfg.connections);
140
141 uint64_t start = time_us();
142 uint64_t complete = 0;
143 uint64_t bytes = 0;
144 errors errors = { 0 };
145
146 sleep(cfg.duration);
147 stop = 1;
148
149 for (uint64_t i = 0; i < cfg.threads; i++) {
150 thread *t = &threads[i];
151 pthread_join(t->thread, NULL);
152
153 complete += t->complete;
154 bytes += t->bytes;
155
156 errors.connect += t->errors.connect;
157 errors.read += t->errors.read;
158 errors.write += t->errors.write;
159 errors.timeout += t->errors.timeout;
160 errors.status += t->errors.status;
161 }
162
163 uint64_t runtime_us = time_us() - start;
164 long double runtime_s = runtime_us / 1000000.0;
165 long double req_per_s = complete / runtime_s;
166 long double bytes_per_s = bytes / runtime_s;
167
168 if (complete / cfg.connections > 0) {
169 int64_t interval = runtime_us / (complete / cfg.connections);
170 stats_correct(statistics.latency, interval);
171 }
172
173 print_stats_header();
174 print_stats("Latency", statistics.latency, format_time_us);
175 print_stats("Req/Sec", statistics.requests, format_metric);
176 if (cfg.latency) print_stats_latency(statistics.latency);
177
178 char *runtime_msg = format_time_us(runtime_us);
179
180 printf(" %"PRIu64" requests in %s, %sB read\n", complete, runtime_msg, format_binary(bytes));
181 if (errors.connect || errors.read || errors.write || errors.timeout) {
182 printf(" Socket errors: connect %d, read %d, write %d, timeout %d\n",
183 errors.connect, errors.read, errors.write, errors.timeout);
184 }
185
186 if (errors.status) {
187 printf(" Non-2xx or 3xx responses: %d\n", errors.status);
188 }
189
190 printf("Requests/sec: %9.2Lf\n", req_per_s);
191 printf("Transfer/sec: %10sB\n", format_binary(bytes_per_s));
192
193 if (script_has_done(L)) {
194 script_summary(L, runtime_us, complete, bytes);
195 script_errors(L, &errors);
196 script_done(L, statistics.latency, statistics.requests);
197 }
198
199 return 0;
200 }
201
202 void *thread_main(void *arg) {
203 thread *thread = arg;
204
205 char *request = NULL;
206 size_t length = 0;
207
208 if (!cfg.dynamic) {
209 script_request(thread->L, &request, &length);
210 }
211
212 thread->cs = zcalloc(thread->connections * sizeof(connection));
213 connection *c = thread->cs;
214
215 for (uint64_t i = 0; i < thread->connections; i++, c++) {
216 c->thread = thread;
217 c->ssl = cfg.ctx ? SSL_new(cfg.ctx) : NULL;
218 c->request = request;
219 c->length = length;
220 c->delayed = cfg.delay;
221 connect_socket(thread, c);
222 }
223
224 aeEventLoop *loop = thread->loop;
225 aeCreateTimeEvent(loop, RECORD_INTERVAL_MS, record_rate, thread, NULL);
226
227 thread->start = time_us();
228 aeMain(loop);
229
230 aeDeleteEventLoop(loop);
231 zfree(thread->cs);
232
233 return NULL;
234 }
235
236 static int connect_socket(thread *thread, connection *c) {
237 struct addrinfo *addr = thread->addr;
238 struct aeEventLoop *loop = thread->loop;
239 int fd, flags;
240
241 fd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
242
243 flags = fcntl(fd, F_GETFL, 0);
244 fcntl(fd, F_SETFL, flags | O_NONBLOCK);
245
246 if (connect(fd, addr->ai_addr, addr->ai_addrlen) == -1) {
247 if (errno != EINPROGRESS) goto error;
248 }
249
250 flags = 1;
251 setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags));
252
253 flags = AE_READABLE | AE_WRITABLE;
254 if (aeCreateFileEvent(loop, fd, flags, socket_connected, c) == AE_OK) {
255 c->parser.data = c;
256 c->fd = fd;
257 return fd;
258 }
259
260 error:
261 thread->errors.connect++;
262 close(fd);
263 return -1;
264 }
265
266 static int reconnect_socket(thread *thread, connection *c) {
267 aeDeleteFileEvent(thread->loop, c->fd, AE_WRITABLE | AE_READABLE);
268 sock.close(c);
269 close(c->fd);
270 return connect_socket(thread, c);
271 }
272
273 static int record_rate(aeEventLoop *loop, long long id, void *data) {
274 thread *thread = data;
275
276 if (thread->requests > 0) {
277 uint64_t elapsed_ms = (time_us() - thread->start) / 1000;
278 uint64_t requests = (thread->requests / (double) elapsed_ms) * 1000;
279
280 stats_record(statistics.requests, requests);
281
282 thread->requests = 0;
283 thread->start = time_us();
284 }
285
286 if (stop) aeStop(loop);
287
288 return RECORD_INTERVAL_MS;
289 }
290
291 static int delay_request(aeEventLoop *loop, long long id, void *data) {
292 connection *c = data;
293 c->delayed = false;
294 aeCreateFileEvent(loop, c->fd, AE_WRITABLE, socket_writeable, c);
295 return AE_NOMORE;
296 }
297
298 static int header_field(http_parser *parser, const char *at, size_t len) {
299 connection *c = parser->data;
300 if (c->state == VALUE) {
301 *c->headers.cursor++ = '\0';
302 c->state = FIELD;
303 }
304 buffer_append(&c->headers, at, len);
305 return 0;
306 }
307
308 static int header_value(http_parser *parser, const char *at, size_t len) {
309 connection *c = parser->data;
310 if (c->state == FIELD) {
311 *c->headers.cursor++ = '\0';
312 c->state = VALUE;
313 }
314 buffer_append(&c->headers, at, len);
315 return 0;
316 }
317
318 static int response_body(http_parser *parser, const char *at, size_t len) {
319 connection *c = parser->data;
320 buffer_append(&c->body, at, len);
321 return 0;
322 }
323
324 static int response_complete(http_parser *parser) {
325 connection *c = parser->data;
326 thread *thread = c->thread;
327 uint64_t now = time_us();
328 int status = parser->status_code;
329
330 thread->complete++;
331 thread->requests++;
332
333 if (status > 399) {
334 thread->errors.status++;
335 }
336
337 if (c->headers.buffer) {
338 *c->headers.cursor++ = '\0';
339 script_response(thread->L, status, &c->headers, &c->body);
340 c->state = FIELD;
341 }
342
343 if (--c->pending == 0) {
344 if (!stats_record(statistics.latency, now - c->start)) {
345 thread->errors.timeout++;
346 }
347 c->delayed = cfg.delay;
348 aeCreateFileEvent(thread->loop, c->fd, AE_WRITABLE, socket_writeable, c);
349 }
350
351 if (!http_should_keep_alive(parser)) {
352 reconnect_socket(thread, c);
353 goto done;
354 }
355
356 http_parser_init(parser, HTTP_RESPONSE);
357
358 done:
359 return 0;
360 }
361
362 static void socket_connected(aeEventLoop *loop, int fd, void *data, int mask) {
363 connection *c = data;
364
365 switch (sock.connect(c, cfg.host)) {
366 case OK: break;
367 case ERROR: goto error;
368 case RETRY: return;
369 }
370
371 http_parser_init(&c->parser, HTTP_RESPONSE);
372 c->written = 0;
373
374 aeCreateFileEvent(c->thread->loop, fd, AE_READABLE, socket_readable, c);
375 aeCreateFileEvent(c->thread->loop, fd, AE_WRITABLE, socket_writeable, c);
376
377 return;
378
379 error:
380 c->thread->errors.connect++;
381 reconnect_socket(c->thread, c);
382 }
383
384 static void socket_writeable(aeEventLoop *loop, int fd, void *data, int mask) {
385 connection *c = data;
386 thread *thread = c->thread;
387
388 if (c->delayed) {
389 uint64_t delay = script_delay(thread->L);
390 aeDeleteFileEvent(loop, fd, AE_WRITABLE);
391 aeCreateTimeEvent(loop, delay, delay_request, c, NULL);
392 return;
393 }
394
395 if (!c->written) {
396 if (cfg.dynamic) {
397 script_request(thread->L, &c->request, &c->length);
398 }
399 c->start = time_us();
400 c->pending = cfg.pipeline;
401 }
402
403 char *buf = c->request + c->written;
404 size_t len = c->length - c->written;
405 size_t n;
406
407 switch (sock.write(c, buf, len, &n)) {
408 case OK: break;
409 case ERROR: goto error;
410 case RETRY: return;
411 }
412
413 c->written += n;
414 if (c->written == c->length) {
415 c->written = 0;
416 aeDeleteFileEvent(loop, fd, AE_WRITABLE);
417 }
418
419 return;
420
421 error:
422 thread->errors.write++;
423 reconnect_socket(thread, c);
424 }
425
426 static void socket_readable(aeEventLoop *loop, int fd, void *data, int mask) {
427 connection *c = data;
428 size_t n;
429
430 do {
431 switch (sock.read(c, &n)) {
432 case OK: break;
433 case ERROR: goto error;
434 case RETRY: return;
435 }
436
437 if (http_parser_execute(&c->parser, &parser_settings, c->buf, n) != n) goto error;
438 if (n == 0 && !http_body_is_final(&c->parser)) goto error;
439
440 c->thread->bytes += n;
441 } while (n == RECVBUF && sock.readable(c) > 0);
442
443 return;
444
445 error:
446 c->thread->errors.read++;
447 reconnect_socket(c->thread, c);
448 }
449
450 static uint64_t time_us() {
451 struct timeval t;
452 gettimeofday(&t, NULL);
453 return (t.tv_sec * 1000000) + t.tv_usec;
454 }
455
456 static char *copy_url_part(char *url, struct http_parser_url *parts, enum http_parser_url_fields field) {
457 char *part = NULL;
458
459 if (parts->field_set & (1 << field)) {
460 uint16_t off = parts->field_data[field].off;
461 uint16_t len = parts->field_data[field].len;
462 part = zcalloc(len + 1 * sizeof(char));
463 memcpy(part, &url[off], len);
464 }
465
466 return part;
467 }
468
469 static struct option longopts[] = {
470 { "connections", required_argument, NULL, 'c' },
471 { "duration", required_argument, NULL, 'd' },
472 { "threads", required_argument, NULL, 't' },
473 { "script", required_argument, NULL, 's' },
474 { "header", required_argument, NULL, 'H' },
475 { "latency", no_argument, NULL, 'L' },
476 { "timeout", required_argument, NULL, 'T' },
477 { "help", no_argument, NULL, 'h' },
478 { "version", no_argument, NULL, 'v' },
479 { NULL, 0, NULL, 0 }
480 };
481
482 static int parse_args(struct config *cfg, char **url, struct http_parser_url *parts, char **headers, int argc, char **argv) {
483 char **header = headers;
484 int c;
485
486 memset(cfg, 0, sizeof(struct config));
487 cfg->threads = 2;
488 cfg->connections = 10;
489 cfg->duration = 10;
490 cfg->timeout = SOCKET_TIMEOUT_MS;
491
492 while ((c = getopt_long(argc, argv, "t:c:d:s:H:T:Lrv?", longopts, NULL)) != -1) {
493 switch (c) {
494 case 't':
495 if (scan_metric(optarg, &cfg->threads)) return -1;
496 break;
497 case 'c':
498 if (scan_metric(optarg, &cfg->connections)) return -1;
499 break;
500 case 'd':
501 if (scan_time(optarg, &cfg->duration)) return -1;
502 break;
503 case 's':
504 cfg->script = optarg;
505 break;
506 case 'H':
507 *header++ = optarg;
508 break;
509 case 'L':
510 cfg->latency = true;
511 break;
512 case 'T':
513 if (scan_time(optarg, &cfg->timeout)) return -1;
514 cfg->timeout *= 1000;
515 break;
516 case 'v':
517 printf("wrk %s [%s] ", VERSION, aeGetApiName());
518 printf("Copyright (C) 2012 Will Glozer\n");
519 break;
520 case 'h':
521 case '?':
522 case ':':
523 default:
524 return -1;
525 }
526 }
527
528 if (optind == argc || !cfg->threads || !cfg->duration) return -1;
529
530 if (!script_parse_url(argv[optind], parts)) {
531 fprintf(stderr, "invalid URL: %s\n", argv[optind]);
532 return -1;
533 }
534
535 if (!cfg->connections || cfg->connections < cfg->threads) {
536 fprintf(stderr, "number of connections must be >= threads\n");
537 return -1;
538 }
539
540 *url = argv[optind];
541 *header = NULL;
542
543 return 0;
544 }
545
546 static void print_stats_header() {
547 printf(" Thread Stats%6s%11s%8s%12s\n", "Avg", "Stdev", "Max", "+/- Stdev");
548 }
549
550 static void print_units(long double n, char *(*fmt)(long double), int width) {
551 char *msg = fmt(n);
552 int len = strlen(msg), pad = 2;
553
554 if (isalpha(msg[len-1])) pad--;
555 if (isalpha(msg[len-2])) pad--;
556 width -= pad;
557
558 printf("%*.*s%.*s", width, width, msg, pad, " ");
559
560 free(msg);
561 }
562
563 static void print_stats(char *name, stats *stats, char *(*fmt)(long double)) {
564 uint64_t max = stats->max;
565 long double mean = stats_mean(stats);
566 long double stdev = stats_stdev(stats, mean);
567
568 printf(" %-10s", name);
569 print_units(mean, fmt, 8);
570 print_units(stdev, fmt, 10);
571 print_units(max, fmt, 9);
572 printf("%8.2Lf%%\n", stats_within_stdev(stats, mean, stdev, 1));
573 }
574
575 static void print_stats_latency(stats *stats) {
576 long double percentiles[] = { 50.0, 75.0, 90.0, 99.0 };
577 printf(" Latency Distribution\n");
578 for (size_t i = 0; i < sizeof(percentiles) / sizeof(long double); i++) {
579 long double p = percentiles[i];
580 uint64_t n = stats_percentile(stats, p);
581 printf("%7.0Lf%%", p);
582 print_units(n, format_time_us, 10);
583 printf("\n");
584 }
585 }