ALSA: hda - Fix pending unsol events at shutdown
[linux-2.6-block.git] / fs / io_uring.c
index 24bbe3cb7ad45e72b1cfca18c3084795d0be6a8c..8a0381f1a43becd39d72f96a6af1f25caf024ff1 100644 (file)
@@ -75,7 +75,7 @@
 
 #include "internal.h"
 
-#define IORING_MAX_ENTRIES     4096
+#define IORING_MAX_ENTRIES     32768
 #define IORING_MAX_FIXED_FILES 1024
 
 struct io_uring {
@@ -84,27 +84,29 @@ struct io_uring {
 };
 
 /*
- * This data is shared with the application through the mmap at offset
- * IORING_OFF_SQ_RING.
+ * This data is shared with the application through the mmap at offsets
+ * IORING_OFF_SQ_RING and IORING_OFF_CQ_RING.
  *
  * The offsets to the member fields are published through struct
  * io_sqring_offsets when calling io_uring_setup.
  */
-struct io_sq_ring {
+struct io_rings {
        /*
         * Head and tail offsets into the ring; the offsets need to be
         * masked to get valid indices.
         *
-        * The kernel controls head and the application controls tail.
+        * The kernel controls head of the sq ring and the tail of the cq ring,
+        * and the application controls tail of the sq ring and the head of the
+        * cq ring.
         */
-       struct io_uring         r;
+       struct io_uring         sq, cq;
        /*
-        * Bitmask to apply to head and tail offsets (constant, equals
+        * Bitmasks to apply to head and tail offsets (constant, equals
         * ring_entries - 1)
         */
-       u32                     ring_mask;
-       /* Ring size (constant, power of 2) */
-       u32                     ring_entries;
+       u32                     sq_ring_mask, cq_ring_mask;
+       /* Ring sizes (constant, power of 2) */
+       u32                     sq_ring_entries, cq_ring_entries;
        /*
         * Number of invalid entries dropped by the kernel due to
         * invalid index stored in array
@@ -117,7 +119,7 @@ struct io_sq_ring {
         * counter includes all submissions that were dropped reaching
         * the new SQ head (and possibly more).
         */
-       u32                     dropped;
+       u32                     sq_dropped;
        /*
         * Runtime flags
         *
@@ -127,43 +129,7 @@ struct io_sq_ring {
         * The application needs a full memory barrier before checking
         * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
         */
-       u32                     flags;
-       /*
-        * Ring buffer of indices into array of io_uring_sqe, which is
-        * mmapped by the application using the IORING_OFF_SQES offset.
-        *
-        * This indirection could e.g. be used to assign fixed
-        * io_uring_sqe entries to operations and only submit them to
-        * the queue when needed.
-        *
-        * The kernel modifies neither the indices array nor the entries
-        * array.
-        */
-       u32                     array[];
-};
-
-/*
- * This data is shared with the application through the mmap at offset
- * IORING_OFF_CQ_RING.
- *
- * The offsets to the member fields are published through struct
- * io_cqring_offsets when calling io_uring_setup.
- */
-struct io_cq_ring {
-       /*
-        * Head and tail offsets into the ring; the offsets need to be
-        * masked to get valid indices.
-        *
-        * The application controls head and the kernel tail.
-        */
-       struct io_uring         r;
-       /*
-        * Bitmask to apply to head and tail offsets (constant, equals
-        * ring_entries - 1)
-        */
-       u32                     ring_mask;
-       /* Ring size (constant, power of 2) */
-       u32                     ring_entries;
+       u32                     sq_flags;
        /*
         * Number of completion events lost because the queue was full;
         * this should be avoided by the application by making sure
@@ -177,7 +143,7 @@ struct io_cq_ring {
         * As completion events come in out of order this counter is not
         * ordered with any other data.
         */
-       u32                     overflow;
+       u32                     cq_overflow;
        /*
         * Ring buffer of completion events.
         *
@@ -185,7 +151,7 @@ struct io_cq_ring {
         * produced, so the application is allowed to modify pending
         * entries.
         */
-       struct io_uring_cqe     cqes[];
+       struct io_uring_cqe     cqes[] ____cacheline_aligned_in_smp;
 };
 
 struct io_mapped_ubuf {
@@ -201,7 +167,7 @@ struct async_list {
        struct list_head        list;
 
        struct file             *file;
-       off_t                   io_end;
+       off_t                   io_start;
        size_t                  io_len;
 };
 
@@ -215,8 +181,18 @@ struct io_ring_ctx {
                bool                    compat;
                bool                    account_mem;
 
-               /* SQ ring */
-               struct io_sq_ring       *sq_ring;
+               /*
+                * Ring buffer of indices into array of io_uring_sqe, which is
+                * mmapped by the application using the IORING_OFF_SQES offset.
+                *
+                * This indirection could e.g. be used to assign fixed
+                * io_uring_sqe entries to operations and only submit them to
+                * the queue when needed.
+                *
+                * The kernel modifies neither the indices array nor the entries
+                * array.
+                */
+               u32                     *sq_array;
                unsigned                cached_sq_head;
                unsigned                sq_entries;
                unsigned                sq_mask;
@@ -224,26 +200,28 @@ struct io_ring_ctx {
                struct io_uring_sqe     *sq_sqes;
 
                struct list_head        defer_list;
+               struct list_head        timeout_list;
        } ____cacheline_aligned_in_smp;
 
        /* IO offload */
-       struct workqueue_struct *sqo_wq;
+       struct workqueue_struct *sqo_wq[2];
        struct task_struct      *sqo_thread;    /* if using sq thread polling */
        struct mm_struct        *sqo_mm;
        wait_queue_head_t       sqo_wait;
        struct completion       sqo_thread_started;
 
        struct {
-               /* CQ ring */
-               struct io_cq_ring       *cq_ring;
                unsigned                cached_cq_tail;
                unsigned                cq_entries;
                unsigned                cq_mask;
                struct wait_queue_head  cq_wait;
                struct fasync_struct    *cq_fasync;
                struct eventfd_ctx      *cq_ev_fd;
+               atomic_t                cq_timeouts;
        } ____cacheline_aligned_in_smp;
 
+       struct io_rings *rings;
+
        /*
         * If used, fixed file set. Writers must ensure that ->refs is dead,
         * readers must ensure that ->refs is alive as long as the file* is
@@ -288,6 +266,7 @@ struct io_ring_ctx {
 struct sqe_submit {
        const struct io_uring_sqe       *sqe;
        unsigned short                  index;
+       u32                             sequence;
        bool                            has_user;
        bool                            needs_lock;
        bool                            needs_fixed_file;
@@ -306,6 +285,11 @@ struct io_poll_iocb {
        struct wait_queue_entry         wait;
 };
 
+struct io_timeout {
+       struct file                     *file;
+       struct hrtimer                  timer;
+};
+
 /*
  * NOTE! Each of the iocb union members has the file pointer
  * as the first entry in their struct definition. So you can
@@ -317,6 +301,7 @@ struct io_kiocb {
                struct file             *file;
                struct kiocb            rw;
                struct io_poll_iocb     poll;
+               struct io_timeout       timeout;
        };
 
        struct sqe_submit       submit;
@@ -335,6 +320,8 @@ struct io_kiocb {
 #define REQ_F_LINK             64      /* linked sqes */
 #define REQ_F_LINK_DONE                128     /* linked sqes done */
 #define REQ_F_FAIL_LINK                256     /* fail rest of links */
+#define REQ_F_SHADOW_DRAIN     512     /* link-drain shadow req */
+#define REQ_F_TIMEOUT          1024    /* timeout request */
        u64                     user_data;
        u32                     result;
        u32                     sequence;
@@ -366,6 +353,9 @@ struct io_submit_state {
 };
 
 static void io_sq_wq_submit_work(struct work_struct *work);
+static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
+                                long res);
+static void __io_free_req(struct io_kiocb *req);
 
 static struct kmem_cache *req_cachep;
 
@@ -421,26 +411,30 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
        INIT_LIST_HEAD(&ctx->poll_list);
        INIT_LIST_HEAD(&ctx->cancel_list);
        INIT_LIST_HEAD(&ctx->defer_list);
+       INIT_LIST_HEAD(&ctx->timeout_list);
        return ctx;
 }
 
 static inline bool io_sequence_defer(struct io_ring_ctx *ctx,
                                     struct io_kiocb *req)
 {
-       if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN)
+       /* timeout requests always honor sequence */
+       if (!(req->flags & REQ_F_TIMEOUT) &&
+           (req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN)
                return false;
 
-       return req->sequence != ctx->cached_cq_tail + ctx->sq_ring->dropped;
+       return req->sequence != ctx->cached_cq_tail + ctx->rings->sq_dropped;
 }
 
-static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
+static struct io_kiocb *__io_get_deferred_req(struct io_ring_ctx *ctx,
+                                             struct list_head *list)
 {
        struct io_kiocb *req;
 
-       if (list_empty(&ctx->defer_list))
+       if (list_empty(list))
                return NULL;
 
-       req = list_first_entry(&ctx->defer_list, struct io_kiocb, list);
+       req = list_first_entry(list, struct io_kiocb, list);
        if (!io_sequence_defer(ctx, req)) {
                list_del_init(&req->list);
                return req;
@@ -449,13 +443,23 @@ static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
        return NULL;
 }
 
+static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
+{
+       return __io_get_deferred_req(ctx, &ctx->defer_list);
+}
+
+static struct io_kiocb *io_get_timeout_req(struct io_ring_ctx *ctx)
+{
+       return __io_get_deferred_req(ctx, &ctx->timeout_list);
+}
+
 static void __io_commit_cqring(struct io_ring_ctx *ctx)
 {
-       struct io_cq_ring *ring = ctx->cq_ring;
+       struct io_rings *rings = ctx->rings;
 
-       if (ctx->cached_cq_tail != READ_ONCE(ring->r.tail)) {
+       if (ctx->cached_cq_tail != READ_ONCE(rings->cq.tail)) {
                /* order cqe stores with ring update */
-               smp_store_release(&ring->r.tail, ctx->cached_cq_tail);
+               smp_store_release(&rings->cq.tail, ctx->cached_cq_tail);
 
                if (wq_has_sleeper(&ctx->cq_wait)) {
                        wake_up_interruptible(&ctx->cq_wait);
@@ -464,21 +468,69 @@ static void __io_commit_cqring(struct io_ring_ctx *ctx)
        }
 }
 
+static inline void io_queue_async_work(struct io_ring_ctx *ctx,
+                                      struct io_kiocb *req)
+{
+       int rw = 0;
+
+       if (req->submit.sqe) {
+               switch (req->submit.sqe->opcode) {
+               case IORING_OP_WRITEV:
+               case IORING_OP_WRITE_FIXED:
+                       rw = !(req->rw.ki_flags & IOCB_DIRECT);
+                       break;
+               }
+       }
+
+       queue_work(ctx->sqo_wq[rw], &req->work);
+}
+
+static void io_kill_timeout(struct io_kiocb *req)
+{
+       int ret;
+
+       ret = hrtimer_try_to_cancel(&req->timeout.timer);
+       if (ret != -1) {
+               atomic_inc(&req->ctx->cq_timeouts);
+               list_del(&req->list);
+               io_cqring_fill_event(req->ctx, req->user_data, 0);
+               __io_free_req(req);
+       }
+}
+
+static void io_kill_timeouts(struct io_ring_ctx *ctx)
+{
+       struct io_kiocb *req, *tmp;
+
+       spin_lock_irq(&ctx->completion_lock);
+       list_for_each_entry_safe(req, tmp, &ctx->timeout_list, list)
+               io_kill_timeout(req);
+       spin_unlock_irq(&ctx->completion_lock);
+}
+
 static void io_commit_cqring(struct io_ring_ctx *ctx)
 {
        struct io_kiocb *req;
 
+       while ((req = io_get_timeout_req(ctx)) != NULL)
+               io_kill_timeout(req);
+
        __io_commit_cqring(ctx);
 
        while ((req = io_get_deferred_req(ctx)) != NULL) {
+               if (req->flags & REQ_F_SHADOW_DRAIN) {
+                       /* Just for drain, free it. */
+                       __io_free_req(req);
+                       continue;
+               }
                req->flags |= REQ_F_IO_DRAINED;
-               queue_work(ctx->sqo_wq, &req->work);
+               io_queue_async_work(ctx, req);
        }
 }
 
 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
 {
-       struct io_cq_ring *ring = ctx->cq_ring;
+       struct io_rings *rings = ctx->rings;
        unsigned tail;
 
        tail = ctx->cached_cq_tail;
@@ -487,11 +539,11 @@ static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
         * control dependency is enough as we're using WRITE_ONCE to
         * fill the cq entry
         */
-       if (tail - READ_ONCE(ring->r.head) == ring->ring_entries)
+       if (tail - READ_ONCE(rings->cq.head) == rings->cq_ring_entries)
                return NULL;
 
        ctx->cached_cq_tail++;
-       return &ring->cqes[tail & ctx->cq_mask];
+       return &rings->cqes[tail & ctx->cq_mask];
 }
 
 static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
@@ -510,9 +562,9 @@ static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
                WRITE_ONCE(cqe->res, res);
                WRITE_ONCE(cqe->flags, 0);
        } else {
-               unsigned overflow = READ_ONCE(ctx->cq_ring->overflow);
+               unsigned overflow = READ_ONCE(ctx->rings->cq_overflow);
 
-               WRITE_ONCE(ctx->cq_ring->overflow, overflow + 1);
+               WRITE_ONCE(ctx->rings->cq_overflow, overflow + 1);
        }
 }
 
@@ -635,7 +687,7 @@ static void io_req_link_next(struct io_kiocb *req)
 
                nxt->flags |= REQ_F_LINK_DONE;
                INIT_WORK(&nxt->work, io_sq_wq_submit_work);
-               queue_work(req->ctx->sqo_wq, &nxt->work);
+               io_queue_async_work(req->ctx, nxt);
        }
 }
 
@@ -679,6 +731,13 @@ static void io_put_req(struct io_kiocb *req)
                io_free_req(req);
 }
 
