first async IO code drop
authorchenh <hchen@redhat.com>
Wed, 2 Apr 2014 17:01:01 +0000 (13:01 -0400)
committerchenh <hchen@redhat.com>
Wed, 2 Apr 2014 17:01:01 +0000 (13:01 -0400)
Makefile
engines/gfapi.h [new file with mode: 0644]
engines/glusterfs.c
engines/glusterfs_async.c [new file with mode: 0644]
engines/glusterfs_sync.c [new file with mode: 0644]
fio.1
options.c

index a072d3a4d54bcb3b5f67f6c51bf33ce17f9ceec8..499f8e4999600996dddfa49be6471576f0b7617a 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -93,6 +93,8 @@ ifndef CONFIG_INET_ATON
 endif
 ifdef CONFIG_GFAPI
   SOURCE += engines/glusterfs.c
 endif
 ifdef CONFIG_GFAPI
   SOURCE += engines/glusterfs.c
+  SOURCE += engines/glusterfs_sync.c
+  SOURCE += engines/glusterfs_async.c
 endif
 
 ifeq ($(CONFIG_TARGET_OS), Linux)
 endif
 
 ifeq ($(CONFIG_TARGET_OS), Linux)
diff --git a/engines/gfapi.h b/engines/gfapi.h
new file mode 100644 (file)
index 0000000..d473815
--- /dev/null
@@ -0,0 +1,22 @@
+#include <glusterfs/api/glfs.h>
+#include <glusterfs/api/glfs-handles.h>
+#include "../fio.h"
+
+struct gf_options {
+    struct thread_data *td;
+    char *gf_vol;
+    char *gf_brick;
+};
+
+struct gf_data {
+    glfs_t *fs;
+    glfs_fd_t *fd;
+       struct io_u **aio_events;
+};
+
+extern struct fio_option gfapi_options[];
+extern int fio_gf_setup(struct thread_data *td);
+extern void fio_gf_cleanup(struct thread_data *td);
+extern int fio_gf_get_file_size(struct thread_data *td, struct fio_file *f);
+extern int fio_gf_open_file(struct thread_data *td, struct fio_file *f);
+extern int fio_gf_close_file(struct thread_data *td, struct fio_file *f);
index 2bd3e4e93c77621f2b2db94858fc14d9476fb494..e9132b79bc515cc6df70899bbeab5517e4a55045 100644 (file)
@@ -1,26 +1,13 @@
 /*
  * glusterfs engine
  *
 /*
  * glusterfs engine
  *
- * IO engine using Glusterfs's gfapi interface
+ * common Glusterfs's gfapi interface
  *
  */
 
  *
  */
 
-#include <glusterfs/api/glfs.h>
-#include <glusterfs/api/glfs-handles.h>
-#include "../fio.h"
+#include "gfapi.h"
 
 
-struct gf_options {
-    struct thread_data *td;
-    char *gf_vol;
-    char *gf_brick;
-};
-
-struct gf_data {
-    glfs_t *fs;
-    glfs_fd_t *fd;
-};
-
-static struct fio_option options[] = {
+struct fio_option gfapi_options[] = {
     {
         .name     = "volume",
         .lname    = "Glusterfs volume",
     {
         .name     = "volume",
         .lname    = "Glusterfs volume",
@@ -44,7 +31,7 @@ static struct fio_option options[] = {
     },
 };
 
     },
 };
 
-static int fio_gf_setup(struct thread_data *td)
+int fio_gf_setup(struct thread_data *td)
 {
        int r = 0;
        struct gf_data *g = NULL;
 {
        int r = 0;
        struct gf_data *g = NULL;
@@ -61,7 +48,7 @@ static int fio_gf_setup(struct thread_data *td)
            log_err("malloc failed.\n");
            return -ENOMEM;
        }
            log_err("malloc failed.\n");
            return -ENOMEM;
        }
-       g->fs = NULL; g->fd = NULL;
+       g->fs = NULL; g->fd = NULL; g->aio_events = NULL;
 
        g->fs = glfs_new (opt->gf_vol);
        if (!g->fs){
 
        g->fs = glfs_new (opt->gf_vol);
        if (!g->fs){
@@ -95,16 +82,29 @@ cleanup:
                 glfs_fini(g->fs);
             }
             free(g);
                 glfs_fini(g->fs);
             }
             free(g);
+            td->io_ops->data = NULL;
            }
        }
        return r;
 }
 
            }
        }
        return r;
 }
 
