+ handle_xmits(sk_out);
+
+ close(sk_out->sk);
+ sk_out->sk = -1;
+ __sk_out_drop(sk_out);
+ _exit(ret);
+}
+
+/* get the address on this host bound by the input socket,
+ * whether it is ipv6 or ipv4 */
+
+static int get_my_addr_str(int sk)
+{
+ struct sockaddr_in6 myaddr6 = { 0, };
+ struct sockaddr_in myaddr4 = { 0, };
+ struct sockaddr *sockaddr_p;
+ char *net_addr;
+ socklen_t len;
+ int ret;
+
+ if (use_ipv6) {
+ len = sizeof(myaddr6);
+ sockaddr_p = (struct sockaddr * )&myaddr6;
+ net_addr = (char * )&myaddr6.sin6_addr;
+ } else {
+ len = sizeof(myaddr4);
+ sockaddr_p = (struct sockaddr * )&myaddr4;
+ net_addr = (char * )&myaddr4.sin_addr;
+ }
+
+ ret = getsockname(sk, sockaddr_p, &len);
+ if (ret) {
+ log_err("fio: getsockname: %s\n", strerror(errno));
+ return -1;
+ }
+
+ if (!inet_ntop(use_ipv6?AF_INET6:AF_INET, net_addr, client_sockaddr_str, INET6_ADDRSTRLEN - 1)) {
+ log_err("inet_ntop: failed to convert addr to string\n");
+ return -1;
+ }
+
+ dprint(FD_NET, "fio server bound to addr %s\n", client_sockaddr_str);
+ return 0;
+}
+
+static int accept_loop(int listen_sk)
+{
+ struct sockaddr_in addr;
+ struct sockaddr_in6 addr6;
+ socklen_t len = use_ipv6 ? sizeof(addr6) : sizeof(addr);
+ struct pollfd pfd;
+ int ret = 0, sk, exitval = 0;
+ FLIST_HEAD(conn_list);
+
+ dprint(FD_NET, "server enter accept loop\n");
+
+ fio_set_fd_nonblocking(listen_sk, "server");
+
+ while (!exit_backend) {
+ struct sk_out *sk_out;
+ const char *from;
+ char buf[64];
+ pid_t pid;
+
+ pfd.fd = listen_sk;
+ pfd.events = POLLIN;
+ do {
+ int timeout = 1000;
+
+ if (!flist_empty(&conn_list))
+ timeout = 100;
+
+ ret = poll(&pfd, 1, timeout);
+ if (ret < 0) {
+ if (errno == EINTR)
+ break;
+ log_err("fio: poll: %s\n", strerror(errno));
+ break;
+ } else if (!ret) {
+ fio_server_check_conns(&conn_list);
+ continue;
+ }
+
+ if (pfd.revents & POLLIN)
+ break;
+ } while (!exit_backend);
+
+ fio_server_check_conns(&conn_list);
+
+ if (exit_backend || ret < 0)
+ break;
+
+ if (use_ipv6)
+ sk = accept(listen_sk, (struct sockaddr *) &addr6, &len);
+ else
+ sk = accept(listen_sk, (struct sockaddr *) &addr, &len);
+
+ if (sk < 0) {
+ log_err("fio: accept: %s\n", strerror(errno));
+ return -1;
+ }
+
+ if (use_ipv6)
+ from = inet_ntop(AF_INET6, (struct sockaddr *) &addr6.sin6_addr, buf, sizeof(buf));
+ else
+ from = inet_ntop(AF_INET, (struct sockaddr *) &addr.sin_addr, buf, sizeof(buf));
+
+ dprint(FD_NET, "server: connect from %s\n", from);
+
+ sk_out = scalloc(1, sizeof(*sk_out));
+ if (!sk_out) {
+ close(sk);
+ return -1;
+ }
+
+ sk_out->sk = sk;
+ INIT_FLIST_HEAD(&sk_out->list);
+ __fio_sem_init(&sk_out->lock, FIO_SEM_UNLOCKED);
+ __fio_sem_init(&sk_out->wait, FIO_SEM_LOCKED);
+ __fio_sem_init(&sk_out->xmit, FIO_SEM_UNLOCKED);
+
+ pid = fork();
+ if (pid) {
+ close(sk);
+ fio_server_add_conn_pid(&conn_list, pid);
+ continue;
+ }
+
+ /* if error, it's already logged, non-fatal */
+ get_my_addr_str(sk);
+
+ /*
+ * Assign sk_out here, it'll be dropped in handle_connection()
+ * since that function calls _exit() when done
+ */
+ sk_out_assign(sk_out);
+ handle_connection(sk_out);
+ }
+
+ return exitval;
+}
+
+int fio_server_text_output(int level, const char *buf, size_t len)
+{
+ struct sk_out *sk_out = pthread_getspecific(sk_out_key);
+ struct cmd_text_pdu *pdu;
+ unsigned int tlen;
+ struct timeval tv;
+
+ if (!sk_out || sk_out->sk == -1)
+ return -1;
+
+ tlen = sizeof(*pdu) + len;
+ pdu = malloc(tlen);
+
+ pdu->level = __cpu_to_le32(level);
+ pdu->buf_len = __cpu_to_le32(len);
+
+ gettimeofday(&tv, NULL);
+ pdu->log_sec = __cpu_to_le64(tv.tv_sec);
+ pdu->log_usec = __cpu_to_le64(tv.tv_usec);
+
+ memcpy(pdu->buf, buf, len);
+
+ fio_net_queue_cmd(FIO_NET_CMD_TEXT, pdu, tlen, NULL, SK_F_COPY);
+ free(pdu);
+ return len;
+}
+
+static void convert_io_stat(struct io_stat *dst, struct io_stat *src)
+{
+ dst->max_val = cpu_to_le64(src->max_val);
+ dst->min_val = cpu_to_le64(src->min_val);
+ dst->samples = cpu_to_le64(src->samples);
+
+ /*
+ * Encode to IEEE 754 for network transfer
+ */
+ dst->mean.u.i = cpu_to_le64(fio_double_to_uint64(src->mean.u.f));
+ dst->S.u.i = cpu_to_le64(fio_double_to_uint64(src->S.u.f));
+}
+
+static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src)
+{
+ int i;
+
+ for (i = 0; i < DDIR_RWDIR_CNT; i++) {
+ dst->max_run[i] = cpu_to_le64(src->max_run[i]);
+ dst->min_run[i] = cpu_to_le64(src->min_run[i]);
+ dst->max_bw[i] = cpu_to_le64(src->max_bw[i]);
+ dst->min_bw[i] = cpu_to_le64(src->min_bw[i]);
+ dst->iobytes[i] = cpu_to_le64(src->iobytes[i]);
+ dst->agg[i] = cpu_to_le64(src->agg[i]);
+ }
+
+ dst->kb_base = cpu_to_le32(src->kb_base);
+ dst->unit_base = cpu_to_le32(src->unit_base);
+ dst->groupid = cpu_to_le32(src->groupid);
+ dst->unified_rw_rep = cpu_to_le32(src->unified_rw_rep);
+ dst->sig_figs = cpu_to_le32(src->sig_figs);
+}
+
+/*
+ * Send a CMD_TS, which packs struct thread_stat and group_run_stats
+ * into a single payload.
+ */
+void fio_server_send_ts(struct thread_stat *ts, struct group_run_stats *rs)
+{
+ struct cmd_ts_pdu p;
+ int i, j;
+ void *ss_buf;
+ uint64_t *ss_iops, *ss_bw;
+
+ dprint(FD_NET, "server sending end stats\n");
+
+ memset(&p, 0, sizeof(p));
+
+ strncpy(p.ts.name, ts->name, FIO_JOBNAME_SIZE - 1);
+ strncpy(p.ts.verror, ts->verror, FIO_VERROR_SIZE - 1);
+ strncpy(p.ts.description, ts->description, FIO_JOBDESC_SIZE - 1);
+
+ p.ts.error = cpu_to_le32(ts->error);
+ p.ts.thread_number = cpu_to_le32(ts->thread_number);
+ p.ts.groupid = cpu_to_le32(ts->groupid);
+ p.ts.pid = cpu_to_le32(ts->pid);
+ p.ts.members = cpu_to_le32(ts->members);
+ p.ts.unified_rw_rep = cpu_to_le32(ts->unified_rw_rep);
+
+ for (i = 0; i < DDIR_RWDIR_CNT; i++) {
+ convert_io_stat(&p.ts.clat_stat[i], &ts->clat_stat[i]);
+ convert_io_stat(&p.ts.slat_stat[i], &ts->slat_stat[i]);
+ convert_io_stat(&p.ts.lat_stat[i], &ts->lat_stat[i]);
+ convert_io_stat(&p.ts.bw_stat[i], &ts->bw_stat[i]);
+ convert_io_stat(&p.ts.iops_stat[i], &ts->iops_stat[i]);
+ }
+
+ p.ts.usr_time = cpu_to_le64(ts->usr_time);
+ p.ts.sys_time = cpu_to_le64(ts->sys_time);
+ p.ts.ctx = cpu_to_le64(ts->ctx);
+ p.ts.minf = cpu_to_le64(ts->minf);
+ p.ts.majf = cpu_to_le64(ts->majf);
+ p.ts.clat_percentiles = cpu_to_le32(ts->clat_percentiles);
+ p.ts.lat_percentiles = cpu_to_le32(ts->lat_percentiles);
+ p.ts.percentile_precision = cpu_to_le64(ts->percentile_precision);
+
+ for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) {
+ fio_fp64_t *src = &ts->percentile_list[i];
+ fio_fp64_t *dst = &p.ts.percentile_list[i];
+
+ dst->u.i = cpu_to_le64(fio_double_to_uint64(src->u.f));
+ }
+
+ for (i = 0; i < FIO_IO_U_MAP_NR; i++) {
+ p.ts.io_u_map[i] = cpu_to_le64(ts->io_u_map[i]);
+ p.ts.io_u_submit[i] = cpu_to_le64(ts->io_u_submit[i]);
+ p.ts.io_u_complete[i] = cpu_to_le64(ts->io_u_complete[i]);
+ }
+
+ for (i = 0; i < FIO_IO_U_LAT_N_NR; i++)
+ p.ts.io_u_lat_n[i] = cpu_to_le64(ts->io_u_lat_n[i]);
+ for (i = 0; i < FIO_IO_U_LAT_U_NR; i++)
+ p.ts.io_u_lat_u[i] = cpu_to_le64(ts->io_u_lat_u[i]);
+ for (i = 0; i < FIO_IO_U_LAT_M_NR; i++)
+ p.ts.io_u_lat_m[i] = cpu_to_le64(ts->io_u_lat_m[i]);
+
+ for (i = 0; i < DDIR_RWDIR_CNT; i++)
+ for (j = 0; j < FIO_IO_U_PLAT_NR; j++)
+ p.ts.io_u_plat[i][j] = cpu_to_le64(ts->io_u_plat[i][j]);
+
+ for (i = 0; i < DDIR_RWDIR_CNT; i++) {
+ p.ts.total_io_u[i] = cpu_to_le64(ts->total_io_u[i]);
+ p.ts.short_io_u[i] = cpu_to_le64(ts->short_io_u[i]);
+ p.ts.drop_io_u[i] = cpu_to_le64(ts->drop_io_u[i]);
+ }
+
+ p.ts.total_submit = cpu_to_le64(ts->total_submit);
+ p.ts.total_complete = cpu_to_le64(ts->total_complete);
+
+ for (i = 0; i < DDIR_RWDIR_CNT; i++) {
+ p.ts.io_bytes[i] = cpu_to_le64(ts->io_bytes[i]);
+ p.ts.runtime[i] = cpu_to_le64(ts->runtime[i]);
+ }
+
+ p.ts.total_run_time = cpu_to_le64(ts->total_run_time);
+ p.ts.continue_on_error = cpu_to_le16(ts->continue_on_error);
+ p.ts.total_err_count = cpu_to_le64(ts->total_err_count);
+ p.ts.first_error = cpu_to_le32(ts->first_error);
+ p.ts.kb_base = cpu_to_le32(ts->kb_base);
+ p.ts.unit_base = cpu_to_le32(ts->unit_base);
+
+ p.ts.latency_depth = cpu_to_le32(ts->latency_depth);
+ p.ts.latency_target = cpu_to_le64(ts->latency_target);
+ p.ts.latency_window = cpu_to_le64(ts->latency_window);
+ p.ts.latency_percentile.u.i = cpu_to_le64(fio_double_to_uint64(ts->latency_percentile.u.f));
+
+ p.ts.sig_figs = cpu_to_le32(ts->sig_figs);
+
+ p.ts.nr_block_infos = cpu_to_le64(ts->nr_block_infos);
+ for (i = 0; i < p.ts.nr_block_infos; i++)
+ p.ts.block_infos[i] = cpu_to_le32(ts->block_infos[i]);
+
+ p.ts.ss_dur = cpu_to_le64(ts->ss_dur);
+ p.ts.ss_state = cpu_to_le32(ts->ss_state);
+ p.ts.ss_head = cpu_to_le32(ts->ss_head);
+ p.ts.ss_limit.u.i = cpu_to_le64(fio_double_to_uint64(ts->ss_limit.u.f));
+ p.ts.ss_slope.u.i = cpu_to_le64(fio_double_to_uint64(ts->ss_slope.u.f));
+ p.ts.ss_deviation.u.i = cpu_to_le64(fio_double_to_uint64(ts->ss_deviation.u.f));
+ p.ts.ss_criterion.u.i = cpu_to_le64(fio_double_to_uint64(ts->ss_criterion.u.f));
+
+ convert_gs(&p.rs, rs);
+
+ dprint(FD_NET, "ts->ss_state = %d\n", ts->ss_state);
+ if (ts->ss_state & FIO_SS_DATA) {
+ dprint(FD_NET, "server sending steadystate ring buffers\n");
+
+ ss_buf = malloc(sizeof(p) + 2*ts->ss_dur*sizeof(uint64_t));
+
+ memcpy(ss_buf, &p, sizeof(p));
+
+ ss_iops = (uint64_t *) ((struct cmd_ts_pdu *)ss_buf + 1);
+ ss_bw = ss_iops + (int) ts->ss_dur;
+ for (i = 0; i < ts->ss_dur; i++) {
+ ss_iops[i] = cpu_to_le64(ts->ss_iops_data[i]);
+ ss_bw[i] = cpu_to_le64(ts->ss_bw_data[i]);
+ }
+
+ fio_net_queue_cmd(FIO_NET_CMD_TS, ss_buf, sizeof(p) + 2*ts->ss_dur*sizeof(uint64_t), NULL, SK_F_COPY);
+
+ free(ss_buf);
+ }
+ else
+ fio_net_queue_cmd(FIO_NET_CMD_TS, &p, sizeof(p), NULL, SK_F_COPY);
+}
+
+void fio_server_send_gs(struct group_run_stats *rs)
+{
+ struct group_run_stats gs;
+
+ dprint(FD_NET, "server sending group run stats\n");
+
+ convert_gs(&gs, rs);
+ fio_net_queue_cmd(FIO_NET_CMD_GS, &gs, sizeof(gs), NULL, SK_F_COPY);
+}
+
+void fio_server_send_job_options(struct flist_head *opt_list,
+ unsigned int gid)
+{
+ struct cmd_job_option pdu;
+ struct flist_head *entry;
+
+ if (flist_empty(opt_list))
+ return;
+
+ flist_for_each(entry, opt_list) {
+ struct print_option *p;
+ size_t len;
+
+ p = flist_entry(entry, struct print_option, list);
+ memset(&pdu, 0, sizeof(pdu));
+
+ if (gid == -1U) {
+ pdu.global = __cpu_to_le16(1);
+ pdu.groupid = 0;
+ } else {
+ pdu.global = 0;
+ pdu.groupid = cpu_to_le32(gid);
+ }
+ len = strlen(p->name);
+ if (len >= sizeof(pdu.name)) {
+ len = sizeof(pdu.name) - 1;
+ pdu.truncated = __cpu_to_le16(1);
+ }
+ memcpy(pdu.name, p->name, len);
+ if (p->value) {
+ len = strlen(p->value);
+ if (len >= sizeof(pdu.value)) {
+ len = sizeof(pdu.value) - 1;
+ pdu.truncated = __cpu_to_le16(1);
+ }
+ memcpy(pdu.value, p->value, len);
+ }
+ fio_net_queue_cmd(FIO_NET_CMD_JOB_OPT, &pdu, sizeof(pdu), NULL, SK_F_COPY);
+ }
+}
+
+static void convert_agg(struct disk_util_agg *dst, struct disk_util_agg *src)
+{
+ int i;
+
+ for (i = 0; i < 2; i++) {
+ dst->ios[i] = cpu_to_le64(src->ios[i]);
+ dst->merges[i] = cpu_to_le64(src->merges[i]);
+ dst->sectors[i] = cpu_to_le64(src->sectors[i]);
+ dst->ticks[i] = cpu_to_le64(src->ticks[i]);
+ }
+
+ dst->io_ticks = cpu_to_le64(src->io_ticks);
+ dst->time_in_queue = cpu_to_le64(src->time_in_queue);
+ dst->slavecount = cpu_to_le32(src->slavecount);
+ dst->max_util.u.i = cpu_to_le64(fio_double_to_uint64(src->max_util.u.f));
+}
+
+static void convert_dus(struct disk_util_stat *dst, struct disk_util_stat *src)
+{
+ int i;
+
+ dst->name[FIO_DU_NAME_SZ - 1] = '\0';
+ strncpy((char *) dst->name, (char *) src->name, FIO_DU_NAME_SZ - 1);
+
+ for (i = 0; i < 2; i++) {
+ dst->s.ios[i] = cpu_to_le64(src->s.ios[i]);
+ dst->s.merges[i] = cpu_to_le64(src->s.merges[i]);
+ dst->s.sectors[i] = cpu_to_le64(src->s.sectors[i]);
+ dst->s.ticks[i] = cpu_to_le64(src->s.ticks[i]);
+ }
+
+ dst->s.io_ticks = cpu_to_le64(src->s.io_ticks);
+ dst->s.time_in_queue = cpu_to_le64(src->s.time_in_queue);
+ dst->s.msec = cpu_to_le64(src->s.msec);
+}
+
+void fio_server_send_du(void)
+{
+ struct disk_util *du;
+ struct flist_head *entry;
+ struct cmd_du_pdu pdu;
+
+ dprint(FD_NET, "server: sending disk_util %d\n", !flist_empty(&disk_list));
+
+ memset(&pdu, 0, sizeof(pdu));
+
+ flist_for_each(entry, &disk_list) {
+ du = flist_entry(entry, struct disk_util, list);
+
+ convert_dus(&pdu.dus, &du->dus);
+ convert_agg(&pdu.agg, &du->agg);
+
+ fio_net_queue_cmd(FIO_NET_CMD_DU, &pdu, sizeof(pdu), NULL, SK_F_COPY);
+ }
+}
+
+#ifdef CONFIG_ZLIB
+
+static inline void __fio_net_prep_tail(z_stream *stream, void *out_pdu,
+ struct sk_entry **last_entry,
+ struct sk_entry *first)
+{
+ unsigned int this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream->avail_out;
+
+ *last_entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, out_pdu, this_len,
+ NULL, SK_F_VEC | SK_F_INLINE | SK_F_FREE);
+ if (*last_entry)
+ flist_add_tail(&(*last_entry)->list, &first->next);
+}
+
/*
 * Deflates the next input given, creating as many new packets in the
 * linked list as necessary.
 *
 * 'stream' carries compressor state across calls; '*out_pdu' is the
 * current (possibly partially filled) output buffer, replaced with a
 * fresh one each time it fills up. Returns 0 on success, 1 on error.
 */
