/*
 * Code related to writing an iolog of what a thread is doing, and to
 * later read that back and replay
 */
static int iolog_flush(struct io_log *log);

static const char iolog_ver2[] = "fio version 2 iolog";
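
/*
 * For illustration, a version 2 iolog is plain text (device name below is
 * hypothetical): the version string above is the first line, followed by
 * one line per file action or I/O, in the formats written by log_file()
 * and log_io_u() further down:
 *
 *	fio version 2 iolog
 *	/dev/sdb add
 *	/dev/sdb open
 *	/dev/sdb write 0 4096
 *	/dev/sdb read 4096 4096
 *	/dev/sdb close
 */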
void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
{
	flist_add_tail(&ipo->list, &td->io_log_list);
	td->total_io_size += ipo->len;
}
void log_io_u(const struct thread_data *td, const struct io_u *io_u)
{
	if (!td->o.write_iolog_file)
		return;

	fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
						io_ddir_name(io_u->ddir),
						io_u->offset, io_u->buflen);
}
void log_file(struct thread_data *td, struct fio_file *f,
	      enum file_log_act what)
{
	const char *act[] = { "add", "open", "close" };

	if (!td->o.write_iolog_file)
		return;

	/*
	 * this happens on the pre-open/close done before the job starts
	 */
	if (!td->iolog_f)
		return;

	fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
}
static void iolog_delay(struct thread_data *td, unsigned long delay)
{
	uint64_t usec = utime_since_now(&td->last_issue);
	uint64_t this_delay;
	struct timespec ts;

	if (delay < td->time_offset) {
		td->time_offset = 0;
		return;
	}

	delay -= td->time_offset;
	if (delay < usec)
		return;

	delay -= usec;

	fio_gettime(&ts, NULL);
	while (delay && !td->terminate) {
		this_delay = delay;
		if (this_delay > 500000)
			this_delay = 500000;

		usec_sleep(td, this_delay);
		delay -= this_delay;
	}

	usec = utime_since_now(&ts);
	if (usec > delay)
		td->time_offset = usec - delay;
	else
		td->time_offset = 0;
}
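
/*
 * Note on the accounting above: the requested delay is first reduced by
 * td->time_offset (carried over from the previous replay round) and by
 * the time already spent since the last issue; the loop then sleeps off
 * the remainder in slices of at most 500000 usec so that td->terminate
 * is still noticed promptly.
 */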
static int ipo_special(struct thread_data *td, struct io_piece *ipo)
{
	struct fio_file *f;
	int ret;

	/*
	 * Not a special ipo
	 */
	if (ipo->ddir != DDIR_INVAL)
		return 0;

	f = td->files[ipo->fileno];

	switch (ipo->file_action) {
	case FIO_LOG_OPEN_FILE:
		if (td->o.replay_redirect && fio_file_open(f)) {
			dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
					f->file_name);
			break;
		}
		ret = td_io_open_file(td, f);
		if (!ret)
			break;
		td_verror(td, ret, "iolog open file");
		return -1;
	case FIO_LOG_CLOSE_FILE:
		td_io_close_file(td, f);
		break;
	case FIO_LOG_UNLINK_FILE:
		td_io_unlink_file(td, f);
		break;
	default:
		log_err("fio: bad file action %d\n", ipo->file_action);
		break;
	}

	return 1;
}
int read_iolog_get(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo;
	unsigned long elapsed;

	while (!flist_empty(&td->io_log_list)) {
		int ret;

		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);

		ret = ipo_special(td, ipo);
		if (ret < 0) {
			free(ipo);
			break;
		} else if (ret > 0) {
			free(ipo);
			continue;
		}

		io_u->ddir = ipo->ddir;
		if (ipo->ddir != DDIR_WAIT) {
			io_u->offset = ipo->offset;
			io_u->buflen = ipo->len;
			io_u->file = td->files[ipo->fileno];
			get_file(io_u->file);
			dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
						io_u->buflen, io_u->file->file_name);
			if (ipo->delay)
				iolog_delay(td, ipo->delay);
		} else {
			elapsed = mtime_since_genesis();
			if (ipo->delay > elapsed)
				usec_sleep(td, (ipo->delay - elapsed) * 1000);
		}

		free(ipo);

		if (io_u->ddir != DDIR_WAIT)
			return 0;
	}

	td->done = 1;
	return 1;
}
void prune_io_piece_log(struct thread_data *td)
{
	struct io_piece *ipo;
	struct fio_rb_node *n;

	while ((n = rb_first(&td->io_hist_tree)) != NULL) {
		ipo = rb_entry(n, struct io_piece, rb_node);
		rb_erase(n, &td->io_hist_tree);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}

	while (!flist_empty(&td->io_hist_list)) {
		ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}
}
/*
 * log a successful write, so we can unwind the log for verify
 */
void log_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct fio_rb_node **p, *parent;
	struct io_piece *ipo, *__ipo;

	ipo = malloc(sizeof(struct io_piece));
	init_ipo(ipo);

	ipo->file = io_u->file;
	ipo->offset = io_u->offset;
	ipo->len = io_u->buflen;
	ipo->numberio = io_u->numberio;
	ipo->flags = IP_F_IN_FLIGHT;

	io_u->ipo = ipo;

	if (io_u_should_trim(td, io_u)) {
		flist_add_tail(&ipo->trim_list, &td->trim_list);
		td->trim_entries++;
	}

	/*
	 * We don't need to sort the entries if we only performed sequential
	 * writes. In this case, just reading back data in the order we wrote
	 * it out is faster but still safe.
	 *
	 * One exception is if we don't have a random map, in which case we
	 * need to check for duplicate blocks and drop the old one, which we
	 * rely on the rb insert/lookup for handling.
	 */
	if (((!td->o.verifysort) || !td_random(td)) &&
	      file_randommap(td, ipo->file)) {
		INIT_FLIST_HEAD(&ipo->list);
		flist_add_tail(&ipo->list, &td->io_hist_list);
		ipo->flags |= IP_F_ONLIST;
		return;
	}

	RB_CLEAR_NODE(&ipo->rb_node);

	/*
	 * Sort the entry into the verification list
	 */
restart:
	p = &td->io_hist_tree.rb_node;
	parent = NULL;
	while (*p) {
		int overlap = 0;
		parent = *p;

		__ipo = rb_entry(parent, struct io_piece, rb_node);
		if (ipo->file < __ipo->file)
			p = &(*p)->rb_left;
		else if (ipo->file > __ipo->file)
			p = &(*p)->rb_right;
		else if (ipo->offset < __ipo->offset) {
			p = &(*p)->rb_left;
			overlap = ipo->offset + ipo->len > __ipo->offset;
		} else if (ipo->offset > __ipo->offset) {
			p = &(*p)->rb_right;
			overlap = __ipo->offset + __ipo->len > ipo->offset;
		} else
			overlap = 1;

		if (overlap) {
			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
				__ipo->offset, __ipo->len,
				ipo->offset, ipo->len);
			td->io_hist_len--;
			rb_erase(parent, &td->io_hist_tree);
			remove_trim_entry(td, __ipo);
			if (!(__ipo->flags & IP_F_IN_FLIGHT))
				free(__ipo);
			goto restart;
		}
	}

	rb_link_node(&ipo->rb_node, parent, p);
	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
	ipo->flags |= IP_F_ONRB;
	td->io_hist_len++;
}
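
/*
 * Note that the tree above is keyed on (file, offset): entries for the
 * same file sort by offset, and an insert that overlaps an existing
 * extent evicts the old entry, so verify only ever sees the most recent
 * write to a given block.
 */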
void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (td->ts.nr_block_infos) {
		uint32_t *info = io_u_block_info(td, io_u);
		if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
			if (io_u->ddir == DDIR_TRIM)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_TRIM_FAILURE);
			else if (io_u->ddir == DDIR_WRITE)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_WRITE_FAILURE);
		}
	}

	if (!ipo)
		return;

	if (ipo->flags & IP_F_ONRB)
		rb_erase(&ipo->rb_node, &td->io_hist_tree);
	else if (ipo->flags & IP_F_ONLIST)
		flist_del(&ipo->list);

	free(ipo);
	io_u->ipo = NULL;
	td->io_hist_len--;
}
void trim_io_piece(struct thread_data *td, const struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (!ipo)
		return;

	ipo->len = io_u->xfer_buflen - io_u->resid;
}
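
/*
 * A short or trimmed completion thus shrinks the logged length, so that
 * verify later checks only the bytes that were actually transferred.
 */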
void write_iolog_close(struct thread_data *td)
{
	fflush(td->iolog_f);
	fclose(td->iolog_f);
	free(td->iolog_buf);
	td->iolog_f = NULL;
	td->iolog_buf = NULL;
}
/*
 * Read version 2 iolog data. It is enhanced to include per-file logging,
 * syncs, etc.
 */
static int read_iolog2(struct thread_data *td, FILE *f)
{
	unsigned long long offset;
	unsigned int bytes;
	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
	char *rfname, *fname, *act;
	char *str, *p;
	enum fio_ddir rw;

	free_release_files(td);

	/*
	 * Read in the read iolog and store it, reuse the infrastructure
	 * for doing verifications.
	 */
	str = malloc(4096);
	rfname = fname = malloc(256+16);
	act = malloc(256+16);

	reads = writes = waits = 0;
	while ((p = fgets(str, 4096, f)) != NULL) {
		struct io_piece *ipo;
		int r;

		r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
									&bytes);

		if (td->o.replay_redirect)
			fname = td->o.replay_redirect;

		if (r == 4) {
			/*
			 * Check action first
			 */
			if (!strcmp(act, "wait"))
				rw = DDIR_WAIT;
			else if (!strcmp(act, "read"))
				rw = DDIR_READ;
			else if (!strcmp(act, "write"))
				rw = DDIR_WRITE;
			else if (!strcmp(act, "sync"))
				rw = DDIR_SYNC;
			else if (!strcmp(act, "datasync"))
				rw = DDIR_DATASYNC;
			else if (!strcmp(act, "trim"))
				rw = DDIR_TRIM;
			else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
			fileno = get_fileno(td, fname);
		} else if (r == 2) {
			rw = DDIR_INVAL;
			if (!strcmp(act, "add")) {
				if (td->o.replay_redirect &&
				    get_fileno(td, fname) != -1) {
					dprint(FD_FILE, "iolog: ignoring"
						" re-add of file %s\n", fname);
				} else {
					fileno = add_file(td, fname, 0, 1);
					file_action = FIO_LOG_ADD_FILE;
				}
				continue;
			} else if (!strcmp(act, "open")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_OPEN_FILE;
			} else if (!strcmp(act, "close")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_CLOSE_FILE;
			} else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
		} else {
			log_err("bad iolog2: %s\n", p);
			continue;
		}
		if (rw == DDIR_READ)
			reads++;
		else if (rw == DDIR_WRITE) {
			/*
			 * Don't add a write for ro mode
			 */
			if (read_only)
				continue;
			writes++;
		} else if (rw == DDIR_WAIT) {
			if (td->o.no_stall)
				continue;
			waits++;
		} else if (rw == DDIR_INVAL) {
		} else if (!ddir_sync(rw)) {
			log_err("bad ddir: %d\n", rw);
			continue;
		}

		/*
		 * Make note of file
		 */
		ipo = malloc(sizeof(*ipo));
		init_ipo(ipo);
		ipo->ddir = rw;
		if (rw == DDIR_WAIT) {
			ipo->delay = offset;
		} else {
			if (td->o.replay_scale)
				ipo->offset = offset / td->o.replay_scale;
			else
				ipo->offset = offset;
			ipo_bytes_align(td->o.replay_align, ipo);

			ipo->len = bytes;
			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
				td->o.max_bs[rw] = bytes;
			ipo->fileno = fileno;
			ipo->file_action = file_action;
			td->o.size += bytes;
		}

		queue_io_piece(td, ipo);
	}

	free(str);
	free(act);
	free(rfname);

	if (writes && read_only) {
		log_err("fio: <%s> skips replay of %d writes due to"
			" read-only\n", td->o.name, writes);
		writes = 0;
	}

	if (!reads && !writes && !waits)
		return 1;
	else if (reads && !writes)
		td->o.td_ddir = TD_DDIR_READ;
	else if (!reads && writes)
		td->o.td_ddir = TD_DDIR_WRITE;
	else
		td->o.td_ddir = TD_DDIR_RW;

	return 0;
}
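
/*
 * Parsing sketch for the loop above (names hypothetical): a four-field
 * line such as "/dev/sdb write 4096 16384" carries file, action, offset
 * and length and describes an I/O, while a two-field line such as
 * "/dev/sdb open" describes a file action.
 */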
/*
 * open iolog, check version, and call appropriate parser
 */
static int init_iolog_read(struct thread_data *td)
{
	char buffer[256], *p;
	FILE *f;
	int ret;

	f = fopen(td->o.read_iolog_file, "r");
	if (!f) {
		perror("fopen read iolog");
		return 1;
	}

	p = fgets(buffer, sizeof(buffer), f);
	if (!p) {
		td_verror(td, errno, "iolog read");
		log_err("fio: unable to read iolog\n");
		fclose(f);
		return 1;
	}

	/*
	 * version 2 of the iolog stores a specific string as the
	 * first line, check for that
	 */
	if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
		ret = read_iolog2(td, f);
	else {
		log_err("fio: iolog version 1 is no longer supported\n");
		ret = 1;
	}

	fclose(f);
	return ret;
}
/*
 * Set up a log for storing io patterns.
 */
static int init_iolog_write(struct thread_data *td)
{
	struct fio_file *ff;
	FILE *f;
	unsigned int i;

	f = fopen(td->o.write_iolog_file, "a");
	if (!f) {
		perror("fopen write iolog");
		return 1;
	}

	/*
	 * That's it for writing, set up a log buffer and we're done.
	 */
	td->iolog_f = f;
	td->iolog_buf = malloc(8192);
	setvbuf(f, td->iolog_buf, _IOFBF, 8192);

	/*
	 * write our version line
	 */
	if (fprintf(f, "%s\n", iolog_ver2) < 0) {
		perror("iolog init");
		return 1;
	}

	/*
	 * add all known files
	 */
	for_each_file(td, ff, i)
		log_file(td, ff, FIO_LOG_ADD_FILE);

	return 0;
}
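
/*
 * Usage sketch (job file options, not code): a workload is recorded with
 * "write_iolog=trace.log" and replayed later with "read_iolog=trace.log",
 * optionally remapped onto another file via "replay_redirect=".
 */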
int init_iolog(struct thread_data *td)
{
	int ret = 0;

	if (td->o.read_iolog_file) {
		int need_swap;

		/*
		 * Check if it's a blktrace file and load that if possible.
		 * Otherwise assume it's a normal log file and load that.
		 */
		if (is_blktrace(td->o.read_iolog_file, &need_swap))
			ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
		else
			ret = init_iolog_read(td);
	} else if (td->o.write_iolog_file)
		ret = init_iolog_write(td);

	if (ret)
		td_verror(td, EINVAL, "failed initializing iolog");

	return ret;
}
void setup_log(struct io_log **log, struct log_params *p,
	       const char *filename)
{
	struct io_log *l;
	int i;
	struct io_u_plat_entry *entry;
	struct flist_head *list;

	l = scalloc(1, sizeof(*l));
	INIT_FLIST_HEAD(&l->io_logs);
	l->log_type = p->log_type;
	l->log_offset = p->log_offset;
	l->log_gz = p->log_gz;
	l->log_gz_store = p->log_gz_store;
	l->avg_msec = p->avg_msec;
	l->hist_msec = p->hist_msec;
	l->hist_coarseness = p->hist_coarseness;
	l->filename = strdup(filename);
	l->td = p->td;

	/* Initialize histogram lists for each r/w direction,
	 * with initial io_u_plat of all zeros:
	 */
	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
		list = &l->hist_window[i].list;
		INIT_FLIST_HEAD(list);
		entry = calloc(1, sizeof(struct io_u_plat_entry));
		flist_add(&entry->list, list);
	}
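
	/*
	 * The all-zero entry seeded above serves as the baseline that
	 * flush_hist_samples() diffs the first real histogram sample
	 * against.
	 */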
	if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
		struct io_logs *__p;

		__p = calloc(1, sizeof(*l->pending));
		__p->max_samples = DEF_LOG_ENTRIES;
		__p->log = calloc(__p->max_samples, log_entry_sz(l));
		l->pending = __p;
	}

	if (l->log_offset)
		l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;

	INIT_FLIST_HEAD(&l->chunk_list);

	if (l->log_gz && !p->td)
		l->log_gz = 0;
	else if (l->log_gz || l->log_gz_store) {
		mutex_init_pshared(&l->chunk_lock);
		mutex_init_pshared(&l->deferred_free_lock);
		p->td->flags |= TD_F_COMPRESS_LOG;
	}

	*log = l;
}
#ifdef CONFIG_SETVBUF
static void *set_file_buffer(FILE *f)
{
	size_t size = 1048576;
	void *buf;

	buf = malloc(size);
	setvbuf(f, buf, _IOFBF, size);
	return buf;
}

static void clear_file_buffer(void *buf)
{
	free(buf);
}
#else
static void *set_file_buffer(FILE *f)
{
	return NULL;
}

static void clear_file_buffer(void *buf)
{
}
#endif
void free_log(struct io_log *log)
{
	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);
		free(cur_log->log);
		free(cur_log);
	}

	if (log->pending) {
		free(log->pending->log);
		free(log->pending);
		log->pending = NULL;
	}

	free(log->filename);
	sfree(log);
}
uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
		  uint64_t *io_u_plat_last)
{
	uint64_t sum;
	int k;

	if (io_u_plat_last) {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k] - io_u_plat_last[j + k];
	} else {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k];
	}

	return sum;
}
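
/*
 * In other words, hist_sum() collapses 'stride' adjacent latency bins
 * into one value: an incremental count since the previous sample when
 * io_u_plat_last is non-NULL, an absolute count otherwise.
 */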
static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
			       uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, j, nr_samples;
	struct io_u_plat_entry *entry, *entry_before;
	uint64_t *io_u_plat;
	uint64_t *io_u_plat_before;
	int stride = 1 << hist_coarseness;

	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		entry = s->data.plat_entry;
		io_u_plat = entry->io_u_plat;

		entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
		io_u_plat_before = entry_before->io_u_plat;

		fprintf(f, "%lu, %u, %u, ", (unsigned long) s->time,
						io_sample_ddir(s), s->bs);
		for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
			fprintf(f, "%llu, ", (unsigned long long)
				hist_sum(j, stride, io_u_plat, io_u_plat_before));
		}
		fprintf(f, "%llu\n", (unsigned long long)
			hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
					io_u_plat_before));

		flist_del(&entry_before->list);
		free(entry_before);
	}
}
void flush_samples(FILE *f, void *samples, uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, nr_samples;

	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		if (!log_offset) {
			fprintf(f, "%lu, %" PRId64 ", %u, %u\n",
					(unsigned long) s->time,
					s->data.val,
					io_sample_ddir(s), s->bs);
		} else {
			struct io_sample_offset *so = (void *) s;

			fprintf(f, "%lu, %" PRId64 ", %u, %u, %llu\n",
					(unsigned long) s->time,
					s->data.val,
					io_sample_ddir(s), s->bs,
					(unsigned long long) so->offset);
		}
	}
}
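
/*
 * Example output line (values illustrative): "1000, 524288, 1, 4096"
 * for time in msec, sample value, data direction, and block size, with a
 * trailing offset column appended when offset logging is enabled.
 */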
#ifdef CONFIG_ZLIB

struct iolog_flush_data {
	struct workqueue_work work;
	struct io_log *log;
	void *samples;
	uint32_t nr_samples;
	bool free;
};

#define GZ_CHUNK 131072
static struct iolog_compress *get_new_chunk(unsigned int seq)
{
	struct iolog_compress *c;

	c = malloc(sizeof(*c));
	INIT_FLIST_HEAD(&c->list);
	c->buf = malloc(GZ_CHUNK);
	c->len = 0;
	c->seq = seq;
	return c;
}

static void free_chunk(struct iolog_compress *ic)
{
	free(ic->buf);
	free(ic);
}
static int z_stream_init(z_stream *stream, int gz_hdr)
{
	int wbits = 15;

	memset(stream, 0, sizeof(*stream));
	stream->zalloc = Z_NULL;
	stream->zfree = Z_NULL;
	stream->opaque = Z_NULL;
	stream->next_in = Z_NULL;

	/*
	 * zlib magic - add 32 for auto-detection of gz header or not,
	 * if we decide to store files in a gzip friendly format.
	 */
	if (gz_hdr)
		wbits += 32;

	if (inflateInit2(stream, wbits) != Z_OK)
		return 1;

	return 0;
}
struct inflate_chunk_iter {
	unsigned int seq;
	int err;
	void *buf;
	size_t buf_size;
	size_t buf_used;
	size_t chunk_sz;
};
static void finish_chunk(z_stream *stream, FILE *f,
			 struct inflate_chunk_iter *iter)
{
	int ret;

	ret = inflateEnd(stream);
	if (ret != Z_OK)
		log_err("fio: failed to end log inflation seq %d (%d)\n",
				iter->seq, ret);

	flush_samples(f, iter->buf, iter->buf_used);
	free(iter->buf);
	iter->buf = NULL;
	iter->buf_size = iter->buf_used = 0;
}
/*
 * Iterative chunk inflation. Handles cases where we cross into a new
 * sequence, flushing and finishing the previous chunk if needed.
 */
static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
			    z_stream *stream, struct inflate_chunk_iter *iter)
{
	size_t ret;

	dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
				(unsigned long) ic->len, ic->seq);

	if (ic->seq != iter->seq) {
		if (iter->seq)
			finish_chunk(stream, f, iter);

		z_stream_init(stream, gz_hdr);
		iter->seq = ic->seq;
	}

	stream->avail_in = ic->len;
	stream->next_in = ic->buf;

	if (!iter->buf_size) {
		iter->buf_size = iter->chunk_sz;
		iter->buf = malloc(iter->buf_size);
	}

	while (stream->avail_in) {
		size_t this_out = iter->buf_size - iter->buf_used;
		int err;

		stream->avail_out = this_out;
		stream->next_out = iter->buf + iter->buf_used;

		err = inflate(stream, Z_NO_FLUSH);
		if (err < 0) {
			log_err("fio: failed inflating log: %d\n", err);
			iter->err = err;
			break;
		}

		iter->buf_used += this_out - stream->avail_out;

		if (!stream->avail_out) {
			iter->buf_size += iter->chunk_sz;
			iter->buf = realloc(iter->buf, iter->buf_size);
			continue;
		}

		if (err == Z_STREAM_END)
			break;
	}

	ret = (void *) stream->next_in - ic->buf;

	dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);

	return ret;
}
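
/*
 * Note that the value returned above is the number of input bytes
 * consumed from this chunk, which lets the caller account for partial
 * consumption when a compressed stream spans chunks.
 */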
/*
 * Inflate stored compressed chunks, or write them directly to the log
 * file if so instructed.
 */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
	z_stream stream;

	while (!flist_empty(&log->chunk_list)) {
		struct iolog_compress *ic;

		ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
		flist_del(&ic->list);

		if (log->log_gz_store) {
			size_t ret;

			dprint(FD_COMPRESS, "log write chunk size=%lu, "
				"seq=%u\n", (unsigned long) ic->len, ic->seq);

			ret = fwrite(ic->buf, ic->len, 1, f);
			if (ret != 1 || ferror(f)) {
				iter.err = errno;
				log_err("fio: error writing compressed log\n");
			}
		} else
			inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);

		free_chunk(ic);
	}

	if (iter.seq) {
		finish_chunk(&stream, f, &iter);
		free(iter.buf);
	}

	return iter.err;
}
/*
 * Open the compressed log file, decompress the stored chunks, and
 * write them to stdout. The chunks are stored sequentially in the
 * file, so we iterate over them and do them one-by-one.
 */
int iolog_file_inflate(const char *file)
{
	struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
	struct iolog_compress ic;
	z_stream stream;
	struct stat sb;
	size_t ret, total;
	void *buf;
	FILE *f;

	f = fopen(file, "r");
	if (!f) {
		perror("fopen");
		return 1;
	}

	if (stat(file, &sb) < 0) {
		fclose(f);
		perror("stat");
		return 1;
	}

	ic.buf = buf = malloc(sb.st_size);
	ic.len = sb.st_size;
	ic.seq = 1;

	ret = fread(ic.buf, ic.len, 1, f);
	if (ret == 0 && ferror(f)) {
		perror("fread");
		fclose(f);
		free(buf);
		return 1;
	} else if (ret != 1) {
		log_err("fio: short read on reading log\n");
		fclose(f);
		free(buf);
		return 1;
	}

	fclose(f);

	/*
	 * Each chunk will return Z_STREAM_END. We don't know how many
	 * chunks are in the file, so we just keep looping and incrementing
	 * the sequence number until we have consumed the whole compressed
	 * file.
	 */
	total = ic.len;
	do {
		size_t iret;

		iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
		total -= iret;
		if (!total)
			break;
		if (iter.err)
			break;

		ic.seq++;
		ic.len = total;
		ic.buf += iret;
	} while (1);

	if (iter.seq) {
		finish_chunk(&stream, stdout, &iter);
		free(iter.buf);
	}

	free(buf);
	return iter.err;
}
#else

static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	return 0;
}

int iolog_file_inflate(const char *file)
{
	log_err("fio: log inflation not possible without zlib\n");
	return 1;
}

#endif
void flush_log(struct io_log *log, bool do_append)
{
	void *buf;
	FILE *f;

	if (!do_append)
		f = fopen(log->filename, "w");
	else
		f = fopen(log->filename, "a");
	if (!f) {
		perror("fopen log");
		return;
	}

	buf = set_file_buffer(f);

	inflate_gz_chunks(log, f);

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		if (log->td && log == log->td->clat_hist_log)
			flush_hist_samples(f, log->hist_coarseness, cur_log->log,
					   log_sample_sz(log, cur_log));
		else
			flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));

		sfree(cur_log);
	}

	fclose(f);
	clear_file_buffer(buf);
}
static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
{
	if (td->flags & TD_F_COMPRESS_LOG)
		iolog_flush(log);

	if (trylock) {
		if (fio_trylock_file(log->filename))
			return 1;
	} else
		fio_lock_file(log->filename);

	if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
		fio_send_iolog(td, log, log->filename);
	else
		flush_log(log, !td->o.per_job_logs);

	fio_unlock_file(log->filename);
	free_log(log);
	return 0;
}
size_t log_chunk_sizes(struct io_log *log)
{
	struct flist_head *entry;
	size_t ret;

	if (flist_empty(&log->chunk_list))
		return 0;

	ret = 0;
	pthread_mutex_lock(&log->chunk_lock);
	flist_for_each(entry, &log->chunk_list) {
		struct iolog_compress *c;

		c = flist_entry(entry, struct iolog_compress, list);
		ret += c->len;
	}
	pthread_mutex_unlock(&log->chunk_lock);
	return ret;
}
static void iolog_put_deferred(struct io_log *log, void *ptr)
{
	if (!ptr)
		return;

	pthread_mutex_lock(&log->deferred_free_lock);
	if (log->deferred < IOLOG_MAX_DEFER) {
		log->deferred_items[log->deferred] = ptr;
		log->deferred++;
	} else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
		log_err("fio: had to drop log entry free\n");
	pthread_mutex_unlock(&log->deferred_free_lock);
}
static void iolog_free_deferred(struct io_log *log)
{
	int i;

	if (!log->deferred)
		return;

	pthread_mutex_lock(&log->deferred_free_lock);

	for (i = 0; i < log->deferred; i++) {
		free(log->deferred_items[i]);
		log->deferred_items[i] = NULL;
	}

	log->deferred = 0;
	pthread_mutex_unlock(&log->deferred_free_lock);
}
static int gz_work(struct iolog_flush_data *data)
{
	struct iolog_compress *c = NULL;
	struct flist_head list;
	unsigned int seq = 0;
	z_stream stream;
	size_t total = 0;
	int ret;

	INIT_FLIST_HEAD(&list);

	memset(&stream, 0, sizeof(stream));
	stream.zalloc = Z_NULL;
	stream.zfree = Z_NULL;
	stream.opaque = Z_NULL;

	ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
	if (ret != Z_OK) {
		log_err("fio: failed to init gz stream\n");
		goto err;
	}

	seq = ++data->log->chunk_seq;

	stream.next_in = (void *) data->samples;
	stream.avail_in = data->nr_samples * log_entry_sz(data->log);

	dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
				(unsigned long) stream.avail_in, seq,
				data->log->filename);
	do {
		if (c)
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		c = get_new_chunk(seq);
		stream.avail_out = GZ_CHUNK;
		stream.next_out = c->buf;
		ret = deflate(&stream, Z_NO_FLUSH);
		if (ret < 0) {
			log_err("fio: deflate log (%d)\n", ret);
			free_chunk(c);
			goto err;
		}

		c->len = GZ_CHUNK - stream.avail_out;
		flist_add_tail(&c->list, &list);
		total += c->len;
	} while (stream.avail_in);

	stream.next_out = c->buf + c->len;
	stream.avail_out = GZ_CHUNK - c->len;

	ret = deflate(&stream, Z_FINISH);
	if (ret < 0) {
		/*
		 * Z_BUF_ERROR is special, it just means we need more
		 * output space. We'll handle that below. Treat any other
		 * error as fatal.
		 */
		if (ret != Z_BUF_ERROR) {
			log_err("fio: deflate log (%d)\n", ret);
			flist_del(&c->list);
			free_chunk(c);
			goto err;
		}
	}

	total -= c->len;
	c->len = GZ_CHUNK - stream.avail_out;
	total += c->len;
	dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);

	if (ret != Z_STREAM_END) {
		do {
			c = get_new_chunk(seq);
			stream.avail_out = GZ_CHUNK;
			stream.next_out = c->buf;
			ret = deflate(&stream, Z_FINISH);
			c->len = GZ_CHUNK - stream.avail_out;
			total += c->len;
			flist_add_tail(&c->list, &list);
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		} while (ret != Z_STREAM_END);
	}

	dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);

	ret = deflateEnd(&stream);
	if (ret != Z_OK)
		log_err("fio: deflateEnd %d\n", ret);

	iolog_put_deferred(data->log, data->samples);

	if (!flist_empty(&list)) {
		pthread_mutex_lock(&data->log->chunk_lock);
		flist_splice_tail(&list, &data->log->chunk_list);
		pthread_mutex_unlock(&data->log->chunk_lock);
	}

	ret = 0;
done:
	if (data->free)
		sfree(data);
	return ret;
err:
	while (!flist_empty(&list)) {
		c = flist_first_entry(&list, struct iolog_compress, list);
		flist_del(&c->list);
		free_chunk(c);
	}
	ret = 1;
	goto done;
}
/*
 * Invoked from our compress helper thread, when logging would have exceeded
 * the specified memory limitation. Compresses the previously stored
 * entries.
 */
static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
{
	return gz_work(container_of(work, struct iolog_flush_data, work));
}
static int gz_init_worker(struct submit_worker *sw)
{
	struct thread_data *td = sw->wq->td;

	if (!fio_option_is_set(&td->o, log_gz_cpumask))
		return 0;

	if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
		log_err("gz: failed to set CPU affinity\n");
		return 1;
	}

	return 0;
}

static struct workqueue_ops log_compress_wq_ops = {
	.fn		= gz_work_async,
	.init_worker_fn = gz_init_worker,
	.nice		= 1,
};
int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return 0;

	workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return;

	workqueue_exit(&td->log_compress_wq);
}
/*
 * Queue work item to compress the existing log entries. We reset the
 * current log to a small size, and reference the existing log in the
 * data that we queue for compression. Once compression has been done,
 * this old log is freed.
 */
static int iolog_flush(struct io_log *log)
{
	struct iolog_flush_data *data;

	data = malloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;
	data->free = false;

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		data->samples = cur_log->log;
		data->nr_samples = cur_log->nr_samples;

		sfree(cur_log);

		gz_work(data);
	}

	free(data);
	return 0;
}
int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	struct iolog_flush_data *data;

	data = smalloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;

	data->samples = cur_log->log;
	data->nr_samples = cur_log->nr_samples;
	data->free = true;

	cur_log->nr_samples = cur_log->max_samples = 0;
	cur_log->log = NULL;

	workqueue_enqueue(&log->td->log_compress_wq, &data->work);

	iolog_free_deferred(log);

	return 0;
}
#else

static int iolog_flush(struct io_log *log)
{
	return 1;
}

int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	return 1;
}

int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
}

#endif
struct io_logs *iolog_cur_log(struct io_log *log)
{
	if (flist_empty(&log->io_logs))
		return NULL;

	return flist_last_entry(&log->io_logs, struct io_logs, list);
}
uint64_t iolog_nr_samples(struct io_log *iolog)
{
	struct flist_head *entry;
	uint64_t ret = 0;

	flist_for_each(entry, &iolog->io_logs) {
		struct io_logs *cur_log;

		cur_log = flist_entry(entry, struct io_logs, list);
		ret += cur_log->nr_samples;
	}

	return ret;
}
static int __write_log(struct thread_data *td, struct io_log *log, int try)
{
	if (log)
		return finish_log(td, log, try);

	return 0;
}
static int write_iops_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->iops_log) != unit_log)
		return 0;

	ret = __write_log(td, td->iops_log, try);
	if (!ret)
		td->iops_log = NULL;

	return ret;
}

static int write_slat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->slat_log, try);
	if (!ret)
		td->slat_log = NULL;

	return ret;
}

static int write_clat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_log, try);
	if (!ret)
		td->clat_log = NULL;

	return ret;
}

static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_hist_log, try);
	if (!ret)
		td->clat_hist_log = NULL;

	return ret;
}

static int write_lat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->lat_log, try);
	if (!ret)
		td->lat_log = NULL;

	return ret;
}

static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->bw_log) != unit_log)
		return 0;

	ret = __write_log(td, td->bw_log, try);
	if (!ret)
		td->bw_log = NULL;

	return ret;
}
enum {
	BW_LOG_MASK	= 1,
	LAT_LOG_MASK	= 2,
	SLAT_LOG_MASK	= 4,
	CLAT_LOG_MASK	= 8,
	IOPS_LOG_MASK	= 16,
	CLAT_HIST_LOG_MASK = 32,

	ALL_LOG_NR	= 6,
};
struct log_type {
	unsigned int mask;
	int (*fn)(struct thread_data *, int, bool);
};

static struct log_type log_types[] = {
	{
		.mask	= BW_LOG_MASK,
		.fn	= write_bandw_log,
	},
	{
		.mask	= LAT_LOG_MASK,
		.fn	= write_lat_log,
	},
	{
		.mask	= SLAT_LOG_MASK,
		.fn	= write_slat_log,
	},
	{
		.mask	= CLAT_LOG_MASK,
		.fn	= write_clat_log,
	},
	{
		.mask	= IOPS_LOG_MASK,
		.fn	= write_iops_log,
	},
	{
		.mask	= CLAT_HIST_LOG_MASK,
		.fn	= write_clat_hist_log,
	},
};
void td_writeout_logs(struct thread_data *td, bool unit_logs)
{
	unsigned int log_mask = 0;
	unsigned int log_left = ALL_LOG_NR;
	int old_state, i;

	old_state = td_bump_runstate(td, TD_FINISHING);

	finalize_logs(td, unit_logs);

	while (log_left) {
		int prev_log_left = log_left;

		for (i = 0; i < ALL_LOG_NR && log_left; i++) {
			struct log_type *lt = &log_types[i];
			int ret;

			if (!(log_mask & lt->mask)) {
				ret = lt->fn(td, log_left != 1, unit_logs);
				if (!ret) {
					log_left--;
					log_mask |= lt->mask;
				}
			}
		}

		if (prev_log_left == log_left)
			usleep(5000);
	}

	td_restore_runstate(td, old_state);
}
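
/*
 * Note on the loop above: each writer is first attempted with a trylock
 * (the trylock is dropped once only a single log remains), and the pass
 * repeats, backing off briefly when no progress is made, until every log
 * has been flushed.
 */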
void fio_writeout_logs(bool unit_logs)
{
	struct thread_data *td;
	int i;

	for_each_td(td, i)
		td_writeout_logs(td, unit_logs);
}