2 * Code related to writing an iolog of what a thread is doing, and to
3 * later read that back and replay
/* Magic first line written to, and expected at the top of, a version-2 iolog. */
23 static const char iolog_ver2[] = "fio version 2 iolog";
/*
 * Append a parsed io_piece to the tail of this thread's replay queue and
 * grow the running total of bytes queued for replay.
 * NOTE(review): intervening source lines are elided in this view.
 */
25 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
27 flist_add_tail(&ipo->list, &td->io_log_list);
28 td->total_io_size += ipo->len;
/*
 * Record one issued io_u in the output iolog as
 * "<filename> <ddir-name> <offset> <buflen>".
 * No-op unless the job asked for an iolog (write_iolog_file set).
 */
31 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
33 if (!td->o.write_iolog_file)
36 fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
37 io_ddir_name(io_u->ddir),
38 io_u->offset, io_u->buflen);
41 void log_file(struct thread_data *td, struct fio_file *f,
42 enum file_log_act what)
44 const char *act[] = { "add", "open", "close" };
48 if (!td->o.write_iolog_file)
53 * this happens on the pre-open/close done before the job starts
58 fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
/*
 * Sleep for (roughly) 'delay' microseconds between replayed entries.
 * td->time_offset carries over any previous oversleep so it is deducted
 * from this delay; after sleeping, the new overshoot (actual vs requested)
 * is stored back into td->time_offset for the next call.
 * Sleeps in chunks of at most 500ms so td->terminate is honored promptly.
 * NOTE(review): several lines elided; exact loop bookkeeping not fully visible.
 */
