[PATCH] fio: remove old stat code
[disktools.git] / fio.c
1 /*
2  * fio - the flexible io tester
3  *
4  * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
5  *
6  *  This program is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU General Public License as published by
8  *  the Free Software Foundation; either version 2 of the License, or
9  *  (at your option) any later version.
10  *
11  *  This program is distributed in the hope that it will be useful,
12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *  GNU General Public License for more details.
15  *
16  *  You should have received a copy of the GNU General Public License
17  *  along with this program; if not, write to the Free Software
18  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  *
20  */
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <fcntl.h>
25 #include <string.h>
26 #include <errno.h>
27 #include <signal.h>
28 #include <time.h>
29 #include <ctype.h>
30 #include <sched.h>
31 #include <libaio.h>
32 #include <sys/time.h>
33 #include <sys/types.h>
34 #include <sys/stat.h>
35 #include <sys/wait.h>
36 #include <semaphore.h>
37 #include <sys/ipc.h>
38 #include <sys/shm.h>
39 #include <asm/unistd.h>
40
/* Upper bound on the number of jobs that can be defined. */
#define MAX_JOBS        (1024)

/*
 * Older kernel/libc headers may not provide the ioprio syscall numbers.
 * When __NR_ioprio_set is missing, __NR_ioprio_get is assumed to be
 * missing too, and both are supplied per architecture below.
 */
#ifndef __NR_ioprio_set

#if defined(__i386__)
#define __NR_ioprio_set         289
#define __NR_ioprio_get         290
#elif defined(__powerpc__) || defined(__powerpc64__)
#define __NR_ioprio_set         273
#define __NR_ioprio_get         274
#elif defined(__x86_64__)
#define __NR_ioprio_set         251
#define __NR_ioprio_get         252
#elif defined(__ia64__)
#define __NR_ioprio_set         1274
#define __NR_ioprio_get         1275
#elif defined(__alpha__)
#define __NR_ioprio_set         442
#define __NR_ioprio_get         443
#elif defined(__s390x__) || defined(__s390__)
#define __NR_ioprio_set         282
#define __NR_ioprio_get         283
#else
#error "Unsupported arch"
#endif

#endif /* !__NR_ioprio_set */
71
72 static int ioprio_set(int which, int who, int ioprio)
73 {
74         return syscall(__NR_ioprio_set, which, who, ioprio);
75 }
76
/* Values for the "which" argument of ioprio_set(). */
enum {
        IOPRIO_WHO_PROCESS = 1,
        IOPRIO_WHO_PGRP,
        IOPRIO_WHO_USER,
};

/* An ioprio value carries the priority class above this bit position. */
#define IOPRIO_CLASS_SHIFT      13

/* Alignment mask for I/O buffers (4096-byte boundary). */
#define MASK    (4095)

/* Default job parameters. */
#define DEF_BS          (4096)
#define DEF_TIMEOUT     (30)
#define DEF_RATE_CYCLE  (1000)
#define DEF_ODIRECT     (1)
#define DEF_SEQUENTIAL  (1)
#define DEF_WRITESTAT   (0)
#define DEF_RAND_REPEAT (1)

/* Round buf up to the next (MASK + 1)-byte boundary. */
#define ALIGN(buf)      (char *) (((unsigned long) (buf) + MASK) & ~(MASK))

/* Non-zero: each job writes bandwidth samples to fio_thread<N>.stat. */
static int write_stat = DEF_WRITESTAT;
/* Non-zero: random offsets use a fixed seed instead of /dev/random. */
static int repeatable = DEF_RAND_REPEAT;
/* Non-zero: a job missing its minimum rate stops every job. */
static int rate_quit = 1;
100
101 static int thread_number;
102 static char *ini_file;
103
104 static int shm_id;
105
106 enum {
107         DDIR_READ = 0,
108         DDIR_WRITE,
109 };
110
111 /*
112  * thread life cycle
113  */
114 enum {
115         TD_NOT_CREATED = 0,
116         TD_CREATED,
117         TD_STARTED,
118         TD_EXITED,
119         TD_REAPED,
120 };
121
122 struct thread_data {
123         char file_name[256];
124         int thread_number;
125         int error;
126         int fd;
127         int stat_fd;
128         pid_t pid;
129         char *buf;
130         volatile int terminate;
131         volatile int runstate;
132         unsigned int ddir;
133         unsigned int ioprio;
134         unsigned int sequential;
135         unsigned int bs;
136         unsigned int odirect;
137         unsigned int delay_sleep;
138         unsigned int fsync_blocks;
139         unsigned int start_delay;
140         unsigned int timeout;
141         unsigned int use_aio;
142         cpu_set_t cpumask;
143
144         io_context_t *aio_ctx;
145         struct iocb *aio_iocbs;
146         unsigned int aio_depth;
147         unsigned int aio_cur_depth;
148         struct io_event *aio_events;
149         char *aio_iocbs_status;
150
151         unsigned int rate;
152         unsigned int ratemin;
153         unsigned int ratecycle;
154         unsigned long rate_usec_cycle;
155         long rate_pending_usleep;
156         unsigned long rate_blocks;
157         struct timeval lastrate;
158
159         unsigned long max_latency;      /* msec */
160         unsigned long min_latency;      /* msec */
161         unsigned long runtime;          /* sec */
162         unsigned long blocks;
163         unsigned long io_blocks;
164         unsigned long last_block;
165         sem_t mutex;
166         struct drand48_data random_state;
167
168         /*
169          * bandwidth stat
170          */
171         unsigned long stat_time;
172         unsigned long stat_time_last;
173         unsigned long stat_blocks_last;
174
175         struct timeval start;
176 };
177
178 static struct thread_data *threads;
179 static struct thread_data def_thread;
180
181 static sem_t startup_sem;
182
183 static void sig_handler(int sig)
184 {
185         int i;
186
187         for (i = 0; i < thread_number; i++) {
188                 struct thread_data *td = &threads[i];
189
190                 td->terminate = 1;
191                 td->start_delay = 0;
192         }
193 }
194
195 static int init_random_state(struct thread_data *td)
196 {
197         unsigned long seed = 123;
198
199         if (td->sequential)
200                 return 0;
201
202         if (!repeatable) {
203                 int fd = open("/dev/random", O_RDONLY);
204
205                 if (fd == -1) {
206                         td->error = errno;
207                         return 1;
208                 }
209
210                 if (read(fd, &seed, sizeof(seed)) < (int) sizeof(seed)) {
211                         td->error = EIO;
212                         close(fd);
213                         return 1;
214                 }
215
216                 close(fd);
217         }
218
219         srand48_r(seed, &td->random_state);
220         return 0;
221 }
222
223 static void shutdown_stat_file(struct thread_data *td)
224 {
225         if (td->stat_fd != -1) {
226                 fsync(td->stat_fd);
227                 close(td->stat_fd);
228         }
229 }
230
231 static int init_stat_file(struct thread_data *td)
232 {
233         char n[256];
234
235         if (!write_stat)
236                 return 0;
237
238         sprintf(n, "fio_thread%d.stat", td->thread_number);
239         td->stat_fd = open(n, O_WRONLY | O_CREAT | O_TRUNC, 0644);
240         if (td->stat_fd == -1) {
241                 perror("open stat file");
242                 td->error = errno;
243                 return 1;
244         }
245
246         return 0;
247 }
248
/*
 * Microseconds elapsed from *s to *e.  When the microsecond field went
 * negative (and at least a second passed), one second is borrowed.
 */
