io_uring/register: add support for ring -> ring channels
authorJens Axboe <axboe@kernel.dk>
Thu, 24 Jul 2025 17:22:47 +0000 (11:22 -0600)
committerJens Axboe <axboe@kernel.dk>
Sat, 9 Aug 2025 14:38:27 +0000 (08:38 -0600)
Add support for a unidirectional communication channel from one ring to
another. While IORING_OP_MSG_RING already supports some notion of
communicating between rings, it works by submitting task_work between
rings. This isn't very efficient, and is also a bit wonky in how
io_kiocb requests need to be faked to accomplish that goal.

An io_uring channel is a single unidirectional ring, with a specific
single producer (the source ring) and a specific single consumer (the
target ring). Messages added to the target ring will treated like
task_work in that it'll be checked for like local task_work, and flushed
like local task_work. Anything added to the channel will be turned into
CQEs posted on the target ring CQ ring.

If IORING_CHAN_REG_BIDI is set in the registration flags, then two
channels will be setup - one in each direction. This allows for
responses from one channel to be posted on the other channel.

IORING_REGISTER_ADD_CHAN adds a new channel (or pair of channels),
and takes a pointer to the following structure as the argument:

struct io_uring_chan_reg {
__u32 flags;
__u32 dst_fd;
__u32 nentries;
__u32 resv[7];
};

where 'flags' must be 0 or set to IORING_CHAN_REG_BIDI, 'dst_fd' is the
file descriptor for the target ring, and 'nentries' is the number of
entries that the channel should be able to hold. Channel messages are
flushed like task_work, and hence generally a lot won't be required.
'nentries' must be a power-of-2 value.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
include/linux/io_uring_types.h
include/uapi/linux/io_uring.h
io_uring/Makefile
io_uring/chan.c [new file with mode: 0644]
io_uring/chan.h [new file with mode: 0644]
io_uring/io_uring.c
io_uring/register.c
io_uring/register.h

