io_uring: replace workqueue usage with io-wq
authorJens Axboe <axboe@kernel.dk>
Thu, 24 Oct 2019 13:25:42 +0000 (07:25 -0600)
committerJens Axboe <axboe@kernel.dk>
Tue, 29 Oct 2019 18:43:06 +0000 (12:43 -0600)
Drop various work-arounds we have for workqueues:

- We no longer need the async_list for tracking sequential IO.

- We don't have to maintain our own mm tracking/setting.

- We don't need a separate workqueue for buffered writes. This didn't
  even work that well to begin with, as it was suboptimal for multiple
  buffered writers on multiple files.

- We can properly cancel pending interruptible work. This fixes
  deadlocks with particularly socket IO, where we cannot cancel them
  when the io_uring is closed. Hence the ring will wait forever for
  these requests to complete, which may never happen. This is different
  from disk IO where we know requests will complete in a finite amount
  of time.

- Due to being able to cancel work interruptible work that is already
  running, we can implement file table support for work. We need that
  for supporting system calls that add to a process file table.

- It gets us one step closer to adding async support for any system
  call.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
fs/io_uring.c
include/trace/events/io_uring.h
init/Kconfig

index f9eff8f62ddb95aba07cd05203dd44a0f2bd0143..d94bd4e3a60eb675fac9f447fd9eb5e28505058e 100644 (file)
@@ -56,7 +56,6 @@
 #include <linux/mmu_context.h>
 #include <linux/percpu.h>
 #include <linux/slab.h>
-#include <linux/workqueue.h>
 #include <linux/kthread.h>
 #include <linux/blkdev.h>
 #include <linux/bvec.h>
@@ -77,6 +76,7 @@
 #include <uapi/linux/io_uring.h>
 
 #include "internal.h"
+#include "io-wq.h"
 
 #define IORING_MAX_ENTRIES     32768
 #define IORING_MAX_CQ_ENTRIES  (2 * IORING_MAX_ENTRIES)
@@ -165,16 +165,6 @@ struct io_mapped_ubuf {
        unsigned int    nr_bvecs;
 };
 
-struct async_list {
-       spinlock_t              lock;
-       atomic_t                cnt;
-       struct list_head        list;
-
-       struct file             *file;
-       off_t                   io_start;
-       size_t                  io_len;
-};
-
 struct io_ring_ctx {
        struct {
                struct percpu_ref       refs;
@@ -209,7 +199,7 @@ struct io_ring_ctx {
        } ____cacheline_aligned_in_smp;
 
        /* IO offload */
-       struct workqueue_struct *sqo_wq[2];
+       struct io_wq            *io_wq;
        struct task_struct      *sqo_thread;    /* if using sq thread polling */
        struct mm_struct        *sqo_mm;
        wait_queue_head_t       sqo_wait;
@@ -262,8 +252,6 @@ struct io_ring_ctx {
                struct list_head        cancel_list;
        } ____cacheline_aligned_in_smp;
 
-       struct async_list       pending_async[2];
-
 #if defined(CONFIG_UNIX)
        struct socket           *ring_sock;
 #endif
@@ -333,7 +321,7 @@ struct io_kiocb {
        u32                     result;
        u32                     sequence;
 
-       struct work_struct      work;
+       struct io_wq_work       work;
 };
 
 #define IO_PLUG_THRESHOLD              2
@@ -359,7 +347,7 @@ struct io_submit_state {
        unsigned int            ios_left;
 };
 
-static void io_sq_wq_submit_work(struct work_struct *work);
+static void io_wq_submit_work(struct io_wq_work **workptr);
 static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
                                 long res);
 static void __io_free_req(struct io_kiocb *req);
@@ -391,7 +379,6 @@ static void io_ring_ctx_ref_free(struct percpu_ref *ref)
 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 {
        struct io_ring_ctx *ctx;
-       int i;
 
        ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
        if (!ctx)
@@ -409,11 +396,6 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
        init_completion(&ctx->sqo_thread_started);
        mutex_init(&ctx->uring_lock);
        init_waitqueue_head(&ctx->wait);
-       for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) {
-               spin_lock_init(&ctx->pending_async[i].lock);
-               INIT_LIST_HEAD(&ctx->pending_async[i].list);
-               atomic_set(&ctx->pending_async[i].cnt, 0);
-       }
        spin_lock_init(&ctx->completion_lock);
        INIT_LIST_HEAD(&ctx->poll_list);
        INIT_LIST_HEAD(&ctx->cancel_list);
@@ -479,22 +461,45 @@ static void __io_commit_cqring(struct io_ring_ctx *ctx)
        }
 }
 
