io_uring: add lockless LIFO list implementation for task_work io_uring-defer-tw
authorJens Axboe <axboe@kernel.dk>
Fri, 22 Nov 2024 22:08:38 +0000 (15:08 -0700)
committerJens Axboe <axboe@kernel.dk>
Fri, 22 Nov 2024 22:08:38 +0000 (15:08 -0700)
Do this just for deferred task_work for now, still a lot of kinks to
iron out. As it's a WIP, bump tw max count to something large, as
a retry list isn't there yet.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
include/linux/io_uring_types.h
io_uring/io_uring.c

index 7ddac4d1d4b3242a0c8c0e733bdbae4762f5973c..946dd9ce1fcecc3f66ea76edb685b7999a0618cf 100644 (file)
@@ -44,9 +44,20 @@ struct io_wq_work_node {
        struct io_wq_work_node *next;
 };
 
+#ifdef CONFIG_64BIT
+typedef u128 io_wq_list_ptr_t;
+#else
+typedef u64 io_wq_list_ptr_t;
+#endif
+
 struct io_wq_work_list {
-       struct io_wq_work_node *first;
-       struct io_wq_work_node *last;
+       union {
+               struct {
+                       struct io_wq_work_node *first;
+                       struct io_wq_work_node *last;
+               };
+               io_wq_list_ptr_t list_ptr;
+       };
 };
 
 struct io_wq_work {
index b1cd77d2501330b8dfeca9bba9062142278e612f..bc4cb584670fbc034a459ba29bb1800f02b82616 100644 (file)
 
 #define IO_COMPL_BATCH                 32
 #define IO_REQ_ALLOC_BATCH             8
-#define IO_LOCAL_TW_DEFAULT_MAX                20
+#define IO_LOCAL_TW_DEFAULT_MAX                2000
 
 struct io_defer_entry {
        struct list_head        list;
@@ -1179,12 +1179,111 @@ void tctx_task_work(struct callback_head *cb)
        tctx_task_work_run(tctx);
 }
 
+#ifdef CONFIG_64BIT
+#ifdef system_has_cmpxchg128
+#define wq_has_cmpxchg()               system_has_cmpxchg128()
+#define wq_cmpxchg_list                        try_cmpxchg128
+#endif
+#else
+#ifdef system_has_cmpxchg64
+#define wq_has_cmpxchg()               system_has_cmpxchg64()
+#define wq_cmpxchg_list                        try_cmpxchg64
+#endif
+#endif
+
+#ifndef wq_has_cmpxchg
+#define wq_has_cmpxchg()               (0)
+#endif
+
+static void wq_ll_add_tail(struct io_wq_work_node *n, struct io_wq_work_list *l)
+{
+       n->next = NULL;
+       do {
+               struct io_wq_work_list new = {
+                       .first  = READ_ONCE(l->first) ?: n,
+                       .last   = n
+               };
+               struct io_wq_work_list old = {
+                       .first  = READ_ONCE(l->first),
+                       .last   = READ_ONCE(l->last)
+               };
+               struct io_wq_work_node *old_last = READ_ONCE(l->last);
+
+               if (wq_cmpxchg_list(&l->list_ptr, &old.list_ptr, new.list_ptr)) {
+                       if (old_last)
+                               WRITE_ONCE(old_last->next, n);
+                       break;
+               }
+       } while (1);
+}
+
+static int wq_list_add_tail_ll(struct io_ring_ctx *ctx, struct io_kiocb *req)
+{
+       unsigned int nr_tw_prev;
+       unsigned long flags;
+
+       if (wq_has_cmpxchg()) {
+               wq_ll_add_tail(&req->io_task_work.node, &ctx->work_list);
+               return 0;
+       }
+
+       spin_lock_irqsave(&ctx->work_lock, flags);
+       wq_list_add_tail(&req->io_task_work.node, &ctx->work_list);
+       nr_tw_prev = ctx->work_items++;
+       spin_unlock_irqrestore(&ctx->work_lock, flags);
+       return nr_tw_prev;
+}
+
+static void wq_ll_delete_all(struct io_wq_work_list *l,
+                            struct io_wq_work_list *dst)
+{
+       static struct io_wq_work_list new;
+
+       guard(rcu)();
+       do {
+               dst->first = READ_ONCE(l->first);
+               dst->last = READ_ONCE(l->last);
+
+               if (!wq_cmpxchg_list(&l->list_ptr, &dst->list_ptr, new.list_ptr))
+                       continue;
+               if (!dst->last)
+                       break;
+               if (!READ_ONCE(dst->last->next))
+                       break;
+       } while (1);
+}
+
+static void wq_list_delete_all_ll(struct io_ring_ctx *ctx,
+                                 struct io_wq_work_list *dst)
+{
+       if (wq_has_cmpxchg()) {
+               WRITE_ONCE(ctx->work_items, 0);
+               wq_ll_delete_all(&ctx->work_list, dst);
+               /* cmpxchg() provides the necessary memory barrier */
+               return;
+       }
+
+       spin_lock_irq(&ctx->work_lock);
+       *dst = ctx->work_list;
+       INIT_WQ_LIST(&ctx->work_list);
+       ctx->work_items = 0;
+       spin_unlock_irq(&ctx->work_lock);
+
+       /*
+        * We need a barrier after unlock, which pairs with the barrier
+        * in set_current_state() on the io_cqring_wait() side. It's used
+        * to ensure that either we see updated ->cq_wait_nr, or waiters
+        * going to sleep will observe the work added to the list, which
+        * is similar to the wait/wake task state sync.
+        */
+       smp_mb();
+}
+
 static inline void io_req_local_work_add(struct io_kiocb *req,
                                         struct io_ring_ctx *ctx,
                                         unsigned tw_flags)
 {
        unsigned nr_tw, nr_tw_prev, nr_wait;
-       unsigned long flags;
 
        /* See comment above IO_CQ_WAKE_INIT */
        BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
@@ -1196,10 +1295,9 @@ static inline void io_req_local_work_add(struct io_kiocb *req,
        if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
                tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
 
-       spin_lock_irqsave(&ctx->work_lock, flags);
-       wq_list_add_tail(&req->io_task_work.node, &ctx->work_list);
-       nr_tw_prev = ctx->work_items++;
-       spin_unlock_irqrestore(&ctx->work_lock, flags);
+       guard(rcu)();
+
+       nr_tw_prev = wq_list_add_tail_ll(ctx, req);
 
        nr_tw = nr_tw_prev + 1;
        if (!(tw_flags & IOU_F_TWQ_LAZY_WAKE))
@@ -1212,16 +1310,6 @@ static inline void io_req_local_work_add(struct io_kiocb *req,
                        io_eventfd_signal(ctx);
        }
 
-       guard(rcu)();
-
-       /*
-        * We need a barrier after unlock, which pairs with the barrier
-        * in set_current_state() on the io_cqring_wait() side. It's used
-        * to ensure that either we see updated ->cq_wait_nr, or waiters
-        * going to sleep will observe the work added to the list, which
-        * is similar to the wait/wake task state sync.
-        */
-       smp_mb();
        nr_wait = atomic_read(&ctx->cq_wait_nr);
        /* not enough or no one is waiting */
        if (nr_tw < nr_wait)
@@ -1308,7 +1396,8 @@ static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
 static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
                               int min_events)
 {
-       struct io_wq_work_node *node, *tail;
+       struct io_wq_work_node *node;
+       struct io_wq_work_list list;
        int ret, limit, nitems;
        unsigned int loops = 0;
 
@@ -1319,14 +1408,9 @@ static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
        ret = 0;
        limit = max(IO_LOCAL_TW_DEFAULT_MAX, min_events);
 again:
-       spin_lock_irq(&ctx->work_lock);
-       node = ctx->work_list.first;
-       tail = ctx->work_list.last;
-       nitems = ctx->work_items;
-       INIT_WQ_LIST(&ctx->work_list);
-       ctx->work_items = 0;
-       spin_unlock_irq(&ctx->work_lock);
-
+       wq_list_delete_all_ll(ctx, &list);
+       node = list.first;
+       nitems = 0;
        while (node) {
                struct io_kiocb *req = container_of(node, struct io_kiocb,
                                                    io_task_work.node);
@@ -1334,12 +1418,14 @@ again:
                INDIRECT_CALL_2(req->io_task_work.func,
                                io_poll_task_func, io_req_rw_complete,
                                req, ts);
-               nitems--;
-               if (++ret >= limit)
+               if (++nitems >= limit)
                        break;
        }
 
+       ret += nitems;
        if (unlikely(node)) {
+               WARN_ON_ONCE(1);
+#if 0
                spin_lock_irq(&ctx->work_lock);
                tail->next = ctx->work_list.first;
                ctx->work_list.first = node;
@@ -1347,6 +1433,7 @@ again:
                        ctx->work_list.last = tail;
                ctx->work_items += nitems;
                spin_unlock_irq(&ctx->work_lock);
+#endif
                goto retry_done;
        }