2 * Code related to writing an iolog of what a thread is doing, and to
3 * later read that back and replay
22 #include "lib/roundup.h"
24 #include <netinet/in.h>
25 #include <netinet/tcp.h>
26 #include <arpa/inet.h>
28 #include <sys/socket.h>
31 static int iolog_flush(struct io_log *log);
33 static const char iolog_ver2[] = "fio version 2 iolog";
35 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
37 flist_add_tail(&ipo->list, &td->io_log_list);
38 td->total_io_size += ipo->len;
41 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
43 if (!td->o.write_iolog_file)
46 fprintf(td->iolog_f, "%s %s %llu %llu\n", io_u->file->file_name,
47 io_ddir_name(io_u->ddir),
48 io_u->offset, io_u->buflen);
51 void log_file(struct thread_data *td, struct fio_file *f,
52 enum file_log_act what)
54 const char *act[] = { "add", "open", "close" };
58 if (!td->o.write_iolog_file)
63 * this happens on the pre-open/close done before the job starts
68 fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
/*
 * Replay pacing: sleep for the delay recorded in the iolog, compensating
 * for wall time already spent since the last issued I/O and for previous
 * oversleep (td->time_offset).
 * NOTE(review): this is a fragment — several original lines are missing;
 * comments below describe only what the visible lines establish.
 */
71 static void iolog_delay(struct thread_data *td, unsigned long delay)
/* time already elapsed since the previous issue; credited against delay */
73 uint64_t usec = utime_since_now(&td->last_issue);
74 unsigned long orig_delay = delay;
/* time_offset holds oversleep carried over from an earlier delay;
 * presumably the whole sleep is skipped when it exceeds delay — TODO
 * confirm against the missing lines */
78 if (delay < td->time_offset) {
83 delay -= td->time_offset;
/* sleep in bounded slices so td->terminate can interrupt a long delay */
89 fio_gettime(&ts, NULL);
90 while (delay && !td->terminate) {
/* cap a single slice at 500ms */
92 if (this_delay > 500000)
95 usec_sleep(td, this_delay);
/* measure actual time slept; remember oversleep for the next call */
99 usec = utime_since_now(&ts);
100 if (usec > orig_delay)
101 td->time_offset = usec - orig_delay;
106 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
114 if (ipo->ddir != DDIR_INVAL)
117 f = td->files[ipo->fileno];
119 switch (ipo->file_action) {
120 case FIO_LOG_OPEN_FILE:
121 if (td->o.replay_redirect && fio_file_open(f)) {
122 dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
126 ret = td_io_open_file(td, f);
129 td_verror(td, ret, "iolog open file");
131 case FIO_LOG_CLOSE_FILE:
132 td_io_close_file(td, f);
134 case FIO_LOG_UNLINK_FILE:
135 td_io_unlink_file(td, f);
138 log_err("fio: bad file action %d\n", ipo->file_action);
145 static bool read_iolog2(struct thread_data *td);
147 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
149 struct io_piece *ipo;
150 unsigned long elapsed;
152 while (!flist_empty(&td->io_log_list)) {
155 if (td->o.read_iolog_chunked) {
156 if (td->io_log_checkmark == td->io_log_current) {
157 if (td->io_log_blktrace) {
158 if (!read_blktrace(td))
161 if (!read_iolog2(td))
165 td->io_log_current--;
167 ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
168 flist_del(&ipo->list);
169 remove_trim_entry(td, ipo);
171 ret = ipo_special(td, ipo);
175 } else if (ret > 0) {
180 io_u->ddir = ipo->ddir;
181 if (ipo->ddir != DDIR_WAIT) {
182 io_u->offset = ipo->offset;
183 io_u->verify_offset = ipo->offset;
184 io_u->buflen = ipo->len;
185 io_u->file = td->files[ipo->fileno];
186 get_file(io_u->file);
187 dprint(FD_IO, "iolog: get %llu/%llu/%s\n", io_u->offset,
188 io_u->buflen, io_u->file->file_name);
190 iolog_delay(td, ipo->delay);
192 elapsed = mtime_since_genesis();
193 if (ipo->delay > elapsed)
194 usec_sleep(td, (ipo->delay - elapsed) * 1000);
199 if (io_u->ddir != DDIR_WAIT)
207 void prune_io_piece_log(struct thread_data *td)
209 struct io_piece *ipo;
210 struct fio_rb_node *n;
212 while ((n = rb_first(&td->io_hist_tree)) != NULL) {
213 ipo = rb_entry(n, struct io_piece, rb_node);
214 rb_erase(n, &td->io_hist_tree);
215 remove_trim_entry(td, ipo);
220 while (!flist_empty(&td->io_hist_list)) {
221 ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
222 flist_del(&ipo->list);
223 remove_trim_entry(td, ipo);
230 * log a successful write, so we can unwind the log for verify
232 void log_io_piece(struct thread_data *td, struct io_u *io_u)
234 struct fio_rb_node **p, *parent;
235 struct io_piece *ipo, *__ipo;
237 ipo = calloc(1, sizeof(struct io_piece));
239 ipo->file = io_u->file;
240 ipo->offset = io_u->offset;
241 ipo->len = io_u->buflen;
242 ipo->numberio = io_u->numberio;
243 ipo->flags = IP_F_IN_FLIGHT;
247 if (io_u_should_trim(td, io_u)) {
248 flist_add_tail(&ipo->trim_list, &td->trim_list);
253 * Only sort writes if we don't have a random map in which case we need
254 * to check for duplicate blocks and drop the old one, which we rely on
255 * the rb insert/lookup for handling.
257 if (file_randommap(td, ipo->file)) {
258 INIT_FLIST_HEAD(&ipo->list);
259 flist_add_tail(&ipo->list, &td->io_hist_list);
260 ipo->flags |= IP_F_ONLIST;
265 RB_CLEAR_NODE(&ipo->rb_node);
268 * Sort the entry into the verification list
271 p = &td->io_hist_tree.rb_node;
277 __ipo = rb_entry(parent, struct io_piece, rb_node);
278 if (ipo->file < __ipo->file)
280 else if (ipo->file > __ipo->file)
282 else if (ipo->offset < __ipo->offset) {
284 overlap = ipo->offset + ipo->len > __ipo->offset;
286 else if (ipo->offset > __ipo->offset) {
288 overlap = __ipo->offset + __ipo->len > ipo->offset;
294 dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
295 __ipo->offset, __ipo->len,
296 ipo->offset, ipo->len);
298 rb_erase(parent, &td->io_hist_tree);
299 remove_trim_entry(td, __ipo);
300 if (!(__ipo->flags & IP_F_IN_FLIGHT))
306 rb_link_node(&ipo->rb_node, parent, p);
307 rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
308 ipo->flags |= IP_F_ONRB;
/*
 * Undo the bookkeeping done by log_io_piece() for an io_u whose write/trim
 * did not complete successfully: record the failure in the block-info map
 * and unhook the io_piece from whichever verify structure holds it.
 * NOTE(review): fragment — some original lines are missing.
 */
312 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
314 struct io_piece *ipo = io_u->ipo;
/* block-info tracking enabled: mark the block's state as failed, but
 * never downgrade a state that is already a failure state */
316 if (td->ts.nr_block_infos) {
317 uint32_t *info = io_u_block_info(td, io_u);
318 if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
319 if (io_u->ddir == DDIR_TRIM)
320 *info = BLOCK_INFO_SET_STATE(*info,
321 BLOCK_STATE_TRIM_FAILURE);
322 else if (io_u->ddir == DDIR_WRITE)
323 *info = BLOCK_INFO_SET_STATE(*info,
324 BLOCK_STATE_WRITE_FAILURE);
/* the piece sits either in the rb-tree (sorted verify) or on the flat
 * history list (random-map case); remove it from the right container */
331 if (ipo->flags & IP_F_ONRB)
332 rb_erase(&ipo->rb_node, &td->io_hist_tree);
333 else if (ipo->flags & IP_F_ONLIST)
334 flist_del(&ipo->list);
341 void trim_io_piece(const struct io_u *io_u)
343 struct io_piece *ipo = io_u->ipo;
348 ipo->len = io_u->xfer_buflen - io_u->resid;
351 void write_iolog_close(struct thread_data *td)
360 td->iolog_buf = NULL;
363 int64_t iolog_items_to_fetch(struct thread_data *td)
368 int64_t items_to_fetch;
370 if (!td->io_log_highmark)
374 fio_gettime(&now, NULL);
375 elapsed = ntime_since(&td->io_log_highmark_time, &now);
377 for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
378 items_to_fetch = for_1s - td->io_log_current;
379 if (items_to_fetch < 0)
384 td->io_log_highmark = td->io_log_current + items_to_fetch;
385 td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
386 fio_gettime(&td->io_log_highmark_time, NULL);
388 return items_to_fetch;
392 * Read version 2 iolog data. It is enhanced to include per-file logging,
395 static bool read_iolog2(struct thread_data *td)
397 unsigned long long offset;
399 int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
400 char *rfname, *fname, *act;
403 bool realloc = false;
404 int64_t items_to_fetch = 0;
406 if (td->o.read_iolog_chunked) {
407 items_to_fetch = iolog_items_to_fetch(td);
413 * Read in the read iolog and store it, reuse the infrastructure
414 * for doing verifications.
417 rfname = fname = malloc(256+16);
418 act = malloc(256+16);
420 reads = writes = waits = 0;
421 while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
422 struct io_piece *ipo;
425 r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
428 if (td->o.replay_redirect)
429 fname = td->o.replay_redirect;
435 if (!strcmp(act, "wait"))
437 else if (!strcmp(act, "read"))
439 else if (!strcmp(act, "write"))
441 else if (!strcmp(act, "sync"))
443 else if (!strcmp(act, "datasync"))
445 else if (!strcmp(act, "trim"))
448 log_err("fio: bad iolog file action: %s\n",
452 fileno = get_fileno(td, fname);
455 if (!strcmp(act, "add")) {
456 if (td->o.replay_redirect &&
457 get_fileno(td, fname) != -1) {
458 dprint(FD_FILE, "iolog: ignoring"
459 " re-add of file %s\n", fname);
461 fileno = add_file(td, fname, td->subjob_number, 1);
462 file_action = FIO_LOG_ADD_FILE;
465 } else if (!strcmp(act, "open")) {
466 fileno = get_fileno(td, fname);
467 file_action = FIO_LOG_OPEN_FILE;
468 } else if (!strcmp(act, "close")) {
469 fileno = get_fileno(td, fname);
470 file_action = FIO_LOG_CLOSE_FILE;
472 log_err("fio: bad iolog file action: %s\n",
477 log_err("bad iolog2: %s\n", p);
483 else if (rw == DDIR_WRITE) {
485 * Don't add a write for ro mode
490 } else if (rw == DDIR_WAIT) {
494 } else if (rw == DDIR_INVAL) {
495 } else if (!ddir_sync(rw)) {
496 log_err("bad ddir: %d\n", rw);
503 ipo = calloc(1, sizeof(*ipo));
506 if (rw == DDIR_WAIT) {
509 if (td->o.replay_scale)
510 ipo->offset = offset / td->o.replay_scale;
512 ipo->offset = offset;
513 ipo_bytes_align(td->o.replay_align, ipo);
516 if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw]) {
518 td->o.max_bs[rw] = bytes;
520 ipo->fileno = fileno;
521 ipo->file_action = file_action;
525 queue_io_piece(td, ipo);
527 if (td->o.read_iolog_chunked) {
528 td->io_log_current++;
530 if (items_to_fetch == 0)
539 if (td->o.read_iolog_chunked) {
540 td->io_log_highmark = td->io_log_current;
541 td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
542 fio_gettime(&td->io_log_highmark_time, NULL);
545 if (writes && read_only) {
546 log_err("fio: <%s> skips replay of %d writes due to"
547 " read-only\n", td->o.name, writes);
551 if (td->o.read_iolog_chunked) {
552 if (td->io_log_current == 0) {
555 td->o.td_ddir = TD_DDIR_RW;
556 if (realloc && td->orig_buffer)
560 init_io_u_buffers(td);
565 if (!reads && !writes && !waits)
567 else if (reads && !writes)
568 td->o.td_ddir = TD_DDIR_READ;
569 else if (!reads && writes)
570 td->o.td_ddir = TD_DDIR_WRITE;
572 td->o.td_ddir = TD_DDIR_RW;
577 static bool is_socket(const char *path)
582 r = stat(path, &buf);
586 return S_ISSOCK(buf.st_mode);
/*
 * Open a SOCK_STREAM UNIX domain socket and connect it to @path.
 * Returns the connected fd on success, -1 on failure.
 *
 * Fix: a path that does not fit in sun_path was previously only logged
 * and then connect() was still attempted with the truncated name, which
 * would silently target the wrong socket. Fail hard instead. The snprintf
 * result is cast to size_t so the comparison against sizeof() is not a
 * signed/unsigned mismatch (a negative return then also reads as failure).
 */
static int open_socket(const char *path)
{
	struct sockaddr_un addr;
	int ret, fd;

	fd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (fd < 0)
		return fd;

	addr.sun_family = AF_UNIX;
	if ((size_t)snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path) >=
	    sizeof(addr.sun_path)) {
		log_err("%s: path name %s is too long for a Unix socket\n",
			__func__, path);
		close(fd);
		return -1;
	}

	ret = connect(fd, (const struct sockaddr *)&addr, strlen(path) + sizeof(addr.sun_family));
	if (!ret)
		return fd;

	close(fd);
	return -1;
}
614 * open iolog, check version, and call appropriate parser
616 static bool init_iolog_read(struct thread_data *td, char *fname)
618 char buffer[256], *p;
621 dprint(FD_IO, "iolog: name=%s\n", fname);
623 if (is_socket(fname)) {
626 fd = open_socket(fname);
629 } else if (!strcmp(fname, "-")) {
632 f = fopen(fname, "r");
637 perror("fopen read iolog");
641 p = fgets(buffer, sizeof(buffer), f);
643 td_verror(td, errno, "iolog read");
644 log_err("fio: unable to read iolog\n");
650 * version 2 of the iolog stores a specific string as the
651 * first line, check for that
653 if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2))) {
654 free_release_files(td);
655 td->io_log_rfile = f;
656 return read_iolog2(td);
659 log_err("fio: iolog version 1 is no longer supported\n");
665 * Set up a log for storing io patterns.
667 static bool init_iolog_write(struct thread_data *td)
673 f = fopen(td->o.write_iolog_file, "a");
675 perror("fopen write iolog");
680 * That's it for writing, setup a log buffer and we're done.
683 td->iolog_buf = malloc(8192);
684 setvbuf(f, td->iolog_buf, _IOFBF, 8192);
687 * write our version line
689 if (fprintf(f, "%s\n", iolog_ver2) < 0) {
690 perror("iolog init\n");
695 * add all known files
697 for_each_file(td, ff, i)
698 log_file(td, ff, FIO_LOG_ADD_FILE);
703 bool init_iolog(struct thread_data *td)
707 if (td->o.read_iolog_file) {
709 char * fname = get_name_by_idx(td->o.read_iolog_file, td->subjob_number);
712 * Check if it's a blktrace file and load that if possible.
713 * Otherwise assume it's a normal log file and load that.
715 if (is_blktrace(fname, &need_swap)) {
716 td->io_log_blktrace = 1;
717 ret = init_blktrace_read(td, fname, need_swap);
719 td->io_log_blktrace = 0;
720 ret = init_iolog_read(td, fname);
722 } else if (td->o.write_iolog_file)
723 ret = init_iolog_write(td);
728 td_verror(td, EINVAL, "failed initializing iolog");
733 void setup_log(struct io_log **log, struct log_params *p,
734 const char *filename)
738 struct io_u_plat_entry *entry;
739 struct flist_head *list;
741 l = scalloc(1, sizeof(*l));
742 INIT_FLIST_HEAD(&l->io_logs);
743 l->log_type = p->log_type;
744 l->log_offset = p->log_offset;
745 l->log_prio = p->log_prio;
746 l->log_gz = p->log_gz;
747 l->log_gz_store = p->log_gz_store;
748 l->avg_msec = p->avg_msec;
749 l->hist_msec = p->hist_msec;
750 l->hist_coarseness = p->hist_coarseness;
751 l->filename = strdup(filename);
754 /* Initialize histogram lists for each r/w direction,
755 * with initial io_u_plat of all zeros:
757 for (i = 0; i < DDIR_RWDIR_CNT; i++) {
758 list = &l->hist_window[i].list;
759 INIT_FLIST_HEAD(list);
760 entry = calloc(1, sizeof(struct io_u_plat_entry));
761 flist_add(&entry->list, list);
764 if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
765 unsigned int def_samples = DEF_LOG_ENTRIES;
768 __p = calloc(1, sizeof(*l->pending));
769 if (l->td->o.iodepth > DEF_LOG_ENTRIES)
770 def_samples = roundup_pow2(l->td->o.iodepth);
771 __p->max_samples = def_samples;
772 __p->log = calloc(__p->max_samples, log_entry_sz(l));
777 l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
779 l->log_ddir_mask |= LOG_PRIO_SAMPLE_BIT;
781 INIT_FLIST_HEAD(&l->chunk_list);
783 if (l->log_gz && !p->td)
785 else if (l->log_gz || l->log_gz_store) {
786 mutex_init_pshared(&l->chunk_lock);
787 mutex_init_pshared(&l->deferred_free_lock);
788 p->td->flags |= TD_F_COMPRESS_LOG;
#ifdef CONFIG_SETVBUF
/*
 * Attach a large (1MB) fully-buffered stdio buffer to @f to cut down on
 * write syscalls when flushing logs. Returns the buffer so the caller
 * can free it after closing the stream.
 */
static void *set_file_buffer(FILE *f)
{
	size_t size = 1048576;
	void *buf;

	buf = malloc(size);
	setvbuf(f, buf, _IOFBF, size);
	return buf;
}

static void clear_file_buffer(void *buf)
{
	free(buf);
}
#else
/* setvbuf not available: no-op variants, stream keeps its default buffer */
static void *set_file_buffer(FILE *f)
{
	return NULL;
}

static void clear_file_buffer(void *buf)
{
}
#endif
820 void free_log(struct io_log *log)
822 while (!flist_empty(&log->io_logs)) {
823 struct io_logs *cur_log;
825 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
826 flist_del_init(&cur_log->list);
832 free(log->pending->log);
/*
 * Sum @stride consecutive histogram buckets of @io_u_plat starting at
 * index @j. When @io_u_plat_last is non-NULL, the corresponding buckets
 * of that previous snapshot are subtracted, yielding the delta accumulated
 * since the last sample; otherwise the absolute sum is returned.
 */
uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
		  uint64_t *io_u_plat_last)
{
	uint64_t sum = 0;
	int k;

	if (io_u_plat_last) {
		for (k = 0; k < stride; k++)
			sum += io_u_plat[j + k] - io_u_plat_last[j + k];
	} else {
		for (k = 0; k < stride; k++)
			sum += io_u_plat[j + k];
	}

	return sum;
}
859 static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
860 uint64_t sample_size)
864 uint64_t i, j, nr_samples;
865 struct io_u_plat_entry *entry, *entry_before;
867 uint64_t *io_u_plat_before;
869 int stride = 1 << hist_coarseness;
874 s = __get_sample(samples, 0, 0);
875 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
877 nr_samples = sample_size / __log_entry_sz(log_offset);
879 for (i = 0; i < nr_samples; i++) {
880 s = __get_sample(samples, log_offset, i);
882 entry = s->data.plat_entry;
883 io_u_plat = entry->io_u_plat;
885 entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
886 io_u_plat_before = entry_before->io_u_plat;
888 fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
889 io_sample_ddir(s), (unsigned long long) s->bs);
890 for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
891 fprintf(f, "%llu, ", (unsigned long long)
892 hist_sum(j, stride, io_u_plat, io_u_plat_before));
894 fprintf(f, "%llu\n", (unsigned long long)
895 hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
898 flist_del(&entry_before->list);
903 void flush_samples(FILE *f, void *samples, uint64_t sample_size)
906 int log_offset, log_prio;
907 uint64_t i, nr_samples;
908 unsigned int prio_val;
914 s = __get_sample(samples, 0, 0);
915 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
916 log_prio = (s->__ddir & LOG_PRIO_SAMPLE_BIT) != 0;
920 fmt = "%lu, %" PRId64 ", %u, %llu, %llu, 0x%04x\n";
922 fmt = "%lu, %" PRId64 ", %u, %llu, %llu, %u\n";
925 fmt = "%lu, %" PRId64 ", %u, %llu, 0x%04x\n";
927 fmt = "%lu, %" PRId64 ", %u, %llu, %u\n";
930 nr_samples = sample_size / __log_entry_sz(log_offset);
932 for (i = 0; i < nr_samples; i++) {
933 s = __get_sample(samples, log_offset, i);
936 prio_val = s->priority;
938 prio_val = ioprio_value_is_class_rt(s->priority);
942 (unsigned long) s->time,
944 io_sample_ddir(s), (unsigned long long) s->bs,
947 struct io_sample_offset *so = (void *) s;
950 (unsigned long) s->time,
952 io_sample_ddir(s), (unsigned long long) s->bs,
953 (unsigned long long) so->offset,
961 struct iolog_flush_data {
962 struct workqueue_work work;
969 #define GZ_CHUNK 131072
971 static struct iolog_compress *get_new_chunk(unsigned int seq)
973 struct iolog_compress *c;
975 c = malloc(sizeof(*c));
976 INIT_FLIST_HEAD(&c->list);
977 c->buf = malloc(GZ_CHUNK);
983 static void free_chunk(struct iolog_compress *ic)
989 static int z_stream_init(z_stream *stream, int gz_hdr)
993 memset(stream, 0, sizeof(*stream));
994 stream->zalloc = Z_NULL;
995 stream->zfree = Z_NULL;
996 stream->opaque = Z_NULL;
997 stream->next_in = Z_NULL;
1000 * zlib magic - add 32 for auto-detection of gz header or not,
1001 * if we decide to store files in a gzip friendly format.
1006 if (inflateInit2(stream, wbits) != Z_OK)
1012 struct inflate_chunk_iter {
1021 static void finish_chunk(z_stream *stream, FILE *f,
1022 struct inflate_chunk_iter *iter)
1026 ret = inflateEnd(stream);
1028 log_err("fio: failed to end log inflation seq %d (%d)\n",
1031 flush_samples(f, iter->buf, iter->buf_used);
1034 iter->buf_size = iter->buf_used = 0;
1038 * Iterative chunk inflation. Handles cases where we cross into a new
1039 * sequence, doing flush finish of previous chunk if needed.
1041 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
1042 z_stream *stream, struct inflate_chunk_iter *iter)
1046 dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
1047 (unsigned long) ic->len, ic->seq);
1049 if (ic->seq != iter->seq) {
1051 finish_chunk(stream, f, iter);
1053 z_stream_init(stream, gz_hdr);
1054 iter->seq = ic->seq;
1057 stream->avail_in = ic->len;
1058 stream->next_in = ic->buf;
1060 if (!iter->buf_size) {
1061 iter->buf_size = iter->chunk_sz;
1062 iter->buf = malloc(iter->buf_size);
1065 while (stream->avail_in) {
1066 size_t this_out = iter->buf_size - iter->buf_used;
1069 stream->avail_out = this_out;
1070 stream->next_out = iter->buf + iter->buf_used;
1072 err = inflate(stream, Z_NO_FLUSH);
1074 log_err("fio: failed inflating log: %d\n", err);
1079 iter->buf_used += this_out - stream->avail_out;
1081 if (!stream->avail_out) {
1082 iter->buf_size += iter->chunk_sz;
1083 iter->buf = realloc(iter->buf, iter->buf_size);
1087 if (err == Z_STREAM_END)
1091 ret = (void *) stream->next_in - ic->buf;
1093 dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
1099 * Inflate stored compressed chunks, or write them directly to the log
1100 * file if so instructed.
1102 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1104 struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
1107 while (!flist_empty(&log->chunk_list)) {
1108 struct iolog_compress *ic;
1110 ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
1111 flist_del(&ic->list);
1113 if (log->log_gz_store) {
1116 dprint(FD_COMPRESS, "log write chunk size=%lu, "
1117 "seq=%u\n", (unsigned long) ic->len, ic->seq);
1119 ret = fwrite(ic->buf, ic->len, 1, f);
1120 if (ret != 1 || ferror(f)) {
1122 log_err("fio: error writing compressed log\n");
1125 inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
1131 finish_chunk(&stream, f, &iter);
1139 * Open compressed log file and decompress the stored chunks and
1140 * write them to stdout. The chunks are stored sequentially in the
1141 * file, so we iterate over them and do them one-by-one.
1143 int iolog_file_inflate(const char *file)
1145 struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
1146 struct iolog_compress ic;
1154 f = fopen(file, "r");
1160 if (stat(file, &sb) < 0) {
1166 ic.buf = buf = malloc(sb.st_size);
1167 ic.len = sb.st_size;
1170 ret = fread(ic.buf, ic.len, 1, f);
1171 if (ret == 0 && ferror(f)) {
1176 } else if (ferror(f) || (!feof(f) && ret != 1)) {
1177 log_err("fio: short read on reading log\n");
1186 * Each chunk will return Z_STREAM_END. We don't know how many
1187 * chunks are in the file, so we just keep looping and incrementing
1188 * the sequence number until we have consumed the whole compressed
1195 iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
1208 finish_chunk(&stream, stdout, &iter);
1218 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1223 int iolog_file_inflate(const char *file)
1225 log_err("fio: log inflation not possible without zlib\n");
1231 void flush_log(struct io_log *log, bool do_append)
1237 f = fopen(log->filename, "w");
1239 f = fopen(log->filename, "a");
1241 perror("fopen log");
1245 buf = set_file_buffer(f);
1247 inflate_gz_chunks(log, f);
1249 while (!flist_empty(&log->io_logs)) {
1250 struct io_logs *cur_log;
1252 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1253 flist_del_init(&cur_log->list);
1255 if (log->td && log == log->td->clat_hist_log)
1256 flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1257 log_sample_sz(log, cur_log));
1259 flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1265 clear_file_buffer(buf);
1268 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1270 if (td->flags & TD_F_COMPRESS_LOG)
1274 if (fio_trylock_file(log->filename))
1277 fio_lock_file(log->filename);
1279 if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1280 fio_send_iolog(td, log, log->filename);
1282 flush_log(log, !td->o.per_job_logs);
1284 fio_unlock_file(log->filename);
1289 size_t log_chunk_sizes(struct io_log *log)
1291 struct flist_head *entry;
1294 if (flist_empty(&log->chunk_list))
1298 pthread_mutex_lock(&log->chunk_lock);
1299 flist_for_each(entry, &log->chunk_list) {
1300 struct iolog_compress *c;
1302 c = flist_entry(entry, struct iolog_compress, list);
1305 pthread_mutex_unlock(&log->chunk_lock);
1311 static void iolog_put_deferred(struct io_log *log, void *ptr)
1316 pthread_mutex_lock(&log->deferred_free_lock);
1317 if (log->deferred < IOLOG_MAX_DEFER) {
1318 log->deferred_items[log->deferred] = ptr;
1320 } else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
1321 log_err("fio: had to drop log entry free\n");
1322 pthread_mutex_unlock(&log->deferred_free_lock);
1325 static void iolog_free_deferred(struct io_log *log)
1332 pthread_mutex_lock(&log->deferred_free_lock);
1334 for (i = 0; i < log->deferred; i++) {
1335 free(log->deferred_items[i]);
1336 log->deferred_items[i] = NULL;
1340 pthread_mutex_unlock(&log->deferred_free_lock);
1343 static int gz_work(struct iolog_flush_data *data)
1345 struct iolog_compress *c = NULL;
1346 struct flist_head list;
1352 INIT_FLIST_HEAD(&list);
1354 memset(&stream, 0, sizeof(stream));
1355 stream.zalloc = Z_NULL;
1356 stream.zfree = Z_NULL;
1357 stream.opaque = Z_NULL;
1359 ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1361 log_err("fio: failed to init gz stream\n");
1365 seq = ++data->log->chunk_seq;
1367 stream.next_in = (void *) data->samples;
1368 stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1370 dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
1371 (unsigned long) stream.avail_in, seq,
1372 data->log->filename);
1375 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1376 (unsigned long) c->len);
1377 c = get_new_chunk(seq);
1378 stream.avail_out = GZ_CHUNK;
1379 stream.next_out = c->buf;
1380 ret = deflate(&stream, Z_NO_FLUSH);
1382 log_err("fio: deflate log (%d)\n", ret);
1387 c->len = GZ_CHUNK - stream.avail_out;
1388 flist_add_tail(&c->list, &list);
1390 } while (stream.avail_in);
1392 stream.next_out = c->buf + c->len;
1393 stream.avail_out = GZ_CHUNK - c->len;
1395 ret = deflate(&stream, Z_FINISH);
1398 * Z_BUF_ERROR is special, it just means we need more
1399 * output space. We'll handle that below. Treat any other
1402 if (ret != Z_BUF_ERROR) {
1403 log_err("fio: deflate log (%d)\n", ret);
1404 flist_del(&c->list);
1411 c->len = GZ_CHUNK - stream.avail_out;
1413 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
1415 if (ret != Z_STREAM_END) {
1417 c = get_new_chunk(seq);
1418 stream.avail_out = GZ_CHUNK;
1419 stream.next_out = c->buf;
1420 ret = deflate(&stream, Z_FINISH);
1421 c->len = GZ_CHUNK - stream.avail_out;
1423 flist_add_tail(&c->list, &list);
1424 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1425 (unsigned long) c->len);
1426 } while (ret != Z_STREAM_END);
1429 dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1431 ret = deflateEnd(&stream);
1433 log_err("fio: deflateEnd %d\n", ret);
1435 iolog_put_deferred(data->log, data->samples);
1437 if (!flist_empty(&list)) {
1438 pthread_mutex_lock(&data->log->chunk_lock);
1439 flist_splice_tail(&list, &data->log->chunk_list);
1440 pthread_mutex_unlock(&data->log->chunk_lock);
1449 while (!flist_empty(&list)) {
1450 c = flist_first_entry(list.next, struct iolog_compress, list);
1451 flist_del(&c->list);
1459 * Invoked from our compress helper thread, when logging would have exceeded
1460 * the specified memory limitation. Compresses the previously stored
1463 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1465 return gz_work(container_of(work, struct iolog_flush_data, work));
1468 static int gz_init_worker(struct submit_worker *sw)
1470 struct thread_data *td = sw->wq->td;
1472 if (!fio_option_is_set(&td->o, log_gz_cpumask))
1475 if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1476 log_err("gz: failed to set CPU affinity\n");
1483 static struct workqueue_ops log_compress_wq_ops = {
1484 .fn = gz_work_async,
1485 .init_worker_fn = gz_init_worker,
1489 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1491 if (!(td->flags & TD_F_COMPRESS_LOG))
1494 workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
1498 void iolog_compress_exit(struct thread_data *td)
1500 if (!(td->flags & TD_F_COMPRESS_LOG))
1503 workqueue_exit(&td->log_compress_wq);
1507 * Queue work item to compress the existing log entries. We reset the
1508 * current log to a small size, and reference the existing log in the
1509 * data that we queue for compression. Once compression has been done,
1510 * this old log is freed. If called with finish == true, will not return
1511 * until the log compression has completed, and will flush all previous
1514 static int iolog_flush(struct io_log *log)
1516 struct iolog_flush_data *data;
1518 data = malloc(sizeof(*data));
1525 while (!flist_empty(&log->io_logs)) {
1526 struct io_logs *cur_log;
1528 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1529 flist_del_init(&cur_log->list);
1531 data->samples = cur_log->log;
1532 data->nr_samples = cur_log->nr_samples;
1543 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1545 struct iolog_flush_data *data;
1547 data = smalloc(sizeof(*data));
1553 data->samples = cur_log->log;
1554 data->nr_samples = cur_log->nr_samples;
1557 cur_log->nr_samples = cur_log->max_samples = 0;
1558 cur_log->log = NULL;
1560 workqueue_enqueue(&log->td->log_compress_wq, &data->work);
1562 iolog_free_deferred(log);
1568 static int iolog_flush(struct io_log *log)
1573 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1578 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1583 void iolog_compress_exit(struct thread_data *td)
1589 struct io_logs *iolog_cur_log(struct io_log *log)
1591 if (flist_empty(&log->io_logs))
1594 return flist_last_entry(&log->io_logs, struct io_logs, list);
1597 uint64_t iolog_nr_samples(struct io_log *iolog)
1599 struct flist_head *entry;
1602 flist_for_each(entry, &iolog->io_logs) {
1603 struct io_logs *cur_log;
1605 cur_log = flist_entry(entry, struct io_logs, list);
1606 ret += cur_log->nr_samples;
1612 static int __write_log(struct thread_data *td, struct io_log *log, int try)
1615 return finish_log(td, log, try);
1620 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1624 if (per_unit_log(td->iops_log) != unit_log)
1627 ret = __write_log(td, td->iops_log, try);
1629 td->iops_log = NULL;
1634 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1641 ret = __write_log(td, td->slat_log, try);
1643 td->slat_log = NULL;
1648 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1655 ret = __write_log(td, td->clat_log, try);
1657 td->clat_log = NULL;
1662 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1669 ret = __write_log(td, td->clat_hist_log, try);
1671 td->clat_hist_log = NULL;
1676 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1683 ret = __write_log(td, td->lat_log, try);
1690 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1694 if (per_unit_log(td->bw_log) != unit_log)
1697 ret = __write_log(td, td->bw_log, try);
1710 CLAT_HIST_LOG_MASK = 32,
1717 int (*fn)(struct thread_data *, int, bool);
1720 static struct log_type log_types[] = {
1722 .mask = BW_LOG_MASK,
1723 .fn = write_bandw_log,
1726 .mask = LAT_LOG_MASK,
1727 .fn = write_lat_log,
1730 .mask = SLAT_LOG_MASK,
1731 .fn = write_slat_log,
1734 .mask = CLAT_LOG_MASK,
1735 .fn = write_clat_log,
1738 .mask = IOPS_LOG_MASK,
1739 .fn = write_iops_log,
1742 .mask = CLAT_HIST_LOG_MASK,
1743 .fn = write_clat_hist_log,
1747 void td_writeout_logs(struct thread_data *td, bool unit_logs)
1749 unsigned int log_mask = 0;
1750 unsigned int log_left = ALL_LOG_NR;
1753 old_state = td_bump_runstate(td, TD_FINISHING);
1755 finalize_logs(td, unit_logs);
1758 int prev_log_left = log_left;
1760 for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1761 struct log_type *lt = &log_types[i];
1764 if (!(log_mask & lt->mask)) {
1765 ret = lt->fn(td, log_left != 1, unit_logs);
1768 log_mask |= lt->mask;
1773 if (prev_log_left == log_left)
1777 td_restore_runstate(td, old_state);
1780 void fio_writeout_logs(bool unit_logs)
1782 struct thread_data *td;
1786 td_writeout_logs(td, unit_logs);