return 0;
}
-static struct fio_file *get_next_file(struct thread_data *td)
-{
- unsigned int old_next_file = td->next_file;
- struct fio_file *f;
-
- do {
- f = &td->files[td->next_file];
-
- td->next_file++;
- if (td->next_file >= td->nr_files)
- td->next_file = 0;
-
- if (f->fd != -1)
- break;
-
- f = NULL;
- } while (td->next_file != old_next_file);
-
- return f;
-}
-
/*
* When job exits, we can cancel the in-flight IO if we are using async
* io. Attempt to do so.
*/
static void cleanup_pending_aio(struct thread_data *td)
{
- struct timespec ts = { .tv_sec = 0, .tv_nsec = 0};
struct list_head *entry, *n;
- struct io_completion_data icd;
struct io_u *io_u;
int r;
/*
* get immediately available events, if any
*/
- r = td_io_getevents(td, 0, td->cur_depth, &ts);
- if (r > 0) {
- init_icd(&icd, NULL, r);
- ios_completed(td, &icd);
- }
+ io_u_queued_complete(td, 0, NULL);
/*
* now cancel remaining active events
}
}
- if (td->cur_depth) {
- r = td_io_getevents(td, td->cur_depth, td->cur_depth, NULL);
- if (r > 0) {
- init_icd(&icd, NULL, r);
- ios_completed(td, &icd);
- }
- }
+ if (td->cur_depth)
+ io_u_queued_complete(td, td->cur_depth, NULL);
}
/*
static int fio_io_sync(struct thread_data *td, struct fio_file *f)
{
struct io_u *io_u = __get_io_u(td);
- struct io_completion_data icd;
int ret;
if (!io_u)
return 1;
}
+requeue:
ret = td_io_queue(td, io_u);
if (ret < 0) {
td_verror(td, io_u->error);
put_io_u(td, io_u);
return 1;
} else if (ret == FIO_Q_QUEUED) {
- ret = td_io_getevents(td, 1, td->cur_depth, NULL);
- if (ret < 0) {
- td_verror(td, ret);
+ if (io_u_queued_complete(td, 1, NULL))
return 1;
- }
-
- init_icd(&icd, NULL, ret);
- ios_completed(td, &icd);
- if (icd.error) {
- td_verror(td, icd.error);
- return 1;
- }
} else if (ret == FIO_Q_COMPLETED) {
if (io_u->error) {
td_verror(td, io_u->error);
return 1;
}
- init_icd(&icd, NULL, 1);
- io_completed(td, io_u, &icd);
- put_io_u(td, io_u);
+ io_u_sync_complete(td, io_u, NULL);
+ } else if (ret == FIO_Q_BUSY) {
+ if (td_io_commit(td))
+ return 1;
+ goto requeue;
}
return 0;
io_u = NULL;
while (!td->terminate) {
- struct io_completion_data icd;
- struct timespec *timeout;
-
io_u = __get_io_u(td);
if (!io_u)
break;
switch (ret) {
case FIO_Q_COMPLETED:
if (io_u->error)
- ret = io_u->error;
+ ret = -io_u->error;
if (io_u->xfer_buflen != io_u->resid && io_u->resid) {
int bytes = io_u->xfer_buflen - io_u->resid;
io_u->xfer_buf += bytes;
goto requeue;
}
- init_icd(&icd, verify_io_u, 1);
- io_completed(td, io_u, &icd);
- if (icd.error) {
- ret = icd.error;
+ ret = io_u_sync_complete(td, io_u, verify_io_u);
+ if (ret)
break;
- }
- put_io_u(td, io_u);
continue;
case FIO_Q_QUEUED:
break;
+ case FIO_Q_BUSY:
+ requeue_io_u(td, &io_u);
+ ret = td_io_commit(td);
+ break;
default:
assert(ret < 0);
- td_verror(td, ret);
+ td_verror(td, -ret);
break;
}
- if (ret < 0)
+ if (ret < 0 || td->error)
break;
/*
* if we can queue more, do so. but check if there are
* completed io_u's first.
*/
- if (queue_full(td)) {
- timeout = NULL;
+ min_events = 0;
+ if (queue_full(td) || ret == FIO_Q_BUSY)
min_events = 1;
- } else {
- struct timespec ts;
-
- ts.tv_sec = 0;
- ts.tv_nsec = 0;
- timeout = &ts;
- min_events = 0;
- }
/*
* Reap required number of io units, if any, and do the
* verification on them through the callback handler
*/
- ret = td_io_getevents(td, min_events, td->cur_depth, timeout);
- if (ret < 0)
+ if (io_u_queued_complete(td, min_events, verify_io_u))
break;
- else if (!ret)
- continue;
-
- init_icd(&icd, verify_io_u, ret);
- ios_completed(td, &icd);
-
- if (icd.error) {
- td_verror(td, icd.error);
- break;
- }
}
if (io_u)
*/
static void do_io(struct thread_data *td)
{
- struct io_completion_data icd;
struct timeval s;
unsigned long usec;
- struct fio_file *f;
int i, ret = 0;
td_set_runstate(td, TD_RUNNING);
while ((td->this_io_bytes[0] + td->this_io_bytes[1]) < td->io_size) {
- struct timespec *timeout;
+ struct timeval comp_time;
+ long bytes_done = 0;
int min_evts = 0;
struct io_u *io_u;
if (td->terminate)
break;
- f = get_next_file(td);
- if (!f)
- break;
-
- io_u = get_io_u(td, f);
+ io_u = get_io_u(td);
if (!io_u)
break;
memcpy(&s, &io_u->start_time, sizeof(s));
+
+ if (runtime_exceeded(td, &s)) {
+ put_io_u(td, io_u);
+ break;
+ }
requeue:
ret = td_io_queue(td, io_u);
io_u->xfer_buf += bytes;
goto requeue;
}
- init_icd(&icd, NULL, 1);
- io_completed(td, io_u, &icd);
- put_io_u(td, io_u);
+ fio_gettime(&comp_time, NULL);
+ bytes_done = io_u_sync_complete(td, io_u, NULL);
+ if (bytes_done < 0)
+ ret = bytes_done;
break;
case FIO_Q_QUEUED:
break;
+ case FIO_Q_BUSY:
+ requeue_io_u(td, &io_u);
+ ret = td_io_commit(td);
+ break;
default:
assert(ret < 0);
put_io_u(td, io_u);
break;
}
- if (ret < 0)
+ if (ret < 0 || td->error)
break;
- add_slat_sample(td, io_u->ddir, mtime_since(&io_u->start_time, &io_u->issue_time));
+ if (io_u)
+ add_slat_sample(td, io_u->ddir, mtime_since(&io_u->start_time, &io_u->issue_time));
- if (ret == FIO_Q_QUEUED) {
- if (td->cur_depth < td->iodepth) {
- struct timespec ts;
-
- ts.tv_sec = 0;
- ts.tv_nsec = 0;
- timeout = &ts;
- min_evts = 0;
- } else {
- timeout = NULL;
+ /*
+ * See if we need to complete some commands
+ */
+ if (ret == FIO_Q_QUEUED || ret == FIO_Q_BUSY) {
+ min_evts = 0;
+ if (queue_full(td) || ret == FIO_Q_BUSY)
min_evts = 1;
- }
-
- ret = td_io_getevents(td, min_evts, td->cur_depth, timeout);
- if (ret < 0) {
- td_verror(td, ret);
- break;
- } else if (!ret)
- continue;
- init_icd(&icd, NULL, ret);
- ios_completed(td, &icd);
- if (icd.error) {
- td_verror(td, icd.error);
+ fio_gettime(&comp_time, NULL);
+ bytes_done = io_u_queued_complete(td, min_evts, NULL);
+ if (bytes_done < 0)
break;
- }
}
+ if (!bytes_done)
+ continue;
+
/*
* the rate is batched for now, it should work for batches
* of completions except the very first one which may look
* a little bursty
*/
- usec = utime_since(&s, &icd.time);
+ usec = utime_since(&s, &comp_time);
- rate_throttle(td, usec, icd.bytes_done[td->ddir], td->ddir);
+ rate_throttle(td, usec, bytes_done, td->ddir);
- if (check_min_rate(td, &icd.time)) {
+ if (check_min_rate(td, &comp_time)) {
if (exitall_on_terminate)
terminate_threads(td->groupid, 0);
td_verror(td, ENODATA);
break;
}
- if (runtime_exceeded(td, &icd.time))
- break;
-
if (td->thinktime) {
unsigned long long b;
}
if (!td->error) {
+ struct fio_file *f;
+
if (td->cur_depth)
cleanup_pending_aio(td);
INIT_LIST_HEAD(&td->io_u_freelist);
INIT_LIST_HEAD(&td->io_u_busylist);
+ INIT_LIST_HEAD(&td->io_u_requeues);
INIT_LIST_HEAD(&td->io_hist_list);
INIT_LIST_HEAD(&td->io_log_list);