From: Jens Axboe Date: Tue, 11 Nov 2014 03:34:00 +0000 (-0700) Subject: Add support for verify triggers and verify state saving X-Git-Tag: fio-2.2.0~60^2~1 X-Git-Url: https://git.kernel.dk/?p=fio.git;a=commitdiff_plain;h=ca09be4b1a8e97f0bca5cfbddb399899cf561eaa Add support for verify triggers and verify state saving This allows you to (for instance) instantly terminate a verify write workload, and then later verify that everything was written correctly up to that very point. This can be useful for testing powercut scenarios, which is often problematic on storage devices. The trigger part is a file based notification scheme, similar to what is provided for the status dumps. When triggered, fio will exit immediately and write the verify state safely to disk. A trigger can be accompanied by a trigger command. Say you wanted to test powercut safety, the trigger could be something that immediately cuts power to the machine. The verify state is either saved locally (if run locally), or saved over the network if run in client/server mode. Signed-off-by: Jens Axboe --- diff --git a/HOWTO b/HOWTO index 73be9d7b..cdb7b013 100644 --- a/HOWTO +++ b/HOWTO @@ -1316,6 +1316,21 @@ verify_backlog_batch=int Control how many blocks fio will verify if verify_backlog_batch is larger than verify_backlog, some blocks will be verified more than once. +verify_state_save=bool When a job exits during the write phase of a verify + workload, save its current state. This allows fio to replay + up until that point, if the verify state is loaded for the + verify read phase. The format of the filename is, roughly, + <type>-<jobname>-<jobindex>-verify.state. <type> is "local" + for a local run, "sock" for a client/server socket connection, + and "ip" (192.168.0.1, for instance) for a networked + client/server connection. + +verify_state_load=bool If a verify termination trigger was used, fio stores + the current write state of each thread. This can be used at + verification time so that fio knows how far it should verify. 
+ Without this information, fio will run a full verification + pass, according to the settings in the job file used. + stonewall wait_for_previous Wait for preceding jobs in the job file to exit, before starting this one. Can be used to insert serialization diff --git a/backend.c b/backend.c index a93c458a..77f52337 100644 --- a/backend.c +++ b/backend.c @@ -526,6 +526,11 @@ static void do_verify(struct thread_data *td, uint64_t verify_bytes) break; } + if (verify_state_should_stop(td, io_u)) { + put_io_u(td, io_u); + break; + } + if (td->o.verify_async) io_u->end_io = verify_io_u_async; else @@ -758,6 +763,11 @@ static uint64_t do_io(struct thread_data *td) io_u->rand_seed *= __rand(&td->verify_state); } + if (verify_state_should_stop(td, io_u)) { + put_io_u(td, io_u); + break; + } + if (td->o.verify_async) io_u->end_io = verify_io_u_async; else @@ -977,6 +987,9 @@ static void cleanup_io_u(struct thread_data *td) io_u_rexit(&td->io_u_requeues); io_u_qexit(&td->io_u_freelist); io_u_qexit(&td->io_u_all); + + if (td->last_write_comp) + sfree(td->last_write_comp); } static int init_io_u(struct thread_data *td) @@ -1093,6 +1106,14 @@ static int init_io_u(struct thread_data *td) p += max_bs; } + if (td->o.verify != VERIFY_NONE) { + td->last_write_comp = scalloc(max_units, sizeof(uint64_t)); + if (!td->last_write_comp) { + log_err("fio: failed to alloc write comp data\n"); + return 1; + } + } + return 0; } @@ -1529,6 +1550,18 @@ static void *thread_main(void *data) td->ts.io_bytes[DDIR_WRITE] = td->io_bytes[DDIR_WRITE]; td->ts.io_bytes[DDIR_TRIM] = td->io_bytes[DDIR_TRIM]; + if (td->o.verify_state_save && !(td->flags & TD_F_VSTATE_SAVED) && + (td->o.verify != VERIFY_NONE && td_write(td))) { + struct all_io_list *state; + size_t sz; + + state = get_all_io_list(td->thread_number, &sz); + if (state) { + __verify_save_state(state, "local"); + free(state); + } + } + fio_unpin_memory(td); fio_writeout_logs(td); @@ -1554,6 +1587,7 @@ err: cleanup_io_u(td); close_ioengine(td); 
cgroup_shutdown(td, &cgroup_mnt); + verify_free_state(td); if (o->cpumask_set) { ret = fio_cpuset_exit(&o->cpumask); @@ -1730,9 +1764,80 @@ reaped: fio_terminate_threads(TERMINATE_ALL); } +static int __check_trigger_file(void) +{ + struct stat sb; + + if (!trigger_file) + return 0; + + if (stat(trigger_file, &sb)) + return 0; + + if (unlink(trigger_file) < 0) + log_err("fio: failed to unlink %s: %s\n", trigger_file, + strerror(errno)); + + return 1; +} + +static int trigger_timedout(void) +{ + if (trigger_timeout) + return time_since_genesis() >= trigger_timeout; + + return 0; +} + +void exec_trigger(const char *cmd) +{ + int ret; + + if (!cmd) + return; + + ret = system(cmd); + if (ret == -1) + log_err("fio: failed executing %s trigger\n", cmd); +} + +void check_trigger_file(void) +{ + if (__check_trigger_file() || trigger_timedout()) { + if (nr_clients) + fio_clients_send_trigger(trigger_cmd); + else { + verify_save_state(); + fio_terminate_threads(TERMINATE_ALL); + exec_trigger(trigger_cmd); + } + } +} + +static int fio_verify_load_state(struct thread_data *td) +{ + int ret; + + if (!td->o.verify_state) + return 0; + + if (is_backend) { + void *data; + + ret = fio_server_get_verify_state(td->o.name, + td->thread_number - 1, &data); + if (!ret) + verify_convert_assign_state(td, data); + } else + ret = verify_load_state(td, "local"); + + return ret; +} + static void do_usleep(unsigned int usecs) { check_for_running_stats(); + check_trigger_file(); usleep(usecs); } @@ -1786,12 +1891,16 @@ static void run_threads(void) if (!td->o.create_serialize) continue; + if (fio_verify_load_state(td)) + goto reap; + /* * do file setup here so it happens sequentially, * we don't want X number of threads getting their * client data interspersed on disk */ if (setup_files(td)) { +reap: exit_value++; if (td->error) log_err("fio: pid=%d, err=%d/%s\n", diff --git a/cconv.c b/cconv.c index 607cedee..d0a124ec 100644 --- a/cconv.c +++ b/cconv.c @@ -131,6 +131,7 @@ void 
convert_thread_options_to_cpu(struct thread_options *o, o->verifysort = le32_to_cpu(top->verifysort); o->verifysort_nr = le32_to_cpu(top->verifysort_nr); o->experimental_verify = le32_to_cpu(top->experimental_verify); + o->verify_state = le32_to_cpu(top->verify_state); o->verify_interval = le32_to_cpu(top->verify_interval); o->verify_offset = le32_to_cpu(top->verify_offset); @@ -308,6 +309,7 @@ void convert_thread_options_to_net(struct thread_options_pack *top, top->verifysort = cpu_to_le32(o->verifysort); top->verifysort_nr = cpu_to_le32(o->verifysort_nr); top->experimental_verify = cpu_to_le32(o->experimental_verify); + top->verify_state = cpu_to_le32(o->verify_state); top->verify_interval = cpu_to_le32(o->verify_interval); top->verify_offset = cpu_to_le32(o->verify_offset); top->verify_pattern_bytes = cpu_to_le32(o->verify_pattern_bytes); diff --git a/client.c b/client.c index 56ee6dc5..96b844cb 100644 --- a/client.c +++ b/client.c @@ -23,6 +23,7 @@ #include "server.h" #include "flist.h" #include "hash.h" +#include "verify.h" static void handle_du(struct fio_client *client, struct fio_net_cmd *cmd); static void handle_ts(struct fio_client *client, struct fio_net_cmd *cmd); @@ -337,10 +338,26 @@ int fio_client_add(struct client_ops *ops, const char *hostname, void **cookie) return 0; } +static const char *server_name(struct fio_client *client, char *buf, + size_t bufsize) +{ + const char *from; + + if (client->ipv6) + from = inet_ntop(AF_INET6, (struct sockaddr *) &client->addr6.sin6_addr, buf, bufsize); + else if (client->is_sock) + from = "sock"; + else + from = inet_ntop(AF_INET, (struct sockaddr *) &client->addr.sin_addr, buf, bufsize); + + return from; +} + static void probe_client(struct fio_client *client) { struct cmd_client_probe_pdu pdu; uint64_t tag; + char buf[64]; dprint(FD_NET, "client: send probe\n"); @@ -350,6 +367,8 @@ static void probe_client(struct fio_client *client) pdu.flags = 0; #endif + strcpy((char *) pdu.server, server_name(client, buf, 
sizeof(buf))); + fio_net_send_cmd(client->fd, FIO_NET_CMD_PROBE, &pdu, sizeof(pdu), &tag, &client->cmd_list); } @@ -1304,6 +1323,69 @@ static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *cmd) return ret; } +static void sendfile_reply(int fd, struct cmd_sendfile_reply *rep, + size_t size, uint64_t tag) +{ + rep->error = cpu_to_le32(rep->error); + fio_net_send_cmd(fd, FIO_NET_CMD_SENDFILE, rep, size, &tag, NULL); +} + +static int read_data(int fd, void *data, size_t size) +{ + ssize_t ret; + + while (size) { + ret = read(fd, data, size); + if (ret < 0) + return errno; + else if (!ret) + break; + else { + data += ret; + size -= ret; + } + } + + if (size) + return EAGAIN; + + return 0; +} + +static int send_file(struct fio_client *client, struct cmd_sendfile *pdu, + uint64_t tag) +{ + struct cmd_sendfile_reply *rep; + struct stat sb; + size_t size; + int fd; + + size = sizeof(*rep); + rep = malloc(size); + + if (stat((char *)pdu->path, &sb) < 0) { +fail: + rep->error = errno; + sendfile_reply(client->fd, rep, size, tag); + free(rep); + return 1; + } + + size += sb.st_size; + rep = realloc(rep, size); + rep->size = cpu_to_le32((uint32_t) sb.st_size); + + fd = open((char *)pdu->path, O_RDONLY); + if (fd == -1 ) + goto fail; + + rep->error = read_data(fd, &rep->data, sb.st_size); + sendfile_reply(client->fd, rep, size, tag); + free(rep); + close(fd); + return 0; +} + int fio_handle_client(struct fio_client *client) { struct client_ops *ops = client->ops; @@ -1323,12 +1405,10 @@ int fio_handle_client(struct fio_client *client) if (ops->quit) ops->quit(client, cmd); remove_client(client); - free(cmd); break; case FIO_NET_CMD_TEXT: convert_text(cmd); ops->text(client, cmd); - free(cmd); break; case FIO_NET_CMD_DU: { struct cmd_du_pdu *du = (struct cmd_du_pdu *) cmd->payload; @@ -1337,7 +1417,6 @@ int fio_handle_client(struct fio_client *client) convert_agg(&du->agg); ops->disk_util(client, cmd); - free(cmd); break; } case FIO_NET_CMD_TS: { @@ -1347,7 +1426,6 @@ 
int fio_handle_client(struct fio_client *client) convert_gs(&p->rs, &p->rs); ops->thread_status(client, cmd); - free(cmd); break; } case FIO_NET_CMD_GS: { @@ -1356,7 +1434,6 @@ int fio_handle_client(struct fio_client *client) convert_gs(gs, gs); ops->group_stats(client, cmd); - free(cmd); break; } case FIO_NET_CMD_ETA: { @@ -1365,26 +1442,22 @@ int fio_handle_client(struct fio_client *client) remove_reply_cmd(client, cmd); convert_jobs_eta(je); handle_eta(client, cmd); - free(cmd); break; } case FIO_NET_CMD_PROBE: remove_reply_cmd(client, cmd); ops->probe(client, cmd); - free(cmd); break; case FIO_NET_CMD_SERVER_START: client->state = Client_running; if (ops->job_start) ops->job_start(client, cmd); - free(cmd); break; case FIO_NET_CMD_START: { struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload; pdu->jobs = le32_to_cpu(pdu->jobs); ops->start(client, cmd); - free(cmd); break; } case FIO_NET_CMD_STOP: { @@ -1395,7 +1468,6 @@ int fio_handle_client(struct fio_client *client) client->error = le32_to_cpu(pdu->error); client->signal = le32_to_cpu(pdu->signal); ops->stop(client, cmd); - free(cmd); break; } case FIO_NET_CMD_ADD_JOB: { @@ -1406,7 +1478,6 @@ int fio_handle_client(struct fio_client *client) if (ops->add_job) ops->add_job(client, cmd); - free(cmd); break; } case FIO_NET_CMD_IOLOG: @@ -1416,22 +1487,62 @@ int fio_handle_client(struct fio_client *client) pdu = convert_iolog(cmd); ops->iolog(client, pdu); } - free(cmd); break; case FIO_NET_CMD_UPDATE_JOB: ops->update_job(client, cmd); remove_reply_cmd(client, cmd); - free(cmd); break; + case FIO_NET_CMD_VTRIGGER: { + struct all_io_list *pdu = (struct all_io_list *) cmd->payload; + char buf[64]; + + __verify_save_state(pdu, server_name(client, buf, sizeof(buf))); + break; + } + case FIO_NET_CMD_SENDFILE: { + struct cmd_sendfile *pdu = (struct cmd_sendfile *) cmd->payload; + send_file(client, pdu, cmd->tag); + break; + } default: log_err("fio: unknown client op: %s\n", fio_server_op(cmd->opcode)); - 
free(cmd); break; } + free(cmd); return 1; } +int fio_clients_send_trigger(const char *cmd) +{ + struct flist_head *entry; + struct fio_client *client; + size_t slen; + + dprint(FD_NET, "client: send vtrigger: %s\n", cmd); + + if (!cmd) + slen = 0; + else + slen = strlen(cmd); + + flist_for_each(entry, &client_list) { + struct cmd_vtrigger_pdu *pdu; + + client = flist_entry(entry, struct fio_client, list); + + pdu = malloc(sizeof(*pdu) + slen); + pdu->len = cpu_to_le16((uint16_t) slen); + if (slen) + memcpy(pdu->cmd, cmd, slen); + fio_net_send_cmd(client->fd, FIO_NET_CMD_VTRIGGER, pdu, + sizeof(*pdu) + slen, NULL, NULL); + free(pdu); + } + + return 0; +} + static void request_client_etas(struct client_ops *ops) { struct fio_client *client; @@ -1558,6 +1669,7 @@ int fio_handle_clients(struct client_ops *ops) do { struct timeval tv; + int timeout; fio_gettime(&tv, NULL); if (mtime_since(&eta_tv, &tv) >= 900) { @@ -1568,7 +1680,11 @@ int fio_handle_clients(struct client_ops *ops) break; } - ret = poll(pfds, nr_clients, ops->eta_msec); + check_trigger_file(); + + timeout = min(100u, ops->eta_msec); + + ret = poll(pfds, nr_clients, timeout); if (ret < 0) { if (errno == EINTR) continue; diff --git a/client.h b/client.h index d632f468..8818de2b 100644 --- a/client.h +++ b/client.h @@ -137,6 +137,7 @@ extern struct fio_client *fio_get_client(struct fio_client *); extern void fio_put_client(struct fio_client *); extern int fio_client_update_options(struct fio_client *, struct thread_options *, uint64_t *); extern int fio_client_wait_for_reply(struct fio_client *, uint64_t); +extern int fio_clients_send_trigger(const char *); #define FIO_CLIENT_DEF_ETA_MSEC 900 diff --git a/fio.1 b/fio.1 index 82868a8e..89cbb820 100644 --- a/fio.1 +++ b/fio.1 @@ -1173,6 +1173,17 @@ Trim this number of IO blocks. .BI experimental_verify \fR=\fPbool Enable experimental verification. 
.TP +.BI verify_state_save \fR=\fPbool +When a job exits during the write phase of a verify workload, save its +current state. This allows fio to replay up until that point, if the +verify state is loaded for the verify read phase. +.TP +.BI verify_state_load \fR=\fPbool +If a verify termination trigger was used, fio stores the current write +state of each thread. This can be used at verification time so that fio +knows how far it should verify. Without this information, fio will run +a full verification pass, according to the settings in the job file used. +.TP .B stonewall "\fR,\fP wait_for_previous" Wait for preceding jobs in the job file to exit before starting this one. \fBstonewall\fR implies \fBnew_group\fR. diff --git a/fio.h b/fio.h index 28f45975..50cf1b03 100644 --- a/fio.h +++ b/fio.h @@ -74,6 +74,7 @@ enum { TD_F_COMPRESS = 128, TD_F_NOIO = 256, TD_F_COMPRESS_LOG = 512, + TD_F_VSTATE_SAVED = 1024, }; enum { @@ -123,6 +124,13 @@ struct thread_data { uint64_t stat_io_blocks[DDIR_RWDIR_CNT]; struct timeval iops_sample_time; + /* + * Tracks the last iodepth number of completed writes, if data + * verification is enabled + */ + uint64_t *last_write_comp; + unsigned int last_write_idx; + volatile int update_rusage; struct fio_mutex *rusage_sem; struct rusage ru_start; @@ -142,6 +150,7 @@ struct thread_data { int error; int sig; int done; + int stop_io; pid_t pid; char *orig_buffer; size_t orig_buffer_size; @@ -171,6 +180,8 @@ struct thread_data { unsigned int verify_batch; unsigned int trim_batch; + struct thread_io_list *vstate; + int shm_id; /* @@ -224,7 +235,7 @@ struct thread_data { uint64_t total_io_size; uint64_t fill_device_size; - unsigned long io_issues[DDIR_RWDIR_CNT]; + uint64_t io_issues[DDIR_RWDIR_CNT]; uint64_t io_blocks[DDIR_RWDIR_CNT]; uint64_t this_io_blocks[DDIR_RWDIR_CNT]; uint64_t io_bytes[DDIR_RWDIR_CNT]; @@ -380,6 +391,9 @@ extern int status_interval; extern const char fio_version_string[]; extern int helper_do_stat; extern 
pthread_cond_t helper_cond; +extern char *trigger_file; +extern char *trigger_cmd; +extern long long trigger_timeout; extern struct thread_data *threads; @@ -422,7 +436,7 @@ extern void fio_options_mem_dupe(struct thread_data *); extern void options_mem_dupe(void *data, struct fio_option *options); extern void td_fill_rand_seeds(struct thread_data *); extern void add_job_opts(const char **, int); -extern char *num2str(unsigned long, int, int, int, int); +extern char *num2str(uint64_t, int, int, int, int); extern int ioengine_load(struct thread_data *); extern int parse_dryrun(void); extern int fio_running_or_pending_io_threads(void); @@ -580,7 +594,7 @@ static inline unsigned int td_min_bs(struct thread_data *td) return min(td->o.min_bs[DDIR_TRIM], min_bs); } -static inline int is_power_of_2(unsigned long val) +static inline int is_power_of_2(uint64_t val) { return (val != 0 && ((val & (val - 1)) == 0)); } @@ -636,4 +650,7 @@ enum { FIO_CPUS_SPLIT, }; +extern void exec_trigger(const char *); +extern void check_trigger_file(void); + #endif diff --git a/init.c b/init.c index 70d5d069..c7fdcddc 100644 --- a/init.c +++ b/init.c @@ -64,6 +64,10 @@ int write_bw_log = 0; int read_only = 0; int status_interval = 0; +char *trigger_file = NULL; +char *trigger_cmd = NULL; +long long trigger_timeout = 0; + static int prev_group_jobs; unsigned long fio_debug = 0; @@ -241,6 +245,16 @@ static struct option l_opts[FIO_NR_OPTIONS] = { .has_arg = required_argument, .val = 'L', }, + { + .name = (char *) "trigger", + .has_arg = required_argument, + .val = 'W', + }, + { + .name = (char *) "trigger-timeout", + .has_arg = required_argument, + .val = 'B', + }, { .name = NULL, }, @@ -1665,6 +1679,8 @@ static void usage(const char *name) #ifdef CONFIG_ZLIB printf(" --inflate-log=log\tInflate and output compressed log\n"); #endif + printf(" --trigger=file:cmd\tExecute trigger cmd when file exists\n"); + printf(" --trigger-timeout=t\tExecute trigger af this time\n"); printf("\nFio was written 
by Jens Axboe "); printf("\n Jens Axboe "); printf("\n Jens Axboe \n"); @@ -2184,6 +2200,39 @@ int parse_cmd_line(int argc, char *argv[], int client_type) status_interval = val / 1000; break; } + case 'W': { + char *split, *cmd; + size_t sz; + + split = strchr(optarg, ':'); + if (!split) { + log_err("fio: trigger is file:command\n"); + do_exit++; + exit_val = 1; + } + + sz = split - optarg; + trigger_file = calloc(1, sz + 1); + strncpy(trigger_file, optarg, sz); + + split++; + cmd = trigger_cmd = strdup(split); + strip_blank_front(&trigger_cmd); + strip_blank_end(trigger_cmd); + if (strlen(trigger_cmd) == 0) { + free(cmd); + trigger_cmd = NULL; + } + break; + } + case 'B': + if (check_str_time(optarg, &trigger_timeout, 1)) { + log_err("fio: failed parsing time %s\n", optarg); + do_exit++; + exit_val = 1; + } + trigger_timeout /= 1000000; + break; case '?': log_err("%s: unrecognized option '%s'\n", argv[0], argv[optind - 1]); diff --git a/io_u.c b/io_u.c index e8894d5d..c51982d8 100644 --- a/io_u.c +++ b/io_u.c @@ -1282,6 +1282,9 @@ struct io_u *__get_io_u(struct thread_data *td) { struct io_u *io_u = NULL; + if (td->stop_io) + return NULL; + td_io_u_lock(td); again: @@ -1642,13 +1645,22 @@ static void io_completed(struct thread_data *td, struct io_u **io_u_ptr, if (!(io_u->flags & IO_U_F_VER_LIST)) td->this_io_bytes[ddir] += bytes; - if (ddir == DDIR_WRITE && f) { - if (f->first_write == -1ULL || - io_u->offset < f->first_write) - f->first_write = io_u->offset; - if (f->last_write == -1ULL || - ((io_u->offset + bytes) > f->last_write)) - f->last_write = io_u->offset + bytes; + if (ddir == DDIR_WRITE) { + if (f) { + if (f->first_write == -1ULL || + io_u->offset < f->first_write) + f->first_write = io_u->offset; + if (f->last_write == -1ULL || + ((io_u->offset + bytes) > f->last_write)) + f->last_write = io_u->offset + bytes; + } + if (td->last_write_comp) { + int idx = td->last_write_idx++; + + td->last_write_comp[idx] = io_u->offset; + if (td->last_write_idx == 
td->o.iodepth) + td->last_write_idx = 0; + } } if (ramp_time_over(td) && (td->runstate == TD_RUNNING || diff --git a/lib/num2str.c b/lib/num2str.c index 89618688..0ed05f33 100644 --- a/lib/num2str.c +++ b/lib/num2str.c @@ -9,7 +9,7 @@ /* * Cheesy number->string conversion, complete with carry rounding error. */ -char *num2str(unsigned long num, int maxlen, int base, int pow2, int unit_base) +char *num2str(uint64_t num, int maxlen, int base, int pow2, int unit_base) { const char *postfix[] = { "", "K", "M", "G", "P", "E" }; const char *byte_postfix[] = { "", "B", "bit" }; @@ -36,7 +36,7 @@ char *num2str(unsigned long num, int maxlen, int base, int pow2, int unit_base) modulo = -1U; while (post_index < sizeof(postfix)) { - sprintf(tmp, "%lu", num); + sprintf(tmp, "%llu", (unsigned long long) num); if (strlen(tmp) <= maxlen) break; @@ -51,12 +51,12 @@ done: if (post_index >= ARRAY_LENGTH(postfix)) post_index = 0; - sprintf(buf, "%lu%s%s", num, postfix[post_index], - byte_postfix[byte_post_index]); + sprintf(buf, "%llu%s%s", (unsigned long long) num, + postfix[post_index], byte_postfix[byte_post_index]); return buf; } - sprintf(tmp, "%lu", num); + sprintf(tmp, "%llu", (unsigned long long) num); decimals = maxlen - strlen(tmp); if (decimals <= 1) { if (carry) @@ -72,7 +72,7 @@ done: modulo = (modulo + 9) / 10; } while (1); - sprintf(buf, "%lu.%u%s%s", num, modulo, postfix[post_index], - byte_postfix[byte_post_index]); + sprintf(buf, "%llu.%u%s%s", (unsigned long long) num, modulo, + postfix[post_index], byte_postfix[byte_post_index]); return buf; } diff --git a/options.c b/options.c index ac094804..23469d8d 100644 --- a/options.c +++ b/options.c @@ -2524,6 +2524,28 @@ struct fio_option fio_options[FIO_MAX_OPTS] = { .off1 = td_var_offset(experimental_verify), .type = FIO_OPT_BOOL, .help = "Enable experimental verification", + .parent = "verify", + .category = FIO_OPT_C_IO, + .group = FIO_OPT_G_VERIFY, + }, + { + .name = "verify_state_load", + .lname = "Load verify 
state", + .off1 = td_var_offset(verify_state), + .type = FIO_OPT_BOOL, + .help = "Load verify termination state", + .parent = "verify", + .category = FIO_OPT_C_IO, + .group = FIO_OPT_G_VERIFY, + }, + { + .name = "verify_state_save", + .lname = "Save verify state", + .off1 = td_var_offset(verify_state_save), + .type = FIO_OPT_BOOL, + .def = "1", + .help = "Save verify state on termination", + .parent = "verify", .category = FIO_OPT_C_IO, .group = FIO_OPT_G_VERIFY, }, diff --git a/server.c b/server.c index d70444b9..a8d4868f 100644 --- a/server.c +++ b/server.c @@ -24,6 +24,8 @@ #include "server.h" #include "crc/crc16.h" #include "lib/ieee754.h" +#include "verify.h" +#include "smalloc.h" int fio_net_port = FIO_NET_PORT; @@ -41,6 +43,7 @@ static unsigned int has_zlib = 1; static unsigned int has_zlib = 0; #endif static unsigned int use_zlib; +static char me[128]; struct fio_fork_item { struct flist_head list; @@ -50,6 +53,13 @@ struct fio_fork_item { pid_t pid; }; +struct cmd_reply { + struct fio_mutex lock; + void *data; + size_t size; + int error; +}; + static const char *fio_server_ops[FIO_NET_CMD_NR] = { "", "QUIT", @@ -67,10 +77,12 @@ static const char *fio_server_ops[FIO_NET_CMD_NR] = { "DISK_UTIL", "SERVER_START", "ADD_JOB", - "CMD_RUN", - "CMD_IOLOG", - "CMD_UPDATE_JOB", - "CMD_LOAD_FILE", + "RUN", + "IOLOG", + "UPDATE_JOB", + "LOAD_FILE", + "VTRIGGER", + "SENDFILE", }; const char *fio_server_op(unsigned int op) @@ -661,6 +673,8 @@ static int handle_probe_cmd(struct fio_net_cmd *cmd) dprint(FD_NET, "server: sending probe reply\n"); + strcpy(me, (char *) pdu->server); + memset(&probe, 0, sizeof(probe)); gethostname((char *) probe.hostname, sizeof(probe.hostname)); #ifdef CONFIG_BIG_ENDIAN @@ -756,6 +770,31 @@ static int handle_update_job_cmd(struct fio_net_cmd *cmd) return 0; } +static int handle_trigger_cmd(struct fio_net_cmd *cmd) +{ + struct cmd_vtrigger_pdu *pdu = (struct cmd_vtrigger_pdu *) cmd->payload; + char *buf = (char *) pdu->cmd; + struct 
all_io_list *rep; + size_t sz; + + pdu->len = le16_to_cpu(pdu->len); + buf[pdu->len] = '\0'; + + rep = get_all_io_list(IO_LIST_ALL, &sz); + if (!rep) { + struct all_io_list state; + + state.threads = cpu_to_le64((uint64_t) 0); + fio_net_send_cmd(server_fd, FIO_NET_CMD_VTRIGGER, &state, sizeof(state), NULL, NULL); + } else { + fio_net_send_cmd(server_fd, FIO_NET_CMD_VTRIGGER, rep, sz, NULL, NULL); + free(rep); + } + + exec_trigger(buf); + return 0; +} + static int handle_command(struct flist_head *job_list, struct fio_net_cmd *cmd) { int ret; @@ -792,6 +831,35 @@ static int handle_command(struct flist_head *job_list, struct fio_net_cmd *cmd) case FIO_NET_CMD_UPDATE_JOB: ret = handle_update_job_cmd(cmd); break; + case FIO_NET_CMD_VTRIGGER: + ret = handle_trigger_cmd(cmd); + break; + case FIO_NET_CMD_SENDFILE: { + struct cmd_sendfile_reply *in; + struct cmd_reply *rep; + + rep = (struct cmd_reply *) (uintptr_t) cmd->tag; + + in = (struct cmd_sendfile_reply *) cmd->payload; + in->size = le32_to_cpu(in->size); + in->error = le32_to_cpu(in->error); + if (in->error) { + ret = 1; + rep->error = in->error; + } else { + ret = 0; + rep->data = smalloc(in->size); + if (!rep->data) { + ret = 1; + rep->error = ENOMEM; + } else { + rep->size = in->size; + memcpy(rep->data, in->data, in->size); + } + } + fio_mutex_up(&rep->lock); + break; + } default: log_err("fio: unknown opcode: %s\n", fio_server_op(cmd->opcode)); ret = 1; @@ -1302,6 +1370,70 @@ void fio_server_send_start(struct thread_data *td) fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_SERVER_START, 0, NULL); } +int fio_server_get_verify_state(const char *name, int threadnumber, + void **datap) +{ + struct thread_io_list *s; + struct cmd_sendfile out; + struct cmd_reply *rep; + uint64_t tag; + void *data; + + dprint(FD_NET, "server: request verify state\n"); + + rep = smalloc(sizeof(*rep)); + if (!rep) { + log_err("fio: smalloc pool too small\n"); + return 1; + } + + __fio_mutex_init(&rep->lock, FIO_MUTEX_LOCKED); + 
rep->data = NULL; + rep->error = 0; + + verify_state_gen_name((char *) out.path, name, me, threadnumber); + tag = (uint64_t) (uintptr_t) rep; + fio_net_send_cmd(server_fd, FIO_NET_CMD_SENDFILE, &out, sizeof(out), + &tag, NULL); + + /* + * Wait for the backend to receive the reply + */ + if (fio_mutex_down_timeout(&rep->lock, 10)) { + log_err("fio: timed out waiting for reply\n"); + goto fail; + } + + if (rep->error) { + log_err("fio: failure on receiving state file: %s\n", strerror(rep->error)); +fail: + *datap = NULL; + sfree(rep); + fio_net_send_quit(server_fd); + return 1; + } + + /* + * The format is verify_state_hdr, then thread_io_list. Verify + * the header, and the thread_io_list checksum + */ + s = rep->data + sizeof(struct verify_state_hdr); + if (verify_state_hdr(rep->data, s)) + goto fail; + + /* + * Don't need the header from now, copy just the thread_io_list + */ + rep->size -= sizeof(struct verify_state_hdr); + data = malloc(rep->size); + memcpy(data, s, rep->size); + *datap = data; + + sfree(rep->data); + sfree(rep); + return 0; +} + static int fio_init_server_ip(void) { struct sockaddr *addr; diff --git a/server.h b/server.h index 46d05a69..84a06a11 100644 --- a/server.h +++ b/server.h @@ -38,7 +38,7 @@ struct fio_net_cmd_reply { }; enum { - FIO_SERVER_VER = 38, + FIO_SERVER_VER = 39, FIO_SERVER_MAX_FRAGMENT_PDU = 1024, FIO_SERVER_MAX_CMD_MB = 2048, @@ -62,7 +62,9 @@ enum { FIO_NET_CMD_IOLOG = 17, FIO_NET_CMD_UPDATE_JOB = 18, FIO_NET_CMD_LOAD_FILE = 19, - FIO_NET_CMD_NR = 20, + FIO_NET_CMD_VTRIGGER = 20, + FIO_NET_CMD_SENDFILE = 21, + FIO_NET_CMD_NR = 22, FIO_NET_CMD_F_MORE = 1UL << 0, @@ -77,6 +79,25 @@ enum { FIO_PROBE_FLAG_ZLIB = 1UL << 0, }; +struct cmd_sendfile { + uint8_t path[FIO_NET_NAME_MAX]; +}; + +struct cmd_sendfile_reply { + uint32_t size; + uint32_t error; + uint8_t data[0]; +}; + +/* + * Client sends this to server on VTRIGGER, server sends back a full + * all_io_list structure. 
+ */ +struct cmd_vtrigger_pdu { + uint16_t len; + uint8_t cmd[]; +}; + struct cmd_load_file_pdu { uint16_t name_len; uint16_t client_type; @@ -95,6 +116,7 @@ struct cmd_du_pdu { struct cmd_client_probe_pdu { uint64_t flags; + uint8_t server[128]; }; struct cmd_probe_reply_pdu { @@ -175,6 +197,7 @@ extern void fio_server_send_ts(struct thread_stat *, struct group_run_stats *); extern void fio_server_send_gs(struct group_run_stats *); extern void fio_server_send_du(void); extern void fio_server_idle_loop(void); +extern int fio_server_get_verify_state(const char *, int, void **); extern int fio_recv_data(int sk, void *p, unsigned int len); extern int fio_send_data(int sk, const void *p, unsigned int len); diff --git a/thread_options.h b/thread_options.h index 74c3e991..f311e2c6 100644 --- a/thread_options.h +++ b/thread_options.h @@ -99,6 +99,8 @@ struct thread_options { unsigned long long verify_backlog; unsigned int verify_batch; unsigned int experimental_verify; + unsigned int verify_state; + unsigned int verify_state_save; unsigned int use_thread; unsigned int unlink; unsigned int do_disk_util; @@ -330,6 +332,9 @@ struct thread_options_pack { uint64_t verify_backlog; uint32_t verify_batch; uint32_t experimental_verify; + uint32_t verify_state; + uint32_t verify_state_save; + uint32_t pad; uint32_t use_thread; uint32_t unlink; uint32_t do_disk_util; @@ -349,7 +354,6 @@ struct thread_options_pack { uint32_t bs_is_seq_rand; uint32_t random_distribution; - uint32_t pad; fio_fp64_t zipf_theta; fio_fp64_t pareto_h; diff --git a/verify.c b/verify.c index f5c009ce..a0c2e38a 100644 --- a/verify.c +++ b/verify.c @@ -1286,3 +1286,291 @@ void verify_async_exit(struct thread_data *td) free(td->verify_threads); td->verify_threads = NULL; } + +struct all_io_list *get_all_io_list(int save_mask, size_t *sz) +{ + struct all_io_list *rep; + struct thread_data *td; + size_t depth; + void *next; + int i, nr; + + compiletime_assert(sizeof(struct all_io_list) == 8, "all_io_list"); + + 
/*
+	 * Calculate reply space needed. We need one 'io_state' per thread,
+	 * and the size will vary depending on depth.
+	 */
+	depth = 0;
+	nr = 0;
+	for_each_td(td, i) {
+		if (save_mask != IO_LIST_ALL && (i + 1) != save_mask)
+			continue;
+		td->stop_io = 1;
+		td->flags |= TD_F_VSTATE_SAVED;
+		depth += td->o.iodepth;
+		nr++;
+	}
+
+	if (!nr)
+		return NULL;
+
+	*sz = sizeof(*rep);
+	*sz += nr * sizeof(struct thread_io_list);
+	*sz += depth * sizeof(uint64_t);
+	rep = malloc(*sz);
+
+	rep->threads = cpu_to_le64((uint64_t) nr);
+
+	next = &rep->state[0];
+	for_each_td(td, i) {
+		struct thread_io_list *s = next;
+		unsigned int comps;
+
+		if (save_mask != IO_LIST_ALL && (i + 1) != save_mask)
+			continue;
+
+		if (td->last_write_comp) {
+			int j, k;
+
+			if (td->io_blocks[DDIR_WRITE] < td->o.iodepth)
+				comps = td->io_blocks[DDIR_WRITE];
+			else
+				comps = td->o.iodepth;
+
+			k = td->last_write_idx - 1;
+			for (j = 0; j < comps; j++) {
+				if (k == -1)
+					k = td->o.iodepth - 1;
+				s->offsets[j] = cpu_to_le64(td->last_write_comp[k]);
+				k--;
+			}
+		} else
+			comps = 0;
+
+		s->no_comps = cpu_to_le64((uint64_t) comps);
+		s->depth = cpu_to_le64((uint64_t) td->o.iodepth);
+		s->numberio = cpu_to_le64((uint64_t) td->io_issues[DDIR_WRITE]);
+		s->index = cpu_to_le64((uint64_t) i);
+		s->rand.s[0] = cpu_to_le32(td->random_state.s1);
+		s->rand.s[1] = cpu_to_le32(td->random_state.s2);
+		s->rand.s[2] = cpu_to_le32(td->random_state.s3);
+		s->rand.s[3] = 0;
+		strncpy((char *) s->name, td->o.name, sizeof(s->name));
+		next = io_list_next(s);
+	}
+
+	return rep;
+}
+
+/*
+ * Open the verify state file for thread 'num' of job 'name'. The file name
+ * is generated by verify_state_gen_name() from 'prefix', 'name' and 'num'.
+ * If 'for_write' is set the file is created/truncated and opened O_SYNC
+ * for writing, otherwise it is opened read-only. Returns the file
+ * descriptor, or -1 on failure.
+ */
+static int open_state_file(const char *name, const char *prefix, int num,
+			   int for_write)
+{
+	/*
+	 * Worst case room needed besides prefix and name: two '-' separators,
+	 * a signed 32-bit decimal (11 chars), and "-verify.state" with its
+	 * terminating NUL.
+	 */
+	const size_t extra = 2 + 11 + sizeof("-verify.state");
+	char out[256];
+	int flags;
+	int fd;
+
+	/*
+	 * verify_state_gen_name() does an unbounded sprintf(), so make sure
+	 * the result fits before generating it.
+	 */
+	if (strlen(prefix) + strlen(name) + extra > sizeof(out)) {
+		log_err("fio: verify state file name too long\n");
+		return -1;
+	}
+
+	if (for_write)
+		flags = O_CREAT | O_TRUNC | O_WRONLY | O_SYNC;
+	else
+		flags = O_RDONLY;
+
+	verify_state_gen_name(out, name, prefix, num);
+
+	fd = open(out, flags, 0644);
+	if (fd == -1) {
+		perror("fio: open state file");
+		return -1;
+	}
+
+	return fd;
+}
+
+/*
+ * Write the state of one thread to its own state file: a verify_state_hdr
+ * (version, payload size, crc32c of the payload) followed by the
+ * thread_io_list itself. 's' must still be in little-endian format, as
+ * filled in by get_all_io_list(). Returns 0 on success, 1 on failure.
+ */
+static int write_thread_list_state(struct thread_io_list *s,
+				   const char *prefix)
+{
+	const size_t sz = thread_io_list_sz(s);
+	char name[sizeof(s->name) + 1];
+	struct verify_state_hdr hdr;
+	uint64_t crc;
+	ssize_t ret;
+	int fd;
+
+	/*
+	 * s->name is filled with strncpy() and is not NUL terminated if the
+	 * job name fills the whole field, so work on a terminated copy.
+	 */
+	memcpy(name, s->name, sizeof(s->name));
+	name[sizeof(s->name)] = '\0';
+
+	/* s->index is stored little-endian, convert it back for the name */
+	fd = open_state_file(name, prefix, (int) le64_to_cpu(s->index), 1);
+	if (fd == -1)
+		return 1;
+
+	crc = fio_crc32c((void *)s, sz);
+
+	hdr.version = cpu_to_le64((uint64_t) VSTATE_HDR_VERSION);
+	hdr.size = cpu_to_le64((uint64_t) sz);
+	hdr.crc = cpu_to_le64(crc);
+
+	ret = write(fd, &hdr, sizeof(hdr));
+	if (ret != (ssize_t) sizeof(hdr))
+		goto write_fail;
+
+	ret = write(fd, s, sz);
+	if (ret != (ssize_t) sz)
+		goto write_fail;
+
+	close(fd);
+	return 0;
+write_fail:
+	/* a short write has no errno to report, only a failed one does */
+	if (ret < 0)
+		perror("fio: write state file");
+	log_err("fio: failed to write state file\n");
+	close(fd);
+	return 1;
+}
+
+/*
+ * Write one state file per thread contained in 'state', using 'prefix' as
+ * the leading part of each file name. A failure for one thread is logged
+ * by write_thread_list_state() and does not stop the remaining threads
+ * from being written.
+ */
+void __verify_save_state(struct all_io_list *state, const char *prefix)
+{
+	const uint64_t nr_threads = le64_to_cpu(state->threads);
+	struct thread_io_list *s = &state->state[0];
+	uint64_t i;
+
+	for (i = 0; i < nr_threads; i++) {
+		write_thread_list_state(s, prefix);
+		s = io_list_next(s);
+	}
+}
+
+/*
+ * Collect the write state of all threads and save it with the "local"
+ * file name prefix.
+ */
+void verify_save_state(void)
+{
+	struct all_io_list *state;
+	size_t sz;
+
+	state = get_all_io_list(IO_LIST_ALL, &sz);
+	if (state) {
+		__verify_save_state(state, "local");
+		free(state);
+	}
+}
+
+/*
+ * Release the verify state attached to 'td', if any. The pointer is
+ * cleared so verify_state_should_stop() sees no state afterwards.
+ */
+void verify_free_state(struct thread_data *td)
+{
+	free(td->vstate);
+	td->vstate = NULL;
+}
+
+/*
+ * Convert 's' in place from little-endian to host byte order (no_comps,
+ * depth, numberio, the random state and the first no_comps offsets) and
+ * attach it to 'td' as its verify state. 's' is later released with
+ * free() by verify_free_state().
+ */
+void verify_convert_assign_state(struct thread_data *td,
+				 struct thread_io_list *s)
+{
+	uint64_t i;
+
+	s->no_comps = le64_to_cpu(s->no_comps);
+	s->depth = le64_to_cpu(s->depth);
+	s->numberio = le64_to_cpu(s->numberio);
+	for (i = 0; i < 4; i++)
+		s->rand.s[i] = le32_to_cpu(s->rand.s[i]);
+	for (i = 0; i < s->no_comps; i++)
+		s->offsets[i] = le64_to_cpu(s->offsets[i]);
+
+	td->vstate = s;
+}
+
+/*
+ * Convert 'hdr' to host byte order in place and validate it against the
+ * state 's' it describes: the version must match VSTATE_HDR_VERSION and
+ * the crc32c over hdr->size bytes of 's' must match hdr->crc.
+ * Returns 0 if the header is valid, 1 otherwise.
+ */
+int verify_state_hdr(struct verify_state_hdr *hdr, struct thread_io_list *s)
+{
+	uint64_t crc;
+
+	hdr->version = le64_to_cpu(hdr->version);
+	hdr->size = le64_to_cpu(hdr->size);
+	hdr->crc = le64_to_cpu(hdr->crc);
+
+	if (hdr->version != VSTATE_HDR_VERSION)
+		return 1;
+
+	crc = fio_crc32c((void *)s, hdr->size);
+	if (crc != hdr->crc)
+		return 1;
+
+	return 0;
+}
+
+/*
+ * Load the verify state file for 'td' (named from 'prefix', the job name
+ * and the thread index) and attach it to 'td'. Does nothing if
+ * td->o.verify_state is not set. Returns 0 on success or when there was
+ * nothing to do, 1 if the file could not be opened or read, or if its
+ * contents fail validation.
+ */
+int verify_load_state(struct thread_data *td, const char *prefix)
+{
+	struct thread_io_list *s = NULL;
+	struct verify_state_hdr hdr;
+	uint64_t depth, max_depth;
+	uint64_t crc;
+	ssize_t ret;
+	int fd;
+
+	if (!td->o.verify_state)
+		return 0;
+
+	fd = open_state_file(td->o.name, prefix, td->thread_number - 1, 0);
+	if (fd == -1)
+		return 1;
+
+	ret = read(fd, &hdr, sizeof(hdr));
+	if (ret != (ssize_t) sizeof(hdr)) {
+		if (ret < 0)
+			td_verror(td, errno, "read verify state hdr");
+		log_err("fio: failed reading verify state header\n");
+		goto err;
+	}
+
+	hdr.version = le64_to_cpu(hdr.version);
+	hdr.size = le64_to_cpu(hdr.size);
+	hdr.crc = le64_to_cpu(hdr.crc);
+
+	if (hdr.version != VSTATE_HDR_VERSION) {
+		log_err("fio: bad version in verify state header\n");
+		goto err;
+	}
+
+	/* the payload must at least hold the fixed part of the state */
+	if (hdr.size < sizeof(*s)) {
+		log_err("fio: verify state is corrupt\n");
+		goto err;
+	}
+
+	s = malloc(hdr.size);
+	if (!s) {
+		log_err("fio: failed to alloc verify state\n");
+		goto err;
+	}
+
+	ret = read(fd, s, hdr.size);
+	if (ret < 0 || (uint64_t) ret != hdr.size) {
+		if (ret < 0)
+			td_verror(td, errno, "read verify state");
+		log_err("fio: failed reading verify state\n");
+		goto err;
+	}
+
+	crc = fio_crc32c((void *)s, hdr.size);
+	if (crc != hdr.crc) {
+		log_err("fio: verify state is corrupt\n");
+		goto err;
+	}
+
+	/*
+	 * The offsets array must fit inside what was read, and the number
+	 * of completions can never exceed the depth. Check this before
+	 * the conversion below walks offsets[].
+	 */
+	depth = le64_to_cpu(s->depth);
+	max_depth = (hdr.size - sizeof(*s)) / sizeof(uint64_t);
+	if (depth > max_depth || le64_to_cpu(s->no_comps) > depth) {
+		log_err("fio: verify state is corrupt\n");
+		goto err;
+	}
+
+	close(fd);
+
+	verify_convert_assign_state(td, s);
+	return 0;
+err:
+	free(s);
+	close(fd);
+	return 1;
+}
+
+/*
+ * Use the loaded verify state to know when to stop doing verification.
+ * Returns 1 if verification should stop at 'io_u', 0 if it should go on
+ * (which is always the case when no state is loaded).
+ */
+int verify_state_should_stop(struct thread_data *td, struct io_u *io_u)
+{
+	struct thread_io_list *s = td->vstate;
+	uint64_t i;
+
+	if (!s)
+		return 0;
+
+	/*
+	 * If we're not into the window of issues - depth yet, continue
+	 */
+	if (td->io_blocks[DDIR_READ] < s->depth ||
+	    s->numberio - td->io_blocks[DDIR_READ] > s->depth)
+		return 0;
+
+	/*
+	 * We're in the window of having to check if this io was
+	 * completed or not. If the IO was seen as completed, then
+	 * let's verify it.
+	 */
+	for (i = 0; i < s->no_comps; i++)
+		if (io_u->offset == s->offsets[i])
+			return 0;
+
+	/*
+	 * Not found, we have to stop
+	 */
+	return 1;
+}
diff --git a/verify.h b/verify.h
index bb3fd492..3e52f9c1 100644
--- a/verify.h
+++ b/verify.h
@@ -88,4 +88,60 @@ extern void fio_verify_init(struct thread_data *td);
 extern int verify_async_init(struct thread_data *);
 extern void verify_async_exit(struct thread_data *);
 
+struct thread_rand_state {
+	uint32_t s[4];
+};
+
+/*
+ * For dumping current write state
+ */
+struct thread_io_list {
+	uint64_t no_comps;
+	uint64_t depth;
+	uint64_t numberio;
+	uint64_t index;
+	struct thread_rand_state rand;
+	uint8_t name[64];
+	uint64_t offsets[0];
+};
+
+struct all_io_list {
+	uint64_t threads;
+	struct thread_io_list state[0];
+};
+
+#define VSTATE_HDR_VERSION	0x01
+
+struct verify_state_hdr {
+	uint64_t version;
+	uint64_t size;
+	uint64_t crc;
+};
+
+#define IO_LIST_ALL		0xffffffff
+extern struct all_io_list *get_all_io_list(int, size_t *);
+extern void __verify_save_state(struct all_io_list *, const char *);
+extern void verify_save_state(void);
+extern int verify_load_state(struct thread_data *, const char *);
+extern void verify_free_state(struct thread_data *);
+extern int verify_state_should_stop(struct thread_data *, struct io_u *);
+extern void verify_convert_assign_state(struct thread_data *, struct thread_io_list *);
+extern int verify_state_hdr(struct verify_state_hdr *, struct thread_io_list *);
+
+static inline size_t thread_io_list_sz(struct thread_io_list *s)
+{
+	return sizeof(*s) + le64_to_cpu(s->depth) * sizeof(uint64_t);
+}
+
+static inline struct thread_io_list *io_list_next(struct thread_io_list *s)
+{
+	return (void *) s + thread_io_list_sz(s);
+}
+
+static inline void verify_state_gen_name(char *out, const char *name,
+					 const char *prefix, int num)
+{
+	sprintf(out, "%s-%s-%d-verify.state", prefix, name, num);
+}
+
 #endif