/*
 * Code related to writing an iolog of what a thread is doing, and to
 * later read that back and replay it.
 */
23 static int iolog_flush(struct io_log *log);
25 static const char iolog_ver2[] = "fio version 2 iolog";
27 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
29 flist_add_tail(&ipo->list, &td->io_log_list);
30 td->total_io_size += ipo->len;
/*
 * Write one io_u to the iolog output file as
 * "<filename> <ddir> <offset> <buflen>". Does nothing unless the job
 * requested an iolog via write_iolog_file.
 */
33 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
35 if (!td->o.write_iolog_file)
/* NOTE(review): the early-return body of the guard above is elided here */
38 fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
39 io_ddir_name(io_u->ddir),
40 io_u->offset, io_u->buflen);
/*
 * Record a file-level event ("add"/"open"/"close", indexed by 'what')
 * in the iolog output file. Skipped unless write_iolog_file is set.
 */
43 void log_file(struct thread_data *td, struct fio_file *f,
44 enum file_log_act what)
/* table must stay in sync with enum file_log_act ordering */
46 const char *act[] = { "add", "open", "close" };
50 if (!td->o.write_iolog_file)
/*
 * NOTE(review): a second guard is elided here; per the surviving comment,
 * it covers the pre-open/close done before the job starts.
 */
55 * this happens on the pre-open/close done before the job starts
60 fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
/*
 * Sleep for the delay an iolog entry requests, compensating for time
 * already spent since the last issue and for previous oversleeps
 * (tracked in td->time_offset).
 */
