From: Jens Axboe
Date: Thu, 24 Jul 2025 17:22:47 +0000 (-0600)
Subject: io_uring/register: add support for ring -> ring channels
X-Git-Url: https://git.kernel.dk/?a=commitdiff_plain;h=9d13021a2157ae8e1b6fdbe59b50217eb0ac1bd0;p=linux-block.git

io_uring/register: add support for ring -> ring channels

Add support for a unidirectional communication channel from one ring to
another. While IORING_OP_MSG_RING already supports some notion of
communicating between rings, it works by submitting task_work between
rings. This isn't very efficient, and is also a bit wonky in how
io_kiocb requests need to be faked to accomplish that goal.

An io_uring channel is a single unidirectional ring, with a specific
single producer (the source ring) and a specific single consumer (the
target ring). Messages added to the channel are treated like local
task_work on the target ring - they are checked for and flushed in the
same way. Anything added to the channel is turned into CQEs posted to
the target ring's CQ ring.

If IORING_CHAN_REG_BIDI is set in the registration flags, then two
channels will be set up - one in each direction. This allows responses
from one channel to be posted on the other channel.

IORING_REGISTER_ADD_CHAN adds a new channel (or pair of channels), and
takes a pointer to the following structure as the argument:

struct io_uring_chan_reg {
	__u32 flags;
	__u32 dst_fd;
	__u32 nentries;
	__u32 resv[7];
};

where 'flags' must be 0 or IORING_CHAN_REG_BIDI, 'dst_fd' is the file
descriptor for the target ring, and 'nentries' is the number of entries
that the channel should be able to hold. Channel messages are flushed
like task_work, hence a large number of entries generally isn't
required. 'nentries' must be a power-of-2 value.

Signed-off-by: Jens Axboe
---

diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index 6f4080ec968e..d03b93b14b28 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -371,6 +371,14 @@ struct io_ring_ctx {
 
 	spinlock_t		completion_lock;
 
+	/*
+	 * Communication channels, if any. xa_dst_chan are channels registered
+	 * where this ring is the destination/target, and xa_src_chan are
+	 * channels where this ring is the source/sender.
+	 */
+	struct xarray		xa_dst_chan;
+	struct xarray		xa_src_chan;
+
 	struct list_head	cq_overflow_list;
 
 	struct hlist_head	waitid_list;
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 33b386f43d47..f36d59051399 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -682,6 +682,9 @@ enum io_uring_register_op {
 	IORING_REGISTER_MEM_REGION		= 34,
 
+	/* add channel(s) between rings */
+	IORING_REGISTER_ADD_CHAN		= 35,
+
 	/* this goes last */
 	IORING_REGISTER_LAST,
 
@@ -727,6 +730,23 @@ struct io_uring_mem_region_reg {
 	__u64 __resv[2];
 };
 
+enum {
+	/*
+	 * Setup a communication channel that includes a response channel
+	 * as well. Messages sent over this link will include the response
+	 * queue ID in the posted CQE, so that the receiver can send a
+	 * response back to the originator.
+	 */
+	IORING_CHAN_REG_BIDI	= 0x1,
+};
+
+struct io_uring_chan_reg {
+	__u32	flags;
+	__u32	dst_fd;
+	__u32	nentries;
+	__u32	resv[7];
+};
+
 /*
  * Register a fully sparse file space, rather than pass in an array of all
  * -1 file descriptors.
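[Not part of the patch: a hedged sketch of how userspace might invoke the
new opcode, based only on the uapi additions above. liburing has no helper
for this opcode, so the raw io_uring_register syscall is used; the function
name and entry count are hypothetical.]

/*
 * Hypothetical userspace sketch: register a bidirectional channel from
 * 'src_fd' to 'dst_fd'. Both rings must have been created with
 * IORING_SETUP_DEFER_TASKRUN and IORING_SETUP_CQE32. On success, the
 * return value is the source channel ID.
 */
#include <string.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <linux/io_uring.h>

static int register_chan(int src_fd, int dst_fd, unsigned int nentries)
{
	struct io_uring_chan_reg reg;

	memset(&reg, 0, sizeof(reg));
	reg.flags = IORING_CHAN_REG_BIDI;	/* also create a response channel */
	reg.dst_fd = dst_fd;			/* fd of the target ring */
	reg.nentries = nentries;		/* must be a power-of-2 */

	/* nr_args must be 1, per the register.c dispatch further below */
	return syscall(__NR_io_uring_register, src_fd,
		       IORING_REGISTER_ADD_CHAN, &reg, 1);
}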
diff --git a/io_uring/Makefile b/io_uring/Makefile
index b3f1bd492804..19c447765a27 100644
--- a/io_uring/Makefile
+++ b/io_uring/Makefile
@@ -13,7 +13,7 @@ obj-$(CONFIG_IO_URING)		+= io_uring.o opdef.o kbuf.o rsrc.o notif.o \
 					sync.o msg_ring.o advise.o openclose.o \
 					statx.o timeout.o cancel.o \
 					waitid.o register.o truncate.o \
-					memmap.o alloc_cache.o
+					memmap.o alloc_cache.o chan.o
 obj-$(CONFIG_IO_URING_ZCRX) += zcrx.o
 obj-$(CONFIG_IO_WQ) += io-wq.o
 obj-$(CONFIG_FUTEX) += futex.o
diff --git a/io_uring/chan.c b/io_uring/chan.c
new file mode 100644
index 000000000000..b4f16aec2ffa
--- /dev/null
+++ b/io_uring/chan.c
@@ -0,0 +1,183 @@
+// SPDX-License-Identifier: GPL-2.0
+#include <linux/kernel.h>
+#include <linux/errno.h>
+#include <linux/file.h>
+#include <linux/slab.h>
+#include <linux/io_uring.h>
+
+#include <uapi/linux/io_uring.h>
+
+#include "io_uring.h"
+#include "register.h"
+#include "chan.h"
+
+/*
+ * ctx1 is already locked on entry, both will be locked on return.
+ */
+static void io_ctx_double_lock(struct io_ring_ctx *ctx1,
+			       struct io_ring_ctx *ctx2)
+{
+	if (ctx1 < ctx2) {
+		mutex_lock_nested(&ctx2->uring_lock, SINGLE_DEPTH_NESTING);
+	} else {
+		mutex_unlock(&ctx1->uring_lock);
+		mutex_lock(&ctx2->uring_lock);
+		mutex_lock_nested(&ctx1->uring_lock, SINGLE_DEPTH_NESTING);
+	}
+}
+
+void io_unregister_queue_chans(struct io_ring_ctx *ctx)
+{
+	struct io_queue_chan *c;
+	unsigned long index;
+
+	lockdep_assert_held(&ctx->uring_lock);
+
+	xa_for_each(&ctx->xa_src_chan, index, c) {
+		if (atomic_dec_and_test(&c->refs))
+			kfree_rcu(c, rcu_head);
+	}
+	xa_for_each(&ctx->xa_dst_chan, index, c) {
+		if (atomic_dec_and_test(&c->refs))
+			kfree_rcu(c, rcu_head);
+	}
+	xa_destroy(&ctx->xa_src_chan);
+	xa_destroy(&ctx->xa_dst_chan);
+}
+
+struct chan_ids {
+	__u32 src_id;
+	__u32 dst_id;
+};
+
+static struct io_queue_chan *__io_register_queue_chan(struct io_ring_ctx *ctx,
+						      struct io_ring_ctx *dst,
+						      struct io_uring_chan_reg *chan,
+						      struct chan_ids *ids)
+{
+	struct xa_limit lim = { .max = SHRT_MAX, .min = 0 };
+	struct io_queue_chan *c;
+	size_t chan_size;
+	int ret;
+
+	if (percpu_ref_is_dying(&dst->refs))
+		return ERR_PTR(-ENXIO);
+
+	chan_size = struct_size(c, data, chan->nentries);
+	if (chan_size == SIZE_MAX || chan_size > KMALLOC_MAX_SIZE)
+		return ERR_PTR(-EOVERFLOW);
+
+	c = kzalloc(chan_size, GFP_KERNEL_ACCOUNT);
+	if (!c)
+		return ERR_PTR(-ENOMEM);
+
+	/*
+	 * One ref for each ring that is attached to an endpoint. Having refs
+	 * != 2 then also means that one end has detached and the channel
+	 * can be considered dead.
+	 */
+	atomic_set(&c->refs, 2);
+	c->nentries = chan->nentries;
+	c->mask = chan->nentries - 1;
+
+	ret = xa_alloc(&ctx->xa_src_chan, &ids->src_id, c, lim, GFP_KERNEL_ACCOUNT);
+	if (ret) {
+		kfree_rcu(c, rcu_head);
+		return ERR_PTR(ret);
+	}
+
+	ret = xa_alloc(&dst->xa_dst_chan, &ids->dst_id, c, lim, GFP_KERNEL_ACCOUNT);
+	if (ret) {
+		xa_erase(&ctx->xa_src_chan, ids->src_id);
+		kfree_rcu(c, rcu_head);
+		return ERR_PTR(ret);
+	}
+
+	return c;
+}
+
+static void io_chan_free(struct io_ring_ctx *ctx, struct io_ring_ctx *dst,
+			 struct chan_ids *ids)
+{
+	struct io_queue_chan *c;
+
+	c = xa_erase(&ctx->xa_src_chan, ids->src_id);
+	xa_erase(&dst->xa_dst_chan, ids->dst_id);
+	percpu_ref_put(&dst->refs);
+	atomic_sub(2, &c->refs);
+	kfree_rcu(c, rcu_head);
+}
+
+static bool valid_ring_flags(struct io_ring_ctx *ctx)
+{
+	/*
+	 * Must be DEFER_TASKRUN (could be relaxed) and CQE32 to be able to
+	 * send enough data.
+	 */
+	if ((ctx->flags & (IORING_SETUP_DEFER_TASKRUN|IORING_SETUP_CQE32)) !=
+	    (IORING_SETUP_DEFER_TASKRUN|IORING_SETUP_CQE32))
+		return false;
+	return true;
+}
+
+int io_register_add_queue_chan(struct io_ring_ctx *ctx, void __user *arg)
+{
+	struct chan_ids ids1 = { }, ids2 = { };
+	struct io_uring_chan_reg chan;
+	struct io_queue_chan *c;
+	struct io_ring_ctx *dst;
+	struct file *file;
+	int ret;
+
+	lockdep_assert_held(&ctx->uring_lock);
+
+	if (copy_from_user(&chan, arg, sizeof(chan)))
+		return -EFAULT;
+	if (chan.flags & ~IORING_CHAN_REG_BIDI)
+		return -EINVAL;
+	if (!is_power_of_2(chan.nentries))
+		return -EINVAL;
+	if (memchr_inv(&chan.resv, 0, sizeof(chan.resv)))
+		return -EINVAL;
+
+	file = io_uring_register_get_file(chan.dst_fd, false);
+	if (IS_ERR(file))
+		return PTR_ERR(file);
+	dst = file->private_data;
+	if (dst == ctx) {
+		ret = -EINVAL;
+		goto err;
+	}
+	if (!valid_ring_flags(dst)) {
+		ret = -EINVAL;
+		goto err;
+	}
+	if (chan.flags & IORING_CHAN_REG_BIDI && !valid_ring_flags(ctx)) {
+		ret = -EINVAL;
+		goto err;
+	}
+
+	io_ctx_double_lock(ctx, dst);
+	c = __io_register_queue_chan(ctx, dst, &chan, &ids1);
+	if (IS_ERR(c)) {
+		ret = PTR_ERR(c);
+		goto unlock;
+	}
+	if (chan.flags & IORING_CHAN_REG_BIDI) {
+		struct io_queue_chan *c2;
+
+		c2 = __io_register_queue_chan(dst, ctx, &chan, &ids2);
+		if (IS_ERR(c2)) {
+			ret = PTR_ERR(c2);
+			io_chan_free(ctx, dst, &ids1);
+			goto unlock;
+		}
+		c->resp_id = ids2.src_id;
+	}
+	ret = ids1.src_id;
+unlock:
+	mutex_unlock(&dst->uring_lock);
+err:
+	fput(file);
+	return ret;
+}
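[Not part of the patch: io_ctx_double_lock() above follows the standard
deadlock-avoidance rule of acquiring the two uring_locks in ascending
address order, dropping and re-taking ctx1's lock when it sorts second. A
minimal standalone sketch of the same pattern, illustrative only:]

/* Minimal sketch of address-ordered locking; not from the patch. */
static void double_lock(struct mutex *m1, struct mutex *m2)
{
	if (m1 > m2)
		swap(m1, m2);
	mutex_lock(m1);
	/* same lock class: annotate the second acquisition for lockdep */
	mutex_lock_nested(m2, SINGLE_DEPTH_NESTING);
}

[Two tasks pairing the same two rings in opposite directions then always
take the locks in the same order, so ABBA deadlock cannot occur; the
_nested annotation only silences lockdep's same-class warning and does not
change locking behavior.]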
diff --git a/io_uring/chan.h b/io_uring/chan.h
new file mode 100644
index 000000000000..cea16b5d2f4e
--- /dev/null
+++ b/io_uring/chan.h
@@ -0,0 +1,20 @@
+// SPDX-License-Identifier: GPL-2.0
+struct io_queue_chan_entry {
+	struct io_uring_cqe cqes[2];
+};
+
+struct io_queue_chan {
+	struct {
+		atomic_t refs;
+		__u32 head;
+	} ____cacheline_aligned_in_smp;
+	__u32 nentries;
+	__u32 mask;
+	__u32 tail;
+	__u32 resp_id;
+	struct rcu_head rcu_head;
+	struct io_queue_chan_entry data[];
+};
+
+int io_register_add_queue_chan(struct io_ring_ctx *ctx, void __user *arg);
+void io_unregister_queue_chans(struct io_ring_ctx *ctx);
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 922796855336..41ea45ecc080 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -346,6 +346,8 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 		goto free_ref;
 	init_completion(&ctx->ref_comp);
 	xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1);
+	xa_init_flags(&ctx->xa_src_chan, XA_FLAGS_ALLOC1);
+	xa_init_flags(&ctx->xa_dst_chan, XA_FLAGS_ALLOC1);
 	mutex_init(&ctx->uring_lock);
 	init_waitqueue_head(&ctx->cq_wait);
 	init_waitqueue_head(&ctx->poll_wq);
@@ -3097,6 +3099,7 @@ static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
 	percpu_ref_kill(&ctx->refs);
 	xa_for_each(&ctx->personalities, index, creds)
 		io_unregister_personality(ctx, index);
+	io_unregister_queue_chans(ctx);
 	mutex_unlock(&ctx->uring_lock);
 
 	flush_delayed_work(&ctx->fallback_work);
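[Not part of the patch: the produce/consume paths are not included here,
but the io_queue_chan layout in chan.h above implies conventional
single-producer/single-consumer ring indexing. head sits in its own
cacheline-aligned group apart from the producer-side tail, both are
free-running counters, and the power-of-2 mask maps them to slots; each
slot holds two 16-byte CQEs, i.e. one 32-byte CQE as enforced by
valid_ring_flags(). A hypothetical sketch of the source side claiming a
slot; function name and memory-ordering details are assumptions:]

/* Hypothetical producer-side indexing; not part of this patch. */
static struct io_queue_chan_entry *chan_claim_slot(struct io_queue_chan *c)
{
	/* single producer: 'tail' is only advanced by the source ring */
	if (c->tail - READ_ONCE(c->head) >= c->nentries)
		return NULL;	/* channel is full */
	return &c->data[c->tail & c->mask];
}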
diff --git a/io_uring/register.c b/io_uring/register.c
index a1a9b2884eae..2c8a14b87d7d 100644
--- a/io_uring/register.c
+++ b/io_uring/register.c
@@ -31,6 +31,7 @@
 #include "msg_ring.h"
 #include "memmap.h"
 #include "zcrx.h"
+#include "chan.h"
 
 #define IORING_MAX_RESTRICTIONS	(IORING_RESTRICTION_LAST + \
 				 IORING_REGISTER_LAST + IORING_OP_LAST)
@@ -836,6 +837,12 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
 			break;
 		ret = io_register_mem_region(ctx, arg);
 		break;
+	case IORING_REGISTER_ADD_CHAN:
+		ret = -EINVAL;
+		if (!arg || nr_args != 1)
+			break;
+		ret = io_register_add_queue_chan(ctx, arg);
+		break;
 	default:
 		ret = -EINVAL;
 		break;
diff --git a/io_uring/register.h b/io_uring/register.h
index a5f39d5ef9e0..8b40ea183d9d 100644
--- a/io_uring/register.h
+++ b/io_uring/register.h
@@ -5,5 +5,6 @@ int io_eventfd_unregister(struct io_ring_ctx *ctx);
 int io_unregister_personality(struct io_ring_ctx *ctx, unsigned id);
 
 struct file *io_uring_register_get_file(unsigned int fd, bool registered);
+void io_unregister_queue_chans(struct io_ring_ctx *ctx);
 
 #endif
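[Not part of the patch: for completeness, a hedged sketch of what the
target ring's consumer side might look like with liburing. Channel
messages surface as ordinary completions on a CQE32 ring, where each CQE
carries 16 extra payload bytes in big_cqe[]; the exact field encoding for
channel messages is not defined by this patch, so the payload handling
below is a placeholder.]

#include <liburing.h>
#include <stdio.h>

/* Illustrative only: drain one completion from a CQE32 target ring. */
static int reap_one(struct io_uring *ring)
{
	struct io_uring_cqe *cqe;
	int ret;

	ret = io_uring_wait_cqe(ring, &cqe);
	if (ret < 0)
		return ret;

	/* with IORING_SETUP_CQE32, big_cqe[0..1] hold the extra payload */
	printf("res=%d user_data=%llu extra=%llu/%llu\n", cqe->res,
	       (unsigned long long) cqe->user_data,
	       (unsigned long long) cqe->big_cqe[0],
	       (unsigned long long) cqe->big_cqe[1]);

	io_uring_cqe_seen(ring, cqe);
	return 0;
}

[The target ring itself would be created with io_uring_queue_init(entries,
&ring, IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN |
IORING_SETUP_CQE32), since DEFER_TASKRUN additionally requires
SINGLE_ISSUER.]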