/*
 * Read a file and write the contents to stdout. If a given read takes
 * longer than 'max_us' time, then we schedule a new thread to handle
 * the next read. This avoids the coordinated omission problem, where
 * one request appears to take a long time, but in reality many requests
 * would have been equally slow; we just don't notice, since new
 * submissions are not issued while that one request is held up.
 *
 * One test case:
 *
 * $ time (./read-to-pipe-async -f randfile.gz | gzip -dc > outfile; sync)
 *
 * This will read randfile.gz and log the latencies of doing so, while
 * piping the output to gzip to decompress it. Any latencies over max_us
 * are logged when they happen, and latency buckets are displayed at the
 * end of the run.
 *
 * gcc -Wall -g -O2 -o read-to-pipe-async read-to-pipe-async.c -lpthread
 *
 * Copyright (C) 2016 Jens Axboe
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <inttypes.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include <assert.h>

#include "../flist.h"

#include "compiler/compiler.h"

static int bs = 4096;
static int max_us = 10000;
static char *file;
static int separate_writer = 1;

#define PLAT_BITS 8
#define PLAT_VAL (1 << PLAT_BITS)
#define PLAT_GROUP_NR 19
#define PLAT_NR (PLAT_GROUP_NR * PLAT_VAL)
#define PLAT_LIST_MAX 20

#ifndef NDEBUG
#define CHECK_ZERO_OR_ABORT(code) assert(code)
#else
#define CHECK_ZERO_OR_ABORT(code) \
	do { \
		if (fio_unlikely((code) != 0)) { \
			log_err("failed checking code %i != 0", (code)); \
			abort(); \
		} \
	} while (0)
#endif

struct stats {
	unsigned int plat[PLAT_NR];
	unsigned int nr_samples;
	unsigned int max;
	unsigned int min;
	unsigned int over;
};

static double plist[PLAT_LIST_MAX] = { 50.0, 75.0, 90.0, 95.0, 99.0, 99.5, 99.9, 99.99, 99.999, 99.9999, };

struct thread_data {
	int exit;
	int done;
	pthread_mutex_t lock;
	pthread_cond_t cond;
	pthread_mutex_t done_lock;
	pthread_cond_t done_cond;
	pthread_t thread;
};

struct writer_thread {
	struct flist_head list;
	struct flist_head done_list;
	struct stats s;
	struct thread_data thread;
};

struct reader_thread {
	struct flist_head list;
	struct flist_head done_list;
	int started;
	int busy;
	int write_seq;
	struct stats s;
	struct thread_data thread;
};

struct work_item {
	struct flist_head list;
	void *buf;
	size_t buf_size;
	off_t off;
	int fd;
	int seq;
	struct writer_thread *writer;
	struct reader_thread *reader;
	pthread_mutex_t lock;
	pthread_cond_t cond;
	pthread_t thread;
};

static struct reader_thread reader_thread;
static struct writer_thread writer_thread;

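/*
 * Microseconds elapsed between timespecs 's' and 'e'; returns 0 if 'e'
 * is earlier than 's'.
 */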
uint64_t utime_since(const struct timespec *s, const struct timespec *e)
{
	long sec, usec;
	uint64_t ret;

	sec = e->tv_sec - s->tv_sec;
	usec = (e->tv_nsec - s->tv_nsec) / 1000;
	if (sec > 0 && usec < 0) {
		sec--;
		usec += 1000000;
	}

	if (sec < 0 || (sec == 0 && usec < 0))
		return 0;

	ret = sec * 1000000ULL + usec;

	return ret;
}

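/*
 * Find the pending work item with sequence number 'seq' on the writer's
 * list, or NULL if it hasn't been queued yet. Called with the writer
 * thread lock held.
 */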
static struct work_item *find_seq(struct writer_thread *w, int seq)
{
	struct work_item *work;
	struct flist_head *entry;

	if (flist_empty(&w->list))
		return NULL;

	flist_for_each(entry, &w->list) {
		work = flist_entry(entry, struct work_item, list);
		if (work->seq == seq)
			return work;
	}

	return NULL;
}

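/*
 * Map a latency value (in usec) to its bucket index. Buckets are grouped
 * by the position of the most significant bit, with PLAT_VAL buckets per
 * group, giving roughly logarithmic bucket widths.
 */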
static unsigned int plat_val_to_idx(unsigned int val)
{
	unsigned int msb, error_bits, base, offset;

	/* Find MSB starting from bit 0 */
	if (val == 0)
		msb = 0;
	else
		msb = sizeof(val)*8 - __builtin_clz(val) - 1;

	/*
	 * MSB <= (PLAT_BITS-1), cannot be rounded off. Use
	 * all bits of the sample as index
	 */
	if (msb <= PLAT_BITS)
		return val;

	/* Compute the number of error bits to discard */
	error_bits = msb - PLAT_BITS;

	/* Compute the number of buckets before the group */
	base = (error_bits + 1) << PLAT_BITS;

	/*
	 * Discard the error bits and apply the mask to find the
	 * index for the buckets in the group
	 */
	offset = (PLAT_VAL - 1) & (val >> error_bits);

	/* Make sure the index does not exceed (array size - 1) */
	return (base + offset) < (PLAT_NR - 1) ?
		(base + offset) : (PLAT_NR - 1);
}

/*
 * Convert the given index of the bucket array to the value
 * represented by the bucket
 */
static unsigned int plat_idx_to_val(unsigned int idx)
{
	unsigned int error_bits, k, base;

	assert(idx < PLAT_NR);

	/* MSB <= (PLAT_BITS-1), cannot be rounded off. Use
	 * all bits of the sample as index */
	if (idx < (PLAT_VAL << 1))
		return idx;

	/* Find the group and compute the minimum value of that group */
	error_bits = (idx >> PLAT_BITS) - 1;
	base = 1 << (error_bits + PLAT_BITS);

	/* Find the bucket number within the group */
	k = idx % PLAT_VAL;

	/* Return the mean of the range of the bucket */
	return base + ((k + 0.5) * (1 << error_bits));
}

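/*
 * Record one latency sample in the given stats, and report it on stderr
 * if it exceeds the max_us threshold.
 */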
static void add_lat(struct stats *s, unsigned int us, const char *name)
{
	int lat_index = 0;

	if (us > s->max)
		s->max = us;
	if (us < s->min)
		s->min = us;

	if (us > max_us) {
		fprintf(stderr, "%s latency=%u usec\n", name, us);
		s->over++;
	}

	lat_index = plat_val_to_idx(us);
	__sync_fetch_and_add(&s->plat[lat_index], 1);
	__sync_fetch_and_add(&s->nr_samples, 1);
}

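/*
 * Write one completed buffer to stdout, accounting the write latency.
 * Returns the next sequence number to write, or a negative value if the
 * write failed.
 */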
static int write_work(struct work_item *work)
{
	struct timespec s, e;
	ssize_t ret;

	clock_gettime(CLOCK_MONOTONIC, &s);
	ret = write(STDOUT_FILENO, work->buf, work->buf_size);
	if (ret < 0)
		return (int)ret;
	clock_gettime(CLOCK_MONOTONIC, &e);
	assert(ret == work->buf_size);

	add_lat(&work->writer->s, utime_since(&s, &e), "write");
	return work->seq + 1;
}

static void thread_exiting(struct thread_data *thread)
{
	__sync_fetch_and_add(&thread->done, 1);
	pthread_cond_signal(&thread->done_cond);
}

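/*
 * Writer thread: pull completed work items off the writer list in strict
 * sequence order, write them to stdout, and park them on the done list
 * for later reaping.
 */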
static void *writer_fn(void *data)
{
	struct writer_thread *wt = data;
	struct work_item *work;
	int seq = 1;

	work = NULL;
	while (!(seq < 0) && (!wt->thread.exit || !flist_empty(&wt->list))) {
		pthread_mutex_lock(&wt->thread.lock);

		if (work)
			flist_add_tail(&work->list, &wt->done_list);

		work = find_seq(wt, seq);
		if (work)
			flist_del_init(&work->list);
		else
			pthread_cond_wait(&wt->thread.cond, &wt->thread.lock);

		pthread_mutex_unlock(&wt->thread.lock);

		if (work)
			seq = write_work(work);
	}

	thread_exiting(&wt->thread);
	return NULL;
}

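/*
 * Perform one read request: pread() the assigned region into the work
 * buffer, record the read latency, wake the submitter, and then either
 * hand the buffer to the writer thread or, if no separate writer is used,
 * write it inline in sequence order.
 */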
static void reader_work(struct work_item *work)
{
	struct timespec s, e;
	ssize_t ret;
	size_t left;
	void *buf;
	off_t off;

	clock_gettime(CLOCK_MONOTONIC, &s);

	left = work->buf_size;
	buf = work->buf;
	off = work->off;
	while (left) {
		ret = pread(work->fd, buf, left, off);
		if (!ret) {
			fprintf(stderr, "zero read\n");
			break;
		} else if (ret < 0) {
			fprintf(stderr, "errno=%d\n", errno);
			break;
		}
		left -= ret;
		off += ret;
		buf += ret;
	}

	clock_gettime(CLOCK_MONOTONIC, &e);

	add_lat(&work->reader->s, utime_since(&s, &e), "read");

	pthread_cond_signal(&work->cond);

	if (separate_writer) {
		pthread_mutex_lock(&work->writer->thread.lock);
		flist_add_tail(&work->list, &work->writer->list);
		pthread_mutex_unlock(&work->writer->thread.lock);
		pthread_cond_signal(&work->writer->thread.cond);
	} else {
		struct reader_thread *rt = work->reader;
		struct work_item *next = NULL;
		struct flist_head *entry;

		/*
		 * Write current work if it matches in sequence.
		 */
		if (work->seq == rt->write_seq)
			goto write_it;

		pthread_mutex_lock(&rt->thread.lock);

		flist_add_tail(&work->list, &rt->done_list);

		/*
		 * See if the next work item is here, if so, write it
		 */
		work = NULL;
		flist_for_each(entry, &rt->done_list) {
			next = flist_entry(entry, struct work_item, list);
			if (next->seq == rt->write_seq) {
				work = next;
				flist_del(&work->list);
				break;
			}
		}

		pthread_mutex_unlock(&rt->thread.lock);

		if (work) {
write_it:
			write_work(work);
			__sync_fetch_and_add(&rt->write_seq, 1);
		}
	}
}

static void *reader_one_off(void *data)
{
	reader_work(data);
	return NULL;
}

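/*
 * Persistent reader thread: service queued work items until asked to
 * exit and the queue has drained.
 */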
static void *reader_fn(void *data)
{
	struct reader_thread *rt = data;
	struct work_item *work;

	while (!rt->thread.exit || !flist_empty(&rt->list)) {
		work = NULL;
		pthread_mutex_lock(&rt->thread.lock);
		if (!flist_empty(&rt->list)) {
			work = flist_first_entry(&rt->list, struct work_item, list);
			flist_del_init(&work->list);
		} else
			pthread_cond_wait(&rt->thread.cond, &rt->thread.lock);
		pthread_mutex_unlock(&rt->thread.lock);

		if (work) {
			__sync_fetch_and_add(&rt->busy, 1);
			reader_work(work);
			__sync_fetch_and_sub(&rt->busy, 1);
		}
	}

	thread_exiting(&rt->thread);
	return NULL;
}

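/*
 * Submit a read. The first submission starts the persistent reader; later
 * ones are handed to it only if it is idle. If the reader is busy, a
 * detached one-off thread is spawned instead, so a single slow read never
 * blocks subsequent submissions; this is what avoids coordinated omission.
 */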
static void queue_work(struct reader_thread *rt, struct work_item *work)
{
	if (!rt->started) {
		pthread_mutex_lock(&rt->thread.lock);
		flist_add_tail(&work->list, &rt->list);
		pthread_mutex_unlock(&rt->thread.lock);

		rt->started = 1;
		pthread_create(&rt->thread.thread, NULL, reader_fn, rt);
	} else if (!rt->busy && !pthread_mutex_trylock(&rt->thread.lock)) {
		flist_add_tail(&work->list, &rt->list);
		pthread_mutex_unlock(&rt->thread.lock);

		pthread_cond_signal(&rt->thread.cond);
	} else {
		int ret = pthread_create(&work->thread, NULL, reader_one_off, work);
		if (ret) {
			fprintf(stderr, "pthread_create=%d\n", ret);
		} else {
			ret = pthread_detach(work->thread);
			if (ret)
				fprintf(stderr, "pthread_detach=%d\n", ret);
		}
	}
}

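/*
 * Walk the latency histogram and compute the bucket value at each
 * percentile listed in plist[]. Returns the number of percentiles filled
 * in and allocates *output, which the caller must free.
 */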
static unsigned int calc_percentiles(unsigned int *io_u_plat, unsigned long nr,
				     unsigned int **output)
{
	unsigned long sum = 0;
	unsigned int len, i, j = 0;
	unsigned int oval_len = 0;
	unsigned int *ovals = NULL;
	int is_last;

	len = 0;
	while (len < PLAT_LIST_MAX && plist[len] != 0.0)
		len++;

	if (!len)
		return 0;

	/*
	 * Calculate bucket values, note down max and min values
	 */
	is_last = 0;
	for (i = 0; i < PLAT_NR && !is_last; i++) {
		sum += io_u_plat[i];
		while (sum >= (plist[j] / 100.0 * nr)) {
			assert(plist[j] <= 100.0);

			if (j == oval_len) {
				oval_len += 100;
				ovals = realloc(ovals, oval_len * sizeof(unsigned int));
			}

			ovals[j] = plat_idx_to_val(i);
			is_last = (j == len - 1);
			if (is_last)
				break;

			j++;
		}
	}

	*output = ovals;
	return len;
}

static void show_latencies(struct stats *s, const char *msg)
{
	unsigned int *ovals = NULL;
	unsigned int len, i;

	len = calc_percentiles(s->plat, s->nr_samples, &ovals);
	if (len) {
		fprintf(stderr, "Latency percentiles (usec) (%s)\n", msg);
		for (i = 0; i < len; i++)
			fprintf(stderr, "\t%2.4fth: %u\n", plist[i], ovals[i]);
	}

	if (ovals)
		free(ovals);

	fprintf(stderr, "\tOver=%u, min=%u, max=%u\n", s->over, s->min, s->max);
}

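/*
 * Set up the per-thread mutexes and condition variables, using a
 * monotonic clock for the condvars where the platform supports it.
 */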
static void init_thread(struct thread_data *thread)
{
	pthread_condattr_t cattr;
	int ret;

	ret = pthread_condattr_init(&cattr);
	CHECK_ZERO_OR_ABORT(ret);
#ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
	ret = pthread_condattr_setclock(&cattr, CLOCK_MONOTONIC);
	CHECK_ZERO_OR_ABORT(ret);
#endif
	pthread_cond_init(&thread->cond, &cattr);
	pthread_cond_init(&thread->done_cond, &cattr);
	pthread_mutex_init(&thread->lock, NULL);
	pthread_mutex_init(&thread->done_lock, NULL);
	thread->exit = 0;
}

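/*
 * Ask a thread to exit and wait for it to signal completion. If a cleanup
 * function is given, invoke it roughly once a second while waiting so
 * that completed entries keep getting reaped in the meantime.
 */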
static void exit_thread(struct thread_data *thread,
			void fn(struct writer_thread *),
			struct writer_thread *wt)
{
	__sync_fetch_and_add(&thread->exit, 1);
	pthread_cond_signal(&thread->cond);

	while (!thread->done) {
		pthread_mutex_lock(&thread->done_lock);

		if (fn) {
			struct timespec ts;

#ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
			clock_gettime(CLOCK_MONOTONIC, &ts);
#else
			clock_gettime(CLOCK_REALTIME, &ts);
#endif
			ts.tv_sec++;

			pthread_cond_timedwait(&thread->done_cond, &thread->done_lock, &ts);
			fn(wt);
		} else
			pthread_cond_wait(&thread->done_cond, &thread->done_lock);

		pthread_mutex_unlock(&thread->done_lock);
	}
}

static int usage(char *argv[])
{
	fprintf(stderr, "%s: [-b blocksize] [-t max usec] [-w separate writer] -f file\n", argv[0]);
	return 1;
}

static int parse_options(int argc, char *argv[])
{
	int c;

	while ((c = getopt(argc, argv, "f:b:t:w:")) != -1) {
		switch (c) {
		case 'f':
			if (file)
				return usage(argv);
			file = strdup(optarg);
			break;
		case 'b':
			bs = atoi(optarg);
			break;
		case 't':
			max_us = atoi(optarg);
			break;
		case 'w':
			separate_writer = atoi(optarg);
			if (!separate_writer)
				fprintf(stderr, "inline writing is broken\n");
			break;
		case '?':
		default:
			return usage(argv);
		}
	}

	if (!file)
		return usage(argv);

	return 0;
}

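/*
 * Reap work items the writer has finished with: detach the done list
 * under the lock (skipping the reap entirely if the lock is contended),
 * then destroy and free each entry.
 */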
static void prune_done_entries(struct writer_thread *wt)
{
	FLIST_HEAD(list);

	if (flist_empty(&wt->done_list))
		return;

	if (pthread_mutex_trylock(&wt->thread.lock))
		return;

	if (!flist_empty(&wt->done_list))
		flist_splice_init(&wt->done_list, &list);
	pthread_mutex_unlock(&wt->thread.lock);

	while (!flist_empty(&list)) {
		struct work_item *work;

		work = flist_first_entry(&list, struct work_item, list);
		flist_del(&work->list);

		pthread_cond_destroy(&work->cond);
		pthread_mutex_destroy(&work->lock);
		free(work->buf);
		free(work);
	}
}

int main(int argc, char *argv[])
{
	pthread_condattr_t cattr;
	struct timespec s, re, we;
	struct reader_thread *rt;
	struct writer_thread *wt;
	unsigned long rate;
	uint64_t elapsed;
	struct stat sb;
	size_t bytes;
	off_t off;
	int fd, seq;
	int ret;

	if (parse_options(argc, argv))
		return 1;

	fd = open(file, O_RDONLY);
	if (fd < 0) {
		perror("open");
		return 2;
	}

	if (fstat(fd, &sb) < 0) {
		perror("stat");
		return 3;
	}

	wt = &writer_thread;
	init_thread(&wt->thread);
	INIT_FLIST_HEAD(&wt->list);
	INIT_FLIST_HEAD(&wt->done_list);
	wt->s.max = 0;
	wt->s.min = -1U;
	pthread_create(&wt->thread.thread, NULL, writer_fn, wt);

	rt = &reader_thread;
	init_thread(&rt->thread);
	INIT_FLIST_HEAD(&rt->list);
	INIT_FLIST_HEAD(&rt->done_list);
	rt->s.max = 0;
	rt->s.min = -1U;
	rt->write_seq = 1;

	off = 0;
	seq = 0;
	bytes = 0;

	ret = pthread_condattr_init(&cattr);
	CHECK_ZERO_OR_ABORT(ret);
#ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
	ret = pthread_condattr_setclock(&cattr, CLOCK_MONOTONIC);
	CHECK_ZERO_OR_ABORT(ret);
#endif

	clock_gettime(CLOCK_MONOTONIC, &s);

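	/*
	 * Submit reads of at most 'bs' bytes, one chunk at a time. Wait up
	 * to max_us for each read to finish; if it takes longer, move on
	 * and let queue_work() push the next read onto another thread.
	 */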
	while (sb.st_size) {
		struct work_item *work;
		size_t this_len;
		struct timespec ts;

		prune_done_entries(wt);

		this_len = sb.st_size;
		if (this_len > bs)
			this_len = bs;

		work = calloc(1, sizeof(*work));
		work->buf = malloc(this_len);
		work->buf_size = this_len;
		work->off = off;
		work->fd = fd;
		work->seq = ++seq;
		work->writer = wt;
		work->reader = rt;
		pthread_cond_init(&work->cond, &cattr);
		pthread_mutex_init(&work->lock, NULL);

		queue_work(rt, work);

#ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
		clock_gettime(CLOCK_MONOTONIC, &ts);
#else
		clock_gettime(CLOCK_REALTIME, &ts);
#endif
		ts.tv_nsec += max_us * 1000ULL;
		if (ts.tv_nsec >= 1000000000ULL) {
			ts.tv_nsec -= 1000000000ULL;
			ts.tv_sec++;
		}

		pthread_mutex_lock(&work->lock);
		pthread_cond_timedwait(&work->cond, &work->lock, &ts);
		pthread_mutex_unlock(&work->lock);

		off += this_len;
		sb.st_size -= this_len;
		bytes += this_len;
	}

	exit_thread(&rt->thread, NULL, NULL);
	clock_gettime(CLOCK_MONOTONIC, &re);

	exit_thread(&wt->thread, prune_done_entries, wt);
	clock_gettime(CLOCK_MONOTONIC, &we);

	show_latencies(&rt->s, "READERS");
	show_latencies(&wt->s, "WRITERS");

	bytes /= 1024;
	elapsed = utime_since(&s, &re);
	rate = elapsed ? (bytes * 1000UL * 1000UL) / elapsed : 0;
	fprintf(stderr, "Read rate (KiB/sec) : %lu\n", rate);
	elapsed = utime_since(&s, &we);
	rate = elapsed ? (bytes * 1000UL * 1000UL) / elapsed : 0;
	fprintf(stderr, "Write rate (KiB/sec): %lu\n", rate);

	close(fd);
	return 0;
}