glusterfs: update for new API
[fio.git] / t / read-to-pipe-async.c
1 /*
2  * Read a file and write the contents to stdout. If a given read takes
3  * longer than 'max_us' time, then we schedule a new thread to handle
4  * the next read. This avoids the coordinated omission problem, where
5  * one request appears to take a long time, but in reality a lot of
6  * requests would have been slow, but we don't notice since new submissions
7  * are not being issued if just 1 is held up.
8  *
9  * One test case:
10  *
11  * $ time (./read-to-pipe-async -f randfile.gz | gzip -dc > outfile; sync)
12  *
13  * This will read randfile.gz and log the latencies of doing so, while
14  * piping the output to gzip to decompress it. Any latencies over max_us
15  * are logged when they happen, and latency buckets are displayed at the
16  * end of the run
17  *
18  * gcc -Wall -g -O2 -o read-to-pipe-async read-to-pipe-async.c -lpthread
19  *
20  * Copyright (C) 2016 Jens Axboe
21  *
22  */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <limits.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <inttypes.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include <assert.h>
35
36 #include "../flist.h"
37
/* Read block size in bytes; set with -b */
static int bs = 4096;
/*
 * Latency threshold in usec; set with -t. I/Os slower than this are
 * logged and counted, and main() waits at most this long on a read
 * before issuing the next one.
 */
static int max_us = 10000;
/* Input file name; strdup'ed copy of the -f argument */
static char *file;
/*
 * Non-zero (the default): completed reads are handed to the writer
 * thread. Zero (-w 0): reader_work() writes inline.
 */
static int separate_writer = 1;

/* Latency histogram geometry */
#define PLAT_BITS       8       /* bits kept below the MSB when bucketing */
#define PLAT_VAL        (1 << PLAT_BITS)        /* buckets per group */
#define PLAT_GROUP_NR   19      /* number of groups */
#define PLAT_NR         (PLAT_GROUP_NR * PLAT_VAL)      /* total buckets */
#define PLAT_LIST_MAX   20      /* capacity of the percentile list */
48
/* Latency statistics for one side (readers or writer) of the pipeline */
struct stats {
        unsigned int plat[PLAT_NR];     /* sample count per latency bucket */
        unsigned int nr_samples;        /* total samples recorded */
        unsigned int max;               /* largest latency seen, usec */
        unsigned int min;               /* smallest latency seen, usec (seeded with -1U) */
        unsigned int over;              /* samples slower than max_us */
};
56
/*
 * Percentiles to report. calc_percentiles() stops at the first 0.0
 * entry, so the unset trailing slots terminate the list.
 */