static unsigned long utime_since(struct timeval *s, struct timeval *e)
{
        double secs = e->tv_sec - s->tv_sec;
        double usecs = e->tv_usec - s->tv_usec;

        if (usecs < 0 && secs > 0) {
                secs -= 1;
                usecs += 1000000;
        }

        return secs * 1000000.0 + usecs;
}
264
/*
 * Milliseconds elapsed from *s to *e, truncated.  A second is borrowed when
 * the microsecond difference is negative (and at least a second passed).
 */
static unsigned long mtime_since(struct timeval *s, struct timeval *e)
{
        double secs = e->tv_sec - s->tv_sec;
        double usecs = e->tv_usec - s->tv_usec;

        if (usecs < 0 && secs > 0) {
                secs -= 1;
                usecs += 1000000;
        }

        return secs * 1000.0 + usecs / 1000.0;
}
281
/* Convert a timeval to whole milliseconds. */
static inline unsigned long msec_now(struct timeval *s)
{
        unsigned long msec = s->tv_sec * 1000;

        return msec + s->tv_usec / 1000;
}
286
287 static unsigned long get_next_offset(struct thread_data *td)
288 {
289         unsigned long b;
290         long r;
291
292         if (!td->sequential) {
293                 lrand48_r(&td->random_state, &r);
294                 b = (1+(double) (td->blocks-1) * r / (RAND_MAX+1.0));
295         } else {
296                 b = td->last_block;
297                 td->last_block++;
298         }
299
300         return b * td->bs;
301 }
302
303 static void add_stat_sample(struct thread_data *td, unsigned long msec)
304 {
305         char sample[256];
306
307         if (!td->stat_fd)
308                 return;
309
310         td->stat_time += msec;
311         td->stat_time_last += msec;
312         td->stat_blocks_last++;
313
314         if (td->stat_time_last >= 500) {
315                 unsigned long rate = td->stat_blocks_last * td->bs / (td->stat_time_last);
316
317                 td->stat_time_last = 0;
318                 td->stat_blocks_last = 0;
319                 sprintf(sample, "%lu, %lu\n", td->stat_time, rate);
320                 write(td->stat_fd, sample, strlen(sample));
321         }
322 }
323
/*
 * Sleep for 'usec' microseconds, resuming after signal interruptions.
 *
 * The delay is split into whole seconds and nanoseconds.  The old code
 * put everything in tv_nsec: usec * 1000 overflowed int above ~2.1s, and
 * any value of 1s or more made nanosleep() fail with EINVAL, so long sleeps
 * (e.g. from rate_throttle()) silently did not happen.
 * Non-positive values return immediately.
 */
