2 * Code related to writing an iolog of what a thread is doing, and to
3 * later read that back and replay
22 #include "lib/roundup.h"
24 #include <netinet/in.h>
25 #include <netinet/tcp.h>
26 #include <arpa/inet.h>
28 #include <sys/socket.h>
/* Forward declaration: flushes queued samples of an io_log (defined below). */
31 static int iolog_flush(struct io_log *log);
/*
 * Version header strings written/expected as the first line of an iolog
 * file; matched in init_iolog_read() to pick the parser version.
 */
33 static const char iolog_ver2[] = "fio version 2 iolog";
34 static const char iolog_ver3[] = "fio version 3 iolog";
/*
 * Append an io_piece to the tail of the thread's replay list and account
 * its length toward the total replay size. (Listing is elided here; the
 * surrounding braces are not visible.)
 */
36 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
38 flist_add_tail(&ipo->list, &td->io_log_list);
39 td->total_io_size += ipo->len;
/*
 * Record one completed io_u into the write-iolog file as
 * "<file> <ddir> <offset> <buflen>". No-op unless write_iolog_file is set.
 */
42 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
44 if (!td->o.write_iolog_file)
47 fprintf(td->iolog_f, "%s %s %llu %llu\n", io_u->file->file_name,
48 io_ddir_name(io_u->ddir),
49 io_u->offset, io_u->buflen);
/*
 * Record a file lifecycle event ("add"/"open"/"close") in the write-iolog.
 * NOTE(review): act[] is indexed directly by 'what'; this assumes
 * FIO_LOG_ADD/OPEN/CLOSE map to 0/1/2 — the elided lines in between
 * presumably guard this, verify against the enum definition.
 */
52 void log_file(struct thread_data *td, struct fio_file *f,
53 enum file_log_act what)
55 const char *act[] = { "add", "open", "close" };
59 if (!td->o.write_iolog_file)
/* Skipped during pre-open/close done before the job proper starts. */
64 * this happens on the pre-open/close done before the job starts
69 fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
/*
 * Sleep for a replay delay, compensating for time already consumed since
 * the last issue (tracked via td->time_offset). The loop sleeps in slices
 * capped at 500ms so td->terminate is noticed promptly. Afterwards any
 * oversleep is stored in td->time_offset to be credited to the next delay.
 */
