Merge branch 'security-token' of https://github.com/sfc-gh-rnarubin/fio
[fio.git] / t / read-to-pipe-async.c
1 /*
2  * Read a file and write the contents to stdout. If a given read takes
3  * longer than 'max_us' time, then we schedule a new thread to handle
4  * the next read. This avoids the coordinated omission problem, where
5  * one request appears to take a long time, but in reality a lot of
6  * requests would have been slow, but we don't notice since new submissions
7  * are not being issued if just 1 is held up.
8  *
9  * One test case:
10  *
11  * $ time (./read-to-pipe-async -f randfile.gz | gzip -dc > outfile; sync)
12  *
13  * This will read randfile.gz and log the latencies of doing so, while
14  * piping the output to gzip to decompress it. Any latencies over max_us
15  * are logged when they happen, and latency buckets are displayed at the
16  * end of the run
17  *
18  * gcc -Wall -g -O2 -o read-to-pipe-async read-to-pipe-async.c -lpthread
19  *
20  * Copyright (C) 2016 Jens Axboe
21  *
22  */
23
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <sys/time.h>
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <inttypes.h>
32 #include <string.h>
33 #include <pthread.h>
34 #include <errno.h>
35 #include <assert.h>
36
37 #include "../flist.h"
38 #include "../log.h"
39
40 #include "compiler/compiler.h"
41
static int bs = 4096;			/* read block size in bytes (-b) */
static int max_us = 10000;		/* latency budget in microseconds (-t) */
static char *file;			/* input file name (-f), strdup()ed */
static int separate_writer = 1;		/* -w: use a dedicated writer thread */
46
/* Log-bucketed latency histogram geometry (see plat_val_to_idx()) */
#define PLAT_BITS	8
#define PLAT_VAL	(1 << PLAT_BITS)
#define PLAT_GROUP_NR	19
#define PLAT_NR		(PLAT_GROUP_NR * PLAT_VAL)
#define PLAT_LIST_MAX	20

/*
 * Abort if 'code' is nonzero. Both branches must agree that nonzero
 * means failure: the release (NDEBUG) branch aborts when code != 0,
 * so the debug branch must assert code == 0. The previous debug form,
 * assert(code), was inverted -- it aborted on success.
 */
#ifndef NDEBUG
#define CHECK_ZERO_OR_ABORT(code)	assert((code) == 0)
#else
#define CHECK_ZERO_OR_ABORT(code)					\
	do {								\
		if (fio_unlikely((code) != 0)) {			\
			log_err("failed checking code %i != 0", (code));\
			abort();					\
		}							\
	} while (0)
#endif
64
/*
 * Latency accounting shared by readers and the writer. 'plat' is a
 * log-bucketed histogram (see plat_val_to_idx()); the histogram and
 * sample counters are bumped with __sync atomics in add_lat(), while
 * min/max/over are updated without synchronization.
 */
struct stats {
	unsigned int plat[PLAT_NR];	/* latency histogram buckets */
	unsigned int nr_samples;	/* total samples recorded */
	unsigned int max;		/* worst latency seen, usec */
	unsigned int min;		/* best latency seen, usec */
	unsigned int over;		/* samples that exceeded max_us */
};

/* Percentiles reported at end of run; a 0.0 entry terminates the list */
static double plist[PLAT_LIST_MAX] = { 50.0, 75.0, 90.0, 95.0, 99.0, 99.5, 99.9, 99.99, 99.999, 99.9999, };

/*
 * State common to the persistent reader and writer threads: the
 * exit/done handshake plus the lock/cond pair guarding each thread's
 * work lists.
 */
struct thread_data {
	int exit;			/* bumped (atomically) to request exit */
	int done;			/* bumped by the thread once it exits */
	pthread_mutex_t lock;		/* protects the owner's work lists */
	pthread_cond_t cond;		/* signaled when work is queued */
	pthread_mutex_t done_lock;	/* pairs with done_cond */
	pthread_cond_t done_cond;	/* signaled from thread_exiting() */
	pthread_t thread;
};

struct writer_thread {
	struct flist_head list;		/* read buffers queued for writing */
	struct flist_head done_list;	/* written items awaiting pruning */
	struct stats s;
	struct thread_data thread;
};

