/*
 * Code related to writing an iolog of what a thread is doing, and to
 * later read that back and replay
 */
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#ifdef CONFIG_ZLIB
#include <zlib.h>
#endif

#include "flist.h"
#include "fio.h"
#include "trim.h"
#include "filelock.h"
#include "smalloc.h"
#include "blktrace.h"
#include "pshared.h"

static int iolog_flush(struct io_log *log);

static const char iolog_ver2[] = "fio version 2 iolog";

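/*
 * For reference, a version 2 iolog as emitted by init_iolog_write(),
 * log_file() and log_io_u() below looks like this (file name, offsets
 * and lengths here are illustrative):
 *
 *	fio version 2 iolog
 *	/dev/sdb add
 *	/dev/sdb open
 *	/dev/sdb write 0 4096
 *	/dev/sdb read 4096 4096
 *	/dev/sdb close
 *
 * Data lines are "<file> <action> [<offset> <length>]", which is what
 * read_iolog2() parses back in for replay.
 */
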
void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
{
	flist_add_tail(&ipo->list, &td->io_log_list);
	td->total_io_size += ipo->len;
}

void log_io_u(const struct thread_data *td, const struct io_u *io_u)
{
	if (!td->o.write_iolog_file)
		return;

	fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
						io_ddir_name(io_u->ddir),
						io_u->offset, io_u->buflen);
}

void log_file(struct thread_data *td, struct fio_file *f,
	      enum file_log_act what)
{
	const char *act[] = { "add", "open", "close" };

	assert(what < 3);

	if (!td->o.write_iolog_file)
		return;

	/*
	 * this happens on the pre-open/close done before the job starts
	 */
	if (!td->iolog_f)
		return;

	fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
}

static void iolog_delay(struct thread_data *td, unsigned long delay)
{
	uint64_t usec = utime_since_now(&td->last_issue);
	unsigned long orig_delay = delay;
	uint64_t this_delay;
	struct timespec ts;

	if (delay < td->time_offset) {
		td->time_offset = 0;
		return;
	}

	delay -= td->time_offset;
	if (delay < usec)
		return;

	delay -= usec;

	fio_gettime(&ts, NULL);
	while (delay && !td->terminate) {
		this_delay = delay;
		if (this_delay > 500000)
			this_delay = 500000;

		usec_sleep(td, this_delay);
		delay -= this_delay;
	}

	usec = utime_since_now(&ts);
	if (usec > orig_delay)
		td->time_offset = usec - orig_delay;
	else
		td->time_offset = 0;
}

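/*
 * Handle a "special" io_piece: entries with ddir == DDIR_INVAL describe
 * file actions (open/close/unlink) rather than I/O. Returns 0 if the
 * piece is regular I/O the caller should issue, 1 if a file action was
 * consumed here, and -1 on error.
 */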
static int ipo_special(struct thread_data *td, struct io_piece *ipo)
{
	struct fio_file *f;
	int ret;

	/*
	 * Not a special ipo
	 */
	if (ipo->ddir != DDIR_INVAL)
		return 0;

	f = td->files[ipo->fileno];

	switch (ipo->file_action) {
	case FIO_LOG_OPEN_FILE:
		if (td->o.replay_redirect && fio_file_open(f)) {
			dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
					f->file_name);
			break;
		}
		ret = td_io_open_file(td, f);
		if (!ret)
			break;
		td_verror(td, ret, "iolog open file");
		return -1;
	case FIO_LOG_CLOSE_FILE:
		td_io_close_file(td, f);
		break;
	case FIO_LOG_UNLINK_FILE:
		td_io_unlink_file(td, f);
		break;
	default:
		log_err("fio: bad file action %d\n", ipo->file_action);
		break;
	}

	return 1;
}

static bool read_iolog2(struct thread_data *td);

int read_iolog_get(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo;
	unsigned long elapsed;

	while (!flist_empty(&td->io_log_list)) {
		int ret = 0;

		if (td->o.read_iolog_chunked) {
			if (td->io_log_checkmark == td->io_log_current) {
				if (!read_iolog2(td))
					return 1;
			}
			td->io_log_current--;
		}
		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);

		ret = ipo_special(td, ipo);
		if (ret < 0) {
			free(ipo);
			break;
		} else if (ret > 0) {
			free(ipo);
			continue;
		}

		io_u->ddir = ipo->ddir;
		if (ipo->ddir != DDIR_WAIT) {
			io_u->offset = ipo->offset;
			io_u->buflen = ipo->len;
			io_u->file = td->files[ipo->fileno];
			get_file(io_u->file);
			dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
						io_u->buflen, io_u->file->file_name);
			if (ipo->delay)
				iolog_delay(td, ipo->delay);
		} else {
			elapsed = mtime_since_genesis();
			if (ipo->delay > elapsed)
				usec_sleep(td, (ipo->delay - elapsed) * 1000);
		}

		free(ipo);

		if (io_u->ddir != DDIR_WAIT)
			return 0;
	}

	td->done = 1;

	return 1;
}

void prune_io_piece_log(struct thread_data *td)
{
	struct io_piece *ipo;
	struct fio_rb_node *n;

	while ((n = rb_first(&td->io_hist_tree)) != NULL) {
		ipo = rb_entry(n, struct io_piece, rb_node);
		rb_erase(n, &td->io_hist_tree);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}

	while (!flist_empty(&td->io_hist_list)) {
		ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}
}

/*
 * log a successful write, so we can unwind the log for verify
 */
void log_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct fio_rb_node **p, *parent;
	struct io_piece *ipo, *__ipo;

	ipo = calloc(1, sizeof(struct io_piece));
	init_ipo(ipo);
	ipo->file = io_u->file;
	ipo->offset = io_u->offset;
	ipo->len = io_u->buflen;
	ipo->numberio = io_u->numberio;
	ipo->flags = IP_F_IN_FLIGHT;

	io_u->ipo = ipo;

	if (io_u_should_trim(td, io_u)) {
		flist_add_tail(&ipo->trim_list, &td->trim_list);
		td->trim_entries++;
	}

	/*
	 * Only sort writes if we don't have a random map in which case we need
	 * to check for duplicate blocks and drop the old one, which we rely on
	 * the rb insert/lookup for handling.
	 */
	if (file_randommap(td, ipo->file)) {
		INIT_FLIST_HEAD(&ipo->list);
		flist_add_tail(&ipo->list, &td->io_hist_list);
		ipo->flags |= IP_F_ONLIST;
		td->io_hist_len++;
		return;
	}

	RB_CLEAR_NODE(&ipo->rb_node);

	/*
	 * Sort the entry into the verification list
	 */
restart:
	p = &td->io_hist_tree.rb_node;
	parent = NULL;
	while (*p) {
		int overlap = 0;
		parent = *p;

		__ipo = rb_entry(parent, struct io_piece, rb_node);
		if (ipo->file < __ipo->file)
			p = &(*p)->rb_left;
		else if (ipo->file > __ipo->file)
			p = &(*p)->rb_right;
		else if (ipo->offset < __ipo->offset) {
			p = &(*p)->rb_left;
			overlap = ipo->offset + ipo->len > __ipo->offset;
		}
		else if (ipo->offset > __ipo->offset) {
			p = &(*p)->rb_right;
			overlap = __ipo->offset + __ipo->len > ipo->offset;
		}
		else
			overlap = 1;

		if (overlap) {
			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
				__ipo->offset, __ipo->len,
				ipo->offset, ipo->len);
			td->io_hist_len--;
			rb_erase(parent, &td->io_hist_tree);
			remove_trim_entry(td, __ipo);
			if (!(__ipo->flags & IP_F_IN_FLIGHT))
				free(__ipo);
			goto restart;
		}
	}

	rb_link_node(&ipo->rb_node, parent, p);
	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
	ipo->flags |= IP_F_ONRB;
	td->io_hist_len++;
}

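/*
 * Worked example for the overlap checks above: an existing entry with
 * offset 4096 and len 8192 covers bytes [4096, 12288), so a new entry
 * at offset 8192 overlaps it (4096 + 8192 > 8192) and evicts it. This
 * way the verify pass only ever sees the most recent write for any
 * given block range.
 */
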
void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (td->ts.nr_block_infos) {
		uint32_t *info = io_u_block_info(td, io_u);
		if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
			if (io_u->ddir == DDIR_TRIM)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_TRIM_FAILURE);
			else if (io_u->ddir == DDIR_WRITE)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_WRITE_FAILURE);
		}
	}

	if (!ipo)
		return;

	if (ipo->flags & IP_F_ONRB)
		rb_erase(&ipo->rb_node, &td->io_hist_tree);
	else if (ipo->flags & IP_F_ONLIST)
		flist_del(&ipo->list);

	io_u->ipo = NULL;
	td->io_hist_len--;
	free(ipo);
}

void trim_io_piece(const struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (!ipo)
		return;

	ipo->len = io_u->xfer_buflen - io_u->resid;
}

void write_iolog_close(struct thread_data *td)
{
	fflush(td->iolog_f);
	fclose(td->iolog_f);
	free(td->iolog_buf);
	td->iolog_f = NULL;
	td->iolog_buf = NULL;
}

/*
 * Read version 2 iolog data. It is enhanced to include per-file logging,
 * syncs, etc.
 */
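/*
 * With read_iolog_chunked set, entries are pulled in batches instead of
 * loading the whole file up front: the watermark logic below estimates,
 * from the consumption rate since the last refill, how many entries are
 * needed for roughly the next second, and read_iolog_get() triggers the
 * next refill once consumption reaches io_log_checkmark (half of the
 * high watermark).
 */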
static bool read_iolog2(struct thread_data *td)
{
	unsigned long long offset;
	unsigned int bytes;
	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
	char *rfname, *fname, *act;
	char *str, *p;
	enum fio_ddir rw;
	int64_t items_to_fetch = 0;

	if (td->o.read_iolog_chunked) {
		if (td->io_log_highmark == 0) {
			items_to_fetch = 10;
		} else {
			struct timespec now;
			uint64_t elapsed;
			uint64_t for_1s;

			fio_gettime(&now, NULL);
			elapsed = ntime_since(&td->io_log_highmark_time, &now);
			for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
			items_to_fetch = for_1s - td->io_log_current;
			if (items_to_fetch < 0)
				items_to_fetch = 0;
			td->io_log_highmark = td->io_log_current + items_to_fetch;
			td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
			fio_gettime(&td->io_log_highmark_time, NULL);

			if (items_to_fetch == 0)
				return true;
		}
	}
	/*
	 * Read in the read iolog and store it, reuse the infrastructure
	 * for doing verifications.
	 */
	str = malloc(4096);
	rfname = fname = malloc(256+16);
	act = malloc(256+16);

	reads = writes = waits = 0;
	while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
		struct io_piece *ipo;
		int r;

		r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
									&bytes);

		if (td->o.replay_redirect)
			fname = td->o.replay_redirect;

		if (r == 4) {
			/*
			 * Check action first
			 */
			if (!strcmp(act, "wait"))
				rw = DDIR_WAIT;
			else if (!strcmp(act, "read"))
				rw = DDIR_READ;
			else if (!strcmp(act, "write"))
				rw = DDIR_WRITE;
			else if (!strcmp(act, "sync"))
				rw = DDIR_SYNC;
			else if (!strcmp(act, "datasync"))
				rw = DDIR_DATASYNC;
			else if (!strcmp(act, "trim"))
				rw = DDIR_TRIM;
			else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
			fileno = get_fileno(td, fname);
		} else if (r == 2) {
			rw = DDIR_INVAL;
			if (!strcmp(act, "add")) {
				if (td->o.replay_redirect &&
				    get_fileno(td, fname) != -1) {
					dprint(FD_FILE, "iolog: ignoring"
						" re-add of file %s\n", fname);
				} else {
					fileno = add_file(td, fname, 0, 1);
					file_action = FIO_LOG_ADD_FILE;
				}
				continue;
			} else if (!strcmp(act, "open")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_OPEN_FILE;
			} else if (!strcmp(act, "close")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_CLOSE_FILE;
			} else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
		} else {
			log_err("bad iolog2: %s\n", p);
			continue;
		}

		if (rw == DDIR_READ)
			reads++;
		else if (rw == DDIR_WRITE) {
			/*
			 * Don't add a write for ro mode
			 */
			if (read_only)
				continue;
			writes++;
		} else if (rw == DDIR_WAIT) {
			if (td->o.no_stall)
				continue;
			waits++;
		} else if (rw == DDIR_INVAL) {
		} else if (!ddir_sync(rw)) {
			log_err("bad ddir: %d\n", rw);
			continue;
		}

		/*
		 * Make note of file
		 */
		ipo = calloc(1, sizeof(*ipo));
		init_ipo(ipo);
		ipo->ddir = rw;
		if (rw == DDIR_WAIT) {
			ipo->delay = offset;
		} else {
			if (td->o.replay_scale)
				ipo->offset = offset / td->o.replay_scale;
			else
				ipo->offset = offset;
			ipo_bytes_align(td->o.replay_align, ipo);

			ipo->len = bytes;
			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
				td->o.max_bs[rw] = bytes;
			ipo->fileno = fileno;
			ipo->file_action = file_action;
			td->o.size += bytes;
		}

		queue_io_piece(td, ipo);

		if (td->o.read_iolog_chunked) {
			td->io_log_current++;
			items_to_fetch--;
			if (items_to_fetch == 0)
				break;
		}
	}

	free(str);
	free(act);
	free(rfname);

	if (td->o.read_iolog_chunked) {
		td->io_log_highmark = td->io_log_current;
		td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
		fio_gettime(&td->io_log_highmark_time, NULL);
	}

	if (writes && read_only) {
		log_err("fio: <%s> skips replay of %d writes due to"
			" read-only\n", td->o.name, writes);
		writes = 0;
	}

	if (td->o.read_iolog_chunked) {
		if (td->io_log_current == 0)
			return false;
		td->o.td_ddir = TD_DDIR_RW;
		return true;
	}

	if (!reads && !writes && !waits)
		return false;
	else if (reads && !writes)
		td->o.td_ddir = TD_DDIR_READ;
	else if (!reads && writes)
		td->o.td_ddir = TD_DDIR_WRITE;
	else
		td->o.td_ddir = TD_DDIR_RW;

	return true;
}

/*
 * open iolog, check version, and call appropriate parser
 */
static bool init_iolog_read(struct thread_data *td)
{
	char buffer[256], *p;
	FILE *f;
	bool ret;

	f = fopen(td->o.read_iolog_file, "r");
	if (!f) {
		perror("fopen read iolog");
		return false;
	}

	p = fgets(buffer, sizeof(buffer), f);
	if (!p) {
		td_verror(td, errno, "iolog read");
		log_err("fio: unable to read iolog\n");
		fclose(f);
		return false;
	}

	td->io_log_rfile = f;
	/*
	 * version 2 of the iolog stores a specific string as the
	 * first line, check for that
	 */
	if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2))) {
		free_release_files(td);
		ret = read_iolog2(td);
	} else {
		log_err("fio: iolog version 1 is no longer supported\n");
		ret = false;
	}

	return ret;
}

/*
 * Set up a log for storing io patterns.
 */
static bool init_iolog_write(struct thread_data *td)
{
	struct fio_file *ff;
	FILE *f;
	unsigned int i;

	f = fopen(td->o.write_iolog_file, "a");
	if (!f) {
		perror("fopen write iolog");
		return false;
	}

	/*
	 * That's it for writing, setup a log buffer and we're done.
	 */
	td->iolog_f = f;
	td->iolog_buf = malloc(8192);
	setvbuf(f, td->iolog_buf, _IOFBF, 8192);

	/*
	 * write our version line
	 */
	if (fprintf(f, "%s\n", iolog_ver2) < 0) {
		perror("iolog init");
		return false;
	}

	/*
	 * add all known files
	 */
	for_each_file(td, ff, i)
		log_file(td, ff, FIO_LOG_ADD_FILE);

	return true;
}

bool init_iolog(struct thread_data *td)
{
	bool ret;

	if (td->o.read_iolog_file) {
		int need_swap;

		/*
		 * Check if it's a blktrace file and load that if possible.
		 * Otherwise assume it's a normal log file and load that.
		 */
		if (is_blktrace(td->o.read_iolog_file, &need_swap))
			ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
		else
			ret = init_iolog_read(td);
	} else if (td->o.write_iolog_file)
		ret = init_iolog_write(td);
	else
		ret = true;

	if (!ret)
		td_verror(td, EINVAL, "failed initializing iolog");

	return ret;
}

void setup_log(struct io_log **log, struct log_params *p,
	       const char *filename)
{
	struct io_log *l;
	int i;
	struct io_u_plat_entry *entry;
	struct flist_head *list;

	l = scalloc(1, sizeof(*l));
	INIT_FLIST_HEAD(&l->io_logs);
	l->log_type = p->log_type;
	l->log_offset = p->log_offset;
	l->log_gz = p->log_gz;
	l->log_gz_store = p->log_gz_store;
	l->avg_msec = p->avg_msec;
	l->hist_msec = p->hist_msec;
	l->hist_coarseness = p->hist_coarseness;
	l->filename = strdup(filename);
	l->td = p->td;

	/* Initialize histogram lists for each r/w direction,
	 * with initial io_u_plat of all zeros:
	 */
	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
		list = &l->hist_window[i].list;
		INIT_FLIST_HEAD(list);
		entry = calloc(1, sizeof(struct io_u_plat_entry));
		flist_add(&entry->list, list);
	}

	if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
		struct io_logs *__p;

		__p = calloc(1, sizeof(*l->pending));
		__p->max_samples = DEF_LOG_ENTRIES;
		__p->log = calloc(__p->max_samples, log_entry_sz(l));
		l->pending = __p;
	}

	if (l->log_offset)
		l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;

	INIT_FLIST_HEAD(&l->chunk_list);

	if (l->log_gz && !p->td)
		l->log_gz = 0;
	else if (l->log_gz || l->log_gz_store) {
		mutex_init_pshared(&l->chunk_lock);
		mutex_init_pshared(&l->deferred_free_lock);
		p->td->flags |= TD_F_COMPRESS_LOG;
	}

	*log = l;
}

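/*
 * Sketch of a typical call site (illustrative only; the exact option
 * fields and log file name depend on the job setup code):
 *
 *	struct log_params p = {
 *		.td = td,
 *		.avg_msec = td->o.log_avg_msec,
 *		.log_type = IO_LOG_TYPE_BW,
 *		.log_offset = td->o.log_offset,
 *		.log_gz = td->o.log_gz,
 *		.log_gz_store = td->o.log_gz_store,
 *	};
 *
 *	setup_log(&td->bw_log, &p, "example_bw.log");
 */
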
#ifdef CONFIG_SETVBUF
static void *set_file_buffer(FILE *f)
{
	size_t size = 1048576;
	void *buf;

	buf = malloc(size);
	setvbuf(f, buf, _IOFBF, size);
	return buf;
}

static void clear_file_buffer(void *buf)
{
	free(buf);
}
#else
static void *set_file_buffer(FILE *f)
{
	return NULL;
}

static void clear_file_buffer(void *buf)
{
}
#endif

void free_log(struct io_log *log)
{
	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);
		free(cur_log->log);
		sfree(cur_log);
	}

	if (log->pending) {
		free(log->pending->log);
		free(log->pending);
		log->pending = NULL;
	}

	free(log->filename);
	sfree(log);
}

uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
		  uint64_t *io_u_plat_last)
{
	uint64_t sum;
	int k;

	if (io_u_plat_last) {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k] - io_u_plat_last[j + k];
	} else {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k];
	}

	return sum;
}

static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
			       uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, j, nr_samples;
	struct io_u_plat_entry *entry, *entry_before;
	uint64_t *io_u_plat;
	uint64_t *io_u_plat_before;

	int stride = 1 << hist_coarseness;

	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		entry = s->data.plat_entry;
		io_u_plat = entry->io_u_plat;

		entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
		io_u_plat_before = entry_before->io_u_plat;

		fprintf(f, "%lu, %u, %u, ", (unsigned long) s->time,
						io_sample_ddir(s), s->bs);
		for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
			fprintf(f, "%llu, ", (unsigned long long)
				hist_sum(j, stride, io_u_plat, io_u_plat_before));
		}
		fprintf(f, "%llu\n", (unsigned long long)
			hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
					io_u_plat_before));

		flist_del(&entry_before->list);
		free(entry_before);
	}
}

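/*
 * Example of the coarseness/stride math above: with hist_coarseness = 2,
 * stride = 1 << 2 = 4, so each output column is the sum of 4 adjacent
 * latency bins and a line carries FIO_IO_U_PLAT_NR / 4 columns.
 */
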
void flush_samples(FILE *f, void *samples, uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, nr_samples;

	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		if (!log_offset) {
			fprintf(f, "%lu, %" PRId64 ", %u, %u\n",
					(unsigned long) s->time,
					s->data.val,
					io_sample_ddir(s), s->bs);
		} else {
			struct io_sample_offset *so = (void *) s;

			fprintf(f, "%lu, %" PRId64 ", %u, %u, %llu\n",
					(unsigned long) s->time,
					s->data.val,
					io_sample_ddir(s), s->bs,
					(unsigned long long) so->offset);
		}
	}
}

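/*
 * The resulting plain-text sample format, with made-up values, is
 * "time (msec), value, direction, blocksize", plus a trailing offset
 * column when log_offset is enabled:
 *
 *	1000, 437, 0, 4096
 *	2000, 521, 1, 4096, 8388608
 */
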
struct iolog_flush_data {
	struct workqueue_work work;
	struct io_log *log;
	void *samples;
	uint32_t nr_samples;
	bool free;
};

#ifdef CONFIG_ZLIB

#define GZ_CHUNK	131072

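/*
 * Compressed output is accumulated into 128 KiB (GZ_CHUNK) chunks by
 * gz_work() below and spliced onto log->chunk_list, from where it is
 * either inflated again or written out verbatim at flush time.
 */
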
static struct iolog_compress *get_new_chunk(unsigned int seq)
{
	struct iolog_compress *c;

	c = malloc(sizeof(*c));
	INIT_FLIST_HEAD(&c->list);
	c->buf = malloc(GZ_CHUNK);
	c->len = 0;
	c->seq = seq;
	return c;
}

static void free_chunk(struct iolog_compress *ic)
{
	free(ic->buf);
	free(ic);
}

static int z_stream_init(z_stream *stream, int gz_hdr)
{
	int wbits = 15;

	memset(stream, 0, sizeof(*stream));
	stream->zalloc = Z_NULL;
	stream->zfree = Z_NULL;
	stream->opaque = Z_NULL;
	stream->next_in = Z_NULL;

	/*
	 * zlib magic - add 32 for auto-detection of gz header or not,
	 * if we decide to store files in a gzip friendly format.
	 */
	if (gz_hdr)
		wbits += 32;

	if (inflateInit2(stream, wbits) != Z_OK)
		return 1;

	return 0;
}

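/*
 * zlib note: inflateInit2() with windowBits 15 expects zlib framing;
 * 15 + 32 = 47 enables automatic detection of zlib vs gzip headers,
 * matching the gzip-friendly files written when log_gz_store is set.
 */
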
struct inflate_chunk_iter {
	unsigned int seq;
	int err;
	void *buf;
	size_t buf_size;
	size_t buf_used;
	size_t chunk_sz;
};

static void finish_chunk(z_stream *stream, FILE *f,
			 struct inflate_chunk_iter *iter)
{
	int ret;

	ret = inflateEnd(stream);
	if (ret != Z_OK)
		log_err("fio: failed to end log inflation seq %d (%d)\n",
				iter->seq, ret);

	flush_samples(f, iter->buf, iter->buf_used);
	free(iter->buf);
	iter->buf = NULL;
	iter->buf_size = iter->buf_used = 0;
}

/*
 * Iterative chunk inflation. Handles cases where we cross into a new
 * sequence, doing flush finish of previous chunk if needed.
 */
static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
			    z_stream *stream, struct inflate_chunk_iter *iter)
{
	size_t ret;

	dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
				(unsigned long) ic->len, ic->seq);

	if (ic->seq != iter->seq) {
		if (iter->seq)
			finish_chunk(stream, f, iter);

		z_stream_init(stream, gz_hdr);
		iter->seq = ic->seq;
	}

	stream->avail_in = ic->len;
	stream->next_in = ic->buf;

	if (!iter->buf_size) {
		iter->buf_size = iter->chunk_sz;
		iter->buf = malloc(iter->buf_size);
	}

	while (stream->avail_in) {
		size_t this_out = iter->buf_size - iter->buf_used;
		int err;

		stream->avail_out = this_out;
		stream->next_out = iter->buf + iter->buf_used;

		err = inflate(stream, Z_NO_FLUSH);
		if (err < 0) {
			log_err("fio: failed inflating log: %d\n", err);
			iter->err = err;
			break;
		}

		iter->buf_used += this_out - stream->avail_out;

		if (!stream->avail_out) {
			iter->buf_size += iter->chunk_sz;
			iter->buf = realloc(iter->buf, iter->buf_size);
			continue;
		}

		if (err == Z_STREAM_END)
			break;
	}

	ret = (void *) stream->next_in - ic->buf;

	dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);

	return ret;
}

/*
 * Inflate stored compressed chunks, or write them directly to the log
 * file if so instructed.
 */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
	z_stream stream;

	while (!flist_empty(&log->chunk_list)) {
		struct iolog_compress *ic;

		ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
		flist_del(&ic->list);

		if (log->log_gz_store) {
			size_t ret;

			dprint(FD_COMPRESS, "log write chunk size=%lu, "
				"seq=%u\n", (unsigned long) ic->len, ic->seq);

			ret = fwrite(ic->buf, ic->len, 1, f);
			if (ret != 1 || ferror(f)) {
				iter.err = errno;
				log_err("fio: error writing compressed log\n");
			}
		} else
			inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);

		free_chunk(ic);
	}

	if (iter.seq) {
		finish_chunk(&stream, f, &iter);
		free(iter.buf);
	}

	return iter.err;
}

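/*
 * This is the backend for the "--inflate-log" command line option:
 * decompress a stored log file and dump the plain-text samples to
 * stdout.
 */
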
/*
 * Open compressed log file and decompress the stored chunks and
 * write them to stdout. The chunks are stored sequentially in the
 * file, so we iterate over them and do them one-by-one.
 */
int iolog_file_inflate(const char *file)
{
	struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
	struct iolog_compress ic;
	z_stream stream;
	struct stat sb;
	size_t ret;
	size_t total;
	void *buf;
	FILE *f;

	f = fopen(file, "r");
	if (!f) {
		perror("fopen");
		return 1;
	}

	if (stat(file, &sb) < 0) {
		fclose(f);
		perror("stat");
		return 1;
	}

	ic.buf = buf = malloc(sb.st_size);
	ic.len = sb.st_size;
	ic.seq = 1;

	ret = fread(ic.buf, ic.len, 1, f);
	if (ret == 0 && ferror(f)) {
		perror("fread");
		fclose(f);
		free(buf);
		return 1;
	} else if (ferror(f) || (!feof(f) && ret != 1)) {
		log_err("fio: short read on reading log\n");
		fclose(f);
		free(buf);
		return 1;
	}

	fclose(f);

	/*
	 * Each chunk will return Z_STREAM_END. We don't know how many
	 * chunks are in the file, so we just keep looping and incrementing
	 * the sequence number until we have consumed the whole compressed
	 * file.
	 */
	total = ic.len;
	do {
		size_t iret;

		iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
		total -= iret;
		if (!total)
			break;
		if (iter.err)
			break;

		ic.seq++;
		ic.len -= iret;
		ic.buf += iret;
	} while (1);

	if (iter.seq) {
		finish_chunk(&stream, stdout, &iter);
		free(iter.buf);
	}

	free(buf);
	return iter.err;
}

#else

static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	return 0;
}

int iolog_file_inflate(const char *file)
{
	log_err("fio: log inflation not possible without zlib\n");
	return 1;
}

#endif

void flush_log(struct io_log *log, bool do_append)
{
	void *buf;
	FILE *f;

	if (!do_append)
		f = fopen(log->filename, "w");
	else
		f = fopen(log->filename, "a");
	if (!f) {
		perror("fopen log");
		return;
	}

	buf = set_file_buffer(f);

	inflate_gz_chunks(log, f);

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		if (log->td && log == log->td->clat_hist_log)
			flush_hist_samples(f, log->hist_coarseness, cur_log->log,
					   log_sample_sz(log, cur_log));
		else
			flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));

		sfree(cur_log);
	}

	fclose(f);
	clear_file_buffer(buf);
}

static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
{
	if (td->flags & TD_F_COMPRESS_LOG)
		iolog_flush(log);

	if (trylock) {
		if (fio_trylock_file(log->filename))
			return 1;
	} else
		fio_lock_file(log->filename);

	if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
		fio_send_iolog(td, log, log->filename);
	else
		flush_log(log, !td->o.per_job_logs);

	fio_unlock_file(log->filename);
	free_log(log);
	return 0;
}

size_t log_chunk_sizes(struct io_log *log)
{
	struct flist_head *entry;
	size_t ret;

	if (flist_empty(&log->chunk_list))
		return 0;

	ret = 0;
	pthread_mutex_lock(&log->chunk_lock);
	flist_for_each(entry, &log->chunk_list) {
		struct iolog_compress *c;

		c = flist_entry(entry, struct iolog_compress, list);
		ret += c->len;
	}
	pthread_mutex_unlock(&log->chunk_lock);
	return ret;
}

#ifdef CONFIG_ZLIB

static void iolog_put_deferred(struct io_log *log, void *ptr)
{
	if (!ptr)
		return;

	pthread_mutex_lock(&log->deferred_free_lock);
	if (log->deferred < IOLOG_MAX_DEFER) {
		log->deferred_items[log->deferred] = ptr;
		log->deferred++;
	} else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
		log_err("fio: had to drop log entry free\n");
	pthread_mutex_unlock(&log->deferred_free_lock);
}

static void iolog_free_deferred(struct io_log *log)
{
	unsigned int i;

	if (!log->deferred)
		return;

	pthread_mutex_lock(&log->deferred_free_lock);

	for (i = 0; i < log->deferred; i++) {
		free(log->deferred_items[i]);
		log->deferred_items[i] = NULL;
	}

	log->deferred = 0;
	pthread_mutex_unlock(&log->deferred_free_lock);
}

static int gz_work(struct iolog_flush_data *data)
{
	struct iolog_compress *c = NULL;
	struct flist_head list;
	unsigned int seq;
	z_stream stream;
	size_t total = 0;
	int ret;

	INIT_FLIST_HEAD(&list);

	memset(&stream, 0, sizeof(stream));
	stream.zalloc = Z_NULL;
	stream.zfree = Z_NULL;
	stream.opaque = Z_NULL;

	ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
	if (ret != Z_OK) {
		log_err("fio: failed to init gz stream\n");
		goto err;
	}

	seq = ++data->log->chunk_seq;

	stream.next_in = (void *) data->samples;
	stream.avail_in = data->nr_samples * log_entry_sz(data->log);

	dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
				(unsigned long) stream.avail_in, seq,
				data->log->filename);
	do {
		if (c)
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		c = get_new_chunk(seq);
		stream.avail_out = GZ_CHUNK;
		stream.next_out = c->buf;
		ret = deflate(&stream, Z_NO_FLUSH);
		if (ret < 0) {
			log_err("fio: deflate log (%d)\n", ret);
			free_chunk(c);
			goto err;
		}

		c->len = GZ_CHUNK - stream.avail_out;
		flist_add_tail(&c->list, &list);
		total += c->len;
	} while (stream.avail_in);

	stream.next_out = c->buf + c->len;
	stream.avail_out = GZ_CHUNK - c->len;

	ret = deflate(&stream, Z_FINISH);
	if (ret < 0) {
		/*
		 * Z_BUF_ERROR is special, it just means we need more
		 * output space. We'll handle that below. Treat any other
		 * error as fatal.
		 */
		if (ret != Z_BUF_ERROR) {
			log_err("fio: deflate log (%d)\n", ret);
			flist_del(&c->list);
			free_chunk(c);
			goto err;
		}
	}

	total -= c->len;
	c->len = GZ_CHUNK - stream.avail_out;
	total += c->len;
	dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);

	if (ret != Z_STREAM_END) {
		do {
			c = get_new_chunk(seq);
			stream.avail_out = GZ_CHUNK;
			stream.next_out = c->buf;
			ret = deflate(&stream, Z_FINISH);
			c->len = GZ_CHUNK - stream.avail_out;
			total += c->len;
			flist_add_tail(&c->list, &list);
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		} while (ret != Z_STREAM_END);
	}

	dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);

	ret = deflateEnd(&stream);
	if (ret != Z_OK)
		log_err("fio: deflateEnd %d\n", ret);

	iolog_put_deferred(data->log, data->samples);

	if (!flist_empty(&list)) {
		pthread_mutex_lock(&data->log->chunk_lock);
		flist_splice_tail(&list, &data->log->chunk_list);
		pthread_mutex_unlock(&data->log->chunk_lock);
	}

	ret = 0;
done:
	if (data->free)
		sfree(data);
	return ret;
err:
	while (!flist_empty(&list)) {
		c = flist_first_entry(&list, struct iolog_compress, list);
		flist_del(&c->list);
		free_chunk(c);
	}
	ret = 1;
	goto done;
}

/*
 * Invoked from our compress helper thread, when logging would have exceeded
 * the specified memory limitation. Compresses the previously stored
 * entries.
 */
static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
{
	return gz_work(container_of(work, struct iolog_flush_data, work));
}

static int gz_init_worker(struct submit_worker *sw)
{
	struct thread_data *td = sw->wq->td;

	if (!fio_option_is_set(&td->o, log_gz_cpumask))
		return 0;

	if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
		log_err("gz: failed to set CPU affinity\n");
		return 1;
	}

	return 0;
}

static struct workqueue_ops log_compress_wq_ops = {
	.fn		= gz_work_async,
	.init_worker_fn	= gz_init_worker,
	.nice		= 1,
};

int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return 0;

	workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return;

	workqueue_exit(&td->log_compress_wq);
}

/*
 * Queue a work item to compress the existing log entries. We reset the
 * current log to a small size, and reference the existing log in the
 * data that we queue for compression. Once compression has been done,
 * this old log is freed.
 */
static int iolog_flush(struct io_log *log)
{
	struct iolog_flush_data *data;

	data = malloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;
	data->free = false;

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		data->samples = cur_log->log;
		data->nr_samples = cur_log->nr_samples;

		sfree(cur_log);

		gz_work(data);
	}

	free(data);
	return 0;
}

int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	struct iolog_flush_data *data;

	data = smalloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;
	data->free = true;

	data->samples = cur_log->log;
	data->nr_samples = cur_log->nr_samples;

	cur_log->nr_samples = cur_log->max_samples = 0;
	cur_log->log = NULL;

	workqueue_enqueue(&log->td->log_compress_wq, &data->work);

	iolog_free_deferred(log);

	return 0;
}

#else

static int iolog_flush(struct io_log *log)
{
	return 1;
}

int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	return 1;
}

int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
}

#endif

struct io_logs *iolog_cur_log(struct io_log *log)
{
	if (flist_empty(&log->io_logs))
		return NULL;

	return flist_last_entry(&log->io_logs, struct io_logs, list);
}

uint64_t iolog_nr_samples(struct io_log *iolog)
{
	struct flist_head *entry;
	uint64_t ret = 0;

	flist_for_each(entry, &iolog->io_logs) {
		struct io_logs *cur_log;

		cur_log = flist_entry(entry, struct io_logs, list);
		ret += cur_log->nr_samples;
	}

	return ret;
}

static int __write_log(struct thread_data *td, struct io_log *log, int try)
{
	if (log)
		return finish_log(td, log, try);

	return 0;
}

static int write_iops_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->iops_log) != unit_log)
		return 0;

	ret = __write_log(td, td->iops_log, try);
	if (!ret)
		td->iops_log = NULL;

	return ret;
}

static int write_slat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->slat_log, try);
	if (!ret)
		td->slat_log = NULL;

	return ret;
}

static int write_clat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_log, try);
	if (!ret)
		td->clat_log = NULL;

	return ret;
}

static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_hist_log, try);
	if (!ret)
		td->clat_hist_log = NULL;

	return ret;
}

static int write_lat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->lat_log, try);
	if (!ret)
		td->lat_log = NULL;

	return ret;
}

static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->bw_log) != unit_log)
		return 0;

	ret = __write_log(td, td->bw_log, try);
	if (!ret)
		td->bw_log = NULL;

	return ret;
}

enum {
	BW_LOG_MASK	= 1,
	LAT_LOG_MASK	= 2,
	SLAT_LOG_MASK	= 4,
	CLAT_LOG_MASK	= 8,
	IOPS_LOG_MASK	= 16,
	CLAT_HIST_LOG_MASK = 32,

	ALL_LOG_NR	= 6,
};

struct log_type {
	unsigned int mask;
	int (*fn)(struct thread_data *, int, bool);
};

static struct log_type log_types[] = {
	{
		.mask	= BW_LOG_MASK,
		.fn	= write_bandw_log,
	},
	{
		.mask	= LAT_LOG_MASK,
		.fn	= write_lat_log,
	},
	{
		.mask	= SLAT_LOG_MASK,
		.fn	= write_slat_log,
	},
	{
		.mask	= CLAT_LOG_MASK,
		.fn	= write_clat_log,
	},
	{
		.mask	= IOPS_LOG_MASK,
		.fn	= write_iops_log,
	},
	{
		.mask	= CLAT_HIST_LOG_MASK,
		.fn	= write_clat_hist_log,
	},
};

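/*
 * Write out all of the above log types for a job. Each writer is first
 * invoked with try set (log_left != 1), so one busy log file does not
 * stall the others; once a single log remains, a blocking lock is taken
 * in finish_log() to guarantee completion.
 */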
void td_writeout_logs(struct thread_data *td, bool unit_logs)
{
	unsigned int log_mask = 0;
	unsigned int log_left = ALL_LOG_NR;
	int old_state, i;

	old_state = td_bump_runstate(td, TD_FINISHING);

	finalize_logs(td, unit_logs);

	while (log_left) {
		int prev_log_left = log_left;

		for (i = 0; i < ALL_LOG_NR && log_left; i++) {
			struct log_type *lt = &log_types[i];
			int ret;

			if (!(log_mask & lt->mask)) {
				ret = lt->fn(td, log_left != 1, unit_logs);
				if (!ret) {
					log_left--;
					log_mask |= lt->mask;
				}
			}
		}

		if (prev_log_left == log_left)
			usleep(5000);
	}

	td_restore_runstate(td, old_state);
}

void fio_writeout_logs(bool unit_logs)
{
	struct thread_data *td;
	int i;

	for_each_td(td, i)
		td_writeout_logs(td, unit_logs);
}