f6023ee2ce6241398138d39134eaafe26e140998
[fio.git] / iolog.c
1 /*
2  * Code related to writing an iolog of what a thread is doing, and to
3  * later read that back and replay
4  */
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <assert.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <unistd.h>
11 #ifdef CONFIG_ZLIB
12 #include <zlib.h>
13 #endif
14
15 #include "flist.h"
16 #include "fio.h"
17 #include "trim.h"
18 #include "filelock.h"
19 #include "smalloc.h"
20 #include "blktrace.h"
21 #include "pshared.h"
22 #include "lib/roundup.h"
23
24 #include <netinet/in.h>
25 #include <netinet/tcp.h>
26 #include <arpa/inet.h>
27 #include <sys/stat.h>
28 #include <sys/socket.h>
29 #include <sys/un.h>
30
31 static int iolog_flush(struct io_log *log);
32
33 static const char iolog_ver2[] = "fio version 2 iolog";
34 static const char iolog_ver3[] = "fio version 3 iolog";
35
36 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
37 {
38         flist_add_tail(&ipo->list, &td->io_log_list);
39         td->total_io_size += ipo->len;
40 }
41
42 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
43 {
44         if (!td->o.write_iolog_file)
45                 return;
46
47         fprintf(td->iolog_f, "%s %s %llu %llu\n", io_u->file->file_name,
48                                                 io_ddir_name(io_u->ddir),
49                                                 io_u->offset, io_u->buflen);
50 }
51
52 void log_file(struct thread_data *td, struct fio_file *f,
53               enum file_log_act what)
54 {
55         const char *act[] = { "add", "open", "close" };
56
57         assert(what < 3);
58
59         if (!td->o.write_iolog_file)
60                 return;
61
62
63         /*
64          * this happens on the pre-open/close done before the job starts
65          */
66         if (!td->iolog_f)
67                 return;
68
69         fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
70 }
71
/*
 * Sleep to reproduce a recorded inter-I/O delay during replay.
 * td->time_offset carries over oversleep from the previous delay, so the
 * replay clock does not drift: if we overslept last time, this delay is
 * shortened (or skipped entirely).
 */
static void iolog_delay(struct thread_data *td, unsigned long delay)
{
	/* time already elapsed since the last issue counts against the delay */
	uint64_t usec = utime_since_now(&td->last_issue);
	unsigned long orig_delay = delay;
	uint64_t this_delay;
	struct timespec ts;

	/* previous oversleep already covers this whole delay */
	if (delay < td->time_offset) {
		td->time_offset = 0;
		return;
	}

	delay -= td->time_offset;
	if (delay < usec)
		return;

	delay -= usec;

	fio_gettime(&ts, NULL);
	/* sleep in slices of at most 500ms so termination stays responsive */
	while (delay && !td->terminate) {
		this_delay = delay;
		if (this_delay > 500000)
			this_delay = 500000;

		usec_sleep(td, this_delay);
		delay -= this_delay;
	}

	/* note how much we overslept, to compensate on the next delay */
	usec = utime_since_now(&ts);
	if (usec > orig_delay)
		td->time_offset = usec - orig_delay;
	else
		td->time_offset = 0;
}
106
/*
 * Handle a "special" (non-I/O) io_piece: a file open/close/unlink/add
 * action recorded in the log, marked with ddir == DDIR_INVAL.
 *
 * Returns 0 when the entry is regular I/O (nothing done here), 1 when a
 * file action was processed, and -1 when opening the file failed.
 */
static int ipo_special(struct thread_data *td, struct io_piece *ipo)
{
	struct fio_file *f;
	int ret;

	/*
	 * Not a special ipo
	 */
	if (ipo->ddir != DDIR_INVAL)
		return 0;

	f = td->files[ipo->fileno];

	/* honor any recorded delay before performing the action */
	if (ipo->delay)
		iolog_delay(td, ipo->delay);
	if (fio_fill_issue_time(td))
		fio_gettime(&td->last_issue, NULL);
	switch (ipo->file_action) {
	case FIO_LOG_OPEN_FILE:
		/* with replay_redirect all entries map to one file; skip re-opens */
		if (td->o.replay_redirect && fio_file_open(f)) {
			dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
					f->file_name);
			break;
		}
		ret = td_io_open_file(td, f);
		if (!ret)
			break;
		td_verror(td, ret, "iolog open file");
		return -1;
	case FIO_LOG_CLOSE_FILE:
		td_io_close_file(td, f);
		break;
	case FIO_LOG_UNLINK_FILE:
		td_io_unlink_file(td, f);
		break;
	case FIO_LOG_ADD_FILE:
		/*
		 * Nothing to do
		 */
		break;
	default:
		log_err("fio: bad file action %d\n", ipo->file_action);
		break;
	}

	return 1;
}
154
155 static bool read_iolog(struct thread_data *td);
156
157 unsigned long long delay_since_ttime(const struct thread_data *td,
158                unsigned long long time)
159 {
160         double tmp;
161         double scale;
162         const unsigned long long *last_ttime = &td->io_log_last_ttime;
163
164         if (!*last_ttime || td->o.no_stall || time < *last_ttime)
165                 return 0;
166         else if (td->o.replay_time_scale == 100)
167                 return time - *last_ttime;
168
169
170         scale = (double) 100.0 / (double) td->o.replay_time_scale;
171         tmp = time - *last_ttime;
172         return tmp * scale;
173 }
174
/*
 * Pop the next queued io_piece and populate 'io_u' from it. Returns 0
 * when an io_u was set up for submission, 1 when the log is exhausted
 * (and td->done is set) or refilling the chunked log failed.
 */
int read_iolog_get(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo;
	unsigned long elapsed;

	while (!flist_empty(&td->io_log_list)) {
		int ret;

		/*
		 * Chunked replay: when consumption reaches the low-water
		 * checkmark, pull in the next batch from the blktrace or
		 * iolog source.
		 */
		if (td->o.read_iolog_chunked) {
			if (td->io_log_checkmark == td->io_log_current) {
				if (td->io_log_blktrace) {
					if (!read_blktrace(td))
						return 1;
				} else {
					if (!read_iolog(td))
						return 1;
				}
			}
			td->io_log_current--;
		}
		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);

		/* file open/close/unlink entries are handled here, not issued */
		ret = ipo_special(td, ipo);
		if (ret < 0) {
			free(ipo);
			break;
		} else if (ret > 0) {
			/* special entry consumed; move on to the next one */
			free(ipo);
			continue;
		}

		io_u->ddir = ipo->ddir;
		if (ipo->ddir != DDIR_WAIT) {
			io_u->offset = ipo->offset;
			io_u->verify_offset = ipo->offset;
			io_u->buflen = ipo->len;
			io_u->file = td->files[ipo->fileno];
			get_file(io_u->file);
			dprint(FD_IO, "iolog: get %llu/%llu/%s\n", io_u->offset,
						io_u->buflen, io_u->file->file_name);
			if (ipo->delay)
				iolog_delay(td, ipo->delay);
		} else {
			/* "wait" entry: delay is absolute msecs since genesis */
			elapsed = mtime_since_genesis();
			if (ipo->delay > elapsed)
				usec_sleep(td, (ipo->delay - elapsed) * 1000);
		}

		free(ipo);

		if (io_u->ddir != DDIR_WAIT)
			return 0;
	}

	td->done = 1;
	return 1;
}
234
235 void prune_io_piece_log(struct thread_data *td)
236 {
237         struct io_piece *ipo;
238         struct fio_rb_node *n;
239
240         while ((n = rb_first(&td->io_hist_tree)) != NULL) {
241                 ipo = rb_entry(n, struct io_piece, rb_node);
242                 rb_erase(n, &td->io_hist_tree);
243                 remove_trim_entry(td, ipo);
244                 td->io_hist_len--;
245                 free(ipo);
246         }
247
248         while (!flist_empty(&td->io_hist_list)) {
249                 ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
250                 flist_del(&ipo->list);
251                 remove_trim_entry(td, ipo);
252                 td->io_hist_len--;
253                 free(ipo);
254         }
255 }
256
/*
 * log a successful write, so we can unwind the log for verify.
 *
 * If the file has a random map, entries go on a plain list (the random
 * map already prevents duplicate blocks). Otherwise entries are sorted
 * into an rb-tree keyed on (file, offset), and any overlapping older
 * entry is dropped so verify only sees the most recent data.
 */
