+ fio_lock_file(log->filename);
+
+ if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
+ fio_send_iolog(td, log, log->filename);
+ else
+ flush_log(log, !td->o.per_job_logs);
+
+ fio_unlock_file(log->filename);
+ free_log(log);
+ return 0;
+}
+
+size_t log_chunk_sizes(struct io_log *log)
+{
+ struct flist_head *entry;
+ size_t ret;
+
+ if (flist_empty(&log->chunk_list))
+ return 0;
+
+ ret = 0;
+ pthread_mutex_lock(&log->chunk_lock);
+ flist_for_each(entry, &log->chunk_list) {
+ struct iolog_compress *c;
+
+ c = flist_entry(entry, struct iolog_compress, list);
+ ret += c->len;
+ }
+ pthread_mutex_unlock(&log->chunk_lock);
+ return ret;
+}
+
+#ifdef CONFIG_ZLIB
+
+static bool warned_on_drop;
+
+static void iolog_put_deferred(struct io_log *log, void *ptr)
+{
+ if (!ptr)
+ return;
+
+ pthread_mutex_lock(&log->deferred_free_lock);
+ if (log->deferred < IOLOG_MAX_DEFER) {
+ log->deferred_items[log->deferred] = ptr;
+ log->deferred++;
+ } else if (!warned_on_drop) {
+ log_err("fio: had to drop log entry free\n");
+ warned_on_drop = true;
+ }
+ pthread_mutex_unlock(&log->deferred_free_lock);
+}
+
+static void iolog_free_deferred(struct io_log *log)
+{
+ int i;
+
+ if (!log->deferred)
+ return;
+
+ pthread_mutex_lock(&log->deferred_free_lock);
+
+ for (i = 0; i < log->deferred; i++) {
+ free(log->deferred_items[i]);
+ log->deferred_items[i] = NULL;
+ }
+
+ log->deferred = 0;
+ pthread_mutex_unlock(&log->deferred_free_lock);
+}
+
+static int gz_work(struct iolog_flush_data *data)
+{
+ struct iolog_compress *c = NULL;
+ struct flist_head list;
+ unsigned int seq;
+ z_stream stream;
+ size_t total = 0;
+ int ret;
+
+ INIT_FLIST_HEAD(&list);
+
+ memset(&stream, 0, sizeof(stream));
+ stream.zalloc = Z_NULL;
+ stream.zfree = Z_NULL;
+ stream.opaque = Z_NULL;
+
+ ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
+ if (ret != Z_OK) {
+ log_err("fio: failed to init gz stream\n");
+ goto err;
+ }
+
+ seq = ++data->log->chunk_seq;
+
+ stream.next_in = (void *) data->samples;
+ stream.avail_in = data->nr_samples * log_entry_sz(data->log);
+
+ dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
+ (unsigned long) stream.avail_in, seq,
+ data->log->filename);
+ do {
+ if (c)
+ dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
+ (unsigned long) c->len);
+ c = get_new_chunk(seq);
+ stream.avail_out = GZ_CHUNK;
+ stream.next_out = c->buf;
+ ret = deflate(&stream, Z_NO_FLUSH);
+ if (ret < 0) {
+ log_err("fio: deflate log (%d)\n", ret);
+ free_chunk(c);
+ goto err;
+ }
+
+ c->len = GZ_CHUNK - stream.avail_out;
+ flist_add_tail(&c->list, &list);
+ total += c->len;
+ } while (stream.avail_in);
+
+ stream.next_out = c->buf + c->len;
+ stream.avail_out = GZ_CHUNK - c->len;
+
+ ret = deflate(&stream, Z_FINISH);
+ if (ret < 0) {
+ /*
+ * Z_BUF_ERROR is special, it just means we need more
+ * output space. We'll handle that below. Treat any other
+ * error as fatal.
+ */
+ if (ret != Z_BUF_ERROR) {
+ log_err("fio: deflate log (%d)\n", ret);
+ flist_del(&c->list);
+ free_chunk(c);
+ goto err;
+ }
+ }
+
+ total -= c->len;
+ c->len = GZ_CHUNK - stream.avail_out;
+ total += c->len;
+ dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
+
+ if (ret != Z_STREAM_END) {
+ do {
+ c = get_new_chunk(seq);
+ stream.avail_out = GZ_CHUNK;
+ stream.next_out = c->buf;
+ ret = deflate(&stream, Z_FINISH);
+ c->len = GZ_CHUNK - stream.avail_out;
+ total += c->len;
+ flist_add_tail(&c->list, &list);
+ dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
+ (unsigned long) c->len);
+ } while (ret != Z_STREAM_END);
+ }
+
+ dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
+
+ ret = deflateEnd(&stream);
+ if (ret != Z_OK)
+ log_err("fio: deflateEnd %d\n", ret);
+
+ iolog_put_deferred(data->log, data->samples);
+
+ if (!flist_empty(&list)) {
+ pthread_mutex_lock(&data->log->chunk_lock);
+ flist_splice_tail(&list, &data->log->chunk_list);
+ pthread_mutex_unlock(&data->log->chunk_lock);
+ }
+
+ ret = 0;
+done:
+ if (data->free)
+ sfree(data);
+ return ret;
+err:
+ while (!flist_empty(&list)) {
+ c = flist_first_entry(list.next, struct iolog_compress, list);
+ flist_del(&c->list);
+ free_chunk(c);
+ }
+ ret = 1;
+ goto done;
+}
+
+/*
+ * Invoked from our compress helper thread, when logging would have exceeded
+ * the specified memory limitation. Compresses the previously stored
+ * entries.
+ */
+static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
+{
+ return gz_work(container_of(work, struct iolog_flush_data, work));
+}
+
+static int gz_init_worker(struct submit_worker *sw)
+{
+ struct thread_data *td = sw->wq->td;
+
+ if (!fio_option_is_set(&td->o, log_gz_cpumask))
+ return 0;
+
+ if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
+ log_err("gz: failed to set CPU affinity\n");
+ return 1;
+ }
+
+ return 0;
+}
+
/*
 * Workqueue hooks for async log compression: each queued item is the
 * embedded 'work' member of a struct iolog_flush_data, handled by
 * gz_work_async(). Workers are pinned (optionally) by gz_init_worker()
 * and run with nice 1 — presumably to yield to the I/O threads; confirm
 * against workqueue implementation.
 */
