fio: ioengine flag cleanup
[fio.git] / t / read-to-pipe-async.c
1 /*
2  * Read a file and write the contents to stdout. If a given read takes
3  * longer than 'max_us' time, then we schedule a new thread to handle
4  * the next read. This avoids the coordinated omission problem, where
5  * one request appears to take a long time, but in reality a lot of
6  * requests would have been slow, but we don't notice since new submissions
7  * are not being issued if just 1 is held up.
8  *
9  * One test case:
10  *
11  * $ time (./read-to-pipe-async -f randfile.gz | gzip -dc > outfile; sync)
12  *
13  * This will read randfile.gz and log the latencies of doing so, while
14  * piping the output to gzip to decompress it. Any latencies over max_us
15  * are logged when they happen, and latency buckets are displayed at the
16  * end of the run
17  *
18  * gcc -Wall -g -O2 -o read-to-pipe-async read-to-pipe-async.c -lpthread
19  *
20  * Copyright (C) 2016 Jens Axboe
21  *
22  */
23
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <sys/time.h>
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <inttypes.h>
32 #include <string.h>
33 #include <pthread.h>
34 #include <errno.h>
35 #include <assert.h>
36
37 #include "../flist.h"
38
/* Size in bytes of each read/write request (-b option) */
static int bs = 4096;

/*
 * Latency threshold in usec (-t option): slower requests are logged, and
 * it is also how long main() waits on a request before issuing the next.
 */
static int max_us = 10000;

/* Path of the input file (-f option); NULL until parsed */
static char *file;

/* Non-zero: completed reads are handed to the writer thread (-w option) */
static int separate_writer = 1;
43
/*
 * Latency histogram geometry: PLAT_GROUP_NR groups of PLAT_VAL buckets
 * each, PLAT_NR buckets in total.
 */
#define PLAT_BITS       8
#define PLAT_VAL        (1 << PLAT_BITS)
#define PLAT_GROUP_NR   19
#define PLAT_NR         (PLAT_GROUP_NR * PLAT_VAL)

/* Capacity of the percentile list (plist) */
#define PLAT_LIST_MAX   20
49
50 struct stats {
51         unsigned int plat[PLAT_NR];
52         unsigned int nr_samples;
53         unsigned int max;
54         unsigned int min;
55         unsigned int over;
56 };
57
58 static double plist[PLAT_LIST_MAX] = { 50.0, 75.0, 90.0, 95.0, 99.0, 99.5, 99.9, 99.99, 99.999, 99.9999, };
59
/* State shared between a worker thread and the thread controlling it */
struct thread_data {
        int exit;                       /* bumped by exit_thread() to request exit */
        int done;                       /* bumped by thread_exiting() on the way out */
        pthread_mutex_t lock;           /* taken around the worker's list accesses */
        pthread_cond_t cond;            /* signalled when work or exit is pending */
        pthread_mutex_t done_lock;      /* held by exit_thread() while waiting */
        pthread_cond_t done_cond;       /* signalled by thread_exiting() */
        pthread_t thread;               /* handle of the worker thread */
};
69
70 struct writer_thread {
71         struct flist_head list;
72         struct flist_head done_list;
73         struct stats s;
74         struct thread_data thread;
75 };
76
77 struct reader_thread {
78         struct flist_head list;
79         struct flist_head done_list;
80         int started;
81         int busy;
82         int write_seq;
83         struct stats s;
84         struct thread_data thread;
85 };
86
87 struct work_item {
88         struct flist_head list;
89         void *buf;
90         size_t buf_size;
91         off_t off;
92         int fd;
93         int seq;
94         struct writer_thread *writer;
95         struct reader_thread *reader;
96         pthread_mutex_t lock;
97         pthread_cond_t cond;
98         pthread_t thread;
99 };
100
101 static struct reader_thread reader_thread;
102 static struct writer_thread writer_thread;
103
/*
 * Return the time elapsed from 's' to 'e' in microseconds, or 0 when
 * 'e' does not lie after 's'.
 */
