From: Jens Axboe
Date: Thu, 24 Jul 2025 18:41:30 +0000 (-0600)
Subject: io_uring/chan: add support for IORING_OP_CHAN_POST
X-Git-Url: https://git.kernel.dk/?a=commitdiff_plain;h=11e8bd03794364ebfc3e3603334896e76e6fa976;p=linux-block.git

io_uring/chan: add support for IORING_OP_CHAN_POST

If a ring has set up a unidirectional communication channel to another
ring, it can use IORING_OP_CHAN_POST to efficiently send data from one
ring to the other.

The destination ring must be set up with DEFER_TASKRUN and be able to
post 32b CQEs. The former is for efficiency reasons, the latter is
needed to pass enough information in an IORING_OP_CHAN_POST request.

On the source ring, the SQE must be set up as follows:

->fd		Queue to target. Queue IDs are 1..USHRT_MAX and returned
		from io_uring_register() with IORING_OP_REGISTER_CHAN.
->rw_flags	Modifier flags. Supports IORING_CHAN_POST_IDLE for now,
		which picks a queue whose target ring is currently idle
		(e.g. waiting on CQEs). If that flag is set, ->fd must be
		set to zero.
->addr		Target ring cqe->user_data will be set to this value.
->off		Target ring cqe->big_cqe[0] will be set to this value.

On the source ring, cqe->res will be set to < 0 in case of error. On
success, cqe->res will be set to the target queue ID that received the
message.

On the target, cqe->res will be the queue ID to use for a response, if
the communication channel has been set up with IORING_CHAN_REG_BIDI. If
the channel is non-bidi, then the result will be 0.

(A userspace usage sketch is appended after the diff below.)

Signed-off-by: Jens Axboe
---

diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index d03b93b14b28..d5966520074c 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -357,6 +357,7 @@ struct io_ring_ctx {
 		struct llist_head	retry_llist;
 		unsigned long		check_cq;
 		atomic_t		cq_wait_nr;
+		atomic_t		chan_flags;
 		atomic_t		cq_timeouts;
 		struct wait_queue_head	cq_wait;
 	} ____cacheline_aligned_in_smp;
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index f36d59051399..a6ce632b170f 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -295,6 +295,7 @@ enum io_uring_op {
 	IORING_OP_READV_FIXED,
 	IORING_OP_WRITEV_FIXED,
 	IORING_OP_PIPE,
+	IORING_OP_CHAN_POST,
 
 	/* this goes last, obviously */
 	IORING_OP_LAST,
@@ -462,6 +463,15 @@ enum io_uring_msg_ring_flags {
 #define IORING_NOP_TW		(1U << 4)
 #define IORING_NOP_CQE32	(1U << 5)
 
+/*
+ * IORING_OP_CHAN_POST flags (sqe->rw_flags)
+ *
+ * IORING_CHAN_POST_IDLE	Rather than target a specific queue via
+ *				the sqe->fd field, find any idle queue
+ *				and post it there.
+ */
+#define IORING_CHAN_POST_IDLE	0x1
+
 /*
  * IO completion data structure (Completion Queue Entry)
  */
diff --git a/io_uring/chan.c b/io_uring/chan.c
index e205bfd1f286..ebda7179544f 100644
--- a/io_uring/chan.c
+++ b/io_uring/chan.c
@@ -11,6 +11,13 @@
 #include "register.h"
 #include "chan.h"
 
+struct io_chan_post {
+	struct file *file;
+	unsigned int queue;
+	unsigned int flags;
+	struct io_uring_cqe cqes[2];
+};
+
 /*
  * ctx1 is already locked on entry, both will be locked on return.
 */
@@ -35,16 +42,18 @@ void io_unregister_queue_chans(struct io_ring_ctx *ctx)
 
 	rcu_read_lock();
 	xa_for_each(&ctx->xa_src_chan, index, c) {
+		atomic_or(RING_CHAN_DEAD, &c->flags);
 		if (atomic_dec_and_test(&c->refs))
-			kfree_rcu(c, rcu_head);
+			kfree_rcu(c, req.rcu_head);
 	}
 	xa_for_each(&ctx->xa_dst_chan, index, c) {
+		atomic_or(RING_CHAN_DEAD, &c->flags);
 		if (rcu_dereference(c->dst_ring) == ctx) {
 			percpu_ref_put(&ctx->refs);
 			rcu_assign_pointer(c->dst_ring, NULL);
 		}
 		if (atomic_dec_and_test(&c->refs))
-			kfree_rcu(c, rcu_head);
+			kfree_rcu(c, req.rcu_head);
 	}
 	rcu_read_unlock();
 	xa_destroy(&ctx->xa_src_chan);
@@ -85,17 +94,18 @@ static struct io_queue_chan *__io_register_queue_chan(struct io_ring_ctx *ctx,
 	atomic_set(&c->refs, 2);
 	c->nentries = chan->nentries;
 	c->mask = chan->nentries - 1;
+	c->req.ctx = dst;
 
 	ret = xa_alloc(&ctx->xa_src_chan, &ids->src_id, c, lim, GFP_KERNEL_ACCOUNT);
 	if (ret) {
-		kfree_rcu(c, rcu_head);
+		kfree_rcu(c, req.rcu_head);
 		return ERR_PTR(ret);
 	}
 	ret = xa_alloc(&dst->xa_dst_chan, &ids->dst_id, c, lim, GFP_KERNEL_ACCOUNT);
 	if (ret) {
 		xa_erase(&ctx->xa_src_chan, ids->src_id);
-		kfree_rcu(c, rcu_head);
+		kfree_rcu(c, req.rcu_head);
 		return ERR_PTR(ret);
 	}
 
@@ -113,17 +123,18 @@ static void io_chan_free(struct io_ring_ctx *ctx, struct io_ring_ctx *dst,
 	xa_erase(&dst->xa_dst_chan, ids->dst_id);
 	percpu_ref_put(&dst->refs);
 	atomic_sub(2, &c->refs);
-	kfree_rcu(c, rcu_head);
+	kfree_rcu(c, req.rcu_head);
 }
 
 static bool valid_ring_flags(struct io_ring_ctx *ctx)
 {
 	/*
-	 * Must be DEFER_TASKRUN (could be relaxed) and CQE32 to be able to
-	 * send enough data.
+	 * Must be DEFER_TASKRUN (could be relaxed) and be able to post 32b
+	 * CQEs.
 	 */
-	if ((ctx->flags & (IORING_SETUP_DEFER_TASKRUN|IORING_SETUP_CQE32)) !=
-	    (IORING_SETUP_DEFER_TASKRUN|IORING_SETUP_CQE32))
+	if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
+		return false;
+	if (!(ctx->flags & (IORING_SETUP_CQE32|IORING_SETUP_CQE_MIXED)))
 		return false;
 	return true;
 }
@@ -189,3 +200,182 @@ err:
 	fput(file);
 	return ret;
 }
+
+static void io_flush_chan(struct io_ring_ctx *ctx, struct io_queue_chan *c)
+{
+	u32 tail, head = c->head;
+
+	tail = smp_load_acquire(&c->tail);
+	if (tail == head)
+		return;
+
+	if (atomic_read(&c->flags) & RING_CHAN_OVERFLOW)
+		return;
+
+	while (head < tail) {
+		struct io_queue_chan_entry *e = &c->data[head & c->mask];
+
+		/*
+		 * If we fail posting a CQE, mark this ring as needing to
+		 * ignore channel postings until overflow has been cleared.
+		 * Overflow clearing will clear IO_CHECK_IGNORE_CHAN_BIT as
+		 * well.
+ */ + if (!io_add_aux_cqe32(ctx, e->cqes)) { + atomic_or(RING_CHAN_OVERFLOW, &c->flags); + break; + } + head++; + } + smp_store_release(&c->head, head); +} + +static void io_flush_chans(struct io_ring_ctx *ctx) +{ + struct io_queue_chan *c; + unsigned long index; + + xa_for_each(&ctx->xa_dst_chan, index, c) + io_flush_chan(ctx, c); +} + +static void io_chan_tw(struct io_kiocb *req, io_tw_token_t tw) +{ + struct io_queue_chan *c = container_of(req, struct io_queue_chan, req); + struct io_ring_ctx *ctx = req->ctx; + + atomic_fetch_andnot_acquire(1, &ctx->chan_flags); + io_flush_chans(ctx); + percpu_ref_put(&ctx->refs); + if (atomic_dec_and_test(&c->refs)) + kfree_rcu(c, req.rcu_head); +} + +static void io_chan_tw_queue(struct io_ring_ctx *ctx, struct io_queue_chan *c) +{ + struct io_kiocb *req = &c->req; + + if (atomic_fetch_or(1, &ctx->chan_flags)) + return; + req->io_task_work.func = io_chan_tw; + percpu_ref_get(&ctx->refs); + atomic_inc(&c->refs); + io_req_task_work_add_remote(req, 0); +} + +void io_chan_clear_overflow(struct io_ring_ctx *ctx) +{ + struct io_queue_chan *c; + unsigned long index; + + rcu_read_lock(); + xa_for_each(&ctx->xa_dst_chan, index, c) { + struct io_ring_ctx *dst_ctx = rcu_dereference(c->dst_ring); + + atomic_andnot(RING_CHAN_OVERFLOW, &c->flags); + if (dst_ctx) + io_chan_tw_queue(dst_ctx, c); + } + rcu_read_unlock(); +} + +static struct io_queue_chan *io_chan_find_idle(struct io_ring_ctx *ctx) +{ + struct io_queue_chan *c; + unsigned long index; + + xa_for_each(&ctx->xa_src_chan, index, c) { + struct io_ring_ctx *dst_ctx = rcu_dereference(c->dst_ring); + + if (!dst_ctx) + continue; + + if (c->head != c->tail) + continue; + + /* + * Not 100% reliable, but should be good enough. It'll find + * a task waiting for io_uring events, which is what we + * care about. Could be combined with TASK_INTERRUPTIBLE + * ->submitter_task check for higher accuracy. 
+ */ + if (atomic_read(&dst_ctx->cq_wait_nr) <= 0) + continue; + return c; + } + + return NULL; +} + +int io_chan_post_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) +{ + struct io_chan_post *icp = io_kiocb_to_cmd(req, struct io_chan_post); + + if (sqe->len || sqe->personality || sqe->splice_fd_in | sqe->addr3) + return -EINVAL; + + icp->queue = READ_ONCE(sqe->fd); + icp->flags = READ_ONCE(sqe->rw_flags); + if (icp->flags & ~IORING_CHAN_POST_IDLE) + return -EINVAL; + + icp->cqes->user_data = READ_ONCE(sqe->addr); + icp->cqes->flags = 0; + icp->cqes->big_cqe[0] = READ_ONCE(sqe->off); + icp->cqes->big_cqe[1] = 0; + return 0; +} + +int io_chan_post(struct io_kiocb *req, unsigned int issue_flags) +{ + struct io_chan_post *icp = io_kiocb_to_cmd(req, struct io_chan_post); + struct io_ring_ctx *dst_ctx, *ctx = req->ctx; + struct io_queue_chan_entry *e; + struct task_struct *task; + struct io_queue_chan *c; + __u32 head, tail; + int ret; + + io_ring_submit_lock(ctx, issue_flags); + rcu_read_lock(); + if (icp->flags & IORING_CHAN_POST_IDLE) + c = io_chan_find_idle(ctx); + else + c = xa_load(&ctx->xa_src_chan, icp->queue); + + if (unlikely(!c || atomic_read(&c->flags) & RING_CHAN_DEAD)) { + ret = -ENXIO; + goto err; + } + /* ours must be the source end of the channel */ + dst_ctx = rcu_dereference(c->dst_ring); + if (unlikely(!dst_ctx)) + goto is_dead; + task = READ_ONCE(dst_ctx->submitter_task); + if (unlikely(!task)) { +is_dead: + ret = -EOWNERDEAD; + goto err; + } + + head = smp_load_acquire(&c->head); + tail = c->tail; + if (tail - head >= c->nentries) { + ret = -EXFULL; + goto err; + } + /* fill in entry */ + e = &c->data[tail & c->mask]; + icp->cqes->res = c->resp_id; + memcpy(e->cqes, icp->cqes, sizeof(icp->cqes)); + smp_store_release(&c->tail, tail + 1); + io_chan_tw_queue(dst_ctx, c); + ret = 0; +err: + rcu_read_unlock(); + io_ring_submit_unlock(ctx, issue_flags); + if (ret < 0) + req_set_fail(req); + io_req_set_res(req, ret, 0); + return IOU_COMPLETE; +} diff --git a/io_uring/chan.h b/io_uring/chan.h index a8dc962fc61f..d09b449e6d46 100644 --- a/io_uring/chan.h +++ b/io_uring/chan.h @@ -3,19 +3,31 @@ struct io_queue_chan_entry { struct io_uring_cqe cqes[2]; }; +enum { + /* channel destination ring in overflow */ + RING_CHAN_OVERFLOW = 1, + /* other end went away */ + RING_CHAN_DEAD = 2, +}; + struct io_queue_chan { struct { atomic_t refs; __u32 head; + struct io_kiocb req; } ____cacheline_aligned_in_smp; __u32 nentries; __u32 mask; __u32 tail; __u32 resp_id; + atomic_t flags; struct io_ring_ctx __rcu *dst_ring; - struct rcu_head rcu_head; struct io_queue_chan_entry data[]; }; int io_register_add_queue_chan(struct io_ring_ctx *ctx, void __user *arg); void io_unregister_queue_chans(struct io_ring_ctx *ctx); +void io_chan_clear_overflow(struct io_ring_ctx *ctx); + +int io_chan_post_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe); +int io_chan_post(struct io_kiocb *req, unsigned int issue_flags); diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index 41ea45ecc080..98a56e89deff 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -98,6 +98,7 @@ #include "msg_ring.h" #include "memmap.h" #include "zcrx.h" +#include "chan.h" #include "timeout.h" #include "poll.h" @@ -648,6 +649,7 @@ static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool dying) if (list_empty(&ctx->cq_overflow_list)) { clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq); + io_chan_clear_overflow(ctx); atomic_andnot(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags); } 
 	io_cq_unlock_post(ctx);
diff --git a/io_uring/opdef.c b/io_uring/opdef.c
index 9568785810d9..9427cd31a3a9 100644
--- a/io_uring/opdef.c
+++ b/io_uring/opdef.c
@@ -38,6 +38,7 @@
 #include "futex.h"
 #include "truncate.h"
 #include "zcrx.h"
+#include "chan.h"
 
 static int io_no_issue(struct io_kiocb *req, unsigned int issue_flags)
 {
@@ -574,6 +575,11 @@ const struct io_issue_def io_issue_defs[] = {
 		.prep			= io_pipe_prep,
 		.issue			= io_pipe,
 	},
+	[IORING_OP_CHAN_POST] = {
+		.prep			= io_chan_post_prep,
+		.issue			= io_chan_post,
+		.audit_skip		= 1,
+	},
 };
 
 const struct io_cold_def io_cold_defs[] = {
@@ -824,6 +830,9 @@ const struct io_cold_def io_cold_defs[] = {
 	[IORING_OP_PIPE] = {
 		.name			= "PIPE",
 	},
+	[IORING_OP_CHAN_POST] = {
+		.name			= "CHAN_POST",
+	},
 };
 
 const char *io_uring_get_opcode(u8 opcode)
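
For reference, below is a minimal userspace sketch of the source-ring side of
IORING_OP_CHAN_POST; it is not part of the patch above. It assumes liburing
for the SQE/CQE plumbing, uapi headers that already define the new
IORING_OP_CHAN_POST opcode and IORING_CHAN_POST_IDLE flag, and a queue_id
previously obtained from io_uring_register() with IORING_OP_REGISTER_CHAN
(registration is not shown). The chan_post() helper and its parameters are
illustrative names only; there is no liburing prep helper for this opcode, so
the SQE fields documented in the commit message are filled in by hand.

#include <errno.h>
#include <string.h>
#include <liburing.h>

static int chan_post(struct io_uring *src, unsigned int queue_id,
		     __u64 user_data, __u64 payload)
{
	struct io_uring_sqe *sqe;
	struct io_uring_cqe *cqe;
	int ret;

	sqe = io_uring_get_sqe(src);
	if (!sqe)
		return -EBUSY;

	/* No io_uring_prep helper for this opcode yet, fill the SQE manually */
	memset(sqe, 0, sizeof(*sqe));
	sqe->opcode = IORING_OP_CHAN_POST;
	sqe->fd = (int) queue_id;	/* target queue ID, 1..USHRT_MAX */
	sqe->rw_flags = 0;		/* or IORING_CHAN_POST_IDLE, with ->fd set to 0 */
	sqe->addr = user_data;		/* becomes cqe->user_data on the target ring */
	sqe->off = payload;		/* becomes cqe->big_cqe[0] on the target ring */

	ret = io_uring_submit_and_wait(src, 1);
	if (ret < 0)
		return ret;

	ret = io_uring_peek_cqe(src, &cqe);
	if (ret < 0)
		return ret;

	ret = cqe->res;			/* < 0 on error */
	io_uring_cqe_seen(src, cqe);
	return ret;
}

On the target ring, which must be created with IORING_SETUP_DEFER_TASKRUN and
32b CQE support, the message then arrives as a CQE whose user_data and
big_cqe[0] carry the two values passed via ->addr and ->off above, and whose
res holds the response queue ID if the channel was registered with
IORING_CHAN_REG_BIDI, or 0 otherwise.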