comparison third_party/libuv/src/unix/udp.c @ 160:948de3f54cea

[ThirdParty] Added libuv
author June Park <parkjune1995@gmail.com>
date Wed, 14 Jan 2026 19:39:52 -0800
parents
children
comparison
equal deleted inserted replaced
159:05cf9467a1c3 160:948de3f54cea
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22 #include "uv.h"
23 #include "internal.h"
24
25 #include <assert.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <stdlib.h>
29 #include <unistd.h>
30 #if defined(__MVS__)
31 #include <xti.h>
32 #endif
33 #include <sys/un.h>
34
35 #if defined(IPV6_JOIN_GROUP) && !defined(IPV6_ADD_MEMBERSHIP)
36 # define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
37 #endif
38
39 #if defined(IPV6_LEAVE_GROUP) && !defined(IPV6_DROP_MEMBERSHIP)
40 # define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
41 #endif
42
43 static void uv__udp_run_completed(uv_udp_t* handle);
44 static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
45 static void uv__udp_recvmsg(uv_udp_t* handle);
46 static void uv__udp_sendmsg(uv_udp_t* handle);
47 static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
48 int domain,
49 unsigned int flags);
50 static int uv__udp_sendmsg1(int fd,
51 const uv_buf_t* bufs,
52 unsigned int nbufs,
53 const struct sockaddr* addr);
54
55
56 void uv__udp_close(uv_udp_t* handle) {
57 uv__io_close(handle->loop, &handle->io_watcher);
58 uv__handle_stop(handle);
59
60 if (handle->io_watcher.fd != -1) {
61 uv__close(handle->io_watcher.fd);
62 handle->io_watcher.fd = -1;
63 }
64 }
65
66
67 void uv__udp_finish_close(uv_udp_t* handle) {
68 uv_udp_send_t* req;
69 struct uv__queue* q;
70
71 assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
72 assert(handle->io_watcher.fd == -1);
73
74 while (!uv__queue_empty(&handle->write_queue)) {
75 q = uv__queue_head(&handle->write_queue);
76 uv__queue_remove(q);
77
78 req = uv__queue_data(q, uv_udp_send_t, queue);
79 req->status = UV_ECANCELED;
80 uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
81 }
82
83 uv__udp_run_completed(handle);
84
85 assert(handle->send_queue_size == 0);
86 assert(handle->send_queue_count == 0);
87
88 /* Now tear down the handle. */
89 handle->recv_cb = NULL;
90 handle->alloc_cb = NULL;
91 /* but _do not_ touch close_cb */
92 }
93
94
95 static void uv__udp_run_completed(uv_udp_t* handle) {
96 uv_udp_send_t* req;
97 struct uv__queue* q;
98
99 assert(!(handle->flags & UV_HANDLE_UDP_PROCESSING));
100 handle->flags |= UV_HANDLE_UDP_PROCESSING;
101
102 while (!uv__queue_empty(&handle->write_completed_queue)) {
103 q = uv__queue_head(&handle->write_completed_queue);
104 uv__queue_remove(q);
105
106 req = uv__queue_data(q, uv_udp_send_t, queue);
107 uv__req_unregister(handle->loop);
108
109 handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);
110 handle->send_queue_count--;
111
112 if (req->bufs != req->bufsml)
113 uv__free(req->bufs);
114 req->bufs = NULL;
115
116 if (req->send_cb == NULL)
117 continue;
118
119 /* req->status >= 0 == bytes written
120 * req->status < 0 == errno
121 */
122 if (req->status >= 0)
123 req->send_cb(req, 0);
124 else
125 req->send_cb(req, req->status);
126 }
127
128 if (uv__queue_empty(&handle->write_queue)) {
129 /* Pending queue and completion queue empty, stop watcher. */
130 uv__io_stop(handle->loop, &handle->io_watcher, POLLOUT);
131 if (!uv__io_active(&handle->io_watcher, POLLIN))
132 uv__handle_stop(handle);
133 }
134
135 handle->flags &= ~UV_HANDLE_UDP_PROCESSING;
136 }
137
138
139 static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
140 uv_udp_t* handle;
141
142 handle = container_of(w, uv_udp_t, io_watcher);
143 assert(handle->type == UV_UDP);
144
145 if (revents & POLLIN)
146 uv__udp_recvmsg(handle);
147
148 if (revents & POLLOUT && !uv__is_closing(handle)) {
149 uv__udp_sendmsg(handle);
150 uv__udp_run_completed(handle);
151 }
152 }
153
154 static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf) {
155 #if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
156 struct sockaddr_in6 peers[20];
157 struct iovec iov[ARRAY_SIZE(peers)];
158 struct mmsghdr msgs[ARRAY_SIZE(peers)];
159 ssize_t nread;
160 uv_buf_t chunk_buf;
161 size_t chunks;
162 int flags;
163 size_t k;
164
165 /* prepare structures for recvmmsg */
166 chunks = buf->len / UV__UDP_DGRAM_MAXSIZE;
167 if (chunks > ARRAY_SIZE(iov))
168 chunks = ARRAY_SIZE(iov);
169 for (k = 0; k < chunks; ++k) {
170 iov[k].iov_base = buf->base + k * UV__UDP_DGRAM_MAXSIZE;
171 iov[k].iov_len = UV__UDP_DGRAM_MAXSIZE;
172 memset(&msgs[k].msg_hdr, 0, sizeof(msgs[k].msg_hdr));
173 msgs[k].msg_hdr.msg_iov = iov + k;
174 msgs[k].msg_hdr.msg_iovlen = 1;
175 msgs[k].msg_hdr.msg_name = peers + k;
176 msgs[k].msg_hdr.msg_namelen = sizeof(peers[0]);
177 msgs[k].msg_hdr.msg_control = NULL;
178 msgs[k].msg_hdr.msg_controllen = 0;
179 msgs[k].msg_hdr.msg_flags = 0;
180 msgs[k].msg_len = 0;
181 }
182
183 #if defined(__APPLE__)
184 do
185 nread = recvmsg_x(handle->io_watcher.fd, msgs, chunks, MSG_DONTWAIT);
186 while (nread == -1 && errno == EINTR);
187 #else
188 do
189 nread = recvmmsg(handle->io_watcher.fd, msgs, chunks, 0, NULL);
190 while (nread == -1 && errno == EINTR);
191 #endif
192
193 if (nread < 1) {
194 if (nread == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
195 handle->recv_cb(handle, 0, buf, NULL, 0);
196 else
197 handle->recv_cb(handle, UV__ERR(errno), buf, NULL, 0);
198 } else {
199 /* pass each chunk to the application */
200 for (k = 0; k < (size_t) nread && handle->recv_cb != NULL; k++) {
201 flags = UV_UDP_MMSG_CHUNK;
202 if (msgs[k].msg_hdr.msg_flags & MSG_TRUNC)
203 flags |= UV_UDP_PARTIAL;
204
205 chunk_buf = uv_buf_init(iov[k].iov_base, iov[k].iov_len);
206 handle->recv_cb(handle,
207 msgs[k].msg_len,
208 &chunk_buf,
209 msgs[k].msg_hdr.msg_name,
210 flags);
211 }
212
213 /* one last callback so the original buffer is freed */
214 if (handle->recv_cb != NULL)
215 handle->recv_cb(handle, 0, buf, NULL, UV_UDP_MMSG_FREE);
216 }
217 return nread;
218 #else /* __linux__ || ____FreeBSD__ || __APPLE__ */
219 return UV_ENOSYS;
220 #endif /* __linux__ || ____FreeBSD__ || __APPLE__ */
221 }
222
223 static void uv__udp_recvmsg(uv_udp_t* handle) {
224 struct sockaddr_storage peer;
225 struct msghdr h;
226 ssize_t nread;
227 uv_buf_t buf;
228 int flags;
229 int count;
230
231 assert(handle->recv_cb != NULL);
232 assert(handle->alloc_cb != NULL);
233
234 /* Prevent loop starvation when the data comes in as fast as (or faster than)
235 * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
236 */
237 count = 32;
238
239 do {
240 buf = uv_buf_init(NULL, 0);
241 handle->alloc_cb((uv_handle_t*) handle, UV__UDP_DGRAM_MAXSIZE, &buf);
242 if (buf.base == NULL || buf.len == 0) {
243 handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0);
244 return;
245 }
246 assert(buf.base != NULL);
247
248 if (uv_udp_using_recvmmsg(handle)) {
249 nread = uv__udp_recvmmsg(handle, &buf);
250 if (nread > 0)
251 count -= nread;
252 continue;
253 }
254
255 memset(&h, 0, sizeof(h));
256 memset(&peer, 0, sizeof(peer));
257 h.msg_name = &peer;
258 h.msg_namelen = sizeof(peer);
259 h.msg_iov = (void*) &buf;
260 h.msg_iovlen = 1;
261
262 do {
263 nread = recvmsg(handle->io_watcher.fd, &h, 0);
264 }
265 while (nread == -1 && errno == EINTR);
266
267 if (nread == -1) {
268 if (errno == EAGAIN || errno == EWOULDBLOCK)
269 handle->recv_cb(handle, 0, &buf, NULL, 0);
270 else
271 handle->recv_cb(handle, UV__ERR(errno), &buf, NULL, 0);
272 }
273 else {
274 flags = 0;
275 if (h.msg_flags & MSG_TRUNC)
276 flags |= UV_UDP_PARTIAL;
277
278 handle->recv_cb(handle, nread, &buf, (const struct sockaddr*) &peer, flags);
279 }
280 count--;
281 }
282 /* recv_cb callback may decide to pause or close the handle */
283 while (nread != -1
284 && count > 0
285 && handle->io_watcher.fd != -1
286 && handle->recv_cb != NULL);
287 }
288
289
290 /* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
291 * refinements for programs that use multicast. Therefore we preferentially
292 * set SO_REUSEPORT over SO_REUSEADDR here, but we set SO_REUSEPORT only
293 * when that socket option doesn't have the capability of load balancing.
294 * Otherwise, we fall back to SO_REUSEADDR.
295 *
296 * Linux as of 3.9, DragonflyBSD 3.6, AIX 7.2.5 have the SO_REUSEPORT socket
297 * option but with semantics that are different from the BSDs: it _shares_
298 * the port rather than steals it from the current listener. While useful,
299 * it's not something we can emulate on other platforms so we don't enable it.
300 *
301 * zOS does not support getsockname with SO_REUSEPORT option when using
302 * AF_UNIX.
303 *
304 * Solaris 11.4: SO_REUSEPORT will not load balance when SO_REUSEADDR
305 * is also set, but it's not valid for every socket type.
306 */
307 static int uv__sock_reuseaddr(int fd) {
308 int yes;
309 yes = 1;
310
311 #if defined(SO_REUSEPORT) && defined(__MVS__)
312 struct sockaddr_in sockfd;
313 unsigned int sockfd_len = sizeof(sockfd);
314 if (getsockname(fd, (struct sockaddr*) &sockfd, &sockfd_len) == -1)
315 return UV__ERR(errno);
316 if (sockfd.sin_family == AF_UNIX) {
317 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
318 return UV__ERR(errno);
319 } else {
320 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
321 return UV__ERR(errno);
322 }
323 #elif defined(SO_REUSEPORT) && defined(UV__SOLARIS_11_4) && UV__SOLARIS_11_4
324 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes))) {
325 if (errno != ENOPROTOOPT)
326 return UV__ERR(errno);
327 /* Not all socket types accept SO_REUSEPORT. */
328 errno = 0;
329 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
330 return UV__ERR(errno);
331 }
332 #elif defined(SO_REUSEPORT) && \
333 !defined(__linux__) && !defined(__GNU__) && \
334 !defined(__illumos__) && !defined(__DragonFly__) && !defined(_AIX73)
335 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
336 return UV__ERR(errno);
337 #else
338 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
339 return UV__ERR(errno);
340 #endif
341
342 return 0;
343 }
344
345 /*
346 * The Linux kernel suppresses some ICMP error messages by default for UDP
347 * sockets. Setting IP_RECVERR/IPV6_RECVERR on the socket enables full ICMP
348 * error reporting, hopefully resulting in faster failover to working name
349 * servers.
350 */
351 static int uv__set_recverr(int fd, sa_family_t ss_family) {
352 #if defined(__linux__)
353 int yes;
354
355 yes = 1;
356 if (ss_family == AF_INET) {
357 if (setsockopt(fd, IPPROTO_IP, IP_RECVERR, &yes, sizeof(yes)))
358 return UV__ERR(errno);
359 } else if (ss_family == AF_INET6) {
360 if (setsockopt(fd, IPPROTO_IPV6, IPV6_RECVERR, &yes, sizeof(yes)))
361 return UV__ERR(errno);
362 }
363 #endif
364 return 0;
365 }
366
367
368 int uv__udp_bind(uv_udp_t* handle,
369 const struct sockaddr* addr,
370 unsigned int addrlen,
371 unsigned int flags) {
372 int err;
373 int yes;
374 int fd;
375
376 /* Check for bad flags. */
377 if (flags & ~(UV_UDP_IPV6ONLY | UV_UDP_REUSEADDR |
378 UV_UDP_REUSEPORT | UV_UDP_LINUX_RECVERR))
379 return UV_EINVAL;
380
381 /* Cannot set IPv6-only mode on non-IPv6 socket. */
382 if ((flags & UV_UDP_IPV6ONLY) && addr->sa_family != AF_INET6)
383 return UV_EINVAL;
384
385 fd = handle->io_watcher.fd;
386 if (fd == -1) {
387 err = uv__socket(addr->sa_family, SOCK_DGRAM, 0);
388 if (err < 0)
389 return err;
390 fd = err;
391 handle->io_watcher.fd = fd;
392 }
393
394 if (flags & UV_UDP_LINUX_RECVERR) {
395 err = uv__set_recverr(fd, addr->sa_family);
396 if (err)
397 return err;
398 }
399
400 if (flags & UV_UDP_REUSEADDR) {
401 err = uv__sock_reuseaddr(fd);
402 if (err)
403 return err;
404 }
405
406 if (flags & UV_UDP_REUSEPORT) {
407 err = uv__sock_reuseport(fd);
408 if (err)
409 return err;
410 }
411
412 if (flags & UV_UDP_IPV6ONLY) {
413 #ifdef IPV6_V6ONLY
414 yes = 1;
415 if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof yes) == -1) {
416 err = UV__ERR(errno);
417 return err;
418 }
419 #else
420 err = UV_ENOTSUP;
421 return err;
422 #endif
423 }
424
425 if (bind(fd, addr, addrlen)) {
426 err = UV__ERR(errno);
427 if (errno == EAFNOSUPPORT)
428 /* OSX, other BSDs and SunoS fail with EAFNOSUPPORT when binding a
429 * socket created with AF_INET to an AF_INET6 address or vice versa. */
430 err = UV_EINVAL;
431 return err;
432 }
433
434 if (addr->sa_family == AF_INET6)
435 handle->flags |= UV_HANDLE_IPV6;
436
437 handle->flags |= UV_HANDLE_BOUND;
438 return 0;
439 }
440
441
442 static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
443 int domain,
444 unsigned int flags) {
445 union uv__sockaddr taddr;
446 socklen_t addrlen;
447
448 if (handle->io_watcher.fd != -1)
449 return 0;
450
451 switch (domain) {
452 case AF_INET:
453 {
454 struct sockaddr_in* addr = &taddr.in;
455 memset(addr, 0, sizeof *addr);
456 addr->sin_family = AF_INET;
457 addr->sin_addr.s_addr = INADDR_ANY;
458 addrlen = sizeof *addr;
459 break;
460 }
461 case AF_INET6:
462 {
463 struct sockaddr_in6* addr = &taddr.in6;
464 memset(addr, 0, sizeof *addr);
465 addr->sin6_family = AF_INET6;
466 addr->sin6_addr = in6addr_any;
467 addrlen = sizeof *addr;
468 break;
469 }
470 default:
471 assert(0 && "unsupported address family");
472 abort();
473 }
474
475 return uv__udp_bind(handle, &taddr.addr, addrlen, flags);
476 }
477
478
479 int uv__udp_connect(uv_udp_t* handle,
480 const struct sockaddr* addr,
481 unsigned int addrlen) {
482 int err;
483
484 err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
485 if (err)
486 return err;
487
488 do {
489 errno = 0;
490 err = connect(handle->io_watcher.fd, addr, addrlen);
491 } while (err == -1 && errno == EINTR);
492
493 if (err)
494 return UV__ERR(errno);
495
496 handle->flags |= UV_HANDLE_UDP_CONNECTED;
497
498 return 0;
499 }
500
501 /* From https://pubs.opengroup.org/onlinepubs/9699919799/functions/connect.html
502 * Any of uv supported UNIXs kernel should be standardized, but the kernel
503 * implementation logic not same, let's use pseudocode to explain the udp
504 * disconnect behaviors:
505 *
506 * Predefined stubs for pseudocode:
507 * 1. sodisconnect: The function to perform the real udp disconnect
508 * 2. pru_connect: The function to perform the real udp connect
509 * 3. so: The kernel object match with socket fd
510 * 4. addr: The sockaddr parameter from user space
511 *
512 * BSDs:
513 * if(sodisconnect(so) == 0) { // udp disconnect succeed
514 * if (addr->sa_len != so->addr->sa_len) return EINVAL;
515 * if (addr->sa_family != so->addr->sa_family) return EAFNOSUPPORT;
516 * pru_connect(so);
517 * }
518 * else return EISCONN;
519 *
520 * z/OS (same with Windows):
521 * if(addr->sa_len < so->addr->sa_len) return EINVAL;
522 * if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
523 *
524 * AIX:
525 * if(addr->sa_len != sizeof(struct sockaddr)) return EINVAL; // ignore ip proto version
526 * if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
527 *
528 * Linux,Others:
529 * if(addr->sa_len < sizeof(struct sockaddr)) return EINVAL;
530 * if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
531 */
532 int uv__udp_disconnect(uv_udp_t* handle) {
533 int r;
534 #if defined(__MVS__)
535 struct sockaddr_storage addr;
536 #else
537 struct sockaddr addr;
538 #endif
539
540 memset(&addr, 0, sizeof(addr));
541
542 #if defined(__MVS__)
543 addr.ss_family = AF_UNSPEC;
544 #else
545 addr.sa_family = AF_UNSPEC;
546 #endif
547
548 do {
549 errno = 0;
550 #ifdef __PASE__
551 /* On IBMi a connectionless transport socket can be disconnected by
552 * either setting the addr parameter to NULL or setting the
553 * addr_length parameter to zero, and issuing another connect().
554 * https://www.ibm.com/docs/en/i/7.4?topic=ssw_ibm_i_74/apis/connec.htm
555 */
556 r = connect(handle->io_watcher.fd, (struct sockaddr*) NULL, 0);
557 #else
558 r = connect(handle->io_watcher.fd, (struct sockaddr*) &addr, sizeof(addr));
559 #endif
560 } while (r == -1 && errno == EINTR);
561
562 if (r == -1) {
563 #if defined(BSD) /* The macro BSD is from sys/param.h */
564 if (errno != EAFNOSUPPORT && errno != EINVAL)
565 return UV__ERR(errno);
566 #else
567 return UV__ERR(errno);
568 #endif
569 }
570
571 handle->flags &= ~UV_HANDLE_UDP_CONNECTED;
572 return 0;
573 }
574
575 int uv__udp_send(uv_udp_send_t* req,
576 uv_udp_t* handle,
577 const uv_buf_t bufs[],
578 unsigned int nbufs,
579 const struct sockaddr* addr,
580 unsigned int addrlen,
581 uv_udp_send_cb send_cb) {
582 int err;
583 int empty_queue;
584
585 assert(nbufs > 0);
586
587 if (addr) {
588 err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
589 if (err)
590 return err;
591 }
592
593 /* It's legal for send_queue_count > 0 even when the write_queue is empty;
594 * it means there are error-state requests in the write_completed_queue that
595 * will touch up send_queue_size/count later.
596 */
597 empty_queue = (handle->send_queue_count == 0);
598
599 uv__req_init(handle->loop, req, UV_UDP_SEND);
600 assert(addrlen <= sizeof(req->u.storage));
601 if (addr == NULL)
602 req->u.storage.ss_family = AF_UNSPEC;
603 else
604 memcpy(&req->u.storage, addr, addrlen);
605 req->send_cb = send_cb;
606 req->handle = handle;
607 req->nbufs = nbufs;
608
609 req->bufs = req->bufsml;
610 if (nbufs > ARRAY_SIZE(req->bufsml))
611 req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));
612
613 if (req->bufs == NULL) {
614 uv__req_unregister(handle->loop);
615 return UV_ENOMEM;
616 }
617
618 memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
619 handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);
620 handle->send_queue_count++;
621 uv__queue_insert_tail(&handle->write_queue, &req->queue);
622 uv__handle_start(handle);
623
624 if (empty_queue && !(handle->flags & UV_HANDLE_UDP_PROCESSING)) {
625 uv__udp_sendmsg(handle);
626
627 /* `uv__udp_sendmsg` may not be able to do non-blocking write straight
628 * away. In such cases the `io_watcher` has to be queued for asynchronous
629 * write.
630 */
631 if (!uv__queue_empty(&handle->write_queue))
632 uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
633 } else {
634 uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
635 }
636
637 return 0;
638 }
639
640
641 int uv__udp_try_send(uv_udp_t* handle,
642 const uv_buf_t bufs[],
643 unsigned int nbufs,
644 const struct sockaddr* addr,
645 unsigned int addrlen) {
646 int err;
647
648 if (nbufs < 1)
649 return UV_EINVAL;
650
651 /* already sending a message */
652 if (handle->send_queue_count != 0)
653 return UV_EAGAIN;
654
655 if (addr) {
656 err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
657 if (err)
658 return err;
659 } else {
660 assert(handle->flags & UV_HANDLE_UDP_CONNECTED);
661 }
662
663 err = uv__udp_sendmsg1(handle->io_watcher.fd, bufs, nbufs, addr);
664 if (err > 0)
665 return uv__count_bufs(bufs, nbufs);
666
667 return err;
668 }
669
670
671 static int uv__udp_set_membership4(uv_udp_t* handle,
672 const struct sockaddr_in* multicast_addr,
673 const char* interface_addr,
674 uv_membership membership) {
675 struct ip_mreq mreq;
676 int optname;
677 int err;
678
679 memset(&mreq, 0, sizeof mreq);
680
681 if (interface_addr) {
682 err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr);
683 if (err)
684 return err;
685 } else {
686 mreq.imr_interface.s_addr = htonl(INADDR_ANY);
687 }
688
689 mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;
690
691 switch (membership) {
692 case UV_JOIN_GROUP:
693 optname = IP_ADD_MEMBERSHIP;
694 break;
695 case UV_LEAVE_GROUP:
696 optname = IP_DROP_MEMBERSHIP;
697 break;
698 default:
699 return UV_EINVAL;
700 }
701
702 if (setsockopt(handle->io_watcher.fd,
703 IPPROTO_IP,
704 optname,
705 &mreq,
706 sizeof(mreq))) {
707 #if defined(__MVS__)
708 if (errno == ENXIO)
709 return UV_ENODEV;
710 #endif
711 return UV__ERR(errno);
712 }
713
714 return 0;
715 }
716
717
718 static int uv__udp_set_membership6(uv_udp_t* handle,
719 const struct sockaddr_in6* multicast_addr,
720 const char* interface_addr,
721 uv_membership membership) {
722 int optname;
723 struct ipv6_mreq mreq;
724 struct sockaddr_in6 addr6;
725
726 memset(&mreq, 0, sizeof mreq);
727
728 if (interface_addr) {
729 if (uv_ip6_addr(interface_addr, 0, &addr6))
730 return UV_EINVAL;
731 mreq.ipv6mr_interface = addr6.sin6_scope_id;
732 } else {
733 mreq.ipv6mr_interface = 0;
734 }
735
736 mreq.ipv6mr_multiaddr = multicast_addr->sin6_addr;
737
738 switch (membership) {
739 case UV_JOIN_GROUP:
740 optname = IPV6_ADD_MEMBERSHIP;
741 break;
742 case UV_LEAVE_GROUP:
743 optname = IPV6_DROP_MEMBERSHIP;
744 break;
745 default:
746 return UV_EINVAL;
747 }
748
749 if (setsockopt(handle->io_watcher.fd,
750 IPPROTO_IPV6,
751 optname,
752 &mreq,
753 sizeof(mreq))) {
754 #if defined(__MVS__)
755 if (errno == ENXIO)
756 return UV_ENODEV;
757 #endif
758 return UV__ERR(errno);
759 }
760
761 return 0;
762 }
763
764
765 #if !defined(__OpenBSD__) && \
766 !defined(__NetBSD__) && \
767 !defined(__ANDROID__) && \
768 !defined(__DragonFly__) && \
769 !defined(__QNX__) && \
770 !defined(__GNU__)
771 static int uv__udp_set_source_membership4(uv_udp_t* handle,
772 const struct sockaddr_in* multicast_addr,
773 const char* interface_addr,
774 const struct sockaddr_in* source_addr,
775 uv_membership membership) {
776 struct ip_mreq_source mreq;
777 int optname;
778 int err;
779
780 err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
781 if (err)
782 return err;
783
784 memset(&mreq, 0, sizeof(mreq));
785
786 if (interface_addr != NULL) {
787 err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr);
788 if (err)
789 return err;
790 } else {
791 mreq.imr_interface.s_addr = htonl(INADDR_ANY);
792 }
793
794 mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;
795 mreq.imr_sourceaddr.s_addr = source_addr->sin_addr.s_addr;
796
797 if (membership == UV_JOIN_GROUP)
798 optname = IP_ADD_SOURCE_MEMBERSHIP;
799 else if (membership == UV_LEAVE_GROUP)
800 optname = IP_DROP_SOURCE_MEMBERSHIP;
801 else
802 return UV_EINVAL;
803
804 if (setsockopt(handle->io_watcher.fd,
805 IPPROTO_IP,
806 optname,
807 &mreq,
808 sizeof(mreq))) {
809 return UV__ERR(errno);
810 }
811
812 return 0;
813 }
814
815
816 static int uv__udp_set_source_membership6(uv_udp_t* handle,
817 const struct sockaddr_in6* multicast_addr,
818 const char* interface_addr,
819 const struct sockaddr_in6* source_addr,
820 uv_membership membership) {
821 struct group_source_req mreq;
822 struct sockaddr_in6 addr6;
823 int optname;
824 int err;
825
826 err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
827 if (err)
828 return err;
829
830 memset(&mreq, 0, sizeof(mreq));
831
832 if (interface_addr != NULL) {
833 err = uv_ip6_addr(interface_addr, 0, &addr6);
834 if (err)
835 return err;
836 mreq.gsr_interface = addr6.sin6_scope_id;
837 } else {
838 mreq.gsr_interface = 0;
839 }
840
841 STATIC_ASSERT(sizeof(mreq.gsr_group) >= sizeof(*multicast_addr));
842 STATIC_ASSERT(sizeof(mreq.gsr_source) >= sizeof(*source_addr));
843 memcpy(&mreq.gsr_group, multicast_addr, sizeof(*multicast_addr));
844 memcpy(&mreq.gsr_source, source_addr, sizeof(*source_addr));
845
846 if (membership == UV_JOIN_GROUP)
847 optname = MCAST_JOIN_SOURCE_GROUP;
848 else if (membership == UV_LEAVE_GROUP)
849 optname = MCAST_LEAVE_SOURCE_GROUP;
850 else
851 return UV_EINVAL;
852
853 if (setsockopt(handle->io_watcher.fd,
854 IPPROTO_IPV6,
855 optname,
856 &mreq,
857 sizeof(mreq))) {
858 return UV__ERR(errno);
859 }
860
861 return 0;
862 }
863 #endif
864
865
866 int uv__udp_init_ex(uv_loop_t* loop,
867 uv_udp_t* handle,
868 unsigned flags,
869 int domain) {
870 int fd;
871
872 fd = -1;
873 if (domain != AF_UNSPEC) {
874 fd = uv__socket(domain, SOCK_DGRAM, 0);
875 if (fd < 0)
876 return fd;
877 }
878
879 uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP);
880 handle->alloc_cb = NULL;
881 handle->recv_cb = NULL;
882 handle->send_queue_size = 0;
883 handle->send_queue_count = 0;
884 uv__io_init(&handle->io_watcher, uv__udp_io, fd);
885 uv__queue_init(&handle->write_queue);
886 uv__queue_init(&handle->write_completed_queue);
887
888 return 0;
889 }
890
891
892 int uv_udp_using_recvmmsg(const uv_udp_t* handle) {
893 #if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
894 if (handle->flags & UV_HANDLE_UDP_RECVMMSG)
895 return 1;
896 #endif
897 return 0;
898 }
899
900
901 int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) {
902 int err;
903
904 /* Check for already active socket. */
905 if (handle->io_watcher.fd != -1)
906 return UV_EBUSY;
907
908 if (uv__fd_exists(handle->loop, sock))
909 return UV_EEXIST;
910
911 err = uv__nonblock(sock, 1);
912 if (err)
913 return err;
914
915 err = uv__sock_reuseaddr(sock);
916 if (err)
917 return err;
918
919 handle->io_watcher.fd = sock;
920 if (uv__udp_is_connected(handle))
921 handle->flags |= UV_HANDLE_UDP_CONNECTED;
922
923 return 0;
924 }
925
926
927 int uv_udp_set_membership(uv_udp_t* handle,
928 const char* multicast_addr,
929 const char* interface_addr,
930 uv_membership membership) {
931 int err;
932 struct sockaddr_in addr4;
933 struct sockaddr_in6 addr6;
934
935 if (uv_ip4_addr(multicast_addr, 0, &addr4) == 0) {
936 err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
937 if (err)
938 return err;
939 return uv__udp_set_membership4(handle, &addr4, interface_addr, membership);
940 } else if (uv_ip6_addr(multicast_addr, 0, &addr6) == 0) {
941 err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
942 if (err)
943 return err;
944 return uv__udp_set_membership6(handle, &addr6, interface_addr, membership);
945 } else {
946 return UV_EINVAL;
947 }
948 }
949
950
951 int uv_udp_set_source_membership(uv_udp_t* handle,
952 const char* multicast_addr,
953 const char* interface_addr,
954 const char* source_addr,
955 uv_membership membership) {
956 #if !defined(__OpenBSD__) && \
957 !defined(__NetBSD__) && \
958 !defined(__ANDROID__) && \
959 !defined(__DragonFly__) && \
960 !defined(__QNX__) && \
961 !defined(__GNU__)
962 int err;
963 union uv__sockaddr mcast_addr;
964 union uv__sockaddr src_addr;
965
966 err = uv_ip4_addr(multicast_addr, 0, &mcast_addr.in);
967 if (err) {
968 err = uv_ip6_addr(multicast_addr, 0, &mcast_addr.in6);
969 if (err)
970 return err;
971 err = uv_ip6_addr(source_addr, 0, &src_addr.in6);
972 if (err)
973 return err;
974 return uv__udp_set_source_membership6(handle,
975 &mcast_addr.in6,
976 interface_addr,
977 &src_addr.in6,
978 membership);
979 }
980
981 err = uv_ip4_addr(source_addr, 0, &src_addr.in);
982 if (err)
983 return err;
984 return uv__udp_set_source_membership4(handle,
985 &mcast_addr.in,
986 interface_addr,
987 &src_addr.in,
988 membership);
989 #else
990 return UV_ENOSYS;
991 #endif
992 }
993
994
995 static int uv__setsockopt(uv_udp_t* handle,
996 int option4,
997 int option6,
998 const void* val,
999 socklen_t size) {
1000 int r;
1001
1002 if (handle->flags & UV_HANDLE_IPV6)
1003 r = setsockopt(handle->io_watcher.fd,
1004 IPPROTO_IPV6,
1005 option6,
1006 val,
1007 size);
1008 else
1009 r = setsockopt(handle->io_watcher.fd,
1010 IPPROTO_IP,
1011 option4,
1012 val,
1013 size);
1014 if (r)
1015 return UV__ERR(errno);
1016
1017 return 0;
1018 }
1019
1020 static int uv__setsockopt_maybe_char(uv_udp_t* handle,
1021 int option4,
1022 int option6,
1023 int val) {
1024 #if defined(__sun) || defined(_AIX) || defined(__MVS__)
1025 char arg = val;
1026 #elif defined(__OpenBSD__)
1027 unsigned char arg = val;
1028 #else
1029 int arg = val;
1030 #endif
1031
1032 if (val < 0 || val > 255)
1033 return UV_EINVAL;
1034
1035 return uv__setsockopt(handle, option4, option6, &arg, sizeof(arg));
1036 }
1037
1038
1039 int uv_udp_set_broadcast(uv_udp_t* handle, int on) {
1040 if (setsockopt(handle->io_watcher.fd,
1041 SOL_SOCKET,
1042 SO_BROADCAST,
1043 &on,
1044 sizeof(on))) {
1045 return UV__ERR(errno);
1046 }
1047
1048 return 0;
1049 }
1050
1051
1052 int uv_udp_set_ttl(uv_udp_t* handle, int ttl) {
1053 if (ttl < 1 || ttl > 255)
1054 return UV_EINVAL;
1055
1056 #if defined(__MVS__)
1057 if (!(handle->flags & UV_HANDLE_IPV6))
1058 return UV_ENOTSUP; /* zOS does not support setting ttl for IPv4 */
1059 #endif
1060
1061 /*
1062 * On Solaris and derivatives such as SmartOS, the length of socket options
1063 * is sizeof(int) for IP_TTL and IPV6_UNICAST_HOPS,
1064 * so hardcode the size of these options on this platform,
1065 * and use the general uv__setsockopt_maybe_char call on other platforms.
1066 */
1067 #if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
1068 defined(__MVS__) || defined(__QNX__)
1069
1070 return uv__setsockopt(handle,
1071 IP_TTL,
1072 IPV6_UNICAST_HOPS,
1073 &ttl,
1074 sizeof(ttl));
1075
1076 #else /* !(defined(__sun) || defined(_AIX) || defined (__OpenBSD__) ||
1077 defined(__MVS__) || defined(__QNX__)) */
1078
1079 return uv__setsockopt_maybe_char(handle,
1080 IP_TTL,
1081 IPV6_UNICAST_HOPS,
1082 ttl);
1083
1084 #endif /* defined(__sun) || defined(_AIX) || defined (__OpenBSD__) ||
1085 defined(__MVS__) || defined(__QNX__) */
1086 }
1087
1088
1089 int uv_udp_set_multicast_ttl(uv_udp_t* handle, int ttl) {
1090 /*
1091 * On Solaris and derivatives such as SmartOS, the length of socket options
1092 * is sizeof(int) for IPV6_MULTICAST_HOPS and sizeof(char) for
1093 * IP_MULTICAST_TTL, so hardcode the size of the option in the IPv6 case,
1094 * and use the general uv__setsockopt_maybe_char call otherwise.
1095 */
1096 #if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
1097 defined(__MVS__) || defined(__QNX__)
1098 if (handle->flags & UV_HANDLE_IPV6)
1099 return uv__setsockopt(handle,
1100 IP_MULTICAST_TTL,
1101 IPV6_MULTICAST_HOPS,
1102 &ttl,
1103 sizeof(ttl));
1104 #endif /* defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
1105 defined(__MVS__) || defined(__QNX__) */
1106
1107 return uv__setsockopt_maybe_char(handle,
1108 IP_MULTICAST_TTL,
1109 IPV6_MULTICAST_HOPS,
1110 ttl);
1111 }
1112
1113
1114 int uv_udp_set_multicast_loop(uv_udp_t* handle, int on) {
1115 /*
1116 * On Solaris and derivatives such as SmartOS, the length of socket options
1117 * is sizeof(int) for IPV6_MULTICAST_LOOP and sizeof(char) for
1118 * IP_MULTICAST_LOOP, so hardcode the size of the option in the IPv6 case,
1119 * and use the general uv__setsockopt_maybe_char call otherwise.
1120 */
1121 #if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
1122 defined(__MVS__) || defined(__QNX__)
1123 if (handle->flags & UV_HANDLE_IPV6)
1124 return uv__setsockopt(handle,
1125 IP_MULTICAST_LOOP,
1126 IPV6_MULTICAST_LOOP,
1127 &on,
1128 sizeof(on));
1129 #endif /* defined(__sun) || defined(_AIX) ||defined(__OpenBSD__) ||
1130 defined(__MVS__) || defined(__QNX__) */
1131
1132 return uv__setsockopt_maybe_char(handle,
1133 IP_MULTICAST_LOOP,
1134 IPV6_MULTICAST_LOOP,
1135 on);
1136 }
1137
1138 int uv_udp_set_multicast_interface(uv_udp_t* handle, const char* interface_addr) {
1139 struct sockaddr_storage addr_st;
1140 struct sockaddr_in* addr4;
1141 struct sockaddr_in6* addr6;
1142
1143 addr4 = (struct sockaddr_in*) &addr_st;
1144 addr6 = (struct sockaddr_in6*) &addr_st;
1145
1146 if (!interface_addr) {
1147 memset(&addr_st, 0, sizeof addr_st);
1148 if (handle->flags & UV_HANDLE_IPV6) {
1149 addr_st.ss_family = AF_INET6;
1150 addr6->sin6_scope_id = 0;
1151 } else {
1152 addr_st.ss_family = AF_INET;
1153 addr4->sin_addr.s_addr = htonl(INADDR_ANY);
1154 }
1155 } else if (uv_ip4_addr(interface_addr, 0, addr4) == 0) {
1156 /* nothing, address was parsed */
1157 } else if (uv_ip6_addr(interface_addr, 0, addr6) == 0) {
1158 /* nothing, address was parsed */
1159 } else {
1160 return UV_EINVAL;
1161 }
1162
1163 if (addr_st.ss_family == AF_INET) {
1164 if (setsockopt(handle->io_watcher.fd,
1165 IPPROTO_IP,
1166 IP_MULTICAST_IF,
1167 (void*) &addr4->sin_addr,
1168 sizeof(addr4->sin_addr)) == -1) {
1169 return UV__ERR(errno);
1170 }
1171 } else if (addr_st.ss_family == AF_INET6) {
1172 if (setsockopt(handle->io_watcher.fd,
1173 IPPROTO_IPV6,
1174 IPV6_MULTICAST_IF,
1175 &addr6->sin6_scope_id,
1176 sizeof(addr6->sin6_scope_id)) == -1) {
1177 return UV__ERR(errno);
1178 }
1179 } else {
1180 assert(0 && "unexpected address family");
1181 abort();
1182 }
1183
1184 return 0;
1185 }
1186
1187 int uv_udp_getpeername(const uv_udp_t* handle,
1188 struct sockaddr* name,
1189 int* namelen) {
1190
1191 return uv__getsockpeername((const uv_handle_t*) handle,
1192 getpeername,
1193 name,
1194 namelen);
1195 }
1196
1197 int uv_udp_getsockname(const uv_udp_t* handle,
1198 struct sockaddr* name,
1199 int* namelen) {
1200
1201 return uv__getsockpeername((const uv_handle_t*) handle,
1202 getsockname,
1203 name,
1204 namelen);
1205 }
1206
1207
1208 int uv__udp_recv_start(uv_udp_t* handle,
1209 uv_alloc_cb alloc_cb,
1210 uv_udp_recv_cb recv_cb) {
1211 int err;
1212
1213 if (alloc_cb == NULL || recv_cb == NULL)
1214 return UV_EINVAL;
1215
1216 if (uv__io_active(&handle->io_watcher, POLLIN))
1217 return UV_EALREADY; /* FIXME(bnoordhuis) Should be UV_EBUSY. */
1218
1219 err = uv__udp_maybe_deferred_bind(handle, AF_INET, 0);
1220 if (err)
1221 return err;
1222
1223 handle->alloc_cb = alloc_cb;
1224 handle->recv_cb = recv_cb;
1225
1226 uv__io_start(handle->loop, &handle->io_watcher, POLLIN);
1227 uv__handle_start(handle);
1228
1229 return 0;
1230 }
1231
1232
1233 int uv__udp_recv_stop(uv_udp_t* handle) {
1234 uv__io_stop(handle->loop, &handle->io_watcher, POLLIN);
1235
1236 if (!uv__io_active(&handle->io_watcher, POLLOUT))
1237 uv__handle_stop(handle);
1238
1239 handle->alloc_cb = NULL;
1240 handle->recv_cb = NULL;
1241
1242 return 0;
1243 }
1244
1245
1246 static int uv__udp_prep_pkt(struct msghdr* h,
1247 const uv_buf_t* bufs,
1248 const unsigned int nbufs,
1249 const struct sockaddr* addr) {
1250 memset(h, 0, sizeof(*h));
1251 h->msg_name = (void*) addr;
1252 h->msg_iov = (void*) bufs;
1253 h->msg_iovlen = nbufs;
1254 if (addr == NULL)
1255 return 0;
1256 switch (addr->sa_family) {
1257 case AF_INET:
1258 h->msg_namelen = sizeof(struct sockaddr_in);
1259 return 0;
1260 case AF_INET6:
1261 h->msg_namelen = sizeof(struct sockaddr_in6);
1262 return 0;
1263 case AF_UNIX:
1264 h->msg_namelen = sizeof(struct sockaddr_un);
1265 return 0;
1266 case AF_UNSPEC:
1267 h->msg_name = NULL;
1268 return 0;
1269 }
1270 return UV_EINVAL;
1271 }
1272
1273
1274 static int uv__udp_sendmsg1(int fd,
1275 const uv_buf_t* bufs,
1276 unsigned int nbufs,
1277 const struct sockaddr* addr) {
1278 struct msghdr h;
1279 int r;
1280
1281 if ((r = uv__udp_prep_pkt(&h, bufs, nbufs, addr)))
1282 return r;
1283
1284 do
1285 r = sendmsg(fd, &h, 0);
1286 while (r == -1 && errno == EINTR);
1287
1288 if (r < 0) {
1289 r = UV__ERR(errno);
1290 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
1291 r = UV_EAGAIN;
1292 return r;
1293 }
1294
1295 /* UDP sockets don't EOF so we don't have to handle r=0 specially,
1296 * that only happens when the input was a zero-sized buffer.
1297 */
1298 return 1;
1299 }
1300
1301
1302 static int uv__udp_sendmsgv(int fd,
1303 unsigned int count,
1304 uv_buf_t* bufs[/*count*/],
1305 unsigned int nbufs[/*count*/],
1306 struct sockaddr* addrs[/*count*/]) {
1307 unsigned int i;
1308 int nsent;
1309 int r;
1310
1311 r = 0;
1312 nsent = 0;
1313
1314 #if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__) || \
1315 (defined(__sun__) && defined(MSG_WAITFORONE))
1316 if (count > 1) {
1317 for (i = 0; i < count; /*empty*/) {
1318 struct mmsghdr m[20];
1319 unsigned int n;
1320
1321 for (n = 0; i < count && n < ARRAY_SIZE(m); i++, n++)
1322 if ((r = uv__udp_prep_pkt(&m[n].msg_hdr, bufs[i], nbufs[i], addrs[i])))
1323 goto exit;
1324
1325 do
1326 #if defined(__APPLE__)
1327 r = sendmsg_x(fd, m, n, MSG_DONTWAIT);
1328 #else
1329 r = sendmmsg(fd, m, n, 0);
1330 #endif
1331 while (r == -1 && errno == EINTR);
1332
1333 if (r < 1)
1334 goto exit;
1335
1336 nsent += r;
1337 i += r;
1338 }
1339
1340 goto exit;
1341 }
1342 #endif /* defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__) ||
1343 * (defined(__sun__) && defined(MSG_WAITFORONE))
1344 */
1345
1346 for (i = 0; i < count; i++, nsent++)
1347 if ((r = uv__udp_sendmsg1(fd, bufs[i], nbufs[i], addrs[i])))
1348 goto exit; /* goto to avoid unused label warning. */
1349
1350 exit:
1351
1352 if (nsent > 0)
1353 return nsent;
1354
1355 if (r < 0) {
1356 r = UV__ERR(errno);
1357 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
1358 r = UV_EAGAIN;
1359 }
1360
1361 return r;
1362 }
1363
1364
1365 static void uv__udp_sendmsg(uv_udp_t* handle) {
1366 static const int N = 20;
1367 struct sockaddr* addrs[N];
1368 unsigned int nbufs[N];
1369 uv_buf_t* bufs[N];
1370 struct uv__queue* q;
1371 uv_udp_send_t* req;
1372 int n;
1373
1374 if (uv__queue_empty(&handle->write_queue))
1375 return;
1376
1377 again:
1378 n = 0;
1379 q = uv__queue_head(&handle->write_queue);
1380 do {
1381 req = uv__queue_data(q, uv_udp_send_t, queue);
1382 addrs[n] = &req->u.addr;
1383 nbufs[n] = req->nbufs;
1384 bufs[n] = req->bufs;
1385 q = uv__queue_next(q);
1386 n++;
1387 } while (n < N && q != &handle->write_queue);
1388
1389 n = uv__udp_sendmsgv(handle->io_watcher.fd, n, bufs, nbufs, addrs);
1390 while (n > 0) {
1391 q = uv__queue_head(&handle->write_queue);
1392 req = uv__queue_data(q, uv_udp_send_t, queue);
1393 req->status = uv__count_bufs(req->bufs, req->nbufs);
1394 uv__queue_remove(&req->queue);
1395 uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
1396 n--;
1397 }
1398
1399 if (n == 0) {
1400 if (uv__queue_empty(&handle->write_queue))
1401 goto feed;
1402 goto again;
1403 }
1404
1405 if (n == UV_EAGAIN)
1406 return;
1407
1408 /* Register the error against first request in queue because that
1409 * is the request that uv__udp_sendmsgv tried but failed to send,
1410 * because if it did send any requests, it won't return an error.
1411 */
1412 q = uv__queue_head(&handle->write_queue);
1413 req = uv__queue_data(q, uv_udp_send_t, queue);
1414 req->status = n;
1415 uv__queue_remove(&req->queue);
1416 uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
1417 feed:
1418 uv__io_feed(handle->loop, &handle->io_watcher);
1419 }
1420
1421
1422 int uv__udp_try_send2(uv_udp_t* handle,
1423 unsigned int count,
1424 uv_buf_t* bufs[/*count*/],
1425 unsigned int nbufs[/*count*/],
1426 struct sockaddr* addrs[/*count*/]) {
1427 int fd;
1428
1429 fd = handle->io_watcher.fd;
1430 if (fd == -1)
1431 return UV_EINVAL;
1432
1433 return uv__udp_sendmsgv(fd, count, bufs, nbufs, addrs);
1434 }