t/read-to-pipe-async: remove dead code
[fio.git] / t / read-to-pipe-async.c
CommitLineData
6d5a9bc2
JA
1/*
2 * Read a file and write the contents to stdout. If a given read takes
3 * longer than 'max_us' time, then we schedule a new thread to handle
4 * the next read. This avoids the coordinated omission problem, where
5 * one request appears to take a long time, but in reality a lot of
6 * requests would have been slow, but we don't notice since new submissions
7 * are not being issued if just 1 is held up.
8 *
9 * One test case:
10 *
11 * $ time (./read-to-pipe-async -f randfile.gz | gzip -dc > outfile; sync)
12 *
13 * This will read randfile.gz and log the latencies of doing so, while
14 * piping the output to gzip to decompress it. Any latencies over max_us
15 * are logged when they happen, and latency buckets are displayed at the
16 * end of the run
17 *
18 * gcc -Wall -g -O2 -o read-to-pipe-async read-to-pipe-async.c -lpthread
19 *
20 * Copyright (C) 2016 Jens Axboe
21 *
22 */
78b66d32 23
6d5a9bc2
JA
24#include <stdio.h>
25#include <stdlib.h>
26#include <unistd.h>
27#include <fcntl.h>
28#include <sys/time.h>
29#include <sys/types.h>
30#include <sys/stat.h>
31#include <inttypes.h>
32#include <string.h>
33#include <pthread.h>
34#include <errno.h>
35#include <assert.h>
36
37#include "../flist.h"
38
/* Runtime tunables, set from the command line in parse_options() */
static int bs = 4096;		/* -b: read block size in bytes */
static int max_us = 10000;	/* -t: latency threshold (usec) for logging/offload */
static char *file;		/* -f: input file name */
static int separate_writer = 1;	/* -w: nonzero = use a dedicated writer thread */

/* Latency histogram layout (same bucketing scheme fio uses) */
#define PLAT_BITS 8
#define PLAT_VAL (1 << PLAT_BITS)
#define PLAT_GROUP_NR 19
#define PLAT_NR (PLAT_GROUP_NR * PLAT_VAL)
#define PLAT_LIST_MAX 20

struct stats {
	unsigned int plat[PLAT_NR];	/* latency histogram buckets */
	unsigned int nr_samples;	/* total samples recorded */
	unsigned int max;		/* slowest sample seen (usec) */
	unsigned int min;		/* fastest sample seen (usec) */
	unsigned int over;		/* samples that exceeded max_us */
};

/* Percentiles reported at exit; list ends at the first 0.0 entry */
static double plist[PLAT_LIST_MAX] = { 50.0, 75.0, 90.0, 95.0, 99.0, 99.5, 99.9, 99.99, 99.999, 99.9999, };
59
/* Synchronization state embedded in each worker thread */
struct thread_data {
	int exit;			/* request flag: set to ask the thread to stop */
	int done;			/* ack flag: set by the thread when it exits */
	pthread_mutex_t lock;		/* protects the owner's work lists */
	pthread_cond_t cond;		/* signaled when new work is queued */
	pthread_mutex_t done_lock;	/* pairs with done_cond for the exit handshake */
	pthread_cond_t done_cond;
	pthread_t thread;
};

struct writer_thread {
	struct flist_head list;		/* completed reads waiting to be written */
	struct flist_head done_list;	/* written items waiting to be reclaimed */
	struct stats s;			/* write latency statistics */
	struct thread_data thread;
};

struct reader_thread {
	struct flist_head list;		/* queued read requests */
	struct flist_head done_list;	/* out-of-order completions (inline-writer mode) */
	int started;			/* reader thread has been created */
	int busy;			/* reader is currently servicing a request */
	int write_seq;			/* next sequence due for inline writing */
	struct stats s;			/* read latency statistics */
	struct thread_data thread;
};

/* One outstanding read request and its eventual write */
struct work_item {
	struct flist_head list;
	void *buf;			/* data buffer, buf_size bytes */
	size_t buf_size;
	off_t off;			/* file offset to read from */
	int fd;
	int seq;			/* submission order, starts at 1 */
	struct writer_thread *writer;
	struct reader_thread *reader;
	pthread_mutex_t lock;		/* pairs with cond: submitter waits for completion */
	pthread_cond_t cond;
	pthread_t thread;		/* one-off service thread, if one was spawned */
};

