X-Git-Url: https://git.kernel.dk/?p=fio.git;a=blobdiff_plain;f=server.c;h=248a2d44899c5c6191f83248209057e1980f01e2;hp=859a401b2e528da99c41674c6145514883e62953;hb=refs%2Fheads%2Fmaster;hpb=864314464e2772a9885da34ea041f130073affe9 diff --git a/server.c b/server.c index 859a401b..afaeb348 100644 --- a/server.c +++ b/server.c @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -63,12 +64,28 @@ static char me[128]; static pthread_key_t sk_out_key; +#ifdef WIN32 +static char *fio_server_pipe_name = NULL; +static HANDLE hjob = INVALID_HANDLE_VALUE; +struct ffi_element { + union { + pthread_t thread; + HANDLE hProcess; + }; + bool is_thread; +}; +#endif + struct fio_fork_item { struct flist_head list; int exitval; int signal; int exited; +#ifdef WIN32 + struct ffi_element element; +#else pid_t pid; +#endif }; struct cmd_reply { @@ -250,6 +267,28 @@ static int fio_send_data(int sk, const void *p, unsigned int len) return fio_sendv_data(sk, &iov, 1); } +bool fio_server_poll_fd(int fd, short events, int timeout) +{ + struct pollfd pfd = { + .fd = fd, + .events = events, + }; + int ret; + + ret = poll(&pfd, 1, timeout); + if (ret < 0) { + if (errno == EINTR) + return false; + log_err("fio: poll: %s\n", strerror(errno)); + return false; + } else if (!ret) { + return false; + } + if (pfd.revents & events) + return true; + return false; +} + static int fio_recv_data(int sk, void *buf, unsigned int len, bool wait) { int flags; @@ -651,6 +690,63 @@ static int fio_net_queue_stop(int error, int signal) return fio_net_send_ack(NULL, error, signal); } +#ifdef WIN32 +static void fio_server_add_fork_item(struct ffi_element *element, struct flist_head *list) +{ + struct fio_fork_item *ffi; + + ffi = malloc(sizeof(*ffi)); + ffi->exitval = 0; + ffi->signal = 0; + ffi->exited = 0; + ffi->element = *element; + flist_add_tail(&ffi->list, list); +} + +static void fio_server_add_conn_pid(struct flist_head *conn_list, HANDLE hProcess) +{ + struct ffi_element element = {.hProcess = hProcess, .is_thread=FALSE}; + dprint(FD_NET, "server: forked off connection job (tid=%u)\n", (int) element.thread); + + fio_server_add_fork_item(&element, conn_list); +} + +static void fio_server_add_job_pid(struct flist_head *job_list, pthread_t thread) +{ + struct ffi_element element = {.thread = thread, .is_thread=TRUE}; + dprint(FD_NET, "server: forked off job job (tid=%u)\n", (int) element.thread); + fio_server_add_fork_item(&element, job_list); +} + +static void fio_server_check_fork_item(struct fio_fork_item *ffi) +{ + int ret; + + if (ffi->element.is_thread) { + + ret = pthread_kill(ffi->element.thread, 0); + if (ret) { + int rev_val; + pthread_join(ffi->element.thread, (void**) &rev_val); /*if the thread is dead, then join it to get status*/ + + ffi->exitval = rev_val; + if (ffi->exitval) + log_err("thread (tid=%u) exited with %x\n", (int) ffi->element.thread, (int) ffi->exitval); + dprint(FD_PROCESS, "thread (tid=%u) exited with %x\n", (int) ffi->element.thread, (int) ffi->exitval); + ffi->exited = 1; + } + } else { + DWORD exit_val; + GetExitCodeProcess(ffi->element.hProcess, &exit_val); + + if (exit_val != STILL_ACTIVE) { + dprint(FD_PROCESS, "process %u exited with %d\n", GetProcessId(ffi->element.hProcess), exit_val); + ffi->exited = 1; + ffi->exitval = exit_val; + } + } +} +#else static void fio_server_add_fork_item(pid_t pid, struct flist_head *list) { struct fio_fork_item *ffi; @@ -698,10 +794,21 @@ static void fio_server_check_fork_item(struct fio_fork_item *ffi) } } } +#endif static void fio_server_fork_item_done(struct fio_fork_item *ffi, bool stop) { +#ifdef WIN32 + if (ffi->element.is_thread) + dprint(FD_NET, "tid %u exited, sig=%u, exitval=%d\n", (int) ffi->element.thread, ffi->signal, ffi->exitval); + else { + dprint(FD_NET, "pid %u exited, sig=%u, exitval=%d\n", (int) GetProcessId(ffi->element.hProcess), ffi->signal, ffi->exitval); + CloseHandle(ffi->element.hProcess); + ffi->element.hProcess = INVALID_HANDLE_VALUE; + } +#else dprint(FD_NET, "pid %u exited, sig=%u, exitval=%d\n", (int) ffi->pid, ffi->signal, ffi->exitval); +#endif /* * Fold STOP and QUIT... @@ -762,27 +869,62 @@ static int handle_load_file_cmd(struct fio_net_cmd *cmd) return 0; } -static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list, - struct fio_net_cmd *cmd) +#ifdef WIN32 +static void *fio_backend_thread(void *data) { - pid_t pid; int ret; + struct sk_out *sk_out = (struct sk_out *) data; sk_out_assign(sk_out); + ret = fio_backend(sk_out); + sk_out_drop(); + + pthread_exit((void*) (intptr_t) ret); + return NULL; +} +#endif + +static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list, + struct fio_net_cmd *cmd) +{ + int ret; + fio_time_init(); set_genesis_time(); - pid = fork(); - if (pid) { - fio_server_add_job_pid(job_list, pid); - return 0; +#ifdef WIN32 + { + pthread_t thread; + /* both this thread and backend_thread call sk_out_assign() to double increment + * the ref count. This ensures struct is valid until both threads are done with it + */ + sk_out_assign(sk_out); + ret = pthread_create(&thread, NULL, fio_backend_thread, sk_out); + if (ret) { + log_err("pthread_create: %s\n", strerror(ret)); + return ret; + } + + fio_server_add_job_pid(job_list, thread); + return ret; } +#else + { + pid_t pid; + sk_out_assign(sk_out); + pid = fork(); + if (pid) { + fio_server_add_job_pid(job_list, pid); + return 0; + } - ret = fio_backend(sk_out); - free_threads_shm(); - sk_out_drop(); - _exit(ret); + ret = fio_backend(sk_out); + free_threads_shm(); + sk_out_drop(); + _exit(ret); + } +#endif } static int handle_job_cmd(struct fio_net_cmd *cmd) @@ -858,7 +1000,7 @@ static int handle_probe_cmd(struct fio_net_cmd *cmd) .os = FIO_OS, .arch = FIO_ARCH, .bpp = sizeof(void *), - .cpus = __cpu_to_le32(cpus_online()), + .cpus = __cpu_to_le32(cpus_configured()), }; dprint(FD_NET, "server: sending probe reply\n"); @@ -941,6 +1083,7 @@ static int handle_update_job_cmd(struct fio_net_cmd *cmd) struct cmd_add_job_pdu *pdu = (struct cmd_add_job_pdu *) cmd->payload; struct thread_data *td; uint32_t tnumber; + int ret; tnumber = le32_to_cpu(pdu->thread_number); @@ -952,8 +1095,9 @@ static int handle_update_job_cmd(struct fio_net_cmd *cmd) } td = tnumber_to_td(tnumber); - convert_thread_options_to_cpu(&td->o, &pdu->top); - send_update_job_reply(cmd->tag, 0); + ret = convert_thread_options_to_cpu(&td->o, &pdu->top, + cmd->pdu_len - offsetof(struct cmd_add_job_pdu, top)); + send_update_job_reply(cmd->tag, ret); return 0; } @@ -1182,7 +1326,7 @@ static int handle_xmits(struct sk_out *sk_out) sk_unlock(sk_out); while (!flist_empty(&list)) { - entry = flist_entry(list.next, struct sk_entry, list); + entry = flist_first_entry(&list, struct sk_entry, list); flist_del(&entry->list); ret += handle_sk_entry(sk_out, entry); } @@ -1238,7 +1382,8 @@ static int handle_connection(struct sk_out *sk_out) if (ret < 0) break; - cmd = fio_net_recv_cmd(sk_out->sk, true); + if (pfd.revents & POLLIN) + cmd = fio_net_recv_cmd(sk_out->sk, true); if (!cmd) { ret = -1; break; @@ -1300,6 +1445,73 @@ static int get_my_addr_str(int sk) return 0; } +#ifdef WIN32 +static int handle_connection_process(void) +{ + WSAPROTOCOL_INFO protocol_info; + DWORD bytes_read; + HANDLE hpipe; + int sk; + struct sk_out *sk_out; + int ret; + char *msg = (char *) "connected"; + + log_info("server enter accept loop. ProcessID %d\n", GetCurrentProcessId()); + + hpipe = CreateFile( + fio_server_pipe_name, + GENERIC_READ | GENERIC_WRITE, + 0, NULL, + OPEN_EXISTING, + 0, NULL); + + if (hpipe == INVALID_HANDLE_VALUE) { + log_err("couldnt open pipe %s error %lu\n", + fio_server_pipe_name, GetLastError()); + return -1; + } + + if (!ReadFile(hpipe, &protocol_info, sizeof(protocol_info), &bytes_read, NULL)) { + log_err("couldnt read pi from pipe %s error %lu\n", fio_server_pipe_name, + GetLastError()); + } + + if (use_ipv6) /* use protocol_info to create a duplicate of parents socket */ + sk = WSASocket(AF_INET6, SOCK_STREAM, 0, &protocol_info, 0, 0); + else + sk = WSASocket(AF_INET, SOCK_STREAM, 0, &protocol_info, 0, 0); + + sk_out = scalloc(1, sizeof(*sk_out)); + if (!sk_out) { + CloseHandle(hpipe); + close(sk); + return -1; + } + + sk_out->sk = sk; + sk_out->hProcess = INVALID_HANDLE_VALUE; + INIT_FLIST_HEAD(&sk_out->list); + __fio_sem_init(&sk_out->lock, FIO_SEM_UNLOCKED); + __fio_sem_init(&sk_out->wait, FIO_SEM_LOCKED); + __fio_sem_init(&sk_out->xmit, FIO_SEM_UNLOCKED); + + get_my_addr_str(sk); + + if (!WriteFile(hpipe, msg, strlen(msg), NULL, NULL)) { + log_err("couldnt write pipe\n"); + close(sk); + return -1; + } + CloseHandle(hpipe); + + sk_out_assign(sk_out); + + ret = handle_connection(sk_out); + __sk_out_drop(sk_out); + return ret; +} +#endif + static int accept_loop(int listen_sk) { struct sockaddr_in addr; @@ -1317,8 +1529,11 @@ static int accept_loop(int listen_sk) struct sk_out *sk_out; const char *from; char buf[64]; +#ifdef WIN32 + HANDLE hProcess; +#else pid_t pid; - +#endif pfd.fd = listen_sk; pfd.events = POLLIN; do { @@ -1376,6 +1591,13 @@ static int accept_loop(int listen_sk) __fio_sem_init(&sk_out->wait, FIO_SEM_LOCKED); __fio_sem_init(&sk_out->xmit, FIO_SEM_UNLOCKED); +#ifdef WIN32 + hProcess = windows_handle_connection(hjob, sk); + if (hProcess == INVALID_HANDLE_VALUE) + return -1; + sk_out->hProcess = hProcess; + fio_server_add_conn_pid(&conn_list, hProcess); +#else pid = fork(); if (pid) { close(sk); @@ -1392,6 +1614,7 @@ static int accept_loop(int listen_sk) */ sk_out_assign(sk_out); handle_connection(sk_out); +#endif } return exitval; @@ -1465,8 +1688,11 @@ void fio_server_send_ts(struct thread_stat *ts, struct group_run_stats *rs) { struct cmd_ts_pdu p; int i, j, k; - void *ss_buf; - uint64_t *ss_iops, *ss_bw; + size_t clat_prio_stats_extra_size = 0; + size_t ss_extra_size = 0; + size_t extended_buf_size = 0; + void *extended_buf; + void *extended_buf_wp; dprint(FD_NET, "server sending end stats\n"); @@ -1480,9 +1706,12 @@ void fio_server_send_ts(struct thread_stat *ts, struct group_run_stats *rs) p.ts.error = cpu_to_le32(ts->error); p.ts.thread_number = cpu_to_le32(ts->thread_number); p.ts.groupid = cpu_to_le32(ts->groupid); + p.ts.job_start = cpu_to_le64(ts->job_start); p.ts.pid = cpu_to_le32(ts->pid); p.ts.members = cpu_to_le32(ts->members); p.ts.unified_rw_rep = cpu_to_le32(ts->unified_rw_rep); + p.ts.ioprio = cpu_to_le32(ts->ioprio); + p.ts.disable_prio_stat = cpu_to_le32(ts->disable_prio_stat); for (i = 0; i < DDIR_RWDIR_CNT; i++) { convert_io_stat(&p.ts.clat_stat[i], &ts->clat_stat[i]); @@ -1577,38 +1806,87 @@ void fio_server_send_ts(struct thread_stat *ts, struct group_run_stats *rs) p.ts.cachehit = cpu_to_le64(ts->cachehit); p.ts.cachemiss = cpu_to_le64(ts->cachemiss); + convert_gs(&p.rs, rs); + for (i = 0; i < DDIR_RWDIR_CNT; i++) { - for (j = 0; j < FIO_IO_U_PLAT_NR; j++) { - p.ts.io_u_plat_high_prio[i][j] = cpu_to_le64(ts->io_u_plat_high_prio[i][j]); - p.ts.io_u_plat_low_prio[i][j] = cpu_to_le64(ts->io_u_plat_low_prio[i][j]); + if (ts->nr_clat_prio[i]) + clat_prio_stats_extra_size += ts->nr_clat_prio[i] * sizeof(*ts->clat_prio[i]); + } + extended_buf_size += clat_prio_stats_extra_size; + + dprint(FD_NET, "ts->ss_state = %d\n", ts->ss_state); + if (ts->ss_state & FIO_SS_DATA) + ss_extra_size = 2 * ts->ss_dur * sizeof(uint64_t); + + extended_buf_size += ss_extra_size; + if (!extended_buf_size) { + fio_net_queue_cmd(FIO_NET_CMD_TS, &p, sizeof(p), NULL, SK_F_COPY); + return; + } + + extended_buf_size += sizeof(p); + extended_buf = calloc(1, extended_buf_size); + if (!extended_buf) { + log_err("fio: failed to allocate FIO_NET_CMD_TS buffer\n"); + return; + } + + memcpy(extended_buf, &p, sizeof(p)); + extended_buf_wp = (struct cmd_ts_pdu *)extended_buf + 1; + + if (clat_prio_stats_extra_size) { + for (i = 0; i < DDIR_RWDIR_CNT; i++) { + struct clat_prio_stat *prio = (struct clat_prio_stat *) extended_buf_wp; + + for (j = 0; j < ts->nr_clat_prio[i]; j++) { + for (k = 0; k < FIO_IO_U_PLAT_NR; k++) + prio->io_u_plat[k] = + cpu_to_le64(ts->clat_prio[i][j].io_u_plat[k]); + convert_io_stat(&prio->clat_stat, + &ts->clat_prio[i][j].clat_stat); + prio->ioprio = cpu_to_le32(ts->clat_prio[i][j].ioprio); + prio++; + } + + if (ts->nr_clat_prio[i]) { + uint64_t offset = (char *)extended_buf_wp - (char *)extended_buf; + struct cmd_ts_pdu *ptr = extended_buf; + + ptr->ts.clat_prio_offset[i] = cpu_to_le64(offset); + ptr->ts.nr_clat_prio[i] = cpu_to_le32(ts->nr_clat_prio[i]); + } + + extended_buf_wp = prio; } - convert_io_stat(&p.ts.clat_high_prio_stat[i], &ts->clat_high_prio_stat[i]); - convert_io_stat(&p.ts.clat_low_prio_stat[i], &ts->clat_low_prio_stat[i]); } - convert_gs(&p.rs, rs); + if (ss_extra_size) { + uint64_t *ss_iops, *ss_bw; + uint64_t offset; + struct cmd_ts_pdu *ptr = extended_buf; - dprint(FD_NET, "ts->ss_state = %d\n", ts->ss_state); - if (ts->ss_state & FIO_SS_DATA) { dprint(FD_NET, "server sending steadystate ring buffers\n"); - ss_buf = malloc(sizeof(p) + 2*ts->ss_dur*sizeof(uint64_t)); + /* ss iops */ + ss_iops = (uint64_t *) extended_buf_wp; + for (i = 0; i < ts->ss_dur; i++) + ss_iops[i] = cpu_to_le64(ts->ss_iops_data[i]); - memcpy(ss_buf, &p, sizeof(p)); + offset = (char *)extended_buf_wp - (char *)extended_buf; + ptr->ts.ss_iops_data_offset = cpu_to_le64(offset); + extended_buf_wp = ss_iops + (int) ts->ss_dur; - ss_iops = (uint64_t *) ((struct cmd_ts_pdu *)ss_buf + 1); - ss_bw = ss_iops + (int) ts->ss_dur; - for (i = 0; i < ts->ss_dur; i++) { - ss_iops[i] = cpu_to_le64(ts->ss_iops_data[i]); + /* ss bw */ + ss_bw = extended_buf_wp; + for (i = 0; i < ts->ss_dur; i++) ss_bw[i] = cpu_to_le64(ts->ss_bw_data[i]); - } - - fio_net_queue_cmd(FIO_NET_CMD_TS, ss_buf, sizeof(p) + 2*ts->ss_dur*sizeof(uint64_t), NULL, SK_F_COPY); - free(ss_buf); + offset = (char *)extended_buf_wp - (char *)extended_buf; + ptr->ts.ss_bw_data_offset = cpu_to_le64(offset); } - else - fio_net_queue_cmd(FIO_NET_CMD_TS, &p, sizeof(p), NULL, SK_F_COPY); + + fio_net_queue_cmd(FIO_NET_CMD_TS, extended_buf, extended_buf_size, NULL, SK_F_COPY); + free(extended_buf); } void fio_server_send_gs(struct group_run_stats *rs) @@ -1981,6 +2259,7 @@ int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name) .thread_number = cpu_to_le32(td->thread_number), .log_type = cpu_to_le32(log->log_type), .log_hist_coarseness = cpu_to_le32(log->hist_coarseness), + .per_job_logs = cpu_to_le32(td->o.per_job_logs), }; struct sk_entry *first; struct flist_head *entry; @@ -2009,7 +2288,10 @@ int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name) struct io_sample *s = get_sample(log, cur_log, i); s->time = cpu_to_le64(s->time); - s->data.val = cpu_to_le64(s->data.val); + if (log->log_type != IO_LOG_TYPE_HIST) { + s->data.val.val0 = cpu_to_le64(s->data.val.val0); + s->data.val.val1 = cpu_to_le64(s->data.val.val1); + } s->__ddir = __cpu_to_le32(s->__ddir); s->bs = cpu_to_le64(s->bs); @@ -2047,22 +2329,29 @@ int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name) void fio_server_send_add_job(struct thread_data *td) { - struct cmd_add_job_pdu pdu = { - .thread_number = cpu_to_le32(td->thread_number), - .groupid = cpu_to_le32(td->groupid), - }; + struct cmd_add_job_pdu *pdu; + size_t cmd_sz = offsetof(struct cmd_add_job_pdu, top) + + thread_options_pack_size(&td->o); - convert_thread_options_to_net(&pdu.top, &td->o); + pdu = malloc(cmd_sz); + pdu->thread_number = cpu_to_le32(td->thread_number); + pdu->groupid = cpu_to_le32(td->groupid); - fio_net_queue_cmd(FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), NULL, - SK_F_COPY); + convert_thread_options_to_net(&pdu->top, &td->o); + + fio_net_queue_cmd(FIO_NET_CMD_ADD_JOB, pdu, cmd_sz, NULL, SK_F_COPY); + free(pdu); } void fio_server_send_start(struct thread_data *td) { struct sk_out *sk_out = pthread_getspecific(sk_out_key); - assert(sk_out->sk != -1); + if (sk_out->sk == -1) { + log_err("pthread getting specific for key failed, sk_out %p, sk %i, err: %i:%s", + sk_out, sk_out->sk, errno, strerror(errno)); + abort(); + } fio_net_queue_cmd(FIO_NET_CMD_SERVER_START, NULL, 0, NULL, SK_F_SIMPLE); } @@ -2457,6 +2746,11 @@ static void set_sig_handlers(void) }; sigaction(SIGINT, &act, NULL); + + /* Windows uses SIGBREAK as a quit signal from other applications */ +#ifdef WIN32 + sigaction(SIGBREAK, &act, NULL); +#endif } void fio_server_destroy_sk_key(void) @@ -2484,12 +2778,25 @@ static int fio_server(void) if (fio_handle_server_arg()) return -1; + set_sig_handlers(); + +#ifdef WIN32 + /* if this is a child process, go handle the connection */ + if (fio_server_pipe_name != NULL) { + ret = handle_connection_process(); + return ret; + } + + /* job to link child processes so they terminate together */ + hjob = windows_create_job(); + if (hjob == INVALID_HANDLE_VALUE) + return -1; +#endif + sk = fio_init_server_connection(); if (sk < 0) return -1; - set_sig_handlers(); - ret = accept_loop(sk); close(sk); @@ -2630,3 +2937,10 @@ void fio_server_set_arg(const char *arg) { fio_server_arg = strdup(arg); } + +#ifdef WIN32 +void fio_server_internal_set(const char *arg) +{ + fio_server_pipe_name = strdup(arg); +} +#endif