dcf6083c7d38aac15227f414acf7e7cdc3a09050
[fio.git] / iolog.c
1 /*
2  * Code related to writing an iolog of what a thread is doing, and to
3  * later read that back and replay
4  */
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <assert.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <unistd.h>
11 #ifdef CONFIG_ZLIB
12 #include <zlib.h>
13 #endif
14
15 #include "flist.h"
16 #include "fio.h"
17 #include "trim.h"
18 #include "filelock.h"
19 #include "smalloc.h"
20 #include "blktrace.h"
21 #include "pshared.h"
22 #include "lib/roundup.h"
23
24 #include <netinet/in.h>
25 #include <netinet/tcp.h>
26 #include <arpa/inet.h>
27 #include <sys/stat.h>
28 #include <sys/socket.h>
29 #include <sys/un.h>
30
31 static int iolog_flush(struct io_log *log);
32
33 static const char iolog_ver2[] = "fio version 2 iolog";
34 static const char iolog_ver3[] = "fio version 3 iolog";
35
36 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
37 {
38         flist_add_tail(&ipo->list, &td->io_log_list);
39         td->total_io_size += ipo->len;
40 }
41
42 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
43 {
44         struct timespec now;
45
46         if (!td->o.write_iolog_file)
47                 return;
48
49         fio_gettime(&now, NULL);
50         fprintf(td->iolog_f, "%llu %s %s %llu %llu\n",
51                 (unsigned long long) utime_since_now(&td->io_log_start_time),
52                 io_u->file->file_name, io_ddir_name(io_u->ddir), io_u->offset,
53                 io_u->buflen);
54
55 }
56
57 void log_file(struct thread_data *td, struct fio_file *f,
58               enum file_log_act what)
59 {
60         const char *act[] = { "add", "open", "close" };
61         struct timespec now;
62
63         assert(what < 3);
64
65         if (!td->o.write_iolog_file)
66                 return;
67
68
69         /*
70          * this happens on the pre-open/close done before the job starts
71          */
72         if (!td->iolog_f)
73                 return;
74
75         fio_gettime(&now, NULL);
76         fprintf(td->iolog_f, "%llu %s %s\n",
77                 (unsigned long long) utime_since_now(&td->io_log_start_time),
78                 f->file_name, act[what]);
79 }
80
/*
 * Stall replay for 'delay' usecs, measured from the last issued I/O.
 * Overshoot from a previous stall is carried in td->time_offset so the
 * overall replay timing does not drift.
 */
static void iolog_delay(struct thread_data *td, unsigned long delay)
{
	/* time already elapsed since the last issue counts toward the delay */
	uint64_t usec = utime_since_now(&td->last_issue);
	unsigned long orig_delay = delay;
	struct timespec ts;
	int ret = 0;

	/* earlier stalls overshot by more than this whole delay; absorb it */
	if (delay < td->time_offset) {
		td->time_offset = 0;
		return;
	}

	delay -= td->time_offset;
	/* elapsed time alone already covers what remains of the delay */
	if (delay < usec)
		return;

	delay -= usec;

	fio_gettime(&ts, NULL);

	/*
	 * Burn the remaining delay reaping completions, so the queue keeps
	 * draining while we wait instead of sleeping idle.
	 */
	while (delay && !td->terminate) {
		ret = io_u_queued_complete(td, 0);
		if (ret < 0)
			td_verror(td, -ret, "io_u_queued_complete");
		if (td->flags & TD_F_REGROW_LOGS)
			regrow_logs(td);
		if (utime_since_now(&ts) > delay)
			break;
	}

	/* remember how much we overslept for the next call to compensate */
	usec = utime_since_now(&ts);
	if (usec > orig_delay)
		td->time_offset = usec - orig_delay;
	else
		td->time_offset = 0;
}
117
/*
 * Execute a special (non-I/O) log entry, i.e. a file action such as
 * open/close/unlink/add. Returns 0 if the entry is regular I/O (caller
 * should issue it), 1 if it was consumed here, -1 on error.
 */
static int ipo_special(struct thread_data *td, struct io_piece *ipo)
{
	struct fio_file *f;
	int ret;

	/*
	 * Not a special ipo
	 */
	if (ipo->ddir != DDIR_INVAL)
		return 0;

	f = td->files[ipo->fileno];

	/* honor any replay delay attached to this entry */
	if (ipo->delay)
		iolog_delay(td, ipo->delay);
	if (fio_fill_issue_time(td))
		fio_gettime(&td->last_issue, NULL);
	switch (ipo->file_action) {
	case FIO_LOG_OPEN_FILE:
		/* with replay_redirect all entries map to one file; skip re-opens */
		if (td->o.replay_redirect && fio_file_open(f)) {
			dprint(FD_FILE, "iolog: ignoring re-open of file %s\n",
					f->file_name);
			break;
		}
		ret = td_io_open_file(td, f);
		if (!ret) {
			if (td->o.dp_type != FIO_DP_NONE) {
				int dp_init_ret = dp_init(td);

				if (dp_init_ret != 0) {
					td_verror(td, abs(dp_init_ret), "dp_init");
					return -1;
				}
			}
			break;
		}
		td_verror(td, ret, "iolog open file");
		return -1;
	case FIO_LOG_CLOSE_FILE:
		td_io_close_file(td, f);
		break;
	case FIO_LOG_UNLINK_FILE:
		td_io_unlink_file(td, f);
		break;
	case FIO_LOG_ADD_FILE:
		/*
		 * Nothing to do
		 */
		break;
	default:
		log_err("fio: bad file action %d\n", ipo->file_action);
		break;
	}

	return 1;
}
174
175 static bool read_iolog(struct thread_data *td);
176
177 unsigned long long delay_since_ttime(const struct thread_data *td,
178                unsigned long long time)
179 {
180         double tmp;
181         double scale;
182         const unsigned long long *last_ttime = &td->io_log_last_ttime;
183
184         if (!*last_ttime || td->o.no_stall || time < *last_ttime)
185                 return 0;
186         else if (td->o.replay_time_scale == 100)
187                 return time - *last_ttime;
188
189
190         scale = (double) 100.0 / (double) td->o.replay_time_scale;
191         tmp = time - *last_ttime;
192         return tmp * scale;
193 }
194
/*
 * Pop the next entry off the replay queue and fill 'io_u' with it.
 * Returns 0 when io_u was populated, 1 when the log is exhausted
 * (td->done is then set) or a special entry failed.
 */
int read_iolog_get(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo;
	unsigned long elapsed;

	while (!flist_empty(&td->io_log_list)) {
		int ret;

		/*
		 * Chunked mode: refill the queue from the log source once
		 * consumption reaches the low-water checkmark.
		 */
		if (td->o.read_iolog_chunked) {
			if (td->io_log_checkmark == td->io_log_current) {
				if (td->io_log_blktrace) {
					if (!read_blktrace(td))
						return 1;
				} else {
					if (!read_iolog(td))
						return 1;
				}
			}
			td->io_log_current--;
		}
		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
		flist_del(&ipo->list);
		remove_trim_entry(td, ipo);

		/* file open/close entries are handled (and consumed) here */
		ret = ipo_special(td, ipo);
		if (ret < 0) {
			free(ipo);
			break;
		} else if (ret > 0) {
			free(ipo);
			continue;
		}

		io_u->ddir = ipo->ddir;
		if (ipo->ddir != DDIR_WAIT) {
			io_u->offset = ipo->offset;
			io_u->verify_offset = ipo->offset;
			io_u->buflen = ipo->len;
			io_u->file = td->files[ipo->fileno];
			get_file(io_u->file);
			dprint(FD_IO, "iolog: get %llu/%llu/%s\n", io_u->offset,
						io_u->buflen, io_u->file->file_name);
			if (ipo->delay)
				iolog_delay(td, ipo->delay);

			/* attach data-placement info when configured */
			if (td->o.dp_type != FIO_DP_NONE)
				dp_fill_dspec_data(td, io_u);
		} else {
			/* wait entry: delay is compared against msecs since genesis */
			elapsed = mtime_since_genesis();
			if (ipo->delay > elapsed)
				usec_sleep(td, (ipo->delay - elapsed) * 1000);
		}

		free(ipo);

		/* a wait entry produces no I/O; loop for the next real one */
		if (io_u->ddir != DDIR_WAIT)
			return 0;
	}

	td->done = 1;
	return 1;
}
257
258 void prune_io_piece_log(struct thread_data *td)
259 {
260         struct io_piece *ipo;
261         struct fio_rb_node *n;
262
263         while ((n = rb_first(&td->io_hist_tree)) != NULL) {
264                 ipo = rb_entry(n, struct io_piece, rb_node);
265                 rb_erase(n, &td->io_hist_tree);
266                 remove_trim_entry(td, ipo);
267                 td->io_hist_len--;
268                 free(ipo);
269         }
270
271         while (!flist_empty(&td->io_hist_list)) {
272                 ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
273                 flist_del(&ipo->list);
274                 remove_trim_entry(td, ipo);
275                 td->io_hist_len--;
276                 free(ipo);
277         }
278 }
279
/*
 * log a successful write, so we can unwind the log for verify
 */