static struct reader_thread reader_thread;
static struct writer_thread writer_thread;
103
6a087e5a 104uint64_t utime_since(const struct timespec *s, const struct timespec *e)
6d5a9bc2
JA
105{
106 long sec, usec;
107 uint64_t ret;
108
109 sec = e->tv_sec - s->tv_sec;
6a087e5a 110 usec = (e->tv_nsec - s->tv_nsec) / 1000;
6d5a9bc2
JA
111 if (sec > 0 && usec < 0) {
112 sec--;
113 usec += 1000000;
114 }
115
116 if (sec < 0 || (sec == 0 && usec < 0))
117 return 0;
118
119 ret = sec * 1000000ULL + usec;
120
121 return ret;
122}
123
124static struct work_item *find_seq(struct writer_thread *w, unsigned int seq)
125{
126 struct work_item *work;
127 struct flist_head *entry;
128
129 if (flist_empty(&w->list))
130 return NULL;
131
132 flist_for_each(entry, &w->list) {
133 work = flist_entry(entry, struct work_item, list);
134 if (work->seq == seq)
135 return work;
136 }
137
138 return NULL;
139}
140
141static unsigned int plat_val_to_idx(unsigned int val)
142{
143 unsigned int msb, error_bits, base, offset;
144
145 /* Find MSB starting from bit 0 */
146 if (val == 0)
147 msb = 0;
148 else
149 msb = sizeof(val)*8 - __builtin_clz(val) - 1;
150
151 /*
152 * MSB <= (PLAT_BITS-1), cannot be rounded off. Use
153 * all bits of the sample as index
154 */
155 if (msb <= PLAT_BITS)
156 return val;
157
158 /* Compute the number of error bits to discard*/
159 error_bits = msb - PLAT_BITS;
160
161 /* Compute the number of buckets before the group */
162 base = (error_bits + 1) << PLAT_BITS;
163
164 /*
165 * Discard the error bits and apply the mask to find the
166 * index for the buckets in the group
167 */
168 offset = (PLAT_VAL - 1) & (val >> error_bits);
169
170 /* Make sure the index does not exceed (array size - 1) */
171 return (base + offset) < (PLAT_NR - 1) ?
172 (base + offset) : (PLAT_NR - 1);
173}
174
175/*
176 * Convert the given index of the bucket array to the value
177 * represented by the bucket
178 */
179static unsigned int plat_idx_to_val(unsigned int idx)
180{
181 unsigned int error_bits, k, base;
182
183 assert(idx < PLAT_NR);
184
185 /* MSB <= (PLAT_BITS-1), cannot be rounded off. Use
186 * all bits of the sample as index */
187 if (idx < (PLAT_VAL << 1))
188 return idx;
189
190 /* Find the group and compute the minimum value of that group */
191 error_bits = (idx >> PLAT_BITS) - 1;
192 base = 1 << (error_bits + PLAT_BITS);
193
194 /* Find its bucket number of the group */
195 k = idx % PLAT_VAL;
196
197 /* Return the mean of the range of the bucket */
198 return base + ((k + 0.5) * (1 << error_bits));
199}
200
/*
 * Record one latency sample (usec) into the given stats. Samples over
 * max_us are also logged to stderr as they happen.
 */
