Mercurial comparison: third_party/libuv/src/threadpool.c @ 160:948de3f54cea
[ThirdParty] Added libuv
| author | June Park <parkjune1995@gmail.com> |
|---|---|
| date | Wed, 14 Jan 2026 19:39:52 -0800 |
| parents | |
| children | |
159:05cf9467a1c3 → 160:948de3f54cea (file added in this changeset; the full new file contents follow)
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "uv-common.h"

#if !defined(_WIN32)
# include "unix/internal.h"
#endif

#include <stdlib.h>

#define MAX_THREADPOOL_SIZE 1024

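/* Shared threadpool state. `wq` is the global work queue; `exit_message`
 * and `run_slow_work_message` are sentinel queue nodes threaded through it;
 * `slow_io_pending_wq` holds submitted slow-I/O work not yet admitted to
 * `wq`. After init_threads() runs, the queues and the `idle_threads` /
 * `slow_io_work_running` counters are only touched with `mutex` held. */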
static uv_once_t once = UV_ONCE_INIT;
static uv_cond_t cond;
static uv_mutex_t mutex;
static unsigned int idle_threads;
static unsigned int slow_io_work_running;
static unsigned int nthreads;
static uv_thread_t* threads;
static uv_thread_t default_threads[4];
static struct uv__queue exit_message;
static struct uv__queue wq;
static struct uv__queue run_slow_work_message;
static struct uv__queue slow_io_pending_wq;

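/* At most half the pool (rounded up) may run slow I/O at once; the rest of
 * the threads stay available for CPU-bound and fast I/O work. */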
static unsigned int slow_work_thread_threshold(void) {
  return (nthreads + 1) / 2;
}

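/* Sentinel work function installed by uv__work_cancel(); a worker must never
 * actually execute a cancelled request, so reaching this is a bug. */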
static void uv__cancelled(struct uv__work* w) {
  abort();
}


/* To avoid deadlock with uv_cancel() it's crucial that the worker
 * never holds the global mutex and the loop-local mutex at the same time.
 */
static void worker(void* arg) {
  struct uv__work* w;
  struct uv__queue* q;
  int is_slow_work;

  uv_thread_setname("libuv-worker");
  uv_sem_post((uv_sem_t*) arg);
  arg = NULL;

  uv_mutex_lock(&mutex);
  for (;;) {
    /* `mutex` should always be locked at this point. */

    /* Keep waiting while either no work is present or only slow I/O
       and we're at the threshold for that. */
    while (uv__queue_empty(&wq) ||
           (uv__queue_head(&wq) == &run_slow_work_message &&
            uv__queue_next(&run_slow_work_message) == &wq &&
            slow_io_work_running >= slow_work_thread_threshold())) {
      idle_threads += 1;
      uv_cond_wait(&cond, &mutex);
      idle_threads -= 1;
    }

    q = uv__queue_head(&wq);
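    /* Shutdown: leave `exit_message` in the queue and wake one more waiter
       so that every worker eventually sees it and exits. */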
    if (q == &exit_message) {
      uv_cond_signal(&cond);
      uv_mutex_unlock(&mutex);
      break;
    }

    uv__queue_remove(q);
    uv__queue_init(q);  /* Signal uv_cancel() that the work req is executing. */

    is_slow_work = 0;
    if (q == &run_slow_work_message) {
      /* If we're at the slow I/O threshold, re-schedule until after all
         other work in the queue is done. */
      if (slow_io_work_running >= slow_work_thread_threshold()) {
        uv__queue_insert_tail(&wq, q);
        continue;
      }

      /* If we encountered a request to run slow I/O work but there is none
         to run, that means it's cancelled => Start over. */
      if (uv__queue_empty(&slow_io_pending_wq))
        continue;

      is_slow_work = 1;
      slow_io_work_running++;

      q = uv__queue_head(&slow_io_pending_wq);
      uv__queue_remove(q);
      uv__queue_init(q);

      /* If there is more slow I/O work, schedule it to be run as well. */
      if (!uv__queue_empty(&slow_io_pending_wq)) {
        uv__queue_insert_tail(&wq, &run_slow_work_message);
        if (idle_threads > 0)
          uv_cond_signal(&cond);
      }
    }

    uv_mutex_unlock(&mutex);

    w = uv__queue_data(q, struct uv__work, wq);
    w->work(w);

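    /* Hand the finished request back to its owning loop: park it on
       loop->wq and wake the loop thread through the wq_async handle. */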
    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;  /* Signal uv_cancel() that the work req is done
                        executing. */
    uv__queue_insert_tail(&w->loop->wq, &w->wq);
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);

    /* Lock `mutex` since that is expected at the start of the next
     * iteration. */
    uv_mutex_lock(&mutex);
    if (is_slow_work) {
      /* `slow_io_work_running` is protected by `mutex`. */
      slow_io_work_running--;
    }
  }
}


static void post(struct uv__queue* q, enum uv__work_kind kind) {
  uv_mutex_lock(&mutex);
  if (kind == UV__WORK_SLOW_IO) {
    /* Insert into a separate queue. */
    uv__queue_insert_tail(&slow_io_pending_wq, q);
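    /* `run_slow_work_message` doubles as a queue node: it reads as
       "non-empty" exactly when it is currently linked into `wq`. */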
    if (!uv__queue_empty(&run_slow_work_message)) {
      /* Running slow I/O tasks is already scheduled => Nothing to do here.
         The worker that runs said other task will schedule this one as well. */
      uv_mutex_unlock(&mutex);
      return;
    }
    q = &run_slow_work_message;
  }

  uv__queue_insert_tail(&wq, q);
  if (idle_threads > 0)
    uv_cond_signal(&cond);
  uv_mutex_unlock(&mutex);
}


#ifdef __MVS__
/* TODO(itodorov) - zos: revisit when Woz compiler is available. */
__attribute__((destructor))
#endif
void uv__threadpool_cleanup(void) {
  unsigned int i;

  if (nthreads == 0)
    return;

#ifndef __MVS__
  /* TODO(gabylb) - zos: revisit when Woz compiler is available. */
  post(&exit_message, UV__WORK_CPU);
#endif

  for (i = 0; i < nthreads; i++)
    if (uv_thread_join(threads + i))
      abort();

  if (threads != default_threads)
    uv__free(threads);

  uv_mutex_destroy(&mutex);
  uv_cond_destroy(&cond);

  threads = NULL;
  nthreads = 0;
}


static void init_threads(void) {
  uv_thread_options_t config;
  unsigned int i;
  const char* val;
  uv_sem_t sem;

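  /* Pool size comes from UV_THREADPOOL_SIZE, clamped to
   * [1, MAX_THREADPOOL_SIZE]; atoi() returns 0 for unparsable values,
   * so garbage in the environment variable falls back to one thread. */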
  nthreads = ARRAY_SIZE(default_threads);
  val = getenv("UV_THREADPOOL_SIZE");
  if (val != NULL)
    nthreads = atoi(val);
  if (nthreads == 0)
    nthreads = 1;
  if (nthreads > MAX_THREADPOOL_SIZE)
    nthreads = MAX_THREADPOOL_SIZE;

  threads = default_threads;
  if (nthreads > ARRAY_SIZE(default_threads)) {
    threads = uv__malloc(nthreads * sizeof(threads[0]));
    if (threads == NULL) {
      nthreads = ARRAY_SIZE(default_threads);
      threads = default_threads;
    }
  }

  if (uv_cond_init(&cond))
    abort();

  if (uv_mutex_init(&mutex))
    abort();

  uv__queue_init(&wq);
  uv__queue_init(&slow_io_pending_wq);
  uv__queue_init(&run_slow_work_message);

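  /* The semaphore is a startup barrier: every worker posts it once it is
   * running, and we wait nthreads times so all workers have started (and
   * are done touching `sem`) before it is destroyed. */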
  if (uv_sem_init(&sem, 0))
    abort();

  config.flags = UV_THREAD_HAS_STACK_SIZE;
  config.stack_size = 8u << 20;  /* 8 MB */

  for (i = 0; i < nthreads; i++)
    if (uv_thread_create_ex(threads + i, &config, worker, &sem))
      abort();

  for (i = 0; i < nthreads; i++)
    uv_sem_wait(&sem);

  uv_sem_destroy(&sem);
}


#ifndef _WIN32
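/* Runs in the child after fork(): resetting `once` makes the next
 * uv__work_submit() re-run init_once() and rebuild the pool in the child. */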
static void reset_once(void) {
  uv_once_t child_once = UV_ONCE_INIT;
  memcpy(&once, &child_once, sizeof(child_once));
}
#endif


static void init_once(void) {
#ifndef _WIN32
  /* Re-initialize the threadpool after fork.
   * Note that this discards the global mutex and condition as well
   * as the work queue.
   */
  if (pthread_atfork(NULL, NULL, &reset_once))
    abort();
#endif
  init_threads();
}


void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);
  w->loop = loop;
  w->work = work;
  w->done = done;
  post(&w->wq, kind);
}


/* TODO(bnoordhuis) teach libuv how to cancel file operations
 * that go through io_uring instead of the thread pool.
 */
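/* A request is cancellable only while it is still linked into a queue
 * (w->wq non-empty) and no worker has picked it up yet (w->work != NULL);
 * otherwise the caller gets UV_EBUSY. */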
static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
  int cancelled;

  uv_once(&once, init_once);  /* Ensure |mutex| is initialized. */
  uv_mutex_lock(&mutex);
  uv_mutex_lock(&w->loop->wq_mutex);

  cancelled = !uv__queue_empty(&w->wq) && w->work != NULL;
  if (cancelled)
    uv__queue_remove(&w->wq);

  uv_mutex_unlock(&w->loop->wq_mutex);
  uv_mutex_unlock(&mutex);

  if (!cancelled)
    return UV_EBUSY;

  w->work = uv__cancelled;
  uv_mutex_lock(&loop->wq_mutex);
  uv__queue_insert_tail(&loop->wq, &w->wq);
  uv_async_send(&loop->wq_async);
  uv_mutex_unlock(&loop->wq_mutex);

  return 0;
}

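/* Runs on the loop thread via the loop's wq_async handle. The whole batch
 * is moved off loop->wq while holding the lock, then the `done` callbacks
 * run without it so they are free to submit new work. */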
void uv__work_done(uv_async_t* handle) {
  struct uv__work* w;
  uv_loop_t* loop;
  struct uv__queue* q;
  struct uv__queue wq;
  int err;
  int nevents;

  loop = container_of(handle, uv_loop_t, wq_async);
  uv_mutex_lock(&loop->wq_mutex);
  uv__queue_move(&loop->wq, &wq);
  uv_mutex_unlock(&loop->wq_mutex);

  nevents = 0;

  while (!uv__queue_empty(&wq)) {
    q = uv__queue_head(&wq);
    uv__queue_remove(q);

    w = container_of(q, struct uv__work, wq);
    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    w->done(w, err);
    nevents++;
  }

  /* This check accomplishes 2 things:
   * 1. Even if the queue was empty, the call to uv__work_done() should count
   *    as an event, which will have been added by the event loop when
   *    calling this callback.
   * 2. Prevents an accidental wrap-around when nevents == 0.
   */
  if (nevents > 1) {
    /* Subtract 1 to counter the call to uv__work_done(). */
    uv__metrics_inc_events(loop, nevents - 1);
    if (uv__get_internal_fields(loop)->current_timeout == 0)
      uv__metrics_inc_events_waiting(loop, nevents - 1);
  }
}


static void uv__queue_work(struct uv__work* w) {
  uv_work_t* req = container_of(w, uv_work_t, work_req);

  req->work_cb(req);
}


static void uv__queue_done(struct uv__work* w, int err) {
  uv_work_t* req;

  req = container_of(w, uv_work_t, work_req);
  uv__req_unregister(req->loop);

  if (req->after_work_cb == NULL)
    return;

  req->after_work_cb(req, err);
}


int uv_queue_work(uv_loop_t* loop,
                  uv_work_t* req,
                  uv_work_cb work_cb,
                  uv_after_work_cb after_work_cb) {
  if (work_cb == NULL)
    return UV_EINVAL;

  uv__req_init(loop, req, UV_WORK);
  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;
  uv__work_submit(loop,
                  &req->work_req,
                  UV__WORK_CPU,
                  uv__queue_work,
                  uv__queue_done);
  return 0;
}


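/* Map each cancellable request type to its embedded uv__work and owning
 * loop; request types that never go through the threadpool are rejected
 * with UV_EINVAL. */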
int uv_cancel(uv_req_t* req) {
  struct uv__work* wreq;
  uv_loop_t* loop;

  switch (req->type) {
  case UV_FS:
    loop = ((uv_fs_t*) req)->loop;
    wreq = &((uv_fs_t*) req)->work_req;
    break;
  case UV_GETADDRINFO:
    loop = ((uv_getaddrinfo_t*) req)->loop;
    wreq = &((uv_getaddrinfo_t*) req)->work_req;
    break;
  case UV_GETNAMEINFO:
    loop = ((uv_getnameinfo_t*) req)->loop;
    wreq = &((uv_getnameinfo_t*) req)->work_req;
    break;
  case UV_RANDOM:
    loop = ((uv_random_t*) req)->loop;
    wreq = &((uv_random_t*) req)->work_req;
    break;
  case UV_WORK:
    loop = ((uv_work_t*) req)->loop;
    wreq = &((uv_work_t*) req)->work_req;
    break;
  default:
    return UV_EINVAL;
  }

  return uv__work_cancel(loop, req, wreq);
}
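
For reviewers unfamiliar with this file's public entry points, a minimal usage sketch of `uv_queue_work()` and `uv_cancel()` follows. It is not part of the imported source; it assumes a standard libuv installation (link with `-luv`), and the callback names `work_cb`/`after_cb` are illustrative.

```c
#include <stdio.h>
#include <uv.h>

static void work_cb(uv_work_t* req) {
  /* Runs on a threadpool worker thread; must not touch the loop. */
  int* n = req->data;
  *n *= 2;
}

static void after_cb(uv_work_t* req, int status) {
  /* Runs back on the loop thread. status is UV_ECANCELED if uv_cancel()
   * won the race before a worker picked the job up. */
  if (status == UV_ECANCELED)
    printf("job cancelled\n");
  else
    printf("result: %d\n", *(int*) req->data);
}

int main(void) {
  uv_work_t req;
  int value = 21;

  req.data = &value;
  uv_queue_work(uv_default_loop(), &req, work_cb, after_cb);
  /* uv_cancel((uv_req_t*) &req);  -- succeeds only while still queued. */

  uv_run(uv_default_loop(), UV_RUN_DEFAULT);
  return 0;
}
```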