struct reader_thread {
	struct flist_head list;		/* work queued for reading */
	struct flist_head done_list;	/* inline-writer mode: out-of-order items */
	int started;			/* persistent reader launched yet? */
	int busy;			/* nonzero while a read is in flight */
	int write_seq;			/* inline-writer mode: next seq to write */
	struct stats s;
	struct thread_data thread;
};
101
/*
 * One read-then-write request covering [off, off + buf_size) of the
 * input file. Allocated by main(), freed by prune_done_entries() once
 * the writer is finished with it.
 */
struct work_item {
	struct flist_head list;		/* linkage on reader/writer queues */
	void *buf;			/* data buffer, malloc()ed by main() */
	size_t buf_size;
	off_t off;			/* file offset to read from */
	int fd;				/* input file descriptor */
	int seq;			/* 1-based submission sequence number */
	struct writer_thread *writer;
	struct reader_thread *reader;
	pthread_mutex_t lock;		/* pairs with cond */
	pthread_cond_t cond;		/* signaled when the read completes */
	pthread_t thread;		/* one-off reader thread, if spawned */
};

/* The single persistent reader and writer instances */
static struct reader_thread reader_thread;
static struct writer_thread writer_thread;
118
/*
 * Elapsed microseconds from 's' to 'e'. Intervals that come out
 * negative (end before start) are clamped to zero.
 */
uint64_t utime_since(const struct timespec *s, const struct timespec *e)
{
	long sec = e->tv_sec - s->tv_sec;
	long usec = (e->tv_nsec - s->tv_nsec) / 1000;

	/* Borrow a second when the sub-second delta went negative */
	if (sec > 0 && usec < 0) {
		sec--;
		usec += 1000000;
	}

	if (sec < 0 || (sec == 0 && usec < 0))
		return 0;

	return sec * 1000000ULL + usec;
}
138
139 static struct work_item *find_seq(struct writer_thread *w, int seq)
140 {
141         struct work_item *work;
142         struct flist_head *entry;
143
144         if (flist_empty(&w->list))
145                 return NULL;
146
147         flist_for_each(entry, &w->list) {
148                 work = flist_entry(entry, struct work_item, list);
149                 if (work->seq == seq)
150                         return work;
151         }
152
153         return NULL;
154 }
155
156 static unsigned int plat_val_to_idx(unsigned int val)
157 {
158         unsigned int msb, error_bits, base, offset;
159
160         /* Find MSB starting from bit 0 */
161         if (val == 0)
162                 msb = 0;
163         else
164                 msb = sizeof(val)*8 - __builtin_clz(val) - 1;
165
166         /*
167          * MSB <= (PLAT_BITS-1), cannot be rounded off. Use
168          * all bits of the sample as index
169          */
170         if (msb <= PLAT_BITS)
171                 return val;
172
173         /* Compute the number of error bits to discard*/
174         error_bits = msb - PLAT_BITS;
175
176         /* Compute the number of buckets before the group */
177         base = (error_bits + 1) << PLAT_BITS;
178
179         /*
180          * Discard the error bits and apply the mask to find the
181          * index for the buckets in the group
182          */
183         offset = (PLAT_VAL - 1) & (val >> error_bits);
184
185         /* Make sure the index does not exceed (array size - 1) */
186         return (base + offset) < (PLAT_NR - 1) ?
187                 (base + offset) : (PLAT_NR - 1);
188 }
189
190 /*
191  * Convert the given index of the bucket array to the value
192  * represented by the bucket
193  */
194 static unsigned int plat_idx_to_val(unsigned int idx)
195 {
196         unsigned int error_bits, k, base;
197
198         assert(idx < PLAT_NR);
199
200         /* MSB <= (PLAT_BITS-1), cannot be rounded off. Use
201          * all bits of the sample as index */
202         if (idx < (PLAT_VAL << 1))
203                 return idx;
204
205         /* Find the group and compute the minimum value of that group */
206         error_bits = (idx >> PLAT_BITS) - 1;
207         base = 1 << (error_bits + PLAT_BITS);
208
209         /* Find its bucket number of the group */
210         k = idx % PLAT_VAL;
211
212         /* Return the mean of the range of the bucket */
213         return base + ((k + 0.5) * (1 << error_bits));
214 }
215
216 static void add_lat(struct stats *s, unsigned int us, const char *name)
217 {
218         int lat_index = 0;
219
220         if (us > s->max)
221                 s->max = us;
222         if (us < s->min)
223                 s->min = us;
224
225         if (us > max_us) {
226                 fprintf(stderr, "%s latency=%u usec\n", name, us);
227                 s->over++;
228         }
229
230         lat_index = plat_val_to_idx(us);
231         __sync_fetch_and_add(&s->plat[lat_index], 1);
232         __sync_fetch_and_add(&s->nr_samples, 1);
233 }
234
235 static int write_work(struct work_item *work)
236 {
237         struct timespec s, e;
238         ssize_t ret;
239
240         clock_gettime(CLOCK_MONOTONIC, &s);
241         ret = write(STDOUT_FILENO, work->buf, work->buf_size);
242         if (ret < 0)
243                 return (int)ret;
244         clock_gettime(CLOCK_MONOTONIC, &e);
245         assert(ret == work->buf_size);
246
247         add_lat(&work->writer->s, utime_since(&s, &e), "write");
248         return work->seq + 1;
249 }
250
/*
 * Mark this worker as done and wake anyone blocked in exit_thread().
 */
