#include <stdio.h>
#include <stdlib.h>
-#include <stdarg.h>
#include <unistd.h>
-#include <limits.h>
#include <errno.h>
-#include <sys/poll.h>
+#include <poll.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/socket.h>
#include "server.h"
#include "crc/crc16.h"
#include "lib/ieee754.h"
-#include "verify.h"
+#include "verify-state.h"
#include "smalloc.h"
int fio_net_port = FIO_NET_PORT;
-int exit_backend = 0;
+bool exit_backend = false;
enum {
SK_F_FREE = 1,
static pthread_key_t sk_out_key;
+#ifdef WIN32
+static char *fio_server_pipe_name = NULL;
+static HANDLE hjob = INVALID_HANDLE_VALUE;
+struct ffi_element {
+ union {
+ pthread_t thread;
+ HANDLE hProcess;
+ };
+ bool is_thread;
+};
+#endif
+
struct fio_fork_item {
struct flist_head list;
int exitval;
int signal;
int exited;
+#ifdef WIN32
+ struct ffi_element element;
+#else
pid_t pid;
+#endif
};
struct cmd_reply {
- struct fio_mutex lock;
+ struct fio_sem lock;
void *data;
size_t size;
int error;
static void sk_lock(struct sk_out *sk_out)
{
- fio_mutex_down(&sk_out->lock);
+ fio_sem_down(&sk_out->lock);
}
static void sk_unlock(struct sk_out *sk_out)
{
- fio_mutex_up(&sk_out->lock);
+ fio_sem_up(&sk_out->lock);
}
void sk_out_assign(struct sk_out *sk_out)
static void sk_out_free(struct sk_out *sk_out)
{
- __fio_mutex_remove(&sk_out->lock);
- __fio_mutex_remove(&sk_out->wait);
- __fio_mutex_remove(&sk_out->xmit);
+ __fio_sem_remove(&sk_out->lock);
+ __fio_sem_remove(&sk_out->wait);
+ __fio_sem_remove(&sk_out->xmit);
sfree(sk_out);
}
return fio_sendv_data(sk, &iov, 1);
}
+bool fio_server_poll_fd(int fd, short events, int timeout)
+{
+ struct pollfd pfd = {
+ .fd = fd,
+ .events = events,
+ };
+ int ret;
+
+ ret = poll(&pfd, 1, timeout);
+ if (ret < 0) {
+ if (errno == EINTR)
+ return false;
+ log_err("fio: poll: %s\n", strerror(errno));
+ return false;
+ } else if (!ret) {
+ return false;
+ }
+ if (pfd.revents & events)
+ return true;
+ return false;
+}
+
static int fio_recv_data(int sk, void *buf, unsigned int len, bool wait)
{
int flags;
if (crc != cmd->cmd_crc16) {
log_err("fio: server bad crc on command (got %x, wanted %x)\n",
cmd->cmd_crc16, crc);
+ fprintf(f_err, "fio: server bad crc on command (got %x, wanted %x)\n",
+ cmd->cmd_crc16, crc);
return 1;
}
break;
default:
log_err("fio: bad server cmd version %d\n", cmd->version);
+ fprintf(f_err, "fio: client/server version mismatch (%d != %d)\n",
+ cmd->version, FIO_SERVER_VER);
return 1;
}
if (cmdret->opcode == FIO_NET_CMD_TEXT) {
struct cmd_text_pdu *__pdu = (struct cmd_text_pdu *) cmdret->payload;
char *buf = (char *) __pdu->buf;
+ int len = le32_to_cpu(__pdu->buf_len);
- buf[__pdu->buf_len] = '\0';
+ buf[len] = '\0';
} else if (cmdret->opcode == FIO_NET_CMD_JOB) {
struct cmd_job_pdu *__pdu = (struct cmd_job_pdu *) cmdret->payload;
char *buf = (char *) __pdu->buf;
struct sk_entry *entry;
entry = smalloc(sizeof(*entry));
+ if (!entry)
+ return NULL;
+
INIT_FLIST_HEAD(&entry->next);
entry->opcode = opcode;
if (flags & SK_F_COPY) {
flist_add_tail(&entry->list, &sk_out->list);
sk_unlock(sk_out);
- fio_mutex_up(&sk_out->wait);
+ fio_sem_up(&sk_out->wait);
}
}
{
dprint(FD_NET, "server: sending quit\n");
- return fio_net_queue_cmd(FIO_NET_CMD_QUIT, NULL, 0, NULL, SK_F_SIMPLE | SK_F_INLINE);
+ return fio_net_queue_cmd(FIO_NET_CMD_QUIT, NULL, 0, NULL, SK_F_SIMPLE);
}
int fio_net_send_quit(int sk)
epdu.error = __cpu_to_le32(error);
epdu.signal = __cpu_to_le32(signal);
- return fio_net_queue_cmd(FIO_NET_CMD_STOP, &epdu, sizeof(epdu), &tag, SK_F_COPY | SK_F_INLINE);
+ return fio_net_queue_cmd(FIO_NET_CMD_STOP, &epdu, sizeof(epdu), &tag, SK_F_COPY);
}
static int fio_net_queue_stop(int error, int signal)
return fio_net_send_ack(NULL, error, signal);
}
+#ifdef WIN32
+static void fio_server_add_fork_item(struct ffi_element *element, struct flist_head *list)
+{
+ struct fio_fork_item *ffi;
+
+ ffi = malloc(sizeof(*ffi));
+ ffi->exitval = 0;
+ ffi->signal = 0;
+ ffi->exited = 0;
+ ffi->element = *element;
+ flist_add_tail(&ffi->list, list);
+}
+
+static void fio_server_add_conn_pid(struct flist_head *conn_list, HANDLE hProcess)
+{
+ struct ffi_element element = {.hProcess = hProcess, .is_thread=FALSE};
+ dprint(FD_NET, "server: forked off connection job (tid=%u)\n", (int) element.thread);
+
+ fio_server_add_fork_item(&element, conn_list);
+}
+
+static void fio_server_add_job_pid(struct flist_head *job_list, pthread_t thread)
+{
+ struct ffi_element element = {.thread = thread, .is_thread=TRUE};
+ dprint(FD_NET, "server: forked off job job (tid=%u)\n", (int) element.thread);
+ fio_server_add_fork_item(&element, job_list);
+}
+
+static void fio_server_check_fork_item(struct fio_fork_item *ffi)
+{
+ int ret;
+
+ if (ffi->element.is_thread) {
+
+ ret = pthread_kill(ffi->element.thread, 0);
+ if (ret) {
+ int rev_val;
+ pthread_join(ffi->element.thread, (void**) &rev_val); /*if the thread is dead, then join it to get status*/
+
+ ffi->exitval = rev_val;
+ if (ffi->exitval)
+ log_err("thread (tid=%u) exited with %x\n", (int) ffi->element.thread, (int) ffi->exitval);
+ dprint(FD_PROCESS, "thread (tid=%u) exited with %x\n", (int) ffi->element.thread, (int) ffi->exitval);
+ ffi->exited = 1;
+ }
+ } else {
+ DWORD exit_val;
+ GetExitCodeProcess(ffi->element.hProcess, &exit_val);
+
+ if (exit_val != STILL_ACTIVE) {
+ dprint(FD_PROCESS, "process %u exited with %d\n", GetProcessId(ffi->element.hProcess), exit_val);
+ ffi->exited = 1;
+ ffi->exitval = exit_val;
+ }
+ }
+}
+#else
static void fio_server_add_fork_item(pid_t pid, struct flist_head *list)
{
struct fio_fork_item *ffi;
}
}
}
+#endif
static void fio_server_fork_item_done(struct fio_fork_item *ffi, bool stop)
{
+#ifdef WIN32
+ if (ffi->element.is_thread)
+ dprint(FD_NET, "tid %u exited, sig=%u, exitval=%d\n", (int) ffi->element.thread, ffi->signal, ffi->exitval);
+ else {
+ dprint(FD_NET, "pid %u exited, sig=%u, exitval=%d\n", (int) GetProcessId(ffi->element.hProcess), ffi->signal, ffi->exitval);
+ CloseHandle(ffi->element.hProcess);
+ ffi->element.hProcess = INVALID_HANDLE_VALUE;
+ }
+#else
dprint(FD_NET, "pid %u exited, sig=%u, exitval=%d\n", (int) ffi->pid, ffi->signal, ffi->exitval);
+#endif
/*
* Fold STOP and QUIT...
return 0;
}
-static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list,
- struct fio_net_cmd *cmd)
+#ifdef WIN32
+static void *fio_backend_thread(void *data)
{
- pid_t pid;
int ret;
+ struct sk_out *sk_out = (struct sk_out *) data;
sk_out_assign(sk_out);
+ ret = fio_backend(sk_out);
+ sk_out_drop();
+
+ pthread_exit((void*) (intptr_t) ret);
+ return NULL;
+}
+#endif
+
+static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list,
+ struct fio_net_cmd *cmd)
+{
+ int ret;
+
fio_time_init();
set_genesis_time();
- pid = fork();
- if (pid) {
- fio_server_add_job_pid(job_list, pid);
- return 0;
+#ifdef WIN32
+ {
+ pthread_t thread;
+ /* both this thread and backend_thread call sk_out_assign() to double increment
+ * the ref count. This ensures struct is valid until both threads are done with it
+ */
+ sk_out_assign(sk_out);
+ ret = pthread_create(&thread, NULL, fio_backend_thread, sk_out);
+ if (ret) {
+ log_err("pthread_create: %s\n", strerror(ret));
+ return ret;
+ }
+
+ fio_server_add_job_pid(job_list, thread);
+ return ret;
}
+#else
+ {
+ pid_t pid;
+ sk_out_assign(sk_out);
+ pid = fork();
+ if (pid) {
+ fio_server_add_job_pid(job_list, pid);
+ return 0;
+ }
- ret = fio_backend(sk_out);
- free_threads_shm();
- sk_out_drop();
- _exit(ret);
+ ret = fio_backend(sk_out);
+ free_threads_shm();
+ sk_out_drop();
+ _exit(ret);
+ }
+#endif
}
static int handle_job_cmd(struct fio_net_cmd *cmd)
static int handle_probe_cmd(struct fio_net_cmd *cmd)
{
struct cmd_client_probe_pdu *pdu = (struct cmd_client_probe_pdu *) cmd->payload;
- struct cmd_probe_reply_pdu probe;
uint64_t tag = cmd->tag;
+ struct cmd_probe_reply_pdu probe = {
+#ifdef CONFIG_BIG_ENDIAN
+ .bigendian = 1,
+#endif
+ .os = FIO_OS,
+ .arch = FIO_ARCH,
+ .bpp = sizeof(void *),
+ .cpus = __cpu_to_le32(cpus_online()),
+ };
dprint(FD_NET, "server: sending probe reply\n");
strcpy(me, (char *) pdu->server);
- memset(&probe, 0, sizeof(probe));
gethostname((char *) probe.hostname, sizeof(probe.hostname));
-#ifdef CONFIG_BIG_ENDIAN
- probe.bigendian = 1;
-#endif
- strncpy((char *) probe.fio_version, fio_version_string, sizeof(probe.fio_version) - 1);
-
- probe.os = FIO_OS;
- probe.arch = FIO_ARCH;
- probe.bpp = sizeof(void *);
- probe.cpus = __cpu_to_le32(cpus_online());
+ snprintf((char *) probe.fio_version, sizeof(probe.fio_version), "%s",
+ fio_version_string);
/*
* If the client supports compression and we do too, then enable it
return 0;
}
- td = &threads[tnumber - 1];
+ td = tnumber_to_td(tnumber);
convert_thread_options_to_cpu(&td->o, &pdu->top);
send_update_job_reply(cmd->tag, 0);
return 0;
} else
fio_net_queue_cmd(FIO_NET_CMD_VTRIGGER, rep, sz, NULL, SK_F_FREE | SK_F_INLINE);
- fio_terminate_threads(TERMINATE_ALL);
+ fio_terminate_threads(TERMINATE_ALL, TERMINATE_ALL);
fio_server_check_jobs(job_list);
exec_trigger(buf);
return 0;
switch (cmd->opcode) {
case FIO_NET_CMD_QUIT:
- fio_terminate_threads(TERMINATE_ALL);
+ fio_terminate_threads(TERMINATE_ALL, TERMINATE_ALL);
ret = 0;
break;
case FIO_NET_CMD_EXIT:
- exit_backend = 1;
+ exit_backend = true;
return -1;
case FIO_NET_CMD_LOAD_FILE:
ret = handle_load_file_cmd(cmd);
memcpy(rep->data, in->data, in->size);
}
}
- fio_mutex_up(&rep->lock);
+ fio_sem_up(&rep->lock);
break;
}
default:
{
int ret;
- fio_mutex_down(&sk_out->xmit);
+ fio_sem_down(&sk_out->xmit);
if (entry->flags & SK_F_VEC)
ret = send_vec_entry(sk_out, entry);
entry->size, &entry->tag, NULL);
}
- fio_mutex_up(&sk_out->xmit);
+ fio_sem_up(&sk_out->xmit);
if (ret)
log_err("fio: failed handling cmd %s\n", fio_server_op(entry->opcode));
sk_unlock(sk_out);
while (!flist_empty(&list)) {
- entry = flist_entry(list.next, struct sk_entry, list);
+ entry = flist_first_entry(&list, struct sk_entry, list);
flist_del(&entry->list);
ret += handle_sk_entry(sk_out, entry);
}
.events = POLLIN,
};
- ret = 0;
do {
int timeout = 1000;
break;
} else if (!ret) {
fio_server_check_jobs(&job_list);
- fio_mutex_down_timeout(&sk_out->wait, timeout);
+ fio_sem_down_timeout(&sk_out->wait, timeout);
continue;
}
if (ret < 0)
break;
- cmd = fio_net_recv_cmd(sk_out->sk, true);
+ if (pfd.revents & POLLIN)
+ cmd = fio_net_recv_cmd(sk_out->sk, true);
if (!cmd) {
ret = -1;
break;
_exit(ret);
}
-/* get the address on this host bound by the input socket,
+/* get the address on this host bound by the input socket,
* whether it is ipv6 or ipv4 */
static int get_my_addr_str(int sk)
return 0;
}
+#ifdef WIN32
+static int handle_connection_process(void)
+{
+ WSAPROTOCOL_INFO protocol_info;
+ DWORD bytes_read;
+ HANDLE hpipe;
+ int sk;
+ struct sk_out *sk_out;
+ int ret;
+ char *msg = (char *) "connected";
+
+ log_info("server enter accept loop. ProcessID %d\n", GetCurrentProcessId());
+
+ hpipe = CreateFile(
+ fio_server_pipe_name,
+ GENERIC_READ | GENERIC_WRITE,
+ 0, NULL,
+ OPEN_EXISTING,
+ 0, NULL);
+
+ if (hpipe == INVALID_HANDLE_VALUE) {
+ log_err("couldnt open pipe %s error %lu\n",
+ fio_server_pipe_name, GetLastError());
+ return -1;
+ }
+
+ if (!ReadFile(hpipe, &protocol_info, sizeof(protocol_info), &bytes_read, NULL)) {
+ log_err("couldnt read pi from pipe %s error %lu\n", fio_server_pipe_name,
+ GetLastError());
+ }
+
+ if (use_ipv6) /* use protocol_info to create a duplicate of parents socket */
+ sk = WSASocket(AF_INET6, SOCK_STREAM, 0, &protocol_info, 0, 0);
+ else
+ sk = WSASocket(AF_INET, SOCK_STREAM, 0, &protocol_info, 0, 0);
+
+ sk_out = scalloc(1, sizeof(*sk_out));
+ if (!sk_out) {
+ CloseHandle(hpipe);
+ close(sk);
+ return -1;
+ }
+
+ sk_out->sk = sk;
+ sk_out->hProcess = INVALID_HANDLE_VALUE;
+ INIT_FLIST_HEAD(&sk_out->list);
+ __fio_sem_init(&sk_out->lock, FIO_SEM_UNLOCKED);
+ __fio_sem_init(&sk_out->wait, FIO_SEM_LOCKED);
+ __fio_sem_init(&sk_out->xmit, FIO_SEM_UNLOCKED);
+
+ get_my_addr_str(sk);
+
+ if (!WriteFile(hpipe, msg, strlen(msg), NULL, NULL)) {
+ log_err("couldnt write pipe\n");
+ close(sk);
+ return -1;
+ }
+ CloseHandle(hpipe);
+
+ sk_out_assign(sk_out);
+
+ ret = handle_connection(sk_out);
+ __sk_out_drop(sk_out);
+ return ret;
+}
+#endif
+
static int accept_loop(int listen_sk)
{
struct sockaddr_in addr;
struct sk_out *sk_out;
const char *from;
char buf[64];
+#ifdef WIN32
+ HANDLE hProcess;
+#else
pid_t pid;
-
+#endif
pfd.fd = listen_sk;
pfd.events = POLLIN;
do {
dprint(FD_NET, "server: connect from %s\n", from);
- sk_out = smalloc(sizeof(*sk_out));
+ sk_out = scalloc(1, sizeof(*sk_out));
+ if (!sk_out) {
+ close(sk);
+ return -1;
+ }
+
sk_out->sk = sk;
INIT_FLIST_HEAD(&sk_out->list);
- __fio_mutex_init(&sk_out->lock, FIO_MUTEX_UNLOCKED);
- __fio_mutex_init(&sk_out->wait, FIO_MUTEX_LOCKED);
- __fio_mutex_init(&sk_out->xmit, FIO_MUTEX_UNLOCKED);
+ __fio_sem_init(&sk_out->lock, FIO_SEM_UNLOCKED);
+ __fio_sem_init(&sk_out->wait, FIO_SEM_LOCKED);
+ __fio_sem_init(&sk_out->xmit, FIO_SEM_UNLOCKED);
+#ifdef WIN32
+ hProcess = windows_handle_connection(hjob, sk);
+ if (hProcess == INVALID_HANDLE_VALUE)
+ return -1;
+ sk_out->hProcess = hProcess;
+ fio_server_add_conn_pid(&conn_list, hProcess);
+#else
pid = fork();
if (pid) {
close(sk);
*/
sk_out_assign(sk_out);
handle_connection(sk_out);
+#endif
}
return exitval;
dst->unit_base = cpu_to_le32(src->unit_base);
dst->groupid = cpu_to_le32(src->groupid);
dst->unified_rw_rep = cpu_to_le32(src->unified_rw_rep);
+ dst->sig_figs = cpu_to_le32(src->sig_figs);
}
/*
void fio_server_send_ts(struct thread_stat *ts, struct group_run_stats *rs)
{
struct cmd_ts_pdu p;
- int i, j;
- void *ss_buf;
- uint64_t *ss_iops, *ss_bw;
+ int i, j, k;
+ size_t clat_prio_stats_extra_size = 0;
+ size_t ss_extra_size = 0;
+ size_t extended_buf_size = 0;
+ void *extended_buf;
+ void *extended_buf_wp;
dprint(FD_NET, "server sending end stats\n");
memset(&p, 0, sizeof(p));
- strncpy(p.ts.name, ts->name, FIO_JOBNAME_SIZE - 1);
- strncpy(p.ts.verror, ts->verror, FIO_VERROR_SIZE - 1);
- strncpy(p.ts.description, ts->description, FIO_JOBDESC_SIZE - 1);
+ snprintf(p.ts.name, sizeof(p.ts.name), "%s", ts->name);
+ snprintf(p.ts.verror, sizeof(p.ts.verror), "%s", ts->verror);
+ snprintf(p.ts.description, sizeof(p.ts.description), "%s",
+ ts->description);
p.ts.error = cpu_to_le32(ts->error);
p.ts.thread_number = cpu_to_le32(ts->thread_number);
p.ts.pid = cpu_to_le32(ts->pid);
p.ts.members = cpu_to_le32(ts->members);
p.ts.unified_rw_rep = cpu_to_le32(ts->unified_rw_rep);
+ p.ts.ioprio = cpu_to_le32(ts->ioprio);
+ p.ts.disable_prio_stat = cpu_to_le32(ts->disable_prio_stat);
for (i = 0; i < DDIR_RWDIR_CNT; i++) {
convert_io_stat(&p.ts.clat_stat[i], &ts->clat_stat[i]);
convert_io_stat(&p.ts.bw_stat[i], &ts->bw_stat[i]);
convert_io_stat(&p.ts.iops_stat[i], &ts->iops_stat[i]);
}
+ convert_io_stat(&p.ts.sync_stat, &ts->sync_stat);
p.ts.usr_time = cpu_to_le64(ts->usr_time);
p.ts.sys_time = cpu_to_le64(ts->sys_time);
p.ts.majf = cpu_to_le64(ts->majf);
p.ts.clat_percentiles = cpu_to_le32(ts->clat_percentiles);
p.ts.lat_percentiles = cpu_to_le32(ts->lat_percentiles);
+ p.ts.slat_percentiles = cpu_to_le32(ts->slat_percentiles);
p.ts.percentile_precision = cpu_to_le64(ts->percentile_precision);
for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) {
}
for (i = 0; i < FIO_IO_U_MAP_NR; i++) {
- p.ts.io_u_map[i] = cpu_to_le32(ts->io_u_map[i]);
- p.ts.io_u_submit[i] = cpu_to_le32(ts->io_u_submit[i]);
- p.ts.io_u_complete[i] = cpu_to_le32(ts->io_u_complete[i]);
+ p.ts.io_u_map[i] = cpu_to_le64(ts->io_u_map[i]);
+ p.ts.io_u_submit[i] = cpu_to_le64(ts->io_u_submit[i]);
+ p.ts.io_u_complete[i] = cpu_to_le64(ts->io_u_complete[i]);
}
for (i = 0; i < FIO_IO_U_LAT_N_NR; i++)
- p.ts.io_u_lat_n[i] = cpu_to_le32(ts->io_u_lat_n[i]);
+ p.ts.io_u_lat_n[i] = cpu_to_le64(ts->io_u_lat_n[i]);
for (i = 0; i < FIO_IO_U_LAT_U_NR; i++)
- p.ts.io_u_lat_u[i] = cpu_to_le32(ts->io_u_lat_u[i]);
+ p.ts.io_u_lat_u[i] = cpu_to_le64(ts->io_u_lat_u[i]);
for (i = 0; i < FIO_IO_U_LAT_M_NR; i++)
- p.ts.io_u_lat_m[i] = cpu_to_le32(ts->io_u_lat_m[i]);
+ p.ts.io_u_lat_m[i] = cpu_to_le64(ts->io_u_lat_m[i]);
- for (i = 0; i < DDIR_RWDIR_CNT; i++)
- for (j = 0; j < FIO_IO_U_PLAT_NR; j++)
- p.ts.io_u_plat[i][j] = cpu_to_le32(ts->io_u_plat[i][j]);
+ for (i = 0; i < FIO_LAT_CNT; i++)
+ for (j = 0; j < DDIR_RWDIR_CNT; j++)
+ for (k = 0; k < FIO_IO_U_PLAT_NR; k++)
+ p.ts.io_u_plat[i][j][k] = cpu_to_le64(ts->io_u_plat[i][j][k]);
- for (i = 0; i < DDIR_RWDIR_CNT; i++) {
+ for (j = 0; j < FIO_IO_U_PLAT_NR; j++)
+ p.ts.io_u_sync_plat[j] = cpu_to_le64(ts->io_u_sync_plat[j]);
+
+ for (i = 0; i < DDIR_RWDIR_SYNC_CNT; i++)
p.ts.total_io_u[i] = cpu_to_le64(ts->total_io_u[i]);
+
+ for (i = 0; i < DDIR_RWDIR_CNT; i++) {
p.ts.short_io_u[i] = cpu_to_le64(ts->short_io_u[i]);
p.ts.drop_io_u[i] = cpu_to_le64(ts->drop_io_u[i]);
}
p.ts.total_submit = cpu_to_le64(ts->total_submit);
p.ts.total_complete = cpu_to_le64(ts->total_complete);
+ p.ts.nr_zone_resets = cpu_to_le64(ts->nr_zone_resets);
for (i = 0; i < DDIR_RWDIR_CNT; i++) {
p.ts.io_bytes[i] = cpu_to_le64(ts->io_bytes[i]);
p.ts.ss_deviation.u.i = cpu_to_le64(fio_double_to_uint64(ts->ss_deviation.u.f));
p.ts.ss_criterion.u.i = cpu_to_le64(fio_double_to_uint64(ts->ss_criterion.u.f));
+ p.ts.cachehit = cpu_to_le64(ts->cachehit);
+ p.ts.cachemiss = cpu_to_le64(ts->cachemiss);
+
convert_gs(&p.rs, rs);
+ for (i = 0; i < DDIR_RWDIR_CNT; i++) {
+ if (ts->nr_clat_prio[i])
+ clat_prio_stats_extra_size += ts->nr_clat_prio[i] * sizeof(*ts->clat_prio[i]);
+ }
+ extended_buf_size += clat_prio_stats_extra_size;
+
dprint(FD_NET, "ts->ss_state = %d\n", ts->ss_state);
- if (ts->ss_state & FIO_SS_DATA) {
- dprint(FD_NET, "server sending steadystate ring buffers\n");
+ if (ts->ss_state & FIO_SS_DATA)
+ ss_extra_size = 2 * ts->ss_dur * sizeof(uint64_t);
+
+ extended_buf_size += ss_extra_size;
+ if (!extended_buf_size) {
+ fio_net_queue_cmd(FIO_NET_CMD_TS, &p, sizeof(p), NULL, SK_F_COPY);
+ return;
+ }
- ss_buf = malloc(sizeof(p) + 2*ts->ss_dur*sizeof(uint64_t));
+ extended_buf_size += sizeof(p);
+ extended_buf = calloc(1, extended_buf_size);
+ if (!extended_buf) {
+ log_err("fio: failed to allocate FIO_NET_CMD_TS buffer\n");
+ return;
+ }
- memcpy(ss_buf, &p, sizeof(p));
+ memcpy(extended_buf, &p, sizeof(p));
+ extended_buf_wp = (struct cmd_ts_pdu *)extended_buf + 1;
- ss_iops = (uint64_t *) ((struct cmd_ts_pdu *)ss_buf + 1);
- ss_bw = ss_iops + (int) ts->ss_dur;
- for (i = 0; i < ts->ss_dur; i++) {
- ss_iops[i] = cpu_to_le64(ts->ss_iops_data[i]);
- ss_bw[i] = cpu_to_le64(ts->ss_bw_data[i]);
+ if (clat_prio_stats_extra_size) {
+ for (i = 0; i < DDIR_RWDIR_CNT; i++) {
+ struct clat_prio_stat *prio = (struct clat_prio_stat *) extended_buf_wp;
+
+ for (j = 0; j < ts->nr_clat_prio[i]; j++) {
+ for (k = 0; k < FIO_IO_U_PLAT_NR; k++)
+ prio->io_u_plat[k] =
+ cpu_to_le64(ts->clat_prio[i][j].io_u_plat[k]);
+ convert_io_stat(&prio->clat_stat,
+ &ts->clat_prio[i][j].clat_stat);
+ prio->ioprio = cpu_to_le32(ts->clat_prio[i][j].ioprio);
+ prio++;
+ }
+
+ if (ts->nr_clat_prio[i]) {
+ uint64_t offset = (char *)extended_buf_wp - (char *)extended_buf;
+ struct cmd_ts_pdu *ptr = extended_buf;
+
+ ptr->ts.clat_prio_offset[i] = cpu_to_le64(offset);
+ ptr->ts.nr_clat_prio[i] = cpu_to_le32(ts->nr_clat_prio[i]);
+ }
+
+ extended_buf_wp = prio;
}
+ }
+
+ if (ss_extra_size) {
+ uint64_t *ss_iops, *ss_bw;
+ uint64_t offset;
+ struct cmd_ts_pdu *ptr = extended_buf;
- fio_net_queue_cmd(FIO_NET_CMD_TS, ss_buf, sizeof(p) + 2*ts->ss_dur*sizeof(uint64_t), NULL, SK_F_COPY);
+ dprint(FD_NET, "server sending steadystate ring buffers\n");
+
+ /* ss iops */
+ ss_iops = (uint64_t *) extended_buf_wp;
+ for (i = 0; i < ts->ss_dur; i++)
+ ss_iops[i] = cpu_to_le64(ts->ss_iops_data[i]);
+
+ offset = (char *)extended_buf_wp - (char *)extended_buf;
+ ptr->ts.ss_iops_data_offset = cpu_to_le64(offset);
+ extended_buf_wp = ss_iops + (int) ts->ss_dur;
- free(ss_buf);
+ /* ss bw */
+ ss_bw = extended_buf_wp;
+ for (i = 0; i < ts->ss_dur; i++)
+ ss_bw[i] = cpu_to_le64(ts->ss_bw_data[i]);
+
+ offset = (char *)extended_buf_wp - (char *)extended_buf;
+ ptr->ts.ss_bw_data_offset = cpu_to_le64(offset);
+ extended_buf_wp = ss_bw + (int) ts->ss_dur;
}
- else
- fio_net_queue_cmd(FIO_NET_CMD_TS, &p, sizeof(p), NULL, SK_F_COPY);
+
+ fio_net_queue_cmd(FIO_NET_CMD_TS, extended_buf, extended_buf_size, NULL, SK_F_COPY);
+ free(extended_buf);
}
void fio_server_send_gs(struct group_run_stats *rs)
}
void fio_server_send_job_options(struct flist_head *opt_list,
- unsigned int groupid)
+ unsigned int gid)
{
struct cmd_job_option pdu;
struct flist_head *entry;
p = flist_entry(entry, struct print_option, list);
memset(&pdu, 0, sizeof(pdu));
- if (groupid == -1U) {
+ if (gid == -1U) {
pdu.global = __cpu_to_le16(1);
pdu.groupid = 0;
} else {
pdu.global = 0;
- pdu.groupid = cpu_to_le32(groupid);
+ pdu.groupid = cpu_to_le32(gid);
}
len = strlen(p->name);
if (len >= sizeof(pdu.name)) {
{
int i;
- dst->name[FIO_DU_NAME_SZ - 1] = '\0';
- strncpy((char *) dst->name, (char *) src->name, FIO_DU_NAME_SZ - 1);
+ snprintf((char *) dst->name, sizeof(dst->name), "%s", src->name);
for (i = 0; i < 2; i++) {
dst->s.ios[i] = cpu_to_le64(src->s.ios[i]);
*last_entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, out_pdu, this_len,
NULL, SK_F_VEC | SK_F_INLINE | SK_F_FREE);
- flist_add_tail(&(*last_entry)->list, &first->next);
-
+ if (*last_entry)
+ flist_add_tail(&(*last_entry)->list, &first->next);
}
/*
stream->next_in = next_in;
stream->avail_in = next_sz;
do {
- if (! stream->avail_out) {
-
+ if (!stream->avail_out) {
__fio_net_prep_tail(stream, *out_pdu, last_entry, first);
+ if (*last_entry == NULL)
+ return 1;
*out_pdu = malloc(FIO_SERVER_MAX_FRAGMENT_PDU);
for (i = 0; i < cur_log->nr_samples; i++) {
struct io_sample *s;
struct io_u_plat_entry *cur_plat_entry, *prev_plat_entry;
- unsigned int *cur_plat, *prev_plat;
+ uint64_t *cur_plat, *prev_plat;
s = get_sample(log, cur_log, i);
ret = __deflate_pdu_buffer(s, sample_sz, &out_pdu, &entry, stream, first);
}
__fio_net_prep_tail(stream, out_pdu, &entry, first);
-
- return 0;
+ return entry == NULL;
}
static int __fio_append_iolog_gz(struct sk_entry *first, struct io_log *log,
entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, out_pdu, this_len,
NULL, SK_F_VEC | SK_F_INLINE | SK_F_FREE);
+ if (!entry) {
+ free(out_pdu);
+ return 1;
+ }
flist_add_tail(&entry->list, &first->next);
} while (stream->avail_in);
static int fio_append_iolog_gz(struct sk_entry *first, struct io_log *log)
{
+ z_stream stream = {
+ .zalloc = Z_NULL,
+ .zfree = Z_NULL,
+ .opaque = Z_NULL,
+ };
int ret = 0;
- z_stream stream;
-
- memset(&stream, 0, sizeof(stream));
- stream.zalloc = Z_NULL;
- stream.zfree = Z_NULL;
- stream.opaque = Z_NULL;
if (deflateInit(&stream, Z_DEFAULT_COMPRESSION) != Z_OK)
return 1;
entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, out_pdu, this_len,
NULL, SK_F_VEC | SK_F_INLINE | SK_F_FREE);
+ if (!entry) {
+ free(out_pdu);
+ break;
+ }
flist_add_tail(&entry->list, &first->next);
- } while (ret != Z_STREAM_END);
+ }
ret = deflateEnd(&stream);
if (ret == Z_OK)
{
struct sk_entry *entry;
struct flist_head *node;
+ int ret = 0;
pthread_mutex_lock(&log->chunk_lock);
flist_for_each(node, &log->chunk_list) {
c = flist_entry(node, struct iolog_compress, list);
entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, c->buf, c->len,
NULL, SK_F_VEC | SK_F_INLINE);
+ if (!entry) {
+ ret = 1;
+ break;
+ }
flist_add_tail(&entry->list, &first->next);
}
pthread_mutex_unlock(&log->chunk_lock);
-
- return 0;
+ return ret;
}
static int fio_append_text_log(struct sk_entry *first, struct io_log *log)
{
struct sk_entry *entry;
+ int ret = 0;
while (!flist_empty(&log->io_logs)) {
struct io_logs *cur_log;
entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, cur_log->log, size,
NULL, SK_F_VEC | SK_F_INLINE);
+ if (!entry) {
+ ret = 1;
+ break;
+ }
flist_add_tail(&entry->list, &first->next);
}
- return 0;
+ return ret;
}
int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
else
pdu.compressed = 0;
- strncpy((char *) pdu.name, name, FIO_NET_NAME_MAX);
- pdu.name[FIO_NET_NAME_MAX - 1] = '\0';
+ snprintf((char *) pdu.name, sizeof(pdu.name), "%s", name);
/*
* We can't do this for a pre-compressed log, but for that case,
struct io_sample *s = get_sample(log, cur_log, i);
s->time = cpu_to_le64(s->time);
- s->data.val = cpu_to_le64(s->data.val);
- s->__ddir = cpu_to_le32(s->__ddir);
- s->bs = cpu_to_le32(s->bs);
+ if (log->log_type != IO_LOG_TYPE_HIST)
+ s->data.val = cpu_to_le64(s->data.val);
+ s->__ddir = __cpu_to_le32(s->__ddir);
+ s->bs = cpu_to_le64(s->bs);
if (log->log_offset) {
struct io_sample_offset *so = (void *) s;
* Assemble header entry first
*/
first = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, &pdu, sizeof(pdu), NULL, SK_F_VEC | SK_F_INLINE | SK_F_COPY);
+ if (!first)
+ return 1;
/*
* Now append actual log entries. If log compression was enabled on
void fio_server_send_add_job(struct thread_data *td)
{
- struct cmd_add_job_pdu pdu;
+ struct cmd_add_job_pdu pdu = {
+ .thread_number = cpu_to_le32(td->thread_number),
+ .groupid = cpu_to_le32(td->groupid),
+ };
- memset(&pdu, 0, sizeof(pdu));
- pdu.thread_number = cpu_to_le32(td->thread_number);
- pdu.groupid = cpu_to_le32(td->groupid);
convert_thread_options_to_net(&pdu.top, &td->o);
fio_net_queue_cmd(FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), NULL,
if (!rep)
return ENOMEM;
- __fio_mutex_init(&rep->lock, FIO_MUTEX_LOCKED);
+ __fio_sem_init(&rep->lock, FIO_SEM_LOCKED);
rep->data = NULL;
rep->error = 0;
/*
* Wait for the backend to receive the reply
*/
- if (fio_mutex_down_timeout(&rep->lock, 10000)) {
+ if (fio_sem_down_timeout(&rep->lock, 10000)) {
log_err("fio: timed out waiting for reply\n");
ret = ETIMEDOUT;
goto fail;
*datap = data;
sfree(rep->data);
- __fio_mutex_remove(&rep->lock);
+ __fio_sem_remove(&rep->lock);
sfree(rep);
return ret;
}
/*
* Not fatal if fails, so just ignore it if that happens
*/
- setsockopt(sk, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));
+ if (setsockopt(sk, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt))) {
+ }
#endif
if (use_ipv6) {
- const void *src = &saddr_in6.sin6_addr;
+ void *src = &saddr_in6.sin6_addr;
addr = (struct sockaddr *) &saddr_in6;
socklen = sizeof(saddr_in6);
saddr_in6.sin6_family = AF_INET6;
str = inet_ntop(AF_INET6, src, buf, sizeof(buf));
} else {
- const void *src = &saddr_in.sin_addr;
+ void *src = &saddr_in.sin_addr;
addr = (struct sockaddr *) &saddr_in;
socklen = sizeof(saddr_in);
mode = umask(000);
- memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
- strncpy(addr.sun_path, bind_sock, sizeof(addr.sun_path) - 1);
+ snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", bind_sock);
len = sizeof(addr.sun_family) + strlen(bind_sock) + 1;
if (!bind_sock) {
char *p, port[16];
- const void *src;
+ void *src;
int af;
if (use_ipv6) {
if (p)
strcat(p, port);
else
- strncpy(bind_str, port, sizeof(bind_str) - 1);
+ snprintf(bind_str, sizeof(bind_str), "%s", port);
} else
- strncpy(bind_str, bind_sock, sizeof(bind_str) - 1);
+ snprintf(bind_str, sizeof(bind_str), "%s", bind_sock);
log_info("fio: server listening on %s\n", bind_str);
ret = inet_pton(AF_INET, host, inp);
if (ret != 1) {
- struct addrinfo hints, *res;
-
- memset(&hints, 0, sizeof(hints));
- hints.ai_family = ipv6 ? AF_INET6 : AF_INET;
- hints.ai_socktype = SOCK_STREAM;
+ struct addrinfo *res, hints = {
+ .ai_family = ipv6 ? AF_INET6 : AF_INET,
+ .ai_socktype = SOCK_STREAM,
+ };
ret = getaddrinfo(host, NULL, &hints, &res);
if (ret) {
static void set_sig_handlers(void)
{
- struct sigaction act;
+ struct sigaction act = {
+ .sa_handler = sig_int,
+ .sa_flags = SA_RESTART,
+ };
- memset(&act, 0, sizeof(act));
- act.sa_handler = sig_int;
- act.sa_flags = SA_RESTART;
sigaction(SIGINT, &act, NULL);
+
+ /* Windows uses SIGBREAK as a quit signal from other applications */
+#ifdef WIN32
+ sigaction(SIGBREAK, &act, NULL);
+#endif
}
void fio_server_destroy_sk_key(void)
if (fio_handle_server_arg())
return -1;
+ set_sig_handlers();
+
+#ifdef WIN32
+ /* if this is a child process, go handle the connection */
+ if (fio_server_pipe_name != NULL) {
+ ret = handle_connection_process();
+ return ret;
+ }
+
+ /* job to link child processes so they terminate together */
+ hjob = windows_create_job();
+ if (hjob == INVALID_HANDLE_VALUE)
+ return -1;
+#endif
+
sk = fio_init_server_connection();
if (sk < 0)
return -1;
- set_sig_handlers();
-
ret = accept_loop(sk);
close(sk);
sk_out->sk = -1;
else {
log_info("\nfio: terminating on signal %d\n", signal);
- exit_backend = 1;
+ exit_backend = true;
}
}
*/
int fio_start_server(char *pidfile)
{
+ FILE *file;
pid_t pid;
int ret;
setsid();
openlog("fio", LOG_NDELAY|LOG_NOWAIT|LOG_PID, LOG_USER);
- log_syslog = 1;
- close(STDIN_FILENO);
- close(STDOUT_FILENO);
- close(STDERR_FILENO);
+ log_syslog = true;
+
+ file = freopen("/dev/null", "r", stdin);
+ if (!file)
+ perror("freopen");
+
+ file = freopen("/dev/null", "w", stdout);
+ if (!file)
+ perror("freopen");
+
+ file = freopen("/dev/null", "w", stderr);
+ if (!file)
+ perror("freopen");
+
f_out = NULL;
f_err = NULL;
ret = fio_server();
+ fclose(stdin);
+ fclose(stdout);
+ fclose(stderr);
+
closelog();
unlink(pidfile);
free(pidfile);
{
fio_server_arg = strdup(arg);
}
+
+#ifdef WIN32
+void fio_server_internal_set(const char *arg)
+{
+ fio_server_pipe_name = strdup(arg);
+}
+#endif