void log_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct fio_rb_node **p, *parent;
	struct io_piece *ipo, *__ipo;

	ipo = calloc(1, sizeof(struct io_piece));
	init_ipo(ipo);
	ipo->file = io_u->file;
	ipo->offset = io_u->offset;
	ipo->len = io_u->buflen;
	ipo->numberio = io_u->numberio;
	/* marked in flight: overlap eviction below must not free such entries */
	ipo->flags = IP_F_IN_FLIGHT;

	io_u->ipo = ipo;

	if (io_u_should_trim(td, io_u)) {
		flist_add_tail(&ipo->trim_list, &td->trim_list);
		td->trim_entries++;
	}

	/*
	 * Sort writes if we don't have a random map in which case we need to
	 * check for duplicate blocks and drop the old one, which we rely on
	 * the rb insert/lookup for handling. Sort writes if we have offset
	 * modifier which can also create duplicate blocks.
	 */
	if (!fio_offset_overlap_risk(td)) {
		/* no overlap possible: a plain FIFO list suffices */
		INIT_FLIST_HEAD(&ipo->list);
		flist_add_tail(&ipo->list, &td->io_hist_list);
		ipo->flags |= IP_F_ONLIST;
		td->io_hist_len++;
		return;
	}

	RB_CLEAR_NODE(&ipo->rb_node);

	/*
	 * Sort the entry into the verification list
	 */
restart:
	p = &td->io_hist_tree.rb_node;
	parent = NULL;
	while (*p) {
		int overlap = 0;
		parent = *p;

		__ipo = rb_entry(parent, struct io_piece, rb_node);
		/* ordered by file pointer first, then by offset within the file */
		if (ipo->file < __ipo->file)
			p = &(*p)->rb_left;
		else if (ipo->file > __ipo->file)
			p = &(*p)->rb_right;
		else if (ipo->offset < __ipo->offset) {
			p = &(*p)->rb_left;
			overlap = ipo->offset + ipo->len > __ipo->offset;
		}
		else if (ipo->offset > __ipo->offset) {
			p = &(*p)->rb_right;
			overlap = __ipo->offset + __ipo->len > ipo->offset;
		}
		else
			overlap = 1;

		if (overlap) {
			/*
			 * The new write supersedes the overlapped older entry:
			 * drop it and restart the descent from the root, since
			 * multiple existing entries may overlap the new range.
			 */
			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu\n",
				__ipo->offset, __ipo->len,
				ipo->offset, ipo->len);
			td->io_hist_len--;
			rb_erase(parent, &td->io_hist_tree);
			remove_trim_entry(td, __ipo);
			if (!(__ipo->flags & IP_F_IN_FLIGHT))
				free(__ipo);
			goto restart;
		}
	}

	rb_link_node(&ipo->rb_node, parent, p);
	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
	ipo->flags |= IP_F_ONRB;
	td->io_hist_len++;
}
363
/*
 * Remove a logged io_u from the verify history (after a failed I/O),
 * and record the failure in the block-info state map when enabled.
 */
void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct io_piece *ipo = io_u->ipo;

	if (td->ts.nr_block_infos) {
		uint32_t *info = io_u_block_info(td, io_u);
		/* only escalate state; never downgrade past a failure state */
		if (BLOCK_INFO_STATE(*info) < BLOCK_STATE_TRIM_FAILURE) {
			if (io_u->ddir == DDIR_TRIM)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_TRIM_FAILURE);
			else if (io_u->ddir == DDIR_WRITE)
				*info = BLOCK_INFO_SET_STATE(*info,
						BLOCK_STATE_WRITE_FAILURE);
		}
	}

	if (!ipo)
		return;

	/* the piece lives either on the rb-tree or on the flat list */
	if (ipo->flags & IP_F_ONRB)
		rb_erase(&ipo->rb_node, &td->io_hist_tree);
	else if (ipo->flags & IP_F_ONLIST)
		flist_del(&ipo->list);

	free(ipo);
	io_u->ipo = NULL;
	td->io_hist_len--;
}
392
393 void trim_io_piece(const struct io_u *io_u)
394 {
395         struct io_piece *ipo = io_u->ipo;
396
397         if (!ipo)
398                 return;
399
400         ipo->len = io_u->xfer_buflen - io_u->resid;
401 }
402
403 void write_iolog_close(struct thread_data *td)
404 {
405         if (!td->iolog_f)
406                 return;
407
408         fflush(td->iolog_f);
409         fclose(td->iolog_f);
410         free(td->iolog_buf);
411         td->iolog_f = NULL;
412         td->iolog_buf = NULL;
413 }
414
/*
 * Estimate how many log entries to read ahead in chunked replay mode.
 * Extrapolates the consumption rate since the last refill to keep
 * roughly one second's worth of entries queued.
 */
int64_t iolog_items_to_fetch(struct thread_data *td)
{
	struct timespec now;
	uint64_t elapsed;
	uint64_t for_1s;
	int64_t items_to_fetch;

	/* first call: prime the queue with a small fixed batch */
	if (!td->io_log_highmark)
		return 10;

	fio_gettime(&now, NULL);
	elapsed = ntime_since(&td->io_log_highmark_time, &now);
	if (elapsed) {
		/* entries consumed per second, scaled up from 'elapsed' nsecs */
		for_1s = (td->io_log_highmark - td->io_log_current) * 1000000000 / elapsed;
		items_to_fetch = for_1s - td->io_log_current;
		if (items_to_fetch < 0)
			items_to_fetch = 0;
	} else
		items_to_fetch = 0;

	/* next refill triggers when consumption crosses half the new high mark */
	td->io_log_highmark = td->io_log_current + items_to_fetch;
	td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
	fio_gettime(&td->io_log_highmark_time, NULL);

	return items_to_fetch;
}
442
/*
 * A parsed v2/v3 iolog line is either an I/O action or a file action,
 * distinguished by how many fields sscanf() matched (_r). v3 lines
 * carry a leading timestamp, hence one extra field.
 *
 * Note: use the macro parameter (_r), not a captured caller variable.
 */
#define io_act(_td, _r) (((_td)->io_log_version == 3 && (_r) == 5) || \
					((_td)->io_log_version == 2 && (_r) == 4))
#define file_act(_td, _r) (((_td)->io_log_version == 3 && (_r) == 3) || \
					((_td)->io_log_version == 2 && (_r) == 2))
447
/*
 * Read version 2 and 3 iolog data. It is enhanced to include per-file logging,
 * syncs, etc.
 *
 * Line formats (from the sscanf patterns below):
 *   v2: <fname> <action> [<offset> <len>]
 *   v3: <timestamp> <fname> <action> [<offset> <len>]
 *
 * Parsed entries are queued as io_pieces for replay. Returns false when
 * nothing usable was read.
 */
