Merge tag 'for-6.4/io_uring-2023-04-21' of git://git.kernel.dk/linux
authorLinus Torvalds <torvalds@linux-foundation.org>
Wed, 26 Apr 2023 19:40:31 +0000 (12:40 -0700)
committerLinus Torvalds <torvalds@linux-foundation.org>
Wed, 26 Apr 2023 19:40:31 +0000 (12:40 -0700)
Pull io_uring updates from Jens Axboe:

 - Cleanup of the io-wq per-node mapping, notably getting rid of it so
   we just have a single io_wq entry per ring (Breno)

 - Followup to the above, move accounting to io_wq as well and
   completely drop struct io_wqe (Gabriel)

 - Enable KASAN for the internal io_uring caches (Breno)

 - Add support for multishot timeouts. Some applications use timeouts to
   wake someone waiting on completion entries, and this makes it a bit
   easier to just have a recurring timer rather than needing to rearm it
   every time (David)

 - Support archs that have shared cache coloring between userspace and
   the kernel, and hence have strict address requirements for mmap'ing
   the ring into userspace. This should only be parisc/hppa. (Helge, me)

 - XFS has supported O_DIRECT writes without needing to lock the inode
   exclusively for a long time, and ext4 now supports it as well. This
   is true for the common cases of not extending the file size. Flag the
   fs as having that feature, and utilize that to avoid serializing
   those writes in io_uring (me)

 - Enable completion batching for uring commands (me)

 - Revert patch adding io_uring restriction to what can be GUP mapped or
   not. This does not belong in io_uring, as io_uring isn't really
   special in this regard. Since this is also getting in the way of
   cleanups and improvements to the GUP code, get rid of if (me)

 - A few series greatly reducing the complexity of registered resources,
   like buffers or files. Not only does this clean up the code a lot,
   the simplified code is also a LOT more efficient (Pavel)

 - Series optimizing how we wait for events and run task_work related to
   it (Pavel)

 - Fixes for file/buffer unregistration with DEFER_TASKRUN (Pavel)

 - Misc cleanups and improvements (Pavel, me)

* tag 'for-6.4/io_uring-2023-04-21' of git://git.kernel.dk/linux: (71 commits)
  Revert "io_uring/rsrc: disallow multi-source reg buffers"
  io_uring: add support for multishot timeouts
  io_uring/rsrc: disassociate nodes and rsrc_data
  io_uring/rsrc: devirtualise rsrc put callbacks
  io_uring/rsrc: pass node to io_rsrc_put_work()
  io_uring/rsrc: inline io_rsrc_put_work()
  io_uring/rsrc: add empty flag in rsrc_node
  io_uring/rsrc: merge nodes and io_rsrc_put
  io_uring/rsrc: infer node from ctx on io_queue_rsrc_removal
  io_uring/rsrc: remove unused io_rsrc_node::llist
  io_uring/rsrc: refactor io_queue_rsrc_removal
  io_uring/rsrc: simplify single file node switching
  io_uring/rsrc: clean up __io_sqe_buffers_update()
  io_uring/rsrc: inline switch_start fast path
  io_uring/rsrc: remove rsrc_data refs
  io_uring/rsrc: fix DEFER_TASKRUN rsrc quiesce
  io_uring/rsrc: use wq for quiescing
  io_uring/rsrc: refactor io_rsrc_ref_quiesce
  io_uring/rsrc: remove io_rsrc_node::done
  io_uring/rsrc: use nospec'ed indexes
  ...

1  2 
include/linux/fs.h
io_uring/io_uring.c
io_uring/kbuf.c
io_uring/rw.c

