2 * Code related to writing an iolog of what a thread is doing, and to
3 * later read that back and replay
23 #include <netinet/in.h>
24 #include <netinet/tcp.h>
25 #include <arpa/inet.h>
27 #include <sys/socket.h>
/* Forward declaration: flushing is defined later, in the zlib section. */
30 static int iolog_flush(struct io_log *log);
/* Magic first line of a version-2 iolog file; checked on read, written on init. */
32 static const char iolog_ver2[] = "fio version 2 iolog";
/*
 * Append an io_piece to the thread's replay queue and account its length
 * in the thread's running total_io_size.
 */
34 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
36 	flist_add_tail(&ipo->list, &td->io_log_list);
37 	td->total_io_size += ipo->len;
/*
 * Record one I/O in the write iolog as "<file> <ddir> <offset> <len>".
 * No-op unless write_iolog_file was requested.
 */
40 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
42 	if (!td->o.write_iolog_file)
45 	fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
46 		io_ddir_name(io_u->ddir),
47 		io_u->offset, io_u->buflen);
/*
 * Record a file-level action in the write iolog as "<file> <action>".
 * 'what' indexes the act[] table, so it must be one of add/open/close.
 */
50 void log_file(struct thread_data *td, struct fio_file *f,
51 	      enum file_log_act what)
53 	const char *act[] = { "add", "open", "close" };
57 	if (!td->o.write_iolog_file)
62 	 * this happens on the pre-open/close done before the job starts
67 	fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
/*
 * Sleep for a replayed inter-I/O delay (usec), compensating for time
 * already spent since the last issue: td->time_offset carries overshoot
 * from the previous delay and is subtracted up front, then refreshed at
 * the end if this sleep ran long.
 */
