2 * Code related to writing an iolog of what a thread is doing, and to
3 * later read that back and replay
/* Forward declaration: iolog_flush is defined later (zlib-dependent section). */
24 static int iolog_flush(struct io_log *log);
/* Magic first line of a version-2 iolog file; checked by init_iolog_read(). */
26 static const char iolog_ver2[] = "fio version 2 iolog";
/*
 * Append an io_piece to the thread's replay queue and account its length
 * toward the total I/O size to be replayed.
 */
28 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
30 flist_add_tail(&ipo->list, &td->io_log_list);
31 td->total_io_size += ipo->len;
/*
 * Record one completed io_u in the write iolog as
 * "<file> <ddir> <offset> <buflen>". No-op unless write_iolog_file is set.
 */
34 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
36 if (!td->o.write_iolog_file)
39 fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
40 io_ddir_name(io_u->ddir),
41 io_u->offset, io_u->buflen);
/*
 * Record a file-level event (add/open/close) in the write iolog.
 * 'what' indexes the act[] table, so it must stay in sync with
 * enum file_log_act. No-op unless write_iolog_file is set.
 */
44 void log_file(struct thread_data *td, struct fio_file *f,
45 enum file_log_act what)
47 const char *act[] = { "add", "open", "close" };
51 if (!td->o.write_iolog_file)
56 * this happens on the pre-open/close done before the job starts
61 fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
/*
 * Sleep for the inter-I/O delay recorded in the iolog, compensating for
 * time already spent since the last issue (td->time_offset carries over
 * any oversleep from the previous delay). Sleeps in chunks so that
 * td->terminate is honored promptly.
 */
