+ dprint(FD_NET, "server: got op [%s], pdu=%u, tag=%llx\n",
+ fio_server_op(cmd->opcode), cmd->pdu_len,
+ (unsigned long long) cmd->tag);
+
+ switch (cmd->opcode) {
+ case FIO_NET_CMD_QUIT:
+ fio_terminate_threads(TERMINATE_ALL);
+ ret = 0;
+ break;
+ case FIO_NET_CMD_EXIT:
+ exit_backend = 1;
+ return -1;
+ case FIO_NET_CMD_LOAD_FILE:
+ ret = handle_load_file_cmd(cmd);
+ break;
+ case FIO_NET_CMD_JOB:
+ ret = handle_job_cmd(cmd);
+ break;
+ case FIO_NET_CMD_JOBLINE:
+ ret = handle_jobline_cmd(cmd);
+ break;
+ case FIO_NET_CMD_PROBE:
+ ret = handle_probe_cmd(cmd);
+ break;
+ case FIO_NET_CMD_SEND_ETA:
+ ret = handle_send_eta_cmd(cmd);
+ break;
+ case FIO_NET_CMD_RUN:
+ ret = handle_run_cmd(sk_out, job_list, cmd);
+ break;
+ case FIO_NET_CMD_UPDATE_JOB:
+ ret = handle_update_job_cmd(cmd);
+ break;
+ case FIO_NET_CMD_VTRIGGER:
+ ret = handle_trigger_cmd(cmd);
+ break;
+ case FIO_NET_CMD_SENDFILE: {
+ struct cmd_sendfile_reply *in;
+ struct cmd_reply *rep;
+
+ rep = (struct cmd_reply *) (uintptr_t) cmd->tag;
+
+ in = (struct cmd_sendfile_reply *) cmd->payload;
+ in->size = le32_to_cpu(in->size);
+ in->error = le32_to_cpu(in->error);
+ if (in->error) {
+ ret = 1;
+ rep->error = in->error;
+ } else {
+ ret = 0;
+ rep->data = smalloc(in->size);
+ if (!rep->data) {
+ ret = 1;
+ rep->error = ENOMEM;
+ } else {
+ rep->size = in->size;
+ memcpy(rep->data, in->data, in->size);
+ }
+ }
+ fio_mutex_up(&rep->lock);
+ break;
+ }
+ default:
+ log_err("fio: unknown opcode: %s\n", fio_server_op(cmd->opcode));
+ ret = 1;
+ }
+
+ return ret;
+}
+
+/*
+ * Send a command with a separate PDU, not inlined in the command
+ */
+static int fio_send_cmd_ext_pdu(int sk, uint16_t opcode, const void *buf,
+ off_t size, uint64_t tag, uint32_t flags)
+{
+ struct fio_net_cmd cmd;
+ struct iovec iov[2];
+
+ iov[0].iov_base = (void *) &cmd;
+ iov[0].iov_len = sizeof(cmd);
+ iov[1].iov_base = (void *) buf;
+ iov[1].iov_len = size;
+
+ __fio_init_net_cmd(&cmd, opcode, size, tag);
+ cmd.flags = __cpu_to_le32(flags);
+ fio_net_cmd_crc_pdu(&cmd, buf);
+
+ return fio_sendv_data(sk, iov, 2);
+}
+
+static void finish_entry(struct sk_entry *entry)
+{
+ if (entry->flags & SK_F_FREE)
+ free(entry->buf);
+ else if (entry->flags & SK_F_COPY)
+ sfree(entry->buf);
+
+ sfree(entry);
+}
+
+static void entry_set_flags(struct sk_entry *entry, struct flist_head *list,
+ unsigned int *flags)
+{
+ if (!flist_empty(list))
+ *flags = FIO_NET_CMD_F_MORE;
+ else
+ *flags = 0;
+}
+
+static int send_vec_entry(struct sk_out *sk_out, struct sk_entry *first)
+{
+ unsigned int flags;
+ int ret;
+
+ entry_set_flags(first, &first->next, &flags);
+
+ ret = fio_send_cmd_ext_pdu(sk_out->sk, first->opcode, first->buf,
+ first->size, first->tag, flags);
+
+ while (!flist_empty(&first->next)) {
+ struct sk_entry *next;
+
+ next = flist_first_entry(&first->next, struct sk_entry, list);
+ flist_del_init(&next->list);
+
+ entry_set_flags(next, &first->next, &flags);
+
+ ret += fio_send_cmd_ext_pdu(sk_out->sk, next->opcode, next->buf,
+ next->size, next->tag, flags);
+ finish_entry(next);
+ }
+
+ return ret;
+}
+
+static int handle_sk_entry(struct sk_out *sk_out, struct sk_entry *entry)
+{
+ int ret;
+
+ fio_mutex_down(&sk_out->xmit);
+
+ if (entry->flags & SK_F_VEC)
+ ret = send_vec_entry(sk_out, entry);
+ else if (entry->flags & SK_F_SIMPLE) {
+ ret = fio_net_send_simple_cmd(sk_out->sk, entry->opcode,
+ entry->tag, NULL);
+ } else {
+ ret = fio_net_send_cmd(sk_out->sk, entry->opcode, entry->buf,
+ entry->size, &entry->tag, NULL);
+ }
+
+ fio_mutex_up(&sk_out->xmit);
+
+ if (ret)
+ log_err("fio: failed handling cmd %s\n", fio_server_op(entry->opcode));
+
+ finish_entry(entry);
+ return ret;
+}
+
+static int handle_xmits(struct sk_out *sk_out)
+{
+ struct sk_entry *entry;
+ FLIST_HEAD(list);
+ int ret = 0;
+
+ sk_lock(sk_out);
+ if (flist_empty(&sk_out->list)) {
+ sk_unlock(sk_out);
+ return 0;
+ }
+
+ flist_splice_init(&sk_out->list, &list);
+ sk_unlock(sk_out);