static void add_lat(struct stats *s, unsigned int us, const char *name)
{
	int lat_index = 0;

	/*
	 * NOTE(review): min/max/over are plain read-modify-writes while the
	 * histogram uses atomics; presumably racy updates here were deemed
	 * acceptable for summary fields — confirm if exactness matters.
	 */
	if (us > s->max)
		s->max = us;
	if (us < s->min)
		s->min = us;

	/* NOTE(review): 'us' is unsigned, max_us is int — fine for the
	 * positive values -t accepts, surprising if max_us were negative. */
	if (us > max_us) {
		fprintf(stderr, "%s latency=%u usec\n", name, us);
		s->over++;
	}

	lat_index = plat_val_to_idx(us);
	/* histogram and sample count are shared across threads: use atomics */
	__sync_fetch_and_add(&s->plat[lat_index], 1);
	__sync_fetch_and_add(&s->nr_samples, 1);
}
219
220static int write_work(struct work_item *work)
221{
6a087e5a 222 struct timespec s, e;
6d5a9bc2
JA
223 ssize_t ret;
224
6a087e5a 225 clock_gettime(CLOCK_MONOTONIC, &s);
6d5a9bc2 226 ret = write(STDOUT_FILENO, work->buf, work->buf_size);
6a087e5a 227 clock_gettime(CLOCK_MONOTONIC, &e);
6d5a9bc2
JA
228 assert(ret == work->buf_size);
229
230 add_lat(&work->writer->s, utime_since(&s, &e), "write");
231 return work->seq + 1;
232}
233
28cacec4
JA
234static void thread_exiting(struct thread_data *thread)
235{
236 __sync_fetch_and_add(&thread->done, 1);
237 pthread_cond_signal(&thread->done_cond);
238}
239
6d5a9bc2
JA
/*
 * Dedicated writer thread: pulls completed reads off the writer list in
 * strict sequence order and writes them to stdout. Completions that
 * arrive out of order simply wait on the list until their turn.
 */
static void *writer_fn(void *data)
{
	struct writer_thread *wt = data;
	struct work_item *work;
	unsigned int seq = 1;

	work = NULL;
	/* keep draining until told to exit AND the queue is empty */
	while (!wt->thread.exit || !flist_empty(&wt->list)) {
		pthread_mutex_lock(&wt->thread.lock);

		/* retire the item written on the previous iteration */
		if (work)
			flist_add_tail(&work->list, &wt->done_list);

		/* only the next sequence number may be written */
		work = find_seq(wt, seq);
		if (work)
			flist_del_init(&work->list);
		else
			pthread_cond_wait(&wt->thread.cond, &wt->thread.lock);

		pthread_mutex_unlock(&wt->thread.lock);

		/* write outside the lock; write_work returns the next seq */
		if (work)
			seq = write_work(work);
	}

	thread_exiting(&wt->thread);
	return NULL;
}
268
269static void reader_work(struct work_item *work)
270{
6a087e5a 271 struct timespec s, e;
6d5a9bc2
JA
272 ssize_t ret;
273 size_t left;
274 void *buf;
275 off_t off;
276
6a087e5a 277 clock_gettime(CLOCK_MONOTONIC, &s);
6d5a9bc2
JA
278
279 left = work->buf_size;
280 buf = work->buf;
281 off = work->off;
282 while (left) {
283 ret = pread(work->fd, buf, left, off);
284 if (!ret) {
285 fprintf(stderr, "zero read\n");
286 break;
287 } else if (ret < 0) {
288 fprintf(stderr, "errno=%d\n", errno);
289 break;
290 }
291 left -= ret;
292 off += ret;
293 buf += ret;
294 }
295
6a087e5a 296 clock_gettime(CLOCK_MONOTONIC, &e);
6d5a9bc2
JA
297
298 add_lat(&work->reader->s, utime_since(&s, &e), "read");
299
300 pthread_cond_signal(&work->cond);
301
302 if (separate_writer) {
303 pthread_mutex_lock(&work->writer->thread.lock);
304 flist_add_tail(&work->list, &work->writer->list);
305 pthread_mutex_unlock(&work->writer->thread.lock);
306 pthread_cond_signal(&work->writer->thread.cond);
307 } else {
308 struct reader_thread *rt = work->reader;
309 struct work_item *next = NULL;
310 struct flist_head *entry;
311
312 /*
313 * Write current work if it matches in sequence.
314 */
315 if (work->seq == rt->write_seq)
316 goto write_it;
317
318 pthread_mutex_lock(&rt->thread.lock);
319
320 flist_add_tail(&work->list, &rt->done_list);
321
322 /*
323 * See if the next work item is here, if so, write it
324 */
325 work = NULL;
326 flist_for_each(entry, &rt->done_list) {
327 next = flist_entry(entry, struct work_item, list);
328 if (next->seq == rt->write_seq) {
329 work = next;
330 flist_del(&work->list);
331 break;
332 }
333 }
334
335 pthread_mutex_unlock(&rt->thread.lock);
336
337 if (work) {
338write_it:
339 write_work(work);
340 __sync_fetch_and_add(&rt->write_seq, 1);
341 }
342 }
343}
344
/* pthread trampoline: service a single read in its own detached thread */
static void *reader_one_off(void *data)
{
	reader_work(data);
	return NULL;
}
350
/*
 * Main reader thread: services queued read requests until asked to
 * exit and the queue has drained. The busy flag lets queue_work()
 * decide whether to queue here or spawn a one-off thread instead.
 */
