before quitting on a block verification failure. If this
option is set, fio will exit the job on the first observed
failure.
+
+verify_async=int Fio will normally verify IO inline from the submitting
+ thread. This option takes an integer describing how many
+ async offload threads to create for IO verification instead,
+ causing fio to offload the duty of verifying IO contents
+ to one or more separate threads.
+
+verify_async_cpus=str Tell fio to set the given CPU affinity on the
+ async IO verification threads. See cpus_allowed for the
+ format used.
stonewall Wait for preceding jobs in the job file to exit, before
starting this one. Can be used to insert serialization
extern int add_file(struct thread_data *, const char *);
extern void get_file(struct fio_file *);
extern int __must_check put_file(struct thread_data *, struct fio_file *);
+extern void put_file_log(struct thread_data *, struct fio_file *);
extern void lock_file(struct thread_data *, struct fio_file *, enum fio_ddir);
extern void unlock_file(struct thread_data *, struct fio_file *);
extern void unlock_file_all(struct thread_data *, struct fio_file *);
If true, exit the job on the first observed verification failure. Default:
false.
.TP
+.BI verify_async \fR=\fPint
+Fio will normally verify IO inline from the submitting thread. This option
+takes an integer describing how many async offload threads to create for IO
+verification instead, causing fio to offload the duty of verifying IO contents
+to one or more separate threads.
+.TP
+.BI verify_async_cpus \fR=\fPstr
+Tell fio to set the given CPU affinity on the async IO verification threads.
+See \fBcpus_allowed\fP for the format used.
+.TP
.B stonewall
Wait for preceding jobs in the job file to exit before starting this one.
\fBstonewall\fR implies \fBnew_group\fR.
break;
}
- io_u->end_io = verify_io_u;
+ if (td->o.verify_async)
+ io_u->end_io = verify_io_u_async;
+ else
+ io_u->end_io = verify_io_u;
ret = td_io_queue(td, io_u);
switch (ret) {
* a previously written file.
*/
if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_READ) {
- io_u->end_io = verify_io_u;
+ if (td->o.verify_async)
+ io_u->end_io = verify_io_u_async;
+ else
+ io_u->end_io = verify_io_u;
td_set_runstate(td, TD_VERIFYING);
} else if (in_ramp_time(td))
td_set_runstate(td, TD_RAMP);
{
unsigned long long runtime[2], elapsed;
struct thread_data *td = data;
+ pthread_condattr_t attr;
int clear_state;
if (!td->o.use_thread)
INIT_FLIST_HEAD(&td->io_u_requeues);
INIT_FLIST_HEAD(&td->io_log_list);
INIT_FLIST_HEAD(&td->io_hist_list);
+ INIT_FLIST_HEAD(&td->verify_list);
+ pthread_mutex_init(&td->io_u_lock, NULL);
td->io_hist_tree = RB_ROOT;
+ pthread_condattr_init(&attr);
+ pthread_cond_init(&td->verify_cond, &attr);
+ pthread_cond_init(&td->free_cond, &attr);
+
td_set_runstate(td, TD_INITIALIZED);
dprint(FD_MUTEX, "up startup_mutex\n");
fio_mutex_up(startup_mutex);
if (init_io_u(td))
goto err;
- if (td->o.cpumask_set && fio_setaffinity(td) == -1) {
+ if (td->o.verify_async && verify_async_init(td))
+ goto err;
+
+ if (td->o.cpumask_set && fio_setaffinity(td->pid, td->o.cpumask) == -1) {
td_verror(td, errno, "cpu_set_affinity");
goto err;
}
*/
if (td->o.gtod_cpu) {
fio_cpu_clear(&td->o.cpumask, td->o.gtod_cpu);
- if (fio_setaffinity(td) == -1) {
+ if (fio_setaffinity(td->pid, td->o.cpumask) == -1) {
td_verror(td, errno, "cpu_set_affinity");
goto err;
}
td_verror(td, ret, "fio_cpuset_exit");
}
+ if (td->o.verify_async)
+ verify_async_exit(td);
+
/*
* do this very late, it will log file closing as well
*/
unsigned int verify_pattern;
unsigned int verify_pattern_bytes;
unsigned int verify_fatal;
+ unsigned int verify_async;
unsigned int use_thread;
unsigned int unlink;
unsigned int do_disk_util;
unsigned int numjobs;
os_cpu_mask_t cpumask;
unsigned int cpumask_set;
+ os_cpu_mask_t verify_cpumask;
+ unsigned int verify_cpumask_set;
unsigned int iolog;
unsigned int rwmixcycle;
unsigned int rwmix[2];
struct flist_head io_u_freelist;
struct flist_head io_u_busylist;
struct flist_head io_u_requeues;
+ pthread_mutex_t io_u_lock;
+ pthread_cond_t free_cond;
+
+ /*
+ * async verify offload
+ */
+ struct flist_head verify_list;
+ pthread_t *verify_threads;
+ unsigned int nr_verify_threads;
+ pthread_cond_t verify_cond;
+ int verify_thread_exit;
/*
* Rate state
return (val != 0 && ((val & (val - 1)) == 0));
}
+/*
+ * We currently only need to do locking if we have verifier threads
+ * accessing our internal structures too
+ */
+static inline void td_io_u_lock(struct thread_data *td)
+{
+ if (td->o.verify_async)
+ pthread_mutex_lock(&td->io_u_lock);
+}
+
+/* Counterpart to td_io_u_lock(); likewise a no-op unless async verify is on */
+static inline void td_io_u_unlock(struct thread_data *td)
+{
+ if (td->o.verify_async)
+ pthread_mutex_unlock(&td->io_u_lock);
+}
+
+/*
+ * Wake one waiter on ->free_cond, signalling that an io_u was returned
+ * to the free list (see the wait loop in get_io_u()).
+ */
+static inline void td_io_u_free_notify(struct thread_data *td)
+{
+ if (td->o.verify_async)
+ pthread_cond_signal(&td->free_cond);
+}
+
#endif
return td->rwmix_ddir;
}
-static void put_file_log(struct thread_data *td, struct fio_file *f)
+void put_file_log(struct thread_data *td, struct fio_file *f)
{
int ret = put_file(td, f);
+/*
+ * Return an io_u to the free list. With verify_async the free list is
+ * shared with the verifier threads, so the list manipulation happens
+ * under ->io_u_lock and any waiter in get_io_u() is notified afterwards.
+ */
void put_io_u(struct thread_data *td, struct io_u *io_u)
{
+ td_io_u_lock(td);
+
 assert((io_u->flags & IO_U_F_FREE) == 0);
 io_u->flags |= IO_U_F_FREE;
+ io_u->flags &= ~IO_U_F_FREE_DEF;
 if (io_u->file)
 put_file_log(td, io_u->file);
 io_u->file = NULL;
- flist_del(&io_u->list);
+ flist_del_init(&io_u->list);
 flist_add(&io_u->list, &td->io_u_freelist);
 td->cur_depth--;
+ td_io_u_unlock(td);
+ td_io_u_free_notify(td);
}
void clear_io_u(struct thread_data *td, struct io_u *io_u)
dprint(FD_IO, "requeue %p\n", __io_u);
+ td_io_u_lock(td);
+
__io_u->flags |= IO_U_F_FREE;
if ((__io_u->flags & IO_U_F_FLIGHT) && !ddir_sync(__io_u->ddir))
td->io_issues[__io_u->ddir]--;
flist_del(&__io_u->list);
flist_add_tail(&__io_u->list, &td->io_u_requeues);
td->cur_depth--;
+ td_io_u_unlock(td);
*io_u = NULL;
}
{
struct io_u *io_u = NULL;
+ td_io_u_lock(td);
+
+again:
if (!flist_empty(&td->io_u_requeues))
io_u = flist_entry(td->io_u_requeues.next, struct io_u, list);
else if (!queue_full(td)) {
io_u->end_io = NULL;
}
+ /*
+ * We ran out, wait for async verify threads to finish and return one
+ */
+ if (!io_u && td->o.verify_async) {
+ pthread_cond_wait(&td->free_cond, &td->io_u_lock);
+ goto again;
+ }
+
if (io_u) {
assert(io_u->flags & IO_U_F_FREE);
io_u->flags &= ~IO_U_F_FREE;
+ io_u->flags &= ~IO_U_F_FREE_DEF;
io_u->error = 0;
flist_del(&io_u->list);
td->cur_depth++;
}
+ td_io_u_unlock(td);
return io_u;
}
io_u = td->io_ops->event(td, i);
io_completed(td, io_u, icd);
- put_io_u(td, io_u);
+
+ if (!(io_u->flags & IO_U_F_FREE_DEF))
+ put_io_u(td, io_u);
}
}
init_icd(td, &icd, 1);
io_completed(td, io_u, &icd);
- put_io_u(td, io_u);
+
+ if (!(io_u->flags & IO_U_F_FREE_DEF))
+ put_io_u(td, io_u);
if (icd.error) {
td_verror(td, icd.error, "io_u_sync_complete");
enum {
 IO_U_F_FREE = 1 << 0, /* io_u is on the free list */
 IO_U_F_FLIGHT = 1 << 1, /* io_u has been submitted */
+ IO_U_F_FREE_DEF = 1 << 2, /* free is deferred to an async verify thread */
};
/*
return 0;
}
-static int str_cpus_allowed_cb(void *data, const char *input)
+static int set_cpus_allowed(struct thread_data *td, os_cpu_mask_t *mask,
+ const char *input)
{
- struct thread_data *td = data;
char *cpu, *str, *p;
long max_cpu;
int ret = 0;
- ret = fio_cpuset_init(&td->o.cpumask);
+ ret = fio_cpuset_init(mask);
if (ret < 0) {
log_err("fio: cpuset_init failed\n");
td_verror(td, ret, "fio_cpuset_init");
}
dprint(FD_PARSE, "set cpu allowed %d\n", icpu);
- fio_cpu_set(&td->o.cpumask, icpu);
+ fio_cpu_set(mask, icpu);
icpu++;
}
if (ret)
td->o.cpumask_set = 1;
return ret;
}
+
+
+/* Option parse callback: apply the "cpus_allowed" string to the job's mask */
+static int str_cpus_allowed_cb(void *data, const char *input)
+{
+ struct thread_data *td = data;
+ int ret;
+
+ ret = set_cpus_allowed(td, &td->o.cpumask, input);
+ if (!ret)
+ td->o.cpumask_set = 1;
+
+ return ret;
+}
+
+/*
+ * Option parse callback: apply "verify_async_cpus" to the mask used by
+ * the async verify offload threads (same format as cpus_allowed).
+ */
+static int str_verify_cpus_allowed_cb(void *data, const char *input)
+{
+ struct thread_data *td = data;
+ int ret;
+
+ ret = set_cpus_allowed(td, &td->o.verify_cpumask, input);
+ if (!ret)
+ td->o.verify_cpumask_set = 1;
+
+ return ret;
+}
#endif
static int str_fst_cb(void *data, const char *str)
.help = "Exit on a single verify failure, don't continue",
.parent = "verify",
},
+ {
+ .name = "verify_async",
+ .type = FIO_OPT_INT,
+ .off1 = td_var_offset(verify_async),
+ .def = "0",
+ .help = "Number of async verifier threads to use",
+ .parent = "verify",
+ },
+#ifdef FIO_HAVE_CPU_AFFINITY
+ {
+ .name = "verify_async_cpus",
+ .type = FIO_OPT_STR,
+ .cb = str_verify_cpus_allowed_cb,
+ .help = "Set CPUs allowed for async verify threads",
+ .parent = "verify_async",
+ },
+#endif
{
.name = "write_iolog",
.type = FIO_OPT_STR_STORE,
* the affinity helpers to work.
*/
#ifndef GLIBC_2_3_2
-#define fio_setaffinity(td) \
- sched_setaffinity((td)->pid, sizeof((td)->o.cpumask), &(td)->o.cpumask)
+#define fio_setaffinity(pid, cpumask) \
+ sched_setaffinity((pid), sizeof(cpumask), &(cpumask))
#define fio_getaffinity(pid, ptr) \
sched_getaffinity((pid), sizeof(cpu_set_t), (ptr))
#else
-#define fio_setaffinity(td) \
- sched_setaffinity((td)->pid, &(td)->o.cpumask)
+#define fio_setaffinity(pid, cpumask) \
+ sched_setaffinity((pid), &(cpumask))
#define fio_getaffinity(pid, ptr) \
sched_getaffinity((pid), (ptr))
#endif
/*
* pset binding hooks for fio
*/
-#define fio_setaffinity(td) \
- pset_bind((td)->o.cpumask, P_PID, (td)->pid, NULL)
+#define fio_setaffinity(pid, cpumask) \
+ pset_bind(&(cpumask), P_PID, (pid), NULL)
#define fio_getaffinity(pid, ptr) ({ 0; })
#define fio_cpu_clear(mask, cpu) pset_assign(PS_NONE, (cpu), NULL)
#endif /* FIO_HAVE_FADVISE */
#ifndef FIO_HAVE_CPU_AFFINITY
-#define fio_setaffinity(td) (0)
+#define fio_setaffinity(pid, mask) (0)
#define fio_getaffinity(pid, mask) do { } while (0)
#define fio_cpu_clear(mask, cpu) do { } while (0)
#define fio_cpuset_exit(mask) (-1)
#include <fcntl.h>
#include <string.h>
#include <assert.h>
+#include <pthread.h>
#include "fio.h"
#include "verify.h"
+#include "smalloc.h"
#include "crc/md5.h"
#include "crc/crc64.h"
return 0;
}
+/*
+ * Push IO verification to a separate thread: hand the io_u over to the
+ * ->verify_list and wake a verifier thread. IO_U_F_FREE_DEF tells the
+ * completion path that freeing this io_u is deferred to the verifier.
+ */
+int verify_io_u_async(struct thread_data *td, struct io_u *io_u)
+{
+ if (io_u->file)
+ put_file_log(td, io_u->file);
+
+ io_u->file = NULL;
+
+ /*
+ * Mark the io_u deferred BEFORE it becomes visible on ->verify_list:
+ * a fast verifier could otherwise grab it and put_io_u() it (clearing
+ * the flag) before we got around to setting it, leaving a stale
+ * IO_U_F_FREE_DEF on an already-freed io_u.
+ */
+ io_u->flags |= IO_U_F_FREE_DEF;
+ pthread_mutex_lock(&td->io_u_lock);
+ flist_del(&io_u->list);
+ flist_add_tail(&io_u->list, &td->verify_list);
+ pthread_mutex_unlock(&td->io_u_lock);
+
+ pthread_cond_signal(&td->verify_cond);
+ return 0;
+}
+
int verify_io_u(struct thread_data *td, struct io_u *io_u)
{
struct verify_header *hdr;
dprint(FD_VERIFY, "get_next_verify: empty\n");
return 1;
}
+
+/*
+ * Worker for async verify offload: pull io_u's off ->verify_list,
+ * verify their contents, and return them to the free list. Exits when
+ * ->verify_thread_exit is set or a verification/wait error occurs.
+ */
+static void *verify_async_thread(void *data)
+{
+ struct thread_data *td = data;
+ struct io_u *io_u;
+ int ret = 0;
+
+ if (td->o.verify_cpumask_set &&
+ fio_setaffinity(td->pid, td->o.verify_cpumask)) {
+ log_err("fio: failed setting verify thread affinity\n");
+ goto done;
+ }
+
+ do {
+ read_barrier();
+ if (td->verify_thread_exit)
+ break;
+
+ pthread_mutex_lock(&td->io_u_lock);
+
+ while (flist_empty(&td->verify_list) &&
+ !td->verify_thread_exit) {
+ ret = pthread_cond_wait(&td->verify_cond, &td->io_u_lock);
+ if (ret) {
+ /*
+ * We dropped the mutex here; bail out
+ * completely rather than fall through to
+ * the unlock below, which would unlock an
+ * already-unlocked mutex.
+ */
+ pthread_mutex_unlock(&td->io_u_lock);
+ goto done;
+ }
+ }
+
+ if (flist_empty(&td->verify_list)) {
+ pthread_mutex_unlock(&td->io_u_lock);
+ continue;
+ }
+
+ io_u = flist_entry(td->verify_list.next, struct io_u, list);
+ flist_del_init(&io_u->list);
+ pthread_mutex_unlock(&td->io_u_lock);
+
+ ret = verify_io_u(td, io_u);
+ put_io_u(td, io_u);
+ } while (!ret);
+
+done:
+ /* Tell verify_async_exit() we are gone */
+ pthread_mutex_lock(&td->io_u_lock);
+ td->nr_verify_threads--;
+ pthread_mutex_unlock(&td->io_u_lock);
+
+ pthread_cond_signal(&td->free_cond);
+ return NULL;
+}
+
+/*
+ * Spawn td->o.verify_async detached verifier threads. Returns 0 on
+ * success; on any failure the already-started threads are told to exit
+ * and 1 is returned.
+ */
+int verify_async_init(struct thread_data *td)
+{
+ int i, ret;
+
+ td->verify_thread_exit = 0;
+
+ td->verify_threads = malloc(sizeof(pthread_t) * td->o.verify_async);
+ if (!td->verify_threads) {
+ log_err("fio: can't allocate %d verify threads\n",
+ td->o.verify_async);
+ return 1;
+ }
+
+ for (i = 0; i < td->o.verify_async; i++) {
+ ret = pthread_create(&td->verify_threads[i], NULL,
+ verify_async_thread, td);
+ if (ret) {
+ log_err("fio: async verify creation failed: %s\n",
+ strerror(ret));
+ break;
+ }
+ ret = pthread_detach(td->verify_threads[i]);
+ if (ret) {
+ log_err("fio: async verify thread detach failed: %s\n",
+ strerror(ret));
+ break;
+ }
+ td->nr_verify_threads++;
+ }
+
+ if (i != td->o.verify_async) {
+ /* Partial failure: wake whatever did start so it exits */
+ td->verify_thread_exit = 1;
+ write_barrier();
+ pthread_cond_broadcast(&td->verify_cond);
+ return 1;
+ }
+
+ return 0;
+}
+
+/*
+ * Tear down the async verify threads: flag exit, wake them all, then
+ * wait on ->free_cond until every thread has decremented
+ * ->nr_verify_threads before freeing the thread array.
+ */
+void verify_async_exit(struct thread_data *td)
+{
+ td->verify_thread_exit = 1;
+ /* make the exit flag visible before waking the threads */
+ write_barrier();
+ pthread_cond_broadcast(&td->verify_cond);
+
+ pthread_mutex_lock(&td->io_u_lock);
+
+ while (td->nr_verify_threads)
+ pthread_cond_wait(&td->free_cond, &td->io_u_lock);
+
+ pthread_mutex_unlock(&td->io_u_lock);
+ free(td->verify_threads);
+ td->verify_threads = NULL;
+}
extern void populate_verify_io_u(struct thread_data *, struct io_u *);
extern int __must_check get_next_verify(struct thread_data *td, struct io_u *);
extern int __must_check verify_io_u(struct thread_data *, struct io_u *);
+extern int verify_io_u_async(struct thread_data *, struct io_u *);
+
+/*
+ * Async verify offload
+ */
+extern int verify_async_init(struct thread_data *);
+extern void verify_async_exit(struct thread_data *);
#endif