Merge tag 'integrity-v6.4' of git://git.kernel.org/pub/scm/linux/kernel/git/zohar...
[linux-block.git] / io_uring / io_uring.c
index 4a865f0e85d0b8116c6ef3bd37b5f6db4af29835..3bca7a79efda4c40c7eb384a84582ec01bc5c3cb 100644 (file)
@@ -72,6 +72,7 @@
 #include <linux/io_uring.h>
 #include <linux/audit.h>
 #include <linux/security.h>
+#include <asm/shmparam.h>
 
 #define CREATE_TRACE_POINTS
 #include <trace/events/io_uring.h>
@@ -246,12 +247,12 @@ static __cold void io_fallback_req_func(struct work_struct *work)
                                                fallback_work.work);
        struct llist_node *node = llist_del_all(&ctx->fallback_llist);
        struct io_kiocb *req, *tmp;
-       bool locked = true;
+       struct io_tw_state ts = { .locked = true, };
 
        mutex_lock(&ctx->uring_lock);
        llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
-               req->io_task_work.func(req, &locked);
-       if (WARN_ON_ONCE(!locked))
+               req->io_task_work.func(req, &ts);
+       if (WARN_ON_ONCE(!ts.locked))
                return;
        io_submit_flush_completions(ctx);
        mutex_unlock(&ctx->uring_lock);
@@ -309,13 +310,18 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
        INIT_LIST_HEAD(&ctx->sqd_list);
        INIT_LIST_HEAD(&ctx->cq_overflow_list);
        INIT_LIST_HEAD(&ctx->io_buffers_cache);
-       io_alloc_cache_init(&ctx->apoll_cache);
-       io_alloc_cache_init(&ctx->netmsg_cache);
+       io_alloc_cache_init(&ctx->rsrc_node_cache, IO_NODE_ALLOC_CACHE_MAX,
+                           sizeof(struct io_rsrc_node));
+       io_alloc_cache_init(&ctx->apoll_cache, IO_ALLOC_CACHE_MAX,
+                           sizeof(struct async_poll));
+       io_alloc_cache_init(&ctx->netmsg_cache, IO_ALLOC_CACHE_MAX,
+                           sizeof(struct io_async_msghdr));
        init_completion(&ctx->ref_comp);
        xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1);
        mutex_init(&ctx->uring_lock);
        init_waitqueue_head(&ctx->cq_wait);
        init_waitqueue_head(&ctx->poll_wq);
+       init_waitqueue_head(&ctx->rsrc_quiesce_wq);
        spin_lock_init(&ctx->completion_lock);
        spin_lock_init(&ctx->timeout_lock);
        INIT_WQ_LIST(&ctx->iopoll_list);
@@ -324,11 +330,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
        INIT_LIST_HEAD(&ctx->defer_list);
        INIT_LIST_HEAD(&ctx->timeout_list);
        INIT_LIST_HEAD(&ctx->ltimeout_list);
-       spin_lock_init(&ctx->rsrc_ref_lock);
        INIT_LIST_HEAD(&ctx->rsrc_ref_list);
-       INIT_DELAYED_WORK(&ctx->rsrc_put_work, io_rsrc_put_work);
-       init_task_work(&ctx->rsrc_put_tw, io_rsrc_put_tw);
-       init_llist_head(&ctx->rsrc_put_llist);
        init_llist_head(&ctx->work_llist);
        INIT_LIST_HEAD(&ctx->tctx_list);
        ctx->submit_state.free_list.next = NULL;
@@ -424,8 +426,14 @@ static void io_prep_async_work(struct io_kiocb *req)
        if (req->file && !io_req_ffs_set(req))
                req->flags |= io_file_get_flags(req->file) << REQ_F_SUPPORT_NOWAIT_BIT;
 
-       if (req->flags & REQ_F_ISREG) {
-               if (def->hash_reg_file || (ctx->flags & IORING_SETUP_IOPOLL))
+       if (req->file && (req->flags & REQ_F_ISREG)) {
+               bool should_hash = def->hash_reg_file;
+
+               /* don't serialize this request if the fs doesn't need it */
+               if (should_hash && (req->file->f_flags & O_DIRECT) &&
+                   (req->file->f_mode & FMODE_DIO_PARALLEL_WRITE))
+                       should_hash = false;
+               if (should_hash || (ctx->flags & IORING_SETUP_IOPOLL))
                        io_wq_hash_work(&req->work, file_inode(req->file));
        } else if (!req->file || !S_ISBLK(file_inode(req->file)->i_mode)) {
                if (def->unbound_nonreg_file)
@@ -450,7 +458,7 @@ static void io_prep_async_link(struct io_kiocb *req)
        }
 }
 