void log_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct fio_rb_node **p, *parent;
	struct io_piece *ipo, *__ipo;

	ipo = calloc(1, sizeof(struct io_piece));
	init_ipo(ipo);
	ipo->file = io_u->file;
	ipo->offset = io_u->offset;
	ipo->len = io_u->buflen;
	ipo->numberio = io_u->numberio;
	ipo->flags = IP_F_IN_FLIGHT;

	io_u->ipo = ipo;

	if (io_u_should_trim(td, io_u)) {
		flist_add_tail(&ipo->trim_list, &td->trim_list);
		td->trim_entries++;
	}

	/*
	 * Only sort writes if we don't have a random map in which case we need
	 * to check for duplicate blocks and drop the old one, which we rely on
	 * the rb insert/lookup for handling.
	 */
	if (file_randommap(td, ipo->file)) {
		INIT_FLIST_HEAD(&ipo->list);
		flist_add_tail(&ipo->list, &td->io_hist_list);
		ipo->flags |= IP_F_ONLIST;
		td->io_hist_len++;
		return;
	}

	RB_CLEAR_NODE(&ipo->rb_node);

	/*
	 * Sort the entry into the verification list
	 */
restart:
	p = &td->io_hist_tree.rb_node;
	parent = NULL;
	while (*p) {
		int overlap = 0;
		parent = *p;

		__ipo = rb_entry(parent, struct io_piece, rb_node);
		/* order by file pointer first, then by offset within the file */
		if (ipo->file < __ipo->file)
			p = &(*p)->rb_left;
		else if (ipo->file > __ipo->file)
			p = &(*p)->rb_right;
		else if (ipo->offset < __ipo->offset) {
			p = &(*p)->rb_left;
			overlap = ipo->offset + ipo->len > __ipo->offset;
		}
		else if (ipo->offset > __ipo->offset) {
			p = &(*p)->rb_right;
			overlap = __ipo->offset + __ipo->len > ipo->offset;
		}
		else
			overlap = 1;

		if (overlap) {
			/* drop the older overlapping entry and redo the descent */
			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
				__ipo->offset, __ipo->len,
				ipo->offset, ipo->len);
			td->io_hist_len--;
			rb_erase(parent, &td->io_hist_tree);
			remove_trim_entry(td, __ipo);
			/* an in-flight entry is still owned by its io_u; don't free */
			if (!(__ipo->flags & IP_F_IN_FLIGHT))
				free(__ipo);
			goto restart;
		}
	}

	rb_link_node(&ipo->rb_node, parent, p);
	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
	ipo->flags |= IP_F_ONRB;
	td->io_hist_len++;
}
339
/*
 * Undo log_io_piece() for an io_u whose write/trim did not complete
 * successfully: record the failure in the block-info state (if tracked)
 * and remove the io_piece from the verify history (rb-tree or list,
 * whichever it is on).
 */
void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (td->ts.nr_block_infos) {
		uint32_t *info = io_u_block_info(td, io_u);
		/* only escalate state; never downgrade past a failure state */
		if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
			if (io_u->ddir == DDIR_TRIM)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_TRIM_FAILURE);
			else if (io_u->ddir == DDIR_WRITE)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_WRITE_FAILURE);
		}
	}

	if (!ipo)
		return;

	if (ipo->flags & IP_F_ONRB)
		rb_erase(&ipo->rb_node, &td->io_hist_tree);
	else if (ipo->flags & IP_F_ONLIST)
		flist_del(&ipo->list);

	free(ipo);
	io_u->ipo = NULL;
	td->io_hist_len--;
}
368
369 void trim_io_piece(const struct io_u *io_u)
370 {
371         struct io_piece *ipo = io_u->ipo;
372
373         if (!ipo)
374                 return;
375
376         ipo->len = io_u->xfer_buflen - io_u->resid;
377 }
378
379 void write_iolog_close(struct thread_data *td)
380 {
381         if (!td->iolog_f)
382                 return;
383
384         fflush(td->iolog_f);
385         fclose(td->iolog_f);
386         free(td->iolog_buf);
387         td->iolog_f = NULL;
388         td->iolog_buf = NULL;
389 }
390
391 int64_t iolog_items_to_fetch(struct thread_data *td)
392 {
393         struct timespec now;
394         uint64_t elapsed;
395         uint64_t for_1s;
396         int64_t items_to_fetch;
397
398         if (!td->io_log_highmark)
399                 return 10;
400
401
402         fio_gettime(&now, NULL);
403         elapsed = ntime_since(&td->io_log_highmark_time, &now);
404         if (elapsed) {
405                 for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
406                 items_to_fetch = for_1s - td->io_log_current;
407                 if (items_to_fetch < 0)
408                         items_to_fetch = 0;
409         } else
410                 items_to_fetch = 0;
411
412         td->io_log_highmark = td->io_log_current + items_to_fetch;
413         td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
414         fio_gettime(&td->io_log_highmark_time, NULL);
415
416         return items_to_fetch;
417 }
418
/*
 * Expected sscanf() field counts for an I/O action line: version 3 logs
 * carry a leading timestamp (5 fields), version 2 logs do not (4).
 * file_act() is the same check for file action lines ("add"/"open"/...).
 *
 * Note: both macros evaluate their arguments more than once; pass only
 * side-effect-free expressions.
 */
#define io_act(_td, _r) (((_td)->io_log_version == 3 && (_r) == 5) || \
					((_td)->io_log_version == 2 && (_r) == 4))
#define file_act(_td, _r) (((_td)->io_log_version == 3 && (_r) == 3) || \
					((_td)->io_log_version == 2 && (_r) == 2))
