[PATCH] fio: rename stat file + location, it wasn't reliable before
[disktools.git] / fio.c
1 /*
2  * fio - the flexible io tester
3  *
4  * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
5  *
6  *  This program is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU General Public License as published by
8  *  the Free Software Foundation; either version 2 of the License, or
9  *  (at your option) any later version.
10  *
11  *  This program is distributed in the hope that it will be useful,
12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *  GNU General Public License for more details.
15  *
16  *  You should have received a copy of the GNU General Public License
17  *  along with this program; if not, write to the Free Software
18  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  *
20  */
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <fcntl.h>
25 #include <string.h>
26 #include <errno.h>
27 #include <signal.h>
28 #include <time.h>
29 #include <ctype.h>
30 #include <sched.h>
31 #include <libaio.h>
32 #include <sys/time.h>
33 #include <sys/types.h>
34 #include <sys/stat.h>
35 #include <sys/wait.h>
36 #include <semaphore.h>
37 #include <sys/ipc.h>
38 #include <sys/shm.h>
39 #include <asm/unistd.h>
40
/*
 * upper bound on the number of jobs; get_new_job() refuses to hand out
 * more than this many thread_data slots
 */
41 #define MAX_JOBS        (1024)
42
/*
 * Fallback system call numbers for ioprio_set/ioprio_get, used only when
 * the system headers did not already define __NR_ioprio_set. One pair per
 * supported architecture; any other architecture stops the build with
 * the #error below.
 */
43 /*
44  * assume we don't have _get either, if _set isn't defined
45  */
46 #ifndef __NR_ioprio_set
47
48 #if defined(__i386__)
49 #define __NR_ioprio_set         289
50 #define __NR_ioprio_get         290
51 #elif defined(__powerpc__) || defined(__powerpc64__)
52 #define __NR_ioprio_set         273
53 #define __NR_ioprio_get         274
54 #elif defined(__x86_64__)
55 #define __NR_ioprio_set         251
56 #define __NR_ioprio_get         252
57 #elif defined(__ia64__)
58 #define __NR_ioprio_set         1274
59 #define __NR_ioprio_get         1275
60 #elif defined(__alpha__)
61 #define __NR_ioprio_set         442
62 #define __NR_ioprio_get         443
63 #elif defined(__s390x__) || defined(__s390__)
64 #define __NR_ioprio_set         282
65 #define __NR_ioprio_get         283
66 #else
67 #error "Unsupported arch"
68 #endif
69
70 #endif
71
/*
 * Invoke the ioprio_set system call directly through syscall().
 *
 * which/who/ioprio are handed to the kernel unchanged; the syscall()
 * result is returned as-is to the caller.
 */
static int ioprio_set(int which, int who, int ioprio)
{
	const long rc = syscall(__NR_ioprio_set, which, who, ioprio);

	return rc;
}
76
/*
 * Selector values for the first ("which") argument of ioprio_set();
 * thread_main() passes IOPRIO_WHO_PROCESS.
 */
77 enum {
78         IOPRIO_WHO_PROCESS = 1,
79         IOPRIO_WHO_PGRP,
80         IOPRIO_WHO_USER,
81 };
82
/* the priority class lives above this bit position in an ioprio value */
#define IOPRIO_CLASS_SHIFT	13

/* low-bit mask used by ALIGN() to round buffers up to 4096 bytes */
#define MASK	(4095)

/* compile-time defaults for job and global options */
#define DEF_BS		(4096)
#define DEF_TIMEOUT	(30)
#define DEF_RATE_CYCLE	(1000)
#define DEF_ODIRECT	(1)
#define DEF_SEQUENTIAL	(1)
#define DEF_WRITESTAT	(0)
#define DEF_RAND_REPEAT	(1)

/*
 * Round buf up to the next (MASK + 1)-byte boundary and yield it as a
 * char pointer. The expansion is wrapped in an outer pair of parentheses
 * so that the cast binds before whatever follows the macro call
 * (e.g. ALIGN(p)[0] or ALIGN(p) + n).
 */
#define ALIGN(buf)	((char *) (((unsigned long) (buf) + MASK) & ~(MASK)))
96
/* global switches, initialised from the compile-time defaults */
97 static int write_stat = DEF_WRITESTAT;	/* non-zero: init_stat_file() creates a per-thread stat file */
98 static int repeatable = DEF_RAND_REPEAT;	/* non-zero: random jobs use a fixed seed instead of /dev/random */
99 static int rate_quit = 1;	/* non-zero: a missed minimum rate terminates every job */
100
101 static int thread_number;	/* number of job slots handed out so far */
102 static char *ini_file;
103
104 static int shm_id;
105
/* data direction of a job (thread_data.ddir) */
106 enum {
107         DDIR_READ = 0,
108         DDIR_WRITE,
109 };
110
111 /*
112  * thread life cycle
113  */
114 enum {
115         TD_NOT_CREATED = 0,
116         TD_CREATED,
117         TD_STARTED,
118         TD_EXITED,	/* stored into runstate by thread_main() on its way out */
119         TD_REAPED,
120 };
121
/*
 * Per-job state. Each job's instance is reached by thread_main() through
 * the shared memory segment it attaches with shmat().
 */
