#include "filelock.h"
#include "smalloc.h"
+static int iolog_flush(struct io_log *log);
+
static const char iolog_ver2[] = "fio version 2 iolog";
void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
{
struct io_log *l;
- l = smalloc(sizeof(*l));
- l->nr_samples = 0;
+ l = scalloc(1, sizeof(*l));
+ INIT_FLIST_HEAD(&l->io_logs);
l->log_type = p->log_type;
l->log_offset = p->log_offset;
l->log_gz = p->log_gz;
void free_log(struct io_log *log)
{
- free(log->log);
+ while (!flist_empty(&log->io_logs)) {
+ struct io_logs *cur_log;
+
+ cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
+ flist_del_init(&cur_log->list);
+ free(cur_log->log);
+ }
+
free(log->filename);
sfree(log);
}
struct workqueue_work work;
struct io_log *log;
void *samples;
- uint64_t nr_samples;
+ uint32_t nr_samples;
+ bool free;
};
#define GZ_CHUNK 131072
inflate_gz_chunks(log, f);
- flush_samples(f, log->log, log->nr_samples * log_entry_sz(log));
+ while (!flist_empty(&log->io_logs)) {
+ struct io_logs *cur_log;
+
+ cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
+ flist_del_init(&cur_log->list);
+ flush_samples(f, cur_log->log, cur_log->nr_samples * log_entry_sz(log));
+ }
fclose(f);
clear_file_buffer(buf);
static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
{
if (td->flags & TD_F_COMPRESS_LOG)
- iolog_flush(log, 1);
+ iolog_flush(log);
if (trylock) {
if (fio_trylock_file(log->filename))
INIT_FLIST_HEAD(&list);
+ memset(&stream, 0, sizeof(stream));
stream.zalloc = Z_NULL;
stream.zfree = Z_NULL;
stream.opaque = Z_NULL;
ret = 0;
done:
- free(data);
+ if (data->free)
+ free(data);
return ret;
err:
while (!flist_empty(&list)) {
/*
 * Compress every queued chunk of 'log'. Each chunk's sample buffer is
 * handed to the compressor via an iolog_flush_data item and gz_work()
 * is invoked inline (not through the workqueue), so this does not
 * return until all pending chunks have been compressed and their
 * sample buffers released.
 */
-int iolog_flush(struct io_log *log, int wait)
+static int iolog_flush(struct io_log *log)
{
struct iolog_flush_data *data;
- io_u_quiesce(log->td);
-
data = malloc(sizeof(*data));
if (!data)
return 1;
data->log = log;
+ data->free = false;
- data->samples = log->log;
- data->nr_samples = log->nr_samples;
+ while (!flist_empty(&log->io_logs)) {
+ struct io_logs *cur_log;
- log->nr_samples = 0;
- log->max_samples = DEF_LOG_ENTRIES;
- log->log = malloc(log->max_samples * log_entry_sz(log));
+ cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
+ flist_del_init(&cur_log->list);
+
+ data->samples = cur_log->log;
+ data->nr_samples = cur_log->nr_samples;
+
+ cur_log->nr_samples = 0;
+ cur_log->max_samples = 0;
+ cur_log->log = NULL;
- if (!wait)
- workqueue_enqueue(&log->td->log_compress_wq, &data->work);
- else
gz_work(data);
+ }
+ free(data);
return 0;
}
+int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
+{
+ struct iolog_flush_data *data;
+
+ io_u_quiesce(log->td);
+
+ data = malloc(sizeof(*data));
+ if (!data)
+ return 1;
+
+ data->log = log;
+
+ data->samples = cur_log->log;
+ data->nr_samples = cur_log->nr_samples;
+ data->free = true;
+
+ cur_log->nr_samples = cur_log->max_samples = 0;
+ cur_log->log = NULL;
+
+ workqueue_enqueue(&log->td->log_compress_wq, &data->work);
+ return 0;
+}
#else
/*
 * Stub for builds without zlib (CONFIG_ZLIB unset): on-the-fly log
 * compression is unavailable, so always report failure.
 */
static int iolog_flush(struct io_log *log)
{
	return 1;
}
+
/*
 * Stub for builds without zlib: flushing the current chunk through the
 * compressor cannot work, so always report failure.
 */
int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	return 1;
}
#endif
+struct io_logs *iolog_cur_log(struct io_log *log)
+{
+ if (flist_empty(&log->io_logs))
+ return NULL;
+
+ return flist_last_entry(&log->io_logs, struct io_logs, list);
+}
+
+uint64_t iolog_nr_samples(struct io_log *iolog)
+{
+ struct flist_head *entry;
+ uint64_t ret = 0;
+
+ flist_for_each(entry, &iolog->io_logs) {
+ struct io_logs *cur_log;
+
+ cur_log = flist_entry(entry, struct io_logs, list);
+ ret += cur_log->nr_samples;
+ }
+
+ return ret;
+}
+
static int __write_log(struct thread_data *td, struct io_log *log, int try)
{
if (log)
};
#define DEF_LOG_ENTRIES 1024
+#define MAX_LOG_ENTRIES (1024 * DEF_LOG_ENTRIES)
+
+#define LOG_QUIESCE_SZ (64 * 1024 * 1024)
+
/*
 * One chunk of log samples. An io_log keeps a list of these and grows
 * by appending new, progressively larger chunks rather than realloc'ing
 * a single buffer.
 */
struct io_logs {
	struct flist_head list;		/* linked into io_log->io_logs */
	uint64_t nr_samples;		/* samples currently stored in 'log' */
	uint64_t max_samples;		/* capacity of 'log', in samples */
	void *log;			/* the sample array itself */
};
/*
* Dynamically growing data sample log
/*
* Entries already logged
*/
- uint64_t nr_samples;
- uint64_t max_samples;
- void *log;
+ struct flist_head io_logs;
+ uint32_t cur_log_max;
unsigned int log_ddir_mask;
/*
* If we fail extending the log, stop collecting more entries.
*/
- unsigned int disabled;
+ bool disabled;
/*
* Log offsets
return (struct io_sample *) ((char *) samples + sample_offset);
}
+struct io_logs *iolog_cur_log(struct io_log *);
+uint64_t iolog_nr_samples(struct io_log *);
+
/* Fetch sample number 'sample' from chunk 'cur_log' of 'iolog'. */
static inline struct io_sample *get_sample(struct io_log *iolog,
					   struct io_logs *cur_log,
					   uint64_t sample)
{
	return __get_sample(cur_log->log, iolog->log_offset, sample);
}
enum {
extern void free_log(struct io_log *);
extern void fio_writeout_logs(bool);
extern void td_writeout_logs(struct thread_data *, bool);
-extern int iolog_flush(struct io_log *, int);
+extern int iolog_cur_flush(struct io_log *, struct io_logs *);
static inline void init_ipo(struct io_piece *ipo)
{
}
}
-static int fio_append_iolog_gz(struct sk_entry *first, struct io_log *log)
-{
- int ret = 0;
#ifdef CONFIG_ZLIB
+static int __fio_append_iolog_gz(struct sk_entry *first, struct io_log *log,
+ struct io_logs *cur_log, z_stream *stream)
+{
struct sk_entry *entry;
- z_stream stream;
void *out_pdu;
+ int ret;
- /*
- * Dirty - since the log is potentially huge, compress it into
- * FIO_SERVER_MAX_FRAGMENT_PDU chunks and let the receiving
- * side defragment it.
- */
- out_pdu = malloc(FIO_SERVER_MAX_FRAGMENT_PDU);
-
- stream.zalloc = Z_NULL;
- stream.zfree = Z_NULL;
- stream.opaque = Z_NULL;
-
- if (deflateInit(&stream, Z_DEFAULT_COMPRESSION) != Z_OK) {
- ret = 1;
- goto err;
- }
-
- stream.next_in = (void *) log->log;
- stream.avail_in = log->nr_samples * log_entry_sz(log);
+ stream->next_in = (void *) cur_log->log;
+ stream->avail_in = cur_log->nr_samples * log_entry_sz(log);
do {
unsigned int this_len;
- stream.avail_out = FIO_SERVER_MAX_FRAGMENT_PDU;
- stream.next_out = out_pdu;
- ret = deflate(&stream, Z_FINISH);
+ /*
+ * Dirty - since the log is potentially huge, compress it into
+ * FIO_SERVER_MAX_FRAGMENT_PDU chunks and let the receiving
+ * side defragment it.
+ */
+ out_pdu = malloc(FIO_SERVER_MAX_FRAGMENT_PDU);
+
+ stream->avail_out = FIO_SERVER_MAX_FRAGMENT_PDU;
+ stream->next_out = out_pdu;
+ ret = deflate(stream, Z_FINISH);
/* may be Z_OK, or Z_STREAM_END */
- if (ret < 0)
- goto err_zlib;
+ if (ret < 0) {
+ free(out_pdu);
+ return 1;
+ }
- this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out;
+ this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream->avail_out;
entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, out_pdu, this_len,
- NULL, SK_F_VEC | SK_F_INLINE | SK_F_FREE);
- out_pdu = NULL;
+ NULL, SK_F_VEC | SK_F_INLINE | SK_F_FREE);
flist_add_tail(&entry->list, &first->next);
- } while (stream.avail_in);
+ } while (stream->avail_in);
+
+ return 0;
+}
+
+static int fio_append_iolog_gz(struct sk_entry *first, struct io_log *log)
+{
+ int ret = 0;
+ z_stream stream;
+
+ memset(&stream, 0, sizeof(stream));
+ stream.zalloc = Z_NULL;
+ stream.zfree = Z_NULL;
+ stream.opaque = Z_NULL;
+
+ if (deflateInit(&stream, Z_DEFAULT_COMPRESSION) != Z_OK)
+ return 1;
+
+ while (!flist_empty(&log->io_logs)) {
+ struct io_logs *cur_log;
+
+ cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
+ flist_del_init(&cur_log->list);
+
+ ret = __fio_append_iolog_gz(first, log, cur_log, &stream);
+ if (ret)
+ break;
+ }
-err_zlib:
deflateEnd(&stream);
-err:
- free(out_pdu);
-#endif
return ret;
}
+#else
/*
 * Stub for builds without zlib: compressed iolog transfer is
 * unavailable, so always report failure.
 */
