/*
 * Code related to writing an iolog of what a thread is doing, and to
 * later read that back and replay
 */
23 static const char iolog_ver2[] = "fio version 2 iolog";
25 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
27 flist_add_tail(&ipo->list, &td->io_log_list);
28 td->total_io_size += ipo->len;
31 void log_io_u(struct thread_data *td, struct io_u *io_u)
33 const char *act[] = { "read", "write", "sync", "datasync",
34 "sync_file_range", "wait", "trim" };
36 assert(io_u->ddir <= 6);
38 if (!td->o.write_iolog_file)
41 fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
42 act[io_u->ddir], io_u->offset,
46 void log_file(struct thread_data *td, struct fio_file *f,
47 enum file_log_act what)
49 const char *act[] = { "add", "open", "close" };
53 if (!td->o.write_iolog_file)
58 * this happens on the pre-open/close done before the job starts
63 fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
66 static void iolog_delay(struct thread_data *td, unsigned long delay)
68 unsigned long usec = utime_since_now(&td->last_issue);
69 unsigned long this_delay;
77 * less than 100 usec delay, just regard it as noise
82 while (delay && !td->terminate) {
84 if (this_delay > 500000)
87 usec_sleep(td, this_delay);
92 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
100 if (ipo->ddir != DDIR_INVAL)
103 f = td->files[ipo->fileno];
105 switch (ipo->file_action) {
106 case FIO_LOG_OPEN_FILE:
107 ret = td_io_open_file(td, f);
110 td_verror(td, ret, "iolog open file");
112 case FIO_LOG_CLOSE_FILE:
113 td_io_close_file(td, f);
115 case FIO_LOG_UNLINK_FILE:
116 unlink(f->file_name);
119 log_err("fio: bad file action %d\n", ipo->file_action);
/*
 * Pop the next entry off td->io_log_list and fill 'io_u' from it.
 * File-action entries are dispatched through ipo_special(); DDIR_WAIT
 * entries sleep relative to job genesis; normal entries also replay
 * the recorded issue delay.
 * NOTE(review): this chunk looks truncated - several lines (error
 * paths, frees of 'ipo', closing braces, the exhausted-log return)
 * appear missing from this view. Code kept byte-identical; confirm
 * against the upstream file before relying on control flow here.
 */
126 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
128 	struct io_piece *ipo;
129 	unsigned long elapsed;
131 	while (!flist_empty(&td->io_log_list)) {
134 		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
135 		flist_del(&ipo->list);
136 		remove_trim_entry(td, ipo);
138 		ret = ipo_special(td, ipo);
142 		} else if (ret > 0) {
147 		io_u->ddir = ipo->ddir;
148 		if (ipo->ddir != DDIR_WAIT) {
149 			io_u->offset = ipo->offset;
150 			io_u->buflen = ipo->len;
151 			io_u->file = td->files[ipo->fileno];
152 			get_file(io_u->file);
153 			dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
154 				io_u->buflen, io_u->file->file_name);
156 				iolog_delay(td, ipo->delay);
158 			elapsed = mtime_since_genesis();
159 			if (ipo->delay > elapsed)
160 				usec_sleep(td, (ipo->delay - elapsed) * 1000);
165 		if (io_u->ddir != DDIR_WAIT)
173 void prune_io_piece_log(struct thread_data *td)
175 struct io_piece *ipo;
178 while ((n = rb_first(&td->io_hist_tree)) != NULL) {
179 ipo = rb_entry(n, struct io_piece, rb_node);
180 rb_erase(n, &td->io_hist_tree);
181 remove_trim_entry(td, ipo);
186 while (!flist_empty(&td->io_hist_list)) {
187 ipo = flist_entry(&td->io_hist_list, struct io_piece, list);
188 flist_del(&ipo->list);
189 remove_trim_entry(td, ipo);
/*
 * log a successful write, so we can unwind the log for verify
 */
/*
 * Record a completed write as an io_piece: either appended in issue
 * order to io_hist_list (fast path) or sorted into io_hist_tree when
 * duplicate-block detection is needed for verify.
 * NOTE(review): this view is missing lines (the rb_left/rb_right
 * descent assignments, io_hist_len accounting, closing braces).
 * Code kept byte-identical - do not infer full control flow from here.
 */
198 void log_io_piece(struct thread_data *td, struct io_u *io_u)
200 	struct rb_node **p, *parent;
201 	struct io_piece *ipo, *__ipo;
203 	ipo = malloc(sizeof(struct io_piece));
205 	ipo->file = io_u->file;
206 	ipo->offset = io_u->offset;
207 	ipo->len = io_u->buflen;
208 	ipo->numberio = io_u->numberio;
209 	ipo->flags = IP_F_IN_FLIGHT;
213 	if (io_u_should_trim(td, io_u)) {
214 		flist_add_tail(&ipo->trim_list, &td->trim_list);
219 	 * We don't need to sort the entries, if:
221 	 *	Sequential writes, or
222 	 *	Random writes that lay out the file as it goes along
224 	 * For both these cases, just reading back data in the order we
225 	 * wrote it out is the fastest.
227 	 * One exception is if we don't have a random map AND we are doing
228 	 * verifies, in that case we need to check for duplicate blocks and
229 	 * drop the old one, which we rely on the rb insert/lookup for
232 	if (((!td->o.verifysort) || !td_random(td) || !td->o.overwrite) &&
233 	    (file_randommap(td, ipo->file) || td->o.verify == VERIFY_NONE)) {
234 		INIT_FLIST_HEAD(&ipo->list);
235 		flist_add_tail(&ipo->list, &td->io_hist_list);
236 		ipo->flags |= IP_F_ONLIST;
241 	RB_CLEAR_NODE(&ipo->rb_node);
244 	 * Sort the entry into the verification list
247 	p = &td->io_hist_tree.rb_node;
252 		__ipo = rb_entry(parent, struct io_piece, rb_node);
253 		if (ipo->file < __ipo->file)
255 		else if (ipo->file > __ipo->file)
257 		else if (ipo->offset < __ipo->offset)
259 		else if (ipo->offset > __ipo->offset)
			/* overlap: evict the older entry and re-probe */
262 			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu",
263 				__ipo->offset, __ipo->len,
264 				ipo->offset, ipo->len);
266 			rb_erase(parent, &td->io_hist_tree);
267 			remove_trim_entry(td, __ipo);
273 	rb_link_node(&ipo->rb_node, parent, p);
274 	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
275 	ipo->flags |= IP_F_ONRB;
279 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
281 struct io_piece *ipo = io_u->ipo;
286 if (ipo->flags & IP_F_ONRB)
287 rb_erase(&ipo->rb_node, &td->io_hist_tree);
288 else if (ipo->flags & IP_F_ONLIST)
289 flist_del(&ipo->list);
296 void trim_io_piece(struct thread_data *td, struct io_u *io_u)
298 struct io_piece *ipo = io_u->ipo;
303 ipo->len = io_u->xfer_buflen - io_u->resid;
306 void write_iolog_close(struct thread_data *td)
312 td->iolog_buf = NULL;
/*
 * Read version 2 iolog data. It is enhanced to include per-file logging,
 */
/*
 * Parse a v2 iolog: each line is either "<file> <action>" (add/open/
 * close) or "<file> <ddir> <offset> <length>". Parsed entries become
 * io_pieces queued for replay; the job ddir is derived from what was
 * seen. Returns non-zero on parse failure (TODO confirm - the return
 * statements are not visible in this view).
 * NOTE(review): this chunk is missing many lines (rw assignments per
 * action string, the r==2/r==4 branching, frees, continue/break).
 * Code kept byte-identical.
 */
319 static int read_iolog2(struct thread_data *td, FILE *f)
321 	unsigned long long offset;
323 	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
328 	free_release_files(td);
331 	 * Read in the read iolog and store it, reuse the infrastructure
332 	 * for doing verifications.
335 	fname = malloc(256+16);
336 	act = malloc(256+16);
338 	reads = writes = waits = 0;
339 	while ((p = fgets(str, 4096, f)) != NULL) {
340 		struct io_piece *ipo;
343 		r = sscanf(p, "%256s %256s %llu %u", fname, act, &offset,
349 			if (!strcmp(act, "wait"))
351 			else if (!strcmp(act, "read"))
353 			else if (!strcmp(act, "write"))
355 			else if (!strcmp(act, "sync"))
357 			else if (!strcmp(act, "datasync"))
359 			else if (!strcmp(act, "trim"))
362 				log_err("fio: bad iolog file action: %s\n",
366 			fileno = get_fileno(td, fname);
369 			if (!strcmp(act, "add")) {
370 				fileno = add_file(td, fname, 0, 1);
371 				file_action = FIO_LOG_ADD_FILE;
373 			} else if (!strcmp(act, "open")) {
374 				fileno = get_fileno(td, fname);
375 				file_action = FIO_LOG_OPEN_FILE;
376 			} else if (!strcmp(act, "close")) {
377 				fileno = get_fileno(td, fname);
378 				file_action = FIO_LOG_CLOSE_FILE;
380 				log_err("fio: bad iolog file action: %s\n",
385 			log_err("bad iolog2: %s", p);
391 		else if (rw == DDIR_WRITE) {
393 			 * Don't add a write for ro mode
398 		} else if (rw == DDIR_WAIT) {
400 		} else if (rw == DDIR_INVAL) {
401 		} else if (!ddir_sync(rw)) {
402 			log_err("bad ddir: %d\n", rw);
409 		ipo = malloc(sizeof(*ipo));
412 		if (rw == DDIR_WAIT) {
415 			ipo->offset = offset;
			/* track largest block size so buffers are big enough */
417 			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
418 				td->o.max_bs[rw] = bytes;
419 			ipo->fileno = fileno;
420 			ipo->file_action = file_action;
424 		queue_io_piece(td, ipo);
431 	if (writes && read_only) {
432 		log_err("fio: <%s> skips replay of %d writes due to"
433 			" read-only\n", td->o.name, writes);
437 	if (!reads && !writes && !waits)
439 	else if (reads && !writes)
440 		td->o.td_ddir = TD_DDIR_READ;
441 	else if (!reads && writes)
442 		td->o.td_ddir = TD_DDIR_WRITE;
444 		td->o.td_ddir = TD_DDIR_RW;
450 * open iolog, check version, and call appropriate parser
452 static int init_iolog_read(struct thread_data *td)
454 char buffer[256], *p;
458 f = fopen(td->o.read_iolog_file, "r");
460 perror("fopen read iolog");
464 p = fgets(buffer, sizeof(buffer), f);
466 td_verror(td, errno, "iolog read");
467 log_err("fio: unable to read iolog\n");
473 * version 2 of the iolog stores a specific string as the
474 * first line, check for that
476 if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
477 ret = read_iolog2(td, f);
479 log_err("fio: iolog version 1 is no longer supported\n");
488 * Set up a log for storing io patterns.
490 static int init_iolog_write(struct thread_data *td)
496 f = fopen(td->o.write_iolog_file, "a");
498 perror("fopen write iolog");
503 * That's it for writing, setup a log buffer and we're done.
506 td->iolog_buf = malloc(8192);
507 setvbuf(f, td->iolog_buf, _IOFBF, 8192);
510 * write our version line
512 if (fprintf(f, "%s\n", iolog_ver2) < 0) {
513 perror("iolog init\n");
518 * add all known files
520 for_each_file(td, ff, i)
521 log_file(td, ff, FIO_LOG_ADD_FILE);
526 int init_iolog(struct thread_data *td)
530 if (td->o.read_iolog_file) {
534 * Check if it's a blktrace file and load that if possible.
535 * Otherwise assume it's a normal log file and load that.
537 if (is_blktrace(td->o.read_iolog_file, &need_swap))
538 ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
540 ret = init_iolog_read(td);
541 } else if (td->o.write_iolog_file)
542 ret = init_iolog_write(td);
545 td_verror(td, EINVAL, "failed initializing iolog");
550 void setup_log(struct io_log **log, struct log_params *p,
551 const char *filename)
553 struct io_log *l = malloc(sizeof(*l));
555 memset(l, 0, sizeof(*l));
557 l->max_samples = 1024;
558 l->log_type = p->log_type;
559 l->log_offset = p->log_offset;
560 l->log_gz = p->log_gz;
561 l->log_gz_store = p->log_gz_store;
562 l->log = malloc(l->max_samples * log_entry_sz(l));
563 l->avg_msec = p->avg_msec;
564 l->filename = strdup(filename);
568 l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
570 INIT_FLIST_HEAD(&l->chunk_list);
572 if (l->log_gz && !p->td)
574 else if (l->log_gz) {
575 pthread_mutex_init(&l->chunk_lock, NULL);
576 p->td->flags |= TD_F_COMPRESS_LOG;
#ifdef CONFIG_SETVBUF
/*
 * Give the log file a large (1MB) stdio buffer to cut down on write
 * syscalls when flushing many samples. Returns the buffer so the
 * caller can free it after fclose().
 */
static void *set_file_buffer(FILE *f)
{
	size_t size = 1048576;
	void *buf;

	buf = malloc(size);
	setvbuf(f, buf, _IOFBF, size);
	return buf;
}

static void clear_file_buffer(void *buf)
{
	free(buf);
}
#else
/* setvbuf() not available: no-op stubs */
static void *set_file_buffer(FILE *f)
{
	return NULL;
}

static void clear_file_buffer(void *buf)
{
}
#endif
608 void free_log(struct io_log *log)
615 static void flush_samples(FILE *f, void *samples, uint64_t sample_size)
619 uint64_t i, nr_samples;
624 s = __get_sample(samples, 0, 0);
625 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
627 nr_samples = sample_size / __log_entry_sz(log_offset);
629 for (i = 0; i < nr_samples; i++) {
630 s = __get_sample(samples, log_offset, i);
633 fprintf(f, "%lu, %lu, %u, %u\n",
634 (unsigned long) s->time,
635 (unsigned long) s->val,
636 io_sample_ddir(s), s->bs);
638 struct io_sample_offset *so = (void *) s;
640 fprintf(f, "%lu, %lu, %u, %u, %llu\n",
641 (unsigned long) s->time,
642 (unsigned long) s->val,
643 io_sample_ddir(s), s->bs,
644 (unsigned long long) so->offset);
/*
 * NOTE(review): both struct bodies below are only partially visible in
 * this chunk. From use in gz_work()/iolog_flush(), iolog_flush_data
 * presumably carries a tp_work item plus the copied samples, and
 * iolog_compress a list-linked GZ_CHUNK-sized buffer with len/seq -
 * confirm against upstream before relying on the layout.
 */
651 struct iolog_flush_data {
658 struct iolog_compress {
659 	struct flist_head list;
/* size of each compression output chunk buffer (128KB) */
665 #define GZ_CHUNK	131072
667 static struct iolog_compress *get_new_chunk(unsigned int seq)
669 struct iolog_compress *c;
671 c = malloc(sizeof(*c));
672 INIT_FLIST_HEAD(&c->list);
673 c->buf = malloc(GZ_CHUNK);
679 static void free_chunk(struct iolog_compress *ic)
/*
 * Initialize a zlib inflate stream. 'gz_hdr' selects auto-detection of
 * a gzip header (used when logs are stored in gz-friendly format).
 * Returns 0 on success, 1 on failure.
 */
static int z_stream_init(z_stream *stream, int gz_hdr)
{
	int wbits = 15;

	stream->zalloc = Z_NULL;
	stream->zfree = Z_NULL;
	stream->opaque = Z_NULL;
	stream->next_in = Z_NULL;

	/*
	 * zlib magic - add 32 for auto-detection of gz header or not,
	 * if we decide to store files in a gzip friendly format.
	 */
	if (gz_hdr)
		wbits += 32;

	if (inflateInit2(stream, wbits) != Z_OK)
		return 1;

	return 0;
}
/*
 * NOTE(review): members not visible in this chunk. From use in
 * inflate_chunk()/finish_chunk(), it presumably tracks the current
 * sequence number plus an output buffer (buf/buf_size/buf_used) that
 * grows by chunk_sz - confirm against upstream.
 */
707 struct inflate_chunk_iter {
/*
 * End the current inflate stream, flush the decompressed samples that
 * accumulated in the iterator buffer to 'f', and reset the iterator
 * for the next sequence.
 */
static void finish_chunk(z_stream *stream, FILE *f,
			 struct inflate_chunk_iter *iter)
{
	int ret;

	ret = inflateEnd(stream);
	if (ret != Z_OK)
		log_err("fio: failed to end log inflation (%d)\n", ret);

	flush_samples(f, iter->buf, iter->buf_used);
	free(iter->buf);
	iter->buf = NULL;
	iter->buf_size = iter->buf_used = 0;
}
/*
 * Iterative chunk inflation. Handles cases where we cross into a new
 * sequence, doing flush finish of previous chunk if needed.
 */
/*
 * NOTE(review): several lines (Z_BUF_ERROR handling, the iter->seq
 * update, closing braces) are not visible in this chunk; code kept
 * byte-identical.
 */
735 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
736 			    z_stream *stream, struct inflate_chunk_iter *iter)
	/* sequence change: flush/finish the previous stream, start fresh */
738 	if (ic->seq != iter->seq) {
740 			finish_chunk(stream, f, iter);
742 		z_stream_init(stream, gz_hdr);
746 	stream->avail_in = ic->len;
747 	stream->next_in = ic->buf;
749 	if (!iter->buf_size) {
750 		iter->buf_size = iter->chunk_sz;
751 		iter->buf = malloc(iter->buf_size);
754 	while (stream->avail_in) {
755 		size_t this_out = iter->buf_size - iter->buf_used;
758 		stream->avail_out = this_out;
759 		stream->next_out = iter->buf + iter->buf_used;
761 		err = inflate(stream, Z_NO_FLUSH);
763 			log_err("fio: failed inflating log: %d\n", err);
768 		iter->buf_used += this_out - stream->avail_out;
		/* output buffer full: grow it by another chunk */
770 		if (!stream->avail_out) {
771 			iter->buf_size += iter->chunk_sz;
772 			iter->buf = realloc(iter->buf, iter->buf_size);
776 		if (err == Z_STREAM_END)
	/* return number of input bytes consumed from this chunk */
780 	return (void *) stream->next_in - ic->buf;
/*
 * Inflate stored compressed chunks, or write them directly to the log
 * file if so instructed.
 */
/*
 * NOTE(review): missing from this view: locals, the free_chunk()
 * calls, the iter.seq guard before finish_chunk(), and the return.
 * Code kept byte-identical.
 */
787 static int inflate_gz_chunks(struct io_log *log, FILE *f)
789 	struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
792 	while (!flist_empty(&log->chunk_list)) {
793 		struct iolog_compress *ic;
795 		ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
796 		flist_del(&ic->list);
		/* gz-store mode: emit compressed bytes verbatim */
798 		if (log->log_gz_store) {
801 			ret = fwrite(ic->buf, ic->len, 1, f);
802 			if (ret != 1 || ferror(f)) {
804 				log_err("fio: error writing compressed log\n");
807 			inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
813 		finish_chunk(&stream, f, &iter);
/*
 * Open compressed log file and decompress the stored chunks and
 * write them to stdout. The chunks are stored sequentially in the
 * file, so we iterate over them and do them one-by-one.
 */
/*
 * NOTE(review): error-path cleanup (fclose/free), the ic.len setup
 * from sb.st_size, the seq increment loop body and return are not
 * visible in this chunk; code kept byte-identical.
 */
825 int iolog_file_inflate(const char *file)
827 	struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
828 	struct iolog_compress ic;
836 	f = fopen(file, "r");
842 	if (stat(file, &sb) < 0) {
	/* whole file is read in one go and fed chunk-wise to inflate */
848 	ic.buf = buf = malloc(sb.st_size);
852 	ret = fread(ic.buf, ic.len, 1, f);
857 	} else if (ret != 1) {
858 		log_err("fio: short read on reading log\n");
866 	 * Each chunk will return Z_STREAM_END. We don't know how many
867 	 * chunks are in the file, so we just keep looping and incrementing
868 	 * the sequence number until we have consumed the whole compressed
875 		ret = inflate_chunk(&ic,  1, stdout, &stream, &iter);
888 		finish_chunk(&stream, stdout, &iter);
/*
 * Stubs used when fio is built without zlib: there can be no stored
 * compressed chunks, and offline log inflation is impossible.
 */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	return 0;
}

int iolog_file_inflate(const char *file)
{
	log_err("fio: log inflation not possible without zlib\n");
	return 1;
}
911 void flush_log(struct io_log *log)
916 f = fopen(log->filename, "w");
922 buf = set_file_buffer(f);
924 inflate_gz_chunks(log, f);
926 flush_samples(f, log->log, log->nr_samples * log_entry_sz(log));
929 clear_file_buffer(buf);
932 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
938 if (fio_trylock_file(log->filename))
941 fio_lock_file(log->filename);
943 if (td->client_type == FIO_CLIENT_TYPE_GUI)
944 fio_send_iolog(td, log, log->filename);
948 fio_unlock_file(log->filename);
/*
 * Invoked from our compress helper thread, when logging would have exceeded
 * the specified memory limitation. Compresses the previously stored
 */
/*
 * Deflates the copied samples into GZ_CHUNK-sized iolog_compress
 * chunks on a local list, then splices them onto the log's chunk list
 * under chunk_lock. Signals the work cv when the submitter waits.
 * NOTE(review): missing from this view: locals, deflate error goto
 * target, the work->done handshake, the return, and the error-path
 * chunk cleanup entry. Code kept byte-identical.
 */
960 static int gz_work(struct tp_work *work)
962 	struct iolog_flush_data *data;
963 	struct iolog_compress *c;
964 	struct flist_head list;
970 	INIT_FLIST_HEAD(&list);
972 	data = container_of(work, struct iolog_flush_data, work);
974 	stream.zalloc = Z_NULL;
975 	stream.zfree = Z_NULL;
976 	stream.opaque = Z_NULL;
978 	ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
980 		log_err("fio: failed to init gz stream\n");
984 	seq = ++data->log->chunk_seq;
986 	stream.next_in = (void *) data->samples;
987 	stream.avail_in = data->nr_samples * log_entry_sz(data->log);
	/* main deflate loop: one chunk per iteration until input drained */
990 		c = get_new_chunk(seq);
991 		stream.avail_out = GZ_CHUNK;
992 		stream.next_out = c->buf;
993 		ret = deflate(&stream, Z_NO_FLUSH);
995 			log_err("fio: deflate log (%d)\n", ret);
1000 		c->len = GZ_CHUNK - stream.avail_out;
1001 		flist_add_tail(&c->list, &list);
1003 	} while (stream.avail_in);
	/* finish the stream, reusing tail space of the last chunk first */
1005 	stream.next_out = c->buf + c->len;
1006 	stream.avail_out = GZ_CHUNK - c->len;
1008 	ret = deflate(&stream, Z_FINISH);
1009 	if (ret == Z_STREAM_END)
1010 		c->len = GZ_CHUNK - stream.avail_out;
1013 			c = get_new_chunk(seq);
1014 			stream.avail_out = GZ_CHUNK;
1015 			stream.next_out = c->buf;
1016 			ret = deflate(&stream, Z_FINISH);
1017 			c->len = GZ_CHUNK - stream.avail_out;
1018 			flist_add_tail(&c->list, &list);
1019 		} while (ret != Z_STREAM_END);
1022 	ret = deflateEnd(&stream);
1024 		log_err("fio: deflateEnd %d\n", ret);
1026 	free(data->samples);
1028 	if (!flist_empty(&list)) {
1029 		pthread_mutex_lock(&data->log->chunk_lock);
1030 		flist_splice_tail(&list, &data->log->chunk_list);
1031 		pthread_mutex_unlock(&data->log->chunk_lock);
1038 		pthread_cond_signal(&work->cv);
	/* error path: drop any chunks we produced */
1044 	while (!flist_empty(&list)) {
1045 		c = flist_first_entry(list.next, struct iolog_compress, list);
1046 		flist_del(&c->list);
/*
 * Queue work item to compress the existing log entries. We copy the
 * samples, and reset the log sample count to 0 (so the logging will
 * continue to use the memory associated with the log). If called with
 * wait == 1, will not return until the log compression has completed.
 */
/*
 * NOTE(review): missing from this view: malloc failure returns, the
 * wait-branch condition, and the final return. Code kept byte-identical.
 */
1059 int iolog_flush(struct io_log *log, int wait)
1061 	struct tp_data *tdat = log->td->tp_data;
1062 	struct iolog_flush_data *data;
1065 	data = malloc(sizeof(*data));
1071 	sample_size = log->nr_samples * log_entry_sz(log);
1072 	data->samples = malloc(sample_size);
1073 	if (!data->samples) {
	/* snapshot the samples so logging can keep using log->log */
1078 	memcpy(data->samples, log->log, sample_size);
1079 	data->nr_samples = log->nr_samples;
1080 	data->work.fn = gz_work;
1081 	log->nr_samples = 0;
1084 		pthread_mutex_init(&data->work.lock, NULL);
1085 		pthread_cond_init(&data->work.cv, NULL);
1086 		data->work.wait = 1;
1088 		data->work.wait = 0;
1090 	data->work.prio = 1;
1091 	tp_queue_work(tdat, &data->work);
	/* synchronous flush: block until the compress worker signals done */
1094 		pthread_mutex_lock(&data->work.lock);
1095 		while (!data->work.done)
1096 			pthread_cond_wait(&data->work.cv, &data->work.lock);
1097 		pthread_mutex_unlock(&data->work.lock);
/*
 * Without zlib there is nothing to compress; report failure so the
 * caller keeps the samples in memory.
 * NOTE(review): stub body not visible in the reviewed chunk; a
 * constant failure return matches the non-zlib stubs above - confirm
 * against upstream.
 */
int iolog_flush(struct io_log *log, int wait)
{
	return 1;
}
1113 static int write_iops_log(struct thread_data *td, int try)
1115 struct io_log *log = td->iops_log;
1120 return finish_log(td, log, try);
1123 static int write_slat_log(struct thread_data *td, int try)
1125 struct io_log *log = td->slat_log;
1130 return finish_log(td, log, try);
1133 static int write_clat_log(struct thread_data *td, int try)
1135 struct io_log *log = td->clat_log;
1140 return finish_log(td, log, try);
1143 static int write_lat_log(struct thread_data *td, int try)
1145 struct io_log *log = td->lat_log;
1150 return finish_log(td, log, try);
1153 static int write_bandw_log(struct thread_data *td, int try)
1155 struct io_log *log = td->bw_log;
1160 return finish_log(td, log, try);
/*
 * Table of all optional logs and their writeout functions, iterated by
 * fio_writeout_logs(). NOTE(review): the struct log_type declaration
 * head and the mask member line are not visible in this chunk; code
 * kept byte-identical.
 */
1175 	int (*fn)(struct thread_data *, int);
1178 static struct log_type log_types[] = {
1180 		.mask	= BW_LOG_MASK,
1181 		.fn	= write_bandw_log,
1184 		.mask	= LAT_LOG_MASK,
1185 		.fn	= write_lat_log,
1188 		.mask	= SLAT_LOG_MASK,
1189 		.fn	= write_slat_log,
1192 		.mask	= CLAT_LOG_MASK,
1193 		.fn	= write_clat_log,
1196 		.mask	= IOPS_LOG_MASK,
1197 		.fn	= write_iops_log,
/*
 * Flush all enabled logs for this thread. Logs are retried with
 * trylock while more than one remains (log_left != 1 -> try mode), so
 * the thread is not blocked on one contended log file; the run state
 * is bumped to TD_FINISHING for the duration.
 * NOTE(review): missing from this view: locals (i, ret, old_state),
 * the outer retry loop head, the log_left decrement, and the
 * usleep/break on no progress. Code kept byte-identical.
 */
1201 void fio_writeout_logs(struct thread_data *td)
1203 	unsigned int log_mask = 0;
1204 	unsigned int log_left = ALL_LOG_NR;
1207 	old_state = td_bump_runstate(td, TD_FINISHING);
1212 		int prev_log_left = log_left;
1214 		for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1215 			struct log_type *lt = &log_types[i];
			/* skip logs already written (mask bit set) */
1218 			if (!(log_mask & lt->mask)) {
1219 				ret = lt->fn(td, log_left != 1);
1222 					log_mask |= lt->mask;
		/* no progress this pass: back off before retrying */
1227 		if (prev_log_left == log_left)
1231 	td_restore_runstate(td, old_state);