X-Git-Url: https://git.kernel.dk/?p=fio.git;a=blobdiff_plain;f=backend.c;h=3df0133f3e56d1c2648580fccaf1b870847992f4;hp=c0b446ac8d98d706fdb042e64e51715343c79f73;hb=3c6c5a29a74cf5727c0572b82907c330c38cea61;hpb=8a1db9a16e1075498e5d6166fa46b419cecd8af8 diff --git a/backend.c b/backend.c index c0b446ac..3df0133f 100644 --- a/backend.c +++ b/backend.c @@ -418,6 +418,34 @@ static void check_update_rusage(struct thread_data *td) } } +static int wait_for_completions(struct thread_data *td, struct timeval *time, + uint64_t *bytes_done) +{ + const int full = queue_full(td); + int min_evts = 0; + int ret; + + /* + * if the queue is full, we MUST reap at least 1 event + */ + min_evts = min(td->o.iodepth_batch_complete, td->cur_depth); + if (full && !min_evts) + min_evts = 1; + + if (time && (__should_check_rate(td, DDIR_READ) || + __should_check_rate(td, DDIR_WRITE) || + __should_check_rate(td, DDIR_TRIM))) + fio_gettime(time, NULL); + + do { + ret = io_u_queued_complete(td, min_evts, bytes_done); + if (ret < 0) + break; + } while (full && (td->cur_depth > td->o.iodepth_low)); + + return ret; +} + /* * The main verify engine. Runs over the writes we previously submitted, * reads the blocks back in, and checks the crc/md5 of the data. @@ -526,6 +554,11 @@ static void do_verify(struct thread_data *td, uint64_t verify_bytes) break; } + if (verify_state_should_stop(td, io_u)) { + put_io_u(td, io_u); + break; + } + if (td->o.verify_async) io_u->end_io = verify_io_u_async; else @@ -594,27 +627,9 @@ sync_done: */ reap: full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth); - if (full || !td->o.iodepth_batch_complete) { - min_events = min(td->o.iodepth_batch_complete, - td->cur_depth); - /* - * if the queue is full, we MUST reap at least 1 event - */ - if (full && !min_events) - min_events = 1; + if (full || !td->o.iodepth_batch_complete) + ret = wait_for_completions(td, NULL, bytes_done); - do { - /* - * Reap required number of io units, if any, - * and do the verification on them through - * the callback handler - */ - if (io_u_queued_complete(td, min_events, bytes_done) < 0) { - ret = -1; - break; - } - } while (full && (td->cur_depth > td->o.iodepth_low)); - } if (ret < 0) break; } @@ -702,7 +717,6 @@ static uint64_t do_io(struct thread_data *td) (!flist_empty(&td->trim_list)) || !io_bytes_exceeded(td) || td->o.time_based) { struct timeval comp_time; - int min_evts = 0; struct io_u *io_u; int ret2, full; enum fio_ddir ddir; @@ -758,6 +772,11 @@ static uint64_t do_io(struct thread_data *td) io_u->rand_seed *= __rand(&td->verify_state); } + if (verify_state_should_stop(td, io_u)) { + put_io_u(td, io_u); + break; + } + if (td->o.verify_async) io_u->end_io = verify_io_u_async; else @@ -861,28 +880,8 @@ sync_done: */ reap: full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth); - if (full || !td->o.iodepth_batch_complete) { - min_evts = min(td->o.iodepth_batch_complete, - td->cur_depth); - /* - * if the queue is full, we MUST reap at least 1 event - */ - if (full && !min_evts) - min_evts = 1; - - if (__should_check_rate(td, DDIR_READ) || - __should_check_rate(td, DDIR_WRITE) || - __should_check_rate(td, DDIR_TRIM)) - fio_gettime(&comp_time, NULL); - - do { - ret = io_u_queued_complete(td, min_evts, bytes_done); - if (ret < 0) - break; - - } while (full && (td->cur_depth > td->o.iodepth_low)); - } - + if (full || !td->o.iodepth_batch_complete) + ret = wait_for_completions(td, &comp_time, bytes_done); if (ret < 0) break; if (!ddir_rw_sum(bytes_done) && !(td->io_ops->flags & FIO_NOIO)) @@ -977,6 +976,9 @@ static void cleanup_io_u(struct thread_data *td) io_u_rexit(&td->io_u_requeues); io_u_qexit(&td->io_u_freelist); io_u_qexit(&td->io_u_all); + + if (td->last_write_comp) + sfree(td->last_write_comp); } static int init_io_u(struct thread_data *td) @@ -1093,6 +1095,14 @@ static int init_io_u(struct thread_data *td) p += max_bs; } + if (td->o.verify != VERIFY_NONE) { + td->last_write_comp = scalloc(max_units, sizeof(uint64_t)); + if (!td->last_write_comp) { + log_err("fio: failed to alloc write comp data\n"); + return 1; + } + } + return 0; } @@ -1324,7 +1334,7 @@ static void *thread_main(void *data) * Set affinity first, in case it has an impact on the memory * allocations. */ - if (o->cpumask_set) { + if (fio_option_is_set(o, cpumask)) { if (o->cpus_allowed_policy == FIO_CPUS_SPLIT) { ret = fio_cpus_split(&o->cpumask, td->thread_number - 1); if (!ret) { @@ -1343,7 +1353,8 @@ static void *thread_main(void *data) #ifdef CONFIG_LIBNUMA /* numa node setup */ - if (o->numa_cpumask_set || o->numa_memmask_set) { + if (fio_option_is_set(o, numa_cpunodes) || + fio_option_is_set(o, numa_memnodes)) { struct bitmask *mask; if (numa_available() < 0) { @@ -1351,7 +1362,7 @@ static void *thread_main(void *data) goto err; } - if (o->numa_cpumask_set) { + if (fio_option_is_set(o, numa_cpunodes)) { mask = numa_parse_nodestring(o->numa_cpunodes); ret = numa_run_on_node_mask(mask); numa_free_nodemask(mask); @@ -1362,8 +1373,7 @@ static void *thread_main(void *data) } } - if (o->numa_memmask_set) { - + if (fio_option_is_set(o, numa_memnodes)) { mask = NULL; if (o->numa_memnodes) mask = numa_parse_nodestring(o->numa_memnodes); @@ -1409,7 +1419,8 @@ static void *thread_main(void *data) if (o->verify_async && verify_async_init(td)) goto err; - if (o->ioprio) { + if (fio_option_is_set(o, ioprio) || + fio_option_is_set(o, ioprio_class)) { ret = ioprio_set(IOPRIO_WHO_PROCESS, 0, o->ioprio_class, o->ioprio); if (ret == -1) { td_verror(td, errno, "ioprio_set"); @@ -1484,18 +1495,21 @@ static void *thread_main(void *data) clear_state = 1; + fio_mutex_down(stat_mutex); if (td_read(td) && td->io_bytes[DDIR_READ]) { - elapsed = utime_since_now(&td->start); + elapsed = mtime_since_now(&td->start); td->ts.runtime[DDIR_READ] += elapsed; } if (td_write(td) && td->io_bytes[DDIR_WRITE]) { - elapsed = utime_since_now(&td->start); + elapsed = mtime_since_now(&td->start); td->ts.runtime[DDIR_WRITE] += elapsed; } if (td_trim(td) && td->io_bytes[DDIR_TRIM]) { - elapsed = utime_since_now(&td->start); + elapsed = mtime_since_now(&td->start); td->ts.runtime[DDIR_TRIM] += elapsed; } + fio_gettime(&td->start, NULL); + fio_mutex_up(stat_mutex); if (td->error || td->terminate) break; @@ -1511,21 +1525,33 @@ static void *thread_main(void *data) do_verify(td, verify_bytes); - td->ts.runtime[DDIR_READ] += utime_since_now(&td->start); + fio_mutex_down(stat_mutex); + td->ts.runtime[DDIR_READ] += mtime_since_now(&td->start); + fio_gettime(&td->start, NULL); + fio_mutex_up(stat_mutex); if (td->error || td->terminate) break; } update_rusage_stat(td); - td->ts.runtime[DDIR_READ] = (td->ts.runtime[DDIR_READ] + 999) / 1000; - td->ts.runtime[DDIR_WRITE] = (td->ts.runtime[DDIR_WRITE] + 999) / 1000; - td->ts.runtime[DDIR_TRIM] = (td->ts.runtime[DDIR_TRIM] + 999) / 1000; td->ts.total_run_time = mtime_since_now(&td->epoch); td->ts.io_bytes[DDIR_READ] = td->io_bytes[DDIR_READ]; td->ts.io_bytes[DDIR_WRITE] = td->io_bytes[DDIR_WRITE]; td->ts.io_bytes[DDIR_TRIM] = td->io_bytes[DDIR_TRIM]; + if (td->o.verify_state_save && !(td->flags & TD_F_VSTATE_SAVED) && + (td->o.verify != VERIFY_NONE && td_write(td))) { + struct all_io_list *state; + size_t sz; + + state = get_all_io_list(td->thread_number, &sz); + if (state) { + __verify_save_state(state, "local"); + free(state); + } + } + fio_unpin_memory(td); fio_writeout_logs(td); @@ -1551,8 +1577,9 @@ err: cleanup_io_u(td); close_ioengine(td); cgroup_shutdown(td, &cgroup_mnt); + verify_free_state(td); - if (o->cpumask_set) { + if (fio_option_is_set(o, cpumask)) { ret = fio_cpuset_exit(&o->cpumask); if (ret) td_verror(td, ret, "fio_cpuset_exit"); @@ -1727,9 +1754,80 @@ reaped: fio_terminate_threads(TERMINATE_ALL); } +static int __check_trigger_file(void) +{ + struct stat sb; + + if (!trigger_file) + return 0; + + if (stat(trigger_file, &sb)) + return 0; + + if (unlink(trigger_file) < 0) + log_err("fio: failed to unlink %s: %s\n", trigger_file, + strerror(errno)); + + return 1; +} + +static int trigger_timedout(void) +{ + if (trigger_timeout) + return time_since_genesis() >= trigger_timeout; + + return 0; +} + +void exec_trigger(const char *cmd) +{ + int ret; + + if (!cmd) + return; + + ret = system(cmd); + if (ret == -1) + log_err("fio: failed executing %s trigger\n", cmd); +} + +void check_trigger_file(void) +{ + if (__check_trigger_file() || trigger_timedout()) { + if (nr_clients) + fio_clients_send_trigger(trigger_remote_cmd); + else { + verify_save_state(); + fio_terminate_threads(TERMINATE_ALL); + exec_trigger(trigger_cmd); + } + } +} + +static int fio_verify_load_state(struct thread_data *td) +{ + int ret; + + if (!td->o.verify_state) + return 0; + + if (is_backend) { + void *data; + + ret = fio_server_get_verify_state(td->o.name, + td->thread_number - 1, &data); + if (!ret) + verify_convert_assign_state(td, data); + } else + ret = verify_load_state(td, "local"); + + return ret; +} + static void do_usleep(unsigned int usecs) { check_for_running_stats(); + check_trigger_file(); usleep(usecs); } @@ -1783,12 +1881,16 @@ static void run_threads(void) if (!td->o.create_serialize) continue; + if (fio_verify_load_state(td)) + goto reap; + /* * do file setup here so it happens sequentially, * we don't want X number of threads getting their * client data interspersed on disk */ if (setup_files(td)) { +reap: exit_value++; if (td->error) log_err("fio: pid=%d, err=%d/%s\n", @@ -2002,12 +2104,20 @@ static void *helper_thread_main(void *data) uint64_t sec = DISK_UTIL_MSEC / 1000; uint64_t nsec = (DISK_UTIL_MSEC % 1000) * 1000000; struct timespec ts; + +#if defined(CONFIG_CLOCK_MONOTONIC) + clock_gettime(CLOCK_MONOTONIC, &ts); + ts.tv_sec += sec; + ts.tv_nsec += nsec; +#else struct timeval tv; gettimeofday(&tv, NULL); ts.tv_sec = tv.tv_sec + sec; ts.tv_nsec = (tv.tv_usec * 1000) + nsec; - if (ts.tv_nsec > 1000000000ULL) { +#endif + + if (ts.tv_nsec >= 1000000000ULL) { ts.tv_nsec -= 1000000000ULL; ts.tv_sec++; } @@ -2102,8 +2212,10 @@ int fio_backend(void) for_each_td(td, i) { fio_options_free(td); - fio_mutex_remove(td->rusage_sem); - td->rusage_sem = NULL; + if (td->rusage_sem) { + fio_mutex_remove(td->rusage_sem); + td->rusage_sem = NULL; + } } free_disk_util();