2 * Code related to writing an iolog of what a thread is doing, and to
3 * later read that back and replay
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/un.h>
30 static int iolog_flush(struct io_log *log);
32 static const char iolog_ver2[] = "fio version 2 iolog";
34 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
36 flist_add_tail(&ipo->list, &td->io_log_list);
37 td->total_io_size += ipo->len;
40 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
42 if (!td->o.write_iolog_file)
45 fprintf(td->iolog_f, "%s %s %llu %llu\n", io_u->file->file_name,
46 io_ddir_name(io_u->ddir),
47 io_u->offset, io_u->buflen);
/*
 * Record a file lifecycle event ("add"/"open"/"close") in the write iolog.
 * The act[] table is indexed by `what`, so it must stay in sync with
 * enum file_log_act.
 *
 * NOTE(review): lines are missing from this capture (early returns/braces);
 * the visible statements are kept verbatim.
 */
void log_file(struct thread_data *td, struct fio_file *f,
    enum file_log_act what)
    const char *act[] = { "add", "open", "close" };

    /* nothing to do unless iolog writing was requested */
    if (!td->o.write_iolog_file)
    /*
     * this happens on the pre-open/close done before the job starts
     */
    fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
/*
 * Sleep out the delay recorded in the iolog before issuing the next replayed
 * I/O. Deducts time already spent since the last issue and any oversleep from
 * previous rounds (td->time_offset), then sleeps in bounded chunks so that
 * td->terminate is honored promptly. Records new oversleep at the end.
 *
 * NOTE(review): lines are missing from this capture (declarations of `ts` and
 * `this_delay`, brace/return lines); visible statements kept verbatim.
 */