423
/*
 * Read version 2 and 3 iolog data. It is enhanced to include per-file logging,
 * syncs, etc.
 *
 * Parses lines from td->io_log_rfile into io_piece entries and queues
 * them for replay. In chunked mode only a rate-limited batch is read per
 * call. Returns false when nothing (more) could be read, true otherwise.
 */
static bool read_iolog(struct thread_data *td)
{
	unsigned long long offset;
	unsigned int bytes;
	unsigned long long delay = 0;
	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
	char *rfname, *fname, *act;
	char *str, *p;
	enum fio_ddir rw;
	bool realloc = false;
	int64_t items_to_fetch = 0;
	int syncs;

	/* chunked mode: cap this call at the estimated one-second batch */
	if (td->o.read_iolog_chunked) {
		items_to_fetch = iolog_items_to_fetch(td);
		if (!items_to_fetch)
			return true;
	}

	/*
	 * Read in the read iolog and store it, reuse the infrastructure
	 * for doing verifications.
	 */
	str = malloc(4096);
	rfname = fname = malloc(256+16);
	act = malloc(256+16);

	syncs = reads = writes = waits = 0;
	while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
		struct io_piece *ipo;
		int r;
		unsigned long long ttime;

		if (td->io_log_version == 3) {
			/* v3 lines carry a leading timestamp */
			r = sscanf(p, "%llu %256s %256s %llu %u", &ttime, rfname, act,
							&offset, &bytes);
			delay = delay_since_ttime(td, ttime);
			td->io_log_last_ttime = ttime;
			/*
			 * "wait" is not allowed with version 3
			 */
			if (!strcmp(act, "wait")) {
				log_err("iolog: ignoring wait command with"
					" version 3 for file %s\n", fname);
				continue;
			}
		} else /* version 2 */
			r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset, &bytes);

		/* optionally redirect every entry to a single replacement file */
		if (td->o.replay_redirect)
			fname = td->o.replay_redirect;

		if (io_act(td, r)) {
			/*
			 * Check action first
			 */
			if (!strcmp(act, "wait"))
				rw = DDIR_WAIT;
			else if (!strcmp(act, "read"))
				rw = DDIR_READ;
			else if (!strcmp(act, "write"))
				rw = DDIR_WRITE;
			else if (!strcmp(act, "sync"))
				rw = DDIR_SYNC;
			else if (!strcmp(act, "datasync"))
				rw = DDIR_DATASYNC;
			else if (!strcmp(act, "trim"))
				rw = DDIR_TRIM;
			else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
			fileno = get_fileno(td, fname);
		} else if (file_act(td, r)) {
			/* file management entry: add/open/close */
			rw = DDIR_INVAL;
			if (!strcmp(act, "add")) {
				if (td->o.replay_redirect &&
				    get_fileno(td, fname) != -1) {
					dprint(FD_FILE, "iolog: ignoring"
						" re-add of file %s\n", fname);
				} else {
					fileno = add_file(td, fname, td->subjob_number, 1);
					file_action = FIO_LOG_ADD_FILE;
				}
			} else if (!strcmp(act, "open")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_OPEN_FILE;
			} else if (!strcmp(act, "close")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_CLOSE_FILE;
			} else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
		} else {
			log_err("bad iolog%d: %s\n", td->io_log_version, p);
			continue;
		}

		/* per-direction accounting; also filter disallowed operations */
		if (rw == DDIR_READ)
			reads++;
		else if (rw == DDIR_WRITE) {
			/*
			 * Don't add a write for ro mode
			 */
			if (read_only)
				continue;
			writes++;
		} else if (rw == DDIR_WAIT) {
			if (td->o.no_stall)
				continue;
			waits++;
		} else if (rw == DDIR_INVAL) {
		} else if (ddir_sync(rw)) {
			syncs++;
		} else {
			log_err("bad ddir: %d\n", rw);
			continue;
		}

		/*
		 * Make note of file
		 */
		ipo = calloc(1, sizeof(*ipo));
		init_ipo(ipo);
		ipo->ddir = rw;
		if (td->io_log_version == 3)
			ipo->delay = delay;
		if (rw == DDIR_WAIT) {
			/* for a wait entry the "offset" field holds the wait time */
			ipo->delay = offset;
		} else {
			if (td->o.replay_scale)
				ipo->offset = offset / td->o.replay_scale;
			else
				ipo->offset = offset;
			ipo_bytes_align(td->o.replay_align, ipo);

			ipo->len = bytes;
			/* grow max_bs when the log contains larger blocks */
			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw]) {
				realloc = true;
				td->o.max_bs[rw] = bytes;
			}
			ipo->fileno = fileno;
			ipo->file_action = file_action;
			td->o.size += bytes;
		}

		queue_io_piece(td, ipo);

		if (td->o.read_iolog_chunked) {
			td->io_log_current++;
			items_to_fetch--;
			if (items_to_fetch == 0)
				break;
		}
	}

	free(str);
	free(act);
	free(rfname);

	if (td->o.read_iolog_chunked) {
		td->io_log_highmark = td->io_log_current;
		td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
		fio_gettime(&td->io_log_highmark_time, NULL);
	}

	if (writes && read_only) {
		log_err("fio: <%s> skips replay of %d writes due to"
			" read-only\n", td->o.name, writes);
		writes = 0;
	}
	if (syncs)
		td->flags |= TD_F_SYNCS;

	if (td->o.read_iolog_chunked) {
		if (td->io_log_current == 0) {
			return false;
		}
		td->o.td_ddir = TD_DDIR_RW;
		/* buffers were sized for the old max_bs; reallocate them */
		if (realloc && td->orig_buffer)
		{
			io_u_quiesce(td);
			free_io_mem(td);
			init_io_u_buffers(td);
		}
		return true;
	}

	/* derive the job's data direction from what the log contained */
	if (!reads && !writes && !waits)
		return false;
	else if (reads && !writes)
		td->o.td_ddir = TD_DDIR_READ;
	else if (!reads && writes)
		td->o.td_ddir = TD_DDIR_WRITE;
	else
		td->o.td_ddir = TD_DDIR_RW;

	return true;
}
630
631 static bool is_socket(const char *path)
632 {
633         struct stat buf;
634         int r;
635
636         r = stat(path, &buf);
637         if (r == -1)
638                 return false;
639
640         return S_ISSOCK(buf.st_mode);
641 }
642
/*
 * Connect to a listening Unix domain stream socket at 'path'.
 * Returns a connected fd on success, -1 on failure.
 */
static int open_socket(const char *path)
{
	struct sockaddr_un addr;
	int ret, fd;

	fd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (fd < 0)
		return fd;

	memset(&addr, 0, sizeof(addr));
	addr.sun_family = AF_UNIX;
	if (snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path) >=
	    (int) sizeof(addr.sun_path)) {
		log_err("%s: path name %s is too long for a Unix socket\n",
			__func__, path);
		/* a truncated path would connect to the wrong socket */
		close(fd);
		return -1;
	}

	ret = connect(fd, (const struct sockaddr *)&addr, sizeof(addr));
	if (!ret)
		return fd;

	close(fd);
	return -1;
}
666
/*
 * open iolog, check version, and call appropriate parser
 */
