#include <sys/shm.h>
#include <asm/unistd.h>
+#include "list.h"
+
#define MAX_JOBS (1024)
/*
TD_REAPED,
};
+/*
+ * The io unit: one in-flight unit of io, carrying its own iocb (for
+ * aio), issue timestamp, aligned data buffer, and the list linkage
+ * used to move it between the free and busy lists.
+ */
+struct io_u {
+ struct iocb iocb;
+ struct timeval issue_time;
+
+ void *mem;
+ char *buf;
+ unsigned int buflen;
+ off_t offset;
+
+ struct list_head list;
+};
+
#define td_read(td) ((td)->ddir == DDIR_READ)
#define should_fsync(td) (!td_read(td) && !(td)->odirect)
int error;
int fd;
pid_t pid;
- char *buf;
volatile int terminate;
volatile int runstate;
unsigned int ddir;
cpu_set_t cpumask;
io_context_t aio_ctx;
- struct iocb *aio_iocbs;
unsigned int aio_depth;
- unsigned int aio_cur_depth;
struct io_event *aio_events;
- char *aio_iocbs_status;
+
+ unsigned int cur_depth;
+ struct list_head io_u_freelist;
+ struct list_head io_u_busylist;
unsigned int rate;
unsigned int ratemin;
return 0;
}
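+/*
+ * io unit is done: move it from the busy list back to the free list
+ * and drop the current queue depth
+ */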
+static void put_io_u(struct thread_data *td, struct io_u *io_u)
+{
+ list_del(&io_u->list);
+ list_add(&io_u->list, &td->io_u_freelist);
+ td->cur_depth--;
+}
+
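+/*
+ * grab a free io unit, pick the next offset, and for aio prep the
+ * embedded iocb. returns NULL if no unit is free.
+ */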
+static struct io_u *get_io_u(struct thread_data *td)
+{
+ struct io_u *io_u;
+
+ if (list_empty(&td->io_u_freelist))
+ return NULL;
+
+ io_u = list_entry(td->io_u_freelist.next, struct io_u, list);
+ list_del(&io_u->list);
+ list_add(&io_u->list, &td->io_u_busylist);
+
+ io_u->offset = get_next_offset(td);
+
+ if (td->use_aio) {
+ if (td_read(td))
+ io_prep_pread(&io_u->iocb, td->fd, io_u->buf, io_u->buflen, io_u->offset);
+ else
+ io_prep_pwrite(&io_u->iocb, td->fd, io_u->buf, io_u->buflen, io_u->offset);
+ }
+
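+ /* stamp the issue time, completion latency is measured against it */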
+ gettimeofday(&io_u->issue_time, NULL);
+ td->cur_depth++;
+ return io_u;
+}
+
static void do_sync_io(struct thread_data *td)
{
- struct timeval s, e;
unsigned long blocks, msec, usec;
+ struct timeval e;
for (blocks = 0; blocks < td->blocks; blocks++) {
- off_t offset = get_next_offset(td);
+ struct io_u *io_u;
int ret;
if (td->terminate)
break;
- if (lseek(td->fd, offset, SEEK_SET) == -1) {
+ io_u = get_io_u(td);
+
+ if (lseek(td->fd, io_u->offset, SEEK_SET) == -1) {
td->error = errno;
+ put_io_u(td, io_u);
break;
}
if (td->delay_sleep)
usec_sleep(td->delay_sleep);
- gettimeofday(&s, NULL);
-
if (td_read(td))
- ret = read(td->fd, td->buf, td->bs);
+ ret = read(td->fd, io_u->buf, io_u->buflen);
else
- ret = write(td->fd, td->buf, td->bs);
+ ret = write(td->fd, io_u->buf, io_u->buflen);
- if (ret < (int) td->bs) {
+ if (ret < (int) io_u->buflen) {
if (ret == -1)
td->error = errno;
+ put_io_u(td, io_u);
break;
gettimeofday(&e, NULL);
- usec = utime_since(&s, &e);
+ usec = utime_since(&io_u->issue_time, &e);
rate_throttle(td, usec);
if (runtime_exceeded(td, &e))
break;
+
+ put_io_u(td, io_u);
}
if (should_fsync(td))
fsync(td->fd);
}
-static void aio_put_iocb(struct thread_data *td, struct iocb *iocb)
-{
- long offset = ((long) iocb - (long) td->aio_iocbs)/ sizeof(struct iocb);
-
- td->aio_iocbs_status[offset] = 0;
- td->aio_cur_depth--;
-}
-
-static struct iocb *aio_get_iocb(struct thread_data *td, struct timeval *t)
-{
- struct iocb *iocb = NULL;
- unsigned int i;
-
- for (i = 0; i < td->aio_depth; i++) {
- if (td->aio_iocbs_status[i] == 0) {
- td->aio_iocbs_status[i] = 1;
- iocb = &td->aio_iocbs[i];
- break;
- }
- }
-
- if (iocb) {
- off_t off = get_next_offset(td);
- char *p = td->buf + i * td->bs;
-
- if (td_read(td))
- io_prep_pread(iocb, td->fd, p, td->bs, off);
- else
- io_prep_pwrite(iocb, td->fd, p, td->bs, off);
-
- io_set_callback(iocb, (io_callback_t) msec_now(t));
- }
-
- return iocb;
-}
-
-static int aio_submit(struct thread_data *td, struct iocb *iocb)
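+/*
+ * queue an io unit for submission, its iocb was already prepped by
+ * get_io_u()
+ */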
+static int io_u_queue(struct thread_data *td, struct io_u *io_u)
{
+ struct iocb *iocb = &io_u->iocb;
int ret;
do {
}
#define iocb_time(iocb) ((unsigned long) (iocb)->data)
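+/*
+ * the iocb is the first member of struct io_u, so the event obj
+ * pointer can be cast straight back to the owning io unit
+ */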
+#define ev_to_iou(ev) ((struct io_u *) ((unsigned long) (ev)->obj))
+
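+/*
+ * walk the completed events: bump the block count, record latency
+ * stats, and return each io unit to the free list
+ */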
+static void ios_completed(struct thread_data *td, int nr)
+{
+ unsigned long msec;
+ struct io_u *io_u;
+ struct timeval e;
+ int i;
+
+ gettimeofday(&e, NULL);
+
+ for (i = 0; i < nr; i++) {
+ td->io_blocks++;
+
+ io_u = ev_to_iou(td->aio_events + i);
+
+ msec = mtime_since(&io_u->issue_time, &e);
+
+ add_stat_sample(td, msec);
+
+ if (msec < td->min_latency)
+ td->min_latency = msec;
+ if (msec > td->max_latency)
+ td->max_latency = msec;
+
+ put_io_u(td, io_u);
+ }
+}
+
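+/*
+ * reap or cancel everything still in flight, so the unit lists are
+ * clean before teardown
+ */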
+static void cleanup_pending_aio(struct thread_data *td)
+{
+ struct timespec ts = { .tv_sec = 0, .tv_nsec = 0};
+ struct list_head *entry, *n;
+ struct io_u *io_u;
+ int r;
+
+ /*
+ * get immediately available events, if any
+ */
+ r = io_getevents(td->aio_ctx, 0, td->cur_depth, td->aio_events, &ts);
+ if (r > 0)
+ ios_completed(td, r);
+
+ /*
+ * now cancel remaining active events
+ */
+ list_for_each_safe(entry, n, &td->io_u_busylist) {
+ io_u = list_entry(entry, struct io_u, list);
+
+ r = io_cancel(td->aio_ctx, &io_u->iocb, td->aio_events);
+ if (!r)
+ put_io_u(td, io_u);
+ }
+
+ if (td->cur_depth) {
+ r = io_getevents(td->aio_ctx, td->cur_depth, td->cur_depth, td->aio_events, NULL);
+ if (r > 0)
+ ios_completed(td, r);
+ }
+}
static void do_async_io(struct thread_data *td)
{
struct timeval s, e;
- unsigned long blocks, msec, usec;
+ unsigned long blocks, usec;
for (blocks = 0; blocks < td->blocks; blocks++) {
struct timespec ts = { .tv_sec = 0, .tv_nsec = 0};
struct timespec *timeout;
- int ret, i, min_evts = 0;
- struct iocb *iocb;
+ int ret, min_evts = 0;
+ struct io_u *io_u;
if (td->terminate)
break;
if (td->delay_sleep)
usec_sleep(td->delay_sleep);
- gettimeofday(&s, NULL);
+ io_u = get_io_u(td);
- iocb = aio_get_iocb(td, &s);
+ memcpy(&s, &io_u->issue_time, sizeof(s));
- ret = aio_submit(td, iocb);
+ ret = io_u_queue(td, io_u);
if (ret) {
td->error = errno;
break;
}
- td->aio_cur_depth++;
-
- if (td->aio_cur_depth < td->aio_depth) {
+ if (td->cur_depth < td->aio_depth) {
timeout = &ts;
min_evts = 0;
} else {
timeout = NULL;
min_evts = 1;
}
- ret = io_getevents(td->aio_ctx, min_evts, td->aio_cur_depth, td->aio_events, timeout);
+ ret = io_getevents(td->aio_ctx, min_evts, td->cur_depth, td->aio_events, timeout);
if (ret < 0) {
td->error = errno;
break;
} else if (!ret)
continue;
- gettimeofday(&e, NULL);
-
- for (i = 0; i < ret; i++) {
- struct io_event *ev = td->aio_events + i;
-
- td->io_blocks++;
-
- iocb = ev->obj;
-
- msec = msec_now(&e) - iocb_time(iocb);
- add_stat_sample(td, msec);
-
- if (msec < td->min_latency)
- td->min_latency = msec;
- if (msec > td->max_latency)
- td->max_latency = msec;
-
- aio_put_iocb(td, iocb);
- }
+ ios_completed(td, ret);
/*
* the rate is batched for now, it should work for batches
* of completions except the very first one which may look
* a little bursty
*/
+ gettimeofday(&e, NULL);
usec = utime_since(&s, &e);
rate_throttle(td, usec);
if (runtime_exceeded(td, &e))
break;
}
-}
-static void cleanup_pending_aio(struct thread_data *td)
-{
- struct timespec ts = { .tv_sec = 0, .tv_nsec = 0};
- unsigned int i;
- int r;
-
- /*
- * get immediately available events, if any
- */
- r = io_getevents(td->aio_ctx, 0, td->aio_cur_depth, td->aio_events, &ts);
- if (r > 0) {
- for (i = 0; i < r; i++)
- aio_put_iocb(td, &td->aio_iocbs[i]);
- }
-
- /*
- * now cancel remaining active events
- */
- for (i = 0; i < td->aio_depth; i++) {
- if (td->aio_iocbs_status[i] == 0)
- continue;
-
- r = io_cancel(td->aio_ctx, &td->aio_iocbs[i], td->aio_events);
- if (!r)
- aio_put_iocb(td, &td->aio_iocbs[i]);
- }
-
- if (td->aio_cur_depth)
- io_getevents(td->aio_ctx, td->aio_cur_depth, td->aio_cur_depth, td->aio_events, NULL);
+ if (td->cur_depth)
+ cleanup_pending_aio(td);
}
static void cleanup_aio(struct thread_data *td)
{
- if (td->aio_cur_depth)
- cleanup_pending_aio(td);
-
io_destroy(td->aio_ctx);
- if (td->aio_iocbs)
- free(td->aio_iocbs);
if (td->aio_events)
free(td->aio_events);
- if (td->aio_iocbs_status)
- free(td->aio_iocbs_status);
}
static int init_aio(struct thread_data *td)
return 1;
}
- td->aio_iocbs = malloc(td->aio_depth * sizeof(struct iocb));
td->aio_events = malloc(td->aio_depth * sizeof(struct io_event));
- td->aio_iocbs_status = malloc(td->aio_depth * sizeof(char));
return 0;
}
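+/*
+ * free the io units and their data buffers. every unit is expected
+ * to be back on the free list by now.
+ */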
+static void cleanup_io_u(struct thread_data *td)
+{
+ struct list_head *entry, *n;
+ struct io_u *io_u;
+
+ list_for_each_safe(entry, n, &td->io_u_freelist) {
+ io_u = list_entry(entry, struct io_u, list);
+
+ list_del(&io_u->list);
+ free(io_u->mem);
+ free(io_u);
+ }
+}
+
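+/*
+ * set up the io units: a single unit for sync io, aio_depth units
+ * for aio, each with its own aligned data buffer
+ */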
+static void init_io_u(struct thread_data *td)
+{
+ struct io_u *io_u;
+ int i, max_units;
+
+ if (!td->use_aio)
+ max_units = 1;
+ else
+ max_units = td->aio_depth;
+
+ INIT_LIST_HEAD(&td->io_u_freelist);
+ INIT_LIST_HEAD(&td->io_u_busylist);
+
+ for (i = 0; i < max_units; i++) {
+ io_u = malloc(sizeof(*io_u));
+ memset(io_u, 0, sizeof(*io_u));
+ INIT_LIST_HEAD(&io_u->list);
+
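+ /*
+ * over-allocate by MASK so ALIGN() can round the buffer up to a
+ * suitably aligned address (needed for O_DIRECT)
+ */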
+ io_u->mem = malloc(td->bs + MASK);
+ io_u->buf = ALIGN(io_u->mem);
+ io_u->buflen = td->bs;
+
+ list_add(&io_u->list, &td->io_u_freelist);
+ }
+}
+
static int create_file(struct thread_data *td)
{
unsigned int i;
static void *thread_main(int shm_id, int offset, char *argv[])
{
struct thread_data *td;
- void *data, *ptr = NULL;
int ret = 1;
+ void *data;
setsid();
td = data + offset * sizeof(struct thread_data);
td->pid = getpid();
+ init_io_u(td);
+
if (sched_setaffinity(td->pid, sizeof(td->cpumask), &td->cpumask) == -1) {
td->error = errno;
goto err;
if (setup_file(td))
goto err;
- if (td->use_aio)
- ptr = malloc(td->bs * td->aio_depth + MASK);
- else
- ptr = malloc(td->bs + MASK);
-
- td->buf = ALIGN(ptr);
-
sem_post(&startup_sem);
sem_wait(&td->mutex);
close(td->fd);
td->fd = -1;
}
+ cleanup_io_u(td);
if (ret) {
sem_post(&startup_sem);
sem_wait(&td->mutex);
}
- if (ptr)
- free(ptr);
td->runstate = TD_EXITED;
shmdt(data);
return NULL;