uint64_t utime_since(const struct timespec *s, const struct timespec *e)
{
        long delta_sec = e->tv_sec - s->tv_sec;
        long delta_usec = (e->tv_nsec - s->tv_nsec) / 1000;

        /* Borrow one second if the sub-second part went negative */
        if (delta_sec > 0 && delta_usec < 0) {
                delta_sec -= 1;
                delta_usec += 1000000;
        }

        /* End precedes start: report no elapsed time */
        if (delta_sec < 0)
                return 0;
        if (delta_sec == 0 && delta_usec < 0)
                return 0;

        return delta_sec * 1000000ULL + delta_usec;
}
123
124 static struct work_item *find_seq(struct writer_thread *w, unsigned int seq)
125 {
126         struct work_item *work;
127         struct flist_head *entry;
128
129         if (flist_empty(&w->list))
130                 return NULL;
131
132         flist_for_each(entry, &w->list) {
133                 work = flist_entry(entry, struct work_item, list);
134                 if (work->seq == seq)
135                         return work;
136         }
137
138         return NULL;
139 }
140
141 static unsigned int plat_val_to_idx(unsigned int val)
142 {
143         unsigned int msb, error_bits, base, offset;
144
145         /* Find MSB starting from bit 0 */
146         if (val == 0)
147                 msb = 0;
148         else
149                 msb = sizeof(val)*8 - __builtin_clz(val) - 1;
150
151         /*
152          * MSB <= (PLAT_BITS-1), cannot be rounded off. Use
153          * all bits of the sample as index
154          */
155         if (msb <= PLAT_BITS)
156                 return val;
157
158         /* Compute the number of error bits to discard*/
159         error_bits = msb - PLAT_BITS;
160
161         /* Compute the number of buckets before the group */
162         base = (error_bits + 1) << PLAT_BITS;
163
164         /*
165          * Discard the error bits and apply the mask to find the
166          * index for the buckets in the group
167          */
168         offset = (PLAT_VAL - 1) & (val >> error_bits);
169
170         /* Make sure the index does not exceed (array size - 1) */
171         return (base + offset) < (PLAT_NR - 1) ?
172                 (base + offset) : (PLAT_NR - 1);
173 }
174
175 /*
176  * Convert the given index of the bucket array to the value
177  * represented by the bucket
178  */
179 static unsigned int plat_idx_to_val(unsigned int idx)
180 {
181         unsigned int error_bits, k, base;
182
183         assert(idx < PLAT_NR);
184
185         /* MSB <= (PLAT_BITS-1), cannot be rounded off. Use
186          * all bits of the sample as index */
187         if (idx < (PLAT_VAL << 1))
188                 return idx;
189
190         /* Find the group and compute the minimum value of that group */
191         error_bits = (idx >> PLAT_BITS) - 1;
192         base = 1 << (error_bits + PLAT_BITS);
193
194         /* Find its bucket number of the group */
195         k = idx % PLAT_VAL;
196
197         /* Return the mean of the range of the bucket */
198         return base + ((k + 0.5) * (1 << error_bits));
199 }
200
201 static void add_lat(struct stats *s, unsigned int us, const char *name)
202 {
203         int lat_index = 0;
204
205         if (us > s->max)
206                 s->max = us;
207         if (us < s->min)
208                 s->min = us;
209
210         if (us > max_us) {
211                 fprintf(stderr, "%s latency=%u usec\n", name, us);
212                 s->over++;
213         }
214
215         lat_index = plat_val_to_idx(us);
216         __sync_fetch_and_add(&s->plat[lat_index], 1);
217         __sync_fetch_and_add(&s->nr_samples, 1);
218 }
219
220 static int write_work(struct work_item *work)
221 {
222         struct timespec s, e;
223         ssize_t ret;
224
225         clock_gettime(CLOCK_MONOTONIC, &s);
226         ret = write(STDOUT_FILENO, work->buf, work->buf_size);
227         clock_gettime(CLOCK_MONOTONIC, &e);
228         assert(ret == work->buf_size);
229
230         add_lat(&work->writer->s, utime_since(&s, &e), "write");
231         return work->seq + 1;
232 }
233
234 static void thread_exiting(struct thread_data *thread)
235 {
236         __sync_fetch_and_add(&thread->done, 1);
237         pthread_cond_signal(&thread->done_cond);
238 }
239
240 static void *writer_fn(void *data)
241 {
242         struct writer_thread *wt = data;
243         struct work_item *work;
244         unsigned int seq = 1;
245
246         work = NULL;
247         while (!wt->thread.exit || !flist_empty(&wt->list)) {
248                 pthread_mutex_lock(&wt->thread.lock);
249
250                 if (work) {
251                         flist_add_tail(&work->list, &wt->done_list);
252                         work = NULL;
253                 }
254         
255                 work = find_seq(wt, seq);
256                 if (work)
257                         flist_del_init(&work->list);
258                 else
259                         pthread_cond_wait(&wt->thread.cond, &wt->thread.lock);
260
261                 pthread_mutex_unlock(&wt->thread.lock);
262
263                 if (work)
264                         seq = write_work(work);
265         }
266
267         thread_exiting(&wt->thread);
268         return NULL;
269 }
270
271 static void reader_work(struct work_item *work)
272 {
273         struct timespec s, e;
274         ssize_t ret;
275         size_t left;
276         void *buf;
277         off_t off;
278
279         clock_gettime(CLOCK_MONOTONIC, &s);
280
281         left = work->buf_size;
282         buf = work->buf;
283         off = work->off;
284         while (left) {
285                 ret = pread(work->fd, buf, left, off);
286                 if (!ret) {
287                         fprintf(stderr, "zero read\n");
288                         break;
289                 } else if (ret < 0) {
290                         fprintf(stderr, "errno=%d\n", errno);
291                         break;
292                 }
293                 left -= ret;
294                 off += ret;
295                 buf += ret;
296         }
297
298         clock_gettime(CLOCK_MONOTONIC, &e);
299
300         add_lat(&work->reader->s, utime_since(&s, &e), "read");
301
302         pthread_cond_signal(&work->cond);
303
304         if (separate_writer) {
305                 pthread_mutex_lock(&work->writer->thread.lock);
306                 flist_add_tail(&work->list, &work->writer->list);
307                 pthread_mutex_unlock(&work->writer->thread.lock);
308                 pthread_cond_signal(&work->writer->thread.cond);
309         } else {
310                 struct reader_thread *rt = work->reader;
311                 struct work_item *next = NULL;
312                 struct flist_head *entry;
313
314                 /*
315                  * Write current work if it matches in sequence.
316                  */
317                 if (work->seq == rt->write_seq)
318                         goto write_it;
319
320                 pthread_mutex_lock(&rt->thread.lock);
321
322                 flist_add_tail(&work->list, &rt->done_list);
323
324                 /*
325                  * See if the next work item is here, if so, write it
326                  */
327                 work = NULL;
328                 flist_for_each(entry, &rt->done_list) {
329                         next = flist_entry(entry, struct work_item, list);
330                         if (next->seq == rt->write_seq) {
331                                 work = next;
332                                 flist_del(&work->list);
333                                 break;
334                         }
335                 }
336
337                 pthread_mutex_unlock(&rt->thread.lock);
338         
339                 if (work) {
340 write_it:
341                         write_work(work);
342                         __sync_fetch_and_add(&rt->write_seq, 1);
343                 }
344         }
345 }
346
/*
 * Thread body of a one-off reader: service the single work item passed
 * in as 'data', then terminate.
 */
