From 9e684a4976b7934f5ce011ea281dfef3352e5738 Mon Sep 17 00:00:00 2001 From: Dan Ehrenberg Date: Mon, 20 Feb 2012 11:05:14 +0100 Subject: [PATCH] Token-based flow control This patch allows two fio jobs to be kept to a certain proportion of each other using token-based flow control. There are three new parameters: flow, flow_watermark, and flow_sleep, documented in the fio options. An example of an fio job using these parameters is below: [global] norandommap thread time_based runtime=30 direct=1 ioengine=libaio iodepth=256 size=100g bs=8k filename=/tmp/testfile flow_watermark=100 flow_sleep=1000 [job2] numjobs=1 rw=write flow=-8 [job1] numjobs=1 rw=randread flow=1 The motivating application of this patch was to allow random reads and sequential writes at a particular given proportion. This initial version is only correct when run with 'thread', as shared state is represented with a global variable. It also only allows two jobs to be synchronized properly. A future version might do more, but no more functionality was needed for my application. Tested: Ran a few fio jobs with this flow control, observing the proportion of IOPS to match what was intended by the job file. Varied the flow_watermark and flow_sleep parameters and observed the effect on throughput. Signed-off-by: Dan Ehrenberg Modified by me to support flow_id, so an arbitrary number of flows can be used. This means it no longer relies on global context, so it can be used from a thread or process alike. Also added man page documentation. Signed-off-by: Jens Axboe --- HOWTO | 19 +++++++++ Makefile | 2 +- backend.c | 6 +++ examples/flow | 25 ++++++++++++ fio.1 | 21 ++++++++++ fio.h | 8 ++++ flow.c | 104 ++++++++++++++++++++++++++++++++++++++++++++++++++ flow.h | 11 ++++++ init.c | 7 ++++ options.c | 34 +++++++++++++++++ 10 files changed, 236 insertions(+), 1 deletion(-) create mode 100644 examples/flow create mode 100644 flow.c create mode 100644 flow.h diff --git a/HOWTO b/HOWTO index c6304a7c..da037d2b 100644 --- a/HOWTO +++ b/HOWTO @@ -1231,6 +1231,25 @@ uid=int Instead of running as the invoking user, set the user ID to gid=int Set group ID, see uid. +flow_id=int The ID of the flow. If not specified, it defaults to being a + global flow. See flow. + +flow=int Weight in token-based flow control. If this value is used, then + there is a 'flow counter' which is used to regulate the + proportion of activity between two or more jobs. fio attempts + to keep this flow counter near zero. The 'flow' parameter + stands for how much should be added or subtracted to the flow + counter on each iteration of the main I/O loop. That is, if + one job has flow=8 and another job has flow=-1, then there + will be a roughly 1:8 ratio in how much one runs vs the other. + +flow_watermark=int The maximum value that the absolute value of the flow + counter is allowed to reach before the job must wait for a + lower value of the counter. + +flow_sleep=int The period of time, in microseconds, to wait after the flow + watermark has been exceeded before retrying operations + In addition, there are some parameters which are only valid when a specific ioengine is in use. These are used identically to normal parameters, with the caveat that when used on the command line, they must come after the ioengine diff --git a/Makefile b/Makefile index dc468b56..8cce80ff 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,7 @@ SOURCE = gettime.c fio.c ioengines.c init.c stat.c log.c time.c filesetup.c \ rbtree.c smalloc.c filehash.c profile.c debug.c lib/rand.c \ lib/num2str.c lib/ieee754.c $(wildcard crc/*.c) engines/cpu.c \ engines/mmap.c engines/sync.c engines/null.c engines/net.c \ - memalign.c server.c client.c iolog.c backend.c libfio.c + memalign.c server.c client.c iolog.c backend.c libfio.c flow.c ifeq ($(UNAME), Linux) SOURCE += diskutil.c fifo.c blktrace.c helpers.c cgroup.c trim.c \ diff --git a/backend.c b/backend.c index eaa6ea77..52791040 100644 --- a/backend.c +++ b/backend.c @@ -409,6 +409,9 @@ static void do_verify(struct thread_data *td) } } + if (flow_threshold_exceeded(td)) + continue; + io_u = __get_io_u(td); if (!io_u) break; @@ -560,6 +563,9 @@ static void do_io(struct thread_data *td) } } + if (flow_threshold_exceeded(td)) + continue; + io_u = get_io_u(td); if (!io_u) break; diff --git a/examples/flow b/examples/flow new file mode 100644 index 00000000..4b078cf8 --- /dev/null +++ b/examples/flow @@ -0,0 +1,25 @@ +# Example usage of flows. The below will have roughly a 1:8 difference +# between job2 and job1. +[global] +norandommap +thread +time_based +runtime=30 +direct=1 +ioengine=libaio +iodepth=256 +size=100g +bs=8k +filename=/tmp/testfile +flow_watermark=100 +flow_sleep=1000 + +[job2] +numjobs=1 +rw=write +flow=-8 + +[job1] +numjobs=1 +rw=randread +flow=1 diff --git a/fio.1 b/fio.1 index 75c8ec66..910812fb 100644 --- a/fio.1 +++ b/fio.1 @@ -958,6 +958,27 @@ the thread/process does any work. .BI gid \fR=\fPint Set group ID, see \fBuid\fR. .TP +.BI flow_id \fR=\fPint +The ID of the flow. If not specified, it defaults to being a global flow. See +\fBflow\fR. +.TP +.BI flow \fR=\fPint +Weight in token-based flow control. If this value is used, then there is a +\fBflow counter\fR which is used to regulate the proportion of activity between +two or more jobs. fio attempts to keep this flow counter near zero. The +\fBflow\fR parameter stands for how much should be added or subtracted to the +flow counter on each iteration of the main I/O loop. That is, if one job has +\fBflow=8\fR and another job has \fBflow=-1\fR, then there will be a roughly +1:8 ratio in how much one runs vs the other. +.TP +.BI flow_watermark \fR=\fPint +The maximum value that the absolute value of the flow counter is allowed to +reach before the job must wait for a lower value of the counter. +.TP +.BI flow_sleep \fR=\fPint +The period of time, in microseconds, to wait after the flow watermark has been +exceeded before retrying operations +.TP .BI clat_percentiles \fR=\fPbool Enable the reporting of percentiles of completion latencies. .TP diff --git a/fio.h b/fio.h index c4629802..5c912d0a 100644 --- a/fio.h +++ b/fio.h @@ -38,6 +38,7 @@ struct thread_data; #include "lib/rand.h" #include "server.h" #include "stat.h" +#include "flow.h" #ifdef FIO_HAVE_GUASI #include @@ -258,6 +259,11 @@ struct thread_options { unsigned int uid; unsigned int gid; + int flow_id; + int flow; + int flow_watermark; + unsigned int flow_sleep; + unsigned int sync_file_range; }; @@ -470,6 +476,8 @@ struct thread_data { unsigned int total_err_count; int first_error; + struct fio_flow *flow; + /* * Can be overloaded by profiles */ diff --git a/flow.c b/flow.c new file mode 100644 index 00000000..e5c4a404 --- /dev/null +++ b/flow.c @@ -0,0 +1,104 @@ +#include "fio.h" +#include "mutex.h" +#include "smalloc.h" +#include "flist.h" + +struct fio_flow { + unsigned int refs; + struct flist_head list; + unsigned int id; + long long int flow_counter; +}; + +static struct flist_head *flow_list; +static struct fio_mutex *flow_lock; + +int flow_threshold_exceeded(struct thread_data *td) +{ + struct fio_flow *flow = td->flow; + int sign; + + if (!flow) + return 0; + + sign = td->o.flow > 0 ? 1 : -1; + if (sign * flow->flow_counter > td->o.flow_watermark) { + if (td->o.flow_sleep) + usleep(td->o.flow_sleep); + return 1; + } + + /* No synchronization needed because it doesn't + * matter if the flow count is slightly inaccurate */ + flow->flow_counter += td->o.flow; + return 0; +} + +static struct fio_flow *flow_get(unsigned int id) +{ + struct fio_flow *flow; + struct flist_head *n; + + fio_mutex_down(flow_lock); + + flist_for_each(n, flow_list) { + flow = flist_entry(n, struct fio_flow, list); + if (flow->id == id) + break; + + flow = NULL; + } + + if (!flow) { + flow = smalloc(sizeof(*flow)); + flow->refs = 0; + INIT_FLIST_HEAD(&flow->list); + flow->id = id; + flow->flow_counter = 0; + + flist_add_tail(&flow->list, flow_list); + } + + flow->refs++; + fio_mutex_up(flow_lock); + return flow; +} + +static void flow_put(struct fio_flow *flow) +{ + fio_mutex_down(flow_lock); + + if (!--flow->refs) { + flist_del(&flow->list); + sfree(flow); + } + + fio_mutex_up(flow_lock); +} + +void flow_init_job(struct thread_data *td) +{ + if (td->o.flow) + td->flow = flow_get(td->o.flow_id); +} + +void flow_exit_job(struct thread_data *td) +{ + if (td->flow) { + flow_put(td->flow); + td->flow = NULL; + } +} + +void flow_init(void) +{ + flow_lock = fio_mutex_init(1); + flow_list = smalloc(sizeof(*flow_list)); + INIT_FLIST_HEAD(flow_list); +} + +void flow_exit(void) +{ + fio_mutex_remove(flow_lock); + sfree(flow_list); +} diff --git a/flow.h b/flow.h new file mode 100644 index 00000000..c0a45c3c --- /dev/null +++ b/flow.h @@ -0,0 +1,11 @@ +#ifndef FIO_FLOW_H +#define FIO_FLOW_H + +int flow_threshold_exceeded(struct thread_data *td); +void flow_init_job(struct thread_data *td); +void flow_exit_job(struct thread_data *td); + +void flow_exit(void); +void flow_init(void); + +#endif diff --git a/init.c b/init.c index 8dc57845..d7d80118 100644 --- a/init.c +++ b/init.c @@ -221,6 +221,7 @@ static void free_shm(void) threads = NULL; file_hash_exit(); + flow_exit(); fio_debug_jobp = NULL; shmdt(tp); shmctl(shm_id, IPC_RMID, &sbuf); @@ -277,6 +278,9 @@ static int setup_thread_area(void) fio_debug_jobp = (void *) hash + file_hash_size; *fio_debug_jobp = -1; file_hash_init(hash); + + flow_init(); + return 0; } @@ -324,6 +328,7 @@ static void put_job(struct thread_data *td) return; profile_td_exit(td); + flow_exit_job(td); if (td->error) log_info("fio: %s\n", td->verror); @@ -796,6 +801,8 @@ static int add_job(struct thread_data *td, const char *jobname, int job_add_num) if (fixup_options(td)) goto err; + flow_init_job(td); + /* * IO engines only need this for option callbacks, and the address may * change in subprocesses. diff --git a/options.c b/options.c index c3fdb563..bd8141a8 100644 --- a/options.c +++ b/options.c @@ -2169,6 +2169,40 @@ static struct fio_option options[FIO_MAX_OPTS] = { .off1 = td_var_offset(gid), .help = "Run job with this group ID", }, + { + .name = "flow_id", + .type = FIO_OPT_INT, + .off1 = td_var_offset(flow_id), + .help = "The flow index ID to use", + .def = "0", + }, + { + .name = "flow", + .type = FIO_OPT_INT, + .off1 = td_var_offset(flow), + .help = "Weight for flow control of this job", + .parent = "flow_id", + .def = "0", + }, + { + .name = "flow_watermark", + .type = FIO_OPT_INT, + .off1 = td_var_offset(flow_watermark), + .help = "High watermark for flow control. This option" + " should be set to the same value for all threads" + " with non-zero flow.", + .parent = "flow_id", + .def = "1024", + }, + { + .name = "flow_sleep", + .type = FIO_OPT_INT, + .off1 = td_var_offset(flow_sleep), + .help = "How many microseconds to sleep after being held" + " back by the flow control mechanism", + .parent = "flow_id", + .def = "0", + }, { .name = NULL, }, -- 2.25.1