2 * Code related to writing an iolog of what a thread is doing, and to
3 * later read that back and replay
22 #include "lib/roundup.h"
24 #include <netinet/in.h>
25 #include <netinet/tcp.h>
26 #include <arpa/inet.h>
28 #include <sys/socket.h>
31 static int iolog_flush(struct io_log *log);
33 static const char iolog_ver2[] = "fio version 2 iolog";
/*
 * Append a parsed io_piece to the tail of the thread's replay queue and
 * account its length toward the total amount of I/O to be replayed.
 */
35 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
37 	flist_add_tail(&ipo->list, &td->io_log_list);
38 	td->total_io_size += ipo->len;
/*
 * Record one issued io_u into the write iolog as
 * "<file> <ddir> <offset> <buflen>"; no-op unless write_iolog_file is set.
 */
41 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
43 	if (!td->o.write_iolog_file)
46 	fprintf(td->iolog_f, "%s %s %llu %llu\n", io_u->file->file_name,
47 					io_ddir_name(io_u->ddir),
48 					io_u->offset, io_u->buflen);
/*
 * Record a file-level event ("add", "open" or "close") for file 'f' into the
 * write iolog. Only active when write_iolog_file is set; the indices of act[]
 * must match the enum file_log_act values for add/open/close.
 */
51 void log_file(struct thread_data *td, struct fio_file *f,
52 	      enum file_log_act what)
54 	const char *act[] = { "add", "open", "close" };
58 	if (!td->o.write_iolog_file)
63 	 * this happens on the pre-open/close done before the job starts
68 	fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
/*
 * Sleep for the inter-I/O delay recorded in the replay log, compensating for
 * time already spent since the last issue (td->last_issue) and for previous
 * oversleep carried in td->time_offset. Sleeps in bounded slices so that
 * td->terminate is honored promptly, and stores any overshoot of this delay
 * back into td->time_offset for the next call to subtract.
 */