122 struct thread_data {
123         char file_name[256];	/* path opened by thread_main() */
124         int thread_number;	/* 1-based job number assigned by get_new_job() */
125         int error;	/* errno-style code recorded when the job fails */
126         int fd;	/* data file descriptor, -1 while not open */
127         int stat_fd;	/* stat file descriptor, -1 when there is none */
128         pid_t pid;	/* filled in from getpid() by thread_main() */
129         char *buf;	/* ALIGN()ed I/O buffer */
130         volatile int terminate;	/* set by sig_handler() to stop the I/O loops */
131         volatile int runstate;	/* one of the TD_* life cycle values */
132         unsigned int ddir;	/* DDIR_READ or DDIR_WRITE */
133         unsigned int ioprio;	/* (class << IOPRIO_CLASS_SHIFT) | prio, see add_job() */
134         unsigned int sequential;	/* non-zero: consecutive blocks, zero: random blocks */
135         unsigned int bs;	/* block size in bytes */
136         unsigned int odirect;	/* non-zero: open the file with O_DIRECT */
137         unsigned int delay_sleep;	/* usec slept before each I/O */
138         unsigned int fsync_blocks;	/* buffered writes: fsync every this many blocks */
139         unsigned int start_delay;	/* cleared by sig_handler(); consumer not visible here */
140         unsigned int timeout;	/* seconds, see runtime_exceeded() */
141         unsigned int use_aio;	/* non-zero: run do_async_io() instead of do_sync_io() */
142         cpu_set_t cpumask;	/* handed to sched_setaffinity() */
143
144         io_context_t *aio_ctx;
145         struct iocb *aio_iocbs;	/* aio_depth control blocks */
146         unsigned int aio_depth;	/* number of iocb slots */
147         unsigned int aio_cur_depth;	/* iocbs currently in flight */
148         struct io_event *aio_events;
149         char *aio_iocbs_status;	/* per-slot flag: 0 = free, 1 = in use */
150
151         unsigned int rate;	/* target rate; setup_rate() treats it as KiB/sec */
152         unsigned int ratemin;	/* minimum rate enforced by check_min_rate() */
153         unsigned int ratecycle;	/* msec between minimum rate checks */
154         unsigned long rate_usec_cycle;	/* usec budget per block at the target rate */
155         long rate_pending_usleep;	/* accumulated sleep debt in usec */
156         unsigned long rate_blocks;	/* io_blocks at the previous rate sample */
157         struct timeval lastrate;	/* time of the previous rate sample */
158
159         unsigned long max_latency;      /* msec */
160         unsigned long min_latency;      /* msec */
161         unsigned long runtime;          /* msec: thread_main() stores mtime_since() here */
162         unsigned long blocks;	/* number of blocks the job may touch */
163         unsigned long io_blocks;	/* blocks completed so far */
164         unsigned long last_block;	/* next block for sequential jobs */
165         sem_t mutex;	/* thread_main() waits on this before starting I/O */
166         struct drand48_data random_state;
167
168         /*
169          * bandwidth stat
170          */
171         unsigned long stat_time;	/* total msec accounted by add_stat_sample() */
172         unsigned long stat_time_last;	/* msec since the last sample was written */
173         unsigned long stat_blocks_last;	/* blocks since the last sample was written */
174
175         struct timeval start;	/* taken just before the I/O loop starts */
176 };
177
178 static struct thread_data *threads;	/* job table, indexed by thread_number - 1 */
179 static struct thread_data def_thread;	/* defaults that get_new_job() copies into each new job */
180
181 static sem_t startup_sem;	/* posted by thread_main() once its setup has finished */
182
183 static void sig_handler(int sig)
184 {
185         int i;
186
187         for (i = 0; i < thread_number; i++) {
188                 struct thread_data *td = &threads[i];
189
190                 td->terminate = 1;
191                 td->start_delay = 0;
192         }
193 }
194
195 static int init_random_state(struct thread_data *td)
196 {
197         unsigned long seed = 123;
198
199         if (td->sequential)
200                 return 0;
201
202         if (!repeatable) {
203                 int fd = open("/dev/random", O_RDONLY);
204
205                 if (fd == -1) {
206                         td->error = errno;
207                         return 1;
208                 }
209
210                 if (read(fd, &seed, sizeof(seed)) < (int) sizeof(seed)) {
211                         td->error = EIO;
212                         close(fd);
213                         return 1;
214                 }
215
216                 close(fd);
217         }
218
219         srand48_r(seed, &td->random_state);
220         return 0;
221 }
222
223 static void shutdown_stat_file(struct thread_data *td)
224 {
225         if (td->stat_fd != -1) {
226                 fsync(td->stat_fd);
227                 close(td->stat_fd);
228         }
229 }
230
231 static int init_stat_file(struct thread_data *td)
232 {
233         char n[256];
234
235         if (!write_stat)
236                 return 0;
237
238         sprintf(n, "fio_thread%d.stat", td->thread_number);
239         td->stat_fd = open(n, O_WRONLY | O_CREAT | O_TRUNC, 0644);
240         if (td->stat_fd == -1) {
241                 perror("open stat file");
242                 td->error = errno;
243                 return 1;
244         }
245
246         return 0;
247 }
248
/*
 * Microseconds elapsed between two timestamps (s = start, e = end).
 */
static unsigned long utime_since(struct timeval *s, struct timeval *e)
{
	double whole = e->tv_sec - s->tv_sec;
	double frac = e->tv_usec - s->tv_usec;

	/* borrow a second when the microsecond part went negative */
	if (frac < 0 && whole > 0) {
		whole -= 1;
		frac += 1000000;
	}

	whole *= (double) 1000000;

	return whole + frac;
}
264
/*
 * Milliseconds elapsed between two timestamps (s = start, e = end);
 * the fractional millisecond is dropped by the integer conversion.
 */
static unsigned long mtime_since(struct timeval *s, struct timeval *e)
{
	double whole = e->tv_sec - s->tv_sec;
	double frac = e->tv_usec - s->tv_usec;

	/* borrow a second when the microsecond part went negative */
	if (frac < 0 && whole > 0) {
		whole -= 1;
		frac += 1000000;
	}

	whole *= (double) 1000;
	frac /= (double) 1000;

	return whole + frac;
}
281
/*
 * Convert a timestamp into a millisecond count (seconds * 1000 plus
 * whole milliseconds from the microsecond field).
 */
static inline unsigned long msec_now(struct timeval *s)
{
	unsigned long ms = s->tv_sec * 1000;

	ms += s->tv_usec / 1000;
	return ms;
}
286
287 static unsigned long get_next_offset(struct thread_data *td)
288 {
289         unsigned long b;
290         long r;
291
292         if (!td->sequential) {
293                 lrand48_r(&td->random_state, &r);
294                 b = (1+(double) (td->blocks-1) * r / (RAND_MAX+1.0));
295         } else {
296                 b = td->last_block;
297                 td->last_block++;
298         }
299
300         return b * td->bs;
301 }
302
303 static void add_stat_sample(struct thread_data *td, unsigned long msec)
304 {
305         char sample[256];
306
307         if (!td->stat_fd)
308                 return;
309
310 #if 0
311         sprintf(sample, "%lu, %lu\n", td->io_blocks, msec);
312         write(td->stat_fd, sample, strlen(sample));
313 #else
314         td->stat_time += msec;
315         td->stat_time_last += msec;
316         td->stat_blocks_last++;
317
318         if (td->stat_time_last >= 500) {
319                 unsigned long rate = td->stat_blocks_last * td->bs / (td->stat_time_last);
320
321                 td->stat_time_last = 0;
322                 td->stat_blocks_last = 0;
323                 sprintf(sample, "%lu, %lu\n", td->stat_time, rate);
324                 //sprintf(sample, "%lu, %lu\n", td->io_blocks, msec);
325                 write(td->stat_fd, sample, strlen(sample));
326         }
327 #endif
328 }
329
/*
 * Sleep for usec microseconds, resuming after signal interruptions.
 *
 * The interval is split into whole seconds and nanoseconds: nanosleep()
 * rejects tv_nsec values of one second or more, so putting everything in
 * tv_nsec would fail for usec >= 1000000 (and usec * 1000 overflows int
 * a little above two seconds). Zero or negative values return at once.
 */
