/*
 * Code related to writing an iolog of what a thread is doing, and to
 * later read that back in and replay it.
 */
static int iolog_flush(struct io_log *log);

static const char iolog_ver2[] = "fio version 2 iolog";
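
/*
 * For reference, an iolog that the version 2 code below writes and reads
 * looks like this (a sketch; each data line is "filename action [offset len]",
 * with the actions parsed in read_iolog2() below):
 *
 *	fio version 2 iolog
 *	/dev/sdb add
 *	/dev/sdb open
 *	/dev/sdb write 0 4096
 *	/dev/sdb read 4096 4096
 *	/dev/sdb close
 */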
void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
{
	flist_add_tail(&ipo->list, &td->io_log_list);
	td->total_io_size += ipo->len;
}
void log_io_u(const struct thread_data *td, const struct io_u *io_u)
{
	if (!td->o.write_iolog_file)
		return;

	fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
					io_ddir_name(io_u->ddir),
					io_u->offset, io_u->buflen);
}
void log_file(struct thread_data *td, struct fio_file *f,
	      enum file_log_act what)
{
	const char *act[] = { "add", "open", "close" };

	if (!td->o.write_iolog_file)
		return;

	/*
	 * This can happen on the pre-open/close that is done before the
	 * job starts; the iolog file isn't open yet at that point.
	 */
	if (!td->iolog_f)
		return;

	fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
}
static void iolog_delay(struct thread_data *td, unsigned long delay)
{
	uint64_t usec = utime_since_now(&td->last_issue);
	unsigned long orig_delay = delay;
	uint64_t this_delay;
	struct timeval tv;

	if (delay < td->time_offset) {
		td->time_offset = 0;
		return;
	}

	delay -= td->time_offset;
	if (delay < usec)
		return;

	delay -= usec;

	fio_gettime(&tv, NULL);
	while (delay && !td->terminate) {
		this_delay = delay;
		if (this_delay > 500000)
			this_delay = 500000;

		usec_sleep(td, this_delay);
		delay -= this_delay;
	}

	/*
	 * Carry any oversleep over to the next delay. Compare against the
	 * originally requested delay, not the now-consumed remainder.
	 */
	usec = utime_since_now(&tv);
	if (usec > orig_delay)
		td->time_offset = usec - orig_delay;
	else
		td->time_offset = 0;
}
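
/*
 * Worked example of the time_offset bookkeeping above (a sketch): if a
 * replayed entry asks for a 2000 usec stall but the usec_sleep() loop
 * ends up sleeping for 2500 usec, time_offset is set to 500, and the
 * next replayed delay is shortened by that amount so the replay doesn't
 * drift behind the original timing.
 */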
static int ipo_special(struct thread_data *td, struct io_piece *ipo)
{
	struct fio_file *f;
	int ret;

	/*
	 * Not a special (file action) entry
	 */
	if (ipo->ddir != DDIR_INVAL)
		return 0;

	f = td->files[ipo->fileno];

	switch (ipo->file_action) {
	case FIO_LOG_OPEN_FILE:
		if (td->o.replay_redirect && fio_file_open(f)) {
			dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
					f->file_name);
			break;
		}
		ret = td_io_open_file(td, f);
		if (!ret)
			break;
		td_verror(td, ret, "iolog open file");
		return -1;
	case FIO_LOG_CLOSE_FILE:
		td_io_close_file(td, f);
		break;
	case FIO_LOG_UNLINK_FILE:
		td_io_unlink_file(td, f);
		break;
	default:
		log_err("fio: bad file action %d\n", ipo->file_action);
		break;
	}

	return 1;
}
int read_iolog_get(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo;
	unsigned long elapsed;

	while (!flist_empty(&td->io_log_list)) {
		int ret;

		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);

		ret = ipo_special(td, ipo);
		if (ret < 0) {
			free(ipo);
			break;
		} else if (ret > 0) {
			free(ipo);
			continue;
		}

		io_u->ddir = ipo->ddir;
		if (ipo->ddir != DDIR_WAIT) {
			io_u->offset = ipo->offset;
			io_u->buflen = ipo->len;
			io_u->file = td->files[ipo->fileno];
			get_file(io_u->file);
			dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
					io_u->buflen, io_u->file->file_name);
			if (ipo->delay)
				iolog_delay(td, ipo->delay);
		} else {
			elapsed = mtime_since_genesis();
			if (ipo->delay > elapsed)
				usec_sleep(td, (ipo->delay - elapsed) * 1000);
		}

		free(ipo);

		if (io_u->ddir != DDIR_WAIT)
			return 0;
	}

	td->done = 1;
	return 1;
}
void prune_io_piece_log(struct thread_data *td)
{
	struct io_piece *ipo;
	struct rb_node *n;

	while ((n = rb_first(&td->io_hist_tree)) != NULL) {
		ipo = rb_entry(n, struct io_piece, rb_node);
		rb_erase(n, &td->io_hist_tree);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}

	while (!flist_empty(&td->io_hist_list)) {
		ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}
}
/*
 * Log a successful write, so we can unwind the log for verify.
 */
void log_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct rb_node **p, *parent;
	struct io_piece *ipo, *__ipo;

	ipo = malloc(sizeof(struct io_piece));
	init_ipo(ipo);
	ipo->file = io_u->file;
	ipo->offset = io_u->offset;
	ipo->len = io_u->buflen;
	ipo->numberio = io_u->numberio;
	ipo->flags = IP_F_IN_FLIGHT;

	io_u->ipo = ipo;

	if (io_u_should_trim(td, io_u)) {
		flist_add_tail(&ipo->trim_list, &td->trim_list);
		td->trim_entries++;
	}

	/*
	 * We don't need to sort the entries if:
	 *
	 *	we are doing sequential writes, or
	 *	random writes that lay out the file as they go along.
	 *
	 * For both of these cases, just reading back the data in the order
	 * we wrote it out is the fastest.
	 *
	 * One exception is if we don't have a random map AND we are doing
	 * verifies: in that case we need to check for duplicate blocks and
	 * drop the old one, which we rely on the rb insert/lookup for
	 * handling.
	 */
	if (((!td->o.verifysort) || !td_random(td) || !td->o.overwrite) &&
	    (file_randommap(td, ipo->file) || td->o.verify == VERIFY_NONE)) {
		INIT_FLIST_HEAD(&ipo->list);
		flist_add_tail(&ipo->list, &td->io_hist_list);
		ipo->flags |= IP_F_ONLIST;
		td->io_hist_len++;
		return;
	}
	RB_CLEAR_NODE(&ipo->rb_node);

	/*
	 * Sort the entry into the verification list
	 */
restart:
	p = &td->io_hist_tree.rb_node;
	parent = NULL;
	while (*p) {
		int overlap = 0;

		parent = *p;
		__ipo = rb_entry(parent, struct io_piece, rb_node);
		if (ipo->file < __ipo->file)
			p = &(*p)->rb_left;
		else if (ipo->file > __ipo->file)
			p = &(*p)->rb_right;
		else if (ipo->offset < __ipo->offset) {
			p = &(*p)->rb_left;
			overlap = ipo->offset + ipo->len > __ipo->offset;
		} else if (ipo->offset > __ipo->offset) {
			p = &(*p)->rb_right;
			overlap = __ipo->offset + __ipo->len > ipo->offset;
		} else
			overlap = 1;

		if (overlap) {
			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
				__ipo->offset, __ipo->len,
				ipo->offset, ipo->len);
			td->io_hist_len--;
			rb_erase(parent, &td->io_hist_tree);
			remove_trim_entry(td, __ipo);
			free(__ipo);
			goto restart;
		}
	}

	rb_link_node(&ipo->rb_node, parent, p);
	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
	ipo->flags |= IP_F_ONRB;
	td->io_hist_len++;
}
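
/*
 * Worked example of the overlap test above (a sketch): an existing
 * piece at offset 0 with len 8192 overlaps a new piece at offset 4096,
 * since 0 + 8192 > 4096. The old entry is erased and the insert is
 * restarted, so verify never sees two entries covering the same blocks.
 */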
void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (td->ts.nr_block_infos) {
		uint32_t *info = io_u_block_info(td, io_u);

		if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
			if (io_u->ddir == DDIR_TRIM)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_TRIM_FAILURE);
			else if (io_u->ddir == DDIR_WRITE)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_WRITE_FAILURE);
		}
	}

	if (!ipo)
		return;

	if (ipo->flags & IP_F_ONRB)
		rb_erase(&ipo->rb_node, &td->io_hist_tree);
	else if (ipo->flags & IP_F_ONLIST)
		flist_del(&ipo->list);

	free(ipo);
	io_u->ipo = NULL;
	td->io_hist_len--;
}
void trim_io_piece(struct thread_data *td, const struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (!ipo)
		return;

	ipo->len = io_u->xfer_buflen - io_u->resid;
}
void write_iolog_close(struct thread_data *td)
{
	fflush(td->iolog_f);
	fclose(td->iolog_f);
	free(td->iolog_buf);
	td->iolog_f = NULL;
	td->iolog_buf = NULL;
}
/*
 * Read version 2 iolog data. It is enhanced to include per-file logging,
 * syncs, etc.
 */
static int read_iolog2(struct thread_data *td, FILE *f)
{
	unsigned long long offset;
	unsigned int bytes;
	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
	char *rfname, *fname, *act;
	char *str, *p;
	enum fio_ddir rw;

	free_release_files(td);

	/*
	 * Read in the iolog and store it, reusing the verification
	 * infrastructure for the replay.
	 */
	str = malloc(4096);
	rfname = fname = malloc(256+16);
	act = malloc(256+16);

	reads = writes = waits = 0;
	while ((p = fgets(str, 4096, f)) != NULL) {
		struct io_piece *ipo;
		int r;

		r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
								&bytes);

		if (td->o.replay_redirect)
			fname = td->o.replay_redirect;

		if (r == 4) {
			/*
			 * Check action first
			 */
			if (!strcmp(act, "wait"))
				rw = DDIR_WAIT;
			else if (!strcmp(act, "read"))
				rw = DDIR_READ;
			else if (!strcmp(act, "write"))
				rw = DDIR_WRITE;
			else if (!strcmp(act, "sync"))
				rw = DDIR_SYNC;
			else if (!strcmp(act, "datasync"))
				rw = DDIR_DATASYNC;
			else if (!strcmp(act, "trim"))
				rw = DDIR_TRIM;
			else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
			fileno = get_fileno(td, fname);
		} else if (r == 2) {
			rw = DDIR_INVAL;
			if (!strcmp(act, "add")) {
				if (td->o.replay_redirect &&
				    get_fileno(td, fname) != -1) {
					dprint(FD_FILE, "iolog: ignoring"
						" re-add of file %s\n", fname);
				} else {
					fileno = add_file(td, fname, 0, 1);
					file_action = FIO_LOG_ADD_FILE;
				}
				continue;
			} else if (!strcmp(act, "open")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_OPEN_FILE;
			} else if (!strcmp(act, "close")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_CLOSE_FILE;
			} else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
		} else {
			log_err("bad iolog2: %s\n", p);
			continue;
		}

		if (rw == DDIR_READ)
			reads++;
		else if (rw == DDIR_WRITE) {
			/*
			 * Don't add a write for read-only mode
			 */
			if (read_only)
				continue;
			writes++;
		} else if (rw == DDIR_WAIT) {
			if (td->o.no_stall)
				continue;
			waits++;
		} else if (rw == DDIR_INVAL) {
		} else if (!ddir_sync(rw)) {
			log_err("bad ddir: %d\n", rw);
			continue;
		}

		/*
		 * Make note of the entry
		 */
		ipo = malloc(sizeof(*ipo));
		init_ipo(ipo);
		ipo->ddir = rw;
		if (rw == DDIR_WAIT) {
			ipo->delay = offset;
		} else {
			if (td->o.replay_scale)
				ipo->offset = offset / td->o.replay_scale;
			else
				ipo->offset = offset;
			ipo_bytes_align(td->o.replay_align, ipo);

			ipo->len = bytes;
			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
				td->o.max_bs[rw] = bytes;
			ipo->fileno = fileno;
			ipo->file_action = file_action;
			td->o.size += bytes;
		}

		queue_io_piece(td, ipo);
	}

	free(str);
	free(act);
	free(rfname);

	if (writes && read_only) {
		log_err("fio: <%s> skips replay of %d writes due to"
			" read-only\n", td->o.name, writes);
		writes = 0;
	}

	if (!reads && !writes && !waits)
		return 1;
	else if (reads && !writes)
		td->o.td_ddir = TD_DDIR_READ;
	else if (!reads && writes)
		td->o.td_ddir = TD_DDIR_WRITE;
	else
		td->o.td_ddir = TD_DDIR_RW;

	return 0;
}
/*
 * Open the iolog, check its version, and call the appropriate parser.
 */
static int init_iolog_read(struct thread_data *td)
{
	char buffer[256], *p;
	FILE *f;
	int ret;

	f = fopen(td->o.read_iolog_file, "r");
	if (!f) {
		perror("fopen read iolog");
		return 1;
	}

	p = fgets(buffer, sizeof(buffer), f);
	if (!p) {
		td_verror(td, errno, "iolog read");
		log_err("fio: unable to read iolog\n");
		fclose(f);
		return 1;
	}

	/*
	 * Version 2 of the iolog stores a specific string as the
	 * first line, check for that.
	 */
	if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
		ret = read_iolog2(td, f);
	else {
		log_err("fio: iolog version 1 is no longer supported\n");
		ret = 1;
	}

	fclose(f);
	return ret;
}
/*
 * Set up a log for storing io patterns.
 */
static int init_iolog_write(struct thread_data *td)
{
	struct fio_file *ff;
	FILE *f;
	unsigned int i;

	f = fopen(td->o.write_iolog_file, "a");
	if (!f) {
		perror("fopen write iolog");
		return 1;
	}

	/*
	 * That's it for writing, setup a log buffer and we're done.
	 */
	td->iolog_f = f;
	td->iolog_buf = malloc(8192);
	setvbuf(f, td->iolog_buf, _IOFBF, 8192);

	/*
	 * write our version line
	 */
	if (fprintf(f, "%s\n", iolog_ver2) < 0) {
		perror("iolog init");
		return 1;
	}

	/*
	 * add all known files
	 */
	for_each_file(td, ff, i)
		log_file(td, ff, FIO_LOG_ADD_FILE);

	return 0;
}
int init_iolog(struct thread_data *td)
{
	int ret = 0;

	if (td->o.read_iolog_file) {
		int need_swap;

		/*
		 * Check if it's a blktrace file and load that if possible.
		 * Otherwise assume it's a normal log file and load that.
		 */
		if (is_blktrace(td->o.read_iolog_file, &need_swap))
			ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
		else
			ret = init_iolog_read(td);
	} else if (td->o.write_iolog_file)
		ret = init_iolog_write(td);

	if (ret)
		td_verror(td, EINVAL, "failed initializing iolog");

	return ret;
}
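
/*
 * Example job sections exercising the paths above (a sketch; these are
 * the standard fio option names, and the target device is illustrative):
 *
 *	[capture]
 *	rw=randwrite
 *	write_iolog=trace.log
 *
 *	[replay]
 *	read_iolog=trace.log
 *	replay_redirect=/dev/nullb0
 *	replay_align=4096
 *
 * read_iolog may also point at a binary blktrace capture, which is
 * handled by load_blktrace() above instead of read_iolog2().
 */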
void setup_log(struct io_log **log, struct log_params *p,
	       const char *filename)
{
	struct io_log *l;
	int i;
	struct io_u_plat_entry *entry;
	struct flist_head *list;

	l = scalloc(1, sizeof(*l));
	INIT_FLIST_HEAD(&l->io_logs);
	l->log_type = p->log_type;
	l->log_offset = p->log_offset;
	l->log_gz = p->log_gz;
	l->log_gz_store = p->log_gz_store;
	l->avg_msec = p->avg_msec;
	l->hist_msec = p->hist_msec;
	l->hist_coarseness = p->hist_coarseness;
	l->filename = strdup(filename);
	l->td = p->td;

	/* Initialize histogram lists for each r/w direction,
	 * with initial io_u_plat of all zeros:
	 */
	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
		list = &l->hist_window[i].list;
		INIT_FLIST_HEAD(list);
		entry = calloc(1, sizeof(struct io_u_plat_entry));
		flist_add(&entry->list, list);
	}

	if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
		struct io_logs *p;

		p = calloc(1, sizeof(*l->pending));
		p->max_samples = DEF_LOG_ENTRIES;
		p->log = calloc(p->max_samples, log_entry_sz(l));
		l->pending = p;
	}

	if (l->log_offset)
		l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;

	INIT_FLIST_HEAD(&l->chunk_list);

	if (l->log_gz && !p->td)
		l->log_gz = 0;
	else if (l->log_gz || l->log_gz_store) {
		mutex_init_pshared(&l->chunk_lock);
		p->td->flags |= TD_F_COMPRESS_LOG;
	}

	*log = l;
}
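
/*
 * A typical call site fills in the params and hands over the target file
 * name (a sketch; the values are illustrative, IO_LOG_TYPE_LAT is the
 * log type enum from stat.h):
 *
 *	struct log_params params = {
 *		.td		= td,
 *		.log_type	= IO_LOG_TYPE_LAT,
 *		.avg_msec	= 500,
 *	};
 *
 *	setup_log(&td->lat_log, &params, "job1_lat.log");
 */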
#ifdef CONFIG_SETVBUF
static void *set_file_buffer(FILE *f)
{
	size_t size = 1048576;
	void *buf;

	buf = malloc(size);
	setvbuf(f, buf, _IOFBF, size);
	return buf;
}

static void clear_file_buffer(void *buf)
{
	free(buf);
}
#else
static void *set_file_buffer(FILE *f)
{
	return NULL;
}

static void clear_file_buffer(void *buf)
{
}
#endif
void free_log(struct io_log *log)
{
	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);
		free(cur_log->log);
		free(cur_log);
	}

	if (log->pending) {
		free(log->pending->log);
		free(log->pending);
		log->pending = NULL;
	}

	free(log->filename);
	sfree(log);
}
inline unsigned long hist_sum(int j, int stride, unsigned int *io_u_plat,
			      unsigned int *io_u_plat_last)
{
	unsigned long sum;
	int k;

	if (io_u_plat_last) {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k] - io_u_plat_last[j + k];
	} else {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k];
	}

	return sum;
}
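
/*
 * Worked example (a sketch): with hist_coarseness=2 the caller below
 * uses stride = 1 << 2 = 4, so each output column is the sum of 4
 * adjacent io_u_plat bins, shrinking the FIO_IO_U_PLAT_NR columns to a
 * quarter. Passing io_u_plat_last yields per-window deltas instead of
 * cumulative counts.
 */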
static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
			       uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, j, nr_samples;
	struct io_u_plat_entry *entry, *entry_before;
	unsigned int *io_u_plat;
	unsigned int *io_u_plat_before;

	int stride = 1 << hist_coarseness;

	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		entry = s->data.plat_entry;
		io_u_plat = entry->io_u_plat;

		entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
		io_u_plat_before = entry_before->io_u_plat;

		fprintf(f, "%lu, %u, %u, ", (unsigned long) s->time,
						io_sample_ddir(s), s->bs);
		for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
			fprintf(f, "%lu, ", hist_sum(j, stride, io_u_plat,
						io_u_plat_before));
		}
		fprintf(f, "%lu\n", (unsigned long)
			hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
					io_u_plat_before));

		flist_del(&entry_before->list);
		free(entry_before);
	}
}
void flush_samples(FILE *f, void *samples, uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, nr_samples;

	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		if (!log_offset) {
			fprintf(f, "%lu, %" PRId64 ", %u, %u\n",
					(unsigned long) s->time,
					s->data.val,
					io_sample_ddir(s), s->bs);
		} else {
			struct io_sample_offset *so = (void *) s;

			fprintf(f, "%lu, %" PRId64 ", %u, %u, %llu\n",
					(unsigned long) s->time,
					s->data.val,
					io_sample_ddir(s), s->bs,
					(unsigned long long) so->offset);
		}
	}
}
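
/*
 * Example of the lines emitted above (a sketch): without log_offset a
 * sample flushes as
 *
 *	12000, 512, 0, 4096
 *
 * i.e. time (msec), value, ddir, block size; with log_offset set, the
 * io_sample_offset variant appends the offset as a fifth column. The
 * offset flag itself rides along in the high bit of __ddir
 * (LOG_OFFSET_SAMPLE_BIT), which is why it is masked off when reading.
 */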
#ifdef CONFIG_ZLIB

struct iolog_flush_data {
	struct workqueue_work work;
	struct io_log *log;
	void *samples;
	uint32_t nr_samples;
	bool free;
};

#define GZ_CHUNK	131072
static struct iolog_compress *get_new_chunk(unsigned int seq)
{
	struct iolog_compress *c;

	c = malloc(sizeof(*c));
	INIT_FLIST_HEAD(&c->list);
	c->buf = malloc(GZ_CHUNK);
	c->len = 0;
	c->seq = seq;
	return c;
}

static void free_chunk(struct iolog_compress *ic)
{
	free(ic->buf);
	free(ic);
}
static int z_stream_init(z_stream *stream, int gz_hdr)
{
	int wbits = 15;

	memset(stream, 0, sizeof(*stream));
	stream->zalloc = Z_NULL;
	stream->zfree = Z_NULL;
	stream->opaque = Z_NULL;
	stream->next_in = Z_NULL;

	/*
	 * zlib magic - add 32 for auto-detection of gz header or not,
	 * if we decide to store files in a gzip friendly format.
	 */
	if (gz_hdr)
		wbits += 32;

	if (inflateInit2(stream, wbits) != Z_OK)
		return 1;

	return 0;
}
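
/*
 * Per the zlib documentation, windowBits of 15 + 32 makes inflate()
 * auto-detect a zlib or gzip header, while plain 15 accepts only
 * zlib-wrapped data. So callers pass gz_hdr != 0 when the chunks may
 * have been stored in the gzip-friendly on-disk format
 * (log_gz_store).
 */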
struct inflate_chunk_iter {
	unsigned int seq;
	int err;
	void *buf;
	size_t buf_size;
	size_t buf_used;
	size_t chunk_sz;
};

static void finish_chunk(z_stream *stream, FILE *f,
			 struct inflate_chunk_iter *iter)
{
	int ret;

	ret = inflateEnd(stream);
	if (ret != Z_OK)
		log_err("fio: failed to end log inflation seq %d (%d)\n",
				iter->seq, ret);

	flush_samples(f, iter->buf, iter->buf_used);
	free(iter->buf);
	iter->buf = NULL;
	iter->buf_size = iter->buf_used = 0;
}
/*
 * Iterative chunk inflation. Handles cases where we cross into a new
 * sequence, doing a flush finish of the previous chunk if needed.
 */
static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
			    z_stream *stream, struct inflate_chunk_iter *iter)
{
	size_t ret;
	int err;

	dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
				(unsigned long) ic->len, ic->seq);

	if (ic->seq != iter->seq) {
		if (iter->seq)
			finish_chunk(stream, f, iter);

		z_stream_init(stream, gz_hdr);
		iter->seq = ic->seq;
	}

	stream->avail_in = ic->len;
	stream->next_in = ic->buf;

	if (!iter->buf_size) {
		iter->buf_size = iter->chunk_sz;
		iter->buf = malloc(iter->buf_size);
	}

	while (stream->avail_in) {
		size_t this_out = iter->buf_size - iter->buf_used;

		stream->avail_out = this_out;
		stream->next_out = iter->buf + iter->buf_used;

		err = inflate(stream, Z_NO_FLUSH);
		if (err < 0) {
			log_err("fio: failed inflating log: %d\n", err);
			iter->err = err;
			break;
		}

		iter->buf_used += this_out - stream->avail_out;

		if (!stream->avail_out) {
			iter->buf_size += iter->chunk_sz;
			iter->buf = realloc(iter->buf, iter->buf_size);
			continue;
		}

		if (err == Z_STREAM_END)
			break;
	}

	ret = (void *) stream->next_in - ic->buf;

	dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);

	return ret;
}
/*
 * Inflate stored compressed chunks, or write them directly to the log
 * file if so instructed.
 */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
	z_stream stream;

	while (!flist_empty(&log->chunk_list)) {
		struct iolog_compress *ic;

		ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
		flist_del(&ic->list);

		if (log->log_gz_store) {
			size_t ret;

			dprint(FD_COMPRESS, "log write chunk size=%lu, "
				"seq=%u\n", (unsigned long) ic->len, ic->seq);

			ret = fwrite(ic->buf, ic->len, 1, f);
			if (ret != 1 || ferror(f)) {
				iter.err = errno;
				log_err("fio: error writing compressed log\n");
			}
		} else
			inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);

		free_chunk(ic);
	}

	if (iter.seq) {
		finish_chunk(&stream, f, &iter);
		free(iter.buf);
	}

	return iter.err;
}
/*
 * Open the compressed log file, decompress the stored chunks, and
 * write them to stdout. The chunks are stored sequentially in the
 * file, so we iterate over them and do them one-by-one.
 */
int iolog_file_inflate(const char *file)
{
	struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
	struct iolog_compress ic;
	z_stream stream;
	struct stat sb;
	size_t ret;
	size_t total;
	void *buf;
	FILE *f;

	f = fopen(file, "r");
	if (!f) {
		perror("fopen");
		return 1;
	}

	if (stat(file, &sb) < 0) {
		fclose(f);
		perror("stat");
		return 1;
	}

	ic.buf = buf = malloc(sb.st_size);
	ic.len = sb.st_size;
	ic.seq = 1;

	ret = fread(ic.buf, ic.len, 1, f);
	if (ret == 0 && ferror(f)) {
		perror("fread");
		fclose(f);
		free(buf);
		return 1;
	} else if (ret != 1) {
		log_err("fio: short read on reading log\n");
		fclose(f);
		free(buf);
		return 1;
	}

	fclose(f);

	/*
	 * Each chunk will return Z_STREAM_END. We don't know how many
	 * chunks are in the file, so we just keep looping and incrementing
	 * the sequence number until we have consumed the whole compressed
	 * file.
	 */
	total = ic.len;
	do {
		size_t iret;

		iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
		total -= iret;
		if (!total)
			break;
		if (iter.err)
			break;

		ic.seq++;
		ic.len = total;
		ic.buf += iret;
	} while (1);

	if (iter.seq) {
		finish_chunk(&stream, stdout, &iter);
		free(iter.buf);
	}

	free(buf);
	return iter.err;
}
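
/*
 * This is what backs decompressing a stored log from the command line,
 * e.g. (a sketch; the .fz suffix is what fio appends for logs stored
 * with log_store_compressed):
 *
 *	$ fio --inflate-log=job1_lat.log.fz > job1_lat.log
 */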
#else

static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	return 0;
}

int iolog_file_inflate(const char *file)
{
	log_err("fio: log inflation not possible without zlib\n");
	return 1;
}

#endif
void flush_log(struct io_log *log, bool do_append)
{
	void *buf;
	FILE *f;

	if (!do_append)
		f = fopen(log->filename, "w");
	else
		f = fopen(log->filename, "a");
	if (!f) {
		perror("fopen log");
		return;
	}

	buf = set_file_buffer(f);

	inflate_gz_chunks(log, f);

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		if (log->td && log == log->td->clat_hist_log)
			flush_hist_samples(f, log->hist_coarseness, cur_log->log,
					   log_sample_sz(log, cur_log));
		else
			flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));

		free(cur_log);
	}

	fclose(f);
	clear_file_buffer(buf);
}
static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
{
	if (td->flags & TD_F_COMPRESS_LOG)
		iolog_flush(log);

	if (trylock) {
		if (fio_trylock_file(log->filename))
			return 1;
	} else
		fio_lock_file(log->filename);

	if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
		fio_send_iolog(td, log, log->filename);
	else
		flush_log(log, !td->o.per_job_logs);

	fio_unlock_file(log->filename);
	free_log(log);
	return 0;
}
size_t log_chunk_sizes(struct io_log *log)
{
	struct flist_head *entry;
	size_t ret;

	if (flist_empty(&log->chunk_list))
		return 0;

	ret = 0;
	pthread_mutex_lock(&log->chunk_lock);
	flist_for_each(entry, &log->chunk_list) {
		struct iolog_compress *c;

		c = flist_entry(entry, struct iolog_compress, list);
		ret += c->len;
	}
	pthread_mutex_unlock(&log->chunk_lock);
	return ret;
}
#ifdef CONFIG_ZLIB

static int gz_work(struct iolog_flush_data *data)
{
	struct iolog_compress *c = NULL;
	struct flist_head list;
	unsigned int seq;
	z_stream stream;
	size_t total = 0;
	int ret;

	INIT_FLIST_HEAD(&list);

	memset(&stream, 0, sizeof(stream));
	stream.zalloc = Z_NULL;
	stream.zfree = Z_NULL;
	stream.opaque = Z_NULL;

	ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
	if (ret != Z_OK) {
		log_err("fio: failed to init gz stream\n");
		goto err;
	}

	seq = ++data->log->chunk_seq;

	stream.next_in = (void *) data->samples;
	stream.avail_in = data->nr_samples * log_entry_sz(data->log);

	dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
				(unsigned long) stream.avail_in, seq,
				data->log->filename);
	do {
		if (c)
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		c = get_new_chunk(seq);
		stream.avail_out = GZ_CHUNK;
		stream.next_out = c->buf;
		ret = deflate(&stream, Z_NO_FLUSH);
		if (ret < 0) {
			log_err("fio: deflate log (%d)\n", ret);
			free_chunk(c);
			goto err;
		}

		c->len = GZ_CHUNK - stream.avail_out;
		flist_add_tail(&c->list, &list);
		total += c->len;
	} while (stream.avail_in);

	stream.next_out = c->buf + c->len;
	stream.avail_out = GZ_CHUNK - c->len;

	ret = deflate(&stream, Z_FINISH);
	if (ret < 0) {
		/*
		 * Z_BUF_ERROR is special, it just means we need more
		 * output space. We'll handle that below. Treat any other
		 * error as fatal.
		 */
		if (ret != Z_BUF_ERROR) {
			log_err("fio: deflate log (%d)\n", ret);
			flist_del(&c->list);
			free_chunk(c);
			goto err;
		}
	}

	c->len = GZ_CHUNK - stream.avail_out;

	dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);

	if (ret != Z_STREAM_END) {
		do {
			c = get_new_chunk(seq);
			stream.avail_out = GZ_CHUNK;
			stream.next_out = c->buf;
			ret = deflate(&stream, Z_FINISH);
			c->len = GZ_CHUNK - stream.avail_out;
			total += c->len;
			flist_add_tail(&c->list, &list);
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		} while (ret != Z_STREAM_END);
	}

	dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);

	ret = deflateEnd(&stream);
	if (ret != Z_OK)
		log_err("fio: deflateEnd %d\n", ret);

	free(data->samples);

	if (!flist_empty(&list)) {
		pthread_mutex_lock(&data->log->chunk_lock);
		flist_splice_tail(&list, &data->log->chunk_list);
		pthread_mutex_unlock(&data->log->chunk_lock);
	}

	ret = 0;
done:
	if (data->free)
		free(data);
	return ret;
err:
	while (!flist_empty(&list)) {
		c = flist_first_entry(list.next, struct iolog_compress, list);
		flist_del(&c->list);
		free_chunk(c);
	}
	ret = 1;
	goto done;
}
/*
 * Invoked from our compress helper thread, when logging would have exceeded
 * the specified memory limitation. Compresses the previously stored
 * entries.
 */
static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
{
	return gz_work(container_of(work, struct iolog_flush_data, work));
}

static int gz_init_worker(struct submit_worker *sw)
{
	struct thread_data *td = sw->wq->td;

	if (!fio_option_is_set(&td->o, log_gz_cpumask))
		return 0;

	if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
		log_err("gz: failed to set CPU affinity\n");
		return 1;
	}

	return 0;
}

static struct workqueue_ops log_compress_wq_ops = {
	.fn		= gz_work_async,
	.init_worker_fn	= gz_init_worker,
	.nice		= 1,
};
int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return 0;

	workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
	return 0;
}
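
/*
 * The compression paths above are driven by the job options (a sketch;
 * these are the standard fio option names, values illustrative):
 *
 *	log_compression=10M		; compress once an in-memory log tops 10M
 *	log_store_compressed=1		; keep the flushed log compressed on disk
 *	log_compression_cpus=0-3	; CPUs for the gz workqueue (log_gz_cpumask)
 */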
void iolog_compress_exit(struct thread_data *td)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return;

	workqueue_exit(&td->log_compress_wq);
}
/*
 * Queue a work item to compress the existing log entries. We reset the
 * current log to a small size, and reference the existing log in the
 * data that we queue for compression. Once compression has been done,
 * this old log is freed.
 */
static int iolog_flush(struct io_log *log)
{
	struct iolog_flush_data *data;

	data = malloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;
	data->free = false;

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		data->samples = cur_log->log;
		data->nr_samples = cur_log->nr_samples;

		sfree(cur_log);

		gz_work(data);
	}

	free(data);
	return 0;
}
int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	struct iolog_flush_data *data;

	data = malloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;
	data->free = true;

	data->samples = cur_log->log;
	data->nr_samples = cur_log->nr_samples;

	cur_log->nr_samples = cur_log->max_samples = 0;
	cur_log->log = NULL;

	workqueue_enqueue(&log->td->log_compress_wq, &data->work);
	return 0;
}
#else

static int iolog_flush(struct io_log *log)
{
	return 1;
}

int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	return 1;
}

int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
}

#endif
struct io_logs *iolog_cur_log(struct io_log *log)
{
	if (flist_empty(&log->io_logs))
		return NULL;

	return flist_last_entry(&log->io_logs, struct io_logs, list);
}
uint64_t iolog_nr_samples(struct io_log *iolog)
{
	struct flist_head *entry;
	uint64_t ret = 0;

	flist_for_each(entry, &iolog->io_logs) {
		struct io_logs *cur_log;

		cur_log = flist_entry(entry, struct io_logs, list);
		ret += cur_log->nr_samples;
	}

	return ret;
}
static int __write_log(struct thread_data *td, struct io_log *log, int try)
{
	if (log)
		return finish_log(td, log, try);

	return 0;
}
static int write_iops_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->iops_log) != unit_log)
		return 0;

	ret = __write_log(td, td->iops_log, try);
	if (!ret)
		td->iops_log = NULL;

	return ret;
}

static int write_slat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->slat_log, try);
	if (!ret)
		td->slat_log = NULL;

	return ret;
}

static int write_clat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_log, try);
	if (!ret)
		td->clat_log = NULL;

	return ret;
}

static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_hist_log, try);
	if (!ret)
		td->clat_hist_log = NULL;

	return ret;
}

static int write_lat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->lat_log, try);
	if (!ret)
		td->lat_log = NULL;

	return ret;
}

static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->bw_log) != unit_log)
		return 0;

	ret = __write_log(td, td->bw_log, try);
	if (!ret)
		td->bw_log = NULL;

	return ret;
}
enum {
	BW_LOG_MASK	= 1,
	LAT_LOG_MASK	= 2,
	SLAT_LOG_MASK	= 4,
	CLAT_LOG_MASK	= 8,
	IOPS_LOG_MASK	= 16,
	CLAT_HIST_LOG_MASK = 32,

	ALL_LOG_NR	= 6,
};

struct log_type {
	unsigned int mask;
	int (*fn)(struct thread_data *, int, bool);
};

static struct log_type log_types[] = {
	{
		.mask	= BW_LOG_MASK,
		.fn	= write_bandw_log,
	},
	{
		.mask	= LAT_LOG_MASK,
		.fn	= write_lat_log,
	},
	{
		.mask	= SLAT_LOG_MASK,
		.fn	= write_slat_log,
	},
	{
		.mask	= CLAT_LOG_MASK,
		.fn	= write_clat_log,
	},
	{
		.mask	= IOPS_LOG_MASK,
		.fn	= write_iops_log,
	},
	{
		.mask	= CLAT_HIST_LOG_MASK,
		.fn	= write_clat_hist_log,
	},
};
void td_writeout_logs(struct thread_data *td, bool unit_logs)
{
	unsigned int log_mask = 0;
	unsigned int log_left = ALL_LOG_NR;
	int old_state, i;

	old_state = td_bump_runstate(td, TD_FINISHING);

	finalize_logs(td, unit_logs);

	while (log_left) {
		int prev_log_left = log_left;

		for (i = 0; i < ALL_LOG_NR && log_left; i++) {
			struct log_type *lt = &log_types[i];
			int ret;

			if (!(log_mask & lt->mask)) {
				ret = lt->fn(td, log_left != 1, unit_logs);
				if (!ret) {
					log_left--;
					log_mask |= lt->mask;
				}
			}
		}

		if (prev_log_left == log_left)
			usleep(5000);
	}

	td_restore_runstate(td, old_state);
}
void fio_writeout_logs(bool unit_logs)
{
	struct thread_data *td;
	int i;

	for_each_td(td, i)
		td_writeout_logs(td, unit_logs);
}