if verify_backlog_batch is larger than verify_backlog, some
blocks will be verified more than once.
+verify_state_save=bool When a job exits during the write phase of a verify
+ workload, save its current state. This allows fio to replay
+ up until that point, if the verify state is loaded for the
+ verify read phase. The format of the filename is, roughly,
+ <type>-<jobname>-<jobindex>-verify.state. <type> is "local"
+ for a local run, "sock" for a client/server socket connection,
+ and "ip" (192.168.0.1, for instance) for a networked
+ client/server connection.
+
+verify_state_load=bool If a verify termination trigger was used, fio stores
+ the current write state of each thread. This can be used at
+ verification time so that fio knows how far it should verify.
+ Without this information, fio will run a full verification
+ pass, according to the settings in the job file used.
+
stonewall
wait_for_previous Wait for preceding jobs in the job file to exit, before
starting this one. Can be used to insert serialization
break;
}
+ if (verify_state_should_stop(td, io_u)) {
+ put_io_u(td, io_u);
+ break;
+ }
+
if (td->o.verify_async)
io_u->end_io = verify_io_u_async;
else
io_u->rand_seed *= __rand(&td->verify_state);
}
+ if (verify_state_should_stop(td, io_u)) {
+ put_io_u(td, io_u);
+ break;
+ }
+
if (td->o.verify_async)
io_u->end_io = verify_io_u_async;
else
io_u_rexit(&td->io_u_requeues);
io_u_qexit(&td->io_u_freelist);
io_u_qexit(&td->io_u_all);
+
+ if (td->last_write_comp)
+ sfree(td->last_write_comp);
}
static int init_io_u(struct thread_data *td)
p += max_bs;
}
+ if (td->o.verify != VERIFY_NONE) {
+ td->last_write_comp = scalloc(max_units, sizeof(uint64_t));
+ if (!td->last_write_comp) {
+ log_err("fio: failed to alloc write comp data\n");
+ return 1;
+ }
+ }
+
return 0;
}
td->ts.io_bytes[DDIR_WRITE] = td->io_bytes[DDIR_WRITE];
td->ts.io_bytes[DDIR_TRIM] = td->io_bytes[DDIR_TRIM];
+ if (td->o.verify_state_save && !(td->flags & TD_F_VSTATE_SAVED) &&
+ (td->o.verify != VERIFY_NONE && td_write(td))) {
+ struct all_io_list *state;
+ size_t sz;
+
+ state = get_all_io_list(td->thread_number, &sz);
+ if (state) {
+ __verify_save_state(state, "local");
+ free(state);
+ }
+ }
+
fio_unpin_memory(td);
fio_writeout_logs(td);
cleanup_io_u(td);
close_ioengine(td);
cgroup_shutdown(td, &cgroup_mnt);
+ verify_free_state(td);
if (o->cpumask_set) {
ret = fio_cpuset_exit(&o->cpumask);
fio_terminate_threads(TERMINATE_ALL);
}
+/*
+ * Poll for the trigger file. Returns 1 if a trigger file name is configured
+ * and stat() finds it (the file is then unlinked), 0 otherwise.
+ */
+static int __check_trigger_file(void)
+{
+	struct stat st;
+	int seen = 0;
+
+	if (trigger_file && stat(trigger_file, &st) == 0) {
+		seen = 1;
+		/* A failed unlink is reported, but the trigger still counts */
+		if (unlink(trigger_file) < 0) {
+			log_err("fio: failed to unlink %s: %s\n", trigger_file,
+				strerror(errno));
+		}
+	}
+
+	return seen;
+}
+
+/*
+ * Returns 1 once the trigger timeout has been reached, 0 otherwise. A zero
+ * trigger_timeout means no timeout is armed.
+ *
+ * The timeout is disarmed when it fires: check_trigger_file() is polled
+ * repeatedly, and without this the state save and trigger command would be
+ * repeated on every poll after expiry.
+ */
+static int trigger_timedout(void)
+{
+	if (!trigger_timeout)
+		return 0;
+
+	if (time_since_genesis() < trigger_timeout)
+		return 0;
+
+	trigger_timeout = 0;
+	return 1;
+}
+
+/*
+ * Run the trigger command through system(). A NULL command is a no-op; a
+ * system() return of -1 is logged.
+ */
+void exec_trigger(const char *cmd)
+{
+	if (cmd && system(cmd) == -1)
+		log_err("fio: failed executing %s trigger\n", cmd);
+}
+
+/*
+ * Fire the trigger if the trigger file showed up or the trigger timeout
+ * expired. When nr_clients is non-zero the trigger command is forwarded with
+ * fio_clients_send_trigger(); otherwise verify state is saved, all threads
+ * are asked to terminate and the trigger command is executed here.
+ */
+void check_trigger_file(void)
+{
+	if (!__check_trigger_file() && !trigger_timedout())
+		return;
+
+	if (nr_clients) {
+		fio_clients_send_trigger(trigger_cmd);
+		return;
+	}
+
+	verify_save_state();
+	fio_terminate_threads(TERMINATE_ALL);
+	exec_trigger(trigger_cmd);
+}
+
+/*
+ * Load previously saved verify state for this job when the verify_state
+ * option is set. A backend process fetches the state with
+ * fio_server_get_verify_state(); otherwise it is read from the "local" state
+ * file. Returns 0 on success (or when nothing needs loading), non-zero on
+ * failure.
+ */
+static int fio_verify_load_state(struct thread_data *td)
+{
+	void *state;
+	int err;
+
+	if (!td->o.verify_state)
+		return 0;
+
+	if (!is_backend)
+		return verify_load_state(td, "local");
+
+	err = fio_server_get_verify_state(td->o.name, td->thread_number - 1,
+						&state);
+	if (err)
+		return err;
+
+	/* The buffer is stored in td->vstate from here on */
+	verify_convert_assign_state(td, state);
+	return 0;
+}
+
static void do_usleep(unsigned int usecs)
{
check_for_running_stats();
+ check_trigger_file();
usleep(usecs);
}
if (!td->o.create_serialize)
continue;
+ if (fio_verify_load_state(td))
+ goto reap;
+
/*
* do file setup here so it happens sequentially,
* we don't want X number of threads getting their
* client data interspersed on disk
*/
if (setup_files(td)) {
+reap:
exit_value++;
if (td->error)
log_err("fio: pid=%d, err=%d/%s\n",
o->verifysort = le32_to_cpu(top->verifysort);
o->verifysort_nr = le32_to_cpu(top->verifysort_nr);
o->experimental_verify = le32_to_cpu(top->experimental_verify);
+ o->verify_state = le32_to_cpu(top->verify_state);
o->verify_interval = le32_to_cpu(top->verify_interval);
o->verify_offset = le32_to_cpu(top->verify_offset);
top->verifysort = cpu_to_le32(o->verifysort);
top->verifysort_nr = cpu_to_le32(o->verifysort_nr);
top->experimental_verify = cpu_to_le32(o->experimental_verify);
+ top->verify_state = cpu_to_le32(o->verify_state);
top->verify_interval = cpu_to_le32(o->verify_interval);
top->verify_offset = cpu_to_le32(o->verify_offset);
top->verify_pattern_bytes = cpu_to_le32(o->verify_pattern_bytes);
#include "server.h"
#include "flist.h"
#include "hash.h"
+#include "verify.h"
static void handle_du(struct fio_client *client, struct fio_net_cmd *cmd);
static void handle_ts(struct fio_client *client, struct fio_net_cmd *cmd);
return 0;
}
+/*
+ * Return the name identifying this client's server: "sock" for a socket
+ * connection, otherwise the peer address formatted into buf (bufsize bytes).
+ *
+ * Never returns NULL: callers hand the result straight to strcpy() and "%s"
+ * formatting, so "unknown" is substituted if the address cannot be
+ * formatted.
+ */
+static const char *server_name(struct fio_client *client, char *buf,
+			       size_t bufsize)
+{
+	const char *from;
+
+	/* inet_ntop() takes a pointer to the in_addr/in6_addr itself */
+	if (client->ipv6)
+		from = inet_ntop(AF_INET6, &client->addr6.sin6_addr, buf, bufsize);
+	else if (client->is_sock)
+		from = "sock";
+	else
+		from = inet_ntop(AF_INET, &client->addr.sin_addr, buf, bufsize);
+
+	/* inet_ntop() returns NULL on failure, e.g. if buf is too small */
+	if (!from)
+		from = "unknown";
+
+	return from;
+}
+
static void probe_client(struct fio_client *client)
{
struct cmd_client_probe_pdu pdu;
uint64_t tag;
+ char buf[64];
dprint(FD_NET, "client: send probe\n");
pdu.flags = 0;
#endif
+ strcpy((char *) pdu.server, server_name(client, buf, sizeof(buf)));
+
fio_net_send_cmd(client->fd, FIO_NET_CMD_PROBE, &pdu, sizeof(pdu), &tag, &client->cmd_list);
}
return ret;
}
+/*
+ * Send a SENDFILE reply of 'size' bytes on fd, carrying the given tag.
+ * rep->error is converted to little-endian in place before sending, so the
+ * caller passes it in host byte order.
+ */
+static void sendfile_reply(int fd, struct cmd_sendfile_reply *rep,
+			   size_t size, uint64_t tag)
+{
+	uint32_t wire_err = cpu_to_le32(rep->error);
+
+	rep->error = wire_err;
+	fio_net_send_cmd(fd, FIO_NET_CMD_SENDFILE, rep, size, &tag, NULL);
+}
+
+/*
+ * Read exactly 'size' bytes from fd into 'data'. Returns 0 on success, the
+ * errno value if read() fails, or EAGAIN if end-of-file is reached before
+ * all bytes arrived. Reads interrupted by a signal (EINTR) are retried
+ * instead of being reported as an error.
+ */
+static int read_data(int fd, void *data, size_t size)
+{
+	/* byte pointer: arithmetic on void * is a compiler extension */
+	char *p = data;
+
+	while (size) {
+		ssize_t ret = read(fd, p, size);
+
+		if (ret < 0) {
+			if (errno == EINTR)
+				continue;
+			return errno;
+		}
+		if (!ret)
+			break;
+
+		p += ret;
+		size -= ret;
+	}
+
+	return size ? EAGAIN : 0;
+}
+
+/*
+ * Handle a SENDFILE request: read the file named by pdu->path and send its
+ * contents back on client->fd, tagged with 'tag'. On any failure a
+ * header-only reply is sent with rep->error set to an errno value and
+ * rep->size zero.
+ *
+ * Returns 0 if the file could be opened and sized (a read error is still
+ * reported through rep->error), 1 otherwise.
+ *
+ * NOTE(review): the path is supplied by the remote end and opened as-is;
+ * confirm that is acceptable for the deployment.
+ */
+static int send_file(struct fio_client *client, struct cmd_sendfile *pdu,
+		     uint64_t tag)
+{
+	struct cmd_sendfile_reply *rep, *tmp;
+	struct stat sb;
+	size_t size;
+	int fd, err;
+
+	size = sizeof(*rep);
+	rep = calloc(1, size);
+	if (!rep) {
+		log_err("fio: failed to alloc sendfile reply\n");
+		return 1;
+	}
+
+	fd = open((char *)pdu->path, O_RDONLY);
+	if (fd == -1) {
+		err = errno;
+		goto fail;
+	}
+
+	/* Size the descriptor we read from, not the path a second time */
+	if (fstat(fd, &sb) < 0) {
+		err = errno;
+		goto fail;
+	}
+
+	/* The reply carries the size in a 32-bit field */
+	if (sb.st_size < 0 || (uint64_t) sb.st_size > UINT32_MAX) {
+		err = EFBIG;
+		goto fail;
+	}
+
+	/* Keep the old pointer valid if realloc() fails */
+	tmp = realloc(rep, size + sb.st_size);
+	if (!tmp) {
+		err = ENOMEM;
+		goto fail;
+	}
+	rep = tmp;
+
+	rep->error = read_data(fd, rep->data, sb.st_size);
+	if (!rep->error) {
+		rep->size = cpu_to_le32((uint32_t) sb.st_size);
+		size += sb.st_size;
+	}
+	/* On a read error only the header goes out, not a partial buffer */
+
+	sendfile_reply(client->fd, rep, size, tag);
+	free(rep);
+	close(fd);
+	return 0;
+fail:
+	rep->error = err;
+	sendfile_reply(client->fd, rep, size, tag);
+	free(rep);
+	if (fd != -1)
+		close(fd);
+	return 1;
+}
+
int fio_handle_client(struct fio_client *client)
{
struct client_ops *ops = client->ops;
if (ops->quit)
ops->quit(client, cmd);
remove_client(client);
- free(cmd);
break;
case FIO_NET_CMD_TEXT:
convert_text(cmd);
ops->text(client, cmd);
- free(cmd);
break;
case FIO_NET_CMD_DU: {
struct cmd_du_pdu *du = (struct cmd_du_pdu *) cmd->payload;
convert_agg(&du->agg);
ops->disk_util(client, cmd);
- free(cmd);
break;
}
case FIO_NET_CMD_TS: {
convert_gs(&p->rs, &p->rs);
ops->thread_status(client, cmd);
- free(cmd);
break;
}
case FIO_NET_CMD_GS: {
convert_gs(gs, gs);
ops->group_stats(client, cmd);
- free(cmd);
break;
}
case FIO_NET_CMD_ETA: {
remove_reply_cmd(client, cmd);
convert_jobs_eta(je);
handle_eta(client, cmd);
- free(cmd);
break;
}
case FIO_NET_CMD_PROBE:
remove_reply_cmd(client, cmd);
ops->probe(client, cmd);
- free(cmd);
break;
case FIO_NET_CMD_SERVER_START:
client->state = Client_running;
if (ops->job_start)
ops->job_start(client, cmd);
- free(cmd);
break;
case FIO_NET_CMD_START: {
struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload;
pdu->jobs = le32_to_cpu(pdu->jobs);
ops->start(client, cmd);
- free(cmd);
break;
}
case FIO_NET_CMD_STOP: {
client->error = le32_to_cpu(pdu->error);
client->signal = le32_to_cpu(pdu->signal);
ops->stop(client, cmd);
- free(cmd);
break;
}
case FIO_NET_CMD_ADD_JOB: {
if (ops->add_job)
ops->add_job(client, cmd);
- free(cmd);
break;
}
case FIO_NET_CMD_IOLOG:
pdu = convert_iolog(cmd);
ops->iolog(client, pdu);
}
- free(cmd);
break;
case FIO_NET_CMD_UPDATE_JOB:
ops->update_job(client, cmd);
remove_reply_cmd(client, cmd);
- free(cmd);
break;
+ case FIO_NET_CMD_VTRIGGER: {
+ struct all_io_list *pdu = (struct all_io_list *) cmd->payload;
+ char buf[64];
+
+ __verify_save_state(pdu, server_name(client, buf, sizeof(buf)));
+ break;
+ }
+ case FIO_NET_CMD_SENDFILE: {
+ struct cmd_sendfile *pdu = (struct cmd_sendfile *) cmd->payload;
+ send_file(client, pdu, cmd->tag);
+ break;
+ }
default:
log_err("fio: unknown client op: %s\n", fio_server_op(cmd->opcode));
- free(cmd);
break;
}
+ free(cmd);
return 1;
}
+/*
+ * Send a VTRIGGER command carrying the trigger command line to every client
+ * on client_list. cmd may be NULL, in which case an empty command is sent.
+ * The command bytes are sent without a terminating NUL; pdu->len holds
+ * their count.
+ *
+ * Returns 0 on success, 1 if the command does not fit the 16-bit length
+ * field or the pdu cannot be allocated (nothing is sent in that case).
+ */
+int fio_clients_send_trigger(const char *cmd)
+{
+	struct cmd_vtrigger_pdu *pdu;
+	struct flist_head *entry;
+	size_t slen, pdu_size;
+
+	dprint(FD_NET, "client: send vtrigger: %s\n", cmd ? cmd : "");
+
+	slen = cmd ? strlen(cmd) : 0;
+	if (slen > UINT16_MAX) {
+		log_err("fio: trigger command too long\n");
+		return 1;
+	}
+
+	/* The pdu is the same for every client, so build it once */
+	pdu_size = sizeof(*pdu) + slen;
+	pdu = malloc(pdu_size);
+	if (!pdu) {
+		log_err("fio: failed to alloc trigger pdu\n");
+		return 1;
+	}
+
+	pdu->len = cpu_to_le16((uint16_t) slen);
+	if (slen)
+		memcpy(pdu->cmd, cmd, slen);
+
+	flist_for_each(entry, &client_list) {
+		struct fio_client *client;
+
+		client = flist_entry(entry, struct fio_client, list);
+		fio_net_send_cmd(client->fd, FIO_NET_CMD_VTRIGGER, pdu,
+					pdu_size, NULL, NULL);
+	}
+
+	free(pdu);
+	return 0;
+}
+
static void request_client_etas(struct client_ops *ops)
{
struct fio_client *client;
do {
struct timeval tv;
+ int timeout;
fio_gettime(&tv, NULL);
if (mtime_since(&eta_tv, &tv) >= 900) {
break;
}
- ret = poll(pfds, nr_clients, ops->eta_msec);
+ check_trigger_file();
+
+ timeout = min(100u, ops->eta_msec);
+
+ ret = poll(pfds, nr_clients, timeout);
if (ret < 0) {
if (errno == EINTR)
continue;
extern void fio_put_client(struct fio_client *);
extern int fio_client_update_options(struct fio_client *, struct thread_options *, uint64_t *);
extern int fio_client_wait_for_reply(struct fio_client *, uint64_t);
+extern int fio_clients_send_trigger(const char *);
#define FIO_CLIENT_DEF_ETA_MSEC 900
.BI experimental_verify \fR=\fPbool
Enable experimental verification.
.TP
+.BI verify_state_save \fR=\fPbool
+When a job exits during the write phase of a verify workload, save its
+current state. This allows fio to replay up until that point, if the
+verify state is loaded for the verify read phase.
+.TP
+.BI verify_state_load \fR=\fPbool
+If a verify termination trigger was used, fio stores the current write
+state of each thread. This can be used at verification time so that fio
+knows how far it should verify. Without this information, fio will run
+a full verification pass, according to the settings in the job file used.
+.TP
.B stonewall "\fR,\fP wait_for_previous"
Wait for preceding jobs in the job file to exit before starting this one.
\fBstonewall\fR implies \fBnew_group\fR.
TD_F_COMPRESS = 128,
TD_F_NOIO = 256,
TD_F_COMPRESS_LOG = 512,
+ TD_F_VSTATE_SAVED = 1024,
};
enum {
uint64_t stat_io_blocks[DDIR_RWDIR_CNT];
struct timeval iops_sample_time;
+ /*
+ * Tracks the last iodepth number of completed writes, if data
+ * verification is enabled
+ */
+ uint64_t *last_write_comp;
+ unsigned int last_write_idx;
+
volatile int update_rusage;
struct fio_mutex *rusage_sem;
struct rusage ru_start;
int error;
int sig;
int done;
+ int stop_io;
pid_t pid;
char *orig_buffer;
size_t orig_buffer_size;
unsigned int verify_batch;
unsigned int trim_batch;
+ struct thread_io_list *vstate;
+
int shm_id;
/*
uint64_t total_io_size;
uint64_t fill_device_size;
- unsigned long io_issues[DDIR_RWDIR_CNT];
+ uint64_t io_issues[DDIR_RWDIR_CNT];
uint64_t io_blocks[DDIR_RWDIR_CNT];
uint64_t this_io_blocks[DDIR_RWDIR_CNT];
uint64_t io_bytes[DDIR_RWDIR_CNT];
extern const char fio_version_string[];
extern int helper_do_stat;
extern pthread_cond_t helper_cond;
+extern char *trigger_file;
+extern char *trigger_cmd;
+extern long long trigger_timeout;
extern struct thread_data *threads;
extern void options_mem_dupe(void *data, struct fio_option *options);
extern void td_fill_rand_seeds(struct thread_data *);
extern void add_job_opts(const char **, int);
-extern char *num2str(unsigned long, int, int, int, int);
+extern char *num2str(uint64_t, int, int, int, int);
extern int ioengine_load(struct thread_data *);
extern int parse_dryrun(void);
extern int fio_running_or_pending_io_threads(void);
return min(td->o.min_bs[DDIR_TRIM], min_bs);
}
-static inline int is_power_of_2(unsigned long val)
+static inline int is_power_of_2(uint64_t val)
{
return (val != 0 && ((val & (val - 1)) == 0));
}
FIO_CPUS_SPLIT,
};
+extern void exec_trigger(const char *);
+extern void check_trigger_file(void);
+
#endif
int read_only = 0;
int status_interval = 0;
+char *trigger_file = NULL;
+char *trigger_cmd = NULL;
+long long trigger_timeout = 0;
+
static int prev_group_jobs;
unsigned long fio_debug = 0;
.has_arg = required_argument,
.val = 'L',
},
+ {
+ .name = (char *) "trigger",
+ .has_arg = required_argument,
+ .val = 'W',
+ },
+ {
+ .name = (char *) "trigger-timeout",
+ .has_arg = required_argument,
+ .val = 'B',
+ },
{
.name = NULL,
},
#ifdef CONFIG_ZLIB
printf(" --inflate-log=log\tInflate and output compressed log\n");
#endif
+ printf(" --trigger=file:cmd\tExecute trigger cmd when file exists\n");
+ printf(" --trigger-timeout=t\tExecute trigger after this time\n");
printf("\nFio was written by Jens Axboe <jens.axboe@oracle.com>");
printf("\n Jens Axboe <jaxboe@fusionio.com>");
printf("\n Jens Axboe <axboe@fb.com>\n");
status_interval = val / 1000;
break;
}
+		case 'W': {
+			/*
+			 * --trigger=file:cmd. Everything before the first ':'
+			 * becomes trigger_file; the remainder, with leading
+			 * and trailing blanks stripped, becomes trigger_cmd
+			 * (NULL if nothing is left).
+			 */
+			char *split, *cmd;
+			size_t sz;
+
+			split = strchr(optarg, ':');
+			if (!split) {
+				log_err("fio: trigger is file:command\n");
+				do_exit++;
+				exit_val = 1;
+				/* No separator: there is nothing to parse */
+				break;
+			}
+
+			sz = split - optarg;
+			trigger_file = calloc(1, sz + 1);
+			if (!trigger_file) {
+				log_err("fio: failed to alloc trigger file name\n");
+				do_exit++;
+				exit_val = 1;
+				break;
+			}
+			/* calloc() already provided the terminating NUL */
+			memcpy(trigger_file, optarg, sz);
+
+			split++;
+			cmd = trigger_cmd = strdup(split);
+			if (!cmd) {
+				log_err("fio: failed to alloc trigger command\n");
+				do_exit++;
+				exit_val = 1;
+				break;
+			}
+
+			/*
+			 * strip_blank_front() may advance trigger_cmd, so keep
+			 * 'cmd' as the pointer that is valid for free().
+			 */
+			strip_blank_front(&trigger_cmd);
+			strip_blank_end(trigger_cmd);
+			if (strlen(trigger_cmd) == 0) {
+				free(cmd);
+				trigger_cmd = NULL;
+			}
+			break;
+		}
+ case 'B':
+ if (check_str_time(optarg, &trigger_timeout, 1)) {
+ log_err("fio: failed parsing time %s\n", optarg);
+ do_exit++;
+ exit_val = 1;
+ }
+ trigger_timeout /= 1000000;
+ break;
case '?':
log_err("%s: unrecognized option '%s'\n", argv[0],
argv[optind - 1]);
{
struct io_u *io_u = NULL;
+ if (td->stop_io)
+ return NULL;
+
td_io_u_lock(td);
again:
if (!(io_u->flags & IO_U_F_VER_LIST))
td->this_io_bytes[ddir] += bytes;
- if (ddir == DDIR_WRITE && f) {
- if (f->first_write == -1ULL ||
- io_u->offset < f->first_write)
- f->first_write = io_u->offset;
- if (f->last_write == -1ULL ||
- ((io_u->offset + bytes) > f->last_write))
- f->last_write = io_u->offset + bytes;
+ if (ddir == DDIR_WRITE) {
+ if (f) {
+ if (f->first_write == -1ULL ||
+ io_u->offset < f->first_write)
+ f->first_write = io_u->offset;
+ if (f->last_write == -1ULL ||
+ ((io_u->offset + bytes) > f->last_write))
+ f->last_write = io_u->offset + bytes;
+ }
+ if (td->last_write_comp) {
+ int idx = td->last_write_idx++;
+
+ td->last_write_comp[idx] = io_u->offset;
+ if (td->last_write_idx == td->o.iodepth)
+ td->last_write_idx = 0;
+ }
}
if (ramp_time_over(td) && (td->runstate == TD_RUNNING ||
/*
* Cheesy number->string conversion, complete with carry rounding error.
*/
-char *num2str(unsigned long num, int maxlen, int base, int pow2, int unit_base)
+char *num2str(uint64_t num, int maxlen, int base, int pow2, int unit_base)
{
const char *postfix[] = { "", "K", "M", "G", "P", "E" };
const char *byte_postfix[] = { "", "B", "bit" };
modulo = -1U;
while (post_index < sizeof(postfix)) {
- sprintf(tmp, "%lu", num);
+ sprintf(tmp, "%llu", (unsigned long long) num);
if (strlen(tmp) <= maxlen)
break;
if (post_index >= ARRAY_LENGTH(postfix))
post_index = 0;
- sprintf(buf, "%lu%s%s", num, postfix[post_index],
- byte_postfix[byte_post_index]);
+ sprintf(buf, "%llu%s%s", (unsigned long long) num,
+ postfix[post_index], byte_postfix[byte_post_index]);
return buf;
}
- sprintf(tmp, "%lu", num);
+ sprintf(tmp, "%llu", (unsigned long long) num);
decimals = maxlen - strlen(tmp);
if (decimals <= 1) {
if (carry)
modulo = (modulo + 9) / 10;
} while (1);
- sprintf(buf, "%lu.%u%s%s", num, modulo, postfix[post_index],
- byte_postfix[byte_post_index]);
+ sprintf(buf, "%llu.%u%s%s", (unsigned long long) num, modulo,
+ postfix[post_index], byte_postfix[byte_post_index]);
return buf;
}
.off1 = td_var_offset(experimental_verify),
.type = FIO_OPT_BOOL,
.help = "Enable experimental verification",
+ .parent = "verify",
+ .category = FIO_OPT_C_IO,
+ .group = FIO_OPT_G_VERIFY,
+ },
+ {
+ .name = "verify_state_load",
+ .lname = "Load verify state",
+ .off1 = td_var_offset(verify_state),
+ .type = FIO_OPT_BOOL,
+ .help = "Load verify termination state",
+ .parent = "verify",
+ .category = FIO_OPT_C_IO,
+ .group = FIO_OPT_G_VERIFY,
+ },
+ {
+ .name = "verify_state_save",
+ .lname = "Save verify state",
+ .off1 = td_var_offset(verify_state_save),
+ .type = FIO_OPT_BOOL,
+ .def = "1",
+ .help = "Save verify state on termination",
+ .parent = "verify",
.category = FIO_OPT_C_IO,
.group = FIO_OPT_G_VERIFY,
},
#include "server.h"
#include "crc/crc16.h"
#include "lib/ieee754.h"
+#include "verify.h"
+#include "smalloc.h"
int fio_net_port = FIO_NET_PORT;
static unsigned int has_zlib = 0;
#endif
static unsigned int use_zlib;
+static char me[128];
struct fio_fork_item {
struct flist_head list;
pid_t pid;
};
+/*
+ * Rendezvous between fio_server_get_verify_state(), which waits on 'lock',
+ * and the FIO_NET_CMD_SENDFILE handler, which fills in the result and ups
+ * the lock. A pointer to this struct is carried as the command tag.
+ */
+struct cmd_reply {
+ /* initialised locked; upped by the handler once the reply is processed */
+ struct fio_mutex lock;
+ /* smalloc()ed copy of the received file contents, NULL if none */
+ void *data;
+ /* number of bytes in 'data' */
+ size_t size;
+ /* non-zero errno-style value if the transfer failed */
+ int error;
+};
+
static const char *fio_server_ops[FIO_NET_CMD_NR] = {
"",
"QUIT",
"DISK_UTIL",
"SERVER_START",
"ADD_JOB",
- "CMD_RUN",
- "CMD_IOLOG",
- "CMD_UPDATE_JOB",
- "CMD_LOAD_FILE",
+ "RUN",
+ "IOLOG",
+ "UPDATE_JOB",
+ "LOAD_FILE",
+ "VTRIGGER",
+ "SENDFILE",
};
const char *fio_server_op(unsigned int op)
dprint(FD_NET, "server: sending probe reply\n");
+ strcpy(me, (char *) pdu->server);
+
memset(&probe, 0, sizeof(probe));
gethostname((char *) probe.hostname, sizeof(probe.hostname));
#ifdef CONFIG_BIG_ENDIAN
return 0;
}
+/*
+ * Handle a VTRIGGER command: collect the write state of all threads and
+ * send it back as a VTRIGGER reply (a list with zero threads if there is
+ * nothing to report), then run the trigger command carried in the pdu.
+ * Always returns 0.
+ *
+ * NOTE(review): the command string comes from the network peer and is
+ * passed to exec_trigger(); pdu->len is not checked against the received
+ * payload length here.
+ */
+static int handle_trigger_cmd(struct fio_net_cmd *cmd)
+{
+	struct cmd_vtrigger_pdu *pdu = (struct cmd_vtrigger_pdu *) cmd->payload;
+	struct all_io_list *rep;
+	char *buf;
+	size_t sz;
+
+	pdu->len = le16_to_cpu(pdu->len);
+
+	/*
+	 * The sender transmits exactly pdu->len command bytes with no
+	 * terminating NUL, so terminate a private copy rather than writing
+	 * one byte past the received data.
+	 */
+	buf = malloc((size_t) pdu->len + 1);
+	if (buf) {
+		memcpy(buf, pdu->cmd, pdu->len);
+		buf[pdu->len] = '\0';
+	} else
+		log_err("fio: failed to alloc trigger command\n");
+
+	rep = get_all_io_list(IO_LIST_ALL, &sz);
+	if (!rep) {
+		struct all_io_list state;
+
+		state.threads = cpu_to_le64((uint64_t) 0);
+		fio_net_send_cmd(server_fd, FIO_NET_CMD_VTRIGGER, &state, sizeof(state), NULL, NULL);
+	} else {
+		fio_net_send_cmd(server_fd, FIO_NET_CMD_VTRIGGER, rep, sz, NULL, NULL);
+		free(rep);
+	}
+
+	/* exec_trigger() ignores a NULL command */
+	exec_trigger(buf);
+	free(buf);
+	return 0;
+}
+
static int handle_command(struct flist_head *job_list, struct fio_net_cmd *cmd)
{
int ret;
case FIO_NET_CMD_UPDATE_JOB:
ret = handle_update_job_cmd(cmd);
break;
+ case FIO_NET_CMD_VTRIGGER:
+ ret = handle_trigger_cmd(cmd);
+ break;
+ case FIO_NET_CMD_SENDFILE: {
+ struct cmd_sendfile_reply *in;
+ struct cmd_reply *rep;
+
+ rep = (struct cmd_reply *) (uintptr_t) cmd->tag;
+
+ in = (struct cmd_sendfile_reply *) cmd->payload;
+ in->size = le32_to_cpu(in->size);
+ in->error = le32_to_cpu(in->error);
+ if (in->error) {
+ ret = 1;
+ rep->error = in->error;
+ } else {
+ ret = 0;
+ rep->data = smalloc(in->size);
+ if (!rep->data) {
+ ret = 1;
+ rep->error = ENOMEM;
+ } else {
+ rep->size = in->size;
+ memcpy(rep->data, in->data, in->size);
+ }
+ }
+ fio_mutex_up(&rep->lock);
+ break;
+ }
default:
log_err("fio: unknown opcode: %s\n", fio_server_op(cmd->opcode));
ret = 1;
fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_SERVER_START, 0, NULL);
}
+/*
+ * Ask the other end of the connection for the verify state file of job
+ * 'name' / thread 'threadnumber' and wait up to 10 (units as defined by
+ * fio_mutex_down_timeout()) for the reply.
+ *
+ * On success returns 0 and stores in *datap a malloc()ed copy of the
+ * thread_io_list (header stripped, still in little-endian) that the caller
+ * owns. On failure returns 1, sets *datap to NULL and sends a quit.
+ *
+ * NOTE(review): after a timeout 'rep' is freed while its address is still
+ * the tag of the outstanding request; a late reply would touch freed
+ * memory. Not addressed here.
+ */
+int fio_server_get_verify_state(const char *name, int threadnumber,
+				void **datap)
+{
+	struct verify_state_hdr *hdr;
+	struct thread_io_list *s;
+	struct cmd_sendfile out;
+	struct cmd_reply *rep;
+	size_t payload, slots;
+	uint64_t tag;
+	void *data;
+
+	dprint(FD_NET, "server: request verify state\n");
+
+	rep = smalloc(sizeof(*rep));
+	if (!rep) {
+		log_err("fio: smalloc pool too small\n");
+		return 1;
+	}
+
+	__fio_mutex_init(&rep->lock, FIO_MUTEX_LOCKED);
+	rep->data = NULL;
+	rep->size = 0;
+	rep->error = 0;
+
+	/* The whole struct goes on the wire; don't send stack garbage */
+	memset(&out, 0, sizeof(out));
+	verify_state_gen_name((char *) out.path, name, me, threadnumber);
+	tag = (uint64_t) (uintptr_t) rep;
+	fio_net_send_cmd(server_fd, FIO_NET_CMD_SENDFILE, &out, sizeof(out),
+				&tag, NULL);
+
+	/*
+	 * Wait for the backend to receive the reply
+	 */
+	if (fio_mutex_down_timeout(&rep->lock, 10)) {
+		log_err("fio: timed out waiting for reply\n");
+		goto fail;
+	}
+
+	if (rep->error) {
+		log_err("fio: failure on receiving state file: %s\n", strerror(rep->error));
+		goto fail;
+	}
+
+	/*
+	 * The format is verify_state_hdr, then thread_io_list. Make sure
+	 * both fit in what was received before looking at either.
+	 */
+	if (!rep->data || rep->size < sizeof(*hdr) + sizeof(*s)) {
+		log_err("fio: bad verify state received\n");
+		goto fail;
+	}
+
+	hdr = rep->data;
+	payload = rep->size - sizeof(*hdr);
+	s = (void *) ((char *) rep->data + sizeof(*hdr));
+
+	/*
+	 * The checksum covers hdr->size bytes and the state is later walked
+	 * for no_comps offsets; neither may reach past the received data.
+	 */
+	slots = (payload - sizeof(*s)) / sizeof(uint64_t);
+	if (le64_to_cpu(hdr->size) > payload ||
+	    le64_to_cpu(s->no_comps) > slots) {
+		log_err("fio: bad verify state received\n");
+		goto fail;
+	}
+
+	/* Verify the header, and the thread_io_list checksum */
+	if (verify_state_hdr(hdr, s))
+		goto fail;
+
+	/*
+	 * Don't need the header from now, copy just the thread_io_list
+	 */
+	data = malloc(payload);
+	if (!data) {
+		log_err("fio: failed to alloc verify state\n");
+		goto fail;
+	}
+	memcpy(data, s, payload);
+	*datap = data;
+
+	sfree(rep->data);
+	sfree(rep);
+	return 0;
+fail:
+	*datap = NULL;
+	if (rep->data)
+		sfree(rep->data);
+	sfree(rep);
+	fio_net_send_quit(server_fd);
+	return 1;
+}
+
static int fio_init_server_ip(void)
{
struct sockaddr *addr;
};
enum {
- FIO_SERVER_VER = 38,
+ FIO_SERVER_VER = 39,
FIO_SERVER_MAX_FRAGMENT_PDU = 1024,
FIO_SERVER_MAX_CMD_MB = 2048,
FIO_NET_CMD_IOLOG = 17,
FIO_NET_CMD_UPDATE_JOB = 18,
FIO_NET_CMD_LOAD_FILE = 19,
- FIO_NET_CMD_NR = 20,
+ FIO_NET_CMD_VTRIGGER = 20,
+ FIO_NET_CMD_SENDFILE = 21,
+ FIO_NET_CMD_NR = 22,
FIO_NET_CMD_F_MORE = 1UL << 0,
FIO_PROBE_FLAG_ZLIB = 1UL << 0,
};
+/*
+ * FIO_NET_CMD_SENDFILE request: asks the receiver to send back the contents
+ * of the file named by 'path'.
+ */
+struct cmd_sendfile {
+ uint8_t path[FIO_NET_NAME_MAX];
+};
+
+/*
+ * FIO_NET_CMD_SENDFILE reply. 'size' and 'error' are little-endian on the
+ * wire; 'error' is an errno value (0 on success) and 'size' is the number
+ * of bytes in 'data'.
+ */
+struct cmd_sendfile_reply {
+ uint32_t size;
+ uint32_t error;
+ uint8_t data[0];
+};
+
+/*
+ * Client sends this to server on VTRIGGER, server sends back a full
+ * all_io_list structure. 'len' is little-endian on the wire and counts the
+ * bytes in 'cmd', which are sent without a terminating NUL.
+ */
+struct cmd_vtrigger_pdu {
+ uint16_t len;
+ uint8_t cmd[];
+};
+
struct cmd_load_file_pdu {
uint16_t name_len;
uint16_t client_type;
struct cmd_client_probe_pdu {
uint64_t flags;
+ uint8_t server[128];
};
struct cmd_probe_reply_pdu {
extern void fio_server_send_gs(struct group_run_stats *);
extern void fio_server_send_du(void);
extern void fio_server_idle_loop(void);
+extern int fio_server_get_verify_state(const char *, int, void **);
extern int fio_recv_data(int sk, void *p, unsigned int len);
extern int fio_send_data(int sk, const void *p, unsigned int len);
unsigned long long verify_backlog;
unsigned int verify_batch;
unsigned int experimental_verify;
+ unsigned int verify_state;
+ unsigned int verify_state_save;
unsigned int use_thread;
unsigned int unlink;
unsigned int do_disk_util;
uint64_t verify_backlog;
uint32_t verify_batch;
uint32_t experimental_verify;
+ uint32_t verify_state;
+ uint32_t verify_state_save;
+ uint32_t pad;
uint32_t use_thread;
uint32_t unlink;
uint32_t do_disk_util;
uint32_t bs_is_seq_rand;
uint32_t random_distribution;
- uint32_t pad;
fio_fp64_t zipf_theta;
fio_fp64_t pareto_h;
free(td->verify_threads);
td->verify_threads = NULL;
}
+
+/*
+ * Build a snapshot of the write state of the selected threads. save_mask is
+ * either IO_LIST_ALL or a single 1-based thread number. Every selected
+ * thread gets td->stop_io set and is flagged TD_F_VSTATE_SAVED.
+ *
+ * Returns a heap-allocated all_io_list with all fields in little-endian
+ * and its total size in *sz; the caller must free() it. Returns NULL if no
+ * thread matched or memory could not be allocated.
+ */
+struct all_io_list *get_all_io_list(int save_mask, size_t *sz)
+{
+	struct all_io_list *rep;
+	struct thread_data *td;
+	size_t depth;
+	void *next;
+	int i, nr;
+
+	compiletime_assert(sizeof(struct all_io_list) == 8, "all_io_list");
+
+	/*
+	 * Calculate reply space needed. We need one 'io_state' per thread,
+	 * and the size will vary depending on depth.
+	 */
+	depth = 0;
+	nr = 0;
+	for_each_td(td, i) {
+		if (save_mask != IO_LIST_ALL && (i + 1) != save_mask)
+			continue;
+		td->stop_io = 1;
+		td->flags |= TD_F_VSTATE_SAVED;
+		depth += td->o.iodepth;
+		nr++;
+	}
+
+	if (!nr)
+		return NULL;
+
+	*sz = sizeof(*rep);
+	*sz += nr * sizeof(struct thread_io_list);
+	*sz += depth * sizeof(uint64_t);
+
+	/*
+	 * Zeroed on purpose: offset slots beyond no_comps and the name
+	 * padding are written to the state file and covered by its
+	 * checksum, so they must not contain stale heap contents.
+	 */
+	rep = calloc(1, *sz);
+	if (!rep) {
+		log_err("fio: failed to alloc verify state\n");
+		return NULL;
+	}
+
+	rep->threads = cpu_to_le64((uint64_t) nr);
+
+	next = &rep->state[0];
+	for_each_td(td, i) {
+		struct thread_io_list *s = next;
+		unsigned int comps = 0;
+
+		if (save_mask != IO_LIST_ALL && (i + 1) != save_mask)
+			continue;
+
+		if (td->last_write_comp) {
+			unsigned int j;
+			int k;
+
+			/* At most iodepth completions are remembered */
+			if (td->io_blocks[DDIR_WRITE] < td->o.iodepth)
+				comps = td->io_blocks[DDIR_WRITE];
+			else
+				comps = td->o.iodepth;
+
+			/* Walk the ring backwards, newest completion first */
+			k = td->last_write_idx - 1;
+			for (j = 0; j < comps; j++) {
+				if (k == -1)
+					k = td->o.iodepth - 1;
+				s->offsets[j] = cpu_to_le64(td->last_write_comp[k]);
+				k--;
+			}
+		}
+
+		s->no_comps = cpu_to_le64((uint64_t) comps);
+		s->depth = cpu_to_le64((uint64_t) td->o.iodepth);
+		s->numberio = cpu_to_le64((uint64_t) td->io_issues[DDIR_WRITE]);
+		s->index = cpu_to_le64((uint64_t) i);
+		s->rand.s[0] = cpu_to_le32(td->random_state.s1);
+		s->rand.s[1] = cpu_to_le32(td->random_state.s2);
+		s->rand.s[2] = cpu_to_le32(td->random_state.s3);
+		s->rand.s[3] = 0;
+		/* Leave the last (zeroed) byte alone so the name stays terminated */
+		strncpy((char *) s->name, td->o.name, sizeof(s->name) - 1);
+		next = io_list_next(s);
+	}
+
+	return rep;
+}
+
+/*
+ * Open the verify state file for job 'name', thread index 'num', with the
+ * given prefix. for_write selects create/truncate/synchronous write mode,
+ * otherwise the file is opened read-only. Returns the descriptor, or -1 on
+ * failure (including a file name too long for the local buffer).
+ */
+static int open_state_file(const char *name, const char *prefix, int num,
+			   int for_write)
+{
+	char out[256];
+	int flags;
+	int len;
+	int fd;
+
+	if (for_write)
+		flags = O_CREAT | O_TRUNC | O_WRONLY | O_SYNC;
+	else
+		flags = O_RDONLY;
+
+	/*
+	 * Same format as verify_state_gen_name(), but bounded: the job name
+	 * alone can be as long as the 64-byte buffer previously used here.
+	 */
+	len = snprintf(out, sizeof(out), "%s-%s-%d-verify.state", prefix,
+			name, num);
+	if (len < 0 || (size_t) len >= sizeof(out)) {
+		log_err("fio: verify state file name too long\n");
+		return -1;
+	}
+
+	fd = open(out, flags, 0644);
+	if (fd == -1) {
+		perror("fio: open state file");
+		return -1;
+	}
+
+	return fd;
+}
+
+/*
+ * Write one thread's state to its state file: a verify_state_hdr (version,
+ * payload size, crc32c of the payload) followed by the thread_io_list
+ * itself. 's' is expected in little-endian form, as produced by
+ * get_all_io_list(). Returns 0 on success, 1 on failure.
+ */
+static int write_thread_list_state(struct thread_io_list *s,
+				   const char *prefix)
+{
+	char name[sizeof(s->name) + 1];
+	struct verify_state_hdr hdr;
+	size_t sz = thread_io_list_sz(s);
+	uint64_t crc;
+	ssize_t ret;
+	int fd;
+
+	/* s->name is a fixed-size field and may lack a terminating NUL */
+	memcpy(name, s->name, sizeof(s->name));
+	name[sizeof(s->name)] = '\0';
+
+	/* s->index is stored little-endian; convert before using it */
+	fd = open_state_file(name, prefix, (int) le64_to_cpu(s->index), 1);
+	if (fd == -1)
+		return 1;
+
+	crc = fio_crc32c((void *)s, sz);
+
+	hdr.version = cpu_to_le64((uint64_t) VSTATE_HDR_VERSION);
+	hdr.size = cpu_to_le64((uint64_t) sz);
+	hdr.crc = cpu_to_le64(crc);
+
+	ret = write(fd, &hdr, sizeof(hdr));
+	if (ret != (ssize_t) sizeof(hdr))
+		goto write_fail;
+
+	ret = write(fd, s, sz);
+	if (ret != (ssize_t) sz)
+		goto write_fail;
+
+	close(fd);
+	return 0;
+write_fail:
+	if (ret < 0)
+		perror("fio: write state file");
+	log_err("fio: failed to write state file\n");
+	close(fd);
+	return 1;
+}
+
+/*
+ * Write a state file for each thread entry in 'state', using 'prefix' as
+ * the leading component of every file name. The result of the individual
+ * writes is not reported to the caller.
+ */
+void __verify_save_state(struct all_io_list *state, const char *prefix)
+{
+	struct thread_io_list *cur;
+	unsigned int idx = 0;
+
+	cur = &state->state[0];
+	while (idx < le64_to_cpu(state->threads)) {
+		write_thread_list_state(cur, prefix);
+		/* entries are variable-sized, step by the entry's own size */
+		cur = io_list_next(cur);
+		idx++;
+	}
+}
+
+/*
+ * Snapshot the write state of all threads and store it in "local" state
+ * files. Does nothing if no snapshot could be produced.
+ */
+void verify_save_state(void)
+{
+	struct all_io_list *all;
+	size_t len;
+
+	all = get_all_io_list(IO_LIST_ALL, &len);
+	if (!all)
+		return;
+
+	__verify_save_state(all, "local");
+	free(all);
+}
+
+/*
+ * Release the verify state attached to td, if any. The pointer is cleared
+ * so a repeated call (or a later reader) never sees freed memory.
+ */
+void verify_free_state(struct thread_data *td)
+{
+	/* free(NULL) is a no-op, no guard needed */
+	free(td->vstate);
+	td->vstate = NULL;
+}
+
+/*
+ * Convert a loaded thread_io_list from little-endian to host byte order in
+ * place and attach it to td as td->vstate. 's' must be a heap buffer: it is
+ * later released with free() by verify_free_state().
+ *
+ * no_comps, depth, numberio, the rand state and the offsets are converted;
+ * 'index' is left as loaded.
+ */
+void verify_convert_assign_state(struct thread_data *td,
+				 struct thread_io_list *s)
+{
+	uint64_t i;
+
+	s->no_comps = le64_to_cpu(s->no_comps);
+	s->depth = le64_to_cpu(s->depth);
+	s->numberio = le64_to_cpu(s->numberio);
+	for (i = 0; i < 4; i++)
+		s->rand.s[i] = le32_to_cpu(s->rand.s[i]);
+
+	/*
+	 * A state entry holds 'depth' offset slots and the writer never
+	 * records more completions than that; don't let a damaged count
+	 * walk past the slots.
+	 */
+	if (s->no_comps > s->depth)
+		s->no_comps = s->depth;
+
+	for (i = 0; i < s->no_comps; i++)
+		s->offsets[i] = le64_to_cpu(s->offsets[i]);
+
+	td->vstate = s;
+}
+
+/*
+ * Validate a state file header against its payload 's'. The header fields
+ * are converted from little-endian to host order in place. Returns 0 if the
+ * version matches VSTATE_HDR_VERSION and the crc32c over hdr->size bytes of
+ * 's' equals hdr->crc, 1 otherwise.
+ */
+int verify_state_hdr(struct verify_state_hdr *hdr, struct thread_io_list *s)
+{
+	uint64_t sum;
+
+	hdr->version = le64_to_cpu(hdr->version);
+	hdr->size = le64_to_cpu(hdr->size);
+	hdr->crc = le64_to_cpu(hdr->crc);
+
+	if (hdr->version == VSTATE_HDR_VERSION) {
+		sum = fio_crc32c((void *)s, hdr->size);
+		if (sum == hdr->crc)
+			return 0;
+	}
+
+	return 1;
+}
+
+/*
+ * Load this thread's verify state from its state file (named from prefix,
+ * job name and thread index) and attach it to td. Does nothing and returns
+ * 0 when the verify_state option is off. Returns 0 on success, 1 if the
+ * file cannot be opened or read, has the wrong version, an implausible
+ * size, or fails its checksum.
+ */
+int verify_load_state(struct thread_data *td, const char *prefix)
+{
+	struct thread_io_list *s = NULL;
+	struct verify_state_hdr hdr;
+	uint64_t crc, slots;
+	ssize_t ret;
+	int fd;
+
+	if (!td->o.verify_state)
+		return 0;
+
+	fd = open_state_file(td->o.name, prefix, td->thread_number - 1, 0);
+	if (fd == -1)
+		return 1;
+
+	ret = read(fd, &hdr, sizeof(hdr));
+	if (ret != (ssize_t) sizeof(hdr)) {
+		if (ret < 0)
+			td_verror(td, errno, "read verify state hdr");
+		log_err("fio: failed reading verify state header\n");
+		goto err;
+	}
+
+	hdr.version = le64_to_cpu(hdr.version);
+	hdr.size = le64_to_cpu(hdr.size);
+	hdr.crc = le64_to_cpu(hdr.crc);
+
+	if (hdr.version != VSTATE_HDR_VERSION) {
+		log_err("fio: bad version in verify state header\n");
+		goto err;
+	}
+
+	/* The payload must at least hold the fixed part of the state */
+	if (hdr.size < sizeof(*s)) {
+		log_err("fio: verify state is corrupt\n");
+		goto err;
+	}
+
+	s = malloc(hdr.size);
+	if (!s) {
+		log_err("fio: failed to alloc verify state\n");
+		goto err;
+	}
+
+	ret = read(fd, s, hdr.size);
+	if (ret < 0 || (uint64_t) ret != hdr.size) {
+		if (ret < 0)
+			td_verror(td, errno, "read verify state");
+		log_err("fio: failed reading verify state\n");
+		goto err;
+	}
+
+	crc = fio_crc32c((void *)s, hdr.size);
+	if (crc != hdr.crc) {
+		log_err("fio: verify state is corrupt\n");
+		goto err;
+	}
+
+	/*
+	 * The offsets are indexed by no_comps once the state is in use;
+	 * make sure the counts fit in the slots that were actually read.
+	 */
+	slots = (hdr.size - sizeof(*s)) / sizeof(uint64_t);
+	if (le64_to_cpu(s->depth) > slots || le64_to_cpu(s->no_comps) > slots) {
+		log_err("fio: verify state is corrupt\n");
+		goto err;
+	}
+
+	close(fd);
+
+	/* td->vstate takes over the buffer */
+	verify_convert_assign_state(td, s);
+	return 0;
+err:
+	free(s);
+	close(fd);
+	return 1;
+}
+
+/*
+ * Decide from the loaded verify state whether verification has to stop at
+ * this io_u. Returns 1 to stop, 0 to carry on; always 0 when no state has
+ * been loaded.
+ */
+int verify_state_should_stop(struct thread_data *td, struct io_u *io_u)
+{
+	struct thread_io_list *vs = td->vstate;
+	uint64_t reads;
+	int n;
+
+	if (!vs)
+		return 0;
+
+	reads = td->io_blocks[DDIR_READ];
+
+	/*
+	 * Keep going while we are not yet inside the final 'depth' issues
+	 * recorded by the writer.
+	 * NOTE(review): the subtraction is unsigned, so it wraps if reads
+	 * exceeds numberio - confirm that cannot happen.
+	 */
+	if (reads < vs->depth || vs->numberio - reads > vs->depth)
+		return 0;
+
+	/*
+	 * Inside the window: only offsets that were seen as completed
+	 * writes may be verified.
+	 */
+	for (n = 0; n < vs->no_comps; n++) {
+		if (vs->offsets[n] == io_u->offset)
+			return 0;
+	}
+
+	/* This write was never seen completing, stop here */
+	return 1;
+}
extern int verify_async_init(struct thread_data *);
extern void verify_async_exit(struct thread_data *);
+/*
+ * Random generator state as stored in a state dump; get_all_io_list()
+ * fills s[0..2] from the thread's random_state and sets s[3] to 0.
+ */
+struct thread_rand_state {
+ uint32_t s[4];
+};
+
+/*
+ * For dumping current write state. One variable-sized entry per thread;
+ * fields are little-endian in files and on the wire.
+ */
+struct thread_io_list {
+ /* number of valid entries in offsets[] */
+ uint64_t no_comps;
+ /* job iodepth; also the number of offsets[] slots in the entry */
+ uint64_t depth;
+ /* number of writes issued when the state was taken */
+ uint64_t numberio;
+ /* zero-based thread index, used in the state file name */
+ uint64_t index;
+ struct thread_rand_state rand;
+ /* job name, used in the state file name */
+ uint8_t name[64];
+ /* offsets of the last completed writes, most recent first */
+ uint64_t offsets[0];
+};
+
+/*
+ * Container for the state of several threads: a count followed by that many
+ * variable-sized thread_io_list entries (step with io_list_next()).
+ */
+struct all_io_list {
+ uint64_t threads;
+ struct thread_io_list state[0];
+};
+
+#define VSTATE_HDR_VERSION 0x01
+
+/*
+ * Header at the start of a state file, followed by one thread_io_list.
+ * 'size' is the byte length of that payload and 'crc' its crc32c.
+ */
+struct verify_state_hdr {
+ uint64_t version;
+ uint64_t size;
+ uint64_t crc;
+};
+
+#define IO_LIST_ALL 0xffffffff
+extern struct all_io_list *get_all_io_list(int, size_t *);
+extern void __verify_save_state(struct all_io_list *, const char *);
+extern void verify_save_state(void);
+extern int verify_load_state(struct thread_data *, const char *);
+extern void verify_free_state(struct thread_data *);
+extern int verify_state_should_stop(struct thread_data *, struct io_u *);
+extern void verify_convert_assign_state(struct thread_data *, struct thread_io_list *);
+extern int verify_state_hdr(struct verify_state_hdr *, struct thread_io_list *);
+
+/*
+ * Size in bytes of one thread_io_list entry: the fixed part plus one 64-bit
+ * offset slot per unit of depth. s->depth is read as little-endian.
+ */
+static inline size_t thread_io_list_sz(struct thread_io_list *s)
+{
+	size_t fixed = sizeof(*s);
+
+	return fixed + le64_to_cpu(s->depth) * sizeof(uint64_t);
+}
+
+/*
+ * Return the entry that follows 's' in a packed array of variable-sized
+ * thread_io_list entries.
+ */
+static inline struct thread_io_list *io_list_next(struct thread_io_list *s)
+{
+	/* step in bytes via char *; arithmetic on void * is a GNU extension */
+	return (void *) ((char *) s + thread_io_list_sz(s));
+}
+
+/*
+ * Format the state file name "<prefix>-<name>-<num>-verify.state" into out.
+ * NOTE(review): the write is unbounded - the caller must supply a buffer
+ * large enough for prefix, name and the fixed parts; nothing here checks it.
+ */
+static inline void verify_state_gen_name(char *out, const char *name,
+ const char *prefix, int num)
+{
+ sprintf(out, "%s-%s-%d-verify.state", prefix, name, num);
+}
+
#endif