SK_F_COPY = 2,
SK_F_SIMPLE = 4,
SK_F_VEC = 8,
+ SK_F_INLINE = 16,
};
struct sk_entry {
int opcode; /* Actual command fields */
void *buf;
off_t size;
- uint64_t *tagptr;
+ uint64_t tag;
struct flist_head next; /* Other sk_entry's, if linked command */
};
struct sk_out {
- unsigned int refs;
-
- int sk;
- struct fio_mutex *lock;
- struct flist_head list;
- struct fio_mutex *wait;
+ unsigned int refs; /* frees sk_out when it drops to zero.
+ * protected by below ->lock */
+
+ int sk; /* socket fd to talk to client */
+ struct fio_mutex lock; /* protects ref and below list */
+ struct flist_head list; /* list of pending transmit work */
+ struct fio_mutex wait; /* wake backend when items added to list */
+ struct fio_mutex xmit; /* held while sending data */
};
static char *fio_server_arg;
static void sk_lock(struct sk_out *sk_out)
{
- fio_mutex_down(sk_out->lock);
+ fio_mutex_down(&sk_out->lock);
}
static void sk_unlock(struct sk_out *sk_out)
{
- fio_mutex_up(sk_out->lock);
+ fio_mutex_up(&sk_out->lock);
}
void sk_out_assign(struct sk_out *sk_out)
static void sk_out_free(struct sk_out *sk_out)
{
- fio_mutex_remove(sk_out->lock);
- fio_mutex_remove(sk_out->wait);
+ __fio_mutex_remove(&sk_out->lock);
+ __fio_mutex_remove(&sk_out->wait);
+ __fio_mutex_remove(&sk_out->xmit);
sfree(sk_out);
}
pthread_setspecific(sk_out_key, NULL);
}
+static void __fio_init_net_cmd(struct fio_net_cmd *cmd, uint16_t opcode,
+ uint32_t pdu_len, uint64_t tag)
+{
+ memset(cmd, 0, sizeof(*cmd));
+
+ cmd->version = __cpu_to_le16(FIO_SERVER_VER);
+ cmd->opcode = cpu_to_le16(opcode);
+ cmd->tag = cpu_to_le64(tag);
+ cmd->pdu_len = cpu_to_le32(pdu_len);
+}
+
+
+static void fio_init_net_cmd(struct fio_net_cmd *cmd, uint16_t opcode,
+ const void *pdu, uint32_t pdu_len, uint64_t tag)
+{
+ __fio_init_net_cmd(cmd, opcode, pdu_len, tag);
+
+ if (pdu)
+ memcpy(&cmd->payload, pdu, pdu_len);
+}
+
const char *fio_server_op(unsigned int op)
{
static char buf[32];
return 1;
}
-int fio_send_data(int sk, const void *p, unsigned int len)
+static int fio_send_data(int sk, const void *p, unsigned int len)
{
struct iovec iov = { .iov_base = (void *) p, .iov_len = len };
return fio_sendv_data(sk, &iov, 1);
}
-int fio_recv_data(int sk, void *p, unsigned int len)
+static int fio_recv_data(int sk, void *p, unsigned int len)
{
do {
int ret = recv(sk, p, len, MSG_WAITALL);
free(reply);
}
-void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, const void *pdu)
+static void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, const void *pdu)
{
uint32_t pdu_len;
cmd->pdu_crc16 = __cpu_to_le16(fio_crc16(pdu, pdu_len));
}
-void fio_net_cmd_crc(struct fio_net_cmd *cmd)
+static void fio_net_cmd_crc(struct fio_net_cmd *cmd)
{
fio_net_cmd_crc_pdu(cmd, cmd->payload);
}
return ret;
}
-struct sk_entry *fio_net_prep_cmd(uint16_t opcode, void *buf, off_t size,
- uint64_t *tagptr, int flags)
+static struct sk_entry *fio_net_prep_cmd(uint16_t opcode, void *buf,
+ size_t size, uint64_t *tagptr,
+ int flags)
{
struct sk_entry *entry;
memcpy(entry->buf, buf, size);
} else
entry->buf = buf;
+
entry->size = size;
- entry->tagptr = tagptr;
+ if (tagptr)
+ entry->tag = *tagptr;
+ else
+ entry->tag = 0;
entry->flags = flags;
-
return entry;
}
+static int handle_sk_entry(struct sk_out *sk_out, struct sk_entry *entry);
+
static void fio_net_queue_entry(struct sk_entry *entry)
{
struct sk_out *sk_out = pthread_getspecific(sk_out_key);
- sk_lock(sk_out);
- flist_add_tail(&entry->list, &sk_out->list);
- sk_unlock(sk_out);
+ if (entry->flags & SK_F_INLINE)
+ handle_sk_entry(sk_out, entry);
+ else {
+ sk_lock(sk_out);
+ flist_add_tail(&entry->list, &sk_out->list);
+ sk_unlock(sk_out);
- fio_mutex_up(sk_out->wait);
+ fio_mutex_up(&sk_out->wait);
+ }
}
static int fio_net_queue_cmd(uint16_t opcode, void *buf, off_t size,
sfree(entry);
}
-static void entry_set_flags_tag(struct sk_entry *entry, struct flist_head *list,
- unsigned int *flags, uint64_t *tag)
+static void entry_set_flags(struct sk_entry *entry, struct flist_head *list,
+ unsigned int *flags)
{
if (!flist_empty(list))
*flags = FIO_NET_CMD_F_MORE;
else
*flags = 0;
-
- if (entry->tagptr)
- *tag = *entry->tagptr;
- else
- *tag = 0;
}
static int send_vec_entry(struct sk_out *sk_out, struct sk_entry *first)
{
unsigned int flags;
- uint64_t tag;
int ret;
- entry_set_flags_tag(first, &first->next, &flags, &tag);
+ entry_set_flags(first, &first->next, &flags);
- ret = fio_send_cmd_ext_pdu(sk_out->sk, first->opcode, first->buf, first->size, tag, flags);
+ ret = fio_send_cmd_ext_pdu(sk_out->sk, first->opcode, first->buf,
+ first->size, first->tag, flags);
while (!flist_empty(&first->next)) {
struct sk_entry *next;
next = flist_first_entry(&first->next, struct sk_entry, list);
flist_del_init(&next->list);
- entry_set_flags_tag(next, &first->next, &flags, &tag);
+ entry_set_flags(next, &first->next, &flags);
- ret += fio_send_cmd_ext_pdu(sk_out->sk, next->opcode, next->buf, next->size, tag, flags);
+ ret += fio_send_cmd_ext_pdu(sk_out->sk, next->opcode, next->buf,
+ next->size, next->tag, flags);
finish_entry(next);
}
{
int ret;
+ fio_mutex_down(&sk_out->xmit);
+
if (entry->flags & SK_F_VEC)
ret = send_vec_entry(sk_out, entry);
- if (entry->flags & SK_F_SIMPLE) {
- uint64_t tag = 0;
-
- if (entry->tagptr)
- tag = *entry->tagptr;
+ else if (entry->flags & SK_F_SIMPLE) {
+ ret = fio_net_send_simple_cmd(sk_out->sk, entry->opcode,
+ entry->tag, NULL);
+ } else {
+ ret = fio_net_send_cmd(sk_out->sk, entry->opcode, entry->buf,
+ entry->size, &entry->tag, NULL);
+ }
- ret = fio_net_send_simple_cmd(sk_out->sk, entry->opcode, tag, NULL);
- } else
- ret = fio_net_send_cmd(sk_out->sk, entry->opcode, entry->buf, entry->size, entry->tagptr, NULL);
+ fio_mutex_up(&sk_out->xmit);
if (ret)
log_err("fio: failed handling cmd %s\n", fio_server_op(entry->opcode));
break;
} else if (!ret) {
fio_server_check_jobs(&job_list);
- fio_mutex_down_timeout(sk_out->wait, timeout);
+ fio_mutex_down_timeout(&sk_out->wait, timeout);
continue;
}
handle_xmits(sk_out);
close(sk_out->sk);
+ sk_out->sk = -1;
__sk_out_drop(sk_out);
_exit(ret);
}
/* get the address on this host bound by the input socket,
* whether it is ipv6 or ipv4 */
-int get_my_addr_str(int sk)
+static int get_my_addr_str(int sk)
{
struct sockaddr_in6 myaddr6 = { 0, };
struct sockaddr_in myaddr4 = { 0, };
return 0;
}
-static int accept_loop(struct sk_out *sk_out, int listen_sk)
+static int accept_loop(int listen_sk)
{
struct sockaddr_in addr;
struct sockaddr_in6 addr6;
fio_set_fd_nonblocking(listen_sk, "server");
while (!exit_backend) {
+ struct sk_out *sk_out;
const char *from;
char buf[64];
pid_t pid;
sk_out = smalloc(sizeof(*sk_out));
sk_out->sk = sk;
INIT_FLIST_HEAD(&sk_out->list);
- sk_out->lock = fio_mutex_init(FIO_MUTEX_UNLOCKED);
- sk_out->wait = fio_mutex_init(FIO_MUTEX_LOCKED);
+ __fio_mutex_init(&sk_out->lock, FIO_MUTEX_UNLOCKED);
+ __fio_mutex_init(&sk_out->wait, FIO_MUTEX_LOCKED);
+ __fio_mutex_init(&sk_out->xmit, FIO_MUTEX_UNLOCKED);
pid = fork();
if (pid) {
continue;
}
- /* exits */
- get_my_addr_str(sk); /* if error, it's already logged, non-fatal */
+ /* if error, it's already logged, non-fatal */
+ get_my_addr_str(sk);
+
+ /*
+ * Assign sk_out here, it'll be dropped in handle_connection()
+ * since that function calls _exit() when done
+ */
sk_out_assign(sk_out);
handle_connection(sk_out);
}
fio_net_queue_cmd(FIO_NET_CMD_GS, &gs, sizeof(gs), NULL, SK_F_COPY);
}
+void fio_server_send_job_options(struct flist_head *opt_list,
+ unsigned int groupid)
+{
+ struct cmd_job_option pdu;
+ struct flist_head *entry;
+
+ if (flist_empty(opt_list))
+ return;
+
+ flist_for_each(entry, opt_list) {
+ struct print_option *p;
+ size_t len;
+
+ p = flist_entry(entry, struct print_option, list);
+ memset(&pdu, 0, sizeof(pdu));
+
+ if (groupid == -1U) {
+ pdu.global = __cpu_to_le16(1);
+ pdu.groupid = 0;
+ } else {
+ pdu.global = 0;
+ pdu.groupid = cpu_to_le32(groupid);
+ }
+ len = strlen(p->name);
+ if (len >= sizeof(pdu.name)) {
+ len = sizeof(pdu.name) - 1;
+ pdu.truncated = __cpu_to_le16(1);
+ }
+ memcpy(pdu.name, p->name, len);
+ if (p->value) {
+ len = strlen(p->value);
+ if (len >= sizeof(pdu.value)) {
+ len = sizeof(pdu.value) - 1;
+ pdu.truncated = __cpu_to_le16(1);
+ }
+ memcpy(pdu.value, p->value, len);
+ }
+ fio_net_queue_cmd(FIO_NET_CMD_JOB_OPT, &pdu, sizeof(pdu), NULL, SK_F_COPY);
+ }
+}
+
static void convert_agg(struct disk_util_agg *dst, struct disk_util_agg *src)
{
int i;
}
}
-static int fio_send_iolog_gz(struct sk_entry *first, struct io_log *log)
+static int fio_append_iolog_gz(struct sk_entry *first, struct io_log *log)
{
int ret = 0;
#ifdef CONFIG_ZLIB
this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out;
entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, out_pdu, this_len,
- NULL, SK_F_FREE | SK_F_VEC);
+ NULL, SK_F_VEC | SK_F_INLINE | SK_F_FREE);
+ out_pdu = NULL;
flist_add_tail(&entry->list, &first->next);
} while (stream.avail_in);
return ret;
}
+static int fio_append_gz_chunks(struct sk_entry *first, struct io_log *log)
+{
+ struct sk_entry *entry;
+ struct flist_head *node;
+
+ pthread_mutex_lock(&log->chunk_lock);
+ flist_for_each(node, &log->chunk_list) {
+ struct iolog_compress *c;
+
+ c = flist_entry(node, struct iolog_compress, list);
+ entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, c->buf, c->len,
+ NULL, SK_F_VEC | SK_F_INLINE);
+ flist_add_tail(&entry->list, &first->next);
+ }
+ pthread_mutex_unlock(&log->chunk_lock);
+
+ return 0;
+}
+
+static int fio_append_text_log(struct sk_entry *first, struct io_log *log)
+{
+ struct sk_entry *entry;
+ size_t size = log->nr_samples * log_entry_sz(log);
+
+ entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, log->log, size,
+ NULL, SK_F_VEC | SK_F_INLINE);
+ flist_add_tail(&entry->list, &first->next);
+ return 0;
+}
+
int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
{
struct cmd_iolog_pdu pdu;
pdu.nr_samples = cpu_to_le64(log->nr_samples);
pdu.thread_number = cpu_to_le32(td->thread_number);
pdu.log_type = cpu_to_le32(log->log_type);
- pdu.compressed = cpu_to_le32(use_zlib);
+
+ if (!flist_empty(&log->chunk_list))
+ pdu.compressed = __cpu_to_le32(STORE_COMPRESSED);
+ else if (use_zlib)
+ pdu.compressed = __cpu_to_le32(XMIT_COMPRESSED);
+ else
+ pdu.compressed = 0;
strncpy((char *) pdu.name, name, FIO_NET_NAME_MAX);
pdu.name[FIO_NET_NAME_MAX - 1] = '\0';
+ /*
+ * We can't do this for a pre-compressed log, but for that case,
+ * log->nr_samples is zero anyway.
+ */
for (i = 0; i < log->nr_samples; i++) {
struct io_sample *s = get_sample(log, i);
/*
* Assemble header entry first
*/
- first = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, &pdu, sizeof(pdu), NULL, SK_F_COPY | SK_F_VEC);
+ first = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, &pdu, sizeof(pdu), NULL, SK_F_VEC | SK_F_INLINE | SK_F_COPY);
/*
- * Now append actual log entries. Compress if we can, otherwise just
- * plain text output.
+ * Now append actual log entries. If log compression was enabled on
+ * the job, just send out the compressed chunks directly. If we
+ * have a plain log, compress if we can, then send. Otherwise, send
+ * the plain text output.
*/
- if (use_zlib)
- ret = fio_send_iolog_gz(first, log);
- else {
- struct sk_entry *entry;
-
- entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, log->log,
- log->nr_samples * log_entry_sz(log),
- NULL, SK_F_FREE | SK_F_VEC);
- flist_add_tail(&entry->list, &first->next);
- }
+ if (!flist_empty(&log->chunk_list))
+ ret = fio_append_gz_chunks(first, log);
+ else if (use_zlib)
+ ret = fio_append_iolog_gz(first, log);
+ else
+ ret = fio_append_text_log(first, log);
+ fio_net_queue_entry(first);
return ret;
}
pdu.groupid = cpu_to_le32(td->groupid);
convert_thread_options_to_net(&pdu.top, &td->o);
- fio_net_queue_cmd(FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), NULL, SK_F_COPY);
+ fio_net_queue_cmd(FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), NULL,
+ SK_F_COPY);
}
void fio_server_send_start(struct thread_data *td)
verify_state_gen_name((char *) out.path, sizeof(out.path), name, me,
threadnumber);
tag = (uint64_t) (uintptr_t) rep;
- fio_net_queue_cmd(FIO_NET_CMD_SENDFILE, &out, sizeof(out), &tag, SK_F_COPY);
+ fio_net_queue_cmd(FIO_NET_CMD_SENDFILE, &out, sizeof(out), &tag,
+ SK_F_COPY);
/*
* Wait for the backend to receive the reply
}
if (rep->error) {
- log_err("fio: failure on receiving state file: %s\n", strerror(rep->error));
+ log_err("fio: failure on receiving state file: %s\n",
+ strerror(rep->error));
fail:
*datap = NULL;
sfree(rep);
log_info("fio: server listening on %s\n", bind_str);
- if (listen(sk, 0) < 0) {
+ if (listen(sk, 4) < 0) {
log_err("fio: listen: %s\n", strerror(errno));
close(sk);
return -1;
static int fio_server(void)
{
- struct sk_out *sk_out;
int sk, ret;
+ if (pthread_key_create(&sk_out_key, NULL)) {
+ log_err("fio: can't create sk_out backend key\n");
+ return -1;
+ }
+
+ pthread_setspecific(sk_out_key, NULL);
+
dprint(FD_NET, "starting server\n");
if (fio_handle_server_arg())
set_sig_handlers();
- if (pthread_key_create(&sk_out_key, NULL))
- log_err("fio: can't create sk_out backend key\n");
-
- ret = accept_loop(sk_out, sk);
+ ret = accept_loop(sk);
close(sk);