static int __deflate_pdu_buffer(void *next_in, unsigned int next_sz, void **out_pdu,
				struct sk_entry **last_entry, z_stream *stream,
				struct sk_entry *first)
{
	int ret;

	stream->next_in = next_in;
	stream->avail_in = next_sz;
	do {
		/* Output PDU full: queue it on the list and start a new one */
		if (!stream->avail_out) {
			__fio_net_prep_tail(stream, *out_pdu, last_entry, first);
			if (*last_entry == NULL)
				return 1;

			/* NOTE(review): malloc unchecked — a NULL here would be
			 * written through next_out on the next deflate() call */
			*out_pdu = malloc(FIO_SERVER_MAX_FRAGMENT_PDU);

			stream->avail_out = FIO_SERVER_MAX_FRAGMENT_PDU;
			stream->next_out = *out_pdu;
		}

		/* Z_BLOCK: flush to a byte boundary without ending the stream */
		ret = deflate(stream, Z_BLOCK);

		if (ret < 0) {
			/* NOTE(review): frees *out_pdu but leaves the caller's
			 * pointer dangling; callers must not free it again */
			free(*out_pdu);
			return 1;
		}
	} while (stream->avail_in);

	return 0;
}
+
+static int __fio_append_iolog_gz_hist(struct sk_entry *first, struct io_log *log,
+ struct io_logs *cur_log, z_stream *stream)
+{
+ struct sk_entry *entry;
+ void *out_pdu;
+ int ret, i, j;
+ int sample_sz = log_entry_sz(log);
+
+ out_pdu = malloc(FIO_SERVER_MAX_FRAGMENT_PDU);
+ stream->avail_out = FIO_SERVER_MAX_FRAGMENT_PDU;
+ stream->next_out = out_pdu;
+
+ for (i = 0; i < cur_log->nr_samples; i++) {
+ struct io_sample *s;
+ struct io_u_plat_entry *cur_plat_entry, *prev_plat_entry;
+ uint64_t *cur_plat, *prev_plat;
+
+ s = get_sample(log, cur_log, i);
+ ret = __deflate_pdu_buffer(s, sample_sz, &out_pdu, &entry, stream, first);
+ if (ret)
+ return ret;
+
+ /* Do the subtraction on server side so that client doesn't have to
+ * reconstruct our linked list from packets.
+ */
+ cur_plat_entry = s->data.plat_entry;
+ prev_plat_entry = flist_first_entry(&cur_plat_entry->list, struct io_u_plat_entry, list);
+ cur_plat = cur_plat_entry->io_u_plat;
+ prev_plat = prev_plat_entry->io_u_plat;
+
+ for (j = 0; j < FIO_IO_U_PLAT_NR; j++) {
+ cur_plat[j] -= prev_plat[j];
+ }
+
+ flist_del(&prev_plat_entry->list);
+ free(prev_plat_entry);
+
+ ret = __deflate_pdu_buffer(cur_plat_entry, sizeof(*cur_plat_entry),
+ &out_pdu, &entry, stream, first);
+
+ if (ret)
+ return ret;
+ }
+
+ __fio_net_prep_tail(stream, out_pdu, &entry, first);
+ return entry == NULL;