2 * Code related to writing an iolog of what a thread is doing, and to
3 * later read that back and replay
23 #include <netinet/in.h>
24 #include <netinet/tcp.h>
25 #include <arpa/inet.h>
27 #include <sys/socket.h>
/* Forward declaration: flush queued samples of a log (compressed-log path below). */
30 static int iolog_flush(struct io_log *log);

/* Banner written as the first line of a version-2 iolog file; also used to detect it on read. */
32 static const char iolog_ver2[] = "fio version 2 iolog";
/*
 * Append a parsed replay entry to the thread's io_log_list and account
 * its length toward the total amount of I/O to replay.
 */
34 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
36 	flist_add_tail(&ipo->list, &td->io_log_list);
37 	td->total_io_size += ipo->len;
/*
 * Record one io_u in the write iolog as "<file> <ddir> <offset> <buflen>".
 * No-op unless write_iolog_file is configured.
 */
40 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
42 	if (!td->o.write_iolog_file)
45 	fprintf(td->iolog_f, "%s %s %llu %llu\n", io_u->file->file_name,
46 					io_ddir_name(io_u->ddir),
47 					io_u->offset, io_u->buflen);
/*
 * Record a file lifecycle event ("add"/"open"/"close") in the write iolog.
 * Skipped when no write_iolog_file is set; a guard (elided in this fragment)
 * also skips the pre-open/close that happens before the job starts.
 */
50 void log_file(struct thread_data *td, struct fio_file *f,
51 	      enum file_log_act what)
53 	const char *act[] = { "add", "open", "close" };
57 	if (!td->o.write_iolog_file)
62 	 * this happens on the pre-open/close done before the job starts
67 	fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
/*
 * Sleep for the delay recorded in the iolog before issuing the next I/O.
 * td->time_offset carries over any oversleep from the previous delay so
 * that accumulated drift is subtracted from subsequent delays.
 * NOTE(review): fragment — the 500000us clamp branch and loop body are
 * partially elided here.
 */
