+ memset(&probe, 0, sizeof(probe));
+ gethostname((char *) probe.hostname, sizeof(probe.hostname));
+#ifdef CONFIG_BIG_ENDIAN
+ probe.bigendian = 1;
+#endif
+ strncpy((char *) probe.fio_version, fio_version_string, sizeof(probe.fio_version));
+
+ probe.os = FIO_OS;
+ probe.arch = FIO_ARCH;
+ probe.bpp = sizeof(void *);
+ probe.cpus = __cpu_to_le32(cpus_online());
+
+ /*
+ * If the client supports compression and we do too, then enable it
+ */
+ if (has_zlib && le64_to_cpu(pdu->flags) & FIO_PROBE_FLAG_ZLIB) {
+ probe.flags = __cpu_to_le64(FIO_PROBE_FLAG_ZLIB);
+ use_zlib = 1;
+ } else {
+ probe.flags = 0;
+ use_zlib = 0;
+ }
+
+ return fio_net_send_cmd(server_fd, FIO_NET_CMD_PROBE, &probe, sizeof(probe), &tag, NULL);
+}
+
+static int handle_send_eta_cmd(struct fio_net_cmd *cmd)
+{
+ struct jobs_eta *je;
+ uint64_t tag = cmd->tag;
+ size_t size;
+ int i;
+
+ je = get_jobs_eta(1, &size);
+ if (!je)
+ return 0;
+
+ dprint(FD_NET, "server sending status\n");
+
+ je->nr_running = cpu_to_le32(je->nr_running);
+ je->nr_ramp = cpu_to_le32(je->nr_ramp);
+ je->nr_pending = cpu_to_le32(je->nr_pending);
+ je->nr_setting_up = cpu_to_le32(je->nr_setting_up);
+ je->files_open = cpu_to_le32(je->files_open);
+
+ for (i = 0; i < DDIR_RWDIR_CNT; i++) {
+ je->m_rate[i] = cpu_to_le32(je->m_rate[i]);
+ je->t_rate[i] = cpu_to_le32(je->t_rate[i]);
+ je->m_iops[i] = cpu_to_le32(je->m_iops[i]);
+ je->t_iops[i] = cpu_to_le32(je->t_iops[i]);
+ je->rate[i] = cpu_to_le32(je->rate[i]);
+ je->iops[i] = cpu_to_le32(je->iops[i]);
+ }
+
+ je->elapsed_sec = cpu_to_le64(je->elapsed_sec);
+ je->eta_sec = cpu_to_le64(je->eta_sec);
+ je->nr_threads = cpu_to_le32(je->nr_threads);
+ je->is_pow2 = cpu_to_le32(je->is_pow2);
+ je->unit_base = cpu_to_le32(je->unit_base);
+
+ fio_net_send_cmd(server_fd, FIO_NET_CMD_ETA, je, size, &tag, NULL);
+ free(je);
+ return 0;
+}
+
+static int send_update_job_reply(int fd, uint64_t __tag, int error)
+{
+ uint64_t tag = __tag;
+ uint32_t pdu_error;
+
+ pdu_error = __cpu_to_le32(error);
+ return fio_net_send_cmd(fd, FIO_NET_CMD_UPDATE_JOB, &pdu_error, sizeof(pdu_error), &tag, NULL);
+}
+
+static int handle_update_job_cmd(struct fio_net_cmd *cmd)
+{
+ struct cmd_add_job_pdu *pdu = (struct cmd_add_job_pdu *) cmd->payload;
+ struct thread_data *td;
+ uint32_t tnumber;
+
+ tnumber = le32_to_cpu(pdu->thread_number);
+
+ dprint(FD_NET, "server: updating options for job %u\n", tnumber);
+
+ if (!tnumber || tnumber > thread_number) {
+ send_update_job_reply(server_fd, cmd->tag, ENODEV);
+ return 0;
+ }
+
+ td = &threads[tnumber - 1];
+ convert_thread_options_to_cpu(&td->o, &pdu->top);
+ send_update_job_reply(server_fd, cmd->tag, 0);
+ return 0;
+}
+
+static int handle_trigger_cmd(struct fio_net_cmd *cmd)
+{
+ struct cmd_vtrigger_pdu *pdu = (struct cmd_vtrigger_pdu *) cmd->payload;
+ char *buf = (char *) pdu->cmd;
+ struct all_io_list *rep;
+ size_t sz;
+
+ pdu->len = le16_to_cpu(pdu->len);
+ buf[pdu->len] = '\0';
+
+ rep = get_all_io_list(IO_LIST_ALL, &sz);
+ if (!rep) {
+ struct all_io_list state;
+
+ state.threads = cpu_to_le64((uint64_t) 0);
+ fio_net_send_cmd(server_fd, FIO_NET_CMD_VTRIGGER, &state, sizeof(state), NULL, NULL);
+ } else {
+ fio_net_send_cmd(server_fd, FIO_NET_CMD_VTRIGGER, rep, sz, NULL, NULL);
+ free(rep);
+ }
+
+ exec_trigger(buf);
+ return 0;
+}
+
+static int handle_command(struct flist_head *job_list, struct fio_net_cmd *cmd)
+{
+ int ret;
+
+ dprint(FD_NET, "server: got op [%s], pdu=%u, tag=%llx\n",
+ fio_server_op(cmd->opcode), cmd->pdu_len,
+ (unsigned long long) cmd->tag);
+
+ switch (cmd->opcode) {
+ case FIO_NET_CMD_QUIT:
+ fio_terminate_threads(TERMINATE_ALL);
+ return -1;
+ case FIO_NET_CMD_EXIT:
+ exit_backend = 1;
+ return -1;
+ case FIO_NET_CMD_LOAD_FILE:
+ ret = handle_load_file_cmd(cmd);
+ break;
+ case FIO_NET_CMD_JOB:
+ ret = handle_job_cmd(cmd);
+ break;
+ case FIO_NET_CMD_JOBLINE:
+ ret = handle_jobline_cmd(cmd);
+ break;
+ case FIO_NET_CMD_PROBE:
+ ret = handle_probe_cmd(cmd);
+ break;
+ case FIO_NET_CMD_SEND_ETA:
+ ret = handle_send_eta_cmd(cmd);
+ break;
+ case FIO_NET_CMD_RUN:
+ ret = handle_run_cmd(job_list, cmd);
+ break;
+ case FIO_NET_CMD_UPDATE_JOB:
+ ret = handle_update_job_cmd(cmd);
+ break;
+ case FIO_NET_CMD_VTRIGGER:
+ ret = handle_trigger_cmd(cmd);
+ break;
+ case FIO_NET_CMD_SENDFILE: {
+ struct cmd_sendfile_reply *in;
+ struct cmd_reply *rep;
+
+ rep = (struct cmd_reply *) (uintptr_t) cmd->tag;
+
+ in = (struct cmd_sendfile_reply *) cmd->payload;
+ in->size = le32_to_cpu(in->size);
+ in->error = le32_to_cpu(in->error);
+ if (in->error) {
+ ret = 1;
+ rep->error = in->error;
+ } else {
+ ret = 0;
+ rep->data = smalloc(in->size);
+ if (!rep->data) {
+ ret = 1;
+ rep->error = ENOMEM;
+ } else {
+ rep->size = in->size;
+ memcpy(rep->data, in->data, in->size);
+ }
+ }
+ fio_mutex_up(&rep->lock);
+ break;
+ }
+ default:
+ log_err("fio: unknown opcode: %s\n", fio_server_op(cmd->opcode));
+ ret = 1;
+ }
+
+ return ret;
+}
+
+static int handle_connection(int sk)
+{
+ struct fio_net_cmd *cmd = NULL;
+ FLIST_HEAD(job_list);
+ int ret = 0;
+
+ reset_fio_state();
+ server_fd = sk;
+
+ /* read forever */
+ while (!exit_backend) {
+ struct pollfd pfd = {
+ .fd = sk,
+ .events = POLLIN,
+ };
+
+ ret = 0;
+ do {
+ int timeout = 1000;
+
+ if (!flist_empty(&job_list))
+ timeout = 100;
+
+ ret = poll(&pfd, 1, timeout);
+ if (ret < 0) {
+ if (errno == EINTR)
+ break;
+ log_err("fio: poll: %s\n", strerror(errno));
+ break;
+ } else if (!ret) {
+ fio_server_check_jobs(&job_list);
+ continue;
+ }
+
+ if (pfd.revents & POLLIN)
+ break;
+ if (pfd.revents & (POLLERR|POLLHUP)) {
+ ret = 1;
+ break;
+ }
+ } while (!exit_backend);
+
+ fio_server_check_jobs(&job_list);
+
+ if (ret < 0)
+ break;
+
+ cmd = fio_net_recv_cmd(sk);
+ if (!cmd) {
+ ret = -1;
+ break;
+ }
+
+ ret = handle_command(&job_list, cmd);
+ if (ret)
+ break;
+
+ free(cmd);
+ cmd = NULL;
+ }
+
+ if (cmd)
+ free(cmd);
+
+ close(sk);
+ _exit(ret);
+}
+
+/* get the address on this host bound by the input socket,
+ * whether it is ipv6 or ipv4 */
+
+int get_my_addr_str( int sk )
+{
+ int ret;
+ struct sockaddr * sockaddr_p;
+ struct sockaddr_in myaddr4 = {0};
+ struct sockaddr_in6 myaddr6 = {0};
+ char * net_addr;
+ socklen_t len = use_ipv6 ? sizeof(myaddr6) : sizeof(myaddr4);
+
+ if (use_ipv6)
+ sockaddr_p = (struct sockaddr * )&myaddr6;
+ else
+ sockaddr_p = (struct sockaddr * )&myaddr4;
+ ret = getsockname(sk, sockaddr_p, &len);
+ if (ret) {
+ log_err("fio: getsockaddr: %s\n", strerror(errno));
+ return -1;
+ }
+ if (use_ipv6)
+ net_addr = (char * )&myaddr6.sin6_addr;
+ else
+ net_addr = (char * )&myaddr4.sin_addr;
+ if (NULL == inet_ntop(use_ipv6?AF_INET6:AF_INET, net_addr, client_sockaddr_str, INET6_ADDRSTRLEN-1)) {
+ log_err("inet_ntop: failed to convert addr to string\n");
+ return -1;
+ }
+ dprint(FD_NET, "fio server bound to addr %s\n", client_sockaddr_str);
+ return 0;
+}
+
+static int accept_loop(int listen_sk)
+{
+ struct sockaddr_in addr;
+ struct sockaddr_in6 addr6;
+ socklen_t len = use_ipv6 ? sizeof(addr6) : sizeof(addr);
+ struct pollfd pfd;
+ int ret = 0, sk, exitval = 0;
+ FLIST_HEAD(conn_list);
+
+ dprint(FD_NET, "server enter accept loop\n");
+
+ fio_set_fd_nonblocking(listen_sk, "server");
+
+ while (!exit_backend) {
+ const char *from;
+ char buf[64];
+ pid_t pid;
+
+ pfd.fd = listen_sk;
+ pfd.events = POLLIN;
+ do {
+ int timeout = 1000;
+
+ if (!flist_empty(&conn_list))
+ timeout = 100;
+
+ ret = poll(&pfd, 1, timeout);
+ if (ret < 0) {
+ if (errno == EINTR)
+ break;
+ log_err("fio: poll: %s\n", strerror(errno));
+ break;
+ } else if (!ret) {
+ fio_server_check_conns(&conn_list);
+ continue;
+ }
+
+ if (pfd.revents & POLLIN)
+ break;
+ } while (!exit_backend);
+
+ fio_server_check_conns(&conn_list);
+
+ if (exit_backend || ret < 0)
+ break;
+
+ if (use_ipv6)
+ sk = accept(listen_sk, (struct sockaddr *) &addr6, &len);
+ else
+ sk = accept(listen_sk, (struct sockaddr *) &addr, &len);
+
+ if (sk < 0) {
+ log_err("fio: accept: %s\n", strerror(errno));
+ return -1;
+ }
+
+ if (use_ipv6)
+ from = inet_ntop(AF_INET6, (struct sockaddr *) &addr6.sin6_addr, buf, sizeof(buf));
+ else
+ from = inet_ntop(AF_INET, (struct sockaddr *) &addr.sin_addr, buf, sizeof(buf));
+
+ dprint(FD_NET, "server: connect from %s\n", from);
+
+ pid = fork();
+ if (pid) {
+ close(sk);
+ fio_server_add_conn_pid(&conn_list, pid);
+ continue;
+ }
+
+ /* exits */
+ get_my_addr_str(sk); /* if error, it's already logged, non-fatal */
+ handle_connection(sk);
+ }
+
+ return exitval;
+}
+
+int fio_server_text_output(int level, const char *buf, size_t len)
+{
+ struct cmd_text_pdu *pdu;
+ unsigned int tlen;
+ struct timeval tv;
+
+ if (server_fd == -1)
+ return log_local_buf(buf, len);
+
+ tlen = sizeof(*pdu) + len;
+ pdu = malloc(tlen);
+
+ pdu->level = __cpu_to_le32(level);
+ pdu->buf_len = __cpu_to_le32(len);
+
+ gettimeofday(&tv, NULL);
+ pdu->log_sec = __cpu_to_le64(tv.tv_sec);
+ pdu->log_usec = __cpu_to_le64(tv.tv_usec);
+
+ memcpy(pdu->buf, buf, len);
+
+ fio_net_send_cmd(server_fd, FIO_NET_CMD_TEXT, pdu, tlen, NULL, NULL);
+ free(pdu);
+ return len;
+}
+
+static void convert_io_stat(struct io_stat *dst, struct io_stat *src)
+{
+ dst->max_val = cpu_to_le64(src->max_val);
+ dst->min_val = cpu_to_le64(src->min_val);
+ dst->samples = cpu_to_le64(src->samples);
+
+ /*
+ * Encode to IEEE 754 for network transfer
+ */
+ dst->mean.u.i = cpu_to_le64(fio_double_to_uint64(src->mean.u.f));
+ dst->S.u.i = cpu_to_le64(fio_double_to_uint64(src->S.u.f));
+}
+
+static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src)
+{
+ int i;
+
+ for (i = 0; i < DDIR_RWDIR_CNT; i++) {
+ dst->max_run[i] = cpu_to_le64(src->max_run[i]);
+ dst->min_run[i] = cpu_to_le64(src->min_run[i]);
+ dst->max_bw[i] = cpu_to_le64(src->max_bw[i]);
+ dst->min_bw[i] = cpu_to_le64(src->min_bw[i]);
+ dst->io_kb[i] = cpu_to_le64(src->io_kb[i]);
+ dst->agg[i] = cpu_to_le64(src->agg[i]);
+ }
+
+ dst->kb_base = cpu_to_le32(src->kb_base);
+ dst->unit_base = cpu_to_le32(src->unit_base);
+ dst->groupid = cpu_to_le32(src->groupid);
+ dst->unified_rw_rep = cpu_to_le32(src->unified_rw_rep);
+}
+
+/*
+ * Send a CMD_TS, which packs struct thread_stat and group_run_stats
+ * into a single payload.
+ */
+void fio_server_send_ts(struct thread_stat *ts, struct group_run_stats *rs)
+{
+ struct cmd_ts_pdu p;
+ int i, j;
+
+ dprint(FD_NET, "server sending end stats\n");
+
+ memset(&p, 0, sizeof(p));
+
+ strncpy(p.ts.name, ts->name, FIO_JOBNAME_SIZE - 1);
+ strncpy(p.ts.verror, ts->verror, FIO_VERROR_SIZE - 1);
+ strncpy(p.ts.description, ts->description, FIO_JOBDESC_SIZE - 1);
+
+ p.ts.error = cpu_to_le32(ts->error);
+ p.ts.thread_number = cpu_to_le32(ts->thread_number);
+ p.ts.groupid = cpu_to_le32(ts->groupid);
+ p.ts.pid = cpu_to_le32(ts->pid);
+ p.ts.members = cpu_to_le32(ts->members);
+ p.ts.unified_rw_rep = cpu_to_le32(ts->unified_rw_rep);
+
+ for (i = 0; i < DDIR_RWDIR_CNT; i++) {