2 * Code related to writing an iolog of what a thread is doing, and to
3 * later read that back and replay
/* Magic first line of a version-2 iolog file; checked on read, written on create. */
23 static const char iolog_ver2[] = "fio version 2 iolog";
/*
 * Append an io_piece to the tail of the thread's replay queue and account
 * its length toward the total I/O size expected for the job.
 * NOTE(review): this view of the function is elided (braces not visible).
 */
25 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
27 flist_add_tail(&ipo->list, &td->io_log_list);
28 td->total_io_size += ipo->len;
/*
 * Emit one "<file> <action> <offset> <len>" line to the write-iolog for
 * this io_u. No-op unless write_iolog_file was requested.
 */
31 void log_io_u(struct thread_data *td, struct io_u *io_u)
/* Action names indexed by io_u->ddir; 7 entries, valid indices 0..6. */
33 const char *act[] = { "read", "write", "sync", "datasync",
34 "sync_file_range", "wait", "trim" };
/* Guard the act[] lookup below against an out-of-range ddir. */
36 assert(io_u->ddir <= 6);
38 if (!td->o.write_iolog_file)
41 fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
42 act[io_u->ddir], io_u->offset,
/*
 * Record a file-level event ("add"/"open"/"close", indexed by 'what') in
 * the write-iolog. Skipped when no iolog is being written.
 */
46 void log_file(struct thread_data *td, struct fio_file *f,
47 enum file_log_act what)
49 const char *act[] = { "add", "open", "close" };
53 if (!td->o.write_iolog_file)
/* Guard below (elided in this view) handles the pre-open/close that
 * happens before the job proper starts. */
58 * this happens on the pre-open/close done before the job starts
63 fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
/*
 * Sleep to reproduce the recorded inter-I/O delay during replay.
 * Time already spent since the last issue (utime_since_now) is presumably
 * credited against 'delay' in the elided lines — TODO confirm.
 * Sleeps in chunks (capped around 500ms per the visible check) so that
 * td->terminate can abort a long wait promptly.
 */
