2 * Code related to writing an iolog of what a thread is doing, and to
3 * later read that back and replay
23 #include <netinet/in.h>
24 #include <netinet/tcp.h>
25 #include <arpa/inet.h>
27 #include <sys/socket.h>
30 static int iolog_flush(struct io_log *log);
32 static const char iolog_ver2[] = "fio version 2 iolog";
/*
 * Queue a replay entry: link the io_piece onto the thread's iolog list
 * and account its length toward the total expected I/O size.
 */
34 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
36 flist_add_tail(&ipo->list, &td->io_log_list);
37 td->total_io_size += ipo->len;
/*
 * Append one "file ddir offset buflen" line to the write iolog for a
 * completed io_u; no-op unless write_iolog_file was requested.
 */
40 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
42 if (!td->o.write_iolog_file)
45 fprintf(td->iolog_f, "%s %s %llu %llu\n", io_u->file->file_name,
46 io_ddir_name(io_u->ddir),
47 io_u->offset, io_u->buflen);
/*
 * Record a file-level action (add/open/close) in the write iolog.
 */
50 void log_file(struct thread_data *td, struct fio_file *f,
51 enum file_log_act what)
/* Human-readable names indexed by enum file_log_act */
53 const char *act[] = { "add", "open", "close" };
57 if (!td->o.write_iolog_file)
62 * this happens on the pre-open/close done before the job starts
67 fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
/*
 * Sleep for the iolog replay delay, compensating for prior oversleep
 * recorded in td->time_offset. Any new oversleep is remembered so the
 * next delay can be shortened accordingly.
 */
70 static void iolog_delay(struct thread_data *td, unsigned long delay)
72 uint64_t usec = utime_since_now(&td->last_issue);
73 unsigned long orig_delay = delay;
/* Prior oversleep exceeds this delay — presumably skipped entirely
 * (branch body truncated in this extract; verify against upstream) */
