2 * Code related to writing an iolog of what a thread is doing, and to
3 * later read that back and replay
24 static int iolog_flush(struct io_log *log);
26 static const char iolog_ver2[] = "fio version 2 iolog";
/*
 * Queue an io_piece for later replay: link it onto the thread's
 * io_log_list and account its length toward the total replay size.
 */
28 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
30 flist_add_tail(&ipo->list, &td->io_log_list);
31 td->total_io_size += ipo->len;
/*
 * Append one io_u to the write iolog as "<file> <ddir> <offset> <len>".
 * No-op unless write_iolog_file was requested.
 */
34 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
36 if (!td->o.write_iolog_file)
39 fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
40 io_ddir_name(io_u->ddir),
41 io_u->offset, io_u->buflen);
/*
 * Record a file-level event (add/open/close) in the write iolog.
 * Skipped when no iolog is being written, and for the pre-job
 * open/close pass (see inline comment below).
 */
44 void log_file(struct thread_data *td, struct fio_file *f,
45 enum file_log_act what)
47 const char *act[] = { "add", "open", "close" };
51 if (!td->o.write_iolog_file)
56 * this happens on the pre-open/close done before the job starts
61 fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
/*
 * Sleep to honour a recorded replay delay (usec). Time already spent since
 * the last issue, plus any previously accumulated oversleep (td->time_offset),
 * is subtracted first; the remainder is slept in chunks of at most 500ms so
 * td->terminate is still noticed promptly. Oversleep past the requested
 * delay is carried forward in td->time_offset for the next call.
 */
