X-Git-Url: https://git.kernel.dk/?a=blobdiff_plain;f=backend.c;h=bb8bd13bd11f973e616fd3897483e27ec833cbc5;hb=acbda87c34c743ff2d9e125d9539bcfbbf49eb75;hp=4bc00e696602d681a7008512f0f9a9c1fbb73c30;hpb=5be9bf098d3766a8f27d446d82911c5ef0153676;p=fio.git diff --git a/backend.c b/backend.c index 4bc00e69..3a99850d 100644 --- a/backend.c +++ b/backend.c @@ -18,33 +18,22 @@ * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * */ #include -#include #include -#include #include -#include -#include #include -#include #include #include #include -#include -#include #include +#include #include "fio.h" -#ifndef FIO_NO_HAVE_SHM_H -#include -#endif -#include "hash.h" #include "smalloc.h" #include "verify.h" -#include "trim.h" #include "diskutil.h" #include "cgroup.h" #include "profile.h" @@ -58,12 +47,14 @@ #include "lib/mountcheck.h" #include "rate-submit.h" #include "helper_thread.h" +#include "pshared.h" +#include "zone-dist.h" -static struct fio_mutex *startup_mutex; +static struct fio_sem *startup_sem; static struct flist_head *cgroup_list; -static char *cgroup_mnt; +static struct cgroup_mnt *cgroup_mnt; static int exit_value; -static volatile int fio_abort; +static volatile bool fio_abort; static unsigned int nr_process = 0; static unsigned int nr_thread = 0; @@ -71,16 +62,22 @@ struct io_log *agg_io_log[DDIR_RWDIR_CNT]; int groupid = 0; unsigned int thread_number = 0; +unsigned int nr_segments = 0; +unsigned int cur_segment = 0; unsigned int stat_number = 0; -int shm_id = 0; int temp_stall_ts; unsigned long done_secs = 0; +#ifdef PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP +pthread_mutex_t overlap_check = PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP; +#else +pthread_mutex_t overlap_check = PTHREAD_MUTEX_INITIALIZER; +#endif #define JOB_START_TIMEOUT (5 * 1000) static void sig_int(int sig) { - if (threads) { + if (nr_segments) { if (is_backend) fio_server_got_signal(sig); else { @@ -89,7 +86,7 @@ static void sig_int(int sig) exit_value = 128; } - fio_terminate_threads(TERMINATE_ALL); + fio_terminate_threads(TERMINATE_ALL, TERMINATE_ALL); } } @@ -136,16 +133,13 @@ static void set_sig_handlers(void) /* * Check if we are above the minimum rate given. */ -static bool __check_min_rate(struct thread_data *td, struct timeval *now, +static bool __check_min_rate(struct thread_data *td, struct timespec *now, enum fio_ddir ddir) { - unsigned long long bytes = 0; - unsigned long iops = 0; - unsigned long spent; - unsigned long rate; - unsigned int ratemin = 0; - unsigned int rate_iops = 0; - unsigned int rate_iops_min = 0; + unsigned long long current_rate_check_bytes = td->this_io_bytes[ddir]; + unsigned long current_rate_check_blocks = td->this_io_blocks[ddir]; + unsigned long long option_rate_bytes_min = td->o.ratemin[ddir]; + unsigned int option_rate_iops_min = td->o.rate_iops_min[ddir]; assert(ddir_rw(ddir)); @@ -158,81 +152,55 @@ static bool __check_min_rate(struct thread_data *td, struct timeval *now, if (mtime_since(&td->start, now) < 2000) return false; - iops += td->this_io_blocks[ddir]; - bytes += td->this_io_bytes[ddir]; - ratemin += td->o.ratemin[ddir]; - rate_iops += td->o.rate_iops[ddir]; - rate_iops_min += td->o.rate_iops_min[ddir]; - /* - * if rate blocks is set, sample is running + * if last_rate_check_blocks or last_rate_check_bytes is set, + * we can compute a rate per ratecycle */ - if (td->rate_bytes[ddir] || td->rate_blocks[ddir]) { - spent = mtime_since(&td->lastrate[ddir], now); - if (spent < td->o.ratecycle) + if (td->last_rate_check_bytes[ddir] || td->last_rate_check_blocks[ddir]) { + unsigned long spent = mtime_since(&td->last_rate_check_time[ddir], now); + if (spent < td->o.ratecycle || spent==0) return false; - if (td->o.rate[ddir] || td->o.ratemin[ddir]) { + if (td->o.ratemin[ddir]) { /* * check bandwidth specified rate */ - if (bytes < td->rate_bytes[ddir]) { - log_err("%s: rate_min=%uB/s not met, only transferred %lluB\n", - td->o.name, ratemin, bytes); + unsigned long long current_rate_bytes = + ((current_rate_check_bytes - td->last_rate_check_bytes[ddir]) * 1000) / spent; + if (current_rate_bytes < option_rate_bytes_min) { + log_err("%s: rate_min=%lluB/s not met, got %lluB/s\n", + td->o.name, option_rate_bytes_min, current_rate_bytes); return true; - } else { - if (spent) - rate = ((bytes - td->rate_bytes[ddir]) * 1000) / spent; - else - rate = 0; - - if (rate < ratemin || - bytes < td->rate_bytes[ddir]) { - log_err("%s: rate_min=%uB/s not met, got %luB/s\n", - td->o.name, ratemin, rate); - return true; - } } } else { /* * checks iops specified rate */ - if (iops < rate_iops) { - log_err("%s: rate_iops_min=%u not met, only performed %lu IOs\n", - td->o.name, rate_iops, iops); + unsigned long long current_rate_iops = + ((current_rate_check_blocks - td->last_rate_check_blocks[ddir]) * 1000) / spent; + + if (current_rate_iops < option_rate_iops_min) { + log_err("%s: rate_iops_min=%u not met, got %llu IOPS\n", + td->o.name, option_rate_iops_min, current_rate_iops); return true; - } else { - if (spent) - rate = ((iops - td->rate_blocks[ddir]) * 1000) / spent; - else - rate = 0; - - if (rate < rate_iops_min || - iops < td->rate_blocks[ddir]) { - log_err("%s: rate_iops_min=%u not met, got %lu IOPS\n", - td->o.name, rate_iops_min, rate); - return true; - } } } } - td->rate_bytes[ddir] = bytes; - td->rate_blocks[ddir] = iops; - memcpy(&td->lastrate[ddir], now, sizeof(*now)); + td->last_rate_check_bytes[ddir] = current_rate_check_bytes; + td->last_rate_check_blocks[ddir] = current_rate_check_blocks; + memcpy(&td->last_rate_check_time[ddir], now, sizeof(*now)); return false; } -static bool check_min_rate(struct thread_data *td, struct timeval *now) +static bool check_min_rate(struct thread_data *td, struct timespec *now) { bool ret = false; - if (td->bytes_done[DDIR_READ]) - ret |= __check_min_rate(td, now, DDIR_READ); - if (td->bytes_done[DDIR_WRITE]) - ret |= __check_min_rate(td, now, DDIR_WRITE); - if (td->bytes_done[DDIR_TRIM]) - ret |= __check_min_rate(td, now, DDIR_TRIM); + for_each_rw_ddir(ddir) { + if (td->bytes_done[ddir]) + ret |= __check_min_rate(td, now, ddir); + } return ret; } @@ -249,8 +217,6 @@ static void cleanup_pending_aio(struct thread_data *td) * get immediately available events, if any */ r = io_u_queued_complete(td, 0); - if (r < 0) - return; /* * now cancel remaining active events @@ -279,13 +245,14 @@ static void cleanup_pending_aio(struct thread_data *td) static bool fio_io_sync(struct thread_data *td, struct fio_file *f) { struct io_u *io_u = __get_io_u(td); - int ret; + enum fio_q_status ret; if (!io_u) return true; io_u->ddir = DDIR_SYNC; io_u->file = f; + io_u_set(td, io_u, IO_U_F_NO_FILE_PUT); if (td_io_prep(td, io_u)) { put_io_u(td, io_u); @@ -294,16 +261,13 @@ static bool fio_io_sync(struct thread_data *td, struct fio_file *f) requeue: ret = td_io_queue(td, io_u); - if (ret < 0) { - td_verror(td, io_u->error, "td_io_queue"); - put_io_u(td, io_u); - return true; - } else if (ret == FIO_Q_QUEUED) { - if (td_io_commit(td)) - return true; + switch (ret) { + case FIO_Q_QUEUED: + td_io_commit(td); if (io_u_queued_complete(td, 1) < 0) return true; - } else if (ret == FIO_Q_COMPLETED) { + break; + case FIO_Q_COMPLETED: if (io_u->error) { td_verror(td, io_u->error, "td_io_queue"); return true; @@ -311,9 +275,9 @@ requeue: if (io_u_sync_complete(td, io_u) < 0) return true; - } else if (ret == FIO_Q_BUSY) { - if (td_io_commit(td)) - return true; + break; + case FIO_Q_BUSY: + td_io_commit(td); goto requeue; } @@ -322,7 +286,7 @@ requeue: static int fio_file_fsync(struct thread_data *td, struct fio_file *f) { - int ret; + int ret, ret2; if (fio_file_open(f)) return fio_io_sync(td, f); @@ -331,22 +295,24 @@ static int fio_file_fsync(struct thread_data *td, struct fio_file *f) return 1; ret = fio_io_sync(td, f); - td_io_close_file(td, f); - return ret; + ret2 = 0; + if (fio_file_open(f)) + ret2 = td_io_close_file(td, f); + return (ret || ret2); } -static inline void __update_tv_cache(struct thread_data *td) +static inline void __update_ts_cache(struct thread_data *td) { - fio_gettime(&td->tv_cache, NULL); + fio_gettime(&td->ts_cache, NULL); } -static inline void update_tv_cache(struct thread_data *td) +static inline void update_ts_cache(struct thread_data *td) { - if ((++td->tv_cache_nr & td->tv_cache_mask) == td->tv_cache_mask) - __update_tv_cache(td); + if ((++td->ts_cache_nr & td->ts_cache_mask) == td->ts_cache_mask) + __update_ts_cache(td); } -static inline bool runtime_exceeded(struct thread_data *td, struct timeval *t) +static inline bool runtime_exceeded(struct thread_data *td, struct timespec *t) { if (in_ramp_time(td)) return false; @@ -400,7 +366,7 @@ static bool break_on_this_error(struct thread_data *td, enum fio_ddir ddir, td_clear_error(td); *retptr = 0; return false; - } else if (td->o.fill_device && err == ENOSPC) { + } else if (td->o.fill_device && (err == ENOSPC || err == EDQUOT)) { /* * We expect to hit this error if * fill_device option is set. @@ -426,11 +392,11 @@ static void check_update_rusage(struct thread_data *td) if (td->update_rusage) { td->update_rusage = 0; update_rusage_stat(td); - fio_mutex_up(td->rusage_sem); + fio_sem_up(td->rusage_sem); } } -static int wait_for_completions(struct thread_data *td, struct timeval *time) +static int wait_for_completions(struct thread_data *td, struct timespec *time) { const int full = queue_full(td); int min_evts = 0; @@ -446,9 +412,7 @@ static int wait_for_completions(struct thread_data *td, struct timeval *time) if ((full && !min_evts) || !td->o.iodepth_batch_complete_min) min_evts = 1; - if (time && (__should_check_rate(td, DDIR_READ) || - __should_check_rate(td, DDIR_WRITE) || - __should_check_rate(td, DDIR_TRIM))) + if (time && should_check_rate(td)) fio_gettime(time, NULL); do { @@ -462,24 +426,22 @@ static int wait_for_completions(struct thread_data *td, struct timeval *time) int io_queue_event(struct thread_data *td, struct io_u *io_u, int *ret, enum fio_ddir ddir, uint64_t *bytes_issued, int from_verify, - struct timeval *comp_time) + struct timespec *comp_time) { - int ret2; - switch (*ret) { case FIO_Q_COMPLETED: if (io_u->error) { *ret = -io_u->error; clear_io_u(td, io_u); } else if (io_u->resid) { - int bytes = io_u->xfer_buflen - io_u->resid; + long long bytes = io_u->xfer_buflen - io_u->resid; struct fio_file *f = io_u->file; if (bytes_issued) *bytes_issued += bytes; if (!from_verify) - trim_io_piece(td, io_u); + trim_io_piece(io_u); /* * zero read, fail @@ -499,16 +461,13 @@ int io_queue_event(struct thread_data *td, struct io_u *io_u, int *ret, if (ddir_rw(io_u->ddir)) td->ts.short_io_u[io_u->ddir]++; - f = io_u->file; if (io_u->offset == f->real_file_size) goto sync_done; requeue_io_u(td, &io_u); } else { sync_done: - if (comp_time && (__should_check_rate(td, DDIR_READ) || - __should_check_rate(td, DDIR_WRITE) || - __should_check_rate(td, DDIR_TRIM))) + if (comp_time && should_check_rate(td)) fio_gettime(comp_time, NULL); *ret = io_u_sync_complete(td, io_u); @@ -542,9 +501,7 @@ sync_done: if (!from_verify) unlog_io_piece(td, io_u); requeue_io_u(td, &io_u); - ret2 = td_io_commit(td); - if (ret2 < 0) - *ret = ret2; + td_io_commit(td); break; default: assert(*ret < 0); @@ -586,6 +543,50 @@ static int unlink_all_files(struct thread_data *td) return ret; } +/* + * Check if io_u will overlap an in-flight IO in the queue + */ +bool in_flight_overlap(struct io_u_queue *q, struct io_u *io_u) +{ + bool overlap; + struct io_u *check_io_u; + unsigned long long x1, x2, y1, y2; + int i; + + x1 = io_u->offset; + x2 = io_u->offset + io_u->buflen; + overlap = false; + io_u_qiter(q, check_io_u, i) { + if (check_io_u->flags & IO_U_F_FLIGHT) { + y1 = check_io_u->offset; + y2 = check_io_u->offset + check_io_u->buflen; + + if (x1 < y2 && y1 < x2) { + overlap = true; + dprint(FD_IO, "in-flight overlap: %llu/%llu, %llu/%llu\n", + x1, io_u->buflen, + y1, check_io_u->buflen); + break; + } + } + } + + return overlap; +} + +static enum fio_q_status io_u_submit(struct thread_data *td, struct io_u *io_u) +{ + /* + * Check for overlap if the user asked us to, and we have + * at least one IO in flight besides this one. + */ + if (td->o.serialize_overlap && td->cur_depth > 1 && + in_flight_overlap(&td->io_u_all, io_u)) + return FIO_Q_BUSY; + + return td_io_queue(td, io_u); +} + /* * The main verify engine. Runs over the writes we previously submitted, * reads the blocks back in, and checks the crc/md5 of the data. @@ -633,12 +634,12 @@ static void do_verify(struct thread_data *td, uint64_t verify_bytes) enum fio_ddir ddir; int full; - update_tv_cache(td); + update_ts_cache(td); check_update_rusage(td); - if (runtime_exceeded(td, &td->tv_cache)) { - __update_tv_cache(td); - if (runtime_exceeded(td, &td->tv_cache)) { + if (runtime_exceeded(td, &td->ts_cache)) { + __update_ts_cache(td); + if (runtime_exceeded(td, &td->ts_cache)) { fio_mark_td_terminate(td); break; } @@ -691,6 +692,7 @@ static void do_verify(struct thread_data *td, uint64_t verify_bytes) break; } else if (io_u->ddir == DDIR_WRITE) { io_u->ddir = DDIR_READ; + populate_verify_io_u(td, io_u); break; } else { put_io_u(td, io_u); @@ -716,7 +718,7 @@ static void do_verify(struct thread_data *td, uint64_t verify_bytes) if (!td->o.disable_slat) fio_gettime(&io_u->start_time, NULL); - ret = td_io_queue(td, io_u); + ret = io_u_submit(td, io_u); if (io_queue_event(td, io_u, &ret, ddir, NULL, 1, NULL)) break; @@ -801,32 +803,116 @@ static bool io_complete_bytes_exceeded(struct thread_data *td) */ static long long usec_for_io(struct thread_data *td, enum fio_ddir ddir) { - uint64_t secs, remainder, bps, bytes, iops; + uint64_t bps = td->rate_bps[ddir]; assert(!(td->flags & TD_F_CHILD)); - bytes = td->rate_io_issue_bytes[ddir]; - bps = td->rate_bps[ddir]; if (td->o.rate_process == RATE_PROCESS_POISSON) { - uint64_t val; - iops = bps / td->o.bs[ddir]; + uint64_t val, iops; + + iops = bps / td->o.min_bs[ddir]; val = (int64_t) (1000000 / iops) * - -logf(__rand_0_1(&td->poisson_state)); + -logf(__rand_0_1(&td->poisson_state[ddir])); if (val) { - dprint(FD_RATE, "poisson rate iops=%llu\n", - (unsigned long long) 1000000 / val); + dprint(FD_RATE, "poisson rate iops=%llu, ddir=%d\n", + (unsigned long long) 1000000 / val, + ddir); } - td->last_usec += val; - return td->last_usec; + td->last_usec[ddir] += val; + return td->last_usec[ddir]; } else if (bps) { - secs = bytes / bps; - remainder = bytes % bps; + uint64_t bytes = td->rate_io_issue_bytes[ddir]; + uint64_t secs = bytes / bps; + uint64_t remainder = bytes % bps; + return remainder * 1000000 / bps + secs * 1000000; } return 0; } +static void init_thinktime(struct thread_data *td) +{ + if (td->o.thinktime_blocks_type == THINKTIME_BLOCKS_TYPE_COMPLETE) + td->thinktime_blocks_counter = td->io_blocks; + else + td->thinktime_blocks_counter = td->io_issues; + td->last_thinktime = td->epoch; + td->last_thinktime_blocks = 0; +} + +static void handle_thinktime(struct thread_data *td, enum fio_ddir ddir, + struct timespec *time) +{ + unsigned long long b; + uint64_t total; + int left; + struct timespec now; + bool stall = false; + + if (td->o.thinktime_iotime) { + fio_gettime(&now, NULL); + if (utime_since(&td->last_thinktime, &now) + >= td->o.thinktime_iotime + td->o.thinktime) { + stall = true; + } else if (!fio_option_is_set(&td->o, thinktime_blocks)) { + /* + * When thinktime_iotime is set and thinktime_blocks is + * not set, skip the thinktime_blocks check, since + * thinktime_blocks default value 1 does not work + * together with thinktime_iotime. + */ + return; + } + + } + + b = ddir_rw_sum(td->thinktime_blocks_counter); + if (b >= td->last_thinktime_blocks + td->o.thinktime_blocks) + stall = true; + + if (!stall) + return; + + io_u_quiesce(td); + + total = 0; + if (td->o.thinktime_spin) + total = usec_spin(td->o.thinktime_spin); + + left = td->o.thinktime - total; + if (left) + total += usec_sleep(td, left); + + /* + * If we're ignoring thinktime for the rate, add the number of bytes + * we would have done while sleeping, minus one block to ensure we + * start issuing immediately after the sleep. + */ + if (total && td->rate_bps[ddir] && td->o.rate_ign_think) { + uint64_t missed = (td->rate_bps[ddir] * total) / 1000000ULL; + uint64_t bs = td->o.min_bs[ddir]; + uint64_t usperop = bs * 1000000ULL / td->rate_bps[ddir]; + uint64_t over; + + if (usperop <= total) + over = bs; + else + over = (usperop - total) / usperop * -bs; + + td->rate_io_issue_bytes[ddir] += (missed - over); + /* adjust for rate_process=poisson */ + td->last_usec[ddir] += total; + } + + if (time && should_check_rate(td)) + fio_gettime(time, NULL); + + td->last_thinktime_blocks = b; + if (td->o.thinktime_iotime) + td->last_thinktime = now; +} + /* * Main IO worker function. It retrieves io_u's to process and queues * and reaps them, checking for rate and errors along the way. @@ -873,7 +959,7 @@ static void do_io(struct thread_data *td, uint64_t *bytes_done) while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) || (!flist_empty(&td->trim_list)) || !io_issue_bytes_exceeded(td) || td->o.time_based) { - struct timeval comp_time; + struct timespec comp_time; struct io_u *io_u; int full; enum fio_ddir ddir; @@ -883,11 +969,11 @@ static void do_io(struct thread_data *td, uint64_t *bytes_done) if (td->terminate || td->done) break; - update_tv_cache(td); + update_ts_cache(td); - if (runtime_exceeded(td, &td->tv_cache)) { - __update_tv_cache(td); - if (runtime_exceeded(td, &td->tv_cache)) { + if (runtime_exceeded(td, &td->ts_cache)) { + __update_ts_cache(td); + if (runtime_exceeded(td, &td->ts_cache)) { fio_mark_td_terminate(td); break; } @@ -900,8 +986,10 @@ static void do_io(struct thread_data *td, uint64_t *bytes_done) * Break if we exceeded the bytes. The exception is time * based runs, but we still need to break out of the loop * for those to run verification, if enabled. + * Jobs read from iolog do not use this stop condition. */ if (bytes_issued >= total_bytes && + !td->o.read_iolog_file && (!td->o.time_based || (td->o.time_based && td->o.verify != VERIFY_NONE))) break; @@ -911,6 +999,7 @@ static void do_io(struct thread_data *td, uint64_t *bytes_done) int err = PTR_ERR(io_u); io_u = NULL; + ddir = DDIR_INVAL; if (err == -EBUSY) { ret = FIO_Q_BUSY; goto reap; @@ -920,6 +1009,9 @@ static void do_io(struct thread_data *td, uint64_t *bytes_done) break; } + if (io_u->ddir == DDIR_WRITE && td->flags & TD_F_DO_VERIFY) + populate_verify_io_u(td, io_u); + ddir = io_u->ddir; /* @@ -930,12 +1022,6 @@ static void do_io(struct thread_data *td, uint64_t *bytes_done) if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_READ && ((io_u->flags & IO_U_F_VER_LIST) || !td_rw(td))) { - if (!td->o.verify_pattern_bytes) { - io_u->rand_seed = __rand(&td->verify_state); - if (sizeof(int) != sizeof(long *)) - io_u->rand_seed *= __rand(&td->verify_state); - } - if (verify_state_should_stop(td, io_u)) { put_io_u(td, io_u); break; @@ -963,8 +1049,8 @@ static void do_io(struct thread_data *td, uint64_t *bytes_done) log_io_piece(td, io_u); if (td->o.io_submit_mode == IO_MODE_OFFLOAD) { - const unsigned long blen = io_u->xfer_buflen; - const enum fio_ddir ddir = acct_ddir(io_u); + const unsigned long long blen = io_u->xfer_buflen; + const enum fio_ddir __ddir = acct_ddir(io_u); if (td->error) break; @@ -972,17 +1058,19 @@ static void do_io(struct thread_data *td, uint64_t *bytes_done) workqueue_enqueue(&td->io_wq, &io_u->work); ret = FIO_Q_QUEUED; - if (ddir_rw(ddir)) { - td->io_issues[ddir]++; - td->io_issue_bytes[ddir] += blen; - td->rate_io_issue_bytes[ddir] += blen; + if (ddir_rw(__ddir)) { + td->io_issues[__ddir]++; + td->io_issue_bytes[__ddir] += blen; + td->rate_io_issue_bytes[__ddir] += blen; } - if (should_check_rate(td)) - td->rate_next_io_time[ddir] = usec_for_io(td, ddir); + if (should_check_rate(td)) { + td->rate_next_io_time[__ddir] = usec_for_io(td, __ddir); + fio_gettime(&comp_time, NULL); + } } else { - ret = td_io_queue(td, io_u); + ret = io_u_submit(td, io_u); if (should_check_rate(td)) td->rate_next_io_time[ddir] = usec_for_io(td, ddir); @@ -1003,6 +1091,10 @@ reap: } if (ret < 0) break; + + if (ddir_rw(ddir) && td->o.thinktime) + handle_thinktime(td, ddir, &comp_time); + if (!ddir_rw_sum(td->bytes_done) && !td_ioengine_flagged(td, FIO_NOIO)) continue; @@ -1010,31 +1102,13 @@ reap: if (!in_ramp_time(td) && should_check_rate(td)) { if (check_min_rate(td, &comp_time)) { if (exitall_on_terminate || td->o.exitall_error) - fio_terminate_threads(td->groupid); + fio_terminate_threads(td->groupid, td->o.exit_what); td_verror(td, EIO, "check_min_rate"); break; } } if (!in_ramp_time(td) && td->o.latency_target) lat_target_check(td); - - if (td->o.thinktime) { - unsigned long long b; - - b = ddir_rw_sum(td->io_blocks); - if (!(b % td->o.thinktime_blocks)) { - int left; - - io_u_quiesce(td); - - if (td->o.thinktime_spin) - usec_spin(td->o.thinktime_spin); - - left = td->o.thinktime - td->o.thinktime_spin; - if (left) - usec_sleep(td, left); - } - } } check_update_rusage(td); @@ -1042,7 +1116,7 @@ reap: if (td->trim_entries) log_err("fio: %lu trim entries leaked?\n", td->trim_entries); - if (td->o.fill_device && td->error == ENOSPC) { + if (td->o.fill_device && (td->error == ENOSPC || td->error == EDQUOT)) { td->error = 0; fio_mark_td_terminate(td); } @@ -1057,11 +1131,12 @@ reap: if (i) { ret = io_u_queued_complete(td, i); - if (td->o.fill_device && td->error == ENOSPC) + if (td->o.fill_device && + (td->error == ENOSPC || td->error == EDQUOT)) td->error = 0; } - if (should_fsync(td) && td->o.end_fsync) { + if (should_fsync(td) && (td->o.end_fsync || td->o.fsync_on_close)) { td_set_runstate(td, TD_FSYNCING); for_each_file(td, f, i) { @@ -1072,8 +1147,11 @@ reap: f->file_name); } } - } else + } else { + if (td->o.io_submit_mode == IO_MODE_OFFLOAD) + workqueue_flush(&td->io_wq); cleanup_pending_aio(td); + } /* * stop job if we failed doing any IO @@ -1129,14 +1207,14 @@ static void cleanup_io_u(struct thread_data *td) if (td->io_ops->io_u_free) td->io_ops->io_u_free(td, io_u); - fio_memfree(io_u, sizeof(*io_u)); + fio_memfree(io_u, sizeof(*io_u), td_offload_overlap(td)); } free_io_mem(td); io_u_rexit(&td->io_u_requeues); - io_u_qexit(&td->io_u_freelist); - io_u_qexit(&td->io_u_all); + io_u_qexit(&td->io_u_freelist, false); + io_u_qexit(&td->io_u_all, td_offload_overlap(td)); free_file_completion_logging(td); } @@ -1144,9 +1222,74 @@ static void cleanup_io_u(struct thread_data *td) static int init_io_u(struct thread_data *td) { struct io_u *io_u; - unsigned int max_bs, min_write; int cl_align, i, max_units; - int data_xfer = 1, err; + int err; + + max_units = td->o.iodepth; + + err = 0; + err += !io_u_rinit(&td->io_u_requeues, td->o.iodepth); + err += !io_u_qinit(&td->io_u_freelist, td->o.iodepth, false); + err += !io_u_qinit(&td->io_u_all, td->o.iodepth, td_offload_overlap(td)); + + if (err) { + log_err("fio: failed setting up IO queues\n"); + return 1; + } + + cl_align = os_cache_line_size(); + + for (i = 0; i < max_units; i++) { + void *ptr; + + if (td->terminate) + return 1; + + ptr = fio_memalign(cl_align, sizeof(*io_u), td_offload_overlap(td)); + if (!ptr) { + log_err("fio: unable to allocate aligned memory\n"); + return 1; + } + + io_u = ptr; + memset(io_u, 0, sizeof(*io_u)); + INIT_FLIST_HEAD(&io_u->verify_list); + dprint(FD_MEM, "io_u alloc %p, index %u\n", io_u, i); + + io_u->index = i; + io_u->flags = IO_U_F_FREE; + io_u_qpush(&td->io_u_freelist, io_u); + + /* + * io_u never leaves this stack, used for iteration of all + * io_u buffers. + */ + io_u_qpush(&td->io_u_all, io_u); + + if (td->io_ops->io_u_init) { + int ret = td->io_ops->io_u_init(td, io_u); + + if (ret) { + log_err("fio: failed to init engine data: %d\n", ret); + return 1; + } + } + } + + init_io_u_buffers(td); + + if (init_file_completion_logging(td, max_units)) + return 1; + + return 0; +} + +int init_io_u_buffers(struct thread_data *td) +{ + struct io_u *io_u; + unsigned long long max_bs, min_write; + int i, max_units; + int data_xfer = 1; char *p; max_units = td->o.iodepth; @@ -1158,16 +1301,6 @@ static int init_io_u(struct thread_data *td) if (td_ioengine_flagged(td, FIO_NOIO) || !(td_read(td) || td_write(td))) data_xfer = 0; - err = 0; - err += io_u_rinit(&td->io_u_requeues, td->o.iodepth); - err += io_u_qinit(&td->io_u_freelist, td->o.iodepth); - err += io_u_qinit(&td->io_u_all, td->o.iodepth); - - if (err) { - log_err("fio: failed setting up IO queues\n"); - return 1; - } - /* * if we may later need to do address alignment, then add any * possible adjustment here so that we don't cause a buffer @@ -1179,7 +1312,7 @@ static int init_io_u(struct thread_data *td) td->orig_buffer_size += page_mask + td->o.mem_align; if (td->o.mem_type == MEM_SHMHUGE || td->o.mem_type == MEM_MMAPHUGE) { - unsigned long bs; + unsigned long long bs; bs = td->orig_buffer_size + td->o.hugepage_size - 1; td->orig_buffer_size = bs & ~(td->o.hugepage_size - 1); @@ -1199,23 +1332,8 @@ static int init_io_u(struct thread_data *td) else p = td->orig_buffer; - cl_align = os_cache_line_size(); - for (i = 0; i < max_units; i++) { - void *ptr; - - if (td->terminate) - return 1; - - ptr = fio_memalign(cl_align, sizeof(*io_u)); - if (!ptr) { - log_err("fio: unable to allocate aligned memory\n"); - break; - } - - io_u = ptr; - memset(io_u, 0, sizeof(*io_u)); - INIT_FLIST_HEAD(&io_u->verify_list); + io_u = td->io_u_all.io_us[i]; dprint(FD_MEM, "io_u alloc %p, index %u\n", io_u, i); if (data_xfer) { @@ -1232,51 +1350,25 @@ static int init_io_u(struct thread_data *td) fill_verify_pattern(td, io_u->buf, max_bs, io_u, 0, 0); } } - - io_u->index = i; - io_u->flags = IO_U_F_FREE; - io_u_qpush(&td->io_u_freelist, io_u); - - /* - * io_u never leaves this stack, used for iteration of all - * io_u buffers. - */ - io_u_qpush(&td->io_u_all, io_u); - - if (td->io_ops->io_u_init) { - int ret = td->io_ops->io_u_init(td, io_u); - - if (ret) { - log_err("fio: failed to init engine data: %d\n", ret); - return 1; - } - } - p += max_bs; } - if (init_file_completion_logging(td, max_units)) - return 1; - return 0; } +#ifdef FIO_HAVE_IOSCHED_SWITCH /* - * This function is Linux specific. + * These functions are Linux specific. * FIO_HAVE_IOSCHED_SWITCH enabled currently means it's Linux. */ -static int switch_ioscheduler(struct thread_data *td) +static int set_ioscheduler(struct thread_data *td, struct fio_file *file) { -#ifdef FIO_HAVE_IOSCHED_SWITCH - char tmp[256], tmp2[128]; + char tmp[256], tmp2[128], *p; FILE *f; int ret; - if (td_ioengine_flagged(td, FIO_DISKLESSIO)) - return 0; - - assert(td->files && td->files[0]); - sprintf(tmp, "%s/queue/scheduler", td->files[0]->du->sysfs_root); + assert(file->du && file->du->sysfs_root); + sprintf(tmp, "%s/queue/scheduler", file->du->sysfs_root); f = fopen(tmp, "r+"); if (!f) { @@ -1304,17 +1396,19 @@ static int switch_ioscheduler(struct thread_data *td) /* * Read back and check that the selected scheduler is now the default. */ - memset(tmp, 0, sizeof(tmp)); - ret = fread(tmp, sizeof(tmp), 1, f); + ret = fread(tmp, 1, sizeof(tmp) - 1, f); if (ferror(f) || ret < 0) { td_verror(td, errno, "fread"); fclose(f); return 1; } + tmp[ret] = '\0'; /* - * either a list of io schedulers or "none\n" is expected. + * either a list of io schedulers or "none\n" is expected. Strip the + * trailing newline. */ - tmp[strlen(tmp) - 1] = '\0'; + p = tmp; + strsep(&p, "\n"); /* * Write to "none" entry doesn't fail, so check the result here. @@ -1327,7 +1421,7 @@ static int switch_ioscheduler(struct thread_data *td) sprintf(tmp2, "[%s]", td->o.ioscheduler); if (!strstr(tmp, tmp2)) { - log_err("fio: io scheduler %s not found\n", td->o.ioscheduler); + log_err("fio: unable to set io scheduler to %s\n", td->o.ioscheduler); td_verror(td, EINVAL, "iosched_switch"); fclose(f); return 1; @@ -1335,17 +1429,63 @@ static int switch_ioscheduler(struct thread_data *td) fclose(f); return 0; +} + +static int switch_ioscheduler(struct thread_data *td) +{ + struct fio_file *f; + unsigned int i; + int ret = 0; + + if (td_ioengine_flagged(td, FIO_DISKLESSIO)) + return 0; + + assert(td->files && td->files[0]); + + for_each_file(td, f, i) { + + /* Only consider regular files and block device files */ + switch (f->filetype) { + case FIO_TYPE_FILE: + case FIO_TYPE_BLOCK: + /* + * Make sure that the device hosting the file could + * be determined. + */ + if (!f->du) + continue; + break; + case FIO_TYPE_CHAR: + case FIO_TYPE_PIPE: + default: + continue; + } + + ret = set_ioscheduler(td, f); + if (ret) + return ret; + } + + return 0; +} + #else + +static int switch_ioscheduler(struct thread_data *td) +{ return 0; -#endif } +#endif /* FIO_HAVE_IOSCHED_SWITCH */ + static bool keep_running(struct thread_data *td) { unsigned long long limit; if (td->done) return false; + if (td->terminate) + return false; if (td->o.time_based) return true; if (td->o.loops) { @@ -1380,16 +1520,17 @@ static bool keep_running(struct thread_data *td) return false; } -static int exec_string(struct thread_options *o, const char *string, const char *mode) +static int exec_string(struct thread_options *o, const char *string, + const char *mode) { - size_t newlen = strlen(string) + strlen(o->name) + strlen(mode) + 9 + 1; int ret; char *str; - str = malloc(newlen); - sprintf(str, "%s &> %s.%s.txt", string, o->name, mode); + if (asprintf(&str, "%s > %s.%s.txt 2>&1", string, o->name, mode) < 0) + return -1; - log_info("%s : Saving output of %s in %s.%s.txt\n",o->name, mode, o->name, mode); + log_info("%s : Saving output of %s in %s.%s.txt\n", o->name, mode, + o->name, mode); ret = system(str); if (ret == -1) log_err("fio: exec of cmd <%s> failed\n", str); @@ -1456,9 +1597,10 @@ static void *thread_main(void *data) struct thread_data *td = fd->td; struct thread_options *o = &td->o; struct sk_out *sk_out = fd->sk_out; + uint64_t bytes_done[DDIR_RWDIR_CNT]; int deadlock_loop_cnt; - int clear_state; - int ret; + bool clear_state; + int res, ret; sk_out_assign(sk_out); free(fd); @@ -1469,7 +1611,7 @@ static void *thread_main(void *data) } else td->pid = gettid(); - fio_local_clock_init(o->use_thread); + fio_local_clock_init(); dprint(FD_PROCESS, "jobs pid=%d started\n", (int) td->pid); @@ -1480,7 +1622,6 @@ static void *thread_main(void *data) INIT_FLIST_HEAD(&td->io_hist_list); INIT_FLIST_HEAD(&td->verify_list); INIT_FLIST_HEAD(&td->trim_list); - INIT_FLIST_HEAD(&td->next_rand_list); td->io_hist_tree = RB_ROOT; ret = mutex_cond_init_pshared(&td->io_u_lock, &td->free_cond); @@ -1495,11 +1636,11 @@ static void *thread_main(void *data) } td_set_runstate(td, TD_INITIALIZED); - dprint(FD_MUTEX, "up startup_mutex\n"); - fio_mutex_up(startup_mutex); - dprint(FD_MUTEX, "wait on td->mutex\n"); - fio_mutex_down(td->mutex); - dprint(FD_MUTEX, "done waiting on td->mutex\n"); + dprint(FD_MUTEX, "up startup_sem\n"); + fio_sem_up(startup_sem); + dprint(FD_MUTEX, "wait on td->sem\n"); + fio_sem_down(td->sem); + dprint(FD_MUTEX, "done waiting on td->sem\n"); /* * A new gid requires privilege, so we need to do this before setting @@ -1514,6 +1655,8 @@ static void *thread_main(void *data) goto err; } + td_zone_gen_index(td); + /* * Do this early, we don't want the compress threads to be limited * to the same CPUs as the IO workers. So do this before we set @@ -1609,15 +1752,10 @@ static void *thread_main(void *data) * May alter parameters that init_io_u() will use, so we need to * do this first. */ - if (init_iolog(td)) - goto err; - - if (init_io_u(td)) - goto err; - - if (o->verify_async && verify_async_init(td)) + if (!init_iolog(td)) goto err; + /* ioprio_set() has to be done before td_io_init() */ if (fio_option_is_set(o, ioprio) || fio_option_is_set(o, ioprio_class)) { ret = ioprio_set(IOPRIO_WHO_PROCESS, 0, o->ioprio_class, o->ioprio); @@ -1625,8 +1763,22 @@ static void *thread_main(void *data) td_verror(td, errno, "ioprio_set"); goto err; } + td->ioprio = ioprio_value(o->ioprio_class, o->ioprio); + td->ts.ioprio = td->ioprio; } + if (td_io_init(td)) + goto err; + + if (init_io_u(td)) + goto err; + + if (td->io_ops->post_init && td->io_ops->post_init(td)) + goto err; + + if (o->verify_async && verify_async_init(td)) + goto err; + if (o->cgroup && cgroup_setup(td, cgroup_list, &cgroup_mnt)) goto err; @@ -1642,47 +1794,46 @@ static void *thread_main(void *data) if (!o->create_serialize && setup_files(td)) goto err; - if (td_io_init(td)) + if (!init_random_map(td)) goto err; - if (init_random_map(td)) + if (o->exec_prerun && exec_string(o, o->exec_prerun, "prerun")) goto err; - if (o->exec_prerun && exec_string(o, o->exec_prerun, (const char *)"prerun")) + if (o->pre_read && !pre_read_files(td)) goto err; - if (o->pre_read) { - if (pre_read_files(td) < 0) - goto err; - } - fio_verify_init(td); if (rate_submit_init(td, sk_out)) goto err; - set_epoch_time(td, o->log_unix_epoch); + set_epoch_time(td, o->log_unix_epoch | o->log_alternate_epoch, o->log_alternate_epoch_clock_id); fio_getrusage(&td->ru_start); memcpy(&td->bw_sample_time, &td->epoch, sizeof(td->epoch)); memcpy(&td->iops_sample_time, &td->epoch, sizeof(td->epoch)); memcpy(&td->ss.prev_time, &td->epoch, sizeof(td->epoch)); + init_thinktime(td); + if (o->ratemin[DDIR_READ] || o->ratemin[DDIR_WRITE] || o->ratemin[DDIR_TRIM]) { - memcpy(&td->lastrate[DDIR_READ], &td->bw_sample_time, + memcpy(&td->last_rate_check_time[DDIR_READ], &td->bw_sample_time, sizeof(td->bw_sample_time)); - memcpy(&td->lastrate[DDIR_WRITE], &td->bw_sample_time, + memcpy(&td->last_rate_check_time[DDIR_WRITE], &td->bw_sample_time, sizeof(td->bw_sample_time)); - memcpy(&td->lastrate[DDIR_TRIM], &td->bw_sample_time, + memcpy(&td->last_rate_check_time[DDIR_TRIM], &td->bw_sample_time, sizeof(td->bw_sample_time)); } - clear_state = 0; + memset(bytes_done, 0, sizeof(bytes_done)); + clear_state = false; + while (keep_running(td)) { uint64_t verify_bytes; fio_gettime(&td->start, NULL); - memcpy(&td->tv_cache, &td->start, sizeof(td->start)); + memcpy(&td->ts_cache, &td->start, sizeof(td->start)); if (clear_state) { clear_io_state(td, 0); @@ -1693,11 +1844,9 @@ static void *thread_main(void *data) prune_io_piece_log(td); - if (td->o.verify_only && (td_write(td) || td_rw(td))) + if (td->o.verify_only && td_write(td)) verify_bytes = do_dry_run(td); else { - uint64_t bytes_done[DDIR_RWDIR_CNT]; - do_io(td, bytes_done); if (!ddir_rw_sum(bytes_done)) { @@ -1717,7 +1866,7 @@ static void *thread_main(void *data) if (td->runstate >= TD_EXITED) break; - clear_state = 1; + clear_state = true; /* * Make sure we've successfully updated the rusage stats @@ -1729,11 +1878,11 @@ static void *thread_main(void *data) deadlock_loop_cnt = 0; do { check_update_rusage(td); - if (!fio_mutex_down_trylock(stat_mutex)) + if (!fio_sem_down_trylock(stat_sem)) break; usleep(1000); if (deadlock_loop_cnt++ > 5000) { - log_err("fio seems to be stuck grabbing stat_mutex, forcibly exiting\n"); + log_err("fio seems to be stuck grabbing stat_sem, forcibly exiting\n"); td->error = EDEADLK; goto err; } @@ -1746,7 +1895,7 @@ static void *thread_main(void *data) if (td_trim(td) && td->io_bytes[DDIR_TRIM]) update_runtime(td, elapsed_us, DDIR_TRIM); fio_gettime(&td->start, NULL); - fio_mutex_up(stat_mutex); + fio_sem_up(stat_sem); if (td->error || td->terminate) break; @@ -1767,22 +1916,35 @@ static void *thread_main(void *data) */ check_update_rusage(td); - fio_mutex_down(stat_mutex); + fio_sem_down(stat_sem); update_runtime(td, elapsed_us, DDIR_READ); fio_gettime(&td->start, NULL); - fio_mutex_up(stat_mutex); + fio_sem_up(stat_sem); if (td->error || td->terminate) break; } + /* + * Acquire this lock if we were doing overlap checking in + * offload mode so that we don't clean up this job while + * another thread is checking its io_u's for overlap + */ + if (td_offload_overlap(td)) { + int res = pthread_mutex_lock(&overlap_check); + assert(res == 0); + } td_set_runstate(td, TD_FINISHING); + if (td_offload_overlap(td)) { + res = pthread_mutex_unlock(&overlap_check); + assert(res == 0); + } update_rusage_stat(td); td->ts.total_run_time = mtime_since_now(&td->epoch); - td->ts.io_bytes[DDIR_READ] = td->io_bytes[DDIR_READ]; - td->ts.io_bytes[DDIR_WRITE] = td->io_bytes[DDIR_WRITE]; - td->ts.io_bytes[DDIR_TRIM] = td->io_bytes[DDIR_TRIM]; + for_each_rw_ddir(ddir) { + td->ts.io_bytes[ddir] = td->io_bytes[ddir]; + } if (td->o.verify_state_save && !(td->flags & TD_F_VSTATE_SAVED) && (td->o.verify != VERIFY_NONE && td_write(td))) @@ -1796,10 +1958,10 @@ static void *thread_main(void *data) rate_submit_exit(td); if (o->exec_postrun) - exec_string(o, o->exec_postrun, (const char *)"postrun"); + exec_string(o, o->exec_postrun, "postrun"); if (exitall_on_terminate || (o->exitall_error && td->error)) - fio_terminate_threads(td->groupid); + fio_terminate_threads(td->groupid, td->o.exit_what); err: if (td->error) @@ -1812,17 +1974,9 @@ err: close_and_free_files(td); cleanup_io_u(td); close_ioengine(td); - cgroup_shutdown(td, &cgroup_mnt); + cgroup_shutdown(td, cgroup_mnt); verify_free_state(td); - - if (td->zone_state_index) { - int i; - - for (i = 0; i < DDIR_RWDIR_CNT; i++) - free(td->zone_state_index[i]); - free(td->zone_state_index); - td->zone_state_index = NULL; - } + td_zone_free_index(td); if (fio_option_is_set(o, cpumask)) { ret = fio_cpuset_exit(&o->cpumask); @@ -1835,9 +1989,8 @@ err: */ if (o->write_iolog_file) write_iolog_close(td); - - fio_mutex_remove(td->mutex); - td->mutex = NULL; + if (td->io_log_rfile) + fclose(td->io_log_rfile); td_set_runstate(td, TD_EXITED); @@ -1851,14 +2004,6 @@ err: return (void *) (uintptr_t) td->error; } -static void dump_td_info(struct thread_data *td) -{ - log_err("fio: job '%s' (state=%d) hasn't exited in %lu seconds, it " - "appears to be stuck. Doing forceful exit of this job.\n", - td->o.name, td->runstate, - (unsigned long) time_since_now(&td->terminate_time)); -} - /* * Run over the job map and reap the threads that have exited, if any. */ @@ -1876,11 +2021,7 @@ static void reap_threads(unsigned int *nr_running, uint64_t *t_rate, for_each_td(td, i) { int flags = 0; - /* - * ->io_ops is NULL for a thread that has closed its - * io engine - */ - if (td->io_ops && !strcmp(td->io_ops->name, "cpuio")) + if (!strcmp(td->o.ioengine, "cpuio")) cputhreads++; else realthreads++; @@ -1943,7 +2084,11 @@ static void reap_threads(unsigned int *nr_running, uint64_t *t_rate, if (td->terminate && td->runstate < TD_FSYNCING && time_since_now(&td->terminate_time) >= FIO_REAP_TIMEOUT) { - dump_td_info(td); + log_err("fio: job '%s' (state=%d) hasn't exited in " + "%lu seconds, it appears to be stuck. Doing " + "forceful exit of this job.\n", + td->o.name, td->runstate, + (unsigned long) time_since_now(&td->terminate_time)); td_set_runstate(td, TD_REAPED); goto reaped; } @@ -1965,10 +2110,11 @@ reaped: done_secs += mtime_since_now(&td->epoch) / 1000; profile_td_exit(td); + flow_exit_job(td); } if (*nr_running == cputhreads && !pending && realthreads) - fio_terminate_threads(TERMINATE_ALL); + fio_terminate_threads(TERMINATE_ALL, TERMINATE_ALL); } static bool __check_trigger_file(void) @@ -1991,7 +2137,10 @@ static bool __check_trigger_file(void) static bool trigger_timedout(void) { if (trigger_timeout) - return time_since_genesis() >= trigger_timeout; + if (time_since_genesis() >= trigger_timeout) { + trigger_timeout = 0; + return true; + } return false; } @@ -2000,7 +2149,7 @@ void exec_trigger(const char *cmd) { int ret; - if (!cmd) + if (!cmd || cmd[0] == '\0') return; ret = system(cmd); @@ -2015,7 +2164,7 @@ void check_trigger_file(void) fio_clients_send_trigger(trigger_remote_cmd); else { verify_save_state(IO_LIST_ALL); - fio_terminate_threads(TERMINATE_ALL); + fio_terminate_threads(TERMINATE_ALL, TERMINATE_ALL); exec_trigger(trigger_cmd); } } @@ -2035,8 +2184,16 @@ static int fio_verify_load_state(struct thread_data *td) td->thread_number - 1, &data); if (!ret) verify_assign_state(td, data); - } else - ret = verify_load_state(td, "local"); + } else { + char prefix[PATH_MAX]; + + if (aux_path) + sprintf(prefix, "%s%clocal", aux_path, + FIO_OS_PATH_SEPARATOR); + else + strcpy(prefix, "local"); + ret = verify_load_state(td, prefix); + } return ret; } @@ -2056,8 +2213,16 @@ static bool check_mount_writes(struct thread_data *td) if (!td_write(td) || td->o.allow_mounted_write) return false; + /* + * If FIO_HAVE_CHARDEV_SIZE is defined, it's likely that chrdevs + * are mkfs'd and mounted. + */ for_each_file(td, f, i) { +#ifdef FIO_HAVE_CHARDEV_SIZE + if (f->filetype != FIO_TYPE_BLOCK && f->filetype != FIO_TYPE_CHAR) +#else if (f->filetype != FIO_TYPE_BLOCK) +#endif continue; if (device_is_mounted(f->file_name)) goto mounted; @@ -2123,18 +2288,22 @@ static void run_threads(struct sk_out *sk_out) } if (output_format & FIO_OUTPUT_NORMAL) { - log_info("Starting "); + struct buf_output out; + + buf_output_init(&out); + __log_buf(&out, "Starting "); if (nr_thread) - log_info("%d thread%s", nr_thread, + __log_buf(&out, "%d thread%s", nr_thread, nr_thread > 1 ? "s" : ""); if (nr_process) { if (nr_thread) - log_info(" and "); - log_info("%d process%s", nr_process, + __log_buf(&out, " and "); + __log_buf(&out, "%d process%s", nr_process, nr_process > 1 ? "es" : ""); } - log_info("\n"); - log_info_flush(); + __log_buf(&out, "\n"); + log_info_buf(out.buf, out.buflen); + buf_output_free(&out); } todo = thread_number; @@ -2145,8 +2314,25 @@ static void run_threads(struct sk_out *sk_out) for_each_td(td, i) { print_status_init(td->thread_number - 1); - if (!td->o.create_serialize) + if (!td->o.create_serialize) { + /* + * When operating on a single rile in parallel, + * perform single-threaded early setup so that + * when setup_files() does not run into issues + * later. + */ + if (!i && td->o.nr_files==1) { + if (setup_shared_file(td)) { + exit_value++; + if (td->error) + log_err("fio: pid=%d, err=%d/%s\n", + (int) td->pid, td->error, td->verror); + td_set_runstate(td, TD_REAPED); + todo--; + } + } continue; + } if (fio_verify_load_state(td)) goto reap; @@ -2187,7 +2373,7 @@ reap: while (todo) { struct thread_data *map[REAL_MAX_JOBS]; - struct timeval this_start; + struct timespec this_start; int this_jobs = 0, left; struct fork_data *fd; @@ -2228,7 +2414,7 @@ reap: init_disk_util(td); - td->rusage_sem = fio_mutex_init(FIO_MUTEX_LOCKED); + td->rusage_sem = fio_sem_init(FIO_SEM_LOCKED); td->update_rusage = 0; /* @@ -2256,13 +2442,19 @@ reap: nr_started--; break; } + fd = NULL; ret = pthread_detach(td->thread); if (ret) log_err("pthread_detach: %s", strerror(ret)); } else { pid_t pid; + struct fio_file **files; + void *eo; dprint(FD_PROCESS, "will fork\n"); + files = td->files; + eo = td->eo; + read_barrier(); pid = fork(); if (!pid) { int ret; @@ -2271,16 +2463,23 @@ reap: _exit(ret); } else if (i == fio_debug_jobno) *fio_debug_jobp = pid; + // freeing previously allocated memory for files + // this memory freed MUST NOT be shared between processes, only the pointer itself may be shared within TD + free(files); + free(eo); + free(fd); + fd = NULL; } - dprint(FD_MUTEX, "wait on startup_mutex\n"); - if (fio_mutex_down_timeout(startup_mutex, 10000)) { + dprint(FD_MUTEX, "wait on startup_sem\n"); + if (fio_sem_down_timeout(startup_sem, 10000)) { log_err("fio: job startup hung? exiting.\n"); - fio_terminate_threads(TERMINATE_ALL); - fio_abort = 1; + fio_terminate_threads(TERMINATE_ALL, TERMINATE_ALL); + fio_abort = true; nr_started--; + free(fd); break; } - dprint(FD_MUTEX, "done waiting on startup_mutex\n"); + dprint(FD_MUTEX, "done waiting on startup_sem\n"); } /* @@ -2339,7 +2538,7 @@ reap: m_rate += ddir_rw_sum(td->o.ratemin); t_rate += ddir_rw_sum(td->o.rate); todo--; - fio_mutex_up(td->mutex); + fio_sem_up(td->sem); } reap_threads(&nr_running, &t_rate, &m_rate); @@ -2388,16 +2587,25 @@ int fio_backend(struct sk_out *sk_out) setup_log(&agg_io_log[DDIR_TRIM], &p, "agg-trim_bw.log"); } - startup_mutex = fio_mutex_init(FIO_MUTEX_LOCKED); - if (startup_mutex == NULL) + if (init_global_dedupe_working_set_seeds()) { + log_err("fio: failed to initialize global dedupe working set\n"); + return 1; + } + + startup_sem = fio_sem_init(FIO_SEM_LOCKED); + if (!sk_out) + is_local_backend = true; + if (startup_sem == NULL) return 1; set_genesis_time(); stat_init(); - helper_thread_create(startup_mutex, sk_out); + if (helper_thread_create(startup_sem, sk_out)) + log_err("fio: failed to create helper thread\n"); cgroup_list = smalloc(sizeof(*cgroup_list)); - INIT_FLIST_HEAD(cgroup_list); + if (cgroup_list) + INIT_FLIST_HEAD(cgroup_list); run_threads(sk_out); @@ -2416,25 +2624,27 @@ int fio_backend(struct sk_out *sk_out) } for_each_td(td, i) { - if (td->ss.dur) { - if (td->ss.iops_data != NULL) { - free(td->ss.iops_data); - free(td->ss.bw_data); - } - } + struct thread_stat *ts = &td->ts; + + free_clat_prio_stats(ts); + steadystate_free(td); fio_options_free(td); + fio_dump_options_free(td); if (td->rusage_sem) { - fio_mutex_remove(td->rusage_sem); + fio_sem_remove(td->rusage_sem); td->rusage_sem = NULL; } + fio_sem_remove(td->sem); + td->sem = NULL; } free_disk_util(); - cgroup_kill(cgroup_list); - sfree(cgroup_list); - sfree(cgroup_mnt); + if (cgroup_list) { + cgroup_kill(cgroup_list); + sfree(cgroup_list); + } - fio_mutex_remove(startup_mutex); + fio_sem_remove(startup_sem); stat_exit(); return exit_value; }