Merge branch 'cifuzz-integration' of https://github.com/DavidKorczynski/fio
[fio.git] / iolog.c
1 /*
2  * Code related to writing an iolog of what a thread is doing, and to
3  * later read that back and replay
4  */
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <assert.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <unistd.h>
11 #ifdef CONFIG_ZLIB
12 #include <zlib.h>
13 #endif
14
15 #include "flist.h"
16 #include "fio.h"
17 #include "trim.h"
18 #include "filelock.h"
19 #include "smalloc.h"
20 #include "blktrace.h"
21 #include "pshared.h"
22 #include "lib/roundup.h"
23
24 #include <netinet/in.h>
25 #include <netinet/tcp.h>
26 #include <arpa/inet.h>
27 #include <sys/stat.h>
28 #include <sys/socket.h>
29 #include <sys/un.h>
30
/* Forward declaration; defined later in this file */
static int iolog_flush(struct io_log *log);

/* Magic first line that identifies a version 2 iolog file */
static const char iolog_ver2[] = "fio version 2 iolog";
34
35 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
36 {
37         flist_add_tail(&ipo->list, &td->io_log_list);
38         td->total_io_size += ipo->len;
39 }
40
41 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
42 {
43         if (!td->o.write_iolog_file)
44                 return;
45
46         fprintf(td->iolog_f, "%s %s %llu %llu\n", io_u->file->file_name,
47                                                 io_ddir_name(io_u->ddir),
48                                                 io_u->offset, io_u->buflen);
49 }
50
51 void log_file(struct thread_data *td, struct fio_file *f,
52               enum file_log_act what)
53 {
54         const char *act[] = { "add", "open", "close" };
55
56         assert(what < 3);
57
58         if (!td->o.write_iolog_file)
59                 return;
60
61
62         /*
63          * this happens on the pre-open/close done before the job starts
64          */
65         if (!td->iolog_f)
66                 return;
67
68         fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
69 }
70
/*
 * Stall for 'delay' usecs when replaying an iolog, compensating for time
 * already spent since the last issue, and carrying any oversleep forward
 * in td->time_offset so that long replays do not drift.
 */
static void iolog_delay(struct thread_data *td, unsigned long delay)
{
	uint64_t usec = utime_since_now(&td->last_issue);
	unsigned long orig_delay = delay;
	uint64_t this_delay;
	struct timespec ts;

	/* earlier sleeps already overshot by more than this delay */
	if (delay < td->time_offset) {
		td->time_offset = 0;
		return;
	}

	delay -= td->time_offset;
	/* time elapsed since the last issue already covers the delay */
	if (delay < usec)
		return;

	delay -= usec;

	fio_gettime(&ts, NULL);
	/* sleep in <=500ms slices so job termination is noticed promptly */
	while (delay && !td->terminate) {
		this_delay = delay;
		if (this_delay > 500000)
			this_delay = 500000;

		usec_sleep(td, this_delay);
		delay -= this_delay;
	}

	/* remember how much we overslept, to shorten the next delay */
	usec = utime_since_now(&ts);
	if (usec > orig_delay)
		td->time_offset = usec - orig_delay;
	else
		td->time_offset = 0;
}
105
/*
 * Handle a "special" (file action) io_piece from a replayed log.
 * Returns 0 if the ipo is a regular I/O entry, 1 if it was consumed
 * here as a file action, and -1 if opening the file failed.
 */
static int ipo_special(struct thread_data *td, struct io_piece *ipo)
{
	struct fio_file *f;
	int ret;

	/*
	 * Not a special ipo
	 */
	if (ipo->ddir != DDIR_INVAL)
		return 0;

	f = td->files[ipo->fileno];

	switch (ipo->file_action) {
	case FIO_LOG_OPEN_FILE:
		/* with replay_redirect all entries map to one file; skip re-opens */
		if (td->o.replay_redirect && fio_file_open(f)) {
			dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
					f->file_name);
			break;
		}
		ret = td_io_open_file(td, f);
		if (!ret)
			break;
		td_verror(td, ret, "iolog open file");
		return -1;
	case FIO_LOG_CLOSE_FILE:
		td_io_close_file(td, f);
		break;
	case FIO_LOG_UNLINK_FILE:
		td_io_unlink_file(td, f);
		break;
	default:
		log_err("fio: bad file action %d\n", ipo->file_action);
		break;
	}

	return 1;
}
144
145 static bool read_iolog2(struct thread_data *td);
146
/*
 * Pop the next replay entry off the io_log_list and fill in io_u with it.
 * File actions are consumed internally, WAIT entries sleep here. In chunked
 * mode, refill the list from the log source when the low-water mark is hit.
 * Returns 0 when io_u holds a real I/O, 1 when the replay is done.
 */
int read_iolog_get(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo;
	unsigned long elapsed;

	while (!flist_empty(&td->io_log_list)) {
		int ret;

		if (td->o.read_iolog_chunked) {
			/* hit the check mark: fetch the next batch of entries */
			if (td->io_log_checkmark == td->io_log_current) {
				if (td->io_log_blktrace) {
					if (!read_blktrace(td))
						return 1;
				} else {
					if (!read_iolog2(td))
						return 1;
				}
			}
			td->io_log_current--;
		}
		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);

		/* <0: open failed, >0: file action consumed, 0: real I/O */
		ret = ipo_special(td, ipo);
		if (ret < 0) {
			free(ipo);
			break;
		} else if (ret > 0) {
			free(ipo);
			continue;
		}

		io_u->ddir = ipo->ddir;
		if (ipo->ddir != DDIR_WAIT) {
			io_u->offset = ipo->offset;
			io_u->verify_offset = ipo->offset;
			io_u->buflen = ipo->len;
			io_u->file = td->files[ipo->fileno];
			get_file(io_u->file);
			dprint(FD_IO, "iolog: get %llu/%llu/%s\n", io_u->offset,
						io_u->buflen, io_u->file->file_name);
			if (ipo->delay)
				iolog_delay(td, ipo->delay);
		} else {
			/* WAIT entry: delay is an absolute msec-since-genesis mark */
			elapsed = mtime_since_genesis();
			if (ipo->delay > elapsed)
				usec_sleep(td, (ipo->delay - elapsed) * 1000);
		}

		free(ipo);

		if (io_u->ddir != DDIR_WAIT)
			return 0;
	}

	/* list drained, replay is complete */
	td->done = 1;
	return 1;
}
206
207 void prune_io_piece_log(struct thread_data *td)
208 {
209         struct io_piece *ipo;
210         struct fio_rb_node *n;
211
212         while ((n = rb_first(&td->io_hist_tree)) != NULL) {
213                 ipo = rb_entry(n, struct io_piece, rb_node);
214                 rb_erase(n, &td->io_hist_tree);
215                 remove_trim_entry(td, ipo);
216                 td->io_hist_len--;
217                 free(ipo);
218         }
219
220         while (!flist_empty(&td->io_hist_list)) {
221                 ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
222                 flist_del(&ipo->list);
223                 remove_trim_entry(td, ipo);
224                 td->io_hist_len--;
225                 free(ipo);
226         }
227 }
228
229 /*
230  * log a successful write, so we can unwind the log for verify
231  */
/*
 * log a successful write, so we can unwind the log for verify
 */
