changeset 62:ea9ef388ab97

[Seobeo] Fixed issues with epoll or kqeue in different threads. Initizlied the event looop inside of the thread itself.
author June Park <parkjune1995@gmail.com>
date Tue, 23 Dec 2025 11:48:11 -0800
parents 9df5587cf23b
children fff1b048dda6
files seobeo/os/s_linux_edge.c seobeo/os/s_macos_edge.c seobeo/s_linux_network.c seobeo/s_web.c seobeo/seobeo_internal.h
diffstat 5 files changed, 126 insertions(+), 74 deletions(-) [+]
line wrap: on
line diff
--- a/seobeo/os/s_linux_edge.c	Sat Dec 20 21:07:34 2025 -0500
+++ b/seobeo/os/s_linux_edge.c	Tue Dec 23 11:48:11 2025 -0800
@@ -2,14 +2,34 @@
 #include "seobeo/seobeo.h"
 
 
-void *Seobeo_Web_Edge_Worker(void *vargs) {
+void *Seobeo_Web_Edge_Worker(void *vargs)
+{
   WorkerArgs *args = vargs;
   const int max_events = 64;
   struct epoll_event events[max_events];
 
+  // Each thread creates its own epoll to avoid race conditions
+  int epfd = epoll_create1(0);
+  if (epfd < 0) {
+    perror("epoll_create1");
+    return NULL;
+  }
+
+  // Add server socket to this thread's epoll
+  struct epoll_event ev = {
+    .events = EPOLLIN | EPOLLET,
+    .data.ptr = args->srv
+  };
+  if (epoll_ctl(epfd, EPOLL_CTL_ADD, args->srv->socket, &ev) < 0) {
+    perror("epoll_ctl ADD server");
+    close(epfd);
+    return NULL;
+  }
+
   while (1) {
-    int n = epoll_wait(args->evfd, events, max_events, -1);
+    int n = epoll_wait(epfd, events, max_events, -1);
     if (n < 0) {
+      if (errno == EINTR) continue;
       perror("epoll_wait");
       continue;
     }
@@ -18,6 +38,7 @@
       Seobeo_PHandle phandle = events[i].data.ptr;
 
       if (phandle == args->srv) {
+        // Accept all pending connections (edge-triggered mode)
         while (1) {
           Seobeo_PHandle p_cli_handle = Seobeo_Stream_Handle_Server_Accept(args->srv);
           if (!p_cli_handle) break;
@@ -26,62 +47,54 @@
             .events = EPOLLIN | EPOLLET,
             .data.ptr = p_cli_handle
           };
-          if (epoll_ctl(args->evfd, EPOLL_CTL_ADD, p_cli_handle->socket, &client_ev) < 0)
+          if (epoll_ctl(epfd, EPOLL_CTL_ADD, p_cli_handle->socket, &client_ev) < 0)
           {
             perror("epoll_ctl ADD client");
             Seobeo_Handle_Destroy(p_cli_handle);
           }
         }
       } else {
+        // Remove from epoll first
+        epoll_ctl(epfd, EPOLL_CTL_DEL, phandle->socket, NULL);
+
+        // Handle request (this function destroys the handle internally)
         Seobeo_Web_HandleClientRequest(phandle, args->cache);
-        epoll_ctl(args->evfd, EPOLL_CTL_DEL, phandle->socket, NULL);
-        Seobeo_Handle_Destroy(phandle);
       }
     }
   }
 
+  close(epfd);
   return NULL;
 }
 
 void Seobeo_Web_Edge(
     Seobeo_PHandle p_server_handle,
     int thread_count,
-    Dowa_PHashMap p_html_cache) {
-
-  int epfd = epoll_create1(0);
-  if (epfd < 0) {
-    perror("epoll_create1");
-    return;
-  }
-
-  struct epoll_event ev = {
-    .events = EPOLLIN | EPOLLET,
-    .data.ptr = p_server_handle
-  };
-  if (epoll_ctl(epfd, EPOLL_CTL_ADD, p_server_handle->socket, &ev) < 0) {
-    perror("epoll_ctl ADD server");
-    close(epfd);
-    return;
-  }
-
+    Dowa_PHashMap p_html_cache)
+{
   pthread_attr_t attr;
   pthread_attr_init(&attr);
   pthread_attr_setstacksize(&attr, 5 * 1024 * 1024); // 5 MB
 
+  pthread_t threads[thread_count];
   for (int i = 0; i < thread_count; i++) {
     WorkerArgs *args = malloc(sizeof(WorkerArgs));
-    *args = (WorkerArgs){ p_server_handle, p_html_cache, epfd };
+    *args = (WorkerArgs){ p_server_handle, p_html_cache };
 
-    pthread_t tid;
-    pthread_create(&tid, &attr, Seobeo_Web_Edge_Worker, args);
-    pthread_detach(tid);
+    pthread_create(&threads[i], &attr, Seobeo_Web_Edge_Worker, args);
   }
 
-  while (1) pause();
+  // Join threads instead of detaching for proper cleanup
+  for (int i = 0; i < thread_count; i++) {
+    pthread_join(threads[i], NULL);
+  }
+
+  pthread_attr_destroy(&attr);
 }
 
 