static void thread_exiting(struct thread_data *thread)
{
	__sync_fetch_and_add(&thread->done, 1);
	pthread_cond_signal(&thread->done_cond);
}
256
/*
 * Writer thread main loop: drain completed read buffers to stdout in
 * strict submission order. Runs until asked to exit AND the pending
 * list has been flushed. A negative value back from write_work()
 * (write error) also terminates the loop.
 */
static void *writer_fn(void *data)
{
	struct writer_thread *wt = data;
	struct work_item *work;
	int seq = 1;	/* next sequence number we are allowed to write */

	work = NULL;
	while (!(seq < 0) && (!wt->thread.exit || !flist_empty(&wt->list))) {
		pthread_mutex_lock(&wt->thread.lock);

		/* Park the item we just wrote for the main thread to reap */
		if (work)
			flist_add_tail(&work->list, &wt->done_list);

		/* Grab the in-order item; if it isn't queued yet, sleep */
		work = find_seq(wt, seq);
		if (work)
			flist_del_init(&work->list);
		else
			pthread_cond_wait(&wt->thread.cond, &wt->thread.lock);

		pthread_mutex_unlock(&wt->thread.lock);

		/* Do the actual write outside the lock */
		if (work)
			seq = write_work(work);
	}

	thread_exiting(&wt->thread);
	return NULL;
}
285
/*
 * Service one read request: pread() the whole extent into the work
 * buffer, record the read latency, wake the submitter, then hand the
 * buffer off for writing -- either to the separate writer thread, or
 * written inline (in submission order) when -w0 was given.
 */
static void reader_work(struct work_item *work)
{
	struct timespec s, e;
	ssize_t ret;
	size_t left;
	void *buf;
	off_t off;

	clock_gettime(CLOCK_MONOTONIC, &s);

	left = work->buf_size;
	buf = work->buf;
	off = work->off;
	/* Loop until the full extent is in, tolerating short reads */
	while (left) {
		ret = pread(work->fd, buf, left, off);
		if (!ret) {
			fprintf(stderr, "zero read\n");
			break;
		} else if (ret < 0) {
			fprintf(stderr, "errno=%d\n", errno);
			break;
		}
		left -= ret;
		off += ret;
		buf += ret;	/* void * arithmetic: GCC extension */
	}

	clock_gettime(CLOCK_MONOTONIC, &e);

	add_lat(&work->reader->s, utime_since(&s, &e), "read");

	/* Wake main(), which waits (bounded by max_us) on this read */
	pthread_cond_signal(&work->cond);

	if (separate_writer) {
		/* Queue for the writer thread and wake it */
		pthread_mutex_lock(&work->writer->thread.lock);
		flist_add_tail(&work->list, &work->writer->list);
		pthread_mutex_unlock(&work->writer->thread.lock);
		pthread_cond_signal(&work->writer->thread.cond);
	} else {
		struct reader_thread *rt = work->reader;
		struct work_item *next = NULL;
		struct flist_head *entry;

		/*
		 * Write current work if it matches in sequence.
		 */
		if (work->seq == rt->write_seq)
			goto write_it;

		pthread_mutex_lock(&rt->thread.lock);

		/* Out of order: park it until its sequence number comes up */
		flist_add_tail(&work->list, &rt->done_list);

		/*
		 * See if the next work item is here, if so, write it
		 */
		work = NULL;
		flist_for_each(entry, &rt->done_list) {
			next = flist_entry(entry, struct work_item, list);
			if (next->seq == rt->write_seq) {
				work = next;
				flist_del(&work->list);
				break;
			}
		}

		pthread_mutex_unlock(&rt->thread.lock);

		if (work) {
write_it:
			write_work(work);
			__sync_fetch_and_add(&rt->write_seq, 1);
		}
	}
}
361
/*
 * Thread entry for a detached one-shot reader, spawned by queue_work()
 * when the persistent reader is busy. 'data' is the struct work_item.
 */
