rpma: add librpma_apm_* and librpma_gpspm_* engines
authorJan Michalski <jan.m.michalski@intel.com>
Thu, 11 Feb 2021 11:47:06 +0000 (12:47 +0100)
committerJan M Michalski <jan.m.michalski@intel.com>
Sat, 13 Mar 2021 23:47:47 +0000 (23:47 +0000)
The Remote Persistent Memory Access (RPMA) Library is a C library
created to simplify accessing persistent memory on remote hosts over
Remote Direct Memory Access (RDMA).

The librpma_apm_client and librpma_apm_server is a pair of engines
which allows benchmarking persistent writes achieved via
the Appliance Persistency Method (APM; natively supported by
the librpma library) and regular reads (a part of the RDMA standard).

The librpma_gpspm_client and librpma_gpspm_server is a pair of
engines which allows benchmarking persistent writes achieved via
the General Purpose Persistency Method (GPSPM; build on top of
the librpma API).

The librpma library is available here: https://github.com/pmem/rpma
along with the set of scripts using the newly introduced engines
to construct miscellaneous benchmarking scenarios:
https://github.com/pmem/rpma/tree/master/tools/perf

The full history of the development of the librpma fio engines
is available at: https://github.com/pmem/fio/tree/rpma

Co-Authored-By: Lukasz Dorau <lukasz.dorau@intel.com>
Co-Authored-By: Tomasz Gromadzki <tomasz.gromadzki@intel.com>
Co-Authored-By: Jan Michalski <jan.m.michalski@intel.com>
Co-Authored-By: Oksana Salyk <oksana.salyk@intel.com>
21 files changed:
HOWTO
Makefile
ci/travis-install-librpma.sh [new file with mode: 0755]
ci/travis-install-pmdk.sh [new file with mode: 0755]
ci/travis-install.sh
configure
engines/librpma_apm.c [new file with mode: 0644]
engines/librpma_fio.c [new file with mode: 0644]
engines/librpma_fio.h [new file with mode: 0644]
engines/librpma_gpspm.c [new file with mode: 0644]
engines/librpma_gpspm_flush.pb-c.c [new file with mode: 0644]
engines/librpma_gpspm_flush.pb-c.h [new file with mode: 0644]
engines/librpma_gpspm_flush.proto [new file with mode: 0644]
examples/librpma_apm-client.fio [new file with mode: 0644]
examples/librpma_apm-server.fio [new file with mode: 0644]
examples/librpma_gpspm-client.fio [new file with mode: 0644]
examples/librpma_gpspm-server.fio [new file with mode: 0644]
fio.1
optgroup.c
optgroup.h
options.c

diff --git a/HOWTO b/HOWTO
index 52812cc7de37e7d5d95c34c45e757c8126643709..39f8c63dc714bb87bee9537c80427982158a98a7 100644 (file)
--- a/HOWTO
+++ b/HOWTO
@@ -2189,7 +2189,7 @@ with the caveat that when used on the command line, they must come after the
                this will be the starting port number since fio will use a range of
                ports.
 
-   [rdma]
+   [rdma], [librpma_*]
 
                The port to use for RDMA-CM communication. This should be the same value
                on the client and the server side.
@@ -2200,6 +2200,15 @@ with the caveat that when used on the command line, they must come after the
        is a TCP listener or UDP reader, the hostname is not used and must be omitted
        unless it is a valid UDP multicast address.
 
+.. option:: serverip=str : [librpma_*]
+
+       The IP address to be used for RDMA-CM based I/O.
+
+.. option:: direct_write_to_pmem=bool : [librpma_*]
+
+       Set to 1 only when Direct Write to PMem from the remote host is possible.
+       Otherwise, set to 0.
+
 .. option:: interface=str : [netsplice] [net]
 
        The IP address of the network interface used to send or receive UDP
index 612344d154093f33d165c81746972dae90eebd43..1aa9f37785e91c73a1644d4bebe5dce2d221ce04 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -94,6 +94,21 @@ ifdef CONFIG_RDMA
   rdma_LIBS = -libverbs -lrdmacm
   ENGINES += rdma
 endif
+ifdef CONFIG_LIBRPMA_APM
+  librpma_apm_SRCS = engines/librpma_apm.c
+  librpma_fio_SRCS = engines/librpma_fio.c
+  librpma_apm_LIBS = -lrpma -lpmem
+  ENGINES += librpma_apm
+endif
+ifdef CONFIG_LIBRPMA_GPSPM
+  librpma_gpspm_SRCS = engines/librpma_gpspm.c engines/librpma_gpspm_flush.pb-c.c
+  librpma_fio_SRCS = engines/librpma_fio.c
+  librpma_gpspm_LIBS = -lrpma -lpmem -lprotobuf-c
+  ENGINES += librpma_gpspm
+endif
+ifdef librpma_fio_SRCS
+  SOURCE += $(librpma_fio_SRCS)
+endif
 ifdef CONFIG_POSIXAIO
   SOURCE += engines/posixaio.c
 endif
diff --git a/ci/travis-install-librpma.sh b/ci/travis-install-librpma.sh
new file mode 100755 (executable)
index 0000000..b127f3f
--- /dev/null
@@ -0,0 +1,22 @@
+#!/bin/bash -e
+
+# 11.02.2021 Merge pull request #866 from ldorau/rpma-mmap-memory-for-rpma_mr_reg-in-rpma_flush_apm_new
+LIBRPMA_VERSION=fbac593917e98f3f26abf14f4fad5a832b330f5c
+ZIP_FILE=rpma.zip
+
+WORKDIR=$(pwd)
+
+# install librpma
+wget -O $ZIP_FILE https://github.com/pmem/rpma/archive/${LIBRPMA_VERSION}.zip
+unzip $ZIP_FILE
+mkdir -p rpma-${LIBRPMA_VERSION}/build
+cd rpma-${LIBRPMA_VERSION}/build
+cmake .. -DCMAKE_BUILD_TYPE=Release \
+       -DCMAKE_INSTALL_PREFIX=/usr \
+       -DBUILD_DOC=OFF \
+       -DBUILD_EXAMPLES=OFF \
+       -DBUILD_TESTS=OFF
+make -j$(nproc)
+sudo make -j$(nproc) install
+cd $WORKDIR
+rm -rf $ZIP_FILE rpma-${LIBRPMA_VERSION}
diff --git a/ci/travis-install-pmdk.sh b/ci/travis-install-pmdk.sh
new file mode 100755 (executable)
index 0000000..803438f
--- /dev/null
@@ -0,0 +1,28 @@
+#!/bin/bash -e
+
+# pmdk v1.9.1 release
+PMDK_VERSION=1.9.1
+
+WORKDIR=$(pwd)
+
+#
+# The '/bin/sh' shell used by PMDK's 'make install'
+# does not know the exact localization of clang
+# and fails with:
+#    /bin/sh: 1: clang: not found
+# if CC is not set to the full path of clang.
+#
+export CC=$(which $CC)
+
+# Install PMDK libraries, because PMDK's libpmem
+# is a dependency of the librpma fio engine.
+# Install it from a release package
+# with already generated documentation,
+# in order to not install 'pandoc'.
+wget https://github.com/pmem/pmdk/releases/download/${PMDK_VERSION}/pmdk-${PMDK_VERSION}.tar.gz
+tar -xzf pmdk-${PMDK_VERSION}.tar.gz
+cd pmdk-${PMDK_VERSION}
+make -j$(nproc) NDCTL_ENABLE=n
+sudo make -j$(nproc) install prefix=/usr NDCTL_ENABLE=n
+cd $WORKDIR
+rm -rf pmdk-${PMDK_VERSION}
index 103695dc6d6391f132d90681a6be4e8009b76f87..4c4c04c5d6ae4a781c52270a0f62fcbc337951e5 100755 (executable)
@@ -43,6 +43,16 @@ case "$TRAVIS_OS_NAME" in
        )
        sudo apt-get -qq update
        sudo apt-get install --no-install-recommends -qq -y "${pkgs[@]}"
+       # librpma is supported on the amd64 (x86_64) architecture for now
+       if [[ $CI_TARGET_ARCH == "amd64" ]]; then
+               # install libprotobuf-c-dev required by librpma_gpspm
+               sudo apt-get install --no-install-recommends -qq -y libprotobuf-c-dev
+               # PMDK libraries have to be installed, because
+               # libpmem is a dependency of the librpma fio engine
+               ci/travis-install-pmdk.sh
+               # install librpma from sources from GitHub
+               ci/travis-install-librpma.sh
+       fi
        ;;
     "osx")
        brew update >/dev/null 2>&1
index 748f7014c63541b26ef66dd8b72fcd1a8a27f777..1bbdb8c4a4fca44759eebb3c1ee562c9ce895dbb 100755 (executable)
--- a/configure
+++ b/configure
@@ -920,6 +920,49 @@ if test "$disable_rdma" != "yes" && compile_prog "" "-lrdmacm" "rdma"; then
 fi
 print_config "rdmacm" "$rdmacm"
 
+##########################################
+# librpma probe
+if test "$librpma" != "yes" ; then
+  librpma="no"
+fi
+cat > $TMPC << EOF
+#include <stdio.h>
+#include <librpma.h>
+int main(int argc, char **argv)
+{
+  enum rpma_conn_event event = RPMA_CONN_REJECTED;
+  (void) event; /* unused */
+  rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
+  return 0;
+}
+EOF
+if test "$disable_rdma" != "yes" && compile_prog "" "-lrpma" "rpma"; then
+    librpma="yes"
+fi
+print_config "librpma" "$librpma"
+
+##########################################
+# libprotobuf-c probe
+if test "$libprotobuf_c" != "yes" ; then
+  libprotobuf_c="no"
+fi
+cat > $TMPC << EOF
+#include <stdio.h>
+#include <protobuf-c/protobuf-c.h>
+#if !defined(PROTOBUF_C_VERSION_NUMBER)
+# error PROTOBUF_C_VERSION_NUMBER is not defined!
+#endif
+int main(int argc, char **argv)
+{
+  (void)protobuf_c_message_check(NULL);
+  return 0;
+}
+EOF
+if compile_prog "" "-lprotobuf-c" "protobuf_c"; then
+    libprotobuf_c="yes"
+fi
+print_config "libprotobuf_c" "$libprotobuf_c"
+
 ##########################################
 # asprintf() and vasprintf() probes
 if test "$have_asprintf" != "yes" ; then
@@ -2788,6 +2831,15 @@ fi
 if test "$libverbs" = "yes" -a "$rdmacm" = "yes" ; then
   output_sym "CONFIG_RDMA"
 fi
+# librpma is supported on the 'x86_64' architecture for now
+if test "$cpu" = "x86_64" -a "$libverbs" = "yes" -a "$rdmacm" = "yes" \
+    -a "$librpma" = "yes" -a "$libpmem" = "yes" ; then
+  output_sym "CONFIG_LIBRPMA_APM"
+fi
+if test "$cpu" = "x86_64" -a "$libverbs" = "yes" -a "$rdmacm" = "yes" \
+    -a "$librpma" = "yes" -a "$libpmem" = "yes" -a "$libprotobuf_c" = "yes" ; then
+  output_sym "CONFIG_LIBRPMA_GPSPM"
+fi
 if test "$clock_gettime" = "yes" ; then
   output_sym "CONFIG_CLOCK_GETTIME"
 fi
