Token-based flow control
authorDan Ehrenberg <dehrenberg@google.com>
Mon, 20 Feb 2012 10:05:14 +0000 (11:05 +0100)
committerJens Axboe <axboe@kernel.dk>
Mon, 20 Feb 2012 10:07:58 +0000 (11:07 +0100)
This patch allows two fio jobs to be kept to a certain
proportion of each other using token-based flow control.
There are three new parameters: flow, flow_watermark, and
flow_sleep, documented in the fio options. An example of an fio
job using these parameters is below:

[global]
norandommap
thread
time_based
runtime=30
direct=1
ioengine=libaio
iodepth=256
size=100g
bs=8k
filename=/tmp/testfile
flow_watermark=100
flow_sleep=1000

[job2]
numjobs=1
rw=write
flow=-8

[job1]
numjobs=1
rw=randread
flow=1

The motivating application of this patch was to allow random reads
and sequential writes at a particular given proportion.

This initial version is only correct when run with 'thread', as shared
state is represented with a global variable. It also only allows two
jobs to be synchronized properly. A future version might do more, but
no more functionality was needed for my application.

Tested: Ran a few fio jobs with this flow control, observing
the proportion of IOPS to match what was intended by the job file.
Varied the flow_watermark and flow_sleep parameters and observed
the effect on throughput.

Signed-off-by: Dan Ehrenberg <dehrenberg@google.com>
Modified by me to support flow_id, so an arbitrary number of flows can
be used. This means it no longer relies on global context, so it can be
used from a thread or process alike. Also added man page documentation.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
HOWTO
Makefile
backend.c
examples/flow [new file with mode: 0644]
fio.1
fio.h
flow.c [new file with mode: 0644]
flow.h [new file with mode: 0644]
init.c
options.c

diff --git a/HOWTO b/HOWTO
index c6304a7..da037d2 100644 (file)
--- a/HOWTO
+++ b/HOWTO
@@ -1231,6 +1231,25 @@ uid=int          Instead of running as the invoking user, set the user ID to
 
 gid=int                Set group ID, see uid.
 
+flow_id=int    The ID of the flow. If not specified, it defaults to being a
+               global flow. See flow.
+
+flow=int       Weight in token-based flow control. If this value is used, then
+               there is a 'flow counter' which is used to regulate the
+               proportion of activity between two or more jobs. fio attempts
+               to keep this flow counter near zero. The 'flow' parameter
+               stands for how much should be added or subtracted to the flow
+               counter on each iteration of the main I/O loop. That is, if
+               one job has flow=8 and another job has flow=-1, then there
+               will be a roughly 1:8 ratio in how much one runs vs the other.
+
+flow_watermark=int     The maximum value that the absolute value of the flow
+               counter is allowed to reach before the job must wait for a
+               lower value of the counter.
+
+flow_sleep=int The period of time, in microseconds, to wait after the flow
+               watermark has been exceeded before retrying operations
+
 In addition, there are some parameters which are only valid when a specific
 ioengine is in use. These are used identically to normal parameters, with the
 caveat that when used on the command line, they must come after the ioengine
