*/
static void cleanup_pending_aio(struct thread_data *td)
{
- struct timespec ts = { .tv_sec = 0, .tv_nsec = 0};
struct list_head *entry, *n;
- struct io_completion_data icd;
struct io_u *io_u;
int r;
/*
* get immediately available events, if any
*/
- r = td_io_getevents(td, 0, td->cur_depth, &ts);
- if (r > 0) {
- icd.nr = r;
- ios_completed(td, &icd);
- }
+ io_u_queued_complete(td, 0, NULL);
/*
 * now cancel remaining active events
 */
}
}
- if (td->cur_depth) {
- r = td_io_getevents(td, td->cur_depth, td->cur_depth, NULL);
- if (r > 0) {
- icd.nr = r;
- ios_completed(td, &icd);
- }
- }
+ if (td->cur_depth)
+ io_u_queued_complete(td, td->cur_depth, NULL);
}
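For reference, io_u_queued_complete() itself is not part of this diff. A
minimal sketch of what the call sites above assume it does, folding the
removed td_io_getevents()/ios_completed() pattern into one helper (the
endio_handler callback type and the three-argument init_icd() are
assumptions inferred from the call sites, not shown in this patch):

long io_u_queued_complete(struct thread_data *td, int min_events,
			  endio_handler *handler)
{
	struct timespec ts = { .tv_sec = 0, .tv_nsec = 0 };
	struct timespec *tsp = NULL;
	struct io_completion_data icd;
	int ret;

	/*
	 * min_events == 0 means poll: use a zero timeout so we only
	 * reap what has already completed, as the old code did here.
	 */
	if (!min_events)
		tsp = &ts;

	ret = td_io_getevents(td, min_events, td->cur_depth, tsp);
	if (ret < 0) {
		td_verror(td, -ret);
		return ret;
	} else if (!ret)
		return 0;

	init_icd(&icd, handler, ret);	/* assumed extended form */
	ios_completed(td, &icd);
	if (!icd.error)
		return icd.bytes_done[0] + icd.bytes_done[1];

	return -1;
}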
/*
static int fio_io_sync(struct thread_data *td, struct fio_file *f)
{
struct io_u *io_u = __get_io_u(td);
- struct io_completion_data icd;
int ret;
if (!io_u)
put_io_u(td, io_u);
return 1;
} else if (ret == FIO_Q_QUEUED) {
- ret = td_io_getevents(td, 1, td->cur_depth, NULL);
- if (ret < 0) {
- td_verror(td, ret);
+ if (io_u_queued_complete(td, 1, NULL))
return 1;
- }
-
- icd.nr = ret;
- ios_completed(td, &icd);
- if (icd.error) {
- td_verror(td, icd.error);
- return 1;
- }
} else if (ret == FIO_Q_COMPLETED) {
if (io_u->error) {
td_verror(td, io_u->error);
return 1;
}
- init_icd(&icd);
- io_completed(td, io_u, &icd);
- put_io_u(td, io_u);
+ io_u_sync_complete(td, io_u, NULL);
}
return 0;
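Likewise a sketch of io_u_sync_complete(), assumed to wrap the removed
init_icd()/io_completed()/put_io_u() sequence for a single io_u and to
return bytes transferred, or -1 on error (return convention inferred
from the do_io() hunk below, where the result lands in bytes_done):

long io_u_sync_complete(struct thread_data *td, struct io_u *io_u,
			endio_handler *handler)
{
	struct io_completion_data icd;

	/* same steps the call sites used to open-code */
	init_icd(&icd, handler, 1);	/* assumed extended form */
	io_completed(td, io_u, &icd);
	put_io_u(td, io_u);

	if (!icd.error)
		return icd.bytes_done[0] + icd.bytes_done[1];

	return -1;
}

Note that fio_io_sync() above treats any non-zero return from
io_u_queued_complete() as failure; that only works if DDIR_SYNC
completions contribute nothing to bytes_done (presumably an early
return in io_completed(), not visible in this diff).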
{
struct fio_file *f;
struct io_u *io_u;
- int ret, i;
+ int ret, i, min_events;
/*
 * sync io first and invalidate cache, to make sure we really
 * read from disk.
 */
switch (ret) {
case FIO_Q_COMPLETED:
if (io_u->error)
- ret = io_u->error;
+ ret = -io_u->error;
if (io_u->xfer_buflen != io_u->resid && io_u->resid) {
int bytes = io_u->xfer_buflen - io_u->resid;
io_u->xfer_buf += bytes;
goto requeue;
}
- if (do_io_u_verify(td, &io_u)) {
- ret = -EIO;
+ ret = io_u_sync_complete(td, io_u, verify_io_u);
+ if (ret)
break;
- }
continue;
case FIO_Q_QUEUED:
break;
default:
assert(ret < 0);
- td_verror(td, ret);
+ td_verror(td, -ret);
break;
}
- /*
- * We get here for a queued request, in the future we
- * want to later make this take full advantage of
- * keeping IO in flight while verifying others.
- */
- ret = td_io_getevents(td, 1, 1, NULL);
- if (ret < 0)
+ if (ret < 0 || td->error)
break;
- assert(ret == 1);
- io_u = td->io_ops->event(td, 0);
+ /*
+ * If we can queue more, do so. But check if there are
+ * completed io_u's first.
+ */
+ min_events = 0;
+ if (queue_full(td))
+ min_events = 1;
- if (do_io_u_verify(td, &io_u))
+ /*
+ * Reap required number of io units, if any, and do the
+ * verification on them through the callback handler
+ */
+ if (io_u_queued_complete(td, min_events, verify_io_u) < 0)
break;
}
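queue_full() is not defined in this diff either; from its use it is
assumed to be a trivial "every io_u is in flight" test along these
lines (the io_u_freelist field name is a guess):

/* all io_u's are in flight when the free list is empty */
#define queue_full(td)	list_empty(&(td)->io_u_freelist)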
*/
static void do_io(struct thread_data *td)
{
- struct io_completion_data icd;
struct timeval s;
unsigned long usec;
struct fio_file *f;
td_set_runstate(td, TD_RUNNING);
while ((td->this_io_bytes[0] + td->this_io_bytes[1]) < td->io_size) {
- struct timespec *timeout;
+ struct timeval comp_time;
+ long bytes_done = 0;
int min_evts = 0;
struct io_u *io_u;
break;
memcpy(&s, &io_u->start_time, sizeof(s));
+
+ if (runtime_exceeded(td, &s)) {
+ put_io_u(td, io_u);
+ break;
+ }
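The check moved here depends on runtime_exceeded(); a plausible
definition, assuming td->epoch marks job start, td->timeout is in
seconds and mtime_since() returns milliseconds (none of which this
diff shows):

static int runtime_exceeded(struct thread_data *td, struct timeval *t)
{
	if (!td->timeout)
		return 0;
	if (mtime_since(&td->epoch, t) >= td->timeout * 1000)
		return 1;

	return 0;
}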
requeue:
ret = td_io_queue(td, io_u);
io_u->xfer_buf += bytes;
goto requeue;
}
- init_icd(&icd);
- io_completed(td, io_u, &icd);
- put_io_u(td, io_u);
+ fio_gettime(&comp_time, NULL);
+ bytes_done = io_u_sync_complete(td, io_u, NULL);
+ if (bytes_done < 0)
+ ret = bytes_done;
break;
case FIO_Q_QUEUED:
break;
break;
}
- if (ret < 0)
+ if (ret < 0 || td->error)
break;
add_slat_sample(td, io_u->ddir, mtime_since(&io_u->start_time, &io_u->issue_time));
+ /*
+ * See if we need to complete some commands
+ */
if (ret == FIO_Q_QUEUED) {
- if (td->cur_depth < td->iodepth) {
- struct timespec ts;
-
- ts.tv_sec = 0;
- ts.tv_nsec = 0;
- timeout = &ts;
- min_evts = 0;
- } else {
- timeout = NULL;
+ min_evts = 0;
+ if (queue_full(td))
min_evts = 1;
- }
- ret = td_io_getevents(td, min_evts, td->cur_depth, timeout);
- if (ret < 0) {
- td_verror(td, ret);
+ fio_gettime(&comp_time, NULL);
+ bytes_done = io_u_queued_complete(td, min_evts, NULL);
+ if (bytes_done < 0)
break;
- } else if (!ret)
- continue;
-
- icd.nr = ret;
- ios_completed(td, &icd);
- if (icd.error) {
- td_verror(td, icd.error);
- break;
- }
}
+ if (!bytes_done)
+ continue;
+
/*
 * The rate is batched for now: it should work for batches
 * of completions, except the very first one, which may look
 * a little bursty.
 */
- usec = utime_since(&s, &icd.time);
+ usec = utime_since(&s, &comp_time);
- rate_throttle(td, usec, icd.bytes_done[td->ddir], td->ddir);
+ rate_throttle(td, usec, bytes_done, td->ddir);
- if (check_min_rate(td, &icd.time)) {
+ if (check_min_rate(td, &comp_time)) {
if (exitall_on_terminate)
terminate_threads(td->groupid, 0);
td_verror(td, ENODATA);
break;
}
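For context on the batched rate logic, a hypothetical sketch of the
throttle called above: given how long the batch took (usec) and how
many bytes it moved, sleep off any time saved relative to the target
rate. The td->rate fields and usec_sleep() are assumptions, not part
of this diff, and per-direction rates are ignored for brevity:

static void rate_throttle(struct thread_data *td, unsigned long usec,
			  unsigned long bytes, int ddir)
{
	unsigned long expected;

	(void) ddir;	/* per-direction rates ignored in this sketch */

	if (!td->rate)
		return;

	/* usecs this many bytes should take at td->rate (KiB/sec) */
	expected = (bytes * 1000000UL) / (td->rate * 1024UL);

	if (usec < expected) {
		/* ahead of target: bank the debt, sleep when sizable */
		td->rate_pending_usleep += expected - usec;
		if (td->rate_pending_usleep >= 100000) {
			usec_sleep(td, td->rate_pending_usleep);
			td->rate_pending_usleep = 0;
		}
	} else
		td->rate_pending_usleep -= usec - expected;
}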
- if (runtime_exceeded(td, &icd.time))
- break;
-
if (td->thinktime) {
unsigned long long b;