72 static void iolog_delay(struct thread_data *td, unsigned long delay)
74 uint64_t usec = utime_since_now(&td->last_issue);
75 unsigned long orig_delay = delay;
/* If we already spent more than the requested delay, skip sleeping. */
79 if (delay < td->time_offset) {
84 delay -= td->time_offset;
90 fio_gettime(&ts, NULL);
91 while (delay && !td->terminate) {
93 if (this_delay > 500000)
96 usec_sleep(td, this_delay);
/* Measure actual elapsed time; carry the overshoot forward. */
100 usec = utime_since_now(&ts);
101 if (usec > orig_delay)
102 td->time_offset = usec - orig_delay;
/*
 * Handle "special" (non-I/O) replay entries: entries with ddir DDIR_INVAL
 * carry a file action (open/close/unlink/add) rather than data transfer.
 * Returns non-zero-ish codes to the caller (exact return values are in
 * elided lines — cannot be confirmed from this listing).
 */
107 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
/* Regular I/O entries are not special; bail out early. */
115 if (ipo->ddir != DDIR_INVAL)
118 f = td->files[ipo->fileno];
121 iolog_delay(td, ipo->delay);
122 if (fio_fill_issue_time(td))
123 fio_gettime(&td->last_issue, NULL);
124 switch (ipo->file_action) {
125 case FIO_LOG_OPEN_FILE:
/* With replay_redirect, all entries target one file; ignore re-opens. */
126 if (td->o.replay_redirect && fio_file_open(f)) {
127 dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
131 ret = td_io_open_file(td, f);
134 td_verror(td, ret, "iolog open file");
136 case FIO_LOG_CLOSE_FILE:
137 td_io_close_file(td, f);
139 case FIO_LOG_UNLINK_FILE:
140 td_io_unlink_file(td, f);
142 case FIO_LOG_ADD_FILE:
148 log_err("fio: bad file action %d\n", ipo->file_action);
/* Forward declaration needed because read_iolog_get() below may refill. */
155 static bool read_iolog(struct thread_data *td);
/*
 * Compute the replay delay implied by a v3 iolog timestamp relative to the
 * previously seen timestamp, optionally scaled by replay_time_scale
 * (a percentage; 100 means no scaling). Returns 0 (via elided lines) when
 * there is no previous timestamp, no_stall is set, or time went backwards.
 */
157 unsigned long long delay_since_ttime(const struct thread_data *td,
158 unsigned long long time)
162 const unsigned long long *last_ttime = &td->io_log_last_ttime;
164 if (!*last_ttime || td->o.no_stall || time < *last_ttime)
166 else if (td->o.replay_time_scale == 100)
167 return time - *last_ttime;
/* scale > 100 shrinks delays, < 100 stretches them. */
170 scale = (double) 100.0 / (double) td->o.replay_time_scale;
171 tmp = time - *last_ttime;
/*
 * Pop the next replay entry off td->io_log_list and populate io_u from it.
 * In chunked mode, refills the list (from blktrace or the iolog file) when
 * the low-water checkmark is reached. Special (file-action) entries are
 * consumed via ipo_special() and the loop continues to the next entry.
 */
175 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
177 struct io_piece *ipo;
178 unsigned long elapsed;
180 while (!flist_empty(&td->io_log_list)) {
/* Chunked reading: refetch more entries once we drain to the checkmark. */
183 if (td->o.read_iolog_chunked) {
184 if (td->io_log_checkmark == td->io_log_current) {
185 if (td->io_log_blktrace) {
186 if (!read_blktrace(td))
193 td->io_log_current--;
195 ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
196 flist_del(&ipo->list);
197 remove_trim_entry(td, ipo);
199 ret = ipo_special(td, ipo);
203 } else if (ret > 0) {
208 io_u->ddir = ipo->ddir;
209 if (ipo->ddir != DDIR_WAIT) {
210 io_u->offset = ipo->offset;
211 io_u->verify_offset = ipo->offset;
212 io_u->buflen = ipo->len;
213 io_u->file = td->files[ipo->fileno];
214 get_file(io_u->file);
215 dprint(FD_IO, "iolog: get %llu/%llu/%s\n", io_u->offset,
216 io_u->buflen, io_u->file->file_name);
218 iolog_delay(td, ipo->delay);
/* DDIR_WAIT entries: delay is absolute vs. genesis time, in msec. */
220 elapsed = mtime_since_genesis();
221 if (ipo->delay > elapsed)
222 usec_sleep(td, (ipo->delay - elapsed) * 1000);
227 if (io_u->ddir != DDIR_WAIT)
/*
 * Drop every io_piece held for verification: first drain the rb-tree of
 * sorted entries, then the plain history list. Each entry is also removed
 * from the trim list. (Frees occur in elided lines.)
 */
235 void prune_io_piece_log(struct thread_data *td)
237 struct io_piece *ipo;
238 struct fio_rb_node *n;
240 while ((n = rb_first(&td->io_hist_tree)) != NULL) {
241 ipo = rb_entry(n, struct io_piece, rb_node);
242 rb_erase(n, &td->io_hist_tree);
243 remove_trim_entry(td, ipo);
248 while (!flist_empty(&td->io_hist_list)) {
249 ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
250 flist_del(&ipo->list);
251 remove_trim_entry(td, ipo);
258 * log a successful write, so we can unwind the log for verify
260 void log_io_piece(struct thread_data *td, struct io_u *io_u)
262 struct fio_rb_node **p, *parent;
263 struct io_piece *ipo, *__ipo;
/* NOTE(review): calloc result is used unchecked on the visible lines. */
265 ipo = calloc(1, sizeof(struct io_piece));
267 ipo->file = io_u->file;
268 ipo->offset = io_u->offset;
269 ipo->len = io_u->buflen;
270 ipo->numberio = io_u->numberio;
271 ipo->flags = IP_F_IN_FLIGHT;
275 if (io_u_should_trim(td, io_u)) {
276 flist_add_tail(&ipo->trim_list, &td->trim_list);
281 * Only sort writes if we don't have a random map in which case we need
282 * to check for duplicate blocks and drop the old one, which we rely on
283 * the rb insert/lookup for handling.
/* With a random map: no duplicates possible, a plain FIFO list suffices. */
285 if (file_randommap(td, ipo->file)) {
286 INIT_FLIST_HEAD(&ipo->list);
287 flist_add_tail(&ipo->list, &td->io_hist_list);
288 ipo->flags |= IP_F_ONLIST;
293 RB_CLEAR_NODE(&ipo->rb_node);
296 * Sort the entry into the verification list
/* Standard rb-tree insertion walk, keyed by (file, offset); overlapping
 * older entries are erased so verify sees only the newest write. */
299 p = &td->io_hist_tree.rb_node;
305 __ipo = rb_entry(parent, struct io_piece, rb_node);
306 if (ipo->file < __ipo->file)
308 else if (ipo->file > __ipo->file)
310 else if (ipo->offset < __ipo->offset) {
312 overlap = ipo->offset + ipo->len > __ipo->offset;
314 else if (ipo->offset > __ipo->offset) {
316 overlap = __ipo->offset + __ipo->len > ipo->offset;
322 dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
323 __ipo->offset, __ipo->len,
324 ipo->offset, ipo->len);
326 rb_erase(parent, &td->io_hist_tree);
327 remove_trim_entry(td, __ipo);
/* Only free the evicted entry if it is no longer in flight. */
328 if (!(__ipo->flags & IP_F_IN_FLIGHT))
334 rb_link_node(&ipo->rb_node, parent, p);
335 rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
336 ipo->flags |= IP_F_ONRB;
/*
 * Undo log_io_piece() for a failed io_u: mark the block-info state as a
 * trim/write failure (when block info tracking is on) and unlink the
 * io_piece from whichever container (rb-tree or list) holds it.
 */
340 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
342 struct io_piece *ipo = io_u->ipo;
344 if (td->ts.nr_block_infos) {
345 uint32_t *info = io_u_block_info(td, io_u);
/* Only escalate state; never downgrade past an existing failure state. */
346 if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
347 if (io_u->ddir == DDIR_TRIM)
348 *info = BLOCK_INFO_SET_STATE(*info,
349 BLOCK_STATE_TRIM_FAILURE);
350 else if (io_u->ddir == DDIR_WRITE)
351 *info = BLOCK_INFO_SET_STATE(*info,
352 BLOCK_STATE_WRITE_FAILURE);
359 if (ipo->flags & IP_F_ONRB)
360 rb_erase(&ipo->rb_node, &td->io_hist_tree);
361 else if (ipo->flags & IP_F_ONLIST)
362 flist_del(&ipo->list);
/*
 * Shrink the logged io_piece length to what was actually transferred
 * (xfer_buflen minus the residual), e.g. after a short completion.
 */
369 void trim_io_piece(const struct io_u *io_u)
371 struct io_piece *ipo = io_u->ipo;
376 ipo->len = io_u->xfer_buflen - io_u->resid;
/*
 * Close out the write-iolog (flush/fclose happen in elided lines) and
 * clear the stdio buffer pointer installed by init_iolog_write().
 */
379 void write_iolog_close(struct thread_data *td)
388 td->iolog_buf = NULL;
/*
 * For chunked iolog reading: estimate how many entries to fetch so that
 * roughly one second's worth of replay is buffered, based on the observed
 * consumption rate since the last high-water mark. Updates the
 * highmark/checkmark bookkeeping used by read_iolog_get().
 */
391 int64_t iolog_items_to_fetch(struct thread_data *td)
396 int64_t items_to_fetch;
398 if (!td->io_log_highmark)
402 fio_gettime(&now, NULL);
403 elapsed = ntime_since(&td->io_log_highmark_time, &now);
/* Entries consumed per second, extrapolated from nanoseconds elapsed. */
405 for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
406 items_to_fetch = for_1s - td->io_log_current;
407 if (items_to_fetch < 0)
412 td->io_log_highmark = td->io_log_current + items_to_fetch;
/* Refill when we drain to half of the high-water mark. */
413 td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
414 fio_gettime(&td->io_log_highmark_time, NULL);
416 return items_to_fetch;
/*
 * Field-count checks for a parsed iolog line, given the sscanf() return
 * value: an I/O action line has 5 fields in a v3 log (timestamp, file,
 * action, offset, length) and 4 in a v2 log; a file action line has 3
 * fields in v3 and 2 in v2.
 *
 * Fix: the original expansions referenced a bare 'r' instead of the macro
 * parameter '_r', silently capturing a caller-local variable named 'r'
 * (unhygienic; breaks for any caller not using that exact name). Using
 * (_r) is behavior-identical at call sites that passed 'r'.
 */
#define io_act(_td, _r) (((_td)->io_log_version == 3 && (_r) == 5) || \
			 ((_td)->io_log_version == 2 && (_r) == 4))
#define file_act(_td, _r) (((_td)->io_log_version == 3 && (_r) == 3) || \
			   ((_td)->io_log_version == 2 && (_r) == 2))
425 * Read version 2 and 3 iolog data. It is enhanced to include per-file logging,
428 static bool read_iolog(struct thread_data *td)
430 unsigned long long offset;
432 unsigned long long delay = 0;
433 int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
434 char *rfname, *fname, *act;
437 bool realloc = false;
438 int64_t items_to_fetch = 0;
/* Chunked mode: only parse a bounded number of entries per call. */
441 if (td->o.read_iolog_chunked) {
442 items_to_fetch = iolog_items_to_fetch(td);
448 * Read in the read iolog and store it, reuse the infrastructure
449 * for doing verifications.
/* 256+16: room for a %256s token plus slack for the NUL/overflow margin. */
452 rfname = fname = malloc(256+16);
453 act = malloc(256+16);
455 syncs = reads = writes = waits = 0;
456 while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
457 struct io_piece *ipo;
459 unsigned long long ttime;
/* v3 lines are "<ttime> <file> <act> <offset> <len>"; v2 lacks ttime. */
461 if (td->io_log_version == 3) {
462 r = sscanf(p, "%llu %256s %256s %llu %u", &ttime, rfname, act,
464 delay = delay_since_ttime(td, ttime);
465 td->io_log_last_ttime = ttime;
467 * "wait" is not allowed with version 3
469 if (!strcmp(act, "wait")) {
470 log_err("iolog: ignoring wait command with"
471 " version 3 for file %s\n", fname);
474 } else /* version 2 */
475 r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset, &bytes);
/* replay_redirect funnels every entry onto one replacement file. */
477 if (td->o.replay_redirect)
478 fname = td->o.replay_redirect;
484 if (!strcmp(act, "wait"))
486 else if (!strcmp(act, "read"))
488 else if (!strcmp(act, "write"))
490 else if (!strcmp(act, "sync"))
492 else if (!strcmp(act, "datasync"))
494 else if (!strcmp(act, "trim"))
497 log_err("fio: bad iolog file action: %s\n",
501 fileno = get_fileno(td, fname);
502 } else if (file_act(td, r)) {
504 if (!strcmp(act, "add")) {
505 if (td->o.replay_redirect &&
506 get_fileno(td, fname) != -1) {
507 dprint(FD_FILE, "iolog: ignoring"
508 " re-add of file %s\n", fname);
510 fileno = add_file(td, fname, td->subjob_number, 1);
511 file_action = FIO_LOG_ADD_FILE;
513 } else if (!strcmp(act, "open")) {
514 fileno = get_fileno(td, fname);
515 file_action = FIO_LOG_OPEN_FILE;
516 } else if (!strcmp(act, "close")) {
517 fileno = get_fileno(td, fname);
518 file_action = FIO_LOG_CLOSE_FILE;
520 log_err("fio: bad iolog file action: %s\n",
525 log_err("bad iolog%d: %s\n", td->io_log_version, p);
531 else if (rw == DDIR_WRITE) {
533 * Don't add a write for ro mode
538 } else if (rw == DDIR_WAIT) {
542 } else if (rw == DDIR_INVAL) {
543 } else if (ddir_sync(rw)) {
546 log_err("bad ddir: %d\n", rw);
/* Build the io_piece for this entry and queue it for replay. */
553 ipo = calloc(1, sizeof(*ipo));
556 if (td->io_log_version == 3)
558 if (rw == DDIR_WAIT) {
561 if (td->o.replay_scale)
562 ipo->offset = offset / td->o.replay_scale;
564 ipo->offset = offset;
565 ipo_bytes_align(td->o.replay_align, ipo);
/* Grow max_bs if the log contains larger blocks than configured. */
568 if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw]) {
570 td->o.max_bs[rw] = bytes;
572 ipo->fileno = fileno;
573 ipo->file_action = file_action;
577 queue_io_piece(td, ipo);
579 if (td->o.read_iolog_chunked) {
580 td->io_log_current++;
582 if (items_to_fetch == 0)
591 if (td->o.read_iolog_chunked) {
592 td->io_log_highmark = td->io_log_current;
593 td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
594 fio_gettime(&td->io_log_highmark_time, NULL);
597 if (writes && read_only) {
598 log_err("fio: <%s> skips replay of %d writes due to"
599 " read-only\n", td->o.name, writes);
603 td->flags |= TD_F_SYNCS;
605 if (td->o.read_iolog_chunked) {
606 if (td->io_log_current == 0) {
609 td->o.td_ddir = TD_DDIR_RW;
/* max_bs may have grown above: re-allocate I/O buffers to match. */
610 if (realloc && td->orig_buffer)
614 init_io_u_buffers(td);
/* Derive the job's data direction from what the log contained. */
619 if (!reads && !writes && !waits)
621 else if (reads && !writes)
622 td->o.td_ddir = TD_DDIR_READ;
623 else if (!reads && writes)
624 td->o.td_ddir = TD_DDIR_WRITE;
626 td->o.td_ddir = TD_DDIR_RW;
/*
 * Return true if 'path' names a Unix socket (stat() + S_ISSOCK); used to
 * decide whether the iolog source is a socket instead of a regular file.
 */
631 static bool is_socket(const char *path)
636 r = stat(path, &buf);
640 return S_ISSOCK(buf.st_mode);
/*
 * Open and connect a Unix-domain stream socket at 'path'; returns the fd
 * (error handling is in elided lines). The path length is validated
 * against sun_path before connecting.
 */
643 static int open_socket(const char *path)
645 struct sockaddr_un addr;
648 fd = socket(AF_UNIX, SOCK_STREAM, 0);
652 addr.sun_family = AF_UNIX;
653 if (snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path) >=
654 sizeof(addr.sun_path)) {
655 log_err("%s: path name %s is too long for a Unix socket\n",
/* NOTE(review): addrlen is strlen(path) + sizeof(sun_family); the more
 * conventional form is offsetof(struct sockaddr_un, sun_path) +
 * strlen(path) + 1 — confirm this works on all supported platforms. */
659 ret = connect(fd, (const struct sockaddr *)&addr, strlen(path) + sizeof(addr.sun_family));
668 * open iolog, check version, and call appropriate parser
670 static bool init_iolog_read(struct thread_data *td, char *fname)
672 char buffer[256], *p;
675 dprint(FD_IO, "iolog: name=%s\n", fname);
/* Source may be a Unix socket, stdin ("-"), or a regular file. */
677 if (is_socket(fname)) {
680 fd = open_socket(fname);
683 } else if (!strcmp(fname, "-")) {
686 f = fopen(fname, "r");
689 perror("fopen read iolog");
/* First line must be the version banner. */
693 p = fgets(buffer, sizeof(buffer), f);
695 td_verror(td, errno, "iolog read");
696 log_err("fio: unable to read iolog\n");
702 * versions 2 and 3 of the iolog store a specific string as the
703 * first line, check for that
705 if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
706 td->io_log_version = 2;
707 else if (!strncmp(iolog_ver3, buffer, strlen(iolog_ver3)))
708 td->io_log_version = 3;
710 log_err("fio: iolog version 1 is no longer supported\n");
/* Drop any files set up before; the iolog defines the file set. */
715 free_release_files(td);
716 td->io_log_rfile = f;
717 return read_iolog(td);
721 * Set up a log for storing io patterns.
723 static bool init_iolog_write(struct thread_data *td)
/* Opened in append mode so multiple runs accumulate. */
729 f = fopen(td->o.write_iolog_file, "a");
731 perror("fopen write iolog");
736 * That's it for writing, setup a log buffer and we're done.
739 td->iolog_buf = malloc(8192);
740 setvbuf(f, td->iolog_buf, _IOFBF, 8192);
743 * write our version line
/* NOTE(review): writes the v2 banner even though v3 exists — presumably
 * intentional (fio emits v2 format); confirm against the format docs. */
745 if (fprintf(f, "%s\n", iolog_ver2) < 0) {
746 perror("iolog init\n");
751 * add all known files
753 for_each_file(td, ff, i)
754 log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * Top-level iolog setup: for a read iolog, auto-detect blktrace vs. text
 * iolog format and dispatch to the right reader; otherwise set up the
 * write iolog if requested. Returns false (with EINVAL recorded) on error.
 */
759 bool init_iolog(struct thread_data *td)
763 if (td->o.read_iolog_file) {
765 char * fname = get_name_by_idx(td->o.read_iolog_file, td->subjob_number);
768 * Check if it's a blktrace file and load that if possible.
769 * Otherwise assume it's a normal log file and load that.
771 if (is_blktrace(fname, &need_swap)) {
772 td->io_log_blktrace = 1;
773 ret = init_blktrace_read(td, fname, need_swap);
775 td->io_log_blktrace = 0;
776 ret = init_iolog_read(td, fname);
779 } else if (td->o.write_iolog_file)
780 ret = init_iolog_write(td);
785 td_verror(td, EINVAL, "failed initializing iolog");
/*
 * Allocate and initialize an io_log from the supplied parameters:
 * copies the logging options, seeds the per-direction histogram lists,
 * preallocates the first sample buffer (sized to iodepth for non-offload
 * submit modes), and sets up compression state when log_gz is enabled.
 */
790 void setup_log(struct io_log **log, struct log_params *p,
791 const char *filename)
795 struct io_u_plat_entry *entry;
796 struct flist_head *list;
/* Shared-memory allocation: the log may be consumed by another process. */
798 l = scalloc(1, sizeof(*l));
799 INIT_FLIST_HEAD(&l->io_logs);
800 l->log_type = p->log_type;
801 l->log_offset = p->log_offset;
802 l->log_prio = p->log_prio;
803 l->log_gz = p->log_gz;
804 l->log_gz_store = p->log_gz_store;
805 l->avg_msec = p->avg_msec;
806 l->hist_msec = p->hist_msec;
807 l->hist_coarseness = p->hist_coarseness;
808 l->filename = strdup(filename);
811 /* Initialize histogram lists for each r/w direction,
812 * with initial io_u_plat of all zeros:
814 for (i = 0; i < DDIR_RWDIR_CNT; i++) {
815 list = &l->hist_window[i].list;
816 INIT_FLIST_HEAD(list);
817 entry = calloc(1, sizeof(struct io_u_plat_entry));
818 flist_add(&entry->list, list);
/* Non-offload mode: size the pending buffer to at least iodepth samples. */
821 if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
822 unsigned int def_samples = DEF_LOG_ENTRIES;
825 __p = calloc(1, sizeof(*l->pending));
826 if (l->td->o.iodepth > DEF_LOG_ENTRIES)
827 def_samples = roundup_pow2(l->td->o.iodepth);
828 __p->max_samples = def_samples;
829 __p->log = calloc(__p->max_samples, log_entry_sz(l));
/* Per-sample flag bits are folded into the ddir field via this mask. */
834 l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
836 l->log_ddir_mask |= LOG_PRIO_SAMPLE_BIT;
838 INIT_FLIST_HEAD(&l->chunk_list);
840 if (l->log_gz && !p->td)
842 else if (l->log_gz || l->log_gz_store) {
843 mutex_init_pshared(&l->chunk_lock);
844 mutex_init_pshared(&l->deferred_free_lock);
845 p->td->flags |= TD_F_COMPRESS_LOG;
/* With setvbuf() available, give the log FILE a 1MB fully-buffered
 * stdio buffer; otherwise these helpers are no-ops (elided stubs). */
851 #ifdef CONFIG_SETVBUF
852 static void *set_file_buffer(FILE *f)
854 size_t size = 1048576;
858 setvbuf(f, buf, _IOFBF, size);
862 static void clear_file_buffer(void *buf)
867 static void *set_file_buffer(FILE *f)
872 static void clear_file_buffer(void *buf)
/*
 * Release all memory owned by an io_log: every queued io_logs buffer,
 * then the pending buffer (remaining frees are in elided lines).
 */
877 void free_log(struct io_log *log)
879 while (!flist_empty(&log->io_logs)) {
880 struct io_logs *cur_log;
882 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
883 flist_del_init(&cur_log->list);
889 free(log->pending->log);
/*
 * Sum 'stride' consecutive buckets of a latency histogram starting at
 * index j. When io_u_plat_last is given, return the delta against that
 * previous snapshot instead of the absolute counts.
 */
899 uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
900 uint64_t *io_u_plat_last)
905 if (io_u_plat_last) {
906 for (k = sum = 0; k < stride; k++)
907 sum += io_u_plat[j + k] - io_u_plat_last[j + k];
909 for (k = sum = 0; k < stride; k++)
910 sum += io_u_plat[j + k];
/*
 * Write histogram samples as CSV: "time, ddir, bs, <coarsened buckets>".
 * Buckets are aggregated in groups of 2^hist_coarseness, and each row is
 * differenced against the previous snapshot (entry_before), which is then
 * unlinked (and freed in elided lines).
 */
916 static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
917 uint64_t sample_size)
921 uint64_t i, j, nr_samples;
922 struct io_u_plat_entry *entry, *entry_before;
924 uint64_t *io_u_plat_before;
926 int stride = 1 << hist_coarseness;
/* Probe the first sample for the offset flag to get the entry size. */
931 s = __get_sample(samples, 0, 0);
932 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
934 nr_samples = sample_size / __log_entry_sz(log_offset);
936 for (i = 0; i < nr_samples; i++) {
937 s = __get_sample(samples, log_offset, i);
939 entry = s->data.plat_entry;
940 io_u_plat = entry->io_u_plat;
942 entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
943 io_u_plat_before = entry_before->io_u_plat;
945 fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
946 io_sample_ddir(s), (unsigned long long) s->bs);
947 for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
948 fprintf(f, "%llu, ", (unsigned long long)
949 hist_sum(j, stride, io_u_plat, io_u_plat_before));
/* Last group is printed without a trailing comma. */
951 fprintf(f, "%llu\n", (unsigned long long)
952 hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
955 flist_del(&entry_before->list);
/*
 * Write plain log samples as CSV. The format string is chosen up front
 * from the offset/priority flag bits carried in the first sample's ddir
 * field; priority prints as a hex ioprio value or a 0/1 RT-class flag.
 */
960 void flush_samples(FILE *f, void *samples, uint64_t sample_size)
963 int log_offset, log_prio;
964 uint64_t i, nr_samples;
965 unsigned int prio_val;
971 s = __get_sample(samples, 0, 0);
972 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
973 log_prio = (s->__ddir & LOG_PRIO_SAMPLE_BIT) != 0;
/* Four format variants: {with,without} offset x {hex prio, rt flag}. */
977 fmt = "%lu, %" PRId64 ", %u, %llu, %llu, 0x%04x\n";
979 fmt = "%lu, %" PRId64 ", %u, %llu, %llu, %u\n";
982 fmt = "%lu, %" PRId64 ", %u, %llu, 0x%04x\n";
984 fmt = "%lu, %" PRId64 ", %u, %llu, %u\n";
987 nr_samples = sample_size / __log_entry_sz(log_offset);
989 for (i = 0; i < nr_samples; i++) {
990 s = __get_sample(samples, log_offset, i);
993 prio_val = s->priority;
995 prio_val = ioprio_value_is_class_rt(s->priority);
999 (unsigned long) s->time,
1001 io_sample_ddir(s), (unsigned long long) s->bs,
/* Offset-logging entries are the larger io_sample_offset layout. */
1004 struct io_sample_offset *so = (void *) s;
1007 (unsigned long) s->time,
1009 io_sample_ddir(s), (unsigned long long) s->bs,
1010 (unsigned long long) so->offset,
/* Work item handed to the compression workqueue: the samples to deflate
 * plus their owning log (remaining members are in elided lines). */
1018 struct iolog_flush_data {
1019 struct workqueue_work work;
1022 uint32_t nr_samples;
/* Compressed output is produced in fixed 128KB chunks. */
1026 #define GZ_CHUNK 131072
/* Allocate one output chunk tagged with the deflate sequence number. */
1028 static struct iolog_compress *get_new_chunk(unsigned int seq)
1030 struct iolog_compress *c;
1032 c = malloc(sizeof(*c));
1033 INIT_FLIST_HEAD(&c->list);
1034 c->buf = malloc(GZ_CHUNK);
/* Free a chunk and its buffer (bodies elided). */
1040 static void free_chunk(struct iolog_compress *ic)
/*
 * Prepare a zlib stream for inflation. windowBits gets +32 added (in an
 * elided line) for automatic gzip/zlib header detection when chunks were
 * stored in gzip-friendly format.
 */
1046 static int z_stream_init(z_stream *stream, int gz_hdr)
1050 memset(stream, 0, sizeof(*stream));
1051 stream->zalloc = Z_NULL;
1052 stream->zfree = Z_NULL;
1053 stream->opaque = Z_NULL;
1054 stream->next_in = Z_NULL;
1057 * zlib magic - add 32 for auto-detection of gz header or not,
1058 * if we decide to store files in a gzip friendly format.
1063 if (inflateInit2(stream, wbits) != Z_OK)
/* Iteration state threaded through inflate_chunk()/finish_chunk(). */
1069 struct inflate_chunk_iter {
/*
 * End the current inflate sequence: tear down the zlib stream, flush the
 * decompressed samples in iter->buf to 'f', and reset the iter buffer.
 */
1078 static void finish_chunk(z_stream *stream, FILE *f,
1079 struct inflate_chunk_iter *iter)
1083 ret = inflateEnd(stream);
1085 log_err("fio: failed to end log inflation seq %d (%d)\n",
1088 flush_samples(f, iter->buf, iter->buf_used);
1091 iter->buf_size = iter->buf_used = 0;
1095 * Iterative chunk inflation. Handles cases where we cross into a new
1096 * sequence, doing flush finish of previous chunk if needed.
1098 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
1099 z_stream *stream, struct inflate_chunk_iter *iter)
1103 dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
1104 (unsigned long) ic->len, ic->seq);
/* New sequence: finish the previous one and re-init the stream. */
1106 if (ic->seq != iter->seq) {
1108 finish_chunk(stream, f, iter);
1110 z_stream_init(stream, gz_hdr);
1111 iter->seq = ic->seq;
1114 stream->avail_in = ic->len;
1115 stream->next_in = ic->buf;
1117 if (!iter->buf_size) {
1118 iter->buf_size = iter->chunk_sz;
1119 iter->buf = malloc(iter->buf_size);
/* Inflate, growing the output buffer by chunk_sz whenever it fills. */
1122 while (stream->avail_in) {
1123 size_t this_out = iter->buf_size - iter->buf_used;
1126 stream->avail_out = this_out;
1127 stream->next_out = iter->buf + iter->buf_used;
1129 err = inflate(stream, Z_NO_FLUSH);
1131 log_err("fio: failed inflating log: %d\n", err);
1136 iter->buf_used += this_out - stream->avail_out;
1138 if (!stream->avail_out) {
1139 iter->buf_size += iter->chunk_sz;
/* NOTE(review): realloc result overwrites iter->buf directly; on
 * failure the old pointer would leak — likely accepted as fatal-OOM. */
1140 iter->buf = realloc(iter->buf, iter->buf_size);
1144 if (err == Z_STREAM_END)
/* Bytes of compressed input actually consumed. */
1148 ret = (void *) stream->next_in - ic->buf;
1150 dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
1156 * Inflate stored compressed chunks, or write them directly to the log
1157 * file if so instructed.
1159 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1161 struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
1164 while (!flist_empty(&log->chunk_list)) {
1165 struct iolog_compress *ic;
1167 ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
1168 flist_del(&ic->list);
/* log_gz_store: keep the data compressed; just copy chunks to disk. */
1170 if (log->log_gz_store) {
1173 dprint(FD_COMPRESS, "log write chunk size=%lu, "
1174 "seq=%u\n", (unsigned long) ic->len, ic->seq);
1176 ret = fwrite(ic->buf, ic->len, 1, f);
1177 if (ret != 1 || ferror(f)) {
1179 log_err("fio: error writing compressed log\n");
1182 inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
/* Flush whatever the final sequence produced. */
1188 finish_chunk(&stream, f, &iter);
1196 * Open compressed log file and decompress the stored chunks and
1197 * write them to stdout. The chunks are stored sequentially in the
1198 * file, so we iterate over them and do them one-by-one.
1200 int iolog_file_inflate(const char *file)
1202 struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
1203 struct iolog_compress ic;
1211 f = fopen(file, "r");
1217 if (stat(file, &sb) < 0) {
/* Slurp the whole compressed file into one in-memory chunk. */
1223 ic.buf = buf = malloc(sb.st_size);
1224 ic.len = sb.st_size;
1227 ret = fread(ic.buf, ic.len, 1, f);
1228 if (ret == 0 && ferror(f)) {
1233 } else if (ferror(f) || (!feof(f) && ret != 1)) {
1234 log_err("fio: short read on reading log\n");
1243 * Each chunk will return Z_STREAM_END. We don't know how many
1244 * chunks are in the file, so we just keep looping and incrementing
1245 * the sequence number until we have consumed the whole compressed
1252 iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
1265 finish_chunk(&stream, stdout, &iter);
/* Stubs for builds without zlib: chunk inflation is a no-op and the
 * standalone inflate tool reports that it is unavailable. */
1275 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1280 int iolog_file_inflate(const char *file)
1282 log_err("fio: log inflation not possible without zlib\n");
/*
 * Write an io_log to its file: truncate or append per do_append, emit any
 * compressed chunks first, then flush every queued sample buffer (the
 * clat histogram log goes through flush_hist_samples instead).
 */
1288 void flush_log(struct io_log *log, bool do_append)
1294 f = fopen(log->filename, "w");
1296 f = fopen(log->filename, "a");
1298 perror("fopen log");
/* Large stdio buffer (when available) to keep writes efficient. */
1302 buf = set_file_buffer(f);
1304 inflate_gz_chunks(log, f);
1306 while (!flist_empty(&log->io_logs)) {
1307 struct io_logs *cur_log;
1309 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1310 flist_del_init(&cur_log->list);
1312 if (log->td && log == log->td->clat_hist_log)
1313 flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1314 log_sample_sz(log, cur_log));
1316 flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1322 clear_file_buffer(buf);
/*
 * Finalize one log: under the per-filename file lock, either ship it to
 * the network backend/GUI client or flush it to disk locally. With
 * trylock set, bail out (in elided lines) if the lock is contended.
 */
1325 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1327 if (td->flags & TD_F_COMPRESS_LOG)
1331 if (fio_trylock_file(log->filename))
1334 fio_lock_file(log->filename);
1336 if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1337 fio_send_iolog(td, log, log->filename);
/* !per_job_logs means multiple jobs share one file: append, don't clobber. */
1339 flush_log(log, !td->o.per_job_logs);
1341 fio_unlock_file(log->filename);
/*
 * Sum the sizes of all compressed chunks currently queued on the log,
 * under chunk_lock. (Accumulation of c->len is in an elided line.)
 */
1346 size_t log_chunk_sizes(struct io_log *log)
1348 struct flist_head *entry;
1351 if (flist_empty(&log->chunk_list))
1355 pthread_mutex_lock(&log->chunk_lock);
1356 flist_for_each(entry, &log->chunk_list) {
1357 struct iolog_compress *c;
1359 c = flist_entry(entry, struct iolog_compress, list);
1362 pthread_mutex_unlock(&log->chunk_lock);
/*
 * Queue a buffer for deferred freeing (it may still be referenced by the
 * flushing side). If the deferred table is full the entry is leaked with
 * a one-time warning rather than freed unsafely.
 */
1368 static void iolog_put_deferred(struct io_log *log, void *ptr)
1373 pthread_mutex_lock(&log->deferred_free_lock);
1374 if (log->deferred < IOLOG_MAX_DEFER) {
1375 log->deferred_items[log->deferred] = ptr;
1377 } else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
1378 log_err("fio: had to drop log entry free\n");
1379 pthread_mutex_unlock(&log->deferred_free_lock);
/* Free everything previously queued by iolog_put_deferred(). */
1382 static void iolog_free_deferred(struct io_log *log)
1389 pthread_mutex_lock(&log->deferred_free_lock);
1391 for (i = 0; i < log->deferred; i++) {
1392 free(log->deferred_items[i]);
1393 log->deferred_items[i] = NULL;
1397 pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Deflate one batch of samples into a chain of GZ_CHUNK-sized chunks and
 * splice them onto the log's chunk_list. Runs on the compression
 * workqueue. The sample buffer is handed to deferred-free afterwards.
 */
1400 static int gz_work(struct iolog_flush_data *data)
1402 struct iolog_compress *c = NULL;
1403 struct flist_head list;
1409 INIT_FLIST_HEAD(&list);
1411 memset(&stream, 0, sizeof(stream));
1412 stream.zalloc = Z_NULL;
1413 stream.zfree = Z_NULL;
1414 stream.opaque = Z_NULL;
1416 ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1418 log_err("fio: failed to init gz stream\n");
/* Each flush gets a fresh sequence number so inflation can segment. */
1422 seq = ++data->log->chunk_seq;
1424 stream.next_in = (void *) data->samples;
1425 stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1427 dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
1428 (unsigned long) stream.avail_in, seq,
1429 data->log->filename);
/* Main deflate loop: one chunk per iteration until input is consumed. */
1432 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1433 (unsigned long) c->len);
1434 c = get_new_chunk(seq);
1435 stream.avail_out = GZ_CHUNK;
1436 stream.next_out = c->buf;
1437 ret = deflate(&stream, Z_NO_FLUSH);
1439 log_err("fio: deflate log (%d)\n", ret);
1444 c->len = GZ_CHUNK - stream.avail_out;
1445 flist_add_tail(&c->list, &list);
1447 } while (stream.avail_in);
/* Finish the stream into the tail chunk's remaining space. */
1449 stream.next_out = c->buf + c->len;
1450 stream.avail_out = GZ_CHUNK - c->len;
1452 ret = deflate(&stream, Z_FINISH);
1455 * Z_BUF_ERROR is special, it just means we need more
1456 * output space. We'll handle that below. Treat any other
1459 if (ret != Z_BUF_ERROR) {
1460 log_err("fio: deflate log (%d)\n", ret);
1461 flist_del(&c->list);
1468 c->len = GZ_CHUNK - stream.avail_out;
1470 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
/* Keep allocating chunks until Z_FINISH reports Z_STREAM_END. */
1472 if (ret != Z_STREAM_END) {
1474 c = get_new_chunk(seq);
1475 stream.avail_out = GZ_CHUNK;
1476 stream.next_out = c->buf;
1477 ret = deflate(&stream, Z_FINISH);
1478 c->len = GZ_CHUNK - stream.avail_out;
1480 flist_add_tail(&c->list, &list);
1481 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1482 (unsigned long) c->len);
1483 } while (ret != Z_STREAM_END);
1486 dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1488 ret = deflateEnd(&stream);
1490 log_err("fio: deflateEnd %d\n", ret);
/* Samples may still be referenced elsewhere; free them deferred. */
1492 iolog_put_deferred(data->log, data->samples);
1494 if (!flist_empty(&list)) {
1495 pthread_mutex_lock(&data->log->chunk_lock);
1496 flist_splice_tail(&list, &data->log->chunk_list);
1497 pthread_mutex_unlock(&data->log->chunk_lock);
/* Error path: discard any chunks built so far.
 * NOTE(review): flist_first_entry() is passed list.next rather than
 * &list — flist_first_entry already follows ->next internally, so this
 * looks like it starts from the second element; verify against the
 * flist_first_entry definition in flist.h. */
1506 while (!flist_empty(&list)) {
1507 c = flist_first_entry(list.next, struct iolog_compress, list);
1508 flist_del(&c->list);
1516 * Invoked from our compress helper thread, when logging would have exceeded
1517 * the specified memory limitation. Compresses the previously stored
/* Workqueue adapter: recover the flush data from the work item. */
1520 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1522 return gz_work(container_of(work, struct iolog_flush_data, work));
/* Pin the compression worker to log_gz_cpumask when the user set one. */
1525 static int gz_init_worker(struct submit_worker *sw)
1527 struct thread_data *td = sw->wq->td;
1529 if (!fio_option_is_set(&td->o, log_gz_cpumask))
1532 if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1533 log_err("gz: failed to set CPU affinity\n");
/* Ops table for the single-threaded log compression workqueue. */
1540 static struct workqueue_ops log_compress_wq_ops = {
1541 .fn = gz_work_async,
1542 .init_worker_fn = gz_init_worker,
/* Start (or tear down) the one-worker compression workqueue; both are
 * no-ops unless TD_F_COMPRESS_LOG is set on the thread. */
1546 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1548 if (!(td->flags & TD_F_COMPRESS_LOG))
1551 workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
1555 void iolog_compress_exit(struct thread_data *td)
1557 if (!(td->flags & TD_F_COMPRESS_LOG))
1560 workqueue_exit(&td->log_compress_wq);
1564 * Queue work item to compress the existing log entries. We reset the
1565 * current log to a small size, and reference the existing log in the
1566 * data that we queue for compression. Once compression has been done,
1567 * this old log is freed. If called with finish == true, will not return
1568 * until the log compression has completed, and will flush all previous
/* Synchronous variant: compress every queued buffer inline via gz_work. */
1571 static int iolog_flush(struct io_log *log)
1573 struct iolog_flush_data *data;
1575 data = malloc(sizeof(*data));
1582 while (!flist_empty(&log->io_logs)) {
1583 struct io_logs *cur_log;
1585 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1586 flist_del_init(&cur_log->list);
1588 data->samples = cur_log->log;
1589 data->nr_samples = cur_log->nr_samples;
/* Async variant: hand one sample buffer to the compression workqueue;
 * the buffer is detached from cur_log so logging can continue. */
1600 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1602 struct iolog_flush_data *data;
/* smalloc: shared allocation, the workqueue worker may be a process. */
1604 data = smalloc(sizeof(*data));
1610 data->samples = cur_log->log;
1611 data->nr_samples = cur_log->nr_samples;
1614 cur_log->nr_samples = cur_log->max_samples = 0;
1615 cur_log->log = NULL;
1617 workqueue_enqueue(&log->td->log_compress_wq, &data->work);
1619 iolog_free_deferred(log);
/* Non-zlib stubs: flushing/compression init are no-ops (bodies elided). */
1625 static int iolog_flush(struct io_log *log)
1630 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1635 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1640 void iolog_compress_exit(struct thread_data *td)
/* Return the most recently added sample buffer, or NULL-ish (elided)
 * when none exist. */
1646 struct io_logs *iolog_cur_log(struct io_log *log)
1648 if (flist_empty(&log->io_logs))
1651 return flist_last_entry(&log->io_logs, struct io_logs, list);
/* Total sample count across all queued io_logs buffers. */
1654 uint64_t iolog_nr_samples(struct io_log *iolog)
1656 struct flist_head *entry;
1659 flist_for_each(entry, &iolog->io_logs) {
1660 struct io_logs *cur_log;
1662 cur_log = flist_entry(entry, struct io_logs, list);
1663 ret += cur_log->nr_samples;
/* Thin wrapper over finish_log(); guards (elided) skip NULL logs. */
1669 static int __write_log(struct thread_data *td, struct io_log *log, int try)
1672 return finish_log(td, log, try);
/*
 * Per-log-type writers. Each flushes its log via __write_log() and clears
 * the td pointer on success (per the elided return-value checks).
 * iops/bw logs additionally match per_unit_log() against unit_log so
 * unit-rate logs are only written in the matching pass.
 */
1677 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1681 if (per_unit_log(td->iops_log) != unit_log)
1684 ret = __write_log(td, td->iops_log, try);
1686 td->iops_log = NULL;
1691 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1698 ret = __write_log(td, td->slat_log, try);
1700 td->slat_log = NULL;
1705 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1712 ret = __write_log(td, td->clat_log, try);
1714 td->clat_log = NULL;
1719 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1726 ret = __write_log(td, td->clat_hist_log, try);
1728 td->clat_hist_log = NULL;
1733 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1740 ret = __write_log(td, td->lat_log, try);
1747 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1751 if (per_unit_log(td->bw_log) != unit_log)
1754 ret = __write_log(td, td->bw_log, try);
/* Bitmask enum (partially elided) marking which log types have been
 * written, plus the dispatch table pairing each mask with its writer. */
1767 CLAT_HIST_LOG_MASK = 32,
1774 int (*fn)(struct thread_data *, int, bool);
1777 static struct log_type log_types[] = {
1779 .mask = BW_LOG_MASK,
1780 .fn = write_bandw_log,
1783 .mask = LAT_LOG_MASK,
1784 .fn = write_lat_log,
1787 .mask = SLAT_LOG_MASK,
1788 .fn = write_slat_log,
1791 .mask = CLAT_LOG_MASK,
1792 .fn = write_clat_log,
1795 .mask = IOPS_LOG_MASK,
1796 .fn = write_iops_log,
1799 .mask = CLAT_HIST_LOG_MASK,
1800 .fn = write_clat_hist_log,
/*
 * Write out all of a thread's logs. Retries the table of writers until
 * every one succeeds; uses trylock (log_left != 1) while several logs
 * remain so contended files are retried, and breaks out (elided) when a
 * full pass makes no progress. Runstate is bumped to FINISHING meanwhile.
 */
1804 void td_writeout_logs(struct thread_data *td, bool unit_logs)
1806 unsigned int log_mask = 0;
1807 unsigned int log_left = ALL_LOG_NR;
1810 old_state = td_bump_runstate(td, TD_FINISHING);
1812 finalize_logs(td, unit_logs);
1815 int prev_log_left = log_left;
1817 for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1818 struct log_type *lt = &log_types[i];
1821 if (!(log_mask & lt->mask)) {
1822 ret = lt->fn(td, log_left != 1, unit_logs);
1825 log_mask |= lt->mask;
/* No writer made progress this pass: stop retrying. */
1830 if (prev_log_left == log_left)
1834 td_restore_runstate(td, old_state);
/* Write out the logs of every thread (iteration is in elided lines). */
1837 void fio_writeout_logs(bool unit_logs)
1839 struct thread_data *td;
1843 td_writeout_logs(td, unit_logs);