t/read-to-pipe-async: standalone test app
[fio.git] / t / read-to-pipe-async.c
CommitLineData
6d5a9bc2
JA
1/*
2 * Read a file and write the contents to stdout. If a given read takes
3 * longer than 'max_us' time, then we schedule a new thread to handle
4 * the next read. This avoids the coordinated omission problem, where
5 * one request appears to take a long time, but in reality a lot of
6 * requests would have been slow, but we don't notice since new submissions
7 * are not being issued if just 1 is held up.
8 *
9 * One test case:
10 *
11 * $ time (./read-to-pipe-async -f randfile.gz | gzip -dc > outfile; sync)
12 *
13 * This will read randfile.gz and log the latencies of doing so, while
14 * piping the output to gzip to decompress it. Any latencies over max_us
15 * are logged when they happen, and latency buckets are displayed at the
16 * end of the run
17 *
18 * gcc -Wall -g -O2 -o read-to-pipe-async read-to-pipe-async.c -lpthread
19 *
20 * Copyright (C) 2016 Jens Axboe
21 *
22 */
23#include <stdio.h>
24#include <stdlib.h>
25#include <unistd.h>
26#include <fcntl.h>
27#include <sys/time.h>
28#include <sys/types.h>
29#include <sys/stat.h>
30#include <inttypes.h>
31#include <string.h>
32#include <pthread.h>
33#include <errno.h>
34#include <assert.h>
35
36#include "../flist.h"
37
static int bs = 4096;			/* read block size, -b option */
static int max_us = 10000;		/* latency alert threshold in usec, -t option */
static char *file;			/* input file name, -f option (required) */
static int separate_writer = 1;		/* use dedicated writer thread, -w option */

/* Log-bucketed latency histogram geometry */
#define PLAT_BITS 8
#define PLAT_VAL (1 << PLAT_BITS)
#define PLAT_GROUP_NR 19
#define PLAT_NR (PLAT_GROUP_NR * PLAT_VAL)
#define PLAT_LIST_MAX 20
48
/*
 * Latency accounting for one side of the pipeline (reads or writes).
 * 'plat' is a log-bucketed histogram of latencies in usec, indexed by
 * plat_val_to_idx(); 'over' counts samples that exceeded max_us.
 */
struct stats {
	unsigned int plat[PLAT_NR];	/* latency histogram buckets */
	unsigned int nr_samples;	/* total samples recorded */
	unsigned int max;		/* slowest sample seen (usec) */
	unsigned int min;		/* fastest sample seen (usec) */
	unsigned int over;		/* samples slower than max_us */
};

/* Percentiles reported at the end of the run; a 0.0 entry terminates the list */
static double plist[PLAT_LIST_MAX] = { 50.0, 75.0, 90.0, 95.0, 99.0, 99.5, 99.9, 99.99, 99.999, 99.9999, };
58
/*
 * Control state shared between the main thread and a worker thread.
 * 'lock'/'cond' guard the worker's work list; 'done_lock'/'done_cond'
 * let the main thread wait for the worker to finish after 'exit' is set.
 */
struct thread_data {
	int exit;			/* set by main to request shutdown */
	int done;			/* set by the worker when it has exited */
	pthread_mutex_t lock;		/* protects the work list */
	pthread_cond_t cond;		/* signaled when work is queued */
	pthread_mutex_t done_lock;	/* pairs with done_cond */
	pthread_cond_t done_cond;	/* signaled when 'done' is set */
	pthread_t thread;
};
68
/*
 * Writer side: 'list' holds completed reads queued for writing (items are
 * written in sequence order), 'done_list' holds fully written items that
 * the main thread reclaims via prune_done_entries().
 */
struct writer_thread {
	struct flist_head list;
	struct flist_head done_list;
	struct stats s;
	struct thread_data thread;
};

/*
 * Reader side: 'list' is pending read work for the persistent reader;
 * 'done_list' and 'write_seq' are only used in inline-write mode
 * (separate_writer == 0) to emit output in sequence order.
 */
struct reader_thread {
	struct flist_head list;		/* pending read work */
	struct flist_head done_list;	/* inline mode: reads awaiting their turn */
	int started;			/* persistent reader thread was created */
	int busy;			/* reader is currently inside a read */
	int write_seq;			/* inline mode: next sequence to write */
	struct stats s;
	struct thread_data thread;
};
85
/*
 * One block-sized unit of work: read 'buf_size' bytes at offset 'off'
 * from 'fd' into 'buf', then hand the buffer to the writer. 'seq'
 * orders the writes so the output matches file order even when reads
 * complete out of order.
 */
