--- /dev/null
+language: c
+compiler:
+ - clang
+ - gcc
+before_install:
+ - sudo apt-get -qq update
+ - sudo apt-get install -y libaio-dev libnuma-dev
#!/bin/sh
GVF=FIO-VERSION-FILE
-DEF_VER=fio-2.8
+DEF_VER=fio-2.9
LF='
'
disk log, which can quickly grow to a very large size. Setting
this option makes fio average each log entry over the
specified period of time, reducing the resolution of the log.
- See log_max as well. Defaults to 0, logging all entries.
+ See log_max_value as well. Defaults to 0, logging all entries.
+
+log_max_value=bool If log_avg_msec is set, fio logs the average over that
+ window. If you instead want to log the maximum value, set this
+ option to 1. Defaults to 0, meaning that averaged values are
+ logged.
-log_max=bool If log_avg_msec is set, fio logs the average over that window.
- If you instead want to log the maximum value, set this option
- to 1. Defaults to 0, meaning that averaged values are logged.
-.
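+
+		For example, to log one averaged entry per second instead of
+		one entry per IO, or to record the window maximum rather than
+		the average (an illustrative job fragment):
+
+		write_bw_log=mylog
+		log_avg_msec=1000
+		log_max_value=1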
log_offset=int If this is set, the iolog options will include the byte
offset for the IO entry as well as the other data values.
1 : allocate space immediately inside defragment event,
and free right after event
+[rbd] clustername=str Specifies the name of the Ceph cluster.
+[rbd] rbdname=str Specifies the name of the RBD.
+[rbd] pool=str Specifies the name of the Ceph pool containing the RBD.
+[rbd] clientname=str Specifies the username (without the 'client.' prefix)
+ used to access the Ceph cluster. If the clustername is
+			specified, the clientname shall be the full type.id
+ string. If no type. prefix is given, fio will add
+ 'client.' by default.
+
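+	An illustrative rbd job fragment using these options (the
+	cluster, client, pool and image names are placeholders):
+
+	ioengine=rbd
+	clustername=ceph
+	clientname=admin
+	pool=rbd
+	rbdname=fio_test
+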
[mtd] skip_bad=bool Skip operations against known bad blocks.
[libhdfs] hdfsdirectory libhdfs will create chunks in this HDFS directory
cpu= CPU usage. User and system time, along with the number
of context switches this thread went through, usage of
system and user time, and finally the number of major
- and minor page faults.
+ and minor page faults. The CPU utilization numbers are
+ averages for the jobs in that reporting group, while the
+ context and fault counters are summed.
IO depths= The distribution of io depths over the job life time. The
numbers are divided into powers of 2, so for example the
16= entries includes depths up to that value but higher
CFLAGS = -std=gnu99 -Wwrite-strings -Wall -Wdeclaration-after-statement $(OPTFLAGS) $(EXTFLAGS) $(BUILD_CFLAGS) -I. -I$(SRCDIR)
LIBS += -lm $(EXTLIBS)
PROGS = fio
-SCRIPTS = $(addprefix $(SRCDIR)/,tools/fio_generate_plots tools/plot/fio2gnuplot tools/genfio)
+SCRIPTS = $(addprefix $(SRCDIR)/,tools/fio_generate_plots tools/plot/fio2gnuplot tools/genfio tools/fiologparser.py)
ifndef CONFIG_FIO_NO_OPT
CFLAGS += -O3
T_VS_OBJS = t/verify-state.o t/log.o crc/crc32c.o crc/crc32c-intel.o t/debug.o
T_VS_PROGS = t/fio-verify-state
+T_PIPE_ASYNC_OBJS = t/read-to-pipe-async.o
+T_PIPE_ASYNC_PROGS = t/read-to-pipe-async
+
+T_MEMLOCK_OBJS = t/memlock.o
+T_MEMLOCK_PROGS = t/memlock
+
T_OBJS = $(T_SMALLOC_OBJS)
T_OBJS += $(T_IEEE_OBJS)
T_OBJS += $(T_ZIPF_OBJS)
T_OBJS += $(T_BTRACE_FIO_OBJS)
T_OBJS += $(T_DEDUPE_OBJS)
T_OBJS += $(T_VS_OBJS)
+T_OBJS += $(T_PIPE_ASYNC_OBJS)
+T_OBJS += $(T_MEMLOCK_OBJS)
ifneq (,$(findstring CYGWIN,$(CONFIG_TARGET_OS)))
T_DEDUPE_OBJS += os/windows/posix.o lib/hweight.o
printing.o: printing.c printing.h
$(QUIET_CC)$(CC) $(CFLAGS) $(GTK_CFLAGS) $(CPPFLAGS) -c $<
+t/read-to-pipe-async: $(T_PIPE_ASYNC_OBJS)
+ $(QUIET_LINK)$(CC) $(LDFLAGS) $(CFLAGS) -o $@ $(T_PIPE_ASYNC_OBJS) $(LIBS)
+
+t/memlock: $(T_MEMLOCK_OBJS)
+ $(QUIET_LINK)$(CC) $(LDFLAGS) $(CFLAGS) -o $@ $(T_MEMLOCK_OBJS) $(LIBS)
+
t/stest: $(T_SMALLOC_OBJS)
$(QUIET_LINK)$(CC) $(LDFLAGS) $(CFLAGS) -o $@ $(T_SMALLOC_OBJS) $(LIBS)
@man -t tools/fio_generate_plots.1 | ps2pdf - fio_generate_plots.pdf
@man -t tools/plot/fio2gnuplot.1 | ps2pdf - fio2gnuplot.pdf
+test:
+
install: $(PROGS) $(SCRIPTS) tools/plot/fio2gnuplot.1 FORCE
$(INSTALL) -m 755 -d $(DESTDIR)$(bindir)
$(INSTALL) $(PROGS) $(SCRIPTS) $(DESTDIR)$(bindir)
if (flow_threshold_exceeded(td))
continue;
- if (!td->o.time_based && bytes_issued >= total_bytes)
+ /*
+ * Break if we exceeded the bytes. The exception is time
+ * based runs, but we still need to break out of the loop
+ * for those to run verification, if enabled.
+ */
+ if (bytes_issued >= total_bytes &&
+ (!td->o.time_based ||
+ (td->o.time_based && td->o.verify != VERIFY_NONE)))
break;
io_u = get_io_u(td);
bytes_done[i] = td->bytes_done[i] - bytes_done[i];
}
+static void free_file_completion_logging(struct thread_data *td)
+{
+ struct fio_file *f;
+ unsigned int i;
+
+ for_each_file(td, f, i) {
+ if (!f->last_write_comp)
+ break;
+ sfree(f->last_write_comp);
+ }
+}
+
+static int init_file_completion_logging(struct thread_data *td,
+ unsigned int depth)
+{
+ struct fio_file *f;
+ unsigned int i;
+
+ if (td->o.verify == VERIFY_NONE || !td->o.verify_state_save)
+ return 0;
+
+ for_each_file(td, f, i) {
+ f->last_write_comp = scalloc(depth, sizeof(uint64_t));
+ if (!f->last_write_comp)
+ goto cleanup;
+ }
+
+ return 0;
+
+cleanup:
+ free_file_completion_logging(td);
+ log_err("fio: failed to alloc write comp data\n");
+ return 1;
+}
+
static void cleanup_io_u(struct thread_data *td)
{
struct io_u *io_u;
io_u_qexit(&td->io_u_freelist);
io_u_qexit(&td->io_u_all);
- if (td->last_write_comp)
- sfree(td->last_write_comp);
+ free_file_completion_logging(td);
}
static int init_io_u(struct thread_data *td)
p += max_bs;
}
- if (td->o.verify != VERIFY_NONE) {
- td->last_write_comp = scalloc(max_units, sizeof(uint64_t));
- if (!td->last_write_comp) {
- log_err("fio: failed to alloc write comp data\n");
- return 1;
- }
- }
+ if (init_file_completion_logging(td, max_units))
+ return 1;
return 0;
}
if (is_backend) {
void *data;
- int ver;
ret = fio_server_get_verify_state(td->o.name,
- td->thread_number - 1, &data, &ver);
+ td->thread_number - 1, &data);
if (!ret)
- verify_convert_assign_state(td, data, ver);
+ verify_assign_state(td, data);
} else
ret = verify_load_state(td, "local");
return NULL;
}
-int fio_client_add_ini_file(void *cookie, const char *ini_file, int remote)
+int fio_client_add_ini_file(void *cookie, const char *ini_file, bool remote)
{
struct fio_client *client = cookie;
struct client_file *cf;
}
int fio_client_send_ini(struct fio_client *client, const char *filename,
- int remote)
+ bool remote)
{
int ret;
struct client_file {
char *file;
- int remote;
+ bool remote;
};
struct fio_client {
extern int fio_start_client(struct fio_client *);
extern int fio_start_all_clients(void);
extern int fio_clients_send_ini(const char *);
-extern int fio_client_send_ini(struct fio_client *, const char *, int);
+extern int fio_client_send_ini(struct fio_client *, const char *, bool);
extern int fio_handle_clients(struct client_ops *);
extern int fio_client_add(struct client_ops *, const char *, void **);
extern struct fio_client *fio_client_add_explicit(struct client_ops *, const char *, int, int);
extern void fio_client_add_cmd_option(void *, const char *);
-extern int fio_client_add_ini_file(void *, const char *, int);
+extern int fio_client_add_ini_file(void *, const char *, bool);
extern int fio_client_terminate(struct fio_client *);
extern void fio_clients_terminate(void);
extern struct fio_client *fio_get_client(struct fio_client *);
struct rbd_options {
void *pad;
+ char *cluster_name;
char *rbd_name;
char *pool_name;
char *client_name;
};
static struct fio_option options[] = {
+ {
+ .name = "clustername",
+ .lname = "ceph cluster name",
+ .type = FIO_OPT_STR_STORE,
+ .help = "Cluster name for ceph",
+ .off1 = offsetof(struct rbd_options, cluster_name),
+ .category = FIO_OPT_C_ENGINE,
+ .group = FIO_OPT_G_RBD,
+ },
{
.name = "rbdname",
.lname = "rbd engine rbdname",
struct rbd_options *o = td->eo;
int r;
- r = rados_create(&rbd->cluster, o->client_name);
+ if (o->cluster_name) {
+ char *client_name = NULL;
+
+		/*
+		 * If a cluster name is specified, rados_create2()
+		 * will not assume the 'client.' prefix; the name is
+		 * taken as a full type.id string.
+		 */
+ if (!index(o->client_name, '.')) {
+ client_name = calloc(1, strlen("client.") +
+ strlen(o->client_name) + 1);
+ strcat(client_name, "client.");
+ o->client_name = strcat(client_name, o->client_name);
+ }
+ r = rados_create2(&rbd->cluster, o->cluster_name,
+ o->client_name, 0);
+ } else
+ r = rados_create(&rbd->cluster, o->client_name);
+
if (r < 0) {
log_err("rados_create failed.\n");
goto failed_early;
rd->pd = ibv_alloc_pd(rd->cm_id->verbs);
if (rd->pd == NULL) {
- log_err("fio: ibv_alloc_pd fail\n");
+ log_err("fio: ibv_alloc_pd fail: %m\n");
return 1;
}
else
rd->channel = ibv_create_comp_channel(rd->cm_id->verbs);
if (rd->channel == NULL) {
- log_err("fio: ibv_create_comp_channel fail\n");
+ log_err("fio: ibv_create_comp_channel fail: %m\n");
goto err1;
}
rd->cq = ibv_create_cq(rd->cm_id->verbs,
qp_depth, rd, rd->channel, 0);
if (rd->cq == NULL) {
- log_err("fio: ibv_create_cq failed\n");
+ log_err("fio: ibv_create_cq failed: %m\n");
goto err2;
}
if (ibv_req_notify_cq(rd->cq, 0) != 0) {
- log_err("fio: ibv_create_cq failed\n");
+ log_err("fio: ibv_req_notify_cq failed: %m\n");
goto err3;
}
if (rd->is_client == 0) {
if (rdma_create_qp(rd->child_cm_id, rd->pd, &init_attr) != 0) {
- log_err("fio: rdma_create_qp failed\n");
+ log_err("fio: rdma_create_qp failed: %m\n");
goto err3;
}
rd->qp = rd->child_cm_id->qp;
} else {
if (rdma_create_qp(rd->cm_id, rd->pd, &init_attr) != 0) {
- log_err("fio: rdma_create_qp failed\n");
+ log_err("fio: rdma_create_qp failed: %m\n");
goto err3;
}
rd->qp = rd->cm_id->qp;
rd->recv_mr = ibv_reg_mr(rd->pd, &rd->recv_buf, sizeof(rd->recv_buf),
IBV_ACCESS_LOCAL_WRITE);
if (rd->recv_mr == NULL) {
- log_err("fio: recv_buf reg_mr failed\n");
+ log_err("fio: recv_buf reg_mr failed: %m\n");
return 1;
}
rd->send_mr = ibv_reg_mr(rd->pd, &rd->send_buf, sizeof(rd->send_buf),
0);
if (rd->send_mr == NULL) {
- log_err("fio: send_buf reg_mr failed\n");
+ log_err("fio: send_buf reg_mr failed: %m\n");
ibv_dereg_mr(rd->recv_mr);
return 1;
}
}
if (ibv_post_send(rd->qp, &r_io_u_d->sq_wr, &bad_wr) != 0) {
- log_err("fio: ibv_post_send fail\n");
+ log_err("fio: ibv_post_send fail: %m\n");
return -1;
}
r_io_u_d = io_us[i]->engine_data;
if (ibv_post_recv(rd->qp, &r_io_u_d->rq_wr, &bad_wr) !=
0) {
- log_err("fio: ibv_post_recv fail\n");
+ log_err("fio: ibv_post_recv fail: %m\n");
return 1;
}
}
|| (rd->rdma_protocol == FIO_RDMA_MEM_WRITE)) {
/* re-post the rq_wr */
if (ibv_post_recv(rd->qp, &rd->rq_wr, &bad_wr) != 0) {
- log_err("fio: ibv_post_recv fail\n");
+ log_err("fio: ibv_post_recv fail: %m\n");
return 1;
}
conn_param.retry_count = 10;
if (rdma_connect(rd->cm_id, &conn_param) != 0) {
- log_err("fio: rdma_connect fail\n");
+ log_err("fio: rdma_connect fail: %m\n");
return 1;
}
rd->send_buf.nr = htonl(td->o.iodepth);
if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) {
- log_err("fio: ibv_post_send fail");
+ log_err("fio: ibv_post_send fail: %m");
return 1;
}
conn_param.initiator_depth = 1;
if (rdma_accept(rd->child_cm_id, &conn_param) != 0) {
- log_err("fio: rdma_accept\n");
+ log_err("fio: rdma_accept: %m\n");
return 1;
}
ret = rdma_poll_wait(td, IBV_WC_RECV) < 0;
if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) {
- log_err("fio: ibv_post_send fail");
+ log_err("fio: ibv_post_send fail: %m");
return 1;
}
|| (rd->rdma_protocol ==
FIO_RDMA_MEM_READ))) {
if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) {
- log_err("fio: ibv_post_send fail");
+ log_err("fio: ibv_post_send fail: %m");
return 1;
}
/* rdma_listen */
if (rdma_bind_addr(rd->cm_id, (struct sockaddr *)&rd->addr) != 0) {
- log_err("fio: rdma_bind_addr fail\n");
+ log_err("fio: rdma_bind_addr fail: %m\n");
return 1;
}
if (rdma_listen(rd->cm_id, 3) != 0) {
- log_err("fio: rdma_listen fail\n");
+ log_err("fio: rdma_listen fail: %m\n");
return 1;
}
/* post recv buf */
if (ibv_post_recv(rd->qp, &rd->rq_wr, &bad_wr) != 0) {
- log_err("fio: ibv_post_recv fail\n");
+ log_err("fio: ibv_post_recv fail: %m\n");
return 1;
}
rd->cm_channel = rdma_create_event_channel();
if (!rd->cm_channel) {
- log_err("fio: rdma_create_event_channel fail\n");
+ log_err("fio: rdma_create_event_channel fail: %m\n");
return 1;
}
ret = rdma_create_id(rd->cm_channel, &rd->cm_id, rd, RDMA_PS_TCP);
if (ret) {
- log_err("fio: rdma_create_id fail\n");
+ log_err("fio: rdma_create_id fail: %m\n");
return 1;
}
IBV_ACCESS_REMOTE_READ |
IBV_ACCESS_REMOTE_WRITE);
if (io_u->mr == NULL) {
- log_err("fio: ibv_reg_mr io_u failed\n");
+ log_err("fio: ibv_reg_mr io_u failed: %m\n");
return 1;
}
#ifdef CONFIG_PWRITEV
register_ioengine(&ioengine_pvrw);
#endif
+#ifdef CONFIG_PWRITEV2
+ register_ioengine(&ioengine_pvrw2);
+#endif
}
static void fio_exit fio_syncio_unregister(void)
#ifdef CONFIG_PWRITEV
unregister_ioengine(&ioengine_pvrw);
#endif
+#ifdef CONFIG_PWRITEV2
+ unregister_ioengine(&ioengine_pvrw2);
+#endif
}
uint64_t first_write;
uint64_t last_write;
+ /*
+ * Tracks the last iodepth number of completed writes, if data
+ * verification is enabled
+ */
+ uint64_t *last_write_comp;
+ unsigned int last_write_idx;
+
/*
* For use by the io engine
*/
*/
total_size = 0;
for_each_file(td, f, i) {
+ f->fileno = i;
if (f->real_file_size == -1ULL)
total_size = -1ULL;
else
IO that completes. When writing to the disk log, it can quickly grow to a
very large size. Setting this option makes fio average each log entry
over the specified period of time, reducing the resolution of the log. See
-\fBlog_max\fR as well. Defaults to 0, logging all entries.
+\fBlog_max_value\fR as well. Defaults to 0, logging all entries.
.TP
-.BI log_max \fR=\fPbool
+.BI log_max_value \fR=\fPbool
If \fBlog_avg_msec\fR is set, fio logs the average over that window. If you
instead want to log the maximum value, set this option to 1. Defaults to
0, meaning that averaged values are logged.
.BI 1:
allocate space immediately inside defragment event, and free right after event
.RE
+.TP
+.BI (rbd)clustername \fR=\fPstr
+Specifies the name of the Ceph cluster.
.TP
.BI (rbd)rbdname \fR=\fPstr
Specifies the name of the RBD.
Specifies the name of the Ceph pool containing the RBD.
.TP
.BI (rbd)clientname \fR=\fPstr
-Specifies the username (without the 'client.' prefix) used to access the Ceph cluster.
+Specifies the username (without the 'client.' prefix) used to access the Ceph
+cluster. If the clustername is specified, the clientname shall be the full
+type.id string. If no type. prefix is given, fio will add 'client.' by default.
.TP
.BI (mtd)skipbad \fR=\fPbool
Skip operations against known bad blocks.
.TP
.B cpu
CPU usage statistics. Includes user and system time, number of context switches
-this thread went through and number of major and minor page faults.
+this thread went through and number of major and minor page faults. The CPU
+utilization numbers are averages for the jobs in that reporting group, while
+the context and fault counters are summed.
.TP
.B IO depths
Distribution of I/O depths. Each depth includes everything less than (or equal)
uint64_t stat_io_blocks[DDIR_RWDIR_CNT];
struct timeval iops_sample_time;
- /*
- * Tracks the last iodepth number of completed writes, if data
- * verification is enabled
- */
- uint64_t *last_write_comp;
- unsigned int last_write_idx;
-
volatile int update_rusage;
struct fio_mutex *rusage_sem;
struct rusage ru_start;
free(gco);
}
- ret = fio_client_send_ini(gc->client, ge->job_file, 0);
+ ret = fio_client_send_ini(gc->client, ge->job_file, false);
if (!ret)
return 0;
#error Define GOLDEN_RATIO_PRIME for your wordsize.
#endif
-#define GR_PRIME_64 0x9e37fffffffc0001ULL
+/*
+ * The above primes are actively bad for hashing, since they are
+ * too sparse. The 32-bit one is mostly ok, the 64-bit one causes
+ * real problems. Besides, the "prime" part is pointless for the
+ * multiplicative hash.
+ *
+ * Although a random odd number will do, it turns out that the golden
+ * ratio phi = (sqrt(5)-1)/2, or its negative, has particularly nice
+ * properties.
+ *
+ * These are the negative, (1 - phi) = (phi^2) = (3 - sqrt(5))/2.
+ * (See Knuth vol 3, section 6.4, exercise 9.)
+ */
+#define GOLDEN_RATIO_32 0x61C88647
+#define GOLDEN_RATIO_64 0x61C8864680B583EBull
static inline unsigned long __hash_long(unsigned long val)
{
unsigned long hash = val;
#if BITS_PER_LONG == 64
+ hash *= GOLDEN_RATIO_64;
+#else
/* Sigh, gcc can't optimise this alone like it does for 32 bits. */
unsigned long n = hash;
n <<= 18;
hash += n;
n <<= 2;
hash += n;
-#else
- /* On some cpus multiply is faster, on others gcc will do shifts */
- hash *= GOLDEN_RATIO_PRIME;
#endif
return hash;
static inline uint64_t __hash_u64(uint64_t val)
{
- return val * GR_PRIME_64;
+ return val * GOLDEN_RATIO_64;
}
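+
+/*
+ * Illustrative use (not part of this change): to reduce a 64-bit key
+ * to an n-bit bucket index, keep the top n bits of the product, e.g.
+ *
+ *	bucket = __hash_u64(key) >> (64 - n);
+ *
+ * since the multiplication mixes the key into the high bits.
+ */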
static inline unsigned long hash_ptr(void *ptr, unsigned int bits)
* Bob Jenkins jhash
*/
-#define JHASH_INITVAL GOLDEN_RATIO_PRIME
+#define JHASH_INITVAL GOLDEN_RATIO_32
static inline uint32_t rol32(uint32_t word, uint32_t shift)
{
!strncmp(argv[optind], "-", 1))
break;
- if (fio_client_add_ini_file(cur_client, argv[optind], 0))
+ if (fio_client_add_ini_file(cur_client, argv[optind], false))
break;
optind++;
}
break;
case 'R':
did_arg = 1;
- if (fio_client_add_ini_file(cur_client, optarg, 1)) {
+ if (fio_client_add_ini_file(cur_client, optarg, true)) {
do_exit++;
exit_val = 1;
}
}
}
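+/*
+ * Record the per-file first/last write offsets and, when completion
+ * logging is enabled, store this write's offset in the iodepth-sized
+ * ring of most recently completed writes.
+ */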
+static void file_log_write_comp(const struct thread_data *td, struct fio_file *f,
+ uint64_t offset, unsigned int bytes)
+{
+ int idx;
+
+ if (!f)
+ return;
+
+ if (f->first_write == -1ULL || offset < f->first_write)
+ f->first_write = offset;
+ if (f->last_write == -1ULL || ((offset + bytes) > f->last_write))
+ f->last_write = offset + bytes;
+
+ if (!f->last_write_comp)
+ return;
+
+ idx = f->last_write_idx++;
+ f->last_write_comp[idx] = offset;
+ if (f->last_write_idx == td->o.iodepth)
+ f->last_write_idx = 0;
+}
+
static void io_completed(struct thread_data *td, struct io_u **io_u_ptr,
struct io_completion_data *icd)
{
if (!(io_u->flags & IO_U_F_VER_LIST))
td->this_io_bytes[ddir] += bytes;
- if (ddir == DDIR_WRITE) {
- if (f) {
- if (f->first_write == -1ULL ||
- io_u->offset < f->first_write)
- f->first_write = io_u->offset;
- if (f->last_write == -1ULL ||
- ((io_u->offset + bytes) > f->last_write))
- f->last_write = io_u->offset + bytes;
- }
- if (td->last_write_comp) {
- int idx = td->last_write_idx++;
-
- td->last_write_comp[idx] = io_u->offset;
- if (td->last_write_idx == td->o.iodepth)
- td->last_write_idx = 0;
- }
- }
+ if (ddir == DDIR_WRITE)
+ file_log_write_comp(td, f, io_u->offset, bytes);
if (ramp_time_over(td) && (td->runstate == TD_RUNNING ||
td->runstate == TD_VERIFYING))
l = calloc(1, sizeof(*l));
l->nr_samples = 0;
- l->max_samples = 1024;
+ l->max_samples = DEF_LOG_ENTRIES;
l->log_type = p->log_type;
l->log_offset = p->log_offset;
l->log_gz = p->log_gz;
struct iolog_flush_data {
struct workqueue_work work;
- pthread_mutex_t lock;
- pthread_cond_t cv;
- int wait;
- volatile int done;
- volatile int refs;
struct io_log *log;
void *samples;
uint64_t nr_samples;
#ifdef CONFIG_ZLIB
-static void drop_data_unlock(struct iolog_flush_data *data)
+static int gz_work(struct iolog_flush_data *data)
{
- int refs;
-
- refs = --data->refs;
- pthread_mutex_unlock(&data->lock);
-
- if (!refs) {
- free(data);
- pthread_mutex_destroy(&data->lock);
- pthread_cond_destroy(&data->cv);
- }
-}
-
-/*
- * Invoked from our compress helper thread, when logging would have exceeded
- * the specified memory limitation. Compresses the previously stored
- * entries.
- */
-static int gz_work(struct submit_worker *sw, struct workqueue_work *work)
-{
- struct iolog_flush_data *data;
struct iolog_compress *c;
struct flist_head list;
unsigned int seq;
INIT_FLIST_HEAD(&list);
- data = container_of(work, struct iolog_flush_data, work);
-
stream.zalloc = Z_NULL;
stream.zfree = Z_NULL;
stream.opaque = Z_NULL;
ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
if (ret != Z_OK) {
log_err("fio: failed to init gz stream\n");
- return 0;
+ goto err;
}
seq = ++data->log->chunk_seq;
ret = 0;
done:
- if (data->wait) {
- pthread_mutex_lock(&data->lock);
- data->done = 1;
- pthread_cond_signal(&data->cv);
-
- drop_data_unlock(data);
- } else
- free(data);
+ free(data);
return ret;
err:
while (!flist_empty(&list)) {
goto done;
}
+/*
+ * Invoked from our compress helper thread, when logging would have exceeded
+ * the specified memory limitation. Compresses the previously stored
+ * entries.
+ */
+static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
+{
+ return gz_work(container_of(work, struct iolog_flush_data, work));
+}
+
static int gz_init_worker(struct submit_worker *sw)
{
struct thread_data *td = sw->wq->td;
}
static struct workqueue_ops log_compress_wq_ops = {
- .fn = gz_work,
+ .fn = gz_work_async,
.init_worker_fn = gz_init_worker,
.nice = 1,
};
data->nr_samples = log->nr_samples;
log->nr_samples = 0;
- log->max_samples = 128;
+ log->max_samples = DEF_LOG_ENTRIES;
log->log = malloc(log->max_samples * log_entry_sz(log));
- data->wait = wait;
- if (data->wait) {
- pthread_mutex_init(&data->lock, NULL);
- pthread_cond_init(&data->cv, NULL);
- data->done = 0;
- data->refs = 2;
- }
-
- workqueue_enqueue(&log->td->log_compress_wq, &data->work);
-
- if (wait) {
- pthread_mutex_lock(&data->lock);
- while (!data->done)
- pthread_cond_wait(&data->cv, &data->lock);
-
- drop_data_unlock(data);
- }
+ if (!wait)
+ workqueue_enqueue(&log->td->log_compress_wq, &data->work);
+ else
+ gz_work(data);
return 0;
}
IO_LOG_TYPE_IOPS,
};
+#define DEF_LOG_ENTRIES 1024
+
/*
* Dynamically growing data sample log
*/
fio_gettime(&tv, NULL);
memcpy(&td->epoch, &tv, sizeof(tv));
memcpy(&td->start, &tv, sizeof(tv));
+ memcpy(&td->iops_sample_time, &tv, sizeof(tv));
+ memcpy(&td->bw_sample_time, &tv, sizeof(tv));
lat_target_reset(td);
clear_rusage_stat(td);
.help = "Use preadv/pwritev",
},
#endif
-#ifdef CONFIG_PWRITEV
+#ifdef CONFIG_PWRITEV2
{ .ival = "pvsync2",
.help = "Use preadv2/pwritev2",
},
#include <sys/ioctl.h>
#include <sys/uio.h>
#include <sys/syscall.h>
+#include <sys/sysmacros.h>
#include <sys/vfs.h>
#include <sys/mman.h>
#include <unistd.h>
<Product Id="*"
Codepage="1252" Language="1033"
Manufacturer="fio" Name="fio"
- UpgradeCode="2338A332-5511-43CF-B9BD-5C60496CCFCC" Version="2.8">
+ UpgradeCode="2338A332-5511-43CF-B9BD-5C60496CCFCC" Version="2.9">
<Package
Description="Flexible IO Tester"
InstallerVersion="301" Keywords="Installer,MSI,Database"
} pvt;
static inline const char *option_matches(const char *arg_str,
- const char *opt_name)
+ const char *opt_name, int smatch)
{
while (*arg_str != '\0' && *arg_str != '=') {
if (*arg_str++ != *opt_name++)
return NULL;
}
- if (*opt_name)
+ if (*opt_name && !smatch)
return NULL;
return arg_str;
}
for (lo = longopts; lo->name; lo++) {
- if ((opt_end = option_matches(carg+2, lo->name)))
+ opt_end = option_matches(carg+2, lo->name, 0);
+ if (opt_end)
break;
}
- if (!opt_end)
- return '?';
+ /*
+ * The GNU getopt_long_only() apparently allows a short match,
+ * if it's unique and if we don't have a full match. Let's
+ * do the same here, search and see if there is one (and only
+ * one) short match.
+ */
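+	/*
+	 * E.g. (hypothetical option names): "--verb" short-matches a
+	 * lone "verbose" option, while "--ver" is rejected as ambiguous
+	 * if both "verbose" and "version" exist.
+	 */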
+ if (!opt_end) {
+ const struct option *lo_match = NULL;
+
+ for (lo = longopts; lo->name; lo++) {
+ const char *ret;
+
+ ret = option_matches(carg+2, lo->name, 1);
+ if (!ret)
+ continue;
+ if (!opt_end) {
+ opt_end = ret;
+ lo_match = lo;
+ } else {
+ opt_end = NULL;
+ break;
+ }
+ }
+ if (!opt_end)
+ return '?';
+ lo = lo_match;
+ }
if (longindex)
*longindex = lo-longopts;
#include <errno.h>
#include <features.h>
#include <inttypes.h>
+#include <sys/sysmacros.h>
#ifndef PROGRAM_NAME
# error "You must define PROGRAM_NAME before including this header"
}
int fio_server_get_verify_state(const char *name, int threadnumber,
- void **datap, int *version)
+ void **datap)
{
struct thread_io_list *s;
struct cmd_sendfile out;
* the header, and the thread_io_list checksum
*/
s = rep->data + sizeof(struct verify_state_hdr);
- if (verify_state_hdr(rep->data, s, version)) {
+ if (verify_state_hdr(rep->data, s)) {
ret = EILSEQ;
goto fail;
}
extern void fio_server_send_gs(struct group_run_stats *);
extern void fio_server_send_du(void);
extern void fio_server_send_job_options(struct flist_head *, unsigned int);
-extern int fio_server_get_verify_state(const char *, int, void **, int *);
+extern int fio_server_get_verify_state(const char *, int, void **);
extern struct fio_net_cmd *fio_net_recv_cmd(int sk, bool wait);
unsigned long long bw;
ts = &threadstats[i];
+ if (ts->groupid == -1)
+ continue;
rs = &runstats[ts->groupid];
rs->kb_base = ts->kb_base;
rs->unit_base = ts->unit_base;
--- /dev/null
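+/*
+ * t/memlock.c: simple memory load generator. Each worker thread
+ * allocates the given number of megabytes and repeatedly dirties
+ * 512 bytes in every 4k page of the buffer.
+ */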
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <pthread.h>
+
+static struct thread_data {
+ unsigned long mb;
+} td;
+
+static void *worker(void *data)
+{
+ struct thread_data *td = data;
+ unsigned long index;
+ size_t size;
+ char *buf;
+ int i, first = 1;
+
+ size = td->mb * 1024UL * 1024UL;
+	buf = malloc(size);
+	if (!buf) {
+		fprintf(stderr, "malloc of %lu MB failed\n", td->mb);
+		return NULL;
+	}
+
+ for (i = 0; i < 100000; i++) {
+ for (index = 0; index + 4096 < size; index += 4096)
+ memset(&buf[index+512], 0x89, 512);
+ if (first) {
+ printf("loop%d: did %lu MB\n", i+1, size/(1024UL*1024UL));
+ first = 0;
+ }
+ }
+ return NULL;
+}
+
+int main(int argc, char *argv[])
+{
+ unsigned long mb, threads;
+ pthread_t *pthreads;
+ int i;
+
+ if (argc < 3) {
+ printf("%s: <mb per thread> <threads>\n", argv[0]);
+ return 1;
+ }
+
+ mb = strtoul(argv[1], NULL, 10);
+ threads = strtoul(argv[2], NULL, 10);
+
+ pthreads = calloc(threads, sizeof(pthread_t));
+ td.mb = mb;
+
+ for (i = 0; i < threads; i++)
+ pthread_create(&pthreads[i], NULL, worker, &td);
+
+ for (i = 0; i < threads; i++) {
+ void *ret;
+
+ pthread_join(pthreads[i], &ret);
+ }
+ return 0;
+}
--- /dev/null
+/*
+ * Read a file and write the contents to stdout. If a given read takes
+ * longer than 'max_us' time, then we schedule a new thread to handle
+ * the next read. This avoids the coordinated omission problem, where
+ * one request appears to take a long time, but in reality a lot of
+ * requests would have been slow, but we don't notice since new submissions
+ * are not being issued if just 1 is held up.
+ *
+ * One test case:
+ *
+ * $ time (./read-to-pipe-async -f randfile.gz | gzip -dc > outfile; sync)
+ *
+ * This will read randfile.gz and log the latencies of doing so, while
+ * piping the output to gzip to decompress it. Any latencies over max_us
+ * are logged when they happen, and latency buckets are displayed at the
+ * end of the run.
+ *
+ * gcc -Wall -g -O2 -o read-to-pipe-async read-to-pipe-async.c -lpthread
+ *
+ * Copyright (C) 2016 Jens Axboe
+ *
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <inttypes.h>
+#include <string.h>
+#include <pthread.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "../flist.h"
+
+static int bs = 4096;
+static int max_us = 10000;
+static char *file;
+static int separate_writer = 1;
+
+#define PLAT_BITS 8
+#define PLAT_VAL (1 << PLAT_BITS)
+#define PLAT_GROUP_NR 19
+#define PLAT_NR (PLAT_GROUP_NR * PLAT_VAL)
+#define PLAT_LIST_MAX 20
+
+struct stats {
+ unsigned int plat[PLAT_NR];
+ unsigned int nr_samples;
+ unsigned int max;
+ unsigned int min;
+ unsigned int over;
+};
+
+static double plist[PLAT_LIST_MAX] = { 50.0, 75.0, 90.0, 95.0, 99.0, 99.5, 99.9, 99.99, 99.999, 99.9999, };
+
+struct thread_data {
+ int exit;
+ int done;
+ pthread_mutex_t lock;
+ pthread_cond_t cond;
+ pthread_mutex_t done_lock;
+ pthread_cond_t done_cond;
+ pthread_t thread;
+};
+
+struct writer_thread {
+ struct flist_head list;
+ struct flist_head done_list;
+ struct stats s;
+ struct thread_data thread;
+};
+
+struct reader_thread {
+ struct flist_head list;
+ struct flist_head done_list;
+ int started;
+ int busy;
+ int write_seq;
+ struct stats s;
+ struct thread_data thread;
+};
+
+struct work_item {
+ struct flist_head list;
+ void *buf;
+ size_t buf_size;
+ off_t off;
+ int fd;
+ int seq;
+ struct writer_thread *writer;
+ struct reader_thread *reader;
+ pthread_mutex_t lock;
+ pthread_cond_t cond;
+ pthread_t thread;
+};
+
+static struct reader_thread reader_thread;
+static struct writer_thread writer_thread;
+
+uint64_t utime_since(const struct timeval *s, const struct timeval *e)
+{
+ long sec, usec;
+ uint64_t ret;
+
+ sec = e->tv_sec - s->tv_sec;
+ usec = e->tv_usec - s->tv_usec;
+ if (sec > 0 && usec < 0) {
+ sec--;
+ usec += 1000000;
+ }
+
+ if (sec < 0 || (sec == 0 && usec < 0))
+ return 0;
+
+ ret = sec * 1000000ULL + usec;
+
+ return ret;
+}
+
+static struct work_item *find_seq(struct writer_thread *w, unsigned int seq)
+{
+ struct work_item *work;
+ struct flist_head *entry;
+
+ if (flist_empty(&w->list))
+ return NULL;
+
+ flist_for_each(entry, &w->list) {
+ work = flist_entry(entry, struct work_item, list);
+ if (work->seq == seq)
+ return work;
+ }
+
+ return NULL;
+}
+
+static unsigned int plat_val_to_idx(unsigned int val)
+{
+ unsigned int msb, error_bits, base, offset;
+
+ /* Find MSB starting from bit 0 */
+ if (val == 0)
+ msb = 0;
+ else
+ msb = sizeof(val)*8 - __builtin_clz(val) - 1;
+
+ /*
+ * MSB <= (PLAT_BITS-1), cannot be rounded off. Use
+ * all bits of the sample as index
+ */
+ if (msb <= PLAT_BITS)
+ return val;
+
+	/* Compute the number of error bits to discard */
+ error_bits = msb - PLAT_BITS;
+
+ /* Compute the number of buckets before the group */
+ base = (error_bits + 1) << PLAT_BITS;
+
+ /*
+ * Discard the error bits and apply the mask to find the
+ * index for the buckets in the group
+ */
+ offset = (PLAT_VAL - 1) & (val >> error_bits);
+
+ /* Make sure the index does not exceed (array size - 1) */
+ return (base + offset) < (PLAT_NR - 1) ?
+ (base + offset) : (PLAT_NR - 1);
+}
+
+/*
+ * Convert the given index of the bucket array to the value
+ * represented by the bucket
+ */
+static unsigned int plat_idx_to_val(unsigned int idx)
+{
+ unsigned int error_bits, k, base;
+
+ assert(idx < PLAT_NR);
+
+ /* MSB <= (PLAT_BITS-1), cannot be rounded off. Use
+ * all bits of the sample as index */
+ if (idx < (PLAT_VAL << 1))
+ return idx;
+
+ /* Find the group and compute the minimum value of that group */
+ error_bits = (idx >> PLAT_BITS) - 1;
+ base = 1 << (error_bits + PLAT_BITS);
+
+ /* Find its bucket number of the group */
+ k = idx % PLAT_VAL;
+
+ /* Return the mean of the range of the bucket */
+ return base + ((k + 0.5) * (1 << error_bits));
+}
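+
+/*
+ * Worked example (PLAT_BITS == 8): a 3000 usec sample has its most
+ * significant bit at bit 11, so error_bits = 3 and the index becomes
+ * ((3 + 1) << 8) + ((3000 >> 3) & 255) = 1024 + 119 = 1143.
+ * plat_idx_to_val(1143) maps back to 2048 + (119 + 0.5) * 8 = 3004,
+ * the midpoint of the 8 usec wide bucket covering 3000..3007.
+ */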
+
+static void add_lat(struct stats *s, unsigned int us, const char *name)
+{
+ int lat_index = 0;
+
+ if (us > s->max)
+ s->max = us;
+ if (us < s->min)
+ s->min = us;
+
+ if (us > max_us) {
+ fprintf(stderr, "%s latency=%u usec\n", name, us);
+ s->over++;
+ }
+
+ lat_index = plat_val_to_idx(us);
+ __sync_fetch_and_add(&s->plat[lat_index], 1);
+ __sync_fetch_and_add(&s->nr_samples, 1);
+}
+
+static int write_work(struct work_item *work)
+{
+ struct timeval s, e;
+ ssize_t ret;
+
+ gettimeofday(&s, NULL);
+ ret = write(STDOUT_FILENO, work->buf, work->buf_size);
+ gettimeofday(&e, NULL);
+ assert(ret == work->buf_size);
+
+ add_lat(&work->writer->s, utime_since(&s, &e), "write");
+ return work->seq + 1;
+}
+
+static void thread_exiting(struct thread_data *thread)
+{
+ __sync_fetch_and_add(&thread->done, 1);
+ pthread_cond_signal(&thread->done_cond);
+}
+
+static void *writer_fn(void *data)
+{
+ struct writer_thread *wt = data;
+ struct work_item *work;
+ unsigned int seq = 1;
+
+ work = NULL;
+ while (!wt->thread.exit || !flist_empty(&wt->list)) {
+ pthread_mutex_lock(&wt->thread.lock);
+
+ if (work) {
+ flist_add_tail(&work->list, &wt->done_list);
+ work = NULL;
+ }
+
+ work = find_seq(wt, seq);
+ if (work)
+ flist_del_init(&work->list);
+ else
+ pthread_cond_wait(&wt->thread.cond, &wt->thread.lock);
+
+ pthread_mutex_unlock(&wt->thread.lock);
+
+ if (work)
+ seq = write_work(work);
+ }
+
+ thread_exiting(&wt->thread);
+ return NULL;
+}
+
+static void reader_work(struct work_item *work)
+{
+ struct timeval s, e;
+ ssize_t ret;
+ size_t left;
+ void *buf;
+ off_t off;
+
+ gettimeofday(&s, NULL);
+
+ left = work->buf_size;
+ buf = work->buf;
+ off = work->off;
+ while (left) {
+ ret = pread(work->fd, buf, left, off);
+ if (!ret) {
+ fprintf(stderr, "zero read\n");
+ break;
+ } else if (ret < 0) {
+ fprintf(stderr, "errno=%d\n", errno);
+ break;
+ }
+ left -= ret;
+ off += ret;
+ buf += ret;
+ }
+
+ gettimeofday(&e, NULL);
+
+ add_lat(&work->reader->s, utime_since(&s, &e), "read");
+
+ pthread_cond_signal(&work->cond);
+
+ if (separate_writer) {
+ pthread_mutex_lock(&work->writer->thread.lock);
+ flist_add_tail(&work->list, &work->writer->list);
+ pthread_mutex_unlock(&work->writer->thread.lock);
+ pthread_cond_signal(&work->writer->thread.cond);
+ } else {
+ struct reader_thread *rt = work->reader;
+ struct work_item *next = NULL;
+ struct flist_head *entry;
+
+ /*
+ * Write current work if it matches in sequence.
+ */
+ if (work->seq == rt->write_seq)
+ goto write_it;
+
+ pthread_mutex_lock(&rt->thread.lock);
+
+ flist_add_tail(&work->list, &rt->done_list);
+
+ /*
+ * See if the next work item is here, if so, write it
+ */
+ work = NULL;
+ flist_for_each(entry, &rt->done_list) {
+ next = flist_entry(entry, struct work_item, list);
+ if (next->seq == rt->write_seq) {
+ work = next;
+ flist_del(&work->list);
+ break;
+ }
+ }
+
+ pthread_mutex_unlock(&rt->thread.lock);
+
+ if (work) {
+write_it:
+ write_work(work);
+ __sync_fetch_and_add(&rt->write_seq, 1);
+ }
+ }
+}
+
+static void *reader_one_off(void *data)
+{
+ reader_work(data);
+ return NULL;
+}
+
+static void *reader_fn(void *data)
+{
+ struct reader_thread *rt = data;
+ struct work_item *work;
+
+ while (!rt->thread.exit || !flist_empty(&rt->list)) {
+ work = NULL;
+ pthread_mutex_lock(&rt->thread.lock);
+ if (!flist_empty(&rt->list)) {
+ work = flist_first_entry(&rt->list, struct work_item, list);
+ flist_del_init(&work->list);
+ } else
+ pthread_cond_wait(&rt->thread.cond, &rt->thread.lock);
+ pthread_mutex_unlock(&rt->thread.lock);
+
+ if (work) {
+ __sync_fetch_and_add(&rt->busy, 1);
+ reader_work(work);
+ __sync_fetch_and_sub(&rt->busy, 1);
+ }
+ }
+
+ thread_exiting(&rt->thread);
+ return NULL;
+}
+
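+/*
+ * Hand a work item to a reader: start the persistent reader thread on
+ * first use, queue to it when it is idle, otherwise spawn a detached
+ * one-off thread so a single slow read never stalls new submissions.
+ */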
+static void queue_work(struct reader_thread *rt, struct work_item *work)
+{
+ if (!rt->started) {
+ pthread_mutex_lock(&rt->thread.lock);
+ flist_add_tail(&work->list, &rt->list);
+ pthread_mutex_unlock(&rt->thread.lock);
+
+ rt->started = 1;
+ pthread_create(&rt->thread.thread, NULL, reader_fn, rt);
+ } else if (!rt->busy && !pthread_mutex_trylock(&rt->thread.lock)) {
+ flist_add_tail(&work->list, &rt->list);
+ pthread_mutex_unlock(&rt->thread.lock);
+
+ pthread_cond_signal(&rt->thread.cond);
+ } else {
+ int ret = pthread_create(&work->thread, NULL, reader_one_off, work);
+ if (ret)
+ fprintf(stderr, "pthread_create=%d\n", ret);
+ else
+ pthread_detach(work->thread);
+ }
+}
+
+static unsigned int calc_percentiles(unsigned int *io_u_plat, unsigned long nr,
+ unsigned int **output)
+{
+ unsigned long sum = 0;
+ unsigned int len, i, j = 0;
+ unsigned int oval_len = 0;
+ unsigned int *ovals = NULL;
+ int is_last;
+
+ len = 0;
+ while (len < PLAT_LIST_MAX && plist[len] != 0.0)
+ len++;
+
+ if (!len)
+ return 0;
+
+ /*
+ * Calculate bucket values, note down max and min values
+ */
+ is_last = 0;
+ for (i = 0; i < PLAT_NR && !is_last; i++) {
+ sum += io_u_plat[i];
+ while (sum >= (plist[j] / 100.0 * nr)) {
+ assert(plist[j] <= 100.0);
+
+ if (j == oval_len) {
+ oval_len += 100;
+ ovals = realloc(ovals, oval_len * sizeof(unsigned int));
+ }
+
+ ovals[j] = plat_idx_to_val(i);
+ is_last = (j == len - 1);
+ if (is_last)
+ break;
+
+ j++;
+ }
+ }
+
+ *output = ovals;
+ return len;
+}
+
+static void show_latencies(struct stats *s, const char *msg)
+{
+ unsigned int *ovals = NULL;
+ unsigned int len, i;
+
+ len = calc_percentiles(s->plat, s->nr_samples, &ovals);
+ if (len) {
+ fprintf(stderr, "Latency percentiles (usec) (%s)\n", msg);
+ for (i = 0; i < len; i++)
+ fprintf(stderr, "\t%2.4fth: %u\n", plist[i], ovals[i]);
+ }
+
+ if (ovals)
+ free(ovals);
+
+ fprintf(stderr, "\tOver=%u, min=%u, max=%u\n", s->over, s->min, s->max);
+}
+
+static void init_thread(struct thread_data *thread)
+{
+ pthread_cond_init(&thread->cond, NULL);
+ pthread_cond_init(&thread->done_cond, NULL);
+ pthread_mutex_init(&thread->lock, NULL);
+ pthread_mutex_init(&thread->done_lock, NULL);
+ thread->exit = 0;
+}
+
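+/*
+ * Signal a thread to exit and wait for it to mark itself done. If a
+ * callback is supplied (the writer's prune function), run it roughly
+ * once a second while waiting so completed entries keep being freed.
+ */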
+static void exit_thread(struct thread_data *thread,
+ void fn(struct writer_thread *),
+ struct writer_thread *wt)
+{
+ __sync_fetch_and_add(&thread->exit, 1);
+ pthread_cond_signal(&thread->cond);
+
+ while (!thread->done) {
+ pthread_mutex_lock(&thread->done_lock);
+
+ if (fn) {
+ struct timeval tv;
+ struct timespec ts;
+
+ gettimeofday(&tv, NULL);
+ ts.tv_sec = tv.tv_sec + 1;
+ ts.tv_nsec = tv.tv_usec * 1000ULL;
+
+ pthread_cond_timedwait(&thread->done_cond, &thread->done_lock, &ts);
+ fn(wt);
+ } else
+ pthread_cond_wait(&thread->done_cond, &thread->done_lock);
+
+ pthread_mutex_unlock(&thread->done_lock);
+ }
+}
+
+static int usage(char *argv[])
+{
+ fprintf(stderr, "%s: [-b blocksize] [-t max usec] [-w separate writer] -f file\n", argv[0]);
+ return 1;
+}
+
+static int parse_options(int argc, char *argv[])
+{
+ int c;
+
+ while ((c = getopt(argc, argv, "f:b:t:w:")) != -1) {
+ switch (c) {
+ case 'f':
+ file = strdup(optarg);
+ break;
+ case 'b':
+ bs = atoi(optarg);
+ break;
+ case 't':
+ max_us = atoi(optarg);
+ break;
+ case 'w':
+ separate_writer = atoi(optarg);
+ if (!separate_writer)
+ fprintf(stderr, "inline writing is broken\n");
+ break;
+ case '?':
+ default:
+ return usage(argv);
+ }
+ }
+
+ if (!file)
+ return usage(argv);
+
+ return 0;
+}
+
+static void prune_done_entries(struct writer_thread *wt)
+{
+ FLIST_HEAD(list);
+
+ if (flist_empty(&wt->done_list))
+ return;
+
+ if (pthread_mutex_trylock(&wt->thread.lock))
+ return;
+
+ if (!flist_empty(&wt->done_list))
+ flist_splice_init(&wt->done_list, &list);
+ pthread_mutex_unlock(&wt->thread.lock);
+
+ while (!flist_empty(&list)) {
+ struct work_item *work;
+
+ work = flist_first_entry(&list, struct work_item, list);
+ flist_del(&work->list);
+
+ pthread_cond_destroy(&work->cond);
+ pthread_mutex_destroy(&work->lock);
+ free(work->buf);
+ free(work);
+ }
+}
+
+int main(int argc, char *argv[])
+{
+ struct timeval s, re, we;
+ struct reader_thread *rt;
+ struct writer_thread *wt;
+ unsigned long rate;
+ struct stat sb;
+ size_t bytes;
+ off_t off;
+ int fd, seq;
+
+ if (parse_options(argc, argv))
+ return 1;
+
+ fd = open(file, O_RDONLY);
+ if (fd < 0) {
+ perror("open");
+ return 2;
+ }
+
+ if (fstat(fd, &sb) < 0) {
+ perror("stat");
+ return 3;
+ }
+
+ wt = &writer_thread;
+ init_thread(&wt->thread);
+ INIT_FLIST_HEAD(&wt->list);
+ INIT_FLIST_HEAD(&wt->done_list);
+ wt->s.max = 0;
+ wt->s.min = -1U;
+ pthread_create(&wt->thread.thread, NULL, writer_fn, wt);
+
+ rt = &reader_thread;
+ init_thread(&rt->thread);
+ INIT_FLIST_HEAD(&rt->list);
+ INIT_FLIST_HEAD(&rt->done_list);
+ rt->s.max = 0;
+ rt->s.min = -1U;
+ rt->write_seq = 1;
+
+ off = 0;
+ seq = 0;
+ bytes = 0;
+
+ gettimeofday(&s, NULL);
+
+ while (sb.st_size) {
+ struct work_item *work;
+ size_t this_len;
+ struct timespec ts;
+ struct timeval tv;
+
+ prune_done_entries(wt);
+
+ this_len = sb.st_size;
+ if (this_len > bs)
+ this_len = bs;
+
+ work = calloc(1, sizeof(*work));
+ work->buf = malloc(this_len);
+ work->buf_size = this_len;
+ work->off = off;
+ work->fd = fd;
+ work->seq = ++seq;
+ work->writer = wt;
+ work->reader = rt;
+ pthread_cond_init(&work->cond, NULL);
+ pthread_mutex_init(&work->lock, NULL);
+
+ queue_work(rt, work);
+
+ gettimeofday(&tv, NULL);
+ ts.tv_sec = tv.tv_sec;
+ ts.tv_nsec = tv.tv_usec * 1000ULL;
+ ts.tv_nsec += max_us * 1000ULL;
+ if (ts.tv_nsec >= 1000000000ULL) {
+ ts.tv_nsec -= 1000000000ULL;
+ ts.tv_sec++;
+ }
+
+ pthread_mutex_lock(&work->lock);
+ pthread_cond_timedwait(&work->cond, &work->lock, &ts);
+ pthread_mutex_unlock(&work->lock);
+
+ off += this_len;
+ sb.st_size -= this_len;
+ bytes += this_len;
+ }
+
+ exit_thread(&rt->thread, NULL, NULL);
+ gettimeofday(&re, NULL);
+
+ exit_thread(&wt->thread, prune_done_entries, wt);
+ gettimeofday(&we, NULL);
+
+ show_latencies(&rt->s, "READERS");
+ show_latencies(&wt->s, "WRITERS");
+
+ bytes /= 1024;
+ rate = (bytes * 1000UL * 1000UL) / utime_since(&s, &re);
+ fprintf(stderr, "Read rate (KB/sec) : %lu\n", rate);
+ rate = (bytes * 1000UL * 1000UL) / utime_since(&s, &we);
+ fprintf(stderr, "Write rate (KB/sec): %lu\n", rate);
+
+ close(fd);
+ return 0;
+}
{
int i;
- printf("Thread %u, %s\n", no_s, s->name);
- printf("Completions: %llu\n", (unsigned long long) s->no_comps);
- printf("Depth: %llu\n", (unsigned long long) s->depth);
- printf("Number IOs: %llu\n", (unsigned long long) s->numberio);
- printf("Index: %llu\n", (unsigned long long) s->index);
+ printf("Thread:\t\t%u\n", no_s);
+ printf("Name:\t\t%s\n", s->name);
+ printf("Completions:\t%llu\n", (unsigned long long) s->no_comps);
+ printf("Depth:\t\t%llu\n", (unsigned long long) s->depth);
+ printf("Number IOs:\t%llu\n", (unsigned long long) s->numberio);
+ printf("Index:\t\t%llu\n", (unsigned long long) s->index);
printf("Completions:\n");
- for (i = 0; i < s->no_comps; i++)
- printf("\t%llu\n", (unsigned long long) s->offsets[i]);
+ if (!s->no_comps)
+ return;
+ for (i = s->no_comps - 1; i >= 0; i--) {
+ printf("\t(file=%2llu) %llu\n",
+ (unsigned long long) s->comps[i].fileno,
+ (unsigned long long) s->comps[i].offset);
+ }
+}
+
+static void show(struct thread_io_list *s, size_t size)
+{
+ int no_s;
+
+ no_s = 0;
+ do {
+ int i;
+
+ s->no_comps = le64_to_cpu(s->no_comps);
+ s->depth = le32_to_cpu(s->depth);
+ s->nofiles = le32_to_cpu(s->nofiles);
+ s->numberio = le64_to_cpu(s->numberio);
+ s->index = le64_to_cpu(s->index);
+
+ for (i = 0; i < s->no_comps; i++) {
+ s->comps[i].fileno = le64_to_cpu(s->comps[i].fileno);
+ s->comps[i].offset = le64_to_cpu(s->comps[i].offset);
+ }
+
+ show_s(s, no_s);
+ no_s++;
+ size -= __thread_io_list_sz(s->depth, s->nofiles);
+ s = (void *) s + __thread_io_list_sz(s->depth, s->nofiles);
+ } while (size != 0);
}
static void show_verify_state(void *buf, size_t size)
struct verify_state_hdr *hdr = buf;
struct thread_io_list *s;
uint32_t crc;
- int no_s;
hdr->version = le64_to_cpu(hdr->version);
hdr->size = le64_to_cpu(hdr->size);
hdr->crc = le64_to_cpu(hdr->crc);
- printf("Version: %x, Size %u, crc %x\n", (unsigned int) hdr->version,
- (unsigned int) hdr->size,
- (unsigned int) hdr->crc);
+ printf("Version:\t0x%x\n", (unsigned int) hdr->version);
+ printf("Size:\t\t%u\n", (unsigned int) hdr->size);
+ printf("CRC:\t\t0x%x\n", (unsigned int) hdr->crc);
size -= sizeof(*hdr);
if (hdr->size != size) {
return;
}
- if (hdr->version != 0x02) {
- log_err("Can only handle version 2 headers\n");
- return;
- }
-
- no_s = 0;
- do {
- int i;
-
- s->no_comps = le64_to_cpu(s->no_comps);
- s->depth = le64_to_cpu(s->depth);
- s->numberio = le64_to_cpu(s->numberio);
- s->index = le64_to_cpu(s->index);
-
- for (i = 0; i < s->no_comps; i++)
- s->offsets[i] = le64_to_cpu(s->offsets[i]);
-
- show_s(s, no_s);
- no_s++;
- size -= __thread_io_list_sz(s->depth);
- s = (void *) s + __thread_io_list_sz(s->depth);
- } while (size != 0);
+ if (hdr->version == 0x03)
+ show(s, size);
+ else
+ log_err("Unsupported version %d\n", (int) hdr->version);
}
-int main(int argc, char *argv[])
+static int show_file(const char *file)
{
struct stat sb;
void *buf;
int ret, fd;
- debug_init();
-
- if (argc < 2) {
- log_err("Usage: %s <state file>\n", argv[0]);
- return 1;
- }
-
- fd = open(argv[1], O_RDONLY);
+ fd = open(file, O_RDONLY);
if (fd < 0) {
- log_err("open %s: %s\n", argv[1], strerror(errno));
+ log_err("open %s: %s\n", file, strerror(errno));
return 1;
}
free(buf);
return 0;
}
+
+int main(int argc, char *argv[])
+{
+ int i, ret;
+
+ debug_init();
+
+ if (argc < 2) {
+ log_err("Usage: %s <state file>\n", argv[0]);
+ return 1;
+ }
+
+ ret = 0;
+ for (i = 1; i < argc; i++) {
+ ret = show_file(argv[i]);
+ if (ret)
+ break;
+ }
+
+ return ret;
+}
--- /dev/null
+#!/bin/python
+#
+# fiologparser.py
+#
+# This tool lets you parse multiple fio log files and look at interaval
+# statistics even when samples are non-uniform. For instance:
+#
+# fiologparser.py -s *bw*
+#
+# to see per-interval sums for all bandwidth logs or:
+#
+# fiologparser.py -a *clat*
+#
+# to see per-interval average completion latency.
+
+import argparse
+
+def parse_args():
+ parser = argparse.ArgumentParser()
+    parser.add_argument('-i', '--interval', required=False, type=int, default=1000, help='interval of time in msec.')
+ parser.add_argument('-d', '--divisor', required=False, type=int, default=1, help='divide the results by this value.')
+ parser.add_argument('-f', '--full', dest='full', action='store_true', default=False, help='print full output.')
+ parser.add_argument('-a', '--average', dest='average', action='store_true', default=False, help='print the average for each interval.')
+ parser.add_argument('-s', '--sum', dest='sum', action='store_true', default=False, help='print the sum for each interval.')
+ parser.add_argument("FILE", help="collectl log output files to parse", nargs="+")
+ args = parser.parse_args()
+
+ return args
+
+def get_ftime(series):
+ ftime = 0
+ for ts in series:
+ if ftime == 0 or ts.last.end < ftime:
+ ftime = ts.last.end
+ return ftime
+
+def print_full(ctx, series):
+ ftime = get_ftime(series)
+ start = 0
+ end = ctx.interval
+
+ while (start < ftime):
+ end = ftime if ftime < end else end
+ results = [ts.get_value(start, end) for ts in series]
+ print "%s, %s" % (end, ', '.join(["%0.3f" % i for i in results]))
+ start += ctx.interval
+ end += ctx.interval
+
+def print_sums(ctx, series):
+ ftime = get_ftime(series)
+ start = 0
+ end = ctx.interval
+
+ while (start < ftime):
+ end = ftime if ftime < end else end
+ results = [ts.get_value(start, end) for ts in series]
+ print "%s, %0.3f" % (end, sum(results))
+ start += ctx.interval
+ end += ctx.interval
+
+def print_averages(ctx, series):
+ ftime = get_ftime(series)
+ start = 0
+ end = ctx.interval
+
+ while (start < ftime):
+ end = ftime if ftime < end else end
+ results = [ts.get_value(start, end) for ts in series]
+ print "%s, %0.3f" % (end, float(sum(results))/len(results))
+ start += ctx.interval
+ end += ctx.interval
+
+
+def print_default(ctx, series):
+ ftime = get_ftime(series)
+ start = 0
+ end = ctx.interval
+ averages = []
+ weights = []
+
+ while (start < ftime):
+ end = ftime if ftime < end else end
+ results = [ts.get_value(start, end) for ts in series]
+ averages.append(sum(results))
+ weights.append(end-start)
+ start += ctx.interval
+ end += ctx.interval
+
+ total = 0
+ for i in xrange(0, len(averages)):
+ total += averages[i]*weights[i]
+ print '%0.3f' % (total/sum(weights))
+
+class TimeSeries():
+ def __init__(self, ctx, fn):
+ self.ctx = ctx
+ self.last = None
+ self.samples = []
+ self.read_data(fn)
+
+ def read_data(self, fn):
+ f = open(fn, 'r')
+ p_time = 0
+ for line in f:
+ (time, value, foo, bar) = line.rstrip('\r\n').rsplit(', ')
+ self.add_sample(p_time, int(time), int(value))
+ p_time = int(time)
+
+ def add_sample(self, start, end, value):
+        sample = Sample(self.ctx, start, end, value)
+ if not self.last or self.last.end < end:
+ self.last = sample
+ self.samples.append(sample)
+
+ def get_value(self, start, end):
+ value = 0
+ for sample in self.samples:
+ value += sample.get_contribution(start, end)
+ return value
+
+class Sample():
+ def __init__(self, ctx, start, end, value):
+ self.ctx = ctx
+ self.start = start
+ self.end = end
+ self.value = value
+
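+    # Worked example: a sample spanning [500, 1500) with value 100
+    # overlaps the window [1000, 2000) for 500 of the window's 1000
+    # units, so it contributes 100 * 0.5 = 50 (before the divisor).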
+ def get_contribution(self, start, end):
+ # short circuit if not within the bound
+ if (end < self.start or start > self.end):
+ return 0
+
+ sbound = self.start if start < self.start else start
+ ebound = self.end if end > self.end else end
+ ratio = float(ebound-sbound) / (end-start)
+        return self.value*ratio/self.ctx.divisor
+
+
+if __name__ == '__main__':
+ ctx = parse_args()
+ series = []
+ for fn in ctx.FILE:
+ series.append(TimeSeries(ctx, fn))
+ if ctx.sum:
+ print_sums(ctx, series)
+ elif ctx.average:
+ print_averages(ctx, series)
+ elif ctx.full:
+ print_full(ctx, series)
+ else:
+ print_default(ctx, series)
+
/*
* For dumping current write state
*/
-struct thread_io_list {
- uint64_t no_comps;
- uint64_t depth;
- uint64_t numberio;
- uint64_t index;
- struct thread_rand_state rand;
- uint8_t name[64];
- uint64_t offsets[0];
+struct file_comp {
+ uint64_t fileno;
+ uint64_t offset;
};
-struct thread_io_list_v1 {
+struct thread_io_list {
uint64_t no_comps;
- uint64_t depth;
+ uint32_t depth;
+ uint32_t nofiles;
uint64_t numberio;
uint64_t index;
- struct thread_rand32_state rand;
+ struct thread_rand_state rand;
uint8_t name[64];
- uint64_t offsets[0];
+ struct file_comp comps[0];
};
struct all_io_list {
struct thread_io_list state[0];
};
-#define VSTATE_HDR_VERSION_V1 0x01
-#define VSTATE_HDR_VERSION 0x02
+#define VSTATE_HDR_VERSION 0x03
struct verify_state_hdr {
uint64_t version;
extern int verify_load_state(struct thread_data *, const char *);
extern void verify_free_state(struct thread_data *);
extern int verify_state_should_stop(struct thread_data *, struct io_u *);
-extern void verify_convert_assign_state(struct thread_data *, void *, int);
-extern int verify_state_hdr(struct verify_state_hdr *, struct thread_io_list *,
- int *);
+extern void verify_assign_state(struct thread_data *, void *);
+extern int verify_state_hdr(struct verify_state_hdr *, struct thread_io_list *);
-static inline size_t __thread_io_list_sz(uint64_t depth)
+static inline size_t __thread_io_list_sz(uint32_t depth, uint32_t nofiles)
{
- return sizeof(struct thread_io_list) + depth * sizeof(uint64_t);
+ return sizeof(struct thread_io_list) + depth * nofiles * sizeof(struct file_comp);
}
static inline size_t thread_io_list_sz(struct thread_io_list *s)
{
- return __thread_io_list_sz(le64_to_cpu(s->depth));
+ return __thread_io_list_sz(le32_to_cpu(s->depth), le32_to_cpu(s->nofiles));
}
static inline struct thread_io_list *io_list_next(struct thread_io_list *s)
return 0;
}
+static int __fill_file_completions(struct thread_data *td,
+ struct thread_io_list *s,
+ struct fio_file *f, unsigned int *index)
+{
+ unsigned int comps;
+ int i, j;
+
+ if (!f->last_write_comp)
+ return 0;
+
+ if (td->io_blocks[DDIR_WRITE] < td->o.iodepth)
+ comps = td->io_blocks[DDIR_WRITE];
+ else
+ comps = td->o.iodepth;
+
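+	/*
+	 * last_write_comp[] is a ring; last_write_idx points just past
+	 * the most recent entry, so walk backwards and wrap at the start.
+	 */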
+ j = f->last_write_idx - 1;
+ for (i = 0; i < comps; i++) {
+ if (j == -1)
+ j = td->o.iodepth - 1;
+			s->comps[*index].fileno = cpu_to_le64(f->fileno);
+ s->comps[*index].offset = cpu_to_le64(f->last_write_comp[j]);
+ (*index)++;
+ j--;
+ }
+
+ return comps;
+}
+
+static int fill_file_completions(struct thread_data *td,
+ struct thread_io_list *s, unsigned int *index)
+{
+ struct fio_file *f;
+ unsigned int i;
+ int comps = 0;
+
+ for_each_file(td, f, i)
+ comps += __fill_file_completions(td, s, f, index);
+
+ return comps;
+}
+
struct all_io_list *get_all_io_list(int save_mask, size_t *sz)
{
struct all_io_list *rep;
continue;
td->stop_io = 1;
td->flags |= TD_F_VSTATE_SAVED;
- depth += td->o.iodepth;
+ depth += (td->o.iodepth * td->o.nr_files);
nr++;
}
*sz = sizeof(*rep);
*sz += nr * sizeof(struct thread_io_list);
- *sz += depth * sizeof(uint64_t);
+ *sz += depth * sizeof(struct file_comp);
rep = malloc(*sz);
memset(rep, 0, *sz);
next = &rep->state[0];
for_each_td(td, i) {
struct thread_io_list *s = next;
- unsigned int comps;
+ unsigned int comps, index = 0;
if (save_mask != IO_LIST_ALL && (i + 1) != save_mask)
continue;
- if (td->last_write_comp) {
- int j, k;
-
- if (td->io_blocks[DDIR_WRITE] < td->o.iodepth)
- comps = td->io_blocks[DDIR_WRITE];
- else
- comps = td->o.iodepth;
-
- k = td->last_write_idx - 1;
- for (j = 0; j < comps; j++) {
- if (k == -1)
- k = td->o.iodepth - 1;
- s->offsets[j] = cpu_to_le64(td->last_write_comp[k]);
- k--;
- }
- } else
- comps = 0;
+ comps = fill_file_completions(td, s, &index);
s->no_comps = cpu_to_le64((uint64_t) comps);
-	s->depth = cpu_to_le64((uint64_t) td->o.iodepth);
+	s->depth = cpu_to_le32((uint32_t) td->o.iodepth);
+	s->nofiles = cpu_to_le32((uint32_t) td->o.nr_files);
s->numberio = cpu_to_le64((uint64_t) td->io_issues[DDIR_WRITE]);
s->index = cpu_to_le64((uint64_t) i);
if (td->random_state.use64) {
free(td->vstate);
}
-static struct thread_io_list *convert_v1_list(struct thread_io_list_v1 *s)
+void verify_assign_state(struct thread_data *td, void *p)
{
- struct thread_io_list *til;
+ struct thread_io_list *s = p;
int i;
- til = malloc(__thread_io_list_sz(s->no_comps));
- til->no_comps = s->no_comps;
- til->depth = s->depth;
- til->numberio = s->numberio;
- til->index = s->index;
- memcpy(til->name, s->name, sizeof(til->name));
-
- til->rand.use64 = 0;
- for (i = 0; i < 4; i++)
- til->rand.state32.s[i] = s->rand.s[i];
+ s->no_comps = le64_to_cpu(s->no_comps);
+ s->depth = le32_to_cpu(s->depth);
+ s->nofiles = le32_to_cpu(s->nofiles);
+ s->numberio = le64_to_cpu(s->numberio);
+ s->rand.use64 = le64_to_cpu(s->rand.use64);
- for (i = 0; i < s->no_comps; i++)
- til->offsets[i] = s->offsets[i];
-
- return til;
-}
-
-void verify_convert_assign_state(struct thread_data *td, void *p, int version)
-{
- struct thread_io_list *til;
- int i;
-
- if (version == 1) {
- struct thread_io_list_v1 *s = p;
-
- s->no_comps = le64_to_cpu(s->no_comps);
- s->depth = le64_to_cpu(s->depth);
- s->numberio = le64_to_cpu(s->numberio);
- for (i = 0; i < 4; i++)
- s->rand.s[i] = le32_to_cpu(s->rand.s[i]);
- for (i = 0; i < s->no_comps; i++)
- s->offsets[i] = le64_to_cpu(s->offsets[i]);
-
- til = convert_v1_list(s);
- free(s);
+ if (s->rand.use64) {
+ for (i = 0; i < 6; i++)
+ s->rand.state64.s[i] = le64_to_cpu(s->rand.state64.s[i]);
} else {
- struct thread_io_list *s = p;
-
- s->no_comps = le64_to_cpu(s->no_comps);
- s->depth = le64_to_cpu(s->depth);
- s->numberio = le64_to_cpu(s->numberio);
- s->rand.use64 = le64_to_cpu(s->rand.use64);
-
- if (s->rand.use64) {
- for (i = 0; i < 6; i++)
- s->rand.state64.s[i] = le64_to_cpu(s->rand.state64.s[i]);
- } else {
- for (i = 0; i < 4; i++)
- s->rand.state32.s[i] = le32_to_cpu(s->rand.state32.s[i]);
- }
- for (i = 0; i < s->no_comps; i++)
- s->offsets[i] = le64_to_cpu(s->offsets[i]);
+ for (i = 0; i < 4; i++)
+ s->rand.state32.s[i] = le32_to_cpu(s->rand.state32.s[i]);
+ }
- til = p;
+ for (i = 0; i < s->no_comps; i++) {
+ s->comps[i].fileno = le64_to_cpu(s->comps[i].fileno);
+ s->comps[i].offset = le64_to_cpu(s->comps[i].offset);
}
- td->vstate = til;
+ td->vstate = p;
}
-int verify_state_hdr(struct verify_state_hdr *hdr, struct thread_io_list *s,
- int *version)
+int verify_state_hdr(struct verify_state_hdr *hdr, struct thread_io_list *s)
{
uint64_t crc;
hdr->size = le64_to_cpu(hdr->size);
hdr->crc = le64_to_cpu(hdr->crc);
- if (hdr->version != VSTATE_HDR_VERSION &&
- hdr->version != VSTATE_HDR_VERSION_V1)
+ if (hdr->version != VSTATE_HDR_VERSION)
return 1;
crc = fio_crc32c((void *)s, hdr->size);
if (crc != hdr->crc)
return 1;
- *version = hdr->version;
return 0;
}
hdr.size = le64_to_cpu(hdr.size);
hdr.crc = le64_to_cpu(hdr.crc);
- if (hdr.version != VSTATE_HDR_VERSION &&
- hdr.version != VSTATE_HDR_VERSION_V1) {
- log_err("fio: bad version in verify state header\n");
+ if (hdr.version != VSTATE_HDR_VERSION) {
+ log_err("fio: unsupported (%d) version in verify state header\n",
+ (unsigned int) hdr.version);
goto err;
}
close(fd);
- verify_convert_assign_state(td, s, hdr.version);
+ verify_assign_state(td, s);
return 0;
err:
if (s)
int verify_state_should_stop(struct thread_data *td, struct io_u *io_u)
{
struct thread_io_list *s = td->vstate;
+ struct fio_file *f = io_u->file;
int i;
- if (!s)
+ if (!s || !f)
return 0;
/*
* completed or not. If the IO was seen as completed, then
* lets verify it.
*/
- for (i = 0; i < s->no_comps; i++)
- if (io_u->offset == s->offsets[i])
+ for (i = 0; i < s->no_comps; i++) {
+ if (s->comps[i].fileno != f->fileno)
+ continue;
+ if (io_u->offset == s->comps[i].offset)
return 0;
+ }
/*
* Not found, we have to stop