index 6f4080ec968ea971b5483869cc53092f2e3687b1..d03b93b14b2890055b710048be9b771025612805 100644 (file)
@@ -371,6 +371,14 @@ struct io_ring_ctx {
 
        spinlock_t              completion_lock;
 
+       /*
+        * Communication channels, if any. xa_dst_chan are channels registered
+        * where this ring is the destination/target, and xa_src_chan are
+        * channels where this ring is the source/sender.
+        */
+       struct xarray           xa_dst_chan;
+       struct xarray           xa_src_chan;
+
        struct list_head        cq_overflow_list;
 
        struct hlist_head       waitid_list;
index 33b386f43d47cdd090efa19c36d88da7f719847e..f36d59051399dd951b1cbc97618733f6ecf74f27 100644 (file)
@@ -682,6 +682,9 @@ enum io_uring_register_op {
 
        IORING_REGISTER_MEM_REGION              = 34,
 
+       /* add channel(s) between rings */
+       IORING_REGISTER_ADD_CHAN                = 35,
+
        /* this goes last */
        IORING_REGISTER_LAST,
 
@@ -727,6 +730,23 @@ struct io_uring_mem_region_reg {
        __u64 __resv[2];
 };
 
+enum {
+       /*
+        * Setup a communication channel that includes a response channel
+        * as well. Messages sent over this link will include the response
+        * queue ID in the posted CQE, so that the receiver can send a
+        * response back to the originator.
+        */
+       IORING_CHAN_REG_BIDI    = 0x1,
+};
+
+struct io_uring_chan_reg {
+       __u32 flags;
+       __u32 dst_fd;
+       __u32 nentries;
+       __u32 resv[7];
+};
+
 /*
  * Register a fully sparse file space, rather than pass in an array of all
  * -1 file descriptors.
index b3f1bd492804b01dee2b1b075a5f9a4653e2d595..19c447765a27351899e0fe6a6ec8e5397a3ae658 100644 (file)
@@ -13,7 +13,7 @@ obj-$(CONFIG_IO_URING)                += io_uring.o opdef.o kbuf.o rsrc.o notif.o \
                                        sync.o msg_ring.o advise.o openclose.o \
                                        statx.o timeout.o cancel.o \
                                        waitid.o register.o truncate.o \
-                                       memmap.o alloc_cache.o
+                                       memmap.o alloc_cache.o chan.o
 obj-$(CONFIG_IO_URING_ZCRX)    += zcrx.o
 obj-$(CONFIG_IO_WQ)            += io-wq.o
 obj-$(CONFIG_FUTEX)            += futex.o
diff --git a/io_uring/chan.c b/io_uring/chan.c
new file mode 100644 (file)
index 0000000..b4f16ae
--- /dev/null
@@ -0,0 +1,183 @@
+// SPDX-License-Identifier: GPL-2.0
+#include <linux/kernel.h>
+#include <linux/errno.h>
+#include <linux/fs.h>
+#include <linux/file.h>
+#include <linux/io_uring.h>
+
+#include <uapi/linux/io_uring.h>
+
+#include "io_uring.h"
+#include "register.h"
+#include "chan.h"
+
+/*
+ * ctx1 is already locked on entry, both will be locked on return.
+ */
+static void io_ctx_double_lock(struct io_ring_ctx *ctx1,
+                              struct io_ring_ctx *ctx2)
+{
+       if (ctx1 < ctx2) {
+               mutex_lock_nested(&ctx2->uring_lock, SINGLE_DEPTH_NESTING);
+       } else {
+               mutex_unlock(&ctx1->uring_lock);
+               mutex_lock(&ctx2->uring_lock);
+               mutex_lock_nested(&ctx1->uring_lock, SINGLE_DEPTH_NESTING);
+       }
+}
+
+void io_unregister_queue_chans(struct io_ring_ctx *ctx)
+{
+       struct io_queue_chan *c;
+       unsigned long index;
+
+       lockdep_assert_held(&ctx->uring_lock);
+
+       xa_for_each(&ctx->xa_src_chan, index, c) {
+               if (atomic_dec_and_test(&c->refs))
+                       kfree_rcu(c, rcu_head);
+       }
+       xa_for_each(&ctx->xa_dst_chan, index, c) {
+               if (atomic_dec_and_test(&c->refs))
+                       kfree_rcu(c, rcu_head);
+       }
+       xa_destroy(&ctx->xa_src_chan);
+       xa_destroy(&ctx->xa_dst_chan);
+}
+
+struct chan_ids {
+       __u32 src_id;
+       __u32 dst_id;
+};
+
+static struct io_queue_chan *__io_register_queue_chan(struct io_ring_ctx *ctx,
+                                                     struct io_ring_ctx *dst,
+                                                     struct io_uring_chan_reg *chan,
+                                                     struct chan_ids *ids)
+{
+       struct xa_limit lim = { .max = SHRT_MAX, .min = 0 };
+       struct io_queue_chan *c;
+       size_t chan_size;
+       int ret;
+
+       if (percpu_ref_is_dying(&dst->refs))
+               return ERR_PTR(-ENXIO);
+
+       chan_size = struct_size(c, data, chan->nentries);
+       if (chan_size == SIZE_MAX || chan_size > KMALLOC_MAX_SIZE)
+               return ERR_PTR(-EOVERFLOW);
+
+       c = kzalloc(chan_size, GFP_KERNEL_ACCOUNT);
+       if (!c)
+               return ERR_PTR(-ENOMEM);
+
+       /*
+        * One ref for each ring that is attached to an endpoint. Having refs
+        * != 2 then also means that one end has detached and the channel
+        * can be considered dead.
+        */
+       atomic_set(&c->refs, 2);
+       c->nentries = chan->nentries;
+       c->mask = chan->nentries - 1;
+
+       ret = xa_alloc(&ctx->xa_src_chan, &ids->src_id, c, lim, GFP_KERNEL_ACCOUNT);
+       if (ret) {
+               kfree_rcu(c, rcu_head);
+               return ERR_PTR(ret);
+       }
+
+       ret = xa_alloc(&dst->xa_dst_chan, &ids->dst_id, c, lim, GFP_KERNEL_ACCOUNT);
+       if (ret) {
+               xa_erase(&ctx->xa_src_chan, ids->src_id);
+               kfree_rcu(c, rcu_head);
+               return ERR_PTR(ret);
+       }
+
+       return c;
+}
+
+static void io_chan_free(struct io_ring_ctx *ctx, struct io_ring_ctx *dst,
+                        struct chan_ids *ids)
+{
+       struct io_queue_chan *c;
+
+       c = xa_erase(&ctx->xa_src_chan, ids->src_id);
+       xa_erase(&dst->xa_dst_chan, ids->dst_id);
+       percpu_ref_put(&dst->refs);
+       atomic_sub(2, &c->refs);
+       kfree_rcu(c, rcu_head);
+}
+
+static bool valid_ring_flags(struct io_ring_ctx *ctx)
+{
+       /*
+        * Must be DEFER_TASKRUN (could be relaxed) and CQE32 to be able to
+        * send enough data.
+        */
+       if ((ctx->flags & (IORING_SETUP_DEFER_TASKRUN|IORING_SETUP_CQE32)) !=
+           (IORING_SETUP_DEFER_TASKRUN|IORING_SETUP_CQE32))
+               return false;
+       return true;
+}
+
+int io_register_add_queue_chan(struct io_ring_ctx *ctx, void __user *arg)
+{
+       struct chan_ids ids1 = { }, ids2 = { };
+       struct io_uring_chan_reg chan;
+       struct io_queue_chan *c;
+       struct io_ring_ctx *dst;
+       struct file *file;
+       int ret;
+
+       lockdep_assert_held(&ctx->uring_lock);
+
+       if (copy_from_user(&chan, arg, sizeof(chan)))
+               return -EFAULT;
+       if (chan.flags & ~IORING_CHAN_REG_BIDI)
+               return -EINVAL;
+       if (!is_power_of_2(chan.nentries))
+               return -EINVAL;
+       if (memchr_inv(&chan.resv, 0, sizeof(chan.resv)))
+               return -EINVAL;
+
+       file = io_uring_register_get_file(chan.dst_fd, false);
+       if (IS_ERR(file))
+               return PTR_ERR(file);
+       dst = file->private_data;
+       if (dst == ctx) {
+               ret = -EINVAL;
+               goto err;
+       }
+       if (!valid_ring_flags(dst)) {
+               ret = -EINVAL;
+               goto err;
+       }
+       if (chan.flags & IORING_CHAN_REG_BIDI && !valid_ring_flags(ctx)) {
+               ret = -EINVAL;
+               goto err;
+       }
+
+       io_ctx_double_lock(ctx, dst);
+       c = __io_register_queue_chan(ctx, dst, &chan, &ids1);
+       if (IS_ERR(c)) {
+               ret = PTR_ERR(c);
+               goto unlock;
+       }
+       if (chan.flags & IORING_CHAN_REG_BIDI) {
+               struct io_queue_chan *c2;
+
+               c2 = __io_register_queue_chan(dst, ctx, &chan, &ids2);
+               if (IS_ERR(c2)) {
+                       ret = PTR_ERR(c2);
+                       io_chan_free(ctx, dst, &ids1);
+                       goto unlock;
+               }
+               c->resp_id = ids2.src_id;
+       }
+       ret = ids1.src_id;
+unlock:
+       mutex_unlock(&dst->uring_lock);
+err:
+       fput(file);
+       return ret;
+}
diff --git a/io_uring/chan.h b/io_uring/chan.h
new file mode 100644 (file)
index 0000000..cea16b5
--- /dev/null
@@ -0,0 +1,20 @@
+// SPDX-License-Identifier: GPL-2.0
+struct io_queue_chan_entry {
+       struct io_uring_cqe     cqes[2];
+};
+
+struct io_queue_chan {
+       struct {
+               atomic_t                refs;
+               __u32                   head;
+       } ____cacheline_aligned_in_smp;
+       __u32                           nentries;
+       __u32                           mask;
+       __u32                           tail;
+       __u32                           resp_id;
+       struct rcu_head                 rcu_head;
+       struct io_queue_chan_entry      data[];
+};
+
+int io_register_add_queue_chan(struct io_ring_ctx *ctx, void __user *arg);
+void io_unregister_queue_chans(struct io_ring_ctx *ctx);
index 92279685533667a54d7d813383287804dd145a41..41ea45ecc080605dd3433693bfb8fca5a17e9d93 100644 (file)
@@ -346,6 +346,8 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
                goto free_ref;
        init_completion(&ctx->ref_comp);
        xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1);
+       xa_init_flags(&ctx->xa_src_chan, XA_FLAGS_ALLOC1);
+       xa_init_flags(&ctx->xa_dst_chan, XA_FLAGS_ALLOC1);
        mutex_init(&ctx->uring_lock);
        init_waitqueue_head(&ctx->cq_wait);
        init_waitqueue_head(&ctx->poll_wq);
@@ -3097,6 +3099,7 @@ static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
        percpu_ref_kill(&ctx->refs);
        xa_for_each(&ctx->personalities, index, creds)
                io_unregister_personality(ctx, index);
+       io_unregister_queue_chans(ctx);
        mutex_unlock(&ctx->uring_lock);
 
        flush_delayed_work(&ctx->fallback_work);
index a1a9b2884eae5e76e03c4535e8c935788d9bca3e..2c8a14b87d7d5a328e22acdf79a0430f3cfb2cf6 100644 (file)
@@ -31,6 +31,7 @@
 #include "msg_ring.h"
 #include "memmap.h"
 #include "zcrx.h"
+#include "chan.h"
 
 #define IORING_MAX_RESTRICTIONS        (IORING_RESTRICTION_LAST + \
                                 IORING_REGISTER_LAST + IORING_OP_LAST)
@@ -836,6 +837,12 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
                        break;
                ret = io_register_mem_region(ctx, arg);
                break;
+       case IORING_REGISTER_ADD_CHAN:
+               ret = -EINVAL;
+               if (!arg || nr_args != 1)
+                       break;
+               ret = io_register_add_queue_chan(ctx, arg);
+               break;
        default:
                ret = -EINVAL;
                break;
index a5f39d5ef9e0e4588134c6b50ae4905430f8e1d1..8b40ea183d9d7d2b14758f4fcd77c32ee6f14978 100644 (file)
@@ -5,5 +5,6 @@
 int io_eventfd_unregister(struct io_ring_ctx *ctx);
 int io_unregister_personality(struct io_ring_ctx *ctx, unsigned id);
 struct file *io_uring_register_get_file(unsigned int fd, bool registered);
+void io_unregister_queue_chans(struct io_ring_ctx *ctx);
 
 #endif