-#ifdef ARCH_HAVE_AIORING
/*
* aioring engine
*
- * IO engine using the new native Linux libaio ring interface
+ * IO engine using the new native Linux libaio ring interface. See:
+ *
+ * http://git.kernel.dk/cgit/linux-block/log/?h=aio-poll
*
*/
#include <stdlib.h>
#include "../optgroup.h"
#include "../lib/memalign.h"
+#ifdef ARCH_HAVE_AIORING
+
#ifndef IOCB_FLAG_HIPRI
#define IOCB_FLAG_HIPRI (1 << 2)
#endif
#ifndef IOCTX_FLAG_SQWQ
#define IOCTX_FLAG_SQWQ (1 << 4)
#endif
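+/* With IOCTX_FLAG_SQTHREAD: the kernel thread also busy polls the SQ ring for new entries */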
+#ifndef IOCTX_FLAG_SQPOLL
+#define IOCTX_FLAG_SQPOLL (1 << 5)
+#endif
+
/*
 * io_ring_enter(2) flags
 */
#ifndef IORING_FLAG_SUBMIT
#define IORING_FLAG_SUBMIT	(1 << 0)
#endif
#ifndef IORING_FLAG_GETEVENTS
#define IORING_FLAG_GETEVENTS	(1 << 1)
#endif

typedef uint64_t u64;
typedef uint32_t u32;
typedef uint16_t u16;
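+
+/*
+ * Set by the kernel in sq_ring->kflags when the SQ poll thread has gone
+ * to sleep and needs a wakeup via io_ring_enter() - see the sqthread_poll
+ * path in fio_aioring_commit().
+ */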
+#define IORING_SQ_NEED_WAKEUP (1 << 0)
+
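+/*
+ * Set by the kernel in the completion's res2 field when a read was
+ * served out of the page cache rather than from the device.
+ */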
+#define IOEV_RES2_CACHEHIT (1 << 0)
+
struct aio_sq_ring {
union {
struct {
u32 tail;
u32 nr_events;
u16 sq_thread_cpu;
+ u16 kflags;
u64 iocbs;
};
u32 pad[16];
int queued;
int cq_ring_off;
+
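+	/* Read cache hit/miss counts; folded into the thread stats at cleanup */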
+ uint64_t cachehit;
+ uint64_t cachemiss;
};
struct aioring_options {
void *pad;
unsigned int hipri;
unsigned int fixedbufs;
+ unsigned int sqthread;
+ unsigned int sqthread_set;
+ unsigned int sqthread_poll;
+ unsigned int sqwq;
};
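+/*
+ * 'sqthread' uses a callback rather than a plain option offset so that we
+ * also record whether the option was given at all - CPU 0 is a valid
+ * choice, so the value alone can't tell us.
+ */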
+static int fio_aioring_sqthread_cb(void *data,
+ unsigned long long *val)
+{
+ struct aioring_options *o = data;
+
+ o->sqthread = *val;
+ o->sqthread_set = 1;
+ return 0;
+}
+
static struct fio_option options[] = {
{
.name = "hipri",
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_LIBAIO,
},
+ {
+ .name = "sqthread",
+ .lname = "Use kernel SQ thread on this CPU",
+ .type = FIO_OPT_INT,
+ .cb = fio_aioring_sqthread_cb,
+ .help = "Offload submission to kernel thread",
+ .category = FIO_OPT_C_ENGINE,
+ .group = FIO_OPT_G_LIBAIO,
+ },
+ {
+ .name = "sqthread_poll",
+ .lname = "Kernel SQ thread should poll",
+ .type = FIO_OPT_STR_SET,
+ .off1 = offsetof(struct aioring_options, sqthread_poll),
+ .help = "Used with sqthread, enables kernel side polling",
+ .category = FIO_OPT_C_ENGINE,
+ .group = FIO_OPT_G_LIBAIO,
+ },
+ {
+ .name = "sqwq",
+ .lname = "Offload submission to kernel workqueue",
+ .type = FIO_OPT_STR_SET,
+ .off1 = offsetof(struct aioring_options, sqwq),
+ .help = "Offload submission to kernel workqueue",
+ .category = FIO_OPT_C_ENGINE,
+ .group = FIO_OPT_G_LIBAIO,
+ },
{
.name = NULL,
},
};
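+
+/*
+ * Example: offload submission to a kernel thread pinned to CPU 0, with the
+ * thread polling the SQ ring (engine options are passed like any other fio
+ * option):
+ *
+ *	fio --name=randread --ioengine=aioring --iodepth=32 --rw=randread \
+ *		--sqthread=0 --sqthread_poll ...
+ */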
-static int fio_aioring_commit(struct thread_data *td);
-
static int io_ring_enter(io_context_t ctx, unsigned int to_submit,
unsigned int min_complete, unsigned int flags)
{
-#ifdef __NR_sys_io_ring_enter
return syscall(__NR_sys_io_ring_enter, ctx, to_submit, min_complete,
flags);
-#else
- return -1;
-#endif
}
static int fio_aioring_prep(struct thread_data *td, struct io_u *io_u)
iocb = &ld->iocbs[io_u->index];
- if (io_u->ddir == DDIR_READ) {
- if (o->fixedbufs) {
- iocb->aio_fildes = f->fd;
+ if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
+ if (io_u->ddir == DDIR_READ)
iocb->aio_lio_opcode = IO_CMD_PREAD;
- iocb->u.c.offset = io_u->offset;
- } else {
- io_prep_pread(iocb, f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
- if (o->hipri)
- iocb->u.c.flags |= IOCB_FLAG_HIPRI;
- }
- } else if (io_u->ddir == DDIR_WRITE) {
- if (o->fixedbufs) {
- iocb->aio_fildes = f->fd;
+ else
iocb->aio_lio_opcode = IO_CMD_PWRITE;
- iocb->u.c.offset = io_u->offset;
- } else {
- io_prep_pwrite(iocb, f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
- if (o->hipri)
- iocb->u.c.flags |= IOCB_FLAG_HIPRI;
- }
+ iocb->aio_reqprio = 0;
+ iocb->aio_fildes = f->fd;
+ iocb->u.c.buf = io_u->xfer_buf;
+ iocb->u.c.nbytes = io_u->xfer_buflen;
+ iocb->u.c.offset = io_u->offset;
+		if (o->hipri)
+			iocb->u.c.flags = IOCB_FLAG_HIPRI;
+		else
+			iocb->u.c.flags = 0;
} else if (ddir_sync(io_u->ddir))
io_prep_fsync(iocb, f->fd);
} else
io_u->error = 0;
+ if (io_u->ddir == DDIR_READ) {
+ if (ev->res2 & IOEV_RES2_CACHEHIT)
+ ld->cachehit++;
+ else
+ ld->cachemiss++;
+ }
+
return io_u;
}
{
struct aioring_data *ld = td->io_ops_data;
unsigned actual_min = td->o.iodepth_batch_complete_min == 0 ? 0 : min;
+ struct aioring_options *o = td->eo;
struct aio_cq_ring *ring = ld->cq_ring;
int r, events = 0;
continue;
}
- r = io_ring_enter(ld->aio_ctx, 0, actual_min,
- IORING_FLAG_GETEVENTS);
- if (r < 0) {
- if (errno == EAGAIN)
- continue;
- perror("ring enter");
- break;
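+		/*
+		 * If the kernel SQ thread is polling, completions show up
+		 * in the CQ ring without us entering the kernel - just keep
+		 * reaping the ring.
+		 */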
+ if (!o->sqthread_poll) {
+ r = io_ring_enter(ld->aio_ctx, 0, actual_min,
+ IORING_FLAG_GETEVENTS);
+ if (r < 0) {
+ if (errno == EAGAIN)
+ continue;
+ td_verror(td, errno, "io_ring_enter get");
+ break;
+ }
}
} while (events < min);
if (ld->queued == td->o.iodepth)
return FIO_Q_BUSY;
- /*
- * fsync is tricky, since it can fail and we need to do it
- * serialized with other io. the reason is that linux doesn't
- * support aio fsync yet. So return busy for the case where we
- * have pending io, to let fio complete those first.
- */
- if (ddir_sync(io_u->ddir)) {
- if (ld->queued)
- return FIO_Q_BUSY;
-
- do_io_u_sync(td, io_u);
- return FIO_Q_COMPLETED;
- }
-
if (io_u->ddir == DDIR_TRIM) {
if (ld->queued)
return FIO_Q_BUSY;
static int fio_aioring_commit(struct thread_data *td)
{
struct aioring_data *ld = td->io_ops_data;
+ struct aioring_options *o = td->eo;
int ret;
if (!ld->queued)
return 0;
+	/*
+	 * With a polling SQ thread, the kernel handles submission for us.
+	 * All we may have to do is wake the thread if it flagged that it
+	 * went to sleep waiting for new entries.
+	 */
+ if (o->sqthread_poll) {
+ struct aio_sq_ring *ring = ld->sq_ring;
+
+ if (ring->kflags & IORING_SQ_NEED_WAKEUP)
+ io_ring_enter(ld->aio_ctx, ld->queued, 0, IORING_FLAG_SUBMIT);
+ ld->queued = 0;
+ return 0;
+ }
+
do {
int start = ld->sq_ring->head;
long nr = ld->queued;
ret = io_ring_enter(ld->aio_ctx, nr, 0, IORING_FLAG_SUBMIT |
IORING_FLAG_GETEVENTS);
- if (ret == -1)
- perror("io_ring_enter");
if (ret > 0) {
fio_aioring_queued(td, start, ret);
io_u_mark_submit(td, ret);
ld->queued -= ret;
ret = 0;
- } else if (ret == -EINTR || !ret) {
- if (!ret)
- io_u_mark_submit(td, ret);
+ } else if (!ret) {
+ io_u_mark_submit(td, ret);
continue;
- } else if (ret == -EAGAIN) {
- /*
- * If we get EAGAIN, we should break out without
- * error and let the upper layer reap some
- * events for us. If we have no queued IO, we
- * must loop here. If we loop for more than 30s,
- * just error out, something must be buggy in the
- * IO path.
- */
- if (ld->queued) {
- ret = 0;
- break;
+ } else {
+ if (errno == EAGAIN) {
+ ret = fio_aioring_cqring_reap(td, 0, ld->queued);
+ if (ret)
+ continue;
+ /* Shouldn't happen */
+ usleep(1);
+ continue;
}
- usleep(1);
- continue;
- } else if (ret == -ENOMEM) {
- /*
- * If we get -ENOMEM, reap events if we can. If
- * we cannot, treat it as a fatal event since there's
- * nothing we can do about it.
- */
- if (ld->queued)
- ret = 0;
- break;
- } else
+			td_verror(td, errno, "io_ring_enter submit");
break;
+ }
} while (ld->queued);
return ret;
struct aioring_data *ld = td->io_ops_data;
if (ld) {
+ td->ts.cachehit += ld->cachehit;
+ td->ts.cachemiss += ld->cachemiss;
+
+		/* Bump depth back to the init-time depth (the ring was sized with one extra entry) */
+ td->o.iodepth++;
+
/*
* Work-around to avoid huge RCU stalls at exit time. If we
* don't do this here, then it'll be torn down by exit_aio().
static int fio_aioring_queue_init(struct thread_data *td)
{
-#ifdef __NR_sys_io_setup2
struct aioring_data *ld = td->io_ops_data;
struct aioring_options *o = td->eo;
int flags = IOCTX_FLAG_SCQRING;
if (o->hipri)
flags |= IOCTX_FLAG_IOPOLL;
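+	/*
+	 * A kernel thread, pinned to sq_thread_cpu, handles submission for
+	 * us; SQPOLL additionally has it busy poll the SQ ring for new
+	 * entries.
+	 */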
+ if (o->sqthread_set) {
+ ld->sq_ring->sq_thread_cpu = o->sqthread;
+ flags |= IOCTX_FLAG_SQTHREAD;
+ if (o->sqthread_poll)
+ flags |= IOCTX_FLAG_SQPOLL;
+ }
+ if (o->sqwq)
+ flags |= IOCTX_FLAG_SQWQ;
+
if (o->fixedbufs) {
struct rlimit rlim = {
.rlim_cur = RLIM_INFINITY,
return syscall(__NR_sys_io_setup2, depth, flags,
ld->sq_ring, ld->cq_ring, &ld->aio_ctx);
-#else
- return -1;
-#endif
}
static int fio_aioring_post_init(struct thread_data *td)
}
err = fio_aioring_queue_init(td);
+
+ /* Adjust depth back again */
+ td->o.iodepth--;
+
if (err) {
- td_verror(td, -err, "io_queue_init");
+ td_verror(td, errno, "io_queue_init");
return 1;
}
{
struct aioring_data *ld;
- if (td->o.iodepth <= 1) {
- printf("aio-ring: needs a minimum QD of 2\n");
- return 1;
- }
+	/* The ring needs one extra entry; add it so we still achieve the requested QD */
+ td->o.iodepth++;
ld = calloc(1, sizeof(*ld));