2 * Code related to writing an iolog of what a thread is doing, and to
3 * later read that back and replay
23 #include <netinet/in.h>
24 #include <netinet/tcp.h>
25 #include <arpa/inet.h>
27 #include <sys/socket.h>
30 static int iolog_flush(struct io_log *log);
32 static const char iolog_ver2[] = "fio version 2 iolog";
/*
 * NOTE(review): this listing is elided — interior lines are missing from view;
 * comments below describe only what the visible lines establish.
 *
 * Queue a parsed io_piece on the thread's replay list and account its
 * length toward the total replay size.
 */
34 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
36 flist_add_tail(&ipo->list, &td->io_log_list);
37 td->total_io_size += ipo->len;
/*
 * Append one I/O record ("<file> <ddir> <offset> <buflen>") to the write
 * iolog. No-op when no write_iolog_file is configured.
 */
40 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
42 if (!td->o.write_iolog_file)
45 fprintf(td->iolog_f, "%s %s %llu %llu\n", io_u->file->file_name,
46 io_ddir_name(io_u->ddir),
47 io_u->offset, io_u->buflen);
/*
 * Log a file action (add/open/close) to the write iolog; 'what' indexes
 * the act[] table.
 */
50 void log_file(struct thread_data *td, struct fio_file *f,
51 enum file_log_act what)
53 const char *act[] = { "add", "open", "close" };
57 if (!td->o.write_iolog_file)
62 * this happens on the pre-open/close done before the job starts
67 fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
/*
 * Sleep for the replay-specified delay, compensating for time fio itself
 * has already spent: td->time_offset carries over-sleep from the previous
 * delay and is subtracted here, then updated from how long we actually
 * slept versus orig_delay. Sleeping is sliced (visible cap check at
 * 500000 usec) and aborts early on td->terminate.
 * NOTE(review): lines are elided here; the clamp applied when
 * delay < td->time_offset is not visible — confirm against full source.
 */