static void usec_sleep(int usec)
{
	struct timespec req, rem;

	if (usec <= 0)
		return;

	req.tv_sec = usec / 1000000;
	req.tv_nsec = (long) (usec % 1000000) * 1000;

	/* on EINTR rem holds the unslept time, seconds included */
	while (nanosleep(&req, &rem) == -1 && errno == EINTR)
		req = rem;
}
344
345 static void rate_throttle(struct thread_data *td, unsigned long time_spent)
346 {
347         if (!td->rate)
348                 return;
349
350         if (time_spent < td->rate_usec_cycle) {
351                 unsigned long s = td->rate_usec_cycle - time_spent;
352
353                 td->rate_pending_usleep += s;
354                 if (td->rate_pending_usleep >= 100000) {
355                         usec_sleep(td->rate_pending_usleep);
356                         td->rate_pending_usleep = 0;
357                 }
358         } else {
359                 long overtime = time_spent - td->rate_usec_cycle;
360
361                 td->rate_pending_usleep -= overtime;
362         }
363 }
364
365 static int check_min_rate(struct thread_data *td, struct timeval *now)
366 {
367         unsigned long spent = mtime_since(&td->start, now);
368         unsigned long rate;
369
370         /*
371          * allow a 2 second settle period in the beginning
372          */
373         if (spent < 2000)
374                 return 0;
375
376         /*
377          * if rate blocks is set, sample is running
378          */
379         if (td->rate_blocks) {
380                 spent = mtime_since(&td->lastrate, now);
381                 if (spent < td->ratecycle)
382                         return 0;
383
384                 rate = ((td->io_blocks - td->rate_blocks) * td->bs) / spent;
385                 if (rate < td->ratemin) {
386                         printf("Client%d: min rate %d not met, got %ldKiB/sec\n", td->thread_number, td->ratemin, rate);
387                         if (rate_quit)
388                                 sig_handler(0);
389                         return 1;
390                 }
391         }
392
393         td->rate_blocks = td->io_blocks;
394         memcpy(&td->lastrate, now, sizeof(*now));
395         return 0;
396 }
397
398 static inline int runtime_exceeded(struct thread_data *td, struct timeval *t)
399 {
400         if (mtime_since(&td->start, t) >= td->timeout * 1000)
401                 return 1;
402
403         return 0;
404 }
405
406 #define should_fsync(td)        ((td)->ddir == DDIR_WRITE && !(td)->odirect)
407
408 static void do_sync_io(struct thread_data *td)
409 {
410         struct timeval s, e;
411         unsigned long blocks, msec, usec;
412
413         for (blocks = 0; blocks < td->blocks; blocks++) {
414                 off_t offset = get_next_offset(td);
415                 int ret;
416
417                 if (td->terminate)
418                         break;
419
420                 if (lseek(td->fd, offset, SEEK_SET) == -1) {
421                         td->error = errno;
422                         break;
423                 }
424
425                 if (td->delay_sleep)
426                         usec_sleep(td->delay_sleep);
427
428                 gettimeofday(&s, NULL);
429
430                 if (td->ddir == DDIR_READ)
431                         ret = read(td->fd, td->buf, td->bs);
432                 else
433                         ret = write(td->fd, td->buf, td->bs);
434
435                 if (ret < (int) td->bs) {
436                         if (ret == -1)
437                                 td->error = errno;
438                         break;
439                 }
440
441                 td->io_blocks++;
442
443                 if (should_fsync(td) && td->fsync_blocks &&
444                     (td->io_blocks % td->fsync_blocks) == 0)
445                         fsync(td->fd);
446
447                 gettimeofday(&e, NULL);
448
449                 usec = utime_since(&s, &e);
450
451                 rate_throttle(td, usec);
452
453                 if (check_min_rate(td, &e)) {
454                         td->error = ENODATA;
455                         break;
456                 }
457
458                 msec = usec / 1000;
459                 add_stat_sample(td, msec);
460
461                 if (msec < td->min_latency)
462                         td->min_latency = msec;
463                 if (msec > td->max_latency)
464                         td->max_latency = msec;
465
466                 if (runtime_exceeded(td, &e))
467                         break;
468         }
469
470         if (should_fsync(td))
471                 fsync(td->fd);
472 }
473
474 static void aio_put_iocb(struct thread_data *td, struct iocb *iocb)
475 {
476         long offset = ((long) iocb - (long) td->aio_iocbs)/ sizeof(struct iocb);
477
478         td->aio_iocbs_status[offset] = 0;
479         td->aio_cur_depth--;
480 }
481
482 static struct iocb *aio_get_iocb(struct thread_data *td, struct timeval *t)
483 {
484         struct iocb *iocb = NULL;
485         unsigned int i;
486
487         for (i = 0; i < td->aio_depth; i++) {
488                 if (td->aio_iocbs_status[i] == 0) {
489                         td->aio_iocbs_status[i] = 1;
490                         iocb = &td->aio_iocbs[i];
491                         break;
492                 }
493         }
494
495         if (iocb) {
496                 off_t off = get_next_offset(td);
497                 char *p = td->buf + i * td->bs;
498
499                 if (td->ddir == DDIR_READ)
500                         io_prep_pread(iocb, td->fd, p, td->bs, off);
501                 else
502                         io_prep_pwrite(iocb, td->fd, p, td->bs, off);
503
504                 io_set_callback(iocb, (io_callback_t) msec_now(t));
505         }
506
507         return iocb;
508 }
509
510 static int aio_submit(struct thread_data *td, struct iocb *iocb)
511 {
512         int ret;
513
514         do {
515                 ret = io_submit(*td->aio_ctx, 1, &iocb);
516                 if (ret == 1)
517                         return 0;
518
519                 if (errno == EINTR)
520                         continue;
521                 else if (errno == EAGAIN)
522                         usleep(100);
523                 else
524                         break;
525         } while (1);
526
527         return 1;
528 }
529
530 #define iocb_time(iocb) ((unsigned long) (iocb)->data)
531
532 static void do_async_io(struct thread_data *td)
533 {
534         struct timeval s, e;
535         unsigned long blocks, msec, usec;
536
537         for (blocks = 0; blocks < td->blocks; blocks++) {
538                 struct timespec ts = { .tv_sec = 0, .tv_nsec = 0};
539                 struct timespec *timeout;
540                 int ret, i, min_evts = 0;
541                 struct iocb *iocb;
542
543                 if (td->terminate)
544                         break;
545
546                 if (td->delay_sleep)
547                         usec_sleep(td->delay_sleep);
548
549                 gettimeofday(&s, NULL);
550
551                 iocb = aio_get_iocb(td, &s);
552
553                 ret = aio_submit(td, iocb);
554                 if (ret) {
555                         td->error = errno;
556                         break;
557                 }
558
559                 td->aio_cur_depth++;
560
561                 if (td->aio_cur_depth < td->aio_depth) {
562                         timeout = &ts;
563                         min_evts = 0;
564                 } else {
565                         timeout = NULL;
566                         min_evts = 1;
567                 }
568
569                 ret = io_getevents(*td->aio_ctx, min_evts, td->aio_cur_depth, td->aio_events, timeout);
570                 if (ret < 0) {
571                         td->error = errno;
572                         break;
573                 } else if (!ret)
574                         continue;
575
576                 gettimeofday(&e, NULL);
577
578                 for (i = 0; i < ret; i++) {
579                         struct io_event *ev = td->aio_events + i;
580
581                         td->io_blocks++;
582
583                         iocb = ev->obj;
584
585                         msec = msec_now(&e) - iocb_time(iocb);
586                         add_stat_sample(td, msec);
587
588                         if (msec < td->min_latency)
589                                 td->min_latency = msec;
590                         if (msec > td->max_latency)
591                                 td->max_latency = msec;
592
593                         aio_put_iocb(td, iocb);
594                 }
595
596                 /*
597                  * the rate is batched for now, it should work for batches
598                  * of completions except the very first one which may look
599                  * a little bursty
600                  */
601                 usec = utime_since(&s, &e);
602
603                 rate_throttle(td, usec);
604
605                 if (check_min_rate(td, &e)) {
606                         td->error = ENODATA;
607                         break;
608                 }
609
610                 if (runtime_exceeded(td, &e))
611                         break;
612         }
613 }
614
615 static void cleanup_pending_aio(struct thread_data *td)
616 {
617         struct timespec ts = { .tv_sec = 0, .tv_nsec = 0};
618         unsigned int i;
619         int r;
620
621         /*
622          * get immediately available events, if any
623          */
624         r = io_getevents(*td->aio_ctx, 0, td->aio_cur_depth, td->aio_events, &ts);
625         if (r > 0) {
626                 for (i = 0; i < r; i++)
627                         aio_put_iocb(td, &td->aio_iocbs[i]);
628         }
629
630         /*
631          * now cancel remaining active events
632          */
633         for (i = 0; i < td->aio_depth; i++) {
634                 if (td->aio_iocbs_status[i] == 0)
635                         continue;
636
637                 r = io_cancel(*td->aio_ctx, &td->aio_iocbs[i], td->aio_events);
638                 if (!r)
639                         aio_put_iocb(td, &td->aio_iocbs[i]);
640         }
641
642         if (td->aio_cur_depth)
643                 io_getevents(*td->aio_ctx, td->aio_cur_depth, td->aio_cur_depth, td->aio_events, NULL);
644 }
645
646 static void cleanup_aio(struct thread_data *td)
647 {
648         if (td->aio_cur_depth)
649                 cleanup_pending_aio(td);
650
651         if (td->aio_ctx) {
652                 io_destroy(*td->aio_ctx);
653                 free(td->aio_ctx);
654         }
655         if (td->aio_iocbs)
656                 free(td->aio_iocbs);
657         if (td->aio_events)
658                 free(td->aio_events);
659         if (td->aio_iocbs_status)
660                 free(td->aio_iocbs_status);
661 }
662
663 static int init_aio(struct thread_data *td)
664 {
665         td->aio_ctx = malloc(sizeof(*td->aio_ctx));
666
667         if (io_queue_init(td->aio_depth, td->aio_ctx)) {
668                 td->error = errno;
669                 return 1;
670         }
671
672         td->aio_iocbs = malloc(td->aio_depth * sizeof(struct iocb));
673         td->aio_events = malloc(td->aio_depth * sizeof(struct io_event));
674         td->aio_iocbs_status = malloc(td->aio_depth * sizeof(char));
675         return 0;
676 }
677
678 static void *thread_main(int shm_id, int offset, char *argv[])
679 {
680         struct thread_data *td;
681         struct timeval end;
682         void *data, *ptr = NULL;
683         struct stat st;
684         int ret = 1, flags;
685
686         setsid();
687
688         data = shmat(shm_id, NULL, 0);
689         td = data + offset * sizeof(struct thread_data);
690         td->pid = getpid();
691
692         td->fd = -1;
693
694         if (sched_setaffinity(td->pid, sizeof(td->cpumask), &td->cpumask) == -1) {
695                 td->error = errno;
696                 goto err;
697         }
698
699         printf("Thread (%s) (pid=%u) (f=%s) (aio=%d) started\n", td->ddir == DDIR_READ ? "read" : "write", td->pid, td->file_name, td->aio_depth);
700         fflush(stdout);
701
702         sprintf(argv[0], "fio%d", offset);
703
704         flags = 0;
705         if (td->odirect)
706                 flags |= O_DIRECT;
707
708         if (td->ddir == DDIR_READ)
709                 td->fd = open(td->file_name, flags | O_RDONLY);
710         else
711                 td->fd = open(td->file_name, flags | O_WRONLY | O_CREAT | O_TRUNC, 0644);
712
713         if (td->fd == -1) {
714                 td->error = errno;
715                 goto err;
716         }
717
718         if (td->use_aio && init_aio(td))
719                 goto err;
720
721         if (init_random_state(td))
722                 goto err;
723         if (init_stat_file(td))
724                 goto err;
725
726         if (td->ddir == DDIR_READ) {
727                 if (fstat(td->fd, &st) == -1) {
728                         td->error = errno;
729                         goto err;
730                 }
731
732                 td->blocks = st.st_size / td->bs;
733                 if (!td->blocks) {
734                         td->error = EINVAL;
735                         goto err;
736                 }
737         } else
738                 td->blocks = 1024 * 1024 * 1024 / td->bs;
739
740         if (td->ioprio) {
741                 if (ioprio_set(IOPRIO_WHO_PROCESS, 0, td->ioprio) == -1) {
742                         td->error = errno;
743                         goto err;
744                 }
745         }
746
747         sem_post(&startup_sem);
748         sem_wait(&td->mutex);
749
750         gettimeofday(&td->start, NULL);
751
752         if (td->ratemin)
753                 memcpy(&td->lastrate, &td->start, sizeof(td->start));
754
755         if (!td->use_aio) {
756                 ptr = malloc(td->bs + MASK);
757                 td->buf = ALIGN(ptr);
758                 do_sync_io(td);
759         } else {
760                 ptr = malloc(td->bs * td->aio_depth + MASK);
761                 td->buf = ALIGN(ptr);
762                 do_async_io(td);
763         }
764
765         gettimeofday(&end, NULL);
766         td->runtime = mtime_since(&td->start, &end);
767
768         ret = 0;
769
770 err:
771         shutdown_stat_file(td);
772         if (td->use_aio)
773                 cleanup_aio(td);
774         if (td->fd != -1) {
775                 close(td->fd);
776                 td->fd = -1;
777         }
778         if (ret) {
779                 sem_post(&startup_sem);
780                 sem_wait(&td->mutex);
781         }
782         if (ptr)
783                 free(ptr);
784         td->runstate = TD_EXITED;
785         shmdt(data);
786         return NULL;
787 }
788
789 static void free_shm(void)
790 {
791         shmdt(threads);
792 }
793
794 static void show_thread_status(struct thread_data *td)
795 {
796         int prio, prio_class;
797         unsigned long bw = 0;
798
799         if (!td->io_blocks && !td->error)
800                 return;
801
802         if (td->runtime)
803                 bw = (td->io_blocks * td->bs) / td->runtime;
804
805         prio = td->ioprio & 0xff;
806         prio_class = td->ioprio >> IOPRIO_CLASS_SHIFT;
807
808         printf("thread%d (%s): err=%2d, prio=%1d/%1d maxl=%5lumsec, io=%6luMiB, bw=%6luKiB/sec\n", td->thread_number, td->ddir == DDIR_READ ? " read": "write", td->error, prio_class, prio, td->max_latency, td->io_blocks * td->bs >> 20, bw);
809 }
810
811 static int setup_rate(struct thread_data *td)
812 {
813         int nr_reads_per_sec;
814
815         if (!td->rate)
816                 return 0;
817
818         if (td->rate < td->ratemin) {
819                 fprintf(stderr, "min rate larger than nominal rate\n");
820                 return -1;
821         }
822
823         nr_reads_per_sec = td->rate * 1024 / td->bs;
824         td->rate_usec_cycle = 1000000 / nr_reads_per_sec;
825         td->rate_pending_usleep = 0;
826         return 0;
827 }
828
829 static struct thread_data *get_new_job(int global)
830 {
831         struct thread_data *td;
832
833         if (global)
834                 return &def_thread;
835         if (thread_number >= MAX_JOBS)
836                 return NULL;
837
838         td = &threads[thread_number++];
839         memset(td, 0, sizeof(*td));
840
841         td->thread_number = thread_number;
842         td->ddir = def_thread.ddir;
843         td->bs = def_thread.bs;
844         td->odirect = def_thread.odirect;
845         td->ratecycle = def_thread.ratecycle;
846         td->sequential = def_thread.sequential;
847         td->timeout = def_thread.timeout;
848         memcpy(&td->cpumask, &def_thread.cpumask, sizeof(td->cpumask));
849
850         return td;
851 }
852
853 static void put_job(struct thread_data *td)
854 {
855         memset(&threads[td->thread_number - 1], 0, sizeof(*td));
856         thread_number--;
857 }
858
859 static int add_job(struct thread_data *td, const char *filename, int prioclass,
860                    int prio)
861 {
862         if (td == &def_thread)
863                 return 0;
864
865         strcpy(td->file_name, filename);
866         td->stat_fd = -1;
867         sem_init(&td->mutex, 1, 0);
868         td->min_latency = 10000000;
869         td->ioprio = (prioclass << IOPRIO_CLASS_SHIFT) | prio;
870
871         if (td->use_aio && !td->aio_depth)
872                 td->aio_depth = 1;
873
874         if (setup_rate(td))
875                 return -1;
876
877         printf("Client%d: file=%s, rw=%d, prio=%d, seq=%d, odir=%d, bs=%d, rate=%d, aio=%d, aio_depth=%d\n", td->thread_number, filename, td->ddir, td->ioprio, td->sequential, td->odirect, td->bs, td->rate, td->use_aio, td->aio_depth);
878         return 0;
879 }
880
/*
 * Rebuild a CPU set from the bits of cpu: bit N set selects CPU N.
 * Works on the caller's set through a pointer.
 */
