2 * Code related to writing an iolog of what a thread is doing, and to
3 * later read that back and replay
23 #include <netinet/in.h>
24 #include <netinet/tcp.h>
25 #include <arpa/inet.h>
27 #include <sys/socket.h>
/*
 * Forward declaration of the log flusher plus the version-2 signature
 * string written as the first line of every iolog file and matched on read.
 */
30 static int iolog_flush(struct io_log *log);
32 static const char iolog_ver2[] = "fio version 2 iolog";
/*
 * queue_io_piece - append an io_piece to the thread's replay list and
 * account its length into the total replay size.
 * NOTE(review): intervening lines are elided in this view.
 */
34 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
36 flist_add_tail(&ipo->list, &td->io_log_list);
37 td->total_io_size += ipo->len;
/*
 * log_io_u - write one "<file> <ddir> <offset> <len>" record for a completed
 * io_u to the iolog output stream. No-op unless write_iolog_file is set.
 */
40 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
42 if (!td->o.write_iolog_file)
45 fprintf(td->iolog_f, "%s %s %llu %llu\n", io_u->file->file_name,
46 io_ddir_name(io_u->ddir),
47 io_u->offset, io_u->buflen);
/*
 * log_file - record a file-level event ("add"/"open"/"close") in the iolog.
 * NOTE(review): act[] is indexed by 'what'; bounds checking, if any, is on
 * lines elided from this view.
 */
50 void log_file(struct thread_data *td, struct fio_file *f,
51 enum file_log_act what)
53 const char *act[] = { "add", "open", "close" };
57 if (!td->o.write_iolog_file)
/* skip events generated by the pre-open/close done before the job starts */
62 * this happens on the pre-open/close done before the job starts
67 fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
/*
 * iolog_delay - sleep for the inter-IO delay recorded in the replay log,
 * compensating for time already spent since the last issue (time_offset).
 * Sleeps in bounded slices so td->terminate is honored promptly, then
 * carries any oversleep forward into time_offset for the next delay.
 */
