io_uring: add support for link with drain
author     Jackie Liu <liuyun01@kylinos.cn>
           Mon, 9 Sep 2019 12:50:40 +0000 (20:50 +0800)
committer  Jens Axboe <axboe@kernel.dk>
           Mon, 9 Sep 2019 22:15:00 +0000 (16:15 -0600)
To support link with drain, we need to do two things.

Consider the following sqes (N = normal, L = link, D = drain):

    0     1     2     3     4     5     6
 +-----+-----+-----+-----+-----+-----+-----+
 |  N  |  L  |  L  | L+D |  N  |  N  |  N  |
 +-----+-----+-----+-----+-----+-----+-----+
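
From userspace, the pictured mix is nothing structural, just per-sqe
flags: chain members carry IOSQE_IO_LINK, and the draining member
additionally carries IOSQE_IO_DRAIN. A minimal sketch of submitting the
sequence above, assuming liburing and using nops as stand-ins for real
IO:

    #include <liburing.h>
    #include <stdio.h>

    int main(void)
    {
            struct io_uring ring;
            struct io_uring_sqe *sqe;
            struct io_uring_cqe *cqe;
            int i;

            if (io_uring_queue_init(8, &ring, 0) < 0)
                    return 1;

            for (i = 0; i < 7; i++) {
                    sqe = io_uring_get_sqe(&ring);
                    io_uring_prep_nop(sqe);       /* nop stands in for real IO */
                    sqe->user_data = i;
                    if (i == 1 || i == 2)         /* (1), (2): link to the next sqe */
                            sqe->flags |= IOSQE_IO_LINK;
                    if (i == 3)                   /* (3): chain tail, also drains */
                            sqe->flags |= IOSQE_IO_DRAIN;
            }

            io_uring_submit(&ring);

            /*
             * With this patch, no cqe for (1)-(3) may appear before (0)'s,
             * and no cqe for (4)-(6) may appear before (3)'s.
             */
            for (i = 0; i < 7; i++) {
                    if (io_uring_wait_cqe(&ring, &cqe) < 0)
                            break;
                    printf("completed: %llu\n",
                           (unsigned long long) cqe->user_data);
                    io_uring_cqe_seen(&ring, cqe);
            }

            io_uring_queue_exit(&ring);
            return 0;
    }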

First, we need to ensure that the io submitted before the link chain
has completed. An easy way to do this is to set the drain flag on the
head of the link list, so that all subsequent io is inserted into the
defer_list.

      +-----+
  (0) |  N  |
      +-----+
         |          (2)         (3)         (4)
      +-----+     +-----+     +-----+     +-----+
  (1) | L+D | --> |  L  | --> | L+D | --> |  N  |
      +-----+     +-----+     +-----+     +-----+
         |
      +-----+
  (5) |  N  |
      +-----+
         |
      +-----+
  (6) |  N  |
      +-----+

Second, we must ensure that io submitted after the chain does not get
completed first. An easy way to do this is to create a mirror (shadow)
of the drain io and insert it into the defer_list; as long as the
drain io has not been processed, the io that follows it in the
defer_list will not be actively processed either.

      +-----+
  (0) |  N  |
      +-----+
         |          (2)         (3)         (4)
      +-----+     +-----+     +-----+     +-----+
  (1) | L+D | --> |  L  | --> | L+D | --> |  N  |
      +-----+     +-----+     +-----+     +-----+
         |
      +-----+
 ('3) |  D  |   <== This is a shadow of (3)
      +-----+
         |
      +-----+
  (5) |  N  |
      +-----+
         |
      +-----+
  (6) |  N  |
      +-----+
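
The diff below implements both steps. Distilled to its core (an
annotated excerpt of the change, not compilable on its own), the
submission loop allocates one shadow request per chain, and
io_queue_link_head() applies the drain and parks the shadow:

    /* submission loop: a DRAIN inside an active link allocates one shadow */
    if (link && (s.sqe->flags & IOSQE_IO_DRAIN)) {
            if (!shadow_req) {
                    shadow_req = io_get_req(ctx, NULL);
                    shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN);
                    /* the shadow never completes as a normal request */
                    refcount_dec(&shadow_req->refs);
            }
            /* remember where the drain sqe sits in the submission order */
            shadow_req->sequence = s.sequence;
    }

    /* io_queue_link_head(), step one: drain the head so the whole chain
     * waits for everything submitted before it
     */
    req->flags |= REQ_F_IO_DRAIN;

    /* step two: park the shadow behind the chain; requests deferred
     * after it stay blocked until io_commit_cqring() dequeues and frees
     * the shadow
     */
    spin_lock_irq(&ctx->completion_lock);
    list_add_tail(&shadow->list, &ctx->defer_list);
    spin_unlock_irq(&ctx->completion_lock);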

Signed-off-by: Jackie Liu <liuyun01@kylinos.cn>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
diff --git a/fs/io_uring.c b/fs/io_uring.c
index cf6d807fa414e5d1f9d3a787cfd1ac389fc19772..eff29d705a2613a6f3675ed01465cf8d12d7e2cf 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -312,6 +312,7 @@ struct io_kiocb {
 #define REQ_F_LINK             64      /* linked sqes */
 #define REQ_F_LINK_DONE                128     /* linked sqes done */
 #define REQ_F_FAIL_LINK                256     /* fail rest of links */
+#define REQ_F_SHADOW_DRAIN     512     /* link-drain shadow req */
        u64                     user_data;
        u32                     result;
        u32                     sequence;
@@ -343,6 +344,7 @@ struct io_submit_state {
 };
 
 static void io_sq_wq_submit_work(struct work_struct *work);
+static void __io_free_req(struct io_kiocb *req);
 
 static struct kmem_cache *req_cachep;
 
@@ -448,6 +450,11 @@ static void io_commit_cqring(struct io_ring_ctx *ctx)
        __io_commit_cqring(ctx);
 
        while ((req = io_get_deferred_req(ctx)) != NULL) {
+               if (req->flags & REQ_F_SHADOW_DRAIN) {
+                       /* Just for drain, free it. */
+                       __io_free_req(req);
+                       continue;
+               }
                req->flags |= REQ_F_IO_DRAINED;
                queue_work(ctx->sqo_wq, &req->work);
        }
@@ -2015,10 +2022,14 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
        flags = READ_ONCE(s->sqe->flags);
        fd = READ_ONCE(s->sqe->fd);
 
-       if (flags & IOSQE_IO_DRAIN) {
+       if (flags & IOSQE_IO_DRAIN)
                req->flags |= REQ_F_IO_DRAIN;
-               req->sequence = s->sequence;
-       }
+       /*
+        * All io need to record their previous position, so that when
+        * LINK is combined with DRAIN it can be used to mark the
+        * position of the first IO in the link list.
+        */
+       req->sequence = s->sequence;
 
        if (!io_op_needs_file(s->sqe))
                return 0;
@@ -2040,20 +2051,11 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
        return 0;
 }
 
-static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
+static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
                        struct sqe_submit *s)
 {
        int ret;
 
-       ret = io_req_defer(ctx, req, s->sqe);
-       if (ret) {
-               if (ret != -EIOCBQUEUED) {
-                       io_free_req(req);
-                       io_cqring_add_event(ctx, s->sqe->user_data, ret);
-               }
-               return 0;
-       }
-
        ret = __io_submit_sqe(ctx, req, s, true);
        if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
                struct io_uring_sqe *sqe_copy;
@@ -2096,6 +2098,64 @@ static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
        return ret;
 }
 
+static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
+                       struct sqe_submit *s)
+{
+       int ret;
+
+       ret = io_req_defer(ctx, req, s->sqe);
+       if (ret) {
+               if (ret != -EIOCBQUEUED) {
+                       io_free_req(req);
+                       io_cqring_add_event(ctx, s->sqe->user_data, ret);
+               }
+               return 0;
+       }
+
+       return __io_queue_sqe(ctx, req, s);
+}
+
+static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req,
+                             struct sqe_submit *s, struct io_kiocb *shadow)
+{
+       int ret;
+       int need_submit = false;
+
+       if (!shadow)
+               return io_queue_sqe(ctx, req, s);
+
+       /*
+        * Mark the first IO in the link list as DRAIN so that all the
+        * following IOs enter the defer list; all IO submitted before
+        * the link list must complete first.
+        */
+       req->flags |= REQ_F_IO_DRAIN;
+       ret = io_req_defer(ctx, req, s->sqe);
+       if (ret) {
+               if (ret != -EIOCBQUEUED) {
+                       io_free_req(req);
+                       io_cqring_add_event(ctx, s->sqe->user_data, ret);
+                       return 0;
+               }
+       } else {
+               /*
+                * ret == 0 means that all IOs in front of the link io
+                * have already completed; queue the link head directly.
+                */
+               need_submit = true;
+       }
+
+       /* Insert the shadow req into the defer_list, blocking subsequent IOs */
+       spin_lock_irq(&ctx->completion_lock);
+       list_add_tail(&shadow->list, &ctx->defer_list);
+       spin_unlock_irq(&ctx->completion_lock);
+
+       if (need_submit)
+               return __io_queue_sqe(ctx, req, s);
+
+       return 0;
+}
+
 #define SQE_VALID_FLAGS        (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK)
 
 static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
@@ -2241,6 +2301,7 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
 {
        struct io_submit_state state, *statep = NULL;
        struct io_kiocb *link = NULL;
+       struct io_kiocb *shadow_req = NULL;
        bool prev_was_link = false;
        int i, submitted = 0;
 
@@ -2255,11 +2316,20 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
                 * that's the end of the chain. Submit the previous link.
                 */
                if (!prev_was_link && link) {
-                       io_queue_sqe(ctx, link, &link->submit);
+                       io_queue_link_head(ctx, link, &link->submit, shadow_req);
                        link = NULL;
                }
                prev_was_link = (sqes[i].sqe->flags & IOSQE_IO_LINK) != 0;
 
+               if (link && (sqes[i].sqe->flags & IOSQE_IO_DRAIN)) {
+                       if (!shadow_req) {
+                               shadow_req = io_get_req(ctx, NULL);
+                               shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN);
+                               refcount_dec(&shadow_req->refs);
+                       }
+                       shadow_req->sequence = sqes[i].sequence;
+               }
+
                if (unlikely(mm_fault)) {
                        io_cqring_add_event(ctx, sqes[i].sqe->user_data,
                                                -EFAULT);
@@ -2273,7 +2343,7 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
        }
 
        if (link)
-               io_queue_sqe(ctx, link, &link->submit);
+               io_queue_link_head(ctx, link, &link->submit, shadow_req);
        if (statep)
                io_submit_state_end(&state);
 
@@ -2409,6 +2479,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
 {
        struct io_submit_state state, *statep = NULL;
        struct io_kiocb *link = NULL;
+       struct io_kiocb *shadow_req = NULL;
        bool prev_was_link = false;
        int i, submit = 0;
 
@@ -2428,11 +2499,20 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
                 * that's the end of the chain. Submit the previous link.
                 */
                if (!prev_was_link && link) {
-                       io_queue_sqe(ctx, link, &link->submit);
+                       io_queue_link_head(ctx, link, &link->submit, shadow_req);
                        link = NULL;
                }
                prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0;
 
+               if (link && (s.sqe->flags & IOSQE_IO_DRAIN)) {
+                       if (!shadow_req) {
+                               shadow_req = io_get_req(ctx, NULL);
+                               shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN);
+                               refcount_dec(&shadow_req->refs);
+                       }
+                       shadow_req->sequence = s.sequence;
+               }
+
                s.has_user = true;
                s.needs_lock = false;
                s.needs_fixed_file = false;
@@ -2442,7 +2522,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
        io_commit_sqring(ctx);
 
        if (link)
-               io_queue_sqe(ctx, link, &link->submit);
+               io_queue_link_head(ctx, link, &link->submit, shadow_req);
        if (statep)
                io_submit_state_end(statep);