static void usec_sleep(int usec)
{
        struct timespec req, rem;

        if (usec <= 0)
                return;

        req.tv_sec = usec / 1000000;
        req.tv_nsec = (long) (usec % 1000000) * 1000;

        while (nanosleep(&req, &rem) == -1 && errno == EINTR)
                req = rem;
}
338
339 static void rate_throttle(struct thread_data *td, unsigned long time_spent)
340 {
341         if (!td->rate)
342                 return;
343
344         if (time_spent < td->rate_usec_cycle) {
345                 unsigned long s = td->rate_usec_cycle - time_spent;
346
347                 td->rate_pending_usleep += s;
348                 if (td->rate_pending_usleep >= 100000) {
349                         usec_sleep(td->rate_pending_usleep);
350                         td->rate_pending_usleep = 0;
351                 }
352         } else {
353                 long overtime = time_spent - td->rate_usec_cycle;
354
355                 td->rate_pending_usleep -= overtime;
356         }
357 }
358
359 static int check_min_rate(struct thread_data *td, struct timeval *now)
360 {
361         unsigned long spent;
362         unsigned long rate;
363
364         /*
365          * allow a 2 second settle period in the beginning
366          */
367         if (mtime_since(&td->start, now) < 2000)
368                 return 0;
369
370         /*
371          * if rate blocks is set, sample is running
372          */
373         if (td->rate_blocks) {
374                 spent = mtime_since(&td->lastrate, now);
375                 if (spent < td->ratecycle)
376                         return 0;
377
378                 rate = ((td->io_blocks - td->rate_blocks) * td->bs) / spent;
379                 if (rate < td->ratemin) {
380                         printf("Client%d: min rate %d not met, got %ldKiB/sec\n", td->thread_number, td->ratemin, rate);
381                         if (rate_quit)
382                                 sig_handler(0);
383                         return 1;
384                 }
385         }
386
387         td->rate_blocks = td->io_blocks;
388         memcpy(&td->lastrate, now, sizeof(*now));
389         return 0;
390 }
391
392 static inline int runtime_exceeded(struct thread_data *td, struct timeval *t)
393 {
394         if (mtime_since(&td->start, t) >= td->timeout * 1000)
395                 return 1;
396
397         return 0;
398 }
399
400 #define should_fsync(td)        ((td)->ddir == DDIR_WRITE && !(td)->odirect)
401
402 static void do_sync_io(struct thread_data *td)
403 {
404         struct timeval s, e;
405         unsigned long blocks, msec, usec;
406
407         for (blocks = 0; blocks < td->blocks; blocks++) {
408                 off_t offset = get_next_offset(td);
409                 int ret;
410
411                 if (td->terminate)
412                         break;
413
414                 if (lseek(td->fd, offset, SEEK_SET) == -1) {
415                         td->error = errno;
416                         break;
417                 }
418
419                 if (td->delay_sleep)
420                         usec_sleep(td->delay_sleep);
421
422                 gettimeofday(&s, NULL);
423
424                 if (td->ddir == DDIR_READ)
425                         ret = read(td->fd, td->buf, td->bs);
426                 else
427                         ret = write(td->fd, td->buf, td->bs);
428
429                 if (ret < (int) td->bs) {
430                         if (ret == -1)
431                                 td->error = errno;
432                         break;
433                 }
434
435                 td->io_blocks++;
436
437                 if (should_fsync(td) && td->fsync_blocks &&
438                     (td->io_blocks % td->fsync_blocks) == 0)
439                         fsync(td->fd);
440
441                 gettimeofday(&e, NULL);
442
443                 usec = utime_since(&s, &e);
444
445                 rate_throttle(td, usec);
446
447                 if (check_min_rate(td, &e)) {
448                         td->error = ENODATA;
449                         break;
450                 }
451
452                 msec = usec / 1000;
453                 add_stat_sample(td, msec);
454
455                 if (msec < td->min_latency)
456                         td->min_latency = msec;
457                 if (msec > td->max_latency)
458                         td->max_latency = msec;
459
460                 if (runtime_exceeded(td, &e))
461                         break;
462         }
463
464         if (should_fsync(td))
465                 fsync(td->fd);
466 }
467
468 static void aio_put_iocb(struct thread_data *td, struct iocb *iocb)
469 {
470         long offset = ((long) iocb - (long) td->aio_iocbs)/ sizeof(struct iocb);
471
472         td->aio_iocbs_status[offset] = 0;
473         td->aio_cur_depth--;
474 }
475
476 static struct iocb *aio_get_iocb(struct thread_data *td, struct timeval *t)
477 {
478         struct iocb *iocb = NULL;
479         unsigned int i;
480
481         for (i = 0; i < td->aio_depth; i++) {
482                 if (td->aio_iocbs_status[i] == 0) {
483                         td->aio_iocbs_status[i] = 1;
484                         iocb = &td->aio_iocbs[i];
485                         break;
486                 }
487         }
488
489         if (iocb) {
490                 off_t off = get_next_offset(td);
491                 char *p = td->buf + i * td->bs;
492
493                 if (td->ddir == DDIR_READ)
494                         io_prep_pread(iocb, td->fd, p, td->bs, off);
495                 else
496                         io_prep_pwrite(iocb, td->fd, p, td->bs, off);
497
498                 io_set_callback(iocb, (io_callback_t) msec_now(t));
499         }
500
501         return iocb;
502 }
503
504 static int aio_submit(struct thread_data *td, struct iocb *iocb)
505 {
506         int ret;
507
508         do {
509                 ret = io_submit(*td->aio_ctx, 1, &iocb);
510                 if (ret == 1)
511                         return 0;
512
513                 if (errno == EINTR)
514                         continue;
515                 else if (errno == EAGAIN)
516                         usleep(100);
517                 else
518                         break;
519         } while (1);
520
521         return 1;
522 }
523
524 #define iocb_time(iocb) ((unsigned long) (iocb)->data)
525
526 static void do_async_io(struct thread_data *td)
527 {
528         struct timeval s, e;
529         unsigned long blocks, msec, usec;
530
531         for (blocks = 0; blocks < td->blocks; blocks++) {
532                 struct timespec ts = { .tv_sec = 0, .tv_nsec = 0};
533                 struct timespec *timeout;
534                 int ret, i, min_evts = 0;
535                 struct iocb *iocb;
536
537                 if (td->terminate)
538                         break;
539
540                 if (td->delay_sleep)
541                         usec_sleep(td->delay_sleep);
542
543                 gettimeofday(&s, NULL);
544
545                 iocb = aio_get_iocb(td, &s);
546
547                 ret = aio_submit(td, iocb);
548                 if (ret) {
549                         td->error = errno;
550                         break;
551                 }
552
553                 td->aio_cur_depth++;
554
555                 if (td->aio_cur_depth < td->aio_depth) {
556                         timeout = &ts;
557                         min_evts = 0;
558                 } else {
559                         timeout = NULL;
560                         min_evts = 1;
561                 }
562
563                 ret = io_getevents(*td->aio_ctx, min_evts, td->aio_cur_depth, td->aio_events, timeout);
564                 if (ret < 0) {
565                         td->error = errno;
566                         break;
567                 } else if (!ret)
568                         continue;
569
570                 gettimeofday(&e, NULL);
571
572                 for (i = 0; i < ret; i++) {
573                         struct io_event *ev = td->aio_events + i;
574
575                         td->io_blocks++;
576
577                         iocb = ev->obj;
578
579                         msec = msec_now(&e) - iocb_time(iocb);
580                         add_stat_sample(td, msec);
581
582                         if (msec < td->min_latency)
583                                 td->min_latency = msec;
584                         if (msec > td->max_latency)
585                                 td->max_latency = msec;
586
587                         aio_put_iocb(td, iocb);
588                 }
589
590                 /*
591                  * the rate is batched for now, it should work for batches
592                  * of completions except the very first one which may look
593                  * a little bursty
594                  */
595                 usec = utime_since(&s, &e);
596
597                 rate_throttle(td, usec);
598
599                 if (check_min_rate(td, &e)) {
600                         td->error = ENODATA;
601                         break;
602                 }
603
604                 if (runtime_exceeded(td, &e))
605                         break;
606         }
607 }
608
609 static void cleanup_pending_aio(struct thread_data *td)
610 {
611         struct timespec ts = { .tv_sec = 0, .tv_nsec = 0};
612         unsigned int i;
613         int r;
614
615         /*
616          * get immediately available events, if any
617          */
618         r = io_getevents(*td->aio_ctx, 0, td->aio_cur_depth, td->aio_events, &ts);
619         if (r > 0) {
620                 for (i = 0; i < r; i++)
621                         aio_put_iocb(td, &td->aio_iocbs[i]);
622         }
623
624         /*
625          * now cancel remaining active events
626          */
627         for (i = 0; i < td->aio_depth; i++) {
628                 if (td->aio_iocbs_status[i] == 0)
629                         continue;
630
631                 r = io_cancel(*td->aio_ctx, &td->aio_iocbs[i], td->aio_events);
632                 if (!r)
633                         aio_put_iocb(td, &td->aio_iocbs[i]);
634         }
635
636         if (td->aio_cur_depth)
637                 io_getevents(*td->aio_ctx, td->aio_cur_depth, td->aio_cur_depth, td->aio_events, NULL);
638 }
639
640 static void cleanup_aio(struct thread_data *td)
641 {
642         if (td->aio_cur_depth)
643                 cleanup_pending_aio(td);
644
645         if (td->aio_ctx) {
646                 io_destroy(*td->aio_ctx);
647                 free(td->aio_ctx);
648         }
649         if (td->aio_iocbs)
650                 free(td->aio_iocbs);
651         if (td->aio_events)
652                 free(td->aio_events);
653         if (td->aio_iocbs_status)
654                 free(td->aio_iocbs_status);
655 }
656
657 static int init_aio(struct thread_data *td)
658 {
659         td->aio_ctx = malloc(sizeof(*td->aio_ctx));
660
661         if (io_queue_init(td->aio_depth, td->aio_ctx)) {
662                 td->error = errno;
663                 return 1;
664         }
665
666         td->aio_iocbs = malloc(td->aio_depth * sizeof(struct iocb));
667         td->aio_events = malloc(td->aio_depth * sizeof(struct io_event));
668         td->aio_iocbs_status = malloc(td->aio_depth * sizeof(char));
669         return 0;
670 }
671
672 static void *thread_main(int shm_id, int offset, char *argv[])
673 {
674         struct thread_data *td;
675         struct timeval end;
676         void *data, *ptr = NULL;
677         struct stat st;
678         int ret = 1, flags;
679
680         setsid();
681
682         data = shmat(shm_id, NULL, 0);
683         td = data + offset * sizeof(struct thread_data);
684         td->pid = getpid();
685
686         td->fd = -1;
687
688         if (sched_setaffinity(td->pid, sizeof(td->cpumask), &td->cpumask) == -1) {
689                 td->error = errno;
690                 goto err;
691         }
692
693         printf("Thread (%s) (pid=%u) (f=%s) (aio=%d) started\n", td->ddir == DDIR_READ ? "read" : "write", td->pid, td->file_name, td->aio_depth);
694         fflush(stdout);
695
696         sprintf(argv[0], "fio%d", offset);
697
698         flags = 0;
699         if (td->odirect)
700                 flags |= O_DIRECT;
701
702         if (td->ddir == DDIR_READ)
703                 td->fd = open(td->file_name, flags | O_RDONLY);
704         else
705                 td->fd = open(td->file_name, flags | O_WRONLY | O_CREAT | O_TRUNC, 0644);
706
707         if (td->fd == -1) {
708                 td->error = errno;
709                 goto err;
710         }
711
712         if (td->use_aio && init_aio(td))
713                 goto err;
714
715         if (init_random_state(td))
716                 goto err;
717         if (init_stat_file(td))
718                 goto err;
719
720         if (td->ddir == DDIR_READ) {
721                 if (fstat(td->fd, &st) == -1) {
722                         td->error = errno;
723                         goto err;
724                 }
725
726                 td->blocks = st.st_size / td->bs;
727                 if (!td->blocks) {
728                         td->error = EINVAL;
729                         goto err;
730                 }
731         } else
732                 td->blocks = 1024 * 1024 * 1024 / td->bs;
733
734         if (td->ioprio) {
735                 if (ioprio_set(IOPRIO_WHO_PROCESS, 0, td->ioprio) == -1) {
736                         td->error = errno;
737                         goto err;
738                 }
739         }
740
741         sem_post(&startup_sem);
742         sem_wait(&td->mutex);
743
744         gettimeofday(&td->start, NULL);
745
746         if (td->ratemin)
747                 memcpy(&td->lastrate, &td->start, sizeof(td->start));
748
749         if (!td->use_aio) {
750                 ptr = malloc(td->bs + MASK);
751                 td->buf = ALIGN(ptr);
752                 do_sync_io(td);
753         } else {
754                 ptr = malloc(td->bs * td->aio_depth + MASK);
755                 td->buf = ALIGN(ptr);
756                 do_async_io(td);
757         }
758
759         gettimeofday(&end, NULL);
760         td->runtime = mtime_since(&td->start, &end);
761
762         ret = 0;
763
764 err:
765         shutdown_stat_file(td);
766         if (td->use_aio)
767                 cleanup_aio(td);
768         if (td->fd != -1) {
769                 close(td->fd);
770                 td->fd = -1;
771         }
772         if (ret) {
773                 sem_post(&startup_sem);
774                 sem_wait(&td->mutex);
775         }
776         if (ptr)
777                 free(ptr);
778         td->runstate = TD_EXITED;
779         shmdt(data);
780         return NULL;
781 }
782
783 static void free_shm(void)
784 {
785         shmdt(threads);
786 }
787
788 static void show_thread_status(struct thread_data *td)
789 {
790         int prio, prio_class;
791         unsigned long bw = 0;
792
793         if (!td->io_blocks && !td->error)
794                 return;
795
796         if (td->runtime)
797                 bw = (td->io_blocks * td->bs) / td->runtime;
798
799         prio = td->ioprio & 0xff;
800         prio_class = td->ioprio >> IOPRIO_CLASS_SHIFT;
801
802         printf("thread%d (%s): err=%2d, prio=%1d/%1d maxl=%5lumsec, io=%6luMiB, bw=%6luKiB/sec\n", td->thread_number, td->ddir == DDIR_READ ? " read": "write", td->error, prio_class, prio, td->max_latency, td->io_blocks * td->bs >> 20, bw);
803 }
804
805 static int setup_rate(struct thread_data *td)
806 {
807         int nr_reads_per_sec;
808
809         if (!td->rate)
810                 return 0;
811
812         if (td->rate < td->ratemin) {
813                 fprintf(stderr, "min rate larger than nominal rate\n");
814                 return -1;
815         }
816
817         nr_reads_per_sec = td->rate * 1024 / td->bs;
818         td->rate_usec_cycle = 1000000 / nr_reads_per_sec;
819         td->rate_pending_usleep = 0;
820         return 0;
821 }
822
823 static struct thread_data *get_new_job(int global)
824 {
825         struct thread_data *td;
826
827         if (global)
828                 return &def_thread;
829         if (thread_number >= MAX_JOBS)
830                 return NULL;
831
832         td = &threads[thread_number++];
833         memset(td, 0, sizeof(*td));
834
835         td->thread_number = thread_number;
836         td->ddir = def_thread.ddir;
837         td->bs = def_thread.bs;
838         td->odirect = def_thread.odirect;
839         td->ratecycle = def_thread.ratecycle;
840         td->sequential = def_thread.sequential;
841         td->timeout = def_thread.timeout;
842         memcpy(&td->cpumask, &def_thread.cpumask, sizeof(td->cpumask));
843
844         return td;
845 }
846
847 static void put_job(struct thread_data *td)
848 {
849         memset(&threads[td->thread_number - 1], 0, sizeof(*td));
850         thread_number--;
851 }
852
853 static int add_job(struct thread_data *td, const char *filename, int prioclass,
854                    int prio)
855 {
856         if (td == &def_thread)
857                 return 0;
858
859         strcpy(td->file_name, filename);
860         td->stat_fd = -1;
861         sem_init(&td->mutex, 1, 0);
862         td->min_latency = 10000000;
863         td->ioprio = (prioclass << IOPRIO_CLASS_SHIFT) | prio;
864
865         if (td->use_aio && !td->aio_depth)
866                 td->aio_depth = 1;
867
868         if (setup_rate(td))
869                 return -1;
870
871         printf("Client%d: file=%s, rw=%d, prio=%d, seq=%d, odir=%d, bs=%d, rate=%d, aio=%d, aio_depth=%d\n", td->thread_number, filename, td->ddir, td->ioprio, td->sequential, td->odirect, td->bs, td->rate, td->use_aio, td->aio_depth);
872         return 0;
873 }
874
/*
 * Set CPU i in *cpumask for every bit i set in the integer bitmap 'cpu',
 * after clearing the mask.
 */