static bool read_iolog(struct thread_data *td)
{
	unsigned long long offset;
	unsigned int bytes;
	unsigned long long delay = 0;
	int reads, writes, trims, waits, fileno = 0, file_action = 0; /* stupid gcc */
	char *rfname, *fname, *act;
	char *str, *p;
	enum fio_ddir rw;
	bool realloc = false;
	int64_t items_to_fetch = 0;
	int syncs;

	/* chunked mode reads a bounded batch instead of the whole file */
	if (td->o.read_iolog_chunked) {
		items_to_fetch = iolog_items_to_fetch(td);
		if (!items_to_fetch)
			return true;
	}

	/*
	 * Read in the read iolog and store it, reuse the infrastructure
	 * for doing verifications.
	 */
	str = malloc(4096);
	rfname = fname = malloc(256+16);
	act = malloc(256+16);

	syncs = reads = writes = trims = waits = 0;
	while ((p = fgets(str, 4096, td->io_log_rfile)) != NULL) {
		struct io_piece *ipo;
		int r;
		unsigned long long ttime;

		if (td->io_log_version == 3) {
			r = sscanf(p, "%llu %256s %256s %llu %u", &ttime, rfname, act,
							&offset, &bytes);
			/* inter-entry delay derived from consecutive timestamps */
			delay = delay_since_ttime(td, ttime);
			td->io_log_last_ttime = ttime;
			/*
			 * "wait" is not allowed with version 3
			 */
			if (!strcmp(act, "wait")) {
				log_err("iolog: ignoring wait command with"
					" version 3 for file %s\n", fname);
				continue;
			}
		} else /* version 2 */
			r = sscanf(p, "%256s %256s %llu %u", rfname, act, &offset, &bytes);

		/* replay_redirect funnels every entry into a single file */
		if (td->o.replay_redirect)
			fname = td->o.replay_redirect;

		if (io_act(td, r)) {
			/*
			 * Check action first
			 */
			if (!strcmp(act, "wait"))
				rw = DDIR_WAIT;
			else if (!strcmp(act, "read")) {
				if (td->o.replay_skip & (1u << DDIR_READ))
					continue;
				rw = DDIR_READ;
			} else if (!strcmp(act, "write")) {
				if (td->o.replay_skip & (1u << DDIR_WRITE))
					continue;
				rw = DDIR_WRITE;
			} else if (!strcmp(act, "sync")) {
				if (td->o.replay_skip & (1u << DDIR_SYNC))
					continue;
				rw = DDIR_SYNC;
			} else if (!strcmp(act, "datasync"))
				rw = DDIR_DATASYNC;
			else if (!strcmp(act, "trim")) {
				if (td->o.replay_skip & (1u << DDIR_TRIM))
					continue;
				rw = DDIR_TRIM;
			} else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
			fileno = get_fileno(td, fname);
		} else if (file_act(td, r)) {
			/* file lifecycle entry (add/open/close), no data I/O */
			rw = DDIR_INVAL;
			if (!strcmp(act, "add")) {
				if (td->o.replay_redirect &&
				    get_fileno(td, fname) != -1) {
					dprint(FD_FILE, "iolog: ignoring"
						" re-add of file %s\n", fname);
				} else {
					fileno = add_file(td, fname, td->subjob_number, 1);
					file_action = FIO_LOG_ADD_FILE;
				}
			} else if (!strcmp(act, "open")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_OPEN_FILE;
			} else if (!strcmp(act, "close")) {
				fileno = get_fileno(td, fname);
				file_action = FIO_LOG_CLOSE_FILE;
			} else {
				log_err("fio: bad iolog file action: %s\n",
									act);
				continue;
			}
		} else {
			log_err("bad iolog%d: %s\n", td->io_log_version, p);
			continue;
		}

		/* tally per-direction counts; skip entries forbidden by options */
		if (rw == DDIR_READ)
			reads++;
		else if (rw == DDIR_WRITE) {
			/*
			 * Don't add a write for ro mode
			 */
			if (read_only)
				continue;
			writes++;
		} else if (rw == DDIR_TRIM) {
			/*
			 * Don't add a trim for ro mode
			 */
			if (read_only)
				continue;
			trims++;
		} else if (rw == DDIR_WAIT) {
			if (td->o.no_stall)
				continue;
			waits++;
		} else if (rw == DDIR_INVAL) {
		} else if (ddir_sync(rw)) {
			syncs++;
		} else {
			log_err("bad ddir: %d\n", rw);
			continue;
		}

		/*
		 * Make note of file
		 */
		ipo = calloc(1, sizeof(*ipo));
		init_ipo(ipo);
		ipo->ddir = rw;
		if (td->io_log_version == 3)
			ipo->delay = delay;
		if (rw == DDIR_WAIT) {
			/* v2 wait entries carry their delay in the offset field */
			ipo->delay = offset;
		} else {
			if (td->o.replay_scale)
				ipo->offset = offset / td->o.replay_scale;
			else
				ipo->offset = offset;
			ipo_bytes_align(td->o.replay_align, ipo);

			ipo->len = bytes;
			/* grow max_bs so buffers can hold the largest replayed I/O */
			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw]) {
				realloc = true;
				td->o.max_bs[rw] = bytes;
			}
			ipo->fileno = fileno;
			ipo->file_action = file_action;
			td->o.size += bytes;
		}

		queue_io_piece(td, ipo);

		if (td->o.read_iolog_chunked) {
			td->io_log_current++;
			items_to_fetch--;
			if (items_to_fetch == 0)
				break;
		}
	}

	free(str);
	free(act);
	free(rfname);

	if (td->o.read_iolog_chunked) {
		td->io_log_highmark = td->io_log_current;
		td->io_log_checkmark = (td->io_log_highmark + 1) / 2;
		fio_gettime(&td->io_log_highmark_time, NULL);
	}

	if (writes && read_only) {
		log_err("fio: <%s> skips replay of %d writes due to"
			" read-only\n", td->o.name, writes);
		writes = 0;
	}
	if (syncs)
		td->flags |= TD_F_SYNCS;

	if (td->o.read_iolog_chunked) {
		if (td->io_log_current == 0) {
			return false;
		}
		td->o.td_ddir = TD_DDIR_RW;
		/* a larger max_bs was seen: re-create the I/O buffers to fit */
		if (realloc && td->orig_buffer)
		{
			io_u_quiesce(td);
			free_io_mem(td);
			if (init_io_u_buffers(td))
				return false;
		}
		return true;
	}

	if (!reads && !writes && !waits && !trims)
		return false;

	/* derive the job's data direction from what the log contained */
	td->o.td_ddir = 0;
	if (reads)
		td->o.td_ddir |= TD_DDIR_READ;
	if (writes)
		td->o.td_ddir |= TD_DDIR_WRITE;
	if (trims)
		td->o.td_ddir |= TD_DDIR_TRIM;

	return true;
}
672
673 static bool is_socket(const char *path)
674 {
675         struct stat buf;
676         int r;
677
678         r = stat(path, &buf);
679         if (r == -1)
680                 return false;
681
682         return S_ISSOCK(buf.st_mode);
683 }
684
/*
 * Connect to the Unix domain socket at 'path'. Returns a connected fd,
 * or -1 on failure (including a path too long for sockaddr_un).
 */
static int open_socket(const char *path)
{
	struct sockaddr_un addr;
	int ret, fd;

	fd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (fd < 0)
		return fd;

	/* zero the whole struct so no uninitialized bytes reach connect() */
	memset(&addr, 0, sizeof(addr));
	addr.sun_family = AF_UNIX;
	if (snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path) >=
	    (int) sizeof(addr.sun_path)) {
		/*
		 * Previously we only logged and then connected to the
		 * truncated path; fail instead of contacting the wrong one.
		 */
		log_err("%s: path name %s is too long for a Unix socket\n",
			__func__, path);
		close(fd);
		return -1;
	}

	ret = connect(fd, (const struct sockaddr *)&addr, strlen(path) + sizeof(addr.sun_family));
	if (!ret)
		return fd;

	close(fd);
	return -1;
}
708
/*
 * open iolog, check version, and call appropriate parser
 */