-void io_queue_iowq(struct io_kiocb *req, bool *dont_use)
+void io_queue_iowq(struct io_kiocb *req, struct io_tw_state *ts_dont_use)
 {
        struct io_kiocb *link = io_prep_linked_timeout(req);
        struct io_uring_task *tctx = req->task->io_uring;
@@ -620,22 +628,22 @@ static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx)
        io_cqring_wake(ctx);
 }
 
-static inline void __io_cq_unlock_post_flush(struct io_ring_ctx *ctx)
+static void __io_cq_unlock_post_flush(struct io_ring_ctx *ctx)
        __releases(ctx->completion_lock)
 {
        io_commit_cqring(ctx);
-       __io_cq_unlock(ctx);
-       io_commit_cqring_flush(ctx);
 
-       /*
-        * As ->task_complete implies that the ring is single tasked, cq_wait
-        * may only be waited on by the current in io_cqring_wait(), but since
-        * it will re-check the wakeup conditions once we return we can safely
-        * skip waking it up.
-        */
-       if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN)) {
-               smp_mb();
-               __io_cqring_wake(ctx);
+       if (ctx->task_complete) {
+               /*
+                * ->task_complete implies that only current might be waiting
+                * for CQEs, and obviously, we currently don't. No one is
+                * waiting, wakeups are futile, skip them.
+                */
+               io_commit_cqring_flush(ctx);
+       } else {
+               __io_cq_unlock(ctx);
+               io_commit_cqring_flush(ctx);
+               io_cqring_wake(ctx);
        }
 }
 
@@ -960,9 +968,10 @@ bool io_aux_cqe(struct io_ring_ctx *ctx, bool defer, u64 user_data, s32 res, u32
        return true;
 }
 
-static void __io_req_complete_post(struct io_kiocb *req)
+static void __io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
 {
        struct io_ring_ctx *ctx = req->ctx;
+       struct io_rsrc_node *rsrc_node = NULL;
 
        io_cq_lock(ctx);
        if (!(req->flags & REQ_F_CQE_SKIP))
@@ -983,7 +992,7 @@ static void __io_req_complete_post(struct io_kiocb *req)
                }
                io_put_kbuf_comp(req);
                io_dismantle_req(req);