static void *reader_one_off(void *data)
{
	reader_work(data);
	return NULL;
}
367
/*
 * Persistent reader thread: pull queued items off rt->list and service
 * them, flagging 'busy' around each read so queue_work() can route
 * around us while a slow read is in flight. Exits once asked to and
 * the queue has drained.
 */
static void *reader_fn(void *data)
{
	struct reader_thread *rt = data;
	struct work_item *work;

	while (!rt->thread.exit || !flist_empty(&rt->list)) {
		work = NULL;
		pthread_mutex_lock(&rt->thread.lock);
		if (!flist_empty(&rt->list)) {
			work = flist_first_entry(&rt->list, struct work_item, list);
			flist_del_init(&work->list);
		} else
			pthread_cond_wait(&rt->thread.cond, &rt->thread.lock);
		pthread_mutex_unlock(&rt->thread.lock);

		if (work) {
			/* Mark busy so new work goes to one-off threads */
			__sync_fetch_and_add(&rt->busy, 1);
			reader_work(work);
			__sync_fetch_and_sub(&rt->busy, 1);
		}
	}

	thread_exiting(&rt->thread);
	return NULL;
}
393
/*
 * Hand a read request to the reader side. Three paths:
 *   1) first request ever: queue it, then start the persistent reader;
 *   2) reader idle and its lock uncontended: queue to the persistent
 *      reader and wake it;
 *   3) reader busy (slow read in flight) or lock held: spawn a
 *      detached one-off thread so this request is not held up -- the
 *      anti-coordinated-omission trick described at the top of file.
 */
static void queue_work(struct reader_thread *rt, struct work_item *work)
{
	if (!rt->started) {
		pthread_mutex_lock(&rt->thread.lock);
		flist_add_tail(&work->list, &rt->list);
		pthread_mutex_unlock(&rt->thread.lock);

		/* NOTE(review): pthread_create return is ignored on this path */
		rt->started = 1;
		pthread_create(&rt->thread.thread, NULL, reader_fn, rt);
	} else if (!rt->busy && !pthread_mutex_trylock(&rt->thread.lock)) {
		flist_add_tail(&work->list, &rt->list);
		pthread_mutex_unlock(&rt->thread.lock);

		pthread_cond_signal(&rt->thread.cond);
	} else {
		int ret = pthread_create(&work->thread, NULL, reader_one_off, work);
		if (ret) {
			fprintf(stderr, "pthread_create=%d\n", ret);
		} else {
			ret = pthread_detach(work->thread);
			if (ret)
				fprintf(stderr, "pthread_detach=%d\n", ret);
		}
	}
}
419
420 static unsigned int calc_percentiles(unsigned int *io_u_plat, unsigned long nr,
421                                      unsigned int **output)
422 {
423         unsigned long sum = 0;
424         unsigned int len, i, j = 0;
425         unsigned int oval_len = 0;
426         unsigned int *ovals = NULL;
427         int is_last;
428
429         len = 0;
430         while (len < PLAT_LIST_MAX && plist[len] != 0.0)
431                 len++;
432
433         if (!len)
434                 return 0;
435
436         /*
437          * Calculate bucket values, note down max and min values
438          */
439         is_last = 0;
440         for (i = 0; i < PLAT_NR && !is_last; i++) {
441                 sum += io_u_plat[i];
442                 while (sum >= (plist[j] / 100.0 * nr)) {
443                         assert(plist[j] <= 100.0);
444
445                         if (j == oval_len) {
446                                 oval_len += 100;
447                                 ovals = realloc(ovals, oval_len * sizeof(unsigned int));
448                         }
449
450                         ovals[j] = plat_idx_to_val(i);
451                         is_last = (j == len - 1);
452                         if (is_last)
453                                 break;
454
455                         j++;
456                 }
457         }
458
459         *output = ovals;
460         return len;
461 }
462
463 static void show_latencies(struct stats *s, const char *msg)
464 {
465         unsigned int *ovals = NULL;
466         unsigned int len, i;
467
468         len = calc_percentiles(s->plat, s->nr_samples, &ovals);
469         if (len) {
470                 fprintf(stderr, "Latency percentiles (usec) (%s)\n", msg);
471                 for (i = 0; i < len; i++)
472                         fprintf(stderr, "\t%2.4fth: %u\n", plist[i], ovals[i]);
473         }
474
475         if (ovals)
476                 free(ovals);
477
478         fprintf(stderr, "\tOver=%u, min=%u, max=%u\n", s->over, s->min, s->max);
479 }
480
481 static void init_thread(struct thread_data *thread)
482 {
483         pthread_condattr_t cattr;
484         int ret;
485
486         ret = pthread_condattr_init(&cattr);
487         CHECK_ZERO_OR_ABORT(ret);
488 #ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
489         ret = pthread_condattr_setclock(&cattr, CLOCK_MONOTONIC);
490         CHECK_ZERO_OR_ABORT(ret);
491 #endif
492         pthread_cond_init(&thread->cond, &cattr);
493         pthread_cond_init(&thread->done_cond, &cattr);
494         pthread_mutex_init(&thread->lock, NULL);
495         pthread_mutex_init(&thread->done_lock, NULL);
496         thread->exit = 0;
497 }
498
/*
 * Ask 'thread' to exit and wait until it reports done. When 'fn' is
 * given (the writer case, with fn == prune_done_entries), wake up at
 * least once a second to run it while waiting so the main thread keeps
 * reaping completed work during shutdown.
 *
 * NOTE(review): thread->exit/done are read outside done_lock and the
 * signal is sent before the waiter necessarily holds the lock; the 1s
 * timedwait bounds a missed wakeup on the fn path, but the plain
 * cond_wait path appears to rely on the done check racing correctly --
 * confirm.
 */
