t/zbd: avoid test case 31 failure with small devices
[fio.git] / t / read-to-pipe-async.c
1 /*
2  * Read a file and write the contents to stdout. If a given read takes
3  * longer than 'max_us' time, then we schedule a new thread to handle
4  * the next read. This avoids the coordinated omission problem, where
5  * one request appears to take a long time, but in reality a lot of
6  * requests would have been slow, but we don't notice since new submissions
7  * are not being issued if just 1 is held up.
8  *
9  * One test case:
10  *
11  * $ time (./read-to-pipe-async -f randfile.gz | gzip -dc > outfile; sync)
12  *
13  * This will read randfile.gz and log the latencies of doing so, while
14  * piping the output to gzip to decompress it. Any latencies over max_us
15  * are logged when they happen, and latency buckets are displayed at the
16  * end of the run
17  *
18  * gcc -Wall -g -O2 -o read-to-pipe-async read-to-pipe-async.c -lpthread
19  *
20  * Copyright (C) 2016 Jens Axboe
21  *
22  */
23
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <sys/time.h>
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <inttypes.h>
32 #include <string.h>
33 #include <pthread.h>
34 #include <errno.h>
35 #include <assert.h>
36
37 #include "../flist.h"
38
39 #include "compiler/compiler.h"
40
/* Default read block size in bytes (-b) */
static int bs = 4096;
/* Latency threshold in usec (-t); slower requests are logged as they occur */
static int max_us = 10000;
/* Input file name (-f), allocated with strdup() */
static char *file;
/* Use a dedicated writer thread (default); -w 0 selects inline writing */
static int separate_writer = 1;

/* Log-linear latency histogram geometry (same scheme as fio's stat.c) */
#define PLAT_BITS	8
#define PLAT_VAL	(1 << PLAT_BITS)
#define PLAT_GROUP_NR	19
#define PLAT_NR		(PLAT_GROUP_NR * PLAT_VAL)
#define PLAT_LIST_MAX	20

/*
 * Abort if 'code' is non-zero; every call site passes 0 on success.
 * The debug build must assert that the code IS zero: a plain
 * assert(code) would fire on success and pass on failure, inverting
 * the check.
 */
#ifndef NDEBUG
#define CHECK_ZERO_OR_ABORT(code) assert((code) == 0)
#else
#define CHECK_ZERO_OR_ABORT(code)					\
	do {								\
		if (fio_unlikely((code) != 0)) {			\
			log_err("failed checking code %i != 0", (code));\
			abort();					\
		}							\
	} while (0)
#endif
63
/*
 * Latency accounting for one side (reader or writer).
 * plat[] is a log-linear histogram indexed via plat_val_to_idx();
 * 'over' counts samples above the -t threshold.
 */
struct stats {
	unsigned int plat[PLAT_NR];	/* latency histogram buckets */
	unsigned int nr_samples;	/* total samples recorded */
	unsigned int max;		/* slowest sample seen, usec */
	unsigned int min;		/* fastest sample seen, usec */
	unsigned int over;		/* samples exceeding max_us */
};

/* Percentiles (in percent) reported at the end of the run; the unused
 * tail of the array stays 0.0, which terminates the scan in
 * calc_percentiles(). */
static double plist[PLAT_LIST_MAX] = { 50.0, 75.0, 90.0, 95.0, 99.0, 99.5, 99.9, 99.99, 99.999, 99.9999, };
73
/*
 * State shared between main() and a long-lived worker thread: an exit
 * request flag, a done acknowledgement, a condvar to wake the worker
 * and another for the worker to signal its completion.
 */
struct thread_data {
	int exit;			/* set (atomically) to ask the thread to stop */
	int done;			/* set by the thread just before returning */
	pthread_mutex_t lock;		/* protects the owner's work lists */
	pthread_cond_t cond;		/* signalled when new work is queued */
	pthread_mutex_t done_lock;
	pthread_cond_t done_cond;	/* signalled when the thread exits */
	pthread_t thread;
};

/* Writer side: pending items on 'list', written items parked on
 * 'done_list' until main() reaps them via prune_done_entries(). */
struct writer_thread {
	struct flist_head list;
	struct flist_head done_list;
	struct stats s;
	struct thread_data thread;
};

/* Reader side: 'busy' hints to queue_work() that a one-off thread is
 * needed; 'write_seq' orders inline writes when -w 0 is used. */
