From 548e7058eff37ce09f28cac22bbe44ed14c76ce1 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Fri, 6 Dec 2024 10:16:06 -0700 Subject: [PATCH] foo Signed-off-by: Jens Axboe --- include/linux/io_uring_types.h | 144 +++-- include/uapi/linux/io_uring.h | 6 + io_uring/cancel.c | 13 +- io_uring/eventfd.c | 8 +- io_uring/fdinfo.c | 59 +- io_uring/filetable.c | 7 +- io_uring/futex.c | 35 +- io_uring/futex.h | 8 +- io_uring/io_uring.c | 1030 ++++++++++++++++++++------------ io_uring/io_uring.h | 180 +++--- io_uring/kbuf.c | 26 +- io_uring/memmap.c | 44 +- io_uring/memmap.h | 4 +- io_uring/msg_ring.c | 31 +- io_uring/napi.c | 2 +- io_uring/net.c | 12 +- io_uring/nop.c | 4 +- io_uring/notif.c | 7 +- io_uring/notif.h | 2 +- io_uring/openclose.c | 4 +- io_uring/poll.c | 102 +++- io_uring/register.c | 97 +-- io_uring/rsrc.c | 12 +- io_uring/rsrc.h | 6 +- io_uring/rw.c | 14 +- io_uring/splice.c | 6 +- io_uring/sqpoll.c | 23 +- io_uring/tctx.c | 55 +- io_uring/tctx.h | 26 +- io_uring/timeout.c | 17 +- io_uring/uring_cmd.c | 16 +- io_uring/waitid.c | 15 +- 32 files changed, 1226 insertions(+), 789 deletions(-) diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h index 623d8e798a11..9048684b14b7 100644 --- a/include/linux/io_uring_types.h +++ b/include/linux/io_uring_types.h @@ -225,6 +225,79 @@ struct io_alloc_cache { size_t elem_size; }; +struct io_sq_cq { + struct task_struct *submitter_task; + + struct io_ring_ctx *ctx; + unsigned int ring_flags; + + /* + * Held over submit for this io_sq_cq, and protects the data + * structures in here as well. + */ + struct mutex ring_lock; + + struct io_rings *rings; + + /* + * Ring buffer of indices into array of io_uring_sqe, which is + * mmapped by the application using the IORING_OFF_SQES offset. + * + * This indirection could e.g. be used to assign fixed + * io_uring_sqe entries to operations and only submit them to + * the queue when needed. + * + * The kernel modifies neither the indices array nor the entries + * array. + */ + u32 *sq_array; + struct io_uring_sqe *sq_sqes; + unsigned cached_sq_head; + unsigned sq_entries; + + struct io_submit_state submit_state; + + struct io_alloc_cache apoll_cache; + struct io_alloc_cache netmsg_cache; + struct io_alloc_cache rw_cache; + struct io_alloc_cache uring_cache; +#ifdef CONFIG_FUTEX + struct io_alloc_cache futex_cache; +#endif + + struct io_alloc_cache msg_cache; + spinlock_t msg_lock; + + /* + * We cache a range of free CQEs we can use, once exhausted it + * should go through a slower range setup, see __io_get_cqe() + */ + struct io_uring_cqe *cqe_cached; + struct io_uring_cqe *cqe_sentinel; + + unsigned cached_cq_tail; + unsigned cq_entries; + unsigned cq_extra; + + struct io_hash_table cancel_table; + + /* + * task_work and async notification delivery cacheline. Expected to + * regularly bounce b/w CPUs. 
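+ *
+ * Each io_sq_cq now carries its own work_llist, cq_wait and
+ * cq_wait_nr, so task_work queueing and CQ wakeups are delivered to
+ * this context's submitter_task rather than to a single per-ring
+ * waiter.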
+ */ + struct { + struct llist_head work_llist; + struct llist_head retry_llist; + unsigned long check_cq; + atomic_t cq_wait_nr; + atomic_t cq_timeouts; + struct wait_queue_head cq_wait; + } ____cacheline_aligned_in_smp; + + struct io_mapped_region sq_region; + struct io_mapped_region ring_region; +}; + struct io_ring_ctx { /* const or read-mostly hot data */ struct { @@ -243,8 +316,6 @@ struct io_ring_ctx { unsigned int compat: 1; unsigned int iowq_limits_set : 1; - struct task_struct *submitter_task; - struct io_rings *rings; struct percpu_ref refs; clockid_t clockid; @@ -252,28 +323,17 @@ struct io_ring_ctx { enum task_work_notify_mode notify_method; unsigned sq_thread_idle; + + unsigned int nr_sq; } ____cacheline_aligned_in_smp; + struct io_sq_cq __s; + struct io_sq_cq *s; + /* submission data */ struct { struct mutex uring_lock; - /* - * Ring buffer of indices into array of io_uring_sqe, which is - * mmapped by the application using the IORING_OFF_SQES offset. - * - * This indirection could e.g. be used to assign fixed - * io_uring_sqe entries to operations and only submit them to - * the queue when needed. - * - * The kernel modifies neither the indices array nor the entries - * array. - */ - u32 *sq_array; - struct io_uring_sqe *sq_sqes; - unsigned cached_sq_head; - unsigned sq_entries; - /* * Fixed resources fast path, should be accessed only under * uring_lock, and updated through io_uring_register(2) @@ -289,11 +349,13 @@ struct io_ring_ctx { bool poll_multi_queue; struct io_wq_work_list iopoll_list; + /* + * Read side protected by s->ring_lock, write side must grab + * all ring locks. + */ struct io_file_table file_table; struct io_rsrc_data buf_table; - struct io_submit_state submit_state; - /* * Modifications are protected by ->uring_lock and ->mmap_lock. * The flags, buf_pages and buf_nr_pages fields should be stable @@ -301,12 +363,6 @@ struct io_ring_ctx { */ struct xarray io_bl_xa; - struct io_hash_table cancel_table; - struct io_alloc_cache apoll_cache; - struct io_alloc_cache netmsg_cache; - struct io_alloc_cache rw_cache; - struct io_alloc_cache uring_cache; - /* * Any cancelable uring_cmd is added to this list in * ->uring_cmd() by io_uring_cmd_insert_cancelable() @@ -320,35 +376,12 @@ struct io_ring_ctx { } ____cacheline_aligned_in_smp; struct { - /* - * We cache a range of free CQEs we can use, once exhausted it - * should go through a slower range setup, see __io_get_cqe() - */ - struct io_uring_cqe *cqe_cached; - struct io_uring_cqe *cqe_sentinel; - - unsigned cached_cq_tail; - unsigned cq_entries; struct io_ev_fd __rcu *io_ev_fd; - unsigned cq_extra; void *cq_wait_arg; size_t cq_wait_size; } ____cacheline_aligned_in_smp; - /* - * task_work and async notification delivery cacheline. Expected to - * regularly bounce b/w CPUs. 
- */ - struct { - struct llist_head work_llist; - struct llist_head retry_llist; - unsigned long check_cq; - atomic_t cq_wait_nr; - atomic_t cq_timeouts; - struct wait_queue_head cq_wait; - } ____cacheline_aligned_in_smp; - /* timeouts */ struct { raw_spinlock_t timeout_lock; @@ -359,14 +392,16 @@ struct io_ring_ctx { spinlock_t completion_lock; + /* protected by ->completion_lock */ struct list_head io_buffers_comp; + + /* protected by ->uring_lock */ struct list_head cq_overflow_list; struct hlist_head waitid_list; #ifdef CONFIG_FUTEX struct hlist_head futex_list; - struct io_alloc_cache futex_cache; #endif const struct cred *sq_creds; /* cred used for __io_sq_thread() */ @@ -405,10 +440,9 @@ struct io_ring_ctx { u32 iowq_limits[2]; struct callback_head poll_wq_task_work; - struct list_head defer_list; - struct io_alloc_cache msg_cache; - spinlock_t msg_lock; + /* protected by ->completion_lock */ + struct list_head defer_list; #ifdef CONFIG_NET_RX_BUSY_POLL struct list_head napi_list; /* track busy poll napi_id */ @@ -432,13 +466,12 @@ struct io_ring_ctx { */ struct mutex mmap_lock; - struct io_mapped_region sq_region; - struct io_mapped_region ring_region; /* used for optimised request parameter and wait argument passing */ struct io_mapped_region param_region; }; struct io_tw_state { + struct io_sq_cq *sq; }; enum { @@ -632,6 +665,7 @@ struct io_kiocb { struct io_cqe cqe; struct io_ring_ctx *ctx; + struct io_sq_cq *sq; struct io_uring_task *tctx; union { diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index 38f0d6b10eaf..05132fe59b96 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -219,6 +219,9 @@ enum io_uring_sqe_flags_bit { /* Use hybrid poll in iopoll process */ #define IORING_SETUP_HYBRID_IOPOLL (1U << 17) +/* multiple submit/completion contexts */ +#define IORING_SETUP_THREAD_ISSUER (1U << 18) + enum io_uring_op { IORING_OP_NOP, IORING_OP_READV, @@ -489,6 +492,7 @@ struct io_uring_cqe { #define IORING_OFF_PBUF_RING 0x80000000ULL #define IORING_OFF_PBUF_SHIFT 16 #define IORING_OFF_MMAP_MASK 0xf8000000ULL +#define IORING_OFF_ISSUER_SHIFT 16 /* * Filled with the offset for mmap(2) @@ -652,6 +656,8 @@ enum io_uring_register_op { IORING_REGISTER_USE_REGISTERED_RING = 1U << 31 }; +#define IO_URING_MAX_CONTEXTS 128 + /* io-wq worker categories */ enum io_wq_type { IO_WQ_BOUND, diff --git a/io_uring/cancel.c b/io_uring/cancel.c index 484193567839..f97d2cb93ec5 100644 --- a/io_uring/cancel.c +++ b/io_uring/cancel.c @@ -181,7 +181,7 @@ static int __io_async_cancel(struct io_cancel_data *cd, } while (1); /* slow path, try all io-wq's */ - io_ring_submit_lock(ctx, issue_flags); + io_ring_submit_lock(ctx->s, issue_flags); ret = -ENOENT; list_for_each_entry(node, &ctx->tctx_list, ctx_node) { ret = io_async_cancel_one(node->task->io_uring, cd); @@ -191,7 +191,7 @@ static int __io_async_cancel(struct io_cancel_data *cd, nr++; } } - io_ring_submit_unlock(ctx, issue_flags); + io_ring_submit_unlock(ctx->s, issue_flags); return all ? 
nr : ret; } @@ -254,7 +254,6 @@ static int __io_sync_cancel(struct io_uring_task *tctx, } int io_sync_cancel(struct io_ring_ctx *ctx, void __user *arg) - __must_hold(&ctx->uring_lock) { struct io_cancel_data cd = { .ctx = ctx, @@ -266,6 +265,10 @@ int io_sync_cancel(struct io_ring_ctx *ctx, void __user *arg) DEFINE_WAIT(wait); int ret, i; + lockdep_assert_held(&ctx->uring_lock); + + guard(mutex)(&ctx->s->ring_lock); + if (copy_from_user(&sc, arg, sizeof(sc))) return -EFAULT; if (sc.flags & ~CANCEL_FLAGS) @@ -312,7 +315,7 @@ int io_sync_cancel(struct io_ring_ctx *ctx, void __user *arg) do { cd.seq = atomic_inc_return(&ctx->cancel_seq); - prepare_to_wait(&ctx->cq_wait, &wait, TASK_INTERRUPTIBLE); + prepare_to_wait(&ctx->s->cq_wait, &wait, TASK_INTERRUPTIBLE); ret = __io_sync_cancel(current->io_uring, &cd, sc.fd); @@ -331,7 +334,7 @@ int io_sync_cancel(struct io_ring_ctx *ctx, void __user *arg) mutex_lock(&ctx->uring_lock); } while (1); - finish_wait(&ctx->cq_wait, &wait); + finish_wait(&ctx->s->cq_wait, &wait); mutex_lock(&ctx->uring_lock); if (ret == -ENOENT || ret > 0) diff --git a/io_uring/eventfd.c b/io_uring/eventfd.c index fab936d31ba8..e4c227f5eb82 100644 --- a/io_uring/eventfd.c +++ b/io_uring/eventfd.c @@ -91,7 +91,7 @@ static struct io_ev_fd *io_eventfd_grab(struct io_ring_ctx *ctx) { struct io_ev_fd *ev_fd; - if (READ_ONCE(ctx->rings->cq_flags) & IORING_CQ_EVENTFD_DISABLED) + if (READ_ONCE(ctx->s->rings->cq_flags) & IORING_CQ_EVENTFD_DISABLED) return NULL; rcu_read_lock(); @@ -141,8 +141,8 @@ void io_eventfd_flush_signal(struct io_ring_ctx *ctx) * the CQ ring. */ spin_lock(&ctx->completion_lock); - skip = ctx->cached_cq_tail == ev_fd->last_cq_tail; - ev_fd->last_cq_tail = ctx->cached_cq_tail; + skip = ctx->s->cached_cq_tail == ev_fd->last_cq_tail; + ev_fd->last_cq_tail = ctx->s->cached_cq_tail; spin_unlock(&ctx->completion_lock); if (!skip) @@ -180,7 +180,7 @@ int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg, } spin_lock(&ctx->completion_lock); - ev_fd->last_cq_tail = ctx->cached_cq_tail; + ev_fd->last_cq_tail = ctx->s->cached_cq_tail; spin_unlock(&ctx->completion_lock); ev_fd->eventfd_async = eventfd_async; diff --git a/io_uring/fdinfo.c b/io_uring/fdinfo.c index b214e5a407b5..b44832d691dd 100644 --- a/io_uring/fdinfo.c +++ b/io_uring/fdinfo.c @@ -86,17 +86,10 @@ static inline void napi_show_fdinfo(struct io_ring_ctx *ctx, } #endif -/* - * Caller holds a reference to the file already, we don't need to do - * anything else to get an extra reference. 
- */ -__cold void io_uring_show_fdinfo(struct seq_file *m, struct file *file) +static void io_uring_show_s(struct seq_file *m, struct io_sq_cq *s, int idx) { - struct io_ring_ctx *ctx = file->private_data; - struct io_overflow_cqe *ocqe; - struct io_rings *r = ctx->rings; - struct rusage sq_usage; - unsigned int sq_mask = ctx->sq_entries - 1, cq_mask = ctx->cq_entries - 1; + struct io_rings *r = s->rings; + unsigned int sq_mask = s->sq_entries - 1, cq_mask = s->cq_entries - 1; unsigned int sq_head = READ_ONCE(r->sq.head); unsigned int sq_tail = READ_ONCE(r->sq.tail); unsigned int cq_head = READ_ONCE(r->cq.head); @@ -104,14 +97,11 @@ __cold void io_uring_show_fdinfo(struct seq_file *m, struct file *file) unsigned int cq_shift = 0; unsigned int sq_shift = 0; unsigned int sq_entries, cq_entries; - int sq_pid = -1, sq_cpu = -1; - u64 sq_total_time = 0, sq_work_time = 0; - bool has_lock; unsigned int i; - if (ctx->flags & IORING_SETUP_CQE32) + if (s->ctx->flags & IORING_SETUP_CQE32) cq_shift = 1; - if (ctx->flags & IORING_SETUP_SQE128) + if (s->ctx->flags & IORING_SETUP_SQE128) sq_shift = 1; /* @@ -120,27 +110,29 @@ __cold void io_uring_show_fdinfo(struct seq_file *m, struct file *file) * and sq_tail and cq_head are changed by userspace. But it's ok since * we usually use these info when it is stuck. */ + seq_printf(m, "Issuer:\t%d\n", idx); seq_printf(m, "SqMask:\t0x%x\n", sq_mask); seq_printf(m, "SqHead:\t%u\n", sq_head); seq_printf(m, "SqTail:\t%u\n", sq_tail); - seq_printf(m, "CachedSqHead:\t%u\n", ctx->cached_sq_head); + seq_printf(m, "CachedSqHead:\t%u\n", s->cached_sq_head); seq_printf(m, "CqMask:\t0x%x\n", cq_mask); seq_printf(m, "CqHead:\t%u\n", cq_head); seq_printf(m, "CqTail:\t%u\n", cq_tail); - seq_printf(m, "CachedCqTail:\t%u\n", ctx->cached_cq_tail); + seq_printf(m, "CachedCqTail:\t%u\n", s->cached_cq_tail); seq_printf(m, "SQEs:\t%u\n", sq_tail - sq_head); - sq_entries = min(sq_tail - sq_head, ctx->sq_entries); + sq_entries = min(sq_tail - sq_head, s->sq_entries); + for (i = 0; i < sq_entries; i++) { unsigned int entry = i + sq_head; struct io_uring_sqe *sqe; unsigned int sq_idx; - if (ctx->flags & IORING_SETUP_NO_SQARRAY) + if (s->ring_flags & IORING_SETUP_NO_SQARRAY) break; - sq_idx = READ_ONCE(ctx->sq_array[entry & sq_mask]); + sq_idx = READ_ONCE(s->sq_array[entry & sq_mask]); if (sq_idx > sq_mask) continue; - sqe = &ctx->sq_sqes[sq_idx << sq_shift]; + sqe = &s->sq_sqes[sq_idx << sq_shift]; seq_printf(m, "%5u: opcode:%s, fd:%d, flags:%x, off:%llu, " "addr:0x%llx, rw_flags:0x%x, buf_index:%d " "user_data:%llu", @@ -162,7 +154,7 @@ __cold void io_uring_show_fdinfo(struct seq_file *m, struct file *file) seq_printf(m, "\n"); } seq_printf(m, "CQEs:\t%u\n", cq_tail - cq_head); - cq_entries = min(cq_tail - cq_head, ctx->cq_entries); + cq_entries = min(cq_tail - cq_head, s->cq_entries); for (i = 0; i < cq_entries; i++) { unsigned int entry = i + cq_head; struct io_uring_cqe *cqe = &r->cqes[(entry & cq_mask) << cq_shift]; @@ -175,6 +167,25 @@ __cold void io_uring_show_fdinfo(struct seq_file *m, struct file *file) cqe->big_cqe[0], cqe->big_cqe[1]); seq_printf(m, "\n"); } +} + +/* + * Caller holds a reference to the file already, we don't need to do + * anything else to get an extra reference. 
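+ *
+ * Each issuer context is dumped separately via io_uring_show_s(), so
+ * the fdinfo output repeats the SQ/CQ state once per context, e.g.
+ * (values purely illustrative):
+ *
+ *	Issuer:	0
+ *	SqMask:	0xff
+ *	SqHead:	13
+ *	SqTail:	13
+ *	...
+ *	Issuer:	1
+ *	SqMask:	0xff
+ *	...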
+ */ +__cold void io_uring_show_fdinfo(struct seq_file *m, struct file *file) +{ + struct io_ring_ctx *ctx = file->private_data; + struct io_overflow_cqe *ocqe; + struct rusage sq_usage; + int sq_pid = -1, sq_cpu = -1; + u64 sq_total_time = 0, sq_work_time = 0; + struct io_sq_cq *s; + bool has_lock; + unsigned int i; + + io_for_each_s(ctx, s, i) + io_uring_show_s(m, s, i); /* * Avoid ABBA deadlock between the seq lock and the io_uring mutex, @@ -237,8 +248,8 @@ __cold void io_uring_show_fdinfo(struct seq_file *m, struct file *file) } seq_puts(m, "PollList:\n"); - for (i = 0; has_lock && i < (1U << ctx->cancel_table.hash_bits); i++) { - struct io_hash_bucket *hb = &ctx->cancel_table.hbs[i]; + for (i = 0; has_lock && i < (1U << ctx->s->cancel_table.hash_bits); i++) { + struct io_hash_bucket *hb = &ctx->s->cancel_table.hbs[i]; struct io_kiocb *req; hlist_for_each_entry(req, &hb->list, hash_node) diff --git a/io_uring/filetable.c b/io_uring/filetable.c index a21660e3145a..d59b5d72d376 100644 --- a/io_uring/filetable.c +++ b/io_uring/filetable.c @@ -57,10 +57,11 @@ void io_free_file_tables(struct io_ring_ctx *ctx, struct io_file_table *table) static int io_install_fixed_file(struct io_ring_ctx *ctx, struct file *file, u32 slot_index) - __must_hold(&req->ctx->uring_lock) { struct io_rsrc_node *node; + lockdep_assert_held(&ctx->uring_lock); + if (io_is_uring_fops(file)) return -EBADF; if (!ctx->file_table.data.nr) @@ -110,9 +111,9 @@ int io_fixed_fd_install(struct io_kiocb *req, unsigned int issue_flags, struct io_ring_ctx *ctx = req->ctx; int ret; - io_ring_submit_lock(ctx, issue_flags); + io_ring_submit_lock(req->sq, issue_flags); ret = __io_fixed_fd_install(ctx, file, file_slot); - io_ring_submit_unlock(ctx, issue_flags); + io_ring_submit_unlock(req->sq, issue_flags); if (unlikely(ret < 0)) fput(file); diff --git a/io_uring/futex.c b/io_uring/futex.c index 30139cc150f2..a67b0c7aef84 100644 --- a/io_uring/futex.c +++ b/io_uring/futex.c @@ -33,15 +33,15 @@ struct io_futex_data { #define IO_FUTEX_ALLOC_CACHE_MAX 32 -bool io_futex_cache_init(struct io_ring_ctx *ctx) +bool io_futex_cache_init(struct io_sq_cq *s) { - return io_alloc_cache_init(&ctx->futex_cache, IO_FUTEX_ALLOC_CACHE_MAX, + return io_alloc_cache_init(&s->futex_cache, IO_FUTEX_ALLOC_CACHE_MAX, sizeof(struct io_futex_data)); } -void io_futex_cache_free(struct io_ring_ctx *ctx) +void io_futex_cache_free(struct io_sq_cq *s) { - io_alloc_cache_free(&ctx->futex_cache, kfree); + io_alloc_cache_free(&s->futex_cache, kfree); } static void __io_futex_complete(struct io_kiocb *req, struct io_tw_state *ts) @@ -54,10 +54,10 @@ static void __io_futex_complete(struct io_kiocb *req, struct io_tw_state *ts) static void io_futex_complete(struct io_kiocb *req, struct io_tw_state *ts) { struct io_futex_data *ifd = req->async_data; - struct io_ring_ctx *ctx = req->ctx; + struct io_sq_cq *s = ts->sq; - io_tw_lock(ctx, ts); - if (!io_alloc_cache_put(&ctx->futex_cache, ifd)) + io_tw_lock(s); + if (!io_alloc_cache_put(&s->futex_cache, ifd)) kfree(ifd); __io_futex_complete(req, ts); } @@ -67,7 +67,7 @@ static void io_futexv_complete(struct io_kiocb *req, struct io_tw_state *ts) struct io_futex *iof = io_kiocb_to_cmd(req, struct io_futex); struct futex_vector *futexv = req->async_data; - io_tw_lock(req->ctx, ts); + io_tw_lock(ts->sq); if (!iof->futexv_unqueued) { int res; @@ -123,7 +123,7 @@ int io_futex_cancel(struct io_ring_ctx *ctx, struct io_cancel_data *cd, if (cd->flags & (IORING_ASYNC_CANCEL_FD|IORING_ASYNC_CANCEL_FD_FIXED)) return -ENOENT; - 
io_ring_submit_lock(ctx, issue_flags); + io_ring_submit_lock(ctx->s, issue_flags); hlist_for_each_entry_safe(req, tmp, &ctx->futex_list, hash_node) { if (req->cqe.user_data != cd->data && !(cd->flags & IORING_ASYNC_CANCEL_ANY)) @@ -133,7 +133,7 @@ int io_futex_cancel(struct io_ring_ctx *ctx, struct io_cancel_data *cd, if (!(cd->flags & IORING_ASYNC_CANCEL_ALL)) break; } - io_ring_submit_unlock(ctx, issue_flags); + io_ring_submit_unlock(ctx->s, issue_flags); if (nr) return nr; @@ -258,7 +258,7 @@ int io_futexv_wait(struct io_kiocb *req, unsigned int issue_flags) struct io_ring_ctx *ctx = req->ctx; int ret, woken = -1; - io_ring_submit_lock(ctx, issue_flags); + io_ring_submit_lock(ctx->s, issue_flags); ret = futex_wait_multiple_setup(futexv, iof->futex_nr, &woken); @@ -266,7 +266,7 @@ int io_futexv_wait(struct io_kiocb *req, unsigned int issue_flags) * Error case, ret is < 0. Mark the request as failed. */ if (unlikely(ret < 0)) { - io_ring_submit_unlock(ctx, issue_flags); + io_ring_submit_unlock(ctx->s, issue_flags); req_set_fail(req); io_req_set_res(req, ret, 0); kfree(futexv); @@ -302,7 +302,7 @@ int io_futexv_wait(struct io_kiocb *req, unsigned int issue_flags) io_req_set_res(req, woken, 0); } - io_ring_submit_unlock(ctx, issue_flags); + io_ring_submit_unlock(ctx->s, issue_flags); return IOU_ISSUE_SKIP_COMPLETE; } @@ -312,6 +312,7 @@ int io_futex_wait(struct io_kiocb *req, unsigned int issue_flags) struct io_ring_ctx *ctx = req->ctx; struct io_futex_data *ifd = NULL; struct futex_hash_bucket *hb; + struct io_sq_cq *s = req->sq; int ret; if (!iof->futex_mask) { @@ -319,8 +320,8 @@ int io_futex_wait(struct io_kiocb *req, unsigned int issue_flags) goto done; } - io_ring_submit_lock(ctx, issue_flags); - ifd = io_cache_alloc(&ctx->futex_cache, GFP_NOWAIT, NULL); + io_ring_submit_lock(s, issue_flags); + ifd = io_cache_alloc(&s->futex_cache, GFP_NOWAIT, NULL); if (!ifd) { ret = -ENOMEM; goto done_unlock; @@ -336,14 +337,14 @@ int io_futex_wait(struct io_kiocb *req, unsigned int issue_flags) &ifd->q, &hb); if (!ret) { hlist_add_head(&req->hash_node, &ctx->futex_list); - io_ring_submit_unlock(ctx, issue_flags); + io_ring_submit_unlock(s, issue_flags); futex_queue(&ifd->q, hb); return IOU_ISSUE_SKIP_COMPLETE; } done_unlock: - io_ring_submit_unlock(ctx, issue_flags); + io_ring_submit_unlock(s, issue_flags); done: if (ret < 0) req_set_fail(req); diff --git a/io_uring/futex.h b/io_uring/futex.h index d789fcf715e3..19755f05e229 100644 --- a/io_uring/futex.h +++ b/io_uring/futex.h @@ -13,8 +13,8 @@ int io_futex_cancel(struct io_ring_ctx *ctx, struct io_cancel_data *cd, unsigned int issue_flags); bool io_futex_remove_all(struct io_ring_ctx *ctx, struct io_uring_task *tctx, bool cancel_all); -bool io_futex_cache_init(struct io_ring_ctx *ctx); -void io_futex_cache_free(struct io_ring_ctx *ctx); +bool io_futex_cache_init(struct io_sq_cq *s); +void io_futex_cache_free(struct io_sq_cq *s); #else static inline int io_futex_cancel(struct io_ring_ctx *ctx, struct io_cancel_data *cd, @@ -27,11 +27,11 @@ static inline bool io_futex_remove_all(struct io_ring_ctx *ctx, { return false; } -static inline bool io_futex_cache_init(struct io_ring_ctx *ctx) +static inline bool io_futex_cache_init(struct io_sq_cq *s) { return false; } -static inline void io_futex_cache_free(struct io_ring_ctx *ctx) +static inline void io_futex_cache_free(struct io_sq_cq *s) { } #endif diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index 0417d80c9cbe..b081270ee016 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -176,14 
+176,14 @@ static struct ctl_table kernel_io_uring_disabled_table[] = { }; #endif -static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx) +static inline unsigned int __io_cqring_events(struct io_sq_cq *s) { - return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head); + return s->cached_cq_tail - READ_ONCE(s->rings->cq.head); } -static inline unsigned int __io_cqring_events_user(struct io_ring_ctx *ctx) +static inline unsigned int __io_cqring_events_user(struct io_sq_cq *s) { - return READ_ONCE(ctx->rings->cq.tail) - READ_ONCE(ctx->rings->cq.head); + return READ_ONCE(s->rings->cq.tail) - READ_ONCE(s->rings->cq.head); } static bool io_match_linked(struct io_kiocb *head) @@ -230,9 +230,10 @@ static inline void req_fail_link_node(struct io_kiocb *req, int res) io_req_set_res(req, res, 0); } -static inline void io_req_add_to_cache(struct io_kiocb *req, struct io_ring_ctx *ctx) +static inline void io_req_add_to_cache(struct io_kiocb *req, + struct io_submit_state *state) { - wq_stack_add_head(&req->comp_list, &ctx->submit_state.free_list); + wq_stack_add_head(&req->comp_list, &state->free_list); } static __cold void io_ring_ctx_ref_free(struct percpu_ref *ref) @@ -251,11 +252,15 @@ static __cold void io_fallback_req_func(struct work_struct *work) struct io_tw_state ts = {}; percpu_ref_get(&ctx->refs); - mutex_lock(&ctx->uring_lock); - llist_for_each_entry_safe(req, tmp, node, io_task_work.node) + llist_for_each_entry_safe(req, tmp, node, io_task_work.node) { + struct io_sq_cq *s = req->sq; + + mutex_lock(&s->ring_lock); + ts.sq = s; req->io_task_work.func(req, &ts); - io_submit_flush_completions(ctx); - mutex_unlock(&ctx->uring_lock); + io_submit_flush_completions(s); + mutex_unlock(&s->ring_lock); + } percpu_ref_put(&ctx->refs); } @@ -281,56 +286,95 @@ static int io_alloc_hash_table(struct io_hash_table *table, unsigned bits) return 0; } -static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) +static void free_s(struct io_sq_cq *s) { - struct io_ring_ctx *ctx; - int hash_bits; - bool ret; - - ctx = kzalloc(sizeof(*ctx), GFP_KERNEL); - if (!ctx) - return NULL; + io_alloc_cache_free(&s->apoll_cache, kfree); + io_alloc_cache_free(&s->netmsg_cache, io_netmsg_cache_free); + io_alloc_cache_free(&s->rw_cache, io_rw_cache_free); + io_alloc_cache_free(&s->uring_cache, kfree); + io_alloc_cache_free(&s->msg_cache, kfree); + io_futex_cache_free(s); + kvfree(s->cancel_table.hbs); +} - xa_init(&ctx->io_bl_xa); +int init_s(struct io_ring_ctx *ctx, struct io_sq_cq *s, unsigned int cq_entries) +{ + int hash_bits, ret; /* * Use 5 bits less than the max cq entries, that should give us around * 32 entries per hash list if totally full and uniformly spread, but * don't keep too many buckets to not overconsume memory. 
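 *
 * For example, a ring set up with cq_entries == 4096 gets
 * clamp(ilog2(4096) - 5, 1, 8) == 7 hash bits, i.e. 128 buckets and
 * roughly 32 entries per bucket when completely full.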
*/ - hash_bits = ilog2(p->cq_entries) - 5; + hash_bits = ilog2(cq_entries) - 5; hash_bits = clamp(hash_bits, 1, 8); - if (io_alloc_hash_table(&ctx->cancel_table, hash_bits)) - goto err; + + s->ring_flags = ctx->flags; + s->ctx = ctx; + mutex_init(&s->ring_lock); + atomic_set(&s->cq_wait_nr, IO_CQ_WAKE_INIT); + init_waitqueue_head(&s->cq_wait); + init_llist_head(&s->work_llist); + s->submit_state.free_list.next = NULL; + INIT_WQ_LIST(&s->submit_state.compl_reqs); + spin_lock_init(&s->msg_lock); + + if (io_alloc_hash_table(&s->cancel_table, hash_bits)) + return -ENOMEM; + + ret = io_alloc_cache_init(&s->apoll_cache, IO_POLL_ALLOC_CACHE_MAX, + sizeof(struct async_poll)); + ret |= io_alloc_cache_init(&s->netmsg_cache, IO_ALLOC_CACHE_MAX, + sizeof(struct io_async_msghdr)); + ret |= io_alloc_cache_init(&s->rw_cache, IO_ALLOC_CACHE_MAX, + sizeof(struct io_async_rw)); + ret |= io_alloc_cache_init(&s->uring_cache, IO_ALLOC_CACHE_MAX, + sizeof(struct uring_cache)); + ret |= io_alloc_cache_init(&s->msg_cache, IO_ALLOC_CACHE_MAX, + sizeof(struct io_kiocb)); + ret |= io_futex_cache_init(s); + if (ret) + return -ENOMEM; + + ctx->nr_sq++; + return 0; +} + +static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p, + u32 contexts) +{ + struct io_ring_ctx *ctx; + int i; + + ctx = kzalloc(sizeof(*ctx), GFP_KERNEL); + if (!ctx) + return NULL; + + ctx->s = kzalloc(sizeof(struct io_sq_cq) * contexts, GFP_KERNEL); + if (!ctx->s) { + kfree(ctx); + return NULL; + } + + xa_init(&ctx->io_bl_xa); + if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free, 0, GFP_KERNEL)) goto err; ctx->flags = p->flags; ctx->hybrid_poll_time = LLONG_MAX; - atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT); + for (i = 0; i < contexts; i++) { + if (init_s(ctx, &ctx->s[i], p->cq_entries)) + goto free_ref; + } init_waitqueue_head(&ctx->sqo_sq_wait); INIT_LIST_HEAD(&ctx->sqd_list); INIT_LIST_HEAD(&ctx->cq_overflow_list); INIT_LIST_HEAD(&ctx->io_buffers_cache); - ret = io_alloc_cache_init(&ctx->apoll_cache, IO_POLL_ALLOC_CACHE_MAX, - sizeof(struct async_poll)); - ret |= io_alloc_cache_init(&ctx->netmsg_cache, IO_ALLOC_CACHE_MAX, - sizeof(struct io_async_msghdr)); - ret |= io_alloc_cache_init(&ctx->rw_cache, IO_ALLOC_CACHE_MAX, - sizeof(struct io_async_rw)); - ret |= io_alloc_cache_init(&ctx->uring_cache, IO_ALLOC_CACHE_MAX, - sizeof(struct uring_cache)); - spin_lock_init(&ctx->msg_lock); - ret |= io_alloc_cache_init(&ctx->msg_cache, IO_ALLOC_CACHE_MAX, - sizeof(struct io_kiocb)); - ret |= io_futex_cache_init(ctx); - if (ret) - goto free_ref; init_completion(&ctx->ref_comp); xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1); mutex_init(&ctx->uring_lock); - init_waitqueue_head(&ctx->cq_wait); init_waitqueue_head(&ctx->poll_wq); spin_lock_init(&ctx->completion_lock); raw_spin_lock_init(&ctx->timeout_lock); @@ -339,15 +383,12 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) INIT_LIST_HEAD(&ctx->defer_list); INIT_LIST_HEAD(&ctx->timeout_list); INIT_LIST_HEAD(&ctx->ltimeout_list); - init_llist_head(&ctx->work_llist); INIT_LIST_HEAD(&ctx->tctx_list); - ctx->submit_state.free_list.next = NULL; INIT_HLIST_HEAD(&ctx->waitid_list); #ifdef CONFIG_FUTEX INIT_HLIST_HEAD(&ctx->futex_list); #endif INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func); - INIT_WQ_LIST(&ctx->submit_state.compl_reqs); INIT_HLIST_HEAD(&ctx->cancelable_uring_cmd); io_napi_init(ctx); mutex_init(&ctx->mmap_lock); @@ -357,32 +398,26 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) free_ref: 
percpu_ref_exit(&ctx->refs); err: - io_alloc_cache_free(&ctx->apoll_cache, kfree); - io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free); - io_alloc_cache_free(&ctx->rw_cache, io_rw_cache_free); - io_alloc_cache_free(&ctx->uring_cache, kfree); - io_alloc_cache_free(&ctx->msg_cache, kfree); - io_futex_cache_free(ctx); - kvfree(ctx->cancel_table.hbs); xa_destroy(&ctx->io_bl_xa); + kfree(ctx->s); kfree(ctx); return NULL; } -static void io_account_cq_overflow(struct io_ring_ctx *ctx) +static void io_account_cq_overflow(struct io_sq_cq *s) { - struct io_rings *r = ctx->rings; + struct io_rings *r = s->rings; WRITE_ONCE(r->cq_overflow, READ_ONCE(r->cq_overflow) + 1); - ctx->cq_extra--; + s->cq_extra--; } static bool req_need_defer(struct io_kiocb *req, u32 seq) { if (unlikely(req->flags & REQ_F_IO_DRAIN)) { - struct io_ring_ctx *ctx = req->ctx; + struct io_sq_cq *s = req->sq; - return seq + READ_ONCE(ctx->cq_extra) != ctx->cached_cq_tail; + return seq + READ_ONCE(s->cq_extra) != s->cached_cq_tail; } return false; @@ -548,6 +583,8 @@ void io_req_queue_iowq(struct io_kiocb *req) static __cold void io_queue_deferred(struct io_ring_ctx *ctx) { + lockdep_assert_held(&ctx->completion_lock); + while (!list_empty(&ctx->defer_list)) { struct io_defer_entry *de = list_first_entry(&ctx->defer_list, struct io_defer_entry, list); @@ -587,36 +624,41 @@ static inline void io_cq_lock(struct io_ring_ctx *ctx) spin_lock(&ctx->completion_lock); } -static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx) +static inline void __io_cq_unlock_post(struct io_sq_cq *s) { - io_commit_cqring(ctx); + struct io_ring_ctx *ctx = s->ctx; + + io_commit_cqring(s); if (!ctx->task_complete) { if (!ctx->lockless_cq) spin_unlock(&ctx->completion_lock); /* IOPOLL rings only need to wake up if it's also SQPOLL */ if (!ctx->syscall_iopoll) - io_cqring_wake(ctx); + io_cqring_wake(s); } io_commit_cqring_flush(ctx); } -static void io_cq_unlock_post(struct io_ring_ctx *ctx) +static void io_cq_unlock_post(struct io_sq_cq *s) __releases(ctx->completion_lock) { - io_commit_cqring(ctx); + struct io_ring_ctx *ctx = s->ctx; + + io_commit_cqring(s); spin_unlock(&ctx->completion_lock); - io_cqring_wake(ctx); + io_cqring_wake(s); io_commit_cqring_flush(ctx); } -static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool dying) +static void __io_cqring_overflow_flush(struct io_sq_cq *s, bool dying) { size_t cqe_size = sizeof(struct io_uring_cqe); + struct io_ring_ctx *ctx = s->ctx; lockdep_assert_held(&ctx->uring_lock); /* don't abort if we're dying, entries must get freed */ - if (!dying && __io_cqring_events(ctx) == ctx->cq_entries) + if (!dying && __io_cqring_events(s) == s->cq_entries) return; if (ctx->flags & IORING_SETUP_CQE32) @@ -631,7 +673,7 @@ static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool dying) struct io_overflow_cqe, list); if (!dying) { - if (!io_get_cqe_overflow(ctx, &cqe, true)) + if (!io_get_cqe_overflow(s, &cqe, true)) break; memcpy(cqe, &ocqe->cqe, cqe_size); } @@ -646,7 +688,7 @@ static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool dying) * to care for a non-real case. 
*/ if (need_resched()) { - io_cq_unlock_post(ctx); + io_cq_unlock_post(s); mutex_unlock(&ctx->uring_lock); cond_resched(); mutex_lock(&ctx->uring_lock); @@ -655,22 +697,31 @@ static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool dying) } if (list_empty(&ctx->cq_overflow_list)) { - clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq); - atomic_andnot(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags); + clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &s->check_cq); + atomic_andnot(IORING_SQ_CQ_OVERFLOW, &s->rings->sq_flags); } - io_cq_unlock_post(ctx); + io_cq_unlock_post(s); } static void io_cqring_overflow_kill(struct io_ring_ctx *ctx) { - if (ctx->rings) - __io_cqring_overflow_flush(ctx, true); + struct io_sq_cq *s; + int i; + + io_for_each_s(ctx, s, i) { + if (!test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &s->check_cq)) + continue; + if (s->rings) + __io_cqring_overflow_flush(s, true); + } } -static void io_cqring_do_overflow_flush(struct io_ring_ctx *ctx) +static void io_cqring_do_overflow_flush(struct io_sq_cq *s) { + struct io_ring_ctx *ctx = s->ctx; + mutex_lock(&ctx->uring_lock); - __io_cqring_overflow_flush(ctx, false); + __io_cqring_overflow_flush(s, false); mutex_unlock(&ctx->uring_lock); } @@ -710,12 +761,13 @@ static __cold void io_uring_drop_tctx_refs(struct task_struct *task) } } -static bool io_cqring_event_overflow(struct io_ring_ctx *ctx, u64 user_data, +static bool io_cqring_event_overflow(struct io_sq_cq *s, u64 user_data, s32 res, u32 cflags, u64 extra1, u64 extra2) { struct io_overflow_cqe *ocqe; size_t ocq_size = sizeof(struct io_overflow_cqe); - bool is_cqe32 = (ctx->flags & IORING_SETUP_CQE32); + bool is_cqe32 = (s->ring_flags & IORING_SETUP_CQE32); + struct io_ring_ctx *ctx = s->ctx; lockdep_assert_held(&ctx->completion_lock); @@ -723,20 +775,20 @@ static bool io_cqring_event_overflow(struct io_ring_ctx *ctx, u64 user_data, ocq_size += sizeof(struct io_uring_cqe); ocqe = kmalloc(ocq_size, GFP_ATOMIC | __GFP_ACCOUNT); - trace_io_uring_cqe_overflow(ctx, user_data, res, cflags, ocqe); + trace_io_uring_cqe_overflow(s, user_data, res, cflags, ocqe); if (!ocqe) { /* * If we're in ring overflow flush mode, or in task cancel mode, * or cannot allocate an overflow entry, then we need to drop it * on the floor. 
*/ - io_account_cq_overflow(ctx); - set_bit(IO_CHECK_CQ_DROPPED_BIT, &ctx->check_cq); + io_account_cq_overflow(s); + set_bit(IO_CHECK_CQ_DROPPED_BIT, &s->check_cq); return false; } - if (list_empty(&ctx->cq_overflow_list)) { - set_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq); - atomic_or(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags); + if (!list_empty(&ctx->cq_overflow_list)) { + set_bit(IO_CHECK_CQ_OVERFLOW_BIT, &s->check_cq); + atomic_or(IORING_SQ_CQ_OVERFLOW, &s->rings->sq_flags); } ocqe->cqe.user_data = user_data; @@ -750,9 +802,9 @@ static bool io_cqring_event_overflow(struct io_ring_ctx *ctx, u64 user_data, return true; } -static void io_req_cqe_overflow(struct io_kiocb *req) +static void io_req_cqe_overflow(struct io_sq_cq *s, struct io_kiocb *req) { - io_cqring_event_overflow(req->ctx, req->cqe.user_data, + io_cqring_event_overflow(s, req->cqe.user_data, req->cqe.res, req->cqe.flags, req->big_cqe.extra1, req->big_cqe.extra2); memset(&req->big_cqe, 0, sizeof(req->big_cqe)); @@ -763,10 +815,10 @@ static void io_req_cqe_overflow(struct io_kiocb *req) * control dependency is enough as we're using WRITE_ONCE to * fill the cq entry */ -bool io_cqe_cache_refill(struct io_ring_ctx *ctx, bool overflow) +bool io_cqe_cache_refill(struct io_sq_cq *s, bool overflow) { - struct io_rings *rings = ctx->rings; - unsigned int off = ctx->cached_cq_tail & (ctx->cq_entries - 1); + struct io_rings *rings = s->rings; + unsigned int off = s->cached_cq_tail & (s->cq_entries - 1); unsigned int free, queued, len; /* @@ -774,74 +826,74 @@ bool io_cqe_cache_refill(struct io_ring_ctx *ctx, bool overflow) * ordering guarantees, which will affect links, F_MORE users and more. * Force overflow the completion. */ - if (!overflow && (ctx->check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))) + if (!overflow && (s->check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))) return false; /* userspace may cheat modifying the tail, be safe and do min */ - queued = min(__io_cqring_events(ctx), ctx->cq_entries); - free = ctx->cq_entries - queued; + queued = min(__io_cqring_events(s), s->cq_entries); + free = s->cq_entries - queued; /* we need a contiguous range, limit based on the current array offset */ - len = min(free, ctx->cq_entries - off); + len = min(free, s->cq_entries - off); if (!len) return false; - if (ctx->flags & IORING_SETUP_CQE32) { + if (s->ring_flags & IORING_SETUP_CQE32) { off <<= 1; len <<= 1; } - ctx->cqe_cached = &rings->cqes[off]; - ctx->cqe_sentinel = ctx->cqe_cached + len; + s->cqe_cached = &rings->cqes[off]; + s->cqe_sentinel = s->cqe_cached + len; return true; } -static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, +static bool io_fill_cqe_aux(struct io_sq_cq *s, u64 user_data, s32 res, u32 cflags) { struct io_uring_cqe *cqe; - ctx->cq_extra++; + s->cq_extra++; /* * If we can't get a cq entry, userspace overflowed the * submission (by quite a lot). Increment the overflow count in * the ring. 
*/ - if (likely(io_get_cqe(ctx, &cqe))) { + if (likely(io_get_cqe(s, &cqe))) { WRITE_ONCE(cqe->user_data, user_data); WRITE_ONCE(cqe->res, res); WRITE_ONCE(cqe->flags, cflags); - if (ctx->flags & IORING_SETUP_CQE32) { + if (s->ring_flags & IORING_SETUP_CQE32) { WRITE_ONCE(cqe->big_cqe[0], 0); WRITE_ONCE(cqe->big_cqe[1], 0); } - trace_io_uring_complete(ctx, NULL, cqe); + trace_io_uring_complete(s->ctx, NULL, cqe); return true; } return false; } -static bool __io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, +static bool __io_post_aux_cqe(struct io_sq_cq *s, u64 user_data, s32 res, u32 cflags) { bool filled; - filled = io_fill_cqe_aux(ctx, user_data, res, cflags); + filled = io_fill_cqe_aux(s, user_data, res, cflags); if (!filled) - filled = io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0); + filled = io_cqring_event_overflow(s, user_data, res, cflags, 0, 0); return filled; } -bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags) +bool io_post_aux_cqe(struct io_sq_cq *s, u64 user_data, s32 res, u32 cflags) { bool filled; - io_cq_lock(ctx); - filled = __io_post_aux_cqe(ctx, user_data, res, cflags); - io_cq_unlock_post(ctx); + io_cq_lock(s->ctx); + filled = __io_post_aux_cqe(s, user_data, res, cflags); + io_cq_unlock_post(s); return filled; } @@ -849,14 +901,14 @@ bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags * Must be called from inline task_work so we now a flush will happen later, * and obviously with ctx->uring_lock held (tw always has that). */ -void io_add_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags) +void io_add_aux_cqe(struct io_sq_cq *s, u64 user_data, s32 res, u32 cflags) { - if (!io_fill_cqe_aux(ctx, user_data, res, cflags)) { - spin_lock(&ctx->completion_lock); - io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0); - spin_unlock(&ctx->completion_lock); + if (!io_fill_cqe_aux(s, user_data, res, cflags)) { + spin_lock(&s->ctx->completion_lock); + io_cqring_event_overflow(s, user_data, res, cflags, 0, 0); + spin_unlock(&s->ctx->completion_lock); } - ctx->submit_state.cq_flush = true; + s->submit_state.cq_flush = true; } /* @@ -866,21 +918,23 @@ void io_add_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags) bool io_req_post_cqe(struct io_kiocb *req, s32 res, u32 cflags) { struct io_ring_ctx *ctx = req->ctx; + struct io_sq_cq *s = req->sq; bool posted; lockdep_assert(!io_wq_current_is_worker()); - lockdep_assert_held(&ctx->uring_lock); + lockdep_assert_held(&s->ring_lock); __io_cq_lock(ctx); - posted = io_fill_cqe_aux(ctx, req->cqe.user_data, res, cflags); - ctx->submit_state.cq_flush = true; - __io_cq_unlock_post(ctx); + posted = io_fill_cqe_aux(s, req->cqe.user_data, res, cflags); + s->submit_state.cq_flush = true; + __io_cq_unlock_post(s); return posted; } static void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags) { struct io_ring_ctx *ctx = req->ctx; + struct io_sq_cq *s = req->sq; /* * All execution paths but io-wq use the deferred completions by @@ -901,10 +955,10 @@ static void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags) io_cq_lock(ctx); if (!(req->flags & REQ_F_CQE_SKIP)) { - if (!io_fill_cqe_req(ctx, req)) - io_req_cqe_overflow(req); + if (!io_fill_cqe_req(s, req)) + io_req_cqe_overflow(s, req); } - io_cq_unlock_post(ctx); + io_cq_unlock_post(s); /* * We don't free the request here because we know it's called from @@ -914,17 +968,16 @@ static void io_req_complete_post(struct io_kiocb *req, unsigned 
issue_flags) } void io_req_defer_failed(struct io_kiocb *req, s32 res) - __must_hold(&ctx->uring_lock) { const struct io_cold_def *def = &io_cold_defs[req->opcode]; - lockdep_assert_held(&req->ctx->uring_lock); + lockdep_assert_held(&req->sq->ring_lock); req_set_fail(req); io_req_set_res(req, res, io_put_kbuf(req, res, IO_URING_F_UNLOCKED)); if (def->fail) def->fail(req); - io_req_complete_defer(req); + io_req_complete_defer(req, req->sq); } /* @@ -949,13 +1002,14 @@ static void io_preinit_req(struct io_kiocb *req, struct io_ring_ctx *ctx) * Because of that, io_alloc_req() should be called only under ->uring_lock * and with extra caution to not get a request that is still worked on. */ -__cold bool __io_alloc_req_refill(struct io_ring_ctx *ctx) - __must_hold(&ctx->uring_lock) +__cold bool __io_alloc_req_refill(struct io_sq_cq *s) { gfp_t gfp = GFP_KERNEL | __GFP_NOWARN; void *reqs[IO_REQ_ALLOC_BATCH]; int ret; + lockdep_assert_held(&s->ring_lock); + ret = kmem_cache_alloc_bulk(req_cachep, gfp, ARRAY_SIZE(reqs), reqs); /* @@ -969,12 +1023,12 @@ __cold bool __io_alloc_req_refill(struct io_ring_ctx *ctx) ret = 1; } - percpu_ref_get_many(&ctx->refs, ret); + percpu_ref_get_many(&s->ctx->refs, ret); while (ret--) { struct io_kiocb *req = reqs[ret]; - io_preinit_req(req, ctx); - io_req_add_to_cache(req, ctx); + io_preinit_req(req, s->ctx); + io_req_add_to_cache(req, &s->submit_state); } return true; } @@ -1017,13 +1071,15 @@ static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req) static void ctx_flush_and_put(struct io_ring_ctx *ctx, struct io_tw_state *ts) { + struct io_sq_cq *s = ctx->s; + if (!ctx) return; if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) - atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); + atomic_andnot(IORING_SQ_TASKRUN, &s->rings->sq_flags); - io_submit_flush_completions(ctx); - mutex_unlock(&ctx->uring_lock); + io_submit_flush_completions(s); + mutex_unlock(&s->ring_lock); percpu_ref_put(&ctx->refs); } @@ -1047,7 +1103,8 @@ struct llist_node *io_handle_tw_list(struct llist_node *node, if (req->ctx != ctx) { ctx_flush_and_put(ctx, &ts); ctx = req->ctx; - mutex_lock(&ctx->uring_lock); + mutex_lock(&ctx->s->ring_lock); + ts.sq = ctx->s; percpu_ref_get(&ctx->refs); } INDIRECT_CALL_2(req->io_task_work.func, @@ -1137,11 +1194,80 @@ void tctx_task_work(struct callback_head *cb) WARN_ON_ONCE(ret); } +static inline void io_req_thread_work_add(struct io_kiocb *req, + struct io_ring_ctx *ctx, + unsigned flags) +{ + unsigned nr_wait, nr_tw, nr_tw_prev; + struct io_sq_cq *s = &ctx->s[0]; + struct llist_node *head; + + /* + * We don't know how many reuqests is there in the link and whether + * they can even be queued lazily, fall back to non-lazy. + */ + if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) + flags &= ~IOU_F_TWQ_LAZY_WAKE; + + guard(rcu)(); + + head = READ_ONCE(s->work_llist.first); + do { + nr_tw_prev = 0; + if (head) { + struct io_kiocb *first_req = container_of(head, + struct io_kiocb, + io_task_work.node); + /* + * Might be executed at any moment, rely on + * SLAB_TYPESAFE_BY_RCU to keep it alive. + */ + nr_tw_prev = READ_ONCE(first_req->nr_tw); + } + + /* + * Theoretically, it can overflow, but that's fine as one of + * previous adds should've tried to wake the task. 
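+ *
+ * A non-lazy add forces nr_tw to IO_CQ_WAKE_FORCE below, which is
+ * large enough to pass the nr_wait check whenever a waiter is
+ * actually sleeping on this context's cq_wait.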
+ */ + nr_tw = nr_tw_prev + 1; + if (!(flags & IOU_F_TWQ_LAZY_WAKE)) + nr_tw = IO_CQ_WAKE_FORCE; + + req->nr_tw = nr_tw; + req->io_task_work.node.next = head; + } while (!try_cmpxchg(&s->work_llist.first, &head, &req->io_task_work.node)); + + /* + * cmpxchg implies a full barrier, which pairs with the barrier + * in set_current_state() on the io_cqring_wait() side. It's used + * to ensure that either we see updated ->cq_wait_nr, or waiters + * going to sleep will observe the work added to the list, which + * is similar to the wait/wawke task state sync. + */ + + if (!head) { + if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) + atomic_or(IORING_SQ_TASKRUN, &s->rings->sq_flags); + if (ctx->has_evfd) + io_eventfd_signal(ctx); + } + + nr_wait = atomic_read(&s->cq_wait_nr); + /* not enough or no one is waiting */ + if (nr_tw < nr_wait) + return; + /* the previous add has already woken it up */ + if (nr_tw_prev >= nr_wait) + return; + wake_up_state(s->submitter_task, TASK_INTERRUPTIBLE); +} + static inline void io_req_local_work_add(struct io_kiocb *req, struct io_ring_ctx *ctx, unsigned flags) { unsigned nr_wait, nr_tw, nr_tw_prev; + struct io_sq_cq *s = ctx->s; struct llist_node *head; /* See comment above IO_CQ_WAKE_INIT */ @@ -1156,7 +1282,7 @@ static inline void io_req_local_work_add(struct io_kiocb *req, guard(rcu)(); - head = READ_ONCE(ctx->work_llist.first); + head = READ_ONCE(s->work_llist.first); do { nr_tw_prev = 0; if (head) { @@ -1180,8 +1306,7 @@ static inline void io_req_local_work_add(struct io_kiocb *req, req->nr_tw = nr_tw; req->io_task_work.node.next = head; - } while (!try_cmpxchg(&ctx->work_llist.first, &head, - &req->io_task_work.node)); + } while (!try_cmpxchg(&s->work_llist.first, &head, &req->io_task_work.node)); /* * cmpxchg implies a full barrier, which pairs with the barrier @@ -1193,32 +1318,33 @@ static inline void io_req_local_work_add(struct io_kiocb *req, if (!head) { if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) - atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); + atomic_or(IORING_SQ_TASKRUN, &s->rings->sq_flags); if (ctx->has_evfd) io_eventfd_signal(ctx); } - nr_wait = atomic_read(&ctx->cq_wait_nr); + nr_wait = atomic_read(&s->cq_wait_nr); /* not enough or no one is waiting */ if (nr_tw < nr_wait) return; /* the previous add has already woken it up */ if (nr_tw_prev >= nr_wait) return; - wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE); + wake_up_state(s->submitter_task, TASK_INTERRUPTIBLE); } static void io_req_normal_work_add(struct io_kiocb *req) { struct io_uring_task *tctx = req->tctx; struct io_ring_ctx *ctx = req->ctx; + struct io_sq_cq *s = ctx->s; /* task_work already pending, we're done */ if (!llist_add(&req->io_task_work.node, &tctx->task_list)) return; if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) - atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); + atomic_or(IORING_SQ_TASKRUN, &s->rings->sq_flags); /* SQPOLL doesn't need the task_work added, it'll run it itself */ if (ctx->flags & IORING_SETUP_SQPOLL) { @@ -1237,8 +1363,12 @@ static void io_req_normal_work_add(struct io_kiocb *req) void __io_req_task_work_add(struct io_kiocb *req, unsigned flags) { - if (req->ctx->flags & IORING_SETUP_DEFER_TASKRUN) - io_req_local_work_add(req, req->ctx, flags); + struct io_ring_ctx *ctx = req->ctx; + + if (ctx->flags & IORING_SETUP_THREAD_ISSUER) + io_req_thread_work_add(req, ctx, flags); + else if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) + io_req_local_work_add(req, ctx, flags); else io_req_normal_work_add(req); } @@ -1253,22 +1383,27 @@ void 
io_req_task_work_add_remote(struct io_kiocb *req, struct io_ring_ctx *ctx, static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx) { - struct llist_node *node = llist_del_all(&ctx->work_llist); + struct io_sq_cq *s; + int i; + + io_for_each_s(ctx, s, i) { + struct llist_node *node = llist_del_all(&s->work_llist); - __io_fallback_tw(node, false); - node = llist_del_all(&ctx->retry_llist); - __io_fallback_tw(node, false); + __io_fallback_tw(node, false); + node = llist_del_all(&s->retry_llist); + __io_fallback_tw(node, false); + } } -static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events, +static bool io_run_local_work_continue(struct io_sq_cq *s, int events, int min_events) { - if (!io_local_work_pending(ctx)) + if (!io_local_work_pending(s)) return false; if (events < min_events) return true; - if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) - atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); + if (s->ring_flags & IORING_SETUP_TASKRUN_FLAG) + atomic_or(IORING_SQ_TASKRUN, &s->rings->sq_flags); return false; } @@ -1293,75 +1428,76 @@ static int __io_run_local_work_loop(struct llist_node **node, return ret; } -static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts, +static int __io_run_local_work(struct io_sq_cq *s, struct io_tw_state *ts, int min_events, int max_events) { + struct io_ring_ctx *ctx = s->ctx; struct llist_node *node; unsigned int loops = 0; int ret = 0; - if (WARN_ON_ONCE(ctx->submitter_task != current)) + if (WARN_ON_ONCE(s->submitter_task != current)) return -EEXIST; if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) - atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); + atomic_andnot(IORING_SQ_TASKRUN, &s->rings->sq_flags); again: min_events -= ret; - ret = __io_run_local_work_loop(&ctx->retry_llist.first, ts, max_events); - if (ctx->retry_llist.first) + ret = __io_run_local_work_loop(&s->retry_llist.first, ts, max_events); + if (s->retry_llist.first) goto retry_done; /* * llists are in reverse order, flip it back the right way before * running the pending items. 
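 *
 * (llist_add() pushes at the head, so entries come off LIFO: requests
 * queued A, B, C are popped C, B, A until the reversal below restores
 * submission order.)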
*/ - node = llist_reverse_order(llist_del_all(&ctx->work_llist)); + node = llist_reverse_order(llist_del_all(&s->work_llist)); ret += __io_run_local_work_loop(&node, ts, max_events - ret); - ctx->retry_llist.first = node; + s->retry_llist.first = node; loops++; - if (io_run_local_work_continue(ctx, ret, min_events)) + if (io_run_local_work_continue(s, ret, min_events)) goto again; retry_done: - io_submit_flush_completions(ctx); - if (io_run_local_work_continue(ctx, ret, min_events)) + io_submit_flush_completions(s); + if (io_run_local_work_continue(s, ret, min_events)) goto again; trace_io_uring_local_work_run(ctx, ret, loops); return ret; } -static inline int io_run_local_work_locked(struct io_ring_ctx *ctx, - int min_events) +static inline int io_run_local_work_locked(struct io_sq_cq *s, int min_events) { - struct io_tw_state ts = {}; + struct io_tw_state ts = { .sq = s }; - if (!io_local_work_pending(ctx)) + if (!io_local_work_pending(s)) return 0; - return __io_run_local_work(ctx, &ts, min_events, + return __io_run_local_work(s, &ts, min_events, max(IO_LOCAL_TW_DEFAULT_MAX, min_events)); } -static int io_run_local_work(struct io_ring_ctx *ctx, int min_events, - int max_events) +static int io_run_local_work(struct io_sq_cq *s, int min_events, int max_events) { - struct io_tw_state ts = {}; + struct io_tw_state ts = { .sq = s }; int ret; - mutex_lock(&ctx->uring_lock); - ret = __io_run_local_work(ctx, &ts, min_events, max_events); - mutex_unlock(&ctx->uring_lock); + mutex_lock(&s->ring_lock); + ret = __io_run_local_work(s, &ts, min_events, max_events); + mutex_unlock(&s->ring_lock); return ret; } static void io_req_task_cancel(struct io_kiocb *req, struct io_tw_state *ts) { - io_tw_lock(req->ctx, ts); + req->sq = ts->sq; + io_tw_lock(ts->sq); io_req_defer_failed(req, req->cqe.res); } void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts) { - io_tw_lock(req->ctx, ts); + req->sq = ts->sq; + io_tw_lock(ts->sq); if (unlikely(io_should_terminate_tw())) io_req_defer_failed(req, -EFAULT); else if (req->flags & REQ_F_FORCE_ASYNC) @@ -1391,10 +1527,10 @@ void io_queue_next(struct io_kiocb *req) io_req_task_queue(nxt); } -static void io_free_batch_list(struct io_ring_ctx *ctx, - struct io_wq_work_node *node) - __must_hold(&ctx->uring_lock) +static void io_free_batch_list(struct io_sq_cq *s, struct io_wq_work_node *node) { + lockdep_assert_held(&s->ring_lock); + do { struct io_kiocb *req = container_of(node, struct io_kiocb, comp_list); @@ -1410,7 +1546,7 @@ static void io_free_batch_list(struct io_ring_ctx *ctx, if (apoll->double_poll) kfree(apoll->double_poll); - if (!io_alloc_cache_put(&ctx->apoll_cache, apoll)) + if (!io_alloc_cache_put(&s->apoll_cache, apoll)) kfree(apoll); req->flags &= ~REQ_F_POLLED; } @@ -1424,15 +1560,18 @@ static void io_free_batch_list(struct io_ring_ctx *ctx, io_put_task(req); node = req->comp_list.next; - io_req_add_to_cache(req, ctx); + io_req_add_to_cache(req, &s->submit_state); } while (node); } -void __io_submit_flush_completions(struct io_ring_ctx *ctx) - __must_hold(&ctx->uring_lock) +void __io_submit_flush_completions(struct io_sq_cq *s) { - struct io_submit_state *state = &ctx->submit_state; + struct io_submit_state *state = &s->submit_state; + struct io_ring_ctx *ctx = s->ctx; struct io_wq_work_node *node; + int nr = 0; + + lockdep_assert_held(&s->ring_lock); __io_cq_lock(ctx); __wq_list_for_each(node, &state->compl_reqs) { @@ -1440,30 +1579,31 @@ void __io_submit_flush_completions(struct io_ring_ctx *ctx) comp_list); if (!(req->flags & 
REQ_F_CQE_SKIP) && - unlikely(!io_fill_cqe_req(ctx, req))) { + unlikely(!io_fill_cqe_req(s, req))) { if (ctx->lockless_cq) { spin_lock(&ctx->completion_lock); - io_req_cqe_overflow(req); + io_req_cqe_overflow(s, req); spin_unlock(&ctx->completion_lock); } else { - io_req_cqe_overflow(req); + io_req_cqe_overflow(s, req); } } + nr++; } - __io_cq_unlock_post(ctx); + __io_cq_unlock_post(s); if (!wq_list_empty(&state->compl_reqs)) { - io_free_batch_list(ctx, state->compl_reqs.first); + io_free_batch_list(s, state->compl_reqs.first); INIT_WQ_LIST(&state->compl_reqs); } - ctx->submit_state.cq_flush = false; + state->cq_flush = false; } -static unsigned io_cqring_events(struct io_ring_ctx *ctx) +static unsigned io_cqring_events(struct io_sq_cq *s) { /* See comment at the top of this file */ smp_rmb(); - return __io_cqring_events(ctx); + return __io_cqring_events(s); } /* @@ -1472,13 +1612,15 @@ static unsigned io_cqring_events(struct io_ring_ctx *ctx) */ static __cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx) { + struct io_sq_cq *s = &ctx->s[0]; + if (!(ctx->flags & IORING_SETUP_IOPOLL)) return; - mutex_lock(&ctx->uring_lock); + mutex_lock(&s->ring_lock); while (!wq_list_empty(&ctx->iopoll_list)) { /* let it sleep and repeat later if can't complete a request */ - if (io_do_iopoll(ctx, true) == 0) + if (io_do_iopoll(s, true) == 0) break; /* * Ensure we allow local-to-the-cpu processing to take place, @@ -1486,28 +1628,28 @@ static __cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx) * Also let task_work, etc. to progress by releasing the mutex */ if (need_resched()) { - mutex_unlock(&ctx->uring_lock); + mutex_unlock(&s->ring_lock); cond_resched(); - mutex_lock(&ctx->uring_lock); + mutex_lock(&s->ring_lock); } } - mutex_unlock(&ctx->uring_lock); + mutex_unlock(&s->ring_lock); } -static int io_iopoll_check(struct io_ring_ctx *ctx, long min) +static int io_iopoll_check(struct io_ring_ctx *ctx, struct io_sq_cq *s, long min) { unsigned int nr_events = 0; unsigned long check_cq; - lockdep_assert_held(&ctx->uring_lock); + lockdep_assert_held(&s->ring_lock); - if (!io_allowed_run_tw(ctx)) + if (!io_allowed_run_tw(s)) return -EEXIST; - check_cq = READ_ONCE(ctx->check_cq); + check_cq = READ_ONCE(s->check_cq); if (unlikely(check_cq)) { if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT)) - __io_cqring_overflow_flush(ctx, false); + __io_cqring_overflow_flush(s, false); /* * Similarly do not spin if we have not informed the user of any * dropped CQE. @@ -1520,7 +1662,7 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min) * If we do, we can potentially be spinning for commands that * already triggered a CQE (eg in error). */ - if (io_cqring_events(ctx)) + if (io_cqring_events(s)) return 0; do { @@ -1537,23 +1679,23 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min) * very same mutex. 
*/ if (wq_list_empty(&ctx->iopoll_list) || - io_task_work_pending(ctx)) { - u32 tail = ctx->cached_cq_tail; + io_task_work_pending(s)) { + u32 tail = s->cached_cq_tail; - (void) io_run_local_work_locked(ctx, min); + io_run_local_work_locked(s, min); if (task_work_pending(current) || wq_list_empty(&ctx->iopoll_list)) { - mutex_unlock(&ctx->uring_lock); + mutex_unlock(&s->ring_lock); io_run_task_work(); - mutex_lock(&ctx->uring_lock); + mutex_lock(&s->ring_lock); } /* some requests don't go through iopoll_list */ - if (tail != ctx->cached_cq_tail || + if (tail != s->cached_cq_tail || wq_list_empty(&ctx->iopoll_list)) break; } - ret = io_do_iopoll(ctx, !min); + ret = io_do_iopoll(s, !min); if (unlikely(ret < 0)) return ret; @@ -1570,7 +1712,7 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min) void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts) { - io_req_complete_defer(req); + io_req_complete_defer(req, ts->sq); } /* @@ -1581,12 +1723,13 @@ void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts) */ static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags) { - struct io_ring_ctx *ctx = req->ctx; const bool needs_lock = issue_flags & IO_URING_F_UNLOCKED; + struct io_ring_ctx *ctx = req->ctx; + struct io_sq_cq *s = req->sq; /* workqueue context doesn't hold uring_lock, grab it now */ if (unlikely(needs_lock)) - mutex_lock(&ctx->uring_lock); + mutex_lock(&s->ring_lock); /* * Track whether we have multiple files in our lists. This will impact @@ -1624,7 +1767,7 @@ static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags) wq_has_sleeper(&ctx->sq_data->wait)) wake_up(&ctx->sq_data->wait); - mutex_unlock(&ctx->uring_lock); + mutex_unlock(&s->ring_lock); } } @@ -1641,7 +1784,7 @@ io_req_flags_t io_file_get_flags(struct file *file) static u32 io_get_sequence(struct io_kiocb *req) { - u32 seq = req->ctx->cached_sq_head; + u32 seq = req->sq->cached_sq_head; struct io_kiocb *cur; /* need original cached_sq_head, but it was increased for each req */ @@ -1651,13 +1794,15 @@ static u32 io_get_sequence(struct io_kiocb *req) } static __cold void io_drain_req(struct io_kiocb *req) - __must_hold(&ctx->uring_lock) { struct io_ring_ctx *ctx = req->ctx; + struct io_sq_cq *s = req->sq; struct io_defer_entry *de; int ret; u32 seq = io_get_sequence(req); + lockdep_assert_held(&s->ring_lock); + /* Still need defer if there is pending req in defer list. 
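 * The drain sequence is derived from this issuer's cached_sq_head (see
 * io_get_sequence()) and compared against its cached_cq_tail/cq_extra
 * in req_need_defer(), while defer_list itself stays per-ring under
 * ->completion_lock.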
*/ spin_lock(&ctx->completion_lock); if (!req_need_defer(req, seq) && list_empty_careful(&ctx->defer_list)) { @@ -1730,7 +1875,7 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags) if (ret == IOU_OK) { if (issue_flags & IO_URING_F_COMPLETE_DEFER) - io_req_complete_defer(req); + io_req_complete_defer(req, req->sq); else io_req_complete_post(req, issue_flags); @@ -1750,7 +1895,8 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags) int io_poll_issue(struct io_kiocb *req, struct io_tw_state *ts) { - io_tw_lock(req->ctx, ts); + req->sq = ts->sq; + io_tw_lock(ts->sq); return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_MULTISHOT| IO_URING_F_COMPLETE_DEFER); } @@ -1873,14 +2019,14 @@ inline struct file *io_file_get_fixed(struct io_kiocb *req, int fd, struct io_rsrc_node *node; struct file *file = NULL; - io_ring_submit_lock(ctx, issue_flags); + io_ring_submit_lock(req->sq, issue_flags); node = io_rsrc_node_lookup(&ctx->file_table.data, fd); if (node) { io_req_assign_rsrc_node(&req->file_node, node); req->flags |= io_slot_flags(node); file = io_slot_file(node); } - io_ring_submit_unlock(ctx, issue_flags); + io_ring_submit_unlock(req->sq, issue_flags); return file; } @@ -1897,10 +2043,11 @@ struct file *io_file_get_normal(struct io_kiocb *req, int fd) } static void io_queue_async(struct io_kiocb *req, int ret) - __must_hold(&req->ctx->uring_lock) { struct io_kiocb *linked_timeout; + lockdep_assert_held(&req->sq->ring_lock); + if (ret != -EAGAIN || (req->flags & REQ_F_NOWAIT)) { io_req_defer_failed(req, ret); return; @@ -1926,7 +2073,6 @@ static void io_queue_async(struct io_kiocb *req, int ret) } static inline void io_queue_sqe(struct io_kiocb *req) - __must_hold(&req->ctx->uring_lock) { int ret; @@ -1941,8 +2087,9 @@ static inline void io_queue_sqe(struct io_kiocb *req) } static void io_queue_sqe_fallback(struct io_kiocb *req) - __must_hold(&req->ctx->uring_lock) { + lockdep_assert_held(&req->sq->ring_lock); + if (unlikely(req->flags & REQ_F_FAIL)) { /* * We don't submit, fail them all, for that replace hardlinks @@ -1982,10 +2129,10 @@ static inline bool io_check_restriction(struct io_ring_ctx *ctx, return true; } -static void io_init_req_drain(struct io_kiocb *req) +static void io_init_req_drain(struct io_sq_cq *s, struct io_kiocb *req) { struct io_ring_ctx *ctx = req->ctx; - struct io_kiocb *head = ctx->submit_state.link.head; + struct io_kiocb *head = s->submit_state.link.head; ctx->drain_active = true; if (head) { @@ -2008,22 +2155,25 @@ static __cold int io_init_fail_req(struct io_kiocb *req, int err) return err; } -static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req, +static int io_init_req(struct io_sq_cq *s, struct io_kiocb *req, const struct io_uring_sqe *sqe) - __must_hold(&ctx->uring_lock) { + struct io_ring_ctx *ctx = s->ctx; const struct io_issue_def *def; unsigned int sqe_flags; int personality; u8 opcode; + lockdep_assert_held(&s->ring_lock); + /* req is partially pre-initialised, see io_preinit_req() */ + req->file = NULL; req->opcode = opcode = READ_ONCE(sqe->opcode); /* same numerical values with corresponding REQ_F_*, safe to copy */ sqe_flags = READ_ONCE(sqe->flags); req->flags = (__force io_req_flags_t) sqe_flags; req->cqe.user_data = READ_ONCE(sqe->user_data); - req->file = NULL; + req->sq = s; req->tctx = current->io_uring; req->cancel_seq_set = false; @@ -2046,7 +2196,7 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req, if (sqe_flags & IOSQE_IO_DRAIN) { if (ctx->drain_disabled) return 
io_init_fail_req(req, -EOPNOTSUPP); - io_init_req_drain(req); + io_init_req_drain(s, req); } } if (unlikely(ctx->restricted || ctx->drain_active || ctx->drain_next)) { @@ -2056,7 +2206,7 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req, if (ctx->drain_active) req->flags |= REQ_F_FORCE_ASYNC; /* if there is no link, we're at "next" request and need to drain */ - if (unlikely(ctx->drain_next) && !ctx->submit_state.link.head) { + if (unlikely(ctx->drain_next) && !s->submit_state.link.head) { ctx->drain_next = false; ctx->drain_active = true; req->flags |= REQ_F_IO_DRAIN | REQ_F_FORCE_ASYNC; @@ -2069,7 +2219,7 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req, return io_init_fail_req(req, -EINVAL); if (def->needs_file) { - struct io_submit_state *state = &ctx->submit_state; + struct io_submit_state *state = &s->submit_state; req->cqe.fd = READ_ONCE(sqe->fd); @@ -2103,11 +2253,11 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req, return def->prep(req, sqe); } -static __cold int io_submit_fail_init(const struct io_uring_sqe *sqe, +static __cold int io_submit_fail_init(struct io_sq_cq *s, + const struct io_uring_sqe *sqe, struct io_kiocb *req, int ret) { - struct io_ring_ctx *ctx = req->ctx; - struct io_submit_link *link = &ctx->submit_state.link; + struct io_submit_link *link = &s->submit_state.link; struct io_kiocb *head = link->head; trace_io_uring_req_failed(sqe, req, ret); @@ -2140,16 +2290,17 @@ static __cold int io_submit_fail_init(const struct io_uring_sqe *sqe, return 0; } -static inline int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, - const struct io_uring_sqe *sqe) - __must_hold(&ctx->uring_lock) +static inline int io_submit_sqe(struct io_sq_cq *s, struct io_kiocb *req, + const struct io_uring_sqe *sqe) { - struct io_submit_link *link = &ctx->submit_state.link; + struct io_submit_link *link = &s->submit_state.link; int ret; - ret = io_init_req(ctx, req, sqe); + lockdep_assert_held(&s->ring_lock); + + ret = io_init_req(s, req, sqe); if (unlikely(ret)) - return io_submit_fail_init(sqe, req, ret); + return io_submit_fail_init(s, sqe, req, ret); trace_io_uring_submit_req(req); @@ -2192,14 +2343,14 @@ fallback: /* * Batched submission is done, ensure local IO is flushed out. */ -static void io_submit_state_end(struct io_ring_ctx *ctx) +static void io_submit_state_end(struct io_sq_cq *s) { - struct io_submit_state *state = &ctx->submit_state; + struct io_submit_state *state = &s->submit_state; if (unlikely(state->link.head)) io_queue_sqe_fallback(state->link.head); /* flush only after queuing links as they can generate completions */ - io_submit_flush_completions(ctx); + io_submit_flush_completions(s); if (state->plug_started) blk_finish_plug(&state->plug); } @@ -2217,16 +2368,16 @@ static void io_submit_state_start(struct io_submit_state *state, state->link.head = NULL; } -static void io_commit_sqring(struct io_ring_ctx *ctx) +static void io_commit_sqring(struct io_sq_cq *s) { - struct io_rings *rings = ctx->rings; + struct io_rings *rings = s->rings; /* * Ensure any loads from the SQEs are done at this point, * since once we write the new head, the application could * write new data to them. */ - smp_store_release(&rings->sq.head, ctx->cached_sq_head); + smp_store_release(&rings->sq.head, s->cached_sq_head); } /* @@ -2237,24 +2388,24 @@ static void io_commit_sqring(struct io_ring_ctx *ctx) * used, it's important that those reads are done through READ_ONCE() to * prevent a re-load down the line. 
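
[Editor's sketch] io_commit_sqring() above publishes the new SQ head with a single release store, after every SQE in the batch has been read, so the application never reuses a slot the kernel is still parsing. A compact userspace consumer showing the same acquire/release pairing; toy types, not the real ring layout:

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define SQ_ENTRIES 8		/* power of two */

struct toy_sq {
	_Atomic uint32_t head;	/* consumer (kernel side) advances this */
	_Atomic uint32_t tail;	/* producer (application) advances this */
	uint64_t sqes[SQ_ENTRIES];
};

/* Consume every pending entry, then publish the new head with one
 * release store, the analogue of io_commit_sqring(). */
static unsigned toy_consume(struct toy_sq *sq)
{
	uint32_t tail = atomic_load_explicit(&sq->tail, memory_order_acquire);
	uint32_t head = atomic_load_explicit(&sq->head, memory_order_relaxed);
	unsigned nr = 0;

	while (head != tail) {
		uint64_t sqe = sq->sqes[head & (SQ_ENTRIES - 1)];

		printf("consumed sqe %#llx\n", (unsigned long long)sqe);
		head++;
		nr++;
	}
	/* all loads from sqes[] happen before the new head is visible */
	atomic_store_explicit(&sq->head, head, memory_order_release);
	return nr;
}

int main(void)
{
	struct toy_sq sq = { 0 };

	sq.sqes[0] = 0xaa;
	sq.sqes[1] = 0xbb;
	atomic_store_explicit(&sq.tail, 2, memory_order_release);
	printf("consumed %u entries\n", toy_consume(&sq));
	return 0;
}
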
*/ -static bool io_get_sqe(struct io_ring_ctx *ctx, const struct io_uring_sqe **sqe) +static bool io_get_sqe(struct io_sq_cq *s, const struct io_uring_sqe **sqe) { - unsigned mask = ctx->sq_entries - 1; - unsigned head = ctx->cached_sq_head++ & mask; + unsigned mask = s->sq_entries - 1; + unsigned head = s->cached_sq_head++ & mask; if (static_branch_unlikely(&io_key_has_sqarray) && - (!(ctx->flags & IORING_SETUP_NO_SQARRAY))) { - head = READ_ONCE(ctx->sq_array[head]); - if (unlikely(head >= ctx->sq_entries)) { + (!(s->ring_flags & IORING_SETUP_NO_SQARRAY))) { + head = READ_ONCE(s->sq_array[head]); + if (unlikely(head >= s->sq_entries)) { /* drop invalid entries */ - spin_lock(&ctx->completion_lock); - ctx->cq_extra--; - spin_unlock(&ctx->completion_lock); - WRITE_ONCE(ctx->rings->sq_dropped, - READ_ONCE(ctx->rings->sq_dropped) + 1); + spin_lock(&s->ctx->completion_lock); + s->cq_extra--; + spin_unlock(&s->ctx->completion_lock); + WRITE_ONCE(s->rings->sq_dropped, + READ_ONCE(s->rings->sq_dropped) + 1); return false; } - head = array_index_nospec(head, ctx->sq_entries); + head = array_index_nospec(head, s->sq_entries); } /* @@ -2267,34 +2418,35 @@ static bool io_get_sqe(struct io_ring_ctx *ctx, const struct io_uring_sqe **sqe) */ /* double index for 128-byte SQEs, twice as long */ - if (ctx->flags & IORING_SETUP_SQE128) + if (s->ring_flags & IORING_SETUP_SQE128) head <<= 1; - *sqe = &ctx->sq_sqes[head]; + *sqe = &s->sq_sqes[head]; return true; } -int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr) - __must_hold(&ctx->uring_lock) +int io_submit_sqes(struct io_sq_cq *s, unsigned int nr) { - unsigned int entries = io_sqring_entries(ctx); + unsigned int entries = io_sqring_entries(s); unsigned int left; int ret; + lockdep_assert_held(&s->ring_lock); + if (unlikely(!entries)) return 0; /* make sure SQ entry isn't read before tail */ ret = left = min(nr, entries); io_get_task_refs(left); - io_submit_state_start(&ctx->submit_state, left); + io_submit_state_start(&s->submit_state, left); do { const struct io_uring_sqe *sqe; struct io_kiocb *req; - if (unlikely(!io_alloc_req(ctx, &req))) + if (unlikely(!io_alloc_req(s, &req))) break; - if (unlikely(!io_get_sqe(ctx, &sqe))) { - io_req_add_to_cache(req, ctx); + if (unlikely(!io_get_sqe(s, &sqe))) { + io_req_add_to_cache(req, &s->submit_state); break; } @@ -2302,8 +2454,8 @@ int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr) * Continue submitting even for sqe failure if the * ring was setup with IORING_SETUP_SUBMIT_ALL */ - if (unlikely(io_submit_sqe(ctx, req, sqe)) && - !(ctx->flags & IORING_SETUP_SUBMIT_ALL)) { + if (unlikely(io_submit_sqe(s, req, sqe)) && + !(s->ring_flags & IORING_SETUP_SUBMIT_ALL)) { left--; break; } @@ -2312,14 +2464,14 @@ int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr) if (unlikely(left)) { ret -= left; /* try again if it submitted nothing and can't allocate a req */ - if (!ret && io_req_cache_empty(ctx)) + if (!ret && io_req_cache_empty(&s->submit_state)) ret = -EAGAIN; current->io_uring->cached_refs += left; } - io_submit_state_end(ctx); + io_submit_state_end(s); /* Commit SQ ring head once we've consumed and submitted all SQEs */ - io_commit_sqring(ctx); + io_commit_sqring(s); return ret; } @@ -2332,18 +2484,25 @@ static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode, * Cannot safely flush overflowed CQEs from here, ensure we wake up * the task, and the next invocation will do it. 
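
[Editor's sketch] io_get_sqe() above masks cached_sq_head, optionally resolves it through the sq_array indirection (unless IORING_SETUP_NO_SQARRAY), drops out-of-range indices, and doubles the index for 128-byte SQEs. The same index math as a standalone userspace sketch; flag values and the drop handling are simplified, not the kernel's:

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define TOY_NO_SQARRAY	(1u << 0)
#define TOY_SQE128	(1u << 1)

struct toy_sq {
	unsigned flags;
	unsigned sq_entries;		/* power of two */
	unsigned cached_head;
	const uint32_t *sq_array;	/* optional indirection table */
};

/*
 * Resolve the array index of the next SQE. Returns false if the
 * application put a bogus value in the indirection table, in which
 * case the entry is simply skipped (the kernel also bumps sq_dropped).
 */
static bool toy_get_sqe_index(struct toy_sq *sq, unsigned *out)
{
	unsigned head = sq->cached_head++ & (sq->sq_entries - 1);

	if (!(sq->flags & TOY_NO_SQARRAY)) {
		head = sq->sq_array[head];
		if (head >= sq->sq_entries)
			return false;	/* drop invalid entry */
	}
	if (sq->flags & TOY_SQE128)
		head <<= 1;	/* 128-byte SQEs span two 64-byte slots */
	*out = head;
	return true;
}

int main(void)
{
	static const uint32_t sq_array[4] = { 3, 0, 2, 9 /* bogus */ };
	struct toy_sq sq = {
		.flags = TOY_SQE128,
		.sq_entries = 4,
		.sq_array = sq_array,
	};
	unsigned idx;

	for (int i = 0; i < 4; i++) {
		if (toy_get_sqe_index(&sq, &idx))
			printf("sqe slot %u\n", idx);
		else
			printf("dropped invalid entry\n");
	}
	return 0;
}
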
*/ - if (io_should_wake(iowq) || io_has_work(iowq->ctx)) + if (io_should_wake(iowq) || io_has_work(iowq->s)) return autoremove_wake_function(curr, mode, wake_flags, key); return -1; } int io_run_task_work_sig(struct io_ring_ctx *ctx) { - if (io_local_work_pending(ctx)) { + struct io_sq_cq *s; + int i, ret = 1; + + io_for_each_s(ctx, s, i) { + if (!io_local_work_pending(s)) + continue; __set_current_state(TASK_RUNNING); - if (io_run_local_work(ctx, INT_MAX, IO_LOCAL_TW_DEFAULT_MAX) > 0) - return 0; + if (io_run_local_work(s, INT_MAX, IO_LOCAL_TW_DEFAULT_MAX) > 0) + ret = 0; } + if (!ret) + return ret; if (io_run_task_work() > 0) return 0; if (task_sigpending(current)) @@ -2378,20 +2537,20 @@ static enum hrtimer_restart io_cqring_timer_wakeup(struct hrtimer *timer) static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer) { struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t); - struct io_ring_ctx *ctx = iowq->ctx; + struct io_sq_cq *s = iowq->s; /* no general timeout, or shorter (or equal), we are done */ if (iowq->timeout == KTIME_MAX || ktime_compare(iowq->min_timeout, iowq->timeout) >= 0) goto out_wake; /* work we may need to run, wake function will see if we need to wake */ - if (io_has_work(ctx)) + if (io_has_work(s)) goto out_wake; /* got events since we started waiting, min timeout is done */ - if (iowq->cq_min_tail != READ_ONCE(ctx->rings->cq.tail)) + if (iowq->cq_min_tail != READ_ONCE(s->rings->cq.tail)) goto out_wake; /* if we have any events and min timeout expired, we're done */ - if (io_cqring_events(ctx)) + if (io_cqring_events(s)) goto out_wake; /* @@ -2400,10 +2559,10 @@ static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer) * to normal sleeps. Any request completion post min_wait should wake * the task and return. */ - if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) { - atomic_set(&ctx->cq_wait_nr, 1); + if (s->ring_flags & IORING_SETUP_DEFER_TASKRUN) { + atomic_set(&s->cq_wait_nr, 1); smp_mb(); - if (!llist_empty(&ctx->work_llist)) + if (!llist_empty(&s->work_llist)) goto out_wake; } @@ -2464,13 +2623,13 @@ static int __io_cqring_wait_schedule(struct io_ring_ctx *ctx, } /* If this returns > 0, the caller should retry */ -static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx, +static inline int io_cqring_wait_schedule(struct io_sq_cq *s, struct io_wait_queue *iowq, ktime_t start_time) { - if (unlikely(READ_ONCE(ctx->check_cq))) + if (unlikely(READ_ONCE(s->check_cq))) return 1; - if (unlikely(io_local_work_pending(ctx))) + if (unlikely(io_local_work_pending(s))) return 1; if (unlikely(task_work_pending(current))) return 1; @@ -2479,7 +2638,7 @@ static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx, if (unlikely(io_should_wake(iowq))) return 0; - return __io_cqring_wait_schedule(ctx, iowq, start_time); + return __io_cqring_wait_schedule(s->ctx, iowq, start_time); } struct ext_arg { @@ -2494,33 +2653,35 @@ struct ext_arg { * Wait until events become available, if we don't already have some. The * application must reap them itself, as they reside on the shared cq ring. 
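
[Editor's sketch] io_run_task_work_sig() above now walks every io_sq_cq with io_for_each_s() and only reports success if at least one queue had local work to run. A small userspace model of that "any queue made progress" loop; the types are stand-ins, the real iteration is the macro added in io_uring.h later in this patch:

#include <stdio.h>

#define NR_QUEUES 4

struct toy_queue {
	int pending;	/* number of queued local work items */
};

struct toy_ctx {
	struct toy_queue q[NR_QUEUES];
	int nr_queues;
};

#define toy_for_each_q(ctx, q, i) \
	for (i = 0, q = &(ctx)->q[0]; i < (ctx)->nr_queues; i++, q++)

/* Run local work on every queue; 0 means at least one item ran. */
static int toy_run_all_local_work(struct toy_ctx *ctx)
{
	struct toy_queue *q;
	int i, ret = 1;

	toy_for_each_q(ctx, q, i) {
		if (!q->pending)
			continue;
		printf("queue %d: running %d item(s)\n", i, q->pending);
		q->pending = 0;
		ret = 0;
	}
	return ret;
}

int main(void)
{
	struct toy_ctx ctx = { .nr_queues = NR_QUEUES };

	ctx.q[2].pending = 3;
	printf("first pass:  %d\n", toy_run_all_local_work(&ctx));	/* 0 */
	printf("second pass: %d\n", toy_run_all_local_work(&ctx));	/* 1 */
	return 0;
}
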
*/ -static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags, - struct ext_arg *ext_arg) +static int io_cqring_wait(struct io_ring_ctx *ctx, struct io_sq_cq *s, + int min_events, u32 flags, struct ext_arg *ext_arg) { struct io_wait_queue iowq; - struct io_rings *rings = ctx->rings; + struct io_rings *rings = s->rings; ktime_t start_time; int ret; - if (!io_allowed_run_tw(ctx)) + if (!io_allowed_run_tw(s)) { + printk("eexist\n"); return -EEXIST; - if (io_local_work_pending(ctx)) - io_run_local_work(ctx, min_events, + } + if (io_local_work_pending(s)) + io_run_local_work(s, min_events, max(IO_LOCAL_TW_DEFAULT_MAX, min_events)); io_run_task_work(); - if (unlikely(test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq))) - io_cqring_do_overflow_flush(ctx); - if (__io_cqring_events_user(ctx) >= min_events) + if (unlikely(test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &s->check_cq))) + io_cqring_do_overflow_flush(s); + if (__io_cqring_events_user(s) >= min_events) return 0; init_waitqueue_func_entry(&iowq.wq, io_wake_function); iowq.wq.private = current; INIT_LIST_HEAD(&iowq.wq.entry); - iowq.ctx = ctx; + iowq.s = s; iowq.cq_tail = READ_ONCE(rings->cq.head) + min_events; iowq.cq_min_tail = READ_ONCE(rings->cq.tail); - iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts); + iowq.nr_timeouts = atomic_read(&s->cq_timeouts); iowq.hit_timeout = 0; iowq.min_timeout = ext_arg->min_time; iowq.timeout = KTIME_MAX; @@ -2559,24 +2720,24 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags, nr_wait = 1; if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) { - atomic_set(&ctx->cq_wait_nr, nr_wait); + atomic_set(&s->cq_wait_nr, nr_wait); set_current_state(TASK_INTERRUPTIBLE); } else { - prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq, + prepare_to_wait_exclusive(&s->cq_wait, &iowq.wq, TASK_INTERRUPTIBLE); } - ret = io_cqring_wait_schedule(ctx, &iowq, start_time); + ret = io_cqring_wait_schedule(s, &iowq, start_time); __set_current_state(TASK_RUNNING); - atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT); + atomic_set(&s->cq_wait_nr, IO_CQ_WAKE_INIT); /* * Run task_work after scheduling and before io_should_wake(). * If we got woken because of task_work being processed, run it * now rather than let the caller do another wait loop. */ - if (io_local_work_pending(ctx)) - io_run_local_work(ctx, nr_wait, nr_wait); + if (io_local_work_pending(s)) + io_run_local_work(s, nr_wait, nr_wait); io_run_task_work(); /* @@ -2591,11 +2752,11 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags, if (ret < 0) break; - check_cq = READ_ONCE(ctx->check_cq); + check_cq = READ_ONCE(s->check_cq); if (unlikely(check_cq)) { /* let the caller flush overflows, retry */ if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT)) - io_cqring_do_overflow_flush(ctx); + io_cqring_do_overflow_flush(s); if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) { ret = -EBADR; break; @@ -2609,19 +2770,19 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags, cond_resched(); } while (1); - if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN)) - finish_wait(&ctx->cq_wait, &iowq.wq); + if (!(s->ring_flags & IORING_SETUP_DEFER_TASKRUN)) + finish_wait(&s->cq_wait, &iowq.wq); restore_saved_sigmask_unless(ret == -EINTR); return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? 
ret : 0; } -static void io_rings_free(struct io_ring_ctx *ctx) +static void io_rings_free(struct user_struct *user, struct io_sq_cq *s) { - io_free_region(ctx, &ctx->sq_region); - io_free_region(ctx, &ctx->ring_region); - ctx->rings = NULL; - ctx->sq_sqes = NULL; + io_free_region(user, &s->sq_region); + io_free_region(user, &s->ring_region); + s->rings = NULL; + s->sq_sqes = NULL; } unsigned long rings_size(unsigned int flags, unsigned int sq_entries, @@ -2664,22 +2825,27 @@ unsigned long rings_size(unsigned int flags, unsigned int sq_entries, static void io_req_caches_free(struct io_ring_ctx *ctx) { struct io_kiocb *req; - int nr = 0; - - mutex_lock(&ctx->uring_lock); - - while (!io_req_cache_empty(ctx)) { - req = io_extract_req(ctx); - kmem_cache_free(req_cachep, req); - nr++; + struct io_sq_cq *s; + int i, nr = 0; + + io_for_each_s(ctx, s, i) { + mutex_lock(&s->ring_lock); + while (!io_req_cache_empty(&s->submit_state)) { + req = io_extract_req(&s->submit_state); + kmem_cache_free(req_cachep, req); + nr++; + } + mutex_unlock(&s->ring_lock); } if (nr) percpu_ref_put_many(&ctx->refs, nr); - mutex_unlock(&ctx->uring_lock); } static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx) { + struct io_sq_cq *s; + int i; + io_sq_thread_finish(ctx); mutex_lock(&ctx->uring_lock); @@ -2687,19 +2853,21 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx) io_sqe_files_unregister(ctx); io_cqring_overflow_kill(ctx); io_eventfd_unregister(ctx); - io_alloc_cache_free(&ctx->apoll_cache, kfree); - io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free); - io_alloc_cache_free(&ctx->rw_cache, io_rw_cache_free); - io_alloc_cache_free(&ctx->uring_cache, kfree); - io_alloc_cache_free(&ctx->msg_cache, kfree); - io_futex_cache_free(ctx); + io_for_each_s(ctx, s, i) { + mutex_lock(&s->ring_lock); + io_alloc_cache_free(&s->apoll_cache, kfree); + io_alloc_cache_free(&s->netmsg_cache, io_netmsg_cache_free); + io_alloc_cache_free(&s->rw_cache, io_rw_cache_free); + io_alloc_cache_free(&s->uring_cache, kfree); + io_alloc_cache_free(&s->msg_cache, kfree); + io_futex_cache_free(s); + mutex_unlock(&s->ring_lock); + } io_destroy_buffers(ctx); - io_free_region(ctx, &ctx->param_region); + io_free_region(ctx->user, &ctx->param_region); mutex_unlock(&ctx->uring_lock); if (ctx->sq_creds) put_cred(ctx->sq_creds); - if (ctx->submitter_task) - put_task_struct(ctx->submitter_task); WARN_ON_ONCE(!list_empty(&ctx->ltimeout_list)); @@ -2707,8 +2875,12 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx) mmdrop(ctx->mm_account); ctx->mm_account = NULL; } - io_rings_free(ctx); - + io_for_each_s(ctx, s, i) { + if (s->submitter_task) + put_task_struct(s->submitter_task); + io_rings_free(ctx->user, s); + free_s(s); + } if (!(ctx->flags & IORING_SETUP_NO_SQARRAY)) static_branch_dec(&io_key_has_sqarray); @@ -2718,7 +2890,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx) if (ctx->hash_map) io_wq_put_hash(ctx->hash_map); io_napi_free(ctx); - kvfree(ctx->cancel_table.hbs); + kfree(ctx->s); xa_destroy(&ctx->io_bl_xa); kfree(ctx); } @@ -2742,21 +2914,24 @@ static __cold void io_activate_pollwq_cb(struct callback_head *cb) __cold void io_activate_pollwq(struct io_ring_ctx *ctx) { + /* foo: something something */ + struct io_sq_cq *s = &ctx->s[0]; + spin_lock(&ctx->completion_lock); /* already activated or in progress */ if (ctx->poll_activated || ctx->poll_wq_task_work.func) goto out; if (WARN_ON_ONCE(!ctx->task_complete)) goto out; - if (!ctx->submitter_task) + if (!s->submitter_task) goto out; /* - * 
with ->submitter_task only the submitter task completes requests, we + * with ->s.submitter_task only the submitter task completes requests, we * only need to sync with it, which is done by injecting a tw */ init_task_work(&ctx->poll_wq_task_work, io_activate_pollwq_cb); percpu_ref_get(&ctx->refs); - if (task_work_add(ctx->submitter_task, &ctx->poll_wq_task_work, TWA_SIGNAL)) + if (task_work_add(s->submitter_task, &ctx->poll_wq_task_work, TWA_SIGNAL)) percpu_ref_put(&ctx->refs); out: spin_unlock(&ctx->completion_lock); @@ -2765,6 +2940,7 @@ out: static __poll_t io_uring_poll(struct file *file, poll_table *wait) { struct io_ring_ctx *ctx = file->private_data; + struct io_sq_cq *s = ctx->s; __poll_t mask = 0; if (unlikely(!ctx->poll_activated)) @@ -2776,7 +2952,7 @@ static __poll_t io_uring_poll(struct file *file, poll_table *wait) * io_commit_cqring */ smp_rmb(); - if (!io_sqring_full(ctx)) + if (!io_sqring_full(s)) mask |= EPOLLOUT | EPOLLWRNORM; /* @@ -2793,7 +2969,7 @@ static __poll_t io_uring_poll(struct file *file, poll_table *wait) * pushes them to do the flush. */ - if (__io_cqring_events_user(ctx) || io_has_work(ctx)) + if (__io_cqring_events_user(s) || io_has_work(s)) mask |= EPOLLIN | EPOLLRDNORM; return mask; @@ -2845,11 +3021,9 @@ static __cold void io_ring_exit_work(struct work_struct *work) * as nobody else will be looking for them. */ do { - if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) { - mutex_lock(&ctx->uring_lock); - io_cqring_overflow_kill(ctx); - mutex_unlock(&ctx->uring_lock); - } + mutex_lock(&ctx->uring_lock); + io_cqring_overflow_kill(ctx); + mutex_unlock(&ctx->uring_lock); if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) io_move_task_work_from_local(ctx); @@ -3003,6 +3177,7 @@ static __cold bool io_uring_try_cancel_iowq(struct io_ring_ctx *ctx) bool ret = false; mutex_lock(&ctx->uring_lock); + //mutex_lock(&ctx->s.ring_lock); list_for_each_entry(node, &ctx->tctx_list, ctx_node) { struct io_uring_task *tctx = node->task->io_uring; @@ -3015,6 +3190,7 @@ static __cold bool io_uring_try_cancel_iowq(struct io_ring_ctx *ctx) cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_ctx_cb, ctx, true); ret |= (cret != IO_WQ_CANCEL_NOTFOUND); } + //mutex_unlock(&ctx->s.ring_lock); mutex_unlock(&ctx->uring_lock); return ret; @@ -3030,12 +3206,16 @@ static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx, /* set it so io_req_local_work_add() would wake us up */ if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) { - atomic_set(&ctx->cq_wait_nr, 1); + struct io_sq_cq *s; + int i; + + io_for_each_s(ctx, s, i) + atomic_set(&s->cq_wait_nr, 1); smp_mb(); } /* failed during ring init, it couldn't have issued any requests */ - if (!ctx->rings) + if (!ctx->s->rings) return false; if (!tctx) { @@ -3060,9 +3240,16 @@ static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx, } } - if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) && - io_allowed_defer_tw_run(ctx)) - ret |= io_run_local_work(ctx, INT_MAX, INT_MAX) > 0; + if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) { + struct io_sq_cq *s; + int i; + + io_for_each_s(ctx, s, i) { + if (!io_allowed_defer_tw_run(s)) + continue; + ret |= io_run_local_work(s, INT_MAX, INT_MAX) > 0; + } + } ret |= io_cancel_defer_files(ctx, tctx, cancel_all); mutex_lock(&ctx->uring_lock); ret |= io_poll_remove_all(ctx, tctx, cancel_all); @@ -3143,9 +3330,9 @@ __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd) io_run_task_work(); io_uring_drop_tctx_refs(current); xa_for_each(&tctx->xa, index, node) { - if 
(io_local_work_pending(node->ctx)) { - WARN_ON_ONCE(node->ctx->submitter_task && - node->ctx->submitter_task != current); + if (io_local_work_pending(node->ctx->s)) { + WARN_ON_ONCE(node->ctx->s[0].submitter_task && + node->ctx->s[0].submitter_task != current); goto end_wait; } } @@ -3288,6 +3475,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, size_t, argsz) { struct io_ring_ctx *ctx; + struct io_sq_cq *s; struct file *file; long ret; @@ -3325,6 +3513,14 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, if (unlikely(ctx->flags & IORING_SETUP_R_DISABLED)) goto out; + ret = -EINVAL; + s = ctx->s; + if (ctx->flags & IORING_SETUP_THREAD_ISSUER) { + s = io_uring_get_sq(ctx); + if (unlikely(!s)) + goto out; + } + /* * For SQ polling, the thread will do all submissions and completions. * Just return the requested submit count, and wake the thread if @@ -3343,14 +3539,16 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, ret = to_submit; } else if (to_submit) { - ret = io_uring_add_tctx_node(ctx); - if (unlikely(ret)) + ret = io_uring_add_tctx_node(ctx, s); + if (unlikely(ret)) { + printk("fail ret %ld\n", ret); goto out; + } - mutex_lock(&ctx->uring_lock); - ret = io_submit_sqes(ctx, to_submit); + mutex_lock(&s->ring_lock); + ret = io_submit_sqes(s, to_submit); if (ret != to_submit) { - mutex_unlock(&ctx->uring_lock); + mutex_unlock(&s->ring_lock); goto out; } if (flags & IORING_ENTER_GETEVENTS) { @@ -3361,9 +3559,9 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, * it should handle ownership problems if any. */ if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) - (void)io_run_local_work_locked(ctx, min_complete); + io_run_local_work_locked(s, min_complete); } - mutex_unlock(&ctx->uring_lock); + mutex_unlock(&s->ring_lock); } if (flags & IORING_ENTER_GETEVENTS) { @@ -3376,24 +3574,22 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, * prevent racing with polled issue that got punted to * a workqueue. */ - mutex_lock(&ctx->uring_lock); + mutex_lock(&s->ring_lock); iopoll_locked: ret2 = io_validate_ext_arg(ctx, flags, argp, argsz); if (likely(!ret2)) { - min_complete = min(min_complete, - ctx->cq_entries); - ret2 = io_iopoll_check(ctx, min_complete); + min_complete = min(min_complete, s->cq_entries); + ret2 = io_iopoll_check(ctx, s, min_complete); } - mutex_unlock(&ctx->uring_lock); + mutex_unlock(&s->ring_lock); } else { struct ext_arg ext_arg = { .argsz = argsz }; ret2 = io_get_ext_arg(ctx, flags, argp, &ext_arg); if (likely(!ret2)) { - min_complete = min(min_complete, - ctx->cq_entries); - ret2 = io_cqring_wait(ctx, min_complete, flags, - &ext_arg); + min_complete = min(min_complete, s->cq_entries); + ret2 = io_cqring_wait(ctx, s, min_complete, + flags, &ext_arg); } } @@ -3406,8 +3602,7 @@ iopoll_locked: * as they are obviously ok with those drops. 
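
[Editor's sketch] In the io_uring_enter() hunk above, a ring created with IORING_SETUP_THREAD_ISSUER resolves a per-issuer io_sq_cq through io_uring_get_sq() and then takes only that queue's ring_lock for the submit. The selection policy itself is not visible in this hunk; the sketch below simply assumes an issuer id mapped onto nr_sq queues to show the shape of the idea (toy code, not the kernel helper):

#include <pthread.h>
#include <stdio.h>

#define NR_SQ 4

struct toy_queue {
	pthread_mutex_t ring_lock;
	unsigned long submitted;
};

struct toy_ctx {
	struct toy_queue s[NR_SQ];
	int nr_sq;
};

/* Assumed policy: map an issuer id onto one of the nr_sq queues. */
static struct toy_queue *toy_get_sq(struct toy_ctx *ctx, unsigned issuer)
{
	return &ctx->s[issuer % ctx->nr_sq];
}

static void toy_submit(struct toy_ctx *ctx, unsigned issuer, unsigned nr)
{
	struct toy_queue *q = toy_get_sq(ctx, issuer);

	/* only this issuer's queue lock is taken, not a ctx-wide one */
	pthread_mutex_lock(&q->ring_lock);
	q->submitted += nr;
	pthread_mutex_unlock(&q->ring_lock);
}

int main(void)
{
	struct toy_ctx ctx = { .nr_sq = NR_SQ };

	for (int i = 0; i < NR_SQ; i++)
		pthread_mutex_init(&ctx.s[i].ring_lock, NULL);
	toy_submit(&ctx, 1, 8);
	toy_submit(&ctx, 3, 2);
	printf("queue 1: %lu SQEs, queue 3: %lu SQEs\n",
	       ctx.s[1].submitted, ctx.s[3].submitted);
	return 0;
}
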
*/ if (unlikely(ret2 == -EBADR)) - clear_bit(IO_CHECK_CQ_DROPPED_BIT, - &ctx->check_cq); + clear_bit(IO_CHECK_CQ_DROPPED_BIT, &s->check_cq); } } out: @@ -3434,8 +3629,9 @@ bool io_is_uring_fops(struct file *file) return file->f_op == &io_uring_fops; } -static __cold int io_allocate_scq_urings(struct io_ring_ctx *ctx, - struct io_uring_params *p) +__cold int io_allocate_scq_urings(struct io_sq_cq *s, unsigned int flags, + struct user_struct *user, + struct io_uring_params *p) { struct io_uring_region_desc rd; struct io_rings *rings; @@ -3443,27 +3639,27 @@ static __cold int io_allocate_scq_urings(struct io_ring_ctx *ctx, int ret; /* make sure these are sane, as we already accounted them */ - ctx->sq_entries = p->sq_entries; - ctx->cq_entries = p->cq_entries; + s->sq_entries = p->sq_entries; + s->cq_entries = p->cq_entries; - size = rings_size(ctx->flags, p->sq_entries, p->cq_entries, + size = rings_size(flags, p->sq_entries, p->cq_entries, &sq_array_offset); if (size == SIZE_MAX) return -EOVERFLOW; memset(&rd, 0, sizeof(rd)); rd.size = PAGE_ALIGN(size); - if (ctx->flags & IORING_SETUP_NO_MMAP) { + if (flags & IORING_SETUP_NO_MMAP) { rd.user_addr = p->cq_off.user_addr; rd.flags |= IORING_MEM_REGION_TYPE_USER; } - ret = io_create_region(ctx, &ctx->ring_region, &rd, IORING_OFF_CQ_RING); + ret = io_create_region(user, &s->ring_region, &rd, IORING_OFF_CQ_RING); if (ret) return ret; - ctx->rings = rings = io_region_get_ptr(&ctx->ring_region); + s->rings = rings = io_region_get_ptr(&s->ring_region); - if (!(ctx->flags & IORING_SETUP_NO_SQARRAY)) - ctx->sq_array = (u32 *)((char *)rings + sq_array_offset); + if (!(flags & IORING_SETUP_NO_SQARRAY)) + s->sq_array = (u32 *)((char *)rings + sq_array_offset); rings->sq_ring_mask = p->sq_entries - 1; rings->cq_ring_mask = p->cq_entries - 1; rings->sq_ring_entries = p->sq_entries; @@ -3474,22 +3670,22 @@ static __cold int io_allocate_scq_urings(struct io_ring_ctx *ctx, else size = array_size(sizeof(struct io_uring_sqe), p->sq_entries); if (size == SIZE_MAX) { - io_rings_free(ctx); + io_rings_free(user, s); return -EOVERFLOW; } memset(&rd, 0, sizeof(rd)); rd.size = PAGE_ALIGN(size); - if (ctx->flags & IORING_SETUP_NO_MMAP) { + if (flags & IORING_SETUP_NO_MMAP) { rd.user_addr = p->sq_off.user_addr; rd.flags |= IORING_MEM_REGION_TYPE_USER; } - ret = io_create_region(ctx, &ctx->sq_region, &rd, IORING_OFF_SQES); + ret = io_create_region(user, &s->sq_region, &rd, IORING_OFF_SQES); if (ret) { - io_rings_free(ctx); + io_rings_free(user, s); return ret; } - ctx->sq_sqes = io_region_get_ptr(&ctx->sq_region); + s->sq_sqes = io_region_get_ptr(&s->sq_region); return 0; } @@ -3584,18 +3780,19 @@ int io_uring_fill_params(unsigned entries, struct io_uring_params *p) } static __cold int io_uring_create(unsigned entries, struct io_uring_params *p, - struct io_uring_params __user *params) + struct io_uring_params __user *params, + u32 contexts) { struct io_ring_ctx *ctx; struct io_uring_task *tctx; struct file *file; - int ret; + int i, ret; ret = io_uring_fill_params(entries, p); if (unlikely(ret)) return ret; - ctx = io_ring_ctx_alloc(p); + ctx = io_ring_ctx_alloc(p, contexts); if (!ctx) return -ENOMEM; @@ -3661,13 +3858,27 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p, goto err; /* - * For DEFER_TASKRUN we require the completion task to be the same as the - * submission task. This implies that there is only one submitter, so enforce - * that. + * For DEFER_TASKRUN we require the completion task to be the same as + * the submission task. 
This implies that there is only one submitter, + * so enforce that. */ if (ctx->flags & IORING_SETUP_DEFER_TASKRUN && - !(ctx->flags & IORING_SETUP_SINGLE_ISSUER)) { + !(ctx->flags & IORING_SETUP_SINGLE_ISSUER)) goto err; + + /* + * thread issuer requires DEFER_TASKRUN, and is currently not + * compatible with SQPOLL or IOPOLL. + */ + if (ctx->flags & IORING_SETUP_THREAD_ISSUER) { + if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN)) + goto err; + if (!(ctx->flags & IORING_SETUP_SINGLE_ISSUER)) + goto err; + if (!(ctx->flags & IORING_SETUP_R_DISABLED)) + goto err; + if (ctx->flags & (IORING_SETUP_SQPOLL|IORING_SETUP_IOPOLL)) + goto err; } /* @@ -3679,12 +3890,16 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p, mmgrab(current->mm); ctx->mm_account = current->mm; - ret = io_allocate_scq_urings(ctx, p); - if (ret) - goto err; + for (i = 0; i < contexts; i++) { + struct io_sq_cq *s = &ctx->s[i]; - if (!(p->flags & IORING_SETUP_NO_SQARRAY)) - p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings; + ret = io_allocate_scq_urings(s, ctx->flags, ctx->user, p); + if (ret) + goto err; + + if (!(p->flags & IORING_SETUP_NO_SQARRAY)) + p->sq_off.array = (char *)s->sq_array - (char *)s->rings; + } ret = io_sq_offload_create(ctx, p); if (ret) @@ -3706,7 +3921,7 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p, if (ctx->flags & IORING_SETUP_SINGLE_ISSUER && !(ctx->flags & IORING_SETUP_R_DISABLED)) - WRITE_ONCE(ctx->submitter_task, get_task_struct(current)); + WRITE_ONCE(ctx->s[0].submitter_task, get_task_struct(current)); file = io_uring_get_file(ctx); if (IS_ERR(file)) { @@ -3714,7 +3929,7 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p, goto err; } - ret = __io_uring_add_tctx_node(ctx); + ret = __io_uring_add_tctx_node(ctx, NULL); if (ret) goto err_fput; tctx = current->io_uring; @@ -3745,7 +3960,8 @@ err_fput: * ring size, we return the actual sq/cq ring sizes (among other things) in the * params structure passed in. 
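
[Editor's sketch] The io_uring_create() hunk above only accepts IORING_SETUP_THREAD_ISSUER when DEFER_TASKRUN, SINGLE_ISSUER and R_DISABLED are also set, and rejects SQPOLL and IOPOLL. The same constraints restated as a standalone check; the bit values below are placeholders (THREAD_ISSUER is new in this series), only the relationships matter:

#include <stdbool.h>
#include <stdio.h>

#define SETUP_IOPOLL		(1u << 0)
#define SETUP_SQPOLL		(1u << 1)
#define SETUP_R_DISABLED	(1u << 6)
#define SETUP_SINGLE_ISSUER	(1u << 12)
#define SETUP_DEFER_TASKRUN	(1u << 13)
#define SETUP_THREAD_ISSUER	(1u << 16)	/* placeholder value */

static bool thread_issuer_flags_valid(unsigned flags)
{
	if (!(flags & SETUP_THREAD_ISSUER))
		return true;	/* nothing extra to enforce */
	if (!(flags & SETUP_DEFER_TASKRUN))
		return false;	/* completions must stay with the issuers */
	if (!(flags & SETUP_SINGLE_ISSUER))
		return false;
	if (!(flags & SETUP_R_DISABLED))
		return false;	/* ring must start disabled */
	if (flags & (SETUP_SQPOLL | SETUP_IOPOLL))
		return false;	/* not supported together for now */
	return true;
}

int main(void)
{
	unsigned ok = SETUP_THREAD_ISSUER | SETUP_DEFER_TASKRUN |
		      SETUP_SINGLE_ISSUER | SETUP_R_DISABLED;

	printf("valid:   %d\n", thread_issuer_flags_valid(ok));
	printf("invalid: %d\n", thread_issuer_flags_valid(ok | SETUP_SQPOLL));
	return 0;
}
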
*/ -static long io_uring_setup(u32 entries, struct io_uring_params __user *params) +static long io_uring_setup(u32 entries, struct io_uring_params __user *params, + u32 contexts) { struct io_uring_params p; int i; @@ -3765,10 +3981,16 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params) IORING_SETUP_SQE128 | IORING_SETUP_CQE32 | IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN | IORING_SETUP_NO_MMAP | IORING_SETUP_REGISTERED_FD_ONLY | - IORING_SETUP_NO_SQARRAY | IORING_SETUP_HYBRID_IOPOLL)) + IORING_SETUP_NO_SQARRAY | IORING_SETUP_HYBRID_IOPOLL | + IORING_SETUP_THREAD_ISSUER)) return -EINVAL; - return io_uring_create(entries, &p, params); + if (!(p.flags & IORING_SETUP_THREAD_ISSUER)) + contexts = 1; + else if (contexts > IO_URING_MAX_CONTEXTS) + return -EINVAL; + + return io_uring_create(entries, &p, params, contexts); } static inline bool io_uring_allowed(void) @@ -3789,13 +4011,29 @@ static inline bool io_uring_allowed(void) return in_group_p(io_uring_group); } -SYSCALL_DEFINE2(io_uring_setup, u32, entries, - struct io_uring_params __user *, params) +SYSCALL_DEFINE3(io_uring_setup, u32, entries, + struct io_uring_params __user *, params, u32, contexts) { if (!io_uring_allowed()) return -EPERM; - return io_uring_setup(entries, params); + return io_uring_setup(entries, params, contexts); +} + +void io_uring_unlock_ctx(struct io_ring_ctx *ctx) +{ + int i; + + for (i = ctx->nr_sq - 1; i >= 0; i--) + mutex_unlock(&ctx->uring_lock); +} + +void io_uring_lock_ctx(struct io_ring_ctx *ctx) +{ + int i; + + for (i = 0; i < ctx->nr_sq; i++) + mutex_lock(&ctx->uring_lock); } static int __init io_uring_init(void) diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h index 032758b28d78..e0e7b26b5d7a 100644 --- a/io_uring/io_uring.h +++ b/io_uring/io_uring.h @@ -39,7 +39,7 @@ enum { struct io_wait_queue { struct wait_queue_entry wq; - struct io_ring_ctx *ctx; + struct io_sq_cq *s; unsigned cq_tail; unsigned cq_min_tail; unsigned nr_timeouts; @@ -56,15 +56,15 @@ struct io_wait_queue { static inline bool io_should_wake(struct io_wait_queue *iowq) { - struct io_ring_ctx *ctx = iowq->ctx; - int dist = READ_ONCE(ctx->rings->cq.tail) - (int) iowq->cq_tail; + struct io_sq_cq *s = iowq->s; + int dist = READ_ONCE(s->rings->cq.tail) - (int) iowq->cq_tail; /* * Wake up if we have enough events, or if a timeout occurred since we * started waiting. For timeouts, we always want to return to userspace, * regardless of event count. 
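
[Editor's sketch] The io_uring_lock_ctx()/io_uring_unlock_ctx() helpers added a little earlier in this hunk loop nr_sq times over ctx->uring_lock as posted. A per-queue variant, taking every s->ring_lock in ascending index order and releasing in reverse, is what the toy below models; that this is the intended shape is my assumption, the hunk itself does not spell it out:

#include <pthread.h>
#include <stdio.h>

#define NR_SQ 4

struct toy_queue {
	pthread_mutex_t ring_lock;
};

struct toy_ctx {
	struct toy_queue s[NR_SQ];
	int nr_sq;
};

/* Take every per-queue lock in ascending index order... */
static void toy_lock_all(struct toy_ctx *ctx)
{
	for (int i = 0; i < ctx->nr_sq; i++)
		pthread_mutex_lock(&ctx->s[i].ring_lock);
}

/* ...and drop them in the reverse order they were taken. */
static void toy_unlock_all(struct toy_ctx *ctx)
{
	for (int i = ctx->nr_sq - 1; i >= 0; i--)
		pthread_mutex_unlock(&ctx->s[i].ring_lock);
}

int main(void)
{
	struct toy_ctx ctx = { .nr_sq = NR_SQ };

	for (int i = 0; i < NR_SQ; i++)
		pthread_mutex_init(&ctx.s[i].ring_lock, NULL);
	toy_lock_all(&ctx);
	printf("all %d queue locks held\n", ctx.nr_sq);
	toy_unlock_all(&ctx);
	return 0;
}
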
*/ - return dist >= 0 || atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts; + return dist >= 0 || atomic_read(&s->cq_timeouts) != iowq->nr_timeouts; } #define IORING_MAX_ENTRIES 32768 @@ -73,11 +73,14 @@ static inline bool io_should_wake(struct io_wait_queue *iowq) unsigned long rings_size(unsigned int flags, unsigned int sq_entries, unsigned int cq_entries, size_t *sq_offset); int io_uring_fill_params(unsigned entries, struct io_uring_params *p); -bool io_cqe_cache_refill(struct io_ring_ctx *ctx, bool overflow); +int io_allocate_scq_urings(struct io_sq_cq *s, unsigned int flags, + struct user_struct *user, struct io_uring_params *p); +int init_s(struct io_ring_ctx *ctx, struct io_sq_cq *s, unsigned int cq_entries); +bool io_cqe_cache_refill(struct io_sq_cq *s, bool overflow); int io_run_task_work_sig(struct io_ring_ctx *ctx); void io_req_defer_failed(struct io_kiocb *req, s32 res); -bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags); -void io_add_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags); +bool io_post_aux_cqe(struct io_sq_cq *s, u64 user_data, s32 res, u32 cflags); +void io_add_aux_cqe(struct io_sq_cq *s, u64 user_data, s32 res, u32 cflags); bool io_req_post_cqe(struct io_kiocb *req, s32 res, u32 cflags); void __io_commit_cqring_flush(struct io_ring_ctx *ctx); @@ -105,9 +108,9 @@ int io_ring_add_registered_file(struct io_uring_task *tctx, struct file *file, void io_req_queue_iowq(struct io_kiocb *req); int io_poll_issue(struct io_kiocb *req, struct io_tw_state *ts); -int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr); -int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin); -void __io_submit_flush_completions(struct io_ring_ctx *ctx); +int io_submit_sqes(struct io_sq_cq *s, unsigned int nr); +int io_do_iopoll(struct io_sq_cq *s, bool force_nonspin); +void __io_submit_flush_completions(struct io_sq_cq *s); struct io_wq_work *io_wq_free_work(struct io_wq_work *work); void io_wq_submit_work(struct io_wq_work *work); @@ -115,23 +118,25 @@ void io_wq_submit_work(struct io_wq_work *work); void io_free_req(struct io_kiocb *req); void io_queue_next(struct io_kiocb *req); void io_task_refs_refill(struct io_uring_task *tctx); -bool __io_alloc_req_refill(struct io_ring_ctx *ctx); +bool __io_alloc_req_refill(struct io_sq_cq *s); bool io_match_task_safe(struct io_kiocb *head, struct io_uring_task *tctx, bool cancel_all); void io_activate_pollwq(struct io_ring_ctx *ctx); -static inline void io_lockdep_assert_cq_locked(struct io_ring_ctx *ctx) +static inline void io_lockdep_assert_cq_locked(struct io_sq_cq *s) { #if defined(CONFIG_PROVE_LOCKING) + struct io_ring_ctx *ctx = s->ctx; + lockdep_assert(in_task()); - if (ctx->flags & IORING_SETUP_IOPOLL) { - lockdep_assert_held(&ctx->uring_lock); + if (s->ring_flags & IORING_SETUP_IOPOLL) { + lockdep_assert_held(&s->ring_lock); } else if (!ctx->task_complete) { lockdep_assert_held(&ctx->completion_lock); - } else if (ctx->submitter_task) { + } else if (s->submitter_task) { /* * ->submitter_task may be NULL and we can still post a CQE, * if the ring has been setup with IORING_SETUP_R_DISABLED. 
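
[Editor's sketch] The io_should_wake() change just above keeps the signed-difference comparison between the current CQ tail and the target tail, which stays correct when the 32-bit counters wrap. A two-line demonstration of why that works, in plain C with no io_uring types:

#include <stdint.h>
#include <stdio.h>

/* True once 'tail' has reached or passed 'target', even across wrap. */
static int reached(uint32_t tail, uint32_t target)
{
	return (int32_t)(tail - target) >= 0;
}

int main(void)
{
	uint32_t target = UINT32_MAX - 1;	/* cq_tail we wait for */

	printf("%d\n", reached(UINT32_MAX - 3, target));	/* 0: not yet */
	printf("%d\n", reached(UINT32_MAX - 1, target));	/* 1: exactly */
	printf("%d\n", reached(2, target));			/* 1: wrapped past */
	return 0;
}
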
@@ -141,7 +146,7 @@ static inline void io_lockdep_assert_cq_locked(struct io_ring_ctx *ctx) if (percpu_ref_is_dying(&ctx->refs)) lockdep_assert(current_work()); else - lockdep_assert(current == ctx->submitter_task); + lockdep_assert(current == s->submitter_task); } #endif } @@ -151,40 +156,39 @@ static inline void io_req_task_work_add(struct io_kiocb *req) __io_req_task_work_add(req, 0); } -static inline void io_submit_flush_completions(struct io_ring_ctx *ctx) +static inline void io_submit_flush_completions(struct io_sq_cq *s) { - if (!wq_list_empty(&ctx->submit_state.compl_reqs) || - ctx->submit_state.cq_flush) - __io_submit_flush_completions(ctx); + if (!wq_list_empty(&s->submit_state.compl_reqs) || + s->submit_state.cq_flush) + __io_submit_flush_completions(s); } #define io_for_each_link(pos, head) \ for (pos = (head); pos; pos = pos->link) -static inline bool io_get_cqe_overflow(struct io_ring_ctx *ctx, - struct io_uring_cqe **ret, - bool overflow) +static inline bool io_get_cqe_overflow(struct io_sq_cq *s, + struct io_uring_cqe **ret, bool overflow) { - io_lockdep_assert_cq_locked(ctx); + io_lockdep_assert_cq_locked(s); - if (unlikely(ctx->cqe_cached >= ctx->cqe_sentinel)) { - if (unlikely(!io_cqe_cache_refill(ctx, overflow))) + if (unlikely(s->cqe_cached >= s->cqe_sentinel)) { + if (unlikely(!io_cqe_cache_refill(s, overflow))) return false; } - *ret = ctx->cqe_cached; - ctx->cached_cq_tail++; - ctx->cqe_cached++; - if (ctx->flags & IORING_SETUP_CQE32) - ctx->cqe_cached++; + *ret = s->cqe_cached; + s->cached_cq_tail++; + s->cqe_cached++; + if (s->ring_flags & IORING_SETUP_CQE32) + s->cqe_cached++; return true; } -static inline bool io_get_cqe(struct io_ring_ctx *ctx, struct io_uring_cqe **ret) +static inline bool io_get_cqe(struct io_sq_cq *s, struct io_uring_cqe **ret) { - return io_get_cqe_overflow(ctx, ret, false); + return io_get_cqe_overflow(s, ret, false); } -static __always_inline bool io_fill_cqe_req(struct io_ring_ctx *ctx, +static __always_inline bool io_fill_cqe_req(struct io_sq_cq *s, struct io_kiocb *req) { struct io_uring_cqe *cqe; @@ -194,12 +198,12 @@ static __always_inline bool io_fill_cqe_req(struct io_ring_ctx *ctx, * submission (by quite a lot). Increment the overflow count in * the ring. */ - if (unlikely(!io_get_cqe(ctx, &cqe))) + if (unlikely(!io_get_cqe(s, &cqe))) return false; memcpy(cqe, &req->cqe, sizeof(*cqe)); - if (ctx->flags & IORING_SETUP_CQE32) { + if (s->ring_flags & IORING_SETUP_CQE32) { memcpy(cqe->big_cqe, &req->big_cqe, sizeof(*cqe)); memset(&req->big_cqe, 0, sizeof(req->big_cqe)); } @@ -256,16 +260,15 @@ static inline void io_put_file(struct io_kiocb *req) fput(req->file); } -static inline void io_ring_submit_unlock(struct io_ring_ctx *ctx, +static inline void io_ring_submit_unlock(struct io_sq_cq *s, unsigned issue_flags) { - lockdep_assert_held(&ctx->uring_lock); + lockdep_assert_held(&s->ring_lock); if (unlikely(issue_flags & IO_URING_F_UNLOCKED)) - mutex_unlock(&ctx->uring_lock); + mutex_unlock(&s->ring_lock); } -static inline void io_ring_submit_lock(struct io_ring_ctx *ctx, - unsigned issue_flags) +static inline void io_ring_submit_lock(struct io_sq_cq *s, unsigned issue_flags) { /* * "Normal" inline submissions always hold the uring_lock, since we @@ -274,14 +277,14 @@ static inline void io_ring_submit_lock(struct io_ring_ctx *ctx, * from an async worker thread, grab the lock for that case. 
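
[Editor's sketch] io_get_cqe_overflow() above hands out CQEs from a cached [cqe_cached, cqe_sentinel) window, falls back to io_cqe_cache_refill() only when that window is exhausted, and steps twice per entry for CQE32 rings. A userspace model of the range caching; the refill here is deliberately simplified (it just remaps the cached tail onto a fixed array, where the real helper checks the ring head for space):

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define CQ_ENTRIES 8

struct toy_cqe {
	uint64_t user_data;
	int32_t res;
	uint32_t flags;
};

struct toy_cq {
	struct toy_cqe ring[CQ_ENTRIES];
	struct toy_cqe *cached;		/* next free CQE in the window */
	struct toy_cqe *sentinel;	/* end of the cached window */
	unsigned cached_tail;
	bool cqe32;			/* 32-byte CQEs take two slots */
};

/* Stand-in for io_cqe_cache_refill(): recompute the free window. */
static bool toy_cqe_refill(struct toy_cq *cq)
{
	unsigned off = cq->cached_tail & (CQ_ENTRIES - 1);

	cq->cached = &cq->ring[off];
	cq->sentinel = &cq->ring[CQ_ENTRIES];
	return cq->cached < cq->sentinel;
}

static struct toy_cqe *toy_get_cqe(struct toy_cq *cq)
{
	struct toy_cqe *cqe;

	if (cq->cached >= cq->sentinel && !toy_cqe_refill(cq))
		return NULL;

	cqe = cq->cached;
	cq->cached_tail++;
	cq->cached++;
	if (cq->cqe32)
		cq->cached++;	/* big CQE consumes the next slot too */
	return cqe;
}

int main(void)
{
	struct toy_cq cq = { .cqe32 = false };
	int n;

	cq.cached = cq.sentinel = cq.ring;	/* empty window: refill first */
	for (n = 0; n < CQ_ENTRIES; n++) {
		struct toy_cqe *cqe = toy_get_cqe(&cq);

		if (!cqe)
			break;
		cqe->user_data = n;
	}
	printf("filled %d CQEs from the cached window\n", n);
	return 0;
}
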
*/ if (unlikely(issue_flags & IO_URING_F_UNLOCKED)) - mutex_lock(&ctx->uring_lock); - lockdep_assert_held(&ctx->uring_lock); + mutex_lock(&s->ring_lock); + lockdep_assert_held(&s->ring_lock); } -static inline void io_commit_cqring(struct io_ring_ctx *ctx) +static inline void io_commit_cqring(struct io_sq_cq *s) { /* order cqe stores with ring update */ - smp_store_release(&ctx->rings->cq.tail, ctx->cached_cq_tail); + smp_store_release(&s->rings->cq.tail, s->cached_cq_tail); } static inline void io_poll_wq_wake(struct io_ring_ctx *ctx) @@ -291,7 +294,7 @@ static inline void io_poll_wq_wake(struct io_ring_ctx *ctx) poll_to_key(EPOLL_URING_WAKE | EPOLLIN)); } -static inline void io_cqring_wake(struct io_ring_ctx *ctx) +static inline void io_cqring_wake(struct io_sq_cq *s) { /* * Trigger waitqueue handler on all waiters on our waitqueue. This @@ -303,14 +306,14 @@ static inline void io_cqring_wake(struct io_ring_ctx *ctx) * waitqueue handlers, we know we have a dependency between eventfd or * epoll and should terminate multishot poll at that point. */ - if (wq_has_sleeper(&ctx->cq_wait)) - __wake_up(&ctx->cq_wait, TASK_NORMAL, 0, + if (wq_has_sleeper(&s->cq_wait)) + __wake_up(&s->cq_wait, TASK_NORMAL, 0, poll_to_key(EPOLL_URING_WAKE | EPOLLIN)); } -static inline bool io_sqring_full(struct io_ring_ctx *ctx) +static inline bool io_sqring_full(struct io_sq_cq *s) { - struct io_rings *r = ctx->rings; + struct io_rings *r = s->rings; /* * SQPOLL must use the actual sqring head, as using the cached_sq_head @@ -319,17 +322,17 @@ static inline bool io_sqring_full(struct io_ring_ctx *ctx) * since this helper is just used for SQPOLL sqring waits (or POLLOUT), * just read the actual sqring head unconditionally. */ - return READ_ONCE(r->sq.tail) - READ_ONCE(r->sq.head) == ctx->sq_entries; + return READ_ONCE(r->sq.tail) - READ_ONCE(r->sq.head) == s->sq_entries; } -static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx) +static inline unsigned int io_sqring_entries(struct io_sq_cq *s) { - struct io_rings *rings = ctx->rings; + struct io_rings *rings = s->rings; unsigned int entries; /* make sure SQ entry isn't read before tail */ - entries = smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head; - return min(entries, ctx->sq_entries); + entries = smp_load_acquire(&rings->sq.tail) - s->cached_sq_head; + return min(entries, s->sq_entries); } static inline int io_run_task_work(void) @@ -370,32 +373,35 @@ static inline int io_run_task_work(void) return ret; } -static inline bool io_local_work_pending(struct io_ring_ctx *ctx) +static inline bool io_local_work_pending(struct io_sq_cq *s) { - return !llist_empty(&ctx->work_llist) || !llist_empty(&ctx->retry_llist); + return !llist_empty(&s->work_llist) || !llist_empty(&s->retry_llist); } -static inline bool io_task_work_pending(struct io_ring_ctx *ctx) +static inline bool io_task_work_pending(struct io_sq_cq *s) { - return task_work_pending(current) || io_local_work_pending(ctx); + return task_work_pending(current) || io_local_work_pending(s); } -static inline void io_tw_lock(struct io_ring_ctx *ctx, struct io_tw_state *ts) +static inline void io_tw_lock(struct io_sq_cq *s) { - lockdep_assert_held(&ctx->uring_lock); + lockdep_assert_held(&s->ring_lock); } +#define io_for_each_s(ctx, s, i) \ + for (i = 0, s = &ctx->s[0]; i < (ctx)->nr_sq; i++, s++) + /* * Don't complete immediately but use deferred completion infrastructure. 
* Protected by ->uring_lock and can only be used either with * IO_URING_F_COMPLETE_DEFER or inside a tw handler holding the mutex. */ -static inline void io_req_complete_defer(struct io_kiocb *req) - __must_hold(&req->ctx->uring_lock) +static inline void io_req_complete_defer(struct io_kiocb *req, struct io_sq_cq *s) + __must_hold(&s->ring_lock) { - struct io_submit_state *state = &req->ctx->submit_state; + struct io_submit_state *state = &s->submit_state; - lockdep_assert_held(&req->ctx->uring_lock); + lockdep_assert_held(&s->ring_lock); wq_list_add_tail(&req->comp_list, &state->compl_reqs); } @@ -416,42 +422,48 @@ static inline void io_get_task_refs(int nr) io_task_refs_refill(tctx); } -static inline bool io_req_cache_empty(struct io_ring_ctx *ctx) +static inline bool io_req_cache_empty(struct io_submit_state *s) { - return !ctx->submit_state.free_list.next; + return !s->free_list.next; } extern struct kmem_cache *req_cachep; extern struct kmem_cache *io_buf_cachep; -static inline struct io_kiocb *io_extract_req(struct io_ring_ctx *ctx) +static inline struct io_kiocb *io_extract_req(struct io_submit_state *state) { struct io_kiocb *req; - req = container_of(ctx->submit_state.free_list.next, struct io_kiocb, comp_list); - wq_stack_extract(&ctx->submit_state.free_list); + req = container_of(state->free_list.next, struct io_kiocb, comp_list); + wq_stack_extract(&state->free_list); return req; } -static inline bool io_alloc_req(struct io_ring_ctx *ctx, struct io_kiocb **req) +static inline bool io_alloc_req(struct io_sq_cq *s, struct io_kiocb **req) { - if (unlikely(io_req_cache_empty(ctx))) { - if (!__io_alloc_req_refill(ctx)) + if (unlikely(io_req_cache_empty(&s->submit_state))) { + if (!__io_alloc_req_refill(s)) return false; } - *req = io_extract_req(ctx); + *req = io_extract_req(&s->submit_state); return true; } -static inline bool io_allowed_defer_tw_run(struct io_ring_ctx *ctx) +static inline bool io_allowed_defer_tw_run(struct io_sq_cq *s) { - return likely(ctx->submitter_task == current); + if (s->submitter_task == current) + return true; + if (s->ring_flags & IORING_SETUP_THREAD_ISSUER) + return same_thread_group(s->submitter_task, current); + return false; + } -static inline bool io_allowed_run_tw(struct io_ring_ctx *ctx) +static inline bool io_allowed_run_tw(struct io_sq_cq *s) { - return likely(!(ctx->flags & IORING_SETUP_DEFER_TASKRUN) || - ctx->submitter_task == current); + if (!(s->ring_flags & IORING_SETUP_DEFER_TASKRUN)) + return true; + return io_allowed_defer_tw_run(s); } /* @@ -509,9 +521,13 @@ enum { IO_CHECK_CQ_DROPPED_BIT, }; -static inline bool io_has_work(struct io_ring_ctx *ctx) +static inline bool io_has_work(struct io_sq_cq *s) { - return test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq) || - io_local_work_pending(ctx); + return test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &s->check_cq) || + io_local_work_pending(s); } + +void io_uring_unlock_ctx(struct io_ring_ctx *ctx); +void io_uring_lock_ctx(struct io_ring_ctx *ctx); + #endif diff --git a/io_uring/kbuf.c b/io_uring/kbuf.c index 15e5e6ec5968..62ba5913d55f 100644 --- a/io_uring/kbuf.c +++ b/io_uring/kbuf.c @@ -34,7 +34,7 @@ struct io_provide_buf { static inline struct io_buffer_list *io_buffer_get_list(struct io_ring_ctx *ctx, unsigned int bgid) { - lockdep_assert_held(&ctx->uring_lock); + lockdep_assert_held(&ctx->s->ring_lock); return xa_load(&ctx->io_bl_xa, bgid); } @@ -58,7 +58,7 @@ bool io_kbuf_recycle_legacy(struct io_kiocb *req, unsigned issue_flags) struct io_buffer_list *bl; struct io_buffer *buf; - 
io_ring_submit_lock(ctx, issue_flags); + io_ring_submit_lock(req->sq, issue_flags); buf = req->kbuf; bl = io_buffer_get_list(ctx, buf->bgid); @@ -66,7 +66,7 @@ bool io_kbuf_recycle_legacy(struct io_kiocb *req, unsigned issue_flags) req->flags &= ~REQ_F_BUFFER_SELECTED; req->buf_index = buf->bgid; - io_ring_submit_unlock(ctx, issue_flags); + io_ring_submit_unlock(req->sq, issue_flags); return true; } @@ -178,7 +178,7 @@ void __user *io_buffer_select(struct io_kiocb *req, size_t *len, struct io_buffer_list *bl; void __user *ret = NULL; - io_ring_submit_lock(req->ctx, issue_flags); + io_ring_submit_lock(req->sq, issue_flags); bl = io_buffer_get_list(ctx, req->buf_index); if (likely(bl)) { @@ -187,7 +187,7 @@ void __user *io_buffer_select(struct io_kiocb *req, size_t *len, else ret = io_provided_buffer_select(req, len, bl); } - io_ring_submit_unlock(req->ctx, issue_flags); + io_ring_submit_unlock(req->sq, issue_flags); return ret; } @@ -291,7 +291,7 @@ int io_buffers_select(struct io_kiocb *req, struct buf_sel_arg *arg, struct io_buffer_list *bl; int ret = -ENOENT; - io_ring_submit_lock(ctx, issue_flags); + io_ring_submit_lock(req->sq, issue_flags); bl = io_buffer_get_list(ctx, req->buf_index); if (unlikely(!bl)) goto out_unlock; @@ -313,7 +313,7 @@ int io_buffers_select(struct io_kiocb *req, struct buf_sel_arg *arg, ret = io_provided_buffers_select(req, &arg->out_len, bl, arg->iovs); } out_unlock: - io_ring_submit_unlock(ctx, issue_flags); + io_ring_submit_unlock(req->sq, issue_flags); return ret; } @@ -351,7 +351,7 @@ static int __io_remove_buffers(struct io_ring_ctx *ctx, if (bl->flags & IOBL_BUF_RING) { i = bl->buf_ring->tail - bl->head; - io_free_region(ctx, &bl->region); + io_free_region(ctx->user, &bl->region); /* make sure it's seen as empty */ INIT_LIST_HEAD(&bl->buf_list); bl->flags &= ~IOBL_BUF_RING; @@ -439,7 +439,7 @@ int io_remove_buffers(struct io_kiocb *req, unsigned int issue_flags) struct io_buffer_list *bl; int ret = 0; - io_ring_submit_lock(ctx, issue_flags); + io_ring_submit_lock(req->sq, issue_flags); ret = -ENOENT; bl = io_buffer_get_list(ctx, p->bgid); @@ -449,7 +449,7 @@ int io_remove_buffers(struct io_kiocb *req, unsigned int issue_flags) if (!(bl->flags & IOBL_BUF_RING)) ret = __io_remove_buffers(ctx, bl, p->nbufs); } - io_ring_submit_unlock(ctx, issue_flags); + io_ring_submit_unlock(req->sq, issue_flags); if (ret < 0) req_set_fail(req); io_req_set_res(req, ret, 0); @@ -572,7 +572,7 @@ int io_provide_buffers(struct io_kiocb *req, unsigned int issue_flags) struct io_buffer_list *bl; int ret = 0; - io_ring_submit_lock(ctx, issue_flags); + io_ring_submit_lock(req->sq, issue_flags); bl = io_buffer_get_list(ctx, p->bgid); if (unlikely(!bl)) { @@ -596,7 +596,7 @@ int io_provide_buffers(struct io_kiocb *req, unsigned int issue_flags) ret = io_add_buffers(ctx, p, bl); err: - io_ring_submit_unlock(ctx, issue_flags); + io_ring_submit_unlock(req->sq, issue_flags); if (ret < 0) req_set_fail(req); @@ -680,7 +680,7 @@ int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg) io_buffer_add_list(ctx, bl, reg.bgid); return 0; fail: - io_free_region(ctx, &bl->region); + io_free_region(ctx->user, &bl->region); kfree(free_bl); return ret; } diff --git a/io_uring/memmap.c b/io_uring/memmap.c index dda846190fbd..4d9f3a9a8b8c 100644 --- a/io_uring/memmap.c +++ b/io_uring/memmap.c @@ -87,7 +87,7 @@ enum { IO_REGION_F_SINGLE_REF = 4, }; -void io_free_region(struct io_ring_ctx *ctx, struct io_mapped_region *mr) +void io_free_region(struct user_struct *user, struct io_mapped_region 
*mr) { if (mr->pages) { long nr_refs = mr->nr_pages; @@ -104,8 +104,8 @@ void io_free_region(struct io_ring_ctx *ctx, struct io_mapped_region *mr) } if ((mr->flags & IO_REGION_F_VMAP) && mr->ptr) vunmap(mr->ptr); - if (mr->nr_pages && ctx->user) - __io_unaccount_mem(ctx->user, mr->nr_pages); + if (mr->nr_pages && user) + __io_unaccount_mem(user, mr->nr_pages); memset(mr, 0, sizeof(*mr)); } @@ -130,9 +130,8 @@ static int io_region_init_ptr(struct io_mapped_region *mr) return 0; } -static int io_region_pin_pages(struct io_ring_ctx *ctx, - struct io_mapped_region *mr, - struct io_uring_region_desc *reg) +static int io_region_pin_pages(struct io_mapped_region *mr, + struct io_uring_region_desc *reg) { unsigned long size = mr->nr_pages << PAGE_SHIFT; struct page **pages; @@ -149,8 +148,7 @@ static int io_region_pin_pages(struct io_ring_ctx *ctx, return 0; } -static int io_region_allocate_pages(struct io_ring_ctx *ctx, - struct io_mapped_region *mr, +static int io_region_allocate_pages(struct io_mapped_region *mr, struct io_uring_region_desc *reg, unsigned long mmap_offset) { @@ -184,7 +182,7 @@ done: return 0; } -int io_create_region(struct io_ring_ctx *ctx, struct io_mapped_region *mr, +int io_create_region(struct user_struct *user, struct io_mapped_region *mr, struct io_uring_region_desc *reg, unsigned long mmap_offset) { @@ -210,17 +208,17 @@ int io_create_region(struct io_ring_ctx *ctx, struct io_mapped_region *mr, return -EOVERFLOW; nr_pages = reg->size >> PAGE_SHIFT; - if (ctx->user) { - ret = __io_account_mem(ctx->user, nr_pages); + if (user) { + ret = __io_account_mem(user, nr_pages); if (ret) return ret; } mr->nr_pages = nr_pages; if (reg->flags & IORING_MEM_REGION_TYPE_USER) - ret = io_region_pin_pages(ctx, mr, reg); + ret = io_region_pin_pages(mr, reg); else - ret = io_region_allocate_pages(ctx, mr, reg, mmap_offset); + ret = io_region_allocate_pages(mr, reg, mmap_offset); if (ret) goto out_free; @@ -229,7 +227,7 @@ int io_create_region(struct io_ring_ctx *ctx, struct io_mapped_region *mr, goto out_free; return 0; out_free: - io_free_region(ctx, mr); + io_free_region(user, mr); return ret; } @@ -241,7 +239,7 @@ int io_create_region_mmap_safe(struct io_ring_ctx *ctx, struct io_mapped_region int ret; memcpy(&tmp_mr, mr, sizeof(tmp_mr)); - ret = io_create_region(ctx, &tmp_mr, reg, mmap_offset); + ret = io_create_region(ctx->user, &tmp_mr, reg, mmap_offset); if (ret) return ret; @@ -258,17 +256,23 @@ static struct io_mapped_region *io_mmap_get_region(struct io_ring_ctx *ctx, loff_t pgoff) { loff_t offset = pgoff << PAGE_SHIFT; - unsigned int bgid; + unsigned int index; switch (offset & IORING_OFF_MMAP_MASK) { case IORING_OFF_SQ_RING: case IORING_OFF_CQ_RING: - return &ctx->ring_region; + index = (offset & ~IORING_OFF_MMAP_MASK) >> IORING_OFF_ISSUER_SHIFT; + if (index >= ctx->nr_sq) + return NULL; + return &ctx->s[index].ring_region; case IORING_OFF_SQES: - return &ctx->sq_region; + index = (offset & ~IORING_OFF_MMAP_MASK) >> IORING_OFF_ISSUER_SHIFT; + if (index >= ctx->nr_sq) + return NULL; + return &ctx->s[index].sq_region; case IORING_OFF_PBUF_RING: - bgid = (offset & ~IORING_OFF_MMAP_MASK) >> IORING_OFF_PBUF_SHIFT; - return io_pbuf_get_region(ctx, bgid); + index = (offset & ~IORING_OFF_MMAP_MASK) >> IORING_OFF_PBUF_SHIFT; + return io_pbuf_get_region(ctx, index); case IORING_MAP_OFF_PARAM_REGION: return &ctx->param_region; } diff --git a/io_uring/memmap.h b/io_uring/memmap.h index c898dcba2b4e..d9737269c107 100644 --- a/io_uring/memmap.h +++ b/io_uring/memmap.h @@ -13,8 +13,8 @@ 
unsigned long io_uring_get_unmapped_area(struct file *file, unsigned long addr, unsigned long flags); int io_uring_mmap(struct file *file, struct vm_area_struct *vma); -void io_free_region(struct io_ring_ctx *ctx, struct io_mapped_region *mr); -int io_create_region(struct io_ring_ctx *ctx, struct io_mapped_region *mr, +void io_free_region(struct user_struct *user, struct io_mapped_region *mr); +int io_create_region(struct user_struct *user, struct io_mapped_region *mr, struct io_uring_region_desc *reg, unsigned long mmap_offset); diff --git a/io_uring/msg_ring.c b/io_uring/msg_ring.c index bd3cd78d2dba..c5bdaf12a333 100644 --- a/io_uring/msg_ring.c +++ b/io_uring/msg_ring.c @@ -74,12 +74,13 @@ static inline bool io_msg_need_remote(struct io_ring_ctx *target_ctx) static void io_msg_tw_complete(struct io_kiocb *req, struct io_tw_state *ts) { struct io_ring_ctx *ctx = req->ctx; + struct io_sq_cq *s = req->sq; - io_add_aux_cqe(ctx, req->cqe.user_data, req->cqe.res, req->cqe.flags); - if (spin_trylock(&ctx->msg_lock)) { - if (io_alloc_cache_put(&ctx->msg_cache, req)) + io_add_aux_cqe(s, req->cqe.user_data, req->cqe.res, req->cqe.flags); + if (spin_trylock(&s->msg_lock)) { + if (io_alloc_cache_put(&s->msg_cache, req)) req = NULL; - spin_unlock(&ctx->msg_lock); + spin_unlock(&s->msg_lock); } if (req) kmem_cache_free(req_cachep, req); @@ -89,7 +90,7 @@ static void io_msg_tw_complete(struct io_kiocb *req, struct io_tw_state *ts) static int io_msg_remote_post(struct io_ring_ctx *ctx, struct io_kiocb *req, int res, u32 cflags, u64 user_data) { - req->tctx = READ_ONCE(ctx->submitter_task->io_uring); + req->tctx = READ_ONCE(ctx->s[0].submitter_task->io_uring); if (!req->tctx) { kmem_cache_free(req_cachep, req); return -EOWNERDEAD; @@ -103,13 +104,13 @@ static int io_msg_remote_post(struct io_ring_ctx *ctx, struct io_kiocb *req, return 0; } -static struct io_kiocb *io_msg_get_kiocb(struct io_ring_ctx *ctx) +static struct io_kiocb *io_msg_get_kiocb(struct io_sq_cq *s) { struct io_kiocb *req = NULL; - if (spin_trylock(&ctx->msg_lock)) { - req = io_alloc_cache_get(&ctx->msg_cache); - spin_unlock(&ctx->msg_lock); + if (spin_trylock(&s->msg_lock)) { + req = io_alloc_cache_get(&s->msg_cache); + spin_unlock(&s->msg_lock); if (req) return req; } @@ -122,7 +123,7 @@ static int io_msg_data_remote(struct io_ring_ctx *target_ctx, struct io_kiocb *target; u32 flags = 0; - target = io_msg_get_kiocb(target_ctx); + target = io_msg_get_kiocb(target_ctx->s); if (unlikely(!target)) return -ENOMEM; @@ -157,7 +158,7 @@ static int __io_msg_ring_data(struct io_ring_ctx *target_ctx, if (unlikely(io_double_lock_ctx(target_ctx, issue_flags))) return -EAGAIN; } - if (io_post_aux_cqe(target_ctx, msg->user_data, msg->len, flags)) + if (io_post_aux_cqe(target_ctx->s, msg->user_data, msg->len, flags)) ret = 0; if (target_ctx->flags & IORING_SETUP_IOPOLL) io_double_unlock_ctx(target_ctx); @@ -179,7 +180,7 @@ static int io_msg_grab_file(struct io_kiocb *req, unsigned int issue_flags) struct io_rsrc_node *node; int ret = -EBADF; - io_ring_submit_lock(ctx, issue_flags); + io_ring_submit_lock(ctx->s, issue_flags); node = io_rsrc_node_lookup(&ctx->file_table.data, msg->src_fd); if (node) { msg->src_file = io_slot_file(node); @@ -188,7 +189,7 @@ static int io_msg_grab_file(struct io_kiocb *req, unsigned int issue_flags) req->flags |= REQ_F_NEED_CLEANUP; ret = 0; } - io_ring_submit_unlock(ctx, issue_flags); + io_ring_submit_unlock(ctx->s, issue_flags); return ret; } @@ -217,7 +218,7 @@ static int io_msg_install_complete(struct io_kiocb *req, 
unsigned int issue_flag * completes with -EOVERFLOW, then the sender must ensure that a * later IORING_OP_MSG_RING delivers the message. */ - if (!io_post_aux_cqe(target_ctx, msg->user_data, ret, 0)) + if (!io_post_aux_cqe(target_ctx->s, msg->user_data, ret, 0)) ret = -EOVERFLOW; out_unlock: io_double_unlock_ctx(target_ctx); @@ -241,7 +242,7 @@ static int io_msg_fd_remote(struct io_kiocb *req) { struct io_ring_ctx *ctx = req->file->private_data; struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg); - struct task_struct *task = READ_ONCE(ctx->submitter_task); + struct task_struct *task = READ_ONCE(ctx->s[0].submitter_task); if (unlikely(!task)) return -EOWNERDEAD; diff --git a/io_uring/napi.c b/io_uring/napi.c index b1ade3fda30f..6563f1d57883 100644 --- a/io_uring/napi.c +++ b/io_uring/napi.c @@ -148,7 +148,7 @@ static bool io_napi_busy_loop_should_end(void *data, if (signal_pending(current)) return true; - if (io_should_wake(iowq) || io_has_work(iowq->ctx)) + if (io_should_wake(iowq) || io_has_work(iowq->s)) return true; if (io_napi_busy_loop_timeout(net_to_ktime(start_time), iowq->napi_busy_poll_dt)) diff --git a/io_uring/net.c b/io_uring/net.c index 8457408194e7..660d726572dd 100644 --- a/io_uring/net.c +++ b/io_uring/net.c @@ -147,7 +147,7 @@ static void io_netmsg_recycle(struct io_kiocb *req, unsigned int issue_flags) /* Let normal cleanup path reap it if we fail adding to the cache */ iov = hdr->free_iov; - if (io_alloc_cache_put(&req->ctx->netmsg_cache, hdr)) { + if (io_alloc_cache_put(&req->sq->netmsg_cache, hdr)) { if (iov) kasan_mempool_poison_object(iov); req->async_data = NULL; @@ -165,10 +165,9 @@ static void io_msg_async_data_init(void *obj) static struct io_async_msghdr *io_msg_alloc_async(struct io_kiocb *req) { - struct io_ring_ctx *ctx = req->ctx; struct io_async_msghdr *hdr; - hdr = io_uring_alloc_async_data(&ctx->netmsg_cache, req, + hdr = io_uring_alloc_async_data(&req->sq->netmsg_cache, req, io_msg_async_data_init); if (!hdr) return NULL; @@ -1228,7 +1227,6 @@ void io_send_zc_cleanup(struct io_kiocb *req) int io_send_zc_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) { struct io_sr_msg *zc = io_kiocb_to_cmd(req, struct io_sr_msg); - struct io_ring_ctx *ctx = req->ctx; struct io_kiocb *notif; zc->done_io = 0; @@ -1240,7 +1238,7 @@ int io_send_zc_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) if (req->flags & REQ_F_CQE_SKIP) return -EINVAL; - notif = zc->notif = io_alloc_notif(ctx); + notif = zc->notif = io_alloc_notif(req->sq); if (!notif) return -ENOMEM; notif->cqe.user_data = req->cqe.user_data; @@ -1346,13 +1344,13 @@ static int io_send_zc_import(struct io_kiocb *req, unsigned int issue_flags) struct io_rsrc_node *node; ret = -EFAULT; - io_ring_submit_lock(ctx, issue_flags); + io_ring_submit_lock(req->sq, issue_flags); node = io_rsrc_node_lookup(&ctx->buf_table, sr->buf_index); if (node) { io_req_assign_buf_node(sr->notif, node); ret = 0; } - io_ring_submit_unlock(ctx, issue_flags); + io_ring_submit_unlock(req->sq, issue_flags); if (unlikely(ret)) return ret; diff --git a/io_uring/nop.c b/io_uring/nop.c index 5e5196df650a..cae77314fb45 100644 --- a/io_uring/nop.c +++ b/io_uring/nop.c @@ -68,13 +68,13 @@ int io_nop(struct io_kiocb *req, unsigned int issue_flags) struct io_rsrc_node *node; ret = -EFAULT; - io_ring_submit_lock(ctx, issue_flags); + io_ring_submit_lock(ctx->s, issue_flags); node = io_rsrc_node_lookup(&ctx->buf_table, nop->buffer); if (node) { io_req_assign_buf_node(req, node); ret = 0; } - io_ring_submit_unlock(ctx, 
issue_flags); + io_ring_submit_unlock(ctx->s, issue_flags); } done: if (ret < 0) diff --git a/io_uring/notif.c b/io_uring/notif.c index ee3a33510b3c..c93bfe06fd98 100644 --- a/io_uring/notif.c +++ b/io_uring/notif.c @@ -104,13 +104,14 @@ static const struct ubuf_info_ops io_ubuf_ops = { .link_skb = io_link_skb, }; -struct io_kiocb *io_alloc_notif(struct io_ring_ctx *ctx) - __must_hold(&ctx->uring_lock) +struct io_kiocb *io_alloc_notif(struct io_sq_cq *s) { struct io_kiocb *notif; struct io_notif_data *nd; - if (unlikely(!io_alloc_req(ctx, ¬if))) + lockdep_assert_held(&s->ring_lock); + + if (unlikely(!io_alloc_req(s, ¬if))) return NULL; notif->opcode = IORING_OP_NOP; notif->flags = 0; diff --git a/io_uring/notif.h b/io_uring/notif.h index f3589cfef4a9..caaeda8463df 100644 --- a/io_uring/notif.h +++ b/io_uring/notif.h @@ -23,7 +23,7 @@ struct io_notif_data { bool zc_copied; }; -struct io_kiocb *io_alloc_notif(struct io_ring_ctx *ctx); +struct io_kiocb *io_alloc_notif(struct io_sq_cq *s); void io_tx_ubuf_complete(struct sk_buff *skb, struct ubuf_info *uarg, bool success); diff --git a/io_uring/openclose.c b/io_uring/openclose.c index e3357dfa14ca..3af896026b2d 100644 --- a/io_uring/openclose.c +++ b/io_uring/openclose.c @@ -190,9 +190,9 @@ int __io_close_fixed(struct io_ring_ctx *ctx, unsigned int issue_flags, { int ret; - io_ring_submit_lock(ctx, issue_flags); + io_ring_submit_lock(ctx->s, issue_flags); ret = io_fixed_fd_remove(ctx, offset); - io_ring_submit_unlock(ctx, issue_flags); + io_ring_submit_unlock(ctx->s, issue_flags); return ret; } diff --git a/io_uring/poll.c b/io_uring/poll.c index cc01c40b43d3..addd3bd96734 100644 --- a/io_uring/poll.c +++ b/io_uring/poll.c @@ -120,10 +120,10 @@ static struct io_poll *io_poll_get_single(struct io_kiocb *req) static void io_poll_req_insert(struct io_kiocb *req) { - struct io_hash_table *table = &req->ctx->cancel_table; + struct io_hash_table *table = &req->sq->cancel_table; u32 index = hash_long(req->cqe.user_data, table->hash_bits); - lockdep_assert_held(&req->ctx->uring_lock); + lockdep_assert_held(&req->sq->ring_lock); hlist_add_head(&req->hash_node, &table->hbs[index].list); } @@ -341,7 +341,7 @@ void io_poll_task_func(struct io_kiocb *req, struct io_tw_state *ts) io_req_set_res(req, req->cqe.res, 0); io_req_task_complete(req, ts); } else { - io_tw_lock(req->ctx, ts); + io_tw_lock(ts->sq); if (ret == IOU_POLL_REMOVE_POLL_USE_RES) io_req_task_complete(req, ts); @@ -524,11 +524,11 @@ static bool io_poll_can_finish_inline(struct io_kiocb *req, static void io_poll_add_hash(struct io_kiocb *req, unsigned int issue_flags) { - struct io_ring_ctx *ctx = req->ctx; + struct io_sq_cq *s = req->sq; - io_ring_submit_lock(ctx, issue_flags); + io_ring_submit_lock(s, issue_flags); io_poll_req_insert(req); - io_ring_submit_unlock(ctx, issue_flags); + io_ring_submit_unlock(s, issue_flags); } /* @@ -642,7 +642,6 @@ static void io_async_queue_proc(struct file *file, struct wait_queue_head *head, static struct async_poll *io_req_alloc_apoll(struct io_kiocb *req, unsigned issue_flags) { - struct io_ring_ctx *ctx = req->ctx; struct async_poll *apoll; if (req->flags & REQ_F_POLLED) { @@ -650,7 +649,7 @@ static struct async_poll *io_req_alloc_apoll(struct io_kiocb *req, kfree(apoll->double_poll); } else { if (!(issue_flags & IO_URING_F_UNLOCKED)) - apoll = io_cache_alloc(&ctx->apoll_cache, GFP_ATOMIC, NULL); + apoll = io_cache_alloc(&req->sq->apoll_cache, GFP_ATOMIC, NULL); else apoll = kmalloc(sizeof(*apoll), GFP_ATOMIC); if (!apoll) @@ -707,22 +706,17 @@ int 
io_arm_poll_handler(struct io_kiocb *req, unsigned issue_flags) return IO_APOLL_OK; } -/* - * Returns true if we found and killed one or more poll requests - */ -__cold bool io_poll_remove_all(struct io_ring_ctx *ctx, struct io_uring_task *tctx, - bool cancel_all) +static bool __io_poll_remove_all(struct io_sq_cq *s, struct io_uring_task *tctx, + bool cancel_all) { - unsigned nr_buckets = 1U << ctx->cancel_table.hash_bits; + unsigned nr_buckets = 1U << s->cancel_table.hash_bits; struct hlist_node *tmp; struct io_kiocb *req; bool found = false; int i; - lockdep_assert_held(&ctx->uring_lock); - for (i = 0; i < nr_buckets; i++) { - struct io_hash_bucket *hb = &ctx->cancel_table.hbs[i]; + struct io_hash_bucket *hb = &s->cancel_table.hbs[i]; hlist_for_each_entry_safe(req, tmp, &hb->list, hash_node) { if (io_match_task_safe(req, tctx, cancel_all)) { @@ -735,12 +729,31 @@ __cold bool io_poll_remove_all(struct io_ring_ctx *ctx, struct io_uring_task *tc return found; } -static struct io_kiocb *io_poll_find(struct io_ring_ctx *ctx, bool poll_only, +/* + * Returns true if we found and killed one or more poll requests + */ +__cold bool io_poll_remove_all(struct io_ring_ctx *ctx, struct io_uring_task *tctx, + bool cancel_all) +{ + struct io_sq_cq *s; + bool found; + int i; + + lockdep_assert_held(&ctx->uring_lock); + + found = false; + io_for_each_s(ctx, s, i) + found |= __io_poll_remove_all(s, tctx, cancel_all); + + return found; +} + +static struct io_kiocb *__io_poll_find(struct io_sq_cq *s, bool poll_only, struct io_cancel_data *cd) { struct io_kiocb *req; - u32 index = hash_long(cd->data, ctx->cancel_table.hash_bits); - struct io_hash_bucket *hb = &ctx->cancel_table.hbs[index]; + u32 index = hash_long(cd->data, s->cancel_table.hash_bits); + struct io_hash_bucket *hb = &s->cancel_table.hbs[index]; hlist_for_each_entry(req, &hb->list, hash_node) { if (cd->data != req->cqe.user_data) @@ -756,15 +769,31 @@ static struct io_kiocb *io_poll_find(struct io_ring_ctx *ctx, bool poll_only, return NULL; } -static struct io_kiocb *io_poll_file_find(struct io_ring_ctx *ctx, - struct io_cancel_data *cd) +static struct io_kiocb *io_poll_find(struct io_ring_ctx *ctx, bool poll_only, + struct io_cancel_data *cd) +{ + struct io_kiocb *req; + struct io_sq_cq *s; + int i; + + io_for_each_s(ctx, s, i) { + req = __io_poll_find(s, poll_only, cd); + if (req) + return req; + } + + return NULL; +} + +static struct io_kiocb *__io_poll_file_find(struct io_sq_cq *s, + struct io_cancel_data *cd) { - unsigned nr_buckets = 1U << ctx->cancel_table.hash_bits; + unsigned nr_buckets = 1U << s->cancel_table.hash_bits; struct io_kiocb *req; int i; for (i = 0; i < nr_buckets; i++) { - struct io_hash_bucket *hb = &ctx->cancel_table.hbs[i]; + struct io_hash_bucket *hb = &s->cancel_table.hbs[i]; hlist_for_each_entry(req, &hb->list, hash_node) { if (io_cancel_req_match(req, cd)) @@ -774,6 +803,23 @@ static struct io_kiocb *io_poll_file_find(struct io_ring_ctx *ctx, return NULL; } + +static struct io_kiocb *io_poll_file_find(struct io_ring_ctx *ctx, + struct io_cancel_data *cd) +{ + struct io_kiocb *req; + struct io_sq_cq *s; + int i; + + io_for_each_s(ctx, s, i) { + req = __io_poll_file_find(s, cd); + if (req) + return req; + } + + return NULL; +} + static int io_poll_disarm(struct io_kiocb *req) { if (!req) @@ -807,9 +853,9 @@ int io_poll_cancel(struct io_ring_ctx *ctx, struct io_cancel_data *cd, { int ret; - io_ring_submit_lock(ctx, issue_flags); + io_ring_submit_lock(ctx->s, issue_flags); ret = __io_poll_cancel(ctx, cd); - 
io_ring_submit_unlock(ctx, issue_flags); + io_ring_submit_unlock(ctx->s, issue_flags); return ret; } @@ -901,7 +947,7 @@ int io_poll_remove(struct io_kiocb *req, unsigned int issue_flags) struct io_kiocb *preq; int ret2, ret = 0; - io_ring_submit_lock(ctx, issue_flags); + io_ring_submit_lock(req->sq, issue_flags); preq = io_poll_find(ctx, true, &cd); ret2 = io_poll_disarm(preq); if (ret2) { @@ -936,7 +982,7 @@ int io_poll_remove(struct io_kiocb *req, unsigned int issue_flags) preq->io_task_work.func = io_req_task_complete; io_req_task_work_add(preq); out: - io_ring_submit_unlock(ctx, issue_flags); + io_ring_submit_unlock(req->sq, issue_flags); if (ret < 0) { req_set_fail(req); return ret; diff --git a/io_uring/register.c b/io_uring/register.c index f1698c18c7cb..dff9b09c8d1d 100644 --- a/io_uring/register.c +++ b/io_uring/register.c @@ -174,13 +174,26 @@ out: return ret; } -static int io_register_enable_rings(struct io_ring_ctx *ctx) +static int io_register_enable_rings(struct io_ring_ctx *ctx, int index) { + struct io_sq_cq *s; + int i, ret; + if (!(ctx->flags & IORING_SETUP_R_DISABLED)) return -EBADFD; + if (index >= ctx->nr_sq) + return -EINVAL; + + s = &ctx->s[index]; + if (!(s->ring_flags & IORING_SETUP_R_DISABLED)) + return -EBADFD; + + ret = io_uring_tctx_node_set_sq(ctx, s); + if (ret) + return ret; - if (ctx->flags & IORING_SETUP_SINGLE_ISSUER && !ctx->submitter_task) { - WRITE_ONCE(ctx->submitter_task, get_task_struct(current)); + if (ctx->flags & IORING_SETUP_SINGLE_ISSUER && !s->submitter_task) { + WRITE_ONCE(s->submitter_task, get_task_struct(current)); /* * Lazy activation attempts would fail if it was polled before * submitter_task is set. @@ -192,6 +205,13 @@ static int io_register_enable_rings(struct io_ring_ctx *ctx) if (ctx->restrictions.registered) ctx->restricted = 1; + s->ring_flags &= ~IORING_SETUP_R_DISABLED; + io_for_each_s(ctx, s, i) { + if (s->ring_flags & IORING_SETUP_R_DISABLED) + return 0; + } + + printk("all enabled, live\n"); ctx->flags &= ~IORING_SETUP_R_DISABLED; if (ctx->sq_data && wq_has_sleeper(&ctx->sq_data->wait)) wake_up(&ctx->sq_data->wait); @@ -374,12 +394,12 @@ struct io_ring_ctx_rings { struct io_mapped_region ring_region; }; -static void io_register_free_rings(struct io_ring_ctx *ctx, +static void io_register_free_rings(struct user_struct *user, struct io_uring_params *p, struct io_ring_ctx_rings *r) { - io_free_region(ctx, &r->sq_region); - io_free_region(ctx, &r->ring_region); + io_free_region(user, &r->sq_region); + io_free_region(user, &r->ring_region); } #define swap_old(ctx, o, n, field) \ @@ -392,7 +412,8 @@ static void io_register_free_rings(struct io_ring_ctx *ctx, #define COPY_FLAGS (IORING_SETUP_NO_SQARRAY | IORING_SETUP_SQE128 | \ IORING_SETUP_CQE32 | IORING_SETUP_NO_MMAP) -static int io_register_resize_rings(struct io_ring_ctx *ctx, void __user *arg) +static int io_register_resize_rings(struct io_ring_ctx *ctx, struct io_sq_cq *s, + void __user *arg) { struct io_uring_region_desc rd; struct io_ring_ctx_rings o = { }, n = { }, *to_free = NULL; @@ -403,7 +424,7 @@ static int io_register_resize_rings(struct io_ring_ctx *ctx, void __user *arg) /* for single issuer, must be owner resizing */ if (ctx->flags & IORING_SETUP_SINGLE_ISSUER && - current != ctx->submitter_task) + current != s->submitter_task) return -EEXIST; if (copy_from_user(&p, arg, sizeof(p))) return -EFAULT; @@ -418,7 +439,7 @@ static int io_register_resize_rings(struct io_ring_ctx *ctx, void __user *arg) return ret; /* nothing to do, but copy params back */ - if 
(p.sq_entries == ctx->sq_entries && p.cq_entries == ctx->cq_entries) { + if (p.sq_entries == s->sq_entries && p.cq_entries == s->cq_entries) { if (copy_to_user(arg, &p, sizeof(p))) return -EFAULT; return 0; @@ -437,7 +458,7 @@ static int io_register_resize_rings(struct io_ring_ctx *ctx, void __user *arg) } ret = io_create_region_mmap_safe(ctx, &n.ring_region, &rd, IORING_OFF_CQ_RING); if (ret) { - io_register_free_rings(ctx, &p, &n); + io_register_free_rings(ctx->user, &p, &n); return ret; } n.rings = io_region_get_ptr(&n.ring_region); @@ -448,7 +469,7 @@ static int io_register_resize_rings(struct io_ring_ctx *ctx, void __user *arg) n.rings->cq_ring_entries = p.cq_entries; if (copy_to_user(arg, &p, sizeof(p))) { - io_register_free_rings(ctx, &p, &n); + io_register_free_rings(ctx->user, &p, &n); return -EFAULT; } @@ -457,7 +478,7 @@ static int io_register_resize_rings(struct io_ring_ctx *ctx, void __user *arg) else size = array_size(sizeof(struct io_uring_sqe), p.sq_entries); if (size == SIZE_MAX) { - io_register_free_rings(ctx, &p, &n); + io_register_free_rings(ctx->user, &p, &n); return -EOVERFLOW; } @@ -469,7 +490,7 @@ static int io_register_resize_rings(struct io_ring_ctx *ctx, void __user *arg) } ret = io_create_region_mmap_safe(ctx, &n.sq_region, &rd, IORING_OFF_SQES); if (ret) { - io_register_free_rings(ctx, &p, &n); + io_register_free_rings(ctx->user, &p, &n); return ret; } n.sq_sqes = io_region_get_ptr(&n.sq_region); @@ -494,10 +515,10 @@ static int io_register_resize_rings(struct io_ring_ctx *ctx, void __user *arg) */ mutex_lock(&ctx->mmap_lock); spin_lock(&ctx->completion_lock); - o.rings = ctx->rings; - ctx->rings = NULL; - o.sq_sqes = ctx->sq_sqes; - ctx->sq_sqes = NULL; + o.rings = s->rings; + s->rings = NULL; + o.sq_sqes = s->sq_sqes; + s->sq_sqes = NULL; /* * Now copy SQ and CQ entries, if any. 
If either of the destination @@ -507,7 +528,7 @@ static int io_register_resize_rings(struct io_ring_ctx *ctx, void __user *arg) if (tail - o.rings->sq.head > p.sq_entries) goto overflow; for (i = o.rings->sq.head; i < tail; i++) { - unsigned src_head = i & (ctx->sq_entries - 1); + unsigned src_head = i & (s->sq_entries - 1); unsigned dst_head = i & n.rings->sq_ring_mask; n.sq_sqes[dst_head] = o.sq_sqes[src_head]; @@ -519,14 +540,14 @@ static int io_register_resize_rings(struct io_ring_ctx *ctx, void __user *arg) if (tail - o.rings->cq.head > p.cq_entries) { overflow: /* restore old rings, and return -EOVERFLOW via cleanup path */ - ctx->rings = o.rings; - ctx->sq_sqes = o.sq_sqes; + s->rings = o.rings; + s->sq_sqes = o.sq_sqes; to_free = &n; ret = -EOVERFLOW; goto out; } for (i = o.rings->cq.head; i < tail; i++) { - unsigned src_head = i & (ctx->cq_entries - 1); + unsigned src_head = i & (s->cq_entries - 1); unsigned dst_head = i & n.rings->cq_ring_mask; n.rings->cqes[dst_head] = o.rings->cqes[src_head]; @@ -534,7 +555,7 @@ overflow: n.rings->cq.head = o.rings->cq.head; n.rings->cq.tail = o.rings->cq.tail; /* invalidate cached cqe refill */ - ctx->cqe_cached = ctx->cqe_sentinel = NULL; + s->cqe_cached = s->cqe_sentinel = NULL; n.rings->sq_dropped = o.rings->sq_dropped; n.rings->sq_flags = o.rings->sq_flags; @@ -543,21 +564,21 @@ overflow: /* all done, store old pointers and assign new ones */ if (!(ctx->flags & IORING_SETUP_NO_SQARRAY)) - ctx->sq_array = (u32 *)((char *)n.rings + sq_array_offset); + s->sq_array = (u32 *)((char *)n.rings + sq_array_offset); - ctx->sq_entries = p.sq_entries; - ctx->cq_entries = p.cq_entries; + s->sq_entries = p.sq_entries; + s->cq_entries = p.cq_entries; - ctx->rings = n.rings; - ctx->sq_sqes = n.sq_sqes; - swap_old(ctx, o, n, ring_region); - swap_old(ctx, o, n, sq_region); + s->rings = n.rings; + s->sq_sqes = n.sq_sqes; + swap_old(s, o, n, ring_region); + swap_old(s, o, n, sq_region); to_free = &o; ret = 0; out: spin_unlock(&ctx->completion_lock); mutex_unlock(&ctx->mmap_lock); - io_register_free_rings(ctx, &p, to_free); + io_register_free_rings(ctx->user, &p, to_free); if (ctx->sq_data) io_sq_thread_unpark(ctx->sq_data); @@ -599,7 +620,7 @@ static int io_register_mem_region(struct io_ring_ctx *ctx, void __user *uarg) if (ret) return ret; if (copy_to_user(rd_uptr, &rd, sizeof(rd))) { - io_free_region(ctx, &ctx->param_region); + io_free_region(ctx->user, &ctx->param_region); return -EFAULT; } @@ -615,6 +636,7 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, __releases(ctx->uring_lock) __acquires(ctx->uring_lock) { + struct io_sq_cq *s = ctx->s; int ret; /* @@ -624,7 +646,10 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, if (WARN_ON_ONCE(percpu_ref_is_dying(&ctx->refs))) return -ENXIO; - if (ctx->submitter_task && ctx->submitter_task != current) + if (ctx->flags & IORING_SETUP_THREAD_ISSUER) + s = io_uring_get_sq(ctx); + + if (s && s->submitter_task && s->submitter_task != current) return -EEXIST; if (ctx->restricted) { @@ -699,9 +724,11 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, break; case IORING_REGISTER_ENABLE_RINGS: ret = -EINVAL; - if (arg || nr_args) + if (arg) + break; + if (nr_args && !(ctx->flags & IORING_SETUP_THREAD_ISSUER)) break; - ret = io_register_enable_rings(ctx); + ret = io_register_enable_rings(ctx, nr_args); break; case IORING_REGISTER_RESTRICTIONS: ret = io_register_restrictions(ctx, arg, nr_args); @@ -802,7 +829,7 @@ static int 
__io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, ret = -EINVAL; if (!arg || nr_args != 1) break; - ret = io_register_resize_rings(ctx, arg); + ret = io_register_resize_rings(ctx, ctx->s, arg); break; case IORING_REGISTER_MEM_REGION: ret = -EINVAL; diff --git a/io_uring/rsrc.c b/io_uring/rsrc.c index f2ff108485c8..3934843e80cf 100644 --- a/io_uring/rsrc.c +++ b/io_uring/rsrc.c @@ -125,7 +125,7 @@ struct io_rsrc_node *io_rsrc_node_alloc(struct io_ring_ctx *ctx, int type) node = kzalloc(sizeof(*node), GFP_KERNEL); if (node) { node->type = type; - node->refs = 1; + atomic_set(&node->refs, 1); } return node; } @@ -430,10 +430,10 @@ int io_files_update(struct io_kiocb *req, unsigned int issue_flags) if (up->offset == IORING_FILE_INDEX_ALLOC) { ret = io_files_update_with_index_alloc(req, issue_flags); } else { - io_ring_submit_lock(ctx, issue_flags); + io_ring_submit_lock(req->sq, issue_flags); ret = __io_register_rsrc_update(ctx, IORING_RSRC_FILE, &up2, up->nr_args); - io_ring_submit_unlock(ctx, issue_flags); + io_ring_submit_unlock(req->sq, issue_flags); } if (ret < 0) @@ -444,10 +444,8 @@ int io_files_update(struct io_kiocb *req, unsigned int issue_flags) void io_free_rsrc_node(struct io_ring_ctx *ctx, struct io_rsrc_node *node) { - lockdep_assert_held(&ctx->uring_lock); - if (node->tag) - io_post_aux_cqe(ctx, node->tag, 0, 0); + io_post_aux_cqe(ctx->s, node->tag, 0, 0); switch (node->type) { case IORING_RSRC_FILE: @@ -957,7 +955,7 @@ static int io_clone_buffers(struct io_ring_ctx *ctx, struct io_ring_ctx *src_ctx if (src_node) { data.nodes[i] = src_node; - src_node->refs++; + atomic_inc(&src_node->refs); } } diff --git a/io_uring/rsrc.h b/io_uring/rsrc.h index c8b093584461..75b2a4118582 100644 --- a/io_uring/rsrc.h +++ b/io_uring/rsrc.h @@ -15,7 +15,7 @@ enum { struct io_rsrc_node { unsigned char type; - int refs; + atomic_t refs; u64 tag; union { @@ -80,7 +80,7 @@ static inline struct io_rsrc_node *io_rsrc_node_lookup(struct io_rsrc_data *data static inline void io_put_rsrc_node(struct io_ring_ctx *ctx, struct io_rsrc_node *node) { - if (node && !--node->refs) + if (node && atomic_dec_and_test(&node->refs)) io_free_rsrc_node(ctx, node); } @@ -111,7 +111,7 @@ static inline void io_req_put_rsrc_nodes(struct io_kiocb *req) static inline void io_req_assign_rsrc_node(struct io_rsrc_node **dst_node, struct io_rsrc_node *node) { - node->refs++; + atomic_inc(&node->refs); *dst_node = node; } diff --git a/io_uring/rw.c b/io_uring/rw.c index 75f70935ccf4..e98f99cbc31a 100644 --- a/io_uring/rw.c +++ b/io_uring/rw.c @@ -165,7 +165,7 @@ static void io_rw_recycle(struct io_kiocb *req, unsigned int issue_flags) return; } iov = rw->free_iovec; - if (io_alloc_cache_put(&req->ctx->rw_cache, rw)) { + if (io_alloc_cache_put(&req->sq->rw_cache, rw)) { if (iov) kasan_mempool_poison_object(iov); req->async_data = NULL; @@ -218,10 +218,9 @@ static void io_rw_async_data_init(void *obj) static int io_rw_alloc_async(struct io_kiocb *req) { - struct io_ring_ctx *ctx = req->ctx; struct io_async_rw *rw; - rw = io_uring_alloc_async_data(&ctx->rw_cache, req, io_rw_async_data_init); + rw = io_uring_alloc_async_data(&req->sq->rw_cache, req, io_rw_async_data_init); if (!rw) return -ENOMEM; if (rw->free_iovec) { @@ -1280,9 +1279,10 @@ static int io_uring_hybrid_poll(struct io_kiocb *req, return ret; } -int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin) +int io_do_iopoll(struct io_sq_cq *s, bool force_nonspin) { struct io_wq_work_node *pos, *start, *prev; + struct io_ring_ctx *ctx = s->ctx; unsigned int 
poll_flags = 0; DEFINE_IO_COMP_BATCH(iob); int nr_events = 0; @@ -1345,10 +1345,10 @@ int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin) pos = start ? start->next : ctx->iopoll_list.first; wq_list_cut(&ctx->iopoll_list, prev, start); - if (WARN_ON_ONCE(!wq_list_empty(&ctx->submit_state.compl_reqs))) + if (WARN_ON_ONCE(!wq_list_empty(&s->submit_state.compl_reqs))) return 0; - ctx->submit_state.compl_reqs.first = pos; - __io_submit_flush_completions(ctx); + s->submit_state.compl_reqs.first = pos; + __io_submit_flush_completions(s); return nr_events; } diff --git a/io_uring/splice.c b/io_uring/splice.c index 5b84f1630611..1bba85dbba6b 100644 --- a/io_uring/splice.c +++ b/io_uring/splice.c @@ -65,15 +65,15 @@ static struct file *io_splice_get_file(struct io_kiocb *req, if (!(sp->flags & SPLICE_F_FD_IN_FIXED)) return io_file_get_normal(req, sp->splice_fd_in); - io_ring_submit_lock(ctx, issue_flags); + io_ring_submit_lock(ctx->s, issue_flags); node = io_rsrc_node_lookup(&ctx->file_table.data, sp->splice_fd_in); if (node) { - node->refs++; + atomic_inc(&node->refs); sp->rsrc_node = node; file = io_slot_file(node); req->flags |= REQ_F_NEED_CLEANUP; } - io_ring_submit_unlock(ctx, issue_flags); + io_ring_submit_unlock(ctx->s, issue_flags); return file; } diff --git a/io_uring/sqpoll.c b/io_uring/sqpoll.c index 6df5e649c413..b4b46390b679 100644 --- a/io_uring/sqpoll.c +++ b/io_uring/sqpoll.c @@ -162,10 +162,11 @@ static inline bool io_sqd_events_pending(struct io_sq_data *sqd) static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries) { + struct io_sq_cq *s = ctx->s; unsigned int to_submit; int ret = 0; - to_submit = io_sqring_entries(ctx); + to_submit = io_sqring_entries(s); /* if we're handling multiple rings, cap submit size for fairness */ if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE) to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE; @@ -176,9 +177,9 @@ static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries) if (ctx->sq_creds != current_cred()) creds = override_creds(ctx->sq_creds); - mutex_lock(&ctx->uring_lock); + mutex_lock(&s->ring_lock); if (!wq_list_empty(&ctx->iopoll_list)) - io_do_iopoll(ctx, true); + io_do_iopoll(&ctx->s[0], true); /* * Don't submit if refs are dying, good for io_uring_register(), @@ -186,8 +187,8 @@ static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries) */ if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) && !(ctx->flags & IORING_SETUP_R_DISABLED)) - ret = io_submit_sqes(ctx, to_submit); - mutex_unlock(&ctx->uring_lock); + ret = io_submit_sqes(s, to_submit); + mutex_unlock(&s->ring_lock); if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait)) wake_up(&ctx->sqo_sq_wait); @@ -337,7 +338,7 @@ static int io_sq_thread(void *data) list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) { atomic_or(IORING_SQ_NEED_WAKEUP, - &ctx->rings->sq_flags); + &ctx->s->rings->sq_flags); if ((ctx->flags & IORING_SETUP_IOPOLL) && !wq_list_empty(&ctx->iopoll_list)) { needs_sched = false; @@ -350,7 +351,7 @@ static int io_sq_thread(void *data) */ smp_mb__after_atomic(); - if (io_sqring_entries(ctx)) { + if (io_sqring_entries(ctx->s)) { needs_sched = false; break; } @@ -364,7 +365,7 @@ static int io_sq_thread(void *data) } list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) atomic_andnot(IORING_SQ_NEED_WAKEUP, - &ctx->rings->sq_flags); + &ctx->s->rings->sq_flags); } finish_wait(&sqd->wait, &wait); @@ -377,7 +378,7 @@ static int io_sq_thread(void *data) io_uring_cancel_generic(true, sqd); sqd->thread = NULL; list_for_each_entry(ctx, 
&sqd->ctx_list, sqd_list) - atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags); + atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->s->rings->sq_flags); io_run_task_work(); mutex_unlock(&sqd->lock); err_out: @@ -390,11 +391,11 @@ void io_sqpoll_wait_sq(struct io_ring_ctx *ctx) DEFINE_WAIT(wait); do { - if (!io_sqring_full(ctx)) + if (!io_sqring_full(ctx->s)) break; prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE); - if (!io_sqring_full(ctx)) + if (!io_sqring_full(ctx->s)) break; schedule(); } while (!signal_pending(current)); diff --git a/io_uring/tctx.c b/io_uring/tctx.c index adc6e42c14df..ec02c355446b 100644 --- a/io_uring/tctx.c +++ b/io_uring/tctx.c @@ -39,7 +39,7 @@ static struct io_wq *io_init_wq_offload(struct io_ring_ctx *ctx, data.do_work = io_wq_submit_work; /* Do QD, or 4 * CPUS, whatever is smallest */ - concurrency = min(ctx->sq_entries, 4 * num_online_cpus()); + concurrency = min(ctx->s->sq_entries, 4 * num_online_cpus()); return io_wq_create(concurrency, &data); } @@ -103,7 +103,8 @@ __cold int io_uring_alloc_task_context(struct task_struct *task, return 0; } -int __io_uring_add_tctx_node(struct io_ring_ctx *ctx) +static struct io_tctx_node *__io_uring_add_ret_tctx_node(struct io_ring_ctx *ctx, + struct io_sq_cq *s) { struct io_uring_task *tctx = current->io_uring; struct io_tctx_node *node; @@ -112,7 +113,7 @@ int __io_uring_add_tctx_node(struct io_ring_ctx *ctx) if (unlikely(!tctx)) { ret = io_uring_alloc_task_context(current, ctx); if (unlikely(ret)) - return ret; + return ERR_PTR(ret); tctx = current->io_uring; if (ctx->iowq_limits_set) { @@ -121,39 +122,71 @@ int __io_uring_add_tctx_node(struct io_ring_ctx *ctx) ret = io_wq_max_workers(tctx->io_wq, limits); if (ret) - return ret; + return ERR_PTR(ret); } } - if (!xa_load(&tctx->xa, (unsigned long)ctx)) { + node = xa_load(&tctx->xa, (unsigned long) ctx); + if (!node) { node = kmalloc(sizeof(*node), GFP_KERNEL); if (!node) - return -ENOMEM; + return ERR_PTR(-ENOMEM); node->ctx = ctx; + node->sq = s; node->task = current; ret = xa_err(xa_store(&tctx->xa, (unsigned long)ctx, node, GFP_KERNEL)); if (ret) { kfree(node); - return ret; + return ERR_PTR(ret); } mutex_lock(&ctx->uring_lock); list_add(&node->ctx_node, &ctx->tctx_list); mutex_unlock(&ctx->uring_lock); } + return node; +} + +int __io_uring_add_tctx_node(struct io_ring_ctx *ctx, struct io_sq_cq *s) +{ + struct io_tctx_node *node; + + node = __io_uring_add_ret_tctx_node(ctx, s); + if (IS_ERR(node)) + return PTR_ERR(node); + + return 0; +} + +int io_uring_tctx_node_set_sq(struct io_ring_ctx *ctx, struct io_sq_cq *s) +{ + struct io_tctx_node *node; + + mutex_unlock(&ctx->uring_lock); + node = __io_uring_add_ret_tctx_node(ctx, s); + mutex_lock(&ctx->uring_lock); + if (IS_ERR(node)) + return PTR_ERR(node); + if (node->sq == s) + return 0; + else if (node->sq) + return -EBUSY; + + node->sq = s; return 0; } -int __io_uring_add_tctx_node_from_submit(struct io_ring_ctx *ctx) +int __io_uring_add_tctx_node_from_submit(struct io_ring_ctx *ctx, + struct io_sq_cq *s) { int ret; if (ctx->flags & IORING_SETUP_SINGLE_ISSUER - && ctx->submitter_task != current) + && s->submitter_task != current) return -EEXIST; - ret = __io_uring_add_tctx_node(ctx); + ret = __io_uring_add_tctx_node(ctx, s); if (ret) return ret; @@ -274,7 +307,7 @@ int io_ringfd_register(struct io_ring_ctx *ctx, void __user *__arg, return -EINVAL; mutex_unlock(&ctx->uring_lock); - ret = __io_uring_add_tctx_node(ctx); + ret = __io_uring_add_tctx_node(ctx, NULL); mutex_lock(&ctx->uring_lock); if (ret) return ret; 
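
The tctx.c hunks above bind each io_tctx_node to at most one io_sq_cq, with io_uring_tctx_node_set_sq() refusing to rebind a node that already points at a different SQ. A minimal userspace sketch of that claim rule follows; the types here are illustrative stand-ins, not the kernel structures (the real code looks the node up via xa_load() on tctx->xa and runs under ctx->uring_lock):

#include <errno.h>
#include <stdio.h>

/* stand-ins for io_sq_cq and io_tctx_node, illustration only */
struct sq { int id; };
struct tctx_node { struct sq *sq; };

/* mirrors the node->sq claim logic: idempotent for the same SQ, -EBUSY otherwise */
static int node_set_sq(struct tctx_node *node, struct sq *s)
{
	if (node->sq == s)
		return 0;
	if (node->sq)
		return -EBUSY;
	node->sq = s;
	return 0;
}

int main(void)
{
	struct sq a = { 0 }, b = { 1 };
	struct tctx_node node = { NULL };

	printf("%d\n", node_set_sq(&node, &a));	/* 0: first claim wins */
	printf("%d\n", node_set_sq(&node, &a));	/* 0: same SQ again is fine */
	printf("%d\n", node_set_sq(&node, &b));	/* -EBUSY: already bound to a */
	return 0;
}
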
diff --git a/io_uring/tctx.h b/io_uring/tctx.h index 608e96de70a2..3e200c3ec9ef 100644 --- a/io_uring/tctx.h +++ b/io_uring/tctx.h @@ -4,13 +4,16 @@ struct io_tctx_node { struct list_head ctx_node; struct task_struct *task; struct io_ring_ctx *ctx; + struct io_sq_cq *sq; }; int io_uring_alloc_task_context(struct task_struct *task, struct io_ring_ctx *ctx); void io_uring_del_tctx_node(unsigned long index); -int __io_uring_add_tctx_node(struct io_ring_ctx *ctx); -int __io_uring_add_tctx_node_from_submit(struct io_ring_ctx *ctx); +int io_uring_tctx_node_set_sq(struct io_ring_ctx *ctx, struct io_sq_cq *s); +int __io_uring_add_tctx_node(struct io_ring_ctx *ctx, struct io_sq_cq *s); +int __io_uring_add_tctx_node_from_submit(struct io_ring_ctx *ctx, + struct io_sq_cq *s); void io_uring_clean_tctx(struct io_uring_task *tctx); void io_uring_unreg_ringfd(void); @@ -22,12 +25,27 @@ int io_ringfd_unregister(struct io_ring_ctx *ctx, void __user *__arg, /* * Note that this task has used io_uring. We use it for cancelation purposes. */ -static inline int io_uring_add_tctx_node(struct io_ring_ctx *ctx) +static inline int io_uring_add_tctx_node(struct io_ring_ctx *ctx, + struct io_sq_cq *s) { struct io_uring_task *tctx = current->io_uring; if (likely(tctx && tctx->last == ctx)) return 0; - return __io_uring_add_tctx_node_from_submit(ctx); + return __io_uring_add_tctx_node_from_submit(ctx, s); +} + +static inline struct io_sq_cq *io_uring_get_sq(struct io_ring_ctx *ctx) +{ + struct io_uring_task *tctx = current->io_uring; + + if (tctx) { + struct io_tctx_node *node; + + node = xa_load(&tctx->xa, (unsigned long) ctx); + if (node) + return node->sq; + } + return NULL; } diff --git a/io_uring/timeout.c b/io_uring/timeout.c index a166fd90667a..a21d0201c87f 100644 --- a/io_uring/timeout.c +++ b/io_uring/timeout.c @@ -95,8 +95,8 @@ static bool io_kill_timeout(struct io_kiocb *req, int status) if (status) req_set_fail(req); - atomic_set(&req->ctx->cq_timeouts, - atomic_read(&req->ctx->cq_timeouts) + 1); + atomic_set(&req->sq->cq_timeouts, + atomic_read(&req->sq->cq_timeouts) + 1); list_del_init(&timeout->list); io_req_queue_tw_complete(req, status); return true; @@ -110,7 +110,7 @@ __cold void io_flush_timeouts(struct io_ring_ctx *ctx) struct io_timeout *timeout, *tmp; raw_spin_lock_irq(&ctx->timeout_lock); - seq = ctx->cached_cq_tail - atomic_read(&ctx->cq_timeouts); + seq = ctx->s->cached_cq_tail - atomic_read(&ctx->s->cq_timeouts); list_for_each_entry_safe(timeout, tmp, &ctx->timeout_list, list) { struct io_kiocb *req = cmd_to_io_kiocb(timeout); @@ -139,7 +139,7 @@ __cold void io_flush_timeouts(struct io_ring_ctx *ctx) static void io_req_tw_fail_links(struct io_kiocb *link, struct io_tw_state *ts) { - io_tw_lock(link->ctx, ts); + io_tw_lock(ts->sq); while (link) { struct io_kiocb *nxt = link->link; long res = -ECANCELED; @@ -240,8 +240,8 @@ static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer) raw_spin_lock_irqsave(&ctx->timeout_lock, flags); list_del_init(&timeout->list); - atomic_set(&req->ctx->cq_timeouts, - atomic_read(&req->ctx->cq_timeouts) + 1); + atomic_set(&req->ctx->s->cq_timeouts, + atomic_read(&req->ctx->s->cq_timeouts) + 1); raw_spin_unlock_irqrestore(&ctx->timeout_lock, flags); if (!(data->flags & IORING_TIMEOUT_ETIME_SUCCESS)) @@ -541,7 +541,7 @@ static int __io_timeout_prep(struct io_kiocb *req, hrtimer_init(&data->timer, io_timeout_get_clock(data), data->mode); if (is_timeout_link) { - struct io_submit_link *link = &req->ctx->submit_state.link; + struct io_submit_link *link = 
&req->sq->submit_state.link; if (!link->head) return -EINVAL; @@ -567,6 +567,7 @@ int io_timeout(struct io_kiocb *req, unsigned int issue_flags) { struct io_timeout *timeout = io_kiocb_to_cmd(req, struct io_timeout); struct io_ring_ctx *ctx = req->ctx; + struct io_sq_cq *s = req->sq; struct io_timeout_data *data = req->async_data; struct list_head *entry; u32 tail, off = timeout->off; @@ -583,7 +584,7 @@ int io_timeout(struct io_kiocb *req, unsigned int issue_flags) goto add; } - tail = data_race(ctx->cached_cq_tail) - atomic_read(&ctx->cq_timeouts); + tail = data_race(s->cached_cq_tail) - atomic_read(&s->cq_timeouts); timeout->target_seq = tail + off; /* Update the last seq here in case io_flush_timeouts() hasn't. diff --git a/io_uring/uring_cmd.c b/io_uring/uring_cmd.c index d6ff803dbbe1..a09e83149cfe 100644 --- a/io_uring/uring_cmd.c +++ b/io_uring/uring_cmd.c @@ -23,7 +23,7 @@ static void io_req_uring_cleanup(struct io_kiocb *req, unsigned int issue_flags) if (issue_flags & IO_URING_F_UNLOCKED) return; - if (io_alloc_cache_put(&req->ctx->uring_cache, cache)) { + if (io_alloc_cache_put(&req->sq->uring_cache, cache)) { ioucmd->sqe = NULL; req->async_data = NULL; req->flags &= ~REQ_F_ASYNC_DATA; @@ -57,7 +57,7 @@ bool io_uring_try_cancel_uring_cmd(struct io_ring_ctx *ctx, ret = true; } } - io_submit_flush_completions(ctx); + io_submit_flush_completions(ctx->s); return ret; } @@ -71,9 +71,9 @@ static void io_uring_cmd_del_cancelable(struct io_uring_cmd *cmd, return; cmd->flags &= ~IORING_URING_CMD_CANCELABLE; - io_ring_submit_lock(ctx, issue_flags); + io_ring_submit_lock(ctx->s, issue_flags); hlist_del(&req->hash_node); - io_ring_submit_unlock(ctx, issue_flags); + io_ring_submit_unlock(ctx->s, issue_flags); } /* @@ -93,9 +93,9 @@ void io_uring_cmd_mark_cancelable(struct io_uring_cmd *cmd, if (!(cmd->flags & IORING_URING_CMD_CANCELABLE)) { cmd->flags |= IORING_URING_CMD_CANCELABLE; - io_ring_submit_lock(ctx, issue_flags); + io_ring_submit_lock(ctx->s, issue_flags); hlist_add_head(&req->hash_node, &ctx->cancelable_uring_cmd); - io_ring_submit_unlock(ctx, issue_flags); + io_ring_submit_unlock(ctx->s, issue_flags); } } EXPORT_SYMBOL_GPL(io_uring_cmd_mark_cancelable); @@ -155,7 +155,7 @@ void io_uring_cmd_done(struct io_uring_cmd *ioucmd, ssize_t ret, u64 res2, } else if (issue_flags & IO_URING_F_COMPLETE_DEFER) { if (WARN_ON_ONCE(issue_flags & IO_URING_F_UNLOCKED)) return; - io_req_complete_defer(req); + io_req_complete_defer(req, req->sq); } else { req->io_task_work.func = io_req_task_complete; io_req_task_work_add(req); @@ -169,7 +169,7 @@ static int io_uring_cmd_prep_setup(struct io_kiocb *req, struct io_uring_cmd *ioucmd = io_kiocb_to_cmd(req, struct io_uring_cmd); struct uring_cache *cache; - cache = io_uring_alloc_async_data(&req->ctx->uring_cache, req, NULL); + cache = io_uring_alloc_async_data(&req->sq->uring_cache, req, NULL); if (!cache) return -ENOMEM; diff --git a/io_uring/waitid.c b/io_uring/waitid.c index 6778c0ee76c4..8ab6425fb2a9 100644 --- a/io_uring/waitid.c +++ b/io_uring/waitid.c @@ -166,7 +166,7 @@ int io_waitid_cancel(struct io_ring_ctx *ctx, struct io_cancel_data *cd, if (cd->flags & (IORING_ASYNC_CANCEL_FD|IORING_ASYNC_CANCEL_FD_FIXED)) return -ENOENT; - io_ring_submit_lock(ctx, issue_flags); + io_ring_submit_lock(ctx->s, issue_flags); hlist_for_each_entry_safe(req, tmp, &ctx->waitid_list, hash_node) { if (req->cqe.user_data != cd->data && !(cd->flags & IORING_ASYNC_CANCEL_ANY)) @@ -176,7 +176,7 @@ int io_waitid_cancel(struct io_ring_ctx *ctx, struct io_cancel_data *cd, 
 		if (!(cd->flags & IORING_ASYNC_CANCEL_ALL))
 			break;
 	}
-	io_ring_submit_unlock(ctx, issue_flags);
+	io_ring_submit_unlock(ctx->s, issue_flags);
 	if (nr)
 		return nr;
 
@@ -225,10 +225,9 @@ static inline bool io_waitid_drop_issue_ref(struct io_kiocb *req)
 
 static void io_waitid_cb(struct io_kiocb *req, struct io_tw_state *ts)
 {
 	struct io_waitid_async *iwa = req->async_data;
-	struct io_ring_ctx *ctx = req->ctx;
 	int ret;
 
-	io_tw_lock(ctx, ts);
+	io_tw_lock(ts->sq);
 
 	ret = __do_wait(&iwa->wo);
 
@@ -327,7 +326,7 @@ int io_waitid(struct io_kiocb *req, unsigned int issue_flags)
 	 * dropped. We only need to worry about racing with the wakeup
 	 * callback.
 	 */
-	io_ring_submit_lock(ctx, issue_flags);
+	io_ring_submit_lock(ctx->s, issue_flags);
 	hlist_add_head(&req->hash_node, &ctx->waitid_list);
 
 	init_waitqueue_func_entry(&iwa->wo.child_wait, io_waitid_wait);
@@ -342,7 +341,7 @@ int io_waitid(struct io_kiocb *req, unsigned int issue_flags)
 	 * a waitqueue callback, or if someone cancels it.
 	 */
 	if (!io_waitid_drop_issue_ref(req)) {
-		io_ring_submit_unlock(ctx, issue_flags);
+		io_ring_submit_unlock(ctx->s, issue_flags);
 		return IOU_ISSUE_SKIP_COMPLETE;
 	}
 
@@ -350,7 +349,7 @@ int io_waitid(struct io_kiocb *req, unsigned int issue_flags)
 	 * Wakeup triggered, racing with us. It was prevented from
 	 * completing because of that, queue up the tw to do that.
 	 */
-	io_ring_submit_unlock(ctx, issue_flags);
+	io_ring_submit_unlock(ctx->s, issue_flags);
 	return IOU_ISSUE_SKIP_COMPLETE;
 }
 
@@ -358,7 +357,7 @@ int io_waitid(struct io_kiocb *req, unsigned int issue_flags)
 	remove_wait_queue(iw->head, &iwa->wo.child_wait);
 	ret = io_waitid_finish(req, ret);
 
-	io_ring_submit_unlock(ctx, issue_flags);
+	io_ring_submit_unlock(ctx->s, issue_flags);
 done:
 	if (ret < 0)
 		req_set_fail(req);
-- 
2.25.1
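
For reference on the per-SQ fan-out used in poll.c above: the io_for_each_s() iterator is defined in io_uring.h earlier in the patch and is not visible in this portion, so the shape below is an assumption, as are the stand-in types. This is only a rough userspace model of walking every io_sq_cq of a ring and OR-ing the per-SQ results, the way io_poll_remove_all() aggregates its __io_poll_remove_all() calls:

#include <stdbool.h>
#include <stdio.h>

/* stand-ins for io_sq_cq and io_ring_ctx, illustration only */
struct sq { bool has_match; };
struct ctx { unsigned nr_sq; struct sq *s; };

/* assumed shape of the iterator: walk ctx->s[0..nr_sq - 1] */
#define for_each_s(ctx, sq, i) \
	for ((i) = 0, (sq) = (ctx)->s; (i) < (ctx)->nr_sq; (i)++, (sq)++)

/* stand-in for scanning one per-SQ cancel_table */
static bool remove_all_one(struct sq *s)
{
	return s->has_match;
}

int main(void)
{
	struct sq sqs[3] = { { false }, { true }, { false } };
	struct ctx ctx = { .nr_sq = 3, .s = sqs };
	struct sq *s;
	unsigned i;
	bool found = false;

	/* same aggregation as io_poll_remove_all(): OR the per-SQ results */
	for_each_s(&ctx, s, i)
		found |= remove_all_one(s);
	printf("found=%d\n", found);
	return 0;
}
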