index dc468b5..8cce80f 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -14,7 +14,7 @@ SOURCE = gettime.c fio.c ioengines.c init.c stat.c log.c time.c filesetup.c \
                rbtree.c smalloc.c filehash.c profile.c debug.c lib/rand.c \
                lib/num2str.c lib/ieee754.c $(wildcard crc/*.c) engines/cpu.c \
                engines/mmap.c engines/sync.c engines/null.c engines/net.c \
-               memalign.c server.c client.c iolog.c backend.c libfio.c
+               memalign.c server.c client.c iolog.c backend.c libfio.c flow.c
 
 ifeq ($(UNAME), Linux)
   SOURCE += diskutil.c fifo.c blktrace.c helpers.c cgroup.c trim.c \
index eaa6ea7..5279104 100644 (file)
--- a/backend.c
+++ b/backend.c
@@ -409,6 +409,9 @@ static void do_verify(struct thread_data *td)
                        }
                }
 
+               if (flow_threshold_exceeded(td))
+                       continue;
+
                io_u = __get_io_u(td);
                if (!io_u)
                        break;
@@ -560,6 +563,9 @@ static void do_io(struct thread_data *td)
                        }
                }
 
+               if (flow_threshold_exceeded(td))
+                       continue;
+
                io_u = get_io_u(td);
                if (!io_u)
                        break;
diff --git a/examples/flow b/examples/flow
new file mode 100644 (file)
index 0000000..4b078cf
--- /dev/null
@@ -0,0 +1,25 @@
+# Example usage of flows. The below will have roughly a 1:8 difference
+# between job2 and job1.
+[global]
+norandommap
+thread
+time_based
+runtime=30
+direct=1
+ioengine=libaio
+iodepth=256
+size=100g
+bs=8k
+filename=/tmp/testfile
+flow_watermark=100
+flow_sleep=1000
+
+[job2]
+numjobs=1
+rw=write
+flow=-8
+
+[job1]
+numjobs=1
+rw=randread
+flow=1
diff --git a/fio.1 b/fio.1
index 75c8ec6..910812f 100644 (file)
--- a/fio.1
+++ b/fio.1
@@ -958,6 +958,27 @@ the thread/process does any work.
 .BI gid \fR=\fPint
 Set group ID, see \fBuid\fR.
 .TP
+.BI flow_id \fR=\fPint
+The ID of the flow. If not specified, it defaults to being a global flow. See
+\fBflow\fR.
+.TP
+.BI flow \fR=\fPint
+Weight in token-based flow control. If this value is used, then there is a
+\fBflow counter\fR which is used to regulate the proportion of activity between
+two or more jobs. fio attempts to keep this flow counter near zero. The
+\fBflow\fR parameter stands for how much should be added or subtracted to the
+flow counter on each iteration of the main I/O loop. That is, if one job has
+\fBflow=8\fR and another job has \fBflow=-1\fR, then there will be a roughly
+1:8 ratio in how much one runs vs the other.
+.TP
+.BI flow_watermark \fR=\fPint
+The maximum value that the absolute value of the flow counter is allowed to
+reach before the job must wait for a lower value of the counter.
+.TP
+.BI flow_sleep \fR=\fPint
+The period of time, in microseconds, to wait after the flow watermark has been
+exceeded before retrying operations
+.TP
 .BI clat_percentiles \fR=\fPbool
 Enable the reporting of percentiles of completion latencies.
 .TP
diff --git a/fio.h b/fio.h
index c462980..5c912d0 100644 (file)
--- a/fio.h
+++ b/fio.h
@@ -38,6 +38,7 @@ struct thread_data;
 #include "lib/rand.h"
 #include "server.h"
 #include "stat.h"
+#include "flow.h"
 
 #ifdef FIO_HAVE_GUASI
 #include <guasi.h>
@@ -258,6 +259,11 @@ struct thread_options {
        unsigned int uid;
        unsigned int gid;
 
+       int flow_id;
+       int flow;
+       int flow_watermark;
+       unsigned int flow_sleep;
+
        unsigned int sync_file_range;
 };
 
@@ -470,6 +476,8 @@ struct thread_data {
        unsigned int total_err_count;
        int first_error;
 
+       struct fio_flow *flow;
+
        /*
         * Can be overloaded by profiles
         */
diff --git a/flow.c b/flow.c
new file mode 100644 (file)
index 0000000..e5c4a40
--- /dev/null
+++ b/flow.c
@@ -0,0 +1,104 @@
+#include "fio.h"
+#include "mutex.h"
+#include "smalloc.h"
+#include "flist.h"
+
+struct fio_flow {
+       unsigned int refs;
+       struct flist_head list;
+       unsigned int id;
+       long long int flow_counter;
+};
+
+static struct flist_head *flow_list;
+static struct fio_mutex *flow_lock;
+
+int flow_threshold_exceeded(struct thread_data *td)
+{
+       struct fio_flow *flow = td->flow;
+       int sign;
+
+       if (!flow)
+               return 0;
+
+       sign = td->o.flow > 0 ? 1 : -1;
+       if (sign * flow->flow_counter > td->o.flow_watermark) {
+               if (td->o.flow_sleep)
+                       usleep(td->o.flow_sleep);
+               return 1;
+       }
+
+       /* No synchronization needed because it doesn't
+        * matter if the flow count is slightly inaccurate */
+       flow->flow_counter += td->o.flow;
+       return 0;
+}
+
+static struct fio_flow *flow_get(unsigned int id)
+{
+       struct fio_flow *flow;
+       struct flist_head *n;
+
+       fio_mutex_down(flow_lock);
+
+       flist_for_each(n, flow_list) {
+               flow = flist_entry(n, struct fio_flow, list);
+               if (flow->id == id)
+                       break;
+
+               flow = NULL;
+       }
+
+       if (!flow) {
+               flow = smalloc(sizeof(*flow));
+               flow->refs = 0;
+               INIT_FLIST_HEAD(&flow->list);
+               flow->id = id;
+               flow->flow_counter = 0;
+
+               flist_add_tail(&flow->list, flow_list);
+       }
+
+       flow->refs++;
+       fio_mutex_up(flow_lock);
+       return flow;
+}
+
+static void flow_put(struct fio_flow *flow)
+{
+       fio_mutex_down(flow_lock);
+
+       if (!--flow->refs) {
+               flist_del(&flow->list);
+               sfree(flow);
+       }
+
+       fio_mutex_up(flow_lock);
+}
+
+void flow_init_job(struct thread_data *td)
+{
+       if (td->o.flow)
+               td->flow = flow_get(td->o.flow_id);
+}
+
+void flow_exit_job(struct thread_data *td)
+{
+       if (td->flow) {
+               flow_put(td->flow);
+               td->flow = NULL;
+       }
+}
+
+void flow_init(void)
+{
+       flow_lock = fio_mutex_init(1);
+       flow_list = smalloc(sizeof(*flow_list));
+       INIT_FLIST_HEAD(flow_list);
+}
+
+void flow_exit(void)
+{
+       fio_mutex_remove(flow_lock);
+       sfree(flow_list);
+}
diff --git a/flow.h b/flow.h
new file mode 100644 (file)
index 0000000..c0a45c3
--- /dev/null
+++ b/flow.h
@@ -0,0 +1,11 @@
+#ifndef FIO_FLOW_H
+#define FIO_FLOW_H
+
+int flow_threshold_exceeded(struct thread_data *td);
+void flow_init_job(struct thread_data *td);
+void flow_exit_job(struct thread_data *td);
+
+void flow_exit(void);
+void flow_init(void);
+
+#endif
diff --git a/init.c b/init.c
index 8dc5784..d7d8011 100644 (file)
--- a/init.c
+++ b/init.c
@@ -221,6 +221,7 @@ static void free_shm(void)
 
                threads = NULL;
                file_hash_exit();
+               flow_exit();
                fio_debug_jobp = NULL;
                shmdt(tp);
                shmctl(shm_id, IPC_RMID, &sbuf);
@@ -277,6 +278,9 @@ static int setup_thread_area(void)
        fio_debug_jobp = (void *) hash + file_hash_size;
        *fio_debug_jobp = -1;
        file_hash_init(hash);
+
+       flow_init();
+
        return 0;
 }
 
