/*
 * Code related to writing an iolog of what a thread is doing, and to
 * later read that back and replay
 */
24 static int iolog_flush(struct io_log *log);
26 static const char iolog_ver2[] = "fio version 2 iolog";
28 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
30 flist_add_tail(&ipo->list, &td->io_log_list);
31 td->total_io_size += ipo->len;
34 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
36 if (!td->o.write_iolog_file)
39 fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
40 io_ddir_name(io_u->ddir),
41 io_u->offset, io_u->buflen);
44 void log_file(struct thread_data *td, struct fio_file *f,
45 enum file_log_act what)
47 const char *act[] = { "add", "open", "close" };
51 if (!td->o.write_iolog_file)
56 * this happens on the pre-open/close done before the job starts
61 fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
64 static void iolog_delay(struct thread_data *td, unsigned long delay)
66 uint64_t usec = utime_since_now(&td->last_issue);
70 if (delay < td->time_offset) {
75 delay -= td->time_offset;
81 fio_gettime(&ts, NULL);
82 while (delay && !td->terminate) {
84 if (this_delay > 500000)
87 usec_sleep(td, this_delay);
91 usec = utime_since_now(&ts);
93 td->time_offset = usec - delay;
98 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
106 if (ipo->ddir != DDIR_INVAL)
109 f = td->files[ipo->fileno];
111 switch (ipo->file_action) {
112 case FIO_LOG_OPEN_FILE:
113 if (td->o.replay_redirect && fio_file_open(f)) {
114 dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
118 ret = td_io_open_file(td, f);
121 td_verror(td, ret, "iolog open file");
123 case FIO_LOG_CLOSE_FILE:
124 td_io_close_file(td, f);
126 case FIO_LOG_UNLINK_FILE:
127 td_io_unlink_file(td, f);
130 log_err("fio: bad file action %d\n", ipo->file_action);
137 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
139 struct io_piece *ipo;
140 unsigned long elapsed;
142 while (!flist_empty(&td->io_log_list)) {
145 ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
146 flist_del(&ipo->list);
147 remove_trim_entry(td, ipo);
149 ret = ipo_special(td, ipo);
153 } else if (ret > 0) {
158 io_u->ddir = ipo->ddir;
159 if (ipo->ddir != DDIR_WAIT) {
160 io_u->offset = ipo->offset;
161 io_u->buflen = ipo->len;
162 io_u->file = td->files[ipo->fileno];
163 get_file(io_u->file);
164 dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
165 io_u->buflen, io_u->file->file_name);
167 iolog_delay(td, ipo->delay);
169 elapsed = mtime_since_genesis();
170 if (ipo->delay > elapsed)
171 usec_sleep(td, (ipo->delay - elapsed) * 1000);
176 if (io_u->ddir != DDIR_WAIT)
184 void prune_io_piece_log(struct thread_data *td)
186 struct io_piece *ipo;
189 while ((n = rb_first(&td->io_hist_tree)) != NULL) {
190 ipo = rb_entry(n, struct io_piece, rb_node);
191 rb_erase(n, &td->io_hist_tree);
192 remove_trim_entry(td, ipo);
197 while (!flist_empty(&td->io_hist_list)) {
198 ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
199 flist_del(&ipo->list);
200 remove_trim_entry(td, ipo);
207 * log a successful write, so we can unwind the log for verify
209 void log_io_piece(struct thread_data *td, struct io_u *io_u)
211 struct rb_node **p, *parent;
212 struct io_piece *ipo, *__ipo;
214 ipo = malloc(sizeof(struct io_piece));
216 ipo->file = io_u->file;
217 ipo->offset = io_u->offset;
218 ipo->len = io_u->buflen;
219 ipo->numberio = io_u->numberio;
220 ipo->flags = IP_F_IN_FLIGHT;
224 if (io_u_should_trim(td, io_u)) {
225 flist_add_tail(&ipo->trim_list, &td->trim_list);
230 * We don't need to sort the entries if we only performed sequential
231 * writes. In this case, just reading back data in the order we wrote
232 * it out is the faster but still safe.
234 * One exception is if we don't have a random map in which case we need
235 * to check for duplicate blocks and drop the old one, which we rely on
236 * the rb insert/lookup for handling.
238 if (((!td->o.verifysort) || !td_random(td)) &&
239 file_randommap(td, ipo->file)) {
240 INIT_FLIST_HEAD(&ipo->list);
241 flist_add_tail(&ipo->list, &td->io_hist_list);
242 ipo->flags |= IP_F_ONLIST;
247 RB_CLEAR_NODE(&ipo->rb_node);
250 * Sort the entry into the verification list
253 p = &td->io_hist_tree.rb_node;
259 __ipo = rb_entry(parent, struct io_piece, rb_node);
260 if (ipo->file < __ipo->file)
262 else if (ipo->file > __ipo->file)
264 else if (ipo->offset < __ipo->offset) {
266 overlap = ipo->offset + ipo->len > __ipo->offset;
268 else if (ipo->offset > __ipo->offset) {
270 overlap = __ipo->offset + __ipo->len > ipo->offset;
276 dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
277 __ipo->offset, __ipo->len,
278 ipo->offset, ipo->len);
280 rb_erase(parent, &td->io_hist_tree);
281 remove_trim_entry(td, __ipo);
282 if (!(__ipo->flags & IP_F_IN_FLIGHT))
288 rb_link_node(&ipo->rb_node, parent, p);
289 rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
290 ipo->flags |= IP_F_ONRB;
294 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
296 struct io_piece *ipo = io_u->ipo;
298 if (td->ts.nr_block_infos) {
299 uint32_t *info = io_u_block_info(td, io_u);
300 if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
301 if (io_u->ddir == DDIR_TRIM)
302 *info = BLOCK_INFO_SET_STATE(*info,
303 BLOCK_STATE_TRIM_FAILURE);
304 else if (io_u->ddir == DDIR_WRITE)
305 *info = BLOCK_INFO_SET_STATE(*info,
306 BLOCK_STATE_WRITE_FAILURE);
313 if (ipo->flags & IP_F_ONRB)
314 rb_erase(&ipo->rb_node, &td->io_hist_tree);
315 else if (ipo->flags & IP_F_ONLIST)
316 flist_del(&ipo->list);
323 void trim_io_piece(struct thread_data *td, const struct io_u *io_u)
325 struct io_piece *ipo = io_u->ipo;
330 ipo->len = io_u->xfer_buflen - io_u->resid;
333 void write_iolog_close(struct thread_data *td)
339 td->iolog_buf = NULL;
343 * Read version 2 iolog data. It is enhanced to include per-file logging,
346 static int read_iolog2(struct thread_data *td, FILE *f)
348 unsigned long long offset;
350 int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
351 char *rfname, *fname, *act;
355 free_release_files(td);
358 * Read in the read iolog and store it, reuse the infrastructure
359 * for doing verifications.
362 rfname = fname = malloc(256+16);
363 act = malloc(256+16);
365 reads = writes = waits = 0;
366 while ((p = fgets(str, 4096, f)) != NULL) {
367 struct io_piece *ipo;
370 r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
373 if (td->o.replay_redirect)
374 fname = td->o.replay_redirect;
380 if (!strcmp(act, "wait"))
382 else if (!strcmp(act, "read"))
384 else if (!strcmp(act, "write"))
386 else if (!strcmp(act, "sync"))
388 else if (!strcmp(act, "datasync"))
390 else if (!strcmp(act, "trim"))
393 log_err("fio: bad iolog file action: %s\n",
397 fileno = get_fileno(td, fname);
400 if (!strcmp(act, "add")) {
401 if (td->o.replay_redirect &&
402 get_fileno(td, fname) != -1) {
403 dprint(FD_FILE, "iolog: ignoring"
404 " re-add of file %s\n", fname);
406 fileno = add_file(td, fname, 0, 1);
407 file_action = FIO_LOG_ADD_FILE;
410 } else if (!strcmp(act, "open")) {
411 fileno = get_fileno(td, fname);
412 file_action = FIO_LOG_OPEN_FILE;
413 } else if (!strcmp(act, "close")) {
414 fileno = get_fileno(td, fname);
415 file_action = FIO_LOG_CLOSE_FILE;
417 log_err("fio: bad iolog file action: %s\n",
422 log_err("bad iolog2: %s\n", p);
428 else if (rw == DDIR_WRITE) {
430 * Don't add a write for ro mode
435 } else if (rw == DDIR_WAIT) {
439 } else if (rw == DDIR_INVAL) {
440 } else if (!ddir_sync(rw)) {
441 log_err("bad ddir: %d\n", rw);
448 ipo = malloc(sizeof(*ipo));
451 if (rw == DDIR_WAIT) {
454 if (td->o.replay_scale)
455 ipo->offset = offset / td->o.replay_scale;
457 ipo->offset = offset;
458 ipo_bytes_align(td->o.replay_align, ipo);
461 if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
462 td->o.max_bs[rw] = bytes;
463 ipo->fileno = fileno;
464 ipo->file_action = file_action;
468 queue_io_piece(td, ipo);
475 if (writes && read_only) {
476 log_err("fio: <%s> skips replay of %d writes due to"
477 " read-only\n", td->o.name, writes);
481 if (!reads && !writes && !waits)
483 else if (reads && !writes)
484 td->o.td_ddir = TD_DDIR_READ;
485 else if (!reads && writes)
486 td->o.td_ddir = TD_DDIR_WRITE;
488 td->o.td_ddir = TD_DDIR_RW;
494 * open iolog, check version, and call appropriate parser
496 static int init_iolog_read(struct thread_data *td)
498 char buffer[256], *p;
502 f = fopen(td->o.read_iolog_file, "r");
504 perror("fopen read iolog");
508 p = fgets(buffer, sizeof(buffer), f);
510 td_verror(td, errno, "iolog read");
511 log_err("fio: unable to read iolog\n");
517 * version 2 of the iolog stores a specific string as the
518 * first line, check for that
520 if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
521 ret = read_iolog2(td, f);
523 log_err("fio: iolog version 1 is no longer supported\n");
532 * Set up a log for storing io patterns.
534 static int init_iolog_write(struct thread_data *td)
540 f = fopen(td->o.write_iolog_file, "a");
542 perror("fopen write iolog");
547 * That's it for writing, setup a log buffer and we're done.
550 td->iolog_buf = malloc(8192);
551 setvbuf(f, td->iolog_buf, _IOFBF, 8192);
554 * write our version line
556 if (fprintf(f, "%s\n", iolog_ver2) < 0) {
557 perror("iolog init\n");
562 * add all known files
564 for_each_file(td, ff, i)
565 log_file(td, ff, FIO_LOG_ADD_FILE);
570 int init_iolog(struct thread_data *td)
574 if (td->o.read_iolog_file) {
578 * Check if it's a blktrace file and load that if possible.
579 * Otherwise assume it's a normal log file and load that.
581 if (is_blktrace(td->o.read_iolog_file, &need_swap))
582 ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
584 ret = init_iolog_read(td);
585 } else if (td->o.write_iolog_file)
586 ret = init_iolog_write(td);
589 td_verror(td, EINVAL, "failed initializing iolog");
594 void setup_log(struct io_log **log, struct log_params *p,
595 const char *filename)
599 struct io_u_plat_entry *entry;
600 struct flist_head *list;
602 l = scalloc(1, sizeof(*l));
603 INIT_FLIST_HEAD(&l->io_logs);
604 l->log_type = p->log_type;
605 l->log_offset = p->log_offset;
606 l->log_gz = p->log_gz;
607 l->log_gz_store = p->log_gz_store;
608 l->avg_msec = p->avg_msec;
609 l->hist_msec = p->hist_msec;
610 l->hist_coarseness = p->hist_coarseness;
611 l->filename = strdup(filename);
614 /* Initialize histogram lists for each r/w direction,
615 * with initial io_u_plat of all zeros:
617 for (i = 0; i < DDIR_RWDIR_CNT; i++) {
618 list = &l->hist_window[i].list;
619 INIT_FLIST_HEAD(list);
620 entry = calloc(1, sizeof(struct io_u_plat_entry));
621 flist_add(&entry->list, list);
624 if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
627 p = calloc(1, sizeof(*l->pending));
628 p->max_samples = DEF_LOG_ENTRIES;
629 p->log = calloc(p->max_samples, log_entry_sz(l));
634 l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
636 INIT_FLIST_HEAD(&l->chunk_list);
638 if (l->log_gz && !p->td)
640 else if (l->log_gz || l->log_gz_store) {
641 mutex_init_pshared(&l->chunk_lock);
642 mutex_init_pshared(&l->deferred_free_lock);
643 p->td->flags |= TD_F_COMPRESS_LOG;
649 #ifdef CONFIG_SETVBUF
650 static void *set_file_buffer(FILE *f)
652 size_t size = 1048576;
656 setvbuf(f, buf, _IOFBF, size);
660 static void clear_file_buffer(void *buf)
665 static void *set_file_buffer(FILE *f)
670 static void clear_file_buffer(void *buf)
675 void free_log(struct io_log *log)
677 while (!flist_empty(&log->io_logs)) {
678 struct io_logs *cur_log;
680 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
681 flist_del_init(&cur_log->list);
687 free(log->pending->log);
697 unsigned long hist_sum(int j, int stride, unsigned int *io_u_plat,
698 unsigned int *io_u_plat_last)
703 if (io_u_plat_last) {
704 for (k = sum = 0; k < stride; k++)
705 sum += io_u_plat[j + k] - io_u_plat_last[j + k];
707 for (k = sum = 0; k < stride; k++)
708 sum += io_u_plat[j + k];
714 static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
715 uint64_t sample_size)
719 uint64_t i, j, nr_samples;
720 struct io_u_plat_entry *entry, *entry_before;
721 unsigned int *io_u_plat;
722 unsigned int *io_u_plat_before;
724 int stride = 1 << hist_coarseness;
729 s = __get_sample(samples, 0, 0);
730 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
732 nr_samples = sample_size / __log_entry_sz(log_offset);
734 for (i = 0; i < nr_samples; i++) {
735 s = __get_sample(samples, log_offset, i);
737 entry = s->data.plat_entry;
738 io_u_plat = entry->io_u_plat;
740 entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
741 io_u_plat_before = entry_before->io_u_plat;
743 fprintf(f, "%lu, %u, %u, ", (unsigned long) s->time,
744 io_sample_ddir(s), s->bs);
745 for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
746 fprintf(f, "%lu, ", hist_sum(j, stride, io_u_plat,
749 fprintf(f, "%lu\n", (unsigned long)
750 hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
753 flist_del(&entry_before->list);
758 void flush_samples(FILE *f, void *samples, uint64_t sample_size)
762 uint64_t i, nr_samples;
767 s = __get_sample(samples, 0, 0);
768 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
770 nr_samples = sample_size / __log_entry_sz(log_offset);
772 for (i = 0; i < nr_samples; i++) {
773 s = __get_sample(samples, log_offset, i);
776 fprintf(f, "%lu, %" PRId64 ", %u, %u\n",
777 (unsigned long) s->time,
779 io_sample_ddir(s), s->bs);
781 struct io_sample_offset *so = (void *) s;
783 fprintf(f, "%lu, %" PRId64 ", %u, %u, %llu\n",
784 (unsigned long) s->time,
786 io_sample_ddir(s), s->bs,
787 (unsigned long long) so->offset);
794 struct iolog_flush_data {
795 struct workqueue_work work;
802 #define GZ_CHUNK 131072
804 static struct iolog_compress *get_new_chunk(unsigned int seq)
806 struct iolog_compress *c;
808 c = malloc(sizeof(*c));
809 INIT_FLIST_HEAD(&c->list);
810 c->buf = malloc(GZ_CHUNK);
816 static void free_chunk(struct iolog_compress *ic)
822 static int z_stream_init(z_stream *stream, int gz_hdr)
826 memset(stream, 0, sizeof(*stream));
827 stream->zalloc = Z_NULL;
828 stream->zfree = Z_NULL;
829 stream->opaque = Z_NULL;
830 stream->next_in = Z_NULL;
833 * zlib magic - add 32 for auto-detection of gz header or not,
834 * if we decide to store files in a gzip friendly format.
839 if (inflateInit2(stream, wbits) != Z_OK)
845 struct inflate_chunk_iter {
854 static void finish_chunk(z_stream *stream, FILE *f,
855 struct inflate_chunk_iter *iter)
859 ret = inflateEnd(stream);
861 log_err("fio: failed to end log inflation seq %d (%d)\n",
864 flush_samples(f, iter->buf, iter->buf_used);
867 iter->buf_size = iter->buf_used = 0;
871 * Iterative chunk inflation. Handles cases where we cross into a new
872 * sequence, doing flush finish of previous chunk if needed.
874 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
875 z_stream *stream, struct inflate_chunk_iter *iter)
879 dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
880 (unsigned long) ic->len, ic->seq);
882 if (ic->seq != iter->seq) {
884 finish_chunk(stream, f, iter);
886 z_stream_init(stream, gz_hdr);
890 stream->avail_in = ic->len;
891 stream->next_in = ic->buf;
893 if (!iter->buf_size) {
894 iter->buf_size = iter->chunk_sz;
895 iter->buf = malloc(iter->buf_size);
898 while (stream->avail_in) {
899 size_t this_out = iter->buf_size - iter->buf_used;
902 stream->avail_out = this_out;
903 stream->next_out = iter->buf + iter->buf_used;
905 err = inflate(stream, Z_NO_FLUSH);
907 log_err("fio: failed inflating log: %d\n", err);
912 iter->buf_used += this_out - stream->avail_out;
914 if (!stream->avail_out) {
915 iter->buf_size += iter->chunk_sz;
916 iter->buf = realloc(iter->buf, iter->buf_size);
920 if (err == Z_STREAM_END)
924 ret = (void *) stream->next_in - ic->buf;
926 dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
932 * Inflate stored compressed chunks, or write them directly to the log
933 * file if so instructed.
935 static int inflate_gz_chunks(struct io_log *log, FILE *f)
937 struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
940 while (!flist_empty(&log->chunk_list)) {
941 struct iolog_compress *ic;
943 ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
944 flist_del(&ic->list);
946 if (log->log_gz_store) {
949 dprint(FD_COMPRESS, "log write chunk size=%lu, "
950 "seq=%u\n", (unsigned long) ic->len, ic->seq);
952 ret = fwrite(ic->buf, ic->len, 1, f);
953 if (ret != 1 || ferror(f)) {
955 log_err("fio: error writing compressed log\n");
958 inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
964 finish_chunk(&stream, f, &iter);
972 * Open compressed log file and decompress the stored chunks and
973 * write them to stdout. The chunks are stored sequentially in the
974 * file, so we iterate over them and do them one-by-one.
976 int iolog_file_inflate(const char *file)
978 struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
979 struct iolog_compress ic;
987 f = fopen(file, "r");
993 if (stat(file, &sb) < 0) {
999 ic.buf = buf = malloc(sb.st_size);
1000 ic.len = sb.st_size;
1003 ret = fread(ic.buf, ic.len, 1, f);
1009 } else if (ret != 1) {
1010 log_err("fio: short read on reading log\n");
1019 * Each chunk will return Z_STREAM_END. We don't know how many
1020 * chunks are in the file, so we just keep looping and incrementing
1021 * the sequence number until we have consumed the whole compressed
1028 iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
1041 finish_chunk(&stream, stdout, &iter);
1051 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1056 int iolog_file_inflate(const char *file)
1058 log_err("fio: log inflation not possible without zlib\n");
1064 void flush_log(struct io_log *log, bool do_append)
1070 f = fopen(log->filename, "w");
1072 f = fopen(log->filename, "a");
1074 perror("fopen log");
1078 buf = set_file_buffer(f);
1080 inflate_gz_chunks(log, f);
1082 while (!flist_empty(&log->io_logs)) {
1083 struct io_logs *cur_log;
1085 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1086 flist_del_init(&cur_log->list);
1088 if (log->td && log == log->td->clat_hist_log)
1089 flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1090 log_sample_sz(log, cur_log));
1092 flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1098 clear_file_buffer(buf);
1101 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1103 if (td->flags & TD_F_COMPRESS_LOG)
1107 if (fio_trylock_file(log->filename))
1110 fio_lock_file(log->filename);
1112 if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1113 fio_send_iolog(td, log, log->filename);
1115 flush_log(log, !td->o.per_job_logs);
1117 fio_unlock_file(log->filename);
1122 size_t log_chunk_sizes(struct io_log *log)
1124 struct flist_head *entry;
1127 if (flist_empty(&log->chunk_list))
1131 pthread_mutex_lock(&log->chunk_lock);
1132 flist_for_each(entry, &log->chunk_list) {
1133 struct iolog_compress *c;
1135 c = flist_entry(entry, struct iolog_compress, list);
1138 pthread_mutex_unlock(&log->chunk_lock);
1144 static bool warned_on_drop;
1146 static void iolog_put_deferred(struct io_log *log, void *ptr)
1151 pthread_mutex_lock(&log->deferred_free_lock);
1152 if (log->deferred < IOLOG_MAX_DEFER) {
1153 log->deferred_items[log->deferred] = ptr;
1155 } else if (!warned_on_drop) {
1156 log_err("fio: had to drop log entry free\n");
1157 warned_on_drop = true;
1159 pthread_mutex_unlock(&log->deferred_free_lock);
1162 static void iolog_free_deferred(struct io_log *log)
1169 pthread_mutex_lock(&log->deferred_free_lock);
1171 for (i = 0; i < log->deferred; i++) {
1172 free(log->deferred_items[i]);
1173 log->deferred_items[i] = NULL;
1177 pthread_mutex_unlock(&log->deferred_free_lock);
1180 static int gz_work(struct iolog_flush_data *data)
1182 struct iolog_compress *c = NULL;
1183 struct flist_head list;
1189 INIT_FLIST_HEAD(&list);
1191 memset(&stream, 0, sizeof(stream));
1192 stream.zalloc = Z_NULL;
1193 stream.zfree = Z_NULL;
1194 stream.opaque = Z_NULL;
1196 ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1198 log_err("fio: failed to init gz stream\n");
1202 seq = ++data->log->chunk_seq;
1204 stream.next_in = (void *) data->samples;
1205 stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1207 dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
1208 (unsigned long) stream.avail_in, seq,
1209 data->log->filename);
1212 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1213 (unsigned long) c->len);
1214 c = get_new_chunk(seq);
1215 stream.avail_out = GZ_CHUNK;
1216 stream.next_out = c->buf;
1217 ret = deflate(&stream, Z_NO_FLUSH);
1219 log_err("fio: deflate log (%d)\n", ret);
1224 c->len = GZ_CHUNK - stream.avail_out;
1225 flist_add_tail(&c->list, &list);
1227 } while (stream.avail_in);
1229 stream.next_out = c->buf + c->len;
1230 stream.avail_out = GZ_CHUNK - c->len;
1232 ret = deflate(&stream, Z_FINISH);
1235 * Z_BUF_ERROR is special, it just means we need more
1236 * output space. We'll handle that below. Treat any other
1239 if (ret != Z_BUF_ERROR) {
1240 log_err("fio: deflate log (%d)\n", ret);
1241 flist_del(&c->list);
1248 c->len = GZ_CHUNK - stream.avail_out;
1250 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
1252 if (ret != Z_STREAM_END) {
1254 c = get_new_chunk(seq);
1255 stream.avail_out = GZ_CHUNK;
1256 stream.next_out = c->buf;
1257 ret = deflate(&stream, Z_FINISH);
1258 c->len = GZ_CHUNK - stream.avail_out;
1260 flist_add_tail(&c->list, &list);
1261 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1262 (unsigned long) c->len);
1263 } while (ret != Z_STREAM_END);
1266 dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1268 ret = deflateEnd(&stream);
1270 log_err("fio: deflateEnd %d\n", ret);
1272 iolog_put_deferred(data->log, data->samples);
1274 if (!flist_empty(&list)) {
1275 pthread_mutex_lock(&data->log->chunk_lock);
1276 flist_splice_tail(&list, &data->log->chunk_list);
1277 pthread_mutex_unlock(&data->log->chunk_lock);
1286 while (!flist_empty(&list)) {
1287 c = flist_first_entry(list.next, struct iolog_compress, list);
1288 flist_del(&c->list);
1296 * Invoked from our compress helper thread, when logging would have exceeded
1297 * the specified memory limitation. Compresses the previously stored
1300 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1302 return gz_work(container_of(work, struct iolog_flush_data, work));
1305 static int gz_init_worker(struct submit_worker *sw)
1307 struct thread_data *td = sw->wq->td;
1309 if (!fio_option_is_set(&td->o, log_gz_cpumask))
1312 if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1313 log_err("gz: failed to set CPU affinity\n");
1320 static struct workqueue_ops log_compress_wq_ops = {
1321 .fn = gz_work_async,
1322 .init_worker_fn = gz_init_worker,
1326 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1328 if (!(td->flags & TD_F_COMPRESS_LOG))
1331 workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
1335 void iolog_compress_exit(struct thread_data *td)
1337 if (!(td->flags & TD_F_COMPRESS_LOG))
1340 workqueue_exit(&td->log_compress_wq);
1344 * Queue work item to compress the existing log entries. We reset the
1345 * current log to a small size, and reference the existing log in the
1346 * data that we queue for compression. Once compression has been done,
1347 * this old log is freed. If called with finish == true, will not return
1348 * until the log compression has completed, and will flush all previous
1351 static int iolog_flush(struct io_log *log)
1353 struct iolog_flush_data *data;
1355 data = malloc(sizeof(*data));
1362 while (!flist_empty(&log->io_logs)) {
1363 struct io_logs *cur_log;
1365 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1366 flist_del_init(&cur_log->list);
1368 data->samples = cur_log->log;
1369 data->nr_samples = cur_log->nr_samples;
1380 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1382 struct iolog_flush_data *data;
1384 data = smalloc(sizeof(*data));
1390 data->samples = cur_log->log;
1391 data->nr_samples = cur_log->nr_samples;
1394 cur_log->nr_samples = cur_log->max_samples = 0;
1395 cur_log->log = NULL;
1397 workqueue_enqueue(&log->td->log_compress_wq, &data->work);
1399 iolog_free_deferred(log);
1405 static int iolog_flush(struct io_log *log)
1410 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1415 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1420 void iolog_compress_exit(struct thread_data *td)
1426 struct io_logs *iolog_cur_log(struct io_log *log)
1428 if (flist_empty(&log->io_logs))
1431 return flist_last_entry(&log->io_logs, struct io_logs, list);
1434 uint64_t iolog_nr_samples(struct io_log *iolog)
1436 struct flist_head *entry;
1439 flist_for_each(entry, &iolog->io_logs) {
1440 struct io_logs *cur_log;
1442 cur_log = flist_entry(entry, struct io_logs, list);
1443 ret += cur_log->nr_samples;
1449 static int __write_log(struct thread_data *td, struct io_log *log, int try)
1452 return finish_log(td, log, try);
1457 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1461 if (per_unit_log(td->iops_log) != unit_log)
1464 ret = __write_log(td, td->iops_log, try);
1466 td->iops_log = NULL;
1471 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1478 ret = __write_log(td, td->slat_log, try);
1480 td->slat_log = NULL;
1485 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1492 ret = __write_log(td, td->clat_log, try);
1494 td->clat_log = NULL;
1499 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1506 ret = __write_log(td, td->clat_hist_log, try);
1508 td->clat_hist_log = NULL;
1513 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1520 ret = __write_log(td, td->lat_log, try);
1527 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1531 if (per_unit_log(td->bw_log) != unit_log)
1534 ret = __write_log(td, td->bw_log, try);
1547 CLAT_HIST_LOG_MASK = 32,
1554 int (*fn)(struct thread_data *, int, bool);
1557 static struct log_type log_types[] = {
1559 .mask = BW_LOG_MASK,
1560 .fn = write_bandw_log,
1563 .mask = LAT_LOG_MASK,
1564 .fn = write_lat_log,
1567 .mask = SLAT_LOG_MASK,
1568 .fn = write_slat_log,
1571 .mask = CLAT_LOG_MASK,
1572 .fn = write_clat_log,
1575 .mask = IOPS_LOG_MASK,
1576 .fn = write_iops_log,
1579 .mask = CLAT_HIST_LOG_MASK,
1580 .fn = write_clat_hist_log,
1584 void td_writeout_logs(struct thread_data *td, bool unit_logs)
1586 unsigned int log_mask = 0;
1587 unsigned int log_left = ALL_LOG_NR;
1590 old_state = td_bump_runstate(td, TD_FINISHING);
1592 finalize_logs(td, unit_logs);
1595 int prev_log_left = log_left;
1597 for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1598 struct log_type *lt = &log_types[i];
1601 if (!(log_mask & lt->mask)) {
1602 ret = lt->fn(td, log_left != 1, unit_logs);
1605 log_mask |= lt->mask;
1610 if (prev_log_left == log_left)
1614 td_restore_runstate(td, old_state);
1617 void fio_writeout_logs(bool unit_logs)
1619 struct thread_data *td;
1623 td_writeout_logs(td, unit_logs);