/*
 * Code related to writing an iolog of what a thread is doing, and to
 * later read that back and replay it
 */
static int iolog_flush(struct io_log *log);

static const char iolog_ver2[] = "fio version 2 iolog";
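
/*
 * Usage sketch (assumed CLI wiring; these are the user-facing options
 * that populate td->o.write_iolog_file/read_iolog_file used below, with
 * made-up job names and file name):
 *
 *	fio --name=capture --write_iolog=ops.log ...
 *	fio --name=replay --read_iolog=ops.log ...
 */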
void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
{
	flist_add_tail(&ipo->list, &td->io_log_list);
	td->total_io_size += ipo->len;
}
void log_io_u(const struct thread_data *td, const struct io_u *io_u)
{
	if (!td->o.write_iolog_file)
		return;

	fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
						io_ddir_name(io_u->ddir),
						io_u->offset, io_u->buflen);
}
void log_file(struct thread_data *td, struct fio_file *f,
	      enum file_log_act what)
{
	const char *act[] = { "add", "open", "close" };

	assert(what < 3);

	if (!td->o.write_iolog_file)
		return;

	/*
	 * this happens on the pre-open/close done before the job starts
	 */
	if (!td->iolog_f)
		return;

	fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
}
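
/*
 * Together, log_file() and log_io_u() above emit one file action or one
 * I/O per line, e.g. (hypothetical file name and sizes):
 *
 *	testfile open
 *	testfile read 0 4096
 *	testfile close
 *
 * read_iolog2() further down parses these same lines back for replay.
 */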
static void iolog_delay(struct thread_data *td, unsigned long delay)
{
	uint64_t usec = utime_since_now(&td->last_issue);
	uint64_t this_delay;
	struct timespec ts;

	if (delay < td->time_offset) {
		td->time_offset = 0;
		return;
	}

	delay -= td->time_offset;
	if (delay < usec)
		return;

	delay -= usec;

	fio_gettime(&ts, NULL);
	while (delay && !td->terminate) {
		this_delay = delay;
		if (this_delay > 500000)
			this_delay = 500000;

		usec_sleep(td, this_delay);
		delay -= this_delay;
	}

	usec = utime_since_now(&ts);
	if (usec > delay)
		td->time_offset = usec - delay;
	else
		td->time_offset = 0;
}
static int ipo_special(struct thread_data *td, struct io_piece *ipo)
{
	struct fio_file *f;
	int ret;

	/*
	 * Not a special (file action) entry
	 */
	if (ipo->ddir != DDIR_INVAL)
		return 0;

	f = td->files[ipo->fileno];

	switch (ipo->file_action) {
	case FIO_LOG_OPEN_FILE:
		if (td->o.replay_redirect && fio_file_open(f)) {
			dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
					f->file_name);
			break;
		}
		ret = td_io_open_file(td, f);
		if (!ret)
			return 1;

		td_verror(td, ret, "iolog open file");
		return -1;
	case FIO_LOG_CLOSE_FILE:
		td_io_close_file(td, f);
		break;
	case FIO_LOG_UNLINK_FILE:
		td_io_unlink_file(td, f);
		break;
	default:
		log_err("fio: bad file action %d\n", ipo->file_action);
		break;
	}

	return 1;
}
int read_iolog_get(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo;
	unsigned long elapsed;

	while (!flist_empty(&td->io_log_list)) {
		int ret;

		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);

		ret = ipo_special(td, ipo);
		if (ret < 0) {
			free(ipo);
			break;
		} else if (ret > 0) {
			free(ipo);
			continue;
		}

		io_u->ddir = ipo->ddir;
		if (ipo->ddir != DDIR_WAIT) {
			io_u->offset = ipo->offset;
			io_u->buflen = ipo->len;
			io_u->file = td->files[ipo->fileno];
			get_file(io_u->file);
			dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
						io_u->buflen, io_u->file->file_name);
			if (ipo->delay)
				iolog_delay(td, ipo->delay);
		} else {
			elapsed = mtime_since_genesis();
			if (ipo->delay > elapsed)
				usec_sleep(td, (ipo->delay - elapsed) * 1000);
		}

		free(ipo);

		if (io_u->ddir != DDIR_WAIT)
			return 0;
	}

	td->done = 1;
	return 1;
}
void prune_io_piece_log(struct thread_data *td)
{
	struct io_piece *ipo;
	struct fio_rb_node *n;

	while ((n = rb_first(&td->io_hist_tree)) != NULL) {
		ipo = rb_entry(n, struct io_piece, rb_node);
		rb_erase(n, &td->io_hist_tree);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}

	while (!flist_empty(&td->io_hist_list)) {
		ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}
}
/*
 * log a successful write, so we can unwind the log for verify
 */
void log_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct fio_rb_node **p, *parent;
	struct io_piece *ipo, *__ipo;

	ipo = malloc(sizeof(struct io_piece));
	init_ipo(ipo);
	ipo->file = io_u->file;
	ipo->offset = io_u->offset;
	ipo->len = io_u->buflen;
	ipo->numberio = io_u->numberio;
	ipo->flags = IP_F_IN_FLIGHT;

	io_u->ipo = ipo;

	if (io_u_should_trim(td, io_u)) {
		flist_add_tail(&ipo->trim_list, &td->trim_list);
		td->trim_entries++;
	}

	/*
	 * We don't need to sort the entries if we only performed sequential
	 * writes. In this case, just reading back data in the order we wrote
	 * it out is faster but still safe.
	 *
	 * One exception is if we don't have a random map, in which case we
	 * need to check for duplicate blocks and drop the old one; we rely
	 * on the rb insert/lookup below to handle that.
	 */
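	/*
	 * Example of the no-randommap case handled below (sketch): if two
	 * writes hit offset 4096, only the last one can be verified, so the
	 * rb-tree walk detects the overlap and drops the older entry before
	 * inserting the new one.
	 */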
	if (((!td->o.verifysort) || !td_random(td)) &&
	      file_randommap(td, ipo->file)) {
		INIT_FLIST_HEAD(&ipo->list);
		flist_add_tail(&ipo->list, &td->io_hist_list);
		ipo->flags |= IP_F_ONLIST;
		td->io_hist_len++;
		return;
	}

	RB_CLEAR_NODE(&ipo->rb_node);

	/*
	 * Sort the entry into the verification list
	 */
restart:
	p = &td->io_hist_tree.rb_node;
	parent = NULL;
	while (*p) {
		int overlap = 0;
		parent = *p;

		__ipo = rb_entry(parent, struct io_piece, rb_node);
		if (ipo->file < __ipo->file)
			p = &(*p)->rb_left;
		else if (ipo->file > __ipo->file)
			p = &(*p)->rb_right;
		else if (ipo->offset < __ipo->offset) {
			p = &(*p)->rb_left;
			overlap = ipo->offset + ipo->len > __ipo->offset;
		}
		else if (ipo->offset > __ipo->offset) {
			p = &(*p)->rb_right;
			overlap = __ipo->offset + __ipo->len > ipo->offset;
		}
		else
			overlap = 1;

		if (overlap) {
			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
				__ipo->offset, __ipo->len,
				ipo->offset, ipo->len);
			td->io_hist_len--;
			rb_erase(parent, &td->io_hist_tree);
			remove_trim_entry(td, __ipo);
			if (!(__ipo->flags & IP_F_IN_FLIGHT))
				free(__ipo);
			goto restart;
		}
	}

	rb_link_node(&ipo->rb_node, parent, p);
	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
	ipo->flags |= IP_F_ONRB;
	td->io_hist_len++;
}
void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (td->ts.nr_block_infos) {
		uint32_t *info = io_u_block_info(td, io_u);
		if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
			if (io_u->ddir == DDIR_TRIM)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_TRIM_FAILURE);
			else if (io_u->ddir == DDIR_WRITE)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_WRITE_FAILURE);
		}
	}

	if (!ipo)
		return;

	if (ipo->flags & IP_F_ONRB)
		rb_erase(&ipo->rb_node, &td->io_hist_tree);
	else if (ipo->flags & IP_F_ONLIST)
		flist_del(&ipo->list);

	free(ipo);
	io_u->ipo = NULL;
	td->io_hist_len--;
}
void trim_io_piece(struct thread_data *td, const struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (!ipo)
		return;

	ipo->len = io_u->xfer_buflen - io_u->resid;
}
void write_iolog_close(struct thread_data *td)
{
	fflush(td->iolog_f);
	fclose(td->iolog_f);
	free(td->iolog_buf);
	td->iolog_f = NULL;
	td->iolog_buf = NULL;
}
/*
 * Read version 2 iolog data. It is enhanced to include per-file logging,
 * syncs, etc.
 */
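/*
 * A small (hypothetical) v2 iolog as accepted by this parser:
 *
 *	fio version 2 iolog
 *	/dev/sdb add
 *	/dev/sdb open
 *	/dev/sdb read 0 4096
 *	/dev/sdb write 4096 65536
 *	/dev/sdb close
 */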
static int read_iolog2(struct thread_data *td, FILE *f)
{
	unsigned long long offset;
	unsigned int bytes;
	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
	char *rfname, *fname, *act;
	char *str, *p;
	enum fio_ddir rw;

	free_release_files(td);

	/*
	 * Read in the read iolog and store it, reuse the infrastructure
	 * for doing verifications.
	 */
	str = malloc(4096);
	rfname = fname = malloc(256+16);
	act = malloc(256+16);

	reads = writes = waits = 0;
	while ((p = fgets(str, 4096, f)) != NULL) {
		struct io_piece *ipo;
		int r;

		r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
									&bytes);

		if (td->o.replay_redirect)
			fname = td->o.replay_redirect;

		if (r == 4) {
			/*
			 * Check action first
			 */
			if (!strcmp(act, "wait"))
				rw = DDIR_WAIT;
			else if (!strcmp(act, "read"))
				rw = DDIR_READ;
			else if (!strcmp(act, "write"))
				rw = DDIR_WRITE;
			else if (!strcmp(act, "sync"))
				rw = DDIR_SYNC;
			else if (!strcmp(act, "datasync"))
				rw = DDIR_DATASYNC;
			else if (!strcmp(act, "trim"))
				rw = DDIR_TRIM;
			else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
			fileno = get_fileno(td, fname);
		} else if (r == 2) {
			rw = DDIR_INVAL;
			if (!strcmp(act, "add")) {
				if (td->o.replay_redirect &&
				    get_fileno(td, fname) != -1) {
					dprint(FD_FILE, "iolog: ignoring"
						" re-add of file %s\n", fname);
				} else {
					fileno = add_file(td, fname, 0, 1);
					file_action = FIO_LOG_ADD_FILE;
				}
				continue;
			} else if (!strcmp(act, "open")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_OPEN_FILE;
			} else if (!strcmp(act, "close")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_CLOSE_FILE;
			} else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
		} else {
			log_err("bad iolog2: %s\n", p);
			continue;
		}

		if (rw == DDIR_READ)
			reads++;
		else if (rw == DDIR_WRITE) {
			/*
			 * Don't add a write for ro mode
			 */
			if (read_only)
				continue;
			writes++;
		} else if (rw == DDIR_WAIT) {
			if (td->o.no_stall)
				continue;
			waits++;
		} else if (rw == DDIR_INVAL) {
		} else if (!ddir_sync(rw)) {
			log_err("bad ddir: %d\n", rw);
			continue;
		}

		/*
		 * Make note of file
		 */
		ipo = malloc(sizeof(*ipo));
		init_ipo(ipo);
		ipo->ddir = rw;
		if (rw == DDIR_WAIT) {
			ipo->delay = offset;
		} else {
			if (td->o.replay_scale)
				ipo->offset = offset / td->o.replay_scale;
			else
				ipo->offset = offset;
			ipo_bytes_align(td->o.replay_align, ipo);

			ipo->len = bytes;
			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
				td->o.max_bs[rw] = bytes;
			ipo->fileno = fileno;
			ipo->file_action = file_action;
			td->o.size += bytes;
		}

		queue_io_piece(td, ipo);
	}

	free(str);
	free(act);
	free(rfname);

	if (writes && read_only) {
		log_err("fio: <%s> skips replay of %d writes due to"
			" read-only\n", td->o.name, writes);
		writes = 0;
	}

	if (!reads && !writes && !waits)
		return 1;
	else if (reads && !writes)
		td->o.td_ddir = TD_DDIR_READ;
	else if (!reads && writes)
		td->o.td_ddir = TD_DDIR_WRITE;
	else
		td->o.td_ddir = TD_DDIR_RW;

	return 0;
}
/*
 * open iolog, check version, and call appropriate parser
 */
static int init_iolog_read(struct thread_data *td)
{
	char buffer[256], *p;
	FILE *f;
	int ret;

	f = fopen(td->o.read_iolog_file, "r");
	if (!f) {
		perror("fopen read iolog");
		return 1;
	}

	p = fgets(buffer, sizeof(buffer), f);
	if (!p) {
		td_verror(td, errno, "iolog read");
		log_err("fio: unable to read iolog\n");
		fclose(f);
		return 1;
	}

	/*
	 * version 2 of the iolog stores a specific string as the
	 * first line, check for that
	 */
	if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
		ret = read_iolog2(td, f);
	else {
		log_err("fio: iolog version 1 is no longer supported\n");
		ret = 1;
	}

	fclose(f);
	return ret;
}
/*
 * Set up a log for storing io patterns.
 */
static int init_iolog_write(struct thread_data *td)
{
	struct fio_file *ff;
	FILE *f;
	unsigned int i;

	f = fopen(td->o.write_iolog_file, "a");
	if (!f) {
		perror("fopen write iolog");
		return 1;
	}

	/*
	 * That's it for writing; set up a log buffer and we're done.
	 */
	td->iolog_f = f;
	td->iolog_buf = malloc(8192);
	setvbuf(f, td->iolog_buf, _IOFBF, 8192);

	/*
	 * write our version line
	 */
	if (fprintf(f, "%s\n", iolog_ver2) < 0) {
		perror("iolog init\n");
		return 1;
	}

	/*
	 * add all known files
	 */
	for_each_file(td, ff, i)
		log_file(td, ff, FIO_LOG_ADD_FILE);

	return 0;
}
int init_iolog(struct thread_data *td)
{
	int ret = 0;

	if (td->o.read_iolog_file) {
		int need_swap;

		/*
		 * Check if it's a blktrace file and load that if possible.
		 * Otherwise assume it's a normal log file and load that.
		 */
		if (is_blktrace(td->o.read_iolog_file, &need_swap))
			ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
		else
			ret = init_iolog_read(td);
	} else if (td->o.write_iolog_file)
		ret = init_iolog_write(td);

	if (ret)
		td_verror(td, EINVAL, "failed initializing iolog");

	return ret;
}
void setup_log(struct io_log **log, struct log_params *p,
	       const char *filename)
{
	struct io_log *l;
	int i;
	struct io_u_plat_entry *entry;
	struct flist_head *list;

	l = scalloc(1, sizeof(*l));
	INIT_FLIST_HEAD(&l->io_logs);
	l->log_type = p->log_type;
	l->log_offset = p->log_offset;
	l->log_gz = p->log_gz;
	l->log_gz_store = p->log_gz_store;
	l->avg_msec = p->avg_msec;
	l->hist_msec = p->hist_msec;
	l->hist_coarseness = p->hist_coarseness;
	l->filename = strdup(filename);
	l->td = p->td;

	/* Initialize histogram lists for each r/w direction,
	 * with initial io_u_plat of all zeros:
	 */
	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
		list = &l->hist_window[i].list;
		INIT_FLIST_HEAD(list);
		entry = calloc(1, sizeof(struct io_u_plat_entry));
		flist_add(&entry->list, list);
	}

	if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
		struct io_logs *p;

		p = calloc(1, sizeof(*l->pending));
		p->max_samples = DEF_LOG_ENTRIES;
		p->log = calloc(p->max_samples, log_entry_sz(l));
		l->pending = p;
	}

	if (l->log_offset)
		l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;

	INIT_FLIST_HEAD(&l->chunk_list);

	if (l->log_gz && !p->td)
		l->log_gz = 0;
	else if (l->log_gz || l->log_gz_store) {
		mutex_init_pshared(&l->chunk_lock);
		mutex_init_pshared(&l->deferred_free_lock);
		p->td->flags |= TD_F_COMPRESS_LOG;
	}

	*log = l;
}
#ifdef CONFIG_SETVBUF
static void *set_file_buffer(FILE *f)
{
	size_t size = 1048576;
	void *buf;

	buf = malloc(size);
	setvbuf(f, buf, _IOFBF, size);
	return buf;
}

static void clear_file_buffer(void *buf)
{
	free(buf);
}
#else
static void *set_file_buffer(FILE *f)
{
	return NULL;
}

static void clear_file_buffer(void *buf)
{
}
#endif
void free_log(struct io_log *log)
{
	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);
		free(cur_log->log);
		free(cur_log);
	}

	if (log->pending) {
		free(log->pending->log);
		free(log->pending);
		log->pending = NULL;
	}

	free(log->filename);
	sfree(log);
}
uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
		  uint64_t *io_u_plat_last)
{
	uint64_t sum;
	int k;

	if (io_u_plat_last) {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k] - io_u_plat_last[j + k];
	} else {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k];
	}

	return sum;
}
static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
			       uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, j, nr_samples;
	struct io_u_plat_entry *entry, *entry_before;
	uint64_t *io_u_plat;
	uint64_t *io_u_plat_before;

	int stride = 1 << hist_coarseness;
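	/*
	 * Coalescing example (sketch): with hist_coarseness=4, stride is
	 * 16, so each hist_sum() call below folds 16 adjacent plat bins
	 * into a single output column.
	 */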
	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		entry = s->data.plat_entry;
		io_u_plat = entry->io_u_plat;

		entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
		io_u_plat_before = entry_before->io_u_plat;

		fprintf(f, "%lu, %u, %u, ", (unsigned long) s->time,
						io_sample_ddir(s), s->bs);
		for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
			fprintf(f, "%llu, ", (unsigned long long)
				hist_sum(j, stride, io_u_plat, io_u_plat_before));
		}
		fprintf(f, "%llu\n", (unsigned long long)
			hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
					io_u_plat_before));

		flist_del(&entry_before->list);
		free(entry_before);
	}
}
void flush_samples(FILE *f, void *samples, uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, nr_samples;

	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		if (!log_offset) {
			fprintf(f, "%lu, %" PRId64 ", %u, %u\n",
					(unsigned long) s->time,
					s->data.val,
					io_sample_ddir(s), s->bs);
		} else {
			struct io_sample_offset *so = (void *) s;

			fprintf(f, "%lu, %" PRId64 ", %u, %u, %llu\n",
					(unsigned long) s->time,
					s->data.val,
					io_sample_ddir(s), s->bs,
					(unsigned long long) so->offset);
		}
	}
}
#ifdef CONFIG_ZLIB

struct iolog_flush_data {
	struct workqueue_work work;
	struct io_log *log;
	void *samples;
	uint32_t nr_samples;
	bool free;
};

#define GZ_CHUNK	131072
static struct iolog_compress *get_new_chunk(unsigned int seq)
{
	struct iolog_compress *c;

	c = malloc(sizeof(*c));
	INIT_FLIST_HEAD(&c->list);
	c->buf = malloc(GZ_CHUNK);
	c->len = 0;
	c->seq = seq;
	return c;
}

static void free_chunk(struct iolog_compress *ic)
{
	free(ic->buf);
	free(ic);
}
static int z_stream_init(z_stream *stream, int gz_hdr)
{
	int wbits = 15;

	memset(stream, 0, sizeof(*stream));
	stream->zalloc = Z_NULL;
	stream->zfree = Z_NULL;
	stream->opaque = Z_NULL;
	stream->next_in = Z_NULL;

	/*
	 * zlib magic - add 32 for auto-detection of gz header or not,
	 * if we decide to store files in a gzip friendly format.
	 */
	if (gz_hdr)
		wbits += 32;

	if (inflateInit2(stream, wbits) != Z_OK)
		return 1;

	return 0;
}
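
/*
 * Note on the windowBits above: plain 15 expects a zlib header, while
 * adding 32 (zlib's documented auto-detect flag for inflateInit2())
 * lets inflate() accept either a zlib or a gzip header, which is what
 * the gz_hdr path relies on.
 */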
struct inflate_chunk_iter {
	unsigned int seq;
	int err;
	void *buf;
	size_t buf_size;
	size_t buf_used;
	size_t chunk_sz;
};

static void finish_chunk(z_stream *stream, FILE *f,
			 struct inflate_chunk_iter *iter)
{
	int ret;

	ret = inflateEnd(stream);
	if (ret != Z_OK)
		log_err("fio: failed to end log inflation seq %d (%d)\n",
				iter->seq, ret);

	flush_samples(f, iter->buf, iter->buf_used);
	free(iter->buf);
	iter->buf = NULL;
	iter->buf_size = iter->buf_used = 0;
}
/*
 * Iterative chunk inflation. Handles cases where we cross into a new
 * sequence, doing flush finish of previous chunk if needed.
 */
static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
			    z_stream *stream, struct inflate_chunk_iter *iter)
{
	size_t ret;

	dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
				(unsigned long) ic->len, ic->seq);

	if (ic->seq != iter->seq) {
		if (iter->seq)
			finish_chunk(stream, f, iter);

		z_stream_init(stream, gz_hdr);
		iter->seq = ic->seq;
	}

	stream->avail_in = ic->len;
	stream->next_in = ic->buf;

	if (!iter->buf_size) {
		iter->buf_size = iter->chunk_sz;
		iter->buf = malloc(iter->buf_size);
	}

	while (stream->avail_in) {
		size_t this_out = iter->buf_size - iter->buf_used;
		int err;

		stream->avail_out = this_out;
		stream->next_out = iter->buf + iter->buf_used;

		err = inflate(stream, Z_NO_FLUSH);
		if (err < 0) {
			log_err("fio: failed inflating log: %d\n", err);
			iter->err = err;
			break;
		}

		iter->buf_used += this_out - stream->avail_out;

		if (!stream->avail_out) {
			iter->buf_size += iter->chunk_sz;
			iter->buf = realloc(iter->buf, iter->buf_size);
			continue;
		}

		if (err == Z_STREAM_END)
			break;
	}

	ret = (void *) stream->next_in - ic->buf;

	dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);

	return ret;
}
/*
 * Inflate stored compressed chunks, or write them directly to the log
 * file if so instructed.
 */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
	z_stream stream;

	while (!flist_empty(&log->chunk_list)) {
		struct iolog_compress *ic;

		ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
		flist_del(&ic->list);

		if (log->log_gz_store) {
			size_t ret;

			dprint(FD_COMPRESS, "log write chunk size=%lu, "
				"seq=%u\n", (unsigned long) ic->len, ic->seq);

			ret = fwrite(ic->buf, ic->len, 1, f);
			if (ret != 1 || ferror(f)) {
				iter.err = errno;
				log_err("fio: error writing compressed log\n");
			}
		} else
			inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);

		free_chunk(ic);
	}

	if (iter.seq) {
		finish_chunk(&stream, f, &iter);
		free(iter.buf);
	}

	return iter.err;
}
/*
 * Open compressed log file and decompress the stored chunks and
 * write them to stdout. The chunks are stored sequentially in the
 * file, so we iterate over them and do them one-by-one.
 */
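/*
 * Sketch of the (assumed) CLI wiring: "fio --inflate-log=<file>" ends up
 * here, decompressing a log written with log_gz_store and printing the
 * samples to stdout.
 */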
int iolog_file_inflate(const char *file)
{
	struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
	struct iolog_compress ic;
	z_stream stream;
	struct stat sb;
	size_t ret, total;
	void *buf;
	FILE *f;

	f = fopen(file, "r");
	if (!f) {
		perror("fopen");
		return 1;
	}

	if (stat(file, &sb) < 0) {
		fclose(f);
		perror("stat");
		return 1;
	}

	ic.buf = buf = malloc(sb.st_size);
	ic.len = sb.st_size;
	ic.seq = 1;

	ret = fread(ic.buf, ic.len, 1, f);
	if (ret == 0 && ferror(f)) {
		perror("fread");
		fclose(f);
		free(buf);
		return 1;
	} else if (ret != 1) {
		log_err("fio: short read on reading log\n");
		fclose(f);
		free(buf);
		return 1;
	}

	fclose(f);

	/*
	 * Each chunk will return Z_STREAM_END. We don't know how many
	 * chunks are in the file, so we just keep looping and incrementing
	 * the sequence number until we have consumed the whole compressed
	 * file.
	 */
	total = ic.len;
	do {
		size_t iret;

		iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
		total -= iret;
		if (!total)
			break;
		if (iter.err)
			break;

		ic.seq++;
		ic.len = total;
		ic.buf += iret;
	} while (1);

	if (iter.seq) {
		finish_chunk(&stream, stdout, &iter);
		free(iter.buf);
	}

	free(buf);
	return iter.err;
}
#else

static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	return 0;
}

int iolog_file_inflate(const char *file)
{
	log_err("fio: log inflation not possible without zlib\n");
	return 1;
}

#endif
void flush_log(struct io_log *log, bool do_append)
{
	void *buf;
	FILE *f;

	if (!do_append)
		f = fopen(log->filename, "w");
	else
		f = fopen(log->filename, "a");
	if (!f) {
		perror("fopen log");
		return;
	}

	buf = set_file_buffer(f);

	inflate_gz_chunks(log, f);

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		if (log->td && log == log->td->clat_hist_log)
			flush_hist_samples(f, log->hist_coarseness, cur_log->log,
					   log_sample_sz(log, cur_log));
		else
			flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
	}

	fclose(f);
	clear_file_buffer(buf);
}
static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
{
	if (td->flags & TD_F_COMPRESS_LOG)
		iolog_flush(log);

	if (trylock) {
		if (fio_trylock_file(log->filename))
			return 1;
	} else
		fio_lock_file(log->filename);

	if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
		fio_send_iolog(td, log, log->filename);
	else
		flush_log(log, !td->o.per_job_logs);

	fio_unlock_file(log->filename);
	free_log(log);
	return 0;
}
size_t log_chunk_sizes(struct io_log *log)
{
	struct flist_head *entry;
	size_t ret;

	if (flist_empty(&log->chunk_list))
		return 0;

	ret = 0;
	pthread_mutex_lock(&log->chunk_lock);
	flist_for_each(entry, &log->chunk_list) {
		struct iolog_compress *c;

		c = flist_entry(entry, struct iolog_compress, list);
		ret += c->len;
	}
	pthread_mutex_unlock(&log->chunk_lock);
	return ret;
}
static void iolog_put_deferred(struct io_log *log, void *ptr)
{
	if (!ptr)
		return;

	pthread_mutex_lock(&log->deferred_free_lock);
	if (log->deferred < IOLOG_MAX_DEFER) {
		log->deferred_items[log->deferred] = ptr;
		log->deferred++;
	} else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
		log_err("fio: had to drop log entry free\n");
	pthread_mutex_unlock(&log->deferred_free_lock);
}

static void iolog_free_deferred(struct io_log *log)
{
	int i;

	if (!log->deferred)
		return;

	pthread_mutex_lock(&log->deferred_free_lock);

	for (i = 0; i < log->deferred; i++) {
		free(log->deferred_items[i]);
		log->deferred_items[i] = NULL;
	}

	log->deferred = 0;
	pthread_mutex_unlock(&log->deferred_free_lock);
}
static int gz_work(struct iolog_flush_data *data)
{
	struct iolog_compress *c = NULL;
	struct flist_head list;
	unsigned int seq;
	z_stream stream;
	size_t total = 0;
	int ret;

	INIT_FLIST_HEAD(&list);

	memset(&stream, 0, sizeof(stream));
	stream.zalloc = Z_NULL;
	stream.zfree = Z_NULL;
	stream.opaque = Z_NULL;

	ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
	if (ret != Z_OK) {
		log_err("fio: failed to init gz stream\n");
		goto err;
	}

	seq = ++data->log->chunk_seq;

	stream.next_in = (void *) data->samples;
	stream.avail_in = data->nr_samples * log_entry_sz(data->log);

	dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
				(unsigned long) stream.avail_in, seq,
				data->log->filename);
	do {
		if (c)
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		c = get_new_chunk(seq);
		stream.avail_out = GZ_CHUNK;
		stream.next_out = c->buf;
		ret = deflate(&stream, Z_NO_FLUSH);
		if (ret < 0) {
			log_err("fio: deflate log (%d)\n", ret);
			free_chunk(c);
			goto err;
		}

		c->len = GZ_CHUNK - stream.avail_out;
		flist_add_tail(&c->list, &list);
		total += c->len;
	} while (stream.avail_in);

	stream.next_out = c->buf + c->len;
	stream.avail_out = GZ_CHUNK - c->len;

	ret = deflate(&stream, Z_FINISH);
	if (ret < 0) {
		/*
		 * Z_BUF_ERROR is special, it just means we need more
		 * output space. We'll handle that below. Treat any other
		 * error as fatal.
		 */
		if (ret != Z_BUF_ERROR) {
			log_err("fio: deflate log (%d)\n", ret);
			flist_del(&c->list);
			free_chunk(c);
			goto err;
		}
	}

	total -= c->len;
	c->len = GZ_CHUNK - stream.avail_out;
	total += c->len;
	dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);

	if (ret != Z_STREAM_END) {
		do {
			c = get_new_chunk(seq);
			stream.avail_out = GZ_CHUNK;
			stream.next_out = c->buf;
			ret = deflate(&stream, Z_FINISH);
			c->len = GZ_CHUNK - stream.avail_out;
			total += c->len;
			flist_add_tail(&c->list, &list);
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		} while (ret != Z_STREAM_END);
	}

	dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);

	ret = deflateEnd(&stream);
	if (ret != Z_OK)
		log_err("fio: deflateEnd %d\n", ret);

	iolog_put_deferred(data->log, data->samples);

	if (!flist_empty(&list)) {
		pthread_mutex_lock(&data->log->chunk_lock);
		flist_splice_tail(&list, &data->log->chunk_list);
		pthread_mutex_unlock(&data->log->chunk_lock);
	}

	ret = 0;
done:
	if (data->free)
		sfree(data);
	return ret;
err:
	while (!flist_empty(&list)) {
		c = flist_first_entry(&list, struct iolog_compress, list);
		flist_del(&c->list);
		free_chunk(c);
	}
	ret = 1;
	goto done;
}
/*
 * Invoked from our compress helper thread, when logging would have exceeded
 * the specified memory limitation. Compresses the previously stored
 * entries.
 */
static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
{
	return gz_work(container_of(work, struct iolog_flush_data, work));
}
static int gz_init_worker(struct submit_worker *sw)
{
	struct thread_data *td = sw->wq->td;

	if (!fio_option_is_set(&td->o, log_gz_cpumask))
		return 0;

	if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
		log_err("gz: failed to set CPU affinity\n");
		return 1;
	}

	return 0;
}

static struct workqueue_ops log_compress_wq_ops = {
	.fn		= gz_work_async,
	.init_worker_fn	= gz_init_worker,
	.nice		= 1,
};
int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return 0;

	workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return;

	workqueue_exit(&td->log_compress_wq);
}
/*
 * Queue work item to compress the existing log entries. We reset the
 * current log to a small size, and reference the existing log in the
 * data that we queue for compression. Once compression has been done,
 * this old log is freed. If called with finish == true, will not return
 * until the log compression has completed, and will flush all previous
 * logs too.
 */
static int iolog_flush(struct io_log *log)
{
	struct iolog_flush_data *data;

	data = malloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;
	data->free = false;

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		data->samples = cur_log->log;
		data->nr_samples = cur_log->nr_samples;

		sfree(cur_log);

		gz_work(data);
	}

	free(data);
	return 0;
}
int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	struct iolog_flush_data *data;

	data = smalloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;
	data->free = true;

	data->samples = cur_log->log;
	data->nr_samples = cur_log->nr_samples;

	cur_log->nr_samples = cur_log->max_samples = 0;
	cur_log->log = NULL;

	workqueue_enqueue(&log->td->log_compress_wq, &data->work);

	iolog_free_deferred(log);

	return 0;
}
#else

static int iolog_flush(struct io_log *log)
{
	return 1;
}

int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	return 1;
}

int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
}

#endif
struct io_logs *iolog_cur_log(struct io_log *log)
{
	if (flist_empty(&log->io_logs))
		return NULL;

	return flist_last_entry(&log->io_logs, struct io_logs, list);
}

uint64_t iolog_nr_samples(struct io_log *iolog)
{
	struct flist_head *entry;
	uint64_t ret = 0;

	flist_for_each(entry, &iolog->io_logs) {
		struct io_logs *cur_log;

		cur_log = flist_entry(entry, struct io_logs, list);
		ret += cur_log->nr_samples;
	}

	return ret;
}
static int __write_log(struct thread_data *td, struct io_log *log, int try)
{
	if (log)
		return finish_log(td, log, try);

	return 0;
}

static int write_iops_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->iops_log) != unit_log)
		return 0;

	ret = __write_log(td, td->iops_log, try);
	if (!ret)
		td->iops_log = NULL;

	return ret;
}

static int write_slat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->slat_log, try);
	if (!ret)
		td->slat_log = NULL;

	return ret;
}

static int write_clat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_log, try);
	if (!ret)
		td->clat_log = NULL;

	return ret;
}

static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_hist_log, try);
	if (!ret)
		td->clat_hist_log = NULL;

	return ret;
}

static int write_lat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->lat_log, try);
	if (!ret)
		td->lat_log = NULL;

	return ret;
}

static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->bw_log) != unit_log)
		return 0;

	ret = __write_log(td, td->bw_log, try);
	if (!ret)
		td->bw_log = NULL;

	return ret;
}
enum {
	BW_LOG_MASK	= 1,
	LAT_LOG_MASK	= 2,
	SLAT_LOG_MASK	= 4,
	CLAT_LOG_MASK	= 8,
	IOPS_LOG_MASK	= 16,
	CLAT_HIST_LOG_MASK = 32,

	ALL_LOG_NR	= 6,
};

struct log_type {
	unsigned int mask;
	int (*fn)(struct thread_data *, int, bool);
};

static struct log_type log_types[] = {
	{
		.mask	= BW_LOG_MASK,
		.fn	= write_bandw_log,
	},
	{
		.mask	= LAT_LOG_MASK,
		.fn	= write_lat_log,
	},
	{
		.mask	= SLAT_LOG_MASK,
		.fn	= write_slat_log,
	},
	{
		.mask	= CLAT_LOG_MASK,
		.fn	= write_clat_log,
	},
	{
		.mask	= IOPS_LOG_MASK,
		.fn	= write_iops_log,
	},
	{
		.mask	= CLAT_HIST_LOG_MASK,
		.fn	= write_clat_hist_log,
	},
};
void td_writeout_logs(struct thread_data *td, bool unit_logs)
{
	unsigned int log_mask = 0;
	unsigned int log_left = ALL_LOG_NR;
	int old_state, i;

	old_state = td_bump_runstate(td, TD_FINISHING);

	finalize_logs(td, unit_logs);

	while (log_left) {
		int prev_log_left = log_left;

		for (i = 0; i < ALL_LOG_NR && log_left; i++) {
			struct log_type *lt = &log_types[i];
			int ret;

			if (!(log_mask & lt->mask)) {
				ret = lt->fn(td, log_left != 1, unit_logs);
				if (!ret) {
					log_left--;
					log_mask |= lt->mask;
				}
			}
		}

		if (prev_log_left == log_left)
			usleep(5000);
	}

	td_restore_runstate(td, old_state);
}
void fio_writeout_logs(bool unit_logs)
{
	struct thread_data *td;
	int i;

	for_each_td(td, i)
		td_writeout_logs(td, unit_logs);
}