static void exit_thread(struct thread_data *thread,
			void fn(struct writer_thread *),
			struct writer_thread *wt)
{
	__sync_fetch_and_add(&thread->exit, 1);
	pthread_cond_signal(&thread->cond);

	while (!thread->done) {
		pthread_mutex_lock(&thread->done_lock);

		if (fn) {
			struct timespec ts;

#ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
			clock_gettime(CLOCK_MONOTONIC, &ts);
#else
			clock_gettime(CLOCK_REALTIME, &ts);
#endif
			/* Wake at least once a second to run fn() */
			ts.tv_sec++;

			pthread_cond_timedwait(&thread->done_cond, &thread->done_lock, &ts);
			fn(wt);
		} else
			pthread_cond_wait(&thread->done_cond, &thread->done_lock);

		pthread_mutex_unlock(&thread->done_lock);
	}
}
527
/*
 * Print the command-line synopsis; returns nonzero so callers can
 * propagate it straight back as an error exit code.
 */
static int usage(char *argv[])
{
	fprintf(stderr,
		"%s: [-b blocksize] [-t max usec] [-w separate writer] -f file\n",
		argv[0]);
	return 1;
}
533
534 static int parse_options(int argc, char *argv[])
535 {
536         int c;
537
538         while ((c = getopt(argc, argv, "f:b:t:w:")) != -1) {
539                 switch (c) {
540                 case 'f':
541                         if (file)
542                                 return usage(argv);
543                         file = strdup(optarg);
544                         break;
545                 case 'b':
546                         bs = atoi(optarg);
547                         break;
548                 case 't':
549                         max_us = atoi(optarg);
550                         break;
551                 case 'w':
552                         separate_writer = atoi(optarg);
553                         if (!separate_writer)
554                                 fprintf(stderr, "inline writing is broken\n");
555                         break;
556                 case '?':
557                 default:
558                         return usage(argv);
559                 }
560         }
561
562         if (!file)
563                 return usage(argv);
564
565         return 0;
566 }
567
/*
 * Reap work items the writer has finished with: splice the writer's
 * done list onto a private list (trylock only -- give up rather than
 * stall the submit loop) and destroy/free each entry outside the lock.
 */