-               io_req_put_rsrc(req);
+               rsrc_node = req->rsrc_node;
                /*
                 * Selected buffer deallocation in io_clean_op() assumes that
                 * we don't hold ->completion_lock. Clean them here to avoid
@@ -994,6 +1003,12 @@ static void __io_req_complete_post(struct io_kiocb *req)
                ctx->locked_free_nr++;
        }
        io_cq_unlock_post(ctx);
+
+       if (rsrc_node) {
+               io_ring_submit_lock(ctx, issue_flags);
+               io_put_rsrc_node(ctx, rsrc_node);
+               io_ring_submit_unlock(ctx, issue_flags);
+       }
 }
 
 void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
@@ -1003,12 +1018,12 @@ void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
                io_req_task_work_add(req);
        } else if (!(issue_flags & IO_URING_F_UNLOCKED) ||
                   !(req->ctx->flags & IORING_SETUP_IOPOLL)) {
-               __io_req_complete_post(req);
+               __io_req_complete_post(req, issue_flags);
        } else {
                struct io_ring_ctx *ctx = req->ctx;
 
                mutex_lock(&ctx->uring_lock);
-               __io_req_complete_post(req);
+               __io_req_complete_post(req, issue_flags & ~IO_URING_F_UNLOCKED);
                mutex_unlock(&ctx->uring_lock);
        }
 }
@@ -1106,11 +1121,14 @@ static inline void io_dismantle_req(struct io_kiocb *req)
                io_put_file(req->file);
 }
 
-__cold void io_free_req(struct io_kiocb *req)
+static __cold void io_free_req_tw(struct io_kiocb *req, struct io_tw_state *ts)
 {
        struct io_ring_ctx *ctx = req->ctx;
 
-       io_req_put_rsrc(req);
+       if (req->rsrc_node) {
+               io_tw_lock(ctx, ts);
+               io_put_rsrc_node(ctx, req->rsrc_node);
+       }
        io_dismantle_req(req);
        io_put_task_remote(req->task, 1);
 
@@ -1120,6 +1138,12 @@ __cold void io_free_req(struct io_kiocb *req)
        spin_unlock(&ctx->completion_lock);
 }
 
+__cold void io_free_req(struct io_kiocb *req)
+{
+       req->io_task_work.func = io_free_req_tw;
+       io_req_task_work_add(req);
+}
+
 static void __io_req_find_next_prep(struct io_kiocb *req)
 {
        struct io_ring_ctx *ctx = req->ctx;
@@ -1146,22 +1170,23 @@ static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
        return nxt;
 }
 
-static void ctx_flush_and_put(struct io_ring_ctx *ctx, bool *locked)
+static void ctx_flush_and_put(struct io_ring_ctx *ctx, struct io_tw_state *ts)
 {
        if (!ctx)
                return;
        if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
                atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
-       if (*locked) {
+       if (ts->locked) {
                io_submit_flush_completions(ctx);
                mutex_unlock(&ctx->uring_lock);
-               *locked = false;
+               ts->locked = false;
        }
        percpu_ref_put(&ctx->refs);
 }
 
 static unsigned int handle_tw_list(struct llist_node *node,
-                                  struct io_ring_ctx **ctx, bool *locked,
+                                  struct io_ring_ctx **ctx,
+                                  struct io_tw_state *ts,
                                   struct llist_node *last)
 {
        unsigned int count = 0;
@@ -1174,18 +1199,17 @@ static unsigned int handle_tw_list(struct llist_node *node,
                prefetch(container_of(next, struct io_kiocb, io_task_work.node));
 
                if (req->ctx != *ctx) {
-                       ctx_flush_and_put(*ctx, locked);
+                       ctx_flush_and_put(*ctx, ts);
                        *ctx = req->ctx;
                        /* if not contended, grab and improve batching */
-                       *locked = mutex_trylock(&(*ctx)->uring_lock);
+                       ts->locked = mutex_trylock(&(*ctx)->uring_lock);
                        percpu_ref_get(&(*ctx)->refs);
-               } else if (!*locked)
-                       *locked = mutex_trylock(&(*ctx)->uring_lock);
-               req->io_task_work.func(req, locked);
+               }
+               req->io_task_work.func(req, ts);
                node = next;
                count++;
                if (unlikely(need_resched())) {
-                       ctx_flush_and_put(*ctx, locked);
+                       ctx_flush_and_put(*ctx, ts);
                        *ctx = NULL;
                        cond_resched();
                }