-void Seobeo_Web_Edge_2(Seobeo_PHandle p_handle_server, Dowa_PHashMap cache) {
+void Seobeo_Web_Edge_2(Seobeo_PHandle p_handle_server, Dowa_PHashMap cache)
+{
   const int MAX_EVENTS = 1024;
   struct epoll_event events[MAX_EVENTS];
   char keybuf[32];
@@ -135,6 +148,7 @@
           if (epoll_ctl(epfd, EPOLL_CTL_ADD, p_handle_client->socket, &client_ev) < 0)
           {
             perror("epoll_ctl ADD client");
+            Seobeo_Handle_Destroy(p_handle_client); 
             continue;
           }
 
@@ -155,10 +169,18 @@
         continue;
       }
 
+      // Remove from epoll
+      epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
+
+      // Handle request (this function destroys the handle internally)
       Seobeo_Web_HandleClientRequest(p_handle_client, cache);
+
+      // Remove from hashmap (handle is already destroyed by HandleClientRequest)
+      Dowa_HashMap_Pop_Key(handles, keybuf);
     }
   }
 
   close(epfd);
+  Dowa_HashMap_Destroy(handles);
 }
 
--- a/seobeo/os/s_macos_edge.c	Sat Dec 20 21:07:34 2025 -0500
+++ b/seobeo/os/s_macos_edge.c	Tue Dec 23 11:48:11 2025 -0800
@@ -6,41 +6,66 @@
 {
   WorkerArgs *args = vargs;
   struct kevent evlist[64];
+
+  // Each thread creates its own kqueue to avoid race conditions
+  int kq = kqueue();
+  if (kq < 0) {
+    perror("kqueue");
+    return NULL;
+  }
+
+  // Add server socket to this thread's kqueue
+  struct kevent kev = {
+    .ident  = args->srv->socket,
+    .filter = EVFILT_READ,
+    .flags  = EV_ADD,
+    .udata  = args->srv
+  };
+  kevent(kq, &kev, 1, NULL, 0, NULL);
+
   while (1)
   {
-    int ne = kevent(args->evfd, NULL, 0, evlist, 64, NULL);
-    if (ne < 0) continue;
+    int ne = kevent(kq, NULL, 0, evlist, 64, NULL);
+    if (ne < 0) {
+      if (errno == EINTR) continue;
+      perror("kevent");
+      continue;
+    }
 
     for (int i = 0; i < ne; i++)
     {
       Seobeo_PHandle h = evlist[i].udata;
       if (h == args->srv)
       {
-        Seobeo_PHandle cli =
-          Seobeo_Stream_Handle_Server_Accept(args->srv);
+        // Accept new connections in a loop (for edge-triggered behavior)
+        while (1) {
+          Seobeo_PHandle cli = Seobeo_Stream_Handle_Server_Accept(args->srv);
+          if (!cli) break;
 
-        if (!cli) continue;
-        struct kevent kev = {
-          .ident  = cli->socket,
+          struct kevent client_kev = {
+            .ident  = cli->socket,
+            .filter = EVFILT_READ,
+            .flags  = EV_ADD | EV_ONESHOT,
+            .udata  = cli
+          };
+          kevent(kq, &client_kev, 1, NULL, 0, NULL);
+        }
+      } else {
+        // Remove from kqueue first
+        struct kevent del_kev = {
+          .ident  = h->socket,
           .filter = EVFILT_READ,
-          .flags  = EV_ADD | EV_ONESHOT,
-          .udata  = cli
+          .flags  = EV_DELETE,
         };
-        kevent(args->evfd, &kev, 1, NULL, 0, NULL);
-      } else {
-        if (h != args->srv) {
-          struct kevent kev = {
-            .ident  = h->socket,
-            .filter = EVFILT_READ,
-            .flags  = EV_DELETE,
-          };
-          kevent(args->evfd, &kev, 1, NULL, 0, NULL); // Remove from kqueue first
-        
-          Seobeo_Web_HandleClientRequest(h, args->cache); // this frees 
-        }
+        kevent(kq, &del_kev, 1, NULL, 0, NULL);
+
+        // Handle request (this function destroys the handle internally)
+        Seobeo_Web_HandleClientRequest(h, args->cache);
       }
     }
   }
+
+  close(kq);
   return NULL;
 }
 
@@ -49,15 +74,6 @@
     int            thread_count,
     Dowa_PHashMap  p_html_cache)
 {
-  int kq = kqueue();
-  struct kevent kev = {
-    .ident  = p_server_handle->socket,
-    .filter = EVFILT_READ,
-    .flags  = EV_ADD,
-    .udata  = p_server_handle
-  };
-  kevent(kq, &kev, 1, NULL, 0, NULL);
-
   pthread_attr_t attr;
   pthread_attr_init(&attr);
   pthread_attr_setstacksize(&attr, 5 * 1024 * 1024); // 5 MB
@@ -66,13 +82,15 @@
   for (int i = 0; i < thread_count; i++)
   {
     WorkerArgs *args = malloc(sizeof(WorkerArgs));
-    *args = (WorkerArgs){ p_server_handle, p_html_cache, kq };
+    *args = (WorkerArgs){ p_server_handle, p_html_cache };
 
-    pthread_create(&threads[i], NULL, Seobeo_Web_Edge_Worker, args);
+    pthread_create(&threads[i], &attr, Seobeo_Web_Edge_Worker, args);
   }
   for (int i = 0; i < thread_count; i++)
   {
     pthread_join(threads[i], NULL);
   }
+
+  pthread_attr_destroy(&attr);
   return;
 }
