io_uring: replace defer task_work llist with io_wq_work_list
authorJens Axboe <axboe@kernel.dk>
Thu, 21 Nov 2024 16:18:19 +0000 (09:18 -0700)
committerJens Axboe <axboe@kernel.dk>
Tue, 17 Jun 2025 11:49:30 +0000 (05:49 -0600)
Add a spinlock for the list, and replace the lockless llist with the
work list instead. This avoids needing to reverse items in the list
before running them, as the io_wq_work_list is FIFO by nature whereas
the llist is LIFO.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
include/linux/io_uring_types.h
io_uring/io_uring.c
io_uring/io_uring.h

index 2922635986f5210cb7d98060ddf771e26a19054b..d66ebe7a1e2c263cbcbb2c176a80ce557dd20091 100644 (file)
@@ -351,8 +351,9 @@ struct io_ring_ctx {
         * regularly bounce b/w CPUs.
         */
        struct {
-               struct llist_head       work_llist;
-               struct llist_head       retry_llist;
+               struct io_wq_work_list  work_list;
+               spinlock_t              work_lock;
+               int                     work_items;
                unsigned long           check_cq;
                atomic_t                cq_wait_nr;
                atomic_t                cq_timeouts;
@@ -596,7 +597,11 @@ enum {
 typedef void (*io_req_tw_func_t)(struct io_kiocb *req, io_tw_token_t tw);
 
 struct io_task_work {
-       struct llist_node               node;
+       /* DEFER_TASKRUN uses work_node, regular task_work node */
+       union {
+               struct io_wq_work_node  work_node;
+               struct llist_node       node;
+       };
        io_req_tw_func_t                func;
 };
 
@@ -655,8 +660,6 @@ struct io_kiocb {
         */
        u16                             buf_index;
 
-       unsigned                        nr_tw;
-
        /* REQ_F_* flags */
        io_req_flags_t                  flags;
 
index 5111ec040c53424e815072cd062a64bfbaf52956..f3752dc99889d20e429053cfb654b9d5c478366e 100644 (file)
@@ -355,7 +355,8 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
        INIT_LIST_HEAD(&ctx->defer_list);
        INIT_LIST_HEAD(&ctx->timeout_list);
        INIT_LIST_HEAD(&ctx->ltimeout_list);
-       init_llist_head(&ctx->work_llist);
+       INIT_WQ_LIST(&ctx->work_list);
+       spin_lock_init(&ctx->work_lock);
        INIT_LIST_HEAD(&ctx->tctx_list);
        ctx->submit_state.free_list.next = NULL;
        INIT_HLIST_HEAD(&ctx->waitid_list);
@@ -1080,6 +1081,22 @@ struct llist_node *io_handle_tw_list(struct llist_node *node,
        return node;
 }
 
+static __cold void ____io_fallback_tw(struct io_kiocb *req, bool sync,
+                                     struct io_ring_ctx **last_ctx)
+{
+       if (*last_ctx != req->ctx) {
+               if (*last_ctx) {
+                       if (sync)
+                               flush_delayed_work(&(*last_ctx)->fallback_work);
+                       percpu_ref_put(&(*last_ctx)->refs);
+               }
+               *last_ctx = req->ctx;
+               percpu_ref_get(&(*last_ctx)->refs);
+       }
+       if (llist_add(&req->io_task_work.node, &(*last_ctx)->fallback_llist))
+               schedule_delayed_work(&(*last_ctx)->fallback_work, 1);
+}
+
 static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
 {
        struct io_ring_ctx *last_ctx = NULL;
@@ -1088,17 +1105,7 @@ static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
        while (node) {
                req = container_of(node, struct io_kiocb, io_task_work.node);
                node = node->next;
-               if (last_ctx != req->ctx) {
-                       if (last_ctx) {
-                               if (sync)
-                                       flush_delayed_work(&last_ctx->fallback_work);
-                               percpu_ref_put(&last_ctx->refs);
-                       }
-                       last_ctx = req->ctx;
-                       percpu_ref_get(&last_ctx->refs);
-               }
-               if (llist_add(&req->io_task_work.node, &last_ctx->fallback_llist))
-                       schedule_delayed_work(&last_ctx->fallback_work, 1);
+               ____io_fallback_tw(req, sync, &last_ctx);
        }
 
        if (last_ctx) {
@@ -1152,66 +1159,48 @@ void tctx_task_work(struct callback_head *cb)
        WARN_ON_ONCE(ret);
 }
 
-static void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
+static void io_req_local_work_add(struct io_kiocb *req, unsigned tw_flags)
 {
        struct io_ring_ctx *ctx = req->ctx;
-       unsigned nr_wait, nr_tw, nr_tw_prev;
-       struct llist_node *head;
+       unsigned nr_tw, nr_tw_prev, nr_wait;
+       unsigned long flags;
 
        /* See comment above IO_CQ_WAKE_INIT */
        BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
 
        /*
-        * We don't know how many reuqests is there in the link and whether
-        * they can even be queued lazily, fall back to non-lazy.
+        * We don't know how many requests are in the link and whether they can
+        * even be queued lazily, fall back to non-lazy.
         */
        if (req->flags & IO_REQ_LINK_FLAGS)
-               flags &= ~IOU_F_TWQ_LAZY_WAKE;
+               tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
 
        guard(rcu)();
+       spin_lock_irqsave(&ctx->work_lock, flags);
+       wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
+       nr_tw_prev = ctx->work_items++;
+       spin_unlock_irqrestore(&ctx->work_lock, flags);
 
-       head = READ_ONCE(ctx->work_llist.first);
-       do {
-               nr_tw_prev = 0;
-               if (head) {
-                       struct io_kiocb *first_req = container_of(head,
-                                                       struct io_kiocb,
-                                                       io_task_work.node);
-                       /*
-                        * Might be executed at any moment, rely on
-                        * SLAB_TYPESAFE_BY_RCU to keep it alive.
-                        */
-                       nr_tw_prev = READ_ONCE(first_req->nr_tw);
-               }
-
-               /*
-                * Theoretically, it can overflow, but that's fine as one of
-                * previous adds should've tried to wake the task.
-                */
-               nr_tw = nr_tw_prev + 1;
-               if (!(flags & IOU_F_TWQ_LAZY_WAKE))
-                       nr_tw = IO_CQ_WAKE_FORCE;
-
-               req->nr_tw = nr_tw;
-               req->io_task_work.node.next = head;
-       } while (!try_cmpxchg(&ctx->work_llist.first, &head,
-                             &req->io_task_work.node));
-
-       /*
-        * cmpxchg implies a full barrier, which pairs with the barrier
-        * in set_current_state() on the io_cqring_wait() side. It's used
-        * to ensure that either we see updated ->cq_wait_nr, or waiters
-        * going to sleep will observe the work added to the list, which
-        * is similar to the wait/wawke task state sync.
-        */
+       nr_tw = nr_tw_prev + 1;
+       if (!(tw_flags & IOU_F_TWQ_LAZY_WAKE))
+               nr_tw = IO_CQ_WAKE_FORCE;
 
-       if (!head) {
+       if (!nr_tw_prev) {
                if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
                        atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
                if (ctx->has_evfd)
                        io_eventfd_signal(ctx, false);
        }
+       /*
+        * We need a barrier after unlock, which pairs with the barrier
+        * in set_current_state() on the io_cqring_wait() side. It's used
+        * to ensure that either we see updated ->cq_wait_nr, or waiters
+        * going to sleep will observe the work added to the list, which
+        * is similar to the wait/wake task state sync.
+        */
 
+       smp_mb();
        nr_wait = atomic_read(&ctx->cq_wait_nr);
        /* not enough or no one is waiting */
        if (nr_tw < nr_wait)
@@ -1263,11 +1252,27 @@ void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)
 
 static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
 {
-       struct llist_node *node = llist_del_all(&ctx->work_llist);
+       struct io_ring_ctx *last_ctx = NULL;
+       struct io_wq_work_node *node;
+       unsigned long flags;
+
+       spin_lock_irqsave(&ctx->work_lock, flags);
+       node = ctx->work_list.first;
+       INIT_WQ_LIST(&ctx->work_list);
+       ctx->work_items = 0;
+       spin_unlock_irqrestore(&ctx->work_lock, flags);
 
-       __io_fallback_tw(node, false);
-       node = llist_del_all(&ctx->retry_llist);
-       __io_fallback_tw(node, false);
+       while (node) {
+               struct io_kiocb *req;
+
+               req = container_of(node, struct io_kiocb, io_task_work.work_node);
+               node = node->next;
+               ____io_fallback_tw(req, false, &last_ctx);
+       }
+       if (last_ctx) {
+               flush_delayed_work(&last_ctx->fallback_work);
+               percpu_ref_put(&last_ctx->refs);
+       }
 }
 
 static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
@@ -1282,33 +1287,12 @@ static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
        return false;
 }
 
-static int __io_run_local_work_loop(struct llist_node **node,
-                                   io_tw_token_t tw,
-                                   int events)
-{
-       int ret = 0;
-
-       while (*node) {
-               struct llist_node *next = (*node)->next;
-               struct io_kiocb *req = container_of(*node, struct io_kiocb,
-                                                   io_task_work.node);
-               INDIRECT_CALL_2(req->io_task_work.func,
-                               io_poll_task_func, io_req_rw_complete,
-                               req, tw);
-               *node = next;
-               if (++ret >= events)
-                       break;
-       }
-
-       return ret;
-}
-
 static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw,
                               int min_events, int max_events)
 {
-       struct llist_node *node;
+       struct io_wq_work_node *node, *tail;
+       int ret = 0, nitems;
        unsigned int loops = 0;
-       int ret = 0;
 
        if (WARN_ON_ONCE(ctx->submitter_task != current))
                return -EEXIST;
@@ -1316,17 +1300,37 @@ static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw,
                atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
 again:
        min_events -= ret;
-       ret = __io_run_local_work_loop(&ctx->retry_llist.first, tw, max_events);
-       if (ctx->retry_llist.first)
+       spin_lock_irq(&ctx->work_lock);
+       node = ctx->work_list.first;
+       tail = ctx->work_list.last;
+       nitems = ctx->work_items;
+       INIT_WQ_LIST(&ctx->work_list);
+       ctx->work_items = 0;
+       spin_unlock_irq(&ctx->work_lock);
+
+       while (node) {
+               struct io_kiocb *req = container_of(node, struct io_kiocb,
+                                                   io_task_work.work_node);
+               node = node->next;
+               INDIRECT_CALL_2(req->io_task_work.func,
+                               io_poll_task_func, io_req_rw_complete,
+                               req, tw);
+               nitems--;
+               if (++ret >= max_events)
+                       break;
+       }
+
+       if (unlikely(node)) {
+               spin_lock_irq(&ctx->work_lock);
+               tail->next = ctx->work_list.first;
+               ctx->work_list.first = node;
+               if (!ctx->work_list.last)
+                       ctx->work_list.last = tail;
+               ctx->work_items += nitems;
+               spin_unlock_irq(&ctx->work_lock);
                goto retry_done;
+       }
 
-       /*
-        * llists are in reverse order, flip it back the right way before
-        * running the pending items.
-        */
-       node = llist_reverse_order(llist_del_all(&ctx->work_llist));
-       ret += __io_run_local_work_loop(&node, tw, max_events - ret);
-       ctx->retry_llist.first = node;
        loops++;
 
        if (io_run_local_work_continue(ctx, ret, min_events))
@@ -2435,7 +2439,7 @@ static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer)
        if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
                atomic_set(&ctx->cq_wait_nr, 1);
                smp_mb();
-               if (!llist_empty(&ctx->work_llist))
+               if (io_local_work_pending(ctx))
                        goto out_wake;
        }
 
index 56becba55c7e18700df4fb0a5ed112c45f5b0287..0d191ec45d0e0ee6ee16655f86d544cbfb08581b 100644 (file)
@@ -383,7 +383,7 @@ static inline int io_run_task_work(void)
 
 static inline bool io_local_work_pending(struct io_ring_ctx *ctx)
 {
-       return !llist_empty(&ctx->work_llist) || !llist_empty(&ctx->retry_llist);
+       return READ_ONCE(ctx->work_list.first);
 }
 
 static inline bool io_task_work_pending(struct io_ring_ctx *ctx)