#include "register.h"
#include "chan.h"
+struct io_chan_post {
+ struct file *file;
+ unsigned int queue;
+ unsigned int flags;
+ struct io_uring_cqe cqes[2];
+};
+
/*
* ctx1 is already locked on entry, both will be locked on return.
*/
rcu_read_lock();
xa_for_each(&ctx->xa_src_chan, index, c) {
+ atomic_or(RING_CHAN_DEAD, &c->flags);
if (atomic_dec_and_test(&c->refs))
- kfree_rcu(c, rcu_head);
+ kfree_rcu(c, req.rcu_head);
}
xa_for_each(&ctx->xa_dst_chan, index, c) {
+ atomic_or(RING_CHAN_DEAD, &c->flags);
if (rcu_dereference(c->dst_ring) == ctx) {
percpu_ref_put(&ctx->refs);
rcu_assign_pointer(c->dst_ring, NULL);
}
if (atomic_dec_and_test(&c->refs))
- kfree_rcu(c, rcu_head);
+ kfree_rcu(c, req.rcu_head);
}
rcu_read_unlock();
xa_destroy(&ctx->xa_src_chan);
atomic_set(&c->refs, 2);
c->nentries = chan->nentries;
c->mask = chan->nentries - 1;
+ c->req.ctx = dst;
ret = xa_alloc(&ctx->xa_src_chan, &ids->src_id, c, lim, GFP_KERNEL_ACCOUNT);
if (ret) {
- kfree_rcu(c, rcu_head);
+ kfree_rcu(c, req.rcu_head);
return ERR_PTR(ret);
}
ret = xa_alloc(&dst->xa_dst_chan, &ids->dst_id, c, lim, GFP_KERNEL_ACCOUNT);
if (ret) {
xa_erase(&ctx->xa_src_chan, ids->src_id);
- kfree_rcu(c, rcu_head);
+ kfree_rcu(c, req.rcu_head);
return ERR_PTR(ret);
}
xa_erase(&dst->xa_dst_chan, ids->dst_id);
percpu_ref_put(&dst->refs);
atomic_sub(2, &c->refs);
- kfree_rcu(c, rcu_head);
+ kfree_rcu(c, req.rcu_head);
}
static bool valid_ring_flags(struct io_ring_ctx *ctx)
{
/*
- * Must be DEFER_TASKRUN (could be relaxed) and CQE32 to be able to
- * send enough data.
+ * Must be DEFER_TASKRUN (could be relaxed) and be able to post
+ * 32-byte CQEs.
*/
- if ((ctx->flags & (IORING_SETUP_DEFER_TASKRUN|IORING_SETUP_CQE32)) !=
- (IORING_SETUP_DEFER_TASKRUN|IORING_SETUP_CQE32))
+ if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
+ return false;
+ if (!(ctx->flags & (IORING_SETUP_CQE32|IORING_SETUP_CQE_MIXED)))
return false;
return true;
}
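/*
 * Example (not part of this patch): a userspace ring setup that satisfies
 * valid_ring_flags() above, using liburing. IORING_SETUP_SINGLE_ISSUER is
 * included because DEFER_TASKRUN requires it. A sketch only.
 */
#include <liburing.h>

static int setup_chan_capable_ring(struct io_uring *ring)
{
	struct io_uring_params p = { };

	/* DEFER_TASKRUN plus 32-byte CQEs; per valid_ring_flags(),
	 * IORING_SETUP_CQE_MIXED would satisfy the CQE-size check too. */
	p.flags = IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN |
		  IORING_SETUP_CQE32;

	return io_uring_queue_init_params(8, ring, &p);
}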
fput(file);
return ret;
}
+
+static void io_flush_chan(struct io_ring_ctx *ctx, struct io_queue_chan *c)
+{
+ u32 tail, head = c->head;
+
+ tail = smp_load_acquire(&c->tail);
+ if (tail == head)
+ return;
+
+ if (atomic_read(&c->flags) & RING_CHAN_OVERFLOW)
+ return;
+
+ while (head != tail) {
+ struct io_queue_chan_entry *e = &c->data[head & c->mask];
+
+ /*
+ * If we fail posting a CQE, mark the channel as overflowed and
+ * skip flushing it until the overflow has been cleared.
+ * io_chan_clear_overflow() clears RING_CHAN_OVERFLOW and
+ * re-queues the flush.
+ */
+ if (!io_add_aux_cqe32(ctx, e->cqes)) {
+ atomic_or(RING_CHAN_OVERFLOW, &c->flags);
+ break;
+ }
+ head++;
+ }
+ smp_store_release(&c->head, head);
+}
+
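/*
 * For reference (not part of this patch): the channel is a single-producer,
 * single-consumer ring with free-running 32-bit indices and a power-of-two
 * mask, paired via smp_store_release()/smp_load_acquire(). A condensed
 * standalone sketch of that convention, with illustrative names (kernel
 * context assumed for u32, the smp_*() helpers and EXFULL):
 */
struct toy_chan {
	u32 head;		/* consumer-owned, published with release */
	u32 tail;		/* producer-owned, published with release */
	u32 mask;		/* nentries - 1, nentries a power of two */
	unsigned long data[];
};

/* producer side, mirroring the fill in io_chan_post() */
static int toy_post(struct toy_chan *c, unsigned long v)
{
	u32 head = smp_load_acquire(&c->head);
	u32 tail = c->tail;

	if (tail - head > c->mask)
		return -EXFULL;		/* ring full */
	c->data[tail & c->mask] = v;
	smp_store_release(&c->tail, tail + 1);
	return 0;
}

/* consumer side, mirroring io_flush_chan() */
static bool toy_reap(struct toy_chan *c, unsigned long *v)
{
	u32 tail = smp_load_acquire(&c->tail);
	u32 head = c->head;

	if (head == tail)
		return false;
	*v = c->data[head & c->mask];
	smp_store_release(&c->head, head + 1);
	return true;
}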
+static void io_flush_chans(struct io_ring_ctx *ctx)
+{
+ struct io_queue_chan *c;
+ unsigned long index;
+
+ xa_for_each(&ctx->xa_dst_chan, index, c)
+ io_flush_chan(ctx, c);
+}
+
+static void io_chan_tw(struct io_kiocb *req, io_tw_token_t tw)
+{
+ struct io_queue_chan *c = container_of(req, struct io_queue_chan, req);
+ struct io_ring_ctx *ctx = req->ctx;
+
+ atomic_fetch_andnot_acquire(1, &ctx->chan_flags);
+ io_flush_chans(ctx);
+ percpu_ref_put(&ctx->refs);
+ if (atomic_dec_and_test(&c->refs))
+ kfree_rcu(c, req.rcu_head);
+}
+
+static void io_chan_tw_queue(struct io_ring_ctx *ctx, struct io_queue_chan *c)
+{
+ struct io_kiocb *req = &c->req;
+
+ if (atomic_fetch_or(1, &ctx->chan_flags))
+ return;
+ req->io_task_work.func = io_chan_tw;
+ percpu_ref_get(&ctx->refs);
+ atomic_inc(&c->refs);
+ io_req_task_work_add_remote(req, 0);
+}
+
+void io_chan_clear_overflow(struct io_ring_ctx *ctx)
+{
+ struct io_queue_chan *c;
+ unsigned long index;
+
+ rcu_read_lock();
+ xa_for_each(&ctx->xa_dst_chan, index, c) {
+ struct io_ring_ctx *dst_ctx = rcu_dereference(c->dst_ring);
+
+ atomic_andnot(RING_CHAN_OVERFLOW, &c->flags);
+ if (dst_ctx)
+ io_chan_tw_queue(dst_ctx, c);
+ }
+ rcu_read_unlock();
+}
+
+static struct io_queue_chan *io_chan_find_idle(struct io_ring_ctx *ctx)
+{
+ struct io_queue_chan *c;
+ unsigned long index;
+
+ xa_for_each(&ctx->xa_src_chan, index, c) {
+ struct io_ring_ctx *dst_ctx = rcu_dereference(c->dst_ring);
+
+ if (!dst_ctx)
+ continue;
+
+ if (c->head != c->tail)
+ continue;
+
+ /*
+ * Not 100% reliable, but should be good enough. It'll find
+ * a task waiting for io_uring events, which is what we
+ * care about. Could be combined with TASK_INTERRUPTIBLE
+ * ->submitter_task check for higher accuracy.
+ */
+ if (atomic_read(&dst_ctx->cq_wait_nr) <= 0)
+ continue;
+ return c;
+ }
+
+ return NULL;
+}
+
+int io_chan_post_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
+{
+ struct io_chan_post *icp = io_kiocb_to_cmd(req, struct io_chan_post);
+
+ if (sqe->len || sqe->personality || sqe->splice_fd_in || sqe->addr3)
+ return -EINVAL;
+
+ icp->queue = READ_ONCE(sqe->fd);
+ icp->flags = READ_ONCE(sqe->rw_flags);
+ if (icp->flags & ~IORING_CHAN_POST_IDLE)
+ return -EINVAL;
+
+ icp->cqes->user_data = READ_ONCE(sqe->addr);
+ icp->cqes->flags = 0;
+ icp->cqes->big_cqe[0] = READ_ONCE(sqe->off);
+ icp->cqes->big_cqe[1] = 0;
+ return 0;
+}
+
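/*
 * Example (not part of this patch): filling the SQE for a channel post from
 * userspace, following the field mapping in io_chan_post_prep() above. The
 * opcode name IORING_OP_CHAN_POST is a placeholder assumption; the field
 * layout itself comes from the prep code.
 */
#include <string.h>
#include <liburing.h>

static void prep_chan_post_sqe(struct io_uring_sqe *sqe, int queue_id,
			       __u64 user_data, __u64 payload,
			       unsigned int post_flags)
{
	/* len, personality, splice_fd_in and addr3 must stay zero */
	memset(sqe, 0, sizeof(*sqe));
	sqe->opcode = IORING_OP_CHAN_POST;	/* hypothetical opcode name */
	sqe->fd = queue_id;		/* ->queue: source-side channel id */
	sqe->addr = user_data;		/* CQE user_data on the destination ring */
	sqe->off = payload;		/* carried in big_cqe[0] */
	sqe->rw_flags = post_flags;	/* 0 or IORING_CHAN_POST_IDLE */
}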
+int io_chan_post(struct io_kiocb *req, unsigned int issue_flags)
+{
+ struct io_chan_post *icp = io_kiocb_to_cmd(req, struct io_chan_post);
+ struct io_ring_ctx *dst_ctx, *ctx = req->ctx;
+ struct io_queue_chan_entry *e;
+ struct task_struct *task;
+ struct io_queue_chan *c;
+ u32 head, tail;
+ int ret;
+
+ io_ring_submit_lock(ctx, issue_flags);
+ rcu_read_lock();
+ if (icp->flags & IORING_CHAN_POST_IDLE)
+ c = io_chan_find_idle(ctx);
+ else
+ c = xa_load(&ctx->xa_src_chan, icp->queue);
+
+ if (unlikely(!c || atomic_read(&c->flags) & RING_CHAN_DEAD)) {
+ ret = -ENXIO;
+ goto err;
+ }
+ /* ours must be the source end of the channel */
+ dst_ctx = rcu_dereference(c->dst_ring);
+ if (unlikely(!dst_ctx))
+ goto is_dead;
+ task = READ_ONCE(dst_ctx->submitter_task);
+ if (unlikely(!task)) {
+is_dead:
+ ret = -EOWNERDEAD;
+ goto err;
+ }
+
+ head = smp_load_acquire(&c->head);
+ tail = c->tail;
+ if (tail - head >= c->nentries) {
+ ret = -EXFULL;
+ goto err;
+ }
+ /* fill in entry */
+ e = &c->data[tail & c->mask];
+ icp->cqes->res = c->resp_id;
+ memcpy(e->cqes, icp->cqes, sizeof(icp->cqes));
+ smp_store_release(&c->tail, tail + 1);
+ io_chan_tw_queue(dst_ctx, c);
+ ret = 0;
+err:
+ rcu_read_unlock();
+ io_ring_submit_unlock(ctx, issue_flags);
+ if (ret < 0)
+ req_set_fail(req);
+ io_req_set_res(req, ret, 0);
+ return IOU_COMPLETE;
+}
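/*
 * Example (not part of this patch): reaping a channel posting on the
 * destination ring with liburing. Per the entry fill in io_chan_post(),
 * user_data comes from sqe->addr on the source ring, res carries the
 * channel's resp_id, and big_cqe[0] carries the sqe->off payload. Assumes
 * the destination ring was created with IORING_SETUP_CQE32.
 */
#include <stdio.h>
#include <liburing.h>

static int reap_chan_post(struct io_uring *dst_ring)
{
	struct io_uring_cqe *cqe;
	int ret;

	ret = io_uring_wait_cqe(dst_ring, &cqe);
	if (ret)
		return ret;

	printf("user_data=%llu resp_id=%d payload=%llu\n",
	       (unsigned long long)cqe->user_data, cqe->res,
	       (unsigned long long)cqe->big_cqe[0]);
	io_uring_cqe_seen(dst_ring, cqe);
	return 0;
}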