static void *reader_one_off(void *data)
{
        struct work_item *work = data;

        reader_work(work);
        return NULL;
}
352
353 static void *reader_fn(void *data)
354 {
355         struct reader_thread *rt = data;
356         struct work_item *work;
357
358         while (!rt->thread.exit || !flist_empty(&rt->list)) {
359                 work = NULL;
360                 pthread_mutex_lock(&rt->thread.lock);
361                 if (!flist_empty(&rt->list)) {
362                         work = flist_first_entry(&rt->list, struct work_item, list);
363                         flist_del_init(&work->list);
364                 } else
365                         pthread_cond_wait(&rt->thread.cond, &rt->thread.lock);
366                 pthread_mutex_unlock(&rt->thread.lock);
367
368                 if (work) {
369                         __sync_fetch_and_add(&rt->busy, 1);
370                         reader_work(work);
371                         __sync_fetch_and_sub(&rt->busy, 1);
372                 }
373         }
374
375         thread_exiting(&rt->thread);
376         return NULL;
377 }
378
379 static void queue_work(struct reader_thread *rt, struct work_item *work)
380 {
381         if (!rt->started) {
382                 pthread_mutex_lock(&rt->thread.lock);
383                 flist_add_tail(&work->list, &rt->list);
384                 pthread_mutex_unlock(&rt->thread.lock);
385
386                 rt->started = 1;
387                 pthread_create(&rt->thread.thread, NULL, reader_fn, rt);
388         } else if (!rt->busy && !pthread_mutex_trylock(&rt->thread.lock)) {
389                 flist_add_tail(&work->list, &rt->list);
390                 pthread_mutex_unlock(&rt->thread.lock);
391
392                 pthread_cond_signal(&rt->thread.cond);
393         } else {
394                 int ret = pthread_create(&work->thread, NULL, reader_one_off, work);
395                 if (ret) {
396                         fprintf(stderr, "pthread_create=%d\n", ret);
397                 } else {
398                         ret = pthread_detach(work->thread);
399                         if (ret)
400                                 fprintf(stderr, "pthread_detach=%d\n", ret);
401                 }
402         }
403 }
404
405 static unsigned int calc_percentiles(unsigned int *io_u_plat, unsigned long nr,
406                                      unsigned int **output)
407 {
408         unsigned long sum = 0;
409         unsigned int len, i, j = 0;
410         unsigned int oval_len = 0;
411         unsigned int *ovals = NULL;
412         int is_last;
413
414         len = 0;
415         while (len < PLAT_LIST_MAX && plist[len] != 0.0)
416                 len++;
417
418         if (!len)
419                 return 0;
420
421         /*
422          * Calculate bucket values, note down max and min values
423          */
424         is_last = 0;
425         for (i = 0; i < PLAT_NR && !is_last; i++) {
426                 sum += io_u_plat[i];
427                 while (sum >= (plist[j] / 100.0 * nr)) {
428                         assert(plist[j] <= 100.0);
429
430                         if (j == oval_len) {
431                                 oval_len += 100;
432                                 ovals = realloc(ovals, oval_len * sizeof(unsigned int));
433                         }
434
435                         ovals[j] = plat_idx_to_val(i);
436                         is_last = (j == len - 1);
437                         if (is_last)
438                                 break;
439
440                         j++;
441                 }
442         }
443
444         *output = ovals;
445         return len;
446 }
447
448 static void show_latencies(struct stats *s, const char *msg)
449 {
450         unsigned int *ovals = NULL;
451         unsigned int len, i;
452
453         len = calc_percentiles(s->plat, s->nr_samples, &ovals);
454         if (len) {
455                 fprintf(stderr, "Latency percentiles (usec) (%s)\n", msg);
456                 for (i = 0; i < len; i++)
457                         fprintf(stderr, "\t%2.4fth: %u\n", plist[i], ovals[i]);
458         }
459
460         if (ovals)
461                 free(ovals);
462
463         fprintf(stderr, "\tOver=%u, min=%u, max=%u\n", s->over, s->min, s->max);
464 }
465
466 static void init_thread(struct thread_data *thread)
467 {
468         pthread_condattr_t cattr;
469         int ret;
470
471         ret = pthread_condattr_init(&cattr);
472         assert(ret == 0);
473 #ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
474         ret = pthread_condattr_setclock(&cattr, CLOCK_MONOTONIC);
475         assert(ret == 0);
476 #endif
477         pthread_cond_init(&thread->cond, &cattr);
478         pthread_cond_init(&thread->done_cond, &cattr);
479         pthread_mutex_init(&thread->lock, NULL);
480         pthread_mutex_init(&thread->done_lock, NULL);
481         thread->exit = 0;
482 }
483
484 static void exit_thread(struct thread_data *thread,
485                         void fn(struct writer_thread *),
486                         struct writer_thread *wt)
487 {
488         __sync_fetch_and_add(&thread->exit, 1);
489         pthread_cond_signal(&thread->cond);
490
491         while (!thread->done) {
492                 pthread_mutex_lock(&thread->done_lock);
493
494                 if (fn) {
495                         struct timespec ts;
496
497 #ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
498                         clock_gettime(CLOCK_MONOTONIC, &ts);
499 #else
500                         clock_gettime(CLOCK_REALTIME, &ts);
501 #endif
502                         ts.tv_sec++;
503
504                         pthread_cond_timedwait(&thread->done_cond, &thread->done_lock, &ts);
505                         fn(wt);
506                 } else
507                         pthread_cond_wait(&thread->done_cond, &thread->done_lock);
508
509                 pthread_mutex_unlock(&thread->done_lock);
510         }
511 }
512
/*
 * Print the command line synopsis to stderr. Always returns 1 so callers
 * can return the result directly as an error code.
 */