+static unsigned io_cqring_events(struct io_rings *rings)
+{
+       /* See comment at the top of this file */
+       smp_rmb();
+       return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
+}
+
 /*
  * Find and free completed poll iocbs
  */
@@ -771,7 +830,7 @@ static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
                                long min)
 {
-       while (!list_empty(&ctx->poll_list)) {
+       while (!list_empty(&ctx->poll_list) && !need_resched()) {
                int ret;
 
                ret = io_do_iopoll(ctx, nr_events, min);
@@ -798,6 +857,12 @@ static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
                unsigned int nr_events = 0;
 
                io_iopoll_getevents(ctx, &nr_events, 1);
+
+               /*
+                * Ensure we allow local-to-the-cpu processing to take place,
+                * in this case we need to ensure that we reap all events.
+                */
+               cond_resched();
        }
        mutex_unlock(&ctx->uring_lock);
 }
@@ -805,11 +870,42 @@ static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
                           long min)
 {
-       int ret = 0;
+       int iters, ret = 0;
+
+       /*
+        * We disallow the app entering submit/complete with polling, but we
+        * still need to lock the ring to prevent racing with polled issue
+        * that got punted to a workqueue.
+        */
+       mutex_lock(&ctx->uring_lock);
 
+       iters = 0;
        do {
                int tmin = 0;
 
+               /*
+                * Don't enter poll loop if we already have events pending.
+                * If we do, we can potentially be spinning for commands that
+                * already triggered a CQE (eg in error).
+                */
+               if (io_cqring_events(ctx->rings))
+                       break;
+
+               /*
+                * If a submit got punted to a workqueue, we can have the
+                * application entering polling for a command before it gets
+                * issued. That app will hold the uring_lock for the duration
+                * of the poll right here, so we need to take a breather every
+                * now and then to ensure that the issue has a chance to add
+                * the poll to the issued list. Otherwise we can spin here
+                * forever, while the workqueue is stuck trying to acquire the
+                * very same mutex.
+                */
+               if (!(++iters & 7)) {
+                       mutex_unlock(&ctx->uring_lock);
+                       mutex_lock(&ctx->uring_lock);
+               }
+
                if (*nr_events < min)
                        tmin = min - *nr_events;
 
@@ -819,6 +915,7 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
                ret = 0;
        } while (min && !*nr_events && !need_resched());
 
+       mutex_unlock(&ctx->uring_lock);
        return ret;
 }
 
