Merge branch 'master' of https://github.com/blah325/fio
authorJens Axboe <axboe@kernel.dk>
Thu, 3 Feb 2022 22:22:41 +0000 (15:22 -0700)
committerJens Axboe <axboe@kernel.dk>
Thu, 3 Feb 2022 22:22:41 +0000 (15:22 -0700)
* 'master' of https://github.com/blah325/fio:
  Added a new windows only IO engine option “no_completion_thread”.
  Add Windows support for --server.
  Avoid client calls to recv() without prior poll()

client.c
engines/windowsaio.c
init.c
optgroup.h
os/os-windows.h
os/windows/posix.c
server.c
server.h

index be8411d8232f4c547e04b246209beff433ba9efa..7a4bfc0d37ec88dfb013b2b4e77e4550264ab6de 100644 (file)
--- a/client.c
+++ b/client.c
@@ -284,9 +284,10 @@ static int fio_client_dec_jobs_eta(struct client_eta *eta, client_eta_op eta_fn)
 static void fio_drain_client_text(struct fio_client *client)
 {
        do {
-               struct fio_net_cmd *cmd;
+               struct fio_net_cmd *cmd = NULL;
 
-               cmd = fio_net_recv_cmd(client->fd, false);
+               if (fio_server_poll_fd(client->fd, POLLIN, 0))
+                       cmd = fio_net_recv_cmd(client->fd, false);
                if (!cmd)
                        break;
 
index 9868e816adb68b8e196dc42d1832b8869e36fec3..d82c80536145409356739c3a5ea94d518259f141 100644 (file)
@@ -11,6 +11,7 @@
 #include <errno.h>
 
 #include "../fio.h"
+#include "../optgroup.h"
 
 typedef BOOL (WINAPI *CANCELIOEX)(HANDLE hFile, LPOVERLAPPED lpOverlapped);
 
@@ -35,6 +36,26 @@ struct thread_ctx {
        struct windowsaio_data *wd;
 };
 
+struct windowsaio_options {
+       struct thread_data *td;
+       unsigned int no_completion_thread;
+};
+
+static struct fio_option options[] = {
+       {
+               .name   = "no_completion_thread",
+               .lname  = "No completion polling thread",
+               .type   = FIO_OPT_STR_SET,
+               .off1   = offsetof(struct windowsaio_options, no_completion_thread),
+               .help   = "Use to avoid separate completion polling thread",
+               .category = FIO_OPT_C_ENGINE,
+               .group  = FIO_OPT_G_WINDOWSAIO,
+       },
+       {
+               .name   = NULL,
+       },
+};
+
 static DWORD WINAPI IoCompletionRoutine(LPVOID lpParameter);
 
 static int fio_windowsaio_init(struct thread_data *td)
@@ -80,6 +101,7 @@ static int fio_windowsaio_init(struct thread_data *td)
                struct thread_ctx *ctx;
                struct windowsaio_data *wd;
                HANDLE hFile;
+               struct windowsaio_options *o = td->eo;
 
                hFile = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
                if (hFile == INVALID_HANDLE_VALUE) {
@@ -91,29 +113,30 @@ static int fio_windowsaio_init(struct thread_data *td)
                wd->iothread_running = TRUE;
                wd->iocp = hFile;
 
-               if (!rc)
-                       ctx = malloc(sizeof(struct thread_ctx));
+               if (o->no_completion_thread == 0) {
+                       if (!rc)
+                               ctx = malloc(sizeof(struct thread_ctx));
 
-               if (!rc && ctx == NULL) {
-                       log_err("windowsaio: failed to allocate memory for thread context structure\n");
-                       CloseHandle(hFile);
-                       rc = 1;
-               }
+                       if (!rc && ctx == NULL) {
+                               log_err("windowsaio: failed to allocate memory for thread context structure\n");
+                               CloseHandle(hFile);
+                               rc = 1;
+                       }
 
-               if (!rc) {
-                       DWORD threadid;
+                       if (!rc) {
+                               DWORD threadid;
 
-                       ctx->iocp = hFile;
-                       ctx->wd = wd;
-                       wd->iothread = CreateThread(NULL, 0, IoCompletionRoutine, ctx, 0, &threadid);
-                       if (!wd->iothread)
-                               log_err("windowsaio: failed to create io completion thread\n");
-                       else if (fio_option_is_set(&td->o, cpumask))
-                               fio_setaffinity(threadid, td->o.cpumask);
+                               ctx->iocp = hFile;
+                               ctx->wd = wd;
+                               wd->iothread = CreateThread(NULL, 0, IoCompletionRoutine, ctx, 0, &threadid);
+                               if (!wd->iothread)
+                                       log_err("windowsaio: failed to create io completion thread\n");
+                               else if (fio_option_is_set(&td->o, cpumask))
+                                       fio_setaffinity(threadid, td->o.cpumask);
+                       }
+                       if (rc || wd->iothread == NULL)
+                               rc = 1;
                }
-
-               if (rc || wd->iothread == NULL)
-                       rc = 1;
        }
 
        return rc;
@@ -302,9 +325,63 @@ static struct io_u* fio_windowsaio_event(struct thread_data *td, int event)
        return wd->aio_events[event];
 }
 
-static int fio_windowsaio_getevents(struct thread_data *td, unsigned int min,
-                                   unsigned int max,
-                                   const struct timespec *t)
+/* dequeue completion entrees directly (no separate completion thread) */
+static int fio_windowsaio_getevents_nothread(struct thread_data *td, unsigned int min,
+                                   unsigned int max, const struct timespec *t)
+{
+       struct windowsaio_data *wd = td->io_ops_data;
+       unsigned int dequeued = 0;
+       struct io_u *io_u;
+       DWORD start_count = 0;
+       DWORD end_count = 0;
+       DWORD mswait = 250;
+       struct fio_overlapped *fov;
+
+       if (t != NULL) {
+               mswait = (t->tv_sec * 1000) + (t->tv_nsec / 1000000);
+               start_count = GetTickCount();
+               end_count = start_count + (t->tv_sec * 1000) + (t->tv_nsec / 1000000);
+       }
+
+       do {
+               BOOL ret;
+               OVERLAPPED *ovl;
+
+               ULONG entries = min(16, max-dequeued);
+               OVERLAPPED_ENTRY oe[16];
+               ret = GetQueuedCompletionStatusEx(wd->iocp, oe, 16, &entries, mswait, 0);
+               if (ret && entries) {
+                       int entry_num;
+
+                       for (entry_num=0; entry_num<entries; entry_num++) {
+                               ovl = oe[entry_num].lpOverlapped;
+                               fov = CONTAINING_RECORD(ovl, struct fio_overlapped, o);
+                               io_u = fov->io_u;
+
+                               if (ovl->Internal == ERROR_SUCCESS) {
+                                       io_u->resid = io_u->xfer_buflen - ovl->InternalHigh;
+                                       io_u->error = 0;
+                               } else {
+                                       io_u->resid = io_u->xfer_buflen;
+                                       io_u->error = win_to_posix_error(GetLastError());
+                               }
+
+                               fov->io_complete = FALSE;
+                               wd->aio_events[dequeued] = io_u;
+                               dequeued++;
+                       }
+               }
+
+               if (dequeued >= min ||
+                       (t != NULL && timeout_expired(start_count, end_count)))
+                       break;
+       } while (1);
+       return dequeued;
+}
+
+/* dequeue completion entrees creates by separate IoCompletionRoutine thread */
+static int fio_windowaio_getevents_thread(struct thread_data *td, unsigned int min,
+                                   unsigned int max, const struct timespec *t)
 {
        struct windowsaio_data *wd = td->io_ops_data;
        unsigned int dequeued = 0;
@@ -334,7 +411,6 @@ static int fio_windowsaio_getevents(struct thread_data *td, unsigned int min,
                                wd->aio_events[dequeued] = io_u;
                                dequeued++;
                        }
-
                }
                if (dequeued >= min)
                        break;
@@ -353,6 +429,16 @@ static int fio_windowsaio_getevents(struct thread_data *td, unsigned int min,
        return dequeued;
 }
 
+static int fio_windowsaio_getevents(struct thread_data *td, unsigned int min,
+                                   unsigned int max, const struct timespec *t)
+{
+       struct windowsaio_options *o = td->eo;
+
+       if (o->no_completion_thread)
+               return fio_windowsaio_getevents_nothread(td, min, max, t);
+       return fio_windowaio_getevents_thread(td, min, max, t);
+}
+
 static enum fio_q_status fio_windowsaio_queue(struct thread_data *td,
                                              struct io_u *io_u)
 {
@@ -484,6 +570,8 @@ static struct ioengine_ops ioengine = {
        .get_file_size  = generic_get_file_size,
        .io_u_init      = fio_windowsaio_io_u_init,
        .io_u_free      = fio_windowsaio_io_u_free,
+       .options        = options,
+       .option_struct_size     = sizeof(struct windowsaio_options),
 };
 
 static void fio_init fio_windowsaio_register(void)
diff --git a/init.c b/init.c
index 07daaa84b17c6318727846e9634919c328fe6a07..f983dbbb6d988328b0d2b3cceb22b259b14f2c07 100644 (file)
--- a/init.c
+++ b/init.c
@@ -224,6 +224,13 @@ static struct option l_opts[FIO_NR_OPTIONS] = {
                .has_arg        = optional_argument,
                .val            = 'S',
        },
+#ifdef WIN32
+       {
+               .name           = (char *) "server-internal",
+               .has_arg        = required_argument,
+               .val            = 'N',
+       },
+#endif
        {       .name           = (char *) "daemonize",
                .has_arg        = required_argument,
                .val            = 'D',
@@ -2795,6 +2802,12 @@ int parse_cmd_line(int argc, char *argv[], int client_type)
                        exit_val = 1;
 #endif
                        break;
+#ifdef WIN32
+               case 'N':
+                       did_arg = true;
+                       fio_server_internal_set(optarg);
+                       break;
+#endif
                case 'D':
                        if (pid_file)
                                free(pid_file);
index 1fb84a296b51990f5d909c92d2ff75a52f89178e..3ac8f62a81aa96b10a825ee69dffe0f0d070076a 100644 (file)
@@ -71,6 +71,7 @@ enum opt_category_group {
        __FIO_OPT_G_LIBCUFILE,
        __FIO_OPT_G_DFS,
        __FIO_OPT_G_NFS,
+       __FIO_OPT_G_WINDOWSAIO,
 
        FIO_OPT_G_RATE          = (1ULL << __FIO_OPT_G_RATE),
        FIO_OPT_G_ZONE          = (1ULL << __FIO_OPT_G_ZONE),
@@ -116,6 +117,7 @@ enum opt_category_group {
        FIO_OPT_G_FILESTAT      = (1ULL << __FIO_OPT_G_FILESTAT),
        FIO_OPT_G_LIBCUFILE     = (1ULL << __FIO_OPT_G_LIBCUFILE),
        FIO_OPT_G_DFS           = (1ULL << __FIO_OPT_G_DFS),
+       FIO_OPT_G_WINDOWSAIO    = (1ULL << __FIO_OPT_G_WINDOWSAIO),
 };
 
 extern const struct opt_group *opt_group_from_mask(uint64_t *mask);
index 59da9dba1a2c3fbf09200f313a539926e9de762a..510b8143db1d0b538a63a1b8c67d9b79cb39dea1 100644 (file)
@@ -110,6 +110,8 @@ int nanosleep(const struct timespec *rqtp, struct timespec *rmtp);
 ssize_t pread(int fildes, void *buf, size_t nbyte, off_t offset);
 ssize_t pwrite(int fildes, const void *buf, size_t nbyte,
                off_t offset);
+HANDLE windows_handle_connection(HANDLE hjob, int sk);
+HANDLE windows_create_job(void);
 
 static inline int blockdev_size(struct fio_file *f, unsigned long long *bytes)
 {
index 09c2e4a7857bd4121441b866c70657a935fde730..0978bcf6f4638cecfe019c36b24ae15f0f1579f0 100644 (file)
@@ -1026,3 +1026,174 @@ in_addr_t inet_network(const char *cp)
        hbo = ((nbo & 0xFF) << 24) + ((nbo & 0xFF00) << 8) + ((nbo & 0xFF0000) >> 8) + ((nbo & 0xFF000000) >> 24);
        return hbo;
 }
+
+static HANDLE create_named_pipe(char *pipe_name, int wait_connect_time)
+{
+       HANDLE hpipe;
+
+       hpipe = CreateNamedPipe (
+                       pipe_name,
+                       PIPE_ACCESS_DUPLEX,
+                       PIPE_WAIT | PIPE_TYPE_BYTE,
+                       1, 0, 0, wait_connect_time, NULL);
+
+       if (hpipe == INVALID_HANDLE_VALUE) {
+               log_err("ConnectNamedPipe failed (%lu).\n", GetLastError());
+               return INVALID_HANDLE_VALUE;
+       }
+
+       if (!ConnectNamedPipe(hpipe, NULL)) {
+               log_err("ConnectNamedPipe failed (%lu).\n", GetLastError());
+               CloseHandle(hpipe);
+               return INVALID_HANDLE_VALUE;
+       }
+
+       return hpipe;
+}
+
+static BOOL windows_create_process(PROCESS_INFORMATION *pi, const char *args, HANDLE *hjob)
+{
+       LPSTR this_cmd_line = GetCommandLine();
+       LPSTR new_process_cmd_line = malloc((strlen(this_cmd_line)+strlen(args)) * sizeof(char *));
+       STARTUPINFO si = {0};
+       DWORD flags = 0;
+
+       strcpy(new_process_cmd_line, this_cmd_line);
+       strcat(new_process_cmd_line, args);
+
+       si.cb = sizeof(si);
+       memset(pi, 0, sizeof(*pi));
+
+       if ((hjob != NULL) && (*hjob != INVALID_HANDLE_VALUE))
+               flags = CREATE_SUSPENDED | CREATE_BREAKAWAY_FROM_JOB;
+
+       flags |= CREATE_NEW_CONSOLE;
+
+       if( !CreateProcess( NULL,
+               new_process_cmd_line,
+               NULL,    /* Process handle not inherited */
+               NULL,    /* Thread handle not inherited */
+               TRUE,    /* no handle inheritance */
+               flags,
+               NULL,    /* Use parent's environment block */
+               NULL,    /* Use parent's starting directory */
+               &si,
+               pi )
+       )
+       {
+               log_err("CreateProcess failed (%lu).\n", GetLastError() );
+               free(new_process_cmd_line);
+               return 1;
+       }
+       if ((hjob != NULL) && (*hjob != INVALID_HANDLE_VALUE)) {
+               BOOL ret = AssignProcessToJobObject(*hjob, pi->hProcess);
+               if (!ret) {
+                       log_err("AssignProcessToJobObject failed (%lu).\n", GetLastError() );
+                       return 1;
+               }
+
+               ResumeThread(pi->hThread);
+       }
+
+       free(new_process_cmd_line);
+       return 0;
+}
+
+HANDLE windows_create_job(void)
+{
+       JOBOBJECT_EXTENDED_LIMIT_INFORMATION jeli = { 0 };
+       BOOL success;
+       HANDLE hjob = CreateJobObject(NULL, NULL);
+
+       jeli.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
+       success = SetInformationJobObject(hjob, JobObjectExtendedLimitInformation, &jeli, sizeof(jeli));
+       if ( success == 0 ) {
+        log_err( "SetInformationJobObject failed: error %lu\n", GetLastError() );
+        return INVALID_HANDLE_VALUE;
+    }
+       return hjob;
+}
+
+/* wait for a child process to either exit or connect to a child */
+static bool monitor_process_till_connect(PROCESS_INFORMATION *pi, HANDLE *hpipe)
+{
+       bool connected = FALSE;
+       bool process_alive = TRUE;
+       char buffer[32] = {0};
+       DWORD bytes_read;
+
+       do {
+               DWORD exit_code;
+               GetExitCodeProcess(pi->hProcess, &exit_code);
+               if (exit_code != STILL_ACTIVE) {
+                       dprint(FD_PROCESS, "process %u exited %d\n", GetProcessId(pi->hProcess), exit_code);
+                       break;
+               }
+
+               memset(buffer, 0, sizeof(buffer));
+               ReadFile(*hpipe, &buffer, sizeof(buffer) - 1, &bytes_read, NULL);
+               if (bytes_read && strstr(buffer, "connected")) {
+                       dprint(FD_PROCESS, "process %u connected to client\n", GetProcessId(pi->hProcess));
+                       connected = TRUE;
+               }
+               usleep(10*1000);
+       } while (process_alive && !connected);
+       return connected;
+}
+
+/*create a process with --server-internal to emulate fork() */
+HANDLE windows_handle_connection(HANDLE hjob, int sk)
+{
+       char pipe_name[64] =  "\\\\.\\pipe\\fiointernal-";
+       char args[128] = " --server-internal=";
+       PROCESS_INFORMATION pi;
+       HANDLE hpipe = INVALID_HANDLE_VALUE;
+       WSAPROTOCOL_INFO protocol_info;
+       HANDLE ret;
+
+       sprintf(pipe_name+strlen(pipe_name), "%d", GetCurrentProcessId());
+       sprintf(args+strlen(args), "%s", pipe_name);
+
+       if (windows_create_process(&pi, args, &hjob) != 0)
+               return INVALID_HANDLE_VALUE;
+       else
+               ret = pi.hProcess;
+
+       /* duplicate socket and write the protocol_info to pipe so child can
+        * duplicate the communciation socket */
+       if (WSADuplicateSocket(sk, GetProcessId(pi.hProcess), &protocol_info)) {
+               log_err("WSADuplicateSocket failed (%lu).\n", GetLastError());
+               ret = INVALID_HANDLE_VALUE;
+               goto cleanup;
+       }
+
+       /* make a pipe with a unique name based upon processid */
+       hpipe = create_named_pipe(pipe_name, 1000);
+       if (hpipe == INVALID_HANDLE_VALUE) {
+               ret = INVALID_HANDLE_VALUE;
+               goto cleanup;
+       }
+
+       if (!WriteFile(hpipe, &protocol_info, sizeof(protocol_info), NULL, NULL)) {
+               log_err("WriteFile failed (%lu).\n", GetLastError());
+               ret = INVALID_HANDLE_VALUE;
+               goto cleanup;
+       }
+
+       dprint(FD_PROCESS, "process %d created child process %u\n", GetCurrentProcessId(), GetProcessId(pi.hProcess));
+
+       /* monitor the process until it either exits or connects. This level
+        * doesnt care which of those occurs because the result is that it
+        * needs to loop around and create another child process to monitor */
+       if (!monitor_process_till_connect(&pi, &hpipe))
+               ret = INVALID_HANDLE_VALUE;
+
+cleanup:
+       /* close the handles and pipes because this thread is done monitoring them */
+       if (ret == INVALID_HANDLE_VALUE)
+               CloseHandle(pi.hProcess);
+       CloseHandle(pi.hThread);
+       DisconnectNamedPipe(hpipe);
+       CloseHandle(hpipe);
+       return ret;
+}
\ No newline at end of file
index 90c52e01ac231f4972bbd94e91361d3fe61ad132..331c8c9fbf96b66042dff4bd69b734809d8932c6 100644 (file)
--- a/server.c
+++ b/server.c
@@ -63,12 +63,28 @@ static char me[128];
 
 static pthread_key_t sk_out_key;
 
+#ifdef WIN32
+static char *fio_server_pipe_name  = NULL;
+static HANDLE hjob = INVALID_HANDLE_VALUE;
+struct ffi_element {
+       union {
+               pthread_t thread;
+               HANDLE hProcess;
+       };
+       bool is_thread;
+};
+#endif
+
 struct fio_fork_item {
        struct flist_head list;
        int exitval;
        int signal;
        int exited;
+#ifdef WIN32
+       struct ffi_element element;
+#else
        pid_t pid;
+#endif
 };
 
 struct cmd_reply {
@@ -250,6 +266,27 @@ static int fio_send_data(int sk, const void *p, unsigned int len)
        return fio_sendv_data(sk, &iov, 1);
 }
 
+bool fio_server_poll_fd(int fd, short events, int timeout) {
+       struct pollfd pfd = {
+               .fd     = fd,
+               .events = events,
+       };
+       int ret;
+
+       ret = poll(&pfd, 1, timeout);
+       if (ret < 0) {
+               if (errno == EINTR)
+                       return false;
+               log_err("fio: poll: %s\n", strerror(errno));
+               return false;
+       } else if (!ret) {
+               return false;
+       }
+       if (pfd.revents & events)
+               return true;
+       return false;
+}
+
 static int fio_recv_data(int sk, void *buf, unsigned int len, bool wait)
 {
        int flags;
@@ -651,6 +688,63 @@ static int fio_net_queue_stop(int error, int signal)
        return fio_net_send_ack(NULL, error, signal);
 }
 
+#ifdef WIN32
+static void fio_server_add_fork_item(struct ffi_element *element, struct flist_head *list)
+{
+       struct fio_fork_item *ffi;
+
+       ffi = malloc(sizeof(*ffi));
+       ffi->exitval = 0;
+       ffi->signal = 0;
+       ffi->exited = 0;
+       ffi->element = *element;
+       flist_add_tail(&ffi->list, list);
+}
+
+static void fio_server_add_conn_pid(struct flist_head *conn_list, HANDLE hProcess)
+{
+       struct ffi_element element = {.hProcess = hProcess, .is_thread=FALSE};
+       dprint(FD_NET, "server: forked off connection job (tid=%u)\n", (int) element.thread);
+
+       fio_server_add_fork_item(&element, conn_list);
+}
+
+static void fio_server_add_job_pid(struct flist_head *job_list, pthread_t thread)
+{
+       struct ffi_element element = {.thread = thread, .is_thread=TRUE};
+       dprint(FD_NET, "server: forked off job job (tid=%u)\n", (int) element.thread);
+       fio_server_add_fork_item(&element, job_list);
+}
+
+static void fio_server_check_fork_item(struct fio_fork_item *ffi)
+{
+       int ret;
+
+       if (ffi->element.is_thread) {
+
+               ret = pthread_kill(ffi->element.thread, 0);
+               if (ret) {
+                       int rev_val;
+                       pthread_join(ffi->element.thread, (void**) &rev_val); /*if the thread is dead, then join it to get status*/
+
+                       ffi->exitval = rev_val;
+                       if (ffi->exitval)
+                               log_err("thread (tid=%u) exited with %x\n", (int) ffi->element.thread, (int) ffi->exitval);
+                       dprint(FD_PROCESS, "thread (tid=%u) exited with %x\n", (int) ffi->element.thread, (int) ffi->exitval);
+                       ffi->exited = 1;
+               }
+       } else {
+               DWORD exit_val;
+               GetExitCodeProcess(ffi->element.hProcess, &exit_val);
+
+               if (exit_val != STILL_ACTIVE) {
+                       dprint(FD_PROCESS, "process %u exited with %d\n", GetProcessId(ffi->element.hProcess), exit_val);
+                       ffi->exited = 1;
+                       ffi->exitval = exit_val;
+               }
+       }
+}
+#else
 static void fio_server_add_fork_item(pid_t pid, struct flist_head *list)
 {
        struct fio_fork_item *ffi;
@@ -698,10 +792,21 @@ static void fio_server_check_fork_item(struct fio_fork_item *ffi)
                }
        }
 }
+#endif
 
 static void fio_server_fork_item_done(struct fio_fork_item *ffi, bool stop)
 {
+#ifdef WIN32
+       if (ffi->element.is_thread)
+               dprint(FD_NET, "tid %u exited, sig=%u, exitval=%d\n", (int) ffi->element.thread, ffi->signal, ffi->exitval);
+       else {
+               dprint(FD_NET, "pid %u exited, sig=%u, exitval=%d\n", (int)  GetProcessId(ffi->element.hProcess), ffi->signal, ffi->exitval);
+               CloseHandle(ffi->element.hProcess);
+               ffi->element.hProcess = INVALID_HANDLE_VALUE;
+       }
+#else
        dprint(FD_NET, "pid %u exited, sig=%u, exitval=%d\n", (int) ffi->pid, ffi->signal, ffi->exitval);
+#endif
 
        /*
         * Fold STOP and QUIT...
@@ -762,27 +867,62 @@ static int handle_load_file_cmd(struct fio_net_cmd *cmd)
        return 0;
 }
 
-static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list,
-                         struct fio_net_cmd *cmd)
+#ifdef WIN32
+static void *fio_backend_thread(void *data)
 {
-       pid_t pid;
        int ret;
+       struct sk_out *sk_out = (struct sk_out *) data;
 
        sk_out_assign(sk_out);
 
+       ret = fio_backend(sk_out);
+       sk_out_drop();
+
+       pthread_exit((void*) (intptr_t) ret);
+       return NULL;
+}
+#endif
+
+static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list,
+                         struct fio_net_cmd *cmd)
+{
+       int ret;
+
        fio_time_init();
        set_genesis_time();
 
-       pid = fork();
-       if (pid) {
-               fio_server_add_job_pid(job_list, pid);
-               return 0;
+#ifdef WIN32
+       {
+               pthread_t thread;
+               /* both this thread and backend_thread call sk_out_assign() to double increment
+                * the ref count.  This ensures struct is valid until both threads are done with it
+                */
+               sk_out_assign(sk_out);
+               ret = pthread_create(&thread, NULL,     fio_backend_thread, sk_out);
+               if (ret) {
+                       log_err("pthread_create: %s\n", strerror(ret));
+                       return ret;
+               }
+
+               fio_server_add_job_pid(job_list, thread);
+               return ret;
        }
+#else
+    {
+               pid_t pid;
+               sk_out_assign(sk_out);
+               pid = fork();
+               if (pid) {
+                       fio_server_add_job_pid(job_list, pid);
+                       return 0;
+               }
 
-       ret = fio_backend(sk_out);
-       free_threads_shm();
-       sk_out_drop();
-       _exit(ret);
+               ret = fio_backend(sk_out);
+               free_threads_shm();
+               sk_out_drop();
+               _exit(ret);
+       }
+#endif
 }
 
 static int handle_job_cmd(struct fio_net_cmd *cmd)
@@ -1238,7 +1378,8 @@ static int handle_connection(struct sk_out *sk_out)
                if (ret < 0)
                        break;
 
-               cmd = fio_net_recv_cmd(sk_out->sk, true);
+               if (pfd.revents & POLLIN)
+                       cmd = fio_net_recv_cmd(sk_out->sk, true);
                if (!cmd) {
                        ret = -1;
                        break;
@@ -1300,6 +1441,73 @@ static int get_my_addr_str(int sk)
        return 0;
 }
 
+#ifdef WIN32
+static int handle_connection_process(void)
+{
+       WSAPROTOCOL_INFO protocol_info;
+       DWORD bytes_read;
+       HANDLE hpipe;
+       int sk;
+       struct sk_out *sk_out;
+       int ret;
+       char *msg = (char *) "connected";
+
+       log_info("server enter accept loop.  ProcessID %d\n", GetCurrentProcessId());
+
+       hpipe = CreateFile(
+                                       fio_server_pipe_name,
+                                       GENERIC_READ | GENERIC_WRITE,
+                                       0, NULL,
+                                       OPEN_EXISTING,
+                                       0, NULL);
+
+       if (hpipe == INVALID_HANDLE_VALUE) {
+               log_err("couldnt open pipe %s error %lu\n",
+                               fio_server_pipe_name, GetLastError());
+               return -1;
+       }
+
+       if (!ReadFile(hpipe, &protocol_info, sizeof(protocol_info), &bytes_read, NULL)) {
+               log_err("couldnt read pi from pipe %s error %lu\n", fio_server_pipe_name,
+                               GetLastError());
+       }
+
+       if (use_ipv6) /* use protocol_info to create a duplicate of parents socket */
+               sk = WSASocket(AF_INET6, SOCK_STREAM, 0, &protocol_info, 0, 0);
+       else
+               sk = WSASocket(AF_INET,  SOCK_STREAM, 0, &protocol_info, 0, 0);
+
+       sk_out = scalloc(1, sizeof(*sk_out));
+       if (!sk_out) {
+               CloseHandle(hpipe);
+               close(sk);
+               return -1;
+       }
+
+       sk_out->sk = sk;
+       sk_out->hProcess = INVALID_HANDLE_VALUE;
+       INIT_FLIST_HEAD(&sk_out->list);
+       __fio_sem_init(&sk_out->lock, FIO_SEM_UNLOCKED);
+       __fio_sem_init(&sk_out->wait, FIO_SEM_LOCKED);
+       __fio_sem_init(&sk_out->xmit, FIO_SEM_UNLOCKED);
+
+       get_my_addr_str(sk);
+
+       if (!WriteFile(hpipe, msg, strlen(msg), NULL, NULL)) {
+               log_err("couldnt write pipe\n");
+               close(sk);
+               return -1;
+       }
+       CloseHandle(hpipe);
+
+       sk_out_assign(sk_out);
+
+       ret = handle_connection(sk_out);
+       __sk_out_drop(sk_out);
+       return ret;
+}
+#endif
+
 static int accept_loop(int listen_sk)
 {
        struct sockaddr_in addr;
@@ -1317,8 +1525,11 @@ static int accept_loop(int listen_sk)
                struct sk_out *sk_out;
                const char *from;
                char buf[64];
+#ifdef WIN32
+               HANDLE hProcess;
+#else
                pid_t pid;
-
+#endif
                pfd.fd = listen_sk;
                pfd.events = POLLIN;
                do {
@@ -1376,6 +1587,13 @@ static int accept_loop(int listen_sk)
                __fio_sem_init(&sk_out->wait, FIO_SEM_LOCKED);
                __fio_sem_init(&sk_out->xmit, FIO_SEM_UNLOCKED);
 
+#ifdef WIN32
+               hProcess = windows_handle_connection(hjob, sk);
+               if (hProcess == INVALID_HANDLE_VALUE)
+                       return -1;
+               sk_out->hProcess = hProcess;
+               fio_server_add_conn_pid(&conn_list, hProcess);
+#else
                pid = fork();
                if (pid) {
                        close(sk);
@@ -1392,6 +1610,7 @@ static int accept_loop(int listen_sk)
                 */
                sk_out_assign(sk_out);
                handle_connection(sk_out);
+#endif
        }
 
        return exitval;
@@ -2489,12 +2708,25 @@ static int fio_server(void)
        if (fio_handle_server_arg())
                return -1;
 
+       set_sig_handlers();
+
+#ifdef WIN32
+       /* if this is a child process, go handle the connection */
+       if (fio_server_pipe_name != NULL) {
+               ret = handle_connection_process();
+               return ret;
+       }
+
+       /* job to link child processes so they terminate together */
+       hjob = windows_create_job();
+       if (hjob == INVALID_HANDLE_VALUE)
+               return -1;
+#endif
+
        sk = fio_init_server_connection();
        if (sk < 0)
                return -1;
 
-       set_sig_handlers();
-
        ret = accept_loop(sk);
 
        close(sk);
@@ -2635,3 +2867,10 @@ void fio_server_set_arg(const char *arg)
 {
        fio_server_arg = strdup(arg);
 }
+
+#ifdef WIN32
+void fio_server_internal_set(const char *arg)
+{
+       fio_server_pipe_name = strdup(arg);
+}
+#endif
\ No newline at end of file
index 25b6bbdc25dfb6f6f45e7ddb318d6a5cb9030156..35a3dc1d555441b78aef0808d0d80c7e4d92ff87 100644 (file)
--- a/server.h
+++ b/server.h
@@ -15,6 +15,9 @@ struct sk_out {
        unsigned int refs;      /* frees sk_out when it drops to zero.
                                 * protected by below ->lock */
 
+#ifdef WIN32
+       HANDLE hProcess;                /* process handle of handle_connection_process*/
+#endif
        int sk;                 /* socket fd to talk to client */
        struct fio_sem lock;    /* protects ref and below list */
        struct flist_head list; /* list of pending transmit work */
@@ -212,6 +215,7 @@ extern int fio_server_text_output(int, const char *, size_t);
 extern int fio_net_send_cmd(int, uint16_t, const void *, off_t, uint64_t *, struct flist_head *);
 extern int fio_net_send_simple_cmd(int, uint16_t, uint64_t, struct flist_head *);
 extern void fio_server_set_arg(const char *);
+extern void fio_server_internal_set(const char *);
 extern int fio_server_parse_string(const char *, char **, bool *, int *, struct in_addr *, struct in6_addr *, int *);
 extern int fio_server_parse_host(const char *, int, struct in_addr *, struct in6_addr *);
 extern const char *fio_server_op(unsigned int);
@@ -222,6 +226,7 @@ extern void fio_server_send_gs(struct group_run_stats *);
 extern void fio_server_send_du(void);
 extern void fio_server_send_job_options(struct flist_head *, unsigned int);
 extern int fio_server_get_verify_state(const char *, int, void **);
+extern bool fio_server_poll_fd(int fd, short events, int timeout);
 
 extern struct fio_net_cmd *fio_net_recv_cmd(int sk, bool wait);