@@ -1226,7 +1250,7 @@ static inline struct llist_node *io_llist_cmpxchg(struct llist_head *head,
 
 void tctx_task_work(struct callback_head *cb)
 {
-       bool uring_locked = false;
+       struct io_tw_state ts = {};
        struct io_ring_ctx *ctx = NULL;
        struct io_uring_task *tctx = container_of(cb, struct io_uring_task,
                                                  task_work);
@@ -1243,12 +1267,12 @@ void tctx_task_work(struct callback_head *cb)
        do {
                loops++;
                node = io_llist_xchg(&tctx->task_list, &fake);
-               count += handle_tw_list(node, &ctx, &uring_locked, &fake);
+               count += handle_tw_list(node, &ctx, &ts, &fake);
 
                /* skip expensive cmpxchg if there are items in the list */
                if (READ_ONCE(tctx->task_list.first) != &fake)
                        continue;
-               if (uring_locked && !wq_list_empty(&ctx->submit_state.compl_reqs)) {
+               if (ts.locked && !wq_list_empty(&ctx->submit_state.compl_reqs)) {
                        io_submit_flush_completions(ctx);
                        if (READ_ONCE(tctx->task_list.first) != &fake)
                                continue;
@@ -1256,7 +1280,7 @@ void tctx_task_work(struct callback_head *cb)
                node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL);
        } while (node != &fake);
 
-       ctx_flush_and_put(ctx, &uring_locked);
+       ctx_flush_and_put(ctx, &ts);
 
        /* relaxed read is enough as only the task itself sets ->in_cancel */
        if (unlikely(atomic_read(&tctx->in_cancel)))
@@ -1279,42 +1303,67 @@ static __cold void io_fallback_tw(struct io_uring_task *tctx)
        }
 }
 
-static void io_req_local_work_add(struct io_kiocb *req)
+static void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
 {
        struct io_ring_ctx *ctx = req->ctx;
+       unsigned nr_wait, nr_tw, nr_tw_prev;
+       struct llist_node *first;
 
-       percpu_ref_get(&ctx->refs);
-
-       if (!llist_add(&req->io_task_work.node, &ctx->work_llist))
-               goto put_ref;
+       if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
+               flags &= ~IOU_F_TWQ_LAZY_WAKE;
 
-       /* needed for the following wake up */
-       smp_mb__after_atomic();
-
-       if (unlikely(atomic_read(&req->task->io_uring->in_cancel))) {
-               io_move_task_work_from_local(ctx);
-               goto put_ref;
+       first = READ_ONCE(ctx->work_llist.first);
+       do {
+               nr_tw_prev = 0;
+               if (first) {
+                       struct io_kiocb *first_req = container_of(first,
+                                                       struct io_kiocb,
+                                                       io_task_work.node);
+                       /*
+                        * Might be executed at any moment, rely on
+                        * SLAB_TYPESAFE_BY_RCU to keep it alive.
+                        */
+                       nr_tw_prev = READ_ONCE(first_req->nr_tw);
+               }
+               nr_tw = nr_tw_prev + 1;
+               /* Large enough to fail the nr_wait comparison below */
+               if (!(flags & IOU_F_TWQ_LAZY_WAKE))
+                       nr_tw = -1U;
+
+               req->nr_tw = nr_tw;
+               req->io_task_work.node.next = first;
+       } while (!try_cmpxchg(&ctx->work_llist.first, &first,
+                             &req->io_task_work.node));
+
+       if (!first) {
+               if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
+                       atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
+               if (ctx->has_evfd)
+                       io_eventfd_signal(ctx);
        }
 
-       if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
-               atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
-       if (ctx->has_evfd)
-               io_eventfd_signal(ctx);
-
-       if (READ_ONCE(ctx->cq_waiting))
-               wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
-
-put_ref:
-       percpu_ref_put(&ctx->refs);
+       nr_wait = atomic_read(&ctx->cq_wait_nr);
+       /* no one is waiting */
+       if (!nr_wait)
+               return;
+       /* either not enough or the previous add has already woken it up */
+       if (nr_wait > nr_tw || nr_tw_prev >= nr_wait)
+               return;
+       /* pairs with set_current_state() in io_cqring_wait() */
+       smp_mb__after_atomic();
+       wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
 }
 
-void __io_req_task_work_add(struct io_kiocb *req, bool allow_local)
+void __io_req_task_work_add(struct io_kiocb *req, unsigned flags)
 {
        struct io_uring_task *tctx = req->task->io_uring;
        struct io_ring_ctx *ctx = req->ctx;
 
-       if (allow_local && ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
-               io_req_local_work_add(req);
+       if (!(flags & IOU_F_TWQ_FORCE_NORMAL) &&
+           (ctx->flags & IORING_SETUP_DEFER_TASKRUN)) {
+               rcu_read_lock();
+               io_req_local_work_add(req, flags);
+               rcu_read_unlock();
                return;
        }
 
@@ -1341,11 +1390,11 @@ static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
                                                    io_task_work.node);
 
                node = node->next;
-               __io_req_task_work_add(req, false);
+               __io_req_task_work_add(req, IOU_F_TWQ_FORCE_NORMAL);
        }
 }
 
-static int __io_run_local_work(struct io_ring_ctx *ctx, bool *locked)
+static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts)
 {
        struct llist_node *node;
        unsigned int loops = 0;
@@ -1362,7 +1411,7 @@ again:
                struct io_kiocb *req = container_of(node, struct io_kiocb,
                                                    io_task_work.node);
                prefetch(container_of(next, struct io_kiocb, io_task_work.node));
-               req->io_task_work.func(req, locked);
+               req->io_task_work.func(req, ts);
                ret++;
                node = next;
        }
@@ -1370,7 +1419,7 @@ again:
 
        if (!llist_empty(&ctx->work_llist))
                goto again;
