2 * Code related to writing an iolog of what a thread is doing, and to
3 * later read that back and replay
22 #include "lib/roundup.h"
24 #include <netinet/in.h>
25 #include <netinet/tcp.h>
26 #include <arpa/inet.h>
28 #include <sys/socket.h>
31 static int iolog_flush(struct io_log *log);
33 static const char iolog_ver2[] = "fio version 2 iolog";
/*
 * Append an io_piece to the thread's replay list and account its length
 * toward the total IO size the log will replay.
 */
void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
	flist_add_tail(&ipo->list, &td->io_log_list);
	td->total_io_size += ipo->len;
/*
 * Record one io_u to the write iolog as
 * "<filename> <ddir> <offset> <buflen>". No-op unless write_iolog_file
 * was configured.
 */
void log_io_u(const struct thread_data *td, const struct io_u *io_u)
	if (!td->o.write_iolog_file)
	fprintf(td->iolog_f, "%s %s %llu %llu\n", io_u->file->file_name,
		io_ddir_name(io_u->ddir),
		io_u->offset, io_u->buflen);
/*
 * Record a file action ("add"/"open"/"close") to the write iolog.
 * No-op unless write_iolog_file was configured.
 */
void log_file(struct thread_data *td, struct fio_file *f,
	      enum file_log_act what)
	const char *act[] = { "add", "open", "close" };
	if (!td->o.write_iolog_file)
	/*
	 * this happens on the pre-open/close done before the job starts
	 */
	fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
/*
 * Sleep to reproduce the recorded inter-IO delay. Time already spent
 * since the last issue and previous oversleep (td->time_offset) are
 * credited against the requested delay; any new oversleep is stored
 * back into td->time_offset for the next call.
 */
static void iolog_delay(struct thread_data *td, unsigned long delay)
	uint64_t usec = utime_since_now(&td->last_issue);
	unsigned long orig_delay = delay;
	/* credit accumulated oversleep from earlier delays */
	if (delay < td->time_offset) {
	delay -= td->time_offset;
	fio_gettime(&ts, NULL);
	/* sleep in bounded chunks so td->terminate is honored promptly */
	while (delay && !td->terminate) {
		if (this_delay > 500000)
		usec_sleep(td, this_delay);
	/* remember how much we overslept by */
	usec = utime_since_now(&ts);
	if (usec > orig_delay)
		td->time_offset = usec - orig_delay;
/*
 * Handle a special (non-IO) iolog entry: file add/open/close/unlink
 * actions are recorded with ddir DDIR_INVAL. Regular IO entries are
 * passed through untouched.
 */
static int ipo_special(struct thread_data *td, struct io_piece *ipo)
	/* not a file action entry — nothing special to do */
	if (ipo->ddir != DDIR_INVAL)
	f = td->files[ipo->fileno];
	switch (ipo->file_action) {
	case FIO_LOG_OPEN_FILE:
		/* with replay_redirect, many names map to one file */
		if (td->o.replay_redirect && fio_file_open(f)) {
			dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
		ret = td_io_open_file(td, f);
			td_verror(td, ret, "iolog open file");
	case FIO_LOG_CLOSE_FILE:
		td_io_close_file(td, f);
	case FIO_LOG_UNLINK_FILE:
		td_io_unlink_file(td, f);
		log_err("fio: bad file action %d\n", ipo->file_action);
145 static bool read_iolog2(struct thread_data *td);
/*
 * Pop the next io_piece off the replay list and populate the io_u.
 * In chunked mode, when consumption reaches the check mark, refill the
 * list from the trace source (blktrace or v2 iolog). File-action
 * entries are consumed internally via ipo_special(). Recorded delays
 * are honored before the entry is returned.
 */
int read_iolog_get(struct thread_data *td, struct io_u *io_u)
	struct io_piece *ipo;
	unsigned long elapsed;
	while (!flist_empty(&td->io_log_list)) {
		if (td->o.read_iolog_chunked) {
			/* hit the low-water mark: fetch more entries */
			if (td->io_log_checkmark == td->io_log_current) {
				if (td->io_log_blktrace) {
					if (!read_blktrace(td))
					if (!read_iolog2(td))
			td->io_log_current--;
		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);
		/* file add/open/close entries are handled here, not returned */
		ret = ipo_special(td, ipo);
		} else if (ret > 0) {
		io_u->ddir = ipo->ddir;
		if (ipo->ddir != DDIR_WAIT) {
			io_u->offset = ipo->offset;
			io_u->verify_offset = ipo->offset;
			io_u->buflen = ipo->len;
			io_u->file = td->files[ipo->fileno];
			get_file(io_u->file);
			dprint(FD_IO, "iolog: get %llu/%llu/%s\n", io_u->offset,
						io_u->buflen, io_u->file->file_name);
			iolog_delay(td, ipo->delay);
			/* DDIR_WAIT: delay is an absolute msec timestamp here */
			elapsed = mtime_since_genesis();
			if (ipo->delay > elapsed)
				usec_sleep(td, (ipo->delay - elapsed) * 1000);
	if (io_u->ddir != DDIR_WAIT)
/*
 * Drop all remembered IO pieces: both the verify rb-tree and the
 * history list, removing associated trim entries as we go.
 */
void prune_io_piece_log(struct thread_data *td)
	struct io_piece *ipo;
	struct fio_rb_node *n;
	/* empty the sorted verify tree */
	while ((n = rb_first(&td->io_hist_tree)) != NULL) {
		ipo = rb_entry(n, struct io_piece, rb_node);
		rb_erase(n, &td->io_hist_tree);
		remove_trim_entry(td, ipo);
	/* then the unsorted history list */
	while (!flist_empty(&td->io_hist_list)) {
		ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);
/*
 * log a successful write, so we can unwind the log for verify
 */
void log_io_piece(struct thread_data *td, struct io_u *io_u)
	struct fio_rb_node **p, *parent;
	struct io_piece *ipo, *__ipo;
	ipo = calloc(1, sizeof(struct io_piece));
	ipo->file = io_u->file;
	ipo->offset = io_u->offset;
	ipo->len = io_u->buflen;
	ipo->numberio = io_u->numberio;
	ipo->flags = IP_F_IN_FLIGHT;
	/* trims are additionally tracked on the thread's trim list */
	if (io_u_should_trim(td, io_u)) {
		flist_add_tail(&ipo->trim_list, &td->trim_list);
	/*
	 * Only sort writes if we don't have a random map, in which case we
	 * need to check for duplicate blocks and drop the old one, which we
	 * rely on the rb insert/lookup for handling.
	 */
	if (file_randommap(td, ipo->file)) {
		INIT_FLIST_HEAD(&ipo->list);
		flist_add_tail(&ipo->list, &td->io_hist_list);
		ipo->flags |= IP_F_ONLIST;
	RB_CLEAR_NODE(&ipo->rb_node);
	/*
	 * Sort the entry into the verification list
	 */
	p = &td->io_hist_tree.rb_node;
	__ipo = rb_entry(parent, struct io_piece, rb_node);
	/* order by file first, then offset; detect range overlap */
	if (ipo->file < __ipo->file)
	else if (ipo->file > __ipo->file)
	else if (ipo->offset < __ipo->offset) {
		overlap = ipo->offset + ipo->len > __ipo->offset;
	else if (ipo->offset > __ipo->offset) {
		overlap = __ipo->offset + __ipo->len > ipo->offset;
	dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
		__ipo->offset, __ipo->len,
		ipo->offset, ipo->len);
	/* overlapping older entry: evict it from tree and trim list */
	rb_erase(parent, &td->io_hist_tree);
	remove_trim_entry(td, __ipo);
	if (!(__ipo->flags & IP_F_IN_FLIGHT))
	rb_link_node(&ipo->rb_node, parent, p);
	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
	ipo->flags |= IP_F_ONRB;
/*
 * Undo a logged IO piece (e.g. when the IO ends up failing). When
 * block info tracking is active, also mark the block's trim/write
 * failure state.
 */
void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
	struct io_piece *ipo = io_u->ipo;
	if (td->ts.nr_block_infos) {
		uint32_t *info = io_u_block_info(td, io_u);
		/* only escalate state, never downgrade past TRIM_FAILURE */
		if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
			if (io_u->ddir == DDIR_TRIM)
				*info = BLOCK_INFO_SET_STATE(*info,
					BLOCK_STATE_TRIM_FAILURE);
			else if (io_u->ddir == DDIR_WRITE)
				*info = BLOCK_INFO_SET_STATE(*info,
					BLOCK_STATE_WRITE_FAILURE);
	/* remove from whichever container log_io_piece() put it on */
	if (ipo->flags & IP_F_ONRB)
		rb_erase(&ipo->rb_node, &td->io_hist_tree);
	else if (ipo->flags & IP_F_ONLIST)
		flist_del(&ipo->list);
/*
 * Shrink the logged length to what was actually transferred:
 * xfer_buflen minus the residual.
 */
void trim_io_piece(const struct io_u *io_u)
	struct io_piece *ipo = io_u->ipo;
	ipo->len = io_u->xfer_buflen - io_u->resid;
/* Close the write iolog and drop its stream buffer reference. */
void write_iolog_close(struct thread_data *td)
	td->iolog_buf = NULL;
/*
 * Estimate how many iolog entries to fetch so that roughly one second
 * of replay is buffered, based on the consumption rate since the last
 * high-water mark. Updates the high/check marks and their timestamp.
 */
int64_t iolog_items_to_fetch(struct thread_data *td)
	int64_t items_to_fetch;
	if (!td->io_log_highmark)
	fio_gettime(&now, NULL);
	elapsed = ntime_since(&td->io_log_highmark_time, &now);
	/* entries consumed per second, extrapolated from elapsed ns */
	for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
	items_to_fetch = for_1s - td->io_log_current;
	if (items_to_fetch < 0)
	td->io_log_highmark = td->io_log_current + items_to_fetch;
	/* refill again once half of the buffered entries are consumed */
	td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
	fio_gettime(&td->io_log_highmark_time, NULL);
	return items_to_fetch;
/*
 * Read version 2 iolog data. It is enhanced to include per-file logging
 * (add/open/close actions) alongside read/write/trim/sync/wait entries.
 * In chunked mode only up to items_to_fetch entries are read per call.
 */
static bool read_iolog2(struct thread_data *td)
	unsigned long long offset;
	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
	char *rfname, *fname, *act;
	bool realloc = false;
	int64_t items_to_fetch = 0;
	if (td->o.read_iolog_chunked) {
		items_to_fetch = iolog_items_to_fetch(td);
	/*
	 * Read in the read iolog and store it, reuse the infrastructure
	 * for doing verifications.
	 */
	rfname = fname = malloc(256+16);
	act = malloc(256+16);
	syncs = reads = writes = waits = 0;
	while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
		struct io_piece *ipo;
		/* lines are "<fname> <action> [<offset> <len>]" */
		r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
		/* replay_redirect sends all IO to one replacement file */
		if (td->o.replay_redirect)
			fname = td->o.replay_redirect;
		if (!strcmp(act, "wait"))
		else if (!strcmp(act, "read"))
		else if (!strcmp(act, "write"))
		else if (!strcmp(act, "sync"))
		else if (!strcmp(act, "datasync"))
		else if (!strcmp(act, "trim"))
			log_err("fio: bad iolog file action: %s\n",
		fileno = get_fileno(td, fname);
		if (!strcmp(act, "add")) {
			/* under redirection the file may already be known */
			if (td->o.replay_redirect &&
			    get_fileno(td, fname) != -1) {
				dprint(FD_FILE, "iolog: ignoring"
					" re-add of file %s\n", fname);
			fileno = add_file(td, fname, td->subjob_number, 1);
			file_action = FIO_LOG_ADD_FILE;
		} else if (!strcmp(act, "open")) {
			fileno = get_fileno(td, fname);
			file_action = FIO_LOG_OPEN_FILE;
		} else if (!strcmp(act, "close")) {
			fileno = get_fileno(td, fname);
			file_action = FIO_LOG_CLOSE_FILE;
			log_err("fio: bad iolog file action: %s\n",
		log_err("bad iolog2: %s\n", p);
		else if (rw == DDIR_WRITE) {
			/*
			 * Don't add a write for ro mode
			 */
		} else if (rw == DDIR_WAIT) {
		} else if (rw == DDIR_INVAL) {
		} else if (ddir_sync(rw)) {
			log_err("bad ddir: %d\n", rw);
		ipo = calloc(1, sizeof(*ipo));
		if (rw == DDIR_WAIT) {
			/* replay_scale compresses the offset space */
			if (td->o.replay_scale)
				ipo->offset = offset / td->o.replay_scale;
			ipo->offset = offset;
			ipo_bytes_align(td->o.replay_align, ipo);
		/* track the largest block size seen per direction */
		if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw]) {
			td->o.max_bs[rw] = bytes;
		ipo->fileno = fileno;
		ipo->file_action = file_action;
		queue_io_piece(td, ipo);
		if (td->o.read_iolog_chunked) {
			td->io_log_current++;
			if (items_to_fetch == 0)
	if (td->o.read_iolog_chunked) {
		td->io_log_highmark = td->io_log_current;
		td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
		fio_gettime(&td->io_log_highmark_time, NULL);
	if (writes && read_only) {
		log_err("fio: <%s> skips replay of %d writes due to"
			" read-only\n", td->o.name, writes);
	td->flags |= TD_F_SYNCS;
	if (td->o.read_iolog_chunked) {
		if (td->io_log_current == 0) {
		td->o.td_ddir = TD_DDIR_RW;
		/* block sizes may have grown: reallocate IO buffers */
		if (realloc && td->orig_buffer)
		init_io_u_buffers(td);
	/* derive the effective data direction from the log contents */
	if (!reads && !writes && !waits)
	else if (reads && !writes)
		td->o.td_ddir = TD_DDIR_READ;
	else if (!reads && writes)
		td->o.td_ddir = TD_DDIR_WRITE;
		td->o.td_ddir = TD_DDIR_RW;
/* Return true if path exists and is a Unix domain socket. */
static bool is_socket(const char *path)
	r = stat(path, &buf);
	return S_ISSOCK(buf.st_mode);
/*
 * Open and connect an AF_UNIX stream socket at the given path.
 * Returns the connected fd on success; error-path details are
 * outside the visible lines here.
 */
static int open_socket(const char *path)
	struct sockaddr_un addr;
	fd = socket(AF_UNIX, SOCK_STREAM, 0);
	addr.sun_family = AF_UNIX;
	/* snprintf return >= bufsize means the path was truncated */
	if (snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path) >=
	    sizeof(addr.sun_path)) {
		log_err("%s: path name %s is too long for a Unix socket\n",
	ret = connect(fd, (const struct sockaddr *)&addr, strlen(path) + sizeof(addr.sun_family));
/*
 * open iolog, check version, and call appropriate parser
 */
static bool init_iolog_read(struct thread_data *td, char *fname)
	char buffer[256], *p;
	dprint(FD_IO, "iolog: name=%s\n", fname);
	/* input may be a unix socket, "-" (stdin), or a regular file */
	if (is_socket(fname)) {
		fd = open_socket(fname);
	} else if (!strcmp(fname, "-")) {
	f = fopen(fname, "r");
		perror("fopen read iolog");
	p = fgets(buffer, sizeof(buffer), f);
		td_verror(td, errno, "iolog read");
		log_err("fio: unable to read iolog\n");
	/*
	 * version 2 of the iolog stores a specific string as the
	 * first line, check for that
	 */
	if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2))) {
		free_release_files(td);
		td->io_log_rfile = f;
		return read_iolog2(td);
	log_err("fio: iolog version 1 is no longer supported\n");
/*
 * Set up a log for storing io patterns.
 */
static bool init_iolog_write(struct thread_data *td)
	/* append mode so multiple runs accumulate into one log */
	f = fopen(td->o.write_iolog_file, "a");
		perror("fopen write iolog");
	/*
	 * That's it for writing, setup a log buffer and we're done.
	 */
	td->iolog_buf = malloc(8192);
	setvbuf(f, td->iolog_buf, _IOFBF, 8192);
	/*
	 * write our version line
	 */
	if (fprintf(f, "%s\n", iolog_ver2) < 0) {
		perror("iolog init\n");
	/*
	 * add all known files
	 */
	for_each_file(td, ff, i)
		log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * Initialize iolog reading or writing for this thread, depending on
 * whether read_iolog_file or write_iolog_file was configured.
 * Reports EINVAL via td_verror on failure.
 */
bool init_iolog(struct thread_data *td)
	if (td->o.read_iolog_file) {
		char * fname = get_name_by_idx(td->o.read_iolog_file, td->subjob_number);
		/*
		 * Check if it's a blktrace file and load that if possible.
		 * Otherwise assume it's a normal log file and load that.
		 */
		if (is_blktrace(fname, &need_swap)) {
			td->io_log_blktrace = 1;
			ret = init_blktrace_read(td, fname, need_swap);
			td->io_log_blktrace = 0;
			ret = init_iolog_read(td, fname);
	} else if (td->o.write_iolog_file)
		ret = init_iolog_write(td);
	td_verror(td, EINVAL, "failed initializing iolog");
/*
 * Allocate and initialize an io_log from the given parameters:
 * per-direction histogram windows, an optionally pre-sized pending
 * sample buffer, offset/priority sample mask bits, and compression
 * state when log_gz/log_gz_store are enabled.
 */
void setup_log(struct io_log **log, struct log_params *p,
	       const char *filename)
	struct io_u_plat_entry *entry;
	struct flist_head *list;
	l = scalloc(1, sizeof(*l));
	INIT_FLIST_HEAD(&l->io_logs);
	l->log_type = p->log_type;
	l->log_offset = p->log_offset;
	l->log_prio = p->log_prio;
	l->log_gz = p->log_gz;
	l->log_gz_store = p->log_gz_store;
	l->avg_msec = p->avg_msec;
	l->hist_msec = p->hist_msec;
	l->hist_coarseness = p->hist_coarseness;
	l->filename = strdup(filename);
	/* Initialize histogram lists for each r/w direction,
	 * with initial io_u_plat of all zeros:
	 */
	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
		list = &l->hist_window[i].list;
		INIT_FLIST_HEAD(list);
		entry = calloc(1, sizeof(struct io_u_plat_entry));
		flist_add(&entry->list, list);
	if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
		unsigned int def_samples = DEF_LOG_ENTRIES;
		__p = calloc(1, sizeof(*l->pending));
		/* size the pending buffer for at least iodepth samples */
		if (l->td->o.iodepth > DEF_LOG_ENTRIES)
			def_samples = roundup_pow2(l->td->o.iodepth);
		__p->max_samples = def_samples;
		__p->log = calloc(__p->max_samples, log_entry_sz(l));
	/* flag samples carrying offset/priority data in the ddir field */
	l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
	l->log_ddir_mask |= LOG_PRIO_SAMPLE_BIT;
	INIT_FLIST_HEAD(&l->chunk_list);
	if (l->log_gz && !p->td)
	else if (l->log_gz || l->log_gz_store) {
		mutex_init_pshared(&l->chunk_lock);
		mutex_init_pshared(&l->deferred_free_lock);
		p->td->flags |= TD_F_COMPRESS_LOG;
#ifdef CONFIG_SETVBUF
/* Give the stream a large (1 MB) fully-buffered buffer. */
static void *set_file_buffer(FILE *f)
	size_t size = 1048576;
	setvbuf(f, buf, _IOFBF, size);
/* Release the buffer handed out by set_file_buffer(). */
static void clear_file_buffer(void *buf)
/* Fallbacks when setvbuf support is not configured: no-ops. */
static void *set_file_buffer(FILE *f)
static void clear_file_buffer(void *buf)
/* Free all sample buffers of a log, including the pending buffer. */
void free_log(struct io_log *log)
	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;
		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);
	free(log->pending->log);
/*
 * Sum `stride` histogram buckets starting at index j. If a previous
 * snapshot io_u_plat_last is given, return the delta versus it.
 */
uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
		  uint64_t *io_u_plat_last)
	if (io_u_plat_last) {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k] - io_u_plat_last[j + k];
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k];
/*
 * Write histogram samples as text: "time, ddir, bs," followed by one
 * summed value per stride (1 << hist_coarseness) plat buckets. Each
 * sample is emitted as a delta against the previous plat entry on its
 * list, which is unlinked once consumed.
 */
static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
			       uint64_t sample_size)
	uint64_t i, j, nr_samples;
	struct io_u_plat_entry *entry, *entry_before;
	uint64_t *io_u_plat_before;
	int stride = 1 << hist_coarseness;
	/* first sample's ddir tells us whether offsets are stored */
	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
	nr_samples = sample_size / __log_entry_sz(log_offset);
	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);
		entry = s->data.plat_entry;
		io_u_plat = entry->io_u_plat;
		entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
		io_u_plat_before = entry_before->io_u_plat;
		fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
						io_sample_ddir(s), (unsigned long long) s->bs);
		for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
			fprintf(f, "%llu, ", (unsigned long long)
				hist_sum(j, stride, io_u_plat, io_u_plat_before));
		/* last bucket: newline instead of trailing comma */
		fprintf(f, "%llu\n", (unsigned long long)
			hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
		flist_del(&entry_before->list);
/*
 * Write raw log samples as text. The line format depends on whether
 * the offset and/or priority sample bits are set in the first
 * sample's ddir field.
 */
void flush_samples(FILE *f, void *samples, uint64_t sample_size)
	int log_offset, log_prio;
	uint64_t i, nr_samples;
	unsigned int prio_val;
	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
	log_prio = (s->__ddir & LOG_PRIO_SAMPLE_BIT) != 0;
	/* choose the printf format that matches the sample layout */
	fmt = "%lu, %" PRId64 ", %u, %llu, %llu, 0x%04x\n";
	fmt = "%lu, %" PRId64 ", %u, %llu, %llu, %u\n";
	fmt = "%lu, %" PRId64 ", %u, %llu, 0x%04x\n";
	fmt = "%lu, %" PRId64 ", %u, %llu, %u\n";
	nr_samples = sample_size / __log_entry_sz(log_offset);
	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);
		/* full priority value, or just the RT-class flag */
		prio_val = s->priority;
		prio_val = ioprio_value_is_class_rt(s->priority);
		(unsigned long) s->time,
		io_sample_ddir(s), (unsigned long long) s->bs,
		/* offset-bearing samples use the extended layout */
		struct io_sample_offset *so = (void *) s;
		(unsigned long) s->time,
		io_sample_ddir(s), (unsigned long long) s->bs,
		(unsigned long long) so->offset,
/* Work item handed to the log compression workqueue. */
struct iolog_flush_data {
	struct workqueue_work work;

/* Output chunk size for zlib compression buffers. */
#define GZ_CHUNK 131072

/* Allocate a compression chunk tagged with the given sequence number. */
static struct iolog_compress *get_new_chunk(unsigned int seq)
	struct iolog_compress *c;
	c = malloc(sizeof(*c));
	INIT_FLIST_HEAD(&c->list);
	c->buf = malloc(GZ_CHUNK);

/* Release a compression chunk and its buffer. */
static void free_chunk(struct iolog_compress *ic)

/*
 * Initialize a zlib inflate stream. gz_hdr selects gzip-header
 * auto-detection via the windowBits+32 trick below.
 */
static int z_stream_init(z_stream *stream, int gz_hdr)
	memset(stream, 0, sizeof(*stream));
	stream->zalloc = Z_NULL;
	stream->zfree = Z_NULL;
	stream->opaque = Z_NULL;
	stream->next_in = Z_NULL;
	/*
	 * zlib magic - add 32 for auto-detection of gz header or not,
	 * if we decide to store files in a gzip friendly format.
	 */
	if (inflateInit2(stream, wbits) != Z_OK)
/* Iterator state used while inflating a sequence of chunks. */
struct inflate_chunk_iter {

/*
 * End the current inflate stream and flush the decoded samples held
 * in iter->buf to the output file, then reset the iterator buffer.
 */
static void finish_chunk(z_stream *stream, FILE *f,
			 struct inflate_chunk_iter *iter)
	ret = inflateEnd(stream);
		log_err("fio: failed to end log inflation seq %d (%d)\n",
	flush_samples(f, iter->buf, iter->buf_used);
	iter->buf_size = iter->buf_used = 0;
/*
 * Iterative chunk inflation. Handles cases where we cross into a new
 * sequence, doing flush finish of previous chunk if needed.
 * Returns the number of input bytes consumed from this chunk.
 */
static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
			    z_stream *stream, struct inflate_chunk_iter *iter)
	dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
		(unsigned long) ic->len, ic->seq);
	/* new sequence: finish the previous one and re-init the stream */
	if (ic->seq != iter->seq) {
		finish_chunk(stream, f, iter);
		z_stream_init(stream, gz_hdr);
		iter->seq = ic->seq;
	stream->avail_in = ic->len;
	stream->next_in = ic->buf;
	if (!iter->buf_size) {
		iter->buf_size = iter->chunk_sz;
		iter->buf = malloc(iter->buf_size);
	while (stream->avail_in) {
		size_t this_out = iter->buf_size - iter->buf_used;
		stream->avail_out = this_out;
		stream->next_out = iter->buf + iter->buf_used;
		err = inflate(stream, Z_NO_FLUSH);
			log_err("fio: failed inflating log: %d\n", err);
		iter->buf_used += this_out - stream->avail_out;
		/* output buffer exhausted: grow it and continue */
		if (!stream->avail_out) {
			iter->buf_size += iter->chunk_sz;
			iter->buf = realloc(iter->buf, iter->buf_size);
		if (err == Z_STREAM_END)
	/* bytes of compressed input consumed from this chunk */
	ret = (void *) stream->next_in - ic->buf;
	dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
/*
 * Inflate stored compressed chunks, or write them directly to the log
 * file if so instructed (log_gz_store).
 */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
	struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
	while (!flist_empty(&log->chunk_list)) {
		struct iolog_compress *ic;
		ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
		flist_del(&ic->list);
		if (log->log_gz_store) {
			/* stored mode: write compressed bytes verbatim */
			dprint(FD_COMPRESS, "log write chunk size=%lu, "
				"seq=%u\n", (unsigned long) ic->len, ic->seq);
			ret = fwrite(ic->buf, ic->len, 1, f);
			if (ret != 1 || ferror(f)) {
				log_err("fio: error writing compressed log\n");
		inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
	/* flush whatever the last sequence produced */
	finish_chunk(&stream, f, &iter);
/*
 * Open compressed log file and decompress the stored chunks and
 * write them to stdout. The chunks are stored sequentially in the
 * file, so we iterate over them and do them one-by-one.
 */
int iolog_file_inflate(const char *file)
	struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
	struct iolog_compress ic;
	f = fopen(file, "r");
	if (stat(file, &sb) < 0) {
	/* slurp the whole file into one pseudo-chunk */
	ic.buf = buf = malloc(sb.st_size);
	ic.len = sb.st_size;
	ret = fread(ic.buf, ic.len, 1, f);
	if (ret == 0 && ferror(f)) {
	} else if (ferror(f) || (!feof(f) && ret != 1)) {
		log_err("fio: short read on reading log\n");
	/*
	 * Each chunk will return Z_STREAM_END. We don't know how many
	 * chunks are in the file, so we just keep looping and incrementing
	 * the sequence number until we have consumed the whole compressed
	 * file.
	 */
	iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
	finish_chunk(&stream, stdout, &iter);
/* Stubs used when zlib support is not compiled in. */
static int inflate_gz_chunks(struct io_log *log, FILE *f)

int iolog_file_inflate(const char *file)
	log_err("fio: log inflation not possible without zlib\n");
/*
 * Write all buffered samples of this log to its file, truncating ("w")
 * or appending ("a") per do_append. Compressed chunks are flushed
 * first; histogram logs use their dedicated flush format.
 */
void flush_log(struct io_log *log, bool do_append)
	f = fopen(log->filename, "w");
	f = fopen(log->filename, "a");
		perror("fopen log");
	buf = set_file_buffer(f);
	inflate_gz_chunks(log, f);
	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;
		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);
		/* the clat histogram log has its own output format */
		if (log->td && log == log->td->clat_hist_log)
			flush_hist_samples(f, log->hist_coarseness, cur_log->log,
					   log_sample_sz(log, cur_log));
			flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
	clear_file_buffer(buf);
/*
 * Finish one log: either hand it to the backend/GUI client via
 * fio_send_iolog() or flush it locally. Serialized by a per-filename
 * file lock; trylock lets the caller retry instead of blocking.
 */
static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
	if (td->flags & TD_F_COMPRESS_LOG)
	if (fio_trylock_file(log->filename))
	fio_lock_file(log->filename);
	if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
		fio_send_iolog(td, log, log->filename);
		flush_log(log, !td->o.per_job_logs);
	fio_unlock_file(log->filename);
/* Sum the sizes of all compressed chunks, under the chunk lock. */
size_t log_chunk_sizes(struct io_log *log)
	struct flist_head *entry;
	/* unlocked fast path for the empty case */
	if (flist_empty(&log->chunk_list))
	pthread_mutex_lock(&log->chunk_lock);
	flist_for_each(entry, &log->chunk_list) {
		struct iolog_compress *c;
		c = flist_entry(entry, struct iolog_compress, list);
	pthread_mutex_unlock(&log->chunk_lock);
/*
 * Queue a sample buffer for deferred freeing (it may still be
 * referenced elsewhere). If the deferred table is full, the entry is
 * dropped with a one-time warning.
 */
static void iolog_put_deferred(struct io_log *log, void *ptr)
	pthread_mutex_lock(&log->deferred_free_lock);
	if (log->deferred < IOLOG_MAX_DEFER) {
		log->deferred_items[log->deferred] = ptr;
	} else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
		log_err("fio: had to drop log entry free\n");
	pthread_mutex_unlock(&log->deferred_free_lock);

/* Free everything queued via iolog_put_deferred(). */
static void iolog_free_deferred(struct io_log *log)
	pthread_mutex_lock(&log->deferred_free_lock);
	for (i = 0; i < log->deferred; i++) {
		free(log->deferred_items[i]);
		log->deferred_items[i] = NULL;
	pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Deflate the samples attached to this flush work item into GZ_CHUNK
 * sized chunks, then splice them onto the log's chunk list. The
 * sample buffer is handed to the deferred-free machinery afterwards.
 */
static int gz_work(struct iolog_flush_data *data)
	struct iolog_compress *c = NULL;
	struct flist_head list;
	INIT_FLIST_HEAD(&list);
	memset(&stream, 0, sizeof(stream));
	stream.zalloc = Z_NULL;
	stream.zfree = Z_NULL;
	stream.opaque = Z_NULL;
	ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
		log_err("fio: failed to init gz stream\n");
	/* each flush gets a fresh sequence number for this log */
	seq = ++data->log->chunk_seq;
	stream.next_in = (void *) data->samples;
	stream.avail_in = data->nr_samples * log_entry_sz(data->log);
	dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
		(unsigned long) stream.avail_in, seq,
			data->log->filename);
	/* compress the input one GZ_CHUNK output buffer at a time */
	dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
		(unsigned long) c->len);
	c = get_new_chunk(seq);
	stream.avail_out = GZ_CHUNK;
	stream.next_out = c->buf;
	ret = deflate(&stream, Z_NO_FLUSH);
		log_err("fio: deflate log (%d)\n", ret);
	c->len = GZ_CHUNK - stream.avail_out;
	flist_add_tail(&c->list, &list);
	} while (stream.avail_in);
	/* finish the stream into the remaining space of the tail chunk */
	stream.next_out = c->buf + c->len;
	stream.avail_out = GZ_CHUNK - c->len;
	ret = deflate(&stream, Z_FINISH);
	/*
	 * Z_BUF_ERROR is special, it just means we need more
	 * output space. We'll handle that below. Treat any other
	 * error as fatal.
	 */
	if (ret != Z_BUF_ERROR) {
		log_err("fio: deflate log (%d)\n", ret);
		flist_del(&c->list);
	c->len = GZ_CHUNK - stream.avail_out;
	dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
	/* drain remaining output until zlib signals end of stream */
	if (ret != Z_STREAM_END) {
		c = get_new_chunk(seq);
		stream.avail_out = GZ_CHUNK;
		stream.next_out = c->buf;
		ret = deflate(&stream, Z_FINISH);
		c->len = GZ_CHUNK - stream.avail_out;
		flist_add_tail(&c->list, &list);
		dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
			(unsigned long) c->len);
	} while (ret != Z_STREAM_END);
	dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
	ret = deflateEnd(&stream);
		log_err("fio: deflateEnd %d\n", ret);
	/* samples may still be in use; free lazily */
	iolog_put_deferred(data->log, data->samples);
	if (!flist_empty(&list)) {
		pthread_mutex_lock(&data->log->chunk_lock);
		flist_splice_tail(&list, &data->log->chunk_list);
		pthread_mutex_unlock(&data->log->chunk_lock);
	/* error path: discard any chunks built so far */
	while (!flist_empty(&list)) {
		c = flist_first_entry(list.next, struct iolog_compress, list);
		flist_del(&c->list);
/*
 * Invoked from our compress helper thread, when logging would have exceeded
 * the specified memory limitation. Compresses the previously stored
 * samples.
 */
static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
	return gz_work(container_of(work, struct iolog_flush_data, work));

/* Pin the compression worker to log_gz_cpumask, if one was set. */
static int gz_init_worker(struct submit_worker *sw)
	struct thread_data *td = sw->wq->td;
	if (!fio_option_is_set(&td->o, log_gz_cpumask))
	if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
		log_err("gz: failed to set CPU affinity\n");

/* Workqueue callbacks for the log compression worker. */
static struct workqueue_ops log_compress_wq_ops = {
	.fn = gz_work_async,
	.init_worker_fn = gz_init_worker,
/* Start the single-threaded log compression workqueue, if enabled. */
int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
	if (!(td->flags & TD_F_COMPRESS_LOG))
	workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);

/* Tear down the log compression workqueue, if it was enabled. */
void iolog_compress_exit(struct thread_data *td)
	if (!(td->flags & TD_F_COMPRESS_LOG))
	workqueue_exit(&td->log_compress_wq);
/*
 * Queue work item to compress the existing log entries. We reset the
 * current log to a small size, and reference the existing log in the
 * data that we queue for compression. Once compression has been done,
 * this old log is freed. If called with finish == true, will not return
 * until the log compression has completed, and will flush all previous
 * logs too.
 */
static int iolog_flush(struct io_log *log)
	struct iolog_flush_data *data;
	data = malloc(sizeof(*data));
	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;
		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);
		/* hand the sample buffer ownership to the work item */
		data->samples = cur_log->log;
		data->nr_samples = cur_log->nr_samples;
/*
 * Hand the current in-memory sample buffer to the compression
 * workqueue, detaching it from cur_log, then free any deferred items.
 */
int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
	struct iolog_flush_data *data;
	data = smalloc(sizeof(*data));
	data->samples = cur_log->log;
	data->nr_samples = cur_log->nr_samples;
	/* buffer ownership moved to the work item */
	cur_log->nr_samples = cur_log->max_samples = 0;
	cur_log->log = NULL;
	workqueue_enqueue(&log->td->log_compress_wq, &data->work);
	iolog_free_deferred(log);
/* Stubs used when compression support is not compiled in. */
static int iolog_flush(struct io_log *log)

int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)

int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)

void iolog_compress_exit(struct thread_data *td)
/* Return the newest io_logs entry of this log, if any. */
struct io_logs *iolog_cur_log(struct io_log *log)
	if (flist_empty(&log->io_logs))
	return flist_last_entry(&log->io_logs, struct io_logs, list);

/* Total number of samples across all io_logs entries of this log. */
uint64_t iolog_nr_samples(struct io_log *iolog)
	struct flist_head *entry;
	flist_for_each(entry, &iolog->io_logs) {
		struct io_logs *cur_log;
		cur_log = flist_entry(entry, struct io_logs, list);
		ret += cur_log->nr_samples;
/* Helper: finish (lock, flush or send) one log, if it exists. */
static int __write_log(struct thread_data *td, struct io_log *log, int try)
	return finish_log(td, log, try);
/* Write the IOPS log; skipped unless its per-unit mode matches. */
static int write_iops_log(struct thread_data *td, int try, bool unit_log)
	if (per_unit_log(td->iops_log) != unit_log)
	ret = __write_log(td, td->iops_log, try);
		td->iops_log = NULL;

/* Write the submission latency log. */
static int write_slat_log(struct thread_data *td, int try, bool unit_log)
	ret = __write_log(td, td->slat_log, try);
		td->slat_log = NULL;

/* Write the completion latency log. */
static int write_clat_log(struct thread_data *td, int try, bool unit_log)
	ret = __write_log(td, td->clat_log, try);
		td->clat_log = NULL;

/* Write the completion latency histogram log. */
static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
	ret = __write_log(td, td->clat_hist_log, try);
		td->clat_hist_log = NULL;

/* Write the total latency log. */
static int write_lat_log(struct thread_data *td, int try, bool unit_log)
	ret = __write_log(td, td->lat_log, try);

/* Write the bandwidth log; skipped unless its per-unit mode matches. */
static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
	if (per_unit_log(td->bw_log) != unit_log)
	ret = __write_log(td, td->bw_log, try);

	CLAT_HIST_LOG_MASK = 32,

/* Table entry tying a log mask bit to its writeout function. */
	int (*fn)(struct thread_data *, int, bool);

static struct log_type log_types[] = {
		.mask = BW_LOG_MASK,
		.fn = write_bandw_log,
		.mask = LAT_LOG_MASK,
		.fn = write_lat_log,
		.mask = SLAT_LOG_MASK,
		.fn = write_slat_log,
		.mask = CLAT_LOG_MASK,
		.fn = write_clat_log,
		.mask = IOPS_LOG_MASK,
		.fn = write_iops_log,
		.mask = CLAT_HIST_LOG_MASK,
		.fn = write_clat_hist_log,
/*
 * Flush every log type for this thread. Repeatedly passes over the
 * log table, trying non-blocking locks while more than one log
 * remains, and stops when a full pass makes no progress.
 */
void td_writeout_logs(struct thread_data *td, bool unit_logs)
	unsigned int log_mask = 0;
	unsigned int log_left = ALL_LOG_NR;
	old_state = td_bump_runstate(td, TD_FINISHING);
	finalize_logs(td, unit_logs);
	int prev_log_left = log_left;
	for (i = 0; i < ALL_LOG_NR && log_left; i++) {
		struct log_type *lt = &log_types[i];
		if (!(log_mask & lt->mask)) {
			/* try-lock (non-blocking) until only one log is left */
			ret = lt->fn(td, log_left != 1, unit_logs);
			log_mask |= lt->mask;
	/* no log completed in this pass: give up looping */
	if (prev_log_left == log_left)
	td_restore_runstate(td, old_state);

/* Flush logs for all threads. */
void fio_writeout_logs(bool unit_logs)
	struct thread_data *td;
	td_writeout_logs(td, unit_logs);