/*
 * Code related to writing an iolog of what a thread is doing, and to
 * later read that back and replay
 */
22 #include "lib/roundup.h"
24 #include <netinet/in.h>
25 #include <netinet/tcp.h>
26 #include <arpa/inet.h>
28 #include <sys/socket.h>
/* Forward declaration: push a log's pending samples to the compress path. */
31 static int iolog_flush(struct io_log *log);
/* Version banners: written as (and matched against) the first iolog line. */
33 static const char iolog_ver2[] = "fio version 2 iolog";
34 static const char iolog_ver3[] = "fio version 3 iolog";
/*
 * Append an io_piece to the tail of the thread's replay list and account
 * its length toward the total amount of I/O to replay.
 */
36 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
38 	flist_add_tail(&ipo->list, &td->io_log_list);
39 	td->total_io_size += ipo->len;
/*
 * If an iolog is being written, append one "time file action offset len"
 * record for this io_u.  Timestamp is microseconds since the log started.
 * NOTE(review): interior lines (e.g. the final buflen argument) appear
 * elided in this capture.
 */
42 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
46 	if (!td->o.write_iolog_file)
49 	fio_gettime(&now, NULL);
50 	fprintf(td->iolog_f, "%llu %s %s %llu %llu\n",
51 		(unsigned long long) utime_since_now(&td->io_log_start_time),
52 		io_u->file->file_name, io_ddir_name(io_u->ddir), io_u->offset,
/*
 * Record a file-level event (add/open/close) in the write iolog; 'what'
 * indexes the act[] name table.  Skipped when no iolog is being written,
 * and (per the surviving comment) for the pre-open/close done before the
 * job starts.
 */
57 void log_file(struct thread_data *td, struct fio_file *f,
58 	      enum file_log_act what)
60 	const char *act[] = { "add", "open", "close" };
65 	if (!td->o.write_iolog_file)
70 	 * this happens on the pre-open/close done before the job starts
75 	fio_gettime(&now, NULL);
76 	fprintf(td->iolog_f, "%llu %s %s\n",
77 		(unsigned long long) utime_since_now(&td->io_log_start_time),
78 		f->file_name, act[what]);
/*
 * Sleep to reproduce a recorded inter-I/O gap of 'delay' usec.  Time
 * already spent since last_issue, plus any previous oversleep recorded in
 * td->time_offset, is subtracted first; sleeping is chunked (<= 500ms per
 * usec_sleep) so termination is honored.  Any overshoot beyond the
 * original delay is carried forward in td->time_offset.
 */
81 static void iolog_delay(struct thread_data *td, unsigned long delay)
83 	uint64_t usec = utime_since_now(&td->last_issue);
84 	unsigned long orig_delay = delay;
88 	if (delay < td->time_offset) {
93 	delay -= td->time_offset;
99 	fio_gettime(&ts, NULL);
100 	while (delay && !td->terminate) {
102 		if (this_delay > 500000)
105 		usec_sleep(td, this_delay);
109 	usec = utime_since_now(&ts);
110 	if (usec > orig_delay)
111 		td->time_offset = usec - orig_delay;
/*
 * Handle a "special" (non-I/O) replay entry.  Entries whose ddir is not
 * DDIR_INVAL are regular I/O and are left for the caller.  For file
 * actions we honor the recorded delay, refresh last_issue, then perform
 * the open/close/unlink/add on td->files[ipo->fileno].  Return value
 * semantics (consumed vs. error) follow the caller in read_iolog_get();
 * the return statements themselves are elided in this capture.
 */
116 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
124 	if (ipo->ddir != DDIR_INVAL)
127 	f = td->files[ipo->fileno];
130 	iolog_delay(td, ipo->delay);
131 	if (fio_fill_issue_time(td))
132 		fio_gettime(&td->last_issue, NULL);
133 	switch (ipo->file_action) {
134 	case FIO_LOG_OPEN_FILE:
135 		if (td->o.replay_redirect && fio_file_open(f)) {
136 			dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
140 		ret = td_io_open_file(td, f);
143 		td_verror(td, ret, "iolog open file");
145 	case FIO_LOG_CLOSE_FILE:
146 		td_io_close_file(td, f);
148 	case FIO_LOG_UNLINK_FILE:
149 		td_io_unlink_file(td, f);
151 	case FIO_LOG_ADD_FILE:
157 		log_err("fio: bad file action %d\n", ipo->file_action);
/* Forward declaration; read_iolog() is defined further down. */
164 static bool read_iolog(struct thread_data *td);

/*
 * Compute the replay delay between this record's timestamp and the
 * previous one (td->io_log_last_ttime), scaled by replay_time_scale
 * (a percentage; 100 means no scaling).  Returns 0 when stalls are
 * disabled, no previous timestamp exists, or time went backwards —
 * the early-return bodies are elided in this capture.
 */
166 unsigned long long delay_since_ttime(const struct thread_data *td,
167 				     unsigned long long time)
171 	const unsigned long long *last_ttime = &td->io_log_last_ttime;
173 	if (!*last_ttime || td->o.no_stall || time < *last_ttime)
175 	else if (td->o.replay_time_scale == 100)
176 		return time - *last_ttime;
179 	scale = (double) 100.0 / (double) td->o.replay_time_scale;
180 	tmp = time - *last_ttime;
/*
 * Pop the next replay entry off td->io_log_list and populate io_u from it.
 * With read_iolog_chunked, hitting the checkmark triggers a refill via
 * read_blktrace()/read_iolog().  Special (file-action) entries are handled
 * by ipo_special(); DDIR_WAIT entries sleep (relative to genesis time for
 * blktrace-style delays) instead of producing I/O.  Return convention
 * (0 = filled, non-zero = exhausted) — end-of-function lines are elided
 * in this capture.
 */
184 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
186 	struct io_piece *ipo;
187 	unsigned long elapsed;
189 	while (!flist_empty(&td->io_log_list)) {
192 		if (td->o.read_iolog_chunked) {
193 			if (td->io_log_checkmark == td->io_log_current) {
194 				if (td->io_log_blktrace) {
195 					if (!read_blktrace(td))
202 			td->io_log_current--;
204 		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
205 		flist_del(&ipo->list);
206 		remove_trim_entry(td, ipo);
208 		ret = ipo_special(td, ipo);
212 		} else if (ret > 0) {
217 		io_u->ddir = ipo->ddir;
218 		if (ipo->ddir != DDIR_WAIT) {
219 			io_u->offset = ipo->offset;
220 			io_u->verify_offset = ipo->offset;
221 			io_u->buflen = ipo->len;
222 			io_u->file = td->files[ipo->fileno];
223 			get_file(io_u->file);
224 			dprint(FD_IO, "iolog: get %llu/%llu/%s\n", io_u->offset,
225 				io_u->buflen, io_u->file->file_name);
227 				iolog_delay(td, ipo->delay);
229 			elapsed = mtime_since_genesis();
230 			if (ipo->delay > elapsed)
231 				usec_sleep(td, (ipo->delay - elapsed) * 1000);
236 		if (io_u->ddir != DDIR_WAIT)
/*
 * Free every io_piece still held for verification: first drain the
 * rb-tree (sorted writes), then the plain history list, removing each
 * entry from the trim list as we go.  The free() calls themselves are
 * elided in this capture.
 */
244 void prune_io_piece_log(struct thread_data *td)
246 	struct io_piece *ipo;
247 	struct fio_rb_node *n;
249 	while ((n = rb_first(&td->io_hist_tree)) != NULL) {
250 		ipo = rb_entry(n, struct io_piece, rb_node);
251 		rb_erase(n, &td->io_hist_tree);
252 		remove_trim_entry(td, ipo);
257 	while (!flist_empty(&td->io_hist_list)) {
258 		ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
259 		flist_del(&ipo->list);
260 		remove_trim_entry(td, ipo);
267  * log a successful write, so we can unwind the log for verify
/*
 * Allocate an io_piece describing this write and store it for later
 * verification.  With a random map present, entries simply go on the
 * io_hist_list; otherwise they are sorted into the rb-tree keyed by
 * (file, offset), where an overlapping existing entry is evicted (and
 * freed unless still in flight) before inserting the new one.  Trim
 * candidates are also linked onto td->trim_list.
 */
269 void log_io_piece(struct thread_data *td, struct io_u *io_u)
271 	struct fio_rb_node **p, *parent;
272 	struct io_piece *ipo, *__ipo;
274 	ipo = calloc(1, sizeof(struct io_piece));
276 	ipo->file = io_u->file;
277 	ipo->offset = io_u->offset;
278 	ipo->len = io_u->buflen;
279 	ipo->numberio = io_u->numberio;
280 	ipo->flags = IP_F_IN_FLIGHT;
284 	if (io_u_should_trim(td, io_u)) {
285 		flist_add_tail(&ipo->trim_list, &td->trim_list);
290 	 * Only sort writes if we don't have a random map in which case we need
291 	 * to check for duplicate blocks and drop the old one, which we rely on
292 	 * the rb insert/lookup for handling.
294 	if (file_randommap(td, ipo->file)) {
295 		INIT_FLIST_HEAD(&ipo->list);
296 		flist_add_tail(&ipo->list, &td->io_hist_list);
297 		ipo->flags |= IP_F_ONLIST;
302 	RB_CLEAR_NODE(&ipo->rb_node);
305 	 * Sort the entry into the verification list
308 	p = &td->io_hist_tree.rb_node;
	/* Walk down the tree comparing (file, offset); detect range overlap. */
314 		__ipo = rb_entry(parent, struct io_piece, rb_node);
315 		if (ipo->file < __ipo->file)
317 		else if (ipo->file > __ipo->file)
319 		else if (ipo->offset < __ipo->offset) {
321 			overlap = ipo->offset + ipo->len > __ipo->offset;
323 		else if (ipo->offset > __ipo->offset) {
325 			overlap = __ipo->offset + __ipo->len > ipo->offset;
331 			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
332 				__ipo->offset, __ipo->len,
333 				ipo->offset, ipo->len);
335 			rb_erase(parent, &td->io_hist_tree);
336 			remove_trim_entry(td, __ipo);
337 			if (!(__ipo->flags & IP_F_IN_FLIGHT))
343 	rb_link_node(&ipo->rb_node, parent, p);
344 	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
345 	ipo->flags |= IP_F_ONRB;
/*
 * Undo log_io_piece() for a failed I/O: mark the block-info state as a
 * trim/write failure (if block info tracking is on and the block isn't
 * already past that state), then unlink the io_piece from whichever
 * container holds it (rb-tree or history list).  The final free/reset is
 * elided in this capture.
 */
349 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
351 	struct io_piece *ipo = io_u->ipo;
353 	if (td->ts.nr_block_infos) {
354 		uint32_t *info = io_u_block_info(td, io_u);
355 		if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
356 			if (io_u->ddir == DDIR_TRIM)
357 				*info = BLOCK_INFO_SET_STATE(*info,
358 					BLOCK_STATE_TRIM_FAILURE);
359 			else if (io_u->ddir == DDIR_WRITE)
360 				*info = BLOCK_INFO_SET_STATE(*info,
361 					BLOCK_STATE_WRITE_FAILURE);
368 	if (ipo->flags & IP_F_ONRB)
369 		rb_erase(&ipo->rb_node, &td->io_hist_tree);
370 	else if (ipo->flags & IP_F_ONLIST)
371 		flist_del(&ipo->list);
/*
 * Shrink the logged length to what was actually transferred (residual
 * subtracted) so verification matches a short completion.
 */
378 void trim_io_piece(const struct io_u *io_u)
380 	struct io_piece *ipo = io_u->ipo;
385 	ipo->len = io_u->xfer_buflen - io_u->resid;
/*
 * Close out the write iolog: the flush/fclose/free of the setvbuf buffer
 * are elided in this capture; the buffer pointer is cleared last.
 */
388 void write_iolog_close(struct thread_data *td)
397 	td->iolog_buf = NULL;
/*
 * For chunked iolog reading: estimate how many more entries to fetch so
 * roughly one second of I/O is buffered.  Based on the consumption rate
 * since the last high-water mark; negative estimates are clamped (clamp
 * body elided here).  Updates highmark/checkmark bookkeeping before
 * returning the count.
 */
400 int64_t iolog_items_to_fetch(struct thread_data *td)
405 	int64_t items_to_fetch;
407 	if (!td->io_log_highmark)
411 	fio_gettime(&now, NULL);
412 	elapsed = ntime_since(&td->io_log_highmark_time, &now);
414 		for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
415 	items_to_fetch = for_1s - td->io_log_current;
416 	if (items_to_fetch < 0)
421 	td->io_log_highmark = td->io_log_current + items_to_fetch;
422 	td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
423 	fio_gettime(&td->io_log_highmark_time, NULL);
425 	return items_to_fetch;
/*
 * Classify a parsed iolog line by its sscanf() field count: a version 3
 * I/O record has 5 fields and a version 2 I/O record has 4; file-action
 * records have 3 and 2 fields respectively.
 *
 * Fix: the expansions previously used the bare name 'r' instead of the
 * macro parameter '_r', silently capturing a variable from the call site.
 */
#define io_act(_td, _r) (((_td)->io_log_version == 3 && (_r) == 5) || \
			 ((_td)->io_log_version == 2 && (_r) == 4))
#define file_act(_td, _r) (((_td)->io_log_version == 3 && (_r) == 3) || \
			   ((_td)->io_log_version == 2 && (_r) == 2))
434  * Read version 2 and 3 iolog data. It is enhanced to include per-file logging,
/*
 * Parse the open iolog (td->io_log_rfile) line by line, building io_piece
 * entries via queue_io_piece().  Version 3 lines carry a timestamp that is
 * converted to a replay delay; version 2 lines do not.  I/O records set
 * ddir/offset/bytes, file-action records set fileno/file_action.  In
 * chunked mode, stop after items_to_fetch entries and update the
 * high/checkmark bookkeeping.  Finally td_ddir is derived from what the
 * log contained.  Many branch bodies and the return paths are elided in
 * this capture.
 */
437 static bool read_iolog(struct thread_data *td)
439 	unsigned long long offset;
441 	unsigned long long delay = 0;
442 	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
443 	char *rfname, *fname, *act;
446 	bool realloc = false;
447 	int64_t items_to_fetch = 0;
450 	if (td->o.read_iolog_chunked) {
451 		items_to_fetch = iolog_items_to_fetch(td);
457 	 * Read in the read iolog and store it, reuse the infrastructure
458 	 * for doing verifications.
461 	rfname = fname = malloc(256+16);
462 	act = malloc(256+16);
464 	syncs = reads = writes = waits = 0;
465 	while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
466 		struct io_piece *ipo;
468 		unsigned long long ttime;
470 		if (td->io_log_version == 3) {
471 			r = sscanf(p, "%llu %256s %256s %llu %u", &ttime, rfname, act,
473 			delay = delay_since_ttime(td, ttime);
474 			td->io_log_last_ttime = ttime;
476 			 * "wait" is not allowed with version 3
478 			if (!strcmp(act, "wait")) {
479 				log_err("iolog: ignoring wait command with"
480 					" version 3 for file %s\n", fname);
483 		} else /* version 2 */
484 			r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset, &bytes);
486 		if (td->o.replay_redirect)
487 			fname = td->o.replay_redirect;
		/* Map the action word to a data direction for I/O records. */
493 			if (!strcmp(act, "wait"))
495 			else if (!strcmp(act, "read"))
497 			else if (!strcmp(act, "write"))
499 			else if (!strcmp(act, "sync"))
501 			else if (!strcmp(act, "datasync"))
503 			else if (!strcmp(act, "trim"))
506 				log_err("fio: bad iolog file action: %s\n",
510 			fileno = get_fileno(td, fname);
511 		} else if (file_act(td, r)) {
513 			if (!strcmp(act, "add")) {
514 				if (td->o.replay_redirect &&
515 				    get_fileno(td, fname) != -1) {
516 					dprint(FD_FILE, "iolog: ignoring"
517 						" re-add of file %s\n", fname);
519 					fileno = add_file(td, fname, td->subjob_number, 1);
520 					file_action = FIO_LOG_ADD_FILE;
522 			} else if (!strcmp(act, "open")) {
523 				fileno = get_fileno(td, fname);
524 				file_action = FIO_LOG_OPEN_FILE;
525 			} else if (!strcmp(act, "close")) {
526 				fileno = get_fileno(td, fname);
527 				file_action = FIO_LOG_CLOSE_FILE;
529 				log_err("fio: bad iolog file action: %s\n",
534 			log_err("bad iolog%d: %s\n", td->io_log_version, p);
		/* Tally per-direction counts; writes are dropped in read-only mode. */
540 		else if (rw == DDIR_WRITE) {
542 			 * Don't add a write for ro mode
547 		} else if (rw == DDIR_WAIT) {
551 		} else if (rw == DDIR_INVAL) {
552 		} else if (ddir_sync(rw)) {
555 			log_err("bad ddir: %d\n", rw);
		/* Build and queue the replay entry. */
562 		ipo = calloc(1, sizeof(*ipo));
565 		if (td->io_log_version == 3)
567 		if (rw == DDIR_WAIT) {
570 			if (td->o.replay_scale)
571 				ipo->offset = offset / td->o.replay_scale;
573 				ipo->offset = offset;
574 			ipo_bytes_align(td->o.replay_align, ipo);
577 			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw]) {
579 				td->o.max_bs[rw] = bytes;
581 			ipo->fileno = fileno;
582 			ipo->file_action = file_action;
586 		queue_io_piece(td, ipo);
588 		if (td->o.read_iolog_chunked) {
589 			td->io_log_current++;
591 			if (items_to_fetch == 0)
600 	if (td->o.read_iolog_chunked) {
601 		td->io_log_highmark = td->io_log_current;
602 		td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
603 		fio_gettime(&td->io_log_highmark_time, NULL);
606 	if (writes && read_only) {
607 		log_err("fio: <%s> skips replay of %d writes due to"
608 			" read-only\n", td->o.name, writes);
612 		td->flags |= TD_F_SYNCS;
614 	if (td->o.read_iolog_chunked) {
615 		if (td->io_log_current == 0) {
618 		td->o.td_ddir = TD_DDIR_RW;
619 		if (realloc && td->orig_buffer)
623 		if (init_io_u_buffers(td))
	/* Derive the job's data direction from what the log contained. */
629 	if (!reads && !writes && !waits)
631 	else if (reads && !writes)
632 		td->o.td_ddir = TD_DDIR_READ;
633 	else if (!reads && writes)
634 		td->o.td_ddir = TD_DDIR_WRITE;
636 		td->o.td_ddir = TD_DDIR_RW;
641 static bool is_socket(const char *path)
646 r = stat(path, &buf);
650 return S_ISSOCK(buf.st_mode);
/*
 * Connect to a Unix domain socket at 'path' and return the fd (negative
 * on error; the error-path lines are elided in this capture).  The path
 * is copied into sun_path with truncation detection via snprintf.
 * NOTE(review): the connect() address length is computed as
 * strlen(path) + sizeof(sun_family) — the traditional BSD sizing rather
 * than sizeof(addr); confirm this is intentional for portability.
 */
653 static int open_socket(const char *path)
655 	struct sockaddr_un addr;
658 	fd = socket(AF_UNIX, SOCK_STREAM, 0);
662 	addr.sun_family = AF_UNIX;
663 	if (snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path) >=
664 	    sizeof(addr.sun_path)) {
665 		log_err("%s: path name %s is too long for a Unix socket\n",
669 	ret = connect(fd, (const struct sockaddr *)&addr, strlen(path) + sizeof(addr.sun_family));
678  * open iolog, check version, and call appropriate parser
/*
 * Open the replay source — a Unix socket, stdin ("-"), or a regular
 * file — read the first line and match it against the version 2/3
 * banner strings, then hand off to read_iolog().  Version 1 logs are
 * rejected.  Error-path returns are elided in this capture.
 */
680 static bool init_iolog_read(struct thread_data *td, char *fname)
682 	char buffer[256], *p;
685 	dprint(FD_IO, "iolog: name=%s\n", fname);
687 	if (is_socket(fname)) {
690 		fd = open_socket(fname);
693 	} else if (!strcmp(fname, "-")) {
696 		f = fopen(fname, "r");
699 		perror("fopen read iolog");
703 	p = fgets(buffer, sizeof(buffer), f);
705 		td_verror(td, errno, "iolog read");
706 		log_err("fio: unable to read iolog\n");
712 	 * versions 2 and 3 of the iolog store a specific string as the
713 	 * first line, check for that
715 	if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
716 		td->io_log_version = 2;
717 	else if (!strncmp(iolog_ver3, buffer, strlen(iolog_ver3)))
718 		td->io_log_version = 3;
720 		log_err("fio: iolog version 1 is no longer supported\n");
725 	free_release_files(td);
726 	td->io_log_rfile = f;
727 	return read_iolog(td);
731  * Set up a log for storing io patterns.
/*
 * Open the write iolog in append mode, attach an 8k fully-buffered
 * stdio buffer, record the log start time, write the version 3 banner,
 * and log an "add" event for every file already known to the job.
 */
733 static bool init_iolog_write(struct thread_data *td)
739 	f = fopen(td->o.write_iolog_file, "a");
741 		perror("fopen write iolog");
746 	 * That's it for writing, setup a log buffer and we're done.
749 	td->iolog_buf = malloc(8192);
750 	setvbuf(f, td->iolog_buf, _IOFBF, 8192);
751 	fio_gettime(&td->io_log_start_time, NULL);
754 	 * write our version line
756 	if (fprintf(f, "%s\n", iolog_ver3) < 0) {
757 		perror("iolog init\n");
762 	 * add all known files
764 	for_each_file(td, ff, i)
765 		log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * Top-level iolog setup: for a read iolog, detect blktrace vs. native
 * format (per-subjob filename) and dispatch to the matching parser; for
 * a write iolog, create it.  Sets td_verror on failure.
 */
770 bool init_iolog(struct thread_data *td)
774 	if (td->o.read_iolog_file) {
776 		char * fname = get_name_by_idx(td->o.read_iolog_file, td->subjob_number);
779 		 * Check if it's a blktrace file and load that if possible.
780 		 * Otherwise assume it's a normal log file and load that.
782 		if (is_blktrace(fname, &need_swap)) {
783 			td->io_log_blktrace = 1;
784 			ret = init_blktrace_read(td, fname, need_swap);
786 			td->io_log_blktrace = 0;
787 			ret = init_iolog_read(td, fname);
790 	} else if (td->o.write_iolog_file)
791 		ret = init_iolog_write(td);
796 		td_verror(td, EINVAL, "failed initializing iolog");
/*
 * Allocate and initialize an io_log from the supplied parameters:
 * copy log type/offset/prio/gz settings, seed one zeroed histogram
 * plat entry per data direction, pre-size the pending-samples buffer
 * for non-offload submit mode, set the ddir mask bits for the optional
 * offset/priority sample fields, and set up compression (chunk list,
 * pshared mutexes, TD_F_COMPRESS_LOG) when log_gz/log_gz_store is on.
 */
801 void setup_log(struct io_log **log, struct log_params *p,
802 	       const char *filename)
806 	struct io_u_plat_entry *entry;
807 	struct flist_head *list;
809 	l = scalloc(1, sizeof(*l));
810 	INIT_FLIST_HEAD(&l->io_logs);
811 	l->log_type = p->log_type;
812 	l->log_offset = p->log_offset;
813 	l->log_prio = p->log_prio;
814 	l->log_gz = p->log_gz;
815 	l->log_gz_store = p->log_gz_store;
816 	l->avg_msec = p->avg_msec;
817 	l->hist_msec = p->hist_msec;
818 	l->hist_coarseness = p->hist_coarseness;
819 	l->filename = strdup(filename);
822 	/* Initialize histogram lists for each r/w direction,
823 	 * with initial io_u_plat of all zeros:
825 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
826 		list = &l->hist_window[i].list;
827 		INIT_FLIST_HEAD(list);
828 		entry = calloc(1, sizeof(struct io_u_plat_entry));
829 		flist_add(&entry->list, list);
832 	if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
833 		unsigned int def_samples = DEF_LOG_ENTRIES;
836 		__p = calloc(1, sizeof(*l->pending));
837 		if (l->td->o.iodepth > DEF_LOG_ENTRIES)
838 			def_samples = roundup_pow2(l->td->o.iodepth);
839 		__p->max_samples = def_samples;
840 		__p->log = calloc(__p->max_samples, log_entry_sz(l));
	/* Per-sample ddir mask bits flag optional offset/priority fields. */
845 		l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
847 		l->log_ddir_mask |= LOG_PRIO_SAMPLE_BIT;
849 	INIT_FLIST_HEAD(&l->chunk_list);
851 	if (l->log_gz && !p->td)
853 	else if (l->log_gz || l->log_gz_store) {
854 		mutex_init_pshared(&l->chunk_lock);
855 		mutex_init_pshared(&l->deferred_free_lock);
856 		p->td->flags |= TD_F_COMPRESS_LOG;
/*
 * With CONFIG_SETVBUF, give the log FILE a large (1MB) private stdio
 * buffer to speed up flushing; the fallback variants below (bodies
 * elided in this capture) are no-ops returning NULL.
 */
862 #ifdef CONFIG_SETVBUF
863 static void *set_file_buffer(FILE *f)
865 	size_t size = 1048576;
869 	setvbuf(f, buf, _IOFBF, size);
873 static void clear_file_buffer(void *buf)
878 static void *set_file_buffer(FILE *f)
883 static void clear_file_buffer(void *buf)
/*
 * Release an io_log: free every queued io_logs sample buffer, the
 * pending buffer, and (in elided lines) the filename and the log
 * structure itself.
 */
888 void free_log(struct io_log *log)
890 	while (!flist_empty(&log->io_logs)) {
891 		struct io_logs *cur_log;
893 		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
894 		flist_del_init(&cur_log->list);
900 		free(log->pending->log);
/*
 * Sum 'stride' consecutive latency-histogram buckets starting at index j.
 * When a previous snapshot io_u_plat_last is supplied, return the delta
 * against it (per-window counts); otherwise return the absolute sum.
 *
 * Fix: the function body was missing its accumulator setup and final
 * "return sum;", so a non-void function could fall off the end (UB).
 */
uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
		  uint64_t *io_u_plat_last)
{
	uint64_t sum = 0;
	int k;

	if (io_u_plat_last) {
		for (k = 0; k < stride; k++)
			sum += io_u_plat[j + k] - io_u_plat_last[j + k];
	} else {
		for (k = 0; k < stride; k++)
			sum += io_u_plat[j + k];
	}

	return sum;
}
/*
 * Write histogram samples as CSV: one line per sample with time, ddir,
 * bs, then FIO_IO_U_PLAT_NR/stride coarsened bucket sums, each computed
 * as a delta against the previous entry in the plat list.  The previous
 * entry is unlinked (and, in elided lines, freed) once consumed.
 */
927 static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
928 			       uint64_t sample_size)
932 	uint64_t i, j, nr_samples;
933 	struct io_u_plat_entry *entry, *entry_before;
935 	uint64_t *io_u_plat_before;
937 	int stride = 1 << hist_coarseness;
942 	s = __get_sample(samples, 0, 0);
943 	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
945 	nr_samples = sample_size / __log_entry_sz(log_offset);
947 	for (i = 0; i < nr_samples; i++) {
948 		s = __get_sample(samples, log_offset, i);
950 		entry = s->data.plat_entry;
951 		io_u_plat = entry->io_u_plat;
953 		entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
954 		io_u_plat_before = entry_before->io_u_plat;
956 		fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
957 						io_sample_ddir(s), (unsigned long long) s->bs);
958 		for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
959 			fprintf(f, "%llu, ", (unsigned long long)
960 				hist_sum(j, stride, io_u_plat, io_u_plat_before));
		/* Last bucket group terminates the line (no trailing comma). */
962 		fprintf(f, "%llu\n", (unsigned long long)
963 			hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
966 		flist_del(&entry_before->list);
/*
 * Write plain (non-histogram) samples as CSV.  The first sample's ddir
 * mask bits say whether each record carries an offset and/or priority
 * field; the format string is chosen accordingly (priority printed as a
 * hex ioprio when log_prio is set, otherwise as an RT-class boolean).
 */
971 void flush_samples(FILE *f, void *samples, uint64_t sample_size)
974 	int log_offset, log_prio;
975 	uint64_t i, nr_samples;
976 	unsigned int prio_val;
982 	s = __get_sample(samples, 0, 0);
983 	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
984 	log_prio = (s->__ddir & LOG_PRIO_SAMPLE_BIT) != 0;
	/* Pick the output format matching the optional fields present. */
988 			fmt = "%lu, %" PRId64 ", %u, %llu, %llu, 0x%04x\n";
990 			fmt = "%lu, %" PRId64 ", %u, %llu, %llu, %u\n";
993 			fmt = "%lu, %" PRId64 ", %u, %llu, 0x%04x\n";
995 			fmt = "%lu, %" PRId64 ", %u, %llu, %u\n";
998 	nr_samples = sample_size / __log_entry_sz(log_offset);
1000 	for (i = 0; i < nr_samples; i++) {
1001 		s = __get_sample(samples, log_offset, i);
1004 			prio_val = s->priority;
1006 			prio_val = ioprio_value_is_class_rt(s->priority);
1010 				(unsigned long) s->time,
1012 				io_sample_ddir(s), (unsigned long long) s->bs,
1015 			struct io_sample_offset *so = (void *) s;
1018 				(unsigned long) s->time,
1020 				io_sample_ddir(s), (unsigned long long) s->bs,
1021 				(unsigned long long) so->offset,
/* Work item handed to the compress workqueue: the samples to deflate. */
1029 struct iolog_flush_data {
1030 	struct workqueue_work work;
1033 	uint32_t nr_samples;
/* Compression is performed in GZ_CHUNK-sized output chunks. */
1037 #define GZ_CHUNK	131072
/*
 * Allocate a compression chunk with a GZ_CHUNK output buffer; len/seq
 * initialization lines are elided in this capture.
 */
1039 static struct iolog_compress *get_new_chunk(unsigned int seq)
1041 	struct iolog_compress *c;
1043 	c = malloc(sizeof(*c));
1044 	INIT_FLIST_HEAD(&c->list);
1045 	c->buf = malloc(GZ_CHUNK);
/* Free a chunk's buffer and the chunk itself (body elided here). */
1051 static void free_chunk(struct iolog_compress *ic)
/*
 * Prepare a zlib stream for inflation.  Window bits get +32 added (per
 * zlib convention) so gzip-wrapped and raw-zlib input are auto-detected
 * when reading back log_gz_store files.
 */
1057 static int z_stream_init(z_stream *stream, int gz_hdr)
1061 	memset(stream, 0, sizeof(*stream));
1062 	stream->zalloc = Z_NULL;
1063 	stream->zfree = Z_NULL;
1064 	stream->opaque = Z_NULL;
1065 	stream->next_in = Z_NULL;
1068 	 * zlib magic - add 32 for auto-detection of gz header or not,
1069 	 * if we decide to store files in a gzip friendly format.
1074 	if (inflateInit2(stream, wbits) != Z_OK)
/* Iteration state carried across inflate_chunk() calls (fields elided). */
1080 struct inflate_chunk_iter {
/*
 * End the current inflate sequence: tear down the zlib stream, flush the
 * decompressed samples to 'f', then (in elided lines) free and reset the
 * iterator buffer.
 */
1089 static void finish_chunk(z_stream *stream, FILE *f,
1090 			 struct inflate_chunk_iter *iter)
1094 	ret = inflateEnd(stream);
1096 		log_err("fio: failed to end log inflation seq %d (%d)\n",
1099 	flush_samples(f, iter->buf, iter->buf_used);
1102 	iter->buf_size = iter->buf_used = 0;
1106  * Iterative chunk inflation. Handles cases where we cross into a new
1107  * sequence, doing flush finish of previous chunk if needed.
/*
 * Inflate one stored chunk into iter->buf, growing the buffer by
 * chunk_sz whenever zlib runs out of output space.  A sequence-number
 * change finishes the previous stream and starts a new one.  Returns the
 * number of input bytes consumed.
 */
1109 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
1110 			    z_stream *stream, struct inflate_chunk_iter *iter)
1114 	dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
1115 			(unsigned long) ic->len, ic->seq);
1117 	if (ic->seq != iter->seq) {
1119 			finish_chunk(stream, f, iter);
1121 		z_stream_init(stream, gz_hdr);
1122 		iter->seq = ic->seq;
1125 	stream->avail_in = ic->len;
1126 	stream->next_in = ic->buf;
1128 	if (!iter->buf_size) {
1129 		iter->buf_size = iter->chunk_sz;
1130 		iter->buf = malloc(iter->buf_size);
1133 	while (stream->avail_in) {
1134 		size_t this_out = iter->buf_size - iter->buf_used;
1137 		stream->avail_out = this_out;
1138 		stream->next_out = iter->buf + iter->buf_used;
1140 		err = inflate(stream, Z_NO_FLUSH);
1142 			log_err("fio: failed inflating log: %d\n", err);
1147 		iter->buf_used += this_out - stream->avail_out;
		/* Output buffer exhausted: grow it and continue inflating. */
1149 		if (!stream->avail_out) {
1150 			iter->buf_size += iter->chunk_sz;
1151 			iter->buf = realloc(iter->buf, iter->buf_size);
1155 		if (err == Z_STREAM_END)
1159 	ret = (void *) stream->next_in - ic->buf;
1161 	dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
1167  * Inflate stored compressed chunks, or write them directly to the log
1168  * file if so instructed.
/*
 * Drain log->chunk_list: with log_gz_store the raw compressed bytes are
 * written straight to 'f'; otherwise each chunk is inflated and its
 * samples flushed.  The final partial stream is finished at the end.
 */
1170 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1172 	struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
1175 	while (!flist_empty(&log->chunk_list)) {
1176 		struct iolog_compress *ic;
1178 		ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
1179 		flist_del(&ic->list);
1181 		if (log->log_gz_store) {
1184 			dprint(FD_COMPRESS, "log write chunk size=%lu, "
1185 				"seq=%u\n", (unsigned long) ic->len, ic->seq);
1187 			ret = fwrite(ic->buf, ic->len, 1, f);
1188 			if (ret != 1 || ferror(f)) {
1190 				log_err("fio: error writing compressed log\n");
1193 			inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
1199 		finish_chunk(&stream, f, &iter);
1207  * Open compressed log file and decompress the stored chunks and
1208  * write them to stdout. The chunks are stored sequentially in the
1209  * file, so we iterate over them and do them one-by-one.
/*
 * Read the whole compressed file into one iolog_compress buffer, then
 * repeatedly inflate_chunk() — bumping the sequence each time a chunk
 * ends with Z_STREAM_END — until all input is consumed; decompressed
 * samples go to stdout.  Cleanup/return lines are elided in this capture.
 */
1211 int iolog_file_inflate(const char *file)
1213 	struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
1214 	struct iolog_compress ic;
1222 	f = fopen(file, "rb");
1228 	if (stat(file, &sb) < 0) {
1234 	ic.buf = buf = malloc(sb.st_size);
1235 	ic.len = sb.st_size;
1238 	ret = fread(ic.buf, ic.len, 1, f);
1239 	if (ret == 0 && ferror(f)) {
1244 	} else if (ferror(f) || (!feof(f) && ret != 1)) {
1245 		log_err("fio: short read on reading log\n");
1254 	 * Each chunk will return Z_STREAM_END. We don't know how many
1255 	 * chunks are in the file, so we just keep looping and incrementing
1256 	 * the sequence number until we have consumed the whole compressed
1263 		iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
1276 		finish_chunk(&stream, stdout, &iter);
/* No-zlib build stubs: writing is a no-op, inflation reports an error. */
1286 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1291 int iolog_file_inflate(const char *file)
1293 	log_err("fio: log inflation not possible without zlib\n");
/*
 * Write everything an io_log holds to its file: open in binary mode for
 * gz-stored logs (avoids Windows line-ending conversion), append or
 * truncate per 'do_append', write out any compressed chunks, then flush
 * each queued sample buffer — histogram logs via flush_hist_samples(),
 * everything else via flush_samples().
 */
1299 void flush_log(struct io_log *log, bool do_append)
1305 	 * If log_gz_store is true, we are writing a binary file.
1306 	 * Set the mode appropriately (on all platforms) to avoid issues
1307 	 * on windows (line-ending conversions, etc.)
1310 		if (log->log_gz_store)
1311 			f = fopen(log->filename, "wb");
1313 			f = fopen(log->filename, "w");
1315 		if (log->log_gz_store)
1316 			f = fopen(log->filename, "ab");
1318 			f = fopen(log->filename, "a");
1320 		perror("fopen log");
1324 	buf = set_file_buffer(f);
1326 	inflate_gz_chunks(log, f);
1328 	while (!flist_empty(&log->io_logs)) {
1329 		struct io_logs *cur_log;
1331 		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1332 		flist_del_init(&cur_log->list);
1334 		if (log->td && log == log->td->clat_hist_log)
1335 			flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1336 					   log_sample_sz(log, cur_log));
1338 			flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1344 	clear_file_buffer(buf);
/*
 * Finalize one log: flush compression first if enabled, take the file
 * lock (try-lock when 'trylock' so callers can retry), then either send
 * the log to the network backend/GUI client or flush it locally, and
 * unlock.  Per-job logs are truncated rather than appended.
 */
1347 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1349 	if (td->flags & TD_F_COMPRESS_LOG)
1353 		if (fio_trylock_file(log->filename))
1356 		fio_lock_file(log->filename);
1358 	if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1359 		fio_send_iolog(td, log, log->filename);
1361 		flush_log(log, !td->o.per_job_logs);
1363 	fio_unlock_file(log->filename);
/*
 * Return the total byte size of all compressed chunks queued on the log,
 * walking chunk_list under chunk_lock (the accumulation line is elided
 * in this capture).
 */
1368 size_t log_chunk_sizes(struct io_log *log)
1370 	struct flist_head *entry;
1373 	if (flist_empty(&log->chunk_list))
1377 	pthread_mutex_lock(&log->chunk_lock);
1378 	flist_for_each(entry, &log->chunk_list) {
1379 		struct iolog_compress *c;
1381 		c = flist_entry(entry, struct iolog_compress, list);
1384 	pthread_mutex_unlock(&log->chunk_lock);
/*
 * Queue 'ptr' for deferred freeing (freeing directly here could race
 * with the sample producer).  If the deferred array is full, the entry
 * is dropped with a one-time warning.
 */
1390 static void iolog_put_deferred(struct io_log *log, void *ptr)
1395 	pthread_mutex_lock(&log->deferred_free_lock);
1396 	if (log->deferred < IOLOG_MAX_DEFER) {
1397 		log->deferred_items[log->deferred] = ptr;
1399 	} else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
1400 		log_err("fio: had to drop log entry free\n");
1401 	pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Free all pointers previously queued by iolog_put_deferred(), under
 * the deferred_free_lock (the counter reset is elided in this capture).
 */
1404 static void iolog_free_deferred(struct io_log *log)
1411 	pthread_mutex_lock(&log->deferred_free_lock);
1413 	for (i = 0; i < log->deferred; i++) {
1414 		free(log->deferred_items[i]);
1415 		log->deferred_items[i] = NULL;
1419 	pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Deflate one batch of samples (data->samples, data->nr_samples) into a
 * private list of GZ_CHUNK-sized chunks under a fresh sequence number,
 * then splice the list onto the log's chunk_list under chunk_lock and
 * queue the sample buffer for deferred freeing.  The trailing loop is
 * the error path: tear down the partial chunk list.  Several allocation
 * and error-handling lines are elided in this capture.
 */
1422 static int gz_work(struct iolog_flush_data *data)
1424 	struct iolog_compress *c = NULL;
1425 	struct flist_head list;
1431 	INIT_FLIST_HEAD(&list);
1433 	memset(&stream, 0, sizeof(stream));
1434 	stream.zalloc = Z_NULL;
1435 	stream.zfree = Z_NULL;
1436 	stream.opaque = Z_NULL;
1438 	ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1440 		log_err("fio: failed to init gz stream\n");
1444 	seq = ++data->log->chunk_seq;
1446 	stream.next_in = (void *) data->samples;
1447 	stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1449 	dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
1450 				(unsigned long) stream.avail_in, seq,
1451 					data->log->filename);
	/* Compress the input into GZ_CHUNK-sized output chunks. */
1454 			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1455 				(unsigned long) c->len);
1456 		c = get_new_chunk(seq);
1457 		stream.avail_out = GZ_CHUNK;
1458 		stream.next_out = c->buf;
1459 		ret = deflate(&stream, Z_NO_FLUSH);
1461 			log_err("fio: deflate log (%d)\n", ret);
1466 		c->len = GZ_CHUNK - stream.avail_out;
1467 		flist_add_tail(&c->list, &list);
1469 	} while (stream.avail_in);
	/* Finish the stream into the remaining space of the last chunk. */
1471 	stream.next_out = c->buf + c->len;
1472 	stream.avail_out = GZ_CHUNK - c->len;
1474 	ret = deflate(&stream, Z_FINISH);
1477 		 * Z_BUF_ERROR is special, it just means we need more
1478 		 * output space. We'll handle that below. Treat any other
1481 		if (ret != Z_BUF_ERROR) {
1482 			log_err("fio: deflate log (%d)\n", ret);
1483 			flist_del(&c->list);
1490 	c->len = GZ_CHUNK - stream.avail_out;
1492 	dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
	/* Keep finishing into new chunks until zlib signals stream end. */
1494 	if (ret != Z_STREAM_END) {
1496 			c = get_new_chunk(seq);
1497 			stream.avail_out = GZ_CHUNK;
1498 			stream.next_out = c->buf;
1499 			ret = deflate(&stream, Z_FINISH);
1500 			c->len = GZ_CHUNK - stream.avail_out;
1502 			flist_add_tail(&c->list, &list);
1503 			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1504 				(unsigned long) c->len);
1505 		} while (ret != Z_STREAM_END);
1508 	dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1510 	ret = deflateEnd(&stream);
1512 		log_err("fio: deflateEnd %d\n", ret);
1514 	iolog_put_deferred(data->log, data->samples);
1516 	if (!flist_empty(&list)) {
1517 		pthread_mutex_lock(&data->log->chunk_lock);
1518 		flist_splice_tail(&list, &data->log->chunk_list);
1519 		pthread_mutex_unlock(&data->log->chunk_lock);
	/* Error path: discard any chunks built before the failure. */
1528 	while (!flist_empty(&list)) {
1529 		c = flist_first_entry(list.next, struct iolog_compress, list);
1530 		flist_del(&c->list);
1538  * Invoked from our compress helper thread, when logging would have exceeded
1539  * the specified memory limitation. Compresses the previously stored
/* Workqueue adapter: unwrap the work item and run gz_work(). */
1542 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1544 	return gz_work(container_of(work, struct iolog_flush_data, work));
/* Pin the compress worker to log_gz_cpumask when that option is set. */
1547 static int gz_init_worker(struct submit_worker *sw)
1549 	struct thread_data *td = sw->wq->td;
1551 	if (!fio_option_is_set(&td->o, log_gz_cpumask))
1554 	if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1555 		log_err("gz: failed to set CPU affinity\n");
/* Ops table for the single-worker log-compression workqueue. */
1562 static struct workqueue_ops log_compress_wq_ops = {
1563 	.fn		= gz_work_async,
1564 	.init_worker_fn	= gz_init_worker,
/* Start (or tear down) the one-worker compression workqueue, but only
 * when the job actually has TD_F_COMPRESS_LOG set. */
1568 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1570 	if (!(td->flags & TD_F_COMPRESS_LOG))
1573 	workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
1577 void iolog_compress_exit(struct thread_data *td)
1579 	if (!(td->flags & TD_F_COMPRESS_LOG))
1582 	workqueue_exit(&td->log_compress_wq);
1586  * Queue work item to compress the existing log entries. We reset the
1587  * current log to a small size, and reference the existing log in the
1588  * data that we queue for compression. Once compression has been done,
1589  * this old log is freed. Will not return until the log compression
1590  * has completed, and will flush all previous logs too
/*
 * Synchronous flush: drain the compress workqueue first, then run
 * gz_work() inline (elided here) on each queued sample buffer.
 */
1592 static int iolog_flush(struct io_log *log)
1594 	struct iolog_flush_data *data;
1596 	workqueue_flush(&log->td->log_compress_wq);
1597 	data = malloc(sizeof(*data));
1604 	while (!flist_empty(&log->io_logs)) {
1605 		struct io_logs *cur_log;
1607 		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1608 		flist_del_init(&cur_log->list);
1610 		data->samples = cur_log->log;
1611 		data->nr_samples = cur_log->nr_samples;
/*
 * Asynchronous flush of one sample buffer: hand ownership of
 * cur_log->log to a shared-memory work item, reset the current log, and
 * enqueue it on the compress workqueue; also reap any deferred frees.
 */
1622 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1624 	struct iolog_flush_data *data;
1626 	data = smalloc(sizeof(*data));
1632 	data->samples = cur_log->log;
1633 	data->nr_samples = cur_log->nr_samples;
1636 	cur_log->nr_samples = cur_log->max_samples = 0;
1637 	cur_log->log = NULL;
1639 	workqueue_enqueue(&log->td->log_compress_wq, &data->work);
1641 	iolog_free_deferred(log);
/* No-zlib build stubs for the flush/compress entry points (bodies elided). */
1647 static int iolog_flush(struct io_log *log)
1652 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1657 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1662 void iolog_compress_exit(struct thread_data *td)
/* Return the newest io_logs buffer on the list, or NULL-ish when empty
 * (the empty-case return line is elided in this capture). */
1668 struct io_logs *iolog_cur_log(struct io_log *log)
1670 	if (flist_empty(&log->io_logs))
1673 	return flist_last_entry(&log->io_logs, struct io_logs, list);
/* Sum nr_samples across every queued io_logs buffer of this log. */
1676 uint64_t iolog_nr_samples(struct io_log *iolog)
1678 	struct flist_head *entry;
1681 	flist_for_each(entry, &iolog->io_logs) {
1682 		struct io_logs *cur_log;
1684 		cur_log = flist_entry(entry, struct io_logs, list);
1685 		ret += cur_log->nr_samples;
/* Write one log via finish_log() (NULL-check line elided here). */
1691 static int __write_log(struct thread_data *td, struct io_log *log, int try)
1694 		return finish_log(td, log, try);
/*
 * Per-log-type writers: each flushes its td log (respecting whether a
 * per-unit log was requested), and on success (elided lines) clears the
 * td pointer so the log is not written twice.
 */
1699 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1703 	if (per_unit_log(td->iops_log) != unit_log)
1706 	ret = __write_log(td, td->iops_log, try);
1708 		td->iops_log = NULL;
1713 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1720 	ret = __write_log(td, td->slat_log, try);
1722 		td->slat_log = NULL;
1727 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1734 	ret = __write_log(td, td->clat_log, try);
1736 		td->clat_log = NULL;
1741 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1748 	ret = __write_log(td, td->clat_hist_log, try);
1750 		td->clat_hist_log = NULL;
1755 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1762 	ret = __write_log(td, td->lat_log, try);
1769 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1773 	if (per_unit_log(td->bw_log) != unit_log)
1776 	ret = __write_log(td, td->bw_log, try);
/* Bitmask values and dispatch table pairing each log type with its writer. */
1789 	CLAT_HIST_LOG_MASK = 32,
1796 	int (*fn)(struct thread_data *, int, bool);
1799 static struct log_type log_types[] = {
1801 		.mask	= BW_LOG_MASK,
1802 		.fn	= write_bandw_log,
1805 		.mask	= LAT_LOG_MASK,
1806 		.fn	= write_lat_log,
1809 		.mask	= SLAT_LOG_MASK,
1810 		.fn	= write_slat_log,
1813 		.mask	= CLAT_LOG_MASK,
1814 		.fn	= write_clat_log,
1817 		.mask	= IOPS_LOG_MASK,
1818 		.fn	= write_iops_log,
1821 		.mask	= CLAT_HIST_LOG_MASK,
1822 		.fn	= write_clat_hist_log,
/*
 * Write out every log type for this thread.  Runs in TD_FINISHING state,
 * retrying writers that could not take the file lock (try-lock while
 * more than one log remains) and bailing out of the retry loop when a
 * full pass makes no progress.
 */
1826 void td_writeout_logs(struct thread_data *td, bool unit_logs)
1828 	unsigned int log_mask = 0;
1829 	unsigned int log_left = ALL_LOG_NR;
1832 	old_state = td_bump_runstate(td, TD_FINISHING);
1834 	finalize_logs(td, unit_logs);
1837 		int prev_log_left = log_left;
1839 		for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1840 			struct log_type *lt = &log_types[i];
1843 			if (!(log_mask & lt->mask)) {
1844 				ret = lt->fn(td, log_left != 1, unit_logs);
1847 					log_mask |= lt->mask;
		/* No progress in a full pass means the rest are stuck; stop. */
1852 		if (prev_log_left == log_left)
1856 	td_restore_runstate(td, old_state);
/* Write out logs for every thread (the for_each_td loop is elided here). */
1859 void fio_writeout_logs(bool unit_logs)
1861 	struct thread_data *td;
1865 		td_writeout_logs(td, unit_logs);