1 /*
2  * Code related to writing an iolog of what a thread is doing, and to
3  * later read that back and replay
4  */
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <assert.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <unistd.h>
11 #ifdef CONFIG_ZLIB
12 #include <zlib.h>
13 #endif
14
15 #include "flist.h"
16 #include "fio.h"
17 #include "trim.h"
18 #include "filelock.h"
19 #include "smalloc.h"
20 #include "blktrace.h"
21 #include "pshared.h"
22 #include "lib/roundup.h"
23
24 #include <netinet/in.h>
25 #include <netinet/tcp.h>
26 #include <arpa/inet.h>
27 #include <sys/stat.h>
28 #include <sys/socket.h>
29 #include <sys/un.h>
30
31 static int iolog_flush(struct io_log *log);
32
33 static const char iolog_ver2[] = "fio version 2 iolog";
34
35 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
36 {
37         flist_add_tail(&ipo->list, &td->io_log_list);
38         td->total_io_size += ipo->len;
39 }
40
41 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
42 {
43         if (!td->o.write_iolog_file)
44                 return;
45
46         fprintf(td->iolog_f, "%s %s %llu %llu\n", io_u->file->file_name,
47                                                 io_ddir_name(io_u->ddir),
48                                                 io_u->offset, io_u->buflen);
49 }
50
51 void log_file(struct thread_data *td, struct fio_file *f,
52               enum file_log_act what)
53 {
54         const char *act[] = { "add", "open", "close" };
55
56         assert(what < 3);
57
58         if (!td->o.write_iolog_file)
		return;

62         /*
63          * this happens on the pre-open/close done before the job starts
64          */
65         if (!td->iolog_f)
66                 return;
67
68         fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
69 }
70
71 static void iolog_delay(struct thread_data *td, unsigned long delay)
72 {
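	/*
	 * td->time_offset carries how much longer than requested the
	 * previous replayed delay actually took; deduct it here so
	 * oversleep doesn't accumulate into replay drift.
	 */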
73         uint64_t usec = utime_since_now(&td->last_issue);
74         unsigned long orig_delay = delay;
75         uint64_t this_delay;
76         struct timespec ts;
77
78         if (delay < td->time_offset) {
79                 td->time_offset = 0;
80                 return;
81         }
82
83         delay -= td->time_offset;
84         if (delay < usec)
85                 return;
86
87         delay -= usec;
88
89         fio_gettime(&ts, NULL);
90         while (delay && !td->terminate) {
91                 this_delay = delay;
92                 if (this_delay > 500000)
93                         this_delay = 500000;
94
95                 usec_sleep(td, this_delay);
96                 delay -= this_delay;
97         }
98
99         usec = utime_since_now(&ts);
100         if (usec > orig_delay)
101                 td->time_offset = usec - orig_delay;
102         else
103                 td->time_offset = 0;
104 }
105
106 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
107 {
108         struct fio_file *f;
109         int ret;
110
111         /*
112          * Not a special ipo
113          */
114         if (ipo->ddir != DDIR_INVAL)
115                 return 0;
116
117         f = td->files[ipo->fileno];
118
119         switch (ipo->file_action) {
120         case FIO_LOG_OPEN_FILE:
121                 if (td->o.replay_redirect && fio_file_open(f)) {
122                         dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
123                                         f->file_name);
124                         break;
125                 }
126                 ret = td_io_open_file(td, f);
127                 if (!ret)
128                         break;
129                 td_verror(td, ret, "iolog open file");
130                 return -1;
131         case FIO_LOG_CLOSE_FILE:
132                 td_io_close_file(td, f);
133                 break;
134         case FIO_LOG_UNLINK_FILE:
135                 td_io_unlink_file(td, f);
136                 break;
137         default:
138                 log_err("fio: bad file action %d\n", ipo->file_action);
139                 break;
140         }
141
142         return 1;
143 }
144
145 static bool read_iolog2(struct thread_data *td);
146
147 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
148 {
149         struct io_piece *ipo;
150         unsigned long elapsed;
151
152         while (!flist_empty(&td->io_log_list)) {
153                 int ret;
154
155                 if (td->o.read_iolog_chunked) {
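			/*
			 * When the in-memory log has drained to the check
			 * mark, refill it from the backing blktrace or iolog
			 * file before handing out more entries.
			 */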
156                         if (td->io_log_checkmark == td->io_log_current) {
157                                 if (td->io_log_blktrace) {
158                                         if (!read_blktrace(td))
159                                                 return 1;
160                                 } else {
161                                         if (!read_iolog2(td))
162                                                 return 1;
163                                 }
164                         }
165                         td->io_log_current--;
166                 }
167                 ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
168                 flist_del(&ipo->list);
169                 remove_trim_entry(td, ipo);
170
171                 ret = ipo_special(td, ipo);
172                 if (ret < 0) {
173                         free(ipo);
174                         break;
175                 } else if (ret > 0) {
176                         free(ipo);
177                         continue;
178                 }
179
180                 io_u->ddir = ipo->ddir;
181                 if (ipo->ddir != DDIR_WAIT) {
182                         io_u->offset = ipo->offset;
183                         io_u->verify_offset = ipo->offset;
184                         io_u->buflen = ipo->len;
185                         io_u->file = td->files[ipo->fileno];
186                         get_file(io_u->file);
187                         dprint(FD_IO, "iolog: get %llu/%llu/%s\n", io_u->offset,
188                                                 io_u->buflen, io_u->file->file_name);
189                         if (ipo->delay)
190                                 iolog_delay(td, ipo->delay);
191                 } else {
192                         elapsed = mtime_since_genesis();
193                         if (ipo->delay > elapsed)
194                                 usec_sleep(td, (ipo->delay - elapsed) * 1000);
195                 }
196
197                 free(ipo);
198
199                 if (io_u->ddir != DDIR_WAIT)
200                         return 0;
201         }
202
203         td->done = 1;
204         return 1;
205 }
206
207 void prune_io_piece_log(struct thread_data *td)
208 {
209         struct io_piece *ipo;
210         struct fio_rb_node *n;
211
212         while ((n = rb_first(&td->io_hist_tree)) != NULL) {
213                 ipo = rb_entry(n, struct io_piece, rb_node);
214                 rb_erase(n, &td->io_hist_tree);
215                 remove_trim_entry(td, ipo);
216                 td->io_hist_len--;
217                 free(ipo);
218         }
219
220         while (!flist_empty(&td->io_hist_list)) {
221                 ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
222                 flist_del(&ipo->list);
223                 remove_trim_entry(td, ipo);
224                 td->io_hist_len--;
225                 free(ipo);
226         }
227 }
228
229 /*
230  * log a successful write, so we can unwind the log for verify
231  */
232 void log_io_piece(struct thread_data *td, struct io_u *io_u)
233 {
234         struct fio_rb_node **p, *parent;
235         struct io_piece *ipo, *__ipo;
236
237         ipo = calloc(1, sizeof(struct io_piece));
238         init_ipo(ipo);
239         ipo->file = io_u->file;
240         ipo->offset = io_u->offset;
241         ipo->len = io_u->buflen;
242         ipo->numberio = io_u->numberio;
243         ipo->flags = IP_F_IN_FLIGHT;
244
245         io_u->ipo = ipo;
246
247         if (io_u_should_trim(td, io_u)) {
248                 flist_add_tail(&ipo->trim_list, &td->trim_list);
249                 td->trim_entries++;
250         }
251
	/*
	 * Only sort writes if we don't have a random map: without one,
	 * duplicate blocks can occur, and the rb tree insert/lookup is what
	 * detects the duplicate and drops the stale entry. With a random
	 * map, appending to the plain history list is enough.
	 */
257         if (file_randommap(td, ipo->file)) {
258                 INIT_FLIST_HEAD(&ipo->list);
259                 flist_add_tail(&ipo->list, &td->io_hist_list);
260                 ipo->flags |= IP_F_ONLIST;
261                 td->io_hist_len++;
262                 return;
263         }
264
265         RB_CLEAR_NODE(&ipo->rb_node);
266
267         /*
268          * Sort the entry into the verification list
269          */
270 restart:
271         p = &td->io_hist_tree.rb_node;
272         parent = NULL;
273         while (*p) {
274                 int overlap = 0;
275                 parent = *p;
276
277                 __ipo = rb_entry(parent, struct io_piece, rb_node);
278                 if (ipo->file < __ipo->file)
279                         p = &(*p)->rb_left;
280                 else if (ipo->file > __ipo->file)
281                         p = &(*p)->rb_right;
282                 else if (ipo->offset < __ipo->offset) {
283                         p = &(*p)->rb_left;
284                         overlap = ipo->offset + ipo->len > __ipo->offset;
285                 }
286                 else if (ipo->offset > __ipo->offset) {
287                         p = &(*p)->rb_right;
288                         overlap = __ipo->offset + __ipo->len > ipo->offset;
289                 }
290                 else
291                         overlap = 1;
292
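		/*
		 * An overlapping entry is stale for verify purposes: evict
		 * it and restart the descent, since rb_erase() may have
		 * rebalanced the tree.
		 */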
293                 if (overlap) {
294                         dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
295                                 __ipo->offset, __ipo->len,
296                                 ipo->offset, ipo->len);
297                         td->io_hist_len--;
298                         rb_erase(parent, &td->io_hist_tree);
299                         remove_trim_entry(td, __ipo);
300                         if (!(__ipo->flags & IP_F_IN_FLIGHT))
301                                 free(__ipo);
302                         goto restart;
303                 }
304         }
305
306         rb_link_node(&ipo->rb_node, parent, p);
307         rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
308         ipo->flags |= IP_F_ONRB;
309         td->io_hist_len++;
310 }
311
312 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
313 {
314         struct io_piece *ipo = io_u->ipo;
315
316         if (td->ts.nr_block_infos) {
317                 uint32_t *info = io_u_block_info(td, io_u);
318                 if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
319                         if (io_u->ddir == DDIR_TRIM)
320                                 *info = BLOCK_INFO_SET_STATE(*info,
321                                                 BLOCK_STATE_TRIM_FAILURE);
322                         else if (io_u->ddir == DDIR_WRITE)
323                                 *info = BLOCK_INFO_SET_STATE(*info,
324                                                 BLOCK_STATE_WRITE_FAILURE);
325                 }
326         }
327
328         if (!ipo)
329                 return;
330
331         if (ipo->flags & IP_F_ONRB)
332                 rb_erase(&ipo->rb_node, &td->io_hist_tree);
333         else if (ipo->flags & IP_F_ONLIST)
334                 flist_del(&ipo->list);
335
336         free(ipo);
337         io_u->ipo = NULL;
338         td->io_hist_len--;
339 }
340
341 void trim_io_piece(const struct io_u *io_u)
342 {
343         struct io_piece *ipo = io_u->ipo;
344
345         if (!ipo)
346                 return;
347
348         ipo->len = io_u->xfer_buflen - io_u->resid;
349 }
350
351 void write_iolog_close(struct thread_data *td)
352 {
353         if (!td->iolog_f)
354                 return;
355
356         fflush(td->iolog_f);
357         fclose(td->iolog_f);
358         free(td->iolog_buf);
359         td->iolog_f = NULL;
360         td->iolog_buf = NULL;
361 }
362
363 int64_t iolog_items_to_fetch(struct thread_data *td)
364 {
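	/*
	 * Estimate how many entries were consumed since the high water
	 * mark was last set, and top the queue back up to roughly one
	 * second's worth; the check mark (half the new high water mark)
	 * is where the next chunked refill triggers.
	 */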
365         struct timespec now;
366         uint64_t elapsed;
367         uint64_t for_1s;
368         int64_t items_to_fetch;
369
370         if (!td->io_log_highmark)
		return 10;

374         fio_gettime(&now, NULL);
375         elapsed = ntime_since(&td->io_log_highmark_time, &now);
376         if (elapsed) {
377                 for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
378                 items_to_fetch = for_1s - td->io_log_current;
379                 if (items_to_fetch < 0)
380                         items_to_fetch = 0;
381         } else
382                 items_to_fetch = 0;
383
384         td->io_log_highmark = td->io_log_current + items_to_fetch;
385         td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
386         fio_gettime(&td->io_log_highmark_time, NULL);
387
388         return items_to_fetch;
389 }
390
391 /*
392  * Read version 2 iolog data. It is enhanced to include per-file logging,
393  * syncs, etc.
394  */
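/*
 * For illustration (not taken from a real trace, just matching the
 * grammar parsed below), a small version 2 iolog might look like:
 *
 *	fio version 2 iolog
 *	/dev/sdb add
 *	/dev/sdb open
 *	/dev/sdb write 0 4096
 *	/dev/sdb read 4096 4096
 *	/dev/sdb close
 */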
395 static bool read_iolog2(struct thread_data *td)
396 {
397         unsigned long long offset;
398         unsigned int bytes;
399         int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
400         char *rfname, *fname, *act;
401         char *str, *p;
402         enum fio_ddir rw;
403         bool realloc = false;
404         int64_t items_to_fetch = 0;
405         int syncs;
406
407         if (td->o.read_iolog_chunked) {
408                 items_to_fetch = iolog_items_to_fetch(td);
409                 if (!items_to_fetch)
410                         return true;
411         }
412
	/*
	 * Read in the iolog and store it, reusing the verification
	 * infrastructure for the entries.
	 */
417         str = malloc(4096);
418         rfname = fname = malloc(256+16);
419         act = malloc(256+16);
420
421         syncs = reads = writes = waits = 0;
422         while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
423                 struct io_piece *ipo;
424                 int r;
425
426                 r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
427                                                                         &bytes);
428
429                 if (td->o.replay_redirect)
430                         fname = td->o.replay_redirect;
431
432                 if (r == 4) {
433                         /*
434                          * Check action first
435                          */
436                         if (!strcmp(act, "wait"))
437                                 rw = DDIR_WAIT;
438                         else if (!strcmp(act, "read"))
439                                 rw = DDIR_READ;
440                         else if (!strcmp(act, "write"))
441                                 rw = DDIR_WRITE;
442                         else if (!strcmp(act, "sync"))
443                                 rw = DDIR_SYNC;
444                         else if (!strcmp(act, "datasync"))
445                                 rw = DDIR_DATASYNC;
446                         else if (!strcmp(act, "trim"))
447                                 rw = DDIR_TRIM;
448                         else {
449                                 log_err("fio: bad iolog file action: %s\n",
450                                                                         act);
451                                 continue;
452                         }
453                         fileno = get_fileno(td, fname);
454                 } else if (r == 2) {
455                         rw = DDIR_INVAL;
456                         if (!strcmp(act, "add")) {
457                                 if (td->o.replay_redirect &&
458                                     get_fileno(td, fname) != -1) {
459                                         dprint(FD_FILE, "iolog: ignoring"
460                                                 " re-add of file %s\n", fname);
461                                 } else {
462                                         fileno = add_file(td, fname, td->subjob_number, 1);
463                                         file_action = FIO_LOG_ADD_FILE;
464                                 }
465                                 continue;
466                         } else if (!strcmp(act, "open")) {
467                                 fileno = get_fileno(td, fname);
468                                 file_action = FIO_LOG_OPEN_FILE;
469                         } else if (!strcmp(act, "close")) {
470                                 fileno = get_fileno(td, fname);
471                                 file_action = FIO_LOG_CLOSE_FILE;
472                         } else {
473                                 log_err("fio: bad iolog file action: %s\n",
474                                                                         act);
475                                 continue;
476                         }
477                 } else {
478                         log_err("bad iolog2: %s\n", p);
479                         continue;
480                 }
481
482                 if (rw == DDIR_READ)
483                         reads++;
484                 else if (rw == DDIR_WRITE) {
485                         /*
486                          * Don't add a write for ro mode
487                          */
488                         if (read_only)
489                                 continue;
490                         writes++;
491                 } else if (rw == DDIR_WAIT) {
492                         if (td->o.no_stall)
493                                 continue;
494                         waits++;
495                 } else if (rw == DDIR_INVAL) {
496                 } else if (ddir_sync(rw)) {
497                         syncs++;
498                 } else {
499                         log_err("bad ddir: %d\n", rw);
500                         continue;
501                 }
502
		/*
		 * Allocate and queue the io piece for this entry
		 */
506                 ipo = calloc(1, sizeof(*ipo));
507                 init_ipo(ipo);
508                 ipo->ddir = rw;
509                 if (rw == DDIR_WAIT) {
510                         ipo->delay = offset;
511                 } else {
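			/*
			 * replay_scale divides offsets down so a trace can
			 * be replayed on a smaller device; replay_align
			 * then re-aligns the scaled offset.
			 */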
512                         if (td->o.replay_scale)
513                                 ipo->offset = offset / td->o.replay_scale;
514                         else
515                                 ipo->offset = offset;
516                         ipo_bytes_align(td->o.replay_align, ipo);
517
518                         ipo->len = bytes;
			if (ddir_rw(rw) && bytes > td->o.max_bs[rw]) {
520                                 realloc = true;
521                                 td->o.max_bs[rw] = bytes;
522                         }
523                         ipo->fileno = fileno;
524                         ipo->file_action = file_action;
525                         td->o.size += bytes;
526                 }
527
528                 queue_io_piece(td, ipo);
529
530                 if (td->o.read_iolog_chunked) {
531                         td->io_log_current++;
532                         items_to_fetch--;
533                         if (items_to_fetch == 0)
534                                 break;
535                 }
536         }
537
538         free(str);
539         free(act);
540         free(rfname);
541
542         if (td->o.read_iolog_chunked) {
543                 td->io_log_highmark = td->io_log_current;
544                 td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
545                 fio_gettime(&td->io_log_highmark_time, NULL);
546         }
547
548         if (writes && read_only) {
549                 log_err("fio: <%s> skips replay of %d writes due to"
550                         " read-only\n", td->o.name, writes);
551                 writes = 0;
552         }
553         if (syncs)
554                 td->flags |= TD_F_SYNCS;
555
556         if (td->o.read_iolog_chunked) {
		if (td->io_log_current == 0)
			return false;

		td->o.td_ddir = TD_DDIR_RW;
		if (realloc && td->orig_buffer) {
			io_u_quiesce(td);
			free_io_mem(td);
			init_io_u_buffers(td);
		}
567                 return true;
568         }
569
570         if (!reads && !writes && !waits)
571                 return false;
572         else if (reads && !writes)
573                 td->o.td_ddir = TD_DDIR_READ;
574         else if (!reads && writes)
575                 td->o.td_ddir = TD_DDIR_WRITE;
576         else
577                 td->o.td_ddir = TD_DDIR_RW;
578
579         return true;
580 }
581
582 static bool is_socket(const char *path)
583 {
584         struct stat buf;
585         int r;
586
587         r = stat(path, &buf);
588         if (r == -1)
589                 return false;
590
591         return S_ISSOCK(buf.st_mode);
592 }
593
594 static int open_socket(const char *path)
595 {
596         struct sockaddr_un addr;
597         int ret, fd;
598
599         fd = socket(AF_UNIX, SOCK_STREAM, 0);
600         if (fd < 0)
601                 return fd;
602
603         addr.sun_family = AF_UNIX;
	if (snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path) >=
	    sizeof(addr.sun_path)) {
		log_err("%s: path name %s is too long for a Unix socket\n",
			__func__, path);
		close(fd);
		return -1;
	}
609
610         ret = connect(fd, (const struct sockaddr *)&addr, strlen(path) + sizeof(addr.sun_family));
611         if (!ret)
612                 return fd;
613
614         close(fd);
615         return -1;
616 }
617
618 /*
619  * open iolog, check version, and call appropriate parser
620  */
621 static bool init_iolog_read(struct thread_data *td, char *fname)
622 {
623         char buffer[256], *p;
624         FILE *f = NULL;
625
626         dprint(FD_IO, "iolog: name=%s\n", fname);
627
628         if (is_socket(fname)) {
629                 int fd;
630
631                 fd = open_socket(fname);
632                 if (fd >= 0)
633                         f = fdopen(fd, "r");
634         } else if (!strcmp(fname, "-")) {
635                 f = stdin;
636         } else
637                 f = fopen(fname, "r");
638
639         if (!f) {
640                 perror("fopen read iolog");
641                 return false;
642         }
643
644         p = fgets(buffer, sizeof(buffer), f);
645         if (!p) {
646                 td_verror(td, errno, "iolog read");
647                 log_err("fio: unable to read iolog\n");
648                 fclose(f);
649                 return false;
650         }
651
652         /*
653          * version 2 of the iolog stores a specific string as the
654          * first line, check for that
655          */
656         if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2))) {
657                 free_release_files(td);
658                 td->io_log_rfile = f;
659                 return read_iolog2(td);
660         }
661
662         log_err("fio: iolog version 1 is no longer supported\n");
663         fclose(f);
664         return false;
665 }
666
667 /*
668  * Set up a log for storing io patterns.
669  */
670 static bool init_iolog_write(struct thread_data *td)
671 {
672         struct fio_file *ff;
673         FILE *f;
674         unsigned int i;
675
676         f = fopen(td->o.write_iolog_file, "a");
677         if (!f) {
678                 perror("fopen write iolog");
679                 return false;
680         }
681
	/*
	 * That's it for writing, set up a log buffer and we're done.
	 */
685         td->iolog_f = f;
686         td->iolog_buf = malloc(8192);
687         setvbuf(f, td->iolog_buf, _IOFBF, 8192);
688
689         /*
690          * write our version line
691          */
	if (fprintf(f, "%s\n", iolog_ver2) < 0) {
		perror("iolog init");
		return false;
	}
696
697         /*
698          * add all known files
699          */
700         for_each_file(td, ff, i)
701                 log_file(td, ff, FIO_LOG_ADD_FILE);
702
703         return true;
704 }
705
706 bool init_iolog(struct thread_data *td)
707 {
708         bool ret;
709
710         if (td->o.read_iolog_file) {
711                 int need_swap;
		char *fname = get_name_by_idx(td->o.read_iolog_file, td->subjob_number);
713
714                 /*
715                  * Check if it's a blktrace file and load that if possible.
716                  * Otherwise assume it's a normal log file and load that.
717                  */
718                 if (is_blktrace(fname, &need_swap)) {
719                         td->io_log_blktrace = 1;
720                         ret = init_blktrace_read(td, fname, need_swap);
721                 } else {
722                         td->io_log_blktrace = 0;
723                         ret = init_iolog_read(td, fname);
724                 }
725                 free(fname);
726         } else if (td->o.write_iolog_file)
727                 ret = init_iolog_write(td);
728         else
729                 ret = true;
730
731         if (!ret)
732                 td_verror(td, EINVAL, "failed initializing iolog");
733
734         return ret;
735 }
736
737 void setup_log(struct io_log **log, struct log_params *p,
738                const char *filename)
739 {
740         struct io_log *l;
741         int i;
742         struct io_u_plat_entry *entry;
743         struct flist_head *list;
744
745         l = scalloc(1, sizeof(*l));
746         INIT_FLIST_HEAD(&l->io_logs);
747         l->log_type = p->log_type;
748         l->log_offset = p->log_offset;
749         l->log_prio = p->log_prio;
750         l->log_gz = p->log_gz;
751         l->log_gz_store = p->log_gz_store;
752         l->avg_msec = p->avg_msec;
753         l->hist_msec = p->hist_msec;
754         l->hist_coarseness = p->hist_coarseness;
755         l->filename = strdup(filename);
756         l->td = p->td;
757
	/*
	 * Initialize histogram lists for each r/w direction, with initial
	 * io_u_plat of all zeros:
	 */
761         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
762                 list = &l->hist_window[i].list;
763                 INIT_FLIST_HEAD(list);
764                 entry = calloc(1, sizeof(struct io_u_plat_entry));
765                 flist_add(&entry->list, list);
766         }
767
768         if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
769                 unsigned int def_samples = DEF_LOG_ENTRIES;
770                 struct io_logs *__p;
771
772                 __p = calloc(1, sizeof(*l->pending));
773                 if (l->td->o.iodepth > DEF_LOG_ENTRIES)
774                         def_samples = roundup_pow2(l->td->o.iodepth);
775                 __p->max_samples = def_samples;
776                 __p->log = calloc(__p->max_samples, log_entry_sz(l));
777                 l->pending = __p;
778         }
779
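	/*
	 * Optional per-sample fields are flagged in each sample's ddir
	 * word; flush_samples() keys off these bits when decoding.
	 */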
780         if (l->log_offset)
781                 l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
782         if (l->log_prio)
783                 l->log_ddir_mask |= LOG_PRIO_SAMPLE_BIT;
784
785         INIT_FLIST_HEAD(&l->chunk_list);
786
787         if (l->log_gz && !p->td)
788                 l->log_gz = 0;
789         else if (l->log_gz || l->log_gz_store) {
790                 mutex_init_pshared(&l->chunk_lock);
791                 mutex_init_pshared(&l->deferred_free_lock);
792                 p->td->flags |= TD_F_COMPRESS_LOG;
793         }
794
795         *log = l;
796 }
797
798 #ifdef CONFIG_SETVBUF
799 static void *set_file_buffer(FILE *f)
800 {
801         size_t size = 1048576;
802         void *buf;
803
804         buf = malloc(size);
805         setvbuf(f, buf, _IOFBF, size);
806         return buf;
807 }
808
809 static void clear_file_buffer(void *buf)
810 {
811         free(buf);
812 }
813 #else
814 static void *set_file_buffer(FILE *f)
815 {
816         return NULL;
817 }
818
819 static void clear_file_buffer(void *buf)
820 {
821 }
822 #endif
823
824 void free_log(struct io_log *log)
825 {
826         while (!flist_empty(&log->io_logs)) {
827                 struct io_logs *cur_log;
828
829                 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
830                 flist_del_init(&cur_log->list);
831                 free(cur_log->log);
832                 sfree(cur_log);
833         }
834
835         if (log->pending) {
836                 free(log->pending->log);
837                 free(log->pending);
838                 log->pending = NULL;
839         }
840
842         free(log->filename);
843         sfree(log);
844 }
845
846 uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
847                 uint64_t *io_u_plat_last)
848 {
849         uint64_t sum;
850         int k;
851
852         if (io_u_plat_last) {
853                 for (k = sum = 0; k < stride; k++)
854                         sum += io_u_plat[j + k] - io_u_plat_last[j + k];
855         } else {
856                 for (k = sum = 0; k < stride; k++)
857                         sum += io_u_plat[j + k];
858         }
859
860         return sum;
861 }
862
863 static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
864                                uint64_t sample_size)
865 {
866         struct io_sample *s;
867         int log_offset;
868         uint64_t i, j, nr_samples;
869         struct io_u_plat_entry *entry, *entry_before;
870         uint64_t *io_u_plat;
871         uint64_t *io_u_plat_before;
872
873         int stride = 1 << hist_coarseness;
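	/*
	 * Each output column sums 'stride' adjacent histogram bins, so a
	 * hist_coarseness of 2 folds every 4 of the FIO_IO_U_PLAT_NR bins
	 * into one column.
	 */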
874         
875         if (!sample_size)
876                 return;
877
878         s = __get_sample(samples, 0, 0);
879         log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
880
881         nr_samples = sample_size / __log_entry_sz(log_offset);
882
883         for (i = 0; i < nr_samples; i++) {
884                 s = __get_sample(samples, log_offset, i);
885
886                 entry = s->data.plat_entry;
887                 io_u_plat = entry->io_u_plat;
888
889                 entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
890                 io_u_plat_before = entry_before->io_u_plat;
891
892                 fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
893                                                 io_sample_ddir(s), (unsigned long long) s->bs);
894                 for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
895                         fprintf(f, "%llu, ", (unsigned long long)
896                                 hist_sum(j, stride, io_u_plat, io_u_plat_before));
897                 }
898                 fprintf(f, "%llu\n", (unsigned long long)
899                         hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
900                                         io_u_plat_before));
901
902                 flist_del(&entry_before->list);
903                 free(entry_before);
904         }
905 }
906
907 void flush_samples(FILE *f, void *samples, uint64_t sample_size)
908 {
909         struct io_sample *s;
910         int log_offset, log_prio;
911         uint64_t i, nr_samples;
912         unsigned int prio_val;
913         const char *fmt;
914
915         if (!sample_size)
916                 return;
917
918         s = __get_sample(samples, 0, 0);
919         log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
920         log_prio = (s->__ddir & LOG_PRIO_SAMPLE_BIT) != 0;
921
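	/*
	 * Each sample line is "time, value, ddir, bs", plus an offset
	 * column when offsets were logged, and a trailing priority column
	 * (the raw value, or just an RT-class flag when priorities were
	 * not logged in full).
	 */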
922         if (log_offset) {
923                 if (log_prio)
924                         fmt = "%lu, %" PRId64 ", %u, %llu, %llu, 0x%04x\n";
925                 else
926                         fmt = "%lu, %" PRId64 ", %u, %llu, %llu, %u\n";
927         } else {
928                 if (log_prio)
929                         fmt = "%lu, %" PRId64 ", %u, %llu, 0x%04x\n";
930                 else
931                         fmt = "%lu, %" PRId64 ", %u, %llu, %u\n";
932         }
933
934         nr_samples = sample_size / __log_entry_sz(log_offset);
935
936         for (i = 0; i < nr_samples; i++) {
937                 s = __get_sample(samples, log_offset, i);
938
939                 if (log_prio)
940                         prio_val = s->priority;
941                 else
942                         prio_val = ioprio_value_is_class_rt(s->priority);
943
944                 if (!log_offset) {
945                         fprintf(f, fmt,
946                                 (unsigned long) s->time,
947                                 s->data.val,
948                                 io_sample_ddir(s), (unsigned long long) s->bs,
949                                 prio_val);
950                 } else {
951                         struct io_sample_offset *so = (void *) s;
952
953                         fprintf(f, fmt,
954                                 (unsigned long) s->time,
955                                 s->data.val,
956                                 io_sample_ddir(s), (unsigned long long) s->bs,
957                                 (unsigned long long) so->offset,
958                                 prio_val);
959                 }
960         }
961 }
962
963 #ifdef CONFIG_ZLIB
964
965 struct iolog_flush_data {
966         struct workqueue_work work;
967         struct io_log *log;
968         void *samples;
969         uint32_t nr_samples;
970         bool free;
971 };
972
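/* Deflated output is collected in 128k chunks */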
973 #define GZ_CHUNK        131072
974
975 static struct iolog_compress *get_new_chunk(unsigned int seq)
976 {
977         struct iolog_compress *c;
978
979         c = malloc(sizeof(*c));
980         INIT_FLIST_HEAD(&c->list);
981         c->buf = malloc(GZ_CHUNK);
982         c->len = 0;
983         c->seq = seq;
984         return c;
985 }
986
987 static void free_chunk(struct iolog_compress *ic)
988 {
989         free(ic->buf);
990         free(ic);
991 }
992
993 static int z_stream_init(z_stream *stream, int gz_hdr)
994 {
995         int wbits = 15;
996
997         memset(stream, 0, sizeof(*stream));
998         stream->zalloc = Z_NULL;
999         stream->zfree = Z_NULL;
1000         stream->opaque = Z_NULL;
1001         stream->next_in = Z_NULL;
1002
1003         /*
1004          * zlib magic - add 32 for auto-detection of gz header or not,
1005          * if we decide to store files in a gzip friendly format.
1006          */
1007         if (gz_hdr)
1008                 wbits += 32;
1009
1010         if (inflateInit2(stream, wbits) != Z_OK)
1011                 return 1;
1012
1013         return 0;
1014 }
1015
1016 struct inflate_chunk_iter {
1017         unsigned int seq;
1018         int err;
1019         void *buf;
1020         size_t buf_size;
1021         size_t buf_used;
1022         size_t chunk_sz;
1023 };
1024
1025 static void finish_chunk(z_stream *stream, FILE *f,
1026                          struct inflate_chunk_iter *iter)
1027 {
1028         int ret;
1029
1030         ret = inflateEnd(stream);
1031         if (ret != Z_OK)
1032                 log_err("fio: failed to end log inflation seq %d (%d)\n",
1033                                 iter->seq, ret);
1034
1035         flush_samples(f, iter->buf, iter->buf_used);
1036         free(iter->buf);
1037         iter->buf = NULL;
1038         iter->buf_size = iter->buf_used = 0;
1039 }
1040
/*
 * Iterative chunk inflation. Handles cases where we cross into a new
 * sequence, flushing and finishing the previous chunk if needed.
 */
1045 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
1046                             z_stream *stream, struct inflate_chunk_iter *iter)
1047 {
1048         size_t ret;
1049
1050         dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
1051                                 (unsigned long) ic->len, ic->seq);
1052
1053         if (ic->seq != iter->seq) {
1054                 if (iter->seq)
1055                         finish_chunk(stream, f, iter);
1056
1057                 z_stream_init(stream, gz_hdr);
1058                 iter->seq = ic->seq;
1059         }
1060
1061         stream->avail_in = ic->len;
1062         stream->next_in = ic->buf;
1063
1064         if (!iter->buf_size) {
1065                 iter->buf_size = iter->chunk_sz;
1066                 iter->buf = malloc(iter->buf_size);
1067         }
1068
1069         while (stream->avail_in) {
1070                 size_t this_out = iter->buf_size - iter->buf_used;
1071                 int err;
1072
1073                 stream->avail_out = this_out;
1074                 stream->next_out = iter->buf + iter->buf_used;
1075
1076                 err = inflate(stream, Z_NO_FLUSH);
1077                 if (err < 0) {
1078                         log_err("fio: failed inflating log: %d\n", err);
1079                         iter->err = err;
1080                         break;
1081                 }
1082
1083                 iter->buf_used += this_out - stream->avail_out;
1084
1085                 if (!stream->avail_out) {
1086                         iter->buf_size += iter->chunk_sz;
1087                         iter->buf = realloc(iter->buf, iter->buf_size);
1088                         continue;
1089                 }
1090
1091                 if (err == Z_STREAM_END)
1092                         break;
1093         }
1094
1095         ret = (void *) stream->next_in - ic->buf;
1096
1097         dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
1098
1099         return ret;
1100 }
1101
1102 /*
1103  * Inflate stored compressed chunks, or write them directly to the log
1104  * file if so instructed.
1105  */
1106 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1107 {
1108         struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
1109         z_stream stream;
1110
1111         while (!flist_empty(&log->chunk_list)) {
1112                 struct iolog_compress *ic;
1113
1114                 ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
1115                 flist_del(&ic->list);
1116
1117                 if (log->log_gz_store) {
1118                         size_t ret;
1119
1120                         dprint(FD_COMPRESS, "log write chunk size=%lu, "
1121                                 "seq=%u\n", (unsigned long) ic->len, ic->seq);
1122
1123                         ret = fwrite(ic->buf, ic->len, 1, f);
1124                         if (ret != 1 || ferror(f)) {
1125                                 iter.err = errno;
1126                                 log_err("fio: error writing compressed log\n");
1127                         }
1128                 } else
1129                         inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
1130
1131                 free_chunk(ic);
1132         }
1133
1134         if (iter.seq) {
1135                 finish_chunk(&stream, f, &iter);
1136                 free(iter.buf);
1137         }
1138
1139         return iter.err;
1140 }
1141
1142 /*
1143  * Open compressed log file and decompress the stored chunks and
1144  * write them to stdout. The chunks are stored sequentially in the
1145  * file, so we iterate over them and do them one-by-one.
1146  */
1147 int iolog_file_inflate(const char *file)
1148 {
1149         struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
1150         struct iolog_compress ic;
1151         z_stream stream;
1152         struct stat sb;
1153         size_t ret;
1154         size_t total;
1155         void *buf;
1156         FILE *f;
1157
1158         f = fopen(file, "r");
1159         if (!f) {
1160                 perror("fopen");
1161                 return 1;
1162         }
1163
1164         if (stat(file, &sb) < 0) {
1165                 fclose(f);
1166                 perror("stat");
1167                 return 1;
1168         }
1169
1170         ic.buf = buf = malloc(sb.st_size);
1171         ic.len = sb.st_size;
1172         ic.seq = 1;
1173
1174         ret = fread(ic.buf, ic.len, 1, f);
1175         if (ret == 0 && ferror(f)) {
1176                 perror("fread");
1177                 fclose(f);
1178                 free(buf);
1179                 return 1;
1180         } else if (ferror(f) || (!feof(f) && ret != 1)) {
1181                 log_err("fio: short read on reading log\n");
1182                 fclose(f);
1183                 free(buf);
1184                 return 1;
1185         }
1186
1187         fclose(f);
1188
1189         /*
1190          * Each chunk will return Z_STREAM_END. We don't know how many
1191          * chunks are in the file, so we just keep looping and incrementing
1192          * the sequence number until we have consumed the whole compressed
1193          * file.
1194          */
1195         total = ic.len;
1196         do {
1197                 size_t iret;
1198
		iret = inflate_chunk(&ic, 1, stdout, &stream, &iter);
1200                 total -= iret;
1201                 if (!total)
1202                         break;
1203                 if (iter.err)
1204                         break;
1205
1206                 ic.seq++;
1207                 ic.len -= iret;
1208                 ic.buf += iret;
1209         } while (1);
1210
1211         if (iter.seq) {
1212                 finish_chunk(&stream, stdout, &iter);
1213                 free(iter.buf);
1214         }
1215
1216         free(buf);
1217         return iter.err;
1218 }
1219
1220 #else
1221
1222 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1223 {
1224         return 0;
1225 }
1226
1227 int iolog_file_inflate(const char *file)
1228 {
1229         log_err("fio: log inflation not possible without zlib\n");
1230         return 1;
1231 }
1232
1233 #endif
1234
1235 void flush_log(struct io_log *log, bool do_append)
1236 {
1237         void *buf;
1238         FILE *f;
1239
1240         if (!do_append)
1241                 f = fopen(log->filename, "w");
1242         else
1243                 f = fopen(log->filename, "a");
1244         if (!f) {
1245                 perror("fopen log");
1246                 return;
1247         }
1248
1249         buf = set_file_buffer(f);
1250
1251         inflate_gz_chunks(log, f);
1252
1253         while (!flist_empty(&log->io_logs)) {
1254                 struct io_logs *cur_log;
1255
1256                 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1257                 flist_del_init(&cur_log->list);
1258                 
1259                 if (log->td && log == log->td->clat_hist_log)
1260                         flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1261                                            log_sample_sz(log, cur_log));
1262                 else
1263                         flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1264                 
1265                 sfree(cur_log);
1266         }
1267
1268         fclose(f);
1269         clear_file_buffer(buf);
1270 }
1271
1272 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1273 {
1274         if (td->flags & TD_F_COMPRESS_LOG)
1275                 iolog_flush(log);
1276
1277         if (trylock) {
1278                 if (fio_trylock_file(log->filename))
1279                         return 1;
1280         } else
1281                 fio_lock_file(log->filename);
1282
1283         if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1284                 fio_send_iolog(td, log, log->filename);
1285         else
1286                 flush_log(log, !td->o.per_job_logs);
1287
1288         fio_unlock_file(log->filename);
1289         free_log(log);
1290         return 0;
1291 }
1292
1293 size_t log_chunk_sizes(struct io_log *log)
1294 {
1295         struct flist_head *entry;
1296         size_t ret;
1297
1298         if (flist_empty(&log->chunk_list))
1299                 return 0;
1300
1301         ret = 0;
1302         pthread_mutex_lock(&log->chunk_lock);
1303         flist_for_each(entry, &log->chunk_list) {
1304                 struct iolog_compress *c;
1305
1306                 c = flist_entry(entry, struct iolog_compress, list);
1307                 ret += c->len;
1308         }
1309         pthread_mutex_unlock(&log->chunk_lock);
1310         return ret;
1311 }
1312
1313 #ifdef CONFIG_ZLIB
1314
1315 static void iolog_put_deferred(struct io_log *log, void *ptr)
1316 {
1317         if (!ptr)
1318                 return;
1319
1320         pthread_mutex_lock(&log->deferred_free_lock);
1321         if (log->deferred < IOLOG_MAX_DEFER) {
1322                 log->deferred_items[log->deferred] = ptr;
1323                 log->deferred++;
1324         } else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
1325                 log_err("fio: had to drop log entry free\n");
1326         pthread_mutex_unlock(&log->deferred_free_lock);
1327 }
1328
1329 static void iolog_free_deferred(struct io_log *log)
1330 {
1331         int i;
1332
1333         if (!log->deferred)
1334                 return;
1335
1336         pthread_mutex_lock(&log->deferred_free_lock);
1337
1338         for (i = 0; i < log->deferred; i++) {
1339                 free(log->deferred_items[i]);
1340                 log->deferred_items[i] = NULL;
1341         }
1342
1343         log->deferred = 0;
1344         pthread_mutex_unlock(&log->deferred_free_lock);
1345 }
1346
1347 static int gz_work(struct iolog_flush_data *data)
1348 {
1349         struct iolog_compress *c = NULL;
1350         struct flist_head list;
1351         unsigned int seq;
1352         z_stream stream;
1353         size_t total = 0;
1354         int ret;
1355
1356         INIT_FLIST_HEAD(&list);
1357
1358         memset(&stream, 0, sizeof(stream));
1359         stream.zalloc = Z_NULL;
1360         stream.zfree = Z_NULL;
1361         stream.opaque = Z_NULL;
1362
1363         ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1364         if (ret != Z_OK) {
1365                 log_err("fio: failed to init gz stream\n");
1366                 goto err;
1367         }
1368
1369         seq = ++data->log->chunk_seq;
1370
1371         stream.next_in = (void *) data->samples;
1372         stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1373
1374         dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
1375                                 (unsigned long) stream.avail_in, seq,
1376                                 data->log->filename);
1377         do {
1378                 if (c)
1379                         dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1380                                 (unsigned long) c->len);
1381                 c = get_new_chunk(seq);
1382                 stream.avail_out = GZ_CHUNK;
1383                 stream.next_out = c->buf;
1384                 ret = deflate(&stream, Z_NO_FLUSH);
1385                 if (ret < 0) {
1386                         log_err("fio: deflate log (%d)\n", ret);
1387                         free_chunk(c);
1388                         goto err;
1389                 }
1390
1391                 c->len = GZ_CHUNK - stream.avail_out;
1392                 flist_add_tail(&c->list, &list);
1393                 total += c->len;
1394         } while (stream.avail_in);
1395
1396         stream.next_out = c->buf + c->len;
1397         stream.avail_out = GZ_CHUNK - c->len;
1398
1399         ret = deflate(&stream, Z_FINISH);
1400         if (ret < 0) {
1401                 /*
1402                  * Z_BUF_ERROR is special, it just means we need more
1403                  * output space. We'll handle that below. Treat any other
1404                  * error as fatal.
1405                  */
1406                 if (ret != Z_BUF_ERROR) {
1407                         log_err("fio: deflate log (%d)\n", ret);
1408                         flist_del(&c->list);
1409                         free_chunk(c);
1410                         goto err;
1411                 }
1412         }
1413
1414         total -= c->len;
1415         c->len = GZ_CHUNK - stream.avail_out;
1416         total += c->len;
1417         dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
1418
1419         if (ret != Z_STREAM_END) {
1420                 do {
1421                         c = get_new_chunk(seq);
1422                         stream.avail_out = GZ_CHUNK;
1423                         stream.next_out = c->buf;
1424                         ret = deflate(&stream, Z_FINISH);
1425                         c->len = GZ_CHUNK - stream.avail_out;
1426                         total += c->len;
1427                         flist_add_tail(&c->list, &list);
1428                         dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1429                                 (unsigned long) c->len);
1430                 } while (ret != Z_STREAM_END);
1431         }
1432
1433         dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1434
1435         ret = deflateEnd(&stream);
1436         if (ret != Z_OK)
1437                 log_err("fio: deflateEnd %d\n", ret);
1438
1439         iolog_put_deferred(data->log, data->samples);
1440
1441         if (!flist_empty(&list)) {
1442                 pthread_mutex_lock(&data->log->chunk_lock);
1443                 flist_splice_tail(&list, &data->log->chunk_list);
1444                 pthread_mutex_unlock(&data->log->chunk_lock);
1445         }
1446
1447         ret = 0;
1448 done:
1449         if (data->free)
1450                 sfree(data);
1451         return ret;
1452 err:
1453         while (!flist_empty(&list)) {
		c = flist_first_entry(&list, struct iolog_compress, list);
1455                 flist_del(&c->list);
1456                 free_chunk(c);
1457         }
1458         ret = 1;
1459         goto done;
1460 }
1461
1462 /*
1463  * Invoked from our compress helper thread, when logging would have exceeded
1464  * the specified memory limitation. Compresses the previously stored
1465  * entries.
1466  */
1467 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1468 {
1469         return gz_work(container_of(work, struct iolog_flush_data, work));
1470 }
1471
1472 static int gz_init_worker(struct submit_worker *sw)
1473 {
1474         struct thread_data *td = sw->wq->td;
1475
1476         if (!fio_option_is_set(&td->o, log_gz_cpumask))
1477                 return 0;
1478
1479         if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1480                 log_err("gz: failed to set CPU affinity\n");
1481                 return 1;
1482         }
1483
1484         return 0;
1485 }
1486
1487 static struct workqueue_ops log_compress_wq_ops = {
1488         .fn             = gz_work_async,
1489         .init_worker_fn = gz_init_worker,
1490         .nice           = 1,
1491 };
1492
1493 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1494 {
1495         if (!(td->flags & TD_F_COMPRESS_LOG))
1496                 return 0;
1497
1498         workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
1499         return 0;
1500 }
1501
1502 void iolog_compress_exit(struct thread_data *td)
1503 {
1504         if (!(td->flags & TD_F_COMPRESS_LOG))
1505                 return;
1506
1507         workqueue_exit(&td->log_compress_wq);
1508 }
1509
/*
 * Drain and compress all pending log entries. Each stored io_logs chunk
 * is handed to the compressor and then freed; compression runs
 * synchronously here, so this does not return until everything has been
 * compressed.
 */
1518 static int iolog_flush(struct io_log *log)
1519 {
1520         struct iolog_flush_data *data;
1521
1522         data = malloc(sizeof(*data));
1523         if (!data)
1524                 return 1;
1525
1526         data->log = log;
1527         data->free = false;
1528
1529         while (!flist_empty(&log->io_logs)) {
1530                 struct io_logs *cur_log;
1531
1532                 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1533                 flist_del_init(&cur_log->list);
1534
1535                 data->samples = cur_log->log;
1536                 data->nr_samples = cur_log->nr_samples;
1537
1538                 sfree(cur_log);
1539
1540                 gz_work(data);
1541         }
1542
1543         free(data);
1544         return 0;
1545 }
1546
1547 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1548 {
1549         struct iolog_flush_data *data;
1550
1551         data = smalloc(sizeof(*data));
1552         if (!data)
1553                 return 1;
1554
1555         data->log = log;
1556
1557         data->samples = cur_log->log;
1558         data->nr_samples = cur_log->nr_samples;
1559         data->free = true;
1560
1561         cur_log->nr_samples = cur_log->max_samples = 0;
1562         cur_log->log = NULL;
1563
1564         workqueue_enqueue(&log->td->log_compress_wq, &data->work);
1565
1566         iolog_free_deferred(log);
1567
1568         return 0;
1569 }
1570 #else
1571
1572 static int iolog_flush(struct io_log *log)
1573 {
1574         return 1;
1575 }
1576
1577 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1578 {
1579         return 1;
1580 }
1581
1582 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1583 {
1584         return 0;
1585 }
1586
1587 void iolog_compress_exit(struct thread_data *td)
1588 {
1589 }
1590
1591 #endif
1592
1593 struct io_logs *iolog_cur_log(struct io_log *log)
1594 {
1595         if (flist_empty(&log->io_logs))
1596                 return NULL;
1597
1598         return flist_last_entry(&log->io_logs, struct io_logs, list);
1599 }
1600
1601 uint64_t iolog_nr_samples(struct io_log *iolog)
1602 {
1603         struct flist_head *entry;
1604         uint64_t ret = 0;
1605
1606         flist_for_each(entry, &iolog->io_logs) {
1607                 struct io_logs *cur_log;
1608
1609                 cur_log = flist_entry(entry, struct io_logs, list);
1610                 ret += cur_log->nr_samples;
1611         }
1612
1613         return ret;
1614 }
1615
1616 static int __write_log(struct thread_data *td, struct io_log *log, int try)
1617 {
1618         if (log)
1619                 return finish_log(td, log, try);
1620
1621         return 0;
1622 }
1623
1624 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1625 {
1626         int ret;
1627
1628         if (per_unit_log(td->iops_log) != unit_log)
1629                 return 0;
1630
1631         ret = __write_log(td, td->iops_log, try);
1632         if (!ret)
1633                 td->iops_log = NULL;
1634
1635         return ret;
1636 }
1637
1638 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1639 {
1640         int ret;
1641
1642         if (!unit_log)
1643                 return 0;
1644
1645         ret = __write_log(td, td->slat_log, try);
1646         if (!ret)
1647                 td->slat_log = NULL;
1648
1649         return ret;
1650 }
1651
1652 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1653 {
1654         int ret;
1655
1656         if (!unit_log)
1657                 return 0;
1658
1659         ret = __write_log(td, td->clat_log, try);
1660         if (!ret)
1661                 td->clat_log = NULL;
1662
1663         return ret;
1664 }
1665
1666 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1667 {
1668         int ret;
1669
1670         if (!unit_log)
1671                 return 0;
1672
1673         ret = __write_log(td, td->clat_hist_log, try);
1674         if (!ret)
1675                 td->clat_hist_log = NULL;
1676
1677         return ret;
1678 }
1679
1680 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1681 {
1682         int ret;
1683
1684         if (!unit_log)
1685                 return 0;
1686
1687         ret = __write_log(td, td->lat_log, try);
1688         if (!ret)
1689                 td->lat_log = NULL;
1690
1691         return ret;
1692 }
1693
1694 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1695 {
1696         int ret;
1697
1698         if (per_unit_log(td->bw_log) != unit_log)
1699                 return 0;
1700
1701         ret = __write_log(td, td->bw_log, try);
1702         if (!ret)
1703                 td->bw_log = NULL;
1704
1705         return ret;
1706 }
1707
1708 enum {
1709         BW_LOG_MASK     = 1,
1710         LAT_LOG_MASK    = 2,
1711         SLAT_LOG_MASK   = 4,
1712         CLAT_LOG_MASK   = 8,
1713         IOPS_LOG_MASK   = 16,
1714         CLAT_HIST_LOG_MASK = 32,
1715
1716         ALL_LOG_NR      = 6,
1717 };
1718
1719 struct log_type {
1720         unsigned int mask;
1721         int (*fn)(struct thread_data *, int, bool);
1722 };
1723
1724 static struct log_type log_types[] = {
1725         {
1726                 .mask   = BW_LOG_MASK,
1727                 .fn     = write_bandw_log,
1728         },
1729         {
1730                 .mask   = LAT_LOG_MASK,
1731                 .fn     = write_lat_log,
1732         },
1733         {
1734                 .mask   = SLAT_LOG_MASK,
1735                 .fn     = write_slat_log,
1736         },
1737         {
1738                 .mask   = CLAT_LOG_MASK,
1739                 .fn     = write_clat_log,
1740         },
1741         {
1742                 .mask   = IOPS_LOG_MASK,
1743                 .fn     = write_iops_log,
1744         },
1745         {
1746                 .mask   = CLAT_HIST_LOG_MASK,
1747                 .fn     = write_clat_hist_log,
1748         }
1749 };
1750
1751 void td_writeout_logs(struct thread_data *td, bool unit_logs)
1752 {
1753         unsigned int log_mask = 0;
1754         unsigned int log_left = ALL_LOG_NR;
1755         int old_state, i;
1756
1757         old_state = td_bump_runstate(td, TD_FINISHING);
1758
1759         finalize_logs(td, unit_logs);
1760
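	/*
	 * All but the last outstanding log are flushed with a trylock, so
	 * one busy log file doesn't stall the rest; if a full pass makes
	 * no progress, back off briefly and retry.
	 */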
1761         while (log_left) {
1762                 int prev_log_left = log_left;
1763
1764                 for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1765                         struct log_type *lt = &log_types[i];
1766                         int ret;
1767
1768                         if (!(log_mask & lt->mask)) {
1769                                 ret = lt->fn(td, log_left != 1, unit_logs);
1770                                 if (!ret) {
1771                                         log_left--;
1772                                         log_mask |= lt->mask;
1773                                 }
1774                         }
1775                 }
1776
1777                 if (prev_log_left == log_left)
1778                         usleep(5000);
1779         }
1780
1781         td_restore_runstate(td, old_state);
1782 }
1783
1784 void fio_writeout_logs(bool unit_logs)
1785 {
1786         struct thread_data *td;
1787         int i;
1788
1789         for_each_td(td, i)
1790                 td_writeout_logs(td, unit_logs);
1791 }