diff --git a/engines/librpma_apm.c b/engines/librpma_apm.c
new file mode 100644 (file)
index 0000000..ffa3769
--- /dev/null
@@ -0,0 +1,256 @@
+/*
+* librpma_apm: IO engine that uses PMDK librpma to read and write data,
+ *             based on Appliance Persistency Method
+ *
+ * Copyright 2020-2021, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation..
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#include "librpma_fio.h"
+
+/* client side implementation */
+
+static inline int client_io_flush(struct thread_data *td,
+               struct io_u *first_io_u, struct io_u *last_io_u,
+               unsigned long long int len);
+
+static int client_get_io_u_index(struct rpma_completion *cmpl,
+               unsigned int *io_u_index);
+
+static int client_init(struct thread_data *td)
+{
+       struct librpma_fio_client_data *ccd;
+       unsigned int sq_size;
+       uint32_t cq_size;
+       struct rpma_conn_cfg *cfg = NULL;
+       struct rpma_peer_cfg *pcfg = NULL;
+       int ret;
+
+       /* not supported readwrite = trim / randtrim / trimwrite */
+       if (td_trim(td)) {
+               td_verror(td, EINVAL, "Not supported mode.");
+               return -1;
+       }
+
+       /*
+        * Calculate the required queue sizes where:
+        * - the send queue (SQ) has to be big enough to accommodate
+        *   all io_us (WRITEs) and all flush requests (FLUSHes)
+        * - the completion queue (CQ) has to be big enough to accommodate all
+        *   success and error completions (cq_size = sq_size)
+        */
+       if (td_random(td) || td_rw(td)) {
+               /*
+                * sq_size = max(rand_read_sq_size, rand_write_sq_size)
+                * where rand_read_sq_size < rand_write_sq_size because read
+                * does not require flush afterwards
+                * rand_write_sq_size = N * (WRITE + FLUSH)
+                *
+                * Note: rw is no different from random write since having
+                * interleaved reads with writes in extreme forces you to flush
+                * as often as when the writes are random.
+                */
+               sq_size = 2 * td->o.iodepth;
+       } else if (td_write(td)) {
+               /* sequential TD_DDIR_WRITE only */
+               if (td->o.sync_io) {
+                       sq_size = 2; /* WRITE + FLUSH */
+               } else {
+                       /*
+                        * N * WRITE + B * FLUSH where:
+                        * - B == ceil(iodepth / iodepth_batch)
+                        *   which is the number of batches for N writes
+                        */
+                       sq_size = td->o.iodepth + LIBRPMA_FIO_CEIL(td->o.iodepth,
+                                       td->o.iodepth_batch);
+               }
+       } else {
+               /* TD_DDIR_READ only */
+               if (td->o.sync_io) {
+                       sq_size = 1; /* READ */
+               } else {
+                       sq_size = td->o.iodepth; /* N x READ */
+               }
+       }
+       cq_size = sq_size;
+
+       /* create a connection configuration object */
+       if ((ret = rpma_conn_cfg_new(&cfg))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_new");
+               return -1;
+       }
+
+       /* apply queue sizes */
+       if ((ret = rpma_conn_cfg_set_sq_size(cfg, sq_size))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
+               goto err_cfg_delete;
+       }
+       if ((ret = rpma_conn_cfg_set_cq_size(cfg, cq_size))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
+               goto err_cfg_delete;
+       }
+
+       if (librpma_fio_client_init(td, cfg))
+               goto err_cfg_delete;
+
+       ccd = td->io_ops_data;
+
+       if (ccd->server_mr_flush_type == RPMA_FLUSH_TYPE_PERSISTENT) {
+               if (!ccd->ws->direct_write_to_pmem) {
+                       if (td->thread_number == 1)
+                               log_err(
+                                       "Fio librpma engine will not work until the Direct Write to PMem on the server side is possible (direct_write_to_pmem)\n");
+                       goto err_cleanup_common;
+               }
+
+               /* configure peer's direct write to pmem support */
+               if ((ret = rpma_peer_cfg_new(&pcfg))) {
+                       librpma_td_verror(td, ret, "rpma_peer_cfg_new");
+                       goto err_cleanup_common;
+               }
+
+               if ((ret = rpma_peer_cfg_set_direct_write_to_pmem(pcfg, true))) {
+                       librpma_td_verror(td, ret,
+                               "rpma_peer_cfg_set_direct_write_to_pmem");
+                       (void) rpma_peer_cfg_delete(&pcfg);
+                       goto err_cleanup_common;
+               }
+
+               if ((ret = rpma_conn_apply_remote_peer_cfg(ccd->conn, pcfg))) {
+                       librpma_td_verror(td, ret,
+                               "rpma_conn_apply_remote_peer_cfg");
+                       (void) rpma_peer_cfg_delete(&pcfg);
+                       goto err_cleanup_common;
+               }
+
+               (void) rpma_peer_cfg_delete(&pcfg);
+       } else if (td->thread_number == 1) {
+               /* XXX log_info mixes with the JSON output */
+               log_err(
+                       "Note: Direct Write to PMem is not supported by default nor required if you use DRAM instead of PMem on the server side (direct_write_to_pmem).\n"
+                       "Remember that flushing to DRAM does not make your data persistent and may be used only for experimental purposes.\n");
+       }
+
+       if ((ret = rpma_conn_cfg_delete(&cfg))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_delete");
+               /* non fatal error - continue */
+       }
+
+       ccd->flush = client_io_flush;
+       ccd->get_io_u_index = client_get_io_u_index;
+
+       return 0;
+
+err_cleanup_common:
+       librpma_fio_client_cleanup(td);
+
+err_cfg_delete:
+       (void) rpma_conn_cfg_delete(&cfg);
+
+       return -1;
+}
+
+static void client_cleanup(struct thread_data *td)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+
+       if (ccd == NULL)
+               return;
+
+       free(ccd->client_data);
+
+       librpma_fio_client_cleanup(td);
+}
+
+static inline int client_io_flush(struct thread_data *td,
+               struct io_u *first_io_u, struct io_u *last_io_u,
+               unsigned long long int len)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       size_t dst_offset = first_io_u->offset;
+       int ret;
+
+       if ((ret = rpma_flush(ccd->conn, ccd->server_mr, dst_offset, len,
+                       ccd->server_mr_flush_type, RPMA_F_COMPLETION_ALWAYS,
+                       (void *)(uintptr_t)last_io_u->index))) {
+               librpma_td_verror(td, ret, "rpma_flush");
+               return -1;
+       }
+
+       return 0;
+}
+
+static int client_get_io_u_index(struct rpma_completion *cmpl,
+               unsigned int *io_u_index)
+{
+       memcpy(io_u_index, &cmpl->op_context, sizeof(*io_u_index));
+
+       return 1;
+}
+
+FIO_STATIC struct ioengine_ops ioengine_client = {
+       .name                   = "librpma_apm_client",
+       .version                = FIO_IOOPS_VERSION,
+       .init                   = client_init,
+       .post_init              = librpma_fio_client_post_init,
+       .get_file_size          = librpma_fio_client_get_file_size,
+       .open_file              = librpma_fio_file_nop,
+       .queue                  = librpma_fio_client_queue,
+       .commit                 = librpma_fio_client_commit,
+       .getevents              = librpma_fio_client_getevents,
+       .event                  = librpma_fio_client_event,
+       .errdetails             = librpma_fio_client_errdetails,
+       .close_file             = librpma_fio_file_nop,
+       .cleanup                = client_cleanup,
+       .flags                  = FIO_DISKLESSIO,
+       .options                = librpma_fio_options,
+       .option_struct_size     = sizeof(struct librpma_fio_options_values),
+};
+
+/* server side implementation */
+
+static int server_open_file(struct thread_data *td, struct fio_file *f)
+{
+       return librpma_fio_server_open_file(td, f, NULL);
+}
+
+static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
+{
+       return FIO_Q_COMPLETED;
+}
+
+FIO_STATIC struct ioengine_ops ioengine_server = {
+       .name                   = "librpma_apm_server",
+       .version                = FIO_IOOPS_VERSION,
+       .init                   = librpma_fio_server_init,
+       .open_file              = server_open_file,
+       .close_file             = librpma_fio_server_close_file,
+       .queue                  = server_queue,
+       .invalidate             = librpma_fio_file_nop,
+       .cleanup                = librpma_fio_server_cleanup,
+       .flags                  = FIO_SYNCIO,
+       .options                = librpma_fio_options,
+       .option_struct_size     = sizeof(struct librpma_fio_options_values),
+};
+
+/* register both engines */
+
+static void fio_init fio_librpma_apm_register(void)
+{
+       register_ioengine(&ioengine_client);
+       register_ioengine(&ioengine_server);
+}
+
+static void fio_exit fio_librpma_apm_unregister(void)
+{
+       unregister_ioengine(&ioengine_client);
+       unregister_ioengine(&ioengine_server);
+}
diff --git a/engines/librpma_fio.c b/engines/librpma_fio.c
new file mode 100644 (file)
index 0000000..810b55e
--- /dev/null
@@ -0,0 +1,1051 @@
+/*
+ * librpma_fio: librpma_apm and librpma_gpspm engines' common part.
+ *
+ * Copyright 2021, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation..
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#include "librpma_fio.h"
+
+#include <libpmem.h>
+
+struct fio_option librpma_fio_options[] = {
+       {
+               .name   = "serverip",
+               .lname  = "rpma_server_ip",
+               .type   = FIO_OPT_STR_STORE,
+               .off1   = offsetof(struct librpma_fio_options_values, server_ip),
+               .help   = "IP address the server is listening on",
+               .def    = "",
+               .category = FIO_OPT_C_ENGINE,
+               .group  = FIO_OPT_G_LIBRPMA,
+       },
+       {
+               .name   = "port",
+               .lname  = "rpma_server port",
+               .type   = FIO_OPT_STR_STORE,
+               .off1   = offsetof(struct librpma_fio_options_values, port),
+               .help   = "port the server is listening on",
+               .def    = "7204",
+               .category = FIO_OPT_C_ENGINE,
+               .group  = FIO_OPT_G_LIBRPMA,
+       },
+       {
+               .name   = "direct_write_to_pmem",
+               .lname  = "Direct Write to PMem (via RDMA) from the remote host is possible",
+               .type   = FIO_OPT_BOOL,
+               .off1   = offsetof(struct librpma_fio_options_values,
+                                       direct_write_to_pmem),
+               .help   = "Set to true ONLY when Direct Write to PMem from the remote host is possible (https://pmem.io/rpma/documentation/basic-direct-write-to-pmem.html)",
+               .def    = "",
+               .category = FIO_OPT_C_ENGINE,
+               .group  = FIO_OPT_G_LIBRPMA,
+       },
+       {
+               .name   = NULL,
+       },
+};
+
+int librpma_fio_td_port(const char *port_base_str, struct thread_data *td,
+               char *port_out)
+{
+       unsigned long int port_ul = strtoul(port_base_str, NULL, 10);
+       unsigned int port_new;
+
+       port_out[0] = '\0';
+
+       if (port_ul == ULONG_MAX) {
+               td_verror(td, errno, "strtoul");
+               return -1;
+       }
+       port_ul += td->thread_number - 1;
+       if (port_ul >= UINT_MAX) {
+               log_err("[%u] port number (%lu) bigger than UINT_MAX\n",
+                       td->thread_number, port_ul);
+               return -1;
+       }
+
+       port_new = port_ul;
+       snprintf(port_out, LIBRPMA_FIO_PORT_STR_LEN_MAX - 1, "%u", port_new);
+
+       return 0;
+}
+
+char *librpma_fio_allocate_dram(struct thread_data *td, size_t size,
+       struct librpma_fio_mem *mem)
+{
+       char *mem_ptr = NULL;
+       int ret;
+
+       if ((ret = posix_memalign((void **)&mem_ptr, page_size, size))) {
+               log_err("fio: posix_memalign() failed\n");
+               td_verror(td, ret, "posix_memalign");
+               return NULL;
+       }
+
+       mem->mem_ptr = mem_ptr;
+       mem->size_mmap = 0;
+
+       return mem_ptr;
+}
+
+char *librpma_fio_allocate_pmem(struct thread_data *td, const char *filename,
+               size_t size, struct librpma_fio_mem *mem)
+{
+       size_t size_mmap = 0;
+       char *mem_ptr = NULL;
+       int is_pmem = 0;
+       size_t ws_offset;
+
+       if (size % page_size) {
+               log_err("fio: size (%zu) is not aligned to page size (%zu)\n",
+                       size, page_size);
+               return NULL;
+       }
+
+       ws_offset = (td->thread_number - 1) * size;
+
+       if (!filename) {
+               log_err("fio: filename is not set\n");
+               return NULL;
+       }
+
+       /* map the file */
+       mem_ptr = pmem_map_file(filename, 0 /* len */, 0 /* flags */,
+                       0 /* mode */, &size_mmap, &is_pmem);
+       if (mem_ptr == NULL) {
+               log_err("fio: pmem_map_file(%s) failed\n", filename);
+               /* pmem_map_file() sets errno on failure */
+               td_verror(td, errno, "pmem_map_file");
+               return NULL;
+       }
+
+       /* pmem is expected */
+       if (!is_pmem) {
+               log_err("fio: %s is not located in persistent memory\n",
+                       filename);
+               goto err_unmap;
+       }
+
+       /* check size of allocated persistent memory */
+       if (size_mmap < ws_offset + size) {
+               log_err(
+                       "fio: %s is too small to handle so many threads (%zu < %zu)\n",
+                       filename, size_mmap, ws_offset + size);
+               goto err_unmap;
+       }
+
+       log_info("fio: size of memory mapped from the file %s: %zu\n",
+               filename, size_mmap);
+
+       mem->mem_ptr = mem_ptr;
+       mem->size_mmap = size_mmap;
+
+       return mem_ptr + ws_offset;
+
+err_unmap:
+       (void) pmem_unmap(mem_ptr, size_mmap);
+       return NULL;
+}
+
+void librpma_fio_free(struct librpma_fio_mem *mem)
+{
+       if (mem->size_mmap)
+               (void) pmem_unmap(mem->mem_ptr, mem->size_mmap);
+       else
+               free(mem->mem_ptr);
+}
+
+#define LIBRPMA_FIO_RETRY_MAX_NO       10
+#define LIBRPMA_FIO_RETRY_DELAY_S      5
+
+int librpma_fio_client_init(struct thread_data *td,
+               struct rpma_conn_cfg *cfg)
+{
+       struct librpma_fio_client_data *ccd;
+       struct librpma_fio_options_values *o = td->eo;
+       struct ibv_context *dev = NULL;
+       char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
+       struct rpma_conn_req *req = NULL;
+       enum rpma_conn_event event;
+       struct rpma_conn_private_data pdata;
+       enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
+       int remote_flush_type;
+       int retry;
+       int ret;
+
+       /* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
+#ifdef FIO_INC_DEBUG
+       if ((1UL << FD_NET) & fio_debug)
+               log_level_aux = RPMA_LOG_LEVEL_INFO;
+#endif
+
+       /* configure logging thresholds to see more details */
+       rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
+       rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);
+
+       /* obtain an IBV context for a remote IP address */
+       if ((ret = rpma_utils_get_ibv_context(o->server_ip,
+                       RPMA_UTIL_IBV_CONTEXT_REMOTE, &dev))) {
+               librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
+               return -1;
+       }
+
+       /* allocate client's data */
+       ccd = calloc(1, sizeof(*ccd));
+       if (ccd == NULL) {
+               td_verror(td, errno, "calloc");
+               return -1;
+       }
+
+       /* allocate all in-memory queues */
+       ccd->io_us_queued = calloc(td->o.iodepth, sizeof(*ccd->io_us_queued));
+       if (ccd->io_us_queued == NULL) {
+               td_verror(td, errno, "calloc");
+               goto err_free_ccd;
+       }
+
+       ccd->io_us_flight = calloc(td->o.iodepth, sizeof(*ccd->io_us_flight));
+       if (ccd->io_us_flight == NULL) {
+               td_verror(td, errno, "calloc");
+               goto err_free_io_u_queues;
+       }
+
+       ccd->io_us_completed = calloc(td->o.iodepth,
+                       sizeof(*ccd->io_us_completed));
+       if (ccd->io_us_completed == NULL) {
+               td_verror(td, errno, "calloc");
+               goto err_free_io_u_queues;
+       }
+
+       /* create a new peer object */
+       if ((ret = rpma_peer_new(dev, &ccd->peer))) {
+               librpma_td_verror(td, ret, "rpma_peer_new");
+               goto err_free_io_u_queues;
+       }
+
+       /* create a connection request */
+       if (librpma_fio_td_port(o->port, td, port_td))
+               goto err_peer_delete;
+
+       for (retry = 0; retry < LIBRPMA_FIO_RETRY_MAX_NO; retry++) {
+               if ((ret = rpma_conn_req_new(ccd->peer, o->server_ip, port_td,
+                               cfg, &req))) {
+                       librpma_td_verror(td, ret, "rpma_conn_req_new");
+                       goto err_peer_delete;
+               }
+
+               /*
+                * Connect the connection request
+                * and obtain the connection object.
+                */
+               if ((ret = rpma_conn_req_connect(&req, NULL, &ccd->conn))) {
+                       librpma_td_verror(td, ret, "rpma_conn_req_connect");
+                       goto err_req_delete;
+               }
+
+               /* wait for the connection to establish */
+               if ((ret = rpma_conn_next_event(ccd->conn, &event))) {
+                       librpma_td_verror(td, ret, "rpma_conn_next_event");
+                       goto err_conn_delete;
+               } else if (event == RPMA_CONN_ESTABLISHED) {
+                       break;
+               } else if (event == RPMA_CONN_REJECTED) {
+                       (void) rpma_conn_disconnect(ccd->conn);
+                       (void) rpma_conn_delete(&ccd->conn);
+                       if (retry < LIBRPMA_FIO_RETRY_MAX_NO - 1) {
+                               log_err("Thread [%d]: Retrying (#%i) ...\n",
+                                       td->thread_number, retry + 1);
+                               sleep(LIBRPMA_FIO_RETRY_DELAY_S);
+                       } else {
+                               log_err(
+                                       "Thread [%d]: The maximum number of retries exceeded. Closing.\n",
+                                       td->thread_number);
+                       }
+               } else {
+                       log_err(
+                               "rpma_conn_next_event returned an unexptected event: (%s != RPMA_CONN_ESTABLISHED)\n",
+                               rpma_utils_conn_event_2str(event));
+                       goto err_conn_delete;
+               }
+       }
+
+       if (retry > 0)
+               log_err("Thread [%d]: Connected after retry #%i\n",
+                       td->thread_number, retry);
+
+       if (ccd->conn == NULL)
+               goto err_peer_delete;
+
+       /* get the connection's private data sent from the server */
+       if ((ret = rpma_conn_get_private_data(ccd->conn, &pdata))) {
+               librpma_td_verror(td, ret, "rpma_conn_get_private_data");
+               goto err_conn_delete;
+       }
+
+       /* get the server's workspace representation */
+       ccd->ws = pdata.ptr;
+
+       /* create the server's memory representation */
+       if ((ret = rpma_mr_remote_from_descriptor(&ccd->ws->descriptor[0],
+                       ccd->ws->mr_desc_size, &ccd->server_mr))) {
+               librpma_td_verror(td, ret, "rpma_mr_remote_from_descriptor");
+               goto err_conn_delete;
+       }
+
+       /* get the total size of the shared server memory */
+       if ((ret = rpma_mr_remote_get_size(ccd->server_mr, &ccd->ws_size))) {
+               librpma_td_verror(td, ret, "rpma_mr_remote_get_size");
+               goto err_conn_delete;
+       }
+
+       /* get flush type of the remote node */
+       if ((ret = rpma_mr_remote_get_flush_type(ccd->server_mr,
+                       &remote_flush_type))) {
+               librpma_td_verror(td, ret, "rpma_mr_remote_get_flush_type");
+               goto err_conn_delete;
+       }
+
+       ccd->server_mr_flush_type =
+               (remote_flush_type & RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT) ?
+               RPMA_FLUSH_TYPE_PERSISTENT : RPMA_FLUSH_TYPE_VISIBILITY;
+
+       /*
+        * Assure an io_us buffer allocation is page-size-aligned which is required
+        * to register for RDMA. User-provided value is intentionally ignored.
+        */
+       td->o.mem_align = page_size;
+
+       td->io_ops_data = ccd;
+
+       return 0;
+
+err_conn_delete:
+       (void) rpma_conn_disconnect(ccd->conn);
+       (void) rpma_conn_delete(&ccd->conn);
+
+err_req_delete:
+       (void) rpma_conn_req_delete(&req);
+
+err_peer_delete:
+       (void) rpma_peer_delete(&ccd->peer);
+
+err_free_io_u_queues:
+       free(ccd->io_us_queued);
+       free(ccd->io_us_flight);
+       free(ccd->io_us_completed);
+
+err_free_ccd:
+       free(ccd);
+
+       return -1;
+}
+
+void librpma_fio_client_cleanup(struct thread_data *td)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       enum rpma_conn_event ev;
+       int ret;
+
+       if (ccd == NULL)
+               return;
+
+       /* delete the iou's memory registration */
+       if ((ret = rpma_mr_dereg(&ccd->orig_mr)))
+               librpma_td_verror(td, ret, "rpma_mr_dereg");
+       /* delete the iou's memory registration */
+       if ((ret = rpma_mr_remote_delete(&ccd->server_mr)))
+               librpma_td_verror(td, ret, "rpma_mr_remote_delete");
+       /* initiate disconnection */
+       if ((ret = rpma_conn_disconnect(ccd->conn)))
+               librpma_td_verror(td, ret, "rpma_conn_disconnect");
+       /* wait for disconnection to end up */
+       if ((ret = rpma_conn_next_event(ccd->conn, &ev))) {
+               librpma_td_verror(td, ret, "rpma_conn_next_event");
+       } else if (ev != RPMA_CONN_CLOSED) {
+               log_err(
+                       "client_cleanup received an unexpected event (%s != RPMA_CONN_CLOSED)\n",
+                       rpma_utils_conn_event_2str(ev));
+       }
+       /* delete the connection */
+       if ((ret = rpma_conn_delete(&ccd->conn)))
+               librpma_td_verror(td, ret, "rpma_conn_delete");
+       /* delete the peer */
+       if ((ret = rpma_peer_delete(&ccd->peer)))
+               librpma_td_verror(td, ret, "rpma_peer_delete");
+       /* free the software queues */
+       free(ccd->io_us_queued);
+       free(ccd->io_us_flight);
+       free(ccd->io_us_completed);
+       free(ccd);
+       td->io_ops_data = NULL; /* zero ccd */
+}
+
+int librpma_fio_file_nop(struct thread_data *td, struct fio_file *f)
+{
+       /* NOP */
+       return 0;
+}
+
+int librpma_fio_client_post_init(struct thread_data *td)
+{
+       struct librpma_fio_client_data *ccd =  td->io_ops_data;
+       size_t io_us_size;
+       int ret;
+
+       /*
+        * td->orig_buffer is not aligned. The engine requires aligned io_us
+        * so FIO alignes up the address using the formula below.
+        */
+       ccd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
+                       td->o.mem_align;
+
+       /*
+        * td->orig_buffer_size beside the space really consumed by io_us
+        * has paddings which can be omitted for the memory registration.
+        */
+       io_us_size = (unsigned long long)td_max_bs(td) *
+                       (unsigned long long)td->o.iodepth;
+
+       if ((ret = rpma_mr_reg(ccd->peer, ccd->orig_buffer_aligned, io_us_size,
+                       RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
+                       RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
+                       RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT, &ccd->orig_mr)))
+               librpma_td_verror(td, ret, "rpma_mr_reg");
+       return ret;
+}
+
+int librpma_fio_client_get_file_size(struct thread_data *td,
+               struct fio_file *f)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+
+       f->real_file_size = ccd->ws_size;
+       fio_file_set_size_known(f);
+
+       return 0;
+}
+
+static enum fio_q_status client_queue_sync(struct thread_data *td,
+               struct io_u *io_u)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       struct rpma_completion cmpl;
+       unsigned io_u_index;
+       int ret;
+
+       /* execute io_u */
+       if (io_u->ddir == DDIR_READ) {
+               /* post an RDMA read operation */
+               if (librpma_fio_client_io_read(td, io_u,
+                               RPMA_F_COMPLETION_ALWAYS))
+                       goto err;
+       } else if (io_u->ddir == DDIR_WRITE) {
+               /* post an RDMA write operation */
+               if (librpma_fio_client_io_write(td, io_u))
+                       goto err;
+               if (ccd->flush(td, io_u, io_u, io_u->xfer_buflen))
+                       goto err;
+       } else {
+               log_err("unsupported IO mode: %s\n", io_ddir_name(io_u->ddir));
+               goto err;
+       }
+
+       do {
+               /* get a completion */
+               ret = rpma_conn_completion_get(ccd->conn, &cmpl);
+               if (ret == RPMA_E_NO_COMPLETION) {
+                       /* lack of completion is not an error */
+                       continue;
+               } else if (ret != 0) {
+                       /* an error occurred */
+                       librpma_td_verror(td, ret, "rpma_conn_completion_get");
+                       goto err;
+               }
+
+               /* if io_us has completed with an error */
+               if (cmpl.op_status != IBV_WC_SUCCESS)
+                       goto err;
+
+               if (cmpl.op == RPMA_OP_SEND)
+                       ++ccd->op_send_completed;
+               else {
+                       if (cmpl.op == RPMA_OP_RECV)
+                               ++ccd->op_recv_completed;
+
+                       break;
+               }
+       } while (1);
+
+       if (ccd->get_io_u_index(&cmpl, &io_u_index) != 1)
+               goto err;
+
+       if (io_u->index != io_u_index) {
+               log_err(
+                       "no matching io_u for received completion found (io_u_index=%u)\n",
+                       io_u_index);
+               goto err;
+       }
+
+       /* make sure all SENDs are completed before exit - clean up SQ */
+       if (librpma_fio_client_io_complete_all_sends(td))
+               goto err;
+
+       return FIO_Q_COMPLETED;
+
+err:
+       io_u->error = -1;
+       return FIO_Q_COMPLETED;
+}
+
+enum fio_q_status librpma_fio_client_queue(struct thread_data *td,
+               struct io_u *io_u)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+
+       if (ccd->io_u_queued_nr == (int)td->o.iodepth)
+               return FIO_Q_BUSY;
+
+       if (td->o.sync_io)
+               return client_queue_sync(td, io_u);
+
+       /* io_u -> queued[] */
+       ccd->io_us_queued[ccd->io_u_queued_nr] = io_u;
+       ccd->io_u_queued_nr++;
+
+       return FIO_Q_QUEUED;
+}
+
+int librpma_fio_client_commit(struct thread_data *td)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       int flags = RPMA_F_COMPLETION_ON_ERROR;
+       struct timespec now;
+       bool fill_time;
+       int i;
+       struct io_u *flush_first_io_u = NULL;
+       unsigned long long int flush_len = 0;
+
+       if (!ccd->io_us_queued)
+               return -1;
+
+       /* execute all io_us from queued[] */
+       for (i = 0; i < ccd->io_u_queued_nr; i++) {
+               struct io_u *io_u = ccd->io_us_queued[i];
+
+               if (io_u->ddir == DDIR_READ) {
+                       if (i + 1 == ccd->io_u_queued_nr ||
+                           ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE)
+                               flags = RPMA_F_COMPLETION_ALWAYS;
+                       /* post an RDMA read operation */
+                       if (librpma_fio_client_io_read(td, io_u, flags))
+                               return -1;
+               } else if (io_u->ddir == DDIR_WRITE) {
+                       /* post an RDMA write operation */
+                       if (librpma_fio_client_io_write(td, io_u))
+                               return -1;
+
+                       /* cache the first io_u in the sequence */
+                       if (flush_first_io_u == NULL)
+                               flush_first_io_u = io_u;
+
+                       /*
+                        * the flush length is the sum of all io_u's creating
+                        * the sequence
+                        */
+                       flush_len += io_u->xfer_buflen;
+
+                       /*
+                        * if io_u's are random the rpma_flush is required
+                        * after each one of them
+                        */
+                       if (!td_random(td)) {
+                               /*
+                                * When the io_u's are sequential and
+                                * the current io_u is not the last one and
+                                * the next one is also a write operation
+                                * the flush can be postponed by one io_u and
+                                * cover all of them which build a continuous
+                                * sequence.
+                                */
+                               if ((i + 1 < ccd->io_u_queued_nr) &&
+                                   (ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE))
+                                       continue;
+                       }
+
+                       /* flush all writes which build a continuous sequence */
+                       if (ccd->flush(td, flush_first_io_u, io_u, flush_len))
+                               return -1;
+
+                       /*
+                        * reset the flush parameters in preparation for
+                        * the next one
+                        */
+                       flush_first_io_u = NULL;
+                       flush_len = 0;
+               } else {
+                       log_err("unsupported IO mode: %s\n",
+                               io_ddir_name(io_u->ddir));
+                       return -1;
+               }
+       }
+
+       if ((fill_time = fio_fill_issue_time(td)))
+               fio_gettime(&now, NULL);
+
+       /* move executed io_us from queued[] to flight[] */
+       for (i = 0; i < ccd->io_u_queued_nr; i++) {
+               struct io_u *io_u = ccd->io_us_queued[i];
+
+               /* FIO does not do this if the engine is asynchronous */
+               if (fill_time)
+                       memcpy(&io_u->issue_time, &now, sizeof(now));
+
+               /* move executed io_us from queued[] to flight[] */
+               ccd->io_us_flight[ccd->io_u_flight_nr] = io_u;
+               ccd->io_u_flight_nr++;
+
+               /*
+                * FIO says:
+                * If an engine has the commit hook
+                * it has to call io_u_queued() itself.
+                */
+               io_u_queued(td, io_u);
+       }
+
+       /* FIO does not do this if an engine has the commit hook. */
+       io_u_mark_submit(td, ccd->io_u_queued_nr);
+       ccd->io_u_queued_nr = 0;
+
+       return 0;
+}
+
+/*
+ * RETURN VALUE
+ * - > 0  - a number of completed io_us
+ * -   0  - when no complicitions received
+ * - (-1) - when an error occurred
+ */
+static int client_getevent_process(struct thread_data *td)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       struct rpma_completion cmpl;
+       /* io_u->index of completed io_u (cmpl.op_context) */
+       unsigned int io_u_index;
+       /* # of completed io_us */
+       int cmpl_num = 0;
+       /* helpers */
+       struct io_u *io_u;
+       int i;
+       int ret;
+
+       /* get a completion */
+       if ((ret = rpma_conn_completion_get(ccd->conn, &cmpl))) {
+               /* lack of completion is not an error */
+               if (ret == RPMA_E_NO_COMPLETION) {
+                       /* lack of completion is not an error */
+                       return 0;
+               }
+
+               /* an error occurred */
+               librpma_td_verror(td, ret, "rpma_conn_completion_get");
+               return -1;
+       }
+
+       /* if io_us has completed with an error */
+       if (cmpl.op_status != IBV_WC_SUCCESS) {
+               td->error = cmpl.op_status;
+               return -1;
+       }
+
+       if (cmpl.op == RPMA_OP_SEND)
+               ++ccd->op_send_completed;
+       else if (cmpl.op == RPMA_OP_RECV)
+               ++ccd->op_recv_completed;
+
+       if ((ret = ccd->get_io_u_index(&cmpl, &io_u_index)) != 1)
+               return ret;
+
+       /* look for an io_u being completed */
+       for (i = 0; i < ccd->io_u_flight_nr; ++i) {
+               if (ccd->io_us_flight[i]->index == io_u_index) {
+                       cmpl_num = i + 1;
+                       break;
+               }
+       }
+
+       /* if no matching io_u has been found */
+       if (cmpl_num == 0) {
+               log_err(
+                       "no matching io_u for received completion found (io_u_index=%u)\n",
+                       io_u_index);
+               return -1;
+       }
+
+       /* move completed io_us to the completed in-memory queue */
+       for (i = 0; i < cmpl_num; ++i) {
+               /* get and prepare io_u */
+               io_u = ccd->io_us_flight[i];
+
+               /* append to the queue */
+               ccd->io_us_completed[ccd->io_u_completed_nr] = io_u;
+               ccd->io_u_completed_nr++;
+       }
+
+       /* remove completed io_us from the flight queue */
+       for (i = cmpl_num; i < ccd->io_u_flight_nr; ++i)
+               ccd->io_us_flight[i - cmpl_num] = ccd->io_us_flight[i];
+       ccd->io_u_flight_nr -= cmpl_num;
+
+       return cmpl_num;
+}
+
+int librpma_fio_client_getevents(struct thread_data *td, unsigned int min,
+               unsigned int max, const struct timespec *t)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       /* total # of completed io_us */
+       int cmpl_num_total = 0;
+       /* # of completed io_us from a single event */
+       int cmpl_num;
+
+       do {
+               cmpl_num = client_getevent_process(td);
+               if (cmpl_num > 0) {
+                       /* new completions collected */
+                       cmpl_num_total += cmpl_num;
+               } else if (cmpl_num == 0) {
+                       /*
+                        * It is required to make sure that CQEs for SENDs
+                        * will flow at least at the same pace as CQEs for RECVs.
+                        */
+                       if (cmpl_num_total >= min &&
+                           ccd->op_send_completed >= ccd->op_recv_completed)
+                               break;
+
+                       /*
+                        * To reduce CPU consumption one can use
+                        * the rpma_conn_completion_wait() function.
+                        * Note this greatly increase the latency
+                        * and make the results less stable.
+                        * The bandwidth stays more or less the same.
+                        */
+               } else {
+                       /* an error occurred */
+                       return -1;
+               }
+
+               /*
+                * The expected max can be exceeded if CQEs for RECVs will come up
+                * faster than CQEs for SENDs. But it is required to make sure CQEs for
+                * SENDs will flow at least at the same pace as CQEs for RECVs.
+                */
+       } while (cmpl_num_total < max ||
+                       ccd->op_send_completed < ccd->op_recv_completed);
+
+       /*
+        * All posted SENDs are completed and RECVs for them (responses) are
+        * completed. This is the initial situation so the counters are reset.
+        */
+       if (ccd->op_send_posted == ccd->op_send_completed &&
+                       ccd->op_send_completed == ccd->op_recv_completed) {
+               ccd->op_send_posted = 0;
+               ccd->op_send_completed = 0;
+               ccd->op_recv_completed = 0;
+       }
+
+       return cmpl_num_total;
+}
+
+struct io_u *librpma_fio_client_event(struct thread_data *td, int event)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       struct io_u *io_u;
+       int i;
+
+       /* get the first io_u from the queue */
+       io_u = ccd->io_us_completed[0];
+
+       /* remove the first io_u from the queue */
+       for (i = 1; i < ccd->io_u_completed_nr; ++i)
+               ccd->io_us_completed[i - 1] = ccd->io_us_completed[i];
+       ccd->io_u_completed_nr--;
+
+       dprint_io_u(io_u, "client_event");
+
+       return io_u;
+}
+
+char *librpma_fio_client_errdetails(struct io_u *io_u)
+{
+       /* get the string representation of an error */
+       enum ibv_wc_status status = io_u->error;
+       const char *status_str = ibv_wc_status_str(status);
+
+       char *details = strdup(status_str);
+       if (details == NULL) {
+               fprintf(stderr, "Error: %s\n", status_str);
+               fprintf(stderr, "Fatal error: out of memory. Aborting.\n");
+               abort();
+       }
+
+       /* FIO frees the returned string when it becomes obsolete */
+       return details;
+}
+
+int librpma_fio_server_init(struct thread_data *td)
+{
+       struct librpma_fio_options_values *o = td->eo;
+       struct librpma_fio_server_data *csd;
+       struct ibv_context *dev = NULL;
+       enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
+       int ret = -1;
+
+       /* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
+#ifdef FIO_INC_DEBUG
+       if ((1UL << FD_NET) & fio_debug)
+               log_level_aux = RPMA_LOG_LEVEL_INFO;
+#endif
+
+       /* configure logging thresholds to see more details */
+       rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
+       rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);
+
+
+       /* obtain an IBV context for a remote IP address */
+       if ((ret = rpma_utils_get_ibv_context(o->server_ip,
+                       RPMA_UTIL_IBV_CONTEXT_LOCAL, &dev))) {
+               librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
+               return -1;
+       }
+
+       /* allocate server's data */
+       csd = calloc(1, sizeof(*csd));
+       if (csd == NULL) {
+               td_verror(td, errno, "calloc");
+               return -1;
+       }
+
+       /* create a new peer object */
+       if ((ret = rpma_peer_new(dev, &csd->peer))) {
+               librpma_td_verror(td, ret, "rpma_peer_new");
+               goto err_free_csd;
+       }
+
+       td->io_ops_data = csd;
+
+       return 0;
+
+err_free_csd:
+       free(csd);
+
+       return -1;
+}
+
+void librpma_fio_server_cleanup(struct thread_data *td)
+{
+       struct librpma_fio_server_data *csd =  td->io_ops_data;
+       int ret;
+
+       if (csd == NULL)
+               return;
+
+       /* free the peer */
+       if ((ret = rpma_peer_delete(&csd->peer)))
+               librpma_td_verror(td, ret, "rpma_peer_delete");
+
+       free(csd);
+}
+
+int librpma_fio_server_open_file(struct thread_data *td, struct fio_file *f,
+               struct rpma_conn_cfg *cfg)
+{
+       struct librpma_fio_server_data *csd = td->io_ops_data;
+       struct librpma_fio_options_values *o = td->eo;
+       enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
+       struct librpma_fio_workspace ws = {0};
+       struct rpma_conn_private_data pdata;
+       uint32_t max_msg_num;
+       struct rpma_conn_req *conn_req;
+       struct rpma_conn *conn;
+       struct rpma_mr_local *mr;
+       char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
+       struct rpma_ep *ep;
+       size_t mem_size = td->o.size;
+       size_t mr_desc_size;
+       void *ws_ptr;
+       int usage_mem_type;
+       int ret;
+
+       if (!f->file_name) {
+               log_err("fio: filename is not set\n");
+               return -1;
+       }
+
+       /* start a listening endpoint at addr:port */
+       if (librpma_fio_td_port(o->port, td, port_td))
+               return -1;
+
+       if ((ret = rpma_ep_listen(csd->peer, o->server_ip, port_td, &ep))) {
+               librpma_td_verror(td, ret, "rpma_ep_listen");
+               return -1;
+       }
+
+       if (strcmp(f->file_name, "malloc") == 0) {
+               /* allocation from DRAM using posix_memalign() */
+               ws_ptr = librpma_fio_allocate_dram(td, mem_size, &csd->mem);
+               usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_VISIBILITY;
+       } else {
+               /* allocation from PMEM using pmem_map_file() */
+               ws_ptr = librpma_fio_allocate_pmem(td, f->file_name,
+                               mem_size, &csd->mem);
+               usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT;
+       }
+
+       if (ws_ptr == NULL)
+               goto err_ep_shutdown;
+
+       f->real_file_size = mem_size;
+
+       if ((ret = rpma_mr_reg(csd->peer, ws_ptr, mem_size,
+                       RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
+                       RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
+                       usage_mem_type, &mr))) {
+               librpma_td_verror(td, ret, "rpma_mr_reg");
+               goto err_free;
+       }
+
+       /* get size of the memory region's descriptor */
+       if ((ret = rpma_mr_get_descriptor_size(mr, &mr_desc_size))) {
+               librpma_td_verror(td, ret, "rpma_mr_get_descriptor_size");
+               goto err_mr_dereg;
+       }
+
+       /* verify size of the memory region's descriptor */
+       if (mr_desc_size > LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE) {
+               log_err(
+                       "size of the memory region's descriptor is too big (max=%i)\n",
+                       LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE);
+               goto err_mr_dereg;
+       }
+
+       /* get the memory region's descriptor */
+       if ((ret = rpma_mr_get_descriptor(mr, &ws.descriptor[0]))) {
+               librpma_td_verror(td, ret, "rpma_mr_get_descriptor");
+               goto err_mr_dereg;
+       }
+
+       if (cfg != NULL) {
+               if ((ret = rpma_conn_cfg_get_rq_size(cfg, &max_msg_num))) {
+                       librpma_td_verror(td, ret, "rpma_conn_cfg_get_rq_size");
+                       goto err_mr_dereg;
+               }
+
+               /* verify whether iodepth fits into uint16_t */
+               if (max_msg_num > UINT16_MAX) {
+                       log_err("fio: iodepth too big (%u > %u)\n",
+                               max_msg_num, UINT16_MAX);
+                       return -1;
+               }
+
+               ws.max_msg_num = max_msg_num;
+       }
+
+       /* prepare a workspace description */
+       ws.direct_write_to_pmem = o->direct_write_to_pmem;
+       ws.mr_desc_size = mr_desc_size;
+       pdata.ptr = &ws;
+       pdata.len = sizeof(ws);
+
+       /* receive an incoming connection request */
+       if ((ret = rpma_ep_next_conn_req(ep, cfg, &conn_req))) {
+               librpma_td_verror(td, ret, "rpma_ep_next_conn_req");
+               goto err_mr_dereg;
+       }
+
+       if (csd->prepare_connection && csd->prepare_connection(td, conn_req))
+               goto err_req_delete;
+
+       /* accept the connection request and obtain the connection object */
+       if ((ret = rpma_conn_req_connect(&conn_req, &pdata, &conn))) {
+               librpma_td_verror(td, ret, "rpma_conn_req_connect");
+               goto err_req_delete;
+       }
+
+       /* wait for the connection to be established */
+       if ((ret = rpma_conn_next_event(conn, &conn_event))) {
+               librpma_td_verror(td, ret, "rpma_conn_next_event");
+               goto err_conn_delete;
+       } else if (conn_event != RPMA_CONN_ESTABLISHED) {
+               log_err("rpma_conn_next_event returned an unexptected event\n");
+               goto err_conn_delete;
+       }
+
+       /* end-point is no longer needed */
+       (void) rpma_ep_shutdown(&ep);
+
+       csd->ws_mr = mr;
+       csd->ws_ptr = ws_ptr;
+       csd->conn = conn;
+
+       return 0;
+
+err_conn_delete:
+       (void) rpma_conn_delete(&conn);
+
+err_req_delete:
+       (void) rpma_conn_req_delete(&conn_req);
+
+err_mr_dereg:
+       (void) rpma_mr_dereg(&mr);
+
+err_free:
+       librpma_fio_free(&csd->mem);
+
+err_ep_shutdown:
+       (void) rpma_ep_shutdown(&ep);
+
+       return -1;
+}
+
+int librpma_fio_server_close_file(struct thread_data *td, struct fio_file *f)
+{
+       struct librpma_fio_server_data *csd = td->io_ops_data;
+       enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
+       int rv = 0;
+       int ret;
+
+       /* wait for the connection to be closed */
+       ret = rpma_conn_next_event(csd->conn, &conn_event);
+       if (!ret && conn_event != RPMA_CONN_CLOSED) {
+               log_err("rpma_conn_next_event returned an unexptected event\n");
+               rv = -1;
+       }
+
+       if ((ret = rpma_conn_disconnect(csd->conn))) {
+               librpma_td_verror(td, ret, "rpma_conn_disconnect");
+               rv = -1;
+       }
+
+       if ((ret = rpma_conn_delete(&csd->conn))) {
+               librpma_td_verror(td, ret, "rpma_conn_delete");
+               rv = -1;
+       }
+
+       if ((ret = rpma_mr_dereg(&csd->ws_mr))) {
+               librpma_td_verror(td, ret, "rpma_mr_dereg");
+               rv = -1;
+       }
+
+       librpma_fio_free(&csd->mem);
+
+       return rv;
+}
diff --git a/engines/librpma_fio.h b/engines/librpma_fio.h
new file mode 100644 (file)
index 0000000..8cfb2e2
--- /dev/null
@@ -0,0 +1,273 @@
+/*
+ * librpma_fio: librpma_apm and librpma_gpspm engines' common header.
+ *
+ * Copyright 2021, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation..
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#ifndef LIBRPMA_FIO_H
+#define LIBRPMA_FIO_H 1
+
+#include "../fio.h"
+#include "../optgroup.h"
+
+#include <librpma.h>
+
+/* servers' and clients' common */
+
+#define librpma_td_verror(td, err, func) \
+       td_vmsg((td), (err), rpma_err_2str(err), (func))
+
+/* ceil(a / b) = (a + b - 1) / b */
+#define LIBRPMA_FIO_CEIL(a, b) (((a) + (b) - 1) / (b))
+
+/* common option structure for server and client */
+struct librpma_fio_options_values {
+       /*
+        * FIO considers .off1 == 0 absent so the first meaningful field has to
+        * have padding ahead of it.
+        */
+       void *pad;
+       char *server_ip;
+       /* base server listening port */
+       char *port;
+       /* Direct Write to PMem is possible */
+       unsigned int direct_write_to_pmem;
+};
+
+extern struct fio_option librpma_fio_options[];
+
+/*
+ * Limited by the maximum length of the private data
+ * for rdma_connect() in case of RDMA_PS_TCP (28 bytes).
+ */
+#define LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE 24
+
+struct librpma_fio_workspace {
+       uint16_t max_msg_num;   /* # of RQ slots */
+       uint8_t direct_write_to_pmem; /* Direct Write to PMem is possible */
+       uint8_t mr_desc_size;   /* size of mr_desc in descriptor[] */
+       /* buffer containing mr_desc */
+       char descriptor[LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE];
+};
+
+#define LIBRPMA_FIO_PORT_STR_LEN_MAX 12
+
+int librpma_fio_td_port(const char *port_base_str, struct thread_data *td,
+               char *port_out);
+
+struct librpma_fio_mem {
+       /* memory buffer */
+       char *mem_ptr;
+
+       /* size of the mapped persistent memory */
+       size_t size_mmap;
+};
+
+char *librpma_fio_allocate_dram(struct thread_data *td, size_t size,
+               struct librpma_fio_mem *mem);
+
+char *librpma_fio_allocate_pmem(struct thread_data *td, const char *filename,
+               size_t size, struct librpma_fio_mem *mem);
+
+void librpma_fio_free(struct librpma_fio_mem *mem);
+
+/* clients' common */
+
+typedef int (*librpma_fio_flush_t)(struct thread_data *td,
+               struct io_u *first_io_u, struct io_u *last_io_u,
+               unsigned long long int len);
+
+/*
+ * RETURN VALUE
+ * - ( 1) - on success
+ * - ( 0) - skip
+ * - (-1) - on error
+ */
+typedef int (*librpma_fio_get_io_u_index_t)(struct rpma_completion *cmpl,
+               unsigned int *io_u_index);
+
+struct librpma_fio_client_data {
+       struct rpma_peer *peer;
+       struct rpma_conn *conn;
+
+       /* aligned td->orig_buffer */
+       char *orig_buffer_aligned;
+
+       /* ious's base address memory registration (cd->orig_buffer_aligned) */
+       struct rpma_mr_local *orig_mr;
+
+       struct librpma_fio_workspace *ws;
+
+       /* a server's memory representation */
+       struct rpma_mr_remote *server_mr;
+       enum rpma_flush_type server_mr_flush_type;
+
+       /* remote workspace description */
+       size_t ws_size;
+
+       /* in-memory queues */
+       struct io_u **io_us_queued;
+       int io_u_queued_nr;
+       struct io_u **io_us_flight;
+       int io_u_flight_nr;
+       struct io_u **io_us_completed;
+       int io_u_completed_nr;
+
+       /* SQ control. Note: all of them have to be kept in sync. */
+       uint32_t op_send_posted;
+       uint32_t op_send_completed;
+       uint32_t op_recv_completed;
+
+       librpma_fio_flush_t flush;
+       librpma_fio_get_io_u_index_t get_io_u_index;
+
+       /* engine-specific client data */
+       void *client_data;
+};
+
+int librpma_fio_client_init(struct thread_data *td,
+               struct rpma_conn_cfg *cfg);
+void librpma_fio_client_cleanup(struct thread_data *td);
+
+int librpma_fio_file_nop(struct thread_data *td, struct fio_file *f);
+int librpma_fio_client_get_file_size(struct thread_data *td,
+               struct fio_file *f);
+
+int librpma_fio_client_post_init(struct thread_data *td);
+
+enum fio_q_status librpma_fio_client_queue(struct thread_data *td,
+               struct io_u *io_u);
+
+int librpma_fio_client_commit(struct thread_data *td);
+
+int librpma_fio_client_getevents(struct thread_data *td, unsigned int min,
+               unsigned int max, const struct timespec *t);
+
+struct io_u *librpma_fio_client_event(struct thread_data *td, int event);
+
+char *librpma_fio_client_errdetails(struct io_u *io_u);
+
+static inline int librpma_fio_client_io_read(struct thread_data *td,
+               struct io_u *io_u, int flags)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       size_t dst_offset = (char *)(io_u->xfer_buf) - ccd->orig_buffer_aligned;
+       size_t src_offset = io_u->offset;
+       int ret;
+
+       if ((ret = rpma_read(ccd->conn, ccd->orig_mr, dst_offset,
+                       ccd->server_mr, src_offset, io_u->xfer_buflen,
+                       flags, (void *)(uintptr_t)io_u->index))) {
+               librpma_td_verror(td, ret, "rpma_read");
+               return -1;
+       }
+
+       return 0;
+}
+
+static inline int librpma_fio_client_io_write(struct thread_data *td,
+               struct io_u *io_u)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       size_t src_offset = (char *)(io_u->xfer_buf) - ccd->orig_buffer_aligned;
+       size_t dst_offset = io_u->offset;
+       int ret;
+
+       if ((ret = rpma_write(ccd->conn, ccd->server_mr, dst_offset,
+                       ccd->orig_mr, src_offset, io_u->xfer_buflen,
+                       RPMA_F_COMPLETION_ON_ERROR,
+                       (void *)(uintptr_t)io_u->index))) {
+               librpma_td_verror(td, ret, "rpma_write");
+               return -1;
+       }
+
+       return 0;
+}
+
+static inline int librpma_fio_client_io_complete_all_sends(
+               struct thread_data *td)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       struct rpma_completion cmpl;
+       int ret;
+
+       while (ccd->op_send_posted != ccd->op_send_completed) {
+               /* get a completion */
+               ret = rpma_conn_completion_get(ccd->conn, &cmpl);
+               if (ret == RPMA_E_NO_COMPLETION) {
+                       /* lack of completion is not an error */
+                       continue;
+               } else if (ret != 0) {
+                       /* an error occurred */
+                       librpma_td_verror(td, ret, "rpma_conn_completion_get");
+                       break;
+               }
+
+               if (cmpl.op_status != IBV_WC_SUCCESS)
+                       return -1;
+
+               if (cmpl.op == RPMA_OP_SEND)
+                       ++ccd->op_send_completed;
+               else {
+                       log_err(
+                               "A completion other than RPMA_OP_SEND got during cleaning up the CQ from SENDs\n");
+                       return -1;
+               }
+       }
+
+       /*
+        * All posted SENDs are completed and RECVs for them (responses) are
+        * completed. This is the initial situation so the counters are reset.
+        */
+       if (ccd->op_send_posted == ccd->op_send_completed &&
+                       ccd->op_send_completed == ccd->op_recv_completed) {
+               ccd->op_send_posted = 0;
+               ccd->op_send_completed = 0;
+               ccd->op_recv_completed = 0;
+       }
+
+       return 0;
+}
+
+/* servers' common */
+
+typedef int (*librpma_fio_prepare_connection_t)(
+               struct thread_data *td,
+               struct rpma_conn_req *conn_req);
+
+struct librpma_fio_server_data {
+       struct rpma_peer *peer;
+
+       /* resources of an incoming connection */
+       struct rpma_conn *conn;
+
+       char *ws_ptr;
+       struct rpma_mr_local *ws_mr;
+       struct librpma_fio_mem mem;
+
+       /* engine-specific server data */
+       void *server_data;
+
+       librpma_fio_prepare_connection_t prepare_connection;
+};
+
+int librpma_fio_server_init(struct thread_data *td);
+
+void librpma_fio_server_cleanup(struct thread_data *td);
+
+int librpma_fio_server_open_file(struct thread_data *td,
+               struct fio_file *f, struct rpma_conn_cfg *cfg);
+
+int librpma_fio_server_close_file(struct thread_data *td,
+               struct fio_file *f);
+
+#endif /* LIBRPMA_FIO_H */
diff --git a/engines/librpma_gpspm.c b/engines/librpma_gpspm.c
new file mode 100644 (file)
index 0000000..ac614f4
--- /dev/null
@@ -0,0 +1,755 @@
+/*
+ * librpma_gpspm: IO engine that uses PMDK librpma to write data,
+ *             based on General Purpose Server Persistency Method
+ *
+ * Copyright 2020-2021, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation..
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#include "librpma_fio.h"
+
+#include <libpmem.h>
+
+/* Generated by the protocol buffer compiler from: librpma_gpspm_flush.proto */
+#include "librpma_gpspm_flush.pb-c.h"
+
+#define MAX_MSG_SIZE (512)
+#define IO_U_BUF_LEN (2 * MAX_MSG_SIZE)
+#define SEND_OFFSET (0)
+#define RECV_OFFSET (SEND_OFFSET + MAX_MSG_SIZE)
+
+#define GPSPM_FLUSH_REQUEST__LAST \
+       { PROTOBUF_C_MESSAGE_INIT(&gpspm_flush_request__descriptor), 0, 0, 0 }
+
+/*
+ * 'Flush_req_last' is the last flush request
+ * the client has to send to server to indicate
+ * that the client is done.
+ */
+static const GPSPMFlushRequest Flush_req_last = GPSPM_FLUSH_REQUEST__LAST;
+
+#define IS_NOT_THE_LAST_MESSAGE(flush_req) \
+       (flush_req->length != Flush_req_last.length || \
+       flush_req->offset != Flush_req_last.offset)
+
+/* client side implementation */
+
+/* get next io_u message buffer in the round-robin fashion */
+#define IO_U_NEXT_BUF_OFF_CLIENT(cd) \
+       (IO_U_BUF_LEN * ((cd->msg_curr++) % cd->msg_num))
+
+struct client_data {
+       /* memory for sending and receiving buffered */
+       char *io_us_msgs;
+
+       /* resources for messaging buffer */
+       uint32_t msg_num;
+       uint32_t msg_curr;
+       struct rpma_mr_local *msg_mr;
+};
+
+static inline int client_io_flush(struct thread_data *td,
+               struct io_u *first_io_u, struct io_u *last_io_u,
+               unsigned long long int len);
+
+static int client_get_io_u_index(struct rpma_completion *cmpl,
+               unsigned int *io_u_index);
+
+static int client_init(struct thread_data *td)
+{
+       struct librpma_fio_client_data *ccd;
+       struct client_data *cd;
+       uint32_t write_num;
+       struct rpma_conn_cfg *cfg = NULL;
+       int ret;
+
+       /*
+        * not supported:
+        * - readwrite = read / trim / randread / randtrim /
+        *               / rw / randrw / trimwrite
+        */
+       if (td_read(td) || td_trim(td)) {
+               td_verror(td, EINVAL, "Not supported mode.");
+               return -1;
+       }
+
+       /* allocate client's data */
+       cd = calloc(1, sizeof(*cd));
+       if (cd == NULL) {
+               td_verror(td, errno, "calloc");
+               return -1;
+       }
+
+       /*
+        * Calculate the required number of WRITEs and FLUSHes.
+        *
+        * Note: Each flush is a request (SEND) and response (RECV) pair.
+        */
+       if (td_random(td)) {
+               write_num = td->o.iodepth; /* WRITE * N */
+               cd->msg_num = td->o.iodepth; /* FLUSH * N */
+       } else {
+               if (td->o.sync_io) {
+                       write_num = 1; /* WRITE */
+                       cd->msg_num = 1; /* FLUSH */
+               } else {
+                       write_num = td->o.iodepth; /* WRITE * N */
+                       /*
+                        * FLUSH * B where:
+                        * - B == ceil(iodepth / iodepth_batch)
+                        *   which is the number of batches for N writes
+                        */
+                       cd->msg_num = LIBRPMA_FIO_CEIL(td->o.iodepth,
+                                       td->o.iodepth_batch);
+               }
+       }
+
+       /* create a connection configuration object */
+       if ((ret = rpma_conn_cfg_new(&cfg))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_new");
+               goto err_free_cd;
+       }
+
+       /*
+        * Calculate the required queue sizes where:
+        * - the send queue (SQ) has to be big enough to accommodate
+        *   all io_us (WRITEs) and all flush requests (SENDs)
+        * - the receive queue (RQ) has to be big enough to accommodate
+        *   all flush responses (RECVs)
+        * - the completion queue (CQ) has to be big enough to accommodate all
+        *   success and error completions (sq_size + rq_size)
+        */
+       if ((ret = rpma_conn_cfg_set_sq_size(cfg, write_num + cd->msg_num))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
+               goto err_cfg_delete;
+       }
+       if ((ret = rpma_conn_cfg_set_rq_size(cfg, cd->msg_num))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
+               goto err_cfg_delete;
+       }
+       if ((ret = rpma_conn_cfg_set_cq_size(cfg, write_num + cd->msg_num * 2))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
+               goto err_cfg_delete;
+       }
+
+       if (librpma_fio_client_init(td, cfg))
+               goto err_cfg_delete;
+
+       ccd = td->io_ops_data;
+
+       if (ccd->ws->direct_write_to_pmem &&
+           ccd->server_mr_flush_type == RPMA_FLUSH_TYPE_PERSISTENT &&
+           td->thread_number == 1) {
+               /* XXX log_info mixes with the JSON output */
+               log_err(
+                       "Note: The server side supports Direct Write to PMem and it is equipped with PMem (direct_write_to_pmem).\n"
+                       "You can use librpma_client and librpma_server engines for better performance instead of GPSPM.\n");
+       }
+
+       /* validate the server's RQ capacity */
+       if (cd->msg_num > ccd->ws->max_msg_num) {
+               log_err(
+                       "server's RQ size (iodepth) too small to handle the client's workspace requirements (%u < %u)\n",
+                       ccd->ws->max_msg_num, cd->msg_num);
+               goto err_cleanup_common;
+       }
+
+       if ((ret = rpma_conn_cfg_delete(&cfg))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_delete");
+               /* non fatal error - continue */
+       }
+
+       ccd->flush = client_io_flush;
+       ccd->get_io_u_index = client_get_io_u_index;
+       ccd->client_data = cd;
+
+       return 0;
+
+err_cleanup_common:
+       librpma_fio_client_cleanup(td);
+
+err_cfg_delete:
+       (void) rpma_conn_cfg_delete(&cfg);
+
+err_free_cd:
+       free(cd);
+
+       return -1;
+}
+
+static int client_post_init(struct thread_data *td)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       struct client_data *cd = ccd->client_data;
+       unsigned int io_us_msgs_size;
+       int ret;
+
+       /* message buffers initialization and registration */
+       io_us_msgs_size = cd->msg_num * IO_U_BUF_LEN;
+       if ((ret = posix_memalign((void **)&cd->io_us_msgs, page_size,
+                       io_us_msgs_size))) {
+               td_verror(td, ret, "posix_memalign");
+               return ret;
+       }
+       if ((ret = rpma_mr_reg(ccd->peer, cd->io_us_msgs, io_us_msgs_size,
+                       RPMA_MR_USAGE_SEND | RPMA_MR_USAGE_RECV,
+                       &cd->msg_mr))) {
+               librpma_td_verror(td, ret, "rpma_mr_reg");
+               return ret;
+       }
+
+       return librpma_fio_client_post_init(td);
+}
+
+static void client_cleanup(struct thread_data *td)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       struct client_data *cd;
+       size_t flush_req_size;
+       size_t io_u_buf_off;
+       size_t send_offset;
+       void *send_ptr;
+       int ret;
+
+       if (ccd == NULL)
+               return;
+
+       cd = ccd->client_data;
+       if (cd == NULL) {
+               librpma_fio_client_cleanup(td);
+               return;
+       }
+
+       /*
+        * Make sure all SEND completions are collected ergo there are free
+        * slots in the SQ for the last SEND message.
+        *
+        * Note: If any operation will fail we still can send the termination
+        * notice.
+        */
+       (void) librpma_fio_client_io_complete_all_sends(td);
+
+       /* prepare the last flush message and pack it to the send buffer */
+       flush_req_size = gpspm_flush_request__get_packed_size(&Flush_req_last);
+       if (flush_req_size > MAX_MSG_SIZE) {
+               log_err(
+                       "Packed flush request size is bigger than available send buffer space (%zu > %d\n",
+                       flush_req_size, MAX_MSG_SIZE);
+       } else {
+               io_u_buf_off = IO_U_NEXT_BUF_OFF_CLIENT(cd);
+               send_offset = io_u_buf_off + SEND_OFFSET;
+               send_ptr = cd->io_us_msgs + send_offset;
+               (void) gpspm_flush_request__pack(&Flush_req_last, send_ptr);
+
+               /* send the flush message */
+               if ((ret = rpma_send(ccd->conn, cd->msg_mr, send_offset,
+                               flush_req_size, RPMA_F_COMPLETION_ALWAYS,
+                               NULL)))
+                       librpma_td_verror(td, ret, "rpma_send");
+
+               ++ccd->op_send_posted;
+
+               /* Wait for the SEND to complete */
+               (void) librpma_fio_client_io_complete_all_sends(td);
+       }
+
+       /* deregister the messaging buffer memory */
+       if ((ret = rpma_mr_dereg(&cd->msg_mr)))
+               librpma_td_verror(td, ret, "rpma_mr_dereg");
+
+       free(ccd->client_data);
+
+       librpma_fio_client_cleanup(td);
+}
+
+static inline int client_io_flush(struct thread_data *td,
+               struct io_u *first_io_u, struct io_u *last_io_u,
+               unsigned long long int len)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       struct client_data *cd = ccd->client_data;
+       size_t io_u_buf_off = IO_U_NEXT_BUF_OFF_CLIENT(cd);
+       size_t send_offset = io_u_buf_off + SEND_OFFSET;
+       size_t recv_offset = io_u_buf_off + RECV_OFFSET;
+       void *send_ptr = cd->io_us_msgs + send_offset;
+       void *recv_ptr = cd->io_us_msgs + recv_offset;
+       GPSPMFlushRequest flush_req = GPSPM_FLUSH_REQUEST__INIT;
+       size_t flush_req_size = 0;
+       int ret;
+
+       /* prepare a response buffer */
+       if ((ret = rpma_recv(ccd->conn, cd->msg_mr, recv_offset, MAX_MSG_SIZE,
+                       recv_ptr))) {
+               librpma_td_verror(td, ret, "rpma_recv");
+               return -1;
+       }
+
+       /* prepare a flush message and pack it to a send buffer */
+       flush_req.offset = first_io_u->offset;
+       flush_req.length = len;
+       flush_req.op_context = last_io_u->index;
+       flush_req_size = gpspm_flush_request__get_packed_size(&flush_req);
+       if (flush_req_size > MAX_MSG_SIZE) {
+               log_err(
+                       "Packed flush request size is bigger than available send buffer space (%"
+                       PRIu64 " > %d\n", flush_req_size, MAX_MSG_SIZE);
+               return -1;
+       }
+       (void) gpspm_flush_request__pack(&flush_req, send_ptr);
+
+       /* send the flush message */
+       if ((ret = rpma_send(ccd->conn, cd->msg_mr, send_offset, flush_req_size,
+                       RPMA_F_COMPLETION_ALWAYS, NULL))) {
+               librpma_td_verror(td, ret, "rpma_send");
+               return -1;
+       }
+
+       ++ccd->op_send_posted;
+
+       return 0;
+}
+
+static int client_get_io_u_index(struct rpma_completion *cmpl,
+               unsigned int *io_u_index)
+{
+       GPSPMFlushResponse *flush_resp;
+
+       if (cmpl->op != RPMA_OP_RECV)
+               return 0;
+
+       /* unpack a response from the received buffer */
+       flush_resp = gpspm_flush_response__unpack(NULL,
+                       cmpl->byte_len, cmpl->op_context);
+       if (flush_resp == NULL) {
+               log_err("Cannot unpack the flush response buffer\n");
+               return -1;
+       }
+
+       memcpy(io_u_index, &flush_resp->op_context, sizeof(*io_u_index));
+
+       gpspm_flush_response__free_unpacked(flush_resp, NULL);
+
+       return 1;
+}
+
+FIO_STATIC struct ioengine_ops ioengine_client = {
+       .name                   = "librpma_gpspm_client",
+       .version                = FIO_IOOPS_VERSION,
+       .init                   = client_init,
+       .post_init              = client_post_init,
+       .get_file_size          = librpma_fio_client_get_file_size,
+       .open_file              = librpma_fio_file_nop,
+       .queue                  = librpma_fio_client_queue,
+       .commit                 = librpma_fio_client_commit,
+       .getevents              = librpma_fio_client_getevents,
+       .event                  = librpma_fio_client_event,
+       .errdetails             = librpma_fio_client_errdetails,
+       .close_file             = librpma_fio_file_nop,
+       .cleanup                = client_cleanup,
+       .flags                  = FIO_DISKLESSIO,
+       .options                = librpma_fio_options,
+       .option_struct_size     = sizeof(struct librpma_fio_options_values),
+};
+
+/* server side implementation */
+
+#define IO_U_BUFF_OFF_SERVER(i) (i * IO_U_BUF_LEN)
+
+struct server_data {
+       /* aligned td->orig_buffer */
+       char *orig_buffer_aligned;
+
+       /* resources for messaging buffer from DRAM allocated by fio */
+       struct rpma_mr_local *msg_mr;
+
+       uint32_t msg_sqe_available; /* # of free SQ slots */
+
+       /* in-memory queues */
+       struct rpma_completion *msgs_queued;
+       uint32_t msg_queued_nr;
+};
+
+static int server_init(struct thread_data *td)
+{
+       struct librpma_fio_server_data *csd;
+       struct server_data *sd;
+       int ret = -1;
+
+       if ((ret = librpma_fio_server_init(td)))
+               return ret;
+
+       csd = td->io_ops_data;
+
+       /* allocate server's data */
+       sd = calloc(1, sizeof(*sd));
+       if (sd == NULL) {
+               td_verror(td, errno, "calloc");
+               goto err_server_cleanup;
+       }
+
+       /* allocate in-memory queue */
+       sd->msgs_queued = calloc(td->o.iodepth, sizeof(*sd->msgs_queued));
+       if (sd->msgs_queued == NULL) {
+               td_verror(td, errno, "calloc");
+               goto err_free_sd;
+       }
+
+       /*
+        * Assure a single io_u buffer can store both SEND and RECV messages and
+        * an io_us buffer allocation is page-size-aligned which is required
+        * to register for RDMA. User-provided values are intentionally ignored.
+        */
+       td->o.max_bs[DDIR_READ] = IO_U_BUF_LEN;
+       td->o.mem_align = page_size;
+
+       csd->server_data = sd;
+
+       return 0;
+
+err_free_sd:
+       free(sd);
+
+err_server_cleanup:
+       librpma_fio_server_cleanup(td);
+
+       return -1;
+}
+
+static int server_post_init(struct thread_data *td)
+{
+       struct librpma_fio_server_data *csd = td->io_ops_data;
+       struct server_data *sd = csd->server_data;
+       size_t io_us_size;
+       size_t io_u_buflen;
+       int ret;
+
+       /*
+        * td->orig_buffer is not aligned. The engine requires aligned io_us
+        * so FIO alignes up the address using the formula below.
+        */
+       sd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
+                       td->o.mem_align;
+
+       /*
+        * XXX
+        * Each io_u message buffer contains recv and send messages.
+        * Aligning each of those buffers may potentially give
+        * some performance benefits.
+        */
+       io_u_buflen = td_max_bs(td);
+
+       /* check whether io_u buffer is big enough */
+       if (io_u_buflen < IO_U_BUF_LEN) {
+               log_err(
+                       "blocksize too small to accommodate assumed maximal request/response pair size (%" PRIu64 " < %d)\n",
+                       io_u_buflen, IO_U_BUF_LEN);
+               return -1;
+       }
+
+       /*
+        * td->orig_buffer_size beside the space really consumed by io_us
+        * has paddings which can be omitted for the memory registration.
+        */
+       io_us_size = (unsigned long long)io_u_buflen *
+                       (unsigned long long)td->o.iodepth;
+
+       if ((ret = rpma_mr_reg(csd->peer, sd->orig_buffer_aligned, io_us_size,
+                       RPMA_MR_USAGE_SEND | RPMA_MR_USAGE_RECV,
+                       &sd->msg_mr))) {
+               librpma_td_verror(td, ret, "rpma_mr_reg");
+               return -1;
+       }
+
+       return 0;
+}
+
+static void server_cleanup(struct thread_data *td)
+{
+       struct librpma_fio_server_data *csd = td->io_ops_data;
+       struct server_data *sd;
+       int ret;
+
+       if (csd == NULL)
+               return;
+
+       sd = csd->server_data;
+
+       if (sd != NULL) {
+               /* rpma_mr_dereg(messaging buffer from DRAM) */
+               if ((ret = rpma_mr_dereg(&sd->msg_mr)))
+                       librpma_td_verror(td, ret, "rpma_mr_dereg");
+
+               free(sd->msgs_queued);
+               free(sd);
+       }
+
+       librpma_fio_server_cleanup(td);
+}
+
+static int prepare_connection(struct thread_data *td,
+               struct rpma_conn_req *conn_req)
+{
+       struct librpma_fio_server_data *csd = td->io_ops_data;
+       struct server_data *sd = csd->server_data;
+       int ret;
+       int i;
+
+       /* prepare buffers for a flush requests */
+       sd->msg_sqe_available = td->o.iodepth;
+       for (i = 0; i < td->o.iodepth; i++) {
+               size_t offset_recv_msg = IO_U_BUFF_OFF_SERVER(i) + RECV_OFFSET;
+               if ((ret = rpma_conn_req_recv(conn_req, sd->msg_mr,
+                               offset_recv_msg, MAX_MSG_SIZE,
+                               (const void *)(uintptr_t)i))) {
+                       librpma_td_verror(td, ret, "rpma_conn_req_recv");
+                       return ret;
+               }
+       }
+
+       return 0;
+}
+
+static int server_open_file(struct thread_data *td, struct fio_file *f)
+{
+       struct librpma_fio_server_data *csd = td->io_ops_data;
+       struct rpma_conn_cfg *cfg = NULL;
+       uint16_t max_msg_num = td->o.iodepth;
+       int ret;
+
+       csd->prepare_connection = prepare_connection;
+
+       /* create a connection configuration object */
+       if ((ret = rpma_conn_cfg_new(&cfg))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_new");
+               return -1;
+       }
+
+       /*
+        * Calculate the required queue sizes where:
+        * - the send queue (SQ) has to be big enough to accommodate
+        *   all possible flush requests (SENDs)
+        * - the receive queue (RQ) has to be big enough to accommodate
+        *   all flush responses (RECVs)
+        * - the completion queue (CQ) has to be big enough to accommodate
+        *   all success and error completions (sq_size + rq_size)
+        */
+       if ((ret = rpma_conn_cfg_set_sq_size(cfg, max_msg_num))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
+               goto err_cfg_delete;
+       }
+       if ((ret = rpma_conn_cfg_set_rq_size(cfg, max_msg_num))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
+               goto err_cfg_delete;
+       }
+       if ((ret = rpma_conn_cfg_set_cq_size(cfg, max_msg_num * 2))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
+               goto err_cfg_delete;
+       }
+
+       ret = librpma_fio_server_open_file(td, f, cfg);
+
+err_cfg_delete:
+       (void) rpma_conn_cfg_delete(&cfg);
+
+       return ret;
+}
+
+static int server_qe_process(struct thread_data *td,
+               struct rpma_completion *cmpl)
+{
+       struct librpma_fio_server_data *csd = td->io_ops_data;
+       struct server_data *sd = csd->server_data;
+       GPSPMFlushRequest *flush_req;
+       GPSPMFlushResponse flush_resp = GPSPM_FLUSH_RESPONSE__INIT;
+       size_t flush_resp_size = 0;
+       size_t send_buff_offset;
+       size_t recv_buff_offset;
+       size_t io_u_buff_offset;
+       void *send_buff_ptr;
+       void *recv_buff_ptr;
+       void *op_ptr;
+       int msg_index;
+       int ret;
+
+       /* calculate SEND/RECV pair parameters */
+       msg_index = (int)(uintptr_t)cmpl->op_context;
+       io_u_buff_offset = IO_U_BUFF_OFF_SERVER(msg_index);
+       send_buff_offset = io_u_buff_offset + SEND_OFFSET;
+       recv_buff_offset = io_u_buff_offset + RECV_OFFSET;
+       send_buff_ptr = sd->orig_buffer_aligned + send_buff_offset;
+       recv_buff_ptr = sd->orig_buffer_aligned + recv_buff_offset;
+
+       /* unpack a flush request from the received buffer */
+       flush_req = gpspm_flush_request__unpack(NULL, cmpl->byte_len,
+                       recv_buff_ptr);
+       if (flush_req == NULL) {
+               log_err("cannot unpack the flush request buffer\n");
+               goto err_terminate;
+       }
+
+       if (IS_NOT_THE_LAST_MESSAGE(flush_req)) {
+               op_ptr = csd->ws_ptr + flush_req->offset;
+               pmem_persist(op_ptr, flush_req->length);
+       } else {
+               /*
+                * This is the last message - the client is done.
+                */
+               gpspm_flush_request__free_unpacked(flush_req, NULL);
+               td->done = true;
+               return 0;
+       }
+
+       /* initiate the next receive operation */
+       if ((ret = rpma_recv(csd->conn, sd->msg_mr, recv_buff_offset,
+                       MAX_MSG_SIZE,
+                       (const void *)(uintptr_t)msg_index))) {
+               librpma_td_verror(td, ret, "rpma_recv");
+               goto err_free_unpacked;
+       }
+
+       /* prepare a flush response and pack it to a send buffer */
+       flush_resp.op_context = flush_req->op_context;
+       flush_resp_size = gpspm_flush_response__get_packed_size(&flush_resp);
+       if (flush_resp_size > MAX_MSG_SIZE) {
+               log_err(
+                       "Size of the packed flush response is bigger than the available space of the send buffer (%"
+                       PRIu64 " > %i\n", flush_resp_size, MAX_MSG_SIZE);
+               goto err_free_unpacked;
+       }
+
+       (void) gpspm_flush_response__pack(&flush_resp, send_buff_ptr);
+
+       /* send the flush response */
+       if ((ret = rpma_send(csd->conn, sd->msg_mr, send_buff_offset,
+                       flush_resp_size, RPMA_F_COMPLETION_ALWAYS, NULL))) {
+               librpma_td_verror(td, ret, "rpma_send");
+               goto err_free_unpacked;
+       }
+       --sd->msg_sqe_available;
+
+       gpspm_flush_request__free_unpacked(flush_req, NULL);
+
+       return 0;
+
+err_free_unpacked:
+       gpspm_flush_request__free_unpacked(flush_req, NULL);
+
+err_terminate:
+       td->terminate = true;
+
+       return -1;
+}
+
+static inline int server_queue_process(struct thread_data *td)
+{
+       struct librpma_fio_server_data *csd = td->io_ops_data;
+       struct server_data *sd = csd->server_data;
+       int ret;
+       int i;
+
+       /* min(# of queue entries, # of SQ entries available) */
+       uint32_t qes_to_process = min(sd->msg_queued_nr, sd->msg_sqe_available);
+       if (qes_to_process == 0)
+               return 0;
+
+       /* process queued completions */
+       for (i = 0; i < qes_to_process; ++i) {
+               if ((ret = server_qe_process(td, &sd->msgs_queued[i])))
+                       return ret;
+       }
+
+       /* progress the queue */
+       for (i = 0; i < sd->msg_queued_nr - qes_to_process; ++i) {
+               memcpy(&sd->msgs_queued[i],
+                       &sd->msgs_queued[qes_to_process + i],
+                       sizeof(sd->msgs_queued[i]));
+       }
+
+       sd->msg_queued_nr -= qes_to_process;
+
+       return 0;
+}
+
+static int server_cmpl_process(struct thread_data *td)
+{
+       struct librpma_fio_server_data *csd = td->io_ops_data;
+       struct server_data *sd = csd->server_data;
+       struct rpma_completion *cmpl = &sd->msgs_queued[sd->msg_queued_nr];
+       int ret;
+
+       ret = rpma_conn_completion_get(csd->conn, cmpl);
+       if (ret == RPMA_E_NO_COMPLETION) {
+               /* lack of completion is not an error */
+               return 0;
+       } else if (ret != 0) {
+               librpma_td_verror(td, ret, "rpma_conn_completion_get");
+               goto err_terminate;
+       }
+
+       /* validate the completion */
+       if (cmpl->op_status != IBV_WC_SUCCESS)
+               goto err_terminate;
+
+       if (cmpl->op == RPMA_OP_RECV)
+               ++sd->msg_queued_nr;
+       else if (cmpl->op == RPMA_OP_SEND)
+               ++sd->msg_sqe_available;
+
+       return 0;
+
+err_terminate:
+       td->terminate = true;
+
+       return -1;
+}
+
+static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
+{
+       do {
+               if (server_cmpl_process(td))
+                       return FIO_Q_BUSY;
+
+               if (server_queue_process(td))
+                       return FIO_Q_BUSY;
+
+       } while (!td->done);
+
+       return FIO_Q_COMPLETED;
+}
+
+FIO_STATIC struct ioengine_ops ioengine_server = {
+       .name                   = "librpma_gpspm_server",
+       .version                = FIO_IOOPS_VERSION,
+       .init                   = server_init,
+       .post_init              = server_post_init,
+       .open_file              = server_open_file,
+       .close_file             = librpma_fio_server_close_file,
+       .queue                  = server_queue,
+       .invalidate             = librpma_fio_file_nop,
+       .cleanup                = server_cleanup,
+       .flags                  = FIO_SYNCIO,
+       .options                = librpma_fio_options,
+       .option_struct_size     = sizeof(struct librpma_fio_options_values),
+};
+
+/* register both engines */
+
+static void fio_init fio_librpma_gpspm_register(void)
+{
+       register_ioengine(&ioengine_client);
+       register_ioengine(&ioengine_server);
+}
+
+static void fio_exit fio_librpma_gpspm_unregister(void)
+{
+       unregister_ioengine(&ioengine_client);
+       unregister_ioengine(&ioengine_server);
+}
diff --git a/engines/librpma_gpspm_flush.pb-c.c b/engines/librpma_gpspm_flush.pb-c.c
new file mode 100644 (file)
index 0000000..3ff2475
--- /dev/null
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2020, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation..
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+/* Generated by the protocol buffer compiler. DO NOT EDIT! */
+/* Generated from: librpma_gpspm_flush.proto */
+
+/* Do not generate deprecated warnings for self */
+#ifndef PROTOBUF_C__NO_DEPRECATED
+#define PROTOBUF_C__NO_DEPRECATED
+#endif
+
+#include "librpma_gpspm_flush.pb-c.h"
+void   gpspm_flush_request__init
+                     (GPSPMFlushRequest         *message)
+{
+  static const GPSPMFlushRequest init_value = GPSPM_FLUSH_REQUEST__INIT;
+  *message = init_value;
+}
+size_t gpspm_flush_request__get_packed_size
+                     (const GPSPMFlushRequest *message)
+{
+  assert(message->base.descriptor == &gpspm_flush_request__descriptor);
+  return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message));
+}
+size_t gpspm_flush_request__pack
+                     (const GPSPMFlushRequest *message,
+                      uint8_t       *out)
+{
+  assert(message->base.descriptor == &gpspm_flush_request__descriptor);
+  return protobuf_c_message_pack ((const ProtobufCMessage*)message, out);
+}
+size_t gpspm_flush_request__pack_to_buffer
+                     (const GPSPMFlushRequest *message,
+                      ProtobufCBuffer *buffer)
+{
+  assert(message->base.descriptor == &gpspm_flush_request__descriptor);
+  return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer);
+}
+GPSPMFlushRequest *
+       gpspm_flush_request__unpack
+                     (ProtobufCAllocator  *allocator,
+                      size_t               len,
+                      const uint8_t       *data)
+{
+  return (GPSPMFlushRequest *)
+     protobuf_c_message_unpack (&gpspm_flush_request__descriptor,
+                                allocator, len, data);
+}
+void   gpspm_flush_request__free_unpacked
+                     (GPSPMFlushRequest *message,
+                      ProtobufCAllocator *allocator)
+{
+  if(!message)
+    return;
+  assert(message->base.descriptor == &gpspm_flush_request__descriptor);
+  protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
+}
+void   gpspm_flush_response__init
+                     (GPSPMFlushResponse         *message)
+{
+  static const GPSPMFlushResponse init_value = GPSPM_FLUSH_RESPONSE__INIT;
+  *message = init_value;
+}
+size_t gpspm_flush_response__get_packed_size
+                     (const GPSPMFlushResponse *message)
+{
+  assert(message->base.descriptor == &gpspm_flush_response__descriptor);
+  return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message));
+}
+size_t gpspm_flush_response__pack
+                     (const GPSPMFlushResponse *message,
+                      uint8_t       *out)
+{
+  assert(message->base.descriptor == &gpspm_flush_response__descriptor);
+  return protobuf_c_message_pack ((const ProtobufCMessage*)message, out);
+}
+size_t gpspm_flush_response__pack_to_buffer
+                     (const GPSPMFlushResponse *message,
+                      ProtobufCBuffer *buffer)
+{
+  assert(message->base.descriptor == &gpspm_flush_response__descriptor);
+  return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer);
+}
+GPSPMFlushResponse *
+       gpspm_flush_response__unpack
+                     (ProtobufCAllocator  *allocator,
+                      size_t               len,
+                      const uint8_t       *data)
+{
+  return (GPSPMFlushResponse *)
+     protobuf_c_message_unpack (&gpspm_flush_response__descriptor,
+                                allocator, len, data);
+}
+void   gpspm_flush_response__free_unpacked
+                     (GPSPMFlushResponse *message,
+                      ProtobufCAllocator *allocator)
+{
+  if(!message)
+    return;
+  assert(message->base.descriptor == &gpspm_flush_response__descriptor);
+  protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
+}
+static const ProtobufCFieldDescriptor gpspm_flush_request__field_descriptors[3] =
+{
+  {
+    "offset",
+    1,
+    PROTOBUF_C_LABEL_REQUIRED,
+    PROTOBUF_C_TYPE_FIXED64,
+    0,   /* quantifier_offset */
+    offsetof(GPSPMFlushRequest, offset),
+    NULL,
+    NULL,
+    0,             /* flags */
+    0,NULL,NULL    /* reserved1,reserved2, etc */
+  },
+  {
+    "length",
+    2,
+    PROTOBUF_C_LABEL_REQUIRED,
+    PROTOBUF_C_TYPE_FIXED64,
+    0,   /* quantifier_offset */
+    offsetof(GPSPMFlushRequest, length),
+    NULL,
+    NULL,
+    0,             /* flags */
+    0,NULL,NULL    /* reserved1,reserved2, etc */
+  },
+  {
+    "op_context",
+    3,
+    PROTOBUF_C_LABEL_REQUIRED,
+    PROTOBUF_C_TYPE_FIXED64,
+    0,   /* quantifier_offset */
+    offsetof(GPSPMFlushRequest, op_context),
+    NULL,
+    NULL,
+    0,             /* flags */
+    0,NULL,NULL    /* reserved1,reserved2, etc */
+  },
+};
+static const unsigned gpspm_flush_request__field_indices_by_name[] = {
+  1,   /* field[1] = length */
+  0,   /* field[0] = offset */
+  2,   /* field[2] = op_context */
+};
+static const ProtobufCIntRange gpspm_flush_request__number_ranges[1 + 1] =
+{
+  { 1, 0 },
+  { 0, 3 }
+};
+const ProtobufCMessageDescriptor gpspm_flush_request__descriptor =
+{
+  PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC,
+  "GPSPM_flush_request",
+  "GPSPMFlushRequest",
+  "GPSPMFlushRequest",
+  "",
+  sizeof(GPSPMFlushRequest),
+  3,
+  gpspm_flush_request__field_descriptors,
+  gpspm_flush_request__field_indices_by_name,
+  1,  gpspm_flush_request__number_ranges,
+  (ProtobufCMessageInit) gpspm_flush_request__init,
+  NULL,NULL,NULL    /* reserved[123] */
+};
+static const ProtobufCFieldDescriptor gpspm_flush_response__field_descriptors[1] =
+{
+  {
+    "op_context",
+    1,
+    PROTOBUF_C_LABEL_REQUIRED,
+    PROTOBUF_C_TYPE_FIXED64,
+    0,   /* quantifier_offset */
+    offsetof(GPSPMFlushResponse, op_context),
+    NULL,
+    NULL,
+    0,             /* flags */
+    0,NULL,NULL    /* reserved1,reserved2, etc */
+  },
+};
+static const unsigned gpspm_flush_response__field_indices_by_name[] = {
+  0,   /* field[0] = op_context */
+};
+static const ProtobufCIntRange gpspm_flush_response__number_ranges[1 + 1] =
+{
+  { 1, 0 },
+  { 0, 1 }
+};
+const ProtobufCMessageDescriptor gpspm_flush_response__descriptor =
+{
+  PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC,
+  "GPSPM_flush_response",
+  "GPSPMFlushResponse",
+  "GPSPMFlushResponse",
+  "",
+  sizeof(GPSPMFlushResponse),
+  1,
+  gpspm_flush_response__field_descriptors,
+  gpspm_flush_response__field_indices_by_name,
+  1,  gpspm_flush_response__number_ranges,
+  (ProtobufCMessageInit) gpspm_flush_response__init,
+  NULL,NULL,NULL    /* reserved[123] */
+};
diff --git a/engines/librpma_gpspm_flush.pb-c.h b/engines/librpma_gpspm_flush.pb-c.h
new file mode 100644 (file)
index 0000000..ad475a9
--- /dev/null
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2020, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation..
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+/* Generated by the protocol buffer compiler. DO NOT EDIT! */
+/* Generated from: librpma_gpspm_flush.proto */
+
+#ifndef PROTOBUF_C_GPSPM_5fflush_2eproto__INCLUDED
+#define PROTOBUF_C_GPSPM_5fflush_2eproto__INCLUDED
+
+#include <protobuf-c/protobuf-c.h>
+
+PROTOBUF_C__BEGIN_DECLS
+
+#if PROTOBUF_C_VERSION_NUMBER < 1000000
+# error This file was generated by a newer version of protoc-c which is incompatible with your libprotobuf-c headers. Please update your headers.
+#elif 1003003 < PROTOBUF_C_MIN_COMPILER_VERSION
+# error This file was generated by an older version of protoc-c which is incompatible with your libprotobuf-c headers. Please regenerate this file with a newer version of protoc-c.
+#endif
+
+
+typedef struct _GPSPMFlushRequest GPSPMFlushRequest;
+typedef struct _GPSPMFlushResponse GPSPMFlushResponse;
+
+
+/* --- enums --- */
+
+
+/* --- messages --- */
+
+struct  _GPSPMFlushRequest
+{
+  ProtobufCMessage base;
+  uint64_t offset;
+  uint64_t length;
+  uint64_t op_context;
+};
+#define GPSPM_FLUSH_REQUEST__INIT \
+ { PROTOBUF_C_MESSAGE_INIT (&gpspm_flush_request__descriptor) \
+    , 0, 0, 0 }
+
+
+struct  _GPSPMFlushResponse
+{
+  ProtobufCMessage base;
+  uint64_t op_context;
+};
+#define GPSPM_FLUSH_RESPONSE__INIT \
+ { PROTOBUF_C_MESSAGE_INIT (&gpspm_flush_response__descriptor) \
+    , 0 }
+
+
+/* GPSPMFlushRequest methods */
+void   gpspm_flush_request__init
+                     (GPSPMFlushRequest         *message);
+size_t gpspm_flush_request__get_packed_size
+                     (const GPSPMFlushRequest   *message);
+size_t gpspm_flush_request__pack
+                     (const GPSPMFlushRequest   *message,
+                      uint8_t             *out);
+size_t gpspm_flush_request__pack_to_buffer
+                     (const GPSPMFlushRequest   *message,
+                      ProtobufCBuffer     *buffer);
+GPSPMFlushRequest *
+       gpspm_flush_request__unpack
+                     (ProtobufCAllocator  *allocator,
+                      size_t               len,
+                      const uint8_t       *data);
+void   gpspm_flush_request__free_unpacked
+                     (GPSPMFlushRequest *message,
+                      ProtobufCAllocator *allocator);
+/* GPSPMFlushResponse methods */
+void   gpspm_flush_response__init
+                     (GPSPMFlushResponse         *message);
+size_t gpspm_flush_response__get_packed_size
+                     (const GPSPMFlushResponse   *message);
+size_t gpspm_flush_response__pack
+                     (const GPSPMFlushResponse   *message,
+                      uint8_t             *out);
+size_t gpspm_flush_response__pack_to_buffer
+                     (const GPSPMFlushResponse   *message,
+                      ProtobufCBuffer     *buffer);
+GPSPMFlushResponse *
+       gpspm_flush_response__unpack
+                     (ProtobufCAllocator  *allocator,
+                      size_t               len,
+                      const uint8_t       *data);
+void   gpspm_flush_response__free_unpacked
+                     (GPSPMFlushResponse *message,
+                      ProtobufCAllocator *allocator);
+/* --- per-message closures --- */
+
+typedef void (*GPSPMFlushRequest_Closure)
+                 (const GPSPMFlushRequest *message,
+                  void *closure_data);
+typedef void (*GPSPMFlushResponse_Closure)
+                 (const GPSPMFlushResponse *message,
+                  void *closure_data);
+
+/* --- services --- */
+
+
+/* --- descriptors --- */
+
+extern const ProtobufCMessageDescriptor gpspm_flush_request__descriptor;
+extern const ProtobufCMessageDescriptor gpspm_flush_response__descriptor;
+
+PROTOBUF_C__END_DECLS
+
+
+#endif  /* PROTOBUF_C_GPSPM_5fflush_2eproto__INCLUDED */
diff --git a/engines/librpma_gpspm_flush.proto b/engines/librpma_gpspm_flush.proto
new file mode 100644 (file)
index 0000000..91765a7
--- /dev/null
@@ -0,0 +1,15 @@
+syntax = "proto2";
+
+message GPSPM_flush_request {
+    /* an offset of a region to be flushed within its memory registration */
+    required fixed64 offset = 1;
+    /* a length of a region to be flushed */
+    required fixed64 length = 2;
+    /* a user-defined operation context */
+    required fixed64 op_context = 3;
+}
+
+message GPSPM_flush_response {
+    /* the operation context of a completed request */
+    required fixed64 op_context = 1;
+}
diff --git a/examples/librpma_apm-client.fio b/examples/librpma_apm-client.fio
new file mode 100644 (file)
index 0000000..82a5d20
--- /dev/null
@@ -0,0 +1,24 @@
+# Example of the librpma_apm_client job
+
+[global]
+ioengine=librpma_apm_client
+create_serialize=0 # (required) forces specific initiation sequence
+serverip=[serverip] #IP address the server is listening on
+port=7204 # port(s) the server will listen on, <port; port + numjobs - 1> will be used
+thread
+
+# The client will get a remote memory region description after establishing
+# a connection.
+
+[client]
+numjobs=1 # number of parallel connections
+group_reporting=1
+sync=1 # 1 is the best for latency measurements, 0 for bandwidth
+iodepth=2 # total number of ious
+iodepth_batch_submit=1 # number of ious to be submitted at once
+rw=write # read/write/randread/randwrite/readwrite/rw
+rwmixread=70 # % of a mixed workload that should be reads
+blocksize=4KiB
+ramp_time=15s # gives some time to stabilize the workload
+time_based
+runtime=60s # run the workload for the specified period of time
diff --git a/examples/librpma_apm-server.fio b/examples/librpma_apm-server.fio
new file mode 100644 (file)
index 0000000..062b521
--- /dev/null
@@ -0,0 +1,26 @@
+# Example of the librpma_apm_server job
+
+[global]
+ioengine=librpma_apm_server
+create_serialize=0 # (required) forces specific initiation sequence
+kb_base=1000 # turn on the straight units handling (non-compatibility mode)
+serverip=[serverip] # IP address to listen on
+port=7204 # port(s) the server jobs will listen on, ports <port; port + numjobs - 1> will be used
+thread
+
+# The server side spawns one thread for each expected connection from
+# the client-side, opens and registers the range dedicated for this thread
+# (a workspace) from the provided memory.
+# Each of the server threads accepts a connection on the dedicated port
+# (different for each and every working thread) and waits for it to end up,
+# and closes itself.
+
+[server]
+# set to 1 (true) ONLY when Direct Write to PMem from the remote host is possible
+# (https://pmem.io/rpma/documentation/basic-direct-write-to-pmem.html)
+direct_write_to_pmem=0
+
+numjobs=1 # number of expected incomming connections
+size=100MiB # size of workspace for a single connection
+filename=malloc # device dax or an existing fsdax file or "malloc" for allocation from DRAM
+# filename=/dev/dax1.0
diff --git a/examples/librpma_gpspm-client.fio b/examples/librpma_gpspm-client.fio
new file mode 100644 (file)
index 0000000..843382d
--- /dev/null
@@ -0,0 +1,23 @@
+# Example of the librpma_gpspm_client job
+
+[global]
+ioengine=librpma_gpspm_client
+create_serialize=0 # (required) forces specific initiation sequence
+serverip=[serverip] #IP address the server is listening on
+port=7204 # port(s) the server will listen on, <port; port + numjobs - 1> will be used
+thread
+
+# The client will get a remote memory region description after establishing
+# a connection.
+
+[client]
+numjobs=1 # number of parallel connections
+group_reporting=1
+sync=1 # 1 is the best for latency measurements, 0 for bandwidth
+iodepth=2 # total number of ious
+iodepth_batch_submit=1 # number of ious to be submitted at once
+rw=write # write/randwrite
+blocksize=4KiB
+ramp_time=15s # gives some time to stabilize the workload
+time_based
+runtime=60s # run the workload for the specified period of time
diff --git a/examples/librpma_gpspm-server.fio b/examples/librpma_gpspm-server.fio
new file mode 100644 (file)
index 0000000..d618f2d
--- /dev/null
@@ -0,0 +1,31 @@
+# Example of the librpma_gpspm_server job
+
+[global]
+ioengine=librpma_gpspm_server
+create_serialize=0 # (required) forces specific initiation sequence
+kb_base=1000 # turn on the straight units handling (non-compatibility mode)
+serverip=[serverip] #IP address to listen on
+port=7204 # port(s) the server jobs will listen on, ports <port; port + numjobs - 1> will be used
+thread
+
+# The server side spawns one thread for each expected connection from
+# the client-side, opens and registers the range dedicated for this thread
+# (a workspace) from the provided memory.
+# Each of the server threads accepts a connection on the dedicated port
+# (different for each and every working thread), accepts and executes flush
+# requests, and sends back a flush response for each of the requests.
+# When the client is done it sends the termination notice to the server's thread.
+
+[server]
+# set to 1 (true) ONLY when Direct Write to PMem from the remote host is possible
+# (https://pmem.io/rpma/documentation/basic-direct-write-to-pmem.html)
+direct_write_to_pmem=0
+numjobs=1 # number of expected incomming connections
+iodepth=2 # number of parallel GPSPM requests
+size=100MiB # size of workspace for a single connection
+filename=malloc # device dax or an existing fsdax file or "malloc" for allocation from DRAM
+# filename=/dev/dax1.0
+
+# The client will terminate the server when the client will end up its job.
+time_based
+runtime=365d
diff --git a/fio.1 b/fio.1
index accc6a329a12fa98b65cd7fcd443b06faaef8fcf..e190c241836b743f4875e104d4ae659890e523c5 100644 (file)
--- a/fio.1
+++ b/fio.1
@@ -1949,7 +1949,7 @@ The TCP or UDP port to bind to or connect to. If this is used with
 this will be the starting port number since fio will use a range of
 ports.
 .TP
