2 * Code related to writing an iolog of what a thread is doing, and to
3 * later read that back and replay
/* Magic first line of a version-2 iolog file; written on create and
 * matched on replay to select the v2 parser. */
23 static const char iolog_ver2[] = "fio version 2 iolog";
/*
 * Append an io_piece to the thread's replay queue and account its length
 * toward the job's total I/O size.
 */
25 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
27 flist_add_tail(&ipo->list, &td->io_log_list);
28 td->total_io_size += ipo->len;
/*
 * Record a single io_u in the write iolog as
 * "<file> <ddir-name> <offset> <buflen>". No-op unless the job was
 * configured with write_iolog_file.
 */
31 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
33 if (!td->o.write_iolog_file)
36 fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
37 io_ddir_name(io_u->ddir),
38 io_u->offset, io_u->buflen);
/*
 * Record a file-level event in the write iolog as "<file> <action>".
 * NOTE(review): 'what' is used directly as an index into act[], so
 * FIO_LOG_ADD_FILE/OPEN_FILE/CLOSE_FILE are assumed to map to 0..2 —
 * confirm against enum file_log_act.
 */
41 void log_file(struct thread_data *td, struct fio_file *f,
42 enum file_log_act what)
44 const char *act[] = { "add", "open", "close" };
48 if (!td->o.write_iolog_file)
53 * this happens on the pre-open/close done before the job starts
58 fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
/*
 * Sleep for the delay recorded in the iolog, compensating for time
 * already consumed since the last issue and for previous oversleep
 * (tracked in td->time_offset). Sleeps in chunks of at most 500ms so
 * that td->terminate is honored promptly; afterwards any oversleep is
 * carried forward into td->time_offset for the next delay.
 */
