From 7e419452839dee5afa86b0a6c78136c421e1d706 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Thu, 19 May 2016 09:20:16 -0600 Subject: [PATCH 1/1] iolog: switch to list based scheme The previous iolog implementation had one big log, that we continually resized when we ran out of space. The resize involved a realloc() of the data, which could potentially be slow for big logs. This could introduce long latency outliers for workloads, since fio did not quiesce IO before resizing the log. Rewrite the logging code to have a list of logs, and cap each of them at a reasonable size. This means we never have to realloc data, and that we allocate smaller chunks instead. Signed-off-by: Jens Axboe --- iolog.c | 115 +++++++++++++++++++++++++++++++++++-------- iolog.h | 25 +++++++--- server.c | 139 +++++++++++++++++++++++++++++++++------------------- stat.c | 146 ++++++++++++++++++++++++++++++++++++++----------------- 4 files changed, 304 insertions(+), 121 deletions(-) diff --git a/iolog.c b/iolog.c index 71afe86c..abc2dc3e 100644 --- a/iolog.c +++ b/iolog.c @@ -20,6 +20,8 @@ #include "filelock.h" #include "smalloc.h" +static int iolog_flush(struct io_log *log); + static const char iolog_ver2[] = "fio version 2 iolog"; void queue_io_piece(struct thread_data *td, struct io_piece *ipo) @@ -575,8 +577,8 @@ void setup_log(struct io_log **log, struct log_params *p, { struct io_log *l; - l = smalloc(sizeof(*l)); - l->nr_samples = 0; + l = scalloc(1, sizeof(*l)); + INIT_FLIST_HEAD(&l->io_logs); l->log_type = p->log_type; l->log_offset = p->log_offset; l->log_gz = p->log_gz; @@ -628,7 +630,14 @@ static void clear_file_buffer(void *buf) void free_log(struct io_log *log) { - free(log->log); + while (!flist_empty(&log->io_logs)) { + struct io_logs *cur_log; + + cur_log = flist_first_entry(&log->io_logs, struct io_logs, list); + flist_del_init(&cur_log->list); + free(cur_log->log); + } + free(log->filename); sfree(log); } @@ -673,7 +682,8 @@ struct iolog_flush_data { struct workqueue_work 
work; struct io_log *log; void *samples; - uint64_t nr_samples; + uint32_t nr_samples; + bool free; }; #define GZ_CHUNK 131072 @@ -954,7 +964,13 @@ void flush_log(struct io_log *log, int do_append) inflate_gz_chunks(log, f); - flush_samples(f, log->log, log->nr_samples * log_entry_sz(log)); + while (!flist_empty(&log->io_logs)) { + struct io_logs *cur_log; + + cur_log = flist_first_entry(&log->io_logs, struct io_logs, list); + flist_del_init(&cur_log->list); + flush_samples(f, cur_log->log, cur_log->nr_samples * log_entry_sz(log)); + } fclose(f); clear_file_buffer(buf); @@ -963,7 +979,7 @@ void flush_log(struct io_log *log, int do_append) static int finish_log(struct thread_data *td, struct io_log *log, int trylock) { if (td->flags & TD_F_COMPRESS_LOG) - iolog_flush(log, 1); + iolog_flush(log); if (trylock) { if (fio_trylock_file(log->filename)) @@ -1014,6 +1030,7 @@ static int gz_work(struct iolog_flush_data *data) INIT_FLIST_HEAD(&list); + memset(&stream, 0, sizeof(stream)); stream.zalloc = Z_NULL; stream.zfree = Z_NULL; stream.opaque = Z_NULL; @@ -1081,7 +1098,8 @@ static int gz_work(struct iolog_flush_data *data) ret = 0; done: - free(data); + if (data->free) + free(data); return ret; err: while (!flist_empty(&list)) { @@ -1145,39 +1163,71 @@ void iolog_compress_exit(struct thread_data *td) * Queue work item to compress the existing log entries. We reset the * current log to a small size, and reference the existing log in the * data that we queue for compression. Once compression has been done, - * this old log is freed. If called with wait == 1, will not return until - * the log compression has completed. + * this old log is freed. 
This function does not return + * until the log compression has completed, and it flushes all previous + * logs too */ -int iolog_flush(struct io_log *log, int wait) +static int iolog_flush(struct io_log *log) { struct iolog_flush_data *data; - io_u_quiesce(log->td); - data = malloc(sizeof(*data)); if (!data) return 1; data->log = log; + data->free = false; - data->samples = log->log; - data->nr_samples = log->nr_samples; + while (!flist_empty(&log->io_logs)) { + struct io_logs *cur_log; - log->nr_samples = 0; - log->max_samples = DEF_LOG_ENTRIES; - log->log = malloc(log->max_samples * log_entry_sz(log)); + cur_log = flist_first_entry(&log->io_logs, struct io_logs, list); + flist_del_init(&cur_log->list); + + data->samples = cur_log->log; + data->nr_samples = cur_log->nr_samples; + + cur_log->nr_samples = 0; + cur_log->max_samples = 0; + cur_log->log = NULL; - if (!wait) - workqueue_enqueue(&log->td->log_compress_wq, &data->work); - else gz_work(data); + } + free(data); return 0; } +int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log) +{ + struct iolog_flush_data *data; + + io_u_quiesce(log->td); + + data = malloc(sizeof(*data)); + if (!data) + return 1; + + data->log = log; + + data->samples = cur_log->log; + data->nr_samples = cur_log->nr_samples; + data->free = true; + + cur_log->nr_samples = cur_log->max_samples = 0; + cur_log->log = NULL; + + workqueue_enqueue(&log->td->log_compress_wq, &data->work); + return 0; +} #else -int iolog_flush(struct io_log *log, int wait) +static int iolog_flush(struct io_log *log) +{ + return 1; +} + +int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log) { return 1; } @@ -1193,6 +1243,29 @@ void iolog_compress_exit(struct thread_data *td) #endif +struct io_logs *iolog_cur_log(struct io_log *log) +{ + if (flist_empty(&log->io_logs)) + return NULL; + + return flist_last_entry(&log->io_logs, struct io_logs, list); +} + +uint64_t iolog_nr_samples(struct io_log *iolog) +{ + struct flist_head 
*entry; + uint64_t ret = 0; + + flist_for_each(entry, &iolog->io_logs) { + struct io_logs *cur_log; + + cur_log = flist_entry(entry, struct io_logs, list); + ret += cur_log->nr_samples; + } + + return ret; +} + static int __write_log(struct thread_data *td, struct io_log *log, int try) { if (log) diff --git a/iolog.h b/iolog.h index 739a7c8d..2b7813b9 100644 --- a/iolog.h +++ b/iolog.h @@ -42,6 +42,16 @@ enum { }; #define DEF_LOG_ENTRIES 1024 +#define MAX_LOG_ENTRIES (1024 * DEF_LOG_ENTRIES) + +#define LOG_QUIESCE_SZ (64 * 1024 * 1024) + +struct io_logs { + struct flist_head list; + uint64_t nr_samples; + uint64_t max_samples; + void *log; +}; /* * Dynamically growing data sample log @@ -50,9 +60,8 @@ struct io_log { /* * Entries already logged */ - uint64_t nr_samples; - uint64_t max_samples; - void *log; + struct flist_head io_logs; + uint32_t cur_log_max; unsigned int log_ddir_mask; @@ -65,7 +74,7 @@ struct io_log { /* * If we fail extending the log, stop collecting more entries. */ - unsigned int disabled; + bool disabled; /* * Log offsets @@ -128,10 +137,14 @@ static inline struct io_sample *__get_sample(void *samples, int log_offset, return (struct io_sample *) ((char *) samples + sample_offset); } +struct io_logs *iolog_cur_log(struct io_log *); +uint64_t iolog_nr_samples(struct io_log *); + static inline struct io_sample *get_sample(struct io_log *iolog, + struct io_logs *cur_log, uint64_t sample) { - return __get_sample(iolog->log, iolog->log_offset, sample); + return __get_sample(cur_log->log, iolog->log_offset, sample); } enum { @@ -219,7 +232,7 @@ extern void flush_samples(FILE *, void *, uint64_t); extern void free_log(struct io_log *); extern void fio_writeout_logs(bool); extern void td_writeout_logs(struct thread_data *, bool); -extern int iolog_flush(struct io_log *, int); +extern int iolog_cur_flush(struct io_log *, struct io_logs *); static inline void init_ipo(struct io_piece *ipo) { diff --git a/server.c b/server.c index dcb7c2da..d36c5113 
100644 --- a/server.c +++ b/server.c @@ -1652,58 +1652,79 @@ void fio_server_send_du(void) } } -static int fio_append_iolog_gz(struct sk_entry *first, struct io_log *log) -{ - int ret = 0; #ifdef CONFIG_ZLIB +static int __fio_append_iolog_gz(struct sk_entry *first, struct io_log *log, + struct io_logs *cur_log, z_stream *stream) +{ struct sk_entry *entry; - z_stream stream; void *out_pdu; + int ret; - /* - * Dirty - since the log is potentially huge, compress it into - * FIO_SERVER_MAX_FRAGMENT_PDU chunks and let the receiving - * side defragment it. - */ - out_pdu = malloc(FIO_SERVER_MAX_FRAGMENT_PDU); - - stream.zalloc = Z_NULL; - stream.zfree = Z_NULL; - stream.opaque = Z_NULL; - - if (deflateInit(&stream, Z_DEFAULT_COMPRESSION) != Z_OK) { - ret = 1; - goto err; - } - - stream.next_in = (void *) log->log; - stream.avail_in = log->nr_samples * log_entry_sz(log); + stream->next_in = (void *) cur_log->log; + stream->avail_in = cur_log->nr_samples * log_entry_sz(log); do { unsigned int this_len; - stream.avail_out = FIO_SERVER_MAX_FRAGMENT_PDU; - stream.next_out = out_pdu; - ret = deflate(&stream, Z_FINISH); + /* + * Dirty - since the log is potentially huge, compress it into + * FIO_SERVER_MAX_FRAGMENT_PDU chunks and let the receiving + * side defragment it. 
+ */ + out_pdu = malloc(FIO_SERVER_MAX_FRAGMENT_PDU); + + stream->avail_out = FIO_SERVER_MAX_FRAGMENT_PDU; + stream->next_out = out_pdu; + ret = deflate(stream, Z_FINISH); /* may be Z_OK, or Z_STREAM_END */ - if (ret < 0) - goto err_zlib; + if (ret < 0) { + free(out_pdu); + return 1; + } - this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out; + this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream->avail_out; entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, out_pdu, this_len, - NULL, SK_F_VEC | SK_F_INLINE | SK_F_FREE); - out_pdu = NULL; + NULL, SK_F_VEC | SK_F_INLINE | SK_F_FREE); flist_add_tail(&entry->list, &first->next); - } while (stream.avail_in); + } while (stream->avail_in); + + return 0; +} + +static int fio_append_iolog_gz(struct sk_entry *first, struct io_log *log) +{ + int ret = 0; + z_stream stream; + + memset(&stream, 0, sizeof(stream)); + stream.zalloc = Z_NULL; + stream.zfree = Z_NULL; + stream.opaque = Z_NULL; + + if (deflateInit(&stream, Z_DEFAULT_COMPRESSION) != Z_OK) + return 1; + + while (!flist_empty(&log->io_logs)) { + struct io_logs *cur_log; + + cur_log = flist_first_entry(&log->io_logs, struct io_logs, list); + flist_del_init(&cur_log->list); + + ret = __fio_append_iolog_gz(first, log, cur_log, &stream); + if (ret) + break; + } -err_zlib: deflateEnd(&stream); -err: - free(out_pdu); -#endif return ret; } +#else +static int fio_append_iolog_gz(struct sk_entry *first, struct io_log *log) +{ + return 1; +} +#endif static int fio_append_gz_chunks(struct sk_entry *first, struct io_log *log) { @@ -1727,11 +1748,21 @@ static int fio_append_gz_chunks(struct sk_entry *first, struct io_log *log) static int fio_append_text_log(struct sk_entry *first, struct io_log *log) { struct sk_entry *entry; - size_t size = log->nr_samples * log_entry_sz(log); - entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, log->log, size, - NULL, SK_F_VEC | SK_F_INLINE); - flist_add_tail(&entry->list, &first->next); + while (!flist_empty(&log->io_logs)) { + struct io_logs *cur_log; + 
size_t size; + + cur_log = flist_first_entry(&log->io_logs, struct io_logs, list); + flist_del_init(&cur_log->list); + + size = cur_log->nr_samples * log_entry_sz(log); + + entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, cur_log->log, size, + NULL, SK_F_VEC | SK_F_INLINE); + flist_add_tail(&entry->list, &first->next); + } + return 0; } @@ -1739,9 +1770,10 @@ int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name) { struct cmd_iolog_pdu pdu; struct sk_entry *first; - int i, ret = 0; + struct flist_head *entry; + int ret = 0; - pdu.nr_samples = cpu_to_le64(log->nr_samples); + pdu.nr_samples = cpu_to_le64(iolog_nr_samples(log)); pdu.thread_number = cpu_to_le32(td->thread_number); pdu.log_type = cpu_to_le32(log->log_type); @@ -1759,18 +1791,25 @@ int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name) * We can't do this for a pre-compressed log, but for that case, * log->nr_samples is zero anyway. */ - for (i = 0; i < log->nr_samples; i++) { - struct io_sample *s = get_sample(log, i); + flist_for_each(entry, &log->io_logs) { + struct io_logs *cur_log; + int i; - s->time = cpu_to_le64(s->time); - s->val = cpu_to_le64(s->val); - s->__ddir = cpu_to_le32(s->__ddir); - s->bs = cpu_to_le32(s->bs); + cur_log = flist_entry(entry, struct io_logs, list); - if (log->log_offset) { - struct io_sample_offset *so = (void *) s; + for (i = 0; i < cur_log->nr_samples; i++) { + struct io_sample *s = get_sample(log, cur_log, i); - so->offset = cpu_to_le64(so->offset); + s->time = cpu_to_le64(s->time); + s->val = cpu_to_le64(s->val); + s->__ddir = cpu_to_le32(s->__ddir); + s->bs = cpu_to_le32(s->bs); + + if (log->log_offset) { + struct io_sample_offset *so = (void *) s; + + so->offset = cpu_to_le64(so->offset); + } } } diff --git a/stat.c b/stat.c index 4d87c29a..b42e886c 100644 --- a/stat.c +++ b/stat.c @@ -1849,66 +1849,124 @@ static inline void add_stat_sample(struct io_stat *is, unsigned long data) is->samples++; } +/* + * Return a struct 
io_logs, which is added to the tail of the log + * list for 'iolog'. + */ +static struct io_logs *get_new_log(struct io_log *iolog) +{ + size_t new_size, new_samples; + struct io_logs *cur_log; + + /* + * Cap the size at MAX_LOG_ENTRIES, so we don't keep doubling + * forever + */ + if (!iolog->cur_log_max) + new_samples = DEF_LOG_ENTRIES; + else { + new_samples = iolog->cur_log_max * 2; + if (new_samples > MAX_LOG_ENTRIES) + new_samples = MAX_LOG_ENTRIES; + } + + /* + * If the alloc size is sufficiently large, quiesce pending IO before + * attempting it. This is to avoid spending a long time in alloc with + * IO pending, which will unfairly skew the completion latencies of + * inflight IO. + */ + new_size = new_samples * log_entry_sz(iolog); + if (new_size >= LOG_QUIESCE_SZ) + io_u_quiesce(iolog->td); + + cur_log = malloc(sizeof(*cur_log)); + if (cur_log) { + INIT_FLIST_HEAD(&cur_log->list); + cur_log->log = malloc(new_size); + if (cur_log->log) { + cur_log->nr_samples = 0; + cur_log->max_samples = new_samples; + flist_add_tail(&cur_log->list, &iolog->io_logs); + iolog->cur_log_max = new_samples; + return cur_log; + } + free(cur_log); + } + + return NULL; +} + +static struct io_logs *get_cur_log(struct io_log *iolog) +{ + struct io_logs *cur_log; + + cur_log = iolog_cur_log(iolog); + if (!cur_log) { + cur_log = get_new_log(iolog); + if (!cur_log) + return NULL; + } + + if (cur_log->nr_samples < cur_log->max_samples) + return cur_log; + + /* + * No room for a new sample. If we're compressing on the fly, flush + * out the current chunk + */ + if (iolog->log_gz) { + if (iolog_cur_flush(iolog, cur_log)) { + log_err("fio: failed flushing iolog! Will stop logging.\n"); + return NULL; + } + } + + /* + * Get a new log array, and add to our list + */ + cur_log = get_new_log(iolog); + if (!cur_log) { + log_err("fio: failed extending iolog! 
Will stop logging.\n"); + return NULL; + } + + return cur_log; +} + static void __add_log_sample(struct io_log *iolog, unsigned long val, enum fio_ddir ddir, unsigned int bs, unsigned long t, uint64_t offset) { - uint64_t nr_samples = iolog->nr_samples; - struct io_sample *s; + struct io_logs *cur_log; if (iolog->disabled) return; - - if (!iolog->nr_samples) + if (flist_empty(&iolog->io_logs)) iolog->avg_last = t; - if (iolog->nr_samples == iolog->max_samples) { - size_t new_size, new_samples; - void *new_log; + cur_log = get_cur_log(iolog); + if (cur_log) { + struct io_sample *s; - if (!iolog->max_samples) - new_samples = DEF_LOG_ENTRIES; - else - new_samples = iolog->max_samples * 2; - - new_size = new_samples * log_entry_sz(iolog); - - if (iolog->log_gz && (new_size > iolog->log_gz)) { - if (!iolog->log) { - iolog->log = malloc(new_size); - iolog->max_samples = new_samples; - } else if (iolog_flush(iolog, 0)) { - log_err("fio: failed flushing iolog! Will stop logging.\n"); - iolog->disabled = 1; - return; - } - nr_samples = iolog->nr_samples; - } else { - new_log = realloc(iolog->log, new_size); - if (!new_log) { - log_err("fio: failed extending iolog! Will stop logging.\n"); - iolog->disabled = 1; - return; - } - iolog->log = new_log; - iolog->max_samples = new_samples; - } - } + s = get_sample(iolog, cur_log, cur_log->nr_samples); - s = get_sample(iolog, nr_samples); + s->val = val; + s->time = t; + io_sample_set_ddir(iolog, s, ddir); + s->bs = bs; - s->val = val; - s->time = t; - io_sample_set_ddir(iolog, s, ddir); - s->bs = bs; + if (iolog->log_offset) { + struct io_sample_offset *so = (void *) s; - if (iolog->log_offset) { - struct io_sample_offset *so = (void *) s; + so->offset = offset; + } - so->offset = offset; + cur_log->nr_samples++; + return; } - iolog->nr_samples++; + iolog->disabled = true; } static inline void reset_io_stat(struct io_stat *ios) -- 2.25.1