[PATCH] fio: create an abstract io unit
authorJens Axboe <axboe@suse.de>
Tue, 25 Oct 2005 09:05:50 +0000 (11:05 +0200)
committerJens Axboe <axboe@suse.de>
Tue, 25 Oct 2005 09:05:50 +0000 (11:05 +0200)
fio.c

diff --git a/fio.c b/fio.c
index e33a7b18551a13705e80ebe63d4b7cfc6121f5f8..aa9c03b4a9bccea7f823f623c888b7085ce623b5 100644 (file)
--- a/fio.c
+++ b/fio.c
@@ -40,6 +40,8 @@
 #include <sys/shm.h>
 #include <asm/unistd.h>
 
+#include "list.h"
+
 #define MAX_JOBS       (1024)
 
 /*
@@ -152,6 +154,21 @@ enum {
        TD_REAPED,
 };
 
+/*
+ * The io unit
+ */
+struct io_u {
+       struct iocb iocb;
+       struct timeval issue_time;
+
+       void *mem;
+       char *buf;
+       unsigned int buflen;
+       off_t offset;
+
+       struct list_head list;
+};
+
 #define td_read(td)            ((td)->ddir == DDIR_READ)
 #define should_fsync(td)       (!td_read(td) && !(td)->odirect)
 
@@ -161,7 +178,6 @@ struct thread_data {
        int error;
        int fd;
        pid_t pid;
-       char *buf;
        volatile int terminate;
        volatile int runstate;
        unsigned int ddir;
@@ -182,11 +198,12 @@ struct thread_data {
        cpu_set_t cpumask;
 
        io_context_t aio_ctx;
-       struct iocb *aio_iocbs;
        unsigned int aio_depth;
-       unsigned int aio_cur_depth;
        struct io_event *aio_events;
-       char *aio_iocbs_status;
+
+       unsigned int cur_depth;
+       struct list_head io_u_freelist;
+       struct list_head io_u_busylist;
 
        unsigned int rate;
        unsigned int ratemin;
@@ -423,19 +440,53 @@ static inline int runtime_exceeded(struct thread_data *td, struct timeval *t)
        return 0;
 }
 
+static void put_io_u(struct thread_data *td, struct io_u *io_u)
+{
+       list_del(&io_u->list);
+       list_add(&io_u->list, &td->io_u_freelist);
+       td->cur_depth--;
+}
+
+static struct io_u *get_io_u(struct thread_data *td)
+{
+       struct io_u *io_u;
+
+       if (list_empty(&td->io_u_freelist))
+               return NULL;
+
+       io_u = list_entry(td->io_u_freelist.next, struct io_u, list);
+       list_del(&io_u->list);
+       list_add(&io_u->list, &td->io_u_busylist);
+
+       io_u->offset = get_next_offset(td);
+
+       if (td->use_aio) {
+               if (td_read(td))
+                       io_prep_pread(&io_u->iocb, td->fd, io_u->buf, io_u->buflen, io_u->offset);
+               else
+                       io_prep_pwrite(&io_u->iocb, td->fd, io_u->buf, io_u->buflen, io_u->offset);
+       }
+
+       gettimeofday(&io_u->issue_time, NULL);
+       td->cur_depth++;
+       return io_u;
+}
+
 static void do_sync_io(struct thread_data *td)
 {
-       struct timeval s, e;
        unsigned long blocks, msec, usec;
+       struct timeval e;
 
        for (blocks = 0; blocks < td->blocks; blocks++) {
-               off_t offset = get_next_offset(td);
+               struct io_u *io_u;
                int ret;
 
                if (td->terminate)
                        break;
 
-               if (lseek(td->fd, offset, SEEK_SET) == -1) {
+               io_u = get_io_u(td);
+
+               if (lseek(td->fd, io_u->offset, SEEK_SET) == -1) {
                        td->error = errno;
                        break;
                }
@@ -443,14 +494,12 @@ static void do_sync_io(struct thread_data *td)
                if (td->delay_sleep)
                        usec_sleep(td->delay_sleep);
 
-               gettimeofday(&s, NULL);
-
                if (td_read(td))
-                       ret = read(td->fd, td->buf, td->bs);
+                       ret = read(td->fd, io_u->buf, io_u->buflen);
                else
-                       ret = write(td->fd, td->buf, td->bs);
+                       ret = write(td->fd, io_u->buf, io_u->buflen);
 
-               if (ret < (int) td->bs) {
+               if (ret < (int) io_u->buflen) {
                        if (ret == -1)
                                td->error = errno;
                        break;
@@ -464,7 +513,7 @@ static void do_sync_io(struct thread_data *td)
 
                gettimeofday(&e, NULL);
 
-               usec = utime_since(&s, &e);
+               usec = utime_since(&io_u->issue_time, &e);
 
                rate_throttle(td, usec);
 
@@ -483,50 +532,17 @@ static void do_sync_io(struct thread_data *td)
 
                if (runtime_exceeded(td, &e))
                        break;
+
+               put_io_u(td, io_u);
        }
 
        if (should_fsync(td))
                fsync(td->fd);
 }
 
-static void aio_put_iocb(struct thread_data *td, struct iocb *iocb)
-{
-       long offset = ((long) iocb - (long) td->aio_iocbs)/ sizeof(struct iocb);
-
-       td->aio_iocbs_status[offset] = 0;
-       td->aio_cur_depth--;
-}
-
-static struct iocb *aio_get_iocb(struct thread_data *td, struct timeval *t)
-{
-       struct iocb *iocb = NULL;
-       unsigned int i;
-
-       for (i = 0; i < td->aio_depth; i++) {
-               if (td->aio_iocbs_status[i] == 0) {
-                       td->aio_iocbs_status[i] = 1;
-                       iocb = &td->aio_iocbs[i];
-                       break;
-               }
-       }
-
-       if (iocb) {
-               off_t off = get_next_offset(td);
-               char *p = td->buf + i * td->bs;
-
-               if (td_read(td))
-                       io_prep_pread(iocb, td->fd, p, td->bs, off);
-               else
-                       io_prep_pwrite(iocb, td->fd, p, td->bs, off);
-
-               io_set_callback(iocb, (io_callback_t) msec_now(t));
-       }
-
-       return iocb;
-}
-
-static int aio_submit(struct thread_data *td, struct iocb *iocb)
+static int io_u_queue(struct thread_data *td, struct io_u *io_u)
 {
+       struct iocb *iocb = &io_u->iocb;
        int ret;
 
        do {
@@ -546,17 +562,77 @@ static int aio_submit(struct thread_data *td, struct iocb *iocb)
 }
 
 #define iocb_time(iocb)        ((unsigned long) (iocb)->data)
+#define ev_to_iou(ev)  (struct io_u *) ((unsigned long) (ev)->obj)
+
+static void ios_completed(struct thread_data *td, int nr)
+{
+       unsigned long msec;
+       struct io_u *io_u;
+       struct timeval e;
+       int i;
+
+       gettimeofday(&e, NULL);
+
+       for (i = 0; i < nr; i++) {
+               td->io_blocks++;
+
+               io_u = ev_to_iou(td->aio_events + i);
+
+               msec = mtime_since(&io_u->issue_time, &e);
+
+               add_stat_sample(td, msec);
+
+               if (msec < td->min_latency)
+                       td->min_latency = msec;
+               if (msec > td->max_latency)
+                       td->max_latency = msec;
+
+               put_io_u(td, io_u);
+       }
+}
+
+static void cleanup_pending_aio(struct thread_data *td)
+{
+       struct timespec ts = { .tv_sec = 0, .tv_nsec = 0};
+       struct list_head *entry, *n;
+       struct io_u *io_u;
+       int r;
+
+       /*
+        * get immediately available events, if any
+        */
+       r = io_getevents(td->aio_ctx, 0, td->cur_depth, td->aio_events, &ts);
+       if (r > 0)
+               ios_completed(td, r);
+
+       /*
+        * now cancel remaining active events
+        */
+       list_for_each_safe(entry, n, &td->io_u_busylist) {
+               io_u = list_entry(entry, struct io_u, list);
+
+               r = io_cancel(td->aio_ctx, &io_u->iocb, td->aio_events);
+               if (!r)
+                       put_io_u(td, io_u);
+       }
+
+       if (td->cur_depth) {
+               r = io_getevents(td->aio_ctx, td->cur_depth, td->cur_depth, td->aio_events, NULL);
+               if (r > 0)
+                       ios_completed(td, r);
+       }
+}
 
 static void do_async_io(struct thread_data *td)
 {
        struct timeval s, e;
-       unsigned long blocks, msec, usec;
+       unsigned long blocks, usec;
 
        for (blocks = 0; blocks < td->blocks; blocks++) {
                struct timespec ts = { .tv_sec = 0, .tv_nsec = 0};
                struct timespec *timeout;
-               int ret, i, min_evts = 0;
-               struct iocb *iocb;
+               int ret, min_evts = 0;
+               struct io_u *io_u;
 
                if (td->terminate)
                        break;
@@ -564,19 +640,17 @@ static void do_async_io(struct thread_data *td)
                if (td->delay_sleep)
                        usec_sleep(td->delay_sleep);
 
-               gettimeofday(&s, NULL);
+               io_u = get_io_u(td);
 
-               iocb = aio_get_iocb(td, &s);
+               memcpy(&s, &io_u->issue_time, sizeof(s));
 
-               ret = aio_submit(td, iocb);
+               ret = io_u_queue(td, io_u);
                if (ret) {
                        td->error = errno;
                        break;
                }
 
-               td->aio_cur_depth++;
-
-               if (td->aio_cur_depth < td->aio_depth) {
+               if (td->cur_depth < td->aio_depth) {
                        timeout = &ts;
                        min_evts = 0;
                } else {
@@ -584,38 +658,21 @@ static void do_async_io(struct thread_data *td)
                        min_evts = 1;
                }
 
-               ret = io_getevents(td->aio_ctx, min_evts, td->aio_cur_depth, td->aio_events, timeout);
+               ret = io_getevents(td->aio_ctx, min_evts, td->cur_depth, td->aio_events, timeout);
                if (ret < 0) {
                        td->error = errno;
                        break;
                } else if (!ret)
                        continue;
 
-               gettimeofday(&e, NULL);
-
-               for (i = 0; i < ret; i++) {
-                       struct io_event *ev = td->aio_events + i;
-
-                       td->io_blocks++;
-
-                       iocb = ev->obj;
-
-                       msec = msec_now(&e) - iocb_time(iocb);
-                       add_stat_sample(td, msec);
-
-                       if (msec < td->min_latency)
-                               td->min_latency = msec;
-                       if (msec > td->max_latency)
-                               td->max_latency = msec;
-
-                       aio_put_iocb(td, iocb);
-               }
+               ios_completed(td, ret);
 
                /*
                 * the rate is batched for now, it should work for batches
                 * of completions except the very first one which may look
                 * a little bursty
                 */
+               gettimeofday(&e, NULL);
                usec = utime_since(&s, &e);
 
                rate_throttle(td, usec);
@@ -628,52 +685,17 @@ static void do_async_io(struct thread_data *td)
                if (runtime_exceeded(td, &e))
                        break;
        }
-}
 
-static void cleanup_pending_aio(struct thread_data *td)
-{
-       struct timespec ts = { .tv_sec = 0, .tv_nsec = 0};
-       unsigned int i;
-       int r;
-
-       /*
-        * get immediately available events, if any
-        */
-       r = io_getevents(td->aio_ctx, 0, td->aio_cur_depth, td->aio_events, &ts);
-       if (r > 0) {
-               for (i = 0; i < r; i++)
-                       aio_put_iocb(td, &td->aio_iocbs[i]);
-       }
-
-       /*
-        * now cancel remaining active events
-        */
-       for (i = 0; i < td->aio_depth; i++) {
-               if (td->aio_iocbs_status[i] == 0)
-                       continue;
-
-               r = io_cancel(td->aio_ctx, &td->aio_iocbs[i], td->aio_events);
-               if (!r)
-                       aio_put_iocb(td, &td->aio_iocbs[i]);
-       }
-
-       if (td->aio_cur_depth)
-               io_getevents(td->aio_ctx, td->aio_cur_depth, td->aio_cur_depth, td->aio_events, NULL);
+       if (td->cur_depth)
+               cleanup_pending_aio(td);
 }
 
 static void cleanup_aio(struct thread_data *td)
 {
-       if (td->aio_cur_depth)
-               cleanup_pending_aio(td);
-
        io_destroy(td->aio_ctx);
 
-       if (td->aio_iocbs)
-               free(td->aio_iocbs);
        if (td->aio_events)
                free(td->aio_events);
-       if (td->aio_iocbs_status)
-               free(td->aio_iocbs_status);
 }
 
 static int init_aio(struct thread_data *td)
@@ -683,12 +705,50 @@ static int init_aio(struct thread_data *td)
                return 1;
        }
 
-       td->aio_iocbs = malloc(td->aio_depth * sizeof(struct iocb));
        td->aio_events = malloc(td->aio_depth * sizeof(struct io_event));
-       td->aio_iocbs_status = malloc(td->aio_depth * sizeof(char));
        return 0;
 }
 
+static void cleanup_io_u(struct thread_data *td)
+{
+       struct list_head *entry, *n;
+       struct io_u *io_u;
+
+       list_for_each_safe(entry, n, &td->io_u_freelist) {
+               io_u = list_entry(entry, struct io_u, list);
+
+               list_del(&io_u->list);
+               free(io_u->mem);
+               free(io_u);
+       }
+}
+
+static void init_io_u(struct thread_data *td)
+{
+       struct io_u *io_u;
+       int i, max_units;
+
+       if (!td->use_aio)
+               max_units = 1;
+       else
+               max_units = td->aio_depth;
+
+       INIT_LIST_HEAD(&td->io_u_freelist);
+       INIT_LIST_HEAD(&td->io_u_busylist);
+
+       for (i = 0; i < max_units; i++) {
+               io_u = malloc(sizeof(*io_u));
+               memset(io_u, 0, sizeof(*io_u));
+               INIT_LIST_HEAD(&io_u->list);
+
+               io_u->mem = malloc(td->bs + MASK);
+               io_u->buf = ALIGN(io_u->mem);
+               io_u->buflen = td->bs;
+
+               list_add(&io_u->list, &td->io_u_freelist);
+       }
+}
+
 static int create_file(struct thread_data *td)
 {
        unsigned int i;
@@ -814,8 +874,8 @@ static int setup_file(struct thread_data *td)
 static void *thread_main(int shm_id, int offset, char *argv[])
 {
        struct thread_data *td;
-       void *data, *ptr = NULL;
        int ret = 1;
+       void *data;
 
        setsid();
 
@@ -823,6 +883,8 @@ static void *thread_main(int shm_id, int offset, char *argv[])
        td = data + offset * sizeof(struct thread_data);
        td->pid = getpid();
 
+       init_io_u(td);
+
        if (sched_setaffinity(td->pid, sizeof(td->cpumask), &td->cpumask) == -1) {
                td->error = errno;
                goto err;
@@ -848,13 +910,6 @@ static void *thread_main(int shm_id, int offset, char *argv[])
        if (setup_file(td))
                goto err;
 
-       if (td->use_aio)
-               ptr = malloc(td->bs * td->aio_depth + MASK);
-       else
-               ptr = malloc(td->bs + MASK);
-
-       td->buf = ALIGN(ptr);
-
        sem_post(&startup_sem);
        sem_wait(&td->mutex);
 
@@ -879,12 +934,11 @@ err:
                close(td->fd);
                td->fd = -1;
        }
+       cleanup_io_u(td);
        if (ret) {
                sem_post(&startup_sem);
                sem_wait(&td->mutex);
        }
-       if (ptr)
-               free(ptr);
        td->runstate = TD_EXITED;
        shmdt(data);
        return NULL;