-static void fio_gf_cleanup(struct thread_data *td)
+void fio_gf_cleanup(struct thread_data *td)
 {
 {
+       struct gf_data *g = td->io_ops->data;
+
+       if (g) {
+        if (g->aio_events)
+            free(g->aio_events);
+        if (g->fd)
+            glfs_close(g->fd);
+        if (g->fs)
+            glfs_fini(g->fs);
+        free(g);
+        td->io_ops->data = NULL;
+       }
 }
 
 }
 
-static int fio_gf_get_file_size(struct thread_data *td, struct fio_file *f)
+int fio_gf_get_file_size(struct thread_data *td, struct fio_file *f)
 {
     struct stat buf;
     int ret;
 {
     struct stat buf;
     int ret;
@@ -132,7 +132,7 @@ static int fio_gf_get_file_size(struct thread_data *td, struct fio_file *f)
 
 }
 
 
 }
 
-static int fio_gf_open_file(struct thread_data *td, struct fio_file *f)
+int fio_gf_open_file(struct thread_data *td, struct fio_file *f)
 {
 
     int flags = 0;
 {
 
     int flags = 0;
@@ -230,108 +230,26 @@ static int fio_gf_open_file(struct thread_data *td, struct fio_file *f)
     return ret;
 }
 
     return ret;
 }
 
-static int fio_gf_close_file(struct thread_data *td, struct fio_file *f)
+int fio_gf_close_file(struct thread_data *td, struct fio_file *f)
 {
        int ret = 0;
        struct gf_data *g = td->io_ops->data;
 
        dprint(FD_FILE, "fd close %s\n", f->file_name);
 
 {
        int ret = 0;
        struct gf_data *g = td->io_ops->data;
 
        dprint(FD_FILE, "fd close %s\n", f->file_name);
 
-       if (g->fd && glfs_close(g->fd) < 0)
-           ret = errno;
+    if (g){
+        if (g->fd && glfs_close(g->fd) < 0)
+            ret = errno;
 
 
-       if (g->fs)
-           glfs_fini(g->fs);
+        if (g->fs)
+            glfs_fini(g->fs);
 
 
-       g->fd = NULL;
-       free(g);
+        g->fd = NULL;
+        free(g);
+    }
        td->io_ops->data = NULL;
        f->engine_data = 0;
 
        return ret;
 }
 
        td->io_ops->data = NULL;
        f->engine_data = 0;
 
        return ret;
 }
 