@@ -1142,6 +1239,28 @@ static ssize_t io_import_iovec(struct io_ring_ctx *ctx, int rw,
        return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
 }
 
+static inline bool io_should_merge(struct async_list *al, struct kiocb *kiocb)
+{
+       if (al->file == kiocb->ki_filp) {
+               off_t start, end;
+
+               /*
+                * Allow merging if we're anywhere in the range of the same
+                * page. Generally this happens for sub-page reads or writes,
+                * and it's beneficial to allow the first worker to bring the
+                * page in and the piggy backed work can then work on the
+                * cached page.
+                */
+               start = al->io_start & PAGE_MASK;
+               end = (al->io_start + al->io_len + PAGE_SIZE - 1) & PAGE_MASK;
+               if (kiocb->ki_pos >= start && kiocb->ki_pos <= end)
+                       return true;
+       }
+
+       al->file = NULL;
+       return false;
+}
+
 /*
  * Make a note of the last file/offset/direction we punted to async
  * context. We'll use this information to see if we can piggy back a
@@ -1153,9 +1272,8 @@ static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
        struct async_list *async_list = &req->ctx->pending_async[rw];
        struct kiocb *kiocb = &req->rw;
        struct file *filp = kiocb->ki_filp;
-       off_t io_end = kiocb->ki_pos + len;
 
-       if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) {
+       if (io_should_merge(async_list, kiocb)) {
                unsigned long max_bytes;
 
                /* Use 8x RA size as a decent limiter for both reads/writes */
@@ -1168,17 +1286,61 @@ static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
                        req->flags |= REQ_F_SEQ_PREV;
                        async_list->io_len += len;
                } else {
-                       io_end = 0;
-                       async_list->io_len = 0;
+                       async_list->file = NULL;
                }
        }
 
        /* New file? Reset state. */
        if (async_list->file != filp) {
-               async_list->io_len = 0;
+               async_list->io_start = kiocb->ki_pos;
+               async_list->io_len = len;
                async_list->file = filp;
        }
-       async_list->io_end = io_end;
+}
+
+/*
+ * For files that don't have ->read_iter() and ->write_iter(), handle them
+ * by looping over ->read() or ->write() manually.
+ */
+static ssize_t loop_rw_iter(int rw, struct file *file, struct kiocb *kiocb,
+                          struct iov_iter *iter)
+{
+       ssize_t ret = 0;
+
+       /*
+        * Don't support polled IO through this interface, and we can't
+        * support non-blocking either. For the latter, this just causes
+        * the kiocb to be handled from an async context.
+        */
+       if (kiocb->ki_flags & IOCB_HIPRI)
+               return -EOPNOTSUPP;
+       if (kiocb->ki_flags & IOCB_NOWAIT)
+               return -EAGAIN;
+
+       while (iov_iter_count(iter)) {
+               struct iovec iovec = iov_iter_iovec(iter);
+               ssize_t nr;
+
+               if (rw == READ) {
+                       nr = file->f_op->read(file, iovec.iov_base,
+                                             iovec.iov_len, &kiocb->ki_pos);
+               } else {
+                       nr = file->f_op->write(file, iovec.iov_base,
+                                              iovec.iov_len, &kiocb->ki_pos);
+               }
+
+               if (nr < 0) {
+                       if (!ret)
+                               ret = nr;
+                       break;
+               }
+               ret += nr;
+               if (nr != iovec.iov_len)
+                       break;
+               iov_iter_advance(iter, nr);
+       }
+
+       return ret;
 }
 
 static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