static bool init_iolog_read(struct thread_data *td, char *fname)
{
	char buffer[256], *p;
	FILE *f = NULL;

	dprint(FD_IO, "iolog: name=%s\n", fname);

	/* the log may come from a Unix socket, stdin ("-"), or a plain file */
	if (is_socket(fname)) {
		int fd;

		fd = open_socket(fname);
		if (fd >= 0)
			f = fdopen(fd, "r");
	} else if (!strcmp(fname, "-")) {
		f = stdin;
	} else
		f = fopen(fname, "r");

	if (!f) {
		perror("fopen read iolog");
		return false;
	}

	p = fgets(buffer, sizeof(buffer), f);
	if (!p) {
		td_verror(td, errno, "iolog read");
		log_err("fio: unable to read iolog\n");
		fclose(f);
		return false;
	}

	/*
	 * versions 2 and 3 of the iolog store a specific string as the
	 * first line, check for that
	 */
	if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
		td->io_log_version = 2;
	else if (!strncmp(iolog_ver3, buffer, strlen(iolog_ver3)))
		td->io_log_version = 3;
	else {
		log_err("fio: iolog version 1 is no longer supported\n");
		fclose(f);
		return false;
	}

	free_release_files(td);
	td->io_log_rfile = f;
	/* parse (the first chunk of) the log body */
	return read_iolog(td);
}
719
720 /*
721  * Set up a log for storing io patterns.
722  */
723 static bool init_iolog_write(struct thread_data *td)
724 {
725         struct fio_file *ff;
726         FILE *f;
727         unsigned int i;
728
729         f = fopen(td->o.write_iolog_file, "a");
730         if (!f) {
731                 perror("fopen write iolog");
732                 return false;
733         }
734
735         /*
736          * That's it for writing, setup a log buffer and we're done.
737           */
738         td->iolog_f = f;
739         td->iolog_buf = malloc(8192);
740         setvbuf(f, td->iolog_buf, _IOFBF, 8192);
741
742         /*
743          * write our version line
744          */
745         if (fprintf(f, "%s\n", iolog_ver2) < 0) {
746                 perror("iolog init\n");
747                 return false;
748         }
749
750         /*
751          * add all known files
752          */
753         for_each_file(td, ff, i)
754                 log_file(td, ff, FIO_LOG_ADD_FILE);
755
756         return true;
757 }
758
759 bool init_iolog(struct thread_data *td)
760 {
761         bool ret;
762
763         if (td->o.read_iolog_file) {
764                 int need_swap;
765                 char * fname = get_name_by_idx(td->o.read_iolog_file, td->subjob_number);
766
767                 /*
768                  * Check if it's a blktrace file and load that if possible.
769                  * Otherwise assume it's a normal log file and load that.
770                  */
771                 if (is_blktrace(fname, &need_swap)) {
772                         td->io_log_blktrace = 1;
773                         ret = init_blktrace_read(td, fname, need_swap);
774                 } else {
775                         td->io_log_blktrace = 0;
776                         ret = init_iolog_read(td, fname);
777                 }
778                 free(fname);
779         } else if (td->o.write_iolog_file)
780                 ret = init_iolog_write(td);
781         else
782                 ret = true;
783
784         if (!ret)
785                 td_verror(td, EINVAL, "failed initializing iolog");
786
787         return ret;
788 }
789
/*
 * Allocate and initialize an io_log per the parameters in 'p', logging
 * to 'filename'. Sets up per-direction histogram windows, an optional
 * pre-sized pending-sample buffer (non-offload mode only), the sample
 * flag bits, and compression state when log_gz/log_gz_store are set.
 * The new log is returned through '*log'.
 */
void setup_log(struct io_log **log, struct log_params *p,
	       const char *filename)
{
	struct io_log *l;
	int i;
	struct io_u_plat_entry *entry;
	struct flist_head *list;

	l = scalloc(1, sizeof(*l));
	INIT_FLIST_HEAD(&l->io_logs);
	l->log_type = p->log_type;
	l->log_offset = p->log_offset;
	l->log_prio = p->log_prio;
	l->log_gz = p->log_gz;
	l->log_gz_store = p->log_gz_store;
	l->avg_msec = p->avg_msec;
	l->hist_msec = p->hist_msec;
	l->hist_coarseness = p->hist_coarseness;
	l->filename = strdup(filename);
	l->td = p->td;

	/* Initialize histogram lists for each r/w direction,
	 * with initial io_u_plat of all zeros:
	 */
	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
		list = &l->hist_window[i].list;
		INIT_FLIST_HEAD(list);
		entry = calloc(1, sizeof(struct io_u_plat_entry));
		flist_add(&entry->list, list);
	}

	if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
		unsigned int def_samples = DEF_LOG_ENTRIES;
		struct io_logs *__p;

		__p = calloc(1, sizeof(*l->pending));
		/* size the pending buffer to at least the queue depth */
		if (l->td->o.iodepth > DEF_LOG_ENTRIES)
			def_samples = roundup_pow2(l->td->o.iodepth);
		__p->max_samples = def_samples;
		__p->log = calloc(__p->max_samples, log_entry_sz(l));
		l->pending = __p;
	}

	if (l->log_offset)
		l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
	if (l->log_prio)
		l->log_ddir_mask |= LOG_PRIO_SAMPLE_BIT;

	INIT_FLIST_HEAD(&l->chunk_list);

	/* compression requires a thread_data for the compress workers */
	if (l->log_gz && !p->td)
		l->log_gz = 0;
	else if (l->log_gz || l->log_gz_store) {
		mutex_init_pshared(&l->chunk_lock);
		mutex_init_pshared(&l->deferred_free_lock);
		p->td->flags |= TD_F_COMPRESS_LOG;
	}

	*log = l;
}
850
#ifdef CONFIG_SETVBUF
/*
 * Give 'f' a large (1MB) fully-buffered stdio buffer. Returns the buffer
 * so the caller can release it with clear_file_buffer() after the stream
 * is closed.
 */
static void *set_file_buffer(FILE *f)
{
	size_t size = 1048576;
	void *buf;

	buf = malloc(size);
	setvbuf(f, buf, _IOFBF, size);
	return buf;
}

static void clear_file_buffer(void *buf)
{
	free(buf);
}
#else
/* No setvbuf() support: leave the stream with its default buffering. */
static void *set_file_buffer(FILE *f)
{
	return NULL;
}

static void clear_file_buffer(void *buf)
{
}
#endif
876
877 void free_log(struct io_log *log)
878 {
879         while (!flist_empty(&log->io_logs)) {
880                 struct io_logs *cur_log;
881
882                 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
883                 flist_del_init(&cur_log->list);
884                 free(cur_log->log);
885                 sfree(cur_log);
886         }
887
888         if (log->pending) {
889                 free(log->pending->log);
890                 free(log->pending);
891                 log->pending = NULL;
892         }
893
894         free(log->pending);
895         free(log->filename);
896         sfree(log);
897 }
898
/*
 * Sum 'stride' histogram buckets starting at index 'j'. When a previous
 * snapshot 'io_u_plat_last' is supplied, return the delta against that
 * snapshot instead of the absolute sum.
 */
uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
		uint64_t *io_u_plat_last)
{
	uint64_t total = 0;
	int idx;

	for (idx = j; idx < j + stride; idx++) {
		total += io_u_plat[idx];
		if (io_u_plat_last)
			total -= io_u_plat_last[idx];
	}

	return total;
}
915
/*
 * Write histogram (clat_hist) samples as text. Each sample points to a
 * full io_u_plat bucket array; buckets are summed in groups of
 * 2^hist_coarseness and printed as the difference against the previous
 * sample's entry, which is then unlinked and freed.
 */
static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
                               uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, j, nr_samples;
	struct io_u_plat_entry *entry, *entry_before;
	uint64_t *io_u_plat;
	uint64_t *io_u_plat_before;

	/* number of buckets folded into each printed value */
	int stride = 1 << hist_coarseness;

	if (!sample_size)
		return;

	/* entry size depends on whether offsets were logged too */
	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		entry = s->data.plat_entry;
		io_u_plat = entry->io_u_plat;

		/* the previous snapshot is linked off this entry's list */
		entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
		io_u_plat_before = entry_before->io_u_plat;

		fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
						io_sample_ddir(s), (unsigned long long) s->bs);
		/* all full stride groups, then the final group with newline */
		for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
			fprintf(f, "%llu, ", (unsigned long long)
				hist_sum(j, stride, io_u_plat, io_u_plat_before));
		}
		fprintf(f, "%llu\n", (unsigned long long)
			hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
					io_u_plat_before));

		/* delta emitted: the older snapshot is no longer needed */
		flist_del(&entry_before->list);
		free(entry_before);
	}
}
959
/*
 * Write plain-text samples to the log file. The first sample's ddir
 * flag bits (shared by all samples in a buffer) tell whether offset
 * and/or priority were recorded, which selects both the per-entry
 * size and the output format string.
 */
void flush_samples(FILE *f, void *samples, uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset, log_prio;
	uint64_t i, nr_samples;
	unsigned int prio_val;
	const char *fmt;

	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
	log_prio = (s->__ddir & LOG_PRIO_SAMPLE_BIT) != 0;

	/* full prio value prints as hex; legacy RT flag as a plain int */
	if (log_offset) {
		if (log_prio)
			fmt = "%lu, %" PRId64 ", %u, %llu, %llu, 0x%04x\n";
		else
			fmt = "%lu, %" PRId64 ", %u, %llu, %llu, %u\n";
	} else {
		if (log_prio)
			fmt = "%lu, %" PRId64 ", %u, %llu, 0x%04x\n";
		else
			fmt = "%lu, %" PRId64 ", %u, %llu, %u\n";
	}

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		/* without prio logging, only the RT class bit is reported */
		if (log_prio)
			prio_val = s->priority;
		else
			prio_val = ioprio_value_is_class_rt(s->priority);

		if (!log_offset) {
			fprintf(f, fmt,
				(unsigned long) s->time,
				s->data.val,
				io_sample_ddir(s), (unsigned long long) s->bs,
				prio_val);
		} else {
			/* offset samples use the larger layout */
			struct io_sample_offset *so = (void *) s;

			fprintf(f, fmt,
				(unsigned long) s->time,
				s->data.val,
				io_sample_ddir(s), (unsigned long long) s->bs,
				(unsigned long long) so->offset,
				prio_val);
		}
	}
}
1015
#ifdef CONFIG_ZLIB

/*
 * Work item describing one batch of samples to deflate. Either queued
 * to the log compress workqueue (iolog_cur_flush) or run inline
 * (iolog_flush).
 */
struct iolog_flush_data {
	struct workqueue_work work;
	struct io_log *log;
	void *samples;		/* sample buffer to compress */
	uint32_t nr_samples;	/* number of samples in the buffer */
	bool free;		/* if set, gz_work() sfree()s this item */
};

/* size of each compressed output chunk buffer */
#define GZ_CHUNK        131072
1027
1028 static struct iolog_compress *get_new_chunk(unsigned int seq)
1029 {
1030         struct iolog_compress *c;
1031
1032         c = malloc(sizeof(*c));
1033         INIT_FLIST_HEAD(&c->list);
1034         c->buf = malloc(GZ_CHUNK);
1035         c->len = 0;
1036         c->seq = seq;
1037         return c;
1038 }
1039
1040 static void free_chunk(struct iolog_compress *ic)
1041 {
1042         free(ic->buf);
1043         free(ic);
1044 }
1045
1046 static int z_stream_init(z_stream *stream, int gz_hdr)
1047 {
1048         int wbits = 15;
1049
1050         memset(stream, 0, sizeof(*stream));
1051         stream->zalloc = Z_NULL;
1052         stream->zfree = Z_NULL;
1053         stream->opaque = Z_NULL;
1054         stream->next_in = Z_NULL;
1055
1056         /*
1057          * zlib magic - add 32 for auto-detection of gz header or not,
1058          * if we decide to store files in a gzip friendly format.
1059          */
1060         if (gz_hdr)
1061                 wbits += 32;
1062
1063         if (inflateInit2(stream, wbits) != Z_OK)
1064                 return 1;
1065
1066         return 0;
1067 }
1068
/*
 * State carried across inflate_chunk() calls. The output buffer grows
 * in chunk_sz increments while chunks belonging to the same deflate
 * sequence are appended to it.
 */
