#include <time.h>
#include <ctype.h>
#include <sched.h>
+#include <libaio.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
unsigned int delay_sleep;
unsigned int fsync_blocks;
unsigned int start_delay;
+ unsigned int use_aio;
cpu_set_t cpumask;
+ io_context_t *aio_ctx;
+ struct iocb *aio_iocbs;
+ unsigned int aio_depth;
+ unsigned int aio_cur_depth;
+ struct io_event *aio_events;
+ char *aio_iocbs_status;
+
unsigned int rate;
unsigned int ratemin;
unsigned int ratecycle;
#define should_fsync(td) ((td)->ddir == DDIR_WRITE && !(td)->odirect)
-static void do_thread_io(struct thread_data *td)
+static void do_sync_io(struct thread_data *td)
{
struct timeval s, e;
char *buffer, *ptr;
free(ptr);
}
-
+
+static void aio_put_iocb(struct thread_data *td, struct iocb *iocb)
+{
+ long offset = ((long) iocb - (long) td->aio_iocbs)/ sizeof(struct iocb);
+
+ td->aio_iocbs_status[offset] = 0;
+}
+
+static struct iocb *aio_get_iocb(struct thread_data *td, char *buffer)
+{
+ struct iocb *iocb = NULL;
+ int i;
+
+ for (i = 0; i < td->aio_depth; i++) {
+ if (td->aio_iocbs_status[i] == 0) {
+ td->aio_iocbs_status[i] = 1;
+ iocb = &td->aio_iocbs[i];
+ break;
+ }
+ }
+
+ if (iocb) {
+ off_t off = get_next_offset(td);
+ char *p = buffer + i * td->bs;
+
+ if (td->ddir == DDIR_READ)
+ io_prep_pread(iocb, td->fd, p, td->bs, off);
+ else
+ io_prep_pwrite(iocb, td->fd, p, td->bs, off);
+ }
+
+ return iocb;
+}
+
+static void do_async_io(struct thread_data *td)
+{
+ struct timeval s, e;
+ char *buf, *ptr;
+ unsigned long blocks, msec, usec;
+ int max_depth = 0;
+
+ ptr = malloc(td->bs * td->aio_depth + MASK);
+ buf = ALIGN(ptr);
+
+ gettimeofday(&td->start, NULL);
+
+ if (td->ratemin)
+ memcpy(&td->lastrate, &td->start, sizeof(td->start));
+
+ for (blocks = 0; blocks < td->blocks; blocks++) {
+ struct timespec ts = { .tv_sec = 0, .tv_nsec = 0};
+ struct timespec *timeout;
+ struct iocb *iocb = aio_get_iocb(td, buf);
+ int ret, i, min_evts = 0;
+
+ if (td->terminate)
+ break;
+
+ if (td->delay_sleep)
+ usec_sleep(td->delay_sleep);
+
+ gettimeofday(&s, NULL);
+
+ ret = io_submit(*td->aio_ctx, 1, &iocb);
+ if (ret < 0) {
+ td->error = errno;
+ break;
+ }
+
+ td->aio_cur_depth++;
+ if (td->aio_cur_depth > max_depth) {
+ max_depth = td->aio_cur_depth;
+ printf("max now %d\n", max_depth);
+ }
+
+ if (td->aio_cur_depth < td->aio_depth) {
+ timeout = &ts;
+ min_evts = 0;
+ } else {
+ timeout = NULL;
+ min_evts = 1;
+ }
+
+ ret = io_getevents(*td->aio_ctx, min_evts, td->aio_cur_depth, td->aio_events, timeout);
+ if (ret < 0) {
+ td->error = errno;
+ break;
+ } else if (!ret)
+ continue;
+
+ for (i = 0; i < ret; i++) {
+ struct io_event *ev = td->aio_events + i;
+
+ td->io_blocks++;
+ td->aio_cur_depth--;
+
+ iocb = ev->obj;
+ aio_put_iocb(td, iocb);
+ }
+
+ gettimeofday(&e, NULL);
+
+ usec = utime_since(&s, &e);
+
+ rate_throttle(td, usec);
+
+ if (check_min_rate(td, &e)) {
+ td->error = ENODATA;
+ break;
+ }
+
+ msec = usec / 1000;
+ add_stat_sample(td, msec);
+
+ if (msec < td->min_latency)
+ td->min_latency = msec;
+ if (msec > td->max_latency)
+ td->max_latency = msec;
+ }
+
+ gettimeofday(&e, NULL);
+ td->runtime = mtime_since(&td->start, &e);
+
+ free(ptr);
+}
+
+static void cleanup_aio(struct thread_data *td)
+{
+ /*
+ * flush pending events
+ */
+ if (td->aio_cur_depth)
+ io_getevents(*td->aio_ctx, td->aio_cur_depth, td->aio_cur_depth, td->aio_events, NULL);
+
+ if (td->aio_ctx) {
+ io_destroy(*td->aio_ctx);
+ free(td->aio_ctx);
+ }
+ if (td->aio_iocbs)
+ free(td->aio_iocbs);
+ if (td->aio_events)
+ free(td->aio_events);
+ if (td->aio_iocbs_status)
+ free(td->aio_iocbs_status);
+}
+
+static int init_aio(struct thread_data *td)
+{
+ td->aio_ctx = malloc(sizeof(*td->aio_ctx));
+
+ if (io_queue_init(td->aio_depth, td->aio_ctx)) {
+ td->error = errno;
+ return 1;
+ }
+
+ td->aio_iocbs = malloc(td->aio_depth * sizeof(struct iocb));
+ td->aio_events = malloc(td->aio_depth * sizeof(struct io_event));
+ td->aio_iocbs_status = malloc(td->aio_depth * sizeof(char));
+ return 0;
+}
+
static void *thread_main(int shm_id, int offset, char *argv[])
{
struct thread_data *td;
goto err;
}
- printf("Thread (%s) (pid=%u) (f=%s) started\n", td->ddir == DDIR_READ ? "read" : "write", td->pid, td->file_name);
+ printf("Thread (%s) (pid=%u) (f=%s) (aio=%d) started\n", td->ddir == DDIR_READ ? "read" : "write", td->pid, td->file_name, td->use_aio);
fflush(stdout);
sprintf(argv[0], "fio%d", offset);
goto err;
}
+ if (td->use_aio && init_aio(td))
+ goto err;
+
if (init_random_state(td))
goto out;
if (init_stat_file(td))
sem_post(&startup_sem);
sem_wait(&td->mutex);
- do_thread_io(td);
+
+ if (!td->use_aio)
+ do_sync_io(td);
+ else
+ do_async_io(td);
+
ret = 0;
out:
err:
if (td->fd != -1)
close(td->fd);
+ if (td->use_aio)
+ cleanup_aio(td);
if (ret)
sem_post(&startup_sem);
td->ratecycle = DEF_RATE_CYCLE;
td->sequential = sequential;
td->ioprio = 0;
+ td->use_aio = 0;
+ td->aio_depth = 0;
+ td->aio_cur_depth = 0;
memcpy(&td->cpumask, &def_cpumask, sizeof(td->cpumask));
return td;
td->min_latency = 10000000;
td->ioprio = (prioclass << IOPRIO_CLASS_SHIFT) | prio;
+ if (td->use_aio && !td->aio_depth)
+ td->aio_depth = 1;
+
if (setup_rate(td))
return -1;
- printf("Client%d: file=%s, rw=%d, prio=%d, seq=%d, odir=%d, bs=%d, rate=%d\n", td->thread_number, filename, td->ddir, td->ioprio, td->sequential, td->odirect, td->bs, td->rate);
+ printf("Client%d: file=%s, rw=%d, prio=%d, seq=%d, odir=%d, bs=%d, rate=%d, aio=%d, aio_depth=%d\n", td->thread_number, filename, td->ddir, td->ioprio, td->sequential, td->odirect, td->bs, td->rate, td->use_aio, td->aio_depth);
return 0;
}
td->start_delay = strtoul(string, NULL, 10);
}
+ c = strstr(p, "aio_depth=");
+ if (c) {
+ c += 10;
+ fill_option(c, string);
+ td->aio_depth = strtoul(string, NULL, 10);
+ }
+
+ c = strstr(p, "aio");
+ if (c)
+ td->use_aio = 1;
+
c = strstr(p, "random");
if (c)
td->sequential = 0;
fgetpos(f, &off);
continue;
}
- if (!strcmp(p, "sequential")) {
+ if (!check_int(p, "aio_depth", &td->aio_depth)) {
+ fgetpos(f, &off);
+ continue;
+ }
+ if (!strncmp(p, "sequential", 10)) {
td->sequential = 1;
fgetpos(f, &off);
continue;
}
- if (!strcmp(p, "random")) {
+ if (!strncmp(p, "random", 6)) {
td->sequential = 0;
fgetpos(f, &off);
continue;
}
+ if (!strncmp(p, "aio", 3)) {
+ td->use_aio = 1;
+ fgetpos(f, &off);
+ continue;
+ }
+
printf("Client%d: bad option %s\n",td->thread_number,p);
}
fsetpos(f, &off);