From: Jens Axboe
Date: Fri, 22 Nov 2024 22:08:38 +0000 (-0700)
Subject: io_uring: add lockless LIFO list implementation for task_work
X-Git-Url: https://git.kernel.dk/?a=commitdiff_plain;h=refs%2Fheads%2Fio_uring-defer-tw;p=linux-2.6-block.git

io_uring: add lockless LIFO list implementation for task_work

Do this just for deferred task_work for now; there are still a lot of
kinks to iron out. As it's a WIP, bump the task_work max count to
something large, as a retry list isn't there yet.

Signed-off-by: Jens Axboe
---

diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index 7ddac4d1d4b3..946dd9ce1fce 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -44,9 +44,20 @@ struct io_wq_work_node {
 	struct io_wq_work_node *next;
 };
 
+#ifdef CONFIG_64BIT
+typedef u128 io_wq_list_ptr_t;
+#else
+typedef u64 io_wq_list_ptr_t;
+#endif
+
 struct io_wq_work_list {
-	struct io_wq_work_node *first;
-	struct io_wq_work_node *last;
+	union {
+		struct {
+			struct io_wq_work_node *first;
+			struct io_wq_work_node *last;
+		};
+		io_wq_list_ptr_t list_ptr;
+	};
 };
 
 struct io_wq_work {
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index b1cd77d25013..bc4cb584670f 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -121,7 +121,7 @@
 
 #define IO_COMPL_BATCH			32
 #define IO_REQ_ALLOC_BATCH		8
-#define IO_LOCAL_TW_DEFAULT_MAX		20
+#define IO_LOCAL_TW_DEFAULT_MAX		2000
 
 struct io_defer_entry {
 	struct list_head	list;
@@ -1179,12 +1179,111 @@ void tctx_task_work(struct callback_head *cb)
 	tctx_task_work_run(tctx);
 }
 
+#ifdef CONFIG_64BIT
+#ifdef system_has_cmpxchg128
+#define wq_has_cmpxchg()	system_has_cmpxchg128()
+#define wq_cmpxchg_list		try_cmpxchg128
+#endif
+#else
+#ifdef system_has_cmpxchg64
+#define wq_has_cmpxchg()	system_has_cmpxchg64()
+#define wq_cmpxchg_list		try_cmpxchg64
+#endif
+#endif
+
+#ifndef wq_has_cmpxchg
+#define wq_has_cmpxchg()	(0)
+#endif
+
+static void wq_ll_add_tail(struct io_wq_work_node *n, struct io_wq_work_list *l)
+{
+	n->next = NULL;
+	do {
+		struct io_wq_work_list new = {
+			.first	= READ_ONCE(l->first) ?: n,
+			.last	= n
+		};
+		struct io_wq_work_list old = {
+			.first	= READ_ONCE(l->first),
+			.last	= READ_ONCE(l->last)
+		};
+		struct io_wq_work_node *old_last = READ_ONCE(l->last);
+
+		if (wq_cmpxchg_list(&l->list_ptr, &old.list_ptr, new.list_ptr)) {
+			if (old_last)
+				WRITE_ONCE(old_last->next, n);
+			break;
+		}
+	} while (1);
+}
+
+static int wq_list_add_tail_ll(struct io_ring_ctx *ctx, struct io_kiocb *req)
+{
+	unsigned int nr_tw_prev;
+	unsigned long flags;
+
+	if (wq_has_cmpxchg()) {
+		wq_ll_add_tail(&req->io_task_work.node, &ctx->work_list);
+		return 0;
+	}
+
+	spin_lock_irqsave(&ctx->work_lock, flags);
+	wq_list_add_tail(&req->io_task_work.node, &ctx->work_list);
+	nr_tw_prev = ctx->work_items++;
+	spin_unlock_irqrestore(&ctx->work_lock, flags);
+	return nr_tw_prev;
+}
+
+static void wq_ll_delete_all(struct io_wq_work_list *l,
+			     struct io_wq_work_list *dst)
+{
+	static struct io_wq_work_list new;
+
+	guard(rcu)();
+	do {
+		dst->first = READ_ONCE(l->first);
+		dst->last = READ_ONCE(l->last);
+
+		if (!wq_cmpxchg_list(&l->list_ptr, &dst->list_ptr, new.list_ptr))
+			continue;
+		if (!dst->last)
+			break;
+		if (!READ_ONCE(dst->last->next))
+			break;
+	} while (1);
+}
+
+static void wq_list_delete_all_ll(struct io_ring_ctx *ctx,
+				  struct io_wq_work_list *dst)
+{
+	if (wq_has_cmpxchg()) {
+		WRITE_ONCE(ctx->work_items, 0);
+		wq_ll_delete_all(&ctx->work_list, dst);
+		/* cmpxchg() provides the necessary memory barrier */
+		return;
+	}
+
+	spin_lock_irq(&ctx->work_lock);
+	*dst = ctx->work_list;
+	INIT_WQ_LIST(&ctx->work_list);
+	ctx->work_items = 0;
+	spin_unlock_irq(&ctx->work_lock);
+
+	/*
+	 * We need a barrier after unlock, which pairs with the barrier
+	 * in set_current_state() on the io_cqring_wait() side. It's used
+	 * to ensure that either we see updated ->cq_wait_nr, or waiters
+	 * going to sleep will observe the work added to the list, which
+	 * is similar to the wait/wake task state sync.
+	 */
+	smp_mb();
+}
+
 static inline void io_req_local_work_add(struct io_kiocb *req,
 					 struct io_ring_ctx *ctx,
 					 unsigned tw_flags)
 {
 	unsigned nr_tw, nr_tw_prev, nr_wait;
-	unsigned long flags;
 
 	/* See comment above IO_CQ_WAKE_INIT */
 	BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
@@ -1196,10 +1295,9 @@ static inline void io_req_local_work_add(struct io_kiocb *req,
 	if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
 		tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
 
-	spin_lock_irqsave(&ctx->work_lock, flags);
-	wq_list_add_tail(&req->io_task_work.node, &ctx->work_list);
-	nr_tw_prev = ctx->work_items++;
-	spin_unlock_irqrestore(&ctx->work_lock, flags);
+	guard(rcu)();
+
+	nr_tw_prev = wq_list_add_tail_ll(ctx, req);
 
 	nr_tw = nr_tw_prev + 1;
 	if (!(tw_flags & IOU_F_TWQ_LAZY_WAKE))
@@ -1212,16 +1310,6 @@ static inline void io_req_local_work_add(struct io_kiocb *req,
 			io_eventfd_signal(ctx);
 	}
 
-	guard(rcu)();
-
-	/*
-	 * We need a barrier after unlock, which pairs with the barrier
-	 * in set_current_state() on the io_cqring_wait() side. It's used
-	 * to ensure that either we see updated ->cq_wait_nr, or waiters
-	 * going to sleep will observe the work added to the list, which
-	 * is similar to the wait/wake task state sync.
-	 */
-	smp_mb();
 	nr_wait = atomic_read(&ctx->cq_wait_nr);
 	/* not enough or no one is waiting */
 	if (nr_tw < nr_wait)
@@ -1308,7 +1396,8 @@ static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
 static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
 			       int min_events)
 {
-	struct io_wq_work_node *node, *tail;
+	struct io_wq_work_node *node;
+	struct io_wq_work_list list;
 	int ret, limit, nitems;
 	unsigned int loops = 0;
 
@@ -1319,14 +1408,9 @@ static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
 	ret = 0;
 	limit = max(IO_LOCAL_TW_DEFAULT_MAX, min_events);
 again:
-	spin_lock_irq(&ctx->work_lock);
-	node = ctx->work_list.first;
-	tail = ctx->work_list.last;
-	nitems = ctx->work_items;
-	INIT_WQ_LIST(&ctx->work_list);
-	ctx->work_items = 0;
-	spin_unlock_irq(&ctx->work_lock);
-
+	wq_list_delete_all_ll(ctx, &list);
+	node = list.first;
+	nitems = 0;
 	while (node) {
 		struct io_kiocb *req = container_of(node, struct io_kiocb,
 						    io_task_work.node);
@@ -1334,12 +1418,14 @@ again:
 		INDIRECT_CALL_2(req->io_task_work.func, io_poll_task_func,
 				io_req_rw_complete, req, ts);
-		nitems--;
-		if (++ret >= limit)
+		if (++nitems >= limit)
 			break;
 	}
+	ret += nitems;
 
 	if (unlikely(node)) {
+		WARN_ON_ONCE(1);
+#if 0
 		spin_lock_irq(&ctx->work_lock);
 		tail->next = ctx->work_list.first;
 		ctx->work_list.first = node;
@@ -1347,6 +1433,7 @@ again:
 		ctx->work_list.last = tail;
 		ctx->work_items += nitems;
 		spin_unlock_irq(&ctx->work_lock);
+#endif
 		goto retry_done;
 	}
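
For readers who want to try the idea outside the kernel, below is a minimal
userspace sketch of the same technique: the {first, last} pointer pair shares
storage with a single 128-bit word, so appending to the tail and grabbing the
whole list each become one double-width compare-and-swap. This is an
illustration only, not the patch's code: the names (list16, list_add_tail,
list_grab_all) and the use of GCC's __atomic builtins on unsigned __int128 are
assumptions made for this example, whereas the patch itself uses
try_cmpxchg128/try_cmpxchg64 on the io_wq_work_list union shown above, with a
spinlock fallback when no wide cmpxchg is available.

/*
 * Userspace sketch of the {first, last} wide-CAS list (illustrative only).
 * Assumes a 64-bit target with 16-byte compare-and-swap, e.g. x86-64:
 *
 *	gcc -O2 -mcx16 sketch.c -latomic
 */
#include <stdio.h>

struct node {
	struct node *next;
	int val;
};

/* Two pointers overlaid on one 128-bit word, like io_wq_work_list above */
struct list16 {
	union {
		struct {
			struct node *first;
			struct node *last;
		};
		unsigned __int128 raw;
	};
};

/*
 * Append @n by swinging {first, last} to {first ?: n, n} with one 16-byte
 * CAS, then link the old tail to @n. Note the short window where @n is
 * already visible as ->last but not yet reachable via the ->next chain;
 * a consumer has to tolerate (or wait out) that window.
 */
static void list_add_tail(struct list16 *l, struct node *n)
{
	struct list16 old, new;

	n->next = NULL;
	old.raw = __atomic_load_n(&l->raw, __ATOMIC_RELAXED);
	do {
		new.first = old.first ? old.first : n;
		new.last = n;
	} while (!__atomic_compare_exchange_n(&l->raw, &old.raw, new.raw,
					      1, __ATOMIC_RELEASE,
					      __ATOMIC_RELAXED));
	if (old.last)
		__atomic_store_n(&old.last->next, n, __ATOMIC_RELEASE);
}

/* Grab the whole list at once by swapping in an empty {NULL, NULL} pair */
static struct list16 list_grab_all(struct list16 *l)
{
	struct list16 old, empty = { .first = NULL, .last = NULL };

	old.raw = __atomic_load_n(&l->raw, __ATOMIC_RELAXED);
	while (!__atomic_compare_exchange_n(&l->raw, &old.raw, empty.raw,
					    1, __ATOMIC_ACQUIRE,
					    __ATOMIC_RELAXED))
		;
	return old;
}

int main(void)
{
	struct list16 l = { .first = NULL, .last = NULL };
	struct node n[3] = { { .val = 1 }, { .val = 2 }, { .val = 3 } };
	struct list16 got;
	struct node *p;
	int i;

	for (i = 0; i < 3; i++)
		list_add_tail(&l, &n[i]);

	got = list_grab_all(&l);
	for (p = got.first; p; p = p->next)
		printf("%d\n", p->val);	/* prints 1, 2, 3 */
	return 0;
}

On x86-64 the 16-byte CAS maps to cmpxchg16b (via -mcx16 and/or libatomic's
runtime detection); on targets without a double-width CAS there is no
lock-free equivalent, which is why the patch gates the lockless path on
system_has_cmpxchg128()/system_has_cmpxchg64() and keeps the spinlock path
as a fallback.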