``flow=8`` and another job has ``flow=-1``, then there will be a roughly 1:8
ratio in how much one runs vs the other.
-.. option:: flow_watermark=int
-
- The maximum value that the absolute value of the flow counter is allowed to
- reach before the job must wait for a lower value of the counter.
-
.. option:: flow_sleep=int
- The period of time, in microseconds, to wait after the flow watermark has
- been exceeded before retrying operations.
+ The period of time, in microseconds, to wait after the flow counter
+ has exceeded its proportion before retrying operations.
.. option:: stonewall, wait_for_previous
#define ARCH_CPU_CLOCK_WRAPS
+#define atomic_add(p, v) \
+ atomic_fetch_add((_Atomic typeof(*(p)) *)(p), v)
+#define atomic_sub(p, v) \
+ atomic_fetch_sub((_Atomic typeof(*(p)) *)(p), v)
+#define atomic_load_relaxed(p) \
+ atomic_load_explicit((_Atomic typeof(*(p)) *)(p), \
+ memory_order_relaxed)
#define atomic_load_acquire(p) \
atomic_load_explicit((_Atomic typeof(*(p)) *)(p), \
memory_order_acquire)
o->gid = le32_to_cpu(top->gid);
o->flow_id = __le32_to_cpu(top->flow_id);
o->flow = __le32_to_cpu(top->flow);
- o->flow_watermark = __le32_to_cpu(top->flow_watermark);
o->flow_sleep = le32_to_cpu(top->flow_sleep);
o->sync_file_range = le32_to_cpu(top->sync_file_range);
o->latency_target = le64_to_cpu(top->latency_target);
top->gid = cpu_to_le32(o->gid);
top->flow_id = __cpu_to_le32(o->flow_id);
top->flow = __cpu_to_le32(o->flow);
- top->flow_watermark = __cpu_to_le32(o->flow_watermark);
top->flow_sleep = cpu_to_le32(o->flow_sleep);
top->sync_file_range = cpu_to_le32(o->sync_file_range);
top->latency_target = __cpu_to_le64(o->latency_target);
[backward]
rw=read:-8k
-flow=-2
+flow=2
#offset=50%
size=100g
bs=8k
filename=/tmp/testfile
-flow_watermark=100
flow_sleep=1000
[job2]
numjobs=1
rw=write
-flow=-8
+flow=1
[job1]
numjobs=1
rw=randread
-flow=1
+flow=8
flow. See \fBflow\fR.
.TP
.BI flow \fR=\fPint
-Weight in token-based flow control. If this value is used, then there is
-a 'flow counter' which is used to regulate the proportion of activity between
-two or more jobs. Fio attempts to keep this flow counter near zero. The
-\fBflow\fR parameter stands for how much should be added or subtracted to the
-flow counter on each iteration of the main I/O loop. That is, if one job has
-`flow=8' and another job has `flow=\-1', then there will be a roughly 1:8
-ratio in how much one runs vs the other.
-.TP
-.BI flow_watermark \fR=\fPint
-The maximum value that the absolute value of the flow counter is allowed to
-reach before the job must wait for a lower value of the counter.
+Weight in token-based flow control. If this value is used,
+then fio regulates the activity between two or more jobs
+sharing the same flow_id.
+Fio attempts to keep each job's activity proportional to the activity of the
+other jobs in the same flow_id group, according to each job's requested weight.
+That is, if one job has `flow=3', another job has `flow=2'
+and a third has `flow=1', then there will be a roughly 3:2:1 ratio
+in how much each one runs vs the others.
.TP
.BI flow_sleep \fR=\fPint
-The period of time, in microseconds, to wait after the flow watermark has
-been exceeded before retrying operations.
+The period of time, in microseconds, to wait after the flow counter
+has exceeded its proportion before retrying operations.
.TP
.BI stonewall "\fR,\fB wait_for_previous"
Wait for preceding jobs in the job file to exit, before starting this
int first_error;
struct fio_flow *flow;
+ unsigned long long flow_counter;
/*
* Can be overloaded by profiles
unsigned int refs;
struct flist_head list;
unsigned int id;
- long long int flow_counter;
+ unsigned long long flow_counter;
+ unsigned int total_weight;
};
static struct flist_head *flow_list;
int flow_threshold_exceeded(struct thread_data *td)
{
struct fio_flow *flow = td->flow;
- long long flow_counter;
+ double flow_counter_ratio, flow_weight_ratio;
if (!flow)
return 0;
- if (td->o.flow > 0)
- flow_counter = flow->flow_counter;
- else
- flow_counter = -flow->flow_counter;
-
- if (flow_counter > td->o.flow_watermark) {
+ flow_counter_ratio = (double)td->flow_counter /
+ atomic_load_relaxed(&flow->flow_counter);
+ flow_weight_ratio = (double)td->o.flow /
+ atomic_load_relaxed(&flow->total_weight);
+
+ /*
+ * each thread/process executing a fio job will stall based on the
+ * expected user ratio for a given flow_id group. the idea is to keep
+ * 2 counters, flow and job-specific counter to test if the
+ * ratio between them is proportional to other jobs in the same flow_id
+ */
+ if (flow_counter_ratio > flow_weight_ratio) {
if (td->o.flow_sleep) {
io_u_quiesce(td);
usleep(td->o.flow_sleep);
return 1;
}
- /* No synchronization needed because it doesn't
- * matter if the flow count is slightly inaccurate */
- flow->flow_counter += td->o.flow;
+ /*
+ * increment flow(shared counter, therefore atomically)
+ * and job-specific counter
+ */
+ atomic_add(&flow->flow_counter, 1);
+ ++td->flow_counter;
+
return 0;
}
flow->refs = 0;
INIT_FLIST_HEAD(&flow->list);
flow->id = id;
- flow->flow_counter = 0;
+ flow->flow_counter = 1;
+ flow->total_weight = 0;
flist_add_tail(&flow->list, flow_list);
}
return flow;
}
-static void flow_put(struct fio_flow *flow)
+static void flow_put(struct fio_flow *flow, unsigned long long flow_counter,
+ unsigned int weight)
{
if (!flow_lock)
return;
fio_sem_down(flow_lock);
+ atomic_sub(&flow->flow_counter, flow_counter);
+ atomic_sub(&flow->total_weight, weight);
+
if (!--flow->refs) {
+ assert(flow->flow_counter == 1);
flist_del(&flow->list);
sfree(flow);
}
+/*
+ * Attach a job to its flow group when a non-zero flow weight is set:
+ * look up the group by flow_id, reset the job's private counter and add
+ * the job's weight to the group's total weight.
+ */
void flow_init_job(struct thread_data *td)
{
- if (td->o.flow)
+ if (td->o.flow) {
td->flow = flow_get(td->o.flow_id);
+ td->flow_counter = 0;
+ /*
+ * td->flow is treated as possibly NULL by flow_threshold_exceeded()
+ * and flow_exit_job(), so do not dereference it unchecked here.
+ */
+ if (td->flow)
+ atomic_add(&td->flow->total_weight, td->o.flow);
+ }
}
void flow_exit_job(struct thread_data *td)
{
if (td->flow) {
- flow_put(td->flow);
+ flow_put(td->flow, td->flow_counter, td->o.flow);
td->flow = NULL;
}
}
#ifndef FIO_FLOW_H
#define FIO_FLOW_H
+#define FLOW_MAX_WEIGHT 1000
+
int flow_threshold_exceeded(struct thread_data *td);
void flow_init_job(struct thread_data *td);
void flow_exit_job(struct thread_data *td);
{
.name = "flow",
.lname = "I/O flow weight",
+ /*
+ * thread_options.flow is a 32-bit unsigned int, so the parser type
+ * must stay FIO_OPT_INT to match the width of the field at off1.
+ */
.type = FIO_OPT_INT,
.off1 = offsetof(struct thread_options, flow),
.help = "Weight for flow control of this job",
.parent = "flow_id",
.hide = 1,
.def = "0",
+ .maxval = FLOW_MAX_WEIGHT,
.category = FIO_OPT_C_IO,
.group = FIO_OPT_G_IO_FLOW,
},
{
.name = "flow_watermark",
.lname = "I/O flow watermark",
- .type = FIO_OPT_INT,
- .off1 = offsetof(struct thread_options, flow_watermark),
- .help = "High watermark for flow control. This option"
- " should be set to the same value for all threads"
- " with non-zero flow.",
- .parent = "flow_id",
- .hide = 1,
- .def = "1024",
+ .type = FIO_OPT_SOFT_DEPRECATED,
.category = FIO_OPT_C_IO,
.group = FIO_OPT_G_IO_FLOW,
},
};
enum {
- FIO_SERVER_VER = 84,
+ FIO_SERVER_VER = 85,
FIO_SERVER_MAX_FRAGMENT_PDU = 1024,
FIO_SERVER_MAX_CMD_MB = 2048,
flow_id=1
[flow1]
-flow=-8
+flow=1
rate_iops=1000
[flow2]
-flow=1
+flow=8
-# Expected results: no parse warnings, runs and with roughly 1/8 iops between
-# the two jobs.
-# Buggy result: parse warning on flow value overflow, no 1/8 division between
-# jobs.
+# Expected results: no parse warnings, runs and with roughly 1:5:10 iops
+# between the three jobs.
+# Buggy result: parse warning on flow value overflow, no 1:5:10 division
+# between jobs.
#
[global]
bs=4k
ioengine=null
size=100g
-runtime=10
+runtime=12
flow_id=1
-gtod_cpu=1
+flow_sleep=100
+thread
+log_avg_msec=1000
+write_iops_log=t0012.fio
[flow1]
-flow=-8
-rate_iops=1000
+flow=1
[flow2]
-flow=1
+flow=5
+
+[flow3]
+flow=10
--- /dev/null
+# Expected results: no parse warnings, runs and with roughly 1:2:3 iops
+# between the three jobs for the first 5 seconds, then
+# runs with roughly 1:2 iops between the two jobs for
+# the remaining 5 seconds.
+#
+# Buggy result: parse warning on flow value overflow, no 1:2:3 division between
+# the three jobs for the first 5 seconds or no 1:2 division between
+# the first two jobs for the remaining 5 seconds.
+#
+
+[global]
+bs=4k
+ioengine=null
+size=100g
+runtime=12
+flow_id=1
+thread
+log_avg_msec=1000
+write_iops_log=t0014.fio
+
+[flow1]
+flow=1
+
+[flow2]
+flow=2
+
+[flow3]
+flow=3
+runtime=5
self.passed = False
+class FioJobTest_t0012(FioJobTest):
+ """Test consists of fio test job t0012
+ Confirm ratios of job iops are 1:5:10
+ job1,job2,job3 respectively"""
+
+ def check_result(self):
+ super(FioJobTest_t0012, self).check_result()
+
+ if not self.passed:
+ return
+
+ iops_files = []
+ for i in range(1,4):
+ file_data, success = self.get_file(os.path.join(self.test_dir, "{0}_iops.{1}.log".format(os.path.basename(self.fio_job), i)))
+
+ if not success:
+ self.failure_reason = "{0} unable to open output file,".format(self.failure_reason)
+ self.passed = False
+ return
+
+ iops_files.append(file_data.splitlines())
+
+ # all three jobs run for the full duration; sum the first 9 iops samples of each
+ iops1 = 0.0
+ iops2 = 0.0
+ iops3 = 0.0
+ for i in range(9):
+ iops1 = iops1 + float(iops_files[0][i].split(',')[1])
+ iops2 = iops2 + float(iops_files[1][i].split(',')[1])
+ iops3 = iops3 + float(iops_files[2][i].split(',')[1])
+
+ ratio1 = iops3/iops2
+ ratio2 = iops3/iops1
+ logging.debug(
+ "sample {0}: job1 iops={1} job2 iops={2} job3 iops={3} job3/job2={4:.3f} job3/job1={5:.3f}".format(
+ i, iops1, iops2, iops3, ratio1, ratio2
+ )
+ )
+
+ # check the job3/job2 (~2) and job3/job1 (~10) iops ratios
+ if ratio1 < 1 or ratio1 > 3 or ratio2 < 7 or ratio2 > 13:
+ self.failure_reason = "{0} iops ratio mismatch iops1={1} iops2={2} iops3={3} expected r1~2 r2~10 got r1={4:.3f} r2={5:.3f},".format(
+ self.failure_reason, iops1, iops2, iops3, ratio1, ratio2
+ )
+ self.passed = False
+ return
+
+
+class FioJobTest_t0014(FioJobTest):
+ """Test consists of fio test job t0014
+ Confirm that job1_iops / job2_iops ~ 1:2 for entire duration
+ and that job1_iops / job3_iops ~ 1:3 for first half of duration.
+
+ The test is about making sure the flow feature can
+ re-calibrate the activity dynamically"""
+
+ def check_result(self):
+ super(FioJobTest_t0014, self).check_result()
+
+ if not self.passed:
+ return
+
+ iops_files = []
+ for i in range(1,4):
+ file_data, success = self.get_file(os.path.join(self.test_dir, "{0}_iops.{1}.log".format(os.path.basename(self.fio_job), i)))
+
+ if not success:
+ self.failure_reason = "{0} unable to open output file,".format(self.failure_reason)
+ self.passed = False
+ return
+
+ iops_files.append(file_data.splitlines())
+
+ # there are 9 samples for job1 and job2, 4 samples for job3
+ iops1 = 0.0
+ iops2 = 0.0
+ iops3 = 0.0
+ for i in range(9):
+ if i < 4:
+ iops3 = iops3 + float(iops_files[2][i].split(',')[1])
+ elif i == 4:
+ ratio1 = iops1 / iops2
+ ratio2 = iops1 / iops3
+
+
+ if ratio1 < 0.43 or ratio1 > 0.57 or ratio2 < 0.21 or ratio2 > 0.45:
+ self.failure_reason = "{0} iops ratio mismatch iops1={1} iops2={2} iops3={3}\
+ expected r1~0.5 r2~0.33 got r1={4:.3f} r2={5:.3f},".format(
+ self.failure_reason, iops1, iops2, iops3, ratio1, ratio2
+ )
+ self.passed = False
+
+ iops1 = iops1 + float(iops_files[0][i].split(',')[1])
+ iops2 = iops2 + float(iops_files[1][i].split(',')[1])
+
+ ratio1 = iops1/iops2
+ ratio2 = iops1/iops3
+ logging.debug(
+ "sample {0}: job1 iops={1} job2 iops={2} job3 iops={3} job1/job2={4:.3f} job1/job3={5:.3f}".format(
+ i, iops1, iops2, iops3, ratio1, ratio2
+ )
+ )
+
+ # test job1 and job2 succeeded to recalibrate
+ if ratio1 < 0.43 or ratio1 > 0.57:
+ self.failure_reason = "{0} iops ratio mismatch iops1={1} iops2={2} expected ratio~0.5 got ratio={3:.3f},".format(
+ self.failure_reason, iops1, iops2, ratio1
+ )
+ self.passed = False
+ return
+
+
class FioJobTest_iops_rate(FioJobTest):
"""Test consists of fio test job t0009
Confirm that job0 iops == 1000
self.failure_reason = "{0} iops value mismatch,".format(self.failure_reason)
self.passed = False
- if ratio < 7 or ratio > 9:
+ if ratio < 6 or ratio > 10:
self.failure_reason = "{0} iops ratio mismatch,".format(self.failure_reason)
self.passed = False
},
{
'test_id': 12,
- 'test_class': FioJobTest_iops_rate,
+ 'test_class': FioJobTest_t0012,
'job': 't0012.fio',
'success': SUCCESS_DEFAULT,
'pre_job': None,
'pre_success': None,
'output_format': 'json',
- 'requirements': [Requirements.not_macos],
- # mac os does not support CPU affinity
- # which is required for gtod offloading
+ 'requirements': [],
},
{
'test_id': 13,
'output_format': 'json',
'requirements': [],
},
+ {
+ 'test_id': 14,
+ 'test_class': FioJobTest_t0014,
+ 'job': 't0014.fio',
+ 'success': SUCCESS_DEFAULT,
+ 'pre_job': None,
+ 'pre_success': None,
+ 'output_format': 'json',
+ 'requirements': [],
+ },
{
'test_id': 1000,
'test_class': FioExeTest,
unsigned int uid;
unsigned int gid;
- int flow_id;
- int flow;
- int flow_watermark;
- unsigned int flow_sleep;
-
unsigned int offset_increment_percent;
unsigned long long offset_increment;
unsigned long long number_ios;
fio_fp64_t latency_percentile;
uint32_t latency_run;
+ /*
+ * flow support
+ */
+ int flow_id;
+ unsigned int flow;
+ unsigned int flow_sleep;
+
unsigned int sig_figs;
unsigned block_error_hist;
uint32_t uid;
uint32_t gid;
- int32_t flow_id;
- int32_t flow;
- int32_t flow_watermark;
- uint32_t flow_sleep;
-
uint32_t offset_increment_percent;
uint64_t offset_increment;
uint64_t number_ios;
fio_fp64_t latency_percentile;
uint32_t latency_run;
+ /*
+ * flow support
+ */
+ int32_t flow_id;
+ uint32_t flow;
+ uint32_t flow_sleep;
+
uint32_t sig_figs;
uint32_t block_error_hist;