diff third_party/wrk/src/wrk.c @ 178:94705b5986b3

[ThirdParty] Added WRK and luajit for load testing.
author MrJuneJune <me@mrjunejune.com>
date Thu, 22 Jan 2026 20:10:30 -0800
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/third_party/wrk/src/wrk.c	Thu Jan 22 20:10:30 2026 -0800
@@ -0,0 +1,585 @@
+// Copyright (C) 2012 - Will Glozer.  All rights reserved.
+
+#include "wrk.h"
+#include "script.h"
+#include "main.h"
+
+static struct config {
+    uint64_t connections;
+    uint64_t duration;
+    uint64_t threads;
+    uint64_t timeout;
+    uint64_t pipeline;
+    bool     delay;
+    bool     dynamic;
+    bool     latency;
+    char    *host;
+    char    *script;
+    SSL_CTX *ctx;
+} cfg;
+
+static struct {
+    stats *latency;
+    stats *requests;
+} statistics;
+
+static struct sock sock = {
+    .connect  = sock_connect,
+    .close    = sock_close,
+    .read     = sock_read,
+    .write    = sock_write,
+    .readable = sock_readable
+};
+
+static struct http_parser_settings parser_settings = {
+    .on_message_complete = response_complete
+};
+
+static volatile sig_atomic_t stop = 0;
+
+static void handler(int sig) {
+    stop = 1;
+}
+
+static void usage() {
+    printf("Usage: wrk <options> <url>                            \n"
+           "  Options:                                            \n"
+           "    -c, --connections <N>  Connections to keep open   \n"
+           "    -d, --duration    <T>  Duration of test           \n"
+           "    -t, --threads     <N>  Number of threads to use   \n"
+           "                                                      \n"
+           "    -s, --script      <S>  Load Lua script file       \n"
+           "    -H, --header      <H>  Add header to request      \n"
+           "        --latency          Print latency statistics   \n"
+           "        --timeout     <T>  Socket/request timeout     \n"
+           "    -v, --version          Print version details      \n"
+           "                                                      \n"
+           "  Numeric arguments may include a SI unit (1k, 1M, 1G)\n"
+           "  Time arguments may include a time unit (2s, 2m, 2h)\n");
+}
+
+int main(int argc, char **argv) {
+    char *url, **headers = zmalloc(argc * sizeof(char *));
+    struct http_parser_url parts = {};
+
+    if (parse_args(&cfg, &url, &parts, headers, argc, argv)) {
+        usage();
+        exit(1);
+    }
+
+    char *schema  = copy_url_part(url, &parts, UF_SCHEMA);
+    char *host    = copy_url_part(url, &parts, UF_HOST);
+    char *port    = copy_url_part(url, &parts, UF_PORT);
+    char *service = port ? port : schema;
+
+    if (!strncmp("https", schema, 5)) {
+        if ((cfg.ctx = ssl_init()) == NULL) {
+            fprintf(stderr, "unable to initialize SSL\n");
+            ERR_print_errors_fp(stderr);
+            exit(1);
+        }
+        sock.connect  = ssl_connect;
+        sock.close    = ssl_close;
+        sock.read     = ssl_read;
+        sock.write    = ssl_write;
+        sock.readable = ssl_readable;
+    }
+
+    signal(SIGPIPE, SIG_IGN);
+    signal(SIGINT,  SIG_IGN);
+
+    statistics.latency  = stats_alloc(cfg.timeout * 1000);
+    statistics.requests = stats_alloc(MAX_THREAD_RATE_S);
+    thread *threads     = zcalloc(cfg.threads * sizeof(thread));
+
+    lua_State *L = script_create(cfg.script, url, headers);
+    if (!script_resolve(L, host, service)) {
+        char *msg = strerror(errno);
+        fprintf(stderr, "unable to connect to %s:%s %s\n", host, service, msg);
+        exit(1);
+    }
+
+    cfg.host = host;
+
+    for (uint64_t i = 0; i < cfg.threads; i++) {
+        thread *t      = &threads[i];
+        t->loop        = aeCreateEventLoop(10 + cfg.connections * 3);
+        t->connections = cfg.connections / cfg.threads;
+
+        t->L = script_create(cfg.script, url, headers);
+        script_init(L, t, argc - optind, &argv[optind]);
+
+        if (i == 0) {
+            cfg.pipeline = script_verify_request(t->L);
+            cfg.dynamic  = !script_is_static(t->L);
+            cfg.delay    = script_has_delay(t->L);
+            if (script_want_response(t->L)) {
+                parser_settings.on_header_field = header_field;
+                parser_settings.on_header_value = header_value;
+                parser_settings.on_body         = response_body;
+            }
+        }
+
+        if (!t->loop || pthread_create(&t->thread, NULL, &thread_main, t)) {
+            char *msg = strerror(errno);
+            fprintf(stderr, "unable to create thread %"PRIu64": %s\n", i, msg);
+            exit(2);
+        }
+    }
+
+    struct sigaction sa = {
+        .sa_handler = handler,
+        .sa_flags   = 0,
+    };
+    sigfillset(&sa.sa_mask);
+    sigaction(SIGINT, &sa, NULL);
+
+    char *time = format_time_s(cfg.duration);
+    printf("Running %s test @ %s\n", time, url);
+    printf("  %"PRIu64" threads and %"PRIu64" connections\n", cfg.threads, cfg.connections);
+
+    uint64_t start    = time_us();
+    uint64_t complete = 0;
+    uint64_t bytes    = 0;
+    errors errors     = { 0 };
+
+    sleep(cfg.duration);
+    stop = 1;
+
+    for (uint64_t i = 0; i < cfg.threads; i++) {
+        thread *t = &threads[i];
+        pthread_join(t->thread, NULL);
+
+        complete += t->complete;
+        bytes    += t->bytes;
+
+        errors.connect += t->errors.connect;
+        errors.read    += t->errors.read;
+        errors.write   += t->errors.write;
+        errors.timeout += t->errors.timeout;
+        errors.status  += t->errors.status;
+    }
+
+    uint64_t runtime_us = time_us() - start;
+    long double runtime_s   = runtime_us / 1000000.0;
+    long double req_per_s   = complete   / runtime_s;
+    long double bytes_per_s = bytes      / runtime_s;
+
+    if (complete / cfg.connections > 0) {
+        int64_t interval = runtime_us / (complete / cfg.connections);
+        stats_correct(statistics.latency, interval);
+    }
+
+    print_stats_header();
+    print_stats("Latency", statistics.latency, format_time_us);
+    print_stats("Req/Sec", statistics.requests, format_metric);
+    if (cfg.latency) print_stats_latency(statistics.latency);
+
+    char *runtime_msg = format_time_us(runtime_us);
+
+    printf("  %"PRIu64" requests in %s, %sB read\n", complete, runtime_msg, format_binary(bytes));
+    if (errors.connect || errors.read || errors.write || errors.timeout) {
+        printf("  Socket errors: connect %d, read %d, write %d, timeout %d\n",
+               errors.connect, errors.read, errors.write, errors.timeout);
+    }
+
+    if (errors.status) {
+        printf("  Non-2xx or 3xx responses: %d\n", errors.status);
+    }
+
+    printf("Requests/sec: %9.2Lf\n", req_per_s);
+    printf("Transfer/sec: %10sB\n", format_binary(bytes_per_s));
+
+    if (script_has_done(L)) {
+        script_summary(L, runtime_us, complete, bytes);
+        script_errors(L, &errors);
+        script_done(L, statistics.latency, statistics.requests);
+    }
+
+    return 0;
+}
+
+void *thread_main(void *arg) {
+    thread *thread = arg;
+
+    char *request = NULL;
+    size_t length = 0;
+
+    if (!cfg.dynamic) {
+        script_request(thread->L, &request, &length);
+    }
+
+    thread->cs = zcalloc(thread->connections * sizeof(connection));
+    connection *c = thread->cs;
+
+    for (uint64_t i = 0; i < thread->connections; i++, c++) {
+        c->thread = thread;
+        c->ssl     = cfg.ctx ? SSL_new(cfg.ctx) : NULL;
+        c->request = request;
+        c->length  = length;
+        c->delayed = cfg.delay;
+        connect_socket(thread, c);
+    }
+
+    aeEventLoop *loop = thread->loop;
+    aeCreateTimeEvent(loop, RECORD_INTERVAL_MS, record_rate, thread, NULL);
+
+    thread->start = time_us();
+    aeMain(loop);
+
+    aeDeleteEventLoop(loop);
+    zfree(thread->cs);
+
+    return NULL;
+}
+
+static int connect_socket(thread *thread, connection *c) {
+    struct addrinfo *addr = thread->addr;
+    struct aeEventLoop *loop = thread->loop;
+    int fd, flags;
+
+    fd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
+
+    flags = fcntl(fd, F_GETFL, 0);
+    fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+
+    if (connect(fd, addr->ai_addr, addr->ai_addrlen) == -1) {
+        if (errno != EINPROGRESS) goto error;
+    }
+
+    flags = 1;
+    setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags));
+
+    flags = AE_READABLE | AE_WRITABLE;
+    if (aeCreateFileEvent(loop, fd, flags, socket_connected, c) == AE_OK) {
+        c->parser.data = c;
+        c->fd = fd;
+        return fd;
+    }
+
+  error:
+    thread->errors.connect++;
+    close(fd);
+    return -1;
+}
+
+static int reconnect_socket(thread *thread, connection *c) {
+    aeDeleteFileEvent(thread->loop, c->fd, AE_WRITABLE | AE_READABLE);
+    sock.close(c);
+    close(c->fd);
+    return connect_socket(thread, c);
+}
+
+static int record_rate(aeEventLoop *loop, long long id, void *data) {
+    thread *thread = data;
+
+    if (thread->requests > 0) {
+        uint64_t elapsed_ms = (time_us() - thread->start) / 1000;
+        uint64_t requests = (thread->requests / (double) elapsed_ms) * 1000;
+
+        stats_record(statistics.requests, requests);
+
+        thread->requests = 0;
+        thread->start    = time_us();
+    }
+
+    if (stop) aeStop(loop);
+
+    return RECORD_INTERVAL_MS;
+}
+
+static int delay_request(aeEventLoop *loop, long long id, void *data) {
+    connection *c = data;
+    c->delayed = false;
+    aeCreateFileEvent(loop, c->fd, AE_WRITABLE, socket_writeable, c);
+    return AE_NOMORE;
+}
+
+static int header_field(http_parser *parser, const char *at, size_t len) {
+    connection *c = parser->data;
+    if (c->state == VALUE) {
+        *c->headers.cursor++ = '\0';
+        c->state = FIELD;
+    }
+    buffer_append(&c->headers, at, len);
+    return 0;
+}
+
+static int header_value(http_parser *parser, const char *at, size_t len) {
+    connection *c = parser->data;
+    if (c->state == FIELD) {
+        *c->headers.cursor++ = '\0';
+        c->state = VALUE;
+    }
+    buffer_append(&c->headers, at, len);
+    return 0;
+}
+
+static int response_body(http_parser *parser, const char *at, size_t len) {
+    connection *c = parser->data;
+    buffer_append(&c->body, at, len);
+    return 0;
+}
+
+static int response_complete(http_parser *parser) {
+    connection *c = parser->data;
+    thread *thread = c->thread;
+    uint64_t now = time_us();
+    int status = parser->status_code;
+
+    thread->complete++;
+    thread->requests++;
+
+    if (status > 399) {
+        thread->errors.status++;
+    }
+
+    if (c->headers.buffer) {
+        *c->headers.cursor++ = '\0';
+        script_response(thread->L, status, &c->headers, &c->body);
+        c->state = FIELD;
+    }
+
+    if (--c->pending == 0) {
+        if (!stats_record(statistics.latency, now - c->start)) {
+            thread->errors.timeout++;
+        }
+        c->delayed = cfg.delay;
+        aeCreateFileEvent(thread->loop, c->fd, AE_WRITABLE, socket_writeable, c);
+    }
+
+    if (!http_should_keep_alive(parser)) {
+        reconnect_socket(thread, c);
+        goto done;
+    }
+
+    http_parser_init(parser, HTTP_RESPONSE);
+
+  done:
+    return 0;
+}
+
+static void socket_connected(aeEventLoop *loop, int fd, void *data, int mask) {
+    connection *c = data;
+
+    switch (sock.connect(c, cfg.host)) {
+        case OK:    break;
+        case ERROR: goto error;
+        case RETRY: return;
+    }
+
+    http_parser_init(&c->parser, HTTP_RESPONSE);
+    c->written = 0;
+
+    aeCreateFileEvent(c->thread->loop, fd, AE_READABLE, socket_readable, c);
+    aeCreateFileEvent(c->thread->loop, fd, AE_WRITABLE, socket_writeable, c);
+
+    return;
+
+  error:
+    c->thread->errors.connect++;
+    reconnect_socket(c->thread, c);
+}
+
+static void socket_writeable(aeEventLoop *loop, int fd, void *data, int mask) {
+    connection *c = data;
+    thread *thread = c->thread;
+
+    if (c->delayed) {
+        uint64_t delay = script_delay(thread->L);
+        aeDeleteFileEvent(loop, fd, AE_WRITABLE);
+        aeCreateTimeEvent(loop, delay, delay_request, c, NULL);
+        return;
+    }
+
+    if (!c->written) {
+        if (cfg.dynamic) {
+            script_request(thread->L, &c->request, &c->length);
+        }
+        c->start   = time_us();
+        c->pending = cfg.pipeline;
+    }
+
+    char  *buf = c->request + c->written;
+    size_t len = c->length  - c->written;
+    size_t n;
+
+    switch (sock.write(c, buf, len, &n)) {
+        case OK:    break;
+        case ERROR: goto error;
+        case RETRY: return;
+    }
+
+    c->written += n;
+    if (c->written == c->length) {
+        c->written = 0;
+        aeDeleteFileEvent(loop, fd, AE_WRITABLE);
+    }
+
+    return;
+
+  error:
+    thread->errors.write++;
+    reconnect_socket(thread, c);
+}
+
+static void socket_readable(aeEventLoop *loop, int fd, void *data, int mask) {
+    connection *c = data;
+    size_t n;
+
+    do {
+        switch (sock.read(c, &n)) {
+            case OK:    break;
+            case ERROR: goto error;
+            case RETRY: return;
+        }
+
+        if (http_parser_execute(&c->parser, &parser_settings, c->buf, n) != n) goto error;
+        if (n == 0 && !http_body_is_final(&c->parser)) goto error;
+
+        c->thread->bytes += n;
+    } while (n == RECVBUF && sock.readable(c) > 0);
+
+    return;
+
+  error:
+    c->thread->errors.read++;
+    reconnect_socket(c->thread, c);
+}
+
+static uint64_t time_us() {
+    struct timeval t;
+    gettimeofday(&t, NULL);
+    return (t.tv_sec * 1000000) + t.tv_usec;
+}
+
+static char *copy_url_part(char *url, struct http_parser_url *parts, enum http_parser_url_fields field) {
+    char *part = NULL;
+
+    if (parts->field_set & (1 << field)) {
+        uint16_t off = parts->field_data[field].off;
+        uint16_t len = parts->field_data[field].len;
+        part = zcalloc(len + 1 * sizeof(char));
+        memcpy(part, &url[off], len);
+    }
+
+    return part;
+}
+
+static struct option longopts[] = {
+    { "connections", required_argument, NULL, 'c' },
+    { "duration",    required_argument, NULL, 'd' },
+    { "threads",     required_argument, NULL, 't' },
+    { "script",      required_argument, NULL, 's' },
+    { "header",      required_argument, NULL, 'H' },
+    { "latency",     no_argument,       NULL, 'L' },
+    { "timeout",     required_argument, NULL, 'T' },
+    { "help",        no_argument,       NULL, 'h' },
+    { "version",     no_argument,       NULL, 'v' },
+    { NULL,          0,                 NULL,  0  }
+};
+
+static int parse_args(struct config *cfg, char **url, struct http_parser_url *parts, char **headers, int argc, char **argv) {
+    char **header = headers;
+    int c;
+
+    memset(cfg, 0, sizeof(struct config));
+    cfg->threads     = 2;
+    cfg->connections = 10;
+    cfg->duration    = 10;
+    cfg->timeout     = SOCKET_TIMEOUT_MS;
+
+    while ((c = getopt_long(argc, argv, "t:c:d:s:H:T:Lrv?", longopts, NULL)) != -1) {
+        switch (c) {
+            case 't':
+                if (scan_metric(optarg, &cfg->threads)) return -1;
+                break;
+            case 'c':
+                if (scan_metric(optarg, &cfg->connections)) return -1;
+                break;
+            case 'd':
+                if (scan_time(optarg, &cfg->duration)) return -1;
+                break;
+            case 's':
+                cfg->script = optarg;
+                break;
+            case 'H':
+                *header++ = optarg;
+                break;
+            case 'L':
+                cfg->latency = true;
+                break;
+            case 'T':
+                if (scan_time(optarg, &cfg->timeout)) return -1;
+                cfg->timeout *= 1000;
+                break;
+            case 'v':
+                printf("wrk %s [%s] ", VERSION, aeGetApiName());
+                printf("Copyright (C) 2012 Will Glozer\n");
+                break;
+            case 'h':
+            case '?':
+            case ':':
+            default:
+                return -1;
+        }
+    }
+
+    if (optind == argc || !cfg->threads || !cfg->duration) return -1;
+
+    if (!script_parse_url(argv[optind], parts)) {
+        fprintf(stderr, "invalid URL: %s\n", argv[optind]);
+        return -1;
+    }
+
+    if (!cfg->connections || cfg->connections < cfg->threads) {
+        fprintf(stderr, "number of connections must be >= threads\n");
+        return -1;
+    }
+
+    *url    = argv[optind];
+    *header = NULL;
+
+    return 0;
+}
+
+static void print_stats_header() {
+    printf("  Thread Stats%6s%11s%8s%12s\n", "Avg", "Stdev", "Max", "+/- Stdev");
+}
+
+static void print_units(long double n, char *(*fmt)(long double), int width) {
+    char *msg = fmt(n);
+    int len = strlen(msg), pad = 2;
+
+    if (isalpha(msg[len-1])) pad--;
+    if (isalpha(msg[len-2])) pad--;
+    width -= pad;
+
+    printf("%*.*s%.*s", width, width, msg, pad, "  ");
+
+    free(msg);
+}
+
+static void print_stats(char *name, stats *stats, char *(*fmt)(long double)) {
+    uint64_t max = stats->max;
+    long double mean  = stats_mean(stats);
+    long double stdev = stats_stdev(stats, mean);
+
+    printf("    %-10s", name);
+    print_units(mean,  fmt, 8);
+    print_units(stdev, fmt, 10);
+    print_units(max,   fmt, 9);
+    printf("%8.2Lf%%\n", stats_within_stdev(stats, mean, stdev, 1));
+}
+
+static void print_stats_latency(stats *stats) {
+    long double percentiles[] = { 50.0, 75.0, 90.0, 99.0 };
+    printf("  Latency Distribution\n");
+    for (size_t i = 0; i < sizeof(percentiles) / sizeof(long double); i++) {
+        long double p = percentiles[i];
+        uint64_t n = stats_percentile(stats, p);
+        printf("%7.0Lf%%", p);
+        print_units(n, format_time_us, 10);
+        printf("\n");
+    }
+}