[PATCH] fio: Add support for async io
authorJens Axboe <axboe@suse.de>
Fri, 21 Oct 2005 12:11:36 +0000 (14:11 +0200)
committerJens Axboe <axboe@suse.de>
Fri, 21 Oct 2005 12:11:36 +0000 (14:11 +0200)
Makefile
README.fio
fio.c

index bb2537011388e9d8fcebe673a01099e264c3e196..778ce00dd1676f686b245baddeb0467007606e21 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -8,7 +8,7 @@ dops: dops.o
        $(CC) $(CFLAGS) -o $@ $(filter %.o,$^) -laio
 
 fio: fio.o
-       $(CC) $(CFLAGS) -o $@ $(filter %.o,$^) -lpthread
+       $(CC) $(CFLAGS) -o $@ $(filter %.o,$^) -lpthread -laio
 
 sgioread: sgioread.o
        $(CC) $(CFLAGS) -o $@ $(filter %.o,$^)
index 942544404b5da0144a3f990795a776276fd72947..c362f79b952f13955738c8c64b7d795499fe3b59 100644 (file)
@@ -38,6 +38,8 @@ The <jobs> format is as follows:
        cpumask=x       Allow job to run on CPUs defined by mask
        fsync=x         If writing, fsync after every x blocks have been written
        startdelay=x    Start this thread x seconds after startup
+       aio             Use Linux async io
+       aio_depth=x     Allow x iocbs in flight
 
 
 Examples using cmd line jobs
diff --git a/fio.c b/fio.c
index d7fbab4e5170f9b4275adbd2faf6b7f7b8dba2e1..ebb64664cfa3c6223df58cd24a14ae43c158d05a 100644 (file)
--- a/fio.c
+++ b/fio.c
@@ -28,6 +28,7 @@
 #include <time.h>
 #include <ctype.h>
 #include <sched.h>
+#include <libaio.h>
 #include <sys/time.h>
 #include <sys/types.h>
 #include <sys/stat.h>