struct work_item {
	struct flist_head list;
	void *buf;
	size_t buf_size;
	off_t off;
	int fd;
	int seq;
	struct writer_thread *writer;
	struct reader_thread *reader;
	pthread_mutex_t lock;		/* pairs with 'cond' below */
	pthread_cond_t cond;		/* signaled when the read completes */
	pthread_t thread;		/* one-off reader thread, if spawned */
};

static struct reader_thread reader_thread;
static struct writer_thread writer_thread;
102
103uint64_t utime_since(const struct timeval *s, const struct timeval *e)
104{
105 long sec, usec;
106 uint64_t ret;
107
108 sec = e->tv_sec - s->tv_sec;
109 usec = e->tv_usec - s->tv_usec;
110 if (sec > 0 && usec < 0) {
111 sec--;
112 usec += 1000000;
113 }
114
115 if (sec < 0 || (sec == 0 && usec < 0))
116 return 0;
117
118 ret = sec * 1000000ULL + usec;
119
120 return ret;
121}
122
123static struct work_item *find_seq(struct writer_thread *w, unsigned int seq)
124{
125 struct work_item *work;
126 struct flist_head *entry;
127
128 if (flist_empty(&w->list))
129 return NULL;
130
131 flist_for_each(entry, &w->list) {
132 work = flist_entry(entry, struct work_item, list);
133 if (work->seq == seq)
134 return work;
135 }
136
137 return NULL;
138}
139
140static unsigned int plat_val_to_idx(unsigned int val)
141{
142 unsigned int msb, error_bits, base, offset;
143
144 /* Find MSB starting from bit 0 */
145 if (val == 0)
146 msb = 0;
147 else
148 msb = sizeof(val)*8 - __builtin_clz(val) - 1;
149
150 /*
151 * MSB <= (PLAT_BITS-1), cannot be rounded off. Use
152 * all bits of the sample as index
153 */
154 if (msb <= PLAT_BITS)
155 return val;
156
157 /* Compute the number of error bits to discard*/
158 error_bits = msb - PLAT_BITS;
159
160 /* Compute the number of buckets before the group */
161 base = (error_bits + 1) << PLAT_BITS;
162
163 /*
164 * Discard the error bits and apply the mask to find the
165 * index for the buckets in the group
166 */
167 offset = (PLAT_VAL - 1) & (val >> error_bits);
168
169 /* Make sure the index does not exceed (array size - 1) */
170 return (base + offset) < (PLAT_NR - 1) ?
171 (base + offset) : (PLAT_NR - 1);
172}
173
174/*
175 * Convert the given index of the bucket array to the value
176 * represented by the bucket
177 */
178static unsigned int plat_idx_to_val(unsigned int idx)
179{
180 unsigned int error_bits, k, base;
181
182 assert(idx < PLAT_NR);
183
184 /* MSB <= (PLAT_BITS-1), cannot be rounded off. Use
185 * all bits of the sample as index */
186 if (idx < (PLAT_VAL << 1))
187 return idx;
188
189 /* Find the group and compute the minimum value of that group */
190 error_bits = (idx >> PLAT_BITS) - 1;
191 base = 1 << (error_bits + PLAT_BITS);
192
193 /* Find its bucket number of the group */
194 k = idx % PLAT_VAL;
195
196 /* Return the mean of the range of the bucket */
197 return base + ((k + 0.5) * (1 << error_bits));
198}
199
200static void add_lat(struct stats *s, unsigned int us, const char *name)
201{
202 int lat_index = 0;
203
204 if (us > s->max)
205 s->max = us;
206 if (us < s->min)
207 s->min = us;
208
209 if (us > max_us) {
210 fprintf(stderr, "%s latency=%u usec\n", name, us);
211 s->over++;
212 }
213
214 lat_index = plat_val_to_idx(us);
215 __sync_fetch_and_add(&s->plat[lat_index], 1);
216 __sync_fetch_and_add(&s->nr_samples, 1);
217}
218
219static int write_work(struct work_item *work)
220{
221 struct timeval s, e;
222 ssize_t ret;
223
224 gettimeofday(&s, NULL);
225 ret = write(STDOUT_FILENO, work->buf, work->buf_size);
226 gettimeofday(&e, NULL);
227 assert(ret == work->buf_size);
228
229 add_lat(&work->writer->s, utime_since(&s, &e), "write");
230 return work->seq + 1;
231}
232
233static void *writer_fn(void *data)
234{
235 struct writer_thread *wt = data;
236 struct work_item *work;
237 unsigned int seq = 1;
238
239 work = NULL;
240 while (!wt->thread.exit || !flist_empty(&wt->list)) {
241 pthread_mutex_lock(&wt->thread.lock);
242
243 if (work) {
244 flist_add_tail(&work->list, &wt->done_list);
245 work = NULL;
246 }
247
248 work = find_seq(wt, seq);
249 if (work)
250 flist_del_init(&work->list);
251 else
252 pthread_cond_wait(&wt->thread.cond, &wt->thread.lock);
253
254 pthread_mutex_unlock(&wt->thread.lock);
255
256 if (work)
257 seq = write_work(work);
258 }
259
260 wt->thread.done = 1;
261 pthread_cond_signal(&wt->thread.done_cond);
262 return NULL;
263}
264
/*
 * Perform one read work item: pread the block into its buffer, log the
 * read latency, wake the main thread (which is waiting on work->cond
 * with a timeout), then hand the buffer off for writing. With a
 * separate writer thread the item is queued on the writer's list; in
 * inline mode this thread writes items itself, in sequence order,
 * parking out-of-order completions on rt->done_list.
 */