static double plist[PLAT_LIST_MAX] = { 50.0, 75.0, 90.0, 95.0, 99.0, 99.5, 99.9, 99.99, 99.999, 99.9999, };
58
/* Control state shared by the persistent reader and writer threads */
struct thread_data {
        int exit;                       /* set by exit_thread() to ask the thread to stop */
        int done;                       /* bumped by thread_exiting() when the thread is finished */
        pthread_mutex_t lock;           /* protects the owner's work lists */
        pthread_cond_t cond;            /* signalled when work is queued or exit is requested */
        pthread_mutex_t done_lock;      /* held by exit_thread() while waiting on done_cond */
        pthread_cond_t done_cond;       /* signalled by thread_exiting() */
        pthread_t thread;               /* handle filled in by pthread_create() */
};
68
/* State of the single writer thread */
struct writer_thread {
        struct flist_head list;         /* read items waiting to be written */
        struct flist_head done_list;    /* written items waiting for prune_done_entries() */
        struct stats s;                 /* write latency statistics */
        struct thread_data thread;      /* thread control state */
};
75
/* State of the persistent reader thread (one-off readers share 's') */
struct reader_thread {
        struct flist_head list;         /* items queued for reader_fn() */
        struct flist_head done_list;    /* inline-writer mode: read items not yet in sequence */
        int started;                    /* set by queue_work() once the thread has been created */
        int busy;                       /* non-zero while reader_fn() is inside reader_work() */
        int write_seq;                  /* inline-writer mode: next sequence number to write */
        struct stats s;                 /* read latency statistics */
        struct thread_data thread;      /* thread control state */
};
85
/* One block of the file: read by a reader, then written in 'seq' order */
struct work_item {
        struct flist_head list;         /* linkage on a reader/writer list */
        void *buf;                      /* data buffer, malloc'ed by main() */
        size_t buf_size;                /* size of 'buf' in bytes */
        off_t off;                      /* file offset to read from */
        int fd;                         /* file descriptor to read from */
        int seq;                        /* 1-based sequence number; fixes the output order */
        struct writer_thread *writer;   /* writer this item is handed to */
        struct reader_thread *reader;   /* reader state used for stats and inline writes */
        pthread_mutex_t lock;           /* paired with 'cond' for main()'s timed wait */
        pthread_cond_t cond;            /* signalled by reader_work() once the read is done */
        pthread_t thread;               /* handle of the one-off reader thread, if one was spawned */
};
99
/* The single persistent reader and writer; main() sets them up */
static struct reader_thread reader_thread;
static struct writer_thread writer_thread;
102
/*
 * Return the number of microseconds from 's' to 'e'. If 'e' is not
 * later than 's', 0 is returned.
 */