-#define LAST_POS(f)    ((f)->engine_data)
-static int fio_gf_prep(struct thread_data *td, struct io_u *io_u)
-{
-       struct fio_file *f = io_u->file;
-       struct gf_data *g = td->io_ops->data;
-
-       dprint(FD_FILE, "fio prep\n");
-
-       if (!ddir_rw(io_u->ddir))
-               return 0;
-
-       if (LAST_POS(f) != -1ULL && LAST_POS(f) == io_u->offset)
-               return 0;
-
-       if (glfs_lseek(g->fd, io_u->offset, SEEK_SET) < 0) {
-               td_verror(td, errno, "lseek");
-               return 1;
-       }
-
-       return 0;
-}
-
-static int fio_gf_queue(struct thread_data *td, struct io_u *io_u)
-{
-    struct gf_data *g = td->io_ops->data;
-    int ret = 0;
-
-    dprint(FD_FILE, "fio queue len %lu\n", io_u->xfer_buflen);
-    fio_ro_check(td, io_u);
-
-    if (io_u->ddir == DDIR_READ)
-        ret = glfs_read(g->fd, io_u->xfer_buf, io_u->xfer_buflen, 0);
-    else if (io_u->ddir == DDIR_WRITE)
-        ret = glfs_write(g->fd, io_u->xfer_buf, io_u->xfer_buflen, 0);
-    else {         
-        log_err("unsupported operation.\n");
-        return -EINVAL;
-    }
-    dprint(FD_FILE, "fio len %lu ret %d\n", io_u->xfer_buflen, ret);
-    if (io_u->file && ret >= 0 && ddir_rw(io_u->ddir))
-        LAST_POS(io_u->file) = io_u->offset + ret;
-
-    if (ret != (int) io_u->xfer_buflen) {
-        if (ret >= 0) {
-            io_u->resid = io_u->xfer_buflen - ret;
-            io_u->error = 0;
-            return FIO_Q_COMPLETED;
-        } else
-            io_u->error = errno;
-    }
-
-    if (io_u->error){
-        log_err("IO failed.\n");
-        td_verror(td, io_u->error, "xfer");
-    }
-
-    return FIO_Q_COMPLETED;
-
-}
-
-static struct ioengine_ops ioengine = {
-       .name               = "gfapi",
-       .version            = FIO_IOOPS_VERSION,
-       .init           = fio_gf_setup,
-       .cleanup        = fio_gf_cleanup,
-       .prep               = fio_gf_prep,
-       .queue              = fio_gf_queue,
-       .open_file          = fio_gf_open_file,
-       .close_file         = fio_gf_close_file,
-       .get_file_size  = fio_gf_get_file_size,
-       .options        = options,
-       .option_struct_size = sizeof(struct gf_options),
-       .flags              = FIO_SYNCIO | FIO_DISKLESSIO,
-};
-
-static void fio_init fio_gf_register(void)
-{
-    register_ioengine(&ioengine);
-}
-
-static void fio_exit fio_gf_unregister(void)
-{
-    unregister_ioengine(&ioengine);
-}
diff --git a/engines/glusterfs_async.c b/engines/glusterfs_async.c
new file mode 100644 (file)
index 0000000..8eee2a2
--- /dev/null
@@ -0,0 +1,171 @@
+/*
+ * glusterfs engine
+ *
+ * IO engine using Glusterfs's gfapi async interface
+ *
+ */
+#include "gfapi.h"
+
+struct fio_gf_iou {
+       struct io_u *io_u;
+       int io_complete;
+};
+
+static struct io_u *fio_gf_event(struct thread_data *td, int event)
+{
+       struct gf_data *gf_data = td->io_ops->data;
+       dprint(FD_IO, "%s\n", __FUNCTION__);
+       return gf_data->aio_events[event];
+}
+
+static int fio_gf_getevents(struct thread_data *td, unsigned int min,
+                            unsigned int max, struct timespec *t)
+{
+       struct gf_data *g = td->io_ops->data;
+       unsigned int events = 0;
+       struct io_u *io_u;
+       int i = 0;
+       struct fio_gf_iou *io = NULL;
+
+       dprint(FD_IO, "%s\n", __FUNCTION__);
+       do {
+               io_u_qiter(&td->io_u_all, io_u, i) {
+                       if (!(io_u->flags & IO_U_F_FLIGHT))
+                               continue;
+
+                       io = (struct fio_gf_iou *)io_u->engine_data;
+
+                       if (io && io->io_complete) {
+                               io->io_complete = 0;
+                               g->aio_events[events] = io_u;
+                               events++;
+                       }
+
+               }
+               if (events < min)
+                       usleep(100);
+               else
+                       break;
+
+       } while (1);
+
+       return events;
+}
+
+#define LAST_POS(f)    ((f)->engine_data)
+static int fio_gf_prep(struct thread_data *td, struct io_u *io_u)
+{
+       struct fio_file *f = io_u->file;
+       struct gf_data *g = td->io_ops->data;
+       struct fio_gf_iou *io = NULL;
+
+       dprint(FD_FILE, "fio prep\n");
+
+       io = malloc(sizeof(struct fio_gf_iou));
+    if (!io){
+               td_verror(td, errno, "malloc");
+               return 1;
+    }
+       io->io_complete = 0;
+       io->io_u = io_u;
+       io_u->engine_data = io;
+
+       g->aio_events = malloc(td->o.iodepth * sizeof(struct io_u *));
+       if (!g->aio_events){
+               td_verror(td, errno, "malloc");
+        free(io);
+        return 1;
+    }
+
+       memset(g->aio_events, 0, td->o.iodepth * sizeof(struct io_u *));
+
+       if (!ddir_rw(io_u->ddir))
+               return 0;
+
+       if (LAST_POS(f) != -1ULL && LAST_POS(f) == io_u->offset)
+               return 0;
+
+       if (glfs_lseek(g->fd, io_u->offset, SEEK_SET) < 0) {
+               td_verror(td, errno, "lseek");
+               return 1;
+       }
+       io = malloc(sizeof(struct fio_gf_iou));
+    if (!io){
+               td_verror(td, errno, "malloc");
+        return 1;
+    }
+
+       return 0;
+}
+
+static void gf_async_cb(glfs_fd_t *fd, ssize_t ret, void *data) 
+{
+       struct io_u *io_u = (struct io_u *)data;
+       struct fio_gf_iou *iou =
+           (struct fio_gf_iou *)io_u->engine_data;
+
+    dprint(FD_IO, "%s ret %lu\n", __FUNCTION__, ret);    
+    iou->io_complete = 1;
+}
+
+static int fio_gf_async_queue(struct thread_data fio_unused *td, struct io_u *io_u)
+{
+       struct gf_data *g = td->io_ops->data;
+    int r = 0;
+
+       fio_ro_check(td, io_u);
+
+       if (io_u->ddir == DDIR_READ)
+               r = glfs_pread_async(g->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset,
+                         0, gf_async_cb, (void *)io_u);
+       else if (io_u->ddir == DDIR_WRITE)
+               r = glfs_pread_async(g->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset,
+                         0, gf_async_cb, (void *)io_u);
+    else if (io_u->ddir == DDIR_SYNC) {
+        r = glfs_fsync_async(g->fd, gf_async_cb, (void *)io_u);
+    }else {        
+        log_err("unsupported operation.\n");
+        io_u->error = -EINVAL;
+        goto failed;
+    }
+    if (r){
+        log_err("glfs failed.\n");
+        io_u->error = r;
+        goto failed;
+    }
+
+       return FIO_Q_QUEUED;
+
+failed:
+       io_u->error = r;
+       td_verror(td, io_u->error, "xfer");
+       return FIO_Q_COMPLETED;
+}
+
+
+static struct ioengine_ops ioengine = {
+       .name               = "gfapi_async",
+       .version            = FIO_IOOPS_VERSION,
+       .init           = fio_gf_setup,
+       .cleanup        = fio_gf_cleanup,
+    .prep           = fio_gf_prep,
+       .queue              = fio_gf_async_queue,
+       .open_file          = fio_gf_open_file,
+       .close_file         = fio_gf_close_file,
+       .get_file_size  = fio_gf_get_file_size,
+       .getevents      = fio_gf_getevents,
+       .event          = fio_gf_event,
+       .options        = gfapi_options,
+       .option_struct_size = sizeof(struct gf_options),
+       .flags              = FIO_DISKLESSIO,
+};
+
+static void fio_init fio_gf_register(void)
+{
+    register_ioengine(&ioengine);
+}
+
+static void fio_exit fio_gf_unregister(void)
+{
+    unregister_ioengine(&ioengine);
+}
diff --git a/engines/glusterfs_sync.c b/engines/glusterfs_sync.c
new file mode 100644 (file)
index 0000000..cff6427
--- /dev/null
@@ -0,0 +1,93 @@
+/*
+ * glusterfs engine
+ *
+ * IO engine using Glusterfs's gfapi sync interface
+ *
+ */
+
+#include "gfapi.h"
+
+#define LAST_POS(f)    ((f)->engine_data)
+static int fio_gf_prep(struct thread_data *td, struct io_u *io_u)
+{
+       struct fio_file *f = io_u->file;
+       struct gf_data *g = td->io_ops->data;
+
+       dprint(FD_FILE, "fio prep\n");
+
+       if (!ddir_rw(io_u->ddir))
+               return 0;
+
+       if (LAST_POS(f) != -1ULL && LAST_POS(f) == io_u->offset)
+               return 0;
+
+       if (glfs_lseek(g->fd, io_u->offset, SEEK_SET) < 0) {
+               td_verror(td, errno, "lseek");
+               return 1;
+       }
+
+       return 0;
+}
+
+static int fio_gf_queue(struct thread_data *td, struct io_u *io_u)
+{
+    struct gf_data *g = td->io_ops->data;
+    int ret = 0;
+
+    dprint(FD_FILE, "fio queue len %lu\n", io_u->xfer_buflen);
+    fio_ro_check(td, io_u);
+
+    if (io_u->ddir == DDIR_READ)
+        ret = glfs_read(g->fd, io_u->xfer_buf, io_u->xfer_buflen, 0);
+    else if (io_u->ddir == DDIR_WRITE)
+        ret = glfs_write(g->fd, io_u->xfer_buf, io_u->xfer_buflen, 0);
+    else {         
+        log_err("unsupported operation.\n");
+        return -EINVAL;
+    }
+    dprint(FD_FILE, "fio len %lu ret %d\n", io_u->xfer_buflen, ret);
+    if (io_u->file && ret >= 0 && ddir_rw(io_u->ddir))
+        LAST_POS(io_u->file) = io_u->offset + ret;
+
+    if (ret != (int) io_u->xfer_buflen) {
+        if (ret >= 0) {
+            io_u->resid = io_u->xfer_buflen - ret;
+            io_u->error = 0;
+            return FIO_Q_COMPLETED;
+        } else
+            io_u->error = errno;
+    }
+
+    if (io_u->error){
+        log_err("IO failed.\n");
+        td_verror(td, io_u->error, "xfer");
+    }
+
+    return FIO_Q_COMPLETED;
+
+}
+
+static struct ioengine_ops ioengine = {
+       .name               = "gfapi",
+       .version            = FIO_IOOPS_VERSION,
+       .init           = fio_gf_setup,
+       .cleanup        = fio_gf_cleanup,
+       .prep               = fio_gf_prep,
+       .queue              = fio_gf_queue,
+       .open_file          = fio_gf_open_file,
+       .close_file         = fio_gf_close_file,
+       .get_file_size  = fio_gf_get_file_size,
+       .options        = gfapi_options,
+       .option_struct_size = sizeof(struct gf_options),
+       .flags              = FIO_SYNCIO | FIO_DISKLESSIO,
+};
+
+static void fio_init fio_gf_register(void)
+{
+    register_ioengine(&ioengine);
+}
+
+static void fio_exit fio_gf_unregister(void)
+{
+    unregister_ioengine(&ioengine);
+}
diff --git a/fio.1 b/fio.1
index d6198b77e5952413bb626b35fc14f78d1f181566..a23258578a49cff73123a7abf41281d5118c0553 100644 (file)
--- a/fio.1
+++ b/fio.1
@@ -596,7 +596,12 @@ without the need to use the kernel rbd driver. This ioengine defines engine spec
 options.
 .TP
 .B gfapi
 options.
 .TP
 .B gfapi
