From 755200a326a33e5e19b16dfd5e013dd98bcf1916 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Mon, 19 Feb 2007 13:08:12 +0100 Subject: [PATCH] Add support for queuing > 1 command at a time For the async engines, we currently do queuing by issuing one command at a time. Improve this by adding a ->commit() hook to complement the ->queue() hook. When ->queue() returns FIO_Q_BUSY, call ->commit() to actually send off the io to the kernel. Signed-off-by: Jens Axboe --- engines/libaio.c | 81 ++++++++++++++++++++++++++++++++---------------- fio.c | 23 +++++++++++--- fio.h | 8 ++++- io_u.c | 38 ++++++++++++++++++++--- ioengines.c | 11 +++++++ 5 files changed, 125 insertions(+), 36 deletions(-) diff --git a/engines/libaio.c b/engines/libaio.c index cb488efb..510ecab4 100644 --- a/engines/libaio.c +++ b/engines/libaio.c @@ -18,6 +18,8 @@ struct libaio_data { io_context_t aio_ctx; struct io_event *aio_events; + struct iocb **iocbs; + int iocbs_nr; }; static int fio_libaio_prep(struct thread_data fio_unused *td, struct io_u *io_u) @@ -68,40 +70,60 @@ static int fio_libaio_getevents(struct thread_data *td, int min, int max, static int fio_libaio_queue(struct thread_data *td, struct io_u *io_u) { struct libaio_data *ld = td->io_ops->data; - struct iocb *iocb = &io_u->iocb; - long ret; + if (ld->iocbs_nr == (int) td->iodepth) + return FIO_Q_BUSY; + + /* + * fsync is tricky, since it can fail and we need to do it + * serialized with other io. the reason is that linux doesn't + * support aio fsync yet. So return busy for the case where we + * have pending io, to let fio complete those first. 
+ */ + if (io_u->ddir == DDIR_SYNC) { + if (ld->iocbs_nr) + return FIO_Q_BUSY; + if (fsync(io_u->file->fd) < 0) + io_u->error = errno; + + return FIO_Q_COMPLETED; + } + + ld->iocbs[ld->iocbs_nr] = &io_u->iocb; + ld->iocbs_nr++; + return FIO_Q_QUEUED; +} + +static int fio_libaio_commit(struct thread_data *td) +{ + struct libaio_data *ld = td->io_ops->data; + struct iocb **iocbs; + int ret, iocbs_nr; + + if (!ld->iocbs_nr) + return 0; + + iocbs_nr = ld->iocbs_nr; + iocbs = ld->iocbs; do { - ret = io_submit(ld->aio_ctx, 1, &iocb); - if (ret == 1) - return FIO_Q_QUEUED; - else if (ret == -EAGAIN || !ret) + ret = io_submit(ld->aio_ctx, iocbs_nr, iocbs); + if (ret == iocbs_nr) { + ret = 0; + break; + } else if (ret > 0) { + iocbs += ret; + iocbs_nr -= ret; + continue; + } else if (ret == -EAGAIN || !ret) usleep(100); else if (ret == -EINTR) continue; - else if (ret == -EINVAL && io_u->ddir == DDIR_SYNC) { - /* - * the async fsync doesn't currently seem to be - * supported, so just fsync if we fail with EINVAL - * for a sync. since buffered io is also sync - * with libaio (still), we don't have pending - * requests to flush first. 
- */ - if (fsync(io_u->file->fd) < 0) - ret = -errno; - else - ret = FIO_Q_COMPLETED; - break; - } else + else break; } while (1); - if (ret <= 0) { - io_u->resid = io_u->xfer_buflen; - io_u->error = -ret; - td_verror(td, io_u->error); - return FIO_Q_COMPLETED; - } + if (!ret) + ld->iocbs_nr = 0; return ret; } @@ -121,6 +143,8 @@ static void fio_libaio_cleanup(struct thread_data *td) io_destroy(ld->aio_ctx); if (ld->aio_events) free(ld->aio_events); + if (ld->iocbs) + free(ld->iocbs); free(ld); td->io_ops->data = NULL; @@ -140,6 +164,10 @@ static int fio_libaio_init(struct thread_data *td) ld->aio_events = malloc(td->iodepth * sizeof(struct io_event)); memset(ld->aio_events, 0, td->iodepth * sizeof(struct io_event)); + ld->iocbs = malloc(td->iodepth * sizeof(struct iocb *)); + memset(ld->iocbs, 0, td->iodepth * sizeof(struct iocb *)); + ld->iocbs_nr = 0; + td->io_ops->data = ld; return 0; } @@ -150,6 +178,7 @@ static struct ioengine_ops ioengine = { .init = fio_libaio_init, .prep = fio_libaio_prep, .queue = fio_libaio_queue, + .commit = fio_libaio_commit, .cancel = fio_libaio_cancel, .getevents = fio_libaio_getevents, .event = fio_libaio_event, diff --git a/fio.c b/fio.c index 6e78949b..858d6b81 100644 --- a/fio.c +++ b/fio.c @@ -209,6 +209,7 @@ static int fio_io_sync(struct thread_data *td, struct fio_file *f) return 1; } +requeue: ret = td_io_queue(td, io_u); if (ret < 0) { td_verror(td, io_u->error); @@ -224,6 +225,10 @@ static int fio_io_sync(struct thread_data *td, struct fio_file *f) } io_u_sync_complete(td, io_u, NULL); + } else if (ret == FIO_Q_BUSY) { + if (td_io_commit(td)) + return 1; + goto requeue; } return 0; @@ -285,6 +290,10 @@ requeue: continue; case FIO_Q_QUEUED: break; + case FIO_Q_BUSY: + requeue_io_u(td, &io_u); + ret = td_io_commit(td); + break; default: assert(ret < 0); td_verror(td, -ret); @@ -299,7 +308,7 @@ requeue: * completed io_u's first. 
*/ min_events = 0; - if (queue_full(td)) + if (queue_full(td) || ret == FIO_Q_BUSY) min_events = 1; /* @@ -403,6 +412,10 @@ requeue: break; case FIO_Q_QUEUED: break; + case FIO_Q_BUSY: + requeue_io_u(td, &io_u); + ret = td_io_commit(td); + break; default: assert(ret < 0); put_io_u(td, io_u); @@ -412,14 +425,15 @@ requeue: if (ret < 0 || td->error) break; - add_slat_sample(td, io_u->ddir, mtime_since(&io_u->start_time, &io_u->issue_time)); + if (io_u) + add_slat_sample(td, io_u->ddir, mtime_since(&io_u->start_time, &io_u->issue_time)); /* * See if we need to complete some commands */ - if (ret == FIO_Q_QUEUED) { + if (ret == FIO_Q_QUEUED || ret == FIO_Q_BUSY) { min_evts = 0; - if (queue_full(td)) + if (queue_full(td) || ret == FIO_Q_BUSY) min_evts = 1; fio_gettime(&comp_time, NULL); @@ -633,6 +647,7 @@ static void *thread_main(void *data) INIT_LIST_HEAD(&td->io_u_freelist); INIT_LIST_HEAD(&td->io_u_busylist); + INIT_LIST_HEAD(&td->io_u_requeues); INIT_LIST_HEAD(&td->io_hist_list); INIT_LIST_HEAD(&td->io_log_list); diff --git a/fio.h b/fio.h index aa66ecd4..baaa9a8d 100644 --- a/fio.h +++ b/fio.h @@ -135,6 +135,7 @@ struct io_u { enum { FIO_Q_COMPLETED = 0, /* completed sync */ FIO_Q_QUEUED = 1, /* queued, will complete async */ + FIO_Q_BUSY = 2, /* no more room, call ->commit() */ }; #define FIO_HDR_MAGIC 0xf00baaef @@ -334,6 +335,7 @@ struct thread_data { unsigned long total_io_u; struct list_head io_u_freelist; struct list_head io_u_busylist; + struct list_head io_u_requeues; /* * Rate state @@ -352,6 +354,7 @@ struct thread_data { unsigned long long start_offset; unsigned long long total_io_size; + unsigned long io_issues[2]; unsigned long long io_blocks[2]; unsigned long long io_bytes[2]; unsigned long long zone_bytes; @@ -608,6 +611,7 @@ extern void free_io_mem(struct thread_data *); extern struct io_u *__get_io_u(struct thread_data *); extern struct io_u *get_io_u(struct thread_data *, struct fio_file *); extern void put_io_u(struct thread_data *, struct io_u 
*); +extern void requeue_io_u(struct thread_data *, struct io_u **); extern long io_u_sync_complete(struct thread_data *, struct io_u *, endio_handler *); extern long io_u_queued_complete(struct thread_data *, int, endio_handler *); @@ -619,6 +623,7 @@ extern int td_io_prep(struct thread_data *, struct io_u *); extern int td_io_queue(struct thread_data *, struct io_u *); extern int td_io_sync(struct thread_data *, struct fio_file *); extern int td_io_getevents(struct thread_data *, int, int, struct timespec *); +extern int td_io_commit(struct thread_data *); /* * This is a pretty crappy semaphore implementation, but with the use that fio @@ -662,6 +667,7 @@ struct ioengine_ops { int (*init)(struct thread_data *); int (*prep)(struct thread_data *, struct io_u *); int (*queue)(struct thread_data *, struct io_u *); + int (*commit)(struct thread_data *); int (*getevents)(struct thread_data *, int, int, struct timespec *); struct io_u *(*event)(struct thread_data *, int); int (*cancel)(struct thread_data *, struct io_u *); @@ -671,7 +677,7 @@ struct ioengine_ops { unsigned long priv; }; -#define FIO_IOOPS_VERSION 4 +#define FIO_IOOPS_VERSION 5 extern struct ioengine_ops *load_ioengine(struct thread_data *, const char *); extern int register_ioengine(struct ioengine_ops *); diff --git a/io_u.c b/io_u.c index c04c9de0..781599f4 100644 --- a/io_u.c +++ b/io_u.c @@ -199,6 +199,16 @@ void put_io_u(struct thread_data *td, struct io_u *io_u) td->cur_depth--; } +void requeue_io_u(struct thread_data *td, struct io_u **io_u) +{ + struct io_u *__io_u = *io_u; + + list_del(&__io_u->list); + list_add_tail(&__io_u->list, &td->io_u_requeues); + td->cur_depth--; + *io_u = NULL; +} + static int fill_io_u(struct thread_data *td, struct fio_file *f, struct io_u *io_u) { @@ -211,8 +221,8 @@ static int fill_io_u(struct thread_data *td, struct fio_file *f, /* * see if it's time to sync */ - if (td->fsync_blocks && !(td->io_blocks[DDIR_WRITE] % td->fsync_blocks) - && should_fsync(td)) { + if 
(td->fsync_blocks && !(td->io_issues[DDIR_WRITE] % td->fsync_blocks) + && td->io_issues[DDIR_WRITE] && should_fsync(td)) { io_u->ddir = DDIR_SYNC; io_u->file = f; return 0; @@ -310,12 +320,18 @@ struct io_u *__get_io_u(struct thread_data *td) { struct io_u *io_u = NULL; - if (!queue_full(td)) { + if (!list_empty(&td->io_u_requeues)) + io_u = list_entry(td->io_u_requeues.next, struct io_u, list); + else if (!queue_full(td)) { io_u = list_entry(td->io_u_freelist.next, struct io_u, list); io_u->buflen = 0; - io_u->error = 0; io_u->resid = 0; + io_u->file = NULL; + } + + if (io_u) { + io_u->error = 0; list_del(&io_u->list); list_add(&io_u->list, &td->io_u_busylist); td->cur_depth++; @@ -337,6 +353,12 @@ struct io_u *get_io_u(struct thread_data *td, struct fio_file *f) if (!io_u) return NULL; + /* + * from a requeue, io_u already setup + */ + if (io_u->file) + return io_u; + if (td->zone_bytes >= td->zone_size) { td->zone_bytes = 0; f->last_pos += td->zone_skip; @@ -477,8 +499,14 @@ long io_u_queued_complete(struct thread_data *td, int min_events, struct io_completion_data icd; int ret; - if (min_events > 0) + if (min_events > 0) { tsp = &ts; + ret = td_io_commit(td); + if (ret < 0) { + td_verror(td, -ret); + return ret; + } + } ret = td_io_getevents(td, min_events, td->cur_depth, tsp); if (ret < 0) { diff --git a/ioengines.c b/ioengines.c index 1b510dfe..16ea928f 100644 --- a/ioengines.c +++ b/ioengines.c @@ -189,6 +189,9 @@ int td_io_queue(struct thread_data *td, struct io_u *io_u) { fio_gettime(&io_u->issue_time, NULL); + if (io_u->ddir != DDIR_SYNC) + td->io_issues[io_u->ddir]++; + return td->io_ops->queue(td, io_u); } @@ -199,3 +202,11 @@ int td_io_init(struct thread_data *td) return 0; } + +int td_io_commit(struct thread_data *td) +{ + if (td->io_ops->commit) + return td->io_ops->commit(td); + + return 0; +} -- 2.25.1