Mercurial
changeset 62:ea9ef388ab97
[Seobeo] Fixed issues with epoll or kqeue in different threads. Initizlied the event looop inside of the thread itself.
| author | June Park <parkjune1995@gmail.com> |
|---|---|
| date | Tue, 23 Dec 2025 11:48:11 -0800 |
| parents | 9df5587cf23b |
| children | fff1b048dda6 |
| files | seobeo/os/s_linux_edge.c seobeo/os/s_macos_edge.c seobeo/s_linux_network.c seobeo/s_web.c seobeo/seobeo_internal.h |
| diffstat | 5 files changed, 126 insertions(+), 74 deletions(-) [+] |
line wrap: on
line diff
--- a/seobeo/os/s_linux_edge.c Sat Dec 20 21:07:34 2025 -0500 +++ b/seobeo/os/s_linux_edge.c Tue Dec 23 11:48:11 2025 -0800 @@ -2,14 +2,34 @@ #include "seobeo/seobeo.h" -void *Seobeo_Web_Edge_Worker(void *vargs) { +void *Seobeo_Web_Edge_Worker(void *vargs) +{ WorkerArgs *args = vargs; const int max_events = 64; struct epoll_event events[max_events]; + // Each thread creates its own epoll to avoid race conditions + int epfd = epoll_create1(0); + if (epfd < 0) { + perror("epoll_create1"); + return NULL; + } + + // Add server socket to this thread's epoll + struct epoll_event ev = { + .events = EPOLLIN | EPOLLET, + .data.ptr = args->srv + }; + if (epoll_ctl(epfd, EPOLL_CTL_ADD, args->srv->socket, &ev) < 0) { + perror("epoll_ctl ADD server"); + close(epfd); + return NULL; + } + while (1) { - int n = epoll_wait(args->evfd, events, max_events, -1); + int n = epoll_wait(epfd, events, max_events, -1); if (n < 0) { + if (errno == EINTR) continue; perror("epoll_wait"); continue; } @@ -18,6 +38,7 @@ Seobeo_PHandle phandle = events[i].data.ptr; if (phandle == args->srv) { + // Accept all pending connections (edge-triggered mode) while (1) { Seobeo_PHandle p_cli_handle = Seobeo_Stream_Handle_Server_Accept(args->srv); if (!p_cli_handle) break; @@ -26,62 +47,54 @@ .events = EPOLLIN | EPOLLET, .data.ptr = p_cli_handle }; - if (epoll_ctl(args->evfd, EPOLL_CTL_ADD, p_cli_handle->socket, &client_ev) < 0) + if (epoll_ctl(epfd, EPOLL_CTL_ADD, p_cli_handle->socket, &client_ev) < 0) { perror("epoll_ctl ADD client"); Seobeo_Handle_Destroy(p_cli_handle); } } } else { + // Remove from epoll first + epoll_ctl(epfd, EPOLL_CTL_DEL, phandle->socket, NULL); + + // Handle request (this function destroys the handle internally) Seobeo_Web_HandleClientRequest(phandle, args->cache); - epoll_ctl(args->evfd, EPOLL_CTL_DEL, phandle->socket, NULL); - Seobeo_Handle_Destroy(phandle); } } } + close(epfd); return NULL; } void Seobeo_Web_Edge( Seobeo_PHandle p_server_handle, int thread_count, - Dowa_PHashMap p_html_cache) { - - int epfd = epoll_create1(0); - if (epfd < 0) { - perror("epoll_create1"); - return; - } - - struct epoll_event ev = { - .events = EPOLLIN | EPOLLET, - .data.ptr = p_server_handle - }; - if (epoll_ctl(epfd, EPOLL_CTL_ADD, p_server_handle->socket, &ev) < 0) { - perror("epoll_ctl ADD server"); - close(epfd); - return; - } - + Dowa_PHashMap p_html_cache) +{ pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setstacksize(&attr, 5 * 1024 * 1024); // 5 MB + pthread_t threads[thread_count]; for (int i = 0; i < thread_count; i++) { WorkerArgs *args = malloc(sizeof(WorkerArgs)); - *args = (WorkerArgs){ p_server_handle, p_html_cache, epfd }; + *args = (WorkerArgs){ p_server_handle, p_html_cache }; - pthread_t tid; - pthread_create(&tid, &attr, Seobeo_Web_Edge_Worker, args); - pthread_detach(tid); + pthread_create(&threads[i], &attr, Seobeo_Web_Edge_Worker, args); } - while (1) pause(); + // Join threads instead of detaching for proper cleanup + for (int i = 0; i < thread_count; i++) { + pthread_join(threads[i], NULL); + } + + pthread_attr_destroy(&attr); } -void Seobeo_Web_Edge_2(Seobeo_PHandle p_handle_server, Dowa_PHashMap cache) { +void Seobeo_Web_Edge_2(Seobeo_PHandle p_handle_server, Dowa_PHashMap cache) +{ const int MAX_EVENTS = 1024; struct epoll_event events[MAX_EVENTS]; char keybuf[32]; @@ -135,6 +148,7 @@ if (epoll_ctl(epfd, EPOLL_CTL_ADD, p_handle_client->socket, &client_ev) < 0) { perror("epoll_ctl ADD client"); + Seobeo_Handle_Destroy(p_handle_client); continue; } @@ -155,10 +169,18 @@ continue; } + // Remove from epoll + epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); + + // Handle request (this function destroys the handle internally) Seobeo_Web_HandleClientRequest(p_handle_client, cache); + + // Remove from hashmap (handle is already destroyed by HandleClientRequest) + Dowa_HashMap_Pop_Key(handles, keybuf); } } close(epfd); + Dowa_HashMap_Destroy(handles); }
--- a/seobeo/os/s_macos_edge.c Sat Dec 20 21:07:34 2025 -0500 +++ b/seobeo/os/s_macos_edge.c Tue Dec 23 11:48:11 2025 -0800 @@ -6,41 +6,66 @@ { WorkerArgs *args = vargs; struct kevent evlist[64]; + + // Each thread creates its own kqueue to avoid race conditions + int kq = kqueue(); + if (kq < 0) { + perror("kqueue"); + return NULL; + } + + // Add server socket to this thread's kqueue + struct kevent kev = { + .ident = args->srv->socket, + .filter = EVFILT_READ, + .flags = EV_ADD, + .udata = args->srv + }; + kevent(kq, &kev, 1, NULL, 0, NULL); + while (1) { - int ne = kevent(args->evfd, NULL, 0, evlist, 64, NULL); - if (ne < 0) continue; + int ne = kevent(kq, NULL, 0, evlist, 64, NULL); + if (ne < 0) { + if (errno == EINTR) continue; + perror("kevent"); + continue; + } for (int i = 0; i < ne; i++) { Seobeo_PHandle h = evlist[i].udata; if (h == args->srv) { - Seobeo_PHandle cli = - Seobeo_Stream_Handle_Server_Accept(args->srv); + // Accept new connections in a loop (for edge-triggered behavior) + while (1) { + Seobeo_PHandle cli = Seobeo_Stream_Handle_Server_Accept(args->srv); + if (!cli) break; - if (!cli) continue; - struct kevent kev = { - .ident = cli->socket, + struct kevent client_kev = { + .ident = cli->socket, + .filter = EVFILT_READ, + .flags = EV_ADD | EV_ONESHOT, + .udata = cli + }; + kevent(kq, &client_kev, 1, NULL, 0, NULL); + } + } else { + // Remove from kqueue first + struct kevent del_kev = { + .ident = h->socket, .filter = EVFILT_READ, - .flags = EV_ADD | EV_ONESHOT, - .udata = cli + .flags = EV_DELETE, }; - kevent(args->evfd, &kev, 1, NULL, 0, NULL); - } else { - if (h != args->srv) { - struct kevent kev = { - .ident = h->socket, - .filter = EVFILT_READ, - .flags = EV_DELETE, - }; - kevent(args->evfd, &kev, 1, NULL, 0, NULL); // Remove from kqueue first - - Seobeo_Web_HandleClientRequest(h, args->cache); // this frees - } + kevent(kq, &del_kev, 1, NULL, 0, NULL); + + // Handle request (this function destroys the handle internally) + Seobeo_Web_HandleClientRequest(h, args->cache); } } } + + close(kq); return NULL; } @@ -49,15 +74,6 @@ int thread_count, Dowa_PHashMap p_html_cache) { - int kq = kqueue(); - struct kevent kev = { - .ident = p_server_handle->socket, - .filter = EVFILT_READ, - .flags = EV_ADD, - .udata = p_server_handle - }; - kevent(kq, &kev, 1, NULL, 0, NULL); - pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setstacksize(&attr, 5 * 1024 * 1024); // 5 MB @@ -66,13 +82,15 @@ for (int i = 0; i < thread_count; i++) { WorkerArgs *args = malloc(sizeof(WorkerArgs)); - *args = (WorkerArgs){ p_server_handle, p_html_cache, kq }; + *args = (WorkerArgs){ p_server_handle, p_html_cache }; - pthread_create(&threads[i], NULL, Seobeo_Web_Edge_Worker, args); + pthread_create(&threads[i], &attr, Seobeo_Web_Edge_Worker, args); } for (int i = 0; i < thread_count; i++) { pthread_join(threads[i], NULL); } + + pthread_attr_destroy(&attr); return; }
--- a/seobeo/s_linux_network.c Sat Dec 20 21:07:34 2025 -0500 +++ b/seobeo/s_linux_network.c Tue Dec 23 11:48:11 2025 -0800 @@ -28,7 +28,14 @@ { perror("socket"); continue; } if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) == -1) - { perror("setsockopt"); continue; } + { perror("setsockopt SO_REUSEADDR"); continue; } + +#ifdef SO_REUSEPORT + // SO_REUSEPORT allows multiple threads/processes to bind to the same port + // The kernel will distribute incoming connections among them + if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)) == -1) + { perror("setsockopt SO_REUSEPORT"); continue; } +#endif if (bind(socket_fd, free_server_info->ai_addr, free_server_info->ai_addrlen) == -1) { perror("v_network: Couldn't make socket non-blocking\n"); continue; } @@ -182,7 +189,11 @@ p_client_handle->write_buffer_len = 0; p_client_handle->write_buffer = malloc(p_client_handle->write_buffer_capacity); - p_client_handle->destroyed = false; + p_client_handle->read_buffer_used = 0; + p_client_handle->file = NULL; + p_client_handle->text_copy = NULL; + p_client_handle->file_name = NULL; + p_client_handle->destroyed = false; return p_client_handle; } @@ -314,15 +325,17 @@ return 0; } - if (!p_handle) { + if (!p_handle) + { printf("[ERROR] p_handle is NULL before memcpy\n"); - return; + return -1; } - if (!p_handle->write_buffer) { + if (!p_handle->write_buffer) + { printf("[ERROR] p_handle->write_buffer is NULL (len=%zu, size=%zu)\n", p_handle->write_buffer_len, data_size); - return; + return -1; } printf("[DEBUG] memcpy -> dest=%p (write_buffer=%p + offset=%zu), src=%p, size=%zu\n",
--- a/seobeo/s_web.c Sat Dec 20 21:07:34 2025 -0500 +++ b/seobeo/s_web.c Tue Dec 23 11:48:11 2025 -0800 @@ -252,11 +252,11 @@ clean_up: printf("clean up\n\n"); + if (p_cli_handle) + Seobeo_Handle_Destroy(p_cli_handle); + if (p_response_arena) + Dowa_Arena_Destroy(p_response_arena); return; -// if (p_cli_handle) -// Seobeo_Handle_Destroy(p_cli_handle); -// if (p_response_arena) -// Dowa_Arena_Destroy(p_response_arena); } @@ -503,8 +503,8 @@ if (mode == SEOBEO_MODE_EDGE) { printf("EDGE MODE\n"); - Seobeo_Web_Edge_2(p_server_handle, p_html_cache); - // Seobeo_Web_Edge(p_server_handle, thread_count, p_html_cache); + // Seobeo_Web_Edge_2(p_server_handle, p_html_cache); + Seobeo_Web_Edge(p_server_handle, thread_count, p_html_cache); } return -1;