2 * Code related to writing an iolog of what a thread is doing, and to
3 * later read that back and replay
/* Magic first line written to / expected at the top of a version-2 iolog. */
23 static const char iolog_ver2[] = "fio version 2 iolog";
/*
 * Append an io_piece to the thread's replay queue and account its length
 * toward the job's total expected I/O size.
 */
25 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
27 flist_add_tail(&ipo->list, &td->io_log_list);
28 td->total_io_size += ipo->len;
/*
 * Record one io_u in the write iolog as "<file> <ddir> <offset> <buflen>".
 * No-op unless the job was configured with write_iolog_file.
 */
31 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
33 if (!td->o.write_iolog_file)
36 fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
37 io_ddir_name(io_u->ddir),
38 io_u->offset, io_u->buflen);
/*
 * Record a file lifecycle event ("add"/"open"/"close") in the write iolog.
 * The act[] table is indexed by 'what'; FIO_LOG_ADD_FILE et al. are assumed
 * to map 0..2 in that order -- confirm against the enum declaration.
 */
41 void log_file(struct thread_data *td, struct fio_file *f,
42 enum file_log_act what)
44 const char *act[] = { "add", "open", "close" };
48 if (!td->o.write_iolog_file)
53 * this happens on the pre-open/close done before the job starts
58 fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
/*
 * Sleep to reproduce the recorded inter-I/O delay (in usec) during replay.
 * td->time_offset carries the overshoot from the previous call so replay
 * timing does not drift: it is subtracted from the requested delay up
 * front, and recomputed from the measured sleep time at the end.
 */