struct reader_thread {
	struct flist_head list;
	struct flist_head done_list;
	int started;		/* persistent reader thread launched? */
	int busy;		/* persistent reader currently in reader_work() */
	int write_seq;		/* next sequence number to write inline */
	struct stats s;
	struct thread_data thread;
};

/* One read (and subsequent write) of up to 'bs' bytes at offset 'off'. */
struct work_item {
	struct flist_head list;
	void *buf;
	size_t buf_size;
	off_t off;
	int fd;
	int seq;			/* 1-based ordering for in-order writes */
	struct writer_thread *writer;
	struct reader_thread *reader;
	pthread_mutex_t lock;
	pthread_cond_t cond;		/* signalled when the read completes */
	pthread_t thread;		/* used for one-off reader threads */
};

static struct reader_thread reader_thread;
static struct writer_thread writer_thread;
117
/*
 * Return the elapsed time between 's' and 'e' in microseconds.
 * Negative intervals (e before s) are clamped to 0.
 */
uint64_t utime_since(const struct timespec *s, const struct timespec *e)
{
	long secs = e->tv_sec - s->tv_sec;
	long usecs = (e->tv_nsec - s->tv_nsec) / 1000;

	/* borrow a second when the sub-second part went negative */
	if (secs > 0 && usecs < 0) {
		secs--;
		usecs += 1000000;
	}

	if (secs < 0 || (secs == 0 && usecs < 0))
		return 0;

	return secs * 1000000ULL + usecs;
}
137
138 static struct work_item *find_seq(struct writer_thread *w, int seq)
139 {
140         struct work_item *work;
141         struct flist_head *entry;
142
143         if (flist_empty(&w->list))
144                 return NULL;
145
146         flist_for_each(entry, &w->list) {
147                 work = flist_entry(entry, struct work_item, list);
148                 if (work->seq == seq)
149                         return work;
150         }
151
152         return NULL;
153 }
154
155 static unsigned int plat_val_to_idx(unsigned int val)
156 {
157         unsigned int msb, error_bits, base, offset;
158
159         /* Find MSB starting from bit 0 */
160         if (val == 0)
161                 msb = 0;
162         else
163                 msb = sizeof(val)*8 - __builtin_clz(val) - 1;
164
165         /*
166          * MSB <= (PLAT_BITS-1), cannot be rounded off. Use
167          * all bits of the sample as index
168          */
169         if (msb <= PLAT_BITS)
170                 return val;
171
172         /* Compute the number of error bits to discard*/
173         error_bits = msb - PLAT_BITS;
174
175         /* Compute the number of buckets before the group */
176         base = (error_bits + 1) << PLAT_BITS;
177
178         /*
179          * Discard the error bits and apply the mask to find the
180          * index for the buckets in the group
181          */
182         offset = (PLAT_VAL - 1) & (val >> error_bits);
183
184         /* Make sure the index does not exceed (array size - 1) */
185         return (base + offset) < (PLAT_NR - 1) ?
186                 (base + offset) : (PLAT_NR - 1);
187 }
188
189 /*
190  * Convert the given index of the bucket array to the value
191  * represented by the bucket
192  */
193 static unsigned int plat_idx_to_val(unsigned int idx)
194 {
195         unsigned int error_bits, k, base;
196
197         assert(idx < PLAT_NR);
198
199         /* MSB <= (PLAT_BITS-1), cannot be rounded off. Use
200          * all bits of the sample as index */
201         if (idx < (PLAT_VAL << 1))
202                 return idx;
203
204         /* Find the group and compute the minimum value of that group */
205         error_bits = (idx >> PLAT_BITS) - 1;
206         base = 1 << (error_bits + PLAT_BITS);
207
208         /* Find its bucket number of the group */
209         k = idx % PLAT_VAL;
210
211         /* Return the mean of the range of the bucket */
212         return base + ((k + 0.5) * (1 << error_bits));
213 }
214
215 static void add_lat(struct stats *s, unsigned int us, const char *name)
216 {
217         int lat_index = 0;
218
219         if (us > s->max)
220                 s->max = us;
221         if (us < s->min)
222                 s->min = us;
223
224         if (us > max_us) {
225                 fprintf(stderr, "%s latency=%u usec\n", name, us);
226                 s->over++;
227         }
228
229         lat_index = plat_val_to_idx(us);
230         __sync_fetch_and_add(&s->plat[lat_index], 1);
231         __sync_fetch_and_add(&s->nr_samples, 1);
232 }
233
234 static int write_work(struct work_item *work)
235 {
236         struct timespec s, e;
237         ssize_t ret;
238
239         clock_gettime(CLOCK_MONOTONIC, &s);
240         ret = write(STDOUT_FILENO, work->buf, work->buf_size);
241         if (ret < 0)
242                 return (int)ret;
243         clock_gettime(CLOCK_MONOTONIC, &e);
244         assert(ret == work->buf_size);
245
246         add_lat(&work->writer->s, utime_since(&s, &e), "write");
247         return work->seq + 1;
248 }
249
250 static void thread_exiting(struct thread_data *thread)
251 {
252         __sync_fetch_and_add(&thread->done, 1);
253         pthread_cond_signal(&thread->done_cond);
254 }
255
/*
 * Dedicated writer thread: pulls completed work items off the writer
 * list strictly in sequence order and writes each to stdout. Runs
 * until asked to exit and the list has drained, or until a write fails
 * (write_work() returns a negative "seq" on error).
 */