61 static void iolog_delay(struct thread_data *td, unsigned long delay)
63 uint64_t usec = utime_since_now(&td->last_issue);
67 if (delay < td->time_offset) {
72 delay -= td->time_offset;
78 fio_gettime(&tv, NULL);
79 while (delay && !td->terminate) {
81 if (this_delay > 500000)
84 usec_sleep(td, this_delay);
88 usec = utime_since_now(&tv);
90 td->time_offset = usec - delay;
/*
 * Handle a "special" iolog entry, i.e. one that is a file action
 * (open/close/unlink) rather than real I/O. Entries with a valid data
 * direction are not special and are left to the caller.
 * NOTE(review): return-value semantics are elided here — presumably
 * 0 = not special, >0 = handled, <0 = error; confirm against caller
 * read_iolog_get().
 */
95 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
103 if (ipo->ddir != DDIR_INVAL)
106 f = td->files[ipo->fileno];
108 switch (ipo->file_action) {
109 case FIO_LOG_OPEN_FILE:
110 ret = td_io_open_file(td, f);
113 td_verror(td, ret, "iolog open file");
115 case FIO_LOG_CLOSE_FILE:
116 td_io_close_file(td, f);
118 case FIO_LOG_UNLINK_FILE:
119 td_io_unlink_file(td, f);
122 log_err("fio: bad file action %d\n", ipo->file_action);
/*
 * Pop the next entry off the replay queue and fill in io_u from it.
 * File actions are dispatched through ipo_special(); DDIR_WAIT entries
 * sleep until the recorded (genesis-relative, in msec) timestamp is
 * reached, while normal I/O entries apply the recorded inter-issue
 * delay via iolog_delay().
 */
129 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
131 struct io_piece *ipo;
132 unsigned long elapsed;
134 while (!flist_empty(&td->io_log_list)) {
137 ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
138 flist_del(&ipo->list);
139 remove_trim_entry(td, ipo);
141 ret = ipo_special(td, ipo);
145 } else if (ret > 0) {
150 io_u->ddir = ipo->ddir;
151 if (ipo->ddir != DDIR_WAIT) {
152 io_u->offset = ipo->offset;
153 io_u->buflen = ipo->len;
154 io_u->file = td->files[ipo->fileno];
155 get_file(io_u->file);
156 dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
157 io_u->buflen, io_u->file->file_name);
159 iolog_delay(td, ipo->delay);
161 elapsed = mtime_since_genesis();
162 if (ipo->delay > elapsed)
163 usec_sleep(td, (ipo->delay - elapsed) * 1000);
168 if (io_u->ddir != DDIR_WAIT)
/*
 * Tear down the I/O history: drain the rb-tree of verify entries and
 * the flat history list, dropping any associated trim-list linkage for
 * each entry as it goes.
 */
176 void prune_io_piece_log(struct thread_data *td)
178 struct io_piece *ipo;
181 while ((n = rb_first(&td->io_hist_tree)) != NULL) {
182 ipo = rb_entry(n, struct io_piece, rb_node);
183 rb_erase(n, &td->io_hist_tree);
184 remove_trim_entry(td, ipo);
189 while (!flist_empty(&td->io_hist_list)) {
190 ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
191 flist_del(&ipo->list);
192 remove_trim_entry(td, ipo);
199 * log a successful write, so we can unwind the log for verify
201 void log_io_piece(struct thread_data *td, struct io_u *io_u)
203 struct rb_node **p, *parent;
204 struct io_piece *ipo, *__ipo;
/* Allocate and populate the history record for this completed write. */
206 ipo = malloc(sizeof(struct io_piece));
208 ipo->file = io_u->file;
209 ipo->offset = io_u->offset;
210 ipo->len = io_u->buflen;
211 ipo->numberio = io_u->numberio;
212 ipo->flags = IP_F_IN_FLIGHT;
/* Trim-workload entries are additionally tracked on the trim list. */
216 if (io_u_should_trim(td, io_u)) {
217 flist_add_tail(&ipo->trim_list, &td->trim_list);
222 * We don't need to sort the entries, if:
224 * Sequential writes, or
225 * Random writes that lay out the file as it goes along
227 * For both these cases, just reading back data in the order we
228 * wrote it out is the fastest.
230 * One exception is if we don't have a random map AND we are doing
231 * verifies, in that case we need to check for duplicate blocks and
232 * drop the old one, which we rely on the rb insert/lookup for
235 if (((!td->o.verifysort) || !td_random(td) || !td->o.overwrite) &&
236 (file_randommap(td, ipo->file) || td->o.verify == VERIFY_NONE)) {
237 INIT_FLIST_HEAD(&ipo->list);
238 flist_add_tail(&ipo->list, &td->io_hist_list);
239 ipo->flags |= IP_F_ONLIST;
244 RB_CLEAR_NODE(&ipo->rb_node);
247 * Sort the entry into the verification list
/* Walk the rb-tree keyed on (file, offset); an exact match means an
 * overwrite of a previously logged block, so the old entry is erased
 * and replaced. */
250 p = &td->io_hist_tree.rb_node;
255 __ipo = rb_entry(parent, struct io_piece, rb_node);
256 if (ipo->file < __ipo->file)
258 else if (ipo->file > __ipo->file)
260 else if (ipo->offset < __ipo->offset)
262 else if (ipo->offset > __ipo->offset)
265 dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu",
266 __ipo->offset, __ipo->len,
267 ipo->offset, ipo->len);
269 rb_erase(parent, &td->io_hist_tree);
270 remove_trim_entry(td, __ipo);
276 rb_link_node(&ipo->rb_node, parent, p);
277 rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
278 ipo->flags |= IP_F_ONRB;
/*
 * Remove a previously logged io_piece from whichever history container
 * it was placed on (rb-tree or flat list), e.g. when the write is being
 * requeued or failed and must not be verified.
 */
282 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
284 struct io_piece *ipo = io_u->ipo;
289 if (ipo->flags & IP_F_ONRB)
290 rb_erase(&ipo->rb_node, &td->io_hist_tree);
291 else if (ipo->flags & IP_F_ONLIST)
292 flist_del(&ipo->list);
/*
 * Shrink the logged length to the bytes actually transferred, so a
 * short completion (non-zero resid) is verified only for the data that
 * really made it out.
 */
299 void trim_io_piece(struct thread_data *td, const struct io_u *io_u)
301 struct io_piece *ipo = io_u->ipo;
306 ipo->len = io_u->xfer_buflen - io_u->resid;
/*
 * Close out the write iolog; the stdio buffer installed by
 * init_iolog_write() is released (td->iolog_buf cleared here).
 */
309 void write_iolog_close(struct thread_data *td)
315 td->iolog_buf = NULL;
319 * Read version 2 iolog data. It is enhanced to include per-file logging,
322 static int read_iolog2(struct thread_data *td, FILE *f)
324 unsigned long long offset;
326 int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
/* Drop any files inherited from the job definition; the iolog itself
 * declares the files via "add" actions. */
331 free_release_files(td);
334 * Read in the read iolog and store it, reuse the infrastructure
335 * for doing verifications.
/* 256+16: room for the %256s conversions plus terminator slack. */
338 fname = malloc(256+16);
339 act = malloc(256+16);
341 reads = writes = waits = 0;
342 while ((p = fgets(str, 4096, f)) != NULL) {
343 struct io_piece *ipo;
/* Each line is "<file> <action> [offset bytes]"; r tells us which
 * of the optional fields were present. */
346 r = sscanf(p, "%256s %256s %llu %u", fname, act, &offset,
352 if (!strcmp(act, "wait"))
354 else if (!strcmp(act, "read"))
356 else if (!strcmp(act, "write"))
358 else if (!strcmp(act, "sync"))
360 else if (!strcmp(act, "datasync"))
362 else if (!strcmp(act, "trim"))
365 log_err("fio: bad iolog file action: %s\n",
369 fileno = get_fileno(td, fname);
372 if (!strcmp(act, "add")) {
373 fileno = add_file(td, fname, 0, 1);
374 file_action = FIO_LOG_ADD_FILE;
376 } else if (!strcmp(act, "open")) {
377 fileno = get_fileno(td, fname);
378 file_action = FIO_LOG_OPEN_FILE;
379 } else if (!strcmp(act, "close")) {
380 fileno = get_fileno(td, fname);
381 file_action = FIO_LOG_CLOSE_FILE;
383 log_err("fio: bad iolog file action: %s\n",
388 log_err("bad iolog2: %s", p);
394 else if (rw == DDIR_WRITE) {
396 * Don't add a write for ro mode
401 } else if (rw == DDIR_WAIT) {
403 } else if (rw == DDIR_INVAL) {
404 } else if (!ddir_sync(rw)) {
405 log_err("bad ddir: %d\n", rw);
/* Queue this entry for replay; track the largest block size seen so
 * buffers are sized correctly. */
412 ipo = malloc(sizeof(*ipo));
415 if (rw == DDIR_WAIT) {
418 ipo->offset = offset;
420 if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
421 td->o.max_bs[rw] = bytes;
422 ipo->fileno = fileno;
423 ipo->file_action = file_action;
427 queue_io_piece(td, ipo);
434 if (writes && read_only) {
435 log_err("fio: <%s> skips replay of %d writes due to"
436 " read-only\n", td->o.name, writes);
/* Derive the effective data direction from what the log contained. */
440 if (!reads && !writes && !waits)
442 else if (reads && !writes)
443 td->o.td_ddir = TD_DDIR_READ;
444 else if (!reads && writes)
445 td->o.td_ddir = TD_DDIR_WRITE;
447 td->o.td_ddir = TD_DDIR_RW;
453 * open iolog, check version, and call appropriate parser
455 static int init_iolog_read(struct thread_data *td)
457 char buffer[256], *p;
461 f = fopen(td->o.read_iolog_file, "r");
463 perror("fopen read iolog");
467 p = fgets(buffer, sizeof(buffer), f);
469 td_verror(td, errno, "iolog read");
470 log_err("fio: unable to read iolog\n");
476 * version 2 of the iolog stores a specific string as the
477 * first line, check for that
479 if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
480 ret = read_iolog2(td, f);
482 log_err("fio: iolog version 1 is no longer supported\n");
491 * Set up a log for storing io patterns.
493 static int init_iolog_write(struct thread_data *td)
499 f = fopen(td->o.write_iolog_file, "a");
501 perror("fopen write iolog");
506 * That's it for writing, setup a log buffer and we're done.
/* Full buffering through our own 8k buffer; freed in
 * write_iolog_close(). */
509 td->iolog_buf = malloc(8192);
510 setvbuf(f, td->iolog_buf, _IOFBF, 8192);
513 * write our version line
515 if (fprintf(f, "%s\n", iolog_ver2) < 0) {
516 perror("iolog init\n");
521 * add all known files
523 for_each_file(td, ff, i)
524 log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * Entry point for iolog setup: replay (blktrace or fio v2 format) if a
 * read iolog was given, otherwise start a write iolog if requested.
 */
529 int init_iolog(struct thread_data *td)
533 if (td->o.read_iolog_file) {
537 * Check if it's a blktrace file and load that if possible.
538 * Otherwise assume it's a normal log file and load that.
540 if (is_blktrace(td->o.read_iolog_file, &need_swap))
541 ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
543 ret = init_iolog_read(td);
544 } else if (td->o.write_iolog_file)
545 ret = init_iolog_write(td);
548 td_verror(td, EINVAL, "failed initializing iolog");
/*
 * Allocate and initialize an io_log from the given parameters:
 * sample storage (initially 1024 entries), optional per-sample offset
 * recording, and optional gzip compression of overflowing samples
 * (chunk list + lock, and TD_F_COMPRESS_LOG on the owning thread).
 */
553 void setup_log(struct io_log **log, struct log_params *p,
554 const char *filename)
558 l = calloc(1, sizeof(*l));
560 l->max_samples = 1024;
561 l->log_type = p->log_type;
562 l->log_offset = p->log_offset;
563 l->log_gz = p->log_gz;
564 l->log_gz_store = p->log_gz_store;
565 l->log = malloc(l->max_samples * log_entry_sz(l));
566 l->avg_msec = p->avg_msec;
567 l->filename = strdup(filename);
571 l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
573 INIT_FLIST_HEAD(&l->chunk_list);
/* log_gz without a thread_data owner is invalid (elided handling). */
575 if (l->log_gz && !p->td)
577 else if (l->log_gz) {
578 pthread_mutex_init(&l->chunk_lock, NULL);
579 p->td->flags |= TD_F_COMPRESS_LOG;
/* With setvbuf available, give log FILE streams a 1MB fully-buffered
 * stdio buffer; the caller later releases it via clear_file_buffer().
 * Without CONFIG_SETVBUF both helpers are no-op stubs. */
#ifdef CONFIG_SETVBUF
586 static void *set_file_buffer(FILE *f)
588 size_t size = 1048576;
592 setvbuf(f, buf, _IOFBF, size);
596 static void clear_file_buffer(void *buf)
601 static void *set_file_buffer(FILE *f)
606 static void clear_file_buffer(void *buf)
/* Release all memory owned by an io_log (body elided here). */
611 void free_log(struct io_log *log)
/*
 * Write raw samples as CSV text: "time, val, ddir, bs" plus a trailing
 * offset column when the samples were recorded with
 * LOG_OFFSET_SAMPLE_BIT set (detected from the first sample's __ddir).
 */
618 static void flush_samples(FILE *f, void *samples, uint64_t sample_size)
622 uint64_t i, nr_samples;
627 s = __get_sample(samples, 0, 0);
628 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
/* Entry size depends on whether offsets were recorded. */
630 nr_samples = sample_size / __log_entry_sz(log_offset);
632 for (i = 0; i < nr_samples; i++) {
633 s = __get_sample(samples, log_offset, i);
636 fprintf(f, "%lu, %lu, %u, %u\n",
637 (unsigned long) s->time,
638 (unsigned long) s->val,
639 io_sample_ddir(s), s->bs);
641 struct io_sample_offset *so = (void *) s;
643 fprintf(f, "%lu, %lu, %u, %u, %llu\n",
644 (unsigned long) s->time,
645 (unsigned long) s->val,
646 io_sample_ddir(s), s->bs,
647 (unsigned long long) so->offset);
/* Work item handed to the compression helper thread. */
654 struct iolog_flush_data {
/* One compressed chunk of log samples; chunks are chained on the
 * io_log's chunk_list in sequence order. */
661 struct iolog_compress {
662 struct flist_head list;
/* Size of each compression output chunk buffer (128 KiB). */
668 #define GZ_CHUNK 131072
/*
 * Allocate a fresh compression chunk with a GZ_CHUNK-sized buffer,
 * tagged with the given sequence number.
 */
670 static struct iolog_compress *get_new_chunk(unsigned int seq)
672 struct iolog_compress *c;
674 c = malloc(sizeof(*c));
675 INIT_FLIST_HEAD(&c->list);
676 c->buf = malloc(GZ_CHUNK);
/* Free a chunk and its buffer (body elided here). */
682 static void free_chunk(struct iolog_compress *ic)
/*
 * Prepare a zlib inflate stream. When gz_hdr is set, the window-bits
 * value gets 32 added so zlib auto-detects a gzip or zlib header.
 */
688 static int z_stream_init(z_stream *stream, int gz_hdr)
692 stream->zalloc = Z_NULL;
693 stream->zfree = Z_NULL;
694 stream->opaque = Z_NULL;
695 stream->next_in = Z_NULL;
698 * zlib magic - add 32 for auto-detection of gz header or not,
699 * if we decide to store files in a gzip friendly format.
704 if (inflateInit2(stream, wbits) != Z_OK)
/* Decompression cursor carried across inflate_chunk() calls: output
 * buffer, its size/usage, and the current chunk sequence number. */
710 struct inflate_chunk_iter {
/*
 * End the current inflate sequence: tear down the zlib stream, flush
 * the decompressed samples to 'f', and reset the iterator's buffer
 * accounting for the next sequence.
 */
719 static void finish_chunk(z_stream *stream, FILE *f,
720 struct inflate_chunk_iter *iter)
724 ret = inflateEnd(stream);
726 log_err("fio: failed to end log inflation (%d)\n", ret);
728 flush_samples(f, iter->buf, iter->buf_used);
731 iter->buf_size = iter->buf_used = 0;
735 * Iterative chunk inflation. Handles cases where we cross into a new
736 * sequence, doing flush finish of previous chunk if needed.
738 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
739 z_stream *stream, struct inflate_chunk_iter *iter)
743 dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u",
744 (unsigned long) ic->len, ic->seq);
/* New sequence number: flush what we have and restart the stream. */
746 if (ic->seq != iter->seq) {
748 finish_chunk(stream, f, iter);
750 z_stream_init(stream, gz_hdr);
754 stream->avail_in = ic->len;
755 stream->next_in = ic->buf;
757 if (!iter->buf_size) {
758 iter->buf_size = iter->chunk_sz;
759 iter->buf = malloc(iter->buf_size);
/* Inflate until this chunk's input is consumed, growing the output
 * buffer in chunk_sz steps whenever it fills up. */
762 while (stream->avail_in) {
763 size_t this_out = iter->buf_size - iter->buf_used;
766 stream->avail_out = this_out;
767 stream->next_out = iter->buf + iter->buf_used;
769 err = inflate(stream, Z_NO_FLUSH);
771 log_err("fio: failed inflating log: %d\n", err);
776 iter->buf_used += this_out - stream->avail_out;
778 if (!stream->avail_out) {
779 iter->buf_size += iter->chunk_sz;
780 iter->buf = realloc(iter->buf, iter->buf_size);
784 if (err == Z_STREAM_END)
/* Return how many input bytes were consumed from this chunk. */
788 ret = (void *) stream->next_in - ic->buf;
790 dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) ret);
796 * Inflate stored compressed chunks, or write them directly to the log
797 * file if so instructed.
799 static int inflate_gz_chunks(struct io_log *log, FILE *f)
801 struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
804 while (!flist_empty(&log->chunk_list)) {
805 struct iolog_compress *ic;
807 ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
808 flist_del(&ic->list);
/* log_gz_store: keep the log compressed on disk, so the raw chunk
 * bytes are written out as-is instead of being inflated. */
810 if (log->log_gz_store) {
813 dprint(FD_COMPRESS, "log write chunk size=%lu, "
814 "seq=%u\n", (unsigned long) ic->len, ic->seq);
816 ret = fwrite(ic->buf, ic->len, 1, f);
817 if (ret != 1 || ferror(f)) {
819 log_err("fio: error writing compressed log\n");
822 inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
/* Flush whatever the last sequence produced. */
828 finish_chunk(&stream, f, &iter);
836 * Open compressed log file and decompress the stored chunks and
837 * write them to stdout. The chunks are stored sequentially in the
838 * file, so we iterate over them and do them one-by-one.
840 int iolog_file_inflate(const char *file)
842 struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
843 struct iolog_compress ic;
851 f = fopen(file, "r");
857 if (stat(file, &sb) < 0) {
/* Slurp the whole file into one pseudo-chunk. */
863 ic.buf = buf = malloc(sb.st_size);
867 ret = fread(ic.buf, ic.len, 1, f);
872 } else if (ret != 1) {
873 log_err("fio: short read on reading log\n");
881 * Each chunk will return Z_STREAM_END. We don't know how many
882 * chunks are in the file, so we just keep looping and incrementing
883 * the sequence number until we have consumed the whole compressed
890 iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
903 finish_chunk(&stream, stdout, &iter);
/* No-zlib build: nothing to inflate (stub). */
913 static int inflate_gz_chunks(struct io_log *log, FILE *f)
/* No-zlib build: report that inflation is unsupported. */
918 int iolog_file_inflate(const char *file)
920 log_err("fio: log inflation not possible without zlib\n");
/*
 * Write an io_log out to its file: first any compressed chunks (stored
 * or inflated), then the still-uncompressed in-memory samples.
 */
926 void flush_log(struct io_log *log)
931 f = fopen(log->filename, "w");
937 buf = set_file_buffer(f);
939 inflate_gz_chunks(log, f);
941 flush_samples(f, log->log, log->nr_samples * log_entry_sz(log));
944 clear_file_buffer(buf);
/*
 * Finalize one log under the per-filename file lock. With trylock set,
 * failure to acquire the lock defers the flush (caller retries).
 * GUI clients get the log shipped over the wire instead of written
 * locally.
 */
947 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
953 if (fio_trylock_file(log->filename))
956 fio_lock_file(log->filename);
958 if (td->client_type == FIO_CLIENT_TYPE_GUI)
959 fio_send_iolog(td, log, log->filename);
963 fio_unlock_file(log->filename);
971 * Invoked from our compress helper thread, when logging would have exceeded
972 * the specified memory limitation. Compresses the previously stored
975 static int gz_work(struct tp_work *work)
977 struct iolog_flush_data *data;
978 struct iolog_compress *c;
979 struct flist_head list;
985 INIT_FLIST_HEAD(&list);
987 data = container_of(work, struct iolog_flush_data, work);
/* Set up the deflate stream over the copied sample buffer. */
989 stream.zalloc = Z_NULL;
990 stream.zfree = Z_NULL;
991 stream.opaque = Z_NULL;
993 ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
995 log_err("fio: failed to init gz stream\n");
/* Each flush gets its own sequence number so the reader can tell
 * chunk boundaries apart. */
999 seq = ++data->log->chunk_seq;
1001 stream.next_in = (void *) data->samples;
1002 stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1004 dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u\n",
1005 (unsigned long) stream.avail_in, seq);
/* Compress the input into GZ_CHUNK-sized chunks on the local list. */
1007 c = get_new_chunk(seq);
1008 stream.avail_out = GZ_CHUNK;
1009 stream.next_out = c->buf;
1010 ret = deflate(&stream, Z_NO_FLUSH);
1012 log_err("fio: deflate log (%d)\n", ret);
1017 c->len = GZ_CHUNK - stream.avail_out;
1018 flist_add_tail(&c->list, &list);
1020 } while (stream.avail_in);
/* Flush the stream; keep allocating chunks until Z_STREAM_END. */
1022 stream.next_out = c->buf + c->len;
1023 stream.avail_out = GZ_CHUNK - c->len;
1025 ret = deflate(&stream, Z_FINISH);
1026 if (ret == Z_STREAM_END)
1027 c->len = GZ_CHUNK - stream.avail_out;
1030 c = get_new_chunk(seq);
1031 stream.avail_out = GZ_CHUNK;
1032 stream.next_out = c->buf;
1033 ret = deflate(&stream, Z_FINISH);
1034 c->len = GZ_CHUNK - stream.avail_out;
1036 flist_add_tail(&c->list, &list);
1037 } while (ret != Z_STREAM_END);
1040 dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1042 ret = deflateEnd(&stream);
1044 log_err("fio: deflateEnd %d\n", ret);
1046 free(data->samples);
/* Splice the finished chunks onto the log under its chunk lock. */
1048 if (!flist_empty(&list)) {
1049 pthread_mutex_lock(&data->log->chunk_lock);
1050 flist_splice_tail(&list, &data->log->chunk_list);
1051 pthread_mutex_unlock(&data->log->chunk_lock);
1058 pthread_cond_signal(&work->cv);
/* Error path: discard any chunks built so far.
 * FIX: flist_first_entry() takes the list HEAD (it dereferences
 * ->next itself) — passing list.next here skipped an entry and,
 * with a single element left, treated the head as an entry. All
 * other call sites in this file pass &head. */
1064 while (!flist_empty(&list)) {
1065 c = flist_first_entry(&list, struct iolog_compress, list);
1066 flist_del(&c->list);
1074 * Queue work item to compress the existing log entries. We copy the
1075 * samples, and reset the log sample count to 0 (so the logging will
1076 * continue to use the memory associated with the log). If called with
1077 * wait == 1, will not return until the log compression has completed.
1079 int iolog_flush(struct io_log *log, int wait)
1081 struct tp_data *tdat = log->td->tp_data;
1082 struct iolog_flush_data *data;
1085 data = malloc(sizeof(*data));
/* Snapshot the current samples; gz_work() frees this copy. */
1091 sample_size = log->nr_samples * log_entry_sz(log);
1092 data->samples = malloc(sample_size);
1093 if (!data->samples) {
1098 memcpy(data->samples, log->log, sample_size);
1099 data->nr_samples = log->nr_samples;
1100 data->work.fn = gz_work;
1101 log->nr_samples = 0;
/* Synchronous callers block on the work item's cv until done. */
1104 pthread_mutex_init(&data->work.lock, NULL);
1105 pthread_cond_init(&data->work.cv, NULL);
1106 data->work.wait = 1;
1108 data->work.wait = 0;
1110 data->work.prio = 1;
1111 tp_queue_work(tdat, &data->work);
1114 pthread_mutex_lock(&data->work.lock);
1115 while (!data->work.done)
1116 pthread_cond_wait(&data->work.cv, &data->work.lock);
1117 pthread_mutex_unlock(&data->work.lock);
/* No-zlib build: compression flush is unavailable (stub). */
1126 int iolog_flush(struct io_log *log, int wait)
/* Finalize the per-thread IOPS log, if one was collected. */
1133 static int write_iops_log(struct thread_data *td, int try)
1135 struct io_log *log = td->iops_log;
1140 return finish_log(td, log, try);
/* Finalize the submission-latency log, if one was collected. */
1143 static int write_slat_log(struct thread_data *td, int try)
1145 struct io_log *log = td->slat_log;
1150 return finish_log(td, log, try);
/* Finalize the completion-latency log, if one was collected. */
1153 static int write_clat_log(struct thread_data *td, int try)
1155 struct io_log *log = td->clat_log;
1160 return finish_log(td, log, try);
/* Finalize the total-latency log, if one was collected. */
1163 static int write_lat_log(struct thread_data *td, int try)
1165 struct io_log *log = td->lat_log;
1170 return finish_log(td, log, try);
/* Finalize the bandwidth log, if one was collected. */
1173 static int write_bandw_log(struct thread_data *td, int try)
1175 struct io_log *log = td->bw_log;
1180 return finish_log(td, log, try);
/* Dispatch table mapping each log-type mask bit to its writer, walked
 * by fio_writeout_logs(). */
1195 int (*fn)(struct thread_data *, int);
1198 static struct log_type log_types[] = {
1200 .mask = BW_LOG_MASK,
1201 .fn = write_bandw_log,
1204 .mask = LAT_LOG_MASK,
1205 .fn = write_lat_log,
1208 .mask = SLAT_LOG_MASK,
1209 .fn = write_slat_log,
1212 .mask = CLAT_LOG_MASK,
1213 .fn = write_clat_log,
1216 .mask = IOPS_LOG_MASK,
1217 .fn = write_iops_log,
/*
 * Write out every enabled log for this thread. Each writer is retried
 * (trylock semantics while more than one log remains) until all are
 * done; the prev_log_left check breaks the loop when no progress was
 * made in a full pass. Runstate is bumped to TD_FINISHING for the
 * duration and restored afterwards.
 */
1221 void fio_writeout_logs(struct thread_data *td)
1223 unsigned int log_mask = 0;
1224 unsigned int log_left = ALL_LOG_NR;
1227 old_state = td_bump_runstate(td, TD_FINISHING);
1232 int prev_log_left = log_left;
1234 for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1235 struct log_type *lt = &log_types[i];
1238 if (!(log_mask & lt->mask)) {
/* Only the last remaining log blocks on the file lock. */
1239 ret = lt->fn(td, log_left != 1);
1242 log_mask |= lt->mask;
1247 if (prev_log_left == log_left)
1251 td_restore_runstate(td, old_state);