-static inline void io_queue_async_work(struct io_ring_ctx *ctx,
-                                      struct io_kiocb *req)
+static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
 {
-       int rw = 0;
+       u8 opcode = READ_ONCE(sqe->opcode);
+
+       return !(opcode == IORING_OP_READ_FIXED ||
+                opcode == IORING_OP_WRITE_FIXED);
+}
+
+static inline bool io_prep_async_work(struct io_kiocb *req)
+{
+       bool do_hashed = false;
 
        if (req->submit.sqe) {
                switch (req->submit.sqe->opcode) {
                case IORING_OP_WRITEV:
                case IORING_OP_WRITE_FIXED:
-                       rw = !(req->rw.ki_flags & IOCB_DIRECT);
+                       do_hashed = true;
                        break;
                }
+               if (io_sqe_needs_user(req->submit.sqe))
+                       req->work.flags |= IO_WQ_WORK_NEEDS_USER;
        }
 
-       trace_io_uring_queue_async_work(ctx, rw, req, &req->work, req->flags);
-       queue_work(ctx->sqo_wq[rw], &req->work);
+       return do_hashed;
+}
+
+static inline void io_queue_async_work(struct io_ring_ctx *ctx,
+                                      struct io_kiocb *req)
+{
+       bool do_hashed = io_prep_async_work(req);
+
+       trace_io_uring_queue_async_work(ctx, do_hashed, req, &req->work,
+                                       req->flags);
+       if (!do_hashed) {
+               io_wq_enqueue(ctx->io_wq, &req->work);
+       } else {
+               io_wq_enqueue_hashed(ctx->io_wq, &req->work,
+                                       file_inode(req->file));
+       }
 }
 
 static void io_kill_timeout(struct io_kiocb *req)
@@ -647,6 +652,7 @@ static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
        /* one is dropped after submission, the other at completion */
        refcount_set(&req->refs, 2);
        req->result = 0;
+       INIT_IO_WORK(&req->work, io_wq_submit_work);
        return req;
 out:
        percpu_ref_put(&ctx->refs);
@@ -693,12 +699,10 @@ static void io_req_link_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
                 * If we're in async work, we can continue processing the chain
                 * in this context instead of having to queue up new async work.
                 */
-               if (nxtptr && current_work()) {
+               if (nxtptr && current_work())
                        *nxtptr = nxt;
-               } else {
-                       INIT_WORK(&nxt->work, io_sq_wq_submit_work);
+               else
                        io_queue_async_work(req->ctx, nxt);
-               }
        }
 }
 
@@ -757,12 +761,10 @@ static void io_put_req(struct io_kiocb *req, struct io_kiocb **nxtptr)
 
        nxt = io_put_req_find_next(req);
        if (nxt) {
-               if (nxtptr) {
+               if (nxtptr)
                        *nxtptr = nxt;
-               } else {
-                       INIT_WORK(&nxt->work, io_sq_wq_submit_work);
+               else
                        io_queue_async_work(nxt->ctx, nxt);
-               }
        }
 }
 
@@ -1324,65 +1326,6 @@ static ssize_t io_import_iovec(struct io_ring_ctx *ctx, int rw,
        return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
 }
 
