engines/aio-ring: initialization error handling
[fio.git] / engines / aioring.c
index 5a9add7937529b98a8924faf6d8c200edf438f73..59551f9cbfc557f4c20a4aa4a999989cfafb9d46 100644
@@ -1,8 +1,9 @@
-#ifdef ARCH_HAVE_AIORING
 /*
  * aioring engine
  *
- * IO engine using the new native Linux libaio ring interface
+ * IO engine using the new native Linux libaio ring interface. See:
+ *
+ * http://git.kernel.dk/cgit/linux-block/log/?h=aio-poll
  *
  */
 #include <stdlib.h>
@@ -17,6 +18,8 @@
 #include "../optgroup.h"
 #include "../lib/memalign.h"
 
+#ifdef ARCH_HAVE_AIORING
+
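+/*
+ * Compatibility fallbacks: these flags may be missing from older libaio
+ * headers, so define them here. The values follow the aio-poll kernel
+ * branch referenced above.
+ */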
 #ifndef IOCB_FLAG_HIPRI
 #define IOCB_FLAG_HIPRI        (1 << 2)
 #endif
 #ifndef IOCTX_FLAG_SQWQ
 #define IOCTX_FLAG_SQWQ                (1 << 4)
 #endif
+#ifndef IOCTX_FLAG_SQPOLL
+#define IOCTX_FLAG_SQPOLL      (1 << 5)
+#endif
 
 /*
  * io_ring_enter(2) flags
@@ -54,6 +61,10 @@ typedef uint64_t u64;
 typedef uint32_t u32;
 typedef uint16_t u16;
 
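+/*
+ * IORING_SQ_NEED_WAKEUP is set by the kernel in sq_ring->kflags when a
+ * polling SQ thread has gone idle and needs an io_ring_enter() to be
+ * woken back up. IOEV_RES2_CACHEHIT is set in an event's res2 field
+ * when a read was served out of the cache.
+ */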
+#define IORING_SQ_NEED_WAKEUP  (1 << 0)
+
+#define IOEV_RES2_CACHEHIT     (1 << 0)
+
 struct aio_sq_ring {
        union {
                struct {
@@ -61,6 +72,7 @@ struct aio_sq_ring {
                        u32 tail;
                        u32 nr_events;
                        u16 sq_thread_cpu;
+                       u16 kflags;
                        u64 iocbs;
                };
                u32 pad[16];
@@ -93,14 +105,31 @@ struct aioring_data {
 
        int queued;
        int cq_ring_off;
+
+       uint64_t cachehit;
+       uint64_t cachemiss;
 };
 
 struct aioring_options {
        void *pad;
        unsigned int hipri;
        unsigned int fixedbufs;
+       unsigned int sqthread;
+       unsigned int sqthread_set;
+       unsigned int sqthread_poll;
+       unsigned int sqwq;
 };
 
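+/*
+ * Parse callback for the "sqthread" option: record the CPU the kernel SQ
+ * thread should run on, and note that the option was explicitly set.
+ */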
+static int fio_aioring_sqthread_cb(void *data,
+                                  unsigned long long *val)
+{
+       struct aioring_options *o = data;
+
+       o->sqthread = *val;
+       o->sqthread_set = 1;
+       return 0;
+}
+
 static struct fio_option options[] = {
        {
                .name   = "hipri",
@@ -120,22 +149,43 @@ static struct fio_option options[] = {
                .category = FIO_OPT_C_ENGINE,
                .group  = FIO_OPT_G_LIBAIO,
        },
+       {
+               .name   = "sqthread",
+               .lname  = "Use kernel SQ thread on this CPU",
+               .type   = FIO_OPT_INT,
+               .cb     = fio_aioring_sqthread_cb,
+               .help   = "Offload submission to kernel thread",
+               .category = FIO_OPT_C_ENGINE,
+               .group  = FIO_OPT_G_LIBAIO,
+       },
+       {
+               .name   = "sqthread_poll",
+               .lname  = "Kernel SQ thread should poll",
+               .type   = FIO_OPT_STR_SET,
+               .off1   = offsetof(struct aioring_options, sqthread_poll),
+               .help   = "Used with sqthread; enables kernel-side polling",
+               .category = FIO_OPT_C_ENGINE,
+               .group  = FIO_OPT_G_LIBAIO,
+       },
+       {
+               .name   = "sqwq",
+               .lname  = "Offload submission to kernel workqueue",
+               .type   = FIO_OPT_STR_SET,
+               .off1   = offsetof(struct aioring_options, sqwq),
+               .help   = "Offload submission to kernel workqueue",
+               .category = FIO_OPT_C_ENGINE,
+               .group  = FIO_OPT_G_LIBAIO,
+       },
        {
                .name   = NULL,
        },
 };
 
-static int fio_aioring_commit(struct thread_data *td);
-
 static int io_ring_enter(io_context_t ctx, unsigned int to_submit,
                         unsigned int min_complete, unsigned int flags)
 {
-#ifdef __NR_sys_io_ring_enter
        return syscall(__NR_sys_io_ring_enter, ctx, to_submit, min_complete,
                        flags);
-#else
-       return -1;
-#endif
 }
 
 static int fio_aioring_prep(struct thread_data *td, struct io_u *io_u)
@@ -147,26 +197,20 @@ static int fio_aioring_prep(struct thread_data *td, struct io_u *io_u)
 
        iocb = &ld->iocbs[io_u->index];
 
-       if (io_u->ddir == DDIR_READ) {
-               if (o->fixedbufs) {
-                       iocb->aio_fildes = f->fd;
+       if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
+               if (io_u->ddir == DDIR_READ)
                        iocb->aio_lio_opcode = IO_CMD_PREAD;
-                       iocb->u.c.offset = io_u->offset;
-               } else {
-                       io_prep_pread(iocb, f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
-                       if (o->hipri)
-                               iocb->u.c.flags |= IOCB_FLAG_HIPRI;
-               }
-       } else if (io_u->ddir == DDIR_WRITE) {
-               if (o->fixedbufs) {
-                       iocb->aio_fildes = f->fd;
+               else
                        iocb->aio_lio_opcode = IO_CMD_PWRITE;
-                       iocb->u.c.offset = io_u->offset;
-               } else {
-                       io_prep_pwrite(iocb, f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
-                       if (o->hipri)
-                               iocb->u.c.flags |= IOCB_FLAG_HIPRI;
-               }
+               iocb->aio_reqprio = 0;
+               iocb->aio_fildes = f->fd;
+               iocb->u.c.buf = io_u->xfer_buf;
+               iocb->u.c.nbytes = io_u->xfer_buflen;
+               iocb->u.c.offset = io_u->offset;
+               if (o->hipri)
+                       iocb->u.c.flags |= IOCB_FLAG_HIPRI;
+               else
+                       iocb->u.c.flags = 0;
        } else if (ddir_sync(io_u->ddir))
                io_prep_fsync(iocb, f->fd);
 
@@ -196,6 +240,13 @@ static struct io_u *fio_aioring_event(struct thread_data *td, int event)
        } else
                io_u->error = 0;
 
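+       /* Account the kernel's cache hit/miss feedback for reads */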
+       if (io_u->ddir == DDIR_READ) {
+               if (ev->res2 & IOEV_RES2_CACHEHIT)
+                       ld->cachehit++;
+               else
+                       ld->cachemiss++;
+       }
+
        return io_u;
 }
 
@@ -227,6 +278,7 @@ static int fio_aioring_getevents(struct thread_data *td, unsigned int min,
 {
        struct aioring_data *ld = td->io_ops_data;
        unsigned actual_min = td->o.iodepth_batch_complete_min == 0 ? 0 : min;
+       struct aioring_options *o = td->eo;
        struct aio_cq_ring *ring = ld->cq_ring;
        int r, events = 0;
 
@@ -238,13 +290,15 @@ static int fio_aioring_getevents(struct thread_data *td, unsigned int min,
                        continue;
                }
 
-               r = io_ring_enter(ld->aio_ctx, 0, actual_min,
-                                       IORING_FLAG_GETEVENTS);
-               if (r < 0) {
-                       if (errno == EAGAIN)
-                               continue;
-                       perror("ring enter");
-                       break;
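+               /*
+                * With a polling SQ thread there is no need to enter the
+                * kernel to reap events; just keep checking the CQ ring.
+                */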
+               if (!o->sqthread_poll) {
+                       r = io_ring_enter(ld->aio_ctx, 0, actual_min,
+                                               IORING_FLAG_GETEVENTS);
+                       if (r < 0) {
+                               if (errno == EAGAIN)
+                                       continue;
+                               td_verror(td, errno, "io_ring_enter get");
+                               break;
+                       }
                }
        } while (events < min);
 
@@ -263,20 +317,6 @@ static enum fio_q_status fio_aioring_queue(struct thread_data *td,
        if (ld->queued == td->o.iodepth)
                return FIO_Q_BUSY;
 
-       /*
-        * fsync is tricky, since it can fail and we need to do it
-        * serialized with other io. the reason is that linux doesn't
-        * support aio fsync yet. So return busy for the case where we
-        * have pending io, to let fio complete those first.
-        */
-       if (ddir_sync(io_u->ddir)) {
-               if (ld->queued)
-                       return FIO_Q_BUSY;
-
-               do_io_u_sync(td, io_u);
-               return FIO_Q_COMPLETED;
-       }
-
        if (io_u->ddir == DDIR_TRIM) {
                if (ld->queued)
                        return FIO_Q_BUSY;
@@ -329,55 +369,49 @@ static void fio_aioring_queued(struct thread_data *td, int start, int nr)
 static int fio_aioring_commit(struct thread_data *td)
 {
        struct aioring_data *ld = td->io_ops_data;
+       struct aioring_options *o = td->eo;
        int ret;
 
        if (!ld->queued)
                return 0;
 
+       /*
+        * With a polling SQ thread the kernel submits for us, so there is
+        * nothing to do here except wake the thread if it has gone idle.
+        */
+       if (o->sqthread_poll) {
+               struct aio_sq_ring *ring = ld->sq_ring;
+
+               if (ring->kflags & IORING_SQ_NEED_WAKEUP)
+                       io_ring_enter(ld->aio_ctx, ld->queued, 0, IORING_FLAG_SUBMIT);
+               ld->queued = 0;
+               return 0;
+       }
+
        do {
                int start = ld->sq_ring->head;
                long nr = ld->queued;
 
                ret = io_ring_enter(ld->aio_ctx, nr, 0, IORING_FLAG_SUBMIT |
                                                IORING_FLAG_GETEVENTS);
-               if (ret == -1)
-                       perror("io_ring_enter");
                if (ret > 0) {
                        fio_aioring_queued(td, start, ret);
                        io_u_mark_submit(td, ret);
 
                        ld->queued -= ret;
                        ret = 0;
-               } else if (ret == -EINTR || !ret) {
-                       if (!ret)
-                               io_u_mark_submit(td, ret);
+               } else if (!ret) {
+                       io_u_mark_submit(td, ret);
                        continue;
-               } else if (ret == -EAGAIN) {
-                       /*
-                        * If we get EAGAIN, we should break out without
-                        * error and let the upper layer reap some
-                        * events for us. If we have no queued IO, we
-                        * must loop here. If we loop for more than 30s,
-                        * just error out, something must be buggy in the
-                        * IO path.
-                        */
-                       if (ld->queued) {
-                               ret = 0;
-                               break;
+               } else {
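+                       /*
+                        * EAGAIN means the kernel could not accept more IO;
+                        * reap completions to make room, then retry.
+                        */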
+                       if (errno == EAGAIN) {
+                               ret = fio_aioring_cqring_reap(td, 0, ld->queued);
+                               if (ret)
+                                       continue;
+                               /* Shouldn't happen */
+                               usleep(1);
+                               continue;
                        }
-                       usleep(1);
-                       continue;
-               } else if (ret == -ENOMEM) {
-                       /*
-                        * If we get -ENOMEM, reap events if we can. If
-                        * we cannot, treat it as a fatal event since there's
-                        * nothing we can do about it.
-                        */
-                       if (ld->queued)
-                               ret = 0;
-                       break;
-               } else
+                       td_verror(td, errno, "io_ring_enter submit");
                        break;
+               }
        } while (ld->queued);
 
        return ret;
@@ -403,6 +437,12 @@ static void fio_aioring_cleanup(struct thread_data *td)
        struct aioring_data *ld = td->io_ops_data;
 
        if (ld) {
+               td->ts.cachehit += ld->cachehit;
+               td->ts.cachemiss += ld->cachemiss;
+
+               /*
+                * Bump depth back up to the init-time value: init added one
+                * entry for the ring, and post_init took it off again.
+                */
+               td->o.iodepth++;
+
                /*
                 * Work-around to avoid huge RCU stalls at exit time. If we
                 * don't do this here, then it'll be torn down by exit_aio().
@@ -422,7 +462,6 @@ static void fio_aioring_cleanup(struct thread_data *td)
 
 static int fio_aioring_queue_init(struct thread_data *td)
 {
-#ifdef __NR_sys_io_setup2
        struct aioring_data *ld = td->io_ops_data;
        struct aioring_options *o = td->eo;
        int flags = IOCTX_FLAG_SCQRING;
@@ -430,6 +469,15 @@ static int fio_aioring_queue_init(struct thread_data *td)
 
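+       /* Map the engine options onto io_setup2() context flags */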
        if (o->hipri)
                flags |= IOCTX_FLAG_IOPOLL;
+       if (o->sqthread_set) {
+               ld->sq_ring->sq_thread_cpu = o->sqthread;
+               flags |= IOCTX_FLAG_SQTHREAD;
+               if (o->sqthread_poll)
+                       flags |= IOCTX_FLAG_SQPOLL;
+       }
+       if (o->sqwq)
+               flags |= IOCTX_FLAG_SQWQ;
+
        if (o->fixedbufs) {
                struct rlimit rlim = {
                        .rlim_cur = RLIM_INFINITY,
@@ -442,9 +490,6 @@ static int fio_aioring_queue_init(struct thread_data *td)
 
        return syscall(__NR_sys_io_setup2, depth, flags,
                        ld->sq_ring, ld->cq_ring, &ld->aio_ctx);
-#else
-       return -1;
-#endif
 }
 
 static int fio_aioring_post_init(struct thread_data *td)
@@ -470,8 +515,12 @@ static int fio_aioring_post_init(struct thread_data *td)
        }
 
        err = fio_aioring_queue_init(td);
+
+       /* Adjust depth back to the user-requested value */
+       td->o.iodepth--;
+
        if (err) {
-               td_verror(td, -err, "io_queue_init");
+               td_verror(td, errno, "io_queue_init");
                return 1;
        }
 
@@ -482,10 +531,8 @@ static int fio_aioring_init(struct thread_data *td)
 {
        struct aioring_data *ld;
 
-       if (td->o.iodepth <= 1) {
-               printf("aio-ring: needs a minimum QD of 2\n");
-               return 1;
-       }
+       /* The ring needs an extra entry; add one so the requested QD is reached */
+       td->o.iodepth++;
 
        ld = calloc(1, sizeof(*ld));