 * Read a file and write the contents to stdout. If a given read takes
 * longer than 'max_us' time, then we schedule a new thread to handle
 * the next read. This avoids the coordinated omission problem, where
 * one request appears to take a long time while, in reality, many
 * requests would have been slow; we just don't notice, because new
 * submissions are not issued while that one request is held up.
 *
 * $ time (./read-to-pipe-async -f randfile.gz | gzip -dc > outfile; sync)
 *
 * This will read randfile.gz and log the latencies of doing so, while
 * piping the output to gzip to decompress it. Any latencies over max_us
 * are logged when they happen, and latency buckets are displayed at the
 * end of the run.
 *
 * gcc -Wall -g -O2 -o read-to-pipe-async read-to-pipe-async.c -lpthread
 *
 * Copyright (C) 2016 Jens Axboe
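/*
 * Program structure, in outline: main() reads the input file in
 * 'blocksize' chunks, wrapping each chunk in a work_item and handing it
 * to the reader side via queue_work(). It then waits up to 'max_us' on
 * the work item's condition variable before issuing the next chunk, so
 * a single slow read never stalls submission. queue_work() feeds a
 * persistent reader thread when it is idle and spawns a detached
 * one-off reader thread when it is busy. Completed buffers are written
 * to stdout strictly in sequence order, either by a separate writer
 * thread (-w 1, the default) or inline from the reader path. Read and
 * write latencies are collected in log-linear histograms and reported
 * as percentiles at exit.
 */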
#include <sys/types.h>
#include "compiler/compiler.h"

static int max_us = 10000;
static int separate_writer = 1;

#define PLAT_VAL (1 << PLAT_BITS)
#define PLAT_GROUP_NR 19
#define PLAT_NR (PLAT_GROUP_NR * PLAT_VAL)
#define PLAT_LIST_MAX 20
#define CHECK_ZERO_OR_ABORT(code) assert((code) == 0)
#define CHECK_ZERO_OR_ABORT(code) \
        if (fio_unlikely((code) != 0)) { \
                log_err("failed checking code %i != 0", (code)); \
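/*
 * Both variants treat a non-zero 'code' as fatal; typical use is after
 * a pthread setup call that returns 0 on success, e.g.:
 *
 *      ret = pthread_condattr_init(&cattr);
 *      CHECK_ZERO_OR_ABORT(ret);
 */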
        unsigned int plat[PLAT_NR];
        unsigned int nr_samples;

static double plist[PLAT_LIST_MAX] = { 50.0, 75.0, 90.0, 95.0, 99.0, 99.5, 99.9, 99.99, 99.999, 99.9999, };

        pthread_mutex_t done_lock;
        pthread_cond_t done_cond;

struct writer_thread {
        struct flist_head list;
        struct flist_head done_list;
        struct thread_data thread;

struct reader_thread {
        struct flist_head list;
        struct flist_head done_list;
        struct thread_data thread;

        struct flist_head list;
        struct writer_thread *writer;
        struct reader_thread *reader;
        pthread_mutex_t lock;

static struct reader_thread reader_thread;
static struct writer_thread writer_thread;
uint64_t utime_since(const struct timespec *s, const struct timespec *e)
        sec = e->tv_sec - s->tv_sec;
        usec = (e->tv_nsec - s->tv_nsec) / 1000;
        if (sec > 0 && usec < 0) {

        if (sec < 0 || (sec == 0 && usec < 0))

        ret = sec * 1000000ULL + usec;
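/*
 * Worked example: s = {10, 999500000} and e = {11, 500000} give
 * sec = 1 and usec = -999000; after the branch above borrows one
 * second, sec = 0 and usec = 1000, so the result is 1000 usec - the
 * 1 ms that actually elapsed between 10.9995s and 11.0005s.
 */

/*
 * Scan the writer's pending list for the work item carrying sequence
 * number 'seq'; NULL is returned if that item has not been queued yet.
 */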
static struct work_item *find_seq(struct writer_thread *w, int seq)
        struct work_item *work;
        struct flist_head *entry;

        if (flist_empty(&w->list))

        flist_for_each(entry, &w->list) {
                work = flist_entry(entry, struct work_item, list);
                if (work->seq == seq)
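/*
 * Map a latency value (in usec) to a histogram bucket index. The
 * histogram is log-linear: PLAT_GROUP_NR groups of PLAT_VAL buckets
 * each. The first two groups map values one-to-one; from then on each
 * group covers twice the range of the previous one, with the bucket
 * width doubling accordingly, so resolution stays proportional to the
 * magnitude of the sample.
 */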
static unsigned int plat_val_to_idx(unsigned int val)
        unsigned int msb, error_bits, base, offset;

        /* Find MSB starting from bit 0 */
        msb = sizeof(val)*8 - __builtin_clz(val) - 1;

        /*
         * MSB <= (PLAT_BITS-1), cannot be rounded off. Use
         * all bits of the sample as index
         */
        if (msb <= PLAT_BITS)

        /* Compute the number of error bits to discard */
        error_bits = msb - PLAT_BITS;

        /* Compute the number of buckets before the group */
        base = (error_bits + 1) << PLAT_BITS;

        /*
         * Discard the error bits and apply the mask to find the
         * index for the buckets in the group
         */
        offset = (PLAT_VAL - 1) & (val >> error_bits);

        /* Make sure the index does not exceed (array size - 1) */
        return (base + offset) < (PLAT_NR - 1) ?
                (base + offset) : (PLAT_NR - 1);
/*
 * Convert the given index of the bucket array to the value
 * represented by the bucket
 */
static unsigned int plat_idx_to_val(unsigned int idx)
        unsigned int error_bits, k, base;

        assert(idx < PLAT_NR);

        /* MSB <= (PLAT_BITS-1), cannot be rounded off. Use
         * all bits of the sample as index */
        if (idx < (PLAT_VAL << 1))

        /* Find the group and compute the minimum value of that group */
        error_bits = (idx >> PLAT_BITS) - 1;
        base = 1 << (error_bits + PLAT_BITS);

        /* Find the bucket number within the group */

        /* Return the mean of the range of the bucket */
        return base + ((k + 0.5) * (1 << error_bits));
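/*
 * Worked round trip, assuming PLAT_BITS is 6 (PLAT_VAL == 64): for
 * val = 1000, plat_val_to_idx() finds msb = 9, error_bits = 3,
 * base = 4 << 6 = 256 and offset = (1000 >> 3) & 63 = 61, giving
 * index 317. plat_idx_to_val(317) then recovers error_bits = 3,
 * base = 512 and bucket number 61, and returns 512 + 61.5 * 8 = 1004,
 * the midpoint of the [1000, 1008) range that index 317 covers.
 */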
static void add_lat(struct stats *s, unsigned int us, const char *name)
        fprintf(stderr, "%s latency=%u usec\n", name, us);

        lat_index = plat_val_to_idx(us);
        __sync_fetch_and_add(&s->plat[lat_index], 1);
        __sync_fetch_and_add(&s->nr_samples, 1);

static int write_work(struct work_item *work)
        struct timespec s, e;

        clock_gettime(CLOCK_MONOTONIC, &s);
        ret = write(STDOUT_FILENO, work->buf, work->buf_size);
        clock_gettime(CLOCK_MONOTONIC, &e);
        assert(ret == work->buf_size);

        add_lat(&work->writer->s, utime_since(&s, &e), "write");
        return work->seq + 1;

static void thread_exiting(struct thread_data *thread)
        __sync_fetch_and_add(&thread->done, 1);
        pthread_cond_signal(&thread->done_cond);
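/*
 * Writer thread: pulls completed work items off its list and writes
 * them to stdout in sequence order. find_seq() ensures the next write
 * is always the item whose seq matches the running sequence returned
 * by write_work(); if it has not arrived yet, the thread sleeps on its
 * condition variable. Finished items are parked on done_list so the
 * main thread can free them via prune_done_entries().
 */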
static void *writer_fn(void *data)
        struct writer_thread *wt = data;
        struct work_item *work;

        while (!(seq < 0) && (!wt->thread.exit || !flist_empty(&wt->list))) {
                pthread_mutex_lock(&wt->thread.lock);
                flist_add_tail(&work->list, &wt->done_list);
                work = find_seq(wt, seq);
                flist_del_init(&work->list);
                pthread_cond_wait(&wt->thread.cond, &wt->thread.lock);
                pthread_mutex_unlock(&wt->thread.lock);
                seq = write_work(work);

        thread_exiting(&wt->thread);
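/*
 * Perform one queued read: pread() the chunk into the work buffer,
 * record the read latency, and wake the main thread waiting on the
 * work item's condition variable. The filled buffer is then either
 * handed to the writer thread (default) or, with inline writing,
 * written directly once its sequence number matches write_seq.
 */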
static void reader_work(struct work_item *work)
        struct timespec s, e;

        clock_gettime(CLOCK_MONOTONIC, &s);

        left = work->buf_size;
        ret = pread(work->fd, buf, left, off);
        fprintf(stderr, "zero read\n");
        } else if (ret < 0) {
                fprintf(stderr, "errno=%d\n", errno);

        clock_gettime(CLOCK_MONOTONIC, &e);

        add_lat(&work->reader->s, utime_since(&s, &e), "read");

        pthread_cond_signal(&work->cond);

        if (separate_writer) {
                pthread_mutex_lock(&work->writer->thread.lock);
                flist_add_tail(&work->list, &work->writer->list);
                pthread_mutex_unlock(&work->writer->thread.lock);
                pthread_cond_signal(&work->writer->thread.cond);

                struct reader_thread *rt = work->reader;
                struct work_item *next = NULL;
                struct flist_head *entry;

                /*
                 * Write current work if it matches in sequence.
                 */
                if (work->seq == rt->write_seq)

                pthread_mutex_lock(&rt->thread.lock);
                flist_add_tail(&work->list, &rt->done_list);

                /*
                 * See if the next work item is here, if so, write it
                 */
                flist_for_each(entry, &rt->done_list) {
                        next = flist_entry(entry, struct work_item, list);
                        if (next->seq == rt->write_seq) {
                                flist_del(&work->list);

                pthread_mutex_unlock(&rt->thread.lock);

                __sync_fetch_and_add(&rt->write_seq, 1);
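/*
 * reader_one_off() services a single work item on its own detached
 * thread; it is what queue_work() falls back to when the persistent
 * reader is still busy with a previous request. reader_fn() is that
 * persistent reader: it pops items off the reader list, marks itself
 * busy while servicing them, and sleeps on its condition variable when
 * the list is empty.
 */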
static void *reader_one_off(void *data)

static void *reader_fn(void *data)
        struct reader_thread *rt = data;
        struct work_item *work;

        while (!rt->thread.exit || !flist_empty(&rt->list)) {
                pthread_mutex_lock(&rt->thread.lock);
                if (!flist_empty(&rt->list)) {
                        work = flist_first_entry(&rt->list, struct work_item, list);
                        flist_del_init(&work->list);
                pthread_cond_wait(&rt->thread.cond, &rt->thread.lock);
                pthread_mutex_unlock(&rt->thread.lock);

                __sync_fetch_and_add(&rt->busy, 1);

                __sync_fetch_and_sub(&rt->busy, 1);

        thread_exiting(&rt->thread);
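/*
 * Queue a new chunk for reading: the first item starts the persistent
 * reader thread; afterwards, if that reader is idle and its lock can
 * be taken without blocking, the item is appended to its list and the
 * reader is signalled. Otherwise a detached one-off thread is created
 * for just this item, so submission never waits behind a slow read -
 * this is the mechanism the header comment describes for avoiding
 * coordinated omission.
 */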
static void queue_work(struct reader_thread *rt, struct work_item *work)
        pthread_mutex_lock(&rt->thread.lock);
        flist_add_tail(&work->list, &rt->list);
        pthread_mutex_unlock(&rt->thread.lock);

        pthread_create(&rt->thread.thread, NULL, reader_fn, rt);
        } else if (!rt->busy && !pthread_mutex_trylock(&rt->thread.lock)) {
                flist_add_tail(&work->list, &rt->list);
                pthread_mutex_unlock(&rt->thread.lock);

                pthread_cond_signal(&rt->thread.cond);

                int ret = pthread_create(&work->thread, NULL, reader_one_off, work);
                fprintf(stderr, "pthread_create=%d\n", ret);
                ret = pthread_detach(work->thread);
                fprintf(stderr, "pthread_detach=%d\n", ret);
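/*
 * Walk the latency histogram and, for each requested percentile in
 * plist[], record the value (bucket midpoint) at which the running
 * sample count crosses that percentile of 'nr'. The values are
 * collected into a reallocated array handed back through 'output';
 * the return value is the number of percentiles filled in.
 */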
static unsigned int calc_percentiles(unsigned int *io_u_plat, unsigned long nr,
                                     unsigned int **output)
        unsigned long sum = 0;
        unsigned int len, i, j = 0;
        unsigned int oval_len = 0;
        unsigned int *ovals = NULL;

        while (len < PLAT_LIST_MAX && plist[len] != 0.0)

        /*
         * Calculate bucket values, note down max and min values
         */
        for (i = 0; i < PLAT_NR && !is_last; i++) {
                while (sum >= (plist[j] / 100.0 * nr)) {
                        assert(plist[j] <= 100.0);
                        ovals = realloc(ovals, oval_len * sizeof(unsigned int));
                        ovals[j] = plat_idx_to_val(i);
                        is_last = (j == len - 1);
static void show_latencies(struct stats *s, const char *msg)
        unsigned int *ovals = NULL;

        len = calc_percentiles(s->plat, s->nr_samples, &ovals);
        fprintf(stderr, "Latency percentiles (usec) (%s)\n", msg);
        for (i = 0; i < len; i++)
                fprintf(stderr, "\t%2.4fth: %u\n", plist[i], ovals[i]);

        fprintf(stderr, "\tOver=%u, min=%u, max=%u\n", s->over, s->min, s->max);
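/*
 * Set up one thread_data: both condition variables are created with a
 * CLOCK_MONOTONIC attribute when pthread_condattr_setclock() is
 * available, so the timed waits elsewhere in the file are immune to
 * wall-clock adjustments; otherwise they fall back to CLOCK_REALTIME.
 */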
static void init_thread(struct thread_data *thread)
        pthread_condattr_t cattr;

        ret = pthread_condattr_init(&cattr);
        CHECK_ZERO_OR_ABORT(ret);
#ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
        ret = pthread_condattr_setclock(&cattr, CLOCK_MONOTONIC);
        CHECK_ZERO_OR_ABORT(ret);

        pthread_cond_init(&thread->cond, &cattr);
        pthread_cond_init(&thread->done_cond, &cattr);
        pthread_mutex_init(&thread->lock, NULL);
        pthread_mutex_init(&thread->done_lock, NULL);
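/*
 * Ask a thread to exit and wait for it: bump the exit flag atomically,
 * wake the thread, then block on done_cond until thread_exiting() has
 * marked it done. The wait is a periodic pthread_cond_timedwait() when
 * a callback is supplied (main passes prune_done_entries() for the
 * writer thread, so finished work keeps being freed while we wait),
 * and a plain pthread_cond_wait() otherwise.
 */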
static void exit_thread(struct thread_data *thread,
                        void fn(struct writer_thread *),
                        struct writer_thread *wt)
        __sync_fetch_and_add(&thread->exit, 1);
        pthread_cond_signal(&thread->cond);

        while (!thread->done) {
                pthread_mutex_lock(&thread->done_lock);

#ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
                clock_gettime(CLOCK_MONOTONIC, &ts);
                clock_gettime(CLOCK_REALTIME, &ts);

                pthread_cond_timedwait(&thread->done_cond, &thread->done_lock, &ts);
                pthread_cond_wait(&thread->done_cond, &thread->done_lock);

                pthread_mutex_unlock(&thread->done_lock);

static int usage(char *argv[])
        fprintf(stderr, "%s: [-b blocksize] [-t max usec] [-w separate writer] -f file\n", argv[0]);

static int parse_options(int argc, char *argv[])
        while ((c = getopt(argc, argv, "f:b:t:w:")) != -1) {
                file = strdup(optarg);
                max_us = atoi(optarg);
                separate_writer = atoi(optarg);
                if (!separate_writer)
                        fprintf(stderr, "inline writing is broken\n");
static void prune_done_entries(struct writer_thread *wt)
        if (flist_empty(&wt->done_list))

        if (pthread_mutex_trylock(&wt->thread.lock))

        if (!flist_empty(&wt->done_list))
                flist_splice_init(&wt->done_list, &list);
        pthread_mutex_unlock(&wt->thread.lock);

        while (!flist_empty(&list)) {
                struct work_item *work;

                work = flist_first_entry(&list, struct work_item, list);
                flist_del(&work->list);
                pthread_cond_destroy(&work->cond);
                pthread_mutex_destroy(&work->lock);
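/*
 * Main loop: open the file, start the writer and (lazily) the reader,
 * then read the file in blocksize chunks. Each chunk becomes a
 * calloc'd work_item with its own buffer, condition variable and lock,
 * and is queued via queue_work(). After queuing, main waits on the
 * item's condition variable for at most max_us before moving on to the
 * next chunk, which is exactly the behaviour described at the top of
 * the file. At the end both threads are torn down and the latency
 * percentiles and read/write rates are printed.
 */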
int main(int argc, char *argv[])
        pthread_condattr_t cattr;
        struct timespec s, re, we;
        struct reader_thread *rt;
        struct writer_thread *wt;

        if (parse_options(argc, argv))

        fd = open(file, O_RDONLY);

        if (fstat(fd, &sb) < 0) {

        init_thread(&wt->thread);
        INIT_FLIST_HEAD(&wt->list);
        INIT_FLIST_HEAD(&wt->done_list);
        pthread_create(&wt->thread.thread, NULL, writer_fn, wt);

        init_thread(&rt->thread);
        INIT_FLIST_HEAD(&rt->list);
        INIT_FLIST_HEAD(&rt->done_list);

        ret = pthread_condattr_init(&cattr);
        CHECK_ZERO_OR_ABORT(ret);
#ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
        ret = pthread_condattr_setclock(&cattr, CLOCK_MONOTONIC);
        CHECK_ZERO_OR_ABORT(ret);

        clock_gettime(CLOCK_MONOTONIC, &s);

        struct work_item *work;

        prune_done_entries(wt);

        this_len = sb.st_size;

        work = calloc(1, sizeof(*work));
        work->buf = malloc(this_len);
        work->buf_size = this_len;
        pthread_cond_init(&work->cond, &cattr);
        pthread_mutex_init(&work->lock, NULL);

        queue_work(rt, work);

#ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
        clock_gettime(CLOCK_MONOTONIC, &ts);
        clock_gettime(CLOCK_REALTIME, &ts);

        ts.tv_nsec += max_us * 1000ULL;
        if (ts.tv_nsec >= 1000000000ULL) {
                ts.tv_nsec -= 1000000000ULL;

        pthread_mutex_lock(&work->lock);
        pthread_cond_timedwait(&work->cond, &work->lock, &ts);
        pthread_mutex_unlock(&work->lock);

        sb.st_size -= this_len;

        exit_thread(&rt->thread, NULL, NULL);
        clock_gettime(CLOCK_MONOTONIC, &re);

        exit_thread(&wt->thread, prune_done_entries, wt);
        clock_gettime(CLOCK_MONOTONIC, &we);

        show_latencies(&rt->s, "READERS");
        show_latencies(&wt->s, "WRITERS");

        elapsed = utime_since(&s, &re);
        rate = elapsed ? (bytes * 1000UL * 1000UL) / elapsed : 0;
        fprintf(stderr, "Read rate (KiB/sec) : %lu\n", rate);
        elapsed = utime_since(&s, &we);
        rate = elapsed ? (bytes * 1000UL * 1000UL) / elapsed : 0;
        fprintf(stderr, "Write rate (KiB/sec): %lu\n", rate);