/*
 * Code related to writing an iolog of what a thread is doing, and to
 * later read that back and replay
 */
static int iolog_flush(struct io_log *log);

static const char iolog_ver2[] = "fio version 2 iolog";
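/*
 * A version 2 iolog starts with the version string above, followed by
 * one line per event: file actions are "<name> add|open|close", and
 * I/O is "<name> <ddir> <offset> <length>". An illustrative sample:
 *
 *	fio version 2 iolog
 *	/dev/sdb add
 *	/dev/sdb open
 *	/dev/sdb write 0 4096
 *	/dev/sdb close
 */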
void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
{
	flist_add_tail(&ipo->list, &td->io_log_list);
	td->total_io_size += ipo->len;
}
void log_io_u(const struct thread_data *td, const struct io_u *io_u)
{
	if (!td->o.write_iolog_file)
		return;

	fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
						io_ddir_name(io_u->ddir),
						io_u->offset, io_u->buflen);
}
void log_file(struct thread_data *td, struct fio_file *f,
	      enum file_log_act what)
{
	const char *act[] = { "add", "open", "close" };

	assert(what < 3);

	if (!td->o.write_iolog_file)
		return;

	/*
	 * this happens on the pre-open/close done before the job starts
	 */
	if (!td->iolog_f)
		return;

	fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
}
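/*
 * Stall to honor the delay recorded in the iolog. td->time_offset carries
 * overshoot from a previous sleep, so it is subtracted from (or cancels)
 * the next requested delay; time already spent issuing I/O since
 * ->last_issue also counts against the delay.
 */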
static void iolog_delay(struct thread_data *td, unsigned long delay)
{
	uint64_t usec = utime_since_now(&td->last_issue);
	unsigned long orig_delay = delay;
	uint64_t this_delay;
	struct timeval tv;

	if (delay < td->time_offset) {
		td->time_offset = 0;
		return;
	}

	delay -= td->time_offset;
	if (delay < usec)
		return;

	delay -= usec;

	fio_gettime(&tv, NULL);
	while (delay && !td->terminate) {
		this_delay = delay;
		if (this_delay > 500000)
			this_delay = 500000;

		usec_sleep(td, this_delay);
		delay -= this_delay;
	}

	usec = utime_since_now(&tv);
	if (usec > orig_delay)
		td->time_offset = usec - orig_delay;
	else
		td->time_offset = 0;
}
static int ipo_special(struct thread_data *td, struct io_piece *ipo)
{
	struct fio_file *f;
	int ret;

	/*
	 * Not a special ipo
	 */
	if (ipo->ddir != DDIR_INVAL)
		return 0;

	f = td->files[ipo->fileno];

	switch (ipo->file_action) {
	case FIO_LOG_OPEN_FILE:
		ret = td_io_open_file(td, f);
		if (!ret)
			break;
		td_verror(td, ret, "iolog open file");
		return -1;
	case FIO_LOG_CLOSE_FILE:
		td_io_close_file(td, f);
		break;
	case FIO_LOG_UNLINK_FILE:
		td_io_unlink_file(td, f);
		break;
	default:
		log_err("fio: bad file action %d\n", ipo->file_action);
		return -1;
	}

	return 1;
}
int read_iolog_get(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo;
	unsigned long elapsed;

	while (!flist_empty(&td->io_log_list)) {
		int ret;

		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);

		ret = ipo_special(td, ipo);
		if (ret < 0) {
			free(ipo);
			break;
		} else if (ret > 0) {
			free(ipo);
			continue;
		}

		io_u->ddir = ipo->ddir;
		if (ipo->ddir != DDIR_WAIT) {
			io_u->offset = ipo->offset;
			io_u->buflen = ipo->len;
			io_u->file = td->files[ipo->fileno];
			get_file(io_u->file);
			dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
						io_u->buflen, io_u->file->file_name);
			if (ipo->delay)
				iolog_delay(td, ipo->delay);
		} else {
			elapsed = mtime_since_genesis();
			if (ipo->delay > elapsed)
				usec_sleep(td, (ipo->delay - elapsed) * 1000);
		}

		free(ipo);

		if (io_u->ddir != DDIR_WAIT)
			return 0;
	}

	td->done = 1;
	return 1;
}
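/*
 * Release every io_piece still held for verification, both the rb-tree
 * entries and the plain history list entries.
 */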
void prune_io_piece_log(struct thread_data *td)
{
	struct io_piece *ipo;
	struct rb_node *n;

	while ((n = rb_first(&td->io_hist_tree)) != NULL) {
		ipo = rb_entry(n, struct io_piece, rb_node);
		rb_erase(n, &td->io_hist_tree);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}

	while (!flist_empty(&td->io_hist_list)) {
		ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}
}
/*
 * log a successful write, so we can unwind the log for verify
 */
void log_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct rb_node **p, *parent;
	struct io_piece *ipo, *__ipo;

	ipo = malloc(sizeof(struct io_piece));
	init_ipo(ipo);
	ipo->file = io_u->file;
	ipo->offset = io_u->offset;
	ipo->len = io_u->buflen;
	ipo->numberio = io_u->numberio;
	ipo->flags = IP_F_IN_FLIGHT;

	io_u->ipo = ipo;

	if (io_u_should_trim(td, io_u)) {
		flist_add_tail(&ipo->trim_list, &td->trim_list);
		td->trim_entries++;
	}

	/*
	 * We don't need to sort the entries, if:
	 *
	 *	Sequential writes, or
	 *	Random writes that lay out the file as it goes along
	 *
	 * For both these cases, just reading back data in the order we
	 * wrote it out is the fastest.
	 *
	 * One exception is if we don't have a random map AND we are doing
	 * verifies, in that case we need to check for duplicate blocks and
	 * drop the old one, which we rely on the rb insert/lookup for
	 * handling.
	 */
	if (((!td->o.verifysort) || !td_random(td) || !td->o.overwrite) &&
	      (file_randommap(td, ipo->file) || td->o.verify == VERIFY_NONE)) {
		INIT_FLIST_HEAD(&ipo->list);
		flist_add_tail(&ipo->list, &td->io_hist_list);
		ipo->flags |= IP_F_ONLIST;
		td->io_hist_len++;
		return;
	}

	RB_CLEAR_NODE(&ipo->rb_node);

	/*
	 * Sort the entry into the verification list
	 */
restart:
	p = &td->io_hist_tree.rb_node;
	parent = NULL;
	while (*p) {
		int overlap = 0;
		parent = *p;

		__ipo = rb_entry(parent, struct io_piece, rb_node);
		if (ipo->file < __ipo->file)
			p = &(*p)->rb_left;
		else if (ipo->file > __ipo->file)
			p = &(*p)->rb_right;
		else if (ipo->offset < __ipo->offset) {
			p = &(*p)->rb_left;
			overlap = ipo->offset + ipo->len > __ipo->offset;
		} else if (ipo->offset > __ipo->offset) {
			p = &(*p)->rb_right;
			overlap = __ipo->offset + __ipo->len > ipo->offset;
		} else
			overlap = 1;

		if (overlap) {
			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
				__ipo->offset, __ipo->len,
				ipo->offset, ipo->len);
			td->io_hist_len--;
			rb_erase(parent, &td->io_hist_tree);
			remove_trim_entry(td, __ipo);
			free(__ipo);
			goto restart;
		}
	}

	rb_link_node(&ipo->rb_node, parent, p);
	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
	ipo->flags |= IP_F_ONRB;
	td->io_hist_len++;
}
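/*
 * Undo log_io_piece() for an io_u that failed or was requeued: record the
 * failure in the block-info state map (if tracked), then drop the piece
 * from whichever structure it was queued on.
 */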
void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (td->ts.nr_block_infos) {
		uint32_t *info = io_u_block_info(td, io_u);
		if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
			if (io_u->ddir == DDIR_TRIM)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_TRIM_FAILURE);
			else if (io_u->ddir == DDIR_WRITE)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_WRITE_FAILURE);
		}
	}

	if (!ipo)
		return;

	if (ipo->flags & IP_F_ONRB)
		rb_erase(&ipo->rb_node, &td->io_hist_tree);
	else if (ipo->flags & IP_F_ONLIST)
		flist_del(&ipo->list);

	free(ipo);
	io_u->ipo = NULL;
	td->io_hist_len--;
}
void trim_io_piece(struct thread_data *td, const struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (!ipo)
		return;

	ipo->len = io_u->xfer_buflen - io_u->resid;
}
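/*
 * Flush and close the write iolog, releasing the setvbuf() buffer
 * installed by init_iolog_write().
 */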
void write_iolog_close(struct thread_data *td)
{
	fflush(td->iolog_f);
	fclose(td->iolog_f);
	free(td->iolog_buf);
	td->iolog_f = NULL;
	td->iolog_buf = NULL;
}
/*
 * Read version 2 iolog data. It is enhanced to include per-file logging,
 * syncs, etc.
 */
static int read_iolog2(struct thread_data *td, FILE *f)
{
	unsigned long long offset;
	unsigned int bytes;
	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
	char *rfname, *fname, *act;
	char *str, *p;
	enum fio_ddir rw;

	free_release_files(td);

	/*
	 * Read in the read iolog and store it, reuse the infrastructure
	 * for doing verifications.
	 */
	str = malloc(4096);
	rfname = fname = malloc(256+16);
	act = malloc(256+16);

	reads = writes = waits = 0;
	while ((p = fgets(str, 4096, f)) != NULL) {
		struct io_piece *ipo;
		int r;

		r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
									&bytes);

		if (td->o.replay_redirect)
			fname = td->o.replay_redirect;

		if (r == 4) {
			/*
			 * Check action first
			 */
			if (!strcmp(act, "wait"))
				rw = DDIR_WAIT;
			else if (!strcmp(act, "read"))
				rw = DDIR_READ;
			else if (!strcmp(act, "write"))
				rw = DDIR_WRITE;
			else if (!strcmp(act, "sync"))
				rw = DDIR_SYNC;
			else if (!strcmp(act, "datasync"))
				rw = DDIR_DATASYNC;
			else if (!strcmp(act, "trim"))
				rw = DDIR_TRIM;
			else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
			fileno = get_fileno(td, fname);
		} else if (r == 2) {
			rw = DDIR_INVAL;
			if (!strcmp(act, "add")) {
				fileno = add_file(td, fname, 0, 1);
				file_action = FIO_LOG_ADD_FILE;
				continue;
			} else if (!strcmp(act, "open")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_OPEN_FILE;
			} else if (!strcmp(act, "close")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_CLOSE_FILE;
			} else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
		} else {
			log_err("bad iolog2: %s", p);
			continue;
		}

		if (rw == DDIR_READ)
			reads++;
		else if (rw == DDIR_WRITE) {
			/*
			 * Don't add a write for ro mode
			 */
			if (read_only)
				continue;
			writes++;
		} else if (rw == DDIR_WAIT) {
			if (td->o.no_stall)
				continue;
			waits++;
		} else if (rw == DDIR_INVAL) {
		} else if (!ddir_sync(rw)) {
			log_err("bad ddir: %d\n", rw);
			continue;
		}

		/*
		 * Make note of file
		 */
		ipo = malloc(sizeof(*ipo));
		init_ipo(ipo);
		ipo->ddir = rw;
		if (rw == DDIR_WAIT) {
			ipo->delay = offset;
		} else {
			ipo->offset = offset;
			ipo->len = bytes;
			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
				td->o.max_bs[rw] = bytes;
			ipo->fileno = fileno;
			ipo->file_action = file_action;
		}

		queue_io_piece(td, ipo);
	}

	free(str);
	free(act);
	free(rfname);

	if (writes && read_only) {
		log_err("fio: <%s> skips replay of %d writes due to"
			" read-only\n", td->o.name, writes);
		writes = 0;
	}

	if (!reads && !writes && !waits)
		return 1;
	else if (reads && !writes)
		td->o.td_ddir = TD_DDIR_READ;
	else if (!reads && writes)
		td->o.td_ddir = TD_DDIR_WRITE;
	else
		td->o.td_ddir = TD_DDIR_RW;

	return 0;
}
/*
 * open iolog, check version, and call appropriate parser
 */
static int init_iolog_read(struct thread_data *td)
{
	char buffer[256], *p;
	FILE *f;
	int ret;

	f = fopen(td->o.read_iolog_file, "r");
	if (!f) {
		perror("fopen read iolog");
		return 1;
	}

	p = fgets(buffer, sizeof(buffer), f);
	if (!p) {
		td_verror(td, errno, "iolog read");
		log_err("fio: unable to read iolog\n");
		fclose(f);
		return 1;
	}

	/*
	 * version 2 of the iolog stores a specific string as the
	 * first line, check for that
	 */
	if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
		ret = read_iolog2(td, f);
	else {
		log_err("fio: iolog version 1 is no longer supported\n");
		ret = 1;
	}

	fclose(f);
	return ret;
}
/*
 * Set up a log for storing io patterns.
 */
static int init_iolog_write(struct thread_data *td)
{
	struct fio_file *ff;
	FILE *f;
	unsigned int i;

	f = fopen(td->o.write_iolog_file, "a");
	if (!f) {
		perror("fopen write iolog");
		return 1;
	}

	/*
	 * That's it for writing, setup a log buffer and we're done.
	 */
	td->iolog_f = f;
	td->iolog_buf = malloc(8192);
	setvbuf(f, td->iolog_buf, _IOFBF, 8192);

	/*
	 * write our version line
	 */
	if (fprintf(f, "%s\n", iolog_ver2) < 0) {
		perror("iolog init");
		return 1;
	}

	/*
	 * add all known files
	 */
	for_each_file(td, ff, i)
		log_file(td, ff, FIO_LOG_ADD_FILE);

	return 0;
}
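/*
 * Entry point for iolog setup: load a replay log (blktrace or fio v2
 * format) if read_iolog is set, else open a write log if requested.
 */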
int init_iolog(struct thread_data *td)
{
	int ret = 0;

	if (td->o.read_iolog_file) {
		int need_swap;

		/*
		 * Check if it's a blktrace file and load that if possible.
		 * Otherwise assume it's a normal log file and load that.
		 */
		if (is_blktrace(td->o.read_iolog_file, &need_swap))
			ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
		else
			ret = init_iolog_read(td);
	} else if (td->o.write_iolog_file)
		ret = init_iolog_write(td);

	if (ret)
		td_verror(td, EINVAL, "failed initializing iolog");

	return ret;
}
void setup_log(struct io_log **log, struct log_params *p,
	       const char *filename)
{
	struct io_log *l;
	int i;
	struct io_u_plat_entry *entry;
	struct flist_head *list;

	l = scalloc(1, sizeof(*l));
	INIT_FLIST_HEAD(&l->io_logs);
	l->log_type = p->log_type;
	l->log_offset = p->log_offset;
	l->log_gz = p->log_gz;
	l->log_gz_store = p->log_gz_store;
	l->avg_msec = p->avg_msec;
	l->hist_msec = p->hist_msec;
	l->hist_coarseness = p->hist_coarseness;
	l->filename = strdup(filename);
	l->td = p->td;

	/* Initialize histogram lists for each r/w direction,
	 * with initial io_u_plat of all zeros:
	 */
	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
		list = &l->hist_window[i].list;
		INIT_FLIST_HEAD(list);
		entry = calloc(1, sizeof(struct io_u_plat_entry));
		flist_add(&entry->list, list);
	}

	if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
		struct io_logs *p;

		p = calloc(1, sizeof(*l->pending));
		p->max_samples = DEF_LOG_ENTRIES;
		p->log = calloc(p->max_samples, log_entry_sz(l));
		l->pending = p;
	}

	if (l->log_offset)
		l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
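	/*
	 * The offset flag rides along in each sample's ddir field;
	 * flush_samples() checks it to pick the right entry size.
	 */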
	INIT_FLIST_HEAD(&l->chunk_list);

	if (l->log_gz && !p->td)
		l->log_gz = 0;
	else if (l->log_gz || l->log_gz_store) {
		mutex_init_pshared(&l->chunk_lock);
		p->td->flags |= TD_F_COMPRESS_LOG;
	}

	*log = l;
}
#ifdef CONFIG_SETVBUF
static void *set_file_buffer(FILE *f)
{
	size_t size = 1048576;
	void *buf;

	buf = malloc(size);
	setvbuf(f, buf, _IOFBF, size);
	return buf;
}

static void clear_file_buffer(void *buf)
{
	free(buf);
}
#else
static void *set_file_buffer(FILE *f)
{
	return NULL;
}

static void clear_file_buffer(void *buf)
{
}
#endif
void free_log(struct io_log *log)
{
	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);
		free(cur_log->log);
		free(cur_log);
	}

	if (log->pending) {
		free(log->pending->log);
		free(log->pending);
		log->pending = NULL;
	}

	free(log->filename);
	sfree(log);
}
inline unsigned long hist_sum(int j, int stride, unsigned int *io_u_plat,
			      unsigned int *io_u_plat_last)
{
	unsigned long sum;
	int k;

	if (io_u_plat_last) {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k] - io_u_plat_last[j + k];
	} else {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k];
	}

	return sum;
}
static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
			       uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, j, nr_samples;
	struct io_u_plat_entry *entry, *entry_before;
	unsigned int *io_u_plat;
	unsigned int *io_u_plat_before;

	int stride = 1 << hist_coarseness;
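	/*
	 * Each output column sums 'stride' adjacent latency bins, so e.g.
	 * hist_coarseness=4 folds every 16 bins of io_u_plat into one value.
	 */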
	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		entry = s->data.plat_entry;
		io_u_plat = entry->io_u_plat;

		entry_before = flist_first_entry(&entry->list,
						 struct io_u_plat_entry, list);
		io_u_plat_before = entry_before->io_u_plat;

		fprintf(f, "%lu, %u, %u, ", (unsigned long) s->time,
						io_sample_ddir(s), s->bs);
		for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
			fprintf(f, "%lu, ", hist_sum(j, stride, io_u_plat,
						io_u_plat_before));
		}
		fprintf(f, "%lu\n", (unsigned long)
			hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
					io_u_plat_before));

		flist_del(&entry_before->list);
		free(entry_before);
	}
}
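/*
 * Write plain (non-histogram) samples as CSV: "time, value, ddir, bs",
 * with a trailing offset column if offset logging was enabled.
 */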
void flush_samples(FILE *f, void *samples, uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, nr_samples;

	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		if (!log_offset) {
			fprintf(f, "%lu, %" PRId64 ", %u, %u\n",
					(unsigned long) s->time,
					s->data.val,
					io_sample_ddir(s), s->bs);
		} else {
			struct io_sample_offset *so = (void *) s;

			fprintf(f, "%lu, %" PRId64 ", %u, %u, %llu\n",
					(unsigned long) s->time,
					s->data.val,
					io_sample_ddir(s), s->bs,
					(unsigned long long) so->offset);
		}
	}
}
#ifdef CONFIG_ZLIB

struct iolog_flush_data {
	struct workqueue_work work;
	struct io_log *log;
	void *samples;
	uint32_t nr_samples;
	bool free;
};

#define GZ_CHUNK	131072
static struct iolog_compress *get_new_chunk(unsigned int seq)
{
	struct iolog_compress *c;

	c = malloc(sizeof(*c));
	INIT_FLIST_HEAD(&c->list);
	c->buf = malloc(GZ_CHUNK);
	c->len = 0;
	c->seq = seq;
	return c;
}

static void free_chunk(struct iolog_compress *ic)
{
	free(ic->buf);
	free(ic);
}
static int z_stream_init(z_stream *stream, int gz_hdr)
{
	int wbits = 15;

	memset(stream, 0, sizeof(*stream));
	stream->zalloc = Z_NULL;
	stream->zfree = Z_NULL;
	stream->opaque = Z_NULL;
	stream->next_in = Z_NULL;

	/*
	 * zlib magic - add 32 for auto-detection of gz header or not,
	 * if we decide to store files in a gzip friendly format.
	 */
	if (gz_hdr)
		wbits += 32;

	if (inflateInit2(stream, wbits) != Z_OK)
		return 1;

	return 0;
}
struct inflate_chunk_iter {
	unsigned int seq;
	int err;
	void *buf;
	size_t buf_size;
	size_t buf_used;
	size_t chunk_sz;
};

static void finish_chunk(z_stream *stream, FILE *f,
			 struct inflate_chunk_iter *iter)
{
	int ret;

	ret = inflateEnd(stream);
	if (ret != Z_OK)
		log_err("fio: failed to end log inflation seq %d (%d)\n",
				iter->seq, ret);

	flush_samples(f, iter->buf, iter->buf_used);
	free(iter->buf);
	iter->buf = NULL;
	iter->buf_size = iter->buf_used = 0;
}
/*
 * Iterative chunk inflation. Handles cases where we cross into a new
 * sequence, doing flush finish of previous chunk if needed.
 */
static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
			    z_stream *stream, struct inflate_chunk_iter *iter)
{
	size_t ret;

	dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
				(unsigned long) ic->len, ic->seq);

	if (ic->seq != iter->seq) {
		if (iter->seq)
			finish_chunk(stream, f, iter);

		z_stream_init(stream, gz_hdr);
		iter->seq = ic->seq;
	}

	stream->avail_in = ic->len;
	stream->next_in = ic->buf;

	if (!iter->buf_size) {
		iter->buf_size = iter->chunk_sz;
		iter->buf = malloc(iter->buf_size);
	}

	while (stream->avail_in) {
		size_t this_out = iter->buf_size - iter->buf_used;
		int err;

		stream->avail_out = this_out;
		stream->next_out = iter->buf + iter->buf_used;

		err = inflate(stream, Z_NO_FLUSH);
		if (err < 0) {
			log_err("fio: failed inflating log: %d\n", err);
			iter->err = err;
			break;
		}

		iter->buf_used += this_out - stream->avail_out;

		if (!stream->avail_out) {
			iter->buf_size += iter->chunk_sz;
			iter->buf = realloc(iter->buf, iter->buf_size);
			continue;
		}

		if (err == Z_STREAM_END)
			break;
	}

	ret = (void *) stream->next_in - ic->buf;

	dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);

	return ret;
}
/*
 * Inflate stored compressed chunks, or write them directly to the log
 * file if so instructed.
 */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
	z_stream stream;

	while (!flist_empty(&log->chunk_list)) {
		struct iolog_compress *ic;

		ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
		flist_del(&ic->list);

		if (log->log_gz_store) {
			size_t ret;

			dprint(FD_COMPRESS, "log write chunk size=%lu, "
				"seq=%u\n", (unsigned long) ic->len, ic->seq);

			ret = fwrite(ic->buf, ic->len, 1, f);
			if (ret != 1 || ferror(f)) {
				iter.err = errno;
				log_err("fio: error writing compressed log\n");
			}
		} else
			inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);

		free_chunk(ic);
	}

	if (iter.seq) {
		finish_chunk(&stream, f, &iter);
		free(iter.buf);
	}

	return iter.err;
}
/*
 * Open compressed log file and decompress the stored chunks and
 * write them to stdout. The chunks are stored sequentially in the
 * file, so we iterate over them and do them one-by-one.
 */
int iolog_file_inflate(const char *file)
{
	struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
	struct iolog_compress ic;
	z_stream stream;
	struct stat sb;
	size_t ret, total;
	void *buf;
	FILE *f;

	f = fopen(file, "r");
	if (!f) {
		perror("fopen");
		return 1;
	}

	if (stat(file, &sb) < 0) {
		fclose(f);
		perror("stat");
		return 1;
	}

	ic.buf = buf = malloc(sb.st_size);
	ic.len = sb.st_size;
	ic.seq = 1;

	ret = fread(ic.buf, ic.len, 1, f);
	if (ret == 0 && ferror(f)) {
		perror("fread");
		fclose(f);
		free(buf);
		return 1;
	} else if (ret != 1) {
		log_err("fio: short read on reading log\n");
		fclose(f);
		free(buf);
		return 1;
	}

	fclose(f);

	/*
	 * Each chunk will return Z_STREAM_END. We don't know how many
	 * chunks are in the file, so we just keep looping and incrementing
	 * the sequence number until we have consumed the whole compressed
	 * file.
	 */
	total = ic.len;
	do {
		size_t iret;

		iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
		total -= iret;
		if (!total || iter.err)
			break;

		ic.seq++;
		ic.len -= iret;
		ic.buf += iret;
	} while (1);

	if (iter.seq) {
		finish_chunk(&stream, stdout, &iter);
		free(iter.buf);
	}

	free(buf);
	return iter.err;
}
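/*
 * Note: the function above is what backs fio's --inflate-log option,
 * decompressing a stored log to stdout.
 */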
#else

static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	return 0;
}

int iolog_file_inflate(const char *file)
{
	log_err("fio: log inflation not possible without zlib\n");
	return 1;
}

#endif
void flush_log(struct io_log *log, bool do_append)
{
	void *buf;
	FILE *f;

	if (!do_append)
		f = fopen(log->filename, "w");
	else
		f = fopen(log->filename, "a");
	if (!f) {
		perror("fopen log");
		return;
	}

	buf = set_file_buffer(f);

	inflate_gz_chunks(log, f);

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		if (log->td && log == log->td->clat_hist_log)
			flush_hist_samples(f, log->hist_coarseness, cur_log->log,
					   log_sample_sz(log, cur_log));
		else
			flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));

		free(cur_log);
	}

	fclose(f);
	clear_file_buffer(buf);
}
static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
{
	if (td->flags & TD_F_COMPRESS_LOG)
		iolog_flush(log);

	if (trylock) {
		if (fio_trylock_file(log->filename))
			return 1;
	} else
		fio_lock_file(log->filename);

	if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
		fio_send_iolog(td, log, log->filename);
	else
		flush_log(log, !td->o.per_job_logs);

	fio_unlock_file(log->filename);
	free_log(log);
	return 0;
}
size_t log_chunk_sizes(struct io_log *log)
{
	struct flist_head *entry;
	size_t ret;

	if (flist_empty(&log->chunk_list))
		return 0;

	ret = 0;
	pthread_mutex_lock(&log->chunk_lock);
	flist_for_each(entry, &log->chunk_list) {
		struct iolog_compress *c;

		c = flist_entry(entry, struct iolog_compress, list);
		ret += c->len;
	}
	pthread_mutex_unlock(&log->chunk_lock);
	return ret;
}

#ifdef CONFIG_ZLIB
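/*
 * Deflate the queued samples into GZ_CHUNK-sized chunks and splice them
 * onto the log's chunk list, to be written out or inflated later.
 */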
static int gz_work(struct iolog_flush_data *data)
{
	struct iolog_compress *c = NULL;
	struct flist_head list;
	unsigned int seq;
	z_stream stream;
	size_t total = 0;
	int ret;

	INIT_FLIST_HEAD(&list);

	memset(&stream, 0, sizeof(stream));
	stream.zalloc = Z_NULL;
	stream.zfree = Z_NULL;
	stream.opaque = Z_NULL;

	ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
	if (ret != Z_OK) {
		log_err("fio: failed to init gz stream\n");
		goto err;
	}

	seq = ++data->log->chunk_seq;

	stream.next_in = (void *) data->samples;
	stream.avail_in = data->nr_samples * log_entry_sz(data->log);

	dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
				(unsigned long) stream.avail_in, seq,
				data->log->filename);
	do {
		if (c)
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		c = get_new_chunk(seq);
		stream.avail_out = GZ_CHUNK;
		stream.next_out = c->buf;
		ret = deflate(&stream, Z_NO_FLUSH);
		if (ret < 0) {
			log_err("fio: deflate log (%d)\n", ret);
			free_chunk(c);
			goto err;
		}

		c->len = GZ_CHUNK - stream.avail_out;
		flist_add_tail(&c->list, &list);
		total += c->len;
	} while (stream.avail_in);

	stream.next_out = c->buf + c->len;
	stream.avail_out = GZ_CHUNK - c->len;

	ret = deflate(&stream, Z_FINISH);
	if (ret < 0) {
		/*
		 * Z_BUF_ERROR is special, it just means we need more
		 * output space. We'll handle that below. Treat any other
		 * error as fatal.
		 */
		if (ret != Z_BUF_ERROR) {
			log_err("fio: deflate log (%d)\n", ret);
			flist_del(&c->list);
			free_chunk(c);
			goto err;
		}
	}

	total -= c->len;
	c->len = GZ_CHUNK - stream.avail_out;
	total += c->len;
	dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);

	if (ret != Z_STREAM_END) {
		do {
			c = get_new_chunk(seq);
			stream.avail_out = GZ_CHUNK;
			stream.next_out = c->buf;
			ret = deflate(&stream, Z_FINISH);
			c->len = GZ_CHUNK - stream.avail_out;
			total += c->len;
			flist_add_tail(&c->list, &list);
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		} while (ret != Z_STREAM_END);
	}

	dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);

	ret = deflateEnd(&stream);
	if (ret != Z_OK)
		log_err("fio: deflateEnd %d\n", ret);

	free(data->samples);

	if (!flist_empty(&list)) {
		pthread_mutex_lock(&data->log->chunk_lock);
		flist_splice_tail(&list, &data->log->chunk_list);
		pthread_mutex_unlock(&data->log->chunk_lock);
	}

	ret = 0;
done:
	if (data->free)
		free(data);
	return ret;
err:
	while (!flist_empty(&list)) {
		c = flist_first_entry(&list, struct iolog_compress, list);
		flist_del(&c->list);
		free_chunk(c);
	}
	ret = 1;
	goto done;
}
/*
 * Invoked from our compress helper thread, when logging would have exceeded
 * the specified memory limitation. Compresses the previously stored
 * samples.
 */
static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
{
	return gz_work(container_of(work, struct iolog_flush_data, work));
}

static int gz_init_worker(struct submit_worker *sw)
{
	struct thread_data *td = sw->wq->td;

	if (!fio_option_is_set(&td->o, log_gz_cpumask))
		return 0;

	if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
		log_err("gz: failed to set CPU affinity\n");
		return 1;
	}

	return 0;
}

static struct workqueue_ops log_compress_wq_ops = {
	.fn		= gz_work_async,
	.init_worker_fn	= gz_init_worker,
};

int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return 0;

	workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return;

	workqueue_exit(&td->log_compress_wq);
}
/*
 * Compress the existing log entries synchronously. We reference the
 * existing log in the data handed to the compressor; once compression
 * has been done, the old log entries are freed.
 */
static int iolog_flush(struct io_log *log)
{
	struct iolog_flush_data *data;

	data = malloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;
	data->free = false;

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		data->samples = cur_log->log;
		data->nr_samples = cur_log->nr_samples;

		sfree(cur_log);

		gz_work(data);
	}

	free(data);
	return 0;
}
int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	struct iolog_flush_data *data;

	data = malloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;
	data->free = true;

	data->samples = cur_log->log;
	data->nr_samples = cur_log->nr_samples;

	cur_log->nr_samples = cur_log->max_samples = 0;
	cur_log->log = NULL;

	workqueue_enqueue(&log->td->log_compress_wq, &data->work);
	return 0;
}
#else

static int iolog_flush(struct io_log *log)
{
	return 1;
}

int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	return 1;
}

int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
}

#endif
struct io_logs *iolog_cur_log(struct io_log *log)
{
	if (flist_empty(&log->io_logs))
		return NULL;

	return flist_last_entry(&log->io_logs, struct io_logs, list);
}
uint64_t iolog_nr_samples(struct io_log *iolog)
{
	struct flist_head *entry;
	uint64_t ret = 0;

	flist_for_each(entry, &iolog->io_logs) {
		struct io_logs *cur_log;

		cur_log = flist_entry(entry, struct io_logs, list);
		ret += cur_log->nr_samples;
	}

	return ret;
}
static int __write_log(struct thread_data *td, struct io_log *log, int try)
{
	if (log)
		return finish_log(td, log, try);

	return 0;
}
static int write_iops_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->iops_log) != unit_log)
		return 0;

	ret = __write_log(td, td->iops_log, try);
	if (!ret)
		td->iops_log = NULL;

	return ret;
}

static int write_slat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->slat_log, try);
	if (!ret)
		td->slat_log = NULL;

	return ret;
}

static int write_clat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_log, try);
	if (!ret)
		td->clat_log = NULL;

	return ret;
}

static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_hist_log, try);
	if (!ret)
		td->clat_hist_log = NULL;

	return ret;
}

static int write_lat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->lat_log, try);
	if (!ret)
		td->lat_log = NULL;

	return ret;
}

static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->bw_log) != unit_log)
		return 0;

	ret = __write_log(td, td->bw_log, try);
	if (!ret)
		td->bw_log = NULL;

	return ret;
}
enum {
	BW_LOG_MASK	= 1,
	LAT_LOG_MASK	= 2,
	SLAT_LOG_MASK	= 4,
	CLAT_LOG_MASK	= 8,
	IOPS_LOG_MASK	= 16,
	CLAT_HIST_LOG_MASK = 32,

	ALL_LOG_NR	= 6,
};

struct log_type {
	unsigned int mask;
	int (*fn)(struct thread_data *, int, bool);
};

static struct log_type log_types[] = {
	{
		.mask	= BW_LOG_MASK,
		.fn	= write_bandw_log,
	},
	{
		.mask	= LAT_LOG_MASK,
		.fn	= write_lat_log,
	},
	{
		.mask	= SLAT_LOG_MASK,
		.fn	= write_slat_log,
	},
	{
		.mask	= CLAT_LOG_MASK,
		.fn	= write_clat_log,
	},
	{
		.mask	= IOPS_LOG_MASK,
		.fn	= write_iops_log,
	},
	{
		.mask	= CLAT_HIST_LOG_MASK,
		.fn	= write_clat_hist_log,
	},
};
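/*
 * Flush all logs for a job. Each pass tries the per-log writers with
 * trylock semantics (try == (log_left != 1)) until only one log remains,
 * which is then allowed to block on the file lock.
 */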
void td_writeout_logs(struct thread_data *td, bool unit_logs)
{
	unsigned int log_mask = 0;
	unsigned int log_left = ALL_LOG_NR;
	int old_state, i;

	old_state = td_bump_runstate(td, TD_FINISHING);

	finalize_logs(td, unit_logs);

	while (log_left) {
		int prev_log_left = log_left;

		for (i = 0; i < ALL_LOG_NR && log_left; i++) {
			struct log_type *lt = &log_types[i];
			int ret;

			if (!(log_mask & lt->mask)) {
				ret = lt->fn(td, log_left != 1, unit_logs);
				if (!ret) {
					log_left--;
					log_mask |= lt->mask;
				}
			}
		}

		if (prev_log_left == log_left)
			usleep(5000);
	}

	td_restore_runstate(td, old_state);
}
void fio_writeout_logs(bool unit_logs)
{
	struct thread_data *td;
	int i;

	for_each_td(td, i)
		td_writeout_logs(td, unit_logs);
}