-       if (*locked) {
+       if (ts->locked) {
                io_submit_flush_completions(ctx);
                if (!llist_empty(&ctx->work_llist))
                        goto again;
@@ -1381,46 +1430,46 @@ again:
 
 static inline int io_run_local_work_locked(struct io_ring_ctx *ctx)
 {
-       bool locked;
+       struct io_tw_state ts = { .locked = true, };
        int ret;
 
        if (llist_empty(&ctx->work_llist))
                return 0;
 
-       locked = true;
-       ret = __io_run_local_work(ctx, &locked);
+       ret = __io_run_local_work(ctx, &ts);
        /* shouldn't happen! */
-       if (WARN_ON_ONCE(!locked))
+       if (WARN_ON_ONCE(!ts.locked))
                mutex_lock(&ctx->uring_lock);
        return ret;
 }
 
 static int io_run_local_work(struct io_ring_ctx *ctx)
 {
-       bool locked = mutex_trylock(&ctx->uring_lock);
+       struct io_tw_state ts = {};
        int ret;
 
-       ret = __io_run_local_work(ctx, &locked);
-       if (locked)
+       ts.locked = mutex_trylock(&ctx->uring_lock);
+       ret = __io_run_local_work(ctx, &ts);
+       if (ts.locked)
                mutex_unlock(&ctx->uring_lock);
 
        return ret;
 }
 
-static void io_req_task_cancel(struct io_kiocb *req, bool *locked)
+static void io_req_task_cancel(struct io_kiocb *req, struct io_tw_state *ts)
 {
-       io_tw_lock(req->ctx, locked);
+       io_tw_lock(req->ctx, ts);
        io_req_defer_failed(req, req->cqe.res);
 }
 
-void io_req_task_submit(struct io_kiocb *req, bool *locked)
+void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts)
 {
-       io_tw_lock(req->ctx, locked);
+       io_tw_lock(req->ctx, ts);
        /* req->task == current here, checking PF_EXITING is safe */
        if (unlikely(req->task->flags & PF_EXITING))
                io_req_defer_failed(req, -EFAULT);
        else if (req->flags & REQ_F_FORCE_ASYNC)
-               io_queue_iowq(req, locked);
+               io_queue_iowq(req, ts);
        else
                io_queue_sqe(req);
 }
@@ -1646,9 +1695,9 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
        return ret;
 }
 
-void io_req_task_complete(struct io_kiocb *req, bool *locked)
+void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts)
 {
-       if (*locked)
+       if (ts->locked)
                io_req_complete_defer(req);
        else
                io_req_complete_post(req, IO_URING_F_UNLOCKED);
@@ -1927,9 +1976,9 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
        return 0;
 }
 
-int io_poll_issue(struct io_kiocb *req, bool *locked)
+int io_poll_issue(struct io_kiocb *req, struct io_tw_state *ts)
 {
-       io_tw_lock(req->ctx, locked);
+       io_tw_lock(req->ctx, ts);
        return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_MULTISHOT|
                                 IO_URING_F_COMPLETE_DEFER);
 }
@@ -2298,8 +2347,7 @@ static inline int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
        if (unlikely(ret))
                return io_submit_fail_init(sqe, req, ret);
 
-       /* don't need @sqe from now on */
-       trace_io_uring_submit_sqe(req, true);
+       trace_io_uring_submit_req(req);
 
        /*
         * If we already have a head request, queue this one for async
@@ -2428,7 +2476,7 @@ int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
        if (unlikely(!entries))
                return 0;
        /* make sure SQ entry isn't read before tail */
-       ret = left = min3(nr, ctx->sq_entries, entries);
+       ret = left = min(nr, entries);
        io_get_task_refs(left);
        io_submit_state_start(&ctx->submit_state, left);
 
@@ -2600,7 +2648,9 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
                unsigned long check_cq;
 
                if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
-                       WRITE_ONCE(ctx->cq_waiting, 1);
+                       int nr_wait = (int) iowq.cq_tail - READ_ONCE(ctx->rings->cq.tail);
+
+                       atomic_set(&ctx->cq_wait_nr, nr_wait);
                        set_current_state(TASK_INTERRUPTIBLE);
                } else {
                        prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
@@ -2609,7 +2659,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 
                ret = io_cqring_wait_schedule(ctx, &iowq);
                __set_current_state(TASK_RUNNING);
-               WRITE_ONCE(ctx->cq_waiting, 0);
+               atomic_set(&ctx->cq_wait_nr, 0);
 
                if (ret < 0)
                        break;
@@ -2772,13 +2822,17 @@ static void io_req_caches_free(struct io_ring_ctx *ctx)
        mutex_unlock(&ctx->uring_lock);
 }
 
