From e37490d5e1e025473a935c8952d3c5d0416990d4 Mon Sep 17 00:00:00 2001
From: Jens Axboe
Date: Thu, 21 Nov 2024 09:18:19 -0700
Subject: [PATCH] io_uring: replace defer task_work llist with io_wq_work_list

Add a spinlock for the list, and replace the lockless llist with the
work list instead. This avoids needing to reverse items in the list
before running them, as the io_wq_work_list is FIFO by nature whereas
the llist is LIFO.

Signed-off-by: Jens Axboe
---
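A side note on the motivation, kept below the cut: the change is really about
the ordering difference between the two list flavours. The toy program below
is only a standalone sketch of that difference, not kernel code; "tw_node",
"stack_push" and "queue_add_tail" are invented names standing in for
llist_add()-style head insertion and wq_list_add_tail()-style tail insertion.
An llist push always inserts at the head, so draining it yields newest-first
and the consumer has to reverse the result (llist_reverse_order()) to run
task_work in queueing order, while a tail-append list hands items back already
in queueing order, at the cost of taking ctx->work_lock on the producer side.

#include <stdio.h>

struct tw_node {
	int seq;			/* order the work was queued in */
	struct tw_node *next;
};

/* llist-style: push at the head, so a drain sees newest-first (LIFO) */
static struct tw_node *stack_push(struct tw_node *head, struct tw_node *n)
{
	n->next = head;
	return n;
}

/* io_wq_work_list-style: append at the tail, a drain sees oldest-first (FIFO) */
struct tw_queue {
	struct tw_node *first, *last;
};

static void queue_add_tail(struct tw_queue *q, struct tw_node *n)
{
	n->next = NULL;
	if (q->last)
		q->last->next = n;
	else
		q->first = n;
	q->last = n;
}

int main(void)
{
	struct tw_node a[3] = { { .seq = 0 }, { .seq = 1 }, { .seq = 2 } };
	struct tw_node b[3] = { { .seq = 0 }, { .seq = 1 }, { .seq = 2 } };
	struct tw_node *head = NULL, *n;
	struct tw_queue q = { NULL, NULL };
	int i;

	for (i = 0; i < 3; i++)
		head = stack_push(head, &a[i]);
	printf("llist-style drain: ");
	for (n = head; n; n = n->next)
		printf("%d ", n->seq);		/* 2 1 0: needs a reverse pass */
	printf("\n");

	for (i = 0; i < 3; i++)
		queue_add_tail(&q, &b[i]);
	printf("work_list drain:   ");
	for (n = q.first; n; n = n->next)
		printf("%d ", n->seq);		/* 0 1 2: already in queue order */
	printf("\n");
	return 0;
}

Replacing the cmpxchg loop with ctx->work_lock also means the pending count
can live in ctx->work_items instead of being snooped from the first request,
which is what lets the per-request ->nr_tw field go away.
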
 include/linux/io_uring_types.h |  13 ++-
 io_uring/io_uring.c            | 184 +++++++++++++++++----------------
 io_uring/io_uring.h            |   2 +-
 3 files changed, 103 insertions(+), 96 deletions(-)

diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index 2922635986f5..d66ebe7a1e2c 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -351,8 +351,9 @@ struct io_ring_ctx {
 	 * regularly bounce b/w CPUs.
 	 */
 	struct {
-		struct llist_head	work_llist;
-		struct llist_head	retry_llist;
+		struct io_wq_work_list	work_list;
+		spinlock_t		work_lock;
+		int			work_items;
 		unsigned long		check_cq;
 		atomic_t		cq_wait_nr;
 		atomic_t		cq_timeouts;
@@ -596,7 +597,11 @@ enum {
 typedef void (*io_req_tw_func_t)(struct io_kiocb *req, io_tw_token_t tw);
 
 struct io_task_work {
-	struct llist_node		node;
+	/* DEFER_TASKRUN uses work_node, regular task_work node */
+	union {
+		struct io_wq_work_node	work_node;
+		struct llist_node	node;
+	};
 	io_req_tw_func_t		func;
 };
 
@@ -655,8 +660,6 @@ struct io_kiocb {
 	 */
 	u16				buf_index;
 
-	unsigned			nr_tw;
-
 	/* REQ_F_* flags */
 	io_req_flags_t			flags;
 
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 5111ec040c53..f3752dc99889 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -355,7 +355,8 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 	INIT_LIST_HEAD(&ctx->defer_list);
 	INIT_LIST_HEAD(&ctx->timeout_list);
 	INIT_LIST_HEAD(&ctx->ltimeout_list);
-	init_llist_head(&ctx->work_llist);
+	INIT_WQ_LIST(&ctx->work_list);
+	spin_lock_init(&ctx->work_lock);
 	INIT_LIST_HEAD(&ctx->tctx_list);
 	ctx->submit_state.free_list.next = NULL;
 	INIT_HLIST_HEAD(&ctx->waitid_list);
@@ -1080,6 +1081,22 @@ struct llist_node *io_handle_tw_list(struct llist_node *node,
 	return node;
 }
 
+static __cold void ____io_fallback_tw(struct io_kiocb *req, bool sync,
+				      struct io_ring_ctx **last_ctx)
+{
+	if (*last_ctx != req->ctx) {
+		if (*last_ctx) {
+			if (sync)
+				flush_delayed_work(&(*last_ctx)->fallback_work);
+			percpu_ref_put(&(*last_ctx)->refs);
+		}
+		*last_ctx = req->ctx;
+		percpu_ref_get(&(*last_ctx)->refs);
+	}
+	if (llist_add(&req->io_task_work.node, &(*last_ctx)->fallback_llist))
+		schedule_delayed_work(&(*last_ctx)->fallback_work, 1);
+}
+
 static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
 {
 	struct io_ring_ctx *last_ctx = NULL;
@@ -1088,17 +1105,7 @@ static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
 	while (node) {
 		req = container_of(node, struct io_kiocb, io_task_work.node);
 		node = node->next;
-		if (last_ctx != req->ctx) {
-			if (last_ctx) {
-				if (sync)
-					flush_delayed_work(&last_ctx->fallback_work);
-				percpu_ref_put(&last_ctx->refs);
-			}
-			last_ctx = req->ctx;
-			percpu_ref_get(&last_ctx->refs);
-		}
-		if (llist_add(&req->io_task_work.node, &last_ctx->fallback_llist))
-			schedule_delayed_work(&last_ctx->fallback_work, 1);
+		____io_fallback_tw(req, sync, &last_ctx);
 	}
 
 	if (last_ctx) {
@@ -1152,66 +1159,48 @@ void tctx_task_work(struct callback_head *cb)
 	WARN_ON_ONCE(ret);
 }
 
-static void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
+static void io_req_local_work_add(struct io_kiocb *req, unsigned tw_flags)
 {
 	struct io_ring_ctx *ctx = req->ctx;
-	unsigned nr_wait, nr_tw, nr_tw_prev;
-	struct llist_node *head;
+	unsigned nr_tw, nr_tw_prev, nr_wait;
+	unsigned long flags;
 
 	/* See comment above IO_CQ_WAKE_INIT */
 	BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
 
 	/*
-	 * We don't know how many reuqests is there in the link and whether
-	 * they can even be queued lazily, fall back to non-lazy.
+	 * We don't know how many requests are in the link and whether they can
+	 * even be queued lazily, fall back to non-lazy.
 	 */
 	if (req->flags & IO_REQ_LINK_FLAGS)
-		flags &= ~IOU_F_TWQ_LAZY_WAKE;
+		tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
 
 	guard(rcu)();
+	spin_lock_irqsave(&ctx->work_lock, flags);
+	wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
+	nr_tw_prev = ctx->work_items++;
+	spin_unlock_irqrestore(&ctx->work_lock, flags);
 
-	head = READ_ONCE(ctx->work_llist.first);
-	do {
-		nr_tw_prev = 0;
-		if (head) {
-			struct io_kiocb *first_req = container_of(head,
-							struct io_kiocb,
-							io_task_work.node);
-			/*
-			 * Might be executed at any moment, rely on
-			 * SLAB_TYPESAFE_BY_RCU to keep it alive.
-			 */
-			nr_tw_prev = READ_ONCE(first_req->nr_tw);
-		}
-
-		/*
-		 * Theoretically, it can overflow, but that's fine as one of
-		 * previous adds should've tried to wake the task.
-		 */
-		nr_tw = nr_tw_prev + 1;
-		if (!(flags & IOU_F_TWQ_LAZY_WAKE))
-			nr_tw = IO_CQ_WAKE_FORCE;
-
-		req->nr_tw = nr_tw;
-		req->io_task_work.node.next = head;
-	} while (!try_cmpxchg(&ctx->work_llist.first, &head,
-			      &req->io_task_work.node));
-
-	/*
-	 * cmpxchg implies a full barrier, which pairs with the barrier
-	 * in set_current_state() on the io_cqring_wait() side. It's used
-	 * to ensure that either we see updated ->cq_wait_nr, or waiters
-	 * going to sleep will observe the work added to the list, which
-	 * is similar to the wait/wawke task state sync.
-	 */
+	nr_tw = nr_tw_prev + 1;
+	if (!(tw_flags & IOU_F_TWQ_LAZY_WAKE))
+		nr_tw = IO_CQ_WAKE_FORCE;
 
-	if (!head) {
+	if (!nr_tw_prev) {
 		if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
 			atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
 		if (ctx->has_evfd)
 			io_eventfd_signal(ctx, false);
 	}
+
+	/*
+	 * We need a barrier after unlock, which pairs with the barrier
+	 * in set_current_state() on the io_cqring_wait() side. It's used
+	 * to ensure that either we see updated ->cq_wait_nr, or waiters
+	 * going to sleep will observe the work added to the list, which
+	 * is similar to the wait/wake task state sync.
+	 */
+	smp_mb();
 	nr_wait = atomic_read(&ctx->cq_wait_nr);
 	/* not enough or no one is waiting */
 	if (nr_tw < nr_wait)
@@ -1263,11 +1252,27 @@ void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)
 
 static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
 {
-	struct llist_node *node = llist_del_all(&ctx->work_llist);
+	struct io_ring_ctx *last_ctx = NULL;
+	struct io_wq_work_node *node;
+	unsigned long flags;
+
+	spin_lock_irqsave(&ctx->work_lock, flags);
+	node = ctx->work_list.first;
+	INIT_WQ_LIST(&ctx->work_list);
+	ctx->work_items = 0;
+	spin_unlock_irqrestore(&ctx->work_lock, flags);
 
-	__io_fallback_tw(node, false);
-	node = llist_del_all(&ctx->retry_llist);
-	__io_fallback_tw(node, false);
+	while (node) {
+		struct io_kiocb *req;
+
+		req = container_of(node, struct io_kiocb, io_task_work.work_node);
+		node = node->next;
+		____io_fallback_tw(req, false, &last_ctx);
+	}
+	if (last_ctx) {
+		flush_delayed_work(&last_ctx->fallback_work);
+		percpu_ref_put(&last_ctx->refs);
+	}
 }
 
 static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
@@ -1282,33 +1287,12 @@ static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
 	return false;
 }
 
-static int __io_run_local_work_loop(struct llist_node **node,
-				    io_tw_token_t tw,
-				    int events)
-{
-	int ret = 0;
-
-	while (*node) {
-		struct llist_node *next = (*node)->next;
-		struct io_kiocb *req = container_of(*node, struct io_kiocb,
-						    io_task_work.node);
-		INDIRECT_CALL_2(req->io_task_work.func,
-				io_poll_task_func, io_req_rw_complete,
-				req, tw);
-		*node = next;
-		if (++ret >= events)
-			break;
-	}
-
-	return ret;
-}
-
 static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw,
 			       int min_events, int max_events)
 {
-	struct llist_node *node;
+	struct io_wq_work_node *node, *tail;
+	int ret = 0, nitems;
 	unsigned int loops = 0;
-	int ret = 0;
 
 	if (WARN_ON_ONCE(ctx->submitter_task != current))
 		return -EEXIST;
@@ -1316,17 +1300,37 @@ static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw,
 	atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
 again:
 	min_events -= ret;
-	ret = __io_run_local_work_loop(&ctx->retry_llist.first, tw, max_events);
-	if (ctx->retry_llist.first)
+	spin_lock_irq(&ctx->work_lock);
+	node = ctx->work_list.first;
+	tail = ctx->work_list.last;
+	nitems = ctx->work_items;
+	INIT_WQ_LIST(&ctx->work_list);
+	ctx->work_items = 0;
+	spin_unlock_irq(&ctx->work_lock);
+
+	while (node) {
+		struct io_kiocb *req = container_of(node, struct io_kiocb,
+						    io_task_work.work_node);
+		node = node->next;
+		INDIRECT_CALL_2(req->io_task_work.func,
+				io_poll_task_func, io_req_rw_complete,
+				req, tw);
+		nitems--;
+		if (++ret >= max_events)
+			break;
+	}
+
+	if (unlikely(node)) {
+		spin_lock_irq(&ctx->work_lock);
+		tail->next = ctx->work_list.first;
+		ctx->work_list.first = node;
+		if (!ctx->work_list.last)
+			ctx->work_list.last = tail;
+		ctx->work_items += nitems;
+		spin_unlock_irq(&ctx->work_lock);
 		goto retry_done;
+	}
 
-	/*
-	 * llists are in reverse order, flip it back the right way before
-	 * running the pending items.
-	 */
-	node = llist_reverse_order(llist_del_all(&ctx->work_llist));
-	ret += __io_run_local_work_loop(&node, tw, max_events - ret);
-	ctx->retry_llist.first = node;
 	loops++;
 
 	if (io_run_local_work_continue(ctx, ret, min_events))
@@ -2435,7 +2439,7 @@ static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer)
 	if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
 		atomic_set(&ctx->cq_wait_nr, 1);
 		smp_mb();
-		if (!llist_empty(&ctx->work_llist))
+		if (io_local_work_pending(ctx))
 			goto out_wake;
 	}
 
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index 56becba55c7e..0d191ec45d0e 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -383,7 +383,7 @@ static inline int io_run_task_work(void)
 
 static inline bool io_local_work_pending(struct io_ring_ctx *ctx)
 {
-	return !llist_empty(&ctx->work_llist) || !llist_empty(&ctx->retry_llist);
+	return READ_ONCE(ctx->work_list.first);
 }
 
 static inline bool io_task_work_pending(struct io_ring_ctx *ctx)
-- 
2.25.1