static void reader_work(struct work_item *work)
{
	struct timeval s, e;
	ssize_t ret;
	size_t left;
	void *buf;
	off_t off;

	gettimeofday(&s, NULL);

	/* Loop until the whole block is read; pread may return short */
	left = work->buf_size;
	buf = work->buf;
	off = work->off;
	while (left) {
		ret = pread(work->fd, buf, left, off);
		if (!ret) {
			fprintf(stderr, "zero read\n");
			break;
		} else if (ret < 0) {
			fprintf(stderr, "errno=%d\n", errno);
			break;
		}
		left -= ret;
		off += ret;
		buf += ret;
	}

	gettimeofday(&e, NULL);

	add_lat(&work->reader->s, utime_since(&s, &e), "read");

	/* Release the main thread's timed wait for this item */
	pthread_cond_signal(&work->cond);

	if (separate_writer) {
		/* Queue for the writer thread and wake it */
		pthread_mutex_lock(&work->writer->thread.lock);
		flist_add_tail(&work->list, &work->writer->list);
		pthread_mutex_unlock(&work->writer->thread.lock);
		pthread_cond_signal(&work->writer->thread.cond);
	} else {
		struct reader_thread *rt = work->reader;
		struct work_item *next = NULL;
		struct flist_head *entry;

		/*
		 * Write current work if it matches in sequence.
		 */
		if (work->seq == rt->write_seq)
			goto write_it;

		pthread_mutex_lock(&rt->thread.lock);

		/* Out of order: park it until its turn comes up */
		flist_add_tail(&work->list, &rt->done_list);

		/*
		 * See if the next work item is here, if so, write it
		 */
		work = NULL;
		flist_for_each(entry, &rt->done_list) {
			next = flist_entry(entry, struct work_item, list);
			if (next->seq == rt->write_seq) {
				work = next;
				flist_del(&work->list);
				break;
			}
		}

		pthread_mutex_unlock(&rt->thread.lock);

		if (work) {
write_it:
			write_work(work);
			__sync_fetch_and_add(&rt->write_seq, 1);
		}
	}
}
340
341static void *reader_one_off(void *data)
342{
343 reader_work(data);
344 return NULL;
345}
346
347static void *reader_fn(void *data)
348{
349 struct reader_thread *rt = data;
350 struct work_item *work;
351
352 while (!rt->thread.exit || !flist_empty(&rt->list)) {
353 work = NULL;
354 pthread_mutex_lock(&rt->thread.lock);
355 if (!flist_empty(&rt->list)) {
356 work = flist_first_entry(&rt->list, struct work_item, list);
357 flist_del_init(&work->list);
358 } else
359 pthread_cond_wait(&rt->thread.cond, &rt->thread.lock);
360 pthread_mutex_unlock(&rt->thread.lock);
361
362 if (work) {
363 rt->busy = 1;
364 reader_work(work);
365 rt->busy = 0;
366 }
367 }
368
369 rt->thread.done = 1;
370 pthread_cond_signal(&rt->thread.done_cond);
371 return NULL;
372}
373
374static void queue_work(struct reader_thread *rt, struct work_item *work)
375{
376 if (!rt->started) {
377 pthread_mutex_lock(&rt->thread.lock);
378 flist_add_tail(&work->list, &rt->list);
379 pthread_mutex_unlock(&rt->thread.lock);
380
381 rt->started = 1;
382 pthread_create(&rt->thread.thread, NULL, reader_fn, rt);
383 } else if (!rt->busy && !pthread_mutex_trylock(&rt->thread.lock)) {
384 flist_add_tail(&work->list, &rt->list);
385 pthread_mutex_unlock(&rt->thread.lock);
386
387 pthread_cond_signal(&rt->thread.cond);
388 } else {
389 int ret = pthread_create(&work->thread, NULL, reader_one_off, work);
390 if (ret)
391 fprintf(stderr, "pthread_create=%d\n", ret);
392 else
393 pthread_detach(work->thread);
394 }
395}
396
397static unsigned int calc_percentiles(unsigned int *io_u_plat, unsigned long nr,
398 unsigned int **output)
399{
400 unsigned long sum = 0;
401 unsigned int len, i, j = 0;
402 unsigned int oval_len = 0;
403 unsigned int *ovals = NULL;
404 int is_last;
405
406 len = 0;
407 while (len < PLAT_LIST_MAX && plist[len] != 0.0)
408 len++;
409
410 if (!len)
411 return 0;
412
413 /*
414 * Calculate bucket values, note down max and min values
415 */
416 is_last = 0;
417 for (i = 0; i < PLAT_NR && !is_last; i++) {
418 sum += io_u_plat[i];
419 while (sum >= (plist[j] / 100.0 * nr)) {
420 assert(plist[j] <= 100.0);
421
422 if (j == oval_len) {
423 oval_len += 100;
424 ovals = realloc(ovals, oval_len * sizeof(unsigned int));
425 }
426
427 ovals[j] = plat_idx_to_val(i);
428 is_last = (j == len - 1);
429 if (is_last)
430 break;
431
432 j++;
433 }
434 }
435
436 *output = ovals;
437 return len;
438}
439
440static void show_latencies(struct stats *s, const char *msg)
441{
442 unsigned int *ovals = NULL;
443 unsigned int len, i;
444
445 len = calc_percentiles(s->plat, s->nr_samples, &ovals);
446 if (len) {
447 fprintf(stderr, "Latency percentiles (usec) (%s)\n", msg);
448 for (i = 0; i < len; i++)
449 fprintf(stderr, "\t%2.4fth: %u\n", plist[i], ovals[i]);
450 }
451
452 if (ovals)
453 free(ovals);
454
455 fprintf(stderr, "\tOver=%u, min=%u, max=%u\n", s->over, s->min, s->max);
456}
457
458static void init_thread(struct thread_data *thread)
459{
460 pthread_cond_init(&thread->cond, NULL);
461 pthread_cond_init(&thread->done_cond, NULL);
462 pthread_mutex_init(&thread->lock, NULL);
463 pthread_mutex_init(&thread->done_lock, NULL);
464 thread->exit = 0;
465}
466
467static void exit_thread(struct thread_data *thread,
468 void fn(struct writer_thread *),
469 struct writer_thread *wt)
470{
471 thread->exit = 1;
472 pthread_cond_signal(&thread->cond);
473
474 while (!thread->done) {
475 pthread_mutex_lock(&thread->done_lock);
476
477 if (fn) {
478 struct timespec t;
479
480 clock_gettime(CLOCK_REALTIME, &t);
481 t.tv_sec++;
482
483
484 pthread_cond_timedwait(&thread->done_cond, &thread->done_lock, &t);
485 fn(wt);
486 } else
487 pthread_cond_wait(&thread->done_cond, &thread->done_lock);
488
489 pthread_mutex_unlock(&thread->done_lock);
490 }
491}
492
/*
 * Print the command-line synopsis to stderr. Returns 1 so callers can
 * use it directly as an error exit status.
 */