diff --combined include/linux/fs.h
index ef2281a2accef88e1326ee4f8b1d970792288254,475d88640d3d3e2e48f412a7a91bce98c53c67ec..67495ef79bb2bc79166fc7e413505a5f112d00b2
@@@ -168,6 -168,9 +168,9 @@@ typedef int (dio_iodone_t)(struct kioc
  
  #define       FMODE_NOREUSE           ((__force fmode_t)0x800000)
  
+ /* File supports non-exclusive O_DIRECT writes from multiple threads */
+ #define FMODE_DIO_PARALLEL_WRITE      ((__force fmode_t)0x1000000)
  /* File was opened by fanotify and shouldn't generate fanotify events */
  #define FMODE_NONOTIFY                ((__force fmode_t)0x4000000)
  
@@@ -2675,8 -2678,6 +2678,8 @@@ extern struct inode *new_inode(struct s
  extern void free_inode_nonrcu(struct inode *inode);
  extern int setattr_should_drop_suidgid(struct mnt_idmap *, struct inode *);
  extern int file_remove_privs(struct file *);
 +int setattr_should_drop_sgid(struct mnt_idmap *idmap,
 +                           const struct inode *inode);
  
  /*
   * This must be used for allocating filesystems specific inodes to set
@@@ -2780,7 -2781,7 +2783,7 @@@ enum 
  ssize_t __blockdev_direct_IO(struct kiocb *iocb, struct inode *inode,
                             struct block_device *bdev, struct iov_iter *iter,
                             get_block_t get_block,
 -                           dio_iodone_t end_io, dio_submit_t submit_io,
 +                           dio_iodone_t end_io,
                             int flags);
  
  static inline ssize_t blockdev_direct_IO(struct kiocb *iocb,
                                         get_block_t get_block)
  {
        return __blockdev_direct_IO(iocb, inode, inode->i_sb->s_bdev, iter,
 -                      get_block, NULL, NULL, DIO_LOCKING | DIO_SKIP_HOLES);
 +                      get_block, NULL, DIO_LOCKING | DIO_SKIP_HOLES);
  }
  #endif
  
diff --combined io_uring/io_uring.c
index 4a865f0e85d0b8116c6ef3bd37b5f6db4af29835,3d43df8f1e4e9a07aef7eece00812d47cb69847f..3bca7a79efda4c40c7eb384a84582ec01bc5c3cb
@@@ -72,6 -72,7 +72,7 @@@
  #include <linux/io_uring.h>
  #include <linux/audit.h>
  #include <linux/security.h>
+ #include <asm/shmparam.h>
  
  #define CREATE_TRACE_POINTS
  #include <trace/events/io_uring.h>
@@@ -246,12 -247,12 +247,12 @@@ static __cold void io_fallback_req_func
                                                fallback_work.work);
        struct llist_node *node = llist_del_all(&ctx->fallback_llist);
        struct io_kiocb *req, *tmp;
-       bool locked = true;
+       struct io_tw_state ts = { .locked = true, };
  
        mutex_lock(&ctx->uring_lock);
        llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
-               req->io_task_work.func(req, &locked);
-       if (WARN_ON_ONCE(!locked))
+               req->io_task_work.func(req, &ts);
+       if (WARN_ON_ONCE(!ts.locked))
                return;
        io_submit_flush_completions(ctx);
        mutex_unlock(&ctx->uring_lock);
@@@ -309,13 -310,18 +310,18 @@@ static __cold struct io_ring_ctx *io_ri
        INIT_LIST_HEAD(&ctx->sqd_list);
        INIT_LIST_HEAD(&ctx->cq_overflow_list);
        INIT_LIST_HEAD(&ctx->io_buffers_cache);
-       io_alloc_cache_init(&ctx->apoll_cache);
-       io_alloc_cache_init(&ctx->netmsg_cache);
+       io_alloc_cache_init(&ctx->rsrc_node_cache, IO_NODE_ALLOC_CACHE_MAX,
+                           sizeof(struct io_rsrc_node));
+       io_alloc_cache_init(&ctx->apoll_cache, IO_ALLOC_CACHE_MAX,
+                           sizeof(struct async_poll));
+       io_alloc_cache_init(&ctx->netmsg_cache, IO_ALLOC_CACHE_MAX,
+                           sizeof(struct io_async_msghdr));
        init_completion(&ctx->ref_comp);
        xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1);
        mutex_init(&ctx->uring_lock);
        init_waitqueue_head(&ctx->cq_wait);
        init_waitqueue_head(&ctx->poll_wq);
+       init_waitqueue_head(&ctx->rsrc_quiesce_wq);
        spin_lock_init(&ctx->completion_lock);
        spin_lock_init(&ctx->timeout_lock);
        INIT_WQ_LIST(&ctx->iopoll_list);
        INIT_LIST_HEAD(&ctx->defer_list);
        INIT_LIST_HEAD(&ctx->timeout_list);
        INIT_LIST_HEAD(&ctx->ltimeout_list);
-       spin_lock_init(&ctx->rsrc_ref_lock);
        INIT_LIST_HEAD(&ctx->rsrc_ref_list);
-       INIT_DELAYED_WORK(&ctx->rsrc_put_work, io_rsrc_put_work);
-       init_task_work(&ctx->rsrc_put_tw, io_rsrc_put_tw);
-       init_llist_head(&ctx->rsrc_put_llist);
        init_llist_head(&ctx->work_llist);
        INIT_LIST_HEAD(&ctx->tctx_list);
        ctx->submit_state.free_list.next = NULL;
@@@ -424,8 -426,14 +426,14 @@@ static void io_prep_async_work(struct i
        if (req->file && !io_req_ffs_set(req))
                req->flags |= io_file_get_flags(req->file) << REQ_F_SUPPORT_NOWAIT_BIT;
  
-       if (req->flags & REQ_F_ISREG) {
-               if (def->hash_reg_file || (ctx->flags & IORING_SETUP_IOPOLL))
+       if (req->file && (req->flags & REQ_F_ISREG)) {
+               bool should_hash = def->hash_reg_file;
+               /* don't serialize this request if the fs doesn't need it */
+               if (should_hash && (req->file->f_flags & O_DIRECT) &&
+                   (req->file->f_mode & FMODE_DIO_PARALLEL_WRITE))
+                       should_hash = false;
+               if (should_hash || (ctx->flags & IORING_SETUP_IOPOLL))
                        io_wq_hash_work(&req->work, file_inode(req->file));
        } else if (!req->file || !S_ISBLK(file_inode(req->file)->i_mode)) {
                if (def->unbound_nonreg_file)
@@@ -450,7 -458,7 +458,7 @@@ static void io_prep_async_link(struct i
        }
  }
  
- void io_queue_iowq(struct io_kiocb *req, bool *dont_use)
+ void io_queue_iowq(struct io_kiocb *req, struct io_tw_state *ts_dont_use)
  {
        struct io_kiocb *link = io_prep_linked_timeout(req);
        struct io_uring_task *tctx = req->task->io_uring;
@@@ -620,22 -628,22 +628,22 @@@ static inline void __io_cq_unlock_post(
        io_cqring_wake(ctx);
  }
  
- static inline void __io_cq_unlock_post_flush(struct io_ring_ctx *ctx)
+ static void __io_cq_unlock_post_flush(struct io_ring_ctx *ctx)
        __releases(ctx->completion_lock)
  {
        io_commit_cqring(ctx);
-       __io_cq_unlock(ctx);
-       io_commit_cqring_flush(ctx);
  
-       /*
-        * As ->task_complete implies that the ring is single tasked, cq_wait
-        * may only be waited on by the current in io_cqring_wait(), but since
-        * it will re-check the wakeup conditions once we return we can safely
-        * skip waking it up.
-        */
-       if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN)) {
-               smp_mb();
-               __io_cqring_wake(ctx);
+       if (ctx->task_complete) {
+               /*
+                * ->task_complete implies that only current might be waiting
+                * for CQEs, and obviously, we currently don't. No one is
+                * waiting, wakeups are futile, skip them.
+                */
+               io_commit_cqring_flush(ctx);
+       } else {
+               __io_cq_unlock(ctx);
+               io_commit_cqring_flush(ctx);
+               io_cqring_wake(ctx);
        }
  }
  
@@@ -960,9 -968,10 +968,10 @@@ bool io_aux_cqe(struct io_ring_ctx *ctx
        return true;
  }
  
- static void __io_req_complete_post(struct io_kiocb *req)
+ static void __io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
  {
        struct io_ring_ctx *ctx = req->ctx;
+       struct io_rsrc_node *rsrc_node = NULL;
  
        io_cq_lock(ctx);
        if (!(req->flags & REQ_F_CQE_SKIP))
                }
                io_put_kbuf_comp(req);
                io_dismantle_req(req);
-               io_req_put_rsrc(req);
+               rsrc_node = req->rsrc_node;
                /*
                 * Selected buffer deallocation in io_clean_op() assumes that
                 * we don't hold ->completion_lock. Clean them here to avoid
                ctx->locked_free_nr++;
        }
        io_cq_unlock_post(ctx);
+       if (rsrc_node) {
+               io_ring_submit_lock(ctx, issue_flags);
+               io_put_rsrc_node(ctx, rsrc_node);
+               io_ring_submit_unlock(ctx, issue_flags);
+       }
  }
  
  void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
  {
 -      if (req->ctx->task_complete && (issue_flags & IO_URING_F_IOWQ)) {
 +      if (req->ctx->task_complete && req->ctx->submitter_task != current) {
                req->io_task_work.func = io_req_task_complete;
                io_req_task_work_add(req);
        } else if (!(issue_flags & IO_URING_F_UNLOCKED) ||
                   !(req->ctx->flags & IORING_SETUP_IOPOLL)) {
-               __io_req_complete_post(req);
+               __io_req_complete_post(req, issue_flags);
        } else {
                struct io_ring_ctx *ctx = req->ctx;
  
                mutex_lock(&ctx->uring_lock);
-               __io_req_complete_post(req);
+               __io_req_complete_post(req, issue_flags & ~IO_URING_F_UNLOCKED);
                mutex_unlock(&ctx->uring_lock);
        }
  }
@@@ -1106,11 -1121,14 +1121,14 @@@ static inline void io_dismantle_req(str
                io_put_file(req->file);
  }
  
__cold void io_free_req(struct io_kiocb *req)
static __cold void io_free_req_tw(struct io_kiocb *req, struct io_tw_state *ts)
  {
        struct io_ring_ctx *ctx = req->ctx;
  
-       io_req_put_rsrc(req);
+       if (req->rsrc_node) {
+               io_tw_lock(ctx, ts);
+               io_put_rsrc_node(ctx, req->rsrc_node);
+       }
        io_dismantle_req(req);
        io_put_task_remote(req->task, 1);
  
        spin_unlock(&ctx->completion_lock);
  }
  
+ __cold void io_free_req(struct io_kiocb *req)
+ {
+       req->io_task_work.func = io_free_req_tw;
+       io_req_task_work_add(req);
+ }
  static void __io_req_find_next_prep(struct io_kiocb *req)
  {
        struct io_ring_ctx *ctx = req->ctx;
@@@ -1146,22 -1170,23 +1170,23 @@@ static inline struct io_kiocb *io_req_f
        return nxt;
  }
  
- static void ctx_flush_and_put(struct io_ring_ctx *ctx, bool *locked)
+ static void ctx_flush_and_put(struct io_ring_ctx *ctx, struct io_tw_state *ts)
  {
        if (!ctx)
                return;
        if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
                atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
-       if (*locked) {
+       if (ts->locked) {
                io_submit_flush_completions(ctx);
                mutex_unlock(&ctx->uring_lock);
-               *locked = false;
+               ts->locked = false;
        }
        percpu_ref_put(&ctx->refs);
  }
  
  static unsigned int handle_tw_list(struct llist_node *node,
-                                  struct io_ring_ctx **ctx, bool *locked,
+                                  struct io_ring_ctx **ctx,
+                                  struct io_tw_state *ts,
                                   struct llist_node *last)
  {
        unsigned int count = 0;
                prefetch(container_of(next, struct io_kiocb, io_task_work.node));
  
                if (req->ctx != *ctx) {
-                       ctx_flush_and_put(*ctx, locked);
+                       ctx_flush_and_put(*ctx, ts);
                        *ctx = req->ctx;
                        /* if not contended, grab and improve batching */
-                       *locked = mutex_trylock(&(*ctx)->uring_lock);
+                       ts->locked = mutex_trylock(&(*ctx)->uring_lock);
                        percpu_ref_get(&(*ctx)->refs);
-               } else if (!*locked)
-                       *locked = mutex_trylock(&(*ctx)->uring_lock);
-               req->io_task_work.func(req, locked);
+               }
+               req->io_task_work.func(req, ts);
                node = next;
                count++;
                if (unlikely(need_resched())) {
-                       ctx_flush_and_put(*ctx, locked);
+                       ctx_flush_and_put(*ctx, ts);
                        *ctx = NULL;
                        cond_resched();
                }