@@ -141,8 +142,16 @@ struct thread_data {
        unsigned int delay_sleep;
        unsigned int fsync_blocks;
        unsigned int start_delay;
+       unsigned int use_aio;
        cpu_set_t cpumask;
 
+       io_context_t *aio_ctx;
+       struct iocb *aio_iocbs;
+       unsigned int aio_depth;
+       unsigned int aio_cur_depth;
+       struct io_event *aio_events;
+       char *aio_iocbs_status;
+
        unsigned int rate;
        unsigned int ratemin;
        unsigned int ratecycle;
@@ -385,7 +394,7 @@ static int check_min_rate(struct thread_data *td, struct timeval *now)
 
 #define should_fsync(td)       ((td)->ddir == DDIR_WRITE && !(td)->odirect)
 
-static void do_thread_io(struct thread_data *td)
+static void do_sync_io(struct thread_data *td)
 {
        struct timeval s, e;
        char *buffer, *ptr;
@@ -461,7 +470,167 @@ static void do_thread_io(struct thread_data *td)
 
        free(ptr);
 }
-       
+
+static void aio_put_iocb(struct thread_data *td, struct iocb *iocb)
+{
+       long offset = ((long) iocb - (long) td->aio_iocbs)/ sizeof(struct iocb);
+
+       td->aio_iocbs_status[offset] = 0;
+}
+
+static struct iocb *aio_get_iocb(struct thread_data *td, char *buffer)
+{
+       struct iocb *iocb = NULL;
+       int i;
+
+       for (i = 0; i < td->aio_depth; i++) {
+               if (td->aio_iocbs_status[i] == 0) {
+                       td->aio_iocbs_status[i] = 1;
+                       iocb = &td->aio_iocbs[i];
+                       break;
+               }
+       }
+
+       if (iocb) {
+               off_t off = get_next_offset(td);
+               char *p = buffer + i * td->bs;
+
+               if (td->ddir == DDIR_READ)
+                       io_prep_pread(iocb, td->fd, p, td->bs, off);
+               else
+                       io_prep_pwrite(iocb, td->fd, p, td->bs, off);
+       }
+
+       return iocb;
+}
+
+static void do_async_io(struct thread_data *td)
+{
+       struct timeval s, e;
+       char *buf, *ptr;
+       unsigned long blocks, msec, usec;
+       int max_depth = 0;
+
+       ptr = malloc(td->bs * td->aio_depth + MASK);
+       buf = ALIGN(ptr);
+
+       gettimeofday(&td->start, NULL);
+
+       if (td->ratemin)
+               memcpy(&td->lastrate, &td->start, sizeof(td->start));
+
+       for (blocks = 0; blocks < td->blocks; blocks++) {
+               struct timespec ts = { .tv_sec = 0, .tv_nsec = 0};
+               struct timespec *timeout;
+               struct iocb *iocb = aio_get_iocb(td, buf);
+               int ret, i, min_evts = 0;
+
+               if (td->terminate)
+                       break;
+
+               if (td->delay_sleep)
+                       usec_sleep(td->delay_sleep);
+
+               gettimeofday(&s, NULL);
+
+               ret = io_submit(*td->aio_ctx, 1, &iocb);
+               if (ret < 0) {
+                       td->error = errno;
+                       break;
+               }
+
+               td->aio_cur_depth++;
+               if (td->aio_cur_depth > max_depth) {
+                       max_depth = td->aio_cur_depth;
+                       printf("max now %d\n", max_depth);
+               }
+
+               if (td->aio_cur_depth < td->aio_depth) {
+                       timeout = &ts;
+                       min_evts = 0;
+               } else {
+                       timeout = NULL;
+                       min_evts = 1;
+               }
+
+               ret = io_getevents(*td->aio_ctx, min_evts, td->aio_cur_depth, td->aio_events, timeout);
+               if (ret < 0) {
+                       td->error = errno;
+                       break;
+               } else if (!ret)
+                       continue;
+
+               for (i = 0; i < ret; i++) {
+                       struct io_event *ev = td->aio_events + i;
+
+                       td->io_blocks++;
+                       td->aio_cur_depth--;
+
+                       iocb = ev->obj;
+                       aio_put_iocb(td, iocb);
+               }
+
+               gettimeofday(&e, NULL);
+
+               usec = utime_since(&s, &e);
+
+               rate_throttle(td, usec);
+
+               if (check_min_rate(td, &e)) {
+                       td->error = ENODATA;
+                       break;
+               }
+
+               msec = usec / 1000;
+               add_stat_sample(td, msec);
+
+               if (msec < td->min_latency)
+                       td->min_latency = msec;
+               if (msec > td->max_latency)
+                       td->max_latency = msec;
+       }
+
+       gettimeofday(&e, NULL);
+       td->runtime = mtime_since(&td->start, &e);
+
+       free(ptr);
+}
+
+static void cleanup_aio(struct thread_data *td)
+{
+       /*
+        * flush pending events
+        */
+       if (td->aio_cur_depth)
+               io_getevents(*td->aio_ctx, td->aio_cur_depth, td->aio_cur_depth, td->aio_events, NULL);
+
+       if (td->aio_ctx) {
+               io_destroy(*td->aio_ctx);
+               free(td->aio_ctx);
+       }
+       if (td->aio_iocbs)
+               free(td->aio_iocbs);
+       if (td->aio_events)
+               free(td->aio_events);
+       if (td->aio_iocbs_status)
+               free(td->aio_iocbs_status);
+}
+
+static int init_aio(struct thread_data *td)
+{
+       td->aio_ctx = malloc(sizeof(*td->aio_ctx));
+
+       if (io_queue_init(td->aio_depth, td->aio_ctx)) {
+               td->error = errno;
+               return 1;
+       }
+
+       td->aio_iocbs = malloc(td->aio_depth * sizeof(struct iocb));
+       td->aio_events = malloc(td->aio_depth * sizeof(struct io_event));
+       td->aio_iocbs_status = malloc(td->aio_depth * sizeof(char));
+       return 0;
+}
+
 static void *thread_main(int shm_id, int offset, char *argv[])
 {
        struct thread_data *td;
@@ -480,7 +649,7 @@ static void *thread_main(int shm_id, int offset, char *argv[])
                goto err;
        }
 
-       printf("Thread (%s) (pid=%u) (f=%s) started\n", td->ddir == DDIR_READ ? "read" : "write", td->pid, td->file_name);
+       printf("Thread (%s) (pid=%u) (f=%s) (aio=%d) started\n", td->ddir == DDIR_READ ? "read" : "write", td->pid, td->file_name, td->use_aio);
        fflush(stdout);
 
        sprintf(argv[0], "fio%d", offset);
@@ -499,6 +668,9 @@ static void *thread_main(int shm_id, int offset, char *argv[])
                goto err;
        }
 