-.BI (rdma)port
+.BI (rdma, librpma_*)port
 The port to use for RDMA-CM communication. This should be the same
 value on the client and the server side.
 .TP
@@ -1958,6 +1958,12 @@ The hostname or IP address to use for TCP, UDP or RDMA-CM based I/O.
 If the job is a TCP listener or UDP reader, the hostname is not used
 and must be omitted unless it is a valid UDP multicast address.
 .TP
+.BI (librpma_*)serverip \fR=\fPstr
+The IP address to be used for RDMA-CM based I/O.
+.TP
+.BI (librpma_*_server)direct_write_to_pmem \fR=\fPbool
+Set to 1 only when Direct Write to PMem from the remote host is possible. Otherwise, set to 0.
+.TP
 .BI (netsplice,net)interface \fR=\fPstr
 The IP address of the network interface used to send or receive UDP
 multicast.
index 647748963193db4db016b2befde7fa2ec4f3a758..5b6d22a3145430b0949a2cab69fff23949c36a28 100644 (file)
@@ -141,6 +141,10 @@ static const struct opt_group fio_opt_cat_groups[] = {
                .name   = "RDMA I/O engine", /* rdma */
                .mask   = FIO_OPT_G_RDMA,
        },
+       {
+               .name   = "librpma I/O engines", /* librpma_apm && librpma_gpspm */
+               .mask   = FIO_OPT_G_LIBRPMA,
+       },
        {
                .name   = "libaio I/O engine", /* libaio */
                .mask   = FIO_OPT_G_LIBAIO,
index d2f1ceb391c34fd6101cf3e4b6a867b078b178e6..641547d271f6e632cbc45d558afaf58c8cd25b67 100644 (file)
@@ -52,6 +52,7 @@ enum opt_category_group {
        __FIO_OPT_G_E4DEFRAG,
        __FIO_OPT_G_NETIO,
        __FIO_OPT_G_RDMA,
+       __FIO_OPT_G_LIBRPMA,
        __FIO_OPT_G_LIBAIO,
        __FIO_OPT_G_ACT,
        __FIO_OPT_G_LATPROF,
@@ -94,6 +95,7 @@ enum opt_category_group {
        FIO_OPT_G_E4DEFRAG      = (1ULL << __FIO_OPT_G_E4DEFRAG),
        FIO_OPT_G_NETIO         = (1ULL << __FIO_OPT_G_NETIO),
        FIO_OPT_G_RDMA          = (1ULL << __FIO_OPT_G_RDMA),
+       FIO_OPT_G_LIBRPMA       = (1ULL << __FIO_OPT_G_LIBRPMA),
        FIO_OPT_G_LIBAIO        = (1ULL << __FIO_OPT_G_LIBAIO),
        FIO_OPT_G_ACT           = (1ULL << __FIO_OPT_G_ACT),
        FIO_OPT_G_LATPROF       = (1ULL << __FIO_OPT_G_LATPROF),
index e62e0cfb35413a3f59b8c8d3d141a40d6c44f8b7..625112c5345dbb219fb06cdd0aa1e59e9ee92a6c 100644 (file)
--- a/options.c
+++ b/options.c
@@ -1913,6 +1913,16 @@ struct fio_option fio_options[FIO_MAX_OPTS] = {
                            .help = "RDMA IO engine",
                          },
 #endif
+#ifdef CONFIG_LIBRPMA_APM
+                         { .ival = "librpma_apm",
+                           .help = "librpma IO engine in APM mode",
+                         },
+#endif
+#ifdef CONFIG_LIBRPMA_GPSPM
+                         { .ival = "librpma_gpspm",
+                           .help = "librpma IO engine in GPSPM mode",
+                         },
+#endif
 #ifdef CONFIG_LINUX_EXT4_MOVE_EXTENT
                          { .ival = "e4defrag",
                            .help = "ext4 defrag engine",