2 * Code related to writing an iolog of what a thread is doing, and to
3 * later read that back and replay
22 #include "lib/roundup.h"
24 #include <netinet/in.h>
25 #include <netinet/tcp.h>
26 #include <arpa/inet.h>
28 #include <sys/socket.h>
/* Forward declaration; the definition of iolog_flush() appears further down. */
31 static int iolog_flush(struct io_log *log);
/*
 * Banner identifying a version 2 iolog. init_iolog_write() prints it right
 * after opening the log, and init_iolog_read() compares the first line it
 * reads against it.
 */
33 static const char iolog_ver2[] = "fio version 2 iolog";
35 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
37 flist_add_tail(&ipo->list, &td->io_log_list);
38 td->total_io_size += ipo->len;
41 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
43 if (!td->o.write_iolog_file)
46 fprintf(td->iolog_f, "%s %s %llu %llu\n", io_u->file->file_name,
47 io_ddir_name(io_u->ddir),
48 io_u->offset, io_u->buflen);
/*
 * Record a file-level event for file f in the iolog being written.
 * 'what' indexes act[] to pick the action word ("add", "open" or "close");
 * the record emitted on td->iolog_f is "<file_name> <action>\n".
 * The visible guard tests td->o.write_iolog_file (its consequent is not
 * visible in this view).
 */
51 void log_file(struct thread_data *td, struct fio_file *f,
52 enum file_log_act what)
54 const char *act[] = { "add", "open", "close" };
58 if (!td->o.write_iolog_file)
63 * this happens on the pre-open/close done before the job starts
68 fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
/*
 * Wait before the next replayed I/O is issued.
 *
 * 'delay' is the requested wait; it is handed to usec_sleep(), so presumably
 * microseconds. The overshoot remembered from a previous call
 * (td->time_offset) is compared with and subtracted from the request. The
 * wait itself is a loop that ends when the remaining delay reaches zero or
 * td->terminate is set. Afterwards the time actually spent is measured and,
 * if it exceeds the original request, the excess is stored in
 * td->time_offset.
 */