static bool init_iolog_read(struct thread_data *td, char *fname)
{
	char buffer[256], *p;
	FILE *f = NULL;

	dprint(FD_IO, "iolog: name=%s\n", fname);

	/* log source may be a unix socket, stdin ("-") or a regular file */
	if (is_socket(fname)) {
		int fd;

		fd = open_socket(fname);
		if (fd >= 0)
			f = fdopen(fd, "r");
	} else if (!strcmp(fname, "-")) {
		f = stdin;
	} else
		f = fopen(fname, "r");

	if (!f) {
		perror("fopen read iolog");
		return false;
	}

	p = fgets(buffer, sizeof(buffer), f);
	if (!p) {
		/*
		 * NOTE(review): errno may be stale here if fgets() hit EOF
		 * rather than a read error, so the reported errno can be
		 * misleading — confirm intended behavior.
		 */
		td_verror(td, errno, "iolog read");
		log_err("fio: unable to read iolog\n");
		fclose(f);
		return false;
	}

	/*
	 * versions 2 and 3 of the iolog store a specific string as the
	 * first line, check for that
	 */
	if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
		td->io_log_version = 2;
	else if (!strncmp(iolog_ver3, buffer, strlen(iolog_ver3)))
		td->io_log_version = 3;
	else {
		log_err("fio: iolog version 1 is no longer supported\n");
		fclose(f);
		return false;
	}

	/* keep the stream open; read_iolog() pulls entries from it later */
	free_release_files(td);
	td->io_log_rfile = f;
	return read_iolog(td);
}
761
/*
 * Set up a log for storing io patterns.
 */
static bool init_iolog_write(struct thread_data *td)
{
	struct fio_file *ff;
	FILE *f;
	unsigned int i;

	/* append mode: an existing log is extended, never truncated */
	f = fopen(td->o.write_iolog_file, "a");
	if (!f) {
		perror("fopen write iolog");
		return false;
	}

	/*
	 * That's it for writing, setup a log buffer and we're done.
	 */
	td->iolog_f = f;
	td->iolog_buf = malloc(8192);
	setvbuf(f, td->iolog_buf, _IOFBF, 8192);
	fio_gettime(&td->io_log_start_time, NULL);

	/*
	 * write our version line
	 */
	if (fprintf(f, "%s\n", iolog_ver3) < 0) {
		/*
		 * NOTE(review): f stays open here; td->iolog_f is already
		 * set, so later cleanup can close it — confirm the caller
		 * path handles this on failure.
		 */
		perror("iolog init\n");
		return false;
	}

	/*
	 * add all known files
	 */
	for_each_file(td, ff, i)
		log_file(td, ff, FIO_LOG_ADD_FILE);

	return true;
}
801
802 bool init_iolog(struct thread_data *td)
803 {
804         bool ret;
805
806         if (td->o.read_iolog_file) {
807                 int need_swap;
808                 char * fname = get_name_by_idx(td->o.read_iolog_file, td->subjob_number);
809
810                 /*
811                  * Check if it's a blktrace file and load that if possible.
812                  * Otherwise assume it's a normal log file and load that.
813                  */
814                 if (is_blktrace(fname, &need_swap)) {
815                         td->io_log_blktrace = 1;
816                         ret = init_blktrace_read(td, fname, need_swap);
817                 } else {
818                         td->io_log_blktrace = 0;
819                         ret = init_iolog_read(td, fname);
820                 }
821                 free(fname);
822         } else if (td->o.write_iolog_file)
823                 ret = init_iolog_write(td);
824         else
825                 ret = true;
826
827         if (!ret)
828                 td_verror(td, EINVAL, "failed initializing iolog");
829
830         init_disk_util(td);
831
832         return ret;
833 }
834
/*
 * Allocate and initialize an io_log configured from 'p'; the result is
 * returned through 'log'. 'filename' is copied (strdup).
 */
void setup_log(struct io_log **log, struct log_params *p,
	       const char *filename)
{
	struct io_log *l;
	int i;
	struct io_u_plat_entry *entry;
	struct flist_head *list;

	l = scalloc(1, sizeof(*l));
	assert(l);
	INIT_FLIST_HEAD(&l->io_logs);
	/* copy sampling/formatting options from the parameter block */
	l->log_type = p->log_type;
	l->log_offset = p->log_offset;
	l->log_prio = p->log_prio;
	l->log_issue_time = p->log_issue_time;
	l->log_gz = p->log_gz;
	l->log_gz_store = p->log_gz_store;
	l->avg_msec = p->avg_msec;
	l->hist_msec = p->hist_msec;
	l->hist_coarseness = p->hist_coarseness;
	l->filename = strdup(filename);
	l->td = p->td;

	/* Initialize histogram lists for each r/w direction,
	 * with initial io_u_plat of all zeros:
	 */
	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
		list = &l->hist_window[i].list;
		INIT_FLIST_HEAD(list);
		entry = calloc(1, sizeof(struct io_u_plat_entry));
		flist_add(&entry->list, list);
	}

	if (l->td && l->td->o.io_submit_mode != IO_MODE_OFFLOAD) {
		unsigned int def_samples = DEF_LOG_ENTRIES;
		struct io_logs *__p;

		/*
		 * Pre-allocate a pending-samples buffer, sized to at least
		 * the queue depth rounded up to a power of two.
		 */
		__p = calloc(1, sizeof(*l->pending));
		if (l->td->o.iodepth > DEF_LOG_ENTRIES)
			def_samples = roundup_pow2(l->td->o.iodepth);
		__p->max_samples = def_samples;
		__p->log = calloc(__p->max_samples, log_entry_sz(l));
		l->pending = __p;
	}

	/* build the per-sample ddir mask bits from the enabled options */
	if (l->log_offset)
		l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
	if (l->log_prio)
		l->log_ddir_mask |= LOG_PRIO_SAMPLE_BIT;
	/*
	 * The bandwidth-log option generates agg-read_bw.log,
	 * agg-write_bw.log and agg-trim_bw.log for which l->td is NULL.
	 * Check if l->td is valid before dereferencing it.
	 */
	if (l->td && l->td->o.log_max == IO_LOG_SAMPLE_BOTH)
		l->log_ddir_mask |= LOG_AVG_MAX_SAMPLE_BIT;

	if (l->log_issue_time)
		l->log_ddir_mask |= LOG_ISSUE_TIME_SAMPLE_BIT;

	INIT_FLIST_HEAD(&l->chunk_list);

	/* compression needs a thread context; disable it without one */
	if (l->log_gz && !p->td)
		l->log_gz = 0;
	else if (l->log_gz || l->log_gz_store) {
		mutex_init_pshared(&l->chunk_lock);
		mutex_init_pshared(&l->deferred_free_lock);
		p->td->flags |= TD_F_COMPRESS_LOG;
	}

	*log = l;
}
907
908 #ifdef CONFIG_SETVBUF
/*
 * Give @f a large (1 MB) fully-buffered stdio buffer to cut down on
 * write syscalls while flushing logs. Returns the allocation so the
 * caller can release it with clear_file_buffer() after fclose().
 */
static void *set_file_buffer(FILE *f)
{
        const size_t bufsz = 1048576;
        void *mem = malloc(bufsz);

        /* If malloc failed, setvbuf(f, NULL, ...) lets stdio allocate */
        setvbuf(f, mem, _IOFBF, bufsz);
        return mem;
}
918
/* Release the buffer handed out by set_file_buffer(). */
static void clear_file_buffer(void *buf)
{
        free(buf);
}
923 #else
/* No setvbuf support configured: leave @f with its default buffering. */
static void *set_file_buffer(FILE *f)
{
        (void) f;
        return NULL;
}
928
/* Nothing was allocated in the no-setvbuf build; nothing to free. */
static void clear_file_buffer(void *buf)
{
        (void) buf;
}
932 #endif
933
934 void free_log(struct io_log *log)
935 {
936         while (!flist_empty(&log->io_logs)) {
937                 struct io_logs *cur_log;
938
939                 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
940                 flist_del_init(&cur_log->list);
941                 free(cur_log->log);
942                 sfree(cur_log);
943         }
944
945         if (log->pending) {
946                 free(log->pending->log);
947                 free(log->pending);
948                 log->pending = NULL;
949         }
950
951         free(log->pending);
952         free(log->filename);
953         sfree(log);
954 }
955
/*
 * Sum @stride consecutive histogram buckets starting at index @j.
 * When @io_u_plat_last is non-NULL, return the delta against that
 * previous snapshot instead of the absolute sum.
 */
