summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- HOWTO 11
-rw-r--r-- Makefile 15
-rwxr-xr-x ci/travis-install-librpma.sh 22
-rwxr-xr-x ci/travis-install-pmdk.sh 28
-rwxr-xr-x ci/travis-install.sh 10
-rwxr-xr-x configure 52
-rw-r--r-- engines/librpma_apm.c 256
-rw-r--r-- engines/librpma_fio.c 1051
-rw-r--r-- engines/librpma_fio.h 273
-rw-r--r-- engines/librpma_gpspm.c 755
-rw-r--r-- engines/librpma_gpspm_flush.pb-c.c 214
-rw-r--r-- engines/librpma_gpspm_flush.pb-c.h 120
-rw-r--r-- engines/librpma_gpspm_flush.proto 15
-rw-r--r-- examples/librpma_apm-client.fio 24
-rw-r--r-- examples/librpma_apm-server.fio 26
-rw-r--r-- examples/librpma_gpspm-client.fio 23
-rw-r--r-- examples/librpma_gpspm-server.fio 31
-rw-r--r-- fio.1 8
-rw-r--r-- optgroup.c 4
-rw-r--r-- optgroup.h 2
-rw-r--r-- options.c 10
21 files changed, 2948 insertions, 2 deletions
diff --git a/HOWTO b/HOWTO
index 52812cc7..39f8c63d 100644
--- a/HOWTO
+++ b/HOWTO
@@ -2189,7 +2189,7 @@ with the caveat that when used on the command line, they must come after the
this will be the starting port number since fio will use a range of
ports.
- [rdma]
+ [rdma], [librpma_*]
The port to use for RDMA-CM communication. This should be the same value
on the client and the server side.
@@ -2200,6 +2200,15 @@ with the caveat that when used on the command line, they must come after the
is a TCP listener or UDP reader, the hostname is not used and must be omitted
unless it is a valid UDP multicast address.
+.. option:: serverip=str : [librpma_*]
+
+ The IP address to be used for RDMA-CM based I/O.
+
+.. option:: direct_write_to_pmem=bool : [librpma_*]
+
+ Set to 1 only when Direct Write to PMem from the remote host is possible.
+ Otherwise, set to 0.
+
.. option:: interface=str : [netsplice] [net]
The IP address of the network interface used to send or receive UDP
diff --git a/Makefile b/Makefile
index 612344d1..1aa9f377 100644
--- a/Makefile
+++ b/Makefile
@@ -94,6 +94,21 @@ ifdef CONFIG_RDMA
rdma_LIBS = -libverbs -lrdmacm
ENGINES += rdma
endif
+ifdef CONFIG_LIBRPMA_APM
+ librpma_apm_SRCS = engines/librpma_apm.c
+ librpma_fio_SRCS = engines/librpma_fio.c
+ librpma_apm_LIBS = -lrpma -lpmem
+ ENGINES += librpma_apm
+endif
+ifdef CONFIG_LIBRPMA_GPSPM
+ librpma_gpspm_SRCS = engines/librpma_gpspm.c engines/librpma_gpspm_flush.pb-c.c
+ librpma_fio_SRCS = engines/librpma_fio.c
+ librpma_gpspm_LIBS = -lrpma -lpmem -lprotobuf-c
+ ENGINES += librpma_gpspm
+endif
+ifdef librpma_fio_SRCS
+ SOURCE += $(librpma_fio_SRCS)
+endif
ifdef CONFIG_POSIXAIO
SOURCE += engines/posixaio.c
endif
diff --git a/ci/travis-install-librpma.sh b/ci/travis-install-librpma.sh
new file mode 100755
index 00000000..b127f3f5
--- /dev/null
+++ b/ci/travis-install-librpma.sh
@@ -0,0 +1,22 @@
+#!/bin/bash -e
+
+# 11.02.2021 Merge pull request #866 from ldorau/rpma-mmap-memory-for-rpma_mr_reg-in-rpma_flush_apm_new
+LIBRPMA_VERSION=fbac593917e98f3f26abf14f4fad5a832b330f5c
+ZIP_FILE=rpma.zip
+
+WORKDIR=$(pwd)
+
+# install librpma
+wget -O $ZIP_FILE https://github.com/pmem/rpma/archive/${LIBRPMA_VERSION}.zip
+unzip $ZIP_FILE
+mkdir -p rpma-${LIBRPMA_VERSION}/build
+cd rpma-${LIBRPMA_VERSION}/build
+cmake .. -DCMAKE_BUILD_TYPE=Release \
+ -DCMAKE_INSTALL_PREFIX=/usr \
+ -DBUILD_DOC=OFF \
+ -DBUILD_EXAMPLES=OFF \
+ -DBUILD_TESTS=OFF
+make -j$(nproc)
+sudo make -j$(nproc) install
+cd $WORKDIR
+rm -rf $ZIP_FILE rpma-${LIBRPMA_VERSION}
diff --git a/ci/travis-install-pmdk.sh b/ci/travis-install-pmdk.sh
new file mode 100755
index 00000000..803438f8
--- /dev/null
+++ b/ci/travis-install-pmdk.sh
@@ -0,0 +1,28 @@
+#!/bin/bash -e
+
+# pmdk v1.9.1 release
+PMDK_VERSION=1.9.1
+
+WORKDIR=$(pwd)
+
+#
+# The '/bin/sh' shell used by PMDK's 'make install'
+# does not know the exact location of clang
+# and fails with:
+# /bin/sh: 1: clang: not found
+# if CC is not set to the full path of clang.
+#
+export CC=$(which $CC)
+
+# Install PMDK libraries, because PMDK's libpmem
+# is a dependency of the librpma fio engine.
+# Install it from a release package
+# with already generated documentation,
+# in order to not install 'pandoc'.
+wget https://github.com/pmem/pmdk/releases/download/${PMDK_VERSION}/pmdk-${PMDK_VERSION}.tar.gz
+tar -xzf pmdk-${PMDK_VERSION}.tar.gz
+cd pmdk-${PMDK_VERSION}
+make -j$(nproc) NDCTL_ENABLE=n
+sudo make -j$(nproc) install prefix=/usr NDCTL_ENABLE=n
+cd $WORKDIR
+rm -rf pmdk-${PMDK_VERSION}
diff --git a/ci/travis-install.sh b/ci/travis-install.sh
index 103695dc..4c4c04c5 100755
--- a/ci/travis-install.sh
+++ b/ci/travis-install.sh
@@ -43,6 +43,16 @@ case "$TRAVIS_OS_NAME" in
)
sudo apt-get -qq update
sudo apt-get install --no-install-recommends -qq -y "${pkgs[@]}"
+ # librpma is supported on the amd64 (x86_64) architecture for now
+ if [[ $CI_TARGET_ARCH == "amd64" ]]; then
+ # install libprotobuf-c-dev required by librpma_gpspm
+ sudo apt-get install --no-install-recommends -qq -y libprotobuf-c-dev
+ # PMDK libraries have to be installed, because
+ # libpmem is a dependency of the librpma fio engine
+ ci/travis-install-pmdk.sh
+ # install librpma from sources from GitHub
+ ci/travis-install-librpma.sh
+ fi
;;
"osx")
brew update >/dev/null 2>&1
diff --git a/configure b/configure
index 748f7014..1bbdb8c4 100755
--- a/configure
+++ b/configure
@@ -921,6 +921,49 @@ fi
print_config "rdmacm" "$rdmacm"
##########################################
+# librpma probe
+if test "$librpma" != "yes" ; then
+ librpma="no"
+fi
+cat > $TMPC << EOF
+#include <stdio.h>
+#include <librpma.h>
+int main(int argc, char **argv)
+{
+ enum rpma_conn_event event = RPMA_CONN_REJECTED;
+ (void) event; /* unused */
+ rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
+ return 0;
+}
+EOF
+if test "$disable_rdma" != "yes" && compile_prog "" "-lrpma" "rpma"; then
+ librpma="yes"
+fi
+print_config "librpma" "$librpma"
+
+##########################################
+# libprotobuf-c probe
+if test "$libprotobuf_c" != "yes" ; then
+ libprotobuf_c="no"
+fi
+cat > $TMPC << EOF
+#include <stdio.h>
+#include <protobuf-c/protobuf-c.h>
+#if !defined(PROTOBUF_C_VERSION_NUMBER)
+# error PROTOBUF_C_VERSION_NUMBER is not defined!
+#endif
+int main(int argc, char **argv)
+{
+ (void)protobuf_c_message_check(NULL);
+ return 0;
+}
+EOF
+if compile_prog "" "-lprotobuf-c" "protobuf_c"; then
+ libprotobuf_c="yes"
+fi
+print_config "libprotobuf_c" "$libprotobuf_c"
+
+##########################################
# asprintf() and vasprintf() probes
if test "$have_asprintf" != "yes" ; then
have_asprintf="no"
@@ -2788,6 +2831,15 @@ fi
if test "$libverbs" = "yes" -a "$rdmacm" = "yes" ; then
output_sym "CONFIG_RDMA"
fi
+# librpma is supported on the 'x86_64' architecture for now
+if test "$cpu" = "x86_64" -a "$libverbs" = "yes" -a "$rdmacm" = "yes" \
+ -a "$librpma" = "yes" -a "$libpmem" = "yes" ; then
+ output_sym "CONFIG_LIBRPMA_APM"
+fi
+if test "$cpu" = "x86_64" -a "$libverbs" = "yes" -a "$rdmacm" = "yes" \
+ -a "$librpma" = "yes" -a "$libpmem" = "yes" -a "$libprotobuf_c" = "yes" ; then
+ output_sym "CONFIG_LIBRPMA_GPSPM"
+fi
if test "$clock_gettime" = "yes" ; then
output_sym "CONFIG_CLOCK_GETTIME"
fi
diff --git a/engines/librpma_apm.c b/engines/librpma_apm.c
new file mode 100644
index 00000000..ffa3769d
--- /dev/null
+++ b/engines/librpma_apm.c
@@ -0,0 +1,256 @@
+/*
+ * librpma_apm: IO engine that uses PMDK librpma to read and write data,
+ * based on Appliance Persistency Method
+ *
+ * Copyright 2020-2021, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#include "librpma_fio.h"
+
+/* client side implementation */
+
+static inline int client_io_flush(struct thread_data *td,
+ struct io_u *first_io_u, struct io_u *last_io_u,
+ unsigned long long int len);
+
+static int client_get_io_u_index(struct rpma_completion *cmpl,
+ unsigned int *io_u_index);
+
+static int client_init(struct thread_data *td)
+{
+ struct librpma_fio_client_data *ccd;
+ unsigned int sq_size;
+ uint32_t cq_size;
+ struct rpma_conn_cfg *cfg = NULL;
+ struct rpma_peer_cfg *pcfg = NULL;
+ int ret;
+
+ /* not supported readwrite = trim / randtrim / trimwrite */
+ if (td_trim(td)) {
+ td_verror(td, EINVAL, "Not supported mode.");
+ return -1;
+ }
+
+ /*
+ * Calculate the required queue sizes where:
+ * - the send queue (SQ) has to be big enough to accommodate
+ * all io_us (WRITEs) and all flush requests (FLUSHes)
+ * - the completion queue (CQ) has to be big enough to accommodate all
+ * success and error completions (cq_size = sq_size)
+ */
+ if (td_random(td) || td_rw(td)) {
+ /*
+ * sq_size = max(rand_read_sq_size, rand_write_sq_size)
+ * where rand_read_sq_size < rand_write_sq_size because read
+ * does not require flush afterwards
+ * rand_write_sq_size = N * (WRITE + FLUSH)
+ *
+ * Note: rw is no different from random write since having
+ * interleaved reads with writes in extreme forces you to flush
+ * as often as when the writes are random.
+ */
+ sq_size = 2 * td->o.iodepth;
+ } else if (td_write(td)) {
+ /* sequential TD_DDIR_WRITE only */
+ if (td->o.sync_io) {
+ sq_size = 2; /* WRITE + FLUSH */
+ } else {
+ /*
+ * N * WRITE + B * FLUSH where:
+ * - B == ceil(iodepth / iodepth_batch)
+ * which is the number of batches for N writes
+ */
+ sq_size = td->o.iodepth + LIBRPMA_FIO_CEIL(td->o.iodepth,
+ td->o.iodepth_batch);
+ }
+ } else {
+ /* TD_DDIR_READ only */
+ if (td->o.sync_io) {
+ sq_size = 1; /* READ */
+ } else {
+ sq_size = td->o.iodepth; /* N x READ */
+ }
+ }
+ cq_size = sq_size;
+
+ /* create a connection configuration object */
+ if ((ret = rpma_conn_cfg_new(&cfg))) {
+ librpma_td_verror(td, ret, "rpma_conn_cfg_new");
+ return -1;
+ }
+
+ /* apply queue sizes */
+ if ((ret = rpma_conn_cfg_set_sq_size(cfg, sq_size))) {
+ librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
+ goto err_cfg_delete;
+ }
+ if ((ret = rpma_conn_cfg_set_cq_size(cfg, cq_size))) {
+ librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
+ goto err_cfg_delete;
+ }
+
+ if (librpma_fio_client_init(td, cfg))
+ goto err_cfg_delete;
+
+ ccd = td->io_ops_data;
+
+ if (ccd->server_mr_flush_type == RPMA_FLUSH_TYPE_PERSISTENT) {
+ if (!ccd->ws->direct_write_to_pmem) {
+ if (td->thread_number == 1)
+ log_err(
+ "Fio librpma engine will not work until the Direct Write to PMem on the server side is possible (direct_write_to_pmem)\n");
+ goto err_cleanup_common;
+ }
+
+ /* configure peer's direct write to pmem support */
+ if ((ret = rpma_peer_cfg_new(&pcfg))) {
+ librpma_td_verror(td, ret, "rpma_peer_cfg_new");
+ goto err_cleanup_common;
+ }
+
+ if ((ret = rpma_peer_cfg_set_direct_write_to_pmem(pcfg, true))) {
+ librpma_td_verror(td, ret,
+ "rpma_peer_cfg_set_direct_write_to_pmem");
+ (void) rpma_peer_cfg_delete(&pcfg);
+ goto err_cleanup_common;
+ }
+
+ if ((ret = rpma_conn_apply_remote_peer_cfg(ccd->conn, pcfg))) {
+ librpma_td_verror(td, ret,
+ "rpma_conn_apply_remote_peer_cfg");
+ (void) rpma_peer_cfg_delete(&pcfg);
+ goto err_cleanup_common;
+ }
+
+ (void) rpma_peer_cfg_delete(&pcfg);
+ } else if (td->thread_number == 1) {
+ /* XXX log_info mixes with the JSON output */
+ log_err(
+ "Note: Direct Write to PMem is not supported by default nor required if you use DRAM instead of PMem on the server side (direct_write_to_pmem).\n"
+ "Remember that flushing to DRAM does not make your data persistent and may be used only for experimental purposes.\n");
+ }
+
+ if ((ret = rpma_conn_cfg_delete(&cfg))) {
+ librpma_td_verror(td, ret, "rpma_conn_cfg_delete");
+ /* non fatal error - continue */
+ }
+
+ ccd->flush = client_io_flush;
+ ccd->get_io_u_index = client_get_io_u_index;
+
+ return 0;
+
+err_cleanup_common:
+ librpma_fio_client_cleanup(td);
+
+err_cfg_delete:
+ (void) rpma_conn_cfg_delete(&cfg);
+
+ return -1;
+}
+
+static void client_cleanup(struct thread_data *td)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+
+ if (ccd == NULL)
+ return;
+
+ free(ccd->client_data);
+
+ librpma_fio_client_cleanup(td);
+}
+
+static inline int client_io_flush(struct thread_data *td,
+ struct io_u *first_io_u, struct io_u *last_io_u,
+ unsigned long long int len)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+ size_t dst_offset = first_io_u->offset;
+ int ret;
+
+ if ((ret = rpma_flush(ccd->conn, ccd->server_mr, dst_offset, len,
+ ccd->server_mr_flush_type, RPMA_F_COMPLETION_ALWAYS,
+ (void *)(uintptr_t)last_io_u->index))) {
+ librpma_td_verror(td, ret, "rpma_flush");
+ return -1;
+ }
+
+ return 0;
+}
+
+static int client_get_io_u_index(struct rpma_completion *cmpl,
+ unsigned int *io_u_index)
+{
+ memcpy(io_u_index, &cmpl->op_context, sizeof(*io_u_index));
+
+ return 1;
+}
+
+FIO_STATIC struct ioengine_ops ioengine_client = {
+ .name = "librpma_apm_client",
+ .version = FIO_IOOPS_VERSION,
+ .init = client_init,
+ .post_init = librpma_fio_client_post_init,
+ .get_file_size = librpma_fio_client_get_file_size,
+ .open_file = librpma_fio_file_nop,
+ .queue = librpma_fio_client_queue,
+ .commit = librpma_fio_client_commit,
+ .getevents = librpma_fio_client_getevents,
+ .event = librpma_fio_client_event,
+ .errdetails = librpma_fio_client_errdetails,
+ .close_file = librpma_fio_file_nop,
+ .cleanup = client_cleanup,
+ .flags = FIO_DISKLESSIO,
+ .options = librpma_fio_options,
+ .option_struct_size = sizeof(struct librpma_fio_options_values),
+};
+
+/* server side implementation */
+
+static int server_open_file(struct thread_data *td, struct fio_file *f)
+{
+ return librpma_fio_server_open_file(td, f, NULL);
+}
+
+static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
+{
+ return FIO_Q_COMPLETED;
+}
+
+FIO_STATIC struct ioengine_ops ioengine_server = {
+ .name = "librpma_apm_server",
+ .version = FIO_IOOPS_VERSION,
+ .init = librpma_fio_server_init,
+ .open_file = server_open_file,
+ .close_file = librpma_fio_server_close_file,
+ .queue = server_queue,
+ .invalidate = librpma_fio_file_nop,
+ .cleanup = librpma_fio_server_cleanup,
+ .flags = FIO_SYNCIO,
+ .options = librpma_fio_options,
+ .option_struct_size = sizeof(struct librpma_fio_options_values),
+};
+
+/* register both engines */
+
+static void fio_init fio_librpma_apm_register(void)
+{
+ register_ioengine(&ioengine_client);
+ register_ioengine(&ioengine_server);
+}
+
+static void fio_exit fio_librpma_apm_unregister(void)
+{
+ unregister_ioengine(&ioengine_client);
+ unregister_ioengine(&ioengine_server);
+}
diff --git a/engines/librpma_fio.c b/engines/librpma_fio.c
new file mode 100644
index 00000000..810b55e2
--- /dev/null
+++ b/engines/librpma_fio.c
@@ -0,0 +1,1051 @@
+/*
+ * librpma_fio: librpma_apm and librpma_gpspm engines' common part.
+ *
+ * Copyright 2021, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#include "librpma_fio.h"
+
+#include <libpmem.h>
+
+struct fio_option librpma_fio_options[] = {
+ {
+ .name = "serverip",
+ .lname = "rpma_server_ip",
+ .type = FIO_OPT_STR_STORE,
+ .off1 = offsetof(struct librpma_fio_options_values, server_ip),
+ .help = "IP address the server is listening on",
+ .def = "",
+ .category = FIO_OPT_C_ENGINE,
+ .group = FIO_OPT_G_LIBRPMA,
+ },
+ {
+ .name = "port",
+ .lname = "rpma_server port",
+ .type = FIO_OPT_STR_STORE,
+ .off1 = offsetof(struct librpma_fio_options_values, port),
+ .help = "port the server is listening on",
+ .def = "7204",
+ .category = FIO_OPT_C_ENGINE,
+ .group = FIO_OPT_G_LIBRPMA,
+ },
+ {
+ .name = "direct_write_to_pmem",
+ .lname = "Direct Write to PMem (via RDMA) from the remote host is possible",
+ .type = FIO_OPT_BOOL,
+ .off1 = offsetof(struct librpma_fio_options_values,
+ direct_write_to_pmem),
+ .help = "Set to true ONLY when Direct Write to PMem from the remote host is possible (https://pmem.io/rpma/documentation/basic-direct-write-to-pmem.html)",
+ .def = "",
+ .category = FIO_OPT_C_ENGINE,
+ .group = FIO_OPT_G_LIBRPMA,
+ },
+ {
+ .name = NULL,
+ },
+};
+
+int librpma_fio_td_port(const char *port_base_str, struct thread_data *td,
+ char *port_out)
+{
+ unsigned long int port_ul = strtoul(port_base_str, NULL, 10);
+ unsigned int port_new;
+
+ port_out[0] = '\0';
+
+ if (port_ul == ULONG_MAX) {
+ td_verror(td, errno, "strtoul");
+ return -1;
+ }
+ port_ul += td->thread_number - 1;
+ if (port_ul >= UINT_MAX) {
+ log_err("[%u] port number (%lu) bigger than UINT_MAX\n",
+ td->thread_number, port_ul);
+ return -1;
+ }
+
+ port_new = port_ul;
+ snprintf(port_out, LIBRPMA_FIO_PORT_STR_LEN_MAX - 1, "%u", port_new);
+
+ return 0;
+}
+
+char *librpma_fio_allocate_dram(struct thread_data *td, size_t size,
+ struct librpma_fio_mem *mem)
+{
+ char *mem_ptr = NULL;
+ int ret;
+
+ if ((ret = posix_memalign((void **)&mem_ptr, page_size, size))) {
+ log_err("fio: posix_memalign() failed\n");
+ td_verror(td, ret, "posix_memalign");
+ return NULL;
+ }
+
+ mem->mem_ptr = mem_ptr;
+ mem->size_mmap = 0;
+
+ return mem_ptr;
+}
+
+char *librpma_fio_allocate_pmem(struct thread_data *td, const char *filename,
+ size_t size, struct librpma_fio_mem *mem)
+{
+ size_t size_mmap = 0;
+ char *mem_ptr = NULL;
+ int is_pmem = 0;
+ size_t ws_offset;
+
+ if (size % page_size) {
+ log_err("fio: size (%zu) is not aligned to page size (%zu)\n",
+ size, page_size);
+ return NULL;
+ }
+
+ ws_offset = (td->thread_number - 1) * size;
+
+ if (!filename) {
+ log_err("fio: filename is not set\n");
+ return NULL;
+ }
+
+ /* map the file */
+ mem_ptr = pmem_map_file(filename, 0 /* len */, 0 /* flags */,
+ 0 /* mode */, &size_mmap, &is_pmem);
+ if (mem_ptr == NULL) {
+ log_err("fio: pmem_map_file(%s) failed\n", filename);
+ /* pmem_map_file() sets errno on failure */
+ td_verror(td, errno, "pmem_map_file");
+ return NULL;
+ }
+
+ /* pmem is expected */
+ if (!is_pmem) {
+ log_err("fio: %s is not located in persistent memory\n",
+ filename);
+ goto err_unmap;
+ }
+
+ /* check size of allocated persistent memory */
+ if (size_mmap < ws_offset + size) {
+ log_err(
+ "fio: %s is too small to handle so many threads (%zu < %zu)\n",
+ filename, size_mmap, ws_offset + size);
+ goto err_unmap;
+ }
+
+ log_info("fio: size of memory mapped from the file %s: %zu\n",
+ filename, size_mmap);
+
+ mem->mem_ptr = mem_ptr;
+ mem->size_mmap = size_mmap;
+
+ return mem_ptr + ws_offset;
+
+err_unmap:
+ (void) pmem_unmap(mem_ptr, size_mmap);
+ return NULL;
+}
+
+void librpma_fio_free(struct librpma_fio_mem *mem)
+{
+ if (mem->size_mmap)
+ (void) pmem_unmap(mem->mem_ptr, mem->size_mmap);
+ else
+ free(mem->mem_ptr);
+}
+
+#define LIBRPMA_FIO_RETRY_MAX_NO 10
+#define LIBRPMA_FIO_RETRY_DELAY_S 5
+
+/*
+ * librpma_fio_client_init -- client-side initialization shared by the
+ * librpma engines
+ *
+ * Configures the librpma logging thresholds, allocates the client's data
+ * together with its queued / flight / completed io_u arrays, creates the peer
+ * and connects to the server at o->server_ip using the per-thread port
+ * derived from o->port. A rejected connection is retried up to
+ * LIBRPMA_FIO_RETRY_MAX_NO times with LIBRPMA_FIO_RETRY_DELAY_S seconds of
+ * delay between the attempts. Once connected, the server's workspace
+ * description is read from the connection's private data and turned into
+ * a remote memory region whose size and flush type are cached.
+ *
+ * td  - the thread to initialize; td->eo holds the engine's options
+ * cfg - the connection configuration applied to each connection request
+ *
+ * RETURN VALUE
+ * - 0    - success; td->io_ops_data points to the new client's data
+ * - (-1) - failure; all resources acquired by this function are released
+ */
+int librpma_fio_client_init(struct thread_data *td,
+		struct rpma_conn_cfg *cfg)
+{
+	struct librpma_fio_client_data *ccd;
+	struct librpma_fio_options_values *o = td->eo;
+	struct ibv_context *dev = NULL;
+	char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
+	struct rpma_conn_req *req = NULL;
+	enum rpma_conn_event event;
+	struct rpma_conn_private_data pdata;
+	enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
+	int remote_flush_type;
+	int retry;
+	int ret;
+
+	/* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
+#ifdef FIO_INC_DEBUG
+	if ((1UL << FD_NET) & fio_debug)
+		log_level_aux = RPMA_LOG_LEVEL_INFO;
+#endif
+
+	/* configure logging thresholds to see more details */
+	rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
+	rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);
+
+	/* obtain an IBV context for a remote IP address */
+	if ((ret = rpma_utils_get_ibv_context(o->server_ip,
+			RPMA_UTIL_IBV_CONTEXT_REMOTE, &dev))) {
+		librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
+		return -1;
+	}
+
+	/* allocate client's data */
+	ccd = calloc(1, sizeof(*ccd));
+	if (ccd == NULL) {
+		td_verror(td, errno, "calloc");
+		return -1;
+	}
+
+	/* allocate all in-memory queues */
+	ccd->io_us_queued = calloc(td->o.iodepth, sizeof(*ccd->io_us_queued));
+	if (ccd->io_us_queued == NULL) {
+		td_verror(td, errno, "calloc");
+		goto err_free_ccd;
+	}
+
+	ccd->io_us_flight = calloc(td->o.iodepth, sizeof(*ccd->io_us_flight));
+	if (ccd->io_us_flight == NULL) {
+		td_verror(td, errno, "calloc");
+		goto err_free_io_u_queues;
+	}
+
+	ccd->io_us_completed = calloc(td->o.iodepth,
+			sizeof(*ccd->io_us_completed));
+	if (ccd->io_us_completed == NULL) {
+		td_verror(td, errno, "calloc");
+		goto err_free_io_u_queues;
+	}
+
+	/* create a new peer object */
+	if ((ret = rpma_peer_new(dev, &ccd->peer))) {
+		librpma_td_verror(td, ret, "rpma_peer_new");
+		goto err_free_io_u_queues;
+	}
+
+	/* calculate the port this thread has to connect to */
+	if (librpma_fio_td_port(o->port, td, port_td))
+		goto err_peer_delete;
+
+	for (retry = 0; retry < LIBRPMA_FIO_RETRY_MAX_NO; retry++) {
+		/* create a connection request */
+		if ((ret = rpma_conn_req_new(ccd->peer, o->server_ip, port_td,
+				cfg, &req))) {
+			librpma_td_verror(td, ret, "rpma_conn_req_new");
+			goto err_peer_delete;
+		}
+
+		/*
+		 * Connect the connection request
+		 * and obtain the connection object.
+		 */
+		if ((ret = rpma_conn_req_connect(&req, NULL, &ccd->conn))) {
+			librpma_td_verror(td, ret, "rpma_conn_req_connect");
+			goto err_req_delete;
+		}
+
+		/* wait for the connection to establish */
+		if ((ret = rpma_conn_next_event(ccd->conn, &event))) {
+			librpma_td_verror(td, ret, "rpma_conn_next_event");
+			goto err_conn_delete;
+		} else if (event == RPMA_CONN_ESTABLISHED) {
+			break;
+		} else if (event == RPMA_CONN_REJECTED) {
+			/* drop the rejected connection before the next attempt */
+			(void) rpma_conn_disconnect(ccd->conn);
+			(void) rpma_conn_delete(&ccd->conn);
+			if (retry < LIBRPMA_FIO_RETRY_MAX_NO - 1) {
+				log_err("Thread [%d]: Retrying (#%i) ...\n",
+					td->thread_number, retry + 1);
+				sleep(LIBRPMA_FIO_RETRY_DELAY_S);
+			} else {
+				log_err(
+					"Thread [%d]: The maximum number of retries exceeded. Closing.\n",
+					td->thread_number);
+			}
+		} else {
+			log_err(
+				"rpma_conn_next_event returned an unexptected event: (%s != RPMA_CONN_ESTABLISHED)\n",
+				rpma_utils_conn_event_2str(event));
+			goto err_conn_delete;
+		}
+	}
+
+	/*
+	 * All attempts were rejected so there is no connection to use.
+	 * This has to be checked before reporting a successful retry.
+	 */
+	if (ccd->conn == NULL)
+		goto err_peer_delete;
+
+	if (retry > 0)
+		log_err("Thread [%d]: Connected after retry #%i\n",
+			td->thread_number, retry);
+
+	/* get the connection's private data sent from the server */
+	if ((ret = rpma_conn_get_private_data(ccd->conn, &pdata))) {
+		librpma_td_verror(td, ret, "rpma_conn_get_private_data");
+		goto err_conn_delete;
+	}
+
+	/* get the server's workspace representation */
+	ccd->ws = pdata.ptr;
+
+	/* create the server's memory representation */
+	if ((ret = rpma_mr_remote_from_descriptor(&ccd->ws->descriptor[0],
+			ccd->ws->mr_desc_size, &ccd->server_mr))) {
+		librpma_td_verror(td, ret, "rpma_mr_remote_from_descriptor");
+		goto err_conn_delete;
+	}
+
+	/* get the total size of the shared server memory */
+	if ((ret = rpma_mr_remote_get_size(ccd->server_mr, &ccd->ws_size))) {
+		librpma_td_verror(td, ret, "rpma_mr_remote_get_size");
+		goto err_mr_remote_delete;
+	}
+
+	/* get flush type of the remote node */
+	if ((ret = rpma_mr_remote_get_flush_type(ccd->server_mr,
+			&remote_flush_type))) {
+		librpma_td_verror(td, ret, "rpma_mr_remote_get_flush_type");
+		goto err_mr_remote_delete;
+	}
+
+	ccd->server_mr_flush_type =
+		(remote_flush_type & RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT) ?
+		RPMA_FLUSH_TYPE_PERSISTENT : RPMA_FLUSH_TYPE_VISIBILITY;
+
+	/*
+	 * Assure an io_us buffer allocation is page-size-aligned which is required
+	 * to register for RDMA. User-provided value is intentionally ignored.
+	 */
+	td->o.mem_align = page_size;
+
+	td->io_ops_data = ccd;
+
+	return 0;
+
+err_mr_remote_delete:
+	/* the remote memory region has been created - do not leak it */
+	(void) rpma_mr_remote_delete(&ccd->server_mr);
+
+err_conn_delete:
+	(void) rpma_conn_disconnect(ccd->conn);
+	(void) rpma_conn_delete(&ccd->conn);
+
+err_req_delete:
+	(void) rpma_conn_req_delete(&req);
+
+err_peer_delete:
+	(void) rpma_peer_delete(&ccd->peer);
+
+err_free_io_u_queues:
+	free(ccd->io_us_queued);
+	free(ccd->io_us_flight);
+	free(ccd->io_us_completed);
+
+err_free_ccd:
+	free(ccd);
+
+	return -1;
+}
+
+void librpma_fio_client_cleanup(struct thread_data *td)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+ enum rpma_conn_event ev;
+ int ret;
+
+ if (ccd == NULL)
+ return;
+
+ /* delete the io_us' memory registration */
+ if ((ret = rpma_mr_dereg(&ccd->orig_mr)))
+ librpma_td_verror(td, ret, "rpma_mr_dereg");
+ /* delete the server's memory representation */
+ if ((ret = rpma_mr_remote_delete(&ccd->server_mr)))
+ librpma_td_verror(td, ret, "rpma_mr_remote_delete");
+ /* initiate disconnection */
+ if ((ret = rpma_conn_disconnect(ccd->conn)))
+ librpma_td_verror(td, ret, "rpma_conn_disconnect");
+ /* wait for the disconnection to complete */
+ if ((ret = rpma_conn_next_event(ccd->conn, &ev))) {
+ librpma_td_verror(td, ret, "rpma_conn_next_event");
+ } else if (ev != RPMA_CONN_CLOSED) {
+ log_err(
+ "client_cleanup received an unexpected event (%s != RPMA_CONN_CLOSED)\n",
+ rpma_utils_conn_event_2str(ev));
+ }
+ /* delete the connection */
+ if ((ret = rpma_conn_delete(&ccd->conn)))
+ librpma_td_verror(td, ret, "rpma_conn_delete");
+ /* delete the peer */
+ if ((ret = rpma_peer_delete(&ccd->peer)))
+ librpma_td_verror(td, ret, "rpma_peer_delete");
+ /* free the software queues */
+ free(ccd->io_us_queued);
+ free(ccd->io_us_flight);
+ free(ccd->io_us_completed);
+ free(ccd);
+ td->io_ops_data = NULL; /* ccd is freed - do not leave a dangling pointer */
+}
+
+int librpma_fio_file_nop(struct thread_data *td, struct fio_file *f)
+{
+ /* NOP */
+ return 0;
+}
+
+int librpma_fio_client_post_init(struct thread_data *td)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+ size_t io_us_size;
+ int ret;
+
+ /*
+ * td->orig_buffer is not aligned. The engine requires aligned io_us
+ * so the aligned address is calculated using the formula below.
+ */
+ ccd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
+ td->o.mem_align;
+
+ /*
+ * Besides the space really consumed by io_us, td->orig_buffer_size
+ * includes paddings which can be omitted for the memory registration.
+ */
+ io_us_size = (unsigned long long)td_max_bs(td) *
+ (unsigned long long)td->o.iodepth;
+
+ if ((ret = rpma_mr_reg(ccd->peer, ccd->orig_buffer_aligned, io_us_size,
+ RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
+ RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
+ RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT, &ccd->orig_mr)))
+ librpma_td_verror(td, ret, "rpma_mr_reg");
+ return ret;
+}
+
+int librpma_fio_client_get_file_size(struct thread_data *td,
+ struct fio_file *f)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+
+ f->real_file_size = ccd->ws_size;
+ fio_file_set_size_known(f);
+
+ return 0;
+}
+
+static enum fio_q_status client_queue_sync(struct thread_data *td,
+ struct io_u *io_u)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+ struct rpma_completion cmpl;
+ unsigned io_u_index;
+ int ret;
+
+ /* execute io_u */
+ if (io_u->ddir == DDIR_READ) {
+ /* post an RDMA read operation */
+ if (librpma_fio_client_io_read(td, io_u,
+ RPMA_F_COMPLETION_ALWAYS))
+ goto err;
+ } else if (io_u->ddir == DDIR_WRITE) {
+ /* post an RDMA write operation */
+ if (librpma_fio_client_io_write(td, io_u))
+ goto err;
+ if (ccd->flush(td, io_u, io_u, io_u->xfer_buflen))
+ goto err;
+ } else {
+ log_err("unsupported IO mode: %s\n", io_ddir_name(io_u->ddir));
+ goto err;
+ }
+
+ do {
+ /* get a completion */
+ ret = rpma_conn_completion_get(ccd->conn, &cmpl);
+ if (ret == RPMA_E_NO_COMPLETION) {
+ /* lack of completion is not an error */
+ continue;
+ } else if (ret != 0) {
+ /* an error occurred */
+ librpma_td_verror(td, ret, "rpma_conn_completion_get");
+ goto err;
+ }
+
+ /* if io_us has completed with an error */
+ if (cmpl.op_status != IBV_WC_SUCCESS)
+ goto err;
+
+ if (cmpl.op == RPMA_OP_SEND)
+ ++ccd->op_send_completed;
+ else {
+ if (cmpl.op == RPMA_OP_RECV)
+ ++ccd->op_recv_completed;
+
+ break;
+ }
+ } while (1);
+
+ if (ccd->get_io_u_index(&cmpl, &io_u_index) != 1)
+ goto err;
+
+ if (io_u->index != io_u_index) {
+ log_err(
+ "no matching io_u for received completion found (io_u_index=%u)\n",
+ io_u_index);
+ goto err;
+ }
+
+ /* make sure all SENDs are completed before exit - clean up SQ */
+ if (librpma_fio_client_io_complete_all_sends(td))
+ goto err;
+
+ return FIO_Q_COMPLETED;
+
+err:
+ io_u->error = -1;
+ return FIO_Q_COMPLETED;
+}
+
+enum fio_q_status librpma_fio_client_queue(struct thread_data *td,
+ struct io_u *io_u)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+
+ if (ccd->io_u_queued_nr == (int)td->o.iodepth)
+ return FIO_Q_BUSY;
+
+ if (td->o.sync_io)
+ return client_queue_sync(td, io_u);
+
+ /* io_u -> queued[] */
+ ccd->io_us_queued[ccd->io_u_queued_nr] = io_u;
+ ccd->io_u_queued_nr++;
+
+ return FIO_Q_QUEUED;
+}
+
+int librpma_fio_client_commit(struct thread_data *td)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+ int flags = RPMA_F_COMPLETION_ON_ERROR;
+ struct timespec now;
+ bool fill_time;
+ int i;
+ struct io_u *flush_first_io_u = NULL;
+ unsigned long long int flush_len = 0;
+
+ if (!ccd->io_us_queued)
+ return -1;
+
+ /* execute all io_us from queued[] */
+ for (i = 0; i < ccd->io_u_queued_nr; i++) {
+ struct io_u *io_u = ccd->io_us_queued[i];
+
+ if (io_u->ddir == DDIR_READ) {
+ if (i + 1 == ccd->io_u_queued_nr ||
+ ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE)
+ flags = RPMA_F_COMPLETION_ALWAYS;
+ /* post an RDMA read operation */
+ if (librpma_fio_client_io_read(td, io_u, flags))
+ return -1;
+ } else if (io_u->ddir == DDIR_WRITE) {
+ /* post an RDMA write operation */
+ if (librpma_fio_client_io_write(td, io_u))
+ return -1;
+
+ /* cache the first io_u in the sequence */
+ if (flush_first_io_u == NULL)
+ flush_first_io_u = io_u;
+
+ /*
+ * the flush length is the sum of all io_u's creating
+ * the sequence
+ */
+ flush_len += io_u->xfer_buflen;
+
+ /*
+ * if io_u's are random the rpma_flush is required
+ * after each one of them
+ */
+ if (!td_random(td)) {
+ /*
+ * When the io_u's are sequential and
+ * the current io_u is not the last one and
+ * the next one is also a write operation
+ * the flush can be postponed by one io_u and
+ * cover all of them which build a continuous
+ * sequence.
+ */
+ if ((i + 1 < ccd->io_u_queued_nr) &&
+ (ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE))
+ continue;
+ }
+
+ /* flush all writes which build a continuous sequence */
+ if (ccd->flush(td, flush_first_io_u, io_u, flush_len))
+ return -1;
+
+ /*
+ * reset the flush parameters in preparation for
+ * the next one
+ */
+ flush_first_io_u = NULL;
+ flush_len = 0;
+ } else {
+ log_err("unsupported IO mode: %s\n",
+ io_ddir_name(io_u->ddir));
+ return -1;
+ }
+ }
+
+ if ((fill_time = fio_fill_issue_time(td)))
+ fio_gettime(&now, NULL);
+
+ /* move executed io_us from queued[] to flight[] */
+ for (i = 0; i < ccd->io_u_queued_nr; i++) {
+ struct io_u *io_u = ccd->io_us_queued[i];
+
+ /* FIO does not do this if the engine is asynchronous */
+ if (fill_time)
+ memcpy(&io_u->issue_time, &now, sizeof(now));
+
+ /* move executed io_us from queued[] to flight[] */
+ ccd->io_us_flight[ccd->io_u_flight_nr] = io_u;
+ ccd->io_u_flight_nr++;
+
+ /*
+ * FIO says:
+ * If an engine has the commit hook
+ * it has to call io_u_queued() itself.
+ */
+ io_u_queued(td, io_u);
+ }
+
+ /* FIO does not do this if an engine has the commit hook. */
+ io_u_mark_submit(td, ccd->io_u_queued_nr);
+ ccd->io_u_queued_nr = 0;
+
+ return 0;
+}
+
+/*
+ * RETURN VALUE
+ * - > 0 - a number of completed io_us
+ * - 0 - when no completions received
+ * - (-1) - when an error occurred
+ */
+static int client_getevent_process(struct thread_data *td)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+ struct rpma_completion cmpl;
+ /* io_u->index of completed io_u (cmpl.op_context) */
+ unsigned int io_u_index;
+ /* # of completed io_us */
+ int cmpl_num = 0;
+ /* helpers */
+ struct io_u *io_u;
+ int i;
+ int ret;
+
+ /* get a completion */
+ if ((ret = rpma_conn_completion_get(ccd->conn, &cmpl))) {
+ /* lack of completion is not an error */
+ if (ret == RPMA_E_NO_COMPLETION) {
+ /* lack of completion is not an error */
+ return 0;
+ }
+
+ /* an error occurred */
+ librpma_td_verror(td, ret, "rpma_conn_completion_get");
+ return -1;
+ }
+
+ /* if io_us has completed with an error */
+ if (cmpl.op_status != IBV_WC_SUCCESS) {
+ td->error = cmpl.op_status;
+ return -1;
+ }
+
+ if (cmpl.op == RPMA_OP_SEND)
+ ++ccd->op_send_completed;
+ else if (cmpl.op == RPMA_OP_RECV)
+ ++ccd->op_recv_completed;
+
+ if ((ret = ccd->get_io_u_index(&cmpl, &io_u_index)) != 1)
+ return ret;
+
+ /* look for an io_u being completed */
+ for (i = 0; i < ccd->io_u_flight_nr; ++i) {
+ if (ccd->io_us_flight[i]->index == io_u_index) {
+ cmpl_num = i + 1;
+ break;
+ }
+ }
+
+ /* if no matching io_u has been found */
+ if (cmpl_num == 0) {
+ log_err(
+ "no matching io_u for received completion found (io_u_index=%u)\n",
+ io_u_index);
+ return -1;
+ }
+
+ /* move completed io_us to the completed in-memory queue */
+ for (i = 0; i < cmpl_num; ++i) {
+ /* get and prepare io_u */
+ io_u = ccd->io_us_flight[i];
+
+ /* append to the queue */
+ ccd->io_us_completed[ccd->io_u_completed_nr] = io_u;
+ ccd->io_u_completed_nr++;
+ }
+
+ /* remove completed io_us from the flight queue */
+ for (i = cmpl_num; i < ccd->io_u_flight_nr; ++i)
+ ccd->io_us_flight[i - cmpl_num] = ccd->io_us_flight[i];
+ ccd->io_u_flight_nr -= cmpl_num;
+
+ return cmpl_num;
+}
+
+/*
+ * Busy-poll for completions by calling client_getevent_process() repeatedly.
+ *
+ * The loop ends when either:
+ * - no completion is available, at least min io_us have been collected and
+ * SEND completions are not behind RECV completions, or
+ * - at least max io_us have been collected and SEND completions are not
+ * behind RECV completions.
+ *
+ * The t argument is not referenced by this function.
+ *
+ * RETURN VALUE
+ * - >= 0 - the total number of completed io_us
+ * - (-1) - when client_getevent_process() reported an error
+ */
+int librpma_fio_client_getevents(struct thread_data *td, unsigned int min,
+ unsigned int max, const struct timespec *t)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+ /* total # of completed io_us */
+ int cmpl_num_total = 0;
+ /* # of completed io_us from a single event */
+ int cmpl_num;
+
+ do {
+ cmpl_num = client_getevent_process(td);
+ if (cmpl_num > 0) {
+ /* new completions collected */
+ cmpl_num_total += cmpl_num;
+ } else if (cmpl_num == 0) {
+ /*
+ * It is required to make sure that CQEs for SENDs
+ * will flow at least at the same pace as CQEs for RECVs.
+ */
+ if (cmpl_num_total >= min &&
+ ccd->op_send_completed >= ccd->op_recv_completed)
+ break;
+
+ /*
+ * To reduce CPU consumption one can use
+ * the rpma_conn_completion_wait() function.
+ * Note this greatly increases the latency
+ * and makes the results less stable.
+ * The bandwidth stays more or less the same.
+ */
+ } else {
+ /* an error occurred */
+ return -1;
+ }
+
+ /*
+ * The expected max can be exceeded if CQEs for RECVs will come up
+ * faster than CQEs for SENDs. But it is required to make sure CQEs for
+ * SENDs will flow at least at the same pace as CQEs for RECVs.
+ */
+ } while (cmpl_num_total < max ||
+ ccd->op_send_completed < ccd->op_recv_completed);
+
+ /*
+ * All posted SENDs are completed and RECVs for them (responses) are
+ * completed. This is the initial situation so the counters are reset.
+ */
+ if (ccd->op_send_posted == ccd->op_send_completed &&
+ ccd->op_send_completed == ccd->op_recv_completed) {
+ ccd->op_send_posted = 0;
+ ccd->op_send_completed = 0;
+ ccd->op_recv_completed = 0;
+ }
+
+ return cmpl_num_total;
+}
+
+/*
+ * Pop the oldest io_u from the completed queue and hand it back to FIO.
+ * The event argument is not used: io_us are always returned in queue order.
+ */
+struct io_u *librpma_fio_client_event(struct thread_data *td, int event)
+{
+	struct librpma_fio_client_data *ccd = td->io_ops_data;
+	struct io_u **queue = ccd->io_us_completed;
+	struct io_u *head = queue[0];
+	int left = ccd->io_u_completed_nr - 1;
+
+	/* shift the remaining entries one slot towards the front */
+	if (left > 0)
+		memmove(&queue[0], &queue[1], left * sizeof(queue[0]));
+	ccd->io_u_completed_nr = left;
+
+	dprint_io_u(head, "client_event");
+
+	return head;
+}
+
+/*
+ * Translate io_u->error (treated as an ibv_wc_status value) into
+ * a heap-allocated, human-readable string. Aborts the process when the copy
+ * cannot be allocated.
+ */
+char *librpma_fio_client_errdetails(struct io_u *io_u)
+{
+	enum ibv_wc_status wc_status = io_u->error;
+	const char *wc_text = ibv_wc_status_str(wc_status);
+	char *copy;
+
+	copy = strdup(wc_text);
+	if (!copy) {
+		fprintf(stderr, "Error: %s\n", wc_text);
+		fprintf(stderr, "Fatal error: out of memory. Aborting.\n");
+		abort();
+	}
+
+	/* ownership goes to FIO, which frees the string when done with it */
+	return copy;
+}
+
+/*
+ * Common server initialization: configure librpma logging, look up the IBV
+ * context for o->server_ip, allocate the common server data and create
+ * the peer object. On success the server data is stored in td->io_ops_data.
+ *
+ * Returns 0 on success and -1 on error.
+ */
+int librpma_fio_server_init(struct thread_data *td)
+{
+	struct librpma_fio_options_values *opts = td->eo;
+	struct librpma_fio_server_data *server;
+	struct ibv_context *ibv_ctx = NULL;
+	enum rpma_log_level aux_level = RPMA_LOG_LEVEL_WARNING;
+	int rc;
+
+#ifdef FIO_INC_DEBUG
+	/* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
+	if (fio_debug & (1UL << FD_NET))
+		aux_level = RPMA_LOG_LEVEL_INFO;
+#endif
+
+	/* configure logging thresholds to see more details */
+	rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
+	rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, aux_level);
+
+	/* obtain an IBV context for the configured IP address */
+	rc = rpma_utils_get_ibv_context(opts->server_ip,
+			RPMA_UTIL_IBV_CONTEXT_LOCAL, &ibv_ctx);
+	if (rc) {
+		librpma_td_verror(td, rc, "rpma_utils_get_ibv_context");
+		return -1;
+	}
+
+	/* allocate server's data */
+	server = calloc(1, sizeof(*server));
+	if (server == NULL) {
+		td_verror(td, errno, "calloc");
+		return -1;
+	}
+
+	/* create a new peer object */
+	rc = rpma_peer_new(ibv_ctx, &server->peer);
+	if (rc) {
+		librpma_td_verror(td, rc, "rpma_peer_new");
+		free(server);
+		return -1;
+	}
+
+	td->io_ops_data = server;
+
+	return 0;
+}
+
+/*
+ * Release the common server data: delete the peer (reporting a failure
+ * without stopping) and free the structure. A NULL td->io_ops_data is a no-op.
+ */
+void librpma_fio_server_cleanup(struct thread_data *td)
+{
+	struct librpma_fio_server_data *server = td->io_ops_data;
+
+	if (server != NULL) {
+		int rc = rpma_peer_delete(&server->peer);
+
+		if (rc)
+			librpma_td_verror(td, rc, "rpma_peer_delete");
+
+		free(server);
+	}
+}
+
+/*
+ * Prepare the server's workspace and accept a single incoming connection.
+ *
+ * Steps:
+ * 1. start listening at o->server_ip and the per-thread port,
+ * 2. allocate the workspace (DRAM when the file name is "malloc", otherwise
+ *    PMem mapped from f->file_name) and register it for RDMA,
+ * 3. describe the workspace (MR descriptor, RQ size, direct_write_to_pmem)
+ *    in the connection's private data,
+ * 4. accept the next connection request and wait until it is established.
+ *
+ * cfg may be NULL; when it is not, its RQ size is advertised to the client
+ * and must fit into uint16_t.
+ *
+ * On success the connection, the memory registration and the workspace
+ * pointer are stored in the common server data. On any failure every
+ * resource acquired so far is released.
+ *
+ * Returns 0 on success and -1 on error.
+ */
+int librpma_fio_server_open_file(struct thread_data *td, struct fio_file *f,
+		struct rpma_conn_cfg *cfg)
+{
+	struct librpma_fio_server_data *csd = td->io_ops_data;
+	struct librpma_fio_options_values *o = td->eo;
+	enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
+	struct librpma_fio_workspace ws = {0};
+	struct rpma_conn_private_data pdata;
+	uint32_t max_msg_num;
+	struct rpma_conn_req *conn_req;
+	struct rpma_conn *conn;
+	struct rpma_mr_local *mr;
+	char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
+	struct rpma_ep *ep;
+	size_t mem_size = td->o.size;
+	size_t mr_desc_size;
+	void *ws_ptr;
+	int usage_mem_type;
+	int ret;
+
+	if (!f->file_name) {
+		log_err("fio: filename is not set\n");
+		return -1;
+	}
+
+	/* start a listening endpoint at addr:port */
+	if (librpma_fio_td_port(o->port, td, port_td))
+		return -1;
+
+	if ((ret = rpma_ep_listen(csd->peer, o->server_ip, port_td, &ep))) {
+		librpma_td_verror(td, ret, "rpma_ep_listen");
+		return -1;
+	}
+
+	if (strcmp(f->file_name, "malloc") == 0) {
+		/* allocation from DRAM using posix_memalign() */
+		ws_ptr = librpma_fio_allocate_dram(td, mem_size, &csd->mem);
+		usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_VISIBILITY;
+	} else {
+		/* allocation from PMEM using pmem_map_file() */
+		ws_ptr = librpma_fio_allocate_pmem(td, f->file_name,
+				mem_size, &csd->mem);
+		usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT;
+	}
+
+	if (ws_ptr == NULL)
+		goto err_ep_shutdown;
+
+	f->real_file_size = mem_size;
+
+	if ((ret = rpma_mr_reg(csd->peer, ws_ptr, mem_size,
+			RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
+			RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
+			usage_mem_type, &mr))) {
+		librpma_td_verror(td, ret, "rpma_mr_reg");
+		goto err_free;
+	}
+
+	/* get size of the memory region's descriptor */
+	if ((ret = rpma_mr_get_descriptor_size(mr, &mr_desc_size))) {
+		librpma_td_verror(td, ret, "rpma_mr_get_descriptor_size");
+		goto err_mr_dereg;
+	}
+
+	/* verify size of the memory region's descriptor */
+	if (mr_desc_size > LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE) {
+		log_err(
+			"size of the memory region's descriptor is too big (max=%i)\n",
+			LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE);
+		goto err_mr_dereg;
+	}
+
+	/* get the memory region's descriptor */
+	if ((ret = rpma_mr_get_descriptor(mr, &ws.descriptor[0]))) {
+		librpma_td_verror(td, ret, "rpma_mr_get_descriptor");
+		goto err_mr_dereg;
+	}
+
+	if (cfg != NULL) {
+		if ((ret = rpma_conn_cfg_get_rq_size(cfg, &max_msg_num))) {
+			librpma_td_verror(td, ret, "rpma_conn_cfg_get_rq_size");
+			goto err_mr_dereg;
+		}
+
+		/* verify whether iodepth fits into uint16_t */
+		if (max_msg_num > UINT16_MAX) {
+			log_err("fio: iodepth too big (%u > %u)\n",
+				max_msg_num, UINT16_MAX);
+			/*
+			 * The MR, the workspace and the endpoint are already
+			 * acquired at this point so they have to be released.
+			 */
+			goto err_mr_dereg;
+		}
+
+		ws.max_msg_num = max_msg_num;
+	}
+
+	/* prepare a workspace description */
+	ws.direct_write_to_pmem = o->direct_write_to_pmem;
+	ws.mr_desc_size = mr_desc_size;
+	pdata.ptr = &ws;
+	pdata.len = sizeof(ws);
+
+	/* receive an incoming connection request */
+	if ((ret = rpma_ep_next_conn_req(ep, cfg, &conn_req))) {
+		librpma_td_verror(td, ret, "rpma_ep_next_conn_req");
+		goto err_mr_dereg;
+	}
+
+	/* let the engine post its receive buffers before accepting */
+	if (csd->prepare_connection && csd->prepare_connection(td, conn_req))
+		goto err_req_delete;
+
+	/* accept the connection request and obtain the connection object */
+	if ((ret = rpma_conn_req_connect(&conn_req, &pdata, &conn))) {
+		librpma_td_verror(td, ret, "rpma_conn_req_connect");
+		goto err_req_delete;
+	}
+
+	/* wait for the connection to be established */
+	if ((ret = rpma_conn_next_event(conn, &conn_event))) {
+		librpma_td_verror(td, ret, "rpma_conn_next_event");
+		goto err_conn_delete;
+	} else if (conn_event != RPMA_CONN_ESTABLISHED) {
+		log_err("rpma_conn_next_event returned an unexptected event\n");
+		goto err_conn_delete;
+	}
+
+	/* end-point is no longer needed */
+	(void) rpma_ep_shutdown(&ep);
+
+	csd->ws_mr = mr;
+	csd->ws_ptr = ws_ptr;
+	csd->conn = conn;
+
+	return 0;
+
+err_conn_delete:
+	(void) rpma_conn_delete(&conn);
+
+err_req_delete:
+	(void) rpma_conn_req_delete(&conn_req);
+
+err_mr_dereg:
+	(void) rpma_mr_dereg(&mr);
+
+err_free:
+	librpma_fio_free(&csd->mem);
+
+err_ep_shutdown:
+	(void) rpma_ep_shutdown(&ep);
+
+	return -1;
+}
+
+/*
+ * Tear down the server's connection and workspace: wait for the next
+ * connection event (expected to be RPMA_CONN_CLOSED), disconnect, delete
+ * the connection, deregister the workspace memory and free it.
+ *
+ * All steps are always attempted; the result is -1 if any reported step
+ * failed and 0 otherwise.
+ */
+int librpma_fio_server_close_file(struct thread_data *td, struct fio_file *f)
+{
+	struct librpma_fio_server_data *server = td->io_ops_data;
+	enum rpma_conn_event event = RPMA_CONN_UNDEFINED;
+	int status = 0;
+	int rc;
+
+	/* wait for the connection to be closed */
+	rc = rpma_conn_next_event(server->conn, &event);
+	if (rc == 0 && event != RPMA_CONN_CLOSED) {
+		log_err("rpma_conn_next_event returned an unexptected event\n");
+		status = -1;
+	}
+
+	rc = rpma_conn_disconnect(server->conn);
+	if (rc) {
+		librpma_td_verror(td, rc, "rpma_conn_disconnect");
+		status = -1;
+	}
+
+	rc = rpma_conn_delete(&server->conn);
+	if (rc) {
+		librpma_td_verror(td, rc, "rpma_conn_delete");
+		status = -1;
+	}
+
+	rc = rpma_mr_dereg(&server->ws_mr);
+	if (rc) {
+		librpma_td_verror(td, rc, "rpma_mr_dereg");
+		status = -1;
+	}
+
+	librpma_fio_free(&server->mem);
+
+	return status;
+}
diff --git a/engines/librpma_fio.h b/engines/librpma_fio.h
new file mode 100644
index 00000000..8cfb2e2d
--- /dev/null
+++ b/engines/librpma_fio.h
@@ -0,0 +1,273 @@
+/*
+ * librpma_fio: librpma_apm and librpma_gpspm engines' common header.
+ *
+ * Copyright 2021, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#ifndef LIBRPMA_FIO_H
+#define LIBRPMA_FIO_H 1
+
+#include "../fio.h"
+#include "../optgroup.h"
+
+#include <librpma.h>
+
+/* servers' and clients' common */
+
+/*
+ * Report a librpma error on td: forwards err, its rpma_err_2str() text and
+ * the name of the failing function to td_vmsg().
+ * Note: err is expanded twice.
+ */
+#define librpma_td_verror(td, err, func) \
+ td_vmsg((td), (err), rpma_err_2str(err), (func))
+
+/* ceil(a / b) = (a + b - 1) / b */
+#define LIBRPMA_FIO_CEIL(a, b) (((a) + (b) - 1) / (b))
+
+/* common option structure for server and client */
+struct librpma_fio_options_values {
+ /*
+ * FIO considers .off1 == 0 absent so the first meaningful field has to
+ * have padding ahead of it.
+ */
+ void *pad;
+ /*
+ * IP address; the server passes it to rpma_utils_get_ibv_context() and
+ * rpma_ep_listen()
+ */
+ char *server_ip;
+ /* base server listening port */
+ char *port;
+ /* Direct Write to PMem is possible */
+ unsigned int direct_write_to_pmem;
+};
+
+extern struct fio_option librpma_fio_options[];
+
+/*
+ * Limited by the maximum length of the private data
+ * for rdma_connect() in case of RDMA_PS_TCP (28 bytes).
+ */
+#define LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE 24
+
+/*
+ * Description of the server's workspace. The server fills it in and hands it
+ * over as the connection's private data when accepting a connection request.
+ */
+struct librpma_fio_workspace {
+ uint16_t max_msg_num; /* # of RQ slots */
+ uint8_t direct_write_to_pmem; /* Direct Write to PMem is possible */
+ uint8_t mr_desc_size; /* size of mr_desc in descriptor[] */
+ /* buffer containing mr_desc */
+ char descriptor[LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE];
+};
+
+#define LIBRPMA_FIO_PORT_STR_LEN_MAX 12
+
+int librpma_fio_td_port(const char *port_base_str, struct thread_data *td,
+ char *port_out);
+
+/*
+ * Bookkeeping of a workspace allocation made by librpma_fio_allocate_dram()
+ * or librpma_fio_allocate_pmem() and released by librpma_fio_free().
+ */
+struct librpma_fio_mem {
+ /* memory buffer */
+ char *mem_ptr;
+
+ /* size of the mapped persistent memory */
+ size_t size_mmap;
+};
+
+char *librpma_fio_allocate_dram(struct thread_data *td, size_t size,
+ struct librpma_fio_mem *mem);
+
+char *librpma_fio_allocate_pmem(struct thread_data *td, const char *filename,
+ size_t size, struct librpma_fio_mem *mem);
+
+void librpma_fio_free(struct librpma_fio_mem *mem);
+
+/* clients' common */
+
+/*
+ * Engine-specific flush hook. The common commit code calls it with the first
+ * and the last io_u of a sequence of writes and the summed length of that
+ * sequence; a non-zero return value is treated as an error.
+ */
+typedef int (*librpma_fio_flush_t)(struct thread_data *td,
+ struct io_u *first_io_u, struct io_u *last_io_u,
+ unsigned long long int len);
+
+/*
+ * Engine-specific hook translating a completion into an io_u index.
+ *
+ * RETURN VALUE
+ * - ( 1) - on success
+ * - ( 0) - skip
+ * - (-1) - on error
+ */
+typedef int (*librpma_fio_get_io_u_index_t)(struct rpma_completion *cmpl,
+ unsigned int *io_u_index);
+
+/* client state shared by the librpma-based engines (td->io_ops_data) */
+struct librpma_fio_client_data {
+ struct rpma_peer *peer;
+ struct rpma_conn *conn;
+
+ /* aligned td->orig_buffer */
+ char *orig_buffer_aligned;
+
+ /* io_us' base address memory registration (orig_buffer_aligned) */
+ struct rpma_mr_local *orig_mr;
+
+ /* workspace description - NOTE(review): presumably received from the server; confirm */
+ struct librpma_fio_workspace *ws;
+
+ /* a server's memory representation */
+ struct rpma_mr_remote *server_mr;
+ enum rpma_flush_type server_mr_flush_type;
+
+ /* remote workspace description */
+ size_t ws_size;
+
+ /*
+ * in-memory queues; an io_u travels queued -> flight -> completed
+ */
+ struct io_u **io_us_queued;
+ int io_u_queued_nr;
+ struct io_u **io_us_flight;
+ int io_u_flight_nr;
+ struct io_u **io_us_completed;
+ int io_u_completed_nr;
+
+ /* SQ control. Note: all of them have to be kept in sync. */
+ uint32_t op_send_posted;
+ uint32_t op_send_completed;
+ uint32_t op_recv_completed;
+
+ /* engine-specific hooks */
+ librpma_fio_flush_t flush;
+ librpma_fio_get_io_u_index_t get_io_u_index;
+
+ /* engine-specific client data */
+ void *client_data;
+};
+
+int librpma_fio_client_init(struct thread_data *td,
+ struct rpma_conn_cfg *cfg);
+void librpma_fio_client_cleanup(struct thread_data *td);
+
+int librpma_fio_file_nop(struct thread_data *td, struct fio_file *f);
+int librpma_fio_client_get_file_size(struct thread_data *td,
+ struct fio_file *f);
+
+int librpma_fio_client_post_init(struct thread_data *td);
+
+enum fio_q_status librpma_fio_client_queue(struct thread_data *td,
+ struct io_u *io_u);
+
+int librpma_fio_client_commit(struct thread_data *td);
+
+int librpma_fio_client_getevents(struct thread_data *td, unsigned int min,
+ unsigned int max, const struct timespec *t);
+
+struct io_u *librpma_fio_client_event(struct thread_data *td, int event);
+
+char *librpma_fio_client_errdetails(struct io_u *io_u);
+
+/*
+ * Post an RDMA read of io_u->xfer_buflen bytes from the server's memory at
+ * io_u->offset into the io_u's buffer. io_u->index is attached as
+ * the operation context.
+ *
+ * Returns 0 on success and -1 on error.
+ */
+static inline int librpma_fio_client_io_read(struct thread_data *td,
+		struct io_u *io_u, int flags)
+{
+	struct librpma_fio_client_data *ccd = td->io_ops_data;
+	/* the io_u buffer position relative to the registered base address */
+	const size_t local_off =
+		(char *)(io_u->xfer_buf) - ccd->orig_buffer_aligned;
+	const size_t remote_off = io_u->offset;
+	int rc;
+
+	rc = rpma_read(ccd->conn, ccd->orig_mr, local_off,
+			ccd->server_mr, remote_off, io_u->xfer_buflen,
+			flags, (void *)(uintptr_t)io_u->index);
+	if (rc == 0)
+		return 0;
+
+	librpma_td_verror(td, rc, "rpma_read");
+	return -1;
+}
+
+/*
+ * Post an RDMA write of io_u->xfer_buflen bytes from the io_u's buffer to
+ * the server's memory at io_u->offset. A completion is requested only on
+ * error; io_u->index is attached as the operation context.
+ *
+ * Returns 0 on success and -1 on error.
+ */
+static inline int librpma_fio_client_io_write(struct thread_data *td,
+		struct io_u *io_u)
+{
+	struct librpma_fio_client_data *ccd = td->io_ops_data;
+	/* the io_u buffer position relative to the registered base address */
+	const size_t local_off =
+		(char *)(io_u->xfer_buf) - ccd->orig_buffer_aligned;
+	const size_t remote_off = io_u->offset;
+	int rc;
+
+	rc = rpma_write(ccd->conn, ccd->server_mr, remote_off,
+			ccd->orig_mr, local_off, io_u->xfer_buflen,
+			RPMA_F_COMPLETION_ON_ERROR,
+			(void *)(uintptr_t)io_u->index);
+	if (rc == 0)
+		return 0;
+
+	librpma_td_verror(td, rc, "rpma_write");
+	return -1;
+}
+
+/*
+ * Busy-poll the completion queue until every posted SEND has completed.
+ *
+ * Only RPMA_OP_SEND completions are expected while draining; any other
+ * completion, a completion with a non-success status or a failure of
+ * rpma_conn_completion_get() ends the wait with an error.
+ *
+ * RETURN VALUE
+ * - 0 - all posted SENDs are completed
+ * - (-1) - when an error occurred
+ */
+static inline int librpma_fio_client_io_complete_all_sends(
+		struct thread_data *td)
+{
+	struct librpma_fio_client_data *ccd = td->io_ops_data;
+	struct rpma_completion cmpl;
+	int ret;
+
+	while (ccd->op_send_posted != ccd->op_send_completed) {
+		/* get a completion */
+		ret = rpma_conn_completion_get(ccd->conn, &cmpl);
+		if (ret == RPMA_E_NO_COMPLETION) {
+			/* lack of completion is not an error */
+			continue;
+		} else if (ret != 0) {
+			/*
+			 * An error occurred. Not all SENDs are completed so
+			 * success must not be reported to the caller.
+			 */
+			librpma_td_verror(td, ret, "rpma_conn_completion_get");
+			return -1;
+		}
+
+		if (cmpl.op_status != IBV_WC_SUCCESS)
+			return -1;
+
+		if (cmpl.op == RPMA_OP_SEND)
+			++ccd->op_send_completed;
+		else {
+			log_err(
+				"A completion other than RPMA_OP_SEND got during cleaning up the CQ from SENDs\n");
+			return -1;
+		}
+	}
+
+	/*
+	 * All posted SENDs are completed and RECVs for them (responses) are
+	 * completed. This is the initial situation so the counters are reset.
+	 */
+	if (ccd->op_send_posted == ccd->op_send_completed &&
+			ccd->op_send_completed == ccd->op_recv_completed) {
+		ccd->op_send_posted = 0;
+		ccd->op_send_completed = 0;
+		ccd->op_recv_completed = 0;
+	}
+
+	return 0;
+}
+
+/* servers' common */
+
+/*
+ * Optional engine-specific hook called with an incoming connection request
+ * before it is accepted; a non-zero return value aborts opening the file.
+ */
+typedef int (*librpma_fio_prepare_connection_t)(
+ struct thread_data *td,
+ struct rpma_conn_req *conn_req);
+
+/* server state shared by the librpma-based engines (td->io_ops_data) */
+struct librpma_fio_server_data {
+ struct rpma_peer *peer;
+
+ /* resources of an incoming connection */
+ struct rpma_conn *conn;
+
+ /* the workspace: its address, memory registration and allocation */
+ char *ws_ptr;
+ struct rpma_mr_local *ws_mr;
+ struct librpma_fio_mem mem;
+
+ /* engine-specific server data */
+ void *server_data;
+
+ /* may be NULL when the engine needs no preparation */
+ librpma_fio_prepare_connection_t prepare_connection;
+};
+
+int librpma_fio_server_init(struct thread_data *td);
+
+void librpma_fio_server_cleanup(struct thread_data *td);
+
+int librpma_fio_server_open_file(struct thread_data *td,
+ struct fio_file *f, struct rpma_conn_cfg *cfg);
+
+int librpma_fio_server_close_file(struct thread_data *td,
+ struct fio_file *f);
+
+#endif /* LIBRPMA_FIO_H */
diff --git a/engines/librpma_gpspm.c b/engines/librpma_gpspm.c
new file mode 100644
index 00000000..ac614f46
--- /dev/null
+++ b/engines/librpma_gpspm.c
@@ -0,0 +1,755 @@
+/*
+ * librpma_gpspm: IO engine that uses PMDK librpma to write data,
+ * based on General Purpose Server Persistency Method
+ *
+ * Copyright 2020-2021, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#include "librpma_fio.h"
+
+#include <libpmem.h>
+
+/* Generated by the protocol buffer compiler from: librpma_gpspm_flush.proto */
+#include "librpma_gpspm_flush.pb-c.h"
+
+#define MAX_MSG_SIZE (512)
+#define IO_U_BUF_LEN (2 * MAX_MSG_SIZE)
+#define SEND_OFFSET (0)
+#define RECV_OFFSET (SEND_OFFSET + MAX_MSG_SIZE)
+
+#define GPSPM_FLUSH_REQUEST__LAST \
+ { PROTOBUF_C_MESSAGE_INIT(&gpspm_flush_request__descriptor), 0, 0, 0 }
+
+/*
+ * 'Flush_req_last' is the last flush request
+ * the client has to send to server to indicate
+ * that the client is done.
+ */
+static const GPSPMFlushRequest Flush_req_last = GPSPM_FLUSH_REQUEST__LAST;
+
+/*
+ * True when flush_req differs from Flush_req_last in length or offset.
+ * The argument is parenthesized so any pointer expression can be passed.
+ */
+#define IS_NOT_THE_LAST_MESSAGE(flush_req) \
+	((flush_req)->length != Flush_req_last.length || \
+	(flush_req)->offset != Flush_req_last.offset)
+
+/* client side implementation */
+
+/*
+ * Get the offset of the next io_u message buffer in the round-robin fashion.
+ * Side effect: advances cd->msg_curr. The argument is expanded twice so it
+ * must not have side effects of its own.
+ */
+#define IO_U_NEXT_BUF_OFF_CLIENT(cd) \
+	(IO_U_BUF_LEN * (((cd)->msg_curr++) % (cd)->msg_num))
+
+/* GPSPM client's engine-specific data (ccd->client_data) */
+struct client_data {
+ /* memory for the send and receive message buffers */
+ char *io_us_msgs;
+
+ /* resources for messaging buffer */
+ uint32_t msg_num; /* # of message buffers (IO_U_BUF_LEN bytes each) */
+ uint32_t msg_curr; /* round-robin counter of handed out buffers */
+ struct rpma_mr_local *msg_mr; /* memory registration of io_us_msgs */
+};
+
+static inline int client_io_flush(struct thread_data *td,
+ struct io_u *first_io_u, struct io_u *last_io_u,
+ unsigned long long int len);
+
+static int client_get_io_u_index(struct rpma_completion *cmpl,
+ unsigned int *io_u_index);
+
+/*
+ * Initialize the GPSPM client: reject read and trim workloads, compute
+ * the number of WRITEs and FLUSH messages the job may have outstanding, size
+ * the connection queues accordingly and run the common client initialization
+ * (librpma_fio_client_init()). Afterwards verify that the RQ size advertised
+ * by the server can hold all of the client's flush requests and install
+ * the engine-specific hooks.
+ *
+ * Returns 0 on success and -1 on error.
+ */
+static int client_init(struct thread_data *td)
+{
+ struct librpma_fio_client_data *ccd;
+ struct client_data *cd;
+ uint32_t write_num;
+ struct rpma_conn_cfg *cfg = NULL;
+ int ret;
+
+ /*
+ * not supported:
+ * - readwrite = read / trim / randread / randtrim /
+ * / rw / randrw / trimwrite
+ */
+ if (td_read(td) || td_trim(td)) {
+ td_verror(td, EINVAL, "Not supported mode.");
+ return -1;
+ }
+
+ /* allocate client's data */
+ cd = calloc(1, sizeof(*cd));
+ if (cd == NULL) {
+ td_verror(td, errno, "calloc");
+ return -1;
+ }
+
+ /*
+ * Calculate the required number of WRITEs and FLUSHes.
+ *
+ * Note: Each flush is a request (SEND) and response (RECV) pair.
+ */
+ if (td_random(td)) {
+ write_num = td->o.iodepth; /* WRITE * N */
+ cd->msg_num = td->o.iodepth; /* FLUSH * N */
+ } else {
+ if (td->o.sync_io) {
+ write_num = 1; /* WRITE */
+ cd->msg_num = 1; /* FLUSH */
+ } else {
+ write_num = td->o.iodepth; /* WRITE * N */
+ /*
+ * FLUSH * B where:
+ * - B == ceil(iodepth / iodepth_batch)
+ * which is the number of batches for N writes
+ */
+ cd->msg_num = LIBRPMA_FIO_CEIL(td->o.iodepth,
+ td->o.iodepth_batch);
+ }
+ }
+
+ /* create a connection configuration object */
+ if ((ret = rpma_conn_cfg_new(&cfg))) {
+ librpma_td_verror(td, ret, "rpma_conn_cfg_new");
+ goto err_free_cd;
+ }
+
+ /*
+ * Calculate the required queue sizes where:
+ * - the send queue (SQ) has to be big enough to accommodate
+ * all io_us (WRITEs) and all flush requests (SENDs)
+ * - the receive queue (RQ) has to be big enough to accommodate
+ * all flush responses (RECVs)
+ * - the completion queue (CQ) has to be big enough to accommodate all
+ * success and error completions (sq_size + rq_size)
+ */
+ if ((ret = rpma_conn_cfg_set_sq_size(cfg, write_num + cd->msg_num))) {
+ librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
+ goto err_cfg_delete;
+ }
+ if ((ret = rpma_conn_cfg_set_rq_size(cfg, cd->msg_num))) {
+ librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
+ goto err_cfg_delete;
+ }
+ if ((ret = rpma_conn_cfg_set_cq_size(cfg, write_num + cd->msg_num * 2))) {
+ librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
+ goto err_cfg_delete;
+ }
+
+ if (librpma_fio_client_init(td, cfg))
+ goto err_cfg_delete;
+
+ /* the common client data is read back from td->io_ops_data */
+ ccd = td->io_ops_data;
+
+ /* advisory note only; printed by the first thread */
+ if (ccd->ws->direct_write_to_pmem &&
+ ccd->server_mr_flush_type == RPMA_FLUSH_TYPE_PERSISTENT &&
+ td->thread_number == 1) {
+ /* XXX log_info mixes with the JSON output */
+ log_err(
+ "Note: The server side supports Direct Write to PMem and it is equipped with PMem (direct_write_to_pmem).\n"
+ "You can use librpma_client and librpma_server engines for better performance instead of GPSPM.\n");
+ }
+
+ /* validate the server's RQ capacity */
+ if (cd->msg_num > ccd->ws->max_msg_num) {
+ log_err(
+ "server's RQ size (iodepth) too small to handle the client's workspace requirements (%u < %u)\n",
+ ccd->ws->max_msg_num, cd->msg_num);
+ goto err_cleanup_common;
+ }
+
+ if ((ret = rpma_conn_cfg_delete(&cfg))) {
+ librpma_td_verror(td, ret, "rpma_conn_cfg_delete");
+ /* non fatal error - continue */
+ }
+
+ ccd->flush = client_io_flush;
+ ccd->get_io_u_index = client_get_io_u_index;
+ ccd->client_data = cd;
+
+ return 0;
+
+ /* the labels below release resources in the reverse order of acquisition */
+err_cleanup_common:
+ librpma_fio_client_cleanup(td);
+
+err_cfg_delete:
+ (void) rpma_conn_cfg_delete(&cfg);
+
+err_free_cd:
+ free(cd);
+
+ return -1;
+}
+
+/*
+ * Allocate the page-aligned messaging buffer (msg_num slots of IO_U_BUF_LEN
+ * bytes), register it for SEND and RECV and continue with the common
+ * post-initialization.
+ *
+ * Returns 0 on success; on failure the non-zero code of the failing call.
+ * Note: the buffer is not released here when its registration fails.
+ */
+static int client_post_init(struct thread_data *td)
+{
+	struct librpma_fio_client_data *ccd = td->io_ops_data;
+	struct client_data *priv = ccd->client_data;
+	unsigned int msgs_size = priv->msg_num * IO_U_BUF_LEN;
+	int rc;
+
+	/* message buffers initialization */
+	rc = posix_memalign((void **)&priv->io_us_msgs, page_size, msgs_size);
+	if (rc) {
+		td_verror(td, rc, "posix_memalign");
+		return rc;
+	}
+
+	/* message buffers registration */
+	rc = rpma_mr_reg(ccd->peer, priv->io_us_msgs, msgs_size,
+			RPMA_MR_USAGE_SEND | RPMA_MR_USAGE_RECV,
+			&priv->msg_mr);
+	if (rc) {
+		librpma_td_verror(td, rc, "rpma_mr_reg");
+		return rc;
+	}
+
+	return librpma_fio_client_post_init(td);
+}
+
+/*
+ * Clean up the GPSPM client: collect outstanding SEND completions, send
+ * the termination notice (Flush_req_last) to the server, then release
+ * the messaging buffer, the engine-specific data and the common client data.
+ *
+ * All steps are best-effort; failures are reported but do not stop
+ * the cleanup.
+ */
+static void client_cleanup(struct thread_data *td)
+{
+	struct librpma_fio_client_data *ccd = td->io_ops_data;
+	struct client_data *cd;
+	size_t flush_req_size;
+	size_t io_u_buf_off;
+	size_t send_offset;
+	void *send_ptr;
+	int ret;
+
+	if (ccd == NULL)
+		return;
+
+	cd = ccd->client_data;
+	if (cd == NULL) {
+		librpma_fio_client_cleanup(td);
+		return;
+	}
+
+	/*
+	 * Make sure all SEND completions are collected ergo there are free
+	 * slots in the SQ for the last SEND message.
+	 *
+	 * Note: If any operation will fail we still can send the termination
+	 * notice.
+	 */
+	(void) librpma_fio_client_io_complete_all_sends(td);
+
+	/* prepare the last flush message and pack it to the send buffer */
+	flush_req_size = gpspm_flush_request__get_packed_size(&Flush_req_last);
+	if (flush_req_size > MAX_MSG_SIZE) {
+		log_err(
+			"Packed flush request size is bigger than available send buffer space (%zu > %d\n",
+			flush_req_size, MAX_MSG_SIZE);
+	} else if (cd->io_us_msgs != NULL) {
+		/*
+		 * The messaging buffer is missing when the post-init step
+		 * has not allocated it; there is nothing to pack into then.
+		 */
+		io_u_buf_off = IO_U_NEXT_BUF_OFF_CLIENT(cd);
+		send_offset = io_u_buf_off + SEND_OFFSET;
+		send_ptr = cd->io_us_msgs + send_offset;
+		(void) gpspm_flush_request__pack(&Flush_req_last, send_ptr);
+
+		/* send the flush message */
+		if ((ret = rpma_send(ccd->conn, cd->msg_mr, send_offset,
+				flush_req_size, RPMA_F_COMPLETION_ALWAYS,
+				NULL))) {
+			/*
+			 * Nothing has been posted so the SEND must not be
+			 * counted. Otherwise waiting for its completion
+			 * would never end.
+			 */
+			librpma_td_verror(td, ret, "rpma_send");
+		} else {
+			++ccd->op_send_posted;
+
+			/* Wait for the SEND to complete */
+			(void) librpma_fio_client_io_complete_all_sends(td);
+		}
+	}
+
+	/* deregister the messaging buffer memory */
+	if ((ret = rpma_mr_dereg(&cd->msg_mr)))
+		librpma_td_verror(td, ret, "rpma_mr_dereg");
+
+	/* release the messaging buffer allocated with posix_memalign() */
+	free(cd->io_us_msgs);
+	free(ccd->client_data);
+
+	librpma_fio_client_cleanup(td);
+}
+
+/*
+ * Request the server to flush a sequence of writes: post a RECV for
+ * the response, pack a flush request covering len bytes starting at
+ * first_io_u->offset and SEND it. last_io_u->index travels as the request's
+ * op_context.
+ *
+ * Returns 0 on success and -1 on error.
+ */
+static inline int client_io_flush(struct thread_data *td,
+		struct io_u *first_io_u, struct io_u *last_io_u,
+		unsigned long long int len)
+{
+	struct librpma_fio_client_data *ccd = td->io_ops_data;
+	struct client_data *cd = ccd->client_data;
+	/* each message buffer holds the SEND part followed by the RECV part */
+	size_t io_u_buf_off = IO_U_NEXT_BUF_OFF_CLIENT(cd);
+	size_t send_offset = io_u_buf_off + SEND_OFFSET;
+	size_t recv_offset = io_u_buf_off + RECV_OFFSET;
+	void *send_ptr = cd->io_us_msgs + send_offset;
+	void *recv_ptr = cd->io_us_msgs + recv_offset;
+	GPSPMFlushRequest flush_req = GPSPM_FLUSH_REQUEST__INIT;
+	size_t flush_req_size = 0;
+	int ret;
+
+	/* prepare a response buffer */
+	if ((ret = rpma_recv(ccd->conn, cd->msg_mr, recv_offset, MAX_MSG_SIZE,
+			recv_ptr))) {
+		librpma_td_verror(td, ret, "rpma_recv");
+		return -1;
+	}
+
+	/* prepare a flush message and pack it to a send buffer */
+	flush_req.offset = first_io_u->offset;
+	flush_req.length = len;
+	flush_req.op_context = last_io_u->index;
+	flush_req_size = gpspm_flush_request__get_packed_size(&flush_req);
+	if (flush_req_size > MAX_MSG_SIZE) {
+		/* flush_req_size is a size_t so it is printed using %zu */
+		log_err(
+			"Packed flush request size is bigger than available send buffer space (%zu > %d\n",
+			flush_req_size, MAX_MSG_SIZE);
+		return -1;
+	}
+	(void) gpspm_flush_request__pack(&flush_req, send_ptr);
+
+	/* send the flush message */
+	if ((ret = rpma_send(ccd->conn, cd->msg_mr, send_offset, flush_req_size,
+			RPMA_F_COMPLETION_ALWAYS, NULL))) {
+		librpma_td_verror(td, ret, "rpma_send");
+		return -1;
+	}
+
+	++ccd->op_send_posted;
+
+	return 0;
+}
+
+/*
+ * Extract the io_u index from a flush response.
+ *
+ * Only RECV completions carry a response; cmpl->op_context is the address of
+ * the receive buffer and cmpl->byte_len the length of the packed message.
+ *
+ * RETURN VALUE
+ * - ( 1) - on success, *io_u_index is set
+ * - ( 0) - the completion is not a RECV and has to be skipped
+ * - (-1) - the response cannot be unpacked
+ */
+static int client_get_io_u_index(struct rpma_completion *cmpl,
+		unsigned int *io_u_index)
+{
+	GPSPMFlushResponse *flush_resp;
+
+	if (cmpl->op != RPMA_OP_RECV)
+		return 0;
+
+	/* unpack a response from the received buffer */
+	flush_resp = gpspm_flush_response__unpack(NULL,
+			cmpl->byte_len, cmpl->op_context);
+	if (flush_resp == NULL) {
+		log_err("Cannot unpack the flush response buffer\n");
+		return -1;
+	}
+
+	/*
+	 * Convert by value instead of copying raw bytes so the result does
+	 * not depend on the byte order when op_context is wider than
+	 * unsigned int.
+	 * NOTE(review): assumes op_context is an integer field of
+	 * the generated GPSPMFlushResponse - confirm against the .proto file.
+	 */
+	*io_u_index = (unsigned int)flush_resp->op_context;
+
+	gpspm_flush_response__free_unpacked(flush_resp, NULL);
+
+	return 1;
+}
+
+/*
+ * The librpma_gpspm_client engine: the engine-specific init, post_init and
+ * cleanup hooks defined in this file combined with the common librpma_fio
+ * client hooks.
+ */
+FIO_STATIC struct ioengine_ops ioengine_client = {
+ .name = "librpma_gpspm_client",
+ .version = FIO_IOOPS_VERSION,
+ .init = client_init,
+ .post_init = client_post_init,
+ .get_file_size = librpma_fio_client_get_file_size,
+ /* opening and closing a file are no-ops on the client side */
+ .open_file = librpma_fio_file_nop,
+ .queue = librpma_fio_client_queue,
+ .commit = librpma_fio_client_commit,
+ .getevents = librpma_fio_client_getevents,
+ .event = librpma_fio_client_event,
+ .errdetails = librpma_fio_client_errdetails,
+ .close_file = librpma_fio_file_nop,
+ .cleanup = client_cleanup,
+ .flags = FIO_DISKLESSIO,
+ .options = librpma_fio_options,
+ .option_struct_size = sizeof(struct librpma_fio_options_values),
+};
+
+/* server side implementation */
+
+/* offset of the i-th io_u message buffer; the argument is parenthesized */
+#define IO_U_BUFF_OFF_SERVER(i) ((i) * IO_U_BUF_LEN)
+
+/* GPSPM server's engine-specific data (csd->server_data) */
+struct server_data {
+ /* aligned td->orig_buffer */
+ char *orig_buffer_aligned;
+
+ /* resources for messaging buffer from DRAM allocated by fio */
+ struct rpma_mr_local *msg_mr;
+
+ uint32_t msg_sqe_available; /* # of free SQ slots */
+
+ /* in-memory queues (msgs_queued has td->o.iodepth entries) */
+ struct rpma_completion *msgs_queued;
+ uint32_t msg_queued_nr;
+};
+
+/*
+ * Initialize the GPSPM server: run the common server initialization,
+ * allocate the engine-specific data with its in-memory queue of
+ * td->o.iodepth completions and force the io_u buffer size and alignment
+ * required for messaging.
+ *
+ * Returns 0 on success, the common initialization's result when it fails
+ * and -1 on an allocation error.
+ */
+static int server_init(struct thread_data *td)
+{
+	struct librpma_fio_server_data *common;
+	struct server_data *priv;
+	int rc;
+
+	rc = librpma_fio_server_init(td);
+	if (rc)
+		return rc;
+
+	common = td->io_ops_data;
+
+	/* allocate server's data */
+	priv = calloc(1, sizeof(*priv));
+	if (priv == NULL) {
+		td_verror(td, errno, "calloc");
+		librpma_fio_server_cleanup(td);
+		return -1;
+	}
+
+	/* allocate in-memory queue */
+	priv->msgs_queued = calloc(td->o.iodepth, sizeof(*priv->msgs_queued));
+	if (priv->msgs_queued == NULL) {
+		td_verror(td, errno, "calloc");
+		free(priv);
+		librpma_fio_server_cleanup(td);
+		return -1;
+	}
+
+	/*
+	 * Assure a single io_u buffer can store both SEND and RECV messages and
+	 * an io_us buffer allocation is page-size-aligned which is required
+	 * to register for RDMA. User-provided values are intentionally ignored.
+	 */
+	td->o.max_bs[DDIR_READ] = IO_U_BUF_LEN;
+	td->o.mem_align = page_size;
+
+	common->server_data = priv;
+
+	return 0;
+}
+
+/*
+ * Register the io_us' memory allocated by FIO as the messaging buffer.
+ *
+ * The aligned start of td->orig_buffer is computed, the io_u buffer size is
+ * checked against IO_U_BUF_LEN and io_u_buflen * iodepth bytes are
+ * registered for SEND and RECV.
+ *
+ * Returns 0 on success and -1 on error.
+ */
+static int server_post_init(struct thread_data *td)
+{
+	struct librpma_fio_server_data *csd = td->io_ops_data;
+	struct server_data *sd = csd->server_data;
+	size_t io_us_size;
+	size_t io_u_buflen;
+	int ret;
+
+	/*
+	 * td->orig_buffer is not aligned. The engine requires aligned io_us
+	 * so FIO aligns up the address using the formula below.
+	 */
+	sd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
+			td->o.mem_align;
+
+	/*
+	 * XXX
+	 * Each io_u message buffer contains recv and send messages.
+	 * Aligning each of those buffers may potentially give
+	 * some performance benefits.
+	 */
+	io_u_buflen = td_max_bs(td);
+
+	/* check whether io_u buffer is big enough */
+	if (io_u_buflen < IO_U_BUF_LEN) {
+		/* io_u_buflen is a size_t so it is printed using %zu */
+		log_err(
+			"blocksize too small to accommodate assumed maximal request/response pair size (%zu < %d)\n",
+			io_u_buflen, IO_U_BUF_LEN);
+		return -1;
+	}
+
+	/*
+	 * td->orig_buffer_size beside the space really consumed by io_us
+	 * has paddings which can be omitted for the memory registration.
+	 */
+	io_us_size = (unsigned long long)io_u_buflen *
+			(unsigned long long)td->o.iodepth;
+
+	if ((ret = rpma_mr_reg(csd->peer, sd->orig_buffer_aligned, io_us_size,
+			RPMA_MR_USAGE_SEND | RPMA_MR_USAGE_RECV,
+			&sd->msg_mr))) {
+		librpma_td_verror(td, ret, "rpma_mr_reg");
+		return -1;
+	}
+
+	return 0;
+}
+
+/*
+ * Release everything the GPSPM server keeps for this thread: deregister the
+ * messaging memory, free the completion queue and the server data, then hand
+ * over to the common librpma server cleanup. Does nothing when the engine
+ * data was never set up.
+ */
+static void server_cleanup(struct thread_data *td)
+{
+	struct librpma_fio_server_data *csd = td->io_ops_data;
+
+	if (!csd)
+		return;
+
+	if (csd->server_data) {
+		struct server_data *sd = csd->server_data;
+		/* deregister the messaging buffer (DRAM) */
+		int ret = rpma_mr_dereg(&sd->msg_mr);
+
+		if (ret)
+			librpma_td_verror(td, ret, "rpma_mr_dereg");
+
+		free(sd->msgs_queued);
+		free(sd);
+	}
+
+	librpma_fio_server_cleanup(td);
+}
+
+/*
+ * Post one RECV per io_u buffer on the connection request so that flush
+ * requests can be received. The buffer index is passed as the operation
+ * context of each RECV.
+ *
+ * Returns 0 on success or the error returned by rpma_conn_req_recv().
+ */
+static int prepare_connection(struct thread_data *td,
+		struct rpma_conn_req *conn_req)
+{
+	struct librpma_fio_server_data *csd = td->io_ops_data;
+	struct server_data *sd = csd->server_data;
+	int idx;
+
+	/* no SEND has been posted yet, so all iodepth entries are available */
+	sd->msg_sqe_available = td->o.iodepth;
+
+	/* prepare buffers for the flush requests */
+	for (idx = 0; idx < td->o.iodepth; idx++) {
+		size_t recv_off = IO_U_BUFF_OFF_SERVER(idx) + RECV_OFFSET;
+		int ret = rpma_conn_req_recv(conn_req, sd->msg_mr, recv_off,
+				MAX_MSG_SIZE, (const void *)(uintptr_t)idx);
+
+		if (ret == 0)
+			continue;
+
+		librpma_td_verror(td, ret, "rpma_conn_req_recv");
+		return ret;
+	}
+
+	return 0;
+}
+
+/*
+ * Build a connection configuration sized for iodepth messages and open the
+ * file through the common librpma server code.
+ *
+ * Returns -1 when the configuration object cannot be created, the failing
+ * call's error code when a queue size cannot be set, and otherwise the result
+ * of librpma_fio_server_open_file().
+ */
+static int server_open_file(struct thread_data *td, struct fio_file *f)
+{
+	struct librpma_fio_server_data *csd = td->io_ops_data;
+	struct rpma_conn_cfg *cfg = NULL;
+	uint16_t max_msg_num = td->o.iodepth;
+	int ret;
+
+	csd->prepare_connection = prepare_connection;
+
+	/* create a connection configuration object */
+	ret = rpma_conn_cfg_new(&cfg);
+	if (ret) {
+		librpma_td_verror(td, ret, "rpma_conn_cfg_new");
+		return -1;
+	}
+
+	/*
+	 * Queue sizes:
+	 * - SQ: one entry per possible flush response (SEND)
+	 * - RQ: one entry per possible flush request (RECV)
+	 * - CQ: room for the completions of both (sq_size + rq_size)
+	 * The first failing step is reported and the remaining ones are
+	 * skipped; the configuration object is deleted in every case.
+	 */
+	if ((ret = rpma_conn_cfg_set_sq_size(cfg, max_msg_num)))
+		librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
+	else if ((ret = rpma_conn_cfg_set_rq_size(cfg, max_msg_num)))
+		librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
+	else if ((ret = rpma_conn_cfg_set_cq_size(cfg, max_msg_num * 2)))
+		librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
+	else
+		ret = librpma_fio_server_open_file(td, f, cfg);
+
+	(void) rpma_conn_cfg_delete(&cfg);
+
+	return ret;
+}
+
+/*
+ * Process a single queued RECV completion, i.e. a single flush request.
+ *
+ * cmpl->op_context carries the index of the SEND/RECV buffer pair. For
+ * a regular request the requested range of the workspace is persisted, the
+ * RECV buffer is posted again and a flush response carrying the request's
+ * op_context is sent back. When IS_NOT_THE_LAST_MESSAGE() is false for the
+ * request, the job is only marked as done (td->done).
+ *
+ * Returns 0 on success. On failure sets td->terminate and returns -1.
+ */
+static int server_qe_process(struct thread_data *td,
+		struct rpma_completion *cmpl)
+{
+	struct librpma_fio_server_data *csd = td->io_ops_data;
+	struct server_data *sd = csd->server_data;
+	GPSPMFlushRequest *flush_req;
+	GPSPMFlushResponse flush_resp = GPSPM_FLUSH_RESPONSE__INIT;
+	size_t flush_resp_size = 0;
+	size_t send_buff_offset;
+	size_t recv_buff_offset;
+	size_t io_u_buff_offset;
+	void *send_buff_ptr;
+	void *recv_buff_ptr;
+	void *op_ptr;
+	int msg_index;
+	int ret;
+
+	/* calculate SEND/RECV pair parameters */
+	msg_index = (int)(uintptr_t)cmpl->op_context;
+	io_u_buff_offset = IO_U_BUFF_OFF_SERVER(msg_index);
+	send_buff_offset = io_u_buff_offset + SEND_OFFSET;
+	recv_buff_offset = io_u_buff_offset + RECV_OFFSET;
+	send_buff_ptr = sd->orig_buffer_aligned + send_buff_offset;
+	recv_buff_ptr = sd->orig_buffer_aligned + recv_buff_offset;
+
+	/* unpack a flush request from the received buffer */
+	flush_req = gpspm_flush_request__unpack(NULL, cmpl->byte_len,
+			recv_buff_ptr);
+	if (flush_req == NULL) {
+		log_err("cannot unpack the flush request buffer\n");
+		goto err_terminate;
+	}
+
+	if (IS_NOT_THE_LAST_MESSAGE(flush_req)) {
+		/*
+		 * NOTE(review): offset and length come from the remote peer
+		 * and are used as-is - consider validating them against the
+		 * workspace size.
+		 */
+		op_ptr = csd->ws_ptr + flush_req->offset;
+		pmem_persist(op_ptr, flush_req->length);
+	} else {
+		/*
+		 * This is the last message - the client is done.
+		 */
+		gpspm_flush_request__free_unpacked(flush_req, NULL);
+		td->done = true;
+		return 0;
+	}
+
+	/* initiate the next receive operation */
+	if ((ret = rpma_recv(csd->conn, sd->msg_mr, recv_buff_offset,
+			MAX_MSG_SIZE,
+			(const void *)(uintptr_t)msg_index))) {
+		librpma_td_verror(td, ret, "rpma_recv");
+		goto err_free_unpacked;
+	}
+
+	/* prepare a flush response and pack it to a send buffer */
+	flush_resp.op_context = flush_req->op_context;
+	flush_resp_size = gpspm_flush_response__get_packed_size(&flush_resp);
+	if (flush_resp_size > MAX_MSG_SIZE) {
+		/* flush_resp_size is a size_t, so it has to be printed with %zu */
+		log_err(
+			"Size of the packed flush response is bigger than the available space of the send buffer (%zu > %i)\n",
+			flush_resp_size, MAX_MSG_SIZE);
+		goto err_free_unpacked;
+	}
+
+	(void) gpspm_flush_response__pack(&flush_resp, send_buff_ptr);
+
+	/* send the flush response */
+	if ((ret = rpma_send(csd->conn, sd->msg_mr, send_buff_offset,
+			flush_resp_size, RPMA_F_COMPLETION_ALWAYS, NULL))) {
+		librpma_td_verror(td, ret, "rpma_send");
+		goto err_free_unpacked;
+	}
+	/* the posted SEND occupies one entry until its completion arrives */
+	--sd->msg_sqe_available;
+
+	gpspm_flush_request__free_unpacked(flush_req, NULL);
+
+	return 0;
+
+err_free_unpacked:
+	gpspm_flush_request__free_unpacked(flush_req, NULL);
+
+err_terminate:
+	td->terminate = true;
+
+	return -1;
+}
+
+/*
+ * Process as many queued flush requests as there are send entries
+ * available, then move the unprocessed requests to the front of the queue.
+ *
+ * Returns 0 on success or the first non-zero result of server_qe_process();
+ * in the latter case the queue is left untouched.
+ */
+static inline int server_queue_process(struct thread_data *td)
+{
+	struct librpma_fio_server_data *csd = td->io_ops_data;
+	struct server_data *sd = csd->server_data;
+	struct rpma_completion *queue = sd->msgs_queued;
+	/* min(# of queue entries, # of SQ entries available) */
+	uint32_t batch = min(sd->msg_queued_nr, sd->msg_sqe_available);
+	int idx;
+
+	if (batch == 0)
+		return 0;
+
+	/* process queued completions */
+	for (idx = 0; idx < batch; ++idx) {
+		int ret = server_qe_process(td, &queue[idx]);
+
+		if (ret)
+			return ret;
+	}
+
+	/* progress the queue: shift the leftovers down by 'batch' entries */
+	for (idx = 0; idx < sd->msg_queued_nr - batch; ++idx)
+		memcpy(&queue[idx], &queue[batch + idx], sizeof(queue[idx]));
+
+	sd->msg_queued_nr -= batch;
+
+	return 0;
+}
+
+/*
+ * Collect at most one completion from the connection.
+ *
+ * A RECV completion (a received flush request) is appended to the in-memory
+ * queue; a SEND completion makes one more send entry available. The
+ * completion is read into a local variable first: the queue has only
+ * td->o.iodepth entries, so reading a SEND completion directly into the slot
+ * past the last queued entry could write beyond the queue when it is full.
+ *
+ * Returns 0 on success or when there is no completion to collect. On
+ * failure sets td->terminate and returns -1.
+ */
+static int server_cmpl_process(struct thread_data *td)
+{
+	struct librpma_fio_server_data *csd = td->io_ops_data;
+	struct server_data *sd = csd->server_data;
+	struct rpma_completion cmpl;
+	int ret;
+
+	memset(&cmpl, 0, sizeof(cmpl));
+
+	ret = rpma_conn_completion_get(csd->conn, &cmpl);
+	if (ret == RPMA_E_NO_COMPLETION) {
+		/* lack of completion is not an error */
+		return 0;
+	} else if (ret != 0) {
+		librpma_td_verror(td, ret, "rpma_conn_completion_get");
+		goto err_terminate;
+	}
+
+	/* validate the completion */
+	if (cmpl.op_status != IBV_WC_SUCCESS)
+		goto err_terminate;
+
+	if (cmpl.op == RPMA_OP_RECV) {
+		/* never store more requests than the queue can hold */
+		if (sd->msg_queued_nr >= td->o.iodepth) {
+			log_err("the queue of flush requests is full\n");
+			goto err_terminate;
+		}
+		sd->msgs_queued[sd->msg_queued_nr] = cmpl;
+		++sd->msg_queued_nr;
+	} else if (cmpl.op == RPMA_OP_SEND) {
+		++sd->msg_sqe_available;
+	}
+
+	return 0;
+
+err_terminate:
+	td->terminate = true;
+
+	return -1;
+}
+
+/*
+ * The server's main loop: alternately collect a completion and process the
+ * queued flush requests until the job is marked as done (td->done).
+ *
+ * Returns FIO_Q_COMPLETED when done and FIO_Q_BUSY as soon as either step
+ * fails. The io_u argument is not used.
+ */
+static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
+{
+	for (;;) {
+		if (server_cmpl_process(td) || server_queue_process(td))
+			return FIO_Q_BUSY;
+
+		if (td->done)
+			break;
+	}
+
+	return FIO_Q_COMPLETED;
+}
+
+/*
+ * Operations table of the "librpma_gpspm_server" engine. It combines the
+ * GPSPM-specific callbacks defined in this file (server_*) with the common
+ * librpma helpers (librpma_fio_*) and the shared librpma option set.
+ */
+FIO_STATIC struct ioengine_ops ioengine_server = {
+ .name = "librpma_gpspm_server",
+ .version = FIO_IOOPS_VERSION,
+ .init = server_init,
+ .post_init = server_post_init,
+ .open_file = server_open_file,
+ .close_file = librpma_fio_server_close_file,
+ .queue = server_queue,
+ .invalidate = librpma_fio_file_nop,
+ .cleanup = server_cleanup,
+ .flags = FIO_SYNCIO,
+ .options = librpma_fio_options,
+ .option_struct_size = sizeof(struct librpma_fio_options_values),
+};
+
+/* register both engines */
+
+/*
+ * Register the client and the server GPSPM engines with fio.
+ * NOTE(review): fio_init is defined elsewhere - presumably it makes this
+ * function run automatically at load time; confirm.
+ */
+static void fio_init fio_librpma_gpspm_register(void)
+{
+ register_ioengine(&ioengine_client);
+ register_ioengine(&ioengine_server);
+}
+
+/*
+ * Unregister the client and the server GPSPM engines from fio.
+ * NOTE(review): fio_exit is defined elsewhere - presumably it makes this
+ * function run automatically at exit/unload time; confirm.
+ */
+static void fio_exit fio_librpma_gpspm_unregister(void)
+{
+ unregister_ioengine(&ioengine_client);
+ unregister_ioengine(&ioengine_server);
+}
diff --git a/engines/librpma_gpspm_flush.pb-c.c b/engines/librpma_gpspm_flush.pb-c.c
new file mode 100644
index 00000000..3ff24756
--- /dev/null
+++ b/engines/librpma_gpspm_flush.pb-c.c
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2020, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+/* Generated by the protocol buffer compiler. DO NOT EDIT! */
+/* Generated from: librpma_gpspm_flush.proto */
+
+/* Do not generate deprecated warnings for self */
+#ifndef PROTOBUF_C__NO_DEPRECATED
+#define PROTOBUF_C__NO_DEPRECATED
+#endif
+
+#include "librpma_gpspm_flush.pb-c.h"
+/* Overwrite *message with the GPSPM_FLUSH_REQUEST__INIT default value. */
+void gpspm_flush_request__init
+ (GPSPMFlushRequest *message)
+{
+ static const GPSPMFlushRequest init_value = GPSPM_FLUSH_REQUEST__INIT;
+ *message = init_value;
+}
+/*
+ * Return protobuf_c_message_get_packed_size() of the request; asserts that
+ * message carries the GPSPMFlushRequest descriptor.
+ */
+size_t gpspm_flush_request__get_packed_size
+ (const GPSPMFlushRequest *message)
+{
+ assert(message->base.descriptor == &gpspm_flush_request__descriptor);
+ return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message));
+}
+/*
+ * Pack the request into out using protobuf_c_message_pack() and return its
+ * result; asserts that message carries the GPSPMFlushRequest descriptor.
+ */
+size_t gpspm_flush_request__pack
+ (const GPSPMFlushRequest *message,
+ uint8_t *out)
+{
+ assert(message->base.descriptor == &gpspm_flush_request__descriptor);
+ return protobuf_c_message_pack ((const ProtobufCMessage*)message, out);
+}
+/*
+ * Pack the request into a ProtobufCBuffer using
+ * protobuf_c_message_pack_to_buffer() and return its result; asserts that
+ * message carries the GPSPMFlushRequest descriptor.
+ */
+size_t gpspm_flush_request__pack_to_buffer
+ (const GPSPMFlushRequest *message,
+ ProtobufCBuffer *buffer)
+{
+ assert(message->base.descriptor == &gpspm_flush_request__descriptor);
+ return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer);
+}
+/*
+ * Unpack len bytes at data as a GPSPMFlushRequest by forwarding to
+ * protobuf_c_message_unpack() with the request descriptor; the result is
+ * returned cast to GPSPMFlushRequest *.
+ */
+GPSPMFlushRequest *
+ gpspm_flush_request__unpack
+ (ProtobufCAllocator *allocator,
+ size_t len,
+ const uint8_t *data)
+{
+ return (GPSPMFlushRequest *)
+ protobuf_c_message_unpack (&gpspm_flush_request__descriptor,
+ allocator, len, data);
+}
+/*
+ * Free an unpacked request via protobuf_c_message_free_unpacked().
+ * A NULL message is accepted and ignored.
+ */
+void gpspm_flush_request__free_unpacked
+ (GPSPMFlushRequest *message,
+ ProtobufCAllocator *allocator)
+{
+ if(!message)
+ return;
+ assert(message->base.descriptor == &gpspm_flush_request__descriptor);
+ protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
+}
+/* Overwrite *message with the GPSPM_FLUSH_RESPONSE__INIT default value. */
+void gpspm_flush_response__init
+ (GPSPMFlushResponse *message)
+{
+ static const GPSPMFlushResponse init_value = GPSPM_FLUSH_RESPONSE__INIT;
+ *message = init_value;
+}
+/*
+ * Return protobuf_c_message_get_packed_size() of the response; asserts that
+ * message carries the GPSPMFlushResponse descriptor.
+ */
+size_t gpspm_flush_response__get_packed_size
+ (const GPSPMFlushResponse *message)
+{
+ assert(message->base.descriptor == &gpspm_flush_response__descriptor);
+ return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message));
+}
+/*
+ * Pack the response into out using protobuf_c_message_pack() and return its
+ * result; asserts that message carries the GPSPMFlushResponse descriptor.
+ */
+size_t gpspm_flush_response__pack
+ (const GPSPMFlushResponse *message,
+ uint8_t *out)
+{
+ assert(message->base.descriptor == &gpspm_flush_response__descriptor);
+ return protobuf_c_message_pack ((const ProtobufCMessage*)message, out);
+}
+/*
+ * Pack the response into a ProtobufCBuffer using
+ * protobuf_c_message_pack_to_buffer() and return its result; asserts that
+ * message carries the GPSPMFlushResponse descriptor.
+ */
+size_t gpspm_flush_response__pack_to_buffer
+ (const GPSPMFlushResponse *message,
+ ProtobufCBuffer *buffer)
+{
+ assert(message->base.descriptor == &gpspm_flush_response__descriptor);
+ return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer);
+}
+/*
+ * Unpack len bytes at data as a GPSPMFlushResponse by forwarding to
+ * protobuf_c_message_unpack() with the response descriptor; the result is
+ * returned cast to GPSPMFlushResponse *.
+ */
+GPSPMFlushResponse *
+ gpspm_flush_response__unpack
+ (ProtobufCAllocator *allocator,
+ size_t len,
+ const uint8_t *data)
+{
+ return (GPSPMFlushResponse *)
+ protobuf_c_message_unpack (&gpspm_flush_response__descriptor,
+ allocator, len, data);
+}
+/*
+ * Free an unpacked response via protobuf_c_message_free_unpacked().
+ * A NULL message is accepted and ignored.
+ */
+void gpspm_flush_response__free_unpacked
+ (GPSPMFlushResponse *message,
+ ProtobufCAllocator *allocator)
+{
+ if(!message)
+ return;
+ assert(message->base.descriptor == &gpspm_flush_response__descriptor);
+ protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
+}
+/*
+ * Field descriptors of GPSPMFlushRequest: three required fixed64 fields -
+ * offset (1), length (2) and op_context (3).
+ */
+static const ProtobufCFieldDescriptor gpspm_flush_request__field_descriptors[3] =
+{
+ {
+ "offset",
+ 1,
+ PROTOBUF_C_LABEL_REQUIRED,
+ PROTOBUF_C_TYPE_FIXED64,
+ 0, /* quantifier_offset */
+ offsetof(GPSPMFlushRequest, offset),
+ NULL,
+ NULL,
+ 0, /* flags */
+ 0,NULL,NULL /* reserved1,reserved2, etc */
+ },
+ {
+ "length",
+ 2,
+ PROTOBUF_C_LABEL_REQUIRED,
+ PROTOBUF_C_TYPE_FIXED64,
+ 0, /* quantifier_offset */
+ offsetof(GPSPMFlushRequest, length),
+ NULL,
+ NULL,
+ 0, /* flags */
+ 0,NULL,NULL /* reserved1,reserved2, etc */
+ },
+ {
+ "op_context",
+ 3,
+ PROTOBUF_C_LABEL_REQUIRED,
+ PROTOBUF_C_TYPE_FIXED64,
+ 0, /* quantifier_offset */
+ offsetof(GPSPMFlushRequest, op_context),
+ NULL,
+ NULL,
+ 0, /* flags */
+ 0,NULL,NULL /* reserved1,reserved2, etc */
+ },
+};
+/* Indices into the field descriptors, listed in field-name order. */
+static const unsigned gpspm_flush_request__field_indices_by_name[] = {
+ 1, /* field[1] = length */
+ 0, /* field[0] = offset */
+ 2, /* field[2] = op_context */
+};
+/* Field-number range table referenced by the request's message descriptor. */
+static const ProtobufCIntRange gpspm_flush_request__number_ranges[1 + 1] =
+{
+ { 1, 0 },
+ { 0, 3 }
+};
+/*
+ * Message descriptor of GPSPM_flush_request: names, the size of the C
+ * struct, the three field descriptors with their lookup tables and the init
+ * function.
+ */
+const ProtobufCMessageDescriptor gpspm_flush_request__descriptor =
+{
+ PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC,
+ "GPSPM_flush_request",
+ "GPSPMFlushRequest",
+ "GPSPMFlushRequest",
+ "",
+ sizeof(GPSPMFlushRequest),
+ 3,
+ gpspm_flush_request__field_descriptors,
+ gpspm_flush_request__field_indices_by_name,
+ 1, gpspm_flush_request__number_ranges,
+ (ProtobufCMessageInit) gpspm_flush_request__init,
+ NULL,NULL,NULL /* reserved[123] */
+};
+/*
+ * Field descriptors of GPSPMFlushResponse: a single required fixed64 field -
+ * op_context (1).
+ */
+static const ProtobufCFieldDescriptor gpspm_flush_response__field_descriptors[1] =
+{
+ {
+ "op_context",
+ 1,
+ PROTOBUF_C_LABEL_REQUIRED,
+ PROTOBUF_C_TYPE_FIXED64,
+ 0, /* quantifier_offset */
+ offsetof(GPSPMFlushResponse, op_context),
+ NULL,
+ NULL,
+ 0, /* flags */
+ 0,NULL,NULL /* reserved1,reserved2, etc */
+ },
+};
+/* Indices into the field descriptors, listed in field-name order. */
+static const unsigned gpspm_flush_response__field_indices_by_name[] = {
+ 0, /* field[0] = op_context */
+};
+/* Field-number range table referenced by the response's message descriptor. */
+static const ProtobufCIntRange gpspm_flush_response__number_ranges[1 + 1] =
+{
+ { 1, 0 },
+ { 0, 1 }
+};
+/*
+ * Message descriptor of GPSPM_flush_response: names, the size of the C
+ * struct, the single field descriptor with its lookup tables and the init
+ * function.
+ */
+const ProtobufCMessageDescriptor gpspm_flush_response__descriptor =
+{
+ PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC,
+ "GPSPM_flush_response",
+ "GPSPMFlushResponse",
+ "GPSPMFlushResponse",
+ "",
+ sizeof(GPSPMFlushResponse),
+ 1,
+ gpspm_flush_response__field_descriptors,
+ gpspm_flush_response__field_indices_by_name,
+ 1, gpspm_flush_response__number_ranges,
+ (ProtobufCMessageInit) gpspm_flush_response__init,
+ NULL,NULL,NULL /* reserved[123] */
+};
diff --git a/engines/librpma_gpspm_flush.pb-c.h b/engines/librpma_gpspm_flush.pb-c.h
new file mode 100644
index 00000000..ad475a95
--- /dev/null
+++ b/engines/librpma_gpspm_flush.pb-c.h
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2020, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+/* Generated by the protocol buffer compiler. DO NOT EDIT! */
+/* Generated from: librpma_gpspm_flush.proto */
+
+#ifndef PROTOBUF_C_GPSPM_5fflush_2eproto__INCLUDED
+#define PROTOBUF_C_GPSPM_5fflush_2eproto__INCLUDED
+
+#include <protobuf-c/protobuf-c.h>
+
+PROTOBUF_C__BEGIN_DECLS
+
+#if PROTOBUF_C_VERSION_NUMBER < 1000000
+# error This file was generated by a newer version of protoc-c which is incompatible with your libprotobuf-c headers. Please update your headers.
+#elif 1003003 < PROTOBUF_C_MIN_COMPILER_VERSION
+# error This file was generated by an older version of protoc-c which is incompatible with your libprotobuf-c headers. Please regenerate this file with a newer version of protoc-c.
+#endif
+
+
+typedef struct _GPSPMFlushRequest GPSPMFlushRequest;
+typedef struct _GPSPMFlushResponse GPSPMFlushResponse;
+
+
+/* --- enums --- */
+
+
+/* --- messages --- */
+
+/* C representation of a flush request: offset, length and op_context. */
+struct _GPSPMFlushRequest
+{
+ ProtobufCMessage base;
+ uint64_t offset;
+ uint64_t length;
+ uint64_t op_context;
+};
+/* Static initializer: binds the request descriptor and zeroes all fields. */
+#define GPSPM_FLUSH_REQUEST__INIT \
+ { PROTOBUF_C_MESSAGE_INIT (&gpspm_flush_request__descriptor) \
+ , 0, 0, 0 }
+
+
+/* C representation of a flush response: only op_context. */
+struct _GPSPMFlushResponse
+{
+ ProtobufCMessage base;
+ uint64_t op_context;
+};
+/* Static initializer: binds the response descriptor and zeroes op_context. */
+#define GPSPM_FLUSH_RESPONSE__INIT \
+ { PROTOBUF_C_MESSAGE_INIT (&gpspm_flush_response__descriptor) \
+ , 0 }
+
+
+/* GPSPMFlushRequest methods */
+void gpspm_flush_request__init
+ (GPSPMFlushRequest *message);
+size_t gpspm_flush_request__get_packed_size
+ (const GPSPMFlushRequest *message);
+size_t gpspm_flush_request__pack
+ (const GPSPMFlushRequest *message,
+ uint8_t *out);
+size_t gpspm_flush_request__pack_to_buffer
+ (const GPSPMFlushRequest *message,
+ ProtobufCBuffer *buffer);
+GPSPMFlushRequest *
+ gpspm_flush_request__unpack
+ (ProtobufCAllocator *allocator,
+ size_t len,
+ const uint8_t *data);
+void gpspm_flush_request__free_unpacked
+ (GPSPMFlushRequest *message,
+ ProtobufCAllocator *allocator);
+/* GPSPMFlushResponse methods */
+void gpspm_flush_response__init
+ (GPSPMFlushResponse *message);
+size_t gpspm_flush_response__get_packed_size
+ (const GPSPMFlushResponse *message);
+size_t gpspm_flush_response__pack
+ (const GPSPMFlushResponse *message,
+ uint8_t *out);
+size_t gpspm_flush_response__pack_to_buffer
+ (const GPSPMFlushResponse *message,
+ ProtobufCBuffer *buffer);
+GPSPMFlushResponse *
+ gpspm_flush_response__unpack
+ (ProtobufCAllocator *allocator,
+ size_t len,
+ const uint8_t *data);
+void gpspm_flush_response__free_unpacked
+ (GPSPMFlushResponse *message,
+ ProtobufCAllocator *allocator);
+/* --- per-message closures --- */
+
+typedef void (*GPSPMFlushRequest_Closure)
+ (const GPSPMFlushRequest *message,
+ void *closure_data);
+typedef void (*GPSPMFlushResponse_Closure)
+ (const GPSPMFlushResponse *message,
+ void *closure_data);
+
+/* --- services --- */
+
+
+/* --- descriptors --- */
+
+extern const ProtobufCMessageDescriptor gpspm_flush_request__descriptor;
+extern const ProtobufCMessageDescriptor gpspm_flush_response__descriptor;
+
+PROTOBUF_C__END_DECLS
+
+
+#endif /* PROTOBUF_C_GPSPM_5fflush_2eproto__INCLUDED */
diff --git a/engines/librpma_gpspm_flush.proto b/engines/librpma_gpspm_flush.proto
new file mode 100644
index 00000000..91765a7f
--- /dev/null
+++ b/engines/librpma_gpspm_flush.proto
@@ -0,0 +1,15 @@
+syntax = "proto2";
+
+/* a request to flush a region; all three fields are required */
+message GPSPM_flush_request {
+ /* an offset of a region to be flushed within its memory registration */
+ required fixed64 offset = 1;
+ /* a length of a region to be flushed */
+ required fixed64 length = 2;
+ /* a user-defined operation context */
+ required fixed64 op_context = 3;
+}
+
+/* a response to a flush request; it carries only the operation context */
+message GPSPM_flush_response {
+ /* the operation context of a completed request */
+ required fixed64 op_context = 1;
+}
diff --git a/examples/librpma_apm-client.fio b/examples/librpma_apm-client.fio
new file mode 100644
index 00000000..82a5d20c
--- /dev/null
+++ b/examples/librpma_apm-client.fio
@@ -0,0 +1,24 @@
+# Example of the librpma_apm_client job
+
+[global]
+ioengine=librpma_apm_client
+create_serialize=0 # (required) forces specific initiation sequence
+serverip=[serverip] #IP address the server is listening on
+port=7204 # port(s) the server will listen on, <port; port + numjobs - 1> will be used
+thread
+
+# The client will get a remote memory region description after establishing
+# a connection.
+
+[client]
+numjobs=1 # number of parallel connections
+group_reporting=1
+sync=1 # 1 is the best for latency measurements, 0 for bandwidth
+iodepth=2 # total number of ious
+iodepth_batch_submit=1 # number of ious to be submitted at once
+rw=write # read/write/randread/randwrite/readwrite/rw
+rwmixread=70 # % of a mixed workload that should be reads
+blocksize=4KiB
+ramp_time=15s # gives some time to stabilize the workload
+time_based
+runtime=60s # run the workload for the specified period of time
diff --git a/examples/librpma_apm-server.fio b/examples/librpma_apm-server.fio
new file mode 100644
index 00000000..062b5215
--- /dev/null
+++ b/examples/librpma_apm-server.fio
@@ -0,0 +1,26 @@
+# Example of the librpma_apm_server job
+
+[global]
+ioengine=librpma_apm_server
+create_serialize=0 # (required) forces specific initiation sequence
+kb_base=1000 # turn on the straight units handling (non-compatibility mode)
+serverip=[serverip] # IP address to listen on
+port=7204 # port(s) the server jobs will listen on, ports <port; port + numjobs - 1> will be used
+thread
+
+# The server side spawns one thread for each expected connection from
+# the client-side, opens and registers the range dedicated for this thread
+# (a workspace) from the provided memory.
+# Each of the server threads accepts a connection on the dedicated port
+# (different for each and every working thread), waits for the connection
+# to end, and then exits.
+
+[server]
+# set to 1 (true) ONLY when Direct Write to PMem from the remote host is possible
+# (https://pmem.io/rpma/documentation/basic-direct-write-to-pmem.html)
+direct_write_to_pmem=0
+
+numjobs=1 # number of expected incoming connections
+size=100MiB # size of workspace for a single connection
+filename=malloc # device dax or an existing fsdax file or "malloc" for allocation from DRAM
+# filename=/dev/dax1.0
diff --git a/examples/librpma_gpspm-client.fio b/examples/librpma_gpspm-client.fio
new file mode 100644
index 00000000..843382df
--- /dev/null
+++ b/examples/librpma_gpspm-client.fio
@@ -0,0 +1,23 @@
+# Example of the librpma_gpspm_client job
+
+[global]
+ioengine=librpma_gpspm_client
+create_serialize=0 # (required) forces specific initiation sequence
+serverip=[serverip] #IP address the server is listening on
+port=7204 # port(s) the server will listen on, <port; port + numjobs - 1> will be used
+thread
+
+# The client will get a remote memory region description after establishing
+# a connection.
+
+[client]
+numjobs=1 # number of parallel connections
+group_reporting=1
+sync=1 # 1 is the best for latency measurements, 0 for bandwidth
+iodepth=2 # total number of ious
+iodepth_batch_submit=1 # number of ious to be submitted at once
+rw=write # write/randwrite
+blocksize=4KiB
+ramp_time=15s # gives some time to stabilize the workload
+time_based
+runtime=60s # run the workload for the specified period of time
diff --git a/examples/librpma_gpspm-server.fio b/examples/librpma_gpspm-server.fio
new file mode 100644
index 00000000..d618f2db
--- /dev/null
+++ b/examples/librpma_gpspm-server.fio
@@ -0,0 +1,31 @@
+# Example of the librpma_gpspm_server job
+
+[global]
+ioengine=librpma_gpspm_server
+create_serialize=0 # (required) forces specific initiation sequence
+kb_base=1000 # turn on the straight units handling (non-compatibility mode)
+serverip=[serverip] #IP address to listen on
+port=7204 # port(s) the server jobs will listen on, ports <port; port + numjobs - 1> will be used
+thread
+
+# The server side spawns one thread for each expected connection from
+# the client-side, opens and registers the range dedicated for this thread
+# (a workspace) from the provided memory.
+# Each of the server threads accepts a connection on the dedicated port
+# (different for each and every working thread), accepts and executes flush
+# requests, and sends back a flush response for each of the requests.
+# When the client is done it sends the termination notice to the server's thread.
+
+[server]
+# set to 1 (true) ONLY when Direct Write to PMem from the remote host is possible
+# (https://pmem.io/rpma/documentation/basic-direct-write-to-pmem.html)
+direct_write_to_pmem=0
+numjobs=1 # number of expected incoming connections
+iodepth=2 # number of parallel GPSPM requests
+size=100MiB # size of workspace for a single connection
+filename=malloc # device dax or an existing fsdax file or "malloc" for allocation from DRAM
+# filename=/dev/dax1.0
+
+# The client terminates the server when the client finishes its job.
+time_based
+runtime=365d
diff --git a/fio.1 b/fio.1
index accc6a32..e190c241 100644
--- a/fio.1
+++ b/fio.1
@@ -1949,7 +1949,7 @@ The TCP or UDP port to bind to or connect to. If this is used with
this will be the starting port number since fio will use a range of
ports.
.TP
-.BI (rdma)port
+.BI (rdma, librpma_*)port
The port to use for RDMA-CM communication. This should be the same
value on the client and the server side.
.TP
@@ -1958,6 +1958,12 @@ The hostname or IP address to use for TCP, UDP or RDMA-CM based I/O.
If the job is a TCP listener or UDP reader, the hostname is not used
and must be omitted unless it is a valid UDP multicast address.
.TP
+.BI (librpma_*)serverip \fR=\fPstr
+The IP address to be used for RDMA-CM based I/O.
+.TP
+.BI (librpma_*_server)direct_write_to_pmem \fR=\fPbool
+Set to 1 only when Direct Write to PMem from the remote host is possible. Otherwise, set to 0.
+.TP
.BI (netsplice,net)interface \fR=\fPstr
The IP address of the network interface used to send or receive UDP
multicast.
diff --git a/optgroup.c b/optgroup.c
index 64774896..5b6d22a3 100644
--- a/optgroup.c
+++ b/optgroup.c
@@ -142,6 +142,10 @@ static const struct opt_group fio_opt_cat_groups[] = {
.mask = FIO_OPT_G_RDMA,
},
{
+ .name = "librpma I/O engines", /* librpma_apm && librpma_gpspm */
+ .mask = FIO_OPT_G_LIBRPMA,
+ },
+ {
.name = "libaio I/O engine", /* libaio */
.mask = FIO_OPT_G_LIBAIO,
},
diff --git a/optgroup.h b/optgroup.h
index d2f1ceb3..641547d2 100644
--- a/optgroup.h
+++ b/optgroup.h
@@ -52,6 +52,7 @@ enum opt_category_group {
__FIO_OPT_G_E4DEFRAG,
__FIO_OPT_G_NETIO,
__FIO_OPT_G_RDMA,
+ __FIO_OPT_G_LIBRPMA,
__FIO_OPT_G_LIBAIO,
__FIO_OPT_G_ACT,
__FIO_OPT_G_LATPROF,
@@ -94,6 +95,7 @@ enum opt_category_group {
FIO_OPT_G_E4DEFRAG = (1ULL << __FIO_OPT_G_E4DEFRAG),
FIO_OPT_G_NETIO = (1ULL << __FIO_OPT_G_NETIO),
FIO_OPT_G_RDMA = (1ULL << __FIO_OPT_G_RDMA),
+ FIO_OPT_G_LIBRPMA = (1ULL << __FIO_OPT_G_LIBRPMA),
FIO_OPT_G_LIBAIO = (1ULL << __FIO_OPT_G_LIBAIO),
FIO_OPT_G_ACT = (1ULL << __FIO_OPT_G_ACT),
FIO_OPT_G_LATPROF = (1ULL << __FIO_OPT_G_LATPROF),
diff --git a/options.c b/options.c
index e62e0cfb..625112c5 100644
--- a/options.c
+++ b/options.c
@@ -1913,6 +1913,16 @@ struct fio_option fio_options[FIO_MAX_OPTS] = {
.help = "RDMA IO engine",
},
#endif
+#ifdef CONFIG_LIBRPMA_APM
+ { .ival = "librpma_apm",
+ .help = "librpma IO engine in APM mode",
+ },
+#endif
+#ifdef CONFIG_LIBRPMA_GPSPM
+ { .ival = "librpma_gpspm",
+ .help = "librpma IO engine in GPSPM mode",
+ },
+#endif
#ifdef CONFIG_LINUX_EXT4_MOVE_EXTENT
{ .ival = "e4defrag",
.help = "ext4 defrag engine",