void log_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct fio_rb_node **p, *parent;
	struct io_piece *ipo, *__ipo;

	ipo = calloc(1, sizeof(struct io_piece));
	init_ipo(ipo);
	ipo->file = io_u->file;
	ipo->offset = io_u->offset;
	ipo->len = io_u->buflen;
	ipo->numberio = io_u->numberio;
	ipo->flags = IP_F_IN_FLIGHT;

	io_u->ipo = ipo;

	if (io_u_should_trim(td, io_u)) {
		flist_add_tail(&ipo->trim_list, &td->trim_list);
		td->trim_entries++;
	}

	/*
	 * Only sort writes if we don't have a random map in which case we need
	 * to check for duplicate blocks and drop the old one, which we rely on
	 * the rb insert/lookup for handling.
	 */
	if (file_randommap(td, ipo->file)) {
		INIT_FLIST_HEAD(&ipo->list);
		flist_add_tail(&ipo->list, &td->io_hist_list);
		ipo->flags |= IP_F_ONLIST;
		td->io_hist_len++;
		return;
	}

	RB_CLEAR_NODE(&ipo->rb_node);

	/*
	 * Sort the entry into the verification list
	 */
restart:
	p = &td->io_hist_tree.rb_node;
	parent = NULL;
	while (*p) {
		int overlap = 0;
		parent = *p;

		/* order by file pointer first, then by offset within the file */
		__ipo = rb_entry(parent, struct io_piece, rb_node);
		if (ipo->file < __ipo->file)
			p = &(*p)->rb_left;
		else if (ipo->file > __ipo->file)
			p = &(*p)->rb_right;
		else if (ipo->offset < __ipo->offset) {
			p = &(*p)->rb_left;
			overlap = ipo->offset + ipo->len > __ipo->offset;
		}
		else if (ipo->offset > __ipo->offset) {
			p = &(*p)->rb_right;
			overlap = __ipo->offset + __ipo->len > ipo->offset;
		}
		else
			overlap = 1;

		if (overlap) {
			/*
			 * New write overlaps an existing entry: the old data is
			 * stale for verify, so evict it and redo the descent.
			 * In-flight entries are freed by their completion path.
			 */
			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
				__ipo->offset, __ipo->len,
				ipo->offset, ipo->len);
			td->io_hist_len--;
			rb_erase(parent, &td->io_hist_tree);
			remove_trim_entry(td, __ipo);
			if (!(__ipo->flags & IP_F_IN_FLIGHT))
				free(__ipo);
			goto restart;
		}
	}

	rb_link_node(&ipo->rb_node, parent, p);
	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
	ipo->flags |= IP_F_ONRB;
	td->io_hist_len++;
}
311
/*
 * Undo a previously logged write (e.g. the I/O failed): record the
 * failure in the block info map, then remove and free the io_piece
 * from whichever verify structure (rb-tree or list) it is on.
 */
void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (td->ts.nr_block_infos) {
		uint32_t *info = io_u_block_info(td, io_u);
		/* only escalate the state, never downgrade past a failure */
		if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
			if (io_u->ddir == DDIR_TRIM)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_TRIM_FAILURE);
			else if (io_u->ddir == DDIR_WRITE)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_WRITE_FAILURE);
		}
	}

	if (!ipo)
		return;

	if (ipo->flags & IP_F_ONRB)
		rb_erase(&ipo->rb_node, &td->io_hist_tree);
	else if (ipo->flags & IP_F_ONLIST)
		flist_del(&ipo->list);

	free(ipo);
	io_u->ipo = NULL;
	td->io_hist_len--;
}
340
341 void trim_io_piece(const struct io_u *io_u)
342 {
343         struct io_piece *ipo = io_u->ipo;
344
345         if (!ipo)
346                 return;
347
348         ipo->len = io_u->xfer_buflen - io_u->resid;
349 }
350
351 void write_iolog_close(struct thread_data *td)
352 {
353         if (!td->iolog_f)
354                 return;
355
356         fflush(td->iolog_f);
357         fclose(td->iolog_f);
358         free(td->iolog_buf);
359         td->iolog_f = NULL;
360         td->iolog_buf = NULL;
361 }
362
363 int64_t iolog_items_to_fetch(struct thread_data *td)
364 {
365         struct timespec now;
366         uint64_t elapsed;
367         uint64_t for_1s;
368         int64_t items_to_fetch;
369
370         if (!td->io_log_highmark)
371                 return 10;
372
373
374         fio_gettime(&now, NULL);
375         elapsed = ntime_since(&td->io_log_highmark_time, &now);
376         if (elapsed) {
377                 for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
378                 items_to_fetch = for_1s - td->io_log_current;
379                 if (items_to_fetch < 0)
380                         items_to_fetch = 0;
381         } else
382                 items_to_fetch = 0;
383
384         td->io_log_highmark = td->io_log_current + items_to_fetch;
385         td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
386         fio_gettime(&td->io_log_highmark_time, NULL);
387
388         return items_to_fetch;
389 }
390
391 /*
392  * Read version 2 iolog data. It is enhanced to include per-file logging,
393  * syncs, etc.
394  */
/*
 * Read version 2 iolog data. It is enhanced to include per-file logging,
 * syncs, etc.
 */