uint64_t hist_sum(int j, int stride, uint64_t *io_u_plat,
                uint64_t *io_u_plat_last)
{
        uint64_t total = 0;
        int idx;

        for (idx = j; idx < j + stride; idx++) {
                total += io_u_plat[idx];
                if (io_u_plat_last)
                        total -= io_u_plat_last[idx];
        }

        return total;
}
972
/*
 * Write latency histogram samples to @f as CSV: time, direction, block
 * size, then one column per @hist_coarseness-sized group of buckets.
 * Bucket values are emitted as deltas against the previous sample's
 * histogram, which is unlinked and freed once consumed.
 */
static void flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
                               uint64_t sample_size)
{
        struct io_sample *s;
        bool log_offset, log_issue_time;
        uint64_t i, j, nr_samples;
        struct io_u_plat_entry *entry, *entry_before;
        uint64_t *io_u_plat;
        uint64_t *io_u_plat_before;

        int stride = 1 << hist_coarseness;
        
        if (!sample_size)
                return;

        /* Layout flags are stashed in the first sample's __ddir bits */
        s = __get_sample(samples, 0, 0, 0);
        log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
        log_issue_time = (s->__ddir & LOG_ISSUE_TIME_SAMPLE_BIT) != 0;

        nr_samples = sample_size / __log_entry_sz(log_offset, log_issue_time);

        for (i = 0; i < nr_samples; i++) {
                s = __get_sample(samples, log_offset, log_issue_time, i);

                entry = s->data.plat_entry;
                io_u_plat = entry->io_u_plat;

                /* The previous snapshot sits right after us on the list */
                entry_before = flist_first_entry(&entry->list, struct io_u_plat_entry, list);
                io_u_plat_before = entry_before->io_u_plat;

                fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
                                                io_sample_ddir(s), (unsigned long long) s->bs);
                for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
                        fprintf(f, "%llu, ", (unsigned long long)
                                hist_sum(j, stride, io_u_plat, io_u_plat_before));
                }
                /* Final group terminates the line, no trailing comma */
                fprintf(f, "%llu\n", (unsigned long long)
                        hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat,
                                        io_u_plat_before));

                /* Previous snapshot is fully consumed; drop it */
                flist_del(&entry_before->list);
                free(entry_before);
        }
}
1017
/*
 * Append a formatted field to the sample line buffer at *p, which has
 * *left bytes remaining. On success, advances *p past the written text
 * and shrinks *left accordingly, returning 0. Returns -1 (after
 * logging) if the output did not fit or vsnprintf failed.
 */
static int print_sample_fields(char **p, size_t *left, const char *fmt, ...)
{
        va_list ap;
        int ret;

        va_start(ap, fmt);
        ret = vsnprintf(*p, *left, fmt, ap);
        va_end(ap);

        /*
         * ret == *left would leave no room for the NUL; treat it as a
         * failure too. The cast avoids a signed/unsigned comparison and
         * is safe because ret < 0 has already been handled.
         */
        if (ret < 0 || (size_t)ret >= *left) {
                log_err("sample file write failed: %d\n", ret);
                return -1;
        }

        *p += ret;
        *left -= ret;

        return 0;
}
1036
1037 /*
1038  * flush_samples - Generate output for log samples
1039  * Each sample output is built using a temporary buffer. This buffer size
1040  * assumptions are:
1041  * - Each sample has less than 10 fields
1042  * - Each sample field fits in 25 characters (20 digits for 64 bit number
1043  *   and a few bytes delimiter)
1044  */
void flush_samples(FILE *f, void *samples, uint64_t sample_size)
{
        struct io_sample *s;
        bool log_offset, log_prio, log_avg_max, log_issue_time;
        uint64_t i, nr_samples;
        char buf[256];
        char *p;
        size_t left;
        int ret;

        if (!sample_size)
                return;

        /* Per-log layout flags are encoded in the first sample's __ddir */
        s = __get_sample(samples, 0, 0, 0);
        log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
        log_prio = (s->__ddir & LOG_PRIO_SAMPLE_BIT) != 0;
        log_avg_max = (s->__ddir & LOG_AVG_MAX_SAMPLE_BIT) != 0;
        log_issue_time = (s->__ddir & LOG_ISSUE_TIME_SAMPLE_BIT) != 0;

        nr_samples = sample_size / __log_entry_sz(log_offset, log_issue_time);

        /*
         * Field order per line: time, value (plus second value when both
         * avg and max are logged), ddir, block size, then the optional
         * offset, priority, and issue time columns.
         */
        for (i = 0; i < nr_samples; i++) {
                s = __get_sample(samples, log_offset, log_issue_time, i);
                p = buf;
                left = sizeof(buf);

                ret = print_sample_fields(&p, &left, "%" PRIu64 ", %" PRId64,
                                          s->time, s->data.val.val0);
                if (ret)
                        return;

                if (log_avg_max) {
                        ret = print_sample_fields(&p, &left, ", %" PRId64,
                                                  s->data.val.val1);
                        if (ret)
                                return;
                }

                ret = print_sample_fields(&p, &left, ", %u, %llu",
                                          io_sample_ddir(s),
                                          (unsigned long long) s->bs);
                if (ret)
                        return;

                if (log_offset) {
                        ret = print_sample_fields(&p, &left, ", %llu",
                                                  (unsigned long long) s->aux[IOS_AUX_OFFSET_INDEX]);
                        if (ret)
                                return;
                }

                /*
                 * With prio logging, emit the full priority value in hex;
                 * otherwise just a 0/1 realtime-class indicator
                 * (presumably what ioprio_value_is_class_rt reports —
                 * name-based, verify against its definition).
                 */
                if (log_prio)
                        ret = print_sample_fields(&p, &left, ", 0x%04x",
                                                  s->priority);
                else
                        ret = print_sample_fields(&p, &left, ", %u",
                                                  ioprio_value_is_class_rt(s->priority));
                if (ret)
                        return;

                if (log_issue_time) {
                        ret = print_sample_fields(&p, &left, ", %llu",
                                                  (unsigned long long) s->aux[IOS_AUX_ISSUE_TIME_INDEX]);
                        if (ret)
                                return;
                }

                fprintf(f, "%s\n", buf);
        }
}
1115
1116 #ifdef CONFIG_ZLIB
1117
/*
 * Work item describing one batch of samples to be compressed, either
 * synchronously (iolog_flush) or via the compression workqueue.
 */
struct iolog_flush_data {
        struct workqueue_work work;     /* workqueue linkage */
        struct io_log *log;             /* log the samples belong to */
        void *samples;                  /* raw sample buffer to compress */
        uint32_t nr_samples;            /* number of samples in the buffer */
        bool free;                      /* sfree() this item when done (async path) */
};
1125
1126 #define GZ_CHUNK        131072
1127
1128 static struct iolog_compress *get_new_chunk(unsigned int seq)
1129 {
1130         struct iolog_compress *c;
1131
1132         c = malloc(sizeof(*c));
1133         INIT_FLIST_HEAD(&c->list);
1134         c->buf = malloc(GZ_CHUNK);
1135         c->len = 0;
1136         c->seq = seq;
1137         return c;
1138 }
1139
1140 static void free_chunk(struct iolog_compress *ic)
1141 {
1142         free(ic->buf);
1143         free(ic);
1144 }
1145
1146 static int z_stream_init(z_stream *stream, int gz_hdr)
1147 {
1148         int wbits = 15;
1149
1150         memset(stream, 0, sizeof(*stream));
1151         stream->zalloc = Z_NULL;
1152         stream->zfree = Z_NULL;
1153         stream->opaque = Z_NULL;
1154         stream->next_in = Z_NULL;
1155
1156         /*
1157          * zlib magic - add 32 for auto-detection of gz header or not,
1158          * if we decide to store files in a gzip friendly format.
1159          */
1160         if (gz_hdr)
1161                 wbits += 32;
1162
1163         if (inflateInit2(stream, wbits) != Z_OK)
1164                 return 1;
1165
1166         return 0;
1167 }
1168
/*
 * State carried across inflate_chunk() calls while decompressing a
 * sequence of chunks into one output buffer.
 */
