/*
 * Code related to writing an iolog of what a thread is doing, and to
 * later read that back and replay
 */
#include "lib/roundup.h"

#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/un.h>
static int iolog_flush(struct io_log *log);

static const char iolog_ver2[] = "fio version 2 iolog";
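
/*
 * Illustrative sample of the version 2 format (the device name is made up):
 * the first line is the version string above, followed by one line per
 * file action or I/O, as written by log_file() and log_io_u() below.
 *
 *	fio version 2 iolog
 *	/dev/sdb add
 *	/dev/sdb open
 *	/dev/sdb write 0 4096
 *	/dev/sdb read 4096 4096
 *	/dev/sdb close
 */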
void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
{
	flist_add_tail(&ipo->list, &td->io_log_list);
	td->total_io_size += ipo->len;
}
void log_io_u(const struct thread_data *td, const struct io_u *io_u)
{
	if (!td->o.write_iolog_file)
		return;

	fprintf(td->iolog_f, "%s %s %llu %llu\n", io_u->file->file_name,
						io_ddir_name(io_u->ddir),
						io_u->offset, io_u->buflen);
}
void log_file(struct thread_data *td, struct fio_file *f,
	      enum file_log_act what)
{
	const char *act[] = { "add", "open", "close" };

	assert(what < 3);

	if (!td->o.write_iolog_file)
		return;

	/*
	 * this happens on the pre-open/close done before the job starts
	 */
	if (!td->runstate)
		return;

	fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
}
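
/*
 * Replay a recorded delay: sleep in slices of at most 500ms so that
 * td->terminate is noticed promptly, and remember any oversleep in
 * td->time_offset so it is credited against the next delay.
 */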
static void iolog_delay(struct thread_data *td, unsigned long delay)
{
	uint64_t usec = utime_since_now(&td->last_issue);
	unsigned long orig_delay = delay;
	uint64_t this_delay;
	struct timespec ts;

	if (delay < td->time_offset) {
		td->time_offset = 0;
		return;
	}

	delay -= td->time_offset;
	if (delay < usec)
		return;

	delay -= usec;

	fio_gettime(&ts, NULL);
	while (delay && !td->terminate) {
		this_delay = delay;
		if (this_delay > 500000)
			this_delay = 500000;

		usec_sleep(td, this_delay);
		delay -= this_delay;
	}

	usec = utime_since_now(&ts);
	if (usec > orig_delay)
		td->time_offset = usec - orig_delay;
	else
		td->time_offset = 0;
}
static int ipo_special(struct thread_data *td, struct io_piece *ipo)
{
	struct fio_file *f;
	int ret;

	/*
	 * Not a special ipo
	 */
	if (ipo->ddir != DDIR_INVAL)
		return 0;

	f = td->files[ipo->fileno];

	switch (ipo->file_action) {
	case FIO_LOG_OPEN_FILE:
		if (td->o.replay_redirect && fio_file_open(f)) {
			dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
					f->file_name);
			break;
		}
		ret = td_io_open_file(td, f);
		if (!ret)
			break;
		td_verror(td, ret, "iolog open file");
		return -1;
	case FIO_LOG_CLOSE_FILE:
		td_io_close_file(td, f);
		break;
	case FIO_LOG_UNLINK_FILE:
		td_io_unlink_file(td, f);
		break;
	default:
		log_err("fio: bad file action %d\n", ipo->file_action);
		break;
	}

	return 1;
}
static bool read_iolog2(struct thread_data *td);
int read_iolog_get(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo;
	unsigned long elapsed;

	while (!flist_empty(&td->io_log_list)) {
		int ret;

		if (td->o.read_iolog_chunked) {
			if (td->io_log_checkmark == td->io_log_current) {
				if (!read_iolog2(td))
					return 1;
			}
			td->io_log_current--;
		}

		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);

		ret = ipo_special(td, ipo);
		if (ret < 0) {
			free(ipo);
			break;
		} else if (ret > 0) {
			free(ipo);
			continue;
		}

		io_u->ddir = ipo->ddir;
		if (ipo->ddir != DDIR_WAIT) {
			io_u->offset = ipo->offset;
			io_u->verify_offset = ipo->offset;
			io_u->buflen = ipo->len;
			io_u->file = td->files[ipo->fileno];
			get_file(io_u->file);
			dprint(FD_IO, "iolog: get %llu/%llu/%s\n", io_u->offset,
						io_u->buflen, io_u->file->file_name);
			if (ipo->delay)
				iolog_delay(td, ipo->delay);
		} else {
			elapsed = mtime_since_genesis();
			if (ipo->delay > elapsed)
				usec_sleep(td, (ipo->delay - elapsed) * 1000);
		}

		free(ipo);

		if (io_u->ddir != DDIR_WAIT)
			return 0;
	}

	td->io_log_current = 0;

	return 1;
}
void prune_io_piece_log(struct thread_data *td)
{
	struct io_piece *ipo;
	struct fio_rb_node *n;

	while ((n = rb_first(&td->io_hist_tree)) != NULL) {
		ipo = rb_entry(n, struct io_piece, rb_node);
		rb_erase(n, &td->io_hist_tree);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}

	while (!flist_empty(&td->io_hist_list)) {
		ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}
}
/*
 * log a successful write, so we can unwind the log for verify
 */
void log_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct fio_rb_node **p, *parent;
	struct io_piece *ipo, *__ipo;

	ipo = calloc(1, sizeof(struct io_piece));
	init_ipo(ipo);
	ipo->file = io_u->file;
	ipo->offset = io_u->offset;
	ipo->len = io_u->buflen;
	ipo->numberio = io_u->numberio;
	ipo->flags = IP_F_IN_FLIGHT;

	io_u->ipo = ipo;

	if (io_u_should_trim(td, io_u)) {
		flist_add_tail(&ipo->trim_list, &td->trim_list);
		td->trim_entries++;
	}

	/*
	 * Only sort writes if we don't have a random map, in which case we
	 * need to check for duplicate blocks and drop the old one, which we
	 * rely on the rb insert/lookup for handling.
	 */
	if (file_randommap(td, ipo->file)) {
		INIT_FLIST_HEAD(&ipo->list);
		flist_add_tail(&ipo->list, &td->io_hist_list);
		ipo->flags |= IP_F_ONLIST;
		td->io_hist_len++;
		return;
	}

	RB_CLEAR_NODE(&ipo->rb_node);

	/*
	 * Sort the entry into the verification list
	 */
restart:
	p = &td->io_hist_tree.rb_node;
	parent = NULL;
	while (*p) {
		bool overlap = false;
		parent = *p;

		__ipo = rb_entry(parent, struct io_piece, rb_node);
		if (ipo->file < __ipo->file)
			p = &(*p)->rb_left;
		else if (ipo->file > __ipo->file)
			p = &(*p)->rb_right;
		else if (ipo->offset < __ipo->offset) {
			p = &(*p)->rb_left;
			overlap = ipo->offset + ipo->len > __ipo->offset;
		} else if (ipo->offset > __ipo->offset) {
			p = &(*p)->rb_right;
			overlap = __ipo->offset + __ipo->len > ipo->offset;
		} else
			overlap = true;
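
		/*
		 * A new write overlapping an older entry invalidates the old
		 * data for verify purposes: evict the stale entry and re-walk
		 * the tree from the root before inserting.
		 */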
		if (overlap) {
			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
				__ipo->offset, __ipo->len,
				ipo->offset, ipo->len);
			td->io_hist_len--;
			rb_erase(parent, &td->io_hist_tree);
			remove_trim_entry(td, __ipo);
			if (!(__ipo->flags & IP_F_IN_FLIGHT))
				free(__ipo);
			goto restart;
		}
	}

	rb_link_node(&ipo->rb_node, parent, p);
	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
	ipo->flags |= IP_F_ONRB;
	td->io_hist_len++;
}
void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (td->ts.nr_block_infos) {
		uint32_t *info = io_u_block_info(td, io_u);

		if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
			if (io_u->ddir == DDIR_TRIM)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_TRIM_FAILURE);
			else if (io_u->ddir == DDIR_WRITE)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_WRITE_FAILURE);
		}
	}

	if (!ipo)
		return;

	if (ipo->flags & IP_F_ONRB)
		rb_erase(&ipo->rb_node, &td->io_hist_tree);
	else if (ipo->flags & IP_F_ONLIST)
		flist_del(&ipo->list);

	free(ipo);
	io_u->ipo = NULL;
	td->io_hist_len--;
}
void trim_io_piece(const struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (!ipo)
		return;

	ipo->len = io_u->xfer_buflen - io_u->resid;
}
void write_iolog_close(struct thread_data *td)
{
	if (!td->iolog_f)
		return;

	fflush(td->iolog_f);
	fclose(td->iolog_f);
	free(td->iolog_buf);
	td->iolog_f = NULL;
	td->iolog_buf = NULL;
}
static int64_t iolog_items_to_fetch(struct thread_data *td)
{
	struct timespec now;
	uint64_t elapsed;
	uint64_t for_1s;
	int64_t items_to_fetch;

	if (!td->io_log_highmark)
		return 10;

	fio_gettime(&now, NULL);
	elapsed = ntime_since(&td->io_log_highmark_time, &now);
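
	/*
	 * Extrapolate the consumption rate since the last high-water mark to
	 * estimate how many entries will be replayed in the next second, and
	 * fetch enough to top the queue back up to that level.
	 */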
	if (elapsed) {
		for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
		items_to_fetch = for_1s - td->io_log_current;
		if (items_to_fetch < 0)
			items_to_fetch = 0;
	} else
		items_to_fetch = 0;

	td->io_log_highmark = td->io_log_current + items_to_fetch;
	td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
	fio_gettime(&td->io_log_highmark_time, NULL);

	return items_to_fetch;
}
/*
 * Read version 2 iolog data. It is enhanced to include per-file logging,
 * syncs, etc.
 */
static bool read_iolog2(struct thread_data *td)
{
	unsigned long long offset;
	unsigned int bytes;
	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
	char *rfname, *fname, *act;
	char *str, *p;
	enum fio_ddir rw;
	bool realloc = false;
	int64_t items_to_fetch = 0;

	if (td->o.read_iolog_chunked) {
		items_to_fetch = iolog_items_to_fetch(td);
		if (!items_to_fetch)
			return true;
	}

	/*
	 * Read in the read iolog and store it, reuse the infrastructure
	 * for doing verifications.
	 */
	str = malloc(4096);
	rfname = fname = malloc(256+16);
	act = malloc(256+16);

	reads = writes = waits = 0;
	while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
		struct io_piece *ipo;
		int r;

		r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
								&bytes);

		if (td->o.replay_redirect)
			fname = td->o.replay_redirect;

		if (r == 4) {
			/*
			 * Check action first
			 */
			if (!strcmp(act, "wait"))
				rw = DDIR_WAIT;
			else if (!strcmp(act, "read"))
				rw = DDIR_READ;
			else if (!strcmp(act, "write"))
				rw = DDIR_WRITE;
			else if (!strcmp(act, "sync"))
				rw = DDIR_SYNC;
			else if (!strcmp(act, "datasync"))
				rw = DDIR_DATASYNC;
			else if (!strcmp(act, "trim"))
				rw = DDIR_TRIM;
			else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
			fileno = get_fileno(td, fname);
		} else if (r == 2) {
			rw = DDIR_INVAL;
			if (!strcmp(act, "add")) {
				if (td->o.replay_redirect &&
				    get_fileno(td, fname) != -1) {
					dprint(FD_FILE, "iolog: ignoring"
						" re-add of file %s\n", fname);
				} else {
					fileno = add_file(td, fname, td->subjob_number, 1);
					file_action = FIO_LOG_ADD_FILE;
				}
			} else if (!strcmp(act, "open")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_OPEN_FILE;
			} else if (!strcmp(act, "close")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_CLOSE_FILE;
			} else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
		} else {
			log_err("bad iolog2: %s\n", p);
			continue;
		}
		if (rw == DDIR_READ)
			reads++;
		else if (rw == DDIR_WRITE) {
			/*
			 * Don't add a write for ro mode
			 */
			if (read_only)
				continue;
			writes++;
		} else if (rw == DDIR_WAIT) {
			if (td->o.no_stall)
				continue;
			waits++;
		} else if (rw == DDIR_INVAL) {
		} else if (!ddir_sync(rw)) {
			log_err("bad ddir: %d\n", rw);
			continue;
		}

		/*
		 * Make note of file
		 */
		ipo = calloc(1, sizeof(*ipo));
		init_ipo(ipo);
		ipo->ddir = rw;
		if (rw == DDIR_WAIT) {
			ipo->delay = offset;
		} else {
			if (td->o.replay_scale)
				ipo->offset = offset / td->o.replay_scale;
			else
				ipo->offset = offset;
			ipo_bytes_align(td->o.replay_align, ipo);

			ipo->len = bytes;
			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw]) {
				realloc = true;
				td->o.max_bs[rw] = bytes;
			}
			ipo->fileno = fileno;
			ipo->file_action = file_action;
			td->o.size += bytes;
		}

		queue_io_piece(td, ipo);
		if (td->o.read_iolog_chunked) {
			td->io_log_current++;
			items_to_fetch--;
			if (items_to_fetch == 0)
				break;
		}
	}

	free(str);
	free(act);
	free(rfname);

	if (td->o.read_iolog_chunked) {
		td->io_log_highmark = td->io_log_current;
		td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
		fio_gettime(&td->io_log_highmark_time, NULL);
	}
	if (writes && read_only) {
		log_err("fio: <%s> skips replay of %d writes due to"
			" read-only\n", td->o.name, writes);
		writes = 0;
	}

	if (td->o.read_iolog_chunked) {
		if (td->io_log_current == 0)
			return false;
		td->o.td_ddir = TD_DDIR_RW;
		if (realloc && td->orig_buffer) {
			io_u_quiesce(td);
			free_io_mem(td);
			init_io_u_buffers(td);
		}
		return true;
	}
	if (!reads && !writes && !waits)
		return false;
	else if (reads && !writes)
		td->o.td_ddir = TD_DDIR_READ;
	else if (!reads && writes)
		td->o.td_ddir = TD_DDIR_WRITE;
	else
		td->o.td_ddir = TD_DDIR_RW;

	return true;
}
static bool is_socket(const char *path)
{
	struct stat buf;
	int r;

	r = stat(path, &buf);
	if (r == -1)
		return false;

	return S_ISSOCK(buf.st_mode);
}
static int open_socket(const char *path)
{
	struct sockaddr_un addr;
	int ret, fd;

	fd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (fd < 0)
		return fd;

	addr.sun_family = AF_UNIX;
	if (snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path) >=
	    sizeof(addr.sun_path)) {
		log_err("%s: path name %s is too long for a Unix socket\n",
			__func__, path);
	}
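
	/*
	 * The address length is computed the traditional BSD way, from the
	 * path length plus the family field; passing sizeof(addr) would be
	 * equally valid here since snprintf() NUL-terminated sun_path.
	 */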
	ret = connect(fd, (const struct sockaddr *)&addr, strlen(path) + sizeof(addr.sun_family));
	if (!ret)
		return fd;

	close(fd);
	return -1;
}
/*
 * open iolog, check version, and call appropriate parser
 */
static bool init_iolog_read(struct thread_data *td)
{
	char buffer[256], *p, *fname;
	FILE *f = NULL;

	fname = get_name_by_idx(td->o.read_iolog_file, td->subjob_number);
	dprint(FD_IO, "iolog: name=%s\n", fname);

	if (is_socket(fname)) {
		int fd;

		fd = open_socket(fname);
		if (fd >= 0)
			f = fdopen(fd, "r");
	} else if (!strcmp(fname, "-")) {
		f = stdin;
	} else
		f = fopen(fname, "r");

	free(fname);

	if (!f) {
		perror("fopen read iolog");
		return false;
	}

	p = fgets(buffer, sizeof(buffer), f);
	if (!p) {
		td_verror(td, errno, "iolog read");
		log_err("fio: unable to read iolog\n");
		fclose(f);
		return false;
	}

	/*
	 * version 2 of the iolog stores a specific string as the
	 * first line, check for that
	 */
	if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2))) {
		free_release_files(td);
		td->io_log_rfile = f;
		return read_iolog2(td);
	}

	log_err("fio: iolog version 1 is no longer supported\n");
	fclose(f);
	return false;
}
/*
 * Set up a log for storing io patterns.
 */
static bool init_iolog_write(struct thread_data *td)
{
	struct fio_file *ff;
	FILE *f;
	unsigned int i;

	f = fopen(td->o.write_iolog_file, "a");
	if (!f) {
		perror("fopen write iolog");
		return false;
	}

	/*
	 * That's it for writing, setup a log buffer and we're done.
	 */
	td->iolog_f = f;
	td->iolog_buf = malloc(8192);
	setvbuf(f, td->iolog_buf, _IOFBF, 8192);

	/*
	 * write our version line
	 */
	if (fprintf(f, "%s\n", iolog_ver2) < 0) {
		perror("iolog init");
		return false;
	}

	/*
	 * add all known files
	 */
	for_each_file(td, ff, i)
		log_file(td, ff, FIO_LOG_ADD_FILE);

	return true;
}
bool init_iolog(struct thread_data *td)
{
	bool ret;

	if (td->o.read_iolog_file) {
		int need_swap;

		/*
		 * Check if it's a blktrace file and load that if possible.
		 * Otherwise assume it's a normal log file and load that.
		 */
		if (is_blktrace(td->o.read_iolog_file, &need_swap))
			ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
		else
			ret = init_iolog_read(td);
	} else if (td->o.write_iolog_file)
		ret = init_iolog_write(td);
	else
		ret = true;

	if (!ret)
		td_verror(td, EINVAL, "failed initializing iolog");

	return ret;
}
void setup_log(struct io_log **log, struct log_params *p,
	       const char *filename)
{
	struct io_log *l;
	int i;
	struct io_u_plat_entry *entry;
	struct flist_head *list;

	l = scalloc(1, sizeof(*l));
	INIT_FLIST_HEAD(&l->io_logs);
	l->log_type = p->log_type;
	l->log_offset = p->log_offset;
	l->log_gz = p->log_gz;
	l->log_gz_store = p->log_gz_store;
	l->avg_msec = p->avg_msec;
	l->hist_msec = p->hist_msec;
	l->hist_coarseness = p->hist_coarseness;
	l->filename = strdup(filename);
	l->td = p->td;

	/* Initialize histogram lists for each r/w direction,
	 * with initial io_u_plat of all zeros:
	 */
	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
		list = &l->hist_window[i].list;
		INIT_FLIST_HEAD(list);
		entry = calloc(1, sizeof(struct io_u_plat_entry));
		flist_add(&entry->list, list);
	}

	if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
		unsigned int def_samples = DEF_LOG_ENTRIES;
		struct io_logs *__p;

		__p = calloc(1, sizeof(*l->pending));
		if (l->td->o.iodepth > DEF_LOG_ENTRIES)
			def_samples = roundup_pow2(l->td->o.iodepth);
		__p->max_samples = def_samples;
		__p->log = calloc(__p->max_samples, log_entry_sz(l));
		l->pending = __p;
	}

	if (l->log_offset)
		l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;

	INIT_FLIST_HEAD(&l->chunk_list);

	if (l->log_gz && !p->td)
		l->log_gz = 0;
	else if (l->log_gz || l->log_gz_store) {
		mutex_init_pshared(&l->chunk_lock);
		mutex_init_pshared(&l->deferred_free_lock);
		p->td->flags |= TD_F_COMPRESS_LOG;
	}

	*log = l;
}
#ifdef CONFIG_SETVBUF
static void *set_file_buffer(FILE *f)
{
	size_t size = 1048576;
	void *buf;

	buf = malloc(size);
	setvbuf(f, buf, _IOFBF, size);
	return buf;
}

static void clear_file_buffer(void *buf)
{
	free(buf);
}
#else
static void *set_file_buffer(FILE *f)
{
	return NULL;
}

static void clear_file_buffer(void *buf)
{
}
#endif
void free_log(struct io_log *log)
{
	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);
		free(cur_log->log);
		sfree(cur_log);
	}

	if (log->pending) {
		free(log->pending->log);
		free(log->pending);
		log->pending = NULL;
	}

	free(log->filename);
	sfree(log);
}
uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
		  uint64_t *io_u_plat_last)
{
	uint64_t sum;
	int k;

	if (io_u_plat_last) {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k] - io_u_plat_last[j + k];
	} else {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k];
	}

	return sum;
}
static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
			       uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, j, nr_samples;
	struct io_u_plat_entry *entry, *entry_before;
	uint64_t *io_u_plat;
	uint64_t *io_u_plat_before;

	int stride = 1 << hist_coarseness;
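
	/*
	 * Each coarseness step halves the resolution: a stride of
	 * 2^hist_coarseness adjacent io_u_plat bins is summed into a
	 * single output column by hist_sum().
	 */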
	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		entry = s->data.plat_entry;
		io_u_plat = entry->io_u_plat;

		entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
		io_u_plat_before = entry_before->io_u_plat;

		fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
						io_sample_ddir(s), (unsigned long long) s->bs);
		for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
			fprintf(f, "%llu, ", (unsigned long long)
				hist_sum(j, stride, io_u_plat, io_u_plat_before));
		}
		fprintf(f, "%llu\n", (unsigned long long)
			hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
					io_u_plat_before));

		flist_del(&entry_before->list);
		free(entry_before);
	}
}
void flush_samples(FILE *f, void *samples, uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, nr_samples;

	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		if (!log_offset) {
			fprintf(f, "%lu, %" PRId64 ", %u, %llu, %u\n",
					(unsigned long) s->time,
					s->data.val,
					io_sample_ddir(s), (unsigned long long) s->bs, s->priority_bit);
		} else {
			struct io_sample_offset *so = (void *) s;

			fprintf(f, "%lu, %" PRId64 ", %u, %llu, %llu, %u\n",
					(unsigned long) s->time,
					s->data.val,
					io_sample_ddir(s), (unsigned long long) s->bs,
					(unsigned long long) so->offset, s->priority_bit);
		}
	}
}
#ifdef CONFIG_ZLIB

struct iolog_flush_data {
	struct workqueue_work work;
	struct io_log *log;
	void *samples;
	uint32_t nr_samples;
	bool free;
};

#define GZ_CHUNK	131072
static struct iolog_compress *get_new_chunk(unsigned int seq)
{
	struct iolog_compress *c;

	c = malloc(sizeof(*c));
	INIT_FLIST_HEAD(&c->list);
	c->buf = malloc(GZ_CHUNK);
	c->len = 0;
	c->seq = seq;
	return c;
}

static void free_chunk(struct iolog_compress *ic)
{
	free(ic->buf);
	free(ic);
}
static int z_stream_init(z_stream *stream, int gz_hdr)
{
	int wbits = 15;

	memset(stream, 0, sizeof(*stream));
	stream->zalloc = Z_NULL;
	stream->zfree = Z_NULL;
	stream->opaque = Z_NULL;
	stream->next_in = Z_NULL;

	/*
	 * zlib magic - add 32 for auto-detection of gz header or not,
	 * if we decide to store files in a gzip friendly format.
	 */
	if (gz_hdr)
		wbits += 32;

	if (inflateInit2(stream, wbits) != Z_OK)
		return 1;

	return 0;
}
struct inflate_chunk_iter {
	unsigned int seq;
	int err;
	void *buf;
	size_t buf_size;
	size_t buf_used;
	size_t chunk_sz;
};
static void finish_chunk(z_stream *stream, FILE *f,
			 struct inflate_chunk_iter *iter)
{
	int ret;

	ret = inflateEnd(stream);
	if (ret != Z_OK)
		log_err("fio: failed to end log inflation seq %d (%d)\n",
				iter->seq, ret);

	flush_samples(f, iter->buf, iter->buf_used);
	free(iter->buf);
	iter->buf = NULL;
	iter->buf_size = iter->buf_used = 0;
}
/*
 * Iterative chunk inflation. Handles cases where we cross into a new
 * sequence, doing flush finish of previous chunk if needed.
 */
static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
			    z_stream *stream, struct inflate_chunk_iter *iter)
{
	size_t ret;

	dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
				(unsigned long) ic->len, ic->seq);

	if (ic->seq != iter->seq) {
		if (iter->seq)
			finish_chunk(stream, f, iter);

		z_stream_init(stream, gz_hdr);
		iter->seq = ic->seq;
	}

	stream->avail_in = ic->len;
	stream->next_in = ic->buf;

	if (!iter->buf_size) {
		iter->buf_size = iter->chunk_sz;
		iter->buf = malloc(iter->buf_size);
	}

	while (stream->avail_in) {
		size_t this_out = iter->buf_size - iter->buf_used;
		int err;

		stream->avail_out = this_out;
		stream->next_out = iter->buf + iter->buf_used;

		err = inflate(stream, Z_NO_FLUSH);
		if (err < 0) {
			log_err("fio: failed inflating log: %d\n", err);
			iter->err = err;
			break;
		}

		iter->buf_used += this_out - stream->avail_out;

		if (!stream->avail_out) {
			iter->buf_size += iter->chunk_sz;
			iter->buf = realloc(iter->buf, iter->buf_size);
			continue;
		}

		if (err == Z_STREAM_END)
			break;
	}

	ret = (void *) stream->next_in - ic->buf;

	dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);

	return ret;
}
/*
 * Inflate stored compressed chunks, or write them directly to the log
 * file if so instructed.
 */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
	z_stream stream;

	while (!flist_empty(&log->chunk_list)) {
		struct iolog_compress *ic;

		ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
		flist_del(&ic->list);

		if (log->log_gz_store) {
			size_t ret;

			dprint(FD_COMPRESS, "log write chunk size=%lu, "
				"seq=%u\n", (unsigned long) ic->len, ic->seq);

			ret = fwrite(ic->buf, ic->len, 1, f);
			if (ret != 1 || ferror(f)) {
				iter.err = errno;
				log_err("fio: error writing compressed log\n");
			}
		} else
			inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);

		free_chunk(ic);
	}

	if (iter.seq) {
		finish_chunk(&stream, f, &iter);
		free(iter.buf);
	}

	return iter.err;
}
/*
 * Open compressed log file and decompress the stored chunks and
 * write them to stdout. The chunks are stored sequentially in the
 * file, so we iterate over them and do them one-by-one.
 */
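/*
 * This is what backs fio's --inflate-log command-line option.
 */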
int iolog_file_inflate(const char *file)
{
	struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
	struct iolog_compress ic;
	z_stream stream;
	struct stat sb;
	size_t ret;
	size_t total;
	void *buf;
	FILE *f;

	f = fopen(file, "r");
	if (!f) {
		perror("fopen");
		return 1;
	}

	if (stat(file, &sb) < 0) {
		fclose(f);
		perror("stat");
		return 1;
	}

	ic.buf = buf = malloc(sb.st_size);
	ic.len = sb.st_size;
	ic.seq = 1;

	ret = fread(ic.buf, ic.len, 1, f);
	if (ret == 0 && ferror(f)) {
		perror("fread");
		fclose(f);
		free(buf);
		return 1;
	} else if (ferror(f) || (!feof(f) && ret != 1)) {
		log_err("fio: short read on reading log\n");
		fclose(f);
		free(buf);
		return 1;
	}

	fclose(f);

	/*
	 * Each chunk will return Z_STREAM_END. We don't know how many
	 * chunks are in the file, so we just keep looping and incrementing
	 * the sequence number until we have consumed the whole compressed
	 * file.
	 */
	total = ic.len;
	do {
		size_t iret;

		iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
		total -= iret;
		if (!total)
			break;
		if (iter.err)
			break;

		ic.seq++;
		ic.len -= iret;
		ic.buf += iret;
	} while (1);

	if (iter.seq) {
		finish_chunk(&stream, stdout, &iter);
		free(iter.buf);
	}

	free(buf);
	return iter.err;
}
#else /* CONFIG_ZLIB */

static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	return 0;
}

int iolog_file_inflate(const char *file)
{
	log_err("fio: log inflation not possible without zlib\n");
	return 1;
}

#endif
void flush_log(struct io_log *log, bool do_append)
{
	void *buf;
	FILE *f;

	if (!do_append)
		f = fopen(log->filename, "w");
	else
		f = fopen(log->filename, "a");
	if (!f) {
		perror("fopen log");
		return;
	}

	buf = set_file_buffer(f);

	inflate_gz_chunks(log, f);

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		if (log->td && log == log->td->clat_hist_log)
			flush_hist_samples(f, log->hist_coarseness, cur_log->log,
					   log_sample_sz(log, cur_log));
		else
			flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));

		sfree(cur_log);
	}

	fclose(f);
	clear_file_buffer(buf);
}
static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
{
	if (td->flags & TD_F_COMPRESS_LOG)
		iolog_flush(log);

	if (trylock) {
		if (fio_trylock_file(log->filename))
			return 1;
	} else
		fio_lock_file(log->filename);

	if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
		fio_send_iolog(td, log, log->filename);
	else
		flush_log(log, !td->o.per_job_logs);

	fio_unlock_file(log->filename);
	free_log(log);
	return 0;
}
size_t log_chunk_sizes(struct io_log *log)
{
	struct flist_head *entry;
	size_t ret;

	if (flist_empty(&log->chunk_list))
		return 0;

	ret = 0;
	pthread_mutex_lock(&log->chunk_lock);
	flist_for_each(entry, &log->chunk_list) {
		struct iolog_compress *c;

		c = flist_entry(entry, struct iolog_compress, list);
		ret += c->len;
	}
	pthread_mutex_unlock(&log->chunk_lock);
	return ret;
}
#ifdef CONFIG_ZLIB

static void iolog_put_deferred(struct io_log *log, void *ptr)
{
	if (!ptr)
		return;

	pthread_mutex_lock(&log->deferred_free_lock);
	if (log->deferred < IOLOG_MAX_DEFER) {
		log->deferred_items[log->deferred] = ptr;
		log->deferred++;
	} else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
		log_err("fio: had to drop log entry free\n");
	pthread_mutex_unlock(&log->deferred_free_lock);
}
static void iolog_free_deferred(struct io_log *log)
{
	int i;

	if (!log->deferred)
		return;

	pthread_mutex_lock(&log->deferred_free_lock);

	for (i = 0; i < log->deferred; i++) {
		free(log->deferred_items[i]);
		log->deferred_items[i] = NULL;
	}

	log->deferred = 0;
	pthread_mutex_unlock(&log->deferred_free_lock);
}
static int gz_work(struct iolog_flush_data *data)
{
	struct iolog_compress *c = NULL;
	struct flist_head list;
	unsigned int seq;
	z_stream stream;
	size_t total = 0;
	int ret;

	INIT_FLIST_HEAD(&list);

	memset(&stream, 0, sizeof(stream));
	stream.zalloc = Z_NULL;
	stream.zfree = Z_NULL;
	stream.opaque = Z_NULL;

	ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
	if (ret != Z_OK) {
		log_err("fio: failed to init gz stream\n");
		goto err;
	}

	seq = ++data->log->chunk_seq;

	stream.next_in = (void *) data->samples;
	stream.avail_in = data->nr_samples * log_entry_sz(data->log);

	dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
				(unsigned long) stream.avail_in, seq,
				data->log->filename);
	do {
		c = get_new_chunk(seq);
		stream.avail_out = GZ_CHUNK;
		stream.next_out = c->buf;
		ret = deflate(&stream, Z_NO_FLUSH);
		if (ret < 0) {
			log_err("fio: deflate log (%d)\n", ret);
			free_chunk(c);
			goto err;
		}

		c->len = GZ_CHUNK - stream.avail_out;
		flist_add_tail(&c->list, &list);
		total += c->len;
		dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
			(unsigned long) c->len);
	} while (stream.avail_in);
	stream.next_out = c->buf + c->len;
	stream.avail_out = GZ_CHUNK - c->len;

	ret = deflate(&stream, Z_FINISH);
	if (ret < 0) {
		/*
		 * Z_BUF_ERROR is special, it just means we need more
		 * output space. We'll handle that below. Treat any other
		 * error as fatal.
		 */
		if (ret != Z_BUF_ERROR) {
			log_err("fio: deflate log (%d)\n", ret);
			flist_del(&c->list);
			free_chunk(c);
			goto err;
		}
	}

	total -= c->len;
	c->len = GZ_CHUNK - stream.avail_out;
	total += c->len;
	dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
	if (ret != Z_STREAM_END) {
		do {
			c = get_new_chunk(seq);
			stream.avail_out = GZ_CHUNK;
			stream.next_out = c->buf;
			ret = deflate(&stream, Z_FINISH);
			c->len = GZ_CHUNK - stream.avail_out;
			total += c->len;
			flist_add_tail(&c->list, &list);
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		} while (ret != Z_STREAM_END);
	}

	dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);

	ret = deflateEnd(&stream);
	if (ret != Z_OK)
		log_err("fio: deflateEnd %d\n", ret);
	iolog_put_deferred(data->log, data->samples);

	if (!flist_empty(&list)) {
		pthread_mutex_lock(&data->log->chunk_lock);
		flist_splice_tail(&list, &data->log->chunk_list);
		pthread_mutex_unlock(&data->log->chunk_lock);
	}

	ret = 0;
done:
	if (data->free)
		sfree(data);
	return ret;
err:
	while (!flist_empty(&list)) {
		c = flist_first_entry(&list, struct iolog_compress, list);
		flist_del(&c->list);
		free_chunk(c);
	}
	ret = 1;
	goto done;
}
/*
 * Invoked from our compress helper thread, when logging would have exceeded
 * the specified memory limitation. Compresses the previously stored
 * entries.
 */
static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
{
	return gz_work(container_of(work, struct iolog_flush_data, work));
}

static int gz_init_worker(struct submit_worker *sw)
{
	struct thread_data *td = sw->wq->td;

	if (!fio_option_is_set(&td->o, log_gz_cpumask))
		return 0;

	if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
		log_err("gz: failed to set CPU affinity\n");
		return 1;
	}

	return 0;
}

static struct workqueue_ops log_compress_wq_ops = {
	.fn		= gz_work_async,
	.init_worker_fn	= gz_init_worker,
	.nice		= 1,
};
int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return 0;

	workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return;

	workqueue_exit(&td->log_compress_wq);
}
/*
 * Queue work item to compress the existing log entries. We reset the
 * current log to a small size, and reference the existing log in the
 * data that we queue for compression. Once compression has been done,
 * this old log is freed.
 */
static int iolog_flush(struct io_log *log)
{
	struct iolog_flush_data *data;

	data = malloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;
	data->free = false;

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		data->samples = cur_log->log;
		data->nr_samples = cur_log->nr_samples;

		sfree(cur_log);

		gz_work(data);
	}

	free(data);
	return 0;
}
int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	struct iolog_flush_data *data;

	data = smalloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;
	data->free = true;

	data->samples = cur_log->log;
	data->nr_samples = cur_log->nr_samples;

	cur_log->nr_samples = cur_log->max_samples = 0;
	cur_log->log = NULL;

	workqueue_enqueue(&log->td->log_compress_wq, &data->work);

	iolog_free_deferred(log);

	return 0;
}
#else /* CONFIG_ZLIB */

static int iolog_flush(struct io_log *log)
{
	return 1;
}

int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	return 1;
}

int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
}

#endif
struct io_logs *iolog_cur_log(struct io_log *log)
{
	if (flist_empty(&log->io_logs))
		return NULL;

	return flist_last_entry(&log->io_logs, struct io_logs, list);
}
uint64_t iolog_nr_samples(struct io_log *iolog)
{
	struct flist_head *entry;
	uint64_t ret = 0;

	flist_for_each(entry, &iolog->io_logs) {
		struct io_logs *cur_log;

		cur_log = flist_entry(entry, struct io_logs, list);
		ret += cur_log->nr_samples;
	}

	return ret;
}
static int __write_log(struct thread_data *td, struct io_log *log, int try)
{
	if (log)
		return finish_log(td, log, try);

	return 0;
}
static int write_iops_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->iops_log) != unit_log)
		return 0;

	ret = __write_log(td, td->iops_log, try);
	if (!ret)
		td->iops_log = NULL;

	return ret;
}
static int write_slat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->slat_log, try);
	if (!ret)
		td->slat_log = NULL;

	return ret;
}
static int write_clat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_log, try);
	if (!ret)
		td->clat_log = NULL;

	return ret;
}
static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_hist_log, try);
	if (!ret)
		td->clat_hist_log = NULL;

	return ret;
}
static int write_lat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->lat_log, try);
	if (!ret)
		td->lat_log = NULL;

	return ret;
}
static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->bw_log) != unit_log)
		return 0;

	ret = __write_log(td, td->bw_log, try);
	if (!ret)
		td->bw_log = NULL;

	return ret;
}
enum {
	BW_LOG_MASK	= 1,
	LAT_LOG_MASK	= 2,
	SLAT_LOG_MASK	= 4,
	CLAT_LOG_MASK	= 8,
	IOPS_LOG_MASK	= 16,
	CLAT_HIST_LOG_MASK = 32,

	ALL_LOG_NR	= 6,
};

struct log_type {
	unsigned int mask;
	int (*fn)(struct thread_data *, int, bool);
};
static struct log_type log_types[] = {
	{
		.mask	= BW_LOG_MASK,
		.fn	= write_bandw_log,
	},
	{
		.mask	= LAT_LOG_MASK,
		.fn	= write_lat_log,
	},
	{
		.mask	= SLAT_LOG_MASK,
		.fn	= write_slat_log,
	},
	{
		.mask	= CLAT_LOG_MASK,
		.fn	= write_clat_log,
	},
	{
		.mask	= IOPS_LOG_MASK,
		.fn	= write_iops_log,
	},
	{
		.mask	= CLAT_HIST_LOG_MASK,
		.fn	= write_clat_hist_log,
	},
};
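
/*
 * Write out all pending logs for this thread. Every pass but the final
 * attempt for a given log uses a trylock (the "log_left != 1" argument
 * below), so one busy log file cannot stall the others; if a pass makes
 * no progress, back off briefly and retry.
 */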
void td_writeout_logs(struct thread_data *td, bool unit_logs)
{
	unsigned int log_mask = 0;
	unsigned int log_left = ALL_LOG_NR;
	int old_state, i;

	old_state = td_bump_runstate(td, TD_FINISHING);

	finalize_logs(td, unit_logs);

	while (log_left) {
		int prev_log_left = log_left;

		for (i = 0; i < ALL_LOG_NR && log_left; i++) {
			struct log_type *lt = &log_types[i];
			int ret;

			if (!(log_mask & lt->mask)) {
				ret = lt->fn(td, log_left != 1, unit_logs);
				if (!ret) {
					log_left--;
					log_mask |= lt->mask;
				}
			}
		}

		if (prev_log_left == log_left)
			usleep(5000);
	}

	td_restore_runstate(td, old_state);
}
void fio_writeout_logs(bool unit_logs)
{
	struct thread_data *td;
	int i;

	for_each_td(td, i)
		td_writeout_logs(td, unit_logs);
}