#include <arpa/inet.h>
#include <netdb.h>
#include <signal.h>
+#include <zlib.h>
#include "fio.h"
#include "client.h"
static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd);
static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd);
static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd);
+static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd);
struct client_ops fio_client_ops = {
- .text_op = handle_text,
+ .text = handle_text,
.disk_util = handle_du,
.thread_status = handle_ts,
.group_stats = handle_gs,
.stop = handle_stop,
+ .start = handle_start,
.eta = display_thread_status,
.probe = handle_probe,
+ .eta_msec = FIO_CLIENT_DEF_ETA_MSEC,
+ .client_type = FIO_CLIENT_TYPE_CLI,
};
static struct timeval eta_tv;
return NULL;
}
-static void remove_client(struct fio_client *client)
+void fio_put_client(struct fio_client *client)
{
- assert(client->refs);
-
if (--client->refs)
return;
+ free(client->hostname);
+ if (client->argv)
+ free(client->argv);
+ if (client->name)
+ free(client->name);
+
+ free(client);
+}
+
+static void remove_client(struct fio_client *client)
+{
+ assert(client->refs);
+
dprint(FD_NET, "client: removed <%s>\n", client->hostname);
- flist_del(&client->list);
+
+ if (!flist_empty(&client->list))
+ flist_del_init(&client->list);
fio_client_remove_hash(client);
fio_client_dec_jobs_eta(client->eta_in_flight, client->ops->eta);
}
- free(client->hostname);
- if (client->argv)
- free(client->argv);
- if (client->name)
- free(client->name);
-
- free(client);
nr_clients--;
sum_stat_clients--;
+
+ fio_put_client(client);
}
-static void put_client(struct fio_client *client)
+struct fio_client *fio_get_client(struct fio_client *client)
{
- remove_client(client);
+ client->refs++;
+ return client;
}
static void __fio_client_add_cmd_option(struct fio_client *client,
client->fd = -1;
client->ops = ops;
client->refs = 1;
+ client->type = ops->client_type;
__fio_client_add_cmd_option(client, "fio");
client->fd = -1;
client->ops = ops;
client->refs = 1;
+ client->type = ops->client_type;
__fio_client_add_cmd_option(client, "fio");
return 0;
}
+static void probe_client(struct fio_client *client)
+{
+ dprint(FD_NET, "client: send probe\n");
+
+ fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_PROBE, 0, &client->cmd_list);
+}
+
static int fio_client_connect_ip(struct fio_client *client)
{
struct sockaddr *addr;
fd = socket(domain, SOCK_STREAM, 0);
if (fd < 0) {
+ int ret = -errno;
+
log_err("fio: socket: %s\n", strerror(errno));
- return -1;
+ return ret;
}
if (connect(fd, addr, socklen) < 0) {
+ int ret = -errno;
+
log_err("fio: connect: %s\n", strerror(errno));
log_err("fio: failed to connect to %s:%u\n", client->hostname,
client->port);
close(fd);
- return -1;
+ return ret;
}
return fd;
fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd < 0) {
+ int ret = -errno;
+
log_err("fio: socket: %s\n", strerror(errno));
- return -1;
+ return ret;
}
len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
if (connect(fd, (struct sockaddr *) addr, len) < 0) {
+ int ret = -errno;
+
log_err("fio: connect; %s\n", strerror(errno));
close(fd);
- return -1;
+ return ret;
}
return fd;
}
-static int fio_client_connect(struct fio_client *client)
+int fio_client_connect(struct fio_client *client)
{
int fd;
dprint(FD_NET, "client: %s connected %d\n", client->hostname, fd);
if (fd < 0)
- return 1;
+ return fd;
client->fd = fd;
fio_client_add_hash(client);
client->state = Client_connected;
+
+ probe_client(client);
return 0;
}
+void fio_client_terminate(struct fio_client *client)
+{
+ fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_QUIT, 0, NULL);
+}
+
void fio_clients_terminate(void)
{
struct flist_head *entry;
flist_for_each(entry, &client_list) {
client = flist_entry(entry, struct fio_client, list);
-
- fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_QUIT, 0, NULL);
+ fio_client_terminate(client);
}
}
sigaction(SIGTERM, &act, NULL);
}
-static void probe_client(struct fio_client *client)
-{
- dprint(FD_NET, "client: send probe\n");
-
- fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_PROBE, 0, &client->cmd_list);
-}
-
static int send_client_cmd_line(struct fio_client *client)
{
struct cmd_single_line_pdu *cslp;
free(lens);
clp->lines = cpu_to_le16(client->argc);
+ clp->client_type = __cpu_to_le16(client->type);
ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOBLINE, pdu, mem, 0);
free(pdu);
return ret;
continue;
}
- probe_client(client);
-
if (client->argc > 1)
send_client_cmd_line(client);
}
* Send file contents to server backend. We could use sendfile(), but to remain
* more portable lets just read/write the darn thing.
*/
-static int fio_client_send_ini(struct fio_client *client, const char *filename)
+static int __fio_client_send_ini(struct fio_client *client, const char *filename)
{
+ struct cmd_job_pdu *pdu;
+ size_t p_size;
struct stat sb;
- char *p, *buf;
+ char *p;
+ void *buf;
off_t len;
int fd, ret;
fd = open(filename, O_RDONLY);
if (fd < 0) {
+ int ret = -errno;
+
log_err("fio: job file <%s> open: %s\n", filename, strerror(errno));
- return 1;
+ return ret;
}
if (fstat(fd, &sb) < 0) {
+ int ret = -errno;
+
log_err("fio: job file stat: %s\n", strerror(errno));
close(fd);
- return 1;
+ return ret;
}
- buf = malloc(sb.st_size);
+ p_size = sb.st_size + sizeof(*pdu);
+ pdu = malloc(p_size);
+ buf = pdu->buf;
len = sb.st_size;
p = buf;
return 1;
}
+ pdu->buf_len = __cpu_to_le32(sb.st_size);
+ pdu->client_type = cpu_to_le32(client->type);
+
client->sent_job = 1;
- ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, buf, sb.st_size, 0);
- free(buf);
+ ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, pdu, p_size, 0);
+ free(pdu);
close(fd);
return ret;
}
+int fio_client_send_ini(struct fio_client *client, const char *filename)
+{
+ int ret;
+
+ ret = __fio_client_send_ini(client, filename);
+ if (!ret)
+ client->sent_job = 1;
+
+ return ret;
+}
+
int fio_clients_send_ini(const char *filename)
{
struct fio_client *client;
if (fio_client_send_ini(client, filename))
remove_client(client);
-
- client->sent_job = 1;
}
return !nr_clients;
client->eta_in_flight = NULL;
flist_del_init(&client->eta_list);
+ if (client->ops->jobs_eta)
+ client->ops->jobs_eta(client, je);
+
fio_client_sum_jobs_eta(&eta->eta, je);
fio_client_dec_jobs_eta(eta, client->ops->eta);
}
struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload;
client->state = Client_started;
- client->jobs = le32_to_cpu(pdu->jobs);
+ client->jobs = pdu->jobs;
}
static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd)
pdu->log_usec = le64_to_cpu(pdu->log_usec);
}
+/*
+ * This has been compressed on the server side, since it can be big.
+ * Uncompress here.
+ */
+static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *cmd)
+{
+ struct cmd_iolog_pdu *pdu = (struct cmd_iolog_pdu *) cmd->payload;
+ struct cmd_iolog_pdu *ret;
+ uint32_t nr_samples;
+ unsigned long total;
+ z_stream stream;
+ void *p;
+ int i;
+
+ stream.zalloc = Z_NULL;
+ stream.zfree = Z_NULL;
+ stream.opaque = Z_NULL;
+ stream.avail_in = 0;
+ stream.next_in = Z_NULL;
+
+ if (inflateInit(&stream) != Z_OK)
+ return NULL;
+
+ /*
+ * Get header first, it's not compressed
+ */
+ nr_samples = le32_to_cpu(pdu->nr_samples);
+
+ total = nr_samples * sizeof(struct io_sample);
+ ret = malloc(total + sizeof(*pdu));
+ ret->nr_samples = nr_samples;
+ ret->log_type = le32_to_cpu(pdu->log_type);
+ strcpy((char *) ret->name, (char *) pdu->name);
+
+ p = (void *) ret + sizeof(*pdu);
+
+ stream.avail_in = cmd->pdu_len - sizeof(*pdu);
+ stream.next_in = (void *) pdu + sizeof(*pdu);
+ while (stream.avail_in) {
+ unsigned int this_chunk = 65536;
+ unsigned int this_len;
+ int err;
+
+ if (this_chunk > total)
+ this_chunk = total;
+
+ stream.avail_out = this_chunk;
+ stream.next_out = p;
+ err = inflate(&stream, Z_NO_FLUSH);
+ /* may be Z_OK, or Z_STREAM_END */
+ if (err < 0) {
+ log_err("fio: inflate error %d\n", err);
+ free(ret);
+ ret = NULL;
+ goto out;
+ }
+
+ this_len = this_chunk - stream.avail_out;
+ p += this_len;
+ total -= this_len;
+ }
+
+ for (i = 0; i < ret->nr_samples; i++) {
+ struct io_sample *s = &ret->samples[i];
+
+ s->time = le64_to_cpu(s->time);
+ s->val = le64_to_cpu(s->val);
+ s->ddir = le32_to_cpu(s->ddir);
+ s->bs = le32_to_cpu(s->bs);
+ }
+
+out:
+ inflateEnd(&stream);
+ return ret;
+}
+
int fio_handle_client(struct fio_client *client)
{
struct client_ops *ops = client->ops;
switch (cmd->opcode) {
case FIO_NET_CMD_QUIT:
if (ops->quit)
- ops->quit(client);
+ ops->quit(client, cmd);
remove_client(client);
free(cmd);
break;
case FIO_NET_CMD_TEXT:
convert_text(cmd);
- ops->text_op(client, cmd);
+ ops->text(client, cmd);
free(cmd);
break;
case FIO_NET_CMD_DU: {
break;
case FIO_NET_CMD_SERVER_START:
client->state = Client_running;
+ if (ops->job_start)
+ ops->job_start(client, cmd);
free(cmd);
break;
- case FIO_NET_CMD_START:
- handle_start(client, cmd);
+ case FIO_NET_CMD_START: {
+ struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload;
+
+ pdu->jobs = le32_to_cpu(pdu->jobs);
+ ops->start(client, cmd);
free(cmd);
break;
+ }
case FIO_NET_CMD_STOP: {
struct cmd_end_pdu *pdu = (struct cmd_end_pdu *) cmd->payload;
ops->add_job(client, cmd);
free(cmd);
break;
+ case FIO_NET_CMD_IOLOG:
+ if (ops->iolog) {
+ struct cmd_iolog_pdu *pdu;
+
+ pdu = convert_iolog(cmd);
+ ops->iolog(client, pdu);
+ }
+ free(cmd);
+ break;
default:
log_err("fio: unknown client op: %s\n", fio_server_op(cmd->opcode));
free(cmd);
struct timeval tv;
gettimeofday(&tv, NULL);
- if (mtime_since(&eta_tv, &tv) >= 900) {
+ if (mtime_since(&eta_tv, &tv) >= ops->eta_msec) {
request_client_etas(ops);
memcpy(&eta_tv, &tv, sizeof(tv));
retval = 1;
} else if (client->error)
retval = 1;
- put_client(client);
+ fio_put_client(client);
}
}