struct inflate_chunk_iter {
        unsigned int seq;       /* sequence number currently being inflated */
        int err;                /* first error encountered, if any */
        void *buf;              /* decompressed output buffer */
        size_t buf_size;        /* current allocated size of buf */
        size_t buf_used;        /* bytes of buf filled so far */
        size_t chunk_sz;        /* growth increment for buf */
};
1177
1178 static void finish_chunk(z_stream *stream, FILE *f,
1179                          struct inflate_chunk_iter *iter)
1180 {
1181         int ret;
1182
1183         ret = inflateEnd(stream);
1184         if (ret != Z_OK)
1185                 log_err("fio: failed to end log inflation seq %d (%d)\n",
1186                                 iter->seq, ret);
1187
1188         flush_samples(f, iter->buf, iter->buf_used);
1189         free(iter->buf);
1190         iter->buf = NULL;
1191         iter->buf_size = iter->buf_used = 0;
1192 }
1193
1194 /*
1195  * Iterative chunk inflation. Handles cases where we cross into a new
1196  * sequence, doing flush finish of previous chunk if needed.
1197  */
/*
 * Iterative chunk inflation. Handles cases where we cross into a new
 * sequence, doing flush finish of previous chunk if needed.
 * Returns the number of input bytes consumed from @ic.
 */
static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
                            z_stream *stream, struct inflate_chunk_iter *iter)
{
        size_t ret;

        dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u\n",
                                (unsigned long) ic->len, ic->seq);

        /* New sequence: finish the old one and re-init the stream */
        if (ic->seq != iter->seq) {
                if (iter->seq)
                        finish_chunk(stream, f, iter);

                z_stream_init(stream, gz_hdr);
                iter->seq = ic->seq;
        }

        stream->avail_in = ic->len;
        stream->next_in = ic->buf;

        if (!iter->buf_size) {
                iter->buf_size = iter->chunk_sz;
                iter->buf = malloc(iter->buf_size);
        }

        while (stream->avail_in) {
                size_t this_out = iter->buf_size - iter->buf_used;
                int err;

                stream->avail_out = this_out;
                stream->next_out = iter->buf + iter->buf_used;

                err = inflate(stream, Z_NO_FLUSH);
                if (err < 0) {
                        log_err("fio: failed inflating log: %d\n", err);
                        iter->err = err;
                        break;
                }

                iter->buf_used += this_out - stream->avail_out;

                /* Output buffer full: grow it and keep inflating */
                if (!stream->avail_out) {
                        iter->buf_size += iter->chunk_sz;
                        iter->buf = realloc(iter->buf, iter->buf_size);
                        continue;
                }

                if (err == Z_STREAM_END)
                        break;
        }

        /* Bytes consumed = how far next_in advanced into ic->buf */
        ret = (void *) stream->next_in - ic->buf;

        dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) iter->buf_size);

        return ret;
}
1254
1255 /*
1256  * Inflate stored compressed chunks, or write them directly to the log
1257  * file if so instructed.
1258  */
/*
 * Inflate stored compressed chunks, or write them directly to the log
 * file if so instructed.
 */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
        struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
        z_stream stream;

        while (!flist_empty(&log->chunk_list)) {
                struct iolog_compress *ic;

                ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
                flist_del(&ic->list);

                if (log->log_gz_store) {
                        /* Storing compressed: write the raw chunk out */
                        size_t ret;

                        dprint(FD_COMPRESS, "log write chunk size=%lu, "
                                "seq=%u\n", (unsigned long) ic->len, ic->seq);

                        ret = fwrite(ic->buf, ic->len, 1, f);
                        if (ret != 1 || ferror(f)) {
                                iter.err = errno;
                                log_err("fio: error writing compressed log\n");
                        }
                } else
                        inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);

                free_chunk(ic);
        }

        /*
         * iter.seq is only set once inflate_chunk() has initialized the
         * stream, so stream is valid whenever we get here.
         */
        if (iter.seq) {
                finish_chunk(&stream, f, &iter);
                free(iter.buf);
        }

        return iter.err;
}
1294
1295 /*
1296  * Open compressed log file and decompress the stored chunks and
1297  * write them to stdout. The chunks are stored sequentially in the
1298  * file, so we iterate over them and do them one-by-one.
1299  */
/*
 * Open compressed log file and decompress the stored chunks and
 * write them to stdout. The chunks are stored sequentially in the
 * file, so we iterate over them and do them one-by-one.
 */
int iolog_file_inflate(const char *file)
{
        struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
        struct iolog_compress ic;
        z_stream stream;
        struct stat sb;
        size_t ret;
        size_t total;
        void *buf;
        FILE *f;

        f = fopen(file, "rb");
        if (!f) {
                perror("fopen");
                return 1;
        }

        if (stat(file, &sb) < 0) {
                fclose(f);
                perror("stat");
                return 1;
        }

        /* Slurp the whole file; treat it as one big input chunk */
        ic.buf = buf = malloc(sb.st_size);
        ic.len = sb.st_size;
        ic.seq = 1;

        ret = fread(ic.buf, ic.len, 1, f);
        if (ret == 0 && ferror(f)) {
                perror("fread");
                fclose(f);
                free(buf);
                return 1;
        } else if (ferror(f) || (!feof(f) && ret != 1)) {
                log_err("fio: short read on reading log\n");
                fclose(f);
                free(buf);
                return 1;
        }

        fclose(f);

        /*
         * Each chunk will return Z_STREAM_END. We don't know how many
         * chunks are in the file, so we just keep looping and incrementing
         * the sequence number until we have consumed the whole compressed
         * file.
         */
        total = ic.len;
        do {
                size_t iret;

                iret = inflate_chunk(&ic,  1, stdout, &stream, &iter);
                total -= iret;
                if (!total)
                        break;
                if (iter.err)
                        break;

                /* Advance past the consumed chunk (void* arithmetic is a
                 * GCC extension, used throughout this file) */
                ic.seq++;
                ic.len -= iret;
                ic.buf += iret;
        } while (1);

        if (iter.seq) {
                finish_chunk(&stream, stdout, &iter);
                free(iter.buf);
        }

        free(buf);
        return iter.err;
}
1372
1373 #else
1374
/* No zlib: nothing can be stored compressed, so nothing to inflate. */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
        return 0;
}
1379
/* No zlib: decompressing a stored log file is not supported. */
int iolog_file_inflate(const char *file)
{
        log_err("fio: log inflation not possible without zlib\n");
        return 1;
}
1385
1386 #endif
1387
1388 void flush_log(struct io_log *log, bool do_append)
1389 {
1390         void *buf;
1391         FILE *f;
1392
1393         /*
1394          * If log_gz_store is true, we are writing a binary file.
1395          * Set the mode appropriately (on all platforms) to avoid issues
1396          * on windows (line-ending conversions, etc.)
1397          */
1398         if (!do_append)
1399                 if (log->log_gz_store)
1400                         f = fopen(log->filename, "wb");
1401                 else
1402                         f = fopen(log->filename, "w");
1403         else
1404                 if (log->log_gz_store)
1405                         f = fopen(log->filename, "ab");
1406                 else
1407                         f = fopen(log->filename, "a");
1408         if (!f) {
1409                 perror("fopen log");
1410                 return;
1411         }
1412
1413         buf = set_file_buffer(f);
1414
1415         inflate_gz_chunks(log, f);
1416
1417         while (!flist_empty(&log->io_logs)) {
1418                 struct io_logs *cur_log;
1419
1420                 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1421                 flist_del_init(&cur_log->list);
1422                 
1423                 if (log->td && log == log->td->clat_hist_log)
1424                         flush_hist_samples(f, log->hist_coarseness, cur_log->log,
1425                                            log_sample_sz(log, cur_log));
1426                 else
1427                         flush_samples(f, cur_log->log, log_sample_sz(log, cur_log));
1428                 
1429                 sfree(cur_log);
1430         }
1431
1432         fclose(f);
1433         clear_file_buffer(buf);
1434 }
1435
1436 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
1437 {
1438         if (td->flags & TD_F_COMPRESS_LOG)
1439                 iolog_flush(log);
1440
1441         if (trylock) {
1442                 if (fio_trylock_file(log->filename))
1443                         return 1;
1444         } else
1445                 fio_lock_file(log->filename);
1446
1447         if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
1448                 fio_send_iolog(td, log, log->filename);
1449         else
1450                 flush_log(log, !td->o.per_job_logs);
1451
1452         fio_unlock_file(log->filename);
1453         free_log(log);
1454         return 0;
1455 }
1456
1457 size_t log_chunk_sizes(struct io_log *log)
1458 {
1459         struct flist_head *entry;
1460         size_t ret;
1461
1462         if (flist_empty(&log->chunk_list))
1463                 return 0;
1464
1465         ret = 0;
1466         pthread_mutex_lock(&log->chunk_lock);
1467         flist_for_each(entry, &log->chunk_list) {
1468                 struct iolog_compress *c;
1469
1470                 c = flist_entry(entry, struct iolog_compress, list);
1471                 ret += c->len;
1472         }
1473         pthread_mutex_unlock(&log->chunk_lock);
1474         return ret;
1475 }
1476
1477 #ifdef CONFIG_ZLIB
1478
1479 static void iolog_put_deferred(struct io_log *log, void *ptr)
1480 {
1481         if (!ptr)
1482                 return;
1483
1484         pthread_mutex_lock(&log->deferred_free_lock);
1485         if (log->deferred < IOLOG_MAX_DEFER) {
1486                 log->deferred_items[log->deferred] = ptr;
1487                 log->deferred++;
1488         } else if (!fio_did_warn(FIO_WARN_IOLOG_DROP))
1489                 log_err("fio: had to drop log entry free\n");
1490         pthread_mutex_unlock(&log->deferred_free_lock);
1491 }
1492
1493 static void iolog_free_deferred(struct io_log *log)
1494 {
1495         int i;
1496
1497         if (!log->deferred)
1498                 return;
1499
1500         pthread_mutex_lock(&log->deferred_free_lock);
1501
1502         for (i = 0; i < log->deferred; i++) {
1503                 free(log->deferred_items[i]);
1504                 log->deferred_items[i] = NULL;
1505         }
1506
1507         log->deferred = 0;
1508         pthread_mutex_unlock(&log->deferred_free_lock);
1509 }
1510
/*
 * Compress the sample buffer described by @data into a list of
 * GZ_CHUNK-sized chunks and splice them onto the log's chunk list.
 * Returns 0 on success, 1 on error (partial chunks are freed).
 */