static int usage(char *argv[])
{
	fprintf(stderr,
		"%s: [-b blocksize] [-t max usec] [-w separate writer] -f file\n",
		argv[0]);
	return 1;
}
498
499static int parse_options(int argc, char *argv[])
500{
501 int c;
502
503 while ((c = getopt(argc, argv, "f:b:t:w:")) != -1) {
504 switch (c) {
505 case 'f':
506 file = strdup(optarg);
507 break;
508 case 'b':
509 bs = atoi(optarg);
510 break;
511 case 't':
512 max_us = atoi(optarg);
513 break;
514 case 'w':
515 separate_writer = atoi(optarg);
516 if (!separate_writer)
517 fprintf(stderr, "inline writing is broken\n");
518 break;
519 case '?':
520 default:
521 return usage(argv);
522 }
523 }
524
525 if (!file)
526 return usage(argv);
527
528 return 0;
529}
530
531static void prune_done_entries(struct writer_thread *wt)
532{
533 FLIST_HEAD(list);
534
535 if (flist_empty(&wt->done_list))
536 return;
537
538 if (pthread_mutex_trylock(&wt->thread.lock))
539 return;
540
541 if (!flist_empty(&wt->done_list))
542 flist_splice_init(&wt->done_list, &list);
543 pthread_mutex_unlock(&wt->thread.lock);
544
545 while (!flist_empty(&list)) {
546 struct work_item *work;
547
548 work = flist_first_entry(&list, struct work_item, list);
549 flist_del(&work->list);
550
551 pthread_cond_destroy(&work->cond);
552 pthread_mutex_destroy(&work->lock);
553 free(work->buf);
554 free(work);
555 }
556}
557
558int main(int argc, char *argv[])
559{
560 struct timeval s, re, we;
561 struct reader_thread *rt;
562 struct writer_thread *wt;
563 unsigned long rate;
564 struct stat sb;
565 size_t bytes;
566 off_t off;
567 int fd, seq;
568
569 if (parse_options(argc, argv))
570 return 1;
571
572 fd = open(file, O_RDONLY);
573 if (fd < 0) {
574 perror("open");
575 return 2;
576 }
577
578 if (fstat(fd, &sb) < 0) {
579 perror("stat");
580 return 3;
581 }
582
583 wt = &writer_thread;
584 init_thread(&wt->thread);
585 INIT_FLIST_HEAD(&wt->list);
586 INIT_FLIST_HEAD(&wt->done_list);
587 wt->s.max = 0;
588 wt->s.min = -1U;
589 pthread_create(&wt->thread.thread, NULL, writer_fn, wt);
590
591 rt = &reader_thread;
592 init_thread(&rt->thread);
593 INIT_FLIST_HEAD(&rt->list);
594 INIT_FLIST_HEAD(&rt->done_list);
595 rt->s.max = 0;
596 rt->s.min = -1U;
597 rt->write_seq = 1;
598
599 off = 0;
600 seq = 0;
601 bytes = 0;
602
603 gettimeofday(&s, NULL);
604
605 while (sb.st_size) {
606 struct work_item *work;
607 size_t this_len;
608 struct timespec t;
609
610 prune_done_entries(wt);
611
612 this_len = sb.st_size;
613 if (this_len > bs)
614 this_len = bs;
615
616 work = calloc(1, sizeof(*work));
617 work->buf = malloc(this_len);
618 work->buf_size = this_len;
619 work->off = off;
620 work->fd = fd;
621 work->seq = ++seq;
622 work->writer = wt;
623 work->reader = rt;
624 pthread_cond_init(&work->cond, NULL);
625 pthread_mutex_init(&work->lock, NULL);
626
627 queue_work(rt, work);
628
629 clock_gettime(CLOCK_REALTIME, &t);
630 t.tv_nsec += max_us * 1000ULL;
631 if (t.tv_nsec >= 1000000000ULL) {
632 t.tv_nsec -= 1000000000ULL;
633 t.tv_sec++;
634 }
635
636 pthread_mutex_lock(&work->lock);
637 pthread_cond_timedwait(&work->cond, &work->lock, &t);
638 pthread_mutex_unlock(&work->lock);
639
640 off += this_len;
641 sb.st_size -= this_len;
642 bytes += this_len;
643 }
644
645 exit_thread(&rt->thread, NULL, NULL);
646 gettimeofday(&re, NULL);
647
648 exit_thread(&wt->thread, prune_done_entries, wt);
649 gettimeofday(&we, NULL);
650
651 show_latencies(&rt->s, "READERS");
652 show_latencies(&wt->s, "WRITERS");
653
654 bytes /= 1024;
655 rate = (bytes * 1000UL * 1000UL) / utime_since(&s, &re);
656 fprintf(stderr, "Read rate (KB/sec) : %lu\n", rate);
657 rate = (bytes * 1000UL * 1000UL) / utime_since(&s, &we);
658 fprintf(stderr, "Write rate (KB/sec): %lu\n", rate);
659
660 close(fd);
661 return 0;
662}