X-Git-Url: https://git.kernel.dk/?p=fio.git;a=blobdiff_plain;f=backend.c;h=fe1599706fcdcb32b0990b1e840b3259448396ab;hp=f0cb1bc34cf91937b119c54186276a0aea696a26;hb=8b6a404cdd2c40715885e562416c3db039912773;hpb=a47591e4923fb8faef18e1cd5125a50429282089 diff --git a/backend.c b/backend.c index f0cb1bc3..fe159970 100644 --- a/backend.c +++ b/backend.c @@ -57,6 +57,7 @@ #include "workqueue.h" #include "lib/mountcheck.h" #include "rate-submit.h" +#include "helper_thread.h" static struct fio_mutex *startup_mutex; static struct flist_head *cgroup_list; @@ -75,19 +76,6 @@ int shm_id = 0; int temp_stall_ts; unsigned long done_secs = 0; -static struct helper_data { - volatile int exit; - volatile int reset; - volatile int do_stat; - struct sk_out *sk_out; - pthread_t thread; - pthread_mutex_t lock; - pthread_cond_t cond; -} *helper_data; - -#define PAGE_ALIGN(buf) \ - (char *) (((uintptr_t) (buf) + page_mask) & ~page_mask) - #define JOB_START_TIMEOUT (5 * 1000) static void sig_int(int sig) @@ -148,7 +136,7 @@ static void set_sig_handlers(void) /* * Check if we are above the minimum rate given. */ -static bool __check_min_rate(struct thread_data *td, struct timeval *now, +static bool __check_min_rate(struct thread_data *td, struct timespec *now, enum fio_ddir ddir) { unsigned long long bytes = 0; @@ -189,8 +177,8 @@ static bool __check_min_rate(struct thread_data *td, struct timeval *now, * check bandwidth specified rate */ if (bytes < td->rate_bytes[ddir]) { - log_err("%s: min rate %u not met\n", td->o.name, - ratemin); + log_err("%s: rate_min=%uB/s not met, only transferred %lluB\n", + td->o.name, ratemin, bytes); return true; } else { if (spent) @@ -200,9 +188,8 @@ static bool __check_min_rate(struct thread_data *td, struct timeval *now, if (rate < ratemin || bytes < td->rate_bytes[ddir]) { - log_err("%s: min rate %u not met, got" - " %luKB/sec\n", td->o.name, - ratemin, rate); + log_err("%s: rate_min=%uB/s not met, got %luB/s\n", + td->o.name, ratemin, rate); return true; } } @@ -211,8 +198,8 @@ static bool __check_min_rate(struct thread_data *td, struct timeval *now, * checks iops specified rate */ if (iops < rate_iops) { - log_err("%s: min iops rate %u not met\n", - td->o.name, rate_iops); + log_err("%s: rate_iops_min=%u not met, only performed %lu IOs\n", + td->o.name, rate_iops, iops); return true; } else { if (spent) @@ -222,9 +209,8 @@ static bool __check_min_rate(struct thread_data *td, struct timeval *now, if (rate < rate_iops_min || iops < td->rate_blocks[ddir]) { - log_err("%s: min iops rate %u not met," - " got %lu\n", td->o.name, - rate_iops_min, rate); + log_err("%s: rate_iops_min=%u not met, got %lu IOPS\n", + td->o.name, rate_iops_min, rate); return true; } } @@ -237,7 +223,7 @@ static bool __check_min_rate(struct thread_data *td, struct timeval *now, return false; } -static bool check_min_rate(struct thread_data *td, struct timeval *now) +static bool check_min_rate(struct thread_data *td, struct timespec *now) { bool ret = false; @@ -349,18 +335,18 @@ static int fio_file_fsync(struct thread_data *td, struct fio_file *f) return ret; } -static inline void __update_tv_cache(struct thread_data *td) +static inline void __update_ts_cache(struct thread_data *td) { - fio_gettime(&td->tv_cache, NULL); + fio_gettime(&td->ts_cache, NULL); } -static inline void update_tv_cache(struct thread_data *td) +static inline void update_ts_cache(struct thread_data *td) { - if ((++td->tv_cache_nr & td->tv_cache_mask) == td->tv_cache_mask) - __update_tv_cache(td); + if ((++td->ts_cache_nr & td->ts_cache_mask) == td->ts_cache_mask) + __update_ts_cache(td); } -static inline bool runtime_exceeded(struct thread_data *td, struct timeval *t) +static inline bool runtime_exceeded(struct thread_data *td, struct timespec *t) { if (in_ramp_time(td)) return false; @@ -444,12 +430,15 @@ static void check_update_rusage(struct thread_data *td) } } -static int wait_for_completions(struct thread_data *td, struct timeval *time) +static int wait_for_completions(struct thread_data *td, struct timespec *time) { const int full = queue_full(td); int min_evts = 0; int ret; + if (td->flags & TD_F_REGROW_LOGS) + return io_u_quiesce(td); + /* * if the queue is full, we MUST reap at least 1 event */ @@ -473,7 +462,7 @@ static int wait_for_completions(struct thread_data *td, struct timeval *time) int io_queue_event(struct thread_data *td, struct io_u *io_u, int *ret, enum fio_ddir ddir, uint64_t *bytes_issued, int from_verify, - struct timeval *comp_time) + struct timespec *comp_time) { int ret2; @@ -527,6 +516,9 @@ sync_done: break; } + if (td->flags & TD_F_REGROW_LOGS) + regrow_logs(td); + /* * when doing I/O (not when verifying), * check for any errors that are to be ignored @@ -571,6 +563,28 @@ static inline bool io_in_polling(struct thread_data *td) return !td->o.iodepth_batch_complete_min && !td->o.iodepth_batch_complete_max; } +/* + * Unlinks files from thread data fio_file structure + */ +static int unlink_all_files(struct thread_data *td) +{ + struct fio_file *f; + unsigned int i; + int ret = 0; + + for_each_file(td, f, i) { + if (f->filetype != FIO_TYPE_FILE) + continue; + ret = td_io_unlink_file(td, f); + if (ret) + break; + } + + if (ret) + td_verror(td, ret, "unlink_all_files"); + + return ret; +} /* * The main verify engine. Runs over the writes we previously submitted, @@ -603,6 +617,15 @@ static void do_verify(struct thread_data *td, uint64_t verify_bytes) if (td->error) return; + /* + * verify_state needs to be reset before verification + * proceeds so that expected random seeds match actual + * random seeds in headers. The main loop will reset + * all random number generators if randrepeat is set. + */ + if (!td->o.rand_repeatable) + td_fill_verify_state_seed(td); + td_set_runstate(td, TD_VERIFYING); io_u = NULL; @@ -610,12 +633,12 @@ static void do_verify(struct thread_data *td, uint64_t verify_bytes) enum fio_ddir ddir; int full; - update_tv_cache(td); + update_ts_cache(td); check_update_rusage(td); - if (runtime_exceeded(td, &td->tv_cache)) { - __update_tv_cache(td); - if (runtime_exceeded(td, &td->tv_cache)) { + if (runtime_exceeded(td, &td->ts_cache)) { + __update_ts_cache(td); + if (runtime_exceeded(td, &td->ts_cache)) { fio_mark_td_terminate(td); break; } @@ -643,7 +666,7 @@ static void do_verify(struct thread_data *td, uint64_t verify_bytes) break; while ((io_u = get_io_u(td)) != NULL) { - if (IS_ERR(io_u)) { + if (IS_ERR_OR_NULL(io_u)) { io_u = NULL; ret = FIO_Q_BUSY; goto reap; @@ -664,7 +687,7 @@ static void do_verify(struct thread_data *td, uint64_t verify_bytes) continue; } else if (io_u->ddir == DDIR_TRIM) { io_u->ddir = DDIR_READ; - io_u_set(io_u, IO_U_F_TRIMMED); + io_u_set(td, io_u, IO_U_F_TRIMMED); break; } else if (io_u->ddir == DDIR_WRITE) { io_u->ddir = DDIR_READ; @@ -740,21 +763,21 @@ static bool exceeds_number_ios(struct thread_data *td) return number_ios >= (td->o.number_ios * td->loops); } -static bool io_issue_bytes_exceeded(struct thread_data *td) +static bool io_bytes_exceeded(struct thread_data *td, uint64_t *this_bytes) { unsigned long long bytes, limit; if (td_rw(td)) - bytes = td->io_issue_bytes[DDIR_READ] + td->io_issue_bytes[DDIR_WRITE]; + bytes = this_bytes[DDIR_READ] + this_bytes[DDIR_WRITE]; else if (td_write(td)) - bytes = td->io_issue_bytes[DDIR_WRITE]; + bytes = this_bytes[DDIR_WRITE]; else if (td_read(td)) - bytes = td->io_issue_bytes[DDIR_READ]; + bytes = this_bytes[DDIR_READ]; else - bytes = td->io_issue_bytes[DDIR_TRIM]; + bytes = this_bytes[DDIR_TRIM]; - if (td->o.io_limit) - limit = td->o.io_limit; + if (td->o.io_size) + limit = td->o.io_size; else limit = td->o.size; @@ -762,26 +785,14 @@ static bool io_issue_bytes_exceeded(struct thread_data *td) return bytes >= limit || exceeds_number_ios(td); } -static bool io_complete_bytes_exceeded(struct thread_data *td) +static bool io_issue_bytes_exceeded(struct thread_data *td) { - unsigned long long bytes, limit; - - if (td_rw(td)) - bytes = td->this_io_bytes[DDIR_READ] + td->this_io_bytes[DDIR_WRITE]; - else if (td_write(td)) - bytes = td->this_io_bytes[DDIR_WRITE]; - else if (td_read(td)) - bytes = td->this_io_bytes[DDIR_READ]; - else - bytes = td->this_io_bytes[DDIR_TRIM]; - - if (td->o.io_limit) - limit = td->o.io_limit; - else - limit = td->o.size; + return io_bytes_exceeded(td, td->io_issue_bytes); +} - limit *= td->loops; - return bytes >= limit || exceeds_number_ios(td); +static bool io_complete_bytes_exceeded(struct thread_data *td) +{ + return io_bytes_exceeded(td, td->this_io_bytes); } /* @@ -800,13 +811,14 @@ static long long usec_for_io(struct thread_data *td, enum fio_ddir ddir) uint64_t val; iops = bps / td->o.bs[ddir]; val = (int64_t) (1000000 / iops) * - -logf(__rand_0_1(&td->poisson_state)); + -logf(__rand_0_1(&td->poisson_state[ddir])); if (val) { - dprint(FD_RATE, "poisson rate iops=%llu\n", - (unsigned long long) 1000000 / val); + dprint(FD_RATE, "poisson rate iops=%llu, ddir=%d\n", + (unsigned long long) 1000000 / val, + ddir); } - td->last_usec += val; - return td->last_usec; + td->last_usec[ddir] += val; + return td->last_usec[ddir]; } else if (bps) { secs = bytes / bps; remainder = bytes % bps; @@ -840,11 +852,11 @@ static void do_io(struct thread_data *td, uint64_t *bytes_done) total_bytes = td->o.size; /* - * Allow random overwrite workloads to write up to io_limit + * Allow random overwrite workloads to write up to io_size * before starting verification phase as 'size' doesn't apply. */ if (td_write(td) && td_random(td) && td->o.norandommap) - total_bytes = max(total_bytes, (uint64_t) td->o.io_limit); + total_bytes = max(total_bytes, (uint64_t) td->o.io_size); /* * If verify_backlog is enabled, we'll run the verify in this * handler as well. For that case, we may need up to twice the @@ -862,7 +874,7 @@ static void do_io(struct thread_data *td, uint64_t *bytes_done) while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) || (!flist_empty(&td->trim_list)) || !io_issue_bytes_exceeded(td) || td->o.time_based) { - struct timeval comp_time; + struct timespec comp_time; struct io_u *io_u; int full; enum fio_ddir ddir; @@ -872,11 +884,11 @@ static void do_io(struct thread_data *td, uint64_t *bytes_done) if (td->terminate || td->done) break; - update_tv_cache(td); + update_ts_cache(td); - if (runtime_exceeded(td, &td->tv_cache)) { - __update_tv_cache(td); - if (runtime_exceeded(td, &td->tv_cache)) { + if (runtime_exceeded(td, &td->ts_cache)) { + __update_ts_cache(td); + if (runtime_exceeded(td, &td->ts_cache)) { fio_mark_td_terminate(td); break; } @@ -993,7 +1005,7 @@ reap: if (ret < 0) break; if (!ddir_rw_sum(td->bytes_done) && - !(td->io_ops->flags & FIO_NOIO)) + !td_ioengine_flagged(td, FIO_NOIO)) continue; if (!in_ramp_time(td) && should_check_rate(td)) { @@ -1144,7 +1156,7 @@ static int init_io_u(struct thread_data *td) td->orig_buffer_size = (unsigned long long) max_bs * (unsigned long long) max_units; - if ((td->io_ops->flags & FIO_NOIO) || !(td_read(td) || td_write(td))) + if (td_ioengine_flagged(td, FIO_NOIO) || !(td_read(td) || td_write(td))) data_xfer = 0; err = 0; @@ -1164,7 +1176,7 @@ static int init_io_u(struct thread_data *td) * lucky and the allocator gives us an aligned address. */ if (td->o.odirect || td->o.mem_align || td->o.oatomic || - (td->io_ops->flags & FIO_RAWIO)) + td_ioengine_flagged(td, FIO_RAWIO)) td->orig_buffer_size += page_mask + td->o.mem_align; if (td->o.mem_type == MEM_SHMHUGE || td->o.mem_type == MEM_MMAPHUGE) { @@ -1183,8 +1195,8 @@ static int init_io_u(struct thread_data *td) return 1; if (td->o.odirect || td->o.mem_align || td->o.oatomic || - (td->io_ops->flags & FIO_RAWIO)) - p = PAGE_ALIGN(td->orig_buffer) + td->o.mem_align; + td_ioengine_flagged(td, FIO_RAWIO)) + p = PTR_ALIGN(td->orig_buffer, page_mask) + td->o.mem_align; else p = td->orig_buffer; @@ -1250,16 +1262,22 @@ static int init_io_u(struct thread_data *td) return 0; } +/* + * This function is Linux specific. + * FIO_HAVE_IOSCHED_SWITCH enabled currently means it's Linux. + */ static int switch_ioscheduler(struct thread_data *td) { +#ifdef FIO_HAVE_IOSCHED_SWITCH char tmp[256], tmp2[128]; FILE *f; int ret; - if (td->io_ops->flags & FIO_DISKLESSIO) + if (td_ioengine_flagged(td, FIO_DISKLESSIO)) return 0; - sprintf(tmp, "%s/queue/scheduler", td->sysfs_root); + assert(td->files && td->files[0]); + sprintf(tmp, "%s/queue/scheduler", td->files[0]->du->sysfs_root); f = fopen(tmp, "r+"); if (!f) { @@ -1299,6 +1317,14 @@ static int switch_ioscheduler(struct thread_data *td) */ tmp[strlen(tmp) - 1] = '\0'; + /* + * Write to "none" entry doesn't fail, so check the result here. + */ + if (!strcmp(tmp, "none")) { + log_err("fio: io scheduler is not tunable\n"); + fclose(f); + return 0; + } sprintf(tmp2, "[%s]", td->o.ioscheduler); if (!strstr(tmp, tmp2)) { @@ -1310,6 +1336,9 @@ static int switch_ioscheduler(struct thread_data *td) fclose(f); return 0; +#else + return 0; +#endif } static bool keep_running(struct thread_data *td) @@ -1327,8 +1356,8 @@ static bool keep_running(struct thread_data *td) if (exceeds_number_ios(td)) return false; - if (td->o.io_limit) - limit = td->o.io_limit; + if (td->o.io_size) + limit = td->o.io_size; else limit = td->o.size; @@ -1336,14 +1365,14 @@ static bool keep_running(struct thread_data *td) uint64_t diff; /* - * If the difference is less than the minimum IO size, we + * If the difference is less than the maximum IO size, we * are done. */ diff = limit - ddir_rw_sum(td->io_bytes); if (diff < td_max_bs(td)) return false; - if (fio_files_done(td) && !td->o.io_limit) + if (fio_files_done(td) && !td->o.io_size) return false; return true; @@ -1386,10 +1415,10 @@ static uint64_t do_dry_run(struct thread_data *td) break; io_u = get_io_u(td); - if (!io_u) + if (IS_ERR_OR_NULL(io_u)) break; - io_u_set(io_u, IO_U_F_FLIGHT); + io_u_set(td, io_u, IO_U_F_FLIGHT); io_u->error = 0; io_u->resid = 0; if (ddir_rw(acct_ddir(io_u))) @@ -1428,7 +1457,8 @@ static void *thread_main(void *data) struct thread_data *td = fd->td; struct thread_options *o = &td->o; struct sk_out *sk_out = fd->sk_out; - pthread_condattr_t attr; + uint64_t bytes_done[DDIR_RWDIR_CNT]; + int deadlock_loop_cnt; int clear_state; int ret; @@ -1453,12 +1483,18 @@ static void *thread_main(void *data) INIT_FLIST_HEAD(&td->verify_list); INIT_FLIST_HEAD(&td->trim_list); INIT_FLIST_HEAD(&td->next_rand_list); - pthread_mutex_init(&td->io_u_lock, NULL); td->io_hist_tree = RB_ROOT; - pthread_condattr_init(&attr); - pthread_cond_init(&td->verify_cond, &attr); - pthread_cond_init(&td->free_cond, &attr); + ret = mutex_cond_init_pshared(&td->io_u_lock, &td->free_cond); + if (ret) { + td_verror(td, ret, "mutex_cond_init_pshared"); + goto err; + } + ret = cond_init_pshared(&td->verify_cond); + if (ret) { + td_verror(td, ret, "mutex_cond_pshared"); + goto err; + } td_set_runstate(td, TD_INITIALIZED); dprint(FD_MUTEX, "up startup_mutex\n"); @@ -1480,6 +1516,14 @@ static void *thread_main(void *data) goto err; } + /* + * Do this early, we don't want the compress threads to be limited + * to the same CPUs as the IO workers. So do this before we set + * any potential CPU affinity + */ + if (iolog_compress_init(td, sk_out)) + goto err; + /* * If we have a gettimeofday() thread, make sure we exclude that * thread from this job @@ -1614,18 +1658,16 @@ static void *thread_main(void *data) goto err; } - if (iolog_compress_init(td, sk_out)) - goto err; - fio_verify_init(td); if (rate_submit_init(td, sk_out)) goto err; - fio_gettime(&td->epoch, NULL); + set_epoch_time(td, o->log_unix_epoch); fio_getrusage(&td->ru_start); memcpy(&td->bw_sample_time, &td->epoch, sizeof(td->epoch)); memcpy(&td->iops_sample_time, &td->epoch, sizeof(td->epoch)); + memcpy(&td->ss.prev_time, &td->epoch, sizeof(td->epoch)); if (o->ratemin[DDIR_READ] || o->ratemin[DDIR_WRITE] || o->ratemin[DDIR_TRIM]) { @@ -1637,23 +1679,27 @@ static void *thread_main(void *data) sizeof(td->bw_sample_time)); } + memset(bytes_done, 0, sizeof(bytes_done)); clear_state = 0; + while (keep_running(td)) { uint64_t verify_bytes; fio_gettime(&td->start, NULL); - memcpy(&td->tv_cache, &td->start, sizeof(td->start)); + memcpy(&td->ts_cache, &td->start, sizeof(td->start)); - if (clear_state) + if (clear_state) { clear_io_state(td, 0); + if (o->unlink_each_loop && unlink_all_files(td)) + break; + } + prune_io_piece_log(td); - if (td->o.verify_only && (td_write(td) || td_rw(td))) + if (td->o.verify_only && td_write(td)) verify_bytes = do_dry_run(td); else { - uint64_t bytes_done[DDIR_RWDIR_CNT]; - do_io(td, bytes_done); if (!ddir_rw_sum(bytes_done)) { @@ -1665,6 +1711,14 @@ static void *thread_main(void *data) } } + /* + * If we took too long to shut down, the main thread could + * already consider us reaped/exited. If that happens, break + * out and clean up. + */ + if (td->runstate >= TD_EXITED) + break; + clear_state = 1; /* @@ -1674,9 +1728,19 @@ static void *thread_main(void *data) * the rusage_sem, which would never get upped because * this thread is waiting for the stat mutex. */ - check_update_rusage(td); + deadlock_loop_cnt = 0; + do { + check_update_rusage(td); + if (!fio_mutex_down_trylock(stat_mutex)) + break; + usleep(1000); + if (deadlock_loop_cnt++ > 5000) { + log_err("fio seems to be stuck grabbing stat_mutex, forcibly exiting\n"); + td->error = EDEADLK; + goto err; + } + } while (1); - fio_mutex_down(stat_mutex); if (td_read(td) && td->io_bytes[DDIR_READ]) update_runtime(td, elapsed_us, DDIR_READ); if (td_write(td) && td->io_bytes[DDIR_WRITE]) @@ -1691,7 +1755,7 @@ static void *thread_main(void *data) if (!o->do_verify || o->verify == VERIFY_NONE || - (td->io_ops->flags & FIO_UNIDIR)) + td_ioengine_flagged(td, FIO_UNIDIR)) continue; clear_io_state(td, 0); @@ -1714,6 +1778,20 @@ static void *thread_main(void *data) break; } + /* + * If td ended up with no I/O when it should have had, + * then something went wrong unless FIO_NOIO or FIO_DISKLESSIO. + * (Are we not missing other flags that can be ignored ?) + */ + if ((td->o.size || td->o.io_size) && !ddir_rw_sum(bytes_done) && + !(td_ioengine_flagged(td, FIO_NOIO) || + td_ioengine_flagged(td, FIO_DISKLESSIO))) + log_err("%s: No I/O performed by %s, " + "perhaps try --debug=io option for details?\n", + td->o.name, td->io_ops->name); + + td_set_runstate(td, TD_FINISHING); + update_rusage_stat(td); td->ts.total_run_time = mtime_since_now(&td->epoch); td->ts.io_bytes[DDIR_READ] = td->io_bytes[DDIR_READ]; @@ -1772,9 +1850,6 @@ err: if (o->write_iolog_file) write_iolog_close(td); - fio_mutex_remove(td->mutex); - td->mutex = NULL; - td_set_runstate(td, TD_EXITED); /* @@ -1787,51 +1862,11 @@ err: return (void *) (uintptr_t) td->error; } - -/* - * We cannot pass the td data into a forked process, so attach the td and - * pass it to the thread worker. - */ -static int fork_main(struct sk_out *sk_out, int shmid, int offset) -{ - struct fork_data *fd; - void *data, *ret; - -#if !defined(__hpux) && !defined(CONFIG_NO_SHM) - data = shmat(shmid, NULL, 0); - if (data == (void *) -1) { - int __err = errno; - - perror("shmat"); - return __err; - } -#else - /* - * HP-UX inherits shm mappings? - */ - data = threads; -#endif - - fd = calloc(1, sizeof(*fd)); - fd->td = data + offset * sizeof(struct thread_data); - fd->sk_out = sk_out; - ret = thread_main(fd); - shmdt(data); - return (int) (uintptr_t) ret; -} - -static void dump_td_info(struct thread_data *td) -{ - log_err("fio: job '%s' hasn't exited in %lu seconds, it appears to " - "be stuck. Doing forceful exit of this job.\n", td->o.name, - (unsigned long) time_since_now(&td->terminate_time)); -} - /* * Run over the job map and reap the threads that have exited, if any. */ -static void reap_threads(unsigned int *nr_running, unsigned int *t_rate, - unsigned int *m_rate) +static void reap_threads(unsigned int *nr_running, uint64_t *t_rate, + uint64_t *m_rate) { struct thread_data *td; unsigned int cputhreads, realthreads, pending; @@ -1909,8 +1944,13 @@ static void reap_threads(unsigned int *nr_running, unsigned int *t_rate, * move on. */ if (td->terminate && + td->runstate < TD_FSYNCING && time_since_now(&td->terminate_time) >= FIO_REAP_TIMEOUT) { - dump_td_info(td); + log_err("fio: job '%s' (state=%d) hasn't exited in " + "%lu seconds, it appears to be stuck. Doing " + "forceful exit of this job.\n", + td->o.name, td->runstate, + (unsigned long) time_since_now(&td->terminate_time)); td_set_runstate(td, TD_REAPED); goto reaped; } @@ -2023,8 +2063,16 @@ static bool check_mount_writes(struct thread_data *td) if (!td_write(td) || td->o.allow_mounted_write) return false; + /* + * If FIO_HAVE_CHARDEV_SIZE is defined, it's likely that chrdevs + * are mkfs'd and mounted. + */ for_each_file(td, f, i) { - if (f->filetype != FIO_TYPE_BD) +#ifdef FIO_HAVE_CHARDEV_SIZE + if (f->filetype != FIO_TYPE_BLOCK && f->filetype != FIO_TYPE_CHAR) +#else + if (f->filetype != FIO_TYPE_BLOCK) +#endif continue; if (device_is_mounted(f->file_name)) goto mounted; @@ -2032,7 +2080,7 @@ static bool check_mount_writes(struct thread_data *td) return false; mounted: - log_err("fio: %s appears mounted, and 'allow_mounted_write' isn't set. Aborting.", f->file_name); + log_err("fio: %s appears mounted, and 'allow_mounted_write' isn't set. Aborting.\n", f->file_name); return true; } @@ -2068,7 +2116,8 @@ static bool waitee_running(struct thread_data *me) static void run_threads(struct sk_out *sk_out) { struct thread_data *td; - unsigned int i, todo, nr_running, m_rate, t_rate, nr_started; + unsigned int i, todo, nr_running, nr_started; + uint64_t m_rate, t_rate; uint64_t spent; if (fio_gtod_offload && fio_start_gtod_thread()) @@ -2153,8 +2202,9 @@ reap: while (todo) { struct thread_data *map[REAL_MAX_JOBS]; - struct timeval this_start; + struct timespec this_start; int this_jobs = 0, left; + struct fork_data *fd; /* * create threads (TD_NOT_CREATED -> TD_CREATED) @@ -2204,14 +2254,13 @@ reap: map[this_jobs++] = td; nr_started++; + fd = calloc(1, sizeof(*fd)); + fd->td = td; + fd->sk_out = sk_out; + if (td->o.use_thread) { - struct fork_data *fd; int ret; - fd = calloc(1, sizeof(*fd)); - fd->td = td; - fd->sk_out = sk_out; - dprint(FD_PROCESS, "will pthread_create\n"); ret = pthread_create(&td->thread, NULL, thread_main, fd); @@ -2231,8 +2280,9 @@ reap: dprint(FD_PROCESS, "will fork\n"); pid = fork(); if (!pid) { - int ret = fork_main(sk_out, shm_id, i); + int ret; + ret = (int)(uintptr_t)thread_main(fd); _exit(ret); } else if (i == fio_debug_jobno) *fio_debug_jobp = pid; @@ -2323,158 +2373,10 @@ reap: update_io_ticks(); } -void helper_reset(void) -{ - if (!helper_data) - return; - - pthread_mutex_lock(&helper_data->lock); - - if (!helper_data->reset) { - helper_data->reset = 1; - pthread_cond_signal(&helper_data->cond); - } - - pthread_mutex_unlock(&helper_data->lock); -} - -void helper_do_stat(void) -{ - if (!helper_data) - return; - - pthread_mutex_lock(&helper_data->lock); - helper_data->do_stat = 1; - pthread_cond_signal(&helper_data->cond); - pthread_mutex_unlock(&helper_data->lock); -} - -bool helper_should_exit(void) -{ - if (!helper_data) - return true; - - return helper_data->exit; -} - -static void wait_for_helper_thread_exit(void) -{ - void *ret; - - pthread_mutex_lock(&helper_data->lock); - helper_data->exit = 1; - pthread_cond_signal(&helper_data->cond); - pthread_mutex_unlock(&helper_data->lock); - - pthread_join(helper_data->thread, &ret); -} - static void free_disk_util(void) { disk_util_prune_entries(); - - pthread_cond_destroy(&helper_data->cond); - pthread_mutex_destroy(&helper_data->lock); - sfree(helper_data); -} - -static void *helper_thread_main(void *data) -{ - struct helper_data *hd = data; - unsigned int msec_to_next_event, next_log; - struct timeval tv, last_du; - int ret = 0; - - sk_out_assign(hd->sk_out); - - gettimeofday(&tv, NULL); - memcpy(&last_du, &tv, sizeof(tv)); - - fio_mutex_up(startup_mutex); - - msec_to_next_event = DISK_UTIL_MSEC; - while (!ret && !hd->exit) { - struct timespec ts; - struct timeval now; - uint64_t since_du; - - timeval_add_msec(&tv, msec_to_next_event); - ts.tv_sec = tv.tv_sec; - ts.tv_nsec = tv.tv_usec * 1000; - - pthread_mutex_lock(&hd->lock); - pthread_cond_timedwait(&hd->cond, &hd->lock, &ts); - - gettimeofday(&now, NULL); - - if (hd->reset) { - memcpy(&tv, &now, sizeof(tv)); - memcpy(&last_du, &now, sizeof(last_du)); - hd->reset = 0; - } - - pthread_mutex_unlock(&hd->lock); - - since_du = mtime_since(&last_du, &now); - if (since_du >= DISK_UTIL_MSEC || DISK_UTIL_MSEC - since_du < 10) { - ret = update_io_ticks(); - timeval_add_msec(&last_du, DISK_UTIL_MSEC); - msec_to_next_event = DISK_UTIL_MSEC; - if (since_du >= DISK_UTIL_MSEC) - msec_to_next_event -= (since_du - DISK_UTIL_MSEC); - } else { - if (since_du >= DISK_UTIL_MSEC) - msec_to_next_event = DISK_UTIL_MSEC - (DISK_UTIL_MSEC - since_du); - else - msec_to_next_event = DISK_UTIL_MSEC; - } - - if (hd->do_stat) { - hd->do_stat = 0; - __show_running_run_stats(); - } - - next_log = calc_log_samples(); - if (!next_log) - next_log = DISK_UTIL_MSEC; - - msec_to_next_event = min(next_log, msec_to_next_event); - - if (!is_backend) - print_thread_status(); - } - - fio_writeout_logs(false); - - sk_out_drop(); - return NULL; -} - -static int create_helper_thread(struct sk_out *sk_out) -{ - struct helper_data *hd; - int ret; - - hd = smalloc(sizeof(*hd)); - - setup_disk_util(); - - hd->sk_out = sk_out; - pthread_cond_init(&hd->cond, NULL); - pthread_mutex_init(&hd->lock, NULL); - - ret = pthread_create(&hd->thread, NULL, helper_thread_main, hd); - if (ret) { - log_err("Can't create helper thread: %s\n", strerror(ret)); - return 1; - } - - helper_data = hd; - - dprint(FD_MUTEX, "wait on startup_mutex\n"); - fio_mutex_down(startup_mutex); - dprint(FD_MUTEX, "done waiting on startup_mutex\n"); - return 0; + helper_thread_destroy(); } int fio_backend(struct sk_out *sk_out) @@ -2507,14 +2409,14 @@ int fio_backend(struct sk_out *sk_out) set_genesis_time(); stat_init(); - create_helper_thread(sk_out); + helper_thread_create(startup_mutex, sk_out); cgroup_list = smalloc(sizeof(*cgroup_list)); INIT_FLIST_HEAD(cgroup_list); run_threads(sk_out); - wait_for_helper_thread_exit(); + helper_thread_exit(); if (!fio_abort) { __show_run_stats(); @@ -2522,18 +2424,26 @@ int fio_backend(struct sk_out *sk_out) for (i = 0; i < DDIR_RWDIR_CNT; i++) { struct io_log *log = agg_io_log[i]; - flush_log(log, 0); + flush_log(log, false); free_log(log); } } } for_each_td(td, i) { + if (td->ss.dur) { + if (td->ss.iops_data != NULL) { + free(td->ss.iops_data); + free(td->ss.bw_data); + } + } fio_options_free(td); if (td->rusage_sem) { fio_mutex_remove(td->rusage_sem); td->rusage_sem = NULL; } + fio_mutex_remove(td->mutex); + td->mutex = NULL; } free_disk_util();