struct inflate_chunk_iter {
	unsigned int seq;	/* sequence currently being inflated */
	int err;		/* first error seen, if any */
	void *buf;		/* inflated output, grown as needed */
	size_t buf_size;	/* current allocated size of buf */
	size_t buf_used;	/* bytes of buf filled so far */
	size_t chunk_sz;	/* growth increment for buf */
};
1077
1078 static void finish_chunk(z_stream *stream, FILE *f,
1079                          struct inflate_chunk_iter *iter)
1080 {
1081         int ret;
1082
1083         ret = inflateEnd(stream);
1084         if (ret != Z_OK)
1085                 log_err("fio: failed to end log inflation seq %d (%d)\n",
1086                                 iter->seq, ret);
1087
1088         flush_samples(f, iter->buf, iter->buf_used);
1089         free(iter->buf);
1090         iter->buf = NULL;
1091         iter->buf_size = iter->buf_used = 0;
1092 }
1093
1094 /*
1095  * Iterative chunk inflation. Handles cases where we cross into a new
1096  * sequence, doing flush finish of previous chunk if needed.
1097  */
1098 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
1099                             z_stream *stream, struct inflate_chunk_iter *iter)
1100 {
1101         size_t ret;
1102
1103         dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
1104                                 (unsigned long) ic->len, ic->seq);
1105
1106         if (ic->seq != iter->seq) {
1107                 if (iter->seq)
1108                         finish_chunk(stream, f, iter);
1109
1110                 z_stream_init(stream, gz_hdr);
1111                 iter->seq = ic->seq;
1112         }
1113
1114         stream->avail_in = ic->len;
1115         stream->next_in = ic->buf;
1116
1117         if (!iter->buf_size) {
1118                 iter->buf_size = iter->chunk_sz;
1119                 iter->buf = malloc(iter->buf_size);
1120         }
1121
1122         while (stream->avail_in) {
1123                 size_t this_out = iter->buf_size - iter->buf_used;
1124                 int err;
1125
1126                 stream->avail_out = this_out;
1127                 stream->next_out = iter->buf + iter->buf_used;
1128
1129                 err = inflate(stream, Z_NO_FLUSH);
1130                 if (err < 0) {
1131                         log_err("fio: failed inflating log: %d\n", err);
1132                         iter->err = err;
1133                         break;
1134                 }
1135
1136                 iter->buf_used += this_out - stream->avail_out;
1137
1138                 if (!stream->avail_out) {
1139                         iter->buf_size += iter->chunk_sz;
1140                         iter->buf = realloc(iter->buf, iter->buf_size);
1141                         continue;
1142                 }
1143
1144                 if (err == Z_STREAM_END)
1145                         break;
1146         }
1147
1148         ret = (void *) stream->next_in - ic->buf;
1149
1150         dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
1151
1152         return ret;
1153 }
1154
1155 /*
1156  * Inflate stored compressed chunks, or write them directly to the log
1157  * file if so instructed.
1158  */
1159 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1160 {
1161         struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
1162         z_stream stream;
1163
1164         while (!flist_empty(&log->chunk_list)) {
1165                 struct iolog_compress *ic;
1166
1167                 ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
1168                 flist_del(&ic->list);
1169
1170                 if (log->log_gz_store) {
1171                         size_t ret;
1172
1173                         dprint(FD_COMPRESS, "log write chunk size=%lu, "
1174                                 "seq=%u\n", (unsigned long) ic->len, ic->seq);
1175
1176                         ret = fwrite(ic->buf, ic->len, 1, f);
1177                         if (ret != 1 || ferror(f)) {
1178                                 iter.err = errno;
1179                                 log_err("fio: error writing compressed log\n");
1180                         }
1181                 } else
1182                         inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
1183
1184                 free_chunk(ic);
1185         }
1186
1187         if (iter.seq) {
1188                 finish_chunk(&stream, f, &iter);
1189                 free(iter.buf);
1190         }
1191
1192         return iter.err;
1193 }
1194
1195 /*
1196  * Open compressed log file and decompress the stored chunks and
1197  * write them to stdout. The chunks are stored sequentially in the
1198  * file, so we iterate over them and do them one-by-one.
1199  */
1200 int iolog_file_inflate(const char *file)
1201 {
1202         struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
1203         struct iolog_compress ic;
1204         z_stream stream;
1205         struct stat sb;
1206         size_t ret;
1207         size_t total;
1208         void *buf;
1209         FILE *f;
1210
1211         f = fopen(file, "r");
1212         if (!f) {
1213                 perror("fopen");
1214                 return 1;
1215         }
1216
1217         if (stat(file, &sb) < 0) {
1218                 fclose(f);
1219                 perror("stat");
1220                 return 1;
1221         }
1222
1223         ic.buf = buf = malloc(sb.st_size);
1224         ic.len = sb.st_size;
1225         ic.seq = 1;
1226
1227         ret = fread(ic.buf, ic.len, 1, f);
1228         if (ret == 0 && ferror(f)) {
1229                 perror("fread");
1230                 fclose(f);
1231                 free(buf);
1232                 return 1;
1233         } else if (ferror(f) || (!feof(f) && ret != 1)) {
1234                 log_err("fio: short read on reading log\n");
1235                 fclose(f);
1236                 free(buf);
1237                 return 1;
1238         }
1239
1240         fclose(f);
1241
1242         /*
1243          * Each chunk will return Z_STREAM_END. We don't know how many
1244          * chunks are in the file, so we just keep looping and incrementing
1245          * the sequence number until we have consumed the whole compressed
1246          * file.
1247          */
1248         total = ic.len;
1249         do {
1250                 size_t iret;
1251
1252                 iret = inflate_chunk(&ic,  1, stdout, &stream, &iter);
1253                 total -= iret;
1254                 if (!total)
1255                         break;
1256                 if (iter.err)
1257                         break;
1258
1259                 ic.seq++;
1260                 ic.len -= iret;
1261                 ic.buf += iret;
1262         } while (1);
1263
1264         if (iter.seq) {
1265                 finish_chunk(&stream, stdout, &iter);
1266                 free(iter.buf);
1267         }
1268
1269         free(buf);
1270         return iter.err;
1271 }
1272
1273 #else
1274
/* Stubs for builds without zlib: nothing to inflate. */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	return 0;
}

int iolog_file_inflate(const char *file)
{
	log_err("fio: log inflation not possible without zlib\n");
	return 1;
}
1285
1286 #endif
1287
1288 void flush_log(struct io_log *log, bool do_append)
1289 {
1290         void *buf;
1291         FILE *f;
1292
1293         if (!do_append)
1294                 f = fopen(log->filename, "w");
1295         else
1296                 f = fopen(log->filename, "a");
1297         if (!f) {
1298                 perror("fopen log");
1299                 return;
1300         }
1301
1302         buf = set_file_buffer(f);
1303
1304         inflate_gz_chunks(log, f);
1305
1306         while (!flist_empty(&log->io_logs)) {
1307                 struct io_logs *cur_log;
1308
1309                 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1310                 flist_del_init(&cur_log->list);
1311                 
1312                 if (log->td && log == log->td->clat_hist_log)
1313                         flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1314                                            log_sample_sz(log, cur_log));
1315                 else
1316                         flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1317                 
1318                 sfree(cur_log);
1319         }
1320
1321         fclose(f);
1322         clear_file_buffer(buf);
1323 }
1324
/*
 * Flush a finished log to disk, or hand it to the network backend,
 * serializing against other jobs writing the same file via a file
 * lock. With trylock set, returns 1 instead of blocking if the lock
 * is held, so the caller can retry. Consumes (frees) the log when it
 * returns 0.
 */