@@@ -1226,7 -1250,7 +1250,7 @@@ static inline struct llist_node *io_lli
  
  void tctx_task_work(struct callback_head *cb)
  {
-       bool uring_locked = false;
+       struct io_tw_state ts = {};
        struct io_ring_ctx *ctx = NULL;
        struct io_uring_task *tctx = container_of(cb, struct io_uring_task,
                                                  task_work);
        do {
                loops++;
                node = io_llist_xchg(&tctx->task_list, &fake);
-               count += handle_tw_list(node, &ctx, &uring_locked, &fake);
+               count += handle_tw_list(node, &ctx, &ts, &fake);
  
                /* skip expensive cmpxchg if there are items in the list */
                if (READ_ONCE(tctx->task_list.first) != &fake)
                        continue;
-               if (uring_locked && !wq_list_empty(&ctx->submit_state.compl_reqs)) {
+               if (ts.locked && !wq_list_empty(&ctx->submit_state.compl_reqs)) {
                        io_submit_flush_completions(ctx);
                        if (READ_ONCE(tctx->task_list.first) != &fake)
                                continue;
                node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL);
        } while (node != &fake);
  
-       ctx_flush_and_put(ctx, &uring_locked);
+       ctx_flush_and_put(ctx, &ts);
  
        /* relaxed read is enough as only the task itself sets ->in_cancel */
        if (unlikely(atomic_read(&tctx->in_cancel)))
@@@ -1279,42 -1303,67 +1303,67 @@@ static __cold void io_fallback_tw(struc
        }
  }
  
- static void io_req_local_work_add(struct io_kiocb *req)
+ static void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
  {
        struct io_ring_ctx *ctx = req->ctx;
+       unsigned nr_wait, nr_tw, nr_tw_prev;
+       struct llist_node *first;
  
-       percpu_ref_get(&ctx->refs);
-       if (!llist_add(&req->io_task_work.node, &ctx->work_llist))
-               goto put_ref;
+       if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
+               flags &= ~IOU_F_TWQ_LAZY_WAKE;
  
-       /* needed for the following wake up */
-       smp_mb__after_atomic();
-       if (unlikely(atomic_read(&req->task->io_uring->in_cancel))) {
-               io_move_task_work_from_local(ctx);
-               goto put_ref;
+       first = READ_ONCE(ctx->work_llist.first);
+       do {
+               nr_tw_prev = 0;
+               if (first) {
+                       struct io_kiocb *first_req = container_of(first,
+                                                       struct io_kiocb,
+                                                       io_task_work.node);
+                       /*
+                        * Might be executed at any moment, rely on
+                        * SLAB_TYPESAFE_BY_RCU to keep it alive.
+                        */
+                       nr_tw_prev = READ_ONCE(first_req->nr_tw);
+               }
+               nr_tw = nr_tw_prev + 1;
+               /* Large enough to fail the nr_wait comparison below */
+               if (!(flags & IOU_F_TWQ_LAZY_WAKE))
+                       nr_tw = -1U;
+               req->nr_tw = nr_tw;
+               req->io_task_work.node.next = first;
+       } while (!try_cmpxchg(&ctx->work_llist.first, &first,
+                             &req->io_task_work.node));
+       if (!first) {
+               if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
+                       atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
+               if (ctx->has_evfd)
+                       io_eventfd_signal(ctx);
        }
  
-       if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
-               atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
-       if (ctx->has_evfd)
-               io_eventfd_signal(ctx);
-       if (READ_ONCE(ctx->cq_waiting))
-               wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
- put_ref:
-       percpu_ref_put(&ctx->refs);
+       nr_wait = atomic_read(&ctx->cq_wait_nr);
+       /* no one is waiting */
+       if (!nr_wait)
+               return;
+       /* either not enough or the previous add has already woken it up */
+       if (nr_wait > nr_tw || nr_tw_prev >= nr_wait)
+               return;
+       /* pairs with set_current_state() in io_cqring_wait() */
+       smp_mb__after_atomic();
+       wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
  }
  
- void __io_req_task_work_add(struct io_kiocb *req, bool allow_local)
+ void __io_req_task_work_add(struct io_kiocb *req, unsigned flags)
  {
        struct io_uring_task *tctx = req->task->io_uring;
        struct io_ring_ctx *ctx = req->ctx;
  
-       if (allow_local && ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
-               io_req_local_work_add(req);
+       if (!(flags & IOU_F_TWQ_FORCE_NORMAL) &&
+           (ctx->flags & IORING_SETUP_DEFER_TASKRUN)) {
+               rcu_read_lock();
+               io_req_local_work_add(req, flags);
+               rcu_read_unlock();
                return;
        }
  
@@@ -1341,11 -1390,11 +1390,11 @@@ static void __cold io_move_task_work_fr
                                                    io_task_work.node);
  
                node = node->next;
-               __io_req_task_work_add(req, false);
+               __io_req_task_work_add(req, IOU_F_TWQ_FORCE_NORMAL);
        }
  }
  
- static int __io_run_local_work(struct io_ring_ctx *ctx, bool *locked)
+ static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts)
  {
        struct llist_node *node;
        unsigned int loops = 0;
@@@ -1362,7 -1411,7 +1411,7 @@@ again
                struct io_kiocb *req = container_of(node, struct io_kiocb,
                                                    io_task_work.node);
                prefetch(container_of(next, struct io_kiocb, io_task_work.node));
-               req->io_task_work.func(req, locked);
+               req->io_task_work.func(req, ts);
                ret++;
                node = next;
        }
  
        if (!llist_empty(&ctx->work_llist))
                goto again;
-       if (*locked) {
+       if (ts->locked) {
                io_submit_flush_completions(ctx);
                if (!llist_empty(&ctx->work_llist))
                        goto again;
  
  static inline int io_run_local_work_locked(struct io_ring_ctx *ctx)
  {
-       bool locked;
+       struct io_tw_state ts = { .locked = true, };
        int ret;
  
        if (llist_empty(&ctx->work_llist))
                return 0;
  
-       locked = true;
-       ret = __io_run_local_work(ctx, &locked);
+       ret = __io_run_local_work(ctx, &ts);
        /* shouldn't happen! */
-       if (WARN_ON_ONCE(!locked))
+       if (WARN_ON_ONCE(!ts.locked))
                mutex_lock(&ctx->uring_lock);
        return ret;
  }
  
  static int io_run_local_work(struct io_ring_ctx *ctx)
  {
-       bool locked = mutex_trylock(&ctx->uring_lock);
+       struct io_tw_state ts = {};
        int ret;
  
-       ret = __io_run_local_work(ctx, &locked);
-       if (locked)
+       ts.locked = mutex_trylock(&ctx->uring_lock);
+       ret = __io_run_local_work(ctx, &ts);
+       if (ts.locked)
                mutex_unlock(&ctx->uring_lock);
  
        return ret;
  }
  
- static void io_req_task_cancel(struct io_kiocb *req, bool *locked)
+ static void io_req_task_cancel(struct io_kiocb *req, struct io_tw_state *ts)
  {
-       io_tw_lock(req->ctx, locked);
+       io_tw_lock(req->ctx, ts);
        io_req_defer_failed(req, req->cqe.res);
  }
  
- void io_req_task_submit(struct io_kiocb *req, bool *locked)
+ void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts)
  {
-       io_tw_lock(req->ctx, locked);
+       io_tw_lock(req->ctx, ts);
        /* req->task == current here, checking PF_EXITING is safe */
        if (unlikely(req->task->flags & PF_EXITING))
                io_req_defer_failed(req, -EFAULT);
        else if (req->flags & REQ_F_FORCE_ASYNC)
-               io_queue_iowq(req, locked);
+               io_queue_iowq(req, ts);
        else
                io_queue_sqe(req);
  }
@@@ -1646,9 -1695,9 +1695,9 @@@ static int io_iopoll_check(struct io_ri
        return ret;
  }
  
- void io_req_task_complete(struct io_kiocb *req, bool *locked)
+ void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts)
  {
-       if (*locked)
+       if (ts->locked)
                io_req_complete_defer(req);
        else
                io_req_complete_post(req, IO_URING_F_UNLOCKED);
@@@ -1927,9 -1976,9 +1976,9 @@@ static int io_issue_sqe(struct io_kioc
        return 0;
  }
  
- int io_poll_issue(struct io_kiocb *req, bool *locked)
+ int io_poll_issue(struct io_kiocb *req, struct io_tw_state *ts)
  {
-       io_tw_lock(req->ctx, locked);
+       io_tw_lock(req->ctx, ts);
        return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_MULTISHOT|
                                 IO_URING_F_COMPLETE_DEFER);
  }