static bool read_iolog2(struct thread_data *td)
{
	unsigned long long offset;
	unsigned int bytes;
	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
	char *rfname, *fname, *act;
	char *str, *p;
	enum fio_ddir rw;
	bool realloc = false;
	int64_t items_to_fetch = 0;

	/* in chunked mode, only parse the next batch of entries */
	if (td->o.read_iolog_chunked) {
		items_to_fetch = iolog_items_to_fetch(td);
		if (!items_to_fetch)
			return true;
	}

	/*
	 * Read in the read iolog and store it, reuse the infrastructure
	 * for doing verifications.
	 */
	str = malloc(4096);
	rfname = fname = malloc(256+16);
	act = malloc(256+16);

	reads = writes = waits = 0;
	while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
		struct io_piece *ipo;
		int r;

		/* line is either "fname action offset bytes" or "fname action" */
		r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset,
									&bytes);

		/* replay_redirect maps every logged file onto one target */
		if (td->o.replay_redirect)
			fname = td->o.replay_redirect;

		if (r == 4) {
			/*
			 * Check action first
			 */
			if (!strcmp(act, "wait"))
				rw = DDIR_WAIT;
			else if (!strcmp(act, "read"))
				rw = DDIR_READ;
			else if (!strcmp(act, "write"))
				rw = DDIR_WRITE;
			else if (!strcmp(act, "sync"))
				rw = DDIR_SYNC;
			else if (!strcmp(act, "datasync"))
				rw = DDIR_DATASYNC;
			else if (!strcmp(act, "trim"))
				rw = DDIR_TRIM;
			else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
			fileno = get_fileno(td, fname);
		} else if (r == 2) {
			/* two-token lines are file management actions */
			rw = DDIR_INVAL;
			if (!strcmp(act, "add")) {
				if (td->o.replay_redirect &&
				    get_fileno(td, fname) != -1) {
					dprint(FD_FILE, "iolog: ignoring"
						" re-add of file %s\n", fname);
				} else {
					fileno = add_file(td, fname, td->subjob_number, 1);
					file_action = FIO_LOG_ADD_FILE;
				}
				continue;
			} else if (!strcmp(act, "open")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_OPEN_FILE;
			} else if (!strcmp(act, "close")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_CLOSE_FILE;
			} else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
		} else {
			log_err("bad iolog2: %s\n", p);
			continue;
		}

		if (rw == DDIR_READ)
			reads++;
		else if (rw == DDIR_WRITE) {
			/*
			 * Don't add a write for ro mode
			 */
			if (read_only)
				continue;
			writes++;
		} else if (rw == DDIR_WAIT) {
			if (td->o.no_stall)
				continue;
			waits++;
		} else if (rw == DDIR_INVAL) {
		} else if (!ddir_sync(rw)) {
			log_err("bad ddir: %d\n", rw);
			continue;
		}

		/*
		 * Make note of file
		 */
		ipo = calloc(1, sizeof(*ipo));
		init_ipo(ipo);
		ipo->ddir = rw;
		if (rw == DDIR_WAIT) {
			/* WAIT entries reuse the offset field as the delay */
			ipo->delay = offset;
		} else {
			if (td->o.replay_scale)
				ipo->offset = offset / td->o.replay_scale;
			else
				ipo->offset = offset;
			ipo_bytes_align(td->o.replay_align, ipo);

			ipo->len = bytes;
			/* grow max_bs if the log has bigger I/O than configured */
			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw]) {
				realloc = true;
				td->o.max_bs[rw] = bytes;
			}
			ipo->fileno = fileno;
			ipo->file_action = file_action;
			td->o.size += bytes;
		}

		queue_io_piece(td, ipo);

		if (td->o.read_iolog_chunked) {
			td->io_log_current++;
			items_to_fetch--;
			if (items_to_fetch == 0)
				break;
		}
	}

	free(str);
	free(act);
	free(rfname);

	if (td->o.read_iolog_chunked) {
		td->io_log_highmark = td->io_log_current;
		td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
		fio_gettime(&td->io_log_highmark_time, NULL);
	}

	if (writes && read_only) {
		log_err("fio: <%s> skips replay of %d writes due to"
			" read-only\n", td->o.name, writes);
		writes = 0;
	}

	if (td->o.read_iolog_chunked) {
		if (td->io_log_current == 0) {
			return false;
		}
		td->o.td_ddir = TD_DDIR_RW;
		/* max_bs grew: re-allocate the I/O buffers to fit */
		if (realloc && td->orig_buffer)
		{
			io_u_quiesce(td);
			free_io_mem(td);
			init_io_u_buffers(td);
		}
		return true;
	}

	/* derive the job's data direction from what the log contained */
	if (!reads && !writes && !waits)
		return false;
	else if (reads && !writes)
		td->o.td_ddir = TD_DDIR_READ;
	else if (!reads && writes)
		td->o.td_ddir = TD_DDIR_WRITE;
	else
		td->o.td_ddir = TD_DDIR_RW;

	return true;
}
576
577 static bool is_socket(const char *path)
578 {
579         struct stat buf;
580         int r;
581
582         r = stat(path, &buf);
583         if (r == -1)
584                 return false;
585
586         return S_ISSOCK(buf.st_mode);
587 }
588
/*
 * Connect to the Unix domain socket at 'path'.
 * Returns a connected fd on success, -1 on failure.
 */
static int open_socket(const char *path)
{
	struct sockaddr_un addr;
	int ret, fd;

	fd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (fd < 0)
		return fd;

	addr.sun_family = AF_UNIX;
	if (snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path) >=
	    (int) sizeof(addr.sun_path)) {
		log_err("%s: path name %s is too long for a Unix socket\n",
			__func__, path);
		/*
		 * Bug fix: previously we fell through and connected to the
		 * truncated path, i.e. a different (or nonexistent) socket.
		 * Fail instead.
		 */
		close(fd);
		return -1;
	}

	ret = connect(fd, (const struct sockaddr *)&addr,
		      strlen(path) + sizeof(addr.sun_family));
	if (!ret)
		return fd;

	close(fd);
	return -1;
}
612
613 /*
614  * open iolog, check version, and call appropriate parser
615  */
616 static bool init_iolog_read(struct thread_data *td, char *fname)
617 {
618         char buffer[256], *p;
619         FILE *f = NULL;
620
621         dprint(FD_IO, "iolog: name=%s\n", fname);
622
623         if (is_socket(fname)) {
624                 int fd;
625
626                 fd = open_socket(fname);
627                 if (fd >= 0)
628                         f = fdopen(fd, "r");
629         } else if (!strcmp(fname, "-")) {
630                 f = stdin;
631         } else
632                 f = fopen(fname, "r");
633
634         if (!f) {
635                 perror("fopen read iolog");
636                 return false;
637         }
638
639         p = fgets(buffer, sizeof(buffer), f);
640         if (!p) {
641                 td_verror(td, errno, "iolog read");
642                 log_err("fio: unable to read iolog\n");
643                 fclose(f);
644                 return false;
645         }
646
647         /*
648          * version 2 of the iolog stores a specific string as the
649          * first line, check for that
650          */
651         if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2))) {
652                 free_release_files(td);
653                 td->io_log_rfile = f;
654                 return read_iolog2(td);
655         }
656
657         log_err("fio: iolog version 1 is no longer supported\n");
658         fclose(f);
659         return false;
660 }
661
662 /*
663  * Set up a log for storing io patterns.
664  */
665 static bool init_iolog_write(struct thread_data *td)
666 {
667         struct fio_file *ff;
668         FILE *f;
669         unsigned int i;
670
671         f = fopen(td->o.write_iolog_file, "a");
672         if (!f) {
673                 perror("fopen write iolog");
674                 return false;
675         }
676
677         /*
678          * That's it for writing, setup a log buffer and we're done.
679           */
680         td->iolog_f = f;
681         td->iolog_buf = malloc(8192);
682         setvbuf(f, td->iolog_buf, _IOFBF, 8192);
683
684         /*
685          * write our version line
686          */
687         if (fprintf(f, "%s\n", iolog_ver2) < 0) {
688                 perror("iolog init\n");
689                 return false;
690         }
691
692         /*
693          * add all known files
694          */
695         for_each_file(td, ff, i)
696                 log_file(td, ff, FIO_LOG_ADD_FILE);
697
698         return true;
699 }
700
701 bool init_iolog(struct thread_data *td)
702 {
703         bool ret;
704
705         if (td->o.read_iolog_file) {
706                 int need_swap;
707                 char * fname = get_name_by_idx(td->o.read_iolog_file, td->subjob_number);
708
709                 /*
710                  * Check if it's a blktrace file and load that if possible.
711                  * Otherwise assume it's a normal log file and load that.
712                  */
713                 if (is_blktrace(fname, &need_swap)) {
714                         td->io_log_blktrace = 1;
715                         ret = init_blktrace_read(td, fname, need_swap);
716                 } else {
717                         td->io_log_blktrace = 0;
718                         ret = init_iolog_read(td, fname);
719                 }
720                 free(fname);
721         } else if (td->o.write_iolog_file)
722                 ret = init_iolog_write(td);
723         else
724                 ret = true;
725
726         if (!ret)
727                 td_verror(td, EINVAL, "failed initializing iolog");
728
729         return ret;
730 }
731
/*
 * Allocate and initialize an io_log from the parameters in 'p',
 * storing the result in *log. Sets up per-direction histogram lists,
 * an optional pre-sized pending-samples buffer, the per-sample ddir
 * mask bits, and compression state when log_gz/log_gz_store is set.
 */