61 static void iolog_delay(struct thread_data *td, unsigned long delay)
63 uint64_t usec = utime_since_now(&td->last_issue);
67 if (delay < td->time_offset) {
72 delay -= td->time_offset;
78 fio_gettime(&tv, NULL);
/* Sleep in bounded slices so td->terminate is honored promptly. */
79 while (delay && !td->terminate) {
/* NOTE(review): slices appear capped near 500ms; cap branch body not visible here. */
81 if (this_delay > 500000)
84 usec_sleep(td, this_delay);
/* Measure how long we actually slept and stash the overshoot for next time. */
88 usec = utime_since_now(&tv);
90 td->time_offset = usec - delay;
/*
 * Handle a "special" replay entry: entries with ddir == DDIR_INVAL carry a
 * file action (open/close/unlink) instead of I/O. Returns 0 for ordinary
 * I/O entries; for file actions the return value tells the caller whether
 * to continue (exact values come from elided lines -- confirm at call site).
 */
95 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
/* Real I/O: nothing special to do. */
103 if (ipo->ddir != DDIR_INVAL)
106 f = td->files[ipo->fileno];
108 switch (ipo->file_action) {
109 case FIO_LOG_OPEN_FILE:
110 ret = td_io_open_file(td, f);
113 td_verror(td, ret, "iolog open file");
115 case FIO_LOG_CLOSE_FILE:
116 td_io_close_file(td, f);
118 case FIO_LOG_UNLINK_FILE:
119 td_io_unlink_file(td, f);
122 log_err("fio: bad file action %d\n", ipo->file_action);
/*
 * Pop the next entry off the replay queue and populate io_u from it.
 * File-action entries are dispatched through ipo_special(); WAIT entries
 * sleep until the recorded (genesis-relative, msec) timestamp is reached;
 * ordinary entries get offset/len/file filled in and the recorded
 * inter-I/O delay applied. Return convention not visible in this view.
 */
129 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
131 struct io_piece *ipo;
132 unsigned long elapsed;
134 while (!flist_empty(&td->io_log_list)) {
137 ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
138 flist_del(&ipo->list);
139 remove_trim_entry(td, ipo);
141 ret = ipo_special(td, ipo);
145 } else if (ret > 0) {
150 io_u->ddir = ipo->ddir;
151 if (ipo->ddir != DDIR_WAIT) {
152 io_u->offset = ipo->offset;
153 io_u->buflen = ipo->len;
154 io_u->file = td->files[ipo->fileno];
/* Take a reference on the file for the duration of this io_u. */
155 get_file(io_u->file);
156 dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
157 io_u->buflen, io_u->file->file_name);
158 iolog_delay(td, ipo->delay);
/* DDIR_WAIT: ipo->delay is an absolute msec-since-genesis target. */
161 elapsed = mtime_since_genesis();
162 if (ipo->delay > elapsed)
163 usec_sleep(td, (ipo->delay - elapsed) * 1000);
168 if (io_u->ddir != DDIR_WAIT)
/*
 * Tear down the verify history: drain both the rb-tree and the flat list
 * of logged io_pieces, removing each from the trim list as well.
 * (The free of each ipo happens in lines elided from this view.)
 */
176 void prune_io_piece_log(struct thread_data *td)
178 struct io_piece *ipo;
181 while ((n = rb_first(&td->io_hist_tree)) != NULL) {
182 ipo = rb_entry(n, struct io_piece, rb_node);
183 rb_erase(n, &td->io_hist_tree);
184 remove_trim_entry(td, ipo);
189 while (!flist_empty(&td->io_hist_list)) {
190 ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
191 flist_del(&ipo->list);
192 remove_trim_entry(td, ipo);
199 * log a successful write, so we can unwind the log for verify
/*
 * Record a completed write in the verify history. Fast path: append to a
 * plain list when entries cannot collide (sequential, or random with a
 * randommap, or no verify). Slow path: insert into an rb-tree sorted by
 * (file, offset) so overlapping older entries can be found and dropped.
 * NOTE(review): malloc return is not visibly checked -- confirm policy.
 */
201 void log_io_piece(struct thread_data *td, struct io_u *io_u)
203 struct rb_node **p, *parent;
204 struct io_piece *ipo, *__ipo;
206 ipo = malloc(sizeof(struct io_piece));
208 ipo->file = io_u->file;
209 ipo->offset = io_u->offset;
210 ipo->len = io_u->buflen;
211 ipo->numberio = io_u->numberio;
/* In flight until the write completes or is unlogged. */
212 ipo->flags = IP_F_IN_FLIGHT;
216 if (io_u_should_trim(td, io_u)) {
217 flist_add_tail(&ipo->trim_list, &td->trim_list);
222 * We don't need to sort the entries, if:
224 * Sequential writes, or
225 * Random writes that lay out the file as it goes along
227 * For both these cases, just reading back data in the order we
228 * wrote it out is the fastest.
230 * One exception is if we don't have a random map AND we are doing
231 * verifies, in that case we need to check for duplicate blocks and
232 * drop the old one, which we rely on the rb insert/lookup for
235 if (((!td->o.verifysort) || !td_random(td) || !td->o.overwrite) &&
236 (file_randommap(td, ipo->file) || td->o.verify == VERIFY_NONE)) {
237 INIT_FLIST_HEAD(&ipo->list);
238 flist_add_tail(&ipo->list, &td->io_hist_list);
239 ipo->flags |= IP_F_ONLIST;
244 RB_CLEAR_NODE(&ipo->rb_node);
247 * Sort the entry into the verification list
250 p = &td->io_hist_tree.rb_node;
256 __ipo = rb_entry(parent, struct io_piece, rb_node);
/* Order by file pointer first, then by offset within the file. */
257 if (ipo->file < __ipo->file)
259 else if (ipo->file > __ipo->file)
261 else if (ipo->offset < __ipo->offset) {
263 overlap = ipo->offset + ipo->len > __ipo->offset;
265 else if (ipo->offset > __ipo->offset) {
267 overlap = __ipo->offset + __ipo->len > ipo->offset;
/* Overlap with an older entry: evict it so verify sees only the newest data. */
273 dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu",
274 __ipo->offset, __ipo->len,
275 ipo->offset, ipo->len);
277 rb_erase(parent, &td->io_hist_tree);
278 remove_trim_entry(td, __ipo);
284 rb_link_node(&ipo->rb_node, parent, p);
285 rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
286 ipo->flags |= IP_F_ONRB;
/*
 * Remove a previously logged io_piece from the verify history (rb-tree or
 * list, whichever it was placed on), e.g. when the write is requeued/failed.
 */
290 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
292 struct io_piece *ipo = io_u->ipo;
297 if (ipo->flags & IP_F_ONRB)
298 rb_erase(&ipo->rb_node, &td->io_hist_tree);
299 else if (ipo->flags & IP_F_ONLIST)
300 flist_del(&ipo->list);
/*
 * Shrink the logged piece to the number of bytes actually transferred
 * (xfer_buflen minus the residual), so verify reads back the right length.
 */
307 void trim_io_piece(struct thread_data *td, const struct io_u *io_u)
309 struct io_piece *ipo = io_u->ipo;
314 ipo->len = io_u->xfer_buflen - io_u->resid;
/*
 * Close the write iolog and release the stdio buffer installed by
 * init_iolog_write() (flush/fclose happen in lines elided from this view).
 */
317 void write_iolog_close(struct thread_data *td)
323 td->iolog_buf = NULL;
327 * Read version 2 iolog data. It is enhanced to include per-file logging,
/*
 * Parse a v2 iolog: one "<fname> <action> [offset bytes]" record per line.
 * I/O actions (read/write/sync/datasync/trim/wait) become io_pieces queued
 * for replay; file actions (add/open/close) become DDIR_INVAL entries
 * dispatched later by ipo_special(). Finally td_ddir is derived from what
 * the log contained. Returns 0/-1 style status (exact returns elided).
 */
330 static int read_iolog2(struct thread_data *td, FILE *f)
332 unsigned long long offset;
334 int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
/* Drop any files configured on the command line; the log defines the set. */
339 free_release_files(td);
342 * Read in the read iolog and store it, reuse the infrastructure
343 * for doing verifications.
346 fname = malloc(256+16);
347 act = malloc(256+16);
349 reads = writes = waits = 0;
350 while ((p = fgets(str, 4096, f)) != NULL) {
351 struct io_piece *ipo;
/* r distinguishes 4-field I/O records from 2-field file-action records. */
354 r = sscanf(p, "%256s %256s %llu %u", fname, act, &offset,
360 if (!strcmp(act, "wait"))
362 else if (!strcmp(act, "read"))
364 else if (!strcmp(act, "write"))
366 else if (!strcmp(act, "sync"))
368 else if (!strcmp(act, "datasync"))
370 else if (!strcmp(act, "trim"))
373 log_err("fio: bad iolog file action: %s\n",
377 fileno = get_fileno(td, fname);
380 if (!strcmp(act, "add")) {
381 fileno = add_file(td, fname, 0, 1);
382 file_action = FIO_LOG_ADD_FILE;
384 } else if (!strcmp(act, "open")) {
385 fileno = get_fileno(td, fname);
386 file_action = FIO_LOG_OPEN_FILE;
387 } else if (!strcmp(act, "close")) {
388 fileno = get_fileno(td, fname);
389 file_action = FIO_LOG_CLOSE_FILE;
391 log_err("fio: bad iolog file action: %s\n",
396 log_err("bad iolog2: %s", p);
402 else if (rw == DDIR_WRITE) {
404 * Don't add a write for ro mode
409 } else if (rw == DDIR_WAIT) {
411 } else if (rw == DDIR_INVAL) {
412 } else if (!ddir_sync(rw)) {
413 log_err("bad ddir: %d\n", rw);
/*
 * Build and queue the replay entry.
 * NOTE(review): malloc result not visibly checked -- confirm policy.
 */
420 ipo = malloc(sizeof(*ipo));
423 if (rw == DDIR_WAIT) {
426 ipo->offset = offset;
/* Track the largest block size seen so buffers are sized correctly. */
428 if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
429 td->o.max_bs[rw] = bytes;
430 ipo->fileno = fileno;
431 ipo->file_action = file_action;
435 queue_io_piece(td, ipo);
442 if (writes && read_only) {
443 log_err("fio: <%s> skips replay of %d writes due to"
444 " read-only\n", td->o.name, writes);
/* Derive the job data direction from the mix of records actually read. */
448 if (!reads && !writes && !waits)
450 else if (reads && !writes)
451 td->o.td_ddir = TD_DDIR_READ;
452 else if (!reads && writes)
453 td->o.td_ddir = TD_DDIR_WRITE;
455 td->o.td_ddir = TD_DDIR_RW;
461 * open iolog, check version, and call appropriate parser
463 static int init_iolog_read(struct thread_data *td)
465 char buffer[256], *p;
469 f = fopen(td->o.read_iolog_file, "r");
471 perror("fopen read iolog");
/* Read the first line to identify the log format version. */
475 p = fgets(buffer, sizeof(buffer), f);
477 td_verror(td, errno, "iolog read");
478 log_err("fio: unable to read iolog\n");
484 * version 2 of the iolog stores a specific string as the
485 * first line, check for that
487 if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
488 ret = read_iolog2(td, f);
/* Anything else is treated as the retired v1 format. */
490 log_err("fio: iolog version 1 is no longer supported\n");
499 * Set up a log for storing io patterns.
501 static int init_iolog_write(struct thread_data *td)
/* Append mode so an existing log is extended rather than truncated. */
507 f = fopen(td->o.write_iolog_file, "a");
509 perror("fopen write iolog");
514 * That's it for writing, setup a log buffer and we're done.
/* Fully buffered 8k stdio buffer; released again in write_iolog_close(). */
517 td->iolog_buf = malloc(8192);
518 setvbuf(f, td->iolog_buf, _IOFBF, 8192);
521 * write our version line
523 if (fprintf(f, "%s\n", iolog_ver2) < 0) {
524 perror("iolog init\n");
529 * add all known files
531 for_each_file(td, ff, i)
532 log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * Entry point for iolog setup: replay path (blktrace or fio v2 log) when
 * read_iolog_file is set, otherwise the recording path for write_iolog_file.
 */
537 int init_iolog(struct thread_data *td)
541 if (td->o.read_iolog_file) {
545 * Check if it's a blktrace file and load that if possible.
546 * Otherwise assume it's a normal log file and load that.
548 if (is_blktrace(td->o.read_iolog_file, &need_swap))
549 ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
551 ret = init_iolog_read(td);
552 } else if (td->o.write_iolog_file)
553 ret = init_iolog_write(td);
556 td_verror(td, EINVAL, "failed initializing iolog");
/*
 * Allocate and initialize an io_log from the supplied parameters: sample
 * storage (initially 1024 entries), optional per-sample offset logging,
 * and optional zlib compression state (chunk list + lock, and the
 * TD_F_COMPRESS_LOG flag on the owning thread).
 */
561 void setup_log(struct io_log **log, struct log_params *p,
562 const char *filename)
566 l = calloc(1, sizeof(*l));
568 l->max_samples = 1024;
569 l->log_type = p->log_type;
570 l->log_offset = p->log_offset;
571 l->log_gz = p->log_gz;
572 l->log_gz_store = p->log_gz_store;
/* log_entry_sz() depends on log_offset, so set that first. */
573 l->log = malloc(l->max_samples * log_entry_sz(l));
574 l->avg_msec = p->avg_msec;
575 l->filename = strdup(filename);
/* Offset samples are tagged via a bit in the ddir field of each sample. */
579 l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
581 INIT_FLIST_HEAD(&l->chunk_list);
/* Compression requires a thread context (p->td) to hang the flag on. */
583 if (l->log_gz && !p->td)
585 else if (l->log_gz) {
586 pthread_mutex_init(&l->chunk_lock, NULL);
587 p->td->flags |= TD_F_COMPRESS_LOG;
/*
 * Optional large (1MB) stdio buffering for log flushing. When setvbuf is
 * unavailable the fallbacks below are no-ops (bodies elided in this view).
 */
593 #ifdef CONFIG_SETVBUF
594 static void *set_file_buffer(FILE *f)
596 size_t size = 1048576;
600 setvbuf(f, buf, _IOFBF, size);
604 static void clear_file_buffer(void *buf)
/* Non-CONFIG_SETVBUF stubs. */
609 static void *set_file_buffer(FILE *f)
614 static void clear_file_buffer(void *buf)
626 static void flush_samples(FILE *f, void *samples, uint64_t sample_size)
630 uint64_t i, nr_samples;
635 s = __get_sample(samples, 0, 0);
636 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
638 nr_samples = sample_size / __log_entry_sz(log_offset);
640 for (i = 0; i < nr_samples; i++) {
641 s = __get_sample(samples, log_offset, i);
644 fprintf(f, "%lu, %lu, %u, %u\n",
645 (unsigned long) s->time,
646 (unsigned long) s->val,
647 io_sample_ddir(s), s->bs);
649 struct io_sample_offset *so = (void *) s;
651 fprintf(f, "%lu, %lu, %u, %u, %llu\n",
652 (unsigned long) s->time,
653 (unsigned long) s->val,
654 io_sample_ddir(s), s->bs,
655 (unsigned long long) so->offset);
/* Work item handed to the compression helper thread (fields elided here). */
662 struct iolog_flush_data {
/* One compressed chunk of log data; chained on io_log->chunk_list. */
669 struct iolog_compress {
670 struct flist_head list;
/* Fixed buffer size for each compressed chunk. */
676 #define GZ_CHUNK 131072
/*
 * Allocate a fresh GZ_CHUNK-sized compression chunk tagged with 'seq'
 * (sequence numbers let the inflate side detect chunk boundaries).
 */
678 static struct iolog_compress *get_new_chunk(unsigned int seq)
680 struct iolog_compress *c;
682 c = malloc(sizeof(*c));
683 INIT_FLIST_HEAD(&c->list);
684 c->buf = malloc(GZ_CHUNK);
/* Free a chunk and its buffer (body elided in this view). */
690 static void free_chunk(struct iolog_compress *ic)
/*
 * Initialize a zlib inflate stream. Adding 32 to the window bits makes
 * zlib auto-detect a gzip header vs. raw zlib data, so the same path can
 * read both stored (gz) and in-memory compressed logs.
 */
696 static int z_stream_init(z_stream *stream, int gz_hdr)
700 stream->zalloc = Z_NULL;
701 stream->zfree = Z_NULL;
702 stream->opaque = Z_NULL;
703 stream->next_in = Z_NULL;
706 * zlib magic - add 32 for auto-detection of gz header or not,
707 * if we decide to store files in a gzip friendly format.
712 if (inflateInit2(stream, wbits) != Z_OK)
/* Iteration state carried across inflate_chunk() calls (fields elided). */
718 struct inflate_chunk_iter {
/*
 * End the current inflate sequence: tear down the zlib stream, flush the
 * accumulated decompressed samples to 'f', and reset the iter buffer.
 */
727 static void finish_chunk(z_stream *stream, FILE *f,
728 struct inflate_chunk_iter *iter)
732 ret = inflateEnd(stream);
734 log_err("fio: failed to end log inflation (%d)\n", ret);
736 flush_samples(f, iter->buf, iter->buf_used);
739 iter->buf_size = iter->buf_used = 0;
743 * Iterative chunk inflation. Handles cases where we cross into a new
744 * sequence, doing flush finish of previous chunk if needed.
746 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
747 z_stream *stream, struct inflate_chunk_iter *iter)
751 dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u",
752 (unsigned long) ic->len, ic->seq);
/* New sequence: flush/teardown the old stream, start a fresh one. */
754 if (ic->seq != iter->seq) {
756 finish_chunk(stream, f, iter);
758 z_stream_init(stream, gz_hdr);
762 stream->avail_in = ic->len;
763 stream->next_in = ic->buf;
/* Lazily allocate the output buffer at chunk_sz granularity. */
765 if (!iter->buf_size) {
766 iter->buf_size = iter->chunk_sz;
767 iter->buf = malloc(iter->buf_size);
770 while (stream->avail_in) {
771 size_t this_out = iter->buf_size - iter->buf_used;
774 stream->avail_out = this_out;
775 stream->next_out = iter->buf + iter->buf_used;
777 err = inflate(stream, Z_NO_FLUSH);
779 log_err("fio: failed inflating log: %d\n", err);
784 iter->buf_used += this_out - stream->avail_out;
/* Output buffer full: grow it by another chunk_sz and keep going. */
786 if (!stream->avail_out) {
787 iter->buf_size += iter->chunk_sz;
788 iter->buf = realloc(iter->buf, iter->buf_size);
792 if (err == Z_STREAM_END)
/* Return how many input bytes were consumed from this chunk. */
796 ret = (void *) stream->next_in - ic->buf;
798 dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) ret);
804 * Inflate stored compressed chunks, or write them directly to the log
805 * file if so instructed.
807 static int inflate_gz_chunks(struct io_log *log, FILE *f)
809 struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
812 while (!flist_empty(&log->chunk_list)) {
813 struct iolog_compress *ic;
815 ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
816 flist_del(&ic->list);
/* log_gz_store: keep the data compressed on disk, just copy it out. */
818 if (log->log_gz_store) {
821 dprint(FD_COMPRESS, "log write chunk size=%lu, "
822 "seq=%u\n", (unsigned long) ic->len, ic->seq);
824 ret = fwrite(ic->buf, ic->len, 1, f);
825 if (ret != 1 || ferror(f)) {
827 log_err("fio: error writing compressed log\n");
830 inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
/* Flush whatever the final sequence left in the iter buffer. */
836 finish_chunk(&stream, f, &iter);
844 * Open compressed log file and decompress the stored chunks and
845 * write them to stdout. The chunks are stored sequentially in the
846 * file, so we iterate over them and do them one-by-one.
848 int iolog_file_inflate(const char *file)
850 struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
851 struct iolog_compress ic;
859 f = fopen(file, "r");
/* stat() for the size so the whole file can be slurped in one read. */
865 if (stat(file, &sb) < 0) {
871 ic.buf = buf = malloc(sb.st_size);
875 ret = fread(ic.buf, ic.len, 1, f);
881 } else if (ret != 1) {
882 log_err("fio: short read on reading log\n");
891 * Each chunk will return Z_STREAM_END. We don't know how many
892 * chunks are in the file, so we just keep looping and incrementing
893 * the sequence number until we have consumed the whole compressed
/* gz_hdr=1: the on-disk format carries gzip headers per sequence. */
900 iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
913 finish_chunk(&stream, stdout, &iter);
/* Stubs for builds without zlib: chunk inflation is unavailable. */
923 static int inflate_gz_chunks(struct io_log *log, FILE *f)
928 int iolog_file_inflate(const char *file)
930 log_err("fio: log inflation not possible without zlib\n");
/*
 * Write an io_log to its file: any pending compressed chunks first, then
 * the raw in-memory samples, using a large stdio buffer when available.
 */
936 void flush_log(struct io_log *log)
941 f = fopen(log->filename, "w");
947 buf = set_file_buffer(f);
949 inflate_gz_chunks(log, f);
951 flush_samples(f, log->log, log->nr_samples * log_entry_sz(log));
954 clear_file_buffer(buf);
/*
 * Finalize one log under a per-filename file lock. GUI clients get the
 * log shipped over the network instead of (or before) local flushing; the
 * local flush path is in lines elided from this view. With trylock set,
 * a busy lock makes the caller retry later (see fio_writeout_logs()).
 */
957 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
963 if (fio_trylock_file(log->filename))
966 fio_lock_file(log->filename);
968 if (td->client_type == FIO_CLIENT_TYPE_GUI)
969 fio_send_iolog(td, log, log->filename);
973 fio_unlock_file(log->filename);
981 * Invoked from our compress helper thread, when logging would have exceeded
982 * the specified memory limitation. Compresses the previously stored
/*
 * entries (continued): deflates data->samples into a private chunk list,
 * then splices that list onto the log's chunk_list under chunk_lock and
 * signals the waiting submitter. Error paths free the partial list.
 */
985 static int gz_work(struct tp_work *work)
987 struct iolog_flush_data *data;
988 struct iolog_compress *c;
989 struct flist_head list;
995 INIT_FLIST_HEAD(&list);
997 data = container_of(work, struct iolog_flush_data, work);
999 stream.zalloc = Z_NULL;
1000 stream.zfree = Z_NULL;
1001 stream.opaque = Z_NULL;
1003 ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1005 log_err("fio: failed to init gz stream\n");
/* Each flush gets a fresh sequence number, used on the inflate side. */
1009 seq = ++data->log->chunk_seq;
1011 stream.next_in = (void *) data->samples;
1012 stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1014 dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u\n",
1015 (unsigned long) stream.avail_in, seq);
/* Pass 1: consume all input, one GZ_CHUNK output buffer at a time. */
1017 c = get_new_chunk(seq);
1018 stream.avail_out = GZ_CHUNK;
1019 stream.next_out = c->buf;
1020 ret = deflate(&stream, Z_NO_FLUSH);
1022 log_err("fio: deflate log (%d)\n", ret);
1027 c->len = GZ_CHUNK - stream.avail_out;
1028 flist_add_tail(&c->list, &list);
1030 } while (stream.avail_in);
/* Pass 2: Z_FINISH into the tail of the last chunk, growing as needed. */
1032 stream.next_out = c->buf + c->len;
1033 stream.avail_out = GZ_CHUNK - c->len;
1035 ret = deflate(&stream, Z_FINISH);
1036 if (ret == Z_STREAM_END)
1037 c->len = GZ_CHUNK - stream.avail_out;
1040 c = get_new_chunk(seq);
1041 stream.avail_out = GZ_CHUNK;
1042 stream.next_out = c->buf;
1043 ret = deflate(&stream, Z_FINISH);
1044 c->len = GZ_CHUNK - stream.avail_out;
1046 flist_add_tail(&c->list, &list);
1047 } while (ret != Z_STREAM_END);
1050 dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1052 ret = deflateEnd(&stream);
1054 log_err("fio: deflateEnd %d\n", ret);
/* The copied sample buffer belongs to this work item; release it now. */
1056 free(data->samples);
1058 if (!flist_empty(&list)) {
1059 pthread_mutex_lock(&data->log->chunk_lock);
1060 flist_splice_tail(&list, &data->log->chunk_list);
1061 pthread_mutex_unlock(&data->log->chunk_lock);
1068 pthread_cond_signal(&work->cv);
/* Error path: drop any chunks produced before the failure. */
1074 while (!flist_empty(&list)) {
/*
 * NOTE(review): flist_first_entry() normally takes the list head
 * (&list), not list.next -- passing list.next looks like an extra
 * level of indirection / off-by-one-node bug. Confirm against the
 * flist API and fix separately.
 */
1075 c = flist_first_entry(list.next, struct iolog_compress, list);
1076 flist_del(&c->list);
1084 * Queue work item to compress the existing log entries. We copy the
1085 * samples, and reset the log sample count to 0 (so the logging will
1086 * continue to use the memory associated with the log). If called with
1087 * wait == 1, will not return until the log compression has completed.
1089 int iolog_flush(struct io_log *log, int wait)
1091 struct tp_data *tdat = log->td->tp_data;
1092 struct iolog_flush_data *data;
1095 data = malloc(sizeof(*data));
/* Snapshot the samples so logging can keep writing into log->log. */
1101 sample_size = log->nr_samples * log_entry_sz(log);
1102 data->samples = malloc(sample_size);
1103 if (!data->samples) {
1108 memcpy(data->samples, log->log, sample_size);
1109 data->nr_samples = log->nr_samples;
1110 data->work.fn = gz_work;
1111 log->nr_samples = 0;
/* Synchronous mode: set up lock/cv so we can block on work.done below. */
1114 pthread_mutex_init(&data->work.lock, NULL);
1115 pthread_cond_init(&data->work.cv, NULL);
1116 data->work.wait = 1;
1118 data->work.wait = 0;
1120 data->work.prio = 1;
1121 tp_queue_work(tdat, &data->work);
1124 pthread_mutex_lock(&data->work.lock);
1125 while (!data->work.done)
1126 pthread_cond_wait(&data->work.cv, &data->work.lock);
1127 pthread_mutex_unlock(&data->work.lock);
/* Stub for builds without zlib: flushing-by-compression is unavailable. */
1136 int iolog_flush(struct io_log *log, int wait)
/*
 * Per-log-type wrappers: each finalizes one of the thread's stat logs via
 * finish_log(). 'try' is forwarded as the trylock flag, letting
 * fio_writeout_logs() make non-blocking passes before a final blocking one.
 */
1143 static int write_iops_log(struct thread_data *td, int try)
1145 struct io_log *log = td->iops_log;
1150 return finish_log(td, log, try);
1153 static int write_slat_log(struct thread_data *td, int try)
1155 struct io_log *log = td->slat_log;
1160 return finish_log(td, log, try);
1163 static int write_clat_log(struct thread_data *td, int try)
1165 struct io_log *log = td->clat_log;
1170 return finish_log(td, log, try);
1173 static int write_lat_log(struct thread_data *td, int try)
1175 struct io_log *log = td->lat_log;
1180 return finish_log(td, log, try);
1183 static int write_bandw_log(struct thread_data *td, int try)
1185 struct io_log *log = td->bw_log;
1190 return finish_log(td, log, try);
/* Dispatch table: mask identifies the log type, fn writes it out. */
1205 int (*fn)(struct thread_data *, int);
1208 static struct log_type log_types[] = {
1210 .mask = BW_LOG_MASK,
1211 .fn = write_bandw_log,
1214 .mask = LAT_LOG_MASK,
1215 .fn = write_lat_log,
1218 .mask = SLAT_LOG_MASK,
1219 .fn = write_slat_log,
1222 .mask = CLAT_LOG_MASK,
1223 .fn = write_clat_log,
1226 .mask = IOPS_LOG_MASK,
1227 .fn = write_iops_log,
/*
 * Flush all enabled stat logs at job end. Runs in TD_FINISHING state and
 * loops over log_types[], retrying logs whose file lock was busy
 * (try != 0 => trylock) until all are done or a pass makes no progress;
 * the last remaining log (log_left == 1) is written with a blocking lock.
 */
1231 void fio_writeout_logs(struct thread_data *td)
1233 unsigned int log_mask = 0;
1234 unsigned int log_left = ALL_LOG_NR;
1237 old_state = td_bump_runstate(td, TD_FINISHING);
1242 int prev_log_left = log_left;
1244 for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1245 struct log_type *lt = &log_types[i];
/* Skip logs already written in an earlier pass. */
1248 if (!(log_mask & lt->mask)) {
1249 ret = lt->fn(td, log_left != 1);
1252 log_mask |= lt->mask;
/* No progress this pass: avoid spinning (back-off handling elided). */
1257 if (prev_log_left == log_left)
1261 td_restore_runstate(td, old_state);