summaryrefslogtreecommitdiff
path: root/httpd.c
diff options
context:
space:
mode:
authordefault <nobody@localhost>2023-02-06 11:29:46 +0100
committerdefault <nobody@localhost>2023-02-06 11:29:46 +0100
commit6bcc6bfa1ca871d2aae71e2b76d6eca36d24b5e5 (patch)
treebba1fa629f92f3ee5c70a64218159f0db09b62fd /httpd.c
parent2ee6bdc745735e11320dc248982c8d4ee92dcb71 (diff)
New functions job_post() and job_wait() (untested).
Diffstat (limited to 'httpd.c')
-rw-r--r--httpd.c86
1 files changed, 85 insertions, 1 deletions
diff --git a/httpd.c b/httpd.c
index d068af3..2708437 100644
--- a/httpd.c
+++ b/httpd.c
@@ -13,6 +13,7 @@
#include <setjmp.h>
#include <pthread.h>
+#include <semaphore.h>
/* nodeinfo 2.0 template */
@@ -319,6 +320,80 @@ static void *connection_thread(void *arg)
}
+/** job control **/
+
+/* mutex to access the lists of jobs */
+static pthread_mutex_t job_mutex;
+
+/* semaphre to trigger job processing */
+static sem_t job_sem;
+
+/* list of input sockets */
+xs_list *job_sockets = NULL;
+
+/* list of queue items */
+xs_list *job_qitems = NULL;
+
+
+void job_post(FILE *socket, const xs_dict *q_item)
+/* posts a job, being an input connection or another queue item */
+{
+ /* lock the mutex */
+ pthread_mutex_lock(&job_mutex);
+
+ /* add to the appropriate fifo */
+ if (socket != NULL) {
+ xs *d = xs_data_new(&socket, sizeof(FILE *));
+ job_sockets = xs_list_append(job_sockets, d);
+ }
+ else
+ if (q_item != NULL)
+ job_qitems = xs_list_append(job_qitems, q_item);
+
+ /* unlock the mutex */
+ pthread_mutex_unlock(&job_mutex);
+
+ /* ask for someone to attend it */
+ sem_post(&job_sem);
+}
+
+
+int job_wait(FILE **socket, xs_dict **q_item)
+/* waits for an available job; returns 0 if nothing left to do */
+{
+ int done = 1;
+
+ *socket = NULL;
+ *q_item = NULL;
+
+ if (sem_wait(&job_sem) == 0) {
+ /* lock the mutex */
+ pthread_mutex_lock(&job_mutex);
+
+ /* try first to get a socket to process */
+ xs *job_socket = NULL;
+ job_sockets = xs_list_shift(job_sockets, &job_socket);
+
+ /* if empty, try a q_item */
+ if (job_socket == NULL)
+ job_qitems = xs_list_shift(job_qitems, q_item);
+
+ /* unlock the mutex */
+ pthread_mutex_unlock(&job_mutex);
+
+ if (job_socket != NULL) {
+ xs_data_get(job_socket, socket);
+ done = 0;
+ }
+ else
+ if (*q_item != NULL)
+ done = 0;
+ }
+
+ return done;
+}
+
+
#ifndef MAX_THREADS
#define MAX_THREADS 256
#endif
@@ -348,8 +423,14 @@ void httpd(void)
srv_log(xs_fmt("httpd start %s:%d %s", address, port, USER_AGENT));
- /* thread creation */
+ /* initialize the job control engine */
+ pthread_mutex_init(&job_mutex, NULL);
+ sem_init(&job_sem, 0, 0);
+ job_sockets = xs_list_new();
+ job_qitems = xs_list_new();
+
#ifdef _SC_NPROCESSORS_ONLN
+ /* get number of CPUs on the machine */
n_threads = sysconf(_SC_NPROCESSORS_ONLN);
#endif
@@ -380,5 +461,8 @@ void httpd(void)
/* wait for the background thread to end */
pthread_join(threads[0], NULL);
+ job_sockets = xs_free(job_sockets);
+ job_qitems = xs_free(job_qitems);
+
srv_log(xs_fmt("httpd stop %s:%d", address, port));
}