From: Jens Axboe Date: Wed, 19 Oct 2005 11:42:09 +0000 (+0200) Subject: [PATCH] fio: Add ratemin/ratecycle/fsync options and lots of cleanups X-Git-Url: https://git.kernel.dk/?a=commitdiff_plain;h=4240cfa1bc78e23a2743f23f78ffddc9ce5d8230;p=disktools.git [PATCH] fio: Add ratemin/ratecycle/fsync options and lots of cleanups - Define max jobs and allocate threads in advance, was buggy - ->mutex should default to locked for proper synced startup - Add fsync option for fsync'ing for every N blocks on non-direct writes - Add ratemin option for quitting the thread if min rate isn't met - Add ratecycle option for setting rate average period - Lots of cleanups --- diff --git a/README.fio b/README.fio index 8430f0c..abe06f2 100644 --- a/README.fio +++ b/README.fio @@ -32,7 +32,10 @@ The format is as follows: random IO is randomized sequential IO is sequential rate=x Throttle rate to x KiB/sec + ratemin=x Quit if rate of x KiB/sec can't be met + ratecycle=x ratemin averaged over x msecs cpumask=x Allow job to run on CPUs defined by mask + fsync=x If writing, fsync after every x blocks have been written Examples using cmd line jobs diff --git a/fio.c b/fio.c index 51163ad..d0d1a37 100644 --- a/fio.c +++ b/fio.c @@ -37,6 +37,8 @@ #include #include +#define MAX_JOBS (1024) + /* * assume we don't have _get either, if _set isn't defined */ @@ -79,28 +81,36 @@ enum { #define IOPRIO_CLASS_SHIFT 13 -#define BS (4096) #define MASK (4095) -#define TIMEOUT (30) +#define DEF_BS (4096) +#define DEF_TIMEOUT (30) +#define DEF_RATE_CYCLE (1000) +#define DEF_ODIRECT (1) +#define DEF_SEQUENTIAL (1) +#define DEF_WRITESTAT (0) +#define DEF_RAND_REPEAT (1) + +#define ALIGN(buf) (char *) (((unsigned long) (buf) + MASK) & ~(MASK)) -#define ALIGN(buf) (((unsigned long) (buf) + MASK) & ~(MASK)) +static int sequential = DEF_SEQUENTIAL; +static int write_stat = DEF_WRITESTAT; +static int repeatable = DEF_RAND_REPEAT; +static int timeout = DEF_TIMEOUT; +static int odirect = DEF_ODIRECT; +static int global_bs = DEF_BS; -static int sequential = 1; -static int write_stat = 0; -static int repeatable = 1; static int thread_number; -static int timeout = TIMEOUT; -static int odirect = 1; -static int global_bs = BS; static char *ini_file; static int shm_id; static cpu_set_t def_cpumask; -#define DDIR_READ (0) -#define DDIR_WRITE (1) +enum { + DDIR_READ = 0, + DDIR_WRITE, +}; struct thread_data { char file_name[256]; @@ -109,24 +119,29 @@ struct thread_data { int fd; int stat_fd; pid_t pid; - int terminate; + volatile int terminate; unsigned int ddir; unsigned int ioprio; unsigned int sequential; unsigned int bs; unsigned int odirect; unsigned int delay_sleep; + unsigned int fsync_blocks; cpu_set_t cpumask; unsigned int rate; - unsigned int rate_usec_cycle; - unsigned int rate_pending_usleep; + unsigned int ratemin; + unsigned int ratecycle; + unsigned long rate_usec_cycle; + long rate_pending_usleep; + unsigned long rate_blocks; + struct timeval lastrate; unsigned long max_latency; /* msec */ unsigned long min_latency; /* msec */ unsigned long runtime; /* sec */ unsigned long blocks; - unsigned long blocks_read; + unsigned long io_blocks; unsigned long last_block; sem_t mutex; struct drand48_data random_state; @@ -137,6 +152,8 @@ struct thread_data { unsigned long stat_time; unsigned long stat_time_last; unsigned long stat_blocks_last; + + struct timeval start; }; static struct thread_data *threads; @@ -192,21 +209,18 @@ void shutdown_stat_file(struct thread_data *td) int init_stat_file(struct thread_data *td) { - char *n; + char n[256]; if (!write_stat) return 0; - n = malloc(256); sprintf(n, "%s.stat", td->file_name); td->stat_fd = open(n, O_WRONLY | O_CREAT | O_TRUNC, 0644); if (td->stat_fd == -1) { - free(n); td->error = errno; return 1; } - free(n); return 0; } @@ -285,7 +299,7 @@ void add_stat_sample(struct thread_data *td, unsigned long msec) return; #if 0 - sprintf(sample, "%lu, %lu\n", td->blocks_read, msec); + sprintf(sample, "%lu, %lu\n", td->io_blocks, msec); write(td->stat_fd, sample, strlen(sample)); #else td->stat_time += msec; @@ -298,7 +312,7 @@ void add_stat_sample(struct thread_data *td, unsigned long msec) td->stat_time_last = 0; td->stat_blocks_last = 0; sprintf(sample, "%lu, %lu\n", td->stat_time, rate); - //sprintf(sample, "%lu, %lu\n", td->blocks_read, msec); + //sprintf(sample, "%lu, %lu\n", td->io_blocks, msec); write(td->stat_fd, sample, strlen(sample)); } #endif @@ -321,6 +335,9 @@ void usec_sleep(int usec) void rate_throttle(struct thread_data *td, unsigned long time_spent) { + if (!td->rate) + return; + if (time_spent < td->rate_usec_cycle) { unsigned long s = td->rate_usec_cycle - time_spent; @@ -329,26 +346,59 @@ void rate_throttle(struct thread_data *td, unsigned long time_spent) usec_sleep(td->rate_pending_usleep); td->rate_pending_usleep = 0; } - } else if (td->rate_pending_usleep) { + } else { long overtime = time_spent - td->rate_usec_cycle; - if (overtime > td->rate_pending_usleep) - td->rate_pending_usleep = 0; - else - td->rate_pending_usleep -= overtime; + td->rate_pending_usleep -= overtime; + } +} + +int check_min_rate(struct thread_data *td, struct timeval *now) +{ + unsigned long spent = mtime_since(&td->start, now); + unsigned long rate; + + /* + * allow a 2 second settle period in the beginning + */ + if (spent < 2000) + return 0; + + /* + * if rate blocks is set, sample is running + */ + if (td->rate_blocks) { + spent = mtime_since(&td->lastrate, now); + if (spent < td->ratecycle) + return 0; + + rate = ((td->io_blocks - td->rate_blocks) * td->bs) / spent; + if (rate < td->ratemin) { + printf("Client%d: min rate %d not met, got %ldKiB/sec\n", td->thread_number, td->ratemin, rate); + return 1; + } } + + td->rate_blocks = td->io_blocks; + memcpy(&td->lastrate, now, sizeof(*now)); + return 0; } +#define should_fsync(td) ((td)->ddir == DDIR_WRITE && !(td)->odirect) + void do_thread_io(struct thread_data *td) { - struct timeval s, e, start; + struct timeval s, e; char *buffer, *ptr; unsigned long blocks, msec, usec; - ptr = malloc(td->bs+MASK); - buffer = (char *) ALIGN(ptr); + ptr = malloc(td->bs + MASK); + buffer = ALIGN(ptr); - gettimeofday(&start, NULL); + gettimeofday(&td->start, NULL); + + if (td->ratemin) + memcpy(&td->lastrate, &td->start, sizeof(td->start)); for (blocks = 0; blocks < td->blocks; blocks++) { off_t offset = get_next_offset(td); @@ -378,20 +428,25 @@ void do_thread_io(struct thread_data *td) break; } + td->io_blocks++; + + if (should_fsync(td) && td->fsync_blocks && + (td->io_blocks % td->fsync_blocks) == 0) + fsync(td->fd); + gettimeofday(&e, NULL); usec = utime_since(&s, &e); - msec = usec / 1000; - if (td->rate) - rate_throttle(td, usec); + rate_throttle(td, usec); - add_stat_sample(td, msec); - - td->blocks_read++; + if (check_min_rate(td, &e)) { + td->error = ENODATA; + break; + } - //if (td->ddir == DDIR_WRITE && !(td->blocks_read % 512)) - // fsync(td->fd); + msec = usec / 1000; + add_stat_sample(td, msec); if (msec < td->min_latency) td->min_latency = msec; @@ -399,11 +454,11 @@ void do_thread_io(struct thread_data *td) td->max_latency = msec; } - if (td->ddir == DDIR_WRITE && !td->odirect) + if (should_fsync(td)) fsync(td->fd); gettimeofday(&e, NULL); - td->runtime = mtime_since(&start, &e); + td->runtime = mtime_since(&td->start, &e); free(ptr); } @@ -412,7 +467,7 @@ void *thread_main(int shm_id, int offset, char *argv[]) { struct thread_data *td; void *data; - struct stat *statbuf = NULL; + struct stat st; int ret = 1, flags; data = shmat(shm_id, NULL, 0); @@ -429,7 +484,7 @@ void *thread_main(int shm_id, int offset, char *argv[]) printf("Thread (%s) (pid=%u) (f=%s) started\n", td->ddir == DDIR_READ ? "read" : "write", td->pid, td->file_name); fflush(stdout); - sprintf(argv[0], "%s%d\n", argv[0], offset); + sprintf(argv[0], "fio%d", offset); flags = 0; if (td->odirect) @@ -451,13 +506,12 @@ void *thread_main(int shm_id, int offset, char *argv[]) goto out; if (td->ddir == DDIR_READ) { - statbuf = malloc(sizeof(*statbuf)); - if (fstat(td->fd, statbuf) == -1) { + if (fstat(td->fd, &st) == -1) { td->error = errno; goto out; } - td->blocks = statbuf->st_size / td->bs; + td->blocks = st.st_size / td->bs; if (!td->blocks) { td->error = EINVAL; goto out; @@ -478,15 +532,13 @@ void *thread_main(int shm_id, int offset, char *argv[]) ret = 0; out: - if (statbuf) - free(statbuf); shutdown_stat_file(td); err: if (td->fd != -1) close(td->fd); if (ret) sem_post(&startup_sem); - shmdt(td); + shmdt(data); return NULL; } @@ -501,12 +553,12 @@ void show_thread_status(struct thread_data *td) unsigned long bw = 0; if (td->runtime) - bw = (td->blocks_read * td->bs) / td->runtime; + bw = (td->io_blocks * td->bs) / td->runtime; prio = td->ioprio & 0xff; prio_class = td->ioprio >> IOPRIO_CLASS_SHIFT; - printf("thread%d (%s): err=%2d, prio=%1d/%1d maxl=%5lumsec, io=%6luMiB, bw=%6luKiB/sec\n", td->thread_number, td->ddir == DDIR_READ ? " read": "write", td->error, prio_class, prio, td->max_latency, td->blocks_read * td->bs >> 20, bw); + printf("thread%d (%s): err=%2d, prio=%1d/%1d maxl=%5lumsec, io=%6luMiB, bw=%6luKiB/sec\n", td->thread_number, td->ddir == DDIR_READ ? " read": "write", td->error, prio_class, prio, td->max_latency, td->io_blocks * td->bs >> 20, bw); } void usage(char *progname) @@ -514,17 +566,32 @@ void usage(char *progname) printf("%s: <-s 0/1> <-b kb> <-t sec> <-w 0/1> <-c r,w,r...> file0... fileN\n", progname); } -void setup_rate(struct thread_data *td) +int setup_rate(struct thread_data *td) { - int nr_reads_per_sec = td->rate * 1024 / td->bs; + int nr_reads_per_sec; + + if (!td->rate) + return 0; + + if (td->rate < td->ratemin) { + fprintf(stderr, "min rate larger than nominal rate\n"); + return -1; + } + nr_reads_per_sec = td->rate * 1024 / td->bs; td->rate_usec_cycle = 1000000 / nr_reads_per_sec; td->rate_pending_usleep = 0; + return 0; } struct thread_data *get_new_job(void) { - struct thread_data *td = &threads[thread_number++]; + struct thread_data *td; + + if (thread_number >= MAX_JOBS) + return NULL; + + td = &threads[thread_number++]; td->thread_number = thread_number; td->ddir = DDIR_READ; @@ -532,6 +599,7 @@ struct thread_data *get_new_job(void) td->odirect = 1; td->delay_sleep = 0; td->rate = 0; + td->ratecycle = DEF_RATE_CYCLE; td->sequential = sequential; td->ioprio = 0; memcpy(&td->cpumask, &def_cpumask, sizeof(td->cpumask)); @@ -539,19 +607,26 @@ struct thread_data *get_new_job(void) return td; } -void add_job(struct thread_data *td, const char *filename, int prioclass, - int prio) +static void put_job(struct thread_data *td) +{ + memset(&threads[td->thread_number - 1], 0, sizeof(*td)); + thread_number--; +} + +int add_job(struct thread_data *td, const char *filename, int prioclass, + int prio) { strcpy(td->file_name, filename); td->stat_fd = -1; - sem_init(&td->mutex, 1, 1); + sem_init(&td->mutex, 1, 0); td->min_latency = 10000000; td->ioprio = (prioclass << IOPRIO_CLASS_SHIFT) | prio; - if (td->rate) - setup_rate(td); + if (setup_rate(td)) + return -1; printf("Client%d: file=%s, rw=%d, prio=%d, seq=%d, odir=%d, bs=%d, rate=%d\n", td->thread_number, filename, td->ddir, td->ioprio, td->sequential, td->odirect, td->bs, td->rate); + return 0; } static void fill_cpu_mask(cpu_set_t cpumask, int cpu) @@ -587,7 +662,7 @@ void fill_option(const char *input, char *output) * rw= * direct= */ -int parse_jobs_cmd(int argc, char *argv[], int index) +void parse_jobs_cmd(int argc, char *argv[], int index) { struct thread_data *td; unsigned int prio, prioclass, cpu; @@ -605,11 +680,13 @@ int parse_jobs_cmd(int argc, char *argv[], int index) break; filename[0] = 0; + td = get_new_job(); + if (!td) + break; prioclass = 2; prio = 4; - cpu = 0; c = strstr(p, "rw="); if (c) { @@ -669,6 +746,20 @@ int parse_jobs_cmd(int argc, char *argv[], int index) td->rate = strtoul(string, NULL, 10); } + c = strstr(p, "ratemin="); + if (c) { + c += 8; + fill_option(c, string); + td->ratemin = strtoul(string, NULL, 10); + } + + c = strstr(p, "ratecycle="); + if (c) { + c += 10; + fill_option(c, string); + td->ratecycle = strtoul(string, NULL, 10); + } + c = strstr(p, "cpumask="); if (c) { c += 8; @@ -677,6 +768,12 @@ int parse_jobs_cmd(int argc, char *argv[], int index) fill_cpu_mask(td->cpumask, cpu); } + c = strstr(p, "fsync="); + if (c) { + c += 6; + fill_option(c, string); + td->fsync_blocks = strtoul(string, NULL, 10); + } c = strstr(p, "random"); if (c) @@ -685,12 +782,12 @@ int parse_jobs_cmd(int argc, char *argv[], int index) if (c) td->sequential = 1; - add_job(td, filename, prioclass, prio); + if (add_job(td, filename, prioclass, prio)) + put_job(td); } free(string); free(filename); - return thread_number; } int check_int(char *p, char *name, unsigned int *val) @@ -721,7 +818,7 @@ int is_empty(char *line) int parse_jobs_ini(char *file) { - unsigned int prioclass, prio, cpu, jobs; + unsigned int prioclass, prio, cpu; struct thread_data *td; char *string, *name; fpos_t off; @@ -731,14 +828,12 @@ int parse_jobs_ini(char *file) f = fopen(file, "r"); if (!f) { perror("fopen"); - return 0; + return 1; } string = malloc(4096); name = malloc(256); - jobs = 0; - while ((p = fgets(string, 4096, f)) != NULL) { if (sscanf(p, "[%s]", name) != 1) continue; @@ -746,10 +841,11 @@ int parse_jobs_ini(char *file) name[strlen(name) - 1] = '\0'; td = get_new_job(); + if (!td) + break; prioclass = 2; prio = 4; - cpu = 0; fgetpos(f, &off); while ((p = fgets(string, 4096, f)) != NULL) { @@ -780,6 +876,14 @@ int parse_jobs_ini(char *file) fgetpos(f, &off); continue; } + if (!check_int(p, "ratemin", &td->ratemin)) { + fgetpos(f, &off); + continue; + } + if (!check_int(p, "ratecycle", &td->ratecycle)) { + fgetpos(f, &off); + continue; + } if (!check_int(p, "delay", &td->delay_sleep)) { fgetpos(f, &off); continue; @@ -789,6 +893,10 @@ int parse_jobs_ini(char *file) fgetpos(f, &off); continue; } + if (!check_int(p, "fsync", &td->fsync_blocks)) { + fgetpos(f, &off); + continue; + } if (!strcmp(p, "sequential")) { td->sequential = 1; fgetpos(f, &off); @@ -802,13 +910,13 @@ int parse_jobs_ini(char *file) } fsetpos(f, &off); - add_job(td, name, prioclass, prio); - jobs++; + if (add_job(td, name, prioclass, prio)) + put_job(td); } free(string); free(name); - return jobs; + return 0; } int parse_options(int argc, char *argv[]) @@ -831,10 +939,18 @@ int parse_options(int argc, char *argv[]) parm++; global_bs = atoi(parm); global_bs <<= 10; + if (!global_bs) { + printf("bad block size\n"); + global_bs = DEF_BS; + } break; case 't': parm++; timeout = atoi(parm); + if (!timeout) { + printf("bad timeout\n"); + timeout = DEF_TIMEOUT; + } break; case 'w': parm++; @@ -869,14 +985,9 @@ int main(int argc, char *argv[]) static unsigned long max_run[2], min_run[2], total_blocks[2]; static unsigned long max_bw[2], min_bw[2], maxl[2], minl[2]; static unsigned long read_mb, write_mb, read_agg, write_agg; - int i, jobs; - - if (sched_getaffinity(getpid(), sizeof(def_cpumask), &def_cpumask) == -1) { - perror("sched_getaffinity"); - return 1; - } + int i; - shm_id = shmget(0, (argc - 1) * sizeof(struct thread_data), IPC_CREAT | 0600); + shm_id = shmget(0, MAX_JOBS * sizeof(struct thread_data), IPC_CREAT | 0600); if (shm_id == -1) { perror("shmget"); return 1; @@ -890,27 +1001,30 @@ int main(int argc, char *argv[]) atexit(free_shm); + if (sched_getaffinity(getpid(), sizeof(def_cpumask), &def_cpumask) == -1) { + perror("sched_getaffinity"); + return 1; + } + i = parse_options(argc, argv); - if (ini_file) - jobs = parse_jobs_ini(ini_file); - else - jobs = parse_jobs_cmd(argc, argv, i); + if (ini_file) { + if (parse_jobs_ini(ini_file)) + return 1; + } else + parse_jobs_cmd(argc, argv, i); - if (global_bs <= 0) - global_bs = BS; - if (timeout <= 0) - timeout = TIMEOUT; + if (!thread_number) { + printf("Nothing to do\n"); + return 1; + } printf("%s: %s, bs=%uKiB, timeo=%u, write_stat=%u, odirect=%d\n", argv[0], sequential ? "sequential" : "random", global_bs >> 10, timeout, write_stat, odirect); - if (!jobs) { - printf("Nothing to do\n"); - return 1; - } else - printf("%d Clients configured\n", jobs); + printf("Starting %d threads\n", thread_number); + fflush(stdout); - for (i = 0; i < jobs; i++) { + for (i = 0; i < thread_number; i++) { sem_init(&startup_sem, 1, 1); if (fork()) @@ -921,15 +1035,11 @@ int main(int argc, char *argv[]) } } - if (!thread_number) { - usage(argv[0]); - return 1; - } - signal(SIGALRM, sig_handler); alarm(timeout); - printf("Starting %d threads\n", thread_number); + printf("Kicking off %d threads\n", thread_number); + fflush(stdout); for (i = 0; i < thread_number; i++) { struct thread_data *td = &threads[i]; @@ -958,7 +1068,7 @@ int main(int argc, char *argv[]) max_run[td->ddir] = td->runtime; if (td->runtime) - bw = (td->blocks_read * td->bs) / td->runtime; + bw = (td->io_blocks * td->bs) / td->runtime; if (bw < min_bw[td->ddir]) min_bw[td->ddir] = bw; if (bw > max_bw[td->ddir]) @@ -968,17 +1078,17 @@ int main(int argc, char *argv[]) if (td->max_latency > maxl[td->ddir]) maxl[td->ddir] = td->max_latency; - total_blocks[td->ddir] += td->blocks_read; + total_blocks[td->ddir] += td->io_blocks; if (td->ddir == DDIR_READ) { - read_mb += (td->bs * td->blocks_read) >> 20; + read_mb += (td->bs * td->io_blocks) >> 20; if (td->runtime) - read_agg += (td->blocks_read * td->bs) / td->runtime; + read_agg += (td->io_blocks * td->bs) / td->runtime; } if (td->ddir == DDIR_WRITE) { - write_mb += (td->bs * td->blocks_read) >> 20; + write_mb += (td->bs * td->io_blocks) >> 20; if (td->runtime) - write_agg += (td->blocks_read * td->bs) / td->runtime; + write_agg += (td->io_blocks * td->bs) / td->runtime; } show_stat: