/*
 * Code related to writing an iolog of what a thread is doing, and to
 * later read that back and replay.
 */
static int iolog_flush(struct io_log *log);

static const char iolog_ver2[] = "fio version 2 iolog";
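/*
 * A version 2 iolog is plain text: the version line above, followed by
 * one entry per line. File entries are "<name> <add|open|close>" and I/O
 * entries are "<name> <action> <offset> <length>". An illustrative log:
 *
 *	fio version 2 iolog
 *	/dev/sdb add
 *	/dev/sdb open
 *	/dev/sdb write 0 4096
 *	/dev/sdb read 4096 4096
 *	/dev/sdb close
 */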
void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
{
	flist_add_tail(&ipo->list, &td->io_log_list);
	td->total_io_size += ipo->len;
}
void log_io_u(const struct thread_data *td, const struct io_u *io_u)
{
	if (!td->o.write_iolog_file)
		return;

	fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
						io_ddir_name(io_u->ddir),
						io_u->offset, io_u->buflen);
}
void log_file(struct thread_data *td, struct fio_file *f,
	      enum file_log_act what)
{
	const char *act[] = { "add", "open", "close" };

	assert(what < 3);

	if (!td->o.write_iolog_file)
		return;

	/*
	 * this happens on the pre-open/close done before the job starts
	 */
	if (!td->iolog_f)
		return;

	fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
}
static void iolog_delay(struct thread_data *td, unsigned long delay)
{
	uint64_t usec = utime_since_now(&td->last_issue);
	unsigned long orig_delay = delay;
	uint64_t this_delay;
	struct timeval tv;

	if (delay < td->time_offset) {
		td->time_offset = 0;
		return;
	}

	delay -= td->time_offset;
	if (delay < usec)
		return;

	delay -= usec;

	fio_gettime(&tv, NULL);
	while (delay && !td->terminate) {
		this_delay = delay;
		if (this_delay > 500000)
			this_delay = 500000;

		usec_sleep(td, this_delay);
		delay -= this_delay;
	}

	usec = utime_since_now(&tv);
	if (usec > orig_delay)
		td->time_offset = usec - orig_delay;
	else
		td->time_offset = 0;
}
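/*
 * Example of the carryover above: if the replay asks for a 250 usec
 * delay but the sleep loop overshoots and roughly 400 usec actually
 * elapse, time_offset is left at 150 usec and is subtracted from the
 * next requested delay, keeping the replay roughly on schedule.
 */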
static int ipo_special(struct thread_data *td, struct io_piece *ipo)
{
	struct fio_file *f;
	int ret;

	/*
	 * Not a special ipo
	 */
	if (ipo->ddir != DDIR_INVAL)
		return 0;

	f = td->files[ipo->fileno];

	switch (ipo->file_action) {
	case FIO_LOG_OPEN_FILE:
		ret = td_io_open_file(td, f);
		if (!ret)
			break;
		td_verror(td, ret, "iolog open file");
		return -1;
	case FIO_LOG_CLOSE_FILE:
		td_io_close_file(td, f);
		break;
	case FIO_LOG_UNLINK_FILE:
		td_io_unlink_file(td, f);
		break;
	default:
		log_err("fio: bad file action %d\n", ipo->file_action);
		break;
	}

	return 1;
}
int read_iolog_get(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo;
	unsigned long elapsed;

	while (!flist_empty(&td->io_log_list)) {
		int ret;

		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);

		ret = ipo_special(td, ipo);
		if (ret < 0) {
			free(ipo);
			break;
		} else if (ret > 0) {
			free(ipo);
			continue;
		}

		io_u->ddir = ipo->ddir;
		if (ipo->ddir != DDIR_WAIT) {
			io_u->offset = ipo->offset;
			io_u->buflen = ipo->len;
			io_u->file = td->files[ipo->fileno];
			get_file(io_u->file);
			dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
						io_u->buflen, io_u->file->file_name);
			if (ipo->delay)
				iolog_delay(td, ipo->delay);
		} else {
			elapsed = mtime_since_genesis();
			if (ipo->delay > elapsed)
				usec_sleep(td, (ipo->delay - elapsed) * 1000);
		}

		free(ipo);

		if (io_u->ddir != DDIR_WAIT)
			return 0;
	}

	td->done = 1;
	return 1;
}
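/*
 * Note on the two delay paths above: for a "wait" entry, ipo->delay is
 * an absolute time in msec since the job started (genesis), while for
 * ordinary I/O it is a relative delay in usec applied before issuing
 * the next request.
 */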
void prune_io_piece_log(struct thread_data *td)
{
	struct io_piece *ipo;
	struct rb_node *n;

	while ((n = rb_first(&td->io_hist_tree)) != NULL) {
		ipo = rb_entry(n, struct io_piece, rb_node);
		rb_erase(n, &td->io_hist_tree);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}

	while (!flist_empty(&td->io_hist_list)) {
		ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}
}
/*
 * log a successful write, so we can unwind the log for verify
 */
void log_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct rb_node **p, *parent;
	struct io_piece *ipo, *__ipo;

	ipo = malloc(sizeof(struct io_piece));
	init_ipo(ipo);
	ipo->file = io_u->file;
	ipo->offset = io_u->offset;
	ipo->len = io_u->buflen;
	ipo->numberio = io_u->numberio;
	ipo->flags = IP_F_IN_FLIGHT;

	io_u->ipo = ipo;

	if (io_u_should_trim(td, io_u)) {
		flist_add_tail(&ipo->trim_list, &td->trim_list);
		td->trim_entries++;
	}

	/*
	 * We don't need to sort the entries if:
	 *
	 *	Sequential writes, or
	 *	Random writes that lay out the file as it goes along
	 *
	 * For both these cases, just reading back data in the order we
	 * wrote it out is the fastest.
	 *
	 * One exception is if we don't have a random map AND we are doing
	 * verifies, in that case we need to check for duplicate blocks and
	 * drop the old one, which we rely on the rb insert/lookup for
	 * handling.
	 */
	if (((!td->o.verifysort) || !td_random(td) || !td->o.overwrite) &&
	    (file_randommap(td, ipo->file) || td->o.verify == VERIFY_NONE)) {
		INIT_FLIST_HEAD(&ipo->list);
		flist_add_tail(&ipo->list, &td->io_hist_list);
		ipo->flags |= IP_F_ONLIST;
		td->io_hist_len++;
		return;
	}

	RB_CLEAR_NODE(&ipo->rb_node);

	/*
	 * Sort the entry into the verification list
	 */
restart:
	p = &td->io_hist_tree.rb_node;
	parent = NULL;
	while (*p) {
		int overlap = 0;

		parent = *p;
		__ipo = rb_entry(parent, struct io_piece, rb_node);
		if (ipo->file < __ipo->file)
			p = &(*p)->rb_left;
		else if (ipo->file > __ipo->file)
			p = &(*p)->rb_right;
		else if (ipo->offset < __ipo->offset) {
			p = &(*p)->rb_left;
			overlap = ipo->offset + ipo->len > __ipo->offset;
		} else if (ipo->offset > __ipo->offset) {
			p = &(*p)->rb_right;
			overlap = __ipo->offset + __ipo->len > ipo->offset;
		} else
			overlap = 1;

		if (overlap) {
			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
				__ipo->offset, __ipo->len,
				ipo->offset, ipo->len);
			td->io_hist_len--;
			rb_erase(parent, &td->io_hist_tree);
			remove_trim_entry(td, __ipo);
			free(__ipo);
			goto restart;
		}
	}

	rb_link_node(&ipo->rb_node, parent, p);
	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
	ipo->flags |= IP_F_ONRB;
	td->io_hist_len++;
}
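/*
 * Example of the overlap handling above: if a verified 4k write at
 * offset 0 is later followed by an 8k write at offset 0, the ranges
 * overlap and the older entry is erased from the tree, so only the
 * most recently written data for a block is verified.
 */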
void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (td->ts.nr_block_infos) {
		uint32_t *info = io_u_block_info(td, io_u);

		if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
			if (io_u->ddir == DDIR_TRIM)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_TRIM_FAILURE);
			else if (io_u->ddir == DDIR_WRITE)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_WRITE_FAILURE);
		}
	}

	if (!ipo)
		return;

	if (ipo->flags & IP_F_ONRB)
		rb_erase(&ipo->rb_node, &td->io_hist_tree);
	else if (ipo->flags & IP_F_ONLIST)
		flist_del(&ipo->list);

	free(ipo);
	io_u->ipo = NULL;
	td->io_hist_len--;
}
void trim_io_piece(struct thread_data *td, const struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (!ipo)
		return;

	ipo->len = io_u->xfer_buflen - io_u->resid;
}
void write_iolog_close(struct thread_data *td)
{
	fflush(td->iolog_f);
	fclose(td->iolog_f);
	free(td->iolog_buf);
	td->iolog_f = NULL;
	td->iolog_buf = NULL;
}
/*
 * Read version 2 iolog data. It is enhanced to include per-file logging,
 * syncs, etc.
 */
static int read_iolog2(struct thread_data *td, FILE *f)
{
	unsigned long long offset;
	unsigned int bytes;
	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
	char *rfname, *fname, *act;
	char *str, *p;
	enum fio_ddir rw;

	free_release_files(td);

	/*
	 * Read in the read iolog and store it, reuse the infrastructure
	 * for doing verifications.
	 */
	str = malloc(4096);
	rfname = fname = malloc(256+16);
	act = malloc(256+16);

	reads = writes = waits = 0;
	while ((p = fgets(str, 4096, f)) != NULL) {
		struct io_piece *ipo;
		int r;

		r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
									&bytes);

		if (td->o.replay_redirect)
			fname = td->o.replay_redirect;

		if (r == 4) {
			/*
			 * Check action first
			 */
			if (!strcmp(act, "wait"))
				rw = DDIR_WAIT;
			else if (!strcmp(act, "read"))
				rw = DDIR_READ;
			else if (!strcmp(act, "write"))
				rw = DDIR_WRITE;
			else if (!strcmp(act, "sync"))
				rw = DDIR_SYNC;
			else if (!strcmp(act, "datasync"))
				rw = DDIR_DATASYNC;
			else if (!strcmp(act, "trim"))
				rw = DDIR_TRIM;
			else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
			fileno = get_fileno(td, fname);
		} else if (r == 2) {
			rw = DDIR_INVAL;
			if (!strcmp(act, "add")) {
				fileno = add_file(td, fname, 0, 1);
				file_action = FIO_LOG_ADD_FILE;
				continue;
			} else if (!strcmp(act, "open")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_OPEN_FILE;
			} else if (!strcmp(act, "close")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_CLOSE_FILE;
			} else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
		} else {
			log_err("bad iolog2: %s", p);
			continue;
		}

		if (rw == DDIR_READ)
			reads++;
		else if (rw == DDIR_WRITE) {
			/*
			 * Don't add a write for ro mode
			 */
			if (read_only)
				continue;
			writes++;
		} else if (rw == DDIR_WAIT) {
			waits++;
		} else if (rw == DDIR_INVAL) {
		} else if (!ddir_sync(rw)) {
			log_err("bad ddir: %d\n", rw);
			continue;
		}

		/*
		 * Make note of file
		 */
		ipo = malloc(sizeof(*ipo));
		init_ipo(ipo);
		ipo->ddir = rw;
		if (rw == DDIR_WAIT) {
			ipo->delay = offset;
		} else {
			ipo->offset = offset;
			ipo->len = bytes;
			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
				td->o.max_bs[rw] = bytes;
			ipo->fileno = fileno;
			ipo->file_action = file_action;
			td->o.size += bytes;
		}

		queue_io_piece(td, ipo);
	}

	free(str);
	free(act);
	free(rfname);

	if (writes && read_only) {
		log_err("fio: <%s> skips replay of %d writes due to"
			" read-only\n", td->o.name, writes);
		writes = 0;
	}

	if (!reads && !writes && !waits)
		return 1;
	else if (reads && !writes)
		td->o.td_ddir = TD_DDIR_READ;
	else if (!reads && writes)
		td->o.td_ddir = TD_DDIR_WRITE;
	else
		td->o.td_ddir = TD_DDIR_RW;

	return 0;
}
/*
 * open iolog, check version, and call appropriate parser
 */
static int init_iolog_read(struct thread_data *td)
{
	char buffer[256], *p;
	FILE *f;
	int ret;

	f = fopen(td->o.read_iolog_file, "r");
	if (!f) {
		perror("fopen read iolog");
		return 1;
	}

	p = fgets(buffer, sizeof(buffer), f);
	if (!p) {
		td_verror(td, errno, "iolog read");
		log_err("fio: unable to read iolog\n");
		fclose(f);
		return 1;
	}

	/*
	 * version 2 of the iolog stores a specific string as the
	 * first line, check for that
	 */
	if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
		ret = read_iolog2(td, f);
	else {
		log_err("fio: iolog version 1 is no longer supported\n");
		ret = 1;
	}

	fclose(f);
	return ret;
}
/*
 * Set up a log for storing io patterns.
 */
static int init_iolog_write(struct thread_data *td)
{
	struct fio_file *ff;
	FILE *f;
	unsigned int i;

	f = fopen(td->o.write_iolog_file, "a");
	if (!f) {
		perror("fopen write iolog");
		return 1;
	}

	/*
	 * That's it for writing, setup a log buffer and we're done.
	 */
	td->iolog_f = f;
	td->iolog_buf = malloc(8192);
	setvbuf(f, td->iolog_buf, _IOFBF, 8192);

	/*
	 * write our version line
	 */
	if (fprintf(f, "%s\n", iolog_ver2) < 0) {
		perror("iolog init");
		return 1;
	}

	/*
	 * add all known files
	 */
	for_each_file(td, ff, i)
		log_file(td, ff, FIO_LOG_ADD_FILE);

	return 0;
}
int init_iolog(struct thread_data *td)
{
	int ret = 0;

	if (td->o.read_iolog_file) {
		int need_swap;

		/*
		 * Check if it's a blktrace file and load that if possible.
		 * Otherwise assume it's a normal log file and load that.
		 */
		if (is_blktrace(td->o.read_iolog_file, &need_swap))
			ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
		else
			ret = init_iolog_read(td);
	} else if (td->o.write_iolog_file)
		ret = init_iolog_write(td);

	if (ret)
		td_verror(td, EINVAL, "failed initializing iolog");

	return ret;
}
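/*
 * Illustrative usage: record a job's I/O pattern with
 * "fio --name=job ... --write_iolog=trace.log", then replay it later
 * with "fio --name=replay ... --read_iolog=trace.log".
 */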
void setup_log(struct io_log **log, struct log_params *p,
	       const char *filename)
{
	struct io_log *l;
	int i;
	struct io_u_plat_entry *entry;
	struct flist_head *list;

	l = scalloc(1, sizeof(*l));
	INIT_FLIST_HEAD(&l->io_logs);
	l->log_type = p->log_type;
	l->log_offset = p->log_offset;
	l->log_gz = p->log_gz;
	l->log_gz_store = p->log_gz_store;
	l->avg_msec = p->avg_msec;
	l->hist_msec = p->hist_msec;
	l->hist_coarseness = p->hist_coarseness;
	l->filename = strdup(filename);
	l->td = p->td;

	/* Initialize histogram lists for each r/w direction,
	 * with initial io_u_plat of all zeros:
	 */
	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
		list = &l->hist_window[i].list;
		INIT_FLIST_HEAD(list);
		entry = calloc(1, sizeof(struct io_u_plat_entry));
		flist_add(&entry->list, list);
	}

	if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
		struct io_logs *__p;

		__p = calloc(1, sizeof(*l->pending));
		__p->max_samples = DEF_LOG_ENTRIES;
		__p->log = calloc(__p->max_samples, log_entry_sz(l));
		l->pending = __p;
	}

	if (l->log_offset)
		l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;

	INIT_FLIST_HEAD(&l->chunk_list);

	if (l->log_gz && !p->td)
		l->log_gz = 0;
	else if (l->log_gz || l->log_gz_store) {
		mutex_init_pshared(&l->chunk_lock);
		p->td->flags |= TD_F_COMPRESS_LOG;
	}

	*log = l;
}
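/*
 * A minimal usage sketch (illustrative only; field values assumed):
 * fill a struct log_params and hand it to setup_log, e.g. for a
 * latency log flushed every 500 msec:
 *
 *	struct log_params lp = {
 *		.td		= td,
 *		.avg_msec	= 500,
 *		.log_type	= IO_LOG_TYPE_LAT,
 *		.log_offset	= 0,
 *		.log_gz		= 0,
 *		.log_gz_store	= 0,
 *	};
 *
 *	setup_log(&td->lat_log, &lp, "job_lat.log");
 */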
#ifdef CONFIG_SETVBUF
static void *set_file_buffer(FILE *f)
{
	size_t size = 1048576;
	void *buf;

	buf = malloc(size);
	setvbuf(f, buf, _IOFBF, size);
	return buf;
}

static void clear_file_buffer(void *buf)
{
	free(buf);
}
#else
static void *set_file_buffer(FILE *f)
{
	return NULL;
}

static void clear_file_buffer(void *buf)
{
}
#endif
void free_log(struct io_log *log)
{
	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);
		free(cur_log->log);
		free(cur_log);
	}

	if (log->pending) {
		free(log->pending->log);
		free(log->pending);
		log->pending = NULL;
	}

	free(log->filename);
	sfree(log);
}
inline unsigned long hist_sum(int j, int stride, unsigned int *io_u_plat,
			      unsigned int *io_u_plat_last)
{
	unsigned long sum;
	int k;

	if (io_u_plat_last) {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k] - io_u_plat_last[j + k];
	} else {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k];
	}

	return sum;
}
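/*
 * Example: with hist_coarseness 2 the stride is 1 << 2 = 4, so each
 * output column sums 4 adjacent io_u_plat bins; passing the previous
 * window's plat as io_u_plat_last yields per-interval counts rather
 * than cumulative ones.
 */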
static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
			       uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, j, nr_samples;
	struct io_u_plat_entry *entry, *entry_before;
	unsigned int *io_u_plat;
	unsigned int *io_u_plat_before;

	int stride = 1 << hist_coarseness;

	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		entry = (struct io_u_plat_entry *) (uintptr_t) s->val;
		io_u_plat = entry->io_u_plat;

		entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
		io_u_plat_before = entry_before->io_u_plat;

		fprintf(f, "%lu, %u, %u, ", (unsigned long) s->time,
						io_sample_ddir(s), s->bs);
		for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
			fprintf(f, "%lu, ", hist_sum(j, stride, io_u_plat,
						io_u_plat_before));
		}
		fprintf(f, "%lu\n", (unsigned long)
			hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
					io_u_plat_before));

		flist_del(&entry_before->list);
		free(entry_before);
	}
}
void flush_samples(FILE *f, void *samples, uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, nr_samples;

	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		if (!log_offset) {
			fprintf(f, "%lu, %lu, %u, %u\n",
					(unsigned long) s->time,
					(unsigned long) s->val,
					io_sample_ddir(s), s->bs);
		} else {
			struct io_sample_offset *so = (void *) s;

			fprintf(f, "%lu, %lu, %u, %u, %llu\n",
					(unsigned long) s->time,
					(unsigned long) s->val,
					io_sample_ddir(s), s->bs,
					(unsigned long long) so->offset);
		}
	}
}
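/*
 * Example output line, one sample per line: "16, 250, 0, 4096" means
 * time 16 msec, value 250, ddir 0 (read), block size 4096; with offset
 * logging enabled a fifth column holds the I/O offset.
 */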
#ifdef CONFIG_ZLIB

struct iolog_flush_data {
	struct workqueue_work work;
	struct io_log *log;
	void *samples;
	uint32_t nr_samples;
	bool free;
};

#define GZ_CHUNK	131072
static struct iolog_compress *get_new_chunk(unsigned int seq)
{
	struct iolog_compress *c;

	c = malloc(sizeof(*c));
	INIT_FLIST_HEAD(&c->list);
	c->buf = malloc(GZ_CHUNK);
	c->len = 0;
	c->seq = seq;
	return c;
}

static void free_chunk(struct iolog_compress *ic)
{
	free(ic->buf);
	free(ic);
}
static int z_stream_init(z_stream *stream, int gz_hdr)
{
	int wbits = 15;

	memset(stream, 0, sizeof(*stream));
	stream->zalloc = Z_NULL;
	stream->zfree = Z_NULL;
	stream->opaque = Z_NULL;
	stream->next_in = Z_NULL;

	/*
	 * zlib magic - add 32 for auto-detection of gz header or not,
	 * if we decide to store files in a gzip friendly format.
	 */
	if (gz_hdr)
		wbits += 32;

	if (inflateInit2(stream, wbits) != Z_OK)
		return 1;

	return 0;
}
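/*
 * Per the zlib manual: windowBits 15 + 32 = 47 makes inflateInit2
 * auto-detect whether the input carries a gzip or a zlib header,
 * while plain windowBits 15 accepts only a zlib header.
 */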
struct inflate_chunk_iter {
	unsigned int seq;
	int err;
	void *buf;
	size_t buf_size;
	size_t buf_used;
	size_t chunk_sz;
};
static void finish_chunk(z_stream *stream, FILE *f,
			 struct inflate_chunk_iter *iter)
{
	int ret;

	ret = inflateEnd(stream);
	if (ret != Z_OK)
		log_err("fio: failed to end log inflation seq %d (%d)\n",
				iter->seq, ret);

	flush_samples(f, iter->buf, iter->buf_used);
	free(iter->buf);
	iter->buf = NULL;
	iter->buf_size = iter->buf_used = 0;
}
/*
 * Iterative chunk inflation. Handles cases where we cross into a new
 * sequence, doing flush finish of previous chunk if needed.
 */
static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
			    z_stream *stream, struct inflate_chunk_iter *iter)
{
	size_t ret;

	dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
				(unsigned long) ic->len, ic->seq);

	if (ic->seq != iter->seq) {
		if (iter->seq)
			finish_chunk(stream, f, iter);

		z_stream_init(stream, gz_hdr);
		iter->seq = ic->seq;
	}

	stream->avail_in = ic->len;
	stream->next_in = ic->buf;

	if (!iter->buf_size) {
		iter->buf_size = iter->chunk_sz;
		iter->buf = malloc(iter->buf_size);
	}

	while (stream->avail_in) {
		size_t this_out = iter->buf_size - iter->buf_used;
		int err;

		stream->avail_out = this_out;
		stream->next_out = iter->buf + iter->buf_used;

		err = inflate(stream, Z_NO_FLUSH);
		if (err < 0) {
			log_err("fio: failed inflating log: %d\n", err);
			iter->err = err;
			break;
		}

		iter->buf_used += this_out - stream->avail_out;

		if (!stream->avail_out) {
			iter->buf_size += iter->chunk_sz;
			iter->buf = realloc(iter->buf, iter->buf_size);
			continue;
		}

		if (err == Z_STREAM_END)
			break;
	}

	ret = (void *) stream->next_in - ic->buf;

	dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);

	return ret;
}
/*
 * Inflate stored compressed chunks, or write them directly to the log
 * file if so instructed.
 */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
	z_stream stream;

	while (!flist_empty(&log->chunk_list)) {
		struct iolog_compress *ic;

		ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
		flist_del(&ic->list);

		if (log->log_gz_store) {
			size_t ret;

			dprint(FD_COMPRESS, "log write chunk size=%lu, "
				"seq=%u\n", (unsigned long) ic->len, ic->seq);

			ret = fwrite(ic->buf, ic->len, 1, f);
			if (ret != 1 || ferror(f)) {
				iter.err = errno;
				log_err("fio: error writing compressed log\n");
			}
		} else
			inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);

		free_chunk(ic);
	}

	if (iter.seq) {
		finish_chunk(&stream, f, &iter);
		free(iter.buf);
	}

	return iter.err;
}
/*
 * Open compressed log file and decompress the stored chunks and
 * write them to stdout. The chunks are stored sequentially in the
 * file, so we iterate over them and do them one-by-one.
 */
int iolog_file_inflate(const char *file)
{
	struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
	struct iolog_compress ic;
	z_stream stream;
	struct stat sb;
	size_t ret;
	size_t total;
	void *buf;
	FILE *f;

	f = fopen(file, "r");
	if (!f) {
		perror("fopen");
		return 1;
	}

	if (stat(file, &sb) < 0) {
		fclose(f);
		perror("stat");
		return 1;
	}

	ic.buf = buf = malloc(sb.st_size);
	ic.len = sb.st_size;
	ic.seq = 1;

	ret = fread(ic.buf, ic.len, 1, f);
	if (ret == 0 && ferror(f)) {
		perror("fread");
		fclose(f);
		free(buf);
		return 1;
	} else if (ret != 1) {
		log_err("fio: short read on reading log\n");
		fclose(f);
		free(buf);
		return 1;
	}

	fclose(f);

	/*
	 * Each chunk will return Z_STREAM_END. We don't know how many
	 * chunks are in the file, so we just keep looping and incrementing
	 * the sequence number until we have consumed the whole compressed
	 * file.
	 */
	total = ic.len;
	do {
		size_t iret;

		iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
		total -= iret;
		if (!total)
			break;
		if (iter.err)
			break;

		ic.seq++;
		ic.len -= iret;
		ic.buf += iret;
	} while (1);

	if (iter.seq) {
		finish_chunk(&stream, stdout, &iter);
		free(iter.buf);
	}

	free(buf);
	return iter.err;
}
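/*
 * This is what backs the "fio --inflate-log=<file>" command line
 * option, which decompresses a stored compressed log to stdout.
 */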
#else

static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	return 0;
}

int iolog_file_inflate(const char *file)
{
	log_err("fio: log inflation not possible without zlib\n");
	return 1;
}

#endif
void flush_log(struct io_log *log, bool do_append)
{
	void *buf;
	FILE *f;

	if (!do_append)
		f = fopen(log->filename, "w");
	else
		f = fopen(log->filename, "a");
	if (!f) {
		perror("fopen log");
		return;
	}

	buf = set_file_buffer(f);

	inflate_gz_chunks(log, f);

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		if (log->td && log == log->td->clat_hist_log)
			flush_hist_samples(f, log->hist_coarseness, cur_log->log,
					   log_sample_sz(log, cur_log));
		else
			flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));

		sfree(cur_log);
	}

	fclose(f);
	clear_file_buffer(buf);
}
static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
{
	if (td->flags & TD_F_COMPRESS_LOG)
		iolog_flush(log);

	if (trylock) {
		if (fio_trylock_file(log->filename))
			return 1;
	} else
		fio_lock_file(log->filename);

	if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
		fio_send_iolog(td, log, log->filename);
	else
		flush_log(log, !td->o.per_job_logs);

	fio_unlock_file(log->filename);
	free_log(log);
	return 0;
}
size_t log_chunk_sizes(struct io_log *log)
{
	struct flist_head *entry;
	size_t ret;

	if (flist_empty(&log->chunk_list))
		return 0;

	ret = 0;
	pthread_mutex_lock(&log->chunk_lock);
	flist_for_each(entry, &log->chunk_list) {
		struct iolog_compress *c;

		c = flist_entry(entry, struct iolog_compress, list);
		ret += c->len;
	}
	pthread_mutex_unlock(&log->chunk_lock);
	return ret;
}
#ifdef CONFIG_ZLIB

static int gz_work(struct iolog_flush_data *data)
{
	struct iolog_compress *c = NULL;
	struct flist_head list;
	unsigned int seq;
	z_stream stream;
	size_t total = 0;
	int ret;

	INIT_FLIST_HEAD(&list);

	memset(&stream, 0, sizeof(stream));
	stream.zalloc = Z_NULL;
	stream.zfree = Z_NULL;
	stream.opaque = Z_NULL;

	ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
	if (ret != Z_OK) {
		log_err("fio: failed to init gz stream\n");
		goto err;
	}

	seq = ++data->log->chunk_seq;

	stream.next_in = (void *) data->samples;
	stream.avail_in = data->nr_samples * log_entry_sz(data->log);

	dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
				(unsigned long) stream.avail_in, seq,
				data->log->filename);
	do {
		if (c)
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		c = get_new_chunk(seq);
		stream.avail_out = GZ_CHUNK;
		stream.next_out = c->buf;
		ret = deflate(&stream, Z_NO_FLUSH);
		if (ret < 0) {
			log_err("fio: deflate log (%d)\n", ret);
			free_chunk(c);
			goto err;
		}

		c->len = GZ_CHUNK - stream.avail_out;
		flist_add_tail(&c->list, &list);
		total += c->len;
	} while (stream.avail_in);

	stream.next_out = c->buf + c->len;
	stream.avail_out = GZ_CHUNK - c->len;

	ret = deflate(&stream, Z_FINISH);
	if (ret < 0) {
		/*
		 * Z_BUF_ERROR is special, it just means we need more
		 * output space. We'll handle that below. Treat any other
		 * error as fatal.
		 */
		if (ret != Z_BUF_ERROR) {
			log_err("fio: deflate log (%d)\n", ret);
			flist_del(&c->list);
			free_chunk(c);
			goto err;
		}
	}

	total -= c->len;
	c->len = GZ_CHUNK - stream.avail_out;
	total += c->len;
	dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);

	if (ret != Z_STREAM_END) {
		do {
			c = get_new_chunk(seq);
			stream.avail_out = GZ_CHUNK;
			stream.next_out = c->buf;
			ret = deflate(&stream, Z_FINISH);
			c->len = GZ_CHUNK - stream.avail_out;
			total += c->len;
			flist_add_tail(&c->list, &list);
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		} while (ret != Z_STREAM_END);
	}

	dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);

	ret = deflateEnd(&stream);
	if (ret != Z_OK)
		log_err("fio: deflateEnd %d\n", ret);

	free(data->samples);

	if (!flist_empty(&list)) {
		pthread_mutex_lock(&data->log->chunk_lock);
		flist_splice_tail(&list, &data->log->chunk_list);
		pthread_mutex_unlock(&data->log->chunk_lock);
	}

	ret = 0;
done:
	if (data->free)
		free(data);
	return ret;
err:
	while (!flist_empty(&list)) {
		c = flist_first_entry(list.next, struct iolog_compress, list);
		flist_del(&c->list);
		free_chunk(c);
	}
	ret = 1;
	goto done;
}
/*
 * Invoked from our compress helper thread, when logging would have exceeded
 * the specified memory limitation. Compresses the previously stored
 * entries.
 */
static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
{
	return gz_work(container_of(work, struct iolog_flush_data, work));
}
static int gz_init_worker(struct submit_worker *sw)
{
	struct thread_data *td = sw->wq->td;

	if (!fio_option_is_set(&td->o, log_gz_cpumask))
		return 0;

	if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
		log_err("gz: failed to set CPU affinity\n");
		return 1;
	}

	return 0;
}
static struct workqueue_ops log_compress_wq_ops = {
	.fn		= gz_work_async,
	.init_worker_fn	= gz_init_worker,
};
int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return 0;

	workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return;

	workqueue_exit(&td->log_compress_wq);
}
/*
 * Queue work item to compress the existing log entries. We reset the
 * current log to a small size, and reference the existing log in the
 * data that we queue for compression. Once compression has been done,
 * this old log is freed.
 */
static int iolog_flush(struct io_log *log)
{
	struct iolog_flush_data *data;

	data = malloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;
	data->free = false;

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		data->samples = cur_log->log;
		data->nr_samples = cur_log->nr_samples;

		sfree(cur_log);

		gz_work(data);
	}

	free(data);
	return 0;
}
int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	struct iolog_flush_data *data;

	data = malloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;
	data->free = true;

	data->samples = cur_log->log;
	data->nr_samples = cur_log->nr_samples;

	cur_log->nr_samples = cur_log->max_samples = 0;
	cur_log->log = NULL;

	workqueue_enqueue(&log->td->log_compress_wq, &data->work);
	return 0;
}
#else

static int iolog_flush(struct io_log *log)
{
	return 1;
}

int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	return 1;
}

int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
}

#endif
struct io_logs *iolog_cur_log(struct io_log *log)
{
	if (flist_empty(&log->io_logs))
		return NULL;

	return flist_last_entry(&log->io_logs, struct io_logs, list);
}
uint64_t iolog_nr_samples(struct io_log *iolog)
{
	struct flist_head *entry;
	uint64_t ret = 0;

	flist_for_each(entry, &iolog->io_logs) {
		struct io_logs *cur_log;

		cur_log = flist_entry(entry, struct io_logs, list);
		ret += cur_log->nr_samples;
	}

	return ret;
}
static int __write_log(struct thread_data *td, struct io_log *log, int try)
{
	if (log)
		return finish_log(td, log, try);

	return 0;
}
static int write_iops_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->iops_log) != unit_log)
		return 0;

	ret = __write_log(td, td->iops_log, try);
	if (!ret)
		td->iops_log = NULL;

	return ret;
}

static int write_slat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->slat_log, try);
	if (!ret)
		td->slat_log = NULL;

	return ret;
}

static int write_clat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_log, try);
	if (!ret)
		td->clat_log = NULL;

	return ret;
}

static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_hist_log, try);
	if (!ret)
		td->clat_hist_log = NULL;

	return ret;
}

static int write_lat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->lat_log, try);
	if (!ret)
		td->lat_log = NULL;

	return ret;
}

static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->bw_log) != unit_log)
		return 0;

	ret = __write_log(td, td->bw_log, try);
	if (!ret)
		td->bw_log = NULL;

	return ret;
}
enum {
	BW_LOG_MASK	= 1,
	LAT_LOG_MASK	= 2,
	SLAT_LOG_MASK	= 4,
	CLAT_LOG_MASK	= 8,
	IOPS_LOG_MASK	= 16,
	CLAT_HIST_LOG_MASK = 32,

	ALL_LOG_NR	= 6,
};

struct log_type {
	unsigned int mask;
	int (*fn)(struct thread_data *, int, bool);
};
static struct log_type log_types[] = {
	{
		.mask	= BW_LOG_MASK,
		.fn	= write_bandw_log,
	},
	{
		.mask	= LAT_LOG_MASK,
		.fn	= write_lat_log,
	},
	{
		.mask	= SLAT_LOG_MASK,
		.fn	= write_slat_log,
	},
	{
		.mask	= CLAT_LOG_MASK,
		.fn	= write_clat_log,
	},
	{
		.mask	= IOPS_LOG_MASK,
		.fn	= write_iops_log,
	},
	{
		.mask	= CLAT_HIST_LOG_MASK,
		.fn	= write_clat_hist_log,
	},
};
void td_writeout_logs(struct thread_data *td, bool unit_logs)
{
	unsigned int log_mask = 0;
	unsigned int log_left = ALL_LOG_NR;
	int old_state, i;

	old_state = td_bump_runstate(td, TD_FINISHING);

	finalize_logs(td, unit_logs);

	while (log_left) {
		int prev_log_left = log_left;

		for (i = 0; i < ALL_LOG_NR && log_left; i++) {
			struct log_type *lt = &log_types[i];
			int ret;

			if (!(log_mask & lt->mask)) {
				ret = lt->fn(td, log_left != 1, unit_logs);
				if (!ret) {
					log_left--;
					log_mask |= lt->mask;
				}
			}
		}

		if (prev_log_left == log_left)
			usleep(5000);
	}

	td_restore_runstate(td, old_state);
}
void fio_writeout_logs(bool unit_logs)
{
	struct thread_data *td;
	int i;

	for_each_td(td, i)
		td_writeout_logs(td, unit_logs);
}