66 static void iolog_delay(struct thread_data *td, unsigned long delay)
68 unsigned long usec = utime_since_now(&td->last_issue);
69 unsigned long this_delay;
76 while (delay && !td->terminate) {
78 if (this_delay > 500000)
81 usec_sleep(td, this_delay);
/*
 * Handle a "special" iolog entry: file add/open/close/unlink actions are
 * encoded with ddir == DDIR_INVAL. Regular I/O entries return early via
 * the ddir check. Return values are elided in this view; presumably
 * 0 = not special / continue, non-zero distinguishes handled vs error —
 * TODO confirm against full source.
 */
86 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
/* Only DDIR_INVAL entries carry a file action. */
94 if (ipo->ddir != DDIR_INVAL)
97 f = td->files[ipo->fileno];
99 switch (ipo->file_action) {
100 case FIO_LOG_OPEN_FILE:
101 ret = td_io_open_file(td, f);
104 td_verror(td, ret, "iolog open file");
106 case FIO_LOG_CLOSE_FILE:
107 td_io_close_file(td, f);
109 case FIO_LOG_UNLINK_FILE:
110 td_io_unlink_file(td, f);
113 log_err("fio: bad file action %d\n", ipo->file_action);
/*
 * Pop the next entry from the replay queue and populate io_u from it.
 * File-action entries are dispatched to ipo_special() and consumed here;
 * regular entries fill in ddir/offset/buflen/file and take a file
 * reference. Recorded delays are honored either relative to issue time
 * (iolog_delay) or against wall-clock since genesis for DDIR_WAIT —
 * the branch structure is partially elided in this view.
 */
120 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
122 struct io_piece *ipo;
123 unsigned long elapsed;
125 while (!flist_empty(&td->io_log_list)) {
128 ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
129 flist_del(&ipo->list);
/* Keep the trim bookkeeping list consistent with the replay list. */
130 remove_trim_entry(td, ipo);
132 ret = ipo_special(td, ipo);
136 } else if (ret > 0) {
141 io_u->ddir = ipo->ddir;
142 if (ipo->ddir != DDIR_WAIT) {
143 io_u->offset = ipo->offset;
144 io_u->buflen = ipo->len;
/* Take a reference; dropped when the io_u completes elsewhere. */
145 io_u->file = td->files[ipo->fileno];
146 get_file(io_u->file);
147 dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
148 io_u->buflen, io_u->file->file_name);
150 iolog_delay(td, ipo->delay);
/* DDIR_WAIT path: delay is absolute msec since genesis. */
152 elapsed = mtime_since_genesis();
153 if (ipo->delay > elapsed)
154 usec_sleep(td, (ipo->delay - elapsed) * 1000);
159 if (io_u->ddir != DDIR_WAIT)
/*
 * Tear down the io-history structures: drain the rb-tree of verify
 * entries, then the flat history list, removing each piece from the trim
 * list as well. Freeing of each ipo is presumably done in elided lines —
 * TODO confirm.
 */
167 void prune_io_piece_log(struct thread_data *td)
169 struct io_piece *ipo;
172 while ((n = rb_first(&td->io_hist_tree)) != NULL) {
173 ipo = rb_entry(n, struct io_piece, rb_node);
174 rb_erase(n, &td->io_hist_tree);
175 remove_trim_entry(td, ipo);
180 while (!flist_empty(&td->io_hist_list)) {
181 ipo = flist_entry(&td->io_hist_list, struct io_piece, list);
182 flist_del(&ipo->list);
183 remove_trim_entry(td, ipo);
190 * log a successful write, so we can unwind the log for verify
192 void log_io_piece(struct thread_data *td, struct io_u *io_u)
194 struct rb_node **p, *parent;
195 struct io_piece *ipo, *__ipo;
/* NOTE(review): malloc result used unchecked below — fio convention,
 * but worth confirming an OOM policy exists elsewhere. */
197 ipo = malloc(sizeof(struct io_piece));
199 ipo->file = io_u->file;
200 ipo->offset = io_u->offset;
201 ipo->len = io_u->buflen;
202 ipo->numberio = io_u->numberio;
203 ipo->flags = IP_F_IN_FLIGHT;
207 if (io_u_should_trim(td, io_u)) {
208 flist_add_tail(&ipo->trim_list, &td->trim_list);
213 * We don't need to sort the entries, if:
215 * Sequential writes, or
216 * Random writes that lay out the file as it goes along
218 * For both these cases, just reading back data in the order we
219 * wrote it out is the fastest.
221 * One exception is if we don't have a random map AND we are doing
222 * verifies, in that case we need to check for duplicate blocks and
223 * drop the old one, which we rely on the rb insert/lookup for
/* Fast path: append to the flat history list when sorting isn't needed. */
226 if (((!td->o.verifysort) || !td_random(td) || !td->o.overwrite) &&
227 (file_randommap(td, ipo->file) || td->o.verify == VERIFY_NONE)) {
228 INIT_FLIST_HEAD(&ipo->list);
229 flist_add_tail(&ipo->list, &td->io_hist_list);
230 ipo->flags |= IP_F_ONLIST;
235 RB_CLEAR_NODE(&ipo->rb_node);
238 * Sort the entry into the verification list
/* Slow path: rb-tree insert keyed by (file, offset); an exact-key match
 * is an overlapping rewrite — evict the stale entry and retry. */
241 p = &td->io_hist_tree.rb_node;
246 __ipo = rb_entry(parent, struct io_piece, rb_node);
247 if (ipo->file < __ipo->file)
249 else if (ipo->file > __ipo->file)
251 else if (ipo->offset < __ipo->offset)
253 else if (ipo->offset > __ipo->offset)
256 dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu",
257 __ipo->offset, __ipo->len,
258 ipo->offset, ipo->len);
260 rb_erase(parent, &td->io_hist_tree);
261 remove_trim_entry(td, __ipo);
267 rb_link_node(&ipo->rb_node, parent, p);
268 rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
269 ipo->flags |= IP_F_ONRB;
/*
 * Remove the io_piece attached to this io_u from whichever history
 * container holds it (rb-tree or flat list), per its flags.
 */
273 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
275 struct io_piece *ipo = io_u->ipo;
280 if (ipo->flags & IP_F_ONRB)
281 rb_erase(&ipo->rb_node, &td->io_hist_tree);
282 else if (ipo->flags & IP_F_ONLIST)
283 flist_del(&ipo->list);
/*
 * Shrink the logged piece to the bytes actually transferred
 * (requested transfer length minus the residual left undone).
 */
290 void trim_io_piece(struct thread_data *td, struct io_u *io_u)
292 struct io_piece *ipo = io_u->ipo;
297 ipo->len = io_u->xfer_buflen - io_u->resid;
/*
 * Close out the write-iolog. Flush/fclose of td->iolog_f and freeing of
 * the buffer are presumably in the elided lines; the visible line just
 * clears the buffer pointer after teardown.
 */
300 void write_iolog_close(struct thread_data *td)
306 td->iolog_buf = NULL;
310 * Read version 2 iolog data. It is enhanced to include per-file logging,
313 static int read_iolog2(struct thread_data *td, FILE *f)
315 unsigned long long offset;
317 int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
/* Drop any files set up via options; the iolog dictates the file set. */
322 free_release_files(td);
325 * Read in the read iolog and store it, reuse the infrastructure
326 * for doing verifications.
/* Buffers sized for the %256s scanf width plus slack. */
329 fname = malloc(256+16);
330 act = malloc(256+16);
332 reads = writes = waits = 0;
333 while ((p = fgets(str, 4096, f)) != NULL) {
334 struct io_piece *ipo;
/* A 4-field line is an I/O record: "<file> <act> <offset> <len>";
 * fewer fields fall into the file-action cases below. */
337 r = sscanf(p, "%256s %256s %llu %u", fname, act, &offset,
343 if (!strcmp(act, "wait"))
345 else if (!strcmp(act, "read"))
347 else if (!strcmp(act, "write"))
349 else if (!strcmp(act, "sync"))
351 else if (!strcmp(act, "datasync"))
353 else if (!strcmp(act, "trim"))
356 log_err("fio: bad iolog file action: %s\n",
360 fileno = get_fileno(td, fname);
/* 2-field line: file-level action. */
363 if (!strcmp(act, "add")) {
364 fileno = add_file(td, fname, 0, 1);
365 file_action = FIO_LOG_ADD_FILE;
367 } else if (!strcmp(act, "open")) {
368 fileno = get_fileno(td, fname);
369 file_action = FIO_LOG_OPEN_FILE;
370 } else if (!strcmp(act, "close")) {
371 fileno = get_fileno(td, fname);
372 file_action = FIO_LOG_CLOSE_FILE;
374 log_err("fio: bad iolog file action: %s\n",
379 log_err("bad iolog2: %s", p);
385 else if (rw == DDIR_WRITE) {
387 * Don't add a write for ro mode
392 } else if (rw == DDIR_WAIT) {
394 } else if (rw == DDIR_INVAL) {
395 } else if (!ddir_sync(rw)) {
396 log_err("bad ddir: %d\n", rw);
/* Queue the parsed entry for replay. */
403 ipo = malloc(sizeof(*ipo));
406 if (rw == DDIR_WAIT) {
409 ipo->offset = offset;
/* Track the largest block size seen so buffers are sized correctly. */
411 if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
412 td->o.max_bs[rw] = bytes;
413 ipo->fileno = fileno;
414 ipo->file_action = file_action;
418 queue_io_piece(td, ipo);
425 if (writes && read_only) {
426 log_err("fio: <%s> skips replay of %d writes due to"
427 " read-only\n", td->o.name, writes);
/* Derive the job's data direction from what the log contained. */
431 if (!reads && !writes && !waits)
433 else if (reads && !writes)
434 td->o.td_ddir = TD_DDIR_READ;
435 else if (!reads && writes)
436 td->o.td_ddir = TD_DDIR_WRITE;
438 td->o.td_ddir = TD_DDIR_RW;
444 * open iolog, check version, and call appropriate parser
446 static int init_iolog_read(struct thread_data *td)
448 char buffer[256], *p;
452 f = fopen(td->o.read_iolog_file, "r");
454 perror("fopen read iolog");
/* First line carries the version magic. */
458 p = fgets(buffer, sizeof(buffer), f);
460 td_verror(td, errno, "iolog read");
461 log_err("fio: unable to read iolog\n");
467 * version 2 of the iolog stores a specific string as the
468 * first line, check for that
470 if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
471 ret = read_iolog2(td, f);
473 log_err("fio: iolog version 1 is no longer supported\n");
482 * Set up a log for storing io patterns.
484 static int init_iolog_write(struct thread_data *td)
/* NOTE(review): opened in append mode, yet the version header is written
 * unconditionally below — confirm this is intended for pre-existing files. */
490 f = fopen(td->o.write_iolog_file, "a");
492 perror("fopen write iolog");
497 * That's it for writing, setup a log buffer and we're done.
/* Fully-buffered stdio so per-I/O fprintf calls stay cheap. */
500 td->iolog_buf = malloc(8192);
501 setvbuf(f, td->iolog_buf, _IOFBF, 8192);
504 * write our version line
506 if (fprintf(f, "%s\n", iolog_ver2) < 0) {
507 perror("iolog init\n");
512 * add all known files
514 for_each_file(td, ff, i)
515 log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * Entry point for iolog setup: pick blktrace vs native iolog parsing for
 * reads, or open a write-iolog, based on the job options.
 */
520 int init_iolog(struct thread_data *td)
524 if (td->o.read_iolog_file) {
528 * Check if it's a blktrace file and load that if possible.
529 * Otherwise assume it's a normal log file and load that.
531 if (is_blktrace(td->o.read_iolog_file, &need_swap))
532 ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
534 ret = init_iolog_read(td);
535 } else if (td->o.write_iolog_file)
536 ret = init_iolog_write(td);
539 td_verror(td, EINVAL, "failed initializing iolog");
/*
 * Allocate and initialize an io_log from the given parameters, storing
 * the result through 'log'. Sets up the sample buffer, optional offset
 * logging, and — when compression is enabled and a thread is attached —
 * the chunk list/lock and the TD_F_COMPRESS_LOG flag.
 */
544 void setup_log(struct io_log **log, struct log_params *p,
545 const char *filename)
547 struct io_log *l = malloc(sizeof(*l));
549 memset(l, 0, sizeof(*l));
/* Initial capacity; presumably grown elsewhere as samples accumulate. */
551 l->max_samples = 1024;
552 l->log_type = p->log_type;
553 l->log_offset = p->log_offset;
554 l->log_gz = p->log_gz;
555 l->log_gz_store = p->log_gz_store;
/* Entry size depends on whether offsets are stored (log_entry_sz). */
556 l->log = malloc(l->max_samples * log_entry_sz(l));
557 l->avg_msec = p->avg_msec;
558 l->filename = strdup(filename);
/* Tag the ddir field so readers know samples carry an offset. */
562 l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
564 INIT_FLIST_HEAD(&l->chunk_list);
/* Compression needs a thread to own the work; without one it is
 * presumably disabled in the elided branch — TODO confirm. */
566 if (l->log_gz && !p->td)
568 else if (l->log_gz) {
569 pthread_mutex_init(&l->chunk_lock, NULL);
570 p->td->flags |= TD_F_COMPRESS_LOG;
/*
 * Large stdio buffer helpers for log flushing. With CONFIG_SETVBUF the
 * buffer is heap-allocated and installed via setvbuf (caller frees via
 * clear_file_buffer); without it, the fallbacks are no-ops (bodies
 * elided in this view).
 */
#ifdef CONFIG_SETVBUF
577 static void *set_file_buffer(FILE *f)
579 size_t size = 1048576;
583 setvbuf(f, buf, _IOFBF, size);
587 static void clear_file_buffer(void *buf)
592 static void *set_file_buffer(FILE *f)
597 static void clear_file_buffer(void *buf)
/* Release an io_log and its owned resources (body elided in this view). */
602 void free_log(struct io_log *log)
/*
 * Write a raw sample buffer to 'f' as CSV text. Whether each sample
 * carries an offset is self-described by the LOG_OFFSET_SAMPLE_BIT in
 * the first sample's ddir field, which also fixes the per-entry size.
 */
609 static void flush_samples(FILE *f, void *samples, uint64_t sample_size)
613 uint64_t i, nr_samples;
/* Probe the first sample to learn the entry layout. */
618 s = __get_sample(samples, 0, 0);
619 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
621 nr_samples = sample_size / __log_entry_sz(log_offset);
623 for (i = 0; i < nr_samples; i++) {
624 s = __get_sample(samples, log_offset, i);
/* Plain sample: time, value, ddir, block size. */
627 fprintf(f, "%lu, %lu, %u, %u\n",
628 (unsigned long) s->time,
629 (unsigned long) s->val,
630 io_sample_ddir(s), s->bs);
/* Offset sample: same fields plus the I/O offset. */
632 struct io_sample_offset *so = (void *) s;
634 fprintf(f, "%lu, %lu, %u, %u, %llu\n",
635 (unsigned long) s->time,
636 (unsigned long) s->val,
637 io_sample_ddir(s), s->bs,
638 (unsigned long long) so->offset);
/* Work item handed to the compression helper thread. */
645 struct iolog_flush_data {
/* One compressed chunk of log data, linked on a chunk list. */
652 struct iolog_compress {
653 struct flist_head list;
/* Fixed buffer size (128 KiB) for each compressed chunk. */
659 #define GZ_CHUNK 131072
/*
 * Allocate a fresh chunk with a GZ_CHUNK-sized buffer; 'seq' orders
 * chunks so the inflate side can detect stream boundaries.
 */
661 static struct iolog_compress *get_new_chunk(unsigned int seq)
663 struct iolog_compress *c;
665 c = malloc(sizeof(*c));
666 INIT_FLIST_HEAD(&c->list);
667 c->buf = malloc(GZ_CHUNK);
/* Free a chunk and its buffer (body elided in this view). */
673 static void free_chunk(struct iolog_compress *ic)
/*
 * Initialize a zlib inflate stream. The window-bits value (computed in
 * elided lines) adds 32 when gz_hdr is set, enabling zlib's automatic
 * gzip/zlib header detection.
 */
679 static int z_stream_init(z_stream *stream, int gz_hdr)
683 stream->zalloc = Z_NULL;
684 stream->zfree = Z_NULL;
685 stream->opaque = Z_NULL;
686 stream->next_in = Z_NULL;
689 * zlib magic - add 32 for auto-detection of gz header or not,
690 * if we decide to store files in a gzip friendly format.
695 if (inflateInit2(stream, wbits) != Z_OK)
/* State carried across inflate_chunk() calls for one output stream. */
701 struct inflate_chunk_iter {
/*
 * End the current inflate stream, flush the accumulated decompressed
 * samples to 'f', and reset the iterator's buffer accounting.
 */
710 static void finish_chunk(z_stream *stream, FILE *f,
711 struct inflate_chunk_iter *iter)
715 ret = inflateEnd(stream);
717 log_err("fio: failed to end log inflation (%d)\n", ret);
719 flush_samples(f, iter->buf, iter->buf_used);
722 iter->buf_size = iter->buf_used = 0;
726 * Iterative chunk inflation. Handles cases where we cross into a new
727 * sequence, doing flush finish of previous chunk if needed.
729 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
730 z_stream *stream, struct inflate_chunk_iter *iter)
734 dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u",
735 (unsigned long) ic->len, ic->seq);
/* Sequence change => previous deflate stream ended; flush it and start
 * a fresh inflate stream for this chunk. */
737 if (ic->seq != iter->seq) {
739 finish_chunk(stream, f, iter);
741 z_stream_init(stream, gz_hdr);
745 stream->avail_in = ic->len;
746 stream->next_in = ic->buf;
/* Lazily allocate the output buffer at chunk_sz granularity. */
748 if (!iter->buf_size) {
749 iter->buf_size = iter->chunk_sz;
750 iter->buf = malloc(iter->buf_size);
753 while (stream->avail_in) {
754 size_t this_out = iter->buf_size - iter->buf_used;
757 stream->avail_out = this_out;
758 stream->next_out = iter->buf + iter->buf_used;
760 err = inflate(stream, Z_NO_FLUSH);
762 log_err("fio: failed inflating log: %d\n", err);
767 iter->buf_used += this_out - stream->avail_out;
/* Output buffer full: grow by another chunk_sz and continue. */
769 if (!stream->avail_out) {
770 iter->buf_size += iter->chunk_sz;
771 iter->buf = realloc(iter->buf, iter->buf_size);
775 if (err == Z_STREAM_END)
/* Return the number of input bytes consumed from this chunk. */
779 ret = (void *) stream->next_in - ic->buf;
781 dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) ret);
787 * Inflate stored compressed chunks, or write them directly to the log
788 * file if so instructed.
790 static int inflate_gz_chunks(struct io_log *log, FILE *f)
792 struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
795 while (!flist_empty(&log->chunk_list)) {
796 struct iolog_compress *ic;
798 ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
799 flist_del(&ic->list);
/* gz_store mode: keep data compressed, write chunks straight out. */
801 if (log->log_gz_store) {
804 dprint(FD_COMPRESS, "log write chunk size=%lu, "
805 "seq=%u\n", (unsigned long) ic->len, ic->seq);
807 ret = fwrite(ic->buf, ic->len, 1, f);
808 if (ret != 1 || ferror(f)) {
810 log_err("fio: error writing compressed log\n");
/* Otherwise decompress and emit text samples. */
813 inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
/* Flush whatever the final stream produced. */
819 finish_chunk(&stream, f, &iter);
827 * Open compressed log file and decompress the stored chunks and
828 * write them to stdout. The chunks are stored sequentially in the
829 * file, so we iterate over them and do them one-by-one.
831 int iolog_file_inflate(const char *file)
833 struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
834 struct iolog_compress ic;
842 f = fopen(file, "r");
/* Size the read buffer from the file's on-disk size. */
848 if (stat(file, &sb) < 0) {
854 ic.buf = buf = malloc(sb.st_size);
858 ret = fread(ic.buf, ic.len, 1, f);
863 } else if (ret != 1) {
864 log_err("fio: short read on reading log\n");
872 * Each chunk will return Z_STREAM_END. We don't know how many
873 * chunks are in the file, so we just keep looping and incrementing
874 * the sequence number until we have consumed the whole compressed
/* gz_hdr=1: stored files use a gzip-friendly header. */
881 ret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
894 finish_chunk(&stream, stdout, &iter);
/* No-zlib build stubs: chunk inflation is a no-op, file inflation errors. */
904 static int inflate_gz_chunks(struct io_log *log, FILE *f)
909 int iolog_file_inflate(const char *file)
911 log_err("fio: log inflation not possible without zlib\n");
/*
 * Write a log to its file: install a big stdio buffer, drain any
 * compressed chunks first, then flush the in-memory samples.
 */
917 void flush_log(struct io_log *log)
922 f = fopen(log->filename, "w");
928 buf = set_file_buffer(f);
930 inflate_gz_chunks(log, f);
932 flush_samples(f, log->log, log->nr_samples * log_entry_sz(log));
935 clear_file_buffer(buf);
/*
 * Finalize one log under the per-filename file lock. With trylock set, a
 * busy lock presumably defers the flush (return path elided). GUI clients
 * ship the log over the network instead of writing locally.
 */
938 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
944 if (fio_trylock_file(log->filename))
947 fio_lock_file(log->filename);
949 if (td->client_type == FIO_CLIENT_TYPE_GUI)
950 fio_send_iolog(td, log, log->filename);
954 fio_unlock_file(log->filename);
962 * Invoked from our compress helper thread, when logging would have exceeded
963 * the specified memory limitation. Compresses the previously stored
966 static int gz_work(struct tp_work *work)
968 struct iolog_flush_data *data;
969 struct iolog_compress *c;
970 struct flist_head list;
/* Chunks are built on a private list first, then spliced onto the log's
 * chunk list under its lock — keeps the lock hold time short. */
976 INIT_FLIST_HEAD(&list);
978 data = container_of(work, struct iolog_flush_data, work);
980 stream.zalloc = Z_NULL;
981 stream.zfree = Z_NULL;
982 stream.opaque = Z_NULL;
984 ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
986 log_err("fio: failed to init gz stream\n");
/* Each flush gets a new sequence number, marking one deflate stream. */
990 seq = ++data->log->chunk_seq;
992 stream.next_in = (void *) data->samples;
993 stream.avail_in = data->nr_samples * log_entry_sz(data->log);
995 dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u\n",
996 (unsigned long) stream.avail_in, seq);
/* Phase 1: deflate all input into GZ_CHUNK-sized chunks. */
998 c = get_new_chunk(seq);
999 stream.avail_out = GZ_CHUNK;
1000 stream.next_out = c->buf;
1001 ret = deflate(&stream, Z_NO_FLUSH);
1003 log_err("fio: deflate log (%d)\n", ret);
1008 c->len = GZ_CHUNK - stream.avail_out;
1009 flist_add_tail(&c->list, &list);
1011 } while (stream.avail_in);
/* Phase 2: Z_FINISH into the tail of the last chunk, growing with new
 * chunks until the stream ends. */
1013 stream.next_out = c->buf + c->len;
1014 stream.avail_out = GZ_CHUNK - c->len;
1016 ret = deflate(&stream, Z_FINISH);
1017 if (ret == Z_STREAM_END)
1018 c->len = GZ_CHUNK - stream.avail_out;
1021 c = get_new_chunk(seq);
1022 stream.avail_out = GZ_CHUNK;
1023 stream.next_out = c->buf;
1024 ret = deflate(&stream, Z_FINISH);
1025 c->len = GZ_CHUNK - stream.avail_out;
1027 flist_add_tail(&c->list, &list);
1028 } while (ret != Z_STREAM_END);
1031 dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1033 ret = deflateEnd(&stream);
1035 log_err("fio: deflateEnd %d\n", ret);
/* The sample copy was ours to free (made in iolog_flush). */
1037 free(data->samples);
1039 if (!flist_empty(&list)) {
1040 pthread_mutex_lock(&data->log->chunk_lock);
1041 flist_splice_tail(&list, &data->log->chunk_list);
1042 pthread_mutex_unlock(&data->log->chunk_lock);
/* Wake a synchronous waiter (iolog_flush with wait=1). */
1049 pthread_cond_signal(&work->cv);
/* Error path: discard any chunks built before the failure. */
1055 while (!flist_empty(&list)) {
1056 c = flist_first_entry(list.next, struct iolog_compress, list);
1057 flist_del(&c->list);
1065 * Queue work item to compress the existing log entries. We copy the
1066 * samples, and reset the log sample count to 0 (so the logging will
1067 * continue to use the memory associated with the log). If called with
1068 * wait == 1, will not return until the log compression has completed.
1070 int iolog_flush(struct io_log *log, int wait)
1072 struct tp_data *tdat = log->td->tp_data;
1073 struct iolog_flush_data *data;
1076 data = malloc(sizeof(*data));
/* Snapshot the samples; gz_work frees this copy when done. */
1082 sample_size = log->nr_samples * log_entry_sz(log);
1083 data->samples = malloc(sample_size);
1084 if (!data->samples) {
1089 memcpy(data->samples, log->log, sample_size);
1090 data->nr_samples = log->nr_samples;
1091 data->work.fn = gz_work;
/* Reset count so logging keeps reusing the existing buffer. */
1092 log->nr_samples = 0;
1095 pthread_mutex_init(&data->work.lock, NULL);
1096 pthread_cond_init(&data->work.cv, NULL);
1097 data->work.wait = 1;
1099 data->work.wait = 0;
1101 data->work.prio = 1;
1102 tp_queue_work(tdat, &data->work);
/* Synchronous path: block until gz_work signals completion. */
1105 pthread_mutex_lock(&data->work.lock);
1106 while (!data->work.done)
1107 pthread_cond_wait(&data->work.cv, &data->work.lock);
1108 pthread_mutex_unlock(&data->work.lock);
/* No-zlib build: stub (body elided in this view). */
1117 int iolog_flush(struct io_log *log, int wait)
/*
 * Per-log-type writers: each hands its thread log to finish_log() with
 * the trylock flag. NULL-log early returns are presumably in the elided
 * lines between each fetch and the finish_log call.
 */
1124 static int write_iops_log(struct thread_data *td, int try)
1126 struct io_log *log = td->iops_log;
1131 return finish_log(td, log, try);
1134 static int write_slat_log(struct thread_data *td, int try)
1136 struct io_log *log = td->slat_log;
1141 return finish_log(td, log, try);
1144 static int write_clat_log(struct thread_data *td, int try)
1146 struct io_log *log = td->clat_log;
1151 return finish_log(td, log, try);
1154 static int write_lat_log(struct thread_data *td, int try)
1156 struct io_log *log = td->lat_log;
1161 return finish_log(td, log, try);
1164 static int write_bandw_log(struct thread_data *td, int try)
1166 struct io_log *log = td->bw_log;
1171 return finish_log(td, log, try);
/* Dispatch table tying each log-type mask to its writer function. */
1186 int (*fn)(struct thread_data *, int);
1189 static struct log_type log_types[] = {
1191 .mask = BW_LOG_MASK,
1192 .fn = write_bandw_log,
1195 .mask = LAT_LOG_MASK,
1196 .fn = write_lat_log,
1199 .mask = SLAT_LOG_MASK,
1200 .fn = write_slat_log,
1203 .mask = CLAT_LOG_MASK,
1204 .fn = write_clat_log,
1207 .mask = IOPS_LOG_MASK,
1208 .fn = write_iops_log,
/*
 * Flush all per-thread logs at job end. Iterates the log_types table,
 * retrying writers whose trylock was busy (try is true until only one
 * log remains); bails out of the outer loop when a full pass makes no
 * progress. Runstate is bumped to TD_FINISHING for the duration.
 */
1212 void fio_writeout_logs(struct thread_data *td)
1214 unsigned int log_mask = 0;
1215 unsigned int log_left = ALL_LOG_NR;
1218 old_state = td_bump_runstate(td, TD_FINISHING);
1223 int prev_log_left = log_left;
1225 for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1226 struct log_type *lt = &log_types[i];
/* Skip logs already written on a previous pass. */
1229 if (!(log_mask & lt->mask)) {
/* try=0 (blocking) only for the final remaining log. */
1230 ret = lt->fn(td, log_left != 1);
1233 log_mask |= lt->mask;
/* No progress this pass: stop rather than spin. */
1238 if (prev_log_left == log_left)
1242 td_restore_runstate(td, old_state);