/*
 * Code related to writing an iolog of what a thread is doing, and to
 * later read that back and replay
 */
23 #include <netinet/in.h>
24 #include <netinet/tcp.h>
25 #include <arpa/inet.h>
27 #include <sys/socket.h>
30 static int iolog_flush(struct io_log *log);
32 static const char iolog_ver2[] = "fio version 2 iolog";
34 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
36 flist_add_tail(&ipo->list, &td->io_log_list);
37 td->total_io_size += ipo->len;
40 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
42 if (!td->o.write_iolog_file)
45 fprintf(td->iolog_f, "%s %s %llu %llu\n", io_u->file->file_name,
46 io_ddir_name(io_u->ddir),
47 io_u->offset, io_u->buflen);
/*
 * log_file(): record a file-level action (add/open/close) in the write
 * iolog. NOTE(review): this is a line-sampled dump — early returns and
 * braces between the numbered lines are not visible here.
 */
50 void log_file(struct thread_data *td, struct fio_file *f,
51 	      enum file_log_act what)
53 	const char *act[] = { "add", "open", "close" };
57 	if (!td->o.write_iolog_file)
62 	 * this happens on the pre-open/close done before the job starts
67 	fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
/*
 * iolog_delay(): sleep to reproduce the inter-I/O gap recorded in the
 * replayed log. Accounts for time already spent since the last issue and
 * carries any oversleep forward via td->time_offset so long replays do
 * not drift. NOTE(review): line-sampled dump — intermediate statements
 * (e.g. the per-iteration this_delay computation) are missing from view.
 */