64 static void iolog_delay(struct thread_data *td, unsigned long delay)
66 uint64_t usec = utime_since_now(&td->last_issue);
70 if (delay < td->time_offset) {
75 delay -= td->time_offset;
/* Sleep in <= 500ms slices so termination requests are not stalled. */
81 fio_gettime(&ts, NULL);
82 while (delay && !td->terminate) {
84 if (this_delay > 500000)
87 usec_sleep(td, this_delay);
/* Measure how long we actually slept and remember the overshoot. */
91 usec = utime_since_now(&ts);
93 td->time_offset = usec - delay;
/*
 * Handle a special (non-I/O) replay entry: file open/close/unlink actions.
 * Entries with a real data direction (ddir != DDIR_INVAL) are not special
 * and return immediately; the open-file error path reports via td_verror().
 */
98 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
106 if (ipo->ddir != DDIR_INVAL)
109 f = td->files[ipo->fileno];
111 switch (ipo->file_action) {
112 case FIO_LOG_OPEN_FILE:
/* With replay_redirect every entry maps to one file; re-opens are expected. */
113 if (td->o.replay_redirect && fio_file_open(f)) {
114 dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
118 ret = td_io_open_file(td, f);
121 td_verror(td, ret, "iolog open file");
123 case FIO_LOG_CLOSE_FILE:
124 td_io_close_file(td, f);
126 case FIO_LOG_UNLINK_FILE:
127 td_io_unlink_file(td, f);
130 log_err("fio: bad file action %d\n", ipo->file_action);
/*
 * Pop the next replay entry off td->io_log_list and populate io_u from it.
 * File actions are executed via ipo_special() and consumed here; DDIR_WAIT
 * entries sleep instead of producing I/O. Recorded inter-I/O delays are
 * honoured through iolog_delay().
 */
137 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
139 struct io_piece *ipo;
140 unsigned long elapsed;
142 while (!flist_empty(&td->io_log_list)) {
145 ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
146 flist_del(&ipo->list);
147 remove_trim_entry(td, ipo);
/* ret < 0: error; ret > 0: special entry handled, move to the next one. */
149 ret = ipo_special(td, ipo);
153 } else if (ret > 0) {
158 io_u->ddir = ipo->ddir;
159 if (ipo->ddir != DDIR_WAIT) {
160 io_u->offset = ipo->offset;
161 io_u->buflen = ipo->len;
162 io_u->file = td->files[ipo->fileno];
163 get_file(io_u->file);
164 dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
165 io_u->buflen, io_u->file->file_name);
167 iolog_delay(td, ipo->delay);
/* DDIR_WAIT: ipo->delay is absolute (ms since genesis), not relative. */
169 elapsed = mtime_since_genesis();
170 if (ipo->delay > elapsed)
171 usec_sleep(td, (ipo->delay - elapsed) * 1000);
176 if (io_u->ddir != DDIR_WAIT)
/*
 * Free all remembered verify state for this thread: drain the io_hist
 * rb-tree first, then the io_hist list, dropping any associated trim
 * entries as we go.
 */
184 void prune_io_piece_log(struct thread_data *td)
186 struct io_piece *ipo;
189 while ((n = rb_first(&td->io_hist_tree)) != NULL) {
190 ipo = rb_entry(n, struct io_piece, rb_node);
191 rb_erase(n, &td->io_hist_tree);
192 remove_trim_entry(td, ipo);
197 while (!flist_empty(&td->io_hist_list)) {
198 ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
199 flist_del(&ipo->list);
200 remove_trim_entry(td, ipo);
207 * log a successful write, so we can unwind the log for verify
/*
 * Record a successful write so verify can later read it back. Fast path:
 * append to the io_hist list when ordering is already correct (sequential
 * writes, or a random map exists / no verify). Otherwise sort the entry
 * into the io_hist rb-tree, evicting any completed entry it overlaps.
 */
209 void log_io_piece(struct thread_data *td, struct io_u *io_u)
211 struct rb_node **p, *parent;
212 struct io_piece *ipo, *__ipo;
214 ipo = malloc(sizeof(struct io_piece))
216 ipo->file = io_u->file;
217 ipo->offset = io_u->offset;
218 ipo->len = io_u->buflen;
219 ipo->numberio = io_u->numberio;
220 ipo->flags = IP_F_IN_FLIGHT;
/* Trims are additionally tracked on td->trim_list. */
224 if (io_u_should_trim(td, io_u)) {
225 flist_add_tail(&ipo->trim_list, &td->trim_list);
230 * We don't need to sort the entries if we only performed sequential
231 * writes. In this case, just reading back data in the order we wrote
232 * it out is the faster but still safe.
234 * One exception is if we don't have a random map AND we are doing
235 * verifies, in that case we need to check for duplicate blocks and
236 * drop the old one, which we rely on the rb insert/lookup for
239 if (((!td->o.verifysort) || !td_random(td)) &&
240 (file_randommap(td, ipo->file) || td->o.verify == VERIFY_NONE)) {
241 INIT_FLIST_HEAD(&ipo->list);
242 flist_add_tail(&ipo->list, &td->io_hist_list);
243 ipo->flags |= IP_F_ONLIST;
248 RB_CLEAR_NODE(&ipo->rb_node);
251 * Sort the entry into the verification list
/* Descend the tree keyed on (file, offset); detect byte-range overlap. */
254 p = &td->io_hist_tree.rb_node;
260 __ipo = rb_entry(parent, struct io_piece, rb_node);
261 if (ipo->file < __ipo->file)
263 else if (ipo->file > __ipo->file)
265 else if (ipo->offset < __ipo->offset) {
267 overlap = ipo->offset + ipo->len > __ipo->offset;
269 else if (ipo->offset > __ipo->offset) {
271 overlap = __ipo->offset + __ipo->len > ipo->offset;
/* Overlap with an older entry: drop the old one, keep the new write. */
277 dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
278 __ipo->offset, __ipo->len,
279 ipo->offset, ipo->len);
281 rb_erase(parent, &td->io_hist_tree);
282 remove_trim_entry(td, __ipo);
/* Only free it now if no in-flight io_u still references it. */
283 if (!(__ipo->flags & IP_F_IN_FLIGHT))
289 rb_link_node(&ipo->rb_node, parent, p);
290 rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
291 ipo->flags |= IP_F_ONRB;
/*
 * Undo log_io_piece() for a failed io_u: mark the block-info state as a
 * trim/write failure (when block info tracking is enabled) and unlink the
 * io_piece from whichever structure it lives on (rb-tree or list).
 */
295 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
297 struct io_piece *ipo = io_u->ipo;
299 if (td->ts.nr_block_infos) {
300 uint32_t *info = io_u_block_info(td, io_u);
/* Only downgrade state; never overwrite an existing failure record. */
301 if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
302 if (io_u->ddir == DDIR_TRIM)
303 *info = BLOCK_INFO_SET_STATE(*info,
304 BLOCK_STATE_TRIM_FAILURE);
305 else if (io_u->ddir == DDIR_WRITE)
306 *info = BLOCK_INFO_SET_STATE(*info,
307 BLOCK_STATE_WRITE_FAILURE);
314 if (ipo->flags & IP_F_ONRB)
315 rb_erase(&ipo->rb_node, &td->io_hist_tree);
316 else if (ipo->flags & IP_F_ONLIST)
317 flist_del(&ipo->list);
/*
 * Shorten the logged io_piece to the number of bytes actually transferred
 * (xfer_buflen minus the residual), so verify matches what hit the media.
 */
324 void trim_io_piece(struct thread_data *td, const struct io_u *io_u)
326 struct io_piece *ipo = io_u->ipo;
331 ipo->len = io_u->xfer_buflen - io_u->resid;
/*
 * Close out the write iolog: flush/close the stream and release the
 * setvbuf buffer installed by init_iolog_write().
 */
334 void write_iolog_close(struct thread_data *td)
340 td->iolog_buf = NULL;
344 * Read version 2 iolog data. It is enhanced to include per-file logging,
/*
 * Parse a version 2 iolog from stream 'f' and queue io_pieces for replay.
 * Each line is either a file action ("<name> add|open|close") or an I/O
 * ("<name> <ddir> <offset> <len>"); replay_redirect, replay_scale and
 * replay_align options are applied per entry. On success, td_ddir is set
 * to reflect the mix of reads/writes seen. Returns 0 on success.
 */
347 static int read_iolog2(struct thread_data *td, FILE *f)
349 unsigned long long offset;
351 int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
352 char *rfname, *fname, *act;
/* Drop any pre-existing file list; the log dictates the file set. */
356 free_release_files(td);
359 * Read in the read iolog and store it, reuse the infrastructure
360 * for doing verifications.
363 rfname = fname = malloc(256+16);
364 act = malloc(256+16);
366 reads = writes = waits = 0;
367 while ((p = fgets(str, 4096, f)) != NULL) {
368 struct io_piece *ipo;
/* 4-field match = I/O entry, fewer fields = file action (handled below). */
371 r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
374 if (td->o.replay_redirect)
375 fname = td->o.replay_redirect;
381 if (!strcmp(act, "wait"))
383 else if (!strcmp(act, "read"))
385 else if (!strcmp(act, "write"))
387 else if (!strcmp(act, "sync"))
389 else if (!strcmp(act, "datasync"))
391 else if (!strcmp(act, "trim"))
394 log_err("fio: bad iolog file action: %s\n",
398 fileno = get_fileno(td, fname);
401 if (!strcmp(act, "add")) {
/* Redirected adds collapse onto one file; ignore duplicates. */
402 if (td->o.replay_redirect &&
403 get_fileno(td, fname) != -1) {
404 dprint(FD_FILE, "iolog: ignoring"
405 " re-add of file %s\n", fname);
407 fileno = add_file(td, fname, 0, 1);
408 file_action = FIO_LOG_ADD_FILE;
411 } else if (!strcmp(act, "open")) {
412 fileno = get_fileno(td, fname);
413 file_action = FIO_LOG_OPEN_FILE;
414 } else if (!strcmp(act, "close")) {
415 fileno = get_fileno(td, fname);
416 file_action = FIO_LOG_CLOSE_FILE;
418 log_err("fio: bad iolog file action: %s\n",
423 log_err("bad iolog2: %s\n", p);
429 else if (rw == DDIR_WRITE) {
431 * Don't add a write for ro mode
436 } else if (rw == DDIR_WAIT) {
440 } else if (rw == DDIR_INVAL) {
441 } else if (!ddir_sync(rw)) {
442 log_err("bad ddir: %d\n", rw);
/*
449 ipo = malloc(sizeof(*ipo));
452 if (rw == DDIR_WAIT) {
455 if (td->o.replay_scale)
456 ipo->offset = offset / td->o.replay_scale;
458 ipo->offset = offset;
459 ipo_bytes_align(td->o.replay_align, ipo);
/* Track the largest block size seen so buffers are sized correctly. */
462 if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
463 td->o.max_bs[rw] = bytes;
464 ipo->fileno = fileno;
465 ipo->file_action = file_action;
469 queue_io_piece(td, ipo);
476 if (writes && read_only) {
477 log_err("fio: <%s> skips replay of %d writes due to"
478 " read-only\n", td->o.name, writes);
482 if (!reads && !writes && !waits)
484 else if (reads && !writes)
485 td->o.td_ddir = TD_DDIR_READ;
486 else if (!reads && writes)
487 td->o.td_ddir = TD_DDIR_WRITE;
489 td->o.td_ddir = TD_DDIR_RW;
495 * open iolog, check version, and call appropriate parser
/*
 * Open the configured read iolog, verify the version banner on the first
 * line, and hand the stream to the matching parser (only v2 is supported).
 * Returns non-zero on open/read/parse failure.
 */
497 static int init_iolog_read(struct thread_data *td)
499 char buffer[256], *p;
503 f = fopen(td->o.read_iolog_file, "r");
505 perror("fopen read iolog");
509 p = fgets(buffer, sizeof(buffer), f);
511 td_verror(td, errno, "iolog read");
512 log_err("fio: unable to read iolog\n");
518 * version 2 of the iolog stores a specific string as the
519 * first line, check for that
521 if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
522 ret = read_iolog2(td, f);
524 log_err("fio: iolog version 1 is no longer supported\n");
533 * Set up a log for storing io patterns.
/*
 * Set up a log for storing io patterns: open the file for append, install
 * an 8k stdio buffer (freed in write_iolog_close()), emit the version
 * banner, and pre-log an "add" entry for every known file.
 */
535 static int init_iolog_write(struct thread_data *td)
541 f = fopen(td->o.write_iolog_file, "a");
543 perror("fopen write iolog");
548 * That's it for writing, setup a log buffer and we're done.
551 td->iolog_buf = malloc(8192);
552 setvbuf(f, td->iolog_buf, _IOFBF, 8192);
555 * write our version line
557 if (fprintf(f, "%s\n", iolog_ver2) < 0) {
558 perror("iolog init\n");
563 * add all known files
565 for_each_file(td, ff, i)
566 log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * Top-level iolog init: load a replay log (blktrace or fio v2 format) when
 * reading, or open the output log when writing. Reports EINVAL via
 * td_verror() on failure.
 */
571 int init_iolog(struct thread_data *td)
575 if (td->o.read_iolog_file) {
579 * Check if it's a blktrace file and load that if possible.
580 * Otherwise assume it's a normal log file and load that.
582 if (is_blktrace(td->o.read_iolog_file, &need_swap))
583 ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
585 ret = init_iolog_read(td);
586 } else if (td->o.write_iolog_file)
587 ret = init_iolog_write(td);
590 td_verror(td, EINVAL, "failed initializing iolog");
/*
 * Allocate and initialize an io_log from the given parameters: copy the
 * averaging/histogram/compression settings, seed one zeroed histogram
 * entry per data direction, pre-allocate the first sample buffer for
 * non-offload submit mode, and set up compression state when log_gz /
 * log_gz_store are in effect. Result is stored through *log.
 */
595 void setup_log(struct io_log **log, struct log_params *p,
596 const char *filename)
600 struct io_u_plat_entry *entry;
601 struct flist_head *list;
/* Shared-memory allocation: the log may be consumed by another process. */
603 l = scalloc(1, sizeof(*l));
604 INIT_FLIST_HEAD(&l->io_logs);
605 l->log_type = p->log_type;
606 l->log_offset = p->log_offset;
607 l->log_gz = p->log_gz;
608 l->log_gz_store = p->log_gz_store;
609 l->avg_msec = p->avg_msec;
610 l->hist_msec = p->hist_msec;
611 l->hist_coarseness = p->hist_coarseness;
612 l->filename = strdup(filename);
615 /* Initialize histogram lists for each r/w direction,
616 * with initial io_u_plat of all zeros:
618 for (i = 0; i < DDIR_RWDIR_CNT; i++) {
619 list = &l->hist_window[i].list;
620 INIT_FLIST_HEAD(list);
621 entry = calloc(1, sizeof(struct io_u_plat_entry));
622 flist_add(&entry->list, list);
/* Inline (non-offload) logging: pre-size the pending sample buffer. */
625 if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
628 p = calloc(1, sizeof(*l->pending));
629 p->max_samples = DEF_LOG_ENTRIES;
630 p->log = calloc(p->max_samples, log_entry_sz(l));
/* Offset logging widens each sample; flag it in the ddir mask. */
635 l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
637 INIT_FLIST_HEAD(&l->chunk_list);
639 if (l->log_gz && !p->td)
641 else if (l->log_gz || l->log_gz_store) {
642 mutex_init_pshared(&l->chunk_lock);
643 mutex_init_pshared(&l->deferred_free_lock);
644 p->td->flags |= TD_F_COMPRESS_LOG;
/*
 * Optional stdio buffering helpers: when CONFIG_SETVBUF is available,
 * give the log FILE a large (1MB) fully-buffered stream buffer which the
 * caller must release via clear_file_buffer() after closing the stream.
 * Without CONFIG_SETVBUF both helpers are no-ops.
 */
650 #ifdef CONFIG_SETVBUF
651 static void *set_file_buffer(FILE *f)
653 size_t size = 1048576;
657 setvbuf(f, buf, _IOFBF, size);
661 static void clear_file_buffer(void *buf)
666 static void *set_file_buffer(FILE *f)
671 static void clear_file_buffer(void *buf)
/*
 * Release an io_log: free every queued io_logs sample buffer, the pending
 * buffer (if any), and the log structure itself.
 */
676 void free_log(struct io_log *log)
678 while (!flist_empty(&log->io_logs)) {
679 struct io_logs *cur_log;
681 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
682 flist_del_init(&cur_log->list);
688 free(log->pending->log);
/*
 * Sum one coarsened histogram bucket: add up 'stride' consecutive slots of
 * io_u_plat starting at index j. When io_u_plat_last is non-NULL, return
 * the delta against that previous snapshot instead of the absolute counts.
 */
unsigned long hist_sum(int j, int stride, unsigned int *io_u_plat,
		       unsigned int *io_u_plat_last)
{
	unsigned long total = 0;
	int idx;

	for (idx = 0; idx < stride; idx++) {
		unsigned int val = io_u_plat[j + idx];

		if (io_u_plat_last)
			val -= io_u_plat_last[j + idx];

		total += val;
	}

	return total;
}
/*
 * Write latency-histogram samples to 'f' in CSV form: one line per sample
 * with "time, ddir, bs" followed by per-bucket sums coarsened by
 * 2^hist_coarseness. Each bucket value is the delta against the previous
 * snapshot on the entry's list, which is unlinked (and presumably freed)
 * once consumed.
 */
715 static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
716 uint64_t sample_size)
720 uint64_t i, j, nr_samples;
721 struct io_u_plat_entry *entry, *entry_before;
722 unsigned int *io_u_plat;
723 unsigned int *io_u_plat_before;
725 int stride = 1 << hist_coarseness;
/* Sample width depends on whether offsets were logged; probe sample 0. */
730 s = __get_sample(samples, 0, 0);
731 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
733 nr_samples = sample_size / __log_entry_sz(log_offset);
735 for (i = 0; i < nr_samples; i++) {
736 s = __get_sample(samples, log_offset, i);
738 entry = s->data.plat_entry;
739 io_u_plat = entry->io_u_plat;
741 entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
742 io_u_plat_before = entry_before->io_u_plat;
744 fprintf(f, "%lu, %u, %u, ", (unsigned long) s->time,
745 io_sample_ddir(s), s->bs);
746 for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
747 fprintf(f, "%lu, ", hist_sum(j, stride, io_u_plat,
/* Last bucket terminates the line without a trailing comma. */
750 fprintf(f, "%lu\n", (unsigned long)
751 hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
754 flist_del(&entry_before->list);
/*
 * Write plain log samples to 'f' as CSV: "time, value, ddir, bs" and, when
 * offset logging is enabled (LOG_OFFSET_SAMPLE_BIT set in the first
 * sample's ddir), an additional trailing offset column.
 */
759 void flush_samples(FILE *f, void *samples, uint64_t sample_size)
763 uint64_t i, nr_samples;
768 s = __get_sample(samples, 0, 0);
769 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
771 nr_samples = sample_size / __log_entry_sz(log_offset);
773 for (i = 0; i < nr_samples; i++) {
774 s = __get_sample(samples, log_offset, i);
777 fprintf(f, "%lu, %" PRId64 ", %u, %u\n",
778 (unsigned long) s->time,
780 io_sample_ddir(s), s->bs);
/* Offset-logging variant: the sample carries an extra offset field. */
782 struct io_sample_offset *so = (void *) s;
784 fprintf(f, "%lu, %" PRId64 ", %u, %u, %llu\n",
785 (unsigned long) s->time,
787 io_sample_ddir(s), s->bs,
788 (unsigned long long) so->offset);
795 struct iolog_flush_data {
796 struct workqueue_work work;
803 #define GZ_CHUNK 131072
/*
 * Allocate a fresh compression chunk with a GZ_CHUNK-sized buffer, tagged
 * with the given sequence number for ordered reassembly on inflate.
 */
805 static struct iolog_compress *get_new_chunk(unsigned int seq)
807 struct iolog_compress *c;
809 c = malloc(sizeof(*c));
810 INIT_FLIST_HEAD(&c->list);
811 c->buf = malloc(GZ_CHUNK);
/* Free a compression chunk and its data buffer (counterpart of get_new_chunk). */
817 static void free_chunk(struct iolog_compress *ic)
/*
 * Prepare a zlib inflate stream. gz_hdr selects window bits that include
 * the +32 auto-detection offset so both raw zlib and gzip-framed input
 * (log_gz_store format) can be decoded. Returns non-zero on init failure.
 */
823 static int z_stream_init(z_stream *stream, int gz_hdr)
827 memset(stream, 0, sizeof(*stream));
828 stream->zalloc = Z_NULL;
829 stream->zfree = Z_NULL;
830 stream->opaque = Z_NULL;
831 stream->next_in = Z_NULL;
834 * zlib magic - add 32 for auto-detection of gz header or not,
835 * if we decide to store files in a gzip friendly format.
840 if (inflateInit2(stream, wbits) != Z_OK)
846 struct inflate_chunk_iter {
/*
 * Finish one logical inflate sequence: end the zlib stream (logging any
 * error), flush the accumulated decompressed samples to 'f', and reset the
 * iteration buffer for the next sequence.
 */
855 static void finish_chunk(z_stream *stream, FILE *f,
856 struct inflate_chunk_iter *iter)
860 ret = inflateEnd(stream);
862 log_err("fio: failed to end log inflation seq %d (%d)\n",
865 flush_samples(f, iter->buf, iter->buf_used);
868 iter->buf_size = iter->buf_used = 0;
872 * Iterative chunk inflation. Handles cases where we cross into a new
873 * sequence, doing flush finish of previous chunk if needed.
/*
 * Iterative chunk inflation. Handles crossing into a new sequence number
 * by flushing/finishing the previous one, then inflates this chunk into
 * iter->buf, growing the buffer by chunk_sz whenever output space runs
 * out. Returns the number of input bytes consumed.
 */
875 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
876 z_stream *stream, struct inflate_chunk_iter *iter)
880 dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
881 (unsigned long) ic->len, ic->seq);
/* New sequence: finish the previous stream and start a fresh one. */
883 if (ic->seq != iter->seq) {
885 finish_chunk(stream, f, iter);
887 z_stream_init(stream, gz_hdr);
891 stream->avail_in = ic->len;
892 stream->next_in = ic->buf;
894 if (!iter->buf_size) {
895 iter->buf_size = iter->chunk_sz;
896 iter->buf = malloc(iter->buf_size);
899 while (stream->avail_in) {
900 size_t this_out = iter->buf_size - iter->buf_used;
903 stream->avail_out = this_out;
904 stream->next_out = iter->buf + iter->buf_used;
906 err = inflate(stream, Z_NO_FLUSH);
908 log_err("fio: failed inflating log: %d\n", err);
913 iter->buf_used += this_out - stream->avail_out;
/* Output buffer exhausted: grow it and keep inflating. */
915 if (!stream->avail_out) {
916 iter->buf_size += iter->chunk_sz;
917 iter->buf = realloc(iter->buf, iter->buf_size);
921 if (err == Z_STREAM_END)
925 ret = (void *) stream->next_in - ic->buf;
927 dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
933 * Inflate stored compressed chunks, or write them directly to the log
934 * file if so instructed.
/*
 * Drain the log's compressed chunk list into 'f': either write the raw
 * compressed bytes (log_gz_store, gzip-framed output) or inflate each
 * chunk and emit the decoded samples. A final finish_chunk() flushes any
 * trailing inflated data.
 */
936 static int inflate_gz_chunks(struct io_log *log, FILE *f)
938 struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
941 while (!flist_empty(&log->chunk_list)) {
942 struct iolog_compress *ic;
944 ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
945 flist_del(&ic->list);
947 if (log->log_gz_store) {
950 dprint(FD_COMPRESS, "log write chunk size=%lu, "
951 "seq=%u\n", (unsigned long) ic->len, ic->seq);
953 ret = fwrite(ic->buf, ic->len, 1, f);
954 if (ret != 1 || ferror(f)) {
956 log_err("fio: error writing compressed log\n");
959 inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
965 finish_chunk(&stream, f, &iter);
973 * Open compressed log file and decompress the stored chunks and
974 * write them to stdout. The chunks are stored sequentially in the
975 * file, so we iterate over them and do them one-by-one.
/*
 * Open a stored compressed log file, read it whole into memory, then
 * inflate and write the decoded samples to stdout. Chunks are stored
 * sequentially; each ends with Z_STREAM_END, so we loop bumping the
 * sequence number until the entire input is consumed.
 */
977 int iolog_file_inflate(const char *file)
979 struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
980 struct iolog_compress ic;
988 f = fopen(file, "r");
994 if (stat(file, &sb) < 0) {
1000 ic.buf = buf = malloc(sb.st_size);
1001 ic.len = sb.st_size;
1004 ret = fread(ic.buf, ic.len, 1, f);
1010 } else if (ret != 1) {
1011 log_err("fio: short read on reading log\n");
1020 * Each chunk will return Z_STREAM_END. We don't know how many
1021 * chunks are in the file, so we just keep looping and incrementing
1022 * the sequence number until we have consumed the whole compressed
1029 iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
1042 finish_chunk(&stream, stdout, &iter);
/* Stubs for builds without zlib: nothing to inflate, and offline log
 * inflation is reported as unsupported. */
1052 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1057 int iolog_file_inflate(const char *file)
1059 log_err("fio: log inflation not possible without zlib\n");
/*
 * Write an io_log to its file ("w" or, with do_append, "a"): first any
 * compressed chunks via inflate_gz_chunks(), then every queued sample
 * buffer — histogram-formatted for the clat_hist log, plain CSV
 * otherwise. Stream buffering is managed via set/clear_file_buffer().
 */
1065 void flush_log(struct io_log *log, bool do_append)
1071 f = fopen(log->filename, "w");
1073 f = fopen(log->filename, "a");
1075 perror("fopen log");
1079 buf = set_file_buffer(f);
1081 inflate_gz_chunks(log, f);
1083 while (!flist_empty(&log->io_logs)) {
1084 struct io_logs *cur_log;
1086 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1087 flist_del_init(&cur_log->list);
/* The clat histogram log needs the bucketed formatter. */
1089 if (log->td && log == log->td->clat_hist_log)
1090 flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1091 log_sample_sz(log, cur_log));
1093 flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1099 clear_file_buffer(buf);
/*
 * Finalize one log under the per-filename file lock: compressed logs are
 * flushed first, networked/GUI clients ship the log over the wire, local
 * runs write it out via flush_log(). With trylock set, a busy lock means
 * "try again later" rather than blocking.
 */
1102 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1104 if (td->flags & TD_F_COMPRESS_LOG)
1108 if (fio_trylock_file(log->filename))
1111 fio_lock_file(log->filename);
1113 if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1114 fio_send_iolog(td, log, log->filename);
1116 flush_log(log, !td->o.per_job_logs);
1118 fio_unlock_file(log->filename);
/*
 * Sum the sizes of all compressed chunks currently queued on the log,
 * under chunk_lock. Returns 0 for an empty list.
 */
1123 size_t log_chunk_sizes(struct io_log *log)
1125 struct flist_head *entry;
1128 if (flist_empty(&log->chunk_list))
1132 pthread_mutex_lock(&log->chunk_lock);
1133 flist_for_each(entry, &log->chunk_list) {
1134 struct iolog_compress *c;
1136 c = flist_entry(entry, struct iolog_compress, list);
1139 pthread_mutex_unlock(&log->chunk_lock);
1145 static bool warned_on_drop;
/*
 * Queue a buffer for deferred freeing (it may still be referenced by the
 * compression worker). If the deferred array is full the entry is leaked,
 * with a one-time warning.
 */
1147 static void iolog_put_deferred(struct io_log *log, void *ptr)
1152 pthread_mutex_lock(&log->deferred_free_lock);
1153 if (log->deferred < IOLOG_MAX_DEFER) {
1154 log->deferred_items[log->deferred] = ptr;
1156 } else if (!warned_on_drop) {
1157 log_err("fio: had to drop log entry free\n");
1158 warned_on_drop = true;
1160 pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Free everything queued by iolog_put_deferred() and reset the deferred
 * list, under deferred_free_lock.
 */
1163 static void iolog_free_deferred(struct io_log *log)
1170 pthread_mutex_lock(&log->deferred_free_lock);
1172 for (i = 0; i < log->deferred; i++) {
1173 free(log->deferred_items[i]);
1174 log->deferred_items[i] = NULL;
1178 pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Deflate one batch of log samples into GZ_CHUNK-sized chunks, tag them
 * with the next sequence number, and splice them onto the log's chunk
 * list. The consumed sample buffer is handed to iolog_put_deferred(); on
 * error, any chunks built so far are freed. Returns 0 on success.
 */
1181 static int gz_work(struct iolog_flush_data *data)
1183 struct iolog_compress *c = NULL;
1184 struct flist_head list;
1190 INIT_FLIST_HEAD(&list);
1192 memset(&stream, 0, sizeof(stream));
1193 stream.zalloc = Z_NULL;
1194 stream.zfree = Z_NULL;
1195 stream.opaque = Z_NULL;
1197 ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1199 log_err("fio: failed to init gz stream\n");
1203 seq = ++data->log->chunk_seq;
1205 stream.next_in = (void *) data->samples;
1206 stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1208 dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
1209 (unsigned long) stream.avail_in, seq,
1210 data->log->filename);
/* Main loop: one new chunk per iteration until all input is consumed. */
1213 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1214 (unsigned long) c->len);
1215 c = get_new_chunk(seq);
1216 stream.avail_out = GZ_CHUNK;
1217 stream.next_out = c->buf;
1218 ret = deflate(&stream, Z_NO_FLUSH);
1220 log_err("fio: deflate log (%d)\n", ret);
1225 c->len = GZ_CHUNK - stream.avail_out;
1226 flist_add_tail(&c->list, &list);
1228 } while (stream.avail_in);
/* Finish the stream into the tail chunk's remaining space. */
1230 stream.next_out = c->buf + c->len;
1231 stream.avail_out = GZ_CHUNK - c->len;
1233 ret = deflate(&stream, Z_FINISH);
1236 * Z_BUF_ERROR is special, it just means we need more
1237 * output space. We'll handle that below. Treat any other
1240 if (ret != Z_BUF_ERROR) {
1241 log_err("fio: deflate log (%d)\n", ret);
1242 flist_del(&c->list);
1249 c->len = GZ_CHUNK - stream.avail_out;
1251 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
/* Keep finishing into fresh chunks until zlib signals Z_STREAM_END. */
1253 if (ret != Z_STREAM_END) {
1255 c = get_new_chunk(seq);
1256 stream.avail_out = GZ_CHUNK;
1257 stream.next_out = c->buf;
1258 ret = deflate(&stream, Z_FINISH);
1259 c->len = GZ_CHUNK - stream.avail_out;
1261 flist_add_tail(&c->list, &list);
1262 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1263 (unsigned long) c->len);
1264 } while (ret != Z_STREAM_END);
1267 dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1269 ret = deflateEnd(&stream);
1271 log_err("fio: deflateEnd %d\n", ret);
/* The sample buffer may still be in use elsewhere; free it deferred. */
1273 iolog_put_deferred(data->log, data->samples);
1275 if (!flist_empty(&list)) {
1276 pthread_mutex_lock(&data->log->chunk_lock);
1277 flist_splice_tail(&list, &data->log->chunk_list);
1278 pthread_mutex_unlock(&data->log->chunk_lock);
/* Error path: release chunks built so far. */
1287 while (!flist_empty(&list)) {
/*
 * NOTE(review): flist_first_entry() expects the list head, but this passes
 * list.next — that resolves to list.next->next and would skip/mis-free
 * entries. Verify against the flist_first_entry() definition.
 */
1288 c = flist_first_entry(list.next, struct iolog_compress, list);
1289 flist_del(&c->list);
1297 * Invoked from our compress helper thread, when logging would have exceeded
1298 * the specified memory limitation. Compresses the previously stored
/* Workqueue adapter: recover the iolog_flush_data from the work item and
 * run the compression. */
1301 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1303 return gz_work(container_of(work, struct iolog_flush_data, work));
/*
 * Per-worker init for the compression workqueue: pin the worker thread to
 * log_gz_cpumask when that option was explicitly set.
 */
1306 static int gz_init_worker(struct submit_worker *sw)
1308 struct thread_data *td = sw->wq->td;
1310 if (!fio_option_is_set(&td->o, log_gz_cpumask))
1313 if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1314 log_err("gz: failed to set CPU affinity\n");
1321 static struct workqueue_ops log_compress_wq_ops = {
1322 .fn = gz_work_async,
1323 .init_worker_fn = gz_init_worker,
/* Spin up the single-worker log compression workqueue, if this thread
 * uses compressed logs (TD_F_COMPRESS_LOG). */
1327 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1329 if (!(td->flags & TD_F_COMPRESS_LOG))
1332 workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
/* Tear down the log compression workqueue started by iolog_compress_init(). */
1336 void iolog_compress_exit(struct thread_data *td)
1338 if (!(td->flags & TD_F_COMPRESS_LOG))
1341 workqueue_exit(&td->log_compress_wq);
1345 * Queue work item to compress the existing log entries. We reset the
1346 * current log to a small size, and reference the existing log in the
1347 * data that we queue for compression. Once compression has been done,
1348 * this old log is freed. If called with finish == true, will not return
1349 * until the log compression has completed, and will flush all previous
/*
 * Synchronously compress every queued sample buffer on the log: pop each
 * io_logs entry and run gz_work() on it inline (no workqueue).
 */
1352 static int iolog_flush(struct io_log *log)
1354 struct iolog_flush_data *data;
1356 data = malloc(sizeof(*data));
1363 while (!flist_empty(&log->io_logs)) {
1364 struct io_logs *cur_log;
1366 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1367 flist_del_init(&cur_log->list);
1369 data->samples = cur_log->log;
1370 data->nr_samples = cur_log->nr_samples;
/*
 * Hand the current sample buffer to the async compression workqueue. The
 * log gives up ownership of cur_log->log (reset to NULL here); previously
 * deferred buffers are freed on the way out.
 */
1381 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1383 struct iolog_flush_data *data;
/* smalloc: shared memory, the workqueue worker may be another process. */
1385 data = smalloc(sizeof(*data));
1391 data->samples = cur_log->log;
1392 data->nr_samples = cur_log->nr_samples;
1395 cur_log->nr_samples = cur_log->max_samples = 0;
1396 cur_log->log = NULL;
1398 workqueue_enqueue(&log->td->log_compress_wq, &data->work);
1400 iolog_free_deferred(log);
/* Stubs for builds without zlib: flushing/compression setup are no-ops. */
1406 static int iolog_flush(struct io_log *log)
1411 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1416 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1421 void iolog_compress_exit(struct thread_data *td)
/* Return the newest (tail) io_logs buffer on the log, or NULL when empty. */
1427 struct io_logs *iolog_cur_log(struct io_log *log)
1429 if (flist_empty(&log->io_logs))
1432 return flist_last_entry(&log->io_logs, struct io_logs, list);
/* Total number of samples across all queued io_logs buffers on this log. */
1435 uint64_t iolog_nr_samples(struct io_log *iolog)
1437 struct flist_head *entry;
1440 flist_for_each(entry, &iolog->io_logs) {
1441 struct io_logs *cur_log;
1443 cur_log = flist_entry(entry, struct io_logs, list);
1444 ret += cur_log->nr_samples;
/* Write out one log via finish_log(); 'try' maps to its trylock mode. */
1450 static int __write_log(struct thread_data *td, struct io_log *log, int try)
1453 return finish_log(td, log, try);
/* Flush the IOPS log when its per-unit setting matches this pass;
 * on success the log is consumed and the pointer cleared. */
1458 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1462 if (per_unit_log(td->iops_log) != unit_log)
1465 ret = __write_log(td, td->iops_log, try);
1467 td->iops_log = NULL;
/* Flush the submission-latency log; on success the pointer is cleared. */
1472 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1479 ret = __write_log(td, td->slat_log, try);
1481 td->slat_log = NULL;
/* Flush the completion-latency log; on success the pointer is cleared. */
1486 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1493 ret = __write_log(td, td->clat_log, try);
1495 td->clat_log = NULL;
/* Flush the completion-latency histogram log; pointer cleared on success. */
1500 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1507 ret = __write_log(td, td->clat_hist_log, try);
1509 td->clat_hist_log = NULL;
/* Flush the total-latency log via the shared __write_log() path. */
1514 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1521 ret = __write_log(td, td->lat_log, try);
/* Flush the bandwidth log when its per-unit setting matches this pass. */
1528 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1532 if (per_unit_log(td->bw_log) != unit_log)
1535 ret = __write_log(td, td->bw_log, try);
1548 CLAT_HIST_LOG_MASK = 32,
1555 int (*fn)(struct thread_data *, int, bool);
/* Table mapping each log-type mask bit to its writeout function; iterated
 * by td_writeout_logs() until every log has been flushed. */
1558 static struct log_type log_types[] = {
1560 .mask = BW_LOG_MASK,
1561 .fn = write_bandw_log,
1564 .mask = LAT_LOG_MASK,
1565 .fn = write_lat_log,
1568 .mask = SLAT_LOG_MASK,
1569 .fn = write_slat_log,
1572 .mask = CLAT_LOG_MASK,
1573 .fn = write_clat_log,
1576 .mask = IOPS_LOG_MASK,
1577 .fn = write_iops_log,
1580 .mask = CLAT_HIST_LOG_MASK,
1581 .fn = write_clat_hist_log,
/*
 * Write out all of a thread's logs. The runstate is bumped to
 * TD_FINISHING for the duration. Each log type is attempted with trylock
 * semantics (except the last remaining one) and the loop repeats until
 * all are done; a pass with no progress breaks out to avoid spinning.
 */
1585 void td_writeout_logs(struct thread_data *td, bool unit_logs)
1587 unsigned int log_mask = 0;
1588 unsigned int log_left = ALL_LOG_NR;
1591 old_state = td_bump_runstate(td, TD_FINISHING);
1593 finalize_logs(td, unit_logs);
1596 int prev_log_left = log_left;
1598 for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1599 struct log_type *lt = &log_types[i];
1602 if (!(log_mask & lt->mask)) {
/* try-lock while more than one log remains; block on the last. */
1603 ret = lt->fn(td, log_left != 1, unit_logs);
1606 log_mask |= lt->mask;
/* No log completed this pass: give up rather than spin. */
1611 if (prev_log_left == log_left)
1615 td_restore_runstate(td, old_state);
/* Write out logs for every thread (for_each_td wrapper around
 * td_writeout_logs()). */
1618 void fio_writeout_logs(bool unit_logs)
1620 struct thread_data *td;
1624 td_writeout_logs(td, unit_logs);