2 * Code related to writing an iolog of what a thread is doing, and to
3 * later read that back and replay
/* Magic first line that identifies a version-2 iolog file on disk. */
23 static const char iolog_ver2[] = "fio version 2 iolog";
/*
 * Queue an io_piece for later replay: link it onto the thread's replay
 * list and account its length toward the total I/O size to be replayed.
 */
25 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
27 flist_add_tail(&ipo->list, &td->io_log_list);
28 td->total_io_size += ipo->len;
/*
 * Append one line to the output iolog describing this io_u:
 * "<file> <ddir> <offset> <buflen>". No-op unless an output iolog
 * was requested via the write_iolog_file option.
 */
31 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
/* Bail out early when no output iolog was configured */
33 if (!td->o.write_iolog_file)
36 fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
37 io_ddir_name(io_u->ddir),
38 io_u->offset, io_u->buflen);
/*
 * Record a file lifecycle event ("add"/"open"/"close") in the output
 * iolog. 'what' indexes the act[] table, so it must be one of the
 * corresponding file_log_act values.
 */
41 void log_file(struct thread_data *td, struct fio_file *f,
42 enum file_log_act what)
44 const char *act[] = { "add", "open", "close" };
/* Nothing to do unless an output iolog was requested */
48 if (!td->o.write_iolog_file)
53 * this happens on the pre-open/close done before the job starts
58 fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
/*
 * Honor a replay delay (in usec) between iolog entries. The time
 * already spent since the last issue is measured first; the remainder
 * is slept away in bounded chunks so thread termination is noticed
 * promptly.
 */
