/*
 * Code related to writing an iolog of what a thread is doing, and to
 * later read that back and replay
 */
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#ifdef CONFIG_ZLIB
#include <zlib.h>
#endif

#include "flist.h"
#include "fio.h"
#include "verify.h"
#include "trim.h"
#include "filelock.h"
#include "smalloc.h"
#include "blktrace.h"
static int iolog_flush(struct io_log *log);

static const char iolog_ver2[] = "fio version 2 iolog";
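
/*
 * For reference, a version 2 iolog is plain text: the version string
 * above on the first line, then one entry per line. A small illustrative
 * (not exhaustive) example:
 *
 *	fio version 2 iolog
 *	/dev/sdb add
 *	/dev/sdb open
 *	/dev/sdb write 0 4096
 *	/dev/sdb read 4096 4096
 *	/dev/sdb close
 *
 * File actions have two fields, I/O entries have four (file, action,
 * offset, length) - see read_iolog2() below for the authoritative parser.
 */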
void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
{
	flist_add_tail(&ipo->list, &td->io_log_list);
	td->total_io_size += ipo->len;
}
void log_io_u(const struct thread_data *td, const struct io_u *io_u)
{
	if (!td->o.write_iolog_file)
		return;

	fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
				io_ddir_name(io_u->ddir),
				io_u->offset, io_u->buflen);
}
void log_file(struct thread_data *td, struct fio_file *f,
	      enum file_log_act what)
{
	const char *act[] = { "add", "open", "close" };

	assert(what < 3);

	if (!td->o.write_iolog_file)
		return;

	/*
	 * this happens on the pre-open/close done before the job starts
	 */
	if (!td->iolog_f)
		return;

	fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
}
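
/*
 * Stall to honor the delay recorded in a replayed iolog entry. Sleeps are
 * issued in chunks of at most 500ms so td->terminate is noticed promptly;
 * any oversleep is remembered in td->time_offset and credited against the
 * next delay.
 */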
static void iolog_delay(struct thread_data *td, unsigned long delay)
{
	uint64_t usec = utime_since_now(&td->last_issue);
	unsigned long orig_delay = delay;
	uint64_t this_delay;
	struct timespec ts;

	if (delay < td->time_offset) {
		td->time_offset = 0;
		return;
	}

	delay -= td->time_offset;
	if (delay < usec)
		return;

	delay -= usec;

	fio_gettime(&ts, NULL);
	while (delay && !td->terminate) {
		this_delay = delay;
		if (this_delay > 500000)
			this_delay = 500000;

		usec_sleep(td, this_delay);
		delay -= this_delay;
	}

	usec = utime_since_now(&ts);
	if (usec > orig_delay)
		td->time_offset = usec - orig_delay;
	else
		td->time_offset = 0;
}
static int ipo_special(struct thread_data *td, struct io_piece *ipo)
{
	struct fio_file *f;
	int ret;

	/*
	 * Not a special ipo
	 */
	if (ipo->ddir != DDIR_INVAL)
		return 0;

	f = td->files[ipo->fileno];

	switch (ipo->file_action) {
	case FIO_LOG_OPEN_FILE:
		if (td->o.replay_redirect && fio_file_open(f)) {
			dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
					f->file_name);
			break;
		}
		ret = td_io_open_file(td, f);
		if (!ret)
			break;
		td_verror(td, ret, "iolog open file");
		return -1;
	case FIO_LOG_CLOSE_FILE:
		td_io_close_file(td, f);
		break;
	case FIO_LOG_UNLINK_FILE:
		td_io_unlink_file(td, f);
		break;
	default:
		log_err("fio: bad file action %d\n", ipo->file_action);
		break;
	}

	return 1;
}
int read_iolog_get(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo;
	unsigned long elapsed;

	while (!flist_empty(&td->io_log_list)) {
		int ret;

		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);

		ret = ipo_special(td, ipo);
		if (ret < 0) {
			free(ipo);
			break;
		} else if (ret > 0) {
			free(ipo);
			continue;
		}

		io_u->ddir = ipo->ddir;
		if (ipo->ddir != DDIR_WAIT) {
			io_u->offset = ipo->offset;
			io_u->buflen = ipo->len;
			io_u->file = td->files[ipo->fileno];
			get_file(io_u->file);
			dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
						io_u->buflen, io_u->file->file_name);
			if (ipo->delay)
				iolog_delay(td, ipo->delay);
		} else {
			elapsed = mtime_since_genesis();
			if (ipo->delay > elapsed)
				usec_sleep(td, (ipo->delay - elapsed) * 1000);
		}

		free(ipo);

		if (io_u->ddir != DDIR_WAIT)
			return 0;
	}

	td->done = 1;
	return 1;
}
void prune_io_piece_log(struct thread_data *td)
{
	struct io_piece *ipo;
	struct rb_node *n;

	while ((n = rb_first(&td->io_hist_tree)) != NULL) {
		ipo = rb_entry(n, struct io_piece, rb_node);
		rb_erase(n, &td->io_hist_tree);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}

	while (!flist_empty(&td->io_hist_list)) {
		ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}
}
/*
 * log a successful write, so we can unwind the log for verify
 */
void log_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct rb_node **p, *parent;
	struct io_piece *ipo, *__ipo;

	ipo = malloc(sizeof(struct io_piece));
	init_ipo(ipo);
	ipo->file = io_u->file;
	ipo->offset = io_u->offset;
	ipo->len = io_u->buflen;
	ipo->numberio = io_u->numberio;
	ipo->flags = IP_F_IN_FLIGHT;

	io_u->ipo = ipo;

	if (io_u_should_trim(td, io_u)) {
		flist_add_tail(&ipo->trim_list, &td->trim_list);
		td->trim_entries++;
	}

	/*
	 * We don't need to sort the entries, if:
	 *
	 *	Sequential writes, or
	 *	Random writes that lay out the file as it goes along
	 *
	 * For both these cases, just reading back data in the order we
	 * wrote it out is the fastest.
	 *
	 * One exception is if we don't have a random map AND we are doing
	 * verifies, in that case we need to check for duplicate blocks and
	 * drop the old one, which we rely on the rb insert/lookup for
	 * handling.
	 */
	if (((!td->o.verifysort) || !td_random(td) || !td->o.overwrite) &&
	      (file_randommap(td, ipo->file) || td->o.verify == VERIFY_NONE)) {
		INIT_FLIST_HEAD(&ipo->list);
		flist_add_tail(&ipo->list, &td->io_hist_list);
		ipo->flags |= IP_F_ONLIST;
		td->io_hist_len++;
		return;
	}

	RB_CLEAR_NODE(&ipo->rb_node);

	/*
	 * Sort the entry into the verification list
	 */
restart:
	p = &td->io_hist_tree.rb_node;
	parent = NULL;
	while (*p) {
		int overlap = 0;
		parent = *p;

		__ipo = rb_entry(parent, struct io_piece, rb_node);
		if (ipo->file < __ipo->file)
			p = &(*p)->rb_left;
		else if (ipo->file > __ipo->file)
			p = &(*p)->rb_right;
		else if (ipo->offset < __ipo->offset) {
			p = &(*p)->rb_left;
			overlap = ipo->offset + ipo->len > __ipo->offset;
		} else if (ipo->offset > __ipo->offset) {
			p = &(*p)->rb_right;
			overlap = __ipo->offset + __ipo->len > ipo->offset;
		} else
			overlap = 1;

		if (overlap) {
			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
				__ipo->offset, __ipo->len,
				ipo->offset, ipo->len);
			td->io_hist_len--;
			rb_erase(parent, &td->io_hist_tree);
			remove_trim_entry(td, __ipo);
			if (!(__ipo->flags & IP_F_IN_FLIGHT))
				free(__ipo);
			goto restart;
		}
	}

	rb_link_node(&ipo->rb_node, parent, p);
	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
	ipo->flags |= IP_F_ONRB;
	td->io_hist_len++;
}
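
/*
 * Undo log_io_piece() for an io_u that failed or was requeued: record the
 * failure in the block info state (if tracked) and drop the piece from
 * whichever structure (rb tree or list) it was put on.
 */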
void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (td->ts.nr_block_infos) {
		uint32_t *info = io_u_block_info(td, io_u);
		if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
			if (io_u->ddir == DDIR_TRIM)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_TRIM_FAILURE);
			else if (io_u->ddir == DDIR_WRITE)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_WRITE_FAILURE);
		}
	}

	if (!ipo)
		return;

	if (ipo->flags & IP_F_ONRB)
		rb_erase(&ipo->rb_node, &td->io_hist_tree);
	else if (ipo->flags & IP_F_ONLIST)
		flist_del(&ipo->list);

	free(ipo);
	io_u->ipo = NULL;
	td->io_hist_len--;
}
void trim_io_piece(struct thread_data *td, const struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (!ipo)
		return;

	ipo->len = io_u->xfer_buflen - io_u->resid;
}
void write_iolog_close(struct thread_data *td)
{
	fflush(td->iolog_f);
	fclose(td->iolog_f);
	free(td->iolog_buf);
	td->iolog_f = NULL;
	td->iolog_buf = NULL;
}
/*
 * Read version 2 iolog data. It is enhanced to include per-file logging,
 * syncs, etc.
 */
static int read_iolog2(struct thread_data *td, FILE *f)
{
	unsigned long long offset;
	unsigned int bytes;
	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
	char *rfname, *fname, *act;
	char *str, *p;
	enum fio_ddir rw;

	free_release_files(td);

	/*
	 * Read in the read iolog and store it, reuse the infrastructure
	 * for doing verifications.
	 */
	str = malloc(4096);
	rfname = fname = malloc(256+16);
	act = malloc(256+16);

	reads = writes = waits = 0;
	while ((p = fgets(str, 4096, f)) != NULL) {
		struct io_piece *ipo;
		int r;

		r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
									&bytes);

		if (td->o.replay_redirect)
			fname = td->o.replay_redirect;

		if (r == 4) {
			/*
			 * Check action first
			 */
			if (!strcmp(act, "wait"))
				rw = DDIR_WAIT;
			else if (!strcmp(act, "read"))
				rw = DDIR_READ;
			else if (!strcmp(act, "write"))
				rw = DDIR_WRITE;
			else if (!strcmp(act, "sync"))
				rw = DDIR_SYNC;
			else if (!strcmp(act, "datasync"))
				rw = DDIR_DATASYNC;
			else if (!strcmp(act, "trim"))
				rw = DDIR_TRIM;
			else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
			fileno = get_fileno(td, fname);
		} else if (r == 2) {
			rw = DDIR_INVAL;
			if (!strcmp(act, "add")) {
				if (td->o.replay_redirect &&
				    get_fileno(td, fname) != -1) {
					dprint(FD_FILE, "iolog: ignoring"
						" re-add of file %s\n", fname);
				} else {
					fileno = add_file(td, fname, 0, 1);
					file_action = FIO_LOG_ADD_FILE;
				}
				continue;
			} else if (!strcmp(act, "open")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_OPEN_FILE;
			} else if (!strcmp(act, "close")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_CLOSE_FILE;
			} else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
		} else {
			log_err("bad iolog2: %s\n", p);
			continue;
		}

		if (rw == DDIR_READ)
			reads++;
		else if (rw == DDIR_WRITE) {
			/*
			 * Don't add a write for ro mode
			 */
			if (read_only)
				continue;
			writes++;
		} else if (rw == DDIR_WAIT) {
			if (td->o.no_stall)
				continue;
			waits++;
		} else if (rw == DDIR_INVAL) {
		} else if (!ddir_sync(rw)) {
			log_err("bad ddir: %d\n", rw);
			continue;
		}

		/*
		 * Make note of file
		 */
		ipo = malloc(sizeof(*ipo));
		init_ipo(ipo);
		ipo->ddir = rw;
		if (rw == DDIR_WAIT) {
			ipo->delay = offset;
		} else {
			if (td->o.replay_scale)
				ipo->offset = offset / td->o.replay_scale;
			else
				ipo->offset = offset;
			ipo_bytes_align(td->o.replay_align, ipo);

			ipo->len = bytes;
			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
				td->o.max_bs[rw] = bytes;
			ipo->fileno = fileno;
			ipo->file_action = file_action;
			td->o.size += bytes;
		}

		queue_io_piece(td, ipo);
	}

	free(str);
	free(act);
	free(rfname);

	if (writes && read_only) {
		log_err("fio: <%s> skips replay of %d writes due to"
			" read-only\n", td->o.name, writes);
		writes = 0;
	}

	if (!reads && !writes && !waits)
		return 1;
	else if (reads && !writes)
		td->o.td_ddir = TD_DDIR_READ;
	else if (!reads && writes)
		td->o.td_ddir = TD_DDIR_WRITE;
	else
		td->o.td_ddir = TD_DDIR_RW;

	return 0;
}
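
/*
 * A replay job only needs to point at a log in the format parsed above;
 * the other knobs referenced in read_iolog2() are optional. Illustrative
 * job section (option names as used in this file, values made up):
 *
 *	[replay]
 *	read_iolog=job.iolog
 *	replay_redirect=/dev/sdz
 *	replay_scale=2
 *	replay_align=4096
 */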
/*
 * open iolog, check version, and call appropriate parser
 */
static int init_iolog_read(struct thread_data *td)
{
	char buffer[256], *p;
	FILE *f;
	int ret;

	f = fopen(td->o.read_iolog_file, "r");
	if (!f) {
		perror("fopen read iolog");
		return 1;
	}

	p = fgets(buffer, sizeof(buffer), f);
	if (!p) {
		td_verror(td, errno, "iolog read");
		log_err("fio: unable to read iolog\n");
		fclose(f);
		return 1;
	}

	/*
	 * version 2 of the iolog stores a specific string as the
	 * first line, check for that
	 */
	if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
		ret = read_iolog2(td, f);
	else {
		log_err("fio: iolog version 1 is no longer supported\n");
		ret = 1;
	}

	fclose(f);
	return ret;
}
/*
 * Set up a log for storing io patterns.
 */
static int init_iolog_write(struct thread_data *td)
{
	struct fio_file *ff;
	FILE *f;
	unsigned int i;

	f = fopen(td->o.write_iolog_file, "a");
	if (!f) {
		perror("fopen write iolog");
		return 1;
	}

	/*
	 * That's it for writing, setup a log buffer and we're done.
	 */
	td->iolog_f = f;
	td->iolog_buf = malloc(8192);
	setvbuf(f, td->iolog_buf, _IOFBF, 8192);

	/*
	 * write our version line
	 */
	if (fprintf(f, "%s\n", iolog_ver2) < 0) {
		perror("iolog init");
		return 1;
	}

	/*
	 * add all known files
	 */
	for_each_file(td, ff, i)
		log_file(td, ff, FIO_LOG_ADD_FILE);

	return 0;
}
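
/*
 * The capture side is driven by the write_iolog option, e.g.
 * "write_iolog=job.iolog" in the job file; the resulting log can then be
 * fed back through read_iolog (illustrative pairing, see the HOWTO for
 * details).
 */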
int init_iolog(struct thread_data *td)
{
	int ret = 0;

	if (td->o.read_iolog_file) {
		int need_swap;

		/*
		 * Check if it's a blktrace file and load that if possible.
		 * Otherwise assume it's a normal log file and load that.
		 */
		if (is_blktrace(td->o.read_iolog_file, &need_swap))
			ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
		else
			ret = init_iolog_read(td);
	} else if (td->o.write_iolog_file)
		ret = init_iolog_write(td);

	if (ret)
		td_verror(td, EINVAL, "failed initializing iolog");

	return ret;
}
void setup_log(struct io_log **log, struct log_params *p,
	       const char *filename)
{
	struct io_log *l;
	int i;
	struct io_u_plat_entry *entry;
	struct flist_head *list;

	l = scalloc(1, sizeof(*l));
	INIT_FLIST_HEAD(&l->io_logs);
	l->log_type = p->log_type;
	l->log_offset = p->log_offset;
	l->log_gz = p->log_gz;
	l->log_gz_store = p->log_gz_store;
	l->avg_msec = p->avg_msec;
	l->hist_msec = p->hist_msec;
	l->hist_coarseness = p->hist_coarseness;
	l->filename = strdup(filename);
	l->td = p->td;

	/* Initialize histogram lists for each r/w direction,
	 * with initial io_u_plat of all zeros:
	 */
	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
		list = &l->hist_window[i].list;
		INIT_FLIST_HEAD(list);
		entry = calloc(1, sizeof(struct io_u_plat_entry));
		flist_add(&entry->list, list);
	}

	if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
		struct io_logs *__p;

		__p = calloc(1, sizeof(*l->pending));
		__p->max_samples = DEF_LOG_ENTRIES;
		__p->log = calloc(__p->max_samples, log_entry_sz(l));
		l->pending = __p;
	}

	if (l->log_offset)
		l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;

	INIT_FLIST_HEAD(&l->chunk_list);

	if (l->log_gz && !p->td)
		l->log_gz = 0;
	else if (l->log_gz || l->log_gz_store) {
		mutex_init_pshared(&l->chunk_lock);
		mutex_init_pshared(&l->deferred_free_lock);
		p->td->flags |= TD_F_COMPRESS_LOG;
	}

	*log = l;
}
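
/*
 * Callers fill a struct log_params and hand in the destination pointer,
 * roughly like this (a sketch; the actual callers live elsewhere and the
 * filename shown is made up):
 *
 *	struct log_params p = {
 *		.td = td,
 *		.avg_msec = td->o.log_avg_msec,
 *		.log_type = IO_LOG_TYPE_BW,
 *		.log_offset = td->o.log_offset,
 *	};
 *	setup_log(&td->bw_log, &p, "jobname_bw.log");
 */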
#ifdef CONFIG_SETVBUF
static void *set_file_buffer(FILE *f)
{
	size_t size = 1048576;
	void *buf;

	buf = malloc(size);
	setvbuf(f, buf, _IOFBF, size);
	return buf;
}

static void clear_file_buffer(void *buf)
{
	free(buf);
}
#else
static void *set_file_buffer(FILE *f)
{
	return NULL;
}

static void clear_file_buffer(void *buf)
{
}
#endif
void free_log(struct io_log *log)
{
	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);
		free(cur_log->log);
		free(cur_log);
	}

	if (log->pending) {
		free(log->pending->log);
		free(log->pending);
		log->pending = NULL;
	}

	free(log->filename);
	sfree(log);
}
unsigned long hist_sum(int j, int stride, unsigned int *io_u_plat,
		       unsigned int *io_u_plat_last)
{
	unsigned long sum;
	int k;

	if (io_u_plat_last) {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k] - io_u_plat_last[j + k];
	} else {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k];
	}

	return sum;
}
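
/*
 * Write histogram samples as CSV: "time, ddir, bs" followed by one summed
 * bucket count per stride of 2^hist_coarseness latency buckets (there are
 * FIO_IO_U_PLAT_NR buckets in total). Sums are deltas against the
 * previous window's entry, which is freed once consumed.
 */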
static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
			       uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, j, nr_samples;
	struct io_u_plat_entry *entry, *entry_before;
	unsigned int *io_u_plat;
	unsigned int *io_u_plat_before;

	int stride = 1 << hist_coarseness;

	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		entry = s->data.plat_entry;
		io_u_plat = entry->io_u_plat;

		entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
		io_u_plat_before = entry_before->io_u_plat;

		fprintf(f, "%lu, %u, %u, ", (unsigned long) s->time,
						io_sample_ddir(s), s->bs);
		for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
			fprintf(f, "%lu, ", hist_sum(j, stride, io_u_plat,
						io_u_plat_before));
		}
		fprintf(f, "%lu\n", (unsigned long)
			hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
					io_u_plat_before));

		flist_del(&entry_before->list);
		free(entry_before);
	}
}
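
/*
 * Write plain samples as CSV: "time, value, ddir, bs", with a trailing
 * ", offset" column when offset logging was enabled for this log.
 */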
void flush_samples(FILE *f, void *samples, uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, nr_samples;

	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		if (!log_offset) {
			fprintf(f, "%lu, %" PRId64 ", %u, %u\n",
					(unsigned long) s->time,
					s->data.val,
					io_sample_ddir(s), s->bs);
		} else {
			struct io_sample_offset *so = (void *) s;

			fprintf(f, "%lu, %" PRId64 ", %u, %u, %llu\n",
					(unsigned long) s->time,
					s->data.val,
					io_sample_ddir(s), s->bs,
					(unsigned long long) so->offset);
		}
	}
}
#ifdef CONFIG_ZLIB

struct iolog_flush_data {
	struct workqueue_work work;
	struct io_log *log;
	void *samples;
	uint32_t nr_samples;
	bool free;
};

#define GZ_CHUNK	131072
static struct iolog_compress *get_new_chunk(unsigned int seq)
{
	struct iolog_compress *c;

	c = malloc(sizeof(*c));
	INIT_FLIST_HEAD(&c->list);
	c->buf = malloc(GZ_CHUNK);
	c->len = 0;
	c->seq = seq;
	return c;
}

static void free_chunk(struct iolog_compress *ic)
{
	free(ic->buf);
	free(ic);
}
static int z_stream_init(z_stream *stream, int gz_hdr)
{
	int wbits = 15;

	memset(stream, 0, sizeof(*stream));
	stream->zalloc = Z_NULL;
	stream->zfree = Z_NULL;
	stream->opaque = Z_NULL;
	stream->next_in = Z_NULL;

	/*
	 * zlib magic - add 32 for auto-detection of gz header or not,
	 * if we decide to store files in a gzip friendly format.
	 */
	if (gz_hdr)
		wbits += 32;

	if (inflateInit2(stream, wbits) != Z_OK)
		return 1;

	return 0;
}
struct inflate_chunk_iter {
	unsigned int seq;
	int err;
	void *buf;
	size_t buf_size;
	size_t buf_used;
	size_t chunk_sz;
};
static void finish_chunk(z_stream *stream, FILE *f,
			 struct inflate_chunk_iter *iter)
{
	int ret;

	ret = inflateEnd(stream);
	if (ret != Z_OK)
		log_err("fio: failed to end log inflation seq %d (%d)\n",
				iter->seq, ret);

	flush_samples(f, iter->buf, iter->buf_used);
	free(iter->buf);
	iter->buf = NULL;
	iter->buf_size = iter->buf_used = 0;
}
/*
 * Iterative chunk inflation. Handles cases where we cross into a new
 * sequence, doing flush finish of previous chunk if needed.
 */
static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
			    z_stream *stream, struct inflate_chunk_iter *iter)
{
	size_t ret;

	dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
				(unsigned long) ic->len, ic->seq);

	if (ic->seq != iter->seq) {
		if (iter->seq)
			finish_chunk(stream, f, iter);

		z_stream_init(stream, gz_hdr);
		iter->seq = ic->seq;
	}

	stream->avail_in = ic->len;
	stream->next_in = ic->buf;

	if (!iter->buf_size) {
		iter->buf_size = iter->chunk_sz;
		iter->buf = malloc(iter->buf_size);
	}

	while (stream->avail_in) {
		size_t this_out = iter->buf_size - iter->buf_used;
		int err;

		stream->avail_out = this_out;
		stream->next_out = iter->buf + iter->buf_used;

		err = inflate(stream, Z_NO_FLUSH);
		if (err < 0) {
			log_err("fio: failed inflating log: %d\n", err);
			iter->err = err;
			break;
		}

		iter->buf_used += this_out - stream->avail_out;

		if (!stream->avail_out) {
			iter->buf_size += iter->chunk_sz;
			iter->buf = realloc(iter->buf, iter->buf_size);
			continue;
		}

		if (err == Z_STREAM_END)
			break;
	}

	ret = (void *) stream->next_in - ic->buf;

	dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);

	return ret;
}
/*
 * Inflate stored compressed chunks, or write them directly to the log
 * file if so instructed.
 */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
	z_stream stream;

	while (!flist_empty(&log->chunk_list)) {
		struct iolog_compress *ic;

		ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
		flist_del(&ic->list);

		if (log->log_gz_store) {
			size_t ret;

			dprint(FD_COMPRESS, "log write chunk size=%lu, "
				"seq=%u\n", (unsigned long) ic->len, ic->seq);

			ret = fwrite(ic->buf, ic->len, 1, f);
			if (ret != 1 || ferror(f)) {
				iter.err = errno;
				log_err("fio: error writing compressed log\n");
			}
		} else
			inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);

		free_chunk(ic);
	}

	if (iter.seq) {
		finish_chunk(&stream, f, &iter);
		free(iter.buf);
	}

	return iter.err;
}
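
/*
 * Note: iolog_file_inflate() below is the backend for fio's --inflate-log
 * command line option (to the best of this file's knowledge; the option
 * wiring lives in the init code).
 */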
/*
 * Open compressed log file and decompress the stored chunks and
 * write them to stdout. The chunks are stored sequentially in the
 * file, so we iterate over them and do them one-by-one.
 */
int iolog_file_inflate(const char *file)
{
	struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
	struct iolog_compress ic;
	z_stream stream;
	struct stat sb;
	size_t ret;
	size_t total;
	void *buf;
	FILE *f;

	f = fopen(file, "r");
	if (!f) {
		perror("fopen");
		return 1;
	}

	if (stat(file, &sb) < 0) {
		fclose(f);
		perror("stat");
		return 1;
	}

	ic.buf = buf = malloc(sb.st_size);
	ic.len = sb.st_size;
	ic.seq = 1;

	ret = fread(ic.buf, ic.len, 1, f);
	if (ret == 0 && ferror(f)) {
		perror("fread");
		fclose(f);
		free(buf);
		return 1;
	} else if (ret != 1) {
		log_err("fio: short read on reading log\n");
		fclose(f);
		free(buf);
		return 1;
	}

	fclose(f);

	/*
	 * Each chunk will return Z_STREAM_END. We don't know how many
	 * chunks are in the file, so we just keep looping and incrementing
	 * the sequence number until we have consumed the whole compressed
	 * file.
	 */
	total = ic.len;
	do {
		size_t iret;

		iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
		total -= iret;
		if (!total)
			break;
		if (iter.err)
			break;

		ic.seq++;
		ic.len -= iret;
		ic.buf += iret;
	} while (1);

	if (iter.seq) {
		finish_chunk(&stream, stdout, &iter);
		free(iter.buf);
	}

	free(buf);
	return iter.err;
}
#else

static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	return 0;
}

int iolog_file_inflate(const char *file)
{
	log_err("fio: log inflation not possible without zlib\n");
	return 1;
}

#endif
void flush_log(struct io_log *log, bool do_append)
{
	void *buf;
	FILE *f;

	if (!do_append)
		f = fopen(log->filename, "w");
	else
		f = fopen(log->filename, "a");
	if (!f) {
		perror("fopen log");
		return;
	}

	buf = set_file_buffer(f);

	inflate_gz_chunks(log, f);

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		if (log->td && log == log->td->clat_hist_log)
			flush_hist_samples(f, log->hist_coarseness, cur_log->log,
					   log_sample_sz(log, cur_log));
		else
			flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));

		free(cur_log);
	}

	fclose(f);
	clear_file_buffer(buf);
}
static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
{
	if (td->flags & TD_F_COMPRESS_LOG)
		iolog_flush(log);

	if (trylock) {
		if (fio_trylock_file(log->filename))
			return 1;
	} else
		fio_lock_file(log->filename);

	if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
		fio_send_iolog(td, log, log->filename);
	else
		flush_log(log, !td->o.per_job_logs);

	fio_unlock_file(log->filename);
	free_log(log);
	return 0;
}
size_t log_chunk_sizes(struct io_log *log)
{
	struct flist_head *entry;
	size_t ret;

	if (flist_empty(&log->chunk_list))
		return 0;

	ret = 0;
	pthread_mutex_lock(&log->chunk_lock);
	flist_for_each(entry, &log->chunk_list) {
		struct iolog_compress *c;

		c = flist_entry(entry, struct iolog_compress, list);
		ret += c->len;
	}
	pthread_mutex_unlock(&log->chunk_lock);
	return ret;
}
#ifdef CONFIG_ZLIB

static bool warned_on_drop;

static void iolog_put_deferred(struct io_log *log, void *ptr)
{
	if (!ptr)
		return;

	pthread_mutex_lock(&log->deferred_free_lock);
	if (log->deferred < IOLOG_MAX_DEFER) {
		log->deferred_items[log->deferred] = ptr;
		log->deferred++;
	} else if (!warned_on_drop) {
		log_err("fio: had to drop log entry free\n");
		warned_on_drop = true;
	}
	pthread_mutex_unlock(&log->deferred_free_lock);
}
static void iolog_free_deferred(struct io_log *log)
{
	int i;

	if (!log->deferred)
		return;

	pthread_mutex_lock(&log->deferred_free_lock);

	for (i = 0; i < log->deferred; i++) {
		free(log->deferred_items[i]);
		log->deferred_items[i] = NULL;
	}

	log->deferred = 0;
	pthread_mutex_unlock(&log->deferred_free_lock);
}
static int gz_work(struct iolog_flush_data *data)
{
	struct iolog_compress *c = NULL;
	struct flist_head list;
	unsigned int seq;
	z_stream stream;
	size_t total = 0;
	int ret;

	INIT_FLIST_HEAD(&list);

	memset(&stream, 0, sizeof(stream));
	stream.zalloc = Z_NULL;
	stream.zfree = Z_NULL;
	stream.opaque = Z_NULL;

	ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
	if (ret != Z_OK) {
		log_err("fio: failed to init gz stream\n");
		goto err;
	}

	seq = ++data->log->chunk_seq;

	stream.next_in = (void *) data->samples;
	stream.avail_in = data->nr_samples * log_entry_sz(data->log);

	dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
				(unsigned long) stream.avail_in, seq,
				data->log->filename);
	do {
		if (c)
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		c = get_new_chunk(seq);
		stream.avail_out = GZ_CHUNK;
		stream.next_out = c->buf;
		ret = deflate(&stream, Z_NO_FLUSH);
		if (ret < 0) {
			log_err("fio: deflate log (%d)\n", ret);
			free_chunk(c);
			goto err;
		}

		c->len = GZ_CHUNK - stream.avail_out;
		flist_add_tail(&c->list, &list);
		total += c->len;
	} while (stream.avail_in);

	stream.next_out = c->buf + c->len;
	stream.avail_out = GZ_CHUNK - c->len;

	ret = deflate(&stream, Z_FINISH);
	if (ret < 0) {
		/*
		 * Z_BUF_ERROR is special, it just means we need more
		 * output space. We'll handle that below. Treat any other
		 * error as fatal.
		 */
		if (ret != Z_BUF_ERROR) {
			log_err("fio: deflate log (%d)\n", ret);
			flist_del(&c->list);
			free_chunk(c);
			goto err;
		}
	}

	total -= c->len;
	c->len = GZ_CHUNK - stream.avail_out;
	total += c->len;
	dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);

	if (ret != Z_STREAM_END) {
		do {
			c = get_new_chunk(seq);
			stream.avail_out = GZ_CHUNK;
			stream.next_out = c->buf;
			ret = deflate(&stream, Z_FINISH);
			c->len = GZ_CHUNK - stream.avail_out;
			total += c->len;
			flist_add_tail(&c->list, &list);
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		} while (ret != Z_STREAM_END);
	}

	dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);

	ret = deflateEnd(&stream);
	if (ret != Z_OK)
		log_err("fio: deflateEnd %d\n", ret);

	iolog_put_deferred(data->log, data->samples);

	if (!flist_empty(&list)) {
		pthread_mutex_lock(&data->log->chunk_lock);
		flist_splice_tail(&list, &data->log->chunk_list);
		pthread_mutex_unlock(&data->log->chunk_lock);
	}

	ret = 0;
done:
	if (data->free)
		sfree(data);
	return ret;
err:
	while (!flist_empty(&list)) {
		c = flist_first_entry(list.next, struct iolog_compress, list);
		flist_del(&c->list);
		free_chunk(c);
	}
	ret = 1;
	goto done;
}
/*
 * Invoked from our compress helper thread, when logging would have exceeded
 * the specified memory limitation. Compresses the previously stored
 * entries.
 */
static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
{
	return gz_work(container_of(work, struct iolog_flush_data, work));
}
static int gz_init_worker(struct submit_worker *sw)
{
	struct thread_data *td = sw->wq->td;

	if (!fio_option_is_set(&td->o, log_gz_cpumask))
		return 0;

	if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
		log_err("gz: failed to set CPU affinity\n");
		return 1;
	}

	return 0;
}
static struct workqueue_ops log_compress_wq_ops = {
	.fn		= gz_work_async,
	.init_worker_fn	= gz_init_worker,
};
int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return 0;

	workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
	return 0;
}
void iolog_compress_exit(struct thread_data *td)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return;

	workqueue_exit(&td->log_compress_wq);
}
/*
 * Queue work item to compress the existing log entries. We reset the
 * current log to a small size, and reference the existing log in the
 * data that we queue for compression. Once compression has been done,
 * this old log is freed.
 */
static int iolog_flush(struct io_log *log)
{
	struct iolog_flush_data *data;

	data = malloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;
	data->free = false;

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		data->samples = cur_log->log;
		data->nr_samples = cur_log->nr_samples;

		sfree(cur_log);

		gz_work(data);
	}

	free(data);
	return 0;
}
int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	struct iolog_flush_data *data;

	data = smalloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;
	data->free = true;

	data->samples = cur_log->log;
	data->nr_samples = cur_log->nr_samples;

	cur_log->nr_samples = cur_log->max_samples = 0;
	cur_log->log = NULL;

	workqueue_enqueue(&log->td->log_compress_wq, &data->work);

	iolog_free_deferred(log);

	return 0;
}
#else

static int iolog_flush(struct io_log *log)
{
	return 1;
}

int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	return 1;
}

int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
}

#endif
struct io_logs *iolog_cur_log(struct io_log *log)
{
	if (flist_empty(&log->io_logs))
		return NULL;

	return flist_last_entry(&log->io_logs, struct io_logs, list);
}
uint64_t iolog_nr_samples(struct io_log *iolog)
{
	struct flist_head *entry;
	uint64_t ret = 0;

	flist_for_each(entry, &iolog->io_logs) {
		struct io_logs *cur_log;

		cur_log = flist_entry(entry, struct io_logs, list);
		ret += cur_log->nr_samples;
	}

	return ret;
}
static int __write_log(struct thread_data *td, struct io_log *log, int try)
{
	if (log)
		return finish_log(td, log, try);

	return 0;
}
static int write_iops_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->iops_log) != unit_log)
		return 0;

	ret = __write_log(td, td->iops_log, try);
	if (!ret)
		td->iops_log = NULL;

	return ret;
}
static int write_slat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->slat_log, try);
	if (!ret)
		td->slat_log = NULL;

	return ret;
}
static int write_clat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_log, try);
	if (!ret)
		td->clat_log = NULL;

	return ret;
}
static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_hist_log, try);
	if (!ret)
		td->clat_hist_log = NULL;

	return ret;
}
static int write_lat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->lat_log, try);
	if (!ret)
		td->lat_log = NULL;

	return ret;
}
static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->bw_log) != unit_log)
		return 0;

	ret = __write_log(td, td->bw_log, try);
	if (!ret)
		td->bw_log = NULL;

	return ret;
}
enum {
	BW_LOG_MASK	= 1,
	LAT_LOG_MASK	= 2,
	SLAT_LOG_MASK	= 4,
	CLAT_LOG_MASK	= 8,
	IOPS_LOG_MASK	= 16,
	CLAT_HIST_LOG_MASK = 32,

	ALL_LOG_NR	= 6,
};

struct log_type {
	unsigned int mask;
	int (*fn)(struct thread_data *, int, bool);
};
static struct log_type log_types[] = {
	{
		.mask	= BW_LOG_MASK,
		.fn	= write_bandw_log,
	},
	{
		.mask	= LAT_LOG_MASK,
		.fn	= write_lat_log,
	},
	{
		.mask	= SLAT_LOG_MASK,
		.fn	= write_slat_log,
	},
	{
		.mask	= CLAT_LOG_MASK,
		.fn	= write_clat_log,
	},
	{
		.mask	= IOPS_LOG_MASK,
		.fn	= write_iops_log,
	},
	{
		.mask	= CLAT_HIST_LOG_MASK,
		.fn	= write_clat_hist_log,
	},
};
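
/*
 * Flush every enabled log type for this thread. Each writer is attempted
 * with a trylock (except when it is the last one remaining) and retried
 * until it succeeds; if a full pass makes no progress, we back off briefly
 * before trying again.
 */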
void td_writeout_logs(struct thread_data *td, bool unit_logs)
{
	unsigned int log_mask = 0;
	unsigned int log_left = ALL_LOG_NR;
	int old_state, i;

	old_state = td_bump_runstate(td, TD_FINISHING);

	finalize_logs(td, unit_logs);

	while (log_left) {
		int prev_log_left = log_left;

		for (i = 0; i < ALL_LOG_NR && log_left; i++) {
			struct log_type *lt = &log_types[i];
			int ret;

			if (!(log_mask & lt->mask)) {
				ret = lt->fn(td, log_left != 1, unit_logs);
				if (!ret) {
					log_left--;
					log_mask |= lt->mask;
				}
			}
		}

		if (prev_log_left == log_left)
			usleep(5000);
	}

	td_restore_runstate(td, old_state);
}
void fio_writeout_logs(bool unit_logs)
{
	struct thread_data *td;
	int i;

	for_each_td(td, i)
		td_writeout_logs(td, unit_logs);
}