2 * Code related to writing an iolog of what a thread is doing, and to
3 * later read that back and replay
23 #include <netinet/in.h>
24 #include <netinet/tcp.h>
25 #include <arpa/inet.h>
27 #include <sys/socket.h>
30 static int iolog_flush(struct io_log *log);
32 static const char iolog_ver2[] = "fio version 2 iolog";
/*
 * Append a replay entry to the thread's iolog queue and account its
 * length toward the job's total expected I/O size.
 * (Listing is gap-sampled; braces elided.)
 */
34 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
36 	flist_add_tail(&ipo->list, &td->io_log_list);
37 	td->total_io_size += ipo->len;
/*
 * Record one completed io_u to the write-iolog file as
 * "<file> <ddir> <offset> <buflen>"; no-op unless write_iolog_file is set.
 */
40 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
42 	if (!td->o.write_iolog_file)
45 	fprintf(td->iolog_f, "%s %s %llu %llu\n", io_u->file->file_name,
46 						io_ddir_name(io_u->ddir),
47 						io_u->offset, io_u->buflen);
/*
 * Record a file-level action (add/open/close) to the write-iolog file.
 * 'what' indexes the act[] table, so it must be one of the three
 * file_log_act values covered there.
 */
50 void log_file(struct thread_data *td, struct fio_file *f,
51 	      enum file_log_act what)
53 	const char *act[] = { "add", "open", "close" };
57 	if (!td->o.write_iolog_file)
62 	 * this happens on the pre-open/close done before the job starts
67 	fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
/*
 * Sleep for 'delay' usec before issuing the next replayed I/O, first
 * subtracting time already spent since the last issue (tracked via
 * td->time_offset).  Sleeps in chunks (<= 500000 usec per step, per the
 * visible cap) so td->terminate is honoured.  Any oversleep relative to
 * the requested delay is carried into td->time_offset for the next call.
 */
