2 * Code related to writing an iolog of what a thread is doing, and to
3 * later read that back and replay
23 #include <netinet/in.h>
24 #include <netinet/tcp.h>
25 #include <arpa/inet.h>
27 #include <sys/socket.h>
30 static int iolog_flush(struct io_log *log);
32 static const char iolog_ver2[] = "fio version 2 iolog";
34 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
36 flist_add_tail(&ipo->list, &td->io_log_list);
37 td->total_io_size += ipo->len;
/*
 * log_io_u(): record one completed io_u into the iolog being captured,
 * as a "<file> <ddir> <offset> <buflen>" text line.  Skipped unless the
 * job has write_iolog_file configured.
 * NOTE(review): braces/early-return lines are missing from this
 * extraction (original numbering skips); code kept byte-identical.
 */
40 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
42 if (!td->o.write_iolog_file)
45 fprintf(td->iolog_f, "%s %s %llu %llu\n", io_u->file->file_name,
46 	io_ddir_name(io_u->ddir),
47 	io_u->offset, io_u->buflen);
/*
 * log_file(): record a file lifetime event ("add"/"open"/"close") in the
 * captured iolog.  The act[] table must stay in sync with the
 * file_log_act enum used to index it.
 * NOTE(review): interior lines missing from this extraction; the visible
 * comment at original line 62 suggests pre-job open/close events are
 * intentionally skipped — confirm against the full source.
 */
50 void log_file(struct thread_data *td, struct fio_file *f,
51 	      enum file_log_act what)
53 const char *act[] = { "add", "open", "close" };
57 if (!td->o.write_iolog_file)
62 * this happens on the pre-open/close done before the job starts
67 fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
/*
 * iolog_delay(): honor the inter-I/O delay recorded in a replayed iolog
 * entry.  Time already spent since the last issue (td->time_offset) is
 * subtracted first, and any over-sleep is carried forward into
 * td->time_offset for the next delay.  Sleeps are capped (visible
 * 500000us check) and the loop re-checks td->terminate so the job can
 * be interrupted promptly.
 * NOTE(review): interior lines missing from this extraction; code kept
 * byte-identical.
 */