63 static void iolog_delay(struct thread_data *td, unsigned long delay)
65 uint64_t usec = utime_since_now(&td->last_issue);
66 unsigned long orig_delay = delay;
/* if we already overslept by more than the requested delay, skip it */
70 if (delay < td->time_offset) {
75 delay -= td->time_offset;
81 fio_gettime(&ts, NULL);
/* sleep in bounded slices so td->terminate is honored promptly */
82 while (delay && !td->terminate) {
84 if (this_delay > 500000)
87 usec_sleep(td, this_delay);
/* measure actual sleep; carry any overshoot into time_offset */
91 usec = utime_since_now(&ts);
92 if (usec > orig_delay)
93 td->time_offset = usec - orig_delay;
/*
 * Handle a special (non-I/O) iolog entry: file open/close/unlink actions.
 * Entries with a real data direction are not special and are passed over.
 */
98 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
/* only DDIR_INVAL entries carry a file action */
106 if (ipo->ddir != DDIR_INVAL)
109 f = td->files[ipo->fileno];
111 switch (ipo->file_action) {
112 case FIO_LOG_OPEN_FILE:
/* with replay_redirect all entries map to one file; ignore re-opens */
113 if (td->o.replay_redirect && fio_file_open(f)) {
114 dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
118 ret = td_io_open_file(td, f);
121 td_verror(td, ret, "iolog open file");
123 case FIO_LOG_CLOSE_FILE:
124 td_io_close_file(td, f);
126 case FIO_LOG_UNLINK_FILE:
127 td_io_unlink_file(td, f);
/* default: unknown action is a hard error */
130 log_err("fio: bad file action %d\n", ipo->file_action);
/*
 * Pop the next replay entry off td->io_log_list and populate io_u from it.
 * File actions are dispatched via ipo_special(); DDIR_WAIT entries only
 * produce a delay. Applies the entry's recorded delay before returning.
 */
137 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
139 struct io_piece *ipo;
140 unsigned long elapsed;
142 while (!flist_empty(&td->io_log_list)) {
145 ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
146 flist_del(&ipo->list);
147 remove_trim_entry(td, ipo);
/* ret > 0 means the entry was a file action, consumed here */
149 ret = ipo_special(td, ipo);
153 } else if (ret > 0) {
158 io_u->ddir = ipo->ddir;
159 if (ipo->ddir != DDIR_WAIT) {
160 io_u->offset = ipo->offset;
161 io_u->buflen = ipo->len;
162 io_u->file = td->files[ipo->fileno];
163 get_file(io_u->file);
164 dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
165 io_u->buflen, io_u->file->file_name);
167 iolog_delay(td, ipo->delay);
/* DDIR_WAIT path: delay is absolute vs. time since genesis (msec) */
169 elapsed = mtime_since_genesis();
170 if (ipo->delay > elapsed)
171 usec_sleep(td, (ipo->delay - elapsed) * 1000);
176 if (io_u->ddir != DDIR_WAIT)
/*
 * Drop all remembered io_pieces: first drain the rb-tree used for sorted
 * verify entries, then drain the plain history list.
 */
184 void prune_io_piece_log(struct thread_data *td)
186 struct io_piece *ipo;
187 struct fio_rb_node *n;
189 while ((n = rb_first(&td->io_hist_tree)) != NULL) {
190 ipo = rb_entry(n, struct io_piece, rb_node);
191 rb_erase(n, &td->io_hist_tree);
192 remove_trim_entry(td, ipo);
/* NOTE(review): the free/accounting lines for each entry are elided */
197 while (!flist_empty(&td->io_hist_list)) {
198 ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
199 flist_del(&ipo->list);
200 remove_trim_entry(td, ipo);
/*
207 * log a successful write, so we can unwind the log for verify
 */
209 void log_io_piece(struct thread_data *td, struct io_u *io_u)
211 struct fio_rb_node **p, *parent;
212 struct io_piece *ipo, *__ipo;
214 ipo = malloc(sizeof(struct io_piece));
216 ipo->file = io_u->file;
217 ipo->offset = io_u->offset;
218 ipo->len = io_u->buflen;
219 ipo->numberio = io_u->numberio;
220 ipo->flags = IP_F_IN_FLIGHT;
/* trims are additionally tracked on the per-thread trim list */
224 if (io_u_should_trim(td, io_u)) {
225 flist_add_tail(&ipo->trim_list, &td->trim_list);
/*
230 * Only sort writes if we don't have a random map in which case we need
231 * to check for duplicate blocks and drop the old one, which we rely on
232 * the rb insert/lookup for handling.
 */
234 if (file_randommap(td, ipo->file)) {
235 INIT_FLIST_HEAD(&ipo->list);
236 flist_add_tail(&ipo->list, &td->io_hist_list);
237 ipo->flags |= IP_F_ONLIST;
242 RB_CLEAR_NODE(&ipo->rb_node);
/*
245 * Sort the entry into the verification list, keyed by (file, offset).
 */
248 p = &td->io_hist_tree.rb_node;
254 __ipo = rb_entry(parent, struct io_piece, rb_node);
255 if (ipo->file < __ipo->file)
257 else if (ipo->file > __ipo->file)
/* same file: order by offset and detect range overlap either way */
259 else if (ipo->offset < __ipo->offset) {
261 overlap = ipo->offset + ipo->len > __ipo->offset;
263 else if (ipo->offset > __ipo->offset) {
265 overlap = __ipo->offset + __ipo->len > ipo->offset;
/* on overlap, the old entry is evicted and replaced by the new one */
271 dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
272 __ipo->offset, __ipo->len,
273 ipo->offset, ipo->len);
275 rb_erase(parent, &td->io_hist_tree);
276 remove_trim_entry(td, __ipo);
/* only free the evicted entry if it is no longer in flight */
277 if (!(__ipo->flags & IP_F_IN_FLIGHT))
283 rb_link_node(&ipo->rb_node, parent, p);
284 rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
285 ipo->flags |= IP_F_ONRB;
/*
 * Remove an io_u's logged piece again (e.g. on failure), updating
 * per-block state tracking when block_infos are enabled.
 */
289 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
291 struct io_piece *ipo = io_u->ipo;
293 if (td->ts.nr_block_infos) {
294 uint32_t *info = io_u_block_info(td, io_u);
/* only escalate state, never downgrade past TRIM_FAILURE */
295 if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
296 if (io_u->ddir == DDIR_TRIM)
297 *info = BLOCK_INFO_SET_STATE(*info,
298 BLOCK_STATE_TRIM_FAILURE);
299 else if (io_u->ddir == DDIR_WRITE)
300 *info = BLOCK_INFO_SET_STATE(*info,
301 BLOCK_STATE_WRITE_FAILURE);
/* unlink from whichever structure log_io_piece() put it on */
308 if (ipo->flags & IP_F_ONRB)
309 rb_erase(&ipo->rb_node, &td->io_hist_tree);
310 else if (ipo->flags & IP_F_ONLIST)
311 flist_del(&ipo->list);
/*
 * Shrink the logged piece to the bytes actually transferred
 * (xfer_buflen minus residual).
 */
318 void trim_io_piece(struct thread_data *td, const struct io_u *io_u)
320 struct io_piece *ipo = io_u->ipo;
/* NOTE(review): a NULL-ipo guard appears to be elided between these lines */
325 ipo->len = io_u->xfer_buflen - io_u->resid;
/*
 * Tear down the write iolog; clears the stdio buffer pointer.
 * NOTE(review): flush/close/free of the iolog file are elided here.
 */
328 void write_iolog_close(struct thread_data *td)
334 td->iolog_buf = NULL;
/*
338 * Read version 2 iolog data. It is enhanced to include per-file logging,
 * parsing both I/O entries ("fname act offset len") and file actions
 * ("fname add|open|close"). Populates the replay queue and sets the
 * job's data direction from what was seen.
 */
341 static int read_iolog2(struct thread_data *td, FILE *f)
343 unsigned long long offset;
345 int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
346 char *rfname, *fname, *act;
350 free_release_files(td);
/*
353 * Read in the read iolog and store it, reuse the infrastructure
354 * for doing verifications.
 */
/* +16 slop over the 256-byte sscanf field widths */
357 rfname = fname = malloc(256+16);
358 act = malloc(256+16);
360 reads = writes = waits = 0;
361 while ((p = fgets(str, 4096, f)) != NULL) {
362 struct io_piece *ipo;
365 r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
/* replay_redirect overrides the per-line filename */
368 if (td->o.replay_redirect)
369 fname = td->o.replay_redirect;
/* 4-field lines are I/O entries; map action word to a ddir */
375 if (!strcmp(act, "wait"))
377 else if (!strcmp(act, "read"))
379 else if (!strcmp(act, "write"))
381 else if (!strcmp(act, "sync"))
383 else if (!strcmp(act, "datasync"))
385 else if (!strcmp(act, "trim"))
388 log_err("fio: bad iolog file action: %s\n",
392 fileno = get_fileno(td, fname);
/* 2-field lines are file actions */
395 if (!strcmp(act, "add")) {
/* with replay_redirect, don't add the same redirected file twice */
396 if (td->o.replay_redirect &&
397 get_fileno(td, fname) != -1) {
398 dprint(FD_FILE, "iolog: ignoring"
399 " re-add of file %s\n", fname);
401 fileno = add_file(td, fname, 0, 1);
402 file_action = FIO_LOG_ADD_FILE;
405 } else if (!strcmp(act, "open")) {
406 fileno = get_fileno(td, fname);
407 file_action = FIO_LOG_OPEN_FILE;
408 } else if (!strcmp(act, "close")) {
409 fileno = get_fileno(td, fname);
410 file_action = FIO_LOG_CLOSE_FILE;
412 log_err("fio: bad iolog file action: %s\n",
417 log_err("bad iolog2: %s\n", p);
423 else if (rw == DDIR_WRITE) {
/*
425 * Don't add a write for ro mode
 */
430 } else if (rw == DDIR_WAIT) {
434 } else if (rw == DDIR_INVAL) {
435 } else if (!ddir_sync(rw)) {
436 log_err("bad ddir: %d\n", rw);
/*
 * Build an io_piece for this entry and queue it for replay.
 */
443 ipo = malloc(sizeof(*ipo));
446 if (rw == DDIR_WAIT) {
/* optionally compress offsets by replay_scale, then re-align */
449 if (td->o.replay_scale)
450 ipo->offset = offset / td->o.replay_scale;
452 ipo->offset = offset;
453 ipo_bytes_align(td->o.replay_align, ipo);
/* track the largest block size seen per direction */
456 if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
457 td->o.max_bs[rw] = bytes;
458 ipo->fileno = fileno;
459 ipo->file_action = file_action;
463 queue_io_piece(td, ipo);
470 if (writes && read_only) {
471 log_err("fio: <%s> skips replay of %d writes due to"
472 " read-only\n", td->o.name, writes);
/* derive the job's data direction from the mix of parsed entries */
476 if (!reads && !writes && !waits)
478 else if (reads && !writes)
479 td->o.td_ddir = TD_DDIR_READ;
480 else if (!reads && writes)
481 td->o.td_ddir = TD_DDIR_WRITE;
483 td->o.td_ddir = TD_DDIR_RW;
/*
489 * open iolog, check version, and call appropriate parser
 */
491 static int init_iolog_read(struct thread_data *td)
493 char buffer[256], *p;
497 f = fopen(td->o.read_iolog_file, "r");
499 perror("fopen read iolog");
/* first line identifies the log version */
503 p = fgets(buffer, sizeof(buffer), f);
505 td_verror(td, errno, "iolog read");
506 log_err("fio: unable to read iolog\n");
/*
512 * version 2 of the iolog stores a specific string as the
513 * first line, check for that
 */
515 if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
516 ret = read_iolog2(td, f);
/* anything else is treated as the unsupported v1 format */
518 log_err("fio: iolog version 1 is no longer supported\n");
/*
527 * Set up a log for storing io patterns.
 */
529 static int init_iolog_write(struct thread_data *td)
/* append mode, so multiple jobs can share one iolog file */
535 f = fopen(td->o.write_iolog_file, "a");
537 perror("fopen write iolog");
/*
542 * That's it for writing, setup a log buffer and we're done.
 */
545 td->iolog_buf = malloc(8192);
546 setvbuf(f, td->iolog_buf, _IOFBF, 8192);
/*
549 * write our version line
 */
551 if (fprintf(f, "%s\n", iolog_ver2) < 0) {
552 perror("iolog init\n");
/*
557 * add all known files
 */
559 for_each_file(td, ff, i)
560 log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * Initialize iolog support for a job: load a replay log (blktrace or
 * fio iolog v2) when reading, or open an output log when writing.
 */
565 int init_iolog(struct thread_data *td)
569 if (td->o.read_iolog_file) {
/*
573 * Check if it's a blktrace file and load that if possible.
574 * Otherwise assume it's a normal log file and load that.
 */
576 if (is_blktrace(td->o.read_iolog_file, &need_swap))
577 ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
579 ret = init_iolog_read(td);
580 } else if (td->o.write_iolog_file)
581 ret = init_iolog_write(td);
584 td_verror(td, EINVAL, "failed initializing iolog");
/*
 * Allocate and initialize an io_log from the given parameters:
 * copies log type/offset/compression/averaging settings, sets up the
 * per-direction histogram windows, an optional pending-samples buffer,
 * and compression state when log_gz/log_gz_store are enabled.
 */
589 void setup_log(struct io_log **log, struct log_params *p,
590 const char *filename)
594 struct io_u_plat_entry *entry;
595 struct flist_head *list;
597 l = scalloc(1, sizeof(*l));
598 INIT_FLIST_HEAD(&l->io_logs);
599 l->log_type = p->log_type;
600 l->log_offset = p->log_offset;
601 l->log_gz = p->log_gz;
602 l->log_gz_store = p->log_gz_store;
603 l->avg_msec = p->avg_msec;
604 l->hist_msec = p->hist_msec;
605 l->hist_coarseness = p->hist_coarseness;
606 l->filename = strdup(filename);
/*
609 * Initialize histogram lists for each r/w direction,
610 * with initial io_u_plat of all zeros:
 */
612 for (i = 0; i < DDIR_RWDIR_CNT; i++) {
613 list = &l->hist_window[i].list;
614 INIT_FLIST_HEAD(list);
615 entry = calloc(1, sizeof(struct io_u_plat_entry));
616 flist_add(&entry->list, list);
/* non-offload submit mode keeps a preallocated pending sample buffer */
619 if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
622 p = calloc(1, sizeof(*l->pending));
623 p->max_samples = DEF_LOG_ENTRIES;
624 p->log = calloc(p->max_samples, log_entry_sz(l));
/* when logging offsets, samples carry the offset bit in their ddir */
629 l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
631 INIT_FLIST_HEAD(&l->chunk_list);
633 if (l->log_gz && !p->td)
635 else if (l->log_gz || l->log_gz_store) {
636 mutex_init_pshared(&l->chunk_lock);
637 mutex_init_pshared(&l->deferred_free_lock);
638 p->td->flags |= TD_F_COMPRESS_LOG;
/*
 * Large stdio buffering helpers: when setvbuf() is available, give the
 * log FILE a 1 MiB fully-buffered buffer; otherwise these are no-ops.
 */
644 #ifdef CONFIG_SETVBUF
645 static void *set_file_buffer(FILE *f)
647 size_t size = 1048576;
651 setvbuf(f, buf, _IOFBF, size);
/* frees the buffer allocated by set_file_buffer() */
655 static void clear_file_buffer(void *buf)
/* fallback stubs when CONFIG_SETVBUF is not defined */
660 static void *set_file_buffer(FILE *f)
665 static void clear_file_buffer(void *buf)
/*
 * Release an io_log: drains the list of sample buffers, then frees the
 * pending buffer's samples.
 */
670 void free_log(struct io_log *log)
672 while (!flist_empty(&log->io_logs)) {
673 struct io_logs *cur_log;
675 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
676 flist_del_init(&cur_log->list);
682 free(log->pending->log);
/*
 * Sum 'stride' consecutive histogram buckets starting at index j.
 * When a previous snapshot (io_u_plat_last) is supplied, return the
 * per-window delta against it instead of the absolute sum.
 */
uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
		  uint64_t *io_u_plat_last)
{
	uint64_t sum = 0;
	int k;

	for (k = 0; k < stride; k++) {
		uint64_t prev = io_u_plat_last ? io_u_plat_last[j + k] : 0;

		sum += io_u_plat[j + k] - prev;
	}

	return sum;
}
/*
 * Write histogram samples to f as CSV: "time, ddir, bs," followed by
 * one summed value per coarsened bucket window (1 << hist_coarseness
 * buckets per column). Each sample is printed as a delta against the
 * previous snapshot, which is then unlinked and freed.
 */
709 static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
710 uint64_t sample_size)
714 uint64_t i, j, nr_samples;
715 struct io_u_plat_entry *entry, *entry_before;
717 uint64_t *io_u_plat_before;
719 int stride = 1 << hist_coarseness;
/* probe the first sample to learn whether offsets are stored */
724 s = __get_sample(samples, 0, 0);
725 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
727 nr_samples = sample_size / __log_entry_sz(log_offset);
729 for (i = 0; i < nr_samples; i++) {
730 s = __get_sample(samples, log_offset, i);
732 entry = s->data.plat_entry;
733 io_u_plat = entry->io_u_plat;
/* previous snapshot is linked right after this entry */
735 entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
736 io_u_plat_before = entry_before->io_u_plat;
738 fprintf(f, "%lu, %u, %u, ", (unsigned long) s->time,
739 io_sample_ddir(s), s->bs);
740 for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
741 fprintf(f, "%llu, ", (unsigned long long)
742 hist_sum(j, stride, io_u_plat, io_u_plat_before));
/* last window is printed without a trailing comma */
744 fprintf(f, "%llu\n", (unsigned long long)
745 hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
/* consume the previous snapshot now that its delta is written */
748 flist_del(&entry_before->list);
/*
 * Write plain log samples to f as CSV: "time, value, ddir, bs" plus a
 * trailing offset column when the log recorded offsets.
 */
753 void flush_samples(FILE *f, void *samples, uint64_t sample_size)
757 uint64_t i, nr_samples;
/* probe the first sample to learn the per-entry size/format */
762 s = __get_sample(samples, 0, 0);
763 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
765 nr_samples = sample_size / __log_entry_sz(log_offset);
767 for (i = 0; i < nr_samples; i++) {
768 s = __get_sample(samples, log_offset, i);
/* no offset recorded: 4-column output */
771 fprintf(f, "%lu, %" PRId64 ", %u, %u\n",
772 (unsigned long) s->time,
774 io_sample_ddir(s), s->bs);
/* offset recorded: reinterpret as io_sample_offset, 5-column output */
776 struct io_sample_offset *so = (void *) s;
778 fprintf(f, "%lu, %" PRId64 ", %u, %u, %llu\n",
779 (unsigned long) s->time,
781 io_sample_ddir(s), s->bs,
782 (unsigned long long) so->offset);
789 struct iolog_flush_data {
790 struct workqueue_work work;
797 #define GZ_CHUNK 131072
/*
 * Allocate a new compression chunk with a GZ_CHUNK-sized buffer.
 * NOTE(review): initialization of len/seq and the return appear elided.
 */
799 static struct iolog_compress *get_new_chunk(unsigned int seq)
801 struct iolog_compress *c;
803 c = malloc(sizeof(*c));
804 INIT_FLIST_HEAD(&c->list);
805 c->buf = malloc(GZ_CHUNK);
/* release a chunk allocated by get_new_chunk() */
811 static void free_chunk(struct iolog_compress *ic)
/*
 * Prepare a zlib inflate stream. gz_hdr selects window-bits handling so
 * both raw and gzip-wrapped data can be decoded.
 */
817 static int z_stream_init(z_stream *stream, int gz_hdr)
821 memset(stream, 0, sizeof(*stream));
822 stream->zalloc = Z_NULL;
823 stream->zfree = Z_NULL;
824 stream->opaque = Z_NULL;
825 stream->next_in = Z_NULL;
/*
828 * zlib magic - add 32 for auto-detection of gz header or not,
829 * if we decide to store files in a gzip friendly format.
 */
834 if (inflateInit2(stream, wbits) != Z_OK)
840 struct inflate_chunk_iter {
/*
 * End inflation of the current sequence: tear down the zlib stream,
 * flush the accumulated decompressed samples to f, and reset the
 * iterator's buffer accounting.
 */
849 static void finish_chunk(z_stream *stream, FILE *f,
850 struct inflate_chunk_iter *iter)
854 ret = inflateEnd(stream);
856 log_err("fio: failed to end log inflation seq %d (%d)\n",
859 flush_samples(f, iter->buf, iter->buf_used);
862 iter->buf_size = iter->buf_used = 0;
/*
866 * Iterative chunk inflation. Handles cases where we cross into a new
867 * sequence, doing flush finish of previous chunk if needed.
 */
869 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
870 z_stream *stream, struct inflate_chunk_iter *iter)
874 dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
875 (unsigned long) ic->len, ic->seq);
/* sequence change: flush previous output, start a fresh zlib stream */
877 if (ic->seq != iter->seq) {
879 finish_chunk(stream, f, iter);
881 z_stream_init(stream, gz_hdr);
885 stream->avail_in = ic->len;
886 stream->next_in = ic->buf;
/* lazily allocate the output buffer at chunk_sz */
888 if (!iter->buf_size) {
889 iter->buf_size = iter->chunk_sz;
890 iter->buf = malloc(iter->buf_size);
893 while (stream->avail_in) {
894 size_t this_out = iter->buf_size - iter->buf_used;
897 stream->avail_out = this_out;
898 stream->next_out = iter->buf + iter->buf_used;
900 err = inflate(stream, Z_NO_FLUSH);
902 log_err("fio: failed inflating log: %d\n", err);
907 iter->buf_used += this_out - stream->avail_out;
/* output full: grow the buffer by another chunk_sz and continue */
909 if (!stream->avail_out) {
910 iter->buf_size += iter->chunk_sz;
911 iter->buf = realloc(iter->buf, iter->buf_size);
915 if (err == Z_STREAM_END)
/* return how many input bytes this call consumed */
919 ret = (void *) stream->next_in - ic->buf;
921 dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
/*
927 * Inflate stored compressed chunks, or write them directly to the log
928 * file if so instructed.
 */
930 static int inflate_gz_chunks(struct io_log *log, FILE *f)
932 struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
935 while (!flist_empty(&log->chunk_list)) {
936 struct iolog_compress *ic;
938 ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
939 flist_del(&ic->list);
/* gz-store mode: chunks are written out compressed, as-is */
941 if (log->log_gz_store) {
944 dprint(FD_COMPRESS, "log write chunk size=%lu, "
945 "seq=%u\n", (unsigned long) ic->len, ic->seq);
947 ret = fwrite(ic->buf, ic->len, 1, f);
948 if (ret != 1 || ferror(f)) {
950 log_err("fio: error writing compressed log\n");
953 inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
/* flush whatever the last sequence produced */
959 finish_chunk(&stream, f, &iter);
/*
967 * Open compressed log file and decompress the stored chunks and
968 * write them to stdout. The chunks are stored sequentially in the
969 * file, so we iterate over them and do them one-by-one.
 */
971 int iolog_file_inflate(const char *file)
973 struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
974 struct iolog_compress ic;
982 f = fopen(file, "r");
/* size the read buffer from the file's on-disk size */
988 if (stat(file, &sb) < 0) {
994 ic.buf = buf = malloc(sb.st_size);
998 ret = fread(ic.buf, ic.len, 1, f);
999 if (ret == 0 && ferror(f)) {
1004 } else if (ferror(f) || (!feof(f) && ret != 1)) {
1005 log_err("fio: short read on reading log\n");
/*
1014 * Each chunk will return Z_STREAM_END. We don't know how many
1015 * chunks are in the file, so we just keep looping and incrementing
1016 * the sequence number until we have consumed the whole compressed
 * file.
 */
1023 iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
1036 finish_chunk(&stream, stdout, &iter);
/* Stubs used when fio is built without zlib support. */
1046 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1051 int iolog_file_inflate(const char *file)
1053 log_err("fio: log inflation not possible without zlib\n");
/*
 * Write an io_log to its file: open (truncate or append per do_append),
 * drain any compressed chunks, then flush each buffered sample log.
 * Histogram logs take the histogram-specific flush path.
 */
1059 void flush_log(struct io_log *log, bool do_append)
1065 f = fopen(log->filename, "w");
1067 f = fopen(log->filename, "a");
1069 perror("fopen log");
/* large stdio buffer to cut down on write syscalls */
1073 buf = set_file_buffer(f);
1075 inflate_gz_chunks(log, f);
1077 while (!flist_empty(&log->io_logs)) {
1078 struct io_logs *cur_log;
1080 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1081 flist_del_init(&cur_log->list);
1083 if (log->td && log == log->td->clat_hist_log)
1084 flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1085 log_sample_sz(log, cur_log));
1087 flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1093 clear_file_buffer(buf);
/*
 * Finalize one log under the per-file lock: either ship it to the
 * network backend/GUI client or flush it locally. With trylock set,
 * bail out instead of blocking on a contended lock.
 */
1096 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1098 if (td->flags & TD_F_COMPRESS_LOG)
1102 if (fio_trylock_file(log->filename))
1105 fio_lock_file(log->filename);
1107 if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1108 fio_send_iolog(td, log, log->filename);
1110 flush_log(log, !td->o.per_job_logs);
1112 fio_unlock_file(log->filename);
/*
 * Sum the sizes of all queued compressed chunks, under chunk_lock.
 * Returns 0 for an empty chunk list.
 */
1117 size_t log_chunk_sizes(struct io_log *log)
1119 struct flist_head *entry;
1122 if (flist_empty(&log->chunk_list))
1126 pthread_mutex_lock(&log->chunk_lock);
1127 flist_for_each(entry, &log->chunk_list) {
1128 struct iolog_compress *c;
1130 c = flist_entry(entry, struct iolog_compress, list);
1133 pthread_mutex_unlock(&log->chunk_lock);
/*
 * Stash a pointer for deferred freeing (done later by the owning
 * thread). If the deferred array is full, the entry is dropped with a
 * one-time warning (i.e. leaked rather than freed unsafely).
 */
1139 static void iolog_put_deferred(struct io_log *log, void *ptr)
1144 pthread_mutex_lock(&log->deferred_free_lock);
1145 if (log->deferred < IOLOG_MAX_DEFER) {
1146 log->deferred_items[log->deferred] = ptr;
1148 } else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
1149 log_err("fio: had to drop log entry free\n");
1150 pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Free everything queued by iolog_put_deferred(), under the
 * deferred_free_lock.
 */
1153 static void iolog_free_deferred(struct io_log *log)
1160 pthread_mutex_lock(&log->deferred_free_lock);
1162 for (i = 0; i < log->deferred; i++) {
1163 free(log->deferred_items[i]);
1164 log->deferred_items[i] = NULL;
1168 pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Compress one batch of log samples with zlib deflate into a chain of
 * GZ_CHUNK-sized chunks, then splice the chunks onto the log's
 * chunk_list. The sample buffer is handed to deferred free. On error
 * the partially-built chunk list is torn down.
 */
1171 static int gz_work(struct iolog_flush_data *data)
1173 struct iolog_compress *c = NULL;
1174 struct flist_head list;
1180 INIT_FLIST_HEAD(&list);
1182 memset(&stream, 0, sizeof(stream));
1183 stream.zalloc = Z_NULL;
1184 stream.zfree = Z_NULL;
1185 stream.opaque = Z_NULL;
1187 ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1189 log_err("fio: failed to init gz stream\n");
/* each flush gets a new sequence number for later reassembly */
1193 seq = ++data->log->chunk_seq;
1195 stream.next_in = (void *) data->samples;
1196 stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1198 dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
1199 (unsigned long) stream.avail_in, seq,
1200 data->log->filename);
/* main loop: consume all input, one GZ_CHUNK output chunk at a time */
1203 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1204 (unsigned long) c->len);
1205 c = get_new_chunk(seq);
1206 stream.avail_out = GZ_CHUNK;
1207 stream.next_out = c->buf;
1208 ret = deflate(&stream, Z_NO_FLUSH);
1210 log_err("fio: deflate log (%d)\n", ret);
1215 c->len = GZ_CHUNK - stream.avail_out;
1216 flist_add_tail(&c->list, &list);
1218 } while (stream.avail_in);
/* finish: flush remaining deflate state into the last chunk(s) */
1220 stream.next_out = c->buf + c->len;
1221 stream.avail_out = GZ_CHUNK - c->len;
1223 ret = deflate(&stream, Z_FINISH);
/*
1226 * Z_BUF_ERROR is special, it just means we need more
1227 * output space. We'll handle that below. Treat any other
 * error as fatal.
 */
1230 if (ret != Z_BUF_ERROR) {
1231 log_err("fio: deflate log (%d)\n", ret);
1232 flist_del(&c->list);
1239 c->len = GZ_CHUNK - stream.avail_out;
1241 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
/* keep allocating chunks until deflate reports the stream is done */
1243 if (ret != Z_STREAM_END) {
1245 c = get_new_chunk(seq);
1246 stream.avail_out = GZ_CHUNK;
1247 stream.next_out = c->buf;
1248 ret = deflate(&stream, Z_FINISH);
1249 c->len = GZ_CHUNK - stream.avail_out;
1251 flist_add_tail(&c->list, &list);
1252 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1253 (unsigned long) c->len);
1254 } while (ret != Z_STREAM_END);
1257 dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1259 ret = deflateEnd(&stream);
1261 log_err("fio: deflateEnd %d\n", ret);
/* the raw sample buffer can now be freed by its owner */
1263 iolog_put_deferred(data->log, data->samples);
1265 if (!flist_empty(&list)) {
1266 pthread_mutex_lock(&data->log->chunk_lock);
1267 flist_splice_tail(&list, &data->log->chunk_list);
1268 pthread_mutex_unlock(&data->log->chunk_lock);
/* error path: free any chunks built before the failure */
1277 while (!flist_empty(&list)) {
1278 c = flist_first_entry(list.next, struct iolog_compress, list);
1279 flist_del(&c->list);
/*
1287 * Invoked from our compress helper thread, when logging would have exceeded
1288 * the specified memory limitation. Compresses the previously stored
 * samples.
 */
1291 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1293 return gz_work(container_of(work, struct iolog_flush_data, work));
/*
 * Worker init hook: pin the compression thread to log_gz_cpumask when
 * that option was explicitly set.
 */
1296 static int gz_init_worker(struct submit_worker *sw)
1298 struct thread_data *td = sw->wq->td;
1300 if (!fio_option_is_set(&td->o, log_gz_cpumask))
1303 if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1304 log_err("gz: failed to set CPU affinity\n");
1311 static struct workqueue_ops log_compress_wq_ops = {
1312 .fn = gz_work_async,
1313 .init_worker_fn = gz_init_worker,
/*
 * Start (and later stop) the single-worker log compression workqueue;
 * both are no-ops unless TD_F_COMPRESS_LOG is set on the thread.
 */
1317 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1319 if (!(td->flags & TD_F_COMPRESS_LOG))
1322 workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
1326 void iolog_compress_exit(struct thread_data *td)
1328 if (!(td->flags & TD_F_COMPRESS_LOG))
1331 workqueue_exit(&td->log_compress_wq);
/*
1335 * Queue work item to compress the existing log entries. We reset the
1336 * current log to a small size, and reference the existing log in the
1337 * data that we queue for compression. Once compression has been done,
1338 * this old log is freed. If called with finish == true, will not return
1339 * until the log compression has completed, and will flush all previous
 * logs too.
 */
1342 static int iolog_flush(struct io_log *log)
1344 struct iolog_flush_data *data;
1346 data = malloc(sizeof(*data));
/* compress every buffered sample log synchronously */
1353 while (!flist_empty(&log->io_logs)) {
1354 struct io_logs *cur_log;
1356 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1357 flist_del_init(&cur_log->list);
1359 data->samples = cur_log->log;
1360 data->nr_samples = cur_log->nr_samples;
/*
 * Hand the current sample buffer to the async compression workqueue.
 * Ownership of cur_log->log transfers to the work item; the log is
 * reset so new samples go into a fresh buffer. Also drains any
 * previously deferred frees.
 */
1371 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1373 struct iolog_flush_data *data;
/* smalloc: work item must live in shared memory for the worker */
1375 data = smalloc(sizeof(*data));
1381 data->samples = cur_log->log;
1382 data->nr_samples = cur_log->nr_samples;
1385 cur_log->nr_samples = cur_log->max_samples = 0;
1386 cur_log->log = NULL;
1388 workqueue_enqueue(&log->td->log_compress_wq, &data->work);
1390 iolog_free_deferred(log);
/* Stubs when log compression support is compiled out. */
1396 static int iolog_flush(struct io_log *log)
1401 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1406 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1411 void iolog_compress_exit(struct thread_data *td)
/*
 * Return the most recently added sample log, or (presumably) NULL when
 * the list is empty — the empty-list return line is elided here.
 */
1417 struct io_logs *iolog_cur_log(struct io_log *log)
1419 if (flist_empty(&log->io_logs))
1422 return flist_last_entry(&log->io_logs, struct io_logs, list);
/*
 * Total number of samples across all buffered sample logs.
 */
1425 uint64_t iolog_nr_samples(struct io_log *iolog)
1427 struct flist_head *entry;
1430 flist_for_each(entry, &iolog->io_logs) {
1431 struct io_logs *cur_log;
1433 cur_log = flist_entry(entry, struct io_logs, list);
1434 ret += cur_log->nr_samples;
/*
 * Shared helper: finalize a log if it exists (NULL-log handling elided).
 */
1440 static int __write_log(struct thread_data *td, struct io_log *log, int try)
1443 return finish_log(td, log, try);
/*
 * Per-log-type writers. Each flushes its log via __write_log() and, on
 * success, drops the thread's reference to it. The per-unit-log check
 * skips logs that don't match the requested unit_log mode.
 */
1448 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1452 if (per_unit_log(td->iops_log) != unit_log)
1455 ret = __write_log(td, td->iops_log, try);
1457 td->iops_log = NULL;
1462 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1469 ret = __write_log(td, td->slat_log, try);
1471 td->slat_log = NULL;
1476 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1483 ret = __write_log(td, td->clat_log, try);
1485 td->clat_log = NULL;
1490 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1497 ret = __write_log(td, td->clat_hist_log, try);
1499 td->clat_hist_log = NULL;
1504 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1511 ret = __write_log(td, td->lat_log, try);
1518 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1522 if (per_unit_log(td->bw_log) != unit_log)
1525 ret = __write_log(td, td->bw_log, try);
/*
 * Bitmask-driven dispatch table mapping each log type to its writer;
 * iterated by td_writeout_logs() below.
 */
1538 CLAT_HIST_LOG_MASK = 32,
1545 int (*fn)(struct thread_data *, int, bool);
1548 static struct log_type log_types[] = {
1550 .mask = BW_LOG_MASK,
1551 .fn = write_bandw_log,
1554 .mask = LAT_LOG_MASK,
1555 .fn = write_lat_log,
1558 .mask = SLAT_LOG_MASK,
1559 .fn = write_slat_log,
1562 .mask = CLAT_LOG_MASK,
1563 .fn = write_clat_log,
1566 .mask = IOPS_LOG_MASK,
1567 .fn = write_iops_log,
1570 .mask = CLAT_HIST_LOG_MASK,
1571 .fn = write_clat_hist_log,
/*
 * Flush all of a thread's logs. Uses try-lock passes while more than
 * one log remains (to avoid blocking on a contended file), tracking
 * completion in log_mask; bails if a pass makes no progress. Runstate
 * is bumped to TD_FINISHING for the duration and restored after.
 */
1575 void td_writeout_logs(struct thread_data *td, bool unit_logs)
1577 unsigned int log_mask = 0;
1578 unsigned int log_left = ALL_LOG_NR;
1581 old_state = td_bump_runstate(td, TD_FINISHING);
1583 finalize_logs(td, unit_logs);
1586 int prev_log_left = log_left;
1588 for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1589 struct log_type *lt = &log_types[i];
1592 if (!(log_mask & lt->mask)) {
/* last remaining log blocks; earlier ones only try-lock */
1593 ret = lt->fn(td, log_left != 1, unit_logs);
1596 log_mask |= lt->mask;
/* no forward progress this pass: stop looping */
1601 if (prev_log_left == log_left)
1605 td_restore_runstate(td, old_state);
/*
 * Flush logs for every thread (the per-thread iteration wrapper is
 * elided here; each thread goes through td_writeout_logs()).
 */
1608 void fio_writeout_logs(bool unit_logs)
1610 struct thread_data *td;
1614 td_writeout_logs(td, unit_logs);