summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordefault <nobody@localhost>2023-02-07 13:31:48 +0100
committerdefault <nobody@localhost>2023-02-07 13:31:48 +0100
commit4cca157641d5f91bde51baf437a3179e39d0b601 (patch)
tree84e6cf02972ea3836fbd7e92f89d69f47ccd88b7
parent8f63c6259ad8eb6d26eff2673a29ef469ad8e018 (diff)
Output messages are now processed by the pool of threads.
-rw-r--r--activitypub.c38
-rw-r--r--data.c30
-rw-r--r--snac.h2
3 files changed, 60 insertions, 10 deletions
diff --git a/activitypub.c b/activitypub.c
index af0a9a1..b54845d 100644
--- a/activitypub.c
+++ b/activitypub.c
@@ -1195,6 +1195,44 @@ void process_queue_item(xs_dict *q_item)
char *type = xs_dict_get(q_item, "type");
int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max"));
+ if (strcmp(type, "output") == 0) {
+ int status;
+ xs_str *inbox = xs_dict_get(q_item, "inbox");
+ xs_str *keyid = xs_dict_get(q_item, "keyid");
+ xs_str *seckey = xs_dict_get(q_item, "seckey");
+ xs_dict *msg = xs_dict_get(q_item, "message");
+ int retries = xs_number_get(xs_dict_get(q_item, "retries"));
+ xs *payload = NULL;
+ int p_size = 0;
+
+ if (xs_is_null(inbox) || xs_is_null(msg) || xs_is_null(keyid) || xs_is_null(seckey)) {
+ srv_log(xs_fmt("output message error: missing fields"));
+ return;
+ }
+
+ /* deliver */
+ status = send_to_inbox_raw(keyid, seckey, inbox, msg, &payload, &p_size, retries == 0 ? 3 : 8);
+
+ srv_log(xs_fmt("output message: sent to inbox %s %d", inbox, status));
+
+ if (!valid_status(status)) {
+ retries++;
+
+ /* error sending; requeue? */
+ if (status == 404 || status == 410)
+ /* explicit error: discard */
+ srv_log(xs_fmt("output message: fatal error %s %d", inbox, status));
+ else
+ if (retries > queue_retry_max)
+ srv_log(xs_fmt("output message: giving up %s %d", inbox, status));
+ else {
+ /* requeue */
+ enqueue_output_raw(keyid, seckey, msg, inbox, retries);
+ srv_log(xs_fmt("output message: requeue %s #%d", inbox, retries));
+ }
+ }
+ }
+ else
if (strcmp(type, "email") == 0) {
/* send this email */
xs_str *msg = xs_dict_get(q_item, "message");
diff --git a/data.c b/data.c
index 3827422..ac429fa 100644
--- a/data.c
+++ b/data.c
@@ -1373,25 +1373,35 @@ void enqueue_input(snac *snac, xs_dict *msg, xs_dict *req, int retries)
}
-void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries)
+void enqueue_output_raw(const char *keyid, const char *seckey,
+ xs_dict *msg, xs_str *inbox, int retries)
/* enqueues an output message to an inbox */
{
- if (xs_startswith(inbox, snac->actor)) {
- snac_debug(snac, 1, xs_str_new("refusing enqueue to myself"));
- return;
- }
-
xs *qmsg = _new_qmsg("output", msg, retries);
char *ntid = xs_dict_get(qmsg, "ntid");
- xs *fn = xs_fmt("%s/queue/%s.json", snac->basedir, ntid);
+ xs *fn = xs_fmt("%s/queue/%s.json", srv_basedir, ntid);
qmsg = xs_dict_append(qmsg, "inbox", inbox);
- qmsg = xs_dict_append(qmsg, "keyid", snac->actor);
- qmsg = xs_dict_append(qmsg, "seckey", xs_dict_get(snac->key, "secret"));
+ qmsg = xs_dict_append(qmsg, "keyid", keyid);
+ qmsg = xs_dict_append(qmsg, "seckey", seckey);
qmsg = _enqueue_put(fn, qmsg);
- snac_debug(snac, 1, xs_fmt("enqueue_output %s %s %d", inbox, fn, retries));
+ srv_debug(1, xs_fmt("enqueue_output %s %s %d", inbox, fn, retries));
+}
+
+
+void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries)
+/* enqueues an output message to an inbox */
+{
+ if (xs_startswith(inbox, snac->actor)) {
+ snac_debug(snac, 1, xs_str_new("refusing enqueue to myself"));
+ return;
+ }
+
+ char *seckey = xs_dict_get(snac->key, "secret");
+
+ enqueue_output_raw(snac->actor, seckey, msg, inbox, retries);
}
diff --git a/snac.h b/snac.h
index e3ab4a8..3894908 100644
--- a/snac.h
+++ b/snac.h
@@ -129,6 +129,8 @@ int history_del(snac *snac, char *id);
d_char *history_list(snac *snac);
void enqueue_input(snac *snac, xs_dict *msg, xs_dict *req, int retries);
+void enqueue_output_raw(const char *keyid, const char *seckey,
+ xs_dict *msg, xs_str *inbox, int retries);
void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries);
void enqueue_output_by_actor(snac *snac, xs_dict *msg, xs_str *actor, int retries);
void enqueue_email(xs_str *msg, int retries);