first async IO code drop
authorchenh <hchen@redhat.com>
Wed, 2 Apr 2014 17:01:01 +0000 (13:01 -0400)
committerchenh <hchen@redhat.com>
Wed, 2 Apr 2014 17:01:01 +0000 (13:01 -0400)
Makefile
engines/gfapi.h [new file with mode: 0644]
engines/glusterfs.c
engines/glusterfs_async.c [new file with mode: 0644]
engines/glusterfs_sync.c [new file with mode: 0644]
fio.1
options.c

index a072d3a4d54bcb3b5f67f6c51bf33ce17f9ceec8..499f8e4999600996dddfa49be6471576f0b7617a 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -93,6 +93,8 @@ ifndef CONFIG_INET_ATON
 endif
 ifdef CONFIG_GFAPI
   SOURCE += engines/glusterfs.c
+  SOURCE += engines/glusterfs_sync.c
+  SOURCE += engines/glusterfs_async.c
 endif
 
 ifeq ($(CONFIG_TARGET_OS), Linux)
diff --git a/engines/gfapi.h b/engines/gfapi.h
new file mode 100644 (file)
index 0000000..d473815
--- /dev/null
@@ -0,0 +1,22 @@
+#include <glusterfs/api/glfs.h>
+#include <glusterfs/api/glfs-handles.h>
+#include "../fio.h"
+
+struct gf_options {
+    struct thread_data *td;
+    char *gf_vol;
+    char *gf_brick;
+};
+
+struct gf_data {
+    glfs_t *fs;
+    glfs_fd_t *fd;
+       struct io_u **aio_events;
+};
+
+extern struct fio_option gfapi_options[];
+extern int fio_gf_setup(struct thread_data *td);
+extern void fio_gf_cleanup(struct thread_data *td);
+extern int fio_gf_get_file_size(struct thread_data *td, struct fio_file *f);
+extern int fio_gf_open_file(struct thread_data *td, struct fio_file *f);
+extern int fio_gf_close_file(struct thread_data *td, struct fio_file *f);
index 2bd3e4e93c77621f2b2db94858fc14d9476fb494..e9132b79bc515cc6df70899bbeab5517e4a55045 100644 (file)
@@ -1,26 +1,13 @@
 /*
  * glusterfs engine
  *
- * IO engine using Glusterfs's gfapi interface
+ * common Glusterfs's gfapi interface
  *
  */
 
-#include <glusterfs/api/glfs.h>
-#include <glusterfs/api/glfs-handles.h>
-#include "../fio.h"
+#include "gfapi.h"
 
-struct gf_options {
-    struct thread_data *td;
-    char *gf_vol;
-    char *gf_brick;
-};
-
-struct gf_data {
-    glfs_t *fs;
-    glfs_fd_t *fd;
-};
-
-static struct fio_option options[] = {
+struct fio_option gfapi_options[] = {
     {
         .name     = "volume",
         .lname    = "Glusterfs volume",
@@ -44,7 +31,7 @@ static struct fio_option options[] = {
     },
 };
 
-static int fio_gf_setup(struct thread_data *td)
+int fio_gf_setup(struct thread_data *td)
 {
        int r = 0;
        struct gf_data *g = NULL;
@@ -61,7 +48,7 @@ static int fio_gf_setup(struct thread_data *td)
            log_err("malloc failed.\n");
            return -ENOMEM;
        }
-       g->fs = NULL; g->fd = NULL;
+       g->fs = NULL; g->fd = NULL; g->aio_events = NULL;
 
        g->fs = glfs_new (opt->gf_vol);
        if (!g->fs){
@@ -95,16 +82,29 @@ cleanup:
                 glfs_fini(g->fs);
             }
             free(g);
+            td->io_ops->data = NULL;
            }
        }
        return r;
 }
 
