[PATCH] fio: Add ratemin/ratecycle/fsync options and lots of cleanups
authorJens Axboe <axboe@suse.de>
Wed, 19 Oct 2005 11:42:09 +0000 (13:42 +0200)
committerJens Axboe <axboe@suse.de>
Wed, 19 Oct 2005 11:42:09 +0000 (13:42 +0200)
- Define max jobs and allocate threads in advance, was buggy
- ->mutex should default to locked for proper synced startup
- Add fsync option for fsync'ing for every N blocks on non-direct writes
- Add ratemin option for quitting the thread if min rate isn't met
- Add ratecycle option for setting rate average period
- Lots of cleanups

README.fio
fio.c

index 8430f0c1325f94b32e49d97845b563112732bb99..abe06f21fb6dac113e50f5f61259e65d6810a928 100644 (file)
@@ -32,7 +32,10 @@ The <jobs> format is as follows:
        random          IO is randomized
        sequential      IO is sequential
        rate=x          Throttle rate to x KiB/sec
+       ratemin=x       Quit if rate of x KiB/sec can't be met
+       ratecycle=x     ratemin averaged over x msecs
        cpumask=x       Allow job to run on CPUs defined by mask
+       fsync=x         If writing, fsync after every x blocks have been written
 
 
 Examples using cmd line jobs
diff --git a/fio.c b/fio.c
index 51163adc0cdfef950b3dba6d7946dd5303cf441e..d0d1a37c50508fae3b1302982657981b2c043f83 100644 (file)
--- a/fio.c
+++ b/fio.c
@@ -37,6 +37,8 @@
 #include <sys/shm.h>
 #include <asm/unistd.h>
 
+#define MAX_JOBS       (1024)
+
 /*
  * assume we don't have _get either, if _set isn't defined
  */