@@@ -2298,8 -2347,7 +2347,7 @@@ static inline int io_submit_sqe(struct 
        if (unlikely(ret))
                return io_submit_fail_init(sqe, req, ret);
  
-       /* don't need @sqe from now on */
-       trace_io_uring_submit_sqe(req, true);
+       trace_io_uring_submit_req(req);
  
        /*
         * If we already have a head request, queue this one for async
@@@ -2428,7 -2476,7 +2476,7 @@@ int io_submit_sqes(struct io_ring_ctx *
        if (unlikely(!entries))
                return 0;
        /* make sure SQ entry isn't read before tail */
-       ret = left = min3(nr, ctx->sq_entries, entries);
+       ret = left = min(nr, entries);
        io_get_task_refs(left);
        io_submit_state_start(&ctx->submit_state, left);
  
@@@ -2600,7 -2648,9 +2648,9 @@@ static int io_cqring_wait(struct io_rin
                unsigned long check_cq;
  
                if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
-                       WRITE_ONCE(ctx->cq_waiting, 1);
+                       int nr_wait = (int) iowq.cq_tail - READ_ONCE(ctx->rings->cq.tail);
+                       atomic_set(&ctx->cq_wait_nr, nr_wait);
                        set_current_state(TASK_INTERRUPTIBLE);
                } else {
                        prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
  
                ret = io_cqring_wait_schedule(ctx, &iowq);
                __set_current_state(TASK_RUNNING);
-               WRITE_ONCE(ctx->cq_waiting, 0);
+               atomic_set(&ctx->cq_wait_nr, 0);
  
                if (ret < 0)
                        break;
@@@ -2772,13 -2822,17 +2822,17 @@@ static void io_req_caches_free(struct i
        mutex_unlock(&ctx->uring_lock);
  }
  
+ static void io_rsrc_node_cache_free(struct io_cache_entry *entry)
+ {
+       kfree(container_of(entry, struct io_rsrc_node, cache));
+ }
  static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
  {
        io_sq_thread_finish(ctx);
-       io_rsrc_refs_drop(ctx);
        /* __io_rsrc_put_work() may need uring_lock to progress, wait w/o it */
-       io_wait_rsrc_data(ctx->buf_data);
-       io_wait_rsrc_data(ctx->file_data);
+       if (WARN_ON_ONCE(!list_empty(&ctx->rsrc_ref_list)))
+               return;
  
        mutex_lock(&ctx->uring_lock);
        if (ctx->buf_data)
        io_eventfd_unregister(ctx);
        io_alloc_cache_free(&ctx->apoll_cache, io_apoll_cache_free);
        io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free);
 -      mutex_unlock(&ctx->uring_lock);
        io_destroy_buffers(ctx);
 +      mutex_unlock(&ctx->uring_lock);
        if (ctx->sq_creds)
                put_cred(ctx->sq_creds);
        if (ctx->submitter_task)
  
        /* there are no registered resources left, nobody uses it */
        if (ctx->rsrc_node)
-               io_rsrc_node_destroy(ctx->rsrc_node);
-       if (ctx->rsrc_backup_node)
-               io_rsrc_node_destroy(ctx->rsrc_backup_node);
-       flush_delayed_work(&ctx->rsrc_put_work);
-       flush_delayed_work(&ctx->fallback_work);
+               io_rsrc_node_destroy(ctx, ctx->rsrc_node);
  
        WARN_ON_ONCE(!list_empty(&ctx->rsrc_ref_list));
-       WARN_ON_ONCE(!llist_empty(&ctx->rsrc_put_llist));
  
  #if defined(CONFIG_UNIX)
        if (ctx->ring_sock) {
  #endif
        WARN_ON_ONCE(!list_empty(&ctx->ltimeout_list));
  
+       io_alloc_cache_free(&ctx->rsrc_node_cache, io_rsrc_node_cache_free);
        if (ctx->mm_account) {
                mmdrop(ctx->mm_account);
                ctx->mm_account = NULL;
@@@ -3031,6 -3081,10 +3081,10 @@@ static __cold void io_ring_exit_work(st
        spin_lock(&ctx->completion_lock);
        spin_unlock(&ctx->completion_lock);
  
+       /* pairs with RCU read section in io_req_local_work_add() */
+       if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
+               synchronize_rcu();
        io_ring_ctx_free(ctx);
  }
  
@@@ -3146,6 -3200,12 +3200,12 @@@ static __cold bool io_uring_try_cancel_
        enum io_wq_cancel cret;
        bool ret = false;
  
+       /* set it so io_req_local_work_add() would wake us up */
+       if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
+               atomic_set(&ctx->cq_wait_nr, 1);
+               smp_mb();
+       }
        /* failed during ring init, it couldn't have issued any requests */
        if (!ctx->rings)
                return false;
@@@ -3200,6 -3260,8 +3260,8 @@@ __cold void io_uring_cancel_generic(boo
  {
        struct io_uring_task *tctx = current->io_uring;
        struct io_ring_ctx *ctx;
+       struct io_tctx_node *node;
+       unsigned long index;
        s64 inflight;
        DEFINE_WAIT(wait);
  
                        break;
  
                if (!sqd) {
-                       struct io_tctx_node *node;
-                       unsigned long index;
                        xa_for_each(&tctx->xa, index, node) {
                                /* sqpoll task will cancel all its requests */
                                if (node->ctx->sq_data)
                prepare_to_wait(&tctx->wait, &wait, TASK_INTERRUPTIBLE);
                io_run_task_work();
                io_uring_drop_tctx_refs(current);
+               xa_for_each(&tctx->xa, index, node) {
+                       if (!llist_empty(&node->ctx->work_llist)) {
+                               WARN_ON_ONCE(node->ctx->submitter_task &&
+                                            node->ctx->submitter_task != current);
+                               goto end_wait;
+                       }
+               }
                /*
                 * If we've seen completions, retry without waiting. This
                 * avoids a race where a completion comes in before we did
                 */
                if (inflight == tctx_inflight(tctx, !cancel_all))
                        schedule();
+ end_wait:
                finish_wait(&tctx->wait, &wait);
        } while (1);
  
@@@ -3282,7 -3348,7 +3348,7 @@@ static void *io_uring_validate_mmap_req
        struct page *page;
        void *ptr;
  
-       switch (offset) {
+       switch (offset & IORING_OFF_MMAP_MASK) {
        case IORING_OFF_SQ_RING:
        case IORING_OFF_CQ_RING:
                ptr = ctx->rings;
        case IORING_OFF_SQES:
                ptr = ctx->sq_sqes;
                break;
+       case IORING_OFF_PBUF_RING: {
+               unsigned int bgid;
+               bgid = (offset & ~IORING_OFF_MMAP_MASK) >> IORING_OFF_PBUF_SHIFT;
+               mutex_lock(&ctx->uring_lock);
+               ptr = io_pbuf_get_address(ctx, bgid);
+               mutex_unlock(&ctx->uring_lock);
+               if (!ptr)
+                       return ERR_PTR(-EINVAL);
+               break;
+               }
        default:
                return ERR_PTR(-EINVAL);
        }
@@@ -3317,6 -3394,54 +3394,54 @@@ static __cold int io_uring_mmap(struct 
        return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
  }
  
+ static unsigned long io_uring_mmu_get_unmapped_area(struct file *filp,
+                       unsigned long addr, unsigned long len,
+                       unsigned long pgoff, unsigned long flags)
+ {
+       const unsigned long mmap_end = arch_get_mmap_end(addr, len, flags);
+       struct vm_unmapped_area_info info;
+       void *ptr;
+       /*
+        * Do not allow to map to user-provided address to avoid breaking the
+        * aliasing rules. Userspace is not able to guess the offset address of
+        * kernel kmalloc()ed memory area.
+        */
+       if (addr)
+               return -EINVAL;
+       ptr = io_uring_validate_mmap_request(filp, pgoff, len);
+       if (IS_ERR(ptr))
+               return -ENOMEM;
+       info.flags = VM_UNMAPPED_AREA_TOPDOWN;
+       info.length = len;
+       info.low_limit = max(PAGE_SIZE, mmap_min_addr);
+       info.high_limit = arch_get_mmap_base(addr, current->mm->mmap_base);
+ #ifdef SHM_COLOUR
+       info.align_mask = PAGE_MASK & (SHM_COLOUR - 1UL);
+ #else
+       info.align_mask = PAGE_MASK & (SHMLBA - 1UL);
+ #endif
+       info.align_offset = (unsigned long) ptr;
+       /*
+        * A failed mmap() very likely causes application failure,
+        * so fall back to the bottom-up function here. This scenario
+        * can happen with large stack limits and large mmap()
+        * allocations.
+        */
+       addr = vm_unmapped_area(&info);
+       if (offset_in_page(addr)) {
+               info.flags = 0;
+               info.low_limit = TASK_UNMAPPED_BASE;
+               info.high_limit = mmap_end;
+               addr = vm_unmapped_area(&info);
+       }
+       return addr;
+ }
  #else /* !CONFIG_MMU */
  
  static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
@@@ -3529,6 -3654,8 +3654,8 @@@ static const struct file_operations io_
  #ifndef CONFIG_MMU
        .get_unmapped_area = io_uring_nommu_get_unmapped_area,
        .mmap_capabilities = io_uring_nommu_mmap_capabilities,
+ #else
+       .get_unmapped_area = io_uring_mmu_get_unmapped_area,
  #endif
        .poll           = io_uring_poll,
  #ifdef CONFIG_PROC_FS
@@@ -3755,11 -3882,10 +3882,10 @@@ static __cold int io_uring_create(unsig
        ret = io_sq_offload_create(ctx, p);
        if (ret)
                goto err;
-       /* always set a rsrc node */
-       ret = io_rsrc_node_switch_start(ctx);
+       ret = io_rsrc_init(ctx);
        if (ret)
                goto err;
-       io_rsrc_node_switch(ctx, NULL);
  
        memset(&p->sq_off, 0, sizeof(p->sq_off));
        p->sq_off.head = offsetof(struct io_rings, sq.head);
@@@ -4425,7 -4551,7 +4551,7 @@@ static int __init io_uring_init(void
        io_uring_optable_init();
  
        req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC |
-                               SLAB_ACCOUNT);
+                               SLAB_ACCOUNT | SLAB_TYPESAFE_BY_RCU);
        return 0;
  };
  __initcall(io_uring_init);
diff --combined io_uring/kbuf.c
index a90c820ce99e12e5e72d37dee4fefd2d24665c3a,0905c1761fbabc20fb5df2edb91fd2f8d7a6c563..2f0181521c98e41d6a0af6dc83d3d5491956f92b
@@@ -137,7 -137,8 +137,8 @@@ static void __user *io_ring_buffer_sele
                return NULL;
  
        head &= bl->mask;
-       if (head < IO_BUFFER_LIST_BUF_PER_PAGE) {
+       /* mmaped buffers are always contig */
+       if (bl->is_mmap || head < IO_BUFFER_LIST_BUF_PER_PAGE) {
                buf = &br->bufs[head];
        } else {
                int off = head & (IO_BUFFER_LIST_BUF_PER_PAGE - 1);
@@@ -179,7 -180,7 +180,7 @@@ void __user *io_buffer_select(struct io
  
        bl = io_buffer_get_list(ctx, req->buf_index);
        if (likely(bl)) {
-               if (bl->buf_nr_pages)
+               if (bl->is_mapped)
                        ret = io_ring_buffer_select(req, len, bl, issue_flags);
                else
                        ret = io_provided_buffer_select(req, len, bl);
@@@ -214,32 -215,42 +215,43 @@@ static int __io_remove_buffers(struct i
        if (!nbufs)
                return 0;
  
-       if (bl->buf_nr_pages) {
-               int j;
+       if (bl->is_mapped) {
                i = bl->buf_ring->tail - bl->head;
-               for (j = 0; j < bl->buf_nr_pages; j++)
-                       unpin_user_page(bl->buf_pages[j]);
-               kvfree(bl->buf_pages);
-               bl->buf_pages = NULL;
-               bl->buf_nr_pages = 0;
+               if (bl->is_mmap) {
+                       struct page *page;
+                       page = virt_to_head_page(bl->buf_ring);
+                       if (put_page_testzero(page))
+                               free_compound_page(page);
+                       bl->buf_ring = NULL;
+                       bl->is_mmap = 0;
+               } else if (bl->buf_nr_pages) {
+                       int j;
+                       for (j = 0; j < bl->buf_nr_pages; j++)
+                               unpin_user_page(bl->buf_pages[j]);
+                       kvfree(bl->buf_pages);
+                       bl->buf_pages = NULL;
+                       bl->buf_nr_pages = 0;
+               }
                /* make sure it's seen as empty */
                INIT_LIST_HEAD(&bl->buf_list);
+               bl->is_mapped = 0;
                return i;
        }
  
 -      /* the head kbuf is the list itself */
 +      /* protects io_buffers_cache */
 +      lockdep_assert_held(&ctx->uring_lock);
 +
        while (!list_empty(&bl->buf_list)) {
                struct io_buffer *nxt;
  
                nxt = list_first_entry(&bl->buf_list, struct io_buffer, list);
 -              list_del(&nxt->list);
 +              list_move(&nxt->list, &ctx->io_buffers_cache);
                if (++i == nbufs)
                        return i;
                cond_resched();
        }
 -      i++;
  
        return i;
  }
@@@ -304,7 -315,7 +316,7 @@@ int io_remove_buffers(struct io_kiocb *
        if (bl) {
                ret = -EINVAL;
                /* can't use provide/remove buffers command on mapped buffers */
-               if (!bl->buf_nr_pages)
+               if (!bl->is_mapped)
                        ret = __io_remove_buffers(ctx, bl, p->nbufs);
        }
        io_ring_submit_unlock(ctx, issue_flags);
@@@ -449,7 -460,7 +461,7 @@@ int io_provide_buffers(struct io_kiocb 
                }
        }
        /* can't add buffers via this command for a mapped buffer ring */
-       if (bl->buf_nr_pages) {
+       if (bl->is_mapped) {
                ret = -EINVAL;
                goto err;
        }
@@@ -464,23 -475,87 +476,87 @@@ err
        return IOU_OK;
  }
  
- int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
+ static int io_pin_pbuf_ring(struct io_uring_buf_reg *reg,
+                           struct io_buffer_list *bl)
  {
        struct io_uring_buf_ring *br;
-       struct io_uring_buf_reg reg;
-       struct io_buffer_list *bl, *free_bl = NULL;
        struct page **pages;
        int nr_pages;
  
+       pages = io_pin_pages(reg->ring_addr,
+                            flex_array_size(br, bufs, reg->ring_entries),
+                            &nr_pages);
+       if (IS_ERR(pages))
+               return PTR_ERR(pages);
+       br = page_address(pages[0]);
+ #ifdef SHM_COLOUR
+       /*
+        * On platforms that have specific aliasing requirements, SHM_COLOUR
+        * is set and we must guarantee that the kernel and user side align
+        * nicely. We cannot do that if IOU_PBUF_RING_MMAP isn't set and
+        * the application mmap's the provided ring buffer. Fail the request
+        * if we, by chance, don't end up with aligned addresses. The app
+        * should use IOU_PBUF_RING_MMAP instead, and liburing will handle
+        * this transparently.
+        */
+       if ((reg->ring_addr | (unsigned long) br) & (SHM_COLOUR - 1)) {
+               int i;
+               for (i = 0; i < nr_pages; i++)
+                       unpin_user_page(pages[i]);
+               return -EINVAL;
+       }
+ #endif
+       bl->buf_pages = pages;
+       bl->buf_nr_pages = nr_pages;
+       bl->buf_ring = br;
+       bl->is_mapped = 1;
+       bl->is_mmap = 0;
+       return 0;
+ }
+ static int io_alloc_pbuf_ring(struct io_uring_buf_reg *reg,
+                             struct io_buffer_list *bl)
+ {
+       gfp_t gfp = GFP_KERNEL_ACCOUNT | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP;
+       size_t ring_size;
+       void *ptr;
+       ring_size = reg->ring_entries * sizeof(struct io_uring_buf_ring);
+       ptr = (void *) __get_free_pages(gfp, get_order(ring_size));
+       if (!ptr)
+               return -ENOMEM;
+       bl->buf_ring = ptr;
+       bl->is_mapped = 1;
+       bl->is_mmap = 1;
+       return 0;
+ }
+ int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
+ {
+       struct io_uring_buf_reg reg;
+       struct io_buffer_list *bl, *free_bl = NULL;
+       int ret;
        if (copy_from_user(&reg, arg, sizeof(reg)))
                return -EFAULT;
  
-       if (reg.pad || reg.resv[0] || reg.resv[1] || reg.resv[2])
+       if (reg.resv[0] || reg.resv[1] || reg.resv[2])
                return -EINVAL;
-       if (!reg.ring_addr)
-               return -EFAULT;
-       if (reg.ring_addr & ~PAGE_MASK)
+       if (reg.flags & ~IOU_PBUF_RING_MMAP)
                return -EINVAL;
+       if (!(reg.flags & IOU_PBUF_RING_MMAP)) {
+               if (!reg.ring_addr)
+                       return -EFAULT;
+               if (reg.ring_addr & ~PAGE_MASK)
+                       return -EINVAL;
+       } else {
+               if (reg.ring_addr)
+                       return -EINVAL;
+       }
        if (!is_power_of_2(reg.ring_entries))
                return -EINVAL;
  
        bl = io_buffer_get_list(ctx, reg.bgid);
        if (bl) {
                /* if mapped buffer ring OR classic exists, don't allow */
-               if (bl->buf_nr_pages || !list_empty(&bl->buf_list))
+               if (bl->is_mapped || !list_empty(&bl->buf_list))
                        return -EEXIST;
        } else {
                free_bl = bl = kzalloc(sizeof(*bl), GFP_KERNEL);
                        return -ENOMEM;
        }
  
-       pages = io_pin_pages(reg.ring_addr,
-                            flex_array_size(br, bufs, reg.ring_entries),
-                            &nr_pages);
-       if (IS_ERR(pages)) {
-               kfree(free_bl);
-               return PTR_ERR(pages);
+       if (!(reg.flags & IOU_PBUF_RING_MMAP))
+               ret = io_pin_pbuf_ring(&reg, bl);
+       else
+               ret = io_alloc_pbuf_ring(&reg, bl);
+       if (!ret) {
+               bl->nr_entries = reg.ring_entries;
+               bl->mask = reg.ring_entries - 1;
+               io_buffer_add_list(ctx, bl, reg.bgid);
+               return 0;
        }
  
-       br = page_address(pages[0]);
-       bl->buf_pages = pages;
-       bl->buf_nr_pages = nr_pages;
-       bl->nr_entries = reg.ring_entries;
-       bl->buf_ring = br;
-       bl->mask = reg.ring_entries - 1;
-       io_buffer_add_list(ctx, bl, reg.bgid);
-       return 0;
+       kfree(free_bl);
+       return ret;
  }
  
  int io_unregister_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
  
        if (copy_from_user(&reg, arg, sizeof(reg)))
                return -EFAULT;
-       if (reg.pad || reg.resv[0] || reg.resv[1] || reg.resv[2])
+       if (reg.resv[0] || reg.resv[1] || reg.resv[2])
+               return -EINVAL;
+       if (reg.flags)
                return -EINVAL;
  
        bl = io_buffer_get_list(ctx, reg.bgid);
        if (!bl)
                return -ENOENT;
-       if (!bl->buf_nr_pages)
+       if (!bl->is_mapped)
                return -EINVAL;
  
        __io_remove_buffers(ctx, bl, -1U);
        }
        return 0;
  }
+ void *io_pbuf_get_address(struct io_ring_ctx *ctx, unsigned long bgid)
+ {
+       struct io_buffer_list *bl;
+       bl = io_buffer_get_list(ctx, bgid);
+       if (!bl || !bl->is_mmap)
+               return NULL;
+       return bl->buf_ring;
+ }
diff --combined io_uring/rw.c
index f33ba6f282474c2e24f589b42aa30530b1f7c3f0,6c7d2654770efcee44980bcbb378d0fb5bcb2816..70b1407a4f200a0232481412b5708ab9bc3c59c7
@@@ -283,16 -283,16 +283,16 @@@ static inline int io_fixup_rw_res(struc
        return res;
  }
  
- static void io_req_rw_complete(struct io_kiocb *req, bool *locked)
+ static void io_req_rw_complete(struct io_kiocb *req, struct io_tw_state *ts)
  {
        io_req_io_end(req);
  
        if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) {
-               unsigned issue_flags = *locked ? 0 : IO_URING_F_UNLOCKED;
+               unsigned issue_flags = ts->locked ? 0 : IO_URING_F_UNLOCKED;
  
                req->cqe.flags |= io_put_kbuf(req, issue_flags);
        }
-       io_req_task_complete(req, locked);
+       io_req_task_complete(req, ts);
  }
  
  static void io_complete_rw(struct kiocb *kiocb, long res)
                return;
        io_req_set_res(req, io_fixup_rw_res(req, res), 0);
        req->io_task_work.func = io_req_rw_complete;
-       io_req_task_work_add(req);
+       __io_req_task_work_add(req, IOU_F_TWQ_LAZY_WAKE);
  }
  
  static void io_complete_rw_iopoll(struct kiocb *kiocb, long res)
@@@ -447,25 -447,26 +447,25 @@@ static ssize_t loop_rw_iter(int ddir, s
        ppos = io_kiocb_ppos(kiocb);
  
        while (iov_iter_count(iter)) {
 -              struct iovec iovec;
 +              void __user *addr;
 +              size_t len;
                ssize_t nr;
  
                if (iter_is_ubuf(iter)) {
 -                      iovec.iov_base = iter->ubuf + iter->iov_offset;
 -                      iovec.iov_len = iov_iter_count(iter);
 +                      addr = iter->ubuf + iter->iov_offset;
 +                      len = iov_iter_count(iter);
                } else if (!iov_iter_is_bvec(iter)) {
 -                      iovec = iov_iter_iovec(iter);
 +                      addr = iter_iov_addr(iter);
 +                      len = iter_iov_len(iter);
                } else {
 -                      iovec.iov_base = u64_to_user_ptr(rw->addr);
 -                      iovec.iov_len = rw->len;
 +                      addr = u64_to_user_ptr(rw->addr);
 +                      len = rw->len;
                }
  
 -              if (ddir == READ) {
 -                      nr = file->f_op->read(file, iovec.iov_base,
 -                                            iovec.iov_len, ppos);
 -              } else {
 -                      nr = file->f_op->write(file, iovec.iov_base,
 -                                             iovec.iov_len, ppos);
 -              }
 +              if (ddir == READ)
 +                      nr = file->f_op->read(file, addr, len, ppos);
 +              else
 +                      nr = file->f_op->write(file, addr, len, ppos);
  
                if (nr < 0) {
                        if (!ret)
                        if (!rw->len)
                                break;
                }
 -              if (nr != iovec.iov_len)
 +              if (nr != len)
                        break;
        }
  
@@@ -502,10 -503,10 +502,10 @@@ static void io_req_map_rw(struct io_kio
        if (!iovec) {
                unsigned iov_off = 0;
  
 -              io->s.iter.iov = io->s.fast_iov;
 -              if (iter->iov != fast_iov) {
 -                      iov_off = iter->iov - fast_iov;
 -                      io->s.iter.iov += iov_off;
 +              io->s.iter.__iov = io->s.fast_iov;
 +              if (iter->__iov != fast_iov) {
 +                      iov_off = iter_iov(iter) - fast_iov;
 +                      io->s.iter.__iov += iov_off;
                }
                if (io->s.fast_iov != fast_iov)
                        memcpy(io->s.fast_iov + iov_off, fast_iov + iov_off,