-static inline bool io_should_merge(struct async_list *al, struct kiocb *kiocb)
-{
-       if (al->file == kiocb->ki_filp) {
-               off_t start, end;
-
-               /*
-                * Allow merging if we're anywhere in the range of the same
-                * page. Generally this happens for sub-page reads or writes,
-                * and it's beneficial to allow the first worker to bring the
-                * page in and the piggy backed work can then work on the
-                * cached page.
-                */
-               start = al->io_start & PAGE_MASK;
-               end = (al->io_start + al->io_len + PAGE_SIZE - 1) & PAGE_MASK;
-               if (kiocb->ki_pos >= start && kiocb->ki_pos <= end)
-                       return true;
-       }
-
-       al->file = NULL;
-       return false;
-}
-
-/*
- * Make a note of the last file/offset/direction we punted to async
- * context. We'll use this information to see if we can piggy back a
- * sequential request onto the previous one, if it's still hasn't been
- * completed by the async worker.
- */
-static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
-{
-       struct async_list *async_list = &req->ctx->pending_async[rw];
-       struct kiocb *kiocb = &req->rw;
-       struct file *filp = kiocb->ki_filp;
-
-       if (io_should_merge(async_list, kiocb)) {
-               unsigned long max_bytes;
-
-               /* Use 8x RA size as a decent limiter for both reads/writes */
-               max_bytes = filp->f_ra.ra_pages << (PAGE_SHIFT + 3);
-               if (!max_bytes)
-                       max_bytes = VM_READAHEAD_PAGES << (PAGE_SHIFT + 3);
-
-               /* If max len are exceeded, reset the state */
-               if (async_list->io_len + len <= max_bytes) {
-                       req->flags |= REQ_F_SEQ_PREV;
-                       async_list->io_len += len;
-               } else {
-                       async_list->file = NULL;
-               }
-       }
-
-       /* New file? Reset state. */
-       if (async_list->file != filp) {
-               async_list->io_start = kiocb->ki_pos;
-               async_list->io_len = len;
-               async_list->file = filp;
-       }
-}
-
 /*
  * For files that don't have ->read_iter() and ->write_iter(), handle them
  * by looping over ->read() or ->write() manually.
@@ -1477,13 +1420,10 @@ static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
                    ret2 > 0 && ret2 < read_size)
                        ret2 = -EAGAIN;
                /* Catch -EAGAIN return for forced non-blocking submission */
-               if (!force_nonblock || ret2 != -EAGAIN) {
+               if (!force_nonblock || ret2 != -EAGAIN)
                        kiocb_done(kiocb, ret2, nxt, s->in_async);
-               } else {
-                       if (!s->in_async)
-                               io_async_list_note(READ, req, iov_count);
+               else
                        ret = -EAGAIN;
-               }
        }
        kfree(iovec);
        return ret;
@@ -1517,11 +1457,8 @@ static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
        iov_count = iov_iter_count(&iter);
 
        ret = -EAGAIN;
-       if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) {
-               if (!s->in_async)
-                       io_async_list_note(WRITE, req, iov_count);
+       if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT))
                goto out_free;
-       }
 
        ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
        if (!ret) {
@@ -1546,13 +1483,10 @@ static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
                        ret2 = call_write_iter(file, kiocb, &iter);
                else
                        ret2 = loop_rw_iter(WRITE, file, kiocb, &iter);
-               if (!force_nonblock || ret2 != -EAGAIN) {
+               if (!force_nonblock || ret2 != -EAGAIN)
                        kiocb_done(kiocb, ret2, nxt, s->in_async);
-               } else {
-                       if (!s->in_async)
-                               io_async_list_note(WRITE, req, iov_count);
+               else
                        ret = -EAGAIN;
-               }
        }
 out_free:
        kfree(iovec);
@@ -1794,14 +1728,18 @@ static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req,
        io_commit_cqring(ctx);
 }
 
-static void io_poll_complete_work(struct work_struct *work)
+static void io_poll_complete_work(struct io_wq_work **workptr)
 {
+       struct io_wq_work *work = *workptr;
        struct io_kiocb *req = container_of(work, struct io_kiocb, work);
        struct io_poll_iocb *poll = &req->poll;
        struct poll_table_struct pt = { ._key = poll->events };
        struct io_ring_ctx *ctx = req->ctx;
        __poll_t mask = 0;
 
+       if (work->flags & IO_WQ_WORK_CANCEL)
+               WRITE_ONCE(poll->canceled, true);
+
        if (!READ_ONCE(poll->canceled))
                mask = vfs_poll(poll->file, &pt) & poll->events;
 
@@ -1894,7 +1832,7 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
                return -EBADF;
 
        req->submit.sqe = NULL;
-       INIT_WORK(&req->work, io_poll_complete_work);
+       INIT_IO_WORK(&req->work, io_poll_complete_work);
        events = READ_ONCE(sqe->poll_events);
        poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
 
@@ -2152,7 +2090,6 @@ static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req,
        memcpy(sqe_copy, sqe, sizeof(*sqe_copy));
        req->submit.sqe = sqe_copy;
 
-       INIT_WORK(&req->work, io_sq_wq_submit_work);
        trace_io_uring_defer(ctx, req, false);
        list_add_tail(&req->list, &ctx->defer_list);
        spin_unlock_irq(&ctx->completion_lock);
@@ -2235,186 +2172,54 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
        return 0;
 }
 
