#include <libaio.h>
#include "../fio.h"
+#include "../lib/pow2.h"
+
+static int fio_libaio_commit(struct thread_data *td);
struct libaio_data {
io_context_t aio_ctx;
};
struct libaio_options {
- struct thread_data *td;
+ void *pad;
unsigned int userspace_reap;
};
}
static int fio_libaio_getevents(struct thread_data *td, unsigned int min,
- unsigned int max, struct timespec *t)
+ unsigned int max, const struct timespec *t)
{
struct libaio_data *ld = td->io_ops->data;
struct libaio_options *o = td->eo;
- unsigned actual_min = td->o.iodepth_batch_complete == 0 ? 0 : min;
+ unsigned actual_min = td->o.iodepth_batch_complete_min == 0 ? 0 : min;
+ struct timespec __lt, *lt = NULL;
int r, events = 0;
+ if (t) {
+ __lt = *t;
+ lt = &__lt;
+ }
+
do {
if (o->userspace_reap == 1
&& actual_min == 0
ld->aio_events + events);
} else {
r = io_getevents(ld->aio_ctx, actual_min,
- max, ld->aio_events + events, t);
+ max, ld->aio_events + events, lt);
}
- if (r >= 0)
+ if (r > 0)
events += r;
- else if (r == -EAGAIN)
+ else if ((min && r == 0) || r == -EAGAIN) {
+ fio_libaio_commit(td);
usleep(100);
+ } else if (r != -EINTR)
+ break;
} while (events < min);
return r < 0 ? r : events;
struct libaio_data *ld = td->io_ops->data;
struct iocb **iocbs;
struct io_u **io_us;
- int ret;
+ struct timeval tv;
+ int ret, wait_start = 0;
if (!ld->queued)
return 0;
ld->queued -= ret;
ring_inc(ld, &ld->tail, ret);
ret = 0;
+ wait_start = 0;
} else if (ret == -EINTR || !ret) {
if (!ret)
io_u_mark_submit(td, ret);
+ wait_start = 0;
continue;
} else if (ret == -EAGAIN) {
/*
* If we get EAGAIN, we should break out without
* error and let the upper layer reap some
- * events for us.
+ * events for us. If we have no queued IO, we
+ * must loop here. If we loop for more than 30s,
+ * just error out, something must be buggy in the
+ * IO path.
*/
- ret = 0;
+ if (ld->queued) {
+ ret = 0;
+ break;
+ }
+ if (!wait_start) {
+ fio_gettime(&tv, NULL);
+ wait_start = 1;
+ } else if (mtime_since_now(&tv) > 30000) {
+ log_err("fio: aio appears to be stalled, giving up\n");
+ break;
+ }
+ usleep(1);
+ continue;
+ } else if (ret == -ENOMEM) {
+ /*
+ * If we get -ENOMEM, reap events if we can. If
+ * we cannot, treat it as a fatal event since there's
+ * nothing we can do about it.
+ */
+ if (ld->queued)
+ ret = 0;
break;
} else
break;
- } while (ld->head != ld->tail);
+ } while (ld->queued);
return ret;
}
struct libaio_data *ld = td->io_ops->data;
if (ld) {
- io_destroy(ld->aio_ctx);
+ /*
+ * Work-around to avoid huge RCU stalls at exit time. If we
+ * don't do this here, then it'll be torn down by exit_aio().
+ * But for that case we can parallellize the freeing, thus
+ * speeding it up a lot.
+ */
+ if (!(td->flags & TD_F_CHILD))
+ io_destroy(ld->aio_ctx);
free(ld->aio_events);
free(ld->iocbs);
free(ld->io_us);