static void set_cpu_mask_bits(cpu_set_t *cpumask, int cpu)
{
        unsigned int bits = (unsigned int) cpu;
        unsigned int i;

        CPU_ZERO(cpumask);

        for (i = 0; i < sizeof(bits) * 8; i++) {
                /* unsigned shift: 1 << 31 on a signed int is undefined */
                if (bits & (1U << i))
                        CPU_SET(i, cpumask);
        }
}

/*
 * fill_cpu_mask(mask, cpu) used to take its cpu_set_t by value, so the
 * caller's mask was never modified.  Callers pass the mask lvalue
 * (e.g. td->cpumask); forward its address to the pointer-based helper so
 * existing call sites keep working unchanged.
 */
#define fill_cpu_mask(cpumask, cpu)     set_cpu_mask_bits(&(cpumask), (cpu))
886
/*
 * Copy one option value from 'input' into 'output': everything up to the
 * first ',' or '}' (or the end of the string), NUL-terminated.  'output'
 * must be large enough to hold that value.
 */
static void fill_option(const char *input, char *output)
{
        size_t len = strcspn(input, ",}");

        memcpy(output, input, len);
        output[len] = '\0';
}
899
900 /*
901  * job key words:
902  *
903  * file=
904  * bs=
905  * rw=
906  * direct=
907  */
908 static void parse_jobs_cmd(int argc, char *argv[], int index)
909 {
910         struct thread_data *td;
911         unsigned int prio, prioclass, cpu;
912         char *string, *filename, *p, *c;
913         int i;
914
915         string = malloc(256);
916         filename = malloc(256);
917
918         for (i = index; i < argc; i++) {
919                 p = argv[i];
920
921                 c = strpbrk(p, "{");
922                 if (!c)
923                         break;
924
925                 filename[0] = 0;
926
927                 td = get_new_job(0);
928                 if (!td)
929                         break;
930
931                 prioclass = 2;
932                 prio = 4;
933
934                 c = strstr(p, "rw=");
935                 if (c) {
936                         c += 3;
937                         if (*c == '0')
938                                 td->ddir = DDIR_READ;
939                         else
940                                 td->ddir = DDIR_WRITE;
941                 }
942
943                 c = strstr(p, "prio=");
944                 if (c) {
945                         c += 5;
946                         prio = *c - '0';
947                 }
948
949                 c = strstr(p, "prioclass=");
950                 if (c) {
951                         c += 10;
952                         prioclass = *c - '0';
953                 }
954
955                 c = strstr(p, "file=");
956                 if (c) {
957                         c += 5;
958                         fill_option(c, filename);
959                 }
960
961                 c = strstr(p, "bs=");
962                 if (c) {
963                         c += 3;
964                         fill_option(c, string);
965                         td->bs = strtoul(string, NULL, 10);
966                         td->bs <<= 10;
967                 }
968
969                 c = strstr(p, "direct=");
970                 if (c) {
971                         c += 7;
972                         if (*c != '0')
973                                 td->odirect = 1;
974                         else
975                                 td->odirect = 0;
976                 }
977
978                 c = strstr(p, "delay=");
979                 if (c) {
980                         c += 6;
981                         fill_option(c, string);
982                         td->delay_sleep = strtoul(string, NULL, 10);
983                 }
984
985                 c = strstr(p, "rate=");
986                 if (c) {
987                         c += 5;
988                         fill_option(c, string);
989                         td->rate = strtoul(string, NULL, 10);
990                 }
991
992                 c = strstr(p, "ratemin=");
993                 if (c) {
994                         c += 8;
995                         fill_option(c, string);
996                         td->ratemin = strtoul(string, NULL, 10);
997                 }
998
999                 c = strstr(p, "ratecycle=");
1000                 if (c) {
1001                         c += 10;
1002                         fill_option(c, string);
1003                         td->ratecycle = strtoul(string, NULL, 10);
1004                 }
1005
1006                 c = strstr(p, "cpumask=");
1007                 if (c) {
1008                         c += 8;
1009                         fill_option(c, string);
1010                         cpu = strtoul(string, NULL, 10);
1011                         fill_cpu_mask(td->cpumask, cpu);
1012                 }
1013
1014                 c = strstr(p, "fsync=");
1015                 if (c) {
1016                         c += 6;
1017                         fill_option(c, string);
1018                         td->fsync_blocks = strtoul(string, NULL, 10);
1019                 }
1020
1021                 c = strstr(p, "startdelay=");
1022                 if (c) {
1023                         c += 11;
1024                         fill_option(c, string);
1025                         td->start_delay = strtoul(string, NULL, 10);
1026                 }
1027
1028                 c = strstr(p, "timeout=");
1029                 if (c) {
1030                         c += 8;
1031                         fill_option(c, string);
1032                         td->timeout = strtoul(string, NULL, 10);
1033                 }
1034
1035                 c = strstr(p, "aio_depth=");
1036                 if (c) {
1037                         c += 10;
1038                         fill_option(c, string);
1039                         td->aio_depth = strtoul(string, NULL, 10);
1040                 }
1041
1042                 c = strstr(p, "aio");
1043                 if (c)
1044                         td->use_aio = 1;
1045
1046                 c = strstr(p, "random");
1047                 if (c)
1048                         td->sequential = 0;
1049                 c = strstr(p, "sequential");
1050                 if (c)
1051                         td->sequential = 1;
1052
1053                 if (add_job(td, filename, prioclass, prio))
1054                         put_job(td);
1055         }
1056
1057         free(string);
1058         free(filename);
1059 }
1060
/*
 * Parse an integer option of the form "name=value" (whitespace is allowed
 * around '=') from the option line p into *val.
 *
 * Returns 0 if the option was found and its value parsed, 1 otherwise.
 * *val is not modified when 1 is returned.
 */
