2 * Code related to writing an iolog of what a thread is doing, and to
3 * later read that back and replay
22 #include "lib/roundup.h"
24 #include <netinet/in.h>
25 #include <netinet/tcp.h>
26 #include <arpa/inet.h>
28 #include <sys/socket.h>
/* Forward declaration of the compressed-log flush helper, plus the magic
 * first-line strings used to identify version 2 and version 3 iolog files. */
31 static int iolog_flush(struct io_log *log);
33 static const char iolog_ver2[] = "fio version 2 iolog";
34 static const char iolog_ver3[] = "fio version 3 iolog";
/*
 * Append an io_piece to the thread's replay queue (io_log_list) and
 * account its length toward the total I/O size to be replayed.
 */
36 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
38 flist_add_tail(&ipo->list, &td->io_log_list);
39 td->total_io_size += ipo->len;
/*
 * Record one I/O to the write-iolog file as
 * "<usec-since-start> <filename> <ddir> <offset> <buflen>".
 * No-op unless write_iolog_file is configured.
 * NOTE(review): fragmentary view — the early-return and fio_gettime()
 * destination lines are not all visible here.
 */
42 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
46 if (!td->o.write_iolog_file)
49 fio_gettime(&now, NULL);
50 fprintf(td->iolog_f, "%lu %s %s %llu %llu\n", utime_since_now(&td->io_log_start_time),
51 io_u->file->file_name,
52 io_ddir_name(io_u->ddir),
53 io_u->offset, io_u->buflen);
/*
 * Record a file-level action ("add"/"open"/"close", indexed by 'what')
 * to the write-iolog file as "<usec-since-start> <filename> <action>".
 * No-op unless write_iolog_file is configured; the visible comment
 * suggests pre-open/close before job start is also skipped.
 */
57 void log_file(struct thread_data *td, struct fio_file *f,
58 enum file_log_act what)
60 const char *act[] = { "add", "open", "close" };
65 if (!td->o.write_iolog_file)
70 * this happens on the pre-open/close done before the job starts
75 fio_gettime(&now, NULL);
76 fprintf(td->iolog_f, "%lu %s %s\n", utime_since_now(&td->io_log_start_time),
77 f->file_name, act[what]);
/*
 * Sleep for a replay inter-I/O delay (usec), compensating for time already
 * spent since the last issue and for previous oversleep (td->time_offset).
 * Sleeps in chunks (visible cap check at 500000 usec) and stops early if
 * the thread is terminating; afterwards records any oversleep beyond the
 * requested delay into td->time_offset for the next call to subtract.
 */
