/*
 * Code related to writing an iolog of what a thread is doing, and to
 * later read that back and replay
 */
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#ifdef CONFIG_ZLIB
#include <zlib.h>
#endif

#include "flist.h"
#include "fio.h"
#include "verify.h"
#include "trim.h"
#include "filelock.h"
#include "smalloc.h"

static int iolog_flush(struct io_log *log);

static const char iolog_ver2[] = "fio version 2 iolog";
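
/*
 * For reference, a minimal (made-up) version 2 iolog, as produced by
 * log_file()/log_io_u() below and consumed by read_iolog2():
 *
 *	fio version 2 iolog
 *	/dev/sdb add
 *	/dev/sdb open
 *	/dev/sdb read 0 4096
 *	/dev/sdb write 4096 4096
 *	/dev/sdb close
 */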
void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
{
	flist_add_tail(&ipo->list, &td->io_log_list);
	td->total_io_size += ipo->len;
}
void log_io_u(const struct thread_data *td, const struct io_u *io_u)
{
	if (!td->o.write_iolog_file)
		return;

	fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
						io_ddir_name(io_u->ddir),
						io_u->offset, io_u->buflen);
}
void log_file(struct thread_data *td, struct fio_file *f,
	      enum file_log_act what)
{
	const char *act[] = { "add", "open", "close" };

	assert(what < 3);

	if (!td->o.write_iolog_file)
		return;

	/*
	 * this happens on the pre-open/close done before the job starts
	 */
	if (!td->iolog_f)
		return;

	fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
}
static void iolog_delay(struct thread_data *td, unsigned long delay)
{
	uint64_t usec = utime_since_now(&td->last_issue);
	uint64_t this_delay;
	struct timeval ts;

	if (delay < td->time_offset) {
		td->time_offset = 0;
		return;
	}

	delay -= td->time_offset;
	if (delay < usec)
		return;

	delay -= usec;

	fio_gettime(&ts, NULL);
	while (delay && !td->terminate) {
		this_delay = delay;
		if (this_delay > 500000)
			this_delay = 500000;

		usec_sleep(td, this_delay);
		delay -= this_delay;
	}

	usec = utime_since_now(&ts);
	if (usec > delay)
		td->time_offset = usec - delay;
	else
		td->time_offset = 0;
}
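
/*
 * Worked example of the accounting above (illustrative numbers): if an
 * entry asks for a 1000 usec delay but the sleep actually measures
 * 1200 usec, time_offset is set to 200 and the next entry's delay is
 * shortened by that amount, so replay timing does not drift ever later.
 */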
static int ipo_special(struct thread_data *td, struct io_piece *ipo)
{
	struct fio_file *f;
	int ret;

	/*
	 * Not a special ipo
	 */
	if (ipo->ddir != DDIR_INVAL)
		return 0;

	f = td->files[ipo->fileno];

	switch (ipo->file_action) {
	case FIO_LOG_OPEN_FILE:
		if (td->o.replay_redirect && fio_file_open(f)) {
			dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
					f->file_name);
			break;
		}
		ret = td_io_open_file(td, f);
		if (!ret)
			break;
		td_verror(td, ret, "iolog open file");
		return -1;
	case FIO_LOG_CLOSE_FILE:
		td_io_close_file(td, f);
		break;
	case FIO_LOG_UNLINK_FILE:
		td_io_unlink_file(td, f);
		break;
	default:
		log_err("fio: bad file action %d\n", ipo->file_action);
		break;
	}

	return 1;
}
int read_iolog_get(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo;
	unsigned long elapsed;

	while (!flist_empty(&td->io_log_list)) {
		int ret;

		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);

		ret = ipo_special(td, ipo);
		if (ret < 0) {
			free(ipo);
			break;
		} else if (ret > 0) {
			free(ipo);
			continue;
		}

		io_u->ddir = ipo->ddir;
		if (ipo->ddir != DDIR_WAIT) {
			io_u->offset = ipo->offset;
			io_u->buflen = ipo->len;
			io_u->file = td->files[ipo->fileno];
			get_file(io_u->file);
			dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
						io_u->buflen, io_u->file->file_name);
			if (ipo->delay)
				iolog_delay(td, ipo->delay);
		} else {
			elapsed = mtime_since_genesis();
			if (ipo->delay > elapsed)
				usec_sleep(td, (ipo->delay - elapsed) * 1000);
		}

		free(ipo);

		if (io_u->ddir != DDIR_WAIT)
			return 0;
	}

	td->done = 1;
	return 1;
}
void prune_io_piece_log(struct thread_data *td)
{
	struct io_piece *ipo;
	struct rb_node *n;

	while ((n = rb_first(&td->io_hist_tree)) != NULL) {
		ipo = rb_entry(n, struct io_piece, rb_node);
		rb_erase(n, &td->io_hist_tree);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}

	while (!flist_empty(&td->io_hist_list)) {
		ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);
		td->io_hist_len--;
		free(ipo);
	}
}
/*
 * log a successful write, so we can unwind the log for verify
 */
void log_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct rb_node **p, *parent;
	struct io_piece *ipo, *__ipo;

	ipo = malloc(sizeof(struct io_piece));
	init_ipo(ipo);
	ipo->file = io_u->file;
	ipo->offset = io_u->offset;
	ipo->len = io_u->buflen;
	ipo->numberio = io_u->numberio;
	ipo->flags = IP_F_IN_FLIGHT;

	io_u->ipo = ipo;

	if (io_u_should_trim(td, io_u)) {
		flist_add_tail(&ipo->trim_list, &td->trim_list);
		td->trim_entries++;
	}

	/*
	 * We don't need to sort the entries if:
	 *
	 *	Sequential writes, or
	 *	Random writes that lay out the file as it goes along
	 *
	 * For both these cases, just reading back data in the order we
	 * wrote it out is the fastest.
	 *
	 * One exception is if we don't have a random map AND we are doing
	 * verifies, in that case we need to check for duplicate blocks and
	 * drop the old one, which we rely on the rb insert/lookup for
	 * handling.
	 */
	if (((!td->o.verifysort) || !td_random(td) || !td->o.overwrite) &&
	      (file_randommap(td, ipo->file) || td->o.verify == VERIFY_NONE)) {
		INIT_FLIST_HEAD(&ipo->list);
		flist_add_tail(&ipo->list, &td->io_hist_list);
		ipo->flags |= IP_F_ONLIST;
		td->io_hist_len++;
		return;
	}

	RB_CLEAR_NODE(&ipo->rb_node);

	/*
	 * Sort the entry into the verification list
	 */
restart:
	p = &td->io_hist_tree.rb_node;
	parent = NULL;
	while (*p) {
		int overlap = 0;
		parent = *p;

		__ipo = rb_entry(parent, struct io_piece, rb_node);
		if (ipo->file < __ipo->file)
			p = &(*p)->rb_left;
		else if (ipo->file > __ipo->file)
			p = &(*p)->rb_right;
		else if (ipo->offset < __ipo->offset) {
			p = &(*p)->rb_left;
			overlap = ipo->offset + ipo->len > __ipo->offset;
		} else if (ipo->offset > __ipo->offset) {
			p = &(*p)->rb_right;
			overlap = __ipo->offset + __ipo->len > ipo->offset;
		} else
			overlap = 1;

		if (overlap) {
			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
				__ipo->offset, __ipo->len,
				ipo->offset, ipo->len);
			td->io_hist_len--;
			rb_erase(parent, &td->io_hist_tree);
			remove_trim_entry(td, __ipo);
			free(__ipo);
			goto restart;
		}
	}

	rb_link_node(&ipo->rb_node, parent, p);
	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
	ipo->flags |= IP_F_ONRB;
	td->io_hist_len++;
}
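
/*
 * Overlap example for the tree insert above (illustrative): if the tree
 * holds a piece at offset=0/len=8192 and a new write arrives at
 * offset=4096/len=4096, the ranges overlap, so the stored piece is
 * erased and the insert restarts; verify thus only ever sees the newest
 * data written to any given block.
 */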
void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (td->ts.nr_block_infos) {
		uint32_t *info = io_u_block_info(td, io_u);
		if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
			if (io_u->ddir == DDIR_TRIM)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_TRIM_FAILURE);
			else if (io_u->ddir == DDIR_WRITE)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_WRITE_FAILURE);
		}
	}

	if (!ipo)
		return;

	if (ipo->flags & IP_F_ONRB)
		rb_erase(&ipo->rb_node, &td->io_hist_tree);
	else if (ipo->flags & IP_F_ONLIST)
		flist_del(&ipo->list);

	free(ipo);
	io_u->ipo = NULL;
	td->io_hist_len--;
}
void trim_io_piece(struct thread_data *td, const struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (!ipo)
		return;

	ipo->len = io_u->xfer_buflen - io_u->resid;
}
void write_iolog_close(struct thread_data *td)
{
	fflush(td->iolog_f);
	fclose(td->iolog_f);
	free(td->iolog_buf);
	td->iolog_f = NULL;
	td->iolog_buf = NULL;
}
/*
 * Read version 2 iolog data. It is enhanced to include per-file logging,
 * syncs, etc.
 */
static int read_iolog2(struct thread_data *td, FILE *f)
{
	unsigned long long offset;
	unsigned int bytes;
	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
	char *rfname, *fname, *act;
	char *str, *p;
	enum fio_ddir rw;

	free_release_files(td);

	/*
	 * Read in the read iolog and store it, reuse the infrastructure
	 * for doing verifications.
	 */
	str = malloc(4096);
	rfname = fname = malloc(256+16);
	act = malloc(256+16);

	reads = writes = waits = 0;
	while ((p = fgets(str, 4096, f)) != NULL) {
		struct io_piece *ipo;
		int r;

		r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
									&bytes);

		if (td->o.replay_redirect)
			fname = td->o.replay_redirect;

		if (r == 4) {
			/*
			 * Check action first
			 */
			if (!strcmp(act, "wait"))
				rw = DDIR_WAIT;
			else if (!strcmp(act, "read"))
				rw = DDIR_READ;
			else if (!strcmp(act, "write"))
				rw = DDIR_WRITE;
			else if (!strcmp(act, "sync"))
				rw = DDIR_SYNC;
			else if (!strcmp(act, "datasync"))
				rw = DDIR_DATASYNC;
			else if (!strcmp(act, "trim"))
				rw = DDIR_TRIM;
			else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
			fileno = get_fileno(td, fname);
		} else if (r == 2) {
			rw = DDIR_INVAL;
			if (!strcmp(act, "add")) {
				if (td->o.replay_redirect &&
				    get_fileno(td, fname) != -1) {
					dprint(FD_FILE, "iolog: ignoring"
						" re-add of file %s\n", fname);
				} else {
					fileno = add_file(td, fname, 0, 1);
					file_action = FIO_LOG_ADD_FILE;
				}
				continue;
			} else if (!strcmp(act, "open")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_OPEN_FILE;
			} else if (!strcmp(act, "close")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_CLOSE_FILE;
			} else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
		} else {
			log_err("bad iolog2: %s\n", p);
			continue;
		}

		if (rw == DDIR_READ)
			reads++;
		else if (rw == DDIR_WRITE) {
			/*
			 * Don't add a write for ro mode
			 */
			if (read_only)
				continue;
			writes++;
		} else if (rw == DDIR_WAIT) {
			if (td->o.no_stall)
				continue;
			waits++;
		} else if (rw == DDIR_INVAL) {
		} else if (!ddir_sync(rw)) {
			log_err("bad ddir: %d\n", rw);
			continue;
		}

		/*
		 * Make note of file
		 */
		ipo = malloc(sizeof(*ipo));
		init_ipo(ipo);
		ipo->ddir = rw;
		if (rw == DDIR_WAIT) {
			ipo->delay = offset;
		} else {
			if (td->o.replay_scale)
				ipo->offset = offset / td->o.replay_scale;
			else
				ipo->offset = offset;
			ipo_bytes_align(td->o.replay_align, ipo);

			ipo->len = bytes;
			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
				td->o.max_bs[rw] = bytes;
			ipo->fileno = fileno;
			ipo->file_action = file_action;
			td->o.size += bytes;
		}

		queue_io_piece(td, ipo);
	}

	free(str);
	free(act);
	free(rfname);

	if (writes && read_only) {
		log_err("fio: <%s> skips replay of %d writes due to"
			" read-only\n", td->o.name, writes);
		writes = 0;
	}

	if (!reads && !writes && !waits)
		return 1;
	else if (reads && !writes)
		td->o.td_ddir = TD_DDIR_READ;
	else if (!reads && writes)
		td->o.td_ddir = TD_DDIR_WRITE;
	else
		td->o.td_ddir = TD_DDIR_RW;

	return 0;
}
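
/*
 * Replay note: with replay_redirect set (e.g. replay_redirect=/dev/sdc,
 * device name illustrative), every fname parsed above is remapped to
 * that one path, so a trace captured across several devices replays
 * onto a single one; that is why the re-add and re-open guards exist.
 */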
/*
 * open iolog, check version, and call appropriate parser
 */
static int init_iolog_read(struct thread_data *td)
{
	char buffer[256], *p;
	FILE *f;
	int ret;

	f = fopen(td->o.read_iolog_file, "r");
	if (!f) {
		perror("fopen read iolog");
		return 1;
	}

	p = fgets(buffer, sizeof(buffer), f);
	if (!p) {
		td_verror(td, errno, "iolog read");
		log_err("fio: unable to read iolog\n");
		fclose(f);
		return 1;
	}

	/*
	 * version 2 of the iolog stores a specific string as the
	 * first line, check for that
	 */
	if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
		ret = read_iolog2(td, f);
	else {
		log_err("fio: iolog version 1 is no longer supported\n");
		ret = 1;
	}

	fclose(f);
	return ret;
}
/*
 * Set up a log for storing io patterns.
 */
static int init_iolog_write(struct thread_data *td)
{
	struct fio_file *ff;
	FILE *f;
	unsigned int i;

	f = fopen(td->o.write_iolog_file, "a");
	if (!f) {
		perror("fopen write iolog");
		return 1;
	}

	/*
	 * That's it for writing, setup a log buffer and we're done.
	 */
	td->iolog_f = f;
	td->iolog_buf = malloc(8192);
	setvbuf(f, td->iolog_buf, _IOFBF, 8192);

	/*
	 * write our version line
	 */
	if (fprintf(f, "%s\n", iolog_ver2) < 0) {
		perror("iolog init");
		return 1;
	}

	/*
	 * add all known files
	 */
	for_each_file(td, ff, i)
		log_file(td, ff, FIO_LOG_ADD_FILE);

	return 0;
}
int init_iolog(struct thread_data *td)
{
	int ret = 0;

	if (td->o.read_iolog_file) {
		int need_swap;

		/*
		 * Check if it's a blktrace file and load that if possible.
		 * Otherwise assume it's a normal log file and load that.
		 */
		if (is_blktrace(td->o.read_iolog_file, &need_swap))
			ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
		else
			ret = init_iolog_read(td);
	} else if (td->o.write_iolog_file)
		ret = init_iolog_write(td);

	if (ret)
		td_verror(td, EINVAL, "failed initializing iolog");

	return ret;
}
void setup_log(struct io_log **log, struct log_params *p,
	       const char *filename)
{
	struct io_log *l;
	int i;
	struct io_u_plat_entry *entry;
	struct flist_head *list;

	l = scalloc(1, sizeof(*l));
	INIT_FLIST_HEAD(&l->io_logs);
	l->log_type = p->log_type;
	l->log_offset = p->log_offset;
	l->log_gz = p->log_gz;
	l->log_gz_store = p->log_gz_store;
	l->avg_msec = p->avg_msec;
	l->hist_msec = p->hist_msec;
	l->hist_coarseness = p->hist_coarseness;
	l->filename = strdup(filename);
	l->td = p->td;

	/* Initialize histogram lists for each r/w direction,
	 * with initial io_u_plat of all zeros:
	 */
	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
		list = &l->hist_window[i].list;
		INIT_FLIST_HEAD(list);
		entry = calloc(1, sizeof(struct io_u_plat_entry));
		flist_add(&entry->list, list);
	}

	if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
		struct io_logs *pending;

		pending = calloc(1, sizeof(*l->pending));
		pending->max_samples = DEF_LOG_ENTRIES;
		pending->log = calloc(pending->max_samples, log_entry_sz(l));
		l->pending = pending;
	}

	if (l->log_offset)
		l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;

	INIT_FLIST_HEAD(&l->chunk_list);

	if (l->log_gz && !p->td)
		l->log_gz = 0;
	else if (l->log_gz || l->log_gz_store) {
		mutex_init_pshared(&l->chunk_lock);
		p->td->flags |= TD_F_COMPRESS_LOG;
	}

	*log = l;
}
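
/*
 * Minimal caller sketch (illustrative; mirrors how the stats setup code
 * fills in struct log_params before calling us):
 *
 *	struct log_params params = {
 *		.td		= td,
 *		.avg_msec	= td->o.log_avg_msec,
 *		.log_type	= IO_LOG_TYPE_LAT,
 *		.log_offset	= td->o.log_offset,
 *		.log_gz		= td->o.log_gz,
 *		.log_gz_store	= td->o.log_gz_store,
 *	};
 *	setup_log(&td->lat_log, &params, "example_lat.log");
 */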
#ifdef CONFIG_SETVBUF
static void *set_file_buffer(FILE *f)
{
	size_t size = 1048576;
	void *buf;

	buf = malloc(size);
	setvbuf(f, buf, _IOFBF, size);
	return buf;
}

static void clear_file_buffer(void *buf)
{
	free(buf);
}
#else
static void *set_file_buffer(FILE *f)
{
	return NULL;
}

static void clear_file_buffer(void *buf)
{
}
#endif
void free_log(struct io_log *log)
{
	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);
		free(cur_log->log);
		sfree(cur_log);
	}

	if (log->pending) {
		free(log->pending->log);
		free(log->pending);
		log->pending = NULL;
	}

	free(log->filename);
	sfree(log);
}
unsigned long hist_sum(int j, int stride, unsigned int *io_u_plat,
		unsigned int *io_u_plat_last)
{
	unsigned long sum;
	int k;

	if (io_u_plat_last) {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k] - io_u_plat_last[j + k];
	} else {
		for (k = sum = 0; k < stride; k++)
			sum += io_u_plat[j + k];
	}

	return sum;
}
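
/*
 * Example (illustrative): with hist_coarseness=2 the caller uses a
 * stride of 1 << 2 = 4, so hist_sum(8, 4, plat, last) collapses bins
 * 8..11 into one value; a non-NULL 'last' turns the sums into deltas
 * against the previous window instead of cumulative counts.
 */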
static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
			       uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, j, nr_samples;
	struct io_u_plat_entry *entry, *entry_before;
	unsigned int *io_u_plat;
	unsigned int *io_u_plat_before;

	int stride = 1 << hist_coarseness;

	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		entry = s->data.plat_entry;
		io_u_plat = entry->io_u_plat;

		entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
		io_u_plat_before = entry_before->io_u_plat;

		fprintf(f, "%lu, %u, %u, ", (unsigned long) s->time,
						io_sample_ddir(s), s->bs);
		for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
			fprintf(f, "%lu, ", hist_sum(j, stride, io_u_plat,
						io_u_plat_before));
		}
		fprintf(f, "%lu\n", (unsigned long)
			hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
					io_u_plat_before));

		flist_del(&entry_before->list);
		free(entry_before);
	}
}
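
/*
 * Each histogram sample above becomes one CSV row of the form
 * "time, ddir, bs, bin0, bin1, ...", e.g. (made-up values)
 * "1000, 0, 4096, 0, 2, 17, ..." for one read-side window at 1000 msec.
 */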
void flush_samples(FILE *f, void *samples, uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, nr_samples;

	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		if (!log_offset) {
			fprintf(f, "%lu, %" PRId64 ", %u, %u\n",
					(unsigned long) s->time,
					s->data.val,
					io_sample_ddir(s), s->bs);
		} else {
			struct io_sample_offset *so = (void *) s;

			fprintf(f, "%lu, %" PRId64 ", %u, %u, %llu\n",
					(unsigned long) s->time,
					s->data.val,
					io_sample_ddir(s), s->bs,
					(unsigned long long) so->offset);
		}
	}
}
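
/*
 * Plain sample rows are "time, value, ddir, bs", with a trailing offset
 * column when offset logging is enabled, e.g. for a latency log
 * (made-up values) "4, 123, 0, 4096, 65536": a 123 usec read of 4096
 * bytes at offset 65536, sampled at 4 msec.
 */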
#ifdef CONFIG_ZLIB

struct iolog_flush_data {
	struct workqueue_work work;
	struct io_log *log;
	void *samples;
	uint32_t nr_samples;
	bool free;
};

#define GZ_CHUNK	131072
static struct iolog_compress *get_new_chunk(unsigned int seq)
{
	struct iolog_compress *c;

	c = malloc(sizeof(*c));
	INIT_FLIST_HEAD(&c->list);
	c->buf = malloc(GZ_CHUNK);
	c->len = 0;
	c->seq = seq;
	return c;
}

static void free_chunk(struct iolog_compress *ic)
{
	free(ic->buf);
	free(ic);
}
static int z_stream_init(z_stream *stream, int gz_hdr)
{
	int wbits = 15;

	memset(stream, 0, sizeof(*stream));
	stream->zalloc = Z_NULL;
	stream->zfree = Z_NULL;
	stream->opaque = Z_NULL;
	stream->next_in = Z_NULL;

	/*
	 * zlib magic - add 32 for auto-detection of gz header or not,
	 * if we decide to store files in a gzip friendly format.
	 */
	if (gz_hdr)
		wbits += 32;

	if (inflateInit2(stream, wbits) != Z_OK)
		return 1;

	return 0;
}
struct inflate_chunk_iter {
	unsigned int seq;
	int err;
	void *buf;
	size_t buf_size;
	size_t buf_used;
	size_t chunk_sz;
};

static void finish_chunk(z_stream *stream, FILE *f,
			 struct inflate_chunk_iter *iter)
{
	int ret;

	ret = inflateEnd(stream);
	if (ret != Z_OK)
		log_err("fio: failed to end log inflation seq %d (%d)\n",
				iter->seq, ret);

	flush_samples(f, iter->buf, iter->buf_used);
	free(iter->buf);
	iter->buf = NULL;
	iter->buf_size = iter->buf_used = 0;
}
/*
 * Iterative chunk inflation. Handles cases where we cross into a new
 * sequence, doing flush finish of previous chunk if needed.
 */
static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
			    z_stream *stream, struct inflate_chunk_iter *iter)
{
	size_t ret;

	dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
				(unsigned long) ic->len, ic->seq);

	if (ic->seq != iter->seq) {
		if (iter->seq)
			finish_chunk(stream, f, iter);

		z_stream_init(stream, gz_hdr);
		iter->seq = ic->seq;
	}

	stream->avail_in = ic->len;
	stream->next_in = ic->buf;

	if (!iter->buf_size) {
		iter->buf_size = iter->chunk_sz;
		iter->buf = malloc(iter->buf_size);
	}

	while (stream->avail_in) {
		size_t this_out = iter->buf_size - iter->buf_used;
		int err;

		stream->avail_out = this_out;
		stream->next_out = iter->buf + iter->buf_used;

		err = inflate(stream, Z_NO_FLUSH);
		if (err < 0) {
			log_err("fio: failed inflating log: %d\n", err);
			iter->err = err;
			break;
		}

		iter->buf_used += this_out - stream->avail_out;

		if (!stream->avail_out) {
			iter->buf_size += iter->chunk_sz;
			iter->buf = realloc(iter->buf, iter->buf_size);
			continue;
		}

		if (err == Z_STREAM_END)
			break;
	}

	ret = (void *) stream->next_in - ic->buf;
	dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
	return ret;
}
/*
 * Inflate stored compressed chunks, or write them directly to the log
 * file if so instructed.
 */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
	z_stream stream;

	while (!flist_empty(&log->chunk_list)) {
		struct iolog_compress *ic;

		ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
		flist_del(&ic->list);

		if (log->log_gz_store) {
			size_t ret;

			dprint(FD_COMPRESS, "log write chunk size=%lu, "
				"seq=%u\n", (unsigned long) ic->len, ic->seq);

			ret = fwrite(ic->buf, ic->len, 1, f);
			if (ret != 1 || ferror(f)) {
				iter.err = errno;
				log_err("fio: error writing compressed log\n");
			}
		} else
			inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);

		free_chunk(ic);
	}

	if (iter.seq)
		finish_chunk(&stream, f, &iter);

	return iter.err;
}
/*
 * Open compressed log file and decompress the stored chunks and
 * write them to stdout. The chunks are stored sequentially in the
 * file, so we iterate over them and do them one-by-one.
 */
int iolog_file_inflate(const char *file)
{
	struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
	struct iolog_compress ic;
	z_stream stream;
	struct stat sb;
	size_t ret, total;
	void *buf;
	FILE *f;

	f = fopen(file, "r");
	if (!f) {
		perror("fopen");
		return 1;
	}

	if (stat(file, &sb) < 0) {
		fclose(f);
		perror("stat");
		return 1;
	}

	ic.buf = buf = malloc(sb.st_size);
	ic.len = sb.st_size;
	ic.seq = 1;

	ret = fread(ic.buf, ic.len, 1, f);
	if (ret == 0 && ferror(f)) {
		perror("fread");
		fclose(f);
		free(buf);
		return 1;
	} else if (ret != 1) {
		log_err("fio: short read on reading log\n");
		fclose(f);
		free(buf);
		return 1;
	}

	fclose(f);

	/*
	 * Each chunk will return Z_STREAM_END. We don't know how many
	 * chunks are in the file, so we just keep looping and incrementing
	 * the sequence number until we have consumed the whole compressed
	 * file.
	 */
	total = ic.len;
	do {
		size_t iret;

		iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
		total -= iret;
		if (!total)
			break;
		if (iter.err)
			break;

		ic.seq++;
		ic.len -= iret;
		ic.buf += iret;
	} while (1);

	if (iter.seq)
		finish_chunk(&stream, stdout, &iter);

	free(buf);
	return iter.err;
}
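
/*
 * This is the backend for decompressing a stored log from the command
 * line; fio exposes it via the --inflate-log option, writing the
 * recovered samples to stdout.
 */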
#else

static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	return 0;
}

int iolog_file_inflate(const char *file)
{
	log_err("fio: log inflation not possible without zlib\n");
	return 1;
}

#endif
void flush_log(struct io_log *log, bool do_append)
{
	void *buf;
	FILE *f;

	if (!do_append)
		f = fopen(log->filename, "w");
	else
		f = fopen(log->filename, "a");
	if (!f) {
		perror("fopen log");
		return;
	}

	buf = set_file_buffer(f);

	inflate_gz_chunks(log, f);

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		if (log->td && log == log->td->clat_hist_log)
			flush_hist_samples(f, log->hist_coarseness, cur_log->log,
					   log_sample_sz(log, cur_log));
		else
			flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));

		sfree(cur_log);
	}

	fclose(f);
	clear_file_buffer(buf);
}
static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
{
	if (td->flags & TD_F_COMPRESS_LOG)
		iolog_flush(log);

	if (trylock) {
		if (fio_trylock_file(log->filename))
			return 1;
	} else
		fio_lock_file(log->filename);

	if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
		fio_send_iolog(td, log, log->filename);
	else
		flush_log(log, !td->o.per_job_logs);

	fio_unlock_file(log->filename);
	free_log(log);
	return 0;
}
size_t log_chunk_sizes(struct io_log *log)
{
	struct flist_head *entry;
	size_t ret;

	if (flist_empty(&log->chunk_list))
		return 0;

	ret = 0;
	pthread_mutex_lock(&log->chunk_lock);
	flist_for_each(entry, &log->chunk_list) {
		struct iolog_compress *c;

		c = flist_entry(entry, struct iolog_compress, list);
		ret += c->len;
	}
	pthread_mutex_unlock(&log->chunk_lock);
	return ret;
}
#ifdef CONFIG_ZLIB

static int gz_work(struct iolog_flush_data *data)
{
	struct iolog_compress *c = NULL;
	struct flist_head list;
	unsigned int seq;
	z_stream stream;
	size_t total = 0;
	int ret;

	INIT_FLIST_HEAD(&list);

	memset(&stream, 0, sizeof(stream));
	stream.zalloc = Z_NULL;
	stream.zfree = Z_NULL;
	stream.opaque = Z_NULL;

	ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
	if (ret != Z_OK) {
		log_err("fio: failed to init gz stream\n");
		ret = 1;
		goto done;
	}

	seq = ++data->log->chunk_seq;

	stream.next_in = (void *) data->samples;
	stream.avail_in = data->nr_samples * log_entry_sz(data->log);

	dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
				(unsigned long) stream.avail_in, seq,
				data->log->filename);
	do {
		if (c)
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		c = get_new_chunk(seq);
		stream.avail_out = GZ_CHUNK;
		stream.next_out = c->buf;
		ret = deflate(&stream, Z_NO_FLUSH);
		if (ret < 0) {
			log_err("fio: deflate log (%d)\n", ret);
			free_chunk(c);
			goto err;
		}

		c->len = GZ_CHUNK - stream.avail_out;
		flist_add_tail(&c->list, &list);
		total += c->len;
	} while (stream.avail_in);

	stream.next_out = c->buf + c->len;
	stream.avail_out = GZ_CHUNK - c->len;

	ret = deflate(&stream, Z_FINISH);
	if (ret < 0) {
		/*
		 * Z_BUF_ERROR is special, it just means we need more
		 * output space. We'll handle that below. Treat any other
		 * error as fatal.
		 */
		if (ret != Z_BUF_ERROR) {
			log_err("fio: deflate log (%d)\n", ret);
			flist_del(&c->list);
			free_chunk(c);
			goto err;
		}
	}

	total -= c->len;
	c->len = GZ_CHUNK - stream.avail_out;
	total += c->len;
	dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);

	if (ret != Z_STREAM_END) {
		do {
			c = get_new_chunk(seq);
			stream.avail_out = GZ_CHUNK;
			stream.next_out = c->buf;
			ret = deflate(&stream, Z_FINISH);
			c->len = GZ_CHUNK - stream.avail_out;
			total += c->len;
			flist_add_tail(&c->list, &list);
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		} while (ret != Z_STREAM_END);
	}

	dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);

	ret = deflateEnd(&stream);
	if (ret != Z_OK)
		log_err("fio: deflateEnd %d\n", ret);

	free(data->samples);

	if (!flist_empty(&list)) {
		pthread_mutex_lock(&data->log->chunk_lock);
		flist_splice_tail(&list, &data->log->chunk_list);
		pthread_mutex_unlock(&data->log->chunk_lock);
	}

	ret = 0;
done:
	if (data->free)
		free(data);
	return ret;
err:
	while (!flist_empty(&list)) {
		c = flist_first_entry(&list, struct iolog_compress, list);
		flist_del(&c->list);
		free_chunk(c);
	}
	ret = 1;
	goto done;
}
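
/*
 * Sizing note for gz_work() (illustrative): with GZ_CHUNK at 128k, a
 * large sample buffer deflates into a chain of chunks of at most 128k
 * each, all sharing one sequence number; inflate_chunk() relies on
 * that seq to detect where one deflate stream ends and the next begins.
 */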
/*
 * Invoked from our compress helper thread, when logging would have exceeded
 * the specified memory limitation. Compresses the previously stored
 * entries.
 */
static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
{
	return gz_work(container_of(work, struct iolog_flush_data, work));
}

static int gz_init_worker(struct submit_worker *sw)
{
	struct thread_data *td = sw->wq->td;

	if (!fio_option_is_set(&td->o, log_gz_cpumask))
		return 0;

	if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
		log_err("gz: failed to set CPU affinity\n");
		return 1;
	}

	return 0;
}

static struct workqueue_ops log_compress_wq_ops = {
	.fn		= gz_work_async,
	.init_worker_fn	= gz_init_worker,
	.nice		= 1,
};
int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return 0;

	workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
	if (!(td->flags & TD_F_COMPRESS_LOG))
		return;

	workqueue_exit(&td->log_compress_wq);
}
/*
 * Queue work item to compress the existing log entries. We reset the
 * current log to a small size, and reference the existing log in the
 * data that we queue for compression. Once compression has been done,
 * this old log is freed.
 */
static int iolog_flush(struct io_log *log)
{
	struct iolog_flush_data *data;

	data = malloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;
	data->free = false;

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		data->samples = cur_log->log;
		data->nr_samples = cur_log->nr_samples;

		sfree(cur_log);

		gz_work(data);
	}

	free(data);
	return 0;
}
int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	struct iolog_flush_data *data;

	data = malloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;
	data->samples = cur_log->log;
	data->nr_samples = cur_log->nr_samples;
	data->free = true;

	cur_log->nr_samples = cur_log->max_samples = 0;
	cur_log->log = NULL;

	workqueue_enqueue(&log->td->log_compress_wq, &data->work);
	return 0;
}
#else

static int iolog_flush(struct io_log *log)
{
	return 1;
}

int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	return 1;
}

int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
}

#endif
struct io_logs *iolog_cur_log(struct io_log *log)
{
	if (flist_empty(&log->io_logs))
		return NULL;

	return flist_last_entry(&log->io_logs, struct io_logs, list);
}
uint64_t iolog_nr_samples(struct io_log *iolog)
{
	struct flist_head *entry;
	uint64_t ret = 0;

	flist_for_each(entry, &iolog->io_logs) {
		struct io_logs *cur_log;

		cur_log = flist_entry(entry, struct io_logs, list);
		ret += cur_log->nr_samples;
	}

	return ret;
}
static int __write_log(struct thread_data *td, struct io_log *log, int try)
{
	if (log)
		return finish_log(td, log, try);

	return 0;
}
static int write_iops_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->iops_log) != unit_log)
		return 0;

	ret = __write_log(td, td->iops_log, try);
	if (!ret)
		td->iops_log = NULL;

	return ret;
}

static int write_slat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->slat_log, try);
	if (!ret)
		td->slat_log = NULL;

	return ret;
}

static int write_clat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_log, try);
	if (!ret)
		td->clat_log = NULL;

	return ret;
}

static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->clat_hist_log, try);
	if (!ret)
		td->clat_hist_log = NULL;

	return ret;
}

static int write_lat_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (!unit_log)
		return 0;

	ret = __write_log(td, td->lat_log, try);
	if (!ret)
		td->lat_log = NULL;

	return ret;
}

static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
{
	int ret;

	if (per_unit_log(td->bw_log) != unit_log)
		return 0;

	ret = __write_log(td, td->bw_log, try);
	if (!ret)
		td->bw_log = NULL;

	return ret;
}
enum {
	BW_LOG_MASK	= 1,
	LAT_LOG_MASK	= 2,
	SLAT_LOG_MASK	= 4,
	CLAT_LOG_MASK	= 8,
	IOPS_LOG_MASK	= 16,
	CLAT_HIST_LOG_MASK = 32,

	ALL_LOG_NR	= 6,
};

struct log_type {
	unsigned int mask;
	int (*fn)(struct thread_data *, int, bool);
};

static struct log_type log_types[] = {
	{
		.mask	= BW_LOG_MASK,
		.fn	= write_bandw_log,
	},
	{
		.mask	= LAT_LOG_MASK,
		.fn	= write_lat_log,
	},
	{
		.mask	= SLAT_LOG_MASK,
		.fn	= write_slat_log,
	},
	{
		.mask	= CLAT_LOG_MASK,
		.fn	= write_clat_log,
	},
	{
		.mask	= IOPS_LOG_MASK,
		.fn	= write_iops_log,
	},
	{
		.mask	= CLAT_HIST_LOG_MASK,
		.fn	= write_clat_hist_log,
	},
};
void td_writeout_logs(struct thread_data *td, bool unit_logs)
{
	unsigned int log_mask = 0;
	unsigned int log_left = ALL_LOG_NR;
	int old_state, i;

	old_state = td_bump_runstate(td, TD_FINISHING);

	finalize_logs(td, unit_logs);

	while (log_left) {
		int prev_log_left = log_left;

		for (i = 0; i < ALL_LOG_NR && log_left; i++) {
			struct log_type *lt = &log_types[i];
			int ret;

			if (!(log_mask & lt->mask)) {
				ret = lt->fn(td, log_left != 1, unit_logs);
				if (!ret) {
					log_left--;
					log_mask |= lt->mask;
				}
			}
		}

		if (prev_log_left == log_left)
			usleep(5000);
	}

	td_restore_runstate(td, old_state);
}
void fio_writeout_logs(bool unit_logs)
{
	struct thread_data *td;
	int i;

	for_each_td(td, i)
		td_writeout_logs(td, unit_logs);
}