From: Jens Axboe Date: Fri, 21 Oct 2005 12:11:36 +0000 (+0200) Subject: [PATCH] fio: Add support for async io X-Git-Url: https://git.kernel.dk/?a=commitdiff_plain;h=430001184acc843d058cab7032228b9dd7ef263f;p=disktools.git [PATCH] fio: Add support for async io --- diff --git a/Makefile b/Makefile index bb25370..778ce00 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,7 @@ dops: dops.o $(CC) $(CFLAGS) -o $@ $(filter %.o,$^) -laio fio: fio.o - $(CC) $(CFLAGS) -o $@ $(filter %.o,$^) -lpthread + $(CC) $(CFLAGS) -o $@ $(filter %.o,$^) -lpthread -laio sgioread: sgioread.o $(CC) $(CFLAGS) -o $@ $(filter %.o,$^) diff --git a/README.fio b/README.fio index 9425444..c362f79 100644 --- a/README.fio +++ b/README.fio @@ -38,6 +38,8 @@ The format is as follows: cpumask=x Allow job to run on CPUs defined by mask fsync=x If writing, fsync after every x blocks have been written startdelay=x Start this thread x seconds after startup + aio Use Linux async io + aio_depth=x Allow x iocbs in flight Examples using cmd line jobs diff --git a/fio.c b/fio.c index d7fbab4..ebb6466 100644 --- a/fio.c +++ b/fio.c @@ -28,6 +28,7 @@ #include #include #include +#include <libaio.h> #include #include #include @@ -141,8 +142,16 @@ struct thread_data { unsigned int delay_sleep; unsigned int fsync_blocks; unsigned int start_delay; + unsigned int use_aio; cpu_set_t cpumask; + io_context_t *aio_ctx; + struct iocb *aio_iocbs; + unsigned int aio_depth; + unsigned int aio_cur_depth; + struct io_event *aio_events; + char *aio_iocbs_status; + unsigned int rate; unsigned int ratemin; unsigned int ratecycle; @@ -385,7 +394,7 @@ static int check_min_rate(struct thread_data *td, struct timeval *now) #define should_fsync(td) ((td)->ddir == DDIR_WRITE && !(td)->odirect) -static void do_thread_io(struct thread_data *td) +static void do_sync_io(struct thread_data *td) { struct timeval s, e; char *buffer, *ptr; @@ -461,7 +470,167 @@ static void do_thread_io(struct thread_data *td) free(ptr); } - + +static void aio_put_iocb(struct
thread_data *td, struct iocb *iocb) +{ + long offset = ((long) iocb - (long) td->aio_iocbs)/ sizeof(struct iocb); + + td->aio_iocbs_status[offset] = 0; +} + +static struct iocb *aio_get_iocb(struct thread_data *td, char *buffer) +{ + struct iocb *iocb = NULL; + int i; + + for (i = 0; i < td->aio_depth; i++) { + if (td->aio_iocbs_status[i] == 0) { + td->aio_iocbs_status[i] = 1; + iocb = &td->aio_iocbs[i]; + break; + } + } + + if (iocb) { + off_t off = get_next_offset(td); + char *p = buffer + i * td->bs; + + if (td->ddir == DDIR_READ) + io_prep_pread(iocb, td->fd, p, td->bs, off); + else + io_prep_pwrite(iocb, td->fd, p, td->bs, off); + } + + return iocb; +} + +static void do_async_io(struct thread_data *td) +{ + struct timeval s, e; + char *buf, *ptr; + unsigned long blocks, msec, usec; + int max_depth = 0; + + ptr = malloc(td->bs * td->aio_depth + MASK); + buf = ALIGN(ptr); + + gettimeofday(&td->start, NULL); + + if (td->ratemin) + memcpy(&td->lastrate, &td->start, sizeof(td->start)); + + for (blocks = 0; blocks < td->blocks; blocks++) { + struct timespec ts = { .tv_sec = 0, .tv_nsec = 0}; + struct timespec *timeout; + struct iocb *iocb = aio_get_iocb(td, buf); + int ret, i, min_evts = 0; + + if (td->terminate) + break; + + if (td->delay_sleep) + usec_sleep(td->delay_sleep); + + gettimeofday(&s, NULL); + + ret = io_submit(*td->aio_ctx, 1, &iocb); + if (ret < 0) { + td->error = errno; + break; + } + + td->aio_cur_depth++; + if (td->aio_cur_depth > max_depth) { + max_depth = td->aio_cur_depth; + printf("max now %d\n", max_depth); + } + + if (td->aio_cur_depth < td->aio_depth) { + timeout = &ts; + min_evts = 0; + } else { + timeout = NULL; + min_evts = 1; + } + + ret = io_getevents(*td->aio_ctx, min_evts, td->aio_cur_depth, td->aio_events, timeout); + if (ret < 0) { + td->error = errno; + break; + } else if (!ret) + continue; + + for (i = 0; i < ret; i++) { + struct io_event *ev = td->aio_events + i; + + td->io_blocks++; + td->aio_cur_depth--; + + iocb = ev->obj; + 
aio_put_iocb(td, iocb); + } + + gettimeofday(&e, NULL); + + usec = utime_since(&s, &e); + + rate_throttle(td, usec); + + if (check_min_rate(td, &e)) { + td->error = ENODATA; + break; + } + + msec = usec / 1000; + add_stat_sample(td, msec); + + if (msec < td->min_latency) + td->min_latency = msec; + if (msec > td->max_latency) + td->max_latency = msec; + } + + gettimeofday(&e, NULL); + td->runtime = mtime_since(&td->start, &e); + + free(ptr); +} + +static void cleanup_aio(struct thread_data *td) +{ + /* + * flush pending events + */ + if (td->aio_cur_depth) + io_getevents(*td->aio_ctx, td->aio_cur_depth, td->aio_cur_depth, td->aio_events, NULL); + + if (td->aio_ctx) { + io_destroy(*td->aio_ctx); + free(td->aio_ctx); + } + if (td->aio_iocbs) + free(td->aio_iocbs); + if (td->aio_events) + free(td->aio_events); + if (td->aio_iocbs_status) + free(td->aio_iocbs_status); +} + +static int init_aio(struct thread_data *td) +{ + td->aio_ctx = malloc(sizeof(*td->aio_ctx)); + + if (io_queue_init(td->aio_depth, td->aio_ctx)) { + td->error = errno; + return 1; + } + + td->aio_iocbs = malloc(td->aio_depth * sizeof(struct iocb)); + td->aio_events = malloc(td->aio_depth * sizeof(struct io_event)); + td->aio_iocbs_status = malloc(td->aio_depth * sizeof(char)); + return 0; +} + static void *thread_main(int shm_id, int offset, char *argv[]) { struct thread_data *td; @@ -480,7 +649,7 @@ static void *thread_main(int shm_id, int offset, char *argv[]) goto err; } - printf("Thread (%s) (pid=%u) (f=%s) started\n", td->ddir == DDIR_READ ? "read" : "write", td->pid, td->file_name); + printf("Thread (%s) (pid=%u) (f=%s) (aio=%d) started\n", td->ddir == DDIR_READ ? 
"read" : "write", td->pid, td->file_name, td->use_aio); fflush(stdout); sprintf(argv[0], "fio%d", offset); @@ -499,6 +668,9 @@ static void *thread_main(int shm_id, int offset, char *argv[]) goto err; } + if (td->use_aio && init_aio(td)) + goto err; + if (init_random_state(td)) goto out; if (init_stat_file(td)) @@ -527,7 +699,12 @@ static void *thread_main(int shm_id, int offset, char *argv[]) sem_post(&startup_sem); sem_wait(&td->mutex); - do_thread_io(td); + + if (!td->use_aio) + do_sync_io(td); + else + do_async_io(td); + ret = 0; out: @@ -535,6 +712,8 @@ out: err: if (td->fd != -1) close(td->fd); + if (td->use_aio) + cleanup_aio(td); if (ret) sem_post(&startup_sem); @@ -602,6 +781,9 @@ static struct thread_data *get_new_job(void) td->ratecycle = DEF_RATE_CYCLE; td->sequential = sequential; td->ioprio = 0; + td->use_aio = 0; + td->aio_depth = 0; + td->aio_cur_depth = 0; memcpy(&td->cpumask, &def_cpumask, sizeof(td->cpumask)); return td; @@ -622,10 +804,13 @@ static int add_job(struct thread_data *td, const char *filename, int prioclass, td->min_latency = 10000000; td->ioprio = (prioclass << IOPRIO_CLASS_SHIFT) | prio; + if (td->use_aio && !td->aio_depth) + td->aio_depth = 1; + if (setup_rate(td)) return -1; - printf("Client%d: file=%s, rw=%d, prio=%d, seq=%d, odir=%d, bs=%d, rate=%d\n", td->thread_number, filename, td->ddir, td->ioprio, td->sequential, td->odirect, td->bs, td->rate); + printf("Client%d: file=%s, rw=%d, prio=%d, seq=%d, odir=%d, bs=%d, rate=%d, aio=%d, aio_depth=%d\n", td->thread_number, filename, td->ddir, td->ioprio, td->sequential, td->odirect, td->bs, td->rate, td->use_aio, td->aio_depth); return 0; } @@ -782,6 +967,17 @@ static void parse_jobs_cmd(int argc, char *argv[], int index) td->start_delay = strtoul(string, NULL, 10); } + c = strstr(p, "aio_depth="); + if (c) { + c += 10; + fill_option(c, string); + td->aio_depth = strtoul(string, NULL, 10); + } + + c = strstr(p, "aio"); + if (c) + td->use_aio = 1; + c = strstr(p, "random"); if (c) 
td->sequential = 0; @@ -910,16 +1106,26 @@ static int parse_jobs_ini(char *file) fgetpos(f, &off); continue; } - if (!strcmp(p, "sequential")) { + if (!check_int(p, "aio_depth", &td->aio_depth)) { + fgetpos(f, &off); + continue; + } + if (!strncmp(p, "sequential", 10)) { td->sequential = 1; fgetpos(f, &off); continue; } - if (!strcmp(p, "random")) { + if (!strncmp(p, "random", 6)) { td->sequential = 0; fgetpos(f, &off); continue; } + if (!strncmp(p, "aio", 3)) { + td->use_aio = 1; + fgetpos(f, &off); + continue; + } + printf("Client%d: bad option %s\n",td->thread_number,p); } fsetpos(f, &off);