70 static void iolog_delay(struct thread_data *td, unsigned long delay)
72 uint64_t usec = utime_since_now(&td->last_issue);
73 unsigned long orig_delay = delay;
/* already behind schedule: consume the delay against the accumulated offset */
77 if (delay < td->time_offset) {
82 delay -= td->time_offset;
88 fio_gettime(&ts, NULL);
89 while (delay && !td->terminate) {
/* cap each sleep slice (value of this_delay set on an elided line) */
91 if (this_delay > 500000)
94 usec_sleep(td, this_delay);
/* if we slept longer than asked, remember the surplus for next time */
98 usec = utime_since_now(&ts);
99 if (usec > orig_delay)
100 td->time_offset = usec - orig_delay;
/*
 * ipo_special - handle a non-IO replay entry (file open/close/unlink).
 * Returns non-special status for regular data directions; for DDIR_INVAL
 * entries dispatches on the recorded file_action.
 */
105 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
/* regular read/write/trim/etc entries are not special */
113 if (ipo->ddir != DDIR_INVAL)
116 f = td->files[ipo->fileno];
118 switch (ipo->file_action) {
119 case FIO_LOG_OPEN_FILE:
/* with replay_redirect all files map to one; ignore duplicate opens */
120 if (td->o.replay_redirect && fio_file_open(f)) {
121 dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
125 ret = td_io_open_file(td, f);
128 td_verror(td, ret, "iolog open file");
130 case FIO_LOG_CLOSE_FILE:
131 td_io_close_file(td, f);
133 case FIO_LOG_UNLINK_FILE:
134 td_io_unlink_file(td, f);
137 log_err("fio: bad file action %d\n", ipo->file_action);
144 static bool read_iolog2(struct thread_data *td);
/*
 * read_iolog_get - pop the next replay entry off the iolog list and fill in
 * io_u from it. In chunked mode, refills the list from the log file when the
 * low-water checkmark is reached. Special (file-action) entries are handled
 * via ipo_special() and skipped. Honors recorded delays before returning.
 */
146 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
148 struct io_piece *ipo;
149 unsigned long elapsed;
151 while (!flist_empty(&td->io_log_list)) {
/* chunked reading: fetch the next batch once we drain to the checkmark */
153 if (td->o.read_iolog_chunked) {
154 if (td->io_log_checkmark == td->io_log_current) {
155 if (!read_iolog2(td))
158 td->io_log_current--;
160 ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
161 flist_del(&ipo->list);
162 remove_trim_entry(td, ipo);
164 ret = ipo_special(td, ipo);
168 } else if (ret > 0) {
173 io_u->ddir = ipo->ddir;
174 if (ipo->ddir != DDIR_WAIT) {
175 io_u->offset = ipo->offset;
176 io_u->buflen = ipo->len;
/* take a reference on the target file for this io_u */
177 io_u->file = td->files[ipo->fileno];
178 get_file(io_u->file);
179 dprint(FD_IO, "iolog: get %llu/%llu/%s\n", io_u->offset,
180 io_u->buflen, io_u->file->file_name);
182 iolog_delay(td, ipo->delay);
/* DDIR_WAIT entries: delay is absolute since job genesis, in msec */
184 elapsed = mtime_since_genesis();
185 if (ipo->delay > elapsed)
186 usec_sleep(td, (ipo->delay - elapsed) * 1000);
191 if (io_u->ddir != DDIR_WAIT)
/*
 * prune_io_piece_log - free every queued io_piece: first drain the rb-tree
 * used for sorted verification entries, then the plain history list.
 */
199 void prune_io_piece_log(struct thread_data *td)
201 struct io_piece *ipo;
202 struct fio_rb_node *n;
204 while ((n = rb_first(&td->io_hist_tree)) != NULL) {
205 ipo = rb_entry(n, struct io_piece, rb_node);
206 rb_erase(n, &td->io_hist_tree);
207 remove_trim_entry(td, ipo);
/* entries kept on the flat list instead of the tree */
212 while (!flist_empty(&td->io_hist_list)) {
213 ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
214 flist_del(&ipo->list);
215 remove_trim_entry(td, ipo);
/*
 * log a successful write, so we can unwind the log for verify
 */
224 void log_io_piece(struct thread_data *td, struct io_u *io_u)
226 struct fio_rb_node **p, *parent;
227 struct io_piece *ipo, *__ipo;
229 ipo = calloc(1, sizeof(struct io_piece));
231 ipo->file = io_u->file;
232 ipo->offset = io_u->offset;
233 ipo->len = io_u->buflen;
234 ipo->numberio = io_u->numberio;
235 ipo->flags = IP_F_IN_FLIGHT;
/* trims are also tracked on a dedicated list */
239 if (io_u_should_trim(td, io_u)) {
240 flist_add_tail(&ipo->trim_list, &td->trim_list);
/*
 * Only sort writes if we don't have a random map in which case we need
 * to check for duplicate blocks and drop the old one, which we rely on
 * the rb insert/lookup for handling.
 */
249 if (file_randommap(td, ipo->file)) {
250 INIT_FLIST_HEAD(&ipo->list);
251 flist_add_tail(&ipo->list, &td->io_hist_list);
252 ipo->flags |= IP_F_ONLIST;
257 RB_CLEAR_NODE(&ipo->rb_node);
/*
 * Sort the entry into the verification rb-tree, keyed by (file, offset);
 * overlapping older entries are dropped so verify sees the latest data.
 */
263 p = &td->io_hist_tree.rb_node;
269 __ipo = rb_entry(parent, struct io_piece, rb_node);
270 if (ipo->file < __ipo->file)
272 else if (ipo->file > __ipo->file)
274 else if (ipo->offset < __ipo->offset) {
276 overlap = ipo->offset + ipo->len > __ipo->offset;
278 else if (ipo->offset > __ipo->offset) {
280 overlap = __ipo->offset + __ipo->len > ipo->offset;
286 dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
287 __ipo->offset, __ipo->len,
288 ipo->offset, ipo->len);
/* evict the overlapped entry; only free it if it is no longer in flight */
290 rb_erase(parent, &td->io_hist_tree);
291 remove_trim_entry(td, __ipo);
292 if (!(__ipo->flags & IP_F_IN_FLIGHT))
298 rb_link_node(&ipo->rb_node, parent, p);
299 rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
300 ipo->flags |= IP_F_ONRB;
/*
 * unlog_io_piece - undo log_io_piece() for a failed io_u: mark block-info
 * state as trim/write failure where tracked, then unlink the io_piece from
 * whichever container (rb-tree or list) holds it.
 */
304 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
306 struct io_piece *ipo = io_u->ipo;
308 if (td->ts.nr_block_infos) {
309 uint32_t *info = io_u_block_info(td, io_u);
/* only escalate state; don't overwrite an existing failure record */
310 if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
311 if (io_u->ddir == DDIR_TRIM)
312 *info = BLOCK_INFO_SET_STATE(*info,
313 BLOCK_STATE_TRIM_FAILURE);
314 else if (io_u->ddir == DDIR_WRITE)
315 *info = BLOCK_INFO_SET_STATE(*info,
316 BLOCK_STATE_WRITE_FAILURE);
323 if (ipo->flags & IP_F_ONRB)
324 rb_erase(&ipo->rb_node, &td->io_hist_tree);
325 else if (ipo->flags & IP_F_ONLIST)
326 flist_del(&ipo->list);
/*
 * trim_io_piece - shrink the logged length to what actually transferred
 * (xfer_buflen minus residual), for short completions.
 */
333 void trim_io_piece(const struct io_u *io_u)
335 struct io_piece *ipo = io_u->ipo;
340 ipo->len = io_u->xfer_buflen - io_u->resid;
/*
 * write_iolog_close - tear down the iolog write stream; buffer release and
 * fclose happen on lines elided from this view.
 */
343 void write_iolog_close(struct thread_data *td)
352 td->iolog_buf = NULL;
/*
 * iolog_items_to_fetch - for chunked replay, estimate how many entries to
 * read next so roughly one second of IO is buffered, based on the observed
 * consumption rate since the last high-water mark.
 */
355 static int64_t iolog_items_to_fetch(struct thread_data *td)
360 int64_t items_to_fetch;
362 if (!td->io_log_highmark)
366 fio_gettime(&now, NULL);
367 elapsed = ntime_since(&td->io_log_highmark_time, &now);
/* scale consumed count to a one-second rate (elapsed is in nsec) */
369 for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
370 items_to_fetch = for_1s - td->io_log_current;
371 if (items_to_fetch < 0)
/* re-arm the high-water mark and the halfway refill checkmark */
376 td->io_log_highmark = td->io_log_current + items_to_fetch;
377 td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
378 fio_gettime(&td->io_log_highmark_time, NULL);
380 return items_to_fetch;
/*
 * Read version 2 iolog data. It is enhanced to include per-file logging,
 */
387 static bool read_iolog2(struct thread_data *td)
389 unsigned long long offset;
391 int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
392 char *rfname, *fname, *act;
395 bool realloc = false;
396 int64_t items_to_fetch = 0;
/* chunked mode: only pull a bounded batch of entries per call */
398 if (td->o.read_iolog_chunked) {
399 items_to_fetch = iolog_items_to_fetch(td);
/*
 * Read in the read iolog and store it, reuse the infrastructure
 * for doing verifications.
 */
409 rfname = fname = malloc(256+16);
410 act = malloc(256+16);
412 reads = writes = waits = 0;
413 while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
414 struct io_piece *ipo;
/* parse "<fname> <action> <offset> <len>"; r = fields matched */
417 r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
/* replay_redirect overrides every file name with one target */
420 if (td->o.replay_redirect)
421 fname = td->o.replay_redirect;
/* 4-field records: map the action string to a data direction */
427 if (!strcmp(act, "wait"))
429 else if (!strcmp(act, "read"))
431 else if (!strcmp(act, "write"))
433 else if (!strcmp(act, "sync"))
435 else if (!strcmp(act, "datasync"))
437 else if (!strcmp(act, "trim"))
440 log_err("fio: bad iolog file action: %s\n",
444 fileno = get_fileno(td, fname);
/* 2-field records: file-level add/open/close events */
447 if (!strcmp(act, "add")) {
448 if (td->o.replay_redirect &&
449 get_fileno(td, fname) != -1) {
450 dprint(FD_FILE, "iolog: ignoring"
451 " re-add of file %s\n", fname);
453 fileno = add_file(td, fname, td->subjob_number, 1);
454 file_action = FIO_LOG_ADD_FILE;
457 } else if (!strcmp(act, "open")) {
458 fileno = get_fileno(td, fname);
459 file_action = FIO_LOG_OPEN_FILE;
460 } else if (!strcmp(act, "close")) {
461 fileno = get_fileno(td, fname);
462 file_action = FIO_LOG_CLOSE_FILE;
464 log_err("fio: bad iolog file action: %s\n",
469 log_err("bad iolog2: %s\n", p);
475 else if (rw == DDIR_WRITE) {
/*
 * Don't add a write for ro mode
 */
482 } else if (rw == DDIR_WAIT) {
486 } else if (rw == DDIR_INVAL) {
487 } else if (!ddir_sync(rw)) {
488 log_err("bad ddir: %d\n", rw);
/*
 * Make note of file and build an io_piece for the entry
 */
495 ipo = calloc(1, sizeof(*ipo));
498 if (rw == DDIR_WAIT) {
/* replay_scale shrinks offsets to fit a smaller replay device */
501 if (td->o.replay_scale)
502 ipo->offset = offset / td->o.replay_scale;
504 ipo->offset = offset;
505 ipo_bytes_align(td->o.replay_align, ipo);
/* grow max_bs so buffers can hold the largest replayed IO */
508 if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw]) {
510 td->o.max_bs[rw] = bytes;
512 ipo->fileno = fileno;
513 ipo->file_action = file_action;
517 queue_io_piece(td, ipo);
519 if (td->o.read_iolog_chunked) {
520 td->io_log_current++;
522 if (items_to_fetch == 0)
/* initialize high-water bookkeeping after the first chunk */
531 if (td->o.read_iolog_chunked) {
532 td->io_log_highmark = td->io_log_current;
533 td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
534 fio_gettime(&td->io_log_highmark_time, NULL);
537 if (writes && read_only) {
538 log_err("fio: <%s> skips replay of %d writes due to"
539 " read-only\n", td->o.name, writes);
543 if (td->o.read_iolog_chunked) {
544 if (td->io_log_current == 0) {
547 td->o.td_ddir = TD_DDIR_RW;
/* max_bs grew: existing buffers are too small, reallocate them */
548 if (realloc && td->orig_buffer)
552 init_io_u_buffers(td);
/* derive the job's data direction from what the log contained */
557 if (!reads && !writes && !waits)
559 else if (reads && !writes)
560 td->o.td_ddir = TD_DDIR_READ;
561 else if (!reads && writes)
562 td->o.td_ddir = TD_DDIR_WRITE;
564 td->o.td_ddir = TD_DDIR_RW;
/*
 * is_socket - true if path exists and is a Unix-domain socket node.
 */
569 static bool is_socket(const char *path)
574 r = stat(path, &buf);
578 return S_ISSOCK(buf.st_mode);
/*
 * open_socket - connect a SOCK_STREAM Unix socket to 'path'; returns the
 * fd on success (error/cleanup paths are on lines elided from this view).
 */
581 static int open_socket(const char *path)
583 struct sockaddr_un addr;
586 fd = socket(AF_UNIX, SOCK_STREAM, 0);
590 addr.sun_family = AF_UNIX;
/* reject paths that would be silently truncated in sun_path */
591 if (snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path) >=
592 sizeof(addr.sun_path)) {
593 log_err("%s: path name %s is too long for a Unix socket\n",
597 ret = connect(fd, (const struct sockaddr *)&addr, strlen(path) + sizeof(addr.sun_family));
/*
 * open iolog, check version, and call appropriate parser
 */
608 static bool init_iolog_read(struct thread_data *td)
610 char buffer[256], *p, *fname;
613 fname = get_name_by_idx(td->o.read_iolog_file, td->subjob_number);
614 dprint(FD_IO, "iolog: name=%s\n", fname);
/* the log source may be a Unix socket feeding entries live */
616 if (is_socket(fname)) {
619 fd = open_socket(fname);
623 f = fopen(fname, "r");
628 perror("fopen read iolog");
632 p = fgets(buffer, sizeof(buffer), f);
634 td_verror(td, errno, "iolog read");
635 log_err("fio: unable to read iolog\n");
/*
 * version 2 of the iolog stores a specific string as the
 * first line, check for that
 */
644 if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2))) {
645 free_release_files(td);
646 td->io_log_rfile = f;
647 return read_iolog2(td);
650 log_err("fio: iolog version 1 is no longer supported\n");
/*
 * Set up a log for storing io patterns.
 */
658 static bool init_iolog_write(struct thread_data *td)
664 f = fopen(td->o.write_iolog_file, "a");
666 perror("fopen write iolog");
/*
 * That's it for writing, setup a log buffer and we're done.
 */
674 td->iolog_buf = malloc(8192);
675 setvbuf(f, td->iolog_buf, _IOFBF, 8192);
/*
 * write our version line
 */
680 if (fprintf(f, "%s\n", iolog_ver2) < 0) {
681 perror("iolog init\n");
/*
 * add all known files
 */
688 for_each_file(td, ff, i)
689 log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * init_iolog - entry point: set up replay (blktrace or iolog format) when
 * reading, or open the write log when writing; flags EINVAL on failure.
 */
694 bool init_iolog(struct thread_data *td)
698 if (td->o.read_iolog_file) {
/*
 * Check if it's a blktrace file and load that if possible.
 * Otherwise assume it's a normal log file and load that.
 */
705 if (is_blktrace(td->o.read_iolog_file, &need_swap))
706 ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
708 ret = init_iolog_read(td);
709 } else if (td->o.write_iolog_file)
710 ret = init_iolog_write(td);
715 td_verror(td, EINVAL, "failed initializing iolog");
/*
 * setup_log - allocate and initialize an io_log from the given parameters:
 * copies log type/offset/compression/averaging settings, seeds per-direction
 * histogram windows, optionally preallocates a pending-sample buffer, and
 * initializes compression state (chunk list, pshared mutexes) when enabled.
 */
720 void setup_log(struct io_log **log, struct log_params *p,
721 const char *filename)
725 struct io_u_plat_entry *entry;
726 struct flist_head *list;
728 l = scalloc(1, sizeof(*l));
729 INIT_FLIST_HEAD(&l->io_logs);
730 l->log_type = p->log_type;
731 l->log_offset = p->log_offset;
732 l->log_gz = p->log_gz;
733 l->log_gz_store = p->log_gz_store;
734 l->avg_msec = p->avg_msec;
735 l->hist_msec = p->hist_msec;
736 l->hist_coarseness = p->hist_coarseness;
737 l->filename = strdup(filename);
/* Initialize histogram lists for each r/w direction,
 * with initial io_u_plat of all zeros:
 */
743 for (i = 0; i < DDIR_RWDIR_CNT; i++) {
744 list = &l->hist_window[i].list;
745 INIT_FLIST_HEAD(list);
746 entry = calloc(1, sizeof(struct io_u_plat_entry));
747 flist_add(&entry->list, list);
/* inline (non-offload) submit mode keeps a preallocated pending buffer */
750 if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
753 __p = calloc(1, sizeof(*l->pending));
754 __p->max_samples = DEF_LOG_ENTRIES;
755 __p->log = calloc(__p->max_samples, log_entry_sz(l));
/* samples that carry an offset are tagged via a ddir-field bit */
760 l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
762 INIT_FLIST_HEAD(&l->chunk_list);
/* NOTE(review): the !p->td branch body is elided from this view */
764 if (l->log_gz && !p->td)
766 else if (l->log_gz || l->log_gz_store) {
767 mutex_init_pshared(&l->chunk_lock);
768 mutex_init_pshared(&l->deferred_free_lock);
769 p->td->flags |= TD_F_COMPRESS_LOG;
/*
 * With CONFIG_SETVBUF, give log FILE streams a large (1MB) private buffer;
 * without it, the helpers degrade to no-ops (stub bodies elided here).
 */
775 #ifdef CONFIG_SETVBUF
776 static void *set_file_buffer(FILE *f)
778 size_t size = 1048576;
782 setvbuf(f, buf, _IOFBF, size);
786 static void clear_file_buffer(void *buf)
791 static void *set_file_buffer(FILE *f)
796 static void clear_file_buffer(void *buf)
/*
 * free_log - release all sample buffers queued on the log, plus (per the
 * elided tail) the pending buffer, filename, and the log itself.
 */
801 void free_log(struct io_log *log)
803 while (!flist_empty(&log->io_logs)) {
804 struct io_logs *cur_log;
806 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
807 flist_del_init(&cur_log->list);
813 free(log->pending->log);
/*
 * hist_sum - sum 'stride' adjacent histogram buckets starting at index j.
 * If io_u_plat_last is given, returns the delta versus that previous
 * snapshot instead of the absolute counts.
 */
823 uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
824 uint64_t *io_u_plat_last)
829 if (io_u_plat_last) {
830 for (k = sum = 0; k < stride; k++)
831 sum += io_u_plat[j + k] - io_u_plat_last[j + k];
833 for (k = sum = 0; k < stride; k++)
834 sum += io_u_plat[j + k];
/*
 * flush_hist_samples - write clat histogram samples as CSV rows:
 * "time, ddir, bs, <bucket sums...>", where buckets are coalesced in
 * groups of 2^hist_coarseness and reported as deltas against the previous
 * sample's plat entry (which is then unlinked and freed per elided lines).
 */
840 static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
841 uint64_t sample_size)
845 uint64_t i, j, nr_samples;
846 struct io_u_plat_entry *entry, *entry_before;
848 uint64_t *io_u_plat_before;
850 int stride = 1 << hist_coarseness;
/* peek first sample to learn whether entries carry an offset field */
855 s = __get_sample(samples, 0, 0);
856 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
858 nr_samples = sample_size / __log_entry_sz(log_offset);
860 for (i = 0; i < nr_samples; i++) {
861 s = __get_sample(samples, log_offset, i);
863 entry = s->data.plat_entry;
864 io_u_plat = entry->io_u_plat;
/* the previous snapshot is linked right after this entry */
866 entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
867 io_u_plat_before = entry_before->io_u_plat;
869 fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
870 io_sample_ddir(s), (unsigned long long) s->bs);
871 for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
872 fprintf(f, "%llu, ", (unsigned long long)
873 hist_sum(j, stride, io_u_plat, io_u_plat_before));
/* last group terminates the row with a newline instead of a comma */
875 fprintf(f, "%llu\n", (unsigned long long)
876 hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
879 flist_del(&entry_before->list);
/*
 * flush_samples - write plain log samples as CSV. Two row formats: without
 * offset "time, value, ddir, bs, priority" and with offset the same plus the
 * offset column; which one applies is flagged per-buffer in the first
 * sample's ddir bit.
 */
884 void flush_samples(FILE *f, void *samples, uint64_t sample_size)
888 uint64_t i, nr_samples;
893 s = __get_sample(samples, 0, 0);
894 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
896 nr_samples = sample_size / __log_entry_sz(log_offset);
898 for (i = 0; i < nr_samples; i++) {
899 s = __get_sample(samples, log_offset, i);
902 fprintf(f, "%lu, %" PRId64 ", %u, %llu, %u\n",
903 (unsigned long) s->time,
905 io_sample_ddir(s), (unsigned long long) s->bs, s->priority_bit);
/* offset-bearing samples are a larger struct overlaying io_sample */
907 struct io_sample_offset *so = (void *) s;
909 fprintf(f, "%lu, %" PRId64 ", %u, %llu, %llu, %u\n",
910 (unsigned long) s->time,
912 io_sample_ddir(s), (unsigned long long) s->bs,
913 (unsigned long long) so->offset, s->priority_bit);
/* work item handed to the compression workqueue (fields partly elided) */
920 struct iolog_flush_data {
921 struct workqueue_work work;
/* compressed output is produced in fixed 128KB chunks */
928 #define GZ_CHUNK 131072
/*
 * get_new_chunk - allocate one compressed-output chunk tagged with the
 * given sequence number.
 */
930 static struct iolog_compress *get_new_chunk(unsigned int seq)
932 struct iolog_compress *c;
934 c = malloc(sizeof(*c));
935 INIT_FLIST_HEAD(&c->list);
936 c->buf = malloc(GZ_CHUNK);
/* free_chunk - release a chunk's buffer and the chunk itself (body elided) */
942 static void free_chunk(struct iolog_compress *ic)
/*
 * z_stream_init - prepare a zlib inflate stream; window bits include the
 * +32 auto-detect flag when gz_hdr is set so gzip-framed data also decodes.
 */
948 static int z_stream_init(z_stream *stream, int gz_hdr)
952 memset(stream, 0, sizeof(*stream));
953 stream->zalloc = Z_NULL;
954 stream->zfree = Z_NULL;
955 stream->opaque = Z_NULL;
956 stream->next_in = Z_NULL;
/*
 * zlib magic - add 32 for auto-detection of gz header or not,
 * if we decide to store files in a gzip friendly format.
 */
965 if (inflateInit2(stream, wbits) != Z_OK)
/* iteration state carried across inflate_chunk() calls (fields elided) */
971 struct inflate_chunk_iter {
/*
 * finish_chunk - end the current inflate sequence: tear down the zlib
 * stream, flush the decoded samples to 'f', and reset the iter buffer.
 */
980 static void finish_chunk(z_stream *stream, FILE *f,
981 struct inflate_chunk_iter *iter)
985 ret = inflateEnd(stream);
987 log_err("fio: failed to end log inflation seq %d (%d)\n",
990 flush_samples(f, iter->buf, iter->buf_used);
993 iter->buf_size = iter->buf_used = 0;
/*
 * Iterative chunk inflation. Handles cases where we cross into a new
 * sequence, doing flush finish of previous chunk if needed.
 */
1000 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
1001 z_stream *stream, struct inflate_chunk_iter *iter)
1005 dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
1006 (unsigned long) ic->len, ic->seq);
/* new sequence: flush the previous one and re-init the stream */
1008 if (ic->seq != iter->seq) {
1010 finish_chunk(stream, f, iter);
1012 z_stream_init(stream, gz_hdr);
1013 iter->seq = ic->seq;
1016 stream->avail_in = ic->len;
1017 stream->next_in = ic->buf;
1019 if (!iter->buf_size) {
1020 iter->buf_size = iter->chunk_sz;
1021 iter->buf = malloc(iter->buf_size);
/* decode until this chunk's input is consumed, growing output as needed */
1024 while (stream->avail_in) {
1025 size_t this_out = iter->buf_size - iter->buf_used;
1028 stream->avail_out = this_out;
1029 stream->next_out = iter->buf + iter->buf_used;
1031 err = inflate(stream, Z_NO_FLUSH);
1033 log_err("fio: failed inflating log: %d\n", err);
1038 iter->buf_used += this_out - stream->avail_out;
1040 if (!stream->avail_out) {
1041 iter->buf_size += iter->chunk_sz;
1042 iter->buf = realloc(iter->buf, iter->buf_size);
1046 if (err == Z_STREAM_END)
/* return how many input bytes were consumed from this chunk */
1050 ret = (void *) stream->next_in - ic->buf;
1052 dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
/*
 * Inflate stored compressed chunks, or write them directly to the log
 * file if so instructed.
 */
1061 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1063 struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
1066 while (!flist_empty(&log->chunk_list)) {
1067 struct iolog_compress *ic;
1069 ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
1070 flist_del(&ic->list);
/* gz_store mode: keep data compressed, write chunk bytes verbatim */
1072 if (log->log_gz_store) {
1075 dprint(FD_COMPRESS, "log write chunk size=%lu, "
1076 "seq=%u\n", (unsigned long) ic->len, ic->seq);
1078 ret = fwrite(ic->buf, ic->len, 1, f);
1079 if (ret != 1 || ferror(f)) {
1081 log_err("fio: error writing compressed log\n");
1084 inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
/* flush whatever the final sequence decoded */
1090 finish_chunk(&stream, f, &iter);
/*
 * Open compressed log file and decompress the stored chunks and
 * write them to stdout. The chunks are stored sequentially in the
 * file, so we iterate over them and do them one-by-one.
 */
1102 int iolog_file_inflate(const char *file)
1104 struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
1105 struct iolog_compress ic;
1113 f = fopen(file, "r");
1119 if (stat(file, &sb) < 0) {
/* slurp the whole file into one pseudo-chunk */
1125 ic.buf = buf = malloc(sb.st_size);
1126 ic.len = sb.st_size;
1129 ret = fread(ic.buf, ic.len, 1, f);
1130 if (ret == 0 && ferror(f)) {
1135 } else if (ferror(f) || (!feof(f) && ret != 1)) {
1136 log_err("fio: short read on reading log\n");
/*
 * Each chunk will return Z_STREAM_END. We don't know how many
 * chunks are in the file, so we just keep looping and incrementing
 * the sequence number until we have consumed the whole compressed
 */
1154 iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
1167 finish_chunk(&stream, stdout, &iter);
/* !CONFIG_ZLIB stubs: decompression unavailable without zlib */
1177 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1182 int iolog_file_inflate(const char *file)
1184 log_err("fio: log inflation not possible without zlib\n");
/*
 * flush_log - write all of a log's contents to its file: truncate or append
 * per do_append, emit any compressed chunks first, then each queued sample
 * buffer (histogram logs go through flush_hist_samples()).
 */
1190 void flush_log(struct io_log *log, bool do_append)
1196 f = fopen(log->filename, "w");
1198 f = fopen(log->filename, "a");
1200 perror("fopen log");
1204 buf = set_file_buffer(f);
1206 inflate_gz_chunks(log, f);
1208 while (!flist_empty(&log->io_logs)) {
1209 struct io_logs *cur_log;
1211 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1212 flist_del_init(&cur_log->list);
1214 if (log->td && log == log->td->clat_hist_log)
1215 flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1216 log_sample_sz(log, cur_log));
1218 flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1224 clear_file_buffer(buf);
/*
 * finish_log - serialize log output across processes via a file lock, then
 * either ship the log to the network client/backend or flush it locally.
 * 'trylock' makes lock acquisition non-blocking.
 */
1227 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1229 if (td->flags & TD_F_COMPRESS_LOG)
1233 if (fio_trylock_file(log->filename))
1236 fio_lock_file(log->filename);
1238 if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1239 fio_send_iolog(td, log, log->filename);
1241 flush_log(log, !td->o.per_job_logs);
1243 fio_unlock_file(log->filename);
/*
 * log_chunk_sizes - total bytes currently held in the compressed chunk
 * list, taken under the chunk lock.
 */
1248 size_t log_chunk_sizes(struct io_log *log)
1250 struct flist_head *entry;
1253 if (flist_empty(&log->chunk_list))
1257 pthread_mutex_lock(&log->chunk_lock);
1258 flist_for_each(entry, &log->chunk_list) {
1259 struct iolog_compress *c;
1261 c = flist_entry(entry, struct iolog_compress, list);
1264 pthread_mutex_unlock(&log->chunk_lock);
/*
 * iolog_put_deferred - stash a buffer for later freeing by the log owner
 * (compression workers can't free shared-memory buffers directly); drops
 * the buffer with a one-time warning if the deferred table is full.
 */
1270 static void iolog_put_deferred(struct io_log *log, void *ptr)
1275 pthread_mutex_lock(&log->deferred_free_lock);
1276 if (log->deferred < IOLOG_MAX_DEFER) {
1277 log->deferred_items[log->deferred] = ptr;
1279 } else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
1280 log_err("fio: had to drop log entry free\n");
1281 pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * iolog_free_deferred - free everything queued by iolog_put_deferred().
 */
1284 static void iolog_free_deferred(struct io_log *log)
1291 pthread_mutex_lock(&log->deferred_free_lock);
1293 for (i = 0; i < log->deferred; i++) {
1294 free(log->deferred_items[i]);
1295 log->deferred_items[i] = NULL;
1299 pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * gz_work - compress one batch of log samples with zlib deflate into a
 * local list of GZ_CHUNK-sized chunks, then splice that list onto the log's
 * chunk_list under the chunk lock. The consumed sample buffer is handed to
 * the deferred-free mechanism. Error paths free any partial chunk list.
 */
1302 static int gz_work(struct iolog_flush_data *data)
1304 struct iolog_compress *c = NULL;
1305 struct flist_head list;
1311 INIT_FLIST_HEAD(&list);
1313 memset(&stream, 0, sizeof(stream));
1314 stream.zalloc = Z_NULL;
1315 stream.zfree = Z_NULL;
1316 stream.opaque = Z_NULL;
1318 ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1320 log_err("fio: failed to init gz stream\n");
1324 seq = ++data->log->chunk_seq;
1326 stream.next_in = (void *) data->samples;
1327 stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1329 dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
1330 (unsigned long) stream.avail_in, seq,
1331 data->log->filename);
/*
 * NOTE(review): c is NULL on the first pass and this dprint reads c->len
 * before c is (re)assigned below — a guard (e.g. "if (c)") may be on a
 * line elided from this view; verify against the full source.
 */
1334 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1335 (unsigned long) c->len);
1336 c = get_new_chunk(seq);
1337 stream.avail_out = GZ_CHUNK;
1338 stream.next_out = c->buf;
1339 ret = deflate(&stream, Z_NO_FLUSH);
1341 log_err("fio: deflate log (%d)\n", ret);
1346 c->len = GZ_CHUNK - stream.avail_out;
1347 flist_add_tail(&c->list, &list);
1349 } while (stream.avail_in);
/* finish the stream into the remaining space of the last chunk */
1351 stream.next_out = c->buf + c->len;
1352 stream.avail_out = GZ_CHUNK - c->len;
1354 ret = deflate(&stream, Z_FINISH);
/*
 * Z_BUF_ERROR is special, it just means we need more
 * output space. We'll handle that below. Treat any other
 */
1361 if (ret != Z_BUF_ERROR) {
1362 log_err("fio: deflate log (%d)\n", ret);
1363 flist_del(&c->list);
1370 c->len = GZ_CHUNK - stream.avail_out;
1372 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
/* keep allocating fresh chunks until deflate reports stream end */
1374 if (ret != Z_STREAM_END) {
1376 c = get_new_chunk(seq);
1377 stream.avail_out = GZ_CHUNK;
1378 stream.next_out = c->buf;
1379 ret = deflate(&stream, Z_FINISH);
1380 c->len = GZ_CHUNK - stream.avail_out;
1382 flist_add_tail(&c->list, &list);
1383 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1384 (unsigned long) c->len);
1385 } while (ret != Z_STREAM_END);
1388 dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1390 ret = deflateEnd(&stream);
1392 log_err("fio: deflateEnd %d\n", ret);
/* sample buffer can't be freed here; defer to the log owner */
1394 iolog_put_deferred(data->log, data->samples);
1396 if (!flist_empty(&list)) {
1397 pthread_mutex_lock(&data->log->chunk_lock);
1398 flist_splice_tail(&list, &data->log->chunk_list);
1399 pthread_mutex_unlock(&data->log->chunk_lock);
/* error path: release any chunks we produced before failing */
1408 while (!flist_empty(&list)) {
1409 c = flist_first_entry(list.next, struct iolog_compress, list);
1410 flist_del(&c->list);
/*
 * Invoked from our compress helper thread, when logging would have exceeded
 * the specified memory limitation. Compresses the previously stored
 */
1422 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1424 return gz_work(container_of(work, struct iolog_flush_data, work));
/*
 * gz_init_worker - pin the compression worker to the configured CPU mask,
 * if one was explicitly set.
 */
1427 static int gz_init_worker(struct submit_worker *sw)
1429 struct thread_data *td = sw->wq->td;
1431 if (!fio_option_is_set(&td->o, log_gz_cpumask))
1434 if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1435 log_err("gz: failed to set CPU affinity\n");
/* workqueue operations for async log compression */
1442 static struct workqueue_ops log_compress_wq_ops = {
1443 .fn = gz_work_async,
1444 .init_worker_fn = gz_init_worker,
/*
 * iolog_compress_init / iolog_compress_exit - start/stop the single-worker
 * compression workqueue when log compression is enabled for this thread.
 */
1448 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1450 if (!(td->flags & TD_F_COMPRESS_LOG))
1453 workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
1457 void iolog_compress_exit(struct thread_data *td)
1459 if (!(td->flags & TD_F_COMPRESS_LOG))
1462 workqueue_exit(&td->log_compress_wq);
/*
 * Queue work item to compress the existing log entries. We reset the
 * current log to a small size, and reference the existing log in the
 * data that we queue for compression. Once compression has been done,
 * this old log is freed. If called with finish == true, will not return
 * until the log compression has completed, and will flush all previous
 */
1473 static int iolog_flush(struct io_log *log)
1475 struct iolog_flush_data *data;
1477 data = malloc(sizeof(*data));
/* compress each queued sample buffer synchronously (gz_work call elided) */
1484 while (!flist_empty(&log->io_logs)) {
1485 struct io_logs *cur_log;
1487 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1488 flist_del_init(&cur_log->list);
1490 data->samples = cur_log->log;
1491 data->nr_samples = cur_log->nr_samples;
/*
 * iolog_cur_flush - hand the current sample buffer to the async compression
 * workqueue, detaching it from cur_log so logging continues into a fresh
 * buffer; also reaps previously deferred frees.
 */
1502 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1504 struct iolog_flush_data *data;
1506 data = smalloc(sizeof(*data));
1512 data->samples = cur_log->log;
1513 data->nr_samples = cur_log->nr_samples;
1516 cur_log->nr_samples = cur_log->max_samples = 0;
1517 cur_log->log = NULL;
1519 workqueue_enqueue(&log->td->log_compress_wq, &data->work);
1521 iolog_free_deferred(log);
/* !CONFIG_ZLIB stubs: compression paths compile to no-ops */
1527 static int iolog_flush(struct io_log *log)
1532 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1537 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1542 void iolog_compress_exit(struct thread_data *td)
/*
 * iolog_cur_log - return the sample buffer currently being filled (the
 * last on the list), or NULL-equivalent when empty (elided branch).
 */
1548 struct io_logs *iolog_cur_log(struct io_log *log)
1550 if (flist_empty(&log->io_logs))
1553 return flist_last_entry(&log->io_logs, struct io_logs, list);
/*
 * iolog_nr_samples - total sample count across all queued buffers.
 */
1556 uint64_t iolog_nr_samples(struct io_log *iolog)
1558 struct flist_head *entry;
1561 flist_for_each(entry, &iolog->io_logs) {
1562 struct io_logs *cur_log;
1564 cur_log = flist_entry(entry, struct io_logs, list);
1565 ret += cur_log->nr_samples;
/*
 * __write_log - common wrapper: finish (lock/send/flush) one log.
 */
1571 static int __write_log(struct thread_data *td, struct io_log *log, int try)
1574 return finish_log(td, log, try);
/*
 * Per-log-type writers. Each flushes its log via __write_log() and, on
 * success (elided checks), clears the thread's pointer to it. The iops and
 * bandwidth variants skip logs whose per-unit setting doesn't match.
 */
1579 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1583 if (per_unit_log(td->iops_log) != unit_log)
1586 ret = __write_log(td, td->iops_log, try);
1588 td->iops_log = NULL;
1593 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1600 ret = __write_log(td, td->slat_log, try);
1602 td->slat_log = NULL;
1607 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1614 ret = __write_log(td, td->clat_log, try);
1616 td->clat_log = NULL;
1621 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1628 ret = __write_log(td, td->clat_hist_log, try);
1630 td->clat_hist_log = NULL;
1635 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1642 ret = __write_log(td, td->lat_log, try);
1649 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1653 if (per_unit_log(td->bw_log) != unit_log)
1656 ret = __write_log(td, td->bw_log, try);
/* bitmask identifying each log type in td_writeout_logs() bookkeeping */
1669 CLAT_HIST_LOG_MASK = 32,
/* table entry: mask bit plus the writer function for that log type */
1676 int (*fn)(struct thread_data *, int, bool);
1679 static struct log_type log_types[] = {
1681 .mask = BW_LOG_MASK,
1682 .fn = write_bandw_log,
1685 .mask = LAT_LOG_MASK,
1686 .fn = write_lat_log,
1689 .mask = SLAT_LOG_MASK,
1690 .fn = write_slat_log,
1693 .mask = CLAT_LOG_MASK,
1694 .fn = write_clat_log,
1697 .mask = IOPS_LOG_MASK,
1698 .fn = write_iops_log,
1701 .mask = CLAT_HIST_LOG_MASK,
1702 .fn = write_clat_hist_log,
/*
 * td_writeout_logs - flush every configured log for a thread. Iterates the
 * log_types table with trylock semantics (try = log_left != 1) so multiple
 * writers interleave; falls back to a blocking attempt on the last one.
 * Runstate is bumped to TD_FINISHING for the duration.
 */
1706 void td_writeout_logs(struct thread_data *td, bool unit_logs)
1708 unsigned int log_mask = 0;
1709 unsigned int log_left = ALL_LOG_NR;
1712 old_state = td_bump_runstate(td, TD_FINISHING);
1714 finalize_logs(td, unit_logs);
1717 int prev_log_left = log_left;
1719 for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1720 struct log_type *lt = &log_types[i];
/* skip types already written in a previous pass */
1723 if (!(log_mask & lt->mask)) {
1724 ret = lt->fn(td, log_left != 1, unit_logs);
1727 log_mask |= lt->mask;
/* no progress this pass: avoid spinning (back-off is elided here) */
1732 if (prev_log_left == log_left)
1736 td_restore_runstate(td, old_state);
/*
 * fio_writeout_logs - write out the logs of every thread.
 */
1739 void fio_writeout_logs(bool unit_logs)
1741 struct thread_data *td;
1745 td_writeout_logs(td, unit_logs);