static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
{
	/* compress any remaining buffers before writing out */
	if (td->flags & TD_F_COMPRESS_LOG)
		iolog_flush(log);

	if (trylock) {
		if (fio_trylock_file(log->filename))
			return 1;
	} else
		fio_lock_file(log->filename);

	/* networked runs ship the log instead of writing it locally */
	if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
		fio_send_iolog(td, log, log->filename);
	else
		flush_log(log, !td->o.per_job_logs);

	fio_unlock_file(log->filename);
	free_log(log);
	return 0;
}
1345
1346 size_t log_chunk_sizes(struct io_log *log)
1347 {
1348         struct flist_head *entry;
1349         size_t ret;
1350
1351         if (flist_empty(&log->chunk_list))
1352                 return 0;
1353
1354         ret = 0;
1355         pthread_mutex_lock(&log->chunk_lock);
1356         flist_for_each(entry, &log->chunk_list) {
1357                 struct iolog_compress *c;
1358
1359                 c = flist_entry(entry, struct iolog_compress, list);
1360                 ret += c->len;
1361         }
1362         pthread_mutex_unlock(&log->chunk_lock);
1363         return ret;
1364 }
1365
1366 #ifdef CONFIG_ZLIB
1367
1368 static void iolog_put_deferred(struct io_log *log, void *ptr)
1369 {
1370         if (!ptr)
1371                 return;
1372
1373         pthread_mutex_lock(&log->deferred_free_lock);
1374         if (log->deferred < IOLOG_MAX_DEFER) {
1375                 log->deferred_items[log->deferred] = ptr;
1376                 log->deferred++;
1377         } else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
1378                 log_err("fio: had to drop log entry free\n");
1379         pthread_mutex_unlock(&log->deferred_free_lock);
1380 }
1381
1382 static void iolog_free_deferred(struct io_log *log)
1383 {
1384         int i;
1385
1386         if (!log->deferred)
1387                 return;
1388
1389         pthread_mutex_lock(&log->deferred_free_lock);
1390
1391         for (i = 0; i < log->deferred; i++) {
1392                 free(log->deferred_items[i]);
1393                 log->deferred_items[i] = NULL;
1394         }
1395
1396         log->deferred = 0;
1397         pthread_mutex_unlock(&log->deferred_free_lock);
1398 }
1399
/*
 * Deflate the sample buffer referenced by @data into a private list of
 * GZ_CHUNK sized chunks, then splice that list onto the log's chunk
 * list under the chunk lock. Returns 0 on success, 1 on any zlib
 * failure (partial output is discarded). Frees @data itself if its
 * 'free' flag is set.
 */
static int gz_work(struct iolog_flush_data *data)
{
	struct iolog_compress *c = NULL;
	struct flist_head list;
	unsigned int seq;
	z_stream stream;
	size_t total = 0;
	int ret;

	INIT_FLIST_HEAD(&list);

	memset(&stream, 0, sizeof(stream));
	stream.zalloc = Z_NULL;
	stream.zfree = Z_NULL;
	stream.opaque = Z_NULL;

	ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
	if (ret != Z_OK) {
		log_err("fio: failed to init gz stream\n");
		goto err;
	}

	/* all chunks produced from this batch share one sequence number */
	seq = ++data->log->chunk_seq;

	stream.next_in = (void *) data->samples;
	stream.avail_in = data->nr_samples * log_entry_sz(data->log);

	dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
				(unsigned long) stream.avail_in, seq,
				data->log->filename);
	/* compress all input into GZ_CHUNK sized output chunks */
	do {
		if (c)
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		c = get_new_chunk(seq);
		stream.avail_out = GZ_CHUNK;
		stream.next_out = c->buf;
		ret = deflate(&stream, Z_NO_FLUSH);
		if (ret < 0) {
			log_err("fio: deflate log (%d)\n", ret);
			free_chunk(c);
			goto err;
		}

		c->len = GZ_CHUNK - stream.avail_out;
		flist_add_tail(&c->list, &list);
		total += c->len;
	} while (stream.avail_in);

	/* finish the stream into the space left in the last chunk */
	stream.next_out = c->buf + c->len;
	stream.avail_out = GZ_CHUNK - c->len;

	ret = deflate(&stream, Z_FINISH);
	if (ret < 0) {
		/*
		 * Z_BUF_ERROR is special, it just means we need more
		 * output space. We'll handle that below. Treat any other
		 * error as fatal.
		 */
		if (ret != Z_BUF_ERROR) {
			log_err("fio: deflate log (%d)\n", ret);
			flist_del(&c->list);
			free_chunk(c);
			goto err;
		}
	}

	/* the finish pass grew the last chunk: recompute its length */
	total -= c->len;
	c->len = GZ_CHUNK - stream.avail_out;
	total += c->len;
	dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);

	/* keep allocating chunks until the deflate tail fits */
	if (ret != Z_STREAM_END) {
		do {
			c = get_new_chunk(seq);
			stream.avail_out = GZ_CHUNK;
			stream.next_out = c->buf;
			ret = deflate(&stream, Z_FINISH);
			c->len = GZ_CHUNK - stream.avail_out;
			total += c->len;
			flist_add_tail(&c->list, &list);
			dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
				(unsigned long) c->len);
		} while (ret != Z_STREAM_END);
	}

	dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);

	ret = deflateEnd(&stream);
	if (ret != Z_OK)
		log_err("fio: deflateEnd %d\n", ret);

	/* input buffer cannot be freed here; queue it for deferred free */
	iolog_put_deferred(data->log, data->samples);

	if (!flist_empty(&list)) {
		pthread_mutex_lock(&data->log->chunk_lock);
		flist_splice_tail(&list, &data->log->chunk_list);
		pthread_mutex_unlock(&data->log->chunk_lock);
	}

	ret = 0;
done:
	if (data->free)
		sfree(data);
	return ret;
err:
	/* discard any chunks produced before the failure */
	while (!flist_empty(&list)) {
		c = flist_first_entry(list.next, struct iolog_compress, list);
		flist_del(&c->list);
		free_chunk(c);
	}
	ret = 1;
	goto done;
}
1514
1515 /*
1516  * Invoked from our compress helper thread, when logging would have exceeded
1517  * the specified memory limitation. Compresses the previously stored
1518  * entries.
1519  */
1520 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1521 {
1522         return gz_work(container_of(work, struct iolog_flush_data, work));
1523 }
1524
1525 static int gz_init_worker(struct submit_worker *sw)
1526 {
1527         struct thread_data *td = sw->wq->td;
1528
1529         if (!fio_option_is_set(&td->o, log_gz_cpumask))
1530                 return 0;
1531
1532         if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1533                 log_err("gz: failed to set CPU affinity\n");
1534                 return 1;
1535         }
1536
1537         return 0;
1538 }
1539
/* ops for the log compress workqueue; workers run niced */
static struct workqueue_ops log_compress_wq_ops = {
	.fn		= gz_work_async,
	.init_worker_fn = gz_init_worker,
	.nice		= 1,
};
1545
1546 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1547 {
1548         if (!(td->flags & TD_F_COMPRESS_LOG))
1549                 return 0;
1550
1551         workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
1552         return 0;
1553 }
1554
1555 void iolog_compress_exit(struct thread_data *td)
1556 {
1557         if (!(td->flags & TD_F_COMPRESS_LOG))
1558                 return;
1559
1560         workqueue_exit(&td->log_compress_wq);
1561 }
1562
1563 /*
1564  * Queue work item to compress the existing log entries. We reset the
1565  * current log to a small size, and reference the existing log in the
1566  * data that we queue for compression. Once compression has been done,
1567  * this old log is freed. If called with finish == true, will not return
1568  * until the log compression has completed, and will flush all previous
1569  * logs too
1570  */
static int iolog_flush(struct io_log *log)
{
	struct iolog_flush_data *data;

	data = malloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;
	/* data is reused for each buffer; don't let gz_work() free it */
	data->free = false;

	while (!flist_empty(&log->io_logs)) {
		struct io_logs *cur_log;

		cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
		flist_del_init(&cur_log->list);

		data->samples = cur_log->log;
		data->nr_samples = cur_log->nr_samples;

		sfree(cur_log);

		/* NOTE(review): gz_work() failures are ignored here */
		gz_work(data);
	}

	free(data);
	return 0;
}
1599
/*
 * Queue the current sample buffer for asynchronous compression and
 * detach it from cur_log, so new samples go into a fresh buffer. Also
 * reaps buffers whose earlier compression has completed. Returns 1 on
 * allocation failure, 0 otherwise.
 */
