Token-based flow control
authorDan Ehrenberg <dehrenberg@google.com>
Mon, 20 Feb 2012 10:05:14 +0000 (11:05 +0100)
committerJens Axboe <axboe@kernel.dk>
Mon, 20 Feb 2012 10:07:58 +0000 (11:07 +0100)
This patch allows two fio jobs to be kept to a certain
proportion of each other using token-based flow control.
There are three new parameters: flow, flow_watermark, and
flow_sleep, documented in the fio options. An example of an fio
job using these parameters is below:

[global]
norandommap
thread
time_based
runtime=30
direct=1
ioengine=libaio
iodepth=256
size=100g
bs=8k
filename=/tmp/testfile
flow_watermark=100
flow_sleep=1000

[job2]
numjobs=1
rw=write
flow=-8

[job1]
numjobs=1
rw=randread
flow=1

The motivating application of this patch was to allow random reads
and sequential writes at a particular given proportion.

This initial version is only correct when run with 'thread', as shared
state is represented with a global variable. It also only allows two
jobs to be synchronized properly. A future version might do more, but
no more functionality was needed for my application.

Tested: Ran a few fio jobs with this flow control, observing
the proportion of IOPS to match what was intended by the job file.
Varied the flow_watermark and flow_sleep parameters and observed
the effect on throughput.

Signed-off-by: Dan Ehrenberg <dehrenberg@google.com>
Modified by me to support flow_id, so an arbitrary number of flows can
be used. This means it no longer relies on global context, so it can be
used from a thread or process alike. Also added man page documentation.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
HOWTO
Makefile
backend.c
examples/flow [new file with mode: 0644]
fio.1
fio.h
flow.c [new file with mode: 0644]
flow.h [new file with mode: 0644]
init.c
options.c

diff --git a/HOWTO b/HOWTO
index c6304a7ca2ed7bf27b65df9ce844efa88f221534..da037d2b92cf18a682d13ac7210e8404a9b8e9e0 100644 (file)
--- a/HOWTO
+++ b/HOWTO
@@ -1231,6 +1231,25 @@ uid=int          Instead of running as the invoking user, set the user ID to
 
 gid=int                Set group ID, see uid.
 
+flow_id=int    The ID of the flow. If not specified, it defaults to being a
+               global flow. See flow.
+
+flow=int       Weight in token-based flow control. If this value is used, then
+               there is a 'flow counter' which is used to regulate the
+               proportion of activity between two or more jobs. fio attempts
+               to keep this flow counter near zero. The 'flow' parameter
+               stands for how much should be added or subtracted to the flow
+               counter on each iteration of the main I/O loop. That is, if
+               one job has flow=8 and another job has flow=-1, then there
+               will be a roughly 1:8 ratio in how much one runs vs the other.
+
+flow_watermark=int     The maximum value that the absolute value of the flow
+               counter is allowed to reach before the job must wait for a
+               lower value of the counter.
+
+flow_sleep=int The period of time, in microseconds, to wait after the flow
+               watermark has been exceeded before retrying operations
+
 In addition, there are some parameters which are only valid when a specific
 ioengine is in use. These are used identically to normal parameters, with the
 caveat that when used on the command line, they must come after the ioengine
