after fio has filled the queue of 16 requests, it will let
the depth drain down to 4 before starting to fill it again.
+io_submit_mode=str This option controls how fio submits the IO to
+ the IO engine. The default is 'inline', which means that the
+ fio job threads submit and reap IO directly. If set to
+ 'offload', the job threads will offload IO submission to a
+ dedicated pool of IO threads. This requires some coordination
+ and thus has a bit of extra overhead, especially for lower
+ queue depth IO where it can increase latencies. The benefit
+ is that fio can manage submission rates independently of
+ the device completion rates. This avoids skewed latency
+	reporting if IO gets backed up on the device side (the
+ coordinated omission problem).
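+
+	For example, pairing 'offload' with a rate limit fixes the
+	submission rate no matter how quickly the device completes
+	the IO (the values here are illustrative):
+
+		io_submit_mode=offload
+		rate_iops=1000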
+
direct=bool If value is true, use non-buffered io. This is usually
O_DIRECT. Note that ZFS on Solaris doesn't support direct io.
On Windows the synchronous ioengines don't support direct io.
lib/lfsr.c gettime-thread.c helpers.c lib/flist_sort.c \
lib/hweight.c lib/getrusage.c idletime.c td_error.c \
profiles/tiobench.c profiles/act.c io_u_queue.c filelock.c \
- lib/tp.c lib/bloom.c lib/gauss.c
+ lib/tp.c lib/bloom.c lib/gauss.c workqueue.c
ifdef CONFIG_LIBHDFS
HDFSFLAGS= -I $(JAVA_HOME)/include -I $(JAVA_HOME)/include/linux -I $(FIO_LIBHDFS_INCLUDE)
#include "idletime.h"
#include "err.h"
#include "lib/tp.h"
+#include "workqueue.h"
static pthread_t helper_thread;
static pthread_mutex_t helper_lock;
return 0;
}
-static int check_min_rate(struct thread_data *td, struct timeval *now,
- uint64_t *bytes_done)
+static int check_min_rate(struct thread_data *td, struct timeval *now)
{
int ret = 0;
- if (bytes_done[DDIR_READ])
+ if (td->bytes_done[DDIR_READ])
ret |= __check_min_rate(td, now, DDIR_READ);
- if (bytes_done[DDIR_WRITE])
+ if (td->bytes_done[DDIR_WRITE])
ret |= __check_min_rate(td, now, DDIR_WRITE);
- if (bytes_done[DDIR_TRIM])
+ if (td->bytes_done[DDIR_TRIM])
ret |= __check_min_rate(td, now, DDIR_TRIM);
return ret;
/*
* get immediately available events, if any
*/
- r = io_u_queued_complete(td, 0, NULL);
+ r = io_u_queued_complete(td, 0);
if (r < 0)
return;
}
if (td->cur_depth)
- r = io_u_queued_complete(td, td->cur_depth, NULL);
+ r = io_u_queued_complete(td, td->cur_depth);
}
/*
put_io_u(td, io_u);
return 1;
} else if (ret == FIO_Q_QUEUED) {
- if (io_u_queued_complete(td, 1, NULL) < 0)
+ if (io_u_queued_complete(td, 1) < 0)
return 1;
} else if (ret == FIO_Q_COMPLETED) {
if (io_u->error) {
return 1;
}
- if (io_u_sync_complete(td, io_u, NULL) < 0)
+ if (io_u_sync_complete(td, io_u) < 0)
return 1;
} else if (ret == FIO_Q_BUSY) {
if (td_io_commit(td))
}
}
-static int wait_for_completions(struct thread_data *td, struct timeval *time,
- uint64_t *bytes_done)
+static int wait_for_completions(struct thread_data *td, struct timeval *time)
{
const int full = queue_full(td);
int min_evts = 0;
fio_gettime(time, NULL);
do {
- ret = io_u_queued_complete(td, min_evts, bytes_done);
+ ret = io_u_queued_complete(td, min_evts);
if (ret < 0)
break;
} while (full && (td->cur_depth > td->o.iodepth_low));
return ret;
}
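+/*
+ * Shared handler for the result of td_io_queue(): completions (including
+ * short/residual transfers), queued IO, busy/requeue, and errors. Used by
+ * the verify path, the main IO loop, and the offload submit workers.
+ */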
+static int io_queue_event(struct thread_data *td, struct io_u *io_u, int *ret,
+ enum fio_ddir ddir, uint64_t *bytes_issued, int from_verify,
+ struct timeval *comp_time)
+{
+ int ret2;
+
+ switch (*ret) {
+ case FIO_Q_COMPLETED:
+ if (io_u->error) {
+ *ret = -io_u->error;
+ clear_io_u(td, io_u);
+ } else if (io_u->resid) {
+ int bytes = io_u->xfer_buflen - io_u->resid;
+ struct fio_file *f = io_u->file;
+
+ if (bytes_issued)
+ *bytes_issued += bytes;
+
+ if (!from_verify)
+ trim_io_piece(td, io_u);
+
+ /*
+ * zero read, fail
+ */
+ if (!bytes) {
+ if (!from_verify)
+ unlog_io_piece(td, io_u);
+ td_verror(td, EIO, "full resid");
+ put_io_u(td, io_u);
+ break;
+ }
+
+ io_u->xfer_buflen = io_u->resid;
+ io_u->xfer_buf += bytes;
+ io_u->offset += bytes;
+
+ if (ddir_rw(io_u->ddir))
+ td->ts.short_io_u[io_u->ddir]++;
+
+ f = io_u->file;
+ if (io_u->offset == f->real_file_size)
+ goto sync_done;
+
+ requeue_io_u(td, &io_u);
+ } else {
+sync_done:
+ if (comp_time && (__should_check_rate(td, DDIR_READ) ||
+ __should_check_rate(td, DDIR_WRITE) ||
+ __should_check_rate(td, DDIR_TRIM)))
+ fio_gettime(comp_time, NULL);
+
+ *ret = io_u_sync_complete(td, io_u);
+ if (*ret < 0)
+ break;
+ }
+ return 0;
+ case FIO_Q_QUEUED:
+ /*
+ * if the engine doesn't have a commit hook,
+ * the io_u is really queued. if it does have such
+ * a hook, it has to call io_u_queued() itself.
+ */
+ if (td->io_ops->commit == NULL)
+ io_u_queued(td, io_u);
+ if (bytes_issued)
+ *bytes_issued += io_u->xfer_buflen;
+ break;
+ case FIO_Q_BUSY:
+ if (!from_verify)
+ unlog_io_piece(td, io_u);
+ requeue_io_u(td, &io_u);
+ ret2 = td_io_commit(td);
+ if (ret2 < 0)
+ *ret = ret2;
+ break;
+ default:
+		assert(*ret < 0);
+ td_verror(td, -(*ret), "td_io_queue");
+ break;
+ }
+
+ if (break_on_this_error(td, ddir, ret))
+ return 1;
+
+ return 0;
+}
+
/*
* The main verify engine. Runs over the writes we previously submitted,
* reads the blocks back in, and checks the crc/md5 of the data.
*/
static void do_verify(struct thread_data *td, uint64_t verify_bytes)
{
- uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
struct fio_file *f;
struct io_u *io_u;
int ret, min_events;
io_u = NULL;
while (!td->terminate) {
enum fio_ddir ddir;
- int ret2, full;
+ int full;
update_tv_cache(td);
check_update_rusage(td);
break;
}
} else {
- if (ddir_rw_sum(bytes_done) + td->o.rw_min_bs > verify_bytes)
+ if (ddir_rw_sum(td->bytes_done) + td->o.rw_min_bs > verify_bytes)
break;
while ((io_u = get_io_u(td)) != NULL) {
continue;
} else if (io_u->ddir == DDIR_TRIM) {
io_u->ddir = DDIR_READ;
- io_u->flags |= IO_U_F_TRIMMED;
+ io_u_set(io_u, IO_U_F_TRIMMED);
break;
} else if (io_u->ddir == DDIR_WRITE) {
io_u->ddir = DDIR_READ;
fio_gettime(&io_u->start_time, NULL);
ret = td_io_queue(td, io_u);
- switch (ret) {
- case FIO_Q_COMPLETED:
- if (io_u->error) {
- ret = -io_u->error;
- clear_io_u(td, io_u);
- } else if (io_u->resid) {
- int bytes = io_u->xfer_buflen - io_u->resid;
-
- /*
- * zero read, fail
- */
- if (!bytes) {
- td_verror(td, EIO, "full resid");
- put_io_u(td, io_u);
- break;
- }
-
- io_u->xfer_buflen = io_u->resid;
- io_u->xfer_buf += bytes;
- io_u->offset += bytes;
-
- if (ddir_rw(io_u->ddir))
- td->ts.short_io_u[io_u->ddir]++;
- f = io_u->file;
- if (io_u->offset == f->real_file_size)
- goto sync_done;
-
- requeue_io_u(td, &io_u);
- } else {
-sync_done:
- ret = io_u_sync_complete(td, io_u, bytes_done);
- if (ret < 0)
- break;
- }
- continue;
- case FIO_Q_QUEUED:
- break;
- case FIO_Q_BUSY:
- requeue_io_u(td, &io_u);
- ret2 = td_io_commit(td);
- if (ret2 < 0)
- ret = ret2;
- break;
- default:
- assert(ret < 0);
- td_verror(td, -ret, "td_io_queue");
- break;
- }
-
- if (break_on_this_error(td, ddir, &ret))
+ if (io_queue_event(td, io_u, &ret, ddir, NULL, 1, NULL))
break;
/*
reap:
full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
if (full || !td->o.iodepth_batch_complete)
- ret = wait_for_completions(td, NULL, bytes_done);
+ ret = wait_for_completions(td, NULL);
if (ret < 0)
break;
min_events = td->cur_depth;
if (min_events)
- ret = io_u_queued_complete(td, min_events, NULL);
+ ret = io_u_queued_complete(td, min_events);
} else
cleanup_pending_aio(td);
*/
static uint64_t do_io(struct thread_data *td)
{
- uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
unsigned int i;
int ret = 0;
uint64_t total_bytes, bytes_issued = 0;
td->o.time_based) {
struct timeval comp_time;
struct io_u *io_u;
- int ret2, full;
+ int full;
enum fio_ddir ddir;
check_update_rusage(td);
!td->o.experimental_verify)
log_io_piece(td, io_u);
- ret = td_io_queue(td, io_u);
- switch (ret) {
- case FIO_Q_COMPLETED:
- if (io_u->error) {
- ret = -io_u->error;
- unlog_io_piece(td, io_u);
- clear_io_u(td, io_u);
- } else if (io_u->resid) {
- int bytes = io_u->xfer_buflen - io_u->resid;
- struct fio_file *f = io_u->file;
-
- bytes_issued += bytes;
-
- trim_io_piece(td, io_u);
-
- /*
- * zero read, fail
- */
- if (!bytes) {
- unlog_io_piece(td, io_u);
- td_verror(td, EIO, "full resid");
- put_io_u(td, io_u);
- break;
- }
-
- io_u->xfer_buflen = io_u->resid;
- io_u->xfer_buf += bytes;
- io_u->offset += bytes;
-
- if (ddir_rw(io_u->ddir))
- td->ts.short_io_u[io_u->ddir]++;
-
- if (io_u->offset == f->real_file_size)
- goto sync_done;
+ if (td->o.io_submit_mode == IO_MODE_OFFLOAD) {
+ if (td->error)
+ break;
+ ret = workqueue_enqueue(&td->io_wq, io_u);
+ } else {
+ ret = td_io_queue(td, io_u);
- requeue_io_u(td, &io_u);
- } else {
-sync_done:
- if (__should_check_rate(td, DDIR_READ) ||
- __should_check_rate(td, DDIR_WRITE) ||
- __should_check_rate(td, DDIR_TRIM))
- fio_gettime(&comp_time, NULL);
+			if (io_queue_event(td, io_u, &ret, ddir, &bytes_issued, 0, &comp_time))
+ break;
- ret = io_u_sync_complete(td, io_u, bytes_done);
- if (ret < 0)
- break;
- bytes_issued += io_u->xfer_buflen;
- }
- break;
- case FIO_Q_QUEUED:
/*
- * if the engine doesn't have a commit hook,
- * the io_u is really queued. if it does have such
- * a hook, it has to call io_u_queued() itself.
+ * See if we need to complete some commands. Note that
+ * we can get BUSY even without IO queued, if the
+ * system is resource starved.
*/
- if (td->io_ops->commit == NULL)
- io_u_queued(td, io_u);
- bytes_issued += io_u->xfer_buflen;
- break;
- case FIO_Q_BUSY:
- unlog_io_piece(td, io_u);
- requeue_io_u(td, &io_u);
- ret2 = td_io_commit(td);
- if (ret2 < 0)
- ret = ret2;
- break;
- default:
- assert(ret < 0);
- put_io_u(td, io_u);
- break;
- }
-
- if (break_on_this_error(td, ddir, &ret))
- break;
-
- /*
- * See if we need to complete some commands. Note that we
- * can get BUSY even without IO queued, if the system is
- * resource starved.
- */
reap:
- full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
- if (full || !td->o.iodepth_batch_complete)
- ret = wait_for_completions(td, &comp_time, bytes_done);
+ full = queue_full(td) ||
+ (ret == FIO_Q_BUSY && td->cur_depth);
+ if (full || !td->o.iodepth_batch_complete)
+ ret = wait_for_completions(td, &comp_time);
+ }
if (ret < 0)
break;
- if (!ddir_rw_sum(bytes_done) && !(td->io_ops->flags & FIO_NOIO))
+ if (!ddir_rw_sum(td->bytes_done) &&
+ !(td->io_ops->flags & FIO_NOIO))
continue;
- if (!in_ramp_time(td) && should_check_rate(td, bytes_done)) {
- if (check_min_rate(td, &comp_time, bytes_done)) {
+ if (!in_ramp_time(td) && should_check_rate(td)) {
+ if (check_min_rate(td, &comp_time)) {
if (exitall_on_terminate)
fio_terminate_threads(td->groupid);
td_verror(td, EIO, "check_min_rate");
if (!td->error) {
struct fio_file *f;
- i = td->cur_depth;
+ if (td->o.io_submit_mode == IO_MODE_OFFLOAD) {
+ workqueue_flush(&td->io_wq);
+ i = 0;
+ } else
+ i = td->cur_depth;
+
if (i) {
- ret = io_u_queued_complete(td, i, bytes_done);
+ ret = io_u_queued_complete(td, i);
if (td->o.fill_device && td->error == ENOSPC)
td->error = 0;
}
if (!ddir_rw_sum(td->this_io_bytes))
td->done = 1;
- return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM];
+ return td->bytes_done[DDIR_WRITE] + td->bytes_done[DDIR_TRIM];
}
static void cleanup_io_u(struct thread_data *td)
*/
static uint64_t do_dry_run(struct thread_data *td)
{
- uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
-
td_set_runstate(td, TD_RUNNING);
while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
if (!io_u)
break;
- io_u->flags |= IO_U_F_FLIGHT;
+ io_u_set(io_u, IO_U_F_FLIGHT);
io_u->error = 0;
io_u->resid = 0;
if (ddir_rw(acct_ddir(io_u)))
!td->o.experimental_verify)
log_io_piece(td, io_u);
- ret = io_u_sync_complete(td, io_u, bytes_done);
+ ret = io_u_sync_complete(td, io_u);
(void) ret;
}
- return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM];
+ return td->bytes_done[DDIR_WRITE] + td->bytes_done[DDIR_TRIM];
+}
+
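+/*
+ * Executed by an offload submit worker for each io_u handed to it: queue
+ * the io_u through the normal engine path, handle the queue event, and
+ * reap the completion inline if the engine queued it asynchronously.
+ */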
+static void io_workqueue_fn(struct thread_data *td, struct io_u *io_u)
+{
+ const enum fio_ddir ddir = io_u->ddir;
+ int ret;
+
+ dprint(FD_RATE, "io_u %p queued by %u\n", io_u, gettid());
+
+ io_u_set(io_u, IO_U_F_NO_FILE_PUT);
+
+ td->cur_depth++;
+
+ ret = td_io_queue(td, io_u);
+
+ dprint(FD_RATE, "io_u %p ret %d by %u\n", io_u, ret, gettid());
+
+ io_queue_event(td, io_u, &ret, ddir, NULL, 0, NULL);
+
+ if (ret == FIO_Q_QUEUED)
+ ret = io_u_queued_complete(td, 1);
+
+ td->cur_depth--;
}
/*
fio_verify_init(td);
+ if ((o->io_submit_mode == IO_MODE_OFFLOAD) &&
+ workqueue_init(td, &td->io_wq, io_workqueue_fn, td->o.iodepth))
+ goto err;
+
fio_gettime(&td->epoch, NULL);
fio_getrusage(&td->ru_start);
clear_state = 0;
fio_writeout_logs(td);
+ if (o->io_submit_mode == IO_MODE_OFFLOAD)
+ workqueue_exit(&td->io_wq);
+
if (td->flags & TD_F_COMPRESS_LOG)
tp_exit(&td->tp_data);
}
o->ratecycle = le32_to_cpu(top->ratecycle);
+ o->io_submit_mode = le32_to_cpu(top->io_submit_mode);
o->nr_files = le32_to_cpu(top->nr_files);
o->open_files = le32_to_cpu(top->open_files);
o->file_lock_mode = le32_to_cpu(top->file_lock_mode);
top->fill_device = cpu_to_le32(o->fill_device);
top->file_append = cpu_to_le32(o->file_append);
top->ratecycle = cpu_to_le32(o->ratecycle);
+ top->io_submit_mode = cpu_to_le32(o->io_submit_mode);
top->nr_files = cpu_to_le32(o->nr_files);
top->open_files = cpu_to_le32(o->open_files);
top->file_lock_mode = cpu_to_le32(o->file_lock_mode);
struct libaio_data *ld = td->io_ops->data;
if (ld) {
- io_destroy(ld->aio_ctx);
+ /*
+ * Work-around to avoid huge RCU stalls at exit time. If we
+ * don't do this here, then it'll be torn down by exit_aio().
+	 * But for that case we can parallelize the freeing, thus
+ * speeding it up a lot.
+ */
+ if (!(td->flags & TD_F_CHILD))
+ io_destroy(ld->aio_ctx);
free(ld->aio_events);
free(ld->iocbs);
free(ld->io_us);
--- /dev/null
+[fixed-rate-submit]
+size=128m
+rw=read
+ioengine=libaio
+iodepth=32
+direct=1
+# By setting the submit mode to offload, we can guarantee a fixed rate of
+# submission, regardless of the device completion rate.
+io_submit_mode=offload
+rate_iops=1000
Low watermark indicating when to start filling the queue again. Default:
\fBiodepth\fR.
.TP
+.BI io_submit_mode \fR=\fPstr
+This option controls how fio submits the IO to the IO engine. The default is
+\fBinline\fR, which means that the fio job threads submit and reap IO directly.
+If set to \fBoffload\fR, the job threads will offload IO submission to a
+dedicated pool of IO threads. This requires some coordination and thus has a
+bit of extra overhead, especially for lower queue depth IO where it can
+increase latencies. The benefit is that fio can manage submission rates
+independently of the device completion rates. This avoids skewed latency
+reporting if IO gets backed up on the device side (the coordinated omission
+problem).
+.TP
.BI direct \fR=\fPbool
If true, use non-buffered I/O (usually O_DIRECT). Default: false.
.TP
#include "stat.h"
#include "flow.h"
#include "io_u_queue.h"
+#include "workqueue.h"
#ifdef CONFIG_SOLARISAIO
#include <sys/asynch.h>
TD_F_NOIO = 256,
TD_F_COMPRESS_LOG = 512,
TD_F_VSTATE_SAVED = 1024,
+ TD_F_NEED_LOCK = 2048,
+ TD_F_CHILD = 4096,
};
enum {
FIO_RAND_NR_OFFS,
};
+enum {
+ IO_MODE_INLINE = 0,
+ IO_MODE_OFFLOAD,
+};
+
/*
* This describes a single thread/process executing a fio job.
*/
struct tp_data *tp_data;
+ struct thread_data *parent;
+
uint64_t stat_io_bytes[DDIR_RWDIR_CNT];
struct timeval bw_sample_time;
unsigned long rate_blocks[DDIR_RWDIR_CNT];
struct timeval lastrate[DDIR_RWDIR_CNT];
+ /*
+ * Enforced rate submission/completion workqueue
+ */
+ struct workqueue io_wq;
+
uint64_t total_io_size;
uint64_t fill_device_size;
uint64_t io_blocks[DDIR_RWDIR_CNT];
uint64_t this_io_blocks[DDIR_RWDIR_CNT];
uint64_t io_bytes[DDIR_RWDIR_CNT];
- uint64_t io_skip_bytes;
uint64_t this_io_bytes[DDIR_RWDIR_CNT];
+ uint64_t io_skip_bytes;
uint64_t zone_bytes;
struct fio_mutex *mutex;
+ uint64_t bytes_done[DDIR_RWDIR_CNT];
/*
* State for random io, a bitmap of blocks done vs not done
} while (0)
-#define td_clear_error(td) \
- (td)->error = 0;
-#define td_verror(td, err, func) \
- __td_verror((td), (err), strerror((err)), (func))
-#define td_vmsg(td, err, msg, func) \
- __td_verror((td), (err), (msg), (func))
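+/*
+ * Errors are mirrored into the parent thread_data, so a failure inside an
+ * offloaded submit worker also surfaces on the job that owns it.
+ */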
+#define td_clear_error(td) do { \
+ (td)->error = 0; \
+ if ((td)->parent) \
+ (td)->parent->error = 0; \
+} while (0)
+
+#define td_verror(td, err, func) do { \
+ __td_verror((td), (err), strerror((err)), (func)); \
+ if ((td)->parent) \
+ __td_verror((td)->parent, (err), strerror((err)), (func)); \
+} while (0)
+
+#define td_vmsg(td, err, msg, func) do { \
+ __td_verror((td), (err), (msg), (func)); \
+ if ((td)->parent) \
+ __td_verror((td)->parent, (err), (msg), (func)); \
+} while (0)
#define __fio_stringify_1(x) #x
#define __fio_stringify(x) __fio_stringify_1(x)
return 0;
}
-static inline int should_check_rate(struct thread_data *td,
- uint64_t *bytes_done)
+static inline int should_check_rate(struct thread_data *td)
{
int ret = 0;
- if (bytes_done[DDIR_READ])
+ if (td->bytes_done[DDIR_READ])
ret |= __should_check_rate(td, DDIR_READ);
- if (bytes_done[DDIR_WRITE])
+ if (td->bytes_done[DDIR_WRITE])
ret |= __should_check_rate(td, DDIR_WRITE);
- if (bytes_done[DDIR_TRIM])
+ if (td->bytes_done[DDIR_TRIM])
ret |= __should_check_rate(td, DDIR_TRIM);
return ret;
return (val != 0 && ((val & (val - 1)) == 0));
}
+static inline int td_async_processing(struct thread_data *td)
+{
+ return (td->flags & TD_F_NEED_LOCK) != 0;
+}
+
/*
* We currently only need to do locking if we have verifier threads
* accessing our internal structures too
*/
static inline void td_io_u_lock(struct thread_data *td)
{
- if (td->o.verify_async)
+ if (td_async_processing(td))
pthread_mutex_lock(&td->io_u_lock);
}
static inline void td_io_u_unlock(struct thread_data *td)
{
- if (td->o.verify_async)
+ if (td_async_processing(td))
pthread_mutex_unlock(&td->io_u_lock);
}
static inline void td_io_u_free_notify(struct thread_data *td)
{
- if (td->o.verify_async)
+ if (td_async_processing(td))
pthread_cond_signal(&td->free_cond);
}
td->flags |= TD_F_SCRAMBLE_BUFFERS;
if (o->verify != VERIFY_NONE)
td->flags |= TD_F_VER_NONE;
+
+ if (o->verify_async || o->io_submit_mode == IO_MODE_OFFLOAD)
+ td->flags |= TD_F_NEED_LOCK;
}
static int setup_random_seeds(struct thread_data *td)
*is_random = 1;
} else {
*is_random = 0;
- io_u->flags |= IO_U_F_BUSY_OK;
+ io_u_set(io_u, IO_U_F_BUSY_OK);
ret = get_next_seq_offset(td, f, ddir, &offset);
if (ret)
ret = get_next_rand_block(td, f, ddir, &b);
ret = get_next_seq_offset(td, f, ddir, &offset);
}
} else {
- io_u->flags |= IO_U_F_BUSY_OK;
+ io_u_set(io_u, IO_U_F_BUSY_OK);
*is_random = 0;
if (td->o.rw_seq == RW_SEQ_SEQ) {
while (td->io_u_in_flight) {
int fio_unused ret;
- ret = io_u_queued_complete(td, 1, NULL);
+ ret = io_u_queued_complete(td, 1);
}
}
} else
usec = td->rate_pending_usleep[ddir];
- io_u_quiesce(td);
+ if (td->o.io_submit_mode == IO_MODE_INLINE)
+ io_u_quiesce(td);
usec = usec_sleep(td, usec);
td->o.barrier_blocks &&
!(td->io_issues[DDIR_WRITE] % td->o.barrier_blocks) &&
td->io_issues[DDIR_WRITE])
- io_u->flags |= IO_U_F_BARRIER;
+ io_u_set(io_u, IO_U_F_BARRIER);
}
void put_file_log(struct thread_data *td, struct fio_file *f)
void put_io_u(struct thread_data *td, struct io_u *io_u)
{
+ if (td->parent)
+ td = td->parent;
+
td_io_u_lock(td);
if (io_u->file && !(io_u->flags & IO_U_F_NO_FILE_PUT))
put_file_log(td, io_u->file);
io_u->file = NULL;
- io_u->flags |= IO_U_F_FREE;
+ io_u_set(io_u, IO_U_F_FREE);
- if (io_u->flags & IO_U_F_IN_CUR_DEPTH)
+ if (io_u->flags & IO_U_F_IN_CUR_DEPTH) {
td->cur_depth--;
+ assert(!(td->flags & TD_F_CHILD));
+ }
io_u_qpush(&td->io_u_freelist, io_u);
td_io_u_unlock(td);
td_io_u_free_notify(td);
void clear_io_u(struct thread_data *td, struct io_u *io_u)
{
- io_u->flags &= ~IO_U_F_FLIGHT;
+ io_u_clear(io_u, IO_U_F_FLIGHT);
put_io_u(td, io_u);
}
dprint(FD_IO, "requeue %p\n", __io_u);
+ if (td->parent)
+ td = td->parent;
+
td_io_u_lock(td);
- __io_u->flags |= IO_U_F_FREE;
+ io_u_set(__io_u, IO_U_F_FREE);
if ((__io_u->flags & IO_U_F_FLIGHT) && ddir_rw(ddir))
td->io_issues[ddir]--;
- __io_u->flags &= ~IO_U_F_FLIGHT;
- if (__io_u->flags & IO_U_F_IN_CUR_DEPTH)
+ io_u_clear(__io_u, IO_U_F_FLIGHT);
+ if (__io_u->flags & IO_U_F_IN_CUR_DEPTH) {
td->cur_depth--;
+ assert(!(td->flags & TD_F_CHILD));
+ }
io_u_rpush(&td->io_u_requeues, __io_u);
td_io_u_unlock(td);
+ td_io_u_free_notify(td);
*io_u = NULL;
}
if (io_u) {
assert(io_u->flags & IO_U_F_FREE);
- io_u->flags &= ~(IO_U_F_FREE | IO_U_F_NO_FILE_PUT |
+ io_u_clear(io_u, IO_U_F_FREE | IO_U_F_NO_FILE_PUT |
IO_U_F_TRIMMED | IO_U_F_BARRIER |
IO_U_F_VER_LIST);
io_u->error = 0;
io_u->acct_ddir = -1;
td->cur_depth++;
- io_u->flags |= IO_U_F_IN_CUR_DEPTH;
+ assert(!(td->flags & TD_F_CHILD));
+ io_u_set(io_u, IO_U_F_IN_CUR_DEPTH);
io_u->ipo = NULL;
- } else if (td->o.verify_async) {
+ } else if (td_async_processing(td)) {
/*
* We ran out, wait for async verify threads to finish and
* return one
*/
- pthread_cond_wait(&td->free_cond, &td->io_u_lock);
+ assert(!(td->flags & TD_F_CHILD));
+		pthread_cond_wait(&td->free_cond, &td->io_u_lock);
goto again;
}
return ERR_PTR(ret);
}
-void io_u_log_error(struct thread_data *td, struct io_u *io_u)
+static void __io_u_log_error(struct thread_data *td, struct io_u *io_u)
{
enum error_type_bit eb = td_error_type(io_u->ddir, io_u->error);
td_verror(td, io_u->error, "io_u error");
}
+void io_u_log_error(struct thread_data *td, struct io_u *io_u)
+{
+ __io_u_log_error(td, io_u);
+ if (td->parent)
+		__io_u_log_error(td->parent, io_u);
+}
+
static inline int gtod_reduce(struct thread_data *td)
{
return td->o.disable_clat && td->o.disable_lat && td->o.disable_slat
struct io_completion_data *icd,
const enum fio_ddir idx, unsigned int bytes)
{
+ const int no_reduce = !gtod_reduce(td);
unsigned long lusec = 0;
- if (!gtod_reduce(td))
+ if (no_reduce)
lusec = utime_since(&io_u->issue_time, &icd->time);
if (!td->o.disable_lat) {
io_u_mark_latency(td, lusec);
}
+ if (td->parent)
+ td = td->parent;
+
if (!td->o.disable_bw)
add_bw_sample(td, idx, bytes, &icd->time);
- if (!gtod_reduce(td))
+ if (no_reduce)
add_iops_sample(td, idx, bytes, &icd->time);
if (td->ts.nr_block_infos && io_u->ddir == DDIR_TRIM) {
{
uint64_t secs, remainder, bps, bytes;
+ assert(!(td->flags & TD_F_CHILD));
bytes = td->this_io_bytes[ddir];
bps = td->rate_bps[ddir];
secs = bytes / bps;
dprint_io_u(io_u, "io complete");
- td_io_u_lock(td);
assert(io_u->flags & IO_U_F_FLIGHT);
- io_u->flags &= ~(IO_U_F_FLIGHT | IO_U_F_BUSY_OK);
+ io_u_clear(io_u, IO_U_F_FLIGHT | IO_U_F_BUSY_OK);
/*
* Mark IO ok to verify
}
}
- td_io_u_unlock(td);
-
if (ddir_sync(ddir)) {
td->last_was_sync = 1;
if (f) {
if (ramp_time_over(td) && (td->runstate == TD_RUNNING ||
td->runstate == TD_VERIFYING)) {
+ struct thread_data *__td = td;
+
account_io_completion(td, io_u, icd, ddir, bytes);
- if (__should_check_rate(td, ddir)) {
- td->rate_pending_usleep[ddir] =
- (usec_for_io(td, ddir) -
- utime_since_now(&td->start));
+ if (td->parent)
+ __td = td->parent;
+
+ if (__should_check_rate(__td, ddir)) {
+ __td->rate_pending_usleep[ddir] =
+ (usec_for_io(__td, ddir) -
+ utime_since_now(&__td->start));
}
if (ddir != DDIR_TRIM &&
- __should_check_rate(td, oddir)) {
- td->rate_pending_usleep[oddir] =
- (usec_for_io(td, oddir) -
- utime_since_now(&td->start));
+ __should_check_rate(__td, oddir)) {
+ __td->rate_pending_usleep[oddir] =
+ (usec_for_io(__td, oddir) -
+ utime_since_now(&__td->start));
}
}
/*
* Complete a single io_u for the sync engines.
*/
-int io_u_sync_complete(struct thread_data *td, struct io_u *io_u,
- uint64_t *bytes)
+int io_u_sync_complete(struct thread_data *td, struct io_u *io_u)
{
struct io_completion_data icd;
+ int ddir;
init_icd(td, &icd, 1);
io_completed(td, &io_u, &icd);
return -1;
}
- if (bytes) {
- int ddir;
-
- for (ddir = DDIR_READ; ddir < DDIR_RWDIR_CNT; ddir++)
- bytes[ddir] += icd.bytes_done[ddir];
- }
+ for (ddir = DDIR_READ; ddir < DDIR_RWDIR_CNT; ddir++)
+ td->bytes_done[ddir] += icd.bytes_done[ddir];
return 0;
}
/*
* Called to complete min_events number of io for the async engines.
*/
-int io_u_queued_complete(struct thread_data *td, int min_evts,
- uint64_t *bytes)
+int io_u_queued_complete(struct thread_data *td, int min_evts)
{
struct io_completion_data icd;
struct timespec *tvp = NULL;
- int ret;
+ int ret, ddir;
struct timespec ts = { .tv_sec = 0, .tv_nsec = 0, };
dprint(FD_IO, "io_u_queued_completed: min=%d\n", min_evts);
return -1;
}
- if (bytes) {
- int ddir;
-
- for (ddir = DDIR_READ; ddir < DDIR_RWDIR_CNT; ddir++)
- bytes[ddir] += icd.bytes_done[ddir];
- }
+ for (ddir = DDIR_READ; ddir < DDIR_RWDIR_CNT; ddir++)
+ td->bytes_done[ddir] += icd.bytes_done[ddir];
return 0;
}
struct ibv_mr *mr;
#endif
void *mmap_data;
+ uint64_t null;
};
};
extern void put_io_u(struct thread_data *, struct io_u *);
extern void clear_io_u(struct thread_data *, struct io_u *);
extern void requeue_io_u(struct thread_data *, struct io_u **);
-extern int __must_check io_u_sync_complete(struct thread_data *, struct io_u *, uint64_t *);
-extern int __must_check io_u_queued_complete(struct thread_data *, int, uint64_t *);
+extern int __must_check io_u_sync_complete(struct thread_data *, struct io_u *);
+extern int __must_check io_u_queued_complete(struct thread_data *, int);
extern void io_u_queued(struct thread_data *, struct io_u *);
extern void io_u_quiesce(struct thread_data *);
extern void io_u_log_error(struct thread_data *, struct io_u *);
return io_u->ddir;
}
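+/*
+ * io_u flags can be updated by both the owning job thread and the offload
+ * submit workers, so changes go through atomic helpers rather than plain
+ * |= and &= operations.
+ */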
+static inline void io_u_clear(struct io_u *io_u, unsigned int flags)
+{
+ __sync_fetch_and_and(&io_u->flags, ~flags);
+}
+
+static inline void io_u_set(struct io_u *io_u, unsigned int flags)
+{
+ __sync_fetch_and_or(&io_u->flags, flags);
+}
+
#endif
int td_io_queue(struct thread_data *td, struct io_u *io_u)
{
+ const enum fio_ddir ddir = acct_ddir(io_u);
+ unsigned long buflen = io_u->xfer_buflen;
int ret;
dprint_io_u(io_u, "queue");
fio_ro_check(td, io_u);
assert((io_u->flags & IO_U_F_FLIGHT) == 0);
- io_u->flags |= IO_U_F_FLIGHT;
+ io_u_set(io_u, IO_U_F_FLIGHT);
assert(fio_file_open(io_u->file));
sizeof(struct timeval));
}
- if (ddir_rw(acct_ddir(io_u))) {
- td->io_issues[acct_ddir(io_u)]++;
- td->io_issue_bytes[acct_ddir(io_u)] += io_u->xfer_buflen;
+ if (ddir_rw(ddir)) {
+ td->io_issues[ddir]++;
+ td->io_issue_bytes[ddir] += buflen;
}
ret = td->io_ops->queue(td, io_u);
unlock_file(td, io_u->file);
- if (ret == FIO_Q_BUSY && ddir_rw(acct_ddir(io_u))) {
- td->io_issues[acct_ddir(io_u)]--;
- td->io_issue_bytes[acct_ddir(io_u)] -= io_u->xfer_buflen;
+ if (ret == FIO_Q_BUSY && ddir_rw(ddir)) {
+ td->io_issues[ddir]--;
+ td->io_issue_bytes[ddir] -= buflen;
}
/*
td->this_io_blocks[ddir] = 0;
td->rate_bytes[ddir] = 0;
td->rate_blocks[ddir] = 0;
+ td->bytes_done[ddir] = 0;
}
td->zone_bytes = 0;
.category = FIO_OPT_C_IO,
.group = FIO_OPT_G_IO_BASIC,
},
+ {
+ .name = "io_submit_mode",
+ .lname = "IO submit mode",
+ .type = FIO_OPT_STR,
+ .off1 = td_var_offset(io_submit_mode),
+ .help = "How IO submissions and completions are done",
+ .def = "inline",
+ .category = FIO_OPT_C_IO,
+ .group = FIO_OPT_G_IO_BASIC,
+ .posval = {
+ { .ival = "inline",
+ .oval = IO_MODE_INLINE,
+ .help = "Submit and complete IO inline",
+ },
+ { .ival = "offload",
+ .oval = IO_MODE_OFFLOAD,
+ .help = "Offload submit and complete to threads",
+ },
+ },
+ },
{
.name = "size",
.lname = "Size",
if (spent < td->o.bw_avg_time)
return;
+ td_io_u_lock(td);
+
/*
* Compute both read and write rates for the interval.
*/
}
fio_gettime(&td->bw_sample_time, NULL);
+ td_io_u_unlock(td);
}
void add_iops_sample(struct thread_data *td, enum fio_ddir ddir, unsigned int bs,
if (spent < td->o.iops_avg_time)
return;
+ td_io_u_lock(td);
+
/*
* Compute both read and write rates for the interval.
*/
}
fio_gettime(&td->iops_sample_time, NULL);
+ td_io_u_unlock(td);
}
void stat_init(void)
unsigned int rate[DDIR_RWDIR_CNT];
unsigned int ratemin[DDIR_RWDIR_CNT];
unsigned int ratecycle;
+ unsigned int io_submit_mode;
unsigned int rate_iops[DDIR_RWDIR_CNT];
unsigned int rate_iops_min[DDIR_RWDIR_CNT];
uint32_t rate[DDIR_RWDIR_CNT];
uint32_t ratemin[DDIR_RWDIR_CNT];
uint32_t ratecycle;
+ uint32_t io_submit_mode;
uint32_t rate_iops[DDIR_RWDIR_CNT];
uint32_t rate_iops_min[DDIR_RWDIR_CNT];
uint64_t latency_target;
uint64_t latency_window;
- uint32_t pad3;
fio_fp64_t latency_percentile;
uint32_t block_error_hist;
if (io_u->flags & IO_U_F_IN_CUR_DEPTH) {
td->cur_depth--;
- io_u->flags &= ~IO_U_F_IN_CUR_DEPTH;
+ io_u_clear(io_u, IO_U_F_IN_CUR_DEPTH);
}
flist_add_tail(&io_u->verify_list, &td->verify_list);
*io_u_ptr = NULL;
io_u->buflen = ipo->len;
io_u->numberio = ipo->numberio;
io_u->file = ipo->file;
- io_u->flags |= IO_U_F_VER_LIST;
+ io_u_set(io_u, IO_U_F_VER_LIST);
if (ipo->flags & IP_F_TRIMMED)
- io_u->flags |= IO_U_F_TRIMMED;
+ io_u_set(io_u, IO_U_F_TRIMMED);
if (!fio_file_open(io_u->file)) {
int r = td_io_open_file(td, io_u->file);
io_u = flist_first_entry(&list, struct io_u, verify_list);
flist_del_init(&io_u->verify_list);
- io_u->flags |= IO_U_F_NO_FILE_PUT;
+ io_u_set(io_u, IO_U_F_NO_FILE_PUT);
ret = verify_io_u(td, &io_u);
put_io_u(td, io_u);
--- /dev/null
+/*
+ * Rated submission helpers
+ *
+ * Copyright (C) 2015 Jens Axboe <axboe@kernel.dk>
+ *
+ */
+#include <unistd.h>
+
+#include "fio.h"
+#include "ioengine.h"
+#include "flist.h"
+#include "workqueue.h"
+#include "lib/getrusage.h"
+
+struct submit_worker {
+ pthread_t thread;
+ pthread_mutex_t lock;
+ pthread_cond_t cond;
+ struct flist_head work_list;
+ unsigned int flags;
+ unsigned int index;
+ uint64_t seq;
+ struct workqueue *wq;
+ struct thread_data td;
+};
+
+enum {
+ SW_F_IDLE = 1 << 0,
+ SW_F_RUNNING = 1 << 1,
+ SW_F_EXIT = 1 << 2,
+ SW_F_EXITED = 1 << 3,
+ SW_F_ACCOUNTED = 1 << 4,
+ SW_F_ERROR = 1 << 5,
+};
+
+static struct submit_worker *__get_submit_worker(struct workqueue *wq,
+ unsigned int start,
+ unsigned int end,
+ struct submit_worker **best)
+{
+ struct submit_worker *sw = NULL;
+
+ while (start <= end) {
+ sw = &wq->workers[start];
+ if (sw->flags & SW_F_IDLE)
+ return sw;
+ if (!(*best) || sw->seq < (*best)->seq)
+ *best = sw;
+ start++;
+ }
+
+ return NULL;
+}
+
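+/*
+ * Pick a worker for the next io_u: scan from the next-free hint for an
+ * idle worker, wrapping around once; if none is idle, fall back to the
+ * candidate with the lowest sequence number, i.e. the one that was handed
+ * work least recently.
+ */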
+static struct submit_worker *get_submit_worker(struct workqueue *wq)
+{
+ unsigned int next = wq->next_free_worker;
+ struct submit_worker *sw, *best = NULL;
+
+ assert(next < wq->max_workers);
+
+ sw = __get_submit_worker(wq, next, wq->max_workers - 1, &best);
+ if (!sw && next)
+ sw = __get_submit_worker(wq, 0, next - 1, &best);
+
+ /*
+ * No truly idle found, use best match
+ */
+ if (!sw)
+ sw = best;
+
+ if (sw->index == wq->next_free_worker) {
+ if (sw->index + 1 < wq->max_workers)
+ wq->next_free_worker = sw->index + 1;
+ else
+ wq->next_free_worker = 0;
+ }
+
+ return sw;
+}
+
+static int all_sw_idle(struct workqueue *wq)
+{
+ int i;
+
+ for (i = 0; i < wq->max_workers; i++) {
+ struct submit_worker *sw = &wq->workers[i];
+
+ if (!(sw->flags & SW_F_IDLE))
+ return 0;
+ }
+
+ return 1;
+}
+
+/*
+ * Must be serialized wrt workqueue_enqueue() by caller
+ */
+void workqueue_flush(struct workqueue *wq)
+{
+ wq->wake_idle = 1;
+
+	pthread_mutex_lock(&wq->flush_lock);
+	while (!all_sw_idle(wq))
+		pthread_cond_wait(&wq->flush_cond, &wq->flush_lock);
+	pthread_mutex_unlock(&wq->flush_lock);
+
+ wq->wake_idle = 0;
+}
+
+/*
+ * Must be serialized by caller.
+ */
+int workqueue_enqueue(struct workqueue *wq, struct io_u *io_u)
+{
+ struct submit_worker *sw;
+
+ sw = get_submit_worker(wq);
+ if (sw) {
+ const enum fio_ddir ddir = acct_ddir(io_u);
+ struct thread_data *parent = wq->td;
+
+ if (ddir_rw(ddir)) {
+ parent->io_issues[ddir]++;
+ parent->io_issue_bytes[ddir] += io_u->xfer_buflen;
+ }
+
+ pthread_mutex_lock(&sw->lock);
+ flist_add_tail(&io_u->verify_list, &sw->work_list);
+ sw->seq = ++wq->work_seq;
+ sw->flags &= ~SW_F_IDLE;
+ pthread_mutex_unlock(&sw->lock);
+
+ pthread_cond_signal(&sw->cond);
+ return FIO_Q_QUEUED;
+ }
+
+ return FIO_Q_BUSY;
+}
+
+static void handle_list(struct submit_worker *sw, struct flist_head *list)
+{
+ struct workqueue *wq = sw->wq;
+ struct io_u *io_u;
+
+ while (!flist_empty(list)) {
+ io_u = flist_first_entry(list, struct io_u, verify_list);
+ flist_del_init(&io_u->verify_list);
+ wq->fn(&sw->td, io_u);
+ }
+}
+
+static int init_submit_worker(struct submit_worker *sw)
+{
+ struct thread_data *parent = sw->wq->td;
+ struct thread_data *td = &sw->td;
+ int fio_unused ret;
+
+ memcpy(&td->o, &parent->o, sizeof(td->o));
+ memcpy(&td->ts, &parent->ts, sizeof(td->ts));
+ td->o.uid = td->o.gid = -1U;
+ dup_files(td, parent);
+ fio_options_mem_dupe(td);
+
+ if (ioengine_load(td))
+ goto err;
+
+ if (td->o.odirect)
+ td->io_ops->flags |= FIO_RAWIO;
+
+ td->pid = gettid();
+
+ INIT_FLIST_HEAD(&td->io_log_list);
+ INIT_FLIST_HEAD(&td->io_hist_list);
+ INIT_FLIST_HEAD(&td->verify_list);
+ INIT_FLIST_HEAD(&td->trim_list);
+ INIT_FLIST_HEAD(&td->next_rand_list);
+ td->io_hist_tree = RB_ROOT;
+
+ td->o.iodepth = 1;
+ if (td_io_init(td))
+ goto err_io_init;
+
+ fio_gettime(&td->epoch, NULL);
+ fio_getrusage(&td->ru_start);
+ clear_io_state(td);
+
+ td_set_runstate(td, TD_RUNNING);
+ td->flags |= TD_F_CHILD;
+ td->parent = parent;
+ return 0;
+
+err_io_init:
+ close_ioengine(td);
+err:
+ return 1;
+}
+
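+/*
+ * Fold a worker's counters back into the parent atomically, zeroing the
+ * source so repeated accounting passes don't count the same bytes twice.
+ */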
+static void sum_val(uint64_t *dst, uint64_t *src)
+{
+ if (*src) {
+ __sync_fetch_and_add(dst, *src);
+ *src = 0;
+ }
+}
+
+static void sum_ddir(struct thread_data *dst, struct thread_data *src,
+ enum fio_ddir ddir)
+{
+ sum_val(&dst->io_bytes[ddir], &src->io_bytes[ddir]);
+ sum_val(&dst->io_blocks[ddir], &src->io_blocks[ddir]);
+ sum_val(&dst->this_io_blocks[ddir], &src->this_io_blocks[ddir]);
+ sum_val(&dst->this_io_bytes[ddir], &src->this_io_bytes[ddir]);
+ sum_val(&dst->bytes_done[ddir], &src->bytes_done[ddir]);
+}
+
+static void update_accounting(struct submit_worker *sw)
+{
+ struct thread_data *src = &sw->td;
+ struct thread_data *dst = sw->wq->td;
+
+ if (td_read(src))
+ sum_ddir(dst, src, DDIR_READ);
+ if (td_write(src))
+ sum_ddir(dst, src, DDIR_WRITE);
+ if (td_trim(src))
+ sum_ddir(dst, src, DDIR_TRIM);
+}
+
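+/*
+ * Main loop for a submit worker: splice queued io_us to a local list and
+ * run them through wq->fn(). When the work list is empty, quiesce any
+ * in-flight IO, update accounting, flag ourselves SW_F_IDLE and sleep on
+ * the condvar until more work arrives or an exit is requested.
+ */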
+static void *worker_thread(void *data)
+{
+ struct submit_worker *sw = data;
+ struct workqueue *wq = sw->wq;
+ struct thread_data *td = &sw->td;
+ unsigned int eflags = 0, ret;
+ FLIST_HEAD(local_list);
+
+ ret = init_submit_worker(sw);
+ pthread_mutex_lock(&sw->lock);
+ sw->flags |= SW_F_RUNNING;
+ if (ret)
+ sw->flags |= SW_F_ERROR;
+ pthread_mutex_unlock(&sw->lock);
+
+ pthread_mutex_lock(&wq->flush_lock);
+ pthread_cond_signal(&wq->flush_cond);
+ pthread_mutex_unlock(&wq->flush_lock);
+
+ if (sw->flags & SW_F_ERROR)
+ goto done;
+
+ while (1) {
+ pthread_mutex_lock(&sw->lock);
+
+ if (flist_empty(&sw->work_list)) {
+ if (sw->flags & SW_F_EXIT) {
+ pthread_mutex_unlock(&sw->lock);
+ break;
+ }
+
+ if (td->io_u_queued || td->cur_depth ||
+ td->io_u_in_flight) {
+ pthread_mutex_unlock(&sw->lock);
+ io_u_quiesce(td);
+ pthread_mutex_lock(&sw->lock);
+ }
+
+ /*
+			 * We dropped and reacquired the lock, check
+ * state again.
+ */
+ if (!flist_empty(&sw->work_list))
+ goto handle_work;
+
+ if (sw->flags & SW_F_EXIT) {
+ pthread_mutex_unlock(&sw->lock);
+ break;
+ } else if (!(sw->flags & SW_F_IDLE)) {
+ sw->flags |= SW_F_IDLE;
+ wq->next_free_worker = sw->index;
+ if (wq->wake_idle)
+ pthread_cond_signal(&wq->flush_cond);
+ }
+ update_accounting(sw);
+ pthread_cond_wait(&sw->cond, &sw->lock);
+ } else {
+handle_work:
+ flist_splice_init(&sw->work_list, &local_list);
+ }
+ pthread_mutex_unlock(&sw->lock);
+ handle_list(sw, &local_list);
+ }
+
+ update_accounting(sw);
+
+done:
+ pthread_mutex_lock(&sw->lock);
+ sw->flags |= (SW_F_EXITED | eflags);
+ pthread_mutex_unlock(&sw->lock);
+ return NULL;
+}
+
+static void free_worker(struct submit_worker *sw)
+{
+ struct thread_data *td = &sw->td;
+
+ fio_options_free(td);
+ close_and_free_files(td);
+ if (td->io_ops)
+ close_ioengine(td);
+ td_set_runstate(td, TD_EXITED);
+
+ pthread_cond_destroy(&sw->cond);
+ pthread_mutex_destroy(&sw->lock);
+}
+
+static void shutdown_worker(struct submit_worker *sw, unsigned int *sum_cnt)
+{
+ struct thread_data *parent = sw->wq->td;
+
+ pthread_join(sw->thread, NULL);
+ (*sum_cnt)++;
+ sum_thread_stats(&parent->ts, &sw->td.ts, *sum_cnt);
+ free_worker(sw);
+}
+
+void workqueue_exit(struct workqueue *wq)
+{
+ unsigned int shutdown, sum_cnt = 0;
+ struct submit_worker *sw;
+ int i;
+
+ for (i = 0; i < wq->max_workers; i++) {
+ sw = &wq->workers[i];
+
+ pthread_mutex_lock(&sw->lock);
+ sw->flags |= SW_F_EXIT;
+ pthread_cond_signal(&sw->cond);
+ pthread_mutex_unlock(&sw->lock);
+ }
+
+ do {
+ shutdown = 0;
+ for (i = 0; i < wq->max_workers; i++) {
+ sw = &wq->workers[i];
+ if (sw->flags & SW_F_ACCOUNTED)
+ continue;
+ sw->flags |= SW_F_ACCOUNTED;
+ shutdown_worker(sw, &sum_cnt);
+ shutdown++;
+ }
+ } while (shutdown && shutdown != wq->max_workers);
+
+ free(wq->workers);
+ pthread_mutex_destroy(&wq->flush_lock);
+ pthread_cond_destroy(&wq->flush_cond);
+}
+
+static int start_worker(struct workqueue *wq, unsigned int index)
+{
+ struct submit_worker *sw = &wq->workers[index];
+ int ret;
+
+ INIT_FLIST_HEAD(&sw->work_list);
+ pthread_cond_init(&sw->cond, NULL);
+ pthread_mutex_init(&sw->lock, NULL);
+ sw->wq = wq;
+ sw->index = index;
+
+ ret = pthread_create(&sw->thread, NULL, worker_thread, sw);
+ if (!ret) {
+ pthread_mutex_lock(&sw->lock);
+ sw->flags = SW_F_IDLE;
+ pthread_mutex_unlock(&sw->lock);
+ return 0;
+ }
+
+ free_worker(sw);
+ return 1;
+}
+
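+/*
+ * Spawn the submit workers and wait for each of them to report that it
+ * is running (or that its init failed) before returning to the caller.
+ */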
+int workqueue_init(struct thread_data *td, struct workqueue *wq,
+		   workqueue_fn *fn, unsigned int max_pending)
+{
+ unsigned int running;
+ int i, error;
+
+ wq->max_workers = max_pending;
+ wq->td = td;
+ wq->fn = fn;
+ wq->work_seq = 0;
+ wq->next_free_worker = 0;
+ pthread_cond_init(&wq->flush_cond, NULL);
+ pthread_mutex_init(&wq->flush_lock, NULL);
+
+ wq->workers = calloc(wq->max_workers, sizeof(struct submit_worker));
+
+ for (i = 0; i < wq->max_workers; i++)
+ if (start_worker(wq, i))
+ break;
+
+ wq->max_workers = i;
+ if (!wq->max_workers) {
+err:
+ log_err("Can't create rate workqueue\n");
+ td_verror(td, ESRCH, "workqueue_init");
+ workqueue_exit(wq);
+ return 1;
+ }
+
+ /*
+ * Wait for them all to be started and initialized
+ */
+ error = 0;
+ do {
+ struct submit_worker *sw;
+
+ running = 0;
+ pthread_mutex_lock(&wq->flush_lock);
+ for (i = 0; i < wq->max_workers; i++) {
+ sw = &wq->workers[i];
+ pthread_mutex_lock(&sw->lock);
+ if (sw->flags & SW_F_RUNNING)
+ running++;
+ if (sw->flags & SW_F_ERROR)
+ error++;
+ pthread_mutex_unlock(&sw->lock);
+ }
+
+ if (error || running == wq->max_workers) {
+ pthread_mutex_unlock(&wq->flush_lock);
+ break;
+ }
+
+ pthread_cond_wait(&wq->flush_cond, &wq->flush_lock);
+ pthread_mutex_unlock(&wq->flush_lock);
+ } while (1);
+
+ if (error)
+ goto err;
+
+ return 0;
+}
--- /dev/null
+#ifndef FIO_RATE_H
+#define FIO_RATE_H
+
+#include "flist.h"
+
+typedef void (workqueue_fn)(struct thread_data *, struct io_u *);
+
+struct workqueue {
+ unsigned int max_workers;
+
+ struct thread_data *td;
+ workqueue_fn *fn;
+
+ uint64_t work_seq;
+ struct submit_worker *workers;
+ unsigned int next_free_worker;
+
+ pthread_cond_t flush_cond;
+ pthread_mutex_t flush_lock;
+ volatile int wake_idle;
+};
+
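+/*
+ * Lifecycle sketch: workqueue_init() spawns max_workers submit threads,
+ * workqueue_enqueue() hands an io_u to an idle (or least recently used)
+ * worker, workqueue_flush() waits for all workers to drain and go idle,
+ * and workqueue_exit() reaps the threads and folds their stats back into
+ * the parent thread_data.
+ */
+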
+int workqueue_init(struct thread_data *td, struct workqueue *wq, workqueue_fn *fn, unsigned int max_workers);
+void workqueue_exit(struct workqueue *wq);
+
+int workqueue_enqueue(struct workqueue *wq, struct io_u *io_u);
+void workqueue_flush(struct workqueue *wq);
+
+#endif