70 static void iolog_delay(struct thread_data *td, unsigned long delay)
72 	uint64_t usec = utime_since_now(&td->last_issue);
73 	unsigned long orig_delay = delay;
/* already late: nothing left of the requested delay to sleep */
77 	if (delay < td->time_offset) {
82 	delay -= td->time_offset;
88 	fio_gettime(&ts, NULL);
/* sleep in bounded slices so termination is noticed promptly */
89 	while (delay && !td->terminate) {
91 		if (this_delay > 500000)
94 		usec_sleep(td, this_delay);
/* remember how much we overslept for the next call */
98 	usec = utime_since_now(&ts);
99 	if (usec > orig_delay)
100 	td->time_offset = usec - orig_delay;
/*
 * ipo_special(): handle non-I/O ("special") iolog entries, i.e. file
 * open/close/unlink actions. Returns non-zero for special entries (so the
 * caller skips issuing I/O). NOTE(review): line-sampled dump — returns,
 * braces and some error paths between the numbered lines are missing.
 */
105 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
/* regular I/O entries are not special */
113 	if (ipo->ddir != DDIR_INVAL)
116 	f = td->files[ipo->fileno];
118 	switch (ipo->file_action) {
119 	case FIO_LOG_OPEN_FILE:
/* with replay_redirect several log files may map to one fio_file */
120 	if (td->o.replay_redirect && fio_file_open(f)) {
121 	dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
125 	ret = td_io_open_file(td, f);
128 	td_verror(td, ret, "iolog open file");
130 	case FIO_LOG_CLOSE_FILE:
131 	td_io_close_file(td, f);
133 	case FIO_LOG_UNLINK_FILE:
134 	td_io_unlink_file(td, f);
137 	log_err("fio: bad file action %d\n", ipo->file_action);
144 static bool read_iolog2(struct thread_data *td);
/*
 * read_iolog_get(): pop the next entry off the replay queue and fill in
 * io_u from it. Handles chunked log refill, special (file-action)
 * entries, and DDIR_WAIT delay entries. NOTE(review): line-sampled dump —
 * return statements and some branches between the numbered lines are not
 * visible here.
 */
146 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
148 	struct io_piece *ipo;
149 	unsigned long elapsed;
151 	while (!flist_empty(&td->io_log_list)) {
/* chunked mode: refill the queue once we cross the low-water mark */
153 		if (td->o.read_iolog_chunked) {
154 			if (td->io_log_checkmark == td->io_log_current) {
155 				if (!read_iolog2(td))
158 			td->io_log_current--;
160 		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
161 		flist_del(&ipo->list);
162 		remove_trim_entry(td, ipo);
/* file add/open/close entries are consumed here, not issued as I/O */
164 		ret = ipo_special(td, ipo);
168 		} else if (ret > 0) {
173 		io_u->ddir = ipo->ddir;
174 		if (ipo->ddir != DDIR_WAIT) {
175 			io_u->offset = ipo->offset;
176 			io_u->buflen = ipo->len;
177 			io_u->file = td->files[ipo->fileno];
178 			get_file(io_u->file);
179 			dprint(FD_IO, "iolog: get %llu/%llu/%s\n", io_u->offset,
180 				io_u->buflen, io_u->file->file_name);
182 				iolog_delay(td, ipo->delay);
/* DDIR_WAIT: sleep out the remainder of the recorded wall-clock delay */
184 			elapsed = mtime_since_genesis();
185 			if (ipo->delay > elapsed)
186 				usec_sleep(td, (ipo->delay - elapsed) * 1000);
191 		if (io_u->ddir != DDIR_WAIT)
/*
 * prune_io_piece_log(): drop every remaining io_piece from both the
 * rb-tree (sorted verify history) and the flat history list, removing any
 * associated trim entries. NOTE(review): line-sampled dump — the free()
 * of each ipo between the numbered lines is not visible here.
 */
199 void prune_io_piece_log(struct thread_data *td)
201 	struct io_piece *ipo;
202 	struct fio_rb_node *n;
204 	while ((n = rb_first(&td->io_hist_tree)) != NULL) {
205 		ipo = rb_entry(n, struct io_piece, rb_node);
206 		rb_erase(n, &td->io_hist_tree);
207 		remove_trim_entry(td, ipo);
212 	while (!flist_empty(&td->io_hist_list)) {
213 		ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
214 		flist_del(&ipo->list);
215 		remove_trim_entry(td, ipo);
222  * log a successful write, so we can unwind the log for verify
/*
 * log_io_piece(): record a completed write (or trim) for later
 * verification. With a random map, entries go on a plain list; without
 * one, they are inserted into an rb-tree sorted by (file, offset) so
 * overlapping older writes can be found and dropped. NOTE(review):
 * line-sampled dump — loop structure and several statements between the
 * numbered lines are missing from this view.
 */
224 void log_io_piece(struct thread_data *td, struct io_u *io_u)
226 	struct fio_rb_node **p, *parent;
227 	struct io_piece *ipo, *__ipo;
229 	ipo = calloc(1, sizeof(struct io_piece));
231 	ipo->file = io_u->file;
232 	ipo->offset = io_u->offset;
233 	ipo->len = io_u->buflen;
234 	ipo->numberio = io_u->numberio;
235 	ipo->flags = IP_F_IN_FLIGHT;
239 	if (io_u_should_trim(td, io_u)) {
240 		flist_add_tail(&ipo->trim_list, &td->trim_list);
245 	 * Only sort writes if we don't have a random map in which case we need
246 	 * to check for duplicate blocks and drop the old one, which we rely on
247 	 * the rb insert/lookup for handling.
249 	if (file_randommap(td, ipo->file)) {
250 		INIT_FLIST_HEAD(&ipo->list);
251 		flist_add_tail(&ipo->list, &td->io_hist_list);
252 		ipo->flags |= IP_F_ONLIST;
257 	RB_CLEAR_NODE(&ipo->rb_node);
260 	 * Sort the entry into the verification list
263 	p = &td->io_hist_tree.rb_node;
269 		__ipo = rb_entry(parent, struct io_piece, rb_node);
270 		if (ipo->file < __ipo->file)
272 		else if (ipo->file > __ipo->file)
/* same file: order by offset, detecting range overlap either way */
274 		else if (ipo->offset < __ipo->offset) {
276 			overlap = ipo->offset + ipo->len > __ipo->offset;
278 		else if (ipo->offset > __ipo->offset) {
280 			overlap = __ipo->offset + __ipo->len > ipo->offset;
286 			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
287 				__ipo->offset, __ipo->len,
288 				ipo->offset, ipo->len);
/* overlapping older entry: evict it and retry the insert */
290 			rb_erase(parent, &td->io_hist_tree);
291 			remove_trim_entry(td, __ipo);
292 			if (!(__ipo->flags & IP_F_IN_FLIGHT))
298 	rb_link_node(&ipo->rb_node, parent, p);
299 	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
300 	ipo->flags |= IP_F_ONRB;
/*
 * unlog_io_piece(): undo log_io_piece() for a failed io_u — mark the
 * block-info state as trim/write failure when block tracking is active,
 * then unlink the piece from whichever structure (rb-tree or list) holds
 * it. NOTE(review): line-sampled dump — the trailing free()/cleanup lines
 * are not visible here.
 */
304 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
306 	struct io_piece *ipo = io_u->ipo;
308 	if (td->ts.nr_block_infos) {
309 		uint32_t *info = io_u_block_info(td, io_u);
310 		if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
311 			if (io_u->ddir == DDIR_TRIM)
312 				*info = BLOCK_INFO_SET_STATE(*info,
313 					BLOCK_STATE_TRIM_FAILURE);
314 			else if (io_u->ddir == DDIR_WRITE)
315 				*info = BLOCK_INFO_SET_STATE(*info,
316 					BLOCK_STATE_WRITE_FAILURE);
/* remove from whichever container log_io_piece() chose */
323 	if (ipo->flags & IP_F_ONRB)
324 		rb_erase(&ipo->rb_node, &td->io_hist_tree);
325 	else if (ipo->flags & IP_F_ONLIST)
326 		flist_del(&ipo->list);
333 void trim_io_piece(const struct io_u *io_u)
335 struct io_piece *ipo = io_u->ipo;
340 ipo->len = io_u->xfer_buflen - io_u->resid;
/*
 * write_iolog_close(): tear down the write-iolog stream for this job.
 * NOTE(review): line-sampled dump — the flush/fclose/free of the iolog
 * file and buffer between the numbered lines are not visible here; only
 * the buffer pointer reset survives in this view.
 */
343 void write_iolog_close(struct thread_data *td)
349 	td->iolog_buf = NULL;
353  * Read version 2 iolog data. It is enhanced to include per-file logging,
/*
 * read_iolog2(): parse version-2 iolog lines ("<file> <action> [off len]")
 * into io_piece entries queued for replay. In chunked mode only a batch
 * sized from the recent consumption rate is fetched per call. Returns
 * true when usable entries (or a valid empty chunk state) were produced.
 * NOTE(review): line-sampled dump — many statements (continue/break,
 * frees, brace structure) between the numbered lines are missing.
 */
356 static bool read_iolog2(struct thread_data *td)
358 	unsigned long long offset;
360 	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
361 	char *rfname, *fname, *act;
364 	int64_t items_to_fetch = 0;
366 	if (td->o.read_iolog_chunked) {
367 		if (td->io_log_highmark == 0) {
/* size the next batch from the rate of the last interval */
373 		fio_gettime(&now, NULL);
374 		elapsed = ntime_since(&td->io_log_highmark_time, &now);
375 		for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
376 		items_to_fetch = for_1s - td->io_log_current;
377 		if (items_to_fetch < 0)
379 		td->io_log_highmark = td->io_log_current + items_to_fetch;
380 		td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
381 		fio_gettime(&td->io_log_highmark_time, NULL);
382 		if (items_to_fetch == 0)
387 	 * Read in the read iolog and store it, reuse the infrastructure
388 	 * for doing verifications.
391 	rfname = fname = malloc(256+16);
392 	act = malloc(256+16);
394 	reads = writes = waits = 0;
395 	while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
396 		struct io_piece *ipo;
399 		r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
/* replay_redirect maps every logged filename onto one target file */
402 		if (td->o.replay_redirect)
403 			fname = td->o.replay_redirect;
409 			if (!strcmp(act, "wait"))
411 			else if (!strcmp(act, "read"))
413 			else if (!strcmp(act, "write"))
415 			else if (!strcmp(act, "sync"))
417 			else if (!strcmp(act, "datasync"))
419 			else if (!strcmp(act, "trim"))
422 				log_err("fio: bad iolog file action: %s\n",
426 			fileno = get_fileno(td, fname);
429 			if (!strcmp(act, "add")) {
430 				if (td->o.replay_redirect &&
431 				    get_fileno(td, fname) != -1) {
432 					dprint(FD_FILE, "iolog: ignoring"
433 						" re-add of file %s\n", fname);
435 					fileno = add_file(td, fname, 0, 1);
436 					file_action = FIO_LOG_ADD_FILE;
439 			} else if (!strcmp(act, "open")) {
440 				fileno = get_fileno(td, fname);
441 				file_action = FIO_LOG_OPEN_FILE;
442 			} else if (!strcmp(act, "close")) {
443 				fileno = get_fileno(td, fname);
444 				file_action = FIO_LOG_CLOSE_FILE;
446 				log_err("fio: bad iolog file action: %s\n",
451 			log_err("bad iolog2: %s\n", p);
457 		else if (rw == DDIR_WRITE) {
459 			 * Don't add a write for ro mode
464 		} else if (rw == DDIR_WAIT) {
468 		} else if (rw == DDIR_INVAL) {
469 		} else if (!ddir_sync(rw)) {
470 			log_err("bad ddir: %d\n", rw);
/* build the queued entry for this line */
477 		ipo = calloc(1, sizeof(*ipo));
480 		if (rw == DDIR_WAIT) {
483 			if (td->o.replay_scale)
484 				ipo->offset = offset / td->o.replay_scale;
486 				ipo->offset = offset;
487 			ipo_bytes_align(td->o.replay_align, ipo);
/* track the largest block size seen so buffers are sized correctly */
490 			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
491 				td->o.max_bs[rw] = bytes;
492 			ipo->fileno = fileno;
493 			ipo->file_action = file_action;
497 		queue_io_piece(td, ipo);
499 		if (td->o.read_iolog_chunked) {
500 			td->io_log_current++;
502 			if (items_to_fetch == 0)
511 	if (td->o.read_iolog_chunked) {
512 		td->io_log_highmark = td->io_log_current;
513 		td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
514 		fio_gettime(&td->io_log_highmark_time, NULL);
517 	if (writes && read_only) {
518 		log_err("fio: <%s> skips replay of %d writes due to"
519 			" read-only\n", td->o.name, writes);
523 	if (td->o.read_iolog_chunked) {
524 		if (td->io_log_current == 0) {
527 		td->o.td_ddir = TD_DDIR_RW;
/* derive the job's data direction from what the log contained */
531 	if (!reads && !writes && !waits)
533 	else if (reads && !writes)
534 		td->o.td_ddir = TD_DDIR_READ;
535 	else if (!reads && writes)
536 		td->o.td_ddir = TD_DDIR_WRITE;
538 		td->o.td_ddir = TD_DDIR_RW;
543 static bool is_socket(const char *path)
546 int r = stat(path, &buf);
550 return S_ISSOCK(buf.st_mode);
/*
 * Open and connect a UNIX domain stream socket at 'path'.
 *
 * Returns the connected fd on success, -1 (or a negative socket() error)
 * on failure. Fixes in this revision:
 *  - check the socket() return value before using the fd;
 *  - replace strncpy() with snprintf(): strncpy does not NUL-terminate
 *    sun_path when 'path' is as long as the buffer (CERT STR32-C), and a
 *    truncated path would silently connect to the wrong address — treat
 *    truncation as an error instead;
 *  - close the fd on the failure paths so it does not leak.
 */
static int open_socket(const char *path)
{
	struct sockaddr_un addr;
	int fd;

	fd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (fd < 0)
		return fd;

	addr.sun_family = AF_UNIX;
	if ((size_t)snprintf(addr.sun_path, sizeof(addr.sun_path), "%s",
			     path) >= sizeof(addr.sun_path)) {
		/* path too long for sun_path: refuse rather than truncate */
		close(fd);
		return -1;
	}

	if (connect(fd, (const struct sockaddr *)&addr,
		    strlen(path) + sizeof(addr.sun_family)) == 0)
		return fd;

	close(fd);
	return -1;
}
569  * open iolog, check version, and call appropriate parser
/*
 * init_iolog_read(): open the read-iolog source — either a UNIX socket or
 * a regular file — verify the version-2 header line, and hand off to
 * read_iolog2(). Returns true on success. NOTE(review): line-sampled dump
 * — the fdopen() of the socket fd and several error returns between the
 * numbered lines are not visible here.
 */
571 static bool init_iolog_read(struct thread_data *td)
573 	char buffer[256], *p;
576 	if (is_socket(td->o.read_iolog_file)) {
577 		int fd = open_socket(td->o.read_iolog_file);
582 		f = fopen(td->o.read_iolog_file, "r");
584 		perror("fopen read iolog");
/* first line must identify the log version */
588 	p = fgets(buffer, sizeof(buffer), f);
590 		td_verror(td, errno, "iolog read");
591 		log_err("fio: unable to read iolog\n");
595 	td->io_log_rfile = f;
597 	 * version 2 of the iolog stores a specific string as the
598 	 * first line, check for that
600 	if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2))) {
601 		free_release_files(td);
602 		ret = read_iolog2(td);
605 		log_err("fio: iolog version 1 is no longer supported\n");
613  * Set up a log for storing io patterns.
/*
 * init_iolog_write(): open the write iolog for append, give it a private
 * 8 KiB stdio buffer, emit the version header, and pre-log every file the
 * job already knows about. Returns true on success. NOTE(review):
 * line-sampled dump — error-path returns between the numbered lines are
 * not visible here.
 */
615 static bool init_iolog_write(struct thread_data *td)
621 	f = fopen(td->o.write_iolog_file, "a");
623 		perror("fopen write iolog");
628 	 * That's it for writing, setup a log buffer and we're done.
631 	td->iolog_buf = malloc(8192);
632 	setvbuf(f, td->iolog_buf, _IOFBF, 8192);
635 	 * write our version line
637 	if (fprintf(f, "%s\n", iolog_ver2) < 0) {
638 		perror("iolog init\n");
643 	 * add all known files
645 	for_each_file(td, ff, i)
646 		log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * init_iolog(): top-level iolog setup. A read iolog may be a blktrace
 * capture or a native iolog; a write iolog sets up output logging.
 * Reports EINVAL via td_verror() on failure. NOTE(review): line-sampled
 * dump — the success/default paths between the numbered lines are not
 * visible here.
 */
651 bool init_iolog(struct thread_data *td)
655 	if (td->o.read_iolog_file) {
659 		 * Check if it's a blktrace file and load that if possible.
660 		 * Otherwise assume it's a normal log file and load that.
662 		if (is_blktrace(td->o.read_iolog_file, &need_swap))
663 			ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
665 			ret = init_iolog_read(td);
666 	} else if (td->o.write_iolog_file)
667 		ret = init_iolog_write(td);
672 		td_verror(td, EINVAL, "failed initializing iolog");
/*
 * setup_log(): allocate and initialize an io_log from the supplied
 * parameters — copies log type/offset/compression/averaging settings,
 * seeds per-direction histogram lists, optionally preallocates a pending
 * sample buffer, and wires up gz compression state when enabled.
 * NOTE(review): line-sampled dump — some assignments (e.g. l->td) between
 * the numbered lines are not visible here.
 */
677 void setup_log(struct io_log **log, struct log_params *p,
678 	       const char *filename)
682 	struct io_u_plat_entry *entry;
683 	struct flist_head *list;
685 	l = scalloc(1, sizeof(*l));
686 	INIT_FLIST_HEAD(&l->io_logs);
687 	l->log_type = p->log_type;
688 	l->log_offset = p->log_offset;
689 	l->log_gz = p->log_gz;
690 	l->log_gz_store = p->log_gz_store;
691 	l->avg_msec = p->avg_msec;
692 	l->hist_msec = p->hist_msec;
693 	l->hist_coarseness = p->hist_coarseness;
694 	l->filename = strdup(filename);
697 	/* Initialize histogram lists for each r/w direction,
698 	 * with initial io_u_plat of all zeros:
700 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
701 		list = &l->hist_window[i].list;
702 		INIT_FLIST_HEAD(list);
703 		entry = calloc(1, sizeof(struct io_u_plat_entry));
704 		flist_add(&entry->list, list);
/* inline submit mode keeps a preallocated pending-sample buffer */
707 	if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
710 		__p = calloc(1, sizeof(*l->pending));
711 		__p->max_samples = DEF_LOG_ENTRIES;
712 		__p->log = calloc(__p->max_samples, log_entry_sz(l));
717 		l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
719 	INIT_FLIST_HEAD(&l->chunk_list);
721 	if (l->log_gz && !p->td)
723 	else if (l->log_gz || l->log_gz_store) {
724 		mutex_init_pshared(&l->chunk_lock);
725 		mutex_init_pshared(&l->deferred_free_lock);
726 		p->td->flags |= TD_F_COMPRESS_LOG;
/*
 * set_file_buffer()/clear_file_buffer(): when setvbuf is available, give
 * the log FILE a 1 MiB fully-buffered stdio buffer (returned so the
 * caller can free it after fclose); otherwise both are no-op stubs.
 * NOTE(review): line-sampled dump — allocation/free/return lines and the
 * #else/#endif structure are only partially visible here.
 */
732 #ifdef CONFIG_SETVBUF
733 static void *set_file_buffer(FILE *f)
735 	size_t size = 1048576;
739 	setvbuf(f, buf, _IOFBF, size);
743 static void clear_file_buffer(void *buf)
748 static void *set_file_buffer(FILE *f)
753 static void clear_file_buffer(void *buf)
/*
 * free_log(): release an io_log — every chained io_logs buffer, the
 * pending buffer, filename and the log itself. NOTE(review): line-sampled
 * dump — the free() calls between the numbered lines are not visible.
 */
758 void free_log(struct io_log *log)
760 	while (!flist_empty(&log->io_logs)) {
761 		struct io_logs *cur_log;
763 		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
764 		flist_del_init(&cur_log->list);
770 	free(log->pending->log);
/*
 * Sum 'stride' consecutive histogram buckets starting at index j. When a
 * previous snapshot (io_u_plat_last) is supplied, return the delta of the
 * interval instead of the running totals.
 */
uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
		  uint64_t *io_u_plat_last)
{
	uint64_t sum = 0;
	int k;

	if (io_u_plat_last) {
		for (k = 0; k < stride; k++)
			sum += io_u_plat[j + k] - io_u_plat_last[j + k];
	} else {
		for (k = 0; k < stride; k++)
			sum += io_u_plat[j + k];
	}

	return sum;
}
/*
 * flush_hist_samples(): write histogram samples as CSV — one line per
 * sample: time, ddir, bs, then per-bucket-group sums coarsened by
 * 2^hist_coarseness. Each group value is the delta against the previous
 * snapshot, whose entry is freed once consumed. NOTE(review):
 * line-sampled dump — frees and brace structure between the numbered
 * lines are not visible here.
 */
797 static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
798 			       uint64_t sample_size)
802 	uint64_t i, j, nr_samples;
803 	struct io_u_plat_entry *entry, *entry_before;
805 	uint64_t *io_u_plat_before;
807 	int stride = 1 << hist_coarseness;
/* probe the first sample to learn whether offsets are logged */
812 	s = __get_sample(samples, 0, 0);
813 	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
815 	nr_samples = sample_size / __log_entry_sz(log_offset);
817 	for (i = 0; i < nr_samples; i++) {
818 		s = __get_sample(samples, log_offset, i);
820 		entry = s->data.plat_entry;
821 		io_u_plat = entry->io_u_plat;
823 		entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
824 		io_u_plat_before = entry_before->io_u_plat;
826 		fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
827 			io_sample_ddir(s), (unsigned long long) s->bs);
828 		for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
829 			fprintf(f, "%llu, ", (unsigned long long)
830 				hist_sum(j, stride, io_u_plat, io_u_plat_before));
/* last group terminates the line without a trailing comma */
832 		fprintf(f, "%llu\n", (unsigned long long)
833 			hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
836 		flist_del(&entry_before->list);
/*
 * flush_samples(): write plain log samples as CSV ("time, value, ddir,
 * bs[, offset]"), choosing the wider record format when the offset bit is
 * set in the first sample. NOTE(review): line-sampled dump — the value
 * argument and the branch structure between the numbered lines are only
 * partially visible here.
 */
841 void flush_samples(FILE *f, void *samples, uint64_t sample_size)
845 	uint64_t i, nr_samples;
850 	s = __get_sample(samples, 0, 0);
851 	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
853 	nr_samples = sample_size / __log_entry_sz(log_offset);
855 	for (i = 0; i < nr_samples; i++) {
856 		s = __get_sample(samples, log_offset, i);
859 			fprintf(f, "%lu, %" PRId64 ", %u, %llu\n",
860 				(unsigned long) s->time,
862 				io_sample_ddir(s), (unsigned long long) s->bs);
864 			struct io_sample_offset *so = (void *) s;
866 			fprintf(f, "%lu, %" PRId64 ", %u, %llu, %llu\n",
867 				(unsigned long) s->time,
869 				io_sample_ddir(s), (unsigned long long) s->bs,
870 				(unsigned long long) so->offset);
/* Work item handed to the compression workqueue: which log, which
 * samples, how many. NOTE(review): line-sampled dump — remaining members
 * are not visible here. */
877 struct iolog_flush_data {
878 	struct workqueue_work work;
/* compressed output is produced in fixed 128 KiB chunks */
885 #define GZ_CHUNK	131072
/*
 * get_new_chunk(): allocate one compression output chunk tagged with the
 * given sequence number; free_chunk() releases it. NOTE(review): the
 * seq/len initialization and the free() bodies between the numbered lines
 * are not visible here.
 */
887 static struct iolog_compress *get_new_chunk(unsigned int seq)
889 	struct iolog_compress *c;
891 	c = malloc(sizeof(*c));
892 	INIT_FLIST_HEAD(&c->list);
893 	c->buf = malloc(GZ_CHUNK);
899 static void free_chunk(struct iolog_compress *ic)
/*
 * z_stream_init(): prepare a zlib inflate stream. The window-bits value
 * (computed in lines not visible here) adds 32 when a gzip header may be
 * present, enabling zlib/gzip auto-detection. Returns non-zero on
 * inflateInit2 failure. NOTE(review): line-sampled dump.
 */
905 static int z_stream_init(z_stream *stream, int gz_hdr)
909 	memset(stream, 0, sizeof(*stream));
910 	stream->zalloc = Z_NULL;
911 	stream->zfree = Z_NULL;
912 	stream->opaque = Z_NULL;
913 	stream->next_in = Z_NULL;
916 	 * zlib magic - add 32 for auto-detection of gz header or not,
917 	 * if we decide to store files in a gzip friendly format.
922 	if (inflateInit2(stream, wbits) != Z_OK)
/* Iteration state for decompressing a sequence of chunks into one buffer.
 * NOTE(review): member list between the numbered lines is not visible. */
928 struct inflate_chunk_iter {
/*
 * finish_chunk(): end the inflate stream for the current sequence, flush
 * the accumulated decompressed samples to 'f', and reset the iterator's
 * buffer accounting for the next sequence.
 */
937 static void finish_chunk(z_stream *stream, FILE *f,
938 			 struct inflate_chunk_iter *iter)
942 	ret = inflateEnd(stream);
944 		log_err("fio: failed to end log inflation seq %d (%d)\n",
947 	flush_samples(f, iter->buf, iter->buf_used);
950 	iter->buf_size = iter->buf_used = 0;
954  * Iterative chunk inflation. Handles cases where we cross into a new
955  * sequence, doing flush finish of previous chunk if needed.
/*
 * inflate_chunk(): decompress one stored chunk into iter->buf, growing
 * the buffer as needed. On a sequence change, the previous sequence is
 * finished/flushed and a fresh inflate stream is initialized. Returns the
 * number of input bytes consumed. NOTE(review): line-sampled dump —
 * error-path returns between the numbered lines are not visible here.
 */
957 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
958 			    z_stream *stream, struct inflate_chunk_iter *iter)
962 	dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
963 		(unsigned long) ic->len, ic->seq);
965 	if (ic->seq != iter->seq) {
967 			finish_chunk(stream, f, iter);
969 		z_stream_init(stream, gz_hdr);
973 	stream->avail_in = ic->len;
974 	stream->next_in = ic->buf;
976 	if (!iter->buf_size) {
977 		iter->buf_size = iter->chunk_sz;
978 		iter->buf = malloc(iter->buf_size);
981 	while (stream->avail_in) {
982 		size_t this_out = iter->buf_size - iter->buf_used;
985 		stream->avail_out = this_out;
986 		stream->next_out = iter->buf + iter->buf_used;
988 		err = inflate(stream, Z_NO_FLUSH);
990 			log_err("fio: failed inflating log: %d\n", err);
995 		iter->buf_used += this_out - stream->avail_out;
/* output buffer exhausted: grow by another chunk_sz and continue */
997 		if (!stream->avail_out) {
998 			iter->buf_size += iter->chunk_sz;
999 			iter->buf = realloc(iter->buf, iter->buf_size);
1003 		if (err == Z_STREAM_END)
1007 	ret = (void *) stream->next_in - ic->buf;
1009 	dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
1015  * Inflate stored compressed chunks, or write them directly to the log
1016  * file if so instructed.
/*
 * inflate_gz_chunks(): drain the log's chunk list. In gz_store mode the
 * compressed bytes are written to 'f' verbatim; otherwise each chunk is
 * inflated into the file. The final partial sequence is flushed at the
 * end. NOTE(review): line-sampled dump — chunk freeing and the stream/
 * iter declarations between the numbered lines are not visible here.
 */
1018 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1020 	struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
1023 	while (!flist_empty(&log->chunk_list)) {
1024 		struct iolog_compress *ic;
1026 		ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
1027 		flist_del(&ic->list);
1029 		if (log->log_gz_store) {
1032 			dprint(FD_COMPRESS, "log write chunk size=%lu, "
1033 				"seq=%u\n", (unsigned long) ic->len, ic->seq);
1035 			ret = fwrite(ic->buf, ic->len, 1, f);
1036 			if (ret != 1 || ferror(f)) {
1038 				log_err("fio: error writing compressed log\n");
1041 			inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
1047 		finish_chunk(&stream, f, &iter);
1055  * Open compressed log file and decompress the stored chunks and
1056  * write them to stdout. The chunks are stored sequentially in the
1057  * file, so we iterate over them and do them one-by-one.
/*
 * iolog_file_inflate(): slurp the whole compressed log into memory, then
 * repeatedly inflate chunks (each ending with Z_STREAM_END) until the
 * input is consumed, writing decompressed samples to stdout.
 * NOTE(review): line-sampled dump — error returns, the inflate loop
 * control and cleanup between the numbered lines are not visible here.
 */
1059 int iolog_file_inflate(const char *file)
1061 	struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
1062 	struct iolog_compress ic;
1070 	f = fopen(file, "r");
1076 	if (stat(file, &sb) < 0) {
1082 	ic.buf = buf = malloc(sb.st_size);
1083 	ic.len = sb.st_size;
1086 	ret = fread(ic.buf, ic.len, 1, f);
1087 	if (ret == 0 && ferror(f)) {
1092 	} else if (ferror(f) || (!feof(f) && ret != 1)) {
1093 		log_err("fio: short read on reading log\n");
1102 	 * Each chunk will return Z_STREAM_END. We don't know how many
1103 	 * chunks are in the file, so we just keep looping and incrementing
1104 	 * the sequence number until we have consumed the whole compressed
1111 		iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
1124 		finish_chunk(&stream, stdout, &iter);
/*
 * No-zlib build stubs: inflate_gz_chunks() becomes a no-op and
 * iolog_file_inflate() reports that inflation is unavailable.
 * NOTE(review): line-sampled dump — return statements are not visible.
 */
1134 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1139 int iolog_file_inflate(const char *file)
1141 	log_err("fio: log inflation not possible without zlib\n");
/*
 * flush_log(): open the log's output file (truncate or append), give it a
 * big stdio buffer, write any compressed chunks, then flush every chained
 * sample buffer — histogram logs via flush_hist_samples(), others via
 * flush_samples(). NOTE(review): line-sampled dump — fclose and per-log
 * frees between the numbered lines are not visible here.
 */
1147 void flush_log(struct io_log *log, bool do_append)
1153 		f = fopen(log->filename, "w");
1155 		f = fopen(log->filename, "a");
1157 		perror("fopen log");
1161 	buf = set_file_buffer(f);
1163 	inflate_gz_chunks(log, f);
1165 	while (!flist_empty(&log->io_logs)) {
1166 		struct io_logs *cur_log;
1168 		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1169 		flist_del_init(&cur_log->list);
1171 		if (log->td && log == log->td->clat_hist_log)
1172 			flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1173 					   log_sample_sz(log, cur_log));
1175 			flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1181 	clear_file_buffer(buf);
/*
 * finish_log(): final write-out of one log under the filename's file
 * lock. Compressed logs flush their work first; GUI/backend clients send
 * the log over the wire instead of writing locally. 'trylock' makes lock
 * acquisition non-blocking. NOTE(review): line-sampled dump — the return
 * values between the numbered lines are not visible here.
 */
1184 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1186 	if (td->flags & TD_F_COMPRESS_LOG)
1190 		if (fio_trylock_file(log->filename))
1193 		fio_lock_file(log->filename);
1195 	if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1196 		fio_send_iolog(td, log, log->filename);
1198 		flush_log(log, !td->o.per_job_logs);
1200 	fio_unlock_file(log->filename);
/*
 * log_chunk_sizes(): total compressed bytes currently queued on the log's
 * chunk list, taken under chunk_lock. NOTE(review): line-sampled dump —
 * the accumulator and return between the numbered lines are not visible.
 */
1205 size_t log_chunk_sizes(struct io_log *log)
1207 	struct flist_head *entry;
1210 	if (flist_empty(&log->chunk_list))
1214 	pthread_mutex_lock(&log->chunk_lock);
1215 	flist_for_each(entry, &log->chunk_list) {
1216 		struct iolog_compress *c;
1218 		c = flist_entry(entry, struct iolog_compress, list);
1221 	pthread_mutex_unlock(&log->chunk_lock);
/*
 * iolog_put_deferred(): queue a buffer for deferred freeing (it may still
 * be referenced by in-flight compression). Drops the entry — with a
 * one-time warning — if the fixed-size deferred array is full.
 */
1227 static void iolog_put_deferred(struct io_log *log, void *ptr)
1232 	pthread_mutex_lock(&log->deferred_free_lock);
1233 	if (log->deferred < IOLOG_MAX_DEFER) {
1234 		log->deferred_items[log->deferred] = ptr;
1236 	} else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
1237 		log_err("fio: had to drop log entry free\n");
1238 	pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * iolog_free_deferred(): free everything queued by iolog_put_deferred()
 * and reset the deferred count, under deferred_free_lock.
 */
1241 static void iolog_free_deferred(struct io_log *log)
1248 	pthread_mutex_lock(&log->deferred_free_lock);
1250 	for (i = 0; i < log->deferred; i++) {
1251 		free(log->deferred_items[i]);
1252 		log->deferred_items[i] = NULL;
1256 	pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * gz_work(): deflate one batch of log samples into a local list of
 * GZ_CHUNK-sized chunks, then splice the list onto the log's chunk list
 * under chunk_lock. The consumed sample buffer is handed to the deferred
 * free machinery. On error, already-built chunks are freed.
 * NOTE(review): line-sampled dump — error labels, frees and some loop
 * braces between the numbered lines are not visible here.
 */
1259 static int gz_work(struct iolog_flush_data *data)
1261 	struct iolog_compress *c = NULL;
1262 	struct flist_head list;
1268 	INIT_FLIST_HEAD(&list);
1270 	memset(&stream, 0, sizeof(stream));
1271 	stream.zalloc = Z_NULL;
1272 	stream.zfree = Z_NULL;
1273 	stream.opaque = Z_NULL;
1275 	ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1277 		log_err("fio: failed to init gz stream\n");
/* chunks from one flush share a sequence number for later inflation */
1281 	seq = ++data->log->chunk_seq;
1283 	stream.next_in = (void *) data->samples;
1284 	stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1286 	dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
1287 		(unsigned long) stream.avail_in, seq,
1288 		data->log->filename);
1291 		dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1292 			(unsigned long) c->len);
1293 		c = get_new_chunk(seq);
1294 		stream.avail_out = GZ_CHUNK;
1295 		stream.next_out = c->buf;
1296 		ret = deflate(&stream, Z_NO_FLUSH);
1298 			log_err("fio: deflate log (%d)\n", ret);
1303 		c->len = GZ_CHUNK - stream.avail_out;
1304 		flist_add_tail(&c->list, &list);
1306 	} while (stream.avail_in);
/* finish the stream into the tail chunk's remaining space */
1308 	stream.next_out = c->buf + c->len;
1309 	stream.avail_out = GZ_CHUNK - c->len;
1311 	ret = deflate(&stream, Z_FINISH);
1314 		 * Z_BUF_ERROR is special, it just means we need more
1315 		 * output space. We'll handle that below. Treat any other
1318 		if (ret != Z_BUF_ERROR) {
1319 			log_err("fio: deflate log (%d)\n", ret);
1320 			flist_del(&c->list);
1327 	c->len = GZ_CHUNK - stream.avail_out;
1329 	dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
/* keep allocating chunks until Z_FINISH reports stream end */
1331 	if (ret != Z_STREAM_END) {
1333 			c = get_new_chunk(seq);
1334 			stream.avail_out = GZ_CHUNK;
1335 			stream.next_out = c->buf;
1336 			ret = deflate(&stream, Z_FINISH);
1337 			c->len = GZ_CHUNK - stream.avail_out;
1339 			flist_add_tail(&c->list, &list);
1340 			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1341 				(unsigned long) c->len);
1342 		} while (ret != Z_STREAM_END);
1345 	dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1347 	ret = deflateEnd(&stream);
1349 		log_err("fio: deflateEnd %d\n", ret);
/* sample buffer may still be referenced elsewhere: free deferred */
1351 	iolog_put_deferred(data->log, data->samples);
1353 	if (!flist_empty(&list)) {
1354 		pthread_mutex_lock(&data->log->chunk_lock);
1355 		flist_splice_tail(&list, &data->log->chunk_list);
1356 		pthread_mutex_unlock(&data->log->chunk_lock);
/* error path: discard any chunks built so far */
1365 	while (!flist_empty(&list)) {
1366 		c = flist_first_entry(list.next, struct iolog_compress, list);
1367 		flist_del(&c->list);
1375  * Invoked from our compress helper thread, when logging would have exceeded
1376  * the specified memory limitation. Compresses the previously stored
/*
 * Workqueue glue: gz_work_async() unwraps the work item and runs
 * gz_work(); gz_init_worker() optionally pins the compression worker to
 * the configured log_gz_cpumask. log_compress_wq_ops wires both into the
 * workqueue. NOTE(review): line-sampled dump — returns between the
 * numbered lines are not visible here.
 */
1379 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1381 	return gz_work(container_of(work, struct iolog_flush_data, work));
1384 static int gz_init_worker(struct submit_worker *sw)
1386 	struct thread_data *td = sw->wq->td;
1388 	if (!fio_option_is_set(&td->o, log_gz_cpumask))
1391 	if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1392 		log_err("gz: failed to set CPU affinity\n");
1399 static struct workqueue_ops log_compress_wq_ops = {
1400 	.fn		= gz_work_async,
1401 	.init_worker_fn = gz_init_worker,
/*
 * iolog_compress_init()/exit(): create or tear down the single-worker
 * compression workqueue, only when this job has TD_F_COMPRESS_LOG set.
 * NOTE(review): line-sampled dump — returns are not visible here.
 */
1405 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1407 	if (!(td->flags & TD_F_COMPRESS_LOG))
1410 	workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
1414 void iolog_compress_exit(struct thread_data *td)
1416 	if (!(td->flags & TD_F_COMPRESS_LOG))
1419 	workqueue_exit(&td->log_compress_wq);
1423  * Queue work item to compress the existing log entries. We reset the
1424  * current log to a small size, and reference the existing log in the
1425  * data that we queue for compression. Once compression has been done,
1426  * this old log is freed. If called with finish == true, will not return
1427  * until the log compression has completed, and will flush all previous
/*
 * iolog_flush(): synchronously compress every chained sample buffer of
 * the log by running gz_work() inline for each. NOTE(review):
 * line-sampled dump — the gz_work() invocation and frees between the
 * numbered lines are not visible here.
 */
1430 static int iolog_flush(struct io_log *log)
1432 	struct iolog_flush_data *data;
1434 	data = malloc(sizeof(*data));
1441 	while (!flist_empty(&log->io_logs)) {
1442 		struct io_logs *cur_log;
1444 		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1445 		flist_del_init(&cur_log->list);
1447 		data->samples = cur_log->log;
1448 		data->nr_samples = cur_log->nr_samples;
/*
 * iolog_cur_flush(): hand the current sample buffer to the async
 * compression workqueue, detaching it from cur_log so logging can
 * continue into a fresh buffer, then reap any deferred frees.
 * NOTE(review): line-sampled dump — allocation-failure handling and the
 * return are not visible here.
 */
1459 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1461 	struct iolog_flush_data *data;
1463 	data = smalloc(sizeof(*data));
1469 	data->samples = cur_log->log;
1470 	data->nr_samples = cur_log->nr_samples;
1473 	cur_log->nr_samples = cur_log->max_samples = 0;
1474 	cur_log->log = NULL;
1476 	workqueue_enqueue(&log->td->log_compress_wq, &data->work);
1478 	iolog_free_deferred(log);
/*
 * No-zlib build stubs: compression entry points exist but do nothing.
 * NOTE(review): line-sampled dump — the stub return values are not
 * visible here.
 */
1484 static int iolog_flush(struct io_log *log)
1489 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1494 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1499 void iolog_compress_exit(struct thread_data *td)
/*
 * iolog_cur_log(): return the newest chained sample buffer, or NULL when
 * the log has none. NOTE(review): line-sampled dump — the NULL return for
 * the empty case is between the numbered lines.
 */
1505 struct io_logs *iolog_cur_log(struct io_log *log)
1507 	if (flist_empty(&log->io_logs))
1510 	return flist_last_entry(&log->io_logs, struct io_logs, list);
/*
 * iolog_nr_samples(): total sample count across every chained io_logs
 * buffer. NOTE(review): line-sampled dump — the accumulator init and
 * return are between the numbered lines.
 */
1513 uint64_t iolog_nr_samples(struct io_log *iolog)
1515 	struct flist_head *entry;
1518 	flist_for_each(entry, &iolog->io_logs) {
1519 		struct io_logs *cur_log;
1521 		cur_log = flist_entry(entry, struct io_logs, list);
1522 		ret += cur_log->nr_samples;
/*
 * __write_log() plus one thin wrapper per log type (iops/slat/clat/
 * clat-hist/lat/bw). Each wrapper skips logs whose per-unit setting does
 * not match 'unit_log', writes via finish_log(), and on success frees and
 * clears the td pointer. NOTE(review): line-sampled dump — null checks,
 * free_log() calls and returns between the numbered lines are not visible
 * here.
 */
1528 static int __write_log(struct thread_data *td, struct io_log *log, int try)
1531 		return finish_log(td, log, try);
1536 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1540 	if (per_unit_log(td->iops_log) != unit_log)
1543 	ret = __write_log(td, td->iops_log, try);
1545 		td->iops_log = NULL;
1550 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1557 	ret = __write_log(td, td->slat_log, try);
1559 		td->slat_log = NULL;
1564 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1571 	ret = __write_log(td, td->clat_log, try);
1573 		td->clat_log = NULL;
1578 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1585 	ret = __write_log(td, td->clat_hist_log, try);
1587 		td->clat_hist_log = NULL;
1592 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1599 	ret = __write_log(td, td->lat_log, try);
1606 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1610 	if (per_unit_log(td->bw_log) != unit_log)
1613 	ret = __write_log(td, td->bw_log, try);
/*
 * Per-log-type bitmask tail (partially visible) and the dispatch table
 * pairing each mask with its write function; td_writeout_logs() iterates
 * this table. NOTE(review): line-sampled dump — the earlier enum entries
 * and the struct log_type definition are only partially visible here.
 */
1626 	CLAT_HIST_LOG_MASK = 32,
/* a log writer: takes (td, try, unit_log), returns 0 once written */
1633 	int (*fn)(struct thread_data *, int, bool);
1636 static struct log_type log_types[] = {
1638 		.mask	= BW_LOG_MASK,
1639 		.fn	= write_bandw_log,
1642 		.mask	= LAT_LOG_MASK,
1643 		.fn	= write_lat_log,
1646 		.mask	= SLAT_LOG_MASK,
1647 		.fn	= write_slat_log,
1650 		.mask	= CLAT_LOG_MASK,
1651 		.fn	= write_clat_log,
1654 		.mask	= IOPS_LOG_MASK,
1655 		.fn	= write_iops_log,
1658 		.mask	= CLAT_HIST_LOG_MASK,
1659 		.fn	= write_clat_hist_log,
/*
 * td_writeout_logs(): flush every per-thread log, retrying writers that
 * could not get the file lock (non-blocking try while more than one log
 * remains) and bailing out of a pass that makes no progress. Runstate is
 * bumped to TD_FINISHING around the write-out. NOTE(review): line-sampled
 * dump — the outer retry loop construct and progress handling between the
 * numbered lines are not visible here.
 */
1663 void td_writeout_logs(struct thread_data *td, bool unit_logs)
1665 	unsigned int log_mask = 0;
1666 	unsigned int log_left = ALL_LOG_NR;
1669 	old_state = td_bump_runstate(td, TD_FINISHING);
1671 	finalize_logs(td, unit_logs);
1674 		int prev_log_left = log_left;
1676 		for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1677 			struct log_type *lt = &log_types[i];
/* block (try=0) only when this is the last remaining log */
1680 			if (!(log_mask & lt->mask)) {
1681 				ret = lt->fn(td, log_left != 1, unit_logs);
1684 					log_mask |= lt->mask;
1689 		if (prev_log_left == log_left)
1693 	td_restore_runstate(td, old_state);
1696 void fio_writeout_logs(bool unit_logs)
1698 struct thread_data *td;
1702 td_writeout_logs(td, unit_logs);