engines/aioring: update for continually rolling ring
authorJens Axboe <axboe@kernel.dk>
Fri, 21 Dec 2018 21:47:07 +0000 (14:47 -0700)
committerJens Axboe <axboe@kernel.dk>
Fri, 21 Dec 2018 21:47:07 +0000 (14:47 -0700)
Signed-off-by: Jens Axboe <axboe@kernel.dk>
engines/aioring.c

index 59551f9..c480962 100644 (file)
@@ -17,6 +17,7 @@
 #include "../lib/pow2.h"
 #include "../optgroup.h"
 #include "../lib/memalign.h"
+#include "../lib/fls.h"
 
 #ifdef ARCH_HAVE_AIORING
 
@@ -99,12 +100,15 @@ struct aioring_data {
 
        struct aio_sq_ring *sq_ring;
        struct iocb *iocbs;
+       unsigned sq_ring_mask;
 
        struct aio_cq_ring *cq_ring;
        struct io_event *events;
+       unsigned cq_ring_mask;
 
        int queued;
        int cq_ring_off;
+       unsigned iodepth;
 
        uint64_t cachehit;
        uint64_t cachemiss;
@@ -223,11 +227,9 @@ static struct io_u *fio_aioring_event(struct thread_data *td, int event)
        struct aioring_data *ld = td->io_ops_data;
        struct io_event *ev;
        struct io_u *io_u;
-       int index;
+       unsigned index;
 
-       index = event + ld->cq_ring_off;
-       if (index >= ld->cq_ring->nr_events)
-               index -= ld->cq_ring->nr_events;
+       index = (event + ld->cq_ring_off) & ld->cq_ring_mask;
 
        ev = &ld->cq_ring->events[index];
        io_u = ev->data;
@@ -264,8 +266,6 @@ static int fio_aioring_cqring_reap(struct thread_data *td, unsigned int events,
                        break;
                reaped++;
                head++;
-               if (head == ring->nr_events)
-                       head = 0;
        } while (reaped + events < max);
 
        ring->head = head;
@@ -280,7 +280,8 @@ static int fio_aioring_getevents(struct thread_data *td, unsigned int min,
        unsigned actual_min = td->o.iodepth_batch_complete_min == 0 ? 0 : min;
        struct aioring_options *o = td->eo;
        struct aio_cq_ring *ring = ld->cq_ring;
-       int r, events = 0;
+       unsigned events = 0;
+       int r;
 
        ld->cq_ring_off = ring->head;
        do {
@@ -314,7 +315,7 @@ static enum fio_q_status fio_aioring_queue(struct thread_data *td,
 
        fio_ro_check(td, io_u);
 
-       if (ld->queued == td->o.iodepth)
+       if (ld->queued == ld->iodepth)
                return FIO_Q_BUSY;
 
        if (io_u->ddir == DDIR_TRIM) {
@@ -329,13 +330,11 @@ static enum fio_q_status fio_aioring_queue(struct thread_data *td,
 
        tail = ring->tail;
        next_tail = tail + 1;
-       if (next_tail == ring->nr_events)
-               next_tail = 0;
        read_barrier();
        if (next_tail == ring->head)
                return FIO_Q_BUSY;
 
-       ring->array[tail] = io_u->index;
+       ring->array[tail & ld->sq_ring_mask] = io_u->index;
        ring->tail = next_tail;
        write_barrier();
 
@@ -354,15 +353,13 @@ static void fio_aioring_queued(struct thread_data *td, int start, int nr)
        fio_gettime(&now, NULL);
 
        while (nr--) {
-               int index = ld->sq_ring->array[start];
+               int index = ld->sq_ring->array[start & ld->sq_ring_mask];
                struct io_u *io_u = io_u = ld->io_u_index[index];
 
                memcpy(&io_u->issue_time, &now, sizeof(now));
                io_u_queued(td, io_u);
 
                start++;
-               if (start == ld->sq_ring->nr_events)
-                       start = 0;
        }
 }
 
@@ -386,7 +383,7 @@ static int fio_aioring_commit(struct thread_data *td)
        }
 
        do {
-               int start = ld->sq_ring->head;
+               unsigned start = ld->sq_ring->head;
                long nr = ld->queued;
 
                ret = io_ring_enter(ld->aio_ctx, nr, 0, IORING_FLAG_SUBMIT |
@@ -432,6 +429,11 @@ static size_t aioring_sq_size(struct thread_data *td)
        return sizeof(struct aio_sq_ring) + td->o.iodepth * sizeof(u32);
 }
 
+static unsigned roundup_pow2(unsigned depth)
+{
+       return 1UL << __fls(depth - 1);
+}
+
 static void fio_aioring_cleanup(struct thread_data *td)
 {
        struct aioring_data *ld = td->io_ops_data;
@@ -440,9 +442,6 @@ static void fio_aioring_cleanup(struct thread_data *td)
                td->ts.cachehit += ld->cachehit;
                td->ts.cachemiss += ld->cachemiss;
 
-               /* Bump depth to match init depth */
-               td->o.iodepth++;
-
                /*
                 * Work-around to avoid huge RCU stalls at exit time. If we
                 * don't do this here, then it'll be torn down by exit_aio().
@@ -516,9 +515,6 @@ static int fio_aioring_post_init(struct thread_data *td)
 
        err = fio_aioring_queue_init(td);
 
-       /* Adjust depth back again */
-       td->o.iodepth--;
-
        if (err) {
                td_verror(td, errno, "io_queue_init");
                return 1;
@@ -531,11 +527,12 @@ static int fio_aioring_init(struct thread_data *td)
 {
        struct aioring_data *ld;
 
-       /* ring needs an extra entry, add one to achieve QD set */
-       td->o.iodepth++;
-
        ld = calloc(1, sizeof(*ld));
 
+       /* ring depth must be a power-of-2 */
+       ld->iodepth = td->o.iodepth;
+       td->o.iodepth = roundup_pow2(td->o.iodepth);
+
        /* io_u index */
        ld->io_u_index = calloc(td->o.iodepth, sizeof(struct io_u *));
        ld->io_us = calloc(td->o.iodepth, sizeof(struct io_u *));
@@ -547,10 +544,12 @@ static int fio_aioring_init(struct thread_data *td)
        memset(ld->sq_ring, 0, aioring_sq_size(td));
        ld->sq_ring->nr_events = td->o.iodepth;
        ld->sq_ring->iocbs = (u64) (uintptr_t) ld->iocbs;
+       ld->sq_ring_mask = td->o.iodepth - 1;
 
        ld->cq_ring = fio_memalign(page_size, aioring_cq_size(td), false);
        memset(ld->cq_ring, 0, aioring_cq_size(td));
        ld->cq_ring->nr_events = td->o.iodepth * 2;
+       ld->cq_ring_mask = (2 * td->o.iodepth) - 1;
 
        td->io_ops_data = ld;
        return 0;