X-Git-Url: https://git.kernel.dk/?p=fio.git;a=blobdiff_plain;f=rate-submit.c;h=cf00d9bc75c56f057b41a337ae8bdc740aebaa66;hp=a68a3dd3a7b26bf1e4a03c03e3583c28ebfc6faa;hb=04ba61dfa67784d4dfcc22a2b3de7ede28e22e40;hpb=405113013b076aeb89634a333af96f948044b110 diff --git a/rate-submit.c b/rate-submit.c index a68a3dd3..cf00d9bc 100644 --- a/rate-submit.c +++ b/rate-submit.c @@ -1,18 +1,66 @@ +/* + * Rated submission helpers + * + * Copyright (C) 2015 Jens Axboe + * + */ #include "fio.h" -#include "ioengine.h" +#include "ioengines.h" #include "lib/getrusage.h" +#include "rate-submit.h" -static void io_workqueue_fn(struct submit_worker *sw, - struct workqueue_work *work) +static void check_overlap(struct io_u *io_u) +{ + int i; + struct thread_data *td; + bool overlap = false; + + do { + /* + * Allow only one thread to check for overlap at a + * time to prevent two threads from thinking the coast + * is clear and then submitting IOs that overlap with + * each other + * + * If an overlap is found, release the lock and + * re-acquire it before checking again to give other + * threads a chance to make progress + * + * If an overlap is not found, release the lock when the + * io_u's IO_U_F_FLIGHT flag is set so that this io_u + * can be checked by other threads as they assess overlap + */ + pthread_mutex_lock(&overlap_check); + for_each_td(td, i) { + if (td->runstate <= TD_SETTING_UP || + td->runstate >= TD_FINISHING || + !td->o.serialize_overlap || + td->o.io_submit_mode != IO_MODE_OFFLOAD) + continue; + + overlap = in_flight_overlap(&td->io_u_all, io_u); + if (overlap) { + pthread_mutex_unlock(&overlap_check); + break; + } + } + } while (overlap); +} + +static int io_workqueue_fn(struct submit_worker *sw, + struct workqueue_work *work) { struct io_u *io_u = container_of(work, struct io_u, work); const enum fio_ddir ddir = io_u->ddir; - struct thread_data *td = sw->private; - int ret; + struct thread_data *td = sw->priv; + int ret, error; + + if (td->o.serialize_overlap) + check_overlap(io_u); dprint(FD_RATE, "io_u %p queued by %u\n", io_u, gettid()); - io_u_set(io_u, IO_U_F_NO_FILE_PUT); + io_u_set(td, io_u, IO_U_F_NO_FILE_PUT); td->cur_depth++; @@ -23,12 +71,14 @@ static void io_workqueue_fn(struct submit_worker *sw, ret = io_u_queued_complete(td, 1); if (ret > 0) td->cur_depth -= ret; - io_u_clear(io_u, IO_U_F_FLIGHT); + else if (ret < 0) + break; + io_u_clear(td, io_u, IO_U_F_FLIGHT); } while (1); dprint(FD_RATE, "io_u %p ret %d by %u\n", io_u, ret, gettid()); - io_queue_event(td, io_u, &ret, ddir, NULL, 0, NULL); + error = io_queue_event(td, io_u, &ret, ddir, NULL, 0, NULL); if (ret == FIO_Q_COMPLETED) td->cur_depth--; @@ -43,17 +93,20 @@ static void io_workqueue_fn(struct submit_worker *sw, ret = io_u_queued_complete(td, min_evts); if (ret > 0) td->cur_depth -= ret; - } else if (ret == FIO_Q_BUSY) { - ret = io_u_queued_complete(td, td->cur_depth); - if (ret > 0) - td->cur_depth -= ret; } + + if (error || td->error) + pthread_cond_signal(&td->parent->free_cond); + + return 0; } static bool io_workqueue_pre_sleep_flush_fn(struct submit_worker *sw) { - struct thread_data *td = sw->private; + struct thread_data *td = sw->priv; + if (td->error) + return false; if (td->io_u_queued || td->cur_depth || td->io_u_in_flight) return true; @@ -62,7 +115,7 @@ static bool io_workqueue_pre_sleep_flush_fn(struct submit_worker *sw) static void io_workqueue_pre_sleep_fn(struct submit_worker *sw) { - struct thread_data *td = sw->private; + struct thread_data *td = sw->priv; int ret; ret = io_u_quiesce(td); @@ -75,21 +128,20 @@ static int io_workqueue_alloc_fn(struct submit_worker *sw) struct thread_data *td; td = calloc(1, sizeof(*td)); - sw->private = td; + sw->priv = td; return 0; } static void io_workqueue_free_fn(struct submit_worker *sw) { - free(sw->private); - sw->private = NULL; + free(sw->priv); + sw->priv = NULL; } static int io_workqueue_init_worker_fn(struct submit_worker *sw) { struct thread_data *parent = sw->wq->td; - struct thread_data *td = sw->private; - int fio_unused ret; + struct thread_data *td = sw->priv; memcpy(&td->o, &parent->o, sizeof(td->o)); memcpy(&td->ts, &parent->ts, sizeof(td->ts)); @@ -101,28 +153,27 @@ static int io_workqueue_init_worker_fn(struct submit_worker *sw) if (ioengine_load(td)) goto err; - if (td->o.odirect) - td->io_ops->flags |= FIO_RAWIO; - td->pid = gettid(); INIT_FLIST_HEAD(&td->io_log_list); INIT_FLIST_HEAD(&td->io_hist_list); INIT_FLIST_HEAD(&td->verify_list); INIT_FLIST_HEAD(&td->trim_list); - INIT_FLIST_HEAD(&td->next_rand_list); td->io_hist_tree = RB_ROOT; td->o.iodepth = 1; if (td_io_init(td)) goto err_io_init; - fio_gettime(&td->epoch, NULL); + if (td->io_ops->post_init && td->io_ops->post_init(td)) + goto err_io_init; + + set_epoch_time(td, td->o.log_unix_epoch); fio_getrusage(&td->ru_start); clear_io_state(td, 1); td_set_runstate(td, TD_RUNNING); - td->flags |= TD_F_CHILD; + td->flags |= TD_F_CHILD | TD_F_NEED_LOCK; td->parent = parent; return 0; @@ -136,7 +187,7 @@ err: static void io_workqueue_exit_worker_fn(struct submit_worker *sw, unsigned int *sum_cnt) { - struct thread_data *td = sw->private; + struct thread_data *td = sw->priv; (*sum_cnt)++; sum_thread_stats(&sw->wq->td->ts, &td->ts, *sum_cnt == 1); @@ -204,7 +255,7 @@ static void sum_ddir(struct thread_data *dst, struct thread_data *src, static void io_workqueue_update_acct_fn(struct submit_worker *sw) { - struct thread_data *src = sw->private; + struct thread_data *src = sw->priv; struct thread_data *dst = sw->wq->td; if (td_read(src)) @@ -216,7 +267,7 @@ static void io_workqueue_update_acct_fn(struct submit_worker *sw) } -struct workqueue_ops rated_wq_ops = { +static struct workqueue_ops rated_wq_ops = { .fn = io_workqueue_fn, .pre_sleep_flush_fn = io_workqueue_pre_sleep_flush_fn, .pre_sleep_fn = io_workqueue_pre_sleep_fn, @@ -226,3 +277,19 @@ struct workqueue_ops rated_wq_ops = { .init_worker_fn = io_workqueue_init_worker_fn, .exit_worker_fn = io_workqueue_exit_worker_fn, }; + +int rate_submit_init(struct thread_data *td, struct sk_out *sk_out) +{ + if (td->o.io_submit_mode != IO_MODE_OFFLOAD) + return 0; + + return workqueue_init(td, &td->io_wq, &rated_wq_ops, td->o.iodepth, sk_out); +} + +void rate_submit_exit(struct thread_data *td) +{ + if (td->o.io_submit_mode != IO_MODE_OFFLOAD) + return; + + workqueue_exit(&td->io_wq); +}