static void *writer_fn(void *data)
{
	struct writer_thread *wt = data;
	struct work_item *work;
	int seq = 1;

	work = NULL;
	while (!(seq < 0) && (!wt->thread.exit || !flist_empty(&wt->list))) {
		pthread_mutex_lock(&wt->thread.lock);

		/* park the item written last iteration for main() to reap */
		if (work)
			flist_add_tail(&work->list, &wt->done_list);

		/* only the next-in-sequence item may be written; if it is
		 * not here yet, sleep until a reader queues more work */
		work = find_seq(wt, seq);
		if (work)
			flist_del_init(&work->list);
		else
			pthread_cond_wait(&wt->thread.cond, &wt->thread.lock);

		pthread_mutex_unlock(&wt->thread.lock);

		if (work)
			seq = write_work(work);
	}

	thread_exiting(&wt->thread);
	return NULL;
}
284
285 static void reader_work(struct work_item *work)
286 {
287         struct timespec s, e;
288         ssize_t ret;
289         size_t left;
290         void *buf;
291         off_t off;
292
293         clock_gettime(CLOCK_MONOTONIC, &s);
294
295         left = work->buf_size;
296         buf = work->buf;
297         off = work->off;
298         while (left) {
299                 ret = pread(work->fd, buf, left, off);
300                 if (!ret) {
301                         fprintf(stderr, "zero read\n");
302                         break;
303                 } else if (ret < 0) {
304                         fprintf(stderr, "errno=%d\n", errno);
305                         break;
306                 }
307                 left -= ret;
308                 off += ret;
309                 buf += ret;
310         }
311
312         clock_gettime(CLOCK_MONOTONIC, &e);
313
314         add_lat(&work->reader->s, utime_since(&s, &e), "read");
315
316         pthread_cond_signal(&work->cond);
317
318         if (separate_writer) {
319                 pthread_mutex_lock(&work->writer->thread.lock);
320                 flist_add_tail(&work->list, &work->writer->list);
321                 pthread_mutex_unlock(&work->writer->thread.lock);
322                 pthread_cond_signal(&work->writer->thread.cond);
323         } else {
324                 struct reader_thread *rt = work->reader;
325                 struct work_item *next = NULL;
326                 struct flist_head *entry;
327
328                 /*
329                  * Write current work if it matches in sequence.
330                  */
331                 if (work->seq == rt->write_seq)
332                         goto write_it;
333
334                 pthread_mutex_lock(&rt->thread.lock);
335
336                 flist_add_tail(&work->list, &rt->done_list);
337
338                 /*
339                  * See if the next work item is here, if so, write it
340                  */
341                 work = NULL;
342                 flist_for_each(entry, &rt->done_list) {
343                         next = flist_entry(entry, struct work_item, list);
344                         if (next->seq == rt->write_seq) {
345                                 work = next;
346                                 flist_del(&work->list);
347                                 break;
348                         }
349                 }
350
351                 pthread_mutex_unlock(&rt->thread.lock);
352         
353                 if (work) {
354 write_it:
355                         write_work(work);
356                         __sync_fetch_and_add(&rt->write_seq, 1);
357                 }
358         }
359 }
360
361 static void *reader_one_off(void *data)
362 {
363         reader_work(data);
364         return NULL;
365 }
366
/*
 * Persistent reader thread: services queued work items one at a time
 * until asked to exit and its list is empty. 'busy' is raised around
 * each item so queue_work() can tell this thread is occupied and spawn
 * a one-off reader instead of queueing behind a slow read.
 */
