Add Windows support for --server.
authorjames rizzo <james.rizzo@broadcom.com>
Thu, 16 Dec 2021 22:08:50 +0000 (15:08 -0700)
committerjames rizzo <james.rizzo@broadcom.com>
Thu, 16 Dec 2021 22:08:50 +0000 (15:08 -0700)
This required working around two calls to fork()

The first fork() was in accept_loop() where it creates a new process
to work on handle_connection(). This change set uses Window's
CreateProcess() in place of fork(). In order to support this,
it duplicates the socket connection via WSADuplicateSocket()
and passes the resulting WSAPROTOCOL_INFO to the process via a
named pipe. The pipe name is then passed to the process via a
new command line argument(--server-internal).

The second fork() was in handle_run_cmd() where it creates a new
process to work on fio_backend_thread(). This change set runs
fio_backend_thread() as a thread via pthread_create() instead of a
forked process.

There is also some supporting work in the monitoring of
spawned processes/threads in fio_server_check_conns() and
fio_server_check_jobs().

Signed-off-by: james rizzo <james.rizzo@broadcom.com>
init.c
os/os-windows.h
os/windows/posix.c
server.c
server.h

diff --git a/init.c b/init.c
index 5f069d9a5b4af0fb4e4cb2fe67861a25d020fbff..8a6066872a37439f6e05d72f404dd5a85de55861 100644 (file)
--- a/init.c
+++ b/init.c
@@ -224,6 +224,13 @@ static struct option l_opts[FIO_NR_OPTIONS] = {
                .has_arg        = optional_argument,
                .val            = 'S',
        },