77 if (delay < td->time_offset) {
82 delay -= td->time_offset;
88 fio_gettime(&ts, NULL);
/* Sleep in bounded slices (<= 500ms) so td->terminate is honoured */
89 while (delay && !td->terminate) {
91 if (this_delay > 500000)
94 usec_sleep(td, this_delay);
98 usec = utime_since_now(&ts);
/* Record how much we overslept relative to the requested delay */
99 if (usec > orig_delay)
100 td->time_offset = usec - orig_delay;
/*
 * Handle a special (non-data) replay entry: DDIR_INVAL io_pieces carry
 * a file action (open/close/unlink) instead of actual I/O.
 */
105 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
/* Plain data direction: nothing special to do here */
113 if (ipo->ddir != DDIR_INVAL)
116 f = td->files[ipo->fileno];
118 switch (ipo->file_action) {
119 case FIO_LOG_OPEN_FILE:
/* With replay_redirect all entries map to one file; it may already be open */
120 if (td->o.replay_redirect && fio_file_open(f)) {
121 dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
125 ret = td_io_open_file(td, f);
128 td_verror(td, ret, "iolog open file");
130 case FIO_LOG_CLOSE_FILE:
131 td_io_close_file(td, f);
133 case FIO_LOG_UNLINK_FILE:
134 td_io_unlink_file(td, f);
137 log_err("fio: bad file action %d\n", ipo->file_action);
144 static bool read_iolog2(struct thread_data *td);
/*
 * Fetch the next replay entry from the iolog and populate @io_u from it.
 * NOTE(review): this extract is missing lines; exact return conventions
 * are only partially visible — verify against upstream.
 */
146 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
148 struct io_piece *ipo;
149 unsigned long elapsed;
151 while (!flist_empty(&td->io_log_list)) {
/* Chunked mode: refill the in-memory queue once the check mark is hit */
153 if (td->o.read_iolog_chunked) {
154 if (td->io_log_checkmark == td->io_log_current) {
155 if (!read_iolog2(td))
158 td->io_log_current--;
160 ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
161 flist_del(&ipo->list);
162 remove_trim_entry(td, ipo);
/* File actions are handled here rather than returned as I/O */
164 ret = ipo_special(td, ipo);
168 } else if (ret > 0) {
173 io_u->ddir = ipo->ddir;
174 if (ipo->ddir != DDIR_WAIT) {
175 io_u->offset = ipo->offset;
176 io_u->buflen = ipo->len;
177 io_u->file = td->files[ipo->fileno];
178 get_file(io_u->file);
179 dprint(FD_IO, "iolog: get %llu/%llu/%s\n", io_u->offset,
180 io_u->buflen, io_u->file->file_name);
/* Honour any inter-I/O delay recorded in the log */
182 iolog_delay(td, ipo->delay);
184 elapsed = mtime_since_genesis();
185 if (ipo->delay > elapsed)
186 usec_sleep(td, (ipo->delay - elapsed) * 1000);
191 if (io_u->ddir != DDIR_WAIT)
/*
 * Release all remembered io_pieces: both the rb-tree used for verify
 * sorting and the plain history list.
 */
199 void prune_io_piece_log(struct thread_data *td)
201 struct io_piece *ipo;
202 struct fio_rb_node *n;
204 while ((n = rb_first(&td->io_hist_tree)) != NULL) {
205 ipo = rb_entry(n, struct io_piece, rb_node);
206 rb_erase(n, &td->io_hist_tree);
207 remove_trim_entry(td, ipo);
212 while (!flist_empty(&td->io_hist_list)) {
213 ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
214 flist_del(&ipo->list);
215 remove_trim_entry(td, ipo);
222 * log a successful write, so we can unwind the log for verify
224 void log_io_piece(struct thread_data *td, struct io_u *io_u)
226 struct fio_rb_node **p, *parent;
227 struct io_piece *ipo, *__ipo;
229 ipo = calloc(1, sizeof(struct io_piece));
231 ipo->file = io_u->file;
232 ipo->offset = io_u->offset;
233 ipo->len = io_u->buflen;
234 ipo->numberio = io_u->numberio;
235 ipo->flags = IP_F_IN_FLIGHT;
/* Trims are additionally tracked on a dedicated list */
239 if (io_u_should_trim(td, io_u)) {
240 flist_add_tail(&ipo->trim_list, &td->trim_list);
245 * Only sort writes if we don't have a random map in which case we need
246 * to check for duplicate blocks and drop the old one, which we rely on
247 * the rb insert/lookup for handling.
249 if (file_randommap(td, ipo->file)) {
250 INIT_FLIST_HEAD(&ipo->list);
251 flist_add_tail(&ipo->list, &td->io_hist_list);
252 ipo->flags |= IP_F_ONLIST;
257 RB_CLEAR_NODE(&ipo->rb_node);
260 * Sort the entry into the verification list
263 p = &td->io_hist_tree.rb_node;
/* Walk the tree ordered by (file, offset), detecting range overlaps */
269 __ipo = rb_entry(parent, struct io_piece, rb_node);
270 if (ipo->file < __ipo->file)
272 else if (ipo->file > __ipo->file)
274 else if (ipo->offset < __ipo->offset) {
276 overlap = ipo->offset + ipo->len > __ipo->offset;
278 else if (ipo->offset > __ipo->offset) {
280 overlap = __ipo->offset + __ipo->len > ipo->offset;
/* Overlapping older entry: evict it so verify sees only the newest data */
286 dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
287 __ipo->offset, __ipo->len,
288 ipo->offset, ipo->len);
290 rb_erase(parent, &td->io_hist_tree);
291 remove_trim_entry(td, __ipo);
292 if (!(__ipo->flags & IP_F_IN_FLIGHT))
298 rb_link_node(&ipo->rb_node, parent, p);
299 rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
300 ipo->flags |= IP_F_ONRB;
/*
 * Undo log_io_piece() for a failed I/O: record the failure in the
 * block-info state (when tracked) and unlink the io_piece from
 * whichever container it was placed on.
 */
304 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
306 struct io_piece *ipo = io_u->ipo;
308 if (td->ts.nr_block_infos) {
309 uint32_t *info = io_u_block_info(td, io_u);
310 if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
311 if (io_u->ddir == DDIR_TRIM)
312 *info = BLOCK_INFO_SET_STATE(*info,
313 BLOCK_STATE_TRIM_FAILURE);
314 else if (io_u->ddir == DDIR_WRITE)
315 *info = BLOCK_INFO_SET_STATE(*info,
316 BLOCK_STATE_WRITE_FAILURE);
/* Remove from the rb-tree or the plain list, matching log_io_piece() */
323 if (ipo->flags & IP_F_ONRB)
324 rb_erase(&ipo->rb_node, &td->io_hist_tree);
325 else if (ipo->flags & IP_F_ONLIST)
326 flist_del(&ipo->list);
/* Shrink the logged length after a short transfer (residual bytes). */
333 void trim_io_piece(const struct io_u *io_u)
335 struct io_piece *ipo = io_u->ipo;
340 ipo->len = io_u->xfer_buflen - io_u->resid;
/* Tear down the write-iolog buffering state. */
343 void write_iolog_close(struct thread_data *td)
349 td->iolog_buf = NULL;
/*
 * For chunked iolog reads: estimate how many entries to prefetch based
 * on the consumption rate since the last high-water mark, then reset
 * the high/check marks for the next round.
 */
352 static int64_t iolog_items_to_fetch(struct thread_data *td)
357 int64_t items_to_fetch;
359 if (!td->io_log_highmark)
363 fio_gettime(&now, NULL);
364 elapsed = ntime_since(&td->io_log_highmark_time, &now);
/* Projected one-second consumption, scaled from elapsed nanoseconds */
366 for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
367 items_to_fetch = for_1s - td->io_log_current;
368 if (items_to_fetch < 0)
373 td->io_log_highmark = td->io_log_current + items_to_fetch;
374 td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
375 fio_gettime(&td->io_log_highmark_time, NULL);
377 return items_to_fetch;
381 * Read version 2 iolog data. It is enhanced to include per-file logging,
/*
 * Parse v2 iolog lines into io_pieces queued for replay. In chunked
 * mode only items_to_fetch entries are consumed per call.
 * NOTE(review): extract is missing lines; exact return paths truncated.
 */
384 static bool read_iolog2(struct thread_data *td)
386 unsigned long long offset;
388 int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
389 char *rfname, *fname, *act;
392 bool realloc = false;
393 int64_t items_to_fetch = 0;
395 if (td->o.read_iolog_chunked) {
396 items_to_fetch = iolog_items_to_fetch(td);
402 * Read in the read iolog and store it, reuse the infrastructure
403 * for doing verifications.
406 rfname = fname = malloc(256+16);
407 act = malloc(256+16);
409 reads = writes = waits = 0;
410 while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
411 struct io_piece *ipo;
/* Lines are "fname act offset len" (I/O) or "fname action" (file mgmt) */
414 r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
417 if (td->o.replay_redirect)
418 fname = td->o.replay_redirect;
/* 4-field form: a data direction (or wait) */
424 if (!strcmp(act, "wait"))
426 else if (!strcmp(act, "read"))
428 else if (!strcmp(act, "write"))
430 else if (!strcmp(act, "sync"))
432 else if (!strcmp(act, "datasync"))
434 else if (!strcmp(act, "trim"))
437 log_err("fio: bad iolog file action: %s\n",
441 fileno = get_fileno(td, fname);
/* 2-field form: a file management action */
444 if (!strcmp(act, "add")) {
445 if (td->o.replay_redirect &&
446 get_fileno(td, fname) != -1) {
447 dprint(FD_FILE, "iolog: ignoring"
448 " re-add of file %s\n", fname);
450 fileno = add_file(td, fname, td->subjob_number, 1);
451 file_action = FIO_LOG_ADD_FILE;
454 } else if (!strcmp(act, "open")) {
455 fileno = get_fileno(td, fname);
456 file_action = FIO_LOG_OPEN_FILE;
457 } else if (!strcmp(act, "close")) {
458 fileno = get_fileno(td, fname);
459 file_action = FIO_LOG_CLOSE_FILE;
461 log_err("fio: bad iolog file action: %s\n",
466 log_err("bad iolog2: %s\n", p);
472 else if (rw == DDIR_WRITE) {
474 * Don't add a write for ro mode
479 } else if (rw == DDIR_WAIT) {
483 } else if (rw == DDIR_INVAL) {
484 } else if (!ddir_sync(rw)) {
485 log_err("bad ddir: %d\n", rw);
/* Build the replay entry from the parsed fields */
492 ipo = calloc(1, sizeof(*ipo));
495 if (rw == DDIR_WAIT) {
498 if (td->o.replay_scale)
499 ipo->offset = offset / td->o.replay_scale;
501 ipo->offset = offset;
502 ipo_bytes_align(td->o.replay_align, ipo);
/* Track the largest block size seen so I/O buffers can be sized */
505 if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw]) {
507 td->o.max_bs[rw] = bytes;
509 ipo->fileno = fileno;
510 ipo->file_action = file_action;
514 queue_io_piece(td, ipo);
516 if (td->o.read_iolog_chunked) {
517 td->io_log_current++;
519 if (items_to_fetch == 0)
528 if (td->o.read_iolog_chunked) {
529 td->io_log_highmark = td->io_log_current;
530 td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
531 fio_gettime(&td->io_log_highmark_time, NULL);
534 if (writes && read_only) {
535 log_err("fio: <%s> skips replay of %d writes due to"
536 " read-only\n", td->o.name, writes);
540 if (td->o.read_iolog_chunked) {
541 if (td->io_log_current == 0) {
544 td->o.td_ddir = TD_DDIR_RW;
/* Regrow I/O buffers if max_bs increased while parsing */
545 if (realloc && td->orig_buffer)
549 init_io_u_buffers(td);
/* Derive the job's data direction from what the log contained */
554 if (!reads && !writes && !waits)
556 else if (reads && !writes)
557 td->o.td_ddir = TD_DDIR_READ;
558 else if (!reads && writes)
559 td->o.td_ddir = TD_DDIR_WRITE;
561 td->o.td_ddir = TD_DDIR_RW;
566 static bool is_socket(const char *path)
569 int r = stat(path, &buf);
573 return S_ISSOCK(buf.st_mode);
/*
 * Connect to the UNIX domain socket at @path.
 *
 * Returns a connected socket fd on success, -1 on failure (path too
 * long for sun_path, socket() error, or connect() error).
 *
 * Fixes over the previous version: the socket() return value is
 * checked; over-long paths are rejected instead of being silently
 * truncated by strncpy() (which also could leave sun_path without a
 * NUL terminator); the full sizeof(addr) is passed to connect()
 * rather than a hand-computed strlen-based length.
 */
static int open_socket(const char *path)
{
	struct sockaddr_un addr;
	int fd;

	if (strlen(path) >= sizeof(addr.sun_path))
		return -1;

	fd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (fd < 0)
		return -1;

	memset(&addr, 0, sizeof(addr));
	addr.sun_family = AF_UNIX;
	snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path);

	if (connect(fd, (const struct sockaddr *)&addr, sizeof(addr)) == 0)
		return fd;

	close(fd);
	return -1;
}
592 * open iolog, check version, and call appropriate parser
594 static bool init_iolog_read(struct thread_data *td)
596 char buffer[256], *p;
599 char* fname = get_name_by_idx(td->o.read_iolog_file, td->subjob_number);
600 dprint(FD_IO, "iolog: name=%s\n", fname);
/* The iolog source may be a UNIX socket instead of a regular file */
602 if (is_socket(fname)) {
603 int fd = open_socket(fname);
608 f = fopen(fname, "r");
611 perror("fopen read iolog");
/* The first line must identify the iolog version */
615 p = fgets(buffer, sizeof(buffer), f);
617 td_verror(td, errno, "iolog read");
618 log_err("fio: unable to read iolog\n");
622 td->io_log_rfile = f;
624 * version 2 of the iolog stores a specific string as the
625 * first line, check for that
627 if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2))) {
628 free_release_files(td);
629 ret = read_iolog2(td);
632 log_err("fio: iolog version 1 is no longer supported\n");
640 * Set up a log for storing io patterns.
642 static bool init_iolog_write(struct thread_data *td)
/* Open in append mode so multiple jobs can share one iolog file */
648 f = fopen(td->o.write_iolog_file, "a");
650 perror("fopen write iolog");
655 * That's it for writing, setup a log buffer and we're done.
658 td->iolog_buf = malloc(8192);
659 setvbuf(f, td->iolog_buf, _IOFBF, 8192);
662 * write our version line
664 if (fprintf(f, "%s\n", iolog_ver2) < 0) {
665 perror("iolog init\n");
670 * add all known files
672 for_each_file(td, ff, i)
673 log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * Initialise iolog read or write support for a job, depending on which
 * of read_iolog_file / write_iolog_file is set.
 */
678 bool init_iolog(struct thread_data *td)
682 if (td->o.read_iolog_file) {
686 * Check if it's a blktrace file and load that if possible.
687 * Otherwise assume it's a normal log file and load that.
689 if (is_blktrace(td->o.read_iolog_file, &need_swap))
690 ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
692 ret = init_iolog_read(td);
693 } else if (td->o.write_iolog_file)
694 ret = init_iolog_write(td);
699 td_verror(td, EINVAL, "failed initializing iolog");
/*
 * Allocate and initialise an io_log from @p: per-direction histogram
 * lists (seeded with an all-zero io_u_plat entry), the initial sample
 * buffer (offload submit mode defers this), and compressed-chunk
 * bookkeeping when log compression is enabled.
 */
704 void setup_log(struct io_log **log, struct log_params *p,
705 const char *filename)
709 struct io_u_plat_entry *entry;
710 struct flist_head *list;
/* Shared (scalloc) allocation — the log may outlive the thread */
712 l = scalloc(1, sizeof(*l));
713 INIT_FLIST_HEAD(&l->io_logs);
714 l->log_type = p->log_type;
715 l->log_offset = p->log_offset;
716 l->log_gz = p->log_gz;
717 l->log_gz_store = p->log_gz_store;
718 l->avg_msec = p->avg_msec;
719 l->hist_msec = p->hist_msec;
720 l->hist_coarseness = p->hist_coarseness;
721 l->filename = strdup(filename);
724 /* Initialize histogram lists for each r/w direction,
725 * with initial io_u_plat of all zeros:
727 for (i = 0; i < DDIR_RWDIR_CNT; i++) {
728 list = &l->hist_window[i].list;
729 INIT_FLIST_HEAD(list);
730 entry = calloc(1, sizeof(struct io_u_plat_entry));
731 flist_add(&entry->list, list);
734 if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
737 __p = calloc(1, sizeof(*l->pending));
738 __p->max_samples = DEF_LOG_ENTRIES;
739 __p->log = calloc(__p->max_samples, log_entry_sz(l));
/* Offset-bearing samples are flagged via a bit in the ddir field */
744 l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
746 INIT_FLIST_HEAD(&l->chunk_list);
748 if (l->log_gz && !p->td)
750 else if (l->log_gz || l->log_gz_store) {
751 mutex_init_pshared(&l->chunk_lock);
752 mutex_init_pshared(&l->deferred_free_lock);
753 p->td->flags |= TD_F_COMPRESS_LOG;
759 #ifdef CONFIG_SETVBUF
/* Install a large (1MB) fully-buffered stdio buffer on the log FILE. */
760 static void *set_file_buffer(FILE *f)
762 size_t size = 1048576;
766 setvbuf(f, buf, _IOFBF, size);
/* Release the buffer installed by set_file_buffer(). */
770 static void clear_file_buffer(void *buf)
/* Fallbacks when setvbuf support is not configured: no custom buffering. */
775 static void *set_file_buffer(FILE *f)
780 static void clear_file_buffer(void *buf)
/* Free an io_log: every queued sample container plus the pending buffer. */
785 void free_log(struct io_log *log)
787 while (!flist_empty(&log->io_logs)) {
788 struct io_logs *cur_log;
790 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
791 flist_del_init(&cur_log->list);
797 free(log->pending->log);
/*
 * Sum a group of @stride consecutive histogram buckets starting at
 * index @j of @io_u_plat. When @io_u_plat_last is non-NULL, return the
 * delta against that previous snapshot instead of the absolute sum.
 *
 * Reconstructed as a single loop (the extract lacked the accumulator
 * declaration and return); behavior matches the two-branch original:
 * with a snapshot, each term is io_u_plat[j+k] - io_u_plat_last[j+k].
 */
uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
		  uint64_t *io_u_plat_last)
{
	uint64_t sum = 0;
	int k;

	for (k = 0; k < stride; k++) {
		sum += io_u_plat[j + k];
		if (io_u_plat_last)
			sum -= io_u_plat_last[j + k];
	}

	return sum;
}
/*
 * Write histogram samples as CSV: time, ddir, bs, then one summed
 * value per group of 2^hist_coarseness buckets. Each sample is emitted
 * as a delta against the previous entry's histogram (hist_sum with the
 * "before" snapshot).
 */
824 static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
825 uint64_t sample_size)
829 uint64_t i, j, nr_samples;
830 struct io_u_plat_entry *entry, *entry_before;
832 uint64_t *io_u_plat_before;
834 int stride = 1 << hist_coarseness;
/* Probe the first sample for the offset-sample flag, which decides
 * the per-entry size */
839 s = __get_sample(samples, 0, 0);
840 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
842 nr_samples = sample_size / __log_entry_sz(log_offset);
844 for (i = 0; i < nr_samples; i++) {
845 s = __get_sample(samples, log_offset, i);
847 entry = s->data.plat_entry;
848 io_u_plat = entry->io_u_plat;
850 entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
851 io_u_plat_before = entry_before->io_u_plat;
853 fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
854 io_sample_ddir(s), (unsigned long long) s->bs);
855 for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
856 fprintf(f, "%llu, ", (unsigned long long)
857 hist_sum(j, stride, io_u_plat, io_u_plat_before));
/* Final bucket group ends the line instead of adding a comma */
859 fprintf(f, "%llu\n", (unsigned long long)
860 hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
/* The consumed "before" snapshot is unlinked (and presumably freed —
 * truncated here) */
863 flist_del(&entry_before->list);
/*
 * Write plain samples as CSV: time, value, ddir, bs — plus an offset
 * column when the samples carry offsets (LOG_OFFSET_SAMPLE_BIT).
 */
868 void flush_samples(FILE *f, void *samples, uint64_t sample_size)
872 uint64_t i, nr_samples;
/* Probe the first sample to learn whether offsets are stored */
877 s = __get_sample(samples, 0, 0);
878 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
880 nr_samples = sample_size / __log_entry_sz(log_offset);
882 for (i = 0; i < nr_samples; i++) {
883 s = __get_sample(samples, log_offset, i);
886 fprintf(f, "%lu, %" PRId64 ", %u, %llu\n",
887 (unsigned long) s->time,
889 io_sample_ddir(s), (unsigned long long) s->bs);
891 struct io_sample_offset *so = (void *) s;
893 fprintf(f, "%lu, %" PRId64 ", %u, %llu, %llu\n",
894 (unsigned long) s->time,
896 io_sample_ddir(s), (unsigned long long) s->bs,
897 (unsigned long long) so->offset);
/* Work item handed to the compress workqueue: one batch of samples. */
904 struct iolog_flush_data {
905 struct workqueue_work work;
/* Payload size of each compression output chunk */
912 #define GZ_CHUNK 131072
/* Allocate a fresh compression chunk for sequence number @seq. */
914 static struct iolog_compress *get_new_chunk(unsigned int seq)
916 struct iolog_compress *c;
918 c = malloc(sizeof(*c));
919 INIT_FLIST_HEAD(&c->list);
920 c->buf = malloc(GZ_CHUNK);
/* Free a chunk and its buffer. */
926 static void free_chunk(struct iolog_compress *ic)
/*
 * Prepare a zlib inflate stream. @gz_hdr selects gzip-aware decoding:
 * adding 32 to windowBits enables zlib/gzip header auto-detection.
 */
932 static int z_stream_init(z_stream *stream, int gz_hdr)
936 memset(stream, 0, sizeof(*stream));
937 stream->zalloc = Z_NULL;
938 stream->zfree = Z_NULL;
939 stream->opaque = Z_NULL;
940 stream->next_in = Z_NULL;
943 * zlib magic - add 32 for auto-detection of gz header or not,
944 * if we decide to store files in a gzip friendly format.
949 if (inflateInit2(stream, wbits) != Z_OK)
/* Iteration state carried across inflate_chunk() calls. */
955 struct inflate_chunk_iter {
/* Finish one inflate sequence: end the stream and flush its samples. */
964 static void finish_chunk(z_stream *stream, FILE *f,
965 struct inflate_chunk_iter *iter)
969 ret = inflateEnd(stream);
971 log_err("fio: failed to end log inflation seq %d (%d)\n",
974 flush_samples(f, iter->buf, iter->buf_used);
977 iter->buf_size = iter->buf_used = 0;
981 * Iterative chunk inflation. Handles cases where we cross into a new
982 * sequence, doing flush finish of previous chunk if needed.
984 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
985 z_stream *stream, struct inflate_chunk_iter *iter)
989 dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
990 (unsigned long) ic->len, ic->seq);
/* New sequence: flush the previous one and restart the zlib stream */
992 if (ic->seq != iter->seq) {
994 finish_chunk(stream, f, iter);
996 z_stream_init(stream, gz_hdr);
1000 stream->avail_in = ic->len;
1001 stream->next_in = ic->buf;
1003 if (!iter->buf_size) {
1004 iter->buf_size = iter->chunk_sz;
1005 iter->buf = malloc(iter->buf_size);
/* Inflate, growing the output buffer by chunk_sz whenever it fills */
1008 while (stream->avail_in) {
1009 size_t this_out = iter->buf_size - iter->buf_used;
1012 stream->avail_out = this_out;
1013 stream->next_out = iter->buf + iter->buf_used;
1015 err = inflate(stream, Z_NO_FLUSH);
1017 log_err("fio: failed inflating log: %d\n", err);
1022 iter->buf_used += this_out - stream->avail_out;
1024 if (!stream->avail_out) {
1025 iter->buf_size += iter->chunk_sz;
1026 iter->buf = realloc(iter->buf, iter->buf_size);
1030 if (err == Z_STREAM_END)
/* Return the number of input bytes consumed from this chunk */
1034 ret = (void *) stream->next_in - ic->buf;
1036 dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
1042 * Inflate stored compressed chunks, or write them directly to the log
1043 * file if so instructed.
1045 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1047 struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
1050 while (!flist_empty(&log->chunk_list)) {
1051 struct iolog_compress *ic;
1053 ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
1054 flist_del(&ic->list);
/* log_gz_store: keep the data compressed on disk — just copy it out */
1056 if (log->log_gz_store) {
1059 dprint(FD_COMPRESS, "log write chunk size=%lu, "
1060 "seq=%u\n", (unsigned long) ic->len, ic->seq);
1062 ret = fwrite(ic->buf, ic->len, 1, f);
1063 if (ret != 1 || ferror(f)) {
1065 log_err("fio: error writing compressed log\n");
1068 inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
/* Flush whatever the final sequence produced */
1074 finish_chunk(&stream, f, &iter);
1082 * Open compressed log file and decompress the stored chunks and
1083 * write them to stdout. The chunks are stored sequentially in the
1084 * file, so we iterate over them and do them one-by-one.
1086 int iolog_file_inflate(const char *file)
1088 struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
1089 struct iolog_compress ic;
1097 f = fopen(file, "r");
1103 if (stat(file, &sb) < 0) {
/* Slurp the whole file into one pseudo-chunk */
1109 ic.buf = buf = malloc(sb.st_size);
1110 ic.len = sb.st_size;
1113 ret = fread(ic.buf, ic.len, 1, f);
1114 if (ret == 0 && ferror(f)) {
1119 } else if (ferror(f) || (!feof(f) && ret != 1)) {
1120 log_err("fio: short read on reading log\n");
1129 * Each chunk will return Z_STREAM_END. We don't know how many
1130 * chunks are in the file, so we just keep looping and incrementing
1131 * the sequence number until we have consumed the whole compressed
1138 iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
1151 finish_chunk(&stream, stdout, &iter);
/* Stubs for builds without zlib support. */
1161 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1166 int iolog_file_inflate(const char *file)
1168 log_err("fio: log inflation not possible without zlib\n");
/*
 * Write all buffered samples of @log to its file: open in write or
 * append mode (@do_append), emit any compressed chunks first, then
 * each queued io_logs container. The clat histogram log uses the
 * histogram formatter; everything else uses flush_samples().
 */
1174 void flush_log(struct io_log *log, bool do_append)
1180 f = fopen(log->filename, "w");
1182 f = fopen(log->filename, "a");
1184 perror("fopen log");
1188 buf = set_file_buffer(f);
1190 inflate_gz_chunks(log, f);
1192 while (!flist_empty(&log->io_logs)) {
1193 struct io_logs *cur_log;
1195 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1196 flist_del_init(&cur_log->list);
1198 if (log->td && log == log->td->clat_hist_log)
1199 flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1200 log_sample_sz(log, cur_log))
1202 flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1208 clear_file_buffer(buf);
/*
 * Final flush of one log, serialized under the filename's file lock.
 * With @trylock, avoid blocking on the lock (exact bail-out path is
 * truncated in this extract). Networked backends ship the log to the
 * client instead of writing it locally.
 */
1211 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1213 if (td->flags & TD_F_COMPRESS_LOG)
1217 if (fio_trylock_file(log->filename))
1220 fio_lock_file(log->filename);
1222 if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1223 fio_send_iolog(td, log, log->filename);
1225 flush_log(log, !td->o.per_job_logs);
1227 fio_unlock_file(log->filename);
/* Sum the compressed sizes of all chunks queued on @log (locked walk). */
1232 size_t log_chunk_sizes(struct io_log *log)
1234 struct flist_head *entry;
1237 if (flist_empty(&log->chunk_list))
1241 pthread_mutex_lock(&log->chunk_lock);
1242 flist_for_each(entry, &log->chunk_list) {
1243 struct iolog_compress *c;
1245 c = flist_entry(entry, struct iolog_compress, list);
1248 pthread_mutex_unlock(&log->chunk_lock);
/*
 * Defer freeing @ptr until iolog_free_deferred(); when the deferred
 * table is full, the entry is dropped with a one-time warning.
 */
1254 static void iolog_put_deferred(struct io_log *log, void *ptr)
1259 pthread_mutex_lock(&log->deferred_free_lock);
1260 if (log->deferred < IOLOG_MAX_DEFER) {
1261 log->deferred_items[log->deferred] = ptr;
1263 } else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
1264 log_err("fio: had to drop log entry free\n");
1265 pthread_mutex_unlock(&log->deferred_free_lock);
/* Free everything queued via iolog_put_deferred(). */
1268 static void iolog_free_deferred(struct io_log *log)
1275 pthread_mutex_lock(&log->deferred_free_lock);
1277 for (i = 0; i < log->deferred; i++) {
1278 free(log->deferred_items[i]);
1279 log->deferred_items[i] = NULL;
1283 pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Deflate one batch of samples into GZ_CHUNK-sized chunks and splice
 * them onto the log's chunk list. Runs on the compress workqueue.
 * NOTE(review): this extract is missing lines (e.g. a guard before the
 * first per-chunk dprint, which otherwise reads c->len while c is
 * still NULL); verify control flow against upstream.
 */
1286 static int gz_work(struct iolog_flush_data *data)
1288 struct iolog_compress *c = NULL;
1289 struct flist_head list;
1295 INIT_FLIST_HEAD(&list);
1297 memset(&stream, 0, sizeof(stream));
1298 stream.zalloc = Z_NULL;
1299 stream.zfree = Z_NULL;
1300 stream.opaque = Z_NULL;
1302 ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1304 log_err("fio: failed to init gz stream\n");
/* Each flush batch gets its own sequence number */
1308 seq = ++data->log->chunk_seq;
1310 stream.next_in = (void *) data->samples;
1311 stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1313 dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
1314 (unsigned long) stream.avail_in, seq,
1315 data->log->filename);
/* Compress the input, one GZ_CHUNK output chunk at a time */
1318 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1319 (unsigned long) c->len);
1320 c = get_new_chunk(seq);
1321 stream.avail_out = GZ_CHUNK;
1322 stream.next_out = c->buf;
1323 ret = deflate(&stream, Z_NO_FLUSH);
1325 log_err("fio: deflate log (%d)\n", ret);
1330 c->len = GZ_CHUNK - stream.avail_out;
1331 flist_add_tail(&c->list, &list);
1333 } while (stream.avail_in);
/* Finish the stream into the tail chunk, allocating more as needed */
1335 stream.next_out = c->buf + c->len;
1336 stream.avail_out = GZ_CHUNK - c->len;
1338 ret = deflate(&stream, Z_FINISH);
1341 * Z_BUF_ERROR is special, it just means we need more
1342 * output space. We'll handle that below. Treat any other
1345 if (ret != Z_BUF_ERROR) {
1346 log_err("fio: deflate log (%d)\n", ret);
1347 flist_del(&c->list);
1354 c->len = GZ_CHUNK - stream.avail_out;
1356 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
1358 if (ret != Z_STREAM_END) {
1360 c = get_new_chunk(seq);
1361 stream.avail_out = GZ_CHUNK;
1362 stream.next_out = c->buf;
1363 ret = deflate(&stream, Z_FINISH);
1364 c->len = GZ_CHUNK - stream.avail_out;
1366 flist_add_tail(&c->list, &list);
1367 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1368 (unsigned long) c->len);
1369 } while (ret != Z_STREAM_END);
1372 dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1374 ret = deflateEnd(&stream);
1376 log_err("fio: deflateEnd %d\n", ret);
/* The source sample buffer is freed lazily via the deferred list */
1378 iolog_put_deferred(data->log, data->samples);
1380 if (!flist_empty(&list)) {
1381 pthread_mutex_lock(&data->log->chunk_lock);
1382 flist_splice_tail(&list, &data->log->chunk_list);
1383 pthread_mutex_unlock(&data->log->chunk_lock);
/* Error path: drop any chunks that were already built */
1392 while (!flist_empty(&list)) {
1393 c = flist_first_entry(list.next, struct iolog_compress, list);
1394 flist_del(&c->list);
1402 * Invoked from our compress helper thread, when logging would have exceeded
1403 * the specified memory limitation. Compresses the previously stored
1406 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1408 return gz_work(container_of(work, struct iolog_flush_data, work));
/* Pin the compress worker to the configured log_gz_cpumask, if set. */
1411 static int gz_init_worker(struct submit_worker *sw)
1413 struct thread_data *td = sw->wq->td;
1415 if (!fio_option_is_set(&td->o, log_gz_cpumask))
1418 if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1419 log_err("gz: failed to set CPU affinity\n");
/* Workqueue hooks for asynchronous log compression. */
1426 static struct workqueue_ops log_compress_wq_ops = {
1427 .fn = gz_work_async,
1428 .init_worker_fn = gz_init_worker,
/* Create the single-worker compression workqueue, if compression is on. */
1432 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1434 if (!(td->flags & TD_F_COMPRESS_LOG))
1437 workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
/* Tear down the compression workqueue. */
1441 void iolog_compress_exit(struct thread_data *td)
1443 if (!(td->flags & TD_F_COMPRESS_LOG))
1446 workqueue_exit(&td->log_compress_wq);
1450 * Queue work item to compress the existing log entries. We reset the
1451 * current log to a small size, and reference the existing log in the
1452 * data that we queue for compression. Once compression has been done,
1453 * this old log is freed. If called with finish == true, will not return
1454 * until the log compression has completed, and will flush all previous
1457 static int iolog_flush(struct io_log *log)
1459 struct iolog_flush_data *data;
1461 data = malloc(sizeof(*data));
/* Synchronous path: compress every queued container in turn */
1468 while (!flist_empty(&log->io_logs)) {
1469 struct io_logs *cur_log;
1471 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1472 flist_del_init(&cur_log->list);
1474 data->samples = cur_log->log;
1475 data->nr_samples = cur_log->nr_samples;
/*
 * Asynchronous variant: hand the current container's samples to the
 * compress workqueue and detach them from cur_log.
 */
1486 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1488 struct iolog_flush_data *data;
1490 data = smalloc(sizeof(*data));
1496 data->samples = cur_log->log;
1497 data->nr_samples = cur_log->nr_samples;
1500 cur_log->nr_samples = cur_log->max_samples = 0;
1501 cur_log->log = NULL;
1503 workqueue_enqueue(&log->td->log_compress_wq, &data->work);
/* Convenient point to reap previously deferred frees */
1505 iolog_free_deferred(log);
/* Stubs for builds without log compression support. */
1511 static int iolog_flush(struct io_log *log)
1516 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1521 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1526 void iolog_compress_exit(struct thread_data *td)
/* Return the most recent io_logs container (tail of the list); the
 * empty-list branch is truncated in this extract. */
1532 struct io_logs *iolog_cur_log(struct io_log *log)
1534 if (flist_empty(&log->io_logs))
1537 return flist_last_entry(&log->io_logs, struct io_logs, list);
/* Total sample count across every queued io_logs container. */
1540 uint64_t iolog_nr_samples(struct io_log *iolog)
1542 struct flist_head *entry;
1545 flist_for_each(entry, &iolog->io_logs) {
1546 struct io_logs *cur_log;
1548 cur_log = flist_entry(entry, struct io_logs, list);
1549 ret += cur_log->nr_samples;
/* Flush one log via finish_log(); @try selects trylock behaviour. */
1555 static int __write_log(struct thread_data *td, struct io_log *log, int try)
1558 return finish_log(td, log, try);
/* Per-log writers: flush via __write_log() and clear the td pointer. */
1563 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1567 if (per_unit_log(td->iops_log) != unit_log)
1570 ret = __write_log(td, td->iops_log, try);
1572 td->iops_log = NULL;
1577 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1584 ret = __write_log(td, td->slat_log, try);
1586 td->slat_log = NULL;
1591 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1598 ret = __write_log(td, td->clat_log, try);
1600 td->clat_log = NULL;
1605 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1612 ret = __write_log(td, td->clat_hist_log, try);
1614 td->clat_hist_log = NULL;
1619 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1626 ret = __write_log(td, td->lat_log, try);
1633 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1637 if (per_unit_log(td->bw_log) != unit_log)
1640 ret = __write_log(td, td->bw_log, try);
/* Bit masks identifying each log type for td_writeout_logs() */
1653 CLAT_HIST_LOG_MASK = 32,
1660 int (*fn)(struct thread_data *, int, bool);
/* Dispatch table pairing each mask with its writer function */
1663 static struct log_type log_types[] = {
1665 .mask = BW_LOG_MASK,
1666 .fn = write_bandw_log,
1669 .mask = LAT_LOG_MASK,
1670 .fn = write_lat_log,
1673 .mask = SLAT_LOG_MASK,
1674 .fn = write_slat_log,
1677 .mask = CLAT_LOG_MASK,
1678 .fn = write_clat_log,
1681 .mask = IOPS_LOG_MASK,
1682 .fn = write_iops_log,
1685 .mask = CLAT_HIST_LOG_MASK,
1686 .fn = write_clat_hist_log,
/*
 * Flush all of a thread's logs. Sweeps the log_types table repeatedly,
 * marking each log done in log_mask, until every log has been written.
 */
1690 void td_writeout_logs(struct thread_data *td, bool unit_logs)
1692 unsigned int log_mask = 0;
1693 unsigned int log_left = ALL_LOG_NR;
1696 old_state = td_bump_runstate(td, TD_FINISHING);
1698 finalize_logs(td, unit_logs);
1701 int prev_log_left = log_left;
1703 for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1704 struct log_type *lt = &log_types[i];
1707 if (!(log_mask & lt->mask)) {
/* try-lock (log_left != 1) while other logs are still pending */
1708 ret = lt->fn(td, log_left != 1, unit_logs);
1711 log_mask |= lt->mask;
/* No progress this sweep — back off before retrying (body truncated) */
1716 if (prev_log_left == log_left)
1720 td_restore_runstate(td, old_state);
/* Write out the logs of every thread. */
1723 void fio_writeout_logs(bool unit_logs)
1725 struct thread_data *td;
1729 td_writeout_logs(td, unit_logs);