static void *reader_fn(void *data)
{
	struct reader_thread *rt = data;
	struct work_item *work;

	while (!rt->thread.exit || !flist_empty(&rt->list)) {
		work = NULL;
		pthread_mutex_lock(&rt->thread.lock);
		if (!flist_empty(&rt->list)) {
			work = flist_first_entry(&rt->list, struct work_item, list);
			flist_del_init(&work->list);
		} else
			pthread_cond_wait(&rt->thread.cond, &rt->thread.lock);
		pthread_mutex_unlock(&rt->thread.lock);

		/* do the I/O outside the lock, flagging ourselves busy */
		if (work) {
			__sync_fetch_and_add(&rt->busy, 1);
			reader_work(work);
			__sync_fetch_and_sub(&rt->busy, 1);
		}
	}

	thread_exiting(&rt->thread);
	return NULL;
}
376
/*
 * Submit a read request. If the reader thread is idle the work is
 * queued to it; if it is busy (or its lock is contended) a detached
 * one-off thread services this read instead, so one slow read cannot
 * stall subsequent submissions (the coordinated-omission avoidance
 * this tool exists to demonstrate).
 */
static void queue_work(struct reader_thread *rt, struct work_item *work)
{
	if (!rt->started) {
		/* first submission: queue it, then start the reader thread */
		pthread_mutex_lock(&rt->thread.lock);
		flist_add_tail(&work->list, &rt->list);
		pthread_mutex_unlock(&rt->thread.lock);

		rt->started = 1;
		pthread_create(&rt->thread.thread, NULL, reader_fn, rt);
	} else if (!rt->busy && !pthread_mutex_trylock(&rt->thread.lock)) {
		/* reader looks idle and we got the lock without blocking */
		flist_add_tail(&work->list, &rt->list);
		pthread_mutex_unlock(&rt->thread.lock);

		pthread_cond_signal(&rt->thread.cond);
	} else {
		/* reader busy: spawn a detached one-off thread for this item */
		int ret = pthread_create(&work->thread, NULL, reader_one_off, work);
		if (ret) {
			fprintf(stderr, "pthread_create=%d\n", ret);
		} else {
			ret = pthread_detach(work->thread);
			if (ret)
				fprintf(stderr, "pthread_detach=%d\n", ret);
		}
	}
}
402
403static unsigned int calc_percentiles(unsigned int *io_u_plat, unsigned long nr,
404 unsigned int **output)
405{
406 unsigned long sum = 0;
407 unsigned int len, i, j = 0;
408 unsigned int oval_len = 0;
409 unsigned int *ovals = NULL;
410 int is_last;
411
412 len = 0;
413 while (len < PLAT_LIST_MAX && plist[len] != 0.0)
414 len++;
415
416 if (!len)
417 return 0;
418
419 /*
420 * Calculate bucket values, note down max and min values
421 */
422 is_last = 0;
423 for (i = 0; i < PLAT_NR && !is_last; i++) {
424 sum += io_u_plat[i];
425 while (sum >= (plist[j] / 100.0 * nr)) {
426 assert(plist[j] <= 100.0);
427
428 if (j == oval_len) {
429 oval_len += 100;
430 ovals = realloc(ovals, oval_len * sizeof(unsigned int));
431 }
432
433 ovals[j] = plat_idx_to_val(i);
434 is_last = (j == len - 1);
435 if (is_last)
436 break;
437
438 j++;
439 }
440 }
441
442 *output = ovals;
443 return len;
444}
445
446static void show_latencies(struct stats *s, const char *msg)
447{
448 unsigned int *ovals = NULL;
449 unsigned int len, i;
450
451 len = calc_percentiles(s->plat, s->nr_samples, &ovals);
452 if (len) {
453 fprintf(stderr, "Latency percentiles (usec) (%s)\n", msg);
454 for (i = 0; i < len; i++)
455 fprintf(stderr, "\t%2.4fth: %u\n", plist[i], ovals[i]);
456 }
457
458 if (ovals)
459 free(ovals);
460
461 fprintf(stderr, "\tOver=%u, min=%u, max=%u\n", s->over, s->min, s->max);
462}
463
464static void init_thread(struct thread_data *thread)
465{
78b66d32
BVA
466 pthread_condattr_t cattr;
467 int ret;
468
469 ret = pthread_condattr_init(&cattr);
470 assert(ret == 0);
471#ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
472 ret = pthread_condattr_setclock(&cattr, CLOCK_MONOTONIC);
473 assert(ret == 0);
474#endif
475 pthread_cond_init(&thread->cond, &cattr);
476 pthread_cond_init(&thread->done_cond, &cattr);
6d5a9bc2
JA
477 pthread_mutex_init(&thread->lock, NULL);
478 pthread_mutex_init(&thread->done_lock, NULL);
479 thread->exit = 0;
480}
481
/*
 * Ask a worker thread to exit and wait until it acknowledges via
 * thread_exiting(). When a prune callback 'fn' is supplied it is run
 * roughly once a second while waiting, so the writer's done list keeps
 * being reclaimed during shutdown.
 *
 * NOTE(review): thread->done is tested before done_lock is taken; this
 * relies on the exiting thread's flag-set/signal ordering — confirm
 * there is no missed-wakeup window in the fn == NULL (untimed) path.
 */