+#ifdef WIN32
+       {
+               .name           = (char *) "server-internal",
+               .has_arg        = required_argument,
+               .val            = 'N',
+       },
+#endif
        {       .name           = (char *) "daemonize",
                .has_arg        = required_argument,
                .val            = 'D',
@@ -2789,6 +2796,12 @@ int parse_cmd_line(int argc, char *argv[], int client_type)
                        exit_val = 1;
 #endif
                        break;
+#ifdef WIN32
+               case 'N':
+                       did_arg = true;
+                       fio_server_internal_set(optarg);
+                       break;
+#endif
                case 'D':
                        if (pid_file)
                                free(pid_file);
index 59da9dba1a2c3fbf09200f313a539926e9de762a..510b8143db1d0b538a63a1b8c67d9b79cb39dea1 100644 (file)
@@ -110,6 +110,8 @@ int nanosleep(const struct timespec *rqtp, struct timespec *rmtp);
 ssize_t pread(int fildes, void *buf, size_t nbyte, off_t offset);
 ssize_t pwrite(int fildes, const void *buf, size_t nbyte,
                off_t offset);
+HANDLE windows_handle_connection(HANDLE hjob, int sk);
+HANDLE windows_create_job(void);
 
 static inline int blockdev_size(struct fio_file *f, unsigned long long *bytes)
 {
index 09c2e4a7857bd4121441b866c70657a935fde730..0978bcf6f4638cecfe019c36b24ae15f0f1579f0 100644 (file)
@@ -1026,3 +1026,174 @@ in_addr_t inet_network(const char *cp)
        hbo = ((nbo & 0xFF) << 24) + ((nbo & 0xFF00) << 8) + ((nbo & 0xFF0000) >> 8) + ((nbo & 0xFF000000) >> 24);
        return hbo;
 }
+
+static HANDLE create_named_pipe(char *pipe_name, int wait_connect_time)
+{
+       HANDLE hpipe;
+
+       hpipe = CreateNamedPipe (
+                       pipe_name,
+                       PIPE_ACCESS_DUPLEX,
+                       PIPE_WAIT | PIPE_TYPE_BYTE,
+                       1, 0, 0, wait_connect_time, NULL);
+
+       if (hpipe == INVALID_HANDLE_VALUE) {
+               log_err("ConnectNamedPipe failed (%lu).\n", GetLastError());
+               return INVALID_HANDLE_VALUE;
+       }
+
+       if (!ConnectNamedPipe(hpipe, NULL)) {
+               log_err("ConnectNamedPipe failed (%lu).\n", GetLastError());
+               CloseHandle(hpipe);
+               return INVALID_HANDLE_VALUE;
+       }
+
+       return hpipe;
+}
+
+static BOOL windows_create_process(PROCESS_INFORMATION *pi, const char *args, HANDLE *hjob)
+{
+       LPSTR this_cmd_line = GetCommandLine();
+       LPSTR new_process_cmd_line = malloc((strlen(this_cmd_line)+strlen(args)) * sizeof(char *));
+       STARTUPINFO si = {0};
+       DWORD flags = 0;
+
+       strcpy(new_process_cmd_line, this_cmd_line);
+       strcat(new_process_cmd_line, args);
+
+       si.cb = sizeof(si);
+       memset(pi, 0, sizeof(*pi));
+
+       if ((hjob != NULL) && (*hjob != INVALID_HANDLE_VALUE))
+               flags = CREATE_SUSPENDED | CREATE_BREAKAWAY_FROM_JOB;
+
+       flags |= CREATE_NEW_CONSOLE;
+
+       if( !CreateProcess( NULL,
+               new_process_cmd_line,
+               NULL,    /* Process handle not inherited */
+               NULL,    /* Thread handle not inherited */
+               TRUE,    /* no handle inheritance */
+               flags,
+               NULL,    /* Use parent's environment block */
+               NULL,    /* Use parent's starting directory */
+               &si,
+               pi )
+       )
+       {
+               log_err("CreateProcess failed (%lu).\n", GetLastError() );
+               free(new_process_cmd_line);
+               return 1;
+       }
+       if ((hjob != NULL) && (*hjob != INVALID_HANDLE_VALUE)) {
+               BOOL ret = AssignProcessToJobObject(*hjob, pi->hProcess);
+               if (!ret) {
+                       log_err("AssignProcessToJobObject failed (%lu).\n", GetLastError() );
+                       return 1;
+               }
+
+               ResumeThread(pi->hThread);
+       }
+
+       free(new_process_cmd_line);
+       return 0;
+}
+
+HANDLE windows_create_job(void)
+{
+       JOBOBJECT_EXTENDED_LIMIT_INFORMATION jeli = { 0 };
+       BOOL success;
+       HANDLE hjob = CreateJobObject(NULL, NULL);
+
+       jeli.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
+       success = SetInformationJobObject(hjob, JobObjectExtendedLimitInformation, &jeli, sizeof(jeli));
+       if ( success == 0 ) {
+        log_err( "SetInformationJobObject failed: error %lu\n", GetLastError() );
+        return INVALID_HANDLE_VALUE;
+    }
+       return hjob;
+}
+
+/* wait for a child process to either exit or connect to a child */
+static bool monitor_process_till_connect(PROCESS_INFORMATION *pi, HANDLE *hpipe)
+{
+       bool connected = FALSE;
+       bool process_alive = TRUE;
+       char buffer[32] = {0};
+       DWORD bytes_read;
+
+       do {
+               DWORD exit_code;
+               GetExitCodeProcess(pi->hProcess, &exit_code);
+               if (exit_code != STILL_ACTIVE) {
+                       dprint(FD_PROCESS, "process %u exited %d\n", GetProcessId(pi->hProcess), exit_code);
+                       break;
+               }
+
+               memset(buffer, 0, sizeof(buffer));
+               ReadFile(*hpipe, &buffer, sizeof(buffer) - 1, &bytes_read, NULL);
+               if (bytes_read && strstr(buffer, "connected")) {
+                       dprint(FD_PROCESS, "process %u connected to client\n", GetProcessId(pi->hProcess));
+                       connected = TRUE;
+               }
+               usleep(10*1000);
+       } while (process_alive && !connected);
+       return connected;
+}
+
+/*create a process with --server-internal to emulate fork() */
+HANDLE windows_handle_connection(HANDLE hjob, int sk)
+{
+       char pipe_name[64] =  "\\\\.\\pipe\\fiointernal-";
+       char args[128] = " --server-internal=";
+       PROCESS_INFORMATION pi;
+       HANDLE hpipe = INVALID_HANDLE_VALUE;
+       WSAPROTOCOL_INFO protocol_info;
+       HANDLE ret;
+
+       sprintf(pipe_name+strlen(pipe_name), "%d", GetCurrentProcessId());
+       sprintf(args+strlen(args), "%s", pipe_name);
+
+       if (windows_create_process(&pi, args, &hjob) != 0)
+               return INVALID_HANDLE_VALUE;
+       else
+               ret = pi.hProcess;
+
+       /* duplicate socket and write the protocol_info to pipe so child can
+        * duplicate the communciation socket */
+       if (WSADuplicateSocket(sk, GetProcessId(pi.hProcess), &protocol_info)) {
+               log_err("WSADuplicateSocket failed (%lu).\n", GetLastError());
+               ret = INVALID_HANDLE_VALUE;
+               goto cleanup;
+       }
+
+       /* make a pipe with a unique name based upon processid */
+       hpipe = create_named_pipe(pipe_name, 1000);
+       if (hpipe == INVALID_HANDLE_VALUE) {
+               ret = INVALID_HANDLE_VALUE;
+               goto cleanup;
+       }
+
+       if (!WriteFile(hpipe, &protocol_info, sizeof(protocol_info), NULL, NULL)) {
+               log_err("WriteFile failed (%lu).\n", GetLastError());
+               ret = INVALID_HANDLE_VALUE;
+               goto cleanup;
+       }
+
+       dprint(FD_PROCESS, "process %d created child process %u\n", GetCurrentProcessId(), GetProcessId(pi.hProcess));
+
+       /* monitor the process until it either exits or connects. This level
+        * doesnt care which of those occurs because the result is that it
+        * needs to loop around and create another child process to monitor */
+       if (!monitor_process_till_connect(&pi, &hpipe))
+               ret = INVALID_HANDLE_VALUE;
+
+cleanup:
+       /* close the handles and pipes because this thread is done monitoring them */
+       if (ret == INVALID_HANDLE_VALUE)
+               CloseHandle(pi.hProcess);
+       CloseHandle(pi.hThread);
+       DisconnectNamedPipe(hpipe);
+       CloseHandle(hpipe);
+       return ret;
+}
\ No newline at end of file
index 1f627e8f116acbbd3456552ed87986960eed011b..331c8c9fbf96b66042dff4bd69b734809d8932c6 100644 (file)
--- a/server.c
+++ b/server.c
@@ -63,12 +63,28 @@ static char me[128];
 
 static pthread_key_t sk_out_key;
 
+#ifdef WIN32
+static char *fio_server_pipe_name  = NULL;
+static HANDLE hjob = INVALID_HANDLE_VALUE;
+struct ffi_element {
+       union {
+               pthread_t thread;
+               HANDLE hProcess;
+       };
+       bool is_thread;
+};
+#endif
+
 struct fio_fork_item {
        struct flist_head list;
        int exitval;
        int signal;
        int exited;
+#ifdef WIN32
+       struct ffi_element element;
+#else
        pid_t pid;
+#endif
 };
 
 struct cmd_reply {
@@ -672,6 +688,63 @@ static int fio_net_queue_stop(int error, int signal)
        return fio_net_send_ack(NULL, error, signal);
 }
 
+#ifdef WIN32
+static void fio_server_add_fork_item(struct ffi_element *element, struct flist_head *list)
+{
+       struct fio_fork_item *ffi;
+
+       ffi = malloc(sizeof(*ffi));
+       ffi->exitval = 0;
+       ffi->signal = 0;
+       ffi->exited = 0;
+       ffi->element = *element;
+       flist_add_tail(&ffi->list, list);
+}
+
+static void fio_server_add_conn_pid(struct flist_head *conn_list, HANDLE hProcess)
+{
+       struct ffi_element element = {.hProcess = hProcess, .is_thread=FALSE};
+       dprint(FD_NET, "server: forked off connection job (tid=%u)\n", (int) element.thread);
+
+       fio_server_add_fork_item(&element, conn_list);
+}
+
+static void fio_server_add_job_pid(struct flist_head *job_list, pthread_t thread)
+{
+       struct ffi_element element = {.thread = thread, .is_thread=TRUE};
+       dprint(FD_NET, "server: forked off job job (tid=%u)\n", (int) element.thread);
+       fio_server_add_fork_item(&element, job_list);
+}
+
+static void fio_server_check_fork_item(struct fio_fork_item *ffi)
+{
+       int ret;
+
+       if (ffi->element.is_thread) {
+
+               ret = pthread_kill(ffi->element.thread, 0);
+               if (ret) {
+                       int rev_val;
+                       pthread_join(ffi->element.thread, (void**) &rev_val); /*if the thread is dead, then join it to get status*/
+
+                       ffi->exitval = rev_val;
+                       if (ffi->exitval)
+                               log_err("thread (tid=%u) exited with %x\n", (int) ffi->element.thread, (int) ffi->exitval);
+                       dprint(FD_PROCESS, "thread (tid=%u) exited with %x\n", (int) ffi->element.thread, (int) ffi->exitval);
+                       ffi->exited = 1;
+               }
+       } else {
+               DWORD exit_val;
+               GetExitCodeProcess(ffi->element.hProcess, &exit_val);
+
+               if (exit_val != STILL_ACTIVE) {
+                       dprint(FD_PROCESS, "process %u exited with %d\n", GetProcessId(ffi->element.hProcess), exit_val);
+                       ffi->exited = 1;
+                       ffi->exitval = exit_val;
+               }
+       }
+}
+#else
 static void fio_server_add_fork_item(pid_t pid, struct flist_head *list)
 {
        struct fio_fork_item *ffi;
@@ -719,10 +792,21 @@ static void fio_server_check_fork_item(struct fio_fork_item *ffi)
                }
        }
 }