static void iolog_delay(struct thread_data *td, unsigned long delay)
    /* time already elapsed since the last issue counts against the delay */
    uint64_t usec = utime_since_now(&td->last_issue);
    unsigned long orig_delay = delay;
    /* deduct oversleep accumulated by earlier delays */
    if (delay < td->time_offset) {
    delay -= td->time_offset;
    fio_gettime(&ts, NULL);
    /* sleep in chunks so termination is noticed quickly */
    while (delay && !td->terminate) {
    /* cap a single sleep at 500ms */
    if (this_delay > 500000)
    usec_sleep(td, this_delay);
    /* measure how long we actually slept vs. what was requested */
    usec = utime_since_now(&ts);
    if (usec > orig_delay)
    td->time_offset = usec - orig_delay;
/*
 * Handle "special" replay entries that carry a file action (add/open/close/
 * unlink) rather than real I/O; such entries are tagged DDIR_INVAL.
 *
 * NOTE(review): lines are missing from this capture (returns, breaks, the
 * declarations of `f` and `ret`); visible statements kept verbatim.
 */
static int ipo_special(struct thread_data *td, struct io_piece *ipo)
    /* regular I/O entries are not special */
    if (ipo->ddir != DDIR_INVAL)
    f = td->files[ipo->fileno];

    switch (ipo->file_action) {
    case FIO_LOG_OPEN_FILE:
        /* with replay_redirect everything maps onto one file; skip re-opens */
        if (td->o.replay_redirect && fio_file_open(f)) {
        dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
        ret = td_io_open_file(td, f);
        td_verror(td, ret, "iolog open file");
    case FIO_LOG_CLOSE_FILE:
        td_io_close_file(td, f);
    case FIO_LOG_UNLINK_FILE:
        td_io_unlink_file(td, f);
        log_err("fio: bad file action %d\n", ipo->file_action);
144 static bool read_iolog2(struct thread_data *td);
/*
 * Pop the next entry off the replay queue and fill in io_u from it.
 * Handles chunked refill of the queue, special file-action entries
 * (consumed internally via ipo_special()), and DDIR_WAIT delay entries.
 *
 * NOTE(review): lines are missing from this capture (braces, returns,
 * declaration of `ret`); visible statements kept verbatim.
 */
int read_iolog_get(struct thread_data *td, struct io_u *io_u)
    struct io_piece *ipo;
    unsigned long elapsed;

    while (!flist_empty(&td->io_log_list)) {
        /* chunked mode: refill from the iolog file at the low-water mark */
        if (td->o.read_iolog_chunked) {
            if (td->io_log_checkmark == td->io_log_current) {
                if (!read_iolog2(td))
        td->io_log_current--;
        ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
        flist_del(&ipo->list);
        remove_trim_entry(td, ipo);
        /* file add/open/close entries are handled here, not issued */
        ret = ipo_special(td, ipo);
        } else if (ret > 0) {
        io_u->ddir = ipo->ddir;
        if (ipo->ddir != DDIR_WAIT) {
            io_u->offset = ipo->offset;
            io_u->buflen = ipo->len;
            io_u->file = td->files[ipo->fileno];
            get_file(io_u->file);
            dprint(FD_IO, "iolog: get %llu/%llu/%s\n", io_u->offset,
                io_u->buflen, io_u->file->file_name);
                iolog_delay(td, ipo->delay);
            /* DDIR_WAIT: sleep the remainder of the logged delay (msec) */
            elapsed = mtime_since_genesis();
            if (ipo->delay > elapsed)
                usec_sleep(td, (ipo->delay - elapsed) * 1000);
        if (io_u->ddir != DDIR_WAIT)
/*
 * Drop all accumulated verify/history state: drain the rb-tree of sorted
 * writes and the FIFO history list, unhooking any trim-list linkage as we go.
 *
 * NOTE(review): the free(ipo) lines are not visible in this capture;
 * visible statements kept verbatim.
 */
void prune_io_piece_log(struct thread_data *td)
    struct io_piece *ipo;
    struct fio_rb_node *n;

    /* drain the sorted verification tree */
    while ((n = rb_first(&td->io_hist_tree)) != NULL) {
        ipo = rb_entry(n, struct io_piece, rb_node);
        rb_erase(n, &td->io_hist_tree);
        remove_trim_entry(td, ipo);

    /* drain the unsorted history list */
    while (!flist_empty(&td->io_hist_list)) {
        ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
        flist_del(&ipo->list);
        remove_trim_entry(td, ipo);
/*
 * log a successful write, so we can unwind the log for verify
 */
void log_io_piece(struct thread_data *td, struct io_u *io_u)
    struct fio_rb_node **p, *parent;
    struct io_piece *ipo, *__ipo;

    /* snapshot the I/O so verify can replay it later */
    ipo = calloc(1, sizeof(struct io_piece));
    ipo->file = io_u->file;
    ipo->offset = io_u->offset;
    ipo->len = io_u->buflen;
    ipo->numberio = io_u->numberio;
    /* marked in-flight until the write completes */
    ipo->flags = IP_F_IN_FLIGHT;

    if (io_u_should_trim(td, io_u)) {
        flist_add_tail(&ipo->trim_list, &td->trim_list);

    /*
     * Only sort writes if we don't have a random map in which case we need
     * to check for duplicate blocks and drop the old one, which we rely on
     * the rb insert/lookup for handling.
     */
    if (file_randommap(td, ipo->file)) {
        INIT_FLIST_HEAD(&ipo->list);
        flist_add_tail(&ipo->list, &td->io_hist_list);
        ipo->flags |= IP_F_ONLIST;

    RB_CLEAR_NODE(&ipo->rb_node);

    /*
     * Sort the entry into the verification list
     * (ordered by file pointer, then offset; overlaps are detected below)
     *
     * NOTE(review): the tree-walk loop header and child-selection lines are
     * missing from this capture; visible statements kept verbatim.
     */
    p = &td->io_hist_tree.rb_node;
    __ipo = rb_entry(parent, struct io_piece, rb_node);
    if (ipo->file < __ipo->file)
    else if (ipo->file > __ipo->file)
    else if (ipo->offset < __ipo->offset) {
        overlap = ipo->offset + ipo->len > __ipo->offset;
    else if (ipo->offset > __ipo->offset) {
        overlap = __ipo->offset + __ipo->len > ipo->offset;

    /* a stale overlapping entry is evicted in favor of the new write */
    dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
        __ipo->offset, __ipo->len,
        ipo->offset, ipo->len);
    rb_erase(parent, &td->io_hist_tree);
    remove_trim_entry(td, __ipo);
    /* only free the evicted entry if it is not still in flight */
    if (!(__ipo->flags & IP_F_IN_FLIGHT))

    rb_link_node(&ipo->rb_node, parent, p);
    rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
    ipo->flags |= IP_F_ONRB;
/*
 * Remove a previously logged io_piece (e.g. on I/O failure), updating the
 * per-block state map with a trim/write failure marker when block-info
 * tracking is active, and unlinking the piece from whichever container
 * (rb-tree or list) holds it.
 *
 * NOTE(review): lines are missing from this capture (null checks, free);
 * visible statements kept verbatim.
 */
void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
    struct io_piece *ipo = io_u->ipo;

    if (td->ts.nr_block_infos) {
        uint32_t *info = io_u_block_info(td, io_u);
        /* only downgrade state; failure states are terminal */
        if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
            if (io_u->ddir == DDIR_TRIM)
                *info = BLOCK_INFO_SET_STATE(*info,
                    BLOCK_STATE_TRIM_FAILURE);
            else if (io_u->ddir == DDIR_WRITE)
                *info = BLOCK_INFO_SET_STATE(*info,
                    BLOCK_STATE_WRITE_FAILURE);

    /* unhook from whichever structure log_io_piece() put it on */
    if (ipo->flags & IP_F_ONRB)
        rb_erase(&ipo->rb_node, &td->io_hist_tree);
    else if (ipo->flags & IP_F_ONLIST)
        flist_del(&ipo->list);
333 void trim_io_piece(const struct io_u *io_u)
335 struct io_piece *ipo = io_u->ipo;
340 ipo->len = io_u->xfer_buflen - io_u->resid;
/*
 * Tear down write-iolog state for this thread.
 *
 * NOTE(review): the flush/close/free lines are not visible in this capture;
 * visible statements kept verbatim. The buffer was handed to setvbuf() in
 * init_iolog_write(), so it must only be released after the FILE is closed.
 */
void write_iolog_close(struct thread_data *td)
    td->iolog_buf = NULL;
/*
 * Read version 2 iolog data. It is enhanced to include per-file logging,
 * sync/trim/wait actions, and — in chunked mode — incremental refills of
 * the replay queue sized by the observed consumption rate.
 *
 * NOTE(review): many lines are missing from this capture (braces, returns,
 * several declarations such as `now`, `elapsed`, `for_1s`, `p`, `str`, `r`,
 * `rw`, `bytes`); visible statements kept verbatim.
 */
static bool read_iolog2(struct thread_data *td)
    unsigned long long offset;
    int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
    char *rfname, *fname, *act;
    int64_t items_to_fetch = 0;

    if (td->o.read_iolog_chunked) {
        if (td->io_log_highmark == 0) {
        fio_gettime(&now, NULL);
        elapsed = ntime_since(&td->io_log_highmark_time, &now);
        /* estimate one second's worth of entries from recent consumption */
        for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
        items_to_fetch = for_1s - td->io_log_current;
        if (items_to_fetch < 0)
        /* refill again once half of this batch has been consumed */
        td->io_log_highmark = td->io_log_current + items_to_fetch;
        td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
        fio_gettime(&td->io_log_highmark_time, NULL);
    if (items_to_fetch == 0)

    /*
     * Read in the read iolog and store it, reuse the infrastructure
     * for doing verifications.
     */
    rfname = fname = malloc(256+16);
    act = malloc(256+16);

    reads = writes = waits = 0;
    while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
        struct io_piece *ipo;

        r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
        /* replay_redirect maps every logged file onto a single target */
        if (td->o.replay_redirect)
            fname = td->o.replay_redirect;
        /* 4-field lines are I/O actions ... */
        if (!strcmp(act, "wait"))
        else if (!strcmp(act, "read"))
        else if (!strcmp(act, "write"))
        else if (!strcmp(act, "sync"))
        else if (!strcmp(act, "datasync"))
        else if (!strcmp(act, "trim"))
            log_err("fio: bad iolog file action: %s\n",
        fileno = get_fileno(td, fname);
        /* ... 2-field lines are file lifecycle actions */
        if (!strcmp(act, "add")) {
            if (td->o.replay_redirect &&
                get_fileno(td, fname) != -1) {
                dprint(FD_FILE, "iolog: ignoring"
                    " re-add of file %s\n", fname);
            fileno = add_file(td, fname, 0, 1);
            file_action = FIO_LOG_ADD_FILE;
        } else if (!strcmp(act, "open")) {
            fileno = get_fileno(td, fname);
            file_action = FIO_LOG_OPEN_FILE;
        } else if (!strcmp(act, "close")) {
            fileno = get_fileno(td, fname);
            file_action = FIO_LOG_CLOSE_FILE;
            log_err("fio: bad iolog file action: %s\n",
            log_err("bad iolog2: %s\n", p);
        else if (rw == DDIR_WRITE) {
            /*
             * Don't add a write for ro mode
             */
        } else if (rw == DDIR_WAIT) {
        } else if (rw == DDIR_INVAL) {
        } else if (!ddir_sync(rw)) {
            log_err("bad ddir: %d\n", rw);

        /*
         * Make note of file
         */
        ipo = calloc(1, sizeof(*ipo));
        if (rw == DDIR_WAIT) {
        /* replay_scale shrinks offsets to fit a smaller replay device */
        if (td->o.replay_scale)
            ipo->offset = offset / td->o.replay_scale;
            ipo->offset = offset;
        ipo_bytes_align(td->o.replay_align, ipo);
        /* track the largest block size seen, for buffer sizing */
        if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
            td->o.max_bs[rw] = bytes;
        ipo->fileno = fileno;
        ipo->file_action = file_action;
        queue_io_piece(td, ipo);

        if (td->o.read_iolog_chunked) {
            td->io_log_current++;
            if (items_to_fetch == 0)

    if (td->o.read_iolog_chunked) {
        /* log exhausted mid-batch: re-anchor the watermarks */
        td->io_log_highmark = td->io_log_current;
        td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
        fio_gettime(&td->io_log_highmark_time, NULL);

    if (writes && read_only) {
        log_err("fio: <%s> skips replay of %d writes due to"
            " read-only\n", td->o.name, writes);

    if (td->o.read_iolog_chunked) {
        if (td->io_log_current == 0) {
        td->o.td_ddir = TD_DDIR_RW;

    /* derive the job's data direction from what the log contained */
    if (!reads && !writes && !waits)
    else if (reads && !writes)
        td->o.td_ddir = TD_DDIR_READ;
    else if (!reads && writes)
        td->o.td_ddir = TD_DDIR_WRITE;
        td->o.td_ddir = TD_DDIR_RW;
549 static bool is_socket(const char *path)
552 int r = stat(path, &buf);
556 return S_ISSOCK(buf.st_mode);
/*
 * Connect to the unix-domain socket at `path`.
 *
 * Returns the connected fd on success, -1 on failure. Fixes in this
 * version: reject paths too long for sun_path instead of silently
 * truncating them with strncpy() (which also leaves sun_path
 * unterminated at exactly sizeof(sun_path) chars), check the socket()
 * return value, zero the address struct, pass the full sockaddr_un size
 * to connect() instead of a hand-computed length, and close the fd on
 * connect() failure so it is not leaked.
 */
static int open_socket(const char *path)
{
    struct sockaddr_un addr;
    int fd;

    /* a path that doesn't fit in sun_path can never be connected to */
    if (strlen(path) >= sizeof(addr.sun_path))
        return -1;

    fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (fd < 0)
        return -1;

    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path);

    if (connect(fd, (const struct sockaddr *)&addr, sizeof(addr)) == 0)
        return fd;

    /* don't leak the descriptor on a failed connect */
    close(fd);
    return -1;
}
/*
 * open iolog, check version, and call appropriate parser
 *
 * NOTE(review): lines are missing from this capture (declarations of `f`
 * and `ret`, error returns, fdopen of the socket fd); visible statements
 * kept verbatim.
 */
static bool init_iolog_read(struct thread_data *td)
    char buffer[256], *p;

    /* the replay source may be a unix socket rather than a regular file */
    if (is_socket(td->o.read_iolog_file)) {
        int fd = open_socket(td->o.read_iolog_file);
    f = fopen(td->o.read_iolog_file, "r");
        perror("fopen read iolog");

    /* first line carries the version banner */
    p = fgets(buffer, sizeof(buffer), f);
        td_verror(td, errno, "iolog read");
        log_err("fio: unable to read iolog\n");
    td->io_log_rfile = f;

    /*
     * version 2 of the iolog stores a specific string as the
     * first line, check for that
     */
    if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2))) {
        free_release_files(td);
        ret = read_iolog2(td);
    log_err("fio: iolog version 1 is no longer supported\n");
/*
 * Set up a log for storing io patterns.
 *
 * NOTE(review): lines are missing from this capture (declarations of `f`,
 * `ff`, `i`, error returns, td->iolog_f assignment); visible statements
 * kept verbatim.
 */
static bool init_iolog_write(struct thread_data *td)
    f = fopen(td->o.write_iolog_file, "a");
        perror("fopen write iolog");

    /*
     * That's it for writing, setup a log buffer and we're done.
     * The 8KB buffer is freed in write_iolog_close() after fclose().
     */
    td->iolog_buf = malloc(8192);
    setvbuf(f, td->iolog_buf, _IOFBF, 8192);

    /*
     * write our version line
     */
    if (fprintf(f, "%s\n", iolog_ver2) < 0) {
        perror("iolog init\n");

    /*
     * add all known files
     */
    for_each_file(td, ff, i)
        log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * Initialize iolog replay (read_iolog_file) or capture (write_iolog_file)
 * for this thread, as configured. On failure, sets EINVAL on the thread.
 *
 * NOTE(review): lines are missing from this capture (declarations of `ret`
 * and `need_swap`, the else branch, final return); visible statements kept
 * verbatim.
 */
bool init_iolog(struct thread_data *td)
    if (td->o.read_iolog_file) {
        /*
         * Check if it's a blktrace file and load that if possible.
         * Otherwise assume it's a normal log file and load that.
         */
        if (is_blktrace(td->o.read_iolog_file, &need_swap))
            ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
            ret = init_iolog_read(td);
    } else if (td->o.write_iolog_file)
        ret = init_iolog_write(td);

        td_verror(td, EINVAL, "failed initializing iolog");
/*
 * Allocate and configure an io_log from the supplied parameters: copy the
 * sampling/compression settings, set up per-direction histogram lists, a
 * pre-sized pending buffer for non-offload submit mode, and compression
 * state when log_gz/log_gz_store are enabled.
 *
 * NOTE(review): lines are missing from this capture (declarations of `l`
 * and `i`, l->td assignment, *log store); visible statements kept verbatim.
 */
void setup_log(struct io_log **log, struct log_params *p,
    const char *filename)
    struct io_u_plat_entry *entry;
    struct flist_head *list;

    /* shared-memory alloc so the log survives process-based jobs */
    l = scalloc(1, sizeof(*l));
    INIT_FLIST_HEAD(&l->io_logs);
    l->log_type = p->log_type;
    l->log_offset = p->log_offset;
    l->log_gz = p->log_gz;
    l->log_gz_store = p->log_gz_store;
    l->avg_msec = p->avg_msec;
    l->hist_msec = p->hist_msec;
    l->hist_coarseness = p->hist_coarseness;
    l->filename = strdup(filename);

    /* Initialize histogram lists for each r/w direction,
     * with initial io_u_plat of all zeros:
     */
    for (i = 0; i < DDIR_RWDIR_CNT; i++) {
        list = &l->hist_window[i].list;
        INIT_FLIST_HEAD(list);
        entry = calloc(1, sizeof(struct io_u_plat_entry));
        flist_add(&entry->list, list);

    /* non-offload mode: pre-allocate the pending sample buffer */
    if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
        __p = calloc(1, sizeof(*l->pending));
        __p->max_samples = DEF_LOG_ENTRIES;
        __p->log = calloc(__p->max_samples, log_entry_sz(l));

    /* offset logging doubles the sample size; flag it in the ddir mask */
    l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;

    INIT_FLIST_HEAD(&l->chunk_list);

    if (l->log_gz && !p->td)
    else if (l->log_gz || l->log_gz_store) {
        mutex_init_pshared(&l->chunk_lock);
        mutex_init_pshared(&l->deferred_free_lock);
        p->td->flags |= TD_F_COMPRESS_LOG;
#ifdef CONFIG_SETVBUF
/*
 * Give the log FILE a large (1MB) stdio buffer to reduce write(2) calls
 * while flushing samples; the returned buffer is freed by the caller via
 * clear_file_buffer() after the FILE is closed.
 */
static void *set_file_buffer(FILE *f)
    size_t size = 1048576;

    setvbuf(f, buf, _IOFBF, size);

static void clear_file_buffer(void *buf)

/* no setvbuf() support: stub variants that manage no buffer */
static void *set_file_buffer(FILE *f)

static void clear_file_buffer(void *buf)
/*
 * Release a log and everything attached to it: each queued io_logs chunk
 * and the pending (non-offload) sample buffer.
 *
 * NOTE(review): the free(cur_log->log)/sfree lines are not visible in this
 * capture; visible statements kept verbatim.
 */
void free_log(struct io_log *log)
    while (!flist_empty(&log->io_logs)) {
        struct io_logs *cur_log;

        cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
        flist_del_init(&cur_log->list);

    free(log->pending->log);
/*
 * Sum `stride` consecutive histogram buckets starting at index j.
 * When io_u_plat_last is non-NULL the result is the delta against the
 * previous snapshot (per-interval counts); otherwise it is the running
 * total straight out of io_u_plat.
 */
uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
		  uint64_t *io_u_plat_last)
{
    uint64_t total = 0;

    if (io_u_plat_last) {
        for (int k = 0; k < stride; k++)
            total += io_u_plat[j + k] - io_u_plat_last[j + k];
    } else {
        for (int k = 0; k < stride; k++)
            total += io_u_plat[j + k];
    }

    return total;
}
/*
 * Write histogram samples to f as CSV: time, ddir, bs, then one column per
 * coarsened bucket group — each group a sum over `stride` = 2^hist_coarseness
 * adjacent buckets, differenced against the previous snapshot via hist_sum().
 *
 * NOTE(review): lines are missing from this capture (declarations of `s`,
 * `log_offset`, `io_u_plat`, frees, closing braces); visible statements
 * kept verbatim.
 */
static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
    uint64_t sample_size)
    uint64_t i, j, nr_samples;
    struct io_u_plat_entry *entry, *entry_before;
    uint64_t *io_u_plat_before;

    int stride = 1 << hist_coarseness;

    /* sample record layout depends on whether offsets were logged */
    s = __get_sample(samples, 0, 0);
    log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

    nr_samples = sample_size / __log_entry_sz(log_offset);

    for (i = 0; i < nr_samples; i++) {
        s = __get_sample(samples, log_offset, i);

        entry = s->data.plat_entry;
        io_u_plat = entry->io_u_plat;

        /* previous snapshot, for computing per-interval deltas */
        entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
        io_u_plat_before = entry_before->io_u_plat;

        fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
            io_sample_ddir(s), (unsigned long long) s->bs);
        for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
            fprintf(f, "%llu, ", (unsigned long long)
                hist_sum(j, stride, io_u_plat, io_u_plat_before));
        /* last group gets a newline instead of a trailing comma */
        fprintf(f, "%llu\n", (unsigned long long)
            hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,

        /* the consumed "before" snapshot is no longer needed */
        flist_del(&entry_before->list);
/*
 * Write plain (non-histogram) samples to f as CSV:
 * time, value, ddir, bs[, offset] — the offset column appears only when
 * offset logging was enabled for this log.
 *
 * NOTE(review): lines are missing from this capture (declarations of `s`
 * and `log_offset`, the branch selecting between formats); visible
 * statements kept verbatim.
 */
void flush_samples(FILE *f, void *samples, uint64_t sample_size)
    uint64_t i, nr_samples;

    /* record layout depends on whether offsets were logged */
    s = __get_sample(samples, 0, 0);
    log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

    nr_samples = sample_size / __log_entry_sz(log_offset);

    for (i = 0; i < nr_samples; i++) {
        s = __get_sample(samples, log_offset, i);

        /* without offsets: time, value, ddir, bs */
        fprintf(f, "%lu, %" PRId64 ", %u, %llu\n",
            (unsigned long) s->time,
            io_sample_ddir(s), (unsigned long long) s->bs);
            /* with offsets: the sample carries a trailing offset field */
            struct io_sample_offset *so = (void *) s;

            fprintf(f, "%lu, %" PRId64 ", %u, %llu, %llu\n",
                (unsigned long) s->time,
                io_sample_ddir(s), (unsigned long long) s->bs,
                (unsigned long long) so->offset);
/*
 * Work item handed to the compression workqueue: the sample buffer to
 * deflate and the log it belongs to.
 */
struct iolog_flush_data {
    struct workqueue_work work;

/* granularity of compressed output chunks */
#define GZ_CHUNK 131072

/*
 * Allocate a fresh output chunk tagged with the deflate sequence number.
 * NOTE(review): len/seq assignments and return are not visible in this
 * capture; malloc results are unchecked in the visible lines.
 */
static struct iolog_compress *get_new_chunk(unsigned int seq)
    struct iolog_compress *c;

    c = malloc(sizeof(*c));
    INIT_FLIST_HEAD(&c->list);
    c->buf = malloc(GZ_CHUNK);

/* Release a chunk's buffer and the chunk itself */
static void free_chunk(struct iolog_compress *ic)
/*
 * Initialize an inflate stream. With gz_hdr set, the window-bits value is
 * adjusted so zlib auto-detects gzip vs raw zlib framing.
 *
 * NOTE(review): the `wbits` computation and return lines are not visible
 * in this capture; visible statements kept verbatim.
 */
static int z_stream_init(z_stream *stream, int gz_hdr)
    memset(stream, 0, sizeof(*stream));
    stream->zalloc = Z_NULL;
    stream->zfree = Z_NULL;
    stream->opaque = Z_NULL;
    stream->next_in = Z_NULL;

    /*
     * zlib magic - add 32 for auto-detection of gz header or not,
     * if we decide to store files in a gzip friendly format.
     */
    if (inflateInit2(stream, wbits) != Z_OK)

/* iteration state carried across inflate_chunk() calls */
struct inflate_chunk_iter {

/*
 * End the current inflate sequence: tear down the stream, flush the decoded
 * samples to f, and reset the iteration buffer for the next sequence.
 */
static void finish_chunk(z_stream *stream, FILE *f,
    struct inflate_chunk_iter *iter)
    ret = inflateEnd(stream);
        log_err("fio: failed to end log inflation seq %d (%d)\n",

    flush_samples(f, iter->buf, iter->buf_used);
    iter->buf_size = iter->buf_used = 0;
/*
 * Iterative chunk inflation. Handles cases where we cross into a new
 * sequence, doing flush finish of previous chunk if needed.
 *
 * NOTE(review): lines are missing from this capture (declarations of `ret`
 * and `err`, error breaks, closing braces); visible statements kept verbatim.
 */
static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
    z_stream *stream, struct inflate_chunk_iter *iter)
    dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
        (unsigned long) ic->len, ic->seq);

    /* sequence change: flush the finished sequence, start a new stream */
    if (ic->seq != iter->seq) {
        finish_chunk(stream, f, iter);
        z_stream_init(stream, gz_hdr);

    stream->avail_in = ic->len;
    stream->next_in = ic->buf;

    if (!iter->buf_size) {
        iter->buf_size = iter->chunk_sz;
        iter->buf = malloc(iter->buf_size);

    while (stream->avail_in) {
        size_t this_out = iter->buf_size - iter->buf_used;

        stream->avail_out = this_out;
        stream->next_out = iter->buf + iter->buf_used;

        err = inflate(stream, Z_NO_FLUSH);
            log_err("fio: failed inflating log: %d\n", err);

        iter->buf_used += this_out - stream->avail_out;

        /* output buffer full: grow it and continue inflating */
        if (!stream->avail_out) {
            iter->buf_size += iter->chunk_sz;
            iter->buf = realloc(iter->buf, iter->buf_size);

        if (err == Z_STREAM_END)

    /* bytes of compressed input consumed from this chunk */
    ret = (void *) stream->next_in - ic->buf;

    dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
/*
 * Inflate stored compressed chunks, or write them directly to the log
 * file if so instructed.
 *
 * NOTE(review): lines are missing from this capture (declarations of
 * `stream`/`ret`, free_chunk calls, final return); visible statements
 * kept verbatim.
 */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
    struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };

    while (!flist_empty(&log->chunk_list)) {
        struct iolog_compress *ic;

        ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
        flist_del(&ic->list);

        if (log->log_gz_store) {
            /* store mode: emit the compressed bytes verbatim */
            dprint(FD_COMPRESS, "log write chunk size=%lu, "
                "seq=%u\n", (unsigned long) ic->len, ic->seq);

            ret = fwrite(ic->buf, ic->len, 1, f);
            if (ret != 1 || ferror(f)) {
                log_err("fio: error writing compressed log\n");
            inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);

    /* flush whatever the last sequence produced */
    finish_chunk(&stream, f, &iter);
/*
 * Open compressed log file and decompress the stored chunks and
 * write them to stdout. The chunks are stored sequentially in the
 * file, so we iterate over them and do them one-by-one.
 *
 * NOTE(review): lines are missing from this capture (declarations of
 * `stream`, `f`, `buf`, `ret`, `iret`, `sb`, error paths, frees);
 * visible statements kept verbatim.
 */
int iolog_file_inflate(const char *file)
    struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
    struct iolog_compress ic;

    f = fopen(file, "r");

    if (stat(file, &sb) < 0) {

    /* slurp the whole compressed file into one pseudo-chunk */
    ic.buf = buf = malloc(sb.st_size);
    ic.len = sb.st_size;

    ret = fread(ic.buf, ic.len, 1, f);
    if (ret == 0 && ferror(f)) {
    } else if (ferror(f) || (!feof(f) && ret != 1)) {
        log_err("fio: short read on reading log\n");

    /*
     * Each chunk will return Z_STREAM_END. We don't know how many
     * chunks are in the file, so we just keep looping and incrementing
     * the sequence number until we have consumed the whole compressed
     * file.
     */
    iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);

    finish_chunk(&stream, stdout, &iter);
/* Built without zlib: decompression is unavailable; stub implementations. */
static int inflate_gz_chunks(struct io_log *log, FILE *f)

int iolog_file_inflate(const char *file)
    log_err("fio: log inflation not possible without zlib\n");
/*
 * Write a log's contents to its destination file: decompress (or copy
 * through) any gz chunks first, then flush each queued io_logs chunk —
 * via the histogram formatter for clat-hist logs, plain formatter
 * otherwise. do_append selects "a" vs "w" fopen mode.
 *
 * NOTE(review): lines are missing from this capture (declarations of `f`
 * and `buf`, frees, fclose); visible statements kept verbatim.
 */
void flush_log(struct io_log *log, bool do_append)
    f = fopen(log->filename, "w");
    f = fopen(log->filename, "a");
        perror("fopen log");

    /* big stdio buffer to cut syscall count during the flush */
    buf = set_file_buffer(f);

    inflate_gz_chunks(log, f);

    while (!flist_empty(&log->io_logs)) {
        struct io_logs *cur_log;

        cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
        flist_del_init(&cur_log->list);

        /* clat histogram logs use the bucketed CSV format */
        if (log->td && log == log->td->clat_hist_log)
            flush_hist_samples(f, log->hist_coarseness, cur_log->log,
                log_sample_sz(log, cur_log));
            flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));

    clear_file_buffer(buf);
/*
 * Final flush of one log: take the per-file lock (optionally trylock),
 * then either hand the log to the network backend/GUI client or flush it
 * to disk locally. Compressed logs are flushed via iolog_flush() first.
 *
 * NOTE(review): lines are missing from this capture (returns, free_log);
 * visible statements kept verbatim.
 */
static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
    if (td->flags & TD_F_COMPRESS_LOG)

        if (fio_trylock_file(log->filename))
        fio_lock_file(log->filename);

    /* backend/GUI runs ship the log over the wire instead of writing it */
    if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
        fio_send_iolog(td, log, log->filename);
        flush_log(log, !td->o.per_job_logs);

    fio_unlock_file(log->filename);
/*
 * Sum the compressed sizes of all chunks currently queued on this log,
 * under chunk_lock. Returns 0 for an empty chunk list.
 *
 * NOTE(review): the `ret` declaration/accumulation/return lines are not
 * visible in this capture; visible statements kept verbatim.
 */
size_t log_chunk_sizes(struct io_log *log)
    struct flist_head *entry;

    if (flist_empty(&log->chunk_list))

    pthread_mutex_lock(&log->chunk_lock);
    flist_for_each(entry, &log->chunk_list) {
        struct iolog_compress *c;

        c = flist_entry(entry, struct iolog_compress, list);
    pthread_mutex_unlock(&log->chunk_lock);
/*
 * Defer freeing of `ptr` (a sample buffer still possibly referenced by a
 * compression worker) until iolog_free_deferred() runs. Drops the entry
 * with a one-time warning if the deferred table is full.
 */
static void iolog_put_deferred(struct io_log *log, void *ptr)
    pthread_mutex_lock(&log->deferred_free_lock);
    if (log->deferred < IOLOG_MAX_DEFER) {
        log->deferred_items[log->deferred] = ptr;
    } else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
        log_err("fio: had to drop log entry free\n");
    pthread_mutex_unlock(&log->deferred_free_lock);

/*
 * Free everything queued by iolog_put_deferred(), under the same lock.
 */
static void iolog_free_deferred(struct io_log *log)
    pthread_mutex_lock(&log->deferred_free_lock);

    for (i = 0; i < log->deferred; i++) {
        free(log->deferred_items[i]);
        log->deferred_items[i] = NULL;

    pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Compress one batch of samples with zlib deflate, splitting the output
 * into GZ_CHUNK-sized iolog_compress chunks, then splice the finished
 * chunks onto the log's chunk list. The source sample buffer is released
 * via the deferred-free mechanism since it may still be referenced.
 *
 * NOTE(review): many lines are missing from this capture (declarations of
 * `stream`, `ret`, `seq`, `total`, loop/brace lines, error-path labels);
 * visible statements kept verbatim.
 */
static int gz_work(struct iolog_flush_data *data)
    struct iolog_compress *c = NULL;
    struct flist_head list;

    INIT_FLIST_HEAD(&list);

    memset(&stream, 0, sizeof(stream));
    stream.zalloc = Z_NULL;
    stream.zfree = Z_NULL;
    stream.opaque = Z_NULL;

    ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
        log_err("fio: failed to init gz stream\n");

    seq = ++data->log->chunk_seq;

    stream.next_in = (void *) data->samples;
    stream.avail_in = data->nr_samples * log_entry_sz(data->log);

    dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
        (unsigned long) stream.avail_in, seq,
        data->log->filename);
    /*
     * NOTE(review): this dprint reports the size of the chunk finished on
     * the previous loop iteration; the `if (c)` guard line appears to be
     * missing from this capture.
     */
    dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
        (unsigned long) c->len);
    c = get_new_chunk(seq);
    stream.avail_out = GZ_CHUNK;
    stream.next_out = c->buf;
    ret = deflate(&stream, Z_NO_FLUSH);
        log_err("fio: deflate log (%d)\n", ret);

    c->len = GZ_CHUNK - stream.avail_out;
    flist_add_tail(&c->list, &list);
    } while (stream.avail_in);

    /* finish the stream into the tail of the current chunk */
    stream.next_out = c->buf + c->len;
    stream.avail_out = GZ_CHUNK - c->len;

    ret = deflate(&stream, Z_FINISH);
        /*
         * Z_BUF_ERROR is special, it just means we need more
         * output space. We'll handle that below. Treat any other
         * error as fatal.
         */
        if (ret != Z_BUF_ERROR) {
            log_err("fio: deflate log (%d)\n", ret);
            flist_del(&c->list);

    c->len = GZ_CHUNK - stream.avail_out;

    dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);

    /* keep finishing into fresh chunks until deflate reports stream end */
    if (ret != Z_STREAM_END) {
        c = get_new_chunk(seq);
        stream.avail_out = GZ_CHUNK;
        stream.next_out = c->buf;
        ret = deflate(&stream, Z_FINISH);
        c->len = GZ_CHUNK - stream.avail_out;
        flist_add_tail(&c->list, &list);
        dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
            (unsigned long) c->len);
    } while (ret != Z_STREAM_END);

    dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);

    ret = deflateEnd(&stream);
        log_err("fio: deflateEnd %d\n", ret);

    /* source buffer may still be referenced; free it via the deferred list */
    iolog_put_deferred(data->log, data->samples);

    if (!flist_empty(&list)) {
        pthread_mutex_lock(&data->log->chunk_lock);
        flist_splice_tail(&list, &data->log->chunk_list);
        pthread_mutex_unlock(&data->log->chunk_lock);

    /* error path: drop any chunks produced before the failure */
    while (!flist_empty(&list)) {
        c = flist_first_entry(list.next, struct iolog_compress, list);
        flist_del(&c->list);
/*
 * Invoked from our compress helper thread, when logging would have exceeded
 * the specified memory limitation. Compresses the previously stored
 * samples. Workqueue entry point: unwrap the flush data and run gz_work().
 */
static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
    return gz_work(container_of(work, struct iolog_flush_data, work));

/* Pin the compression worker to log_gz_cpumask, when one was configured */
static int gz_init_worker(struct submit_worker *sw)
    struct thread_data *td = sw->wq->td;

    if (!fio_option_is_set(&td->o, log_gz_cpumask))

    if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
        log_err("gz: failed to set CPU affinity\n");
/* workqueue hooks for asynchronous log compression */
static struct workqueue_ops log_compress_wq_ops = {
    .fn		= gz_work_async,
    .init_worker_fn	= gz_init_worker,

/*
 * Start the single-worker compression workqueue for this thread; no-op
 * unless log compression was enabled (TD_F_COMPRESS_LOG).
 */
int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
    if (!(td->flags & TD_F_COMPRESS_LOG))

    workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);

/* Tear down the compression workqueue started by iolog_compress_init() */
void iolog_compress_exit(struct thread_data *td)
    if (!(td->flags & TD_F_COMPRESS_LOG))

    workqueue_exit(&td->log_compress_wq);
/*
 * Queue work item to compress the existing log entries. We reset the
 * current log to a small size, and reference the existing log in the
 * data that we queue for compression. Once compression has been done,
 * this old log is freed. If called with finish == true, will not return
 * until the log compression has completed, and will flush all previous
 * logs too.
 *
 * NOTE(review): lines are missing from this capture (malloc checks,
 * gz_work call, frees, returns); visible statements kept verbatim.
 */
static int iolog_flush(struct io_log *log)
    struct iolog_flush_data *data;

    data = malloc(sizeof(*data));

    /* synchronously compress every queued chunk of this log */
    while (!flist_empty(&log->io_logs)) {
        struct io_logs *cur_log;

        cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
        flist_del_init(&cur_log->list);

        data->samples = cur_log->log;
        data->nr_samples = cur_log->nr_samples;

/*
 * Hand one io_logs chunk to the async compression workqueue; ownership of
 * the sample buffer transfers to the work item, so the chunk is cleared
 * here. Also drains any buffers queued for deferred freeing.
 */
int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
    struct iolog_flush_data *data;

    /* shared alloc: the workqueue worker may run in another process */
    data = smalloc(sizeof(*data));

    data->samples = cur_log->log;
    data->nr_samples = cur_log->nr_samples;

    /* buffer now belongs to the work item */
    cur_log->nr_samples = cur_log->max_samples = 0;
    cur_log->log = NULL;

    workqueue_enqueue(&log->td->log_compress_wq, &data->work);

    iolog_free_deferred(log);
/* Built without zlib: compression entry points become no-op stubs. */
static int iolog_flush(struct io_log *log)

int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)

int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)

void iolog_compress_exit(struct thread_data *td)
1511 struct io_logs *iolog_cur_log(struct io_log *log)
1513 if (flist_empty(&log->io_logs))
1516 return flist_last_entry(&log->io_logs, struct io_logs, list);
1519 uint64_t iolog_nr_samples(struct io_log *iolog)
1521 struct flist_head *entry;
1524 flist_for_each(entry, &iolog->io_logs) {
1525 struct io_logs *cur_log;
1527 cur_log = flist_entry(entry, struct io_logs, list);
1528 ret += cur_log->nr_samples;
/*
 * Flush one log via finish_log(); `try` selects trylock semantics there.
 * NOTE(review): a null-check guard line for `log` appears to be missing
 * from this capture; visible statements kept verbatim.
 */
static int __write_log(struct thread_data *td, struct io_log *log, int try)
        return finish_log(td, log, try);
/*
 * Per-type log writers: each flushes its log via __write_log() and clears
 * the pointer on success. The unit_log flag must match per_unit_log() for
 * logs that can be emitted per-unit (iops, bw).
 *
 * NOTE(review): lines are missing from this capture (declarations of `ret`,
 * success checks, returns); visible statements kept verbatim.
 */
static int write_iops_log(struct thread_data *td, int try, bool unit_log)
    /* only write when the unit/aggregate mode matches this call */
    if (per_unit_log(td->iops_log) != unit_log)

    ret = __write_log(td, td->iops_log, try);
        td->iops_log = NULL;

static int write_slat_log(struct thread_data *td, int try, bool unit_log)
    ret = __write_log(td, td->slat_log, try);
        td->slat_log = NULL;

static int write_clat_log(struct thread_data *td, int try, bool unit_log)
    ret = __write_log(td, td->clat_log, try);
        td->clat_log = NULL;

static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
    ret = __write_log(td, td->clat_hist_log, try);
        td->clat_hist_log = NULL;

static int write_lat_log(struct thread_data *td, int try, bool unit_log)
    ret = __write_log(td, td->lat_log, try);

static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
    if (per_unit_log(td->bw_log) != unit_log)

    ret = __write_log(td, td->bw_log, try);
/* bitmask values tracking which log types have already been written */
    CLAT_HIST_LOG_MASK = 32,

/* dispatch entry: mask bit plus writer function (td, try, unit_log) */
    int (*fn)(struct thread_data *, int, bool);

/* table driving td_writeout_logs(); order sets flush priority */
static struct log_type log_types[] = {
        .mask	= BW_LOG_MASK,
        .fn	= write_bandw_log,
        .mask	= LAT_LOG_MASK,
        .fn	= write_lat_log,
        .mask	= SLAT_LOG_MASK,
        .fn	= write_slat_log,
        .mask	= CLAT_LOG_MASK,
        .fn	= write_clat_log,
        .mask	= IOPS_LOG_MASK,
        .fn	= write_iops_log,
        .mask	= CLAT_HIST_LOG_MASK,
        .fn	= write_clat_hist_log,
/*
 * Flush every configured log for a thread. Runs in TD_FINISHING state and
 * iterates the log_types table repeatedly, retrying writers that could not
 * take their file lock (trylock until only one log remains), until all are
 * written or no progress is made in a full pass.
 *
 * NOTE(review): lines are missing from this capture (declarations of `i`,
 * `ret`, `old_state`, the outer retry loop header); visible statements
 * kept verbatim.
 */
void td_writeout_logs(struct thread_data *td, bool unit_logs)
    unsigned int log_mask = 0;
    unsigned int log_left = ALL_LOG_NR;

    old_state = td_bump_runstate(td, TD_FINISHING);

    finalize_logs(td, unit_logs);

        int prev_log_left = log_left;

        for (i = 0; i < ALL_LOG_NR && log_left; i++) {
            struct log_type *lt = &log_types[i];

            /* skip logs already written in a previous pass */
            if (!(log_mask & lt->mask)) {
                /* only the final remaining log blocks on its lock */
                ret = lt->fn(td, log_left != 1, unit_logs);
                log_mask |= lt->mask;

        /* no progress in a full pass: give up to avoid spinning */
        if (prev_log_left == log_left)

    td_restore_runstate(td, old_state);
1702 void fio_writeout_logs(bool unit_logs)
1704 struct thread_data *td;
1708 td_writeout_logs(td, unit_logs);