70 static void iolog_delay(struct thread_data *td, unsigned long delay)
72 uint64_t usec = utime_since_now(&td->last_issue);
73 unsigned long orig_delay = delay;
77 if (delay < td->time_offset) {
82 delay -= td->time_offset;
88 fio_gettime(&ts, NULL);
89 while (delay && !td->terminate) {
91 if (this_delay > 500000)
94 usec_sleep(td, this_delay);
98 usec = utime_since_now(&ts);
99 if (usec > orig_delay)
100 td->time_offset = usec - orig_delay;
/*
 * ipo_special(): handle non-I/O ("special") iolog entries, i.e. file
 * actions (open/close/unlink) marked with ddir == DDIR_INVAL.  Returns
 * early for ordinary I/O entries.  With replay_redirect, a re-open of an
 * already-open file is ignored (all replayed files collapse onto one).
 * NOTE(review): braces, returns and some case bodies are missing from
 * this extraction; code kept byte-identical.
 */
105 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
113 if (ipo->ddir != DDIR_INVAL)
116 f = td->files[ipo->fileno];
118 switch (ipo->file_action) {
119 case FIO_LOG_OPEN_FILE:
120 if (td->o.replay_redirect && fio_file_open(f)) {
121 dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
125 ret = td_io_open_file(td, f);
128 td_verror(td, ret, "iolog open file");
130 case FIO_LOG_CLOSE_FILE:
131 td_io_close_file(td, f);
133 case FIO_LOG_UNLINK_FILE:
134 td_io_unlink_file(td, f);
137 log_err("fio: bad file action %d\n", ipo->file_action);
144 static bool read_iolog2(struct thread_data *td);
/*
 * read_iolog_get(): pop the next entry off the replay queue and populate
 * io_u with it.  In chunked mode, refills the queue via read_iolog2()
 * when the low-water checkmark is reached.  File-action entries are
 * dispatched through ipo_special().  DDIR_WAIT entries translate into a
 * delay: iolog_delay() for relative delays, or (visible at original
 * lines 184-186) a sleep until the absolute replay timestamp.
 * NOTE(review): interior lines missing from this extraction; code kept
 * byte-identical.
 */
146 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
148 struct io_piece *ipo;
149 unsigned long elapsed;
151 while (!flist_empty(&td->io_log_list)) {
153 if (td->o.read_iolog_chunked) {
154 if (td->io_log_checkmark == td->io_log_current) {
155 if (!read_iolog2(td))
158 td->io_log_current--;
160 ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
161 flist_del(&ipo->list);
162 remove_trim_entry(td, ipo);
164 ret = ipo_special(td, ipo);
168 } else if (ret > 0) {
173 io_u->ddir = ipo->ddir;
174 if (ipo->ddir != DDIR_WAIT) {
175 io_u->offset = ipo->offset;
176 io_u->buflen = ipo->len;
177 io_u->file = td->files[ipo->fileno];
178 get_file(io_u->file);
179 dprint(FD_IO, "iolog: get %llu/%llu/%s\n", io_u->offset,
180 io_u->buflen, io_u->file->file_name);
182 iolog_delay(td, ipo->delay);
184 elapsed = mtime_since_genesis();
185 if (ipo->delay > elapsed)
186 usec_sleep(td, (ipo->delay - elapsed) * 1000);
191 if (io_u->ddir != DDIR_WAIT)
/*
 * prune_io_piece_log(): drain and free every io_piece held for
 * verification — first the rb-tree (sorted history), then the flat
 * history list.  Each entry is also removed from the trim list.
 * NOTE(review): the free() calls are among the lines missing from this
 * extraction; code kept byte-identical.
 */
199 void prune_io_piece_log(struct thread_data *td)
201 struct io_piece *ipo;
202 struct fio_rb_node *n;
204 while ((n = rb_first(&td->io_hist_tree)) != NULL) {
205 ipo = rb_entry(n, struct io_piece, rb_node);
206 rb_erase(n, &td->io_hist_tree);
207 remove_trim_entry(td, ipo);
212 while (!flist_empty(&td->io_hist_list)) {
213 ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
214 flist_del(&ipo->list);
215 remove_trim_entry(td, ipo);
222 * log a successful write, so we can unwind the log for verify
/*
 * log_io_piece(): remember a just-issued write for later verification.
 * If the file has a random map, duplicates are impossible and a simple
 * list append suffices; otherwise the entry is inserted into an rb-tree
 * sorted by (file, offset), and any overlapping older entry is evicted
 * so verify sees only the newest data for a block.  Entries still
 * in flight (IP_F_IN_FLIGHT) are not freed on eviction — presumably
 * their completion path frees them; confirm against the full source.
 * NOTE(review): loop headers, gotos and frees are among the lines
 * missing from this extraction; code kept byte-identical.
 */
224 void log_io_piece(struct thread_data *td, struct io_u *io_u)
226 struct fio_rb_node **p, *parent;
227 struct io_piece *ipo, *__ipo;
229 ipo = calloc(1, sizeof(struct io_piece));
231 ipo->file = io_u->file;
232 ipo->offset = io_u->offset;
233 ipo->len = io_u->buflen;
234 ipo->numberio = io_u->numberio;
235 ipo->flags = IP_F_IN_FLIGHT;
239 if (io_u_should_trim(td, io_u)) {
240 flist_add_tail(&ipo->trim_list, &td->trim_list);
245 * Only sort writes if we don't have a random map in which case we need
246 * to check for duplicate blocks and drop the old one, which we rely on
247 * the rb insert/lookup for handling.
249 if (file_randommap(td, ipo->file)) {
250 INIT_FLIST_HEAD(&ipo->list);
251 flist_add_tail(&ipo->list, &td->io_hist_list);
252 ipo->flags |= IP_F_ONLIST;
257 RB_CLEAR_NODE(&ipo->rb_node);
260 * Sort the entry into the verification list
263 p = &td->io_hist_tree.rb_node;
269 __ipo = rb_entry(parent, struct io_piece, rb_node);
270 if (ipo->file < __ipo->file)
272 else if (ipo->file > __ipo->file)
274 else if (ipo->offset < __ipo->offset) {
276 overlap = ipo->offset + ipo->len > __ipo->offset;
278 else if (ipo->offset > __ipo->offset) {
280 overlap = __ipo->offset + __ipo->len > ipo->offset;
286 dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
287 __ipo->offset, __ipo->len,
288 ipo->offset, ipo->len);
290 rb_erase(parent, &td->io_hist_tree);
291 remove_trim_entry(td, __ipo);
292 if (!(__ipo->flags & IP_F_IN_FLIGHT))
298 rb_link_node(&ipo->rb_node, parent, p);
299 rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
300 ipo->flags |= IP_F_ONRB;
/*
 * unlog_io_piece(): undo log_io_piece() for an io_u that failed — remove
 * its io_piece from wherever it was stored (rb-tree via IP_F_ONRB, or
 * list via IP_F_ONLIST).  When block-state tracking is enabled, the
 * block is also marked TRIM_FAILURE / WRITE_FAILURE as appropriate
 * (only if not already in a failure state).
 * NOTE(review): the trailing free/clear lines are missing from this
 * extraction; code kept byte-identical.
 */
304 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
306 struct io_piece *ipo = io_u->ipo;
308 if (td->ts.nr_block_infos) {
309 uint32_t *info = io_u_block_info(td, io_u);
310 if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
311 if (io_u->ddir == DDIR_TRIM)
312 *info = BLOCK_INFO_SET_STATE(*info,
313 BLOCK_STATE_TRIM_FAILURE);
314 else if (io_u->ddir == DDIR_WRITE)
315 *info = BLOCK_INFO_SET_STATE(*info,
316 BLOCK_STATE_WRITE_FAILURE);
323 if (ipo->flags & IP_F_ONRB)
324 rb_erase(&ipo->rb_node, &td->io_hist_tree);
325 else if (ipo->flags & IP_F_ONLIST)
326 flist_del(&ipo->list);
/*
 * trim_io_piece(): shrink the logged length of an io_u's piece to the
 * bytes actually transferred (xfer_buflen minus residual), so verify
 * only checks what really hit the media.
 * NOTE(review): braces and a likely NULL-ipo guard are missing from this
 * extraction; code kept byte-identical.
 */
333 void trim_io_piece(const struct io_u *io_u)
335 struct io_piece *ipo = io_u->ipo;
340 ipo->len = io_u->xfer_buflen - io_u->resid;
/*
 * write_iolog_close(): tear down the iolog capture stream for this job.
 * Only the buffer-pointer reset is visible here; the flush/fclose/free
 * lines are missing from this extraction — confirm against the full
 * source.
 */
343 void write_iolog_close(struct thread_data *td)
352 td->iolog_buf = NULL;
/*
 * iolog_items_to_fetch(): for chunked iolog reading, estimate how many
 * entries to pull in next.  Based on the consumption rate since the last
 * high-water mark, it targets roughly one second's worth of entries
 * (for_1s), then records the new high-water mark and sets the refill
 * checkmark at half of it.
 * NOTE(review): clamping branches are missing from this extraction;
 * code kept byte-identical.
 */
355 static int64_t iolog_items_to_fetch(struct thread_data *td)
360 int64_t items_to_fetch;
362 if (!td->io_log_highmark)
366 fio_gettime(&now, NULL);
367 elapsed = ntime_since(&td->io_log_highmark_time, &now);
369 for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
370 items_to_fetch = for_1s - td->io_log_current;
371 if (items_to_fetch < 0)
376 td->io_log_highmark = td->io_log_current + items_to_fetch;
377 td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
378 fio_gettime(&td->io_log_highmark_time, NULL);
380 return items_to_fetch;
384 * Read version 2 iolog data. It is enhanced to include per-file logging,
/*
 * read_iolog2(): parse a version-2 iolog and queue io_pieces for replay.
 * Each line is either an I/O record "<file> <ddir> <offset> <len>" or a
 * file action "<file> add|open|close".  Handles replay_redirect (all
 * I/O redirected to one file), replay_scale (offset division) and
 * replay_align.  In chunked mode only items_to_fetch entries are read
 * per call, and the high/check marks are refreshed for the next refill.
 * On completion the job's ddir is derived from what the log contained
 * (reads only, writes only, or mixed).  Writes are skipped in read-only
 * mode with a warning.
 * NOTE(review): many interior lines (sscanf arg count checks, frees,
 * returns) are missing from this extraction; code kept byte-identical.
 */
387 static bool read_iolog2(struct thread_data *td)
389 unsigned long long offset;
391 int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
392 char *rfname, *fname, *act;
395 bool realloc = false;
396 int64_t items_to_fetch = 0;
398 if (td->o.read_iolog_chunked) {
399 items_to_fetch = iolog_items_to_fetch(td);
405 * Read in the read iolog and store it, reuse the infrastructure
406 * for doing verifications.
409 rfname = fname = malloc(256+16);
410 act = malloc(256+16);
412 reads = writes = waits = 0;
413 while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
414 struct io_piece *ipo;
417 r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
420 if (td->o.replay_redirect)
421 fname = td->o.replay_redirect;
427 if (!strcmp(act, "wait"))
429 else if (!strcmp(act, "read"))
431 else if (!strcmp(act, "write"))
433 else if (!strcmp(act, "sync"))
435 else if (!strcmp(act, "datasync"))
437 else if (!strcmp(act, "trim"))
440 log_err("fio: bad iolog file action: %s\n",
444 fileno = get_fileno(td, fname);
447 if (!strcmp(act, "add")) {
448 if (td->o.replay_redirect &&
449 get_fileno(td, fname) != -1) {
450 dprint(FD_FILE, "iolog: ignoring"
451 " re-add of file %s\n", fname);
453 fileno = add_file(td, fname, td->subjob_number, 1);
454 file_action = FIO_LOG_ADD_FILE;
457 } else if (!strcmp(act, "open")) {
458 fileno = get_fileno(td, fname);
459 file_action = FIO_LOG_OPEN_FILE;
460 } else if (!strcmp(act, "close")) {
461 fileno = get_fileno(td, fname);
462 file_action = FIO_LOG_CLOSE_FILE;
464 log_err("fio: bad iolog file action: %s\n",
469 log_err("bad iolog2: %s\n", p);
475 else if (rw == DDIR_WRITE) {
477 * Don't add a write for ro mode
482 } else if (rw == DDIR_WAIT) {
486 } else if (rw == DDIR_INVAL) {
487 } else if (!ddir_sync(rw)) {
488 log_err("bad ddir: %d\n", rw);
495 ipo = calloc(1, sizeof(*ipo));
498 if (rw == DDIR_WAIT) {
501 if (td->o.replay_scale)
502 ipo->offset = offset / td->o.replay_scale;
504 ipo->offset = offset;
505 ipo_bytes_align(td->o.replay_align, ipo);
508 if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw]) {
510 td->o.max_bs[rw] = bytes;
512 ipo->fileno = fileno;
513 ipo->file_action = file_action;
517 queue_io_piece(td, ipo);
519 if (td->o.read_iolog_chunked) {
520 td->io_log_current++;
522 if (items_to_fetch == 0)
531 if (td->o.read_iolog_chunked) {
532 td->io_log_highmark = td->io_log_current;
533 td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
534 fio_gettime(&td->io_log_highmark_time, NULL);
537 if (writes && read_only) {
538 log_err("fio: <%s> skips replay of %d writes due to"
539 " read-only\n", td->o.name, writes);
543 if (td->o.read_iolog_chunked) {
544 if (td->io_log_current == 0) {
547 td->o.td_ddir = TD_DDIR_RW;
548 if (realloc && td->orig_buffer)
552 init_io_u_buffers(td);
557 if (!reads && !writes && !waits)
559 else if (reads && !writes)
560 td->o.td_ddir = TD_DDIR_READ;
561 else if (!reads && writes)
562 td->o.td_ddir = TD_DDIR_WRITE;
564 td->o.td_ddir = TD_DDIR_RW;
569 static bool is_socket(const char *path)
574 r = stat(path, &buf);
578 return S_ISSOCK(buf.st_mode);
/*
 * open_socket(): connect a stream socket to the Unix-domain socket at
 * `path`, for reading an iolog from a live producer.  Rejects paths too
 * long for sun_path.  Returns the connected fd; error-path lines
 * (close/return -1) are missing from this extraction.
 * NOTE(review): the connect() length uses strlen(path) +
 * sizeof(addr.sun_family) rather than sizeof(addr) — matches upstream
 * fio, but worth confirming against the full source.
 */
581 static int open_socket(const char *path)
583 struct sockaddr_un addr;
586 fd = socket(AF_UNIX, SOCK_STREAM, 0);
590 addr.sun_family = AF_UNIX;
591 if (snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path) >=
592 sizeof(addr.sun_path)) {
593 log_err("%s: path name %s is too long for a Unix socket\n",
597 ret = connect(fd, (const struct sockaddr *)&addr, strlen(path) + sizeof(addr.sun_family));
606 * open iolog, check version, and call appropriate parser
/*
 * init_iolog_read(): open the replay source — a Unix socket, stdin
 * ("-"), or a regular file — validate the version-2 header line, then
 * hand the stream to read_iolog2().  Version-1 logs are rejected.
 * NOTE(review): fdopen/stdin assignment and cleanup lines are missing
 * from this extraction; code kept byte-identical.
 */
608 static bool init_iolog_read(struct thread_data *td)
610 char buffer[256], *p, *fname;
613 fname = get_name_by_idx(td->o.read_iolog_file, td->subjob_number);
614 dprint(FD_IO, "iolog: name=%s\n", fname);
616 if (is_socket(fname)) {
619 fd = open_socket(fname);
622 } else if (!strcmp(fname, "-")) {
625 f = fopen(fname, "r");
630 perror("fopen read iolog");
634 p = fgets(buffer, sizeof(buffer), f);
636 td_verror(td, errno, "iolog read");
637 log_err("fio: unable to read iolog\n");
643 * version 2 of the iolog stores a specific string as the
644 * first line, check for that
646 if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2))) {
647 free_release_files(td);
648 td->io_log_rfile = f;
649 return read_iolog2(td);
652 log_err("fio: iolog version 1 is no longer supported\n");
658 * Set up a log for storing io patterns.
/*
 * init_iolog_write(): open the capture log for append, attach an 8KiB
 * stdio buffer, write the version-2 header line, and record an "add"
 * event for every file the job already knows about.
 * NOTE(review): the td->iolog_f assignment and error-path cleanup are
 * among the lines missing from this extraction; code kept
 * byte-identical.
 */
660 static bool init_iolog_write(struct thread_data *td)
666 f = fopen(td->o.write_iolog_file, "a");
668 perror("fopen write iolog");
673 * That's it for writing, setup a log buffer and we're done.
676 td->iolog_buf = malloc(8192);
677 setvbuf(f, td->iolog_buf, _IOFBF, 8192);
680 * write our version line
682 if (fprintf(f, "%s\n", iolog_ver2) < 0) {
683 perror("iolog init\n");
688 * add all known files
690 for_each_file(td, ff, i)
691 log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * init_iolog(): top-level iolog setup.  For replay, dispatch to the
 * blktrace loader or the text-iolog reader depending on the file format;
 * for capture, open the write log.  Flags EINVAL on failure.
 */
696 bool init_iolog(struct thread_data *td)
700 if (td->o.read_iolog_file) {
704 * Check if it's a blktrace file and load that if possible.
705 * Otherwise assume it's a normal log file and load that.
707 if (is_blktrace(td->o.read_iolog_file, &need_swap))
708 ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
710 ret = init_iolog_read(td);
711 } else if (td->o.write_iolog_file)
712 ret = init_iolog_write(td);
717 td_verror(td, EINVAL, "failed initializing iolog");
/*
 * setup_log(): allocate and initialize an io_log from log_params —
 * copies type/offset/gz/averaging settings, duplicates the filename,
 * seeds one zeroed io_u_plat histogram entry per data direction, and
 * pre-allocates a pending-sample buffer when not in offload submit
 * mode.  When offsets are logged, LOG_OFFSET_SAMPLE_BIT is folded into
 * the ddir mask.  Compression (log_gz/log_gz_store) initializes the
 * chunk/deferred-free mutexes and marks the td TD_F_COMPRESS_LOG; log_gz
 * without a td appears to be rejected (branch body missing here).
 * NOTE(review): interior lines missing from this extraction; code kept
 * byte-identical.
 */
722 void setup_log(struct io_log **log, struct log_params *p,
723 	       const char *filename)
727 struct io_u_plat_entry *entry;
728 struct flist_head *list;
730 l = scalloc(1, sizeof(*l));
731 INIT_FLIST_HEAD(&l->io_logs);
732 l->log_type = p->log_type;
733 l->log_offset = p->log_offset;
734 l->log_gz = p->log_gz;
735 l->log_gz_store = p->log_gz_store;
736 l->avg_msec = p->avg_msec;
737 l->hist_msec = p->hist_msec;
738 l->hist_coarseness = p->hist_coarseness;
739 l->filename = strdup(filename);
742 /* Initialize histogram lists for each r/w direction,
743 * with initial io_u_plat of all zeros:
745 for (i = 0; i < DDIR_RWDIR_CNT; i++) {
746 list = &l->hist_window[i].list;
747 INIT_FLIST_HEAD(list);
748 entry = calloc(1, sizeof(struct io_u_plat_entry));
749 flist_add(&entry->list, list);
752 if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
755 __p = calloc(1, sizeof(*l->pending));
756 __p->max_samples = DEF_LOG_ENTRIES;
757 __p->log = calloc(__p->max_samples, log_entry_sz(l));
762 l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
764 INIT_FLIST_HEAD(&l->chunk_list);
766 if (l->log_gz && !p->td)
768 else if (l->log_gz || l->log_gz_store) {
769 mutex_init_pshared(&l->chunk_lock);
770 mutex_init_pshared(&l->deferred_free_lock);
771 p->td->flags |= TD_F_COMPRESS_LOG;
/*
 * Stdio buffering helpers: with CONFIG_SETVBUF, give the log FILE a
 * large (1MiB) fully-buffered stream buffer to speed up the many small
 * fprintf() calls at flush time; without it, both helpers degrade to
 * no-ops (stub bodies missing from this extraction).
 */
777 #ifdef CONFIG_SETVBUF
778 static void *set_file_buffer(FILE *f)
780 size_t size = 1048576;
784 setvbuf(f, buf, _IOFBF, size);
788 static void clear_file_buffer(void *buf)
793 static void *set_file_buffer(FILE *f)
798 static void clear_file_buffer(void *buf)
/*
 * free_log(): release every io_logs segment chained off the log, the
 * pending buffer, and (in lines missing from this extraction) the
 * filename and the log structure itself.
 */
803 void free_log(struct io_log *log)
805 while (!flist_empty(&log->io_logs)) {
806 struct io_logs *cur_log;
808 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
809 flist_del_init(&cur_log->list);
815 free(log->pending->log);
/*
 * Sum `stride` consecutive latency-histogram buckets starting at bucket
 * index j.
 *
 * If io_u_plat_last (the previous window's snapshot) is non-NULL, the
 * per-window delta is returned; otherwise the raw cumulative count.
 *
 * NOTE(review): reconstructed from a mangled extraction that had dropped
 * the braces, declarations and return statement.
 */
uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
		  uint64_t *io_u_plat_last)
{
	uint64_t sum = 0;
	int k;

	if (io_u_plat_last) {
		for (k = 0; k < stride; k++)
			sum += io_u_plat[j + k] - io_u_plat_last[j + k];
	} else {
		for (k = 0; k < stride; k++)
			sum += io_u_plat[j + k];
	}

	return sum;
}
/*
 * flush_hist_samples(): write latency-histogram samples as CSV.  Each
 * sample row is "time, ddir, bs, " followed by per-bucket counts
 * coarsened by summing 2^hist_coarseness adjacent buckets via
 * hist_sum(), expressed as deltas against the previous window's
 * snapshot (entry_before).  The consumed previous entry is unlinked
 * (and, in lines missing here, freed).
 * NOTE(review): log_offset handling lines are partially missing from
 * this extraction; code kept byte-identical.
 */
842 static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
843 	uint64_t sample_size)
847 uint64_t i, j, nr_samples;
848 struct io_u_plat_entry *entry, *entry_before;
850 uint64_t *io_u_plat_before;
852 int stride = 1 << hist_coarseness;
857 s = __get_sample(samples, 0, 0);
858 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
860 nr_samples = sample_size / __log_entry_sz(log_offset);
862 for (i = 0; i < nr_samples; i++) {
863 s = __get_sample(samples, log_offset, i);
865 entry = s->data.plat_entry;
866 io_u_plat = entry->io_u_plat;
868 entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
869 io_u_plat_before = entry_before->io_u_plat;
871 fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
872 io_sample_ddir(s), (unsigned long long) s->bs);
873 for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
874 fprintf(f, "%llu, ", (unsigned long long)
875 hist_sum(j, stride, io_u_plat, io_u_plat_before));
877 fprintf(f, "%llu\n", (unsigned long long)
878 hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
881 flist_del(&entry_before->list);
/*
 * flush_samples(): write plain log samples as CSV lines.  The first
 * sample's ddir field carries LOG_OFFSET_SAMPLE_BIT when samples include
 * an offset column; that bit selects the wider io_sample_offset layout
 * and the 6-column output format, otherwise the 5-column one.
 * NOTE(review): the branch selecting between the two fprintf formats is
 * among the lines missing from this extraction; code kept
 * byte-identical.
 */
886 void flush_samples(FILE *f, void *samples, uint64_t sample_size)
890 uint64_t i, nr_samples;
895 s = __get_sample(samples, 0, 0);
896 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
898 nr_samples = sample_size / __log_entry_sz(log_offset);
900 for (i = 0; i < nr_samples; i++) {
901 s = __get_sample(samples, log_offset, i);
904 fprintf(f, "%lu, %" PRId64 ", %u, %llu, %u\n",
905 (unsigned long) s->time,
907 io_sample_ddir(s), (unsigned long long) s->bs, s->priority_bit);
909 struct io_sample_offset *so = (void *) s;
911 fprintf(f, "%lu, %" PRId64 ", %u, %llu, %llu, %u\n",
912 (unsigned long) s->time,
914 io_sample_ddir(s), (unsigned long long) s->bs,
915 (unsigned long long) so->offset, s->priority_bit);
/* Work item handed to the compression workqueue (fields mostly missing
 * from this extraction). */
922 struct iolog_flush_data {
923 struct workqueue_work work;
/* Compressed chunks are allocated in fixed 128KiB output buffers. */
930 #define GZ_CHUNK 131072
/*
 * get_new_chunk(): allocate one compression output chunk tagged with the
 * given sequence number (seq assignment line missing here).
 * free_chunk(): release a chunk's buffer and the chunk itself (body
 * missing here).
 */
932 static struct iolog_compress *get_new_chunk(unsigned int seq)
934 struct iolog_compress *c;
936 c = malloc(sizeof(*c));
937 INIT_FLIST_HEAD(&c->list);
938 c->buf = malloc(GZ_CHUNK);
944 static void free_chunk(struct iolog_compress *ic)
/*
 * z_stream_init(): prepare a zlib inflate stream.  Per the zlib manual,
 * adding 32 to windowBits enables automatic detection of gzip vs zlib
 * headers — used when logs were stored in gz-friendly format (the wbits
 * computation lines are missing from this extraction).  Returns non-zero
 * on inflateInit2 failure.
 */
950 static int z_stream_init(z_stream *stream, int gz_hdr)
954 memset(stream, 0, sizeof(*stream));
955 stream->zalloc = Z_NULL;
956 stream->zfree = Z_NULL;
957 stream->opaque = Z_NULL;
958 stream->next_in = Z_NULL;
961 * zlib magic - add 32 for auto-detection of gz header or not,
962 * if we decide to store files in a gzip friendly format.
967 if (inflateInit2(stream, wbits) != Z_OK)
/* Iterator state carried across inflate_chunk() calls (fields missing
 * from this extraction). */
973 struct inflate_chunk_iter {
/*
 * finish_chunk(): end the inflate stream for the current sequence, log
 * any inflateEnd failure, flush the accumulated decompressed samples to
 * f, and reset the iterator's buffer accounting.
 */
982 static void finish_chunk(z_stream *stream, FILE *f,
983 	struct inflate_chunk_iter *iter)
987 ret = inflateEnd(stream);
989 log_err("fio: failed to end log inflation seq %d (%d)\n",
992 flush_samples(f, iter->buf, iter->buf_used);
995 iter->buf_size = iter->buf_used = 0;
999 * Iterative chunk inflation. Handles cases where we cross into a new
1000 * sequence, doing flush finish of previous chunk if needed.
/*
 * inflate_chunk(): feed one compressed chunk into the ongoing inflate
 * stream.  On a sequence change, the previous sequence is finished and
 * flushed and a fresh stream is initialized.  The output buffer grows by
 * chunk_sz whenever zlib exhausts avail_out.  Returns the number of
 * input bytes consumed (next_in - ic->buf).
 * NOTE(review): Z_STREAM_END/error unwinding lines are partially missing
 * from this extraction; code kept byte-identical.
 */
1002 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
1003 	z_stream *stream, struct inflate_chunk_iter *iter)
1007 dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
1008 (unsigned long) ic->len, ic->seq);
1010 if (ic->seq != iter->seq) {
1012 finish_chunk(stream, f, iter);
1014 z_stream_init(stream, gz_hdr);
1015 iter->seq = ic->seq;
1018 stream->avail_in = ic->len;
1019 stream->next_in = ic->buf;
1021 if (!iter->buf_size) {
1022 iter->buf_size = iter->chunk_sz;
1023 iter->buf = malloc(iter->buf_size);
1026 while (stream->avail_in) {
1027 size_t this_out = iter->buf_size - iter->buf_used;
1030 stream->avail_out = this_out;
1031 stream->next_out = iter->buf + iter->buf_used;
1033 err = inflate(stream, Z_NO_FLUSH);
1035 log_err("fio: failed inflating log: %d\n", err);
1040 iter->buf_used += this_out - stream->avail_out;
1042 if (!stream->avail_out) {
1043 iter->buf_size += iter->chunk_sz;
1044 iter->buf = realloc(iter->buf, iter->buf_size);
1048 if (err == Z_STREAM_END)
1052 ret = (void *) stream->next_in - ic->buf;
1054 dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
1060 * Inflate stored compressed chunks, or write them directly to the log
1061 * file if so instructed.
/*
 * inflate_gz_chunks(): drain the log's compressed chunk list.  With
 * log_gz_store the raw compressed bytes are written straight to f
 * (stored-gz format); otherwise each chunk is decompressed via
 * inflate_chunk() and the final sequence is flushed by finish_chunk().
 * NOTE(review): chunk freeing and the iter/stream declarations are among
 * the lines missing from this extraction; code kept byte-identical.
 */
1063 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1065 struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
1068 while (!flist_empty(&log->chunk_list)) {
1069 struct iolog_compress *ic;
1071 ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
1072 flist_del(&ic->list);
1074 if (log->log_gz_store) {
1077 dprint(FD_COMPRESS, "log write chunk size=%lu, "
1078 "seq=%u\n", (unsigned long) ic->len, ic->seq);
1080 ret = fwrite(ic->buf, ic->len, 1, f);
1081 if (ret != 1 || ferror(f)) {
1083 log_err("fio: error writing compressed log\n");
1086 inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
1092 finish_chunk(&stream, f, &iter);
1100 * Open compressed log file and decompress the stored chunks and
1101 * write them to stdout. The chunks are stored sequentially in the
1102 * file, so we iterate over them and do them one-by-one.
/*
 * iolog_file_inflate(): standalone decompressor for a stored-gz log
 * file.  Reads the whole file into one iolog_compress buffer, then
 * repeatedly calls inflate_chunk() — each stored chunk ends with
 * Z_STREAM_END, so the loop advances the sequence number until all
 * input is consumed — writing decompressed samples to stdout.
 * NOTE(review): the loop body, seq increments and cleanup lines are
 * missing from this extraction; code kept byte-identical.
 */
1104 int iolog_file_inflate(const char *file)
1106 struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
1107 struct iolog_compress ic;
1115 f = fopen(file, "r");
1121 if (stat(file, &sb) < 0) {
1127 ic.buf = buf = malloc(sb.st_size);
1128 ic.len = sb.st_size;
1131 ret = fread(ic.buf, ic.len, 1, f);
1132 if (ret == 0 && ferror(f)) {
1137 } else if (ferror(f) || (!feof(f) && ret != 1)) {
1138 log_err("fio: short read on reading log\n");
1147 * Each chunk will return Z_STREAM_END. We don't know how many
1148 * chunks are in the file, so we just keep looping and incrementing
1149 * the sequence number until we have consumed the whole compressed
1156 iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
1169 finish_chunk(&stream, stdout, &iter);
/*
 * Non-zlib build stubs: inflation is unavailable, so inflate_gz_chunks
 * is a no-op and iolog_file_inflate reports an error (return lines
 * missing from this extraction).
 */
1179 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1184 int iolog_file_inflate(const char *file)
1186 log_err("fio: log inflation not possible without zlib\n");
/*
 * flush_log(): write a log out to its file — truncate ("w") or append
 * ("a") per do_append — with a large stdio buffer.  Any compressed
 * chunks are emitted/inflated first, then each io_logs segment is
 * flushed as samples (histogram format for the clat_hist_log).
 * NOTE(review): segment freeing and fclose are among the lines missing
 * from this extraction; code kept byte-identical.
 */
1192 void flush_log(struct io_log *log, bool do_append)
1198 f = fopen(log->filename, "w");
1200 f = fopen(log->filename, "a");
1202 perror("fopen log");
1206 buf = set_file_buffer(f);
1208 inflate_gz_chunks(log, f);
1210 while (!flist_empty(&log->io_logs)) {
1211 struct io_logs *cur_log;
1213 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1214 flist_del_init(&cur_log->list);
1216 if (log->td && log == log->td->clat_hist_log)
1217 flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1218 log_sample_sz(log, cur_log));
1220 flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1226 clear_file_buffer(buf);
/*
 * finish_log(): final write-out of one log, under the cross-process
 * file lock (trylock variant returns early when contended so the caller
 * can retry).  Compressed logs are flushed first (call site truncated
 * here).  GUI clients and backends ship the log over the network via
 * fio_send_iolog() instead of writing locally.
 * NOTE(review): interior lines missing from this extraction; code kept
 * byte-identical.
 */
1229 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1231 if (td->flags & TD_F_COMPRESS_LOG)
1235 if (fio_trylock_file(log->filename))
1238 fio_lock_file(log->filename);
1240 if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1241 fio_send_iolog(td, log, log->filename);
1243 flush_log(log, !td->o.per_job_logs);
1245 fio_unlock_file(log->filename);
/*
 * log_chunk_sizes(): total the byte sizes of all compressed chunks
 * currently queued on the log, under chunk_lock.  Returns 0 for an
 * empty list without taking the lock (the accumulation line itself is
 * missing from this extraction).
 */
1250 size_t log_chunk_sizes(struct io_log *log)
1252 struct flist_head *entry;
1255 if (flist_empty(&log->chunk_list))
1259 pthread_mutex_lock(&log->chunk_lock);
1260 flist_for_each(entry, &log->chunk_list) {
1261 struct iolog_compress *c;
1263 c = flist_entry(entry, struct iolog_compress, list);
1266 pthread_mutex_unlock(&log->chunk_lock);
/*
 * iolog_put_deferred(): queue a sample buffer for deferred freeing
 * (freeing from the compression worker would race the producer).  If
 * the fixed-size deferred array is full the buffer is leaked, with a
 * once-only warning.
 */
1272 static void iolog_put_deferred(struct io_log *log, void *ptr)
1277 pthread_mutex_lock(&log->deferred_free_lock);
1278 if (log->deferred < IOLOG_MAX_DEFER) {
1279 log->deferred_items[log->deferred] = ptr;
1281 } else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
1282 log_err("fio: had to drop log entry free\n");
1283 pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * iolog_free_deferred(): free everything queued by iolog_put_deferred()
 * and clear the slots, under deferred_free_lock.  (The counter reset
 * line is missing from this extraction.)
 */
1286 static void iolog_free_deferred(struct io_log *log)
1293 pthread_mutex_lock(&log->deferred_free_lock);
1295 for (i = 0; i < log->deferred; i++) {
1296 free(log->deferred_items[i]);
1297 log->deferred_items[i] = NULL;
1301 pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * gz_work(): compress one batch of log samples with zlib deflate.
 * The samples buffer is consumed in GZ_CHUNK-sized output chunks
 * (Z_NO_FLUSH while input remains, then Z_FINISH), each tagged with a
 * per-log sequence number so inflation can reassemble in order.
 * Z_BUF_ERROR from Z_FINISH just means more output space is needed and
 * a new chunk is allocated.  The finished chunk list is spliced onto
 * the log under chunk_lock; the input buffer is handed to the deferred
 * free list.  The trailing while-loop is the error-path cleanup that
 * frees any partially built chunks.
 * NOTE(review): many interior lines (declarations, do-loop headers,
 * gotos, returns) are missing from this extraction; code kept
 * byte-identical.
 */
1304 static int gz_work(struct iolog_flush_data *data)
1306 struct iolog_compress *c = NULL;
1307 struct flist_head list;
1313 INIT_FLIST_HEAD(&list);
1315 memset(&stream, 0, sizeof(stream));
1316 stream.zalloc = Z_NULL;
1317 stream.zfree = Z_NULL;
1318 stream.opaque = Z_NULL;
1320 ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1322 log_err("fio: failed to init gz stream\n");
1326 seq = ++data->log->chunk_seq;
1328 stream.next_in = (void *) data->samples;
1329 stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1331 dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
1332 (unsigned long) stream.avail_in, seq,
1333 data->log->filename);
1336 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1337 (unsigned long) c->len);
1338 c = get_new_chunk(seq);
1339 stream.avail_out = GZ_CHUNK;
1340 stream.next_out = c->buf;
1341 ret = deflate(&stream, Z_NO_FLUSH);
1343 log_err("fio: deflate log (%d)\n", ret);
1348 c->len = GZ_CHUNK - stream.avail_out;
1349 flist_add_tail(&c->list, &list);
1351 } while (stream.avail_in);
1353 stream.next_out = c->buf + c->len;
1354 stream.avail_out = GZ_CHUNK - c->len;
1356 ret = deflate(&stream, Z_FINISH);
1359 * Z_BUF_ERROR is special, it just means we need more
1360 * output space. We'll handle that below. Treat any other
1363 if (ret != Z_BUF_ERROR) {
1364 log_err("fio: deflate log (%d)\n", ret);
1365 flist_del(&c->list);
1372 c->len = GZ_CHUNK - stream.avail_out;
1374 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
1376 if (ret != Z_STREAM_END) {
1378 c = get_new_chunk(seq);
1379 stream.avail_out = GZ_CHUNK;
1380 stream.next_out = c->buf;
1381 ret = deflate(&stream, Z_FINISH);
1382 c->len = GZ_CHUNK - stream.avail_out;
1384 flist_add_tail(&c->list, &list);
1385 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1386 (unsigned long) c->len);
1387 } while (ret != Z_STREAM_END);
1390 dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1392 ret = deflateEnd(&stream);
1394 log_err("fio: deflateEnd %d\n", ret);
1396 iolog_put_deferred(data->log, data->samples);
1398 if (!flist_empty(&list)) {
1399 pthread_mutex_lock(&data->log->chunk_lock);
1400 flist_splice_tail(&list, &data->log->chunk_list);
1401 pthread_mutex_unlock(&data->log->chunk_lock);
1410 while (!flist_empty(&list)) {
1411 c = flist_first_entry(list.next, struct iolog_compress, list);
1412 flist_del(&c->list);
1420 * Invoked from our compress helper thread, when logging would have exceeded
1421 * the specified memory limitation. Compresses the previously stored
/*
 * Workqueue glue for asynchronous log compression:
 * - gz_work_async(): unwrap the workqueue_work back to its
 *   iolog_flush_data container and run gz_work().
 * - gz_init_worker(): pin the compression worker to log_gz_cpumask if
 *   that option was set.
 * - log_compress_wq_ops: the workqueue operations table wiring the two
 *   together.
 */
1424 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1426 return gz_work(container_of(work, struct iolog_flush_data, work));
1429 static int gz_init_worker(struct submit_worker *sw)
1431 struct thread_data *td = sw->wq->td;
1433 if (!fio_option_is_set(&td->o, log_gz_cpumask))
1436 if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1437 log_err("gz: failed to set CPU affinity\n");
1444 static struct workqueue_ops log_compress_wq_ops = {
1445 	.fn		= gz_work_async,
1446 	.init_worker_fn = gz_init_worker,
/*
 * iolog_compress_init()/iolog_compress_exit(): create/tear down the
 * single-worker compression workqueue, no-ops unless the td was marked
 * TD_F_COMPRESS_LOG in setup_log().
 */
1450 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1452 if (!(td->flags & TD_F_COMPRESS_LOG))
1455 workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
1459 void iolog_compress_exit(struct thread_data *td)
1461 if (!(td->flags & TD_F_COMPRESS_LOG))
1464 workqueue_exit(&td->log_compress_wq);
1468 * Queue work item to compress the existing log entries. We reset the
1469 * current log to a small size, and reference the existing log in the
1470 * data that we queue for compression. Once compression has been done,
1471 * this old log is freed. If called with finish == true, will not return
1472 * until the log compression has completed, and will flush all previous
/*
 * iolog_flush(): synchronously compress every pending io_logs segment of
 * the log by running gz_work() inline (per-segment gz_work call and
 * frees are among the lines missing from this extraction).
 */
1475 static int iolog_flush(struct io_log *log)
1477 struct iolog_flush_data *data;
1479 data = malloc(sizeof(*data));
1486 while (!flist_empty(&log->io_logs)) {
1487 struct io_logs *cur_log;
1489 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1490 flist_del_init(&cur_log->list);
1492 data->samples = cur_log->log;
1493 data->nr_samples = cur_log->nr_samples;
/*
 * iolog_cur_flush(): hand the current io_logs segment to the async
 * compression workqueue.  Ownership of cur_log->log transfers to the
 * work item (the segment is emptied here); previously deferred sample
 * buffers are freed afterwards.  Uses smalloc since the work item may
 * cross process boundaries — presumably; confirm against the full
 * source.
 */
1504 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1506 struct iolog_flush_data *data;
1508 data = smalloc(sizeof(*data));
1514 data->samples = cur_log->log;
1515 data->nr_samples = cur_log->nr_samples;
1518 cur_log->nr_samples = cur_log->max_samples = 0;
1519 cur_log->log = NULL;
1521 workqueue_enqueue(&log->td->log_compress_wq, &data->work);
1523 iolog_free_deferred(log);
/*
 * Non-zlib build stubs: compression entry points exist so callers link,
 * but do nothing (return lines missing from this extraction).
 */
1529 static int iolog_flush(struct io_log *log)
1534 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1539 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1544 void iolog_compress_exit(struct thread_data *td)
/*
 * iolog_cur_log(): return the newest (tail) io_logs segment, or NULL
 * when the log has none (the NULL return line is missing from this
 * extraction).
 */
1550 struct io_logs *iolog_cur_log(struct io_log *log)
1552 if (flist_empty(&log->io_logs))
1555 return flist_last_entry(&log->io_logs, struct io_logs, list);
/*
 * iolog_nr_samples(): total the sample counts across every io_logs
 * segment of the log.
 */
1558 uint64_t iolog_nr_samples(struct io_log *iolog)
1560 struct flist_head *entry;
1563 flist_for_each(entry, &iolog->io_logs) {
1564 struct io_logs *cur_log;
1566 cur_log = flist_entry(entry, struct io_logs, list);
1567 ret += cur_log->nr_samples;
/*
 * __write_log(): write out one log via finish_log() if it exists
 * (the NULL guard is among the lines missing from this extraction).
 */
1573 static int __write_log(struct thread_data *td, struct io_log *log, int try)
1576 return finish_log(td, log, try);
/*
 * Per-log-type write-out helpers.  Each delegates to __write_log() and,
 * on success (guard lines missing from this extraction), clears the
 * td pointer so the log is not written twice.  iops/bw logs honor
 * unit_log to distinguish per-unit from averaged logs.
 */
1581 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1585 if (per_unit_log(td->iops_log) != unit_log)
1588 ret = __write_log(td, td->iops_log, try);
1590 td->iops_log = NULL;
1595 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1602 ret = __write_log(td, td->slat_log, try);
1604 td->slat_log = NULL;
1609 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1616 ret = __write_log(td, td->clat_log, try);
1618 td->clat_log = NULL;
1623 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1630 ret = __write_log(td, td->clat_hist_log, try);
1632 td->clat_hist_log = NULL;
1637 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1644 ret = __write_log(td, td->lat_log, try);
1651 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1655 if (per_unit_log(td->bw_log) != unit_log)
1658 ret = __write_log(td, td->bw_log, try);
/*
 * Table driving td_writeout_logs(): one bitmask + writer function per
 * log type (enum and struct definitions partially missing from this
 * extraction).  The mask records which logs have been flushed so the
 * retry loop below can skip completed ones.
 */
1671 	CLAT_HIST_LOG_MASK = 32,
1678 	int (*fn)(struct thread_data *, int, bool);
1681 static struct log_type log_types[] = {
1683 		.mask	= BW_LOG_MASK,
1684 		.fn	= write_bandw_log,
1687 		.mask	= LAT_LOG_MASK,
1688 		.fn	= write_lat_log,
1691 		.mask	= SLAT_LOG_MASK,
1692 		.fn	= write_slat_log,
1695 		.mask	= CLAT_LOG_MASK,
1696 		.fn	= write_clat_log,
1699 		.mask	= IOPS_LOG_MASK,
1700 		.fn	= write_iops_log,
1703 		.mask	= CLAT_HIST_LOG_MASK,
1704 		.fn	= write_clat_hist_log,
/*
 * td_writeout_logs(): flush all of a thread's logs.  Runs in the
 * TD_FINISHING runstate, then repeatedly sweeps log_types[]; each
 * writer is invoked in try mode while more than one log remains
 * (log_left != 1) so a contended file lock doesn't serialize the rest.
 * A sweep that makes no progress appears to back off (lines missing
 * from this extraction) before retrying.
 */
1708 void td_writeout_logs(struct thread_data *td, bool unit_logs)
1710 unsigned int log_mask = 0;
1711 unsigned int log_left = ALL_LOG_NR;
1714 old_state = td_bump_runstate(td, TD_FINISHING);
1716 finalize_logs(td, unit_logs);
1719 int prev_log_left = log_left;
1721 for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1722 struct log_type *lt = &log_types[i];
1725 if (!(log_mask & lt->mask)) {
1726 ret = lt->fn(td, log_left != 1, unit_logs);
1729 log_mask |= lt->mask;
1734 if (prev_log_left == log_left)
1738 td_restore_runstate(td, old_state);
/*
 * fio_writeout_logs(): flush the logs of every thread (the for_each_td
 * loop header is among the lines missing from this extraction).
 */
1741 void fio_writeout_logs(bool unit_logs)
1743 struct thread_data *td;
1747 td_writeout_logs(td, unit_logs);