/*
 * Code related to writing an iolog of what a thread is doing, and to
 * later read that back and replay
 */
23 #include <netinet/in.h>
24 #include <netinet/tcp.h>
25 #include <arpa/inet.h>
27 #include <sys/socket.h>
30 static int iolog_flush(struct io_log *log);
32 static const char iolog_ver2[] = "fio version 2 iolog";
34 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
36 flist_add_tail(&ipo->list, &td->io_log_list);
37 td->total_io_size += ipo->len;
40 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
42 if (!td->o.write_iolog_file)
45 fprintf(td->iolog_f, "%s %s %llu %llu\n", io_u->file->file_name,
46 io_ddir_name(io_u->ddir),
47 io_u->offset, io_u->buflen);
50 void log_file(struct thread_data *td, struct fio_file *f,
51 enum file_log_act what)
53 const char *act[] = { "add", "open", "close" };
57 if (!td->o.write_iolog_file)
62 * this happens on the pre-open/close done before the job starts
67 fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
/*
 * Sleep long enough to honor the inter-I/O delay recorded in a replayed
 * iolog. Time already spent since the last issue, and accumulated
 * oversleep from earlier delays (td->time_offset), are subtracted so
 * replay pacing tracks the recorded trace.
 * NOTE(review): several interior lines (declarations, early returns,
 * loop bookkeeping) appear elided in this extract.
 */
static void iolog_delay(struct thread_data *td, unsigned long delay)
	uint64_t usec = utime_since_now(&td->last_issue);
	unsigned long orig_delay = delay;

	/* previous oversleep already covers this delay — nothing to sleep */
	if (delay < td->time_offset) {

	delay -= td->time_offset;

	fio_gettime(&ts, NULL);
	/* sleep in bounded slices so termination is noticed promptly */
	while (delay && !td->terminate) {
		if (this_delay > 500000)

		usec_sleep(td, this_delay);

	/* remember how far we overslept, to compensate next time */
	usec = utime_since_now(&ts);
	if (usec > orig_delay)
		td->time_offset = usec - orig_delay;
/*
 * Handle a "special" replay entry. Entries with ddir == DDIR_INVAL carry
 * a file action (add/open/close/unlink) instead of an actual I/O; act on
 * the referenced file accordingly.
 * NOTE(review): return statements, break statements and some argument
 * continuations are elided in this extract.
 */
static int ipo_special(struct thread_data *td, struct io_piece *ipo)
	/* real I/O entries are not special */
	if (ipo->ddir != DDIR_INVAL)

	f = td->files[ipo->fileno];

	switch (ipo->file_action) {
	case FIO_LOG_OPEN_FILE:
		/*
		 * With replay_redirect everything maps onto one file, so a
		 * second "open" for it is expected — ignore it.
		 */
		if (td->o.replay_redirect && fio_file_open(f)) {
			dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
		ret = td_io_open_file(td, f);
			td_verror(td, ret, "iolog open file");
	case FIO_LOG_CLOSE_FILE:
		td_io_close_file(td, f);
	case FIO_LOG_UNLINK_FILE:
		td_io_unlink_file(td, f);
		log_err("fio: bad file action %d\n", ipo->file_action);
144 static bool read_iolog2(struct thread_data *td);
/*
 * Pop the next entry from the replay queue and populate io_u from it.
 * In chunked mode, refill the queue via read_iolog2() once consumption
 * reaches the checkmark. File-action entries are handled inline via
 * ipo_special(); DDIR_WAIT entries cause a delay instead of an I/O.
 * NOTE(review): early returns, error branches and the function tail are
 * elided in this extract.
 */
int read_iolog_get(struct thread_data *td, struct io_u *io_u)
	struct io_piece *ipo;
	unsigned long elapsed;

	while (!flist_empty(&td->io_log_list)) {
		/* chunked mode: fetch the next batch when we hit the checkmark */
		if (td->o.read_iolog_chunked) {
			if (td->io_log_checkmark == td->io_log_current) {
				if (!read_iolog2(td))
			td->io_log_current--;
		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);

		ret = ipo_special(td, ipo);
		} else if (ret > 0) {

		io_u->ddir = ipo->ddir;
		if (ipo->ddir != DDIR_WAIT) {
			io_u->offset = ipo->offset;
			io_u->buflen = ipo->len;
			io_u->file = td->files[ipo->fileno];
			get_file(io_u->file);
			dprint(FD_IO, "iolog: get %llu/%llu/%s\n", io_u->offset,
				io_u->buflen, io_u->file->file_name);
			/* replay the recorded inter-I/O gap */
			iolog_delay(td, ipo->delay);
			/* DDIR_WAIT: sleep until the recorded timestamp */
			elapsed = mtime_since_genesis();
			if (ipo->delay > elapsed)
				usec_sleep(td, (ipo->delay - elapsed) * 1000);

	if (io_u->ddir != DDIR_WAIT)
199 void prune_io_piece_log(struct thread_data *td)
201 struct io_piece *ipo;
202 struct fio_rb_node *n;
204 while ((n = rb_first(&td->io_hist_tree)) != NULL) {
205 ipo = rb_entry(n, struct io_piece, rb_node);
206 rb_erase(n, &td->io_hist_tree);
207 remove_trim_entry(td, ipo);
212 while (!flist_empty(&td->io_hist_list)) {
213 ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
214 flist_del(&ipo->list);
215 remove_trim_entry(td, ipo);
222 * log a successful write, so we can unwind the log for verify
/*
 * Log a successful write so the log can be unwound for verify. Entries
 * either go on a plain list (when a random map already prevents
 * duplicate blocks) or into an rb-tree sorted by (file, offset), where
 * overlapping older entries are evicted.
 * NOTE(review): closing braces, the tree-descend loop header and several
 * branch bodies are elided in this extract.
 */
void log_io_piece(struct thread_data *td, struct io_u *io_u)
	struct fio_rb_node **p, *parent;
	struct io_piece *ipo, *__ipo;

	ipo = calloc(1, sizeof(struct io_piece));

	ipo->file = io_u->file;
	ipo->offset = io_u->offset;
	ipo->len = io_u->buflen;
	ipo->numberio = io_u->numberio;
	ipo->flags = IP_F_IN_FLIGHT;

	/* trims are tracked separately so they can be matched later */
	if (io_u_should_trim(td, io_u)) {
		flist_add_tail(&ipo->trim_list, &td->trim_list);

	/*
	 * Only sort writes if we don't have a random map in which case we need
	 * to check for duplicate blocks and drop the old one, which we rely on
	 * the rb insert/lookup for handling.
	 */
	if (file_randommap(td, ipo->file)) {
		INIT_FLIST_HEAD(&ipo->list);
		flist_add_tail(&ipo->list, &td->io_hist_list);
		ipo->flags |= IP_F_ONLIST;

	RB_CLEAR_NODE(&ipo->rb_node);

	/*
	 * Sort the entry into the verification list
	 */
	p = &td->io_hist_tree.rb_node;
		__ipo = rb_entry(parent, struct io_piece, rb_node);
		if (ipo->file < __ipo->file)
		else if (ipo->file > __ipo->file)
		else if (ipo->offset < __ipo->offset) {
			overlap = ipo->offset + ipo->len > __ipo->offset;
		else if (ipo->offset > __ipo->offset) {
			overlap = __ipo->offset + __ipo->len > ipo->offset;

			/* evict the overlapped, older entry */
			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
				__ipo->offset, __ipo->len,
				ipo->offset, ipo->len);
			rb_erase(parent, &td->io_hist_tree);
			remove_trim_entry(td, __ipo);
			if (!(__ipo->flags & IP_F_IN_FLIGHT))

	rb_link_node(&ipo->rb_node, parent, p);
	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
	ipo->flags |= IP_F_ONRB;
/*
 * Remove a previously logged io_piece again, typically because the I/O
 * failed or was requeued. Also records trim/write failure state in the
 * per-block info array when block tracking is enabled.
 * NOTE(review): closing braces and the function tail are elided in this
 * extract.
 */
void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
	struct io_piece *ipo = io_u->ipo;

	if (td->ts.nr_block_infos) {
		uint32_t *info = io_u_block_info(td, io_u);
		/* only escalate state, never downgrade past a failure */
		if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
			if (io_u->ddir == DDIR_TRIM)
				*info = BLOCK_INFO_SET_STATE(*info,
					BLOCK_STATE_TRIM_FAILURE);
			else if (io_u->ddir == DDIR_WRITE)
				*info = BLOCK_INFO_SET_STATE(*info,
					BLOCK_STATE_WRITE_FAILURE);

	/* entry is on exactly one container: rb-tree or list */
	if (ipo->flags & IP_F_ONRB)
		rb_erase(&ipo->rb_node, &td->io_hist_tree);
	else if (ipo->flags & IP_F_ONLIST)
		flist_del(&ipo->list);
333 void trim_io_piece(const struct io_u *io_u)
335 struct io_piece *ipo = io_u->ipo;
340 ipo->len = io_u->xfer_buflen - io_u->resid;
343 void write_iolog_close(struct thread_data *td)
349 td->iolog_buf = NULL;
/*
 * For chunked iolog reading: estimate how many entries to fetch so that
 * roughly one second's worth of I/O is buffered, based on the observed
 * consumption rate since the last high-water mark. Updates the
 * high/check marks and the mark timestamp as a side effect.
 * NOTE(review): early-return bodies and some declarations are elided in
 * this extract.
 */
static int64_t iolog_items_to_fetch(struct thread_data *td)
	int64_t items_to_fetch;

	/* first call: no rate information yet */
	if (!td->io_log_highmark)

	fio_gettime(&now, NULL);
	elapsed = ntime_since(&td->io_log_highmark_time, &now);
	/* consumption rate extrapolated to one second (elapsed is in ns) */
	for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
	items_to_fetch = for_1s - td->io_log_current;
	if (items_to_fetch < 0)

	td->io_log_highmark = td->io_log_current + items_to_fetch;
	/* refill when we've consumed down to half the high mark */
	td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
	fio_gettime(&td->io_log_highmark_time, NULL);

	return items_to_fetch;
381 * Read version 2 iolog data. It is enhanced to include per-file logging,
/*
 * Read version 2 iolog data. It is enhanced to include per-file logging,
 * syncs, etc. Parses "<file> <action> [offset len]" lines, turning file
 * actions into DDIR_INVAL io_pieces and I/O lines into regular ones, and
 * finally derives the job's data direction from what was seen.
 * NOTE(review): many interior lines (error handling, continue/break,
 * else arms, closing braces) are elided in this extract.
 */
static bool read_iolog2(struct thread_data *td)
	unsigned long long offset;
	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
	char *rfname, *fname, *act;
	bool realloc = false;
	int64_t items_to_fetch = 0;

	/* chunked mode: only pull a bounded batch per invocation */
	if (td->o.read_iolog_chunked) {
		items_to_fetch = iolog_items_to_fetch(td);

	/*
	 * Read in the read iolog and store it, reuse the infrastructure
	 * for doing verifications.
	 */
	rfname = fname = malloc(256+16);
	act = malloc(256+16);

	reads = writes = waits = 0;
	while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
		struct io_piece *ipo;

		r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,

		/* all file names collapse onto the redirect target */
		if (td->o.replay_redirect)
			fname = td->o.replay_redirect;

			/* 4-field line: an actual I/O (or wait) record */
			if (!strcmp(act, "wait"))
			else if (!strcmp(act, "read"))
			else if (!strcmp(act, "write"))
			else if (!strcmp(act, "sync"))
			else if (!strcmp(act, "datasync"))
			else if (!strcmp(act, "trim"))
				log_err("fio: bad iolog file action: %s\n",
			fileno = get_fileno(td, fname);
			/* 2-field line: a file management action */
			if (!strcmp(act, "add")) {
				if (td->o.replay_redirect &&
				    get_fileno(td, fname) != -1) {
					dprint(FD_FILE, "iolog: ignoring"
						" re-add of file %s\n", fname);
					fileno = add_file(td, fname, 0, 1);
					file_action = FIO_LOG_ADD_FILE;
			} else if (!strcmp(act, "open")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_OPEN_FILE;
			} else if (!strcmp(act, "close")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_CLOSE_FILE;
				log_err("fio: bad iolog file action: %s\n",
			log_err("bad iolog2: %s\n", p);

		else if (rw == DDIR_WRITE) {
			/*
			 * Don't add a write for ro mode
			 */
		} else if (rw == DDIR_WAIT) {
		} else if (rw == DDIR_INVAL) {
		} else if (!ddir_sync(rw)) {
			log_err("bad ddir: %d\n", rw);

		/*
		 * Make note of file
		 */
		ipo = calloc(1, sizeof(*ipo));

		if (rw == DDIR_WAIT) {
			/* offsets may be rescaled/realigned for replay */
			if (td->o.replay_scale)
				ipo->offset = offset / td->o.replay_scale;
				ipo->offset = offset;
			ipo_bytes_align(td->o.replay_align, ipo);

			/* grow max_bs so buffers can hold replayed I/O */
			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw]) {
				td->o.max_bs[rw] = bytes;
			ipo->fileno = fileno;
			ipo->file_action = file_action;

		queue_io_piece(td, ipo);

		if (td->o.read_iolog_chunked) {
			td->io_log_current++;

			if (items_to_fetch == 0)

	if (td->o.read_iolog_chunked) {
		td->io_log_highmark = td->io_log_current;
		td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
		fio_gettime(&td->io_log_highmark_time, NULL);

	if (writes && read_only) {
		log_err("fio: <%s> skips replay of %d writes due to"
			" read-only\n", td->o.name, writes);

	if (td->o.read_iolog_chunked) {
		if (td->io_log_current == 0) {
		td->o.td_ddir = TD_DDIR_RW;
		/* max_bs may have grown: I/O buffers must be resized */
		if (realloc && td->orig_buffer)

		init_io_u_buffers(td);

	/* derive the data direction from what the log contained */
	if (!reads && !writes && !waits)
	else if (reads && !writes)
		td->o.td_ddir = TD_DDIR_READ;
	else if (!reads && writes)
		td->o.td_ddir = TD_DDIR_WRITE;
		td->o.td_ddir = TD_DDIR_RW;
566 static bool is_socket(const char *path)
569 int r = stat(path, &buf);
573 return S_ISSOCK(buf.st_mode);
/*
 * Connect to the UNIX domain stream socket at the given path.
 *
 * Returns the connected socket fd, or -1 on error.
 *
 * Fix: the previous strncpy() into addr.sun_path did not guarantee NUL
 * termination when the path filled the buffer (CERT STR32-C); use a
 * length-checked snprintf() and reject paths that do not fit.
 */
static int open_socket(const char *path)
{
	struct sockaddr_un addr;
	int len, fd;

	fd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (fd < 0)
		return fd;

	addr.sun_family = AF_UNIX;

	/*
	 * snprintf always NUL terminates and reports truncation, unlike
	 * strncpy.
	 */
	len = snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path);
	if (len < 0 || (size_t)len >= sizeof(addr.sun_path)) {
		close(fd);
		return -1;
	}

	if (connect(fd, (const struct sockaddr *)&addr,
		    strlen(path) + sizeof(addr.sun_family)) == 0)
		return fd;

	close(fd);
	return -1;
}
592 * open iolog, check version, and call appropriate parser
/*
 * Open the replay iolog (regular file or UNIX socket), check the version
 * banner on the first line, and hand off to the version 2 parser.
 * Version 1 logs are rejected.
 * NOTE(review): error-path bodies and closing braces are elided in this
 * extract.
 */
static bool init_iolog_read(struct thread_data *td)
	char buffer[256], *p;

	/* the iolog may be streamed in over a UNIX socket */
	if (is_socket(td->o.read_iolog_file)) {
		int fd = open_socket(td->o.read_iolog_file);
	f = fopen(td->o.read_iolog_file, "r");
		perror("fopen read iolog");
	p = fgets(buffer, sizeof(buffer), f);
		td_verror(td, errno, "iolog read");
		log_err("fio: unable to read iolog\n");
	td->io_log_rfile = f;
	/*
	 * version 2 of the iolog stores a specific string as the
	 * first line, check for that
	 */
	if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2))) {
		free_release_files(td);
		ret = read_iolog2(td);
	log_err("fio: iolog version 1 is no longer supported\n");
636 * Set up a log for storing io patterns.
/*
 * Set up a log for storing io patterns: open the write iolog in append
 * mode, give it a private stdio buffer, write the version banner, and
 * record all currently known files as "add" entries.
 * NOTE(review): error-path bodies and closing braces are elided in this
 * extract.
 */
static bool init_iolog_write(struct thread_data *td)
	f = fopen(td->o.write_iolog_file, "a");
		perror("fopen write iolog");

	/*
	 * That's it for writing, setup a log buffer and we're done.
	 */
	td->iolog_buf = malloc(8192);
	setvbuf(f, td->iolog_buf, _IOFBF, 8192);

	/*
	 * write our version line
	 */
	if (fprintf(f, "%s\n", iolog_ver2) < 0) {
		perror("iolog init\n");

	/*
	 * add all known files
	 */
	for_each_file(td, ff, i)
		log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * Top-level iolog setup: load a replay log (blktrace or native format)
 * if one is configured for reading, otherwise open a write iolog if one
 * is configured for writing. Flags an error on failure.
 * NOTE(review): declarations, else arms and the return are elided in
 * this extract.
 */
bool init_iolog(struct thread_data *td)
	if (td->o.read_iolog_file) {
		/*
		 * Check if it's a blktrace file and load that if possible.
		 * Otherwise assume it's a normal log file and load that.
		 */
		if (is_blktrace(td->o.read_iolog_file, &need_swap))
			ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
			ret = init_iolog_read(td);
	} else if (td->o.write_iolog_file)
		ret = init_iolog_write(td);

		td_verror(td, EINVAL, "failed initializing iolog");
/*
 * Allocate and initialize an io_log from the given parameters: copy the
 * logging options, set up per-direction histogram windows, optionally
 * pre-allocate a pending-samples buffer (non-offload submit mode), and
 * initialize compression state when log_gz/log_gz_store are in effect.
 * NOTE(review): several lines (declarations, branch bodies, closing
 * braces, the final *log assignment) are elided in this extract.
 */
void setup_log(struct io_log **log, struct log_params *p,
	       const char *filename)
	struct io_u_plat_entry *entry;
	struct flist_head *list;

	l = scalloc(1, sizeof(*l));
	INIT_FLIST_HEAD(&l->io_logs);
	l->log_type = p->log_type;
	l->log_offset = p->log_offset;
	l->log_gz = p->log_gz;
	l->log_gz_store = p->log_gz_store;
	l->avg_msec = p->avg_msec;
	l->hist_msec = p->hist_msec;
	l->hist_coarseness = p->hist_coarseness;
	l->filename = strdup(filename);

	/* Initialize histogram lists for each r/w direction,
	 * with initial io_u_plat of all zeros:
	 */
	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
		list = &l->hist_window[i].list;
		INIT_FLIST_HEAD(list);
		entry = calloc(1, sizeof(struct io_u_plat_entry));
		flist_add(&entry->list, list);
	}

	if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
		/* pre-size the pending buffer for inline sample logging */
		__p = calloc(1, sizeof(*l->pending));
		__p->max_samples = DEF_LOG_ENTRIES;
		__p->log = calloc(__p->max_samples, log_entry_sz(l));

		/* samples carry the offset bit when offset logging is on */
		l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;

	INIT_FLIST_HEAD(&l->chunk_list);

	if (l->log_gz && !p->td)
	else if (l->log_gz || l->log_gz_store) {
		mutex_init_pshared(&l->chunk_lock);
		mutex_init_pshared(&l->deferred_free_lock);
		p->td->flags |= TD_F_COMPRESS_LOG;
/*
 * Optional large stdio buffering for log flushing. With CONFIG_SETVBUF,
 * give the FILE a 1 MB buffer (returned so the caller can free it after
 * fclose); without it, these collapse to no-ops.
 * NOTE(review): function bodies, the #else, and #endif lines are elided
 * in this extract.
 */
#ifdef CONFIG_SETVBUF
static void *set_file_buffer(FILE *f)
	size_t size = 1048576;

	setvbuf(f, buf, _IOFBF, size);
static void clear_file_buffer(void *buf)
/* no-CONFIG_SETVBUF stubs */
static void *set_file_buffer(FILE *f)
static void clear_file_buffer(void *buf)
/*
 * Free an io_log: release every io_logs buffer on the list, the pending
 * buffer, the filename, and the log itself.
 * NOTE(review): loop tail, the pending-buffer conditional and the final
 * frees are elided in this extract.
 */
void free_log(struct io_log *log)
	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);
		/* part of the log->pending teardown */
		free(log->pending->log);
/*
 * Sum `stride` consecutive histogram buckets starting at index j. When
 * a previous snapshot is supplied, return the delta against it instead
 * of the raw sum.
 */
uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
		  uint64_t *io_u_plat_last)
{
	uint64_t total = 0;
	int idx;

	for (idx = j; idx < j + stride; idx++) {
		total += io_u_plat[idx];
		if (io_u_plat_last)
			total -= io_u_plat_last[idx];
	}

	return total;
}
/*
 * Write histogram samples as CSV: "time, ddir, bs" followed by one
 * hist_sum() per coarsened bucket group. Each sample's plat entry is
 * diffed against the previous snapshot linked behind it, which is then
 * unlinked and freed.
 * NOTE(review): declarations, loop closers and the entry_before free are
 * elided in this extract.
 */
static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
			       uint64_t sample_size)
	uint64_t i, j, nr_samples;
	struct io_u_plat_entry *entry, *entry_before;
	uint64_t *io_u_plat_before;

	/* each output column aggregates 2^hist_coarseness buckets */
	int stride = 1 << hist_coarseness;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		entry = s->data.plat_entry;
		io_u_plat = entry->io_u_plat;

		entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
		io_u_plat_before = entry_before->io_u_plat;

		fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
			io_sample_ddir(s), (unsigned long long) s->bs);
		for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
			fprintf(f, "%llu, ", (unsigned long long)
				hist_sum(j, stride, io_u_plat, io_u_plat_before));
		/* last group gets the newline instead of a comma */
		fprintf(f, "%llu\n", (unsigned long long)
			hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,

		/* previous snapshot is no longer needed */
		flist_del(&entry_before->list);
/*
 * Write plain samples as CSV: "time, value, ddir, bs" with a trailing
 * offset column when offset logging was enabled (detected from the
 * sample bit in the first entry's ddir field).
 * NOTE(review): declarations, the log_offset branch headers and closing
 * braces are elided in this extract.
 */
void flush_samples(FILE *f, void *samples, uint64_t sample_size)
	uint64_t i, nr_samples;

	/* probe the first sample for the offset-logging flag */
	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		fprintf(f, "%lu, %" PRId64 ", %u, %llu\n",
			(unsigned long) s->time,
			io_sample_ddir(s), (unsigned long long) s->bs);

			/* offset variant: same record plus the offset */
			struct io_sample_offset *so = (void *) s;

			fprintf(f, "%lu, %" PRId64 ", %u, %llu, %llu\n",
				(unsigned long) s->time,
				io_sample_ddir(s), (unsigned long long) s->bs,
				(unsigned long long) so->offset);
900 struct iolog_flush_data {
901 struct workqueue_work work;
908 #define GZ_CHUNK 131072
910 static struct iolog_compress *get_new_chunk(unsigned int seq)
912 struct iolog_compress *c;
914 c = malloc(sizeof(*c));
915 INIT_FLIST_HEAD(&c->list);
916 c->buf = malloc(GZ_CHUNK);
922 static void free_chunk(struct iolog_compress *ic)
/*
 * Prepare a z_stream for inflation. Returns 0 on success, 1 on failure.
 * With gz_hdr set, zlib's automatic gzip-header detection is enabled so
 * logs stored in gzip-friendly format can be read back as well.
 */
static int z_stream_init(z_stream *stream, int gz_hdr)
{
	int wbits = 15;

	memset(stream, 0, sizeof(*stream));
	stream->zalloc = Z_NULL;
	stream->zfree = Z_NULL;
	stream->opaque = Z_NULL;
	stream->next_in = Z_NULL;

	/*
	 * zlib magic - add 32 for auto-detection of gz header or not,
	 * if we decide to store files in a gzip friendly format.
	 */
	if (gz_hdr)
		wbits += 32;

	if (inflateInit2(stream, wbits) != Z_OK)
		return 1;

	return 0;
}
951 struct inflate_chunk_iter {
/*
 * Finish inflating one sequence: end the zlib stream, flush the
 * decompressed samples to the output file, and reset the iteration
 * buffer for the next sequence.
 * NOTE(review): declarations, the error check, log_err continuation and
 * buffer free are elided in this extract.
 */
static void finish_chunk(z_stream *stream, FILE *f,
			 struct inflate_chunk_iter *iter)
	ret = inflateEnd(stream);
		log_err("fio: failed to end log inflation seq %d (%d)\n",

	flush_samples(f, iter->buf, iter->buf_used);
	iter->buf_size = iter->buf_used = 0;
977 * Iterative chunk inflation. Handles cases where we cross into a new
978 * sequence, doing flush finish of previous chunk if needed.
/*
 * Iterative chunk inflation. Handles cases where we cross into a new
 * sequence, doing flush finish of previous chunk if needed: inflates one
 * compressed chunk into iter->buf, growing the buffer by chunk_sz as
 * needed, and returns how many input bytes were consumed.
 * NOTE(review): branch bodies, break statements and the return are
 * elided in this extract.
 */
static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
			    z_stream *stream, struct inflate_chunk_iter *iter)
	dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
		(unsigned long) ic->len, ic->seq);

	/* sequence change: flush the previous one, start a new stream */
	if (ic->seq != iter->seq) {
			finish_chunk(stream, f, iter);
		z_stream_init(stream, gz_hdr);

	stream->avail_in = ic->len;
	stream->next_in = ic->buf;

	if (!iter->buf_size) {
		iter->buf_size = iter->chunk_sz;
		iter->buf = malloc(iter->buf_size);

	while (stream->avail_in) {
		size_t this_out = iter->buf_size - iter->buf_used;

		stream->avail_out = this_out;
		stream->next_out = iter->buf + iter->buf_used;

		err = inflate(stream, Z_NO_FLUSH);
			log_err("fio: failed inflating log: %d\n", err);

		iter->buf_used += this_out - stream->avail_out;

		/* output exhausted: grow the buffer and keep going */
		if (!stream->avail_out) {
			iter->buf_size += iter->chunk_sz;
			iter->buf = realloc(iter->buf, iter->buf_size);

		if (err == Z_STREAM_END)

	/* bytes of input consumed from this chunk */
	ret = (void *) stream->next_in - ic->buf;

	dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
1038 * Inflate stored compressed chunks, or write them directly to the log
1039 * file if so instructed.
/*
 * Inflate stored compressed chunks, or write them directly to the log
 * file if so instructed (log_gz_store): drains the chunk list, either
 * fwrite()ing raw compressed data or inflating it into samples, then
 * flushes the final sequence.
 * NOTE(review): declarations, else arms, chunk frees and closing braces
 * are elided in this extract.
 */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
	struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };

	while (!flist_empty(&log->chunk_list)) {
		struct iolog_compress *ic;

		ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
		flist_del(&ic->list);

		if (log->log_gz_store) {
			/* stored mode: emit the compressed bytes verbatim */
			dprint(FD_COMPRESS, "log write chunk size=%lu, "
				"seq=%u\n", (unsigned long) ic->len, ic->seq);

			ret = fwrite(ic->buf, ic->len, 1, f);
			if (ret != 1 || ferror(f)) {
				log_err("fio: error writing compressed log\n");
			/* normal mode: decompress into samples */
			inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);

	finish_chunk(&stream, f, &iter);
1078 * Open compressed log file and decompress the stored chunks and
1079 * write them to stdout. The chunks are stored sequentially in the
1080 * file, so we iterate over them and do them one-by-one.
/*
 * Open compressed log file and decompress the stored chunks and
 * write them to stdout. The chunks are stored sequentially in the
 * file, so we iterate over them and do them one-by-one: the whole file
 * is slurped into one buffer and repeatedly fed to inflate_chunk()
 * with an increasing sequence number.
 * NOTE(review): error-path bodies, the inflate loop header and cleanup
 * are elided in this extract.
 */
int iolog_file_inflate(const char *file)
	struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
	struct iolog_compress ic;

	f = fopen(file, "r");

	if (stat(file, &sb) < 0) {

	ic.buf = buf = malloc(sb.st_size);
	ic.len = sb.st_size;

	ret = fread(ic.buf, ic.len, 1, f);
	if (ret == 0 && ferror(f)) {
	} else if (ferror(f) || (!feof(f) && ret != 1)) {
		log_err("fio: short read on reading log\n");

	/*
	 * Each chunk will return Z_STREAM_END. We don't know how many
	 * chunks are in the file, so we just keep looping and incrementing
	 * the sequence number until we have consumed the whole compressed
	 * file.
	 */
		iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);

	finish_chunk(&stream, stdout, &iter);
/*
 * No-zlib (CONFIG_ZLIB unset) stubs: chunk inflation becomes a no-op
 * and offline log inflation reports that zlib is unavailable.
 * NOTE(review): function bodies are elided in this extract.
 */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
int iolog_file_inflate(const char *file)
	log_err("fio: log inflation not possible without zlib\n");
/*
 * Write an io_log out to its file: open for write or append, install a
 * large stdio buffer, flush any compressed chunks, then emit every
 * io_logs buffer (histogram format for the clat hist log, plain samples
 * otherwise).
 * NOTE(review): the append/write branch structure, else arms, frees and
 * fclose are elided in this extract.
 */
void flush_log(struct io_log *log, bool do_append)
	f = fopen(log->filename, "w");
	f = fopen(log->filename, "a");
		perror("fopen log");

	buf = set_file_buffer(f);

	inflate_gz_chunks(log, f);

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		/* the clat histogram log uses its own sample format */
		if (log->td && log == log->td->clat_hist_log)
			flush_hist_samples(f, log->hist_coarseness, cur_log->log,
					   log_sample_sz(log, cur_log));
			flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));

	clear_file_buffer(buf);
/*
 * Finalize one log: flush pending compression, take the file lock
 * (optionally just trying it), then either ship the log to the network
 * client/backend or flush it to disk locally.
 * NOTE(review): branch bodies, the else arm and the return are elided
 * in this extract.
 */
static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
	if (td->flags & TD_F_COMPRESS_LOG)

		if (fio_trylock_file(log->filename))
		fio_lock_file(log->filename);

	/* networked runs send the log instead of writing it locally */
	if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
		fio_send_iolog(td, log, log->filename);
		flush_log(log, !td->o.per_job_logs);

	fio_unlock_file(log->filename);
1228 size_t log_chunk_sizes(struct io_log *log)
1230 struct flist_head *entry;
1233 if (flist_empty(&log->chunk_list))
1237 pthread_mutex_lock(&log->chunk_lock);
1238 flist_for_each(entry, &log->chunk_list) {
1239 struct iolog_compress *c;
1241 c = flist_entry(entry, struct iolog_compress, list);
1244 pthread_mutex_unlock(&log->chunk_lock);
1250 static void iolog_put_deferred(struct io_log *log, void *ptr)
1255 pthread_mutex_lock(&log->deferred_free_lock);
1256 if (log->deferred < IOLOG_MAX_DEFER) {
1257 log->deferred_items[log->deferred] = ptr;
1259 } else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
1260 log_err("fio: had to drop log entry free\n");
1261 pthread_mutex_unlock(&log->deferred_free_lock);
1264 static void iolog_free_deferred(struct io_log *log)
1271 pthread_mutex_lock(&log->deferred_free_lock);
1273 for (i = 0; i < log->deferred; i++) {
1274 free(log->deferred_items[i]);
1275 log->deferred_items[i] = NULL;
1279 pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Compress the samples referenced by a flush-data item with zlib
 * deflate, producing a list of GZ_CHUNK sized chunks that are spliced
 * onto the log's chunk list under chunk_lock. The consumed sample
 * buffer is handed to the deferred-free machinery. On error, all chunks
 * produced so far are freed instead.
 * NOTE(review): several lines (declarations, do-loop headers, gotos and
 * error labels) are elided in this extract.
 */
static int gz_work(struct iolog_flush_data *data)
	struct iolog_compress *c = NULL;
	struct flist_head list;

	INIT_FLIST_HEAD(&list);

	memset(&stream, 0, sizeof(stream));
	stream.zalloc = Z_NULL;
	stream.zfree = Z_NULL;
	stream.opaque = Z_NULL;

	ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
		log_err("fio: failed to init gz stream\n");

	/* each flush gets its own monotonically increasing sequence */
	seq = ++data->log->chunk_seq;

	stream.next_in = (void *) data->samples;
	stream.avail_in = data->nr_samples * log_entry_sz(data->log);

	dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
		(unsigned long) stream.avail_in, seq,
		data->log->filename);

		/*
		 * NOTE(review): this dprint of c->len appears before the
		 * c = get_new_chunk() below in this extract; verify the
		 * intended ordering against the complete source, since c
		 * is NULL on the first pass.
		 */
		dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
			(unsigned long) c->len);
		c = get_new_chunk(seq);
		stream.avail_out = GZ_CHUNK;
		stream.next_out = c->buf;
		ret = deflate(&stream, Z_NO_FLUSH);
			log_err("fio: deflate log (%d)\n", ret);

		c->len = GZ_CHUNK - stream.avail_out;
		flist_add_tail(&c->list, &list);
	} while (stream.avail_in);

	/* finish the stream into the remainder of the last chunk */
	stream.next_out = c->buf + c->len;
	stream.avail_out = GZ_CHUNK - c->len;

	ret = deflate(&stream, Z_FINISH);
		/*
		 * Z_BUF_ERROR is special, it just means we need more
		 * output space. We'll handle that below. Treat any other
		 * error as fatal.
		 */
		if (ret != Z_BUF_ERROR) {
			log_err("fio: deflate log (%d)\n", ret);
			flist_del(&c->list);

	c->len = GZ_CHUNK - stream.avail_out;

	dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);

	/* keep allocating chunks until Z_FINISH reports stream end */
	if (ret != Z_STREAM_END) {
			c = get_new_chunk(seq);
			stream.avail_out = GZ_CHUNK;
			stream.next_out = c->buf;
			ret = deflate(&stream, Z_FINISH);
			c->len = GZ_CHUNK - stream.avail_out;

			flist_add_tail(&c->list, &list);
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		} while (ret != Z_STREAM_END);

	dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);

	ret = deflateEnd(&stream);
		log_err("fio: deflateEnd %d\n", ret);

	/* sample buffer may still be referenced; free it later */
	iolog_put_deferred(data->log, data->samples);

	if (!flist_empty(&list)) {
		pthread_mutex_lock(&data->log->chunk_lock);
		flist_splice_tail(&list, &data->log->chunk_list);
		pthread_mutex_unlock(&data->log->chunk_lock);

	/* error path: discard any chunks we produced */
	while (!flist_empty(&list)) {
		/*
		 * NOTE(review): 'list.next' as the head argument looks
		 * suspicious — the convention elsewhere is '&list'; verify
		 * against the complete source.
		 */
		c = flist_first_entry(list.next, struct iolog_compress, list);
		flist_del(&c->list);
1398 * Invoked from our compress helper thread, when logging would have exceeded
1399 * the specified memory limitation. Compresses the previously stored
1402 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1404 return gz_work(container_of(work, struct iolog_flush_data, work));
1407 static int gz_init_worker(struct submit_worker *sw)
1409 struct thread_data *td = sw->wq->td;
1411 if (!fio_option_is_set(&td->o, log_gz_cpumask))
1414 if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1415 log_err("gz: failed to set CPU affinity\n");
/* Workqueue ops for asynchronous log compression. */
static struct workqueue_ops log_compress_wq_ops = {
	.fn = gz_work_async,
	.init_worker_fn = gz_init_worker,
1428 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1430 if (!(td->flags & TD_F_COMPRESS_LOG))
1433 workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
1437 void iolog_compress_exit(struct thread_data *td)
1439 if (!(td->flags & TD_F_COMPRESS_LOG))
1442 workqueue_exit(&td->log_compress_wq);
1446 * Queue work item to compress the existing log entries. We reset the
1447 * current log to a small size, and reference the existing log in the
1448 * data that we queue for compression. Once compression has been done,
1449 * this old log is freed. If called with finish == true, will not return
1450 * until the log compression has completed, and will flush all previous
/*
 * Queue work item to compress the existing log entries: synchronously
 * compress every io_logs buffer on the log via gz_work(), freeing each
 * buffer afterwards.
 * NOTE(review): allocation check, the gz_work() invocation, frees and
 * the return are elided in this extract.
 */
static int iolog_flush(struct io_log *log)
	struct iolog_flush_data *data;

	data = malloc(sizeof(*data));

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		data->samples = cur_log->log;
		data->nr_samples = cur_log->nr_samples;
/*
 * Hand the current io_logs buffer off to the async compression
 * workqueue: ownership of cur_log->log transfers to the work item, the
 * buffer slot is reset, and any deferred frees are drained.
 * NOTE(review): allocation check and the return are elided in this
 * extract.
 */
int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
	struct iolog_flush_data *data;

	/* shared memory: the worker may live in another process */
	data = smalloc(sizeof(*data));

	data->samples = cur_log->log;
	data->nr_samples = cur_log->nr_samples;

	/* the log buffer now belongs to the work item */
	cur_log->nr_samples = cur_log->max_samples = 0;
	cur_log->log = NULL;

	workqueue_enqueue(&log->td->log_compress_wq, &data->work);

	iolog_free_deferred(log);
/*
 * No-zlib (CONFIG_ZLIB unset) stubs for the compression entry points.
 * NOTE(review): function bodies are elided in this extract.
 */
static int iolog_flush(struct io_log *log)
int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
void iolog_compress_exit(struct thread_data *td)
1528 struct io_logs *iolog_cur_log(struct io_log *log)
1530 if (flist_empty(&log->io_logs))
1533 return flist_last_entry(&log->io_logs, struct io_logs, list);
1536 uint64_t iolog_nr_samples(struct io_log *iolog)
1538 struct flist_head *entry;
1541 flist_for_each(entry, &iolog->io_logs) {
1542 struct io_logs *cur_log;
1544 cur_log = flist_entry(entry, struct io_logs, list);
1545 ret += cur_log->nr_samples;
1551 static int __write_log(struct thread_data *td, struct io_log *log, int try)
1554 return finish_log(td, log, try);
1559 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1563 if (per_unit_log(td->iops_log) != unit_log)
1566 ret = __write_log(td, td->iops_log, try);
1568 td->iops_log = NULL;
1573 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1580 ret = __write_log(td, td->slat_log, try);
1582 td->slat_log = NULL;
1587 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1594 ret = __write_log(td, td->clat_log, try);
1596 td->clat_log = NULL;
1601 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1608 ret = __write_log(td, td->clat_hist_log, try);
1610 td->clat_hist_log = NULL;
1615 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1622 ret = __write_log(td, td->lat_log, try);
1629 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1633 if (per_unit_log(td->bw_log) != unit_log)
1636 ret = __write_log(td, td->bw_log, try);
1649 CLAT_HIST_LOG_MASK = 32,
1656 int (*fn)(struct thread_data *, int, bool);
1659 static struct log_type log_types[] = {
1661 .mask = BW_LOG_MASK,
1662 .fn = write_bandw_log,
1665 .mask = LAT_LOG_MASK,
1666 .fn = write_lat_log,
1669 .mask = SLAT_LOG_MASK,
1670 .fn = write_slat_log,
1673 .mask = CLAT_LOG_MASK,
1674 .fn = write_clat_log,
1677 .mask = IOPS_LOG_MASK,
1678 .fn = write_iops_log,
1681 .mask = CLAT_HIST_LOG_MASK,
1682 .fn = write_clat_hist_log,
/*
 * Write out all of a thread's logs. Each log type is attempted in turn;
 * a lock that cannot be taken defers the log to the next pass (try-lock
 * while more than one remains, blocking lock on the last). Loops until
 * no further progress is made. Runstate is bumped to TD_FINISHING for
 * the duration.
 * NOTE(review): declarations, the outer loop header, bookkeeping and
 * closing braces are elided in this extract.
 */
void td_writeout_logs(struct thread_data *td, bool unit_logs)
	unsigned int log_mask = 0;
	unsigned int log_left = ALL_LOG_NR;

	old_state = td_bump_runstate(td, TD_FINISHING);

	finalize_logs(td, unit_logs);

		int prev_log_left = log_left;

		for (i = 0; i < ALL_LOG_NR && log_left; i++) {
			struct log_type *lt = &log_types[i];

			/* skip logs already written in an earlier pass */
			if (!(log_mask & lt->mask)) {
				/* only try-lock while other logs remain */
				ret = lt->fn(td, log_left != 1, unit_logs);
					log_mask |= lt->mask;

		/* no progress this pass: stop to avoid spinning */
		if (prev_log_left == log_left)

	td_restore_runstate(td, old_state);
1719 void fio_writeout_logs(bool unit_logs)
1721 struct thread_data *td;
1725 td_writeout_logs(td, unit_logs);