--- a/seobeo/s_linux_network.c	Sat Dec 20 21:07:34 2025 -0500
+++ b/seobeo/s_linux_network.c	Tue Dec 23 11:48:11 2025 -0800
@@ -28,7 +28,14 @@
     { perror("socket"); continue; }
 
      if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) == -1)
-     { perror("setsockopt"); continue; }
+     { perror("setsockopt SO_REUSEADDR"); continue; }
+
+#ifdef SO_REUSEPORT
+     // SO_REUSEPORT allows multiple threads/processes to bind to the same port
+     // The kernel will distribute incoming connections among them
+     if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)) == -1)
+     { perror("setsockopt SO_REUSEPORT"); continue; }
+#endif
 
      if (bind(socket_fd, free_server_info->ai_addr, free_server_info->ai_addrlen) == -1)
      { perror("v_network: Couldn't make socket non-blocking\n"); continue; }
@@ -182,7 +189,11 @@
   p_client_handle->write_buffer_len      = 0;
   p_client_handle->write_buffer          = malloc(p_client_handle->write_buffer_capacity);
 
-  p_client_handle->destroyed = false;
+  p_client_handle->read_buffer_used      = 0;
+  p_client_handle->file                  = NULL;
+  p_client_handle->text_copy             = NULL;
+  p_client_handle->file_name             = NULL;
+  p_client_handle->destroyed             = false;
 
   return p_client_handle;
 }
@@ -314,15 +325,17 @@
     return 0;
   }
 
-  if (!p_handle) {
+  if (!p_handle)
+  {
     printf("[ERROR] p_handle is NULL before memcpy\n");
-    return;
+    return -1;
   }
   
-  if (!p_handle->write_buffer) {
+  if (!p_handle->write_buffer)
+  {
     printf("[ERROR] p_handle->write_buffer is NULL (len=%zu, size=%zu)\n",
             p_handle->write_buffer_len, data_size);
-    return;
+    return -1;
   }
   
   printf("[DEBUG] memcpy -> dest=%p (write_buffer=%p + offset=%zu), src=%p, size=%zu\n",
--- a/seobeo/s_web.c	Sat Dec 20 21:07:34 2025 -0500
+++ b/seobeo/s_web.c	Tue Dec 23 11:48:11 2025 -0800
@@ -252,11 +252,11 @@
 
 clean_up:
   printf("clean up\n\n");
+  if (p_cli_handle)
+    Seobeo_Handle_Destroy(p_cli_handle);
+  if (p_response_arena)
+    Dowa_Arena_Destroy(p_response_arena);
   return;
-//  if (p_cli_handle)
-//    Seobeo_Handle_Destroy(p_cli_handle);
-//  if (p_response_arena)
-//    Dowa_Arena_Destroy(p_response_arena);
 }
 
 
@@ -503,8 +503,8 @@
   if (mode == SEOBEO_MODE_EDGE)
   {
     printf("EDGE MODE\n");
-    Seobeo_Web_Edge_2(p_server_handle, p_html_cache);
-    // Seobeo_Web_Edge(p_server_handle, thread_count, p_html_cache);
+    // Seobeo_Web_Edge_2(p_server_handle, p_html_cache);
+    Seobeo_Web_Edge(p_server_handle, thread_count, p_html_cache);
   }
 
   return -1;
--- a/seobeo/seobeo_internal.h	Sat Dec 20 21:07:34 2025 -0500
+++ b/seobeo/seobeo_internal.h	Tue Dec 23 11:48:11 2025 -0800
@@ -42,7 +42,6 @@
 typedef struct {
   Seobeo_PHandle  srv;
   Dowa_PHashMap   cache;
-  int             evfd;    // epoll‐fd or kqueue‐fd
 } WorkerArgs;
 
 typedef enum {