static int gz_work(struct iolog_flush_data *data)
{
        struct iolog_compress *c = NULL;
        struct flist_head list;
        unsigned int seq;
        z_stream stream;
        size_t total = 0;
        int ret;

        INIT_FLIST_HEAD(&list);

        memset(&stream, 0, sizeof(stream));
        stream.zalloc = Z_NULL;
        stream.zfree = Z_NULL;
        stream.opaque = Z_NULL;

        ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
        if (ret != Z_OK) {
                log_err("fio: failed to init gz stream\n");
                goto err;
        }

        /* All chunks from this flush share one sequence number */
        seq = ++data->log->chunk_seq;

        stream.next_in = (void *) data->samples;
        stream.avail_in = data->nr_samples * log_entry_sz(data->log);

        dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u, log=%s\n",
                                (unsigned long) stream.avail_in, seq,
                                data->log->filename);
        /* Consume all input, one chunk of output at a time */
        do {
                if (c)
                        dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
                                (unsigned long) c->len);
                c = get_new_chunk(seq);
                stream.avail_out = GZ_CHUNK;
                stream.next_out = c->buf;
                ret = deflate(&stream, Z_NO_FLUSH);
                if (ret < 0) {
                        log_err("fio: deflate log (%d)\n", ret);
                        free_chunk(c);
                        goto err;
                }

                c->len = GZ_CHUNK - stream.avail_out;
                flist_add_tail(&c->list, &list);
                total += c->len;
        } while (stream.avail_in);

        /* Finish the stream into whatever room the last chunk has left */
        stream.next_out = c->buf + c->len;
        stream.avail_out = GZ_CHUNK - c->len;

        ret = deflate(&stream, Z_FINISH);
        if (ret < 0) {
                /*
                 * Z_BUF_ERROR is special, it just means we need more
                 * output space. We'll handle that below. Treat any other
                 * error as fatal.
                 */
                if (ret != Z_BUF_ERROR) {
                        log_err("fio: deflate log (%d)\n", ret);
                        flist_del(&c->list);
                        free_chunk(c);
                        goto err;
                }
        }

        /* Re-account the last chunk, which Z_FINISH may have grown */
        total -= c->len;
        c->len = GZ_CHUNK - stream.avail_out;
        total += c->len;
        dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq, (unsigned long) c->len);

        /* Keep finishing into fresh chunks until the stream is done */
        if (ret != Z_STREAM_END) {
                do {
                        c = get_new_chunk(seq);
                        stream.avail_out = GZ_CHUNK;
                        stream.next_out = c->buf;
                        ret = deflate(&stream, Z_FINISH);
                        c->len = GZ_CHUNK - stream.avail_out;
                        total += c->len;
                        flist_add_tail(&c->list, &list);
                        dprint(FD_COMPRESS, "seq=%d, chunk=%lu\n", seq,
                                (unsigned long) c->len);
                } while (ret != Z_STREAM_END);
        }

        dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);

        ret = deflateEnd(&stream);
        if (ret != Z_OK)
                log_err("fio: deflateEnd %d\n", ret);

        /* Input buffer may still be referenced; free it deferred */
        iolog_put_deferred(data->log, data->samples);

        if (!flist_empty(&list)) {
                pthread_mutex_lock(&data->log->chunk_lock);
                flist_splice_tail(&list, &data->log->chunk_list);
                pthread_mutex_unlock(&data->log->chunk_lock);
        }

        ret = 0;
done:
        if (data->free)
                sfree(data);
        return ret;
