engines/libaio: fix issue with EAGAIN
authorJens Axboe <axboe@fb.com>
Tue, 30 Sep 2014 19:33:23 +0000 (13:33 -0600)
committerJens Axboe <axboe@fb.com>
Tue, 30 Sep 2014 19:33:23 +0000 (13:33 -0600)
If we get -EAGAIN on IO submission, we should back off and reap
some events. Currently we just loop trying to resubmit, and this
has been reported to cause hangs in some virtualized environments.
There's no record of this ever being hit on raw metal.

So implement this backoff. Unfortunately it requires rewriting
the io_u tracking in libaio, since otherwise we depend on being
able to submit all the io_u's we are handed. So change the
tracking to be a ring buffer, with submission bumping the head
and completion bumping the tail. With this in place, we can't
stop issuing IO at anytime, since we'll just continue where
we left off.

Signed-off-by: Jens Axboe <axboe@fb.com>
engines/libaio.c

index 9cc910d73bc19fdbb690baa5295131fc0f7addbc..bcc296ffdbc2c1046e9d91774381c34748b4d1ea 100644 (file)
@@ -18,7 +18,20 @@ struct libaio_data {
        struct io_event *aio_events;
        struct iocb **iocbs;
        struct io_u **io_us;
-       int iocbs_nr;
+
+       /*
+        * Basic ring buffer. 'head' is incremented in _queue(), and
+        * 'tail' is incremented in _commit(). We keep 'queued' so
+        * that we know if the ring is full or empty, when
+        * 'head' == 'tail'. 'entries' is the ring size, and
+        * 'is_pow2' is just an optimization to use AND instead of
+        * modulus to get the remainder on ring increment.
+        */
+       int is_pow2;
+       unsigned int entries;
+       unsigned int queued;
+       unsigned int head;
+       unsigned int tail;
 };
 
 struct libaio_options {
@@ -41,6 +54,15 @@ static struct fio_option options[] = {
        },
 };
 
+static inline void ring_inc(struct libaio_data *ld, unsigned int *val,
+                           unsigned int add)
+{
+       if (ld->is_pow2)
+               *val = (*val + add) & (ld->entries - 1);
+       else
+               *val = (*val + add) % ld->entries;
+}
+
 static int fio_libaio_prep(struct thread_data fio_unused *td, struct io_u *io_u)
 {
        struct fio_file *f = io_u->file;
@@ -150,7 +172,7 @@ static int fio_libaio_queue(struct thread_data *td, struct io_u *io_u)
 
        fio_ro_check(td, io_u);
 
-       if (ld->iocbs_nr == (int) td->o.iodepth)
+       if (ld->queued == td->o.iodepth)
                return FIO_Q_BUSY;
 
        /*
@@ -160,7 +182,7 @@ static int fio_libaio_queue(struct thread_data *td, struct io_u *io_u)
         * have pending io, to let fio complete those first.
         */
        if (ddir_sync(io_u->ddir)) {
-               if (ld->iocbs_nr)
+               if (ld->queued)
                        return FIO_Q_BUSY;
 
                do_io_u_sync(td, io_u);
@@ -168,16 +190,17 @@ static int fio_libaio_queue(struct thread_data *td, struct io_u *io_u)
        }
 
        if (io_u->ddir == DDIR_TRIM) {
-               if (ld->iocbs_nr)
+               if (ld->queued)
                        return FIO_Q_BUSY;
 
                do_io_u_trim(td, io_u);
                return FIO_Q_COMPLETED;
        }
 
-       ld->iocbs[ld->iocbs_nr] = &io_u->iocb;
-       ld->io_us[ld->iocbs_nr] = io_u;
-       ld->iocbs_nr++;
+       ld->iocbs[ld->head] = &io_u->iocb;
+       ld->io_us[ld->head] = io_u;
+       ring_inc(ld, &ld->head, 1);
+       ld->queued++;
        return FIO_Q_QUEUED;
 }
 
@@ -207,27 +230,39 @@ static int fio_libaio_commit(struct thread_data *td)
        struct io_u **io_us;
        int ret;
 
-       if (!ld->iocbs_nr)
+       if (!ld->queued)
                return 0;
 
-       io_us = ld->io_us;
-       iocbs = ld->iocbs;
        do {
-               ret = io_submit(ld->aio_ctx, ld->iocbs_nr, iocbs);
+               long nr = ld->queued;
+
+               nr = min((unsigned int) nr, ld->entries - ld->tail);
+               io_us = ld->io_us + ld->tail;
+               iocbs = ld->iocbs + ld->tail;
+
+               ret = io_submit(ld->aio_ctx, nr, iocbs);
                if (ret > 0) {
                        fio_libaio_queued(td, io_us, ret);
                        io_u_mark_submit(td, ret);
-                       ld->iocbs_nr -= ret;
-                       io_us += ret;
-                       iocbs += ret;
+
+                       ld->queued -= ret;
+                       ring_inc(ld, &ld->tail, ret);
                        ret = 0;
-               } else if (!ret || ret == -EAGAIN || ret == -EINTR) {
+               } else if (ret == -EINTR || !ret) {
                        if (!ret)
                                io_u_mark_submit(td, ret);
                        continue;
+               } else if (ret == -EAGAIN) {
+                       /*
+                        * If we get EAGAIN, we should break out without
+                        * error and let the upper layer reap some
+                        * events for us.
+                        */
+                       ret = 0;
+                       break;
                } else
                        break;
-       } while (ld->iocbs_nr);
+       } while (ld->head != ld->tail);
 
        return ret;
 }
@@ -254,11 +289,11 @@ static void fio_libaio_cleanup(struct thread_data *td)
 
 static int fio_libaio_init(struct thread_data *td)
 {
-       struct libaio_data *ld = malloc(sizeof(*ld));
        struct libaio_options *o = td->eo;
+       struct libaio_data *ld;
        int err = 0;
 
-       memset(ld, 0, sizeof(*ld));
+       ld = calloc(1, sizeof(*ld));
 
        /*
         * First try passing in 0 for queue depth, since we don't
@@ -276,13 +311,11 @@ static int fio_libaio_init(struct thread_data *td)
                return 1;
        }
 
-       ld->aio_events = malloc(td->o.iodepth * sizeof(struct io_event));
-       memset(ld->aio_events, 0, td->o.iodepth * sizeof(struct io_event));
-       ld->iocbs = malloc(td->o.iodepth * sizeof(struct iocb *));
-       memset(ld->iocbs, 0, sizeof(struct iocb *));
-       ld->io_us = malloc(td->o.iodepth * sizeof(struct io_u *));
-       memset(ld->io_us, 0, td->o.iodepth * sizeof(struct io_u *));
-       ld->iocbs_nr = 0;
+       ld->entries = td->o.iodepth;
+       ld->is_pow2 = is_power_of_2(ld->entries);
+       ld->aio_events = calloc(ld->entries, sizeof(struct io_event));
+       ld->iocbs = calloc(ld->entries, sizeof(struct iocb *));
+       ld->io_us = calloc(ld->entries, sizeof(struct io_u *));
 
        td->io_ops->data = ld;
        return 0;