diff options
-rw-r--r-- | activitypub.c | 2 | ||||
-rw-r--r-- | data.c | 2 | ||||
-rw-r--r-- | httpd.c | 16 | ||||
-rw-r--r-- | snac.h | 2 |
4 files changed, 13 insertions, 9 deletions
diff --git a/activitypub.c b/activitypub.c index ee31ecd..9c47590 100644 --- a/activitypub.c +++ b/activitypub.c @@ -1316,7 +1316,7 @@ int process_queue(void) xs *q_item = dequeue(fn); if (q_item != NULL) { - job_post(q_item); + job_post(q_item, 0); cnt++; } } @@ -1497,7 +1497,7 @@ void enqueue_output_raw(const char *keyid, const char *seckey, /* if it's to be sent right now, bypass the disk queue and post the job */ if (retries == 0 && job_fifo_ready()) - job_post(qmsg); + job_post(qmsg, 0); else { qmsg = _enqueue_put(fn, qmsg); srv_debug(1, xs_fmt("enqueue_output %s %s %d", inbox, fn, retries)); @@ -262,7 +262,7 @@ int job_fifo_ready(void) } -void job_post(const xs_val *job) +void job_post(const xs_val *job, int urgent) /* posts a job for the threads to process it */ { if (job != NULL) { @@ -270,8 +270,12 @@ void job_post(const xs_val *job) pthread_mutex_lock(&job_mutex); /* add to the fifo */ - if (job_fifo != NULL) - job_fifo = xs_list_append(job_fifo, job); + if (job_fifo != NULL) { + if (urgent) + job_fifo = xs_list_insert(job_fifo, 0, job); + else + job_fifo = xs_list_append(job_fifo, job); + } /* unlock the mutex */ pthread_mutex_unlock(&job_mutex); @@ -386,7 +390,7 @@ static void *background_thread(void *arg) xs *q_item = xs_dict_new(); q_item = xs_dict_append(q_item, "type", "purge"); - job_post(q_item); + job_post(q_item, 0); } if (cnt == 0) { @@ -485,7 +489,7 @@ void httpd(void) if (f != NULL) { xs *job = xs_data_new(&f, sizeof(FILE *)); - job_post(job); + job_post(job, 1); } else break; @@ -496,7 +500,7 @@ void httpd(void) /* send as many empty jobs as working threads */ for (n = 1; n < n_threads; n++) - job_post(NULL); + job_post(NULL, 0); /* wait for all the threads to exit */ for (n = 0; n < n_threads; n++) @@ -218,5 +218,5 @@ int adduser(const char *uid); int resetpwd(snac *snac); int job_fifo_ready(void); -void job_post(const xs_val *job); +void job_post(const xs_val *job, int urgent); void job_wait(xs_val **job); |