X-Git-Url: https://git.kernel.dk/?p=fio.git;a=blobdiff_plain;f=backend.c;h=fb7dc68a92084962b3f522741810452a098aa216;hp=933d84144f5caec32d7c0d5f8c7168dbc140f714;hb=HEAD;hpb=f0547200d6d48dd847cae48f8db08cda0a38fd89 diff --git a/backend.c b/backend.c index 933d8414..fe03eab3 100644 --- a/backend.c +++ b/backend.c @@ -49,6 +49,7 @@ #include "helper_thread.h" #include "pshared.h" #include "zone-dist.h" +#include "fio_time.h" static struct fio_sem *startup_sem; static struct flist_head *cgroup_list; @@ -90,6 +91,22 @@ static void sig_int(int sig) } } +#ifdef WIN32 +static void sig_break(int sig) +{ + sig_int(sig); + + /** + * Windows terminates all job processes on SIGBREAK after the handler + * returns, so give them time to wrap-up and give stats + */ + for_each_td(td) { + while (td->runstate < TD_EXITED) + sleep(1); + } end_for_each(); +} +#endif + void sig_show_status(int sig) { show_running_run_stats(); @@ -112,7 +129,7 @@ static void set_sig_handlers(void) /* Windows uses SIGBREAK as a quit signal from other applications */ #ifdef WIN32 memset(&act, 0, sizeof(act)); - act.sa_handler = sig_int; + act.sa_handler = sig_break; act.sa_flags = SA_RESTART; sigaction(SIGBREAK, &act, NULL); #endif @@ -136,13 +153,10 @@ static void set_sig_handlers(void) static bool __check_min_rate(struct thread_data *td, struct timespec *now, enum fio_ddir ddir) { - unsigned long long bytes = 0; - unsigned long iops = 0; - unsigned long spent; - unsigned long long rate; - unsigned long long ratemin = 0; - unsigned int rate_iops = 0; - unsigned int rate_iops_min = 0; + unsigned long long current_rate_check_bytes = td->this_io_bytes[ddir]; + unsigned long current_rate_check_blocks = td->this_io_blocks[ddir]; + unsigned long long option_rate_bytes_min = td->o.ratemin[ddir]; + unsigned int option_rate_iops_min = td->o.rate_iops_min[ddir]; assert(ddir_rw(ddir)); @@ -155,68 +169,44 @@ static bool __check_min_rate(struct thread_data *td, struct timespec *now, if (mtime_since(&td->start, now) < 2000) return false; - iops += td->this_io_blocks[ddir]; - bytes += td->this_io_bytes[ddir]; - ratemin += td->o.ratemin[ddir]; - rate_iops += td->o.rate_iops[ddir]; - rate_iops_min += td->o.rate_iops_min[ddir]; - /* - * if rate blocks is set, sample is running + * if last_rate_check_blocks or last_rate_check_bytes is set, + * we can compute a rate per ratecycle */ - if (td->rate_bytes[ddir] || td->rate_blocks[ddir]) { - spent = mtime_since(&td->lastrate[ddir], now); - if (spent < td->o.ratecycle) + if (td->last_rate_check_bytes[ddir] || td->last_rate_check_blocks[ddir]) { + unsigned long spent = mtime_since(&td->last_rate_check_time[ddir], now); + if (spent < td->o.ratecycle || spent==0) return false; - if (td->o.rate[ddir] || td->o.ratemin[ddir]) { + if (td->o.ratemin[ddir]) { /* * check bandwidth specified rate */ - if (bytes < td->rate_bytes[ddir]) { - log_err("%s: rate_min=%lluB/s not met, only transferred %lluB\n", - td->o.name, ratemin, bytes); + unsigned long long current_rate_bytes = + ((current_rate_check_bytes - td->last_rate_check_bytes[ddir]) * 1000) / spent; + if (current_rate_bytes < option_rate_bytes_min) { + log_err("%s: rate_min=%lluB/s not met, got %lluB/s\n", + td->o.name, option_rate_bytes_min, current_rate_bytes); return true; - } else { - if (spent) - rate = ((bytes - td->rate_bytes[ddir]) * 1000) / spent; - else - rate = 0; - - if (rate < ratemin || - bytes < td->rate_bytes[ddir]) { - log_err("%s: rate_min=%lluB/s not met, got %lluB/s\n", - td->o.name, ratemin, rate); - return true; - } } } else { /* * checks iops specified rate */ - if (iops < rate_iops) { - log_err("%s: rate_iops_min=%u not met, only performed %lu IOs\n", - td->o.name, rate_iops, iops); + unsigned long long current_rate_iops = + ((current_rate_check_blocks - td->last_rate_check_blocks[ddir]) * 1000) / spent; + + if (current_rate_iops < option_rate_iops_min) { + log_err("%s: rate_iops_min=%u not met, got %llu IOPS\n", + td->o.name, option_rate_iops_min, current_rate_iops); return true; - } else { - if (spent) - rate = ((iops - td->rate_blocks[ddir]) * 1000) / spent; - else - rate = 0; - - if (rate < rate_iops_min || - iops < td->rate_blocks[ddir]) { - log_err("%s: rate_iops_min=%u not met, got %llu IOPS\n", - td->o.name, rate_iops_min, rate); - return true; - } } } } - td->rate_bytes[ddir] = bytes; - td->rate_blocks[ddir] = iops; - memcpy(&td->lastrate[ddir], now, sizeof(*now)); + td->last_rate_check_bytes[ddir] = current_rate_check_bytes; + td->last_rate_check_blocks[ddir] = current_rate_check_blocks; + memcpy(&td->last_rate_check_time[ddir], now, sizeof(*now)); return false; } @@ -477,7 +467,7 @@ int io_queue_event(struct thread_data *td, struct io_u *io_u, int *ret, if (!from_verify) unlog_io_piece(td, io_u); td_verror(td, EIO, "full resid"); - put_io_u(td, io_u); + clear_io_u(td, io_u); break; } @@ -645,15 +635,6 @@ static void do_verify(struct thread_data *td, uint64_t verify_bytes) if (td->error) return; - /* - * verify_state needs to be reset before verification - * proceeds so that expected random seeds match actual - * random seeds in headers. The main loop will reset - * all random number generators if randrepeat is set. - */ - if (!td->o.rand_repeatable) - td_fill_verify_state_seed(td); - td_set_runstate(td, TD_VERIFYING); io_u = NULL; @@ -690,7 +671,7 @@ static void do_verify(struct thread_data *td, uint64_t verify_bytes) break; } } else { - if (ddir_rw_sum(td->bytes_done) + td->o.rw_min_bs > verify_bytes) + if (td->bytes_verified + td->o.rw_min_bs > verify_bytes) break; while ((io_u = get_io_u(td)) != NULL) { @@ -719,6 +700,8 @@ static void do_verify(struct thread_data *td, uint64_t verify_bytes) break; } else if (io_u->ddir == DDIR_WRITE) { io_u->ddir = DDIR_READ; + io_u->numberio = td->verify_read_issues; + td->verify_read_issues++; populate_verify_io_u(td, io_u); break; } else { @@ -872,6 +855,7 @@ static void handle_thinktime(struct thread_data *td, enum fio_ddir ddir, struct timespec *time) { unsigned long long b; + unsigned long long runtime_left; uint64_t total; int left; struct timespec now; @@ -880,7 +864,7 @@ static void handle_thinktime(struct thread_data *td, enum fio_ddir ddir, if (td->o.thinktime_iotime) { fio_gettime(&now, NULL); if (utime_since(&td->last_thinktime, &now) - >= td->o.thinktime_iotime + td->o.thinktime) { + >= td->o.thinktime_iotime) { stall = true; } else if (!fio_option_is_set(&td->o, thinktime_blocks)) { /* @@ -903,11 +887,33 @@ static void handle_thinktime(struct thread_data *td, enum fio_ddir ddir, io_u_quiesce(td); + left = td->o.thinktime_spin; + if (td->o.timeout) { + runtime_left = td->o.timeout - utime_since_now(&td->epoch); + if (runtime_left < (unsigned long long)left) + left = runtime_left; + } + total = 0; - if (td->o.thinktime_spin) - total = usec_spin(td->o.thinktime_spin); + if (left) + total = usec_spin(left); + + /* + * usec_spin() might run for slightly longer than intended in a VM + * where the vCPU could get descheduled or the hypervisor could steal + * CPU time. Ensure "left" doesn't become negative. + */ + if (total < td->o.thinktime) + left = td->o.thinktime - total; + else + left = 0; + + if (td->o.timeout) { + runtime_left = td->o.timeout - utime_since_now(&td->epoch); + if (runtime_left < (unsigned long long)left) + left = runtime_left; + } - left = td->o.thinktime - total; if (left) total += usec_sleep(td, left); @@ -936,8 +942,10 @@ static void handle_thinktime(struct thread_data *td, enum fio_ddir ddir, fio_gettime(time, NULL); td->last_thinktime_blocks = b; - if (td->o.thinktime_iotime) + if (td->o.thinktime_iotime) { + fio_gettime(&now, NULL); td->last_thinktime = now; + } } /* @@ -969,6 +977,11 @@ static void do_io(struct thread_data *td, uint64_t *bytes_done) */ if (td_write(td) && td_random(td) && td->o.norandommap) total_bytes = max(total_bytes, (uint64_t) td->o.io_size); + + /* Don't break too early if io_size > size */ + if (td_rw(td) && !td_random(td)) + total_bytes = max(total_bytes, (uint64_t)td->o.io_size); + /* * If verify_backlog is enabled, we'll run the verify in this * handler as well. For that case, we may need up to twice the @@ -979,9 +992,11 @@ static void do_io(struct thread_data *td, uint64_t *bytes_done) total_bytes += td->o.size; /* In trimwrite mode, each byte is trimmed and then written, so - * allow total_bytes to be twice as big */ - if (td_trimwrite(td)) + * allow total_bytes or number of ios to be twice as big */ + if (td_trimwrite(td)) { total_bytes += td->total_io_size; + td->o.number_ios *= 2; + } while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) || (!flist_empty(&td->trim_list)) || !io_issue_bytes_exceeded(td) || @@ -1036,8 +1051,13 @@ static void do_io(struct thread_data *td, uint64_t *bytes_done) break; } - if (io_u->ddir == DDIR_WRITE && td->flags & TD_F_DO_VERIFY) - populate_verify_io_u(td, io_u); + if (io_u->ddir == DDIR_WRITE && td->flags & TD_F_DO_VERIFY) { + if (!(io_u->flags & IO_U_F_PATTERN_DONE)) { + io_u_set(td, io_u, IO_U_F_PATTERN_DONE); + io_u->numberio = td->io_issues[io_u->ddir]; + populate_verify_io_u(td, io_u); + } + } ddir = io_u->ddir; @@ -1091,8 +1111,10 @@ static void do_io(struct thread_data *td, uint64_t *bytes_done) td->rate_io_issue_bytes[__ddir] += blen; } - if (should_check_rate(td)) + if (should_check_rate(td)) { td->rate_next_io_time[__ddir] = usec_for_io(td, __ddir); + fio_gettime(&comp_time, NULL); + } } else { ret = io_u_submit(td, io_u); @@ -1117,6 +1139,9 @@ reap: if (ret < 0) break; + if (ddir_rw(ddir) && td->o.thinkcycles) + cycles_spin(td->o.thinkcycles); + if (ddir_rw(ddir) && td->o.thinktime) handle_thinktime(td, ddir, &comp_time); @@ -1172,8 +1197,11 @@ reap: f->file_name); } } - } else + } else { + if (td->o.io_submit_mode == IO_MODE_OFFLOAD) + workqueue_flush(&td->io_wq); cleanup_pending_aio(td); + } /* * stop job if we failed doing any IO @@ -1298,7 +1326,8 @@ static int init_io_u(struct thread_data *td) } } - init_io_u_buffers(td); + if (init_io_u_buffers(td)) + return 1; if (init_file_completion_logging(td, max_units)) return 1; @@ -1309,7 +1338,7 @@ static int init_io_u(struct thread_data *td) int init_io_u_buffers(struct thread_data *td) { struct io_u *io_u; - unsigned long long max_bs, min_write; + unsigned long long max_bs, min_write, trim_bs = 0; int i, max_units; int data_xfer = 1; char *p; @@ -1320,7 +1349,18 @@ int init_io_u_buffers(struct thread_data *td) td->orig_buffer_size = (unsigned long long) max_bs * (unsigned long long) max_units; - if (td_ioengine_flagged(td, FIO_NOIO) || !(td_read(td) || td_write(td))) + if (td_trim(td) && td->o.num_range > 1) { + trim_bs = td->o.num_range * sizeof(struct trim_range); + td->orig_buffer_size = trim_bs + * (unsigned long long) max_units; + } + + /* + * For reads, writes, and multi-range trim operations we need a + * data buffer + */ + if (td_ioengine_flagged(td, FIO_NOIO) || + !(td_read(td) || td_write(td) || (td_trim(td) && td->o.num_range > 1))) data_xfer = 0; /* @@ -1329,7 +1369,7 @@ int init_io_u_buffers(struct thread_data *td) * overflow later. this adjustment may be too much if we get * lucky and the allocator gives us an aligned address. */ - if (td->o.odirect || td->o.mem_align || td->o.oatomic || + if (td->o.odirect || td->o.mem_align || td_ioengine_flagged(td, FIO_RAWIO)) td->orig_buffer_size += page_mask + td->o.mem_align; @@ -1348,7 +1388,7 @@ int init_io_u_buffers(struct thread_data *td) if (data_xfer && allocate_io_mem(td)) return 1; - if (td->o.odirect || td->o.mem_align || td->o.oatomic || + if (td->o.odirect || td->o.mem_align || td_ioengine_flagged(td, FIO_RAWIO)) p = PTR_ALIGN(td->orig_buffer, page_mask) + td->o.mem_align; else @@ -1372,7 +1412,10 @@ int init_io_u_buffers(struct thread_data *td) fill_verify_pattern(td, io_u->buf, max_bs, io_u, 0, 0); } } - p += max_bs; + if (td_trim(td) && td->o.num_range > 1) + p += trim_bs; + else + p += max_bs; } return 0; @@ -1622,7 +1665,7 @@ static void *thread_main(void *data) uint64_t bytes_done[DDIR_RWDIR_CNT]; int deadlock_loop_cnt; bool clear_state; - int res, ret; + int ret; sk_out_assign(sk_out); free(fd); @@ -1779,19 +1822,27 @@ static void *thread_main(void *data) /* ioprio_set() has to be done before td_io_init() */ if (fio_option_is_set(o, ioprio) || - fio_option_is_set(o, ioprio_class)) { - ret = ioprio_set(IOPRIO_WHO_PROCESS, 0, o->ioprio_class, o->ioprio); + fio_option_is_set(o, ioprio_class) || + fio_option_is_set(o, ioprio_hint)) { + ret = ioprio_set(IOPRIO_WHO_PROCESS, 0, o->ioprio_class, + o->ioprio, o->ioprio_hint); if (ret == -1) { td_verror(td, errno, "ioprio_set"); goto err; } - td->ioprio = ioprio_value(o->ioprio_class, o->ioprio); + td->ioprio = ioprio_value(o->ioprio_class, o->ioprio, + o->ioprio_hint); td->ts.ioprio = td->ioprio; } if (td_io_init(td)) goto err; + if (td_ioengine_flagged(td, FIO_SYNCIO) && td->o.iodepth > 1 && td->o.io_submit_mode != IO_MODE_OFFLOAD) { + log_info("note: both iodepth >= 1 and synchronous I/O engine " + "are selected, queue depth will be capped at 1\n"); + } + if (init_io_u(td)) goto err; @@ -1830,7 +1881,7 @@ static void *thread_main(void *data) if (rate_submit_init(td, sk_out)) goto err; - set_epoch_time(td, o->log_unix_epoch); + set_epoch_time(td, o->log_alternate_epoch_clock_id, o->job_start_clock_id); fio_getrusage(&td->ru_start); memcpy(&td->bw_sample_time, &td->epoch, sizeof(td->epoch)); memcpy(&td->iops_sample_time, &td->epoch, sizeof(td->epoch)); @@ -1840,11 +1891,11 @@ static void *thread_main(void *data) if (o->ratemin[DDIR_READ] || o->ratemin[DDIR_WRITE] || o->ratemin[DDIR_TRIM]) { - memcpy(&td->lastrate[DDIR_READ], &td->bw_sample_time, + memcpy(&td->last_rate_check_time[DDIR_READ], &td->bw_sample_time, sizeof(td->bw_sample_time)); - memcpy(&td->lastrate[DDIR_WRITE], &td->bw_sample_time, + memcpy(&td->last_rate_check_time[DDIR_WRITE], &td->bw_sample_time, sizeof(td->bw_sample_time)); - memcpy(&td->lastrate[DDIR_TRIM], &td->bw_sample_time, + memcpy(&td->last_rate_check_time[DDIR_TRIM], &td->bw_sample_time, sizeof(td->bw_sample_time)); } @@ -1869,8 +1920,12 @@ static void *thread_main(void *data) if (td->o.verify_only && td_write(td)) verify_bytes = do_dry_run(td); else { + if (!td->o.rand_repeatable) + /* save verify rand state to replay hdr seeds later at verify */ + frand_copy(&td->verify_state_last_do_io, &td->verify_state); do_io(td, bytes_done); - + if (!td->o.rand_repeatable) + frand_copy(&td->verify_state, &td->verify_state_last_do_io); if (!ddir_rw_sum(bytes_done)) { fio_mark_td_terminate(td); verify_bytes = 0; @@ -1910,7 +1965,8 @@ static void *thread_main(void *data) } } while (1); - if (td_read(td) && td->io_bytes[DDIR_READ]) + if (td->io_bytes[DDIR_READ] && (td_read(td) || + ((td->flags & TD_F_VER_BACKLOG) && td_write(td)))) update_runtime(td, elapsed_us, DDIR_READ); if (td_write(td) && td->io_bytes[DDIR_WRITE]) update_runtime(td, elapsed_us, DDIR_WRITE); @@ -1953,13 +2009,23 @@ static void *thread_main(void *data) * another thread is checking its io_u's for overlap */ if (td_offload_overlap(td)) { - int res = pthread_mutex_lock(&overlap_check); - assert(res == 0); + int res; + + res = pthread_mutex_lock(&overlap_check); + if (res) { + td->error = errno; + goto err; + } } td_set_runstate(td, TD_FINISHING); if (td_offload_overlap(td)) { + int res; + res = pthread_mutex_unlock(&overlap_check); - assert(res == 0); + if (res) { + td->error = errno; + goto err; + } } update_rusage_stat(td); @@ -2032,18 +2098,17 @@ err: static void reap_threads(unsigned int *nr_running, uint64_t *t_rate, uint64_t *m_rate) { - struct thread_data *td; unsigned int cputhreads, realthreads, pending; - int i, status, ret; + int ret; /* * reap exited threads (TD_EXITED -> TD_REAPED) */ realthreads = pending = cputhreads = 0; - for_each_td(td, i) { - int flags = 0; + for_each_td(td) { + int flags = 0, status; - if (!strcmp(td->o.ioengine, "cpuio")) + if (!strcmp(td->o.ioengine, "cpuio")) cputhreads++; else realthreads++; @@ -2133,7 +2198,7 @@ reaped: done_secs += mtime_since_now(&td->epoch) / 1000; profile_td_exit(td); flow_exit_job(td); - } + } end_for_each(); if (*nr_running == cputhreads && !pending && realthreads) fio_terminate_threads(TERMINATE_ALL, TERMINATE_ALL); @@ -2260,13 +2325,11 @@ static bool waitee_running(struct thread_data *me) { const char *waitee = me->o.wait_for; const char *self = me->o.name; - struct thread_data *td; - int i; if (!waitee) return false; - for_each_td(td, i) { + for_each_td(td) { if (!strcmp(td->o.name, self) || strcmp(td->o.name, waitee)) continue; @@ -2276,7 +2339,7 @@ static bool waitee_running(struct thread_data *me) runstate_to_name(td->runstate)); return true; } - } + } end_for_each(); dprint(FD_PROCESS, "%s: %s completed, can run\n", self, waitee); return false; @@ -2300,14 +2363,14 @@ static void run_threads(struct sk_out *sk_out) set_sig_handlers(); nr_thread = nr_process = 0; - for_each_td(td, i) { + for_each_td(td) { if (check_mount_writes(td)) return; if (td->o.use_thread) nr_thread++; else nr_process++; - } + } end_for_each(); if (output_format & FIO_OUTPUT_NORMAL) { struct buf_output out; @@ -2333,7 +2396,7 @@ static void run_threads(struct sk_out *sk_out) nr_started = 0; m_rate = t_rate = 0; - for_each_td(td, i) { + for_each_td(td) { print_status_init(td->thread_number - 1); if (!td->o.create_serialize) @@ -2369,7 +2432,7 @@ reap: td_io_close_file(td, f); } } - } + } end_for_each(); /* start idle threads before io threads start to run */ fio_idle_prof_start(); @@ -2385,7 +2448,7 @@ reap: /* * create threads (TD_NOT_CREATED -> TD_CREATED) */ - for_each_td(td, i) { + for_each_td(td) { if (td->runstate != TD_NOT_CREATED) continue; @@ -2454,15 +2517,21 @@ reap: strerror(ret)); } else { pid_t pid; + void *eo; dprint(FD_PROCESS, "will fork\n"); + eo = td->eo; + read_barrier(); pid = fork(); if (!pid) { int ret; ret = (int)(uintptr_t)thread_main(fd); _exit(ret); - } else if (i == fio_debug_jobno) + } else if (__td_index == fio_debug_jobno) *fio_debug_jobp = pid; + free(eo); + free(fd); + fd = NULL; } dprint(FD_MUTEX, "wait on startup_sem\n"); if (fio_sem_down_timeout(startup_sem, 10000)) { @@ -2474,7 +2543,7 @@ reap: break; } dprint(FD_MUTEX, "done waiting on startup_sem\n"); - } + } end_for_each(); /* * Wait for the started threads to transition to @@ -2519,7 +2588,7 @@ reap: /* * start created threads (TD_INITIALIZED -> TD_RUNNING). */ - for_each_td(td, i) { + for_each_td(td) { if (td->runstate != TD_INITIALIZED) continue; @@ -2533,7 +2602,7 @@ reap: t_rate += ddir_rw_sum(td->o.rate); todo--; fio_sem_up(td->sem); - } + } end_for_each(); reap_threads(&nr_running, &t_rate, &m_rate); @@ -2559,9 +2628,7 @@ static void free_disk_util(void) int fio_backend(struct sk_out *sk_out) { - struct thread_data *td; int i; - if (exec_profile) { if (load_profile(exec_profile)) return 1; @@ -2581,6 +2648,11 @@ int fio_backend(struct sk_out *sk_out) setup_log(&agg_io_log[DDIR_TRIM], &p, "agg-trim_bw.log"); } + if (init_global_dedupe_working_set_seeds()) { + log_err("fio: failed to initialize global dedupe working set\n"); + return 1; + } + startup_sem = fio_sem_init(FIO_SEM_LOCKED); if (!sk_out) is_local_backend = true; @@ -2612,7 +2684,7 @@ int fio_backend(struct sk_out *sk_out) } } - for_each_td(td, i) { + for_each_td(td) { struct thread_stat *ts = &td->ts; free_clat_prio_stats(ts); @@ -2625,7 +2697,7 @@ int fio_backend(struct sk_out *sk_out) } fio_sem_remove(td->sem); td->sem = NULL; - } + } end_for_each(); free_disk_util(); if (cgroup_list) {