static void *reader_fn(void *data)
{
	struct reader_thread *rt = data;
	struct work_item *work;

	while (!rt->thread.exit || !flist_empty(&rt->list)) {
		work = NULL;
		pthread_mutex_lock(&rt->thread.lock);
		if (!flist_empty(&rt->list)) {
			work = flist_first_entry(&rt->list, struct work_item, list);
			flist_del_init(&work->list);
		} else
			pthread_cond_wait(&rt->thread.cond, &rt->thread.lock);
		pthread_mutex_unlock(&rt->thread.lock);

		if (work) {
			__sync_fetch_and_add(&rt->busy, 1);
			reader_work(work);
			__sync_fetch_and_sub(&rt->busy, 1);
		}
	}

	thread_exiting(&rt->thread);
	return NULL;
}
392
/*
 * Dispatch one read. Three cases:
 *  1) first ever item: queue it, then start the persistent reader;
 *  2) persistent reader idle and its lock uncontended: queue to it;
 *  3) reader busy (or lock held): spawn a detached one-off thread so a
 *     single slow read never stalls submission of the next one — this
 *     is the anti-coordinated-omission mechanism from the file header.
 */
static void queue_work(struct reader_thread *rt, struct work_item *work)
{
	if (!rt->started) {
		pthread_mutex_lock(&rt->thread.lock);
		flist_add_tail(&work->list, &rt->list);
		pthread_mutex_unlock(&rt->thread.lock);

		/* NOTE(review): pthread_create() result unchecked here,
		 * unlike the one-off path below — confirm intent */
		rt->started = 1;
		pthread_create(&rt->thread.thread, NULL, reader_fn, rt);
	} else if (!rt->busy && !pthread_mutex_trylock(&rt->thread.lock)) {
		flist_add_tail(&work->list, &rt->list);
		pthread_mutex_unlock(&rt->thread.lock);

		pthread_cond_signal(&rt->thread.cond);
	} else {
		int ret = pthread_create(&work->thread, NULL, reader_one_off, work);
		if (ret) {
			fprintf(stderr, "pthread_create=%d\n", ret);
		} else {
			ret = pthread_detach(work->thread);
			if (ret)
				fprintf(stderr, "pthread_detach=%d\n", ret);
		}
	}
}
418
419 static unsigned int calc_percentiles(unsigned int *io_u_plat, unsigned long nr,
420                                      unsigned int **output)
421 {
422         unsigned long sum = 0;
423         unsigned int len, i, j = 0;
424         unsigned int oval_len = 0;
425         unsigned int *ovals = NULL;
426         int is_last;
427
428         len = 0;
429         while (len < PLAT_LIST_MAX && plist[len] != 0.0)
430                 len++;
431
432         if (!len)
433                 return 0;
434
435         /*
436          * Calculate bucket values, note down max and min values
437          */
438         is_last = 0;
439         for (i = 0; i < PLAT_NR && !is_last; i++) {
440                 sum += io_u_plat[i];
441                 while (sum >= (plist[j] / 100.0 * nr)) {
442                         assert(plist[j] <= 100.0);
443
444                         if (j == oval_len) {
445                                 oval_len += 100;
446                                 ovals = realloc(ovals, oval_len * sizeof(unsigned int));
447                         }
448
449                         ovals[j] = plat_idx_to_val(i);
450                         is_last = (j == len - 1);
451                         if (is_last)
452                                 break;
453
454                         j++;
455                 }
456         }
457
458         *output = ovals;
459         return len;
460 }
461
462 static void show_latencies(struct stats *s, const char *msg)
463 {
464         unsigned int *ovals = NULL;
465         unsigned int len, i;
466
467         len = calc_percentiles(s->plat, s->nr_samples, &ovals);
468         if (len) {
469                 fprintf(stderr, "Latency percentiles (usec) (%s)\n", msg);
470                 for (i = 0; i < len; i++)
471                         fprintf(stderr, "\t%2.4fth: %u\n", plist[i], ovals[i]);
472         }
473
474         if (ovals)
475                 free(ovals);
476
477         fprintf(stderr, "\tOver=%u, min=%u, max=%u\n", s->over, s->min, s->max);
478 }
479
/*
 * Initialize the mutex/condvar pairs of a thread_data. Where the
 * platform supports it, condvars are bound to CLOCK_MONOTONIC so
 * timed waits are immune to wall-clock adjustments.
 */