void setup_log(struct io_log **log, struct log_params *p,
	       const char *filename)
{
	struct io_log *l;
	int i;
	struct io_u_plat_entry *entry;
	struct flist_head *list;

	l = scalloc(1, sizeof(*l));
	INIT_FLIST_HEAD(&l->io_logs);
	l->log_type = p->log_type;
	l->log_offset = p->log_offset;
	l->log_prio = p->log_prio;
	l->log_gz = p->log_gz;
	l->log_gz_store = p->log_gz_store;
	l->avg_msec = p->avg_msec;
	l->hist_msec = p->hist_msec;
	l->hist_coarseness = p->hist_coarseness;
	l->filename = strdup(filename);
	l->td = p->td;

	/* Initialize histogram lists for each r/w direction,
	 * with initial io_u_plat of all zeros:
	 */
	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
		list = &l->hist_window[i].list;
		INIT_FLIST_HEAD(list);
		entry = calloc(1, sizeof(struct io_u_plat_entry));
		flist_add(&entry->list, list);
	}

	if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
		unsigned int def_samples = DEF_LOG_ENTRIES;
		struct io_logs *__p;

		__p = calloc(1, sizeof(*l->pending));
		/* size the buffer to at least the configured queue depth */
		if (l->td->o.iodepth > DEF_LOG_ENTRIES)
			def_samples = roundup_pow2(l->td->o.iodepth);
		__p->max_samples = def_samples;
		__p->log = calloc(__p->max_samples, log_entry_sz(l));
		l->pending = __p;
	}

	/* mark which optional fields each stored sample carries */
	if (l->log_offset)
		l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
	if (l->log_prio)
		l->log_ddir_mask |= LOG_PRIO_SAMPLE_BIT;

	INIT_FLIST_HEAD(&l->chunk_list);

	/* compression needs a thread_data to run the workqueue against */
	if (l->log_gz && !p->td)
		l->log_gz = 0;
	else if (l->log_gz || l->log_gz_store) {
		mutex_init_pshared(&l->chunk_lock);
		mutex_init_pshared(&l->deferred_free_lock);
		p->td->flags |= TD_F_COMPRESS_LOG;
	}

	*log = l;
}
792
#ifdef CONFIG_SETVBUF
/*
 * Give the stream a large (1MB) fully-buffered stdio buffer to cut
 * down on write syscalls; returns the buffer so the caller can free
 * it with clear_file_buffer() after closing the stream.
 */
static void *set_file_buffer(FILE *f)
{
	size_t size = 1048576;
	void *buf;

	buf = malloc(size);
	setvbuf(f, buf, _IOFBF, size);
	return buf;
}

static void clear_file_buffer(void *buf)
{
	free(buf);
}
#else
/* No setvbuf support: stick with the default stdio buffering */
static void *set_file_buffer(FILE *f)
{
	return NULL;
}

static void clear_file_buffer(void *buf)
{
}
#endif
818
819 void free_log(struct io_log *log)
820 {
821         while (!flist_empty(&log->io_logs)) {
822                 struct io_logs *cur_log;
823
824                 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
825                 flist_del_init(&cur_log->list);
826                 free(cur_log->log);
827                 sfree(cur_log);
828         }
829
830         if (log->pending) {
831                 free(log->pending->log);
832                 free(log->pending);
833                 log->pending = NULL;
834         }
835
836         free(log->pending);
837         free(log->filename);
838         sfree(log);
839 }
840
/*
 * Sum 'stride' consecutive histogram buckets starting at index 'j'.
 * When 'io_u_plat_last' is given, sum the per-bucket deltas against
 * it instead of the raw counts.
 */
uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
		uint64_t *io_u_plat_last)
{
	uint64_t total = 0;
	int idx;

	for (idx = j; idx < j + stride; idx++) {
		if (io_u_plat_last)
			total += io_u_plat[idx] - io_u_plat_last[idx];
		else
			total += io_u_plat[idx];
	}

	return total;
}
857
/*
 * Write histogram samples to 'f' as CSV: time, ddir, bs, then one
 * column per group of 2^hist_coarseness buckets. Each bucket value is
 * the delta against the previous sample's entry, which is unlinked and
 * freed once consumed.
 */