static void exit_thread(struct thread_data *thread,
			void fn(struct writer_thread *),
			struct writer_thread *wt)
{
	__sync_fetch_and_add(&thread->exit, 1);
	pthread_cond_signal(&thread->cond);

	while (!thread->done) {
		pthread_mutex_lock(&thread->done_lock);

		if (fn) {
			struct timespec ts;

#ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
			clock_gettime(CLOCK_MONOTONIC, &ts);
#else
			clock_gettime(CLOCK_REALTIME, &ts);
#endif
			/* wake at least once a second to run the callback */
			ts.tv_sec++;

			pthread_cond_timedwait(&thread->done_cond, &thread->done_lock, &ts);
			fn(wt);
		} else
			pthread_cond_wait(&thread->done_cond, &thread->done_lock);

		pthread_mutex_unlock(&thread->done_lock);
	}
}
510
/* Print the command-line synopsis; always returns 1 for direct use as
 * an error exit code by callers. */
static int usage(char *argv[])
{
	fprintf(stderr, "%s: [-b blocksize] [-t max usec] [-w separate writer] -f file\n",
		argv[0]);
	return 1;
}
516
517static int parse_options(int argc, char *argv[])
518{
519 int c;
520
521 while ((c = getopt(argc, argv, "f:b:t:w:")) != -1) {
522 switch (c) {
523 case 'f':
a06aec04
BVA
524 if (file)
525 return usage(argv);
6d5a9bc2
JA
526 file = strdup(optarg);
527 break;
528 case 'b':
529 bs = atoi(optarg);
530 break;
531 case 't':
532 max_us = atoi(optarg);
533 break;
534 case 'w':
535 separate_writer = atoi(optarg);
536 if (!separate_writer)
537 fprintf(stderr, "inline writing is broken\n");
538 break;
539 case '?':
540 default:
541 return usage(argv);
542 }
543 }
544
545 if (!file)
546 return usage(argv);
547
548 return 0;
549}
550
/*
 * Reclaim work items the writer has finished with. Uses trylock so the
 * submission path never blocks behind the writer; if the lock is
 * contended we just try again on a later call. Items are spliced off
 * under the lock and freed outside it.
 */