@@ -324,6 +328,7 @@ static void put_job(struct thread_data *td)
                return;
 
        profile_td_exit(td);
+       flow_exit_job(td);
 
        if (td->error)
                log_info("fio: %s\n", td->verror);
@@ -796,6 +801,8 @@ static int add_job(struct thread_data *td, const char *jobname, int job_add_num)
        if (fixup_options(td))
                goto err;
 
+       flow_init_job(td);
+
        /*
         * IO engines only need this for option callbacks, and the address may
         * change in subprocesses.
index c3fdb56..bd8141a 100644 (file)
--- a/options.c
+++ b/options.c
@@ -2169,6 +2169,40 @@ static struct fio_option options[FIO_MAX_OPTS] = {
                .off1   = td_var_offset(gid),
                .help   = "Run job with this group ID",
        },
+       {
+               .name   = "flow_id",
+               .type   = FIO_OPT_INT,
+               .off1   = td_var_offset(flow_id),
+               .help   = "The flow index ID to use",
+               .def    = "0",
+       },
+       {
+               .name   = "flow",
+               .type   = FIO_OPT_INT,
+               .off1   = td_var_offset(flow),
+               .help   = "Weight for flow control of this job",
+               .parent = "flow_id",
+               .def    = "0",
+       },
+       {
+               .name   = "flow_watermark",
+               .type   = FIO_OPT_INT,
+               .off1   = td_var_offset(flow_watermark),
+               .help   = "High watermark for flow control. This option"
+                       " should be set to the same value for all threads"
+                       " with non-zero flow.",
+               .parent = "flow_id",
+               .def    = "1024",
+       },
+       {
+               .name   = "flow_sleep",
+               .type   = FIO_OPT_INT,
+               .off1   = td_var_offset(flow_sleep),
+               .help   = "How many microseconds to sleep after being held"
+                       " back by the flow control mechanism",
+               .parent = "flow_id",
+               .def    = "0",
+       },
        {
                .name = NULL,
        },