Mercurial
diff third_party/libuv/test/test-poll.c @ 160:948de3f54cea
[ThirdParty] Added libuv
| author | June Park <parkjune1995@gmail.com> |
|---|---|
| date | Wed, 14 Jan 2026 19:39:52 -0800 |
| parents | |
| children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/third_party/libuv/test/test-poll.c Wed Jan 14 19:39:52 2026 -0800 @@ -0,0 +1,730 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include <errno.h> + +#ifdef _WIN32 +# include <fcntl.h> +# define close _close +#else +# include <sys/socket.h> +# include <unistd.h> +#endif + +#include "uv.h" +#include "task.h" + +#ifdef __linux__ +# include <sys/epoll.h> +#endif + +#ifdef UV_HAVE_KQUEUE +# include <sys/types.h> +# include <sys/event.h> +# include <sys/time.h> +#endif + + +#define NUM_CLIENTS 5 +#define TRANSFER_BYTES (1 << 16) + +#undef MIN +#define MIN(a, b) (((a) < (b)) ? (a) : (b)); + + +typedef enum { + UNIDIRECTIONAL, + DUPLEX +} test_mode_t; + +typedef struct connection_context_s { + uv_poll_t poll_handle; + uv_timer_t timer_handle; + uv_os_sock_t sock; + size_t read, sent; + int is_server_connection; + int open_handles; + int got_fin, sent_fin, got_disconnect; + unsigned int events, delayed_events; +} connection_context_t; + +typedef struct server_context_s { + uv_poll_t poll_handle; + uv_os_sock_t sock; + int connections; +} server_context_t; + + +static void delay_timer_cb(uv_timer_t* timer); + + +static test_mode_t test_mode = DUPLEX; + +static int closed_connections = 0; + +static int valid_writable_wakeups = 0; +static int spurious_writable_wakeups = 0; + +#if !defined(__sun) && !defined(_AIX) && !defined(__MVS__) +static int disconnects = 0; +#endif /* !__sun && !_AIX && !__MVS__ */ + +static int got_eagain(void) { +#ifdef _WIN32 + return WSAGetLastError() == WSAEWOULDBLOCK; +#else + return errno == EAGAIN + || errno == EINPROGRESS +#ifdef EWOULDBLOCK + || errno == EWOULDBLOCK; +#endif + ; +#endif +} + + +static uv_os_sock_t create_bound_socket (struct sockaddr_in bind_addr) { + uv_os_sock_t sock; + int r; + + sock = socket(AF_INET, SOCK_STREAM, IPPROTO_IP); +#ifdef _WIN32 + ASSERT_NE(sock, INVALID_SOCKET); +#else + ASSERT_GE(sock, 0); +#endif + +#ifndef _WIN32 + { + /* Allow reuse of the port. */ + int yes = 1; + r = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes); + ASSERT_OK(r); + } +#endif + + r = bind(sock, (const struct sockaddr*) &bind_addr, sizeof bind_addr); + ASSERT_OK(r); + + return sock; +} + + +static void close_socket(uv_os_sock_t sock) { + int r; +#ifdef _WIN32 + r = closesocket(sock); +#else + r = close(sock); +#endif + /* On FreeBSD close() can fail with ECONNRESET if the socket was shutdown by + * the peer before all pending data was delivered. + */ + ASSERT(r == 0 || errno == ECONNRESET); +} + + +static connection_context_t* create_connection_context( + uv_os_sock_t sock, int is_server_connection) { + int r; + connection_context_t* context; + + context = (connection_context_t*) malloc(sizeof *context); + ASSERT_NOT_NULL(context); + + context->sock = sock; + context->is_server_connection = is_server_connection; + context->read = 0; + context->sent = 0; + context->open_handles = 0; + context->events = 0; + context->delayed_events = 0; + context->got_fin = 0; + context->sent_fin = 0; + context->got_disconnect = 0; + + r = uv_poll_init_socket(uv_default_loop(), &context->poll_handle, sock); + context->open_handles++; + context->poll_handle.data = context; + ASSERT_OK(r); + + r = uv_timer_init(uv_default_loop(), &context->timer_handle); + context->open_handles++; + context->timer_handle.data = context; + ASSERT_OK(r); + + return context; +} + + +static void connection_close_cb(uv_handle_t* handle) { + connection_context_t* context = (connection_context_t*) handle->data; + + if (--context->open_handles == 0) { + if (test_mode == DUPLEX || context->is_server_connection) { + ASSERT_EQ(context->read, TRANSFER_BYTES); + } else { + ASSERT_OK(context->read); + } + + if (test_mode == DUPLEX || !context->is_server_connection) { + ASSERT_EQ(context->sent, TRANSFER_BYTES); + } else { + ASSERT_OK(context->sent); + } + + closed_connections++; + + free(context); + } +} + + +static void destroy_connection_context(connection_context_t* context) { + uv_close((uv_handle_t*) &context->poll_handle, connection_close_cb); + uv_close((uv_handle_t*) &context->timer_handle, connection_close_cb); +} + + +static void connection_poll_cb(uv_poll_t* handle, int status, int events) { + connection_context_t* context = (connection_context_t*) handle->data; + unsigned int new_events; + int r; + + ASSERT_OK(status); + ASSERT(events & context->events); + ASSERT(!(events & ~context->events)); + + new_events = context->events; + + if (events & UV_READABLE) { + int action = rand() % 7; + + switch (action) { + case 0: + case 1: { + /* Read a couple of bytes. */ + static char buffer[74]; + + do + r = recv(context->sock, buffer, sizeof buffer, 0); + while (r == -1 && errno == EINTR); + ASSERT_GE(r, 0); + + if (r > 0) { + context->read += r; + } else { + /* Got FIN. */ + context->got_fin = 1; + new_events &= ~UV_READABLE; + } + + break; + } + + case 2: + case 3: { + /* Read until EAGAIN. */ + static char buffer[931]; + + for (;;) { + do + r = recv(context->sock, buffer, sizeof buffer, 0); + while (r == -1 && errno == EINTR); + + if (r <= 0) + break; + + context->read += r; + } + + if (r == 0) { + /* Got FIN. */ + context->got_fin = 1; + new_events &= ~UV_READABLE; + } else { + ASSERT(got_eagain()); + } + + break; + } + + case 4: + /* Ignore. */ + break; + + case 5: + /* Stop reading for a while. Restart in timer callback. */ + new_events &= ~UV_READABLE; + if (!uv_is_active((uv_handle_t*) &context->timer_handle)) { + context->delayed_events = UV_READABLE; + uv_timer_start(&context->timer_handle, delay_timer_cb, 10, 0); + } else { + context->delayed_events |= UV_READABLE; + } + break; + + case 6: + /* Fudge with the event mask. */ + uv_poll_start(&context->poll_handle, UV_WRITABLE, connection_poll_cb); + uv_poll_start(&context->poll_handle, UV_READABLE, connection_poll_cb); + context->events = UV_READABLE; + break; + + default: + ASSERT(0); + } + } + + if (events & UV_WRITABLE) { + if (context->sent < TRANSFER_BYTES && + !(test_mode == UNIDIRECTIONAL && context->is_server_connection)) { + /* We have to send more bytes. */ + int action = rand() % 7; + + switch (action) { + case 0: + case 1: { + /* Send a couple of bytes. */ + static char buffer[103]; + + int send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer); + ASSERT_GT(send_bytes, 0); + + do + r = send(context->sock, buffer, send_bytes, 0); + while (r == -1 && errno == EINTR); + + if (r < 0) { + ASSERT(got_eagain()); + spurious_writable_wakeups++; + break; + } + + ASSERT_GT(r, 0); + context->sent += r; + valid_writable_wakeups++; + break; + } + + case 2: + case 3: { + /* Send until EAGAIN. */ + static char buffer[1234]; + + int send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer); + ASSERT_GT(send_bytes, 0); + + do + r = send(context->sock, buffer, send_bytes, 0); + while (r == -1 && errno == EINTR); + + if (r < 0) { + ASSERT(got_eagain()); + spurious_writable_wakeups++; + break; + } + + ASSERT_GT(r, 0); + valid_writable_wakeups++; + context->sent += r; + + while (context->sent < TRANSFER_BYTES) { + send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer); + ASSERT_GT(send_bytes, 0); + + do + r = send(context->sock, buffer, send_bytes, 0); + while (r == -1 && errno == EINTR); + ASSERT(r); + + if (r < 0) { + ASSERT(got_eagain()); + break; + } + + context->sent += r; + } + break; + } + + case 4: + /* Ignore. */ + break; + + case 5: + /* Stop sending for a while. Restart in timer callback. */ + new_events &= ~UV_WRITABLE; + if (!uv_is_active((uv_handle_t*) &context->timer_handle)) { + context->delayed_events = UV_WRITABLE; + uv_timer_start(&context->timer_handle, delay_timer_cb, 100, 0); + } else { + context->delayed_events |= UV_WRITABLE; + } + break; + + case 6: + /* Fudge with the event mask. */ + uv_poll_start(&context->poll_handle, + UV_READABLE, + connection_poll_cb); + uv_poll_start(&context->poll_handle, + UV_WRITABLE, + connection_poll_cb); + context->events = UV_WRITABLE; + break; + + default: + ASSERT(0); + } + + } else { + /* Nothing more to write. Send FIN. */ + int r; +#ifdef _WIN32 + r = shutdown(context->sock, SD_SEND); +#else + r = shutdown(context->sock, SHUT_WR); +#endif + ASSERT_OK(r); + context->sent_fin = 1; + new_events &= ~UV_WRITABLE; + } + } +#if !defined(__sun) && !defined(_AIX) && !defined(__MVS__) + if (events & UV_DISCONNECT) { + context->got_disconnect = 1; + ++disconnects; + new_events &= ~UV_DISCONNECT; + } + + if (context->got_fin && context->sent_fin && context->got_disconnect) { +#else /* __sun && _AIX && __MVS__ */ + if (context->got_fin && context->sent_fin) { +#endif /* !__sun && !_AIX && !__MVS__ */ + /* Sent and received FIN. Close and destroy context. */ + close_socket(context->sock); + destroy_connection_context(context); + context->events = 0; + + } else if (new_events != context->events) { + /* Poll mask changed. Call uv_poll_start again. */ + context->events = new_events; + uv_poll_start(handle, new_events, connection_poll_cb); + } + + /* Assert that uv_is_active works correctly for poll handles. */ + if (context->events != 0) { + ASSERT_EQ(1, uv_is_active((uv_handle_t*) handle)); + } else { + ASSERT_OK(uv_is_active((uv_handle_t*) handle)); + } +} + + +static void delay_timer_cb(uv_timer_t* timer) { + connection_context_t* context = (connection_context_t*) timer->data; + int r; + + /* Timer should auto stop. */ + ASSERT_OK(uv_is_active((uv_handle_t*) timer)); + + /* Add the requested events to the poll mask. */ + ASSERT(context->delayed_events != 0); + context->events |= context->delayed_events; + context->delayed_events = 0; + + r = uv_poll_start(&context->poll_handle, + context->events, + connection_poll_cb); + ASSERT_OK(r); +} + + +static server_context_t* create_server_context( + uv_os_sock_t sock) { + int r; + server_context_t* context; + + context = (server_context_t*) malloc(sizeof *context); + ASSERT_NOT_NULL(context); + + context->sock = sock; + context->connections = 0; + + r = uv_poll_init_socket(uv_default_loop(), &context->poll_handle, sock); + context->poll_handle.data = context; + ASSERT_OK(r); + + return context; +} + + +static void server_close_cb(uv_handle_t* handle) { + server_context_t* context = (server_context_t*) handle->data; + free(context); +} + + +static void destroy_server_context(server_context_t* context) { + uv_close((uv_handle_t*) &context->poll_handle, server_close_cb); +} + + +static void server_poll_cb(uv_poll_t* handle, int status, int events) { + server_context_t* server_context = (server_context_t*) + handle->data; + connection_context_t* connection_context; + struct sockaddr_in addr; + socklen_t addr_len; + uv_os_sock_t sock; + int r; + + addr_len = sizeof addr; + sock = accept(server_context->sock, (struct sockaddr*) &addr, &addr_len); +#ifdef _WIN32 + ASSERT_NE(sock, INVALID_SOCKET); +#else + ASSERT_GE(sock, 0); +#endif + + connection_context = create_connection_context(sock, 1); + connection_context->events = UV_READABLE | UV_WRITABLE | UV_DISCONNECT; + r = uv_poll_start(&connection_context->poll_handle, + UV_READABLE | UV_WRITABLE | UV_DISCONNECT, + connection_poll_cb); + ASSERT_OK(r); + + if (++server_context->connections == NUM_CLIENTS) { + close_socket(server_context->sock); + destroy_server_context(server_context); + } +} + + +static void start_server(void) { + server_context_t* context; + struct sockaddr_in addr; + uv_os_sock_t sock; + int r; + + ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr)); + sock = create_bound_socket(addr); + context = create_server_context(sock); + + r = listen(sock, 100); + ASSERT_OK(r); + + r = uv_poll_start(&context->poll_handle, UV_READABLE, server_poll_cb); + ASSERT_OK(r); +} + + +static void start_client(void) { + uv_os_sock_t sock; + connection_context_t* context; + struct sockaddr_in server_addr; + struct sockaddr_in addr; + int r; + + ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &server_addr)); + ASSERT_OK(uv_ip4_addr("0.0.0.0", 0, &addr)); + + sock = create_bound_socket(addr); + context = create_connection_context(sock, 0); + + context->events = UV_READABLE | UV_WRITABLE | UV_DISCONNECT; + r = uv_poll_start(&context->poll_handle, + UV_READABLE | UV_WRITABLE | UV_DISCONNECT, + connection_poll_cb); + ASSERT_OK(r); + + r = connect(sock, (struct sockaddr*) &server_addr, sizeof server_addr); + ASSERT(r == 0 || got_eagain()); +} + + +static void start_poll_test(void) { + int i, r; + +#ifdef _WIN32 + { + struct WSAData wsa_data; + int r = WSAStartup(MAKEWORD(2, 2), &wsa_data); + ASSERT_OK(r); + } +#endif + + start_server(); + + for (i = 0; i < NUM_CLIENTS; i++) + start_client(); + + r = uv_run(uv_default_loop(), UV_RUN_DEFAULT); + ASSERT_OK(r); + + /* Assert that at most five percent of the writable wakeups was spurious. */ + ASSERT_NE(spurious_writable_wakeups == 0 || + (valid_writable_wakeups + spurious_writable_wakeups) / + spurious_writable_wakeups > 20, 0); + + ASSERT_EQ(closed_connections, NUM_CLIENTS * 2); +#if !defined(__sun) && !defined(_AIX) && !defined(__MVS__) + ASSERT_EQ(disconnects, NUM_CLIENTS * 2); +#endif + MAKE_VALGRIND_HAPPY(uv_default_loop()); +} + + +/* Issuing a shutdown() on IBM i PASE with parameter SHUT_WR + * also sends a normal close sequence to the partner program. + * This leads to timing issues and ECONNRESET failures in the + * test 'poll_duplex' and 'poll_unidirectional'. + * + * https://www.ibm.com/support/knowledgecenter/en/ssw_ibm_i_74/apis/shutdn.htm + */ +TEST_IMPL(poll_duplex) { +#if defined(NO_SELF_CONNECT) + RETURN_SKIP(NO_SELF_CONNECT); +#elif defined(__PASE__) + RETURN_SKIP("API shutdown() may lead to timing issue on IBM i PASE"); +#endif + test_mode = DUPLEX; + start_poll_test(); + return 0; +} + + +TEST_IMPL(poll_unidirectional) { +#if defined(NO_SELF_CONNECT) + RETURN_SKIP(NO_SELF_CONNECT); +#elif defined(__PASE__) + RETURN_SKIP("API shutdown() may lead to timing issue on IBM i PASE"); +#endif + test_mode = UNIDIRECTIONAL; + start_poll_test(); + return 0; +} + + +/* Windows won't let you open a directory so we open a file instead. + * OS X lets you poll a file so open the $PWD instead. Both fail + * on Linux so it doesn't matter which one we pick. Both succeed + * on Solaris and AIX so skip the test on those platforms. + * On *BSD/Darwin, we disallow polling of regular files, directories. + * In addition to regular files, we also disallow FIFOs on Darwin. + */ +#ifdef __APPLE__ +#define TEST_POLL_FIFO_PATH "/tmp/uv-test-poll-fifo" +#endif +TEST_IMPL(poll_bad_fdtype) { +#if !defined(__sun) && \ + !defined(_AIX) && !defined(__MVS__) && \ + !defined(__CYGWIN__) && !defined(__MSYS__) + uv_poll_t poll_handle; + int fd[2]; + +#if defined(_WIN32) + fd[0] = _open("test/fixtures/empty_file", UV_FS_O_RDONLY); +#else + fd[0] = open(".", UV_FS_O_RDONLY); +#endif + ASSERT_NE(fd[0], -1); + ASSERT_NE(0, uv_poll_init(uv_default_loop(), &poll_handle, fd[0])); + ASSERT_OK(close(fd[0])); +#if defined(__APPLE__) || \ + defined(__DragonFly__) || \ + defined(__FreeBSD__) || \ + defined(__OpenBSD__) || \ + defined(__NetBSD__) + fd[0] = open("test/fixtures/empty_file", UV_FS_O_RDONLY); + ASSERT_NE(fd[0], -1); + /* Regular files should be banned from kqueue. */ + ASSERT_NE(0, uv_poll_init(uv_default_loop(), &poll_handle, fd[0])); + ASSERT_OK(close(fd[0])); +#ifdef __APPLE__ + ASSERT_OK(pipe(fd)); + /* Pipes should be permitted in kqueue. */ + ASSERT_EQ(0, uv_poll_init(uv_default_loop(), &poll_handle, fd[0])); + ASSERT_OK(close(fd[0])); + ASSERT_OK(close(fd[1])); + + ASSERT_OK(mkfifo(TEST_POLL_FIFO_PATH, 0600)); + fd[0] = open(TEST_POLL_FIFO_PATH, O_RDONLY | O_NONBLOCK); + ASSERT_NE(fd[0], -1); + fd[1] = open(TEST_POLL_FIFO_PATH, O_WRONLY | O_NONBLOCK); + ASSERT_NE(fd[1], -1); + /* FIFOs should be banned from kqueue. */ + ASSERT_NE(0, uv_poll_init(uv_default_loop(), &poll_handle, fd[0])); + ASSERT_OK(close(fd[0])); + ASSERT_OK(close(fd[1])); + unlink(TEST_POLL_FIFO_PATH); +#endif +#endif +#endif + + MAKE_VALGRIND_HAPPY(uv_default_loop()); + return 0; +} + + +#ifdef __linux__ +TEST_IMPL(poll_nested_epoll) { + uv_poll_t poll_handle; + int fd; + + fd = epoll_create(1); + ASSERT_NE(fd, -1); + + ASSERT_OK(uv_poll_init(uv_default_loop(), &poll_handle, fd)); + ASSERT_OK(uv_poll_start(&poll_handle, UV_READABLE, (uv_poll_cb) abort)); + ASSERT_NE(0, uv_run(uv_default_loop(), UV_RUN_NOWAIT)); + + uv_close((uv_handle_t*) &poll_handle, NULL); + ASSERT_OK(uv_run(uv_default_loop(), UV_RUN_DEFAULT)); + ASSERT_OK(close(fd)); + + MAKE_VALGRIND_HAPPY(uv_default_loop()); + return 0; +} +#endif /* __linux__ */ + + +#ifdef UV_HAVE_KQUEUE +TEST_IMPL(poll_nested_kqueue) { + uv_poll_t poll_handle; + int fd; + + fd = kqueue(); + ASSERT_NE(fd, -1); + + ASSERT_OK(uv_poll_init(uv_default_loop(), &poll_handle, fd)); + ASSERT_OK(uv_poll_start(&poll_handle, UV_READABLE, (uv_poll_cb) abort)); + ASSERT_NE(0, uv_run(uv_default_loop(), UV_RUN_NOWAIT)); + + uv_close((uv_handle_t*) &poll_handle, NULL); + ASSERT_OK(uv_run(uv_default_loop(), UV_RUN_DEFAULT)); + ASSERT_OK(close(fd)); + + MAKE_VALGRIND_HAPPY(uv_default_loop()); + return 0; +} +#endif /* UV_HAVE_KQUEUE */