@@ -1198,8 +1360,6 @@ static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
 
        if (unlikely(!(file->f_mode & FMODE_READ)))
                return -EBADF;
-       if (unlikely(!file->f_op->read_iter))
-               return -EINVAL;
 
        ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter);
        if (ret < 0)
@@ -1214,7 +1374,11 @@ static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
        if (!ret) {
                ssize_t ret2;
 
-               ret2 = call_read_iter(file, kiocb, &iter);
+               if (file->f_op->read_iter)
+                       ret2 = call_read_iter(file, kiocb, &iter);
+               else
+                       ret2 = loop_rw_iter(READ, file, kiocb, &iter);
+
                /*
                 * In case of a short read, punt to async. This can happen
                 * if we have data partially cached. Alternatively we can
@@ -1259,8 +1423,6 @@ static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
        file = kiocb->ki_filp;
        if (unlikely(!(file->f_mode & FMODE_WRITE)))
                return -EBADF;
-       if (unlikely(!file->f_op->write_iter))
-               return -EINVAL;
 
        ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter);
        if (ret < 0)
@@ -1298,7 +1460,10 @@ static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
                }
                kiocb->ki_flags |= IOCB_WRITE;
 
-               ret2 = call_write_iter(file, kiocb, &iter);
+               if (file->f_op->write_iter)
+                       ret2 = call_write_iter(file, kiocb, &iter);
+               else
+                       ret2 = loop_rw_iter(WRITE, file, kiocb, &iter);
                if (!force_nonblock || ret2 != -EAGAIN) {
                        io_rw_done(kiocb, ret2);
                } else {
@@ -1490,7 +1655,7 @@ static void io_poll_remove_one(struct io_kiocb *req)
        WRITE_ONCE(poll->canceled, true);
        if (!list_empty(&poll->wait.entry)) {
                list_del_init(&poll->wait.entry);
-               queue_work(req->ctx->sqo_wq, &req->work);
+               io_queue_async_work(req->ctx, req);
        }
        spin_unlock(&poll->head->lock);
 
@@ -1604,7 +1769,7 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
                io_cqring_ev_posted(ctx);
                io_put_req(req);
        } else {
-               queue_work(ctx->sqo_wq, &req->work);
+               io_queue_async_work(ctx, req);
        }
 
        return 1;
@@ -1647,6 +1812,7 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
        if (!poll->file)
                return -EBADF;
 
+       req->submit.sqe = NULL;
        INIT_WORK(&req->work, io_poll_complete_work);
        events = READ_ONCE(sqe->poll_events);
        poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
@@ -1698,6 +1864,81 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
        return ipt.error;
 }
 
+static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
+{
+       struct io_ring_ctx *ctx;
+       struct io_kiocb *req;
+       unsigned long flags;
+
+       req = container_of(timer, struct io_kiocb, timeout.timer);
+       ctx = req->ctx;
+       atomic_inc(&ctx->cq_timeouts);
+
+       spin_lock_irqsave(&ctx->completion_lock, flags);
+       list_del(&req->list);
+
+       io_cqring_fill_event(ctx, req->user_data, -ETIME);
+       io_commit_cqring(ctx);
+       spin_unlock_irqrestore(&ctx->completion_lock, flags);
+
+       io_cqring_ev_posted(ctx);
+
+       io_put_req(req);
+       return HRTIMER_NORESTART;
+}
+
+static int io_timeout(struct io_kiocb *req, const struct io_uring_sqe *sqe)
+{
+       unsigned count, req_dist, tail_index;
+       struct io_ring_ctx *ctx = req->ctx;
+       struct list_head *entry;
+       struct timespec64 ts;
+
+       if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
+               return -EINVAL;
+       if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->timeout_flags ||
+           sqe->len != 1)
+               return -EINVAL;
+
+       if (get_timespec64(&ts, u64_to_user_ptr(sqe->addr)))
+               return -EFAULT;
+
+       /*
+        * sqe->off holds how many events that need to occur for this
+        * timeout event to be satisfied.
+        */
+       count = READ_ONCE(sqe->off);
+       if (!count)
+               count = 1;
+
+       req->sequence = ctx->cached_sq_head + count - 1;
+       req->flags |= REQ_F_TIMEOUT;
+
+       /*
+        * Insertion sort, ensuring the first entry in the list is always
+        * the one we need first.
+        */
+       tail_index = ctx->cached_cq_tail - ctx->rings->sq_dropped;
+       req_dist = req->sequence - tail_index;
+       spin_lock_irq(&ctx->completion_lock);
+       list_for_each_prev(entry, &ctx->timeout_list) {
+               struct io_kiocb *nxt = list_entry(entry, struct io_kiocb, list);
+               unsigned dist;
+
+               dist = nxt->sequence - tail_index;
+               if (req_dist >= dist)
+                       break;
+       }
+       list_add(&req->list, entry);
+       spin_unlock_irq(&ctx->completion_lock);
+
+       hrtimer_init(&req->timeout.timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
+       req->timeout.timer.function = io_timeout_fn;
+       hrtimer_start(&req->timeout.timer, timespec64_to_ktime(ts),
+                       HRTIMER_MODE_REL);
+       return 0;
+}
+
 static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req,
                        const struct io_uring_sqe *sqe)
 {
@@ -1775,6 +2016,9 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
        case IORING_OP_RECVMSG:
                ret = io_recvmsg(req, s->sqe, force_nonblock);
                break;
+       case IORING_OP_TIMEOUT:
+               ret = io_timeout(req, s->sqe);
+               break;
        default:
                ret = -EINVAL;
                break;
@@ -1947,7 +2191,7 @@ out:
  */
 static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
 {
-       bool ret = false;
+       bool ret;
 
        if (!list)
                return false;
@@ -1993,10 +2237,14 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
        flags = READ_ONCE(s->sqe->flags);
        fd = READ_ONCE(s->sqe->fd);
 
-       if (flags & IOSQE_IO_DRAIN) {
+       if (flags & IOSQE_IO_DRAIN)
                req->flags |= REQ_F_IO_DRAIN;
-               req->sequence = ctx->cached_sq_head - 1;
-       }
+       /*
+        * All io need record the previous position, if LINK vs DARIN,
+        * it can be used to mark the position of the first IO in the
+        * link list.
+        */
+       req->sequence = s->sequence;
 
        if (!io_op_needs_file(s->sqe))
                return 0;
@@ -2018,38 +2266,27 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
        return 0;
 }
 
-static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
-                       struct sqe_submit *s)
+static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
+                       struct sqe_submit *s, bool force_nonblock)
 {
        int ret;
 
-       ret = io_req_defer(ctx, req, s->sqe);
-       if (ret) {
-               if (ret != -EIOCBQUEUED) {
-                       io_free_req(req);
-                       io_cqring_add_event(ctx, s->sqe->user_data, ret);
-               }
-               return 0;
-       }
-
-       ret = __io_submit_sqe(ctx, req, s, true);
+       ret = __io_submit_sqe(ctx, req, s, force_nonblock);
        if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
                struct io_uring_sqe *sqe_copy;
 
-               sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
+               sqe_copy = kmemdup(s->sqe, sizeof(*sqe_copy), GFP_KERNEL);
                if (sqe_copy) {
                        struct async_list *list;
 
-                       memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
                        s->sqe = sqe_copy;
-
                        memcpy(&req->submit, s, sizeof(*s));
                        list = io_async_list_from_sqe(ctx, s->sqe);
                        if (!io_add_to_prev_work(list, req)) {
                                if (list)
                                        atomic_inc(&list->cnt);
                                INIT_WORK(&req->work, io_sq_wq_submit_work);
-                               queue_work(ctx->sqo_wq, &req->work);
+                               io_queue_async_work(ctx, req);
                        }
 
                        /*
@@ -2074,10 +2311,70 @@ static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
        return ret;
 }
 
+static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
+                       struct sqe_submit *s, bool force_nonblock)
+{
+       int ret;
+
+       ret = io_req_defer(ctx, req, s->sqe);
+       if (ret) {
+               if (ret != -EIOCBQUEUED) {
+                       io_free_req(req);
+                       io_cqring_add_event(ctx, s->sqe->user_data, ret);
+               }
+               return 0;
+       }
+
+       return __io_queue_sqe(ctx, req, s, force_nonblock);
+}
+
+static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req,
+                             struct sqe_submit *s, struct io_kiocb *shadow,
+                             bool force_nonblock)
+{
+       int ret;
+       int need_submit = false;
+
+       if (!shadow)
+               return io_queue_sqe(ctx, req, s, force_nonblock);
+
+       /*
+        * Mark the first IO in link list as DRAIN, let all the following
+        * IOs enter the defer list. all IO needs to be completed before link
+        * list.
+        */
+       req->flags |= REQ_F_IO_DRAIN;
+       ret = io_req_defer(ctx, req, s->sqe);
+       if (ret) {
+               if (ret != -EIOCBQUEUED) {
+                       io_free_req(req);
+                       io_cqring_add_event(ctx, s->sqe->user_data, ret);
+                       return 0;
+               }
+       } else {
+               /*
+                * If ret == 0 means that all IOs in front of link io are
+                * running done. let's queue link head.
+                */
+               need_submit = true;
+       }
+
+       /* Insert shadow req to defer_list, blocking next IOs */
+       spin_lock_irq(&ctx->completion_lock);
+       list_add_tail(&shadow->list, &ctx->defer_list);
+       spin_unlock_irq(&ctx->completion_lock);
+
+       if (need_submit)
+               return __io_queue_sqe(ctx, req, s, force_nonblock);
+
+       return 0;
+}
+
 #define SQE_VALID_FLAGS        (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK)
 
 static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
-                         struct io_submit_state *state, struct io_kiocb **link)
+                         struct io_submit_state *state, struct io_kiocb **link,
+                         bool force_nonblock)
 {
        struct io_uring_sqe *sqe_copy;
        struct io_kiocb *req;
@@ -2130,7 +2427,7 @@ err:
                INIT_LIST_HEAD(&req->link_list);
                *link = req;
        } else {
-               io_queue_sqe(ctx, req, s);
+               io_queue_sqe(ctx, req, s, force_nonblock);
        }
 }
 