static int usage(char *argv[])
{
        const char *prog = argv[0];

        fprintf(stderr, "%s: [-b blocksize] [-t max usec] [-w separate writer] -f file\n", prog);

        return 1;
}
518
519 static int parse_options(int argc, char *argv[])
520 {
521         int c;
522
523         while ((c = getopt(argc, argv, "f:b:t:w:")) != -1) {
524                 switch (c) {
525                 case 'f':
526                         if (file)
527                                 return usage(argv);
528                         file = strdup(optarg);
529                         break;
530                 case 'b':
531                         bs = atoi(optarg);
532                         break;
533                 case 't':
534                         max_us = atoi(optarg);
535                         break;
536                 case 'w':
537                         separate_writer = atoi(optarg);
538                         if (!separate_writer)
539                                 fprintf(stderr, "inline writing is broken\n");
540                         break;
541                 case '?':
542                 default:
543                         return usage(argv);
544                 }
545         }
546
547         if (!file)
548                 return usage(argv);
549
550         return 0;
551 }
552
553 static void prune_done_entries(struct writer_thread *wt)
554 {
555         FLIST_HEAD(list);
556
557         if (flist_empty(&wt->done_list))
558                 return;
559
560         if (pthread_mutex_trylock(&wt->thread.lock))
561                 return;
562
563         if (!flist_empty(&wt->done_list))
564                 flist_splice_init(&wt->done_list, &list);
565         pthread_mutex_unlock(&wt->thread.lock);
566
567         while (!flist_empty(&list)) {
568                 struct work_item *work;
569
570                 work = flist_first_entry(&list, struct work_item, list);
571                 flist_del(&work->list);
572
573                 pthread_cond_destroy(&work->cond);
574                 pthread_mutex_destroy(&work->lock);
575                 free(work->buf);
576                 free(work);
577         }
578 }
579
580 int main(int argc, char *argv[])
581 {
582         pthread_condattr_t cattr;
583         struct timespec s, re, we;
584         struct reader_thread *rt;
585         struct writer_thread *wt;
586         unsigned long rate;
587         uint64_t elapsed;
588         struct stat sb;
589         size_t bytes;
590         off_t off;
591         int fd, seq;
592         int ret;
593
594         if (parse_options(argc, argv))
595                 return 1;
596
597         fd = open(file, O_RDONLY);
598         if (fd < 0) {
599                 perror("open");
600                 return 2;
601         }
602
603         if (fstat(fd, &sb) < 0) {
604                 perror("stat");
605                 return 3;
606         }
607
608         wt = &writer_thread;
609         init_thread(&wt->thread);
610         INIT_FLIST_HEAD(&wt->list);
611         INIT_FLIST_HEAD(&wt->done_list);
612         wt->s.max = 0;
613         wt->s.min = -1U;
614         pthread_create(&wt->thread.thread, NULL, writer_fn, wt);
615
616         rt = &reader_thread;
617         init_thread(&rt->thread);
618         INIT_FLIST_HEAD(&rt->list);
619         INIT_FLIST_HEAD(&rt->done_list);
620         rt->s.max = 0;
621         rt->s.min = -1U;
622         rt->write_seq = 1;
623
624         off = 0;
625         seq = 0;
626         bytes = 0;
627
628         ret = pthread_condattr_init(&cattr);
629         assert(ret == 0);
630 #ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
631         ret = pthread_condattr_setclock(&cattr, CLOCK_MONOTONIC);
632         assert(ret == 0);
633 #endif
634
635         clock_gettime(CLOCK_MONOTONIC, &s);
636
637         while (sb.st_size) {
638                 struct work_item *work;
639                 size_t this_len;
640                 struct timespec ts;
641
642                 prune_done_entries(wt);
643
644                 this_len = sb.st_size;
645                 if (this_len > bs)
646                         this_len = bs;
647
648                 work = calloc(1, sizeof(*work));
649                 work->buf = malloc(this_len);
650                 work->buf_size = this_len;
651                 work->off = off;
652                 work->fd = fd;
653                 work->seq = ++seq;
654                 work->writer = wt;
655                 work->reader = rt;
656                 pthread_cond_init(&work->cond, &cattr);
657                 pthread_mutex_init(&work->lock, NULL);
658
659                 queue_work(rt, work);
660
661 #ifdef CONFIG_PTHREAD_CONDATTR_SETCLOCK
662                 clock_gettime(CLOCK_MONOTONIC, &ts);
663 #else
664                 clock_gettime(CLOCK_REALTIME, &ts);
665 #endif
666                 ts.tv_nsec += max_us * 1000ULL;
667                 if (ts.tv_nsec >= 1000000000ULL) {
668                         ts.tv_nsec -= 1000000000ULL;
669                         ts.tv_sec++;
670                 }
671
672                 pthread_mutex_lock(&work->lock);
673                 pthread_cond_timedwait(&work->cond, &work->lock, &ts);
674                 pthread_mutex_unlock(&work->lock);
675
676                 off += this_len;
677                 sb.st_size -= this_len;
678                 bytes += this_len;
679         }
680
681         exit_thread(&rt->thread, NULL, NULL);
682         clock_gettime(CLOCK_MONOTONIC, &re);
683
684         exit_thread(&wt->thread, prune_done_entries, wt);
685         clock_gettime(CLOCK_MONOTONIC, &we);
686
687         show_latencies(&rt->s, "READERS");
688         show_latencies(&wt->s, "WRITERS");
689
690         bytes /= 1024;
691         elapsed = utime_since(&s, &re);
692         rate = elapsed ? (bytes * 1000UL * 1000UL) / elapsed : 0;
693         fprintf(stderr, "Read rate (KiB/sec) : %lu\n", rate);
694         elapsed = utime_since(&s, &we);
695         rate = elapsed ? (bytes * 1000UL * 1000UL) / elapsed : 0;
696         fprintf(stderr, "Write rate (KiB/sec): %lu\n", rate);
697
698         close(fd);
699         return 0;
700 }