61 static void iolog_delay(struct thread_data *td, unsigned long delay)
63 unsigned long usec = utime_since_now(&td->last_issue);
64 unsigned long this_delay;
/* Sleep in chunks, re-checking the terminate flag each iteration */
71 while (delay && !td->terminate) {
/* Cap each individual sleep at 500ms — presumably clamped below; confirm */
73 if (this_delay > 500000)
76 usec_sleep(td, this_delay);
/*
 * Handle iolog entries that encode a file action rather than real I/O.
 * Regular I/O entries (ddir != DDIR_INVAL) are not special and are
 * passed through. NOTE(review): return convention (handled vs. error)
 * is not fully visible here — confirm against the caller.
 */
81 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
/* Only DDIR_INVAL entries carry a file_action */
89 if (ipo->ddir != DDIR_INVAL)
92 f = td->files[ipo->fileno];
94 switch (ipo->file_action) {
95 case FIO_LOG_OPEN_FILE:
96 ret = td_io_open_file(td, f);
99 td_verror(td, ret, "iolog open file");
101 case FIO_LOG_CLOSE_FILE:
102 td_io_close_file(td, f);
104 case FIO_LOG_UNLINK_FILE:
105 td_io_unlink_file(td, f);
108 log_err("fio: bad file action %d\n", ipo->file_action);
/*
 * Pop the next entry off the replay list and materialize it into the
 * given io_u. File-action entries are dispatched via ipo_special();
 * real I/O entries fill in ddir/offset/buflen/file, while DDIR_WAIT
 * entries translate into a sleep relative to job genesis time.
 */
115 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
117 struct io_piece *ipo;
118 unsigned long elapsed;
120 while (!flist_empty(&td->io_log_list)) {
123 ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
124 flist_del(&ipo->list);
125 remove_trim_entry(td, ipo);
/* File actions are consumed here and the loop continues */
127 ret = ipo_special(td, ipo);
131 } else if (ret > 0) {
136 io_u->ddir = ipo->ddir;
137 if (ipo->ddir != DDIR_WAIT) {
138 io_u->offset = ipo->offset;
139 io_u->buflen = ipo->len;
140 io_u->file = td->files[ipo->fileno];
141 get_file(io_u->file);
142 dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
143 io_u->buflen, io_u->file->file_name);
145 iolog_delay(td, ipo->delay);
/* DDIR_WAIT: ipo->delay is an absolute msec offset from genesis */
147 elapsed = mtime_since_genesis();
148 if (ipo->delay > elapsed)
149 usec_sleep(td, (ipo->delay - elapsed) * 1000);
154 if (io_u->ddir != DDIR_WAIT)
/*
 * Drop all recorded io history for this thread: empty both the rb-tree
 * (sorted verify entries) and the plain history list, removing any
 * pending trim entries along the way.
 */
162 void prune_io_piece_log(struct thread_data *td)
164 struct io_piece *ipo;
/* Drain the sorted verify tree */
167 while ((n = rb_first(&td->io_hist_tree)) != NULL) {
168 ipo = rb_entry(n, struct io_piece, rb_node);
169 rb_erase(n, &td->io_hist_tree);
170 remove_trim_entry(td, ipo);
/* Drain the unsorted history list */
175 while (!flist_empty(&td->io_hist_list)) {
176 ipo = flist_entry(&td->io_hist_list, struct io_piece, list);
177 flist_del(&ipo->list);
178 remove_trim_entry(td, ipo);
185 * log a successful write, so we can unwind the log for verify
187 void log_io_piece(struct thread_data *td, struct io_u *io_u)
189 struct rb_node **p, *parent;
190 struct io_piece *ipo, *__ipo;
/* NOTE(review): malloc result is used unchecked below — fio policy */
192 ipo = malloc(sizeof(struct io_piece));
194 ipo->file = io_u->file;
195 ipo->offset = io_u->offset;
196 ipo->len = io_u->buflen;
197 ipo->numberio = io_u->numberio;
198 ipo->flags = IP_F_IN_FLIGHT;
/* Trim candidates are additionally tracked on the trim list */
202 if (io_u_should_trim(td, io_u)) {
203 flist_add_tail(&ipo->trim_list, &td->trim_list);
208 * We don't need to sort the entries, if:
210 * Sequential writes, or
211 * Random writes that lay out the file as it goes along
213 * For both these cases, just reading back data in the order we
214 * wrote it out is the fastest.
216 * One exception is if we don't have a random map AND we are doing
217 * verifies, in that case we need to check for duplicate blocks and
218 * drop the old one, which we rely on the rb insert/lookup for
/* Fast path: keep a simple FIFO history list */
221 if (((!td->o.verifysort) || !td_random(td) || !td->o.overwrite) &&
222 (file_randommap(td, ipo->file) || td->o.verify == VERIFY_NONE)) {
223 INIT_FLIST_HEAD(&ipo->list);
224 flist_add_tail(&ipo->list, &td->io_hist_list);
225 ipo->flags |= IP_F_ONLIST;
230 RB_CLEAR_NODE(&ipo->rb_node);
233 * Sort the entry into the verification list
/* Standard rb-tree insertion keyed on (file, offset) */
236 p = &td->io_hist_tree.rb_node;
241 __ipo = rb_entry(parent, struct io_piece, rb_node);
242 if (ipo->file < __ipo->file)
244 else if (ipo->file > __ipo->file)
246 else if (ipo->offset < __ipo->offset)
248 else if (ipo->offset > __ipo->offset)
/* Same (file, offset): the newer write replaces the stale entry */
251 dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu",
252 __ipo->offset, __ipo->len,
253 ipo->offset, ipo->len);
255 rb_erase(parent, &td->io_hist_tree);
256 remove_trim_entry(td, __ipo);
262 rb_link_node(&ipo->rb_node, parent, p);
263 rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
264 ipo->flags |= IP_F_ONRB;
/*
 * Undo log_io_piece() for an io_u, removing its io_piece from
 * whichever container it landed in (rb-tree or history list).
 */
268 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
270 struct io_piece *ipo = io_u->ipo;
275 if (ipo->flags & IP_F_ONRB)
276 rb_erase(&ipo->rb_node, &td->io_hist_tree);
277 else if (ipo->flags & IP_F_ONLIST)
278 flist_del(&ipo->list);
/*
 * Shrink the logged io_piece length to the bytes actually transferred
 * (requested transfer length minus the residual left by the backend).
 */
285 void trim_io_piece(struct thread_data *td, const struct io_u *io_u)
287 struct io_piece *ipo = io_u->ipo;
292 ipo->len = io_u->xfer_buflen - io_u->resid;
/*
 * Tear down the output iolog for this thread. The stdio buffer
 * installed in init_iolog_write() is released; presumably the stream
 * is flushed and closed here as well — confirm in the full source.
 */
295 void write_iolog_close(struct thread_data *td)
301 td->iolog_buf = NULL;
305 * Read version 2 iolog data. It is enhanced to include per-file logging,
308 static int read_iolog2(struct thread_data *td, FILE *f)
310 unsigned long long offset;
312 int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
/* Start from a clean file table; entries are re-added from the log */
317 free_release_files(td);
320 * Read in the read iolog and store it, reuse the infrastructure
321 * for doing verifications.
/* +16 slack over the %256s scan widths used below */
324 fname = malloc(256+16);
325 act = malloc(256+16);
327 reads = writes = waits = 0;
328 while ((p = fgets(str, 4096, f)) != NULL) {
329 struct io_piece *ipo;
/* Each line: "<fname> <action> [offset bytes]" */
332 r = sscanf(p, "%256s %256s %llu %u", fname, act, &offset,
/* 4-field match: a real I/O or wait entry — map the action string */
338 if (!strcmp(act, "wait"))
340 else if (!strcmp(act, "read"))
342 else if (!strcmp(act, "write"))
344 else if (!strcmp(act, "sync"))
346 else if (!strcmp(act, "datasync"))
348 else if (!strcmp(act, "trim"))
351 log_err("fio: bad iolog file action: %s\n",
355 fileno = get_fileno(td, fname);
/* 2-field match: a file lifecycle entry (add/open/close) */
358 if (!strcmp(act, "add")) {
359 fileno = add_file(td, fname, 0, 1);
360 file_action = FIO_LOG_ADD_FILE;
362 } else if (!strcmp(act, "open")) {
363 fileno = get_fileno(td, fname);
364 file_action = FIO_LOG_OPEN_FILE;
365 } else if (!strcmp(act, "close")) {
366 fileno = get_fileno(td, fname);
367 file_action = FIO_LOG_CLOSE_FILE;
369 log_err("fio: bad iolog file action: %s\n",
374 log_err("bad iolog2: %s", p);
380 else if (rw == DDIR_WRITE) {
382 * Don't add a write for ro mode
387 } else if (rw == DDIR_WAIT) {
389 } else if (rw == DDIR_INVAL) {
390 } else if (!ddir_sync(rw)) {
391 log_err("bad ddir: %d\n", rw);
/*
398 ipo = malloc(sizeof(*ipo));
401 if (rw == DDIR_WAIT) {
404 ipo->offset = offset;
/* Track the largest block size seen so buffers are sized correctly */
406 if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
407 td->o.max_bs[rw] = bytes;
408 ipo->fileno = fileno;
409 ipo->file_action = file_action;
413 queue_io_piece(td, ipo);
420 if (writes && read_only) {
421 log_err("fio: <%s> skips replay of %d writes due to"
422 " read-only\n", td->o.name, writes);
/* Derive the effective data direction from what the log contained */
426 if (!reads && !writes && !waits)
428 else if (reads && !writes)
429 td->o.td_ddir = TD_DDIR_READ;
430 else if (!reads && writes)
431 td->o.td_ddir = TD_DDIR_WRITE;
433 td->o.td_ddir = TD_DDIR_RW;
439 * open iolog, check version, and call appropriate parser
441 static int init_iolog_read(struct thread_data *td)
443 char buffer[256], *p;
447 f = fopen(td->o.read_iolog_file, "r");
449 perror("fopen read iolog");
/* Read the first line to sniff the log version */
453 p = fgets(buffer, sizeof(buffer), f);
455 td_verror(td, errno, "iolog read");
456 log_err("fio: unable to read iolog\n");
462 * version 2 of the iolog stores a specific string as the
463 * first line, check for that
465 if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
466 ret = read_iolog2(td, f);
/* Anything else is treated as the unsupported v1 format */
468 log_err("fio: iolog version 1 is no longer supported\n");
477 * Set up a log for storing io patterns.
479 static int init_iolog_write(struct thread_data *td)
/* Append mode so an existing log is not clobbered */
485 f = fopen(td->o.write_iolog_file, "a");
487 perror("fopen write iolog");
492 * That's it for writing, setup a log buffer and we're done.
/* Fully-buffered 8k stdio buffer; freed in write_iolog_close() */
495 td->iolog_buf = malloc(8192);
496 setvbuf(f, td->iolog_buf, _IOFBF, 8192);
499 * write our version line
501 if (fprintf(f, "%s\n", iolog_ver2) < 0) {
502 perror("iolog init\n");
507 * add all known files
509 for_each_file(td, ff, i)
510 log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * Top-level iolog setup: replay input (blktrace or fio iolog) takes
 * priority; otherwise set up an output iolog if one was requested.
 */
515 int init_iolog(struct thread_data *td)
519 if (td->o.read_iolog_file) {
523 * Check if it's a blktrace file and load that if possible.
524 * Otherwise assume it's a normal log file and load that.
526 if (is_blktrace(td->o.read_iolog_file, &need_swap))
527 ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
529 ret = init_iolog_read(td);
530 } else if (td->o.write_iolog_file)
531 ret = init_iolog_write(td);
534 td_verror(td, EINVAL, "failed initializing iolog");
/*
 * Allocate and initialize an io_log from the given parameters, storing
 * the result through *log. Pre-sizes the sample array for 1024 entries;
 * compression state is initialized only when log_gz is enabled.
 */
539 void setup_log(struct io_log **log, struct log_params *p,
540 const char *filename)
544 l = calloc(1, sizeof(*l));
546 l->max_samples = 1024;
547 l->log_type = p->log_type;
548 l->log_offset = p->log_offset;
549 l->log_gz = p->log_gz;
550 l->log_gz_store = p->log_gz_store;
/* Entry size depends on whether offsets are logged (log_entry_sz) */
551 l->log = malloc(l->max_samples * log_entry_sz(l));
552 l->avg_msec = p->avg_msec;
553 l->filename = strdup(filename);
/* Offset logging is flagged in the ddir field of each sample */
557 l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
559 INIT_FLIST_HEAD(&l->chunk_list);
/* NOTE(review): log_gz without a td appears to be rejected — confirm */
561 if (l->log_gz && !p->td)
563 else if (l->log_gz) {
564 pthread_mutex_init(&l->chunk_lock, NULL);
565 p->td->flags |= TD_F_COMPRESS_LOG;
/*
 * Optional large stdio buffering for log flushing. When setvbuf() is
 * available a 1MB buffer is installed on the stream and returned so
 * the caller can free it later; otherwise both helpers are no-op stubs.
 */
#ifdef CONFIG_SETVBUF
572 static void *set_file_buffer(FILE *f)
574 size_t size = 1048576;
578 setvbuf(f, buf, _IOFBF, size);
582 static void clear_file_buffer(void *buf)
/* Stubs for platforms without CONFIG_SETVBUF */
587 static void *set_file_buffer(FILE *f)
592 static void clear_file_buffer(void *buf)
/* Release all resources owned by an io_log (body not shown here). */
597 void free_log(struct io_log *log)
/*
 * Write a raw sample buffer to 'f' as CSV text. The first sample's
 * ddir field tells us whether offsets were recorded, which changes
 * both the per-entry size and the output format.
 */
604 static void flush_samples(FILE *f, void *samples, uint64_t sample_size)
608 uint64_t i, nr_samples;
/* Peek at sample 0 to detect offset logging */
613 s = __get_sample(samples, 0, 0);
614 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
616 nr_samples = sample_size / __log_entry_sz(log_offset);
618 for (i = 0; i < nr_samples; i++) {
619 s = __get_sample(samples, log_offset, i);
/* Without offsets: time, value, ddir, block size */
622 fprintf(f, "%lu, %lu, %u, %u\n",
623 (unsigned long) s->time,
624 (unsigned long) s->val,
625 io_sample_ddir(s), s->bs);
/* With offsets: same fields plus the I/O offset */
627 struct io_sample_offset *so = (void *) s;
629 fprintf(f, "%lu, %lu, %u, %u, %llu\n",
630 (unsigned long) s->time,
631 (unsigned long) s->val,
632 io_sample_ddir(s), s->bs,
633 (unsigned long long) so->offset);
/* Work item handed to the compress helper thread (see gz_work()). */
640 struct iolog_flush_data {
/* One compressed chunk of log data, linked on io_log->chunk_list. */
647 struct iolog_compress {
648 struct flist_head list;
/* Fixed size of each compression output chunk (128k). */
654 #define GZ_CHUNK 131072
/*
 * Allocate a fresh compression chunk with a GZ_CHUNK-sized buffer,
 * tagged with the given sequence number for later ordered inflation.
 */
656 static struct iolog_compress *get_new_chunk(unsigned int seq)
658 struct iolog_compress *c;
660 c = malloc(sizeof(*c));
661 INIT_FLIST_HEAD(&c->list);
662 c->buf = malloc(GZ_CHUNK);
/* Free a compression chunk and its buffer (body not shown here). */
668 static void free_chunk(struct iolog_compress *ic)
/*
 * Initialize a zlib inflate stream. 'gz_hdr' selects auto-detection of
 * a gzip header via the +32 window-bits trick (see zlib inflateInit2).
 */
674 static int z_stream_init(z_stream *stream, int gz_hdr)
678 stream->zalloc = Z_NULL;
679 stream->zfree = Z_NULL;
680 stream->opaque = Z_NULL;
681 stream->next_in = Z_NULL;
684 * zlib magic - add 32 for auto-detection of gz header or not,
685 * if we decide to store files in a gzip friendly format.
690 if (inflateInit2(stream, wbits) != Z_OK)
/* Accumulator state carried across inflate_chunk() calls. */
696 struct inflate_chunk_iter {
/*
 * End the current inflate stream, flush the decompressed samples held
 * in the iterator buffer to 'f', and reset the iterator for the next
 * sequence.
 */
705 static void finish_chunk(z_stream *stream, FILE *f,
706 struct inflate_chunk_iter *iter)
710 ret = inflateEnd(stream);
712 log_err("fio: failed to end log inflation (%d)\n", ret);
714 flush_samples(f, iter->buf, iter->buf_used);
717 iter->buf_size = iter->buf_used = 0;
721 * Iterative chunk inflation. Handles cases where we cross into a new
722 * sequence, doing flush finish of previous chunk if needed.
724 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
725 z_stream *stream, struct inflate_chunk_iter *iter)
729 dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u",
730 (unsigned long) ic->len, ic->seq);
/* New sequence: flush/close the previous stream, start a fresh one */
732 if (ic->seq != iter->seq) {
734 finish_chunk(stream, f, iter);
736 z_stream_init(stream, gz_hdr);
740 stream->avail_in = ic->len;
741 stream->next_in = ic->buf;
/* Lazily allocate the output accumulator on first use */
743 if (!iter->buf_size) {
744 iter->buf_size = iter->chunk_sz;
745 iter->buf = malloc(iter->buf_size);
748 while (stream->avail_in) {
749 size_t this_out = iter->buf_size - iter->buf_used;
752 stream->avail_out = this_out;
753 stream->next_out = iter->buf + iter->buf_used;
755 err = inflate(stream, Z_NO_FLUSH);
757 log_err("fio: failed inflating log: %d\n", err);
762 iter->buf_used += this_out - stream->avail_out;
/* Output buffer full: grow by one chunk and keep inflating */
764 if (!stream->avail_out) {
765 iter->buf_size += iter->chunk_sz;
766 iter->buf = realloc(iter->buf, iter->buf_size);
770 if (err == Z_STREAM_END)
/* Return the number of input bytes consumed from this chunk */
774 ret = (void *) stream->next_in - ic->buf;
776 dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) ret);
782 * Inflate stored compressed chunks, or write them directly to the log
783 * file if so instructed.
785 static int inflate_gz_chunks(struct io_log *log, FILE *f)
787 struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
790 while (!flist_empty(&log->chunk_list)) {
791 struct iolog_compress *ic;
793 ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
794 flist_del(&ic->list);
/* gz-store mode: chunks go to disk still compressed */
796 if (log->log_gz_store) {
799 dprint(FD_COMPRESS, "log write chunk size=%lu, "
800 "seq=%u\n", (unsigned long) ic->len, ic->seq);
802 ret = fwrite(ic->buf, ic->len, 1, f);
803 if (ret != 1 || ferror(f)) {
805 log_err("fio: error writing compressed log\n");
/* Otherwise decompress into text samples */
808 inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
/* Flush whatever the last sequence left in the iterator */
814 finish_chunk(&stream, f, &iter);
822 * Open compressed log file and decompress the stored chunks and
823 * write them to stdout. The chunks are stored sequentially in the
824 * file, so we iterate over them and do them one-by-one.
826 int iolog_file_inflate(const char *file)
828 struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
829 struct iolog_compress ic;
837 f = fopen(file, "r");
/* Size the read buffer from the file length */
843 if (stat(file, &sb) < 0) {
849 ic.buf = buf = malloc(sb.st_size);
853 ret = fread(ic.buf, ic.len, 1, f);
858 } else if (ret != 1) {
859 log_err("fio: short read on reading log\n");
867 * Each chunk will return Z_STREAM_END. We don't know how many
868 * chunks are in the file, so we just keep looping and incrementing
869 * the sequence number until we have consumed the whole compressed
/* gz_hdr=1: stored chunks carry a gzip header */
876 ret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
889 finish_chunk(&stream, stdout, &iter);
/* Stubs used when fio is built without zlib support. */
899 static int inflate_gz_chunks(struct io_log *log, FILE *f)
904 int iolog_file_inflate(const char *file)
906 log_err("fio: log inflation not possible without zlib\n");
/*
 * Write an io_log out to its file: first any compressed chunks (either
 * decompressed or stored raw, see inflate_gz_chunks()), then the
 * uncompressed in-memory samples.
 */
912 void flush_log(struct io_log *log)
917 f = fopen(log->filename, "w");
/* Use a large stdio buffer where available (see set_file_buffer) */
923 buf = set_file_buffer(f);
925 inflate_gz_chunks(log, f);
927 flush_samples(f, log->log, log->nr_samples * log_entry_sz(log));
930 clear_file_buffer(buf);
/*
 * Finalize one log under the per-filename file lock. With trylock set,
 * a busy lock makes the caller retry later instead of blocking. GUI
 * clients receive the log over the wire instead of writing locally.
 */
933 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
939 if (fio_trylock_file(log->filename))
942 fio_lock_file(log->filename);
944 if (td->client_type == FIO_CLIENT_TYPE_GUI)
945 fio_send_iolog(td, log, log->filename);
949 fio_unlock_file(log->filename);
957 * Invoked from our compress helper thread, when logging would have exceeded
958 * the specified memory limitation. Compresses the previously stored
961 static int gz_work(struct tp_work *work)
963 struct iolog_flush_data *data;
964 struct iolog_compress *c;
965 struct flist_head list;
/* Chunks are built on a private list, spliced onto the log at the end */
971 INIT_FLIST_HEAD(&list);
973 data = container_of(work, struct iolog_flush_data, work);
975 stream.zalloc = Z_NULL;
976 stream.zfree = Z_NULL;
977 stream.opaque = Z_NULL;
979 ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
981 log_err("fio: failed to init gz stream\n");
/* Each flush gets a fresh sequence number for ordered inflation */
985 seq = ++data->log->chunk_seq;
987 stream.next_in = (void *) data->samples;
988 stream.avail_in = data->nr_samples * log_entry_sz(data->log);
990 dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u\n",
991 (unsigned long) stream.avail_in, seq);
/* Phase 1: deflate all input into GZ_CHUNK-sized chunks */
993 c = get_new_chunk(seq);
994 stream.avail_out = GZ_CHUNK;
995 stream.next_out = c->buf;
996 ret = deflate(&stream, Z_NO_FLUSH);
998 log_err("fio: deflate log (%d)\n", ret);
1003 c->len = GZ_CHUNK - stream.avail_out;
1004 flist_add_tail(&c->list, &list);
1006 } while (stream.avail_in);
/* Phase 2: Z_FINISH into the tail of the last chunk, growing as needed */
1008 stream.next_out = c->buf + c->len;
1009 stream.avail_out = GZ_CHUNK - c->len;
1011 ret = deflate(&stream, Z_FINISH);
1012 if (ret == Z_STREAM_END)
1013 c->len = GZ_CHUNK - stream.avail_out;
1016 c = get_new_chunk(seq);
1017 stream.avail_out = GZ_CHUNK;
1018 stream.next_out = c->buf;
1019 ret = deflate(&stream, Z_FINISH);
1020 c->len = GZ_CHUNK - stream.avail_out;
1022 flist_add_tail(&c->list, &list);
1023 } while (ret != Z_STREAM_END);
1026 dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1028 ret = deflateEnd(&stream);
1030 log_err("fio: deflateEnd %d\n", ret);
/* The copied sample buffer belongs to this work item */
1032 free(data->samples);
/* Publish the finished chunks under the chunk lock */
1034 if (!flist_empty(&list)) {
1035 pthread_mutex_lock(&data->log->chunk_lock);
1036 flist_splice_tail(&list, &data->log->chunk_list);
1037 pthread_mutex_unlock(&data->log->chunk_lock);
/* Wake a synchronous iolog_flush(log, 1) waiter, if any */
1044 pthread_cond_signal(&work->cv);
/* Error path: discard any chunks built so far */
1050 while (!flist_empty(&list)) {
1051 c = flist_first_entry(list.next, struct iolog_compress, list);
1052 flist_del(&c->list);
1060 * Queue work item to compress the existing log entries. We copy the
1061 * samples, and reset the log sample count to 0 (so the logging will
1062 * continue to use the memory associated with the log). If called with
1063 * wait == 1, will not return until the log compression has completed.
1065 int iolog_flush(struct io_log *log, int wait)
1067 struct tp_data *tdat = log->td->tp_data;
1068 struct iolog_flush_data *data;
1071 data = malloc(sizeof(*data));
/* Snapshot the samples so logging can continue in the live buffer */
1077 sample_size = log->nr_samples * log_entry_sz(log);
1078 data->samples = malloc(sample_size);
1079 if (!data->samples) {
1084 memcpy(data->samples, log->log, sample_size);
1085 data->nr_samples = log->nr_samples;
1086 data->work.fn = gz_work;
/* Reset count; the log keeps reusing its existing allocation */
1087 log->nr_samples = 0;
1090 pthread_mutex_init(&data->work.lock, NULL);
1091 pthread_cond_init(&data->work.cv, NULL);
1092 data->work.wait = 1;
1094 data->work.wait = 0;
1096 data->work.prio = 1;
1097 tp_queue_work(tdat, &data->work);
/* Synchronous mode: block until gz_work() signals completion */
1100 pthread_mutex_lock(&data->work.lock);
1101 while (!data->work.done)
1102 pthread_cond_wait(&data->work.cv, &data->work.lock);
1103 pthread_mutex_unlock(&data->work.lock);
/* No-zlib stub: nothing to compress (body not shown here). */
1112 int iolog_flush(struct io_log *log, int wait)
/*
 * Thin per-log-type wrappers around finish_log(). Each picks the
 * corresponding io_log off the thread and finalizes it; 'try' selects
 * non-blocking locking (see finish_log()).
 */
1119 static int write_iops_log(struct thread_data *td, int try)
1121 struct io_log *log = td->iops_log;
1126 return finish_log(td, log, try);
1129 static int write_slat_log(struct thread_data *td, int try)
1131 struct io_log *log = td->slat_log;
1136 return finish_log(td, log, try);
1139 static int write_clat_log(struct thread_data *td, int try)
1141 struct io_log *log = td->clat_log;
1146 return finish_log(td, log, try);
1149 static int write_lat_log(struct thread_data *td, int try)
1151 struct io_log *log = td->lat_log;
1156 return finish_log(td, log, try);
1159 static int write_bandw_log(struct thread_data *td, int try)
1161 struct io_log *log = td->bw_log;
1166 return finish_log(td, log, try);
/* Dispatch table mapping each log-type mask to its writeout function. */
1181 int (*fn)(struct thread_data *, int);
1184 static struct log_type log_types[] = {
1186 .mask = BW_LOG_MASK,
1187 .fn = write_bandw_log,
1190 .mask = LAT_LOG_MASK,
1191 .fn = write_lat_log,
1194 .mask = SLAT_LOG_MASK,
1195 .fn = write_slat_log,
1198 .mask = CLAT_LOG_MASK,
1199 .fn = write_clat_log,
1202 .mask = IOPS_LOG_MASK,
1203 .fn = write_iops_log,
/*
 * Write out all of this thread's logs. Iterates the log_types table,
 * retrying logs whose file lock was busy (non-blocking mode is used
 * until only one log remains) and tracking completion via log_mask.
 * Runstate is bumped to TD_FINISHING for the duration.
 */
1207 void fio_writeout_logs(struct thread_data *td)
1209 unsigned int log_mask = 0;
1210 unsigned int log_left = ALL_LOG_NR;
1213 old_state = td_bump_runstate(td, TD_FINISHING);
/* Used to detect a pass that made no progress */
1218 int prev_log_left = log_left;
1220 for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1221 struct log_type *lt = &log_types[i];
1224 if (!(log_mask & lt->mask)) {
/* trylock while more than one log is pending, block on the last */
1225 ret = lt->fn(td, log_left != 1);
1228 log_mask |= lt->mask;
/* NOTE(review): stall-break when a full pass completes none — confirm */
1233 if (prev_log_left == log_left)
1237 td_restore_runstate(td, old_state);