@@ -2160,15 +2457,15 @@ static void io_submit_state_start(struct io_submit_state *state,
 
 static void io_commit_sqring(struct io_ring_ctx *ctx)
 {
-       struct io_sq_ring *ring = ctx->sq_ring;
+       struct io_rings *rings = ctx->rings;
 
-       if (ctx->cached_sq_head != READ_ONCE(ring->r.head)) {
+       if (ctx->cached_sq_head != READ_ONCE(rings->sq.head)) {
                /*
                 * Ensure any loads from the SQEs are done at this point,
                 * since once we write the new head, the application could
                 * write new data to them.
                 */
-               smp_store_release(&ring->r.head, ctx->cached_sq_head);
+               smp_store_release(&rings->sq.head, ctx->cached_sq_head);
        }
 }
 
@@ -2182,7 +2479,8 @@ static void io_commit_sqring(struct io_ring_ctx *ctx)
  */
 static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
 {
-       struct io_sq_ring *ring = ctx->sq_ring;
+       struct io_rings *rings = ctx->rings;
+       u32 *sq_array = ctx->sq_array;
        unsigned head;
 
        /*
@@ -2195,20 +2493,21 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
         */
        head = ctx->cached_sq_head;
        /* make sure SQ entry isn't read before tail */
-       if (head == smp_load_acquire(&ring->r.tail))
+       if (head == smp_load_acquire(&rings->sq.tail))
                return false;
 
-       head = READ_ONCE(ring->array[head & ctx->sq_mask]);
+       head = READ_ONCE(sq_array[head & ctx->sq_mask]);
        if (head < ctx->sq_entries) {
                s->index = head;
                s->sqe = &ctx->sq_sqes[head];
+               s->sequence = ctx->cached_sq_head;
                ctx->cached_sq_head++;
                return true;
        }
 
        /* drop invalid entries */
        ctx->cached_sq_head++;
-       ring->dropped++;
+       rings->sq_dropped++;
        return false;
 }
 
@@ -2217,6 +2516,7 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
 {
        struct io_submit_state state, *statep = NULL;
        struct io_kiocb *link = NULL;
+       struct io_kiocb *shadow_req = NULL;
        bool prev_was_link = false;
        int i, submitted = 0;
 
@@ -2231,11 +2531,25 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
                 * that's the end of the chain. Submit the previous link.
                 */
                if (!prev_was_link && link) {
-                       io_queue_sqe(ctx, link, &link->submit);
+                       io_queue_link_head(ctx, link, &link->submit, shadow_req,
+                                               true);
                        link = NULL;
+                       shadow_req = NULL;
                }
                prev_was_link = (sqes[i].sqe->flags & IOSQE_IO_LINK) != 0;
 
+               if (link && (sqes[i].sqe->flags & IOSQE_IO_DRAIN)) {
+                       if (!shadow_req) {
+                               shadow_req = io_get_req(ctx, NULL);
+                               if (unlikely(!shadow_req))
+                                       goto out;
+                               shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN);
+                               refcount_dec(&shadow_req->refs);
+                       }
+                       shadow_req->sequence = sqes[i].sequence;
+               }
+
+out:
                if (unlikely(mm_fault)) {
                        io_cqring_add_event(ctx, sqes[i].sqe->user_data,
                                                -EFAULT);
@@ -2243,13 +2557,13 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
                        sqes[i].has_user = has_user;
                        sqes[i].needs_lock = true;
                        sqes[i].needs_fixed_file = true;
-                       io_submit_sqe(ctx, &sqes[i], statep, &link);
+                       io_submit_sqe(ctx, &sqes[i], statep, &link, true);
                        submitted++;
                }
        }
 
        if (link)
-               io_queue_sqe(ctx, link, &link->submit);
+               io_queue_link_head(ctx, link, &link->submit, shadow_req, true);
        if (statep)
                io_submit_state_end(&state);
 
@@ -2280,15 +2594,7 @@ static int io_sq_thread(void *data)
                        unsigned nr_events = 0;
 
                        if (ctx->flags & IORING_SETUP_IOPOLL) {
-                               /*
-                                * We disallow the app entering submit/complete
-                                * with polling, but we still need to lock the
-                                * ring to prevent racing with polled issue
-                                * that got punted to a workqueue.
-                                */
-                               mutex_lock(&ctx->uring_lock);
                                io_iopoll_check(ctx, &nr_events, 0);
-                               mutex_unlock(&ctx->uring_lock);
                        } else {
                                /*
                                 * Normal IO, just pretend everything completed.
@@ -2309,7 +2615,7 @@ static int io_sq_thread(void *data)
                         * to sleep.
                         */
                        if (inflight || !time_after(jiffies, timeout)) {
-                               cpu_relax();
+                               cond_resched();
                                continue;
                        }
 
@@ -2329,7 +2635,7 @@ static int io_sq_thread(void *data)
                                                TASK_INTERRUPTIBLE);
 
                        /* Tell userspace we may need a wakeup call */
-                       ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP;
+                       ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP;
                        /* make sure to read SQ tail after writing flags */
                        smp_mb();
 
@@ -2343,12 +2649,12 @@ static int io_sq_thread(void *data)
                                schedule();
                                finish_wait(&ctx->sqo_wait, &wait);
 
-                               ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
+                               ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
                                continue;
                        }
                        finish_wait(&ctx->sqo_wait, &wait);
 