-Using Glusterfs libgfapi to direct access to Glusterfs volumes without
+Using Glusterfs libgfapi sync interface to direct access to Glusterfs volumes without
+having to go through FUSE. This ioengine defines engine specific
+options.
+.TP
+.B gfapi_async
+Using Glusterfs libgfapi async interface to direct access to Glusterfs volumes without
 having to go through FUSE. This ioengine defines engine specific
 options.
 .RE
 having to go through FUSE. This ioengine defines engine specific
 options.
 .RE
index 070d00f77f6977bdbb7e5ef71aeff8547cc5f600..c81cd73c30201b75ba1237f26a7970d436073628 100644 (file)
--- a/options.c
+++ b/options.c
@@ -1525,7 +1525,10 @@ struct fio_option fio_options[FIO_MAX_OPTS] = {
 #endif
 #ifdef CONFIG_GFAPI
                          { .ival = "gfapi",
 #endif
 #ifdef CONFIG_GFAPI
                          { .ival = "gfapi",
-                           .help = "Glusterfs libgfapi based engine"
+                           .help = "Glusterfs libgfapi(sync) based engine"
+                         },
+                         { .ival = "gfapi_async",
+                           .help = "Glusterfs libgfapi(async) based engine"
                          },
 #endif
 
                          },
 #endif