static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
			       uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, j, nr_samples;
	struct io_u_plat_entry *entry, *entry_before;
	uint64_t *io_u_plat;
	uint64_t *io_u_plat_before;

	/* each output column aggregates this many buckets */
	int stride = 1 << hist_coarseness;

	if (!sample_size)
		return;

	/* sample layout (with/without offset) is flagged in the first entry */
	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		entry = s->data.plat_entry;
		io_u_plat = entry->io_u_plat;

		entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
		io_u_plat_before = entry_before->io_u_plat;

		fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
						io_sample_ddir(s), (unsigned long long) s->bs);
		for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
			fprintf(f, "%llu, ", (unsigned long long)
				hist_sum(j, stride, io_u_plat, io_u_plat_before));
		}
		/* last column gets a newline instead of a trailing comma */
		fprintf(f, "%llu\n", (unsigned long long)
			hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
					io_u_plat_before));

		/* the previous entry has served as baseline; drop it */
		flist_del(&entry_before->list);
		free(entry_before);
	}
}
901
/*
 * Write regular log samples to 'f' as CSV. The format string is picked
 * up front based on which optional fields (offset, raw priority) the
 * samples carry, as flagged in the first sample's ddir bits.
 */
void flush_samples(FILE *f, void *samples, uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset, log_prio;
	uint64_t i, nr_samples;
	unsigned int prio_val;
	const char *fmt;

	if (!sample_size)
		return;

	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
	log_prio = (s->__ddir & LOG_PRIO_SAMPLE_BIT) != 0;

	/* choose the output format matching the stored sample layout */
	if (log_offset) {
		if (log_prio)
			fmt = "%lu, %" PRId64 ", %u, %llu, %llu, 0x%04x\n";
		else
			fmt = "%lu, %" PRId64 ", %u, %llu, %llu, %u\n";
	} else {
		if (log_prio)
			fmt = "%lu, %" PRId64 ", %u, %llu, 0x%04x\n";
		else
			fmt = "%lu, %" PRId64 ", %u, %llu, %u\n";
	}

	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {
		s = __get_sample(samples, log_offset, i);

		/* without raw prio logging, emit only the RT-class flag */
		if (log_prio)
			prio_val = s->priority;
		else
			prio_val = ioprio_value_is_class_rt(s->priority);

		if (!log_offset) {
			fprintf(f, fmt,
				(unsigned long) s->time,
				s->data.val,
				io_sample_ddir(s), (unsigned long long) s->bs,
				prio_val);
		} else {
			struct io_sample_offset *so = (void *) s;

			fprintf(f, fmt,
				(unsigned long) s->time,
				s->data.val,
				io_sample_ddir(s), (unsigned long long) s->bs,
				(unsigned long long) so->offset,
				prio_val);
		}
	}
}
957
958 #ifdef CONFIG_ZLIB
959
/*
 * Work item handed to the log compression workqueue: the raw samples to
 * deflate and the log they belong to.
 */
struct iolog_flush_data {
	struct workqueue_work work;	/* workqueue linkage */
	struct io_log *log;		/* log the samples came from */
	void *samples;			/* raw sample buffer to compress */
	uint32_t nr_samples;		/* number of samples in the buffer */
	bool free;			/* sfree() this struct when done */
};
967
968 #define GZ_CHUNK        131072
969
970 static struct iolog_compress *get_new_chunk(unsigned int seq)
971 {
972         struct iolog_compress *c;
973
974         c = malloc(sizeof(*c));
975         INIT_FLIST_HEAD(&c->list);
976         c->buf = malloc(GZ_CHUNK);
977         c->len = 0;
978         c->seq = seq;
979         return c;
980 }
981
982 static void free_chunk(struct iolog_compress *ic)
983 {
984         free(ic->buf);
985         free(ic);
986 }
987
988 static int z_stream_init(z_stream *stream, int gz_hdr)
989 {
990         int wbits = 15;
991
992         memset(stream, 0, sizeof(*stream));
993         stream->zalloc = Z_NULL;
994         stream->zfree = Z_NULL;
995         stream->opaque = Z_NULL;
996         stream->next_in = Z_NULL;
997
998         /*
999          * zlib magic - add 32 for auto-detection of gz header or not,
1000          * if we decide to store files in a gzip friendly format.
1001          */
1002         if (gz_hdr)
1003                 wbits += 32;
1004
1005         if (inflateInit2(stream, wbits) != Z_OK)
1006                 return 1;
1007
1008         return 0;
1009 }
1010
/*
 * State carried across inflate_chunk() calls while decompressing a
 * sequence of stored chunks into a single output buffer.
 */