static struct workqueue_ops log_compress_wq_ops = {
	.fn = gz_work_async,
	.init_worker_fn = gz_init_worker,
	.nice = 1,
};
+
+int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
+{
+ if (!(td->flags & TD_F_COMPRESS_LOG))
+ return 0;
+
+ workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
+ return 0;
+}
+
+void iolog_compress_exit(struct thread_data *td)
+{
+ if (!(td->flags & TD_F_COMPRESS_LOG))
+ return;
+
+ workqueue_exit(&td->log_compress_wq);
+}
+
+/*
+ * Queue work item to compress the existing log entries. We reset the
+ * current log to a small size, and reference the existing log in the
+ * data that we queue for compression. Once compression has been done,
+ * this old log is freed. If called with finish == true, will not return
+ * until the log compression has completed, and will flush all previous
+ * logs too
+ */
+static int iolog_flush(struct io_log *log)
+{
+ struct iolog_flush_data *data;
+
+ data = malloc(sizeof(*data));
+ if (!data)
+ return 1;
+
+ data->log = log;
+ data->free = false;
+
+ while (!flist_empty(&log->io_logs)) {
+ struct io_logs *cur_log;
+
+ cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
+ flist_del_init(&cur_log->list);
+
+ data->samples = cur_log->log;
+ data->nr_samples = cur_log->nr_samples;
+
+ sfree(cur_log);
+
+ gz_work(data);
+ }
+
+ free(data);
+ return 0;
+}
+
+int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
+{
+ struct iolog_flush_data *data;
+
+ data = smalloc(sizeof(*data));
+ if (!data)
+ return 1;
+
+ data->log = log;
+
+ data->samples = cur_log->log;
+ data->nr_samples = cur_log->nr_samples;
+ data->free = true;
+
+ cur_log->nr_samples = cur_log->max_samples = 0;
+ cur_log->log = NULL;
+
+ workqueue_enqueue(&log->td->log_compress_wq, &data->work);
+
+ iolog_free_deferred(log);
+
+ return 0;
+}
+#else
+
/* Built without zlib: flushing compressed logs always fails. */
static int iolog_flush(struct io_log *log)
{
	return 1;
}
+
/* Built without zlib: nothing to enqueue, report failure. */
int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	return 1;
}
+
/* Built without zlib: no workqueue to set up, trivially succeeds. */
int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	return 0;
}
+
/* Built without zlib: no workqueue to tear down. */
void iolog_compress_exit(struct thread_data *td)
{
}
+
+#endif
+
+struct io_logs *iolog_cur_log(struct io_log *log)
+{
+ if (flist_empty(&log->io_logs))
+ return NULL;
+
+ return flist_last_entry(&log->io_logs, struct io_logs, list);
+}
+
+uint64_t iolog_nr_samples(struct io_log *iolog)
+{
+ struct flist_head *entry;
+ uint64_t ret = 0;
+
+ flist_for_each(entry, &iolog->io_logs) {
+ struct io_logs *cur_log;
+
+ cur_log = flist_entry(entry, struct io_logs, list);
+ ret += cur_log->nr_samples;
+ }
+
+ return ret;
+}
+
/* Write out @log if it exists; a NULL log is a no-op success. */
static int __write_log(struct thread_data *td, struct io_log *log, int try)
{
	return log ? finish_log(td, log, try) : 0;
}
+
+static int write_iops_log(struct thread_data *td, int try, bool unit_log)
+{
+ int ret;
+
+ if (per_unit_log(td->iops_log) != unit_log)
+ return 0;
+
+ ret = __write_log(td, td->iops_log, try);
+ if (!ret)
+ td->iops_log = NULL;
+
+ return ret;
+}
+
+static int write_slat_log(struct thread_data *td, int try, bool unit_log)
+{
+ int ret;
+
+ if (!unit_log)
+ return 0;
+
+ ret = __write_log(td, td->slat_log, try);
+ if (!ret)
+ td->slat_log = NULL;
+
+ return ret;
+}
+
+static int write_clat_log(struct thread_data *td, int try, bool unit_log)
+{
+ int ret;
+
+ if (!unit_log)
+ return 0;
+
+ ret = __write_log(td, td->clat_log, try);
+ if (!ret)
+ td->clat_log = NULL;
+
+ return ret;
+}
+
+static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
+{
+ int ret;
+
+ if (!unit_log)
+ return 0;
+
+ ret = __write_log(td, td->clat_hist_log, try);
+ if (!ret)
+ td->clat_hist_log = NULL;
+
+ return ret;