Add support for DDN's Infinite Memory Engine
authorGaëtan Bossu <gbossu@ddn.com>
Wed, 1 Aug 2018 14:27:37 +0000 (16:27 +0200)
committerGaëtan Bossu <gbossu@ddn.com>
Fri, 17 Aug 2018 19:26:51 +0000 (21:26 +0200)
Created 3 engines in engines/ime.c:
ime_psync, ime_psyncv, and ime_aio

Signed-off-by: Gaëtan Bossu <gbossu@ddn.com>
HOWTO
Makefile
configure
engines/ime.c [new file with mode: 0644]
examples/ime.fio [new file with mode: 0644]
options.c

diff --git a/HOWTO b/HOWTO
index 16c5ae3..c3dd964 100644 (file)
--- a/HOWTO
+++ b/HOWTO
@@ -1882,6 +1882,22 @@ I/O engine
                        mounted with DAX on a persistent memory device through the PMDK
                        libpmem library.
 
+               **ime_psync**
+                       Synchronous read and write using DDN's Infinite Memory Engine (IME).
+                       This engine is very basic and issues calls to IME whenever an IO is
+                       queued.
+
+               **ime_psyncv**
+                       Synchronous read and write using DDN's Infinite Memory Engine (IME).
+                       This engine uses iovecs and will try to stack as much IOs as possible
+                       (if the IOs are "contiguous" and the IO depth is not exceeded)
+                       before issuing a call to IME.
+
+               **ime_aio**
+                       Asynchronous read and write using DDN's Infinite Memory Engine (IME).
+                       This engine will try to stack as much IOs as possible by creating
+                       requests for IME. FIO will then decide when to commit these requests.
+
 I/O engine specific parameters
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
index 20d3ec1..dc28ece 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -142,6 +142,9 @@ endif
 ifdef CONFIG_LIBPMEM
   SOURCE += engines/libpmem.c
 endif
+ifdef CONFIG_IME
+  SOURCE += engines/ime.c
+endif
 
 ifeq ($(CONFIG_TARGET_OS), Linux)
   SOURCE += diskutil.c fifo.c blktrace.c cgroup.c trim.c engines/sg.c \
index 9bdc7a1..29ab39b 100755 (executable)
--- a/configure
+++ b/configure
@@ -199,6 +199,8 @@ for opt do
   ;;
   --disable-native) disable_native="yes"
   ;;
+  --with-ime=*) ime_path="$optarg"
+  ;;
   --help)
     show_help="yes"
     ;;
@@ -230,6 +232,7 @@ if test "$show_help" = "yes" ; then
   echo "--disable-optimizations Don't enable compiler optimizations"
   echo "--enable-cuda           Enable GPUDirect RDMA support"
   echo "--disable-native        Don't build for native host"
+  echo "--with-ime=             Install path for DDN's Infinite Memory Engine"
   exit $exit_val
 fi
 
@@ -1904,6 +1907,29 @@ print_config "PMDK dev-dax engine" "$devdax"
 # Report whether libpmem engine is enabled
 print_config "PMDK libpmem engine" "$pmem"
 
+##########################################
+# Check whether we support DDN's IME
+if test "$libime" != "yes" ; then
+  libime="no"
+fi
+cat > $TMPC << EOF
+#include <ime_native.h>
+int main(int argc, char **argv)
+{
+  int rc;
+  ime_native_init();
+  rc = ime_native_finalize();
+  return 0;
+}
+EOF
+if compile_prog "-I${ime_path}/include" "-L${ime_path}/lib -lim_client" "libime"; then
+  libime="yes"
+  CFLAGS="-I${ime_path}/include $CFLAGS"
+  LDFLAGS="-Wl,-rpath ${ime_path}/lib -L${ime_path}/lib $LDFLAGS"
+  LIBS="-lim_client $LIBS"
+fi
+print_config "DDN's Infinite Memory Engine" "$libime"
+
 ##########################################
 # Check if we have lex/yacc available
 yacc="no"
@@ -2394,6 +2420,9 @@ fi
 if test "$pmem" = "yes" ; then
   output_sym "CONFIG_LIBPMEM"
 fi
+if test "$libime" = "yes" ; then
+  output_sym "CONFIG_IME"
+fi
 if test "$arith" = "yes" ; then
   output_sym "CONFIG_ARITHMETIC"
   if test "$yacc_is_bison" = "yes" ; then
