From: Jens Axboe
Date: Tue, 25 Oct 2005 09:05:50 +0000 (+0200)
Subject: [PATCH] fio: create an abstract io unit
X-Git-Url: https://git.kernel.dk/?a=commitdiff_plain;h=2c83567e7de39e9ae28c9b7b02ce6d9107712bae;p=disktools.git

[PATCH] fio: create an abstract io unit
---

diff --git a/fio.c b/fio.c
index e33a7b1..aa9c03b 100644
--- a/fio.c
+++ b/fio.c
@@ -40,6 +40,8 @@
 #include 
 #include 
 
+#include "list.h"
+
 #define MAX_JOBS (1024)
 
 /*
@@ -152,6 +154,21 @@ enum {
 	TD_REAPED,
 };
 
+/*
+ * The io unit
+ */
+struct io_u {
+	struct iocb iocb;
+	struct timeval issue_time;
+
+	void *mem;
+	char *buf;
+	unsigned int buflen;
+	off_t offset;
+
+	struct list_head list;
+};
+
 #define td_read(td) ((td)->ddir == DDIR_READ)
 #define should_fsync(td) (!td_read(td) && !(td)->odirect)
 
@@ -161,7 +178,6 @@ struct thread_data {
 	int error;
 	int fd;
 	pid_t pid;
-	char *buf;
 	volatile int terminate;
 	volatile int runstate;
 	unsigned int ddir;
@@ -182,11 +198,12 @@ struct thread_data {
 	cpu_set_t cpumask;
 
 	io_context_t aio_ctx;
-	struct iocb *aio_iocbs;
 	unsigned int aio_depth;
-	unsigned int aio_cur_depth;
 	struct io_event *aio_events;
-	char *aio_iocbs_status;
+
+	unsigned int cur_depth;
+	struct list_head io_u_freelist;
+	struct list_head io_u_busylist;
 
 	unsigned int rate;
 	unsigned int ratemin;
@@ -423,19 +440,53 @@ static inline int runtime_exceeded(struct thread_data *td, struct timeval *t)
 	return 0;
 }
 
+static void put_io_u(struct thread_data *td, struct io_u *io_u)
+{
+	list_del(&io_u->list);
+	list_add(&io_u->list, &td->io_u_freelist);
+	td->cur_depth--;
+}
+
+static struct io_u *get_io_u(struct thread_data *td)
+{
+	struct io_u *io_u;
+
+	if (list_empty(&td->io_u_freelist))
+		return NULL;
+
+	io_u = list_entry(td->io_u_freelist.next, struct io_u, list);
+	list_del(&io_u->list);
+	list_add(&io_u->list, &td->io_u_busylist);
+
+	io_u->offset = get_next_offset(td);
+
+	if (td->use_aio) {
+		if (td_read(td))
+			io_prep_pread(&io_u->iocb, td->fd, io_u->buf, io_u->buflen, io_u->offset);
+		else
+			io_prep_pwrite(&io_u->iocb, td->fd, io_u->buf, io_u->buflen, io_u->offset);
+	}
+
+	gettimeofday(&io_u->issue_time, NULL);
+	td->cur_depth++;
+	return io_u;
+}
+
 static void do_sync_io(struct thread_data *td)
 {
-	struct timeval s, e;
 	unsigned long blocks, msec, usec;
+	struct timeval e;
 
 	for (blocks = 0; blocks < td->blocks; blocks++) {
-		off_t offset = get_next_offset(td);
+		struct io_u *io_u;
 		int ret;
 
 		if (td->terminate)
 			break;
 
-		if (lseek(td->fd, offset, SEEK_SET) == -1) {
+		io_u = get_io_u(td);
+
+		if (lseek(td->fd, io_u->offset, SEEK_SET) == -1) {
 			td->error = errno;
 			break;
 		}
@@ -443,14 +494,12 @@ static void do_sync_io(struct thread_data *td)
 		if (td->delay_sleep)
 			usec_sleep(td->delay_sleep);
 
-		gettimeofday(&s, NULL);
-
 		if (td_read(td))
-			ret = read(td->fd, td->buf, td->bs);
+			ret = read(td->fd, io_u->buf, io_u->buflen);
 		else
-			ret = write(td->fd, td->buf, td->bs);
+			ret = write(td->fd, io_u->buf, io_u->buflen);
 
-		if (ret < (int) td->bs) {
+		if (ret < (int) io_u->buflen) {
 			if (ret == -1)
 				td->error = errno;
 			break;
@@ -464,7 +513,7 @@ static void do_sync_io(struct thread_data *td)
 
 		gettimeofday(&e, NULL);
 
-		usec = utime_since(&s, &e);
+		usec = utime_since(&io_u->issue_time, &e);
 
 		rate_throttle(td, usec);
 
@@ -483,50 +532,17 @@ static void do_sync_io(struct thread_data *td)
 
 		if (runtime_exceeded(td, &e))
 			break;
+
+		put_io_u(td, io_u);
 	}
 
 	if (should_fsync(td))
 		fsync(td->fd);
 }
 
-static void aio_put_iocb(struct thread_data *td, struct iocb *iocb)
-{
-	long offset = ((long) iocb - (long) td->aio_iocbs)/ sizeof(struct iocb);
-
-	td->aio_iocbs_status[offset] = 0;
-	td->aio_cur_depth--;
-}
-
-static struct iocb *aio_get_iocb(struct thread_data *td, struct timeval *t)
-{
-	struct iocb *iocb = NULL;
-	unsigned int i;
-
-	for (i = 0; i < td->aio_depth; i++) {
-		if (td->aio_iocbs_status[i] == 0) {
-			td->aio_iocbs_status[i] = 1;
-			iocb = &td->aio_iocbs[i];
-			break;
-		}
-	}
-
-	if (iocb) {
-		off_t off = get_next_offset(td);
-		char *p = td->buf + i * td->bs;
-
-		if (td_read(td))
-			io_prep_pread(iocb, td->fd, p, td->bs, off);
-		else
-			io_prep_pwrite(iocb, td->fd, p, td->bs, off);
-
-		io_set_callback(iocb, (io_callback_t) msec_now(t));
-	}
-
-	return iocb;
-}
-
-static int aio_submit(struct thread_data *td, struct iocb *iocb)
+static int io_u_queue(struct thread_data *td, struct io_u *io_u)
 {
+	struct iocb *iocb = &io_u->iocb;
 	int ret;
 
 	do {
@@ -546,17 +562,77 @@ static int aio_submit(struct thread_data *td, struct iocb *iocb)
 }
 
 #define iocb_time(iocb) ((unsigned long) (iocb)->data)
+#define ev_to_iou(ev) (struct io_u *) ((unsigned long) (ev)->obj)
+
+static void ios_completed(struct thread_data *td, int nr)
+{
+	unsigned long msec;
+	struct io_u *io_u;
+	struct timeval e;
+	int i;
+
+	gettimeofday(&e, NULL);
+
+	for (i = 0; i < nr; i++) {
+		td->io_blocks++;
+
+		io_u = ev_to_iou(td->aio_events + i);
+
+		msec = mtime_since(&io_u->issue_time, &e);
+
+		add_stat_sample(td, msec);
+
+		if (msec < td->min_latency)
+			td->min_latency = msec;
+		if (msec > td->max_latency)
+			td->max_latency = msec;
+
+		put_io_u(td, io_u);
+	}
+}
+
+static void cleanup_pending_aio(struct thread_data *td)
+{
+	struct timespec ts = { .tv_sec = 0, .tv_nsec = 0};
+	struct list_head *entry, *n;
+	struct io_u *io_u;
+	int r;
+
+	/*
+	 * get immediately available events, if any
+	 */
+	r = io_getevents(td->aio_ctx, 0, td->cur_depth, td->aio_events, &ts);
+	if (r > 0)
+		ios_completed(td, r);
+
+	/*
+	 * now cancel remaining active events
+	 */
+	list_for_each_safe(entry, n, &td->io_u_busylist) {
+		io_u = list_entry(entry, struct io_u, list);
+
+		r = io_cancel(td->aio_ctx, &io_u->iocb, td->aio_events);
+		if (!r)
+			put_io_u(td, io_u);
+	}
+
+	if (td->cur_depth) {
+		r = io_getevents(td->aio_ctx, td->cur_depth, td->cur_depth, td->aio_events, NULL);
+		if (r > 0)
+			ios_completed(td, r);
+	}
+}
 
 static void do_async_io(struct thread_data *td)
 {
 	struct timeval s, e;
-	unsigned long blocks, msec, usec;
+	unsigned long blocks, usec;
 
 	for (blocks = 0; blocks < td->blocks; blocks++) {
 		struct timespec ts = { .tv_sec = 0, .tv_nsec = 0};
 		struct timespec *timeout;
-		int ret, i, min_evts = 0;
-		struct iocb *iocb;
+		int ret, min_evts = 0;
+		struct io_u *io_u;
 
 		if (td->terminate)
 			break;
@@ -564,19 +640,17 @@ static void do_async_io(struct thread_data *td)
 		if (td->delay_sleep)
 			usec_sleep(td->delay_sleep);
 
-		gettimeofday(&s, NULL);
+		io_u = get_io_u(td);
 
-		iocb = aio_get_iocb(td, &s);
+		memcpy(&s, &io_u->issue_time, sizeof(s));
 
-		ret = aio_submit(td, iocb);
+		ret = io_u_queue(td, io_u);
 		if (ret) {
 			td->error = errno;
 			break;
 		}
 
-		td->aio_cur_depth++;
-
-		if (td->aio_cur_depth < td->aio_depth) {
+		if (td->cur_depth < td->aio_depth) {
 			timeout = &ts;
 			min_evts = 0;
 		} else {
@@ -584,38 +658,21 @@ static void do_async_io(struct thread_data *td)
 			min_evts = 1;
 		}
 
-		ret = io_getevents(td->aio_ctx, min_evts, td->aio_cur_depth, td->aio_events, timeout);
+		ret = io_getevents(td->aio_ctx, min_evts, td->cur_depth, td->aio_events, timeout);
 		if (ret < 0) {
 			td->error = errno;
 			break;
 		} else if (!ret)
 			continue;
 
-		gettimeofday(&e, NULL);
-
-		for (i = 0; i < ret; i++) {
-			struct io_event *ev = td->aio_events + i;
-
-			td->io_blocks++;
-
-			iocb = ev->obj;
-
-			msec = msec_now(&e) - iocb_time(iocb);
-			add_stat_sample(td, msec);
-
-			if (msec < td->min_latency)
-				td->min_latency = msec;
-			if (msec > td->max_latency)
-				td->max_latency = msec;
-
-			aio_put_iocb(td, iocb);
-		}
+		ios_completed(td, ret);
 
 		/*
 		 * the rate is batched for now, it should work for batches
 		 * of completions except the very first one which may look
 		 * a little bursty
 		 */
+		gettimeofday(&e, NULL);
 		usec = utime_since(&s, &e);
 
 		rate_throttle(td, usec);
@@ -628,52 +685,17 @@ static void do_async_io(struct thread_data *td)
 		if (runtime_exceeded(td, &e))
 			break;
 	}
-}
 
-static void cleanup_pending_aio(struct thread_data *td)
-{
-	struct timespec ts = { .tv_sec = 0, .tv_nsec = 0};
-	unsigned int i;
-	int r;
-
-	/*
-	 * get immediately available events, if any
-	 */
-	r = io_getevents(td->aio_ctx, 0, td->aio_cur_depth, td->aio_events, &ts);
-	if (r > 0) {
-		for (i = 0; i < r; i++)
-			aio_put_iocb(td, &td->aio_iocbs[i]);
-	}
-
-	/*
-	 * now cancel remaining active events
-	 */
-	for (i = 0; i < td->aio_depth; i++) {
-		if (td->aio_iocbs_status[i] == 0)
-			continue;
-
-		r = io_cancel(td->aio_ctx, &td->aio_iocbs[i], td->aio_events);
-		if (!r)
-			aio_put_iocb(td, &td->aio_iocbs[i]);
-	}
-
-	if (td->aio_cur_depth)
-		io_getevents(td->aio_ctx, td->aio_cur_depth, td->aio_cur_depth, td->aio_events, NULL);
+	if (td->cur_depth)
+		cleanup_pending_aio(td);
 }
 
 static void cleanup_aio(struct thread_data *td)
 {
-	if (td->aio_cur_depth)
-		cleanup_pending_aio(td);
-
 	io_destroy(td->aio_ctx);
 
-	if (td->aio_iocbs)
-		free(td->aio_iocbs);
 	if (td->aio_events)
 		free(td->aio_events);
-	if (td->aio_iocbs_status)
-		free(td->aio_iocbs_status);
 }
 
 static int init_aio(struct thread_data *td)
@@ -683,12 +705,50 @@ static int init_aio(struct thread_data *td)
 		return 1;
 	}
 
-	td->aio_iocbs = malloc(td->aio_depth * sizeof(struct iocb));
 	td->aio_events = malloc(td->aio_depth * sizeof(struct io_event));
-	td->aio_iocbs_status = malloc(td->aio_depth * sizeof(char));
 	return 0;
 }
 
+static void cleanup_io_u(struct thread_data *td)
+{
+	struct list_head *entry, *n;
+	struct io_u *io_u;
+
+	list_for_each_safe(entry, n, &td->io_u_freelist) {
+		io_u = list_entry(entry, struct io_u, list);
+
+		list_del(&io_u->list);
+		free(io_u->mem);
+		free(io_u);
+	}
+}
+
+static void init_io_u(struct thread_data *td)
+{
+	struct io_u *io_u;
+	int i, max_units;
+
+	if (!td->use_aio)
+		max_units = 1;
+	else
+		max_units = td->aio_depth;
+
+	INIT_LIST_HEAD(&td->io_u_freelist);
+	INIT_LIST_HEAD(&td->io_u_busylist);
+
+	for (i = 0; i < max_units; i++) {
+		io_u = malloc(sizeof(*io_u));
+		memset(io_u, 0, sizeof(*io_u));
+		INIT_LIST_HEAD(&io_u->list);
+
+		io_u->mem = malloc(td->bs + MASK);
+		io_u->buf = ALIGN(io_u->mem);
+		io_u->buflen = td->bs;
+
+		list_add(&io_u->list, &td->io_u_freelist);
+	}
+}
+
 static int create_file(struct thread_data *td)
 {
 	unsigned int i;
@@ -814,8 +874,8 @@ static int setup_file(struct thread_data *td)
 static void *thread_main(int shm_id, int offset, char *argv[])
 {
 	struct thread_data *td;
-	void *data, *ptr = NULL;
 	int ret = 1;
+	void *data;
 
 	setsid();
 
@@ -823,6 +883,8 @@ static void *thread_main(int shm_id, int offset, char *argv[])
 	td = data + offset * sizeof(struct thread_data);
 	td->pid = getpid();
 
+	init_io_u(td);
+
 	if (sched_setaffinity(td->pid, sizeof(td->cpumask), &td->cpumask) == -1) {
 		td->error = errno;
 		goto err;
@@ -848,13 +910,6 @@ static void *thread_main(int shm_id, int offset, char *argv[])
 	if (setup_file(td))
 		goto err;
 
-	if (td->use_aio)
-		ptr = malloc(td->bs * td->aio_depth + MASK);
-	else
-		ptr = malloc(td->bs + MASK);
-
-	td->buf = ALIGN(ptr);
-
 	sem_post(&startup_sem);
 	sem_wait(&td->mutex);
 
@@ -879,12 +934,11 @@ err:
 		close(td->fd);
 		td->fd = -1;
 	}
+	cleanup_io_u(td);
 	if (ret) {
 		sem_post(&startup_sem);
 		sem_wait(&td->mutex);
 	}
-	if (ptr)
-		free(ptr);
 	td->runstate = TD_EXITED;
 	shmdt(data);
 	return NULL;
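
Not part of the patch: for readers who want to see the io unit recycling in isolation, below is a minimal, self-contained C sketch of the free/busy list idea that get_io_u()/put_io_u() and init_io_u()/cleanup_io_u() implement above. It uses a hand-rolled singly linked list and made-up constants (MAX_UNITS, BS, demo_io_u) instead of fio's list.h and thread_data fields, so treat it as an illustration of the pattern, not actual fio code.

/*
 * Illustrative sketch only: mirrors the io_u free/busy list recycling from
 * the patch above, with a tiny list stand-in instead of fio's list.h.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define MAX_UNITS	4		/* stands in for td->aio_depth */
#define BS		4096		/* stands in for td->bs */
#define MASK		(sizeof(void *) - 1)
#define ALIGN(p)	(char *) (((unsigned long) (p) + MASK) & ~MASK)

struct demo_io_u {
	void *mem;			/* raw allocation */
	char *buf;			/* aligned data buffer */
	unsigned int buflen;
	struct demo_io_u *next;		/* free or busy list linkage */
};

static struct demo_io_u *freelist, *busylist;
static unsigned int cur_depth;

/* like init_io_u(): one unit per queue slot, each with its own buffer */
static void init_io_us(void)
{
	int i;

	for (i = 0; i < MAX_UNITS; i++) {
		struct demo_io_u *io_u = calloc(1, sizeof(*io_u));

		io_u->mem = malloc(BS + MASK);
		io_u->buf = ALIGN(io_u->mem);
		io_u->buflen = BS;
		io_u->next = freelist;		/* push onto the free list */
		freelist = io_u;
	}
}

/* like get_io_u(): move one unit from the free list to the busy list */
static struct demo_io_u *get_unit(void)
{
	struct demo_io_u *io_u = freelist;

	if (!io_u)
		return NULL;			/* queue depth exhausted */
	freelist = io_u->next;
	io_u->next = busylist;
	busylist = io_u;
	cur_depth++;
	return io_u;
}

/* like put_io_u(): unlink from the busy list, hand back for reuse */
static void put_unit(struct demo_io_u *io_u)
{
	struct demo_io_u **p = &busylist;

	while (*p && *p != io_u)
		p = &(*p)->next;
	if (*p)
		*p = io_u->next;		/* unlink, like list_del() */
	io_u->next = freelist;
	freelist = io_u;
	cur_depth--;
}

int main(void)
{
	struct demo_io_u *io_u;
	int i;

	init_io_us();

	/* issue until the free list is empty, i.e. queue depth is reached */
	while ((io_u = get_unit()) != NULL) {
		memset(io_u->buf, 0xa5, io_u->buflen);	/* stands in for preparing an I/O buffer */
		printf("issued a unit, depth now %u\n", cur_depth);
	}

	/* "complete" everything, returning the units for reuse */
	for (i = 0; i < MAX_UNITS; i++) {
		put_unit(busylist);
		printf("completed a unit, depth now %u\n", cur_depth);
	}

	/* like cleanup_io_u(): free the units and their buffers */
	while ((io_u = freelist) != NULL) {
		freelist = io_u->next;
		free(io_u->mem);
		free(io_u);
	}
	return 0;
}

Even in this toy form the point of the abstraction is visible: each in-flight unit carries its own aligned buffer, offset and issue time, so the sync and async paths can share the same get/put bookkeeping instead of indexing one shared buffer slab and an iocb array with a status byte per slot.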