static void fill_cpu_mask_ptr(cpu_set_t *cpumask, int cpu)
{
	const unsigned int bits = cpu;
	unsigned int i;

	CPU_ZERO(cpumask);

	for (i = 0; i < sizeof(bits) * 8; i++) {
		/* unsigned shift: 1 << 31 on a signed int is undefined */
		if ((1U << i) & bits)
			CPU_SET(i, cpumask);
	}
}

/*
 * fill_cpu_mask(mask, cpu) historically took the set by value, so the
 * caller's mask was never modified. Existing call sites pass the set
 * itself (an lvalue); this wrapper keeps them compiling unchanged while
 * routing them to the pointer-based implementation above.
 */
#define fill_cpu_mask(cpumask, cpu)	fill_cpu_mask_ptr(&(cpumask), (cpu))
892
/*
 * Copy one option value from input to output, stopping at the first
 * ',' or '}' or at the end of the string, and NUL-terminate the copy.
 * The caller must provide an output buffer large enough for the value.
 */
static void fill_option(const char *input, char *output)
{
	const char *src = input;
	char *dst = output;

	for (; *src != '\0' && *src != ',' && *src != '}'; src++)
		*dst++ = *src;

	*dst = '\0';
}
905
906 /*
907  * job key words:
908  *
909  * file=
910  * bs=
911  * rw=
912  * direct=
913  */
914 static void parse_jobs_cmd(int argc, char *argv[], int index)
915 {
916         struct thread_data *td;
917         unsigned int prio, prioclass, cpu;
918         char *string, *filename, *p, *c;
919         int i;
920
921         string = malloc(256);
922         filename = malloc(256);
923
924         for (i = index; i < argc; i++) {
925                 p = argv[i];
926
927                 c = strpbrk(p, "{");
928                 if (!c)
929                         break;
930
931                 filename[0] = 0;
932
933                 td = get_new_job(0);
934                 if (!td)
935                         break;
936
937                 prioclass = 2;
938                 prio = 4;
939
940                 c = strstr(p, "rw=");
941                 if (c) {
942                         c += 3;
943                         if (*c == '0')
944                                 td->ddir = DDIR_READ;
945                         else
946                                 td->ddir = DDIR_WRITE;
947                 }
948
949                 c = strstr(p, "prio=");
950                 if (c) {
951                         c += 5;
952                         prio = *c - '0';
953                 }
954
955                 c = strstr(p, "prioclass=");
956                 if (c) {
957                         c += 10;
958                         prioclass = *c - '0';
959                 }
960
961                 c = strstr(p, "file=");
962                 if (c) {
963                         c += 5;
964                         fill_option(c, filename);
965                 }
966
967                 c = strstr(p, "bs=");
968                 if (c) {
969                         c += 3;
970                         fill_option(c, string);
971                         td->bs = strtoul(string, NULL, 10);
972                         td->bs <<= 10;
973                 }
974
975                 c = strstr(p, "direct=");
976                 if (c) {
977                         c += 7;
978                         if (*c != '0')
979                                 td->odirect = 1;
980                         else
981                                 td->odirect = 0;
982                 }
983
984                 c = strstr(p, "delay=");
985                 if (c) {
986                         c += 6;
987                         fill_option(c, string);
988                         td->delay_sleep = strtoul(string, NULL, 10);
989                 }
990
991                 c = strstr(p, "rate=");
992                 if (c) {
993                         c += 5;
994                         fill_option(c, string);
995                         td->rate = strtoul(string, NULL, 10);
996                 }
997
998                 c = strstr(p, "ratemin=");
999                 if (c) {
1000                         c += 8;
1001                         fill_option(c, string);
1002                         td->ratemin = strtoul(string, NULL, 10);
1003                 }
1004
1005                 c = strstr(p, "ratecycle=");
1006                 if (c) {
1007                         c += 10;
1008                         fill_option(c, string);
1009                         td->ratecycle = strtoul(string, NULL, 10);
1010                 }
1011
1012                 c = strstr(p, "cpumask=");
1013                 if (c) {
1014                         c += 8;
1015                         fill_option(c, string);
1016                         cpu = strtoul(string, NULL, 10);
1017                         fill_cpu_mask(td->cpumask, cpu);
1018                 }
1019
1020                 c = strstr(p, "fsync=");
1021                 if (c) {
1022                         c += 6;
1023                         fill_option(c, string);
1024                         td->fsync_blocks = strtoul(string, NULL, 10);
1025                 }
1026
1027                 c = strstr(p, "startdelay=");
1028                 if (c) {
1029                         c += 11;
1030                         fill_option(c, string);
1031                         td->start_delay = strtoul(string, NULL, 10);
1032                 }
1033
1034                 c = strstr(p, "timeout=");
1035                 if (c) {
1036                         c += 8;
1037                         fill_option(c, string);
1038                         td->timeout = strtoul(string, NULL, 10);
1039                 }
1040
1041                 c = strstr(p, "aio_depth=");
1042                 if (c) {
1043                         c += 10;
1044                         fill_option(c, string);
1045                         td->aio_depth = strtoul(string, NULL, 10);
1046                 }
1047
1048                 c = strstr(p, "aio");
1049                 if (c)
1050                         td->use_aio = 1;
1051
1052                 c = strstr(p, "random");
1053                 if (c)
1054                         td->sequential = 0;
1055                 c = strstr(p, "sequential");
1056                 if (c)
1057                         td->sequential = 1;
1058
1059                 if (add_job(td, filename, prioclass, prio))
1060                         put_job(td);
1061         }
1062
1063         free(string);
1064         free(filename);
1065 }
1066
/*
 * Parse a "name=value" line and store the unsigned decimal value in *val.
 * Whitespace on either side of the '=' is accepted.
 *
 * p:    line to parse; must begin with the option name
 * name: option name to look for
 * val:  receives the parsed number, written only on success
 *
 * Returns 0 when the line matched and a value was stored, 1 otherwise
 * (including when name is too long to build a scan format from).
 */