@@ -79,28 +81,36 @@ enum {
 
 #define IOPRIO_CLASS_SHIFT     13
 
-#define BS     (4096)
 #define MASK   (4095)
 
-#define TIMEOUT        (30)
+#define DEF_BS         (4096)
+#define DEF_TIMEOUT    (30)
+#define DEF_RATE_CYCLE (1000)
+#define DEF_ODIRECT    (1)
+#define DEF_SEQUENTIAL (1)
+#define DEF_WRITESTAT  (0)
+#define DEF_RAND_REPEAT        (1)
+
+#define ALIGN(buf)     (char *) (((unsigned long) (buf) + MASK) & ~(MASK))
 
-#define ALIGN(buf)      (((unsigned long) (buf) + MASK) & ~(MASK))
+static int sequential = DEF_SEQUENTIAL;
+static int write_stat = DEF_WRITESTAT;
+static int repeatable = DEF_RAND_REPEAT;
+static int timeout = DEF_TIMEOUT;
+static int odirect = DEF_ODIRECT;
+static int global_bs = DEF_BS;
 
-static int sequential = 1;
-static int write_stat = 0;
-static int repeatable = 1;
 static int thread_number;
-static int timeout = TIMEOUT;
-static int odirect = 1;
-static int global_bs = BS;
 static char *ini_file;
 
 static int shm_id;
 
 static cpu_set_t def_cpumask;
 
-#define DDIR_READ      (0)
-#define DDIR_WRITE     (1)
+enum {
+       DDIR_READ = 0,
+       DDIR_WRITE,
+};
 
 struct thread_data {
        char file_name[256];
@@ -109,24 +119,29 @@ struct thread_data {
        int fd;
        int stat_fd;
        pid_t pid;
-       int terminate;
+       volatile int terminate;
        unsigned int ddir;
        unsigned int ioprio;
        unsigned int sequential;
        unsigned int bs;
        unsigned int odirect;
        unsigned int delay_sleep;
+       unsigned int fsync_blocks;
        cpu_set_t cpumask;
 
        unsigned int rate;
-       unsigned int rate_usec_cycle;
-       unsigned int rate_pending_usleep;
+       unsigned int ratemin;
+       unsigned int ratecycle;
+       unsigned long rate_usec_cycle;
+       long rate_pending_usleep;
+       unsigned long rate_blocks;
+       struct timeval lastrate;
 
        unsigned long max_latency;      /* msec */
        unsigned long min_latency;      /* msec */
        unsigned long runtime;          /* sec */
        unsigned long blocks;
-       unsigned long blocks_read;
+       unsigned long io_blocks;
        unsigned long last_block;
        sem_t mutex;
        struct drand48_data random_state;
@@ -137,6 +152,8 @@ struct thread_data {
        unsigned long stat_time;
        unsigned long stat_time_last;
        unsigned long stat_blocks_last;
+
+       struct timeval start;
 };
 
 static struct thread_data *threads;
@@ -192,21 +209,18 @@ void shutdown_stat_file(struct thread_data *td)
 
 int init_stat_file(struct thread_data *td)
 {
-       char *n;
+       char n[256];
 
        if (!write_stat)
                return 0;
 
-       n = malloc(256);
        sprintf(n, "%s.stat", td->file_name);
        td->stat_fd = open(n, O_WRONLY | O_CREAT | O_TRUNC, 0644);
        if (td->stat_fd == -1) {
-               free(n);
                td->error = errno;
                return 1;
        }
 
-       free(n);
        return 0;
 }
 
@@ -285,7 +299,7 @@ void add_stat_sample(struct thread_data *td, unsigned long msec)
                return;
 
 #if 0
-       sprintf(sample, "%lu, %lu\n", td->blocks_read, msec);
+       sprintf(sample, "%lu, %lu\n", td->io_blocks, msec);
        write(td->stat_fd, sample, strlen(sample));
 #else
        td->stat_time += msec;
@@ -298,7 +312,7 @@ void add_stat_sample(struct thread_data *td, unsigned long msec)
                td->stat_time_last = 0;
                td->stat_blocks_last = 0;
                sprintf(sample, "%lu, %lu\n", td->stat_time, rate);
-               //sprintf(sample, "%lu, %lu\n", td->blocks_read, msec);
+               //sprintf(sample, "%lu, %lu\n", td->io_blocks, msec);
                write(td->stat_fd, sample, strlen(sample));
        }
 #endif
@@ -321,6 +335,9 @@ void usec_sleep(int usec)
 
 void rate_throttle(struct thread_data *td, unsigned long time_spent)
 {
+       if (!td->rate)
+               return;
+
        if (time_spent < td->rate_usec_cycle) {
                unsigned long s = td->rate_usec_cycle - time_spent;
 
@@ -329,26 +346,59 @@ void rate_throttle(struct thread_data *td, unsigned long time_spent)
                        usec_sleep(td->rate_pending_usleep);
                        td->rate_pending_usleep = 0;
                }
-       } else if (td->rate_pending_usleep) {
+       } else {
                long overtime = time_spent - td->rate_usec_cycle;
 
-               if (overtime > td->rate_pending_usleep)
-                       td->rate_pending_usleep = 0;
-               else
-                       td->rate_pending_usleep -= overtime;
+               td->rate_pending_usleep -= overtime;
+       }
+}
+
+int check_min_rate(struct thread_data *td, struct timeval *now)
+{
+       unsigned long spent = mtime_since(&td->start, now);
+       unsigned long rate;
+
+       /*
+        * allow a 2 second settle period in the beginning
+        */
+       if (spent < 2000)
+               return 0;
+
+       /*
+        * if rate blocks is set, sample is running
+        */
+       if (td->rate_blocks) {
+               spent = mtime_since(&td->lastrate, now);
+               if (spent < td->ratecycle)
+                       return 0;
+
+               rate = ((td->io_blocks - td->rate_blocks) * td->bs) / spent;
+               if (rate < td->ratemin) {
+                       printf("Client%d: min rate %d not met, got %ldKiB/sec\n", td->thread_number, td->ratemin, rate);
+                       return 1;
+               }
        }
+
+       td->rate_blocks = td->io_blocks;
+       memcpy(&td->lastrate, now, sizeof(*now));
+       return 0;
 }
 
+#define should_fsync(td)       ((td)->ddir == DDIR_WRITE && !(td)->odirect)
+
 void do_thread_io(struct thread_data *td)
 {
-       struct timeval s, e, start;
+       struct timeval s, e;
        char *buffer, *ptr;
        unsigned long blocks, msec, usec;
 
-       ptr = malloc(td->bs+MASK);
-       buffer = (char *) ALIGN(ptr);
+       ptr = malloc(td->bs + MASK);
+       buffer = ALIGN(ptr);
 
-       gettimeofday(&start, NULL);
+       gettimeofday(&td->start, NULL);
+
+       if (td->ratemin)
+               memcpy(&td->lastrate, &td->start, sizeof(td->start));
 
        for (blocks = 0; blocks < td->blocks; blocks++) {
                off_t offset = get_next_offset(td);
@@ -378,20 +428,25 @@ void do_thread_io(struct thread_data *td)
                        break;
                }
 
+               td->io_blocks++;
+
+               if (should_fsync(td) && td->fsync_blocks &&
+                   (td->io_blocks % td->fsync_blocks) == 0)
+                       fsync(td->fd);
+
                gettimeofday(&e, NULL);
 
                usec = utime_since(&s, &e);
-               msec = usec / 1000;
 
-               if (td->rate)
-                       rate_throttle(td, usec);
+               rate_throttle(td, usec);
 
-               add_stat_sample(td, msec);
-
-               td->blocks_read++;
+               if (check_min_rate(td, &e)) {
+                       td->error = ENODATA;
+                       break;
+               }
 
-               //if (td->ddir == DDIR_WRITE && !(td->blocks_read % 512))
-               //      fsync(td->fd);
+               msec = usec / 1000;
+               add_stat_sample(td, msec);
 
                if (msec < td->min_latency)
                        td->min_latency = msec;
@@ -399,11 +454,11 @@ void do_thread_io(struct thread_data *td)
                        td->max_latency = msec;
        }
 
-       if (td->ddir == DDIR_WRITE && !td->odirect)
+       if (should_fsync(td))
                fsync(td->fd);
 
        gettimeofday(&e, NULL);
-       td->runtime = mtime_since(&start, &e);
+       td->runtime = mtime_since(&td->start, &e);
 
        free(ptr);
 }
