2 * Code related to writing an iolog of what a thread is doing, and to
3 * later read that back and replay it
22 #include "lib/roundup.h"
24 #include <netinet/in.h>
25 #include <netinet/tcp.h>
26 #include <arpa/inet.h>
28 #include <sys/socket.h>
31 static int iolog_flush(struct io_log *log);
33 static const char iolog_ver2[] = "fio version 2 iolog";
35 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
37 flist_add_tail(&ipo->list, &td->io_log_list);
38 td->total_io_size += ipo->len;
41 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
43 if (!td->o.write_iolog_file)
44 return;
46 fprintf(td->iolog_f, "%s %s %llu %llu\n", io_u->file->file_name,
47 io_ddir_name(io_u->ddir),
48 io_u->offset, io_u->buflen);
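/*
 * Editor's illustration (not in the original source): with the format
 * above, a 4k write at offset 8192 to /data/file.0 is logged as
 *
 *	/data/file.0 write 8192 4096
 *
 * which matches the "%256s %256s %llu %u" tokens that read_iolog2()
 * parses back in below.
 */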
51 void log_file(struct thread_data *td, struct fio_file *f,
52 enum file_log_act what)
54 const char *act[] = { "add", "open", "close" };
58 if (!td->o.write_iolog_file)
59 return;
63 * this happens on the pre-open/close done before the job starts
64 */
65 if (!td->iolog_f)
66 return;
68 fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
71 static void iolog_delay(struct thread_data *td, unsigned long delay)
73 uint64_t usec = utime_since_now(&td->last_issue);
74 unsigned long orig_delay = delay;
78 if (delay < td->time_offset) {
79 td->time_offset = 0;
80 return;
81 }
83 delay -= td->time_offset;
89 fio_gettime(&ts, NULL);
90 while (delay && !td->terminate) {
91 this_delay = delay;
92 if (this_delay > 500000)
93 this_delay = 500000;
95 usec_sleep(td, this_delay);
99 usec = utime_since_now(&ts);
100 if (usec > orig_delay)
101 td->time_offset = usec - orig_delay;
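/*
 * Worked example (editor's addition): if a logged delay of 1000 usec
 * actually slept for 1250 usec, time_offset is set to 250. The next
 * call then subtracts those 250 usec from its delay, so accumulated
 * oversleep does not slow the replay down over time.
 */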
106 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
114 if (ipo->ddir != DDIR_INVAL)
115 return 0;
117 f = td->files[ipo->fileno];
119 switch (ipo->file_action) {
120 case FIO_LOG_OPEN_FILE:
121 if (td->o.replay_redirect && fio_file_open(f)) {
122 dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
123 f->file_name);
126 ret = td_io_open_file(td, f);
127 if (!ret)
128 break;
129 td_verror(td, ret, "iolog open file");
130 return -1;
131 case FIO_LOG_CLOSE_FILE:
132 td_io_close_file(td, f);
133 break;
134 case FIO_LOG_UNLINK_FILE:
135 td_io_unlink_file(td, f);
136 break;
137 default:
138 log_err("fio: bad file action %d\n", ipo->file_action);
145 static bool read_iolog2(struct thread_data *td);
147 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
149 struct io_piece *ipo;
150 unsigned long elapsed;
152 while (!flist_empty(&td->io_log_list)) {
155 if (td->o.read_iolog_chunked) {
156 if (td->io_log_checkmark == td->io_log_current) {
157 if (td->io_log_blktrace) {
158 if (!read_blktrace(td))
159 break;
160 } else {
161 if (!read_iolog2(td))
162 break;
165 td->io_log_current--;
167 ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
168 flist_del(&ipo->list);
169 remove_trim_entry(td, ipo);
171 ret = ipo_special(td, ipo);
175 } else if (ret > 0) {
180 io_u->ddir = ipo->ddir;
181 if (ipo->ddir != DDIR_WAIT) {
182 io_u->offset = ipo->offset;
183 io_u->verify_offset = ipo->offset;
184 io_u->buflen = ipo->len;
185 io_u->file = td->files[ipo->fileno];
186 get_file(io_u->file);
187 dprint(FD_IO, "iolog: get %llu/%llu/%s\n", io_u->offset,
188 io_u->buflen, io_u->file->file_name);
189 if (ipo->delay)
190 iolog_delay(td, ipo->delay);
191 } else {
192 elapsed = mtime_since_genesis();
193 if (ipo->delay > elapsed)
194 usec_sleep(td, (ipo->delay - elapsed) * 1000);
199 if (io_u->ddir != DDIR_WAIT)
200 return 0;
207 void prune_io_piece_log(struct thread_data *td)
209 struct io_piece *ipo;
210 struct fio_rb_node *n;
212 while ((n = rb_first(&td->io_hist_tree)) != NULL) {
213 ipo = rb_entry(n, struct io_piece, rb_node);
214 rb_erase(n, &td->io_hist_tree);
215 remove_trim_entry(td, ipo);
216 free(ipo);
220 while (!flist_empty(&td->io_hist_list)) {
221 ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
222 flist_del(&ipo->list);
223 remove_trim_entry(td, ipo);
224 free(ipo);
230 * log a successful write, so we can unwind the log for verify
232 void log_io_piece(struct thread_data *td, struct io_u *io_u)
234 struct fio_rb_node **p, *parent;
235 struct io_piece *ipo, *__ipo;
237 ipo = calloc(1, sizeof(struct io_piece));
239 ipo->file = io_u->file;
240 ipo->offset = io_u->offset;
241 ipo->len = io_u->buflen;
242 ipo->numberio = io_u->numberio;
243 ipo->flags = IP_F_IN_FLIGHT;
247 if (io_u_should_trim(td, io_u)) {
248 flist_add_tail(&ipo->trim_list, &td->trim_list);
253 * Only sort writes if we don't have a random map; in that case we
254 * need to check for duplicate blocks and drop the old one, which we
255 * rely on the rb-tree insert/lookup to handle.
257 if (file_randommap(td, ipo->file)) {
258 INIT_FLIST_HEAD(&ipo->list);
259 flist_add_tail(&ipo->list, &td->io_hist_list);
260 ipo->flags |= IP_F_ONLIST;
265 RB_CLEAR_NODE(&ipo->rb_node);
268 * Sort the entry into the verification list
271 p = &td->io_hist_tree.rb_node;
272 parent = NULL;
273 while (*p) {
274 bool overlap = false;
275 parent = *p;
277 __ipo = rb_entry(parent, struct io_piece, rb_node);
278 if (ipo->file < __ipo->file)
279 p = &(*p)->rb_left;
280 else if (ipo->file > __ipo->file)
281 p = &(*p)->rb_right;
282 else if (ipo->offset < __ipo->offset) {
283 p = &(*p)->rb_left;
284 overlap = ipo->offset + ipo->len > __ipo->offset;
285 }
286 else if (ipo->offset > __ipo->offset) {
287 p = &(*p)->rb_right;
288 overlap = __ipo->offset + __ipo->len > ipo->offset;
294 dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
295 __ipo->offset, __ipo->len,
296 ipo->offset, ipo->len);
298 rb_erase(parent, &td->io_hist_tree);
299 remove_trim_entry(td, __ipo);
300 if (!(__ipo->flags & IP_F_IN_FLIGHT))
306 rb_link_node(&ipo->rb_node, parent, p);
307 rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
308 ipo->flags |= IP_F_ONRB;
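/*
 * Worked example (editor's addition) of the overlap checks above: an
 * existing node covers [8192, 12288) of the same file. A new entry at
 * offset 4096 with len 8192 sorts to its left, and 4096 + 8192 > 8192,
 * so it overlaps; the old node is erased from the tree before the new
 * node is linked in.
 */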
312 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
314 struct io_piece *ipo = io_u->ipo;
316 if (td->ts.nr_block_infos) {
317 uint32_t *info = io_u_block_info(td, io_u);
318 if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
319 if (io_u->ddir == DDIR_TRIM)
320 *info = BLOCK_INFO_SET_STATE(*info,
321 BLOCK_STATE_TRIM_FAILURE);
322 else if (io_u->ddir == DDIR_WRITE)
323 *info = BLOCK_INFO_SET_STATE(*info,
324 BLOCK_STATE_WRITE_FAILURE);
331 if (ipo->flags & IP_F_ONRB)
332 rb_erase(&ipo->rb_node, &td->io_hist_tree);
333 else if (ipo->flags & IP_F_ONLIST)
334 flist_del(&ipo->list);
341 void trim_io_piece(const struct io_u *io_u)
343 struct io_piece *ipo = io_u->ipo;
348 ipo->len = io_u->xfer_buflen - io_u->resid;
351 void write_iolog_close(struct thread_data *td)
360 td->iolog_buf = NULL;
363 int64_t iolog_items_to_fetch(struct thread_data *td)
368 int64_t items_to_fetch;
370 if (!td->io_log_highmark)
374 fio_gettime(&now, NULL);
375 elapsed = ntime_since(&td->io_log_highmark_time, &now);
377 for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
378 items_to_fetch = for_1s - td->io_log_current;
379 if (items_to_fetch < 0)
380 items_to_fetch = 0;
384 td->io_log_highmark = td->io_log_current + items_to_fetch;
385 td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
386 fio_gettime(&td->io_log_highmark_time, NULL);
388 return items_to_fetch;
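/*
 * Worked example (editor's addition): with a high mark of 1000
 * entries, 400 still queued and 0.5s elapsed, one second of replay
 * consumes (1000 - 400) * 1000000000 / 500000000 = 1200 entries, so
 * 1200 - 400 = 800 more are fetched and the next refill check fires
 * halfway to the new high mark of 1200.
 */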
392 * Read version 2 iolog data. It is enhanced to include per-file logging,
393 * syncs, etc.
395 static bool read_iolog2(struct thread_data *td)
397 unsigned long long offset;
399 int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
400 char *rfname, *fname, *act;
403 bool realloc = false;
404 int64_t items_to_fetch = 0;
406 if (td->o.read_iolog_chunked) {
407 items_to_fetch = iolog_items_to_fetch(td);
408 if (!items_to_fetch)
409 return true;
410 }
413 * Read in the read iolog and store it; reuse the infrastructure
414 * for doing verifications.
417 rfname = fname = malloc(256+16);
418 act = malloc(256+16);
420 reads = writes = waits = 0;
421 while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
422 struct io_piece *ipo;
425 r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
426 &bytes);
428 if (td->o.replay_redirect)
429 fname = td->o.replay_redirect;
435 if (!strcmp(act, "wait"))
437 else if (!strcmp(act, "read"))
439 else if (!strcmp(act, "write"))
441 else if (!strcmp(act, "sync"))
443 else if (!strcmp(act, "datasync"))
445 else if (!strcmp(act, "trim"))
448 log_err("fio: bad iolog file action: %s\n",
452 fileno = get_fileno(td, fname);
455 if (!strcmp(act, "add")) {
456 if (td->o.replay_redirect &&
457 get_fileno(td, fname) != -1) {
458 dprint(FD_FILE, "iolog: ignoring"
459 " re-add of file %s\n", fname);
461 fileno = add_file(td, fname, td->subjob_number, 1);
462 file_action = FIO_LOG_ADD_FILE;
465 } else if (!strcmp(act, "open")) {
466 fileno = get_fileno(td, fname);
467 file_action = FIO_LOG_OPEN_FILE;
468 } else if (!strcmp(act, "close")) {
469 fileno = get_fileno(td, fname);
470 file_action = FIO_LOG_CLOSE_FILE;
472 log_err("fio: bad iolog file action: %s\n",
477 log_err("bad iolog2: %s\n", p);
483 else if (rw == DDIR_WRITE) {
485 * Don't add a write for ro mode
490 } else if (rw == DDIR_WAIT) {
494 } else if (rw == DDIR_INVAL) {
495 } else if (!ddir_sync(rw)) {
496 log_err("bad ddir: %d\n", rw);
503 ipo = calloc(1, sizeof(*ipo));
506 if (rw == DDIR_WAIT) {
507 ipo->delay = offset;
508 } else {
509 if (td->o.replay_scale)
510 ipo->offset = offset / td->o.replay_scale;
511 else
512 ipo->offset = offset;
513 ipo_bytes_align(td->o.replay_align, ipo);
515 ipo->len = bytes;
516 if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw]) {
517 realloc = true;
518 td->o.max_bs[rw] = bytes;
519 }
520 ipo->fileno = fileno;
521 ipo->file_action = file_action;
525 queue_io_piece(td, ipo);
527 if (td->o.read_iolog_chunked) {
528 td->io_log_current++;
529 items_to_fetch--;
530 if (items_to_fetch == 0)
531 break;
539 if (td->o.read_iolog_chunked) {
540 td->io_log_highmark = td->io_log_current;
541 td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
542 fio_gettime(&td->io_log_highmark_time, NULL);
545 if (writes && read_only) {
546 log_err("fio: <%s> skips replay of %d writes due to"
547 " read-only\n", td->o.name, writes);
551 if (td->o.read_iolog_chunked) {
552 if (td->io_log_current == 0) {
555 td->o.td_ddir = TD_DDIR_RW;
556 if (realloc && td->orig_buffer)
560 init_io_u_buffers(td);
565 if (!reads && !writes && !waits)
566 return false;
567 else if (reads && !writes)
568 td->o.td_ddir = TD_DDIR_READ;
569 else if (!reads && writes)
570 td->o.td_ddir = TD_DDIR_WRITE;
571 else
572 td->o.td_ddir = TD_DDIR_RW;
577 static bool is_socket(const char *path)
582 r = stat(path, &buf);
583 if (r < 0)
584 return false;
586 return S_ISSOCK(buf.st_mode);
589 static int open_socket(const char *path)
591 struct sockaddr_un addr;
594 fd = socket(AF_UNIX, SOCK_STREAM, 0);
595 if (fd < 0)
596 return fd;
598 addr.sun_family = AF_UNIX;
599 if (snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path) >=
600 sizeof(addr.sun_path)) {
601 log_err("%s: path name %s is too long for a Unix socket\n",
605 ret = connect(fd, (const struct sockaddr *)&addr, strlen(path) + sizeof(addr.sun_family));
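/*
 * Editor's note: because of this, read_iolog can point at a Unix
 * domain socket and entries can be streamed in while the job runs,
 * e.g. (hypothetical paths, requires a netcat with Unix socket
 * support):
 *
 *	nc -lU /tmp/fio.sock < replay.log &
 *	fio --name=replay --read_iolog=/tmp/fio.sock ...
 */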
614 * open iolog, check version, and call appropriate parser
616 static bool init_iolog_read(struct thread_data *td, char *fname)
618 char buffer[256], *p;
621 dprint(FD_IO, "iolog: name=%s\n", fname);
623 if (is_socket(fname)) {
624 int fd;
626 fd = open_socket(fname);
627 if (fd >= 0)
628 f = fdopen(fd, "r");
629 } else if (!strcmp(fname, "-")) {
630 f = stdin;
631 } else
632 f = fopen(fname, "r");
635 perror("fopen read iolog");
639 p = fgets(buffer, sizeof(buffer), f);
640 if (!p) {
641 td_verror(td, errno, "iolog read");
642 log_err("fio: unable to read iolog\n");
648 * version 2 of the iolog stores a specific string as the
649 * first line, check for that
651 if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2))) {
652 free_release_files(td);
653 td->io_log_rfile = f;
654 return read_iolog2(td);
657 log_err("fio: iolog version 1 is no longer supported\n");
663 * Set up a log for storing io patterns.
665 static bool init_iolog_write(struct thread_data *td)
671 f = fopen(td->o.write_iolog_file, "a");
672 if (!f) {
673 perror("fopen write iolog");
678 * That's it for writing; set up a log buffer and we're done.
680 td->iolog_f = f;
681 td->iolog_buf = malloc(8192);
682 setvbuf(f, td->iolog_buf, _IOFBF, 8192);
685 * write our version line
687 if (fprintf(f, "%s\n", iolog_ver2) < 0) {
688 perror("iolog init\n");
693 * add all known files
695 for_each_file(td, ff, i)
696 log_file(td, ff, FIO_LOG_ADD_FILE);
698 return true;
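/*
 * Editor's illustration (hypothetical contents): a freshly written
 * iolog starts with the version line, then the known files, then one
 * line per io:
 *
 *	fio version 2 iolog
 *	/data/file.0 add
 *	/data/file.0 open
 *	/data/file.0 write 0 4096
 *	/data/file.0 write 4096 4096
 *	/data/file.0 close
 */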
701 bool init_iolog(struct thread_data *td)
705 if (td->o.read_iolog_file) {
707 char * fname = get_name_by_idx(td->o.read_iolog_file, td->subjob_number);
710 * Check if it's a blktrace file and load that if possible.
711 * Otherwise assume it's a normal log file and load that.
713 if (is_blktrace(fname, &need_swap)) {
714 td->io_log_blktrace = 1;
715 ret = init_blktrace_read(td, fname, need_swap);
716 } else {
717 td->io_log_blktrace = 0;
718 ret = init_iolog_read(td, fname);
721 } else if (td->o.write_iolog_file)
722 ret = init_iolog_write(td);
723 else
724 ret = true;
726 if (!ret)
727 td_verror(td, EINVAL, "failed initializing iolog");
732 void setup_log(struct io_log **log, struct log_params *p,
733 const char *filename)
737 struct io_u_plat_entry *entry;
738 struct flist_head *list;
740 l = scalloc(1, sizeof(*l));
741 INIT_FLIST_HEAD(&l->io_logs);
742 l->log_type = p->log_type;
743 l->log_offset = p->log_offset;
744 l->log_prio = p->log_prio;
745 l->log_gz = p->log_gz;
746 l->log_gz_store = p->log_gz_store;
747 l->avg_msec = p->avg_msec;
748 l->hist_msec = p->hist_msec;
749 l->hist_coarseness = p->hist_coarseness;
750 l->filename = strdup(filename);
751 l->td = p->td;
753 /* Initialize histogram lists for each r/w direction,
754 * with initial io_u_plat of all zeros:
756 for (i = 0; i < DDIR_RWDIR_CNT; i++) {
757 list = &l->hist_window[i].list;
758 INIT_FLIST_HEAD(list);
759 entry = calloc(1, sizeof(struct io_u_plat_entry));
760 flist_add(&entry->list, list);
763 if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
764 unsigned int def_samples = DEF_LOG_ENTRIES;
767 __p = calloc(1, sizeof(*l->pending));
768 if (l->td->o.iodepth > DEF_LOG_ENTRIES)
769 def_samples = roundup_pow2(l->td->o.iodepth);
770 __p->max_samples = def_samples;
771 __p->log = calloc(__p->max_samples, log_entry_sz(l));
775 if (l->log_offset)
776 l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
777 if (l->log_prio)
778 l->log_ddir_mask |= LOG_PRIO_SAMPLE_BIT;
780 INIT_FLIST_HEAD(&l->chunk_list);
782 if (l->log_gz && !p->td)
783 l->log_gz = 0;
784 else if (l->log_gz || l->log_gz_store) {
785 mutex_init_pshared(&l->chunk_lock);
786 mutex_init_pshared(&l->deferred_free_lock);
787 p->td->flags |= TD_F_COMPRESS_LOG;
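#if 0
/*
 * Editor's sketch of a setup_log() caller, using the log_params fields
 * consumed above; the values are hypothetical, not taken from fio
 * proper.
 */
static void setup_bw_log_example(struct thread_data *td)
{
	struct log_params p = {
		.td		= td,
		.log_type	= IO_LOG_TYPE_BW,
		.avg_msec	= 500,	/* average samples over 500 msec */
	};

	setup_log(&td->bw_log, &p, "example_bw.log");
}
#endif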
793 #ifdef CONFIG_SETVBUF
794 static void *set_file_buffer(FILE *f)
796 size_t size = 1048576;
800 setvbuf(f, buf, _IOFBF, size);
804 static void clear_file_buffer(void *buf)
809 static void *set_file_buffer(FILE *f)
814 static void clear_file_buffer(void *buf)
819 void free_log(struct io_log *log)
821 while (!flist_empty(&log->io_logs)) {
822 struct io_logs *cur_log;
824 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
825 flist_del_init(&cur_log->list);
831 free(log->pending->log);
841 uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
842 uint64_t *io_u_plat_last)
847 if (io_u_plat_last) {
848 for (k = sum = 0; k < stride; k++)
849 sum += io_u_plat[j + k] - io_u_plat_last[j + k];
851 for (k = sum = 0; k < stride; k++)
852 sum += io_u_plat[j + k];
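/*
 * Worked example (editor's addition): with hist_coarseness 2 the
 * stride is 1 << 2 = 4, so hist_sum(0, 4, plat, NULL) folds
 * plat[0] + plat[1] + plat[2] + plat[3] into one output bin; passing
 * the previous sample's plat as io_u_plat_last reports only the new
 * completions in that window instead of a running total.
 */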
858 static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
859 uint64_t sample_size)
863 uint64_t i, j, nr_samples;
864 struct io_u_plat_entry *entry, *entry_before;
866 uint64_t *io_u_plat_before;
868 int stride = 1 << hist_coarseness;
873 s = __get_sample(samples, 0, 0);
874 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
876 nr_samples = sample_size / __log_entry_sz(log_offset);
878 for (i = 0; i < nr_samples; i++) {
879 s = __get_sample(samples, log_offset, i);
881 entry = s->data.plat_entry;
882 io_u_plat = entry->io_u_plat;
884 entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
885 io_u_plat_before = entry_before->io_u_plat;
887 fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
888 io_sample_ddir(s), (unsigned long long) s->bs);
889 for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
890 fprintf(f, "%llu, ", (unsigned long long)
891 hist_sum(j, stride, io_u_plat, io_u_plat_before));
893 fprintf(f, "%llu\n", (unsigned long long)
894 hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
897 flist_del(&entry_before->list);
902 void flush_samples(FILE *f, void *samples, uint64_t sample_size)
905 int log_offset, log_prio;
906 uint64_t i, nr_samples;
907 unsigned int prio_val;
913 s = __get_sample(samples, 0, 0);
914 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
915 log_prio = (s->__ddir & LOG_PRIO_SAMPLE_BIT) != 0;
919 fmt = "%lu, %" PRId64 ", %u, %llu, %llu, 0x%04x\n";
921 fmt = "%lu, %" PRId64 ", %u, %llu, %llu, %u\n";
924 fmt = "%lu, %" PRId64 ", %u, %llu, 0x%04x\n";
926 fmt = "%lu, %" PRId64 ", %u, %llu, %u\n";
929 nr_samples = sample_size / __log_entry_sz(log_offset);
931 for (i = 0; i < nr_samples; i++) {
932 s = __get_sample(samples, log_offset, i);
934 if (log_prio)
935 prio_val = s->priority;
936 else
937 prio_val = ioprio_value_is_class_rt(s->priority);
939 if (!log_offset) {
940 fprintf(f, fmt,
941 (unsigned long) s->time,
942 s->data.val,
943 io_sample_ddir(s), (unsigned long long) s->bs,
944 prio_val);
945 } else {
946 struct io_sample_offset *so = (void *) s;
948 fprintf(f, fmt,
949 (unsigned long) s->time,
950 s->data.val,
951 io_sample_ddir(s), (unsigned long long) s->bs,
952 (unsigned long long) so->offset,
953 prio_val);
960 struct iolog_flush_data {
961 struct workqueue_work work;
962 struct io_log *log;
963 void *samples;
964 uint32_t nr_samples;
968 #define GZ_CHUNK 131072
970 static struct iolog_compress *get_new_chunk(unsigned int seq)
972 struct iolog_compress *c;
974 c = malloc(sizeof(*c));
975 INIT_FLIST_HEAD(&c->list);
976 c->buf = malloc(GZ_CHUNK);
977 c->len = 0;
978 c->seq = seq;
979 return c;
982 static void free_chunk(struct iolog_compress *ic)
983 {
984 free(ic->buf);
985 free(ic);
986 }
988 static int z_stream_init(z_stream *stream, int gz_hdr)
989 {
990 int wbits = 15;
992 memset(stream, 0, sizeof(*stream));
993 stream->zalloc = Z_NULL;
994 stream->zfree = Z_NULL;
995 stream->opaque = Z_NULL;
996 stream->next_in = Z_NULL;
998 /*
999 * zlib magic - add 32 to the window bits to auto-detect a gzip header,
1000 * in case we stored the file in a gzip friendly format.
1001 */
1002 if (gz_hdr)
1003 wbits += 32;
1005 if (inflateInit2(stream, wbits) != Z_OK)
1006 return 1;
1008 return 0;
1009 }
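/*
 * Editor's note: zlib documents windowBits of 15 + 32 (i.e. 47) as
 * enabling automatic zlib/gzip header detection in inflate(), which is
 * why one init path can serve both plain zlib chunks and the
 * gzip-framed log_gz_store format.
 */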
1011 struct inflate_chunk_iter {
1012 unsigned int seq;
1013 int err;
1014 void *buf;
1015 size_t buf_size;
1016 size_t buf_used;
1017 size_t chunk_sz;
1018 };
1020 static void finish_chunk(z_stream *stream, FILE *f,
1021 struct inflate_chunk_iter *iter)
1025 ret = inflateEnd(stream);
1026 if (ret != Z_OK)
1027 log_err("fio: failed to end log inflation seq %d (%d)\n",
1028 iter->seq, ret);
1030 flush_samples(f, iter->buf, iter->buf_used);
1031 free(iter->buf);
1032 iter->buf = NULL;
1033 iter->buf_size = iter->buf_used = 0;
1037 * Iterative chunk inflation. Handles cases where we cross into a new
1038 * sequence, flushing and finishing the previous chunk if needed.
1040 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
1041 z_stream *stream, struct inflate_chunk_iter *iter)
1045 dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
1046 (unsigned long) ic->len, ic->seq);
1048 if (ic->seq != iter->seq) {
1049 if (iter->seq)
1050 finish_chunk(stream, f, iter);
1052 z_stream_init(stream, gz_hdr);
1053 iter->seq = ic->seq;
1056 stream->avail_in = ic->len;
1057 stream->next_in = ic->buf;
1059 if (!iter->buf_size) {
1060 iter->buf_size = iter->chunk_sz;
1061 iter->buf = malloc(iter->buf_size);
1064 while (stream->avail_in) {
1065 size_t this_out = iter->buf_size - iter->buf_used;
1068 stream->avail_out = this_out;
1069 stream->next_out = iter->buf + iter->buf_used;
1071 err = inflate(stream, Z_NO_FLUSH);
1072 if (err < 0) {
1073 log_err("fio: failed inflating log: %d\n", err);
1078 iter->buf_used += this_out - stream->avail_out;
1080 if (!stream->avail_out) {
1081 iter->buf_size += iter->chunk_sz;
1082 iter->buf = realloc(iter->buf, iter->buf_size);
1086 if (err == Z_STREAM_END)
1087 break;
1090 ret = (void *) stream->next_in - ic->buf;
1092 dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
1098 * Inflate stored compressed chunks, or write them directly to the log
1099 * file if so instructed.
1101 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1103 struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
1106 while (!flist_empty(&log->chunk_list)) {
1107 struct iolog_compress *ic;
1109 ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
1110 flist_del(&ic->list);
1112 if (log->log_gz_store) {
1115 dprint(FD_COMPRESS, "log write chunk size=%lu, "
1116 "seq=%u\n", (unsigned long) ic->len, ic->seq);
1118 ret = fwrite(ic->buf, ic->len, 1, f);
1119 if (ret != 1 || ferror(f)) {
1121 log_err("fio: error writing compressed log\n");
1124 inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
1126 free_chunk(ic);
1127 }
1129 if (iter.seq) {
1130 finish_chunk(&stream, f, &iter);
1138 * Open compressed log file and decompress the stored chunks and
1139 * write them to stdout. The chunks are stored sequentially in the
1140 * file, so we iterate over them and do them one-by-one.
1142 int iolog_file_inflate(const char *file)
1144 struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
1145 struct iolog_compress ic;
1153 f = fopen(file, "r");
1159 if (stat(file, &sb) < 0) {
1165 ic.buf = buf = malloc(sb.st_size);
1166 ic.len = sb.st_size;
1169 ret = fread(ic.buf, ic.len, 1, f);
1170 if (ret == 0 && ferror(f)) {
1175 } else if (ferror(f) || (!feof(f) && ret != 1)) {
1176 log_err("fio: short read on reading log\n");
1185 * Each chunk will return Z_STREAM_END. We don't know how many
1186 * chunks are in the file, so we just keep looping and incrementing
1187 * the sequence number until we have consumed the whole compressed
1188 * file.
1194 iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
1207 finish_chunk(&stream, stdout, &iter);
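/*
 * Editor's note: this path backs decompressing a stored log from the
 * command line, e.g. (hypothetical file name):
 *
 *	fio --inflate-log=job1_bw.log.fz > job1_bw.log
 */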
1217 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1218 {
1219 return 0;
1220 }
1222 int iolog_file_inflate(const char *file)
1223 {
1224 log_err("fio: log inflation not possible without zlib\n");
1225 return 1;
1226 }
1230 void flush_log(struct io_log *log, bool do_append)
1236 f = fopen(log->filename, "w");
1238 f = fopen(log->filename, "a");
1240 perror("fopen log");
1244 buf = set_file_buffer(f);
1246 inflate_gz_chunks(log, f);
1248 while (!flist_empty(&log->io_logs)) {
1249 struct io_logs *cur_log;
1251 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1252 flist_del_init(&cur_log->list);
1254 if (log->td && log == log->td->clat_hist_log)
1255 flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1256 log_sample_sz(log, cur_log));
1257 else
1258 flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1264 clear_file_buffer(buf);
1265 fclose(f);
1267 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1269 if (td->flags & TD_F_COMPRESS_LOG)
1270 iolog_flush(log);
1272 if (trylock) {
1273 if (fio_trylock_file(log->filename))
1274 return 1;
1275 } else
1276 fio_lock_file(log->filename);
1278 if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1279 fio_send_iolog(td, log, log->filename);
1280 else
1281 flush_log(log, !td->o.per_job_logs);
1283 fio_unlock_file(log->filename);
1288 size_t log_chunk_sizes(struct io_log *log)
1290 struct flist_head *entry;
1293 if (flist_empty(&log->chunk_list))
1294 return 0;
1297 pthread_mutex_lock(&log->chunk_lock);
1298 flist_for_each(entry, &log->chunk_list) {
1299 struct iolog_compress *c;
1301 c = flist_entry(entry, struct iolog_compress, list);
1302 ret += c->len;
1304 pthread_mutex_unlock(&log->chunk_lock);
1310 static void iolog_put_deferred(struct io_log *log, void *ptr)
1315 pthread_mutex_lock(&log->deferred_free_lock);
1316 if (log->deferred < IOLOG_MAX_DEFER) {
1317 log->deferred_items[log->deferred] = ptr;
1318 log->deferred++;
1319 } else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
1320 log_err("fio: had to drop log entry free\n");
1321 pthread_mutex_unlock(&log->deferred_free_lock);
1324 static void iolog_free_deferred(struct io_log *log)
1331 pthread_mutex_lock(&log->deferred_free_lock);
1333 for (i = 0; i < log->deferred; i++) {
1334 free(log->deferred_items[i]);
1335 log->deferred_items[i] = NULL;
1336 }
1337 log->deferred = 0;
1339 pthread_mutex_unlock(&log->deferred_free_lock);
1342 static int gz_work(struct iolog_flush_data *data)
1344 struct iolog_compress *c = NULL;
1345 struct flist_head list;
1351 INIT_FLIST_HEAD(&list);
1353 memset(&stream, 0, sizeof(stream));
1354 stream.zalloc = Z_NULL;
1355 stream.zfree = Z_NULL;
1356 stream.opaque = Z_NULL;
1358 ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1359 if (ret != Z_OK) {
1360 log_err("fio: failed to init gz stream\n");
1364 seq = ++data->log->chunk_seq;
1366 stream.next_in = (void *) data->samples;
1367 stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1369 dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
1370 (unsigned long) stream.avail_in, seq,
1371 data->log->filename);
1374 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1375 (unsigned long) c->len);
1376 c = get_new_chunk(seq);
1377 stream.avail_out = GZ_CHUNK;
1378 stream.next_out = c->buf;
1379 ret = deflate(&stream, Z_NO_FLUSH);
1380 if (ret < 0) {
1381 log_err("fio: deflate log (%d)\n", ret);
1386 c->len = GZ_CHUNK - stream.avail_out;
1387 flist_add_tail(&c->list, &list);
1389 } while (stream.avail_in);
1391 stream.next_out = c->buf + c->len;
1392 stream.avail_out = GZ_CHUNK - c->len;
1394 ret = deflate(&stream, Z_FINISH);
1397 * Z_BUF_ERROR is special, it just means we need more
1398 * output space. We'll handle that below. Treat any other
1399 * error as fatal.
1401 if (ret != Z_BUF_ERROR) {
1402 log_err("fio: deflate log (%d)\n", ret);
1403 flist_del(&c->list);
1410 c->len = GZ_CHUNK - stream.avail_out;
1412 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
1414 if (ret != Z_STREAM_END) {
1415 do {
1416 c = get_new_chunk(seq);
1417 stream.avail_out = GZ_CHUNK;
1418 stream.next_out = c->buf;
1419 ret = deflate(&stream, Z_FINISH);
1420 c->len = GZ_CHUNK - stream.avail_out;
1422 flist_add_tail(&c->list, &list);
1423 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1424 (unsigned long) c->len);
1425 } while (ret != Z_STREAM_END);
1428 dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1430 ret = deflateEnd(&stream);
1431 if (ret != Z_OK)
1432 log_err("fio: deflateEnd %d\n", ret);
1434 iolog_put_deferred(data->log, data->samples);
1436 if (!flist_empty(&list)) {
1437 pthread_mutex_lock(&data->log->chunk_lock);
1438 flist_splice_tail(&list, &data->log->chunk_list);
1439 pthread_mutex_unlock(&data->log->chunk_lock);
1448 while (!flist_empty(&list)) {
1449 c = flist_first_entry(&list, struct iolog_compress, list);
1450 flist_del(&c->list);
1458 * Invoked from our compress helper thread, when logging would have exceeded
1459 * the specified memory limitation. Compresses the previously stored
1460 * entries.
1462 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1464 return gz_work(container_of(work, struct iolog_flush_data, work));
1467 static int gz_init_worker(struct submit_worker *sw)
1469 struct thread_data *td = sw->wq->td;
1471 if (!fio_option_is_set(&td->o, log_gz_cpumask))
1472 return 0;
1474 if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1475 log_err("gz: failed to set CPU affinity\n");
1482 static struct workqueue_ops log_compress_wq_ops = {
1483 .fn = gz_work_async,
1484 .init_worker_fn = gz_init_worker,
1485 };
1488 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1490 if (!(td->flags & TD_F_COMPRESS_LOG))
1491 return 0;
1493 workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
1497 void iolog_compress_exit(struct thread_data *td)
1499 if (!(td->flags & TD_F_COMPRESS_LOG))
1500 return;
1502 workqueue_exit(&td->log_compress_wq);
1506 * Queue work item to compress the existing log entries. We reset the
1507 * current log to a small size, and reference the existing log in the
1508 * data that we queue for compression. Once compression has been done,
1509 * this old log is freed. If called with finish == true, will not return
1510 * until the log compression has completed, and will flush all previous
1511 * logs too.
1513 static int iolog_flush(struct io_log *log)
1515 struct iolog_flush_data *data;
1517 data = malloc(sizeof(*data));
1524 while (!flist_empty(&log->io_logs)) {
1525 struct io_logs *cur_log;
1527 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1528 flist_del_init(&cur_log->list);
1530 data->samples = cur_log->log;
1531 data->nr_samples = cur_log->nr_samples;
1542 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1544 struct iolog_flush_data *data;
1546 data = smalloc(sizeof(*data));
1552 data->samples = cur_log->log;
1553 data->nr_samples = cur_log->nr_samples;
1556 cur_log->nr_samples = cur_log->max_samples = 0;
1557 cur_log->log = NULL;
1559 workqueue_enqueue(&log->td->log_compress_wq, &data->work);
1561 iolog_free_deferred(log);
1567 static int iolog_flush(struct io_log *log)
1572 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1577 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1582 void iolog_compress_exit(struct thread_data *td)
1588 struct io_logs *iolog_cur_log(struct io_log *log)
1590 if (flist_empty(&log->io_logs))
1591 return NULL;
1593 return flist_last_entry(&log->io_logs, struct io_logs, list);
1596 uint64_t iolog_nr_samples(struct io_log *iolog)
1598 struct flist_head *entry;
1601 flist_for_each(entry, &iolog->io_logs) {
1602 struct io_logs *cur_log;
1604 cur_log = flist_entry(entry, struct io_logs, list);
1605 ret += cur_log->nr_samples;
1611 static int __write_log(struct thread_data *td, struct io_log *log, int try)
1612 {
1613 if (log)
1614 return finish_log(td, log, try);
1619 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1623 if (per_unit_log(td->iops_log) != unit_log)
1624 return 0;
1626 ret = __write_log(td, td->iops_log, try);
1627 if (!ret)
1628 td->iops_log = NULL;
1633 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1640 ret = __write_log(td, td->slat_log, try);
1641 if (!ret)
1642 td->slat_log = NULL;
1647 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1654 ret = __write_log(td, td->clat_log, try);
1655 if (!ret)
1656 td->clat_log = NULL;
1661 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1668 ret = __write_log(td, td->clat_hist_log, try);
1669 if (!ret)
1670 td->clat_hist_log = NULL;
1675 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1682 ret = __write_log(td, td->lat_log, try);
1683 if (!ret)
1684 td->lat_log = NULL;
1689 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1693 if (per_unit_log(td->bw_log) != unit_log)
1694 return 0;
1696 ret = __write_log(td, td->bw_log, try);
1697 if (!ret)
1698 td->bw_log = NULL;
1709 CLAT_HIST_LOG_MASK = 32,
1711 ALL_LOG_NR = 6,
1712 };
1714 struct log_type {
1715 unsigned int mask;
1716 int (*fn)(struct thread_data *, int, bool);
1717 };
1719 static struct log_type log_types[] = {
1720 {
1721 .mask = BW_LOG_MASK,
1722 .fn = write_bandw_log,
1723 },
1724 {
1725 .mask = LAT_LOG_MASK,
1726 .fn = write_lat_log,
1727 },
1728 {
1729 .mask = SLAT_LOG_MASK,
1730 .fn = write_slat_log,
1731 },
1732 {
1733 .mask = CLAT_LOG_MASK,
1734 .fn = write_clat_log,
1735 },
1736 {
1737 .mask = IOPS_LOG_MASK,
1738 .fn = write_iops_log,
1739 },
1740 {
1741 .mask = CLAT_HIST_LOG_MASK,
1742 .fn = write_clat_hist_log,
1743 },
1744 };
1746 void td_writeout_logs(struct thread_data *td, bool unit_logs)
1748 unsigned int log_mask = 0;
1749 unsigned int log_left = ALL_LOG_NR;
1752 old_state = td_bump_runstate(td, TD_FINISHING);
1754 finalize_logs(td, unit_logs);
1756 do {
1757 int prev_log_left = log_left;
1759 for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1760 struct log_type *lt = &log_types[i];
1763 if (!(log_mask & lt->mask)) {
1764 ret = lt->fn(td, log_left != 1, unit_logs);
1765 if (!ret) {
1766 log_left--;
1767 log_mask |= lt->mask;
1768 }
1772 if (prev_log_left == log_left)
1773 break;
1776 td_restore_runstate(td, old_state);
1779 void fio_writeout_logs(bool unit_logs)
1781 struct thread_data *td;
1782 unsigned int i;
1784 for_each_td(td, i)
1785 td_writeout_logs(td, unit_logs);
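/*
 * Editor's sketch, end to end: the iolog one job writes can be
 * replayed by another. A minimal (hypothetical) job file:
 *
 *	[capture]
 *	rw=randwrite
 *	write_iolog=trace.log
 *
 *	[replay]
 *	stonewall
 *	read_iolog=trace.log
 *	replay_redirect=/dev/nullb0
 *
 * read_iolog feeds init_iolog() above; replay_redirect substitutes a
 * single file for every path in the log, as handled in read_iolog2().
 */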