static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd);
static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd);
+static void convert_text(struct fio_net_cmd *cmd);
+
struct client_ops fio_client_ops = {
.text = handle_text,
.disk_util = handle_du,
#define FIO_CLIENT_HASH_MASK (FIO_CLIENT_HASH_SZ - 1)
static struct flist_head client_hash[FIO_CLIENT_HASH_SZ];
+static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *, bool *);
+
static void fio_client_add_hash(struct fio_client *client)
{
int bucket = hash_long(client->fd, FIO_CLIENT_HASH_BITS);
return 1;
}
+static void fio_drain_client_text(struct fio_client *client)
+{
+ do {
+ struct fio_net_cmd *cmd;
+
+ cmd = fio_net_recv_cmd(client->fd, false);
+ if (!cmd)
+ break;
+
+ if (cmd->opcode == FIO_NET_CMD_TEXT) {
+ convert_text(cmd);
+ client->ops->text(client, cmd);
+ }
+
+ free(cmd);
+ } while (1);
+}
+
static void remove_client(struct fio_client *client)
{
assert(client->refs);
dprint(FD_NET, "client: removed <%s>\n", client->hostname);
+ fio_drain_client_text(client);
+
if (!flist_empty(&client->list))
flist_del_init(&client->list);
struct cmd_job_option *pdu = (struct cmd_job_option *) cmd->payload;
struct print_option *p;
+ if (!job_opt_object)
+ return;
+
pdu->global = le16_to_cpu(pdu->global);
- pdu->groupid = le16_to_cpu(pdu->groupid);
+ pdu->truncated = le16_to_cpu(pdu->truncated);
+ pdu->groupid = le32_to_cpu(pdu->groupid);
p = malloc(sizeof(*p));
p->name = strdup((char *) pdu->name);
fio_client_dec_jobs_eta(eta, client->ops->eta);
}
+static int fio_client_handle_iolog(struct fio_client *client,
+ struct fio_net_cmd *cmd)
+{
+ struct cmd_iolog_pdu *pdu;
+ bool store_direct;
+
+ pdu = convert_iolog(cmd, &store_direct);
+ if (!pdu) {
+ log_err("fio: failed converting IO log\n");
+ return 1;
+ }
+
+ if (store_direct) {
+ ssize_t ret;
+ size_t sz;
+ int fd;
+
+ fd = open((const char *) pdu->name,
+ O_WRONLY | O_CREAT | O_TRUNC, 0644);
+ if (fd < 0) {
+ log_err("fio: open log: %s\n", strerror(errno));
+ return 1;
+ }
+
+ sz = cmd->pdu_len - sizeof(*pdu);
+ ret = write(fd, pdu->samples, sz);
+ close(fd);
+
+ if (ret != sz) {
+ log_err("fio: short write on compressed log\n");
+ return 1;
+ }
+
+ return 0;
+ } else {
+ FILE *f;
+
+ f = fopen((const char *) pdu->name, "w");
+ if (!f) {
+ log_err("fio: fopen log: %s\n", strerror(errno));
+ return 1;
+ }
+
+ flush_samples(f, pdu->samples,
+ pdu->nr_samples * sizeof(struct io_sample));
+ fclose(f);
+ return 0;
+ }
+}
+
static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd)
{
struct cmd_probe_reply_pdu *probe = (struct cmd_probe_reply_pdu *) cmd->payload;
* This has been compressed on the server side, since it can be big.
* Uncompress here.
*/
-static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *cmd)
+static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *cmd,
+ bool *store_direct)
{
struct cmd_iolog_pdu *pdu = (struct cmd_iolog_pdu *) cmd->payload;
struct cmd_iolog_pdu *ret;
uint64_t i;
+ int compressed;
void *samples;
+ *store_direct = false;
+
/*
* Convert if compressed and we support it. If it's not
* compressed, we need not do anything.
*/
- if (le32_to_cpu(pdu->compressed)) {
+ compressed = le32_to_cpu(pdu->compressed);
+ if (compressed == XMIT_COMPRESSED) {
#ifndef CONFIG_ZLIB
log_err("fio: server sent compressed data by mistake\n");
return NULL;
#endif
ret = convert_iolog_gz(cmd, pdu);
+ printf("compressed iolog, %p\n", ret);
if (!ret) {
log_err("fio: failed decompressing log\n");
return NULL;
}
+ } else if (compressed == STORE_COMPRESSED) {
+ *store_direct = true;
+ ret = pdu;
} else
ret = pdu;
ret->compressed = le32_to_cpu(ret->compressed);
ret->log_offset = le32_to_cpu(ret->log_offset);
+ if (*store_direct)
+ return ret;
+
samples = &ret->samples[0];
for (i = 0; i < ret->nr_samples; i++) {
struct io_sample *s;
fio_net_send_cmd(fd, FIO_NET_CMD_SENDFILE, rep, size, &tag, NULL);
}
-static int send_file(struct fio_client *client, struct cmd_sendfile *pdu,
- uint64_t tag)
+static int fio_send_file(struct fio_client *client, struct cmd_sendfile *pdu,
+ uint64_t tag)
{
struct cmd_sendfile_reply *rep;
struct stat sb;
dprint(FD_NET, "client: handle %s\n", client->hostname);
- cmd = fio_net_recv_cmd(client->fd);
+ cmd = fio_net_recv_cmd(client->fd, true);
if (!cmd)
return 0;
break;
}
case FIO_NET_CMD_IOLOG:
- if (ops->iolog) {
- struct cmd_iolog_pdu *pdu;
-
- pdu = convert_iolog(cmd);
- ops->iolog(client, pdu);
- }
+ fio_client_handle_iolog(client, cmd);
break;
case FIO_NET_CMD_UPDATE_JOB:
ops->update_job(client, cmd);
}
case FIO_NET_CMD_SENDFILE: {
struct cmd_sendfile *pdu = (struct cmd_sendfile *) cmd->payload;
- send_file(client, pdu, cmd->tag);
+ fio_send_file(client, pdu, cmd->tag);
break;
}
case FIO_NET_CMD_JOB_OPT: {
static int handle_cmd_timeout(struct fio_client *client,
struct fio_net_cmd_reply *reply)
{
+ flist_del(&reply->list);
+ free(reply);
+
if (reply->opcode != FIO_NET_CMD_SEND_ETA)
return 1;
log_info("client <%s>: timeout on SEND_ETA\n", client->hostname);
- flist_del(&reply->list);
- free(reply);
flist_del_init(&client->eta_list);
if (client->eta_in_flight) {
log_err("fio: client %s, timeout on cmd %s\n", client->hostname,
fio_server_op(reply->opcode));
- flist_del(&reply->list);
- free(reply);
ret = 1;
}