70 static void iolog_delay(struct thread_data *td, unsigned long delay)
72 	uint64_t usec = utime_since_now(&td->last_issue);
73 	unsigned long orig_delay = delay;
77 	if (delay < td->time_offset) {
82 	delay -= td->time_offset;
88 	fio_gettime(&ts, NULL);
89 	while (delay && !td->terminate) {
91 		if (this_delay > 500000)
94 		usec_sleep(td, this_delay);
98 	usec = utime_since_now(&ts);
99 	if (usec > orig_delay)
100 		td->time_offset = usec - orig_delay;
/*
 * Handle a "special" (non-I/O, DDIR_INVAL) replay entry: open, close or
 * unlink the file named by ipo->fileno.  Returns non-zero for entries
 * the caller should treat specially (exact return values are on elided
 * lines of this listing).  With replay_redirect all I/O targets one
 * file, so a re-open of an already-open file is ignored.
 */
105 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
113 	if (ipo->ddir != DDIR_INVAL)
116 	f = td->files[ipo->fileno];
118 	switch (ipo->file_action) {
119 	case FIO_LOG_OPEN_FILE:
120 		if (td->o.replay_redirect && fio_file_open(f)) {
121 			dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
125 		ret = td_io_open_file(td, f);
128 		td_verror(td, ret, "iolog open file");
130 	case FIO_LOG_CLOSE_FILE:
131 		td_io_close_file(td, f);
133 	case FIO_LOG_UNLINK_FILE:
134 		td_io_unlink_file(td, f);
137 		log_err("fio: bad file action %d\n", ipo->file_action);
144 static bool read_iolog2(struct thread_data *td);
/*
 * Pop the next replay entry off td->io_log_list and populate 'io_u'
 * from it.  In chunked mode, refill the queue via read_iolog2() once the
 * outstanding count drains to the checkmark.  File-action entries are
 * dispatched through ipo_special(); DDIR_WAIT entries sleep (absolute
 * wait relative to genesis time per the visible mtime_since_genesis()
 * path) instead of carrying offset/len.  Return value lines are elided
 * in this listing.
 */
146 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
148 	struct io_piece *ipo;
149 	unsigned long elapsed;
151 	while (!flist_empty(&td->io_log_list)) {
153 		if (td->o.read_iolog_chunked) {
154 			if (td->io_log_checkmark == td->io_log_current) {
155 				if (!read_iolog2(td))
158 			td->io_log_current--;
160 		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
161 		flist_del(&ipo->list);
162 		remove_trim_entry(td, ipo);
164 		ret = ipo_special(td, ipo);
168 		} else if (ret > 0) {
173 		io_u->ddir = ipo->ddir;
174 		if (ipo->ddir != DDIR_WAIT) {
175 			io_u->offset = ipo->offset;
176 			io_u->buflen = ipo->len;
177 			io_u->file = td->files[ipo->fileno];
178 			get_file(io_u->file);
179 			dprint(FD_IO, "iolog: get %llu/%llu/%s\n", io_u->offset,
180 						io_u->buflen, io_u->file->file_name);
182 				iolog_delay(td, ipo->delay);
184 			elapsed = mtime_since_genesis();
185 			if (ipo->delay > elapsed)
186 				usec_sleep(td, (ipo->delay - elapsed) * 1000);
191 		if (io_u->ddir != DDIR_WAIT)
/*
 * Free all recorded io_pieces: drain the rb-tree used for verify
 * history, then the plain history list, removing each entry from the
 * trim list as well.  (Per-entry free() lines are elided here.)
 */
199 void prune_io_piece_log(struct thread_data *td)
201 	struct io_piece *ipo;
202 	struct fio_rb_node *n;
204 	while ((n = rb_first(&td->io_hist_tree)) != NULL) {
205 		ipo = rb_entry(n, struct io_piece, rb_node);
206 		rb_erase(n, &td->io_hist_tree);
207 		remove_trim_entry(td, ipo);
212 	while (!flist_empty(&td->io_hist_list)) {
213 		ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
214 		flist_del(&ipo->list);
215 		remove_trim_entry(td, ipo);
/*
 * log a successful write, so we can unwind the log for verify
 *
 * When the file has a random map, entries go on a plain FIFO list
 * (IP_F_ONLIST) since the map already prevents duplicate blocks.
 * Otherwise entries are inserted into an rb-tree keyed by (file,
 * offset) (IP_F_ONRB); an overlapping existing entry is evicted
 * (rb_erase + trim-list removal) so only the newest write for a range
 * is verified.  Trim-workload pieces are additionally linked onto
 * td->trim_list.
 */
224 void log_io_piece(struct thread_data *td, struct io_u *io_u)
226 	struct fio_rb_node **p, *parent;
227 	struct io_piece *ipo, *__ipo;
229 	ipo = calloc(1, sizeof(struct io_piece));
231 	ipo->file = io_u->file;
232 	ipo->offset = io_u->offset;
233 	ipo->len = io_u->buflen;
234 	ipo->numberio = io_u->numberio;
235 	ipo->flags = IP_F_IN_FLIGHT;
239 	if (io_u_should_trim(td, io_u)) {
240 		flist_add_tail(&ipo->trim_list, &td->trim_list);
245 	 * Only sort writes if we don't have a random map in which case we need
246 	 * to check for duplicate blocks and drop the old one, which we rely on
247 	 * the rb insert/lookup for handling.
249 	if (file_randommap(td, ipo->file)) {
250 		INIT_FLIST_HEAD(&ipo->list);
251 		flist_add_tail(&ipo->list, &td->io_hist_list);
252 		ipo->flags |= IP_F_ONLIST;
257 	RB_CLEAR_NODE(&ipo->rb_node);
260 	 * Sort the entry into the verification list
263 	p = &td->io_hist_tree.rb_node;
269 		__ipo = rb_entry(parent, struct io_piece, rb_node);
270 		if (ipo->file < __ipo->file)
272 		else if (ipo->file > __ipo->file)
274 		else if (ipo->offset < __ipo->offset) {
276 			overlap = ipo->offset + ipo->len > __ipo->offset;
278 		else if (ipo->offset > __ipo->offset) {
280 			overlap = __ipo->offset + __ipo->len > ipo->offset;
286 			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
287 				__ipo->offset, __ipo->len,
288 				ipo->offset, ipo->len);
290 			rb_erase(parent, &td->io_hist_tree);
291 			remove_trim_entry(td, __ipo);
292 			if (!(__ipo->flags & IP_F_IN_FLIGHT))
298 	rb_link_node(&ipo->rb_node, parent, p);
299 	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
300 	ipo->flags |= IP_F_ONRB;
/*
 * Undo log_io_piece() for a failed/aborted io_u: mark its block-info
 * state as a trim or write failure (when block infos are tracked), then
 * unlink the io_piece from whichever structure it is on (rb-tree vs.
 * list, per its flags).  Freeing of the piece is on elided lines.
 */
304 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
306 	struct io_piece *ipo = io_u->ipo;
308 	if (td->ts.nr_block_infos) {
309 		uint32_t *info = io_u_block_info(td, io_u);
310 		if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
311 			if (io_u->ddir == DDIR_TRIM)
312 				*info = BLOCK_INFO_SET_STATE(*info,
313 						BLOCK_STATE_TRIM_FAILURE);
314 			else if (io_u->ddir == DDIR_WRITE)
315 				*info = BLOCK_INFO_SET_STATE(*info,
316 						BLOCK_STATE_WRITE_FAILURE);
323 	if (ipo->flags & IP_F_ONRB)
324 		rb_erase(&ipo->rb_node, &td->io_hist_tree);
325 	else if (ipo->flags & IP_F_ONLIST)
326 		flist_del(&ipo->list);
/*
 * Shrink the logged length of an io_u's piece to what actually
 * transferred (xfer_buflen minus the residual), e.g. after a short I/O.
 */
333 void trim_io_piece(const struct io_u *io_u)
335 	struct io_piece *ipo = io_u->ipo;
340 	ipo->len = io_u->xfer_buflen - io_u->resid;
/*
 * Close the write-iolog stream and drop its stdio buffer reference
 * (flush/fclose/free are on elided lines of this listing).
 */
343 void write_iolog_close(struct thread_data *td)
349 	td->iolog_buf = NULL;
/*
 * For chunked iolog reading: estimate how many entries to fetch so the
 * queue holds roughly one second's worth of I/O, based on the drain
 * rate since the last high-water mark (ntime_since gives nanoseconds,
 * hence the 1000000000 scale).  Updates the highmark/checkmark used by
 * read_iolog_get() to decide when to refill.  Clamping of the negative
 * case is on an elided line.
 */
352 static int64_t iolog_items_to_fetch(struct thread_data *td)
357 	int64_t items_to_fetch;
359 	if (!td->io_log_highmark)
363 	fio_gettime(&now, NULL);
364 	elapsed = ntime_since(&td->io_log_highmark_time, &now);
366 		for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
367 	items_to_fetch = for_1s - td->io_log_current;
368 	if (items_to_fetch < 0)
373 	td->io_log_highmark = td->io_log_current + items_to_fetch;
374 	td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
375 	fio_gettime(&td->io_log_highmark_time, NULL);
377 	return items_to_fetch;
/*
 * Read version 2 iolog data. It is enhanced to include per-file logging,
 *
 * Parses lines of the form "<file> <action> [offset len]" where action
 * is one of wait/read/write/sync/datasync/trim (I/O entries, 4 fields)
 * or add/open/close (file-action entries, 2 fields — dispatched on the
 * r == 2 sscanf result, elided here).  Each parsed line becomes an
 * io_piece queued via queue_io_piece().  replay_redirect rewrites every
 * filename; replay_scale/replay_align adjust offsets.  In chunked mode
 * it stops after items_to_fetch entries and resets the high/checkmark.
 * Finally td_ddir is derived from which directions were seen.  Returns
 * bool success (exact return lines elided).
 * NOTE(review): fname/act are 256+16 bytes and sscanf uses "%256s",
 * which writes up to 257 bytes — within the buffer.
 */
384 static bool read_iolog2(struct thread_data *td)
386 	unsigned long long offset;
388 	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
389 	char *rfname, *fname, *act;
392 	bool realloc = false;
393 	int64_t items_to_fetch = 0;
395 	if (td->o.read_iolog_chunked) {
396 		items_to_fetch = iolog_items_to_fetch(td);
402 	 * Read in the read iolog and store it, reuse the infrastructure
403 	 * for doing verifications.
406 	rfname = fname = malloc(256+16);
407 	act = malloc(256+16);
409 	reads = writes = waits = 0;
410 	while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
411 		struct io_piece *ipo;
414 		r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
417 		if (td->o.replay_redirect)
418 			fname = td->o.replay_redirect;
424 			if (!strcmp(act, "wait"))
426 			else if (!strcmp(act, "read"))
428 			else if (!strcmp(act, "write"))
430 			else if (!strcmp(act, "sync"))
432 			else if (!strcmp(act, "datasync"))
434 			else if (!strcmp(act, "trim"))
437 				log_err("fio: bad iolog file action: %s\n",
441 			fileno = get_fileno(td, fname);
444 			if (!strcmp(act, "add")) {
445 				if (td->o.replay_redirect &&
446 				    get_fileno(td, fname) != -1) {
447 					dprint(FD_FILE, "iolog: ignoring"
448 						" re-add of file %s\n", fname);
450 					fileno = add_file(td, fname, td->subjob_number, 1);
451 					file_action = FIO_LOG_ADD_FILE;
454 			} else if (!strcmp(act, "open")) {
455 				fileno = get_fileno(td, fname);
456 				file_action = FIO_LOG_OPEN_FILE;
457 			} else if (!strcmp(act, "close")) {
458 				fileno = get_fileno(td, fname);
459 				file_action = FIO_LOG_CLOSE_FILE;
461 				log_err("fio: bad iolog file action: %s\n",
466 			log_err("bad iolog2: %s\n", p);
472 		else if (rw == DDIR_WRITE) {
474 			 * Don't add a write for ro mode
479 		} else if (rw == DDIR_WAIT) {
483 		} else if (rw == DDIR_INVAL) {
484 		} else if (!ddir_sync(rw)) {
485 			log_err("bad ddir: %d\n", rw);
492 		ipo = calloc(1, sizeof(*ipo));
495 		if (rw == DDIR_WAIT) {
498 			if (td->o.replay_scale)
499 				ipo->offset = offset / td->o.replay_scale;
501 				ipo->offset = offset;
502 			ipo_bytes_align(td->o.replay_align, ipo);
505 			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw]) {
507 				td->o.max_bs[rw] = bytes;
509 			ipo->fileno = fileno;
510 			ipo->file_action = file_action;
514 		queue_io_piece(td, ipo);
516 		if (td->o.read_iolog_chunked) {
517 			td->io_log_current++;
519 			if (items_to_fetch == 0)
528 	if (td->o.read_iolog_chunked) {
529 		td->io_log_highmark = td->io_log_current;
530 		td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
531 		fio_gettime(&td->io_log_highmark_time, NULL);
534 	if (writes && read_only) {
535 		log_err("fio: <%s> skips replay of %d writes due to"
536 			" read-only\n", td->o.name, writes);
540 	if (td->o.read_iolog_chunked) {
541 		if (td->io_log_current == 0) {
544 		td->o.td_ddir = TD_DDIR_RW;
545 		if (realloc && td->orig_buffer)
549 			init_io_u_buffers(td);
554 	if (!reads && !writes && !waits)
556 	else if (reads && !writes)
557 		td->o.td_ddir = TD_DDIR_READ;
558 	else if (!reads && writes)
559 		td->o.td_ddir = TD_DDIR_WRITE;
561 		td->o.td_ddir = TD_DDIR_RW;
/*
 * Return true when 'path' names a Unix socket (stat + S_ISSOCK);
 * the stat-failure early return is on an elided line.
 */
566 static bool is_socket(const char *path)
571 	r = stat(path, &buf);
575 	return S_ISSOCK(buf.st_mode);
/*
 * Connect to a Unix-domain stream socket at 'path', returning the fd
 * (error paths elided in this listing).  The path is length-checked via
 * snprintf truncation before use.
 * NOTE(review): the connect() addrlen "strlen(path) + sizeof(addr.sun_family)"
 * is the historical BSD form and omits the NUL terminator; the portable
 * form is sizeof(addr) (or offsetof(sun_path) + strlen + 1) — confirm
 * against upstream before changing.
 */
578 static int open_socket(const char *path)
580 	struct sockaddr_un addr;
583 	fd = socket(AF_UNIX, SOCK_STREAM, 0);
587 	addr.sun_family = AF_UNIX;
588 	if (snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path) >=
589 	    sizeof(addr.sun_path)) {
590 		log_err("%s: path name %s is too long for a Unix socket\n",
594 	ret = connect(fd, (const struct sockaddr *)&addr, strlen(path) + sizeof(addr.sun_family));
/*
 * open iolog, check version, and call appropriate parser
 *
 * The log source may be a regular file or a Unix socket (chunked
 * streaming).  The first line must match the version-2 banner
 * ("fio version 2 iolog"); version 1 logs are rejected.  On a match the
 * open FILE* is stashed in td->io_log_rfile and parsing is delegated to
 * read_iolog2().
 */
605 static bool init_iolog_read(struct thread_data *td)
607 	char buffer[256], *p, *fname;
610 	fname = get_name_by_idx(td->o.read_iolog_file, td->subjob_number);
611 	dprint(FD_IO, "iolog: name=%s\n", fname);
613 	if (is_socket(fname)) {
616 		fd = open_socket(fname);
620 		f = fopen(fname, "r");
625 		perror("fopen read iolog");
629 	p = fgets(buffer, sizeof(buffer), f);
631 		td_verror(td, errno, "iolog read");
632 		log_err("fio: unable to read iolog\n");
638 	 * version 2 of the iolog stores a specific string as the
639 	 * first line, check for that
641 	if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2))) {
642 		free_release_files(td);
643 		td->io_log_rfile = f;
644 		return read_iolog2(td);
647 	log_err("fio: iolog version 1 is no longer supported\n");
/*
 * Set up a log for storing io patterns.
 *
 * Opens the write-iolog in append mode, gives it an 8 KiB fully
 * buffered stdio buffer, writes the version-2 banner line, then logs an
 * "add" entry for every file the job already knows about.
 */
655 static bool init_iolog_write(struct thread_data *td)
661 	f = fopen(td->o.write_iolog_file, "a");
663 		perror("fopen write iolog");
668 	 * That's it for writing, setup a log buffer and we're done.
671 	td->iolog_buf = malloc(8192);
672 	setvbuf(f, td->iolog_buf, _IOFBF, 8192);
675 	 * write our version line
677 	if (fprintf(f, "%s\n", iolog_ver2) < 0) {
678 		perror("iolog init\n");
683 	 * add all known files
685 	for_each_file(td, ff, i)
686 		log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * Entry point for iolog setup: dispatch to the blktrace loader, the
 * version-2 reader, or the writer depending on the job options, and
 * record EINVAL on failure.
 */
691 bool init_iolog(struct thread_data *td)
695 	if (td->o.read_iolog_file) {
699 		 * Check if it's a blktrace file and load that if possible.
700 		 * Otherwise assume it's a normal log file and load that.
702 		if (is_blktrace(td->o.read_iolog_file, &need_swap))
703 			ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
705 			ret = init_iolog_read(td);
706 	} else if (td->o.write_iolog_file)
707 		ret = init_iolog_write(td);
712 		td_verror(td, EINVAL, "failed initializing iolog");
/*
 * Allocate and initialize an io_log from the supplied parameters:
 * copy log type/offset/gz/avg/hist settings, seed one zeroed
 * io_u_plat_entry per data direction for histogram logging, optionally
 * preallocate a 'pending' sample buffer for non-offload submit mode,
 * set the offset-sample bit in the ddir mask when offsets are logged,
 * and set up compression (pshared mutexes + TD_F_COMPRESS_LOG) when
 * log_gz/log_gz_store are enabled.  Result is stored via **log on an
 * elided line.
 */
717 void setup_log(struct io_log **log, struct log_params *p,
718 	       const char *filename)
722 	struct io_u_plat_entry *entry;
723 	struct flist_head *list;
725 	l = scalloc(1, sizeof(*l));
726 	INIT_FLIST_HEAD(&l->io_logs);
727 	l->log_type = p->log_type;
728 	l->log_offset = p->log_offset;
729 	l->log_gz = p->log_gz;
730 	l->log_gz_store = p->log_gz_store;
731 	l->avg_msec = p->avg_msec;
732 	l->hist_msec = p->hist_msec;
733 	l->hist_coarseness = p->hist_coarseness;
734 	l->filename = strdup(filename);
737 	/* Initialize histogram lists for each r/w direction,
738 	 * with initial io_u_plat of all zeros:
740 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
741 		list = &l->hist_window[i].list;
742 		INIT_FLIST_HEAD(list);
743 		entry = calloc(1, sizeof(struct io_u_plat_entry));
744 		flist_add(&entry->list, list);
747 	if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
750 		__p = calloc(1, sizeof(*l->pending));
751 		__p->max_samples = DEF_LOG_ENTRIES;
752 		__p->log = calloc(__p->max_samples, log_entry_sz(l));
757 		l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
759 	INIT_FLIST_HEAD(&l->chunk_list);
761 	if (l->log_gz && !p->td)
763 	else if (l->log_gz || l->log_gz_store) {
764 		mutex_init_pshared(&l->chunk_lock);
765 		mutex_init_pshared(&l->deferred_free_lock);
766 		p->td->flags |= TD_F_COMPRESS_LOG;
772 #ifdef CONFIG_SETVBUF
773 static void *set_file_buffer(FILE *f)
775 size_t size = 1048576;
779 setvbuf(f, buf, _IOFBF, size);
783 static void clear_file_buffer(void *buf)
788 static void *set_file_buffer(FILE *f)
793 static void clear_file_buffer(void *buf)
/*
 * Release an io_log: drain and free each io_logs chunk on the list,
 * then the pending sample buffer (remaining frees are on elided lines).
 */
798 void free_log(struct io_log *log)
800 	while (!flist_empty(&log->io_logs)) {
801 		struct io_logs *cur_log;
803 		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
804 		flist_del_init(&cur_log->list);
810 	free(log->pending->log);
/*
 * Sum 'stride' consecutive histogram buckets starting at index j.
 * When io_u_plat_last is given, return the delta against that previous
 * snapshot instead of the absolute counts.
 */
820 uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
821 		  uint64_t *io_u_plat_last)
826 	if (io_u_plat_last) {
827 		for (k = sum = 0; k < stride; k++)
828 			sum += io_u_plat[j + k] - io_u_plat_last[j + k];
830 		for (k = sum = 0; k < stride; k++)
831 			sum += io_u_plat[j + k];
/*
 * Write histogram samples to 'f' as CSV: "time, ddir, bs," followed by
 * FIO_IO_U_PLAT_NR/stride coarsened bucket sums, where each sum is the
 * delta against the previous sample's plat entry (via hist_sum).  The
 * consumed previous entry is unlinked (and, on elided lines, freed).
 * Offset-sample layout is detected from the first sample's
 * LOG_OFFSET_SAMPLE_BIT.
 */
837 static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
838 			       uint64_t sample_size)
842 	uint64_t i, j, nr_samples;
843 	struct io_u_plat_entry *entry, *entry_before;
845 	uint64_t *io_u_plat_before;
847 	int stride = 1 << hist_coarseness;
852 	s = __get_sample(samples, 0, 0);
853 	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
855 	nr_samples = sample_size / __log_entry_sz(log_offset);
857 	for (i = 0; i < nr_samples; i++) {
858 		s = __get_sample(samples, log_offset, i);
860 		entry = s->data.plat_entry;
861 		io_u_plat = entry->io_u_plat;
863 		entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
864 		io_u_plat_before = entry_before->io_u_plat;
866 		fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
867 						io_sample_ddir(s), (unsigned long long) s->bs);
868 		for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
869 			fprintf(f, "%llu, ", (unsigned long long)
870 				hist_sum(j, stride, io_u_plat, io_u_plat_before));
872 		fprintf(f, "%llu\n", (unsigned long long)
873 			hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
876 		flist_del(&entry_before->list);
/*
 * Write plain log samples to 'f' as CSV lines "time, value, ddir, bs"
 * (plus ", offset" when the first sample's LOG_OFFSET_SAMPLE_BIT says
 * offsets were recorded, in which case each entry is an
 * io_sample_offset).
 */
881 void flush_samples(FILE *f, void *samples, uint64_t sample_size)
885 	uint64_t i, nr_samples;
890 	s = __get_sample(samples, 0, 0);
891 	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
893 	nr_samples = sample_size / __log_entry_sz(log_offset);
895 	for (i = 0; i < nr_samples; i++) {
896 		s = __get_sample(samples, log_offset, i);
899 			fprintf(f, "%lu, %" PRId64 ", %u, %llu\n",
900 					(unsigned long) s->time,
902 					io_sample_ddir(s), (unsigned long long) s->bs);
904 			struct io_sample_offset *so = (void *) s;
906 			fprintf(f, "%lu, %" PRId64 ", %u, %llu, %llu\n",
907 					(unsigned long) s->time,
909 					io_sample_ddir(s), (unsigned long long) s->bs,
910 					(unsigned long long) so->offset);
917 struct iolog_flush_data {
918 struct workqueue_work work;
925 #define GZ_CHUNK 131072
/*
 * Allocate one iolog_compress chunk with a GZ_CHUNK-sized buffer and
 * the given sequence number (seq assignment/return elided); free_chunk
 * releases it.
 */
927 static struct iolog_compress *get_new_chunk(unsigned int seq)
929 	struct iolog_compress *c;
931 	c = malloc(sizeof(*c));
932 	INIT_FLIST_HEAD(&c->list);
933 	c->buf = malloc(GZ_CHUNK);
939 static void free_chunk(struct iolog_compress *ic)
/*
 * Initialize a zlib inflate stream.  When gz_hdr is set, window bits
 * get +32 added (zlib's auto-detect of gzip vs. zlib headers) so stored
 * .gz-format logs can be read back; the wbits computation itself is on
 * elided lines.
 */
945 static int z_stream_init(z_stream *stream, int gz_hdr)
949 	memset(stream, 0, sizeof(*stream));
950 	stream->zalloc = Z_NULL;
951 	stream->zfree = Z_NULL;
952 	stream->opaque = Z_NULL;
953 	stream->next_in = Z_NULL;
956 	 * zlib magic - add 32 for auto-detection of gz header or not,
957 	 * if we decide to store files in a gzip friendly format.
962 	if (inflateInit2(stream, wbits) != Z_OK)
968 struct inflate_chunk_iter {
/*
 * End the current inflate sequence: inflateEnd() the stream (logging
 * failure), flush the accumulated decompressed samples to 'f', and
 * reset the iterator's buffer accounting for the next sequence.
 */
977 static void finish_chunk(z_stream *stream, FILE *f,
978 			 struct inflate_chunk_iter *iter)
982 	ret = inflateEnd(stream);
984 		log_err("fio: failed to end log inflation seq %d (%d)\n",
987 	flush_samples(f, iter->buf, iter->buf_used);
990 	iter->buf_size = iter->buf_used = 0;
/*
 * Iterative chunk inflation. Handles cases where we cross into a new
 * sequence, doing flush finish of previous chunk if needed.
 *
 * Decompresses 'ic' into iter->buf, growing the buffer by chunk_sz
 * whenever zlib runs out of output space, and returns how many input
 * bytes were consumed (next_in - ic->buf).  A sequence change triggers
 * finish_chunk() + fresh z_stream_init().
 */
997 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
998 			    z_stream *stream, struct inflate_chunk_iter *iter)
1002 	dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
1003 			(unsigned long) ic->len, ic->seq);
1005 	if (ic->seq != iter->seq) {
1007 			finish_chunk(stream, f, iter);
1009 		z_stream_init(stream, gz_hdr);
1010 		iter->seq = ic->seq;
1013 	stream->avail_in = ic->len;
1014 	stream->next_in = ic->buf;
1016 	if (!iter->buf_size) {
1017 		iter->buf_size = iter->chunk_sz;
1018 		iter->buf = malloc(iter->buf_size);
1021 	while (stream->avail_in) {
1022 		size_t this_out = iter->buf_size - iter->buf_used;
1025 		stream->avail_out = this_out;
1026 		stream->next_out = iter->buf + iter->buf_used;
1028 		err = inflate(stream, Z_NO_FLUSH);
1030 			log_err("fio: failed inflating log: %d\n", err);
1035 		iter->buf_used += this_out - stream->avail_out;
1037 		if (!stream->avail_out) {
1038 			iter->buf_size += iter->chunk_sz;
1039 			iter->buf = realloc(iter->buf, iter->buf_size);
1043 		if (err == Z_STREAM_END)
1047 	ret = (void *) stream->next_in - ic->buf;
1049 	dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
/*
 * Inflate stored compressed chunks, or write them directly to the log
 * file if so instructed.
 *
 * Drains log->chunk_list; with log_gz_store the raw compressed bytes go
 * straight to 'f' (fwrite result checked), otherwise each chunk is run
 * through inflate_chunk() and a final finish_chunk() flushes the tail.
 */
1058 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1060 	struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
1063 	while (!flist_empty(&log->chunk_list)) {
1064 		struct iolog_compress *ic;
1066 		ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
1067 		flist_del(&ic->list);
1069 		if (log->log_gz_store) {
1072 			dprint(FD_COMPRESS, "log write chunk size=%lu, "
1073 				"seq=%u\n", (unsigned long) ic->len, ic->seq);
1075 			ret = fwrite(ic->buf, ic->len, 1, f);
1076 			if (ret != 1 || ferror(f)) {
1078 				log_err("fio: error writing compressed log\n");
1081 			inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
1087 		finish_chunk(&stream, f, &iter);
/*
 * Open compressed log file and decompress the stored chunks and
 * write them to stdout. The chunks are stored sequentially in the
 * file, so we iterate over them and do them one-by-one.
 *
 * The whole file is slurped into one buffer (sized via stat) and fed to
 * inflate_chunk() with gz_hdr=1 repeatedly; each chunk ends with
 * Z_STREAM_END so the loop/seq handling (elided lines) advances until
 * the input is consumed, then finish_chunk() flushes the remainder.
 */
1099 int iolog_file_inflate(const char *file)
1101 	struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
1102 	struct iolog_compress ic;
1110 	f = fopen(file, "r");
1116 	if (stat(file, &sb) < 0) {
1122 	ic.buf = buf = malloc(sb.st_size);
1123 	ic.len = sb.st_size;
1126 	ret = fread(ic.buf, ic.len, 1, f);
1127 	if (ret == 0 && ferror(f)) {
1132 	} else if (ferror(f) || (!feof(f) && ret != 1)) {
1133 		log_err("fio: short read on reading log\n");
1142 	 * Each chunk will return Z_STREAM_END. We don't know how many
1143 	 * chunks are in the file, so we just keep looping and incrementing
1144 	 * the sequence number until we have consumed the whole compressed
1151 		iret = inflate_chunk(&ic,  1, stdout, &stream, &iter);
1164 		finish_chunk(&stream, stdout, &iter);
1174 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1179 int iolog_file_inflate(const char *file)
1181 log_err("fio: log inflation not possible without zlib\n");
/*
 * Write an io_log to its file (truncate or append per do_append), using
 * a large stdio buffer: first any compressed chunks via
 * inflate_gz_chunks(), then every queued io_logs buffer — routed
 * through flush_hist_samples() when this is the thread's clat histogram
 * log, flush_samples() otherwise.
 */
1187 void flush_log(struct io_log *log, bool do_append)
1193 		f = fopen(log->filename, "w");
1195 		f = fopen(log->filename, "a");
1197 		perror("fopen log");
1201 	buf = set_file_buffer(f);
1203 	inflate_gz_chunks(log, f);
1205 	while (!flist_empty(&log->io_logs)) {
1206 		struct io_logs *cur_log;
1208 		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1209 		flist_del_init(&cur_log->list);
1211 		if (log->td && log == log->td->clat_hist_log)
1212 			flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1213 					   log_sample_sz(log, cur_log));
1215 			flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1221 	clear_file_buffer(buf);
/*
 * Finalize one log: flush any pending compression first, take the
 * per-filename file lock (trylock when 'trylock' is set, so callers can
 * retry later), then either ship the log over the network (GUI client /
 * backend) or flush it locally, appending when per_job_logs is off.
 */
1224 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1226 	if (td->flags & TD_F_COMPRESS_LOG)
1230 		if (fio_trylock_file(log->filename))
1233 		fio_lock_file(log->filename);
1235 	if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1236 		fio_send_iolog(td, log, log->filename);
1238 		flush_log(log, !td->o.per_job_logs);
1240 	fio_unlock_file(log->filename);
/*
 * Return the total byte size of all compressed chunks queued on
 * log->chunk_list, holding chunk_lock while walking (the accumulation
 * line itself is elided in this listing).
 */
1245 size_t log_chunk_sizes(struct io_log *log)
1247 	struct flist_head *entry;
1250 	if (flist_empty(&log->chunk_list))
1254 	pthread_mutex_lock(&log->chunk_lock);
1255 	flist_for_each(entry, &log->chunk_list) {
1256 		struct iolog_compress *c;
1258 		c = flist_entry(entry, struct iolog_compress, list);
1261 	pthread_mutex_unlock(&log->chunk_lock);
/*
 * Defer freeing 'ptr' (a sample buffer still possibly referenced by a
 * compression worker) by parking it in log->deferred_items under
 * deferred_free_lock; if the table is full the entry is dropped with a
 * one-time warning.  iolog_free_deferred() later frees all parked
 * pointers under the same lock.
 */
1267 static void iolog_put_deferred(struct io_log *log, void *ptr)
1272 	pthread_mutex_lock(&log->deferred_free_lock);
1273 	if (log->deferred < IOLOG_MAX_DEFER) {
1274 		log->deferred_items[log->deferred] = ptr;
1276 	} else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
1277 		log_err("fio: had to drop log entry free\n");
1278 	pthread_mutex_unlock(&log->deferred_free_lock);
1281 static void iolog_free_deferred(struct io_log *log)
1288 	pthread_mutex_lock(&log->deferred_free_lock);
1290 	for (i = 0; i < log->deferred; i++) {
1291 		free(log->deferred_items[i]);
1292 		log->deferred_items[i] = NULL;
1296 	pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Compress one batch of log samples with zlib deflate.  Output is cut
 * into GZ_CHUNK-sized iolog_compress chunks collected on a local list,
 * finished with Z_FINISH (looping on Z_BUF_ERROR / until Z_STREAM_END
 * for the tail), then spliced onto log->chunk_list under chunk_lock.
 * The consumed sample buffer is handed to iolog_put_deferred().  The
 * trailing loop frees the local list on the error path.
 * NOTE(review): the dprint at the top of the deflate loop reads c->len
 * while c starts as NULL before the first get_new_chunk(); upstream fio
 * guards that dprint with "if (c)" — the guard line appears elided from
 * this gap-sampled listing, confirm against upstream.
 */
1299 static int gz_work(struct iolog_flush_data *data)
1301 	struct iolog_compress *c = NULL;
1302 	struct flist_head list;
1308 	INIT_FLIST_HEAD(&list);
1310 	memset(&stream, 0, sizeof(stream));
1311 	stream.zalloc = Z_NULL;
1312 	stream.zfree = Z_NULL;
1313 	stream.opaque = Z_NULL;
1315 	ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1317 		log_err("fio: failed to init gz stream\n");
1321 	seq = ++data->log->chunk_seq;
1323 	stream.next_in = (void *) data->samples;
1324 	stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1326 	dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
1327 				(unsigned long) stream.avail_in, seq,
1328 					data->log->filename);
1331 			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1332 				(unsigned long) c->len);
1333 		c = get_new_chunk(seq);
1334 		stream.avail_out = GZ_CHUNK;
1335 		stream.next_out = c->buf;
1336 		ret = deflate(&stream, Z_NO_FLUSH);
1338 			log_err("fio: deflate log (%d)\n", ret);
1343 		c->len = GZ_CHUNK - stream.avail_out;
1344 		flist_add_tail(&c->list, &list);
1346 	} while (stream.avail_in);
1348 	stream.next_out = c->buf + c->len;
1349 	stream.avail_out = GZ_CHUNK - c->len;
1351 	ret = deflate(&stream, Z_FINISH);
1354 		 * Z_BUF_ERROR is special, it just means we need more
1355 		 * output space. We'll handle that below. Treat any other
1358 		if (ret != Z_BUF_ERROR) {
1359 			log_err("fio: deflate log (%d)\n", ret);
1360 			flist_del(&c->list);
1367 	c->len = GZ_CHUNK - stream.avail_out;
1369 	dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
1371 	if (ret != Z_STREAM_END) {
1373 			c = get_new_chunk(seq);
1374 			stream.avail_out = GZ_CHUNK;
1375 			stream.next_out = c->buf;
1376 			ret = deflate(&stream, Z_FINISH);
1377 			c->len = GZ_CHUNK - stream.avail_out;
1379 			flist_add_tail(&c->list, &list);
1380 			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1381 				(unsigned long) c->len);
1382 		} while (ret != Z_STREAM_END);
1385 	dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1387 	ret = deflateEnd(&stream);
1389 		log_err("fio: deflateEnd %d\n", ret);
1391 	iolog_put_deferred(data->log, data->samples);
1393 	if (!flist_empty(&list)) {
1394 		pthread_mutex_lock(&data->log->chunk_lock);
1395 		flist_splice_tail(&list, &data->log->chunk_list);
1396 		pthread_mutex_unlock(&data->log->chunk_lock);
1405 	while (!flist_empty(&list)) {
1406 		c = flist_first_entry(list.next, struct iolog_compress, list);
1407 		flist_del(&c->list);
/*
 * Invoked from our compress helper thread, when logging would have exceeded
 * the specified memory limitation. Compresses the previously stored
 *
 * gz_work_async is the workqueue adapter around gz_work();
 * gz_init_worker pins the worker thread to log_gz_cpumask when that
 * option was explicitly set, logging (elided return) on failure.
 */
1419 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1421 	return gz_work(container_of(work, struct iolog_flush_data, work));
1424 static int gz_init_worker(struct submit_worker *sw)
1426 	struct thread_data *td = sw->wq->td;
1428 	if (!fio_option_is_set(&td->o, log_gz_cpumask))
1431 	if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1432 		log_err("gz: failed to set CPU affinity\n");
1439 static struct workqueue_ops log_compress_wq_ops = {
1440 .fn = gz_work_async,
1441 .init_worker_fn = gz_init_worker,
/*
 * Start/stop the single-worker log-compression workqueue; both are
 * no-ops unless the thread has TD_F_COMPRESS_LOG set.
 */
1445 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1447 	if (!(td->flags & TD_F_COMPRESS_LOG))
1450 	workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
1454 void iolog_compress_exit(struct thread_data *td)
1456 	if (!(td->flags & TD_F_COMPRESS_LOG))
1459 	workqueue_exit(&td->log_compress_wq);
1463 * Queue work item to compress the existing log entries. We reset the
1464 * current log to a small size, and reference the existing log in the
1465 * data that we queue for compression. Once compression has been done,
1466 * this old log is freed. If called with finish == true, will not return
1467 * until the log compression has completed, and will flush all previous
/*
 * iolog_flush: synchronously compress every queued io_logs buffer by
 * building an iolog_flush_data per buffer and running gz_work()
 * inline (the gz_work call is on elided lines).
 *
 * iolog_cur_flush: asynchronous variant for one io_logs buffer — hand
 * its samples to the compression workqueue, detach them from cur_log,
 * and free any previously deferred buffers.
 */
1470 static int iolog_flush(struct io_log *log)
1472 	struct iolog_flush_data *data;
1474 	data = malloc(sizeof(*data));
1481 	while (!flist_empty(&log->io_logs)) {
1482 		struct io_logs *cur_log;
1484 		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1485 		flist_del_init(&cur_log->list);
1487 		data->samples = cur_log->log;
1488 		data->nr_samples = cur_log->nr_samples;
1499 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1501 	struct iolog_flush_data *data;
1503 	data = smalloc(sizeof(*data));
1509 	data->samples = cur_log->log;
1510 	data->nr_samples = cur_log->nr_samples;
1513 	cur_log->nr_samples = cur_log->max_samples = 0;
1514 	cur_log->log = NULL;
1516 	workqueue_enqueue(&log->td->log_compress_wq, &data->work);
1518 	iolog_free_deferred(log);
1524 static int iolog_flush(struct io_log *log)
1529 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1534 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1539 void iolog_compress_exit(struct thread_data *td)
/*
 * iolog_cur_log: return the newest io_logs buffer (tail of io_logs),
 * or NULL (elided line) when the list is empty.
 * iolog_nr_samples: total sample count across all queued buffers.
 */
1545 struct io_logs *iolog_cur_log(struct io_log *log)
1547 	if (flist_empty(&log->io_logs))
1550 	return flist_last_entry(&log->io_logs, struct io_logs, list);
1553 uint64_t iolog_nr_samples(struct io_log *iolog)
1555 	struct flist_head *entry;
1558 	flist_for_each(entry, &iolog->io_logs) {
1559 		struct io_logs *cur_log;
1561 		cur_log = flist_entry(entry, struct io_logs, list);
1562 		ret += cur_log->nr_samples;
/*
 * Per-log-type writers.  __write_log() funnels into finish_log(); each
 * write_*_log() wrapper writes its td log (skipping when the log's
 * per-unit setting doesn't match unit_log, where applicable) and clears
 * the td pointer on success.  log_types[] maps each writer to its
 * *_LOG_MASK bit for td_writeout_logs()'s retry loop.
 */
1568 static int __write_log(struct thread_data *td, struct io_log *log, int try)
1571 		return finish_log(td, log, try);
1576 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1580 	if (per_unit_log(td->iops_log) != unit_log)
1583 	ret = __write_log(td, td->iops_log, try);
1585 		td->iops_log = NULL;
1590 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1597 	ret = __write_log(td, td->slat_log, try);
1599 		td->slat_log = NULL;
1604 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1611 	ret = __write_log(td, td->clat_log, try);
1613 		td->clat_log = NULL;
1618 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1625 	ret = __write_log(td, td->clat_hist_log, try);
1627 		td->clat_hist_log = NULL;
1632 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1639 	ret = __write_log(td, td->lat_log, try);
1646 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1650 	if (per_unit_log(td->bw_log) != unit_log)
1653 	ret = __write_log(td, td->bw_log, try);
1666 	CLAT_HIST_LOG_MASK	= 32,
1673 	int (*fn)(struct thread_data *, int, bool);
1676 static struct log_type log_types[] = {
1678 		.mask	= BW_LOG_MASK,
1679 		.fn	= write_bandw_log,
1682 		.mask	= LAT_LOG_MASK,
1683 		.fn	= write_lat_log,
1686 		.mask	= SLAT_LOG_MASK,
1687 		.fn	= write_slat_log,
1690 		.mask	= CLAT_LOG_MASK,
1691 		.fn	= write_clat_log,
1694 		.mask	= IOPS_LOG_MASK,
1695 		.fn	= write_iops_log,
1698 		.mask	= CLAT_HIST_LOG_MASK,
1699 		.fn	= write_clat_hist_log,
/*
 * Write out every log type for a thread while in TD_FINISHING state.
 * Each writer is attempted with trylock semantics (log_left != 1) so
 * contended logs are retried on later passes; when a full pass makes no
 * progress (prev_log_left == log_left) the loop bails out (elided
 * lines).  fio_writeout_logs() applies this to all threads.
 */
1703 void td_writeout_logs(struct thread_data *td, bool unit_logs)
1705 	unsigned int log_mask = 0;
1706 	unsigned int log_left = ALL_LOG_NR;
1709 	old_state = td_bump_runstate(td, TD_FINISHING);
1711 	finalize_logs(td, unit_logs);
1714 		int prev_log_left = log_left;
1716 		for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1717 			struct log_type *lt = &log_types[i];
1720 			if (!(log_mask & lt->mask)) {
1721 				ret = lt->fn(td, log_left != 1, unit_logs);
1724 					log_mask |= lt->mask;
1729 		if (prev_log_left == log_left)
1733 	td_restore_runstate(td, old_state);
1736 void fio_writeout_logs(bool unit_logs)
1738 	struct thread_data *td;
1742 		td_writeout_logs(td, unit_logs);