static int check_int(const char *p, const char *name, unsigned int *val)
{
	char fmt[128];
	int len;

	/*
	 * A blank in a scanf format matches any amount of whitespace,
	 * including none, so this single pattern covers both "name=5"
	 * and "name = 5". %u matches the unsigned destination.
	 */
	len = snprintf(fmt, sizeof(fmt), "%s = %%u", name);
	if (len < 0 || (size_t) len >= sizeof(fmt))
		return 1;

	return sscanf(p, fmt, val) == 1 ? 0 : 1;
}
1081
/*
 * Decide whether a line carries no option text.
 *
 * Returns 1 if the line consists only of whitespace/control characters,
 * or if a ';' is reached before any other character. Returns 0 as soon
 * as any other character is found.
 */
static int is_empty_or_comment(const char *line)
{
	const unsigned char *c;

	/*
	 * Walk to the terminating NUL instead of calling strlen() on every
	 * iteration. The <ctype.h> classifiers need an unsigned char value;
	 * passing a negative plain char is undefined.
	 */
	for (c = (const unsigned char *) line; *c; c++) {
		if (*c == ';')
			return 1;
		if (!isspace(*c) && !iscntrl(*c))
			return 0;
	}

	return 1;
}
1095
1096 static int parse_jobs_ini(char *file)
1097 {
1098         unsigned int prioclass, prio, cpu, global;
1099         struct thread_data *td;
1100         char *string, *name;
1101         fpos_t off;
1102         FILE *f;
1103         char *p;
1104
1105         f = fopen(file, "r");
1106         if (!f) {
1107                 perror("fopen");
1108                 return 1;
1109         }
1110
1111         string = malloc(4096);
1112         name = malloc(256);
1113
1114         while ((p = fgets(string, 4096, f)) != NULL) {
1115                 if (is_empty_or_comment(p))
1116                         continue;
1117                 if (sscanf(p, "[%s]", name) != 1)
1118                         continue;
1119
1120                 global = !strncmp(name, "global", 6);
1121
1122                 name[strlen(name) - 1] = '\0';
1123
1124                 td = get_new_job(global);
1125                 if (!td)
1126                         break;
1127
1128                 prioclass = 2;
1129                 prio = 4;
1130
1131                 fgetpos(f, &off);
1132                 while ((p = fgets(string, 4096, f)) != NULL) {
1133                         if (is_empty_or_comment(p))
1134                                 continue;
1135                         if (strstr(p, "["))
1136                                 break;
1137                         if (!check_int(p, "bs", &td->bs)) {
1138                                 td->bs <<= 10;
1139                                 fgetpos(f, &off);
1140                                 continue;
1141                         }
1142                         if (!check_int(p, "rw", &td->ddir)) {
1143                                 fgetpos(f, &off);
1144                                 continue;
1145                         }
1146                         if (!check_int(p, "prio", &prio)) {
1147                                 fgetpos(f, &off);
1148                                 continue;
1149                         }
1150                         if (!check_int(p, "prioclass", &prioclass)) {
1151                                 fgetpos(f, &off);
1152                                 continue;
1153                         }
1154                         if (!check_int(p, "direct", &td->odirect)) {
1155                                 fgetpos(f, &off);
1156                                 continue;
1157                         }
1158                         if (!check_int(p, "rate", &td->rate)) {
1159                                 fgetpos(f, &off);
1160                                 continue;
1161                         }
1162                         if (!check_int(p, "ratemin", &td->ratemin)) {
1163                                 fgetpos(f, &off);
1164                                 continue;
1165                         }
1166                         if (!check_int(p, "ratecycle", &td->ratecycle)) {
1167                                 fgetpos(f, &off);
1168                                 continue;
1169                         }
1170                         if (!check_int(p, "delay", &td->delay_sleep)) {
1171                                 fgetpos(f, &off);
1172                                 continue;
1173                         }
1174                         if (!check_int(p, "cpumask", &cpu)) {
1175                                 fill_cpu_mask(td->cpumask, cpu);
1176                                 fgetpos(f, &off);
1177                                 continue;
1178                         }
1179                         if (!check_int(p, "fsync", &td->fsync_blocks)) {
1180                                 fgetpos(f, &off);
1181                                 continue;
1182                         }
1183                         if (!check_int(p, "startdelay", &td->start_delay)) {
1184                                 fgetpos(f, &off);
1185                                 continue;
1186                         }
1187                         if (!check_int(p, "timeout", &td->timeout)) {
1188                                 fgetpos(f, &off);
1189                                 continue;
1190                         }
1191                         if (!check_int(p, "aio_depth", &td->aio_depth)) {
1192                                 fgetpos(f, &off);
1193                                 continue;
1194                         }
1195                         if (!strncmp(p, "sequential", 10)) {
1196                                 td->sequential = 1;
1197                                 fgetpos(f, &off);
1198                                 continue;
1199                         }
1200                         if (!strncmp(p, "random", 6)) {
1201                                 td->sequential = 0;
1202                                 fgetpos(f, &off);
1203                                 continue;
1204                         }
1205                         if (!strncmp(p, "aio", 3)) {
1206                                 td->use_aio = 1;
1207                                 fgetpos(f, &off);
1208                                 continue;
1209                         }
1210
1211                         printf("Client%d: bad option %s\n",td->thread_number,p);
1212                 }
1213                 fsetpos(f, &off);
1214
1215                 if (add_job(td, name, prioclass, prio))
1216                         put_job(td);
1217         }
1218
1219         free(string);
1220         free(name);
1221         fclose(f);
1222         return 0;
1223 }
1224
1225 static int parse_options(int argc, char *argv[])
1226 {
1227         int i;
1228
1229         for (i = 1; i < argc; i++) {
1230                 char *parm = argv[i];
1231
1232                 if (parm[0] != '-')
1233                         break;
1234
1235                 parm++;
1236                 switch (*parm) {
1237                         case 's':
1238                                 parm++;
1239                                 def_thread.sequential = !!atoi(parm);
1240                                 break;
1241                         case 'b':
1242                                 parm++;
1243                                 def_thread.bs = atoi(parm);
1244                                 def_thread.bs <<= 10;
1245                                 if (!def_thread.bs) {
1246                                         printf("bad block size\n");
1247                                         def_thread.bs = DEF_BS;
1248                                 }
1249                                 break;
1250                         case 't':
1251                                 parm++;
1252                                 def_thread.timeout = atoi(parm);
1253                                 break;
1254                         case 'w':
1255                                 parm++;
1256                                 write_stat = !!atoi(parm);
1257                                 break;
1258                         case 'r':
1259                                 parm++;
1260                                 repeatable = !!atoi(parm);
1261                                 break;
1262                         case 'R':
1263                                 parm++;
1264                                 rate_quit = !!atoi(parm);
1265                                 break;
1266                         case 'o':
1267                                 parm++;
1268                                 def_thread.odirect = !!atoi(parm);
1269                                 break;
1270                         case 'f':
1271                                 if (i + 1 >= argc) {
1272                                         printf("-f needs file as arg\n");
1273                                         break;
1274                                 }
1275                                 ini_file = strdup(argv[i+1]);
1276                                 break;
1277                         default:
1278                                 printf("bad option %s\n", argv[i]);
1279                                 break;
1280                 }
1281         }
1282
1283         return i;
1284 }
1285
1286 static void reap_threads(int *nr_running, int *t_rate, int *m_rate)
1287 {
1288         int i;
1289
1290         for (i = 0; i < thread_number; i++) {
1291                 struct thread_data *td = &threads[i];
1292
1293                 if (td->runstate != TD_EXITED)
1294                         continue;
1295
1296                 td->runstate = TD_REAPED;
1297                 waitpid(td->pid, NULL, 0);
1298                 (*nr_running)--;
1299                 (*m_rate) -= td->ratemin;
1300                 (*t_rate) -= td->rate;
1301
1302                 if (td->terminate)
1303                         continue;
1304
1305                 printf("Threads now running: %d", *nr_running);
1306                 if (*m_rate || *t_rate)
1307                         printf(", rate %d/%dKiB/sec", *t_rate, *m_rate);
1308                 printf("\n");
1309         }
1310 }
1311
1312 static void run_threads(char *argv[])
1313 {
1314         struct timeval genesis, now;
1315         struct thread_data *td;
1316         unsigned long spent;
1317         int i, todo, nr_running, m_rate, t_rate;
1318
1319         gettimeofday(&genesis, NULL);
1320
1321         printf("Starting %d threads\n", thread_number);
1322         fflush(stdout);
1323
1324         signal(SIGINT, sig_handler);
1325
1326         todo = thread_number;
1327         nr_running = 0;
1328         m_rate = t_rate = 0;
1329
1330         while (todo) {
1331                 for (i = 0; i < thread_number; i++) {
1332                         td = &threads[i];
1333
1334                         if (td->runstate != TD_NOT_CREATED)
1335                                 continue;
1336
1337                         /*
1338                          * never got a chance to start, killed by other
1339                          * thread for some reason
1340                          */
1341                         if (td->terminate) {
1342                                 todo--;
1343                                 continue;
1344                         }
1345
1346                         if (td->start_delay) {
1347                                 gettimeofday(&now, NULL);
1348                                 spent = mtime_since(&genesis, &now);
1349
1350                                 if (td->start_delay * 1000 > spent)
1351                                         continue;
1352                         }
1353
1354                         td->runstate = TD_CREATED;
1355                         sem_init(&startup_sem, 1, 1);
1356                         todo--;
1357
1358                         if (fork())
1359                                 sem_wait(&startup_sem);
1360                         else {
1361                                 thread_main(shm_id, i, argv);
1362                                 exit(0);
1363                         }
1364                 }
1365
1366                 for (i = 0; i < thread_number; i++) {
1367                         struct thread_data *td = &threads[i];
1368
1369                         if (td->runstate == TD_CREATED) {
1370                                 td->runstate = TD_STARTED;
1371                                 nr_running++;
1372                                 m_rate += td->ratemin;
1373                                 t_rate += td->rate;
1374                                 sem_post(&td->mutex);
1375
1376                                 printf("Threads now running: %d", nr_running);
1377                                 if (m_rate || t_rate)
1378                                         printf(", rate %d/%dKiB/sec", t_rate, m_rate);
1379                                 printf("\n");
1380                         }
1381                 }
1382
1383                 reap_threads(&nr_running, &t_rate, &m_rate);
1384
1385                 if (todo)
1386                         usleep(100000);
1387         }
1388
1389         while (nr_running) {
1390                 reap_threads(&nr_running, &t_rate, &m_rate);
1391                 usleep(10000);
1392         }
1393 }
1394
1395 int main(int argc, char *argv[])
1396 {
1397         static unsigned long max_run[2], min_run[2], total_blocks[2];
1398         static unsigned long max_bw[2], min_bw[2], maxl[2], minl[2];
1399         static unsigned long read_mb, write_mb, read_agg, write_agg;
1400         int i;
1401
1402         shm_id = shmget(0, MAX_JOBS * sizeof(struct thread_data), IPC_CREAT | 0600);
1403         if (shm_id == -1) {
1404                 perror("shmget");
1405                 return 1;
1406         }
1407
1408         threads = shmat(shm_id, NULL, 0);
1409         if (threads == (void *) -1 ) {
1410                 perror("shmat");
1411                 return 1;
1412         }
1413
1414         atexit(free_shm);
1415
1416         if (sched_getaffinity(getpid(), sizeof(cpu_set_t), &def_thread.cpumask) == -1) {
1417                 perror("sched_getaffinity");
1418                 return 1;
1419         }
1420
1421         /*
1422          * fill globals
1423          */
1424         def_thread.ddir = DDIR_READ;
1425         def_thread.bs = DEF_BS;
1426         def_thread.odirect = 1;
1427         def_thread.ratecycle = DEF_RATE_CYCLE;
1428         def_thread.sequential = 1;
1429         def_thread.timeout = DEF_TIMEOUT;
1430
1431         i = parse_options(argc, argv);
1432
1433         if (ini_file) {
1434                 if (parse_jobs_ini(ini_file))
1435                         return 1;
1436         } else
1437                 parse_jobs_cmd(argc, argv, i);
1438
1439         if (!thread_number) {
1440                 printf("Nothing to do\n");
1441                 return 1;
1442         }
1443
1444         printf("%s: %s, bs=%uKiB, timeo=%u, write_stat=%u, odirect=%d\n", argv[0], def_thread.sequential ? "sequential" : "random", def_thread.bs >> 10, def_thread.timeout, write_stat, def_thread.odirect);
1445
1446         run_threads(argv);
1447
1448         min_bw[0] = min_run[0] = ~0UL;
1449         min_bw[1] = min_run[1] = ~0UL;
1450         minl[0] = minl[1] = ~0UL;
1451         for (i = 0; i < thread_number; i++) {
1452                 struct thread_data *td = &threads[i];
1453                 unsigned long bw = 0;
1454
1455                 if (td->error)
1456                         goto show_stat;
1457
1458                 if (td->runtime < min_run[td->ddir])
1459                         min_run[td->ddir] = td->runtime;
1460                 if (td->runtime > max_run[td->ddir])
1461                         max_run[td->ddir] = td->runtime;
1462
1463                 if (td->runtime)
1464                         bw = (td->io_blocks * td->bs) / td->runtime;
1465                 if (bw < min_bw[td->ddir])
1466                         min_bw[td->ddir] = bw;
1467                 if (bw > max_bw[td->ddir])
1468                         max_bw[td->ddir] = bw;
1469                 if (td->max_latency < minl[td->ddir])
1470                         minl[td->ddir] = td->max_latency;
1471                 if (td->max_latency > maxl[td->ddir])
1472                         maxl[td->ddir] = td->max_latency;
1473
1474                 total_blocks[td->ddir] += td->io_blocks;
1475
1476                 if (td->ddir == DDIR_READ) {
1477                         read_mb += (td->bs * td->io_blocks) >> 20;
1478                         if (td->runtime)
1479                                 read_agg += (td->io_blocks * td->bs) / td->runtime;
1480                 }
1481                 if (td->ddir == DDIR_WRITE) {
1482                         write_mb += (td->bs * td->io_blocks) >> 20;
1483                         if (td->runtime)
1484                                 write_agg += (td->io_blocks * td->bs) / td->runtime;
1485                 }
1486
1487 show_stat:
1488                 show_thread_status(td);
1489         }
1490
1491         printf("Run status:\n");
1492         if (max_run[DDIR_READ])
1493                 printf("   READ: io=%luMiB, aggrb=%lu, minl=%lu, maxl=%lu, minb=%lu, maxb=%lu, mint=%lumsec, maxt=%lumsec\n", read_mb, read_agg, minl[0], maxl[0], min_bw[0], max_bw[0], min_run[0], max_run[0]);
1494         if (max_run[DDIR_WRITE])
1495                 printf("  WRITE: io=%luMiB, aggrb=%lu, minl=%lu, maxl=%lu, minb=%lu, maxb=%lu, mint=%lumsec, maxt=%lumsec\n", write_mb, write_agg, minl[1], maxl[1], min_bw[1], max_bw[1], min_run[1], max_run[1]);
1496
1497         return 0;
1498 }