io_uring/chan: add support for IORING_OP_CHAN_POST
author    Jens Axboe <axboe@kernel.dk>
          Thu, 24 Jul 2025 18:41:30 +0000 (12:41 -0600)
committer Jens Axboe <axboe@kernel.dk>
          Sat, 9 Aug 2025 14:38:27 +0000 (08:38 -0600)
If a ring has set up a unidirectional communication channel between two
rings, it can use IORING_OP_CHAN_POST to efficiently send data from one
ring to the other.

The destination ring must be set up with DEFER_TASKRUN and be able to
post 32b CQEs. The former is required for efficiency reasons, the latter
to be able to pass enough information in an IORING_OP_CHAN_POST request.
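
For illustration only, a destination ring that meets these requirements
could be created with liburing roughly as follows (a sketch, error
handling omitted; note that DEFER_TASKRUN also requires SINGLE_ISSUER):

#include <liburing.h>

static int setup_dst_ring(struct io_uring *ring)
{
	struct io_uring_params p = { };

	/* DEFER_TASKRUN needs SINGLE_ISSUER; CQE32 enables 32b CQEs */
	p.flags = IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN |
		  IORING_SETUP_CQE32;

	return io_uring_queue_init_params(64, ring, &p);
}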

On the source ring, the SQE must be set up as follows (a userspace sketch
is included below the field list):

->fd        Queue to target. Queue IDs are 1..USHRT_MAX and returned
            from io_uring_register() with IORING_OP_REGISTER_CHAN.
->rw_flags  Modifier flags. Supports IORING_CHAN_POST_IDLE for now,
            which picks any registered queue whose destination ring is
            currently idle (e.g. waiting on CQEs). If that flag is set,
            ->fd must be set to zero.
->addr      The target ring's cqe->user_data will be set to this value.
->off       The target ring's cqe->big_cqe[0] will be set to this value.
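
A minimal sketch of preparing such an SQE by hand (liburing has no prep
helper for this opcode; the NULL check on the SQE and the registration
step are omitted, and queue_id is assumed to come from the earlier
channel registration):

#include <liburing.h>
#include <string.h>

static void prep_chan_post(struct io_uring *ring, unsigned int queue_id,
			   __u64 msg_tag, __u64 msg_payload)
{
	struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

	memset(sqe, 0, sizeof(*sqe));
	sqe->opcode = IORING_OP_CHAN_POST;
	sqe->fd = queue_id;	/* target queue ID, or 0 with POST_IDLE */
	sqe->addr = msg_tag;	/* becomes cqe->user_data on the target */
	sqe->off = msg_payload;	/* becomes cqe->big_cqe[0] on the target */
	/* sqe->rw_flags = IORING_CHAN_POST_IDLE; to pick any idle queue */
	io_uring_sqe_set_data64(sqe, msg_tag);	/* tag for the local CQE */
}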

On the source ring, cqe->res will be negative in case of error. On
success, cqe->res will be set to the target queue ID that received the
message.

On the target, cqe->res will be the queue ID to use for a response, if
the communication channel has been set up with IORING_CHAN_REG_BIDI.
If the channel is non-bidi, then the result will be 0.
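
On the target side, a completion could be consumed roughly as in the
sketch below (illustrative only; handle_chan_cqe is a hypothetical
helper and the reply path is left out):

#include <liburing.h>

static void handle_chan_cqe(struct io_uring *ring, struct io_uring_cqe *cqe)
{
	__u64 tag = cqe->user_data;		/* the sender's sqe->addr */
	__u64 payload = cqe->big_cqe[0];	/* the sender's sqe->off */
	int resp_queue = cqe->res;	/* response queue ID if bidi, else 0 */

	/* ... act on tag/payload, optionally reply via resp_queue ... */
	(void)tag; (void)payload; (void)resp_queue;
	io_uring_cqe_seen(ring, cqe);
}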

Signed-off-by: Jens Axboe <axboe@kernel.dk>
include/linux/io_uring_types.h
include/uapi/linux/io_uring.h
io_uring/chan.c
io_uring/chan.h
io_uring/io_uring.c
io_uring/opdef.c

diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index d03b93b14b2890055b710048be9b771025612805..d5966520074c74d46544b0511164e692849c0226 100644
@@ -357,6 +357,7 @@ struct io_ring_ctx {
                struct llist_head       retry_llist;
                unsigned long           check_cq;
                atomic_t                cq_wait_nr;
+               atomic_t                chan_flags;
                atomic_t                cq_timeouts;
                struct wait_queue_head  cq_wait;
        } ____cacheline_aligned_in_smp;
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index f36d59051399dd951b1cbc97618733f6ecf74f27..a6ce632b170f01e7faf64ccf771629b5bbe4969e 100644
@@ -295,6 +295,7 @@ enum io_uring_op {
        IORING_OP_READV_FIXED,
        IORING_OP_WRITEV_FIXED,
        IORING_OP_PIPE,
+       IORING_OP_CHAN_POST,
 
        /* this goes last, obviously */
        IORING_OP_LAST,
@@ -462,6 +463,15 @@ enum io_uring_msg_ring_flags {
 #define IORING_NOP_TW                  (1U << 4)
 #define IORING_NOP_CQE32               (1U << 5)
 
+/*
+ * IORING_OP_CHAN_POST flags (sqe->rw_flags)
+ *
+ * IORING_CHAN_POST_IDLE       Rather than target a specific queue via
+ *                             the sqe->fd field, find any idle queue
+ *                             and post it there.
+ */
+#define IORING_CHAN_POST_IDLE  0x1
+
 /*
  * IO completion data structure (Completion Queue Entry)
  */
diff --git a/io_uring/chan.c b/io_uring/chan.c
index e205bfd1f2861705d7ec583d59b0fc2ffdacb340..ebda7179544fe49509b76af653cfd5df7e26100c 100644
 #include "register.h"
 #include "chan.h"
 
+struct io_chan_post {
+       struct file             *file;
+       unsigned int            queue;
+       unsigned int            flags;
+       struct io_uring_cqe     cqes[2];
+};
+
 /*
  * ctx1 is already locked on entry, both will be locked on return.
  */
@@ -35,16 +42,18 @@ void io_unregister_queue_chans(struct io_ring_ctx *ctx)
 
        rcu_read_lock();
        xa_for_each(&ctx->xa_src_chan, index, c) {
+               atomic_or(RING_CHAN_DEAD, &c->flags);
                if (atomic_dec_and_test(&c->refs))
-                       kfree_rcu(c, rcu_head);
+                       kfree_rcu(c, req.rcu_head);
        }
        xa_for_each(&ctx->xa_dst_chan, index, c) {
+               atomic_or(RING_CHAN_DEAD, &c->flags);
                if (rcu_dereference(c->dst_ring) == ctx) {
                        percpu_ref_put(&ctx->refs);
                        rcu_assign_pointer(c->dst_ring, NULL);
                }
                if (atomic_dec_and_test(&c->refs))
-                       kfree_rcu(c, rcu_head);
+                       kfree_rcu(c, req.rcu_head);
        }
        rcu_read_unlock();
        xa_destroy(&ctx->xa_src_chan);
@@ -85,17 +94,18 @@ static struct io_queue_chan *__io_register_queue_chan(struct io_ring_ctx *ctx,
        atomic_set(&c->refs, 2);
        c->nentries = chan->nentries;
        c->mask = chan->nentries - 1;
+       c->req.ctx = dst;
 
        ret = xa_alloc(&ctx->xa_src_chan, &ids->src_id, c, lim, GFP_KERNEL_ACCOUNT);
        if (ret) {
-               kfree_rcu(c, rcu_head);
+               kfree_rcu(c, req.rcu_head);
                return ERR_PTR(ret);
        }
 
        ret = xa_alloc(&dst->xa_dst_chan, &ids->dst_id, c, lim, GFP_KERNEL_ACCOUNT);
        if (ret) {
                xa_erase(&ctx->xa_src_chan, ids->src_id);
-               kfree_rcu(c, rcu_head);
+               kfree_rcu(c, req.rcu_head);
                return ERR_PTR(ret);
        }
 
@@ -113,17 +123,18 @@ static void io_chan_free(struct io_ring_ctx *ctx, struct io_ring_ctx *dst,
        xa_erase(&dst->xa_dst_chan, ids->dst_id);
        percpu_ref_put(&dst->refs);
        atomic_sub(2, &c->refs);
-       kfree_rcu(c, rcu_head);
+       kfree_rcu(c, req.rcu_head);
 }
 
 static bool valid_ring_flags(struct io_ring_ctx *ctx)
 {
        /*
-        * Must be DEFER_TASKRUN (could be relaxed) and CQE32 to be able to
-        * send enough data.
+        * Must be DEFER_TASKRUN (could be relaxed) and be able to post 32b
+        * CQEs.
         */
-       if ((ctx->flags & (IORING_SETUP_DEFER_TASKRUN|IORING_SETUP_CQE32)) !=
-           (IORING_SETUP_DEFER_TASKRUN|IORING_SETUP_CQE32))
+       if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
+               return false;
+       if (!(ctx->flags & (IORING_SETUP_CQE32|IORING_SETUP_CQE_MIXED)))
                return false;
        return true;
 }
@@ -189,3 +200,182 @@ err:
        fput(file);
        return ret;
 }
+
+static void io_flush_chan(struct io_ring_ctx *ctx, struct io_queue_chan *c)
+{
+       u32 tail, head = c->head;
+
+       tail = smp_load_acquire(&c->tail);
+       if (tail == head)
+               return;
+
+       if (atomic_read(&c->flags) & RING_CHAN_OVERFLOW)
+               return;
+
+       while (head < tail) {
+               struct io_queue_chan_entry *e = &c->data[head & c->mask];
+
+               /*
+                * If we fail posting a CQE, mark this channel as being in
+                * overflow and stop flushing it until the CQ overflow has
+                * been cleared. io_chan_clear_overflow() will clear
+                * RING_CHAN_OVERFLOW and re-queue the flush.
+                */
+               if (!io_add_aux_cqe32(ctx, e->cqes)) {
+                       atomic_or(RING_CHAN_OVERFLOW, &c->flags);
+                       break;
+               }
+               head++;
+       }
+       smp_store_release(&c->head, head);
+}
+
+static void io_flush_chans(struct io_ring_ctx *ctx)
+{
+       struct io_queue_chan *c;
+       unsigned long index;
+
+       xa_for_each(&ctx->xa_dst_chan, index, c)
+               io_flush_chan(ctx, c);
+}
+
+static void io_chan_tw(struct io_kiocb *req, io_tw_token_t tw)
+{
+       struct io_queue_chan *c = container_of(req, struct io_queue_chan, req);
+       struct io_ring_ctx *ctx = req->ctx;
+
+       atomic_fetch_andnot_acquire(1, &ctx->chan_flags);
+       io_flush_chans(ctx);
+       percpu_ref_put(&ctx->refs);
+       if (atomic_dec_and_test(&c->refs))
+               kfree_rcu(c, req.rcu_head);
+}
+
+static void io_chan_tw_queue(struct io_ring_ctx *ctx, struct io_queue_chan *c)
+{
+       struct io_kiocb *req = &c->req;
+
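+       /* Only queue one flush task_work per destination ring at a time */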
+       if (atomic_fetch_or(1, &ctx->chan_flags))
+               return;
+       req->io_task_work.func = io_chan_tw;
+       percpu_ref_get(&ctx->refs);
+       atomic_inc(&c->refs);
+       io_req_task_work_add_remote(req, 0);
+}
+
+void io_chan_clear_overflow(struct io_ring_ctx *ctx)
+{
+       struct io_queue_chan *c;
+       unsigned long index;
+
+       rcu_read_lock();
+       xa_for_each(&ctx->xa_dst_chan, index, c) {
+               struct io_ring_ctx *dst_ctx = rcu_dereference(c->dst_ring);
+
+               atomic_andnot(RING_CHAN_OVERFLOW, &c->flags);
+               if (dst_ctx)
+                       io_chan_tw_queue(dst_ctx, c);
+       }
+       rcu_read_unlock();
+}
+
+static struct io_queue_chan *io_chan_find_idle(struct io_ring_ctx *ctx)
+{
+       struct io_queue_chan *c;
+       unsigned long index;
+
+       xa_for_each(&ctx->xa_src_chan, index, c) {
+               struct io_ring_ctx *dst_ctx = rcu_dereference(c->dst_ring);
+
+               if (!dst_ctx)
+                       continue;
+
+               if (c->head != c->tail)
+                       continue;
+
+               /*
+                * Not 100% reliable, but should be good enough. It'll find
+                * a task waiting for io_uring events, which is what we
+                * care about. Could be combined with a TASK_INTERRUPTIBLE
+                * check on ->submitter_task for higher accuracy.
+                */
+               if (atomic_read(&dst_ctx->cq_wait_nr) <= 0)
+                       continue;
+               return c;
+       }
+
+       return NULL;
+}
+
+int io_chan_post_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
+{
+       struct io_chan_post *icp = io_kiocb_to_cmd(req, struct io_chan_post);
+
+       if (sqe->len || sqe->personality || sqe->splice_fd_in || sqe->addr3)
+               return -EINVAL;
+
+       icp->queue = READ_ONCE(sqe->fd);
+       icp->flags = READ_ONCE(sqe->rw_flags);
+       if (icp->flags & ~IORING_CHAN_POST_IDLE)
+               return -EINVAL;
+
+       icp->cqes->user_data = READ_ONCE(sqe->addr);
+       icp->cqes->flags = 0;
+       icp->cqes->big_cqe[0] = READ_ONCE(sqe->off);
+       icp->cqes->big_cqe[1] = 0;
+       return 0;
+}
+
+int io_chan_post(struct io_kiocb *req, unsigned int issue_flags)
+{
+       struct io_chan_post *icp = io_kiocb_to_cmd(req, struct io_chan_post);
+       struct io_ring_ctx *dst_ctx, *ctx = req->ctx;
+       struct io_queue_chan_entry *e;
+       struct task_struct *task;
+       struct io_queue_chan *c;
+       __u32 head, tail;
+       int ret;
+
+       io_ring_submit_lock(ctx, issue_flags);
+       rcu_read_lock();
+       if (icp->flags & IORING_CHAN_POST_IDLE)
+               c = io_chan_find_idle(ctx);
+       else
+               c = xa_load(&ctx->xa_src_chan, icp->queue);
+
+       if (unlikely(!c || atomic_read(&c->flags) & RING_CHAN_DEAD)) {
+               ret = -ENXIO;
+               goto err;
+       }
+       /* ours must be the source end of the channel */
+       dst_ctx = rcu_dereference(c->dst_ring);
+       if (unlikely(!dst_ctx))
+               goto is_dead;
+       task = READ_ONCE(dst_ctx->submitter_task);
+       if (unlikely(!task)) {
+is_dead:
+               ret = -EOWNERDEAD;
+               goto err;
+       }
+
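+       /*
+        * We are the only producer for this channel, serialized by the
+        * source ring lock. ->head is advanced by the consumer with a
+        * release store, paired with the acquire load here.
+        */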
+       head = smp_load_acquire(&c->head);
+       tail = c->tail;
+       if (tail - head >= c->nentries) {
+               ret = -EXFULL;
+               goto err;
+       }
+       /* fill in entry */
+       e = &c->data[tail & c->mask];
+       icp->cqes->res = c->resp_id;
+       memcpy(e->cqes, icp->cqes, sizeof(icp->cqes));
+       smp_store_release(&c->tail, tail + 1);
+       io_chan_tw_queue(dst_ctx, c);
+       ret = 0;
+err:
+       rcu_read_unlock();
+       io_ring_submit_unlock(ctx, issue_flags);
+       if (ret < 0)
+               req_set_fail(req);
+       io_req_set_res(req, ret, 0);
+       return IOU_COMPLETE;
+}
diff --git a/io_uring/chan.h b/io_uring/chan.h
index a8dc962fc61f4363643335acd81c4497d0e1833c..d09b449e6d46e3d462bd1e73f14123c44eb49410 100644
@@ -3,19 +3,31 @@ struct io_queue_chan_entry {
        struct io_uring_cqe     cqes[2];
 };
 
+enum {
+       /* channel destination ring in overflow */
+       RING_CHAN_OVERFLOW      = 1,
+       /* other end went away */
+       RING_CHAN_DEAD          = 2,
+};
+
 struct io_queue_chan {
        struct {
                atomic_t                refs;
                __u32                   head;
+               struct io_kiocb         req;
        } ____cacheline_aligned_in_smp;
        __u32                           nentries;
        __u32                           mask;
        __u32                           tail;
        __u32                           resp_id;
+       atomic_t                        flags;
        struct io_ring_ctx __rcu        *dst_ring;
-       struct rcu_head                 rcu_head;
        struct io_queue_chan_entry      data[];
 };
 
 int io_register_add_queue_chan(struct io_ring_ctx *ctx, void __user *arg);
 void io_unregister_queue_chans(struct io_ring_ctx *ctx);
+void io_chan_clear_overflow(struct io_ring_ctx *ctx);
+
+int io_chan_post_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
+int io_chan_post(struct io_kiocb *req, unsigned int issue_flags);
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 41ea45ecc080605dd3433693bfb8fca5a17e9d93..98a56e89deff69a2a7a6c1af3ba119a7264be3fb 100644
@@ -98,6 +98,7 @@
 #include "msg_ring.h"
 #include "memmap.h"
 #include "zcrx.h"
+#include "chan.h"
 
 #include "timeout.h"
 #include "poll.h"
@@ -648,6 +649,7 @@ static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool dying)
 
        if (list_empty(&ctx->cq_overflow_list)) {
                clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
+               io_chan_clear_overflow(ctx);
                atomic_andnot(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);
        }
        io_cq_unlock_post(ctx);
diff --git a/io_uring/opdef.c b/io_uring/opdef.c
index 9568785810d9e498233a0ae02a15c31fa91ab5a8..9427cd31a3a97d358fba7cb4a449acbd03b0bc24 100644
@@ -38,6 +38,7 @@
 #include "futex.h"
 #include "truncate.h"
 #include "zcrx.h"
+#include "chan.h"
 
 static int io_no_issue(struct io_kiocb *req, unsigned int issue_flags)
 {
@@ -574,6 +575,11 @@ const struct io_issue_def io_issue_defs[] = {
                .prep                   = io_pipe_prep,
                .issue                  = io_pipe,
        },
+       [IORING_OP_CHAN_POST] = {
+               .prep                   = io_chan_post_prep,
+               .issue                  = io_chan_post,
+               .audit_skip             = 1,
+       },
 };
 
 const struct io_cold_def io_cold_defs[] = {
@@ -824,6 +830,9 @@ const struct io_cold_def io_cold_defs[] = {
        [IORING_OP_PIPE] = {
                .name                   = "PIPE",
        },
+       [IORING_OP_CHAN_POST] = {
+               .name                   = "CHAN_POST",
+       },
 };
 
 const char *io_uring_get_opcode(u8 opcode)