80 static void iolog_delay(struct thread_data *td, unsigned long delay)
82 uint64_t usec = utime_since_now(&td->last_issue);
83 unsigned long orig_delay = delay;
/* Previous call overslept by time_offset; credit it against this delay. */
87 if (delay < td->time_offset) {
92 delay -= td->time_offset;
98 fio_gettime(&ts, NULL);
99 while (delay && !td->terminate) {
101 if (this_delay > 500000)
104 usec_sleep(td, this_delay);
/* Measure how long we actually slept and carry oversleep forward. */
108 usec = utime_since_now(&ts);
109 if (usec > orig_delay)
110 td->time_offset = usec - orig_delay;
/*
 * Handle a "special" (non-data) replay entry: file open/close/unlink/add
 * actions. Entries with a real data direction (ddir != DDIR_INVAL) are not
 * special and are left for the normal replay path. Applies the entry's
 * delay, refreshes last_issue when issue times are tracked, then dispatches
 * on file_action. Returns an int status (exact values not visible here).
 */
115 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
123 if (ipo->ddir != DDIR_INVAL)
126 f = td->files[ipo->fileno];
129 iolog_delay(td, ipo->delay);
130 if (fio_fill_issue_time(td))
131 fio_gettime(&td->last_issue, NULL);
132 switch (ipo->file_action) {
133 case FIO_LOG_OPEN_FILE:
/* With replay_redirect several files map to one; ignore re-opens. */
134 if (td->o.replay_redirect && fio_file_open(f)) {
135 dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
139 ret = td_io_open_file(td, f);
142 td_verror(td, ret, "iolog open file");
144 case FIO_LOG_CLOSE_FILE:
145 td_io_close_file(td, f);
147 case FIO_LOG_UNLINK_FILE:
148 td_io_unlink_file(td, f);
150 case FIO_LOG_ADD_FILE:
156 log_err("fio: bad file action %d\n", ipo->file_action);
163 static bool read_iolog(struct thread_data *td);
/*
 * Compute the replay delay between the previous logged timestamp
 * (td->io_log_last_ttime) and 'time', honoring the no_stall option and
 * the replay_time_scale percentage (100 == unscaled). Returns 0 when
 * there is no previous timestamp, stalls are disabled, or time went
 * backwards; otherwise the (possibly scaled) difference.
 */
165 unsigned long long delay_since_ttime(const struct thread_data *td,
166 unsigned long long time)
170 const unsigned long long *last_ttime = &td->io_log_last_ttime;
172 if (!*last_ttime || td->o.no_stall || time < *last_ttime)
174 else if (td->o.replay_time_scale == 100)
175 return time - *last_ttime;
/* Scale the gap: e.g. replay_time_scale=50 doubles the delay. */
178 scale = (double) 100.0 / (double) td->o.replay_time_scale;
179 tmp = time - *last_ttime;
/*
 * Pop the next io_piece from the replay queue and populate 'io_u' from it.
 * In chunked mode, refills the queue (from blktrace or the iolog file)
 * when the checkmark watermark is reached. Special (file-action) entries
 * are dispatched via ipo_special(); DDIR_WAIT entries carry only a delay.
 * Return convention not fully visible in this fragmentary view.
 */
183 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
185 struct io_piece *ipo;
186 unsigned long elapsed;
188 while (!flist_empty(&td->io_log_list)) {
/* Chunked reading: fetch the next batch once we drain to the checkmark. */
191 if (td->o.read_iolog_chunked) {
192 if (td->io_log_checkmark == td->io_log_current) {
193 if (td->io_log_blktrace) {
194 if (!read_blktrace(td))
201 td->io_log_current--;
203 ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
204 flist_del(&ipo->list);
205 remove_trim_entry(td, ipo);
207 ret = ipo_special(td, ipo);
211 } else if (ret > 0) {
216 io_u->ddir = ipo->ddir;
217 if (ipo->ddir != DDIR_WAIT) {
218 io_u->offset = ipo->offset;
219 io_u->verify_offset = ipo->offset;
220 io_u->buflen = ipo->len;
221 io_u->file = td->files[ipo->fileno];
222 get_file(io_u->file);
223 dprint(FD_IO, "iolog: get %llu/%llu/%s\n", io_u->offset,
224 io_u->buflen, io_u->file->file_name);
226 iolog_delay(td, ipo->delay);
/* DDIR_WAIT path: sleep until the recorded delay has elapsed. */
228 elapsed = mtime_since_genesis();
229 if (ipo->delay > elapsed)
230 usec_sleep(td, (ipo->delay - elapsed) * 1000);
235 if (io_u->ddir != DDIR_WAIT)
/*
 * Free all remembered io_pieces: drain the rb-tree used for verify
 * history, then the plain history list, removing each entry from the
 * trim list as it goes.
 */
243 void prune_io_piece_log(struct thread_data *td)
245 struct io_piece *ipo;
246 struct fio_rb_node *n;
248 while ((n = rb_first(&td->io_hist_tree)) != NULL) {
249 ipo = rb_entry(n, struct io_piece, rb_node);
250 rb_erase(n, &td->io_hist_tree);
251 remove_trim_entry(td, ipo);
256 while (!flist_empty(&td->io_hist_list)) {
257 ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
258 flist_del(&ipo->list);
259 remove_trim_entry(td, ipo);
266 * log a successful write, so we can unwind the log for verify
/*
 * Remember a completed write as an io_piece so verification can replay it.
 * With a random map, duplicates are impossible, so a plain FIFO list
 * suffices; otherwise the piece is inserted into an rb-tree sorted by
 * (file, offset), and any overlapping older entry is evicted first.
 */
268 void log_io_piece(struct thread_data *td, struct io_u *io_u)
270 struct fio_rb_node **p, *parent;
271 struct io_piece *ipo, *__ipo;
273 ipo = calloc(1, sizeof(struct io_piece));
275 ipo->file = io_u->file;
276 ipo->offset = io_u->offset;
277 ipo->len = io_u->buflen;
278 ipo->numberio = io_u->numberio;
279 ipo->flags = IP_F_IN_FLIGHT;
/* Track trims separately so they can be matched up later. */
283 if (io_u_should_trim(td, io_u)) {
284 flist_add_tail(&ipo->trim_list, &td->trim_list);
289 * Only sort writes if we don't have a random map in which case we need
290 * to check for duplicate blocks and drop the old one, which we rely on
291 * the rb insert/lookup for handling.
293 if (file_randommap(td, ipo->file)) {
294 INIT_FLIST_HEAD(&ipo->list);
295 flist_add_tail(&ipo->list, &td->io_hist_list);
296 ipo->flags |= IP_F_ONLIST;
301 RB_CLEAR_NODE(&ipo->rb_node);
304 * Sort the entry into the verification list
307 p = &td->io_hist_tree.rb_node;
/* Walk the tree comparing (file, offset); detect range overlap. */
313 __ipo = rb_entry(parent, struct io_piece, rb_node);
314 if (ipo->file < __ipo->file)
316 else if (ipo->file > __ipo->file)
318 else if (ipo->offset < __ipo->offset) {
320 overlap = ipo->offset + ipo->len > __ipo->offset;
322 else if (ipo->offset > __ipo->offset) {
324 overlap = __ipo->offset + __ipo->len > ipo->offset;
/* Overlap: drop the older entry (freeing it only if not in flight). */
330 dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
331 __ipo->offset, __ipo->len,
332 ipo->offset, ipo->len);
334 rb_erase(parent, &td->io_hist_tree);
335 remove_trim_entry(td, __ipo);
336 if (!(__ipo->flags & IP_F_IN_FLIGHT))
342 rb_link_node(&ipo->rb_node, parent, p);
343 rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
344 ipo->flags |= IP_F_ONRB;
/*
 * Undo log_io_piece() for a failed I/O: mark the block-info state as a
 * trim/write failure when block tracking is enabled, then remove the
 * io_piece from whichever structure holds it (rb-tree or history list).
 */
348 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
350 struct io_piece *ipo = io_u->ipo;
352 if (td->ts.nr_block_infos) {
353 uint32_t *info = io_u_block_info(td, io_u);
354 if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
355 if (io_u->ddir == DDIR_TRIM)
356 *info = BLOCK_INFO_SET_STATE(*info,
357 BLOCK_STATE_TRIM_FAILURE);
358 else if (io_u->ddir == DDIR_WRITE)
359 *info = BLOCK_INFO_SET_STATE(*info,
360 BLOCK_STATE_WRITE_FAILURE);
/* The piece is on exactly one of the two containers; detach it. */
367 if (ipo->flags & IP_F_ONRB)
368 rb_erase(&ipo->rb_node, &td->io_hist_tree);
369 else if (ipo->flags & IP_F_ONLIST)
370 flist_del(&ipo->list);
/*
 * Shrink a logged io_piece to the number of bytes actually transferred
 * (xfer_buflen minus the residual), e.g. after a short transfer.
 */
377 void trim_io_piece(const struct io_u *io_u)
379 struct io_piece *ipo = io_u->ipo;
384 ipo->len = io_u->xfer_buflen - io_u->resid;
/*
 * Close down the write-iolog for this thread; the visible line clears the
 * stdio buffer pointer (flush/fclose/free presumably happen on the lines
 * not visible here — TODO confirm against the full source).
 */
387 void write_iolog_close(struct thread_data *td)
396 td->iolog_buf = NULL;
/*
 * For chunked iolog reading: estimate how many entries to fetch next so
 * that roughly one second's worth of replay is queued. Rate is derived
 * from how fast the queue drained since the last high-water mark
 * (entries / elapsed ns, projected to 1s). Updates the high-water mark,
 * the refill checkmark (half of highmark, rounded up) and its timestamp.
 */
399 int64_t iolog_items_to_fetch(struct thread_data *td)
404 int64_t items_to_fetch;
406 if (!td->io_log_highmark)
410 fio_gettime(&now, NULL);
411 elapsed = ntime_since(&td->io_log_highmark_time, &now);
413 for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
414 items_to_fetch = for_1s - td->io_log_current;
415 if (items_to_fetch < 0)
420 td->io_log_highmark = td->io_log_current + items_to_fetch;
421 td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
422 fio_gettime(&td->io_log_highmark_time, NULL);
424 return items_to_fetch;
/*
 * sscanf field-count checks: a data-I/O line parses to 5 fields in a v3
 * log (leading timestamp) vs 4 in v2; a file-action line to 3 vs 2.
 * NOTE(review): the macros take _r but test (r) — they rely on a caller
 * variable named 'r' being in scope; confirm against the full source.
 */
427 #define io_act(_td, _r) (((_td)->io_log_version == 3 && (r) == 5) || \
428 ((_td)->io_log_version == 2 && (r) == 4))
429 #define file_act(_td, _r) (((_td)->io_log_version == 3 && (r) == 3) || \
430 ((_td)->io_log_version == 2 && (r) == 2))
433 * Read version 2 and 3 iolog data. It is enhanced to include per-file logging,
/*
 * Parse the opened iolog (v2 or v3) line by line into io_pieces queued for
 * replay. v3 lines carry a leading timestamp that is converted to an
 * inter-entry delay; "wait" actions are rejected for v3. File-action
 * lines (add/open/close) become special entries; data lines become
 * read/write/trim/sync pieces. In chunked mode only items_to_fetch
 * entries are read per call. Finally derives td_ddir from what was seen.
 * Fragmentary view: several branches and the return paths are not visible.
 */
436 static bool read_iolog(struct thread_data *td)
438 unsigned long long offset;
440 unsigned long long delay = 0;
441 int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
442 char *rfname, *fname, *act;
445 bool realloc = false;
446 int64_t items_to_fetch = 0;
449 if (td->o.read_iolog_chunked) {
450 items_to_fetch = iolog_items_to_fetch(td);
456 * Read in the read iolog and store it, reuse the infrastructure
457 * for doing verifications.
460 rfname = fname = malloc(256+16);
461 act = malloc(256+16);
463 syncs = reads = writes = waits = 0;
464 while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
465 struct io_piece *ipo;
467 unsigned long long ttime;
/* v3: "<ttime> <fname> <act> <offset> <bytes>"; v2 has no timestamp. */
469 if (td->io_log_version == 3) {
470 r = sscanf(p, "%llu %256s %256s %llu %u", &ttime, rfname, act,
472 delay = delay_since_ttime(td, ttime);
473 td->io_log_last_ttime = ttime;
475 * "wait" is not allowed with version 3
477 if (!strcmp(act, "wait")) {
478 log_err("iolog: ignoring wait command with"
479 " version 3 for file %s\n", fname);
482 } else /* version 2 */
483 r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset, &bytes);
485 if (td->o.replay_redirect)
486 fname = td->o.replay_redirect;
/* Data-I/O line: map the action string to a ddir. */
492 if (!strcmp(act, "wait"))
494 else if (!strcmp(act, "read"))
496 else if (!strcmp(act, "write"))
498 else if (!strcmp(act, "sync"))
500 else if (!strcmp(act, "datasync"))
502 else if (!strcmp(act, "trim"))
505 log_err("fio: bad iolog file action: %s\n",
509 fileno = get_fileno(td, fname);
510 } else if (file_act(td, r)) {
512 if (!strcmp(act, "add")) {
/* replay_redirect funnels adds into one file; skip re-adds. */
513 if (td->o.replay_redirect &&
514 get_fileno(td, fname) != -1) {
515 dprint(FD_FILE, "iolog: ignoring"
516 " re-add of file %s\n", fname);
518 fileno = add_file(td, fname, td->subjob_number, 1);
519 file_action = FIO_LOG_ADD_FILE;
521 } else if (!strcmp(act, "open")) {
522 fileno = get_fileno(td, fname);
523 file_action = FIO_LOG_OPEN_FILE;
524 } else if (!strcmp(act, "close")) {
525 fileno = get_fileno(td, fname);
526 file_action = FIO_LOG_CLOSE_FILE;
528 log_err("fio: bad iolog file action: %s\n",
533 log_err("bad iolog%d: %s\n", td->io_log_version, p);
539 else if (rw == DDIR_WRITE) {
541 * Don't add a write for ro mode
546 } else if (rw == DDIR_WAIT) {
550 } else if (rw == DDIR_INVAL) {
551 } else if (ddir_sync(rw)) {
554 log_err("bad ddir: %d\n", rw);
/* Build the io_piece and queue it for replay. */
561 ipo = calloc(1, sizeof(*ipo));
564 if (td->io_log_version == 3)
566 if (rw == DDIR_WAIT) {
569 if (td->o.replay_scale)
570 ipo->offset = offset / td->o.replay_scale;
572 ipo->offset = offset;
573 ipo_bytes_align(td->o.replay_align, ipo);
/* Grow max_bs if the log contains bigger I/Os than configured. */
576 if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw]) {
578 td->o.max_bs[rw] = bytes;
580 ipo->fileno = fileno;
581 ipo->file_action = file_action;
585 queue_io_piece(td, ipo);
587 if (td->o.read_iolog_chunked) {
588 td->io_log_current++;
590 if (items_to_fetch == 0)
/* Post-parse bookkeeping: watermarks, read-only checks, ddir setup. */
599 if (td->o.read_iolog_chunked) {
600 td->io_log_highmark = td->io_log_current;
601 td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
602 fio_gettime(&td->io_log_highmark_time, NULL);
605 if (writes && read_only) {
606 log_err("fio: <%s> skips replay of %d writes due to"
607 " read-only\n", td->o.name, writes);
611 td->flags |= TD_F_SYNCS;
613 if (td->o.read_iolog_chunked) {
614 if (td->io_log_current == 0) {
617 td->o.td_ddir = TD_DDIR_RW;
618 if (realloc && td->orig_buffer)
622 init_io_u_buffers(td);
627 if (!reads && !writes && !waits)
629 else if (reads && !writes)
630 td->o.td_ddir = TD_DDIR_READ;
631 else if (!reads && writes)
632 td->o.td_ddir = TD_DDIR_WRITE;
634 td->o.td_ddir = TD_DDIR_RW;
/*
 * Return true if 'path' names a Unix-domain socket (per stat()); used to
 * decide whether the iolog source is a socket rather than a regular file.
 */
639 static bool is_socket(const char *path)
644 r = stat(path, &buf);
648 return S_ISSOCK(buf.st_mode);
/*
 * Open and connect an AF_UNIX stream socket to 'path', returning the fd
 * (error handling on the invisible lines). The path is length-checked
 * against sun_path before the copy.
 * NOTE(review): connect() is passed strlen(path) + sizeof(sun_family) as
 * the address length rather than sizeof(addr) — works on common systems,
 * but worth confirming against unix(7) recommendations.
 */
651 static int open_socket(const char *path)
653 struct sockaddr_un addr;
656 fd = socket(AF_UNIX, SOCK_STREAM, 0);
660 addr.sun_family = AF_UNIX;
661 if (snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path) >=
662 sizeof(addr.sun_path)) {
663 log_err("%s: path name %s is too long for a Unix socket\n",
667 ret = connect(fd, (const struct sockaddr *)&addr, strlen(path) + sizeof(addr.sun_family));
676 * open iolog, check version, and call appropriate parser
/*
 * Open the iolog source (Unix socket, stdin via "-", or regular file),
 * read the first line to detect the version string (v2/v3; v1 is
 * rejected), then hand off to read_iolog(). Stores the FILE* in
 * td->io_log_rfile and releases any pre-registered files first.
 */
678 static bool init_iolog_read(struct thread_data *td, char *fname)
680 char buffer[256], *p;
683 dprint(FD_IO, "iolog: name=%s\n", fname);
685 if (is_socket(fname)) {
688 fd = open_socket(fname);
691 } else if (!strcmp(fname, "-")) {
694 f = fopen(fname, "r");
697 perror("fopen read iolog");
701 p = fgets(buffer, sizeof(buffer), f);
703 td_verror(td, errno, "iolog read");
704 log_err("fio: unable to read iolog\n");
710 * versions 2 and 3 of the iolog store a specific string as the
711 * first line, check for that
713 if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
714 td->io_log_version = 2;
715 else if (!strncmp(iolog_ver3, buffer, strlen(iolog_ver3)))
716 td->io_log_version = 3;
718 log_err("fio: iolog version 1 is no longer supported\n");
723 free_release_files(td);
724 td->io_log_rfile = f;
725 return read_iolog(td);
729 * Set up a log for storing io patterns.
/*
 * Create/append the write-iolog file: open it, give it a private 8 KiB
 * stdio buffer, record the log start time, write the v3 version header,
 * and log an "add" entry for every file already known to the job.
 */
731 static bool init_iolog_write(struct thread_data *td)
737 f = fopen(td->o.write_iolog_file, "a");
739 perror("fopen write iolog");
744 * That's it for writing, setup a log buffer and we're done.
747 td->iolog_buf = malloc(8192);
748 setvbuf(f, td->iolog_buf, _IOFBF, 8192);
749 fio_gettime(&td->io_log_start_time, NULL);
752 * write our version line
754 if (fprintf(f, "%s\n", iolog_ver3) < 0) {
755 perror("iolog init\n");
760 * add all known files
762 for_each_file(td, ff, i)
763 log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * Top-level iolog initialization: for read replay, detect whether the
 * named file is a blktrace capture or a text iolog and dispatch to the
 * matching reader; for write mode, set up the output log. On failure,
 * records EINVAL via td_verror.
 */
768 bool init_iolog(struct thread_data *td)
772 if (td->o.read_iolog_file) {
774 char * fname = get_name_by_idx(td->o.read_iolog_file, td->subjob_number);
777 * Check if it's a blktrace file and load that if possible.
778 * Otherwise assume it's a normal log file and load that.
780 if (is_blktrace(fname, &need_swap)) {
781 td->io_log_blktrace = 1;
782 ret = init_blktrace_read(td, fname, need_swap);
784 td->io_log_blktrace = 0;
785 ret = init_iolog_read(td, fname);
788 } else if (td->o.write_iolog_file)
789 ret = init_iolog_write(td);
794 td_verror(td, EINVAL, "failed initializing iolog");
/*
 * Allocate and configure an io_log from the given parameters: copies the
 * log type/offset/prio/compression/averaging settings, duplicates the
 * filename, seeds per-direction histogram windows with zeroed io_u_plat
 * entries, pre-sizes the pending-sample buffer for non-offload submit
 * mode, sets ddir mask bits for offset/prio samples, and initializes
 * compression state (pshared mutexes + TD_F_COMPRESS_LOG) when gzip
 * logging is requested.
 */
799 void setup_log(struct io_log **log, struct log_params *p,
800 const char *filename)
804 struct io_u_plat_entry *entry;
805 struct flist_head *list;
807 l = scalloc(1, sizeof(*l));
808 INIT_FLIST_HEAD(&l->io_logs);
809 l->log_type = p->log_type;
810 l->log_offset = p->log_offset;
811 l->log_prio = p->log_prio;
812 l->log_gz = p->log_gz;
813 l->log_gz_store = p->log_gz_store;
814 l->avg_msec = p->avg_msec;
815 l->hist_msec = p->hist_msec;
816 l->hist_coarseness = p->hist_coarseness;
817 l->filename = strdup(filename);
820 /* Initialize histogram lists for each r/w direction,
821 * with initial io_u_plat of all zeros:
823 for (i = 0; i < DDIR_RWDIR_CNT; i++) {
824 list = &l->hist_window[i].list;
825 INIT_FLIST_HEAD(list);
826 entry = calloc(1, sizeof(struct io_u_plat_entry));
827 flist_add(&entry->list, list);
/* Size the pending buffer to at least iodepth (rounded to a pow2). */
830 if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
831 unsigned int def_samples = DEF_LOG_ENTRIES;
834 __p = calloc(1, sizeof(*l->pending));
835 if (l->td->o.iodepth > DEF_LOG_ENTRIES)
836 def_samples = roundup_pow2(l->td->o.iodepth);
837 __p->max_samples = def_samples;
838 __p->log = calloc(__p->max_samples, log_entry_sz(l));
843 l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
845 l->log_ddir_mask |= LOG_PRIO_SAMPLE_BIT;
847 INIT_FLIST_HEAD(&l->chunk_list);
849 if (l->log_gz && !p->td)
851 else if (l->log_gz || l->log_gz_store) {
852 mutex_init_pshared(&l->chunk_lock);
853 mutex_init_pshared(&l->deferred_free_lock);
854 p->td->flags |= TD_F_COMPRESS_LOG;
/*
 * Optional large (1 MiB) stdio buffering for log output: when
 * CONFIG_SETVBUF is available, set_file_buffer() attaches a heap buffer
 * via setvbuf and clear_file_buffer() frees it; otherwise both are
 * no-op stubs (bodies largely not visible in this view).
 */
860 #ifdef CONFIG_SETVBUF
861 static void *set_file_buffer(FILE *f)
863 size_t size = 1048576;
867 setvbuf(f, buf, _IOFBF, size);
871 static void clear_file_buffer(void *buf)
876 static void *set_file_buffer(FILE *f)
881 static void clear_file_buffer(void *buf)
/*
 * Release an io_log: free every queued io_logs sample buffer, the pending
 * buffer, and (on invisible lines) the remaining log fields.
 */
886 void free_log(struct io_log *log)
888 while (!flist_empty(&log->io_logs)) {
889 struct io_logs *cur_log;
891 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
892 flist_del_init(&cur_log->list);
898 free(log->pending->log);
/*
 * Sum 'stride' consecutive histogram buckets starting at index j.
 * If a previous snapshot (io_u_plat_last) is given, sum the per-bucket
 * deltas instead, yielding the counts accumulated since that snapshot.
 */
908 uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
909 uint64_t *io_u_plat_last)
914 if (io_u_plat_last) {
915 for (k = sum = 0; k < stride; k++)
916 sum += io_u_plat[j + k] - io_u_plat_last[j + k];
918 for (k = sum = 0; k < stride; k++)
919 sum += io_u_plat[j + k];
/*
 * Write latency-histogram samples to 'f' as CSV: per sample a line of
 * "time, ddir, bs" followed by bucket sums coarsened by 2^hist_coarseness.
 * Each sample's counts are emitted as deltas against the previous
 * snapshot in its list, and the consumed predecessor entry is freed
 * (flist_del on the invisible free path).
 */
925 static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
926 uint64_t sample_size)
930 uint64_t i, j, nr_samples;
931 struct io_u_plat_entry *entry, *entry_before;
933 uint64_t *io_u_plat_before;
935 int stride = 1 << hist_coarseness;
/* Probe the first sample for the offset bit to get the entry size. */
940 s = __get_sample(samples, 0, 0);
941 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
943 nr_samples = sample_size / __log_entry_sz(log_offset);
945 for (i = 0; i < nr_samples; i++) {
946 s = __get_sample(samples, log_offset, i);
948 entry = s->data.plat_entry;
949 io_u_plat = entry->io_u_plat;
951 entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
952 io_u_plat_before = entry_before->io_u_plat;
954 fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
955 io_sample_ddir(s), (unsigned long long) s->bs);
956 for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
957 fprintf(f, "%llu, ", (unsigned long long)
958 hist_sum(j, stride, io_u_plat, io_u_plat_before));
/* Last bucket group ends the line (no trailing comma). */
960 fprintf(f, "%llu\n", (unsigned long long)
961 hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
964 flist_del(&entry_before->list);
/*
 * Write generic log samples to 'f' as CSV. The first sample's ddir flag
 * bits select the record layout (with/without offset, with/without prio)
 * and the matching printf format; priority is printed either as the raw
 * 0x%04x value or collapsed to an RT-class boolean depending on log_prio.
 */
969 void flush_samples(FILE *f, void *samples, uint64_t sample_size)
972 int log_offset, log_prio;
973 uint64_t i, nr_samples;
974 unsigned int prio_val;
980 s = __get_sample(samples, 0, 0);
981 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
982 log_prio = (s->__ddir & LOG_PRIO_SAMPLE_BIT) != 0;
/* Pick the CSV format string: {offset?} x {prio-as-hex?}. */
986 fmt = "%lu, %" PRId64 ", %u, %llu, %llu, 0x%04x\n";
988 fmt = "%lu, %" PRId64 ", %u, %llu, %llu, %u\n";
991 fmt = "%lu, %" PRId64 ", %u, %llu, 0x%04x\n";
993 fmt = "%lu, %" PRId64 ", %u, %llu, %u\n";
996 nr_samples = sample_size / __log_entry_sz(log_offset);
998 for (i = 0; i < nr_samples; i++) {
999 s = __get_sample(samples, log_offset, i);
1002 prio_val = s->priority;
1004 prio_val = ioprio_value_is_class_rt(s->priority);
1008 (unsigned long) s->time,
1010 io_sample_ddir(s), (unsigned long long) s->bs,
1013 struct io_sample_offset *so = (void *) s;
1016 (unsigned long) s->time,
1018 io_sample_ddir(s), (unsigned long long) s->bs,
1019 (unsigned long long) so->offset,
/*
 * Work item handed to the compression workqueue: the samples to compress
 * plus their count. GZ_CHUNK is the output chunk size (128 KiB).
 * get_new_chunk()/free_chunk() manage one compressed output buffer each.
 */
1027 struct iolog_flush_data {
1028 struct workqueue_work work;
1031 uint32_t nr_samples;
1035 #define GZ_CHUNK 131072
1037 static struct iolog_compress *get_new_chunk(unsigned int seq)
1039 struct iolog_compress *c;
1041 c = malloc(sizeof(*c));
1042 INIT_FLIST_HEAD(&c->list);
1043 c->buf = malloc(GZ_CHUNK);
1049 static void free_chunk(struct iolog_compress *ic)
/*
 * Zero and initialize a zlib inflate stream. Per the comment, window bits
 * get +32 added (on an invisible line) when gz_hdr is set, enabling
 * zlib/gzip header auto-detection for stored log files.
 */
1055 static int z_stream_init(z_stream *stream, int gz_hdr)
1059 memset(stream, 0, sizeof(*stream));
1060 stream->zalloc = Z_NULL;
1061 stream->zfree = Z_NULL;
1062 stream->opaque = Z_NULL;
1063 stream->next_in = Z_NULL;
1066 * zlib magic - add 32 for auto-detection of gz header or not,
1067 * if we decide to store files in a gzip friendly format.
1072 if (inflateInit2(stream, wbits) != Z_OK)
/*
 * inflate_chunk_iter carries decompression state across chunks (seq,
 * growable output buffer). finish_chunk() ends the current inflate
 * stream, flushes the decompressed samples to 'f', and resets the
 * iterator's buffer accounting for the next sequence.
 */
1078 struct inflate_chunk_iter {
1087 static void finish_chunk(z_stream *stream, FILE *f,
1088 struct inflate_chunk_iter *iter)
1092 ret = inflateEnd(stream);
1094 log_err("fio: failed to end log inflation seq %d (%d)\n",
1097 flush_samples(f, iter->buf, iter->buf_used);
1100 iter->buf_size = iter->buf_used = 0;
1104 * Iterative chunk inflation. Handles cases where we cross into a new
1105 * sequence, doing flush finish of previous chunk if needed.
/*
 * Inflate one compressed chunk into iter->buf, growing the buffer by
 * chunk_sz whenever zlib runs out of output space. A sequence-number
 * change means a new logical stream: the previous one is finished and a
 * fresh z_stream is initialized. Returns the number of input bytes
 * consumed (next_in - ic->buf).
 */
1107 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
1108 z_stream *stream, struct inflate_chunk_iter *iter)
1112 dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
1113 (unsigned long) ic->len, ic->seq);
1115 if (ic->seq != iter->seq) {
1117 finish_chunk(stream, f, iter);
1119 z_stream_init(stream, gz_hdr);
1120 iter->seq = ic->seq;
1123 stream->avail_in = ic->len;
1124 stream->next_in = ic->buf;
1126 if (!iter->buf_size) {
1127 iter->buf_size = iter->chunk_sz;
1128 iter->buf = malloc(iter->buf_size);
1131 while (stream->avail_in) {
1132 size_t this_out = iter->buf_size - iter->buf_used;
1135 stream->avail_out = this_out;
1136 stream->next_out = iter->buf + iter->buf_used;
1138 err = inflate(stream, Z_NO_FLUSH);
1140 log_err("fio: failed inflating log: %d\n", err);
1145 iter->buf_used += this_out - stream->avail_out;
/* Output buffer exhausted: grow it and keep inflating. */
1147 if (!stream->avail_out) {
1148 iter->buf_size += iter->chunk_sz;
1149 iter->buf = realloc(iter->buf, iter->buf_size);
1153 if (err == Z_STREAM_END)
1157 ret = (void *) stream->next_in - ic->buf;
1159 dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
1165 * Inflate stored compressed chunks, or write them directly to the log
1166 * file if so instructed.
/*
 * Drain the log's chunk_list: with log_gz_store the compressed bytes are
 * written verbatim to 'f' (a .fz file); otherwise each chunk is inflated
 * and its samples flushed. Any in-progress stream is finished at the end.
 */
1168 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1170 struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
1173 while (!flist_empty(&log->chunk_list)) {
1174 struct iolog_compress *ic;
1176 ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
1177 flist_del(&ic->list);
1179 if (log->log_gz_store) {
1182 dprint(FD_COMPRESS, "log write chunk size=%lu, "
1183 "seq=%u\n", (unsigned long) ic->len, ic->seq);
1185 ret = fwrite(ic->buf, ic->len, 1, f);
1186 if (ret != 1 || ferror(f)) {
1188 log_err("fio: error writing compressed log\n");
1191 inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
1197 finish_chunk(&stream, f, &iter);
1205 * Open compressed log file and decompress the stored chunks and
1206 * write them to stdout. The chunks are stored sequentially in the
1207 * file, so we iterate over them and do them one-by-one.
/*
 * CLI helper: slurp a compressed log file whole (sized via stat), then
 * repeatedly inflate_chunk() to stdout, bumping the sequence number per
 * Z_STREAM_END until all compressed input is consumed.
 */
1209 int iolog_file_inflate(const char *file)
1211 struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
1212 struct iolog_compress ic;
1220 f = fopen(file, "r");
1226 if (stat(file, &sb) < 0) {
1232 ic.buf = buf = malloc(sb.st_size);
1233 ic.len = sb.st_size;
1236 ret = fread(ic.buf, ic.len, 1, f);
1237 if (ret == 0 && ferror(f)) {
1242 } else if (ferror(f) || (!feof(f) && ret != 1)) {
1243 log_err("fio: short read on reading log\n");
1252 * Each chunk will return Z_STREAM_END. We don't know how many
1253 * chunks are in the file, so we just keep looping and incrementing
1254 * the sequence number until we have consumed the whole compressed
1261 iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
1274 finish_chunk(&stream, stdout, &iter);
/* Stubs used when fio is built without zlib: decompression is refused. */
1284 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1289 int iolog_file_inflate(const char *file)
1291 log_err("fio: log inflation not possible without zlib\n");
/*
 * Write an io_log to its file ("w" to truncate, "a" when do_append):
 * sets up an optional large stdio buffer, writes out any compressed
 * chunks, then drains io_logs — histogram logs (clat_hist_log) go through
 * flush_hist_samples(), everything else through flush_samples().
 */
1297 void flush_log(struct io_log *log, bool do_append)
1303 f = fopen(log->filename, "w");
1305 f = fopen(log->filename, "a");
1307 perror("fopen log");
1311 buf = set_file_buffer(f);
1313 inflate_gz_chunks(log, f);
1315 while (!flist_empty(&log->io_logs)) {
1316 struct io_logs *cur_log;
1318 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1319 flist_del_init(&cur_log->list);
1321 if (log->td && log == log->td->clat_hist_log)
1322 flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1323 log_sample_sz(log, cur_log));
1325 flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1331 clear_file_buffer(buf);
/*
 * Finalize one io_log under the cross-process file lock (trylock when
 * 'trylock' is set, so a busy log can be retried later). Compressed logs
 * are flushed first; in client/server mode the log is shipped over the
 * wire via fio_send_iolog(), otherwise written locally with flush_log().
 */
1334 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1336 if (td->flags & TD_F_COMPRESS_LOG)
1340 if (fio_trylock_file(log->filename))
1343 fio_lock_file(log->filename);
1345 if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1346 fio_send_iolog(td, log, log->filename);
1348 flush_log(log, !td->o.per_job_logs);
1350 fio_unlock_file(log->filename);
/*
 * Return the total byte size of all queued compressed chunks, walking
 * chunk_list under chunk_lock (the per-chunk accumulation line is not
 * visible here).
 */
1355 size_t log_chunk_sizes(struct io_log *log)
1357 struct flist_head *entry;
1360 if (flist_empty(&log->chunk_list))
1364 pthread_mutex_lock(&log->chunk_lock);
1365 flist_for_each(entry, &log->chunk_list) {
1366 struct iolog_compress *c;
1368 c = flist_entry(entry, struct iolog_compress, list);
1371 pthread_mutex_unlock(&log->chunk_lock);
/*
 * Queue 'ptr' for deferred freeing (it may still be referenced by the
 * flushing side). Holds deferred_free_lock; if the fixed-size deferred
 * array is full, the entry is dropped with a one-time warning.
 */
1377 static void iolog_put_deferred(struct io_log *log, void *ptr)
1382 pthread_mutex_lock(&log->deferred_free_lock);
1383 if (log->deferred < IOLOG_MAX_DEFER) {
1384 log->deferred_items[log->deferred] = ptr;
1386 } else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
1387 log_err("fio: had to drop log entry free\n");
1388 pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Free everything queued by iolog_put_deferred(), under the same lock,
 * clearing the slots as it goes.
 */
1391 static void iolog_free_deferred(struct io_log *log)
1398 pthread_mutex_lock(&log->deferred_free_lock);
1400 for (i = 0; i < log->deferred; i++) {
1401 free(log->deferred_items[i]);
1402 log->deferred_items[i] = NULL;
1406 pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Compress one batch of log samples with zlib deflate into a local list
 * of GZ_CHUNK-sized chunks, then splice the list onto the log's
 * chunk_list under chunk_lock. The sample buffer is handed to the
 * deferred-free machinery rather than freed directly. The trailing
 * cleanup loop (visible at the end) frees chunks on the error path.
 */
1409 static int gz_work(struct iolog_flush_data *data)
1411 struct iolog_compress *c = NULL;
1412 struct flist_head list;
1418 INIT_FLIST_HEAD(&list);
1420 memset(&stream, 0, sizeof(stream));
1421 stream.zalloc = Z_NULL;
1422 stream.zfree = Z_NULL;
1423 stream.opaque = Z_NULL;
1425 ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1427 log_err("fio: failed to init gz stream\n");
/* Each flush gets a fresh sequence number for chunk grouping. */
1431 seq = ++data->log->chunk_seq;
1433 stream.next_in = (void *) data->samples;
1434 stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1436 dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
1437 (unsigned long) stream.avail_in, seq,
1438 data->log->filename);
/* Main deflate loop: one new chunk per iteration until input drained. */
1441 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1442 (unsigned long) c->len);
1443 c = get_new_chunk(seq);
1444 stream.avail_out = GZ_CHUNK;
1445 stream.next_out = c->buf;
1446 ret = deflate(&stream, Z_NO_FLUSH);
1448 log_err("fio: deflate log (%d)\n", ret);
1453 c->len = GZ_CHUNK - stream.avail_out;
1454 flist_add_tail(&c->list, &list);
1456 } while (stream.avail_in);
/* Finish the stream into the remaining space of the last chunk. */
1458 stream.next_out = c->buf + c->len;
1459 stream.avail_out = GZ_CHUNK - c->len;
1461 ret = deflate(&stream, Z_FINISH);
1464 * Z_BUF_ERROR is special, it just means we need more
1465 * output space. We'll handle that below. Treat any other
1468 if (ret != Z_BUF_ERROR) {
1469 log_err("fio: deflate log (%d)\n", ret);
1470 flist_del(&c->list);
1477 c->len = GZ_CHUNK - stream.avail_out;
1479 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
/* Keep allocating chunks until Z_FINISH reports stream end. */
1481 if (ret != Z_STREAM_END) {
1483 c = get_new_chunk(seq);
1484 stream.avail_out = GZ_CHUNK;
1485 stream.next_out = c->buf;
1486 ret = deflate(&stream, Z_FINISH);
1487 c->len = GZ_CHUNK - stream.avail_out;
1489 flist_add_tail(&c->list, &list);
1490 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1491 (unsigned long) c->len);
1492 } while (ret != Z_STREAM_END);
1495 dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1497 ret = deflateEnd(&stream);
1499 log_err("fio: deflateEnd %d\n", ret);
1501 iolog_put_deferred(data->log, data->samples);
1503 if (!flist_empty(&list)) {
1504 pthread_mutex_lock(&data->log->chunk_lock);
1505 flist_splice_tail(&list, &data->log->chunk_list);
1506 pthread_mutex_unlock(&data->log->chunk_lock);
/* Error path: discard any chunks built before the failure. */
1515 while (!flist_empty(&list)) {
1516 c = flist_first_entry(list.next, struct iolog_compress, list);
1517 flist_del(&c->list);
1525 * Invoked from our compress helper thread, when logging would have exceeded
1526 * the specified memory limitation. Compresses the previously stored
/* Workqueue adapter: recover the iolog_flush_data from the work item. */
1529 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1531 return gz_work(container_of(work, struct iolog_flush_data, work));
/*
 * Per-worker init: pin the compression worker to log_gz_cpumask when the
 * option was explicitly set; affinity failure is reported but the exact
 * return handling is on lines not visible here.
 */
1534 static int gz_init_worker(struct submit_worker *sw)
1536 struct thread_data *td = sw->wq->td;
1538 if (!fio_option_is_set(&td->o, log_gz_cpumask))
1541 if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1542 log_err("gz: failed to set CPU affinity\n");
/* Workqueue ops for log compression, plus single-worker setup/teardown
 * gated on TD_F_COMPRESS_LOG. */
1549 static struct workqueue_ops log_compress_wq_ops = {
1550 .fn = gz_work_async,
1551 .init_worker_fn = gz_init_worker,
1555 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1557 if (!(td->flags & TD_F_COMPRESS_LOG))
1560 workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
1564 void iolog_compress_exit(struct thread_data *td)
1566 if (!(td->flags & TD_F_COMPRESS_LOG))
1569 workqueue_exit(&td->log_compress_wq);
1573 * Queue work item to compress the existing log entries. We reset the
1574 * current log to a small size, and reference the existing log in the
1575 * data that we queue for compression. Once compression has been done,
1576 * this old log is freed. If called with finish == true, will not return
1577 * until the log compression has completed, and will flush all previous
/*
 * iolog_flush(): synchronously compress every queued io_logs buffer
 * via gz_work(). iolog_cur_flush(): hand one io_logs buffer to the
 * compression workqueue asynchronously (the buffer is detached from
 * cur_log first), then reap previously deferred frees.
 */
1580 static int iolog_flush(struct io_log *log)
1582 struct iolog_flush_data *data;
1584 data = malloc(sizeof(*data));
1591 while (!flist_empty(&log->io_logs)) {
1592 struct io_logs *cur_log;
1594 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1595 flist_del_init(&cur_log->list);
1597 data->samples = cur_log->log;
1598 data->nr_samples = cur_log->nr_samples;
1609 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1611 struct iolog_flush_data *data;
/* smalloc: shared memory, since the workqueue worker frees/uses it. */
1613 data = smalloc(sizeof(*data));
1619 data->samples = cur_log->log;
1620 data->nr_samples = cur_log->nr_samples;
1623 cur_log->nr_samples = cur_log->max_samples = 0;
1624 cur_log->log = NULL;
1626 workqueue_enqueue(&log->td->log_compress_wq, &data->work);
1628 iolog_free_deferred(log);
/* No-zlib stubs for the flush/compress entry points. */
1634 static int iolog_flush(struct io_log *log)
1639 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1644 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1649 void iolog_compress_exit(struct thread_data *td)
/* Return the newest (last) io_logs buffer, or fall through when empty. */
1655 struct io_logs *iolog_cur_log(struct io_log *log)
1657 if (flist_empty(&log->io_logs))
1660 return flist_last_entry(&log->io_logs, struct io_logs, list);
/* Total sample count across all of the log's io_logs buffers. */
1663 uint64_t iolog_nr_samples(struct io_log *iolog)
1665 struct flist_head *entry;
1668 flist_for_each(entry, &iolog->io_logs) {
1669 struct io_logs *cur_log;
1671 cur_log = flist_entry(entry, struct io_logs, list);
1672 ret += cur_log->nr_samples;
/*
 * Per-log-type writers: each checks its log exists (and, for bw/iops,
 * that per-unit-time mode matches 'unit_log'), finishes the log, and
 * clears the td pointer on success. The log_types table maps mask bits
 * to these writers for td_writeout_logs() below.
 */
1678 static int __write_log(struct thread_data *td, struct io_log *log, int try)
1681 return finish_log(td, log, try);
1686 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1690 if (per_unit_log(td->iops_log) != unit_log)
1693 ret = __write_log(td, td->iops_log, try);
1695 td->iops_log = NULL;
1700 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1707 ret = __write_log(td, td->slat_log, try);
1709 td->slat_log = NULL;
1714 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1721 ret = __write_log(td, td->clat_log, try);
1723 td->clat_log = NULL;
1728 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1735 ret = __write_log(td, td->clat_hist_log, try);
1737 td->clat_hist_log = NULL;
1742 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1749 ret = __write_log(td, td->lat_log, try);
1756 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1760 if (per_unit_log(td->bw_log) != unit_log)
1763 ret = __write_log(td, td->bw_log, try);
1776 CLAT_HIST_LOG_MASK = 32,
1783 int (*fn)(struct thread_data *, int, bool);
1786 static struct log_type log_types[] = {
1788 .mask = BW_LOG_MASK,
1789 .fn = write_bandw_log,
1792 .mask = LAT_LOG_MASK,
1793 .fn = write_lat_log,
1796 .mask = SLAT_LOG_MASK,
1797 .fn = write_slat_log,
1800 .mask = CLAT_LOG_MASK,
1801 .fn = write_clat_log,
1804 .mask = IOPS_LOG_MASK,
1805 .fn = write_iops_log,
1808 .mask = CLAT_HIST_LOG_MASK,
1809 .fn = write_clat_hist_log,
/*
 * Write out all of a thread's logs. Runs in TD_FINISHING state, then
 * repeatedly sweeps the log_types table: each writer is attempted with
 * trylock semantics until only one log remains (log_left == 1 forces a
 * blocking lock), and a sweep that makes no progress ends the loop.
 * The original runstate is restored on exit.
 */
1813 void td_writeout_logs(struct thread_data *td, bool unit_logs)
1815 unsigned int log_mask = 0;
1816 unsigned int log_left = ALL_LOG_NR;
1819 old_state = td_bump_runstate(td, TD_FINISHING);
1821 finalize_logs(td, unit_logs);
1824 int prev_log_left = log_left;
1826 for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1827 struct log_type *lt = &log_types[i];
1830 if (!(log_mask & lt->mask)) {
1831 ret = lt->fn(td, log_left != 1, unit_logs);
1834 log_mask |= lt->mask;
/* No log completed this sweep: stop retrying. */
1839 if (prev_log_left == log_left)
1843 td_restore_runstate(td, old_state);
1846 void fio_writeout_logs(bool unit_logs)
1848 struct thread_data *td;
1852 td_writeout_logs(td, unit_logs);