long offset = ((long) iocb - (long) td->aio_iocbs)/ sizeof(struct iocb);
td->aio_iocbs_status[offset] = 0;
+ td->aio_cur_depth--;
}
static struct iocb *aio_get_iocb(struct thread_data *td, char *buffer,
struct timeval *t)
{
struct iocb *iocb = NULL;
- int i;
+ unsigned int i;
for (i = 0; i < td->aio_depth; i++) {
if (td->aio_iocbs_status[i] == 0) {
return iocb;
}
+static int aio_submit(struct thread_data *td, struct iocb *iocb)
+{
+ int ret;
+
+ do {
+ ret = io_submit(*td->aio_ctx, 1, &iocb);
+ if (ret == 1)
+ return 0;
+
+ if (errno == EINTR)
+ continue;
+ else if (errno == EAGAIN)
+ usleep(100);
+ else
+ break;
+ } while (1);
+
+ return 1;
+}
+
#define iocb_time(iocb) ((unsigned long) (iocb)->data)
static void do_async_io(struct thread_data *td)
iocb = aio_get_iocb(td, buf, &s);
- ret = io_submit(*td->aio_ctx, 1, &iocb);
- if (ret < 0) {
+ ret = aio_submit(td, iocb);
+ if (ret) {
td->error = errno;
break;
}
struct io_event *ev = td->aio_events + i;
td->io_blocks++;
- td->aio_cur_depth--;
iocb = ev->obj;
free(ptr);
}
-static void cleanup_aio(struct thread_data *td)
+static void cleanup_pending_aio(struct thread_data *td)
{
+ struct timespec ts = { .tv_sec = 0, .tv_nsec = 0};
+ unsigned int i;
+ int r;
+
/*
- * flush pending events
+ * get immediately available events, if any
*/
+ r = io_getevents(*td->aio_ctx, 0, td->aio_cur_depth, td->aio_events, &ts);
+ if (r > 0) {
+ for (i = 0; i < r; i++)
+ aio_put_iocb(td, &td->aio_iocbs[i]);
+ }
+
+ /*
+ * now cancel remaining active events
+ */
+ for (i = 0; i < td->aio_depth; i++) {
+ if (td->aio_iocbs_status[i] == 0)
+ continue;
+
+ r = io_cancel(*td->aio_ctx, &td->aio_iocbs[i], td->aio_events);
+ if (!r)
+ aio_put_iocb(td, &td->aio_iocbs[i]);
+ }
+
if (td->aio_cur_depth)
io_getevents(*td->aio_ctx, td->aio_cur_depth, td->aio_cur_depth, td->aio_events, NULL);
+}
+
+static void cleanup_aio(struct thread_data *td)
+{
+ if (td->aio_cur_depth)
+ cleanup_pending_aio(td);
if (td->aio_ctx) {
io_destroy(*td->aio_ctx);
if (timeout) {
signal(SIGALRM, sig_handler);
+ signal(SIGINT, sig_handler);
alarm(timeout);
}