struct inflate_chunk_iter {
	unsigned int seq;	/* sequence number of the chunk in flight */
	int err;		/* sticky error from inflate/write, 0 if none */
	void *buf;		/* inflated output buffer */
	size_t buf_size;	/* allocated size of buf */
	size_t buf_used;	/* bytes of buf filled so far */
	size_t chunk_sz;	/* initial size / growth increment for buf */
};
1019
1020 static void finish_chunk(z_stream *stream, FILE *f,
1021                          struct inflate_chunk_iter *iter)
1022 {
1023         int ret;
1024
1025         ret = inflateEnd(stream);
1026         if (ret != Z_OK)
1027                 log_err("fio: failed to end log inflation seq %d (%d)\n",
1028                                 iter->seq, ret);
1029
1030         flush_samples(f, iter->buf, iter->buf_used);
1031         free(iter->buf);
1032         iter->buf = NULL;
1033         iter->buf_size = iter->buf_used = 0;
1034 }
1035
1036 /*
1037  * Iterative chunk inflation. Handles cases where we cross into a new
1038  * sequence, doing flush finish of previous chunk if needed.
1039  */
1040 static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
1041                             z_stream *stream, struct inflate_chunk_iter *iter)
1042 {
1043         size_t ret;
1044
1045         dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
1046                                 (unsigned long) ic->len, ic->seq);
1047
1048         if (ic->seq != iter->seq) {
1049                 if (iter->seq)
1050                         finish_chunk(stream, f, iter);
1051
1052                 z_stream_init(stream, gz_hdr);
1053                 iter->seq = ic->seq;
1054         }
1055
1056         stream->avail_in = ic->len;
1057         stream->next_in = ic->buf;
1058
1059         if (!iter->buf_size) {
1060                 iter->buf_size = iter->chunk_sz;
1061                 iter->buf = malloc(iter->buf_size);
1062         }
1063
1064         while (stream->avail_in) {
1065                 size_t this_out = iter->buf_size - iter->buf_used;
1066                 int err;
1067
1068                 stream->avail_out = this_out;
1069                 stream->next_out = iter->buf + iter->buf_used;
1070
1071                 err = inflate(stream, Z_NO_FLUSH);
1072                 if (err < 0) {
1073                         log_err("fio: failed inflating log: %d\n", err);
1074                         iter->err = err;
1075                         break;
1076                 }
1077
1078                 iter->buf_used += this_out - stream->avail_out;
1079
1080                 if (!stream->avail_out) {
1081                         iter->buf_size += iter->chunk_sz;
1082                         iter->buf = realloc(iter->buf, iter->buf_size);
1083                         continue;
1084                 }
1085
1086                 if (err == Z_STREAM_END)
1087                         break;
1088         }
1089
1090         ret = (void *) stream->next_in - ic->buf;
1091
1092         dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);
1093
1094         return ret;
1095 }
1096
1097 /*
1098  * Inflate stored compressed chunks, or write them directly to the log
1099  * file if so instructed.
1100  */
1101 static int inflate_gz_chunks(struct io_log *log, FILE *f)
1102 {
1103         struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
1104         z_stream stream;
1105
1106         while (!flist_empty(&log->chunk_list)) {
1107                 struct iolog_compress *ic;
1108
1109                 ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
1110                 flist_del(&ic->list);
1111
1112                 if (log->log_gz_store) {
1113                         size_t ret;
1114
1115                         dprint(FD_COMPRESS, "log write chunk size=%lu, "
1116                                 "seq=%u\n", (unsigned long) ic->len, ic->seq);
1117
1118                         ret = fwrite(ic->buf, ic->len, 1, f);
1119                         if (ret != 1 || ferror(f)) {
1120                                 iter.err = errno;
1121                                 log_err("fio: error writing compressed log\n");
1122                         }
1123                 } else
1124                         inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
1125
1126                 free_chunk(ic);
1127         }
1128
1129         if (iter.seq) {
1130                 finish_chunk(&stream, f, &iter);
1131                 free(iter.buf);
1132         }
1133
1134         return iter.err;
1135 }
1136
1137 /*
1138  * Open compressed log file and decompress the stored chunks and
1139  * write them to stdout. The chunks are stored sequentially in the
1140  * file, so we iterate over them and do them one-by-one.
1141  */
1142 int iolog_file_inflate(const char *file)
1143 {
1144         struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
1145         struct iolog_compress ic;
1146         z_stream stream;
1147         struct stat sb;
1148         size_t ret;
1149         size_t total;
1150         void *buf;
1151         FILE *f;
1152
1153         f = fopen(file, "r");
1154         if (!f) {
1155                 perror("fopen");
1156                 return 1;
1157         }
1158
1159         if (stat(file, &sb) < 0) {
1160                 fclose(f);
1161                 perror("stat");
1162                 return 1;
1163         }
1164
1165         ic.buf = buf = malloc(sb.st_size);
1166         ic.len = sb.st_size;
1167         ic.seq = 1;
1168
1169         ret = fread(ic.buf, ic.len, 1, f);
1170         if (ret == 0 && ferror(f)) {
1171                 perror("fread");
1172                 fclose(f);
1173                 free(buf);
1174                 return 1;
1175         } else if (ferror(f) || (!feof(f) && ret != 1)) {
1176                 log_err("fio: short read on reading log\n");
1177                 fclose(f);
1178                 free(buf);
1179                 return 1;
1180         }
1181
1182         fclose(f);
1183
1184         /*
1185          * Each chunk will return Z_STREAM_END. We don't know how many
1186          * chunks are in the file, so we just keep looping and incrementing
1187          * the sequence number until we have consumed the whole compressed
1188          * file.
1189          */
1190         total = ic.len;
1191         do {
1192                 size_t iret;
1193
1194                 iret = inflate_chunk(&ic,  1, stdout, &stream, &iter);
1195                 total -= iret;
1196                 if (!total)
1197                         break;
1198                 if (iter.err)
1199                         break;
1200
1201                 ic.seq++;
1202                 ic.len -= iret;
1203                 ic.buf += iret;
1204         } while (1);
1205
1206         if (iter.seq) {
1207                 finish_chunk(&stream, stdout, &iter);
1208                 free(iter.buf);
1209         }
1210
1211         free(buf);
1212         return iter.err;
1213 }
1214
1215 #else
1216
/* Stub: without zlib there are never any compressed chunks to inflate */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	return 0;
}
1221
/* Stub: decompression is unavailable without zlib; always fails */
int iolog_file_inflate(const char *file)
{
	log_err("fio: log inflation not possible without zlib\n");
	return 1;
}
1227
1228 #endif
1229
1230 void flush_log(struct io_log *log, bool do_append)
1231 {
1232         void *buf;
1233         FILE *f;
1234
1235         if (!do_append)
1236                 f = fopen(log->filename, "w");
1237         else
1238                 f = fopen(log->filename, "a");
1239         if (!f) {
1240                 perror("fopen log");
1241                 return;
1242         }
1243
1244         buf = set_file_buffer(f);
1245
1246         inflate_gz_chunks(log, f);
1247
1248         while (!flist_empty(&log->io_logs)) {
1249                 struct io_logs *cur_log;
1250
1251                 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1252                 flist_del_init(&cur_log->list);
1253                 
1254                 if (log->td && log == log->td->clat_hist_log)
1255                         flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1256                                            log_sample_sz(log, cur_log));
1257                 else
1258                         flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1259                 
1260                 sfree(cur_log);
1261         }
1262
1263         fclose(f);
1264         clear_file_buffer(buf);
1265 }
1266
1267 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1268 {
1269         if (td->flags & TD_F_COMPRESS_LOG)
1270                 iolog_flush(log);
1271
1272         if (trylock) {
1273                 if (fio_trylock_file(log->filename))
1274                         return 1;
1275         } else
1276                 fio_lock_file(log->filename);
1277
1278         if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1279                 fio_send_iolog(td, log, log->filename);
1280         else
1281                 flush_log(log, !td->o.per_job_logs);
1282
1283         fio_unlock_file(log->filename);
1284         free_log(log);
1285         return 0;
1286 }
1287
1288 size_t log_chunk_sizes(struct io_log *log)
1289 {
1290         struct flist_head *entry;
1291         size_t ret;
1292
1293         if (flist_empty(&log->chunk_list))
1294                 return 0;
1295
1296         ret = 0;
1297         pthread_mutex_lock(&log->chunk_lock);
1298         flist_for_each(entry, &log->chunk_list) {
1299                 struct iolog_compress *c;
1300
1301                 c = flist_entry(entry, struct iolog_compress, list);
1302                 ret += c->len;
1303         }
1304         pthread_mutex_unlock(&log->chunk_lock);
1305         return ret;
1306 }
1307
1308 #ifdef CONFIG_ZLIB
1309
1310 static void iolog_put_deferred(struct io_log *log, void *ptr)
1311 {
1312         if (!ptr)
1313                 return;
1314
1315         pthread_mutex_lock(&log->deferred_free_lock);
1316         if (log->deferred < IOLOG_MAX_DEFER) {
1317                 log->deferred_items[log->deferred] = ptr;
1318                 log->deferred++;
1319         } else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
1320                 log_err("fio: had to drop log entry free\n");
1321         pthread_mutex_unlock(&log->deferred_free_lock);
1322 }
1323
1324 static void iolog_free_deferred(struct io_log *log)
1325 {
1326         int i;
1327
1328         if (!log->deferred)
1329                 return;
1330
1331         pthread_mutex_lock(&log->deferred_free_lock);
1332
1333         for (i = 0; i < log->deferred; i++) {
1334                 free(log->deferred_items[i]);
1335                 log->deferred_items[i] = NULL;
1336         }
1337
1338         log->deferred = 0;
1339         pthread_mutex_unlock(&log->deferred_free_lock);
1340 }
1341
1342 static int gz_work(struct iolog_flush_data *data)
1343 {
1344         struct iolog_compress *c = NULL;
1345         struct flist_head list;
1346         unsigned int seq;
1347         z_stream stream;
1348         size_t total = 0;
1349         int ret;
1350
1351         INIT_FLIST_HEAD(&list);
1352
1353         memset(&stream, 0, sizeof(stream));
1354         stream.zalloc = Z_NULL;
1355         stream.zfree = Z_NULL;
1356         stream.opaque = Z_NULL;
1357
1358         ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
1359         if (ret != Z_OK) {
1360                 log_err("fio: failed to init gz stream\n");
1361                 goto err;
1362         }
1363
1364         seq = ++data->log->chunk_seq;
1365
1366         stream.next_in = (void *) data->samples;
1367         stream.avail_in = data->nr_samples * log_entry_sz(data->log);
1368
1369         dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
1370                                 (unsigned long) stream.avail_in, seq,
1371                                 data->log->filename);
1372         do {
1373                 if (c)
1374                         dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1375                                 (unsigned long) c->len);
1376                 c = get_new_chunk(seq);
1377                 stream.avail_out = GZ_CHUNK;
1378                 stream.next_out = c->buf;
1379                 ret = deflate(&stream, Z_NO_FLUSH);
1380                 if (ret < 0) {
1381                         log_err("fio: deflate log (%d)\n", ret);
1382                         free_chunk(c);
1383                         goto err;
1384                 }
1385
1386                 c->len = GZ_CHUNK - stream.avail_out;
1387                 flist_add_tail(&c->list, &list);
1388                 total += c->len;
1389         } while (stream.avail_in);
1390
1391         stream.next_out = c->buf + c->len;
1392         stream.avail_out = GZ_CHUNK - c->len;
1393
1394         ret = deflate(&stream, Z_FINISH);
1395         if (ret < 0) {
1396                 /*
1397                  * Z_BUF_ERROR is special, it just means we need more
1398                  * output space. We'll handle that below. Treat any other
1399                  * error as fatal.
1400                  */
1401                 if (ret != Z_BUF_ERROR) {
1402                         log_err("fio: deflate log (%d)\n", ret);
1403                         flist_del(&c->list);
1404                         free_chunk(c);
1405                         goto err;
1406                 }
1407         }
1408
1409         total -= c->len;
1410         c->len = GZ_CHUNK - stream.avail_out;
1411         total += c->len;
1412         dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);
1413
1414         if (ret != Z_STREAM_END) {
1415                 do {
1416                         c = get_new_chunk(seq);
1417                         stream.avail_out = GZ_CHUNK;
1418                         stream.next_out = c->buf;
1419                         ret = deflate(&stream, Z_FINISH);
1420                         c->len = GZ_CHUNK - stream.avail_out;
1421                         total += c->len;
1422                         flist_add_tail(&c->list, &list);
1423                         dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
1424                                 (unsigned long) c->len);
1425                 } while (ret != Z_STREAM_END);
1426         }
1427
1428         dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
1429
1430         ret = deflateEnd(&stream);
1431         if (ret != Z_OK)
1432                 log_err("fio: deflateEnd %d\n", ret);
1433
1434         iolog_put_deferred(data->log, data->samples);
1435
1436         if (!flist_empty(&list)) {
1437                 pthread_mutex_lock(&data->log->chunk_lock);
1438                 flist_splice_tail(&list, &data->log->chunk_list);
1439                 pthread_mutex_unlock(&data->log->chunk_lock);
1440         }
1441
1442         ret = 0;
1443 done:
1444         if (data->free)
1445                 sfree(data);
1446         return ret;
1447 err:
1448         while (!flist_empty(&list)) {
1449                 c = flist_first_entry(list.next, struct iolog_compress, list);
1450                 flist_del(&c->list);
1451                 free_chunk(c);
1452         }
1453         ret = 1;
1454         goto done;
1455 }
1456
1457 /*
1458  * Invoked from our compress helper thread, when logging would have exceeded
1459  * the specified memory limitation. Compresses the previously stored
1460  * entries.
1461  */
1462 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1463 {
1464         return gz_work(container_of(work, struct iolog_flush_data, work));
1465 }
1466
1467 static int gz_init_worker(struct submit_worker *sw)
1468 {
1469         struct thread_data *td = sw->wq->td;
1470
1471         if (!fio_option_is_set(&td->o, log_gz_cpumask))
1472                 return 0;
1473
1474         if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1475                 log_err("gz: failed to set CPU affinity\n");
1476                 return 1;
1477         }
1478
1479         return 0;
1480 }
1481
/* Workqueue hooks for background log compression; niced to stay out of the way */
static struct workqueue_ops log_compress_wq_ops = {
	.fn		= gz_work_async,
	.init_worker_fn = gz_init_worker,
	.nice		= 1,
};
1487
1488 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1489 {
1490         if (!(td->flags & TD_F_COMPRESS_LOG))
1491                 return 0;
1492
1493         workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
1494         return 0;
1495 }
1496
1497 void iolog_compress_exit(struct thread_data *td)
1498 {
1499         if (!(td->flags & TD_F_COMPRESS_LOG))
1500                 return;
1501
1502         workqueue_exit(&td->log_compress_wq);
1503 }
1504
1505 /*
1506  * Queue work item to compress the existing log entries. We reset the
1507  * current log to a small size, and reference the existing log in the
1508  * data that we queue for compression. Once compression has been done,
1509  * this old log is freed. If called with finish == true, will not return
1510  * until the log compression has completed, and will flush all previous
1511  * logs too
1512  */
1513 static int iolog_flush(struct io_log *log)
1514 {
1515         struct iolog_flush_data *data;
1516
1517         data = malloc(sizeof(*data));
1518         if (!data)
1519                 return 1;
1520
1521         data->log = log;
1522         data->free = false;
1523
1524         while (!flist_empty(&log->io_logs)) {
1525                 struct io_logs *cur_log;
1526
1527                 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1528                 flist_del_init(&cur_log->list);
1529
1530                 data->samples = cur_log->log;
1531                 data->nr_samples = cur_log->nr_samples;
1532
1533                 sfree(cur_log);
1534
1535                 gz_work(data);
1536         }
1537
1538         free(data);
1539         return 0;
1540 }
1541
1542 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1543 {
1544         struct iolog_flush_data *data;
1545
1546         data = smalloc(sizeof(*data));
1547         if (!data)
1548                 return 1;
1549
1550         data->log = log;
1551
1552         data->samples = cur_log->log;
1553         data->nr_samples = cur_log->nr_samples;
1554         data->free = true;
1555
1556         cur_log->nr_samples = cur_log->max_samples = 0;
1557         cur_log->log = NULL;
1558
1559         workqueue_enqueue(&log->td->log_compress_wq, &data->work);
1560
1561         iolog_free_deferred(log);
1562
1563         return 0;
1564 }
1565 #else
1566
/* Stub: compression unavailable without zlib; report failure */
static int iolog_flush(struct io_log *log)
{
	return 1;
}
1571
/* Stub: compression unavailable without zlib; report failure */
int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
	return 1;
}
1576
/* Stub: no compression workqueue to set up without zlib */
int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
	return 0;
}
1581
/* Stub: no compression workqueue to tear down without zlib */
void iolog_compress_exit(struct thread_data *td)
{
}
1585
1586 #endif
1587
1588 struct io_logs *iolog_cur_log(struct io_log *log)
1589 {
1590         if (flist_empty(&log->io_logs))
1591                 return NULL;
1592
1593         return flist_last_entry(&log->io_logs, struct io_logs, list);
1594 }
1595
1596 uint64_t iolog_nr_samples(struct io_log *iolog)
1597 {
1598         struct flist_head *entry;
1599         uint64_t ret = 0;
1600
1601         flist_for_each(entry, &iolog->io_logs) {
1602                 struct io_logs *cur_log;
1603
1604                 cur_log = flist_entry(entry, struct io_logs, list);
1605                 ret += cur_log->nr_samples;
1606         }
1607
1608         return ret;
1609 }
1610
/*
 * Finish @log if one exists; a NULL log counts as already written.
 */