uint64_t utime_since(const struct timeval *s, const struct timeval *e)
{
        long d_sec = e->tv_sec - s->tv_sec;
        long d_usec = e->tv_usec - s->tv_usec;

        /* Borrow a second when the microsecond part went negative */
        if (d_sec > 0 && d_usec < 0) {
                d_sec -= 1;
                d_usec += 1000000;
        }

        /* Clock went backwards, or no time elapsed */
        if (d_sec < 0)
                return 0;
        if (d_sec == 0 && d_usec < 0)
                return 0;

        return d_sec * 1000000ULL + d_usec;
}
122
123 static struct work_item *find_seq(struct writer_thread *w, unsigned int seq)
124 {
125         struct work_item *work;
126         struct flist_head *entry;
127
128         if (flist_empty(&w->list))
129                 return NULL;
130
131         flist_for_each(entry, &w->list) {
132                 work = flist_entry(entry, struct work_item, list);
133                 if (work->seq == seq)
134                         return work;
135         }
136
137         return NULL;
138 }
139
140 static unsigned int plat_val_to_idx(unsigned int val)
141 {
142         unsigned int msb, error_bits, base, offset;
143
144         /* Find MSB starting from bit 0 */
145         if (val == 0)
146                 msb = 0;
147         else
148                 msb = sizeof(val)*8 - __builtin_clz(val) - 1;
149
150         /*
151          * MSB <= (PLAT_BITS-1), cannot be rounded off. Use
152          * all bits of the sample as index
153          */
154         if (msb <= PLAT_BITS)
155                 return val;
156
157         /* Compute the number of error bits to discard*/
158         error_bits = msb - PLAT_BITS;
159
160         /* Compute the number of buckets before the group */
161         base = (error_bits + 1) << PLAT_BITS;
162
163         /*
164          * Discard the error bits and apply the mask to find the
165          * index for the buckets in the group
166          */
167         offset = (PLAT_VAL - 1) & (val >> error_bits);
168
169         /* Make sure the index does not exceed (array size - 1) */
170         return (base + offset) < (PLAT_NR - 1) ?
171                 (base + offset) : (PLAT_NR - 1);
172 }
173
174 /*
175  * Convert the given index of the bucket array to the value
176  * represented by the bucket
177  */
178 static unsigned int plat_idx_to_val(unsigned int idx)
179 {
180         unsigned int error_bits, k, base;
181
182         assert(idx < PLAT_NR);
183
184         /* MSB <= (PLAT_BITS-1), cannot be rounded off. Use
185          * all bits of the sample as index */
186         if (idx < (PLAT_VAL << 1))
187                 return idx;
188
189         /* Find the group and compute the minimum value of that group */
190         error_bits = (idx >> PLAT_BITS) - 1;
191         base = 1 << (error_bits + PLAT_BITS);
192
193         /* Find its bucket number of the group */
194         k = idx % PLAT_VAL;
195
196         /* Return the mean of the range of the bucket */
197         return base + ((k + 0.5) * (1 << error_bits));
198 }
199
200 static void add_lat(struct stats *s, unsigned int us, const char *name)
201 {
202         int lat_index = 0;
203
204         if (us > s->max)
205                 s->max = us;
206         if (us < s->min)
207                 s->min = us;
208
209         if (us > max_us) {
210                 fprintf(stderr, "%s latency=%u usec\n", name, us);
211                 s->over++;
212         }
213
214         lat_index = plat_val_to_idx(us);
215         __sync_fetch_and_add(&s->plat[lat_index], 1);
216         __sync_fetch_and_add(&s->nr_samples, 1);
217 }
218
219 static int write_work(struct work_item *work)
220 {
221         struct timeval s, e;
222         ssize_t ret;
223
224         gettimeofday(&s, NULL);
225         ret = write(STDOUT_FILENO, work->buf, work->buf_size);
226         gettimeofday(&e, NULL);
227         assert(ret == work->buf_size);
228
229         add_lat(&work->writer->s, utime_since(&s, &e), "write");
230         return work->seq + 1;
231 }
232
233 static void thread_exiting(struct thread_data *thread)
234 {
235         __sync_fetch_and_add(&thread->done, 1);
236         pthread_cond_signal(&thread->done_cond);
237 }
238
239 static void *writer_fn(void *data)
240 {
241         struct writer_thread *wt = data;
242         struct work_item *work;
243         unsigned int seq = 1;
244
245         work = NULL;
246         while (!wt->thread.exit || !flist_empty(&wt->list)) {
247                 pthread_mutex_lock(&wt->thread.lock);
248
249                 if (work) {
250                         flist_add_tail(&work->list, &wt->done_list);
251                         work = NULL;
252                 }
253         
254                 work = find_seq(wt, seq);
255                 if (work)
256                         flist_del_init(&work->list);
257                 else
258                         pthread_cond_wait(&wt->thread.cond, &wt->thread.lock);
259
260                 pthread_mutex_unlock(&wt->thread.lock);
261
262                 if (work)
263                         seq = write_work(work);
264         }
265
266         thread_exiting(&wt->thread);
267         return NULL;
268 }
269
270 static void reader_work(struct work_item *work)
271 {
272         struct timeval s, e;
273         ssize_t ret;
274         size_t left;
275         void *buf;
276         off_t off;
277
278         gettimeofday(&s, NULL);
279
280         left = work->buf_size;
281         buf = work->buf;
282         off = work->off;
283         while (left) {
284                 ret = pread(work->fd, buf, left, off);
285                 if (!ret) {
286                         fprintf(stderr, "zero read\n");
287                         break;
288                 } else if (ret < 0) {
289                         fprintf(stderr, "errno=%d\n", errno);
290                         break;
291                 }
292                 left -= ret;
293                 off += ret;
294                 buf += ret;
295         }
296
297         gettimeofday(&e, NULL);
298
299         add_lat(&work->reader->s, utime_since(&s, &e), "read");
300
301         pthread_cond_signal(&work->cond);
302
303         if (separate_writer) {
304                 pthread_mutex_lock(&work->writer->thread.lock);
305                 flist_add_tail(&work->list, &work->writer->list);
306                 pthread_mutex_unlock(&work->writer->thread.lock);
307                 pthread_cond_signal(&work->writer->thread.cond);
308         } else {
309                 struct reader_thread *rt = work->reader;
310                 struct work_item *next = NULL;
311                 struct flist_head *entry;
312
313                 /*
314                  * Write current work if it matches in sequence.
315                  */
316                 if (work->seq == rt->write_seq)
317                         goto write_it;
318
319                 pthread_mutex_lock(&rt->thread.lock);
320
321                 flist_add_tail(&work->list, &rt->done_list);
322
323                 /*
324                  * See if the next work item is here, if so, write it
325                  */
326                 work = NULL;
327                 flist_for_each(entry, &rt->done_list) {
328                         next = flist_entry(entry, struct work_item, list);
329                         if (next->seq == rt->write_seq) {
330                                 work = next;
331                                 flist_del(&work->list);
332                                 break;
333                         }
334                 }
335
336                 pthread_mutex_unlock(&rt->thread.lock);
337         
338                 if (work) {
339 write_it:
340                         write_work(work);
341                         __sync_fetch_and_add(&rt->write_seq, 1);
342                 }
343         }
344 }
345
346 static void *reader_one_off(void *data)
347 {
348         reader_work(data);
349         return NULL;
350 }
351
352 static void *reader_fn(void *data)
353 {
354         struct reader_thread *rt = data;
355         struct work_item *work;
356
357         while (!rt->thread.exit || !flist_empty(&rt->list)) {
358                 work = NULL;
359                 pthread_mutex_lock(&rt->thread.lock);
360                 if (!flist_empty(&rt->list)) {
361                         work = flist_first_entry(&rt->list, struct work_item, list);
362                         flist_del_init(&work->list);
363                 } else
364                         pthread_cond_wait(&rt->thread.cond, &rt->thread.lock);
365                 pthread_mutex_unlock(&rt->thread.lock);
366
367                 if (work) {
368                         __sync_fetch_and_add(&rt->busy, 1);
369                         reader_work(work);
370                         __sync_fetch_and_sub(&rt->busy, 1);
371                 }
372         }
373
374         thread_exiting(&rt->thread);
375         return NULL;
376 }
377
378 static void queue_work(struct reader_thread *rt, struct work_item *work)
379 {
380         if (!rt->started) {
381                 pthread_mutex_lock(&rt->thread.lock);
382                 flist_add_tail(&work->list, &rt->list);
383                 pthread_mutex_unlock(&rt->thread.lock);
384
385                 rt->started = 1;
386                 pthread_create(&rt->thread.thread, NULL, reader_fn, rt);
387         } else if (!rt->busy && !pthread_mutex_trylock(&rt->thread.lock)) {
388                 flist_add_tail(&work->list, &rt->list);
389                 pthread_mutex_unlock(&rt->thread.lock);
390
391                 pthread_cond_signal(&rt->thread.cond);
392         } else {
393                 int ret = pthread_create(&work->thread, NULL, reader_one_off, work);
394                 if (ret)
395                         fprintf(stderr, "pthread_create=%d\n", ret);
396                 else
397                         pthread_detach(work->thread);
398         }
399 }
400
401 static unsigned int calc_percentiles(unsigned int *io_u_plat, unsigned long nr,
402                                      unsigned int **output)
403 {
404         unsigned long sum = 0;
405         unsigned int len, i, j = 0;
406         unsigned int oval_len = 0;
407         unsigned int *ovals = NULL;
408         int is_last;
409
410         len = 0;
411         while (len < PLAT_LIST_MAX && plist[len] != 0.0)
412                 len++;
413
414         if (!len)
415                 return 0;
416
417         /*
418          * Calculate bucket values, note down max and min values
419          */
420         is_last = 0;
421         for (i = 0; i < PLAT_NR && !is_last; i++) {
422                 sum += io_u_plat[i];
423                 while (sum >= (plist[j] / 100.0 * nr)) {
424                         assert(plist[j] <= 100.0);
425
426                         if (j == oval_len) {
427                                 oval_len += 100;
428                                 ovals = realloc(ovals, oval_len * sizeof(unsigned int));
429                         }
430
431                         ovals[j] = plat_idx_to_val(i);
432                         is_last = (j == len - 1);
433                         if (is_last)
434                                 break;
435
436                         j++;
437                 }
438         }
439
440         *output = ovals;
441         return len;
442 }
443
444 static void show_latencies(struct stats *s, const char *msg)
445 {
446         unsigned int *ovals = NULL;
447         unsigned int len, i;
448
449         len = calc_percentiles(s->plat, s->nr_samples, &ovals);
450         if (len) {
451                 fprintf(stderr, "Latency percentiles (usec) (%s)\n", msg);
452                 for (i = 0; i < len; i++)
453                         fprintf(stderr, "\t%2.4fth: %u\n", plist[i], ovals[i]);
454         }
455
456         if (ovals)
457                 free(ovals);
458
459         fprintf(stderr, "\tOver=%u, min=%u, max=%u\n", s->over, s->min, s->max);
460 }
461
462 static void init_thread(struct thread_data *thread)
463 {
464         pthread_cond_init(&thread->cond, NULL);
465         pthread_cond_init(&thread->done_cond, NULL);
466         pthread_mutex_init(&thread->lock, NULL);
467         pthread_mutex_init(&thread->done_lock, NULL);
468         thread->exit = 0;
469 }
470
471 static void exit_thread(struct thread_data *thread,
472                         void fn(struct writer_thread *),
473                         struct writer_thread *wt)
474 {
475         __sync_fetch_and_add(&thread->exit, 1);
476         pthread_cond_signal(&thread->cond);
477
478         while (!thread->done) {
479                 pthread_mutex_lock(&thread->done_lock);
480
481                 if (fn) {
482                         struct timeval tv;
483                         struct timespec ts;
484
485                         gettimeofday(&tv, NULL);
486                         ts.tv_sec = tv.tv_sec + 1;
487                         ts.tv_nsec = tv.tv_usec * 1000ULL;
488
489                         pthread_cond_timedwait(&thread->done_cond, &thread->done_lock, &ts);
490                         fn(wt);
491                 } else
492                         pthread_cond_wait(&thread->done_cond, &thread->done_lock);
493
494                 pthread_mutex_unlock(&thread->done_lock);
495         }
496 }
497
/*
 * Print the command line synopsis to stderr, using argv[0] as the
 * program name. Always returns 1 so callers can return it directly.
 */
