From aee2ab6775d96609a4632703827c409a7f9abcca Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Thu, 3 Jul 2014 09:10:39 -0600 Subject: [PATCH] Add support for runtime log compression If log_compression=64M is specified, fio will "bite" off chunks of the IO logs and runtime compress them with zlib. This can greatly reduce the amount of memory required to do iops/bw/lat logging of a run, at the cost of using some background CPU for the compression. Signed-off-by: Jens Axboe --- HOWTO | 14 ++ Makefile | 3 +- backend.c | 19 ++- cconv.c | 2 + fio.1 | 11 ++ fio.h | 3 + flist.h | 7 + init.c | 54 ++++++-- iolog.c | 352 ++++++++++++++++++++++++++++++++++++++++++++--- iolog.h | 25 +++- options.c | 13 ++ stat.c | 26 ++-- thread_options.h | 2 + tp.c | 104 ++++++++++++++ tp.h | 32 +++++ 15 files changed, 615 insertions(+), 52 deletions(-) create mode 100644 tp.c create mode 100644 tp.h diff --git a/HOWTO b/HOWTO index 3001fe74..e94d09f3 100644 --- a/HOWTO +++ b/HOWTO @@ -1336,6 +1336,20 @@ log_avg_msec=int By default, fio will log an entry in the iops, latency, log_offset=int If this is set, the iolog options will include the byte offset for the IO entry as well as the other data values. +log_compression=int If this is set, fio will compress the IO logs as + it goes, to keep the memory footprint lower. When a log + reaches the specified size, that chunk is removed and + compressed in the background. Given that IO logs are + fairly highly compressible, this yields a nice memory + savings for longer runs. The downside is that the + compression will consume some background CPU cycles, so + it may impact the run. This, however, is also true if + the logging ends up consuming most of the system memory. + So pick your poison. The IO logs are saved normally at the + end of a run, by decompressing the chunks and storing them + in the specified log file. This feature depends on the + availability of zlib. + lockmem=int Pin down the specified amount of memory with mlock(2). Can potentially be used instead of removing memory or booting with less memory to simulate a smaller amount of memory. diff --git a/Makefile b/Makefile index d49c8a49..f179b644 100644 --- a/Makefile +++ b/Makefile @@ -35,7 +35,8 @@ SOURCE := gettime.c ioengines.c init.c stat.c log.c time.c filesetup.c \ cconv.c lib/prio_tree.c json.c lib/zipf.c lib/axmap.c \ lib/lfsr.c gettime-thread.c helpers.c lib/flist_sort.c \ lib/hweight.c lib/getrusage.c idletime.c td_error.c \ - profiles/tiobench.c profiles/act.c io_u_queue.c filelock.c + profiles/tiobench.c profiles/act.c io_u_queue.c filelock.c \ + tp.c ifdef CONFIG_64BIT_LLP64 CFLAGS += -DBITS_PER_LONG=32 diff --git a/backend.c b/backend.c index 448fc598..e32d8416 100644 --- a/backend.c +++ b/backend.c @@ -53,6 +53,7 @@ #include "lib/getrusage.h" #include "idletime.h" #include "err.h" +#include "tp.h" static pthread_t disk_util_thread; static struct fio_mutex *disk_thread_mutex; @@ -1443,6 +1444,9 @@ static void *thread_main(void *data) goto err; } + if (td->flags & TD_F_COMPRESS_LOG) + tp_init(&td->tp_data); + fio_verify_init(td); fio_gettime(&td->epoch, NULL); @@ -1524,6 +1528,9 @@ static void *thread_main(void *data) fio_writeout_logs(td); + if (td->flags & TD_F_COMPRESS_LOG) + tp_exit(&td->tp_data); + if (o->exec_postrun) exec_string(o, o->exec_postrun, (const char *)"postrun"); @@ -2020,9 +2027,13 @@ int fio_backend(void) return 0; if (write_bw_log) { - setup_log(&agg_io_log[DDIR_READ], 0, IO_LOG_TYPE_BW, 0, "agg-read_bw.log"); - setup_log(&agg_io_log[DDIR_WRITE], 0, IO_LOG_TYPE_BW, 0, "agg-write_bw.log"); - setup_log(&agg_io_log[DDIR_TRIM], 0, IO_LOG_TYPE_BW, 0, "agg-trim_bw.log"); + struct log_params p = { + .log_type = IO_LOG_TYPE_BW, + }; + + setup_log(&agg_io_log[DDIR_READ], &p, "agg-read_bw.log"); + setup_log(&agg_io_log[DDIR_WRITE], &p, "agg-write_bw.log"); + setup_log(&agg_io_log[DDIR_TRIM], &p, "agg-trim_bw.log"); } startup_mutex = fio_mutex_init(FIO_MUTEX_LOCKED); @@ -2046,7 +2057,7 @@ int fio_backend(void) for (i = 0; i < DDIR_RWDIR_CNT; i++) { struct io_log *log = agg_io_log[i]; - __finish_log(log); + flush_log(log); free_log(log); } } diff --git a/cconv.c b/cconv.c index d2539753..aeec04b8 100644 --- a/cconv.c +++ b/cconv.c @@ -152,6 +152,7 @@ void convert_thread_options_to_cpu(struct thread_options *o, o->use_os_rand = le32_to_cpu(top->use_os_rand); o->log_avg_msec = le32_to_cpu(top->log_avg_msec); o->log_offset = le32_to_cpu(top->log_offset); + o->log_gz = le32_to_cpu(top->log_gz); o->norandommap = le32_to_cpu(top->norandommap); o->softrandommap = le32_to_cpu(top->softrandommap); o->bs_unaligned = le32_to_cpu(top->bs_unaligned); @@ -323,6 +324,7 @@ void convert_thread_options_to_net(struct thread_options_pack *top, top->use_os_rand = cpu_to_le32(o->use_os_rand); top->log_avg_msec = cpu_to_le32(o->log_avg_msec); top->log_offset = cpu_to_le32(o->log_offset); + top->log_gz = cpu_to_le32(o->log_gz); top->norandommap = cpu_to_le32(o->norandommap); top->softrandommap = cpu_to_le32(o->softrandommap); top->bs_unaligned = cpu_to_le32(o->bs_unaligned); diff --git a/fio.1 b/fio.1 index da44e570..363c806f 100644 --- a/fio.1 +++ b/fio.1 @@ -1214,6 +1214,17 @@ Defaults to 0. If this is set, the iolog options will include the byte offset for the IO entry as well as the other data values. .TP +.BI log_compression \fR=\fPint +If this is set, fio will compress the IO logs as it goes, to keep the memory +footprint lower. When a log reaches the specified size, that chunk is removed +and compressed in the background. Given that IO logs are fairly highly +compressible, this yields a nice memory savings for longer runs. The downside +is that the compression will consume some background CPU cycles, so it may +impact the run. This, however, is also true if the logging ends up consuming +most of the system memory. So pick your poison. The IO logs are saved +normally at the end of a run, by decompressing the chunks and storing them +in the specified log file. This feature depends on the availability of zlib. +.TP .BI disable_lat \fR=\fPbool Disable measurements of total latency numbers. Useful only for cutting back the number of calls to \fBgettimeofday\fR\|(2), as that does impact performance at diff --git a/fio.h b/fio.h index 4d4af0a7..df0d0200 100644 --- a/fio.h +++ b/fio.h @@ -73,6 +73,7 @@ enum { TD_F_PROFILE_OPS = 64, TD_F_COMPRESS = 128, TD_F_NOIO = 256, + TD_F_COMPRESS_LOG = 512, }; enum { @@ -112,6 +113,8 @@ struct thread_data { struct io_log *bw_log; struct io_log *iops_log; + struct tp_data *tp_data; + uint64_t stat_io_bytes[DDIR_RWDIR_CNT]; struct timeval bw_sample_time; diff --git a/flist.h b/flist.h index 8e130414..938ce1d8 100644 --- a/flist.h +++ b/flist.h @@ -140,6 +140,13 @@ static inline void flist_splice(const struct flist_head *list, __flist_splice(list, head, head->next); } +static inline void flist_splice_tail(struct flist_head *list, + struct flist_head *head) +{ + if (!flist_empty(list)) + __flist_splice(list, head->prev, head); +} + static inline void flist_splice_init(struct flist_head *list, struct flist_head *head) { diff --git a/init.c b/init.c index c2d6109f..678d5985 100644 --- a/init.c +++ b/init.c @@ -1145,25 +1145,49 @@ static int add_job(struct thread_data *td, const char *jobname, int job_add_num, goto err; if (o->lat_log_file) { - snprintf(logname, sizeof(logname), "%s_lat.log", o->lat_log_file); - setup_log(&td->lat_log, o->log_avg_msec, IO_LOG_TYPE_LAT, - o->log_offset, logname); - snprintf(logname, sizeof(logname), "%s_slat.log", o->lat_log_file); - setup_log(&td->slat_log, o->log_avg_msec, IO_LOG_TYPE_SLAT, - o->log_offset, logname); - snprintf(logname, sizeof(logname), "%s_clat.log", o->lat_log_file); - setup_log(&td->clat_log, o->log_avg_msec, IO_LOG_TYPE_CLAT, - o->log_offset, logname); + struct log_params p = { + .td = td, + .avg_msec = o->log_avg_msec, + .log_type = IO_LOG_TYPE_LAT, + .log_offset = o->log_offset, + .log_gz = o->log_gz, + }; + + snprintf(logname, sizeof(logname), "%s_lat.log", + o->lat_log_file); + setup_log(&td->lat_log, &p, logname); + snprintf(logname, sizeof(logname), "%s_slat.log", + o->lat_log_file); + setup_log(&td->slat_log, &p, logname); + snprintf(logname, sizeof(logname), "%s_clat.log", + o->lat_log_file); + setup_log(&td->clat_log, &p, logname); } if (o->bw_log_file) { - snprintf(logname, sizeof(logname), "%s_bw.log", o->bw_log_file); - setup_log(&td->bw_log, o->log_avg_msec, IO_LOG_TYPE_BW, - o->log_offset, logname); + struct log_params p = { + .td = td, + .avg_msec = o->log_avg_msec, + .log_type = IO_LOG_TYPE_BW, + .log_offset = o->log_offset, + .log_gz = o->log_gz, + }; + + snprintf(logname, sizeof(logname), "%s_bw.log", + o->bw_log_file); + setup_log(&td->bw_log, &p, logname); } if (o->iops_log_file) { - snprintf(logname, sizeof(logname), "%s_iops.log", o->iops_log_file); - setup_log(&td->iops_log, o->log_avg_msec, IO_LOG_TYPE_IOPS, - o->log_offset, logname); + struct log_params p = { + .td = td, + .avg_msec = o->log_avg_msec, + .log_type = IO_LOG_TYPE_IOPS, + .log_offset = o->log_offset, + .log_gz = o->log_gz, + }; + + snprintf(logname, sizeof(logname), "%s_iops.log", + o->iops_log_file); + setup_log(&td->iops_log, &p, logname); } if (!o->name) diff --git a/iolog.c b/iolog.c index 96afec66..7c05d328 100644 --- a/iolog.c +++ b/iolog.c @@ -6,14 +6,50 @@ #include #include #include +#ifdef CONFIG_ZLIB +#include +#endif + #include "flist.h" #include "fio.h" #include "verify.h" #include "trim.h" #include "filelock.h" +#include "tp.h" static const char iolog_ver2[] = "fio version 2 iolog"; +#ifdef CONFIG_ZLIB + +struct iolog_compress { + struct flist_head list; + void *buf; + size_t len; + unsigned int seq; +}; + +#define GZ_CHUNK 131072 + +static struct iolog_compress *get_new_chunk(unsigned int seq) +{ + struct iolog_compress *c; + + c = malloc(sizeof(*c)); + INIT_FLIST_HEAD(&c->list); + c->buf = malloc(GZ_CHUNK); + c->len = 0; + c->seq = seq; + return c; +} + +static void free_chunk(struct iolog_compress *ic) +{ + free(ic->buf); + free(ic); +} + +#endif + void queue_io_piece(struct thread_data *td, struct io_piece *ipo) { flist_add_tail(&ipo->list, &td->io_log_list); @@ -539,19 +575,31 @@ int init_iolog(struct thread_data *td) return ret; } -void setup_log(struct io_log **log, unsigned long avg_msec, int log_type, - int log_offset, const char *filename) +void setup_log(struct io_log **log, struct log_params *p, + const char *filename) { struct io_log *l = malloc(sizeof(*l)); memset(l, 0, sizeof(*l)); l->nr_samples = 0; l->max_samples = 1024; - l->log_type = log_type; - l->log_offset = log_offset; + l->log_type = p->log_type; + l->log_offset = p->log_offset; + l->log_gz = p->log_gz; l->log = malloc(l->max_samples * log_entry_sz(l)); - l->avg_msec = avg_msec; + l->avg_msec = p->avg_msec; l->filename = strdup(filename); + l->td = p->td; + + INIT_FLIST_HEAD(&l->chunk_list); + + if (l->log_gz && !p->td) + l->log_gz = 0; + else if (l->log_gz) { + pthread_mutex_init(&l->chunk_lock, NULL); + p->td->flags |= TD_F_COMPRESS_LOG; + } + *log = l; } @@ -588,24 +636,15 @@ void free_log(struct io_log *log) free(log); } -void __finish_log(struct io_log *log) +static void flush_samples(FILE *f, void *samples, uint64_t nr_samples, + int log_offset) { uint64_t i; - void *buf; - FILE *f; - - f = fopen(log->filename, "a"); - if (!f) { - perror("fopen log"); - return; - } - buf = set_file_buffer(f); + for (i = 0; i < nr_samples; i++) { + struct io_sample *s = __get_sample(samples, log_offset, i); - for (i = 0; i < log->nr_samples; i++) { - struct io_sample *s = get_sample(log, i); - - if (!log->log_offset) { + if (!log_offset) { fprintf(f, "%lu, %lu, %u, %u\n", (unsigned long) s->time, (unsigned long) s->val, @@ -620,6 +659,130 @@ void __finish_log(struct io_log *log) (unsigned long long) so->offset); } } +} + +#ifdef CONFIG_ZLIB +static int z_stream_init(z_stream *stream) +{ + stream->zalloc = Z_NULL; + stream->zfree = Z_NULL; + stream->opaque = Z_NULL; + stream->next_in = Z_NULL; + + if (inflateInit(stream) != Z_OK) + return 1; + + return 0; +} + +struct flush_chunk_iter { + unsigned int seq; + void *buf; + size_t buf_size; + size_t buf_used; + size_t chunk_sz; +}; + +static void finish_chunk(z_stream *stream, int log_offset, FILE *f, + struct flush_chunk_iter *iter) +{ + uint64_t nr_samples; + int ret; + + ret = inflateEnd(stream); + if (ret != Z_OK) + log_err("fio: failed to end log inflation (%d)\n", ret); + + nr_samples = iter->buf_used / __log_entry_sz(log_offset); + flush_samples(f, iter->buf, nr_samples, log_offset); + free(iter->buf); + iter->buf = NULL; + iter->buf_size = iter->buf_used = 0; +} + +static int flush_chunk(struct iolog_compress *ic, int log_offset, FILE *f, + z_stream *stream, struct flush_chunk_iter *iter) +{ + if (ic->seq != iter->seq) { + if (iter->seq) + finish_chunk(stream, log_offset, f, iter); + + z_stream_init(stream); + iter->seq = ic->seq; + } + + stream->avail_in = ic->len; + stream->next_in = ic->buf; + + if (!iter->buf_size) { + iter->buf_size = iter->chunk_sz; + iter->buf = malloc(iter->buf_size); + } + + while (stream->avail_in) { + int err; + + stream->avail_out = iter->buf_size - iter->buf_used; + stream->next_out = iter->buf + iter->buf_used; + + err = inflate(stream, Z_NO_FLUSH); + if (err < 0) { + log_err("fio: failed inflating log: %d\n", err); + break; + } + + iter->buf_used += iter->buf_size - iter->buf_used - stream->avail_out; + } + + free_chunk(ic); + return 0; +} + +static void flush_gz_chunks(struct io_log *log, FILE *f) +{ + struct flush_chunk_iter iter = { .chunk_sz = log->log_gz, }; + struct flist_head *node; + z_stream stream; + + while (!flist_empty(&log->chunk_list)) { + struct iolog_compress *ic; + + node = log->chunk_list.next; + ic = flist_entry(node, struct iolog_compress, list); + flist_del(&ic->list); + flush_chunk(ic, log->log_offset, f, &stream, &iter); + } + + if (iter.seq) { + finish_chunk(&stream, log->log_offset, f, &iter); + free(iter.buf); + } +} + +#else + +static void flush_gz_chunks(struct io_log *log, FILE *f) +{ +} + +#endif + +void flush_log(struct io_log *log) +{ + void *buf; + FILE *f; + + f = fopen(log->filename, "w"); + if (!f) { + perror("fopen log"); + return; + } + + buf = set_file_buffer(f); + + flush_gz_chunks(log, f); + + flush_samples(f, log->log, log->nr_samples, log->log_offset); fclose(f); clear_file_buffer(buf); @@ -627,22 +790,167 @@ void __finish_log(struct io_log *log) static int finish_log(struct thread_data *td, struct io_log *log, int trylock) { + if (td->tp_data) + iolog_flush(log, 1); + if (trylock) { if (fio_trylock_file(log->filename)) return 1; } else fio_lock_file(log->filename); - if (td->client_type == FIO_CLIENT_TYPE_GUI) { + if (td->client_type == FIO_CLIENT_TYPE_GUI) fio_send_iolog(td, log, log->filename); - } else - __finish_log(log); + else + flush_log(log); fio_unlock_file(log->filename); free_log(log); return 0; } +#ifdef CONFIG_ZLIB + +struct iolog_flush_data { + struct tp_work work; + struct io_log *log; + void *samples; + uint64_t nr_samples; +}; + +static int gz_work(struct tp_work *work) +{ + struct iolog_flush_data *data; + struct iolog_compress *c; + struct flist_head list; + unsigned int seq; + z_stream stream; + size_t total = 0; + int ret; + + INIT_FLIST_HEAD(&list); + + data = container_of(work, struct iolog_flush_data, work); + + stream.zalloc = Z_NULL; + stream.zfree = Z_NULL; + stream.opaque = Z_NULL; + + if (deflateInit(&stream, Z_DEFAULT_COMPRESSION) != Z_OK) { + log_err("fio: failed to init gz stream\n"); + return 0; + } + + seq = ++data->log->chunk_seq; + stream.next_in = (void *) data->samples; + stream.avail_in = data->nr_samples * log_entry_sz(data->log); + + do { + c = get_new_chunk(seq); + stream.avail_out = GZ_CHUNK; + stream.next_out = c->buf; + ret = deflate(&stream, Z_NO_FLUSH); + if (ret < 0) { + log_err("fio: deflate log (%d)\n", ret); + break; + } + + c->len = GZ_CHUNK - stream.avail_out; + flist_add_tail(&c->list, &list); + total += c->len; + } while (stream.avail_in); + + stream.next_out = c->buf + c->len; + stream.avail_out = GZ_CHUNK - c->len; + + ret = deflate(&stream, Z_FINISH); + if (ret == Z_STREAM_END) + c->len = GZ_CHUNK - stream.avail_out; + else { + do { + c = get_new_chunk(seq); + stream.avail_out = GZ_CHUNK; + stream.next_out = c->buf; + ret = deflate(&stream, Z_FINISH); + c->len = GZ_CHUNK - stream.avail_out; + flist_add_tail(&c->list, &list); + } while (ret != Z_STREAM_END); + } + + ret = deflateEnd(&stream); + if (ret != Z_OK) + log_err("fio: deflateEnd %d\n", ret); + + free(data->samples); + + if (!flist_empty(&list)) { + pthread_mutex_lock(&data->log->chunk_lock); + flist_splice_tail(&list, &data->log->chunk_list); + pthread_mutex_unlock(&data->log->chunk_lock); + } + + if (work->wait) { + work->done = 1; + pthread_cond_signal(&work->cv); + } else + free(data); + + return 0; +} + +int iolog_flush(struct io_log *log, int wait) +{ + struct thread_data *td = log->td; + struct iolog_flush_data *data; + size_t sample_size; + + data = malloc(sizeof(*data)); + if (!data) + return 1; + + data->log = log; + + sample_size = log->nr_samples * log_entry_sz(log); + data->samples = malloc(sample_size); + if (!data->samples) { + free(data); + return 1; + } + + memcpy(data->samples, log->log, sample_size); + data->nr_samples = log->nr_samples; + data->work.fn = gz_work; + log->nr_samples = 0; + + if (wait) { + pthread_mutex_init(&data->work.lock, NULL); + pthread_cond_init(&data->work.cv, NULL); + data->work.wait = 1; + } else + data->work.wait = 0; + + tp_queue_work(td->tp_data, &data->work); + + if (wait) { + pthread_mutex_lock(&data->work.lock); + while (!data->work.done) + pthread_cond_wait(&data->work.cv, &data->work.lock); + pthread_mutex_unlock(&data->work.lock); + free(data); + } + + return 0; +} + +#else + +int iolog_flush(struct io_log *log, int wait) +{ + return 1; +} + +#endif + static int write_iops_log(struct thread_data *td, int try) { struct io_log *log = td->iops_log; diff --git a/iolog.h b/iolog.h index f97d91f7..66c596e9 100644 --- a/iolog.h +++ b/iolog.h @@ -54,6 +54,8 @@ struct io_log { char *filename; + struct thread_data *td; + unsigned int log_type; /* @@ -66,6 +68,11 @@ struct io_log { */ unsigned int log_offset; + /* + * Max size of log entries before a chunk is compressed + */ + unsigned int log_gz; + /* * Windowed average, for logging single entries average over some * period of time. @@ -73,6 +80,10 @@ struct io_log { struct io_stat avg_window[DDIR_RWDIR_CNT]; unsigned long avg_msec; unsigned long avg_last; + + pthread_mutex_t chunk_lock; + unsigned int chunk_seq; + struct flist_head chunk_list; }; static inline size_t __log_entry_sz(int log_offset) @@ -156,6 +167,15 @@ extern void write_iolog_close(struct thread_data *); /* * Logging */ +struct log_params { + struct thread_data *td; + unsigned long avg_msec; + int log_type; + int log_offset; + int log_gz; + int log_compress; +}; + extern void finalize_logs(struct thread_data *td); extern void add_lat_sample(struct thread_data *, enum fio_ddir, unsigned long, unsigned int, uint64_t); @@ -169,13 +189,14 @@ extern void add_iops_sample(struct thread_data *, enum fio_ddir, unsigned int, struct timeval *); extern void init_disk_util(struct thread_data *); extern void update_rusage_stat(struct thread_data *); -extern void setup_log(struct io_log **, unsigned long, int, int, const char *); -extern void __finish_log(struct io_log *); +extern void setup_log(struct io_log **, struct log_params *, const char *); +extern void flush_log(struct io_log *); extern void free_log(struct io_log *); extern struct io_log *agg_io_log[DDIR_RWDIR_CNT]; extern int write_bw_log; extern void add_agg_sample(unsigned long, enum fio_ddir, unsigned int); extern void fio_writeout_logs(struct thread_data *); +extern int iolog_flush(struct io_log *, int); static inline void init_ipo(struct io_piece *ipo) { diff --git a/options.c b/options.c index 6d326d4e..75c9bf5e 100644 --- a/options.c +++ b/options.c @@ -3104,6 +3104,19 @@ struct fio_option fio_options[FIO_MAX_OPTS] = { .category = FIO_OPT_C_LOG, .group = FIO_OPT_G_INVALID, }, +#ifdef CONFIG_ZLIB + { + .name = "log_compression", + .lname = "Log compression", + .type = FIO_OPT_INT, + .off1 = td_var_offset(log_gz), + .help = "Log in compressed chunks of this size", + .minval = 32 * 1024 * 1024ULL, + .maxval = 512 * 1024 * 1024ULL, + .category = FIO_OPT_C_LOG, + .group = FIO_OPT_G_INVALID, + }, +#endif { .name = "bwavgtime", .lname = "Bandwidth average time", diff --git a/stat.c b/stat.c index 58744a82..ccf08a64 100644 --- a/stat.c +++ b/stat.c @@ -1565,7 +1565,7 @@ static void __add_log_sample(struct io_log *iolog, unsigned long val, enum fio_ddir ddir, unsigned int bs, unsigned long t, uint64_t offset) { - const int nr_samples = iolog->nr_samples; + uint64_t nr_samples = iolog->nr_samples; struct io_sample *s; if (iolog->disabled) @@ -1579,14 +1579,24 @@ static void __add_log_sample(struct io_log *iolog, unsigned long val, void *new_log; new_size = 2 * iolog->max_samples * log_entry_sz(iolog); - new_log = realloc(iolog->log, new_size); - if (!new_log) { - log_err("fio: failed extending iolog! Will stop logging.\n"); - iolog->disabled = 1; - return; + + if (iolog->log_gz && (new_size > iolog->log_gz)) { + if (iolog_flush(iolog, 0)) { + log_err("fio: failed flushing iolog! Will stop logging.\n"); + iolog->disabled = 1; + return; + } + nr_samples = iolog->nr_samples; + } else { + new_log = realloc(iolog->log, new_size); + if (!new_log) { + log_err("fio: failed extending iolog! Will stop logging.\n"); + iolog->disabled = 1; + return; + } + iolog->log = new_log; + iolog->max_samples <<= 1; } - iolog->log = new_log; - iolog->max_samples <<= 1; } s = get_sample(iolog, nr_samples); diff --git a/thread_options.h b/thread_options.h index e53000ab..deb3d675 100644 --- a/thread_options.h +++ b/thread_options.h @@ -109,6 +109,7 @@ struct thread_options { unsigned int use_os_rand; unsigned int log_avg_msec; unsigned int log_offset; + unsigned int log_gz; unsigned int norandommap; unsigned int softrandommap; unsigned int bs_unaligned; @@ -337,6 +338,7 @@ struct thread_options_pack { uint32_t use_os_rand; uint32_t log_avg_msec; uint32_t log_offset; + uint32_t log_gz; uint32_t norandommap; uint32_t softrandommap; uint32_t bs_unaligned; diff --git a/tp.c b/tp.c new file mode 100644 index 00000000..36e2d0f5 --- /dev/null +++ b/tp.c @@ -0,0 +1,104 @@ +#include +#include +#include +#include +#include +#include + +#include "smalloc.h" +#include "log.h" +#include "tp.h" + +static void tp_flush_work(struct flist_head *list) +{ + struct tp_work *work; + + while (!flist_empty(list)) { + work = flist_entry(list->next, struct tp_work, list); + flist_del(&work->list); + work->fn(work); + } +} + +static void *tp_thread(void *data) +{ + struct tp_data *tdat = data; + struct flist_head work_list; + + INIT_FLIST_HEAD(&work_list); + + printf("tp_thread running\n"); + + while (1) { + pthread_mutex_lock(&tdat->lock); + + if (!tdat->thread_exit && flist_empty(&tdat->work)) + pthread_cond_wait(&tdat->cv, &tdat->lock); + + if (!flist_empty(&tdat->work)) { + flist_splice(&tdat->work, &work_list); + INIT_FLIST_HEAD(&tdat->work); + } + + pthread_mutex_unlock(&tdat->lock); + + if (flist_empty(&work_list)) { + if (tdat->thread_exit) + break; + continue; + } + + tp_flush_work(&work_list); + } + + printf("outta here\n"); + return NULL; +} + +void tp_queue_work(struct tp_data *tdat, struct tp_work *work) +{ + work->done = 0; + + pthread_mutex_lock(&tdat->lock); + flist_add_tail(&work->list, &tdat->work); + pthread_cond_signal(&tdat->cv); + pthread_mutex_unlock(&tdat->lock); +} + +void tp_init(struct tp_data **tdatp) +{ + struct tp_data *tdat; + int ret; + + if (*tdatp) + return; + + *tdatp = tdat = smalloc(sizeof(*tdat)); + pthread_mutex_init(&tdat->lock, NULL); + INIT_FLIST_HEAD(&tdat->work); + pthread_cond_init(&tdat->cv, NULL); + pthread_cond_init(&tdat->sleep_cv, NULL); + + ret = pthread_create(&tdat->thread, NULL, tp_thread, tdat); + if (ret) + log_err("fio: failed to create tp thread\n"); +} + +void tp_exit(struct tp_data **tdatp) +{ + struct tp_data *tdat = *tdatp; + void *ret; + + if (!tdat) + return; + + tdat->thread_exit = 1; + pthread_mutex_lock(&tdat->lock); + pthread_cond_signal(&tdat->cv); + pthread_mutex_unlock(&tdat->lock); + + pthread_join(tdat->thread, &ret); + + sfree(tdat); + *tdatp = NULL; +} diff --git a/tp.h b/tp.h new file mode 100644 index 00000000..b1aa1e2d --- /dev/null +++ b/tp.h @@ -0,0 +1,32 @@ +#ifndef FIO_TP_H +#define FIO_TP_H + +#include "flist.h" + +struct tp_work; +typedef int (tp_work_fn)(struct tp_work *); + +struct tp_work { + struct flist_head list; + tp_work_fn *fn; + int wait; + pthread_cond_t cv; + pthread_mutex_t lock; + volatile int done; +}; + +struct tp_data { + pthread_t thread; + pthread_cond_t cv; + pthread_mutex_t lock; + struct flist_head work; + volatile int thread_exit; + pthread_cond_t sleep_cv; + volatile int sleeping; +}; + +extern void tp_init(struct tp_data **); +extern void tp_exit(struct tp_data **); +extern void tp_queue_work(struct tp_data *, struct tp_work *); + +#endif -- 2.25.1