diff options
-rw-r--r-- | activitypub.c | 73 |
1 files changed, 47 insertions, 26 deletions
diff --git a/activitypub.c b/activitypub.c index 55a245e..d7f5b37 100644 --- a/activitypub.c +++ b/activitypub.c @@ -2052,45 +2052,66 @@ void process_queue_item(xs_dict *q_item) } else if (strcmp(type, "input") == 0) { - /* redistribute the input message to all users */ - char *ntid = xs_dict_get(q_item, "ntid"); - xs *tmpfn = xs_fmt("%s/tmp/%s.json", srv_basedir, ntid); xs_dict *msg = xs_dict_get(q_item, "message"); - FILE *f; + xs_dict *req = xs_dict_get(q_item, "req"); + int retries = xs_number_get(xs_dict_get(q_item, "retries")); + + /* do some instance-level checks */ + int r = process_input_message(NULL, msg, req); - if ((f = fopen(tmpfn, "w")) != NULL) { - xs_json_dump(q_item, 4, f); - fclose(f); + if (r == 0) { + /* transient error? retry */ + int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max")); + + if (retries > queue_retry_max) + srv_log(xs_fmt("shared input giving up")); + else { + /* reenqueue */ + enqueue_shared_input(msg, req, retries + 1); + srv_log(xs_fmt("shared input requeue #%d", retries + 1)); + } } + else + if (r == 2) { + /* redistribute the input message to all users */ + char *ntid = xs_dict_get(q_item, "ntid"); + xs *tmpfn = xs_fmt("%s/tmp/%s.json", srv_basedir, ntid); + FILE *f; + + if ((f = fopen(tmpfn, "w")) != NULL) { + xs_json_dump(q_item, 4, f); + fclose(f); + } - xs *users = user_list(); - xs_list *p = users; - char *v; - int cnt = 0; + xs *users = user_list(); + xs_list *p = users; + char *v; + int cnt = 0; - while (xs_list_iter(&p, &v)) { - snac user; + while (xs_list_iter(&p, &v)) { + snac user; - if (user_open(&user, v)) { - if (is_msg_for_me(&user, msg)) { - xs *fn = xs_fmt("%s/queue/%s.json", user.basedir, ntid); + if (user_open(&user, v)) { + if (is_msg_for_me(&user, msg)) { + xs *fn = xs_fmt("%s/queue/%s.json", user.basedir, ntid); - snac_debug(&user, 1, xs_fmt("enqueue_input (from shared inbox) %s", fn)); + snac_debug(&user, 1, xs_fmt("enqueue_input (from shared inbox) %s", fn)); - if (link(tmpfn, fn) < 0) - srv_log(xs_fmt("link(%s, %s) error", tmpfn, fn)); + if (link(tmpfn, fn) < 0) + srv_log(xs_fmt("link(%s, %s) error", tmpfn, fn)); - cnt++; - } + cnt++; + } - user_free(&user); + user_free(&user); + } } - } - unlink(tmpfn); + unlink(tmpfn); - if (cnt == 0) - srv_debug(1, xs_fmt("no valid recipients for %s", tmpfn)); + if (cnt == 0) + srv_debug(1, xs_fmt("no valid recipients for %s", tmpfn)); + } } else srv_log(xs_fmt("unexpected q_item type '%s'", type)); |