X-Git-Url: https://git.kernel.dk/?p=fio.git;a=blobdiff_plain;f=backend.c;h=2e6a377cfb4cb4317294eb854cd61712f402caf7;hp=3c45e7898728669c4abe211d5be4004532dbe802;hb=9107a641e67f27003fa6cbe7b55b1ec6a0239197;hpb=0aa8371d9fc40121098929a7f670dcdd259b8eee diff --git a/backend.c b/backend.c index 3c45e789..2e6a377c 100644 --- a/backend.c +++ b/backend.c @@ -29,6 +29,7 @@ #include #include #include +#include #include "fio.h" #include "smalloc.h" @@ -47,12 +48,13 @@ #include "rate-submit.h" #include "helper_thread.h" #include "pshared.h" +#include "zone-dist.h" static struct fio_sem *startup_sem; static struct flist_head *cgroup_list; static struct cgroup_mnt *cgroup_mnt; static int exit_value; -static volatile int fio_abort; +static volatile bool fio_abort; static unsigned int nr_process = 0; static unsigned int nr_thread = 0; @@ -60,16 +62,22 @@ struct io_log *agg_io_log[DDIR_RWDIR_CNT]; int groupid = 0; unsigned int thread_number = 0; +unsigned int nr_segments = 0; +unsigned int cur_segment = 0; unsigned int stat_number = 0; -int shm_id = 0; int temp_stall_ts; unsigned long done_secs = 0; +#ifdef PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP +pthread_mutex_t overlap_check = PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP; +#else +pthread_mutex_t overlap_check = PTHREAD_MUTEX_INITIALIZER; +#endif #define JOB_START_TIMEOUT (5 * 1000) static void sig_int(int sig) { - if (threads) { + if (nr_segments) { if (is_backend) fio_server_got_signal(sig); else { @@ -78,7 +86,7 @@ static void sig_int(int sig) exit_value = 128; } - fio_terminate_threads(TERMINATE_ALL); + fio_terminate_threads(TERMINATE_ALL, TERMINATE_ALL); } } @@ -131,8 +139,8 @@ static bool __check_min_rate(struct thread_data *td, struct timespec *now, unsigned long long bytes = 0; unsigned long iops = 0; unsigned long spent; - unsigned long rate; - unsigned int ratemin = 0; + unsigned long long rate; + unsigned long long ratemin = 0; unsigned int rate_iops = 0; unsigned int rate_iops_min = 0; @@ -166,7 +174,7 @@ static bool __check_min_rate(struct thread_data *td, struct timespec *now, * check bandwidth specified rate */ if (bytes < td->rate_bytes[ddir]) { - log_err("%s: rate_min=%uB/s not met, only transferred %lluB\n", + log_err("%s: rate_min=%lluB/s not met, only transferred %lluB\n", td->o.name, ratemin, bytes); return true; } else { @@ -177,7 +185,7 @@ static bool __check_min_rate(struct thread_data *td, struct timespec *now, if (rate < ratemin || bytes < td->rate_bytes[ddir]) { - log_err("%s: rate_min=%uB/s not met, got %luB/s\n", + log_err("%s: rate_min=%lluB/s not met, got %lluB/s\n", td->o.name, ratemin, rate); return true; } @@ -198,7 +206,7 @@ static bool __check_min_rate(struct thread_data *td, struct timespec *now, if (rate < rate_iops_min || iops < td->rate_blocks[ddir]) { - log_err("%s: rate_iops_min=%u not met, got %lu IOPS\n", + log_err("%s: rate_iops_min=%u not met, got %llu IOPS\n", td->o.name, rate_iops_min, rate); return true; } @@ -216,12 +224,10 @@ static bool check_min_rate(struct thread_data *td, struct timespec *now) { bool ret = false; - if (td->bytes_done[DDIR_READ]) - ret |= __check_min_rate(td, now, DDIR_READ); - if (td->bytes_done[DDIR_WRITE]) - ret |= __check_min_rate(td, now, DDIR_WRITE); - if (td->bytes_done[DDIR_TRIM]) - ret |= __check_min_rate(td, now, DDIR_TRIM); + for_each_rw_ddir(ddir) { + if (td->bytes_done[ddir]) + ret |= __check_min_rate(td, now, ddir); + } return ret; } @@ -238,8 +244,6 @@ static void cleanup_pending_aio(struct thread_data *td) * get immediately available events, if any */ r = io_u_queued_complete(td, 0); - if (r < 0) - return; /* * now cancel remaining active events @@ -275,6 +279,7 @@ static bool fio_io_sync(struct thread_data *td, struct fio_file *f) io_u->ddir = DDIR_SYNC; io_u->file = f; + io_u_set(td, io_u, IO_U_F_NO_FILE_PUT); if (td_io_prep(td, io_u)) { put_io_u(td, io_u); @@ -308,7 +313,7 @@ requeue: static int fio_file_fsync(struct thread_data *td, struct fio_file *f) { - int ret; + int ret, ret2; if (fio_file_open(f)) return fio_io_sync(td, f); @@ -317,8 +322,10 @@ static int fio_file_fsync(struct thread_data *td, struct fio_file *f) return 1; ret = fio_io_sync(td, f); - td_io_close_file(td, f); - return ret; + ret2 = 0; + if (fio_file_open(f)) + ret2 = td_io_close_file(td, f); + return (ret || ret2); } static inline void __update_ts_cache(struct thread_data *td) @@ -566,7 +573,7 @@ static int unlink_all_files(struct thread_data *td) /* * Check if io_u will overlap an in-flight IO in the queue */ -static bool in_flight_overlap(struct io_u_queue *q, struct io_u *io_u) +bool in_flight_overlap(struct io_u_queue *q, struct io_u *io_u) { bool overlap; struct io_u *check_io_u; @@ -966,8 +973,10 @@ static void do_io(struct thread_data *td, uint64_t *bytes_done) * Break if we exceeded the bytes. The exception is time * based runs, but we still need to break out of the loop * for those to run verification, if enabled. + * Jobs read from iolog do not use this stop condition. */ if (bytes_issued >= total_bytes && + !td->o.read_iolog_file && (!td->o.time_based || (td->o.time_based && td->o.verify != VERIFY_NONE))) break; @@ -1000,12 +1009,6 @@ static void do_io(struct thread_data *td, uint64_t *bytes_done) if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_READ && ((io_u->flags & IO_U_F_VER_LIST) || !td_rw(td))) { - if (!td->o.verify_pattern_bytes) { - io_u->rand_seed = __rand(&td->verify_state); - if (sizeof(int) != sizeof(long *)) - io_u->rand_seed *= __rand(&td->verify_state); - } - if (verify_state_should_stop(td, io_u)) { put_io_u(td, io_u); break; @@ -1080,7 +1083,7 @@ reap: if (!in_ramp_time(td) && should_check_rate(td)) { if (check_min_rate(td, &comp_time)) { if (exitall_on_terminate || td->o.exitall_error) - fio_terminate_threads(td->groupid); + fio_terminate_threads(td->groupid, td->o.exit_what); td_verror(td, EIO, "check_min_rate"); break; } @@ -1116,7 +1119,7 @@ reap: td->error = 0; } - if (should_fsync(td) && td->o.end_fsync) { + if (should_fsync(td) && (td->o.end_fsync || td->o.fsync_on_close)) { td_set_runstate(td, TD_FSYNCING); for_each_file(td, f, i) { @@ -1184,14 +1187,14 @@ static void cleanup_io_u(struct thread_data *td) if (td->io_ops->io_u_free) td->io_ops->io_u_free(td, io_u); - fio_memfree(io_u, sizeof(*io_u)); + fio_memfree(io_u, sizeof(*io_u), td_offload_overlap(td)); } free_io_mem(td); io_u_rexit(&td->io_u_requeues); - io_u_qexit(&td->io_u_freelist); - io_u_qexit(&td->io_u_all); + io_u_qexit(&td->io_u_freelist, false); + io_u_qexit(&td->io_u_all, td_offload_overlap(td)); free_file_completion_logging(td); } @@ -1199,30 +1202,85 @@ static void cleanup_io_u(struct thread_data *td) static int init_io_u(struct thread_data *td) { struct io_u *io_u; - unsigned long long max_bs, min_write; int cl_align, i, max_units; - int data_xfer = 1, err; - char *p; + int err; max_units = td->o.iodepth; - max_bs = td_max_bs(td); - min_write = td->o.min_bs[DDIR_WRITE]; - td->orig_buffer_size = (unsigned long long) max_bs - * (unsigned long long) max_units; - - if (td_ioengine_flagged(td, FIO_NOIO) || !(td_read(td) || td_write(td))) - data_xfer = 0; err = 0; err += !io_u_rinit(&td->io_u_requeues, td->o.iodepth); - err += !io_u_qinit(&td->io_u_freelist, td->o.iodepth); - err += !io_u_qinit(&td->io_u_all, td->o.iodepth); + err += !io_u_qinit(&td->io_u_freelist, td->o.iodepth, false); + err += !io_u_qinit(&td->io_u_all, td->o.iodepth, td_offload_overlap(td)); if (err) { log_err("fio: failed setting up IO queues\n"); return 1; } + cl_align = os_cache_line_size(); + + for (i = 0; i < max_units; i++) { + void *ptr; + + if (td->terminate) + return 1; + + ptr = fio_memalign(cl_align, sizeof(*io_u), td_offload_overlap(td)); + if (!ptr) { + log_err("fio: unable to allocate aligned memory\n"); + return 1; + } + + io_u = ptr; + memset(io_u, 0, sizeof(*io_u)); + INIT_FLIST_HEAD(&io_u->verify_list); + dprint(FD_MEM, "io_u alloc %p, index %u\n", io_u, i); + + io_u->index = i; + io_u->flags = IO_U_F_FREE; + io_u_qpush(&td->io_u_freelist, io_u); + + /* + * io_u never leaves this stack, used for iteration of all + * io_u buffers. + */ + io_u_qpush(&td->io_u_all, io_u); + + if (td->io_ops->io_u_init) { + int ret = td->io_ops->io_u_init(td, io_u); + + if (ret) { + log_err("fio: failed to init engine data: %d\n", ret); + return 1; + } + } + } + + init_io_u_buffers(td); + + if (init_file_completion_logging(td, max_units)) + return 1; + + return 0; +} + +int init_io_u_buffers(struct thread_data *td) +{ + struct io_u *io_u; + unsigned long long max_bs, min_write; + int i, max_units; + int data_xfer = 1; + char *p; + + max_units = td->o.iodepth; + max_bs = td_max_bs(td); + min_write = td->o.min_bs[DDIR_WRITE]; + td->orig_buffer_size = (unsigned long long) max_bs + * (unsigned long long) max_units; + + if (td_ioengine_flagged(td, FIO_NOIO) || !(td_read(td) || td_write(td))) + data_xfer = 0; + /* * if we may later need to do address alignment, then add any * possible adjustment here so that we don't cause a buffer @@ -1254,23 +1312,8 @@ static int init_io_u(struct thread_data *td) else p = td->orig_buffer; - cl_align = os_cache_line_size(); - for (i = 0; i < max_units; i++) { - void *ptr; - - if (td->terminate) - return 1; - - ptr = fio_memalign(cl_align, sizeof(*io_u)); - if (!ptr) { - log_err("fio: unable to allocate aligned memory\n"); - break; - } - - io_u = ptr; - memset(io_u, 0, sizeof(*io_u)); - INIT_FLIST_HEAD(&io_u->verify_list); + io_u = td->io_u_all.io_us[i]; dprint(FD_MEM, "io_u alloc %p, index %u\n", io_u, i); if (data_xfer) { @@ -1287,32 +1330,9 @@ static int init_io_u(struct thread_data *td) fill_verify_pattern(td, io_u->buf, max_bs, io_u, 0, 0); } } - - io_u->index = i; - io_u->flags = IO_U_F_FREE; - io_u_qpush(&td->io_u_freelist, io_u); - - /* - * io_u never leaves this stack, used for iteration of all - * io_u buffers. - */ - io_u_qpush(&td->io_u_all, io_u); - - if (td->io_ops->io_u_init) { - int ret = td->io_ops->io_u_init(td, io_u); - - if (ret) { - log_err("fio: failed to init engine data: %d\n", ret); - return 1; - } - } - p += max_bs; } - if (init_file_completion_logging(td, max_units)) - return 1; - return 0; } @@ -1439,16 +1459,17 @@ static bool keep_running(struct thread_data *td) return false; } -static int exec_string(struct thread_options *o, const char *string, const char *mode) +static int exec_string(struct thread_options *o, const char *string, + const char *mode) { - size_t newlen = strlen(string) + strlen(o->name) + strlen(mode) + 9 + 1; int ret; char *str; - str = malloc(newlen); - sprintf(str, "%s &> %s.%s.txt", string, o->name, mode); + if (asprintf(&str, "%s > %s.%s.txt 2>&1", string, o->name, mode) < 0) + return -1; - log_info("%s : Saving output of %s in %s.%s.txt\n",o->name, mode, o->name, mode); + log_info("%s : Saving output of %s in %s.%s.txt\n", o->name, mode, + o->name, mode); ret = system(str); if (ret == -1) log_err("fio: exec of cmd <%s> failed\n", str); @@ -1517,8 +1538,8 @@ static void *thread_main(void *data) struct sk_out *sk_out = fd->sk_out; uint64_t bytes_done[DDIR_RWDIR_CNT]; int deadlock_loop_cnt; - bool clear_state, did_some_io; - int ret; + bool clear_state; + int res, ret; sk_out_assign(sk_out); free(fd); @@ -1573,6 +1594,8 @@ static void *thread_main(void *data) goto err; } + td_zone_gen_index(td); + /* * Do this early, we don't want the compress threads to be limited * to the same CPUs as the IO workers. So do this before we set @@ -1671,9 +1694,15 @@ static void *thread_main(void *data) if (!init_iolog(td)) goto err; + if (td_io_init(td)) + goto err; + if (init_io_u(td)) goto err; + if (td->io_ops->post_init && td->io_ops->post_init(td)) + goto err; + if (o->verify_async && verify_async_init(td)) goto err; @@ -1701,13 +1730,10 @@ static void *thread_main(void *data) if (!o->create_serialize && setup_files(td)) goto err; - if (td_io_init(td)) - goto err; - if (!init_random_map(td)) goto err; - if (o->exec_prerun && exec_string(o, o->exec_prerun, (const char *)"prerun")) + if (o->exec_prerun && exec_string(o, o->exec_prerun, "prerun")) goto err; if (o->pre_read && !pre_read_files(td)) @@ -1736,7 +1762,6 @@ static void *thread_main(void *data) memset(bytes_done, 0, sizeof(bytes_done)); clear_state = false; - did_some_io = false; while (keep_running(td)) { uint64_t verify_bytes; @@ -1814,9 +1839,6 @@ static void *thread_main(void *data) td_ioengine_flagged(td, FIO_UNIDIR)) continue; - if (ddir_rw_sum(bytes_done)) - did_some_io = true; - clear_io_state(td, 0); fio_gettime(&td->start, NULL); @@ -1838,25 +1860,25 @@ static void *thread_main(void *data) } /* - * If td ended up with no I/O when it should have had, - * then something went wrong unless FIO_NOIO or FIO_DISKLESSIO. - * (Are we not missing other flags that can be ignored ?) + * Acquire this lock if we were doing overlap checking in + * offload mode so that we don't clean up this job while + * another thread is checking its io_u's for overlap */ - if ((td->o.size || td->o.io_size) && !ddir_rw_sum(bytes_done) && - !did_some_io && !td->o.create_only && - !(td_ioengine_flagged(td, FIO_NOIO) || - td_ioengine_flagged(td, FIO_DISKLESSIO))) - log_err("%s: No I/O performed by %s, " - "perhaps try --debug=io option for details?\n", - td->o.name, td->io_ops->name); - + if (td_offload_overlap(td)) { + int res = pthread_mutex_lock(&overlap_check); + assert(res == 0); + } td_set_runstate(td, TD_FINISHING); + if (td_offload_overlap(td)) { + res = pthread_mutex_unlock(&overlap_check); + assert(res == 0); + } update_rusage_stat(td); td->ts.total_run_time = mtime_since_now(&td->epoch); - td->ts.io_bytes[DDIR_READ] = td->io_bytes[DDIR_READ]; - td->ts.io_bytes[DDIR_WRITE] = td->io_bytes[DDIR_WRITE]; - td->ts.io_bytes[DDIR_TRIM] = td->io_bytes[DDIR_TRIM]; + for_each_rw_ddir(ddir) { + td->ts.io_bytes[ddir] = td->io_bytes[ddir]; + } if (td->o.verify_state_save && !(td->flags & TD_F_VSTATE_SAVED) && (td->o.verify != VERIFY_NONE && td_write(td))) @@ -1870,10 +1892,10 @@ static void *thread_main(void *data) rate_submit_exit(td); if (o->exec_postrun) - exec_string(o, o->exec_postrun, (const char *)"postrun"); + exec_string(o, o->exec_postrun, "postrun"); if (exitall_on_terminate || (o->exitall_error && td->error)) - fio_terminate_threads(td->groupid); + fio_terminate_threads(td->groupid, td->o.exit_what); err: if (td->error) @@ -1888,15 +1910,7 @@ err: close_ioengine(td); cgroup_shutdown(td, cgroup_mnt); verify_free_state(td); - - if (td->zone_state_index) { - int i; - - for (i = 0; i < DDIR_RWDIR_CNT; i++) - free(td->zone_state_index[i]); - free(td->zone_state_index); - td->zone_state_index = NULL; - } + td_zone_free_index(td); if (fio_option_is_set(o, cpumask)) { ret = fio_cpuset_exit(&o->cpumask); @@ -1909,6 +1923,8 @@ err: */ if (o->write_iolog_file) write_iolog_close(td); + if (td->io_log_rfile) + fclose(td->io_log_rfile); td_set_runstate(td, TD_EXITED); @@ -2028,10 +2044,11 @@ reaped: done_secs += mtime_since_now(&td->epoch) / 1000; profile_td_exit(td); + flow_exit_job(td); } if (*nr_running == cputhreads && !pending && realthreads) - fio_terminate_threads(TERMINATE_ALL); + fio_terminate_threads(TERMINATE_ALL, TERMINATE_ALL); } static bool __check_trigger_file(void) @@ -2081,7 +2098,7 @@ void check_trigger_file(void) fio_clients_send_trigger(trigger_remote_cmd); else { verify_save_state(IO_LIST_ALL); - fio_terminate_threads(TERMINATE_ALL); + fio_terminate_threads(TERMINATE_ALL, TERMINATE_ALL); exec_trigger(trigger_cmd); } } @@ -2101,8 +2118,16 @@ static int fio_verify_load_state(struct thread_data *td) td->thread_number - 1, &data); if (!ret) verify_assign_state(td, data); - } else - ret = verify_load_state(td, "local"); + } else { + char prefix[PATH_MAX]; + + if (aux_path) + sprintf(prefix, "%s%clocal", aux_path, + FIO_OS_PATH_SEPARATOR); + else + strcpy(prefix, "local"); + ret = verify_load_state(td, prefix); + } return ret; } @@ -2197,18 +2222,22 @@ static void run_threads(struct sk_out *sk_out) } if (output_format & FIO_OUTPUT_NORMAL) { - log_info("Starting "); + struct buf_output out; + + buf_output_init(&out); + __log_buf(&out, "Starting "); if (nr_thread) - log_info("%d thread%s", nr_thread, + __log_buf(&out, "%d thread%s", nr_thread, nr_thread > 1 ? "s" : ""); if (nr_process) { if (nr_thread) - log_info(" and "); - log_info("%d process%s", nr_process, + __log_buf(&out, " and "); + __log_buf(&out, "%d process%s", nr_process, nr_process > 1 ? "es" : ""); } - log_info("\n"); - log_info_flush(); + __log_buf(&out, "\n"); + log_info_buf(out.buf, out.buflen); + buf_output_free(&out); } todo = thread_number; @@ -2350,8 +2379,8 @@ reap: dprint(FD_MUTEX, "wait on startup_sem\n"); if (fio_sem_down_timeout(startup_sem, 10000)) { log_err("fio: job startup hung? exiting.\n"); - fio_terminate_threads(TERMINATE_ALL); - fio_abort = 1; + fio_terminate_threads(TERMINATE_ALL, TERMINATE_ALL); + fio_abort = true; nr_started--; free(fd); break; @@ -2465,12 +2494,15 @@ int fio_backend(struct sk_out *sk_out) } startup_sem = fio_sem_init(FIO_SEM_LOCKED); + if (!sk_out) + is_local_backend = true; if (startup_sem == NULL) return 1; set_genesis_time(); stat_init(); - helper_thread_create(startup_sem, sk_out); + if (helper_thread_create(startup_sem, sk_out)) + log_err("fio: failed to create helper thread\n"); cgroup_list = smalloc(sizeof(*cgroup_list)); if (cgroup_list)