static int fio_append_iolog_gz(struct sk_entry *first, struct io_log *log)
{
	return 1;
}
+#endif
static int fio_append_gz_chunks(struct sk_entry *first, struct io_log *log)
{
static int fio_append_text_log(struct sk_entry *first, struct io_log *log)
{
struct sk_entry *entry;
- size_t size = log->nr_samples * log_entry_sz(log);
- entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, log->log, size,
- NULL, SK_F_VEC | SK_F_INLINE);
- flist_add_tail(&entry->list, &first->next);
+ while (!flist_empty(&log->io_logs)) {
+ struct io_logs *cur_log;
+ size_t size;
+
+ cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
+ flist_del_init(&cur_log->list);
+
+ size = cur_log->nr_samples * log_entry_sz(log);
+
+ entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, cur_log->log, size,
+ NULL, SK_F_VEC | SK_F_INLINE);
+ flist_add_tail(&entry->list, &first->next);
+ }
+
return 0;
}
{
struct cmd_iolog_pdu pdu;
struct sk_entry *first;
- int i, ret = 0;
+ struct flist_head *entry;
+ int ret = 0;
- pdu.nr_samples = cpu_to_le64(log->nr_samples);
+ pdu.nr_samples = cpu_to_le64(iolog_nr_samples(log));
pdu.thread_number = cpu_to_le32(td->thread_number);
pdu.log_type = cpu_to_le32(log->log_type);
* We can't do this for a pre-compressed log, but for that case,
* log->nr_samples is zero anyway.
*/
- for (i = 0; i < log->nr_samples; i++) {
- struct io_sample *s = get_sample(log, i);
+ flist_for_each(entry, &log->io_logs) {
+ struct io_logs *cur_log;
+ int i;
- s->time = cpu_to_le64(s->time);
- s->val = cpu_to_le64(s->val);
- s->__ddir = cpu_to_le32(s->__ddir);
- s->bs = cpu_to_le32(s->bs);
+ cur_log = flist_entry(entry, struct io_logs, list);
- if (log->log_offset) {
- struct io_sample_offset *so = (void *) s;
+ for (i = 0; i < cur_log->nr_samples; i++) {
+ struct io_sample *s = get_sample(log, cur_log, i);
- so->offset = cpu_to_le64(so->offset);
+ s->time = cpu_to_le64(s->time);
+ s->val = cpu_to_le64(s->val);
+ s->__ddir = cpu_to_le32(s->__ddir);
+ s->bs = cpu_to_le32(s->bs);
+
+ if (log->log_offset) {
+ struct io_sample_offset *so = (void *) s;
+
+ so->offset = cpu_to_le64(so->offset);
+ }
}
}
is->samples++;
}
+/*
+ * Return a struct io_logs, which is added to the tail of the log
+ * list for 'iolog'.
+ */
+static struct io_logs *get_new_log(struct io_log *iolog)
+{
+ size_t new_size, new_samples;
+ struct io_logs *cur_log;
+
+ /*
+ * Cap the size at MAX_LOG_ENTRIES, so we don't keep doubling
+ * forever
+ */
+ if (!iolog->cur_log_max)
+ new_samples = DEF_LOG_ENTRIES;
+ else {
+ new_samples = iolog->cur_log_max * 2;
+ if (new_samples > MAX_LOG_ENTRIES)
+ new_samples = MAX_LOG_ENTRIES;
+ }
+
+ /*
+ * If the alloc size is sufficiently large, quiesce pending IO before
+ * attempting it. This is to avoid spending a long time in alloc with
+ * IO pending, which will unfairly skew the completion latencies of
+ * inflight IO.
+ */
+ new_size = new_samples * log_entry_sz(iolog);
+ if (new_size >= LOG_QUIESCE_SZ)
+ io_u_quiesce(iolog->td);
+
+ cur_log = malloc(sizeof(*cur_log));
+ if (cur_log) {
+ INIT_FLIST_HEAD(&cur_log->list);
+ cur_log->log = malloc(new_size);
+ if (cur_log->log) {
+ cur_log->nr_samples = 0;
+ cur_log->max_samples = new_samples;
+ flist_add_tail(&cur_log->list, &iolog->io_logs);
+ iolog->cur_log_max = new_samples;
+ return cur_log;
+ }
+ free(cur_log);
+ }
+
+ return NULL;
+}
+
+static struct io_logs *get_cur_log(struct io_log *iolog)
+{
+ struct io_logs *cur_log;
+
+ cur_log = iolog_cur_log(iolog);
+ if (!cur_log) {
+ cur_log = get_new_log(iolog);
+ if (!cur_log)
+ return NULL;
+ }
+
+ if (cur_log->nr_samples < cur_log->max_samples)
+ return cur_log;
+
+ /*
+ * No room for a new sample. If we're compressing on the fly, flush
+ * out the current chunk
+ */
+ if (iolog->log_gz) {
+ if (iolog_cur_flush(iolog, cur_log)) {
+ log_err("fio: failed flushing iolog! Will stop logging.\n");
+ return NULL;
+ }
+ }
+
+ /*
+ * Get a new log array, and add to our list
+ */
+ cur_log = get_new_log(iolog);
+ if (!cur_log) {
+ log_err("fio: failed extending iolog! Will stop logging.\n");
+ return NULL;
+ }
+
+ return cur_log;
+}
+
static void __add_log_sample(struct io_log *iolog, unsigned long val,
enum fio_ddir ddir, unsigned int bs,
unsigned long t, uint64_t offset)
{
- uint64_t nr_samples = iolog->nr_samples;
- struct io_sample *s;
+ struct io_logs *cur_log;
if (iolog->disabled)
return;
-
- if (!iolog->nr_samples)
+ if (flist_empty(&iolog->io_logs))
iolog->avg_last = t;
- if (iolog->nr_samples == iolog->max_samples) {
- size_t new_size, new_samples;
- void *new_log;
+ cur_log = get_cur_log(iolog);
+ if (cur_log) {
+ struct io_sample *s;
- if (!iolog->max_samples)
- new_samples = DEF_LOG_ENTRIES;
- else
- new_samples = iolog->max_samples * 2;
-
- new_size = new_samples * log_entry_sz(iolog);
-
- if (iolog->log_gz && (new_size > iolog->log_gz)) {
- if (!iolog->log) {
- iolog->log = malloc(new_size);
- iolog->max_samples = new_samples;
- } else if (iolog_flush(iolog, 0)) {
- log_err("fio: failed flushing iolog! Will stop logging.\n");
- iolog->disabled = 1;
- return;
- }
- nr_samples = iolog->nr_samples;
- } else {
- new_log = realloc(iolog->log, new_size);
- if (!new_log) {
- log_err("fio: failed extending iolog! Will stop logging.\n");
- iolog->disabled = 1;
- return;
- }
- iolog->log = new_log;
- iolog->max_samples = new_samples;
- }
- }
+ s = get_sample(iolog, cur_log, cur_log->nr_samples);
- s = get_sample(iolog, nr_samples);
+ s->val = val;
+ s->time = t;
+ io_sample_set_ddir(iolog, s, ddir);
+ s->bs = bs;
- s->val = val;
- s->time = t;
- io_sample_set_ddir(iolog, s, ddir);
- s->bs = bs;
+ if (iolog->log_offset) {
+ struct io_sample_offset *so = (void *) s;
- if (iolog->log_offset) {
- struct io_sample_offset *so = (void *) s;
+ so->offset = offset;
+ }
- so->offset = offset;
+ cur_log->nr_samples++;
+ return;
}
- iolog->nr_samples++;
+ iolog->disabled = true;
}
static inline void reset_io_stat(struct io_stat *ios)