71 static void iolog_delay(struct thread_data *td, unsigned long delay)
73 	uint64_t usec = utime_since_now(&td->last_issue);
74 	unsigned long orig_delay = delay;
78 	if (delay < td->time_offset) {
83 	delay -= td->time_offset;
89 	fio_gettime(&ts, NULL);
90 	while (delay && !td->terminate) {
		/* cap each sleep slice (500ms check visible below) so terminate is noticed */
92 		if (this_delay > 500000)
95 		usec_sleep(td, this_delay);
99 	usec = utime_since_now(&ts);
100 	if (usec > orig_delay)
		/* slept longer than requested: remember the overshoot */
101 		td->time_offset = usec - orig_delay;
/*
 * Handle "special" (non-I/O) replay entries: file add/open/close/unlink
 * actions, which are stored with ddir == DDIR_INVAL. Returns a code telling
 * the caller whether the entry was consumed here (exact return values are in
 * the elided lines of this view — see read_iolog_get's use of the result).
 */
106 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
114 	if (ipo->ddir != DDIR_INVAL)
117 	f = td->files[ipo->fileno];
119 	switch (ipo->file_action) {
120 	case FIO_LOG_OPEN_FILE:
		/* with replay_redirect many log files map to one fd: skip re-open */
121 		if (td->o.replay_redirect && fio_file_open(f)) {
122 			dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
126 		ret = td_io_open_file(td, f);
129 		td_verror(td, ret, "iolog open file");
131 	case FIO_LOG_CLOSE_FILE:
132 		td_io_close_file(td, f);
134 	case FIO_LOG_UNLINK_FILE:
135 		td_io_unlink_file(td, f);
138 		log_err("fio: bad file action %d\n", ipo->file_action);
145 static bool read_iolog2(struct thread_data *td);
/*
 * Pop the next replay entry off td->io_log_list and populate 'io_u' from it.
 * In chunked mode, refills the list from the log file (read_iolog2) once the
 * outstanding count drains to the checkmark. File-action entries are handled
 * via ipo_special() and skipped; DDIR_WAIT entries turn into a sleep rather
 * than an I/O. Returns when an io_u has been filled in or the list is empty
 * (exact return values are in lines elided from this view).
 */
147 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
149 	struct io_piece *ipo;
150 	unsigned long elapsed;
152 	while (!flist_empty(&td->io_log_list)) {
155 		if (!td->io_log_blktrace && td->o.read_iolog_chunked) {
			/* drained to the low-water mark: fetch the next chunk */
156 			if (td->io_log_checkmark == td->io_log_current) {
157 				if (!read_iolog2(td))
160 			td->io_log_current--;
162 		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
163 		flist_del(&ipo->list);
164 		remove_trim_entry(td, ipo);
166 		ret = ipo_special(td, ipo);
170 		} else if (ret > 0) {
175 		io_u->ddir = ipo->ddir;
176 		if (ipo->ddir != DDIR_WAIT) {
177 			io_u->offset = ipo->offset;
178 			io_u->verify_offset = ipo->offset;
179 			io_u->buflen = ipo->len;
180 			io_u->file = td->files[ipo->fileno];
181 			get_file(io_u->file);
182 			dprint(FD_IO, "iolog: get %llu/%llu/%s\n", io_u->offset,
183 						io_u->buflen, io_u->file->file_name);
184 			iolog_delay(td, ipo->delay);
			/* DDIR_WAIT path: ipo->delay is an absolute target vs genesis time */
187 			elapsed = mtime_since_genesis();
188 			if (ipo->delay > elapsed)
189 				usec_sleep(td, (ipo->delay - elapsed) * 1000);
194 		if (io_u->ddir != DDIR_WAIT)
/*
 * Drop all stored io_piece history: empty the verify rb-tree and the flat
 * history list, removing each entry from the trim tracking as it goes.
 */
202 void prune_io_piece_log(struct thread_data *td)
204 	struct io_piece *ipo;
205 	struct fio_rb_node *n;
207 	while ((n = rb_first(&td->io_hist_tree)) != NULL) {
208 		ipo = rb_entry(n, struct io_piece, rb_node);
209 		rb_erase(n, &td->io_hist_tree);
210 		remove_trim_entry(td, ipo);
215 	while (!flist_empty(&td->io_hist_list)) {
216 		ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
217 		flist_del(&ipo->list);
218 		remove_trim_entry(td, ipo);
225  * log a successful write, so we can unwind the log for verify
227 void log_io_piece(struct thread_data *td, struct io_u *io_u)
229 	struct fio_rb_node **p, *parent;
230 	struct io_piece *ipo, *__ipo;
232 	ipo = calloc(1, sizeof(struct io_piece));
234 	ipo->file = io_u->file;
235 	ipo->offset = io_u->offset;
236 	ipo->len = io_u->buflen;
237 	ipo->numberio = io_u->numberio;
238 	ipo->flags = IP_F_IN_FLIGHT;
	/* trims are additionally tracked on the dedicated trim list */
242 	if (io_u_should_trim(td, io_u)) {
243 		flist_add_tail(&ipo->trim_list, &td->trim_list);
248 	 * Only sort writes if we don't have a random map in which case we need
249 	 * to check for duplicate blocks and drop the old one, which we rely on
250 	 * the rb insert/lookup for handling.
	/* random map present: duplicates impossible, a plain list suffices */
252 	if (file_randommap(td, ipo->file)) {
253 		INIT_FLIST_HEAD(&ipo->list);
254 		flist_add_tail(&ipo->list, &td->io_hist_list);
255 		ipo->flags |= IP_F_ONLIST;
260 	RB_CLEAR_NODE(&ipo->rb_node);
263 	 * Sort the entry into the verification list
	/* rb-tree insert keyed by (file, offset); overlapping older entries are evicted */
266 	p = &td->io_hist_tree.rb_node;
272 		__ipo = rb_entry(parent, struct io_piece, rb_node);
273 		if (ipo->file < __ipo->file)
275 		else if (ipo->file > __ipo->file)
277 		else if (ipo->offset < __ipo->offset) {
279 			overlap = ipo->offset + ipo->len > __ipo->offset;
281 		else if (ipo->offset > __ipo->offset) {
283 			overlap = __ipo->offset + __ipo->len > ipo->offset;
289 			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
290 				__ipo->offset, __ipo->len,
291 				ipo->offset, ipo->len);
293 			rb_erase(parent, &td->io_hist_tree);
294 			remove_trim_entry(td, __ipo);
			/* only free the evicted entry if no I/O still references it */
295 			if (!(__ipo->flags & IP_F_IN_FLIGHT))
301 	rb_link_node(&ipo->rb_node, parent, p);
302 	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
303 	ipo->flags |= IP_F_ONRB;
/*
 * Undo a previously logged io_piece when the I/O failed: mark the block-info
 * state as a trim/write failure (when block info tracking is enabled) and
 * unlink the entry from whichever structure it lives on (rb-tree or list).
 */
307 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
309 	struct io_piece *ipo = io_u->ipo;
311 	if (td->ts.nr_block_infos) {
312 		uint32_t *info = io_u_block_info(td, io_u);
		/* only escalate state; never downgrade past TRIM_FAILURE */
313 		if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
314 			if (io_u->ddir == DDIR_TRIM)
315 				*info = BLOCK_INFO_SET_STATE(*info,
316 				BLOCK_STATE_TRIM_FAILURE);
317 			else if (io_u->ddir == DDIR_WRITE)
318 				*info = BLOCK_INFO_SET_STATE(*info,
319 				BLOCK_STATE_WRITE_FAILURE);
326 	if (ipo->flags & IP_F_ONRB)
327 		rb_erase(&ipo->rb_node, &td->io_hist_tree);
328 	else if (ipo->flags & IP_F_ONLIST)
329 		flist_del(&ipo->list);
/*
 * Shrink the logged io_piece to the number of bytes actually transferred
 * (xfer_buflen minus residual), e.g. after a short completion.
 */
336 void trim_io_piece(const struct io_u *io_u)
338 	struct io_piece *ipo = io_u->ipo;
343 	ipo->len = io_u->xfer_buflen - io_u->resid;
/*
 * Tear down the write-iolog state; clears the stdio buffer pointer
 * (flush/close/free of the FILE and buffer occur in lines elided here).
 */
346 void write_iolog_close(struct thread_data *td)
355 	td->iolog_buf = NULL;
/*
 * For chunked iolog reading: estimate, from the consumption rate since the
 * last high-water mark, how many entries to fetch so roughly one second of
 * replay is buffered. Updates the high-water mark, sets the refill checkmark
 * to half of it, and restamps the measurement time.
 */
358 static int64_t iolog_items_to_fetch(struct thread_data *td)
363 	int64_t items_to_fetch;
365 	if (!td->io_log_highmark)
369 	fio_gettime(&now, NULL);
370 	elapsed = ntime_since(&td->io_log_highmark_time, &now);
	/* consumed-per-second, scaled from the elapsed nanoseconds */
372 		for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
373 	items_to_fetch = for_1s - td->io_log_current;
374 	if (items_to_fetch < 0)
379 	td->io_log_highmark = td->io_log_current + items_to_fetch;
380 	td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
381 	fio_gettime(&td->io_log_highmark_time, NULL);
383 	return items_to_fetch;
387  * Read version 2 iolog data. It is enhanced to include per-file logging,
/*
 * Parse version-2 iolog lines from td->io_log_rfile into io_piece entries.
 * Each line is either a file action ("<name> add|open|close") or an I/O
 * ("<name> <ddir> <offset> <len>"). In chunked mode only items_to_fetch
 * entries are read per call. Sets td->o.td_ddir according to what was seen.
 * Returns true on success (exact return points are in elided lines).
 */
390 static bool read_iolog2(struct thread_data *td)
392 	unsigned long long offset;
394 	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
395 	char *rfname, *fname, *act;
398 	bool realloc = false;
399 	int64_t items_to_fetch = 0;
401 	if (td->o.read_iolog_chunked) {
402 		items_to_fetch = iolog_items_to_fetch(td);
408 	 * Read in the read iolog and store it, reuse the infrastructure
409 	 * for doing verifications.
	/* +16 slack over the 256-char sscanf field widths below */
412 	rfname = fname = malloc(256+16);
413 	act = malloc(256+16);
415 	reads = writes = waits = 0;
416 	while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
417 		struct io_piece *ipo;
420 		r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
423 		if (td->o.replay_redirect)
424 			fname = td->o.replay_redirect;
		/* 4-field line: an actual I/O record; decode the ddir keyword */
430 			if (!strcmp(act, "wait"))
432 			else if (!strcmp(act, "read"))
434 			else if (!strcmp(act, "write"))
436 			else if (!strcmp(act, "sync"))
438 			else if (!strcmp(act, "datasync"))
440 			else if (!strcmp(act, "trim"))
443 				log_err("fio: bad iolog file action: %s\n",
447 			fileno = get_fileno(td, fname);
		/* 2-field line: a file management action */
450 			if (!strcmp(act, "add")) {
451 				if (td->o.replay_redirect &&
452 				    get_fileno(td, fname) != -1) {
453 					dprint(FD_FILE, "iolog: ignoring"
454 						" re-add of file %s\n", fname);
456 					fileno = add_file(td, fname, td->subjob_number, 1);
457 					file_action = FIO_LOG_ADD_FILE;
460 			} else if (!strcmp(act, "open")) {
461 				fileno = get_fileno(td, fname);
462 				file_action = FIO_LOG_OPEN_FILE;
463 			} else if (!strcmp(act, "close")) {
464 				fileno = get_fileno(td, fname);
465 				file_action = FIO_LOG_CLOSE_FILE;
467 				log_err("fio: bad iolog file action: %s\n",
472 			log_err("bad iolog2: %s\n", p);
478 		else if (rw == DDIR_WRITE) {
480 			 * Don't add a write for ro mode
485 		} else if (rw == DDIR_WAIT) {
489 		} else if (rw == DDIR_INVAL) {
490 		} else if (!ddir_sync(rw)) {
491 			log_err("bad ddir: %d\n", rw);
		/*
		 * We log all syncs, etc. Make the queued entry.
		 */
498 		ipo = calloc(1, sizeof(*ipo));
501 		if (rw == DDIR_WAIT) {
504 			if (td->o.replay_scale)
505 				ipo->offset = offset / td->o.replay_scale;
507 				ipo->offset = offset;
508 			ipo_bytes_align(td->o.replay_align, ipo);
		/* grow max_bs so buffers can hold the largest replayed I/O */
511 			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw]) {
513 				td->o.max_bs[rw] = bytes;
515 			ipo->fileno = fileno;
516 			ipo->file_action = file_action;
520 		queue_io_piece(td, ipo);
522 		if (td->o.read_iolog_chunked) {
523 			td->io_log_current++;
525 			if (items_to_fetch == 0)
534 	if (td->o.read_iolog_chunked) {
535 		td->io_log_highmark = td->io_log_current;
536 		td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
537 		fio_gettime(&td->io_log_highmark_time, NULL);
540 	if (writes && read_only) {
541 		log_err("fio: <%s> skips replay of %d writes due to"
542 			" read-only\n", td->o.name, writes);
546 	if (td->o.read_iolog_chunked) {
547 		if (td->io_log_current == 0) {
550 		td->o.td_ddir = TD_DDIR_RW;
		/* max_bs may have grown above: re-size the I/O buffers */
551 		if (realloc && td->orig_buffer)
555 			init_io_u_buffers(td);
560 	if (!reads && !writes && !waits)
562 	else if (reads && !writes)
563 		td->o.td_ddir = TD_DDIR_READ;
564 	else if (!reads && writes)
565 		td->o.td_ddir = TD_DDIR_WRITE;
567 		td->o.td_ddir = TD_DDIR_RW;
/*
 * Return true if 'path' names a Unix-domain socket (per stat()).
 */
572 static bool is_socket(const char *path)
577 	r = stat(path, &buf);
581 	return S_ISSOCK(buf.st_mode);
/*
 * Connect to the Unix-domain stream socket at 'path' and return the fd
 * (error handling is in lines elided from this view).
 *
 * NOTE(review): the connect() addrlen is computed as strlen(path) +
 * sizeof(sun_family) rather than the conventional sizeof(addr) /
 * SUN_LEN(); also no memset of 'addr' is visible here — confirm the elided
 * lines zero it, otherwise sun_path's tail is uninitialized.
 */
584 static int open_socket(const char *path)
586 	struct sockaddr_un addr;
589 	fd = socket(AF_UNIX, SOCK_STREAM, 0);
593 	addr.sun_family = AF_UNIX;
	/* reject paths that would truncate in sun_path */
594 	if (snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path) >=
595 	    sizeof(addr.sun_path)) {
596 		log_err("%s: path name %s is too long for a Unix socket\n",
600 	ret = connect(fd, (const struct sockaddr *)&addr, strlen(path) + sizeof(addr.sun_family));
609  * open iolog, check version, and call appropriate parser
/*
 * Open the replay source — a Unix socket, stdin ("-") or a regular file —
 * verify the version-2 header line, and hand off to read_iolog2(). Version 1
 * logs are rejected. Returns true on success.
 */
611 static bool init_iolog_read(struct thread_data *td, char *fname)
613 	char buffer[256], *p;
616 	dprint(FD_IO, "iolog: name=%s\n", fname);
618 	if (is_socket(fname)) {
621 		fd = open_socket(fname);
624 	} else if (!strcmp(fname, "-")) {
627 		f = fopen(fname, "r");
632 		perror("fopen read iolog");
636 	p = fgets(buffer, sizeof(buffer), f);
638 		td_verror(td, errno, "iolog read");
639 		log_err("fio: unable to read iolog\n");
645 	 * version 2 of the iolog stores a specific string as the
646 	 * first line, check for that
648 	if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2))) {
		/* the log's own "add" records will repopulate the file list */
649 		free_release_files(td);
650 		td->io_log_rfile = f;
651 		return read_iolog2(td);
654 	log_err("fio: iolog version 1 is no longer supported\n");
660  * Set up a log for storing io patterns.
/*
 * Open td->o.write_iolog_file for append, attach a fully-buffered stdio
 * buffer, write the version-2 header line, and emit an "add" record for
 * every currently known file. Returns true on success.
 */
662 static bool init_iolog_write(struct thread_data *td)
668 	f = fopen(td->o.write_iolog_file, "a");
670 		perror("fopen write iolog");
675 	 * That's it for writing, setup a log buffer and we're done.
678 	td->iolog_buf = malloc(8192);
679 	setvbuf(f, td->iolog_buf, _IOFBF, 8192);
682 	 * write our version line
684 	if (fprintf(f, "%s\n", iolog_ver2) < 0) {
685 		perror("iolog init\n");
690 	 * add all known files
692 	for_each_file(td, ff, i)
693 		log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * Top-level iolog initialization: for a read (replay) log, detect whether it
 * is a blktrace capture or a fio v2 iolog and load accordingly; for a write
 * log, set up the output file. Reports EINVAL via td_verror on failure.
 */
698 bool init_iolog(struct thread_data *td)
702 	if (td->o.read_iolog_file) {
		/* per-subjob filename selection from a colon-separated list */
704 		char * fname = get_name_by_idx(td->o.read_iolog_file, td->subjob_number);
707 		 * Check if it's a blktrace file and load that if possible.
708 		 * Otherwise assume it's a normal log file and load that.
710 		if (is_blktrace(fname, &need_swap)) {
711 			td->io_log_blktrace = 1;
712 			ret = load_blktrace(td, fname, need_swap);
714 			td->io_log_blktrace = 0;
715 			ret = init_iolog_read(td, fname);
717 	} else if (td->o.write_iolog_file)
718 		ret = init_iolog_write(td);
723 		td_verror(td, EINVAL, "failed initializing iolog");
/*
 * Allocate and initialize an io_log from the given parameters: copy logging
 * options, set up per-direction histogram windows, pre-size the pending
 * sample buffer for non-offload submit mode, set the ddir-mask bits that
 * flag offset/priority sample layouts, and arrange compression state when
 * log_gz/log_gz_store are enabled. *log receives the new object (assignment
 * is in lines elided from this view).
 */
728 void setup_log(struct io_log **log, struct log_params *p,
729 	       const char *filename)
733 	struct io_u_plat_entry *entry;
734 	struct flist_head *list;
	/* shared-memory allocation: logs may be read by another process */
736 	l = scalloc(1, sizeof(*l));
737 	INIT_FLIST_HEAD(&l->io_logs);
738 	l->log_type = p->log_type;
739 	l->log_offset = p->log_offset;
740 	l->log_prio = p->log_prio;
741 	l->log_gz = p->log_gz;
742 	l->log_gz_store = p->log_gz_store;
743 	l->avg_msec = p->avg_msec;
744 	l->hist_msec = p->hist_msec;
745 	l->hist_coarseness = p->hist_coarseness;
746 	l->filename = strdup(filename);
749 	/* Initialize histogram lists for each r/w direction,
750 	 * with initial io_u_plat of all zeros:
752 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
753 		list = &l->hist_window[i].list;
754 		INIT_FLIST_HEAD(list);
755 		entry = calloc(1, sizeof(struct io_u_plat_entry));
756 		flist_add(&entry->list, list);
759 	if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
760 		unsigned int def_samples = DEF_LOG_ENTRIES;
763 		__p = calloc(1, sizeof(*l->pending));
		/* size the pending buffer to at least the iodepth, power-of-2 rounded */
764 		if (l->td->o.iodepth > DEF_LOG_ENTRIES)
765 			def_samples = roundup_pow2(l->td->o.iodepth);
766 		__p->max_samples = def_samples;
767 		__p->log = calloc(__p->max_samples, log_entry_sz(l));
	/* stash layout flags in the sample ddir field's high bits */
772 		l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
774 		l->log_ddir_mask |= LOG_PRIO_SAMPLE_BIT;
776 	INIT_FLIST_HEAD(&l->chunk_list);
778 	if (l->log_gz && !p->td)
780 	else if (l->log_gz || l->log_gz_store) {
781 		mutex_init_pshared(&l->chunk_lock);
782 		mutex_init_pshared(&l->deferred_free_lock);
783 		p->td->flags |= TD_F_COMPRESS_LOG;
789 #ifdef CONFIG_SETVBUF
/*
 * With setvbuf() support: give 'f' a 1 MiB fully-buffered stdio buffer and
 * return it so the caller can free it after fclose(). Without support, the
 * fallbacks below (bodies elided here) are no-ops.
 */
790 static void *set_file_buffer(FILE *f)
792 	size_t size = 1048576;
796 	setvbuf(f, buf, _IOFBF, size);
800 static void clear_file_buffer(void *buf)
805 static void *set_file_buffer(FILE *f)
810 static void clear_file_buffer(void *buf)
/*
 * Release an io_log: free every queued io_logs segment, then the pending
 * sample buffer (remaining teardown is in lines elided from this view).
 */
815 void free_log(struct io_log *log)
817 	while (!flist_empty(&log->io_logs)) {
818 		struct io_logs *cur_log;
820 		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
821 		flist_del_init(&cur_log->list);
827 	free(log->pending->log);
/*
 * Sum 'stride' consecutive histogram bins starting at index 'j'. When
 * io_u_plat_last is non-NULL, sum the per-bin deltas against it instead
 * (i.e. counts accumulated since the previous window).
 */
837 uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
838 		  uint64_t *io_u_plat_last)
843 	if (io_u_plat_last) {
844 		for (k = sum = 0; k < stride; k++)
845 			sum += io_u_plat[j + k] - io_u_plat_last[j + k];
847 		for (k = sum = 0; k < stride; k++)
848 			sum += io_u_plat[j + k];
/*
 * Write histogram samples to 'f' as CSV rows: "time, ddir, bs," followed by
 * one delta-summed bucket value per stride of 2^hist_coarseness bins. Each
 * row is differenced against the previous window's plat entry, which is then
 * unlinked (and presumably freed in the elided lines).
 */
854 static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
855 			       uint64_t sample_size)
859 	uint64_t i, j, nr_samples;
860 	struct io_u_plat_entry *entry, *entry_before;
862 	uint64_t *io_u_plat_before;
864 	int stride = 1 << hist_coarseness;
	/* the first sample's ddir bits tell us the on-disk entry layout */
869 	s = __get_sample(samples, 0, 0);
870 	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
872 	nr_samples = sample_size / __log_entry_sz(log_offset);
874 	for (i = 0; i < nr_samples; i++) {
875 		s = __get_sample(samples, log_offset, i);
877 		entry = s->data.plat_entry;
878 		io_u_plat = entry->io_u_plat;
880 		entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
881 		io_u_plat_before = entry_before->io_u_plat;
883 		fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
884 						io_sample_ddir(s), (unsigned long long) s->bs);
885 		for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
886 			fprintf(f, "%llu, ", (unsigned long long)
887 				hist_sum(j, stride, io_u_plat, io_u_plat_before));
		/* final bucket gets a newline instead of a trailing comma */
889 		fprintf(f, "%llu\n", (unsigned long long)
890 		        hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
893 		flist_del(&entry_before->list);
/*
 * Write plain (non-histogram) samples to 'f' as CSV. The format string is
 * chosen up front from the offset/priority layout bits found in the first
 * sample: with log_prio the raw priority prints as hex, otherwise only the
 * RT-class bit is printed; with log_offset an extra offset column appears.
 */
898 void flush_samples(FILE *f, void *samples, uint64_t sample_size)
901 	int log_offset, log_prio;
902 	uint64_t i, nr_samples;
903 	unsigned int prio_val;
909 	s = __get_sample(samples, 0, 0);
910 	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
911 	log_prio = (s->__ddir & LOG_PRIO_SAMPLE_BIT) != 0;
	/* four variants: {offset,no-offset} x {hex prio, rt-bit} */
915 			fmt = "%lu, %" PRId64 ", %u, %llu, %llu, 0x%04x\n";
917 			fmt = "%lu, %" PRId64 ", %u, %llu, %llu, %u\n";
920 			fmt = "%lu, %" PRId64 ", %u, %llu, 0x%04x\n";
922 			fmt = "%lu, %" PRId64 ", %u, %llu, %u\n";
925 	nr_samples = sample_size / __log_entry_sz(log_offset);
927 	for (i = 0; i < nr_samples; i++) {
928 		s = __get_sample(samples, log_offset, i);
931 			prio_val = s->priority;
933 			prio_val = ioprio_value_is_class_rt(s->priority);
937 					(unsigned long) s->time,
939 					io_sample_ddir(s), (unsigned long long) s->bs,
		/* offset layout: the sample is really an io_sample_offset */
942 			struct io_sample_offset *so = (void *) s;
945 					(unsigned long) s->time,
947 					io_sample_ddir(s), (unsigned long long) s->bs,
948 					(unsigned long long) so->offset,
/* Work item handed to the compression workqueue (fields partly elided). */
956 struct iolog_flush_data {
957 	struct workqueue_work work;
/* Size of each compressed output chunk buffer (128 KiB). */
964 #define GZ_CHUNK	131072
/*
 * Allocate a fresh compression chunk with sequence number 'seq' and a
 * GZ_CHUNK-sized buffer.
 */
966 static struct iolog_compress *get_new_chunk(unsigned int seq)
968 	struct iolog_compress *c;
970 	c = malloc(sizeof(*c));
971 	INIT_FLIST_HEAD(&c->list);
972 	c->buf = malloc(GZ_CHUNK);
/* Free a chunk and its buffer (body elided in this view). */
978 static void free_chunk(struct iolog_compress *ic)
/*
 * Prepare a zlib inflate stream. 'gz_hdr' selects auto-detection of a gzip
 * header (+32 window-bits trick) so stored .gz-style logs also decode.
 * Returns non-zero on inflateInit2 failure (exact values elided here).
 */
984 static int z_stream_init(z_stream *stream, int gz_hdr)
988 	memset(stream, 0, sizeof(*stream));
989 	stream->zalloc = Z_NULL;
990 	stream->zfree = Z_NULL;
991 	stream->opaque = Z_NULL;
992 	stream->next_in = Z_NULL;
995 	 * zlib magic - add 32 for auto-detection of gz header or not,
996 	 * if we decide to store files in a gzip friendly format.
1001 	if (inflateInit2(stream, wbits) != Z_OK)
/* Iteration state carried across inflate_chunk() calls (fields elided). */
1007 struct inflate_chunk_iter {
/*
 * End the current inflate sequence: tear down the zlib stream, flush the
 * accumulated decompressed samples to 'f', and reset the iterator's buffer
 * accounting for the next sequence.
 */
1016 static void finish_chunk(z_stream *stream, FILE *f,
1017 			 struct inflate_chunk_iter *iter)
1021 	ret = inflateEnd(stream);
1023 		log_err("fio: failed to end log inflation seq %d (%d)\n",
1026 	flush_samples(f, iter->buf, iter->buf_used);
1029 	iter->buf_size = iter->buf_used = 0;
1033  * Iterative chunk inflation. Handles cases where we cross into a new
1034  * sequence, doing flush finish of previous chunk if needed.
1036 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
1037 			    z_stream *stream, struct inflate_chunk_iter *iter)
1041 	dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
1042 			(unsigned long) ic->len, ic->seq);
	/* sequence boundary: flush what we have, start a fresh zlib stream */
1044 	if (ic->seq != iter->seq) {
1046 			finish_chunk(stream, f, iter);
1048 		z_stream_init(stream, gz_hdr);
1049 		iter->seq = ic->seq;
1052 	stream->avail_in = ic->len;
1053 	stream->next_in = ic->buf;
1055 	if (!iter->buf_size) {
1056 		iter->buf_size = iter->chunk_sz;
1057 		iter->buf = malloc(iter->buf_size);
	/* inflate the whole input, growing the output buffer as needed */
1060 	while (stream->avail_in) {
1061 		size_t this_out = iter->buf_size - iter->buf_used;
1064 		stream->avail_out = this_out;
1065 		stream->next_out = iter->buf + iter->buf_used;
1067 		err = inflate(stream, Z_NO_FLUSH);
1069 			log_err("fio: failed inflating log: %d\n", err);
1074 		iter->buf_used += this_out - stream->avail_out;
1076 		if (!stream->avail_out) {
1077 			iter->buf_size += iter->chunk_sz;
1078 			iter->buf = realloc(iter->buf, iter->buf_size);
1082 		if (err == Z_STREAM_END)
	/* bytes of 'ic->buf' actually consumed */
1086 	ret = (void *) stream->next_in - ic->buf;
1088 	dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
1094  * Inflate stored compressed chunks, or write them directly to the log
1095  * file if so instructed.
1097 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1099 	struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
1102 	while (!flist_empty(&log->chunk_list)) {
1103 		struct iolog_compress *ic;
1105 		ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
1106 		flist_del(&ic->list);
		/* log_gz_store: keep the data compressed, write chunks verbatim */
1108 		if (log->log_gz_store) {
1111 			dprint(FD_COMPRESS, "log write chunk size=%lu, "
1112 				"seq=%u\n", (unsigned long) ic->len, ic->seq);
1114 			ret = fwrite(ic->buf, ic->len, 1, f);
1115 			if (ret != 1 || ferror(f)) {
1117 				log_err("fio: error writing compressed log\n");
1120 			inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
	/* flush any tail left in the iterator after the last chunk */
1126 		finish_chunk(&stream, f, &iter);
1134  * Open compressed log file and decompress the stored chunks and
1135  * write them to stdout. The chunks are stored sequentially in the
1136  * file, so we iterate over them and do them one-by-one.
1138 int iolog_file_inflate(const char *file)
1140 	struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
1141 	struct iolog_compress ic;
1149 	f = fopen(file, "r");
1155 	if (stat(file, &sb) < 0) {
	/* slurp the entire compressed file into one pseudo-chunk */
1161 	ic.buf = buf = malloc(sb.st_size);
1162 	ic.len = sb.st_size;
1165 	ret = fread(ic.buf, ic.len, 1, f);
1166 	if (ret == 0 && ferror(f)) {
1171 	} else if (ferror(f) || (!feof(f) && ret != 1)) {
1172 		log_err("fio: short read on reading log\n");
1181 	 * Each chunk will return Z_STREAM_END. We don't know how many
1182 	 * chunks are in the file, so we just keep looping and incrementing
1183 	 * the sequence number until we have consumed the whole compressed
1190 		iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
1203 		finish_chunk(&stream, stdout, &iter);
/* Stubs for builds without zlib: inflation is unavailable. */
1213 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1218 int iolog_file_inflate(const char *file)
1220 	log_err("fio: log inflation not possible without zlib\n");
/*
 * Write an io_log out to its file: open for write (or append when
 * do_append), install a large stdio buffer, emit any compressed chunks,
 * then flush every queued sample segment — via the histogram formatter for
 * the clat-hist log, plain CSV otherwise.
 */
1226 void flush_log(struct io_log *log, bool do_append)
1232 		f = fopen(log->filename, "w");
1234 		f = fopen(log->filename, "a");
1236 		perror("fopen log");
1240 	buf = set_file_buffer(f);
1242 	inflate_gz_chunks(log, f);
1244 	while (!flist_empty(&log->io_logs)) {
1245 		struct io_logs *cur_log;
1247 		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1248 		flist_del_init(&cur_log->list);
1250 		if (log->td && log == log->td->clat_hist_log)
1251 			flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1252 					   log_sample_sz(log, cur_log));
1254 			flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1260 	clear_file_buffer(buf);
/*
 * Finalize one log: compress remaining data if compression is on, take the
 * per-filename lock (try-lock when 'trylock' so callers can retry later),
 * then either ship the log to the network backend/GUI client or flush it to
 * disk locally.
 */
1263 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1265 	if (td->flags & TD_F_COMPRESS_LOG)
1269 		if (fio_trylock_file(log->filename))
1272 		fio_lock_file(log->filename);
1274 	if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1275 		fio_send_iolog(td, log, log->filename);
1277 		flush_log(log, !td->o.per_job_logs);
1279 	fio_unlock_file(log->filename);
/*
 * Return the total byte size of all compressed chunks queued on the log,
 * walking chunk_list under chunk_lock.
 */
1284 size_t log_chunk_sizes(struct io_log *log)
1286 	struct flist_head *entry;
1289 	if (flist_empty(&log->chunk_list))
1293 	pthread_mutex_lock(&log->chunk_lock);
1294 	flist_for_each(entry, &log->chunk_list) {
1295 		struct iolog_compress *c;
1297 		c = flist_entry(entry, struct iolog_compress, list);
1300 	pthread_mutex_unlock(&log->chunk_lock);
/*
 * Queue 'ptr' for deferred freeing (the owner frees it later via
 * iolog_free_deferred). If the fixed-size deferred array is full, the entry
 * is dropped and a one-time warning is logged — a bounded leak by design.
 */
1306 static void iolog_put_deferred(struct io_log *log, void *ptr)
1311 	pthread_mutex_lock(&log->deferred_free_lock);
1312 	if (log->deferred < IOLOG_MAX_DEFER) {
1313 		log->deferred_items[log->deferred] = ptr;
1315 	} else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
1316 		log_err("fio: had to drop log entry free\n");
1317 	pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Free everything accumulated by iolog_put_deferred(), under the
 * deferred_free_lock.
 */
1320 static void iolog_free_deferred(struct io_log *log)
1327 	pthread_mutex_lock(&log->deferred_free_lock);
1329 	for (i = 0; i < log->deferred; i++) {
1330 		free(log->deferred_items[i]);
1331 		log->deferred_items[i] = NULL;
1335 	pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Compress one batch of log samples (data->samples, data->nr_samples) with
 * zlib deflate into a private list of GZ_CHUNK-sized chunks, then splice
 * that list onto the log's chunk_list under chunk_lock. The raw sample
 * buffer is released via the deferred-free machinery. On error, all chunks
 * built so far are torn down (cleanup path at the bottom).
 */
1338 static int gz_work(struct iolog_flush_data *data)
1340 	struct iolog_compress *c = NULL;
1341 	struct flist_head list;
1347 	INIT_FLIST_HEAD(&list);
1349 	memset(&stream, 0, sizeof(stream));
1350 	stream.zalloc = Z_NULL;
1351 	stream.zfree = Z_NULL;
1352 	stream.opaque = Z_NULL;
1354 	ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1356 		log_err("fio: failed to init gz stream\n");
	/* each flush of this log gets a fresh, increasing sequence number */
1360 	seq = ++data->log->chunk_seq;
1362 	stream.next_in = (void *) data->samples;
1363 	stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1365 	dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
1366 				(unsigned long) stream.avail_in, seq,
1367 					data->log->filename);
	/* deflate all input, chunk by chunk */
1370 		dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1371 			(unsigned long) c->len);
1372 		c = get_new_chunk(seq);
1373 		stream.avail_out = GZ_CHUNK;
1374 		stream.next_out = c->buf;
1375 		ret = deflate(&stream, Z_NO_FLUSH);
1377 			log_err("fio: deflate log (%d)\n", ret);
1382 		c->len = GZ_CHUNK - stream.avail_out;
1383 		flist_add_tail(&c->list, &list);
1385 	} while (stream.avail_in);
	/* input consumed: drain zlib's internal state with Z_FINISH */
1387 	stream.next_out = c->buf + c->len;
1388 	stream.avail_out = GZ_CHUNK - c->len;
1390 	ret = deflate(&stream, Z_FINISH);
1393 		 * Z_BUF_ERROR is special, it just means we need more
1394 		 * output space. We'll handle that below. Treat any other
1397 		if (ret != Z_BUF_ERROR) {
1398 			log_err("fio: deflate log (%d)\n", ret);
1399 			flist_del(&c->list);
1406 	c->len = GZ_CHUNK - stream.avail_out;
1408 	dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
	/* keep finishing into new chunks until zlib signals stream end */
1410 	if (ret != Z_STREAM_END) {
1412 			c = get_new_chunk(seq);
1413 			stream.avail_out = GZ_CHUNK;
1414 			stream.next_out = c->buf;
1415 			ret = deflate(&stream, Z_FINISH);
1416 			c->len = GZ_CHUNK - stream.avail_out;
1418 			flist_add_tail(&c->list, &list);
1419 			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1420 				(unsigned long) c->len);
1421 		} while (ret != Z_STREAM_END);
1424 	dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1426 	ret = deflateEnd(&stream);
1428 		log_err("fio: deflateEnd %d\n", ret);
	/* the uncompressed samples can now be freed (deferred to the log owner) */
1430 	iolog_put_deferred(data->log, data->samples);
1432 	if (!flist_empty(&list)) {
1433 		pthread_mutex_lock(&data->log->chunk_lock);
1434 		flist_splice_tail(&list, &data->log->chunk_list);
1435 		pthread_mutex_unlock(&data->log->chunk_lock);
	/* error path: discard any partially built chunk list */
1444 	while (!flist_empty(&list)) {
1445 		c = flist_first_entry(list.next, struct iolog_compress, list);
1446 		flist_del(&c->list);
1454  * Invoked from our compress helper thread, when logging would have exceeded
1455  * the specified memory limitation. Compresses the previously stored
/* Workqueue adapter: unwrap the work item and run gz_work() on it. */
1458 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1460 	return gz_work(container_of(work, struct iolog_flush_data, work));
/*
 * Per-worker init for the compression workqueue: pin the worker thread to
 * the configured log_gz_cpumask, when that option was explicitly set.
 */
1463 static int gz_init_worker(struct submit_worker *sw)
1465 	struct thread_data *td = sw->wq->td;
1467 	if (!fio_option_is_set(&td->o, log_gz_cpumask))
1470 	if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1471 		log_err("gz: failed to set CPU affinity\n");
/* Workqueue ops for log compression: async compress fn + affinity init. */
1478 static struct workqueue_ops log_compress_wq_ops = {
1479 	.fn		= gz_work_async,
1480 	.init_worker_fn = gz_init_worker,
/*
 * Start the single-worker compression workqueue when this thread has
 * compressed logging enabled (TD_F_COMPRESS_LOG).
 */
1484 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1486 	if (!(td->flags & TD_F_COMPRESS_LOG))
1489 	workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
/* Tear down the compression workqueue, if it was started. */
1493 void iolog_compress_exit(struct thread_data *td)
1495 	if (!(td->flags & TD_F_COMPRESS_LOG))
1498 	workqueue_exit(&td->log_compress_wq);
1502  * Queue work item to compress the existing log entries. We reset the
1503  * current log to a small size, and reference the existing log in the
1504  * data that we queue for compression. Once compression has been done,
1505  * this old log is freed. If called with finish == true, will not return
1506  * until the log compression has completed, and will flush all previous
/*
 * Synchronous flush: compress every queued sample segment of 'log' inline
 * (gz_work is called directly in the elided lines, not via the workqueue).
 */
1509 static int iolog_flush(struct io_log *log)
1511 	struct iolog_flush_data *data;
1513 	data = malloc(sizeof(*data));
1520 	while (!flist_empty(&log->io_logs)) {
1521 		struct io_logs *cur_log;
1523 		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1524 		flist_del_init(&cur_log->list);
1526 		data->samples = cur_log->log;
1527 		data->nr_samples = cur_log->nr_samples;
/*
 * Asynchronous flush of the current sample segment: hand its buffer to the
 * compression workqueue (ownership of cur_log->log transfers to the work
 * item; the segment is reset to empty here), then reap any deferred frees.
 */
1538 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1540 	struct iolog_flush_data *data;
	/* shared-memory alloc: the work item crosses thread boundaries */
1542 	data = smalloc(sizeof(*data));
1548 	data->samples = cur_log->log;
1549 	data->nr_samples = cur_log->nr_samples;
1552 	cur_log->nr_samples = cur_log->max_samples = 0;
1553 	cur_log->log = NULL;
1555 	workqueue_enqueue(&log->td->log_compress_wq, &data->work);
1557 	iolog_free_deferred(log);
/* No-zlib build: compression entry points compile to no-ops. */
1563 static int iolog_flush(struct io_log *log)
1568 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1573 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1578 void iolog_compress_exit(struct thread_data *td)
/*
 * Return the newest (tail) sample segment of the log, or NULL-equivalent
 * behavior when empty (the empty-case return is in an elided line).
 */
1584 struct io_logs *iolog_cur_log(struct io_log *log)
1586 	if (flist_empty(&log->io_logs))
1589 	return flist_last_entry(&log->io_logs, struct io_logs, list);
/* Total sample count across all queued segments of the log. */
1592 uint64_t iolog_nr_samples(struct io_log *iolog)
1594 	struct flist_head *entry;
1597 	flist_for_each(entry, &iolog->io_logs) {
1598 		struct io_logs *cur_log;
1600 		cur_log = flist_entry(entry, struct io_logs, list);
1601 		ret += cur_log->nr_samples;
/* Thin guard around finish_log() (NULL check is in an elided line). */
1607 static int __write_log(struct thread_data *td, struct io_log *log, int try)
1610 		return finish_log(td, log, try);
/*
 * Write out the IOPS log when its per-unit setting matches 'unit_log';
 * on success (elided check) the log pointer is cleared so it is not
 * written twice.
 */
1615 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1619 	if (per_unit_log(td->iops_log) != unit_log)
1622 	ret = __write_log(td, td->iops_log, try);
1624 		td->iops_log = NULL;
/* Write out the submission-latency log; clear the pointer on success. */
1629 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1636 	ret = __write_log(td, td->slat_log, try);
1638 		td->slat_log = NULL;
/* Write out the completion-latency log; clear the pointer on success. */
1643 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1650 	ret = __write_log(td, td->clat_log, try);
1652 		td->clat_log = NULL;
/* Write out the completion-latency histogram log; clear pointer on success. */
1657 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1664 	ret = __write_log(td, td->clat_hist_log, try);
1666 		td->clat_hist_log = NULL;
/* Write out the total-latency log (pointer clearing is in elided lines). */
1671 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1678 	ret = __write_log(td, td->lat_log, try);
/* Write out the bandwidth log when its per-unit setting matches 'unit_log'. */
1685 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1689 	if (per_unit_log(td->bw_log) != unit_log)
1692 	ret = __write_log(td, td->bw_log, try);
/* Bit masks identifying each log type (others elided in this view). */
1705 	CLAT_HIST_LOG_MASK = 32,
/* Table entry: a mask bit plus the writer function for one log type. */
1712 	int (*fn)(struct thread_data *, int, bool);
/* Dispatch table driven by td_writeout_logs(). */
1715 static struct log_type log_types[] = {
1717 		.mask	= BW_LOG_MASK,
1718 		.fn	= write_bandw_log,
1721 		.mask	= LAT_LOG_MASK,
1722 		.fn	= write_lat_log,
1725 		.mask	= SLAT_LOG_MASK,
1726 		.fn	= write_slat_log,
1729 		.mask	= CLAT_LOG_MASK,
1730 		.fn	= write_clat_log,
1733 		.mask	= IOPS_LOG_MASK,
1734 		.fn	= write_iops_log,
1737 		.mask	= CLAT_HIST_LOG_MASK,
1738 		.fn	= write_clat_hist_log,
/*
 * Write out all of this thread's logs. Runs the log_types table repeatedly:
 * each pass attempts every not-yet-done log with trylock semantics (except
 * when only one remains, which blocks), marking completions in log_mask.
 * Stops early if a full pass makes no progress. Runstate is bumped to
 * TD_FINISHING for the duration and restored afterwards.
 */
1742 void td_writeout_logs(struct thread_data *td, bool unit_logs)
1744 	unsigned int log_mask = 0;
1745 	unsigned int log_left = ALL_LOG_NR;
1748 	old_state = td_bump_runstate(td, TD_FINISHING);
1750 	finalize_logs(td, unit_logs);
1753 		int prev_log_left = log_left;
1755 		for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1756 			struct log_type *lt = &log_types[i];
1759 			if (!(log_mask & lt->mask)) {
				/* try-lock while >1 log remains; block on the last one */
1760 				ret = lt->fn(td, log_left != 1, unit_logs);
1763 					log_mask |= lt->mask;
		/* no progress this pass: avoid spinning forever */
1768 		if (prev_log_left == log_left)
1772 	td_restore_runstate(td, old_state);
/* Write out the logs of every thread (iteration lines elided here). */
1775 void fio_writeout_logs(bool unit_logs)
1777 	struct thread_data *td;
1781 		td_writeout_logs(td, unit_logs);