+static void io_rsrc_node_cache_free(struct io_cache_entry *entry)
+{
+       kfree(container_of(entry, struct io_rsrc_node, cache));
+}
+
 static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
 {
        io_sq_thread_finish(ctx);
-       io_rsrc_refs_drop(ctx);
        /* __io_rsrc_put_work() may need uring_lock to progress, wait w/o it */
-       io_wait_rsrc_data(ctx->buf_data);
-       io_wait_rsrc_data(ctx->file_data);
+       if (WARN_ON_ONCE(!list_empty(&ctx->rsrc_ref_list)))
+               return;
 
        mutex_lock(&ctx->uring_lock);
        if (ctx->buf_data)
@@ -2798,14 +2852,9 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
 
        /* there are no registered resources left, nobody uses it */
        if (ctx->rsrc_node)
-               io_rsrc_node_destroy(ctx->rsrc_node);
-       if (ctx->rsrc_backup_node)
-               io_rsrc_node_destroy(ctx->rsrc_backup_node);
-       flush_delayed_work(&ctx->rsrc_put_work);
-       flush_delayed_work(&ctx->fallback_work);
+               io_rsrc_node_destroy(ctx, ctx->rsrc_node);
 
        WARN_ON_ONCE(!list_empty(&ctx->rsrc_ref_list));
-       WARN_ON_ONCE(!llist_empty(&ctx->rsrc_put_llist));
 
 #if defined(CONFIG_UNIX)
        if (ctx->ring_sock) {
@@ -2815,6 +2864,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
 #endif
        WARN_ON_ONCE(!list_empty(&ctx->ltimeout_list));
 
+       io_alloc_cache_free(&ctx->rsrc_node_cache, io_rsrc_node_cache_free);
        if (ctx->mm_account) {
                mmdrop(ctx->mm_account);
                ctx->mm_account = NULL;
@@ -3031,6 +3081,10 @@ static __cold void io_ring_exit_work(struct work_struct *work)
        spin_lock(&ctx->completion_lock);
        spin_unlock(&ctx->completion_lock);
 
+       /* pairs with RCU read section in io_req_local_work_add() */
+       if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
+               synchronize_rcu();
+
        io_ring_ctx_free(ctx);
 }
 
@@ -3146,6 +3200,12 @@ static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
        enum io_wq_cancel cret;
        bool ret = false;
 
+       /* set it so io_req_local_work_add() would wake us up */
+       if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
+               atomic_set(&ctx->cq_wait_nr, 1);
+               smp_mb();
+       }
+
        /* failed during ring init, it couldn't have issued any requests */
        if (!ctx->rings)
                return false;
@@ -3200,6 +3260,8 @@ __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
 {
        struct io_uring_task *tctx = current->io_uring;
        struct io_ring_ctx *ctx;
+       struct io_tctx_node *node;
+       unsigned long index;
        s64 inflight;
        DEFINE_WAIT(wait);
 
@@ -3221,9 +3283,6 @@ __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
                        break;
 
                if (!sqd) {
-                       struct io_tctx_node *node;
-                       unsigned long index;
-
                        xa_for_each(&tctx->xa, index, node) {
                                /* sqpoll task will cancel all its requests */
                                if (node->ctx->sq_data)
@@ -3246,7 +3305,13 @@ __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
                prepare_to_wait(&tctx->wait, &wait, TASK_INTERRUPTIBLE);
                io_run_task_work();
                io_uring_drop_tctx_refs(current);
-
+               xa_for_each(&tctx->xa, index, node) {
+                       if (!llist_empty(&node->ctx->work_llist)) {
+                               WARN_ON_ONCE(node->ctx->submitter_task &&
+                                            node->ctx->submitter_task != current);
+                               goto end_wait;
+                       }
+               }
                /*
                 * If we've seen completions, retry without waiting. This
                 * avoids a race where a completion comes in before we did
@@ -3254,6 +3319,7 @@ __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
                 */
                if (inflight == tctx_inflight(tctx, !cancel_all))
                        schedule();
+end_wait:
                finish_wait(&tctx->wait, &wait);
        } while (1);
 
@@ -3282,7 +3348,7 @@ static void *io_uring_validate_mmap_request(struct file *file,
        struct page *page;
        void *ptr;
 
-       switch (offset) {
+       switch (offset & IORING_OFF_MMAP_MASK) {
        case IORING_OFF_SQ_RING:
        case IORING_OFF_CQ_RING:
                ptr = ctx->rings;
@@ -3290,6 +3356,17 @@ static void *io_uring_validate_mmap_request(struct file *file,
        case IORING_OFF_SQES:
                ptr = ctx->sq_sqes;
                break;
+       case IORING_OFF_PBUF_RING: {
+               unsigned int bgid;
+
+               bgid = (offset & ~IORING_OFF_MMAP_MASK) >> IORING_OFF_PBUF_SHIFT;
+               mutex_lock(&ctx->uring_lock);
+               ptr = io_pbuf_get_address(ctx, bgid);
+               mutex_unlock(&ctx->uring_lock);
+               if (!ptr)
+                       return ERR_PTR(-EINVAL);
+               break;
+               }
        default:
                return ERR_PTR(-EINVAL);
        }
@@ -3317,6 +3394,54 @@ static __cold int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
        return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
 }
 
+static unsigned long io_uring_mmu_get_unmapped_area(struct file *filp,
+                       unsigned long addr, unsigned long len,
+                       unsigned long pgoff, unsigned long flags)
+{
+       const unsigned long mmap_end = arch_get_mmap_end(addr, len, flags);
+       struct vm_unmapped_area_info info;
+       void *ptr;
+
+       /*
+        * Do not allow to map to user-provided address to avoid breaking the
+        * aliasing rules. Userspace is not able to guess the offset address of
+        * kernel kmalloc()ed memory area.
+        */
+       if (addr)
+               return -EINVAL;
+
+       ptr = io_uring_validate_mmap_request(filp, pgoff, len);
+       if (IS_ERR(ptr))
+               return -ENOMEM;
+
+       info.flags = VM_UNMAPPED_AREA_TOPDOWN;
+       info.length = len;
+       info.low_limit = max(PAGE_SIZE, mmap_min_addr);
+       info.high_limit = arch_get_mmap_base(addr, current->mm->mmap_base);
+#ifdef SHM_COLOUR
+       info.align_mask = PAGE_MASK & (SHM_COLOUR - 1UL);
+#else
+       info.align_mask = PAGE_MASK & (SHMLBA - 1UL);
+#endif
+       info.align_offset = (unsigned long) ptr;
+
+       /*
+        * A failed mmap() very likely causes application failure,
+        * so fall back to the bottom-up function here. This scenario
+        * can happen with large stack limits and large mmap()
+        * allocations.
+        */
+       addr = vm_unmapped_area(&info);
+       if (offset_in_page(addr)) {
+               info.flags = 0;
+               info.low_limit = TASK_UNMAPPED_BASE;
+               info.high_limit = mmap_end;
+               addr = vm_unmapped_area(&info);
+       }
+
+       return addr;
+}
+
 #else /* !CONFIG_MMU */
 
 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
@@ -3529,6 +3654,8 @@ static const struct file_operations io_uring_fops = {
 #ifndef CONFIG_MMU
        .get_unmapped_area = io_uring_nommu_get_unmapped_area,
        .mmap_capabilities = io_uring_nommu_mmap_capabilities,
+#else
+       .get_unmapped_area = io_uring_mmu_get_unmapped_area,
 #endif
        .poll           = io_uring_poll,
 #ifdef CONFIG_PROC_FS
@@ -3755,11 +3882,10 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
        ret = io_sq_offload_create(ctx, p);
        if (ret)
                goto err;
-       /* always set a rsrc node */
-       ret = io_rsrc_node_switch_start(ctx);
+
+       ret = io_rsrc_init(ctx);
        if (ret)
                goto err;
-       io_rsrc_node_switch(ctx, NULL);
 
        memset(&p->sq_off, 0, sizeof(p->sq_off));
        p->sq_off.head = offsetof(struct io_rings, sq.head);
@@ -4425,7 +4551,7 @@ static int __init io_uring_init(void)
        io_uring_optable_init();
 
        req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC |
-                               SLAB_ACCOUNT);
+                               SLAB_ACCOUNT | SLAB_TYPESAFE_BY_RCU);
        return 0;
 };
 __initcall(io_uring_init);