-                       ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
+                       ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
                }
 
                i = 0;
@@ -2389,10 +2695,12 @@ static int io_sq_thread(void *data)
        return 0;
 }
 
-static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
+static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
+                         bool block_for_last)
 {
        struct io_submit_state state, *statep = NULL;
        struct io_kiocb *link = NULL;
+       struct io_kiocb *shadow_req = NULL;
        bool prev_was_link = false;
        int i, submit = 0;
 
@@ -2402,6 +2710,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
        }
 
        for (i = 0; i < to_submit; i++) {
+               bool force_nonblock = true;
                struct sqe_submit s;
 
                if (!io_get_sqring(ctx, &s))
@@ -2412,32 +2721,83 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
                 * that's the end of the chain. Submit the previous link.
                 */
                if (!prev_was_link && link) {
-                       io_queue_sqe(ctx, link, &link->submit);
+                       io_queue_link_head(ctx, link, &link->submit, shadow_req,
+                                               force_nonblock);
                        link = NULL;
+                       shadow_req = NULL;
                }
                prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0;
 
+               if (link && (s.sqe->flags & IOSQE_IO_DRAIN)) {
+                       if (!shadow_req) {
+                               shadow_req = io_get_req(ctx, NULL);
+                               if (unlikely(!shadow_req))
+                                       goto out;
+                               shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN);
+                               refcount_dec(&shadow_req->refs);
+                       }
+                       shadow_req->sequence = s.sequence;
+               }
+
+out:
                s.has_user = true;
                s.needs_lock = false;
                s.needs_fixed_file = false;
                submit++;
-               io_submit_sqe(ctx, &s, statep, &link);
+
+               /*
+                * The caller will block for events after submit, submit the
+                * last IO non-blocking. This is either the only IO it's
+                * submitting, or it already submitted the previous ones. This
+                * improves performance by avoiding an async punt that we don't
+                * need to do.
+                */
+               if (block_for_last && submit == to_submit)
+                       force_nonblock = false;
+
+               io_submit_sqe(ctx, &s, statep, &link, force_nonblock);
        }
        io_commit_sqring(ctx);
 
        if (link)
-               io_queue_sqe(ctx, link, &link->submit);
+               io_queue_link_head(ctx, link, &link->submit, shadow_req,
+                                       block_for_last);
        if (statep)
                io_submit_state_end(statep);
 
        return submit;
 }
 