+#endif
 
 static void fio_server_fork_item_done(struct fio_fork_item *ffi, bool stop)
 {
+#ifdef WIN32
+       if (ffi->element.is_thread)
+               dprint(FD_NET, "tid %u exited, sig=%u, exitval=%d\n", (int) ffi->element.thread, ffi->signal, ffi->exitval);
+       else {
+               dprint(FD_NET, "pid %u exited, sig=%u, exitval=%d\n", (int)  GetProcessId(ffi->element.hProcess), ffi->signal, ffi->exitval);
+               CloseHandle(ffi->element.hProcess);
+               ffi->element.hProcess = INVALID_HANDLE_VALUE;
+       }
+#else
        dprint(FD_NET, "pid %u exited, sig=%u, exitval=%d\n", (int) ffi->pid, ffi->signal, ffi->exitval);
+#endif
 
        /*
         * Fold STOP and QUIT...
@@ -783,27 +867,62 @@ static int handle_load_file_cmd(struct fio_net_cmd *cmd)
        return 0;
 }
 
-static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list,
-                         struct fio_net_cmd *cmd)
+#ifdef WIN32
+static void *fio_backend_thread(void *data)
 {
-       pid_t pid;
        int ret;
+       struct sk_out *sk_out = (struct sk_out *) data;
 
        sk_out_assign(sk_out);
 
+       ret = fio_backend(sk_out);
+       sk_out_drop();
+
+       pthread_exit((void*) (intptr_t) ret);
+       return NULL;
+}
+#endif
+
+static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list,
+                         struct fio_net_cmd *cmd)
+{
+       int ret;
+
        fio_time_init();
        set_genesis_time();
 
-       pid = fork();
-       if (pid) {
-               fio_server_add_job_pid(job_list, pid);
-               return 0;
+#ifdef WIN32
+       {
+               pthread_t thread;
+               /* both this thread and backend_thread call sk_out_assign() to double increment
+                * the ref count.  This ensures struct is valid until both threads are done with it
+                */
+               sk_out_assign(sk_out);
+               ret = pthread_create(&thread, NULL,     fio_backend_thread, sk_out);
+               if (ret) {
+                       log_err("pthread_create: %s\n", strerror(ret));
+                       return ret;
+               }
+
+               fio_server_add_job_pid(job_list, thread);
+               return ret;
        }
