Mercurial comparison: third_party/libuv/src/unix/stream.c @ 160:948de3f54cea

[ThirdParty] Added libuv

| field | value |
|---|---|
| author | June Park <parkjune1995@gmail.com> |
| date | Wed, 14 Jan 2026 19:39:52 -0800 |
| parents | |
| children | |
Comparing 159:05cf9467a1c3 → 160:948de3f54cea (new file; all lines inserted).
```c
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "uv.h"
#include "internal.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <errno.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <unistd.h>
#include <limits.h> /* IOV_MAX */

#if defined(__APPLE__)
# include <sys/event.h>
# include <sys/time.h>
# include <sys/select.h>

/* Forward declaration */
typedef struct uv__stream_select_s uv__stream_select_t;

struct uv__stream_select_s {
  uv_stream_t* stream;
  uv_thread_t thread;
  uv_sem_t close_sem;
  uv_sem_t async_sem;
  uv_async_t async;
  int events;
  int fake_fd;
  int int_fd;
  int fd;
  fd_set* sread;
  size_t sread_sz;
  fd_set* swrite;
  size_t swrite_sz;
};
#endif /* defined(__APPLE__) */

union uv__cmsg {
  struct cmsghdr hdr;
  /* This cannot be larger because of the IBMi PASE limitation that
   * the total size of control messages cannot exceed 256 bytes.
   */
  char pad[256];
};

STATIC_ASSERT(256 == sizeof(union uv__cmsg));

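/* Sizing sketch (assumed usage, not part of the upstream file): the union
 * makes the 256-byte buffer correctly aligned for struct cmsghdr, so it can
 * back sendmsg()/recvmsg() control data directly:
 *
 *   union uv__cmsg cmsg;
 *   msg.msg_control = &cmsg.hdr;
 *   msg.msg_controllen = CMSG_SPACE(sizeof(int));  // room for one fd
 */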
static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static void uv__write_callbacks(uv_stream_t* stream);
static size_t uv__write_req_size(uv_write_t* req);
static void uv__drain(uv_stream_t* stream);


void uv__stream_init(uv_loop_t* loop,
                     uv_stream_t* stream,
                     uv_handle_type type) {
  int err;

  uv__handle_init(loop, (uv_handle_t*)stream, type);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;
  stream->queued_fds = NULL;
  stream->delayed_error = 0;
  uv__queue_init(&stream->write_queue);
  uv__queue_init(&stream->write_completed_queue);
  stream->write_queue_size = 0;

  if (loop->emfile_fd == -1) {
    err = uv__open_cloexec("/dev/null", O_RDONLY);
    if (err < 0)
      /* In the rare case that "/dev/null" isn't mounted, open "/"
       * instead.
       */
      err = uv__open_cloexec("/", O_RDONLY);
    if (err >= 0)
      loop->emfile_fd = err;
  }

#if defined(__APPLE__)
  stream->select = NULL;
#endif /* defined(__APPLE__) */

  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}


static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
#if defined(__APPLE__)
  /* Notify select() thread about state change */
  uv__stream_select_t* s;
  int r;

  s = stream->select;
  if (s == NULL)
    return;

  /* Interrupt select() loop
   * NOTE: fake_fd and int_fd are socketpair(), thus writing to one will
   * emit read event on other side
   */
  do
    r = write(s->fake_fd, "x", 1);
  while (r == -1 && errno == EINTR);

  assert(r == 1);
#else /* !defined(__APPLE__) */
  /* No-op on any other platform */
#endif /* !defined(__APPLE__) */
}


#if defined(__APPLE__)
static void uv__stream_osx_select(void* arg) {
  uv_stream_t* stream;
  uv__stream_select_t* s;
  char buf[1024];
  int events;
  int fd;
  int r;
  int max_fd;

  stream = arg;
  s = stream->select;
  fd = s->fd;

  if (fd > s->int_fd)
    max_fd = fd;
  else
    max_fd = s->int_fd;

  for (;;) {
    /* Terminate on semaphore */
    if (uv_sem_trywait(&s->close_sem) == 0)
      break;

    /* Watch fd using select(2) */
    memset(s->sread, 0, s->sread_sz);
    memset(s->swrite, 0, s->swrite_sz);

    if (uv__io_active(&stream->io_watcher, POLLIN))
      FD_SET(fd, s->sread);
    if (uv__io_active(&stream->io_watcher, POLLOUT))
      FD_SET(fd, s->swrite);
    FD_SET(s->int_fd, s->sread);

    /* Wait indefinitely for fd events */
    r = select(max_fd + 1, s->sread, s->swrite, NULL, NULL);
    if (r == -1) {
      if (errno == EINTR)
        continue;

      /* XXX: Possible?! */
      abort();
    }

    /* Ignore timeouts */
    if (r == 0)
      continue;

    /* Empty socketpair's buffer in case of interruption */
    if (FD_ISSET(s->int_fd, s->sread))
      for (;;) {
        r = read(s->int_fd, buf, sizeof(buf));

        if (r == sizeof(buf))
          continue;

        if (r != -1)
          break;

        if (errno == EAGAIN || errno == EWOULDBLOCK)
          break;

        if (errno == EINTR)
          continue;

        abort();
      }

    /* Handle events */
    events = 0;
    if (FD_ISSET(fd, s->sread))
      events |= POLLIN;
    if (FD_ISSET(fd, s->swrite))
      events |= POLLOUT;

    assert(events != 0 || FD_ISSET(s->int_fd, s->sread));
    if (events != 0) {
      ACCESS_ONCE(int, s->events) = events;

      uv_async_send(&s->async);
      uv_sem_wait(&s->async_sem);

      /* Should be processed at this stage */
      assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
    }
  }
}
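/* Handshake sketch (a reading of the loop above, not upstream text): the
 * select() thread publishes events, wakes the loop with uv_async_send(), and
 * then blocks on async_sem. uv__stream_osx_select_cb() runs uv__stream_io()
 * on the loop thread and only then posts async_sem, so select() cannot be
 * re-armed before the pending uv__read()/uv__write() has actually run.
 */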


static void uv__stream_osx_select_cb(uv_async_t* handle) {
  uv__stream_select_t* s;
  uv_stream_t* stream;
  int events;

  s = container_of(handle, uv__stream_select_t, async);
  stream = s->stream;

  /* Get and reset stream's events */
  events = s->events;
  ACCESS_ONCE(int, s->events) = 0;

  assert(events != 0);
  assert(events == (events & (POLLIN | POLLOUT)));

  /* Invoke callback on event-loop */
  if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLIN);

  if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);

  if (stream->flags & UV_HANDLE_CLOSING)
    return;

  /* NOTE: It is important to do it here, otherwise `select()` might be called
   * before the actual `uv__read()`, leading to the blocking syscall
   */
  uv_sem_post(&s->async_sem);
}


static void uv__stream_osx_cb_close(uv_handle_t* async) {
  uv__stream_select_t* s;

  s = container_of(async, uv__stream_select_t, async);
  uv__free(s);
}


int uv__stream_try_select(uv_stream_t* stream, int* fd) {
  /*
   * kqueue doesn't work with some files from /dev mount on osx.
   * select(2) in separate thread for those fds
   */

  struct kevent filter[1];
  struct kevent events[1];
  struct timespec timeout;
  uv__stream_select_t* s;
  int fds[2];
  int err;
  int ret;
  int kq;
  int old_fd;
  int max_fd;
  size_t sread_sz;
  size_t swrite_sz;

  kq = kqueue();
  if (kq == -1) {
    perror("(libuv) kqueue()");
    return UV__ERR(errno);
  }

  EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);

  /* Use small timeout, because we only want to capture EINVALs */
  timeout.tv_sec = 0;
  timeout.tv_nsec = 1;

  do
    ret = kevent(kq, filter, 1, events, 1, &timeout);
  while (ret == -1 && errno == EINTR);

  uv__close(kq);

  if (ret == -1)
    return UV__ERR(errno);

  if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
    return 0;

  /* At this point we definitely know that this fd won't work with kqueue */

  /*
   * Create fds for io watcher and to interrupt the select() loop.
   * NOTE: do it ahead of malloc below to allocate enough space for fd_sets
   */
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
    return UV__ERR(errno);

  max_fd = *fd;
  if (fds[1] > max_fd)
    max_fd = fds[1];

  sread_sz = ROUND_UP(max_fd + 1, sizeof(uint32_t) * NBBY) / NBBY;
  swrite_sz = sread_sz;

  s = uv__malloc(sizeof(*s) + sread_sz + swrite_sz);
  if (s == NULL) {
    err = UV_ENOMEM;
    goto failed_malloc;
  }

  s->events = 0;
  s->fd = *fd;
  s->sread = (fd_set*) ((char*) s + sizeof(*s));
  s->sread_sz = sread_sz;
  s->swrite = (fd_set*) ((char*) s->sread + sread_sz);
  s->swrite_sz = swrite_sz;

  err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
  if (err)
    goto failed_async_init;

  s->async.flags |= UV_HANDLE_INTERNAL;
  uv__handle_unref(&s->async);

  err = uv_sem_init(&s->close_sem, 0);
  if (err != 0)
    goto failed_close_sem_init;

  err = uv_sem_init(&s->async_sem, 0);
  if (err != 0)
    goto failed_async_sem_init;

  s->fake_fd = fds[0];
  s->int_fd = fds[1];

  old_fd = *fd;
  s->stream = stream;
  stream->select = s;
  *fd = s->fake_fd;

  err = uv_thread_create(&s->thread, uv__stream_osx_select, stream);
  if (err != 0)
    goto failed_thread_create;

  return 0;

failed_thread_create:
  s->stream = NULL;
  stream->select = NULL;
  *fd = old_fd;

  uv_sem_destroy(&s->async_sem);

failed_async_sem_init:
  uv_sem_destroy(&s->close_sem);

failed_close_sem_init:
  uv__close(fds[0]);
  uv__close(fds[1]);
  uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
  return err;

failed_async_init:
  uv__free(s);

failed_malloc:
  uv__close(fds[0]);
  uv__close(fds[1]);

  return err;
}
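/* Probe sketch (assumed scenario): this is how kqueue-incompatible fds, such
 * as some /dev files, are detected. kqueue() accepts the EVFILT_READ
 * registration for ordinary sockets, but for those fds the dry-run kevent()
 * reports EV_ERROR with data == EINVAL; the stream then falls back to the
 * select() thread and the caller watches s->fake_fd instead of the real fd.
 */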
#endif /* defined(__APPLE__) */


int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
#if defined(__APPLE__)
  int enable;
#endif

  if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
    return UV_EBUSY;

  assert(fd >= 0);
  stream->flags |= flags;

  if (stream->type == UV_TCP) {
    if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
      return UV__ERR(errno);

    /* TODO Use delay the user passed in. */
    if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
        uv__tcp_keepalive(fd, 1, 60)) {
      return UV__ERR(errno);
    }
  }

#if defined(__APPLE__)
  enable = 1;
  if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
      errno != ENOTSOCK &&
      errno != EINVAL) {
    return UV__ERR(errno);
  }
#endif

  stream->io_watcher.fd = fd;

  return 0;
}


void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
  uv_write_t* req;
  struct uv__queue* q;
  while (!uv__queue_empty(&stream->write_queue)) {
    q = uv__queue_head(&stream->write_queue);
    uv__queue_remove(q);

    req = uv__queue_data(q, uv_write_t, queue);
    req->error = error;

    uv__queue_insert_tail(&stream->write_completed_queue, &req->queue);
  }
}


void uv__stream_destroy(uv_stream_t* stream) {
  assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
  assert(stream->flags & UV_HANDLE_CLOSED);

  if (stream->connect_req) {
    uv__req_unregister(stream->loop);
    stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
    stream->connect_req = NULL;
  }

  uv__stream_flush_write_queue(stream, UV_ECANCELED);
  uv__write_callbacks(stream);
  uv__drain(stream);

  assert(stream->write_queue_size == 0);
}


/* Implements a best effort approach to mitigating accept() EMFILE errors.
 * We have a spare file descriptor stashed away that we close to get below
 * the EMFILE limit. Next, we accept all pending connections and close them
 * immediately to signal the clients that we're overloaded - and we are, but
 * we still keep on trucking.
 *
 * There is one caveat: it's not reliable in a multi-threaded environment.
 * The file descriptor limit is per process. Our party trick fails if another
 * thread opens a file or creates a socket in the time window between us
 * calling close() and accept().
 */
static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
  int err;
  int emfile_fd;

  if (loop->emfile_fd == -1)
    return UV_EMFILE;

  uv__close(loop->emfile_fd);
  loop->emfile_fd = -1;

  do {
    err = uv__accept(accept_fd);
    if (err >= 0)
      uv__close(err);
  } while (err >= 0 || err == UV_EINTR);

  emfile_fd = uv__open_cloexec("/", O_RDONLY);
  if (emfile_fd >= 0)
    loop->emfile_fd = emfile_fd;

  return err;
}
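/* Worked scenario (illustration, assuming the spare fd is still available):
 * accept() returns EMFILE, so uv__emfile_trick() closes loop->emfile_fd
 * (reserved in uv__stream_init()) to free exactly one descriptor slot,
 * accepts and immediately closes every pending connection, then re-reserves
 * the slot with uv__open_cloexec("/", O_RDONLY). Once the backlog is empty,
 * uv__accept() typically returns UV_EAGAIN and that is what the caller sees.
 */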


void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;
  int err;
  int fd;

  stream = container_of(w, uv_stream_t, io_watcher);
  assert(events & POLLIN);
  assert(stream->accepted_fd == -1);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  fd = uv__stream_fd(stream);
  err = uv__accept(fd);

  if (err == UV_EMFILE || err == UV_ENFILE)
    err = uv__emfile_trick(loop, fd); /* Shed load. */

  if (err < 0)
    return;

  stream->accepted_fd = err;
  stream->connection_cb(stream, 0);

  if (stream->accepted_fd != -1)
    /* The user hasn't yet called uv_accept(). */
    uv__io_stop(loop, &stream->io_watcher, POLLIN);
}


int uv_accept(uv_stream_t* server, uv_stream_t* client) {
  int err;

  assert(server->loop == client->loop);

  if (server->accepted_fd == -1)
    return UV_EAGAIN;

  switch (client->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      err = uv__stream_open(client,
                            server->accepted_fd,
                            UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
      if (err) {
        /* TODO handle error */
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    case UV_UDP:
      err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
      if (err) {
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    default:
      return UV_EINVAL;
  }

  client->flags |= UV_HANDLE_BOUND;

done:
  /* Process queued fds */
  if (server->queued_fds != NULL) {
    uv__stream_queued_fds_t* queued_fds;

    queued_fds = server->queued_fds;

    /* Read first */
    server->accepted_fd = queued_fds->fds[0];

    /* All read, free */
    assert(queued_fds->offset > 0);
    if (--queued_fds->offset == 0) {
      uv__free(queued_fds);
      server->queued_fds = NULL;
    } else {
      /* Shift rest */
      memmove(queued_fds->fds,
              queued_fds->fds + 1,
              queued_fds->offset * sizeof(*queued_fds->fds));
    }
  } else {
    server->accepted_fd = -1;
    if (err == 0)
      uv__io_start(server->loop, &server->io_watcher, POLLIN);
  }
  return err;
}


int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
  int err;
  if (uv__is_closing(stream)) {
    return UV_EINVAL;
  }
  switch (stream->type) {
  case UV_TCP:
    err = uv__tcp_listen((uv_tcp_t*)stream, backlog, cb);
    break;

  case UV_NAMED_PIPE:
    err = uv__pipe_listen((uv_pipe_t*)stream, backlog, cb);
    break;

  default:
    err = UV_EINVAL;
  }

  if (err == 0)
    uv__handle_start(stream);

  return err;
}
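/* Usage sketch (an example against the public API; on_alloc, on_read and
 * on_close are assumed callbacks, 128 an arbitrary backlog):
 *
 *   static void on_connection(uv_stream_t* server, int status) {
 *     uv_tcp_t* client;
 *     if (status < 0)
 *       return;
 *     client = malloc(sizeof(*client));
 *     uv_tcp_init(server->loop, client);
 *     if (uv_accept(server, (uv_stream_t*) client) == 0)
 *       uv_read_start((uv_stream_t*) client, on_alloc, on_read);
 *     else
 *       uv_close((uv_handle_t*) client, on_close);
 *   }
 *
 *   uv_listen((uv_stream_t*) &server, 128, on_connection);
 */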


static void uv__drain(uv_stream_t* stream) {
  uv_shutdown_t* req;
  int err;

  assert(uv__queue_empty(&stream->write_queue));
  if (!(stream->flags & UV_HANDLE_CLOSING)) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  if (!uv__is_stream_shutting(stream))
    return;

  req = stream->shutdown_req;
  assert(req);

  if ((stream->flags & UV_HANDLE_CLOSING) ||
      !(stream->flags & UV_HANDLE_SHUT)) {
    stream->shutdown_req = NULL;
    uv__req_unregister(stream->loop);

    err = 0;
    if (stream->flags & UV_HANDLE_CLOSING)
      /* The user destroyed the stream before we got to do the shutdown. */
      err = UV_ECANCELED;
    else if (shutdown(uv__stream_fd(stream), SHUT_WR))
      err = UV__ERR(errno);
    else /* Success. */
      stream->flags |= UV_HANDLE_SHUT;

    if (req->cb != NULL)
      req->cb(req, err);
  }
}


static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) {
  if (n == 1)
    return write(fd, vec->iov_base, vec->iov_len);
  else
    return writev(fd, vec, n);
}


static size_t uv__write_req_size(uv_write_t* req) {
  size_t size;

  assert(req->bufs != NULL);
  size = uv__count_bufs(req->bufs + req->write_index,
                        req->nbufs - req->write_index);
  assert(req->handle->write_queue_size >= size);

  return size;
}


/* Returns 1 if all write request data has been written, or 0 if there is still
 * more data to write.
 *
 * Note: the return value only says something about the *current* request.
 * There may still be other write requests sitting in the queue.
 */
static int uv__write_req_update(uv_stream_t* stream,
                                uv_write_t* req,
                                size_t n) {
  uv_buf_t* buf;
  size_t len;

  assert(n <= stream->write_queue_size);
  stream->write_queue_size -= n;

  buf = req->bufs + req->write_index;

  do {
    len = n < buf->len ? n : buf->len;
    if (buf->len != 0)
      buf->base += len;
    buf->len -= len;
    buf += (buf->len == 0);  /* Advance to next buffer if this one is empty. */
    n -= len;
  } while (n > 0);

  req->write_index = buf - req->bufs;

  return req->write_index == req->nbufs;
}
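/* Worked example (illustration): req holds two bufs of len 5 and 7 and
 * writev() reported n == 9. The loop consumes all 5 bytes of bufs[0] (its
 * len drops to 0, buf advances) and 4 bytes of bufs[1] (base += 4, len == 3).
 * write_index ends up at 1 != nbufs, so the function returns 0: the request
 * stays queued with 3 bytes left to write.
 */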


static void uv__write_req_finish(uv_write_t* req) {
  uv_stream_t* stream = req->handle;

  /* Pop the req off tcp->write_queue. */
  uv__queue_remove(&req->queue);

  /* Only free when there was no error. On error, we touch up write_queue_size
   * right before making the callback. The reason we don't do that right away
   * is that a write_queue_size > 0 is our only way to signal to the user that
   * they should stop writing - which they should if we got an error. Something
   * to revisit in future revisions of the libuv API.
   */
  if (req->error == 0) {
    if (req->bufs != req->bufsml)
      uv__free(req->bufs);
    req->bufs = NULL;
  }

  /* Add it to the write_completed_queue where it will have its
   * callback called in the near future.
   */
  uv__queue_insert_tail(&stream->write_completed_queue, &req->queue);
  uv__io_feed(stream->loop, &stream->io_watcher);
}


static int uv__handle_fd(uv_handle_t* handle) {
  switch (handle->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      return ((uv_stream_t*) handle)->io_watcher.fd;

    case UV_UDP:
      return ((uv_udp_t*) handle)->io_watcher.fd;

    default:
      return -1;
  }
}

static int uv__try_write(uv_stream_t* stream,
                         const uv_buf_t bufs[],
                         unsigned int nbufs,
                         uv_stream_t* send_handle) {
  struct iovec* iov;
  int iovmax;
  int iovcnt;
  ssize_t n;

  /*
   * Cast to iovec. We had to have our own uv_buf_t instead of iovec
   * because Windows's WSABUF is not an iovec.
   */
  iov = (struct iovec*) bufs;
  iovcnt = nbufs;

  iovmax = uv__getiovmax();

  /* Limit iov count to avoid EINVALs from writev() */
  if (iovcnt > iovmax)
    iovcnt = iovmax;

  /*
   * Now do the actual writev. Note that we've been updating the pointers
   * inside the iov each time we write. So there is no need to offset it.
   */
  if (send_handle != NULL) {
    int fd_to_send;
    struct msghdr msg;
    union uv__cmsg cmsg;

    if (uv__is_closing(send_handle))
      return UV_EBADF;

    fd_to_send = uv__handle_fd((uv_handle_t*) send_handle);

    memset(&cmsg, 0, sizeof(cmsg));

    assert(fd_to_send >= 0);

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = iovcnt;
    msg.msg_flags = 0;

    msg.msg_control = &cmsg.hdr;
    msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));

    cmsg.hdr.cmsg_level = SOL_SOCKET;
    cmsg.hdr.cmsg_type = SCM_RIGHTS;
    cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(fd_to_send));
    memcpy(CMSG_DATA(&cmsg.hdr), &fd_to_send, sizeof(fd_to_send));

    do
      n = sendmsg(uv__stream_fd(stream), &msg, 0);
    while (n == -1 && errno == EINTR);
  } else {
    do
      n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
    while (n == -1 && errno == EINTR);
  }

  if (n >= 0)
    return n;

  if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
    return UV_EAGAIN;

#ifdef __APPLE__
  /* macOS versions 10.10 and 10.15 - and presumably 10.11 to 10.14, too -
   * have a bug where a race condition causes the kernel to return EPROTOTYPE
   * because the socket isn't fully constructed. It's probably the result of
   * the peer closing the connection and that is why libuv translates it to
   * ECONNRESET. Previously, libuv retried until the EPROTOTYPE error went
   * away but some VPN software causes the same behavior except the error is
   * permanent, not transient, turning the retry mechanism into an infinite
   * loop. See https://github.com/libuv/libuv/pull/482.
   */
  if (errno == EPROTOTYPE)
    return UV_ECONNRESET;
#endif /* __APPLE__ */

  return UV__ERR(errno);
}
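/* Gloss on the SCM_RIGHTS branch above (not upstream text): the kernel
 * duplicates fd_to_send into the receiving process, so both ends hold
 * independent descriptors for the same open file description. The matching
 * receive side of this protocol is uv__stream_recv_cmsg() further down.
 */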

static void uv__write(uv_stream_t* stream) {
  struct uv__queue* q;
  uv_write_t* req;
  ssize_t n;
  int count;

  assert(uv__stream_fd(stream) >= 0);

  /* Prevent loop starvation when the consumer of this stream reads as fast as
   * (or faster than) we can write it. This `count` mechanism does not need to
   * change even if we switch to edge-triggered I/O.
   */
  count = 32;

  for (;;) {
    if (uv__queue_empty(&stream->write_queue))
      return;

    q = uv__queue_head(&stream->write_queue);
    req = uv__queue_data(q, uv_write_t, queue);
    assert(req->handle == stream);

    n = uv__try_write(stream,
                      &(req->bufs[req->write_index]),
                      req->nbufs - req->write_index,
                      req->send_handle);

    /* Ensure the handle isn't sent again in case this is a partial write. */
    if (n >= 0) {
      req->send_handle = NULL;
      if (uv__write_req_update(stream, req, n)) {
        uv__write_req_finish(req);
        if (count-- > 0)
          continue;  /* Start trying to write the next request. */

        return;
      }
    } else if (n != UV_EAGAIN)
      goto error;

    /* If this is a blocking stream, try again. */
    if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
      continue;

    /* We're not done. */
    uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);

    /* Notify select() thread about state change */
    uv__stream_osx_interrupt_select(stream);

    return;
  }

error:
  req->error = n;
  uv__write_req_finish(req);
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  uv__stream_osx_interrupt_select(stream);
}


static void uv__write_callbacks(uv_stream_t* stream) {
  uv_write_t* req;
  struct uv__queue* q;
  struct uv__queue pq;

  if (uv__queue_empty(&stream->write_completed_queue))
    return;

  uv__queue_move(&stream->write_completed_queue, &pq);

  while (!uv__queue_empty(&pq)) {
    /* Pop a req off write_completed_queue. */
    q = uv__queue_head(&pq);
    req = uv__queue_data(q, uv_write_t, queue);
    uv__queue_remove(q);
    uv__req_unregister(stream->loop);

    if (req->bufs != NULL) {
      stream->write_queue_size -= uv__write_req_size(req);
      if (req->bufs != req->bufsml)
        uv__free(req->bufs);
      req->bufs = NULL;
    }

    /* NOTE: call callback AFTER freeing the request data. */
    if (req->cb)
      req->cb(req, req->error);
  }
}


static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
  stream->flags |= UV_HANDLE_READ_EOF;
  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);
  stream->read_cb(stream, UV_EOF, buf);
}


static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
  uv__stream_queued_fds_t* queued_fds;
  unsigned int queue_size;

  queued_fds = stream->queued_fds;
  if (queued_fds == NULL) {
    queue_size = 8;
    queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) +
                            sizeof(*queued_fds));
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    queued_fds->offset = 0;
    stream->queued_fds = queued_fds;

    /* Grow */
  } else if (queued_fds->size == queued_fds->offset) {
    queue_size = queued_fds->size + 8;
    queued_fds = uv__realloc(queued_fds,
                             (queue_size - 1) * sizeof(*queued_fds->fds) +
                             sizeof(*queued_fds));

    /*
     * Allocation failure, report back.
     * NOTE: if it is fatal - sockets will be closed in uv__stream_close
     */
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    stream->queued_fds = queued_fds;
  }

  /* Put fd in a queue */
  queued_fds->fds[queued_fds->offset++] = fd;

  return 0;
}


static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
  struct cmsghdr* cmsg;
  char* p;
  char* pe;
  int fd;
  int err;
  size_t count;

  err = 0;
  for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
    if (cmsg->cmsg_type != SCM_RIGHTS) {
      fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
              cmsg->cmsg_type);
      continue;
    }

    assert(cmsg->cmsg_len >= CMSG_LEN(0));
    count = cmsg->cmsg_len - CMSG_LEN(0);
    assert(count % sizeof(fd) == 0);
    count /= sizeof(fd);

    p = (void*) CMSG_DATA(cmsg);
    pe = p + count * sizeof(fd);

    while (p < pe) {
      memcpy(&fd, p, sizeof(fd));
      p += sizeof(fd);

      if (err == 0) {
        if (stream->accepted_fd == -1)
          stream->accepted_fd = fd;
        else
          err = uv__stream_queue_fd(stream, fd);
      }

      if (err != 0)
        uv__close(fd);
    }
  }

  return err;
}


static void uv__read(uv_stream_t* stream) {
  uv_buf_t buf;
  ssize_t nread;
  struct msghdr msg;
  union uv__cmsg cmsg;
  int count;
  int err;
  int is_ipc;

  stream->flags &= ~UV_HANDLE_READ_PARTIAL;

  /* Prevent loop starvation when the data comes in as fast as (or faster than)
   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
   */
  count = 32;

  is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;

  /* XXX: Maybe instead of having UV_HANDLE_READING we just test if
   * tcp->read_cb is NULL or not?
   */
  while (stream->read_cb
      && (stream->flags & UV_HANDLE_READING)
      && (count-- > 0)) {
    assert(stream->alloc_cb != NULL);

    buf = uv_buf_init(NULL, 0);
    stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
    if (buf.base == NULL || buf.len == 0) {
      /* User indicates it can't or won't handle the read. */
      stream->read_cb(stream, UV_ENOBUFS, &buf);
      return;
    }

    assert(buf.base != NULL);
    assert(uv__stream_fd(stream) >= 0);

    if (!is_ipc) {
      do {
        nread = read(uv__stream_fd(stream), buf.base, buf.len);
      }
      while (nread < 0 && errno == EINTR);
    } else {
      /* ipc uses recvmsg */
      msg.msg_flags = 0;
      msg.msg_iov = (struct iovec*) &buf;
      msg.msg_iovlen = 1;
      msg.msg_name = NULL;
      msg.msg_namelen = 0;
      /* Set up to receive a descriptor even if one isn't in the message */
      msg.msg_controllen = sizeof(cmsg);
      msg.msg_control = &cmsg.hdr;

      do {
        nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
      }
      while (nread < 0 && errno == EINTR);
    }

    if (nread < 0) {
      /* Error */
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        /* Wait for the next one. */
        if (stream->flags & UV_HANDLE_READING) {
          uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
          uv__stream_osx_interrupt_select(stream);
        }
        stream->read_cb(stream, 0, &buf);
#if defined(__CYGWIN__) || defined(__MSYS__)
      } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
        uv__stream_eof(stream, &buf);
        return;
#endif
      } else {
        /* Error. User should call uv_close(). */
        stream->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
        stream->read_cb(stream, UV__ERR(errno), &buf);
        if (stream->flags & UV_HANDLE_READING) {
          stream->flags &= ~UV_HANDLE_READING;
          uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
          uv__handle_stop(stream);
          uv__stream_osx_interrupt_select(stream);
        }
      }
      return;
    } else if (nread == 0) {
      uv__stream_eof(stream, &buf);
      return;
    } else {
      /* Successful read */
      ssize_t buflen = buf.len;

      if (is_ipc) {
        err = uv__stream_recv_cmsg(stream, &msg);
        if (err != 0) {
          stream->read_cb(stream, err, &buf);
          return;
        }
      }

#if defined(__MVS__)
      if (is_ipc && msg.msg_controllen > 0) {
        uv_buf_t blankbuf;
        int nread;
        struct iovec *old;

        blankbuf.base = 0;
        blankbuf.len = 0;
        old = msg.msg_iov;
        msg.msg_iov = (struct iovec*) &blankbuf;
        nread = 0;
        do {
          nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
          err = uv__stream_recv_cmsg(stream, &msg);
          if (err != 0) {
            stream->read_cb(stream, err, &buf);
            msg.msg_iov = old;
            return;
          }
        } while (nread == 0 && msg.msg_controllen > 0);
        msg.msg_iov = old;
      }
#endif
      stream->read_cb(stream, nread, &buf);

      /* Return if we didn't fill the buffer, there is no more data to read. */
      if (nread < buflen) {
        stream->flags |= UV_HANDLE_READ_PARTIAL;
        return;
      }
    }
  }
}
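/* Callback-contract sketch (example against the public API; consume() is a
 * hypothetical helper): the alloc_cb/read_cb pair that uv__read() drives.
 *
 *   static void on_alloc(uv_handle_t* h, size_t suggested, uv_buf_t* buf) {
 *     buf->base = malloc(suggested);  // NULL => read_cb sees UV_ENOBUFS
 *     buf->len = buf->base == NULL ? 0 : suggested;
 *   }
 *
 *   static void on_read(uv_stream_t* s, ssize_t nread, const uv_buf_t* buf) {
 *     if (nread > 0)
 *       consume(buf->base, nread);
 *     else if (nread < 0)              // includes UV_EOF
 *       uv_close((uv_handle_t*) s, NULL);
 *     free(buf->base);                 // nread == 0 still hands the buffer back
 *   }
 */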


int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
  assert(stream->type == UV_TCP ||
         stream->type == UV_TTY ||
         stream->type == UV_NAMED_PIPE);

  if (!(stream->flags & UV_HANDLE_WRITABLE) ||
      stream->flags & UV_HANDLE_SHUT ||
      uv__is_stream_shutting(stream) ||
      uv__is_closing(stream)) {
    return UV_ENOTCONN;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Initialize request. The `shutdown(2)` call will always be deferred until
   * `uv__drain`, just before the callback is run. */
  uv__req_init(stream->loop, req, UV_SHUTDOWN);
  req->handle = stream;
  req->cb = cb;
  stream->shutdown_req = req;
  stream->flags &= ~UV_HANDLE_WRITABLE;

  if (uv__queue_empty(&stream->write_queue))
    uv__io_feed(stream->loop, &stream->io_watcher);

  return 0;
}
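/* Usage sketch (on_shutdown is an assumed callback): request a half-close
 * after the last write; shutdown(2) itself is deferred to uv__drain(), so
 * queued writes complete first.
 *
 *   static void on_shutdown(uv_shutdown_t* req, int status) {
 *     free(req);  // status is 0, or e.g. UV_ECANCELED if the handle closed
 *   }
 *
 *   uv_shutdown_t* req = malloc(sizeof(*req));
 *   uv_shutdown(req, (uv_stream_t*) &client, on_shutdown);
 */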


static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;

  stream = container_of(w, uv_stream_t, io_watcher);

  assert(stream->type == UV_TCP ||
         stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  if (stream->connect_req) {
    uv__stream_connect(stream);
    return;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
  if (events & (POLLIN | POLLERR | POLLHUP))
    uv__read(stream);

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  /* Short-circuit iff POLLHUP is set, the user is still interested in read
   * events and uv__read() reported a partial read but not EOF. If the EOF
   * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
   * have to do anything. If the partial read flag is not set, we can't
   * report the EOF yet because there is still data to read.
   */
  if ((events & POLLHUP) &&
      (stream->flags & UV_HANDLE_READING) &&
      (stream->flags & UV_HANDLE_READ_PARTIAL) &&
      !(stream->flags & UV_HANDLE_READ_EOF)) {
    uv_buf_t buf = { NULL, 0 };
    uv__stream_eof(stream, &buf);
  }

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  if (events & (POLLOUT | POLLERR | POLLHUP)) {
    uv__write(stream);
    uv__write_callbacks(stream);

    /* Write queue drained. */
    if (uv__queue_empty(&stream->write_queue))
      uv__drain(stream);
  }
}


/**
 * We get called here directly following a call to connect(2).
 * To determine whether we've errored out or succeeded, we must call
 * getsockopt.
 */
static void uv__stream_connect(uv_stream_t* stream) {
  int error;
  uv_connect_t* req = stream->connect_req;
  socklen_t errorsize = sizeof(int);

  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
  assert(req);

  if (stream->delayed_error) {
    /* To smooth over the differences between unixes, errors that were
     * reported synchronously on the first connect can be delayed until
     * the next tick--which is now.
     */
    error = stream->delayed_error;
    stream->delayed_error = 0;
  } else {
    /* Normal situation: we need to get the socket error from the kernel. */
    assert(uv__stream_fd(stream) >= 0);
    getsockopt(uv__stream_fd(stream),
               SOL_SOCKET,
               SO_ERROR,
               &error,
               &errorsize);
    error = UV__ERR(error);
  }

  if (error == UV__ERR(EINPROGRESS))
    return;

  stream->connect_req = NULL;
  uv__req_unregister(stream->loop);

  if (error < 0 || uv__queue_empty(&stream->write_queue)) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  }

  if (req->cb)
    req->cb(req, error);

  if (uv__stream_fd(stream) == -1)
    return;

  if (error < 0) {
    uv__stream_flush_write_queue(stream, UV_ECANCELED);
    uv__write_callbacks(stream);
  }
}


static int uv__check_before_write(uv_stream_t* stream,
                                  unsigned int nbufs,
                                  uv_stream_t* send_handle) {
  assert(nbufs > 0);
  assert((stream->type == UV_TCP ||
          stream->type == UV_NAMED_PIPE ||
          stream->type == UV_TTY) &&
         "uv_write (unix) does not yet support other types of streams");

  if (uv__stream_fd(stream) < 0)
    return UV_EBADF;

  if (!(stream->flags & UV_HANDLE_WRITABLE))
    return UV_EPIPE;

  if (send_handle != NULL) {
    if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
      return UV_EINVAL;

    /* XXX We abuse uv_write2() to send over UDP handles to child processes.
     * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
     * evaluates to a function that operates on a uv_stream_t with a couple of
     * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
     * which works but only by accident.
     */
    if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
      return UV_EBADF;

#if defined(__CYGWIN__) || defined(__MSYS__)
    /* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it.
       See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */
    return UV_ENOSYS;
#endif
  }

  return 0;
}

int uv_write2(uv_write_t* req,
              uv_stream_t* stream,
              const uv_buf_t bufs[],
              unsigned int nbufs,
              uv_stream_t* send_handle,
              uv_write_cb cb) {
  int empty_queue;
  int err;

  err = uv__check_before_write(stream, nbufs, send_handle);
  if (err < 0)
    return err;

  /* It's legal for write_queue_size > 0 even when the write_queue is empty;
   * it means there are error-state requests in the write_completed_queue that
   * will touch up write_queue_size later, see also uv__write_req_finish().
   * We could check that write_queue is empty instead but that implies making
   * a write() syscall when we know that the handle is in error mode.
   */
  empty_queue = (stream->write_queue_size == 0);

  /* Initialize the req */
  uv__req_init(stream->loop, req, UV_WRITE);
  req->cb = cb;
  req->handle = stream;
  req->error = 0;
  req->send_handle = send_handle;
  uv__queue_init(&req->queue);

  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));

  if (req->bufs == NULL)
    return UV_ENOMEM;

  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
  req->nbufs = nbufs;
  req->write_index = 0;
  stream->write_queue_size += uv__count_bufs(bufs, nbufs);

  /* Append the request to write_queue. */
  uv__queue_insert_tail(&stream->write_queue, &req->queue);

  /* If the queue was empty when this function began, we should attempt to
   * do the write immediately. Otherwise start the write_watcher and wait
   * for the fd to become writable.
   */
  if (stream->connect_req) {
    /* Still connecting, do nothing. */
  }
  else if (empty_queue) {
    uv__write(stream);
  }
  else {
    /*
     * Blocking streams should never have anything in the queue.
     * If this assert fires then somehow the blocking stream isn't being
     * sufficiently flushed in uv__write.
     */
    assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
    uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  return 0;
}
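/* Usage sketch (names are assumptions): uv_write2() is the way a handle
 * rides along with payload bytes over an IPC pipe, e.g. handing a TCP
 * connection to a worker process. ipc_pipe must have been created with
 * uv_pipe_init(loop, &ipc_pipe, 1) (ipc flag set), or
 * uv__check_before_write() returns UV_EINVAL.
 *
 *   uv_buf_t buf = uv_buf_init("hello", 5);  // at least one byte is required
 *   uv_write_t* req = malloc(sizeof(*req));
 *   uv_write2(req, (uv_stream_t*) &ipc_pipe, &buf, 1,
 *             (uv_stream_t*) &tcp_client, on_write_done);
 */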


/* The buffers to be written must remain valid until the callback is called.
 * This is not required for the uv_buf_t array.
 */
int uv_write(uv_write_t* req,
             uv_stream_t* handle,
             const uv_buf_t bufs[],
             unsigned int nbufs,
             uv_write_cb cb) {
  return uv_write2(req, handle, bufs, nbufs, NULL, cb);
}


int uv_try_write(uv_stream_t* stream,
                 const uv_buf_t bufs[],
                 unsigned int nbufs) {
  return uv_try_write2(stream, bufs, nbufs, NULL);
}


int uv_try_write2(uv_stream_t* stream,
                  const uv_buf_t bufs[],
                  unsigned int nbufs,
                  uv_stream_t* send_handle) {
  int err;

  /* Connecting or already writing some data */
  if (stream->connect_req != NULL || stream->write_queue_size != 0)
    return UV_EAGAIN;

  err = uv__check_before_write(stream, nbufs, NULL);
  if (err < 0)
    return err;

  return uv__try_write(stream, bufs, nbufs, send_handle);
}
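/* Pattern sketch (illustration; error paths other than UV_EAGAIN omitted):
 * uv_try_write() never queues, so a common pattern is to try the synchronous
 * path and queue only what did not fit.
 *
 *   int n = uv_try_write(stream, &buf, 1);
 *   if (n == UV_EAGAIN)
 *     n = 0;  // nothing was written, queue the whole buffer
 *   if (n >= 0 && (size_t) n < buf.len) {
 *     uv_buf_t rest = uv_buf_init(buf.base + n, buf.len - n);
 *     uv_write(req, stream, &rest, 1, on_write_done);  // assumed req and cb
 *   }
 */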


int uv__read_start(uv_stream_t* stream,
                   uv_alloc_cb alloc_cb,
                   uv_read_cb read_cb) {
  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);

  /* The UV_HANDLE_READING flag is independent of the state of the stream;
   * it just expresses the desired state of the user. */
  stream->flags |= UV_HANDLE_READING;
  stream->flags &= ~UV_HANDLE_READ_EOF;

  /* TODO: try to do the read inline? */
  assert(uv__stream_fd(stream) >= 0);
  assert(alloc_cb);

  stream->read_cb = read_cb;
  stream->alloc_cb = alloc_cb;

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_start(stream);
  uv__stream_osx_interrupt_select(stream);

  return 0;
}


int uv_read_stop(uv_stream_t* stream) {
  if (!(stream->flags & UV_HANDLE_READING))
    return 0;

  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);

  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  return 0;
}


int uv_is_readable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_READABLE);
}


int uv_is_writable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_WRITABLE);
}


#if defined(__APPLE__)
int uv___stream_fd(const uv_stream_t* handle) {
  const uv__stream_select_t* s;

  assert(handle->type == UV_TCP ||
         handle->type == UV_TTY ||
         handle->type == UV_NAMED_PIPE);

  s = handle->select;
  if (s != NULL)
    return s->fd;

  return handle->io_watcher.fd;
}
#endif /* defined(__APPLE__) */


void uv__stream_close(uv_stream_t* handle) {
  unsigned int i;
  uv__stream_queued_fds_t* queued_fds;

#if defined(__APPLE__)
  /* Terminate select loop first */
  if (handle->select != NULL) {
    uv__stream_select_t* s;

    s = handle->select;

    uv_sem_post(&s->close_sem);
    uv_sem_post(&s->async_sem);
    uv__stream_osx_interrupt_select(handle);
    uv_thread_join(&s->thread);
    uv_sem_destroy(&s->close_sem);
    uv_sem_destroy(&s->async_sem);
    uv__close(s->fake_fd);
    uv__close(s->int_fd);
    uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);

    handle->select = NULL;
  }
#endif /* defined(__APPLE__) */

  uv__io_close(handle->loop, &handle->io_watcher);
  uv_read_stop(handle);
  uv__handle_stop(handle);
  handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);

  if (handle->io_watcher.fd != -1) {
    /* Don't close stdio file descriptors. Nothing good comes from it. */
    if (handle->io_watcher.fd > STDERR_FILENO)
      uv__close(handle->io_watcher.fd);
    handle->io_watcher.fd = -1;
  }

  if (handle->accepted_fd != -1) {
    uv__close(handle->accepted_fd);
    handle->accepted_fd = -1;
  }

  /* Close all queued fds */
  if (handle->queued_fds != NULL) {
    queued_fds = handle->queued_fds;
    for (i = 0; i < queued_fds->offset; i++)
      uv__close(queued_fds->fds[i]);
    uv__free(handle->queued_fds);
    handle->queued_fds = NULL;
  }

  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
}


int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
  /* Don't need to check the file descriptor, uv__nonblock()
   * will fail with EBADF if it's not valid.
   */
  return uv__nonblock(uv__stream_fd(handle), !blocking);
}
```