@@ -412,7 +467,7 @@ void *thread_main(int shm_id, int offset, char *argv[])
 {
        struct thread_data *td;
        void *data;
-       struct stat *statbuf = NULL;
+       struct stat st;
        int ret = 1, flags;
 
        data = shmat(shm_id, NULL, 0);
@@ -429,7 +484,7 @@ void *thread_main(int shm_id, int offset, char *argv[])
        printf("Thread (%s) (pid=%u) (f=%s) started\n", td->ddir == DDIR_READ ? "read" : "write", td->pid, td->file_name);
        fflush(stdout);
 
-       sprintf(argv[0], "%s%d\n", argv[0], offset);
+       sprintf(argv[0], "fio%d", offset);
 
        flags = 0;
        if (td->odirect)
@@ -451,13 +506,12 @@ void *thread_main(int shm_id, int offset, char *argv[])
                goto out;
 
        if (td->ddir == DDIR_READ) {
-               statbuf = malloc(sizeof(*statbuf));
-               if (fstat(td->fd, statbuf) == -1) {
+               if (fstat(td->fd, &st) == -1) {
                        td->error = errno;
                        goto out;
                }
 
-               td->blocks = statbuf->st_size / td->bs;
+               td->blocks = st.st_size / td->bs;
                if (!td->blocks) {
                        td->error = EINVAL;
                        goto out;
@@ -478,15 +532,13 @@ void *thread_main(int shm_id, int offset, char *argv[])
        ret = 0;
 
 out:
-       if (statbuf)
-               free(statbuf);
        shutdown_stat_file(td);
 err:
        if (td->fd != -1)
                close(td->fd);
        if (ret)
                sem_post(&startup_sem);
-       shmdt(td);
+       shmdt(data);
        return NULL;
 }
 
@@ -501,12 +553,12 @@ void show_thread_status(struct thread_data *td)
        unsigned long bw = 0;
 
        if (td->runtime)
-               bw = (td->blocks_read * td->bs) / td->runtime;
+               bw = (td->io_blocks * td->bs) / td->runtime;
 
        prio = td->ioprio & 0xff;
        prio_class = td->ioprio >> IOPRIO_CLASS_SHIFT;
 
-       printf("thread%d (%s): err=%2d, prio=%1d/%1d maxl=%5lumsec, io=%6luMiB, bw=%6luKiB/sec\n", td->thread_number, td->ddir == DDIR_READ ? " read": "write", td->error, prio_class, prio, td->max_latency, td->blocks_read * td->bs >> 20, bw);
+       printf("thread%d (%s): err=%2d, prio=%1d/%1d maxl=%5lumsec, io=%6luMiB, bw=%6luKiB/sec\n", td->thread_number, td->ddir == DDIR_READ ? " read": "write", td->error, prio_class, prio, td->max_latency, td->io_blocks * td->bs >> 20, bw);
 }
 
 void usage(char *progname)
@@ -514,17 +566,32 @@ void usage(char *progname)
        printf("%s: <-s 0/1> <-b kb> <-t sec> <-w 0/1> <-c r,w,r...> file0... fileN\n", progname);
 }
 
-void setup_rate(struct thread_data *td)
+int setup_rate(struct thread_data *td)
 {
-       int nr_reads_per_sec = td->rate * 1024 / td->bs;
+       int nr_reads_per_sec;
+
+       if (!td->rate)
+               return 0;
+
+       if (td->rate < td->ratemin) {
+               fprintf(stderr, "min rate larger than nominal rate\n");
+               return -1;
+       }
 
+       nr_reads_per_sec = td->rate * 1024 / td->bs;
        td->rate_usec_cycle = 1000000 / nr_reads_per_sec;
        td->rate_pending_usleep = 0;
+       return 0;
 }
 
 struct thread_data *get_new_job(void)
 {
-       struct thread_data *td = &threads[thread_number++];
+       struct thread_data *td;
+
+       if (thread_number >= MAX_JOBS)
+               return NULL;
+
+       td = &threads[thread_number++];
 
        td->thread_number = thread_number;
        td->ddir = DDIR_READ;
@@ -532,6 +599,7 @@ struct thread_data *get_new_job(void)
        td->odirect = 1;
        td->delay_sleep = 0;
        td->rate = 0;
+       td->ratecycle = DEF_RATE_CYCLE;
        td->sequential = sequential;
        td->ioprio = 0;
        memcpy(&td->cpumask, &def_cpumask, sizeof(td->cpumask));
@@ -539,19 +607,26 @@ struct thread_data *get_new_job(void)
        return td;
 }
 
-void add_job(struct thread_data *td, const char *filename, int prioclass,
-            int prio)
+static void put_job(struct thread_data *td)
+{
+       memset(&threads[td->thread_number - 1], 0, sizeof(*td));
+       thread_number--;
+}
+
+int add_job(struct thread_data *td, const char *filename, int prioclass,
+           int prio)
 {
        strcpy(td->file_name, filename);
        td->stat_fd = -1;
-       sem_init(&td->mutex, 1, 1);
+       sem_init(&td->mutex, 1, 0);
        td->min_latency = 10000000;
        td->ioprio = (prioclass << IOPRIO_CLASS_SHIFT) | prio;
 
-       if (td->rate)
-               setup_rate(td);
+       if (setup_rate(td))
+               return -1;
 
        printf("Client%d: file=%s, rw=%d, prio=%d, seq=%d, odir=%d, bs=%d, rate=%d\n", td->thread_number, filename, td->ddir, td->ioprio, td->sequential, td->odirect, td->bs, td->rate);
+       return 0;
 }
 
 static void fill_cpu_mask(cpu_set_t cpumask, int cpu)
@@ -587,7 +662,7 @@ void fill_option(const char *input, char *output)
  * rw=
  * direct=
  */
-int parse_jobs_cmd(int argc, char *argv[], int index)
+void parse_jobs_cmd(int argc, char *argv[], int index)
 {
        struct thread_data *td;
        unsigned int prio, prioclass, cpu;
@@ -605,11 +680,13 @@ int parse_jobs_cmd(int argc, char *argv[], int index)
                        break;
 
                filename[0] = 0;
+
                td = get_new_job();
+               if (!td)
+                       break;
 
                prioclass = 2;
                prio = 4;
-               cpu = 0;
 
                c = strstr(p, "rw=");
                if (c) {
@@ -669,6 +746,20 @@ int parse_jobs_cmd(int argc, char *argv[], int index)
                        td->rate = strtoul(string, NULL, 10);
                }
 
+               c = strstr(p, "ratemin=");
+               if (c) {
+                       c += 8;
+                       fill_option(c, string);
+                       td->ratemin = strtoul(string, NULL, 10);
+               }
+
+               c = strstr(p, "ratecycle=");
+               if (c) {
+                       c += 10;
+                       fill_option(c, string);
+                       td->ratecycle = strtoul(string, NULL, 10);
+               }
+
                c = strstr(p, "cpumask=");
                if (c) {
                        c += 8;
@@ -677,6 +768,12 @@ int parse_jobs_cmd(int argc, char *argv[], int index)
                        fill_cpu_mask(td->cpumask, cpu);
                }
 
+               c = strstr(p, "fsync=");
+               if (c) {
+                       c += 6;
+                       fill_option(c, string);
+                       td->fsync_blocks = strtoul(string, NULL, 10);
+               }
 
                c = strstr(p, "random");
                if (c)
@@ -685,12 +782,12 @@ int parse_jobs_cmd(int argc, char *argv[], int index)
                if (c)
                        td->sequential = 1;
 
-               add_job(td, filename, prioclass, prio);
+               if (add_job(td, filename, prioclass, prio))
+                       put_job(td);
        }
 
        free(string);
        free(filename);
-       return thread_number;
 }
 
 int check_int(char *p, char *name, unsigned int *val)