70 static void iolog_delay(struct thread_data *td, unsigned long delay)
72 	uint64_t usec = utime_since_now(&td->last_issue);
73 	unsigned long orig_delay = delay;
77 	if (delay < td->time_offset) {
82 	delay -= td->time_offset;
88 	fio_gettime(&ts, NULL);
	/* sleep in bounded slices so td->terminate is honored promptly */
89 	while (delay && !td->terminate) {
91 		if (this_delay > 500000)
94 		usec_sleep(td, this_delay);
98 	usec = utime_since_now(&ts);
99 	if (usec > orig_delay)
100 		td->time_offset = usec - orig_delay;
/*
 * Handle a special (non-I/O) iolog entry. Entries with a real ddir are
 * not special and are rejected immediately; DDIR_INVAL entries encode a
 * file action (open/close/unlink) which is executed here.
 * NOTE(review): exact return-value convention is not fully visible in
 * this listing — confirm against caller read_iolog_get().
 */
105 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
113 	if (ipo->ddir != DDIR_INVAL)
116 	f = td->files[ipo->fileno];
118 	switch (ipo->file_action) {
119 	case FIO_LOG_OPEN_FILE:
		/* with replay_redirect all entries map to one file; skip re-opens */
120 		if (td->o.replay_redirect && fio_file_open(f)) {
121 			dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
125 		ret = td_io_open_file(td, f);
128 		td_verror(td, ret, "iolog open file");
130 	case FIO_LOG_CLOSE_FILE:
131 		td_io_close_file(td, f);
133 	case FIO_LOG_UNLINK_FILE:
134 		td_io_unlink_file(td, f);
137 		log_err("fio: bad file action %d\n", ipo->file_action);
/*
 * Pop the next replay entry off td->io_log_list and populate io_u from
 * it. Special entries (file actions) are consumed via ipo_special();
 * DDIR_WAIT entries only sleep. Per-entry delays are honored either as
 * relative delays (iolog_delay) or, for waits, against elapsed wall
 * time since genesis.
 */
144 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
146 	struct io_piece *ipo;
147 	unsigned long elapsed;
149 	while (!flist_empty(&td->io_log_list)) {
152 		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
153 		flist_del(&ipo->list);
154 		remove_trim_entry(td, ipo);
156 		ret = ipo_special(td, ipo);
160 		} else if (ret > 0) {
165 		io_u->ddir = ipo->ddir;
166 		if (ipo->ddir != DDIR_WAIT) {
167 			io_u->offset = ipo->offset;
168 			io_u->buflen = ipo->len;
169 			io_u->file = td->files[ipo->fileno];
170 			get_file(io_u->file);
171 			dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
172 				io_u->buflen, io_u->file->file_name);
174 				iolog_delay(td, ipo->delay);
			/* wait entries store an absolute msec target in ->delay */
176 			elapsed = mtime_since_genesis();
177 			if (ipo->delay > elapsed)
178 				usec_sleep(td, (ipo->delay - elapsed) * 1000);
183 		if (io_u->ddir != DDIR_WAIT)
/*
 * Drop all accumulated verify history: first drain the rb-tree of sorted
 * write pieces, then the plain history list, removing any trim-list
 * linkage for each piece as it goes.
 */
191 void prune_io_piece_log(struct thread_data *td)
193 	struct io_piece *ipo;
194 	struct fio_rb_node *n;
196 	while ((n = rb_first(&td->io_hist_tree)) != NULL) {
197 		ipo = rb_entry(n, struct io_piece, rb_node);
198 		rb_erase(n, &td->io_hist_tree);
199 		remove_trim_entry(td, ipo);
204 	while (!flist_empty(&td->io_hist_list)) {
205 		ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
206 		flist_del(&ipo->list);
207 		remove_trim_entry(td, ipo);
214  * log a successful write, so we can unwind the log for verify
216 void log_io_piece(struct thread_data *td, struct io_u *io_u)
218 	struct fio_rb_node **p, *parent;
219 	struct io_piece *ipo, *__ipo;
	/* one io_piece per completed write; ownership moves to the hist list/tree */
221 	ipo = calloc(1, sizeof(struct io_piece));
223 	ipo->file = io_u->file;
224 	ipo->offset = io_u->offset;
225 	ipo->len = io_u->buflen;
226 	ipo->numberio = io_u->numberio;
227 	ipo->flags = IP_F_IN_FLIGHT;
231 	if (io_u_should_trim(td, io_u)) {
232 		flist_add_tail(&ipo->trim_list, &td->trim_list);
237 	 * Only sort writes if we don't have a random map in which case we need
238 	 * to check for duplicate blocks and drop the old one, which we rely on
239 	 * the rb insert/lookup for handling.
241 	if (file_randommap(td, ipo->file)) {
242 		INIT_FLIST_HEAD(&ipo->list);
243 		flist_add_tail(&ipo->list, &td->io_hist_list);
244 		ipo->flags |= IP_F_ONLIST;
249 	RB_CLEAR_NODE(&ipo->rb_node);
252 	 * Sort the entry into the verification list
255 	p = &td->io_hist_tree.rb_node;
	/* walk the tree ordered by (file, offset); detect range overlaps */
261 		__ipo = rb_entry(parent, struct io_piece, rb_node);
262 		if (ipo->file < __ipo->file)
264 		else if (ipo->file > __ipo->file)
266 		else if (ipo->offset < __ipo->offset) {
268 			overlap = ipo->offset + ipo->len > __ipo->offset;
270 		else if (ipo->offset > __ipo->offset) {
272 			overlap = __ipo->offset + __ipo->len > ipo->offset;
278 			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
279 				__ipo->offset, __ipo->len,
280 				ipo->offset, ipo->len);
			/* overlapping older entry is evicted; freed only if not in flight */
282 			rb_erase(parent, &td->io_hist_tree);
283 			remove_trim_entry(td, __ipo);
284 			if (!(__ipo->flags & IP_F_IN_FLIGHT))
290 	rb_link_node(&ipo->rb_node, parent, p);
291 	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
292 	ipo->flags |= IP_F_ONRB;
/*
 * Undo log_io_piece() for a failed I/O: mark the block-info state as a
 * trim/write failure when block tracking is on, then unlink the piece
 * from whichever structure (rb-tree or list) holds it.
 */
296 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
298 	struct io_piece *ipo = io_u->ipo;
300 	if (td->ts.nr_block_infos) {
301 		uint32_t *info = io_u_block_info(td, io_u);
302 		if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
303 			if (io_u->ddir == DDIR_TRIM)
304 				*info = BLOCK_INFO_SET_STATE(*info,
305 					BLOCK_STATE_TRIM_FAILURE);
306 			else if (io_u->ddir == DDIR_WRITE)
307 				*info = BLOCK_INFO_SET_STATE(*info,
308 					BLOCK_STATE_WRITE_FAILURE);
315 	if (ipo->flags & IP_F_ONRB)
316 		rb_erase(&ipo->rb_node, &td->io_hist_tree);
317 	else if (ipo->flags & IP_F_ONLIST)
318 		flist_del(&ipo->list);
/*
 * Shrink the logged piece to the bytes actually transferred
 * (xfer_buflen minus residual), e.g. after a short I/O.
 */
325 void trim_io_piece(const struct io_u *io_u)
327 	struct io_piece *ipo = io_u->ipo;
332 	ipo->len = io_u->xfer_buflen - io_u->resid;
/* Tear down the write iolog; the stdio buffer pointer is cleared here. */
335 void write_iolog_close(struct thread_data *td)
341 	td->iolog_buf = NULL;
345  * Read version 2 iolog data. It is enhanced to include per-file logging,
348 static bool read_iolog2(struct thread_data *td, FILE *f)
350 	unsigned long long offset;
352 	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
353 	char *rfname, *fname, *act;
357 	free_release_files(td);
360 	 * Read in the read iolog and store it, reuse the infrastructure
361 	 * for doing verifications.
	/* %256s can store up to 257 bytes incl. NUL; 256+16 leaves headroom */
364 	rfname = fname = malloc(256+16);
365 	act = malloc(256+16);
367 	reads = writes = waits = 0;
368 	while ((p = fgets(str, 4096, f)) != NULL) {
369 		struct io_piece *ipo;
372 		r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
		/* replay_redirect funnels every logged file into one target */
375 		if (td->o.replay_redirect)
376 			fname = td->o.replay_redirect;
382 			if (!strcmp(act, "wait"))
384 			else if (!strcmp(act, "read"))
386 			else if (!strcmp(act, "write"))
388 			else if (!strcmp(act, "sync"))
390 			else if (!strcmp(act, "datasync"))
392 			else if (!strcmp(act, "trim"))
395 				log_err("fio: bad iolog file action: %s\n",
399 			fileno = get_fileno(td, fname);
402 			if (!strcmp(act, "add")) {
403 				if (td->o.replay_redirect &&
404 				    get_fileno(td, fname) != -1) {
405 					dprint(FD_FILE, "iolog: ignoring"
406 						" re-add of file %s\n", fname);
408 					fileno = add_file(td, fname, 0, 1);
409 					file_action = FIO_LOG_ADD_FILE;
412 			} else if (!strcmp(act, "open")) {
413 				fileno = get_fileno(td, fname);
414 				file_action = FIO_LOG_OPEN_FILE;
415 			} else if (!strcmp(act, "close")) {
416 				fileno = get_fileno(td, fname);
417 				file_action = FIO_LOG_CLOSE_FILE;
419 				log_err("fio: bad iolog file action: %s\n",
424 			log_err("bad iolog2: %s\n", p);
430 		else if (rw == DDIR_WRITE) {
432 			 * Don't add a write for ro mode
437 		} else if (rw == DDIR_WAIT) {
441 		} else if (rw == DDIR_INVAL) {
442 		} else if (!ddir_sync(rw)) {
443 			log_err("bad ddir: %d\n", rw);
		/* build the replay entry and queue it for read_iolog_get() */
450 		ipo = calloc(1, sizeof(*ipo));
453 		if (rw == DDIR_WAIT) {
456 			if (td->o.replay_scale)
457 				ipo->offset = offset / td->o.replay_scale;
459 				ipo->offset = offset;
460 			ipo_bytes_align(td->o.replay_align, ipo);
463 			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
464 				td->o.max_bs[rw] = bytes;
465 			ipo->fileno = fileno;
466 			ipo->file_action = file_action;
470 		queue_io_piece(td, ipo);
477 	if (writes && read_only) {
478 		log_err("fio: <%s> skips replay of %d writes due to"
479 			" read-only\n", td->o.name, writes);
	/* derive the effective data direction from what the log contained */
483 	if (!reads && !writes && !waits)
485 	else if (reads && !writes)
486 		td->o.td_ddir = TD_DDIR_READ;
487 	else if (!reads && writes)
488 		td->o.td_ddir = TD_DDIR_WRITE;
490 		td->o.td_ddir = TD_DDIR_RW;
/* Return true if 'path' exists and is a UNIX socket (per stat S_ISSOCK). */
495 static bool is_socket(const char *path)
498 	int r = stat(path, &buf);
502 	return S_ISSOCK(buf.st_mode);
/*
 * Connect to a UNIX-domain stream socket at 'path' and return the fd.
 * NOTE(review): strncpy() may leave sun_path unterminated when path is
 * as long as sizeof(sun_path), and the addrlen passed to connect() is
 * computed from strlen(path) + sizeof(sun_family) rather than the more
 * conventional offsetof(struct sockaddr_un, sun_path) + strlen(path) —
 * worth confirming against the sockaddr_un spec.
 */
505 static int open_socket(const char *path)
507 	int fd = socket(AF_UNIX, SOCK_STREAM, 0);
508 	struct sockaddr_un addr;
511 	addr.sun_family = AF_UNIX;
512 	strncpy(addr.sun_path, path, sizeof(addr.sun_path));
513 	if (connect(fd, (const struct sockaddr *)&addr, strlen(path) + sizeof(addr.sun_family)) == 0)
521  * open iolog, check version, and call appropriate parser
523 static bool init_iolog_read(struct thread_data *td)
525 	char buffer[256], *p;
	/* source may be a UNIX socket (live feed) or a regular file */
528 	if (is_socket(td->o.read_iolog_file)) {
529 		int fd = open_socket(td->o.read_iolog_file);
534 		f = fopen(td->o.read_iolog_file, "r");
536 		perror("fopen read iolog");
540 	p = fgets(buffer, sizeof(buffer), f);
542 		td_verror(td, errno, "iolog read");
543 		log_err("fio: unable to read iolog\n");
549 	 * version 2 of the iolog stores a specific string as the
550 	 * first line, check for that
552 	if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
553 		ret = read_iolog2(td, f);
555 		log_err("fio: iolog version 1 is no longer supported\n");
564  * Set up a log for storing io patterns.
566 static bool init_iolog_write(struct thread_data *td)
	/* append mode: an existing iolog is extended, not truncated */
572 	f = fopen(td->o.write_iolog_file, "a");
574 		perror("fopen write iolog");
579 	 * That's it for writing, setup a log buffer and we're done.
582 	td->iolog_buf = malloc(8192);
583 	setvbuf(f, td->iolog_buf, _IOFBF, 8192);
586 	 * write our version line
588 	if (fprintf(f, "%s\n", iolog_ver2) < 0) {
589 		perror("iolog init\n");
594 	 * add all known files
596 	for_each_file(td, ff, i)
597 		log_file(td, ff, FIO_LOG_ADD_FILE);
/*
 * Top-level iolog setup: for a read iolog, dispatch to the blktrace
 * loader or the native parser; for a write iolog, open the output.
 * A false result from the chosen initializer reports EINVAL.
 */
602 bool init_iolog(struct thread_data *td)
606 	if (td->o.read_iolog_file) {
610 		 * Check if it's a blktrace file and load that if possible.
611 		 * Otherwise assume it's a normal log file and load that.
613 		if (is_blktrace(td->o.read_iolog_file, &need_swap))
614 			ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
616 			ret = init_iolog_read(td);
617 	} else if (td->o.write_iolog_file)
618 		ret = init_iolog_write(td);
623 		td_verror(td, EINVAL, "failed initializing iolog");
/*
 * Allocate and initialize an io_log from the supplied parameters:
 * copies log type/offset/gz settings and averaging intervals, seeds the
 * per-direction histogram lists with zeroed io_u_plat entries, and
 * pre-allocates a pending-sample buffer when not in offload submit mode.
 * Compression (log_gz/log_gz_store) sets up shared mutexes and flags the
 * owning thread with TD_F_COMPRESS_LOG.
 */
628 void setup_log(struct io_log **log, struct log_params *p,
629 	       const char *filename)
633 	struct io_u_plat_entry *entry;
634 	struct flist_head *list;
	/* shared-memory alloc: the log may be consumed by another process */
636 	l = scalloc(1, sizeof(*l));
637 	INIT_FLIST_HEAD(&l->io_logs);
638 	l->log_type = p->log_type;
639 	l->log_offset = p->log_offset;
640 	l->log_gz = p->log_gz;
641 	l->log_gz_store = p->log_gz_store;
642 	l->avg_msec = p->avg_msec;
643 	l->hist_msec = p->hist_msec;
644 	l->hist_coarseness = p->hist_coarseness;
645 	l->filename = strdup(filename);
648 	/* Initialize histogram lists for each r/w direction,
649 	 * with initial io_u_plat of all zeros:
651 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
652 		list = &l->hist_window[i].list;
653 		INIT_FLIST_HEAD(list);
654 		entry = calloc(1, sizeof(struct io_u_plat_entry));
655 		flist_add(&entry->list, list);
658 	if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
661 		__p = calloc(1, sizeof(*l->pending));
662 		__p->max_samples = DEF_LOG_ENTRIES;
663 		__p->log = calloc(__p->max_samples, log_entry_sz(l));
	/* offset samples are tagged via a bit in the ddir field */
668 		l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
670 	INIT_FLIST_HEAD(&l->chunk_list);
672 	if (l->log_gz && !p->td)
674 	else if (l->log_gz || l->log_gz_store) {
675 		mutex_init_pshared(&l->chunk_lock);
676 		mutex_init_pshared(&l->deferred_free_lock);
677 		p->td->flags |= TD_F_COMPRESS_LOG;
683 #ifdef CONFIG_SETVBUF
/* Give the log FILE a 1 MiB stdio buffer; caller frees via clear_file_buffer(). */
684 static void *set_file_buffer(FILE *f)
686 	size_t size = 1048576;
690 	setvbuf(f, buf, _IOFBF, size);
694 static void clear_file_buffer(void *buf)
/* No-setvbuf fallbacks: no custom buffering, nothing to allocate or free. */
699 static void *set_file_buffer(FILE *f)
704 static void clear_file_buffer(void *buf)
/*
 * Release an io_log: drain and free every queued io_logs chunk, then
 * the pending buffer.
 */
709 void free_log(struct io_log *log)
711 	while (!flist_empty(&log->io_logs)) {
712 		struct io_logs *cur_log;
714 		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
715 		flist_del_init(&cur_log->list);
721 	free(log->pending->log);
/*
 * Sum 'stride' histogram buckets starting at index j. When a previous
 * snapshot is supplied, return the delta against it instead of the
 * absolute counts.
 */
731 uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
732 		  uint64_t *io_u_plat_last)
737 	if (io_u_plat_last) {
738 		for (k = sum = 0; k < stride; k++)
739 			sum += io_u_plat[j + k] - io_u_plat_last[j + k];
741 		for (k = sum = 0; k < stride; k++)
742 			sum += io_u_plat[j + k];
/*
 * Write histogram samples as CSV rows: "time, ddir, bs," followed by
 * FIO_IO_U_PLAT_NR/stride coarsened bucket sums, where each sum is the
 * delta against the previous sample's plat entry. The consumed previous
 * entry is unlinked (and, per the visible flist_del, released) as we go.
 */
748 static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
749 			       uint64_t sample_size)
753 	uint64_t i, j, nr_samples;
754 	struct io_u_plat_entry *entry, *entry_before;
756 	uint64_t *io_u_plat_before;
758 	int stride = 1 << hist_coarseness;
	/* probe the first sample to learn whether offsets are logged */
763 	s = __get_sample(samples, 0, 0);
764 	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
766 	nr_samples = sample_size / __log_entry_sz(log_offset);
768 	for (i = 0; i < nr_samples; i++) {
769 		s = __get_sample(samples, log_offset, i);
771 		entry = s->data.plat_entry;
772 		io_u_plat = entry->io_u_plat;
774 		entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
775 		io_u_plat_before = entry_before->io_u_plat;
777 		fprintf(f, "%lu, %u, %u, ", (unsigned long) s->time,
778 						io_sample_ddir(s), s->bs);
779 		for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
780 			fprintf(f, "%llu, ", (unsigned long long)
781 				hist_sum(j, stride, io_u_plat, io_u_plat_before));
		/* last group is written without a trailing comma */
783 		fprintf(f, "%llu\n", (unsigned long long)
784 			hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
787 		flist_del(&entry_before->list);
/*
 * Write plain samples as CSV: "time, value, ddir, bs" with a trailing
 * offset column when the samples were logged with offsets (detected
 * from the LOG_OFFSET_SAMPLE_BIT tag on the first sample).
 */
792 void flush_samples(FILE *f, void *samples, uint64_t sample_size)
796 	uint64_t i, nr_samples;
801 	s = __get_sample(samples, 0, 0);
802 	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
804 	nr_samples = sample_size / __log_entry_sz(log_offset);
806 	for (i = 0; i < nr_samples; i++) {
807 		s = __get_sample(samples, log_offset, i);
810 			fprintf(f, "%lu, %" PRId64 ", %u, %u\n",
811 					(unsigned long) s->time,
813 					io_sample_ddir(s), s->bs);
815 			struct io_sample_offset *so = (void *) s;
817 			fprintf(f, "%lu, %" PRId64 ", %u, %u, %llu\n",
818 					(unsigned long) s->time,
820 					io_sample_ddir(s), s->bs,
821 					(unsigned long long) so->offset);
/* Work item handed to the compression workqueue: samples to deflate. */
828 struct iolog_flush_data {
829 	struct workqueue_work work;
/* Output chunk granularity for deflate (128 KiB). */
836 #define GZ_CHUNK	131072
/* Allocate a fresh GZ_CHUNK-sized compression chunk tagged with 'seq'. */
838 static struct iolog_compress *get_new_chunk(unsigned int seq)
840 	struct iolog_compress *c;
842 	c = malloc(sizeof(*c));
843 	INIT_FLIST_HEAD(&c->list);
844 	c->buf = malloc(GZ_CHUNK);
/* Release a compression chunk (buffer and container). */
850 static void free_chunk(struct iolog_compress *ic)
/*
 * Prepare a zlib inflate stream. 'gz_hdr' selects window bits that let
 * zlib auto-detect a gzip header (the +32 trick), for logs stored in
 * gzip-friendly format.
 */
856 static int z_stream_init(z_stream *stream, int gz_hdr)
860 	memset(stream, 0, sizeof(*stream));
861 	stream->zalloc = Z_NULL;
862 	stream->zfree = Z_NULL;
863 	stream->opaque = Z_NULL;
864 	stream->next_in = Z_NULL;
867 	 * zlib magic - add 32 for auto-detection of gz header or not,
868 	 * if we decide to store files in a gzip friendly format.
873 	if (inflateInit2(stream, wbits) != Z_OK)
/* Iteration state carried across inflate_chunk() calls (buffer, seq, sizes). */
879 struct inflate_chunk_iter {
/*
 * End the current inflate sequence: tear down the zlib stream, flush the
 * decompressed samples to 'f', and reset the iterator's buffer state.
 */
888 static void finish_chunk(z_stream *stream, FILE *f,
889 			 struct inflate_chunk_iter *iter)
893 	ret = inflateEnd(stream);
895 		log_err("fio: failed to end log inflation seq %d (%d)\n",
898 	flush_samples(f, iter->buf, iter->buf_used);
901 	iter->buf_size = iter->buf_used = 0;
905  * Iterative chunk inflation. Handles cases where we cross into a new
906  * sequence, doing flush finish of previous chunk if needed.
908 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
909 		            z_stream *stream, struct inflate_chunk_iter *iter)
913 	dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
914 				(unsigned long) ic->len, ic->seq);
	/* new sequence: flush what we had and re-init the zlib stream */
916 	if (ic->seq != iter->seq) {
918 			finish_chunk(stream, f, iter);
920 		z_stream_init(stream, gz_hdr);
924 	stream->avail_in = ic->len;
925 	stream->next_in = ic->buf;
927 	if (!iter->buf_size) {
928 		iter->buf_size = iter->chunk_sz;
929 		iter->buf = malloc(iter->buf_size);
	/* inflate until input is consumed, growing the output buffer on demand */
932 	while (stream->avail_in) {
933 		size_t this_out = iter->buf_size - iter->buf_used;
936 		stream->avail_out = this_out;
937 		stream->next_out = iter->buf + iter->buf_used;
939 		err = inflate(stream, Z_NO_FLUSH);
941 			log_err("fio: failed inflating log: %d\n", err);
946 		iter->buf_used += this_out - stream->avail_out;
948 		if (!stream->avail_out) {
949 			iter->buf_size += iter->chunk_sz;
950 			iter->buf = realloc(iter->buf, iter->buf_size);
954 		if (err == Z_STREAM_END)
	/* number of input bytes consumed from this chunk */
958 	ret = (void *) stream->next_in - ic->buf;
960 	dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
966  * Inflate stored compressed chunks, or write them directly to the log
967  * file if so instructed.
969 static int inflate_gz_chunks(struct io_log *log, FILE *f)
971 	struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
974 	while (!flist_empty(&log->chunk_list)) {
975 		struct iolog_compress *ic;
977 		ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
978 		flist_del(&ic->list);
		/* log_gz_store: keep data compressed, write chunk bytes verbatim */
980 		if (log->log_gz_store) {
983 			dprint(FD_COMPRESS, "log write chunk size=%lu, "
984 				"seq=%u\n", (unsigned long) ic->len, ic->seq);
986 			ret = fwrite(ic->buf, ic->len, 1, f);
987 			if (ret != 1 || ferror(f)) {
989 				log_err("fio: error writing compressed log\n");
992 			inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
998 		finish_chunk(&stream, f, &iter);
1006  * Open compressed log file and decompress the stored chunks and
1007  * write them to stdout. The chunks are stored sequentially in the
1008  * file, so we iterate over them and do them one-by-one.
1010 int iolog_file_inflate(const char *file)
1012 	struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
1013 	struct iolog_compress ic;
1021 	f = fopen(file, "r");
1027 	if (stat(file, &sb) < 0) {
	/* slurp the whole file; the size comes from the stat above */
1033 	ic.buf = buf = malloc(sb.st_size);
1034 	ic.len = sb.st_size;
1037 	ret = fread(ic.buf, ic.len, 1, f);
1038 	if (ret == 0 && ferror(f)) {
1043 	} else if (ferror(f) || (!feof(f) && ret != 1)) {
1044 		log_err("fio: short read on reading log\n");
1053 	 * Each chunk will return Z_STREAM_END. We don't know how many
1054 	 * chunks are in the file, so we just keep looping and incrementing
1055 	 * the sequence number until we have consumed the whole compressed
1062 		iret = inflate_chunk(&ic,  1, stdout, &stream, &iter);
1075 		finish_chunk(&stream, stdout, &iter);
/* Stubs for builds without zlib: flushing is a no-op, inflation errors out. */
1085 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1090 int iolog_file_inflate(const char *file)
1092 	log_err("fio: log inflation not possible without zlib\n");
/*
 * Write an io_log to its file (truncate or append per 'do_append'):
 * first any compressed chunks, then each queued io_logs buffer —
 * histogram logs go through flush_hist_samples(), everything else
 * through flush_samples().
 */
1098 void flush_log(struct io_log *log, bool do_append)
1104 		f = fopen(log->filename, "w");
1106 		f = fopen(log->filename, "a");
1108 		perror("fopen log");
1112 	buf = set_file_buffer(f);
1114 	inflate_gz_chunks(log, f);
1116 	while (!flist_empty(&log->io_logs)) {
1117 		struct io_logs *cur_log;
1119 		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1120 		flist_del_init(&cur_log->list);
1122 		if (log->td && log == log->td->clat_hist_log)
1123 			flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1124 					   log_sample_sz(log, cur_log));
1126 			flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1132 	clear_file_buffer(buf);
/*
 * Finalize one log under the per-file lock: compressed logs are flushed
 * first; GUI clients / backend mode ship the log over the wire, others
 * write it locally. 'trylock' makes lock acquisition non-blocking.
 */
1135 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1137 	if (td->flags & TD_F_COMPRESS_LOG)
1141 		if (fio_trylock_file(log->filename))
1144 		fio_lock_file(log->filename);
1146 	if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1147 		fio_send_iolog(td, log, log->filename);
1149 		flush_log(log, !td->o.per_job_logs);
1151 	fio_unlock_file(log->filename);
/*
 * Total bytes currently held in the log's compressed chunk list,
 * taken under chunk_lock.
 */
1156 size_t log_chunk_sizes(struct io_log *log)
1158 	struct flist_head *entry;
1161 	if (flist_empty(&log->chunk_list))
1165 	pthread_mutex_lock(&log->chunk_lock);
1166 	flist_for_each(entry, &log->chunk_list) {
1167 		struct iolog_compress *c;
1169 		c = flist_entry(entry, struct iolog_compress, list);
1172 	pthread_mutex_unlock(&log->chunk_lock);
/*
 * Queue 'ptr' for deferred freeing (the sample buffer may still be in
 * use by the log owner). Drops the entry with a one-time warning when
 * the deferred table is full.
 */
1178 static void iolog_put_deferred(struct io_log *log, void *ptr)
1183 	pthread_mutex_lock(&log->deferred_free_lock);
1184 	if (log->deferred < IOLOG_MAX_DEFER) {
1185 		log->deferred_items[log->deferred] = ptr;
1187 	} else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
1188 		log_err("fio: had to drop log entry free\n");
1189 	pthread_mutex_unlock(&log->deferred_free_lock);
/* Free everything queued by iolog_put_deferred(), under the same lock. */
1192 static void iolog_free_deferred(struct io_log *log)
1199 	pthread_mutex_lock(&log->deferred_free_lock);
1201 	for (i = 0; i < log->deferred; i++) {
1202 		free(log->deferred_items[i]);
1203 		log->deferred_items[i] = NULL;
1207 	pthread_mutex_unlock(&log->deferred_free_lock);
/*
 * Compress one batch of samples with zlib deflate into a local list of
 * GZ_CHUNK-sized chunks, then splice that list onto the log's chunk
 * list under chunk_lock. The consumed sample buffer is handed to
 * iolog_put_deferred() rather than freed directly. The trailing loop
 * after the error label frees any partially built chunk list.
 * NOTE(review): the first dprint references c->len before the visible
 * get_new_chunk() assignment — ordering of the missing lines in this
 * listing should be confirmed against the full source.
 */
1210 static int gz_work(struct iolog_flush_data *data)
1212 	struct iolog_compress *c = NULL;
1213 	struct flist_head list;
1219 	INIT_FLIST_HEAD(&list);
1221 	memset(&stream, 0, sizeof(stream));
1222 	stream.zalloc = Z_NULL;
1223 	stream.zfree = Z_NULL;
1224 	stream.opaque = Z_NULL;
1226 	ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1228 		log_err("fio: failed to init gz stream\n");
1232 	seq = ++data->log->chunk_seq;
1234 	stream.next_in = (void *) data->samples;
1235 	stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1237 	dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
1238 		(unsigned long) stream.avail_in, seq,
1239 			data->log->filename);
1242 			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1243 				(unsigned long) c->len);
1244 		c = get_new_chunk(seq);
1245 		stream.avail_out = GZ_CHUNK;
1246 		stream.next_out = c->buf;
1247 		ret = deflate(&stream, Z_NO_FLUSH);
1249 			log_err("fio: deflate log (%d)\n", ret);
1254 		c->len = GZ_CHUNK - stream.avail_out;
1255 		flist_add_tail(&c->list, &list);
1257 	} while (stream.avail_in);
	/* finish the stream into the tail chunk's remaining space */
1259 	stream.next_out = c->buf + c->len;
1260 	stream.avail_out = GZ_CHUNK - c->len;
1262 	ret = deflate(&stream, Z_FINISH);
1265 		 * Z_BUF_ERROR is special, it just means we need more
1266 		 * output space. We'll handle that below. Treat any other
1269 		if (ret != Z_BUF_ERROR) {
1270 			log_err("fio: deflate log (%d)\n", ret);
1271 			flist_del(&c->list);
1278 	c->len = GZ_CHUNK - stream.avail_out;
1280 	dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
	/* keep finishing into fresh chunks until deflate reports stream end */
1282 	if (ret != Z_STREAM_END) {
1284 			c = get_new_chunk(seq);
1285 			stream.avail_out = GZ_CHUNK;
1286 			stream.next_out = c->buf;
1287 			ret = deflate(&stream, Z_FINISH);
1288 			c->len = GZ_CHUNK - stream.avail_out;
1290 			flist_add_tail(&c->list, &list);
1291 			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1292 				(unsigned long) c->len);
1293 		} while (ret != Z_STREAM_END);
1296 	dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1298 	ret = deflateEnd(&stream);
1300 		log_err("fio: deflateEnd %d\n", ret);
1302 	iolog_put_deferred(data->log, data->samples);
1304 	if (!flist_empty(&list)) {
1305 		pthread_mutex_lock(&data->log->chunk_lock);
1306 		flist_splice_tail(&list, &data->log->chunk_list);
1307 		pthread_mutex_unlock(&data->log->chunk_lock);
	/* error path: drop any chunks we managed to build */
1316 	while (!flist_empty(&list)) {
1317 		c = flist_first_entry(list.next, struct iolog_compress, list);
1318 		flist_del(&c->list);
1326  * Invoked from our compress helper thread, when logging would have exceeded
1327  * the specified memory limitation. Compresses the previously stored
/* Workqueue adapter: recover the flush data from the work item and run gz_work(). */
1330 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1332 	return gz_work(container_of(work, struct iolog_flush_data, work));
/*
 * Per-worker init for the compression workqueue: pin the worker to the
 * configured log_gz_cpumask, if one was set.
 */
1335 static int gz_init_worker(struct submit_worker *sw)
1337 	struct thread_data *td = sw->wq->td;
1339 	if (!fio_option_is_set(&td->o, log_gz_cpumask))
1342 	if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1343 		log_err("gz: failed to set CPU affinity\n");
/* Workqueue callbacks for the log compression thread. */
1350 static struct workqueue_ops log_compress_wq_ops = {
1351 	.fn		= gz_work_async,
1352 	.init_worker_fn = gz_init_worker,
/* Start a single-worker compression workqueue if this td compresses logs. */
1356 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1358 	if (!(td->flags & TD_F_COMPRESS_LOG))
1361 	workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
/* Tear down the compression workqueue, if it was started. */
1365 void iolog_compress_exit(struct thread_data *td)
1367 	if (!(td->flags & TD_F_COMPRESS_LOG))
1370 	workqueue_exit(&td->log_compress_wq);
1374  * Queue work item to compress the existing log entries. We reset the
1375  * current log to a small size, and reference the existing log in the
1376  * data that we queue for compression. Once compression has been done,
1377  * this old log is freed. If called with finish == true, will not return
1378  * until the log compression has completed, and will flush all previous
1381 static int iolog_flush(struct io_log *log)
1383 	struct iolog_flush_data *data;
1385 	data = malloc(sizeof(*data));
	/* synchronously compress every queued io_logs buffer */
1392 	while (!flist_empty(&log->io_logs)) {
1393 		struct io_logs *cur_log;
1395 		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1396 		flist_del_init(&cur_log->list);
1398 		data->samples = cur_log->log;
1399 		data->nr_samples = cur_log->nr_samples;
/*
 * Hand the current io_logs buffer to the async compression workqueue.
 * Ownership of cur_log->log transfers to the work item; the cur_log is
 * reset so logging can continue into a fresh buffer.
 */
1410 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1412 	struct iolog_flush_data *data;
	/* smalloc: work item must be visible to the workqueue's process/thread */
1414 	data = smalloc(sizeof(*data));
1420 	data->samples = cur_log->log;
1421 	data->nr_samples = cur_log->nr_samples;
1424 	cur_log->nr_samples = cur_log->max_samples = 0;
1425 	cur_log->log = NULL;
1427 	workqueue_enqueue(&log->td->log_compress_wq, &data->work);
1429 	iolog_free_deferred(log);
/* No-compression build stubs: flush/init/exit degrade to no-ops or errors. */
1435 static int iolog_flush(struct io_log *log)
1440 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1445 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1450 void iolog_compress_exit(struct thread_data *td)
/* Return the newest io_logs buffer, or NULL-ish when the list is empty. */
1456 struct io_logs *iolog_cur_log(struct io_log *log)
1458 	if (flist_empty(&log->io_logs))
1461 	return flist_last_entry(&log->io_logs, struct io_logs, list);
/* Total sample count across every queued io_logs buffer. */
1464 uint64_t iolog_nr_samples(struct io_log *iolog)
1466 	struct flist_head *entry;
1469 	flist_for_each(entry, &iolog->io_logs) {
1470 		struct io_logs *cur_log;
1472 		cur_log = flist_entry(entry, struct io_logs, list);
1473 		ret += cur_log->nr_samples;
/* Thin wrapper: write out 'log' (when present) via finish_log(). */
1479 static int __write_log(struct thread_data *td, struct io_log *log, int try)
1482 		return finish_log(td, log, try);
/* Write the IOPS log when its per-unit setting matches; clear on success. */
1487 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1491 	if (per_unit_log(td->iops_log) != unit_log)
1494 	ret = __write_log(td, td->iops_log, try);
1496 		td->iops_log = NULL;
/* Write the submission-latency log; clear the pointer on success. */
1501 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1508 	ret = __write_log(td, td->slat_log, try);
1510 		td->slat_log = NULL;
/* Write the completion-latency log; clear the pointer on success. */
1515 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1522 	ret = __write_log(td, td->clat_log, try);
1524 		td->clat_log = NULL;
/* Write the completion-latency histogram log; clear the pointer on success. */
1529 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1536 	ret = __write_log(td, td->clat_hist_log, try);
1538 		td->clat_hist_log = NULL;
/* Write the total-latency log. */
1543 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1550 	ret = __write_log(td, td->lat_log, try);
/* Write the bandwidth log when its per-unit setting matches. */
1557 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1561 	if (per_unit_log(td->bw_log) != unit_log)
1564 	ret = __write_log(td, td->bw_log, try);
/* Bitmask identifying each log kind, and the table pairing mask -> writer. */
1577 	CLAT_HIST_LOG_MASK = 32,
1584 	int (*fn)(struct thread_data *, int, bool);
1587 static struct log_type log_types[] = {
1589 		.mask	= BW_LOG_MASK,
1590 		.fn	= write_bandw_log,
1593 		.mask	= LAT_LOG_MASK,
1594 		.fn	= write_lat_log,
1597 		.mask	= SLAT_LOG_MASK,
1598 		.fn	= write_slat_log,
1601 		.mask	= CLAT_LOG_MASK,
1602 		.fn	= write_clat_log,
1605 		.mask	= IOPS_LOG_MASK,
1606 		.fn	= write_iops_log,
1609 		.mask	= CLAT_HIST_LOG_MASK,
1610 		.fn	= write_clat_hist_log,
/*
 * Flush all per-thread logs. Each writer is retried across passes
 * (non-blocking "try" until only one log remains); a pass that makes no
 * progress ends the loop. Runstate is bumped to TD_FINISHING around the
 * writeout and restored after.
 */
1614 void td_writeout_logs(struct thread_data *td, bool unit_logs)
1616 	unsigned int log_mask = 0;
1617 	unsigned int log_left = ALL_LOG_NR;
1620 	old_state = td_bump_runstate(td, TD_FINISHING);
1622 	finalize_logs(td, unit_logs);
1625 		int prev_log_left = log_left;
1627 		for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1628 			struct log_type *lt = &log_types[i];
1631 			if (!(log_mask & lt->mask)) {
				/* last remaining log (log_left == 1) may block on the lock */
1632 				ret = lt->fn(td, log_left != 1, unit_logs);
1635 					log_mask |= lt->mask;
1640 		if (prev_log_left == log_left)
1644 	td_restore_runstate(td, old_state);
/* Write out the logs of every thread via td_writeout_logs(). */
1647 void fio_writeout_logs(bool unit_logs)
1649 	struct thread_data *td;
1653 		td_writeout_logs(td, unit_logs);