static int usage(char *argv[])
{
        const char *prog = argv[0];

        fprintf(stderr, "%s: [-b blocksize] [-t max usec] [-w separate writer] -f file\n", prog);

        return 1;
}
503
504 static int parse_options(int argc, char *argv[])
505 {
506         int c;
507
508         while ((c = getopt(argc, argv, "f:b:t:w:")) != -1) {
509                 switch (c) {
510                 case 'f':
511                         file = strdup(optarg);
512                         break;
513                 case 'b':
514                         bs = atoi(optarg);
515                         break;
516                 case 't':
517                         max_us = atoi(optarg);
518                         break;
519                 case 'w':
520                         separate_writer = atoi(optarg);
521                         if (!separate_writer)
522                                 fprintf(stderr, "inline writing is broken\n");
523                         break;
524                 case '?':
525                 default:
526                         return usage(argv);
527                 }
528         }
529
530         if (!file)
531                 return usage(argv);
532
533         return 0;
534 }
535
536 static void prune_done_entries(struct writer_thread *wt)
537 {
538         FLIST_HEAD(list);
539
540         if (flist_empty(&wt->done_list))
541                 return;
542
543         if (pthread_mutex_trylock(&wt->thread.lock))
544                 return;
545
546         if (!flist_empty(&wt->done_list))
547                 flist_splice_init(&wt->done_list, &list);
548         pthread_mutex_unlock(&wt->thread.lock);
549
550         while (!flist_empty(&list)) {
551                 struct work_item *work;
552
553                 work = flist_first_entry(&list, struct work_item, list);
554                 flist_del(&work->list);
555
556                 pthread_cond_destroy(&work->cond);
557                 pthread_mutex_destroy(&work->lock);
558                 free(work->buf);
559                 free(work);
560         }
561 }
562
563 int main(int argc, char *argv[])
564 {
565         struct timeval s, re, we;
566         struct reader_thread *rt;
567         struct writer_thread *wt;
568         unsigned long rate;
569         struct stat sb;
570         size_t bytes;
571         off_t off;
572         int fd, seq;
573
574         if (parse_options(argc, argv))
575                 return 1;
576
577         fd = open(file, O_RDONLY);
578         if (fd < 0) {
579                 perror("open");
580                 return 2;
581         }
582
583         if (fstat(fd, &sb) < 0) {
584                 perror("stat");
585                 return 3;
586         }
587
588         wt = &writer_thread;
589         init_thread(&wt->thread);
590         INIT_FLIST_HEAD(&wt->list);
591         INIT_FLIST_HEAD(&wt->done_list);
592         wt->s.max = 0;
593         wt->s.min = -1U;
594         pthread_create(&wt->thread.thread, NULL, writer_fn, wt);
595
596         rt = &reader_thread;
597         init_thread(&rt->thread);
598         INIT_FLIST_HEAD(&rt->list);
599         INIT_FLIST_HEAD(&rt->done_list);
600         rt->s.max = 0;
601         rt->s.min = -1U;
602         rt->write_seq = 1;
603
604         off = 0;
605         seq = 0;
606         bytes = 0;
607
608         gettimeofday(&s, NULL);
609
610         while (sb.st_size) {
611                 struct work_item *work;
612                 size_t this_len;
613                 struct timespec ts;
614                 struct timeval tv;
615
616                 prune_done_entries(wt);
617
618                 this_len = sb.st_size;
619                 if (this_len > bs)
620                         this_len = bs;
621
622                 work = calloc(1, sizeof(*work));
623                 work->buf = malloc(this_len);
624                 work->buf_size = this_len;
625                 work->off = off;
626                 work->fd = fd;
627                 work->seq = ++seq;
628                 work->writer = wt;
629                 work->reader = rt;
630                 pthread_cond_init(&work->cond, NULL);
631                 pthread_mutex_init(&work->lock, NULL);
632
633                 queue_work(rt, work);
634
635                 gettimeofday(&tv, NULL);
636                 ts.tv_sec = tv.tv_sec;
637                 ts.tv_nsec = tv.tv_usec * 1000ULL;
638                 ts.tv_nsec += max_us * 1000ULL;
639                 if (ts.tv_nsec >= 1000000000ULL) {
640                         ts.tv_nsec -= 1000000000ULL;
641                         ts.tv_sec++;
642                 }
643
644                 pthread_mutex_lock(&work->lock);
645                 pthread_cond_timedwait(&work->cond, &work->lock, &ts);
646                 pthread_mutex_unlock(&work->lock);
647
648                 off += this_len;
649                 sb.st_size -= this_len;
650                 bytes += this_len;
651         }
652
653         exit_thread(&rt->thread, NULL, NULL);
654         gettimeofday(&re, NULL);
655
656         exit_thread(&wt->thread, prune_done_entries, wt);
657         gettimeofday(&we, NULL);
658
659         show_latencies(&rt->s, "READERS");
660         show_latencies(&wt->s, "WRITERS");
661
662         bytes /= 1024;
663         rate = (bytes * 1000UL * 1000UL) / utime_since(&s, &re);
664         fprintf(stderr, "Read rate (KiB/sec) : %lu\n", rate);
665         rate = (bytes * 1000UL * 1000UL) / utime_since(&s, &we);
666         fprintf(stderr, "Write rate (KiB/sec): %lu\n", rate);
667
668         close(fd);
669         return 0;
670 }