/*
 * Code related to writing an iolog of what a thread is doing, and to
 * later read that back and replay.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/stat.h>
#ifdef CONFIG_ZLIB
#include <zlib.h>
#endif

#include "flist.h"
#include "fio.h"
#include "verify.h"
#include "trim.h"
#include "filelock.h"
#include "smalloc.h"
static int iolog_flush(struct io_log *log);

static const char iolog_ver2[] = "fio version 2 iolog";
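/*
 * Editorial example (not part of the original source): a version 2 iolog
 * as written by log_file()/log_io_u() below and parsed by read_iolog2().
 * File actions are "<filename> <action>"; I/O entries are
 * "<filename> <ddir> <offset> <length>" with offset/length in bytes:
 *
 *      fio version 2 iolog
 *      /dev/sdb add
 *      /dev/sdb open
 *      /dev/sdb write 0 4096
 *      /dev/sdb read 4096 4096
 *      /dev/sdb close
 */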
void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
{
        flist_add_tail(&ipo->list, &td->io_log_list);
        td->total_io_size += ipo->len;
}
void log_io_u(const struct thread_data *td, const struct io_u *io_u)
{
        if (!td->o.write_iolog_file)
                return;

        fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
                                        io_ddir_name(io_u->ddir),
                                        io_u->offset, io_u->buflen);
}
void log_file(struct thread_data *td, struct fio_file *f,
              enum file_log_act what)
{
        const char *act[] = { "add", "open", "close" };

        if (!td->o.write_iolog_file)
                return;

        /*
         * This happens on the pre-open/close done before the job starts,
         * when no iolog file has been set up yet.
         */
        if (!td->iolog_f)
                return;

        fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
}
static void iolog_delay(struct thread_data *td, unsigned long delay)
{
        uint64_t usec = utime_since_now(&td->last_issue);
        unsigned long orig_delay = delay;
        uint64_t this_delay;
        struct timeval tv;

        if (delay < td->time_offset) {
                td->time_offset = 0;
                return;
        }

        delay -= td->time_offset;
        if (delay < usec)
                return;

        delay -= usec;

        fio_gettime(&tv, NULL);
        while (delay && !td->terminate) {
                this_delay = delay;
                if (this_delay > 500000)
                        this_delay = 500000;

                usec_sleep(td, this_delay);
                delay -= this_delay;
        }

        /*
         * Carry any oversleep over to the next delay we are asked to add.
         */
        usec = utime_since_now(&tv);
        if (usec > orig_delay)
                td->time_offset = usec - orig_delay;
        else
                td->time_offset = 0;
}
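/*
 * Editorial worked example: a replayed delay of 1.2s is slept as
 * 500ms + 500ms + 200ms chunks, so td->terminate is honoured at least
 * every half second. If the wall clock then shows 1.25s actually passed,
 * the 50ms oversleep is parked in td->time_offset and deducted from the
 * next delay we are asked to insert.
 */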
static int ipo_special(struct thread_data *td, struct io_piece *ipo)
{
        struct fio_file *f;
        int ret;

        /*
         * Only DDIR_INVAL entries describe file actions.
         */
        if (ipo->ddir != DDIR_INVAL)
                return 0;

        f = td->files[ipo->fileno];

        switch (ipo->file_action) {
        case FIO_LOG_OPEN_FILE:
                ret = td_io_open_file(td, f);
                if (!ret)
                        break;
                td_verror(td, ret, "iolog open file");
                return -1;
        case FIO_LOG_CLOSE_FILE:
                td_io_close_file(td, f);
                break;
        case FIO_LOG_UNLINK_FILE:
                td_io_unlink_file(td, f);
                break;
        default:
                log_err("fio: bad file action %d\n", ipo->file_action);
                break;
        }

        return 1;
}
int read_iolog_get(struct thread_data *td, struct io_u *io_u)
{
        struct io_piece *ipo;
        unsigned long elapsed;

        while (!flist_empty(&td->io_log_list)) {
                int ret;

                ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
                flist_del(&ipo->list);
                remove_trim_entry(td, ipo);

                ret = ipo_special(td, ipo);
                if (ret < 0) {
                        free(ipo);
                        break;
                } else if (ret > 0) {
                        free(ipo);
                        continue;
                }

                io_u->ddir = ipo->ddir;
                if (ipo->ddir != DDIR_WAIT) {
                        io_u->offset = ipo->offset;
                        io_u->buflen = ipo->len;
                        io_u->file = td->files[ipo->fileno];
                        get_file(io_u->file);
                        dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
                                                io_u->buflen, io_u->file->file_name);
                        if (ipo->delay)
                                iolog_delay(td, ipo->delay);
                } else {
                        elapsed = mtime_since_genesis();
                        if (ipo->delay > elapsed)
                                usec_sleep(td, (ipo->delay - elapsed) * 1000);
                }

                free(ipo);

                if (io_u->ddir != DDIR_WAIT)
                        return 0;
        }

        td->done = 1;
        return 1;
}
void prune_io_piece_log(struct thread_data *td)
{
        struct io_piece *ipo;
        struct rb_node *n;

        while ((n = rb_first(&td->io_hist_tree)) != NULL) {
                ipo = rb_entry(n, struct io_piece, rb_node);
                rb_erase(n, &td->io_hist_tree);
                remove_trim_entry(td, ipo);
                td->io_hist_len--;
                free(ipo);
        }

        while (!flist_empty(&td->io_hist_list)) {
                ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
                flist_del(&ipo->list);
                remove_trim_entry(td, ipo);
                td->io_hist_len--;
                free(ipo);
        }
}
/*
 * log a successful write, so we can unwind the log for verify
 */
void log_io_piece(struct thread_data *td, struct io_u *io_u)
{
        struct rb_node **p, *parent;
        struct io_piece *ipo, *__ipo;

        ipo = malloc(sizeof(struct io_piece));
        init_ipo(ipo);
        ipo->file = io_u->file;
        ipo->offset = io_u->offset;
        ipo->len = io_u->buflen;
        ipo->numberio = io_u->numberio;
        ipo->flags = IP_F_IN_FLIGHT;

        io_u->ipo = ipo;

        if (io_u_should_trim(td, io_u)) {
                flist_add_tail(&ipo->trim_list, &td->trim_list);
                td->trim_entries++;
        }

        /*
         * We don't need to sort the entries if:
         *
         *      Sequential writes, or
         *      Random writes that lay out the file as it goes along
         *
         * For both these cases, just reading back data in the order we
         * wrote it out is the fastest.
         *
         * One exception is if we don't have a random map AND we are doing
         * verifies, in that case we need to check for duplicate blocks and
         * drop the old one, which we rely on the rb insert/lookup for
         * handling.
         */
        if (((!td->o.verifysort) || !td_random(td) || !td->o.overwrite) &&
            (file_randommap(td, ipo->file) || td->o.verify == VERIFY_NONE)) {
                INIT_FLIST_HEAD(&ipo->list);
                flist_add_tail(&ipo->list, &td->io_hist_list);
                ipo->flags |= IP_F_ONLIST;
                td->io_hist_len++;
                return;
        }

        RB_CLEAR_NODE(&ipo->rb_node);

        /*
         * Sort the entry into the verification list
         */
restart:
        p = &td->io_hist_tree.rb_node;
        parent = NULL;
        while (*p) {
                int overlap = 0;
                parent = *p;

                __ipo = rb_entry(parent, struct io_piece, rb_node);
                if (ipo->file < __ipo->file)
                        p = &(*p)->rb_left;
                else if (ipo->file > __ipo->file)
                        p = &(*p)->rb_right;
                else if (ipo->offset < __ipo->offset) {
                        p = &(*p)->rb_left;
                        overlap = ipo->offset + ipo->len > __ipo->offset;
                } else if (ipo->offset > __ipo->offset) {
                        p = &(*p)->rb_right;
                        overlap = __ipo->offset + __ipo->len > ipo->offset;
                } else
                        overlap = 1;

                if (overlap) {
                        dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
                                __ipo->offset, __ipo->len,
                                ipo->offset, ipo->len);
                        td->io_hist_len--;
                        rb_erase(parent, &td->io_hist_tree);
                        remove_trim_entry(td, __ipo);
                        free(__ipo);
                        goto restart;
                }
        }

        rb_link_node(&ipo->rb_node, parent, p);
        rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
        ipo->flags |= IP_F_ONRB;
        td->io_hist_len++;
}
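/*
 * Editorial worked example of the overlap rule: with an existing entry
 * covering [4096, 8192) in the tree, logging a new write at offset 6144
 * and length 4096 hits the "__ipo->offset + __ipo->len > ipo->offset"
 * case (4096 + 4096 > 6144), so the stale entry is erased and the walk
 * restarts; a later verify then only sees the most recent data for that
 * block range.
 */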
void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
{
        struct io_piece *ipo = io_u->ipo;

        if (td->ts.nr_block_infos) {
                uint32_t *info = io_u_block_info(td, io_u);

                if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
                        if (io_u->ddir == DDIR_TRIM)
                                *info = BLOCK_INFO_SET_STATE(*info,
                                                BLOCK_STATE_TRIM_FAILURE);
                        else if (io_u->ddir == DDIR_WRITE)
                                *info = BLOCK_INFO_SET_STATE(*info,
                                                BLOCK_STATE_WRITE_FAILURE);
                }
        }

        if (!ipo)
                return;

        if (ipo->flags & IP_F_ONRB)
                rb_erase(&ipo->rb_node, &td->io_hist_tree);
        else if (ipo->flags & IP_F_ONLIST)
                flist_del(&ipo->list);

        free(ipo);
        io_u->ipo = NULL;
        td->io_hist_len--;
}
void trim_io_piece(struct thread_data *td, const struct io_u *io_u)
{
        struct io_piece *ipo = io_u->ipo;

        if (!ipo)
                return;

        ipo->len = io_u->xfer_buflen - io_u->resid;
}
void write_iolog_close(struct thread_data *td)
{
        fflush(td->iolog_f);
        fclose(td->iolog_f);
        free(td->iolog_buf);
        td->iolog_f = NULL;
        td->iolog_buf = NULL;
}
/*
 * Read version 2 iolog data. It is enhanced to include per-file logging,
 * syncs, etc.
 */
static int read_iolog2(struct thread_data *td, FILE *f)
{
        unsigned long long offset;
        unsigned int bytes;
        int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
        char *fname, *act;
        char *str, *p;
        enum fio_ddir rw;

        free_release_files(td);

        /*
         * Read in the read iolog and store it, reuse the infrastructure
         * for doing verifications.
         */
        str = malloc(4096);
        fname = malloc(256+16);
        act = malloc(256+16);

        reads = writes = waits = 0;
        while ((p = fgets(str, 4096, f)) != NULL) {
                struct io_piece *ipo;
                int r;

                r = sscanf(p, "%256s %256s %llu %u", fname, act, &offset,
                                                                &bytes);

                if (r == 4) {
                        /*
                         * Check action first
                         */
                        if (!strcmp(act, "wait"))
                                rw = DDIR_WAIT;
                        else if (!strcmp(act, "read"))
                                rw = DDIR_READ;
                        else if (!strcmp(act, "write"))
                                rw = DDIR_WRITE;
                        else if (!strcmp(act, "sync"))
                                rw = DDIR_SYNC;
                        else if (!strcmp(act, "datasync"))
                                rw = DDIR_DATASYNC;
                        else if (!strcmp(act, "trim"))
                                rw = DDIR_TRIM;
                        else {
                                log_err("fio: bad iolog file action: %s\n",
                                                                        act);
                                continue;
                        }
                        fileno = get_fileno(td, fname);
                } else if (r == 2) {
                        rw = DDIR_INVAL;
                        if (!strcmp(act, "add")) {
                                fileno = add_file(td, fname, 0, 1);
                                file_action = FIO_LOG_ADD_FILE;
                                continue;
                        } else if (!strcmp(act, "open")) {
                                fileno = get_fileno(td, fname);
                                file_action = FIO_LOG_OPEN_FILE;
                        } else if (!strcmp(act, "close")) {
                                fileno = get_fileno(td, fname);
                                file_action = FIO_LOG_CLOSE_FILE;
                        } else {
                                log_err("fio: bad iolog file action: %s\n",
                                                                        act);
                                continue;
                        }
                } else {
                        log_err("bad iolog2: %s", p);
                        continue;
                }

                if (rw == DDIR_READ)
                        reads++;
                else if (rw == DDIR_WRITE) {
                        /*
                         * Don't add a write for ro mode
                         */
                        if (read_only)
                                continue;
                        writes++;
                } else if (rw == DDIR_WAIT) {
                        waits++;
                } else if (rw == DDIR_INVAL) {
                } else if (!ddir_sync(rw)) {
                        log_err("bad ddir: %d\n", rw);
                        continue;
                }

                /*
                 * Make note of file
                 */
                ipo = malloc(sizeof(*ipo));
                init_ipo(ipo);
                ipo->ddir = rw;
                if (rw == DDIR_WAIT) {
                        ipo->delay = offset;
                } else {
                        ipo->offset = offset;
                        ipo->len = bytes;
                        if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
                                td->o.max_bs[rw] = bytes;
                        ipo->fileno = fileno;
                        ipo->file_action = file_action;
                }

                queue_io_piece(td, ipo);
        }

        free(str);
        free(act);
        free(fname);

        if (writes && read_only) {
                log_err("fio: <%s> skips replay of %d writes due to"
                        " read-only\n", td->o.name, writes);
                writes = 0;
        }

        if (!reads && !writes && !waits)
                return 1;
        else if (reads && !writes)
                td->o.td_ddir = TD_DDIR_READ;
        else if (!reads && writes)
                td->o.td_ddir = TD_DDIR_WRITE;
        else
                td->o.td_ddir = TD_DDIR_RW;

        return 0;
}
/*
 * open iolog, check version, and call appropriate parser
 */
static int init_iolog_read(struct thread_data *td)
{
        char buffer[256], *p;
        FILE *f;
        int ret;

        f = fopen(td->o.read_iolog_file, "r");
        if (!f) {
                perror("fopen read iolog");
                return 1;
        }

        p = fgets(buffer, sizeof(buffer), f);
        if (!p) {
                td_verror(td, errno, "iolog read");
                log_err("fio: unable to read iolog\n");
                fclose(f);
                return 1;
        }

        /*
         * version 2 of the iolog stores a specific string as the
         * first line, check for that
         */
        if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
                ret = read_iolog2(td, f);
        else {
                log_err("fio: iolog version 1 is no longer supported\n");
                ret = 1;
        }

        fclose(f);
        return ret;
}
/*
 * Set up a log for storing io patterns.
 */
static int init_iolog_write(struct thread_data *td)
{
        struct fio_file *ff;
        FILE *f;
        unsigned int i;

        f = fopen(td->o.write_iolog_file, "a");
        if (!f) {
                perror("fopen write iolog");
                return 1;
        }

        /*
         * That's it for writing; set up a log buffer and we're done.
         */
        td->iolog_f = f;
        td->iolog_buf = malloc(8192);
        setvbuf(f, td->iolog_buf, _IOFBF, 8192);

        /*
         * write our version line
         */
        if (fprintf(f, "%s\n", iolog_ver2) < 0) {
                perror("iolog init");
                return 1;
        }

        /*
         * add all known files
         */
        for_each_file(td, ff, i)
                log_file(td, ff, FIO_LOG_ADD_FILE);

        return 0;
}
int init_iolog(struct thread_data *td)
{
        int ret = 0;

        if (td->o.read_iolog_file) {
                int need_swap;

                /*
                 * Check if it's a blktrace file and load that if possible.
                 * Otherwise assume it's a normal log file and load that.
                 */
                if (is_blktrace(td->o.read_iolog_file, &need_swap))
                        ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
                else
                        ret = init_iolog_read(td);
        } else if (td->o.write_iolog_file)
                ret = init_iolog_write(td);

        if (ret)
                td_verror(td, EINVAL, "failed initializing iolog");

        return ret;
}
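/*
 * Editorial usage sketch, assuming the standard fio job option names
 * (the job names and path below are hypothetical):
 *
 *      [capture]                       [replay]
 *      write_iolog=trace.log           read_iolog=trace.log
 *
 * A run with write_iolog= records each I/O via log_io_u(); a later run
 * with read_iolog= feeds the same trace back through read_iolog_get().
 */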
void setup_log(struct io_log **log, struct log_params *p,
               const char *filename)
{
        struct io_log *l;
        int i;
        struct io_u_plat_entry *entry;
        struct flist_head *list;

        l = scalloc(1, sizeof(*l));
        INIT_FLIST_HEAD(&l->io_logs);
        l->log_type = p->log_type;
        l->log_offset = p->log_offset;
        l->log_gz = p->log_gz;
        l->log_gz_store = p->log_gz_store;
        l->avg_msec = p->avg_msec;
        l->hist_msec = p->hist_msec;
        l->hist_coarseness = p->hist_coarseness;
        l->filename = strdup(filename);
        l->td = p->td;

        /* Initialize histogram lists for each r/w direction,
         * with initial io_u_plat of all zeros:
         */
        for (i = 0; i < DDIR_RWDIR_CNT; i++) {
                list = &l->hist_window[i].list;
                INIT_FLIST_HEAD(list);
                entry = calloc(1, sizeof(struct io_u_plat_entry));
                flist_add(&entry->list, list);
        }

        if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
                struct io_logs *p;

                p = calloc(1, sizeof(*l->pending));
                p->max_samples = DEF_LOG_ENTRIES;
                p->log = calloc(p->max_samples, log_entry_sz(l));
                l->pending = p;
        }

        if (l->log_offset)
                l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;

        INIT_FLIST_HEAD(&l->chunk_list);

        if (l->log_gz && !p->td)
                l->log_gz = 0;
        else if (l->log_gz || l->log_gz_store) {
                mutex_init_pshared(&l->chunk_lock);
                p->td->flags |= TD_F_COMPRESS_LOG;
        }

        *log = l;
}
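/*
 * Editorial note: log_gz and log_gz_store are driven by the
 * log_compression= and log_store_compressed= job options (assuming the
 * standard fio option names), e.g.:
 *
 *      write_bw_log=bw log_compression=10M log_store_compressed=1
 *
 * which compresses samples once they exceed ~10MB of memory and writes
 * the chunks out still compressed, for later use with --inflate-log.
 */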
#ifdef CONFIG_SETVBUF
static void *set_file_buffer(FILE *f)
{
        size_t size = 1048576;
        void *buf;

        buf = malloc(size);
        setvbuf(f, buf, _IOFBF, size);
        return buf;
}

static void clear_file_buffer(void *buf)
{
        free(buf);
}
#else
static void *set_file_buffer(FILE *f)
{
        return NULL;
}

static void clear_file_buffer(void *buf)
{
}
#endif
void free_log(struct io_log *log)
{
        while (!flist_empty(&log->io_logs)) {
                struct io_logs *cur_log;

                cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
                flist_del_init(&cur_log->list);
                free(cur_log->log);
                free(cur_log);
        }

        if (log->pending) {
                free(log->pending->log);
                free(log->pending);
                log->pending = NULL;
        }

        free(log->filename);
        sfree(log);
}
inline unsigned long hist_sum(int j, int stride, unsigned int *io_u_plat,
                              unsigned int *io_u_plat_last)
{
        unsigned long sum;
        int k;

        if (io_u_plat_last) {
                for (k = sum = 0; k < stride; k++)
                        sum += io_u_plat[j + k] - io_u_plat_last[j + k];
        } else {
                for (k = sum = 0; k < stride; k++)
                        sum += io_u_plat[j + k];
        }

        return sum;
}
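/*
 * Editorial worked example: with hist_coarseness=2, stride is 1 << 2 = 4,
 * so each output column sums 4 adjacent io_u_plat bins. Passing the
 * previous window as io_u_plat_last turns the result into a per-interval
 * delta instead of a running total.
 */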
static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
                               uint64_t sample_size)
{
        struct io_sample *s;
        int log_offset;
        uint64_t i, j, nr_samples;
        struct io_u_plat_entry *entry, *entry_before;
        unsigned int *io_u_plat;
        unsigned int *io_u_plat_before;

        int stride = 1 << hist_coarseness;

        if (!sample_size)
                return;

        s = __get_sample(samples, 0, 0);
        log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

        nr_samples = sample_size / __log_entry_sz(log_offset);

        for (i = 0; i < nr_samples; i++) {
                s = __get_sample(samples, log_offset, i);

                entry = (struct io_u_plat_entry *) (uintptr_t) s->val;
                io_u_plat = entry->io_u_plat;

                entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
                io_u_plat_before = entry_before->io_u_plat;

                fprintf(f, "%lu, %u, %u, ", (unsigned long) s->time,
                                                io_sample_ddir(s), s->bs);
                for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
                        fprintf(f, "%lu, ", hist_sum(j, stride, io_u_plat,
                                                io_u_plat_before));
                }
                fprintf(f, "%lu\n", (unsigned long)
                        hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
                                        io_u_plat_before));

                flist_del(&entry_before->list);
                free(entry_before);
        }
}
void flush_samples(FILE *f, void *samples, uint64_t sample_size)
{
        struct io_sample *s;
        int log_offset;
        uint64_t i, nr_samples;

        if (!sample_size)
                return;

        s = __get_sample(samples, 0, 0);
        log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

        nr_samples = sample_size / __log_entry_sz(log_offset);

        for (i = 0; i < nr_samples; i++) {
                s = __get_sample(samples, log_offset, i);

                if (!log_offset) {
                        fprintf(f, "%lu, %lu, %u, %u\n",
                                        (unsigned long) s->time,
                                        (unsigned long) s->val,
                                        io_sample_ddir(s), s->bs);
                } else {
                        struct io_sample_offset *so = (void *) s;

                        fprintf(f, "%lu, %lu, %u, %u, %llu\n",
                                        (unsigned long) s->time,
                                        (unsigned long) s->val,
                                        io_sample_ddir(s), s->bs,
                                        (unsigned long long) so->offset);
                }
        }
}
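/*
 * Editorial sketch of the emitted line format, one sample per line:
 *
 *      time (msec), value, data direction, block size[, offset]
 *
 * e.g. a 4k read sample might be logged as "1000, 45321, 0, 4096"; the
 * trailing offset column is only present when log_offset was set (the
 * numbers here are hypothetical).
 */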
#ifdef CONFIG_ZLIB

struct iolog_flush_data {
        struct workqueue_work work;
        struct io_log *log;
        void *samples;
        uint32_t nr_samples;
        bool free;
};

#define GZ_CHUNK        131072
static struct iolog_compress *get_new_chunk(unsigned int seq)
{
        struct iolog_compress *c;

        c = malloc(sizeof(*c));
        INIT_FLIST_HEAD(&c->list);
        c->buf = malloc(GZ_CHUNK);
        c->len = 0;
        c->seq = seq;
        return c;
}

static void free_chunk(struct iolog_compress *ic)
{
        free(ic->buf);
        free(ic);
}
static int z_stream_init(z_stream *stream, int gz_hdr)
{
        int wbits = gz_hdr ? 15 + 32 : -15;

        memset(stream, 0, sizeof(*stream));
        stream->zalloc = Z_NULL;
        stream->zfree = Z_NULL;
        stream->opaque = Z_NULL;
        stream->next_in = Z_NULL;

        /*
         * zlib magic - add 32 for auto-detection of gz header or not,
         * if we decide to store files in a gzip friendly format.
         */
        if (inflateInit2(stream, wbits) != Z_OK)
                return 1;

        return 0;
}
struct inflate_chunk_iter {
        unsigned int seq;
        int err;
        void *buf;
        size_t buf_size;
        size_t buf_used;
        size_t chunk_sz;
};
static void finish_chunk(z_stream *stream, FILE *f,
                         struct inflate_chunk_iter *iter)
{
        int ret;

        ret = inflateEnd(stream);
        if (ret != Z_OK)
                log_err("fio: failed to end log inflation seq %d (%d)\n",
                                iter->seq, ret);

        flush_samples(f, iter->buf, iter->buf_used);
        free(iter->buf);
        iter->buf = NULL;
        iter->buf_size = iter->buf_used = 0;
}
/*
 * Iterative chunk inflation. Handles cases where we cross into a new
 * sequence, flushing and finishing the previous chunk if needed.
 */
static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
                            z_stream *stream, struct inflate_chunk_iter *iter)
{
        size_t ret;

        dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
                                (unsigned long) ic->len, ic->seq);

        if (ic->seq != iter->seq) {
                if (iter->seq)
                        finish_chunk(stream, f, iter);

                z_stream_init(stream, gz_hdr);
                iter->seq = ic->seq;
        }

        stream->avail_in = ic->len;
        stream->next_in = ic->buf;

        if (!iter->buf_size) {
                iter->buf_size = iter->chunk_sz;
                iter->buf = malloc(iter->buf_size);
        }

        while (stream->avail_in) {
                size_t this_out = iter->buf_size - iter->buf_used;
                int err;

                stream->avail_out = this_out;
                stream->next_out = iter->buf + iter->buf_used;

                err = inflate(stream, Z_NO_FLUSH);
                if (err < 0) {
                        log_err("fio: failed inflating log: %d\n", err);
                        iter->err = err;
                        break;
                }

                iter->buf_used += this_out - stream->avail_out;

                if (!stream->avail_out) {
                        iter->buf_size += iter->chunk_sz;
                        iter->buf = realloc(iter->buf, iter->buf_size);
                        continue;
                }

                if (err == Z_STREAM_END)
                        break;
        }

        ret = (void *) stream->next_in - ic->buf;

        dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);

        return ret;
}
/*
 * Inflate stored compressed chunks, or write them directly to the log
 * file if so instructed.
 */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
        struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
        z_stream stream;

        while (!flist_empty(&log->chunk_list)) {
                struct iolog_compress *ic;

                ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
                flist_del(&ic->list);

                if (log->log_gz_store) {
                        size_t ret;

                        dprint(FD_COMPRESS, "log write chunk size=%lu, "
                                "seq=%u\n", (unsigned long) ic->len, ic->seq);

                        ret = fwrite(ic->buf, ic->len, 1, f);
                        if (ret != 1 || ferror(f)) {
                                iter.err = errno;
                                log_err("fio: error writing compressed log\n");
                        }
                } else
                        inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);

                free_chunk(ic);
        }

        if (iter.seq)
                finish_chunk(&stream, f, &iter);

        return iter.err;
}
/*
 * Open compressed log file and decompress the stored chunks and
 * write them to stdout. The chunks are stored sequentially in the
 * file, so we iterate over them and do them one-by-one.
 */
int iolog_file_inflate(const char *file)
{
        struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
        struct iolog_compress ic;
        z_stream stream;
        struct stat sb;
        ssize_t ret;
        size_t total;
        void *buf;
        FILE *f;

        f = fopen(file, "r");
        if (!f) {
                perror("fopen");
                return 1;
        }

        if (stat(file, &sb) < 0) {
                fclose(f);
                perror("stat");
                return 1;
        }

        ic.buf = buf = malloc(sb.st_size);
        ic.len = sb.st_size;
        ic.seq = 1;

        ret = fread(ic.buf, ic.len, 1, f);
        if (ret < 0) {
                perror("fread");
                fclose(f);
                free(buf);
                return 1;
        } else if (ret != 1) {
                log_err("fio: short read on reading log\n");
                fclose(f);
                free(buf);
                return 1;
        }

        fclose(f);

        /*
         * Each chunk will return Z_STREAM_END. We don't know how many
         * chunks are in the file, so we just keep looping and incrementing
         * the sequence number until we have consumed the whole compressed
         * file.
         */
        total = ic.len;
        do {
                size_t iret;

                iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
                total -= iret;
                if (!total)
                        break;
                if (iter.err)
                        break;

                ic.seq++;
                ic.len = total;
                ic.buf += iret;
        } while (1);

        if (iter.seq)
                finish_chunk(&stream, stdout, &iter);

        free(buf);
        return iter.err;
}
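/*
 * Editorial usage note: this backs fio's command line inflate mode,
 * e.g. "fio --inflate-log=agg-read_bw.log.fz" (hypothetical file name),
 * which decompresses a log stored with log_store_compressed to stdout.
 */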
#else

static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
        return 0;
}

int iolog_file_inflate(const char *file)
{
        log_err("fio: log inflation not possible without zlib\n");
        return 1;
}

#endif
void flush_log(struct io_log *log, bool do_append)
{
        void *buf;
        FILE *f;

        if (!do_append)
                f = fopen(log->filename, "w");
        else
                f = fopen(log->filename, "a");
        if (!f) {
                perror("fopen log");
                return;
        }

        buf = set_file_buffer(f);

        inflate_gz_chunks(log, f);

        while (!flist_empty(&log->io_logs)) {
                struct io_logs *cur_log;

                cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
                flist_del_init(&cur_log->list);

                if (log->td && log == log->td->clat_hist_log)
                        flush_hist_samples(f, log->hist_coarseness, cur_log->log,
                                           log_sample_sz(log, cur_log));
                else
                        flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));

                sfree(cur_log);
        }

        fclose(f);
        clear_file_buffer(buf);
}
static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
{
        if (td->flags & TD_F_COMPRESS_LOG)
                iolog_flush(log);

        if (trylock) {
                if (fio_trylock_file(log->filename))
                        return 1;
        } else
                fio_lock_file(log->filename);

        if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
                fio_send_iolog(td, log, log->filename);
        else
                flush_log(log, !td->o.per_job_logs);

        fio_unlock_file(log->filename);
        free_log(log);
        return 0;
}
size_t log_chunk_sizes(struct io_log *log)
{
        struct flist_head *entry;
        size_t ret;

        if (flist_empty(&log->chunk_list))
                return 0;

        ret = 0;
        pthread_mutex_lock(&log->chunk_lock);
        flist_for_each(entry, &log->chunk_list) {
                struct iolog_compress *c;

                c = flist_entry(entry, struct iolog_compress, list);
                ret += c->len;
        }
        pthread_mutex_unlock(&log->chunk_lock);
        return ret;
}
static int gz_work(struct iolog_flush_data *data)
{
        struct iolog_compress *c = NULL;
        struct flist_head list;
        unsigned int seq;
        z_stream stream;
        size_t total = 0;
        int ret;

        INIT_FLIST_HEAD(&list);

        memset(&stream, 0, sizeof(stream));
        stream.zalloc = Z_NULL;
        stream.zfree = Z_NULL;
        stream.opaque = Z_NULL;

        ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
        if (ret != Z_OK) {
                log_err("fio: failed to init gz stream\n");
                goto err;
        }

        seq = ++data->log->chunk_seq;

        stream.next_in = (void *) data->samples;
        stream.avail_in = data->nr_samples * log_entry_sz(data->log);

        dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
                                (unsigned long) stream.avail_in, seq,
                                data->log->filename);
        do {
                if (c)
                        dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
                                (unsigned long) c->len);
                c = get_new_chunk(seq);
                stream.avail_out = GZ_CHUNK;
                stream.next_out = c->buf;
                ret = deflate(&stream, Z_NO_FLUSH);
                if (ret < 0) {
                        log_err("fio: deflate log (%d)\n", ret);
                        free_chunk(c);
                        goto err;
                }

                c->len = GZ_CHUNK - stream.avail_out;
                flist_add_tail(&c->list, &list);
                total += c->len;
        } while (stream.avail_in);

        stream.next_out = c->buf + c->len;
        stream.avail_out = GZ_CHUNK - c->len;

        ret = deflate(&stream, Z_FINISH);
        if (ret < 0) {
                /*
                 * Z_BUF_ERROR is special, it just means we need more
                 * output space. We'll handle that below. Treat any other
                 * error as fatal.
                 */
                if (ret != Z_BUF_ERROR) {
                        log_err("fio: deflate log (%d)\n", ret);
                        flist_del(&c->list);
                        free_chunk(c);
                        goto err;
                }
        }

        total -= c->len;
        c->len = GZ_CHUNK - stream.avail_out;
        total += c->len;
        dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);

        if (ret != Z_STREAM_END) {
                do {
                        c = get_new_chunk(seq);
                        stream.avail_out = GZ_CHUNK;
                        stream.next_out = c->buf;
                        ret = deflate(&stream, Z_FINISH);
                        c->len = GZ_CHUNK - stream.avail_out;
                        total += c->len;
                        flist_add_tail(&c->list, &list);
                        dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
                                (unsigned long) c->len);
                } while (ret != Z_STREAM_END);
        }

        dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);

        ret = deflateEnd(&stream);
        if (ret != Z_OK)
                log_err("fio: deflateEnd %d\n", ret);

        free(data->samples);

        if (!flist_empty(&list)) {
                pthread_mutex_lock(&data->log->chunk_lock);
                flist_splice_tail(&list, &data->log->chunk_list);
                pthread_mutex_unlock(&data->log->chunk_lock);
        }

        ret = 0;
done:
        if (data->free)
                free(data);
        return ret;
err:
        while (!flist_empty(&list)) {
                c = flist_first_entry(&list, struct iolog_compress, list);
                flist_del(&c->list);
                free_chunk(c);
        }
        ret = 1;
        goto done;
}
/*
 * Invoked from our compress helper thread, when logging would have exceeded
 * the specified memory limitation. Compresses the previously stored
 * entries.
 */
static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
{
        return gz_work(container_of(work, struct iolog_flush_data, work));
}
static int gz_init_worker(struct submit_worker *sw)
{
        struct thread_data *td = sw->wq->td;

        if (!fio_option_is_set(&td->o, log_gz_cpumask))
                return 0;

        if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
                log_err("gz: failed to set CPU affinity\n");
                return 1;
        }

        return 0;
}

static struct workqueue_ops log_compress_wq_ops = {
        .fn             = gz_work_async,
        .init_worker_fn = gz_init_worker,
};
int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
        if (!(td->flags & TD_F_COMPRESS_LOG))
                return 0;

        workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
        return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
        if (!(td->flags & TD_F_COMPRESS_LOG))
                return;

        workqueue_exit(&td->log_compress_wq);
}
/*
 * Queue work item to compress the existing log entries. We reset the
 * current log to a small size, and reference the existing log in the
 * data that we queue for compression. Once compression has been done,
 * this old log is freed.
 */
static int iolog_flush(struct io_log *log)
{
        struct iolog_flush_data *data;

        data = malloc(sizeof(*data));
        if (!data)
                return 1;

        data->log = log;
        data->free = false;

        while (!flist_empty(&log->io_logs)) {
                struct io_logs *cur_log;

                cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
                flist_del_init(&cur_log->list);

                data->samples = cur_log->log;
                data->nr_samples = cur_log->nr_samples;

                sfree(cur_log);

                gz_work(data);
        }

        free(data);
        return 0;
}
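/*
 * Editorial note on the two flush paths: iolog_flush() above drains a log
 * synchronously by running gz_work() inline (used from finish_log() before
 * the log is written out), while iolog_cur_flush() below hands a single
 * io_logs chunk to the compression workqueue and returns immediately.
 */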
int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
        struct iolog_flush_data *data;

        data = malloc(sizeof(*data));
        if (!data)
                return 1;

        data->log = log;
        data->free = true;

        data->samples = cur_log->log;
        data->nr_samples = cur_log->nr_samples;

        cur_log->nr_samples = cur_log->max_samples = 0;
        cur_log->log = NULL;

        workqueue_enqueue(&log->td->log_compress_wq, &data->work);
        return 0;
}
#else

static int iolog_flush(struct io_log *log)
{
        return 1;
}

int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
        return 1;
}

int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
        return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
}

#endif
struct io_logs *iolog_cur_log(struct io_log *log)
{
        if (flist_empty(&log->io_logs))
                return NULL;

        return flist_last_entry(&log->io_logs, struct io_logs, list);
}
uint64_t iolog_nr_samples(struct io_log *iolog)
{
        struct flist_head *entry;
        uint64_t ret = 0;

        flist_for_each(entry, &iolog->io_logs) {
                struct io_logs *cur_log;

                cur_log = flist_entry(entry, struct io_logs, list);
                ret += cur_log->nr_samples;
        }

        return ret;
}
static int __write_log(struct thread_data *td, struct io_log *log, int try)
{
        if (log)
                return finish_log(td, log, try);
        return 0;
}
static int write_iops_log(struct thread_data *td, int try, bool unit_log)
{
        int ret;

        if (per_unit_log(td->iops_log) != unit_log)
                return 0;

        ret = __write_log(td, td->iops_log, try);
        if (!ret)
                td->iops_log = NULL;

        return ret;
}

static int write_slat_log(struct thread_data *td, int try, bool unit_log)
{
        int ret;

        if (!unit_log)
                return 0;

        ret = __write_log(td, td->slat_log, try);
        if (!ret)
                td->slat_log = NULL;

        return ret;
}

static int write_clat_log(struct thread_data *td, int try, bool unit_log)
{
        int ret;

        if (!unit_log)
                return 0;

        ret = __write_log(td, td->clat_log, try);
        if (!ret)
                td->clat_log = NULL;

        return ret;
}

static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
{
        int ret;

        if (!unit_log)
                return 0;

        ret = __write_log(td, td->clat_hist_log, try);
        if (!ret)
                td->clat_hist_log = NULL;

        return ret;
}

static int write_lat_log(struct thread_data *td, int try, bool unit_log)
{
        int ret;

        if (!unit_log)
                return 0;

        ret = __write_log(td, td->lat_log, try);
        if (!ret)
                td->lat_log = NULL;

        return ret;
}

static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
{
        int ret;

        if (per_unit_log(td->bw_log) != unit_log)
                return 0;

        ret = __write_log(td, td->bw_log, try);
        if (!ret)
                td->bw_log = NULL;

        return ret;
}
enum {
        BW_LOG_MASK     = 1,
        LAT_LOG_MASK    = 2,
        SLAT_LOG_MASK   = 4,
        CLAT_LOG_MASK   = 8,
        IOPS_LOG_MASK   = 16,
        CLAT_HIST_LOG_MASK = 32,

        ALL_LOG_NR      = 6,
};

struct log_type {
        unsigned int mask;
        int (*fn)(struct thread_data *, int, bool);
};
static struct log_type log_types[] = {
        {
                .mask   = BW_LOG_MASK,
                .fn     = write_bandw_log,
        },
        {
                .mask   = LAT_LOG_MASK,
                .fn     = write_lat_log,
        },
        {
                .mask   = SLAT_LOG_MASK,
                .fn     = write_slat_log,
        },
        {
                .mask   = CLAT_LOG_MASK,
                .fn     = write_clat_log,
        },
        {
                .mask   = IOPS_LOG_MASK,
                .fn     = write_iops_log,
        },
        {
                .mask   = CLAT_HIST_LOG_MASK,
                .fn     = write_clat_hist_log,
        },
};
void td_writeout_logs(struct thread_data *td, bool unit_logs)
{
        unsigned int log_mask = 0;
        unsigned int log_left = ALL_LOG_NR;
        int old_state, i;

        old_state = td_bump_runstate(td, TD_FINISHING);

        finalize_logs(td, unit_logs);

        while (log_left) {
                int prev_log_left = log_left;

                for (i = 0; i < ALL_LOG_NR && log_left; i++) {
                        struct log_type *lt = &log_types[i];
                        int ret;

                        if (!(log_mask & lt->mask)) {
                                ret = lt->fn(td, log_left != 1, unit_logs);
                                if (!ret) {
                                        log_left--;
                                        log_mask |= lt->mask;
                                }
                        }
                }

                if (prev_log_left == log_left)
                        usleep(5000);
        }

        td_restore_runstate(td, old_state);
}
void fio_writeout_logs(bool unit_logs)
{
        struct thread_data *td;
        int i;

        for_each_td(td, i)
                td_writeout_logs(td, unit_logs);
}