static int check_int(char *p, char *name, unsigned int *val)
{
	char fmt[128];
	int len;

	/*
	 * A space in a scanf format matches zero or more whitespace
	 * characters, so this one pattern covers both "name=value" and
	 * "name = value". %u matches the unsigned int destination; %d
	 * would be a type mismatch.
	 */
	len = snprintf(fmt, sizeof(fmt), "%s =%%u", name);
	if (len < 0 || (size_t) len >= sizeof(fmt))
		return 1;

	if (sscanf(p, fmt, val) == 1)
		return 0;

	return 1;
}
1075
/*
 * Return 1 if line carries no job-file content: either it holds only
 * whitespace/control characters, or its first other character is ';'
 * (a comment). Return 0 otherwise.
 */
static int is_empty_or_comment(char *line)
{
	const unsigned char *c;

	/*
	 * Walk to the terminating NUL instead of calling strlen() on every
	 * iteration. Using unsigned char keeps the <ctype.h> calls defined
	 * for bytes >= 0x80.
	 */
	for (c = (const unsigned char *) line; *c; c++) {
		if (*c == ';')
			return 1;
		if (!isspace(*c) && !iscntrl(*c))
			return 0;
	}

	return 1;
}
1089
1090 static int parse_jobs_ini(char *file)
1091 {
1092         unsigned int prioclass, prio, cpu, global;
1093         struct thread_data *td;
1094         char *string, *name;
1095         fpos_t off;
1096         FILE *f;
1097         char *p;
1098
1099         f = fopen(file, "r");
1100         if (!f) {
1101                 perror("fopen");
1102                 return 1;
1103         }
1104
1105         string = malloc(4096);
1106         name = malloc(256);
1107
1108         while ((p = fgets(string, 4096, f)) != NULL) {
1109                 if (is_empty_or_comment(p))
1110                         continue;
1111                 if (sscanf(p, "[%s]", name) != 1)
1112                         continue;
1113
1114                 global = !strncmp(name, "global", 6);
1115
1116                 name[strlen(name) - 1] = '\0';
1117
1118                 td = get_new_job(global);
1119                 if (!td)
1120                         break;
1121
1122                 prioclass = 2;
1123                 prio = 4;
1124
1125                 fgetpos(f, &off);
1126                 while ((p = fgets(string, 4096, f)) != NULL) {
1127                         if (is_empty_or_comment(p))
1128                                 continue;
1129                         if (strstr(p, "["))
1130                                 break;
1131                         if (!check_int(p, "bs", &td->bs)) {
1132                                 td->bs <<= 10;
1133                                 fgetpos(f, &off);
1134                                 continue;
1135                         }
1136                         if (!check_int(p, "rw", &td->ddir)) {
1137                                 fgetpos(f, &off);
1138                                 continue;
1139                         }
1140                         if (!check_int(p, "prio", &prio)) {
1141                                 fgetpos(f, &off);
1142                                 continue;
1143                         }
1144                         if (!check_int(p, "prioclass", &prioclass)) {
1145                                 fgetpos(f, &off);
1146                                 continue;
1147                         }
1148                         if (!check_int(p, "direct", &td->odirect)) {
1149                                 fgetpos(f, &off);
1150                                 continue;
1151                         }
1152                         if (!check_int(p, "rate", &td->rate)) {
1153                                 fgetpos(f, &off);
1154                                 continue;
1155                         }
1156                         if (!check_int(p, "ratemin", &td->ratemin)) {
1157                                 fgetpos(f, &off);
1158                                 continue;
1159                         }
1160                         if (!check_int(p, "ratecycle", &td->ratecycle)) {
1161                                 fgetpos(f, &off);
1162                                 continue;
1163                         }
1164                         if (!check_int(p, "delay", &td->delay_sleep)) {
1165                                 fgetpos(f, &off);
1166                                 continue;
1167                         }
1168                         if (!check_int(p, "cpumask", &cpu)) {
1169                                 fill_cpu_mask(td->cpumask, cpu);
1170                                 fgetpos(f, &off);
1171                                 continue;
1172                         }
1173                         if (!check_int(p, "fsync", &td->fsync_blocks)) {
1174                                 fgetpos(f, &off);
1175                                 continue;
1176                         }
1177                         if (!check_int(p, "startdelay", &td->start_delay)) {
1178                                 fgetpos(f, &off);
1179                                 continue;
1180                         }
1181                         if (!check_int(p, "timeout", &td->timeout)) {
1182                                 fgetpos(f, &off);
1183                                 continue;
1184                         }
1185                         if (!check_int(p, "aio_depth", &td->aio_depth)) {
1186                                 fgetpos(f, &off);
1187                                 continue;
1188                         }
1189                         if (!strncmp(p, "sequential", 10)) {
1190                                 td->sequential = 1;
1191                                 fgetpos(f, &off);
1192                                 continue;
1193                         }
1194                         if (!strncmp(p, "random", 6)) {
1195                                 td->sequential = 0;
1196                                 fgetpos(f, &off);
1197                                 continue;
1198                         }
1199                         if (!strncmp(p, "aio", 3)) {
1200                                 td->use_aio = 1;
1201                                 fgetpos(f, &off);
1202                                 continue;
1203                         }
1204
1205                         printf("Client%d: bad option %s\n",td->thread_number,p);
1206                 }
1207                 fsetpos(f, &off);
1208
1209                 if (add_job(td, name, prioclass, prio))
1210                         put_job(td);
1211         }
1212
1213         free(string);
1214         free(name);
1215         fclose(f);
1216         return 0;
1217 }
1218
1219 static int parse_options(int argc, char *argv[])
1220 {
1221         int i;
1222
1223         for (i = 1; i < argc; i++) {
1224                 char *parm = argv[i];
1225
1226                 if (parm[0] != '-')
1227                         break;
1228
1229                 parm++;
1230                 switch (*parm) {
1231                         case 's':
1232                                 parm++;
1233                                 def_thread.sequential = !!atoi(parm);
1234                                 break;
1235                         case 'b':
1236                                 parm++;
1237                                 def_thread.bs = atoi(parm);
1238                                 def_thread.bs <<= 10;
1239                                 if (!def_thread.bs) {
1240                                         printf("bad block size\n");
1241                                         def_thread.bs = DEF_BS;
1242                                 }
1243                                 break;
1244                         case 't':
1245                                 parm++;
1246                                 def_thread.timeout = atoi(parm);
1247                                 break;
1248                         case 'w':
1249                                 parm++;
1250                                 write_stat = !!atoi(parm);
1251                                 break;
1252                         case 'r':
1253                                 parm++;
1254                                 repeatable = !!atoi(parm);
1255                                 break;
1256                         case 'R':
1257                                 parm++;
1258                                 rate_quit = !!atoi(parm);
1259                                 break;
1260                         case 'o':
1261                                 parm++;
1262                                 def_thread.odirect = !!atoi(parm);
1263                                 break;
1264                         case 'f':
1265                                 if (i + 1 >= argc) {
1266                                         printf("-f needs file as arg\n");
1267                                         break;
1268                                 }
1269                                 ini_file = strdup(argv[i+1]);
1270                                 break;
1271                         default:
1272                                 printf("bad option %s\n", argv[i]);
1273                                 break;
1274                 }
1275         }
1276
1277         return i;
1278 }
1279
1280 static void reap_threads(int *nr_running, int *t_rate, int *m_rate)
1281 {
1282         int i;
1283
1284         for (i = 0; i < thread_number; i++) {
1285                 struct thread_data *td = &threads[i];
1286
1287                 if (td->runstate != TD_EXITED)
1288                         continue;
1289
1290                 td->runstate = TD_REAPED;
1291                 waitpid(td->pid, NULL, 0);
1292                 (*nr_running)--;
1293                 (*m_rate) -= td->ratemin;
1294                 (*t_rate) -= td->rate;
1295
1296                 if (td->terminate)
1297                         continue;
1298
1299                 printf("Threads now running: %d", *nr_running);
1300                 if (*m_rate || *t_rate)
1301                         printf(", rate %d/%dKiB/sec", *t_rate, *m_rate);
1302                 printf("\n");
1303         }
1304 }
1305
1306 static void run_threads(char *argv[])
1307 {
1308         struct timeval genesis, now;
1309         struct thread_data *td;
1310         unsigned long spent;
1311         int i, todo, nr_running, m_rate, t_rate;
1312
1313         gettimeofday(&genesis, NULL);
1314
1315         printf("Starting %d threads\n", thread_number);
1316         fflush(stdout);
1317
1318         signal(SIGINT, sig_handler);
1319
1320         todo = thread_number;
1321         nr_running = 0;
1322         m_rate = t_rate = 0;
1323
1324         while (todo) {
1325                 for (i = 0; i < thread_number; i++) {
1326                         td = &threads[i];
1327
1328                         if (td->runstate != TD_NOT_CREATED)
1329                                 continue;
1330
1331                         /*
1332                          * never got a chance to start, killed by other
1333                          * thread for some reason
1334                          */
1335                         if (td->terminate) {
1336                                 todo--;
1337                                 continue;
1338                         }
1339
1340                         if (td->start_delay) {
1341                                 gettimeofday(&now, NULL);
1342                                 spent = mtime_since(&genesis, &now);
1343
1344                                 if (td->start_delay * 1000 > spent)
1345                                         continue;
1346                         }
1347
1348                         td->runstate = TD_CREATED;
1349                         sem_init(&startup_sem, 1, 1);
1350                         todo--;
1351
1352                         if (fork())
1353                                 sem_wait(&startup_sem);
1354                         else {
1355                                 thread_main(shm_id, i, argv);
1356                                 exit(0);
1357                         }
1358                 }
1359
1360                 for (i = 0; i < thread_number; i++) {
1361                         struct thread_data *td = &threads[i];
1362
1363                         if (td->runstate == TD_CREATED) {
1364                                 td->runstate = TD_STARTED;
1365                                 nr_running++;
1366                                 m_rate += td->ratemin;
1367                                 t_rate += td->rate;
1368                                 sem_post(&td->mutex);
1369
1370                                 printf("Threads now running: %d", nr_running);
1371                                 if (m_rate || t_rate)
1372                                         printf(", rate %d/%dKiB/sec", t_rate, m_rate);
1373                                 printf("\n");
1374                         }
1375                 }
1376
1377                 reap_threads(&nr_running, &t_rate, &m_rate);
1378
1379                 if (todo)
1380                         usleep(100000);
1381         }
1382
1383         while (nr_running) {
1384                 reap_threads(&nr_running, &t_rate, &m_rate);
1385                 usleep(10000);
1386         }
1387 }
1388
1389 int main(int argc, char *argv[])
1390 {
1391         static unsigned long max_run[2], min_run[2], total_blocks[2];
1392         static unsigned long max_bw[2], min_bw[2], maxl[2], minl[2];
1393         static unsigned long read_mb, write_mb, read_agg, write_agg;
1394         int i;
1395
1396         shm_id = shmget(0, MAX_JOBS * sizeof(struct thread_data), IPC_CREAT | 0600);
1397         if (shm_id == -1) {
1398                 perror("shmget");
1399                 return 1;
1400         }
1401
1402         threads = shmat(shm_id, NULL, 0);
1403         if (threads == (void *) -1 ) {
1404                 perror("shmat");
1405                 return 1;
1406         }
1407
1408         atexit(free_shm);
1409
1410         if (sched_getaffinity(getpid(), sizeof(cpu_set_t), &def_thread.cpumask) == -1) {
1411                 perror("sched_getaffinity");
1412                 return 1;
1413         }
1414
1415         /*
1416          * fill globals
1417          */
1418         def_thread.ddir = DDIR_READ;
1419         def_thread.bs = DEF_BS;
1420         def_thread.odirect = 1;
1421         def_thread.ratecycle = DEF_RATE_CYCLE;
1422         def_thread.sequential = 1;
1423         def_thread.timeout = DEF_TIMEOUT;
1424
1425         i = parse_options(argc, argv);
1426
1427         if (ini_file) {
1428                 if (parse_jobs_ini(ini_file))
1429                         return 1;
1430         } else
1431                 parse_jobs_cmd(argc, argv, i);
1432
1433         if (!thread_number) {
1434                 printf("Nothing to do\n");
1435                 return 1;
1436         }
1437
1438         printf("%s: %s, bs=%uKiB, timeo=%u, write_stat=%u, odirect=%d\n", argv[0], def_thread.sequential ? "sequential" : "random", def_thread.bs >> 10, def_thread.timeout, write_stat, def_thread.odirect);
1439
1440         run_threads(argv);
1441
1442         min_bw[0] = min_run[0] = ~0UL;
1443         min_bw[1] = min_run[1] = ~0UL;
1444         minl[0] = minl[1] = ~0UL;
1445         for (i = 0; i < thread_number; i++) {
1446                 struct thread_data *td = &threads[i];
1447                 unsigned long bw = 0;
1448
1449                 if (td->error)
1450                         goto show_stat;
1451
1452                 if (td->runtime < min_run[td->ddir])
1453                         min_run[td->ddir] = td->runtime;
1454                 if (td->runtime > max_run[td->ddir])
1455                         max_run[td->ddir] = td->runtime;
1456
1457                 if (td->runtime)
1458                         bw = (td->io_blocks * td->bs) / td->runtime;
1459                 if (bw < min_bw[td->ddir])
1460                         min_bw[td->ddir] = bw;
1461                 if (bw > max_bw[td->ddir])
1462                         max_bw[td->ddir] = bw;
1463                 if (td->max_latency < minl[td->ddir])
1464                         minl[td->ddir] = td->max_latency;
1465                 if (td->max_latency > maxl[td->ddir])
1466                         maxl[td->ddir] = td->max_latency;
1467
1468                 total_blocks[td->ddir] += td->io_blocks;
1469
1470                 if (td->ddir == DDIR_READ) {
1471                         read_mb += (td->bs * td->io_blocks) >> 20;
1472                         if (td->runtime)
1473                                 read_agg += (td->io_blocks * td->bs) / td->runtime;
1474                 }
1475                 if (td->ddir == DDIR_WRITE) {
1476                         write_mb += (td->bs * td->io_blocks) >> 20;
1477                         if (td->runtime)
1478                                 write_agg += (td->io_blocks * td->bs) / td->runtime;
1479                 }
1480
1481 show_stat:
1482                 show_thread_status(td);
1483         }
1484
1485         printf("Run status:\n");
1486         if (max_run[DDIR_READ])
1487                 printf("   READ: io=%luMiB, aggrb=%lu, minl=%lu, maxl=%lu, minb=%lu, maxb=%lu, mint=%lumsec, maxt=%lumsec\n", read_mb, read_agg, minl[0], maxl[0], min_bw[0], max_bw[0], min_run[0], max_run[0]);
1488         if (max_run[DDIR_WRITE])
1489                 printf("  WRITE: io=%luMiB, aggrb=%lu, minl=%lu, maxl=%lu, minb=%lu, maxb=%lu, mint=%lumsec, maxt=%lumsec\n", write_mb, write_agg, minl[1], maxl[1], min_bw[1], max_bw[1], min_run[1], max_run[1]);
1490
1491         return 0;
1492 }