static int __write_log(struct thread_data *td, struct io_log *log, int try)
{
	return log ? finish_log(td, log, try) : 0;
}
1618
1619 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1620 {
1621         int ret;
1622
1623         if (per_unit_log(td->iops_log) != unit_log)
1624                 return 0;
1625
1626         ret = __write_log(td, td->iops_log, try);
1627         if (!ret)
1628                 td->iops_log = NULL;
1629
1630         return ret;
1631 }
1632
1633 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1634 {
1635         int ret;
1636
1637         if (!unit_log)
1638                 return 0;
1639
1640         ret = __write_log(td, td->slat_log, try);
1641         if (!ret)
1642                 td->slat_log = NULL;
1643
1644         return ret;
1645 }
1646
1647 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1648 {
1649         int ret;
1650
1651         if (!unit_log)
1652                 return 0;
1653
1654         ret = __write_log(td, td->clat_log, try);
1655         if (!ret)
1656                 td->clat_log = NULL;
1657
1658         return ret;
1659 }
1660
1661 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1662 {
1663         int ret;
1664
1665         if (!unit_log)
1666                 return 0;
1667
1668         ret = __write_log(td, td->clat_hist_log, try);
1669         if (!ret)
1670                 td->clat_hist_log = NULL;
1671
1672         return ret;
1673 }
1674
1675 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1676 {
1677         int ret;
1678
1679         if (!unit_log)
1680                 return 0;
1681
1682         ret = __write_log(td, td->lat_log, try);
1683         if (!ret)
1684                 td->lat_log = NULL;
1685
1686         return ret;
1687 }
1688
1689 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1690 {
1691         int ret;
1692
1693         if (per_unit_log(td->bw_log) != unit_log)
1694                 return 0;
1695
1696         ret = __write_log(td, td->bw_log, try);
1697         if (!ret)
1698                 td->bw_log = NULL;
1699
1700         return ret;
1701 }
1702
/*
 * Bitmask values tracking which log types have been written by
 * td_writeout_logs(); ALL_LOG_NR is the number of distinct types.
 */
