#include <sys/shm.h>
#include <asm/unistd.h>
+#define MAX_JOBS (1024)
+
/*
* assume we don't have _get either, if _set isn't defined
*/
#define IOPRIO_CLASS_SHIFT 13
-#define BS (4096)
#define MASK (4095)
-#define TIMEOUT (30)
+#define DEF_BS (4096)
+#define DEF_TIMEOUT (30)
+#define DEF_RATE_CYCLE (1000)
+#define DEF_ODIRECT (1)
+#define DEF_SEQUENTIAL (1)
+#define DEF_WRITESTAT (0)
+#define DEF_RAND_REPEAT (1)
+
+#define ALIGN(buf) (char *) (((unsigned long) (buf) + MASK) & ~(MASK))
-#define ALIGN(buf) (((unsigned long) (buf) + MASK) & ~(MASK))
+static int sequential = DEF_SEQUENTIAL;
+static int write_stat = DEF_WRITESTAT;
+static int repeatable = DEF_RAND_REPEAT;
+static int timeout = DEF_TIMEOUT;
+static int odirect = DEF_ODIRECT;
+static int global_bs = DEF_BS;
-static int sequential = 1;
-static int write_stat = 0;
-static int repeatable = 1;
static int thread_number;
-static int timeout = TIMEOUT;
-static int odirect = 1;
-static int global_bs = BS;
static char *ini_file;
static int shm_id;
static cpu_set_t def_cpumask;
-#define DDIR_READ (0)
-#define DDIR_WRITE (1)
+enum {
+ DDIR_READ = 0,
+ DDIR_WRITE,
+};
struct thread_data {
char file_name[256];
int fd;
int stat_fd;
pid_t pid;
- int terminate;
+ volatile int terminate;
unsigned int ddir;
unsigned int ioprio;
unsigned int sequential;
unsigned int bs;
unsigned int odirect;
unsigned int delay_sleep;
+ unsigned int fsync_blocks;
cpu_set_t cpumask;
unsigned int rate;
- unsigned int rate_usec_cycle;
- unsigned int rate_pending_usleep;
+ unsigned int ratemin;
+ unsigned int ratecycle;
+ unsigned long rate_usec_cycle;
+ long rate_pending_usleep;
+ unsigned long rate_blocks;
+ struct timeval lastrate;
unsigned long max_latency; /* msec */
unsigned long min_latency; /* msec */
unsigned long runtime; /* sec */
unsigned long blocks;
- unsigned long blocks_read;
+ unsigned long io_blocks;
unsigned long last_block;
sem_t mutex;
struct drand48_data random_state;
unsigned long stat_time;
unsigned long stat_time_last;
unsigned long stat_blocks_last;
+
+ struct timeval start;
};
static struct thread_data *threads;
int init_stat_file(struct thread_data *td)
{
- char *n;
+ char n[256];
if (!write_stat)
return 0;
- n = malloc(256);
sprintf(n, "%s.stat", td->file_name);
td->stat_fd = open(n, O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (td->stat_fd == -1) {
- free(n);
td->error = errno;
return 1;
}
- free(n);
return 0;
}
return;
#if 0
- sprintf(sample, "%lu, %lu\n", td->blocks_read, msec);
+ sprintf(sample, "%lu, %lu\n", td->io_blocks, msec);
write(td->stat_fd, sample, strlen(sample));
#else
td->stat_time += msec;
td->stat_time_last = 0;
td->stat_blocks_last = 0;
sprintf(sample, "%lu, %lu\n", td->stat_time, rate);
- //sprintf(sample, "%lu, %lu\n", td->blocks_read, msec);
+ //sprintf(sample, "%lu, %lu\n", td->io_blocks, msec);
write(td->stat_fd, sample, strlen(sample));
}
#endif
void rate_throttle(struct thread_data *td, unsigned long time_spent)
{
+ if (!td->rate)
+ return;
+
if (time_spent < td->rate_usec_cycle) {
unsigned long s = td->rate_usec_cycle - time_spent;
usec_sleep(td->rate_pending_usleep);
td->rate_pending_usleep = 0;
}
- } else if (td->rate_pending_usleep) {
+ } else {
long overtime = time_spent - td->rate_usec_cycle;
- if (overtime > td->rate_pending_usleep)
- td->rate_pending_usleep = 0;
- else
- td->rate_pending_usleep -= overtime;
+ td->rate_pending_usleep -= overtime;
+ }
+}
+
+int check_min_rate(struct thread_data *td, struct timeval *now)
+{
+ unsigned long spent = mtime_since(&td->start, now);
+ unsigned long rate;
+
+ /*
+ * allow a 2 second settle period in the beginning
+ */
+ if (spent < 2000)
+ return 0;
+
+ /*
+ * if rate blocks is set, sample is running
+ */
+ if (td->rate_blocks) {
+ spent = mtime_since(&td->lastrate, now);
+ if (spent < td->ratecycle)
+ return 0;
+
+ rate = ((td->io_blocks - td->rate_blocks) * td->bs) / spent;
+ if (rate < td->ratemin) {
+ printf("Client%d: min rate %d not met, got %ldKiB/sec\n", td->thread_number, td->ratemin, rate);
+ return 1;
+ }
}
+
+ td->rate_blocks = td->io_blocks;
+ memcpy(&td->lastrate, now, sizeof(*now));
+ return 0;
}
+#define should_fsync(td) ((td)->ddir == DDIR_WRITE && !(td)->odirect)
+
void do_thread_io(struct thread_data *td)
{
- struct timeval s, e, start;
+ struct timeval s, e;
char *buffer, *ptr;
unsigned long blocks, msec, usec;
- ptr = malloc(td->bs+MASK);
- buffer = (char *) ALIGN(ptr);
+ ptr = malloc(td->bs + MASK);
+ buffer = ALIGN(ptr);
- gettimeofday(&start, NULL);
+ gettimeofday(&td->start, NULL);
+
+ if (td->ratemin)
+ memcpy(&td->lastrate, &td->start, sizeof(td->start));
for (blocks = 0; blocks < td->blocks; blocks++) {
off_t offset = get_next_offset(td);
break;
}
+ td->io_blocks++;
+
+ if (should_fsync(td) && td->fsync_blocks &&
+ (td->io_blocks % td->fsync_blocks) == 0)
+ fsync(td->fd);
+
gettimeofday(&e, NULL);
usec = utime_since(&s, &e);
- msec = usec / 1000;
- if (td->rate)
- rate_throttle(td, usec);
+ rate_throttle(td, usec);
- add_stat_sample(td, msec);
-
- td->blocks_read++;
+ if (check_min_rate(td, &e)) {
+ td->error = ENODATA;
+ break;
+ }
- //if (td->ddir == DDIR_WRITE && !(td->blocks_read % 512))
- // fsync(td->fd);
+ msec = usec / 1000;
+ add_stat_sample(td, msec);
if (msec < td->min_latency)
td->min_latency = msec;
td->max_latency = msec;
}
- if (td->ddir == DDIR_WRITE && !td->odirect)
+ if (should_fsync(td))
fsync(td->fd);
gettimeofday(&e, NULL);
- td->runtime = mtime_since(&start, &e);
+ td->runtime = mtime_since(&td->start, &e);
free(ptr);
}
{
struct thread_data *td;
void *data;
- struct stat *statbuf = NULL;
+ struct stat st;
int ret = 1, flags;
data = shmat(shm_id, NULL, 0);
printf("Thread (%s) (pid=%u) (f=%s) started\n", td->ddir == DDIR_READ ? "read" : "write", td->pid, td->file_name);
fflush(stdout);
- sprintf(argv[0], "%s%d\n", argv[0], offset);
+ sprintf(argv[0], "fio%d", offset);
flags = 0;
if (td->odirect)
goto out;
if (td->ddir == DDIR_READ) {
- statbuf = malloc(sizeof(*statbuf));
- if (fstat(td->fd, statbuf) == -1) {
+ if (fstat(td->fd, &st) == -1) {
td->error = errno;
goto out;
}
- td->blocks = statbuf->st_size / td->bs;
+ td->blocks = st.st_size / td->bs;
if (!td->blocks) {
td->error = EINVAL;
goto out;
ret = 0;
out:
- if (statbuf)
- free(statbuf);
shutdown_stat_file(td);
err:
if (td->fd != -1)
close(td->fd);
if (ret)
sem_post(&startup_sem);
- shmdt(td);
+ shmdt(data);
return NULL;
}
unsigned long bw = 0;
if (td->runtime)
- bw = (td->blocks_read * td->bs) / td->runtime;
+ bw = (td->io_blocks * td->bs) / td->runtime;
prio = td->ioprio & 0xff;
prio_class = td->ioprio >> IOPRIO_CLASS_SHIFT;
- printf("thread%d (%s): err=%2d, prio=%1d/%1d maxl=%5lumsec, io=%6luMiB, bw=%6luKiB/sec\n", td->thread_number, td->ddir == DDIR_READ ? " read": "write", td->error, prio_class, prio, td->max_latency, td->blocks_read * td->bs >> 20, bw);
+ printf("thread%d (%s): err=%2d, prio=%1d/%1d maxl=%5lumsec, io=%6luMiB, bw=%6luKiB/sec\n", td->thread_number, td->ddir == DDIR_READ ? " read": "write", td->error, prio_class, prio, td->max_latency, td->io_blocks * td->bs >> 20, bw);
}
void usage(char *progname)
printf("%s: <-s 0/1> <-b kb> <-t sec> <-w 0/1> <-c r,w,r...> file0... fileN\n", progname);
}
-void setup_rate(struct thread_data *td)
+int setup_rate(struct thread_data *td)
{
- int nr_reads_per_sec = td->rate * 1024 / td->bs;
+ int nr_reads_per_sec;
+
+ if (!td->rate)
+ return 0;
+
+ if (td->rate < td->ratemin) {
+ fprintf(stderr, "min rate larger than nominal rate\n");
+ return -1;
+ }
+ nr_reads_per_sec = td->rate * 1024 / td->bs;
td->rate_usec_cycle = 1000000 / nr_reads_per_sec;
td->rate_pending_usleep = 0;
+ return 0;
}
struct thread_data *get_new_job(void)
{
- struct thread_data *td = &threads[thread_number++];
+ struct thread_data *td;
+
+ if (thread_number >= MAX_JOBS)
+ return NULL;
+
+ td = &threads[thread_number++];
td->thread_number = thread_number;
td->ddir = DDIR_READ;
td->odirect = 1;
td->delay_sleep = 0;
td->rate = 0;
+ td->ratecycle = DEF_RATE_CYCLE;
td->sequential = sequential;
td->ioprio = 0;
memcpy(&td->cpumask, &def_cpumask, sizeof(td->cpumask));
return td;
}
-void add_job(struct thread_data *td, const char *filename, int prioclass,
- int prio)
+static void put_job(struct thread_data *td)
+{
+ memset(&threads[td->thread_number - 1], 0, sizeof(*td));
+ thread_number--;
+}
+
+int add_job(struct thread_data *td, const char *filename, int prioclass,
+ int prio)
{
strcpy(td->file_name, filename);
td->stat_fd = -1;
- sem_init(&td->mutex, 1, 1);
+ sem_init(&td->mutex, 1, 0);
td->min_latency = 10000000;
td->ioprio = (prioclass << IOPRIO_CLASS_SHIFT) | prio;
- if (td->rate)
- setup_rate(td);
+ if (setup_rate(td))
+ return -1;
printf("Client%d: file=%s, rw=%d, prio=%d, seq=%d, odir=%d, bs=%d, rate=%d\n", td->thread_number, filename, td->ddir, td->ioprio, td->sequential, td->odirect, td->bs, td->rate);
+ return 0;
}
static void fill_cpu_mask(cpu_set_t cpumask, int cpu)
* rw=
* direct=
*/
-int parse_jobs_cmd(int argc, char *argv[], int index)
+void parse_jobs_cmd(int argc, char *argv[], int index)
{
struct thread_data *td;
unsigned int prio, prioclass, cpu;
break;
filename[0] = 0;
+
td = get_new_job();
+ if (!td)
+ break;
prioclass = 2;
prio = 4;
- cpu = 0;
c = strstr(p, "rw=");
if (c) {
td->rate = strtoul(string, NULL, 10);
}
+ c = strstr(p, "ratemin=");
+ if (c) {
+ c += 8;
+ fill_option(c, string);
+ td->ratemin = strtoul(string, NULL, 10);
+ }
+
+ c = strstr(p, "ratecycle=");
+ if (c) {
+ c += 10;
+ fill_option(c, string);
+ td->ratecycle = strtoul(string, NULL, 10);
+ }
+
c = strstr(p, "cpumask=");
if (c) {
c += 8;
fill_cpu_mask(td->cpumask, cpu);
}
+ c = strstr(p, "fsync=");
+ if (c) {
+ c += 6;
+ fill_option(c, string);
+ td->fsync_blocks = strtoul(string, NULL, 10);
+ }
c = strstr(p, "random");
if (c)
if (c)
td->sequential = 1;
- add_job(td, filename, prioclass, prio);
+ if (add_job(td, filename, prioclass, prio))
+ put_job(td);
}
free(string);
free(filename);
- return thread_number;
}
int check_int(char *p, char *name, unsigned int *val)
int parse_jobs_ini(char *file)
{
- unsigned int prioclass, prio, cpu, jobs;
+ unsigned int prioclass, prio, cpu;
struct thread_data *td;
char *string, *name;
fpos_t off;
f = fopen(file, "r");
if (!f) {
perror("fopen");
- return 0;
+ return 1;
}
string = malloc(4096);
name = malloc(256);
- jobs = 0;
-
while ((p = fgets(string, 4096, f)) != NULL) {
if (sscanf(p, "[%s]", name) != 1)
continue;
name[strlen(name) - 1] = '\0';
td = get_new_job();
+ if (!td)
+ break;
prioclass = 2;
prio = 4;
- cpu = 0;
fgetpos(f, &off);
while ((p = fgets(string, 4096, f)) != NULL) {
fgetpos(f, &off);
continue;
}
+ if (!check_int(p, "ratemin", &td->ratemin)) {
+ fgetpos(f, &off);
+ continue;
+ }
+ if (!check_int(p, "ratecycle", &td->ratecycle)) {
+ fgetpos(f, &off);
+ continue;
+ }
if (!check_int(p, "delay", &td->delay_sleep)) {
fgetpos(f, &off);
continue;
fgetpos(f, &off);
continue;
}
+ if (!check_int(p, "fsync", &td->fsync_blocks)) {
+ fgetpos(f, &off);
+ continue;
+ }
if (!strcmp(p, "sequential")) {
td->sequential = 1;
fgetpos(f, &off);
}
fsetpos(f, &off);
- add_job(td, name, prioclass, prio);
- jobs++;
+ if (add_job(td, name, prioclass, prio))
+ put_job(td);
}
free(string);
free(name);
- return jobs;
+ return 0;
}
int parse_options(int argc, char *argv[])
parm++;
global_bs = atoi(parm);
global_bs <<= 10;
+ if (!global_bs) {
+ printf("bad block size\n");
+ global_bs = DEF_BS;
+ }
break;
case 't':
parm++;
timeout = atoi(parm);
+ if (!timeout) {
+ printf("bad timeout\n");
+ timeout = DEF_TIMEOUT;
+ }
break;
case 'w':
parm++;
static unsigned long max_run[2], min_run[2], total_blocks[2];
static unsigned long max_bw[2], min_bw[2], maxl[2], minl[2];
static unsigned long read_mb, write_mb, read_agg, write_agg;
- int i, jobs;
-
- if (sched_getaffinity(getpid(), sizeof(def_cpumask), &def_cpumask) == -1) {
- perror("sched_getaffinity");
- return 1;
- }
+ int i;
- shm_id = shmget(0, (argc - 1) * sizeof(struct thread_data), IPC_CREAT | 0600);
+ shm_id = shmget(0, MAX_JOBS * sizeof(struct thread_data), IPC_CREAT | 0600);
if (shm_id == -1) {
perror("shmget");
return 1;
atexit(free_shm);
+ if (sched_getaffinity(getpid(), sizeof(def_cpumask), &def_cpumask) == -1) {
+ perror("sched_getaffinity");
+ return 1;
+ }
+
i = parse_options(argc, argv);
- if (ini_file)
- jobs = parse_jobs_ini(ini_file);
- else
- jobs = parse_jobs_cmd(argc, argv, i);
+ if (ini_file) {
+ if (parse_jobs_ini(ini_file))
+ return 1;
+ } else
+ parse_jobs_cmd(argc, argv, i);
- if (global_bs <= 0)
- global_bs = BS;
- if (timeout <= 0)
- timeout = TIMEOUT;
+ if (!thread_number) {
+ printf("Nothing to do\n");
+ return 1;
+ }
printf("%s: %s, bs=%uKiB, timeo=%u, write_stat=%u, odirect=%d\n", argv[0], sequential ? "sequential" : "random", global_bs >> 10, timeout, write_stat, odirect);
- if (!jobs) {
- printf("Nothing to do\n");
- return 1;
- } else
- printf("%d Clients configured\n", jobs);
+ printf("Starting %d threads\n", thread_number);
+ fflush(stdout);
- for (i = 0; i < jobs; i++) {
+ for (i = 0; i < thread_number; i++) {
sem_init(&startup_sem, 1, 1);
if (fork())
}
}
- if (!thread_number) {
- usage(argv[0]);
- return 1;
- }
-
signal(SIGALRM, sig_handler);
alarm(timeout);
- printf("Starting %d threads\n", thread_number);
+ printf("Kicking off %d threads\n", thread_number);
+ fflush(stdout);
for (i = 0; i < thread_number; i++) {
struct thread_data *td = &threads[i];
max_run[td->ddir] = td->runtime;
if (td->runtime)
- bw = (td->blocks_read * td->bs) / td->runtime;
+ bw = (td->io_blocks * td->bs) / td->runtime;
if (bw < min_bw[td->ddir])
min_bw[td->ddir] = bw;
if (bw > max_bw[td->ddir])
if (td->max_latency > maxl[td->ddir])
maxl[td->ddir] = td->max_latency;
- total_blocks[td->ddir] += td->blocks_read;
+ total_blocks[td->ddir] += td->io_blocks;
if (td->ddir == DDIR_READ) {
- read_mb += (td->bs * td->blocks_read) >> 20;
+ read_mb += (td->bs * td->io_blocks) >> 20;
if (td->runtime)
- read_agg += (td->blocks_read * td->bs) / td->runtime;
+ read_agg += (td->io_blocks * td->bs) / td->runtime;
}
if (td->ddir == DDIR_WRITE) {
- write_mb += (td->bs * td->blocks_read) >> 20;
+ write_mb += (td->bs * td->io_blocks) >> 20;
if (td->runtime)
- write_agg += (td->blocks_read * td->bs) / td->runtime;
+ write_agg += (td->io_blocks * td->bs) / td->runtime;
}
show_stat: