From: Jens Axboe Date: Tue, 30 Sep 2014 19:33:23 +0000 (-0600) Subject: engines/libaio: fix issue with EAGAIN X-Git-Tag: fio-2.1.13~7 X-Git-Url: https://git.kernel.dk/?p=fio.git;a=commitdiff_plain;h=acd45e028cfaf15a29eedcba3ff95c804e2d29e9 engines/libaio: fix issue with EAGAIN If we get -EAGAIN on IO submission, we should back off and reap some events. Currently we just loop trying to resubmit, and this has been reported to cause hangs in some virtualized environments. There's no record of this ever being hit on raw metal. So implement this backoff. Unfortunately it requires rewriting the io_u tracking in libaio, since otherwise we depend on being able to submit all the io_u's we are handed. So change the tracking to be a ring buffer, with submission bumping the head and completion bumping the tail. With this in place, we can't stop issuing IO at anytime, since we'll just continue where we left off. Signed-off-by: Jens Axboe --- diff --git a/engines/libaio.c b/engines/libaio.c index 9cc910d7..bcc296ff 100644 --- a/engines/libaio.c +++ b/engines/libaio.c @@ -18,7 +18,20 @@ struct libaio_data { struct io_event *aio_events; struct iocb **iocbs; struct io_u **io_us; - int iocbs_nr; + + /* + * Basic ring buffer. 'head' is incremented in _queue(), and + * 'tail' is incremented in _commit(). We keep 'queued' so + * that we know if the ring is full or empty, when + * 'head' == 'tail'. 'entries' is the ring size, and + * 'is_pow2' is just an optimization to use AND instead of + * modulus to get the remainder on ring increment. + */ + int is_pow2; + unsigned int entries; + unsigned int queued; + unsigned int head; + unsigned int tail; }; struct libaio_options { @@ -41,6 +54,15 @@ static struct fio_option options[] = { }, }; +static inline void ring_inc(struct libaio_data *ld, unsigned int *val, + unsigned int add) +{ + if (ld->is_pow2) + *val = (*val + add) & (ld->entries - 1); + else + *val = (*val + add) % ld->entries; +} + static int fio_libaio_prep(struct thread_data fio_unused *td, struct io_u *io_u) { struct fio_file *f = io_u->file; @@ -150,7 +172,7 @@ static int fio_libaio_queue(struct thread_data *td, struct io_u *io_u) fio_ro_check(td, io_u); - if (ld->iocbs_nr == (int) td->o.iodepth) + if (ld->queued == td->o.iodepth) return FIO_Q_BUSY; /* @@ -160,7 +182,7 @@ static int fio_libaio_queue(struct thread_data *td, struct io_u *io_u) * have pending io, to let fio complete those first. */ if (ddir_sync(io_u->ddir)) { - if (ld->iocbs_nr) + if (ld->queued) return FIO_Q_BUSY; do_io_u_sync(td, io_u); @@ -168,16 +190,17 @@ static int fio_libaio_queue(struct thread_data *td, struct io_u *io_u) } if (io_u->ddir == DDIR_TRIM) { - if (ld->iocbs_nr) + if (ld->queued) return FIO_Q_BUSY; do_io_u_trim(td, io_u); return FIO_Q_COMPLETED; } - ld->iocbs[ld->iocbs_nr] = &io_u->iocb; - ld->io_us[ld->iocbs_nr] = io_u; - ld->iocbs_nr++; + ld->iocbs[ld->head] = &io_u->iocb; + ld->io_us[ld->head] = io_u; + ring_inc(ld, &ld->head, 1); + ld->queued++; return FIO_Q_QUEUED; } @@ -207,27 +230,39 @@ static int fio_libaio_commit(struct thread_data *td) struct io_u **io_us; int ret; - if (!ld->iocbs_nr) + if (!ld->queued) return 0; - io_us = ld->io_us; - iocbs = ld->iocbs; do { - ret = io_submit(ld->aio_ctx, ld->iocbs_nr, iocbs); + long nr = ld->queued; + + nr = min((unsigned int) nr, ld->entries - ld->tail); + io_us = ld->io_us + ld->tail; + iocbs = ld->iocbs + ld->tail; + + ret = io_submit(ld->aio_ctx, nr, iocbs); if (ret > 0) { fio_libaio_queued(td, io_us, ret); io_u_mark_submit(td, ret); - ld->iocbs_nr -= ret; - io_us += ret; - iocbs += ret; + + ld->queued -= ret; + ring_inc(ld, &ld->tail, ret); ret = 0; - } else if (!ret || ret == -EAGAIN || ret == -EINTR) { + } else if (ret == -EINTR || !ret) { if (!ret) io_u_mark_submit(td, ret); continue; + } else if (ret == -EAGAIN) { + /* + * If we get EAGAIN, we should break out without + * error and let the upper layer reap some + * events for us. + */ + ret = 0; + break; } else break; - } while (ld->iocbs_nr); + } while (ld->head != ld->tail); return ret; } @@ -254,11 +289,11 @@ static void fio_libaio_cleanup(struct thread_data *td) static int fio_libaio_init(struct thread_data *td) { - struct libaio_data *ld = malloc(sizeof(*ld)); struct libaio_options *o = td->eo; + struct libaio_data *ld; int err = 0; - memset(ld, 0, sizeof(*ld)); + ld = calloc(1, sizeof(*ld)); /* * First try passing in 0 for queue depth, since we don't @@ -276,13 +311,11 @@ static int fio_libaio_init(struct thread_data *td) return 1; } - ld->aio_events = malloc(td->o.iodepth * sizeof(struct io_event)); - memset(ld->aio_events, 0, td->o.iodepth * sizeof(struct io_event)); - ld->iocbs = malloc(td->o.iodepth * sizeof(struct iocb *)); - memset(ld->iocbs, 0, sizeof(struct iocb *)); - ld->io_us = malloc(td->o.iodepth * sizeof(struct io_u *)); - memset(ld->io_us, 0, td->o.iodepth * sizeof(struct io_u *)); - ld->iocbs_nr = 0; + ld->entries = td->o.iodepth; + ld->is_pow2 = is_power_of_2(ld->entries); + ld->aio_events = calloc(ld->entries, sizeof(struct io_event)); + ld->iocbs = calloc(ld->entries, sizeof(struct iocb *)); + ld->io_us = calloc(ld->entries, sizeof(struct io_u *)); td->io_ops->data = ld; return 0;