#include <unistd.h>
#include <limits.h>
#include <errno.h>
-#include <fcntl.h>
#include <sys/poll.h>
#include <sys/types.h>
#include <sys/wait.h>
#endif
#include "fio.h"
+#include "options.h"
#include "server.h"
#include "crc/crc16.h"
#include "lib/ieee754.h"
+#include "verify.h"
+#include "smalloc.h"
int fio_net_port = FIO_NET_PORT;
static unsigned int has_zlib = 0;
#endif
static unsigned int use_zlib;
+static char me[128];
struct fio_fork_item {
struct flist_head list;
pid_t pid;
};
+struct cmd_reply {
+ struct fio_mutex lock;
+ void *data;
+ size_t size;
+ int error;
+};
+
static const char *fio_server_ops[FIO_NET_CMD_NR] = {
"",
"QUIT",
"DISK_UTIL",
"SERVER_START",
"ADD_JOB",
- "CMD_RUN",
- "CMD_IOLOG",
+ "RUN",
+ "IOLOG",
+ "UPDATE_JOB",
+ "LOAD_FILE",
+ "VTRIGGER",
+ "SENDFILE",
};
const char *fio_server_op(unsigned int op)
*/
struct fio_net_cmd *fio_net_recv_cmd(int sk)
{
- struct fio_net_cmd cmd, *cmdret = NULL;
+ struct fio_net_cmd cmd, *tmp, *cmdret = NULL;
size_t cmd_size = 0, pdu_offset = 0;
uint16_t crc;
int ret, first = 1;
} else
cmd_size += cmd.pdu_len;
- cmdret = realloc(cmdret, cmd_size);
+ if (cmd_size / 1024 > FIO_SERVER_MAX_CMD_MB * 1024) {
+ log_err("fio: cmd+pdu too large (%llu)\n", (unsigned long long) cmd_size);
+ ret = 1;
+ break;
+ }
+
+ tmp = realloc(cmdret, cmd_size);
+ if (!tmp) {
+ log_err("fio: server failed allocating cmd\n");
+ ret = 1;
+ break;
+ }
+ cmdret = tmp;
if (first)
memcpy(cmdret, &cmd, sizeof(cmd));
/* zero-terminate text input */
if (cmdret->pdu_len) {
if (cmdret->opcode == FIO_NET_CMD_TEXT) {
- struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmdret->payload;
- char *buf = (char *) pdu->buf;
+ struct cmd_text_pdu *__pdu = (struct cmd_text_pdu *) cmdret->payload;
+ char *buf = (char *) __pdu->buf;
- buf[pdu->buf_len] = '\0';
+ buf[__pdu->buf_len] = '\0';
} else if (cmdret->opcode == FIO_NET_CMD_JOB) {
- struct cmd_job_pdu *pdu = (struct cmd_job_pdu *) cmdret->payload;
- char *buf = (char *) pdu->buf;
- int len = le32_to_cpu(pdu->buf_len);
+ struct cmd_job_pdu *__pdu = (struct cmd_job_pdu *) cmdret->payload;
+ char *buf = (char *) __pdu->buf;
+ int len = le32_to_cpu(__pdu->buf_len);
buf[len] = '\0';
}
reply = calloc(1, sizeof(*reply));
INIT_FLIST_HEAD(&reply->list);
- gettimeofday(&reply->tv, NULL);
+ fio_gettime(&reply->tv, NULL);
reply->saved_tag = tag;
reply->opcode = opcode;
fio_server_check_fork_items(conn_list);
}
+static int handle_load_file_cmd(struct fio_net_cmd *cmd)
+{
+ struct cmd_load_file_pdu *pdu = (struct cmd_load_file_pdu *) cmd->payload;
+ void *file_name = pdu->file;
+ struct cmd_start_pdu spdu;
+
+ dprint(FD_NET, "server: loading local file %s\n", (char *) file_name);
+
+ pdu->name_len = le16_to_cpu(pdu->name_len);
+ pdu->client_type = le16_to_cpu(pdu->client_type);
+
+ if (parse_jobs_ini(file_name, 0, 0, pdu->client_type)) {
+ fio_net_send_quit(server_fd);
+ return -1;
+ }
+
+ spdu.jobs = cpu_to_le32(thread_number);
+ spdu.stat_outputs = cpu_to_le32(stat_number);
+ fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, NULL);
+ return 0;
+}
+
static int handle_run_cmd(struct flist_head *job_list, struct fio_net_cmd *cmd)
{
pid_t pid;
int ret;
+ fio_time_init();
set_genesis_time();
pid = fork();
dprint(FD_NET, "server: sending probe reply\n");
+ strcpy(me, (char *) pdu->server);
+
memset(&probe, 0, sizeof(probe));
gethostname((char *) probe.hostname, sizeof(probe.hostname));
#ifdef CONFIG_BIG_ENDIAN
static int handle_send_eta_cmd(struct fio_net_cmd *cmd)
{
struct jobs_eta *je;
- size_t size;
uint64_t tag = cmd->tag;
+ size_t size;
int i;
- if (!thread_number)
+ je = get_jobs_eta(1, &size);
+ if (!je)
return 0;
- size = sizeof(*je) + thread_number * sizeof(char) + 1;
- je = malloc(size);
- memset(je, 0, size);
-
- if (!calc_thread_status(je, 1)) {
- free(je);
- return 0;
- }
-
dprint(FD_NET, "server sending status\n");
je->nr_running = cpu_to_le32(je->nr_running);
return 0;
}
+static int handle_trigger_cmd(struct fio_net_cmd *cmd)
+{
+ struct cmd_vtrigger_pdu *pdu = (struct cmd_vtrigger_pdu *) cmd->payload;
+ char *buf = (char *) pdu->cmd;
+ struct all_io_list *rep;
+ size_t sz;
+
+ pdu->len = le16_to_cpu(pdu->len);
+ buf[pdu->len] = '\0';
+
+ rep = get_all_io_list(IO_LIST_ALL, &sz);
+ if (!rep) {
+ struct all_io_list state;
+
+ state.threads = cpu_to_le64((uint64_t) 0);
+ fio_net_send_cmd(server_fd, FIO_NET_CMD_VTRIGGER, &state, sizeof(state), NULL, NULL);
+ } else {
+ fio_net_send_cmd(server_fd, FIO_NET_CMD_VTRIGGER, rep, sz, NULL, NULL);
+ free(rep);
+ }
+
+ exec_trigger(buf);
+ return 0;
+}
+
static int handle_command(struct flist_head *job_list, struct fio_net_cmd *cmd)
{
int ret;
case FIO_NET_CMD_EXIT:
exit_backend = 1;
return -1;
+ case FIO_NET_CMD_LOAD_FILE:
+ ret = handle_load_file_cmd(cmd);
+ break;
case FIO_NET_CMD_JOB:
ret = handle_job_cmd(cmd);
break;
case FIO_NET_CMD_UPDATE_JOB:
ret = handle_update_job_cmd(cmd);
break;
+ case FIO_NET_CMD_VTRIGGER:
+ ret = handle_trigger_cmd(cmd);
+ break;
+ case FIO_NET_CMD_SENDFILE: {
+ struct cmd_sendfile_reply *in;
+ struct cmd_reply *rep;
+
+ rep = (struct cmd_reply *) (uintptr_t) cmd->tag;
+
+ in = (struct cmd_sendfile_reply *) cmd->payload;
+ in->size = le32_to_cpu(in->size);
+ in->error = le32_to_cpu(in->error);
+ if (in->error) {
+ ret = 1;
+ rep->error = in->error;
+ } else {
+ ret = 0;
+ rep->data = smalloc(in->size);
+ if (!rep->data) {
+ ret = 1;
+ rep->error = ENOMEM;
+ } else {
+ rep->size = in->size;
+ memcpy(rep->data, in->data, in->size);
+ }
+ }
+ fio_mutex_up(&rep->lock);
+ break;
+ }
default:
log_err("fio: unknown opcode: %s\n", fio_server_op(cmd->opcode));
ret = 1;
_exit(ret);
}
+/* get the address on this host bound by the input socket,
+ * whether it is ipv6 or ipv4 */
+
+int get_my_addr_str( int sk )
+{
+ int ret;
+ struct sockaddr * sockaddr_p;
+ struct sockaddr_in myaddr4 = {0};
+ struct sockaddr_in6 myaddr6 = {0};
+ char * net_addr;
+ socklen_t len = use_ipv6 ? sizeof(myaddr6) : sizeof(myaddr4);
+
+ if (use_ipv6)
+ sockaddr_p = (struct sockaddr * )&myaddr6;
+ else
+ sockaddr_p = (struct sockaddr * )&myaddr4;
+ ret = getsockname(sk, sockaddr_p, &len);
+ if (ret) {
+ log_err("fio: getsockaddr: %s\n", strerror(errno));
+ return -1;
+ }
+ if (use_ipv6)
+ net_addr = (char * )&myaddr6.sin6_addr;
+ else
+ net_addr = (char * )&myaddr4.sin_addr;
+ if (NULL == inet_ntop(use_ipv6?AF_INET6:AF_INET, net_addr, client_sockaddr_str, INET6_ADDRSTRLEN-1)) {
+ log_err("inet_ntop: failed to convert addr to string\n");
+ return -1;
+ }
+ dprint(FD_NET, "fio server bound to addr %s\n", client_sockaddr_str);
+ return 0;
+}
+
static int accept_loop(int listen_sk)
{
struct sockaddr_in addr;
struct sockaddr_in6 addr6;
socklen_t len = use_ipv6 ? sizeof(addr6) : sizeof(addr);
struct pollfd pfd;
- int ret = 0, sk, flags, exitval = 0;
+ int ret = 0, sk, exitval = 0;
FLIST_HEAD(conn_list);
dprint(FD_NET, "server enter accept loop\n");
- flags = fcntl(listen_sk, F_GETFL);
- flags |= O_NONBLOCK;
- fcntl(listen_sk, F_SETFL, flags);
+ fio_set_fd_nonblocking(listen_sk, "server");
while (!exit_backend) {
const char *from;
}
/* exits */
+ get_my_addr_str(sk); /* if error, it's already logged, non-fatal */
handle_connection(sk);
}
/*
* Encode to IEEE 754 for network transfer
*/
- dst->mean.u.i = __cpu_to_le64(fio_double_to_uint64(src->mean.u.f));
- dst->S.u.i = __cpu_to_le64(fio_double_to_uint64(src->S.u.f));
+ dst->mean.u.i = cpu_to_le64(fio_double_to_uint64(src->mean.u.f));
+ dst->S.u.i = cpu_to_le64(fio_double_to_uint64(src->S.u.f));
}
static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src)
p.ts.minf = cpu_to_le64(ts->minf);
p.ts.majf = cpu_to_le64(ts->majf);
p.ts.clat_percentiles = cpu_to_le64(ts->clat_percentiles);
+ p.ts.percentile_precision = cpu_to_le64(ts->percentile_precision);
for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) {
fio_fp64_t *src = &ts->percentile_list[i];
fio_fp64_t *dst = &p.ts.percentile_list[i];
- dst->u.i = __cpu_to_le64(fio_double_to_uint64(src->u.f));
+ dst->u.i = cpu_to_le64(fio_double_to_uint64(src->u.f));
}
for (i = 0; i < FIO_IO_U_MAP_NR; i++) {
for (i = 0; i < DDIR_RWDIR_CNT; i++) {
p.ts.total_io_u[i] = cpu_to_le64(ts->total_io_u[i]);
p.ts.short_io_u[i] = cpu_to_le64(ts->short_io_u[i]);
+ p.ts.drop_io_u[i] = cpu_to_le64(ts->drop_io_u[i]);
}
p.ts.total_submit = cpu_to_le64(ts->total_submit);
p.ts.latency_depth = cpu_to_le32(ts->latency_depth);
p.ts.latency_target = cpu_to_le64(ts->latency_target);
p.ts.latency_window = cpu_to_le64(ts->latency_window);
- p.ts.latency_percentile.u.i = __cpu_to_le64(fio_double_to_uint64(ts->latency_percentile.u.f));
+ p.ts.latency_percentile.u.i = cpu_to_le64(fio_double_to_uint64(ts->latency_percentile.u.f));
+
+ p.ts.nr_block_infos = le64_to_cpu(ts->nr_block_infos);
+ for (i = 0; i < p.ts.nr_block_infos; i++)
+ p.ts.block_infos[i] = le32_to_cpu(ts->block_infos[i]);
convert_gs(&p.rs, rs);
int i;
for (i = 0; i < 2; i++) {
- dst->ios[i] = cpu_to_le32(src->ios[i]);
- dst->merges[i] = cpu_to_le32(src->merges[i]);
+ dst->ios[i] = cpu_to_le64(src->ios[i]);
+ dst->merges[i] = cpu_to_le64(src->merges[i]);
dst->sectors[i] = cpu_to_le64(src->sectors[i]);
- dst->ticks[i] = cpu_to_le32(src->ticks[i]);
+ dst->ticks[i] = cpu_to_le64(src->ticks[i]);
}
- dst->io_ticks = cpu_to_le32(src->io_ticks);
- dst->time_in_queue = cpu_to_le32(src->time_in_queue);
+ dst->io_ticks = cpu_to_le64(src->io_ticks);
+ dst->time_in_queue = cpu_to_le64(src->time_in_queue);
dst->slavecount = cpu_to_le32(src->slavecount);
- dst->max_util.u.i = __cpu_to_le64(fio_double_to_uint64(src->max_util.u.f));
+ dst->max_util.u.i = cpu_to_le64(fio_double_to_uint64(src->max_util.u.f));
}
static void convert_dus(struct disk_util_stat *dst, struct disk_util_stat *src)
strncpy((char *) dst->name, (char *) src->name, FIO_DU_NAME_SZ - 1);
for (i = 0; i < 2; i++) {
- dst->s.ios[i] = cpu_to_le32(src->s.ios[i]);
- dst->s.merges[i] = cpu_to_le32(src->s.merges[i]);
+ dst->s.ios[i] = cpu_to_le64(src->s.ios[i]);
+ dst->s.merges[i] = cpu_to_le64(src->s.merges[i]);
dst->s.sectors[i] = cpu_to_le64(src->s.sectors[i]);
- dst->s.ticks[i] = cpu_to_le32(src->s.ticks[i]);
+ dst->s.ticks[i] = cpu_to_le64(src->s.ticks[i]);
}
- dst->s.io_ticks = cpu_to_le32(src->s.io_ticks);
- dst->s.time_in_queue = cpu_to_le32(src->s.time_in_queue);
+ dst->s.io_ticks = cpu_to_le64(src->s.io_ticks);
+ dst->s.time_in_queue = cpu_to_le64(src->s.time_in_queue);
dst->s.msec = cpu_to_le64(src->s.msec);
}
}
stream.next_in = (void *) log->log;
- stream.avail_in = log->nr_samples * sizeof(struct io_sample);
+ stream.avail_in = log->nr_samples * log_entry_sz(log);
do {
unsigned int this_len, flags = 0;
- int ret;
stream.avail_out = FIO_SERVER_MAX_FRAGMENT_PDU;
stream.next_out = out_pdu;
struct cmd_iolog_pdu pdu;
int i, ret = 0;
+ pdu.nr_samples = cpu_to_le64(log->nr_samples);
pdu.thread_number = cpu_to_le32(td->thread_number);
- pdu.nr_samples = __cpu_to_le32(log->nr_samples);
pdu.log_type = cpu_to_le32(log->log_type);
pdu.compressed = cpu_to_le32(use_zlib);
- strcpy((char *) pdu.name, name);
+
+ strncpy((char *) pdu.name, name, FIO_NET_NAME_MAX);
+ pdu.name[FIO_NET_NAME_MAX - 1] = '\0';
for (i = 0; i < log->nr_samples; i++) {
- struct io_sample *s = &log->log[i];
+ struct io_sample *s = get_sample(log, i);
+
+ s->time = cpu_to_le64(s->time);
+ s->val = cpu_to_le64(s->val);
+ s->__ddir = cpu_to_le32(s->__ddir);
+ s->bs = cpu_to_le32(s->bs);
+
+ if (log->log_offset) {
+ struct io_sample_offset *so = (void *) s;
- s->time = cpu_to_le64(s->time);
- s->val = cpu_to_le64(s->val);
- s->ddir = cpu_to_le32(s->ddir);
- s->bs = cpu_to_le32(s->bs);
+ so->offset = cpu_to_le64(so->offset);
+ }
}
/*
return fio_send_iolog_gz(&pdu, log);
return fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, log->log,
- log->nr_samples * sizeof(struct io_sample), 0, 0);
+ log->nr_samples * log_entry_sz(log), 0, 0);
}
void fio_server_send_add_job(struct thread_data *td)
fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_SERVER_START, 0, NULL);
}
+int fio_server_get_verify_state(const char *name, int threadnumber,
+ void **datap, int *version)
+{
+ struct thread_io_list *s;
+ struct cmd_sendfile out;
+ struct cmd_reply *rep;
+ uint64_t tag;
+ void *data;
+
+ dprint(FD_NET, "server: request verify state\n");
+
+ rep = smalloc(sizeof(*rep));
+ if (!rep) {
+ log_err("fio: smalloc pool too small\n");
+ return 1;
+ }
+
+ __fio_mutex_init(&rep->lock, FIO_MUTEX_LOCKED);
+ rep->data = NULL;
+ rep->error = 0;
+
+ verify_state_gen_name((char *) out.path, sizeof(out.path), name, me,
+ threadnumber);
+ tag = (uint64_t) (uintptr_t) rep;
+ fio_net_send_cmd(server_fd, FIO_NET_CMD_SENDFILE, &out, sizeof(out),
+ &tag, NULL);
+
+ /*
+ * Wait for the backend to receive the reply
+ */
+ if (fio_mutex_down_timeout(&rep->lock, 10)) {
+ log_err("fio: timed out waiting for reply\n");
+ goto fail;
+ }
+
+ if (rep->error) {
+ log_err("fio: failure on receiving state file: %s\n", strerror(rep->error));
+fail:
+ *datap = NULL;
+ sfree(rep);
+ fio_net_send_quit(server_fd);
+ return 1;
+ }
+
+ /*
+ * The format is verify_state_hdr, then thread_io_list. Verify
+ * the header, and the thread_io_list checksum
+ */
+ s = rep->data + sizeof(struct verify_state_hdr);
+ if (verify_state_hdr(rep->data, s, version))
+ goto fail;
+
+ /*
+ * Don't need the header from now, copy just the thread_io_list
+ */
+ rep->size -= sizeof(struct verify_state_hdr);
+ data = malloc(rep->size);
+ memcpy(data, s, rep->size);
+ *datap = data;
+
+ sfree(rep->data);
+ __fio_mutex_remove(&rep->lock);
+ sfree(rep);
+ return 0;
+}
+
static int fio_init_server_ip(void)
{
struct sockaddr *addr;
opt = 1;
if (setsockopt(sk, SOL_SOCKET, SO_REUSEADDR, (void *)&opt, sizeof(opt)) < 0) {
- log_err("fio: setsockopt: %s\n", strerror(errno));
+ log_err("fio: setsockopt(REUSEADDR): %s\n", strerror(errno));
close(sk);
return -1;
}
#ifdef SO_REUSEPORT
if (setsockopt(sk, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
- log_err("fio: setsockopt: %s\n", strerror(errno));
+ log_err("fio: setsockopt(REUSEPORT): %s\n", strerror(errno));
close(sk);
return -1;
}
free(pidfile);
return -1;
} else if (pid) {
- int ret = write_pid(pid, pidfile);
-
+ ret = write_pid(pid, pidfile);
free(pidfile);
- exit(ret);
+ _exit(ret);
}
setsid();