-static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx,
-                                                const struct io_uring_sqe *sqe)
-{
-       switch (sqe->opcode) {
-       case IORING_OP_READV:
-       case IORING_OP_READ_FIXED:
-               return &ctx->pending_async[READ];
-       case IORING_OP_WRITEV:
-       case IORING_OP_WRITE_FIXED:
-               return &ctx->pending_async[WRITE];
-       default:
-               return NULL;
-       }
-}
-
-static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
-{
-       u8 opcode = READ_ONCE(sqe->opcode);
-
-       return !(opcode == IORING_OP_READ_FIXED ||
-                opcode == IORING_OP_WRITE_FIXED);
-}
-
-static void io_sq_wq_submit_work(struct work_struct *work)
+static void io_wq_submit_work(struct io_wq_work **workptr)
 {
+       struct io_wq_work *work = *workptr;
        struct io_kiocb *req = container_of(work, struct io_kiocb, work);
        struct io_ring_ctx *ctx = req->ctx;
-       struct mm_struct *cur_mm = NULL;
-       struct async_list *async_list;
-       LIST_HEAD(req_list);
-       mm_segment_t old_fs;
-       int ret;
+       struct sqe_submit *s = &req->submit;
+       const struct io_uring_sqe *sqe = s->sqe;
+       struct io_kiocb *nxt = NULL;
+       int ret = 0;
 
-       async_list = io_async_list_from_sqe(ctx, req->submit.sqe);
-restart:
-       do {
-               struct sqe_submit *s = &req->submit;
-               const struct io_uring_sqe *sqe = s->sqe;
-               unsigned int flags = req->flags;
-               struct io_kiocb *nxt = NULL;
+       /* Ensure we clear previously set non-block flag */
+       req->rw.ki_flags &= ~IOCB_NOWAIT;
 
-               /* Ensure we clear previously set non-block flag */
-               req->rw.ki_flags &= ~IOCB_NOWAIT;
+       if (work->flags & IO_WQ_WORK_CANCEL)
+               ret = -ECANCELED;
 
-               ret = 0;
-               if (io_sqe_needs_user(sqe) && !cur_mm) {
-                       if (!mmget_not_zero(ctx->sqo_mm)) {
-                               ret = -EFAULT;
-                       } else {
-                               cur_mm = ctx->sqo_mm;
-                               use_mm(cur_mm);
-                               old_fs = get_fs();
-                               set_fs(USER_DS);
-                       }
-               }
+       if (!ret) {
+               s->has_user = (work->flags & IO_WQ_WORK_HAS_MM) != 0;
+               s->in_async = true;
+               do {
+                       ret = __io_submit_sqe(ctx, req, s, &nxt, false);
+                       /*
+                        * We can get EAGAIN for polled IO even though we're
+                        * forcing a sync submission from here, since we can't
+                        * wait for request slots on the block side.
+                        */
+                       if (ret != -EAGAIN)
+                               break;
+                       cond_resched();
+               } while (1);
+       }
 
-               if (!ret) {
-                       s->has_user = cur_mm != NULL;
-                       s->in_async = true;
-                       do {
-                               ret = __io_submit_sqe(ctx, req, s, &nxt, false);
-                               /*
-                                * We can get EAGAIN for polled IO even though
-                                * we're forcing a sync submission from here,
-                                * since we can't wait for request slots on the
-                                * block side.
-                                */
-                               if (ret != -EAGAIN)
-                                       break;
-                               cond_resched();
-                       } while (1);
-               }
+       /* drop submission reference */
+       io_put_req(req, NULL);
 
-               /* drop submission reference */
+       if (ret) {
+               io_cqring_add_event(ctx, sqe->user_data, ret);
                io_put_req(req, NULL);
-
-               if (ret) {
-                       io_cqring_add_event(ctx, sqe->user_data, ret);
-                       io_put_req(req, NULL);
-               }
-
-               /* async context always use a copy of the sqe */
-               kfree(sqe);
-
-               /* if a dependent link is ready, do that as the next one */
-               if (!ret && nxt) {
-                       req = nxt;
-                       continue;
-               }
-
-               /* req from defer and link list needn't decrease async cnt */
-               if (flags & (REQ_F_IO_DRAINED | REQ_F_LINK_DONE))
-                       goto out;
-
-               if (!async_list)
-                       break;
-               if (!list_empty(&req_list)) {
-                       req = list_first_entry(&req_list, struct io_kiocb,
-                                               list);
-                       list_del(&req->list);
-                       continue;
-               }
-               if (list_empty(&async_list->list))
-                       break;
-
-               req = NULL;
-               spin_lock(&async_list->lock);
-               if (list_empty(&async_list->list)) {
-                       spin_unlock(&async_list->lock);
-                       break;
-               }
-               list_splice_init(&async_list->list, &req_list);
-               spin_unlock(&async_list->lock);
-
-               req = list_first_entry(&req_list, struct io_kiocb, list);
-               list_del(&req->list);
-       } while (req);
-
-       /*
-        * Rare case of racing with a submitter. If we find the count has
-        * dropped to zero AND we have pending work items, then restart
-        * the processing. This is a tiny race window.
-        */
-       if (async_list) {
-               ret = atomic_dec_return(&async_list->cnt);
-               while (!ret && !list_empty(&async_list->list)) {
-                       spin_lock(&async_list->lock);
-                       atomic_inc(&async_list->cnt);
-                       list_splice_init(&async_list->list, &req_list);
-                       spin_unlock(&async_list->lock);
-
-                       if (!list_empty(&req_list)) {
-                               req = list_first_entry(&req_list,
-                                                       struct io_kiocb, list);
-                               list_del(&req->list);
-                               goto restart;
-                       }
-                       ret = atomic_dec_return(&async_list->cnt);
-               }
-       }
-
-out:
-       if (cur_mm) {
-               set_fs(old_fs);
-               unuse_mm(cur_mm);
-               mmput(cur_mm);
        }
-}
-
-/*
- * See if we can piggy back onto previously submitted work, that is still
- * running. We currently only allow this if the new request is sequential
- * to the previous one we punted.
- */
-static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
-{
-       bool ret;
 
-       if (!list)
-               return false;
-       if (!(req->flags & REQ_F_SEQ_PREV))
-               return false;
-       if (!atomic_read(&list->cnt))
-               return false;
+       /* async context always use a copy of the sqe */
+       kfree(sqe);
 
-       ret = true;
-       spin_lock(&list->lock);
-       list_add_tail(&req->list, &list->list);
-       /*
-        * Ensure we see a simultaneous modification from io_sq_wq_submit_work()
-        */
-       smp_mb();
-       if (!atomic_read(&list->cnt)) {
-               list_del_init(&req->list);
-               ret = false;
+       /* if a dependent link is ready, pass it back */
+       if (!ret && nxt) {
+               io_prep_async_work(nxt);
+               *workptr = &nxt->work;
        }
-       spin_unlock(&list->lock);
-
-       trace_io_uring_add_to_prev(req, ret);
-       return ret;
 }
 
 static bool io_op_needs_file(const struct io_uring_sqe *sqe)
@@ -2488,17 +2293,9 @@ static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 
                sqe_copy = kmemdup(s->sqe, sizeof(*sqe_copy), GFP_KERNEL);
                if (sqe_copy) {
-                       struct async_list *list;
-
                        s->sqe = sqe_copy;
                        memcpy(&req->submit, s, sizeof(*s));
-                       list = io_async_list_from_sqe(ctx, s->sqe);
-                       if (!io_add_to_prev_work(list, req)) {
-                               if (list)
-                                       atomic_inc(&list->cnt);
-                               INIT_WORK(&req->work, io_sq_wq_submit_work);
-                               io_queue_async_work(ctx, req);
-                       }
+                       io_queue_async_work(ctx, req);
 
                        /*
                         * Queued up for async execution, worker will release
@@ -3109,15 +2906,11 @@ static void io_sq_thread_stop(struct io_ring_ctx *ctx)
 
 static void io_finish_async(struct io_ring_ctx *ctx)
 {
-       int i;
-
        io_sq_thread_stop(ctx);
 
-       for (i = 0; i < ARRAY_SIZE(ctx->sqo_wq); i++) {
-               if (ctx->sqo_wq[i]) {
-                       destroy_workqueue(ctx->sqo_wq[i]);
-                       ctx->sqo_wq[i] = NULL;
-               }
+       if (ctx->io_wq) {
+               io_wq_destroy(ctx->io_wq);
+               ctx->io_wq = NULL;
        }
 }
 
@@ -3125,11 +2918,9 @@ static void io_finish_async(struct io_ring_ctx *ctx)
 static void io_destruct_skb(struct sk_buff *skb)
 {
        struct io_ring_ctx *ctx = skb->sk->sk_user_data;
-       int i;
 
-       for (i = 0; i < ARRAY_SIZE(ctx->sqo_wq); i++)
-               if (ctx->sqo_wq[i])
-                       flush_workqueue(ctx->sqo_wq[i]);
+       if (ctx->io_wq)
+               io_wq_flush(ctx->io_wq);
 
        unix_destruct_scm(skb);
 }
@@ -3473,6 +3264,7 @@ static int io_sqe_files_update(struct io_ring_ctx *ctx, void __user *arg,
 static int io_sq_offload_start(struct io_ring_ctx *ctx,
                               struct io_uring_params *p)
 {
+       unsigned concurrency;
        int ret;
 
        init_waitqueue_head(&ctx->sqo_wait);
@@ -3516,25 +3308,10 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx,
                goto err;
        }
 
-       /* Do QD, or 2 * CPUS, whatever is smallest */
-       ctx->sqo_wq[0] = alloc_workqueue("io_ring-wq",
-                       WQ_UNBOUND | WQ_FREEZABLE,
-                       min(ctx->sq_entries - 1, 2 * num_online_cpus()));
-       if (!ctx->sqo_wq[0]) {
-               ret = -ENOMEM;
-               goto err;
-       }
-
-       /*
-        * This is for buffered writes, where we want to limit the parallelism
-        * due to file locking in file systems. As "normal" buffered writes
-        * should parellelize on writeout quite nicely, limit us to having 2
-        * pending. This avoids massive contention on the inode when doing
-        * buffered async writes.
-        */
-       ctx->sqo_wq[1] = alloc_workqueue("io_ring-write-wq",
-                                               WQ_UNBOUND | WQ_FREEZABLE, 2);
-       if (!ctx->sqo_wq[1]) {
+       /* Do QD, or 4 * CPUS, whatever is smallest */
+       concurrency = min(ctx->sq_entries, 4 * num_online_cpus());
+       ctx->io_wq = io_wq_create(concurrency, ctx->sqo_mm);
+       if (!ctx->io_wq) {
                ret = -ENOMEM;
                goto err;
        }
@@ -3919,6 +3696,10 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
 
        io_kill_timeouts(ctx);
        io_poll_remove_all(ctx);
+
+       if (ctx->io_wq)
+               io_wq_cancel_all(ctx->io_wq);
+
        io_iopoll_reap_events(ctx);
        wait_for_completion(&ctx->ctx_done);
        io_ring_ctx_free(ctx);
index c5a905fbf1dab5a334ba47389247f6ac2e261b02..b85255121b9858a1e38ae0853d0c572ab3c7a68f 100644 (file)
@@ -7,6 +7,8 @@
 
 #include <linux/tracepoint.h>
 
+struct io_wq_work;
+
 /**
  * io_uring_create - called after a new io_uring context was prepared
  *
@@ -126,15 +128,15 @@ TRACE_EVENT(io_uring_file_get,
  * io_uring_queue_async_work - called before submitting a new async work
  *
  * @ctx:       pointer to a ring context structure
- * @rw:                type of workqueue, normal or buffered writes
+ * @hashed:    type of workqueue, hashed or normal
  * @req:       pointer to a submitted request
- * @work:      pointer to a submitted work_struct
+ * @work:      pointer to a submitted io_wq_work
  *
  * Allows to trace asynchronous work submission.
  */
 TRACE_EVENT(io_uring_queue_async_work,
 
-       TP_PROTO(void *ctx, int rw, void * req, struct work_struct *work,
+       TP_PROTO(void *ctx, int rw, void * req, struct io_wq_work *work,
                         unsigned int flags),
 
        TP_ARGS(ctx, rw, req, work, flags),
@@ -143,7 +145,7 @@ TRACE_EVENT(io_uring_queue_async_work,
                __field(  void *,                               ctx             )
                __field(  int,                                  rw              )
                __field(  void *,                               req             )
-               __field(  struct work_struct *, work    )
+               __field(  struct io_wq_work *,          work    )
                __field(  unsigned int,                 flags   )
        ),
 
@@ -157,7 +159,7 @@ TRACE_EVENT(io_uring_queue_async_work,
 
        TP_printk("ring %p, request %p, flags %d, %s queue, work %p",
                          __entry->ctx, __entry->req, __entry->flags,
-                         __entry->rw ? "buffered" : "normal", __entry->work)
+                         __entry->rw ? "hashed" : "normal", __entry->work)
 );
 
 /**
index b4daad2bac233cfc32744bfb7b78a6d0ab466744..4d8d145c41d297200f8940ad06e56812f4c3e147 100644 (file)
@@ -1548,6 +1548,7 @@ config AIO
 config IO_URING
        bool "Enable IO uring support" if EXPERT
        select ANON_INODES
+       select IO_WQ
        default y
        help
          This option enables support for the io_uring interface, enabling