summaryrefslogtreecommitdiff
path: root/activitypub.c
diff options
context:
space:
mode:
authordefault <nobody@localhost>2023-12-11 17:59:48 +0100
committerdefault <nobody@localhost>2023-12-11 17:59:48 +0100
commit888a79e58a2de457558c41cc91cf7e8d5df78811 (patch)
treeed4835db1b38c1175134cbd4ec601a0a9256db7a /activitypub.c
parentb1ecaba8035d0239051f01e4bdc7af5326a1d980 (diff)
Call process_input_message() from the shared-inbox input.
This way, some garbage like unrequested Deletes from Mastodon and other transient errors (like unaccessible authors) can be short-circuited before propagating the message to the users.
Diffstat (limited to 'activitypub.c')
-rw-r--r--activitypub.c73
1 files changed, 47 insertions, 26 deletions
diff --git a/activitypub.c b/activitypub.c
index 55a245e..d7f5b37 100644
--- a/activitypub.c
+++ b/activitypub.c
@@ -2052,45 +2052,66 @@ void process_queue_item(xs_dict *q_item)
}
else
if (strcmp(type, "input") == 0) {
- /* redistribute the input message to all users */
- char *ntid = xs_dict_get(q_item, "ntid");
- xs *tmpfn = xs_fmt("%s/tmp/%s.json", srv_basedir, ntid);
xs_dict *msg = xs_dict_get(q_item, "message");
- FILE *f;
+ xs_dict *req = xs_dict_get(q_item, "req");
+ int retries = xs_number_get(xs_dict_get(q_item, "retries"));
+
+ /* do some instance-level checks */
+ int r = process_input_message(NULL, msg, req);
- if ((f = fopen(tmpfn, "w")) != NULL) {
- xs_json_dump(q_item, 4, f);
- fclose(f);
+ if (r == 0) {
+ /* transient error? retry */
+ int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max"));
+
+ if (retries > queue_retry_max)
+ srv_log(xs_fmt("shared input giving up"));
+ else {
+ /* reenqueue */
+ enqueue_shared_input(msg, req, retries + 1);
+ srv_log(xs_fmt("shared input requeue #%d", retries + 1));
+ }
}
+ else
+ if (r == 2) {
+ /* redistribute the input message to all users */
+ char *ntid = xs_dict_get(q_item, "ntid");
+ xs *tmpfn = xs_fmt("%s/tmp/%s.json", srv_basedir, ntid);
+ FILE *f;
+
+ if ((f = fopen(tmpfn, "w")) != NULL) {
+ xs_json_dump(q_item, 4, f);
+ fclose(f);
+ }
- xs *users = user_list();
- xs_list *p = users;
- char *v;
- int cnt = 0;
+ xs *users = user_list();
+ xs_list *p = users;
+ char *v;
+ int cnt = 0;
- while (xs_list_iter(&p, &v)) {
- snac user;
+ while (xs_list_iter(&p, &v)) {
+ snac user;
- if (user_open(&user, v)) {
- if (is_msg_for_me(&user, msg)) {
- xs *fn = xs_fmt("%s/queue/%s.json", user.basedir, ntid);
+ if (user_open(&user, v)) {
+ if (is_msg_for_me(&user, msg)) {
+ xs *fn = xs_fmt("%s/queue/%s.json", user.basedir, ntid);
- snac_debug(&user, 1, xs_fmt("enqueue_input (from shared inbox) %s", fn));
+ snac_debug(&user, 1, xs_fmt("enqueue_input (from shared inbox) %s", fn));
- if (link(tmpfn, fn) < 0)
- srv_log(xs_fmt("link(%s, %s) error", tmpfn, fn));
+ if (link(tmpfn, fn) < 0)
+ srv_log(xs_fmt("link(%s, %s) error", tmpfn, fn));
- cnt++;
- }
+ cnt++;
+ }
- user_free(&user);
+ user_free(&user);
+ }
}
- }
- unlink(tmpfn);
+ unlink(tmpfn);
- if (cnt == 0)
- srv_debug(1, xs_fmt("no valid recipients for %s", tmpfn));
+ if (cnt == 0)
+ srv_debug(1, xs_fmt("no valid recipients for %s", tmpfn));
+ }
}
else
srv_log(xs_fmt("unexpected q_item type '%s'", type));