@@ -721,7 +818,7 @@ int is_empty(char *line)
 
 int parse_jobs_ini(char *file)
 {
-       unsigned int prioclass, prio, cpu, jobs;
+       unsigned int prioclass, prio, cpu;
        struct thread_data *td;
        char *string, *name;
        fpos_t off;
@@ -731,14 +828,12 @@ int parse_jobs_ini(char *file)
        f = fopen(file, "r");
        if (!f) {
                perror("fopen");
-               return 0;
+               return 1;
        }
 
        string = malloc(4096);
        name = malloc(256);
 
-       jobs = 0;
-
        while ((p = fgets(string, 4096, f)) != NULL) {
                if (sscanf(p, "[%s]", name) != 1)
                        continue;
@@ -746,10 +841,11 @@ int parse_jobs_ini(char *file)
                name[strlen(name) - 1] = '\0';
 
                td = get_new_job();
+               if (!td)
+                       break;
 
                prioclass = 2;
                prio = 4;
-               cpu = 0;
 
                fgetpos(f, &off);
                while ((p = fgets(string, 4096, f)) != NULL) {
@@ -780,6 +876,14 @@ int parse_jobs_ini(char *file)
                                fgetpos(f, &off);
                                continue;
                        }
+                       if (!check_int(p, "ratemin", &td->ratemin)) {
+                               fgetpos(f, &off);
+                               continue;
+                       }
+                       if (!check_int(p, "ratecycle", &td->ratecycle)) {
+                               fgetpos(f, &off);
+                               continue;
+                       }
                        if (!check_int(p, "delay", &td->delay_sleep)) {
                                fgetpos(f, &off);
                                continue;
@@ -789,6 +893,10 @@ int parse_jobs_ini(char *file)
                                fgetpos(f, &off);
                                continue;
                        }
+                       if (!check_int(p, "fsync", &td->fsync_blocks)) {
+                               fgetpos(f, &off);
+                               continue;
+                       }
                        if (!strcmp(p, "sequential")) {
                                td->sequential = 1;
                                fgetpos(f, &off);
@@ -802,13 +910,13 @@ int parse_jobs_ini(char *file)
                }
                fsetpos(f, &off);
 
-               add_job(td, name, prioclass, prio);
-               jobs++;
+               if (add_job(td, name, prioclass, prio))
+                       put_job(td);
        }
 
        free(string);
        free(name);
-       return jobs;
+       return 0;
 }
 
 int parse_options(int argc, char *argv[])