71 static void iolog_delay(struct thread_data *td, unsigned long delay)
73 uint64_t usec = utime_since_now(&td->last_issue);
74 unsigned long orig_delay = delay;
78 if (delay < td->time_offset) {
83 delay -= td->time_offset;
89 fio_gettime(&ts, NULL);
90 while (delay && !td->terminate) {
/* NOTE(review): consequent not visible; presumably caps one sleep at 500000. */
92 if (this_delay > 500000)
95 usec_sleep(td, this_delay);
99 usec = utime_since_now(&ts);
100 if (usec > orig_delay)
101 td->time_offset = usec - orig_delay;
/*
 * Handle an io_piece that describes a file action rather than data I/O.
 *
 * The entry's ddir is tested against DDIR_INVAL first (the consequent is not
 * visible here). The target file is looked up through ipo->fileno, and
 * ipo->file_action selects open, close or unlink. A re-open of an already
 * open file is only noted via dprint() when replay_redirect is set. A failed
 * open is reported through td_verror(), and an unknown action is logged as
 * an error. Return values are not visible in this view.
 */
106 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
114 if (ipo->ddir != DDIR_INVAL)
117 f = td->files[ipo->fileno];
119 switch (ipo->file_action) {
120 case FIO_LOG_OPEN_FILE:
121 if (td->o.replay_redirect && fio_file_open(f)) {
122 dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
126 ret = td_io_open_file(td, f);
129 td_verror(td, ret, "iolog open file");
131 case FIO_LOG_CLOSE_FILE:
132 td_io_close_file(td, f);
134 case FIO_LOG_UNLINK_FILE:
135 td_io_unlink_file(td, f);
138 log_err("fio: bad file action %d\n", ipo->file_action);
/* Forward declaration: read_iolog_get() below calls read_iolog2() before its definition. */
145 static bool read_iolog2(struct thread_data *td);
/*
 * Fetch the next replay entry from td->io_log_list and set up io_u from it.
 *
 * In chunked mode (read_iolog_chunked) more entries are pulled in through
 * read_iolog2() once io_log_current reaches io_log_checkmark, and
 * io_log_current is decremented per entry taken. Each entry is unlinked from
 * the list, has its trim entry removed and is passed to ipo_special().
 * For entries that are not DDIR_WAIT, offset, length and file are copied
 * into io_u, a file reference is taken with get_file(), and iolog_delay()
 * applies the entry's delay. Return values are not visible in this view.
 */
147 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
149 struct io_piece *ipo;
150 unsigned long elapsed;
152 while (!flist_empty(&td->io_log_list)) {
154 if (td->o.read_iolog_chunked) {
155 if (td->io_log_checkmark == td->io_log_current) {
156 if (!read_iolog2(td))
159 td->io_log_current--;
161 ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
162 flist_del(&ipo->list);
163 remove_trim_entry(td, ipo);
165 ret = ipo_special(td, ipo);
169 } else if (ret > 0) {
174 io_u->ddir = ipo->ddir;
175 if (ipo->ddir != DDIR_WAIT) {
176 io_u->offset = ipo->offset;
177 io_u->buflen = ipo->len;
178 io_u->file = td->files[ipo->fileno];
179 get_file(io_u->file);
180 dprint(FD_IO, "iolog: get %llu/%llu/%s\n", io_u->offset,
181 io_u->buflen, io_u->file->file_name);
183 iolog_delay(td, ipo->delay);
/*
 * NOTE(review): the branch introducing the lines below is not visible;
 * presumably the DDIR_WAIT case. ipo->delay is compared with
 * mtime_since_genesis() and the difference, times 1000, goes to usec_sleep().
 */
185 elapsed = mtime_since_genesis();
186 if (ipo->delay > elapsed)
187 usec_sleep(td, (ipo->delay - elapsed) * 1000);
192 if (io_u->ddir != DDIR_WAIT)
/*
 * Empty both io history containers of the thread: first the rb-tree
 * td->io_hist_tree, then the list td->io_hist_list. Every entry is unlinked
 * from its container and passed to remove_trim_entry(). What happens to the
 * entry afterwards is not visible in this view.
 */
200 void prune_io_piece_log(struct thread_data *td)
202 struct io_piece *ipo;
203 struct fio_rb_node *n;
205 while ((n = rb_first(&td->io_hist_tree)) != NULL) {
206 ipo = rb_entry(n, struct io_piece, rb_node);
207 rb_erase(n, &td->io_hist_tree);
208 remove_trim_entry(td, ipo);
213 while (!flist_empty(&td->io_hist_list)) {
214 ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
215 flist_del(&ipo->list);
216 remove_trim_entry(td, ipo);
223 * log a successful write, so we can unwind the log for verify
/*
 * Record io_u in the thread's io history.
 *
 * A zeroed io_piece is allocated and filled from io_u (file, offset, buflen,
 * numberio) with IP_F_IN_FLIGHT set. If io_u_should_trim() says so, the piece
 * is also queued on td->trim_list. When file_randommap() is true for the
 * file, the piece is appended to td->io_hist_list and flagged IP_F_ONLIST.
 * Otherwise it is inserted into the rb-tree td->io_hist_tree, ordered by
 * file pointer and then by offset, and flagged IP_F_ONRB. An existing tree
 * entry that overlaps the new range is erased and its trim entry removed
 * (the rest of that branch is not visible here).
 */
225 void log_io_piece(struct thread_data *td, struct io_u *io_u)
227 struct fio_rb_node **p, *parent;
228 struct io_piece *ipo, *__ipo;
230 ipo = calloc(1, sizeof(struct io_piece));
232 ipo->file = io_u->file;
233 ipo->offset = io_u->offset;
234 ipo->len = io_u->buflen;
235 ipo->numberio = io_u->numberio;
236 ipo->flags = IP_F_IN_FLIGHT;
240 if (io_u_should_trim(td, io_u)) {
241 flist_add_tail(&ipo->trim_list, &td->trim_list);
246 * Only sort writes if we don't have a random map in which case we need
247 * to check for duplicate blocks and drop the old one, which we rely on
248 * the rb insert/lookup for handling.
250 if (file_randommap(td, ipo->file)) {
251 INIT_FLIST_HEAD(&ipo->list);
252 flist_add_tail(&ipo->list, &td->io_hist_list);
253 ipo->flags |= IP_F_ONLIST;
258 RB_CLEAR_NODE(&ipo->rb_node);
261 * Sort the entry into the verification list
264 p = &td->io_hist_tree.rb_node;
270 __ipo = rb_entry(parent, struct io_piece, rb_node);
271 if (ipo->file < __ipo->file)
273 else if (ipo->file > __ipo->file)
275 else if (ipo->offset < __ipo->offset) {
277 overlap = ipo->offset + ipo->len > __ipo->offset;
279 else if (ipo->offset > __ipo->offset) {
281 overlap = __ipo->offset + __ipo->len > ipo->offset;
287 dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
288 __ipo->offset, __ipo->len,
289 ipo->offset, ipo->len);
291 rb_erase(parent, &td->io_hist_tree);
292 remove_trim_entry(td, __ipo);
293 if (!(__ipo->flags & IP_F_IN_FLIGHT))
299 rb_link_node(&ipo->rb_node, parent, p);
300 rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
301 ipo->flags |= IP_F_ONRB;
/*
 * Take io_u's io_piece back out of the io history.
 *
 * When block infos are tracked (td->ts.nr_block_infos is non-zero) and the
 * block's state is below BLOCK_STATE_TRIM_FAILURE, the state is set to
 * TRIM_FAILURE for trims and WRITE_FAILURE for writes. The piece is then
 * removed from the rb-tree or the list, whichever its flags say it is on.
 */
305 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
307 struct io_piece *ipo = io_u->ipo;
309 if (td->ts.nr_block_infos) {
310 uint32_t *info = io_u_block_info(td, io_u);
311 if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
312 if (io_u->ddir == DDIR_TRIM)
313 *info = BLOCK_INFO_SET_STATE(*info,
314 BLOCK_STATE_TRIM_FAILURE);
315 else if (io_u->ddir == DDIR_WRITE)
316 *info = BLOCK_INFO_SET_STATE(*info,
317 BLOCK_STATE_WRITE_FAILURE);
324 if (ipo->flags & IP_F_ONRB)
325 rb_erase(&ipo->rb_node, &td->io_hist_tree);
326 else if (ipo->flags & IP_F_ONLIST)
327 flist_del(&ipo->list);
/*
 * Shrink the length recorded in io_u's io_piece to xfer_buflen minus resid,
 * i.e. the requested transfer length less the residual.
 */
334 void trim_io_piece(const struct io_u *io_u)
336 struct io_piece *ipo = io_u->ipo;
341 ipo->len = io_u->xfer_buflen - io_u->resid;
/*
 * Shut down iolog writing for the thread. Only the final reset of
 * td->iolog_buf to NULL is visible in this view.
 */
344 void write_iolog_close(struct thread_data *td)
353 td->iolog_buf = NULL;
/*
 * Work out how many iolog entries to read next in chunked mode.
 *
 * The entries consumed since the last high mark (io_log_highmark minus
 * io_log_current) are scaled by the nanoseconds elapsed since
 * io_log_highmark_time to give a per-second figure (for_1s). The entries
 * still queued are subtracted from that. The high mark, the check mark
 * (half the high mark, rounded up) and the timestamp are then updated.
 * Returns the number of entries to fetch.
 */
356 static int64_t iolog_items_to_fetch(struct thread_data *td)
361 int64_t items_to_fetch;
363 if (!td->io_log_highmark)
367 fio_gettime(&now, NULL);
368 elapsed = ntime_since(&td->io_log_highmark_time, &now);
/* NOTE(review): divides by elapsed; no zero guard is visible here - confirm one exists. */
370 for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
371 items_to_fetch = for_1s - td->io_log_current;
372 if (items_to_fetch < 0)
377 td->io_log_highmark = td->io_log_current + items_to_fetch;
378 td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
379 fio_gettime(&td->io_log_highmark_time, NULL);
381 return items_to_fetch;
385 * Read version 2 iolog data. It is enhanced to include per-file logging,
/*
 * Parse version 2 iolog records from td->io_log_rfile and queue them.
 *
 * Each line is scanned as "<file> <action> <offset> <length>". The actions
 * wait, read, write, sync, datasync and trim are treated as I/O; add, open
 * and close are treated as file actions. With replay_redirect set, the
 * redirect target replaces the file name. Every accepted record becomes an
 * io_piece handed to queue_io_piece(). In chunked mode the number of
 * records read per call comes from iolog_items_to_fetch(), and the
 * high/check marks are refreshed afterwards. Finally td->o.td_ddir is set
 * from whether reads and/or writes were seen.
 */
388 static bool read_iolog2(struct thread_data *td)
390 unsigned long long offset;
392 int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
393 char *rfname, *fname, *act;
396 bool realloc = false;
397 int64_t items_to_fetch = 0;
399 if (td->o.read_iolog_chunked) {
400 items_to_fetch = iolog_items_to_fetch(td);
406 * Read in the read iolog and store it, reuse the infrastructure
407 * for doing verifications.
410 rfname = fname = malloc(256+16);
411 act = malloc(256+16);
413 reads = writes = waits = 0;
/* One record per line; fgets() is given a 4096-byte limit. */
414 while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
415 struct io_piece *ipo;
418 r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
421 if (td->o.replay_redirect)
422 fname = td->o.replay_redirect;
428 if (!strcmp(act, "wait"))
430 else if (!strcmp(act, "read"))
432 else if (!strcmp(act, "write"))
434 else if (!strcmp(act, "sync"))
436 else if (!strcmp(act, "datasync"))
438 else if (!strcmp(act, "trim"))
441 log_err("fio: bad iolog file action: %s\n",
445 fileno = get_fileno(td, fname);
448 if (!strcmp(act, "add")) {
449 if (td->o.replay_redirect &&
450 get_fileno(td, fname) != -1) {
451 dprint(FD_FILE, "iolog: ignoring"
452 " re-add of file %s\n", fname);
454 fileno = add_file(td, fname, td->subjob_number, 1);
455 file_action = FIO_LOG_ADD_FILE;
458 } else if (!strcmp(act, "open")) {
459 fileno = get_fileno(td, fname);
460 file_action = FIO_LOG_OPEN_FILE;
461 } else if (!strcmp(act, "close")) {
462 fileno = get_fileno(td, fname);
463 file_action = FIO_LOG_CLOSE_FILE;
465 log_err("fio: bad iolog file action: %s\n",
470 log_err("bad iolog2: %s\n", p);
476 else if (rw == DDIR_WRITE) {
478 * Don't add a write for ro mode
483 } else if (rw == DDIR_WAIT) {
487 } else if (rw == DDIR_INVAL) {
488 } else if (!ddir_sync(rw)) {
489 log_err("bad ddir: %d\n", rw);
496 ipo = calloc(1, sizeof(*ipo));
499 if (rw == DDIR_WAIT) {
502 if (td->o.replay_scale)
503 ipo->offset = offset / td->o.replay_scale;
505 ipo->offset = offset;
506 ipo_bytes_align(td->o.replay_align, ipo);
/* Grow the per-direction maximum block size when this record exceeds it. */
509 if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw]) {
511 td->o.max_bs[rw] = bytes;
513 ipo->fileno = fileno;
514 ipo->file_action = file_action;
518 queue_io_piece(td, ipo);
520 if (td->o.read_iolog_chunked) {
521 td->io_log_current++;
523 if (items_to_fetch == 0)
532 if (td->o.read_iolog_chunked) {
533 td->io_log_highmark = td->io_log_current;
534 td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
535 fio_gettime(&td->io_log_highmark_time, NULL);
/* Report how many writes were left out of the replay because of read-only mode. */
538 if (writes && read_only) {
539 log_err("fio: <%s> skips replay of %d writes due to"
540 " read-only\n", td->o.name, writes);
544 if (td->o.read_iolog_chunked) {
545 if (td->io_log_current == 0) {
548 td->o.td_ddir = TD_DDIR_RW;
549 if (realloc && td->orig_buffer)
553 init_io_u_buffers(td);
558 if (!reads && !writes && !waits)
560 else if (reads && !writes)
561 td->o.td_ddir = TD_DDIR_READ;
562 else if (!reads && writes)
563 td->o.td_ddir = TD_DDIR_WRITE;
565 td->o.td_ddir = TD_DDIR_RW;
/*
 * Tell whether 'path' names a socket: stat() the path and test the mode with
 * S_ISSOCK(). Handling of a failed stat() is not visible in this view.
 */
570 static bool is_socket(const char *path)
575 r = stat(path, &buf);
579 return S_ISSOCK(buf.st_mode);
/*
 * Connect to the Unix-domain stream socket at 'path'.
 *
 * The path is copied into addr.sun_path with snprintf(); a path that does
 * not fit is reported as too long. The address length given to connect() is
 * strlen(path) plus the size of the sun_family member. Return values are
 * not visible in this view.
 */
582 static int open_socket(const char *path)
584 struct sockaddr_un addr;
587 fd = socket(AF_UNIX, SOCK_STREAM, 0);
591 addr.sun_family = AF_UNIX;
592 if (snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path) >=
593 sizeof(addr.sun_path)) {
594 log_err("%s: path name %s is too long for a Unix socket\n",
598 ret = connect(fd, (const struct sockaddr *)&addr, strlen(path) + sizeof(addr.sun_family));
607 * open iolog, check version, and call appropriate parser
/*
 * Open the iolog named by the read_iolog_file option and start parsing it.
 *
 * The name for this subjob comes from get_name_by_idx(). A socket path is
 * opened with open_socket(); the name "-" is special-cased (its branch body
 * is not visible; presumably stdin); anything else is opened with
 * fopen(..., "r"). The first line must start with the version 2 banner. If
 * so, the current files are released, the stream is stored in
 * td->io_log_rfile and read_iolog2() takes over. Otherwise an error says
 * version 1 is no longer supported.
 */
609 static bool init_iolog_read(struct thread_data *td)
611 char buffer[256], *p, *fname;
614 fname = get_name_by_idx(td->o.read_iolog_file, td->subjob_number);
615 dprint(FD_IO, "iolog: name=%s\n", fname);
617 if (is_socket(fname)) {
620 fd = open_socket(fname);
623 } else if (!strcmp(fname, "-")) {
626 f = fopen(fname, "r");
631 perror("fopen read iolog");
635 p = fgets(buffer, sizeof(buffer), f);
637 td_verror(td, errno, "iolog read");
638 log_err("fio: unable to read iolog\n");
644 * version 2 of the iolog stores a specific string as the
645 * first line, check for that
647 if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2))) {
648 free_release_files(td);
649 td->io_log_rfile = f;
650 return read_iolog2(td);
653 log_err("fio: iolog version 1 is no longer supported\n");
659 * Set up a log for storing io patterns.
/*
 * Prepare the iolog named by write_iolog_file for writing.
 *
 * The file is opened in append mode and given an 8192-byte, fully buffered
 * stdio buffer kept in td->iolog_buf. The version banner is written, then
 * every file known to the job is logged with FIO_LOG_ADD_FILE. Return
 * values are not visible in this view.
 */
661 static bool init_iolog_write(struct thread_data *td)
667 f = fopen(td->o.write_iolog_file, "a");
669 perror("fopen write iolog");
674 * That's it for writing, setup a log buffer and we're done.
677 td->iolog_buf = malloc(8192);
678 setvbuf(f, td->iolog_buf, _IOFBF, 8192);
681 * write our version line
683 if (fprintf(f, "%s\n", iolog_ver2) < 0) {
684 perror("iolog init\n");
689 * add all known files
691 for_each_file(td, ff, i)
692 log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * Set up iolog reading or writing for the thread, depending on which option
 * is set. For reading, a blktrace file is loaded with load_blktrace() and
 * anything else goes through init_iolog_read(). For writing,
 * init_iolog_write() is used. A failure is reported through td_verror() with
 * EINVAL (the test guarding that call is not visible in this view).
 */
697 bool init_iolog(struct thread_data *td)
701 if (td->o.read_iolog_file) {
705 * Check if it's a blktrace file and load that if possible.
706 * Otherwise assume it's a normal log file and load that.
708 if (is_blktrace(td->o.read_iolog_file, &need_swap))
709 ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
711 ret = init_iolog_read(td);
712 } else if (td->o.write_iolog_file)
713 ret = init_iolog_write(td);
718 td_verror(td, EINVAL, "failed initializing iolog");
/*
 * Allocate and initialise an io_log from the parameters in 'p'.
 *
 * The log is allocated with scalloc(), the settings are copied from p and
 * the filename is duplicated. Each data direction gets a histogram window
 * list seeded with one zeroed io_u_plat_entry. If the log has a thread whose
 * submit mode is not IO_MODE_OFFLOAD, a pending sample buffer is allocated
 * with DEF_LOG_ENTRIES samples, or iodepth rounded up to a power of two when
 * iodepth is larger. When log_gz or log_gz_store is set (and the first test
 * on !p->td does not apply), the chunk and deferred-free mutexes are
 * initialised and TD_F_COMPRESS_LOG is set on the thread.
 */
723 void setup_log(struct io_log **log, struct log_params *p,
724 const char *filename)
728 struct io_u_plat_entry *entry;
729 struct flist_head *list;
731 l = scalloc(1, sizeof(*l));
732 INIT_FLIST_HEAD(&l->io_logs);
733 l->log_type = p->log_type;
734 l->log_offset = p->log_offset;
735 l->log_gz = p->log_gz;
736 l->log_gz_store = p->log_gz_store;
737 l->avg_msec = p->avg_msec;
738 l->hist_msec = p->hist_msec;
739 l->hist_coarseness = p->hist_coarseness;
740 l->filename = strdup(filename);
743 /* Initialize histogram lists for each r/w direction,
744 * with initial io_u_plat of all zeros:
746 for (i = 0; i < DDIR_RWDIR_CNT; i++) {
747 list = &l->hist_window[i].list;
748 INIT_FLIST_HEAD(list);
749 entry = calloc(1, sizeof(struct io_u_plat_entry));
750 flist_add(&entry->list, list);
753 if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
754 unsigned int def_samples = DEF_LOG_ENTRIES;
757 __p = calloc(1, sizeof(*l->pending));
758 if (l->td->o.iodepth > DEF_LOG_ENTRIES)
759 def_samples = roundup_pow2(l->td->o.iodepth);
760 __p->max_samples = def_samples;
761 __p->log = calloc(__p->max_samples, log_entry_sz(l));
/* NOTE(review): the condition guarding this assignment is not visible here. */
766 l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
768 INIT_FLIST_HEAD(&l->chunk_list);
770 if (l->log_gz && !p->td)
772 else if (l->log_gz || l->log_gz_store) {
773 mutex_init_pshared(&l->chunk_lock);
774 mutex_init_pshared(&l->deferred_free_lock);
775 p->td->flags |= TD_F_COMPRESS_LOG;
781 #ifdef CONFIG_SETVBUF
/*
 * CONFIG_SETVBUF variant: give stream f a fully buffered stdio buffer of
 * 1048576 bytes via setvbuf(). Allocation of 'buf' and the return value are
 * not visible in this view.
 */
782 static void *set_file_buffer(FILE *f)
784 size_t size = 1048576;
788 setvbuf(f, buf, _IOFBF, size);
/*
 * CONFIG_SETVBUF variant; flush_log() passes the value it got from
 * set_file_buffer(). The body is not visible in this view.
 */
792 static void clear_file_buffer(void *buf)
/*
 * Second definition of set_file_buffer(); presumably the variant built
 * without CONFIG_SETVBUF. The body is not visible in this view.
 */
797 static void *set_file_buffer(FILE *f)
/*
 * Second definition of clear_file_buffer(); presumably the variant built
 * without CONFIG_SETVBUF. The body is not visible in this view.
 */
802 static void clear_file_buffer(void *buf)
/*
 * Tear down an io_log: every io_logs entry is unlinked from log->io_logs,
 * and the sample buffer of the pending log is freed. Further releases are
 * not visible in this view.
 */
807 void free_log(struct io_log *log)
809 while (!flist_empty(&log->io_logs)) {
810 struct io_logs *cur_log;
812 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
813 flist_del_init(&cur_log->list);
819 free(log->pending->log);
/*
 * Add up 'stride' consecutive histogram buckets starting at index j.
 *
 * When io_u_plat_last is non-NULL, each bucket contributes the difference
 * between io_u_plat and io_u_plat_last at that index; otherwise the bucket
 * value from io_u_plat is used as is. A stride of zero or less yields 0.
 */
uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
		uint64_t *io_u_plat_last)
{
	uint64_t total = 0;
	int k;

	for (k = 0; k < stride; k++) {
		uint64_t bucket = io_u_plat[j + k];

		if (io_u_plat_last)
			bucket -= io_u_plat_last[j + k];
		total += bucket;
	}

	return total;
}
/*
 * Write histogram samples to f, one text line per sample.
 *
 * Buckets are merged in groups of stride = 1 << hist_coarseness. Whether the
 * samples carry an offset is read from the LOG_OFFSET_SAMPLE_BIT of the
 * first sample, which fixes the per-sample size used to count samples. Each
 * line holds time, direction and block size, followed by hist_sum() of every
 * bucket group. The sums are taken relative to 'entry_before', the entry
 * found through flist_first_entry() on the sample's own list node, which is
 * unlinked afterwards.
 */
846 static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
847 uint64_t sample_size)
851 uint64_t i, j, nr_samples;
852 struct io_u_plat_entry *entry, *entry_before;
854 uint64_t *io_u_plat_before;
856 int stride = 1 << hist_coarseness;
861 s = __get_sample(samples, 0, 0);
862 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
864 nr_samples = sample_size / __log_entry_sz(log_offset);
866 for (i = 0; i < nr_samples; i++) {
867 s = __get_sample(samples, log_offset, i);
869 entry = s->data.plat_entry;
870 io_u_plat = entry->io_u_plat;
872 entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
873 io_u_plat_before = entry_before->io_u_plat;
875 fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
876 io_sample_ddir(s), (unsigned long long) s->bs);
/* All groups but the last end in ", "; the last one ends the line. */
877 for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
878 fprintf(f, "%llu, ", (unsigned long long)
879 hist_sum(j, stride, io_u_plat, io_u_plat_before));
881 fprintf(f, "%llu\n", (unsigned long long)
882 hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
885 flist_del(&entry_before->list);
/*
 * Write plain log samples to f, one text line per sample.
 *
 * Whether the samples carry an offset is read from the LOG_OFFSET_SAMPLE_BIT
 * of the first sample, which fixes the per-sample size used to count
 * samples. Two line formats are visible. One has five fields: time, a
 * 64-bit value, direction, block size and priority bit. The other has an
 * additional offset field, read through struct io_sample_offset. The
 * argument supplying the 64-bit value is not visible in this view.
 */
890 void flush_samples(FILE *f, void *samples, uint64_t sample_size)
894 uint64_t i, nr_samples;
899 s = __get_sample(samples, 0, 0);
900 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
902 nr_samples = sample_size / __log_entry_sz(log_offset);
904 for (i = 0; i < nr_samples; i++) {
905 s = __get_sample(samples, log_offset, i);
908 fprintf(f, "%lu, %" PRId64 ", %u, %llu, %u\n",
909 (unsigned long) s->time,
911 io_sample_ddir(s), (unsigned long long) s->bs, s->priority_bit);
913 struct io_sample_offset *so = (void *) s;
915 fprintf(f, "%lu, %" PRId64 ", %u, %llu, %llu, %u\n",
916 (unsigned long) s->time,
918 io_sample_ddir(s), (unsigned long long) s->bs,
919 (unsigned long long) so->offset, s->priority_bit);
/*
 * Work item for log compression. gz_work_async() recovers it from the
 * embedded 'work' member via container_of(); gz_work() reads its log,
 * samples and nr_samples members (their declarations are not visible in
 * this view).
 */
926 struct iolog_flush_data {
927 struct workqueue_work work;
/* Size in bytes of the output buffer that get_new_chunk() allocates per chunk. */
934 #define GZ_CHUNK 131072
/*
 * Allocate a compressed-log chunk with an initialised list node and a
 * GZ_CHUNK-byte buffer. How 'seq' is stored and the return statement are
 * not visible in this view.
 */
936 static struct iolog_compress *get_new_chunk(unsigned int seq)
938 struct iolog_compress *c;
940 c = malloc(sizeof(*c));
941 INIT_FLIST_HEAD(&c->list);
942 c->buf = malloc(GZ_CHUNK);
/*
 * Counterpart of get_new_chunk(); presumably releases the chunk and its
 * buffer. The body is not visible in this view.
 */
948 static void free_chunk(struct iolog_compress *ic)
/*
 * Reset a zlib stream and initialise it for inflation with inflateInit2().
 * The window-bits value (wbits) is set up in lines not visible here;
 * according to the comment below, gz_hdr adds 32 to it to enable header
 * auto-detection. Return values are not visible in this view.
 */
954 static int z_stream_init(z_stream *stream, int gz_hdr)
958 memset(stream, 0, sizeof(*stream));
959 stream->zalloc = Z_NULL;
960 stream->zfree = Z_NULL;
961 stream->opaque = Z_NULL;
962 stream->next_in = Z_NULL;
965 * zlib magic - add 32 for auto-detection of gz header or not,
966 * if we decide to store files in a gzip friendly format.
971 if (inflateInit2(stream, wbits) != Z_OK)
/*
 * State carried across inflate_chunk() calls. The code in this file uses the
 * members seq, buf, buf_size, buf_used and chunk_sz (declarations not
 * visible in this view).
 */
977 struct inflate_chunk_iter {
/*
 * Finish the sequence currently being inflated: end the zlib stream
 * (logging an error on failure), write the buf_used bytes accumulated in
 * iter->buf to f with flush_samples(), and reset the buffer bookkeeping.
 */
986 static void finish_chunk(z_stream *stream, FILE *f,
987 struct inflate_chunk_iter *iter)
991 ret = inflateEnd(stream);
993 log_err("fio: failed to end log inflation seq %d (%d)\n",
996 flush_samples(f, iter->buf, iter->buf_used);
999 iter->buf_size = iter->buf_used = 0;
1003 * Iterative chunk inflation. Handles cases where we cross into a new
1004 * sequence, doing flush finish of previous chunk if needed.
/*
 * Feed one compressed chunk (ic) into the inflate stream.
 *
 * When the chunk's sequence number differs from iter->seq, finish_chunk()
 * is called (its guard is not visible here), the stream is re-initialised
 * and the new sequence is adopted. Output is collected in iter->buf, which
 * starts at iter->chunk_sz bytes and grows by the same amount whenever
 * inflate() leaves no output space. 'ret' is computed as the distance
 * stream->next_in advanced from ic->buf, i.e. the input consumed; the
 * return statement itself is not visible in this view.
 */
1006 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
1007 z_stream *stream, struct inflate_chunk_iter *iter)
1011 dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
1012 (unsigned long) ic->len, ic->seq);
1014 if (ic->seq != iter->seq) {
1016 finish_chunk(stream, f, iter);
1018 z_stream_init(stream, gz_hdr);
1019 iter->seq = ic->seq;
1022 stream->avail_in = ic->len;
1023 stream->next_in = ic->buf;
1025 if (!iter->buf_size) {
1026 iter->buf_size = iter->chunk_sz;
1027 iter->buf = malloc(iter->buf_size);
1030 while (stream->avail_in) {
1031 size_t this_out = iter->buf_size - iter->buf_used;
1034 stream->avail_out = this_out;
1035 stream->next_out = iter->buf + iter->buf_used;
1037 err = inflate(stream, Z_NO_FLUSH);
1039 log_err("fio: failed inflating log: %d\n", err);
1044 iter->buf_used += this_out - stream->avail_out;
/* NOTE(review): realloc() result overwrites iter->buf directly; no failure check is visible. */
1046 if (!stream->avail_out) {
1047 iter->buf_size += iter->chunk_sz;
1048 iter->buf = realloc(iter->buf, iter->buf_size);
1052 if (err == Z_STREAM_END)
1056 ret = (void *) stream->next_in - ic->buf;
1058 dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
1064 * Inflate stored compressed chunks, or write them directly to the log
1065 * file if so instructed.
/*
 * Drain log->chunk_list into f. With log_gz_store set, each chunk's
 * compressed bytes are written to f with fwrite(), and a failed write is
 * logged. Chunks are also passed through inflate_chunk(), using log->log_gz
 * as the output growth step; the branch structure around that call and the
 * final finish_chunk() is not fully visible in this view.
 */
1067 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1069 struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
1072 while (!flist_empty(&log->chunk_list)) {
1073 struct iolog_compress *ic;
1075 ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
1076 flist_del(&ic->list);
1078 if (log->log_gz_store) {
1081 dprint(FD_COMPRESS, "log write chunk size=%lu, "
1082 "seq=%u\n", (unsigned long) ic->len, ic->seq);
1084 ret = fwrite(ic->buf, ic->len, 1, f);
1085 if (ret != 1 || ferror(f)) {
1087 log_err("fio: error writing compressed log\n");
1090 inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
1096 finish_chunk(&stream, f, &iter);
1104 * Open compressed log file and decompress the stored chunks and
1105 * write them to stdout. The chunks are stored sequentially in the
1106 * file, so we iterate over them and do them one-by-one.
/*
 * Decompress a stored compressed log file to stdout.
 *
 * The file is opened for reading, its size taken from stat(), and the whole
 * content read into one malloc'ed buffer that is treated as a single
 * iolog_compress chunk. inflate_chunk() is then called on it (the
 * surrounding loop is not visible here), with the output buffer growing in
 * steps of 64 * 1024 * 1024 bytes, and finish_chunk() flushes the tail.
 * Return values are not visible in this view.
 */
1108 int iolog_file_inflate(const char *file)
1110 struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
1111 struct iolog_compress ic;
1119 f = fopen(file, "r");
1125 if (stat(file, &sb) < 0) {
1131 ic.buf = buf = malloc(sb.st_size);
1132 ic.len = sb.st_size;
1135 ret = fread(ic.buf, ic.len, 1, f);
1136 if (ret == 0 && ferror(f)) {
1141 } else if (ferror(f) || (!feof(f) && ret != 1)) {
1142 log_err("fio: short read on reading log\n");
1151 * Each chunk will return Z_STREAM_END. We don't know how many
1152 * chunks are in the file, so we just keep looping and incrementing
1153 * the sequence number until we have consumed the whole compressed
1160 iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
1173 finish_chunk(&stream, stdout, &iter);
/*
 * Second definition of inflate_gz_chunks(); presumably the variant built
 * without zlib. The body is not visible in this view.
 */
1183 static int inflate_gz_chunks(struct io_log *log, FILE *f)
/*
 * Variant of iolog_file_inflate() for builds without zlib: it only reports
 * that inflation is not possible. The return value is not visible here.
 */
1188 int iolog_file_inflate(const char *file)
1190 log_err("fio: log inflation not possible without zlib\n");
/*
 * Write the contents of 'log' to log->filename.
 *
 * The file is opened with mode "w" or "a" (the test selecting between them
 * is not visible; presumably do_append). After set_file_buffer() any stored
 * compressed chunks are written through inflate_gz_chunks(). Every io_logs
 * entry is then unlinked and written: with flush_hist_samples() when this
 * log is the thread's clat_hist_log, otherwise with flush_samples().
 * clear_file_buffer() is called with the buffer at the end.
 */
1196 void flush_log(struct io_log *log, bool do_append)
1202 f = fopen(log->filename, "w");
1204 f = fopen(log->filename, "a");
1206 perror("fopen log");
1210 buf = set_file_buffer(f);
1212 inflate_gz_chunks(log, f);
1214 while (!flist_empty(&log->io_logs)) {
1215 struct io_logs *cur_log;
1217 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1218 flist_del_init(&cur_log->list);
1220 if (log->td && log == log->td->clat_hist_log)
1221 flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1222 log_sample_sz(log, cur_log));
1224 flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1230 clear_file_buffer(buf);
/*
 * Write out one log while holding the lock keyed on its filename.
 *
 * With TD_F_COMPRESS_LOG set, an extra step runs first (not visible here).
 * The lock is taken with fio_trylock_file() or fio_lock_file(); presumably
 * 'trylock' selects which. GUI clients and backends send the log with
 * fio_send_iolog(). flush_log() writes it locally, appending when
 * per_job_logs is off; whether the two are alternatives is not visible.
 * The lock is released afterwards.
 */
1233 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1235 if (td->flags & TD_F_COMPRESS_LOG)
1239 if (fio_trylock_file(log->filename))
1242 fio_lock_file(log->filename);
1244 if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1245 fio_send_iolog(td, log, log->filename);
1247 flush_log(log, !td->o.per_job_logs);
1249 fio_unlock_file(log->filename);
/*
 * Walk log->chunk_list under chunk_lock, visiting every iolog_compress
 * entry; going by the name and return type, presumably to total their
 * sizes. The empty-list check runs before the lock is taken. The
 * accumulation and return statements are not visible in this view.
 */
1254 size_t log_chunk_sizes(struct io_log *log)
1256 struct flist_head *entry;
1259 if (flist_empty(&log->chunk_list))
1263 pthread_mutex_lock(&log->chunk_lock);
1264 flist_for_each(entry, &log->chunk_list) {
1265 struct iolog_compress *c;
1267 c = flist_entry(entry, struct iolog_compress, list);
1270 pthread_mutex_unlock(&log->chunk_lock);
/*
 * Park 'ptr' in log->deferred_items so it can be freed later by
 * iolog_free_deferred(). The slot array holds at most IOLOG_MAX_DEFER
 * pointers; when it is full, a one-time warning (FIO_WARN_IOLOG_DROP) is
 * logged instead. Runs under deferred_free_lock.
 */
1276 static void iolog_put_deferred(struct io_log *log, void *ptr)
1281 pthread_mutex_lock(&log->deferred_free_lock);
1282 if (log->deferred < IOLOG_MAX_DEFER) {
1283 log->deferred_items[log->deferred] = ptr;
1285 } else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
1286 log_err("fio: had to drop log entry free\n");
1287 pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Free every pointer parked in log->deferred_items and clear the slots,
 * under deferred_free_lock. Resetting of the counter is not visible in
 * this view.
 */
1290 static void iolog_free_deferred(struct io_log *log)
1297 pthread_mutex_lock(&log->deferred_free_lock);
1299 for (i = 0; i < log->deferred; i++) {
1300 free(log->deferred_items[i]);
1301 log->deferred_items[i] = NULL;
1305 pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Compress the samples referenced by 'data' into GZ_CHUNK-sized chunks.
 *
 * A deflate stream is initialised and the log's chunk_seq is incremented to
 * obtain a new sequence number. The input is nr_samples entries of
 * log_entry_sz() bytes each. Input is deflated with Z_NO_FLUSH into freshly
 * allocated chunks collected on a local list until no input remains. The
 * stream is then finished with Z_FINISH, first into the remaining space of
 * the last chunk and then into additional chunks until Z_STREAM_END. On
 * success the sample buffer is handed to iolog_put_deferred() and the local
 * chunks are spliced onto log->chunk_list under chunk_lock. The tail of the
 * function drains chunks still on the local list. Return values are not
 * visible in this view.
 */
1308 static int gz_work(struct iolog_flush_data *data)
1310 struct iolog_compress *c = NULL;
1311 struct flist_head list;
1317 INIT_FLIST_HEAD(&list);
1319 memset(&stream, 0, sizeof(stream));
1320 stream.zalloc = Z_NULL;
1321 stream.zfree = Z_NULL;
1322 stream.opaque = Z_NULL;
1324 ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1326 log_err("fio: failed to init gz stream\n");
1330 seq = ++data->log->chunk_seq;
1332 stream.next_in = (void *) data->samples;
1333 stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1335 dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
1336 (unsigned long) stream.avail_in, seq,
1337 data->log->filename);
1340 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1341 (unsigned long) c->len);
1342 c = get_new_chunk(seq);
1343 stream.avail_out = GZ_CHUNK;
1344 stream.next_out = c->buf;
1345 ret = deflate(&stream, Z_NO_FLUSH);
1347 log_err("fio: deflate log (%d)\n", ret);
1352 c->len = GZ_CHUNK - stream.avail_out;
1353 flist_add_tail(&c->list, &list);
1355 } while (stream.avail_in);
/* Finish into whatever space is left in the last chunk first. */
1357 stream.next_out = c->buf + c->len;
1358 stream.avail_out = GZ_CHUNK - c->len;
1360 ret = deflate(&stream, Z_FINISH);
1363 * Z_BUF_ERROR is special, it just means we need more
1364 * output space. We'll handle that below. Treat any other
1367 if (ret != Z_BUF_ERROR) {
1368 log_err("fio: deflate log (%d)\n", ret);
1369 flist_del(&c->list);
1376 c->len = GZ_CHUNK - stream.avail_out;
1378 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
1380 if (ret != Z_STREAM_END) {
1382 c = get_new_chunk(seq);
1383 stream.avail_out = GZ_CHUNK;
1384 stream.next_out = c->buf;
1385 ret = deflate(&stream, Z_FINISH);
1386 c->len = GZ_CHUNK - stream.avail_out;
1388 flist_add_tail(&c->list, &list);
1389 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1390 (unsigned long) c->len);
1391 } while (ret != Z_STREAM_END);
1394 dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1396 ret = deflateEnd(&stream);
1398 log_err("fio: deflateEnd %d\n", ret);
1400 iolog_put_deferred(data->log, data->samples);
1402 if (!flist_empty(&list)) {
1403 pthread_mutex_lock(&data->log->chunk_lock);
1404 flist_splice_tail(&list, &data->log->chunk_list);
1405 pthread_mutex_unlock(&data->log->chunk_lock);
/*
 * NOTE(review): every other flist_first_entry() call in this file is given
 * the list head itself (e.g. &log->io_logs); the call below is given
 * list.next instead. If the macro takes ->next of its argument, this picks
 * the entry after the first one (or wraps to the head when a single chunk
 * is queued). Confirm against the macro and consider passing &list.
 */
1414 while (!flist_empty(&list)) {
1415 c = flist_first_entry(list.next, struct iolog_compress, list);
1416 flist_del(&c->list);
1424 * Invoked from our compress helper thread, when logging would have exceeded
1425 * the specified memory limitation. Compresses the previously stored
1428 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1430 return gz_work(container_of(work, struct iolog_flush_data, work));
/*
 * Per-worker setup for the compression workqueue. If the log_gz_cpumask
 * option is set, the worker thread's CPU affinity is set to that mask, and a
 * failure is logged. Return values are not visible in this view.
 */
1433 static int gz_init_worker(struct submit_worker *sw)
1435 struct thread_data *td = sw->wq->td;
1437 if (!fio_option_is_set(&td->o, log_gz_cpumask))
1440 if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1441 log_err("gz: failed to set CPU affinity\n");
/*
 * Callbacks for the log-compression workqueue: gz_work_async() handles each
 * work item and gz_init_worker() sets up each worker.
 */
1448 static struct workqueue_ops log_compress_wq_ops = {
1449 .fn = gz_work_async,
1450 .init_worker_fn = gz_init_worker,
/*
 * Start the log-compression workqueue for the thread. The TD_F_COMPRESS_LOG
 * flag is tested first (consequent not visible here); the queue is
 * initialised with log_compress_wq_ops and a worker count argument of 1.
 * Return values are not visible in this view.
 */
1454 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1456 if (!(td->flags & TD_F_COMPRESS_LOG))
1459 workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
1463 void iolog_compress_exit(struct thread_data *td)
1465 if (!(td->flags & TD_F_COMPRESS_LOG))
1468 workqueue_exit(&td->log_compress_wq);
1472 * Queue work item to compress the existing log entries. We reset the
1473 * current log to a small size, and reference the existing log in the
1474 * data that we queue for compression. Once compression has been done,
1475 * this old log is freed. If called with finish == true, will not return
1476 * until the log compression has completed, and will flush all previous
/*
 * Flush every io_logs entry of 'log' through a malloc'ed iolog_flush_data.
 * Each entry is unlinked from log->io_logs, and its sample buffer and sample
 * count are copied into the flush data. What is done with the flush data
 * per entry, and the return value, are not visible in this view.
 */
1479 static int iolog_flush(struct io_log *log)
1481 struct iolog_flush_data *data;
1483 data = malloc(sizeof(*data));
1490 while (!flist_empty(&log->io_logs)) {
1491 struct io_logs *cur_log;
1493 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1494 flist_del_init(&cur_log->list);
1496 data->samples = cur_log->log;
1497 data->nr_samples = cur_log->nr_samples;
/*
 * Hand the samples of cur_log to the compression workqueue.
 *
 * A flush-data item is allocated with smalloc() and takes over cur_log's
 * sample buffer and count; cur_log is reset to an empty state (no buffer,
 * zero samples, zero capacity). The item is queued on the thread's
 * log_compress_wq, and previously deferred frees are processed with
 * iolog_free_deferred(). Return values are not visible in this view.
 */
1508 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1510 struct iolog_flush_data *data;
1512 data = smalloc(sizeof(*data));
1518 data->samples = cur_log->log;
1519 data->nr_samples = cur_log->nr_samples;
1522 cur_log->nr_samples = cur_log->max_samples = 0;
1523 cur_log->log = NULL;
1525 workqueue_enqueue(&log->td->log_compress_wq, &data->work);
1527 iolog_free_deferred(log);
/*
 * Second definition of iolog_flush(); presumably the variant built without
 * zlib. The body is not visible in this view.
 */
1533 static int iolog_flush(struct io_log *log)
/*
 * Second definition of iolog_cur_flush(); presumably the variant built
 * without zlib. The body is not visible in this view.
 */
1538 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
/*
 * Second definition of iolog_compress_init(); presumably the variant built
 * without zlib. The body is not visible in this view.
 */
1543 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
/*
 * Second definition of iolog_compress_exit(); presumably the variant built
 * without zlib. The body is not visible in this view.
 */
1548 void iolog_compress_exit(struct thread_data *td)
/*
 * Return the last io_logs entry on log->io_logs. The empty-list case is
 * tested first; its result is not visible in this view.
 */
1554 struct io_logs *iolog_cur_log(struct io_log *log)
1556 if (flist_empty(&log->io_logs))
1559 return flist_last_entry(&log->io_logs, struct io_logs, list);
/*
 * Add up nr_samples over every io_logs entry on iolog->io_logs. The
 * initialisation and return of 'ret' are not visible in this view.
 */
1562 uint64_t iolog_nr_samples(struct io_log *iolog)
1564 struct flist_head *entry;
1567 flist_for_each(entry, &iolog->io_logs) {
1568 struct io_logs *cur_log;
1570 cur_log = flist_entry(entry, struct io_logs, list);
1571 ret += cur_log->nr_samples;
/*
 * Thin wrapper around finish_log(); 'try' is passed on as its trylock
 * argument. The guard around the call and the fallback return are not
 * visible in this view.
 */
1577 static int __write_log(struct thread_data *td, struct io_log *log, int try)
1580 return finish_log(td, log, try);
/*
 * Write out the thread's IOPS log. The visible test compares
 * per_unit_log(td->iops_log) with unit_log (consequent not visible here).
 * The log goes through __write_log() and td->iops_log is cleared afterwards
 * (the condition for clearing is not visible here).
 */
1585 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1589 if (per_unit_log(td->iops_log) != unit_log)
1592 ret = __write_log(td, td->iops_log, try);
1594 td->iops_log = NULL;
/*
 * Write out the thread's submission-latency log through __write_log() and
 * clear td->slat_log afterwards. Guards are not visible in this view.
 */
1599 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1606 ret = __write_log(td, td->slat_log, try);
1608 td->slat_log = NULL;
/*
 * Write out the thread's completion-latency log through __write_log() and
 * clear td->clat_log afterwards. Guards are not visible in this view.
 */
1613 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1620 ret = __write_log(td, td->clat_log, try);
1622 td->clat_log = NULL;
/*
 * Write out the thread's completion-latency histogram log through
 * __write_log() and clear td->clat_hist_log afterwards. Guards are not
 * visible in this view.
 */
1627 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1634 ret = __write_log(td, td->clat_hist_log, try);
1636 td->clat_hist_log = NULL;
/*
 * Write out the thread's total-latency log through __write_log(). Guards
 * and any follow-up are not visible in this view.
 */
1641 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1648 ret = __write_log(td, td->lat_log, try);
/*
 * Write out the thread's bandwidth log. The visible test compares
 * per_unit_log(td->bw_log) with unit_log (consequent not visible here);
 * the log goes through __write_log().
 */
1655 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1659 if (per_unit_log(td->bw_log) != unit_log)
1662 ret = __write_log(td, td->bw_log, try);
/* Mask bit for the clat histogram log; log_types[] pairs it with write_clat_hist_log(). */
1675 CLAT_HIST_LOG_MASK = 32,
/* Writer callback; td_writeout_logs() calls it as fn(td, try, unit_logs). */
1682 int (*fn)(struct thread_data *, int, bool);
/*
 * Table pairing each log's mask bit with the function that writes that log;
 * td_writeout_logs() iterates over it.
 */
1685 static struct log_type log_types[] = {
1687 .mask = BW_LOG_MASK,
1688 .fn = write_bandw_log,
1691 .mask = LAT_LOG_MASK,
1692 .fn = write_lat_log,
1695 .mask = SLAT_LOG_MASK,
1696 .fn = write_slat_log,
1699 .mask = CLAT_LOG_MASK,
1700 .fn = write_clat_log,
1703 .mask = IOPS_LOG_MASK,
1704 .fn = write_iops_log,
1707 .mask = CLAT_HIST_LOG_MASK,
1708 .fn = write_clat_hist_log,
/*
 * Write out all logs of a thread.
 *
 * The thread's run state is bumped to TD_FINISHING for the duration and
 * restored at the end. After finalize_logs(), the log_types[] table is
 * walked repeatedly. A log whose mask bit is not yet set in log_mask has its
 * writer called; the 'try' argument is non-zero while more than one log is
 * left (log_left != 1) and zero for the last one. The bit is recorded in
 * log_mask (the success test is not visible here). A pass that makes no
 * progress (prev_log_left == log_left) is detected; what follows is not
 * visible in this view.
 */
1712 void td_writeout_logs(struct thread_data *td, bool unit_logs)
1714 unsigned int log_mask = 0;
1715 unsigned int log_left = ALL_LOG_NR;
1718 old_state = td_bump_runstate(td, TD_FINISHING);
1720 finalize_logs(td, unit_logs);
1723 int prev_log_left = log_left;
1725 for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1726 struct log_type *lt = &log_types[i];
1729 if (!(log_mask & lt->mask)) {
1730 ret = lt->fn(td, log_left != 1, unit_logs);
1733 log_mask |= lt->mask;
1738 if (prev_log_left == log_left)
1742 td_restore_runstate(td, old_state);
/*
 * Call td_writeout_logs() with the given unit_logs flag. The iteration that
 * supplies 'td' is not visible in this view; presumably a loop over all
 * threads.
 */
1745 void fio_writeout_logs(bool unit_logs)
1747 struct thread_data *td;
1751 td_writeout_logs(td, unit_logs);