61 static void iolog_delay(struct thread_data *td, unsigned long delay)
63 uint64_t usec = utime_since_now(&td->last_issue);
67 if (delay < td->time_offset) {
72 delay -= td->time_offset;
78 fio_gettime(&tv, NULL);
79 while (delay && !td->terminate) {
81 if (this_delay > 500000)
84 usec_sleep(td, this_delay);
88 usec = utime_since_now(&tv);
90 td->time_offset = usec - delay;
/*
 * Handle "special" replay entries (file actions rather than data I/O).
 * Entries with a real ddir (!= DDIR_INVAL) are not special and are left to
 * the caller. Otherwise dispatch on file_action: open/close/unlink the
 * referenced file; unknown actions are logged as errors.
 * NOTE(review): return-value conventions (and the DDIR_WAIT path, if any)
 * are in elided lines — confirm against the full source.
 */
95 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
103 if (ipo->ddir != DDIR_INVAL)
106 f = td->files[ipo->fileno];
108 switch (ipo->file_action) {
109 case FIO_LOG_OPEN_FILE:
110 ret = td_io_open_file(td, f);
113 td_verror(td, ret, "iolog open file");
115 case FIO_LOG_CLOSE_FILE:
116 td_io_close_file(td, f);
118 case FIO_LOG_UNLINK_FILE:
119 td_io_unlink_file(td, f);
122 log_err("fio: bad file action %d\n", ipo->file_action);
/*
 * Pop the next entry off the replay queue and populate io_u from it.
 * File-action entries are executed via ipo_special() and consumed here;
 * data entries fill io_u (ddir/offset/buflen/file, taking a file ref).
 * Replay pacing: ipo->delay is applied either relative to the last issue
 * (iolog_delay) or against elapsed time since genesis — both paths are
 * visible below; the condition selecting between them is elided.
 */
129 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
131 struct io_piece *ipo;
132 unsigned long elapsed;
134 while (!flist_empty(&td->io_log_list)) {
137 ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
138 flist_del(&ipo->list);
139 remove_trim_entry(td, ipo);
141 ret = ipo_special(td, ipo);
145 } else if (ret > 0) {
150 io_u->ddir = ipo->ddir;
151 if (ipo->ddir != DDIR_WAIT) {
152 io_u->offset = ipo->offset;
153 io_u->buflen = ipo->len;
154 io_u->file = td->files[ipo->fileno];
155 get_file(io_u->file);
156 dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
157 io_u->buflen, io_u->file->file_name);
159 iolog_delay(td, ipo->delay);
161 elapsed = mtime_since_genesis();
162 if (ipo->delay > elapsed)
163 usec_sleep(td, (ipo->delay - elapsed) * 1000);
168 if (io_u->ddir != DDIR_WAIT)
/*
 * Free all stored io_piece history: drain the rb-tree of sorted entries,
 * then the plain FIFO list, dropping any trim-list linkage for each entry.
 * (The per-entry free() calls are in elided lines.)
 */
176 void prune_io_piece_log(struct thread_data *td)
178 struct io_piece *ipo;
181 while ((n = rb_first(&td->io_hist_tree)) != NULL) {
182 ipo = rb_entry(n, struct io_piece, rb_node);
183 rb_erase(n, &td->io_hist_tree);
184 remove_trim_entry(td, ipo);
189 while (!flist_empty(&td->io_hist_list)) {
190 ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
191 flist_del(&ipo->list);
192 remove_trim_entry(td, ipo);
199 * log a successful write, so we can unwind the log for verify
201 void log_io_piece(struct thread_data *td, struct io_u *io_u)
203 struct rb_node **p, *parent;
204 struct io_piece *ipo, *__ipo;
/* Snapshot the io_u into a new history entry, marked in-flight. */
206 ipo = malloc(sizeof(struct io_piece));
208 ipo->file = io_u->file;
209 ipo->offset = io_u->offset;
210 ipo->len = io_u->buflen;
211 ipo->numberio = io_u->numberio;
212 ipo->flags = IP_F_IN_FLIGHT;
/* Entries eligible for trimming are additionally tracked on trim_list. */
216 if (io_u_should_trim(td, io_u)) {
217 flist_add_tail(&ipo->trim_list, &td->trim_list);
222 * We don't need to sort the entries, if:
224 * Sequential writes, or
225 * Random writes that lay out the file as it goes along
227 * For both these cases, just reading back data in the order we
228 * wrote it out is the fastest.
230 * One exception is if we don't have a random map AND we are doing
231 * verifies, in that case we need to check for duplicate blocks and
232 * drop the old one, which we rely on the rb insert/lookup for
/* Fast path: plain FIFO history list, no sorting needed. */
235 if (((!td->o.verifysort) || !td_random(td) || !td->o.overwrite) &&
236 (file_randommap(td, ipo->file) || td->o.verify == VERIFY_NONE)) {
237 INIT_FLIST_HEAD(&ipo->list);
238 flist_add_tail(&ipo->list, &td->io_hist_list);
239 ipo->flags |= IP_F_ONLIST;
244 RB_CLEAR_NODE(&ipo->rb_node);
247 * Sort the entry into the verification list
/* Slow path: keyed rb-tree insert by (file, offset). Overlapping older
 * entries are evicted (rb_erase + trim-list removal) so verify sees only
 * the newest data for a block. NOTE(review): the overlap-restart loop
 * structure is partially elided — confirm against the full source. */
250 p = &td->io_hist_tree.rb_node;
256 __ipo = rb_entry(parent, struct io_piece, rb_node);
257 if (ipo->file < __ipo->file)
259 else if (ipo->file > __ipo->file)
261 else if (ipo->offset < __ipo->offset) {
263 overlap = ipo->offset + ipo->len > __ipo->offset;
265 else if (ipo->offset > __ipo->offset) {
267 overlap = __ipo->offset + __ipo->len > ipo->offset;
273 dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu",
274 __ipo->offset, __ipo->len,
275 ipo->offset, ipo->len);
277 rb_erase(parent, &td->io_hist_tree);
278 remove_trim_entry(td, __ipo);
284 rb_link_node(&ipo->rb_node, parent, p);
285 rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
286 ipo->flags |= IP_F_ONRB;
/*
 * Undo log_io_piece() for an io_u that will not complete successfully:
 * remove its history entry from whichever container holds it (rb-tree or
 * FIFO list), as recorded by the IP_F_ONRB/IP_F_ONLIST flags.
 */
290 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
292 struct io_piece *ipo = io_u->ipo;
297 if (ipo->flags & IP_F_ONRB)
298 rb_erase(&ipo->rb_node, &td->io_hist_tree);
299 else if (ipo->flags & IP_F_ONLIST)
300 flist_del(&ipo->list);
/*
 * Shrink the logged length of a short (residual) I/O so verify only checks
 * the bytes actually transferred: len = xfer_buflen - resid.
 */
307 void trim_io_piece(struct thread_data *td, const struct io_u *io_u)
309 struct io_piece *ipo = io_u->ipo;
314 ipo->len = io_u->xfer_buflen - io_u->resid;
/*
 * Close out the write-side iolog; the stdio buffer installed by
 * init_iolog_write() is released (free/fclose calls are in elided lines)
 * and the pointer cleared.
 */
317 void write_iolog_close(struct thread_data *td)
323 td->iolog_buf = NULL;
327 * Read version 2 iolog data. It is enhanced to include per-file logging,
330 static int read_iolog2(struct thread_data *td, FILE *f)
332 unsigned long long offset;
334 int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
/* Start from a clean file table; log entries (re)register files below. */
339 free_release_files(td);
342 * Read in the read iolog and store it, reuse the infrastructure
343 * for doing verifications.
/* 256+16: room for a "%256s" scan (up to 256 chars + NUL) with slack. */
346 fname = malloc(256+16);
347 act = malloc(256+16);
349 reads = writes = waits = 0;
350 while ((p = fgets(str, 4096, f)) != NULL) {
351 struct io_piece *ipo;
/* Data line: "<fname> <act> <offset> <bytes>"; file lines have fewer fields
 * and are distinguished by the sscanf return count (elided branches). */
354 r = sscanf(p, "%256s %256s %llu %u", fname, act, &offset,
/* Map the action word to a data direction for the 4-field form. */
360 if (!strcmp(act, "wait"))
362 else if (!strcmp(act, "read"))
364 else if (!strcmp(act, "write"))
366 else if (!strcmp(act, "sync"))
368 else if (!strcmp(act, "datasync"))
370 else if (!strcmp(act, "trim"))
373 log_err("fio: bad iolog file action: %s\n",
377 fileno = get_fileno(td, fname);
/* 2-field form: file lifecycle events. */
380 if (!strcmp(act, "add")) {
381 fileno = add_file(td, fname, 0, 1);
382 file_action = FIO_LOG_ADD_FILE;
384 } else if (!strcmp(act, "open")) {
385 fileno = get_fileno(td, fname);
386 file_action = FIO_LOG_OPEN_FILE;
387 } else if (!strcmp(act, "close")) {
388 fileno = get_fileno(td, fname);
389 file_action = FIO_LOG_CLOSE_FILE;
391 log_err("fio: bad iolog file action: %s\n",
396 log_err("bad iolog2: %s", p);
/* Tally directions; writes in read-only mode are dropped (count kept for
 * the warning below). */
402 else if (rw == DDIR_WRITE) {
404 * Don't add a write for ro mode
409 } else if (rw == DDIR_WAIT) {
411 } else if (rw == DDIR_INVAL) {
412 } else if (!ddir_sync(rw)) {
413 log_err("bad ddir: %d\n", rw);
/* Build the replay entry and queue it. */
420 ipo = malloc(sizeof(*ipo));
423 if (rw == DDIR_WAIT) {
426 ipo->offset = offset;
/* Track the largest block size seen so buffers are sized correctly. */
428 if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
429 td->o.max_bs[rw] = bytes;
430 ipo->fileno = fileno;
431 ipo->file_action = file_action;
435 queue_io_piece(td, ipo);
442 if (writes && read_only) {
443 log_err("fio: <%s> skips replay of %d writes due to"
444 " read-only\n", td->o.name, writes);
/* Derive the job's effective data direction from what the log contained. */
448 if (!reads && !writes && !waits)
450 else if (reads && !writes)
451 td->o.td_ddir = TD_DDIR_READ;
452 else if (!reads && writes)
453 td->o.td_ddir = TD_DDIR_WRITE;
455 td->o.td_ddir = TD_DDIR_RW;
461 * open iolog, check version, and call appropriate parser
463 static int init_iolog_read(struct thread_data *td)
465 char buffer[256], *p;
469 f = fopen(td->o.read_iolog_file, "r");
471 perror("fopen read iolog");
/* Read the first line to identify the log format version. */
475 p = fgets(buffer, sizeof(buffer), f);
477 td_verror(td, errno, "iolog read");
478 log_err("fio: unable to read iolog\n");
484 * version 2 of the iolog stores a specific string as the
485 * first line, check for that
487 if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
488 ret = read_iolog2(td, f);
490 log_err("fio: iolog version 1 is no longer supported\n");
499 * Set up a log for storing io patterns.
501 static int init_iolog_write(struct thread_data *td)
/* Append mode: an existing log is extended, not truncated. */
507 f = fopen(td->o.write_iolog_file, "a");
509 perror("fopen write iolog");
514 * That's it for writing, setup a log buffer and we're done.
/* Fully-buffered 8K stdio buffer; freed again in write_iolog_close(). */
517 td->iolog_buf = malloc(8192);
518 setvbuf(f, td->iolog_buf, _IOFBF, 8192);
521 * write our version line
523 if (fprintf(f, "%s\n", iolog_ver2) < 0) {
524 perror("iolog init\n");
529 * add all known files
531 for_each_file(td, ff, i)
532 log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * Entry point for iolog setup: a read-iolog is either a blktrace capture
 * (loaded via load_blktrace, with byte-swap detection) or a fio iolog;
 * otherwise, if requested, open a write-iolog. Errors are reported via
 * td_verror with EINVAL.
 */
537 int init_iolog(struct thread_data *td)
541 if (td->o.read_iolog_file) {
545 * Check if it's a blktrace file and load that if possible.
546 * Otherwise assume it's a normal log file and load that.
548 if (is_blktrace(td->o.read_iolog_file, &need_swap))
549 ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
551 ret = init_iolog_read(td);
552 } else if (td->o.write_iolog_file)
553 ret = init_iolog_write(td);
556 td_verror(td, EINVAL, "failed initializing iolog");
/*
 * Allocate and initialize an io_log from log_params: initial capacity of
 * 1024 samples, copied type/offset/gz settings, and a duplicated filename.
 * When offsets are logged, the ddir mask carries LOG_OFFSET_SAMPLE_BIT so
 * readers can detect the wider sample format. With compression enabled and
 * a thread context present, the chunk lock is set up and the thread is
 * flagged TD_F_COMPRESS_LOG; log_gz without a td appears to be rejected
 * (that branch's body is elided — confirm).
 */
561 void setup_log(struct io_log **log, struct log_params *p,
562 const char *filename)
566 l = calloc(1, sizeof(*l));
568 l->max_samples = 1024;
569 l->log_type = p->log_type;
570 l->log_offset = p->log_offset;
571 l->log_gz = p->log_gz;
572 l->log_gz_store = p->log_gz_store;
573 l->log = malloc(l->max_samples * log_entry_sz(l));
574 l->avg_msec = p->avg_msec;
575 l->filename = strdup(filename);
579 l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
581 INIT_FLIST_HEAD(&l->chunk_list);
583 if (l->log_gz && !p->td)
585 else if (l->log_gz) {
586 pthread_mutex_init(&l->chunk_lock, NULL);
587 p->td->flags |= TD_F_COMPRESS_LOG;
/*
 * Optional large (1MB) stdio buffering for log output files. When the
 * platform lacks setvbuf (CONFIG_SETVBUF unset), the helpers degrade to
 * no-ops returning/accepting NULL (stub bodies elided in this view).
 */
593 #ifdef CONFIG_SETVBUF
594 static void *set_file_buffer(FILE *f)
596 size_t size = 1048576;
600 setvbuf(f, buf, _IOFBF, size);
604 static void clear_file_buffer(void *buf)
609 static void *set_file_buffer(FILE *f)
614 static void clear_file_buffer(void *buf)
/* Releases an io_log and its owned resources (body elided). */
619 void free_log(struct io_log *log)
/*
 * Write a buffer of raw samples to 'f' as CSV text. The first sample's
 * ddir field is probed for LOG_OFFSET_SAMPLE_BIT to learn whether the
 * buffer uses the wider io_sample_offset layout; the per-sample size and
 * the output format (with/without trailing offset column) follow from that.
 */
626 static void flush_samples(FILE *f, void *samples, uint64_t sample_size)
630 uint64_t i, nr_samples;
635 s = __get_sample(samples, 0, 0);
636 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
638 nr_samples = sample_size / __log_entry_sz(log_offset);
640 for (i = 0; i < nr_samples; i++) {
641 s = __get_sample(samples, log_offset, i);
644 fprintf(f, "%lu, %lu, %u, %u\n",
645 (unsigned long) s->time,
646 (unsigned long) s->val,
647 io_sample_ddir(s), s->bs);
649 struct io_sample_offset *so = (void *) s;
651 fprintf(f, "%lu, %lu, %u, %u, %llu\n",
652 (unsigned long) s->time,
653 (unsigned long) s->val,
654 io_sample_ddir(s), s->bs,
655 (unsigned long long) so->offset);
/* Work item handed to the compress helper thread (fields mostly elided). */
662 struct iolog_flush_data {
/* One compressed output chunk, linked on a log's chunk_list. */
669 struct iolog_compress {
670 struct flist_head list;
/* Fixed compressed-chunk buffer size: 128KB. */
676 #define GZ_CHUNK 131072
/*
 * Allocate a fresh GZ_CHUNK-sized chunk tagged with sequence number 'seq'
 * (sequence assignment is in an elided line).
 */
678 static struct iolog_compress *get_new_chunk(unsigned int seq)
680 struct iolog_compress *c;
682 c = malloc(sizeof(*c));
683 INIT_FLIST_HEAD(&c->list);
684 c->buf = malloc(GZ_CHUNK);
/* Release a chunk and its buffer (frees elided). */
690 static void free_chunk(struct iolog_compress *ic)
/*
 * Prepare a zlib inflate stream. The window-bits value (set in elided
 * lines) adds 32 when gz_hdr is requested, enabling zlib's automatic
 * gzip/zlib header detection per the inflateInit2 contract.
 */
696 static int z_stream_init(z_stream *stream, int gz_hdr)
700 stream->zalloc = Z_NULL;
701 stream->zfree = Z_NULL;
702 stream->opaque = Z_NULL;
703 stream->next_in = Z_NULL;
706 * zlib magic - add 32 for auto-detection of gz header or not,
707 * if we decide to store files in a gzip friendly format.
712 if (inflateInit2(stream, wbits) != Z_OK)
/* Cursor state carried across inflate_chunk() calls: output buffer,
 * its fill level/capacity, and the current chunk sequence. */
718 struct inflate_chunk_iter {
/*
 * End the current inflate sequence: tear down the zlib stream, flush the
 * accumulated decompressed samples to 'f', and reset the iter buffer so a
 * new sequence can start cleanly.
 */
727 static void finish_chunk(z_stream *stream, FILE *f,
728 struct inflate_chunk_iter *iter)
732 ret = inflateEnd(stream);
734 log_err("fio: failed to end log inflation (%d)\n", ret);
736 flush_samples(f, iter->buf, iter->buf_used);
739 iter->buf_size = iter->buf_used = 0;
743 * Iterative chunk inflation. Handles cases where we cross into a new
744 * sequence, doing flush finish of previous chunk if needed.
746 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
747 z_stream *stream, struct inflate_chunk_iter *iter)
751 dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u",
752 (unsigned long) ic->len, ic->seq);
/* Sequence boundary: flush the prior sequence and re-init the stream. */
754 if (ic->seq != iter->seq) {
756 finish_chunk(stream, f, iter);
758 z_stream_init(stream, gz_hdr);
762 stream->avail_in = ic->len;
763 stream->next_in = ic->buf;
/* Lazily allocate the output buffer at the configured chunk size. */
765 if (!iter->buf_size) {
766 iter->buf_size = iter->chunk_sz;
767 iter->buf = malloc(iter->buf_size);
/* Inflate all input, growing the output buffer whenever it fills. */
770 while (stream->avail_in) {
771 size_t this_out = iter->buf_size - iter->buf_used;
774 stream->avail_out = this_out;
775 stream->next_out = iter->buf + iter->buf_used;
777 err = inflate(stream, Z_NO_FLUSH);
779 log_err("fio: failed inflating log: %d\n", err);
784 iter->buf_used += this_out - stream->avail_out;
786 if (!stream->avail_out) {
787 iter->buf_size += iter->chunk_sz;
788 iter->buf = realloc(iter->buf, iter->buf_size);
792 if (err == Z_STREAM_END)
/* Return how many input bytes were consumed from this chunk. */
796 ret = (void *) stream->next_in - ic->buf;
798 dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) ret);
804 * Inflate stored compressed chunks, or write them directly to the log
805 * file if so instructed.
807 static int inflate_gz_chunks(struct io_log *log, FILE *f)
809 struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
/* Drain the chunk list; each chunk is either written out still-compressed
 * (log_gz_store) or decompressed into the sample flusher. */
812 while (!flist_empty(&log->chunk_list)) {
813 struct iolog_compress *ic;
815 ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
816 flist_del(&ic->list);
818 if (log->log_gz_store) {
821 dprint(FD_COMPRESS, "log write chunk size=%lu, "
822 "seq=%u\n", (unsigned long) ic->len, ic->seq);
824 ret = fwrite(ic->buf, ic->len, 1, f);
825 if (ret != 1 || ferror(f)) {
827 log_err("fio: error writing compressed log\n");
830 inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
/* Flush whatever the final sequence decompressed. */
836 finish_chunk(&stream, f, &iter);
844 * Open compressed log file and decompress the stored chunks and
845 * write them to stdout. The chunks are stored sequentially in the
846 * file, so we iterate over them and do them one-by-one.
848 int iolog_file_inflate(const char *file)
850 struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
851 struct iolog_compress ic;
859 f = fopen(file, "r");
/* stat() for the file size so the whole compressed log can be slurped
 * into one buffer in a single fread. */
865 if (stat(file, &sb) < 0) {
871 ic.buf = buf = malloc(sb.st_size);
875 ret = fread(ic.buf, ic.len, 1, f);
880 } else if (ret != 1) {
881 log_err("fio: short read on reading log\n");
889 * Each chunk will return Z_STREAM_END. We don't know how many
890 * chunks are in the file, so we just keep looping and incrementing
891 * the sequence number until we have consumed the whole compressed
/* Decompression loop (elided); gz_hdr=1 enables gzip header detection. */
898 iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
911 finish_chunk(&stream, stdout, &iter);
/* Stubs used when fio is built without zlib (CONFIG_ZLIB unset):
 * chunk inflation is a no-op and file inflation reports an error. */
921 static int inflate_gz_chunks(struct io_log *log, FILE *f)
926 int iolog_file_inflate(const char *file)
928 log_err("fio: log inflation not possible without zlib\n");
/*
 * Write a complete io_log to its output file: open for (truncating) write,
 * install a large stdio buffer, emit any compressed chunks first, then the
 * in-memory samples still held in log->log.
 */
934 void flush_log(struct io_log *log)
939 f = fopen(log->filename, "w");
945 buf = set_file_buffer(f);
947 inflate_gz_chunks(log, f);
949 flush_samples(f, log->log, log->nr_samples * log_entry_sz(log));
952 clear_file_buffer(buf);
/*
 * Finalize one log under the cross-process file lock. With trylock set,
 * a busy lock defers the flush (caller retries later); GUI clients ship
 * the log over the network instead of writing locally (the local flush
 * path is in elided lines).
 */
955 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
961 if (fio_trylock_file(log->filename))
964 fio_lock_file(log->filename);
966 if (td->client_type == FIO_CLIENT_TYPE_GUI)
967 fio_send_iolog(td, log, log->filename);
971 fio_unlock_file(log->filename);
979 * Invoked from our compress helper thread, when logging would have exceeded
980 * the specified memory limitation. Compresses the previously stored
983 static int gz_work(struct tp_work *work)
985 struct iolog_flush_data *data;
986 struct iolog_compress *c;
987 struct flist_head list;
993 INIT_FLIST_HEAD(&list);
995 data = container_of(work, struct iolog_flush_data, work);
997 stream.zalloc = Z_NULL;
998 stream.zfree = Z_NULL;
999 stream.opaque = Z_NULL;
1001 ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1003 log_err("fio: failed to init gz stream\n");
/* Each flush gets a fresh sequence number so the inflate side can detect
 * stream boundaries. */
1007 seq = ++data->log->chunk_seq;
1009 stream.next_in = (void *) data->samples;
1010 stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1012 dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u\n",
1013 (unsigned long) stream.avail_in, seq);
/* Main deflate loop: fill GZ_CHUNK-sized chunks until input is consumed. */
1015 c = get_new_chunk(seq);
1016 stream.avail_out = GZ_CHUNK;
1017 stream.next_out = c->buf;
1018 ret = deflate(&stream, Z_NO_FLUSH);
1020 log_err("fio: deflate log (%d)\n", ret);
1025 c->len = GZ_CHUNK - stream.avail_out;
1026 flist_add_tail(&c->list, &list);
1028 } while (stream.avail_in);
/* Finish the stream: first into the tail of the last chunk, then into new
 * chunks until zlib reports Z_STREAM_END. */
1030 stream.next_out = c->buf + c->len;
1031 stream.avail_out = GZ_CHUNK - c->len;
1033 ret = deflate(&stream, Z_FINISH);
1034 if (ret == Z_STREAM_END)
1035 c->len = GZ_CHUNK - stream.avail_out;
1038 c = get_new_chunk(seq);
1039 stream.avail_out = GZ_CHUNK;
1040 stream.next_out = c->buf;
1041 ret = deflate(&stream, Z_FINISH);
1042 c->len = GZ_CHUNK - stream.avail_out;
1044 flist_add_tail(&c->list, &list);
1045 } while (ret != Z_STREAM_END);
1048 dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1050 ret = deflateEnd(&stream);
1052 log_err("fio: deflateEnd %d\n", ret);
/* The copied sample buffer (made in iolog_flush) is owned here; free it
 * and publish the finished chunks under the log's chunk lock. */
1054 free(data->samples);
1056 if (!flist_empty(&list)) {
1057 pthread_mutex_lock(&data->log->chunk_lock);
1058 flist_splice_tail(&list, &data->log->chunk_list);
1059 pthread_mutex_unlock(&data->log->chunk_lock);
/* Wake a synchronous iolog_flush(wait=1) caller. */
1066 pthread_cond_signal(&work->cv);
/* Error path: discard any chunks produced before the failure. */
1072 while (!flist_empty(&list)) {
1073 c = flist_first_entry(list.next, struct iolog_compress, list);
1074 flist_del(&c->list);
1082 * Queue work item to compress the existing log entries. We copy the
1083 * samples, and reset the log sample count to 0 (so the logging will
1084 * continue to use the memory associated with the log). If called with
1085 * wait == 1, will not return until the log compression has completed.
1087 int iolog_flush(struct io_log *log, int wait)
1089 struct tp_data *tdat = log->td->tp_data;
1090 struct iolog_flush_data *data;
1093 data = malloc(sizeof(*data));
/* Snapshot the samples so logging can keep writing into log->log while
 * the helper thread compresses the copy (freed in gz_work). */
1099 sample_size = log->nr_samples * log_entry_sz(log);
1100 data->samples = malloc(sample_size);
1101 if (!data->samples) {
1106 memcpy(data->samples, log->log, sample_size);
1107 data->nr_samples = log->nr_samples;
1108 data->work.fn = gz_work;
1109 log->nr_samples = 0;
/* Synchronous callers get a lock/cv pair signalled by gz_work. */
1112 pthread_mutex_init(&data->work.lock, NULL);
1113 pthread_cond_init(&data->work.cv, NULL);
1114 data->work.wait = 1;
1116 data->work.wait = 0;
1118 data->work.prio = 1;
1119 tp_queue_work(tdat, &data->work);
1122 pthread_mutex_lock(&data->work.lock);
1123 while (!data->work.done)
1124 pthread_cond_wait(&data->work.cv, &data->work.lock);
1125 pthread_mutex_unlock(&data->work.lock);
/* Non-zlib build: flushing is a no-op stub (body elided). */
1134 int iolog_flush(struct io_log *log, int wait)
/* Per-log-type finalizers: each hands its thread's log (if any — the NULL
 * checks are in elided lines) to finish_log, propagating the trylock flag. */
1141 static int write_iops_log(struct thread_data *td, int try)
1143 struct io_log *log = td->iops_log;
1148 return finish_log(td, log, try);
1151 static int write_slat_log(struct thread_data *td, int try)
1153 struct io_log *log = td->slat_log;
1158 return finish_log(td, log, try);
1161 static int write_clat_log(struct thread_data *td, int try)
1163 struct io_log *log = td->clat_log;
1168 return finish_log(td, log, try);
1171 static int write_lat_log(struct thread_data *td, int try)
1173 struct io_log *log = td->lat_log;
1178 return finish_log(td, log, try);
1181 static int write_bandw_log(struct thread_data *td, int try)
1183 struct io_log *log = td->bw_log;
1188 return finish_log(td, log, try);
/* Dispatch table mapping each log-type mask bit to its writer function;
 * iterated by fio_writeout_logs() below. */
1203 int (*fn)(struct thread_data *, int);
1206 static struct log_type log_types[] = {
1208 .mask = BW_LOG_MASK,
1209 .fn = write_bandw_log,
1212 .mask = LAT_LOG_MASK,
1213 .fn = write_lat_log,
1216 .mask = SLAT_LOG_MASK,
1217 .fn = write_slat_log,
1220 .mask = CLAT_LOG_MASK,
1221 .fn = write_clat_log,
1224 .mask = IOPS_LOG_MASK,
1225 .fn = write_iops_log,
/*
 * Flush all of this thread's logs at job end. Runstate is bumped to
 * TD_FINISHING for the duration. Writers are retried in rounds: each
 * round attempts every not-yet-done type with trylock semantics (except
 * when only one remains, where 'try' becomes 0 and the lock blocks);
 * log_mask records completed types. If a full round makes no progress
 * the loop appears to bail out (the break path is partially elided).
 */
1229 void fio_writeout_logs(struct thread_data *td)
1231 unsigned int log_mask = 0;
1232 unsigned int log_left = ALL_LOG_NR;
1235 old_state = td_bump_runstate(td, TD_FINISHING);
1240 int prev_log_left = log_left;
1242 for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1243 struct log_type *lt = &log_types[i];
1246 if (!(log_mask & lt->mask)) {
1247 ret = lt->fn(td, log_left != 1);
1250 log_mask |= lt->mask;
1255 if (prev_log_left == log_left)
1259 td_restore_runstate(td, old_state);