70 static void iolog_delay(struct thread_data *td, unsigned long delay)
72 	uint64_t usec = utime_since_now(&td->last_issue);
73 	unsigned long orig_delay = delay;
77 	if (delay < td->time_offset) {
82 	delay -= td->time_offset;
88 	fio_gettime(&ts, NULL);
89 	while (delay && !td->terminate) {
91 		if (this_delay > 500000)
94 		usec_sleep(td, this_delay);
98 	usec = utime_since_now(&ts);
99 	if (usec > orig_delay)
100 		td->time_offset = usec - orig_delay;
/*
 * Handle a "special" (non-data, DDIR_INVAL) replay entry: a file
 * open/close/unlink action recorded in the iolog. Returns early for
 * normal data directions. With replay_redirect, a re-open of an
 * already-open file is ignored.
 */
105 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
113 	if (ipo->ddir != DDIR_INVAL)
116 	f = td->files[ipo->fileno];
118 	switch (ipo->file_action) {
119 	case FIO_LOG_OPEN_FILE:
120 		if (td->o.replay_redirect && fio_file_open(f)) {
121 			dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
125 		ret = td_io_open_file(td, f);
128 		td_verror(td, ret, "iolog open file");
130 	case FIO_LOG_CLOSE_FILE:
131 		td_io_close_file(td, f);
133 	case FIO_LOG_UNLINK_FILE:
134 		td_io_unlink_file(td, f);
137 		log_err("fio: bad file action %d\n", ipo->file_action);
144 static bool read_iolog2(struct thread_data *td);

/*
 * Pop the next replay entry off io_log_list and populate io_u with it.
 * In chunked mode, refills the list via read_iolog2() when the checkmark
 * is reached. File-action entries are handled by ipo_special(); wait
 * entries sleep instead of issuing I/O.
 * NOTE(review): fragment — return paths and DDIR_WAIT end-of-loop logic
 * are partially elided.
 */
146 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
148 	struct io_piece *ipo;
149 	unsigned long elapsed;
151 	while (!flist_empty(&td->io_log_list)) {
153 		if (td->o.read_iolog_chunked) {
154 			if (td->io_log_checkmark == td->io_log_current) {
155 				if (!read_iolog2(td))
158 			td->io_log_current--;
160 		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
161 		flist_del(&ipo->list);
162 		remove_trim_entry(td, ipo);
164 		ret = ipo_special(td, ipo);
168 		} else if (ret > 0) {
173 		io_u->ddir = ipo->ddir;
174 		if (ipo->ddir != DDIR_WAIT) {
175 			io_u->offset = ipo->offset;
176 			io_u->buflen = ipo->len;
177 			io_u->file = td->files[ipo->fileno];
178 			get_file(io_u->file);
179 			dprint(FD_IO, "iolog: get %llu/%llu/%s\n", io_u->offset,
180 						io_u->buflen, io_u->file->file_name);
182 				iolog_delay(td, ipo->delay);
184 			elapsed = mtime_since_genesis();
185 			if (ipo->delay > elapsed)
186 				usec_sleep(td, (ipo->delay - elapsed) * 1000);
191 	if (io_u->ddir != DDIR_WAIT)
/*
 * Free every io_piece still held for verification: first drain the
 * rb-tree (sorted history), then the flat history list. Each entry is
 * also removed from the trim list before being freed (free elided in
 * this fragment).
 */
199 void prune_io_piece_log(struct thread_data *td)
201 	struct io_piece *ipo;
202 	struct fio_rb_node *n;
204 	while ((n = rb_first(&td->io_hist_tree)) != NULL) {
205 		ipo = rb_entry(n, struct io_piece, rb_node);
206 		rb_erase(n, &td->io_hist_tree);
207 		remove_trim_entry(td, ipo);
212 	while (!flist_empty(&td->io_hist_list)) {
213 		ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
214 		flist_del(&ipo->list);
215 		remove_trim_entry(td, ipo);
222  * log a successful write, so we can unwind the log for verify
/*
 * Store a completed write in the verification history. When a random
 * map exists, duplicate blocks cannot occur, so a simple FIFO list
 * suffices; otherwise entries are inserted into an rb-tree sorted by
 * (file, offset) so an overlapping older entry can be found and dropped.
 */
224 void log_io_piece(struct thread_data *td, struct io_u *io_u)
226 	struct fio_rb_node **p, *parent;
227 	struct io_piece *ipo, *__ipo;
229 	ipo = calloc(1, sizeof(struct io_piece));
231 	ipo->file = io_u->file;
232 	ipo->offset = io_u->offset;
233 	ipo->len = io_u->buflen;
234 	ipo->numberio = io_u->numberio;
235 	ipo->flags = IP_F_IN_FLIGHT;
239 	if (io_u_should_trim(td, io_u)) {
240 		flist_add_tail(&ipo->trim_list, &td->trim_list);
245 	 * Only sort writes if we don't have a random map in which case we need
246 	 * to check for duplicate blocks and drop the old one, which we rely on
247 	 * the rb insert/lookup for handling.
249 	if (file_randommap(td, ipo->file)) {
250 		INIT_FLIST_HEAD(&ipo->list);
251 		flist_add_tail(&ipo->list, &td->io_hist_list);
252 		ipo->flags |= IP_F_ONLIST;
257 	RB_CLEAR_NODE(&ipo->rb_node);
260 	 * Sort the entry into the verification list
263 	p = &td->io_hist_tree.rb_node;
269 		__ipo = rb_entry(parent, struct io_piece, rb_node);
270 		if (ipo->file < __ipo->file)
272 		else if (ipo->file > __ipo->file)
274 		else if (ipo->offset < __ipo->offset) {
276 			overlap = ipo->offset + ipo->len > __ipo->offset;
278 		else if (ipo->offset > __ipo->offset) {
280 			overlap = __ipo->offset + __ipo->len > ipo->offset;
/* Overlap found: evict the older entry from tree and trim list; it is
 * only freed if no longer in flight. */
286 			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
287 				__ipo->offset, __ipo->len,
288 				ipo->offset, ipo->len);
290 			rb_erase(parent, &td->io_hist_tree);
291 			remove_trim_entry(td, __ipo);
292 			if (!(__ipo->flags & IP_F_IN_FLIGHT))
298 	rb_link_node(&ipo->rb_node, parent, p);
299 	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
300 	ipo->flags |= IP_F_ONRB;
/*
 * Undo log_io_piece() for a failed I/O: mark the block-info state as a
 * trim/write failure (when block info tracking is on) and remove the
 * io_piece from whichever structure holds it (rb-tree or list).
 */
304 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
306 	struct io_piece *ipo = io_u->ipo;
308 	if (td->ts.nr_block_infos) {
309 		uint32_t *info = io_u_block_info(td, io_u);
310 		if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
311 			if (io_u->ddir == DDIR_TRIM)
312 				*info = BLOCK_INFO_SET_STATE(*info,
313 				BLOCK_STATE_TRIM_FAILURE);
314 			else if (io_u->ddir == DDIR_WRITE)
315 				*info = BLOCK_INFO_SET_STATE(*info,
316 				BLOCK_STATE_WRITE_FAILURE);
323 	if (ipo->flags & IP_F_ONRB)
324 		rb_erase(&ipo->rb_node, &td->io_hist_tree);
325 	else if (ipo->flags & IP_F_ONLIST)
326 		flist_del(&ipo->list);
/*
 * Shrink the logged length of an io_u to the bytes actually transferred
 * (xfer_buflen minus residual), e.g. after a short completion.
 */
333 void trim_io_piece(const struct io_u *io_u)
335 	struct io_piece *ipo = io_u->ipo;
340 	ipo->len = io_u->xfer_buflen - io_u->resid;
/*
 * Tear down the write iolog: the stdio buffer is released and cleared.
 * NOTE(review): fragment — flush/fclose of td->iolog_f elided here.
 */
343 void write_iolog_close(struct thread_data *td)
349 	td->iolog_buf = NULL;
/*
 * For chunked iolog reading: estimate how many entries to fetch so that
 * roughly one second's worth of I/O is buffered, based on the consumption
 * rate since the last high-water mark. Updates the highmark/checkmark
 * bookkeeping and timestamp.
 */
352 static int64_t iolog_items_to_fetch(struct thread_data *td)
357 	int64_t items_to_fetch;
359 	if (!td->io_log_highmark)
363 	fio_gettime(&now, NULL);
364 	elapsed = ntime_since(&td->io_log_highmark_time, &now);
/* consumed-per-ns scaled to one second's worth of entries */
366 		for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
367 	items_to_fetch = for_1s - td->io_log_current;
368 	if (items_to_fetch < 0)
373 	td->io_log_highmark = td->io_log_current + items_to_fetch;
374 	td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
375 	fio_gettime(&td->io_log_highmark_time, NULL);
377 	return items_to_fetch;
381  * Read version 2 iolog data. It is enhanced to include per-file logging,
/*
 * Parse a version-2 iolog line by line ("<file> <action> [offset len]"),
 * translating each line into an io_piece queued for replay. Handles
 * file actions (add/open/close), data directions (read/write/trim,
 * sync/datasync), waits, replay redirection/scaling/alignment, and the
 * chunked-fetch bookkeeping. Finally derives td_ddir from what was seen.
 * NOTE(review): fragment — several error/cleanup paths elided.
 */
384 static bool read_iolog2(struct thread_data *td)
386 	unsigned long long offset;
388 	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
389 	char *rfname, *fname, *act;
392 	bool realloc = false;
393 	int64_t items_to_fetch = 0;
395 	if (td->o.read_iolog_chunked) {
396 		items_to_fetch = iolog_items_to_fetch(td);
402 	 * Read in the read iolog and store it, reuse the infrastructure
403 	 * for doing verifications.
406 	rfname = fname = malloc(256+16);
407 	act = malloc(256+16);
409 	reads = writes = waits = 0;
410 	while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
411 		struct io_piece *ipo;
414 		r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
/* replay_redirect sends all I/O to a single substitute file */
417 		if (td->o.replay_redirect)
418 			fname = td->o.replay_redirect;
/* 4-field line: a data direction or wait */
424 			if (!strcmp(act, "wait"))
426 			else if (!strcmp(act, "read"))
428 			else if (!strcmp(act, "write"))
430 			else if (!strcmp(act, "sync"))
432 			else if (!strcmp(act, "datasync"))
434 			else if (!strcmp(act, "trim"))
437 				log_err("fio: bad iolog file action: %s\n",
441 			fileno = get_fileno(td, fname);
/* 2-field line: a file action */
444 			if (!strcmp(act, "add")) {
445 				if (td->o.replay_redirect &&
446 				    get_fileno(td, fname) != -1) {
447 					dprint(FD_FILE, "iolog: ignoring"
448 						" re-add of file %s\n", fname);
450 					fileno = add_file(td, fname, td->subjob_number, 1);
451 					file_action = FIO_LOG_ADD_FILE;
454 			} else if (!strcmp(act, "open")) {
455 				fileno = get_fileno(td, fname);
456 				file_action = FIO_LOG_OPEN_FILE;
457 			} else if (!strcmp(act, "close")) {
458 				fileno = get_fileno(td, fname);
459 				file_action = FIO_LOG_CLOSE_FILE;
461 				log_err("fio: bad iolog file action: %s\n",
466 			log_err("bad iolog2: %s\n", p);
472 		else if (rw == DDIR_WRITE) {
474 			 * Don't add a write for ro mode
479 		} else if (rw == DDIR_WAIT) {
483 		} else if (rw == DDIR_INVAL) {
484 		} else if (!ddir_sync(rw)) {
485 			log_err("bad ddir: %d\n", rw);
/*
 * Line accepted: build the io_piece and queue it.
 */
492 		ipo = calloc(1, sizeof(*ipo));
495 		if (rw == DDIR_WAIT) {
498 			if (td->o.replay_scale)
499 				ipo->offset = offset / td->o.replay_scale;
501 				ipo->offset = offset;
502 			ipo_bytes_align(td->o.replay_align, ipo);
/* track largest block seen so buffers can be (re)sized */
505 			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw]) {
507 				td->o.max_bs[rw] = bytes;
509 			ipo->fileno = fileno;
510 			ipo->file_action = file_action;
514 		queue_io_piece(td, ipo);
516 		if (td->o.read_iolog_chunked) {
517 			td->io_log_current++;
519 			if (items_to_fetch == 0)
528 	if (td->o.read_iolog_chunked) {
529 		td->io_log_highmark = td->io_log_current;
530 		td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
531 		fio_gettime(&td->io_log_highmark_time, NULL);
534 	if (writes && read_only) {
535 		log_err("fio: <%s> skips replay of %d writes due to"
536 			" read-only\n", td->o.name, writes);
540 	if (td->o.read_iolog_chunked) {
541 		if (td->io_log_current == 0) {
544 		td->o.td_ddir = TD_DDIR_RW;
545 		if (realloc && td->orig_buffer)
549 			init_io_u_buffers(td);
/* derive the data direction from what the log contained */
554 	if (!reads && !writes && !waits)
556 	else if (reads && !writes)
557 		td->o.td_ddir = TD_DDIR_READ;
558 	else if (!reads && writes)
559 		td->o.td_ddir = TD_DDIR_WRITE;
561 		td->o.td_ddir = TD_DDIR_RW;
/*
 * Return true if the path exists and is a Unix-domain socket,
 * so the iolog can be streamed from it instead of a regular file.
 */
566 static bool is_socket(const char *path)
569 	int r = stat(path, &buf);
573 	return S_ISSOCK(buf.st_mode);
/*
 * Connect to a Unix-domain stream socket at 'path' and return the fd.
 * A path longer than sun_path is rejected with an error message.
 * NOTE(review): fragment — error-return and close-on-failure paths elided.
 */
576 static int open_socket(const char *path)
578 	int fd = socket(AF_UNIX, SOCK_STREAM, 0);
579 	struct sockaddr_un addr;
582 	addr.sun_family = AF_UNIX;
583 	if (snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path) >=
584 	    sizeof(addr.sun_path))
585 		log_err("%s: path name %s is too long for a Unix socket\n",
587 	if (connect(fd, (const struct sockaddr *)&addr, strlen(path) + sizeof(addr.sun_family)) == 0)
595  * open iolog, check version, and call appropriate parser
/*
 * Open the read iolog (regular file or Unix socket), verify the
 * version-2 banner on the first line, and hand off to read_iolog2().
 * Version-1 logs are rejected.
 */
597 static bool init_iolog_read(struct thread_data *td)
599 	char buffer[256], *p;
602 	char* fname = get_name_by_idx(td->o.read_iolog_file, td->subjob_number);
603 	dprint(FD_IO, "iolog: name=%s\n", fname);
605 	if (is_socket(fname)) {
606 		int fd = open_socket(fname);
611 		f = fopen(fname, "r");
614 		perror("fopen read iolog");
618 	p = fgets(buffer, sizeof(buffer), f);
620 		td_verror(td, errno, "iolog read");
621 		log_err("fio: unable to read iolog\n");
625 	td->io_log_rfile = f;
627 	 * version 2 of the iolog stores a specific string as the
628 	 * first line, check for that
630 	if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2))) {
631 		free_release_files(td);
632 		ret = read_iolog2(td);
635 		log_err("fio: iolog version 1 is no longer supported\n");
643  * Set up a log for storing io patterns.
/*
 * Open the write iolog in append mode, install a full stdio buffer,
 * write the version-2 banner, and log an "add" entry for every file
 * the job already knows about.
 */
645 static bool init_iolog_write(struct thread_data *td)
651 	f = fopen(td->o.write_iolog_file, "a");
653 		perror("fopen write iolog");
658 	 * That's it for writing, setup a log buffer and we're done.
661 	td->iolog_buf = malloc(8192);
662 	setvbuf(f, td->iolog_buf, _IOFBF, 8192);
665 	 * write our version line
667 	if (fprintf(f, "%s\n", iolog_ver2) < 0) {
668 		perror("iolog init\n");
673 	 * add all known files
675 	for_each_file(td, ff, i)
676 		log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * Entry point for iolog setup: loads a blktrace or version-2 iolog for
 * replay if read_iolog_file is set, otherwise prepares the write iolog.
 * Reports EINVAL on failure.
 */
681 bool init_iolog(struct thread_data *td)
685 	if (td->o.read_iolog_file) {
689 		 * Check if it's a blktrace file and load that if possible.
690 		 * Otherwise assume it's a normal log file and load that.
692 		if (is_blktrace(td->o.read_iolog_file, &need_swap))
693 			ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
695 			ret = init_iolog_read(td);
696 	} else if (td->o.write_iolog_file)
697 		ret = init_iolog_write(td);
702 		td_verror(td, EINVAL, "failed initializing iolog");
/*
 * Allocate and initialize an io_log from the supplied parameters:
 * copies log type/offset/compression/averaging settings, seeds per-ddir
 * histogram windows with zeroed io_u_plat entries, pre-allocates the
 * pending-sample buffer when not in offload submit mode, and sets up
 * compression state (chunk list, locks, TD_F_COMPRESS_LOG flag).
 */
707 void setup_log(struct io_log **log, struct log_params *p,
708 	       const char *filename)
712 	struct io_u_plat_entry *entry;
713 	struct flist_head *list;
715 	l = scalloc(1, sizeof(*l));
716 	INIT_FLIST_HEAD(&l->io_logs);
717 	l->log_type = p->log_type;
718 	l->log_offset = p->log_offset;
719 	l->log_gz = p->log_gz;
720 	l->log_gz_store = p->log_gz_store;
721 	l->avg_msec = p->avg_msec;
722 	l->hist_msec = p->hist_msec;
723 	l->hist_coarseness = p->hist_coarseness;
724 	l->filename = strdup(filename);
727 	/* Initialize histogram lists for each r/w direction,
728 	 * with initial io_u_plat of all zeros:
730 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
731 		list = &l->hist_window[i].list;
732 		INIT_FLIST_HEAD(list);
733 		entry = calloc(1, sizeof(struct io_u_plat_entry));
734 		flist_add(&entry->list, list);
737 	if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
740 		__p = calloc(1, sizeof(*l->pending));
741 		__p->max_samples = DEF_LOG_ENTRIES;
742 		__p->log = calloc(__p->max_samples, log_entry_sz(l));
/* offset samples are flagged in the ddir mask */
747 		l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
749 	INIT_FLIST_HEAD(&l->chunk_list);
751 	if (l->log_gz && !p->td)
753 	else if (l->log_gz || l->log_gz_store) {
754 		mutex_init_pshared(&l->chunk_lock);
755 		mutex_init_pshared(&l->deferred_free_lock);
756 		p->td->flags |= TD_F_COMPRESS_LOG;
762 #ifdef CONFIG_SETVBUF
/* Give the log FILE a large (1MB) fully-buffered stdio buffer; the
 * returned buffer must later be released via clear_file_buffer(). */
763 static void *set_file_buffer(FILE *f)
765 	size_t size = 1048576;
769 	setvbuf(f, buf, _IOFBF, size);
773 static void clear_file_buffer(void *buf)
/* No-op fallbacks when setvbuf support is not configured. */
778 static void *set_file_buffer(FILE *f)
783 static void clear_file_buffer(void *buf)
/*
 * Release an io_log: drain and free each io_logs segment, the pending
 * buffer, and (in the elided tail) the filename and the log itself.
 */
788 void free_log(struct io_log *log)
790 	while (!flist_empty(&log->io_logs)) {
791 		struct io_logs *cur_log;
793 		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
794 		flist_del_init(&cur_log->list);
800 	free(log->pending->log);
/*
 * Sum 'stride' adjacent histogram buckets starting at index j. When a
 * previous snapshot is given, return the delta since that snapshot
 * instead of the absolute counts.
 */
810 uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
811 		  uint64_t *io_u_plat_last)
816 	if (io_u_plat_last) {
817 		for (k = sum = 0; k < stride; k++)
818 			sum += io_u_plat[j + k] - io_u_plat_last[j + k];
820 		for (k = sum = 0; k < stride; k++)
821 			sum += io_u_plat[j + k];
/*
 * Write latency-histogram samples to 'f' as CSV: time, ddir, bs, then
 * one delta-summed bucket per 2^hist_coarseness-wide stride. Each
 * sample's counts are emitted relative to the previous snapshot, which
 * is unlinked (and, in elided code, freed) afterwards.
 */
827 static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
828 			       uint64_t sample_size)
832 	uint64_t i, j, nr_samples;
833 	struct io_u_plat_entry *entry, *entry_before;
835 	uint64_t *io_u_plat_before;
837 	int stride = 1 << hist_coarseness;
/* the first sample's ddir field carries the offset-logging flag */
842 	s = __get_sample(samples, 0, 0);
843 	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
845 	nr_samples = sample_size / __log_entry_sz(log_offset);
847 	for (i = 0; i < nr_samples; i++) {
848 		s = __get_sample(samples, log_offset, i);
850 		entry = s->data.plat_entry;
851 		io_u_plat = entry->io_u_plat;
853 		entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
854 		io_u_plat_before = entry_before->io_u_plat;
856 		fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
857 						io_sample_ddir(s), (unsigned long long) s->bs);
858 		for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
859 			fprintf(f, "%llu, ", (unsigned long long)
860 				hist_sum(j, stride, io_u_plat, io_u_plat_before));
/* last stride ends the line (no trailing comma) */
862 		fprintf(f, "%llu\n", (unsigned long long)
863 			hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
866 		flist_del(&entry_before->list);
/*
 * Write plain log samples to 'f' as CSV: time, value, ddir, bs — plus
 * the offset column when the first sample carries the offset flag.
 */
871 void flush_samples(FILE *f, void *samples, uint64_t sample_size)
875 	uint64_t i, nr_samples;
880 	s = __get_sample(samples, 0, 0);
881 	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
883 	nr_samples = sample_size / __log_entry_sz(log_offset);
885 	for (i = 0; i < nr_samples; i++) {
886 		s = __get_sample(samples, log_offset, i);
889 			fprintf(f, "%lu, %" PRId64 ", %u, %llu\n",
890 					(unsigned long) s->time,
892 					io_sample_ddir(s), (unsigned long long) s->bs);
894 			struct io_sample_offset *so = (void *) s;
896 			fprintf(f, "%lu, %" PRId64 ", %u, %llu, %llu\n",
897 					(unsigned long) s->time,
899 					io_sample_ddir(s), (unsigned long long) s->bs,
900 					(unsigned long long) so->offset);
/* Work item handed to the compression workqueue; holds the samples to
 * deflate (fields partially elided in this fragment). */
907 struct iolog_flush_data {
908 	struct workqueue_work work;
/* Compressed chunks are produced in fixed 128KB units. */
915 #define GZ_CHUNK	131072
/* Allocate an empty compressed chunk tagged with sequence number 'seq'. */
917 static struct iolog_compress *get_new_chunk(unsigned int seq)
919 	struct iolog_compress *c;
921 	c = malloc(sizeof(*c));
922 	INIT_FLIST_HEAD(&c->list);
923 	c->buf = malloc(GZ_CHUNK);
/* Release a chunk's buffer and the chunk itself (bodies elided). */
929 static void free_chunk(struct iolog_compress *ic)
/*
 * Prepare a zlib inflate stream. With gz_hdr set, windowBits gets +32
 * so zlib auto-detects gzip vs raw zlib framing (for gz-stored logs).
 */
935 static int z_stream_init(z_stream *stream, int gz_hdr)
939 	memset(stream, 0, sizeof(*stream));
940 	stream->zalloc = Z_NULL;
941 	stream->zfree = Z_NULL;
942 	stream->opaque = Z_NULL;
943 	stream->next_in = Z_NULL;
946 	 * zlib magic - add 32 for auto-detection of gz header or not,
947 	 * if we decide to store files in a gzip friendly format.
952 	if (inflateInit2(stream, wbits) != Z_OK)
/* Iteration state carried across inflate_chunk() calls: growable output
 * buffer plus the current chunk sequence (fields elided here). */
958 struct inflate_chunk_iter {
/*
 * End the current inflate sequence: tear down the zlib stream, flush the
 * decoded samples to 'f', and reset the iterator's buffer accounting.
 */
967 static void finish_chunk(z_stream *stream, FILE *f,
968 			 struct inflate_chunk_iter *iter)
972 	ret = inflateEnd(stream);
974 		log_err("fio: failed to end log inflation seq %d (%d)\n",
977 	flush_samples(f, iter->buf, iter->buf_used);
980 	iter->buf_size = iter->buf_used = 0;
984  * Iterative chunk inflation. Handles cases where we cross into a new
985  * sequence, doing flush finish of previous chunk if needed.
/*
 * Inflate one compressed chunk into iter->buf, growing the buffer by
 * chunk_sz as needed. A sequence change finishes/flushes the previous
 * stream and re-inits zlib. Returns how many input bytes were consumed.
 */
987 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
988 			    z_stream *stream, struct inflate_chunk_iter *iter)
992 	dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
993 			(unsigned long) ic->len, ic->seq);
995 	if (ic->seq != iter->seq) {
997 			finish_chunk(stream, f, iter);
999 		z_stream_init(stream, gz_hdr);
1000 		iter->seq = ic->seq;
1003 	stream->avail_in = ic->len;
1004 	stream->next_in = ic->buf;
1006 	if (!iter->buf_size) {
1007 		iter->buf_size = iter->chunk_sz;
1008 		iter->buf = malloc(iter->buf_size);
1011 	while (stream->avail_in) {
1012 		size_t this_out = iter->buf_size - iter->buf_used;
1015 		stream->avail_out = this_out;
1016 		stream->next_out = iter->buf + iter->buf_used;
1018 		err = inflate(stream, Z_NO_FLUSH);
1020 			log_err("fio: failed inflating log: %d\n", err);
1025 		iter->buf_used += this_out - stream->avail_out;
/* output buffer exhausted: grow it and keep going */
1027 		if (!stream->avail_out) {
1028 			iter->buf_size += iter->chunk_sz;
1029 			iter->buf = realloc(iter->buf, iter->buf_size);
1033 		if (err == Z_STREAM_END)
/* bytes consumed = distance next_in advanced into the chunk */
1037 	ret = (void *) stream->next_in - ic->buf;
1039 	dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
1045  * Inflate stored compressed chunks, or write them directly to the log
1046  * file if so instructed.
/*
 * Drain the log's chunk list: with log_gz_store the raw compressed
 * bytes are written to 'f' verbatim, otherwise each chunk is inflated
 * and its samples flushed. Ends with a final finish_chunk() when
 * inflation was used.
 */
1048 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1050 	struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
1053 	while (!flist_empty(&log->chunk_list)) {
1054 		struct iolog_compress *ic;
1056 		ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
1057 		flist_del(&ic->list);
1059 		if (log->log_gz_store) {
1062 			dprint(FD_COMPRESS, "log write chunk size=%lu, "
1063 				"seq=%u\n", (unsigned long) ic->len, ic->seq);
1065 			ret = fwrite(ic->buf, ic->len, 1, f);
1066 			if (ret != 1 || ferror(f)) {
1068 				log_err("fio: error writing compressed log\n");
1071 			inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
1077 		finish_chunk(&stream, f, &iter);
1085  * Open compressed log file and decompress the stored chunks and
1086  * write them to stdout. The chunks are stored sequentially in the
1087  * file, so we iterate over them and do them one-by-one.
/*
 * CLI helper: read a whole gz-stored log file into memory and inflate
 * it chunk by chunk to stdout, bumping the sequence number per chunk
 * until the compressed data is consumed.
 */
1089 int iolog_file_inflate(const char *file)
1091 	struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
1092 	struct iolog_compress ic;
1100 	f = fopen(file, "r");
1106 	if (stat(file, &sb) < 0) {
1112 	ic.buf = buf = malloc(sb.st_size);
1113 	ic.len = sb.st_size;
1116 	ret = fread(ic.buf, ic.len, 1, f);
1117 	if (ret == 0 && ferror(f)) {
1122 	} else if (ferror(f) || (!feof(f) && ret != 1)) {
1123 		log_err("fio: short read on reading log\n");
1132 	 * Each chunk will return Z_STREAM_END. We don't know how many
1133 	 * chunks are in the file, so we just keep looping and incrementing
1134 	 * the sequence number until we have consumed the whole compressed
1141 		iret = inflate_chunk(&ic,  1, stdout, &stream, &iter);
1154 		finish_chunk(&stream, stdout, &iter);
/* Stubs used when fio is built without zlib support. */
1164 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1169 int iolog_file_inflate(const char *file)
1171 	log_err("fio: log inflation not possible without zlib\n");
/*
 * Write an io_log to its file ("w" or, with do_append, "a"): first any
 * compressed chunks, then every queued io_logs segment — using the
 * histogram formatter for the clat-hist log, plain samples otherwise.
 */
1177 void flush_log(struct io_log *log, bool do_append)
1183 		f = fopen(log->filename, "w");
1185 		f = fopen(log->filename, "a");
1187 		perror("fopen log");
1191 	buf = set_file_buffer(f);
1193 	inflate_gz_chunks(log, f);
1195 	while (!flist_empty(&log->io_logs)) {
1196 		struct io_logs *cur_log;
1198 		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1199 		flist_del_init(&cur_log->list);
1201 		if (log->td && log == log->td->clat_hist_log)
1202 			flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1203 						log_sample_sz(log, cur_log));
1205 			flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1211 	clear_file_buffer(buf);
/*
 * Finalize a log: compressed logs are flushed first, the file is locked
 * (trylock optionally, returning busy), then either shipped to the
 * client/backend via fio_send_iolog() or written locally via flush_log().
 */
1214 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1216 	if (td->flags & TD_F_COMPRESS_LOG)
1220 		if (fio_trylock_file(log->filename))
1223 		fio_lock_file(log->filename);
1225 	if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1226 		fio_send_iolog(td, log, log->filename);
1228 		flush_log(log, !td->o.per_job_logs);
1230 	fio_unlock_file(log->filename);
/*
 * Total the byte sizes of all compressed chunks queued on the log,
 * walking the chunk list under chunk_lock. Returns 0 when empty.
 */
1235 size_t log_chunk_sizes(struct io_log *log)
1237 	struct flist_head *entry;
1240 	if (flist_empty(&log->chunk_list))
1244 	pthread_mutex_lock(&log->chunk_lock);
1245 	flist_for_each(entry, &log->chunk_list) {
1246 		struct iolog_compress *c;
1248 		c = flist_entry(entry, struct iolog_compress, list);
1251 	pthread_mutex_unlock(&log->chunk_lock);
/*
 * Queue a buffer for deferred freeing (it may still be referenced by
 * in-flight compression). Drops the entry with a one-time warning when
 * the deferred slots are exhausted.
 */
1257 static void iolog_put_deferred(struct io_log *log, void *ptr)
1262 	pthread_mutex_lock(&log->deferred_free_lock);
1263 	if (log->deferred < IOLOG_MAX_DEFER) {
1264 		log->deferred_items[log->deferred] = ptr;
1266 	} else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
1267 		log_err("fio: had to drop log entry free\n");
1268 	pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Free all buffers previously queued via iolog_put_deferred(), under
 * the deferred_free_lock.
 */
1271 static void iolog_free_deferred(struct io_log *log)
1278 	pthread_mutex_lock(&log->deferred_free_lock);
1280 	for (i = 0; i < log->deferred; i++) {
1281 		free(log->deferred_items[i]);
1282 		log->deferred_items[i] = NULL;
1286 	pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Compress one batch of log samples with zlib deflate into a local list
 * of GZ_CHUNK-sized chunks, then splice the list onto the log's chunk
 * list under chunk_lock. The sample buffer is handed to the deferred
 * free machinery. On error (elided paths) the local chunk list is
 * drained and freed.
 */
1289 static int gz_work(struct iolog_flush_data *data)
1291 	struct iolog_compress *c = NULL;
1292 	struct flist_head list;
1298 	INIT_FLIST_HEAD(&list);
1300 	memset(&stream, 0, sizeof(stream));
1301 	stream.zalloc = Z_NULL;
1302 	stream.zfree = Z_NULL;
1303 	stream.opaque = Z_NULL;
1305 	ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1307 		log_err("fio: failed to init gz stream\n");
/* each flush batch gets its own sequence number */
1311 	seq = ++data->log->chunk_seq;
1313 	stream.next_in = (void *) data->samples;
1314 	stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1316 	dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
1317 				(unsigned long) stream.avail_in, seq,
1318 					data->log->filename);
1321 			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1322 				(unsigned long) c->len);
1323 		c = get_new_chunk(seq);
1324 		stream.avail_out = GZ_CHUNK;
1325 		stream.next_out = c->buf;
1326 		ret = deflate(&stream, Z_NO_FLUSH);
1328 			log_err("fio: deflate log (%d)\n", ret);
1333 		c->len = GZ_CHUNK - stream.avail_out;
1334 		flist_add_tail(&c->list, &list);
1336 	} while (stream.avail_in);
/* finish the stream into the tail chunk's remaining space */
1338 	stream.next_out = c->buf + c->len;
1339 	stream.avail_out = GZ_CHUNK - c->len;
1341 	ret = deflate(&stream, Z_FINISH);
1344 		 * Z_BUF_ERROR is special, it just means we need more
1345 		 * output space. We'll handle that below. Treat any other
1348 		if (ret != Z_BUF_ERROR) {
1349 			log_err("fio: deflate log (%d)\n", ret);
1350 			flist_del(&c->list);
1357 	c->len = GZ_CHUNK - stream.avail_out;
1359 	dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
/* keep finishing into fresh chunks until Z_STREAM_END */
1361 	if (ret != Z_STREAM_END) {
1363 			c = get_new_chunk(seq);
1364 			stream.avail_out = GZ_CHUNK;
1365 			stream.next_out = c->buf;
1366 			ret = deflate(&stream, Z_FINISH);
1367 			c->len = GZ_CHUNK - stream.avail_out;
1369 			flist_add_tail(&c->list, &list);
1370 			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1371 				(unsigned long) c->len);
1372 		} while (ret != Z_STREAM_END);
1375 	dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1377 	ret = deflateEnd(&stream);
1379 		log_err("fio: deflateEnd %d\n", ret);
/* the sample buffer may still be in use elsewhere; free it deferred */
1381 	iolog_put_deferred(data->log, data->samples);
1383 	if (!flist_empty(&list)) {
1384 		pthread_mutex_lock(&data->log->chunk_lock);
1385 		flist_splice_tail(&list, &data->log->chunk_list);
1386 		pthread_mutex_unlock(&data->log->chunk_lock);
/* error path: discard any chunks produced so far */
1395 	while (!flist_empty(&list)) {
1396 		c = flist_first_entry(list.next, struct iolog_compress, list);
1397 		flist_del(&c->list);
1405  * Invoked from our compress helper thread, when logging would have exceeded
1406  * the specified memory limitation. Compresses the previously stored
/* Workqueue trampoline: unwrap the work item and run gz_work(). */
1409 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1411 	return gz_work(container_of(work, struct iolog_flush_data, work));
/* Per-worker init: pin the compression thread to log_gz_cpumask if set. */
1414 static int gz_init_worker(struct submit_worker *sw)
1416 	struct thread_data *td = sw->wq->td;
1418 	if (!fio_option_is_set(&td->o, log_gz_cpumask))
1421 	if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1422 		log_err("gz: failed to set CPU affinity\n");
/* Operations table for the log compression workqueue. */
1429 static struct workqueue_ops log_compress_wq_ops = {
1430 	.fn		= gz_work_async,
1431 	.init_worker_fn	= gz_init_worker,
/* Start the single-worker compression workqueue when this thread's logs
 * are compressed; no-op otherwise. */
1435 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1437 	if (!(td->flags & TD_F_COMPRESS_LOG))
1440 	workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
/* Tear down the compression workqueue when it was started. */
1444 void iolog_compress_exit(struct thread_data *td)
1446 	if (!(td->flags & TD_F_COMPRESS_LOG))
1449 	workqueue_exit(&td->log_compress_wq);
1453  * Queue work item to compress the existing log entries. We reset the
1454  * current log to a small size, and reference the existing log in the
1455  * data that we queue for compression. Once compression has been done,
1456  * this old log is freed. If called with finish == true, will not return
1457  * until the log compression has completed, and will flush all previous
/*
 * Synchronously compress every queued io_logs segment of 'log' via
 * gz_work() (elided tail runs the work and frees 'data').
 */
1460 static int iolog_flush(struct io_log *log)
1462 	struct iolog_flush_data *data;
1464 	data = malloc(sizeof(*data));
1471 	while (!flist_empty(&log->io_logs)) {
1472 		struct io_logs *cur_log;
1474 		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1475 		flist_del_init(&cur_log->list);
1477 		data->samples = cur_log->log;
1478 		data->nr_samples = cur_log->nr_samples;
/*
 * Asynchronously compress one io_logs segment: hand its sample buffer
 * to a work item, reset the segment, enqueue on the compression
 * workqueue, then reap any deferred frees.
 */
1489 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1491 	struct iolog_flush_data *data;
1493 	data = smalloc(sizeof(*data));
1499 	data->samples = cur_log->log;
1500 	data->nr_samples = cur_log->nr_samples;
1503 	cur_log->nr_samples = cur_log->max_samples = 0;
1504 	cur_log->log = NULL;
1506 	workqueue_enqueue(&log->td->log_compress_wq, &data->work);
1508 	iolog_free_deferred(log);
/* No-op stubs used when log compression support is not built in. */
1514 static int iolog_flush(struct io_log *log)
1519 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1524 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1529 void iolog_compress_exit(struct thread_data *td)
/* Return the newest (tail) io_logs segment, or NULL-equivalent when the
 * list is empty (early-out elided in this fragment). */
1535 struct io_logs *iolog_cur_log(struct io_log *log)
1537 	if (flist_empty(&log->io_logs))
1540 	return flist_last_entry(&log->io_logs, struct io_logs, list);
/* Sum the sample counts across every io_logs segment of the log. */
1543 uint64_t iolog_nr_samples(struct io_log *iolog)
1545 	struct flist_head *entry;
1548 	flist_for_each(entry, &iolog->io_logs) {
1549 		struct io_logs *cur_log;
1551 		cur_log = flist_entry(entry, struct io_logs, list);
1552 		ret += cur_log->nr_samples;
/* Thin wrapper: finalize 'log' if present (NULL-check elided here). */
1558 static int __write_log(struct thread_data *td, struct io_log *log, int try)
1561 		return finish_log(td, log, try);
/*
 * Per-log writers: each writes its log (respecting per-unit vs normal
 * logging where applicable) and clears the td pointer on success so the
 * log is not written twice.
 */
1566 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1570 	if (per_unit_log(td->iops_log) != unit_log)
1573 	ret = __write_log(td, td->iops_log, try);
1575 		td->iops_log = NULL;
1580 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1587 	ret = __write_log(td, td->slat_log, try);
1589 		td->slat_log = NULL;
1594 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1601 	ret = __write_log(td, td->clat_log, try);
1603 		td->clat_log = NULL;
1608 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1615 	ret = __write_log(td, td->clat_hist_log, try);
1617 		td->clat_hist_log = NULL;
1622 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1629 	ret = __write_log(td, td->lat_log, try);
1636 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1640 	if (per_unit_log(td->bw_log) != unit_log)
1643 	ret = __write_log(td, td->bw_log, try);
/* Bitmask/dispatch table pairing each log kind with its writer. */
1656 	CLAT_HIST_LOG_MASK = 32,
1663 	int (*fn)(struct thread_data *, int, bool);
1666 static struct log_type log_types[] = {
1668 		.mask	= BW_LOG_MASK,
1669 		.fn	= write_bandw_log,
1672 		.mask	= LAT_LOG_MASK,
1673 		.fn	= write_lat_log,
1676 		.mask	= SLAT_LOG_MASK,
1677 		.fn	= write_slat_log,
1680 		.mask	= CLAT_LOG_MASK,
1681 		.fn	= write_clat_log,
1684 		.mask	= IOPS_LOG_MASK,
1685 		.fn	= write_iops_log,
1688 		.mask	= CLAT_HIST_LOG_MASK,
1689 		.fn	= write_clat_hist_log,
/*
 * Write out every log type for one thread. Runs in TD_FINISHING state,
 * retrying each not-yet-written log (trylock while more than one
 * remains) until no progress is made in a pass, then restores the
 * previous runstate.
 */
1693 void td_writeout_logs(struct thread_data *td, bool unit_logs)
1695 	unsigned int log_mask = 0;
1696 	unsigned int log_left = ALL_LOG_NR;
1699 	old_state = td_bump_runstate(td, TD_FINISHING);
1701 	finalize_logs(td, unit_logs);
1704 		int prev_log_left = log_left;
1706 		for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1707 			struct log_type *lt = &log_types[i];
1710 			if (!(log_mask & lt->mask)) {
1711 				ret = lt->fn(td, log_left != 1, unit_logs);
1714 					log_mask |= lt->mask;
/* stop when a full pass made no progress (all busy) */
1719 		if (prev_log_left == log_left)
1723 	td_restore_runstate(td, old_state);
/* Write out the logs of every thread (iteration elided in fragment). */
1726 void fio_writeout_logs(bool unit_logs)
1728 	struct thread_data *td;
1732 		td_writeout_logs(td, unit_logs);