index dc468b5663a349fc63308032cc66d81279a528d5..8cce80ffd634b22c74b065204281e50424918d77 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -14,7 +14,7 @@ SOURCE = gettime.c fio.c ioengines.c init.c stat.c log.c time.c filesetup.c \
                rbtree.c smalloc.c filehash.c profile.c debug.c lib/rand.c \
                lib/num2str.c lib/ieee754.c $(wildcard crc/*.c) engines/cpu.c \
                engines/mmap.c engines/sync.c engines/null.c engines/net.c \
-               memalign.c server.c client.c iolog.c backend.c libfio.c
+               memalign.c server.c client.c iolog.c backend.c libfio.c flow.c
 
 ifeq ($(UNAME), Linux)
   SOURCE += diskutil.c fifo.c blktrace.c helpers.c cgroup.c trim.c \
index eaa6ea77752e9d82feaf968c1aed5c6fdc20778c..52791040ff21384d283b2a09de3ff9718e19d866 100644 (file)
--- a/backend.c
+++ b/backend.c
@@ -409,6 +409,9 @@ static void do_verify(struct thread_data *td)
                        }
                }
 
+               if (flow_threshold_exceeded(td))
+                       continue;
+
                io_u = __get_io_u(td);
                if (!io_u)
                        break;
@@ -560,6 +563,9 @@ static void do_io(struct thread_data *td)
                        }
                }
 
+               if (flow_threshold_exceeded(td))
+                       continue;
+
                io_u = get_io_u(td);
                if (!io_u)
                        break;
diff --git a/examples/flow b/examples/flow
new file mode 100644 (file)
index 0000000..4b078cf
--- /dev/null
@@ -0,0 +1,25 @@
+# Example usage of flows. The below will have roughly a 1:8 difference
+# between job2 and job1.
+[global]
+norandommap
+thread
+time_based
+runtime=30
+direct=1
+ioengine=libaio
+iodepth=256
+size=100g
+bs=8k
+filename=/tmp/testfile
+flow_watermark=100
+flow_sleep=1000
+
+[job2]
+numjobs=1
+rw=write
+flow=-8
+
+[job1]
+numjobs=1
+rw=randread
+flow=1
diff --git a/fio.1 b/fio.1
index 75c8ec669b2d6c447f9405cf1d8bf4028685a0b3..910812fb5bef351bc8a8ca5d557aacb3bf2e0a8f 100644 (file)
--- a/fio.1
+++ b/fio.1
@@ -958,6 +958,27 @@ the thread/process does any work.
 .BI gid \fR=\fPint
 Set group ID, see \fBuid\fR.
 .TP
+.BI flow_id \fR=\fPint
+The ID of the flow. If not specified, it defaults to being a global flow. See
+\fBflow\fR.
+.TP
+.BI flow \fR=\fPint
+Weight in token-based flow control. If this value is used, then there is a
+\fBflow counter\fR which is used to regulate the proportion of activity between
+two or more jobs. fio attempts to keep this flow counter near zero. The
+\fBflow\fR parameter stands for how much should be added or subtracted to the
+flow counter on each iteration of the main I/O loop. That is, if one job has
+\fBflow=8\fR and another job has \fBflow=-1\fR, then there will be a roughly
+1:8 ratio in how much one runs vs the other.
+.TP
+.BI flow_watermark \fR=\fPint
+The maximum value that the absolute value of the flow counter is allowed to
+reach before the job must wait for a lower value of the counter.
+.TP
+.BI flow_sleep \fR=\fPint
+The period of time, in microseconds, to wait after the flow watermark has been
+exceeded before retrying operations
+.TP
 .BI clat_percentiles \fR=\fPbool
 Enable the reporting of percentiles of completion latencies.
 .TP
diff --git a/fio.h b/fio.h
index c462980255edef7d9bcf2032b71430fe4a939ff7..5c912d0aab89da4c3b8c0510e5220107e083a067 100644 (file)
--- a/fio.h
+++ b/fio.h
@@ -38,6 +38,7 @@ struct thread_data;
 #include "lib/rand.h"
 #include "server.h"
 #include "stat.h"
+#include "flow.h"
 
 #ifdef FIO_HAVE_GUASI
 #include <guasi.h>
@@ -258,6 +259,11 @@ struct thread_options {
        unsigned int uid;
        unsigned int gid;
 
+       int flow_id;
+       int flow;
+       int flow_watermark;
+       unsigned int flow_sleep;
+
        unsigned int sync_file_range;
 };
 
@@ -470,6 +476,8 @@ struct thread_data {
        unsigned int total_err_count;
        int first_error;
 
+       struct fio_flow *flow;
+
        /*
         * Can be overloaded by profiles
         */
diff --git a/flow.c b/flow.c
new file mode 100644 (file)
index 0000000..e5c4a40
--- /dev/null
+++ b/flow.c
@@ -0,0 +1,104 @@
+#include "fio.h"
+#include "mutex.h"
+#include "smalloc.h"
+#include "flist.h"
+
+struct fio_flow {
+       unsigned int refs;
+       struct flist_head list;
+       unsigned int id;
+       long long int flow_counter;
+};
+
+static struct flist_head *flow_list;
+static struct fio_mutex *flow_lock;
+
+int flow_threshold_exceeded(struct thread_data *td)
+{
+       struct fio_flow *flow = td->flow;
+       int sign;
+
+       if (!flow)
+               return 0;
+
+       sign = td->o.flow > 0 ? 1 : -1;
+       if (sign * flow->flow_counter > td->o.flow_watermark) {
+               if (td->o.flow_sleep)
+                       usleep(td->o.flow_sleep);
+               return 1;
+       }
+
+       /* No synchronization needed because it doesn't
+        * matter if the flow count is slightly inaccurate */
+       flow->flow_counter += td->o.flow;
+       return 0;
+}
+
+static struct fio_flow *flow_get(unsigned int id)
+{
+       struct fio_flow *flow;
+       struct flist_head *n;
+
+       fio_mutex_down(flow_lock);
+
+       flist_for_each(n, flow_list) {
+               flow = flist_entry(n, struct fio_flow, list);
+               if (flow->id == id)
+                       break;
+
+               flow = NULL;
+       }
+
+       if (!flow) {
+               flow = smalloc(sizeof(*flow));
+               flow->refs = 0;
+               INIT_FLIST_HEAD(&flow->list);
+               flow->id = id;
+               flow->flow_counter = 0;
+
+               flist_add_tail(&flow->list, flow_list);
+       }
+
+       flow->refs++;
+       fio_mutex_up(flow_lock);
+       return flow;
+}
+
+static void flow_put(struct fio_flow *flow)
+{
+       fio_mutex_down(flow_lock);
+
+       if (!--flow->refs) {
+               flist_del(&flow->list);
+               sfree(flow);
+       }
+
+       fio_mutex_up(flow_lock);
+}
+
+void flow_init_job(struct thread_data *td)
+{
+       if (td->o.flow)
+               td->flow = flow_get(td->o.flow_id);
+}
+
+void flow_exit_job(struct thread_data *td)
+{
+       if (td->flow) {
+               flow_put(td->flow);
+               td->flow = NULL;
+       }
+}
+
+void flow_init(void)
+{
+       flow_lock = fio_mutex_init(1);
+       flow_list = smalloc(sizeof(*flow_list));
+       INIT_FLIST_HEAD(flow_list);
+}
+
+void flow_exit(void)
+{
+       fio_mutex_remove(flow_lock);
+       sfree(flow_list);
+}
diff --git a/flow.h b/flow.h
new file mode 100644 (file)
index 0000000..c0a45c3
--- /dev/null
+++ b/flow.h
@@ -0,0 +1,11 @@
+#ifndef FIO_FLOW_H
+#define FIO_FLOW_H
+
+int flow_threshold_exceeded(struct thread_data *td);
+void flow_init_job(struct thread_data *td);
+void flow_exit_job(struct thread_data *td);
+
+void flow_exit(void);
+void flow_init(void);
+
+#endif
diff --git a/init.c b/init.c
index 8dc57845ee24605ea316e6252ca8f5684365d74c..d7d801181c435247e24217d5d53bfcc372698837 100644 (file)
--- a/init.c
+++ b/init.c
@@ -221,6 +221,7 @@ static void free_shm(void)
 
                threads = NULL;
                file_hash_exit();
+               flow_exit();
                fio_debug_jobp = NULL;
                shmdt(tp);
                shmctl(shm_id, IPC_RMID, &sbuf);
@@ -277,6 +278,9 @@ static int setup_thread_area(void)
        fio_debug_jobp = (void *) hash + file_hash_size;
        *fio_debug_jobp = -1;
        file_hash_init(hash);
+
+       flow_init();
+
        return 0;
 }
 
