 * Code related to writing an iolog of what a thread is doing, and to
 * later read that back and replay it.
23 static int iolog_flush(struct io_log *log);
25 static const char iolog_ver2[] = "fio version 2 iolog";
27 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
29 flist_add_tail(&ipo->list, &td->io_log_list);
30 td->total_io_size += ipo->len;
33 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
35 if (!td->o.write_iolog_file)
38 fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
39 io_ddir_name(io_u->ddir),
40 io_u->offset, io_u->buflen);
43 void log_file(struct thread_data *td, struct fio_file *f,
44 enum file_log_act what)
46 const char *act[] = { "add", "open", "close" };
50 if (!td->o.write_iolog_file)
55 * this happens on the pre-open/close done before the job starts
60 fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
63 static void iolog_delay(struct thread_data *td, unsigned long delay)
65 uint64_t usec = utime_since_now(&td->last_issue);
66 unsigned long orig_delay = delay;
70 if (delay < td->time_offset) {
75 delay -= td->time_offset;
81 fio_gettime(&ts, NULL);
82 while (delay && !td->terminate) {
84 if (this_delay > 500000)
87 usec_sleep(td, this_delay);
91 usec = utime_since_now(&ts);
92 if (usec > orig_delay)
93 td->time_offset = usec - orig_delay;
98 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
106 if (ipo->ddir != DDIR_INVAL)
109 f = td->files[ipo->fileno];
111 switch (ipo->file_action) {
112 case FIO_LOG_OPEN_FILE:
113 if (td->o.replay_redirect && fio_file_open(f)) {
114 dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
118 ret = td_io_open_file(td, f);
121 td_verror(td, ret, "iolog open file");
123 case FIO_LOG_CLOSE_FILE:
124 td_io_close_file(td, f);
126 case FIO_LOG_UNLINK_FILE:
127 td_io_unlink_file(td, f);
130 log_err("fio: bad file action %d\n", ipo->file_action);
137 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
139 struct io_piece *ipo;
140 unsigned long elapsed;
142 while (!flist_empty(&td->io_log_list)) {
145 ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
146 flist_del(&ipo->list);
147 remove_trim_entry(td, ipo);
149 ret = ipo_special(td, ipo);
153 } else if (ret > 0) {
158 io_u->ddir = ipo->ddir;
159 if (ipo->ddir != DDIR_WAIT) {
160 io_u->offset = ipo->offset;
161 io_u->buflen = ipo->len;
162 io_u->file = td->files[ipo->fileno];
163 get_file(io_u->file);
164 dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
165 io_u->buflen, io_u->file->file_name);
167 iolog_delay(td, ipo->delay);
169 elapsed = mtime_since_genesis();
170 if (ipo->delay > elapsed)
171 usec_sleep(td, (ipo->delay - elapsed) * 1000);
176 if (io_u->ddir != DDIR_WAIT)
184 void prune_io_piece_log(struct thread_data *td)
186 struct io_piece *ipo;
187 struct fio_rb_node *n;
189 while ((n = rb_first(&td->io_hist_tree)) != NULL) {
190 ipo = rb_entry(n, struct io_piece, rb_node);
191 rb_erase(n, &td->io_hist_tree);
192 remove_trim_entry(td, ipo);
197 while (!flist_empty(&td->io_hist_list)) {
198 ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
199 flist_del(&ipo->list);
200 remove_trim_entry(td, ipo);
207 * log a successful write, so we can unwind the log for verify
209 void log_io_piece(struct thread_data *td, struct io_u *io_u)
211 struct fio_rb_node **p, *parent;
212 struct io_piece *ipo, *__ipo;
214 ipo = calloc(1, sizeof(struct io_piece));
216 ipo->file = io_u->file;
217 ipo->offset = io_u->offset;
218 ipo->len = io_u->buflen;
219 ipo->numberio = io_u->numberio;
220 ipo->flags = IP_F_IN_FLIGHT;
224 if (io_u_should_trim(td, io_u)) {
225 flist_add_tail(&ipo->trim_list, &td->trim_list);
230 * Only sort writes if we don't have a random map in which case we need
231 * to check for duplicate blocks and drop the old one, which we rely on
232 * the rb insert/lookup for handling.
234 if (file_randommap(td, ipo->file)) {
235 INIT_FLIST_HEAD(&ipo->list);
236 flist_add_tail(&ipo->list, &td->io_hist_list);
237 ipo->flags |= IP_F_ONLIST;
242 RB_CLEAR_NODE(&ipo->rb_node);
245 * Sort the entry into the verification list
248 p = &td->io_hist_tree.rb_node;
254 __ipo = rb_entry(parent, struct io_piece, rb_node);
255 if (ipo->file < __ipo->file)
257 else if (ipo->file > __ipo->file)
259 else if (ipo->offset < __ipo->offset) {
261 overlap = ipo->offset + ipo->len > __ipo->offset;
263 else if (ipo->offset > __ipo->offset) {
265 overlap = __ipo->offset + __ipo->len > ipo->offset;
271 dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
272 __ipo->offset, __ipo->len,
273 ipo->offset, ipo->len);
275 rb_erase(parent, &td->io_hist_tree);
276 remove_trim_entry(td, __ipo);
277 if (!(__ipo->flags & IP_F_IN_FLIGHT))
283 rb_link_node(&ipo->rb_node, parent, p);
284 rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
285 ipo->flags |= IP_F_ONRB;
289 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
291 struct io_piece *ipo = io_u->ipo;
293 if (td->ts.nr_block_infos) {
294 uint32_t *info = io_u_block_info(td, io_u);
295 if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
296 if (io_u->ddir == DDIR_TRIM)
297 *info = BLOCK_INFO_SET_STATE(*info,
298 BLOCK_STATE_TRIM_FAILURE);
299 else if (io_u->ddir == DDIR_WRITE)
300 *info = BLOCK_INFO_SET_STATE(*info,
301 BLOCK_STATE_WRITE_FAILURE);
308 if (ipo->flags & IP_F_ONRB)
309 rb_erase(&ipo->rb_node, &td->io_hist_tree);
310 else if (ipo->flags & IP_F_ONLIST)
311 flist_del(&ipo->list);
318 void trim_io_piece(const struct io_u *io_u)
320 struct io_piece *ipo = io_u->ipo;
325 ipo->len = io_u->xfer_buflen - io_u->resid;
328 void write_iolog_close(struct thread_data *td)
334 td->iolog_buf = NULL;
338 * Read version 2 iolog data. It is enhanced to include per-file logging,
341 static bool read_iolog2(struct thread_data *td, FILE *f)
343 unsigned long long offset;
345 int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
346 char *rfname, *fname, *act;
350 free_release_files(td);
353 * Read in the read iolog and store it, reuse the infrastructure
354 * for doing verifications.
357 rfname = fname = malloc(256+16);
358 act = malloc(256+16);
360 reads = writes = waits = 0;
361 while ((p = fgets(str, 4096, f)) != NULL) {
362 struct io_piece *ipo;
365 r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
368 if (td->o.replay_redirect)
369 fname = td->o.replay_redirect;
375 if (!strcmp(act, "wait"))
377 else if (!strcmp(act, "read"))
379 else if (!strcmp(act, "write"))
381 else if (!strcmp(act, "sync"))
383 else if (!strcmp(act, "datasync"))
385 else if (!strcmp(act, "trim"))
388 log_err("fio: bad iolog file action: %s\n",
392 fileno = get_fileno(td, fname);
395 if (!strcmp(act, "add")) {
396 if (td->o.replay_redirect &&
397 get_fileno(td, fname) != -1) {
398 dprint(FD_FILE, "iolog: ignoring"
399 " re-add of file %s\n", fname);
401 fileno = add_file(td, fname, 0, 1);
402 file_action = FIO_LOG_ADD_FILE;
405 } else if (!strcmp(act, "open")) {
406 fileno = get_fileno(td, fname);
407 file_action = FIO_LOG_OPEN_FILE;
408 } else if (!strcmp(act, "close")) {
409 fileno = get_fileno(td, fname);
410 file_action = FIO_LOG_CLOSE_FILE;
412 log_err("fio: bad iolog file action: %s\n",
417 log_err("bad iolog2: %s\n", p);
423 else if (rw == DDIR_WRITE) {
425 * Don't add a write for ro mode
430 } else if (rw == DDIR_WAIT) {
434 } else if (rw == DDIR_INVAL) {
435 } else if (!ddir_sync(rw)) {
436 log_err("bad ddir: %d\n", rw);
443 ipo = calloc(1, sizeof(*ipo));
446 if (rw == DDIR_WAIT) {
449 if (td->o.replay_scale)
450 ipo->offset = offset / td->o.replay_scale;
452 ipo->offset = offset;
453 ipo_bytes_align(td->o.replay_align, ipo);
456 if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
457 td->o.max_bs[rw] = bytes;
458 ipo->fileno = fileno;
459 ipo->file_action = file_action;
463 queue_io_piece(td, ipo);
470 if (writes && read_only) {
471 log_err("fio: <%s> skips replay of %d writes due to"
472 " read-only\n", td->o.name, writes);
476 if (!reads && !writes && !waits)
478 else if (reads && !writes)
479 td->o.td_ddir = TD_DDIR_READ;
480 else if (!reads && writes)
481 td->o.td_ddir = TD_DDIR_WRITE;
483 td->o.td_ddir = TD_DDIR_RW;
489 * open iolog, check version, and call appropriate parser
491 static bool init_iolog_read(struct thread_data *td)
493 char buffer[256], *p;
497 f = fopen(td->o.read_iolog_file, "r");
499 perror("fopen read iolog");
503 p = fgets(buffer, sizeof(buffer), f);
505 td_verror(td, errno, "iolog read");
506 log_err("fio: unable to read iolog\n");
512 * version 2 of the iolog stores a specific string as the
513 * first line, check for that
515 if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
516 ret = read_iolog2(td, f);
518 log_err("fio: iolog version 1 is no longer supported\n");
527 * Set up a log for storing io patterns.
529 static bool init_iolog_write(struct thread_data *td)
535 f = fopen(td->o.write_iolog_file, "a");
537 perror("fopen write iolog");
542 * That's it for writing, setup a log buffer and we're done.
545 td->iolog_buf = malloc(8192);
546 setvbuf(f, td->iolog_buf, _IOFBF, 8192);
549 * write our version line
551 if (fprintf(f, "%s\n", iolog_ver2) < 0) {
552 perror("iolog init\n");
557 * add all known files
559 for_each_file(td, ff, i)
560 log_file(td, ff, FIO_LOG_ADD_FILE);
565 bool init_iolog(struct thread_data *td)
569 if (td->o.read_iolog_file) {
573 * Check if it's a blktrace file and load that if possible.
574 * Otherwise assume it's a normal log file and load that.
576 if (is_blktrace(td->o.read_iolog_file, &need_swap))
577 ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
579 ret = init_iolog_read(td);
580 } else if (td->o.write_iolog_file)
581 ret = init_iolog_write(td);
586 td_verror(td, EINVAL, "failed initializing iolog");
591 void setup_log(struct io_log **log, struct log_params *p,
592 const char *filename)
596 struct io_u_plat_entry *entry;
597 struct flist_head *list;
599 l = scalloc(1, sizeof(*l));
600 INIT_FLIST_HEAD(&l->io_logs);
601 l->log_type = p->log_type;
602 l->log_offset = p->log_offset;
603 l->log_gz = p->log_gz;
604 l->log_gz_store = p->log_gz_store;
605 l->avg_msec = p->avg_msec;
606 l->hist_msec = p->hist_msec;
607 l->hist_coarseness = p->hist_coarseness;
608 l->filename = strdup(filename);
611 /* Initialize histogram lists for each r/w direction,
612 * with initial io_u_plat of all zeros:
614 for (i = 0; i < DDIR_RWDIR_CNT; i++) {
615 list = &l->hist_window[i].list;
616 INIT_FLIST_HEAD(list);
617 entry = calloc(1, sizeof(struct io_u_plat_entry));
618 flist_add(&entry->list, list);
621 if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
624 __p = calloc(1, sizeof(*l->pending));
625 __p->max_samples = DEF_LOG_ENTRIES;
626 __p->log = calloc(__p->max_samples, log_entry_sz(l));
631 l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
633 INIT_FLIST_HEAD(&l->chunk_list);
635 if (l->log_gz && !p->td)
637 else if (l->log_gz || l->log_gz_store) {
638 mutex_init_pshared(&l->chunk_lock);
639 mutex_init_pshared(&l->deferred_free_lock);
640 p->td->flags |= TD_F_COMPRESS_LOG;
#ifdef CONFIG_SETVBUF
/*
 * Give the log FILE a large (1MB) fully-buffered stdio buffer, returning
 * the buffer so the caller can free it after fclose().
 */
static void *set_file_buffer(FILE *f)
{
	size_t size = 1048576;
	void *buf;

	buf = malloc(size);
	setvbuf(f, buf, _IOFBF, size);
	return buf;
}

static void clear_file_buffer(void *buf)
{
	free(buf);
}
#else
/* No setvbuf support: leave default stdio buffering, nothing to free. */
static void *set_file_buffer(FILE *f)
{
	return NULL;
}

static void clear_file_buffer(void *buf)
{
}
#endif
672 void free_log(struct io_log *log)
674 while (!flist_empty(&log->io_logs)) {
675 struct io_logs *cur_log;
677 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
678 flist_del_init(&cur_log->list);
684 free(log->pending->log);
/*
 * Sum 'stride' histogram buckets starting at index 'j'. When a previous
 * snapshot io_u_plat_last is given, sum the per-bucket deltas instead,
 * yielding only the counts accumulated since that snapshot.
 */
uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
		  uint64_t *io_u_plat_last)
{
	uint64_t sum;
	int k;

	if (io_u_plat_last) {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k] - io_u_plat_last[j + k];
	} else {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k];
	}

	return sum;
}
711 static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
712 uint64_t sample_size)
716 uint64_t i, j, nr_samples;
717 struct io_u_plat_entry *entry, *entry_before;
719 uint64_t *io_u_plat_before;
721 int stride = 1 << hist_coarseness;
726 s = __get_sample(samples, 0, 0);
727 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
729 nr_samples = sample_size / __log_entry_sz(log_offset);
731 for (i = 0; i < nr_samples; i++) {
732 s = __get_sample(samples, log_offset, i);
734 entry = s->data.plat_entry;
735 io_u_plat = entry->io_u_plat;
737 entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
738 io_u_plat_before = entry_before->io_u_plat;
740 fprintf(f, "%lu, %u, %u, ", (unsigned long) s->time,
741 io_sample_ddir(s), s->bs);
742 for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
743 fprintf(f, "%llu, ", (unsigned long long)
744 hist_sum(j, stride, io_u_plat, io_u_plat_before));
746 fprintf(f, "%llu\n", (unsigned long long)
747 hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
750 flist_del(&entry_before->list);
755 void flush_samples(FILE *f, void *samples, uint64_t sample_size)
759 uint64_t i, nr_samples;
764 s = __get_sample(samples, 0, 0);
765 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
767 nr_samples = sample_size / __log_entry_sz(log_offset);
769 for (i = 0; i < nr_samples; i++) {
770 s = __get_sample(samples, log_offset, i);
773 fprintf(f, "%lu, %" PRId64 ", %u, %u\n",
774 (unsigned long) s->time,
776 io_sample_ddir(s), s->bs);
778 struct io_sample_offset *so = (void *) s;
780 fprintf(f, "%lu, %" PRId64 ", %u, %u, %llu\n",
781 (unsigned long) s->time,
783 io_sample_ddir(s), s->bs,
784 (unsigned long long) so->offset);
791 struct iolog_flush_data {
792 struct workqueue_work work;
799 #define GZ_CHUNK 131072
801 static struct iolog_compress *get_new_chunk(unsigned int seq)
803 struct iolog_compress *c;
805 c = malloc(sizeof(*c));
806 INIT_FLIST_HEAD(&c->list);
807 c->buf = malloc(GZ_CHUNK);
813 static void free_chunk(struct iolog_compress *ic)
819 static int z_stream_init(z_stream *stream, int gz_hdr)
823 memset(stream, 0, sizeof(*stream));
824 stream->zalloc = Z_NULL;
825 stream->zfree = Z_NULL;
826 stream->opaque = Z_NULL;
827 stream->next_in = Z_NULL;
830 * zlib magic - add 32 for auto-detection of gz header or not,
831 * if we decide to store files in a gzip friendly format.
836 if (inflateInit2(stream, wbits) != Z_OK)
842 struct inflate_chunk_iter {
851 static void finish_chunk(z_stream *stream, FILE *f,
852 struct inflate_chunk_iter *iter)
856 ret = inflateEnd(stream);
858 log_err("fio: failed to end log inflation seq %d (%d)\n",
861 flush_samples(f, iter->buf, iter->buf_used);
864 iter->buf_size = iter->buf_used = 0;
868 * Iterative chunk inflation. Handles cases where we cross into a new
869 * sequence, doing flush finish of previous chunk if needed.
871 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
872 z_stream *stream, struct inflate_chunk_iter *iter)
876 dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
877 (unsigned long) ic->len, ic->seq);
879 if (ic->seq != iter->seq) {
881 finish_chunk(stream, f, iter);
883 z_stream_init(stream, gz_hdr);
887 stream->avail_in = ic->len;
888 stream->next_in = ic->buf;
890 if (!iter->buf_size) {
891 iter->buf_size = iter->chunk_sz;
892 iter->buf = malloc(iter->buf_size);
895 while (stream->avail_in) {
896 size_t this_out = iter->buf_size - iter->buf_used;
899 stream->avail_out = this_out;
900 stream->next_out = iter->buf + iter->buf_used;
902 err = inflate(stream, Z_NO_FLUSH);
904 log_err("fio: failed inflating log: %d\n", err);
909 iter->buf_used += this_out - stream->avail_out;
911 if (!stream->avail_out) {
912 iter->buf_size += iter->chunk_sz;
913 iter->buf = realloc(iter->buf, iter->buf_size);
917 if (err == Z_STREAM_END)
921 ret = (void *) stream->next_in - ic->buf;
923 dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
929 * Inflate stored compressed chunks, or write them directly to the log
930 * file if so instructed.
932 static int inflate_gz_chunks(struct io_log *log, FILE *f)
934 struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
937 while (!flist_empty(&log->chunk_list)) {
938 struct iolog_compress *ic;
940 ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
941 flist_del(&ic->list);
943 if (log->log_gz_store) {
946 dprint(FD_COMPRESS, "log write chunk size=%lu, "
947 "seq=%u\n", (unsigned long) ic->len, ic->seq);
949 ret = fwrite(ic->buf, ic->len, 1, f);
950 if (ret != 1 || ferror(f)) {
952 log_err("fio: error writing compressed log\n");
955 inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
961 finish_chunk(&stream, f, &iter);
969 * Open compressed log file and decompress the stored chunks and
970 * write them to stdout. The chunks are stored sequentially in the
971 * file, so we iterate over them and do them one-by-one.
973 int iolog_file_inflate(const char *file)
975 struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
976 struct iolog_compress ic;
984 f = fopen(file, "r");
990 if (stat(file, &sb) < 0) {
996 ic.buf = buf = malloc(sb.st_size);
1000 ret = fread(ic.buf, ic.len, 1, f);
1001 if (ret == 0 && ferror(f)) {
1006 } else if (ferror(f) || (!feof(f) && ret != 1)) {
1007 log_err("fio: short read on reading log\n");
1016 * Each chunk will return Z_STREAM_END. We don't know how many
1017 * chunks are in the file, so we just keep looping and incrementing
1018 * the sequence number until we have consumed the whole compressed
1025 iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
1038 finish_chunk(&stream, stdout, &iter);
/* Fallback implementations when fio is built without zlib support. */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	return 0;
}

int iolog_file_inflate(const char *file)
{
	log_err("fio: log inflation not possible without zlib\n");
	return 1;
}
1061 void flush_log(struct io_log *log, bool do_append)
1067 f = fopen(log->filename, "w");
1069 f = fopen(log->filename, "a");
1071 perror("fopen log");
1075 buf = set_file_buffer(f);
1077 inflate_gz_chunks(log, f);
1079 while (!flist_empty(&log->io_logs)) {
1080 struct io_logs *cur_log;
1082 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1083 flist_del_init(&cur_log->list);
1085 if (log->td && log == log->td->clat_hist_log)
1086 flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1087 log_sample_sz(log, cur_log));
1089 flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1095 clear_file_buffer(buf);
1098 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1100 if (td->flags & TD_F_COMPRESS_LOG)
1104 if (fio_trylock_file(log->filename))
1107 fio_lock_file(log->filename);
1109 if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1110 fio_send_iolog(td, log, log->filename);
1112 flush_log(log, !td->o.per_job_logs);
1114 fio_unlock_file(log->filename);
1119 size_t log_chunk_sizes(struct io_log *log)
1121 struct flist_head *entry;
1124 if (flist_empty(&log->chunk_list))
1128 pthread_mutex_lock(&log->chunk_lock);
1129 flist_for_each(entry, &log->chunk_list) {
1130 struct iolog_compress *c;
1132 c = flist_entry(entry, struct iolog_compress, list);
1135 pthread_mutex_unlock(&log->chunk_lock);
1141 static void iolog_put_deferred(struct io_log *log, void *ptr)
1146 pthread_mutex_lock(&log->deferred_free_lock);
1147 if (log->deferred < IOLOG_MAX_DEFER) {
1148 log->deferred_items[log->deferred] = ptr;
1150 } else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
1151 log_err("fio: had to drop log entry free\n");
1152 pthread_mutex_unlock(&log->deferred_free_lock);
1155 static void iolog_free_deferred(struct io_log *log)
1162 pthread_mutex_lock(&log->deferred_free_lock);
1164 for (i = 0; i < log->deferred; i++) {
1165 free(log->deferred_items[i]);
1166 log->deferred_items[i] = NULL;
1170 pthread_mutex_unlock(&log->deferred_free_lock);
1173 static int gz_work(struct iolog_flush_data *data)
1175 struct iolog_compress *c = NULL;
1176 struct flist_head list;
1182 INIT_FLIST_HEAD(&list);
1184 memset(&stream, 0, sizeof(stream));
1185 stream.zalloc = Z_NULL;
1186 stream.zfree = Z_NULL;
1187 stream.opaque = Z_NULL;
1189 ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1191 log_err("fio: failed to init gz stream\n");
1195 seq = ++data->log->chunk_seq;
1197 stream.next_in = (void *) data->samples;
1198 stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1200 dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
1201 (unsigned long) stream.avail_in, seq,
1202 data->log->filename);
1205 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1206 (unsigned long) c->len);
1207 c = get_new_chunk(seq);
1208 stream.avail_out = GZ_CHUNK;
1209 stream.next_out = c->buf;
1210 ret = deflate(&stream, Z_NO_FLUSH);
1212 log_err("fio: deflate log (%d)\n", ret);
1217 c->len = GZ_CHUNK - stream.avail_out;
1218 flist_add_tail(&c->list, &list);
1220 } while (stream.avail_in);
1222 stream.next_out = c->buf + c->len;
1223 stream.avail_out = GZ_CHUNK - c->len;
1225 ret = deflate(&stream, Z_FINISH);
1228 * Z_BUF_ERROR is special, it just means we need more
1229 * output space. We'll handle that below. Treat any other
1232 if (ret != Z_BUF_ERROR) {
1233 log_err("fio: deflate log (%d)\n", ret);
1234 flist_del(&c->list);
1241 c->len = GZ_CHUNK - stream.avail_out;
1243 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
1245 if (ret != Z_STREAM_END) {
1247 c = get_new_chunk(seq);
1248 stream.avail_out = GZ_CHUNK;
1249 stream.next_out = c->buf;
1250 ret = deflate(&stream, Z_FINISH);
1251 c->len = GZ_CHUNK - stream.avail_out;
1253 flist_add_tail(&c->list, &list);
1254 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1255 (unsigned long) c->len);
1256 } while (ret != Z_STREAM_END);
1259 dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1261 ret = deflateEnd(&stream);
1263 log_err("fio: deflateEnd %d\n", ret);
1265 iolog_put_deferred(data->log, data->samples);
1267 if (!flist_empty(&list)) {
1268 pthread_mutex_lock(&data->log->chunk_lock);
1269 flist_splice_tail(&list, &data->log->chunk_list);
1270 pthread_mutex_unlock(&data->log->chunk_lock);
1279 while (!flist_empty(&list)) {
1280 c = flist_first_entry(list.next, struct iolog_compress, list);
1281 flist_del(&c->list);
1289 * Invoked from our compress helper thread, when logging would have exceeded
1290 * the specified memory limitation. Compresses the previously stored
1293 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1295 return gz_work(container_of(work, struct iolog_flush_data, work));
1298 static int gz_init_worker(struct submit_worker *sw)
1300 struct thread_data *td = sw->wq->td;
1302 if (!fio_option_is_set(&td->o, log_gz_cpumask))
1305 if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1306 log_err("gz: failed to set CPU affinity\n");
1313 static struct workqueue_ops log_compress_wq_ops = {
1314 .fn = gz_work_async,
1315 .init_worker_fn = gz_init_worker,
1319 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1321 if (!(td->flags & TD_F_COMPRESS_LOG))
1324 workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
1328 void iolog_compress_exit(struct thread_data *td)
1330 if (!(td->flags & TD_F_COMPRESS_LOG))
1333 workqueue_exit(&td->log_compress_wq);
1337 * Queue work item to compress the existing log entries. We reset the
1338 * current log to a small size, and reference the existing log in the
1339 * data that we queue for compression. Once compression has been done,
1340 * this old log is freed. If called with finish == true, will not return
1341 * until the log compression has completed, and will flush all previous
1344 static int iolog_flush(struct io_log *log)
1346 struct iolog_flush_data *data;
1348 data = malloc(sizeof(*data));
1355 while (!flist_empty(&log->io_logs)) {
1356 struct io_logs *cur_log;
1358 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1359 flist_del_init(&cur_log->list);
1361 data->samples = cur_log->log;
1362 data->nr_samples = cur_log->nr_samples;
1373 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1375 struct iolog_flush_data *data;
1377 data = smalloc(sizeof(*data));
1383 data->samples = cur_log->log;
1384 data->nr_samples = cur_log->nr_samples;
1387 cur_log->nr_samples = cur_log->max_samples = 0;
1388 cur_log->log = NULL;
1390 workqueue_enqueue(&log->td->log_compress_wq, &data->work);
1392 iolog_free_deferred(log);
/* Stubs used when fio is built without zlib: compression is unavailable. */
static int iolog_flush(struct io_log *log)
{
	return 1;
}

int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	return 1;
}

int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
}
1419 struct io_logs *iolog_cur_log(struct io_log *log)
1421 if (flist_empty(&log->io_logs))
1424 return flist_last_entry(&log->io_logs, struct io_logs, list);
1427 uint64_t iolog_nr_samples(struct io_log *iolog)
1429 struct flist_head *entry;
1432 flist_for_each(entry, &iolog->io_logs) {
1433 struct io_logs *cur_log;
1435 cur_log = flist_entry(entry, struct io_logs, list);
1436 ret += cur_log->nr_samples;
1442 static int __write_log(struct thread_data *td, struct io_log *log, int try)
1445 return finish_log(td, log, try);
1450 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1454 if (per_unit_log(td->iops_log) != unit_log)
1457 ret = __write_log(td, td->iops_log, try);
1459 td->iops_log = NULL;
1464 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1471 ret = __write_log(td, td->slat_log, try);
1473 td->slat_log = NULL;
1478 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1485 ret = __write_log(td, td->clat_log, try);
1487 td->clat_log = NULL;
1492 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1499 ret = __write_log(td, td->clat_hist_log, try);
1501 td->clat_hist_log = NULL;
1506 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1513 ret = __write_log(td, td->lat_log, try);
1520 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1524 if (per_unit_log(td->bw_log) != unit_log)
1527 ret = __write_log(td, td->bw_log, try);
1540 CLAT_HIST_LOG_MASK = 32,
1547 int (*fn)(struct thread_data *, int, bool);
1550 static struct log_type log_types[] = {
1552 .mask = BW_LOG_MASK,
1553 .fn = write_bandw_log,
1556 .mask = LAT_LOG_MASK,
1557 .fn = write_lat_log,
1560 .mask = SLAT_LOG_MASK,
1561 .fn = write_slat_log,
1564 .mask = CLAT_LOG_MASK,
1565 .fn = write_clat_log,
1568 .mask = IOPS_LOG_MASK,
1569 .fn = write_iops_log,
1572 .mask = CLAT_HIST_LOG_MASK,
1573 .fn = write_clat_hist_log,
1577 void td_writeout_logs(struct thread_data *td, bool unit_logs)
1579 unsigned int log_mask = 0;
1580 unsigned int log_left = ALL_LOG_NR;
1583 old_state = td_bump_runstate(td, TD_FINISHING);
1585 finalize_logs(td, unit_logs);
1588 int prev_log_left = log_left;
1590 for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1591 struct log_type *lt = &log_types[i];
1594 if (!(log_mask & lt->mask)) {
1595 ret = lt->fn(td, log_left != 1, unit_logs);
1598 log_mask |= lt->mask;
1603 if (prev_log_left == log_left)
1607 td_restore_runstate(td, old_state);
1610 void fio_writeout_logs(bool unit_logs)
1612 struct thread_data *td;
1616 td_writeout_logs(td, unit_logs);