diff --git a/engines/ime.c b/engines/ime.c
new file mode 100644 (file)
index 0000000..4eabcbc
--- /dev/null
@@ -0,0 +1,899 @@
+/*
+ * FIO engines for DDN's Infinite Memory Engine.
+ * This file defines 3 engines: ime_psync, ime_psyncv, and ime_aio
+ *
+ * Copyright (C) 2018      DataDirect Networks. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation..
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+/*
+ * Some details about the new engines are given below:
+ *
+ *
+ * ime_psync:
+ * Most basic engine that issues calls to ime_native whenever an IO is queued.
+ *
+ * ime_psyncv:
+ * This engine tries to queue the IOs (by creating iovecs) if asked by FIO (via
+ * iodepth_batch). It refuses to queue when the iovecs can't be appended, and
+ * waits for FIO to issue a commit. After a call to commit and get_events, new
+ * IOs can be queued.
+ *
+ * ime_aio:
+ * This engine tries to queue the IOs (by creating iovecs) if asked by FIO (via
+ * iodepth_batch). When the iovecs can't be appended to the current request, a
+ * new request for IME is created. These requests will be issued to IME when
+ * commit is called. Contrary to ime_psyncv, there can be several requests at
+ * once. We don't need to wait for a request to terminate before creating a new
+ * one.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <linux/limits.h>
+#include <ime_native.h>
+
+#include "../fio.h"
+
+
+/**************************************************************
+ *              Types and constants definitions
+ *
+ **************************************************************/
+
+/* define constants for async IOs */
+#define FIO_IME_IN_PROGRESS -1
+#define FIO_IME_REQ_ERROR   -2
+
+/* This flag is used when some jobs were created using threads. In that
+   case, IME can't be finalized in the engine-specific cleanup function,
+   because other threads might still use IME. Instead, IME is finalized
+   in the destructor (see fio_ime_unregister), only when the flag
+   fio_ime_is_initialized is true (which means at least one thread has
+   initialized IME). */
+static bool fio_ime_is_initialized = false;
+
+struct imesio_req {
+       int                     fd;             /* File descriptor */
+       enum fio_ddir   ddir;   /* Type of IO (read or write) */
+       off_t                   offset; /* File offset */
+};
+struct imeaio_req {
+       struct ime_aiocb        iocb;                   /* IME aio request */
+       ssize_t                 status;                 /* Status of the IME request */
+       enum fio_ddir           ddir;                   /* Type of IO (read or write) */
+       pthread_cond_t          cond_endio;             /* Condition var to notify FIO */
+       pthread_mutex_t         status_mutex;   /* Mutex for cond_endio */
+};
+
+/* This structure will be used for 2 engines: ime_psyncv and ime_aio */
+struct ime_data {
+       union {
+               struct imeaio_req       *aioreqs;       /* array of aio requests */
+               struct imesio_req       *sioreq;        /* pointer to the only syncio request */
+       };
+       struct iovec    *iovecs;                /* array of queued iovecs */
+       struct io_u     **io_us;                /* array of queued io_u pointers */
+       struct io_u     **event_io_us;  /* array of the events retieved afer get_events*/
+       unsigned int    queued;                 /* iovecs/io_us in the queue */
+       unsigned int    events;                 /* number of committed iovecs/io_us */
+
+       /* variables used to implement a "ring" queue */
+       unsigned int depth;                     /* max entries in the queue */
+       unsigned int head;                      /* index used to append */
+       unsigned int tail;                      /* index used to pop */
+       unsigned int cur_commit;        /* index of the first uncommitted req */
+
+       /* offset used by the last iovec (used to check if the iovecs can be appended)*/
+       unsigned long long      last_offset;
+
+       /* The variables below are used for aio only */
+       struct imeaio_req       *last_req; /* last request awaiting committing */
+};
+
+
+/**************************************************************
+ *         Private functions for queueing/unqueueing
+ *
+ **************************************************************/
+
+static void fio_ime_queue_incr (struct ime_data *ime_d)
+{
+       ime_d->head = (ime_d->head + 1) % ime_d->depth;
+       ime_d->queued++;
+}
+static void fio_ime_queue_red (struct ime_data *ime_d)
+{
+       ime_d->tail = (ime_d->tail + 1) % ime_d->depth;
+       ime_d->queued--;
+       ime_d->events--;
+}
+static void fio_ime_queue_commit (struct ime_data *ime_d, int iovcnt)
+{
+       ime_d->cur_commit = (ime_d->cur_commit + iovcnt) % ime_d->depth;
+       ime_d->events += iovcnt;
+}
+static void fio_ime_queue_reset (struct ime_data *ime_d)
+{
+       ime_d->head = 0;
+       ime_d->tail = 0;
+       ime_d->cur_commit = 0;
+       ime_d->queued = 0;
+       ime_d->events = 0;
+}
+
+
+/**************************************************************
+ *                   General IME functions
+ *             (needed for both sync and async IOs)
+ **************************************************************/
+
+static char *fio_set_ime_filename(char* filename)
+{
+       static __thread char ime_filename[PATH_MAX];
+       if (snprintf(ime_filename, PATH_MAX, "%s%s", DEFAULT_IME_FILE_PREFIX, filename) < PATH_MAX)
+               return ime_filename;
+       else
+               return NULL;
+}
+
+static int fio_ime_get_file_size(struct thread_data *td, struct fio_file *f)
+{
+       struct stat buf;
+       int ret;
+       char *ime_filename;
+
+       dprint(FD_FILE, "get file size %s\n", f->file_name);
+
+       ime_filename = fio_set_ime_filename(f->file_name);
+       if (ime_filename == NULL)
+               return 1;
+       ret = ime_native_stat(ime_filename, &buf);
+       if (ret == -1) {
+               td_verror(td, errno, "fstat");
+               return 1;
+       }
+
+       f->real_file_size = buf.st_size;
+       return 0;
+}
+
+/* This functions mimics the generic_file_open function, but issues
+   IME native calls instead of POSIX calls. */
+static int fio_ime_open_file(struct thread_data *td, struct fio_file *f)
+{
+       int flags = 0;
+       int ret;
+       uint64_t desired_fs;
+       char *ime_filename;
+
+       dprint(FD_FILE, "fd open %s\n", f->file_name);
+
+       if (td_trim(td)) {
+               td_verror(td, EINVAL, "IME does not support TRIM operation");
+               return 1;
+       }
+
+       if (td->o.oatomic) {
+               td_verror(td, EINVAL, "IME does not support atomic IO");
+               return 1;
+       }
+       if (td->o.odirect)
+               flags |= O_DIRECT;
+       if (td->o.sync_io)
+               flags |= O_SYNC;
+       if (td->o.create_on_open && td->o.allow_create)
+               flags |= O_CREAT;
+
+       if (td_write(td)) {
+               if (!read_only)
+                       flags |= O_RDWR;
+
+               if (td->o.allow_create)
+                       flags |= O_CREAT;
+       }
+       else if (td_read(td)) {
+               flags |= O_RDONLY;
+       }
+       else {
+               /* We should never go here. */
+               td_verror(td, EINVAL, "Unsopported open mode");
+               return 1;
+       }
+
+       ime_filename = fio_set_ime_filename(f->file_name);
+       if (ime_filename == NULL)
+               return 1;
+       f->fd = ime_native_open(ime_filename, flags, 0600);
+       if (f->fd == -1) {
+               char buf[FIO_VERROR_SIZE];
+               int __e = errno;
+
+               snprintf(buf, sizeof(buf), "open(%s)", f->file_name);
+               td_verror(td, __e, buf);
+               return 1;
+       }
+
+       /* Now we need to make sure the real file size is sufficient for FIO
+          to do its things. This is normally done before the file open function
+          is called, but because FIO would use POSIX calls, we need to do it
+          ourselves */
+       ret = fio_ime_get_file_size(td, f);
+       if (ret < 0) {
+               ime_native_close(f->fd);
+               td_verror(td, errno, "ime_get_file_size");
+               return 1;
+       }
+
+       desired_fs = f->io_size + f->file_offset;
+       if (td_write(td)) {
+               dprint(FD_FILE, "Laying out file %s%s\n",
+                       DEFAULT_IME_FILE_PREFIX, f->file_name);
+               if (!td->o.create_on_open &&
+                               f->real_file_size < desired_fs &&
+                               ime_native_ftruncate(f->fd, desired_fs) < 0) {
+                       ime_native_close(f->fd);
+                       td_verror(td, errno, "ime_native_ftruncate");
+                       return 1;
+               }
+               if (f->real_file_size < desired_fs)
+                       f->real_file_size = desired_fs;
+       }
+       else if (td_read(td) && f->real_file_size < desired_fs) {
+               ime_native_close(f->fd);
+               log_err("error: can't read %lu bytes from file with "
+                                               "%lu bytes\n", desired_fs, f->real_file_size);
+               return 1;
+       }
+
+       return 0;
+}
+
+static int fio_ime_close_file(struct thread_data fio_unused *td, struct fio_file *f)
+{
+       int ret = 0;
+
+       dprint(FD_FILE, "fd close %s\n", f->file_name);
+
+       if (ime_native_close(f->fd) < 0)
+               ret = errno;
+
+       f->fd = -1;
+       return ret;
+}
+
+static int fio_ime_unlink_file(struct thread_data *td, struct fio_file *f)
+{
+       int ret;
+
+       char *ime_filename = fio_set_ime_filename(f->file_name);
+       if (ime_filename == NULL)
+               return 1;
+       ret = unlink(ime_filename);
+
+       return ret < 0 ? errno : 0;
+}
+
+static struct io_u *fio_ime_event(struct thread_data *td, int event)
+{
+       struct ime_data *ime_d = td->io_ops_data;
+       return ime_d->event_io_us[event];
+}
+
+/* Setup file used to replace get_file_sizes when settin up the file.
+   Instead we will set real_file_sie to 0 for each file. This way we
+   can avoid calling ime_native_init before the forks are created. */
+static int fio_ime_setup(struct thread_data *td)
+{
+       struct fio_file *f;
+       unsigned int i;
+
+       for_each_file(td, f, i) {
+               dprint(FD_FILE, "setup: set file size to 0 for %p/%d/%s\n",
+                       f, i, f->file_name);
+               f->real_file_size = 0;
+       }
+
+       return 0;
+}
+
+static int fio_ime_engine_init(struct thread_data *td)
+{
+       struct fio_file *f;
+       unsigned int i;
+
+       dprint(FD_IO, "ime engine init\n");
+       if (fio_ime_is_initialized && !td->o.use_thread) {
+               log_err("Warning: something might go wrong. Not all threads/forks were"
+                               " created before the FIO jobs were initialized.\n");
+       }
+
+       ime_native_init();
+       fio_ime_is_initialized = true;
+
+       /* We have to temporarily set real_file_size so that
+          FIO can initialize properly. It will be corrected
+          on file open. */
+       for_each_file(td, f, i)
+               f->real_file_size = f->io_size + f->file_offset;
+
+       return 0;
+}
+
+static void fio_ime_engine_finalize(struct thread_data *td)
+{
+       /* Only finalize IME when using forks */
+       if (!td->o.use_thread) {
+               if (ime_native_finalize() < 0)
+                       log_err("error in ime_native_finalize\n");
+               fio_ime_is_initialized = false;
+       }
+}
+
+
+/**************************************************************
+ *             Private functions for blocking IOs
+ *                     (without iovecs)
+ **************************************************************/
+
+/* Notice: this function comes from the sync engine */
+/* It is used by the commit function to return a proper code and fill
+   some attributes in the io_u used for the IO. */
+static int fio_ime_psync_end(struct thread_data *td, struct io_u *io_u, ssize_t ret)
+{
+       if (ret != (ssize_t) io_u->xfer_buflen) {
+               if (ret >= 0) {
+                       io_u->resid = io_u->xfer_buflen - ret;
+                       io_u->error = 0;
+                       return FIO_Q_COMPLETED;
+               } else
+                       io_u->error = errno;
+       }
+
+       if (io_u->error) {
+               io_u_log_error(td, io_u);
+               td_verror(td, io_u->error, "xfer");
+       }
+
+       return FIO_Q_COMPLETED;
+}
+
+static enum fio_q_status fio_ime_psync_queue(struct thread_data *td,
+                                          struct io_u *io_u)
+{
+       struct fio_file *f = io_u->file;
+       ssize_t ret;
+
+       fio_ro_check(td, io_u);
+
+       if (io_u->ddir == DDIR_READ)
+               ret = ime_native_pread(f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
+       else if (io_u->ddir == DDIR_WRITE)
+               ret = ime_native_pwrite(f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
+       else if (io_u->ddir == DDIR_SYNC)
+               ret = ime_native_fsync(f->fd);
+       else {
+               ret = io_u->xfer_buflen;
+               io_u->error = EINVAL;
+       }
+
+       return fio_ime_psync_end(td, io_u, ret);
+}
+
+
+/**************************************************************
+ *             Private functions for blocking IOs
+ *                       (with iovecs)
+ **************************************************************/
+
+static bool fio_ime_psyncv_can_queue(struct ime_data *ime_d, struct io_u *io_u)
+{
+       /* We can only queue if:
+         - There are no queued iovecs
+         - Or if there is at least one:
+                - There must be no event waiting for retrieval
+                - The offsets must be contiguous
+                - The ddir and fd must be the same */
+       return (ime_d->queued == 0 || (
+                       ime_d->events == 0 &&
+                       ime_d->last_offset == io_u->offset &&
+                       ime_d->sioreq->ddir == io_u->ddir &&
+                       ime_d->sioreq->fd == io_u->file->fd));
+}
+
+/* Before using this function, we should have already
+   ensured that the queue is not full */
+static void fio_ime_psyncv_enqueue(struct ime_data *ime_d, struct io_u *io_u)
+{
+       struct imesio_req *ioreq = ime_d->sioreq;
+       struct iovec *iov = &ime_d->iovecs[ime_d->head];
+
+       iov->iov_base = io_u->xfer_buf;
+       iov->iov_len = io_u->xfer_buflen;
+
+       if (ime_d->queued == 0) {
+               ioreq->offset = io_u->offset;
+               ioreq->ddir = io_u->ddir;
+               ioreq->fd = io_u->file->fd;
+       }
+
+       ime_d->io_us[ime_d->head] = io_u;
+       ime_d->last_offset = io_u->offset + io_u->xfer_buflen;
+       fio_ime_queue_incr(ime_d);
+}
+
+/* Tries to queue an IO. It will fail if the IO can't be appended to the
+   current request or if the current request has been committed but not
+   yet retrieved by get_events. */
+static enum fio_q_status fio_ime_psyncv_queue(struct thread_data *td,
+       struct io_u *io_u)
+{
+       struct ime_data *ime_d = td->io_ops_data;
+
+       fio_ro_check(td, io_u);
+
+       if (ime_d->queued == ime_d->depth)
+               return FIO_Q_BUSY;
+
+       if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
+               if (!fio_ime_psyncv_can_queue(ime_d, io_u))
+                       return FIO_Q_BUSY;
+
+               dprint(FD_IO, "queue: ddir=%d at %u commit=%u queued=%u events=%u\n",
+                       io_u->ddir, ime_d->head, ime_d->cur_commit,
+                       ime_d->queued, ime_d->events);
+               fio_ime_psyncv_enqueue(ime_d, io_u);
+               return FIO_Q_QUEUED;
+       }
+       else if (io_u->ddir == DDIR_SYNC) {
+               if (ime_native_fsync(io_u->file->fd) < 0) {
+                       io_u->error = errno;
+                       td_verror(td, io_u->error, "fsync");
+               }
+               return FIO_Q_COMPLETED;
+       } else {
+               io_u->error = EINVAL;
+               td_verror(td, io_u->error, "wrong ddir");
+               return FIO_Q_COMPLETED;
+       }
+}
+
+/* Notice: this function comes from the sync engine */
+/* It is used by the commit function to return a proper code and fill
+   some attributes in the io_us appended to the current request. */
+static int fio_ime_psyncv_end(struct thread_data *td, ssize_t bytes)
+{
+       struct ime_data *ime_d = td->io_ops_data;
+       struct io_u *io_u;
+       unsigned int i;
+       int err = errno;
+
+       for (i = 0; i < ime_d->queued; i++) {
+               io_u = ime_d->io_us[i];
+
+               if (bytes == -1)
+                       io_u->error = err;
+               else {
+                       unsigned int this_io;
+
+                       this_io = bytes;
+                       if (this_io > io_u->xfer_buflen)
+                               this_io = io_u->xfer_buflen;
+
+                       io_u->resid = io_u->xfer_buflen - this_io;
+                       io_u->error = 0;
+                       bytes -= this_io;
+               }
+       }
+
+       if (bytes == -1) {
+               td_verror(td, err, "xfer psyncv");
+               return -err;
+       }
+
+       return 0;
+}
+
+/* Commits the current request by calling ime_native (with one or several
+   iovecs). After this commit, the corresponding events (one per iovec)
+   can be retrieved by get_events. */
+static int fio_ime_psyncv_commit(struct thread_data *td)
+{
+       struct ime_data *ime_d = td->io_ops_data;
+       struct imesio_req *ioreq;
+       int ret = 0;
+
+       /* Exit if there are no (new) events to commit
+          or if the previous committed event haven't been retrieved */
+       if (!ime_d->queued || ime_d->events)
+               return 0;
+
+       ioreq = ime_d->sioreq;
+       ime_d->events = ime_d->queued;
+       if (ioreq->ddir == DDIR_READ)
+               ret = ime_native_preadv(ioreq->fd, ime_d->iovecs, ime_d->queued, ioreq->offset);
+       else
+               ret = ime_native_pwritev(ioreq->fd, ime_d->iovecs, ime_d->queued, ioreq->offset);
+
+       dprint(FD_IO, "committed %d iovecs\n", ime_d->queued);
+
+       return fio_ime_psyncv_end(td, ret);
+}
+
+static int fio_ime_psyncv_getevents(struct thread_data *td, unsigned int min,
+                               unsigned int max, const struct timespec *t)
+{
+       struct ime_data *ime_d = td->io_ops_data;
+       struct io_u *io_u;
+       int events = 0;
+       unsigned int count;
+
+       if (ime_d->events) {
+               for (count = 0; count < ime_d->events; count++) {
+                       io_u = ime_d->io_us[count];
+                       ime_d->event_io_us[events] = io_u;
+                       events++;
+               }
+               fio_ime_queue_reset(ime_d);
+       }
+
+       dprint(FD_IO, "getevents(%u,%u) ret=%d queued=%u events=%u\n",
+               min, max, events, ime_d->queued, ime_d->events);
+       return events;
+}
+
+static int fio_ime_psyncv_init(struct thread_data *td)
+{
+       struct ime_data *ime_d;
+
+       if (fio_ime_engine_init(td) < 0)
+               return 1;
+
+       ime_d = calloc(1, sizeof(*ime_d));
+
+       ime_d->sioreq = malloc(sizeof(struct imesio_req));
+       ime_d->iovecs = malloc(td->o.iodepth * sizeof(struct iovec));
+       ime_d->io_us = malloc(2 * td->o.iodepth * sizeof(struct io_u *));
+       ime_d->event_io_us = ime_d->io_us + td->o.iodepth;
+
+       ime_d->depth = td->o.iodepth;
+
+       td->io_ops_data = ime_d;
+       return 0;
+}
+
+static void fio_ime_psyncv_clean(struct thread_data *td)
+{
+       struct ime_data *ime_d = td->io_ops_data;
+
+       if (ime_d) {
+               free(ime_d->sioreq);
+               free(ime_d->iovecs);
+               free(ime_d->io_us);
+               free(ime_d);
+               td->io_ops_data = NULL;
+       }
+
+       fio_ime_engine_finalize(td);
+}
+
+
+/**************************************************************
+ *           Private functions for non-blocking IOs
+ *
+ **************************************************************/
+
+void fio_ime_aio_complete_cb  (struct ime_aiocb *aiocb, int err,
+                                                          ssize_t bytes)
+{
+       struct imeaio_req *ioreq = (struct imeaio_req *) aiocb->user_context;
+
+       pthread_mutex_lock(&ioreq->status_mutex);
+       ioreq->status = err == 0 ? bytes : FIO_IME_REQ_ERROR;
+       pthread_mutex_unlock(&ioreq->status_mutex);
+
+       pthread_cond_signal(&ioreq->cond_endio);
+}
+
+static bool fio_ime_aio_can_queue (struct ime_data *ime_d, struct io_u *io_u)
+{
+       /* So far we can queue in any case. */
+       return true;
+}
+static bool fio_ime_aio_can_append (struct ime_data *ime_d, struct io_u *io_u)
+{
+       /* We can only append if:
+               - The iovecs will be contiguous in the array
+               - There is already a queued iovec
+               - The offsets are contiguous
+               - The ddir and fs are the same */
+       return (ime_d->head != 0 &&
+                       ime_d->queued - ime_d->events > 0 &&
+                       ime_d->last_offset == io_u->offset &&
+                       ime_d->last_req->ddir == io_u->ddir &&
+                       ime_d->last_req->iocb.fd == io_u->file->fd);
+}
+
+/* Before using this function, we should have already
+   ensured that the queue is not full */
+static void fio_ime_aio_enqueue(struct ime_data *ime_d, struct io_u *io_u)
+{
+       struct imeaio_req *ioreq = &ime_d->aioreqs[ime_d->head];
+       struct ime_aiocb *iocb = &ioreq->iocb;
+       struct iovec *iov = &ime_d->iovecs[ime_d->head];
+
+       iov->iov_base = io_u->xfer_buf;
+       iov->iov_len = io_u->xfer_buflen;
+
+       if (fio_ime_aio_can_append(ime_d, io_u))
+               ime_d->last_req->iocb.iovcnt++;
+       else {
+               ioreq->status = FIO_IME_IN_PROGRESS;
+               ioreq->ddir = io_u->ddir;
+               ime_d->last_req = ioreq;
+
+               iocb->complete_cb = &fio_ime_aio_complete_cb;
+               iocb->fd = io_u->file->fd;
+               iocb->file_offset = io_u->offset;
+               iocb->iov = iov;
+               iocb->iovcnt = 1;
+               iocb->flags = 0;
+               iocb->user_context = (intptr_t) ioreq;
+       }
+
+       ime_d->io_us[ime_d->head] = io_u;
+       ime_d->last_offset = io_u->offset + io_u->xfer_buflen;
+       fio_ime_queue_incr(ime_d);
+}
+
+/* Tries to queue an IO. It will create a new request if the IO can't be
+   appended to the current request. It will fail if the queue can't contain
+   any more io_u/iovec. In this case, commit and then get_events need to be
+   called. */
+static enum fio_q_status fio_ime_aio_queue(struct thread_data *td,
+               struct io_u *io_u)
+{
+       struct ime_data *ime_d = td->io_ops_data;
+
+       fio_ro_check(td, io_u);
+
+       dprint(FD_IO, "queue: ddir=%d at %u commit=%u queued=%u events=%u\n",
+               io_u->ddir, ime_d->head, ime_d->cur_commit,
+               ime_d->queued, ime_d->events);
+
+       if (ime_d->queued == ime_d->depth)
+               return FIO_Q_BUSY;
+
+       if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
+               if (!fio_ime_aio_can_queue(ime_d, io_u))
+                       return FIO_Q_BUSY;
+
+               fio_ime_aio_enqueue(ime_d, io_u);
+               return FIO_Q_QUEUED;
+       }
+       else if (io_u->ddir == DDIR_SYNC) {
+               if (ime_native_fsync(io_u->file->fd) < 0) {
+                       io_u->error = errno;
+                       td_verror(td, io_u->error, "fsync");
+               }
+               return FIO_Q_COMPLETED;
+       } else {
+               io_u->error = EINVAL;
+               td_verror(td, io_u->error, "wrong ddir");
+               return FIO_Q_COMPLETED;
+       }
+}
+
+static int fio_ime_aio_commit(struct thread_data *td)
+{
+       struct ime_data *ime_d = td->io_ops_data;
+       struct imeaio_req *ioreq;
+       int ret = 0;
+
+       /* Loop while there are events to commit */
+       while (ime_d->queued - ime_d->events) {
+               ioreq = &ime_d->aioreqs[ime_d->cur_commit];
+               if (ioreq->ddir == DDIR_READ)
+                       ret = ime_native_aio_read(&ioreq->iocb);
+               else
+                       ret = ime_native_aio_write(&ioreq->iocb);
+
+               fio_ime_queue_commit(ime_d, ioreq->iocb.iovcnt);
+
+               /* fio needs a negative error code */
+               if (ret < 0) {
+                       ioreq->status = FIO_IME_REQ_ERROR;
+                       return -errno;
+               }
+
+               io_u_mark_submit(td, ioreq->iocb.iovcnt);
+               dprint(FD_IO, "committed %d iovecs commit=%u queued=%u events=%u\n",
+                       ioreq->iocb.iovcnt, ime_d->cur_commit,
+                       ime_d->queued, ime_d->events);
+       }
+
+       return 0;
+}
+
+static int fio_ime_aio_getevents(struct thread_data *td, unsigned int min,
+                               unsigned int max, const struct timespec *t)
+{
+       struct ime_data *ime_d = td->io_ops_data;
+       struct imeaio_req *ioreq;
+       struct io_u *io_u;
+       int events = 0;
+       unsigned int count;
+       ssize_t bytes;
+
+       while (ime_d->events) {
+               ioreq = &ime_d->aioreqs[ime_d->tail];
+
+               /* Break if we already got events, and if we will
+                  exceed max if we append the next events */
+               if (events && events + ioreq->iocb.iovcnt > max)
+                       break;
+
+               if (ioreq->status != FIO_IME_IN_PROGRESS) {
+
+                       bytes = ioreq->status;
+                       for (count = 0; count < ioreq->iocb.iovcnt; count++) {
+                               io_u = ime_d->io_us[ime_d->tail];
+                               ime_d->event_io_us[events] = io_u;
+                               events++;
+                               fio_ime_queue_red(ime_d);
+
+                               if (ioreq->status == FIO_IME_REQ_ERROR)
+                                       io_u->error = EIO;
+                               else {
+                                       io_u->resid = bytes > io_u->xfer_buflen ?
+                                                                       0 : io_u->xfer_buflen - bytes;
+                                       io_u->error = 0;
+                                       bytes -= io_u->xfer_buflen - io_u->resid;
+                               }
+                       }
+               } else {
+                       pthread_mutex_lock(&ioreq->status_mutex);
+                       while (ioreq->status == FIO_IME_IN_PROGRESS) {
+                               pthread_cond_wait(&ioreq->cond_endio, &ioreq->status_mutex);
+                       }
+                       pthread_mutex_unlock(&ioreq->status_mutex);
+               }
+
+       }
+
+       dprint(FD_IO, "getevents(%u,%u) ret=%d queued=%u events=%u\n", min, max,
+               events, ime_d->queued, ime_d->events);
+       return events;
+}
+
+static int fio_ime_aio_init(struct thread_data *td)
+{
+       struct ime_data *ime_d;
+       struct imeaio_req *ioreq;
+       unsigned int i;
+
+       if (fio_ime_engine_init(td) < 0)
+               return 1;
+
+       ime_d = calloc(1, sizeof(*ime_d));
+
+       ime_d->aioreqs = malloc(td->o.iodepth * sizeof(struct imeaio_req));
+       ime_d->iovecs = malloc(td->o.iodepth * sizeof(struct iovec));
+       ime_d->io_us = malloc(2 * td->o.iodepth * sizeof(struct io_u *));
+       ime_d->event_io_us = ime_d->io_us + td->o.iodepth;
+
+       ime_d->depth = td->o.iodepth;
+       for (i = 0; i < ime_d->depth; i++) {
+               ioreq = &ime_d->aioreqs[i];
+               pthread_cond_init(&ioreq->cond_endio, NULL);
+               pthread_mutex_init(&ioreq->status_mutex, NULL);
+       }
+
+       td->io_ops_data = ime_d;
+       return 0;
+}
+
+static void fio_ime_aio_clean(struct thread_data *td)
+{
+       struct ime_data *ime_d = td->io_ops_data;
+       struct imeaio_req *ioreq;
+       unsigned int i;
+
+       if (ime_d) {
+               for (i = 0; i < ime_d->depth; i++) {
+                       ioreq = &ime_d->aioreqs[i];
+                       pthread_cond_destroy(&ioreq->cond_endio);
+                       pthread_mutex_destroy(&ioreq->status_mutex);
+               }
+               free(ime_d->aioreqs);
+               free(ime_d->iovecs);
+               free(ime_d->io_us);
+               free(ime_d);
+               td->io_ops_data = NULL;
+       }
+
+       fio_ime_engine_finalize(td);
+}
+
+
+/**************************************************************
+ *                   IO engines definitions
+ *
+ **************************************************************/
+
+/* The FIO_DISKLESSIO flag used for these engines is necessary to prevent
+   FIO from using POSIX calls. See fio_ime_open_file for more details. */
+
+static struct ioengine_ops ioengine_prw = {
+       .name           = "ime_psync",
+       .version        = FIO_IOOPS_VERSION,
+       .setup          = fio_ime_setup,
+       .init           = fio_ime_engine_init,
+       .cleanup        = fio_ime_engine_finalize,
+       .queue          = fio_ime_psync_queue,
+       .open_file      = fio_ime_open_file,
+       .close_file     = fio_ime_close_file,
+       .get_file_size  = fio_ime_get_file_size,
+       .unlink_file    = fio_ime_unlink_file,
+       .flags          = FIO_SYNCIO | FIO_DISKLESSIO,
+};
+
+static struct ioengine_ops ioengine_pvrw = {
+       .name           = "ime_psyncv",
+       .version        = FIO_IOOPS_VERSION,
+       .setup          = fio_ime_setup,
+       .init           = fio_ime_psyncv_init,
+       .cleanup        = fio_ime_psyncv_clean,
+       .queue          = fio_ime_psyncv_queue,
+       .commit         = fio_ime_psyncv_commit,
+       .getevents      = fio_ime_psyncv_getevents,
+       .event          = fio_ime_event,
+       .open_file      = fio_ime_open_file,
+       .close_file     = fio_ime_close_file,
+       .get_file_size  = fio_ime_get_file_size,
+       .unlink_file    = fio_ime_unlink_file,
+       .flags          = FIO_SYNCIO | FIO_DISKLESSIO,
+};
+
+static struct ioengine_ops ioengine_aio = {
+       .name           = "ime_aio",
+       .version        = FIO_IOOPS_VERSION,
+       .setup          = fio_ime_setup,
+       .init           = fio_ime_aio_init,
+       .cleanup        = fio_ime_aio_clean,
+       .queue          = fio_ime_aio_queue,
+       .commit         = fio_ime_aio_commit,
+       .getevents      = fio_ime_aio_getevents,
+       .event          = fio_ime_event,
+       .open_file      = fio_ime_open_file,
+       .close_file     = fio_ime_close_file,
+       .get_file_size  = fio_ime_get_file_size,
+       .unlink_file    = fio_ime_unlink_file,
+       .flags          = FIO_DISKLESSIO,
+};
+
+static void fio_init fio_ime_register(void)
+{
+       register_ioengine(&ioengine_prw);
+       register_ioengine(&ioengine_pvrw);
+       register_ioengine(&ioengine_aio);
+}
+
+static void fio_exit fio_ime_unregister(void)
+{
+       unregister_ioengine(&ioengine_prw);
+       unregister_ioengine(&ioengine_pvrw);
+       unregister_ioengine(&ioengine_aio);
+       if (fio_ime_is_initialized && ime_native_finalize() < 0) {
+               log_err("Warning: IME did not finalize properly\n");
+       }
+}
diff --git a/examples/ime.fio b/examples/ime.fio
new file mode 100644 (file)
index 0000000..e97fd1d
--- /dev/null
@@ -0,0 +1,51 @@
+# This jobfile performs basic write+read operations using
+# DDN's Infinite Memory Engine.
+
+[global]
+
+# Use as much jobs as possible to maximize performance
+numjobs=8
+
+# The filename should be uniform so that "read" jobs can read what
+# the "write" jobs have written.
+filename_format=fio-test-ime.$jobnum.$filenum
+
+size=25g
+bs=128k
+
+# These settings are useful for the asynchronous ime_aio engine:
+# by setting the io depth to twice the size of a "batch", we can
+# queue IOs while other IOs are "in-flight".
+iodepth=32
+iodepth_batch=16
+iodepth_batch_complete=16
+
+[write-psync]
+stonewall
+rw=write
+ioengine=ime_psync
+
+[read-psync]
+stonewall
+rw=read
+ioengine=ime_psync
+
+[write-psyncv]
+stonewall
+rw=write
+ioengine=ime_psyncv
+
+[read-psyncv]
+stonewall
+rw=read
+ioengine=ime_psyncv
+
+[write-aio]
+stonewall
+rw=write
+ioengine=ime_aio
+
+[read-aio]
+stonewall
+rw=read
+ioengine=ime_aio
\ No newline at end of file
index f592027..83f86ce 100644 (file)
--- a/options.c
+++ b/options.c
@@ -1889,6 +1889,17 @@ struct fio_option fio_options[FIO_MAX_OPTS] = {
                          },
 
 #endif
+#ifdef CONFIG_IME
+                         { .ival = "ime_psync",
+                           .help = "DDN's IME synchronous IO engine",
+                         },
+                         { .ival = "ime_psyncv",
+                           .help = "DDN's IME synchronous IO engine using iovecs",
+                         },
+                         { .ival = "ime_aio",
+                           .help = "DDN's IME asynchronous IO engine",
+                         },
+#endif
 #ifdef CONFIG_LINUX_DEVDAX
                          { .ival = "dev-dax",
                            .help = "DAX Device based IO engine",