summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordefault <nobody@localhost>2023-01-31 22:30:34 +0100
committerdefault <nobody@localhost>2023-01-31 22:30:34 +0100
commitc2524323a932d383a10c0e3e3adbffdada085515 (patch)
tree907347422b1adb05f4a7a0eb445883fde4402458
parentf0ef1d41159abe53633986f419c434d0c3b4363e (diff)
New function process_queue_item().
-rw-r--r--activitypub.c184
1 files changed, 96 insertions, 88 deletions
diff --git a/activitypub.c b/activitypub.c
index 9bffb5b..ba6e45e 100644
--- a/activitypub.c
+++ b/activitypub.c
@@ -845,7 +845,7 @@ void notify(snac *snac, char *type, char *utype, char *actor, char *msg)
/** queues **/
-int process_message(snac *snac, char *msg, char *req)
+int process_input_message(snac *snac, char *msg, char *req)
/* processes an ActivityPub message from the input queue */
{
/* actor and type exist, were checked previously */
@@ -1065,114 +1065,122 @@ int send_email(char *msg)
}
-void process_queue(snac *snac)
-/* processes the queue */
+void process_queue_item(snac *snac, xs_dict *q_item)
+/* processes an item from the queue */
{
- xs *list;
- char *p, *fn;
+ char *type;
int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max"));
- list = queue(snac);
+ if ((type = xs_dict_get(q_item, "type")) == NULL)
+ type = "output";
- p = list;
- while (xs_list_iter(&p, &fn)) {
- xs *q_item = dequeue(snac, fn);
- char *type;
+ if (strcmp(type, "message") == 0) {
+ xs_dict *msg = xs_dict_get(q_item, "message");
+ xs *inboxes = inbox_list(snac, msg);
+ xs_list *p;
+ xs_str *inbox;
- if (q_item == NULL) {
- snac_log(snac, xs_fmt("process_queue q_item error"));
- continue;
+ p = inboxes;
+ while (xs_list_iter(&p, &inbox)) {
+ enqueue_output(snac, msg, inbox, 0);
}
+ }
+ else
+ if (strcmp(type, "output") == 0) {
+ int status;
+ xs_str *inbox = xs_dict_get(q_item, "inbox");
+ xs_dict *msg = xs_dict_get(q_item, "message");
+ int retries = xs_number_get(xs_dict_get(q_item, "retries"));
+ xs *payload = NULL;
+ int p_size = 0;
+
+ if (xs_is_null(inbox) || xs_is_null(msg))
+ return;
- if ((type = xs_dict_get(q_item, "type")) == NULL)
- type = "output";
+ /* deliver */
+ status = send_to_inbox(snac, inbox, msg, &payload, &p_size, retries == 0 ? 3 : 8);
- if (strcmp(type, "message") == 0) {
- char *msg = xs_dict_get(q_item, "message");
- xs *inboxes = inbox_list(snac, msg);
- char *p, *v;
+ snac_log(snac, xs_fmt("process_queue sent to inbox %s %d", inbox, status));
- p = inboxes;
- while (xs_list_iter(&p, &v)) {
- enqueue_output(snac, msg, v, 0);
- }
- }
- else
- if (strcmp(type, "output") == 0) {
- int status;
- char *inbox = xs_dict_get(q_item, "inbox");
- char *msg = xs_dict_get(q_item, "message");
- int retries = xs_number_get(xs_dict_get(q_item, "retries"));
- xs *payload = NULL;
- int p_size = 0;
-
- if (xs_is_null(inbox) || xs_is_null(msg))
- continue;
-
- /* deliver */
- status = send_to_inbox(snac, inbox, msg, &payload, &p_size, retries == 0 ? 3 : 8);
-
- snac_log(snac, xs_fmt("process_queue sent to inbox %s %d", inbox, status));
-
- if (!valid_status(status)) {
- /* error sending; requeue? */
- if (status == 404 || status == 410)
- /* explicit error: discard */
- snac_log(snac, xs_fmt("process_queue error %s %d", inbox, status));
- else
- if (retries > queue_retry_max)
- snac_log(snac, xs_fmt("process_queue giving up %s %d", inbox, status));
- else {
- /* requeue */
- enqueue_output(snac, msg, inbox, retries + 1);
- snac_log(snac, xs_fmt("process_queue requeue %s #%d", inbox, retries + 1));
- }
+ if (!valid_status(status)) {
+ /* error sending; requeue? */
+ if (status == 404 || status == 410)
+ /* explicit error: discard */
+ snac_log(snac, xs_fmt("process_queue error %s %d", inbox, status));
+ else
+ if (retries > queue_retry_max)
+ snac_log(snac, xs_fmt("process_queue giving up %s %d", inbox, status));
+ else {
+ /* requeue */
+ enqueue_output(snac, msg, inbox, retries + 1);
+ snac_log(snac, xs_fmt("process_queue requeue %s #%d", inbox, retries + 1));
}
}
- else
- if (strcmp(type, "input") == 0) {
- /* process the message */
- char *msg = xs_dict_get(q_item, "message");
- char *req = xs_dict_get(q_item, "req");
- int retries = xs_number_get(xs_dict_get(q_item, "retries"));
-
- if (xs_is_null(msg))
- continue;
-
- if (!process_message(snac, msg, req)) {
- if (retries > queue_retry_max)
- snac_log(snac, xs_fmt("process_queue input giving up"));
- else {
- /* reenqueue */
- enqueue_input(snac, msg, req, retries + 1);
- snac_log(snac, xs_fmt("process_queue input requeue #%d", retries + 1));
- }
+ }
+ else
+ if (strcmp(type, "input") == 0) {
+ /* process the message */
+ xs_dict *msg = xs_dict_get(q_item, "message");
+ xs_dict *req = xs_dict_get(q_item, "req");
+ int retries = xs_number_get(xs_dict_get(q_item, "retries"));
+
+ if (xs_is_null(msg))
+ return;
+
+ if (!process_input_message(snac, msg, req)) {
+ if (retries > queue_retry_max)
+ snac_log(snac, xs_fmt("process_queue input giving up"));
+ else {
+ /* reenqueue */
+ enqueue_input(snac, msg, req, retries + 1);
+ snac_log(snac, xs_fmt("process_queue input requeue #%d", retries + 1));
}
}
- else
- if (strcmp(type, "email") == 0) {
- /* send this email */
- char *msg = xs_dict_get(q_item, "message");
- int retries = xs_number_get(xs_dict_get(q_item, "retries"));
+ }
+ else
+ if (strcmp(type, "email") == 0) {
+ /* send this email */
+ xs_str *msg = xs_dict_get(q_item, "message");
+ int retries = xs_number_get(xs_dict_get(q_item, "retries"));
- if (!send_email(msg))
- snac_debug(snac, 1, xs_fmt("email message sent"));
+ if (!send_email(msg))
+ snac_debug(snac, 1, xs_fmt("email message sent"));
+ else {
+ if (retries > queue_retry_max)
+ snac_log(snac, xs_fmt("process_queue email giving up (errno: %d)", errno));
else {
- if (retries > queue_retry_max)
- snac_log(snac, xs_fmt("process_queue email giving up (errno: %d)", errno));
- else {
- /* requeue */
- snac_log(snac, xs_fmt(
- "process_queue email requeue #%d (errno: %d)", retries + 1, errno));
-
- enqueue_email(snac, msg, retries + 1);
- }
+ /* requeue */
+ snac_log(snac, xs_fmt(
+ "process_queue email requeue #%d (errno: %d)", retries + 1, errno));
+
+ enqueue_email(snac, msg, retries + 1);
}
}
}
}
+void process_queue(snac *snac)
+/* processes the queue */
+{
+ xs *list = queue(snac);
+
+ xs_list *p = list;
+ xs_str *fn;
+
+ while (xs_list_iter(&p, &fn)) {
+ xs *q_item = dequeue(snac, fn);
+
+ if (q_item == NULL) {
+ snac_log(snac, xs_fmt("process_queue q_item error"));
+ continue;
+ }
+
+ process_queue_item(snac, q_item);
+ }
+}
+
+
/** HTTP handlers */
int activitypub_get_handler(d_char *req, char *q_path,