int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	struct iolog_flush_data *data;

	/* smalloc: shared allocation -- presumably so the worker side can
	 * free it safely; confirm against workqueue ownership rules */
	data = smalloc(sizeof(*data));
	if (!data)
		return 1;

	data->log = log;

	/* hand the buffer over; gz_work() frees data when done */
	data->samples = cur_log->log;
	data->nr_samples = cur_log->nr_samples;
	data->free = true;

	cur_log->nr_samples = cur_log->max_samples = 0;
	cur_log->log = NULL;

	workqueue_enqueue(&log->td->log_compress_wq, &data->work);

	/* opportunistically free buffers compressed earlier */
	iolog_free_deferred(log);

	return 0;
}
1623 #else
1624
/* Stubs for builds without zlib: log compression is unsupported. */
static int iolog_flush(struct io_log *log)
{
	return 1;
}

int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	return 1;
}

int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	/* nothing to set up; compression is compiled out */
	return 0;
}

void iolog_compress_exit(struct thread_data *td)
{
}
1643
1644 #endif
1645
1646 struct io_logs *iolog_cur_log(struct io_log *log)
1647 {
1648         if (flist_empty(&log->io_logs))
1649                 return NULL;
1650
1651         return flist_last_entry(&log->io_logs, struct io_logs, list);
1652 }
1653
1654 uint64_t iolog_nr_samples(struct io_log *iolog)
1655 {
1656         struct flist_head *entry;
1657         uint64_t ret = 0;
1658
1659         flist_for_each(entry, &iolog->io_logs) {
1660                 struct io_logs *cur_log;
1661
1662                 cur_log = flist_entry(entry, struct io_logs, list);
1663                 ret += cur_log->nr_samples;
1664         }
1665
1666         return ret;
1667 }
1668
/* Finish a log if it exists; a NULL log means that type is disabled. */
static int __write_log(struct thread_data *td, struct io_log *log, int try)
{
	return log ? finish_log(td, log, try) : 0;
}
1676
1677 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1678 {
1679         int ret;
1680
1681         if (per_unit_log(td->iops_log) != unit_log)
1682                 return 0;
1683
1684         ret = __write_log(td, td->iops_log, try);
1685         if (!ret)
1686                 td->iops_log = NULL;
1687
1688         return ret;
1689 }
1690
1691 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1692 {
1693         int ret;
1694
1695         if (!unit_log)
1696                 return 0;
1697
1698         ret = __write_log(td, td->slat_log, try);
1699         if (!ret)
1700                 td->slat_log = NULL;
1701
1702         return ret;
1703 }
1704
1705 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1706 {
1707         int ret;
1708
1709         if (!unit_log)
1710                 return 0;
1711
1712         ret = __write_log(td, td->clat_log, try);
1713         if (!ret)
1714                 td->clat_log = NULL;
1715
1716         return ret;
1717 }
1718
1719 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1720 {
1721         int ret;
1722
1723         if (!unit_log)
1724                 return 0;
1725
1726         ret = __write_log(td, td->clat_hist_log, try);
1727         if (!ret)
1728                 td->clat_hist_log = NULL;
1729
1730         return ret;
1731 }
1732
1733 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1734 {
1735         int ret;
1736
1737         if (!unit_log)
1738                 return 0;
1739
1740         ret = __write_log(td, td->lat_log, try);
1741         if (!ret)
1742                 td->lat_log = NULL;
1743
1744         return ret;
1745 }
1746
1747 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1748 {
1749         int ret;
1750
1751         if (per_unit_log(td->bw_log) != unit_log)
1752                 return 0;
1753
1754         ret = __write_log(td, td->bw_log, try);
1755         if (!ret)
1756                 td->bw_log = NULL;
1757
1758         return ret;
1759 }
1760
/*
 * One mask bit per log type; td_writeout_logs() uses these to track
 * which logs have been flushed. ALL_LOG_NR is the number of entries
 * in log_types[] below.
 */
enum {
	BW_LOG_MASK	= 1,
	LAT_LOG_MASK	= 2,
	SLAT_LOG_MASK	= 4,
	CLAT_LOG_MASK	= 8,
	IOPS_LOG_MASK	= 16,
	CLAT_HIST_LOG_MASK = 32,

	ALL_LOG_NR	= 6,
};

/* dispatch entry: a mask bit and the writer for that log type */
struct log_type {
	unsigned int mask;
	int (*fn)(struct thread_data *, int, bool);
};

static struct log_type log_types[] = {
	{
		.mask	= BW_LOG_MASK,
		.fn	= write_bandw_log,
	},
	{
		.mask	= LAT_LOG_MASK,
		.fn	= write_lat_log,
	},
	{
		.mask	= SLAT_LOG_MASK,
		.fn	= write_slat_log,
	},
	{
		.mask	= CLAT_LOG_MASK,
		.fn	= write_clat_log,
	},
	{
		.mask	= IOPS_LOG_MASK,
		.fn	= write_iops_log,
	},
	{
		.mask	= CLAT_HIST_LOG_MASK,
		.fn	= write_clat_hist_log,
	}
};
1803
/*
 * Write out every enabled log for this thread. Each log type is
 * retried until its writer succeeds; finish_log() can fail to take the
 * file lock when trylock is used, in which case we sleep briefly and
 * try again. Trylock (log_left != 1) is used while more than one log
 * remains so one busy file doesn't block the others.
 */
void td_writeout_logs(struct thread_data *td, bool unit_logs)
{
	unsigned int log_mask = 0;
	unsigned int log_left = ALL_LOG_NR;
	int old_state, i;

	/* mark the thread as finishing while its logs are flushed */
	old_state = td_bump_runstate(td, TD_FINISHING);

	finalize_logs(td, unit_logs);

	while (log_left) {
		int prev_log_left = log_left;

		for (i = 0; i < ALL_LOG_NR && log_left; i++) {
			struct log_type *lt = &log_types[i];
			int ret;

			if (!(log_mask & lt->mask)) {
				ret = lt->fn(td, log_left != 1, unit_logs);
				if (!ret) {
					log_left--;
					log_mask |= lt->mask;
				}
			}
		}

		/* no progress this pass: give the lock holder some time */
		if (prev_log_left == log_left)
			usleep(5000);
	}

	td_restore_runstate(td, old_state);
}
1836
/* Flush the logs of every thread; see td_writeout_logs(). */
void fio_writeout_logs(bool unit_logs)
{
	struct thread_data *td;
	int i;

	for_each_td(td, i)
		td_writeout_logs(td, unit_logs);
}