err:
        while (!flist_empty(&list)) {
                c = flist_first_entry(list.next, struct iolog_compress, list);
                flist_del(&c->list);
                free_chunk(c);
        }
        ret = 1;
        goto done;
}
1625
1626 /*
1627  * Invoked from our compress helper thread, when logging would have exceeded
1628  * the specified memory limitation. Compresses the previously stored
1629  * entries.
1630  */
1631 static int gz_work_async(struct submit_worker *sw, struct workqueue_work *work)
1632 {
1633         return gz_work(container_of(work, struct iolog_flush_data, work));
1634 }
1635
1636 static int gz_init_worker(struct submit_worker *sw)
1637 {
1638         struct thread_data *td = sw->wq->td;
1639
1640         if (!fio_option_is_set(&td->o, log_gz_cpumask))
1641                 return 0;
1642
1643         if (fio_setaffinity(gettid(), td->o.log_gz_cpumask) == -1) {
1644                 log_err("gz: failed to set CPU affinity\n");
1645                 return 1;
1646         }
1647
1648         return 0;
1649 }
1650
/* Workqueue operations for the background log compression worker. */
static struct workqueue_ops log_compress_wq_ops = {
        .fn             = gz_work_async,
        .init_worker_fn = gz_init_worker,
        .nice           = 1,    /* compression is background work; deprioritize */
};
1656
1657 int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
1658 {
1659         if (!(td->flags & TD_F_COMPRESS_LOG))
1660                 return 0;
1661
1662         workqueue_init(td, &td->log_compress_wq, &log_compress_wq_ops, 1, sk_out);
1663         return 0;
1664 }
1665
1666 void iolog_compress_exit(struct thread_data *td)
1667 {
1668         if (!(td->flags & TD_F_COMPRESS_LOG))
1669                 return;
1670
1671         workqueue_exit(&td->log_compress_wq);
1672 }
1673
1674 /*
1675  * Queue work item to compress the existing log entries. We reset the
1676  * current log to a small size, and reference the existing log in the
1677  * data that we queue for compression. Once compression has been done,
1678  * this old log is freed. Will not return until the log compression
1679  * has completed, and will flush all previous logs too
1680  */
1681 static int iolog_flush(struct io_log *log)
1682 {
1683         struct iolog_flush_data *data;
1684
1685         workqueue_flush(&log->td->log_compress_wq);
1686         data = malloc(sizeof(*data));
1687         if (!data)
1688                 return 1;
1689
1690         data->log = log;
1691         data->free = false;
1692
1693         while (!flist_empty(&log->io_logs)) {
1694                 struct io_logs *cur_log;
1695
1696                 cur_log = flist_first_entry(&log->io_logs, struct io_logs, list);
1697                 flist_del_init(&cur_log->list);
1698
1699                 data->samples = cur_log->log;
1700                 data->nr_samples = cur_log->nr_samples;
1701
1702                 sfree(cur_log);
1703
1704                 gz_work(data);
1705         }
1706
1707         free(data);
1708         return 0;
1709 }
1710
1711 int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
1712 {
1713         struct iolog_flush_data *data;
1714
1715         data = smalloc(sizeof(*data));
1716         if (!data)
1717                 return 1;
1718
1719         data->log = log;
1720
1721         data->samples = cur_log->log;
1722         data->nr_samples = cur_log->nr_samples;
1723         data->free = true;
1724
1725         cur_log->nr_samples = cur_log->max_samples = 0;
1726         cur_log->log = NULL;
1727
1728         workqueue_enqueue(&log->td->log_compress_wq, &data->work);
1729
1730         iolog_free_deferred(log);
1731
1732         return 0;
1733 }
1734 #else
1735
/* No zlib: compression-based flushing is unavailable, report failure. */
static int iolog_flush(struct io_log *log)
{
        return 1;
}
1740
/* No zlib: cannot queue the buffer for compression, report failure. */
int iolog_cur_flush(struct io_log *log, struct io_logs *cur_log)
{
        return 1;
}
1745
/* No zlib: no compression workqueue to set up. */
int iolog_compress_init(struct thread_data *td, struct sk_out *sk_out)
{
        return 0;
}
1750
/* No zlib: nothing was initialized, nothing to tear down. */
void iolog_compress_exit(struct thread_data *td)
{
}
1754
1755 #endif
1756
1757 struct io_logs *iolog_cur_log(struct io_log *log)
1758 {
1759         if (flist_empty(&log->io_logs))
1760                 return NULL;
1761
1762         return flist_last_entry(&log->io_logs, struct io_logs, list);
1763 }
1764
1765 uint64_t iolog_nr_samples(struct io_log *iolog)
1766 {
1767         struct flist_head *entry;
1768         uint64_t ret = 0;
1769
1770         flist_for_each(entry, &iolog->io_logs) {
1771                 struct io_logs *cur_log;
1772
1773                 cur_log = flist_entry(entry, struct io_logs, list);
1774                 ret += cur_log->nr_samples;
1775         }
1776
1777         return ret;
1778 }
1779
/* Flush @log via finish_log(); a NULL log was never set up, so succeed. */
static int __write_log(struct thread_data *td, struct io_log *log, int try)
{
        if (!log)
                return 0;

        return finish_log(td, log, try);
}
1787
1788 static int write_iops_log(struct thread_data *td, int try, bool unit_log)
1789 {
1790         int ret;
1791
1792         if (per_unit_log(td->iops_log) != unit_log)
1793                 return 0;
1794
1795         ret = __write_log(td, td->iops_log, try);
1796         if (!ret)
1797                 td->iops_log = NULL;
1798
1799         return ret;
1800 }
1801
1802 static int write_slat_log(struct thread_data *td, int try, bool unit_log)
1803 {
1804         int ret;
1805
1806         if (!unit_log)
1807                 return 0;
1808
1809         ret = __write_log(td, td->slat_log, try);
1810         if (!ret)
1811                 td->slat_log = NULL;
1812
1813         return ret;
1814 }
1815
1816 static int write_clat_log(struct thread_data *td, int try, bool unit_log)
1817 {
1818         int ret;
1819
1820         if (!unit_log)
1821                 return 0;
1822
1823         ret = __write_log(td, td->clat_log, try);
1824         if (!ret)
1825                 td->clat_log = NULL;
1826
1827         return ret;
1828 }
1829
1830 static int write_clat_hist_log(struct thread_data *td, int try, bool unit_log)
1831 {
1832         int ret;
1833
1834         if (!unit_log)
1835                 return 0;
1836
1837         ret = __write_log(td, td->clat_hist_log, try);
1838         if (!ret)
1839                 td->clat_hist_log = NULL;
1840
1841         return ret;
1842 }
1843
1844 static int write_lat_log(struct thread_data *td, int try, bool unit_log)
1845 {
1846         int ret;
1847
1848         if (!unit_log)
1849                 return 0;
1850
1851         ret = __write_log(td, td->lat_log, try);
1852         if (!ret)
1853                 td->lat_log = NULL;
1854
1855         return ret;
1856 }
1857
1858 static int write_bandw_log(struct thread_data *td, int try, bool unit_log)
1859 {
1860         int ret;
1861
1862         if (per_unit_log(td->bw_log) != unit_log)
1863                 return 0;
1864
1865         ret = __write_log(td, td->bw_log, try);
1866         if (!ret)
1867                 td->bw_log = NULL;
1868
1869         return ret;
1870 }
1871
/*
 * Bitmask flags used by td_writeout_logs() to track which log types have
 * been successfully flushed.  ALL_LOG_NR is the count of distinct log
 * types, and must match the number of entries in log_types[] below.
 */
enum {
	BW_LOG_MASK	= 1,
	LAT_LOG_MASK	= 2,
	SLAT_LOG_MASK	= 4,
	CLAT_LOG_MASK	= 8,
	IOPS_LOG_MASK	= 16,
	CLAT_HIST_LOG_MASK = 32,

	ALL_LOG_NR	= 6,
};
1882
/*
 * Pairs a log-type mask bit with the writer function for that log.
 * The writer takes (td, try, unit_log) and returns 0 once the log has
 * been fully written out (or there was nothing to write).
 */
struct log_type {
	unsigned int mask;
	int (*fn)(struct thread_data *, int, bool);
};
1887
/*
 * Table of all per-thread log types and their writers, iterated by
 * td_writeout_logs().  Must contain exactly ALL_LOG_NR entries, one per
 * *_LOG_MASK bit above.
 */
static struct log_type log_types[] = {
	{
		.mask	= BW_LOG_MASK,
		.fn	= write_bandw_log,
	},
	{
		.mask	= LAT_LOG_MASK,
		.fn	= write_lat_log,
	},
	{
		.mask	= SLAT_LOG_MASK,
		.fn	= write_slat_log,
	},
	{
		.mask	= CLAT_LOG_MASK,
		.fn	= write_clat_log,
	},
	{
		.mask	= IOPS_LOG_MASK,
		.fn	= write_iops_log,
	},
	{
		.mask	= CLAT_HIST_LOG_MASK,
		.fn	= write_clat_hist_log,
	}
};
1914
1915 void td_writeout_logs(struct thread_data *td, bool unit_logs)
1916 {
1917         unsigned int log_mask = 0;
1918         unsigned int log_left = ALL_LOG_NR;
1919         int old_state, i;
1920
1921         old_state = td_bump_runstate(td, TD_FINISHING);
1922
1923         finalize_logs(td, unit_logs);
1924
1925         while (log_left) {
1926                 int prev_log_left = log_left;
1927
1928                 for (i = 0; i < ALL_LOG_NR && log_left; i++) {
1929                         struct log_type *lt = &log_types[i];
1930                         int ret;
1931
1932                         if (!(log_mask & lt->mask)) {
1933                                 ret = lt->fn(td, log_left != 1, unit_logs);
1934                                 if (!ret) {
1935                                         log_left--;
1936                                         log_mask |= lt->mask;
1937                                 }
1938                         }
1939                 }
1940
1941                 if (prev_log_left == log_left)
1942                         usleep(5000);
1943         }
1944
1945         td_restore_runstate(td, old_state);
1946 }
1947
/*
 * Flush the logs of every thread.  Iterates all thread_data entries via
 * the project's for_each_td()/end_for_each() macro pair and delegates the
 * actual writeout to td_writeout_logs().
 */
void fio_writeout_logs(bool unit_logs)
{
	for_each_td(td) {
		td_writeout_logs(td, unit_logs);
	} end_for_each();
}