static void init_thread(struct thread_data *thread)
{
	pthread_condattr_t cattr;
	int ret;

	ret = pthread_condattr_init(&cattr);
	CHECK_ZERO_OR_ABORT(ret);
#ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
	ret = pthread_condattr_setclock(&cattr, CLOCK_MONOTONIC);
	CHECK_ZERO_OR_ABORT(ret);
#endif
	pthread_cond_init(&thread->cond, &cattr);
	pthread_cond_init(&thread->done_cond, &cattr);
	pthread_mutex_init(&thread->lock, NULL);
	pthread_mutex_init(&thread->done_lock, NULL);
	thread->exit = 0;
}
497
498 static void exit_thread(struct thread_data *thread,
499                         void fn(struct writer_thread *),
500                         struct writer_thread *wt)
501 {
502         __sync_fetch_and_add(&thread->exit, 1);
503         pthread_cond_signal(&thread->cond);
504
505         while (!thread->done) {
506                 pthread_mutex_lock(&thread->done_lock);
507
508                 if (fn) {
509                         struct timespec ts;
510
511 #ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
512                         clock_gettime(CLOCK_MONOTONIC, &ts);
513 #else
514                         clock_gettime(CLOCK_REALTIME, &ts);
515 #endif
516                         ts.tv_sec++;
517
518                         pthread_cond_timedwait(&thread->done_cond, &thread->done_lock, &ts);
519                         fn(wt);
520                 } else
521                         pthread_cond_wait(&thread->done_cond, &thread->done_lock);
522
523                 pthread_mutex_unlock(&thread->done_lock);
524         }
525 }
526
/*
 * Print command line usage to stderr. Always returns 1 so error paths
 * can simply "return usage(argv);".
 */
static int usage(char *argv[])
{
	fprintf(stderr,
		"%s: [-b blocksize] [-t max usec] [-w separate writer] -f file\n",
		argv[0]);
	return 1;
}
532
/*
 * Parse the command line into the file/bs/max_us/separate_writer
 * globals. Returns 0 on success; on any error, prints usage and
 * returns non-zero. -f is mandatory. Numeric options go through
 * atoi(), so malformed numbers silently become 0.
 */
static int parse_options(int argc, char *argv[])
{
	int c;

	while ((c = getopt(argc, argv, "f:b:t:w:")) != -1) {
		switch (c) {
		case 'f':
			/* a second -f is rejected rather than leaked over */
			if (file)
				return usage(argv);
			file = strdup(optarg);
			break;
		case 'b':
			bs = atoi(optarg);
			break;
		case 't':
			max_us = atoi(optarg);
			break;
		case 'w':
			separate_writer = atoi(optarg);
			if (!separate_writer)
				fprintf(stderr, "inline writing is broken\n");
			break;
		case '?':
		default:
			return usage(argv);
		}
	}

	if (!file)
		return usage(argv);

	return 0;
}
566
567 static void prune_done_entries(struct writer_thread *wt)
568 {
569         FLIST_HEAD(list);
570
571         if (flist_empty(&wt->done_list))
572                 return;
573
574         if (pthread_mutex_trylock(&wt->thread.lock))
575                 return;
576
577         if (!flist_empty(&wt->done_list))
578                 flist_splice_init(&wt->done_list, &list);
579         pthread_mutex_unlock(&wt->thread.lock);
580
581         while (!flist_empty(&list)) {
582                 struct work_item *work;
583
584                 work = flist_first_entry(&list, struct work_item, list);
585                 flist_del(&work->list);
586
587                 pthread_cond_destroy(&work->cond);
588                 pthread_mutex_destroy(&work->lock);
589                 free(work->buf);
590                 free(work);
591         }
592 }
593
/*
 * Drive the run: read 'file' in bs-sized chunks, queueing each chunk
 * via queue_work() and waiting at most max_us for it to complete
 * before issuing the next — slow reads are left to one-off threads so
 * submission never stalls (avoiding coordinated omission). Prints
 * latency percentiles and read/write throughput at the end.
 */
