2 * Code related to writing an iolog of what a thread is doing, and to
3 * later read that back and replay
22 #include "lib/roundup.h"
24 #include <netinet/in.h>
25 #include <netinet/tcp.h>
26 #include <arpa/inet.h>
28 #include <sys/socket.h>
/* Forward declaration: flushes queued samples of an io_log (defined later). */
31 static int iolog_flush(struct io_log *log);
/* Magic first-line strings identifying version 2 and version 3 replay logs. */
33 static const char iolog_ver2[] = "fio version 2 iolog";
34 static const char iolog_ver3[] = "fio version 3 iolog";
/*
 * Append an io_piece to the tail of the thread's replay queue and account
 * its length toward the total I/O size expected for the job.
 */
36 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
38 flist_add_tail(&ipo->list, &td->io_log_list);
39 td->total_io_size += ipo->len;
/*
 * Record one completed io_u in the write iolog: time since log start (usec),
 * file name, direction name, offset, and a fifth %llu field whose argument
 * line is outside this fragment (presumably the buffer length — TODO confirm).
 * No-op unless the job was configured with write_iolog_file.
 */
42 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
46 if (!td->o.write_iolog_file)
49 fio_gettime(&now, NULL);
50 fprintf(td->iolog_f, "%llu %s %s %llu %llu\n",
51 (unsigned long long) utime_since_now(&td->io_log_start_time),
52 io_u->file->file_name, io_ddir_name(io_u->ddir), io_u->offset,
/*
 * Record a file-level event ("add"/"open"/"close", indexed by 'what') in the
 * write iolog, with a timestamp relative to log start. Skipped when no
 * write_iolog_file is configured, and (per the comment below) for the
 * pre-open/close that happens before the job starts.
 */
57 void log_file(struct thread_data *td, struct fio_file *f,
58 enum file_log_act what)
60 const char *act[] = { "add", "open", "close" };
65 if (!td->o.write_iolog_file)
70 * this happens on the pre-open/close done before the job starts
75 fio_gettime(&now, NULL);
76 fprintf(td->iolog_f, "%llu %s %s\n",
77 (unsigned long long) utime_since_now(&td->io_log_start_time),
78 f->file_name, act[what]);
/*
 * Sleep for 'delay' usec of replay stall time, compensating for time already
 * spent since the last issue and for previous oversleep (td->time_offset).
 * Sleeps are chunked (<= 500000 usec per step, per the visible check) and
 * interruptible by td->terminate; any oversleep beyond the requested delay
 * is carried forward in td->time_offset for the next call.
 */
81 static void iolog_delay(struct thread_data *td, unsigned long delay)
83 uint64_t usec = utime_since_now(&td->last_issue);
84 unsigned long orig_delay = delay;
/* Previous calls overslept by time_offset; deduct it from this delay. */
88 if (delay < td->time_offset) {
93 delay -= td->time_offset;
99 fio_gettime(&ts, NULL);
100 while (delay && !td->terminate) {
102 if (this_delay > 500000)
105 usec_sleep(td, this_delay);
/* Measure how long we actually slept; remember any overshoot. */
109 usec = utime_since_now(&ts);
110 if (usec > orig_delay)
111 td->time_offset = usec - orig_delay;
/*
 * Handle a "special" (non-I/O) replay entry: file add/open/close/unlink
 * actions, encoded as ddir == DDIR_INVAL. Returns early for normal I/O
 * entries (return value handling is outside this fragment — the caller in
 * read_iolog_get distinguishes <0, 0 and >0 results).
 */
116 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
124 if (ipo->ddir != DDIR_INVAL)
127 f = td->files[ipo->fileno];
/* Honor any recorded inter-command delay before acting on the file. */
130 iolog_delay(td, ipo->delay);
131 if (fio_fill_issue_time(td))
132 fio_gettime(&td->last_issue, NULL);
133 switch (ipo->file_action) {
134 case FIO_LOG_OPEN_FILE:
/* With replay_redirect all entries map to one file; skip re-opens. */
135 if (td->o.replay_redirect && fio_file_open(f)) {
136 dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
140 ret = td_io_open_file(td, f);
143 td_verror(td, ret, "iolog open file");
145 case FIO_LOG_CLOSE_FILE:
146 td_io_close_file(td, f);
148 case FIO_LOG_UNLINK_FILE:
149 td_io_unlink_file(td, f);
151 case FIO_LOG_ADD_FILE:
157 log_err("fio: bad file action %d\n", ipo->file_action);
/* Forward declaration for the v2/v3 iolog parser used by chunked refills. */
164 static bool read_iolog(struct thread_data *td);
/*
 * Compute the replay delay for an entry stamped 'time', relative to the
 * previously seen timestamp (td->io_log_last_ttime), optionally rescaled by
 * replay_time_scale (percent; 100 means unscaled). Returns 0 (path outside
 * this fragment) when there is no previous stamp, stalls are disabled, or
 * time went backwards.
 */
166 unsigned long long delay_since_ttime(const struct thread_data *td,
167 unsigned long long time)
171 const unsigned long long *last_ttime = &td->io_log_last_ttime;
173 if (!*last_ttime || td->o.no_stall || time < *last_ttime)
175 else if (td->o.replay_time_scale == 100)
176 return time - *last_ttime;
/* scale > 100 shrinks delays, < 100 stretches them. */
179 scale = (double) 100.0 / (double) td->o.replay_time_scale;
180 tmp = time - *last_ttime;
/*
 * Pop the next replay entry off td->io_log_list and fill in 'io_u' from it.
 * In chunked mode, refills the list (via read_blktrace/read_iolog) when the
 * low-water checkmark is reached. Special (file action) entries are handled
 * by ipo_special() and consumed here; DDIR_WAIT entries sleep instead of
 * issuing I/O. Return-value lines are outside this fragment.
 */
184 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
186 struct io_piece *ipo;
187 unsigned long elapsed;
189 while (!flist_empty(&td->io_log_list)) {
/* Chunked mode: refill from the log source when the mark is hit. */
192 if (td->o.read_iolog_chunked) {
193 if (td->io_log_checkmark == td->io_log_current) {
194 if (td->io_log_blktrace) {
195 if (!read_blktrace(td))
202 td->io_log_current--;
204 ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
205 flist_del(&ipo->list);
206 remove_trim_entry(td, ipo);
/* File add/open/close entries are consumed without producing an io_u. */
208 ret = ipo_special(td, ipo);
212 } else if (ret > 0) {
217 io_u->ddir = ipo->ddir;
218 if (ipo->ddir != DDIR_WAIT) {
219 io_u->offset = ipo->offset;
220 io_u->verify_offset = ipo->offset;
221 io_u->buflen = ipo->len;
222 io_u->file = td->files[ipo->fileno];
223 get_file(io_u->file);
224 dprint(FD_IO, "iolog: get %llu/%llu/%s\n", io_u->offset,
225 io_u->buflen, io_u->file->file_name);
227 iolog_delay(td, ipo->delay);
/* DDIR_WAIT: ipo->delay is an absolute msec-since-genesis target here. */
229 elapsed = mtime_since_genesis();
230 if (ipo->delay > elapsed)
231 usec_sleep(td, (ipo->delay - elapsed) * 1000);
236 if (io_u->ddir != DDIR_WAIT)
/*
 * Drop every io_piece still held in the verification history: first drain
 * the rb-tree (sorted history), then the plain list, removing each entry
 * from the trim list as it goes. Freeing of entries happens on lines
 * outside this fragment.
 */
244 void prune_io_piece_log(struct thread_data *td)
246 struct io_piece *ipo;
247 struct fio_rb_node *n;
249 while ((n = rb_first(&td->io_hist_tree)) != NULL) {
250 ipo = rb_entry(n, struct io_piece, rb_node);
251 rb_erase(n, &td->io_hist_tree);
252 remove_trim_entry(td, ipo);
257 while (!flist_empty(&td->io_hist_list)) {
258 ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
259 flist_del(&ipo->list);
260 remove_trim_entry(td, ipo);
267 * log a successful write, so we can unwind the log for verify
/*
 * Allocate and store an io_piece describing 'io_u' for later verification.
 * With a random map present, duplicates cannot occur so a plain FIFO list
 * suffices; otherwise entries are inserted into an rb-tree sorted by
 * (file, offset), dropping older overlapping entries.
 */
269 void log_io_piece(struct thread_data *td, struct io_u *io_u)
271 struct fio_rb_node **p, *parent;
272 struct io_piece *ipo, *__ipo;
274 ipo = calloc(1, sizeof(struct io_piece));
276 ipo->file = io_u->file;
277 ipo->offset = io_u->offset;
278 ipo->len = io_u->buflen;
279 ipo->numberio = io_u->numberio;
280 ipo->flags = IP_F_IN_FLIGHT;
/* Track trim candidates separately so they can be replayed as trims. */
284 if (io_u_should_trim(td, io_u)) {
285 flist_add_tail(&ipo->trim_list, &td->trim_list);
290 * Only sort writes if we don't have a random map in which case we need
291 * to check for duplicate blocks and drop the old one, which we rely on
292 * the rb insert/lookup for handling.
294 if (file_randommap(td, ipo->file)) {
295 INIT_FLIST_HEAD(&ipo->list);
296 flist_add_tail(&ipo->list, &td->io_hist_list);
297 ipo->flags |= IP_F_ONLIST;
302 RB_CLEAR_NODE(&ipo->rb_node);
305 * Sort the entry into the verification list
308 p = &td->io_hist_tree.rb_node;
314 __ipo = rb_entry(parent, struct io_piece, rb_node);
/* Order by file pointer first, then by offset; detect range overlap. */
315 if (ipo->file < __ipo->file)
317 else if (ipo->file > __ipo->file)
319 else if (ipo->offset < __ipo->offset) {
321 overlap = ipo->offset + ipo->len > __ipo->offset;
323 else if (ipo->offset > __ipo->offset) {
325 overlap = __ipo->offset + __ipo->len > ipo->offset;
331 dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
332 __ipo->offset, __ipo->len,
333 ipo->offset, ipo->len);
/* Overlapping older entry: evict it; free only if not still in flight. */
335 rb_erase(parent, &td->io_hist_tree);
336 remove_trim_entry(td, __ipo);
337 if (!(__ipo->flags & IP_F_IN_FLIGHT))
343 rb_link_node(&ipo->rb_node, parent, p);
344 rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
345 ipo->flags |= IP_F_ONRB;
/*
 * Remove the io_piece attached to a failed io_u from the verification
 * history (rb-tree or list, whichever holds it). When block-info tracking
 * is active, also mark the block as a trim/write failure, but never
 * downgrade a state that is already at/after TRIM_FAILURE.
 */
349 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
351 struct io_piece *ipo = io_u->ipo;
353 if (td->ts.nr_block_infos) {
354 uint32_t *info = io_u_block_info(td, io_u);
355 if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
356 if (io_u->ddir == DDIR_TRIM)
357 *info = BLOCK_INFO_SET_STATE(*info,
358 BLOCK_STATE_TRIM_FAILURE);
359 else if (io_u->ddir == DDIR_WRITE)
360 *info = BLOCK_INFO_SET_STATE(*info,
361 BLOCK_STATE_WRITE_FAILURE);
/* Detach from whichever container the piece was stored in. */
368 if (ipo->flags & IP_F_ONRB)
369 rb_erase(&ipo->rb_node, &td->io_hist_tree);
370 else if (ipo->flags & IP_F_ONLIST)
371 flist_del(&ipo->list);
/*
 * Shrink the logged io_piece length to the bytes actually transferred
 * (requested length minus residual), after a short completion.
 */
378 void trim_io_piece(const struct io_u *io_u)
380 struct io_piece *ipo = io_u->ipo;
385 ipo->len = io_u->xfer_buflen - io_u->resid;
/*
 * Tear down the write iolog for a thread; clears the stdio buffer pointer
 * (flush/close/free lines are outside this fragment).
 */
388 void write_iolog_close(struct thread_data *td)
397 td->iolog_buf = NULL;
/*
 * For chunked iolog reading: estimate how many entries to fetch so that
 * roughly one second's worth of consumption is queued, based on the drain
 * rate observed since the last high-water mark. Updates the high-water mark,
 * sets the refill checkmark to half of it, and restarts the rate timer.
 */
400 int64_t iolog_items_to_fetch(struct thread_data *td)
405 int64_t items_to_fetch;
407 if (!td->io_log_highmark)
411 fio_gettime(&now, NULL);
412 elapsed = ntime_since(&td->io_log_highmark_time, &now);
/* Consumption rate extrapolated to one second (elapsed is in nsec). */
414 for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
415 items_to_fetch = for_1s - td->io_log_current;
416 if (items_to_fetch < 0)
421 td->io_log_highmark = td->io_log_current + items_to_fetch;
422 td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
423 fio_gettime(&td->io_log_highmark_time, NULL);
425 return items_to_fetch;
/*
 * Field-count checks for a parsed iolog line, by log version:
 * an I/O action line yields 5 sscanf fields in a version 3 log (leading
 * timestamp) or 4 in version 2; a file action line yields 3 (v3) or 2 (v2).
 *
 * Fix: expand the macro parameter (_r) instead of capturing a variable
 * literally named 'r' from the caller's scope — the old form only worked
 * because every call site happened to pass a local called 'r'.
 */
#define io_act(_td, _r)	(((_td)->io_log_version == 3 && (_r) == 5) || \
					((_td)->io_log_version == 2 && (_r) == 4))
#define file_act(_td, _r)	(((_td)->io_log_version == 3 && (_r) == 3) || \
					((_td)->io_log_version == 2 && (_r) == 2))
434 * Read version 2 and 3 iolog data. It is enhanced to include per-file logging,
/*
 * Parse the (already version-checked) replay file td->io_log_rfile line by
 * line, turning each entry into a queued io_piece. Handles both I/O action
 * lines and file action lines, read-only filtering of writes, replay
 * redirection/scaling/alignment, and chunked fetching. Returns a bool
 * (success paths are on lines outside this fragment).
 */
437 static bool read_iolog(struct thread_data *td)
439 unsigned long long offset;
441 unsigned long long delay = 0;
442 int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
443 char *rfname, *fname, *act;
446 bool realloc = false;
447 int64_t items_to_fetch = 0;
450 if (td->o.read_iolog_chunked) {
451 items_to_fetch = iolog_items_to_fetch(td);
457 * Read in the read iolog and store it, reuse the infrastructure
458 * for doing verifications.
/* Buffers sized for the %256s scan widths below, plus slack. */
461 rfname = fname = malloc(256+16);
462 act = malloc(256+16);
464 syncs = reads = writes = waits = 0;
465 while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
466 struct io_piece *ipo;
468 unsigned long long ttime;
/* v3 lines carry a leading timestamp used to derive replay delays. */
470 if (td->io_log_version == 3) {
471 r = sscanf(p, "%llu %256s %256s %llu %u", &ttime, rfname, act,
473 delay = delay_since_ttime(td, ttime);
474 td->io_log_last_ttime = ttime;
476 * "wait" is not allowed with version 3
478 if (!strcmp(act, "wait")) {
479 log_err("iolog: ignoring wait command with"
480 " version 3 for file %s\n", fname);
483 } else /* version 2 */
484 r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset, &bytes);
486 if (td->o.replay_redirect)
487 fname = td->o.replay_redirect;
/* Map the action token to a ddir (branch bodies outside fragment). */
493 if (!strcmp(act, "wait"))
495 else if (!strcmp(act, "read"))
497 else if (!strcmp(act, "write"))
499 else if (!strcmp(act, "sync"))
501 else if (!strcmp(act, "datasync"))
503 else if (!strcmp(act, "trim"))
506 log_err("fio: bad iolog file action: %s\n",
510 fileno = get_fileno(td, fname);
511 } else if (file_act(td, r)) {
513 if (!strcmp(act, "add")) {
514 if (td->o.replay_redirect &&
515 get_fileno(td, fname) != -1) {
516 dprint(FD_FILE, "iolog: ignoring"
517 " re-add of file %s\n", fname);
519 fileno = add_file(td, fname, td->subjob_number, 1);
520 file_action = FIO_LOG_ADD_FILE;
522 } else if (!strcmp(act, "open")) {
523 fileno = get_fileno(td, fname);
524 file_action = FIO_LOG_OPEN_FILE;
525 } else if (!strcmp(act, "close")) {
526 fileno = get_fileno(td, fname);
527 file_action = FIO_LOG_CLOSE_FILE;
529 log_err("fio: bad iolog file action: %s\n",
534 log_err("bad iolog%d: %s\n", td->io_log_version, p);
540 else if (rw == DDIR_WRITE) {
542 * Don't add a write for ro mode
547 } else if (rw == DDIR_WAIT) {
551 } else if (rw == DDIR_INVAL) {
552 } else if (ddir_sync(rw)) {
555 log_err("bad ddir: %d\n", rw);
/* Build and queue the io_piece for this entry. */
562 ipo = calloc(1, sizeof(*ipo));
565 if (td->io_log_version == 3)
567 if (rw == DDIR_WAIT) {
570 if (td->o.replay_scale)
571 ipo->offset = offset / td->o.replay_scale;
573 ipo->offset = offset;
574 ipo_bytes_align(td->o.replay_align, ipo);
/* Grow max_bs if the replay contains larger blocks than configured. */
577 if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw]) {
579 td->o.max_bs[rw] = bytes;
581 ipo->fileno = fileno;
582 ipo->file_action = file_action;
586 queue_io_piece(td, ipo);
588 if (td->o.read_iolog_chunked) {
589 td->io_log_current++;
591 if (items_to_fetch == 0)
600 if (td->o.read_iolog_chunked) {
601 td->io_log_highmark = td->io_log_current;
602 td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
603 fio_gettime(&td->io_log_highmark_time, NULL);
606 if (writes && read_only) {
607 log_err("fio: <%s> skips replay of %d writes due to"
608 " read-only\n", td->o.name, writes);
612 td->flags |= TD_F_SYNCS;
614 if (td->o.read_iolog_chunked) {
615 if (td->io_log_current == 0) {
618 td->o.td_ddir = TD_DDIR_RW;
/* Buffers may need re-init if max_bs grew during parsing. */
619 if (realloc && td->orig_buffer)
623 init_io_u_buffers(td);
/* Derive the job's data direction from what the log contained. */
628 if (!reads && !writes && !waits)
630 else if (reads && !writes)
631 td->o.td_ddir = TD_DDIR_READ;
632 else if (!reads && writes)
633 td->o.td_ddir = TD_DDIR_WRITE;
635 td->o.td_ddir = TD_DDIR_RW;
/*
 * Return true if 'path' names a Unix-domain socket (stat + S_ISSOCK);
 * the stat-failure path is outside this fragment.
 */
640 static bool is_socket(const char *path)
645 r = stat(path, &buf);
649 return S_ISSOCK(buf.st_mode);
/*
 * Connect to the Unix-domain stream socket at 'path' and return the fd
 * (error-path lines are outside this fragment). The path is length-checked
 * against sun_path before copying.
 */
652 static int open_socket(const char *path)
654 struct sockaddr_un addr;
657 fd = socket(AF_UNIX, SOCK_STREAM, 0);
661 addr.sun_family = AF_UNIX;
662 if (snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path) >=
663 sizeof(addr.sun_path)) {
664 log_err("%s: path name %s is too long for a Unix socket\n",
/* NOTE(review): addrlen is strlen(path) + sizeof(sun_family); the more
 * conventional form is sizeof(addr) or SUN_LEN() — confirm portability. */
668 ret = connect(fd, (const struct sockaddr *)&addr, strlen(path) + sizeof(addr.sun_family));
677 * open iolog, check version, and call appropriate parser
/*
 * Open the replay source — a Unix socket, stdin ("-"), or a regular file —
 * validate the version banner on its first line, then hand off to
 * read_iolog(). Version 1 logs are rejected.
 */
679 static bool init_iolog_read(struct thread_data *td, char *fname)
681 char buffer[256], *p;
684 dprint(FD_IO, "iolog: name=%s\n", fname);
686 if (is_socket(fname)) {
689 fd = open_socket(fname);
692 } else if (!strcmp(fname, "-")) {
695 f = fopen(fname, "r");
698 perror("fopen read iolog");
702 p = fgets(buffer, sizeof(buffer), f);
704 td_verror(td, errno, "iolog read");
705 log_err("fio: unable to read iolog\n");
711 * versions 2 and 3 of the iolog store a specific string as the
712 * first line, check for that
714 if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
715 td->io_log_version = 2;
716 else if (!strncmp(iolog_ver3, buffer, strlen(iolog_ver3)))
717 td->io_log_version = 3;
719 log_err("fio: iolog version 1 is no longer supported\n");
/* Replay defines the file set; drop any files picked up from options. */
724 free_release_files(td);
725 td->io_log_rfile = f;
726 return read_iolog(td);
730 * Set up a log for storing io patterns.
/*
 * Open the write iolog (append mode), attach an 8 KiB stdio buffer, stamp
 * the log start time, emit the version-3 banner, and record an "add" event
 * for every file already known to the job.
 */
732 static bool init_iolog_write(struct thread_data *td)
738 f = fopen(td->o.write_iolog_file, "a");
740 perror("fopen write iolog");
745 * That's it for writing, setup a log buffer and we're done.
748 td->iolog_buf = malloc(8192);
749 setvbuf(f, td->iolog_buf, _IOFBF, 8192);
750 fio_gettime(&td->io_log_start_time, NULL);
753 * write our version line
755 if (fprintf(f, "%s\n", iolog_ver3) < 0) {
756 perror("iolog init\n");
761 * add all known files
763 for_each_file(td, ff, i)
764 log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * Entry point for iolog setup: for replay jobs, auto-detect blktrace vs.
 * fio iolog format and initialize the matching reader; for recording jobs,
 * set up the write iolog. Flags EINVAL via td_verror on failure.
 */
769 bool init_iolog(struct thread_data *td)
773 if (td->o.read_iolog_file) {
775 char * fname = get_name_by_idx(td->o.read_iolog_file, td->subjob_number);
778 * Check if it's a blktrace file and load that if possible.
779 * Otherwise assume it's a normal log file and load that.
781 if (is_blktrace(fname, &need_swap)) {
782 td->io_log_blktrace = 1;
783 ret = init_blktrace_read(td, fname, need_swap);
785 td->io_log_blktrace = 0;
786 ret = init_iolog_read(td, fname);
789 } else if (td->o.write_iolog_file)
790 ret = init_iolog_write(td);
795 td_verror(td, EINVAL, "failed initializing iolog");
/*
 * Allocate and initialize an io_log in shared memory from 'p', including
 * per-direction histogram lists, an initial pending-sample buffer (sized to
 * iodepth when not in offload mode), offset/priority sample-bit masks, and
 * compression state (chunk list + pshared mutexes) when log_gz/log_gz_store
 * are enabled.
 */
800 void setup_log(struct io_log **log, struct log_params *p,
801 const char *filename)
805 struct io_u_plat_entry *entry;
806 struct flist_head *list;
/* scalloc: log lives in shared memory so other processes can see it. */
808 l = scalloc(1, sizeof(*l));
809 INIT_FLIST_HEAD(&l->io_logs);
810 l->log_type = p->log_type;
811 l->log_offset = p->log_offset;
812 l->log_prio = p->log_prio;
813 l->log_gz = p->log_gz;
814 l->log_gz_store = p->log_gz_store;
815 l->avg_msec = p->avg_msec;
816 l->hist_msec = p->hist_msec;
817 l->hist_coarseness = p->hist_coarseness;
818 l->filename = strdup(filename);
821 /* Initialize histogram lists for each r/w direction,
822 * with initial io_u_plat of all zeros:
824 for (i = 0; i < DDIR_RWDIR_CNT; i++) {
825 list = &l->hist_window[i].list;
826 INIT_FLIST_HEAD(list);
827 entry = calloc(1, sizeof(struct io_u_plat_entry));
828 flist_add(&entry->list, list);
/* Pre-size the pending sample buffer; at least iodepth entries. */
831 if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
832 unsigned int def_samples = DEF_LOG_ENTRIES;
835 __p = calloc(1, sizeof(*l->pending));
836 if (l->td->o.iodepth > DEF_LOG_ENTRIES)
837 def_samples = roundup_pow2(l->td->o.iodepth);
838 __p->max_samples = def_samples;
839 __p->log = calloc(__p->max_samples, log_entry_sz(l));
/* Sample-encoding bits: mark whether offset/prio ride along in __ddir. */
844 l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
846 l->log_ddir_mask |= LOG_PRIO_SAMPLE_BIT;
848 INIT_FLIST_HEAD(&l->chunk_list);
850 if (l->log_gz && !p->td)
852 else if (l->log_gz || l->log_gz_store) {
853 mutex_init_pshared(&l->chunk_lock);
854 mutex_init_pshared(&l->deferred_free_lock);
855 p->td->flags |= TD_F_COMPRESS_LOG;
861 #ifdef CONFIG_SETVBUF
/* Give 'f' a 1 MiB fully-buffered stdio buffer; returns the buffer so the
 * caller can free it after fclose (clear_file_buffer below). */
862 static void *set_file_buffer(FILE *f)
864 size_t size = 1048576;
868 setvbuf(f, buf, _IOFBF, size);
/* Free the buffer handed out by set_file_buffer(). */
872 static void clear_file_buffer(void *buf)
/* Fallback stubs when setvbuf is not available. */
877 static void *set_file_buffer(FILE *f)
882 static void clear_file_buffer(void *buf)
/*
 * Release an io_log: drain and free each io_logs buffer on the list, free
 * the pending-sample buffer (remaining frees are outside this fragment).
 */
887 void free_log(struct io_log *log)
889 while (!flist_empty(&log->io_logs)) {
890 struct io_logs *cur_log;
892 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
893 flist_del_init(&cur_log->list);
899 free(log->pending->log);
/*
 * Sum 'stride' consecutive latency-histogram bins starting at bin 'j'.
 * If a previous snapshot 'io_u_plat_last' is supplied, return the delta
 * against that snapshot instead of the absolute sum (used for windowed
 * histogram logging).
 *
 * Restored to a complete definition: the extracted fragment was missing
 * the local declarations, else-branch braces and the return statement.
 */
uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
		  uint64_t *io_u_plat_last)
{
	uint64_t sum;
	int k;

	if (io_u_plat_last) {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k] - io_u_plat_last[j + k];
	} else {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k];
	}

	return sum;
}
/*
 * Write histogram samples to 'f' as CSV: time, ddir, bs, then one summed
 * bucket value per 'stride' bins (stride = 2^hist_coarseness). Each sample
 * is emitted as a delta against the previous entry on its plat list; the
 * consumed previous entry is unlinked afterwards.
 */
926 static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
927 uint64_t sample_size)
931 uint64_t i, j, nr_samples;
932 struct io_u_plat_entry *entry, *entry_before;
934 uint64_t *io_u_plat_before;
936 int stride = 1 << hist_coarseness;
/* Peek at the first sample to learn whether offsets are stored inline. */
941 s = __get_sample(samples, 0, 0);
942 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
944 nr_samples = sample_size / __log_entry_sz(log_offset);
946 for (i = 0; i < nr_samples; i++) {
947 s = __get_sample(samples, log_offset, i);
949 entry = s->data.plat_entry;
950 io_u_plat = entry->io_u_plat;
952 entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
953 io_u_plat_before = entry_before->io_u_plat;
955 fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
956 io_sample_ddir(s), (unsigned long long) s->bs);
957 for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
958 fprintf(f, "%llu, ", (unsigned long long)
959 hist_sum(j, stride, io_u_plat, io_u_plat_before));
/* Last bucket terminates the line (no trailing comma). */
961 fprintf(f, "%llu\n", (unsigned long long)
962 hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
965 flist_del(&entry_before->list);
/*
 * Write plain log samples to 'f' as CSV. The format string is chosen up
 * front based on whether offset and/or priority ride along in the sample
 * encoding (bits in __ddir of the first sample), so the loop body stays
 * branch-light.
 */
970 void flush_samples(FILE *f, void *samples, uint64_t sample_size)
973 int log_offset, log_prio;
974 uint64_t i, nr_samples;
975 unsigned int prio_val;
981 s = __get_sample(samples, 0, 0);
982 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
983 log_prio = (s->__ddir & LOG_PRIO_SAMPLE_BIT) != 0;
/* With prio: print raw ioprio in hex; without: a 0/1 "high prio" flag. */
987 fmt = "%lu, %" PRId64 ", %u, %llu, %llu, 0x%04x\n";
989 fmt = "%lu, %" PRId64 ", %u, %llu, %llu, %u\n";
992 fmt = "%lu, %" PRId64 ", %u, %llu, 0x%04x\n";
994 fmt = "%lu, %" PRId64 ", %u, %llu, %u\n";
997 nr_samples = sample_size / __log_entry_sz(log_offset);
999 for (i = 0; i < nr_samples; i++) {
1000 s = __get_sample(samples, log_offset, i);
1003 prio_val = s->priority;
1005 prio_val = ioprio_value_is_class_rt(s->priority);
1009 (unsigned long) s->time,
1011 io_sample_ddir(s), (unsigned long long) s->bs,
1014 struct io_sample_offset *so = (void *) s;
1017 (unsigned long) s->time,
1019 io_sample_ddir(s), (unsigned long long) s->bs,
1020 (unsigned long long) so->offset,
/* Work item handed to the compression workqueue: the samples to deflate
 * plus their count (remaining members are outside this fragment). */
1028 struct iolog_flush_data {
1029 struct workqueue_work work;
1032 uint32_t nr_samples;
/* Compressed output is produced in fixed 128 KiB chunks. */
1036 #define GZ_CHUNK 131072
/* Allocate an empty GZ_CHUNK-sized compression chunk tagged with 'seq'. */
1038 static struct iolog_compress *get_new_chunk(unsigned int seq)
1040 struct iolog_compress *c;
1042 c = malloc(sizeof(*c));
1043 INIT_FLIST_HEAD(&c->list);
1044 c->buf = malloc(GZ_CHUNK);
/* Free a chunk and its buffer (body outside this fragment). */
1050 static void free_chunk(struct iolog_compress *ic)
/*
 * Prepare a zlib inflate stream. When gz_hdr is set, window bits get +32
 * added (zlib's auto-detection of gzip vs. raw zlib headers) for logs
 * stored in gzip-friendly format.
 */
1056 static int z_stream_init(z_stream *stream, int gz_hdr)
1060 memset(stream, 0, sizeof(*stream));
1061 stream->zalloc = Z_NULL;
1062 stream->zfree = Z_NULL;
1063 stream->opaque = Z_NULL;
1064 stream->next_in = Z_NULL;
1067 * zlib magic - add 32 for auto-detection of gz header or not,
1068 * if we decide to store files in a gzip friendly format.
1073 if (inflateInit2(stream, wbits) != Z_OK)
/* Iterator state carried across inflate_chunk() calls (members outside
 * this fragment include buf, buf_size, buf_used, chunk_sz and seq). */
1079 struct inflate_chunk_iter {
/*
 * End the current inflate sequence: tear down the zlib stream, flush the
 * accumulated decompressed samples to 'f', and reset the iter buffer.
 */
1088 static void finish_chunk(z_stream *stream, FILE *f,
1089 struct inflate_chunk_iter *iter)
1093 ret = inflateEnd(stream);
1095 log_err("fio: failed to end log inflation seq %d (%d)\n",
1098 flush_samples(f, iter->buf, iter->buf_used);
1101 iter->buf_size = iter->buf_used = 0;
1105 * Iterative chunk inflation. Handles cases where we cross into a new
1106 * sequence, doing flush finish of previous chunk if needed.
/*
 * Inflate one stored chunk into iter->buf, growing the buffer by
 * iter->chunk_sz whenever zlib runs out of output space. Returns the number
 * of input bytes consumed; negative-return paths are outside this fragment.
 */
1108 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
1109 z_stream *stream, struct inflate_chunk_iter *iter)
1113 dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
1114 (unsigned long) ic->len, ic->seq);
/* New sequence: flush the previous one and re-init the zlib stream. */
1116 if (ic->seq != iter->seq) {
1118 finish_chunk(stream, f, iter);
1120 z_stream_init(stream, gz_hdr);
1121 iter->seq = ic->seq;
1124 stream->avail_in = ic->len;
1125 stream->next_in = ic->buf;
1127 if (!iter->buf_size) {
1128 iter->buf_size = iter->chunk_sz;
1129 iter->buf = malloc(iter->buf_size);
1132 while (stream->avail_in) {
1133 size_t this_out = iter->buf_size - iter->buf_used;
1136 stream->avail_out = this_out;
1137 stream->next_out = iter->buf + iter->buf_used;
1139 err = inflate(stream, Z_NO_FLUSH);
1141 log_err("fio: failed inflating log: %d\n", err);
1146 iter->buf_used += this_out - stream->avail_out;
/* Output buffer exhausted: grow it and continue inflating. */
1148 if (!stream->avail_out) {
1149 iter->buf_size += iter->chunk_sz;
1150 iter->buf = realloc(iter->buf, iter->buf_size);
1154 if (err == Z_STREAM_END)
/* Bytes consumed = how far next_in advanced into the chunk buffer. */
1158 ret = (void *) stream->next_in - ic->buf;
1160 dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
1166 * Inflate stored compressed chunks, or write them directly to the log
1167 * file if so instructed.
/*
 * Drain log->chunk_list: with log_gz_store the compressed bytes are written
 * to 'f' verbatim, otherwise each chunk is inflated and its samples flushed.
 * Ends with a finish_chunk() for the trailing sequence.
 */
1169 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1171 struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
1174 while (!flist_empty(&log->chunk_list)) {
1175 struct iolog_compress *ic;
1177 ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
1178 flist_del(&ic->list);
1180 if (log->log_gz_store) {
1183 dprint(FD_COMPRESS, "log write chunk size=%lu, "
1184 "seq=%u\n", (unsigned long) ic->len, ic->seq);
1186 ret = fwrite(ic->buf, ic->len, 1, f);
1187 if (ret != 1 || ferror(f)) {
1189 log_err("fio: error writing compressed log\n");
1192 inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
1198 finish_chunk(&stream, f, &iter);
1206 * Open compressed log file and decompress the stored chunks and
1207 * write them to stdout. The chunks are stored sequentially in the
1208 * file, so we iterate over them and do them one-by-one.
/*
 * Slurp the whole compressed file into memory (sized via stat), then
 * repeatedly inflate_chunk() to stdout until the buffer is consumed; the
 * chunk count is unknown up front, so the loop runs on sequence numbers.
 */
1210 int iolog_file_inflate(const char *file)
1212 struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
1213 struct iolog_compress ic;
1221 f = fopen(file, "r");
1227 if (stat(file, &sb) < 0) {
1233 ic.buf = buf = malloc(sb.st_size);
1234 ic.len = sb.st_size;
1237 ret = fread(ic.buf, ic.len, 1, f);
1238 if (ret == 0 && ferror(f)) {
1243 } else if (ferror(f) || (!feof(f) && ret != 1)) {
1244 log_err("fio: short read on reading log\n");
1253 * Each chunk will return Z_STREAM_END. We don't know how many
1254 * chunks are in the file, so we just keep looping and incrementing
1255 * the sequence number until we have consumed the whole compressed
1262 iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
1275 finish_chunk(&stream, stdout, &iter);
/* Stubs used when fio is built without zlib support. */
1285 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1290 int iolog_file_inflate(const char *file)
1292 log_err("fio: log inflation not possible without zlib\n");
/*
 * Write an io_log out to its file (truncate or append per 'do_append'),
 * first emitting any compressed chunks, then each buffered io_logs segment.
 * Histogram logs route through flush_hist_samples(), everything else
 * through flush_samples().
 */
1298 void flush_log(struct io_log *log, bool do_append)
1304 f = fopen(log->filename, "w");
1306 f = fopen(log->filename, "a");
1308 perror("fopen log");
/* Large stdio buffer to keep the sample writes efficient. */
1312 buf = set_file_buffer(f);
1314 inflate_gz_chunks(log, f);
1316 while (!flist_empty(&log->io_logs)) {
1317 struct io_logs *cur_log;
1319 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1320 flist_del_init(&cur_log->list);
1322 if (log->td && log == log->td->clat_hist_log)
1323 flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1324 log_sample_sz(log, cur_log));
1326 flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1332 clear_file_buffer(buf);
/*
 * Final write-out of one log: take the per-filename file lock (try-lock
 * when 'trylock' is set, so callers can retry other logs first), then either
 * ship the log to the client/backend or flush it locally.
 */
1335 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1337 if (td->flags & TD_F_COMPRESS_LOG)
1341 if (fio_trylock_file(log->filename))
1344 fio_lock_file(log->filename);
1346 if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1347 fio_send_iolog(td, log, log->filename);
1349 flush_log(log, !td->o.per_job_logs)
1351 fio_unlock_file(log->filename);
/*
 * Sum the sizes of all compressed chunks currently queued on the log,
 * under chunk_lock. Returns 0 for an empty list (return lines are outside
 * this fragment).
 */
1356 size_t log_chunk_sizes(struct io_log *log)
1358 struct flist_head *entry;
1361 if (flist_empty(&log->chunk_list))
1365 pthread_mutex_lock(&log->chunk_lock);
1366 flist_for_each(entry, &log->chunk_list) {
1367 struct iolog_compress *c;
1369 c = flist_entry(entry, struct iolog_compress, list);
1372 pthread_mutex_unlock(&log->chunk_lock);
/*
 * Queue 'ptr' for deferred freeing (sample buffers may still be in use by
 * the I/O path when compression finishes). Drops the pointer with a
 * one-time warning if the deferred table is full.
 */
1378 static void iolog_put_deferred(struct io_log *log, void *ptr)
1383 pthread_mutex_lock(&log->deferred_free_lock);
1384 if (log->deferred < IOLOG_MAX_DEFER) {
1385 log->deferred_items[log->deferred] = ptr;
1387 } else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
1388 log_err("fio: had to drop log entry free\n");
1389 pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Free everything queued by iolog_put_deferred(), under the deferred-free
 * lock, clearing each slot as it goes.
 */
1392 static void iolog_free_deferred(struct io_log *log)
1399 pthread_mutex_lock(&log->deferred_free_lock);
1401 for (i = 0; i < log->deferred; i++) {
1402 free(log->deferred_items[i]);
1403 log->deferred_items[i] = NULL;
1407 pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Worker routine: deflate the samples in 'data' into a local list of
 * GZ_CHUNK-sized chunks, then splice that list onto the log's chunk list
 * under chunk_lock. The source sample buffer is released via
 * iolog_put_deferred(). On error, the partial chunk list is torn down
 * (cleanup loop at the end).
 */
1410 static int gz_work(struct iolog_flush_data *data)
1412 struct iolog_compress *c = NULL;
1413 struct flist_head list;
1419 INIT_FLIST_HEAD(&list);
1421 memset(&stream, 0, sizeof(stream));
1422 stream.zalloc = Z_NULL;
1423 stream.zfree = Z_NULL;
1424 stream.opaque = Z_NULL;
1426 ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1428 log_err("fio: failed to init gz stream\n");
/* Each flush gets a fresh sequence number for ordered re-inflation. */
1432 seq = ++data->log->chunk_seq;
1434 stream.next_in = (void *) data->samples;
1435 stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1437 dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
1438 (unsigned long) stream.avail_in, seq,
1439 data->log->filename);
1442 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1443 (unsigned long) c->len);
1444 c = get_new_chunk(seq);
1445 stream.avail_out = GZ_CHUNK;
1446 stream.next_out = c->buf;
1447 ret = deflate(&stream, Z_NO_FLUSH);
1449 log_err("fio: deflate log (%d)\n", ret);
1454 c->len = GZ_CHUNK - stream.avail_out;
1455 flist_add_tail(&c->list, &list);
1457 } while (stream.avail_in);
/* Input consumed; finish the stream into the remaining chunk space. */
1459 stream.next_out = c->buf + c->len;
1460 stream.avail_out = GZ_CHUNK - c->len;
1462 ret = deflate(&stream, Z_FINISH);
1465 * Z_BUF_ERROR is special, it just means we need more
1466 * output space. We'll handle that below. Treat any other
1469 if (ret != Z_BUF_ERROR) {
1470 log_err("fio: deflate log (%d)\n", ret);
1471 flist_del(&c->list);
1478 c->len = GZ_CHUNK - stream.avail_out;
1480 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
/* Keep finishing into new chunks until zlib reports stream end. */
1482 if (ret != Z_STREAM_END) {
1484 c = get_new_chunk(seq);
1485 stream.avail_out = GZ_CHUNK;
1486 stream.next_out = c->buf;
1487 ret = deflate(&stream, Z_FINISH);
1488 c->len = GZ_CHUNK - stream.avail_out;
1490 flist_add_tail(&c->list, &list);
1491 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1492 (unsigned long) c->len);
1493 } while (ret != Z_STREAM_END);
1496 dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1498 ret = deflateEnd(&stream);
1500 log_err("fio: deflateEnd %d\n", ret);
/* Source samples may still be referenced; free them via the deferred list. */
1502 iolog_put_deferred(data->log, data->samples);
1504 if (!flist_empty(&list)) {
1505 pthread_mutex_lock(&data->log->chunk_lock);
1506 flist_splice_tail(&list, &data->log->chunk_list);
1507 pthread_mutex_unlock(&data->log->chunk_lock);
/* Error path: drop any chunks produced before the failure. */
1516 while (!flist_empty(&list)) {
1517 c = flist_first_entry(list.next, struct iolog_compress, list);
1518 flist_del(&c->list);
1526 * Invoked from our compress helper thread, when logging would have exceeded
1527 * the specified memory limitation. Compresses the previously stored
/* Workqueue adapter: recover the iolog_flush_data from its embedded work
 * item and run the synchronous compressor. */
1530 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1532 return gz_work(container_of(work, struct iolog_flush_data, work));
/*
 * Per-worker init for the compression workqueue: apply log_gz_cpumask
 * affinity if the option was set; failure to pin is an error.
 */
1535 static int gz_init_worker(struct submit_worker *sw)
1537 struct thread_data *td = sw->wq->td;
1539 if (!fio_option_is_set(&td->o, log_gz_cpumask))
1542 if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1543 log_err("gz: failed to set CPU affinity\n");
/* Workqueue callbacks for asynchronous log compression. */
1550 static struct workqueue_ops log_compress_wq_ops = {
1551 .fn = gz_work_async,
1552 .init_worker_fn = gz_init_worker,
/* Spin up the single-worker compression workqueue, if compressed logging
 * was requested for this thread. */
1556 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1558 if (!(td->flags & TD_F_COMPRESS_LOG))
1561 workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
/* Tear the compression workqueue back down (no-op without compression). */
1565 void iolog_compress_exit(struct thread_data *td)
1567 if (!(td->flags & TD_F_COMPRESS_LOG))
1570 workqueue_exit(&td->log_compress_wq);
1574 * Queue work item to compress the existing log entries. We reset the
1575 * current log to a small size, and reference the existing log in the
1576 * data that we queue for compression. Once compression has been done,
1577 * this old log is freed. If called with finish == true, will not return
1578 * until the log compression has completed, and will flush all previous
/*
 * Synchronous flush: compress every queued io_logs segment of 'log' in
 * turn via gz_work() (invocation lines are outside this fragment).
 */
1581 static int iolog_flush(struct io_log *log)
1583 struct iolog_flush_data *data;
1585 data = malloc(sizeof(*data));
1592 while (!flist_empty(&log->io_logs)) {
1593 struct io_logs *cur_log;
1595 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1596 flist_del_init(&cur_log->list);
1598 data->samples = cur_log->log;
1599 data->nr_samples = cur_log->nr_samples;
/*
 * Asynchronous flush of one io_logs segment: hand its samples to the
 * compression workqueue (smalloc'd work item, shared-memory safe), detach
 * the buffer from cur_log, and reap any previously deferred frees.
 */
1610 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1612 struct iolog_flush_data *data;
1614 data = smalloc(sizeof(*data));
1620 data->samples = cur_log->log;
1621 data->nr_samples = cur_log->nr_samples;
/* Ownership of the sample buffer moves to the work item. */
1624 cur_log->nr_samples = cur_log->max_samples = 0;
1625 cur_log->log = NULL;
1627 workqueue_enqueue(&log->td->log_compress_wq, &data->work);
1629 iolog_free_deferred(log);
/* No-op stubs when fio is built without zlib (no log compression). */
1635 static int iolog_flush(struct io_log *log)
1640 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1645 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1650 void iolog_compress_exit(struct thread_data *td)
/* Return the newest (tail) io_logs segment, or NULL-ish path for an empty
 * list (that branch's body is outside this fragment). */
1656 struct io_logs *iolog_cur_log(struct io_log *log)
1658 if (flist_empty(&log->io_logs))
1661 return flist_last_entry(&log->io_logs, struct io_logs, list);
/* Total sample count across every io_logs segment of 'iolog'. */
1664 uint64_t iolog_nr_samples(struct io_log *iolog)
1666 struct flist_head *entry;
1669 flist_for_each(entry, &iolog->io_logs) {
1670 struct io_logs *cur_log;
1672 cur_log = flist_entry(entry, struct io_logs, list);
1673 ret += cur_log->nr_samples;
/* Thin wrapper: write out 'log' via finish_log() (guard lines outside
 * this fragment), propagating the trylock flag. */
1679 static int __write_log(struct thread_data *td, struct io_log *log, int try)
1682 return finish_log(td, log, try);
/*
 * Per-log-type write-out helpers. Each checks applicability (unit-log
 * matching where relevant), writes the log via __write_log(), and clears
 * the thread's pointer on success so the log is not written twice.
 */
1687 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1691 if (per_unit_log(td->iops_log) != unit_log)
1694 ret = __write_log(td, td->iops_log, try);
1696 td->iops_log = NULL;
1701 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1708 ret = __write_log(td, td->slat_log, try);
1710 td->slat_log = NULL;
1715 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1722 ret = __write_log(td, td->clat_log, try);
1724 td->clat_log = NULL;
1729 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1736 ret = __write_log(td, td->clat_hist_log, try);
1738 td->clat_hist_log = NULL;
1743 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1750 ret = __write_log(td, td->lat_log, try);
1757 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1761 if (per_unit_log(td->bw_log) != unit_log)
1764 ret = __write_log(td, td->bw_log, try);
/* Bitmask identifying each log kind, and the dispatch table pairing every
 * mask with its write-out function. */
1777 CLAT_HIST_LOG_MASK = 32,
1784 int (*fn)(struct thread_data *, int, bool);
1787 static struct log_type log_types[] = {
1789 .mask = BW_LOG_MASK,
1790 .fn = write_bandw_log,
1793 .mask = LAT_LOG_MASK,
1794 .fn = write_lat_log,
1797 .mask = SLAT_LOG_MASK,
1798 .fn = write_slat_log,
1801 .mask = CLAT_LOG_MASK,
1802 .fn = write_clat_log,
1805 .mask = IOPS_LOG_MASK,
1806 .fn = write_iops_log,
1809 .mask = CLAT_HIST_LOG_MASK,
1810 .fn = write_clat_hist_log,
/*
 * Write out all logs of one thread. Runs in TD_FINISHING state; iterates
 * the log_types table with trylock semantics (log_left != 1) so contended
 * logs are retried in later passes, and bails if a full pass makes no
 * progress.
 */
1814 void td_writeout_logs(struct thread_data *td, bool unit_logs)
1816 unsigned int log_mask = 0;
1817 unsigned int log_left = ALL_LOG_NR;
1820 old_state = td_bump_runstate(td, TD_FINISHING);
1822 finalize_logs(td, unit_logs);
1825 int prev_log_left = log_left;
1827 for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1828 struct log_type *lt = &log_types[i];
1831 if (!(log_mask & lt->mask)) {
/* Only the last remaining log uses a blocking lock. */
1832 ret = lt->fn(td, log_left != 1, unit_logs);
1835 log_mask |= lt->mask;
/* No progress this pass: avoid spinning forever. */
1840 if (prev_log_left == log_left)
1844 td_restore_runstate(td, old_state);
/* Write out the logs of every thread (iteration construct is outside this
 * fragment; presumably for_each_td — TODO confirm). */
1847 void fio_writeout_logs(bool unit_logs)
1849 struct thread_data *td;
1853 td_writeout_logs(td, unit_logs);