+#else
+    {
+               pid_t pid;
+               sk_out_assign(sk_out);
+               pid = fork();
+               if (pid) {
+                       fio_server_add_job_pid(job_list, pid);
+                       return 0;
+               }
 
-       ret = fio_backend(sk_out);
-       free_threads_shm();
-       sk_out_drop();
-       _exit(ret);
+               ret = fio_backend(sk_out);
+               free_threads_shm();
+               sk_out_drop();
+               _exit(ret);
+       }
+#endif
 }
 
 static int handle_job_cmd(struct fio_net_cmd *cmd)
@@ -1322,6 +1441,73 @@ static int get_my_addr_str(int sk)
        return 0;
 }
 
+#ifdef WIN32
+static int handle_connection_process(void)
+{
+       WSAPROTOCOL_INFO protocol_info;
+       DWORD bytes_read;
+       HANDLE hpipe;
+       int sk;
+       struct sk_out *sk_out;
+       int ret;
+       char *msg = (char *) "connected";
+
+       log_info("server enter accept loop.  ProcessID %d\n", GetCurrentProcessId());
+
+       hpipe = CreateFile(
+                                       fio_server_pipe_name,
+                                       GENERIC_READ | GENERIC_WRITE,
+                                       0, NULL,
+                                       OPEN_EXISTING,
+                                       0, NULL);
+
+       if (hpipe == INVALID_HANDLE_VALUE) {
+               log_err("couldnt open pipe %s error %lu\n",
+                               fio_server_pipe_name, GetLastError());
+               return -1;
+       }
+
+       if (!ReadFile(hpipe, &protocol_info, sizeof(protocol_info), &bytes_read, NULL)) {
+               log_err("couldnt read pi from pipe %s error %lu\n", fio_server_pipe_name,
+                               GetLastError());
+       }
+
+       if (use_ipv6) /* use protocol_info to create a duplicate of parents socket */
+               sk = WSASocket(AF_INET6, SOCK_STREAM, 0, &protocol_info, 0, 0);
+       else
+               sk = WSASocket(AF_INET,  SOCK_STREAM, 0, &protocol_info, 0, 0);
+
+       sk_out = scalloc(1, sizeof(*sk_out));
+       if (!sk_out) {
+               CloseHandle(hpipe);
+               close(sk);
+               return -1;
+       }
+
+       sk_out->sk = sk;
+       sk_out->hProcess = INVALID_HANDLE_VALUE;
+       INIT_FLIST_HEAD(&sk_out->list);
+       __fio_sem_init(&sk_out->lock, FIO_SEM_UNLOCKED);
+       __fio_sem_init(&sk_out->wait, FIO_SEM_LOCKED);
+       __fio_sem_init(&sk_out->xmit, FIO_SEM_UNLOCKED);
+
+       get_my_addr_str(sk);
+
+       if (!WriteFile(hpipe, msg, strlen(msg), NULL, NULL)) {
+               log_err("couldnt write pipe\n");
+               close(sk);
+               return -1;
+       }
+       CloseHandle(hpipe);
+
+       sk_out_assign(sk_out);
+
+       ret = handle_connection(sk_out);
+       __sk_out_drop(sk_out);
+       return ret;
+}
+#endif
+
 static int accept_loop(int listen_sk)
 {
        struct sockaddr_in addr;
@@ -1339,8 +1525,11 @@ static int accept_loop(int listen_sk)
                struct sk_out *sk_out;
                const char *from;
                char buf[64];
+#ifdef WIN32
+               HANDLE hProcess;
+#else
                pid_t pid;
-
+#endif
                pfd.fd = listen_sk;
                pfd.events = POLLIN;
                do {
@@ -1398,6 +1587,13 @@ static int accept_loop(int listen_sk)
                __fio_sem_init(&sk_out->wait, FIO_SEM_LOCKED);
                __fio_sem_init(&sk_out->xmit, FIO_SEM_UNLOCKED);
 
+#ifdef WIN32
+               hProcess = windows_handle_connection(hjob, sk);
+               if (hProcess == INVALID_HANDLE_VALUE)
+                       return -1;
+               sk_out->hProcess = hProcess;
+               fio_server_add_conn_pid(&conn_list, hProcess);
+#else
                pid = fork();
                if (pid) {
                        close(sk);
@@ -1414,6 +1610,7 @@ static int accept_loop(int listen_sk)
                 */
                sk_out_assign(sk_out);
                handle_connection(sk_out);
+#endif
        }
 
        return exitval;
@@ -2511,12 +2708,25 @@ static int fio_server(void)
        if (fio_handle_server_arg())
                return -1;
 
+       set_sig_handlers();
+
+#ifdef WIN32
+       /* if this is a child process, go handle the connection */
+       if (fio_server_pipe_name != NULL) {
+               ret = handle_connection_process();
+               return ret;
+       }
+
+       /* job to link child processes so they terminate together */
+       hjob = windows_create_job();
+       if (hjob == INVALID_HANDLE_VALUE)
+               return -1;
+#endif
+
        sk = fio_init_server_connection();
        if (sk < 0)
                return -1;
 
-       set_sig_handlers();
-
        ret = accept_loop(sk);
 
        close(sk);
@@ -2657,3 +2867,10 @@ void fio_server_set_arg(const char *arg)
 {
        fio_server_arg = strdup(arg);
 }
+
+#ifdef WIN32
+void fio_server_internal_set(const char *arg)
+{
+       fio_server_pipe_name = strdup(arg);
+}
+#endif
\ No newline at end of file
index 75600df209f874738d94e532dc427e313ba0311f..35a3dc1d555441b78aef0808d0d80c7e4d92ff87 100644 (file)
--- a/server.h
+++ b/server.h
@@ -15,6 +15,9 @@ struct sk_out {
        unsigned int refs;      /* frees sk_out when it drops to zero.
                                 * protected by below ->lock */
 
+#ifdef WIN32
+       HANDLE hProcess;                /* process handle of handle_connection_process*/
+#endif
        int sk;                 /* socket fd to talk to client */
        struct fio_sem lock;    /* protects ref and below list */
        struct flist_head list; /* list of pending transmit work */
@@ -212,6 +215,7 @@ extern int fio_server_text_output(int, const char *, size_t);
 extern int fio_net_send_cmd(int, uint16_t, const void *, off_t, uint64_t *, struct flist_head *);
 extern int fio_net_send_simple_cmd(int, uint16_t, uint64_t, struct flist_head *);
 extern void fio_server_set_arg(const char *);
+extern void fio_server_internal_set(const char *);
 extern int fio_server_parse_string(const char *, char **, bool *, int *, struct in_addr *, struct in6_addr *, int *);
 extern int fio_server_parse_host(const char *, int, struct in_addr *, struct in6_addr *);
 extern const char *fio_server_op(unsigned int);