summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--activitypub.c46
-rw-r--r--data.c6
-rw-r--r--httpd.c3
-rw-r--r--snac.h7
4 files changed, 37 insertions, 25 deletions
diff --git a/activitypub.c b/activitypub.c
index 5cc059d..9c9ea7c 100644
--- a/activitypub.c
+++ b/activitypub.c
@@ -839,7 +839,7 @@ void notify(snac *snac, char *type, char *utype, char *actor, char *msg)
body = xs_str_cat(body, s1);
}
- enqueue_email(snac, body, 0);
+ enqueue_email(body, 0);
}
@@ -1137,26 +1137,6 @@ void process_user_queue_item(snac *snac, xs_dict *q_item)
}
}
}
- else
- if (strcmp(type, "email") == 0) {
- /* send this email */
- xs_str *msg = xs_dict_get(q_item, "message");
- int retries = xs_number_get(xs_dict_get(q_item, "retries"));
-
- if (!send_email(msg))
- snac_debug(snac, 1, xs_fmt("email message sent"));
- else {
- if (retries > queue_retry_max)
- snac_log(snac, xs_fmt("process_queue email giving up (errno: %d)", errno));
- else {
- /* requeue */
- snac_log(snac, xs_fmt(
- "process_queue email requeue #%d (errno: %d)", retries + 1, errno));
-
- enqueue_email(snac, msg, retries + 1);
- }
- }
- }
}
@@ -1184,6 +1164,30 @@ void process_user_queue(snac *snac)
void process_queue_item(xs_dict *q_item)
/* processes an item from the global queue */
{
+ char *type = xs_dict_get(q_item, "type");
+ int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max"));
+
+ if (strcmp(type, "email") == 0) {
+ /* send this email */
+ xs_str *msg = xs_dict_get(q_item, "message");
+ int retries = xs_number_get(xs_dict_get(q_item, "retries"));
+
+ if (!send_email(msg))
+ srv_debug(1, xs_fmt("email message sent"));
+ else {
+ retries++;
+
+ if (retries > queue_retry_max)
+ srv_log(xs_fmt("process_queue email giving up (errno: %d)", errno));
+ else {
+ /* requeue */
+ srv_log(xs_fmt(
+ "process_queue email requeue #%d (errno: %d)", retries, errno));
+
+ enqueue_email(msg, retries);
+ }
+ }
+ }
}
diff --git a/data.c b/data.c
index 96583aa..da8d422 100644
--- a/data.c
+++ b/data.c
@@ -1389,16 +1389,16 @@ void enqueue_output_by_actor(snac *snac, xs_dict *msg, xs_str *actor, int retrie
}
-void enqueue_email(snac *snac, xs_str *msg, int retries)
+void enqueue_email(xs_str *msg, int retries)
/* enqueues an email message to be sent */
{
xs *qmsg = _new_qmsg("email", msg, retries);
char *ntid = xs_dict_get(qmsg, "ntid");
- xs *fn = xs_fmt("%s/queue/%s.json", snac->basedir, ntid);
+ xs *fn = xs_fmt("%s/queue/%s.json", srv_basedir, ntid);
qmsg = _enqueue_put(fn, qmsg);
- snac_debug(snac, 1, xs_fmt("enqueue_email %d", retries));
+ srv_debug(1, xs_fmt("enqueue_email %d", retries));
}
diff --git a/httpd.c b/httpd.c
index c47b841..7932982 100644
--- a/httpd.c
+++ b/httpd.c
@@ -280,6 +280,9 @@ static void *queue_thread(void *arg)
}
}
+ /* global queue */
+ process_queue();
+
/* time to purge? */
if ((t = time(NULL)) > purge_time) {
pthread_t pth;
diff --git a/snac.h b/snac.h
index 55ed49d..78007f7 100644
--- a/snac.h
+++ b/snac.h
@@ -126,10 +126,11 @@ d_char *history_list(snac *snac);
void enqueue_input(snac *snac, xs_dict *msg, xs_dict *req, int retries);
void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries);
void enqueue_output_by_actor(snac *snac, xs_dict *msg, xs_str *actor, int retries);
-void enqueue_email(snac *snac, xs_str *msg, int retries);
+void enqueue_email(xs_str *msg, int retries);
void enqueue_message(snac *snac, char *msg);
xs_list *user_queue(snac *snac);
+xs_list *queue(void);
xs_dict *dequeue(const char *fn);
void purge(snac *snac);
@@ -165,7 +166,11 @@ int send_to_inbox(snac *snac, char *inbox, char *msg, d_char **payload, int *p_s
d_char *get_actor_inbox(snac *snac, char *actor);
int send_to_actor(snac *snac, char *actor, char *msg, d_char **payload, int *p_size, int timeout);
int is_msg_public(snac *snac, char *msg);
+
void process_user_queue(snac *snac);
+
+void process_queue(void);
+
void post(snac *snac, char *msg);
int activitypub_get_handler(d_char *req, char *q_path,
char **body, int *b_size, char **ctype);