enum {
	BW_LOG_MASK	= 1,
	LAT_LOG_MASK	= 2,
	SLAT_LOG_MASK	= 4,
	CLAT_LOG_MASK	= 8,
	IOPS_LOG_MASK	= 16,
	CLAT_HIST_LOG_MASK = 32,

	ALL_LOG_NR	= 6,
};
1713
/* Pairs a log-type mask with its writeout function */
struct log_type {
	unsigned int mask;	/* one of the *_LOG_MASK values */
	int (*fn)(struct thread_data *, int, bool);	/* (td, try, unit_log) */
};
1718
/* Table of all writable log types, iterated by td_writeout_logs() */
static struct log_type log_types[] = {
	{
		.mask	= BW_LOG_MASK,
		.fn	= write_bandw_log,
	},
	{
		.mask	= LAT_LOG_MASK,
		.fn	= write_lat_log,
	},
	{
		.mask	= SLAT_LOG_MASK,
		.fn	= write_slat_log,
	},
	{
		.mask	= CLAT_LOG_MASK,
		.fn	= write_clat_log,
	},
	{
		.mask	= IOPS_LOG_MASK,
		.fn	= write_iops_log,
	},
	{
		.mask	= CLAT_HIST_LOG_MASK,
		.fn	= write_clat_hist_log,
	}
};
1745
/*
 * Write out all of @td's pending logs. While more than one log remains,
 * each type is attempted with a trylock so a busy log file doesn't
 * stall the rest; the last remaining log uses a blocking lock. When a
 * full pass makes no progress we sleep briefly before retrying.
 */
void td_writeout_logs(struct thread_data *td, bool unit_logs)
{
	unsigned int log_mask = 0;	/* types already written */
	unsigned int log_left = ALL_LOG_NR;
	int old_state, i;

	old_state = td_bump_runstate(td, TD_FINISHING);

	finalize_logs(td, unit_logs);

	while (log_left) {
		int prev_log_left = log_left;

		for (i = 0; i < ALL_LOG_NR && log_left; i++) {
			struct log_type *lt = &log_types[i];
			int ret;

			if (!(log_mask & lt->mask)) {
				/* try-lock while other logs remain (log_left != 1) */
				ret = lt->fn(td, log_left != 1, unit_logs);
				if (!ret) {
					log_left--;
					log_mask |= lt->mask;
				}
			}
		}

		/* no log could be written this pass; back off and retry */
		if (prev_log_left == log_left)
			usleep(5000);
	}

	td_restore_runstate(td, old_state);
}
1778
/*
 * Write out pending logs for every thread in the run.
 */
void fio_writeout_logs(bool unit_logs)
{
	struct thread_data *td;
	int i;

	for_each_td(td, i)
		td_writeout_logs(td, unit_logs);
}