/*
 * Code related to writing an iolog of what a thread is doing, and to
 * later read that back and replay
 */
#include <stdio.h>
#include <stdlib.h>
#include <libgen.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#ifdef CONFIG_ZLIB
#include <zlib.h>
#endif

#include "flist.h"
#include "fio.h"
#include "verify.h"
#include "trim.h"
#include "filelock.h"
#include "smalloc.h"
#include "blktrace.h"
#include "pshared.h"

static int iolog_flush(struct io_log *log);

static const char iolog_ver2[] = "fio version 2 iolog";

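/*
 * For reference, a small hypothetical iolog in the version 2 format
 * parsed by read_iolog2() below: the banner line above, then file
 * actions ("add", "open", "close") written by log_file() and per-I/O
 * lines written by log_io_u() as "<file> <ddir> <offset> <length>":
 *
 *	fio version 2 iolog
 *	/dev/sdb add
 *	/dev/sdb open
 *	/dev/sdb write 0 4096
 *	/dev/sdb read 4096 4096
 *	/dev/sdb close
 */
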
void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
{
	flist_add_tail(&ipo->list, &td->io_log_list);
	td->total_io_size += ipo->len;
}

void log_io_u(const struct thread_data *td, const struct io_u *io_u)
{
	if (!td->o.write_iolog_file)
		return;

	fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
						io_ddir_name(io_u->ddir),
						io_u->offset, io_u->buflen);
}

void log_file(struct thread_data *td, struct fio_file *f,
	      enum file_log_act what)
{
	const char *act[] = { "add", "open", "close" };

	assert(what < 3);

	if (!td->o.write_iolog_file)
		return;

	/*
	 * this happens on the pre-open/close done before the job starts
	 */
	if (!td->iolog_f)
		return;

	fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
}

static void iolog_delay(struct thread_data *td, unsigned long delay)
{
	uint64_t usec = utime_since_now(&td->last_issue);
	uint64_t this_delay;
	struct timespec ts;

	if (delay < td->time_offset) {
		td->time_offset = 0;
		return;
	}

	delay -= td->time_offset;
	if (delay < usec)
		return;

	delay -= usec;

	fio_gettime(&ts, NULL);
	while (delay && !td->terminate) {
		this_delay = delay;
		if (this_delay > 500000)
			this_delay = 500000;

		usec_sleep(td, this_delay);
		delay -= this_delay;
	}

	usec = utime_since_now(&ts);
	if (usec > delay)
		td->time_offset = usec - delay;
	else
		td->time_offset = 0;
}

static int ipo_special(struct thread_data *td, struct io_piece *ipo)
{
	struct fio_file *f;
	int ret;

	/*
	 * Not a special ipo
	 */
	if (ipo->ddir != DDIR_INVAL)
		return 0;

	f = td->files[ipo->fileno];

	switch (ipo->file_action) {
	case FIO_LOG_OPEN_FILE:
		if (td->o.replay_redirect && fio_file_open(f)) {
			dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
					f->file_name);
			break;
		}
		ret = td_io_open_file(td, f);
		if (!ret)
			break;
		td_verror(td, ret, "iolog open file");
		return -1;
	case FIO_LOG_CLOSE_FILE:
		td_io_close_file(td, f);
		break;
	case FIO_LOG_UNLINK_FILE:
		td_io_unlink_file(td, f);
		break;
	default:
		log_err("fio: bad file action %d\n", ipo->file_action);
		break;
	}

	return 1;
}

int read_iolog_get(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo;
	unsigned long elapsed;

	while (!flist_empty(&td->io_log_list)) {
		int ret;

		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);

		ret = ipo_special(td, ipo);
		if (ret < 0) {
			free(ipo);
			break;
		} else if (ret > 0) {
			free(ipo);
			continue;
		}

		io_u->ddir = ipo->ddir;
		if (ipo->ddir != DDIR_WAIT) {
			io_u->offset = ipo->offset;
			io_u->buflen = ipo->len;
			io_u->file = td->files[ipo->fileno];
			get_file(io_u->file);
			dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
						io_u->buflen, io_u->file->file_name);
			if (ipo->delay)
				iolog_delay(td, ipo->delay);
		} else {
			elapsed = mtime_since_genesis();
			if (ipo->delay > elapsed)
				usec_sleep(td, (ipo->delay - elapsed) * 1000);
		}

		free(ipo);

		if (io_u->ddir != DDIR_WAIT)
			return 0;
	}

	td->done = 1;
	return 1;
}

void prune_io_piece_log(struct thread_data *td)
{
	struct io_piece *ipo;
	struct rb_node *n;

	while ((n = rb_first(&td->io_hist_tree)) != NULL) {
		ipo = rb_entry(n, struct io_piece, rb_node);
		rb_erase(n, &td->io_hist_tree);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}

	while (!flist_empty(&td->io_hist_list)) {
		ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}
}

/*
 * log a successful write, so we can unwind the log for verify
 */
void log_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct rb_node **p, *parent;
	struct io_piece *ipo, *__ipo;

	ipo = malloc(sizeof(struct io_piece));
	init_ipo(ipo);
	ipo->file = io_u->file;
	ipo->offset = io_u->offset;
	ipo->len = io_u->buflen;
	ipo->numberio = io_u->numberio;
	ipo->flags = IP_F_IN_FLIGHT;

	io_u->ipo = ipo;

	if (io_u_should_trim(td, io_u)) {
		flist_add_tail(&ipo->trim_list, &td->trim_list);
		td->trim_entries++;
	}

	/*
	 * We don't need to sort the entries if we only performed sequential
	 * writes. In this case, just reading back data in the order we
	 * wrote it out is faster and still safe.
	 *
	 * One exception is if we don't have a random map, in which case we
	 * need to check for duplicate blocks and drop the old one, which we
	 * rely on the rb insert/lookup for handling.
	 */
	if (((!td->o.verifysort) || !td_random(td)) &&
	      file_randommap(td, ipo->file)) {
		INIT_FLIST_HEAD(&ipo->list);
		flist_add_tail(&ipo->list, &td->io_hist_list);
		ipo->flags |= IP_F_ONLIST;
		td->io_hist_len++;
		return;
	}

	RB_CLEAR_NODE(&ipo->rb_node);

	/*
	 * Sort the entry into the verification list
	 */
restart:
	p = &td->io_hist_tree.rb_node;
	parent = NULL;
	while (*p) {
		int overlap = 0;
		parent = *p;

		__ipo = rb_entry(parent, struct io_piece, rb_node);
		if (ipo->file < __ipo->file)
			p = &(*p)->rb_left;
		else if (ipo->file > __ipo->file)
			p = &(*p)->rb_right;
		else if (ipo->offset < __ipo->offset) {
			p = &(*p)->rb_left;
			overlap = ipo->offset + ipo->len > __ipo->offset;
		} else if (ipo->offset > __ipo->offset) {
			p = &(*p)->rb_right;
			overlap = __ipo->offset + __ipo->len > ipo->offset;
		} else
			overlap = 1;

		if (overlap) {
			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
				__ipo->offset, __ipo->len,
				ipo->offset, ipo->len);
			td->io_hist_len--;
			rb_erase(parent, &td->io_hist_tree);
			remove_trim_entry(td, __ipo);
			if (!(__ipo->flags & IP_F_IN_FLIGHT))
				free(__ipo);
			goto restart;
		}
	}

	rb_link_node(&ipo->rb_node, parent, p);
	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
	ipo->flags |= IP_F_ONRB;
	td->io_hist_len++;
}

void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (td->ts.nr_block_infos) {
		uint32_t *info = io_u_block_info(td, io_u);
		if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
			if (io_u->ddir == DDIR_TRIM)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_TRIM_FAILURE);
			else if (io_u->ddir == DDIR_WRITE)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_WRITE_FAILURE);
		}
	}

	if (!ipo)
		return;

	if (ipo->flags & IP_F_ONRB)
		rb_erase(&ipo->rb_node, &td->io_hist_tree);
	else if (ipo->flags & IP_F_ONLIST)
		flist_del(&ipo->list);

	free(ipo);
	io_u->ipo = NULL;
	td->io_hist_len--;
}

void trim_io_piece(struct thread_data *td, const struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (!ipo)
		return;

	ipo->len = io_u->xfer_buflen - io_u->resid;
}

void write_iolog_close(struct thread_data *td)
{
	fflush(td->iolog_f);
	fclose(td->iolog_f);
	free(td->iolog_buf);
	td->iolog_f = NULL;
	td->iolog_buf = NULL;
}

/*
 * Read version 2 iolog data. It is enhanced to include per-file logging,
 * syncs, etc.
 */
static int read_iolog2(struct thread_data *td, FILE *f)
{
	unsigned long long offset;
	unsigned int bytes;
	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
	char *rfname, *fname, *act;
	char *str, *p;
	enum fio_ddir rw;

	free_release_files(td);

	/*
	 * Read in the read iolog and store it, reuse the infrastructure
	 * for doing verifications.
	 */
	str = malloc(4096);
	rfname = fname = malloc(256+16);
	act = malloc(256+16);

	reads = writes = waits = 0;
	while ((p = fgets(str, 4096, f)) != NULL) {
		struct io_piece *ipo;
		int r;

		r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
									&bytes);

		if (td->o.replay_redirect)
			fname = td->o.replay_redirect;

		if (r == 4) {
			/*
			 * Check action first
			 */
			if (!strcmp(act, "wait"))
				rw = DDIR_WAIT;
			else if (!strcmp(act, "read"))
				rw = DDIR_READ;
			else if (!strcmp(act, "write"))
				rw = DDIR_WRITE;
			else if (!strcmp(act, "sync"))
				rw = DDIR_SYNC;
			else if (!strcmp(act, "datasync"))
				rw = DDIR_DATASYNC;
			else if (!strcmp(act, "trim"))
				rw = DDIR_TRIM;
			else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
			fileno = get_fileno(td, fname);
		} else if (r == 2) {
			rw = DDIR_INVAL;
			if (!strcmp(act, "add")) {
				if (td->o.replay_redirect &&
				    get_fileno(td, fname) != -1) {
					dprint(FD_FILE, "iolog: ignoring"
						" re-add of file %s\n", fname);
				} else {
					fileno = add_file(td, fname, 0, 1);
					file_action = FIO_LOG_ADD_FILE;
				}
				continue;
			} else if (!strcmp(act, "open")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_OPEN_FILE;
			} else if (!strcmp(act, "close")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_CLOSE_FILE;
			} else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
		} else {
			log_err("bad iolog2: %s\n", p);
			continue;
		}

		if (rw == DDIR_READ)
			reads++;
		else if (rw == DDIR_WRITE) {
			/*
			 * Don't add a write for ro mode
			 */
			if (read_only)
				continue;
			writes++;
		} else if (rw == DDIR_WAIT) {
			if (td->o.no_stall)
				continue;
			waits++;
		} else if (rw == DDIR_INVAL) {
		} else if (!ddir_sync(rw)) {
			log_err("bad ddir: %d\n", rw);
			continue;
		}

		/*
		 * Make note of file
		 */
		ipo = malloc(sizeof(*ipo));
		init_ipo(ipo);
		ipo->ddir = rw;
		if (rw == DDIR_WAIT) {
			ipo->delay = offset;
		} else {
			if (td->o.replay_scale)
				ipo->offset = offset / td->o.replay_scale;
			else
				ipo->offset = offset;
			ipo_bytes_align(td->o.replay_align, ipo);

			ipo->len = bytes;
			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
				td->o.max_bs[rw] = bytes;
			ipo->fileno = fileno;
			ipo->file_action = file_action;
			td->o.size += bytes;
		}

		queue_io_piece(td, ipo);
	}

	free(str);
	free(act);
	free(rfname);

	if (writes && read_only) {
		log_err("fio: <%s> skips replay of %d writes due to"
			" read-only\n", td->o.name, writes);
		writes = 0;
	}

	if (!reads && !writes && !waits)
		return 1;
	else if (reads && !writes)
		td->o.td_ddir = TD_DDIR_READ;
	else if (!reads && writes)
		td->o.td_ddir = TD_DDIR_WRITE;
	else
		td->o.td_ddir = TD_DDIR_RW;

	return 0;
}

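/*
 * For reference, a hypothetical job snippet (see the HOWTO for the
 * authoritative option descriptions) that exercises the parser above,
 * redirecting all replayed I/O to one device, halving offsets and
 * re-aligning them:
 *
 *	read_iolog=trace.log
 *	replay_redirect=/dev/sdz
 *	replay_scale=2
 *	replay_align=4096
 */
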
/*
 * open iolog, check version, and call appropriate parser
 */
static int init_iolog_read(struct thread_data *td)
{
	char buffer[256], *p;
	FILE *f;
	int ret;

	f = fopen(td->o.read_iolog_file, "r");
	if (!f) {
		perror("fopen read iolog");
		return 1;
	}

	p = fgets(buffer, sizeof(buffer), f);
	if (!p) {
		td_verror(td, errno, "iolog read");
		log_err("fio: unable to read iolog\n");
		fclose(f);
		return 1;
	}

	/*
	 * version 2 of the iolog stores a specific string as the
	 * first line, check for that
	 */
	if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
		ret = read_iolog2(td, f);
	else {
		log_err("fio: iolog version 1 is no longer supported\n");
		ret = 1;
	}

	fclose(f);
	return ret;
}

/*
 * Set up a log for storing io patterns.
 */
static int init_iolog_write(struct thread_data *td)
{
	struct fio_file *ff;
	FILE *f;
	unsigned int i;

	f = fopen(td->o.write_iolog_file, "a");
	if (!f) {
		perror("fopen write iolog");
		return 1;
	}

	/*
	 * That's it for writing, setup a log buffer and we're done.
	 */
	td->iolog_f = f;
	td->iolog_buf = malloc(8192);
	setvbuf(f, td->iolog_buf, _IOFBF, 8192);

	/*
	 * write our version line
	 */
	if (fprintf(f, "%s\n", iolog_ver2) < 0) {
		perror("iolog init");
		return 1;
	}

	/*
	 * add all known files
	 */
	for_each_file(td, ff, i)
		log_file(td, ff, FIO_LOG_ADD_FILE);

	return 0;
}

int init_iolog(struct thread_data *td)
{
	int ret = 0;

	if (td->o.read_iolog_file) {
		int need_swap;

		/*
		 * Check if it's a blktrace file and load that if possible.
		 * Otherwise assume it's a normal log file and load that.
		 */
		if (is_blktrace(td->o.read_iolog_file, &need_swap))
			ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
		else
			ret = init_iolog_read(td);
	} else if (td->o.write_iolog_file)
		ret = init_iolog_write(td);

	if (ret)
		td_verror(td, EINVAL, "failed initializing iolog");

	return ret;
}

void setup_log(struct io_log **log, struct log_params *p,
	       const char *filename)
{
	struct io_log *l;
	int i;
	struct io_u_plat_entry *entry;
	struct flist_head *list;

	l = scalloc(1, sizeof(*l));
	INIT_FLIST_HEAD(&l->io_logs);
	l->log_type = p->log_type;
	l->log_offset = p->log_offset;
	l->log_gz = p->log_gz;
	l->log_gz_store = p->log_gz_store;
	l->avg_msec = p->avg_msec;
	l->hist_msec = p->hist_msec;
	l->hist_coarseness = p->hist_coarseness;
	l->filename = strdup(filename);
	l->td = p->td;

	/* Initialize histogram lists for each r/w direction,
	 * with initial io_u_plat of all zeros:
	 */
	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
		list = &l->hist_window[i].list;
		INIT_FLIST_HEAD(list);
		entry = calloc(1, sizeof(struct io_u_plat_entry));
		flist_add(&entry->list, list);
	}

	if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
		struct io_logs *__p;

		__p = calloc(1, sizeof(*l->pending));
		__p->max_samples = DEF_LOG_ENTRIES;
		__p->log = calloc(__p->max_samples, log_entry_sz(l));
		l->pending = __p;
	}

	if (l->log_offset)
		l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;

	INIT_FLIST_HEAD(&l->chunk_list);

	if (l->log_gz && !p->td)
		l->log_gz = 0;
	else if (l->log_gz || l->log_gz_store) {
		mutex_init_pshared(&l->chunk_lock);
		mutex_init_pshared(&l->deferred_free_lock);
		p->td->flags |= TD_F_COMPRESS_LOG;
	}

	*log = l;
}

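/*
 * A minimal usage sketch (hypothetical caller, not taken from fio
 * itself), assuming a valid thread_data "td" and bandwidth samples
 * averaged over 500 msec:
 *
 *	struct log_params params = {
 *		.td = td,
 *		.avg_msec = 500,
 *		.log_type = IO_LOG_TYPE_BW,
 *	};
 *
 *	setup_log(&td->bw_log, &params, "example_bw.log");
 */
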
#ifdef CONFIG_SETVBUF
static void *set_file_buffer(FILE *f)
{
	size_t size = 1048576;
	void *buf;

	buf = malloc(size);
	setvbuf(f, buf, _IOFBF, size);
	return buf;
}

static void clear_file_buffer(void *buf)
{
	free(buf);
}
#else
static void *set_file_buffer(FILE *f)
{
	return NULL;
}

static void clear_file_buffer(void *buf)
{
}
#endif

void free_log(struct io_log *log)
{
	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);
		free(cur_log->log);
		free(cur_log);
	}

	if (log->pending) {
		free(log->pending->log);
		free(log->pending);
		log->pending = NULL;
	}

	free(log->filename);
	sfree(log);
}

uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
		  uint64_t *io_u_plat_last)
{
	uint64_t sum;
	int k;

	if (io_u_plat_last) {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k] - io_u_plat_last[j + k];
	} else {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k];
	}

	return sum;
}

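/*
 * Worked example with made-up numbers: flush_hist_samples() below uses
 * stride = 1 << hist_coarseness, so hist_coarseness=2 gives stride 4
 * and hist_sum(0, 4, plat, NULL) collapses plat[0..3] into a single
 * output column; for plat = {1, 2, 3, 4, ...} that column is 10.
 */
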
static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
			       uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, j, nr_samples;
	struct io_u_plat_entry *entry, *entry_before;
	uint64_t *io_u_plat;
	uint64_t *io_u_plat_before;

	int stride = 1 << hist_coarseness;

	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		entry = s->data.plat_entry;
		io_u_plat = entry->io_u_plat;

		entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
		io_u_plat_before = entry_before->io_u_plat;

		fprintf(f, "%lu, %u, %u, ", (unsigned long) s->time,
						io_sample_ddir(s), s->bs);
		for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
			fprintf(f, "%llu, ", (unsigned long long)
				hist_sum(j, stride, io_u_plat, io_u_plat_before));
		}
		fprintf(f, "%llu\n", (unsigned long long)
			hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
					io_u_plat_before));

		flist_del(&entry_before->list);
		free(entry_before);
	}
}

void flush_samples(FILE *f, void *samples, uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, nr_samples;

	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		if (!log_offset) {
			fprintf(f, "%lu, %" PRId64 ", %u, %u\n",
					(unsigned long) s->time,
					s->data.val,
					io_sample_ddir(s), s->bs);
		} else {
			struct io_sample_offset *so = (void *) s;

			fprintf(f, "%lu, %" PRId64 ", %u, %u, %llu\n",
					(unsigned long) s->time,
					so->data.val,
					io_sample_ddir(s), s->bs,
					(unsigned long long) so->offset);
		}
	}
}

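/*
 * For reference, a flushed sample line is "time (msec), value, ddir,
 * blocksize", with the offset appended when log_offset is set. A
 * hypothetical sample could look like:
 *
 *	16, 524288, 0, 4096
 */
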
#ifdef CONFIG_ZLIB

struct iolog_flush_data {
	struct workqueue_work work;
	struct io_log *log;
	void *samples;
	uint32_t nr_samples;
	bool free;
};

#define GZ_CHUNK	131072

static struct iolog_compress *get_new_chunk(unsigned int seq)
{
	struct iolog_compress *c;

	c = malloc(sizeof(*c));
	INIT_FLIST_HEAD(&c->list);
	c->buf = malloc(GZ_CHUNK);
	c->len = 0;
	c->seq = seq;
	return c;
}

static void free_chunk(struct iolog_compress *ic)
{
	free(ic->buf);
	free(ic);
}

static int z_stream_init(z_stream *stream, int gz_hdr)
{
	int wbits = 15;

	memset(stream, 0, sizeof(*stream));
	stream->zalloc = Z_NULL;
	stream->zfree = Z_NULL;
	stream->opaque = Z_NULL;
	stream->next_in = Z_NULL;

	/*
	 * zlib magic - add 32 for auto-detection of gz header or not,
	 * if we decide to store files in a gzip friendly format.
	 */
	if (gz_hdr)
		wbits += 32;

	if (inflateInit2(stream, wbits) != Z_OK)
		return 1;

	return 0;
}

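/*
 * Note: per the zlib manual, a windowBits of 15 plus 32 passed to
 * inflateInit2() enables automatic detection of both zlib- and
 * gzip-wrapped input streams.
 */
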
struct inflate_chunk_iter {
	unsigned int seq;
	int err;
	void *buf;
	size_t buf_size;
	size_t buf_used;
	size_t chunk_sz;
};

static void finish_chunk(z_stream *stream, FILE *f,
			 struct inflate_chunk_iter *iter)
{
	int ret;

	ret = inflateEnd(stream);
	if (ret != Z_OK)
		log_err("fio: failed to end log inflation seq %d (%d)\n",
				iter->seq, ret);

	flush_samples(f, iter->buf, iter->buf_used);
	free(iter->buf);
	iter->buf = NULL;
	iter->buf_size = iter->buf_used = 0;
}

/*
 * Iterative chunk inflation. Handles cases where we cross into a new
 * sequence, doing flush finish of previous chunk if needed.
 */
static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
			    z_stream *stream, struct inflate_chunk_iter *iter)
{
	size_t ret;

	dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
				(unsigned long) ic->len, ic->seq);

	if (ic->seq != iter->seq) {
		if (iter->seq)
			finish_chunk(stream, f, iter);

		z_stream_init(stream, gz_hdr);
		iter->seq = ic->seq;
	}

	stream->avail_in = ic->len;
	stream->next_in = ic->buf;

	if (!iter->buf_size) {
		iter->buf_size = iter->chunk_sz;
		iter->buf = malloc(iter->buf_size);
	}

	while (stream->avail_in) {
		size_t this_out = iter->buf_size - iter->buf_used;
		int err;

		stream->avail_out = this_out;
		stream->next_out = iter->buf + iter->buf_used;

		err = inflate(stream, Z_NO_FLUSH);
		if (err < 0) {
			log_err("fio: failed inflating log: %d\n", err);
			iter->err = err;
			break;
		}

		iter->buf_used += this_out - stream->avail_out;

		if (!stream->avail_out) {
			iter->buf_size += iter->chunk_sz;
			iter->buf = realloc(iter->buf, iter->buf_size);
			continue;
		}

		if (err == Z_STREAM_END)
			break;
	}

	ret = (void *) stream->next_in - ic->buf;

	dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);

	return ret;
}

/*
 * Inflate stored compressed chunks, or write them directly to the log
 * file if so instructed.
 */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
	z_stream stream;

	while (!flist_empty(&log->chunk_list)) {
		struct iolog_compress *ic;

		ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
		flist_del(&ic->list);

		if (log->log_gz_store) {
			size_t ret;

			dprint(FD_COMPRESS, "log write chunk size=%lu, "
				"seq=%u\n", (unsigned long) ic->len, ic->seq);

			ret = fwrite(ic->buf, ic->len, 1, f);
			if (ret != 1 || ferror(f)) {
				iter.err = errno;
				log_err("fio: error writing compressed log\n");
			}
		} else
			inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);

		free_chunk(ic);
	}

	if (iter.seq) {
		finish_chunk(&stream, f, &iter);
		free(iter.buf);
	}

	return iter.err;
}

/*
 * Open compressed log file and decompress the stored chunks and
 * write them to stdout. The chunks are stored sequentially in the
 * file, so we iterate over them and do them one-by-one.
 */
int iolog_file_inflate(const char *file)
{
	struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
	struct iolog_compress ic;
	z_stream stream;
	struct stat sb;
	size_t ret, total;
	void *buf;
	FILE *f;

	f = fopen(file, "r");
	if (!f) {
		perror("fopen");
		return 1;
	}

	if (stat(file, &sb) < 0) {
		fclose(f);
		perror("stat");
		return 1;
	}

	ic.buf = buf = malloc(sb.st_size);
	ic.len = sb.st_size;
	ic.seq = 1;

	ret = fread(ic.buf, ic.len, 1, f);
	if (ret == 0 && ferror(f)) {
		perror("fread");
		fclose(f);
		free(buf);
		return 1;
	} else if (ret != 1) {
		log_err("fio: short read on reading log\n");
		fclose(f);
		free(buf);
		return 1;
	}

	fclose(f);

	/*
	 * Each chunk will return Z_STREAM_END. We don't know how many
	 * chunks are in the file, so we just keep looping and incrementing
	 * the sequence number until we have consumed the whole compressed
	 * file.
	 */
	total = ic.len;
	do {
		size_t iret;

		iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
		total -= iret;
		if (!total)
			break;
		if (iter.err)
			break;

		ic.seq++;
		ic.len -= iret;
		ic.buf += iret;
	} while (1);

	if (iter.seq) {
		finish_chunk(&stream, stdout, &iter);
		free(iter.buf);
	}

	free(buf);
	return iter.err;
}

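/*
 * This is what backs the command line log inflation entry point (the
 * --inflate-log=<file> option), decompressing a stored log to stdout.
 */
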
#else

static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	return 0;
}

int iolog_file_inflate(const char *file)
{
	log_err("fio: log inflation not possible without zlib\n");
	return 1;
}

#endif

void flush_log(struct io_log *log, bool do_append)
{
	void *buf;
	FILE *f;

	if (!do_append)
		f = fopen(log->filename, "w");
	else
		f = fopen(log->filename, "a");
	if (!f) {
		perror("fopen log");
		return;
	}

	buf = set_file_buffer(f);

	inflate_gz_chunks(log, f);

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		if (log->td && log == log->td->clat_hist_log)
			flush_hist_samples(f, log->hist_coarseness, cur_log->log,
					   log_sample_sz(log, cur_log));
		else
			flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));

		free(cur_log);
	}

	fclose(f);
	clear_file_buffer(buf);
}

static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
{
	if (td->flags & TD_F_COMPRESS_LOG)
		iolog_flush(log);

	if (trylock) {
		if (fio_trylock_file(log->filename))
			return 1;
	} else
		fio_lock_file(log->filename);

	if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
		fio_send_iolog(td, log, log->filename);
	else
		flush_log(log, !td->o.per_job_logs);

	fio_unlock_file(log->filename);
	free_log(log);
	return 0;
}

size_t log_chunk_sizes(struct io_log *log)
{
	struct flist_head *entry;
	size_t ret;

	if (flist_empty(&log->chunk_list))
		return 0;

	ret = 0;
	pthread_mutex_lock(&log->chunk_lock);
	flist_for_each(entry, &log->chunk_list) {
		struct iolog_compress *c;

		c = flist_entry(entry, struct iolog_compress, list);
		ret += c->len;
	}
	pthread_mutex_unlock(&log->chunk_lock);
	return ret;
}

#ifdef CONFIG_ZLIB

static void iolog_put_deferred(struct io_log *log, void *ptr)
{
	if (!ptr)
		return;

	pthread_mutex_lock(&log->deferred_free_lock);
	if (log->deferred < IOLOG_MAX_DEFER) {
		log->deferred_items[log->deferred] = ptr;
		log->deferred++;
	} else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
		log_err("fio: had to drop log entry free\n");
	pthread_mutex_unlock(&log->deferred_free_lock);
}

static void iolog_free_deferred(struct io_log *log)
{
	int i;

	if (!log->deferred)
		return;

	pthread_mutex_lock(&log->deferred_free_lock);

	for (i = 0; i < log->deferred; i++) {
		free(log->deferred_items[i]);
		log->deferred_items[i] = NULL;
	}

	log->deferred = 0;
	pthread_mutex_unlock(&log->deferred_free_lock);
}

static int gz_work(struct iolog_flush_data *data)
{
	struct iolog_compress *c = NULL;
	struct flist_head list;
	unsigned int seq;
	z_stream stream;
	size_t total = 0;
	int ret;

	INIT_FLIST_HEAD(&list);

	memset(&stream, 0, sizeof(stream));
	stream.zalloc = Z_NULL;
	stream.zfree = Z_NULL;
	stream.opaque = Z_NULL;

	ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
	if (ret != Z_OK) {
		log_err("fio: failed to init gz stream\n");
		goto err;
	}

	seq = ++data->log->chunk_seq;

	stream.next_in = (void *) data->samples;
	stream.avail_in = data->nr_samples * log_entry_sz(data->log);

	dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
				(unsigned long) stream.avail_in, seq,
				data->log->filename);
	do {
		if (c)
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		c = get_new_chunk(seq);
		stream.avail_out = GZ_CHUNK;
		stream.next_out = c->buf;
		ret = deflate(&stream, Z_NO_FLUSH);
		if (ret < 0) {
			log_err("fio: deflate log (%d)\n", ret);
			free_chunk(c);
			goto err;
		}

		c->len = GZ_CHUNK - stream.avail_out;
		flist_add_tail(&c->list, &list);
		total += c->len;
	} while (stream.avail_in);

	stream.next_out = c->buf + c->len;
	stream.avail_out = GZ_CHUNK - c->len;

	ret = deflate(&stream, Z_FINISH);
	if (ret < 0) {
		/*
		 * Z_BUF_ERROR is special, it just means we need more
		 * output space. We'll handle that below. Treat any other
		 * error as fatal.
		 */
		if (ret != Z_BUF_ERROR) {
			log_err("fio: deflate log (%d)\n", ret);
			flist_del(&c->list);
			free_chunk(c);
			goto err;
		}
	}

	total -= c->len;
	c->len = GZ_CHUNK - stream.avail_out;
	total += c->len;
	dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);

	if (ret != Z_STREAM_END) {
		do {
			c = get_new_chunk(seq);
			stream.avail_out = GZ_CHUNK;
			stream.next_out = c->buf;
			ret = deflate(&stream, Z_FINISH);
			c->len = GZ_CHUNK - stream.avail_out;
			total += c->len;
			flist_add_tail(&c->list, &list);
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		} while (ret != Z_STREAM_END);
	}

	dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);

	ret = deflateEnd(&stream);
	if (ret != Z_OK)
		log_err("fio: deflateEnd %d\n", ret);

	iolog_put_deferred(data->log, data->samples);

	if (!flist_empty(&list)) {
		pthread_mutex_lock(&data->log->chunk_lock);
		flist_splice_tail(&list, &data->log->chunk_list);
		pthread_mutex_unlock(&data->log->chunk_lock);
	}

	ret = 0;
done:
	if (data->free)
		sfree(data);
	return ret;
err:
	while (!flist_empty(&list)) {
		c = flist_first_entry(&list, struct iolog_compress, list);
		flist_del(&c->list);
		free_chunk(c);
	}
	ret = 1;
	goto done;
}

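/*
 * Design note: every flush gets its own sequence number from
 * chunk_seq, and inflate_chunk() above keys off a seq change to end
 * one zlib stream and initialize the next, so independently deflated
 * flushes can sit back-to-back in one chunk list or stored file.
 */
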
/*
 * Invoked from our compress helper thread, when logging would have exceeded
 * the specified memory limitation. Compresses the previously stored
 * samples.
 */
static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
{
	return gz_work(container_of(work, struct iolog_flush_data, work));
}

static int gz_init_worker(struct submit_worker *sw)
{
	struct thread_data *td = sw->wq->td;

	if (!fio_option_is_set(&td->o, log_gz_cpumask))
		return 0;

	if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
		log_err("gz: failed to set CPU affinity\n");
		return 1;
	}

	return 0;
}

static struct workqueue_ops log_compress_wq_ops = {
	.fn		= gz_work_async,
	.init_worker_fn	= gz_init_worker,
	.nice		= 1,
};

int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return 0;

	workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return;

	workqueue_exit(&td->log_compress_wq);
}

/*
 * Queue work item to compress the existing log entries. We reset the
 * current log to a small size, and reference the existing log in the
 * data that we queue for compression. Once compression has been done,
 * this old log is freed. Since the compression runs inline here, this
 * does not return until all pending entries have been compressed.
 */
static int iolog_flush(struct io_log *log)
{
	struct iolog_flush_data *data;

	data = malloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;
	data->free = false;

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		data->samples = cur_log->log;
		data->nr_samples = cur_log->nr_samples;

		sfree(cur_log);

		gz_work(data);
	}

	free(data);
	return 0;
}

int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	struct iolog_flush_data *data;

	data = smalloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;

	data->samples = cur_log->log;
	data->nr_samples = cur_log->nr_samples;
	data->free = true;

	cur_log->nr_samples = cur_log->max_samples = 0;
	cur_log->log = NULL;

	workqueue_enqueue(&log->td->log_compress_wq, &data->work);

	iolog_free_deferred(log);

	return 0;
}

#else

static int iolog_flush(struct io_log *log)
{
	return 1;
}

int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	return 1;
}

int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
}

#endif

struct io_logs *iolog_cur_log(struct io_log *log)
{
	if (flist_empty(&log->io_logs))
		return NULL;

	return flist_last_entry(&log->io_logs, struct io_logs, list);
}

uint64_t iolog_nr_samples(struct io_log *iolog)
{
	struct flist_head *entry;
	uint64_t ret = 0;

	flist_for_each(entry, &iolog->io_logs) {
		struct io_logs *cur_log;

		cur_log = flist_entry(entry, struct io_logs, list);
		ret += cur_log->nr_samples;
	}

	return ret;
}

static int __write_log(struct thread_data *td, struct io_log *log, int try)
{
	if (log)
		return finish_log(td, log, try);

	return 0;
}

static int write_iops_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->iops_log) != unit_log)
		return 0;

	ret = __write_log(td, td->iops_log, try);
	if (!ret)
		td->iops_log = NULL;

	return ret;
}

static int write_slat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->slat_log, try);
	if (!ret)
		td->slat_log = NULL;

	return ret;
}

static int write_clat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_log, try);
	if (!ret)
		td->clat_log = NULL;

	return ret;
}

static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_hist_log, try);
	if (!ret)
		td->clat_hist_log = NULL;

	return ret;
}

static int write_lat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->lat_log, try);
	if (!ret)
		td->lat_log = NULL;

	return ret;
}

static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->bw_log) != unit_log)
		return 0;

	ret = __write_log(td, td->bw_log, try);
	if (!ret)
		td->bw_log = NULL;

	return ret;
}

enum {
	BW_LOG_MASK	= 1,
	LAT_LOG_MASK	= 2,
	SLAT_LOG_MASK	= 4,
	CLAT_LOG_MASK	= 8,
	IOPS_LOG_MASK	= 16,
	CLAT_HIST_LOG_MASK = 32,

	ALL_LOG_NR	= 6,
};

struct log_type {
	unsigned int mask;
	int (*fn)(struct thread_data *, int, bool);
};

static struct log_type log_types[] = {
	{
		.mask	= BW_LOG_MASK,
		.fn	= write_bandw_log,
	},
	{
		.mask	= LAT_LOG_MASK,
		.fn	= write_lat_log,
	},
	{
		.mask	= SLAT_LOG_MASK,
		.fn	= write_slat_log,
	},
	{
		.mask	= CLAT_LOG_MASK,
		.fn	= write_clat_log,
	},
	{
		.mask	= IOPS_LOG_MASK,
		.fn	= write_iops_log,
	},
	{
		.mask	= CLAT_HIST_LOG_MASK,
		.fn	= write_clat_hist_log,
	},
};

void td_writeout_logs(struct thread_data *td, bool unit_logs)
{
	unsigned int log_mask = 0;
	unsigned int log_left = ALL_LOG_NR;
	int old_state, i;

	old_state = td_bump_runstate(td, TD_FINISHING);

	finalize_logs(td, unit_logs);

	while (log_left) {
		int prev_log_left = log_left;

		for (i = 0; i < ALL_LOG_NR && log_left; i++) {
			struct log_type *lt = &log_types[i];
			int ret;

			if (!(log_mask & lt->mask)) {
				ret = lt->fn(td, log_left != 1, unit_logs);
				if (!ret) {
					log_left--;
					log_mask |= lt->mask;
				}
			}
		}

		if (prev_log_left == log_left)
			usleep(5000);
	}

	td_restore_runstate(td, old_state);
}

void fio_writeout_logs(bool unit_logs)
{
	struct thread_data *td;
	int i;

	for_each_td(td, i)
		td_writeout_logs(td, unit_logs);
}