-static void fio_gf_cleanup(struct thread_data *td)
+void fio_gf_cleanup(struct thread_data *td)
 {
+       struct gf_data *g = td->io_ops->data;
+
+       if (g) {
+        if (g->aio_events)
+            free(g->aio_events);
+        if (g->fd)
+            glfs_close(g->fd);
+        if (g->fs)
+            glfs_fini(g->fs);
+        free(g);
+        td->io_ops->data = NULL;
+       }
 }
 
-static int fio_gf_get_file_size(struct thread_data *td, struct fio_file *f)
+int fio_gf_get_file_size(struct thread_data *td, struct fio_file *f)
 {
     struct stat buf;
     int ret;
@@ -132,7 +132,7 @@ static int fio_gf_get_file_size(struct thread_data *td, struct fio_file *f)
 
 }
 
-static int fio_gf_open_file(struct thread_data *td, struct fio_file *f)
+int fio_gf_open_file(struct thread_data *td, struct fio_file *f)
 {
 
     int flags = 0;
@@ -230,108 +230,26 @@ static int fio_gf_open_file(struct thread_data *td, struct fio_file *f)
     return ret;
 }
 
-static int fio_gf_close_file(struct thread_data *td, struct fio_file *f)
+int fio_gf_close_file(struct thread_data *td, struct fio_file *f)
 {
        int ret = 0;
        struct gf_data *g = td->io_ops->data;
 
        dprint(FD_FILE, "fd close %s\n", f->file_name);
 
-       if (g->fd && glfs_close(g->fd) < 0)
-           ret = errno;
+    if (g){
+        if (g->fd && glfs_close(g->fd) < 0)
+            ret = errno;
 
-       if (g->fs)
-           glfs_fini(g->fs);
+        if (g->fs)
+            glfs_fini(g->fs);
 
-       g->fd = NULL;
-       free(g);
+        g->fd = NULL;
+        free(g);
+    }
        td->io_ops->data = NULL;
        f->engine_data = 0;
 
        return ret;
 }
 
-#define LAST_POS(f)    ((f)->engine_data)
-static int fio_gf_prep(struct thread_data *td, struct io_u *io_u)
-{
-       struct fio_file *f = io_u->file;
-       struct gf_data *g = td->io_ops->data;
-
-       dprint(FD_FILE, "fio prep\n");
-
-       if (!ddir_rw(io_u->ddir))
-               return 0;
-
-       if (LAST_POS(f) != -1ULL && LAST_POS(f) == io_u->offset)
-               return 0;
-
-       if (glfs_lseek(g->fd, io_u->offset, SEEK_SET) < 0) {
-               td_verror(td, errno, "lseek");
-               return 1;
-       }
-
-       return 0;
-}
-
-static int fio_gf_queue(struct thread_data *td, struct io_u *io_u)
-{
-    struct gf_data *g = td->io_ops->data;
-    int ret = 0;
-
-    dprint(FD_FILE, "fio queue len %lu\n", io_u->xfer_buflen);
-    fio_ro_check(td, io_u);
-
-    if (io_u->ddir == DDIR_READ)
-        ret = glfs_read(g->fd, io_u->xfer_buf, io_u->xfer_buflen, 0);
-    else if (io_u->ddir == DDIR_WRITE)
-        ret = glfs_write(g->fd, io_u->xfer_buf, io_u->xfer_buflen, 0);
-    else {         
-        log_err("unsupported operation.\n");
-        return -EINVAL;
-    }
-    dprint(FD_FILE, "fio len %lu ret %d\n", io_u->xfer_buflen, ret);
-    if (io_u->file && ret >= 0 && ddir_rw(io_u->ddir))
-        LAST_POS(io_u->file) = io_u->offset + ret;
-
-    if (ret != (int) io_u->xfer_buflen) {
-        if (ret >= 0) {
-            io_u->resid = io_u->xfer_buflen - ret;
-            io_u->error = 0;
-            return FIO_Q_COMPLETED;
-        } else
-            io_u->error = errno;
-    }
-
-    if (io_u->error){
-        log_err("IO failed.\n");
-        td_verror(td, io_u->error, "xfer");
-    }
-
-    return FIO_Q_COMPLETED;
-
-}
-
-static struct ioengine_ops ioengine = {
-       .name               = "gfapi",
-       .version            = FIO_IOOPS_VERSION,
-       .init           = fio_gf_setup,
-       .cleanup        = fio_gf_cleanup,
-       .prep               = fio_gf_prep,
-       .queue              = fio_gf_queue,
-       .open_file          = fio_gf_open_file,
-       .close_file         = fio_gf_close_file,
-       .get_file_size  = fio_gf_get_file_size,
-       .options        = options,
-       .option_struct_size = sizeof(struct gf_options),
-       .flags              = FIO_SYNCIO | FIO_DISKLESSIO,
-};
-
-static void fio_init fio_gf_register(void)
-{
-    register_ioengine(&ioengine);
-}
-
-static void fio_exit fio_gf_unregister(void)
-{
-    unregister_ioengine(&ioengine);
-}
diff --git a/engines/glusterfs_async.c b/engines/glusterfs_async.c
new file mode 100644 (file)
index 0000000..8eee2a2
--- /dev/null
@@ -0,0 +1,171 @@
+/*
+ * glusterfs engine
+ *
+ * IO engine using Glusterfs's gfapi async interface
+ *
+ */
+#include "gfapi.h"
+
+struct fio_gf_iou {
+       struct io_u *io_u;
+       int io_complete;
+};
+
+static struct io_u *fio_gf_event(struct thread_data *td, int event)
+{
+       struct gf_data *gf_data = td->io_ops->data;
+       dprint(FD_IO, "%s\n", __FUNCTION__);
+       return gf_data->aio_events[event];
+}
+
+static int fio_gf_getevents(struct thread_data *td, unsigned int min,
+                            unsigned int max, struct timespec *t)
+{
+       struct gf_data *g = td->io_ops->data;
+       unsigned int events = 0;
+       struct io_u *io_u;
+       int i = 0;
+       struct fio_gf_iou *io = NULL;
+
+       dprint(FD_IO, "%s\n", __FUNCTION__);
+       do {
+               io_u_qiter(&td->io_u_all, io_u, i) {
+                       if (!(io_u->flags & IO_U_F_FLIGHT))
+                               continue;
+
+                       io = (struct fio_gf_iou *)io_u->engine_data;
+
+                       if (io && io->io_complete) {
+                               io->io_complete = 0;
+                               g->aio_events[events] = io_u;
+                               events++;
+                       }
+
+               }
+               if (events < min)
+                       usleep(100);
+               else
+                       break;
+
+       } while (1);
+
+       return events;
+}
+
+#define LAST_POS(f)    ((f)->engine_data)
+static int fio_gf_prep(struct thread_data *td, struct io_u *io_u)
+{
+       struct fio_file *f = io_u->file;
+       struct gf_data *g = td->io_ops->data;
+       struct fio_gf_iou *io = NULL;
+
+       dprint(FD_FILE, "fio prep\n");
+
+       io = malloc(sizeof(struct fio_gf_iou));
+    if (!io){
+               td_verror(td, errno, "malloc");
+               return 1;
+    }
+       io->io_complete = 0;
+       io->io_u = io_u;
+       io_u->engine_data = io;
+
+       g->aio_events = malloc(td->o.iodepth * sizeof(struct io_u *));
+       if (!g->aio_events){
+               td_verror(td, errno, "malloc");
+        free(io);
+        return 1;
+    }
+
+       memset(g->aio_events, 0, td->o.iodepth * sizeof(struct io_u *));
+
+       if (!ddir_rw(io_u->ddir))
+               return 0;
+
+       if (LAST_POS(f) != -1ULL && LAST_POS(f) == io_u->offset)
+               return 0;
+
+       if (glfs_lseek(g->fd, io_u->offset, SEEK_SET) < 0) {
+               td_verror(td, errno, "lseek");
+               return 1;
+       }
+       io = malloc(sizeof(struct fio_gf_iou));
+    if (!io){
+               td_verror(td, errno, "malloc");
+        return 1;
+    }
+
+       return 0;
+}
+
+static void gf_async_cb(glfs_fd_t *fd, ssize_t ret, void *data) 
+{
+       struct io_u *io_u = (struct io_u *)data;
+       struct fio_gf_iou *iou =
+           (struct fio_gf_iou *)io_u->engine_data;
+
+    dprint(FD_IO, "%s ret %lu\n", __FUNCTION__, ret);    
+    iou->io_complete = 1;
+}
+
+static int fio_gf_async_queue(struct thread_data fio_unused *td, struct io_u *io_u)
+{
+       struct gf_data *g = td->io_ops->data;
+    int r = 0;
+
+       fio_ro_check(td, io_u);
+
+       if (io_u->ddir == DDIR_READ)
+               r = glfs_pread_async(g->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset,
+                         0, gf_async_cb, (void *)io_u);
+       else if (io_u->ddir == DDIR_WRITE)
+               r = glfs_pread_async(g->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset,
+                         0, gf_async_cb, (void *)io_u);
+    else if (io_u->ddir == DDIR_SYNC) {
+        r = glfs_fsync_async(g->fd, gf_async_cb, (void *)io_u);
+    }else {        
+        log_err("unsupported operation.\n");
+        io_u->error = -EINVAL;
+        goto failed;
+    }
+    if (r){
+        log_err("glfs failed.\n");
+        io_u->error = r;
+        goto failed;
+    }
+
+       return FIO_Q_QUEUED;
+
+failed:
+       io_u->error = r;
+       td_verror(td, io_u->error, "xfer");
+       return FIO_Q_COMPLETED;
+}
+
+
+static struct ioengine_ops ioengine = {
+       .name               = "gfapi_async",
+       .version            = FIO_IOOPS_VERSION,
+       .init           = fio_gf_setup,
+       .cleanup        = fio_gf_cleanup,
+    .prep           = fio_gf_prep,
+       .queue              = fio_gf_async_queue,
+       .open_file          = fio_gf_open_file,
+       .close_file         = fio_gf_close_file,
+       .get_file_size  = fio_gf_get_file_size,
+       .getevents      = fio_gf_getevents,
+       .event          = fio_gf_event,
+       .options        = gfapi_options,
+       .option_struct_size = sizeof(struct gf_options),
+       .flags              = FIO_DISKLESSIO,
+};
+
+static void fio_init fio_gf_register(void)
+{
+    register_ioengine(&ioengine);
+}
+
+static void fio_exit fio_gf_unregister(void)
+{
+    unregister_ioengine(&ioengine);
+}
diff --git a/engines/glusterfs_sync.c b/engines/glusterfs_sync.c
new file mode 100644 (file)
index 0000000..cff6427
--- /dev/null
@@ -0,0 +1,93 @@
+/*
+ * glusterfs engine
+ *
+ * IO engine using Glusterfs's gfapi sync interface
+ *
+ */
+
+#include "gfapi.h"
+
+#define LAST_POS(f)    ((f)->engine_data)
+static int fio_gf_prep(struct thread_data *td, struct io_u *io_u)
+{
+       struct fio_file *f = io_u->file;
+       struct gf_data *g = td->io_ops->data;
+
+       dprint(FD_FILE, "fio prep\n");
+
+       if (!ddir_rw(io_u->ddir))
+               return 0;
+
+       if (LAST_POS(f) != -1ULL && LAST_POS(f) == io_u->offset)
+               return 0;
+
+       if (glfs_lseek(g->fd, io_u->offset, SEEK_SET) < 0) {
+               td_verror(td, errno, "lseek");
+               return 1;
+       }
+
+       return 0;
+}
+
+static int fio_gf_queue(struct thread_data *td, struct io_u *io_u)
+{
+    struct gf_data *g = td->io_ops->data;
+    int ret = 0;
+
+    dprint(FD_FILE, "fio queue len %lu\n", io_u->xfer_buflen);
+    fio_ro_check(td, io_u);
+
+    if (io_u->ddir == DDIR_READ)
+        ret = glfs_read(g->fd, io_u->xfer_buf, io_u->xfer_buflen, 0);
+    else if (io_u->ddir == DDIR_WRITE)
+        ret = glfs_write(g->fd, io_u->xfer_buf, io_u->xfer_buflen, 0);
+    else {         
+        log_err("unsupported operation.\n");
+        return -EINVAL;
+    }
+    dprint(FD_FILE, "fio len %lu ret %d\n", io_u->xfer_buflen, ret);
+    if (io_u->file && ret >= 0 && ddir_rw(io_u->ddir))
+        LAST_POS(io_u->file) = io_u->offset + ret;
+
+    if (ret != (int) io_u->xfer_buflen) {
+        if (ret >= 0) {
+            io_u->resid = io_u->xfer_buflen - ret;
+            io_u->error = 0;
+            return FIO_Q_COMPLETED;
+        } else
+            io_u->error = errno;
+    }
+
+    if (io_u->error){
+        log_err("IO failed.\n");
+        td_verror(td, io_u->error, "xfer");
+    }
+
+    return FIO_Q_COMPLETED;
+
+}
+
+static struct ioengine_ops ioengine = {
+       .name               = "gfapi",
+       .version            = FIO_IOOPS_VERSION,
+       .init           = fio_gf_setup,
+       .cleanup        = fio_gf_cleanup,
+       .prep               = fio_gf_prep,
+       .queue              = fio_gf_queue,
+       .open_file          = fio_gf_open_file,
+       .close_file         = fio_gf_close_file,
+       .get_file_size  = fio_gf_get_file_size,
+       .options        = gfapi_options,
+       .option_struct_size = sizeof(struct gf_options),
+       .flags              = FIO_SYNCIO | FIO_DISKLESSIO,
+};
+
+static void fio_init fio_gf_register(void)
+{
+    register_ioengine(&ioengine);
+}
+
+static void fio_exit fio_gf_unregister(void)
+{
+    unregister_ioengine(&ioengine);
+}
diff --git a/fio.1 b/fio.1
index d6198b77e5952413bb626b35fc14f78d1f181566..a23258578a49cff73123a7abf41281d5118c0553 100644 (file)
--- a/fio.1
+++ b/fio.1
@@ -596,7 +596,12 @@ without the need to use the kernel rbd driver. This ioengine defines engine spec
 options.
 .TP
 .B gfapi
-Using Glusterfs libgfapi to direct access to Glusterfs volumes without
+Using Glusterfs libgfapi sync interface to direct access to Glusterfs volumes without
+having to go through FUSE. This ioengine defines engine specific
+options.
+.TP
+.B gfapi_async
+Using Glusterfs libgfapi async interface to direct access to Glusterfs volumes without
 having to go through FUSE. This ioengine defines engine specific
 options.
 .RE
index 070d00f77f6977bdbb7e5ef71aeff8547cc5f600..c81cd73c30201b75ba1237f26a7970d436073628 100644 (file)
--- a/options.c
+++ b/options.c
@@ -1525,7 +1525,10 @@ struct fio_option fio_options[FIO_MAX_OPTS] = {
 #endif
 #ifdef CONFIG_GFAPI
                          { .ival = "gfapi",
-                           .help = "Glusterfs libgfapi based engine"
+                           .help = "Glusterfs libgfapi(sync) based engine"
+                         },
+                         { .ival = "gfapi_async",
+                           .help = "Glusterfs libgfapi(async) based engine"
                          },
 #endif