summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordefault <nobody@localhost>2024-01-08 08:10:57 +0100
committerdefault <nobody@localhost>2024-01-08 08:10:57 +0100
commit93e7138e53628a76bed9583c5eb45ccf17b97e21 (patch)
treefe2b503c3843ec0c836b01821b0210e654a4de34
parent22cb139d5b320a5c7701d89b7ff1f31d13c00931 (diff)
Rewritten part of the job threads to be leaner and faster.
-rw-r--r--data.c2
-rw-r--r--httpd.c84
-rw-r--r--snac.h1
3 files changed, 46 insertions, 41 deletions
diff --git a/data.c b/data.c
index ecddfc3..f2b4e0f 100644
--- a/data.c
+++ b/data.c
@@ -2168,7 +2168,7 @@ void enqueue_output_raw(const char *keyid, const char *seckey,
qmsg = xs_dict_append(qmsg, "seckey", seckey);
/* if it's to be sent right now, bypass the disk queue and post the job */
- if (retries == 0 && job_fifo_ready())
+ if (retries == 0)
job_post(qmsg, 0);
else {
qmsg = _enqueue_put(fn, qmsg);
diff --git a/httpd.c b/httpd.c
index 7e71049..c127fee 100644
--- a/httpd.c
+++ b/httpd.c
@@ -31,16 +31,22 @@
srv_stat s_stat = {0};
srv_stat *p_stat = NULL;
+
/** job control **/
/* mutex to access the lists of jobs */
static pthread_mutex_t job_mutex;
-/* semaphre to trigger job processing */
+/* semaphore to trigger job processing */
static sem_t *job_sem;
-/* fifo of jobs */
-xs_list *job_fifo = NULL;
+typedef struct job_fifo_item {
+ struct job_fifo_item *next;
+ xs_val *job;
+} job_fifo_item;
+
+static job_fifo_item *job_fifo_first = NULL;
+static job_fifo_item *job_fifo_last = NULL;
/* nodeinfo 2.0 template */
@@ -418,24 +424,6 @@ void httpd_connection(FILE *f)
}
-static jmp_buf on_break;
-
-
-void term_handler(int s)
-{
- (void)s;
-
- longjmp(on_break, 1);
-}
-
-
-int job_fifo_ready(void)
-/* returns true if the job fifo is ready */
-{
- return job_fifo != NULL;
-}
-
-
void job_post(const xs_val *job, int urgent)
/* posts a job for the threads to process it */
{
@@ -443,19 +431,25 @@ void job_post(const xs_val *job, int urgent)
/* lock the mutex */
pthread_mutex_lock(&job_mutex);
- /* add to the fifo */
- if (job_fifo != NULL) {
- if (urgent)
- job_fifo = xs_list_insert(job_fifo, 0, job);
- else
- job_fifo = xs_list_append(job_fifo, job);
-
- p_stat->job_fifo_size++;
+ job_fifo_item *i = xs_realloc(NULL, sizeof(job_fifo_item));
+ *i = (job_fifo_item){ NULL, xs_dup(job) };
- srv_debug(2, xs_fmt(
- "job_fifo sizes: %d %08x", p_stat->job_fifo_size, xs_size(job_fifo)));
+ if (job_fifo_first == NULL)
+ job_fifo_first = job_fifo_last = i;
+ else
+ if (urgent) {
+ /* prepend */
+ i->next = job_fifo_first;
+ job_fifo_first = i;
+ }
+ else {
+ /* append */
+ job_fifo_last->next = i;
+ job_fifo_last = i;
}
+ p_stat->job_fifo_size++;
+
/* unlock the mutex */
pthread_mutex_unlock(&job_mutex);
@@ -475,8 +469,16 @@ void job_wait(xs_val **job)
pthread_mutex_lock(&job_mutex);
/* dequeue */
- if (job_fifo != NULL) {
- job_fifo = xs_list_shift(job_fifo, job);
+ job_fifo_item *i = job_fifo_first;
+
+ if (i != NULL) {
+ job_fifo_first = i->next;
+
+ if (job_fifo_first == NULL)
+ job_fifo_last = NULL;
+
+ *job = i->job;
+ xs_free(i);
p_stat->job_fifo_size--;
}
@@ -604,6 +606,16 @@ static void *background_thread(void *arg)
}
+static jmp_buf on_break;
+
+void term_handler(int s)
+{
+ (void)s;
+
+ longjmp(on_break, 1);
+}
+
+
void httpd(void)
/* starts the server */
{
@@ -663,8 +675,6 @@ void httpd(void)
return;
}
- job_fifo = xs_list_new();
-
/* initialize sleep control */
pthread_mutex_init(&sleep_mutex, NULL);
pthread_cond_init(&sleep_cond, NULL);
@@ -717,10 +727,6 @@ void httpd(void)
for (n = 0; n < p_stat->n_threads; n++)
pthread_join(threads[n], NULL);
- pthread_mutex_lock(&job_mutex);
- job_fifo = xs_free(job_fifo);
- pthread_mutex_unlock(&job_mutex);
-
sem_close(job_sem);
sem_unlink(sem_name);
diff --git a/snac.h b/snac.h
index 26ddfee..4030717 100644
--- a/snac.h
+++ b/snac.h
@@ -294,7 +294,6 @@ int deluser(snac *user);
extern const char *snac_blurb;
-int job_fifo_ready(void);
void job_post(const xs_val *job, int urgent);
void job_wait(xs_val **job);