@@ -831,10 +939,18 @@ int parse_options(int argc, char *argv[])
                                parm++;
                                global_bs = atoi(parm);
                                global_bs <<= 10;
+                               if (!global_bs) {
+                                       printf("bad block size\n");
+                                       global_bs = DEF_BS;
+                               }
                                break;
                        case 't':
                                parm++;
                                timeout = atoi(parm);
+                               if (!timeout) {
+                                       printf("bad timeout\n");
+                                       timeout = DEF_TIMEOUT;
+                               }
                                break;
                        case 'w':
                                parm++;
@@ -869,14 +985,9 @@ int main(int argc, char *argv[])
        static unsigned long max_run[2], min_run[2], total_blocks[2];
        static unsigned long max_bw[2], min_bw[2], maxl[2], minl[2];
        static unsigned long read_mb, write_mb, read_agg, write_agg;
-       int i, jobs;
-
-       if (sched_getaffinity(getpid(), sizeof(def_cpumask), &def_cpumask) == -1) {
-               perror("sched_getaffinity");
-               return 1;
-       }
+       int i;
 
-       shm_id = shmget(0, (argc - 1) * sizeof(struct thread_data), IPC_CREAT | 0600);
+       shm_id = shmget(0, MAX_JOBS * sizeof(struct thread_data), IPC_CREAT | 0600);
        if (shm_id == -1) {
                perror("shmget");
                return 1;
@@ -890,27 +1001,30 @@ int main(int argc, char *argv[])
 
        atexit(free_shm);
 
+       if (sched_getaffinity(getpid(), sizeof(def_cpumask), &def_cpumask) == -1) {
+               perror("sched_getaffinity");
+               return 1;
+       }
+
        i = parse_options(argc, argv);
 
-       if (ini_file)
-               jobs = parse_jobs_ini(ini_file);
-       else
-               jobs = parse_jobs_cmd(argc, argv, i);
+       if (ini_file) {
+               if (parse_jobs_ini(ini_file))
+                       return 1;
+       } else
+               parse_jobs_cmd(argc, argv, i);
 
-       if (global_bs <= 0)
-               global_bs = BS;
-       if (timeout <= 0)
-               timeout = TIMEOUT;
+       if (!thread_number) {
+               printf("Nothing to do\n");
+               return 1;
+       }
 
        printf("%s: %s, bs=%uKiB, timeo=%u, write_stat=%u, odirect=%d\n", argv[0], sequential ? "sequential" : "random", global_bs >> 10, timeout, write_stat, odirect);
 
-       if (!jobs) {
-               printf("Nothing to do\n");
-               return 1;
-       } else
-               printf("%d Clients configured\n", jobs);
+       printf("Starting %d threads\n", thread_number);
+       fflush(stdout);
 
-       for (i = 0; i < jobs; i++) {
+       for (i = 0; i < thread_number; i++) {
                sem_init(&startup_sem, 1, 1);
 
                if (fork())
@@ -921,15 +1035,11 @@ int main(int argc, char *argv[])
                }
        }
 
-       if (!thread_number) {
-               usage(argv[0]);
-               return 1;
-       }
-
        signal(SIGALRM, sig_handler);
        alarm(timeout);
 
-       printf("Starting %d threads\n", thread_number);
+       printf("Kicking off %d threads\n", thread_number);
+       fflush(stdout);
        for (i = 0; i < thread_number; i++) {
                struct thread_data *td = &threads[i];
 
@@ -958,7 +1068,7 @@ int main(int argc, char *argv[])
                        max_run[td->ddir] = td->runtime;
 
                if (td->runtime)
-                       bw = (td->blocks_read * td->bs) / td->runtime;
+                       bw = (td->io_blocks * td->bs) / td->runtime;
                if (bw < min_bw[td->ddir])
                        min_bw[td->ddir] = bw;
                if (bw > max_bw[td->ddir])
@@ -968,17 +1078,17 @@ int main(int argc, char *argv[])
                if (td->max_latency > maxl[td->ddir])
                        maxl[td->ddir] = td->max_latency;
 
-               total_blocks[td->ddir] += td->blocks_read;
+               total_blocks[td->ddir] += td->io_blocks;
 
                if (td->ddir == DDIR_READ) {
-                       read_mb += (td->bs * td->blocks_read) >> 20;
+                       read_mb += (td->bs * td->io_blocks) >> 20;
                        if (td->runtime)
-                               read_agg += (td->blocks_read * td->bs) / td->runtime;
+                               read_agg += (td->io_blocks * td->bs) / td->runtime;
                }
                if (td->ddir == DDIR_WRITE) {
-                       write_mb += (td->bs * td->blocks_read) >> 20;
+                       write_mb += (td->bs * td->io_blocks) >> 20;
                        if (td->runtime)
-                               write_agg += (td->blocks_read * td->bs) / td->runtime;
+                               write_agg += (td->io_blocks * td->bs) / td->runtime;
                }
 
 show_stat: