gid=int Set group ID, see uid.
+flow_id=int The ID of the flow. If not specified, it defaults to being a
+ global flow. See flow.
+
+flow=int Weight in token-based flow control. If this value is used, then
+ there is a 'flow counter' which is used to regulate the
+ proportion of activity between two or more jobs. fio attempts
+ to keep this flow counter near zero. The 'flow' parameter
+ stands for how much should be added or subtracted to the flow
+ counter on each iteration of the main I/O loop. That is, if
+ one job has flow=8 and another job has flow=-1, then there
+ will be a roughly 1:8 ratio in how much one runs vs the other.
+
+flow_watermark=int The maximum value that the absolute value of the flow
+ counter is allowed to reach before the job must wait for a
+ lower value of the counter.
+
+flow_sleep=int The period of time, in microseconds, to wait after the flow
+ watermark has been exceeded before retrying operations
+
In addition, there are some parameters which are only valid when a specific
ioengine is in use. These are used identically to normal parameters, with the
caveat that when used on the command line, they must come after the ioengine
rbtree.c smalloc.c filehash.c profile.c debug.c lib/rand.c \
lib/num2str.c lib/ieee754.c $(wildcard crc/*.c) engines/cpu.c \
engines/mmap.c engines/sync.c engines/null.c engines/net.c \
- memalign.c server.c client.c iolog.c backend.c libfio.c
+ memalign.c server.c client.c iolog.c backend.c libfio.c flow.c
ifeq ($(UNAME), Linux)
SOURCE += diskutil.c fifo.c blktrace.c helpers.c cgroup.c trim.c \
}
}
+ if (flow_threshold_exceeded(td))
+ continue;
+
io_u = __get_io_u(td);
if (!io_u)
break;
}
}
+ if (flow_threshold_exceeded(td))
+ continue;
+
io_u = get_io_u(td);
if (!io_u)
break;
--- /dev/null
+# Example usage of flows. The below will have roughly a 1:8 difference
+# between job2 and job1.
+[global]
+norandommap
+thread
+time_based
+runtime=30
+direct=1
+ioengine=libaio
+iodepth=256
+size=100g
+bs=8k
+filename=/tmp/testfile
+flow_watermark=100
+flow_sleep=1000
+
+[job2]
+numjobs=1
+rw=write
+flow=-8
+
+[job1]
+numjobs=1
+rw=randread
+flow=1
.BI gid \fR=\fPint
Set group ID, see \fBuid\fR.
.TP
+.BI flow_id \fR=\fPint
+The ID of the flow. If not specified, it defaults to being a global flow. See
+\fBflow\fR.
+.TP
+.BI flow \fR=\fPint
+Weight in token-based flow control. If this value is used, then there is a
+\fBflow counter\fR which is used to regulate the proportion of activity between
+two or more jobs. fio attempts to keep this flow counter near zero. The
+\fBflow\fR parameter stands for how much should be added or subtracted to the
+flow counter on each iteration of the main I/O loop. That is, if one job has
+\fBflow=8\fR and another job has \fBflow=-1\fR, then there will be a roughly
+1:8 ratio in how much one runs vs the other.
+.TP
+.BI flow_watermark \fR=\fPint
+The maximum value that the absolute value of the flow counter is allowed to
+reach before the job must wait for a lower value of the counter.
+.TP
+.BI flow_sleep \fR=\fPint
+The period of time, in microseconds, to wait after the flow watermark has been
+exceeded before retrying operations
+.TP
.BI clat_percentiles \fR=\fPbool
Enable the reporting of percentiles of completion latencies.
.TP
#include "lib/rand.h"
#include "server.h"
#include "stat.h"
+#include "flow.h"
#ifdef FIO_HAVE_GUASI
#include <guasi.h>
unsigned int uid;
unsigned int gid;
+ int flow_id;
+ int flow;
+ int flow_watermark;
+ unsigned int flow_sleep;
+
unsigned int sync_file_range;
};
unsigned int total_err_count;
int first_error;
+ struct fio_flow *flow;
+
/*
* Can be overloaded by profiles
*/
--- /dev/null
+#include "fio.h"
+#include "mutex.h"
+#include "smalloc.h"
+#include "flist.h"
+
+struct fio_flow {
+ unsigned int refs;
+ struct flist_head list;
+ unsigned int id;
+ long long int flow_counter;
+};
+
+static struct flist_head *flow_list;
+static struct fio_mutex *flow_lock;
+
+int flow_threshold_exceeded(struct thread_data *td)
+{
+ struct fio_flow *flow = td->flow;
+ int sign;
+
+ if (!flow)
+ return 0;
+
+ sign = td->o.flow > 0 ? 1 : -1;
+ if (sign * flow->flow_counter > td->o.flow_watermark) {
+ if (td->o.flow_sleep)
+ usleep(td->o.flow_sleep);
+ return 1;
+ }
+
+ /* No synchronization needed because it doesn't
+ * matter if the flow count is slightly inaccurate */
+ flow->flow_counter += td->o.flow;
+ return 0;
+}
+
+static struct fio_flow *flow_get(unsigned int id)
+{
+ struct fio_flow *flow;
+ struct flist_head *n;
+
+ fio_mutex_down(flow_lock);
+
+ flist_for_each(n, flow_list) {
+ flow = flist_entry(n, struct fio_flow, list);
+ if (flow->id == id)
+ break;
+
+ flow = NULL;
+ }
+
+ if (!flow) {
+ flow = smalloc(sizeof(*flow));
+ flow->refs = 0;
+ INIT_FLIST_HEAD(&flow->list);
+ flow->id = id;
+ flow->flow_counter = 0;
+
+ flist_add_tail(&flow->list, flow_list);
+ }
+
+ flow->refs++;
+ fio_mutex_up(flow_lock);
+ return flow;
+}
+
+static void flow_put(struct fio_flow *flow)
+{
+ fio_mutex_down(flow_lock);
+
+ if (!--flow->refs) {
+ flist_del(&flow->list);
+ sfree(flow);
+ }
+
+ fio_mutex_up(flow_lock);
+}
+
+void flow_init_job(struct thread_data *td)
+{
+ if (td->o.flow)
+ td->flow = flow_get(td->o.flow_id);
+}
+
+void flow_exit_job(struct thread_data *td)
+{
+ if (td->flow) {
+ flow_put(td->flow);
+ td->flow = NULL;
+ }
+}
+
+void flow_init(void)
+{
+ flow_lock = fio_mutex_init(1);
+ flow_list = smalloc(sizeof(*flow_list));
+ INIT_FLIST_HEAD(flow_list);
+}
+
+void flow_exit(void)
+{
+ fio_mutex_remove(flow_lock);
+ sfree(flow_list);
+}
--- /dev/null
+#ifndef FIO_FLOW_H
+#define FIO_FLOW_H
+
+int flow_threshold_exceeded(struct thread_data *td);
+void flow_init_job(struct thread_data *td);
+void flow_exit_job(struct thread_data *td);
+
+void flow_exit(void);
+void flow_init(void);
+
+#endif
threads = NULL;
file_hash_exit();
+ flow_exit();
fio_debug_jobp = NULL;
shmdt(tp);
shmctl(shm_id, IPC_RMID, &sbuf);
fio_debug_jobp = (void *) hash + file_hash_size;
*fio_debug_jobp = -1;
file_hash_init(hash);
+
+ flow_init();
+
return 0;
}
return;
profile_td_exit(td);
+ flow_exit_job(td);
if (td->error)
log_info("fio: %s\n", td->verror);
if (fixup_options(td))
goto err;
+ flow_init_job(td);
+
/*
* IO engines only need this for option callbacks, and the address may
* change in subprocesses.
.off1 = td_var_offset(gid),
.help = "Run job with this group ID",
},
+ {
+ .name = "flow_id",
+ .type = FIO_OPT_INT,
+ .off1 = td_var_offset(flow_id),
+ .help = "The flow index ID to use",
+ .def = "0",
+ },
+ {
+ .name = "flow",
+ .type = FIO_OPT_INT,
+ .off1 = td_var_offset(flow),
+ .help = "Weight for flow control of this job",
+ .parent = "flow_id",
+ .def = "0",
+ },
+ {
+ .name = "flow_watermark",
+ .type = FIO_OPT_INT,
+ .off1 = td_var_offset(flow_watermark),
+ .help = "High watermark for flow control. This option"
+ " should be set to the same value for all threads"
+ " with non-zero flow.",
+ .parent = "flow_id",
+ .def = "1024",
+ },
+ {
+ .name = "flow_sleep",
+ .type = FIO_OPT_INT,
+ .off1 = td_var_offset(flow_sleep),
+ .help = "How many microseconds to sleep after being held"
+ " back by the flow control mechanism",
+ .parent = "flow_id",
+ .def = "0",
+ },
{
.name = NULL,
},