-static unsigned io_cqring_events(struct io_cq_ring *ring)
+struct io_wait_queue {
+       struct wait_queue_entry wq;
+       struct io_ring_ctx *ctx;
+       unsigned to_wait;
+       unsigned nr_timeouts;
+};
+
+static inline bool io_should_wake(struct io_wait_queue *iowq)
 {
-       /* See comment at the top of this file */
-       smp_rmb();
-       return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head);
+       struct io_ring_ctx *ctx = iowq->ctx;
+
+       /*
+        * Wake up if we have enough events, or if a timeout occured since we
+        * started waiting. For timeouts, we always want to return to userspace,
+        * regardless of event count.
+        */
+       return io_cqring_events(ctx->rings) >= iowq->to_wait ||
+                       atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
+}
+
+static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
+                           int wake_flags, void *key)
+{
+       struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
+                                                       wq);
+
+       if (!io_should_wake(iowq))
+               return -1;
+
+       return autoremove_wake_function(curr, mode, wake_flags, key);
 }
 
 /*
@@ -2447,10 +2807,19 @@ static unsigned io_cqring_events(struct io_cq_ring *ring)
 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
                          const sigset_t __user *sig, size_t sigsz)
 {
-       struct io_cq_ring *ring = ctx->cq_ring;
+       struct io_wait_queue iowq = {
+               .wq = {
+                       .private        = current,
+                       .func           = io_wake_function,
+                       .entry          = LIST_HEAD_INIT(iowq.wq.entry),
+               },
+               .ctx            = ctx,
+               .to_wait        = min_events,
+       };
+       struct io_rings *rings = ctx->rings;
        int ret;
 
-       if (io_cqring_events(ring) >= min_events)
+       if (io_cqring_events(rings) >= min_events)
                return 0;
 
        if (sig) {
@@ -2466,12 +2835,26 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
                        return ret;
        }
 
-       ret = wait_event_interruptible(ctx->wait, io_cqring_events(ring) >= min_events);
+       ret = 0;
+       iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
+       do {
+               prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
+                                               TASK_INTERRUPTIBLE);
+               if (io_should_wake(&iowq))
+                       break;
+               schedule();
+               if (signal_pending(current)) {
+                       ret = -ERESTARTSYS;
+                       break;
+               }
+       } while (1);
+       finish_wait(&ctx->wait, &iowq.wq);
+
        restore_saved_sigmask_unless(ret == -ERESTARTSYS);
        if (ret == -ERESTARTSYS)
                ret = -EINTR;
 
-       return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0;
+       return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
 }
 
 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
@@ -2521,11 +2904,15 @@ static void io_sq_thread_stop(struct io_ring_ctx *ctx)
 
 static void io_finish_async(struct io_ring_ctx *ctx)
 {
+       int i;
+
        io_sq_thread_stop(ctx);
 
-       if (ctx->sqo_wq) {
-               destroy_workqueue(ctx->sqo_wq);
-               ctx->sqo_wq = NULL;
+       for (i = 0; i < ARRAY_SIZE(ctx->sqo_wq); i++) {
+               if (ctx->sqo_wq[i]) {
+                       destroy_workqueue(ctx->sqo_wq[i]);
+                       ctx->sqo_wq[i] = NULL;
+               }
        }
 }
 
@@ -2733,16 +3120,31 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx,
        }
 
        /* Do QD, or 2 * CPUS, whatever is smallest */
-       ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
+       ctx->sqo_wq[0] = alloc_workqueue("io_ring-wq",
+                       WQ_UNBOUND | WQ_FREEZABLE,
                        min(ctx->sq_entries - 1, 2 * num_online_cpus()));
-       if (!ctx->sqo_wq) {
+       if (!ctx->sqo_wq[0]) {
+               ret = -ENOMEM;
+               goto err;
+       }
+
+       /*
+        * This is for buffered writes, where we want to limit the parallelism
+        * due to file locking in file systems. As "normal" buffered writes
+        * should parellelize on writeout quite nicely, limit us to having 2
+        * pending. This avoids massive contention on the inode when doing
+        * buffered async writes.
+        */
+       ctx->sqo_wq[1] = alloc_workqueue("io_ring-write-wq",
+                                               WQ_UNBOUND | WQ_FREEZABLE, 2);
+       if (!ctx->sqo_wq[1]) {
                ret = -ENOMEM;
                goto err;
        }
 
        return 0;
 err:
-       io_sq_thread_stop(ctx);
+       io_finish_async(ctx);
        mmdrop(ctx->sqo_mm);
        ctx->sqo_mm = NULL;
        return ret;
@@ -2791,17 +3193,45 @@ static void *io_mem_alloc(size_t size)
        return (void *) __get_free_pages(gfp_flags, get_order(size));
 }
 
+static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries,
+                               size_t *sq_offset)
+{
+       struct io_rings *rings;
+       size_t off, sq_array_size;
+
+       off = struct_size(rings, cqes, cq_entries);
+       if (off == SIZE_MAX)
+               return SIZE_MAX;
+
+#ifdef CONFIG_SMP
+       off = ALIGN(off, SMP_CACHE_BYTES);
+       if (off == 0)
+               return SIZE_MAX;
+#endif
+
+       sq_array_size = array_size(sizeof(u32), sq_entries);
+       if (sq_array_size == SIZE_MAX)
+               return SIZE_MAX;
+
+       if (check_add_overflow(off, sq_array_size, &off))
+               return SIZE_MAX;
+
+       if (sq_offset)
+               *sq_offset = off;
+
+       return off;
+}
+
 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
 {
-       struct io_sq_ring *sq_ring;
-       struct io_cq_ring *cq_ring;
-       size_t bytes;
+       size_t pages;
 
-       bytes = struct_size(sq_ring, array, sq_entries);
-       bytes += array_size(sizeof(struct io_uring_sqe), sq_entries);
-       bytes += struct_size(cq_ring, cqes, cq_entries);
+       pages = (size_t)1 << get_order(
+               rings_size(sq_entries, cq_entries, NULL));
+       pages += (size_t)1 << get_order(
+               array_size(sizeof(struct io_uring_sqe), sq_entries));
 
-       return (bytes + PAGE_SIZE - 1) / PAGE_SIZE;
+       return pages;
 }
 
 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
@@ -2815,7 +3245,7 @@ static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
                struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
 
                for (j = 0; j < imu->nr_bvecs; j++)
-                       put_page(imu->bvec[j].bv_page);
+                       put_user_page(imu->bvec[j].bv_page);
 
                if (ctx->account_mem)
                        io_unaccount_mem(ctx->user, imu->nr_bvecs);
@@ -2959,10 +3389,8 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
                         * if we did partial map, or found file backed vmas,
                         * release any pages we did get
                         */
-                       if (pret > 0) {
-                               for (j = 0; j < pret; j++)
-                                       put_page(pages[j]);
-                       }
+                       if (pret > 0)
+                               put_user_pages(pages, pret);
                        if (ctx->account_mem)
                                io_unaccount_mem(ctx->user, nr_pages);
                        kvfree(imu->bvec);
@@ -3048,9 +3476,8 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx)
        }
 #endif
 
-       io_mem_free(ctx->sq_ring);
+       io_mem_free(ctx->rings);
        io_mem_free(ctx->sq_sqes);
-       io_mem_free(ctx->cq_ring);
 
        percpu_ref_exit(&ctx->refs);
        if (ctx->account_mem)
@@ -3071,10 +3498,10 @@ static __poll_t io_uring_poll(struct file *file, poll_table *wait)
         * io_commit_cqring
         */
        smp_rmb();
-       if (READ_ONCE(ctx->sq_ring->r.tail) - ctx->cached_sq_head !=
-           ctx->sq_ring->ring_entries)
+       if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head !=
+           ctx->rings->sq_ring_entries)
                mask |= EPOLLOUT | EPOLLWRNORM;
-       if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail)
+       if (READ_ONCE(ctx->rings->cq.head) != ctx->cached_cq_tail)
                mask |= EPOLLIN | EPOLLRDNORM;
 
        return mask;
@@ -3093,6 +3520,7 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
        percpu_ref_kill(&ctx->refs);
        mutex_unlock(&ctx->uring_lock);
 