static void prune_done_entries(struct writer_thread *wt)
{
	FLIST_HEAD(list);

	/* unlocked peek: racing with the writer only delays pruning */
	if (flist_empty(&wt->done_list))
		return;

	if (pthread_mutex_trylock(&wt->thread.lock))
		return;

	if (!flist_empty(&wt->done_list))
		flist_splice_init(&wt->done_list, &list);
	pthread_mutex_unlock(&wt->thread.lock);

	while (!flist_empty(&list)) {
		struct work_item *work;

		work = flist_first_entry(&list, struct work_item, list);
		flist_del(&work->list);

		pthread_cond_destroy(&work->cond);
		pthread_mutex_destroy(&work->lock);
		free(work->buf);
		free(work);
	}
}
577
int main(int argc, char *argv[])
{
	pthread_condattr_t cattr;
	struct timespec s, re, we;
	struct reader_thread *rt;
	struct writer_thread *wt;
	unsigned long rate;
	uint64_t elapsed;
	struct stat sb;
	size_t bytes;
	off_t off;
	int fd, seq;
	int ret;

	if (parse_options(argc, argv))
		return 1;

	fd = open(file, O_RDONLY);
	if (fd < 0) {
		perror("open");
		return 2;
	}

	if (fstat(fd, &sb) < 0) {
		perror("stat");
		return 3;
	}

	/* start the writer thread first so reads can complete into it */
	wt = &writer_thread;
	init_thread(&wt->thread);
	INIT_FLIST_HEAD(&wt->list);
	INIT_FLIST_HEAD(&wt->done_list);
	wt->s.max = 0;
	wt->s.min = -1U;	/* -1U so the first sample always lowers min */
	pthread_create(&wt->thread.thread, NULL, writer_fn, wt);

	/* reader thread is created lazily by queue_work() on first use */
	rt = &reader_thread;
	init_thread(&rt->thread);
	INIT_FLIST_HEAD(&rt->list);
	INIT_FLIST_HEAD(&rt->done_list);
	rt->s.max = 0;
	rt->s.min = -1U;
	rt->write_seq = 1;

	off = 0;
	seq = 0;
	bytes = 0;

	/* per-work-item cond vars use CLOCK_MONOTONIC where supported */
	ret = pthread_condattr_init(&cattr);
	assert(ret == 0);
#ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
	ret = pthread_condattr_setclock(&cattr, CLOCK_MONOTONIC);
	assert(ret == 0);
#endif

	clock_gettime(CLOCK_MONOTONIC, &s);

	/* submit the file in bs-sized chunks, one work item per chunk */
	while (sb.st_size) {
		struct work_item *work;
		size_t this_len;
		struct timespec ts;

		prune_done_entries(wt);

		this_len = sb.st_size;
		if (this_len > bs)
			this_len = bs;

		/* NOTE(review): calloc/malloc results are not checked here */
		work = calloc(1, sizeof(*work));
		work->buf = malloc(this_len);
		work->buf_size = this_len;
		work->off = off;
		work->fd = fd;
		work->seq = ++seq;
		work->writer = wt;
		work->reader = rt;
		pthread_cond_init(&work->cond, &cattr);
		pthread_mutex_init(&work->lock, NULL);

		queue_work(rt, work);

#ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
		clock_gettime(CLOCK_MONOTONIC, &ts);
#else
		clock_gettime(CLOCK_REALTIME, &ts);
#endif
		/* absolute deadline = now + max_us, normalized */
		ts.tv_nsec += max_us * 1000ULL;
		if (ts.tv_nsec >= 1000000000ULL) {
			ts.tv_nsec -= 1000000000ULL;
			ts.tv_sec++;
		}

		/*
		 * Wait for this read, but no longer than max_us: on timeout
		 * we submit the next chunk anyway, which is how coordinated
		 * omission is avoided.
		 */
		pthread_mutex_lock(&work->lock);
		pthread_cond_timedwait(&work->cond, &work->lock, &ts);
		pthread_mutex_unlock(&work->lock);

		off += this_len;
		sb.st_size -= this_len;
		bytes += this_len;
	}

	/* drain readers first, then the writer */
	exit_thread(&rt->thread, NULL, NULL);
	clock_gettime(CLOCK_MONOTONIC, &re);

	exit_thread(&wt->thread, prune_done_entries, wt);
	clock_gettime(CLOCK_MONOTONIC, &we);

	show_latencies(&rt->s, "READERS");
	show_latencies(&wt->s, "WRITERS");

	/* throughput: KiB divided by elapsed usec, scaled back to seconds */
	bytes /= 1024;
	elapsed = utime_since(&s, &re);
	rate = elapsed ? (bytes * 1000UL * 1000UL) / elapsed : 0;
	fprintf(stderr, "Read rate (KiB/sec) : %lu\n", rate);
	elapsed = utime_since(&s, &we);
	rate = elapsed ? (bytes * 1000UL * 1000UL) / elapsed : 0;
	fprintf(stderr, "Write rate (KiB/sec): %lu\n", rate);

	close(fd);
	return 0;
}