@@ -324,6 +328,7 @@ static void put_job(struct thread_data *td)
                return;
 
        profile_td_exit(td);
+       flow_exit_job(td);
 
        if (td->error)
                log_info("fio: %s\n", td->verror);
@@ -796,6 +801,8 @@ static int add_job(struct thread_data *td, const char *jobname, int job_add_num)
        if (fixup_options(td))
                goto err;
 
+       flow_init_job(td);
+
        /*
         * IO engines only need this for option callbacks, and the address may
         * change in subprocesses.
index c3fdb56314a5007c00bab2bdee57817816c9e9ee..bd8141a83f34d8a012742991dbd1422fb20c54d6 100644 (file)
--- a/options.c
+++ b/options.c
@@ -2169,6 +2169,40 @@ static struct fio_option options[FIO_MAX_OPTS] = {
                .off1   = td_var_offset(gid),
                .help   = "Run job with this group ID",
        },
+       {
+               .name   = "flow_id",
+               .type   = FIO_OPT_INT,
+               .off1   = td_var_offset(flow_id),
+               .help   = "The flow index ID to use",
+               .def    = "0",
+       },
+       {
+               .name   = "flow",
+               .type   = FIO_OPT_INT,
+               .off1   = td_var_offset(flow),
+               .help   = "Weight for flow control of this job",
+               .parent = "flow_id",
+               .def    = "0",
+       },
+       {
+               .name   = "flow_watermark",
+               .type   = FIO_OPT_INT,
+               .off1   = td_var_offset(flow_watermark),
+               .help   = "High watermark for flow control. This option"
+                       " should be set to the same value for all threads"
+                       " with non-zero flow.",
+               .parent = "flow_id",
+               .def    = "1024",
+       },
+       {
+               .name   = "flow_sleep",
+               .type   = FIO_OPT_INT,
+               .off1   = td_var_offset(flow_sleep),
+               .help   = "How many microseconds to sleep after being held"
+                       " back by the flow control mechanism",
+               .parent = "flow_id",
+               .def    = "0",
+       },
        {
                .name = NULL,
        },