+       if (td->use_aio && init_aio(td))
+               goto err;
+
        if (init_random_state(td))
                goto out;
        if (init_stat_file(td))
@@ -527,7 +699,12 @@ static void *thread_main(int shm_id, int offset, char *argv[])
 
        sem_post(&startup_sem);
        sem_wait(&td->mutex);
-       do_thread_io(td);
+
+       if (!td->use_aio)
+               do_sync_io(td);
+       else
+               do_async_io(td);
+
        ret = 0;
 
 out:
@@ -535,6 +712,8 @@ out:
 err:
        if (td->fd != -1)
                close(td->fd);
+       if (td->use_aio)
+               cleanup_aio(td);
        if (ret)
                sem_post(&startup_sem);
 
@@ -602,6 +781,9 @@ static struct thread_data *get_new_job(void)
        td->ratecycle = DEF_RATE_CYCLE;
        td->sequential = sequential;
        td->ioprio = 0;
+       td->use_aio = 0;
+       td->aio_depth = 0;
+       td->aio_cur_depth = 0;
        memcpy(&td->cpumask, &def_cpumask, sizeof(td->cpumask));
 
        return td;
@@ -622,10 +804,13 @@ static int add_job(struct thread_data *td, const char *filename, int prioclass,
        td->min_latency = 10000000;
        td->ioprio = (prioclass << IOPRIO_CLASS_SHIFT) | prio;
 
+       if (td->use_aio && !td->aio_depth)
+               td->aio_depth = 1;
+
        if (setup_rate(td))
                return -1;
 
-       printf("Client%d: file=%s, rw=%d, prio=%d, seq=%d, odir=%d, bs=%d, rate=%d\n", td->thread_number, filename, td->ddir, td->ioprio, td->sequential, td->odirect, td->bs, td->rate);
+       printf("Client%d: file=%s, rw=%d, prio=%d, seq=%d, odir=%d, bs=%d, rate=%d, aio=%d, aio_depth=%d\n", td->thread_number, filename, td->ddir, td->ioprio, td->sequential, td->odirect, td->bs, td->rate, td->use_aio, td->aio_depth);
        return 0;
 }
 
@@ -782,6 +967,17 @@ static void parse_jobs_cmd(int argc, char *argv[], int index)
                        td->start_delay = strtoul(string, NULL, 10);
                }
 
+               c = strstr(p, "aio_depth=");
+               if (c) {
+                       c += 10;
+                       fill_option(c, string);
+                       td->aio_depth = strtoul(string, NULL, 10);
+               }
+
+               c = strstr(p, "aio");
+               if (c)
+                       td->use_aio = 1;
+
                c = strstr(p, "random");
                if (c)
                        td->sequential = 0;
@@ -910,16 +1106,26 @@ static int parse_jobs_ini(char *file)
                                fgetpos(f, &off);
                                continue;
                        }
-                       if (!strcmp(p, "sequential")) {
+                       if (!check_int(p, "aio_depth", &td->aio_depth)) {
+                               fgetpos(f, &off);
+                               continue;
+                       }
+                       if (!strncmp(p, "sequential", 10)) {
                                td->sequential = 1;
                                fgetpos(f, &off);
                                continue;
                        }
-                       if (!strcmp(p, "random")) {
+                       if (!strncmp(p, "random", 6)) {
                                td->sequential = 0;
                                fgetpos(f, &off);
                                continue;
                        }
+                       if (!strncmp(p, "aio", 3)) {
+                               td->use_aio = 1;
+                               fgetpos(f, &off);
+                               continue;
+                       }
+
                        printf("Client%d: bad option %s\n",td->thread_number,p);
                }
                fsetpos(f, &off);