70 static void iolog_delay(struct thread_data *td, unsigned long delay)
72 uint64_t usec = utime_since_now(&td->last_issue);
73 unsigned long orig_delay = delay;
77 if (delay < td->time_offset) {
82 delay -= td->time_offset;
88 fio_gettime(&ts, NULL);
89 while (delay && !td->terminate) {
91 if (this_delay > 500000)
94 usec_sleep(td, this_delay);
98 usec = utime_since_now(&ts);
99 if (usec > orig_delay)
100 td->time_offset = usec - orig_delay;
/*
 * Handle "special" replay entries, i.e. file actions rather than data I/O.
 * Entries with a real data direction (ddir != DDIR_INVAL) are not special
 * and are returned to the caller to be issued. Open failures are reported
 * via td_verror(). With replay_redirect every entry maps to one file, so a
 * re-open of an already-open file is ignored.
 */
105 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
113 if (ipo->ddir != DDIR_INVAL)
116 f = td->files[ipo->fileno];
118 switch (ipo->file_action) {
119 case FIO_LOG_OPEN_FILE:
120 if (td->o.replay_redirect && fio_file_open(f)) {
121 dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
125 ret = td_io_open_file(td, f);
128 td_verror(td, ret, "iolog open file");
130 case FIO_LOG_CLOSE_FILE:
131 td_io_close_file(td, f);
133 case FIO_LOG_UNLINK_FILE:
134 td_io_unlink_file(td, f);
137 log_err("fio: bad file action %d\n", ipo->file_action);
/* Forward declaration: chunked reading refills the list from the iolog. */
144 static bool read_iolog2(struct thread_data *td);
/*
 * Pop the next replay entry off td->io_log_list and fill in io_u from it.
 * In chunked mode, when consumption reaches io_log_checkmark, more entries
 * are pulled in via read_iolog2(). File-action entries are dispatched to
 * ipo_special(); DDIR_WAIT entries sleep (iolog_delay for issue-relative
 * delays, otherwise relative to genesis time in msec) instead of carrying
 * offset/buflen.
 */
146 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
148 struct io_piece *ipo;
149 unsigned long elapsed;
151 while (!flist_empty(&td->io_log_list)) {
153 if (td->o.read_iolog_chunked) {
154 if (td->io_log_checkmark == td->io_log_current) {
155 if (!read_iolog2(td))
158 td->io_log_current--;
160 ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
161 flist_del(&ipo->list);
162 remove_trim_entry(td, ipo);
164 ret = ipo_special(td, ipo);
168 } else if (ret > 0) {
173 io_u->ddir = ipo->ddir;
174 if (ipo->ddir != DDIR_WAIT) {
175 io_u->offset = ipo->offset;
176 io_u->buflen = ipo->len;
177 io_u->file = td->files[ipo->fileno];
178 get_file(io_u->file);
179 dprint(FD_IO, "iolog: get %llu/%llu/%s\n", io_u->offset,
180 io_u->buflen, io_u->file->file_name);
182 iolog_delay(td, ipo->delay);
184 elapsed = mtime_since_genesis();
185 if (ipo->delay > elapsed)
186 usec_sleep(td, (ipo->delay - elapsed) * 1000);
191 if (io_u->ddir != DDIR_WAIT)
/*
 * Drop all remembered io_pieces: drain the rb-tree history (verify sort
 * tree) and then the flat history list, removing each entry from the trim
 * list as it goes.
 */
199 void prune_io_piece_log(struct thread_data *td)
201 struct io_piece *ipo;
202 struct fio_rb_node *n;
204 while ((n = rb_first(&td->io_hist_tree)) != NULL) {
205 ipo = rb_entry(n, struct io_piece, rb_node);
206 rb_erase(n, &td->io_hist_tree);
207 remove_trim_entry(td, ipo);
212 while (!flist_empty(&td->io_hist_list)) {
213 ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
214 flist_del(&ipo->list);
215 remove_trim_entry(td, ipo);
/*
222 * log a successful write, so we can unwind the log for verify
 *
 * Records the io_u as an io_piece. With a random map present, duplicate
 * blocks cannot occur, so a plain FIFO list suffices (IP_F_ONLIST).
 * Otherwise entries are sorted into an rb-tree keyed by (file, offset);
 * overlapping older entries are erased so verify sees only the newest
 * data for a block (IP_F_ONRB). Trim candidates also go on td->trim_list.
 */
224 void log_io_piece(struct thread_data *td, struct io_u *io_u)
226 struct fio_rb_node **p, *parent;
227 struct io_piece *ipo, *__ipo;
229 ipo = calloc(1, sizeof(struct io_piece));
231 ipo->file = io_u->file;
232 ipo->offset = io_u->offset;
233 ipo->len = io_u->buflen;
234 ipo->numberio = io_u->numberio;
235 ipo->flags = IP_F_IN_FLIGHT;
239 if (io_u_should_trim(td, io_u)) {
240 flist_add_tail(&ipo->trim_list, &td->trim_list);
245 * Only sort writes if we don't have a random map in which case we need
246 * to check for duplicate blocks and drop the old one, which we rely on
247 * the rb insert/lookup for handling.
249 if (file_randommap(td, ipo->file)) {
250 INIT_FLIST_HEAD(&ipo->list);
251 flist_add_tail(&ipo->list, &td->io_hist_list);
252 ipo->flags |= IP_F_ONLIST;
257 RB_CLEAR_NODE(&ipo->rb_node);
260 * Sort the entry into the verification list
263 p = &td->io_hist_tree.rb_node;
269 __ipo = rb_entry(parent, struct io_piece, rb_node);
270 if (ipo->file < __ipo->file)
272 else if (ipo->file > __ipo->file)
274 else if (ipo->offset < __ipo->offset) {
276 overlap = ipo->offset + ipo->len > __ipo->offset;
278 else if (ipo->offset > __ipo->offset) {
280 overlap = __ipo->offset + __ipo->len > ipo->offset;
286 dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
287 __ipo->offset, __ipo->len,
288 ipo->offset, ipo->len);
290 rb_erase(parent, &td->io_hist_tree);
291 remove_trim_entry(td, __ipo);
292 if (!(__ipo->flags & IP_F_IN_FLIGHT))
298 rb_link_node(&ipo->rb_node, parent, p);
299 rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
300 ipo->flags |= IP_F_ONRB;
/*
 * Remove a previously logged io_piece again, e.g. when the I/O failed.
 * If block-info tracking is active, mark the block's state as a trim or
 * write failure first (only if it hasn't already reached a failure state).
 * Then unlink from whichever container the piece sits on (rb-tree vs list).
 */
304 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
306 struct io_piece *ipo = io_u->ipo;
308 if (td->ts.nr_block_infos) {
309 uint32_t *info = io_u_block_info(td, io_u);
310 if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
311 if (io_u->ddir == DDIR_TRIM)
312 *info = BLOCK_INFO_SET_STATE(*info,
313 BLOCK_STATE_TRIM_FAILURE);
314 else if (io_u->ddir == DDIR_WRITE)
315 *info = BLOCK_INFO_SET_STATE(*info,
316 BLOCK_STATE_WRITE_FAILURE);
323 if (ipo->flags & IP_F_ONRB)
324 rb_erase(&ipo->rb_node, &td->io_hist_tree);
325 else if (ipo->flags & IP_F_ONLIST)
326 flist_del(&ipo->list);
/*
 * Shrink the logged length to what was actually transferred
 * (xfer_buflen minus residual).
 */
333 void trim_io_piece(const struct io_u *io_u)
335 struct io_piece *ipo = io_u->ipo;
340 ipo->len = io_u->xfer_buflen - io_u->resid;
/*
 * Tear down the write-iolog state; the stdio buffer pointer is cleared
 * (elided lines presumably flush/close and free — confirm in full source).
 */
343 void write_iolog_close(struct thread_data *td)
349 td->iolog_buf = NULL;
/*
 * For chunked iolog reading: estimate how many entries to fetch so that
 * roughly one second's worth of I/O is buffered. Consumption rate is
 * derived from how far below the previous highmark we've drained over the
 * elapsed nanoseconds. The next refill trigger (checkmark) is set to half
 * of the new highmark.
 */
352 static int64_t iolog_items_to_fetch(struct thread_data *td)
357 int64_t items_to_fetch;
359 if (!td->io_log_highmark)
363 fio_gettime(&now, NULL);
364 elapsed = ntime_since(&td->io_log_highmark_time, &now);
366 for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
367 items_to_fetch = for_1s - td->io_log_current;
368 if (items_to_fetch < 0)
373 td->io_log_highmark = td->io_log_current + items_to_fetch;
374 td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
375 fio_gettime(&td->io_log_highmark_time, NULL);
377 return items_to_fetch;
/*
381 * Read version 2 iolog data. It is enhanced to include per-file logging,
 *
 * Parses lines of the form "<fname> <act> [offset bytes]". Four-field
 * matches are data/wait records (read/write/sync/datasync/trim/wait);
 * two-field matches are file actions (add/open/close). Offsets may be
 * scaled (replay_scale) and aligned (replay_align); max_bs is widened to
 * the largest record seen. Each record becomes an io_piece queued via
 * queue_io_piece(). In chunked mode parsing stops after items_to_fetch
 * entries. Writes are skipped (and counted) in read-only mode.
 */
384 static bool read_iolog2(struct thread_data *td)
386 unsigned long long offset;
388 int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
389 char *rfname, *fname, *act;
392 int64_t items_to_fetch = 0;
394 if (td->o.read_iolog_chunked) {
395 items_to_fetch = iolog_items_to_fetch(td);
401 * Read in the read iolog and store it, reuse the infrastructure
402 * for doing verifications.
405 rfname = fname = malloc(256+16);
406 act = malloc(256+16);
408 reads = writes = waits = 0;
409 while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
410 struct io_piece *ipo;
413 r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
416 if (td->o.replay_redirect)
417 fname = td->o.replay_redirect;
423 if (!strcmp(act, "wait"))
425 else if (!strcmp(act, "read"))
427 else if (!strcmp(act, "write"))
429 else if (!strcmp(act, "sync"))
431 else if (!strcmp(act, "datasync"))
433 else if (!strcmp(act, "trim"))
436 log_err("fio: bad iolog file action: %s\n",
440 fileno = get_fileno(td, fname);
443 if (!strcmp(act, "add")) {
444 if (td->o.replay_redirect &&
445 get_fileno(td, fname) != -1) {
446 dprint(FD_FILE, "iolog: ignoring"
447 " re-add of file %s\n", fname);
449 fileno = add_file(td, fname, 0, 1);
450 file_action = FIO_LOG_ADD_FILE;
453 } else if (!strcmp(act, "open")) {
454 fileno = get_fileno(td, fname);
455 file_action = FIO_LOG_OPEN_FILE;
456 } else if (!strcmp(act, "close")) {
457 fileno = get_fileno(td, fname);
458 file_action = FIO_LOG_CLOSE_FILE;
460 log_err("fio: bad iolog file action: %s\n",
465 log_err("bad iolog2: %s\n", p);
471 else if (rw == DDIR_WRITE) {
473 * Don't add a write for ro mode
478 } else if (rw == DDIR_WAIT) {
482 } else if (rw == DDIR_INVAL) {
483 } else if (!ddir_sync(rw)) {
484 log_err("bad ddir: %d\n", rw);
491 ipo = calloc(1, sizeof(*ipo));
494 if (rw == DDIR_WAIT) {
497 if (td->o.replay_scale)
498 ipo->offset = offset / td->o.replay_scale;
500 ipo->offset = offset;
501 ipo_bytes_align(td->o.replay_align, ipo);
504 if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
505 td->o.max_bs[rw] = bytes;
506 ipo->fileno = fileno;
507 ipo->file_action = file_action;
511 queue_io_piece(td, ipo);
513 if (td->o.read_iolog_chunked) {
514 td->io_log_current++;
516 if (items_to_fetch == 0)
525 if (td->o.read_iolog_chunked) {
526 td->io_log_highmark = td->io_log_current;
527 td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
528 fio_gettime(&td->io_log_highmark_time, NULL);
531 if (writes && read_only) {
532 log_err("fio: <%s> skips replay of %d writes due to"
533 " read-only\n", td->o.name, writes);
537 if (td->o.read_iolog_chunked) {
538 if (td->io_log_current == 0) {
541 td->o.td_ddir = TD_DDIR_RW;
/* Derive the job's data direction from what the log actually contained. */
545 if (!reads && !writes && !waits)
547 else if (reads && !writes)
548 td->o.td_ddir = TD_DDIR_READ;
549 else if (!reads && writes)
550 td->o.td_ddir = TD_DDIR_WRITE;
552 td->o.td_ddir = TD_DDIR_RW;
/*
 * True if 'path' names a UNIX socket (per stat()); used so an iolog can
 * be streamed in over a socket instead of read from a regular file.
 */
557 static bool is_socket(const char *path)
560 int r = stat(path, &buf);
564 return S_ISSOCK(buf.st_mode);
/*
 * Connect a stream socket to the UNIX-domain socket at 'path'; returns
 * the connected fd (error handling elided from view).
 * NOTE(review): strncpy() does not guarantee NUL-termination when
 * strlen(path) >= sizeof(addr.sun_path), and the connect() length is built
 * from strlen(path) — verify path length is validated in the full source.
 */
567 static int open_socket(const char *path)
569 int fd = socket(AF_UNIX, SOCK_STREAM, 0);
570 struct sockaddr_un addr;
573 addr.sun_family = AF_UNIX;
574 strncpy(addr.sun_path, path, sizeof(addr.sun_path));
575 if (connect(fd, (const struct sockaddr *)&addr, strlen(path) + sizeof(addr.sun_family)) == 0)
/*
583 * open iolog, check version, and call appropriate parser
 *
 * The source may be a UNIX socket (streamed iolog) or a regular file.
 * The first line must be the version-2 banner; version-1 logs are
 * rejected with an error.
 */
585 static bool init_iolog_read(struct thread_data *td)
587 char buffer[256], *p;
590 if (is_socket(td->o.read_iolog_file)) {
591 int fd = open_socket(td->o.read_iolog_file);
596 f = fopen(td->o.read_iolog_file, "r");
598 perror("fopen read iolog");
602 p = fgets(buffer, sizeof(buffer), f);
604 td_verror(td, errno, "iolog read");
605 log_err("fio: unable to read iolog\n");
609 td->io_log_rfile = f;
611 * version 2 of the iolog stores a specific string as the
612 * first line, check for that
614 if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2))) {
615 free_release_files(td);
616 ret = read_iolog2(td);
619 log_err("fio: iolog version 1 is no longer supported\n");
/*
627 * Set up a log for storing io patterns.
 *
 * Opens the write iolog in append mode, gives it an 8 KiB stdio buffer,
 * writes the version-2 banner, then records an "add" entry for every
 * file already known to the job.
 */
629 static bool init_iolog_write(struct thread_data *td)
635 f = fopen(td->o.write_iolog_file, "a");
637 perror("fopen write iolog");
642 * That's it for writing, setup a log buffer and we're done.
645 td->iolog_buf = malloc(8192);
646 setvbuf(f, td->iolog_buf, _IOFBF, 8192);
649 * write our version line
651 if (fprintf(f, "%s\n", iolog_ver2) < 0) {
652 perror("iolog init\n");
657 * add all known files
659 for_each_file(td, ff, i)
660 log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * Entry point for iolog setup: replay (blktrace or native iolog format,
 * auto-detected) takes precedence over recording. Failure is reported
 * through td_verror(EINVAL).
 */
665 bool init_iolog(struct thread_data *td)
669 if (td->o.read_iolog_file) {
673 * Check if it's a blktrace file and load that if possible.
674 * Otherwise assume it's a normal log file and load that.
676 if (is_blktrace(td->o.read_iolog_file, &need_swap))
677 ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
679 ret = init_iolog_read(td);
680 } else if (td->o.write_iolog_file)
681 ret = init_iolog_write(td);
686 td_verror(td, EINVAL, "failed initializing iolog");
/*
 * Allocate and initialize an io_log from log_params: copies type,
 * offset-logging, gzip, averaging and histogram settings; seeds one
 * all-zero io_u_plat entry per data direction for histogram diffing;
 * pre-allocates a pending-samples buffer when not in offload submit mode;
 * and, when compression is requested with a td present, initializes the
 * shared chunk/deferred-free mutexes and flags TD_F_COMPRESS_LOG.
 */
691 void setup_log(struct io_log **log, struct log_params *p,
692 const char *filename)
696 struct io_u_plat_entry *entry;
697 struct flist_head *list;
699 l = scalloc(1, sizeof(*l));
700 INIT_FLIST_HEAD(&l->io_logs);
701 l->log_type = p->log_type;
702 l->log_offset = p->log_offset;
703 l->log_gz = p->log_gz;
704 l->log_gz_store = p->log_gz_store;
705 l->avg_msec = p->avg_msec;
706 l->hist_msec = p->hist_msec;
707 l->hist_coarseness = p->hist_coarseness;
708 l->filename = strdup(filename);
711 /* Initialize histogram lists for each r/w direction,
712 * with initial io_u_plat of all zeros:
714 for (i = 0; i < DDIR_RWDIR_CNT; i++) {
715 list = &l->hist_window[i].list;
716 INIT_FLIST_HEAD(list);
717 entry = calloc(1, sizeof(struct io_u_plat_entry));
718 flist_add(&entry->list, list);
721 if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
724 __p = calloc(1, sizeof(*l->pending));
725 __p->max_samples = DEF_LOG_ENTRIES;
726 __p->log = calloc(__p->max_samples, log_entry_sz(l));
731 l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
733 INIT_FLIST_HEAD(&l->chunk_list);
735 if (l->log_gz && !p->td)
737 else if (l->log_gz || l->log_gz_store) {
738 mutex_init_pshared(&l->chunk_lock);
739 mutex_init_pshared(&l->deferred_free_lock);
740 p->td->flags |= TD_F_COMPRESS_LOG;
/*
 * Optional large stdio buffering for log output: with CONFIG_SETVBUF the
 * file gets a 1 MiB fully-buffered stream buffer; without it the helpers
 * are no-op stubs so callers need no #ifdefs.
 */
746 #ifdef CONFIG_SETVBUF
747 static void *set_file_buffer(FILE *f)
749 size_t size = 1048576;
753 setvbuf(f, buf, _IOFBF, size);
757 static void clear_file_buffer(void *buf)
762 static void *set_file_buffer(FILE *f)
767 static void clear_file_buffer(void *buf)
/*
 * Release an io_log: drain and free every queued io_logs segment, then
 * the pending buffer (remaining teardown elided from view).
 */
772 void free_log(struct io_log *log)
774 while (!flist_empty(&log->io_logs)) {
775 struct io_logs *cur_log;
777 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
778 flist_del_init(&cur_log->list);
784 free(log->pending->log);
/*
 * Sum 'stride' adjacent histogram buckets starting at index j. When a
 * previous snapshot (io_u_plat_last) is given, return the delta since
 * that snapshot instead of the absolute counts.
 */
794 uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
795 uint64_t *io_u_plat_last)
800 if (io_u_plat_last) {
801 for (k = sum = 0; k < stride; k++)
802 sum += io_u_plat[j + k] - io_u_plat_last[j + k];
804 for (k = sum = 0; k < stride; k++)
805 sum += io_u_plat[j + k];
/*
 * Write clat-histogram samples as CSV: "time, ddir, bs," followed by
 * per-bucket-group delta counts (group width = 2^hist_coarseness),
 * diffed against the previous sample's plat entry, which is then
 * unlinked. Offset-bearing samples are detected from the first sample's
 * LOG_OFFSET_SAMPLE_BIT and change the per-entry size.
 */
811 static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
812 uint64_t sample_size)
816 uint64_t i, j, nr_samples;
817 struct io_u_plat_entry *entry, *entry_before;
819 uint64_t *io_u_plat_before;
821 int stride = 1 << hist_coarseness;
826 s = __get_sample(samples, 0, 0);
827 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
829 nr_samples = sample_size / __log_entry_sz(log_offset);
831 for (i = 0; i < nr_samples; i++) {
832 s = __get_sample(samples, log_offset, i);
834 entry = s->data.plat_entry;
835 io_u_plat = entry->io_u_plat;
837 entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
838 io_u_plat_before = entry_before->io_u_plat;
840 fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
841 io_sample_ddir(s), (unsigned long long) s->bs);
842 for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
843 fprintf(f, "%llu, ", (unsigned long long)
844 hist_sum(j, stride, io_u_plat, io_u_plat_before));
846 fprintf(f, "%llu\n", (unsigned long long)
847 hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
850 flist_del(&entry_before->list);
/*
 * Write plain samples as CSV: "time, value, ddir, bs" plus the offset
 * column when LOG_OFFSET_SAMPLE_BIT is set on the samples.
 */
855 void flush_samples(FILE *f, void *samples, uint64_t sample_size)
859 uint64_t i, nr_samples;
864 s = __get_sample(samples, 0, 0);
865 log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
867 nr_samples = sample_size / __log_entry_sz(log_offset);
869 for (i = 0; i < nr_samples; i++) {
870 s = __get_sample(samples, log_offset, i);
873 fprintf(f, "%lu, %" PRId64 ", %u, %llu\n",
874 (unsigned long) s->time,
876 io_sample_ddir(s), (unsigned long long) s->bs);
878 struct io_sample_offset *so = (void *) s;
880 fprintf(f, "%lu, %" PRId64 ", %u, %llu, %llu\n",
881 (unsigned long) s->time,
883 io_sample_ddir(s), (unsigned long long) s->bs,
884 (unsigned long long) so->offset);
/* Work item handed to the compression workqueue (fields elided). */
891 struct iolog_flush_data {
892 struct workqueue_work work;
/* Compressed-chunk payload size: 128 KiB. */
899 #define GZ_CHUNK 131072
/*
 * Allocate a fresh compression chunk with a GZ_CHUNK-sized buffer,
 * tagged with the given sequence number.
 */
901 static struct iolog_compress *get_new_chunk(unsigned int seq)
903 struct iolog_compress *c;
905 c = malloc(sizeof(*c));
906 INIT_FLIST_HEAD(&c->list);
907 c->buf = malloc(GZ_CHUNK);
/* Free a chunk (buffer + struct; bodies elided). */
913 static void free_chunk(struct iolog_compress *ic)
/*
 * Initialize a zlib inflate stream. gz_hdr selects window bits that
 * auto-detect a gzip header (the +32 trick), for logs stored in gzip
 * format on disk.
 */
919 static int z_stream_init(z_stream *stream, int gz_hdr)
923 memset(stream, 0, sizeof(*stream));
924 stream->zalloc = Z_NULL;
925 stream->zfree = Z_NULL;
926 stream->opaque = Z_NULL;
927 stream->next_in = Z_NULL;
930 * zlib magic - add 32 for auto-detection of gz header or not,
931 * if we decide to store files in a gzip friendly format.
936 if (inflateInit2(stream, wbits) != Z_OK)
/* Iteration state carried across inflate_chunk() calls. */
942 struct inflate_chunk_iter {
/*
 * End the inflate stream for a finished sequence, flush the decompressed
 * samples to 'f', and reset the iterator's buffer accounting.
 */
951 static void finish_chunk(z_stream *stream, FILE *f,
952 struct inflate_chunk_iter *iter)
956 ret = inflateEnd(stream);
958 log_err("fio: failed to end log inflation seq %d (%d)\n",
961 flush_samples(f, iter->buf, iter->buf_used);
964 iter->buf_size = iter->buf_used = 0;
/*
968 * Iterative chunk inflation. Handles cases where we cross into a new
969 * sequence, doing flush finish of previous chunk if needed.
 *
 * Decompresses one stored chunk into iter->buf, growing the buffer by
 * chunk_sz whenever zlib runs out of output space. Returns the number of
 * input bytes consumed (next_in - ic->buf).
 */
971 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
972 z_stream *stream, struct inflate_chunk_iter *iter)
976 dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
977 (unsigned long) ic->len, ic->seq);
979 if (ic->seq != iter->seq) {
981 finish_chunk(stream, f, iter);
983 z_stream_init(stream, gz_hdr);
987 stream->avail_in = ic->len;
988 stream->next_in = ic->buf;
990 if (!iter->buf_size) {
991 iter->buf_size = iter->chunk_sz;
992 iter->buf = malloc(iter->buf_size);
995 while (stream->avail_in) {
996 size_t this_out = iter->buf_size - iter->buf_used;
999 stream->avail_out = this_out;
1000 stream->next_out = iter->buf + iter->buf_used;
1002 err = inflate(stream, Z_NO_FLUSH);
1004 log_err("fio: failed inflating log: %d\n", err);
1009 iter->buf_used += this_out - stream->avail_out;
1011 if (!stream->avail_out) {
1012 iter->buf_size += iter->chunk_sz;
1013 iter->buf = realloc(iter->buf, iter->buf_size);
1017 if (err == Z_STREAM_END)
1021 ret = (void *) stream->next_in - ic->buf;
1023 dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
/*
1029 * Inflate stored compressed chunks, or write them directly to the log
1030 * file if so instructed.
 *
 * Drains log->chunk_list: with log_gz_store the raw compressed bytes are
 * written out as-is; otherwise each chunk is inflated and its samples
 * flushed via finish_chunk().
 */
1032 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1034 struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
1037 while (!flist_empty(&log->chunk_list)) {
1038 struct iolog_compress *ic;
1040 ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
1041 flist_del(&ic->list);
1043 if (log->log_gz_store) {
1046 dprint(FD_COMPRESS, "log write chunk size=%lu, "
1047 "seq=%u\n", (unsigned long) ic->len, ic->seq);
1049 ret = fwrite(ic->buf, ic->len, 1, f);
1050 if (ret != 1 || ferror(f)) {
1052 log_err("fio: error writing compressed log\n");
1055 inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
1061 finish_chunk(&stream, f, &iter);
/*
1069 * Open compressed log file and decompress the stored chunks and
1070 * write them to stdout. The chunks are stored sequentially in the
1071 * file, so we iterate over them and do them one-by-one.
 *
 * The whole file is slurped into one buffer; inflate_chunk() is then
 * called repeatedly (each Z_STREAM_END starts a new sequence) until the
 * input is consumed.
 */
1073 int iolog_file_inflate(const char *file)
1075 struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
1076 struct iolog_compress ic;
1084 f = fopen(file, "r");
1090 if (stat(file, &sb) < 0) {
1096 ic.buf = buf = malloc(sb.st_size);
1097 ic.len = sb.st_size;
1100 ret = fread(ic.buf, ic.len, 1, f);
1101 if (ret == 0 && ferror(f)) {
1106 } else if (ferror(f) || (!feof(f) && ret != 1)) {
1107 log_err("fio: short read on reading log\n");
1116 * Each chunk will return Z_STREAM_END. We don't know how many
1117 * chunks are in the file, so we just keep looping and incrementing
1118 * the sequence number until we have consumed the whole compressed
1125 iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
1138 finish_chunk(&stream, stdout, &iter);
/* No-zlib build stubs: chunk inflation is a no-op, CLI inflate errors out. */
1148 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1153 int iolog_file_inflate(const char *file)
1155 log_err("fio: log inflation not possible without zlib\n");
/*
 * Write an io_log to its file (truncate or append per do_append): set a
 * large stdio buffer, emit any compressed chunks first, then flush every
 * queued io_logs segment — histogram-formatted for the clat_hist_log,
 * plain samples otherwise.
 */
1161 void flush_log(struct io_log *log, bool do_append)
1167 f = fopen(log->filename, "w");
1169 f = fopen(log->filename, "a");
1171 perror("fopen log");
1175 buf = set_file_buffer(f);
1177 inflate_gz_chunks(log, f);
1179 while (!flist_empty(&log->io_logs)) {
1180 struct io_logs *cur_log;
1182 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1183 flist_del_init(&cur_log->list);
1185 if (log->td && log == log->td->clat_hist_log)
1186 flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1187 log_sample_sz(log, cur_log));
1189 flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1195 clear_file_buffer(buf);
/*
 * Finalize one log under the cross-process file lock (trylock variant
 * returns without flushing when contended). Networked clients/backends
 * send the log over the wire instead of writing it locally.
 */
1198 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1200 if (td->flags & TD_F_COMPRESS_LOG)
1204 if (fio_trylock_file(log->filename))
1207 fio_lock_file(log->filename);
1209 if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1210 fio_send_iolog(td, log, log->filename);
1212 flush_log(log, !td->o.per_job_logs);
1214 fio_unlock_file(log->filename);
/*
 * Total bytes held in the compressed chunk list, taken under chunk_lock.
 */
1219 size_t log_chunk_sizes(struct io_log *log)
1221 struct flist_head *entry;
1224 if (flist_empty(&log->chunk_list))
1228 pthread_mutex_lock(&log->chunk_lock);
1229 flist_for_each(entry, &log->chunk_list) {
1230 struct iolog_compress *c;
1232 c = flist_entry(entry, struct iolog_compress, list);
1235 pthread_mutex_unlock(&log->chunk_lock);
/*
 * Stash a sample buffer for later freeing (it may still be referenced by
 * an in-flight reader); drops it with a one-time warning when the
 * deferred table is full.
 */
1241 static void iolog_put_deferred(struct io_log *log, void *ptr)
1246 pthread_mutex_lock(&log->deferred_free_lock);
1247 if (log->deferred < IOLOG_MAX_DEFER) {
1248 log->deferred_items[log->deferred] = ptr;
1250 } else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
1251 log_err("fio: had to drop log entry free\n");
1252 pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Free everything queued by iolog_put_deferred(), under the same lock.
 */
1255 static void iolog_free_deferred(struct io_log *log)
1262 pthread_mutex_lock(&log->deferred_free_lock);
1264 for (i = 0; i < log->deferred; i++) {
1265 free(log->deferred_items[i]);
1266 log->deferred_items[i] = NULL;
1270 pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Compress one batch of samples with zlib deflate into a local list of
 * GZ_CHUNK-sized chunks tagged with a fresh sequence number, then splice
 * the list onto log->chunk_list under chunk_lock. The raw sample buffer
 * is handed to iolog_put_deferred() once consumed. On error, all chunks
 * built so far are freed.
 * NOTE(review): in this elided view the first dprint of c->len appears
 * before c is assigned by get_new_chunk() — confirm ordering (or a prior
 * 'if (c)' guard) against the full source.
 */
1273 static int gz_work(struct iolog_flush_data *data)
1275 struct iolog_compress *c = NULL;
1276 struct flist_head list;
1282 INIT_FLIST_HEAD(&list);
1284 memset(&stream, 0, sizeof(stream));
1285 stream.zalloc = Z_NULL;
1286 stream.zfree = Z_NULL;
1287 stream.opaque = Z_NULL;
1289 ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1291 log_err("fio: failed to init gz stream\n");
1295 seq = ++data->log->chunk_seq;
1297 stream.next_in = (void *) data->samples;
1298 stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1300 dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
1301 (unsigned long) stream.avail_in, seq,
1302 data->log->filename);
1305 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1306 (unsigned long) c->len);
1307 c = get_new_chunk(seq);
1308 stream.avail_out = GZ_CHUNK;
1309 stream.next_out = c->buf;
1310 ret = deflate(&stream, Z_NO_FLUSH);
1312 log_err("fio: deflate log (%d)\n", ret);
1317 c->len = GZ_CHUNK - stream.avail_out;
1318 flist_add_tail(&c->list, &list);
1320 } while (stream.avail_in);
/* Finish the stream into the tail chunk's remaining space. */
1322 stream.next_out = c->buf + c->len;
1323 stream.avail_out = GZ_CHUNK - c->len;
1325 ret = deflate(&stream, Z_FINISH);
1328 * Z_BUF_ERROR is special, it just means we need more
1329 * output space. We'll handle that below. Treat any other
1332 if (ret != Z_BUF_ERROR) {
1333 log_err("fio: deflate log (%d)\n", ret);
1334 flist_del(&c->list);
1341 c->len = GZ_CHUNK - stream.avail_out;
1343 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
1345 if (ret != Z_STREAM_END) {
/* Keep adding chunks until Z_FINISH reports the stream is complete. */
1347 c = get_new_chunk(seq);
1348 stream.avail_out = GZ_CHUNK;
1349 stream.next_out = c->buf;
1350 ret = deflate(&stream, Z_FINISH);
1351 c->len = GZ_CHUNK - stream.avail_out;
1353 flist_add_tail(&c->list, &list);
1354 dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1355 (unsigned long) c->len);
1356 } while (ret != Z_STREAM_END);
1359 dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1361 ret = deflateEnd(&stream);
1363 log_err("fio: deflateEnd %d\n", ret);
1365 iolog_put_deferred(data->log, data->samples);
1367 if (!flist_empty(&list)) {
1368 pthread_mutex_lock(&data->log->chunk_lock);
1369 flist_splice_tail(&list, &data->log->chunk_list);
1370 pthread_mutex_unlock(&data->log->chunk_lock);
/* Error path: free any chunks built before the failure. */
1379 while (!flist_empty(&list)) {
1380 c = flist_first_entry(list.next, struct iolog_compress, list);
1381 flist_del(&c->list);
/*
1389 * Invoked from our compress helper thread, when logging would have exceeded
1390 * the specified memory limitation. Compresses the previously stored
 */
1393 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1395 return gz_work(container_of(work, struct iolog_flush_data, work));
/*
 * Per-worker setup: pin the compression thread to log_gz_cpumask when
 * that option was explicitly set.
 */
1398 static int gz_init_worker(struct submit_worker *sw)
1400 struct thread_data *td = sw->wq->td;
1402 if (!fio_option_is_set(&td->o, log_gz_cpumask))
1405 if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1406 log_err("gz: failed to set CPU affinity\n");
/* Workqueue ops for the single-threaded log compression queue. */
1413 static struct workqueue_ops log_compress_wq_ops = {
1414 .fn = gz_work_async,
1415 .init_worker_fn = gz_init_worker,
/*
 * Create the one-worker compression workqueue; no-op unless the thread
 * has TD_F_COMPRESS_LOG set.
 */
1419 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1421 if (!(td->flags & TD_F_COMPRESS_LOG))
1424 workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
/* Tear the compression workqueue back down (symmetric with init). */
1428 void iolog_compress_exit(struct thread_data *td)
1430 if (!(td->flags & TD_F_COMPRESS_LOG))
1433 workqueue_exit(&td->log_compress_wq);
/*
1437 * Queue work item to compress the existing log entries. We reset the
1438 * current log to a small size, and reference the existing log in the
1439 * data that we queue for compression. Once compression has been done,
1440 * this old log is freed. If called with finish == true, will not return
1441 * until the log compression has completed, and will flush all previous
 *
 * Synchronous variant: compresses every queued io_logs segment in-line.
 */
1444 static int iolog_flush(struct io_log *log)
1446 struct iolog_flush_data *data;
1448 data = malloc(sizeof(*data));
1455 while (!flist_empty(&log->io_logs)) {
1456 struct io_logs *cur_log;
1458 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1459 flist_del_init(&cur_log->list);
1461 data->samples = cur_log->log;
1462 data->nr_samples = cur_log->nr_samples;
/*
 * Asynchronous variant: hand the current segment's samples to the
 * compression workqueue (data is smalloc'ed shared memory), detach the
 * buffer from cur_log, and reap any deferred frees.
 */
1473 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1475 struct iolog_flush_data *data;
1477 data = smalloc(sizeof(*data));
1483 data->samples = cur_log->log;
1484 data->nr_samples = cur_log->nr_samples;
1487 cur_log->nr_samples = cur_log->max_samples = 0;
1488 cur_log->log = NULL;
1490 workqueue_enqueue(&log->td->log_compress_wq, &data->work);
1492 iolog_free_deferred(log);
/* No-zlib build stubs for the flush/compress entry points. */
1498 static int iolog_flush(struct io_log *log)
1503 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1508 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1513 void iolog_compress_exit(struct thread_data *td)
/*
 * Return the segment currently being filled (tail of io_logs), or NULL
 * when the list is empty.
 */
1519 struct io_logs *iolog_cur_log(struct io_log *log)
1521 if (flist_empty(&log->io_logs))
1524 return flist_last_entry(&log->io_logs, struct io_logs, list);
/* Total sample count across every queued io_logs segment. */
1527 uint64_t iolog_nr_samples(struct io_log *iolog)
1529 struct flist_head *entry;
1532 flist_for_each(entry, &iolog->io_logs) {
1533 struct io_logs *cur_log;
1535 cur_log = flist_entry(entry, struct io_logs, list);
1536 ret += cur_log->nr_samples;
/* Common write-out path for every log kind; delegates to finish_log(). */
1542 static int __write_log(struct thread_data *td, struct io_log *log, int try)
1545 return finish_log(td, log, try);
/*
 * Per-log-type writers: each checks unit-log applicability where relevant,
 * writes via __write_log(), and clears the td pointer on success so the
 * log is not written twice.
 */
1550 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1554 if (per_unit_log(td->iops_log) != unit_log)
1557 ret = __write_log(td, td->iops_log, try);
1559 td->iops_log = NULL;
1564 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1571 ret = __write_log(td, td->slat_log, try);
1573 td->slat_log = NULL;
1578 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1585 ret = __write_log(td, td->clat_log, try);
1587 td->clat_log = NULL;
1592 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1599 ret = __write_log(td, td->clat_hist_log, try);
1601 td->clat_hist_log = NULL;
1606 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1613 ret = __write_log(td, td->lat_log, try);
1620 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1624 if (per_unit_log(td->bw_log) != unit_log)
1627 ret = __write_log(td, td->bw_log, try);
/* Bit masks identifying each log kind, and the dispatch table mapping a
 * mask to its writer function (used by td_writeout_logs). */
1640 CLAT_HIST_LOG_MASK = 32,
1647 int (*fn)(struct thread_data *, int, bool);
1650 static struct log_type log_types[] = {
1652 .mask = BW_LOG_MASK,
1653 .fn = write_bandw_log,
1656 .mask = LAT_LOG_MASK,
1657 .fn = write_lat_log,
1660 .mask = SLAT_LOG_MASK,
1661 .fn = write_slat_log,
1664 .mask = CLAT_LOG_MASK,
1665 .fn = write_clat_log,
1668 .mask = IOPS_LOG_MASK,
1669 .fn = write_iops_log,
1672 .mask = CLAT_HIST_LOG_MASK,
1673 .fn = write_clat_hist_log,
/*
 * Write out all of a thread's logs. Runstate is bumped to TD_FINISHING
 * for the duration. The loop retries log types that could not be written
 * (trylock contention: 'try' is true until only one log remains), marking
 * completed ones in log_mask; it bails out when a full pass makes no
 * progress (prev_log_left == log_left).
 */
1677 void td_writeout_logs(struct thread_data *td, bool unit_logs)
1679 unsigned int log_mask = 0;
1680 unsigned int log_left = ALL_LOG_NR;
1683 old_state = td_bump_runstate(td, TD_FINISHING);
1685 finalize_logs(td, unit_logs);
1688 int prev_log_left = log_left;
1690 for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1691 struct log_type *lt = &log_types[i];
1694 if (!(log_mask & lt->mask)) {
1695 ret = lt->fn(td, log_left != 1, unit_logs);
1698 log_mask |= lt->mask;
1703 if (prev_log_left == log_left)
1707 td_restore_runstate(td, old_state);
1710 void fio_writeout_logs(bool unit_logs)
1712 struct thread_data *td;
1716 td_writeout_logs(td, unit_logs);