static void prune_done_entries(struct writer_thread *wt)
{
	FLIST_HEAD(list);

	/* Unlocked peek; rechecked under the lock below */
	if (flist_empty(&wt->done_list))
		return;

	if (pthread_mutex_trylock(&wt->thread.lock))
		return;

	if (!flist_empty(&wt->done_list))
		flist_splice_init(&wt->done_list, &list);
	pthread_mutex_unlock(&wt->thread.lock);

	/* Tear down everything we spliced off, without holding the lock */
	while (!flist_empty(&list)) {
		struct work_item *work;

		work = flist_first_entry(&list, struct work_item, list);
		flist_del(&work->list);

		pthread_cond_destroy(&work->cond);
		pthread_mutex_destroy(&work->lock);
		free(work->buf);
		free(work);
	}
}
594
595 int main(int argc, char *argv[])
596 {
597         pthread_condattr_t cattr;
598         struct timespec s, re, we;
599         struct reader_thread *rt;
600         struct writer_thread *wt;
601         unsigned long rate;
602         uint64_t elapsed;
603         struct stat sb;
604         size_t bytes;
605         off_t off;
606         int fd, seq;
607         int ret;
608
609         if (parse_options(argc, argv))
610                 return 1;
611
612         fd = open(file, O_RDONLY);
613         if (fd < 0) {
614                 perror("open");
615                 return 2;
616         }
617
618         if (fstat(fd, &sb) < 0) {
619                 perror("stat");
620                 return 3;
621         }
622
623         wt = &writer_thread;
624         init_thread(&wt->thread);
625         INIT_FLIST_HEAD(&wt->list);
626         INIT_FLIST_HEAD(&wt->done_list);
627         wt->s.max = 0;
628         wt->s.min = -1U;
629         pthread_create(&wt->thread.thread, NULL, writer_fn, wt);
630
631         rt = &reader_thread;
632         init_thread(&rt->thread);
633         INIT_FLIST_HEAD(&rt->list);
634         INIT_FLIST_HEAD(&rt->done_list);
635         rt->s.max = 0;
636         rt->s.min = -1U;
637         rt->write_seq = 1;
638
639         off = 0;
640         seq = 0;
641         bytes = 0;
642
643         ret = pthread_condattr_init(&cattr);
644         CHECK_ZERO_OR_ABORT(ret);
645 #ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
646         ret = pthread_condattr_setclock(&cattr, CLOCK_MONOTONIC);
647         CHECK_ZERO_OR_ABORT(ret);
648 #endif
649
650         clock_gettime(CLOCK_MONOTONIC, &s);
651
652         while (sb.st_size) {
653                 struct work_item *work;
654                 size_t this_len;
655                 struct timespec ts;
656
657                 prune_done_entries(wt);
658
659                 this_len = sb.st_size;
660                 if (this_len > bs)
661                         this_len = bs;
662
663                 work = calloc(1, sizeof(*work));
664                 work->buf = malloc(this_len);
665                 work->buf_size = this_len;
666                 work->off = off;
667                 work->fd = fd;
668                 work->seq = ++seq;
669                 work->writer = wt;
670                 work->reader = rt;
671                 pthread_cond_init(&work->cond, &cattr);
672                 pthread_mutex_init(&work->lock, NULL);
673
674                 queue_work(rt, work);
675
676 #ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
677                 clock_gettime(CLOCK_MONOTONIC, &ts);
678 #else
679                 clock_gettime(CLOCK_REALTIME, &ts);
680 #endif
681                 ts.tv_nsec += max_us * 1000ULL;
682                 if (ts.tv_nsec >= 1000000000ULL) {
683                         ts.tv_nsec -= 1000000000ULL;
684                         ts.tv_sec++;
685                 }
686
687                 pthread_mutex_lock(&work->lock);
688                 pthread_cond_timedwait(&work->cond, &work->lock, &ts);
689                 pthread_mutex_unlock(&work->lock);
690
691                 off += this_len;
692                 sb.st_size -= this_len;
693                 bytes += this_len;
694         }
695
696         exit_thread(&rt->thread, NULL, NULL);
697         clock_gettime(CLOCK_MONOTONIC, &re);
698
699         exit_thread(&wt->thread, prune_done_entries, wt);
700         clock_gettime(CLOCK_MONOTONIC, &we);
701
702         show_latencies(&rt->s, "READERS");
703         show_latencies(&wt->s, "WRITERS");
704
705         bytes /= 1024;
706         elapsed = utime_since(&s, &re);
707         rate = elapsed ? (bytes * 1000UL * 1000UL) / elapsed : 0;
708         fprintf(stderr, "Read rate (KiB/sec) : %lu\n", rate);
709         elapsed = utime_since(&s, &we);
710         rate = elapsed ? (bytes * 1000UL * 1000UL) / elapsed : 0;
711         fprintf(stderr, "Write rate (KiB/sec): %lu\n", rate);
712
713         close(fd);
714         return 0;
715 }