+       io_kill_timeouts(ctx);
        io_poll_remove_all(ctx);
        io_iopoll_reap_events(ctx);
        wait_for_completion(&ctx->ctx_done);
@@ -3119,20 +3547,18 @@ static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
 
        switch (offset) {
        case IORING_OFF_SQ_RING:
-               ptr = ctx->sq_ring;
+       case IORING_OFF_CQ_RING:
+               ptr = ctx->rings;
                break;
        case IORING_OFF_SQES:
                ptr = ctx->sq_sqes;
                break;
-       case IORING_OFF_CQ_RING:
-               ptr = ctx->cq_ring;
-               break;
        default:
                return -EINVAL;
        }
 
        page = virt_to_head_page(ptr);
-       if (sz > (PAGE_SIZE << compound_order(page)))
+       if (sz > page_size(page))
                return -EINVAL;
 
        pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
@@ -3169,19 +3595,27 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
         * Just return the requested submit count, and wake the thread if
         * we were asked to.
         */
+       ret = 0;
        if (ctx->flags & IORING_SETUP_SQPOLL) {
                if (flags & IORING_ENTER_SQ_WAKEUP)
                        wake_up(&ctx->sqo_wait);
                submitted = to_submit;
-               goto out_ctx;
-       }
+       } else if (to_submit) {
+               bool block_for_last = false;
 
-       ret = 0;
-       if (to_submit) {
                to_submit = min(to_submit, ctx->sq_entries);
 
+               /*
+                * Allow last submission to block in a series, IFF the caller
+                * asked to wait for events and we don't currently have
+                * enough. This potentially avoids an async punt.
+                */
+               if (to_submit == min_complete &&
+                   io_cqring_events(ctx->rings) < min_complete)
+                       block_for_last = true;
+
                mutex_lock(&ctx->uring_lock);
-               submitted = io_ring_submit(ctx, to_submit);
+               submitted = io_ring_submit(ctx, to_submit, block_for_last);
                mutex_unlock(&ctx->uring_lock);
        }
        if (flags & IORING_ENTER_GETEVENTS) {
@@ -3190,15 +3624,12 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
                min_complete = min(min_complete, ctx->cq_entries);
 
                if (ctx->flags & IORING_SETUP_IOPOLL) {
-                       mutex_lock(&ctx->uring_lock);
                        ret = io_iopoll_check(ctx, &nr_events, min_complete);
-                       mutex_unlock(&ctx->uring_lock);
                } else {
                        ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
                }
        }
 
-out_ctx:
        io_ring_drop_ctx_refs(ctx, 1);
 out_fput:
        fdput(f);
@@ -3215,19 +3646,27 @@ static const struct file_operations io_uring_fops = {
 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
                                  struct io_uring_params *p)
 {
-       struct io_sq_ring *sq_ring;
-       struct io_cq_ring *cq_ring;
-       size_t size;
+       struct io_rings *rings;
+       size_t size, sq_array_offset;
 
-       sq_ring = io_mem_alloc(struct_size(sq_ring, array, p->sq_entries));
-       if (!sq_ring)
+       size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset);
+       if (size == SIZE_MAX)
+               return -EOVERFLOW;
+
+       rings = io_mem_alloc(size);
+       if (!rings)
                return -ENOMEM;
 
-       ctx->sq_ring = sq_ring;
-       sq_ring->ring_mask = p->sq_entries - 1;
-       sq_ring->ring_entries = p->sq_entries;
-       ctx->sq_mask = sq_ring->ring_mask;
-       ctx->sq_entries = sq_ring->ring_entries;
+       ctx->rings = rings;
+       ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
+       rings->sq_ring_mask = p->sq_entries - 1;
+       rings->cq_ring_mask = p->cq_entries - 1;
+       rings->sq_ring_entries = p->sq_entries;
+       rings->cq_ring_entries = p->cq_entries;
+       ctx->sq_mask = rings->sq_ring_mask;
+       ctx->cq_mask = rings->cq_ring_mask;
+       ctx->sq_entries = rings->sq_ring_entries;
+       ctx->cq_entries = rings->cq_ring_entries;
 
        size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
        if (size == SIZE_MAX)
@@ -3237,15 +3676,6 @@ static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
        if (!ctx->sq_sqes)
                return -ENOMEM;
 
-       cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries));
-       if (!cq_ring)
-               return -ENOMEM;
-
-       ctx->cq_ring = cq_ring;
-       cq_ring->ring_mask = p->cq_entries - 1;
-       cq_ring->ring_entries = p->cq_entries;
-       ctx->cq_mask = cq_ring->ring_mask;
-       ctx->cq_entries = cq_ring->ring_entries;
        return 0;
 }
 
@@ -3349,21 +3779,23 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p)
                goto err;
 
        memset(&p->sq_off, 0, sizeof(p->sq_off));
-       p->sq_off.head = offsetof(struct io_sq_ring, r.head);
-       p->sq_off.tail = offsetof(struct io_sq_ring, r.tail);
-       p->sq_off.ring_mask = offsetof(struct io_sq_ring, ring_mask);
-       p->sq_off.ring_entries = offsetof(struct io_sq_ring, ring_entries);
-       p->sq_off.flags = offsetof(struct io_sq_ring, flags);
-       p->sq_off.dropped = offsetof(struct io_sq_ring, dropped);
-       p->sq_off.array = offsetof(struct io_sq_ring, array);
+       p->sq_off.head = offsetof(struct io_rings, sq.head);
+       p->sq_off.tail = offsetof(struct io_rings, sq.tail);
+       p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
+       p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
+       p->sq_off.flags = offsetof(struct io_rings, sq_flags);
+       p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
+       p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
 
        memset(&p->cq_off, 0, sizeof(p->cq_off));
-       p->cq_off.head = offsetof(struct io_cq_ring, r.head);
-       p->cq_off.tail = offsetof(struct io_cq_ring, r.tail);
-       p->cq_off.ring_mask = offsetof(struct io_cq_ring, ring_mask);
-       p->cq_off.ring_entries = offsetof(struct io_cq_ring, ring_entries);
-       p->cq_off.overflow = offsetof(struct io_cq_ring, overflow);
-       p->cq_off.cqes = offsetof(struct io_cq_ring, cqes);
+       p->cq_off.head = offsetof(struct io_rings, cq.head);
+       p->cq_off.tail = offsetof(struct io_rings, cq.tail);
+       p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
+       p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
+       p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
+       p->cq_off.cqes = offsetof(struct io_rings, cqes);
+
+       p->features = IORING_FEAT_SINGLE_MMAP;
        return ret;
 err:
        io_ring_ctx_wait_and_kill(ctx);