From: james rizzo Date: Thu, 16 Dec 2021 22:08:50 +0000 (-0700) Subject: Add Windows support for --server. X-Git-Tag: fio-3.30~78^2~1 X-Git-Url: https://git.kernel.dk/?a=commitdiff_plain;h=f8fef4c68889f8a7ae7d556d9a747b60c71b1b43;p=fio.git Add Windows support for --server. This required working around two calls to fork() The first fork() was in accept_loop() where it creates a new process to work on handle_connection(). This change set uses Window's CreateProcess() in place of fork(). In order to support this, it duplicates the socket connection via WSADuplicateSocket() and passes the resulting WSAPROTOCOL_INFO to the process via a named pipe. The pipe name is then passed to the process via a new command line argument(--server-internal). The second fork() was in handle_run_cmd() where it creates a new process to work on fio_backend_thread(). This change set runs fio_backend_thread() as a thread via pthread_create() instead of a forked process. There is also some supporting work in the monitoring of spawned processes/threads in fio_server_check_conns() and fio_server_check_jobs(). Signed-off-by: james rizzo --- diff --git a/init.c b/init.c index 5f069d9a..8a606687 100644 --- a/init.c +++ b/init.c @@ -224,6 +224,13 @@ static struct option l_opts[FIO_NR_OPTIONS] = { .has_arg = optional_argument, .val = 'S', }, +#ifdef WIN32 + { + .name = (char *) "server-internal", + .has_arg = required_argument, + .val = 'N', + }, +#endif { .name = (char *) "daemonize", .has_arg = required_argument, .val = 'D', @@ -2789,6 +2796,12 @@ int parse_cmd_line(int argc, char *argv[], int client_type) exit_val = 1; #endif break; +#ifdef WIN32 + case 'N': + did_arg = true; + fio_server_internal_set(optarg); + break; +#endif case 'D': if (pid_file) free(pid_file); diff --git a/os/os-windows.h b/os/os-windows.h index 59da9dba..510b8143 100644 --- a/os/os-windows.h +++ b/os/os-windows.h @@ -110,6 +110,8 @@ int nanosleep(const struct timespec *rqtp, struct timespec *rmtp); ssize_t pread(int fildes, void *buf, size_t nbyte, off_t offset); ssize_t pwrite(int fildes, const void *buf, size_t nbyte, off_t offset); +HANDLE windows_handle_connection(HANDLE hjob, int sk); +HANDLE windows_create_job(void); static inline int blockdev_size(struct fio_file *f, unsigned long long *bytes) { diff --git a/os/windows/posix.c b/os/windows/posix.c index 09c2e4a7..0978bcf6 100644 --- a/os/windows/posix.c +++ b/os/windows/posix.c @@ -1026,3 +1026,174 @@ in_addr_t inet_network(const char *cp) hbo = ((nbo & 0xFF) << 24) + ((nbo & 0xFF00) << 8) + ((nbo & 0xFF0000) >> 8) + ((nbo & 0xFF000000) >> 24); return hbo; } + +static HANDLE create_named_pipe(char *pipe_name, int wait_connect_time) +{ + HANDLE hpipe; + + hpipe = CreateNamedPipe ( + pipe_name, + PIPE_ACCESS_DUPLEX, + PIPE_WAIT | PIPE_TYPE_BYTE, + 1, 0, 0, wait_connect_time, NULL); + + if (hpipe == INVALID_HANDLE_VALUE) { + log_err("ConnectNamedPipe failed (%lu).\n", GetLastError()); + return INVALID_HANDLE_VALUE; + } + + if (!ConnectNamedPipe(hpipe, NULL)) { + log_err("ConnectNamedPipe failed (%lu).\n", GetLastError()); + CloseHandle(hpipe); + return INVALID_HANDLE_VALUE; + } + + return hpipe; +} + +static BOOL windows_create_process(PROCESS_INFORMATION *pi, const char *args, HANDLE *hjob) +{ + LPSTR this_cmd_line = GetCommandLine(); + LPSTR new_process_cmd_line = malloc((strlen(this_cmd_line)+strlen(args)) * sizeof(char *)); + STARTUPINFO si = {0}; + DWORD flags = 0; + + strcpy(new_process_cmd_line, this_cmd_line); + strcat(new_process_cmd_line, args); + + si.cb = sizeof(si); + memset(pi, 0, sizeof(*pi)); + + if ((hjob != NULL) && (*hjob != INVALID_HANDLE_VALUE)) + flags = CREATE_SUSPENDED | CREATE_BREAKAWAY_FROM_JOB; + + flags |= CREATE_NEW_CONSOLE; + + if( !CreateProcess( NULL, + new_process_cmd_line, + NULL, /* Process handle not inherited */ + NULL, /* Thread handle not inherited */ + TRUE, /* no handle inheritance */ + flags, + NULL, /* Use parent's environment block */ + NULL, /* Use parent's starting directory */ + &si, + pi ) + ) + { + log_err("CreateProcess failed (%lu).\n", GetLastError() ); + free(new_process_cmd_line); + return 1; + } + if ((hjob != NULL) && (*hjob != INVALID_HANDLE_VALUE)) { + BOOL ret = AssignProcessToJobObject(*hjob, pi->hProcess); + if (!ret) { + log_err("AssignProcessToJobObject failed (%lu).\n", GetLastError() ); + return 1; + } + + ResumeThread(pi->hThread); + } + + free(new_process_cmd_line); + return 0; +} + +HANDLE windows_create_job(void) +{ + JOBOBJECT_EXTENDED_LIMIT_INFORMATION jeli = { 0 }; + BOOL success; + HANDLE hjob = CreateJobObject(NULL, NULL); + + jeli.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE; + success = SetInformationJobObject(hjob, JobObjectExtendedLimitInformation, &jeli, sizeof(jeli)); + if ( success == 0 ) { + log_err( "SetInformationJobObject failed: error %lu\n", GetLastError() ); + return INVALID_HANDLE_VALUE; + } + return hjob; +} + +/* wait for a child process to either exit or connect to a child */ +static bool monitor_process_till_connect(PROCESS_INFORMATION *pi, HANDLE *hpipe) +{ + bool connected = FALSE; + bool process_alive = TRUE; + char buffer[32] = {0}; + DWORD bytes_read; + + do { + DWORD exit_code; + GetExitCodeProcess(pi->hProcess, &exit_code); + if (exit_code != STILL_ACTIVE) { + dprint(FD_PROCESS, "process %u exited %d\n", GetProcessId(pi->hProcess), exit_code); + break; + } + + memset(buffer, 0, sizeof(buffer)); + ReadFile(*hpipe, &buffer, sizeof(buffer) - 1, &bytes_read, NULL); + if (bytes_read && strstr(buffer, "connected")) { + dprint(FD_PROCESS, "process %u connected to client\n", GetProcessId(pi->hProcess)); + connected = TRUE; + } + usleep(10*1000); + } while (process_alive && !connected); + return connected; +} + +/*create a process with --server-internal to emulate fork() */ +HANDLE windows_handle_connection(HANDLE hjob, int sk) +{ + char pipe_name[64] = "\\\\.\\pipe\\fiointernal-"; + char args[128] = " --server-internal="; + PROCESS_INFORMATION pi; + HANDLE hpipe = INVALID_HANDLE_VALUE; + WSAPROTOCOL_INFO protocol_info; + HANDLE ret; + + sprintf(pipe_name+strlen(pipe_name), "%d", GetCurrentProcessId()); + sprintf(args+strlen(args), "%s", pipe_name); + + if (windows_create_process(&pi, args, &hjob) != 0) + return INVALID_HANDLE_VALUE; + else + ret = pi.hProcess; + + /* duplicate socket and write the protocol_info to pipe so child can + * duplicate the communciation socket */ + if (WSADuplicateSocket(sk, GetProcessId(pi.hProcess), &protocol_info)) { + log_err("WSADuplicateSocket failed (%lu).\n", GetLastError()); + ret = INVALID_HANDLE_VALUE; + goto cleanup; + } + + /* make a pipe with a unique name based upon processid */ + hpipe = create_named_pipe(pipe_name, 1000); + if (hpipe == INVALID_HANDLE_VALUE) { + ret = INVALID_HANDLE_VALUE; + goto cleanup; + } + + if (!WriteFile(hpipe, &protocol_info, sizeof(protocol_info), NULL, NULL)) { + log_err("WriteFile failed (%lu).\n", GetLastError()); + ret = INVALID_HANDLE_VALUE; + goto cleanup; + } + + dprint(FD_PROCESS, "process %d created child process %u\n", GetCurrentProcessId(), GetProcessId(pi.hProcess)); + + /* monitor the process until it either exits or connects. This level + * doesnt care which of those occurs because the result is that it + * needs to loop around and create another child process to monitor */ + if (!monitor_process_till_connect(&pi, &hpipe)) + ret = INVALID_HANDLE_VALUE; + +cleanup: + /* close the handles and pipes because this thread is done monitoring them */ + if (ret == INVALID_HANDLE_VALUE) + CloseHandle(pi.hProcess); + CloseHandle(pi.hThread); + DisconnectNamedPipe(hpipe); + CloseHandle(hpipe); + return ret; +} \ No newline at end of file diff --git a/server.c b/server.c index 1f627e8f..331c8c9f 100644 --- a/server.c +++ b/server.c @@ -63,12 +63,28 @@ static char me[128]; static pthread_key_t sk_out_key; +#ifdef WIN32 +static char *fio_server_pipe_name = NULL; +static HANDLE hjob = INVALID_HANDLE_VALUE; +struct ffi_element { + union { + pthread_t thread; + HANDLE hProcess; + }; + bool is_thread; +}; +#endif + struct fio_fork_item { struct flist_head list; int exitval; int signal; int exited; +#ifdef WIN32 + struct ffi_element element; +#else pid_t pid; +#endif }; struct cmd_reply { @@ -672,6 +688,63 @@ static int fio_net_queue_stop(int error, int signal) return fio_net_send_ack(NULL, error, signal); } +#ifdef WIN32 +static void fio_server_add_fork_item(struct ffi_element *element, struct flist_head *list) +{ + struct fio_fork_item *ffi; + + ffi = malloc(sizeof(*ffi)); + ffi->exitval = 0; + ffi->signal = 0; + ffi->exited = 0; + ffi->element = *element; + flist_add_tail(&ffi->list, list); +} + +static void fio_server_add_conn_pid(struct flist_head *conn_list, HANDLE hProcess) +{ + struct ffi_element element = {.hProcess = hProcess, .is_thread=FALSE}; + dprint(FD_NET, "server: forked off connection job (tid=%u)\n", (int) element.thread); + + fio_server_add_fork_item(&element, conn_list); +} + +static void fio_server_add_job_pid(struct flist_head *job_list, pthread_t thread) +{ + struct ffi_element element = {.thread = thread, .is_thread=TRUE}; + dprint(FD_NET, "server: forked off job job (tid=%u)\n", (int) element.thread); + fio_server_add_fork_item(&element, job_list); +} + +static void fio_server_check_fork_item(struct fio_fork_item *ffi) +{ + int ret; + + if (ffi->element.is_thread) { + + ret = pthread_kill(ffi->element.thread, 0); + if (ret) { + int rev_val; + pthread_join(ffi->element.thread, (void**) &rev_val); /*if the thread is dead, then join it to get status*/ + + ffi->exitval = rev_val; + if (ffi->exitval) + log_err("thread (tid=%u) exited with %x\n", (int) ffi->element.thread, (int) ffi->exitval); + dprint(FD_PROCESS, "thread (tid=%u) exited with %x\n", (int) ffi->element.thread, (int) ffi->exitval); + ffi->exited = 1; + } + } else { + DWORD exit_val; + GetExitCodeProcess(ffi->element.hProcess, &exit_val); + + if (exit_val != STILL_ACTIVE) { + dprint(FD_PROCESS, "process %u exited with %d\n", GetProcessId(ffi->element.hProcess), exit_val); + ffi->exited = 1; + ffi->exitval = exit_val; + } + } +} +#else static void fio_server_add_fork_item(pid_t pid, struct flist_head *list) { struct fio_fork_item *ffi; @@ -719,10 +792,21 @@ static void fio_server_check_fork_item(struct fio_fork_item *ffi) } } } +#endif static void fio_server_fork_item_done(struct fio_fork_item *ffi, bool stop) { +#ifdef WIN32 + if (ffi->element.is_thread) + dprint(FD_NET, "tid %u exited, sig=%u, exitval=%d\n", (int) ffi->element.thread, ffi->signal, ffi->exitval); + else { + dprint(FD_NET, "pid %u exited, sig=%u, exitval=%d\n", (int) GetProcessId(ffi->element.hProcess), ffi->signal, ffi->exitval); + CloseHandle(ffi->element.hProcess); + ffi->element.hProcess = INVALID_HANDLE_VALUE; + } +#else dprint(FD_NET, "pid %u exited, sig=%u, exitval=%d\n", (int) ffi->pid, ffi->signal, ffi->exitval); +#endif /* * Fold STOP and QUIT... @@ -783,27 +867,62 @@ static int handle_load_file_cmd(struct fio_net_cmd *cmd) return 0; } -static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list, - struct fio_net_cmd *cmd) +#ifdef WIN32 +static void *fio_backend_thread(void *data) { - pid_t pid; int ret; + struct sk_out *sk_out = (struct sk_out *) data; sk_out_assign(sk_out); + ret = fio_backend(sk_out); + sk_out_drop(); + + pthread_exit((void*) (intptr_t) ret); + return NULL; +} +#endif + +static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list, + struct fio_net_cmd *cmd) +{ + int ret; + fio_time_init(); set_genesis_time(); - pid = fork(); - if (pid) { - fio_server_add_job_pid(job_list, pid); - return 0; +#ifdef WIN32 + { + pthread_t thread; + /* both this thread and backend_thread call sk_out_assign() to double increment + * the ref count. This ensures struct is valid until both threads are done with it + */ + sk_out_assign(sk_out); + ret = pthread_create(&thread, NULL, fio_backend_thread, sk_out); + if (ret) { + log_err("pthread_create: %s\n", strerror(ret)); + return ret; + } + + fio_server_add_job_pid(job_list, thread); + return ret; } +#else + { + pid_t pid; + sk_out_assign(sk_out); + pid = fork(); + if (pid) { + fio_server_add_job_pid(job_list, pid); + return 0; + } - ret = fio_backend(sk_out); - free_threads_shm(); - sk_out_drop(); - _exit(ret); + ret = fio_backend(sk_out); + free_threads_shm(); + sk_out_drop(); + _exit(ret); + } +#endif } static int handle_job_cmd(struct fio_net_cmd *cmd) @@ -1322,6 +1441,73 @@ static int get_my_addr_str(int sk) return 0; } +#ifdef WIN32 +static int handle_connection_process(void) +{ + WSAPROTOCOL_INFO protocol_info; + DWORD bytes_read; + HANDLE hpipe; + int sk; + struct sk_out *sk_out; + int ret; + char *msg = (char *) "connected"; + + log_info("server enter accept loop. ProcessID %d\n", GetCurrentProcessId()); + + hpipe = CreateFile( + fio_server_pipe_name, + GENERIC_READ | GENERIC_WRITE, + 0, NULL, + OPEN_EXISTING, + 0, NULL); + + if (hpipe == INVALID_HANDLE_VALUE) { + log_err("couldnt open pipe %s error %lu\n", + fio_server_pipe_name, GetLastError()); + return -1; + } + + if (!ReadFile(hpipe, &protocol_info, sizeof(protocol_info), &bytes_read, NULL)) { + log_err("couldnt read pi from pipe %s error %lu\n", fio_server_pipe_name, + GetLastError()); + } + + if (use_ipv6) /* use protocol_info to create a duplicate of parents socket */ + sk = WSASocket(AF_INET6, SOCK_STREAM, 0, &protocol_info, 0, 0); + else + sk = WSASocket(AF_INET, SOCK_STREAM, 0, &protocol_info, 0, 0); + + sk_out = scalloc(1, sizeof(*sk_out)); + if (!sk_out) { + CloseHandle(hpipe); + close(sk); + return -1; + } + + sk_out->sk = sk; + sk_out->hProcess = INVALID_HANDLE_VALUE; + INIT_FLIST_HEAD(&sk_out->list); + __fio_sem_init(&sk_out->lock, FIO_SEM_UNLOCKED); + __fio_sem_init(&sk_out->wait, FIO_SEM_LOCKED); + __fio_sem_init(&sk_out->xmit, FIO_SEM_UNLOCKED); + + get_my_addr_str(sk); + + if (!WriteFile(hpipe, msg, strlen(msg), NULL, NULL)) { + log_err("couldnt write pipe\n"); + close(sk); + return -1; + } + CloseHandle(hpipe); + + sk_out_assign(sk_out); + + ret = handle_connection(sk_out); + __sk_out_drop(sk_out); + return ret; +} +#endif + static int accept_loop(int listen_sk) { struct sockaddr_in addr; @@ -1339,8 +1525,11 @@ static int accept_loop(int listen_sk) struct sk_out *sk_out; const char *from; char buf[64]; +#ifdef WIN32 + HANDLE hProcess; +#else pid_t pid; - +#endif pfd.fd = listen_sk; pfd.events = POLLIN; do { @@ -1398,6 +1587,13 @@ static int accept_loop(int listen_sk) __fio_sem_init(&sk_out->wait, FIO_SEM_LOCKED); __fio_sem_init(&sk_out->xmit, FIO_SEM_UNLOCKED); +#ifdef WIN32 + hProcess = windows_handle_connection(hjob, sk); + if (hProcess == INVALID_HANDLE_VALUE) + return -1; + sk_out->hProcess = hProcess; + fio_server_add_conn_pid(&conn_list, hProcess); +#else pid = fork(); if (pid) { close(sk); @@ -1414,6 +1610,7 @@ static int accept_loop(int listen_sk) */ sk_out_assign(sk_out); handle_connection(sk_out); +#endif } return exitval; @@ -2511,12 +2708,25 @@ static int fio_server(void) if (fio_handle_server_arg()) return -1; + set_sig_handlers(); + +#ifdef WIN32 + /* if this is a child process, go handle the connection */ + if (fio_server_pipe_name != NULL) { + ret = handle_connection_process(); + return ret; + } + + /* job to link child processes so they terminate together */ + hjob = windows_create_job(); + if (hjob == INVALID_HANDLE_VALUE) + return -1; +#endif + sk = fio_init_server_connection(); if (sk < 0) return -1; - set_sig_handlers(); - ret = accept_loop(sk); close(sk); @@ -2657,3 +2867,10 @@ void fio_server_set_arg(const char *arg) { fio_server_arg = strdup(arg); } + +#ifdef WIN32 +void fio_server_internal_set(const char *arg) +{ + fio_server_pipe_name = strdup(arg); +} +#endif \ No newline at end of file diff --git a/server.h b/server.h index 75600df2..35a3dc1d 100644 --- a/server.h +++ b/server.h @@ -15,6 +15,9 @@ struct sk_out { unsigned int refs; /* frees sk_out when it drops to zero. * protected by below ->lock */ +#ifdef WIN32 + HANDLE hProcess; /* process handle of handle_connection_process*/ +#endif int sk; /* socket fd to talk to client */ struct fio_sem lock; /* protects ref and below list */ struct flist_head list; /* list of pending transmit work */ @@ -212,6 +215,7 @@ extern int fio_server_text_output(int, const char *, size_t); extern int fio_net_send_cmd(int, uint16_t, const void *, off_t, uint64_t *, struct flist_head *); extern int fio_net_send_simple_cmd(int, uint16_t, uint64_t, struct flist_head *); extern void fio_server_set_arg(const char *); +extern void fio_server_internal_set(const char *); extern int fio_server_parse_string(const char *, char **, bool *, int *, struct in_addr *, struct in6_addr *, int *); extern int fio_server_parse_host(const char *, int, struct in_addr *, struct in6_addr *); extern const char *fio_server_op(unsigned int);