/*
 * Code related to writing an iolog of what a thread is doing, and to
 * later read that back and replay it.
 */
static int iolog_flush(struct io_log *log);

static const char iolog_ver2[] = "fio version 2 iolog";
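
/*
 * A version 2 iolog is plain text: the version string above on the first
 * line, then one entry per line. For example (hypothetical device name):
 *
 *	/dev/sdb add
 *	/dev/sdb open
 *	/dev/sdb write 0 4096
 *	/dev/sdb close
 *
 * File actions are add/open/close; I/O entries are
 * "<filename> <read|write|sync|datasync|trim|wait> <offset> <length>".
 */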
void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
{
	flist_add_tail(&ipo->list, &td->io_log_list);
	td->total_io_size += ipo->len;
}
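
/*
 * Append a single "<filename> <ddir> <offset> <length>" line for this io_u
 * to the write iolog, if one is configured.
 */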
void log_io_u(const struct thread_data *td, const struct io_u *io_u)
{
	if (!td->o.write_iolog_file)
		return;

	fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
						io_ddir_name(io_u->ddir),
						io_u->offset, io_u->buflen);
}
void log_file(struct thread_data *td, struct fio_file *f,
	      enum file_log_act what)
{
	const char *act[] = { "add", "open", "close" };

	assert(what < 3);

	if (!td->o.write_iolog_file)
		return;

	/*
	 * This happens on the pre-open/close done before the job starts.
	 */
	if (!td->iolog_f)
		return;

	fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
}
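
/*
 * Sleep for the delay recorded with a replay entry, compensating for time
 * already spent issuing I/O since td->last_issue. Over- and undershoot is
 * carried across calls in td->time_offset.
 */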
static void iolog_delay(struct thread_data *td, unsigned long delay)
{
	uint64_t usec = utime_since_now(&td->last_issue);
	uint64_t this_delay;
	struct timespec ts;

	if (delay < td->time_offset) {
		td->time_offset = 0;
		return;
	}

	delay -= td->time_offset;
	if (delay < usec)
		return;

	delay -= usec;

	fio_gettime(&ts, NULL);
	while (delay && !td->terminate) {
		this_delay = delay;
		if (this_delay > 500000)
			this_delay = 500000;

		usec_sleep(td, this_delay);
		delay -= this_delay;
	}

	usec = utime_since_now(&ts);
	if (usec > delay)
		td->time_offset = usec - delay;
	else
		td->time_offset = 0;
}
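
/*
 * Handle the special DDIR_INVAL entries that carry file actions rather
 * than I/O. Returns 0 if the entry is regular I/O, 1 if a file action was
 * consumed here, and -1 on error.
 */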
static int ipo_special(struct thread_data *td, struct io_piece *ipo)
{
	struct fio_file *f;
	int ret;

	/*
	 * Not a special ipo
	 */
	if (ipo->ddir != DDIR_INVAL)
		return 0;

	f = td->files[ipo->fileno];

	switch (ipo->file_action) {
	case FIO_LOG_OPEN_FILE:
		if (td->o.replay_redirect && fio_file_open(f)) {
			dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
					f->file_name);
			break;
		}
		ret = td_io_open_file(td, f);
		if (!ret)
			break;
		td_verror(td, ret, "iolog open file");
		return -1;
	case FIO_LOG_CLOSE_FILE:
		td_io_close_file(td, f);
		break;
	case FIO_LOG_UNLINK_FILE:
		td_io_unlink_file(td, f);
		break;
	default:
		log_err("fio: bad file action %d\n", ipo->file_action);
		break;
	}

	return 1;
}
int read_iolog_get(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo;
	unsigned long elapsed;

	while (!flist_empty(&td->io_log_list)) {
		int ret;

		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);

		ret = ipo_special(td, ipo);
		if (ret < 0) {
			free(ipo);
			break;
		} else if (ret > 0) {
			free(ipo);
			continue;
		}

		io_u->ddir = ipo->ddir;
		if (ipo->ddir != DDIR_WAIT) {
			io_u->offset = ipo->offset;
			io_u->buflen = ipo->len;
			io_u->file = td->files[ipo->fileno];
			get_file(io_u->file);
			dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
						io_u->buflen, io_u->file->file_name);
			if (ipo->delay)
				iolog_delay(td, ipo->delay);
		} else {
			elapsed = mtime_since_genesis();
			if (ipo->delay > elapsed)
				usec_sleep(td, (ipo->delay - elapsed) * 1000);
		}

		free(ipo);

		if (io_u->ddir != DDIR_WAIT)
			return 0;
	}

	td->done = 1;
	return 1;
}
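
/*
 * Drop whatever is left on the verification history: both the rb tree
 * used for sorted entries and the plain list.
 */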
void prune_io_piece_log(struct thread_data *td)
{
	struct io_piece *ipo;
	struct rb_node *n;

	while ((n = rb_first(&td->io_hist_tree)) != NULL) {
		ipo = rb_entry(n, struct io_piece, rb_node);
		rb_erase(n, &td->io_hist_tree);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}

	while (!flist_empty(&td->io_hist_list)) {
		ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}
}
/*
 * Log a successful write, so we can unwind the log for verify.
 */
void log_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct rb_node **p, *parent;
	struct io_piece *ipo, *__ipo;

	ipo = malloc(sizeof(struct io_piece));
	init_ipo(ipo);
	ipo->file = io_u->file;
	ipo->offset = io_u->offset;
	ipo->len = io_u->buflen;
	ipo->numberio = io_u->numberio;
	ipo->flags = IP_F_IN_FLIGHT;

	io_u->ipo = ipo;

	if (io_u_should_trim(td, io_u)) {
		flist_add_tail(&ipo->trim_list, &td->trim_list);
		td->trim_entries++;
	}

	/*
	 * We don't need to sort the entries if we only performed sequential
	 * writes. In this case, just reading back data in the order we wrote
	 * it out is faster, but still safe.
	 *
	 * One exception is if we don't have a random map, in which case we
	 * need to check for duplicate blocks and drop the old one, which we
	 * rely on the rb insert/lookup to handle.
	 */
	if (((!td->o.verifysort) || !td_random(td)) &&
	      file_randommap(td, ipo->file)) {
		INIT_FLIST_HEAD(&ipo->list);
		flist_add_tail(&ipo->list, &td->io_hist_list);
		ipo->flags |= IP_F_ONLIST;
		td->io_hist_len++;
		return;
	}

	RB_CLEAR_NODE(&ipo->rb_node);

	/*
	 * Sort the entry into the verification list.
	 */
restart:
	p = &td->io_hist_tree.rb_node;
	parent = NULL;
	while (*p) {
		int overlap = 0;
		parent = *p;

		__ipo = rb_entry(parent, struct io_piece, rb_node);
		if (ipo->file < __ipo->file)
			p = &(*p)->rb_left;
		else if (ipo->file > __ipo->file)
			p = &(*p)->rb_right;
		else if (ipo->offset < __ipo->offset) {
			p = &(*p)->rb_left;
			overlap = ipo->offset + ipo->len > __ipo->offset;
		} else if (ipo->offset > __ipo->offset) {
			p = &(*p)->rb_right;
			overlap = __ipo->offset + __ipo->len > ipo->offset;
		} else
			overlap = 1;

		if (overlap) {
			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
				__ipo->offset, __ipo->len,
				ipo->offset, ipo->len);
			td->io_hist_len--;
			rb_erase(parent, &td->io_hist_tree);
			remove_trim_entry(td, __ipo);
			if (!(__ipo->flags & IP_F_IN_FLIGHT))
				free(__ipo);
			goto restart;
		}
	}

	rb_link_node(&ipo->rb_node, parent, p);
	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
	ipo->flags |= IP_F_ONRB;
	td->io_hist_len++;
}
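
/*
 * Remove a previously logged piece again, e.g. when the I/O didn't
 * complete successfully; also record trim/write failure state when block
 * info tracking is enabled.
 */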
void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (td->ts.nr_block_infos) {
		uint32_t *info = io_u_block_info(td, io_u);
		if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
			if (io_u->ddir == DDIR_TRIM)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_TRIM_FAILURE);
			else if (io_u->ddir == DDIR_WRITE)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_WRITE_FAILURE);
		}
	}

	if (!ipo)
		return;

	if (ipo->flags & IP_F_ONRB)
		rb_erase(&ipo->rb_node, &td->io_hist_tree);
	else if (ipo->flags & IP_F_ONLIST)
		flist_del(&ipo->list);

	free(ipo);
	io_u->ipo = NULL;
	td->io_hist_len--;
}
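
/*
 * A short transfer happened: shrink the logged piece to the number of
 * bytes that were actually transferred.
 */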
void trim_io_piece(struct thread_data *td, const struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (!ipo)
		return;

	ipo->len = io_u->xfer_buflen - io_u->resid;
}
void write_iolog_close(struct thread_data *td)
{
	fflush(td->iolog_f);
	fclose(td->iolog_f);
	free(td->iolog_buf);
	td->iolog_f = NULL;
	td->iolog_buf = NULL;
}
/*
 * Read version 2 iolog data. It is enhanced to include per-file logging,
 * syncs, etc.
 */
static int read_iolog2(struct thread_data *td, FILE *f)
{
	unsigned long long offset;
	unsigned int bytes;
	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
	char *rfname, *fname, *act;
	char *str, *p;
	enum fio_ddir rw;

	free_release_files(td);

	/*
	 * Read in the read iolog and store it, reusing the infrastructure
	 * for doing verifications.
	 */
	str = malloc(4096);
	rfname = fname = malloc(256+16);
	act = malloc(256+16);

	reads = writes = waits = 0;
	while ((p = fgets(str, 4096, f)) != NULL) {
		struct io_piece *ipo;
		int r;

		r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
									&bytes);

		if (td->o.replay_redirect)
			fname = td->o.replay_redirect;

		if (r == 4) {
			/*
			 * Check action first
			 */
			if (!strcmp(act, "wait"))
				rw = DDIR_WAIT;
			else if (!strcmp(act, "read"))
				rw = DDIR_READ;
			else if (!strcmp(act, "write"))
				rw = DDIR_WRITE;
			else if (!strcmp(act, "sync"))
				rw = DDIR_SYNC;
			else if (!strcmp(act, "datasync"))
				rw = DDIR_DATASYNC;
			else if (!strcmp(act, "trim"))
				rw = DDIR_TRIM;
			else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
			fileno = get_fileno(td, fname);
		} else if (r == 2) {
			rw = DDIR_INVAL;
			if (!strcmp(act, "add")) {
				if (td->o.replay_redirect &&
				    get_fileno(td, fname) != -1) {
					dprint(FD_FILE, "iolog: ignoring"
						" re-add of file %s\n", fname);
				} else {
					fileno = add_file(td, fname, 0, 1);
					file_action = FIO_LOG_ADD_FILE;
				}
				continue;
			} else if (!strcmp(act, "open")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_OPEN_FILE;
			} else if (!strcmp(act, "close")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_CLOSE_FILE;
			} else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
		} else {
			log_err("bad iolog2: %s\n", p);
			continue;
		}

		if (rw == DDIR_READ)
			reads++;
		else if (rw == DDIR_WRITE) {
			/*
			 * Don't add a write for ro mode
			 */
			if (read_only)
				continue;
			writes++;
		} else if (rw == DDIR_WAIT) {
			if (td->o.no_stall)
				continue;
			waits++;
		} else if (rw == DDIR_INVAL) {
		} else if (!ddir_sync(rw)) {
			log_err("bad ddir: %d\n", rw);
			continue;
		}

		/*
		 * Make note of file
		 */
		ipo = malloc(sizeof(*ipo));
		init_ipo(ipo);
		ipo->ddir = rw;
		if (rw == DDIR_WAIT) {
			ipo->delay = offset;
		} else {
			if (td->o.replay_scale)
				ipo->offset = offset / td->o.replay_scale;
			else
				ipo->offset = offset;
			ipo_bytes_align(td->o.replay_align, ipo);

			ipo->len = bytes;
			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
				td->o.max_bs[rw] = bytes;
			ipo->fileno = fileno;
			ipo->file_action = file_action;
			td->o.size += bytes;
		}

		queue_io_piece(td, ipo);
	}

	free(str);
	free(act);
	free(rfname);

	if (writes && read_only) {
		log_err("fio: <%s> skips replay of %d writes due to"
			" read-only\n", td->o.name, writes);
		writes = 0;
	}

	if (!reads && !writes && !waits)
		return 1;
	else if (reads && !writes)
		td->o.td_ddir = TD_DDIR_READ;
	else if (!reads && writes)
		td->o.td_ddir = TD_DDIR_WRITE;
	else
		td->o.td_ddir = TD_DDIR_RW;

	return 0;
}
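
/*
 * Logs in this format are produced with the write_iolog= option and
 * consumed with read_iolog=, e.g. (hypothetical file name):
 *
 *	fio --name=capture --write_iolog=trace.log ...
 *	fio --name=replay --read_iolog=trace.log
 */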
/*
 * Open the iolog, check its version, and call the appropriate parser.
 */
static int init_iolog_read(struct thread_data *td)
{
	char buffer[256], *p;
	FILE *f;
	int ret;

	f = fopen(td->o.read_iolog_file, "r");
	if (!f) {
		perror("fopen read iolog");
		return 1;
	}

	p = fgets(buffer, sizeof(buffer), f);
	if (!p) {
		td_verror(td, errno, "iolog read");
		log_err("fio: unable to read iolog\n");
		fclose(f);
		return 1;
	}

	/*
	 * Version 2 of the iolog stores a specific string as the
	 * first line, check for that.
	 */
	if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
		ret = read_iolog2(td, f);
	else {
		log_err("fio: iolog version 1 is no longer supported\n");
		ret = 1;
	}

	fclose(f);
	return ret;
}
/*
 * Set up a log for storing io patterns.
 */
static int init_iolog_write(struct thread_data *td)
{
	struct fio_file *ff;
	FILE *f;
	unsigned int i;

	f = fopen(td->o.write_iolog_file, "a");
	if (!f) {
		perror("fopen write iolog");
		return 1;
	}

	/*
	 * That's it for writing; set up a log buffer and we're done.
	 */
	td->iolog_f = f;
	td->iolog_buf = malloc(8192);
	setvbuf(f, td->iolog_buf, _IOFBF, 8192);

	/*
	 * write our version line
	 */
	if (fprintf(f, "%s\n", iolog_ver2) < 0) {
		perror("iolog init");
		return 1;
	}

	/*
	 * add all known files
	 */
	for_each_file(td, ff, i)
		log_file(td, ff, FIO_LOG_ADD_FILE);

	return 0;
}
int init_iolog(struct thread_data *td)
{
	int ret = 0;

	if (td->o.read_iolog_file) {
		int need_swap;

		/*
		 * Check if it's a blktrace file and load that if possible.
		 * Otherwise assume it's a normal log file and load that.
		 */
		if (is_blktrace(td->o.read_iolog_file, &need_swap))
			ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
		else
			ret = init_iolog_read(td);
	} else if (td->o.write_iolog_file)
		ret = init_iolog_write(td);

	if (ret)
		td_verror(td, EINVAL, "failed initializing iolog");

	return ret;
}
void setup_log(struct io_log **log, struct log_params *p,
	       const char *filename)
{
	struct io_log *l;
	int i;
	struct io_u_plat_entry *entry;
	struct flist_head *list;

	l = scalloc(1, sizeof(*l));
	INIT_FLIST_HEAD(&l->io_logs);
	l->log_type = p->log_type;
	l->log_offset = p->log_offset;
	l->log_gz = p->log_gz;
	l->log_gz_store = p->log_gz_store;
	l->avg_msec = p->avg_msec;
	l->hist_msec = p->hist_msec;
	l->hist_coarseness = p->hist_coarseness;
	l->filename = strdup(filename);
	l->td = p->td;

	/*
	 * Initialize histogram lists for each r/w direction,
	 * with initial io_u_plat of all zeros:
	 */
	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
		list = &l->hist_window[i].list;
		INIT_FLIST_HEAD(list);
		entry = calloc(1, sizeof(struct io_u_plat_entry));
		flist_add(&entry->list, list);
	}

	if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
		struct io_logs *p;

		p = calloc(1, sizeof(*l->pending));
		p->max_samples = DEF_LOG_ENTRIES;
		p->log = calloc(p->max_samples, log_entry_sz(l));
		l->pending = p;
	}

	if (l->log_offset)
		l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;

	INIT_FLIST_HEAD(&l->chunk_list);

	if (l->log_gz && !p->td)
		l->log_gz = 0;
	else if (l->log_gz || l->log_gz_store) {
		mutex_init_pshared(&l->chunk_lock);
		mutex_init_pshared(&l->deferred_free_lock);
		p->td->flags |= TD_F_COMPRESS_LOG;
	}

	*log = l;
}
#ifdef CONFIG_SETVBUF
static void *set_file_buffer(FILE *f)
{
	size_t size = 1048576;
	void *buf;

	buf = malloc(size);
	setvbuf(f, buf, _IOFBF, size);
	return buf;
}

static void clear_file_buffer(void *buf)
{
	free(buf);
}
#else
static void *set_file_buffer(FILE *f)
{
	return NULL;
}

static void clear_file_buffer(void *buf)
{
}
#endif
void free_log(struct io_log *log)
{
	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);
		free(cur_log->log);
		sfree(cur_log);
	}

	if (log->pending) {
		free(log->pending->log);
		free(log->pending);
		log->pending = NULL;
	}

	free(log->filename);
	sfree(log);
}
unsigned long hist_sum(int j, int stride, unsigned int *io_u_plat,
		       unsigned int *io_u_plat_last)
{
	unsigned long sum;
	int k;

	if (io_u_plat_last) {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k] - io_u_plat_last[j + k];
	} else {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k];
	}

	return sum;
}
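
/*
 * Histogram samples are written as "time, ddir, bs" followed by
 * FIO_IO_U_PLAT_NR / stride bucket sums, where stride is
 * 1 << hist_coarseness, i.e. coarser logs merge adjacent buckets.
 */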
static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
			       uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, j, nr_samples;
	struct io_u_plat_entry *entry, *entry_before;
	unsigned int *io_u_plat;
	unsigned int *io_u_plat_before;

	int stride = 1 << hist_coarseness;

	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		entry = s->data.plat_entry;
		io_u_plat = entry->io_u_plat;

		entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
		io_u_plat_before = entry_before->io_u_plat;

		fprintf(f, "%lu, %u, %u, ", (unsigned long) s->time,
						io_sample_ddir(s), s->bs);
		for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
			fprintf(f, "%lu, ", hist_sum(j, stride, io_u_plat,
						io_u_plat_before));
		}
		fprintf(f, "%lu\n", (unsigned long)
			hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
					io_u_plat_before));

		flist_del(&entry_before->list);
		free(entry_before);
	}
}
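
/*
 * Regular samples are written as "time, value, ddir, bs", with a trailing
 * ", offset" column when offset logging is enabled for this log.
 */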
void flush_samples(FILE *f, void *samples, uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, nr_samples;

	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		if (!log_offset) {
			fprintf(f, "%lu, %" PRId64 ", %u, %u\n",
					(unsigned long) s->time,
					s->data.val,
					io_sample_ddir(s), s->bs);
		} else {
			struct io_sample_offset *so = (void *) s;

			fprintf(f, "%lu, %" PRId64 ", %u, %u, %llu\n",
					(unsigned long) s->time,
					s->data.val,
					io_sample_ddir(s), s->bs,
					(unsigned long long) so->offset);
		}
	}
}
#ifdef CONFIG_ZLIB

struct iolog_flush_data {
	struct workqueue_work work;
	struct io_log *log;
	void *samples;
	uint32_t nr_samples;
	bool free;
};

#define GZ_CHUNK	131072
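
/*
 * Compressed output accumulates in GZ_CHUNK (128 KiB) buffers. Each flush
 * gets its own sequence number, so inflate_chunk() can tell where one
 * deflate stream ends and the next begins.
 */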
static struct iolog_compress *get_new_chunk(unsigned int seq)
{
	struct iolog_compress *c;

	c = malloc(sizeof(*c));
	INIT_FLIST_HEAD(&c->list);
	c->buf = malloc(GZ_CHUNK);
	c->len = 0;
	c->seq = seq;
	return c;
}

static void free_chunk(struct iolog_compress *ic)
{
	free(ic->buf);
	free(ic);
}
static int z_stream_init(z_stream *stream, int gz_hdr)
{
	int wbits = 15;

	memset(stream, 0, sizeof(*stream));
	stream->zalloc = Z_NULL;
	stream->zfree = Z_NULL;
	stream->opaque = Z_NULL;
	stream->next_in = Z_NULL;

	/*
	 * zlib magic - add 32 for auto-detection of gz header or not,
	 * if we decide to store files in a gzip-friendly format.
	 */
	if (gz_hdr)
		wbits += 32;

	if (inflateInit2(stream, wbits) != Z_OK)
		return 1;

	return 0;
}
struct inflate_chunk_iter {
	unsigned int seq;
	int err;
	void *buf;
	size_t buf_size;
	size_t buf_used;
	size_t chunk_sz;
};

static void finish_chunk(z_stream *stream, FILE *f,
			 struct inflate_chunk_iter *iter)
{
	int ret;

	ret = inflateEnd(stream);
	if (ret != Z_OK)
		log_err("fio: failed to end log inflation seq %d (%d)\n",
				iter->seq, ret);

	flush_samples(f, iter->buf, iter->buf_used);
	free(iter->buf);
	iter->buf = NULL;
	iter->buf_size = iter->buf_used = 0;
}
/*
 * Iterative chunk inflation. Handles cases where we cross into a new
 * sequence, doing a flush/finish of the previous chunk if needed.
 */
static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
			    z_stream *stream, struct inflate_chunk_iter *iter)
{
	size_t ret;

	dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
				(unsigned long) ic->len, ic->seq);

	if (ic->seq != iter->seq) {
		if (iter->seq)
			finish_chunk(stream, f, iter);

		z_stream_init(stream, gz_hdr);
		iter->seq = ic->seq;
	}

	stream->avail_in = ic->len;
	stream->next_in = ic->buf;

	if (!iter->buf_size) {
		iter->buf_size = iter->chunk_sz;
		iter->buf = malloc(iter->buf_size);
	}

	while (stream->avail_in) {
		size_t this_out = iter->buf_size - iter->buf_used;
		int err;

		stream->avail_out = this_out;
		stream->next_out = iter->buf + iter->buf_used;

		err = inflate(stream, Z_NO_FLUSH);
		if (err < 0) {
			log_err("fio: failed inflating log: %d\n", err);
			iter->err = err;
			break;
		}

		iter->buf_used += this_out - stream->avail_out;

		if (!stream->avail_out) {
			iter->buf_size += iter->chunk_sz;
			iter->buf = realloc(iter->buf, iter->buf_size);
			continue;
		}

		if (err == Z_STREAM_END)
			break;
	}

	ret = (void *) stream->next_in - ic->buf;

	dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);

	return ret;
}
/*
 * Inflate stored compressed chunks, or write them directly to the log
 * file if so instructed.
 */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
	z_stream stream;

	while (!flist_empty(&log->chunk_list)) {
		struct iolog_compress *ic;

		ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
		flist_del(&ic->list);

		if (log->log_gz_store) {
			size_t ret;

			dprint(FD_COMPRESS, "log write chunk size=%lu, "
				"seq=%u\n", (unsigned long) ic->len, ic->seq);

			ret = fwrite(ic->buf, ic->len, 1, f);
			if (ret != 1 || ferror(f)) {
				iter.err = errno;
				log_err("fio: error writing compressed log\n");
			}
		} else
			inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);

		free_chunk(ic);
	}

	if (iter.seq) {
		finish_chunk(&stream, f, &iter);
		free(iter.buf);
	}

	return iter.err;
}
/*
 * Open the compressed log file, decompress the stored chunks, and write
 * them to stdout. The chunks are stored sequentially in the file, so we
 * iterate over them and handle them one-by-one.
 */
int iolog_file_inflate(const char *file)
{
	struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
	struct iolog_compress ic;
	z_stream stream;
	struct stat sb;
	size_t ret, total;
	void *buf;
	FILE *f;

	f = fopen(file, "r");
	if (!f) {
		perror("fopen");
		return 1;
	}

	if (stat(file, &sb) < 0) {
		fclose(f);
		perror("stat");
		return 1;
	}

	ic.buf = buf = malloc(sb.st_size);
	ic.len = sb.st_size;
	ic.seq = 1;

	ret = fread(ic.buf, ic.len, 1, f);
	if (ret == 0 && ferror(f)) {
		perror("fread");
		fclose(f);
		free(buf);
		return 1;
	} else if (ret != 1) {
		log_err("fio: short read on reading log\n");
		fclose(f);
		free(buf);
		return 1;
	}

	fclose(f);

	/*
	 * Each chunk will return Z_STREAM_END. We don't know how many
	 * chunks are in the file, so we just keep looping and incrementing
	 * the sequence number until we have consumed the whole compressed
	 * file.
	 */
	total = ic.len;
	do {
		size_t iret;

		iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
		total -= iret;
		if (!total)
			break;
		if (iter.err)
			break;

		ic.seq++;
		ic.len -= iret;
		ic.buf += iret;
	} while (1);

	if (iter.seq) {
		finish_chunk(&stream, stdout, &iter);
		free(iter.buf);
	}

	free(buf);
	return iter.err;
}
#else

static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	return 0;
}

int iolog_file_inflate(const char *file)
{
	log_err("fio: log inflation not possible without zlib\n");
	return 1;
}

#endif
void flush_log(struct io_log *log, bool do_append)
{
	void *buf;
	FILE *f;

	if (!do_append)
		f = fopen(log->filename, "w");
	else
		f = fopen(log->filename, "a");
	if (!f) {
		perror("fopen log");
		return;
	}

	buf = set_file_buffer(f);

	inflate_gz_chunks(log, f);

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		if (log->td && log == log->td->clat_hist_log)
			flush_hist_samples(f, log->hist_coarseness, cur_log->log,
					   log_sample_sz(log, cur_log));
		else
			flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));

		sfree(cur_log);
	}

	fclose(f);
	clear_file_buffer(buf);
}
static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
{
	if (td->flags & TD_F_COMPRESS_LOG)
		iolog_flush(log);

	if (trylock) {
		if (fio_trylock_file(log->filename))
			return 1;
	} else
		fio_lock_file(log->filename);

	if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
		fio_send_iolog(td, log, log->filename);
	else
		flush_log(log, !td->o.per_job_logs);

	fio_unlock_file(log->filename);
	free_log(log);
	return 0;
}
size_t log_chunk_sizes(struct io_log *log)
{
	struct flist_head *entry;
	size_t ret;

	if (flist_empty(&log->chunk_list))
		return 0;

	ret = 0;
	pthread_mutex_lock(&log->chunk_lock);
	flist_for_each(entry, &log->chunk_list) {
		struct iolog_compress *c;

		c = flist_entry(entry, struct iolog_compress, list);
		ret += c->len;
	}
	pthread_mutex_unlock(&log->chunk_lock);
	return ret;
}
#ifdef CONFIG_ZLIB

static bool warned_on_drop;

static void iolog_put_deferred(struct io_log *log, void *ptr)
{
	if (!ptr)
		return;

	pthread_mutex_lock(&log->deferred_free_lock);
	if (log->deferred < IOLOG_MAX_DEFER) {
		log->deferred_items[log->deferred] = ptr;
		log->deferred++;
	} else if (!warned_on_drop) {
		log_err("fio: had to drop log entry free\n");
		warned_on_drop = true;
	}
	pthread_mutex_unlock(&log->deferred_free_lock);
}
static void iolog_free_deferred(struct io_log *log)
{
	int i;

	if (!log->deferred)
		return;

	pthread_mutex_lock(&log->deferred_free_lock);

	for (i = 0; i < log->deferred; i++) {
		free(log->deferred_items[i]);
		log->deferred_items[i] = NULL;
	}

	log->deferred = 0;
	pthread_mutex_unlock(&log->deferred_free_lock);
}
static int gz_work(struct iolog_flush_data *data)
{
	struct iolog_compress *c = NULL;
	struct flist_head list;
	unsigned int seq;
	z_stream stream;
	size_t total = 0;
	int ret;

	INIT_FLIST_HEAD(&list);

	memset(&stream, 0, sizeof(stream));
	stream.zalloc = Z_NULL;
	stream.zfree = Z_NULL;
	stream.opaque = Z_NULL;

	ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
	if (ret != Z_OK) {
		log_err("fio: failed to init gz stream\n");
		goto err;
	}

	seq = ++data->log->chunk_seq;

	stream.next_in = (void *) data->samples;
	stream.avail_in = data->nr_samples * log_entry_sz(data->log);

	dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
				(unsigned long) stream.avail_in, seq,
				data->log->filename);
	do {
		if (c)
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		c = get_new_chunk(seq);
		stream.avail_out = GZ_CHUNK;
		stream.next_out = c->buf;
		ret = deflate(&stream, Z_NO_FLUSH);
		if (ret < 0) {
			log_err("fio: deflate log (%d)\n", ret);
			free_chunk(c);
			goto err;
		}

		c->len = GZ_CHUNK - stream.avail_out;
		flist_add_tail(&c->list, &list);
		total += c->len;
	} while (stream.avail_in);

	stream.next_out = c->buf + c->len;
	stream.avail_out = GZ_CHUNK - c->len;

	ret = deflate(&stream, Z_FINISH);
	if (ret < 0) {
		/*
		 * Z_BUF_ERROR is special, it just means we need more
		 * output space. We'll handle that below. Treat any other
		 * error as fatal.
		 */
		if (ret != Z_BUF_ERROR) {
			log_err("fio: deflate log (%d)\n", ret);
			flist_del(&c->list);
			free_chunk(c);
			goto err;
		}
	}

	total -= c->len;
	c->len = GZ_CHUNK - stream.avail_out;
	total += c->len;
	dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);

	if (ret != Z_STREAM_END) {
		do {
			c = get_new_chunk(seq);
			stream.avail_out = GZ_CHUNK;
			stream.next_out = c->buf;
			ret = deflate(&stream, Z_FINISH);
			c->len = GZ_CHUNK - stream.avail_out;
			total += c->len;
			flist_add_tail(&c->list, &list);
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		} while (ret != Z_STREAM_END);
	}

	dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);

	ret = deflateEnd(&stream);
	if (ret != Z_OK)
		log_err("fio: deflateEnd %d\n", ret);

	iolog_put_deferred(data->log, data->samples);

	if (!flist_empty(&list)) {
		pthread_mutex_lock(&data->log->chunk_lock);
		flist_splice_tail(&list, &data->log->chunk_list);
		pthread_mutex_unlock(&data->log->chunk_lock);
	}

	ret = 0;
done:
	if (data->free)
		sfree(data);
	return ret;
err:
	while (!flist_empty(&list)) {
		c = flist_first_entry(list.next, struct iolog_compress, list);
		flist_del(&c->list);
		free_chunk(c);
	}
	ret = 1;
	goto done;
}
/*
 * Invoked from our compress helper thread, when logging would have exceeded
 * the specified memory limitation. Compresses the previously stored
 * entries.
 */
static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
{
	return gz_work(container_of(work, struct iolog_flush_data, work));
}
static int gz_init_worker(struct submit_worker *sw)
{
	struct thread_data *td = sw->wq->td;

	if (!fio_option_is_set(&td->o, log_gz_cpumask))
		return 0;

	if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
		log_err("gz: failed to set CPU affinity\n");
		return 1;
	}

	return 0;
}
static struct workqueue_ops log_compress_wq_ops = {
	.fn		= gz_work_async,
	.init_worker_fn	= gz_init_worker,
};
int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return 0;

	workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return;

	workqueue_exit(&td->log_compress_wq);
}
/*
 * Queue a work item to compress the existing log entries. We reset the
 * current log to a small size, and reference the existing log in the
 * data that we queue for compression. Once compression has been done,
 * this old log is freed.
 */
static int iolog_flush(struct io_log *log)
{
	struct iolog_flush_data *data;

	data = malloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;
	data->free = false;

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		data->samples = cur_log->log;
		data->nr_samples = cur_log->nr_samples;

		sfree(cur_log);

		gz_work(data);
	}

	free(data);
	return 0;
}
int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	struct iolog_flush_data *data;

	data = smalloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;
	data->free = true;

	data->samples = cur_log->log;
	data->nr_samples = cur_log->nr_samples;

	cur_log->nr_samples = cur_log->max_samples = 0;
	cur_log->log = NULL;

	workqueue_enqueue(&log->td->log_compress_wq, &data->work);

	iolog_free_deferred(log);

	return 0;
}
#else

static int iolog_flush(struct io_log *log)
{
	return 1;
}

int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	return 1;
}

int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
}

#endif
struct io_logs *iolog_cur_log(struct io_log *log)
{
	if (flist_empty(&log->io_logs))
		return NULL;

	return flist_last_entry(&log->io_logs, struct io_logs, list);
}
uint64_t iolog_nr_samples(struct io_log *iolog)
{
	struct flist_head *entry;
	uint64_t ret = 0;

	flist_for_each(entry, &iolog->io_logs) {
		struct io_logs *cur_log;

		cur_log = flist_entry(entry, struct io_logs, list);
		ret += cur_log->nr_samples;
	}

	return ret;
}
static int __write_log(struct thread_data *td, struct io_log *log, int try)
{
	if (log)
		return finish_log(td, log, try);

	return 0;
}
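
/*
 * The per-type writers below flush one log each and clear the pointer on
 * success, so that td_writeout_logs() can retry just the ones that failed.
 */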
static int write_iops_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->iops_log) != unit_log)
		return 0;

	ret = __write_log(td, td->iops_log, try);
	if (!ret)
		td->iops_log = NULL;

	return ret;
}

static int write_slat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->slat_log, try);
	if (!ret)
		td->slat_log = NULL;

	return ret;
}

static int write_clat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_log, try);
	if (!ret)
		td->clat_log = NULL;

	return ret;
}

static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_hist_log, try);
	if (!ret)
		td->clat_hist_log = NULL;

	return ret;
}

static int write_lat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->lat_log, try);
	if (!ret)
		td->lat_log = NULL;

	return ret;
}

static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->bw_log) != unit_log)
		return 0;

	ret = __write_log(td, td->bw_log, try);
	if (!ret)
		td->bw_log = NULL;

	return ret;
}
enum {
	BW_LOG_MASK	= 1,
	LAT_LOG_MASK	= 2,
	SLAT_LOG_MASK	= 4,
	CLAT_LOG_MASK	= 8,
	IOPS_LOG_MASK	= 16,
	CLAT_HIST_LOG_MASK = 32,

	ALL_LOG_NR	= 6,
};

struct log_type {
	unsigned int mask;
	int (*fn)(struct thread_data *, int, bool);
};
static struct log_type log_types[] = {
	{
		.mask	= BW_LOG_MASK,
		.fn	= write_bandw_log,
	},
	{
		.mask	= LAT_LOG_MASK,
		.fn	= write_lat_log,
	},
	{
		.mask	= SLAT_LOG_MASK,
		.fn	= write_slat_log,
	},
	{
		.mask	= CLAT_LOG_MASK,
		.fn	= write_clat_log,
	},
	{
		.mask	= IOPS_LOG_MASK,
		.fn	= write_iops_log,
	},
	{
		.mask	= CLAT_HIST_LOG_MASK,
		.fn	= write_clat_hist_log,
	},
};
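
/*
 * Flush all log types for this thread. All but the last remaining log are
 * written with trylock semantics (fn is called with try != 0), so a log
 * that is locked elsewhere is simply retried on the next pass.
 */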
void td_writeout_logs(struct thread_data *td, bool unit_logs)
{
	unsigned int log_mask = 0;
	unsigned int log_left = ALL_LOG_NR;
	int old_state, i;

	old_state = td_bump_runstate(td, TD_FINISHING);

	finalize_logs(td, unit_logs);

	while (log_left) {
		int prev_log_left = log_left;

		for (i = 0; i < ALL_LOG_NR && log_left; i++) {
			struct log_type *lt = &log_types[i];
			int ret;

			if (!(log_mask & lt->mask)) {
				ret = lt->fn(td, log_left != 1, unit_logs);
				if (!ret) {
					log_left--;
					log_mask |= lt->mask;
				}
			}
		}

		if (prev_log_left == log_left)
			usleep(5000);
	}

	td_restore_runstate(td, old_state);
}
void fio_writeout_logs(bool unit_logs)
{
	struct thread_data *td;
	int i;

	for_each_td(td, i)
		td_writeout_logs(td, unit_logs);
}