64 static void iolog_delay(struct thread_data *td, unsigned long delay)
66 uint64_t usec = utime_since_now(&td->last_issue);
70 if (delay < td->time_offset) {
75 delay -= td->time_offset;
81 fio_gettime(&ts, NULL);
82 while (delay && !td->terminate) {
/* cap individual sleeps (500ms here) so termination is checked regularly */
84 if (this_delay > 500000)
87 usec_sleep(td, this_delay);
/* measure how long we actually slept; excess becomes the next time_offset */
91 usec = utime_since_now(&ts);
93 td->time_offset = usec - delay;
/*
 * Handle a "special" replay entry, i.e. a file action (open/close/unlink)
 * rather than real I/O. Returns non-zero for entries the caller should
 * not turn into an io_u (exact return contract not fully visible here —
 * NOTE(review): confirm against read_iolog_get()'s use of the result).
 */
98 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
/* real I/O entries (ddir != DDIR_INVAL) are not special */
106 if (ipo->ddir != DDIR_INVAL)
109 f = td->files[ipo->fileno];
111 switch (ipo->file_action) {
112 case FIO_LOG_OPEN_FILE:
/* with replay_redirect every log file maps to one file; skip re-opens */
113 if (td->o.replay_redirect && fio_file_open(f)) {
114 dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
118 ret = td_io_open_file(td, f);
121 td_verror(td, ret, "iolog open file");
123 case FIO_LOG_CLOSE_FILE:
124 td_io_close_file(td, f);
126 case FIO_LOG_UNLINK_FILE:
127 td_io_unlink_file(td, f);
130 log_err("fio: bad file action %d\n", ipo->file_action);
/*
 * Pop the next entry off the replay queue and fill in io_u with it.
 * File actions are dispatched via ipo_special(); DDIR_WAIT entries
 * sleep instead of producing I/O; recorded per-entry delays are honored
 * via iolog_delay().
 */
137 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
139 struct io_piece *ipo;
140 unsigned long elapsed;
142 while (!flist_empty(&td->io_log_list)) {
145 ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
146 flist_del(&ipo->list);
147 remove_trim_entry(td, ipo);
/* file-action entries are handled here and never reach the io_u */
149 ret = ipo_special(td, ipo);
153 } else if (ret > 0) {
158 io_u->ddir = ipo->ddir;
159 if (ipo->ddir != DDIR_WAIT) {
160 io_u->offset = ipo->offset;
161 io_u->buflen = ipo->len;
162 io_u->file = td->files[ipo->fileno];
163 get_file(io_u->file);
164 dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
165 io_u->buflen, io_u->file->file_name);
167 iolog_delay(td, ipo->delay);
/* DDIR_WAIT: ipo->delay is an absolute msec-since-genesis target here */
169 elapsed = mtime_since_genesis();
170 if (ipo->delay > elapsed)
171 usec_sleep(td, (ipo->delay - elapsed) * 1000);
176 if (io_u->ddir != DDIR_WAIT)
/*
 * Drop all stored io pieces: both the rb-tree used for sorted verify
 * history and the plain history list. Each entry is also removed from
 * the trim list before being discarded.
 */
184 void prune_io_piece_log(struct thread_data *td)
186 struct io_piece *ipo;
189 while ((n = rb_first(&td->io_hist_tree)) != NULL) {
190 ipo = rb_entry(n, struct io_piece, rb_node);
191 rb_erase(n, &td->io_hist_tree);
192 remove_trim_entry(td, ipo);
197 while (!flist_empty(&td->io_hist_list)) {
198 ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
199 flist_del(&ipo->list);
200 remove_trim_entry(td, ipo);
207 * log a successful write, so we can unwind the log for verify
209 void log_io_piece(struct thread_data *td, struct io_u *io_u)
211 struct rb_node **p, *parent;
212 struct io_piece *ipo, *__ipo;
/* NOTE(review): malloc result is used unchecked below — OOM aborts elsewhere? */
214 ipo = malloc(sizeof(struct io_piece));
216 ipo->file = io_u->file;
217 ipo->offset = io_u->offset;
218 ipo->len = io_u->buflen;
219 ipo->numberio = io_u->numberio;
220 ipo->flags = IP_F_IN_FLIGHT;
224 if (io_u_should_trim(td, io_u)) {
225 flist_add_tail(&ipo->trim_list, &td->trim_list);
230 * We don't need to sort the entries, if:
232 * Sequential writes, or
233 * Random writes that lay out the file as it goes along
235 * For both these cases, just reading back data in the order we
236 * wrote it out is the fastest.
238 * One exception is if we don't have a random map AND we are doing
239 * verifies, in that case we need to check for duplicate blocks and
240 * drop the old one, which we rely on the rb insert/lookup for
/* fast path: plain FIFO history list, no sorting needed */
243 if (((!td->o.verifysort) || !td_random(td) || !td->o.overwrite) &&
244 (file_randommap(td, ipo->file) || td->o.verify == VERIFY_NONE)) {
245 INIT_FLIST_HEAD(&ipo->list);
246 flist_add_tail(&ipo->list, &td->io_hist_list);
247 ipo->flags |= IP_F_ONLIST;
252 RB_CLEAR_NODE(&ipo->rb_node);
255 * Sort the entry into the verification list
/* rb-tree insert keyed by (file, offset); overlapping older entries are
 * erased so verify sees only the most recent data for a block */
258 p = &td->io_hist_tree.rb_node;
264 __ipo = rb_entry(parent, struct io_piece, rb_node);
265 if (ipo->file < __ipo->file)
267 else if (ipo->file > __ipo->file)
269 else if (ipo->offset < __ipo->offset) {
271 overlap = ipo->offset + ipo->len > __ipo->offset;
273 else if (ipo->offset > __ipo->offset) {
275 overlap = __ipo->offset + __ipo->len > ipo->offset;
281 dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
282 __ipo->offset, __ipo->len,
283 ipo->offset, ipo->len);
285 rb_erase(parent, &td->io_hist_tree);
286 remove_trim_entry(td, __ipo);
/* only free the overlapped entry if no I/O still references it */
287 if (!(__ipo->flags & IP_F_IN_FLIGHT))
293 rb_link_node(&ipo->rb_node, parent, p);
294 rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
295 ipo->flags |= IP_F_ONRB;
/*
 * Undo log_io_piece() for a failed io_u: mark the block's state as a
 * trim/write failure for block-info tracking, then unlink the io_piece
 * from whichever structure (rb-tree or list) it was stored on.
 */
299 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
301 struct io_piece *ipo = io_u->ipo;
303 if (td->ts.nr_block_infos) {
304 uint32_t *info = io_u_block_info(td, io_u);
/* only escalate state; never downgrade past an existing failure state */
305 if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
306 if (io_u->ddir == DDIR_TRIM)
307 *info = BLOCK_INFO_SET_STATE(*info,
308 BLOCK_STATE_TRIM_FAILURE);
309 else if (io_u->ddir == DDIR_WRITE)
310 *info = BLOCK_INFO_SET_STATE(*info,
311 BLOCK_STATE_WRITE_FAILURE);
318 if (ipo->flags & IP_F_ONRB)
319 rb_erase(&ipo->rb_node, &td->io_hist_tree);
320 else if (ipo->flags & IP_F_ONLIST)
321 flist_del(&ipo->list);
/*
 * Shrink the logged io_piece to the number of bytes actually transferred
 * (xfer_buflen minus residual) after a short completion.
 */
328 void trim_io_piece(struct thread_data *td, const struct io_u *io_u)
330 struct io_piece *ipo = io_u->ipo;
335 ipo->len = io_u->xfer_buflen - io_u->resid;
/* Close the write iolog and release its stdio buffer (set in init_iolog_write). */
338 void write_iolog_close(struct thread_data *td)
344 td->iolog_buf = NULL;
348 * Read version 2 iolog data. It is enhanced to include per-file logging,
351 static int read_iolog2(struct thread_data *td, FILE *f)
353 unsigned long long offset;
355 int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
356 char *rfname, *fname, *act;
/* drop any files picked up from the job options; the log defines them */
360 free_release_files(td);
363 * Read in the read iolog and store it, reuse the infrastructure
364 * for doing verifications.
/* +16 slack over the %256s sscanf width for the NUL and safety margin */
367 rfname = fname = malloc(256+16);
368 act = malloc(256+16);
370 reads = writes = waits = 0;
371 while ((p = fgets(str, 4096, f)) != NULL) {
372 struct io_piece *ipo;
/* per-line format: "<file> <action> <offset> <len>"; r tells how many matched */
375 r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
/* replay_redirect collapses every logged filename onto one target file */
378 if (td->o.replay_redirect)
379 fname = td->o.replay_redirect;
385 if (!strcmp(act, "wait"))
387 else if (!strcmp(act, "read"))
389 else if (!strcmp(act, "write"))
391 else if (!strcmp(act, "sync"))
393 else if (!strcmp(act, "datasync"))
395 else if (!strcmp(act, "trim"))
398 log_err("fio: bad iolog file action: %s\n",
402 fileno = get_fileno(td, fname);
405 if (!strcmp(act, "add")) {
406 if (td->o.replay_redirect &&
407 get_fileno(td, fname) != -1) {
408 dprint(FD_FILE, "iolog: ignoring"
409 " re-add of file %s\n", fname);
411 fileno = add_file(td, fname, 0, 1);
412 file_action = FIO_LOG_ADD_FILE;
415 } else if (!strcmp(act, "open")) {
416 fileno = get_fileno(td, fname);
417 file_action = FIO_LOG_OPEN_FILE;
418 } else if (!strcmp(act, "close")) {
419 fileno = get_fileno(td, fname);
420 file_action = FIO_LOG_CLOSE_FILE;
422 log_err("fio: bad iolog file action: %s\n",
427 log_err("bad iolog2: %s\n", p);
433 else if (rw == DDIR_WRITE) {
435 * Don't add a write for ro mode
440 } else if (rw == DDIR_WAIT) {
444 } else if (rw == DDIR_INVAL) {
445 } else if (!ddir_sync(rw)) {
446 log_err("bad ddir: %d\n", rw);
/* valid entry: build an io_piece and queue it for replay */
453 ipo = malloc(sizeof(*ipo));
456 if (rw == DDIR_WAIT) {
459 if (td->o.replay_scale)
460 ipo->offset = offset / td->o.replay_scale;
462 ipo->offset = offset;
463 ipo_bytes_align(td->o.replay_align, ipo);
/* track the largest I/O so max_bs covers everything in the log */
466 if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
467 td->o.max_bs[rw] = bytes;
468 ipo->fileno = fileno;
469 ipo->file_action = file_action;
473 queue_io_piece(td, ipo);
480 if (writes && read_only) {
481 log_err("fio: <%s> skips replay of %d writes due to"
482 " read-only\n", td->o.name, writes);
/* derive the job's data direction from what the log actually contains */
486 if (!reads && !writes && !waits)
488 else if (reads && !writes)
489 td->o.td_ddir = TD_DDIR_READ;
490 else if (!reads && writes)
491 td->o.td_ddir = TD_DDIR_WRITE;
493 td->o.td_ddir = TD_DDIR_RW;
499 * open iolog, check version, and call appropriate parser
501 static int init_iolog_read(struct thread_data *td)
503 char buffer[256], *p;
507 f = fopen(td->o.read_iolog_file, "r");
509 perror("fopen read iolog");
513 p = fgets(buffer, sizeof(buffer), f);
515 td_verror(td, errno, "iolog read");
516 log_err("fio: unable to read iolog\n");
522 * version 2 of the iolog stores a specific string as the
523 * first line, check for that
525 if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
526 ret = read_iolog2(td, f);
528 log_err("fio: iolog version 1 is no longer supported\n");
537 * Set up a log for storing io patterns.
539 static int init_iolog_write(struct thread_data *td)
/* append mode, so multiple jobs writing the same log file don't clobber it */
545 f = fopen(td->o.write_iolog_file, "a");
547 perror("fopen write iolog");
552 * That's it for writing, setup a log buffer and we're done.
/* fully-buffered stdio (8k) to avoid a syscall per logged I/O */
555 td->iolog_buf = malloc(8192);
556 setvbuf(f, td->iolog_buf, _IOFBF, 8192);
559 * write our version line
561 if (fprintf(f, "%s\n", iolog_ver2) < 0) {
562 perror("iolog init\n");
567 * add all known files
569 for_each_file(td, ff, i)
570 log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * Entry point for iolog setup: dispatch to blktrace loading, iolog
 * reading, or iolog writing depending on the job options.
 */
575 int init_iolog(struct thread_data *td)
579 if (td->o.read_iolog_file) {
583 * Check if it's a blktrace file and load that if possible.
584 * Otherwise assume it's a normal log file and load that.
586 if (is_blktrace(td->o.read_iolog_file, &need_swap))
587 ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
589 ret = init_iolog_read(td);
590 } else if (td->o.write_iolog_file)
591 ret = init_iolog_write(td);
594 td_verror(td, EINVAL, "failed initializing iolog");
/*
 * Allocate and initialize an io_log from the given parameters.
 * Uses shared memory (scalloc) so the log survives across fork-based
 * job processes. Also seeds per-direction histogram windows and, when
 * compression is requested, the chunk list and its locks.
 */
599 void setup_log(struct io_log **log, struct log_params *p,
600 const char *filename)
604 struct io_u_plat_entry *entry;
605 struct flist_head *list;
607 l = scalloc(1, sizeof(*l));
608 INIT_FLIST_HEAD(&l->io_logs);
609 l->log_type = p->log_type;
610 l->log_offset = p->log_offset;
611 l->log_gz = p->log_gz;
612 l->log_gz_store = p->log_gz_store;
613 l->avg_msec = p->avg_msec;
614 l->hist_msec = p->hist_msec;
615 l->hist_coarseness = p->hist_coarseness;
616 l->filename = strdup(filename);
619 /* Initialize histogram lists for each r/w direction,
620 * with initial io_u_plat of all zeros:
622 for (i = 0; i < DDIR_RWDIR_CNT; i++) {
623 list = &l->hist_window[i].list;
624 INIT_FLIST_HEAD(list);
625 entry = calloc(1, sizeof(struct io_u_plat_entry));
626 flist_add(&entry->list, list);
/* non-offload submit mode pre-allocates a pending-sample buffer */
629 if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
632 p = calloc(1, sizeof(*l->pending));
633 p->max_samples = DEF_LOG_ENTRIES;
634 p->log = calloc(p->max_samples, log_entry_sz(l));
/* offset logging is flagged in the ddir field of every sample */
639 l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
641 INIT_FLIST_HEAD(&l->chunk_list);
643 if (l->log_gz && !p->td)
645 else if (l->log_gz || l->log_gz_store) {
/* pshared mutexes: chunk list is touched from helper/compress contexts */
646 mutex_init_pshared(&l->chunk_lock);
647 mutex_init_pshared(&l->deferred_free_lock);
648 p->td->flags |= TD_F_COMPRESS_LOG;
#ifdef CONFIG_SETVBUF
/* Give the log FILE a large (1MB) stdio buffer; returns it for later free. */
655 static void *set_file_buffer(FILE *f)
657 size_t size = 1048576;
661 setvbuf(f, buf, _IOFBF, size);
665 static void clear_file_buffer(void *buf)
/* No-setvbuf fallbacks: same interface, no buffering work. */
670 static void *set_file_buffer(FILE *f)
675 static void clear_file_buffer(void *buf)
/* Release an io_log: drain and free each io_logs entry, then pending state. */
680 void free_log(struct io_log *log)
682 while (!flist_empty(&log->io_logs)) {
683 struct io_logs *cur_log;
685 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
686 flist_del_init(&cur_log->list);
692 free(log->pending->log);
/*
 * Sum 'stride' histogram buckets starting at index j. When a previous
 * snapshot (io_u_plat_last) is given, return the delta since that
 * snapshot instead of the absolute counts.
 */
702 unsigned long hist_sum(int j, int stride, unsigned int *io_u_plat,
703 unsigned int *io_u_plat_last)
708 if (io_u_plat_last) {
709 for (k = sum = 0; k < stride; k++)
710 sum += io_u_plat[j + k] - io_u_plat_last[j + k];
712 for (k = sum = 0; k < stride; k++)
713 sum += io_u_plat[j + k];
/*
 * Write latency-histogram samples to f as CSV: time, ddir, bs, then one
 * delta-sum per coarsened bucket group (group width = 2^hist_coarseness).
 * Each sample's plat entry is diffed against the previous snapshot, which
 * is then unlinked and freed.
 */
719 static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
720 uint64_t sample_size)
724 uint64_t i, j, nr_samples;
725 struct io_u_plat_entry *entry, *entry_before;
726 unsigned int *io_u_plat;
727 unsigned int *io_u_plat_before;
729 int stride = 1 << hist_coarseness;
/* probe the first sample to learn whether offsets are logged (entry size) */
734 s = __get_sample(samples, 0, 0);
735 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
737 nr_samples = sample_size / __log_entry_sz(log_offset);
739 for (i = 0; i < nr_samples; i++) {
740 s = __get_sample(samples, log_offset, i);
742 entry = s->data.plat_entry;
743 io_u_plat = entry->io_u_plat;
745 entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
746 io_u_plat_before = entry_before->io_u_plat;
748 fprintf(f, "%lu, %u, %u, ", (unsigned long) s->time,
749 io_sample_ddir(s), s->bs);
750 for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
751 fprintf(f, "%lu, ", hist_sum(j, stride, io_u_plat,
/* last bucket group terminates the line (no trailing comma) */
754 fprintf(f, "%lu\n", (unsigned long)
755 hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
758 flist_del(&entry_before->list);
/*
 * Write plain log samples to f as CSV: "time, value, ddir, bs" plus a
 * trailing offset column when offset logging is enabled for this log.
 */
763 void flush_samples(FILE *f, void *samples, uint64_t sample_size)
767 uint64_t i, nr_samples;
/* first sample tells us the entry size (offset bit changes the layout) */
772 s = __get_sample(samples, 0, 0);
773 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
775 nr_samples = sample_size / __log_entry_sz(log_offset);
777 for (i = 0; i < nr_samples; i++) {
778 s = __get_sample(samples, log_offset, i);
781 fprintf(f, "%lu, %" PRId64 ", %u, %u\n",
782 (unsigned long) s->time,
784 io_sample_ddir(s), s->bs);
786 struct io_sample_offset *so = (void *) s;
788 fprintf(f, "%lu, %" PRId64 ", %u, %u, %llu\n",
789 (unsigned long) s->time,
791 io_sample_ddir(s), s->bs,
792 (unsigned long long) so->offset);
/* Work item handed to the compression workqueue: the samples to deflate. */
799 struct iolog_flush_data {
800 struct workqueue_work work;
/* 128k deflate output chunk size */
807 #define GZ_CHUNK 131072
/* Allocate a GZ_CHUNK-sized compressed-output chunk tagged with seq. */
809 static struct iolog_compress *get_new_chunk(unsigned int seq)
811 struct iolog_compress *c;
813 c = malloc(sizeof(*c));
814 INIT_FLIST_HEAD(&c->list);
815 c->buf = malloc(GZ_CHUNK);
/* Free a chunk's buffer and the chunk itself. */
821 static void free_chunk(struct iolog_compress *ic)
/*
 * Prepare a zlib stream for inflation. gz_hdr selects auto-detection of
 * a gzip header (zlib's "+32" windowBits trick) for gz-stored logs.
 */
827 static int z_stream_init(z_stream *stream, int gz_hdr)
831 memset(stream, 0, sizeof(*stream));
832 stream->zalloc = Z_NULL;
833 stream->zfree = Z_NULL;
834 stream->opaque = Z_NULL;
835 stream->next_in = Z_NULL;
838 * zlib magic - add 32 for auto-detection of gz header or not,
839 * if we decide to store files in a gzip friendly format.
844 if (inflateInit2(stream, wbits) != Z_OK)
/* Carries inflate state (buffer, sizes, current seq) across chunks. */
850 struct inflate_chunk_iter {
/*
 * End the current inflate sequence: tear down the zlib stream, flush the
 * decoded samples to f, and reset the iterator's buffer accounting.
 */
859 static void finish_chunk(z_stream *stream, FILE *f,
860 struct inflate_chunk_iter *iter)
864 ret = inflateEnd(stream);
866 log_err("fio: failed to end log inflation seq %d (%d)\n",
869 flush_samples(f, iter->buf, iter->buf_used);
872 iter->buf_size = iter->buf_used = 0;
876 * Iterative chunk inflation. Handles cases where we cross into a new
877 * sequence, doing flush finish of previous chunk if needed.
879 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
880 z_stream *stream, struct inflate_chunk_iter *iter)
884 dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
885 (unsigned long) ic->len, ic->seq);
/* new sequence: flush what we have and re-init the stream */
887 if (ic->seq != iter->seq) {
889 finish_chunk(stream, f, iter);
891 z_stream_init(stream, gz_hdr);
895 stream->avail_in = ic->len;
896 stream->next_in = ic->buf;
898 if (!iter->buf_size) {
899 iter->buf_size = iter->chunk_sz;
900 iter->buf = malloc(iter->buf_size);
903 while (stream->avail_in) {
904 size_t this_out = iter->buf_size - iter->buf_used;
907 stream->avail_out = this_out;
908 stream->next_out = iter->buf + iter->buf_used;
910 err = inflate(stream, Z_NO_FLUSH);
912 log_err("fio: failed inflating log: %d\n", err);
917 iter->buf_used += this_out - stream->avail_out;
/* output buffer full: grow by chunk_sz and keep inflating */
919 if (!stream->avail_out) {
920 iter->buf_size += iter->chunk_sz;
921 iter->buf = realloc(iter->buf, iter->buf_size);
925 if (err == Z_STREAM_END)
/* return number of input bytes consumed from this chunk */
929 ret = (void *) stream->next_in - ic->buf;
931 dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
937 * Inflate stored compressed chunks, or write them directly to the log
938 * file if so instructed.
940 static int inflate_gz_chunks(struct io_log *log, FILE *f)
942 struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
945 while (!flist_empty(&log->chunk_list)) {
946 struct iolog_compress *ic;
948 ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
949 flist_del(&ic->list);
/* gz_store: emit the compressed bytes verbatim, no inflation */
951 if (log->log_gz_store) {
954 dprint(FD_COMPRESS, "log write chunk size=%lu, "
955 "seq=%u\n", (unsigned long) ic->len, ic->seq);
957 ret = fwrite(ic->buf, ic->len, 1, f);
958 if (ret != 1 || ferror(f)) {
960 log_err("fio: error writing compressed log\n");
963 inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
/* flush any trailing decoded samples from the last sequence */
969 finish_chunk(&stream, f, &iter);
977 * Open compressed log file and decompress the stored chunks and
978 * write them to stdout. The chunks are stored sequentially in the
979 * file, so we iterate over them and do them one-by-one.
981 int iolog_file_inflate(const char *file)
983 struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
984 struct iolog_compress ic;
992 f = fopen(file, "r");
998 if (stat(file, &sb) < 0) {
/* slurp the whole file into one pseudo-chunk */
1004 ic.buf = buf = malloc(sb.st_size);
1005 ic.len = sb.st_size;
1008 ret = fread(ic.buf, ic.len, 1, f);
1014 } else if (ret != 1) {
1015 log_err("fio: short read on reading log\n");
1024 * Each chunk will return Z_STREAM_END. We don't know how many
1025 * chunks are in the file, so we just keep looping and incrementing
1026 * the sequence number until we have consumed the whole compressed
1033 iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
1046 finish_chunk(&stream, stdout, &iter);
/* No-zlib builds: stubs so callers compile; inflation is unsupported. */
1056 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1061 int iolog_file_inflate(const char *file)
1063 log_err("fio: log inflation not possible without zlib\n");
/*
 * Write a log's contents to its file (truncate or append per do_append):
 * first any compressed chunks, then every queued io_logs sample buffer.
 * Histogram logs take the flush_hist_samples() path.
 */
1069 void flush_log(struct io_log *log, bool do_append)
1075 f = fopen(log->filename, "w");
1077 f = fopen(log->filename, "a");
1079 perror("fopen log");
/* big stdio buffer while flushing; released below */
1083 buf = set_file_buffer(f);
1085 inflate_gz_chunks(log, f);
1087 while (!flist_empty(&log->io_logs)) {
1088 struct io_logs *cur_log;
1090 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1091 flist_del_init(&cur_log->list);
1093 if (log->td && log == log->td->clat_hist_log)
1094 flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1095 log_sample_sz(log, cur_log));
1097 flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1103 clear_file_buffer(buf);
/*
 * Finalize one log: under the file lock (trylock if requested, so a busy
 * lock can defer the flush), either ship the log to the network backend
 * or flush it locally. Compressed logs are handled first.
 */
1106 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1108 if (td->flags & TD_F_COMPRESS_LOG)
1112 if (fio_trylock_file(log->filename))
1115 fio_lock_file(log->filename);
1117 if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1118 fio_send_iolog(td, log, log->filename);
1120 flush_log(log, !td->o.per_job_logs);
1122 fio_unlock_file(log->filename);
/* Sum the sizes of all queued compressed chunks, under the chunk lock. */
1127 size_t log_chunk_sizes(struct io_log *log)
1129 struct flist_head *entry;
/* unlocked emptiness check is a fast path; list is re-checked under lock */
1132 if (flist_empty(&log->chunk_list))
1136 pthread_mutex_lock(&log->chunk_lock);
1137 flist_for_each(entry, &log->chunk_list) {
1138 struct iolog_compress *c;
1140 c = flist_entry(entry, struct iolog_compress, list);
1143 pthread_mutex_unlock(&log->chunk_lock);
/* Emit the drop warning only once per process. */
1149 static bool warned_on_drop;
/*
 * Queue a sample buffer for deferred freeing (it may still be in use by
 * the I/O path). If the deferred slots are full the entry is leaked,
 * with a one-time warning.
 */
1151 static void iolog_put_deferred(struct io_log *log, void *ptr)
1156 pthread_mutex_lock(&log->deferred_free_lock);
1157 if (log->deferred < IOLOG_MAX_DEFER) {
1158 log->deferred_items[log->deferred] = ptr;
1160 } else if (!warned_on_drop) {
1161 log_err("fio: had to drop log entry free\n");
1162 warned_on_drop = true;
1164 pthread_mutex_unlock(&log->deferred_free_lock);
/* Free everything queued by iolog_put_deferred(), under the same lock. */
1167 static void iolog_free_deferred(struct io_log *log)
1174 pthread_mutex_lock(&log->deferred_free_lock);
1176 for (i = 0; i < log->deferred; i++) {
1177 free(log->deferred_items[i]);
1178 log->deferred_items[i] = NULL;
1182 pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Deflate one batch of samples into GZ_CHUNK-sized chunks, then splice
 * the chunk list onto the log's chunk_list under its lock. The samples
 * buffer is handed to iolog_put_deferred() once compressed. On error,
 * all partially-built chunks are torn down.
 */
1185 static int gz_work(struct iolog_flush_data *data)
1187 struct iolog_compress *c = NULL;
1188 struct flist_head list;
1194 INIT_FLIST_HEAD(&list);
1196 memset(&stream, 0, sizeof(stream));
1197 stream.zalloc = Z_NULL;
1198 stream.zfree = Z_NULL;
1199 stream.opaque = Z_NULL;
1201 ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1203 log_err("fio: failed to init gz stream\n");
/* each flush gets a new sequence number; inflate side keys off it */
1207 seq = ++data->log->chunk_seq;
1209 stream.next_in = (void *) data->samples;
1210 stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1212 dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
1213 (unsigned long) stream.avail_in, seq,
1214 data->log->filename);
1217 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1218 (unsigned long) c->len);
1219 c = get_new_chunk(seq);
1220 stream.avail_out = GZ_CHUNK;
1221 stream.next_out = c->buf;
1222 ret = deflate(&stream, Z_NO_FLUSH);
1224 log_err("fio: deflate log (%d)\n", ret);
1229 c->len = GZ_CHUNK - stream.avail_out;
1230 flist_add_tail(&c->list, &list);
1232 } while (stream.avail_in);
/* finish the stream into the remaining space of the last chunk */
1234 stream.next_out = c->buf + c->len;
1235 stream.avail_out = GZ_CHUNK - c->len;
1237 ret = deflate(&stream, Z_FINISH);
1240 * Z_BUF_ERROR is special, it just means we need more
1241 * output space. We'll handle that below. Treat any other
1244 if (ret != Z_BUF_ERROR) {
1245 log_err("fio: deflate log (%d)\n", ret);
1246 flist_del(&c->list);
1253 c->len = GZ_CHUNK - stream.avail_out;
1255 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
/* keep allocating chunks until Z_FINISH reports stream end */
1257 if (ret != Z_STREAM_END) {
1259 c = get_new_chunk(seq);
1260 stream.avail_out = GZ_CHUNK;
1261 stream.next_out = c->buf;
1262 ret = deflate(&stream, Z_FINISH);
1263 c->len = GZ_CHUNK - stream.avail_out;
1265 flist_add_tail(&c->list, &list);
1266 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1267 (unsigned long) c->len);
1268 } while (ret != Z_STREAM_END);
1271 dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1273 ret = deflateEnd(&stream);
1275 log_err("fio: deflateEnd %d\n", ret);
/* samples may still be referenced elsewhere; free is deferred */
1277 iolog_put_deferred(data->log, data->samples);
1279 if (!flist_empty(&list)) {
1280 pthread_mutex_lock(&data->log->chunk_lock);
1281 flist_splice_tail(&list, &data->log->chunk_list);
1282 pthread_mutex_unlock(&data->log->chunk_lock);
/* error path: discard any chunks built so far */
1291 while (!flist_empty(&list)) {
1292 c = flist_first_entry(list.next, struct iolog_compress, list);
1293 flist_del(&c->list);
1301 * Invoked from our compress helper thread, when logging would have exceeded
1302 * the specified memory limitation. Compresses the previously stored
/* Workqueue adapter: recover the flush data from the work item. */
1305 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1307 return gz_work(container_of(work, struct iolog_flush_data, work));
/*
 * Per-worker init for the compression workqueue: pin the worker thread
 * to the user-provided log_gz_cpumask, if one was set.
 */
1310 static int gz_init_worker(struct submit_worker *sw)
1312 struct thread_data *td = sw->wq->td;
1314 if (!fio_option_is_set(&td->o, log_gz_cpumask))
1317 if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1318 log_err("gz: failed to set CPU affinity\n");
/* Ops for the single-worker log-compression workqueue. */
1325 static struct workqueue_ops log_compress_wq_ops = {
1326 .fn = gz_work_async,
1327 .init_worker_fn = gz_init_worker,
/* Start the (one-worker) compression workqueue if compressed logs are on. */
1331 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1333 if (!(td->flags & TD_F_COMPRESS_LOG))
1336 workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
/* Tear down the compression workqueue, if it was started. */
1340 void iolog_compress_exit(struct thread_data *td)
1342 if (!(td->flags & TD_F_COMPRESS_LOG))
1345 workqueue_exit(&td->log_compress_wq);
1349 * Queue work item to compress the existing log entries. We reset the
1350 * current log to a small size, and reference the existing log in the
1351 * data that we queue for compression. Once compression has been done,
1352 * this old log is freed. If called with finish == true, will not return
1353 * until the log compression has completed, and will flush all previous
/* Synchronous variant: compresses all queued io_logs inline via gz_work(). */
1356 static int iolog_flush(struct io_log *log)
1358 struct iolog_flush_data *data;
1360 data = malloc(sizeof(*data));
1367 while (!flist_empty(&log->io_logs)) {
1368 struct io_logs *cur_log;
1370 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1371 flist_del_init(&cur_log->list);
1373 data->samples = cur_log->log;
1374 data->nr_samples = cur_log->nr_samples;
/*
 * Hand the current io_logs buffer to the async compression workqueue,
 * detaching it from cur_log (samples/max reset, pointer NULLed) so
 * logging can continue into a fresh buffer. Uses smalloc because the
 * work item crosses into the worker context.
 */
1385 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1387 struct iolog_flush_data *data;
1389 data = smalloc(sizeof(*data));
1395 data->samples = cur_log->log;
1396 data->nr_samples = cur_log->nr_samples;
1399 cur_log->nr_samples = cur_log->max_samples = 0;
1400 cur_log->log = NULL;
1402 workqueue_enqueue(&log->td->log_compress_wq, &data->work);
/* opportunistically reap buffers freed by earlier compress runs */
1404 iolog_free_deferred(log);
/* No-zlib builds: flush/compress entry points become no-ops. */
1410 static int iolog_flush(struct io_log *log)
1415 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1420 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1425 void iolog_compress_exit(struct thread_data *td)
/* Return the newest (last) io_logs buffer, or NULL-equivalent when empty. */
1431 struct io_logs *iolog_cur_log(struct io_log *log)
1433 if (flist_empty(&log->io_logs))
1436 return flist_last_entry(&log->io_logs, struct io_logs, list);
/* Total sample count across all queued io_logs buffers. */
1439 uint64_t iolog_nr_samples(struct io_log *iolog)
1441 struct flist_head *entry;
1444 flist_for_each(entry, &iolog->io_logs) {
1445 struct io_logs *cur_log;
1447 cur_log = flist_entry(entry, struct io_logs, list);
1448 ret += cur_log->nr_samples;
/* Finalize a log if present; 'try' selects trylock mode in finish_log(). */
1454 static int __write_log(struct thread_data *td, struct io_log *log, int try)
1457 return finish_log(td, log, try);
/*
 * Flush the IOPS log; skipped unless its per-unit setting matches the
 * requested pass (unit vs. non-unit logs are written in separate passes).
 */
1462 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1466 if (per_unit_log(td->iops_log) != unit_log)
1469 ret = __write_log(td, td->iops_log, try);
1471 td->iops_log = NULL;
/* Flush the submission-latency log and clear the pointer on success. */
1476 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1483 ret = __write_log(td, td->slat_log, try);
1485 td->slat_log = NULL;
/* Flush the completion-latency log and clear the pointer on success. */
1490 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1497 ret = __write_log(td, td->clat_log, try);
1499 td->clat_log = NULL;
/* Flush the completion-latency histogram log and clear it on success. */
1504 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1511 ret = __write_log(td, td->clat_hist_log, try);
1513 td->clat_hist_log = NULL;
/* Flush the total-latency log. */
1518 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1525 ret = __write_log(td, td->lat_log, try);
/* Flush the bandwidth log; per-unit gating as in write_iops_log(). */
1532 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1536 if (per_unit_log(td->bw_log) != unit_log)
1539 ret = __write_log(td, td->bw_log, try);
1552 CLAT_HIST_LOG_MASK = 32,
/* One flush function per log type; mask marks it done in td_writeout_logs(). */
1559 int (*fn)(struct thread_data *, int, bool);
1562 static struct log_type log_types[] = {
1564 .mask = BW_LOG_MASK,
1565 .fn = write_bandw_log,
1568 .mask = LAT_LOG_MASK,
1569 .fn = write_lat_log,
1572 .mask = SLAT_LOG_MASK,
1573 .fn = write_slat_log,
1576 .mask = CLAT_LOG_MASK,
1577 .fn = write_clat_log,
1580 .mask = IOPS_LOG_MASK,
1581 .fn = write_iops_log,
1584 .mask = CLAT_HIST_LOG_MASK,
1585 .fn = write_clat_hist_log,
/*
 * Write out all of a thread's logs. Runs repeated passes over log_types,
 * retrying logs whose file lock was busy (trylock), and bails out of a
 * pass that made no progress. Runstate is bumped to TD_FINISHING for the
 * duration and restored afterwards.
 */
1589 void td_writeout_logs(struct thread_data *td, bool unit_logs)
1591 unsigned int log_mask = 0;
1592 unsigned int log_left = ALL_LOG_NR;
1595 old_state = td_bump_runstate(td, TD_FINISHING);
1597 finalize_logs(td, unit_logs);
1600 int prev_log_left = log_left;
1602 for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1603 struct log_type *lt = &log_types[i];
1606 if (!(log_mask & lt->mask)) {
/* last remaining log blocks (try=0); others use trylock */
1607 ret = lt->fn(td, log_left != 1, unit_logs);
1610 log_mask |= lt->mask;
/* no log completed this pass: avoid spinning forever */
1615 if (prev_log_left == log_left)
1619 td_restore_runstate(td, old_state);
/* Write out logs for every thread (iterates all thread_data instances). */
1622 void fio_writeout_logs(bool unit_logs)
1624 struct thread_data *td;
1628 td_writeout_logs(td, unit_logs);