/*
 * Code related to writing an iolog of what a thread is doing, and to
 * later read that back and replay
 */

static int iolog_flush(struct io_log *log);

static const char iolog_ver2[] = "fio version 2 iolog";

void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
{
	flist_add_tail(&ipo->list, &td->io_log_list);
	td->total_io_size += ipo->len;
}

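/*
 * Append one IO to the write iolog as "<file> <ddir> <offset> <length>",
 * e.g. "/dev/sdb write 4096 4096" (illustrative line, not a captured one),
 * which read_iolog2() can later parse back for replay.
 */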
void log_io_u(const struct thread_data *td, const struct io_u *io_u)
{
	if (!td->o.write_iolog_file)
		return;

	fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
						io_ddir_name(io_u->ddir),
						io_u->offset, io_u->buflen);
}

void log_file(struct thread_data *td, struct fio_file *f,
	      enum file_log_act what)
{
	const char *act[] = { "add", "open", "close" };

	assert(what < 3);

	if (!td->o.write_iolog_file)
		return;

	/*
	 * this happens on the pre-open/close done before the job starts
	 */
	if (!td->iolog_f)
		return;

	fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
}

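/*
 * Stall replay so that the recorded inter-IO delay is reproduced.
 * td->time_offset carries how much the previous sleep overshot, so that
 * the error does not accumulate over a long replay.
 */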
static void iolog_delay(struct thread_data *td, unsigned long delay)
{
	uint64_t usec = utime_since_now(&td->last_issue);
	unsigned long orig_delay = delay;
	uint64_t this_delay;
	struct timeval tv;

	if (delay < td->time_offset) {
		td->time_offset = 0;
		return;
	}

	delay -= td->time_offset;
	if (delay < usec)
		return;

	delay -= usec;

	fio_gettime(&tv, NULL);
	while (delay && !td->terminate) {
		this_delay = delay;
		if (this_delay > 500000)
			this_delay = 500000;

		usec_sleep(td, this_delay);
		delay -= this_delay;
	}

	usec = utime_since_now(&tv);
	if (usec > orig_delay)
		td->time_offset = usec - orig_delay;
	else
		td->time_offset = 0;
}

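/*
 * Handle an iolog entry that is a file directive rather than an IO:
 * returns 1 if the entry was consumed here, 0 for a normal IO entry,
 * and -1 if opening the file failed.
 */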
static int ipo_special(struct thread_data *td, struct io_piece *ipo)
{
	struct fio_file *f;
	int ret;

	/*
	 * Not a special entry, just a normal io
	 */
	if (ipo->ddir != DDIR_INVAL)
		return 0;

	f = td->files[ipo->fileno];

	switch (ipo->file_action) {
	case FIO_LOG_OPEN_FILE:
		if (td->o.replay_redirect && fio_file_open(f)) {
			dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
					f->file_name);
			break;
		}
		ret = td_io_open_file(td, f);
		if (!ret)
			break;
		td_verror(td, ret, "iolog open file");
		return -1;
	case FIO_LOG_CLOSE_FILE:
		td_io_close_file(td, f);
		break;
	case FIO_LOG_UNLINK_FILE:
		td_io_unlink_file(td, f);
		break;
	default:
		log_err("fio: bad file action %d\n", ipo->file_action);
		break;
	}

	return 1;
}

int read_iolog_get(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo;
	unsigned long elapsed;

	while (!flist_empty(&td->io_log_list)) {
		int ret;

		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);

		ret = ipo_special(td, ipo);
		if (ret < 0) {
			free(ipo);
			break;
		} else if (ret > 0) {
			free(ipo);
			continue;
		}

		io_u->ddir = ipo->ddir;
		if (ipo->ddir != DDIR_WAIT) {
			io_u->offset = ipo->offset;
			io_u->buflen = ipo->len;
			io_u->file = td->files[ipo->fileno];
			get_file(io_u->file);
			dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
						io_u->buflen, io_u->file->file_name);
			if (ipo->delay)
				iolog_delay(td, ipo->delay);
		} else {
			elapsed = mtime_since_genesis();
			if (ipo->delay > elapsed)
				usec_sleep(td, (ipo->delay - elapsed) * 1000);
		}

		free(ipo);

		if (io_u->ddir != DDIR_WAIT)
			return 0;
	}

	td->done = 1;
	return 1;
}

void prune_io_piece_log(struct thread_data *td)
{
	struct io_piece *ipo;
	struct rb_node *n;

	while ((n = rb_first(&td->io_hist_tree)) != NULL) {
		ipo = rb_entry(n, struct io_piece, rb_node);
		rb_erase(n, &td->io_hist_tree);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}

	while (!flist_empty(&td->io_hist_list)) {
		ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}
}

/*
 * log a successful write, so we can unwind the log for verify
 */
void log_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct rb_node **p, *parent;
	struct io_piece *ipo, *__ipo;

	ipo = malloc(sizeof(struct io_piece));
	init_ipo(ipo);
	ipo->file = io_u->file;
	ipo->offset = io_u->offset;
	ipo->len = io_u->buflen;
	ipo->numberio = io_u->numberio;
	ipo->flags = IP_F_IN_FLIGHT;

	io_u->ipo = ipo;

	if (io_u_should_trim(td, io_u)) {
		flist_add_tail(&ipo->trim_list, &td->trim_list);
		td->trim_entries++;
	}

	/*
	 * We don't need to sort the entries, if:
	 *
	 *	Sequential writes, or
	 *	Random writes that lay out the file as it goes along
	 *
	 * For both these cases, just reading back data in the order we
	 * wrote it out is the fastest.
	 *
	 * One exception is if we don't have a random map AND we are doing
	 * verifies, in that case we need to check for duplicate blocks and
	 * drop the old one, which we rely on the rb insert/lookup for
	 * handling.
	 */
	if (((!td->o.verifysort) || !td_random(td) || !td->o.overwrite) &&
	      (file_randommap(td, ipo->file) || td->o.verify == VERIFY_NONE)) {
		INIT_FLIST_HEAD(&ipo->list);
		flist_add_tail(&ipo->list, &td->io_hist_list);
		ipo->flags |= IP_F_ONLIST;
		td->io_hist_len++;
		return;
	}

	RB_CLEAR_NODE(&ipo->rb_node);

	/*
	 * Sort the entry into the verification list
	 */
restart:
	p = &td->io_hist_tree.rb_node;
	parent = NULL;
	while (*p) {
		int overlap = 0;
		parent = *p;

		__ipo = rb_entry(parent, struct io_piece, rb_node);
		if (ipo->file < __ipo->file)
			p = &(*p)->rb_left;
		else if (ipo->file > __ipo->file)
			p = &(*p)->rb_right;
		else if (ipo->offset < __ipo->offset) {
			p = &(*p)->rb_left;
			overlap = ipo->offset + ipo->len > __ipo->offset;
		} else if (ipo->offset > __ipo->offset) {
			p = &(*p)->rb_right;
			overlap = __ipo->offset + __ipo->len > ipo->offset;
		} else
			overlap = 1;

		if (overlap) {
			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
				__ipo->offset, __ipo->len,
				ipo->offset, ipo->len);
			td->io_hist_len--;
			rb_erase(parent, &td->io_hist_tree);
			remove_trim_entry(td, __ipo);
			free(__ipo);
			goto restart;
		}
	}

	rb_link_node(&ipo->rb_node, parent, p);
	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
	ipo->flags |= IP_F_ONRB;
	td->io_hist_len++;
}

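/*
 * Undo a previously logged IO piece, e.g. when the write or trim did not
 * complete successfully: record the failure in the block info state and
 * drop the piece from the verify list/tree again.
 */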
void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (td->ts.nr_block_infos) {
		uint32_t *info = io_u_block_info(td, io_u);
		if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
			if (io_u->ddir == DDIR_TRIM)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_TRIM_FAILURE);
			else if (io_u->ddir == DDIR_WRITE)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_WRITE_FAILURE);
		}
	}

	if (!ipo)
		return;

	if (ipo->flags & IP_F_ONRB)
		rb_erase(&ipo->rb_node, &td->io_hist_tree);
	else if (ipo->flags & IP_F_ONLIST)
		flist_del(&ipo->list);

	free(ipo);
	io_u->ipo = NULL;
	td->io_hist_len--;
}

void trim_io_piece(struct thread_data *td, const struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (!ipo)
		return;

	ipo->len = io_u->xfer_buflen - io_u->resid;
}

void write_iolog_close(struct thread_data *td)
{
	fflush(td->iolog_f);
	fclose(td->iolog_f);
	free(td->iolog_buf);
	td->iolog_f = NULL;
	td->iolog_buf = NULL;
}

/*
 * Read version 2 iolog data. It is enhanced to include per-file logging,
 * syncs, etc.
 */

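/*
 * A small, illustrative example of the format (not from a real run):
 *
 *	fio version 2 iolog
 *	/dev/sdb add
 *	/dev/sdb open
 *	/dev/sdb read 0 4096
 *	/dev/sdb write 4096 65536
 *	/dev/sdb close
 *
 * i.e. either "<file> <add|open|close>" for file actions, or
 * "<file> <read|write|sync|datasync|trim|wait> <offset> <length>" for IO.
 */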
static int read_iolog2(struct thread_data *td, FILE *f)
{
	unsigned long long offset;
	unsigned int bytes;
	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
	char *rfname, *fname, *act;
	char *str, *p;
	enum fio_ddir rw;

	free_release_files(td);

	/*
	 * Read in the read iolog and store it, reuse the infrastructure
	 * for doing verifications.
	 */
	str = malloc(4096);
	rfname = fname = malloc(256+16);
	act = malloc(256+16);

	reads = writes = waits = 0;
	while ((p = fgets(str, 4096, f)) != NULL) {
		struct io_piece *ipo;
		int r;

		r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
									&bytes);

		if (td->o.replay_redirect)
			fname = td->o.replay_redirect;

		if (r == 4) {
			/*
			 * Check action first
			 */
			if (!strcmp(act, "wait"))
				rw = DDIR_WAIT;
			else if (!strcmp(act, "read"))
				rw = DDIR_READ;
			else if (!strcmp(act, "write"))
				rw = DDIR_WRITE;
			else if (!strcmp(act, "sync"))
				rw = DDIR_SYNC;
			else if (!strcmp(act, "datasync"))
				rw = DDIR_DATASYNC;
			else if (!strcmp(act, "trim"))
				rw = DDIR_TRIM;
			else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
			fileno = get_fileno(td, fname);
		} else if (r == 2) {
			rw = DDIR_INVAL;
			if (!strcmp(act, "add")) {
				if (td->o.replay_redirect &&
				    get_fileno(td, fname) != -1) {
					dprint(FD_FILE, "iolog: ignoring"
						" re-add of file %s\n", fname);
				} else {
					fileno = add_file(td, fname, 0, 1);
					file_action = FIO_LOG_ADD_FILE;
				}
				continue;
			} else if (!strcmp(act, "open")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_OPEN_FILE;
			} else if (!strcmp(act, "close")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_CLOSE_FILE;
			} else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
		} else {
			log_err("bad iolog2: %s", p);
			continue;
		}

		if (rw == DDIR_READ)
			reads++;
		else if (rw == DDIR_WRITE) {
			/*
			 * Don't add a write for ro mode
			 */
			if (read_only)
				continue;
			writes++;
		} else if (rw == DDIR_WAIT) {
			if (td->o.no_stall)
				continue;
			waits++;
		} else if (rw == DDIR_INVAL) {
		} else if (!ddir_sync(rw)) {
			log_err("bad ddir: %d\n", rw);
			continue;
		}

		/*
		 * Make note of file
		 */
		ipo = malloc(sizeof(*ipo));
		init_ipo(ipo);
		ipo->ddir = rw;
		if (rw == DDIR_WAIT) {
			ipo->delay = offset;
		} else {
			ipo->offset = offset;
			ipo->len = bytes;
			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
				td->o.max_bs[rw] = bytes;
			ipo->fileno = fileno;
			ipo->file_action = file_action;
		}

		queue_io_piece(td, ipo);
	}

	free(str);
	free(act);
	free(rfname);

	if (writes && read_only) {
		log_err("fio: <%s> skips replay of %d writes due to"
			" read-only\n", td->o.name, writes);
		writes = 0;
	}

	if (!reads && !writes && !waits)
		return 1;
	else if (reads && !writes)
		td->o.td_ddir = TD_DDIR_READ;
	else if (!reads && writes)
		td->o.td_ddir = TD_DDIR_WRITE;
	else
		td->o.td_ddir = TD_DDIR_RW;

	return 0;
}

/*
 * open iolog, check version, and call appropriate parser
 */
static int init_iolog_read(struct thread_data *td)
{
	char buffer[256], *p;
	FILE *f;
	int ret;

	f = fopen(td->o.read_iolog_file, "r");
	if (!f) {
		perror("fopen read iolog");
		return 1;
	}

	p = fgets(buffer, sizeof(buffer), f);
	if (!p) {
		td_verror(td, errno, "iolog read");
		log_err("fio: unable to read iolog\n");
		fclose(f);
		return 1;
	}

	/*
	 * version 2 of the iolog stores a specific string as the
	 * first line, check for that
	 */
	if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
		ret = read_iolog2(td, f);
	else {
		log_err("fio: iolog version 1 is no longer supported\n");
		ret = 1;
	}

	fclose(f);
	return ret;
}

/*
 * Set up a log for storing io patterns.
 */
static int init_iolog_write(struct thread_data *td)
{
	struct fio_file *ff;
	FILE *f;
	unsigned int i;

	f = fopen(td->o.write_iolog_file, "a");
	if (!f) {
		perror("fopen write iolog");
		return 1;
	}

	/*
	 * That's it for writing, setup a log buffer and we're done.
	 */
	td->iolog_f = f;
	td->iolog_buf = malloc(8192);
	setvbuf(f, td->iolog_buf, _IOFBF, 8192);

	/*
	 * write our version line
	 */
	if (fprintf(f, "%s\n", iolog_ver2) < 0) {
		perror("iolog init");
		return 1;
	}

	/*
	 * add all known files
	 */
	for_each_file(td, ff, i)
		log_file(td, ff, FIO_LOG_ADD_FILE);

	return 0;
}

int init_iolog(struct thread_data *td)
{
	int ret = 0;

	if (td->o.read_iolog_file) {
		int need_swap;

		/*
		 * Check if it's a blktrace file and load that if possible.
		 * Otherwise assume it's a normal log file and load that.
		 */
		if (is_blktrace(td->o.read_iolog_file, &need_swap))
			ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
		else
			ret = init_iolog_read(td);
	} else if (td->o.write_iolog_file)
		ret = init_iolog_write(td);

	if (ret)
		td_verror(td, EINVAL, "failed initializing iolog");

	return ret;
}

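/*
 * A sketch of a typical caller (hypothetical values; fio's init code does
 * the equivalent when options like write_bw_log are given):
 *
 *	struct log_params p = {
 *		.td = td,
 *		.avg_msec = td->o.log_avg_msec,
 *		.log_type = IO_LOG_TYPE_BW,
 *		.log_offset = td->o.log_offset,
 *		.log_gz = td->o.log_gz,
 *		.log_gz_store = td->o.log_gz_store,
 *	};
 *
 *	setup_log(&td->bw_log, &p, "jobname_bw.log");
 */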
void setup_log(struct io_log **log, struct log_params *p,
	       const char *filename)
{
	struct io_log *l;
	int i;
	struct io_u_plat_entry *entry;
	struct flist_head *list;

	l = scalloc(1, sizeof(*l));
	INIT_FLIST_HEAD(&l->io_logs);
	l->log_type = p->log_type;
	l->log_offset = p->log_offset;
	l->log_gz = p->log_gz;
	l->log_gz_store = p->log_gz_store;
	l->avg_msec = p->avg_msec;
	l->hist_msec = p->hist_msec;
	l->hist_coarseness = p->hist_coarseness;
	l->filename = strdup(filename);
	l->td = p->td;

	/* Initialize histogram lists for each r/w direction,
	 * with initial io_u_plat of all zeros:
	 */
	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
		list = &l->hist_window[i].list;
		INIT_FLIST_HEAD(list);
		entry = calloc(1, sizeof(struct io_u_plat_entry));
		flist_add(&entry->list, list);
	}

	if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
		struct io_logs *__p;

		__p = calloc(1, sizeof(*l->pending));
		__p->max_samples = DEF_LOG_ENTRIES;
		__p->log = calloc(__p->max_samples, log_entry_sz(l));
		l->pending = __p;
	}

	if (l->log_offset)
		l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;

	INIT_FLIST_HEAD(&l->chunk_list);

	if (l->log_gz && !p->td)
		l->log_gz = 0;
	else if (l->log_gz || l->log_gz_store) {
		mutex_init_pshared(&l->chunk_lock);
		p->td->flags |= TD_F_COMPRESS_LOG;
	}

	*log = l;
}

#ifdef CONFIG_SETVBUF
static void *set_file_buffer(FILE *f)
{
	size_t size = 1048576;
	void *buf;

	buf = malloc(size);
	setvbuf(f, buf, _IOFBF, size);
	return buf;
}

static void clear_file_buffer(void *buf)
{
	free(buf);
}
#else
static void *set_file_buffer(FILE *f)
{
	return NULL;
}

static void clear_file_buffer(void *buf)
{
}
#endif

void free_log(struct io_log *log)
{
	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);
		free(cur_log->log);
		sfree(cur_log);
	}

	if (log->pending) {
		free(log->pending->log);
		free(log->pending);
		log->pending = NULL;
	}

	free(log->filename);
	sfree(log);
}

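/*
 * Sum a group of adjacent histogram bins: with hist_coarseness c, each
 * output bin covers stride = 1 << c input bins (e.g. c = 2 folds every 4
 * of the FIO_IO_U_PLAT_NR bins into one). If io_u_plat_last is given,
 * only the delta since that previous window is counted.
 */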
inline unsigned long hist_sum(int j, int stride, unsigned int *io_u_plat,
			      unsigned int *io_u_plat_last)
{
	unsigned long sum;
	int k;

	if (io_u_plat_last) {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k] - io_u_plat_last[j + k];
	} else {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k];
	}

	return sum;
}

static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
			       uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, j, nr_samples;
	struct io_u_plat_entry *entry, *entry_before;
	unsigned int *io_u_plat;
	unsigned int *io_u_plat_before;

	int stride = 1 << hist_coarseness;

	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		entry = s->data.plat_entry;
		io_u_plat = entry->io_u_plat;

		entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
		io_u_plat_before = entry_before->io_u_plat;

		fprintf(f, "%lu, %u, %u, ", (unsigned long) s->time,
						io_sample_ddir(s), s->bs);
		for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
			fprintf(f, "%lu, ", hist_sum(j, stride, io_u_plat,
						io_u_plat_before));
		}
		fprintf(f, "%lu\n", (unsigned long)
			hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
					io_u_plat_before));

		flist_del(&entry_before->list);
		free(entry_before);
	}
}

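/*
 * Plain text output is one "time, value, ddir, bs" line per sample, with
 * a trailing offset column when offset logging is enabled, e.g.
 * (illustrative):
 *
 *	1000, 524288, 1, 4096, 1048576
 */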
void flush_samples(FILE *f, void *samples, uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, nr_samples;

	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		if (!log_offset) {
			fprintf(f, "%lu, %" PRId64 ", %u, %u\n",
					(unsigned long) s->time,
					s->data.val,
					io_sample_ddir(s), s->bs);
		} else {
			struct io_sample_offset *so = (void *) s;

			fprintf(f, "%lu, %" PRId64 ", %u, %u, %llu\n",
					(unsigned long) s->time,
					s->data.val,
					io_sample_ddir(s), s->bs,
					(unsigned long long) so->offset);
		}
	}
}

#ifdef CONFIG_ZLIB

struct iolog_flush_data {
	struct workqueue_work work;
	struct io_log *log;
	void *samples;
	uint32_t nr_samples;
	bool free;
};

#define GZ_CHUNK	131072

static struct iolog_compress *get_new_chunk(unsigned int seq)
{
	struct iolog_compress *c;

	c = malloc(sizeof(*c));
	INIT_FLIST_HEAD(&c->list);
	c->buf = malloc(GZ_CHUNK);
	c->len = 0;
	c->seq = seq;
	return c;
}

static void free_chunk(struct iolog_compress *ic)
{
	free(ic->buf);
	free(ic);
}

static int z_stream_init(z_stream *stream, int gz_hdr)
{
	int wbits = 15;

	memset(stream, 0, sizeof(*stream));
	stream->zalloc = Z_NULL;
	stream->zfree = Z_NULL;
	stream->opaque = Z_NULL;
	stream->next_in = Z_NULL;

	/*
	 * zlib magic - add 32 for auto-detection of gz header or not,
	 * if we decide to store files in a gzip friendly format.
	 */
	if (gz_hdr)
		wbits += 32;

	if (inflateInit2(stream, wbits) != Z_OK)
		return 1;

	return 0;
}

struct inflate_chunk_iter {
	unsigned int seq;
	int err;
	void *buf;
	size_t buf_size;
	size_t buf_used;
	size_t chunk_sz;
};

static void finish_chunk(z_stream *stream, FILE *f,
			 struct inflate_chunk_iter *iter)
{
	int ret;

	ret = inflateEnd(stream);
	if (ret != Z_OK)
		log_err("fio: failed to end log inflation seq %d (%d)\n",
				iter->seq, ret);

	flush_samples(f, iter->buf, iter->buf_used);
	free(iter->buf);
	iter->buf = NULL;
	iter->buf_size = iter->buf_used = 0;
}

/*
 * Iterative chunk inflation. Handles cases where we cross into a new
 * sequence, doing flush finish of previous chunk if needed.
 */
static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
			    z_stream *stream, struct inflate_chunk_iter *iter)
{
	size_t ret;

	dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
				(unsigned long) ic->len, ic->seq);

	if (ic->seq != iter->seq) {
		if (iter->seq)
			finish_chunk(stream, f, iter);

		z_stream_init(stream, gz_hdr);
		iter->seq = ic->seq;
	}

	stream->avail_in = ic->len;
	stream->next_in = ic->buf;

	if (!iter->buf_size) {
		iter->buf_size = iter->chunk_sz;
		iter->buf = malloc(iter->buf_size);
	}

	while (stream->avail_in) {
		size_t this_out = iter->buf_size - iter->buf_used;
		int err;

		stream->avail_out = this_out;
		stream->next_out = iter->buf + iter->buf_used;

		err = inflate(stream, Z_NO_FLUSH);
		if (err < 0) {
			log_err("fio: failed inflating log: %d\n", err);
			iter->err = err;
			break;
		}

		iter->buf_used += this_out - stream->avail_out;

		if (!stream->avail_out) {
			iter->buf_size += iter->chunk_sz;
			iter->buf = realloc(iter->buf, iter->buf_size);
			continue;
		}

		if (err == Z_STREAM_END)
			break;
	}

	ret = (void *) stream->next_in - ic->buf;

	dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);

	return ret;
}

/*
 * Inflate stored compressed chunks, or write them directly to the log
 * file if so instructed.
 */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
	z_stream stream;

	while (!flist_empty(&log->chunk_list)) {
		struct iolog_compress *ic;

		ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
		flist_del(&ic->list);

		if (log->log_gz_store) {
			size_t ret;

			dprint(FD_COMPRESS, "log write chunk size=%lu, "
				"seq=%u\n", (unsigned long) ic->len, ic->seq);

			ret = fwrite(ic->buf, ic->len, 1, f);
			if (ret != 1 || ferror(f)) {
				iter.err = errno;
				log_err("fio: error writing compressed log\n");
			}
		} else
			inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);

		free_chunk(ic);
	}

	if (iter.seq) {
		finish_chunk(&stream, f, &iter);
		free(iter.buf);
	}

	return iter.err;
}

/*
 * Open compressed log file and decompress the stored chunks and
 * write them to stdout. The chunks are stored sequentially in the
 * file, so we iterate over them and do them one-by-one.
 */

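/*
 * This backs fio's --inflate-log command line option, which dumps a
 * stored compressed log as plain text samples on stdout.
 */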
int iolog_file_inflate(const char *file)
{
	struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
	struct iolog_compress ic;
	z_stream stream;
	struct stat sb;
	ssize_t ret;
	size_t total;
	void *buf;
	FILE *f;

	f = fopen(file, "r");
	if (!f) {
		perror("fopen");
		return 1;
	}

	if (stat(file, &sb) < 0) {
		fclose(f);
		perror("stat");
		return 1;
	}

	ic.buf = buf = malloc(sb.st_size);
	ic.len = sb.st_size;
	ic.seq = 1;

	ret = fread(ic.buf, ic.len, 1, f);
	if (ret == 0 && ferror(f)) {
		perror("fread");
		fclose(f);
		free(buf);
		return 1;
	} else if (ret != 1) {
		log_err("fio: short read on reading log\n");
		fclose(f);
		free(buf);
		return 1;
	}

	fclose(f);

	/*
	 * Each chunk will return Z_STREAM_END. We don't know how many
	 * chunks are in the file, so we just keep looping and incrementing
	 * the sequence number until we have consumed the whole compressed
	 * file.
	 */
	total = ic.len;
	do {
		size_t iret;

		iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
		total -= iret;
		if (!total)
			break;
		if (iter.err)
			break;

		ic.seq++;
		ic.len -= iret;
		ic.buf += iret;
	} while (1);

	if (iter.seq) {
		finish_chunk(&stream, stdout, &iter);
		free(iter.buf);
	}

	free(buf);
	return iter.err;
}

#else

static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	return 0;
}

int iolog_file_inflate(const char *file)
{
	log_err("fio: log inflation not possible without zlib\n");
	return 1;
}

#endif

void flush_log(struct io_log *log, bool do_append)
{
	void *buf;
	FILE *f;

	if (!do_append)
		f = fopen(log->filename, "w");
	else
		f = fopen(log->filename, "a");
	if (!f) {
		perror("fopen log");
		return;
	}

	buf = set_file_buffer(f);

	inflate_gz_chunks(log, f);

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		if (log->td && log == log->td->clat_hist_log)
			flush_hist_samples(f, log->hist_coarseness, cur_log->log,
					   log_sample_sz(log, cur_log));
		else
			flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));

		sfree(cur_log);
	}

	fclose(f);
	clear_file_buffer(buf);
}

static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
{
	if (td->flags & TD_F_COMPRESS_LOG)
		iolog_flush(log);

	if (trylock) {
		if (fio_trylock_file(log->filename))
			return 1;
	} else
		fio_lock_file(log->filename);

	if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
		fio_send_iolog(td, log, log->filename);
	else
		flush_log(log, !td->o.per_job_logs);

	fio_unlock_file(log->filename);
	free_log(log);
	return 0;
}

size_t log_chunk_sizes(struct io_log *log)
{
	struct flist_head *entry;
	size_t ret;

	if (flist_empty(&log->chunk_list))
		return 0;

	ret = 0;
	pthread_mutex_lock(&log->chunk_lock);
	flist_for_each(entry, &log->chunk_list) {
		struct iolog_compress *c;

		c = flist_entry(entry, struct iolog_compress, list);
		ret += c->len;
	}
	pthread_mutex_unlock(&log->chunk_lock);
	return ret;
}

#ifdef CONFIG_ZLIB

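/*
 * Deflate the samples referenced by @data into GZ_CHUNK sized chunks and
 * splice them onto the owning log's chunk list. Returns 0 on success.
 */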
static int gz_work(struct iolog_flush_data *data)
{
	struct iolog_compress *c = NULL;
	struct flist_head list;
	unsigned int seq;
	z_stream stream;
	size_t total = 0;
	int ret;

	INIT_FLIST_HEAD(&list);

	memset(&stream, 0, sizeof(stream));
	stream.zalloc = Z_NULL;
	stream.zfree = Z_NULL;
	stream.opaque = Z_NULL;

	ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
	if (ret != Z_OK) {
		log_err("fio: failed to init gz stream\n");
		goto err;
	}

	seq = ++data->log->chunk_seq;

	stream.next_in = (void *) data->samples;
	stream.avail_in = data->nr_samples * log_entry_sz(data->log);

	dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
				(unsigned long) stream.avail_in, seq,
				data->log->filename);
	do {
		if (c)
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		c = get_new_chunk(seq);
		stream.avail_out = GZ_CHUNK;
		stream.next_out = c->buf;
		ret = deflate(&stream, Z_NO_FLUSH);
		if (ret < 0) {
			log_err("fio: deflate log (%d)\n", ret);
			free_chunk(c);
			goto err;
		}

		c->len = GZ_CHUNK - stream.avail_out;
		flist_add_tail(&c->list, &list);
		total += c->len;
	} while (stream.avail_in);

	stream.next_out = c->buf + c->len;
	stream.avail_out = GZ_CHUNK - c->len;

	ret = deflate(&stream, Z_FINISH);
	if (ret < 0) {
		/*
		 * Z_BUF_ERROR is special, it just means we need more
		 * output space. We'll handle that below. Treat any other
		 * error as fatal.
		 */
		if (ret != Z_BUF_ERROR) {
			log_err("fio: deflate log (%d)\n", ret);
			flist_del(&c->list);
			free_chunk(c);
			goto err;
		}
	}

	total -= c->len;
	c->len = GZ_CHUNK - stream.avail_out;
	total += c->len;
	dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);

	if (ret != Z_STREAM_END) {
		do {
			c = get_new_chunk(seq);
			stream.avail_out = GZ_CHUNK;
			stream.next_out = c->buf;
			ret = deflate(&stream, Z_FINISH);
			c->len = GZ_CHUNK - stream.avail_out;
			total += c->len;
			flist_add_tail(&c->list, &list);
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		} while (ret != Z_STREAM_END);
	}

	dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);

	ret = deflateEnd(&stream);
	if (ret != Z_OK)
		log_err("fio: deflateEnd %d\n", ret);

	free(data->samples);

	if (!flist_empty(&list)) {
		pthread_mutex_lock(&data->log->chunk_lock);
		flist_splice_tail(&list, &data->log->chunk_list);
		pthread_mutex_unlock(&data->log->chunk_lock);
	}

	ret = 0;
done:
	if (data->free)
		sfree(data);
	return ret;
err:
	while (!flist_empty(&list)) {
		c = flist_first_entry(&list, struct iolog_compress, list);
		flist_del(&c->list);
		free_chunk(c);
	}
	ret = 1;
	goto done;
}

/*
 * Invoked from our compress helper thread, when logging would have exceeded
 * the specified memory limitation. Compresses the previously stored
 * entries.
 */
static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
{
	return gz_work(container_of(work, struct iolog_flush_data, work));
}

static int gz_init_worker(struct submit_worker *sw)
{
	struct thread_data *td = sw->wq->td;

	if (!fio_option_is_set(&td->o, log_gz_cpumask))
		return 0;

	if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
		log_err("gz: failed to set CPU affinity\n");
		return 1;
	}

	return 0;
}

static struct workqueue_ops log_compress_wq_ops = {
	.fn		= gz_work_async,
	.init_worker_fn	= gz_init_worker,
	.nice		= 1,
};

int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return 0;

	workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return;

	workqueue_exit(&td->log_compress_wq);
}

/*
 * Compress the existing log entries. We reset the current log to a small
 * size, and reference the existing log in the data that we hand to the
 * compressor. Once compression has been done, this old log is freed.
 */
static int iolog_flush(struct io_log *log)
{
	struct iolog_flush_data *data;

	data = malloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;
	data->free = false;

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		data->samples = cur_log->log;
		data->nr_samples = cur_log->nr_samples;

		sfree(cur_log);

		gz_work(data);
	}

	free(data);
	return 0;
}

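/*
 * Hand the current set of samples off to the compression workqueue,
 * resetting cur_log so that logging can continue into a fresh buffer
 * while the old samples are compressed in the background.
 */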
int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	struct iolog_flush_data *data;

	data = malloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;
	data->samples = cur_log->log;
	data->nr_samples = cur_log->nr_samples;
	data->free = true;

	cur_log->nr_samples = cur_log->max_samples = 0;
	cur_log->log = NULL;

	workqueue_enqueue(&log->td->log_compress_wq, &data->work);
	return 0;
}

#else

static int iolog_flush(struct io_log *log)
{
	return 1;
}

int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	return 1;
}

int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
}

#endif

struct io_logs *iolog_cur_log(struct io_log *log)
{
	if (flist_empty(&log->io_logs))
		return NULL;

	return flist_last_entry(&log->io_logs, struct io_logs, list);
}

uint64_t iolog_nr_samples(struct io_log *iolog)
{
	struct flist_head *entry;
	uint64_t ret = 0;

	flist_for_each(entry, &iolog->io_logs) {
		struct io_logs *cur_log;

		cur_log = flist_entry(entry, struct io_logs, list);
		ret += cur_log->nr_samples;
	}

	return ret;
}

static int __write_log(struct thread_data *td, struct io_log *log, int try)
{
	if (log)
		return finish_log(td, log, try);

	return 0;
}

static int write_iops_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->iops_log) != unit_log)
		return 0;

	ret = __write_log(td, td->iops_log, try);
	if (!ret)
		td->iops_log = NULL;

	return ret;
}

static int write_slat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->slat_log, try);
	if (!ret)
		td->slat_log = NULL;

	return ret;
}

static int write_clat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_log, try);
	if (!ret)
		td->clat_log = NULL;

	return ret;
}

static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_hist_log, try);
	if (!ret)
		td->clat_hist_log = NULL;

	return ret;
}

static int write_lat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->lat_log, try);
	if (!ret)
		td->lat_log = NULL;

	return ret;
}

static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->bw_log) != unit_log)
		return 0;

	ret = __write_log(td, td->bw_log, try);
	if (!ret)
		td->bw_log = NULL;

	return ret;
}

enum {
	BW_LOG_MASK	= 1,
	LAT_LOG_MASK	= 2,
	SLAT_LOG_MASK	= 4,
	CLAT_LOG_MASK	= 8,
	IOPS_LOG_MASK	= 16,
	CLAT_HIST_LOG_MASK = 32,

	ALL_LOG_NR	= 6,
};

struct log_type {
	unsigned int mask;
	int (*fn)(struct thread_data *, int, bool);
};

static struct log_type log_types[] = {
	{
		.mask	= BW_LOG_MASK,
		.fn	= write_bandw_log,
	},
	{
		.mask	= LAT_LOG_MASK,
		.fn	= write_lat_log,
	},
	{
		.mask	= SLAT_LOG_MASK,
		.fn	= write_slat_log,
	},
	{
		.mask	= CLAT_LOG_MASK,
		.fn	= write_clat_log,
	},
	{
		.mask	= IOPS_LOG_MASK,
		.fn	= write_iops_log,
	},
	{
		.mask	= CLAT_HIST_LOG_MASK,
		.fn	= write_clat_hist_log,
	},
};

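/*
 * Write out all of this thread's logs. While more than one log remains,
 * each writer only trylocks its log file (try == true) and we sleep
 * briefly whenever a pass makes no progress; the final remaining log is
 * allowed to block on the file lock.
 */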
void td_writeout_logs(struct thread_data *td, bool unit_logs)
{
	unsigned int log_mask = 0;
	unsigned int log_left = ALL_LOG_NR;
	int old_state, i;

	old_state = td_bump_runstate(td, TD_FINISHING);

	finalize_logs(td, unit_logs);

	while (log_left) {
		int prev_log_left = log_left;

		for (i = 0; i < ALL_LOG_NR && log_left; i++) {
			struct log_type *lt = &log_types[i];
			int ret;

			if (!(log_mask & lt->mask)) {
				ret = lt->fn(td, log_left != 1, unit_logs);
				if (!ret) {
					log_left--;
					log_mask |= lt->mask;
				}
			}
		}

		if (prev_log_left == log_left)
			usleep(5000);
	}

	td_restore_runstate(td, old_state);
}

void fio_writeout_logs(bool unit_logs)
{
	struct thread_data *td;
	int i;

	for_each_td(td, i)
		td_writeout_logs(td, unit_logs);
}