int main(int argc, char *argv[])
{
	pthread_condattr_t cattr;
	struct timespec s, re, we;	/* start, readers-done, writers-done */
	struct reader_thread *rt;
	struct writer_thread *wt;
	unsigned long rate;
	uint64_t elapsed;
	struct stat sb;
	size_t bytes;
	off_t off;
	int fd, seq;
	int ret;

	if (parse_options(argc, argv))
		return 1;

	fd = open(file, O_RDONLY);
	if (fd < 0) {
		perror("open");
		return 2;
	}

	if (fstat(fd, &sb) < 0) {
		perror("stat");
		return 3;
	}

	/* start the writer thread up front; it sleeps until work arrives */
	wt = &writer_thread;
	init_thread(&wt->thread);
	INIT_FLIST_HEAD(&wt->list);
	INIT_FLIST_HEAD(&wt->done_list);
	wt->s.max = 0;
	wt->s.min = -1U;	/* UINT_MAX so the first sample sets min */
	pthread_create(&wt->thread.thread, NULL, writer_fn, wt);

	/* the persistent reader thread is started lazily by queue_work() */
	rt = &reader_thread;
	init_thread(&rt->thread);
	INIT_FLIST_HEAD(&rt->list);
	INIT_FLIST_HEAD(&rt->done_list);
	rt->s.max = 0;
	rt->s.min = -1U;
	rt->write_seq = 1;

	off = 0;
	seq = 0;
	bytes = 0;

	ret = pthread_condattr_init(&cattr);
	CHECK_ZERO_OR_ABORT(ret);
#ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
	ret = pthread_condattr_setclock(&cattr, CLOCK_MONOTONIC);
	CHECK_ZERO_OR_ABORT(ret);
#endif

	clock_gettime(CLOCK_MONOTONIC, &s);

	while (sb.st_size) {
		struct work_item *work;
		size_t this_len;
		struct timespec ts;

		/* reclaim buffers of items the writer has retired */
		prune_done_entries(wt);

		this_len = sb.st_size;
		if (this_len > bs)
			this_len = bs;

		/* NOTE(review): calloc/malloc results unchecked — OOM
		 * would crash on the dereference below */
		work = calloc(1, sizeof(*work));
		work->buf = malloc(this_len);
		work->buf_size = this_len;
		work->off = off;
		work->fd = fd;
		work->seq = ++seq;
		work->writer = wt;
		work->reader = rt;
		pthread_cond_init(&work->cond, &cattr);
		pthread_mutex_init(&work->lock, NULL);

		queue_work(rt, work);

#ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
		clock_gettime(CLOCK_MONOTONIC, &ts);
#else
		clock_gettime(CLOCK_REALTIME, &ts);
#endif
		/* wait at most max_us for this read, then press on */
		ts.tv_nsec += max_us * 1000ULL;
		if (ts.tv_nsec >= 1000000000ULL) {
			ts.tv_nsec -= 1000000000ULL;
			ts.tv_sec++;
		}

		pthread_mutex_lock(&work->lock);
		pthread_cond_timedwait(&work->cond, &work->lock, &ts);
		pthread_mutex_unlock(&work->lock);

		off += this_len;
		sb.st_size -= this_len;
		bytes += this_len;
	}

	/* drain readers first, then the writer (reaping while we wait) */
	exit_thread(&rt->thread, NULL, NULL);
	clock_gettime(CLOCK_MONOTONIC, &re);

	exit_thread(&wt->thread, prune_done_entries, wt);
	clock_gettime(CLOCK_MONOTONIC, &we);

	show_latencies(&rt->s, "READERS");
	show_latencies(&wt->s, "WRITERS");

	bytes /= 1024;
	elapsed = utime_since(&s, &re);
	rate = elapsed ? (bytes * 1000UL * 1000UL) / elapsed : 0;
	fprintf(stderr, "Read rate (KiB/sec) : %lu\n", rate);
	elapsed = utime_since(&s, &we);
	rate = elapsed ? (bytes * 1000UL * 1000UL) / elapsed : 0;
	fprintf(stderr, "Write rate (KiB/sec): %lu\n", rate);

	close(fd);
	return 0;
}