Merge branch 'zbd-no-parallel-init' of https://github.com/floatious/fio master
author Jens Axboe <axboe@kernel.dk>
Thu, 22 Apr 2021 17:18:23 +0000 (11:18 -0600)
committer Jens Axboe <axboe@kernel.dk>
Thu, 22 Apr 2021 17:18:23 +0000 (11:18 -0600)
* 'zbd-no-parallel-init' of https://github.com/floatious/fio:
  init: zonemode=zbd does not work with create_serialize=0

52 files changed:
.gitignore
FIO-VERSION-GEN
HOWTO
Makefile
backend.c
cconv.c
ci/travis-install-librpma.sh [new file with mode: 0755]
ci/travis-install-pmdk.sh [new file with mode: 0755]
ci/travis-install.sh
configure
engines/dfs.c [new file with mode: 0644]
engines/falloc.c
engines/filecreate.c
engines/filedelete.c [new file with mode: 0644]
engines/io_uring.c
engines/librpma_apm.c [new file with mode: 0644]
engines/librpma_fio.c [new file with mode: 0644]
engines/librpma_fio.h [new file with mode: 0644]
engines/librpma_gpspm.c [new file with mode: 0644]
engines/librpma_gpspm_flush.pb-c.c [new file with mode: 0644]
engines/librpma_gpspm_flush.pb-c.h [new file with mode: 0644]
engines/librpma_gpspm_flush.proto [new file with mode: 0644]
engines/rados.c
eta.c
examples/dfs.fio [new file with mode: 0644]
examples/filedelete-ioengine.fio [new file with mode: 0644]
examples/librpma_apm-client.fio [new file with mode: 0644]
examples/librpma_apm-server.fio [new file with mode: 0644]
examples/librpma_gpspm-client.fio [new file with mode: 0644]
examples/librpma_gpspm-server.fio [new file with mode: 0644]
filesetup.c
fio.1
gettime.c
init.c
io_u.c
iolog.c
optgroup.c
optgroup.h
options.c
options.h
os/os-linux.h
parse.c
parse.h
server.c
server.h
stat.c
stat.h
t/io_uring.c
t/zbd/test-zbd-support
thread_options.h
zbd.c
zbd.h

index 0aa4a3611c031024f631418fee0fad1ba94d0cae..6651f96edc72ea3295c75cc9f9628eea9e267386 100644 (file)
@@ -30,3 +30,4 @@ doc/output
 /tags
 /TAGS
 /t/zbd/test-zbd-support.log.*
+/t/fuzz/fuzz_parseini
index 81a6355b981b1694bb79c3f073de9e693778e3b4..294860716cb75dc6e5dd099c7bc2cbb41cbae609 100755 (executable)
@@ -1,7 +1,7 @@
 #!/bin/sh
 
 GVF=FIO-VERSION-FILE
-DEF_VER=fio-3.25
+DEF_VER=fio-3.26
 
 LF='
 '
diff --git a/HOWTO b/HOWTO
index 52812cc7de37e7d5d95c34c45e757c8126643709..e6078c5f1e16e1143d4057b9a1e03bad21954d1f 100644 (file)
--- a/HOWTO
+++ b/HOWTO
@@ -1146,11 +1146,31 @@ I/O type
        behaves in a similar fashion, except it sends the same offset 8 times
        before generating a new offset.
 
-.. option:: unified_rw_reporting=bool
+.. option:: unified_rw_reporting=str
 
        Fio normally reports statistics on a per data direction basis, meaning that
-       reads, writes, and trims are accounted and reported separately. If this
-       option is set fio sums the results and report them as "mixed" instead.
+       reads, writes, and trims are accounted and reported separately. This option
+       determines whether fio reports the results normally, summed together, or
+       both.
+       Accepted values are:
+
+               **none**
+                       Normal statistics reporting.
+
+               **mixed**
+                       Statistics are summed per data direction and reported together.
+
+               **both**
+                       Statistics are reported normally, followed by the mixed statistics.
+
+               **0**
+                       Backward-compatible alias for **none**.
+
+               **1**
+                       Backward-compatible alias for **mixed**.
+
+               **2**
+                       Alias for **both**.
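+
+       For example, ``unified_rw_reporting=both`` in a job file (or
+       ``--unified_rw_reporting=both`` on the command line) prints the
+       per-direction statistics followed by the mixed summary.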
 
 .. option:: randrepeat=bool
 
@@ -2035,6 +2055,11 @@ I/O engine
                        and 'nrfiles', so that files will be created.
                        This engine is used to measure file lookup and metadata access.
 
+               **filedelete**
+                       Simply deletes files with unlink() and does no I/O to them. You need
+                       to set 'filesize' and 'nrfiles', so that the files will be created.
+                       This engine is used to measure file deletion latency.
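+
+                       An illustrative invocation (parameter values are examples
+                       only)::
+
+                               fio --name=del --ioengine=filedelete --nrfiles=100 --filesize=4k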
+
                **libpmem**
                        Read and write using mmap I/O to a file on a filesystem
                        mounted with DAX on a persistent memory device through the PMDK
@@ -2067,6 +2092,9 @@ I/O engine
                        unless :option:`verify` is set or :option:`cuda_io` is `posix`.
                        :option:`iomem` must not be `cudamalloc`. This ioengine defines
                        engine specific options.
+               **dfs**
+                       I/O engine supporting asynchronous read and write operations to the
+                       DAOS File System (DFS) via libdfs.
 
 I/O engine specific parameters
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -2189,7 +2217,7 @@ with the caveat that when used on the command line, they must come after the
                this will be the starting port number since fio will use a range of
                ports.
 
-   [rdma]
+   [rdma], [librpma_*]
 
                The port to use for RDMA-CM communication. This should be the same value
                on the client and the server side.
@@ -2200,6 +2228,15 @@ with the caveat that when used on the command line, they must come after the
        is a TCP listener or UDP reader, the hostname is not used and must be omitted
        unless it is a valid UDP multicast address.
 
+.. option:: serverip=str : [librpma_*]
+
+       The IP address to be used for RDMA-CM based I/O.
+
+.. option:: direct_write_to_pmem=bool : [librpma_*]
+
+       Set to 1 only when Direct Write to PMem from the remote host is possible.
+       Otherwise, set to 0.
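+
+       A hypothetical way to run the example jobs shipped with this change
+       (the IP address is a placeholder)::
+
+               fio examples/librpma_apm-server.fio --serverip=192.168.0.1
+               fio examples/librpma_apm-client.fio --serverip=192.168.0.1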
+
 .. option:: interface=str : [netsplice] [net]
 
        The IP address of the network interface used to send or receive UDP
@@ -2296,6 +2333,12 @@ with the caveat that when used on the command line, they must come after the
         Poll store instead of waiting for completion. Usually this provides better
        throughput at the cost of higher (up to 100%) CPU utilization.
 
+.. option:: touch_objects=bool : [rados]
+
+        During initialization, touch (create if they do not exist) all objects
+        (files). Touching all objects affects Ceph caches and likely impacts
+        test results. Enabled by default.
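+
+        For example, passing ``touch_objects=0`` skips the initial
+        object-creation pass, avoiding the cache effects described above.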
+
 .. option:: skip_bad=bool : [mtd]
 
        Skip operations against known bad blocks.
@@ -2452,6 +2495,24 @@ with the caveat that when used on the command line, they must come after the
                GPU to RAM before a write and copied from RAM to GPU after a
                read. :option:`verify` does not affect use of cudaMemcpy.
 
+.. option:: pool=str : [dfs]
+
+       Specify the UUID of the DAOS pool to connect to.
+
+.. option:: cont=str : [dfs]
+
+       Specify the UUID of the DAOS container to open.
+
+.. option:: chunk_size=int : [dfs]
+
+       Specify a different chunk size (in bytes) for the dfs file.
+       The DAOS container's chunk size is used by default.
+
+.. option:: object_class=str : [dfs]
+
+       Specify a different object class for the dfs file.
+       The DAOS container's object class is used by default.
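+
+       A sketch of a dfs job fragment (the UUIDs and object class below are
+       placeholders; see examples/dfs.fio for a complete job file)::
+
+               ioengine=dfs
+               pool=<pool uuid>
+               cont=<container uuid>
+               chunk_size=1048576
+               object_class=SX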
+
 I/O depth
 ~~~~~~~~~
 
@@ -2663,11 +2724,12 @@ I/O latency
        true, fio will continue running and try to meet :option:`latency_target`
        by adjusting queue depth.
 
-.. option:: max_latency=time
+.. option:: max_latency=time[,time][,time]
 
        If set, fio will exit the job with an ETIMEDOUT error if it exceeds this
        maximum latency. When the unit is omitted, the value is interpreted in
-       microseconds.
+       microseconds. Comma-separated values may be specified for reads, writes,
+       and trims as described in :option:`blocksize`.
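+
+       For example, ``max_latency=250ms,500ms,500ms`` (illustrative values)
+       makes the job exit with ETIMEDOUT once a read exceeds 250ms or a write
+       or trim exceeds 500ms.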
 
 .. option:: rate_cycle=int
 
index 612344d154093f33d165c81746972dae90eebd43..ba027b2e1a7110ee58591a874a61119507da1bf6 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -51,7 +51,7 @@ SOURCE :=     $(sort $(patsubst $(SRCDIR)/%,%,$(wildcard $(SRCDIR)/crc/*.c)) \
                pshared.c options.c \
                smalloc.c filehash.c profile.c debug.c engines/cpu.c \
                engines/mmap.c engines/sync.c engines/null.c engines/net.c \
-               engines/ftruncate.c engines/filecreate.c engines/filestat.c \
+               engines/ftruncate.c engines/filecreate.c engines/filestat.c engines/filedelete.c \
                server.c client.c iolog.c backend.c libfio.c flow.c cconv.c \
                gettime-thread.c helpers.c json.c idletime.c td_error.c \
                profiles/tiobench.c profiles/act.c io_u_queue.c filelock.c \
@@ -94,6 +94,21 @@ ifdef CONFIG_RDMA
   rdma_LIBS = -libverbs -lrdmacm
   ENGINES += rdma
 endif
+ifdef CONFIG_LIBRPMA_APM
+  librpma_apm_SRCS = engines/librpma_apm.c
+  librpma_fio_SRCS = engines/librpma_fio.c
+  librpma_apm_LIBS = -lrpma -lpmem
+  ENGINES += librpma_apm
+endif
+ifdef CONFIG_LIBRPMA_GPSPM
+  librpma_gpspm_SRCS = engines/librpma_gpspm.c engines/librpma_gpspm_flush.pb-c.c
+  librpma_fio_SRCS = engines/librpma_fio.c
+  librpma_gpspm_LIBS = -lrpma -lpmem -lprotobuf-c
+  ENGINES += librpma_gpspm
+endif
+ifdef librpma_fio_SRCS
+  SOURCE += $(librpma_fio_SRCS)
+endif
 ifdef CONFIG_POSIXAIO
   SOURCE += engines/posixaio.c
 endif
@@ -130,6 +145,11 @@ ifdef CONFIG_HTTP
   http_LIBS = -lcurl -lssl -lcrypto
   ENGINES += http
 endif
+ifdef CONFIG_DFS
+  dfs_SRCS = engines/dfs.c
+  dfs_LIBS = -luuid -ldaos -ldfs
+  ENGINES += dfs
+endif
 SOURCE += oslib/asprintf.c
 ifndef CONFIG_STRSEP
   SOURCE += oslib/strsep.c
index f2efddd67d365dc9c4300892275caaa88796a2e1..399c299e14aa6f518502e02cf94f1def7e713403 100644 (file)
--- a/backend.c
+++ b/backend.c
@@ -1341,22 +1341,19 @@ int init_io_u_buffers(struct thread_data *td)
        return 0;
 }
 
+#ifdef FIO_HAVE_IOSCHED_SWITCH
 /*
- * This function is Linux specific.
+ * These functions are Linux specific.
  * FIO_HAVE_IOSCHED_SWITCH enabled currently means it's Linux.
  */
-static int switch_ioscheduler(struct thread_data *td)
+static int set_ioscheduler(struct thread_data *td, struct fio_file *file)
 {
-#ifdef FIO_HAVE_IOSCHED_SWITCH
        char tmp[256], tmp2[128], *p;
        FILE *f;
        int ret;
 
-       if (td_ioengine_flagged(td, FIO_DISKLESSIO))
-               return 0;
-
-       assert(td->files && td->files[0]);
-       sprintf(tmp, "%s/queue/scheduler", td->files[0]->du->sysfs_root);
+       assert(file->du && file->du->sysfs_root);
+       sprintf(tmp, "%s/queue/scheduler", file->du->sysfs_root);
 
        f = fopen(tmp, "r+");
        if (!f) {
@@ -1417,11 +1414,55 @@ static int switch_ioscheduler(struct thread_data *td)
 
        fclose(f);
        return 0;
+}
+
+static int switch_ioscheduler(struct thread_data *td)
+{
+       struct fio_file *f;
+       unsigned int i;
+       int ret = 0;
+
+       if (td_ioengine_flagged(td, FIO_DISKLESSIO))
+               return 0;
+
+       assert(td->files && td->files[0]);
+
+       for_each_file(td, f, i) {
+
+               /* Only consider regular files and block device files */
+               switch (f->filetype) {
+               case FIO_TYPE_FILE:
+               case FIO_TYPE_BLOCK:
+                       /*
+                        * Make sure that the device hosting the file could
+                        * be determined.
+                        */
+                       if (!f->du)
+                               continue;
+                       break;
+               case FIO_TYPE_CHAR:
+               case FIO_TYPE_PIPE:
+               default:
+                       continue;
+               }
+
+               ret = set_ioscheduler(td, f);
+               if (ret)
+                       return ret;
+       }
+
+       return 0;
+}
+
 #else
+
+static int switch_ioscheduler(struct thread_data *td)
+{
        return 0;
-#endif
 }
 
+#endif /* FIO_HAVE_IOSCHED_SWITCH */
+
 static bool keep_running(struct thread_data *td)
 {
        unsigned long long limit;
@@ -2537,6 +2578,7 @@ int fio_backend(struct sk_out *sk_out)
        for_each_td(td, i) {
                steadystate_free(td);
                fio_options_free(td);
+               fio_dump_options_free(td);
                if (td->rusage_sem) {
                        fio_sem_remove(td->rusage_sem);
                        td->rusage_sem = NULL;
diff --git a/cconv.c b/cconv.c
index b10868fb3de6b2dd0844799ab36fd145ff68448e..aa06e3ea6ee7004cc4b65d99839a83f492766d9c 100644 (file)
--- a/cconv.c
+++ b/cconv.c
@@ -143,6 +143,8 @@ void convert_thread_options_to_cpu(struct thread_options *o,
                o->rate_iops_min[i] = le32_to_cpu(top->rate_iops_min[i]);
 
                o->perc_rand[i] = le32_to_cpu(top->perc_rand[i]);
+
+               o->max_latency[i] = le64_to_cpu(top->max_latency[i]);
        }
 
        o->ratecycle = le32_to_cpu(top->ratecycle);
@@ -289,7 +291,6 @@ void convert_thread_options_to_cpu(struct thread_options *o,
        o->sync_file_range = le32_to_cpu(top->sync_file_range);
        o->latency_target = le64_to_cpu(top->latency_target);
        o->latency_window = le64_to_cpu(top->latency_window);
-       o->max_latency = le64_to_cpu(top->max_latency);
        o->latency_percentile.u.f = fio_uint64_to_double(le64_to_cpu(top->latency_percentile.u.i));
        o->latency_run = le32_to_cpu(top->latency_run);
        o->compress_percentage = le32_to_cpu(top->compress_percentage);
@@ -491,7 +492,6 @@ void convert_thread_options_to_net(struct thread_options_pack *top,
        top->sync_file_range = cpu_to_le32(o->sync_file_range);
        top->latency_target = __cpu_to_le64(o->latency_target);
        top->latency_window = __cpu_to_le64(o->latency_window);
-       top->max_latency = __cpu_to_le64(o->max_latency);
        top->latency_percentile.u.i = __cpu_to_le64(fio_double_to_uint64(o->latency_percentile.u.f));
        top->latency_run = __cpu_to_le32(o->latency_run);
        top->compress_percentage = cpu_to_le32(o->compress_percentage);
@@ -550,6 +550,8 @@ void convert_thread_options_to_net(struct thread_options_pack *top,
                top->rate_iops_min[i] = cpu_to_le32(o->rate_iops_min[i]);
 
                top->perc_rand[i] = cpu_to_le32(o->perc_rand[i]);
+
+               top->max_latency[i] = __cpu_to_le64(o->max_latency[i]);
        }
 
        memcpy(top->verify_pattern, o->verify_pattern, MAX_PATTERN_SIZE);
diff --git a/ci/travis-install-librpma.sh b/ci/travis-install-librpma.sh
new file mode 100755 (executable)
index 0000000..b127f3f
--- /dev/null
@@ -0,0 +1,22 @@
+#!/bin/bash -e
+
+# 11.02.2021 Merge pull request #866 from ldorau/rpma-mmap-memory-for-rpma_mr_reg-in-rpma_flush_apm_new
+LIBRPMA_VERSION=fbac593917e98f3f26abf14f4fad5a832b330f5c
+ZIP_FILE=rpma.zip
+
+WORKDIR=$(pwd)
+
+# install librpma
+wget -O $ZIP_FILE https://github.com/pmem/rpma/archive/${LIBRPMA_VERSION}.zip
+unzip $ZIP_FILE
+mkdir -p rpma-${LIBRPMA_VERSION}/build
+cd rpma-${LIBRPMA_VERSION}/build
+cmake .. -DCMAKE_BUILD_TYPE=Release \
+       -DCMAKE_INSTALL_PREFIX=/usr \
+       -DBUILD_DOC=OFF \
+       -DBUILD_EXAMPLES=OFF \
+       -DBUILD_TESTS=OFF
+make -j$(nproc)
+sudo make -j$(nproc) install
+cd $WORKDIR
+rm -rf $ZIP_FILE rpma-${LIBRPMA_VERSION}
diff --git a/ci/travis-install-pmdk.sh b/ci/travis-install-pmdk.sh
new file mode 100755 (executable)
index 0000000..803438f
--- /dev/null
@@ -0,0 +1,28 @@
+#!/bin/bash -e
+
+# pmdk v1.9.1 release
+PMDK_VERSION=1.9.1
+
+WORKDIR=$(pwd)
+
+#
+# The '/bin/sh' shell used by PMDK's 'make install'
+# does not know the exact location of clang
+# and fails with:
+#    /bin/sh: 1: clang: not found
+# if CC is not set to the full path of clang.
+#
+export CC=$(which $CC)
+
+# Install PMDK libraries, because PMDK's libpmem
+# is a dependency of the librpma fio engine.
+# Install it from a release package
+# with already generated documentation,
+# in order to avoid installing 'pandoc'.
+wget https://github.com/pmem/pmdk/releases/download/${PMDK_VERSION}/pmdk-${PMDK_VERSION}.tar.gz
+tar -xzf pmdk-${PMDK_VERSION}.tar.gz
+cd pmdk-${PMDK_VERSION}
+make -j$(nproc) NDCTL_ENABLE=n
+sudo make -j$(nproc) install prefix=/usr NDCTL_ENABLE=n
+cd $WORKDIR
+rm -rf pmdk-${PMDK_VERSION}
index 103695dc6d6391f132d90681a6be4e8009b76f87..4c4c04c5d6ae4a781c52270a0f62fcbc337951e5 100755 (executable)
@@ -43,6 +43,16 @@ case "$TRAVIS_OS_NAME" in
        )
        sudo apt-get -qq update
        sudo apt-get install --no-install-recommends -qq -y "${pkgs[@]}"
+       # librpma is supported on the amd64 (x86_64) architecture for now
+       if [[ $CI_TARGET_ARCH == "amd64" ]]; then
+               # install libprotobuf-c-dev required by librpma_gpspm
+               sudo apt-get install --no-install-recommends -qq -y libprotobuf-c-dev
+               # PMDK libraries have to be installed, because
+               # libpmem is a dependency of the librpma fio engine
+               ci/travis-install-pmdk.sh
+               # install librpma from sources from GitHub
+               ci/travis-install-librpma.sh
+       fi
        ;;
     "osx")
        brew update >/dev/null 2>&1
index 748f7014c63541b26ef66dd8b72fcd1a8a27f777..a7d82be06b3c886550a77c34636c539895bc6b16 100755 (executable)
--- a/configure
+++ b/configure
@@ -171,6 +171,7 @@ march_set="no"
 libiscsi="no"
 libnbd="no"
 libzbc=""
+dfs=""
 dynamic_engines="no"
 prefix=/usr/local
 
@@ -242,6 +243,8 @@ for opt do
   ;;
   --dynamic-libengines) dynamic_engines="yes"
   ;;
+  --disable-dfs) dfs="no"
+  ;;
   --help)
     show_help="yes"
     ;;
@@ -284,6 +287,7 @@ if test "$show_help" = "yes" ; then
   echo "--disable-libzbc        Disable libzbc even if found"
   echo "--disable-tcmalloc     Disable tcmalloc support"
   echo "--dynamic-libengines   Lib-based ioengines as dynamic libraries"
+  echo "--disable-dfs          Disable DAOS File System support even if found"
   exit $exit_val
 fi
 
@@ -413,6 +417,8 @@ CYGWIN*)
   clock_gettime="yes" # clock_monotonic probe has dependency on this
   clock_monotonic="yes"
   sched_idle="yes"
+  pthread_condattr_setclock="no"
+  pthread_getaffinity="no"
   ;;
 esac
 
@@ -758,10 +764,8 @@ print_config "POSIX pshared support" "$posix_pshared"
 
 ##########################################
 # POSIX pthread_condattr_setclock() probe
-if test "$pthread_condattr_setclock" != "yes" ; then
-  pthread_condattr_setclock="no"
-fi
-cat > $TMPC <<EOF
+if test "$pthread_condattr_setclock" != "no" ; then
+  cat > $TMPC <<EOF
 #include <pthread.h>
 int main(void)
 {
@@ -770,11 +774,12 @@ int main(void)
   return 0;
 }
 EOF
-if compile_prog "" "$LIBS" "pthread_condattr_setclock" ; then
-  pthread_condattr_setclock=yes
-elif compile_prog "" "$LIBS -lpthread" "pthread_condattr_setclock" ; then
-  pthread_condattr_setclock=yes
-  LIBS="$LIBS -lpthread"
+  if compile_prog "" "$LIBS" "pthread_condattr_setclock" ; then
+    pthread_condattr_setclock=yes
+  elif compile_prog "" "$LIBS -lpthread" "pthread_condattr_setclock" ; then
+    pthread_condattr_setclock=yes
+    LIBS="$LIBS -lpthread"
+  fi
 fi
 print_config "pthread_condattr_setclock()" "$pthread_condattr_setclock"
 
@@ -799,6 +804,29 @@ elif compile_prog "" "$LIBS -lpthread" "pthread_sigmask" ; then
 fi
 print_config "pthread_sigmask()" "$pthread_sigmask"
 
+##########################################
+# pthread_getaffinity_np() probe
+if test "$pthread_getaffinity" != "yes" ; then
+  pthread_getaffinity="no"
+fi
+cat > $TMPC <<EOF
+#include <stddef.h> /* NULL */
+#include <signal.h>
+#include <pthread.h>
+int main(void)
+{
+  cpu_set_t set;
+  return pthread_getaffinity_np(pthread_self(), sizeof(set), &set);
+}
+EOF
+if compile_prog "" "$LIBS" "pthread_getaffinity" ; then
+  pthread_getaffinity="yes"
+elif compile_prog "" "$LIBS -lpthread" "pthread_getaffinity" ; then
+  pthread_getaffinity="yes"
+  LIBS="$LIBS -lpthread"
+fi
+print_config "pthread_getaffinity_np()" "$pthread_getaffinity"
+
 ##########################################
 # solaris aio probe
 if test "$solaris_aio" != "yes" ; then
@@ -920,6 +948,49 @@ if test "$disable_rdma" != "yes" && compile_prog "" "-lrdmacm" "rdma"; then
 fi
 print_config "rdmacm" "$rdmacm"
 
+##########################################
+# librpma probe
+if test "$librpma" != "yes" ; then
+  librpma="no"
+fi
+cat > $TMPC << EOF
+#include <stdio.h>
+#include <librpma.h>
+int main(int argc, char **argv)
+{
+  enum rpma_conn_event event = RPMA_CONN_REJECTED;
+  (void) event; /* unused */
+  rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
+  return 0;
+}
+EOF
+if test "$disable_rdma" != "yes" && compile_prog "" "-lrpma" "rpma"; then
+    librpma="yes"
+fi
+print_config "librpma" "$librpma"
+
+##########################################
+# libprotobuf-c probe
+if test "$libprotobuf_c" != "yes" ; then
+  libprotobuf_c="no"
+fi
+cat > $TMPC << EOF
+#include <stdio.h>
+#include <protobuf-c/protobuf-c.h>
+#if !defined(PROTOBUF_C_VERSION_NUMBER)
+# error PROTOBUF_C_VERSION_NUMBER is not defined!
+#endif
+int main(int argc, char **argv)
+{
+  (void)protobuf_c_message_check(NULL);
+  return 0;
+}
+EOF
+if compile_prog "" "-lprotobuf-c" "protobuf_c"; then
+    libprotobuf_c="yes"
+fi
+print_config "libprotobuf_c" "$libprotobuf_c"
+
 ##########################################
 # asprintf() and vasprintf() probes
 if test "$have_asprintf" != "yes" ; then
@@ -2179,6 +2250,33 @@ if test "$libnbd" != "no" ; then
 fi
 print_config "NBD engine" "$libnbd"
 
+##########################################
+# check for dfs (DAOS File System)
+if test "$dfs" != "no" ; then
+  cat > $TMPC << EOF
+#include <fcntl.h>
+#include <daos.h>
+#include <daos_fs.h>
+
+int main(int argc, char **argv)
+{
+  daos_handle_t        poh;
+  daos_handle_t        coh;
+  dfs_t                *dfs;
+
+  (void) dfs_mount(poh, coh, O_RDWR, &dfs);
+
+  return 0;
+}
+EOF
+  if compile_prog "" "-luuid -ldfs -ldaos" "dfs"; then
+    dfs="yes"
+  else
+    dfs="no"
+  fi
+fi
+print_config "DAOS File System (dfs) Engine" "$dfs"
+
 ##########################################
 # Check if we have lex/yacc available
 yacc="no"
@@ -2749,6 +2847,9 @@ fi
 if test "$pthread_sigmask" = "yes" ; then
   output_sym "CONFIG_PTHREAD_SIGMASK"
 fi
+if test "$pthread_getaffinity" = "yes" ; then
+  output_sym "CONFIG_PTHREAD_GETAFFINITY"
+fi
 if test "$have_asprintf" = "yes" ; then
     output_sym "CONFIG_HAVE_ASPRINTF"
 fi
@@ -2788,18 +2889,21 @@ fi
 if test "$libverbs" = "yes" -a "$rdmacm" = "yes" ; then
   output_sym "CONFIG_RDMA"
 fi
+# librpma is supported on the 'x86_64' architecture for now
+if test "$cpu" = "x86_64" -a "$libverbs" = "yes" -a "$rdmacm" = "yes" \
+    -a "$librpma" = "yes" -a "$libpmem" = "yes" ; then
+  output_sym "CONFIG_LIBRPMA_APM"
+fi
+if test "$cpu" = "x86_64" -a "$libverbs" = "yes" -a "$rdmacm" = "yes" \
+    -a "$librpma" = "yes" -a "$libpmem" = "yes" -a "$libprotobuf_c" = "yes" ; then
+  output_sym "CONFIG_LIBRPMA_GPSPM"
+fi
 if test "$clock_gettime" = "yes" ; then
   output_sym "CONFIG_CLOCK_GETTIME"
 fi
 if test "$clock_monotonic" = "yes" ; then
   output_sym "CONFIG_CLOCK_MONOTONIC"
 fi
-if test "$clock_monotonic_raw" = "yes" ; then
-  output_sym "CONFIG_CLOCK_MONOTONIC_RAW"
-fi
-if test "$clock_monotonic_precise" = "yes" ; then
-  output_sym "CONFIG_CLOCK_MONOTONIC_PRECISE"
-fi
 if test "$clockid_t" = "yes"; then
   output_sym "CONFIG_CLOCKID_T"
 fi
@@ -2994,6 +3098,9 @@ fi
 if test "$libcufile" = "yes" ; then
   output_sym "CONFIG_LIBCUFILE"
 fi
+if test "$dfs" = "yes" ; then
+  output_sym "CONFIG_DFS"
+fi
 if test "$march_set" = "no" && test "$build_native" = "yes" ; then
   output_sym "CONFIG_BUILD_NATIVE"
 fi
diff --git a/engines/dfs.c b/engines/dfs.c
new file mode 100644 (file)
index 0000000..0343b10
--- /dev/null
@@ -0,0 +1,583 @@
+/**
+ * FIO engine for DAOS File System (dfs).
+ *
+ * (C) Copyright 2020-2021 Intel Corporation.
+ */
+
+#include <fio.h>
+#include <optgroup.h>
+
+#include <daos.h>
+#include <daos_fs.h>
+
+static bool            daos_initialized;
+static int             num_threads;
+static pthread_mutex_t daos_mutex = PTHREAD_MUTEX_INITIALIZER;
+daos_handle_t          poh;  /* pool handle */
+daos_handle_t          coh;  /* container handle */
+daos_oclass_id_t       cid = OC_UNKNOWN;  /* object class */
+dfs_t                  *dfs; /* dfs mount reference */
+
+struct daos_iou {
+       struct io_u     *io_u;
+       daos_event_t    ev;
+       d_sg_list_t     sgl;
+       d_iov_t         iov;
+       daos_size_t     size;
+       bool            complete;
+};
+
+struct daos_data {
+       daos_handle_t   eqh;
+       dfs_obj_t       *obj;
+       struct io_u     **io_us;
+       int             queued;
+       int             num_ios;
+};
+
+struct daos_fio_options {
+       void            *pad;
+       char            *pool;   /* Pool UUID */
+       char            *cont;   /* Container UUID */
+       daos_size_t     chsz;    /* Chunk size */
+       char            *oclass; /* object class */
+#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
+       char            *svcl;   /* service replica list, deprecated */
+#endif
+};
+
+static struct fio_option options[] = {
+       {
+               .name           = "pool",
+               .lname          = "pool uuid",
+               .type           = FIO_OPT_STR_STORE,
+               .off1           = offsetof(struct daos_fio_options, pool),
+               .help           = "DAOS pool uuid",
+               .category       = FIO_OPT_C_ENGINE,
+               .group          = FIO_OPT_G_DFS,
+       },
+       {
+               .name           = "cont",
+               .lname          = "container uuid",
+               .type           = FIO_OPT_STR_STORE,
+               .off1           = offsetof(struct daos_fio_options, cont),
+               .help           = "DAOS container uuid",
+               .category       = FIO_OPT_C_ENGINE,
+               .group          = FIO_OPT_G_DFS,
+       },
+       {
+               .name           = "chunk_size",
+               .lname          = "DFS chunk size",
+               .type           = FIO_OPT_ULL,
+               .off1           = offsetof(struct daos_fio_options, chsz),
+               .help           = "DFS chunk size in bytes",
+               .def            = "0", /* use container default */
+               .category       = FIO_OPT_C_ENGINE,
+               .group          = FIO_OPT_G_DFS,
+       },
+       {
+               .name           = "object_class",
+               .lname          = "object class",
+               .type           = FIO_OPT_STR_STORE,
+               .off1           = offsetof(struct daos_fio_options, oclass),
+               .help           = "DAOS object class",
+               .category       = FIO_OPT_C_ENGINE,
+               .group          = FIO_OPT_G_DFS,
+       },
+#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
+       {
+               .name           = "svcl",
+               .lname          = "List of service ranks",
+               .type           = FIO_OPT_STR_STORE,
+               .off1           = offsetof(struct daos_fio_options, svcl),
+               .help           = "List of pool replicated service ranks",
+               .category       = FIO_OPT_C_ENGINE,
+               .group          = FIO_OPT_G_DFS,
+       },
+#endif
+       {
+               .name           = NULL,
+       },
+};
+
+static int daos_fio_global_init(struct thread_data *td)
+{
+       struct daos_fio_options *eo = td->eo;
+       uuid_t                  pool_uuid, co_uuid;
+       daos_pool_info_t        pool_info;
+       daos_cont_info_t        co_info;
+       int                     rc = 0;
+
+#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
+       if (!eo->pool || !eo->cont || !eo->svcl) {
+#else
+       if (!eo->pool || !eo->cont) {
+#endif
+               log_err("Missing required DAOS options\n");
+               return EINVAL;
+       }
+
+       rc = daos_init();
+       if (rc != -DER_ALREADY && rc) {
+               log_err("Failed to initialize daos %d\n", rc);
+               td_verror(td, rc, "daos_init");
+               return rc;
+       }
+
+       rc = uuid_parse(eo->pool, pool_uuid);
+       if (rc) {
+               log_err("Failed to parse 'Pool uuid': %s\n", eo->pool);
+               td_verror(td, EINVAL, "uuid_parse(eo->pool)");
+               return EINVAL;
+       }
+
+       rc = uuid_parse(eo->cont, co_uuid);
+       if (rc) {
+               log_err("Failed to parse 'Cont uuid': %s\n", eo->cont);
+               td_verror(td, EINVAL, "uuid_parse(eo->cont)");
+               return EINVAL;
+       }
+
+       /* Connect to the DAOS pool */
+#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
+       d_rank_list_t *svcl = NULL;
+
+       svcl = daos_rank_list_parse(eo->svcl, ":");
+       if (svcl == NULL) {
+               log_err("Failed to parse svcl\n");
+               td_verror(td, EINVAL, "daos_rank_list_parse");
+               return EINVAL;
+       }
+
+       rc = daos_pool_connect(pool_uuid, NULL, svcl, DAOS_PC_RW,
+                       &poh, &pool_info, NULL);
+       d_rank_list_free(svcl);
+#else
+       rc = daos_pool_connect(pool_uuid, NULL, DAOS_PC_RW, &poh, &pool_info,
+                              NULL);
+#endif
+       if (rc) {
+               log_err("Failed to connect to pool %d\n", rc);
+               td_verror(td, rc, "daos_pool_connect");
+               return rc;
+       }
+
+       /* Open the DAOS container */
+       rc = daos_cont_open(poh, co_uuid, DAOS_COO_RW, &coh, &co_info, NULL);
+       if (rc) {
+               log_err("Failed to open container: %d\n", rc);
+               td_verror(td, rc, "daos_cont_open");
+               (void)daos_pool_disconnect(poh, NULL);
+               return rc;
+       }
+
+       /* Mount encapsulated filesystem */
+       rc = dfs_mount(poh, coh, O_RDWR, &dfs);
+       if (rc) {
+               log_err("Failed to mount DFS namespace: %d\n", rc);
+               td_verror(td, rc, "dfs_mount");
+               (void)daos_pool_disconnect(poh, NULL);
+               (void)daos_cont_close(coh, NULL);
+               return rc;
+       }
+
+       /* Retrieve object class to use, if specified */
+       if (eo->oclass)
+               cid = daos_oclass_name2id(eo->oclass);
+
+       return 0;
+}
+
+static int daos_fio_global_cleanup(void)
+{
+       int rc;
+       int ret = 0;
+
+       rc = dfs_umount(dfs);
+       if (rc) {
+               log_err("failed to umount dfs: %d\n", rc);
+               ret = rc;
+       }
+       rc = daos_cont_close(coh, NULL);
+       if (rc) {
+               log_err("failed to close container: %d\n", rc);
+               if (ret == 0)
+                       ret = rc;
+       }
+       rc = daos_pool_disconnect(poh, NULL);
+       if (rc) {
+               log_err("failed to disconnect pool: %d\n", rc);
+               if (ret == 0)
+                       ret = rc;
+       }
+       rc = daos_fini();
+       if (rc) {
+               log_err("failed to finalize daos: %d\n", rc);
+               if (ret == 0)
+                       ret = rc;
+       }
+
+       return ret;
+}
+
+static int daos_fio_setup(struct thread_data *td)
+{
+       return 0;
+}
+
+static int daos_fio_init(struct thread_data *td)
+{
+       struct daos_data        *dd;
+       int                     rc = 0;
+
+       pthread_mutex_lock(&daos_mutex);
+
+       dd = malloc(sizeof(*dd));
+       if (dd == NULL) {
+               log_err("Failed to allocate DAOS-private data\n");
+               rc = ENOMEM;
+               goto out;
+       }
+
+       dd->queued      = 0;
+       dd->num_ios     = td->o.iodepth;
+       dd->io_us       = calloc(dd->num_ios, sizeof(struct io_u *));
+       if (dd->io_us == NULL) {
+               log_err("Failed to allocate IO queue\n");
+               rc = ENOMEM;
+               goto out;
+       }
+
+       /* initialize DAOS stack if not already up */
+       if (!daos_initialized) {
+               rc = daos_fio_global_init(td);
+               if (rc)
+                       goto out;
+               daos_initialized = true;
+       }
+
+       rc = daos_eq_create(&dd->eqh);
+       if (rc) {
+               log_err("Failed to create event queue: %d\n", rc);
+               td_verror(td, rc, "daos_eq_create");
+               goto out;
+       }
+
+       td->io_ops_data = dd;
+       num_threads++;
+out:
+       if (rc) {
+               if (dd) {
+                       free(dd->io_us);
+                       free(dd);
+               }
+               if (num_threads == 0 && daos_initialized) {
+                       /* don't clobber error return value */
+                       (void)daos_fio_global_cleanup();
+                       daos_initialized = false;
+               }
+       }
+       pthread_mutex_unlock(&daos_mutex);
+       return rc;
+}
+
+static void daos_fio_cleanup(struct thread_data *td)
+{
+       struct daos_data        *dd = td->io_ops_data;
+       int                     rc;
+
+       if (dd == NULL)
+               return;
+
+       rc = daos_eq_destroy(dd->eqh, DAOS_EQ_DESTROY_FORCE);
+       if (rc < 0) {
+               log_err("failed to destroy event queue: %d\n", rc);
+               td_verror(td, rc, "daos_eq_destroy");
+       }
+
+       free(dd->io_us);
+       free(dd);
+
+       pthread_mutex_lock(&daos_mutex);
+       num_threads--;
+       if (daos_initialized && num_threads == 0) {
+               int ret;
+
+               ret = daos_fio_global_cleanup();
+               if (ret < 0 && rc == 0) {
+                       log_err("failed to clean up: %d\n", ret);
+                       td_verror(td, ret, "daos_fio_global_cleanup");
+               }
+               daos_initialized = false;
+       }
+       pthread_mutex_unlock(&daos_mutex);
+}
+
+static int daos_fio_get_file_size(struct thread_data *td, struct fio_file *f)
+{
+       char            *file_name = f->file_name;
+       struct stat     stbuf = {0};
+       int             rc;
+
+       dprint(FD_FILE, "dfs stat %s\n", f->file_name);
+
+       if (!daos_initialized)
+               return 0;
+
+       rc = dfs_stat(dfs, NULL, file_name, &stbuf);
+       if (rc) {
+               log_err("Failed to stat %s: %d\n", f->file_name, rc);
+               td_verror(td, rc, "dfs_stat");
+               return rc;
+       }
+
+       f->real_file_size = stbuf.st_size;
+       return 0;
+}
+
+static int daos_fio_close(struct thread_data *td, struct fio_file *f)
+{
+       struct daos_data        *dd = td->io_ops_data;
+       int                     rc;
+
+       dprint(FD_FILE, "dfs release %s\n", f->file_name);
+
+       rc = dfs_release(dd->obj);
+       if (rc) {
+               log_err("Failed to release %s: %d\n", f->file_name, rc);
+               td_verror(td, rc, "dfs_release");
+               return rc;
+       }
+
+       return 0;
+}
+
+static int daos_fio_open(struct thread_data *td, struct fio_file *f)
+{
+       struct daos_data        *dd = td->io_ops_data;
+       struct daos_fio_options *eo = td->eo;
+       int                     flags = 0;
+       int                     rc;
+
+       dprint(FD_FILE, "dfs open %s (%s/%d/%d)\n",
+              f->file_name, td_write(td) && !read_only ? "rw" : "r",
+              td->o.create_on_open, td->o.allow_create);
+
+       if (td->o.create_on_open && td->o.allow_create)
+               flags |= O_CREAT;
+
+       if (td_write(td)) {
+               if (!read_only)
+                       flags |= O_RDWR;
+               if (td->o.allow_create)
+                       flags |= O_CREAT;
+       } else if (td_read(td)) {
+               flags |= O_RDONLY;
+       }
+
+       rc = dfs_open(dfs, NULL, f->file_name,
+                     S_IFREG | S_IRUSR | S_IWUSR,
+                     flags, cid, eo->chsz, NULL, &dd->obj);
+       if (rc) {
+               log_err("Failed to open %s: %d\n", f->file_name, rc);
+               td_verror(td, rc, "dfs_open");
+               return rc;
+       }
+
+       return 0;
+}
+
+static int daos_fio_unlink(struct thread_data *td, struct fio_file *f)
+{
+       int rc;
+
+       dprint(FD_FILE, "dfs remove %s\n", f->file_name);
+
+       rc = dfs_remove(dfs, NULL, f->file_name, false, NULL);
+       if (rc) {
+               log_err("Failed to remove %s: %d\n", f->file_name, rc);
+               td_verror(td, rc, "dfs_remove");
+               return rc;
+       }
+
+       return 0;
+}
+
+static int daos_fio_invalidate(struct thread_data *td, struct fio_file *f)
+{
+       dprint(FD_FILE, "dfs invalidate %s\n", f->file_name);
+       return 0;
+}
+
+static void daos_fio_io_u_free(struct thread_data *td, struct io_u *io_u)
+{
+       struct daos_iou *io = io_u->engine_data;
+
+       if (io) {
+               io_u->engine_data = NULL;
+               free(io);
+       }
+}
+
+static int daos_fio_io_u_init(struct thread_data *td, struct io_u *io_u)
+{
+       struct daos_iou *io;
+
+       io = malloc(sizeof(struct daos_iou));
+       if (!io) {
+               td_verror(td, ENOMEM, "malloc");
+               return ENOMEM;
+       }
+       io->io_u = io_u;
+       io_u->engine_data = io;
+       return 0;
+}
+
+static struct io_u * daos_fio_event(struct thread_data *td, int event)
+{
+       struct daos_data *dd = td->io_ops_data;
+
+       return dd->io_us[event];
+}
+
+static int daos_fio_getevents(struct thread_data *td, unsigned int min,
+                             unsigned int max, const struct timespec *t)
+{
+       struct daos_data        *dd = td->io_ops_data;
+       daos_event_t            *evp[max];
+       unsigned int            events = 0;
+       int                     i;
+       int                     rc;
+
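+       /* poll the event queue until at least "min" I/Os have completed */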
+       while (events < min) {
+               rc = daos_eq_poll(dd->eqh, 0, DAOS_EQ_NOWAIT, max, evp);
+               if (rc < 0) {
+                       log_err("Event poll failed: %d\n", rc);
+                       td_verror(td, rc, "daos_eq_poll");
+                       return events;
+               }
+
+               for (i = 0; i < rc; i++) {
+                       struct daos_iou *io;
+                       struct io_u     *io_u;
+
+                       io = container_of(evp[i], struct daos_iou, ev);
+                       if (io->complete)
+                               log_err("Completion on already completed I/O\n");
+
+                       io_u = io->io_u;
+                       if (io->ev.ev_error)
+                               io_u->error = io->ev.ev_error;
+                       else
+                               io_u->resid = 0;
+
+                       dd->io_us[events] = io_u;
+                       dd->queued--;
+                       daos_event_fini(&io->ev);
+                       io->complete = true;
+                       events++;
+               }
+       }
+
+       dprint(FD_IO, "dfs eq_poll returning %d (%u/%u)\n", events, min, max);
+
+       return events;
+}
+
+static enum fio_q_status daos_fio_queue(struct thread_data *td,
+                                       struct io_u *io_u)
+{
+       struct daos_data        *dd = td->io_ops_data;
+       struct daos_iou         *io = io_u->engine_data;
+       daos_off_t              offset = io_u->offset;
+       int                     rc;
+
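+       /* the engine can have at most iodepth I/Os in flight */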
+       if (dd->queued == td->o.iodepth)
+               return FIO_Q_BUSY;
+
+       io->sgl.sg_nr = 1;
+       io->sgl.sg_nr_out = 0;
+       d_iov_set(&io->iov, io_u->xfer_buf, io_u->xfer_buflen);
+       io->sgl.sg_iovs = &io->iov;
+       io->size = io_u->xfer_buflen;
+
+       io->complete = false;
+       rc = daos_event_init(&io->ev, dd->eqh, NULL);
+       if (rc) {
+               log_err("Event init failed: %d\n", rc);
+               io_u->error = rc;
+               return FIO_Q_COMPLETED;
+       }
+
+       switch (io_u->ddir) {
+       case DDIR_WRITE:
+               rc = dfs_write(dfs, dd->obj, &io->sgl, offset, &io->ev);
+               if (rc) {
+                       log_err("dfs_write failed: %d\n", rc);
+                       io_u->error = rc;
+                       return FIO_Q_COMPLETED;
+               }
+               break;
+       case DDIR_READ:
+               rc = dfs_read(dfs, dd->obj, &io->sgl, offset, &io->size,
+                             &io->ev);
+               if (rc) {
+                       log_err("dfs_read failed: %d\n", rc);
+                       io_u->error = rc;
+                       return FIO_Q_COMPLETED;
+               }
+               break;
+       case DDIR_SYNC:
+               io_u->error = 0;
+               return FIO_Q_COMPLETED;
+       default:
+               dprint(FD_IO, "Invalid IO type: %d\n", io_u->ddir);
+               io_u->error = -DER_INVAL;
+               return FIO_Q_COMPLETED;
+       }
+
+       dd->queued++;
+       return FIO_Q_QUEUED;
+}
+
+static int daos_fio_prep(struct thread_data fio_unused *td, struct io_u *io_u)
+{
+       return 0;
+}
+
+/* ioengine_ops for get_ioengine() */
+FIO_STATIC struct ioengine_ops ioengine = {
+       .name                   = "dfs",
+       .version                = FIO_IOOPS_VERSION,
+       .flags                  = FIO_DISKLESSIO | FIO_NODISKUTIL,
+
+       .setup                  = daos_fio_setup,
+       .init                   = daos_fio_init,
+       .prep                   = daos_fio_prep,
+       .cleanup                = daos_fio_cleanup,
+
+       .open_file              = daos_fio_open,
+       .invalidate             = daos_fio_invalidate,
+       .get_file_size          = daos_fio_get_file_size,
+       .close_file             = daos_fio_close,
+       .unlink_file            = daos_fio_unlink,
+
+       .queue                  = daos_fio_queue,
+       .getevents              = daos_fio_getevents,
+       .event                  = daos_fio_event,
+       .io_u_init              = daos_fio_io_u_init,
+       .io_u_free              = daos_fio_io_u_free,
+
+       .option_struct_size     = sizeof(struct daos_fio_options),
+       .options                = options,
+};
+
+static void fio_init fio_dfs_register(void)
+{
+       register_ioengine(&ioengine);
+}
+
+static void fio_exit fio_dfs_unregister(void)
+{
+       unregister_ioengine(&ioengine);
+}
index 6382569b9a92c42c1ad5d76a75d9441808de6cc8..4b05ed68fb467263ddbebabe7e79dd5231f92b63 100644 (file)
@@ -25,8 +25,8 @@ static int open_file(struct thread_data *td, struct fio_file *f)
 
        dprint(FD_FILE, "fd open %s\n", f->file_name);
 
-       if (f->filetype != FIO_TYPE_FILE) {
-               log_err("fio: only files are supported fallocate \n");
+       if (f->filetype != FIO_TYPE_FILE && f->filetype != FIO_TYPE_BLOCK) {
+               log_err("fio: fallocate only supports files and block devices\n");
                return 1;
        }
        if (!strcmp(f->file_name, "-")) {
index 5fec8544a6a79898fed4e6c4854ffd416234c66a..16c64928162654ad76c6697958b846eb5bb0d985 100644 (file)
@@ -22,7 +22,7 @@ static int open_file(struct thread_data *td, struct fio_file *f)
        dprint(FD_FILE, "fd open %s\n", f->file_name);
 
        if (f->filetype != FIO_TYPE_FILE) {
-               log_err("fio: only files are supported fallocate \n");
+               log_err("fio: only files are supported\n");
                return 1;
        }
        if (!strcmp(f->file_name, "-")) {
diff --git a/engines/filedelete.c b/engines/filedelete.c
new file mode 100644 (file)
index 0000000..64c5863
--- /dev/null
@@ -0,0 +1,115 @@
+/*
+ * file delete engine
+ *
+ * IO engine that doesn't do any IO, just delete files and track the latency
+ * of the file deletion.
+ */
+#include <stdio.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include "../fio.h"
+
+struct fc_data {
+       enum fio_ddir stat_ddir;
+};
+
+static int delete_file(struct thread_data *td, struct fio_file *f)
+{
+       struct timespec start;
+       int do_lat = !td->o.disable_lat;
+       int ret;
+
+       dprint(FD_FILE, "fd delete %s\n", f->file_name);
+
+       if (f->filetype != FIO_TYPE_FILE) {
+               log_err("fio: only files are supported\n");
+               return 1;
+       }
+       if (!strcmp(f->file_name, "-")) {
+               log_err("fio: can't read/write to stdin/out\n");
+               return 1;
+       }
+
+       if (do_lat)
+               fio_gettime(&start, NULL);
+
+       ret = unlink(f->file_name);
+
+       if (ret == -1) {
+               char buf[FIO_VERROR_SIZE];
+               int e = errno;
+
+               snprintf(buf, sizeof(buf), "delete(%s)", f->file_name);
+               td_verror(td, e, buf);
+               return 1;
+       }
+
+       if (do_lat) {
+               struct fc_data *data = td->io_ops_data;
+               uint64_t nsec;
+
+               nsec = ntime_since_now(&start);
+               add_clat_sample(td, data->stat_ddir, nsec, 0, 0, 0);
+       }
+
+       return 0;
+}
+
+static enum fio_q_status queue_io(struct thread_data *td, struct io_u fio_unused *io_u)
+{
+       return FIO_Q_COMPLETED;
+}
+
+static int init(struct thread_data *td)
+{
+       struct fc_data *data;
+
+       data = calloc(1, sizeof(*data));
+
+       if (td_read(td))
+               data->stat_ddir = DDIR_READ;
+       else if (td_write(td))
+               data->stat_ddir = DDIR_WRITE;
+
+       td->io_ops_data = data;
+       return 0;
+}
+
+static int delete_invalidate(struct thread_data *td, struct fio_file *f)
+{
+       /* do nothing: the file was never opened */
+       return 0;
+}
+
+static void cleanup(struct thread_data *td)
+{
+       struct fc_data *data = td->io_ops_data;
+
+       free(data);
+}
+
+static struct ioengine_ops ioengine = {
+       .name           = "filedelete",
+       .version        = FIO_IOOPS_VERSION,
+       .init           = init,
+       .invalidate     = delete_invalidate,
+       .cleanup        = cleanup,
+       .queue          = queue_io,
+       .get_file_size  = generic_get_file_size,
+       .open_file      = delete_file,
+       .flags          = FIO_SYNCIO | FIO_FAKEIO |
+                               FIO_NOSTATS | FIO_NOFILEHASH,
+};
+
+static void fio_init fio_filedelete_register(void)
+{
+       register_ioengine(&ioengine);
+}
+
+static void fio_exit fio_filedelete_unregister(void)
+{
+       unregister_ioengine(&ioengine);
+}
index c9036ba079b808b65f15a7d3ea920424018471ca..b962e8041b6f8d113669b4b2a31224a68d19aa0f 100644 (file)
@@ -696,11 +696,11 @@ static int fio_ioring_post_init(struct thread_data *td)
 
        err = fio_ioring_queue_init(td);
        if (err) {
-               int __errno = errno;
+               int init_err = errno;
 
-               if (__errno == ENOSYS)
+               if (init_err == ENOSYS)
                        log_err("fio: your kernel doesn't support io_uring\n");
-               td_verror(td, __errno, "io_queue_init");
+               td_verror(td, init_err, "io_queue_init");
                return 1;
        }
 
diff --git a/engines/librpma_apm.c b/engines/librpma_apm.c
new file mode 100644 (file)
index 0000000..ffa3769
--- /dev/null
@@ -0,0 +1,256 @@
+/*
+ * librpma_apm: IO engine that uses PMDK librpma to read and write data,
+ *             based on Appliance Persistency Method
+ *
+ * Copyright 2020-2021, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#include "librpma_fio.h"
+
+/* client side implementation */
+
+static inline int client_io_flush(struct thread_data *td,
+               struct io_u *first_io_u, struct io_u *last_io_u,
+               unsigned long long int len);
+
+static int client_get_io_u_index(struct rpma_completion *cmpl,
+               unsigned int *io_u_index);
+
+static int client_init(struct thread_data *td)
+{
+       struct librpma_fio_client_data *ccd;
+       unsigned int sq_size;
+       uint32_t cq_size;
+       struct rpma_conn_cfg *cfg = NULL;
+       struct rpma_peer_cfg *pcfg = NULL;
+       int ret;
+
+       /* readwrite = trim / randtrim / trimwrite is not supported */
+       if (td_trim(td)) {
+               td_verror(td, EINVAL, "Not supported mode.");
+               return -1;
+       }
+
+       /*
+        * Calculate the required queue sizes where:
+        * - the send queue (SQ) has to be big enough to accommodate
+        *   all io_us (WRITEs) and all flush requests (FLUSHes)
+        * - the completion queue (CQ) has to be big enough to accommodate all
+        *   success and error completions (cq_size = sq_size)
+        */
+       if (td_random(td) || td_rw(td)) {
+               /*
+                * sq_size = max(rand_read_sq_size, rand_write_sq_size)
+                * where rand_read_sq_size < rand_write_sq_size because read
+                * does not require flush afterwards
+                * rand_write_sq_size = N * (WRITE + FLUSH)
+                *
+                * Note: rw is no different from random write since having
+                * interleaved reads with writes in extreme forces you to flush
+                * as often as when the writes are random.
+                */
+               sq_size = 2 * td->o.iodepth;
+       } else if (td_write(td)) {
+               /* sequential TD_DDIR_WRITE only */
+               if (td->o.sync_io) {
+                       sq_size = 2; /* WRITE + FLUSH */
+               } else {
+                       /*
+                        * N * WRITE + B * FLUSH where:
+                        * - B == ceil(iodepth / iodepth_batch)
+                        *   which is the number of batches for N writes
+                        */
+                       sq_size = td->o.iodepth + LIBRPMA_FIO_CEIL(td->o.iodepth,
+                                       td->o.iodepth_batch);
+               }
+       } else {
+               /* TD_DDIR_READ only */
+               if (td->o.sync_io) {
+                       sq_size = 1; /* READ */
+               } else {
+                       sq_size = td->o.iodepth; /* N x READ */
+               }
+       }
+       cq_size = sq_size;
+
+       /* create a connection configuration object */
+       if ((ret = rpma_conn_cfg_new(&cfg))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_new");
+               return -1;
+       }
+
+       /* apply queue sizes */
+       if ((ret = rpma_conn_cfg_set_sq_size(cfg, sq_size))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
+               goto err_cfg_delete;
+       }
+       if ((ret = rpma_conn_cfg_set_cq_size(cfg, cq_size))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
+               goto err_cfg_delete;
+       }
+
+       if (librpma_fio_client_init(td, cfg))
+               goto err_cfg_delete;
+
+       ccd = td->io_ops_data;
+
+       if (ccd->server_mr_flush_type == RPMA_FLUSH_TYPE_PERSISTENT) {
+               if (!ccd->ws->direct_write_to_pmem) {
+                       if (td->thread_number == 1)
+                               log_err(
+                                       "Fio librpma engine will not work until the Direct Write to PMem on the server side is possible (direct_write_to_pmem)\n");
+                       goto err_cleanup_common;
+               }
+
+               /* configure peer's direct write to pmem support */
+               if ((ret = rpma_peer_cfg_new(&pcfg))) {
+                       librpma_td_verror(td, ret, "rpma_peer_cfg_new");
+                       goto err_cleanup_common;
+               }
+
+               if ((ret = rpma_peer_cfg_set_direct_write_to_pmem(pcfg, true))) {
+                       librpma_td_verror(td, ret,
+                               "rpma_peer_cfg_set_direct_write_to_pmem");
+                       (void) rpma_peer_cfg_delete(&pcfg);
+                       goto err_cleanup_common;
+               }
+
+               if ((ret = rpma_conn_apply_remote_peer_cfg(ccd->conn, pcfg))) {
+                       librpma_td_verror(td, ret,
+                               "rpma_conn_apply_remote_peer_cfg");
+                       (void) rpma_peer_cfg_delete(&pcfg);
+                       goto err_cleanup_common;
+               }
+
+               (void) rpma_peer_cfg_delete(&pcfg);
+       } else if (td->thread_number == 1) {
+               /* XXX log_info mixes with the JSON output */
+               log_err(
+                       "Note: Direct Write to PMem is not supported by default, nor is it required if you use DRAM instead of PMem on the server side (direct_write_to_pmem).\n"
+                       "Remember that flushing to DRAM does not make your data persistent and may be used only for experimental purposes.\n");
+       }
+
+       if ((ret = rpma_conn_cfg_delete(&cfg))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_delete");
+               /* non fatal error - continue */
+       }
+
+       ccd->flush = client_io_flush;
+       ccd->get_io_u_index = client_get_io_u_index;
+
+       return 0;
+
+err_cleanup_common:
+       librpma_fio_client_cleanup(td);
+
+err_cfg_delete:
+       (void) rpma_conn_cfg_delete(&cfg);
+
+       return -1;
+}
+
+static void client_cleanup(struct thread_data *td)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+
+       if (ccd == NULL)
+               return;
+
+       free(ccd->client_data);
+
+       librpma_fio_client_cleanup(td);
+}
+
+static inline int client_io_flush(struct thread_data *td,
+               struct io_u *first_io_u, struct io_u *last_io_u,
+               unsigned long long int len)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       size_t dst_offset = first_io_u->offset;
+       int ret;
+
+       if ((ret = rpma_flush(ccd->conn, ccd->server_mr, dst_offset, len,
+                       ccd->server_mr_flush_type, RPMA_F_COMPLETION_ALWAYS,
+                       (void *)(uintptr_t)last_io_u->index))) {
+               librpma_td_verror(td, ret, "rpma_flush");
+               return -1;
+       }
+
+       return 0;
+}
+
+static int client_get_io_u_index(struct rpma_completion *cmpl,
+               unsigned int *io_u_index)
+{
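+       /*
+        * The io_u index was stashed in the completion's op_context when the
+        * operation was posted (see client_io_flush() above).
+        */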
+       memcpy(io_u_index, &cmpl->op_context, sizeof(*io_u_index));
+
+       return 1;
+}
+
+FIO_STATIC struct ioengine_ops ioengine_client = {
+       .name                   = "librpma_apm_client",
+       .version                = FIO_IOOPS_VERSION,
+       .init                   = client_init,
+       .post_init              = librpma_fio_client_post_init,
+       .get_file_size          = librpma_fio_client_get_file_size,
+       .open_file              = librpma_fio_file_nop,
+       .queue                  = librpma_fio_client_queue,
+       .commit                 = librpma_fio_client_commit,
+       .getevents              = librpma_fio_client_getevents,
+       .event                  = librpma_fio_client_event,
+       .errdetails             = librpma_fio_client_errdetails,
+       .close_file             = librpma_fio_file_nop,
+       .cleanup                = client_cleanup,
+       .flags                  = FIO_DISKLESSIO,
+       .options                = librpma_fio_options,
+       .option_struct_size     = sizeof(struct librpma_fio_options_values),
+};
+
+/* server side implementation */
+
+static int server_open_file(struct thread_data *td, struct fio_file *f)
+{
+       return librpma_fio_server_open_file(td, f, NULL);
+}
+
+static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
+{
+       return FIO_Q_COMPLETED;
+}
+
+FIO_STATIC struct ioengine_ops ioengine_server = {
+       .name                   = "librpma_apm_server",
+       .version                = FIO_IOOPS_VERSION,
+       .init                   = librpma_fio_server_init,
+       .open_file              = server_open_file,
+       .close_file             = librpma_fio_server_close_file,
+       .queue                  = server_queue,
+       .invalidate             = librpma_fio_file_nop,
+       .cleanup                = librpma_fio_server_cleanup,
+       .flags                  = FIO_SYNCIO,
+       .options                = librpma_fio_options,
+       .option_struct_size     = sizeof(struct librpma_fio_options_values),
+};
+
+/* register both engines */
+
+static void fio_init fio_librpma_apm_register(void)
+{
+       register_ioengine(&ioengine_client);
+       register_ioengine(&ioengine_server);
+}
+
+static void fio_exit fio_librpma_apm_unregister(void)
+{
+       unregister_ioengine(&ioengine_client);
+       unregister_ioengine(&ioengine_server);
+}
diff --git a/engines/librpma_fio.c b/engines/librpma_fio.c
new file mode 100644 (file)
index 0000000..810b55e
--- /dev/null
@@ -0,0 +1,1051 @@
+/*
+ * librpma_fio: librpma_apm and librpma_gpspm engines' common part.
+ *
+ * Copyright 2021, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#include "librpma_fio.h"
+
+#include <libpmem.h>
+
+struct fio_option librpma_fio_options[] = {
+       {
+               .name   = "serverip",
+               .lname  = "rpma_server_ip",
+               .type   = FIO_OPT_STR_STORE,
+               .off1   = offsetof(struct librpma_fio_options_values, server_ip),
+               .help   = "IP address the server is listening on",
+               .def    = "",
+               .category = FIO_OPT_C_ENGINE,
+               .group  = FIO_OPT_G_LIBRPMA,
+       },
+       {
+               .name   = "port",
+               .lname  = "rpma_server_port",
+               .type   = FIO_OPT_STR_STORE,
+               .off1   = offsetof(struct librpma_fio_options_values, port),
+               .help   = "port the server is listening on",
+               .def    = "7204",
+               .category = FIO_OPT_C_ENGINE,
+               .group  = FIO_OPT_G_LIBRPMA,
+       },
+       {
+               .name   = "direct_write_to_pmem",
+               .lname  = "Direct Write to PMem (via RDMA) from the remote host is possible",
+               .type   = FIO_OPT_BOOL,
+               .off1   = offsetof(struct librpma_fio_options_values,
+                                       direct_write_to_pmem),
+               .help   = "Set to true ONLY when Direct Write to PMem from the remote host is possible (https://pmem.io/rpma/documentation/basic-direct-write-to-pmem.html)",
+               .def    = "",
+               .category = FIO_OPT_C_ENGINE,
+               .group  = FIO_OPT_G_LIBRPMA,
+       },
+       {
+               .name   = NULL,
+       },
+};
+
+int librpma_fio_td_port(const char *port_base_str, struct thread_data *td,
+               char *port_out)
+{
+       unsigned long int port_ul = strtoul(port_base_str, NULL, 10);
+       unsigned int port_new;
+
+       port_out[0] = '\0';
+
+       if (port_ul == ULONG_MAX) {
+               td_verror(td, errno, "strtoul");
+               return -1;
+       }
+       port_ul += td->thread_number - 1;
+       if (port_ul >= UINT_MAX) {
+               log_err("[%u] port number (%lu) bigger than UINT_MAX\n",
+                       td->thread_number, port_ul);
+               return -1;
+       }
+
+       port_new = port_ul;
+       snprintf(port_out, LIBRPMA_FIO_PORT_STR_LEN_MAX - 1, "%u", port_new);
+
+       return 0;
+}
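+
+/*
+ * For illustration: with the default port=7204, fio thread #1 derives
+ * "7204", thread #2 "7205", and so on, so each thread talks to (or
+ * listens on) its own port.
+ */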
+
+char *librpma_fio_allocate_dram(struct thread_data *td, size_t size,
+       struct librpma_fio_mem *mem)
+{
+       char *mem_ptr = NULL;
+       int ret;
+
+       if ((ret = posix_memalign((void **)&mem_ptr, page_size, size))) {
+               log_err("fio: posix_memalign() failed\n");
+               td_verror(td, ret, "posix_memalign");
+               return NULL;
+       }
+
+       mem->mem_ptr = mem_ptr;
+       mem->size_mmap = 0;
+
+       return mem_ptr;
+}
+
+char *librpma_fio_allocate_pmem(struct thread_data *td, const char *filename,
+               size_t size, struct librpma_fio_mem *mem)
+{
+       size_t size_mmap = 0;
+       char *mem_ptr = NULL;
+       int is_pmem = 0;
+       size_t ws_offset;
+
+       if (size % page_size) {
+               log_err("fio: size (%zu) is not aligned to page size (%zu)\n",
+                       size, page_size);
+               return NULL;
+       }
+
+       ws_offset = (td->thread_number - 1) * size;
+
+       if (!filename) {
+               log_err("fio: filename is not set\n");
+               return NULL;
+       }
+
+       /* map the file */
+       mem_ptr = pmem_map_file(filename, 0 /* len */, 0 /* flags */,
+                       0 /* mode */, &size_mmap, &is_pmem);
+       if (mem_ptr == NULL) {
+               log_err("fio: pmem_map_file(%s) failed\n", filename);
+               /* pmem_map_file() sets errno on failure */
+               td_verror(td, errno, "pmem_map_file");
+               return NULL;
+       }
+
+       /* pmem is expected */
+       if (!is_pmem) {
+               log_err("fio: %s is not located in persistent memory\n",
+                       filename);
+               goto err_unmap;
+       }
+
+       /* check size of allocated persistent memory */
+       if (size_mmap < ws_offset + size) {
+               log_err(
+                       "fio: %s is too small to handle so many threads (%zu < %zu)\n",
+                       filename, size_mmap, ws_offset + size);
+               goto err_unmap;
+       }
+
+       log_info("fio: size of memory mapped from the file %s: %zu\n",
+               filename, size_mmap);
+
+       mem->mem_ptr = mem_ptr;
+       mem->size_mmap = size_mmap;
+
+       return mem_ptr + ws_offset;
+
+err_unmap:
+       (void) pmem_unmap(mem_ptr, size_mmap);
+       return NULL;
+}
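+
+/*
+ * For illustration: each thread claims its own size-byte slice of the
+ * mapped file at offset (thread_number - 1) * size, so e.g. 4 threads
+ * with size=1GiB require the PMem file to map at least 4GiB.
+ */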
+
+void librpma_fio_free(struct librpma_fio_mem *mem)
+{
+       if (mem->size_mmap)
+               (void) pmem_unmap(mem->mem_ptr, mem->size_mmap);
+       else
+               free(mem->mem_ptr);
+}
+
+#define LIBRPMA_FIO_RETRY_MAX_NO       10
+#define LIBRPMA_FIO_RETRY_DELAY_S      5
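+/* worst case: 10 connection attempts with a 5 s pause after each
+ * rejection, i.e. up to ~45 s before a client gives up on a server
+ * that keeps rejecting */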
+
+int librpma_fio_client_init(struct thread_data *td,
+               struct rpma_conn_cfg *cfg)
+{
+       struct librpma_fio_client_data *ccd;
+       struct librpma_fio_options_values *o = td->eo;
+       struct ibv_context *dev = NULL;
+       char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
+       struct rpma_conn_req *req = NULL;
+       enum rpma_conn_event event;
+       struct rpma_conn_private_data pdata;
+       enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
+       int remote_flush_type;
+       int retry;
+       int ret;
+
+       /* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
+#ifdef FIO_INC_DEBUG
+       if ((1UL << FD_NET) & fio_debug)
+               log_level_aux = RPMA_LOG_LEVEL_INFO;
+#endif
+
+       /* configure logging thresholds to see more details */
+       rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
+       rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);
+
+       /* obtain an IBV context for a remote IP address */
+       if ((ret = rpma_utils_get_ibv_context(o->server_ip,
+                       RPMA_UTIL_IBV_CONTEXT_REMOTE, &dev))) {
+               librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
+               return -1;
+       }
+
+       /* allocate client's data */
+       ccd = calloc(1, sizeof(*ccd));
+       if (ccd == NULL) {
+               td_verror(td, errno, "calloc");
+               return -1;
+       }
+
+       /* allocate all in-memory queues */
+       ccd->io_us_queued = calloc(td->o.iodepth, sizeof(*ccd->io_us_queued));
+       if (ccd->io_us_queued == NULL) {
+               td_verror(td, errno, "calloc");
+               goto err_free_ccd;
+       }
+
+       ccd->io_us_flight = calloc(td->o.iodepth, sizeof(*ccd->io_us_flight));
+       if (ccd->io_us_flight == NULL) {
+               td_verror(td, errno, "calloc");
+               goto err_free_io_u_queues;
+       }
+
+       ccd->io_us_completed = calloc(td->o.iodepth,
+                       sizeof(*ccd->io_us_completed));
+       if (ccd->io_us_completed == NULL) {
+               td_verror(td, errno, "calloc");
+               goto err_free_io_u_queues;
+       }
+
+       /* create a new peer object */
+       if ((ret = rpma_peer_new(dev, &ccd->peer))) {
+               librpma_td_verror(td, ret, "rpma_peer_new");
+               goto err_free_io_u_queues;
+       }
+
+       /* create a connection request */
+       if (librpma_fio_td_port(o->port, td, port_td))
+               goto err_peer_delete;
+
+       for (retry = 0; retry < LIBRPMA_FIO_RETRY_MAX_NO; retry++) {
+               if ((ret = rpma_conn_req_new(ccd->peer, o->server_ip, port_td,
+                               cfg, &req))) {
+                       librpma_td_verror(td, ret, "rpma_conn_req_new");
+                       goto err_peer_delete;
+               }
+
+               /*
+                * Connect the connection request
+                * and obtain the connection object.
+                */
+               if ((ret = rpma_conn_req_connect(&req, NULL, &ccd->conn))) {
+                       librpma_td_verror(td, ret, "rpma_conn_req_connect");
+                       goto err_req_delete;
+               }
+
+               /* wait for the connection to establish */
+               if ((ret = rpma_conn_next_event(ccd->conn, &event))) {
+                       librpma_td_verror(td, ret, "rpma_conn_next_event");
+                       goto err_conn_delete;
+               } else if (event == RPMA_CONN_ESTABLISHED) {
+                       break;
+               } else if (event == RPMA_CONN_REJECTED) {
+                       (void) rpma_conn_disconnect(ccd->conn);
+                       (void) rpma_conn_delete(&ccd->conn);
+                       if (retry < LIBRPMA_FIO_RETRY_MAX_NO - 1) {
+                               log_err("Thread [%d]: Retrying (#%i) ...\n",
+                                       td->thread_number, retry + 1);
+                               sleep(LIBRPMA_FIO_RETRY_DELAY_S);
+                       } else {
+                               log_err(
+                                       "Thread [%d]: The maximum number of retries exceeded. Closing.\n",
+                                       td->thread_number);
+                       }
+               } else {
+                       log_err(
+                               "rpma_conn_next_event returned an unexpected event: (%s != RPMA_CONN_ESTABLISHED)\n",
+                               rpma_utils_conn_event_2str(event));
+                       goto err_conn_delete;
+               }
+       }
+
+       if (retry > 0)
+               log_err("Thread [%d]: Connected after retry #%i\n",
+                       td->thread_number, retry);
+
+       if (ccd->conn == NULL)
+               goto err_peer_delete;
+
+       /* get the connection's private data sent from the server */
+       if ((ret = rpma_conn_get_private_data(ccd->conn, &pdata))) {
+               librpma_td_verror(td, ret, "rpma_conn_get_private_data");
+               goto err_conn_delete;
+       }
+
+       /* get the server's workspace representation */
+       ccd->ws = pdata.ptr;
+
+       /* create the server's memory representation */
+       if ((ret = rpma_mr_remote_from_descriptor(&ccd->ws->descriptor[0],
+                       ccd->ws->mr_desc_size, &ccd->server_mr))) {
+               librpma_td_verror(td, ret, "rpma_mr_remote_from_descriptor");
+               goto err_conn_delete;
+       }
+
+       /* get the total size of the shared server memory */
+       if ((ret = rpma_mr_remote_get_size(ccd->server_mr, &ccd->ws_size))) {
+               librpma_td_verror(td, ret, "rpma_mr_remote_get_size");
+               goto err_conn_delete;
+       }
+
+       /* get flush type of the remote node */
+       if ((ret = rpma_mr_remote_get_flush_type(ccd->server_mr,
+                       &remote_flush_type))) {
+               librpma_td_verror(td, ret, "rpma_mr_remote_get_flush_type");
+               goto err_conn_delete;
+       }
+
+       ccd->server_mr_flush_type =
+               (remote_flush_type & RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT) ?
+               RPMA_FLUSH_TYPE_PERSISTENT : RPMA_FLUSH_TYPE_VISIBILITY;
+
+       /*
+        * Ensure the io_us buffer allocation is page-size-aligned, which is
+        * required to register the memory for RDMA. Any user-provided
+        * mem_align value is intentionally ignored.
+        */
+       td->o.mem_align = page_size;
+
+       td->io_ops_data = ccd;
+
+       return 0;
+
+err_conn_delete:
+       (void) rpma_conn_disconnect(ccd->conn);
+       (void) rpma_conn_delete(&ccd->conn);
+
+err_req_delete:
+       (void) rpma_conn_req_delete(&req);
+
+err_peer_delete:
+       (void) rpma_peer_delete(&ccd->peer);
+
+err_free_io_u_queues:
+       free(ccd->io_us_queued);
+       free(ccd->io_us_flight);
+       free(ccd->io_us_completed);
+
+err_free_ccd:
+       free(ccd);
+
+       return -1;
+}
+
+void librpma_fio_client_cleanup(struct thread_data *td)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       enum rpma_conn_event ev;
+       int ret;
+
+       if (ccd == NULL)
+               return;
+
+       /* delete the io_us' memory registration */
+       if ((ret = rpma_mr_dereg(&ccd->orig_mr)))
+               librpma_td_verror(td, ret, "rpma_mr_dereg");
+       /* delete the server's memory representation */
+       if ((ret = rpma_mr_remote_delete(&ccd->server_mr)))
+               librpma_td_verror(td, ret, "rpma_mr_remote_delete");
+       /* initiate disconnection */
+       if ((ret = rpma_conn_disconnect(ccd->conn)))
+               librpma_td_verror(td, ret, "rpma_conn_disconnect");
+       /* wait for the disconnection to complete */
+       if ((ret = rpma_conn_next_event(ccd->conn, &ev))) {
+               librpma_td_verror(td, ret, "rpma_conn_next_event");
+       } else if (ev != RPMA_CONN_CLOSED) {
+               log_err(
+                       "client_cleanup received an unexpected event (%s != RPMA_CONN_CLOSED)\n",
+                       rpma_utils_conn_event_2str(ev));
+       }
+       /* delete the connection */
+       if ((ret = rpma_conn_delete(&ccd->conn)))
+               librpma_td_verror(td, ret, "rpma_conn_delete");
+       /* delete the peer */
+       if ((ret = rpma_peer_delete(&ccd->peer)))
+               librpma_td_verror(td, ret, "rpma_peer_delete");
+       /* free the software queues */
+       free(ccd->io_us_queued);
+       free(ccd->io_us_flight);
+       free(ccd->io_us_completed);
+       free(ccd);
+       td->io_ops_data = NULL; /* zero ccd */
+}
+
+int librpma_fio_file_nop(struct thread_data *td, struct fio_file *f)
+{
+       /* NOP */
+       return 0;
+}
+
+int librpma_fio_client_post_init(struct thread_data *td)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       size_t io_us_size;
+       int ret;
+
+       /*
+        * td->orig_buffer is not aligned. The engine requires aligned io_us,
+        * so the address is aligned up using the formula below.
+        */
+       ccd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
+                       td->o.mem_align;
+
+       /*
+        * Beside the space actually consumed by io_us, td->orig_buffer_size
+        * includes padding which can be omitted from the memory registration.
+        */
+       io_us_size = (unsigned long long)td_max_bs(td) *
+                       (unsigned long long)td->o.iodepth;
+
+       if ((ret = rpma_mr_reg(ccd->peer, ccd->orig_buffer_aligned, io_us_size,
+                       RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
+                       RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
+                       RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT, &ccd->orig_mr)))
+               librpma_td_verror(td, ret, "rpma_mr_reg");
+       return ret;
+}
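+
+/*
+ * For illustration: with bs=4k and iodepth=16, io_us_size above is
+ * 4096 * 16 == 64 KiB, so only that much of td->orig_buffer gets
+ * registered and fio's trailing padding is skipped.
+ */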
+
+int librpma_fio_client_get_file_size(struct thread_data *td,
+               struct fio_file *f)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+
+       f->real_file_size = ccd->ws_size;
+       fio_file_set_size_known(f);
+
+       return 0;
+}
+
+static enum fio_q_status client_queue_sync(struct thread_data *td,
+               struct io_u *io_u)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       struct rpma_completion cmpl;
+       unsigned io_u_index;
+       int ret;
+
+       /* execute io_u */
+       if (io_u->ddir == DDIR_READ) {
+               /* post an RDMA read operation */
+               if (librpma_fio_client_io_read(td, io_u,
+                               RPMA_F_COMPLETION_ALWAYS))
+                       goto err;
+       } else if (io_u->ddir == DDIR_WRITE) {
+               /* post an RDMA write operation */
+               if (librpma_fio_client_io_write(td, io_u))
+                       goto err;
+               if (ccd->flush(td, io_u, io_u, io_u->xfer_buflen))
+                       goto err;
+       } else {
+               log_err("unsupported IO mode: %s\n", io_ddir_name(io_u->ddir));
+               goto err;
+       }
+
+       do {
+               /* get a completion */
+               ret = rpma_conn_completion_get(ccd->conn, &cmpl);
+               if (ret == RPMA_E_NO_COMPLETION) {
+                       /* lack of completion is not an error */
+                       continue;
+               } else if (ret != 0) {
+                       /* an error occurred */
+                       librpma_td_verror(td, ret, "rpma_conn_completion_get");
+                       goto err;
+               }
+
+               /* if the io_u completed with an error */
+               if (cmpl.op_status != IBV_WC_SUCCESS)
+                       goto err;
+
+               if (cmpl.op == RPMA_OP_SEND)
+                       ++ccd->op_send_completed;
+               else {
+                       if (cmpl.op == RPMA_OP_RECV)
+                               ++ccd->op_recv_completed;
+
+                       break;
+               }
+       } while (1);
+
+       if (ccd->get_io_u_index(&cmpl, &io_u_index) != 1)
+               goto err;
+
+       if (io_u->index != io_u_index) {
+               log_err(
+                       "no matching io_u for received completion found (io_u_index=%u)\n",
+                       io_u_index);
+               goto err;
+       }
+
+       /* make sure all SENDs are completed before exit - clean up SQ */
+       if (librpma_fio_client_io_complete_all_sends(td))
+               goto err;
+
+       return FIO_Q_COMPLETED;
+
+err:
+       io_u->error = -1;
+       return FIO_Q_COMPLETED;
+}
+
+enum fio_q_status librpma_fio_client_queue(struct thread_data *td,
+               struct io_u *io_u)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+
+       if (ccd->io_u_queued_nr == (int)td->o.iodepth)
+               return FIO_Q_BUSY;
+
+       if (td->o.sync_io)
+               return client_queue_sync(td, io_u);
+
+       /* io_u -> queued[] */
+       ccd->io_us_queued[ccd->io_u_queued_nr] = io_u;
+       ccd->io_u_queued_nr++;
+
+       return FIO_Q_QUEUED;
+}
+
+int librpma_fio_client_commit(struct thread_data *td)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       int flags = RPMA_F_COMPLETION_ON_ERROR;
+       struct timespec now;
+       bool fill_time;
+       int i;
+       struct io_u *flush_first_io_u = NULL;
+       unsigned long long int flush_len = 0;
+
+       if (!ccd->io_us_queued)
+               return -1;
+
+       /* execute all io_us from queued[] */
+       for (i = 0; i < ccd->io_u_queued_nr; i++) {
+               struct io_u *io_u = ccd->io_us_queued[i];
+
+               if (io_u->ddir == DDIR_READ) {
+                       if (i + 1 == ccd->io_u_queued_nr ||
+                           ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE)
+                               flags = RPMA_F_COMPLETION_ALWAYS;
+                       /* post an RDMA read operation */
+                       if (librpma_fio_client_io_read(td, io_u, flags))
+                               return -1;
+               } else if (io_u->ddir == DDIR_WRITE) {
+                       /* post an RDMA write operation */
+                       if (librpma_fio_client_io_write(td, io_u))
+                               return -1;
+
+                       /* cache the first io_u in the sequence */
+                       if (flush_first_io_u == NULL)
+                               flush_first_io_u = io_u;
+
+                       /*
+                        * the flush length is the sum of all io_u's creating
+                        * the sequence
+                        */
+                       flush_len += io_u->xfer_buflen;
+
+                       /*
+                        * if io_u's are random the rpma_flush is required
+                        * after each one of them
+                        */
+                       if (!td_random(td)) {
+                               /*
+                                * When the io_u's are sequential and
+                                * the current io_u is not the last one and
+                                * the next one is also a write operation
+                                * the flush can be postponed by one io_u and
+                                * cover all of them which build a continuous
+                                * sequence.
+                                */
+                               if ((i + 1 < ccd->io_u_queued_nr) &&
+                                   (ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE))
+                                       continue;
+                       }
+
+                       /* flush all writes which build a continuous sequence */
+                       if (ccd->flush(td, flush_first_io_u, io_u, flush_len))
+                               return -1;
+
+                       /*
+                        * reset the flush parameters in preparation for
+                        * the next one
+                        */
+                       flush_first_io_u = NULL;
+                       flush_len = 0;
+               } else {
+                       log_err("unsupported IO mode: %s\n",
+                               io_ddir_name(io_u->ddir));
+                       return -1;
+               }
+       }
+
+       if ((fill_time = fio_fill_issue_time(td)))
+               fio_gettime(&now, NULL);
+
+       /* move executed io_us from queued[] to flight[] */
+       for (i = 0; i < ccd->io_u_queued_nr; i++) {
+               struct io_u *io_u = ccd->io_us_queued[i];
+
+               /* FIO does not do this if the engine is asynchronous */
+               if (fill_time)
+                       memcpy(&io_u->issue_time, &now, sizeof(now));
+
+               /* move executed io_us from queued[] to flight[] */
+               ccd->io_us_flight[ccd->io_u_flight_nr] = io_u;
+               ccd->io_u_flight_nr++;
+
+               /*
+                * FIO says:
+                * If an engine has the commit hook
+                * it has to call io_u_queued() itself.
+                */
+               io_u_queued(td, io_u);
+       }
+
+       /* FIO does not do this if an engine has the commit hook. */
+       io_u_mark_submit(td, ccd->io_u_queued_nr);
+       ccd->io_u_queued_nr = 0;
+
+       return 0;
+}
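+
+/*
+ * An illustration of the flush coalescing above: if a sequential job's
+ * queued[] holds WRITE WRITE WRITE READ, the three writes are posted
+ * back to back and a single flush covering their summed xfer_buflen is
+ * posted before the read; a random job gets one flush per write
+ * instead.
+ */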
+
+/*
+ * RETURN VALUE
+ * - > 0  - the number of completed io_us
+ * -   0  - when no completions were received
+ * - (-1) - when an error occurred
+ */
+static int client_getevent_process(struct thread_data *td)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       struct rpma_completion cmpl;
+       /* io_u->index of completed io_u (cmpl.op_context) */
+       unsigned int io_u_index;
+       /* # of completed io_us */
+       int cmpl_num = 0;
+       /* helpers */
+       struct io_u *io_u;
+       int i;
+       int ret;
+
+       /* get a completion */
+       if ((ret = rpma_conn_completion_get(ccd->conn, &cmpl))) {
+               if (ret == RPMA_E_NO_COMPLETION) {
+                       /* lack of completion is not an error */
+                       return 0;
+               }
+
+               /* an error occurred */
+               librpma_td_verror(td, ret, "rpma_conn_completion_get");
+               return -1;
+       }
+
+       /* if the io_u completed with an error */
+       if (cmpl.op_status != IBV_WC_SUCCESS) {
+               td->error = cmpl.op_status;
+               return -1;
+       }
+
+       if (cmpl.op == RPMA_OP_SEND)
+               ++ccd->op_send_completed;
+       else if (cmpl.op == RPMA_OP_RECV)
+               ++ccd->op_recv_completed;
+
+       if ((ret = ccd->get_io_u_index(&cmpl, &io_u_index)) != 1)
+               return ret;
+
+       /* look for an io_u being completed */
+       for (i = 0; i < ccd->io_u_flight_nr; ++i) {
+               if (ccd->io_us_flight[i]->index == io_u_index) {
+                       cmpl_num = i + 1;
+                       break;
+               }
+       }
+
+       /* if no matching io_u has been found */
+       if (cmpl_num == 0) {
+               log_err(
+                       "no matching io_u for received completion found (io_u_index=%u)\n",
+                       io_u_index);
+               return -1;
+       }
+
+       /* move completed io_us to the completed in-memory queue */
+       for (i = 0; i < cmpl_num; ++i) {
+               /* get and prepare io_u */
+               io_u = ccd->io_us_flight[i];
+
+               /* append to the queue */
+               ccd->io_us_completed[ccd->io_u_completed_nr] = io_u;
+               ccd->io_u_completed_nr++;
+       }
+
+       /* remove completed io_us from the flight queue */
+       for (i = cmpl_num; i < ccd->io_u_flight_nr; ++i)
+               ccd->io_us_flight[i - cmpl_num] = ccd->io_us_flight[i];
+       ccd->io_u_flight_nr -= cmpl_num;
+
+       return cmpl_num;
+}
+
+int librpma_fio_client_getevents(struct thread_data *td, unsigned int min,
+               unsigned int max, const struct timespec *t)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       /* total # of completed io_us */
+       int cmpl_num_total = 0;
+       /* # of completed io_us from a single event */
+       int cmpl_num;
+
+       do {
+               cmpl_num = client_getevent_process(td);
+               if (cmpl_num > 0) {
+                       /* new completions collected */
+                       cmpl_num_total += cmpl_num;
+               } else if (cmpl_num == 0) {
+                       /*
+                        * It is required to make sure that CQEs for SENDs
+                        * will flow at least at the same pace as CQEs for RECVs.
+                        */
+                       if (cmpl_num_total >= min &&
+                           ccd->op_send_completed >= ccd->op_recv_completed)
+                               break;
+
+                       /*
+                        * To reduce CPU consumption one can use
+                        * the rpma_conn_completion_wait() function.
+                        * Note this greatly increases the latency
+                        * and makes the results less stable.
+                        * The bandwidth stays more or less the same.
+                        */
+               } else {
+                       /* an error occurred */
+                       return -1;
+               }
+
+               /*
+                * The expected max can be exceeded if CQEs for RECVs come up
+                * faster than CQEs for SENDs. But it is required to make sure
+                * CQEs for SENDs flow at least at the same pace as CQEs for RECVs.
+                */
+       } while (cmpl_num_total < max ||
+                       ccd->op_send_completed < ccd->op_recv_completed);
+
+       /*
+        * All posted SENDs are completed and RECVs for them (responses) are
+        * completed. This restores the initial state, so the counters are reset.
+        */
+       if (ccd->op_send_posted == ccd->op_send_completed &&
+                       ccd->op_send_completed == ccd->op_recv_completed) {
+               ccd->op_send_posted = 0;
+               ccd->op_send_completed = 0;
+               ccd->op_recv_completed = 0;
+       }
+
+       return cmpl_num_total;
+}
+
+struct io_u *librpma_fio_client_event(struct thread_data *td, int event)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       struct io_u *io_u;
+       int i;
+
+       /* get the first io_u from the queue */
+       io_u = ccd->io_us_completed[0];
+
+       /* remove the first io_u from the queue */
+       for (i = 1; i < ccd->io_u_completed_nr; ++i)
+               ccd->io_us_completed[i - 1] = ccd->io_us_completed[i];
+       ccd->io_u_completed_nr--;
+
+       dprint_io_u(io_u, "client_event");
+
+       return io_u;
+}
+
+char *librpma_fio_client_errdetails(struct io_u *io_u)
+{
+       /* get the string representation of an error */
+       enum ibv_wc_status status = io_u->error;
+       const char *status_str = ibv_wc_status_str(status);
+
+       char *details = strdup(status_str);
+       if (details == NULL) {
+               fprintf(stderr, "Error: %s\n", status_str);
+               fprintf(stderr, "Fatal error: out of memory. Aborting.\n");
+               abort();
+       }
+
+       /* FIO frees the returned string when it becomes obsolete */
+       return details;
+}
+
+int librpma_fio_server_init(struct thread_data *td)
+{
+       struct librpma_fio_options_values *o = td->eo;
+       struct librpma_fio_server_data *csd;
+       struct ibv_context *dev = NULL;
+       enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
+       int ret = -1;
+
+       /* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
+#ifdef FIO_INC_DEBUG
+       if ((1UL << FD_NET) & fio_debug)
+               log_level_aux = RPMA_LOG_LEVEL_INFO;
+#endif
+
+       /* configure logging thresholds to see more details */
+       rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
+       rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);
+
+       /* obtain an IBV context for a remote IP address */
+       if ((ret = rpma_utils_get_ibv_context(o->server_ip,
+                       RPMA_UTIL_IBV_CONTEXT_LOCAL, &dev))) {
+               librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
+               return -1;
+       }
+
+       /* allocate server's data */
+       csd = calloc(1, sizeof(*csd));
+       if (csd == NULL) {
+               td_verror(td, errno, "calloc");
+               return -1;
+       }
+
+       /* create a new peer object */
+       if ((ret = rpma_peer_new(dev, &csd->peer))) {
+               librpma_td_verror(td, ret, "rpma_peer_new");
+               goto err_free_csd;
+       }
+
+       td->io_ops_data = csd;
+
+       return 0;
+
+err_free_csd:
+       free(csd);
+
+       return -1;
+}
+
+void librpma_fio_server_cleanup(struct thread_data *td)
+{
+       struct librpma_fio_server_data *csd = td->io_ops_data;
+       int ret;
+
+       if (csd == NULL)
+               return;
+
+       /* free the peer */
+       if ((ret = rpma_peer_delete(&csd->peer)))
+               librpma_td_verror(td, ret, "rpma_peer_delete");
+
+       free(csd);
+}
+
+int librpma_fio_server_open_file(struct thread_data *td, struct fio_file *f,
+               struct rpma_conn_cfg *cfg)
+{
+       struct librpma_fio_server_data *csd = td->io_ops_data;
+       struct librpma_fio_options_values *o = td->eo;
+       enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
+       struct librpma_fio_workspace ws = {0};
+       struct rpma_conn_private_data pdata;
+       uint32_t max_msg_num;
+       struct rpma_conn_req *conn_req;
+       struct rpma_conn *conn;
+       struct rpma_mr_local *mr;
+       char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
+       struct rpma_ep *ep;
+       size_t mem_size = td->o.size;
+       size_t mr_desc_size;
+       void *ws_ptr;
+       int usage_mem_type;
+       int ret;
+
+       if (!f->file_name) {
+               log_err("fio: filename is not set\n");
+               return -1;
+       }
+
+       /* start a listening endpoint at addr:port */
+       if (librpma_fio_td_port(o->port, td, port_td))
+               return -1;
+
+       if ((ret = rpma_ep_listen(csd->peer, o->server_ip, port_td, &ep))) {
+               librpma_td_verror(td, ret, "rpma_ep_listen");
+               return -1;
+       }
+
+       if (strcmp(f->file_name, "malloc") == 0) {
+               /* allocation from DRAM using posix_memalign() */
+               ws_ptr = librpma_fio_allocate_dram(td, mem_size, &csd->mem);
+               usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_VISIBILITY;
+       } else {
+               /* allocation from PMEM using pmem_map_file() */
+               ws_ptr = librpma_fio_allocate_pmem(td, f->file_name,
+                               mem_size, &csd->mem);
+               usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT;
+       }
+
+       if (ws_ptr == NULL)
+               goto err_ep_shutdown;
+
+       f->real_file_size = mem_size;
+
+       if ((ret = rpma_mr_reg(csd->peer, ws_ptr, mem_size,
+                       RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
+                       RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
+                       usage_mem_type, &mr))) {
+               librpma_td_verror(td, ret, "rpma_mr_reg");
+               goto err_free;
+       }
+
+       /* get size of the memory region's descriptor */
+       if ((ret = rpma_mr_get_descriptor_size(mr, &mr_desc_size))) {
+               librpma_td_verror(td, ret, "rpma_mr_get_descriptor_size");
+               goto err_mr_dereg;
+       }
+
+       /* verify size of the memory region's descriptor */
+       if (mr_desc_size > LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE) {
+               log_err(
+                       "size of the memory region's descriptor is too big (max=%i)\n",
+                       LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE);
+               goto err_mr_dereg;
+       }
+
+       /* get the memory region's descriptor */
+       if ((ret = rpma_mr_get_descriptor(mr, &ws.descriptor[0]))) {
+               librpma_td_verror(td, ret, "rpma_mr_get_descriptor");
+               goto err_mr_dereg;
+       }
+
+       if (cfg != NULL) {
+               if ((ret = rpma_conn_cfg_get_rq_size(cfg, &max_msg_num))) {
+                       librpma_td_verror(td, ret, "rpma_conn_cfg_get_rq_size");
+                       goto err_mr_dereg;
+               }
+
+               /* verify whether iodepth fits into uint16_t */
+               if (max_msg_num > UINT16_MAX) {
+                       log_err("fio: iodepth too big (%u > %u)\n",
+                               max_msg_num, UINT16_MAX);
+                       goto err_mr_dereg;
+               }
+
+               ws.max_msg_num = max_msg_num;
+       }
+
+       /* prepare a workspace description */
+       ws.direct_write_to_pmem = o->direct_write_to_pmem;
+       ws.mr_desc_size = mr_desc_size;
+       pdata.ptr = &ws;
+       pdata.len = sizeof(ws);
+
+       /* receive an incoming connection request */
+       if ((ret = rpma_ep_next_conn_req(ep, cfg, &conn_req))) {
+               librpma_td_verror(td, ret, "rpma_ep_next_conn_req");
+               goto err_mr_dereg;
+       }
+
+       if (csd->prepare_connection && csd->prepare_connection(td, conn_req))
+               goto err_req_delete;
+
+       /* accept the connection request and obtain the connection object */
+       if ((ret = rpma_conn_req_connect(&conn_req, &pdata, &conn))) {
+               librpma_td_verror(td, ret, "rpma_conn_req_connect");
+               goto err_req_delete;
+       }
+
+       /* wait for the connection to be established */
+       if ((ret = rpma_conn_next_event(conn, &conn_event))) {
+               librpma_td_verror(td, ret, "rpma_conn_next_event");
+               goto err_conn_delete;
+       } else if (conn_event != RPMA_CONN_ESTABLISHED) {
+               log_err("rpma_conn_next_event returned an unexpected event\n");
+               goto err_conn_delete;
+       }
+
+       /* end-point is no longer needed */
+       (void) rpma_ep_shutdown(&ep);
+
+       csd->ws_mr = mr;
+       csd->ws_ptr = ws_ptr;
+       csd->conn = conn;
+
+       return 0;
+
+err_conn_delete:
+       (void) rpma_conn_delete(&conn);
+
+err_req_delete:
+       (void) rpma_conn_req_delete(&conn_req);
+
+err_mr_dereg:
+       (void) rpma_mr_dereg(&mr);
+
+err_free:
+       librpma_fio_free(&csd->mem);
+
+err_ep_shutdown:
+       (void) rpma_ep_shutdown(&ep);
+
+       return -1;
+}
+
+int librpma_fio_server_close_file(struct thread_data *td, struct fio_file *f)
+{
+       struct librpma_fio_server_data *csd = td->io_ops_data;
+       enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
+       int rv = 0;
+       int ret;
+
+       /* wait for the connection to be closed */
+       ret = rpma_conn_next_event(csd->conn, &conn_event);
+       if (!ret && conn_event != RPMA_CONN_CLOSED) {
+               log_err("rpma_conn_next_event returned an unexpected event\n");
+               rv = -1;
+       }
+
+       if ((ret = rpma_conn_disconnect(csd->conn))) {
+               librpma_td_verror(td, ret, "rpma_conn_disconnect");
+               rv = -1;
+       }
+
+       if ((ret = rpma_conn_delete(&csd->conn))) {
+               librpma_td_verror(td, ret, "rpma_conn_delete");
+               rv = -1;
+       }
+
+       if ((ret = rpma_mr_dereg(&csd->ws_mr))) {
+               librpma_td_verror(td, ret, "rpma_mr_dereg");
+               rv = -1;
+       }
+
+       librpma_fio_free(&csd->mem);
+
+       return rv;
+}
diff --git a/engines/librpma_fio.h b/engines/librpma_fio.h
new file mode 100644 (file)
index 0000000..8cfb2e2
--- /dev/null
@@ -0,0 +1,273 @@
+/*
+ * librpma_fio: librpma_apm and librpma_gpspm engines' common header.
+ *
+ * Copyright 2021, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#ifndef LIBRPMA_FIO_H
+#define LIBRPMA_FIO_H 1
+
+#include "../fio.h"
+#include "../optgroup.h"
+
+#include <librpma.h>
+
+/* servers' and clients' common */
+
+#define librpma_td_verror(td, err, func) \
+       td_vmsg((td), (err), rpma_err_2str(err), (func))
+
+/* ceil(a / b) = (a + b - 1) / b */
+#define LIBRPMA_FIO_CEIL(a, b) (((a) + (b) - 1) / (b))
+
+/* common option structure for server and client */
+struct librpma_fio_options_values {
+       /*
+        * FIO considers .off1 == 0 to mean the option is absent, so the
+        * first meaningful field has to have padding ahead of it.
+        */
+       void *pad;
+       char *server_ip;
+       /* base server listening port */
+       char *port;
+       /* Direct Write to PMem is possible */
+       unsigned int direct_write_to_pmem;
+};
+
+extern struct fio_option librpma_fio_options[];
+
+/*
+ * Limited by the maximum length of the private data
+ * for rdma_connect() in case of RDMA_PS_TCP (28 bytes).
+ */
+#define LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE 24
+
+struct librpma_fio_workspace {
+       uint16_t max_msg_num;   /* # of RQ slots */
+       uint8_t direct_write_to_pmem; /* Direct Write to PMem is possible */
+       uint8_t mr_desc_size;   /* size of mr_desc in descriptor[] */
+       /* buffer containing mr_desc */
+       char descriptor[LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE];
+};
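+
+/*
+ * sizeof(struct librpma_fio_workspace) == 2 + 1 + 1 + 24 == 28 bytes,
+ * which exactly fits the RDMA_PS_TCP private-data limit quoted above.
+ */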
+
+#define LIBRPMA_FIO_PORT_STR_LEN_MAX 12
+
+int librpma_fio_td_port(const char *port_base_str, struct thread_data *td,
+               char *port_out);
+
+struct librpma_fio_mem {
+       /* memory buffer */
+       char *mem_ptr;
+
+       /* size of the mapped persistent memory */
+       size_t size_mmap;
+};
+
+char *librpma_fio_allocate_dram(struct thread_data *td, size_t size,
+               struct librpma_fio_mem *mem);
+
+char *librpma_fio_allocate_pmem(struct thread_data *td, const char *filename,
+               size_t size, struct librpma_fio_mem *mem);
+
+void librpma_fio_free(struct librpma_fio_mem *mem);
+
+/* clients' common */
+
+typedef int (*librpma_fio_flush_t)(struct thread_data *td,
+               struct io_u *first_io_u, struct io_u *last_io_u,
+               unsigned long long int len);
+
+/*
+ * RETURN VALUE
+ * - ( 1) - on success
+ * - ( 0) - skip
+ * - (-1) - on error
+ */
+typedef int (*librpma_fio_get_io_u_index_t)(struct rpma_completion *cmpl,
+               unsigned int *io_u_index);
+
+struct librpma_fio_client_data {
+       struct rpma_peer *peer;
+       struct rpma_conn *conn;
+
+       /* aligned td->orig_buffer */
+       char *orig_buffer_aligned;
+
+       /* io_us' base address memory registration (ccd->orig_buffer_aligned) */
+       struct rpma_mr_local *orig_mr;
+
+       struct librpma_fio_workspace *ws;
+
+       /* a server's memory representation */
+       struct rpma_mr_remote *server_mr;
+       enum rpma_flush_type server_mr_flush_type;
+
+       /* remote workspace description */
+       size_t ws_size;
+
+       /* in-memory queues */
+       struct io_u **io_us_queued;
+       int io_u_queued_nr;
+       struct io_u **io_us_flight;
+       int io_u_flight_nr;
+       struct io_u **io_us_completed;
+       int io_u_completed_nr;
+
+       /* SQ control. Note: all of them have to be kept in sync. */
+       uint32_t op_send_posted;
+       uint32_t op_send_completed;
+       uint32_t op_recv_completed;
+
+       librpma_fio_flush_t flush;
+       librpma_fio_get_io_u_index_t get_io_u_index;
+
+       /* engine-specific client data */
+       void *client_data;
+};
+
+int librpma_fio_client_init(struct thread_data *td,
+               struct rpma_conn_cfg *cfg);
+void librpma_fio_client_cleanup(struct thread_data *td);
+
+int librpma_fio_file_nop(struct thread_data *td, struct fio_file *f);
+int librpma_fio_client_get_file_size(struct thread_data *td,
+               struct fio_file *f);
+
+int librpma_fio_client_post_init(struct thread_data *td);
+
+enum fio_q_status librpma_fio_client_queue(struct thread_data *td,
+               struct io_u *io_u);
+
+int librpma_fio_client_commit(struct thread_data *td);
+
+int librpma_fio_client_getevents(struct thread_data *td, unsigned int min,
+               unsigned int max, const struct timespec *t);
+
+struct io_u *librpma_fio_client_event(struct thread_data *td, int event);
+
+char *librpma_fio_client_errdetails(struct io_u *io_u);
+
+static inline int librpma_fio_client_io_read(struct thread_data *td,
+               struct io_u *io_u, int flags)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       size_t dst_offset = (char *)(io_u->xfer_buf) - ccd->orig_buffer_aligned;
+       size_t src_offset = io_u->offset;
+       int ret;
+
+       if ((ret = rpma_read(ccd->conn, ccd->orig_mr, dst_offset,
+                       ccd->server_mr, src_offset, io_u->xfer_buflen,
+                       flags, (void *)(uintptr_t)io_u->index))) {
+               librpma_td_verror(td, ret, "rpma_read");
+               return -1;
+       }
+
+       return 0;
+}
+
+static inline int librpma_fio_client_io_write(struct thread_data *td,
+               struct io_u *io_u)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       size_t src_offset = (char *)(io_u->xfer_buf) - ccd->orig_buffer_aligned;
+       size_t dst_offset = io_u->offset;
+       int ret;
+
+       if ((ret = rpma_write(ccd->conn, ccd->server_mr, dst_offset,
+                       ccd->orig_mr, src_offset, io_u->xfer_buflen,
+                       RPMA_F_COMPLETION_ON_ERROR,
+                       (void *)(uintptr_t)io_u->index))) {
+               librpma_td_verror(td, ret, "rpma_write");
+               return -1;
+       }
+
+       return 0;
+}
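+
+/*
+ * In both helpers above, the local offset is the io_u's position inside
+ * the registered buffer (xfer_buf - orig_buffer_aligned) and the remote
+ * offset is the io_u's file offset, applied within the server's memory
+ * region.
+ */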
+
+static inline int librpma_fio_client_io_complete_all_sends(
+               struct thread_data *td)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       struct rpma_completion cmpl;
+       int ret;
+
+       while (ccd->op_send_posted != ccd->op_send_completed) {
+               /* get a completion */
+               ret = rpma_conn_completion_get(ccd->conn, &cmpl);
+               if (ret == RPMA_E_NO_COMPLETION) {
+                       /* lack of completion is not an error */
+                       continue;
+               } else if (ret != 0) {
+                       /* an error occurred */
+                       librpma_td_verror(td, ret, "rpma_conn_completion_get");
+                       break;
+               }
+
+               if (cmpl.op_status != IBV_WC_SUCCESS)
+                       return -1;
+
+               if (cmpl.op == RPMA_OP_SEND)
+                       ++ccd->op_send_completed;
+               else {
+                       log_err(
+                               "A completion other than RPMA_OP_SEND was received while cleaning up the CQ from SENDs\n");
+                       return -1;
+               }
+       }
+
+       /*
+        * All posted SENDs are completed and RECVs for them (responses) are
+        * completed. This restores the initial state, so the counters are reset.
+        */
+       if (ccd->op_send_posted == ccd->op_send_completed &&
+                       ccd->op_send_completed == ccd->op_recv_completed) {
+               ccd->op_send_posted = 0;
+               ccd->op_send_completed = 0;
+               ccd->op_recv_completed = 0;
+       }
+
+       return 0;
+}
+
+/* servers' common */
+
+typedef int (*librpma_fio_prepare_connection_t)(
+               struct thread_data *td,
+               struct rpma_conn_req *conn_req);
+
+struct librpma_fio_server_data {
+       struct rpma_peer *peer;
+
+       /* resources of an incoming connection */
+       struct rpma_conn *conn;
+
+       char *ws_ptr;
+       struct rpma_mr_local *ws_mr;
+       struct librpma_fio_mem mem;
+
+       /* engine-specific server data */
+       void *server_data;
+
+       librpma_fio_prepare_connection_t prepare_connection;
+};
+
+int librpma_fio_server_init(struct thread_data *td);
+
+void librpma_fio_server_cleanup(struct thread_data *td);
+
+int librpma_fio_server_open_file(struct thread_data *td,
+               struct fio_file *f, struct rpma_conn_cfg *cfg);
+
+int librpma_fio_server_close_file(struct thread_data *td,
+               struct fio_file *f);
+
+#endif /* LIBRPMA_FIO_H */
diff --git a/engines/librpma_gpspm.c b/engines/librpma_gpspm.c
new file mode 100644 (file)
index 0000000..ac614f4
--- /dev/null
@@ -0,0 +1,755 @@
+/*
+ * librpma_gpspm: IO engine that uses PMDK librpma to write data,
+ *             based on General Purpose Server Persistency Method
+ *
+ * Copyright 2020-2021, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#include "librpma_fio.h"
+
+#include <libpmem.h>
+
+/* Generated by the protocol buffer compiler from: librpma_gpspm_flush.proto */
+#include "librpma_gpspm_flush.pb-c.h"
+
+#define MAX_MSG_SIZE (512)
+#define IO_U_BUF_LEN (2 * MAX_MSG_SIZE)
+#define SEND_OFFSET (0)
+#define RECV_OFFSET (SEND_OFFSET + MAX_MSG_SIZE)
+
+#define GPSPM_FLUSH_REQUEST__LAST \
+       { PROTOBUF_C_MESSAGE_INIT(&gpspm_flush_request__descriptor), 0, 0, 0 }
+
+/*
+ * 'Flush_req_last' is the last flush request
+ * the client has to send to the server to indicate
+ * that the client is done.
+ */
+static const GPSPMFlushRequest Flush_req_last = GPSPM_FLUSH_REQUEST__LAST;
+
+#define IS_NOT_THE_LAST_MESSAGE(flush_req) \
+       (flush_req->length != Flush_req_last.length || \
+       flush_req->offset != Flush_req_last.offset)
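+
+/*
+ * i.e. a request with offset == 0 and length == 0 acts as the
+ * termination notice; real flush requests always carry a non-zero
+ * length.
+ */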
+
+/* client side implementation */
+
+/* get the next io_u message buffer in round-robin fashion */
+#define IO_U_NEXT_BUF_OFF_CLIENT(cd) \
+       (IO_U_BUF_LEN * ((cd->msg_curr++) % cd->msg_num))
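+
+/*
+ * e.g. with cd->msg_num == 4, successive calls return offsets 0,
+ * IO_U_BUF_LEN, 2 * IO_U_BUF_LEN, 3 * IO_U_BUF_LEN, 0, ...
+ */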
+
+struct client_data {
+       /* memory for the send and receive buffers */
+       char *io_us_msgs;
+
+       /* resources for messaging buffer */
+       uint32_t msg_num;
+       uint32_t msg_curr;
+       struct rpma_mr_local *msg_mr;
+};
+
+static inline int client_io_flush(struct thread_data *td,
+               struct io_u *first_io_u, struct io_u *last_io_u,
+               unsigned long long int len);
+
+static int client_get_io_u_index(struct rpma_completion *cmpl,
+               unsigned int *io_u_index);
+
+static int client_init(struct thread_data *td)
+{
+       struct librpma_fio_client_data *ccd;
+       struct client_data *cd;
+       uint32_t write_num;
+       struct rpma_conn_cfg *cfg = NULL;
+       int ret;
+
+       /*
+        * not supported:
+        * - readwrite = read / trim / randread / randtrim /
+        *               rw / randrw / trimwrite
+        */
+       if (td_read(td) || td_trim(td)) {
+               td_verror(td, EINVAL, "Unsupported mode.");
+               return -1;
+       }
+
+       /* allocate client's data */
+       cd = calloc(1, sizeof(*cd));
+       if (cd == NULL) {
+               td_verror(td, errno, "calloc");
+               return -1;
+       }
+
+       /*
+        * Calculate the required number of WRITEs and FLUSHes.
+        *
+        * Note: Each flush is a request (SEND) and response (RECV) pair.
+        */
+       if (td_random(td)) {
+               write_num = td->o.iodepth; /* WRITE * N */
+               cd->msg_num = td->o.iodepth; /* FLUSH * N */
+       } else {
+               if (td->o.sync_io) {
+                       write_num = 1; /* WRITE */
+                       cd->msg_num = 1; /* FLUSH */
+               } else {
+                       write_num = td->o.iodepth; /* WRITE * N */
+                       /*
+                        * FLUSH * B where:
+                        * - B == ceil(iodepth / iodepth_batch)
+                        *   which is the number of batches for N writes
+                        */
+                       cd->msg_num = LIBRPMA_FIO_CEIL(td->o.iodepth,
+                                       td->o.iodepth_batch);
+               }
+       }
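+
+       /*
+        * For illustration: a sequential asynchronous job with iodepth=8
+        * and iodepth_batch=2 yields write_num == 8 and cd->msg_num ==
+        * ceil(8 / 2) == 4.
+        */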
+
+       /* create a connection configuration object */
+       if ((ret = rpma_conn_cfg_new(&cfg))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_new");
+               goto err_free_cd;
+       }
+
+       /*
+        * Calculate the required queue sizes where:
+        * - the send queue (SQ) has to be big enough to accommodate
+        *   all io_us (WRITEs) and all flush requests (SENDs)
+        * - the receive queue (RQ) has to be big enough to accommodate
+        *   all flush responses (RECVs)
+        * - the completion queue (CQ) has to be big enough to accommodate all
+        *   success and error completions (sq_size + rq_size)
+        */
+       if ((ret = rpma_conn_cfg_set_sq_size(cfg, write_num + cd->msg_num))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
+               goto err_cfg_delete;
+       }
+       if ((ret = rpma_conn_cfg_set_rq_size(cfg, cd->msg_num))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
+               goto err_cfg_delete;
+       }
+       if ((ret = rpma_conn_cfg_set_cq_size(cfg, write_num + cd->msg_num * 2))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
+               goto err_cfg_delete;
+       }
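+
+       /*
+        * Continuing the example above: sq_size == 8 + 4 == 12,
+        * rq_size == 4 and cq_size == 8 + 2 * 4 == 16.
+        */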
+
+       if (librpma_fio_client_init(td, cfg))
+               goto err_cfg_delete;
+
+       ccd = td->io_ops_data;
+
+       if (ccd->ws->direct_write_to_pmem &&
+           ccd->server_mr_flush_type == RPMA_FLUSH_TYPE_PERSISTENT &&
+           td->thread_number == 1) {
+               /* XXX log_info mixes with the JSON output */
+               log_err(
+                       "Note: The server side supports Direct Write to PMem and it is equipped with PMem (direct_write_to_pmem).\n"
+                       "You can use the librpma_apm_client and librpma_apm_server engines for better performance instead of GPSPM.\n");
+       }
+
+       /* validate the server's RQ capacity */
+       if (cd->msg_num > ccd->ws->max_msg_num) {
+               log_err(
+                       "server's RQ size (iodepth) too small to handle the client's workspace requirements (%u < %u)\n",
+                       ccd->ws->max_msg_num, cd->msg_num);
+               goto err_cleanup_common;
+       }
+
+       if ((ret = rpma_conn_cfg_delete(&cfg))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_delete");
+               /* non fatal error - continue */
+       }
+
+       ccd->flush = client_io_flush;
+       ccd->get_io_u_index = client_get_io_u_index;
+       ccd->client_data = cd;
+
+       return 0;
+
+err_cleanup_common:
+       librpma_fio_client_cleanup(td);
+
+err_cfg_delete:
+       (void) rpma_conn_cfg_delete(&cfg);
+
+err_free_cd:
+       free(cd);
+
+       return -1;
+}
+
+static int client_post_init(struct thread_data *td)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       struct client_data *cd = ccd->client_data;
+       unsigned int io_us_msgs_size;
+       int ret;
+
+       /* message buffers initialization and registration */
+       io_us_msgs_size = cd->msg_num * IO_U_BUF_LEN;
+       if ((ret = posix_memalign((void **)&cd->io_us_msgs, page_size,
+                       io_us_msgs_size))) {
+               td_verror(td, ret, "posix_memalign");
+               return ret;
+       }
+       if ((ret = rpma_mr_reg(ccd->peer, cd->io_us_msgs, io_us_msgs_size,
+                       RPMA_MR_USAGE_SEND | RPMA_MR_USAGE_RECV,
+                       &cd->msg_mr))) {
+               librpma_td_verror(td, ret, "rpma_mr_reg");
+               return ret;
+       }
+
+       return librpma_fio_client_post_init(td);
+}
+
+static void client_cleanup(struct thread_data *td)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       struct client_data *cd;
+       size_t flush_req_size;
+       size_t io_u_buf_off;
+       size_t send_offset;
+       void *send_ptr;
+       int ret;
+
+       if (ccd == NULL)
+               return;
+
+       cd = ccd->client_data;
+       if (cd == NULL) {
+               librpma_fio_client_cleanup(td);
+               return;
+       }
+
+       /*
+        * Make sure all SEND completions are collected so there are free
+        * slots in the SQ for the last SEND message.
+        *
+        * Note: even if an operation fails we can still send the termination
+        * notice.
+        */
+       (void) librpma_fio_client_io_complete_all_sends(td);
+
+       /* prepare the last flush message and pack it to the send buffer */
+       flush_req_size = gpspm_flush_request__get_packed_size(&Flush_req_last);
+       if (flush_req_size > MAX_MSG_SIZE) {
+               log_err(
+                       "Packed flush request size is bigger than available send buffer space (%zu > %d)\n",
+                       flush_req_size, MAX_MSG_SIZE);
+       } else {
+               io_u_buf_off = IO_U_NEXT_BUF_OFF_CLIENT(cd);
+               send_offset = io_u_buf_off + SEND_OFFSET;
+               send_ptr = cd->io_us_msgs + send_offset;
+               (void) gpspm_flush_request__pack(&Flush_req_last, send_ptr);
+
+               /* send the flush message */
+               if ((ret = rpma_send(ccd->conn, cd->msg_mr, send_offset,
+                               flush_req_size, RPMA_F_COMPLETION_ALWAYS,
+                               NULL)))
+                       librpma_td_verror(td, ret, "rpma_send");
+
+               ++ccd->op_send_posted;
+
+               /* Wait for the SEND to complete */
+               (void) librpma_fio_client_io_complete_all_sends(td);
+       }
+
+       /* deregister the messaging buffer memory */
+       if ((ret = rpma_mr_dereg(&cd->msg_mr)))
+               librpma_td_verror(td, ret, "rpma_mr_dereg");
+
+       free(ccd->client_data);
+
+       librpma_fio_client_cleanup(td);
+}
+
+static inline int client_io_flush(struct thread_data *td,
+               struct io_u *first_io_u, struct io_u *last_io_u,
+               unsigned long long int len)
+{
+       struct librpma_fio_client_data *ccd = td->io_ops_data;
+       struct client_data *cd = ccd->client_data;
+       size_t io_u_buf_off = IO_U_NEXT_BUF_OFF_CLIENT(cd);
+       size_t send_offset = io_u_buf_off + SEND_OFFSET;
+       size_t recv_offset = io_u_buf_off + RECV_OFFSET;
+       void *send_ptr = cd->io_us_msgs + send_offset;
+       void *recv_ptr = cd->io_us_msgs + recv_offset;
+       GPSPMFlushRequest flush_req = GPSPM_FLUSH_REQUEST__INIT;
+       size_t flush_req_size = 0;
+       int ret;
+
+       /* prepare a response buffer */
+       if ((ret = rpma_recv(ccd->conn, cd->msg_mr, recv_offset, MAX_MSG_SIZE,
+                       recv_ptr))) {
+               librpma_td_verror(td, ret, "rpma_recv");
+               return -1;
+       }
+
+       /* prepare a flush message and pack it to a send buffer */
+       flush_req.offset = first_io_u->offset;
+       flush_req.length = len;
+       flush_req.op_context = last_io_u->index;
+       flush_req_size = gpspm_flush_request__get_packed_size(&flush_req);
+       if (flush_req_size > MAX_MSG_SIZE) {
+               log_err(
+                       "Packed flush request size is bigger than available send buffer space (%"
+                       PRIu64 " > %d\n", flush_req_size, MAX_MSG_SIZE);
+               return -1;
+       }
+       (void) gpspm_flush_request__pack(&flush_req, send_ptr);
+
+       /* send the flush message */
+       if ((ret = rpma_send(ccd->conn, cd->msg_mr, send_offset, flush_req_size,
+                       RPMA_F_COMPLETION_ALWAYS, NULL))) {
+               librpma_td_verror(td, ret, "rpma_send");
+               return -1;
+       }
+
+       ++ccd->op_send_posted;
+
+       return 0;
+}
+
+static int client_get_io_u_index(struct rpma_completion *cmpl,
+               unsigned int *io_u_index)
+{
+       GPSPMFlushResponse *flush_resp;
+
+       if (cmpl->op != RPMA_OP_RECV)
+               return 0;
+
+       /* unpack a response from the received buffer */
+       flush_resp = gpspm_flush_response__unpack(NULL,
+                       cmpl->byte_len, cmpl->op_context);
+       if (flush_resp == NULL) {
+               log_err("Cannot unpack the flush response buffer\n");
+               return -1;
+       }
+
+       memcpy(io_u_index, &flush_resp->op_context, sizeof(*io_u_index));
+
+       gpspm_flush_response__free_unpacked(flush_resp, NULL);
+
+       return 1;
+}
+
+FIO_STATIC struct ioengine_ops ioengine_client = {
+       .name                   = "librpma_gpspm_client",
+       .version                = FIO_IOOPS_VERSION,
+       .init                   = client_init,
+       .post_init              = client_post_init,
+       .get_file_size          = librpma_fio_client_get_file_size,
+       .open_file              = librpma_fio_file_nop,
+       .queue                  = librpma_fio_client_queue,
+       .commit                 = librpma_fio_client_commit,
+       .getevents              = librpma_fio_client_getevents,
+       .event                  = librpma_fio_client_event,
+       .errdetails             = librpma_fio_client_errdetails,
+       .close_file             = librpma_fio_file_nop,
+       .cleanup                = client_cleanup,
+       .flags                  = FIO_DISKLESSIO,
+       .options                = librpma_fio_options,
+       .option_struct_size     = sizeof(struct librpma_fio_options_values),
+};
+
+/* server side implementation */
+
+#define IO_U_BUFF_OFF_SERVER(i) ((i) * IO_U_BUF_LEN)
+
+struct server_data {
+       /* aligned td->orig_buffer */
+       char *orig_buffer_aligned;
+
+       /* resources for messaging buffer from DRAM allocated by fio */
+       struct rpma_mr_local *msg_mr;
+
+       uint32_t msg_sqe_available; /* # of free SQ slots */
+
+       /* in-memory queues */
+       struct rpma_completion *msgs_queued;
+       uint32_t msg_queued_nr;
+};
+
+static int server_init(struct thread_data *td)
+{
+       struct librpma_fio_server_data *csd;
+       struct server_data *sd;
+       int ret = -1;
+
+       if ((ret = librpma_fio_server_init(td)))
+               return ret;
+
+       csd = td->io_ops_data;
+
+       /* allocate server's data */
+       sd = calloc(1, sizeof(*sd));
+       if (sd == NULL) {
+               td_verror(td, errno, "calloc");
+               goto err_server_cleanup;
+       }
+
+       /* allocate in-memory queue */
+       sd->msgs_queued = calloc(td->o.iodepth, sizeof(*sd->msgs_queued));
+       if (sd->msgs_queued == NULL) {
+               td_verror(td, errno, "calloc");
+               goto err_free_sd;
+       }
+
+       /*
+        * Ensure a single io_u buffer can store both SEND and RECV messages,
+        * and that the io_us buffer allocation is page-size-aligned, which
+        * RDMA registration requires. User-provided values are overridden.
+        */
+       td->o.max_bs[DDIR_READ] = IO_U_BUF_LEN;
+       td->o.mem_align = page_size;
+
+       csd->server_data = sd;
+
+       return 0;
+
+err_free_sd:
+       free(sd);
+
+err_server_cleanup:
+       librpma_fio_server_cleanup(td);
+
+       return -1;
+}
+
+static int server_post_init(struct thread_data *td)
+{
+       struct librpma_fio_server_data *csd = td->io_ops_data;
+       struct server_data *sd = csd->server_data;
+       size_t io_us_size;
+       size_t io_u_buflen;
+       int ret;
+
+       /*
+        * td->orig_buffer is not aligned. The engine requires aligned io_us
+        * so fio aligns the address up using the formula below.
+        */
+       sd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
+                       td->o.mem_align;
+
+       /*
+        * XXX
+        * Each io_u message buffer contains recv and send messages.
+        * Aligning each of those buffers may give some performance benefit.
+        */
+       io_u_buflen = td_max_bs(td);
+
+       /* check whether io_u buffer is big enough */
+       if (io_u_buflen < IO_U_BUF_LEN) {
+               log_err(
+                       "blocksize too small to accommodate assumed maximal request/response pair size (%" PRIu64 " < %d)\n",
+                       io_u_buflen, IO_U_BUF_LEN);
+               return -1;
+       }
+
+       /*
+        * td->orig_buffer_size beside the space really consumed by io_us
+        * has paddings which can be omitted for the memory registration.
+        */
+       io_us_size = (unsigned long long)io_u_buflen *
+                       (unsigned long long)td->o.iodepth;
+
+       if ((ret = rpma_mr_reg(csd->peer, sd->orig_buffer_aligned, io_us_size,
+                       RPMA_MR_USAGE_SEND | RPMA_MR_USAGE_RECV,
+                       &sd->msg_mr))) {
+               librpma_td_verror(td, ret, "rpma_mr_reg");
+               return -1;
+       }
+
+       return 0;
+}
+
+static void server_cleanup(struct thread_data *td)
+{
+       struct librpma_fio_server_data *csd = td->io_ops_data;
+       struct server_data *sd;
+       int ret;
+
+       if (csd == NULL)
+               return;
+
+       sd = csd->server_data;
+
+       if (sd != NULL) {
+               /* rpma_mr_dereg(messaging buffer from DRAM) */
+               if ((ret = rpma_mr_dereg(&sd->msg_mr)))
+                       librpma_td_verror(td, ret, "rpma_mr_dereg");
+
+               free(sd->msgs_queued);
+               free(sd);
+       }
+
+       librpma_fio_server_cleanup(td);
+}
+
+static int prepare_connection(struct thread_data *td,
+               struct rpma_conn_req *conn_req)
+{
+       struct librpma_fio_server_data *csd = td->io_ops_data;
+       struct server_data *sd = csd->server_data;
+       int ret;
+       int i;
+
+       /* prepare buffers for flush requests */
+       sd->msg_sqe_available = td->o.iodepth;
+       for (i = 0; i < td->o.iodepth; i++) {
+               size_t offset_recv_msg = IO_U_BUFF_OFF_SERVER(i) + RECV_OFFSET;
+               if ((ret = rpma_conn_req_recv(conn_req, sd->msg_mr,
+                               offset_recv_msg, MAX_MSG_SIZE,
+                               (const void *)(uintptr_t)i))) {
+                       librpma_td_verror(td, ret, "rpma_conn_req_recv");
+                       return ret;
+               }
+       }
+
+       return 0;
+}
+
+static int server_open_file(struct thread_data *td, struct fio_file *f)
+{
+       struct librpma_fio_server_data *csd = td->io_ops_data;
+       struct rpma_conn_cfg *cfg = NULL;
+       uint16_t max_msg_num = td->o.iodepth;
+       int ret;
+
+       csd->prepare_connection = prepare_connection;
+
+       /* create a connection configuration object */
+       if ((ret = rpma_conn_cfg_new(&cfg))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_new");
+               return -1;
+       }
+
+       /*
+        * Calculate the required queue sizes where:
+        * - the send queue (SQ) has to be big enough to accommodate
+        *   all possible flush requests (SENDs)
+        * - the receive queue (RQ) has to be big enough to accommodate
+        *   all flush responses (RECVs)
+        * - the completion queue (CQ) has to be big enough to accommodate
+        *   all success and error completions (sq_size + rq_size)
+        */
+       if ((ret = rpma_conn_cfg_set_sq_size(cfg, max_msg_num))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
+               goto err_cfg_delete;
+       }
+       if ((ret = rpma_conn_cfg_set_rq_size(cfg, max_msg_num))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
+               goto err_cfg_delete;
+       }
+       if ((ret = rpma_conn_cfg_set_cq_size(cfg, max_msg_num * 2))) {
+               librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
+               goto err_cfg_delete;
+       }
+
+       ret = librpma_fio_server_open_file(td, f, cfg);
+
+err_cfg_delete:
+       (void) rpma_conn_cfg_delete(&cfg);
+
+       return ret;
+}
+
+static int server_qe_process(struct thread_data *td,
+               struct rpma_completion *cmpl)
+{
+       struct librpma_fio_server_data *csd = td->io_ops_data;
+       struct server_data *sd = csd->server_data;
+       GPSPMFlushRequest *flush_req;
+       GPSPMFlushResponse flush_resp = GPSPM_FLUSH_RESPONSE__INIT;
+       size_t flush_resp_size = 0;
+       size_t send_buff_offset;
+       size_t recv_buff_offset;
+       size_t io_u_buff_offset;
+       void *send_buff_ptr;
+       void *recv_buff_ptr;
+       void *op_ptr;
+       int msg_index;
+       int ret;
+
+       /* calculate SEND/RECV pair parameters */
+       msg_index = (int)(uintptr_t)cmpl->op_context;
+       io_u_buff_offset = IO_U_BUFF_OFF_SERVER(msg_index);
+       send_buff_offset = io_u_buff_offset + SEND_OFFSET;
+       recv_buff_offset = io_u_buff_offset + RECV_OFFSET;
+       send_buff_ptr = sd->orig_buffer_aligned + send_buff_offset;
+       recv_buff_ptr = sd->orig_buffer_aligned + recv_buff_offset;
+
+       /* unpack a flush request from the received buffer */
+       flush_req = gpspm_flush_request__unpack(NULL, cmpl->byte_len,
+                       recv_buff_ptr);
+       if (flush_req == NULL) {
+               log_err("cannot unpack the flush request buffer\n");
+               goto err_terminate;
+       }
+
+       if (IS_NOT_THE_LAST_MESSAGE(flush_req)) {
+               op_ptr = csd->ws_ptr + flush_req->offset;
+               pmem_persist(op_ptr, flush_req->length);
+       } else {
+               /*
+                * This is the last message - the client is done.
+                */
+               gpspm_flush_request__free_unpacked(flush_req, NULL);
+               td->done = true;
+               return 0;
+       }
+
+       /* initiate the next receive operation */
+       if ((ret = rpma_recv(csd->conn, sd->msg_mr, recv_buff_offset,
+                       MAX_MSG_SIZE,
+                       (const void *)(uintptr_t)msg_index))) {
+               librpma_td_verror(td, ret, "rpma_recv");
+               goto err_free_unpacked;
+       }
+
+       /* prepare a flush response and pack it to a send buffer */
+       flush_resp.op_context = flush_req->op_context;
+       flush_resp_size = gpspm_flush_response__get_packed_size(&flush_resp);
+       if (flush_resp_size > MAX_MSG_SIZE) {
+               log_err(
+                       "Size of the packed flush response is bigger than the available space of the send buffer (%"
+                       PRIu64 " > %i\n", flush_resp_size, MAX_MSG_SIZE);
+               goto err_free_unpacked;
+       }
+
+       (void) gpspm_flush_response__pack(&flush_resp, send_buff_ptr);
+
+       /* send the flush response */
+       if ((ret = rpma_send(csd->conn, sd->msg_mr, send_buff_offset,
+                       flush_resp_size, RPMA_F_COMPLETION_ALWAYS, NULL))) {
+               librpma_td_verror(td, ret, "rpma_send");
+               goto err_free_unpacked;
+       }
+       --sd->msg_sqe_available;
+
+       gpspm_flush_request__free_unpacked(flush_req, NULL);
+
+       return 0;
+
+err_free_unpacked:
+       gpspm_flush_request__free_unpacked(flush_req, NULL);
+
+err_terminate:
+       td->terminate = true;
+
+       return -1;
+}
+
+static inline int server_queue_process(struct thread_data *td)
+{
+       struct librpma_fio_server_data *csd = td->io_ops_data;
+       struct server_data *sd = csd->server_data;
+       int ret;
+       int i;
+
+       /* min(# of queue entries, # of SQ entries available) */
+       uint32_t qes_to_process = min(sd->msg_queued_nr, sd->msg_sqe_available);
+       if (qes_to_process == 0)
+               return 0;
+
+       /* process queued completions */
+       for (i = 0; i < qes_to_process; ++i) {
+               if ((ret = server_qe_process(td, &sd->msgs_queued[i])))
+                       return ret;
+       }
+
+       /* progress the queue */
+       for (i = 0; i < sd->msg_queued_nr - qes_to_process; ++i) {
+               memcpy(&sd->msgs_queued[i],
+                       &sd->msgs_queued[qes_to_process + i],
+                       sizeof(sd->msgs_queued[i]));
+       }
+
+       sd->msg_queued_nr -= qes_to_process;
+
+       return 0;
+}
+
+static int server_cmpl_process(struct thread_data *td)
+{
+       struct librpma_fio_server_data *csd = td->io_ops_data;
+       struct server_data *sd = csd->server_data;
+       struct rpma_completion *cmpl = &sd->msgs_queued[sd->msg_queued_nr];
+       int ret;
+
+       ret = rpma_conn_completion_get(csd->conn, cmpl);
+       if (ret == RPMA_E_NO_COMPLETION) {
+               /* lack of completion is not an error */
+               return 0;
+       } else if (ret != 0) {
+               librpma_td_verror(td, ret, "rpma_conn_completion_get");
+               goto err_terminate;
+       }
+
+       /* validate the completion */
+       if (cmpl->op_status != IBV_WC_SUCCESS)
+               goto err_terminate;
+
+       if (cmpl->op == RPMA_OP_RECV)
+               ++sd->msg_queued_nr;
+       else if (cmpl->op == RPMA_OP_SEND)
+               ++sd->msg_sqe_available;
+
+       return 0;
+
+err_terminate:
+       td->terminate = true;
+
+       return -1;
+}
+
+static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
+{
+       do {
+               if (server_cmpl_process(td))
+                       return FIO_Q_BUSY;
+
+               if (server_queue_process(td))
+                       return FIO_Q_BUSY;
+
+       } while (!td->done);
+
+       return FIO_Q_COMPLETED;
+}
+
+FIO_STATIC struct ioengine_ops ioengine_server = {
+       .name                   = "librpma_gpspm_server",
+       .version                = FIO_IOOPS_VERSION,
+       .init                   = server_init,
+       .post_init              = server_post_init,
+       .open_file              = server_open_file,
+       .close_file             = librpma_fio_server_close_file,
+       .queue                  = server_queue,
+       .invalidate             = librpma_fio_file_nop,
+       .cleanup                = server_cleanup,
+       .flags                  = FIO_SYNCIO,
+       .options                = librpma_fio_options,
+       .option_struct_size     = sizeof(struct librpma_fio_options_values),
+};
+
+/* register both engines */
+
+static void fio_init fio_librpma_gpspm_register(void)
+{
+       register_ioengine(&ioengine_client);
+       register_ioengine(&ioengine_server);
+}
+
+static void fio_exit fio_librpma_gpspm_unregister(void)
+{
+       unregister_ioengine(&ioengine_client);
+       unregister_ioengine(&ioengine_server);
+}
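
For orientation, the client-side flush exchange implemented above reduces to
the following round trip (a simplified sketch reusing names from the engine
code; buffer offset management, completion draining and error handling are
omitted):

	/* post a RECV for the response before sending the request */
	rpma_recv(ccd->conn, cd->msg_mr, recv_offset, MAX_MSG_SIZE, recv_ptr);

	/* pack a GPSPM flush request describing the region to persist */
	GPSPMFlushRequest req = GPSPM_FLUSH_REQUEST__INIT;
	req.offset = first_io_u->offset;
	req.length = len;
	req.op_context = last_io_u->index;	/* echoed back by the server */
	gpspm_flush_request__pack(&req, send_ptr);

	/* send it; the server pmem_persist()s the region and replies with
	 * a GPSPMFlushResponse carrying the same op_context */
	rpma_send(ccd->conn, cd->msg_mr, send_offset,
			gpspm_flush_request__get_packed_size(&req),
			RPMA_F_COMPLETION_ALWAYS, NULL);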
diff --git a/engines/librpma_gpspm_flush.pb-c.c b/engines/librpma_gpspm_flush.pb-c.c
new file mode 100644 (file)
index 0000000..3ff2475
--- /dev/null
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2020, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+/* Generated by the protocol buffer compiler. DO NOT EDIT! */
+/* Generated from: librpma_gpspm_flush.proto */
+
+/* Do not generate deprecated warnings for self */
+#ifndef PROTOBUF_C__NO_DEPRECATED
+#define PROTOBUF_C__NO_DEPRECATED
+#endif
+
+#include "librpma_gpspm_flush.pb-c.h"
+void   gpspm_flush_request__init
+                     (GPSPMFlushRequest         *message)
+{
+  static const GPSPMFlushRequest init_value = GPSPM_FLUSH_REQUEST__INIT;
+  *message = init_value;
+}
+size_t gpspm_flush_request__get_packed_size
+                     (const GPSPMFlushRequest *message)
+{
+  assert(message->base.descriptor == &gpspm_flush_request__descriptor);
+  return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message));
+}
+size_t gpspm_flush_request__pack
+                     (const GPSPMFlushRequest *message,
+                      uint8_t       *out)
+{
+  assert(message->base.descriptor == &gpspm_flush_request__descriptor);
+  return protobuf_c_message_pack ((const ProtobufCMessage*)message, out);
+}
+size_t gpspm_flush_request__pack_to_buffer
+                     (const GPSPMFlushRequest *message,
+                      ProtobufCBuffer *buffer)
+{
+  assert(message->base.descriptor == &gpspm_flush_request__descriptor);
+  return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer);
+}
+GPSPMFlushRequest *
+       gpspm_flush_request__unpack
+                     (ProtobufCAllocator  *allocator,
+                      size_t               len,
+                      const uint8_t       *data)
+{
+  return (GPSPMFlushRequest *)
+     protobuf_c_message_unpack (&gpspm_flush_request__descriptor,
+                                allocator, len, data);
+}
+void   gpspm_flush_request__free_unpacked
+                     (GPSPMFlushRequest *message,
+                      ProtobufCAllocator *allocator)
+{
+  if(!message)
+    return;
+  assert(message->base.descriptor == &gpspm_flush_request__descriptor);
+  protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
+}
+void   gpspm_flush_response__init
+                     (GPSPMFlushResponse         *message)
+{
+  static const GPSPMFlushResponse init_value = GPSPM_FLUSH_RESPONSE__INIT;
+  *message = init_value;
+}
+size_t gpspm_flush_response__get_packed_size
+                     (const GPSPMFlushResponse *message)
+{
+  assert(message->base.descriptor == &gpspm_flush_response__descriptor);
+  return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message));
+}
+size_t gpspm_flush_response__pack
+                     (const GPSPMFlushResponse *message,
+                      uint8_t       *out)
+{
+  assert(message->base.descriptor == &gpspm_flush_response__descriptor);
+  return protobuf_c_message_pack ((const ProtobufCMessage*)message, out);
+}
+size_t gpspm_flush_response__pack_to_buffer
+                     (const GPSPMFlushResponse *message,
+                      ProtobufCBuffer *buffer)
+{
+  assert(message->base.descriptor == &gpspm_flush_response__descriptor);
+  return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer);
+}
+GPSPMFlushResponse *
+       gpspm_flush_response__unpack
+                     (ProtobufCAllocator  *allocator,
+                      size_t               len,
+                      const uint8_t       *data)
+{
+  return (GPSPMFlushResponse *)
+     protobuf_c_message_unpack (&gpspm_flush_response__descriptor,
+                                allocator, len, data);
+}
+void   gpspm_flush_response__free_unpacked
+                     (GPSPMFlushResponse *message,
+                      ProtobufCAllocator *allocator)
+{
+  if(!message)
+    return;
+  assert(message->base.descriptor == &gpspm_flush_response__descriptor);
+  protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
+}
+static const ProtobufCFieldDescriptor gpspm_flush_request__field_descriptors[3] =
+{
+  {
+    "offset",
+    1,
+    PROTOBUF_C_LABEL_REQUIRED,
+    PROTOBUF_C_TYPE_FIXED64,
+    0,   /* quantifier_offset */
+    offsetof(GPSPMFlushRequest, offset),
+    NULL,
+    NULL,
+    0,             /* flags */
+    0,NULL,NULL    /* reserved1,reserved2, etc */
+  },
+  {
+    "length",
+    2,
+    PROTOBUF_C_LABEL_REQUIRED,
+    PROTOBUF_C_TYPE_FIXED64,
+    0,   /* quantifier_offset */
+    offsetof(GPSPMFlushRequest, length),
+    NULL,
+    NULL,
+    0,             /* flags */
+    0,NULL,NULL    /* reserved1,reserved2, etc */
+  },
+  {
+    "op_context",
+    3,
+    PROTOBUF_C_LABEL_REQUIRED,
+    PROTOBUF_C_TYPE_FIXED64,
+    0,   /* quantifier_offset */
+    offsetof(GPSPMFlushRequest, op_context),
+    NULL,
+    NULL,
+    0,             /* flags */
+    0,NULL,NULL    /* reserved1,reserved2, etc */
+  },
+};
+static const unsigned gpspm_flush_request__field_indices_by_name[] = {
+  1,   /* field[1] = length */
+  0,   /* field[0] = offset */
+  2,   /* field[2] = op_context */
+};
+static const ProtobufCIntRange gpspm_flush_request__number_ranges[1 + 1] =
+{
+  { 1, 0 },
+  { 0, 3 }
+};
+const ProtobufCMessageDescriptor gpspm_flush_request__descriptor =
+{
+  PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC,
+  "GPSPM_flush_request",
+  "GPSPMFlushRequest",
+  "GPSPMFlushRequest",
+  "",
+  sizeof(GPSPMFlushRequest),
+  3,
+  gpspm_flush_request__field_descriptors,
+  gpspm_flush_request__field_indices_by_name,
+  1,  gpspm_flush_request__number_ranges,
+  (ProtobufCMessageInit) gpspm_flush_request__init,
+  NULL,NULL,NULL    /* reserved[123] */
+};
+static const ProtobufCFieldDescriptor gpspm_flush_response__field_descriptors[1] =
+{
+  {
+    "op_context",
+    1,
+    PROTOBUF_C_LABEL_REQUIRED,
+    PROTOBUF_C_TYPE_FIXED64,
+    0,   /* quantifier_offset */
+    offsetof(GPSPMFlushResponse, op_context),
+    NULL,
+    NULL,
+    0,             /* flags */
+    0,NULL,NULL    /* reserved1,reserved2, etc */
+  },
+};
+static const unsigned gpspm_flush_response__field_indices_by_name[] = {
+  0,   /* field[0] = op_context */
+};
+static const ProtobufCIntRange gpspm_flush_response__number_ranges[1 + 1] =
+{
+  { 1, 0 },
+  { 0, 1 }
+};
+const ProtobufCMessageDescriptor gpspm_flush_response__descriptor =
+{
+  PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC,
+  "GPSPM_flush_response",
+  "GPSPMFlushResponse",
+  "GPSPMFlushResponse",
+  "",
+  sizeof(GPSPMFlushResponse),
+  1,
+  gpspm_flush_response__field_descriptors,
+  gpspm_flush_response__field_indices_by_name,
+  1,  gpspm_flush_response__number_ranges,
+  (ProtobufCMessageInit) gpspm_flush_response__init,
+  NULL,NULL,NULL    /* reserved[123] */
+};
diff --git a/engines/librpma_gpspm_flush.pb-c.h b/engines/librpma_gpspm_flush.pb-c.h
new file mode 100644 (file)
index 0000000..ad475a9
--- /dev/null
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2020, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+/* Generated by the protocol buffer compiler. DO NOT EDIT! */
+/* Generated from: librpma_gpspm_flush.proto */
+
+#ifndef PROTOBUF_C_GPSPM_5fflush_2eproto__INCLUDED
+#define PROTOBUF_C_GPSPM_5fflush_2eproto__INCLUDED
+
+#include <protobuf-c/protobuf-c.h>
+
+PROTOBUF_C__BEGIN_DECLS
+
+#if PROTOBUF_C_VERSION_NUMBER < 1000000
+# error This file was generated by a newer version of protoc-c which is incompatible with your libprotobuf-c headers. Please update your headers.
+#elif 1003003 < PROTOBUF_C_MIN_COMPILER_VERSION
+# error This file was generated by an older version of protoc-c which is incompatible with your libprotobuf-c headers. Please regenerate this file with a newer version of protoc-c.
+#endif
+
+
+typedef struct _GPSPMFlushRequest GPSPMFlushRequest;
+typedef struct _GPSPMFlushResponse GPSPMFlushResponse;
+
+
+/* --- enums --- */
+
+
+/* --- messages --- */
+
+struct  _GPSPMFlushRequest
+{
+  ProtobufCMessage base;
+  uint64_t offset;
+  uint64_t length;
+  uint64_t op_context;
+};
+#define GPSPM_FLUSH_REQUEST__INIT \
+ { PROTOBUF_C_MESSAGE_INIT (&gpspm_flush_request__descriptor) \
+    , 0, 0, 0 }
+
+
+struct  _GPSPMFlushResponse
+{
+  ProtobufCMessage base;
+  uint64_t op_context;
+};
+#define GPSPM_FLUSH_RESPONSE__INIT \
+ { PROTOBUF_C_MESSAGE_INIT (&gpspm_flush_response__descriptor) \
+    , 0 }
+
+
+/* GPSPMFlushRequest methods */
+void   gpspm_flush_request__init
+                     (GPSPMFlushRequest         *message);
+size_t gpspm_flush_request__get_packed_size
+                     (const GPSPMFlushRequest   *message);
+size_t gpspm_flush_request__pack
+                     (const GPSPMFlushRequest   *message,
+                      uint8_t             *out);
+size_t gpspm_flush_request__pack_to_buffer
+                     (const GPSPMFlushRequest   *message,
+                      ProtobufCBuffer     *buffer);
+GPSPMFlushRequest *
+       gpspm_flush_request__unpack
+                     (ProtobufCAllocator  *allocator,
+                      size_t               len,
+                      const uint8_t       *data);
+void   gpspm_flush_request__free_unpacked
+                     (GPSPMFlushRequest *message,
+                      ProtobufCAllocator *allocator);
+/* GPSPMFlushResponse methods */
+void   gpspm_flush_response__init
+                     (GPSPMFlushResponse         *message);
+size_t gpspm_flush_response__get_packed_size
+                     (const GPSPMFlushResponse   *message);
+size_t gpspm_flush_response__pack
+                     (const GPSPMFlushResponse   *message,
+                      uint8_t             *out);
+size_t gpspm_flush_response__pack_to_buffer
+                     (const GPSPMFlushResponse   *message,
+                      ProtobufCBuffer     *buffer);
+GPSPMFlushResponse *
+       gpspm_flush_response__unpack
+                     (ProtobufCAllocator  *allocator,
+                      size_t               len,
+                      const uint8_t       *data);
+void   gpspm_flush_response__free_unpacked
+                     (GPSPMFlushResponse *message,
+                      ProtobufCAllocator *allocator);
+/* --- per-message closures --- */
+
+typedef void (*GPSPMFlushRequest_Closure)
+                 (const GPSPMFlushRequest *message,
+                  void *closure_data);
+typedef void (*GPSPMFlushResponse_Closure)
+                 (const GPSPMFlushResponse *message,
+                  void *closure_data);
+
+/* --- services --- */
+
+
+/* --- descriptors --- */
+
+extern const ProtobufCMessageDescriptor gpspm_flush_request__descriptor;
+extern const ProtobufCMessageDescriptor gpspm_flush_response__descriptor;
+
+PROTOBUF_C__END_DECLS
+
+
+#endif  /* PROTOBUF_C_GPSPM_5fflush_2eproto__INCLUDED */
diff --git a/engines/librpma_gpspm_flush.proto b/engines/librpma_gpspm_flush.proto
new file mode 100644 (file)
index 0000000..91765a7
--- /dev/null
@@ -0,0 +1,15 @@
+syntax = "proto2";
+
+message GPSPM_flush_request {
+    /* an offset of a region to be flushed within its memory registration */
+    required fixed64 offset = 1;
+    /* a length of a region to be flushed */
+    required fixed64 length = 2;
+    /* a user-defined operation context */
+    required fixed64 op_context = 3;
+}
+
+message GPSPM_flush_response {
+    /* the operation context of a completed request */
+    required fixed64 op_context = 1;
+}
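
The librpma_gpspm_flush.pb-c.[ch] sources above are generated from this
definition and checked in; assuming protobuf-c and its protoc-c compiler are
installed, they can be regenerated with:

	cd engines
	protoc-c --c_out=. librpma_gpspm_flush.proto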
index 42ee48ff02b3f6371027ab4cbcbc304aefbbea10..23e62c4c45e3c5bf15a9d646636d1cfe3cf49b0c 100644 (file)
@@ -38,6 +38,7 @@ struct rados_options {
        char *pool_name;
        char *client_name;
        int busy_poll;
+       int touch_objects;
 };
 
 static struct fio_option options[] = {
@@ -78,6 +79,16 @@ static struct fio_option options[] = {
                .category = FIO_OPT_C_ENGINE,
                .group    = FIO_OPT_G_RBD,
        },
+       {
+               .name     = "touch_objects",
+               .lname    = "touch objects on start",
+               .type     = FIO_OPT_BOOL,
+               .help     = "Touch (create) objects on start",
+               .off1     = offsetof(struct rados_options, touch_objects),
+               .def      = "1",
+               .category = FIO_OPT_C_ENGINE,
+               .group    = FIO_OPT_G_RBD,
+       },
        {
                .name     = NULL,
        },
@@ -194,9 +205,11 @@ static int _fio_rados_connect(struct thread_data *td)
        for (i = 0; i < td->o.nr_files; i++) {
                f = td->files[i];
                f->real_file_size = file_size;
-               r = rados_write(rados->io_ctx, f->file_name, "", 0, 0);
-               if (r < 0) {
-                       goto failed_obj_create;
+               if (o->touch_objects) {
+                       r = rados_write(rados->io_ctx, f->file_name, "", 0, 0);
+                       if (r < 0) {
+                               goto failed_obj_create;
+                       }
                }
        }
        return 0;
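
With touch_objects=0 the object pre-creation pass can now be skipped, e.g.
(a sketch; the pool and client names are placeholders, and additional cluster
options may be required on a given setup):

	fio --name=radostest --ioengine=rados --clientname=admin \
		--pool=rbd --touch_objects=0 --rw=randwrite --bs=4k \
		--size=64m --nrfiles=16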
diff --git a/eta.c b/eta.c
index 978430120b850f8b6712a98aeae90e0101040238..db13cb18103226028ca324cf5acdfb2d03fe4507 100644 (file)
--- a/eta.c
+++ b/eta.c
@@ -331,7 +331,7 @@ static void calc_rate(int unified_rw_rep, unsigned long mtime,
                else
                        this_rate = 0;
 
-               if (unified_rw_rep) {
+               if (unified_rw_rep == UNIFIED_MIXED) {
                        rate[i] = 0;
                        rate[0] += this_rate;
                } else
@@ -356,7 +356,7 @@ static void calc_iops(int unified_rw_rep, unsigned long mtime,
                else
                        this_iops = 0;
 
-               if (unified_rw_rep) {
+               if (unified_rw_rep == UNIFIED_MIXED) {
                        iops[i] = 0;
                        iops[0] += this_iops;
                } else
diff --git a/examples/dfs.fio b/examples/dfs.fio
new file mode 100644 (file)
index 0000000..5de887d
--- /dev/null
@@ -0,0 +1,33 @@
+[global]
+ioengine=dfs
+pool=${POOL}
+cont=${CONT}
+filename_format=fio-test.$jobnum
+
+cpus_allowed_policy=split
+group_reporting=1
+time_based=0
+percentile_list=99.0:99.9:99.99:99.999:99.9999:100
+disable_slat=1
+disable_clat=1
+
+bs=1M
+size=100G
+iodepth=16
+numjobs=16
+
+[daos-seqwrite]
+rw=write
+stonewall
+
+[daos-seqread]
+rw=read
+stonewall
+
+[daos-randwrite]
+rw=randwrite
+stonewall
+
+[daos-randread]
+rw=randread
+stonewall
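
The job expands ${POOL} and ${CONT} from the environment, so a run looks
like this (the UUIDs are placeholders):

	POOL=<pool-uuid> CONT=<cont-uuid> fio examples/dfs.fio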
diff --git a/examples/filedelete-ioengine.fio b/examples/filedelete-ioengine.fio
new file mode 100644 (file)
index 0000000..3c0028f
--- /dev/null
@@ -0,0 +1,18 @@
+# Example filedelete job
+
+# The 'filedelete' engine only does 'unlink(filename)'; files are never open()ed.
+# 'filesize' must be set so that files are created at the setup stage.
+# 'unlink' should be left at 0, since the files are deleted during the measured run.
+# Options that disable completion latency output, such as 'disable_clat' and 'gtod_reduce', must not be set.
+[global]
+ioengine=filedelete
+filesize=4k
+nrfiles=200
+unlink=0
+
+[t0]
+[t1]
+[t2]
+[t3]
+[t4]
+[t5]
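
Each [tN] section inherits the global options, so this job runs six jobs
that each create 200 small files at setup and then unlink them during the
measured phase; it can be run as-is:

	fio examples/filedelete-ioengine.fio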
diff --git a/examples/librpma_apm-client.fio b/examples/librpma_apm-client.fio
new file mode 100644 (file)
index 0000000..82a5d20
--- /dev/null
@@ -0,0 +1,24 @@
+# Example of the librpma_apm_client job
+
+[global]
+ioengine=librpma_apm_client
+create_serialize=0 # (required) forces specific initiation sequence
+serverip=[serverip] # IP address the server is listening on
+port=7204 # port(s) the server will listen on, <port; port + numjobs - 1> will be used
+thread
+
+# The client will get a remote memory region description after establishing
+# a connection.
+
+[client]
+numjobs=1 # number of parallel connections
+group_reporting=1
+sync=1 # 1 is the best for latency measurements, 0 for bandwidth
+iodepth=2 # total number of ious
+iodepth_batch_submit=1 # number of ious to be submitted at once
+rw=write # read/write/randread/randwrite/readwrite/rw
+rwmixread=70 # % of a mixed workload that should be reads
+blocksize=4KiB
+ramp_time=15s # gives some time to stabilize the workload
+time_based
+runtime=60s # run the workload for the specified period of time
diff --git a/examples/librpma_apm-server.fio b/examples/librpma_apm-server.fio
new file mode 100644 (file)
index 0000000..062b521
--- /dev/null
@@ -0,0 +1,26 @@
+# Example of the librpma_apm_server job
+
+[global]
+ioengine=librpma_apm_server
+create_serialize=0 # (required) forces specific initiation sequence
+kb_base=1000 # turn on the straight units handling (non-compatibility mode)
+serverip=[serverip] # IP address to listen on
+port=7204 # port(s) the server jobs will listen on, ports <port; port + numjobs - 1> will be used
+thread
+
+# The server side spawns one thread for each expected connection from
+# the client-side, opens and registers the range dedicated for this thread
+# (a workspace) from the provided memory.
+# Each of the server threads accepts a connection on the dedicated port
+# (different for each working thread), waits for the connection to end,
+# and then closes itself.
+
+[server]
+# set to 1 (true) ONLY when Direct Write to PMem from the remote host is possible
+# (https://pmem.io/rpma/documentation/basic-direct-write-to-pmem.html)
+direct_write_to_pmem=0
+
+numjobs=1 # number of expected incoming connections
+size=100MiB # size of workspace for a single connection
+filename=malloc # device dax or an existing fsdax file or "malloc" for allocation from DRAM
+# filename=/dev/dax1.0
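
Both librpma example jobs ship with a [serverip] placeholder; a minimal way
to run an APM pair is to substitute a real address first (192.168.0.1 below
is an assumption):

	# on the persistent-memory target
	sed -i 's/\[serverip\]/192.168.0.1/' examples/librpma_apm-server.fio
	fio examples/librpma_apm-server.fio

	# on the initiator, pointing at the same address
	sed -i 's/\[serverip\]/192.168.0.1/' examples/librpma_apm-client.fio
	fio examples/librpma_apm-client.fio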
diff --git a/examples/librpma_gpspm-client.fio b/examples/librpma_gpspm-client.fio
new file mode 100644 (file)
index 0000000..843382d
--- /dev/null
@@ -0,0 +1,23 @@
+# Example of the librpma_gpspm_client job
+
+[global]
+ioengine=librpma_gpspm_client
+create_serialize=0 # (required) forces specific initiation sequence
+serverip=[serverip] # IP address the server is listening on
+port=7204 # port(s) the server will listen on, <port; port + numjobs - 1> will be used
+thread
+
+# The client will get a remote memory region description after establishing
+# a connection.
+
+[client]
+numjobs=1 # number of parallel connections
+group_reporting=1
+sync=1 # 1 is the best for latency measurements, 0 for bandwidth
+iodepth=2 # total number of ious
+iodepth_batch_submit=1 # number of ious to be submitted at once
+rw=write # write/randwrite
+blocksize=4KiB
+ramp_time=15s # gives some time to stabilize the workload
+time_based
+runtime=60s # run the workload for the specified period of time
diff --git a/examples/librpma_gpspm-server.fio b/examples/librpma_gpspm-server.fio
new file mode 100644 (file)
index 0000000..d618f2d
--- /dev/null
@@ -0,0 +1,31 @@
+# Example of the librpma_gpspm_server job
+
+[global]
+ioengine=librpma_gpspm_server
+create_serialize=0 # (required) forces specific initiation sequence
+kb_base=1000 # turn on the straight units handling (non-compatibility mode)
+serverip=[serverip] # IP address to listen on
+port=7204 # port(s) the server jobs will listen on, ports <port; port + numjobs - 1> will be used
+thread
+
+# The server side spawns one thread for each expected connection from
+# the client-side, opens and registers the range dedicated for this thread
+# (a workspace) from the provided memory.
+# Each of the server threads accepts a connection on the dedicated port
+# (different for each working thread), accepts and executes flush
+# requests, and sends back a flush response for each request.
+# When the client is done, it sends a termination notice to the server's thread.
+
+[server]
+# set to 1 (true) ONLY when Direct Write to PMem from the remote host is possible
+# (https://pmem.io/rpma/documentation/basic-direct-write-to-pmem.html)
+direct_write_to_pmem=0
+numjobs=1 # number of expected incoming connections
+iodepth=2 # number of parallel GPSPM requests
+size=100MiB # size of workspace for a single connection
+filename=malloc # device dax or an existing fsdax file or "malloc" for allocation from DRAM
+# filename=/dev/dax1.0
+
+# The client terminates the server when it has finished its job.
+time_based
+runtime=365d
index 661d4c2fa083e2c4b948b628a344c6f294b28ebe..e664f8b42f795f4d03675e437870382a0243b13b 100644 (file)
@@ -1118,6 +1118,13 @@ int setup_files(struct thread_data *td)
        if (o->read_iolog_file)
                goto done;
 
+       if (td->o.zone_mode == ZONE_MODE_ZBD) {
+               err = zbd_init_files(td);
+               if (err)
+                       goto err_out;
+       }
+       zbd_recalc_options_with_zone_granularity(td);
+
        /*
         * check sizes. if the files/devices do not exist and the size
         * isn't passed to fio, abort.
@@ -1395,16 +1402,17 @@ int setup_files(struct thread_data *td)
        }
 
 done:
-       if (o->create_only)
-               td->done = 1;
-
-       td_restore_runstate(td, old_state);
-
        if (td->o.zone_mode == ZONE_MODE_ZBD) {
                err = zbd_setup_files(td);
                if (err)
                        goto err_out;
        }
+
+       if (o->create_only)
+               td->done = 1;
+
+       td_restore_runstate(td, old_state);
+
        return 0;
 
 err_offset:
diff --git a/fio.1 b/fio.1
index accc6a329a12fa98b65cd7fcd443b06faaef8fcf..18dc156ad027471b8d11083158c21688c610d93d 100644 (file)
--- a/fio.1
+++ b/fio.1
@@ -348,6 +348,9 @@ us or usec means microseconds
 .PD
 .RE
 .P
+`z' suffix specifies that the value is measured in zones.
+The value is recalculated once the block device's zone size becomes known.
+.P
 If the option accepts an upper and lower range, use a colon ':' or
 minus '\-' to separate such values. See \fIirange\fR parameter type.
 If the lower value specified happens to be larger than the upper value
@@ -687,7 +690,8 @@ of how that would work.
 .TP
 .BI ioscheduler \fR=\fPstr
 Attempt to switch the device hosting the file to the specified I/O scheduler
-before running.
+before running. If the file is a pipe or a character device, or if the
+device hosting the file cannot be determined, this option is ignored.
 .TP
 .BI create_serialize \fR=\fPbool
 If true, serialize the file creation for the jobs. This may be handy to
@@ -783,7 +787,7 @@ If not specified it defaults to the zone size. If the target device is a zoned
 block device, the zone capacity is obtained from the device information and this
 option is ignored.
 .TP
-.BI zoneskip \fR=\fPint
+.BI zoneskip \fR=\fPint[z]
 For \fBzonemode\fR=strided, the number of bytes to skip after \fBzonesize\fR
 bytes of data have been transferred.
 
@@ -921,10 +925,32 @@ behaves in a similar fashion, except it sends the same offset 8 number of
 times before generating a new offset.
 .RE
 .TP
-.BI unified_rw_reporting \fR=\fPbool
+.BI unified_rw_reporting \fR=\fPstr
 Fio normally reports statistics on a per data direction basis, meaning that
-reads, writes, and trims are accounted and reported separately. If this
-option is set fio sums the results and report them as "mixed" instead.
+reads, writes, and trims are accounted and reported separately. This option
+determines whether fio reports the results normally, summed together, or as
+both options.
+Accepted values are:
+.RS
+.TP
+.B none
+Normal statistics reporting.
+.TP
+.B mixed
+Statistics are summed per data direction and reported together.
+.TP
+.B both
+Statistics are reported normally, followed by the mixed statistics.
+.TP
+.B 0
+Backward-compatible alias for \fBnone\fR.
+.TP
+.B 1
+Backward-compatible alias for \fBmixed\fR.
+.TP
+.B 2
+Alias for \fBboth\fR.
+.RE
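+.P
+For example, \fBunified_rw_reporting=both\fR prints the usual per-direction
+statistics first, followed by an additional "mixed" summary that sums reads,
+writes, and trims together.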
 .TP
 .BI randrepeat \fR=\fPbool
 Seed the random number generator used for random I/O patterns in a
@@ -1033,7 +1059,7 @@ The values are all relative to each other, and no absolute meaning
 should be associated with them.
 .RE
 .TP
-.BI offset \fR=\fPint
+.BI offset \fR=\fPint[%|z]
 Start I/O at the provided offset in the file, given as either a fixed size in
 bytes or a percentage. If a percentage is given, the generated offset will be
 aligned to the minimum \fBblocksize\fR or to the value of \fBoffset_align\fR if
@@ -1048,7 +1074,7 @@ If set to non-zero value, the byte offset generated by a percentage \fBoffset\fR
 is aligned upwards to this value. Defaults to 0 meaning that a percentage
 offset is aligned to the minimum block size.
 .TP
-.BI offset_increment \fR=\fPint
+.BI offset_increment \fR=\fPint[%|z]
 If this is provided, then the real offset becomes `\fBoffset\fR + \fBoffset_increment\fR
 * thread_number', where the thread number is a counter that starts at 0 and
 is incremented for each sub-job (i.e. when \fBnumjobs\fR option is
@@ -1570,7 +1596,7 @@ Pin the specified amount of memory with \fBmlock\fR\|(2). Can be used to
 simulate a smaller amount of memory. The amount specified is per worker.
 .SS "I/O size"
 .TP
-.BI size \fR=\fPint
+.BI size \fR=\fPint[%|z]
 The total size of file I/O for each thread of this job. Fio will run until
 this many bytes has been transferred, unless runtime is limited by other options
 (such as \fBruntime\fR, for instance, or increased/decreased by \fBio_size\fR).
@@ -1585,7 +1611,7 @@ given, fio will use 20% of the full size of the given files or devices.
 Can be combined with \fBoffset\fR to constrain the start and end range
 that I/O will be done within.
 .TP
-.BI io_size \fR=\fPint "\fR,\fB io_limit" \fR=\fPint
+.BI io_size \fR=\fPint[%|z] "\fR,\fB io_limit" \fR=\fPint[%|z]
 Normally fio operates within the region set by \fBsize\fR, which means
 that the \fBsize\fR option sets both the region and size of I/O to be
 performed. Sometimes that is not what you want. With this option, it is
@@ -1822,6 +1848,11 @@ Simply do stat() and do no I/O to the file. You need to set 'filesize'
 and 'nrfiles', so that files will be created.
 This engine is to measure file lookup and meta data access.
 .TP
+.B filedelete
+Simply delete the files by unlink() and do no I/O to them. You need to set
+'filesize' and 'nrfiles' so that the files will be created.
+This engine is used to measure file deletion.
+.TP
 .B libpmem
 Read and write using mmap I/O to a file on a filesystem
 mounted with DAX on a persistent memory device through the PMDK
@@ -1853,6 +1884,10 @@ GPUDirect Storage-supported filesystem. This engine performs
 I/O without transferring buffers between user-space and the kernel,
 unless \fBverify\fR is set or \fBcuda_io\fR is \fBposix\fR. \fBiomem\fR must
 not be \fBcudamalloc\fR. This ioengine defines engine specific options.
+.TP
+.B dfs
+I/O engine supporting asynchronous read and write operations to the DAOS File
+System (DFS) via libdfs.
 .SS "I/O engine specific parameters"
 In addition, there are some parameters which are only valid when a specific
 \fBioengine\fR is in use. These are used identically to normal parameters,
@@ -1949,7 +1984,7 @@ The TCP or UDP port to bind to or connect to. If this is used with
 this will be the starting port number since fio will use a range of
 ports.
 .TP
-.BI (rdma)port
+.BI (rdma, librpma_*)port
 The port to use for RDMA-CM communication. This should be the same
 value on the client and the server side.
 .TP
@@ -1958,6 +1993,12 @@ The hostname or IP address to use for TCP, UDP or RDMA-CM based I/O.
 If the job is a TCP listener or UDP reader, the hostname is not used
 and must be omitted unless it is a valid UDP multicast address.
 .TP
+.BI (librpma_*)serverip \fR=\fPstr
+The IP address to be used for RDMA-CM based I/O.
+.TP
+.BI (librpma_*_server)direct_write_to_pmem \fR=\fPbool
+Set to 1 only when Direct Write to PMem from the remote host is possible. Otherwise, set to 0.
+.TP
 .BI (netsplice,net)interface \fR=\fPstr
 The IP address of the network interface used to send or receive UDP
 multicast.
@@ -2052,6 +2093,11 @@ by default.
 Poll store instead of waiting for completion. Usually this provides better
 throughput at the cost of higher (up to 100%) CPU utilization.
 .TP
+.BI (rados)touch_objects \fR=\fPbool
+During initialization, touch (create if they do not exist) all objects (files).
+Touching all objects affects Ceph caches and likely impacts test results.
+Enabled by default.
+.TP
 .BI (http)http_host \fR=\fPstr
 Hostname to connect to. For S3, this could be the bucket name. Default
 is \fBlocalhost\fR
@@ -2206,6 +2252,20 @@ from RAM to GPU after a read. \fBverify\fR does not affect
 the use of cudaMemcpy.
 .RE
 .RE
+.TP
+.BI (dfs)pool
+Specify the UUID of the DAOS pool to connect to.
+.TP
+.BI (dfs)cont
+Specify the UUID of the DAOS container to open.
+.TP
+.BI (dfs)chunk_size
+Specify a different chunk size (in bytes) for the dfs file.
+The DAOS container's chunk size is used by default.
+.TP
+.BI (dfs)object_class
+Specify a different object class for the dfs file.
+The DAOS container's object class is used by default.
 .SS "I/O depth"
 .TP
 .BI iodepth \fR=\fPint
@@ -2400,10 +2460,11 @@ Used with \fBlatency_target\fR. If false (default), fio will find the highest
 queue depth that meets \fBlatency_target\fR and exit. If true, fio will continue
 running and try to meet \fBlatency_target\fR by adjusting queue depth.
 .TP
-.BI max_latency \fR=\fPtime
+.BI max_latency \fR=\fPtime[,time][,time]
 If set, fio will exit the job with an ETIMEDOUT error if it exceeds this
 maximum latency. When the unit is omitted, the value is interpreted in
-microseconds.
+microseconds. Comma-separated values may be specified for reads, writes,
+and trims as described in \fBblocksize\fR.
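+For example, \fBmax_latency=250ms,500ms\fR makes the job exit with ETIMEDOUT
+once a read exceeds 250ms or a write exceeds 500ms of completion latency.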
 .TP
 .BI rate_cycle \fR=\fPint
 Average bandwidth for \fBrate\fR and \fBrate_min\fR over this number
index f85da6e082c8d7cef315f644a2be90cf853ce82f..e3f483a700130a5fc6d490809b5c78bdd1934cd3 100644 (file)
--- a/gettime.c
+++ b/gettime.c
@@ -671,12 +671,21 @@ static int clock_cmp(const void *p1, const void *p2)
 int fio_monotonic_clocktest(int debug)
 {
        struct clock_thread *cthreads;
-       unsigned int nr_cpus = cpus_online();
+       unsigned int seen_cpus, nr_cpus = cpus_online();
        struct clock_entry *entries;
        unsigned long nr_entries, tentries, failed = 0;
        struct clock_entry *prev, *this;
        uint32_t seq = 0;
        unsigned int i;
+       os_cpu_mask_t mask;
+
+#ifdef CONFIG_PTHREAD_GETAFFINITY
+       fio_get_thread_affinity(mask);
+#else
+       memset(&mask, 0, sizeof(mask));
+       for (i = 0; i < nr_cpus; i++)
+               fio_cpu_set(&mask, i);
+#endif
 
        if (debug) {
                log_info("cs: reliable_tsc: %s\n", tsc_reliable ? "yes" : "no");
@@ -703,25 +712,31 @@ int fio_monotonic_clocktest(int debug)
        if (debug)
                log_info("cs: Testing %u CPUs\n", nr_cpus);
 
+       seen_cpus = 0;
        for (i = 0; i < nr_cpus; i++) {
                struct clock_thread *t = &cthreads[i];
 
+               if (!fio_cpu_isset(&mask, i))
+                       continue;
                t->cpu = i;
                t->debug = debug;
                t->seq = &seq;
                t->nr_entries = nr_entries;
-               t->entries = &entries[i * nr_entries];
+               t->entries = &entries[seen_cpus * nr_entries];
                __fio_sem_init(&t->lock, FIO_SEM_LOCKED);
                if (pthread_create(&t->thread, NULL, clock_thread_fn, t)) {
                        failed++;
                        nr_cpus = i;
                        break;
                }
+               seen_cpus++;
        }
 
        for (i = 0; i < nr_cpus; i++) {
                struct clock_thread *t = &cthreads[i];
 
+               if (!fio_cpu_isset(&mask, i))
+                       continue;
                fio_sem_up(&t->lock);
        }
 
@@ -729,6 +744,8 @@ int fio_monotonic_clocktest(int debug)
                struct clock_thread *t = &cthreads[i];
                void *ret;
 
+               if (!fio_cpu_isset(&mask, i))
+                       continue;
                pthread_join(t->thread, &ret);
                if (ret)
                        failed++;
@@ -742,6 +759,7 @@ int fio_monotonic_clocktest(int debug)
                goto err;
        }
 
+       tentries = nr_entries * seen_cpus;
        qsort(entries, tentries, sizeof(struct clock_entry), clock_cmp);
 
        /* silence silly gcc */
diff --git a/init.c b/init.c
index eea6e54692b177036dce001134f8ed1baeb62ca8..60c7cff405d70d8e974545026e2fe659b512b7ed 100644 (file)
--- a/init.c
+++ b/init.c
@@ -448,19 +448,6 @@ static void dump_opt_list(struct thread_data *td)
        }
 }
 
-static void fio_dump_options_free(struct thread_data *td)
-{
-       while (!flist_empty(&td->opt_list)) {
-               struct print_option *p;
-
-               p = flist_first_entry(&td->opt_list, struct print_option, list);
-               flist_del_init(&p->list);
-               free(p->name);
-               free(p->value);
-               free(p);
-       }
-}
-
 static void copy_opt_list(struct thread_data *dst, struct thread_data *src)
 {
        struct flist_head *entry;
@@ -646,6 +633,11 @@ static int fixup_options(struct thread_data *td)
                ret |= 1;
        }
 
+       if (o->zone_mode == ZONE_MODE_ZBD && !o->create_serialize) {
+               log_err("fio: --zonemode=zbd and --create_serialize=0 are not compatible.\n");
+               ret |= 1;
+       }
+
        if (o->zone_mode == ZONE_MODE_STRIDED && !o->zone_size) {
                log_err("fio: --zonesize must be specified when using --zonemode=strided.\n");
                ret |= 1;
@@ -961,7 +953,9 @@ static int fixup_options(struct thread_data *td)
        /*
         * Fix these up to be nsec internally
         */
-       o->max_latency *= 1000ULL;
+       for_each_rw_ddir(ddir)
+               o->max_latency[ddir] *= 1000ULL;
+
        o->latency_target *= 1000ULL;
 
        return ret;
diff --git a/io_u.c b/io_u.c
index 00a219c2e85922906dd859583ef6f1ae31ad29c0..b421a579bd0a1aaa594692a21731a2774de77cea 100644 (file)
--- a/io_u.c
+++ b/io_u.c
@@ -1389,11 +1389,16 @@ static long set_io_u_file(struct thread_data *td, struct io_u *io_u)
        return 0;
 }
 
-static void lat_fatal(struct thread_data *td, struct io_completion_data *icd,
+static void lat_fatal(struct thread_data *td, struct io_u *io_u, struct io_completion_data *icd,
                      unsigned long long tnsec, unsigned long long max_nsec)
 {
-       if (!td->error)
-               log_err("fio: latency of %llu nsec exceeds specified max (%llu nsec)\n", tnsec, max_nsec);
+       if (!td->error) {
+               log_err("fio: latency of %llu nsec exceeds specified max (%llu nsec): %s %s %llu %llu\n",
+                                       tnsec, max_nsec,
+                                       io_u->file->file_name,
+                                       io_ddir_name(io_u->ddir),
+                                       io_u->offset, io_u->buflen);
+       }
        td_verror(td, ETIMEDOUT, "max latency exceeded");
        icd->error = ETIMEDOUT;
 }
@@ -1888,11 +1893,13 @@ static void account_io_completion(struct thread_data *td, struct io_u *io_u,
                                icd->error = ops->io_u_lat(td, tnsec);
                }
 
-               if (td->o.max_latency && tnsec > td->o.max_latency)
-                       lat_fatal(td, icd, tnsec, td->o.max_latency);
-               if (td->o.latency_target && tnsec > td->o.latency_target) {
-                       if (lat_target_failed(td))
-                               lat_fatal(td, icd, tnsec, td->o.latency_target);
+               if (ddir_rw(idx)) {
+                       if (td->o.max_latency[idx] && tnsec > td->o.max_latency[idx])
+                               lat_fatal(td, io_u, icd, tnsec, td->o.max_latency[idx]);
+                       if (td->o.latency_target && tnsec > td->o.latency_target) {
+                               if (lat_target_failed(td))
+                                       lat_fatal(td, io_u, icd, tnsec, td->o.latency_target);
+                       }
                }
        }
 
diff --git a/iolog.c b/iolog.c
index fa40c8572664a4e14d1e53d39eae3547dc43b672..cf264916a9ecd83f7b3079bd186573441dba65fb 100644 (file)
--- a/iolog.c
+++ b/iolog.c
@@ -607,12 +607,11 @@ static int open_socket(const char *path)
 /*
  * open iolog, check version, and call appropriate parser
  */
-static bool init_iolog_read(struct thread_data *td)
+static bool init_iolog_read(struct thread_data *td, char *fname)
 {
-       char buffer[256], *p, *fname;
+       char buffer[256], *p;
        FILE *f = NULL;
 
-       fname = get_name_by_idx(td->o.read_iolog_file, td->subjob_number);
        dprint(FD_IO, "iolog: name=%s\n", fname);
 
        if (is_socket(fname)) {
@@ -701,15 +700,16 @@ bool init_iolog(struct thread_data *td)
 
        if (td->o.read_iolog_file) {
                int need_swap;
+               char *fname = get_name_by_idx(td->o.read_iolog_file, td->subjob_number);
 
                /*
                 * Check if it's a blktrace file and load that if possible.
                 * Otherwise assume it's a normal log file and load that.
                 */
-               if (is_blktrace(td->o.read_iolog_file, &need_swap))
-                       ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
+               if (is_blktrace(fname, &need_swap))
+                       ret = load_blktrace(td, fname, need_swap);
                else
-                       ret = init_iolog_read(td);
+                       ret = init_iolog_read(td, fname);
        } else if (td->o.write_iolog_file)
                ret = init_iolog_write(td);
        else
index 647748963193db4db016b2befde7fa2ec4f3a758..15a16229ef9a5d02524cb478a74ba080ecc911e0 100644 (file)
@@ -141,6 +141,10 @@ static const struct opt_group fio_opt_cat_groups[] = {
                .name   = "RDMA I/O engine", /* rdma */
                .mask   = FIO_OPT_G_RDMA,
        },
+       {
+               .name   = "librpma I/O engines", /* librpma_apm && librpma_gpspm */
+               .mask   = FIO_OPT_G_LIBRPMA,
+       },
        {
                .name   = "libaio I/O engine", /* libaio */
                .mask   = FIO_OPT_G_LIBAIO,
@@ -177,6 +181,10 @@ static const struct opt_group fio_opt_cat_groups[] = {
                .name   = "libcufile I/O engine", /* libcufile */
                .mask   = FIO_OPT_G_LIBCUFILE,
        },
+       {
+               .name   = "DAOS File System (dfs) I/O engine", /* dfs */
+               .mask   = FIO_OPT_G_DFS,
+       },
        {
                .name   = NULL,
        },
index d2f1ceb391c34fd6101cf3e4b6a867b078b178e6..ff74862968e4335a1bb064d499c08441a54b16ac 100644 (file)
@@ -52,6 +52,7 @@ enum opt_category_group {
        __FIO_OPT_G_E4DEFRAG,
        __FIO_OPT_G_NETIO,
        __FIO_OPT_G_RDMA,
+       __FIO_OPT_G_LIBRPMA,
        __FIO_OPT_G_LIBAIO,
        __FIO_OPT_G_ACT,
        __FIO_OPT_G_LATPROF,
@@ -68,6 +69,7 @@ enum opt_category_group {
        __FIO_OPT_G_FILESTAT,
        __FIO_OPT_G_NR,
        __FIO_OPT_G_LIBCUFILE,
+       __FIO_OPT_G_DFS,
 
        FIO_OPT_G_RATE          = (1ULL << __FIO_OPT_G_RATE),
        FIO_OPT_G_ZONE          = (1ULL << __FIO_OPT_G_ZONE),
@@ -94,6 +96,7 @@ enum opt_category_group {
        FIO_OPT_G_E4DEFRAG      = (1ULL << __FIO_OPT_G_E4DEFRAG),
        FIO_OPT_G_NETIO         = (1ULL << __FIO_OPT_G_NETIO),
        FIO_OPT_G_RDMA          = (1ULL << __FIO_OPT_G_RDMA),
+       FIO_OPT_G_LIBRPMA       = (1ULL << __FIO_OPT_G_LIBRPMA),
        FIO_OPT_G_LIBAIO        = (1ULL << __FIO_OPT_G_LIBAIO),
        FIO_OPT_G_ACT           = (1ULL << __FIO_OPT_G_ACT),
        FIO_OPT_G_LATPROF       = (1ULL << __FIO_OPT_G_LATPROF),
@@ -110,6 +113,7 @@ enum opt_category_group {
        FIO_OPT_G_IOURING       = (1ULL << __FIO_OPT_G_IOURING),
        FIO_OPT_G_FILESTAT      = (1ULL << __FIO_OPT_G_FILESTAT),
        FIO_OPT_G_LIBCUFILE     = (1ULL << __FIO_OPT_G_LIBCUFILE),
+       FIO_OPT_G_DFS           = (1ULL << __FIO_OPT_G_DFS),
 };
 
 extern const struct opt_group *opt_group_from_mask(uint64_t *mask);
index e62e0cfb35413a3f59b8c8d3d141a40d6c44f8b7..ddabaa82d240202df131c0a81a0ae761cbcded66 100644 (file)
--- a/options.c
+++ b/options.c
@@ -1471,8 +1471,13 @@ static int str_offset_cb(void *data, unsigned long long *__val)
        if (parse_is_percent(v)) {
                td->o.start_offset = 0;
                td->o.start_offset_percent = -1ULL - v;
+               td->o.start_offset_nz = 0;
                dprint(FD_PARSE, "SET start_offset_percent %d\n",
                                        td->o.start_offset_percent);
+       } else if (parse_is_zone(v)) {
+               td->o.start_offset = 0;
+               td->o.start_offset_percent = 0;
+               td->o.start_offset_nz = v - ZONE_BASE_VAL;
        } else
                td->o.start_offset = v;
 
@@ -1487,8 +1492,13 @@ static int str_offset_increment_cb(void *data, unsigned long long *__val)
        if (parse_is_percent(v)) {
                td->o.offset_increment = 0;
                td->o.offset_increment_percent = -1ULL - v;
+               td->o.offset_increment_nz = 0;
                dprint(FD_PARSE, "SET offset_increment_percent %d\n",
                                        td->o.offset_increment_percent);
+       } else if (parse_is_zone(v)) {
+               td->o.offset_increment = 0;
+               td->o.offset_increment_percent = 0;
+               td->o.offset_increment_nz = v - ZONE_BASE_VAL;
        } else
                td->o.offset_increment = v;
 
@@ -1505,6 +1515,10 @@ static int str_size_cb(void *data, unsigned long long *__val)
                td->o.size_percent = -1ULL - v;
                dprint(FD_PARSE, "SET size_percent %d\n",
                                        td->o.size_percent);
+       } else if (parse_is_zone(v)) {
+               td->o.size = 0;
+               td->o.size_percent = 0;
+               td->o.size_nz = v - ZONE_BASE_VAL;
        } else
                td->o.size = v;
 
@@ -1525,12 +1539,30 @@ static int str_io_size_cb(void *data, unsigned long long *__val)
                }
                dprint(FD_PARSE, "SET io_size_percent %d\n",
                                        td->o.io_size_percent);
+       } else if (parse_is_zone(v)) {
+               td->o.io_size = 0;
+               td->o.io_size_percent = 0;
+               td->o.io_size_nz = v - ZONE_BASE_VAL;
        } else
                td->o.io_size = v;
 
        return 0;
 }
 
+static int str_zoneskip_cb(void *data, unsigned long long *__val)
+{
+       struct thread_data *td = cb_data_to_td(data);
+       unsigned long long v = *__val;
+
+       if (parse_is_zone(v)) {
+               td->o.zone_skip = 0;
+               td->o.zone_skip_nz = v - ZONE_BASE_VAL;
+       } else
+               td->o.zone_skip = v;
+
+       return 0;
+}
+
 static int str_write_bw_log_cb(void *data, const char *str)
 {
        struct thread_data *td = cb_data_to_td(data);
@@ -1913,6 +1945,16 @@ struct fio_option fio_options[FIO_MAX_OPTS] = {
                            .help = "RDMA IO engine",
                          },
 #endif
+#ifdef CONFIG_LIBRPMA_APM
+                         { .ival = "librpma_apm",
+                           .help = "librpma IO engine in APM mode",
+                         },
+#endif
+#ifdef CONFIG_LIBRPMA_GPSPM
+                         { .ival = "librpma_gpspm",
+                           .help = "librpma IO engine in GPSPM mode",
+                         },
+#endif
 #ifdef CONFIG_LINUX_EXT4_MOVE_EXTENT
                          { .ival = "e4defrag",
                            .help = "ext4 defrag engine",
@@ -1979,6 +2021,11 @@ struct fio_option fio_options[FIO_MAX_OPTS] = {
                          { .ival = "nbd",
                            .help = "Network Block Device (NBD) IO engine"
                          },
+#ifdef CONFIG_DFS
+                         { .ival = "dfs",
+                           .help = "DAOS File System (dfs) IO engine",
+                         },
+#endif
                },
        },
        {
@@ -2081,11 +2128,10 @@ struct fio_option fio_options[FIO_MAX_OPTS] = {
        {
                .name   = "size",
                .lname  = "Size",
-               .type   = FIO_OPT_STR_VAL,
+               .type   = FIO_OPT_STR_VAL_ZONE,
                .cb     = str_size_cb,
                .off1   = offsetof(struct thread_options, size),
                .help   = "Total size of device or files",
-               .interval = 1024 * 1024,
                .category = FIO_OPT_C_IO,
                .group  = FIO_OPT_G_INVALID,
        },
@@ -2093,11 +2139,10 @@ struct fio_option fio_options[FIO_MAX_OPTS] = {
                .name   = "io_size",
                .alias  = "io_limit",
                .lname  = "IO Size",
-               .type   = FIO_OPT_STR_VAL,
+               .type   = FIO_OPT_STR_VAL_ZONE,
                .cb     = str_io_size_cb,
                .off1   = offsetof(struct thread_options, io_size),
                .help   = "Total size of I/O to be performed",
-               .interval = 1024 * 1024,
                .category = FIO_OPT_C_IO,
                .group  = FIO_OPT_G_INVALID,
        },
@@ -2138,12 +2183,11 @@ struct fio_option fio_options[FIO_MAX_OPTS] = {
                .name   = "offset",
                .lname  = "IO offset",
                .alias  = "fileoffset",
-               .type   = FIO_OPT_STR_VAL,
+               .type   = FIO_OPT_STR_VAL_ZONE,
                .cb     = str_offset_cb,
                .off1   = offsetof(struct thread_options, start_offset),
                .help   = "Start IO from this offset",
                .def    = "0",
-               .interval = 1024 * 1024,
                .category = FIO_OPT_C_IO,
                .group  = FIO_OPT_G_INVALID,
        },
@@ -2161,14 +2205,13 @@ struct fio_option fio_options[FIO_MAX_OPTS] = {
        {
                .name   = "offset_increment",
                .lname  = "IO offset increment",
-               .type   = FIO_OPT_STR_VAL,
+               .type   = FIO_OPT_STR_VAL_ZONE,
                .cb     = str_offset_increment_cb,
                .off1   = offsetof(struct thread_options, offset_increment),
                .help   = "What is the increment from one offset to the next",
                .parent = "offset",
                .hide   = 1,
                .def    = "0",
-               .interval = 1024 * 1024,
                .category = FIO_OPT_C_IO,
                .group  = FIO_OPT_G_INVALID,
        },
@@ -3404,11 +3447,11 @@ struct fio_option fio_options[FIO_MAX_OPTS] = {
        {
                .name   = "zoneskip",
                .lname  = "Zone skip",
-               .type   = FIO_OPT_STR_VAL,
+               .type   = FIO_OPT_STR_VAL_ZONE,
+               .cb     = str_zoneskip_cb,
                .off1   = offsetof(struct thread_options, zone_skip),
                .help   = "Space between IO zones",
                .def    = "0",
-               .interval = 1024 * 1024,
                .category = FIO_OPT_C_IO,
                .group  = FIO_OPT_G_ZONE,
        },
@@ -3728,8 +3771,10 @@ struct fio_option fio_options[FIO_MAX_OPTS] = {
        {
                .name   = "max_latency",
                .lname  = "Max Latency (usec)",
-               .type   = FIO_OPT_STR_VAL_TIME,
-               .off1   = offsetof(struct thread_options, max_latency),
+               .type   = FIO_OPT_ULL,
+               .off1   = offsetof(struct thread_options, max_latency[DDIR_READ]),
+               .off2   = offsetof(struct thread_options, max_latency[DDIR_WRITE]),
+               .off3   = offsetof(struct thread_options, max_latency[DDIR_TRIM]),
                .help   = "Maximum tolerated IO latency (usec)",
                .is_time = 1,
                .category = FIO_OPT_C_IO,
@@ -4588,12 +4633,39 @@ struct fio_option fio_options[FIO_MAX_OPTS] = {
        {
                .name   = "unified_rw_reporting",
                .lname  = "Unified RW Reporting",
-               .type   = FIO_OPT_BOOL,
+               .type   = FIO_OPT_STR,
                .off1   = offsetof(struct thread_options, unified_rw_rep),
                .help   = "Unify reporting across data direction",
-               .def    = "0",
+               .def    = "none",
                .category = FIO_OPT_C_GENERAL,
                .group  = FIO_OPT_G_INVALID,
+               .posval = {
+                         { .ival = "none",
+                           .oval = UNIFIED_SPLIT,
+                           .help = "Normal statistics reporting",
+                         },
+                         { .ival = "mixed",
+                           .oval = UNIFIED_MIXED,
+                           .help = "Statistics are summed per data direction and reported together",
+                         },
+                         { .ival = "both",
+                           .oval = UNIFIED_BOTH,
+                           .help = "Statistics are reported normally, followed by the mixed statistics"
+                         },
+                         /* Compatibility with former boolean values */
+                         { .ival = "0",
+                           .oval = UNIFIED_SPLIT,
+                           .help = "Alias for 'none'",
+                         },
+                         { .ival = "1",
+                           .oval = UNIFIED_MIXED,
+                           .help = "Alias for 'mixed'",
+                         },
+                         { .ival = "2",
+                           .oval = UNIFIED_BOTH,
+                           .help = "Alias for 'both'",
+                         },
+               },
        },
        {
                .name   = "continue_on_error",
@@ -5426,6 +5498,19 @@ void fio_options_free(struct thread_data *td)
        }
 }
 
+void fio_dump_options_free(struct thread_data *td)
+{
+       while (!flist_empty(&td->opt_list)) {
+               struct print_option *p;
+
+               p = flist_first_entry(&td->opt_list, struct print_option, list);
+               flist_del_init(&p->list);
+               free(p->name);
+               free(p->value);
+               free(p);
+       }
+}
+
 struct fio_option *fio_option_find(const char *name)
 {
        return find_option(fio_options, name);
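Taken together, the options.c changes can be exercised from a job file. A
hypothetical example (device path and zone layout are assumptions) combining the
new tri-state unified_rw_reporting with the zone-count values now accepted by the
FIO_OPT_STR_VAL_ZONE options:

    ; sketch only: requires a zoned device for the 'z' suffixes to be meaningful
    [global]
    filename=/dev/nvme0n1
    zonemode=zbd
    direct=1

    [zjob]
    rw=write
    ; offset/size/io_size below are in units of whole zones
    offset=1z
    size=2z
    io_size=3z
    ; report per-direction stats followed by the mixed summary
    unified_rw_reporting=both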
index 5276f31e6818673a338fcbc3ef18997263b0abd0..df80fd9864bdd3f18d84e22c9aee937f45eab8aa 100644 (file)
--- a/options.h
+++ b/options.h
@@ -16,6 +16,7 @@ void add_opt_posval(const char *, const char *, const char *);
 void del_opt_posval(const char *, const char *);
 struct thread_data;
 void fio_options_free(struct thread_data *);
+void fio_dump_options_free(struct thread_data *);
 char *get_next_str(char **ptr);
 int get_max_str_idx(char *input);
 char* get_name_by_idx(char *input, int index);
index 5562b0da93a67bd2949032e54e914ce85ae7c380..ea8d79221c06454168d39857a994ea2e88e83ba5 100644 (file)
--- a/os/os-linux.h
+++ b/os/os-linux.h
@@ -74,6 +74,9 @@ typedef cpu_set_t os_cpu_mask_t;
        sched_getaffinity((pid), (ptr))
 #endif
 
+#define fio_get_thread_affinity(mask)  \
+       pthread_getaffinity_np(pthread_self(), sizeof(mask), &(mask))
+
 #define fio_cpu_clear(mask, cpu)       (void) CPU_CLR((cpu), (mask))
 #define fio_cpu_set(mask, cpu)         (void) CPU_SET((cpu), (mask))
 #define fio_cpu_isset(mask, cpu)       (CPU_ISSET((cpu), (mask)) != 0)
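The new fio_get_thread_affinity() wrapper mirrors the process-level helper above
but queries the calling thread via pthread_getaffinity_np(), which returns 0 on
success. A minimal standalone sketch of the expanded call (error handling reduced
to a bail-out):

    #define _GNU_SOURCE
    #include <pthread.h>
    #include <sched.h>
    #include <stdio.h>

    int main(void)
    {
            cpu_set_t mask;

            /* fill 'mask' with the CPUs the calling thread may run on */
            if (pthread_getaffinity_np(pthread_self(), sizeof(mask), &mask))
                    return 1;

            for (int cpu = 0; cpu < CPU_SETSIZE; cpu++)
                    if (CPU_ISSET(cpu, &mask))
                            printf("allowed: cpu %d\n", cpu);
            return 0;
    }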
diff --git a/parse.c b/parse.c
index 44bf950768d9b51bcc3e43ba30c2564614a6e4c6..45f4f2d3dd6db800f4f8e11a168c8ef36e788d3b 100644 (file)
--- a/parse.c
+++ b/parse.c
@@ -37,6 +37,7 @@ static const char *opt_type_names[] = {
        "OPT_BOOL",
        "OPT_FLOAT_LIST",
        "OPT_STR_SET",
+       "OPT_STR_VAL_ZONE",
        "OPT_DEPRECATED",
        "OPT_SOFT_DEPRECATED",
        "OPT_UNSUPPORTED",
@@ -599,9 +600,35 @@ static int __handle_option(const struct fio_option *o, const char *ptr,
                fallthrough;
        case FIO_OPT_ULL:
        case FIO_OPT_INT:
-       case FIO_OPT_STR_VAL: {
+       case FIO_OPT_STR_VAL:
+       case FIO_OPT_STR_VAL_ZONE:
+       {
                fio_opt_str_val_fn *fn = o->cb;
                char tmp[128], *p;
+               size_t len = strlen(ptr);
+
+               if (len > 0 && ptr[len - 1] == 'z') {
+                       if (o->type == FIO_OPT_STR_VAL_ZONE) {
+                               char *ep;
+                               unsigned long long val;
+
+                               errno = 0;
+                               val = strtoul(ptr, &ep, 10);
+                               if (errno == 0 && ep != ptr && *ep == 'z') {
+                                       ull = ZONE_BASE_VAL + (uint32_t)val;
+                                       ret = 0;
+                                       goto store_option_value;
+                               } else {
+                                       log_err("%s: unexpected zone value '%s'\n",
+                                               o->name, ptr);
+                                       return 1;
+                               }
+                       } else {
+                               log_err("%s: 'z' suffix isn't applicable\n",
+                                       o->name);
+                               return 1;
+                       }
+               }
 
                if (!is_time && o->is_time)
                        is_time = o->is_time;
@@ -655,6 +682,7 @@ static int __handle_option(const struct fio_option *o, const char *ptr,
                        }
                }
 
+store_option_value:
                if (fn)
                        ret = fn(data, &ull);
                else {
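The new branch above accepts values such as "10z", and only for options typed
FIO_OPT_STR_VAL_ZONE: strtoul() peels the number off, the 'z' is verified as the
next character, and the count is stored offset by ZONE_BASE_VAL so later code can
distinguish zone counts from plain byte values. A standalone sketch of the same
decoding, using the ZONE_BASE_VAL definition added to parse.h:

    #include <errno.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>

    #define ZONE_BASE_VAL ((-1ULL >> 1) + 1)

    /* decode "<n>z" into ZONE_BASE_VAL + n; nonzero return means no match */
    static int parse_zone_suffix(const char *ptr, unsigned long long *out)
    {
            char *ep;
            unsigned long long val;

            errno = 0;
            val = strtoul(ptr, &ep, 10);
            if (errno == 0 && ep != ptr && *ep == 'z') {
                    *out = ZONE_BASE_VAL + (uint32_t)val;
                    return 0;
            }
            return 1;
    }

    int main(void)
    {
            unsigned long long v;

            if (!parse_zone_suffix("10z", &v))
                    printf("zones=%u\n", (unsigned)(v - ZONE_BASE_VAL)); /* 10 */
            return 0;
    }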
diff --git a/parse.h b/parse.h
index e6663ed484ed343b096ebc33a28a52560f642aea..d68484eaf0c65572352222297162ebd0d20cf7e7 100644 (file)
--- a/parse.h
+++ b/parse.h
@@ -21,6 +21,7 @@ enum fio_opt_type {
        FIO_OPT_BOOL,
        FIO_OPT_FLOAT_LIST,
        FIO_OPT_STR_SET,
+       FIO_OPT_STR_VAL_ZONE,
        FIO_OPT_DEPRECATED,
        FIO_OPT_SOFT_DEPRECATED,
        FIO_OPT_UNSUPPORTED,    /* keep this last */
@@ -130,12 +131,18 @@ static inline void *td_var(void *to, const struct fio_option *o,
 
 static inline int parse_is_percent(unsigned long long val)
 {
-       return val <= -1ULL && val >= (-1ULL - 100ULL);
+       return val >= -101ULL;
 }
 
+#define ZONE_BASE_VAL ((-1ULL >> 1) + 1)
 static inline int parse_is_percent_uncapped(unsigned long long val)
 {
-       return (long long)val <= -1;
+       return ZONE_BASE_VAL + -1U < val;
+}
+
+static inline int parse_is_zone(unsigned long long val)
+{
+       return (val - ZONE_BASE_VAL) <= -1U;
 }
 
 struct print_option {
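With ZONE_BASE_VAL at 2^63, the unsigned long long value space is partitioned:
plain byte values stay below 2^63, zone counts occupy [2^63, 2^63 + UINT_MAX], and
percentages sit at the very top as -1ULL - pct, which is why parse_is_percent()
reduces to val >= -101ULL. A small self-check of the arithmetic, reusing the
definitions above:

    #include <assert.h>

    #define ZONE_BASE_VAL ((-1ULL >> 1) + 1)        /* 2^63 */

    static int parse_is_zone(unsigned long long val)
    {
            return (val - ZONE_BASE_VAL) <= -1U;
    }

    int main(void)
    {
            unsigned long long ten_zones = ZONE_BASE_VAL + 10;  /* "10z" */
            unsigned long long pct_50 = -1ULL - 50;             /* "50%" */

            assert(parse_is_zone(ten_zones));
            assert(ten_zones - ZONE_BASE_VAL == 10);            /* decode */
            assert(!parse_is_zone(1ULL << 40));     /* ~1T plain byte value */
            assert(!parse_is_zone(pct_50));         /* percentages live higher */
            return 0;
    }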
index 1b65297ec25feb166e3f39e6b01b7c081a96fa42..8daefbabfeae93f6c260c0b74eec6fedc7bbd973 100644 (file)
--- a/server.c
+++ b/server.c
@@ -1909,7 +1909,7 @@ static int fio_append_iolog_gz(struct sk_entry *first, struct io_log *log)
                        break;
                }
                flist_add_tail(&entry->list, &first->next);
-       } while (ret != Z_STREAM_END);
+       }
 
        ret = deflateEnd(&stream);
        if (ret == Z_OK)
index 9256d44c5001c2daa72232407650ecef36d4fdfa..b45b319ba2013e56ecc87056e8e0b6ca1aaec6e2 100644 (file)
--- a/server.h
+++ b/server.h
@@ -48,7 +48,7 @@ struct fio_net_cmd_reply {
 };
 
 enum {
-       FIO_SERVER_VER                  = 87,
+       FIO_SERVER_VER                  = 89,
 
        FIO_SERVER_MAX_FRAGMENT_PDU     = 1024,
        FIO_SERVER_MAX_CMD_MB           = 2048,
diff --git a/stat.c b/stat.c
index b723795301cc4c9bb3e71ad31450b6a6af97366b..b7222f465f63a4c785ce080f523c89e2ec6c91f1 100644 (file)
--- a/stat.c
+++ b/stat.c
@@ -282,6 +282,46 @@ bool calc_lat(struct io_stat *is, unsigned long long *min,
        return true;
 }
 
+void show_mixed_group_stats(struct group_run_stats *rs, struct buf_output *out) 
+{
+       char *io, *agg, *min, *max;
+       char *ioalt, *aggalt, *minalt, *maxalt;
+       uint64_t io_mix = 0, agg_mix = 0, min_mix = -1, max_mix = 0, min_run = -1, max_run = 0;
+       int i;
+       const int i2p = is_power_of_2(rs->kb_base);
+
+       for (i = 0; i < DDIR_RWDIR_CNT; i++) {
+               if (!rs->max_run[i])
+                       continue;
+               io_mix += rs->iobytes[i];
+               agg_mix += rs->agg[i];
+               min_mix = min_mix < rs->min_bw[i] ? min_mix : rs->min_bw[i];
+               max_mix = max_mix > rs->max_bw[i] ? max_mix : rs->max_bw[i];
+               min_run = min_run < rs->min_run[i] ? min_run : rs->min_run[i];
+               max_run = max_run > rs->max_run[i] ? max_run : rs->max_run[i];
+       }
+       io = num2str(io_mix, rs->sig_figs, 1, i2p, N2S_BYTE);
+       ioalt = num2str(io_mix, rs->sig_figs, 1, !i2p, N2S_BYTE);
+       agg = num2str(agg_mix, rs->sig_figs, 1, i2p, rs->unit_base);
+       aggalt = num2str(agg_mix, rs->sig_figs, 1, !i2p, rs->unit_base);
+       min = num2str(min_mix, rs->sig_figs, 1, i2p, rs->unit_base);
+       minalt = num2str(min_mix, rs->sig_figs, 1, !i2p, rs->unit_base);
+       max = num2str(max_mix, rs->sig_figs, 1, i2p, rs->unit_base);
+       maxalt = num2str(max_mix, rs->sig_figs, 1, !i2p, rs->unit_base);
+       log_buf(out, "  MIXED: bw=%s (%s), %s-%s (%s-%s), io=%s (%s), run=%llu-%llumsec\n",
+                       agg, aggalt, min, max, minalt, maxalt, io, ioalt,
+                       (unsigned long long) min_run,
+                       (unsigned long long) max_run);
+       free(io);
+       free(agg);
+       free(min);
+       free(max);
+       free(ioalt);
+       free(aggalt);
+       free(minalt);
+       free(maxalt);
+}
+
 void show_group_stats(struct group_run_stats *rs, struct buf_output *out)
 {
        char *io, *agg, *min, *max;
@@ -306,7 +346,7 @@ void show_group_stats(struct group_run_stats *rs, struct buf_output *out)
                max = num2str(rs->max_bw[i], rs->sig_figs, 1, i2p, rs->unit_base);
                maxalt = num2str(rs->max_bw[i], rs->sig_figs, 1, !i2p, rs->unit_base);
                log_buf(out, "%s: bw=%s (%s), %s-%s (%s-%s), io=%s (%s), run=%llu-%llumsec\n",
-                               rs->unified_rw_rep ? "  MIXED" : str[i],
+                               (rs->unified_rw_rep == UNIFIED_MIXED) ? "  MIXED" : str[i],
                                agg, aggalt, min, max, minalt, maxalt, io, ioalt,
                                (unsigned long long) rs->min_run[i],
                                (unsigned long long) rs->max_run[i]);
@@ -320,6 +360,10 @@ void show_group_stats(struct group_run_stats *rs, struct buf_output *out)
                free(minalt);
                free(maxalt);
        }
+       
+       /* Need to aggregate statistics to show mixed values */
+       if (rs->unified_rw_rep == UNIFIED_BOTH) 
+               show_mixed_group_stats(rs, out);
 }
 
 void stat_calc_dist(uint64_t *map, unsigned long total, double *io_u_dist)
@@ -426,6 +470,168 @@ static double convert_agg_kbytes_percent(struct group_run_stats *rs, int ddir, i
        return p_of_agg;
 }
 
+static void show_mixed_ddir_status(struct group_run_stats *rs, struct thread_stat *ts,
+                            struct buf_output *out)
+{
+       unsigned long runt;
+       unsigned long long min, max, bw, iops;
+       double mean, dev;
+       char *io_p, *bw_p, *bw_p_alt, *iops_p, *post_st = NULL;
+       struct thread_stat *ts_lcl;
+
+       int i2p;
+       int ddir = 0, i;
+
+       /* Handle aggregation of Reads (ddir = 0), Writes (ddir = 1), and Trims (ddir = 2) */
+       ts_lcl = malloc(sizeof(struct thread_stat));
+       memset((void *)ts_lcl, 0, sizeof(struct thread_stat));
+       ts_lcl->unified_rw_rep = UNIFIED_MIXED;               /* calculate mixed stats  */
+       for (i = 0; i < DDIR_RWDIR_CNT; i++) {
+               ts_lcl->clat_stat[i].min_val = ULONG_MAX;
+               ts_lcl->slat_stat[i].min_val = ULONG_MAX;
+               ts_lcl->lat_stat[i].min_val = ULONG_MAX;
+               ts_lcl->bw_stat[i].min_val = ULONG_MAX;
+               ts_lcl->iops_stat[i].min_val = ULONG_MAX;
+               ts_lcl->clat_high_prio_stat[i].min_val = ULONG_MAX;
+               ts_lcl->clat_low_prio_stat[i].min_val = ULONG_MAX;
+       }
+       ts_lcl->sync_stat.min_val = ULONG_MAX;
+
+       sum_thread_stats(ts_lcl, ts, 1);
+
+       assert(ddir_rw(ddir));
+
+       if (!ts_lcl->runtime[ddir])
+               return;
+
+       i2p = is_power_of_2(rs->kb_base);
+       runt = ts_lcl->runtime[ddir];
+
+       bw = (1000 * ts_lcl->io_bytes[ddir]) / runt;
+       io_p = num2str(ts_lcl->io_bytes[ddir], ts->sig_figs, 1, i2p, N2S_BYTE);
+       bw_p = num2str(bw, ts->sig_figs, 1, i2p, ts->unit_base);
+       bw_p_alt = num2str(bw, ts->sig_figs, 1, !i2p, ts->unit_base);
+
+       iops = (1000 * ts_lcl->total_io_u[ddir]) / runt;
+       iops_p = num2str(iops, ts->sig_figs, 1, 0, N2S_NONE);
+
+       log_buf(out, "  mixed: IOPS=%s, BW=%s (%s)(%s/%llumsec)%s\n",
+                       iops_p, bw_p, bw_p_alt, io_p,
+                       (unsigned long long) ts_lcl->runtime[ddir],
+                       post_st ? : "");
+
+       free(post_st);
+       free(io_p);
+       free(bw_p);
+       free(bw_p_alt);
+       free(iops_p);
+
+       if (calc_lat(&ts_lcl->slat_stat[ddir], &min, &max, &mean, &dev))
+               display_lat("slat", min, max, mean, dev, out);
+       if (calc_lat(&ts_lcl->clat_stat[ddir], &min, &max, &mean, &dev))
+               display_lat("clat", min, max, mean, dev, out);
+       if (calc_lat(&ts_lcl->lat_stat[ddir], &min, &max, &mean, &dev))
+               display_lat(" lat", min, max, mean, dev, out);
+       if (calc_lat(&ts_lcl->clat_high_prio_stat[ddir], &min, &max, &mean, &dev)) {
+               display_lat(ts_lcl->lat_percentiles ? "high prio_lat" : "high prio_clat",
+                               min, max, mean, dev, out);
+               if (calc_lat(&ts_lcl->clat_low_prio_stat[ddir], &min, &max, &mean, &dev))
+                       display_lat(ts_lcl->lat_percentiles ? "low prio_lat" : "low prio_clat",
+                                       min, max, mean, dev, out);
+       }
+
+       if (ts->slat_percentiles && ts_lcl->slat_stat[ddir].samples > 0)
+               show_clat_percentiles(ts_lcl->io_u_plat[FIO_SLAT][ddir],
+                               ts_lcl->slat_stat[ddir].samples,
+                               ts->percentile_list,
+                               ts->percentile_precision, "slat", out);
+       if (ts->clat_percentiles && ts_lcl->clat_stat[ddir].samples > 0)
+               show_clat_percentiles(ts_lcl->io_u_plat[FIO_CLAT][ddir],
+                               ts_lcl->clat_stat[ddir].samples,
+                               ts->percentile_list,
+                               ts->percentile_precision, "clat", out);
+       if (ts->lat_percentiles && ts_lcl->lat_stat[ddir].samples > 0)
+               show_clat_percentiles(ts_lcl->io_u_plat[FIO_LAT][ddir],
+                               ts_lcl->lat_stat[ddir].samples,
+                               ts->percentile_list,
+                               ts->percentile_precision, "lat", out);
+
+       if (ts->clat_percentiles || ts->lat_percentiles) {
+               const char *name = ts->lat_percentiles ? "lat" : "clat";
+               char prio_name[32];
+               uint64_t samples;
+
+               if (ts->lat_percentiles)
+                       samples = ts_lcl->lat_stat[ddir].samples;
+               else
+                       samples = ts_lcl->clat_stat[ddir].samples;
+
+               /* Only print this if some high and low priority stats were collected */
+               if (ts_lcl->clat_high_prio_stat[ddir].samples > 0 &&
+                               ts_lcl->clat_low_prio_stat[ddir].samples > 0)
+               {
+                       sprintf(prio_name, "high prio (%.2f%%) %s",
+                                       100. * (double) ts_lcl->clat_high_prio_stat[ddir].samples / (double) samples,
+                                       name);
+                       show_clat_percentiles(ts_lcl->io_u_plat_high_prio[ddir],
+                                       ts_lcl->clat_high_prio_stat[ddir].samples,
+                                       ts->percentile_list,
+                                       ts->percentile_precision, prio_name, out);
+
+                       sprintf(prio_name, "low prio (%.2f%%) %s",
+                                       100. * (double) ts_lcl->clat_low_prio_stat[ddir].samples / (double) samples,
+                                       name);
+                       show_clat_percentiles(ts_lcl->io_u_plat_low_prio[ddir],
+                                       ts_lcl->clat_low_prio_stat[ddir].samples,
+                                       ts->percentile_list,
+                                       ts->percentile_precision, prio_name, out);
+               }
+       }
+
+       if (calc_lat(&ts_lcl->bw_stat[ddir], &min, &max, &mean, &dev)) {
+               double p_of_agg = 100.0, fkb_base = (double)rs->kb_base;
+               const char *bw_str;
+
+               if ((rs->unit_base == 1) && i2p)
+                       bw_str = "Kibit";
+               else if (rs->unit_base == 1)
+                       bw_str = "kbit";
+               else if (i2p)
+                       bw_str = "KiB";
+               else
+                       bw_str = "kB";
+
+               p_of_agg = convert_agg_kbytes_percent(rs, ddir, mean);
+
+               if (rs->unit_base == 1) {
+                       min *= 8.0;
+                       max *= 8.0;
+                       mean *= 8.0;
+                       dev *= 8.0;
+               }
+
+               if (mean > fkb_base * fkb_base) {
+                       min /= fkb_base;
+                       max /= fkb_base;
+                       mean /= fkb_base;
+                       dev /= fkb_base;
+                       bw_str = (rs->unit_base == 1 ? "Mibit" : "MiB");
+               }
+
+               log_buf(out, "   bw (%5s/s): min=%5llu, max=%5llu, per=%3.2f%%, "
+                       "avg=%5.02f, stdev=%5.02f, samples=%" PRIu64 "\n",
+                       bw_str, min, max, p_of_agg, mean, dev,
+                       (&ts_lcl->bw_stat[ddir])->samples);
+       }
+       if (calc_lat(&ts_lcl->iops_stat[ddir], &min, &max, &mean, &dev)) {
+               log_buf(out, "   iops        : min=%5llu, max=%5llu, "
+                       "avg=%5.02f, stdev=%5.02f, samples=%" PRIu64 "\n",
+                       min, max, mean, dev, (&ts_lcl->iops_stat[ddir])->samples);
+       }
+
+       free(ts_lcl);
+}
+
 static void show_ddir_status(struct group_run_stats *rs, struct thread_stat *ts,
                             int ddir, struct buf_output *out)
 {
@@ -477,7 +683,7 @@ static void show_ddir_status(struct group_run_stats *rs, struct thread_stat *ts,
        }
 
        log_buf(out, "  %s: IOPS=%s, BW=%s (%s)(%s/%llumsec)%s\n",
-                       rs->unified_rw_rep ? "mixed" : io_ddir_name(ddir),
+                       (ts->unified_rw_rep == UNIFIED_MIXED) ? "mixed" : io_ddir_name(ddir),
                        iops_p, bw_p, bw_p_alt, io_p,
                        (unsigned long long) ts->runtime[ddir],
                        post_st ? : "");
@@ -1083,6 +1289,9 @@ static void show_thread_status_normal(struct thread_stat *ts,
                        show_ddir_status(rs, ts, ddir, out);
        }
 
+       if (ts->unified_rw_rep == UNIFIED_BOTH)
+               show_mixed_ddir_status(rs, ts, out);
+
        show_latencies(ts, out);
 
        if (ts->sync_stat.samples)
@@ -1205,7 +1414,7 @@ static void show_ddir_status_terse(struct thread_stat *ts,
                                        &minv);
        else
                len = 0;
-
+       
        for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) {
                if (i >= len) {
                        log_buf(out, ";0%%=0");
@@ -1249,6 +1458,40 @@ static void show_ddir_status_terse(struct thread_stat *ts,
        }
 }
 
+static void show_mixed_ddir_status_terse(struct thread_stat *ts,
+                                  struct group_run_stats *rs,
+                                  int ver, struct buf_output *out)
+{
+       struct thread_stat *ts_lcl;
+       int i;
+
+       /* Handle aggregation of Reads (ddir = 0), Writes (ddir = 1), and Trims (ddir = 2) */
+       ts_lcl = malloc(sizeof(struct thread_stat));
+       memset((void *)ts_lcl, 0, sizeof(struct thread_stat));
+       ts_lcl->unified_rw_rep = UNIFIED_MIXED;               /* calculate mixed stats  */
+       for (i = 0; i < DDIR_RWDIR_CNT; i++) {
+               ts_lcl->clat_stat[i].min_val = ULONG_MAX;
+               ts_lcl->slat_stat[i].min_val = ULONG_MAX;
+               ts_lcl->lat_stat[i].min_val = ULONG_MAX;
+               ts_lcl->bw_stat[i].min_val = ULONG_MAX;
+               ts_lcl->iops_stat[i].min_val = ULONG_MAX;
+               ts_lcl->clat_high_prio_stat[i].min_val = ULONG_MAX;
+               ts_lcl->clat_low_prio_stat[i].min_val = ULONG_MAX;
+       }
+       ts_lcl->sync_stat.min_val = ULONG_MAX;
+       ts_lcl->lat_percentiles = ts->lat_percentiles;
+       ts_lcl->clat_percentiles = ts->clat_percentiles;
+       ts_lcl->slat_percentiles = ts->slat_percentiles;
+       ts_lcl->percentile_precision = ts->percentile_precision;                
+       memcpy(ts_lcl->percentile_list, ts->percentile_list, sizeof(ts->percentile_list));
+       
+       sum_thread_stats(ts_lcl, ts, 1);
+
+       /* add the aggregated stats to json parent */
+       show_ddir_status_terse(ts_lcl, rs, DDIR_READ, ver, out);
+       free(ts_lcl);
+}
+
 static struct json_object *add_ddir_lat_json(struct thread_stat *ts, uint32_t percentiles,
                struct io_stat *lat_stat, uint64_t *io_u_plat)
 {
@@ -1310,12 +1553,12 @@ static void add_ddir_status_json(struct thread_stat *ts,
 
        assert(ddir_rw(ddir) || ddir_sync(ddir));
 
-       if (ts->unified_rw_rep && ddir != DDIR_READ)
+       if ((ts->unified_rw_rep == UNIFIED_MIXED) && ddir != DDIR_READ)
                return;
 
        dir_object = json_create_object();
        json_object_add_value_object(parent,
-               ts->unified_rw_rep ? "mixed" : io_ddir_name(ddir), dir_object);
+               (ts->unified_rw_rep == UNIFIED_MIXED) ? "mixed" : io_ddir_name(ddir), dir_object);
 
        if (ddir_rw(ddir)) {
                bw_bytes = 0;
@@ -1418,6 +1661,39 @@ static void add_ddir_status_json(struct thread_stat *ts,
        }
 }
 
+static void add_mixed_ddir_status_json(struct thread_stat *ts,
+               struct group_run_stats *rs, struct json_object *parent)
+{
+       struct thread_stat *ts_lcl;
+       int i;
+
+       /* Handle aggregation of Reads (ddir = 0), Writes (ddir = 1), and Trims (ddir = 2) */
+       ts_lcl = malloc(sizeof(struct thread_stat));
+       memset((void *)ts_lcl, 0, sizeof(struct thread_stat));
+       ts_lcl->unified_rw_rep = UNIFIED_MIXED;               /* calculate mixed stats  */
+       for (i = 0; i < DDIR_RWDIR_CNT; i++) {
+               ts_lcl->clat_stat[i].min_val = ULONG_MAX;
+               ts_lcl->slat_stat[i].min_val = ULONG_MAX;
+               ts_lcl->lat_stat[i].min_val = ULONG_MAX;
+               ts_lcl->bw_stat[i].min_val = ULONG_MAX;
+               ts_lcl->iops_stat[i].min_val = ULONG_MAX;
+               ts_lcl->clat_high_prio_stat[i].min_val = ULONG_MAX;
+               ts_lcl->clat_low_prio_stat[i].min_val = ULONG_MAX;
+       }
+       ts_lcl->sync_stat.min_val = ULONG_MAX;
+       ts_lcl->lat_percentiles = ts->lat_percentiles;
+       ts_lcl->clat_percentiles = ts->clat_percentiles;
+       ts_lcl->slat_percentiles = ts->slat_percentiles;
+       ts_lcl->percentile_precision = ts->percentile_precision;                
+       memcpy(ts_lcl->percentile_list, ts->percentile_list, sizeof(ts->percentile_list));
+
+       sum_thread_stats(ts_lcl, ts, 1);
+
+       /* add the aggregated stats to json parent */
+       add_ddir_status_json(ts_lcl, rs, DDIR_READ, parent);
+       free(ts_lcl);
+}
+
 static void show_thread_status_terse_all(struct thread_stat *ts,
                                         struct group_run_stats *rs, int ver,
                                         struct buf_output *out)
@@ -1435,14 +1711,17 @@ static void show_thread_status_terse_all(struct thread_stat *ts,
                log_buf(out, "%d;%s;%s;%d;%d", ver, fio_version_string,
                        ts->name, ts->groupid, ts->error);
 
-       /* Log Read Status */
+       /* Log Read Status, or mixed if unified_rw_rep == UNIFIED_MIXED */
        show_ddir_status_terse(ts, rs, DDIR_READ, ver, out);
-       /* Log Write Status */
-       show_ddir_status_terse(ts, rs, DDIR_WRITE, ver, out);
-       /* Log Trim Status */
-       if (ver == 2 || ver == 4 || ver == 5)
-               show_ddir_status_terse(ts, rs, DDIR_TRIM, ver, out);
-
+       if (ts->unified_rw_rep != UNIFIED_MIXED) {
+               /* Log Write Status */
+               show_ddir_status_terse(ts, rs, DDIR_WRITE, ver, out);
+               /* Log Trim Status */
+               if (ver == 2 || ver == 4 || ver == 5)
+                       show_ddir_status_terse(ts, rs, DDIR_TRIM, ver, out);
+       }
+       if (ts->unified_rw_rep == UNIFIED_BOTH)
+               show_mixed_ddir_status_terse(ts, rs, ver, out);
        /* CPU Usage */
        if (ts->total_run_time) {
                double runt = (double) ts->total_run_time;
@@ -1547,6 +1826,9 @@ static struct json_object *show_thread_status_json(struct thread_stat *ts,
        add_ddir_status_json(ts, rs, DDIR_TRIM, root);
        add_ddir_status_json(ts, rs, DDIR_SYNC, root);
 
+       if (ts->unified_rw_rep == UNIFIED_BOTH)
+               add_mixed_ddir_status_json(ts, rs, root);
+
        /* CPU Usage */
        if (ts->total_run_time) {
                double runt = (double) ts->total_run_time;
@@ -1875,7 +2157,7 @@ void sum_thread_stats(struct thread_stat *dst, struct thread_stat *src,
        int k, l, m;
 
        for (l = 0; l < DDIR_RWDIR_CNT; l++) {
-               if (!dst->unified_rw_rep) {
+               if (!(dst->unified_rw_rep == UNIFIED_MIXED)) {
                        sum_stat(&dst->clat_stat[l], &src->clat_stat[l], first, false);
                        sum_stat(&dst->clat_high_prio_stat[l], &src->clat_high_prio_stat[l], first, false);
                        sum_stat(&dst->clat_low_prio_stat[l], &src->clat_low_prio_stat[l], first, false);
@@ -1931,7 +2213,7 @@ void sum_thread_stats(struct thread_stat *dst, struct thread_stat *src,
                dst->io_u_lat_m[k] += src->io_u_lat_m[k];
 
        for (k = 0; k < DDIR_RWDIR_CNT; k++) {
-               if (!dst->unified_rw_rep) {
+               if (!(dst->unified_rw_rep == UNIFIED_MIXED)) {
                        dst->total_io_u[k] += src->total_io_u[k];
                        dst->short_io_u[k] += src->short_io_u[k];
                        dst->drop_io_u[k] += src->drop_io_u[k];
@@ -1947,7 +2229,7 @@ void sum_thread_stats(struct thread_stat *dst, struct thread_stat *src,
        for (k = 0; k < FIO_LAT_CNT; k++)
                for (l = 0; l < DDIR_RWDIR_CNT; l++)
                        for (m = 0; m < FIO_IO_U_PLAT_NR; m++)
-                               if (!dst->unified_rw_rep)
+                               if (!(dst->unified_rw_rep == UNIFIED_MIXED))
                                        dst->io_u_plat[k][l][m] += src->io_u_plat[k][l][m];
                                else
                                        dst->io_u_plat[k][0][m] += src->io_u_plat[k][l][m];
@@ -1957,7 +2239,7 @@ void sum_thread_stats(struct thread_stat *dst, struct thread_stat *src,
 
        for (k = 0; k < DDIR_RWDIR_CNT; k++) {
                for (m = 0; m < FIO_IO_U_PLAT_NR; m++) {
-                       if (!dst->unified_rw_rep) {
+                       if (!(dst->unified_rw_rep == UNIFIED_MIXED)) {
                                dst->io_u_plat_high_prio[k][m] += src->io_u_plat_high_prio[k][m];
                                dst->io_u_plat_low_prio[k][m] += src->io_u_plat_low_prio[k][m];
                        } else {
@@ -2166,7 +2448,7 @@ void __show_run_stats(void)
                rs->kb_base = ts->kb_base;
                rs->unit_base = ts->unit_base;
                rs->sig_figs = ts->sig_figs;
-               rs->unified_rw_rep += ts->unified_rw_rep;
+               rs->unified_rw_rep |= ts->unified_rw_rep;
 
                for (j = 0; j < DDIR_RWDIR_CNT; j++) {
                        if (!ts->runtime[j])
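The three new mixed-stat helpers in this file share the same setup: allocate a
scratch thread_stat, prime every min_val with ULONG_MAX so sum_stat() can only
lower it, mark the scratch copy UNIFIED_MIXED so sum_thread_stats() folds writes
and trims into the read slot, then render slot DDIR_READ through the existing
per-direction path. A hedged sketch of how that shared setup could be factored
(helper name hypothetical; the types and sum_thread_stats() are fio's own):

    /* sketch: collapse all data directions of 'ts' into one scratch stat */
    static struct thread_stat *alloc_mixed_stat(struct thread_stat *ts)
    {
            struct thread_stat *ts_lcl;
            int i;

            ts_lcl = calloc(1, sizeof(*ts_lcl));
            if (!ts_lcl)
                    return NULL;

            ts_lcl->unified_rw_rep = UNIFIED_MIXED;
            for (i = 0; i < DDIR_RWDIR_CNT; i++) {
                    ts_lcl->clat_stat[i].min_val = ULONG_MAX;
                    ts_lcl->slat_stat[i].min_val = ULONG_MAX;
                    ts_lcl->lat_stat[i].min_val = ULONG_MAX;
                    ts_lcl->bw_stat[i].min_val = ULONG_MAX;
                    ts_lcl->iops_stat[i].min_val = ULONG_MAX;
                    ts_lcl->clat_high_prio_stat[i].min_val = ULONG_MAX;
                    ts_lcl->clat_low_prio_stat[i].min_val = ULONG_MAX;
            }
            ts_lcl->sync_stat.min_val = ULONG_MAX;

            /* fold reads, writes and trims into the DDIR_READ slot */
            sum_thread_stats(ts_lcl, ts, 1);
            return ts_lcl;
    }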
diff --git a/stat.h b/stat.h
index 6dd5ef743a0803b79d5652615dabba416c01dad3..d08d4dc09780720eea3f1fa5896d12a11bae5e40 100644 (file)
--- a/stat.h
+++ b/stat.h
@@ -146,6 +146,9 @@ enum block_info_state {
 #define FIO_JOBNAME_SIZE       128
 #define FIO_JOBDESC_SIZE       256
 #define FIO_VERROR_SIZE                128
+#define UNIFIED_SPLIT          0
+#define UNIFIED_MIXED          1
+#define UNIFIED_BOTH           2
 
 enum fio_lat {
        FIO_SLAT = 0,
index 044f9195679566f802460672a3b9c6d96f33be07..ff4c7a7c01807ed46bd73dca4da85de0e606158d 100644 (file)
--- a/t/io_uring.c
+++ b/t/io_uring.c
@@ -233,8 +233,7 @@ static int prep_more_ios(struct submitter *s, int max_ios)
        next_tail = tail = *ring->tail;
        do {
                next_tail++;
-               read_barrier();
-               if (next_tail == *ring->head)
+               if (next_tail == atomic_load_acquire(ring->head))
                        break;
 
                index = tail & sq_ring_mask;
@@ -244,10 +243,8 @@ static int prep_more_ios(struct submitter *s, int max_ios)
                tail = next_tail;
        } while (prepped < max_ios);
 
-       if (*ring->tail != tail) {
-               *ring->tail = tail;
-               write_barrier();
-       }
+       if (prepped)
+               atomic_store_release(ring->tail, tail);
        return prepped;
 }
 
@@ -284,7 +281,7 @@ static int reap_events(struct submitter *s)
                struct file *f;
 
                read_barrier();
-               if (head == *ring->tail)
+               if (head == atomic_load_acquire(ring->tail))
                        break;
                cqe = &ring->cqes[head & cq_ring_mask];
                if (!do_nop) {
@@ -301,9 +298,10 @@ static int reap_events(struct submitter *s)
                head++;
        } while (1);
 
-       s->inflight -= reaped;
-       *ring->head = head;
-       write_barrier();
+       if (reaped) {
+               s->inflight -= reaped;
+               atomic_store_release(ring->head, head);
+       }
        return reaped;
 }
 
@@ -320,6 +318,7 @@ static void *submitter_fn(void *data)
        prepped = 0;
        do {
                int to_wait, to_submit, this_reap, to_prep;
+               unsigned ring_flags = 0;
 
                if (!prepped && s->inflight < depth) {
                        to_prep = min(depth - s->inflight, batch_submit);
@@ -338,15 +337,20 @@ submit:
                 * Only need to call io_uring_enter if we're not using SQ thread
                 * poll, or if IORING_SQ_NEED_WAKEUP is set.
                 */
-               if (!sq_thread_poll || (*ring->flags & IORING_SQ_NEED_WAKEUP)) {
+               if (sq_thread_poll)
+                       ring_flags = atomic_load_acquire(ring->flags);
+               if (!sq_thread_poll || ring_flags & IORING_SQ_NEED_WAKEUP) {
                        unsigned flags = 0;
 
                        if (to_wait)
                                flags = IORING_ENTER_GETEVENTS;
-                       if ((*ring->flags & IORING_SQ_NEED_WAKEUP))
+                       if (ring_flags & IORING_SQ_NEED_WAKEUP)
                                flags |= IORING_ENTER_SQ_WAKEUP;
                        ret = io_uring_enter(s, to_submit, to_wait, flags);
                        s->calls++;
+               } else {
+                       /* for SQPOLL, we submitted it all effectively */
+                       ret = to_submit;
                }
 
                /*
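The t/io_uring changes above replace paired read_barrier()/write_barrier() calls
with acquire/release accessors on the shared ring head and tail, and skip the
release store entirely when nothing was prepped or reaped. A minimal sketch of
what such accessors look like in portable C11 atomics (the tool's own macros may
be implemented differently):

    #include <stdatomic.h>

    /* consumer side: pairs with the producer's release store */
    static inline unsigned load_acquire(_Atomic unsigned *p)
    {
            return atomic_load_explicit(p, memory_order_acquire);
    }

    /* producer side: publishes entries written before this store */
    static inline void store_release(_Atomic unsigned *p, unsigned v)
    {
            atomic_store_explicit(p, v, memory_order_release);
    }

The acquire load guarantees that ring entries read after observing a new index are
at least as new as that index; the release store guarantees the entries were fully
written before the index became visible to the other side.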
index 1658dc25013be4c0714538bdd58fc2dcabeb03f8..26aff3731b8c5933a3e7358f1342cd9a5942f3c1 100755 (executable)
--- a/t/zbd/test-zbd-support
+++ b/t/zbd/test-zbd-support
@@ -1153,6 +1153,72 @@ test54() {
                >> "${logfile}.${test_number}" 2>&1 || return $?
 }
 
+# test 'z' suffix parsing only
+test55() {
+       local bs
+       bs=$((logical_block_size))
+
+       require_zbd || return $SKIP_TESTCASE
+       # offset=1z + offset_increment=10z + size=2z
+       require_seq_zones 13 || return $SKIP_TESTCASE
+
+       run_fio --name=j                \
+               --filename=${dev}       \
+               --direct=1              \
+               "$(ioengine "psync")"   \
+               --zonemode=zbd          \
+               --zonesize=${zone_size} \
+               --rw=write              \
+               --bs=${bs}              \
+               --numjobs=2             \
+               --offset_increment=10z  \
+               --offset=1z             \
+               --size=2z               \
+               --io_size=3z            \
+               ${job_var_opts[@]} --debug=zbd \
+               >> "${logfile}.${test_number}" 2>&1 || return $?
+}
+
+# test 'z' suffix parsing only
+test56() {
+       local bs
+       bs=$((logical_block_size))
+
+       require_regular_block_dev || return $SKIP_TESTCASE
+       require_seq_zones 10 || return $SKIP_TESTCASE
+
+       run_fio --name=j                \
+               --filename=${dev}       \
+               --direct=1              \
+               "$(ioengine "psync")"   \
+               --zonemode=strided      \
+               --zonesize=${zone_size} \
+               --rw=write              \
+               --bs=${bs}              \
+               --size=10z              \
+               --zoneskip=2z           \
+               ${job_var_opts[@]} --debug=zbd \
+               >> "${logfile}.${test_number}" 2>&1 || return $?
+}
+
+# Test that a repeated async write job does not trigger a zone reset while writes
+# are still in flight, when the block size is not a divisor of the zone size.
+test57() {
+       local bs off
+
+       require_zbd || return $SKIP_TESTCASE
+
+       bs=$((4096 * 7))
+       off=$((first_sequential_zone_sector * 512))
+
+       run_fio --name=job --filename="${dev}" --rw=randwrite --bs="${bs}" \
+               --offset="${off}" --size=$((4 * zone_size)) --iodepth=256 \
+               "$(ioengine "libaio")" --time_based=1 --runtime=30s \
+               --zonemode=zbd --direct=1 --zonesize="${zone_size}" \
+               ${job_var_opts[@]} \
+               >> "${logfile}.${test_number}" 2>&1 || return $?
+}
+
 SECONDS=0
 tests=()
 dynamic_analyzer=()
index f6b15403c4c21df1c183b47337e71a6018e1fe62..5ecc72d7b590c37615d3625bff10a0ea907a5f1d 100644 (file)
--- a/thread_options.h
+++ b/thread_options.h
@@ -83,13 +83,16 @@ struct thread_options {
        unsigned long long size;
        unsigned long long io_size;
        unsigned int size_percent;
+       unsigned int size_nz;
        unsigned int io_size_percent;
+       unsigned int io_size_nz;
        unsigned int fill_device;
        unsigned int file_append;
        unsigned long long file_size_low;
        unsigned long long file_size_high;
        unsigned long long start_offset;
        unsigned long long start_offset_align;
+       unsigned int start_offset_nz;
 
        unsigned long long bs[DDIR_RWDIR_CNT];
        unsigned long long ba[DDIR_RWDIR_CNT];
@@ -198,12 +201,13 @@ struct thread_options {
        unsigned long long zone_size;
        unsigned long long zone_capacity;
        unsigned long long zone_skip;
+       uint32_t zone_skip_nz;
        enum fio_zone_mode zone_mode;
        unsigned long long lockmem;
        enum fio_memtype mem_type;
        unsigned int mem_align;
 
-       unsigned long long max_latency;
+       unsigned long long max_latency[DDIR_RWDIR_CNT];
 
        unsigned int exit_what;
        unsigned int stonewall;
@@ -315,6 +319,7 @@ struct thread_options {
        unsigned int gid;
 
        unsigned int offset_increment_percent;
+       unsigned int offset_increment_nz;
        unsigned long long offset_increment;
        unsigned long long number_ios;
 
@@ -384,14 +389,19 @@ struct thread_options_pack {
        uint64_t size;
        uint64_t io_size;
        uint32_t size_percent;
+       uint32_t size_nz;
        uint32_t io_size_percent;
+       uint32_t io_size_nz;
        uint32_t fill_device;
        uint32_t file_append;
        uint32_t unique_filename;
+       uint32_t pad3;
        uint64_t file_size_low;
        uint64_t file_size_high;
        uint64_t start_offset;
        uint64_t start_offset_align;
+       uint32_t start_offset_nz;
+       uint32_t pad4;
 
        uint64_t bs[DDIR_RWDIR_CNT];
        uint64_t ba[DDIR_RWDIR_CNT];
@@ -464,8 +474,6 @@ struct thread_options_pack {
        struct zone_split zone_split[DDIR_RWDIR_CNT][ZONESPLIT_MAX];
        uint32_t zone_split_nr[DDIR_RWDIR_CNT];
 
-       uint8_t pad1[4];
-
        fio_fp64_t zipf_theta;
        fio_fp64_t pareto_h;
        fio_fp64_t gauss_dev;
@@ -501,6 +509,7 @@ struct thread_options_pack {
        uint64_t zone_capacity;
        uint64_t zone_skip;
        uint64_t lockmem;
+       uint32_t zone_skip_nz;
        uint32_t mem_type;
        uint32_t mem_align;
 
@@ -509,8 +518,6 @@ struct thread_options_pack {
        uint32_t new_group;
        uint32_t numjobs;
 
-       uint8_t pad3[4];
-
        /*
         * We currently can't convert these, so don't enable them
         */
@@ -616,12 +623,14 @@ struct thread_options_pack {
        uint32_t gid;
 
        uint32_t offset_increment_percent;
+       uint32_t offset_increment_nz;
        uint64_t offset_increment;
        uint64_t number_ios;
 
        uint64_t latency_target;
        uint64_t latency_window;
-       uint64_t max_latency;
+       uint64_t max_latency[DDIR_RWDIR_CNT];
+       uint32_t pad5;
        fio_fp64_t latency_percentile;
        uint32_t latency_run;
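A note on the pad fields being shuffled above: thread_options_pack is the on-wire
layout exchanged with fio servers, so new uint32_t fields are added in pairs or
with explicit padding (pad3, pad4, pad5) to keep the following 64-bit members
naturally aligned and the layout identical across compilers, which is consistent
with the FIO_SERVER_VER bump in server.h. A small self-check of the idea, using
field names from the hunk above:

    #include <stddef.h>
    #include <stdint.h>

    struct pack_sketch {
            uint32_t unique_filename;
            uint32_t pad3;          /* keeps file_size_low 8-byte aligned */
            uint64_t file_size_low;
    };

    _Static_assert(offsetof(struct pack_sketch, file_size_low) % 8 == 0,
                   "64-bit wire field must stay naturally aligned");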
 
diff --git a/zbd.c b/zbd.c
index 6a26fe108a68acf55c7ad2b7e59e8fccb97f0dc8..eed796b3217d297eb94942b28b4cfbf0ab5113ab 100644 (file)
--- a/zbd.c
+++ b/zbd.c
@@ -285,9 +285,7 @@ static bool zbd_verify_sizes(void)
                                return false;
                        }
 
-                       if (td->o.zone_skip &&
-                           (td->o.zone_skip < td->o.zone_size ||
-                            td->o.zone_skip % td->o.zone_size)) {
+                       if (td->o.zone_skip % td->o.zone_size) {
                                log_err("%s: zoneskip %llu is not a multiple of the device zone size %llu.\n",
                                        f->file_name, (unsigned long long) td->o.zone_skip,
                                        (unsigned long long) td->o.zone_size);
@@ -335,20 +333,21 @@ static bool zbd_verify_bs(void)
 {
        struct thread_data *td;
        struct fio_file *f;
-       uint32_t zone_size;
        int i, j, k;
 
        for_each_td(td, i) {
                for_each_file(td, f, j) {
+                       uint64_t zone_size;
+
                        if (!f->zbd_info)
                                continue;
                        zone_size = f->zbd_info->zone_size;
                        for (k = 0; k < FIO_ARRAY_SIZE(td->o.bs); k++) {
                                if (td->o.verify != VERIFY_NONE &&
                                    zone_size % td->o.bs[k] != 0) {
-                                       log_info("%s: block size %llu is not a divisor of the zone size %d\n",
+                                       log_info("%s: block size %llu is not a divisor of the zone size %llu\n",
                                                 f->file_name, td->o.bs[k],
-                                                zone_size);
+                                                (unsigned long long)zone_size);
                                        return false;
                                }
                        }
@@ -648,7 +647,7 @@ static bool zbd_open_zone(struct thread_data *td, const struct fio_file *f,
 static int zbd_reset_zone(struct thread_data *td, struct fio_file *f,
                          struct fio_zone_info *z);
 
-int zbd_setup_files(struct thread_data *td)
+int zbd_init_files(struct thread_data *td)
 {
        struct fio_file *f;
        int i;
@@ -657,6 +656,44 @@ int zbd_setup_files(struct thread_data *td)
                if (zbd_init_zone_info(td, f))
                        return 1;
        }
+       return 0;
+}
+
+void zbd_recalc_options_with_zone_granularity(struct thread_data *td)
+{
+       struct fio_file *f;
+       int i;
+
+       for_each_file(td, f, i) {
+               struct zoned_block_device_info *zbd = f->zbd_info;
+               // zonemode=strided doesn't get per-file zone size.
+               uint64_t zone_size = zbd ? zbd->zone_size : td->o.zone_size;
+
+               if (zone_size == 0)
+                       continue;
+
+               if (td->o.size_nz > 0) {
+                       td->o.size = td->o.size_nz * zone_size;
+               }
+               if (td->o.io_size_nz > 0) {
+                       td->o.io_size = td->o.io_size_nz * zone_size;
+               }
+               if (td->o.start_offset_nz > 0) {
+                       td->o.start_offset = td->o.start_offset_nz * zone_size;
+               }
+               if (td->o.offset_increment_nz > 0) {
+                       td->o.offset_increment = td->o.offset_increment_nz * zone_size;
+               }
+               if (td->o.zone_skip_nz > 0) {
+                       td->o.zone_skip = td->o.zone_skip_nz * zone_size;
+               }
+       }
+}
+
+int zbd_setup_files(struct thread_data *td)
+{
+       struct fio_file *f;
+       int i;
 
        if (!zbd_using_direct_io()) {
                log_err("Using direct I/O is mandatory for writing to ZBD drives\n\n");
@@ -805,16 +842,13 @@ static void zbd_close_zone(struct thread_data *td, const struct fio_file *f,
  * @f: fio file for which to reset zones
  * @zb: first zone to reset.
  * @ze: first zone not to reset.
- * @all_zones: whether to reset all zones or only those zones for which the
- *     write pointer is not a multiple of td->o.min_bs[DDIR_WRITE].
  */
 static int zbd_reset_zones(struct thread_data *td, struct fio_file *f,
                           struct fio_zone_info *const zb,
-                          struct fio_zone_info *const ze, bool all_zones)
+                          struct fio_zone_info *const ze)
 {
        struct fio_zone_info *z;
        const uint32_t min_bs = td->o.min_bs[DDIR_WRITE];
-       bool reset_wp;
        int res = 0;
 
        assert(min_bs);
@@ -827,16 +861,10 @@ static int zbd_reset_zones(struct thread_data *td, struct fio_file *f,
                if (!z->has_wp)
                        continue;
                zone_lock(td, f, z);
-               if (all_zones) {
-                       pthread_mutex_lock(&f->zbd_info->mutex);
-                       zbd_close_zone(td, f, nz);
-                       pthread_mutex_unlock(&f->zbd_info->mutex);
-
-                       reset_wp = z->wp != z->start;
-               } else {
-                       reset_wp = z->wp % min_bs != 0;
-               }
-               if (reset_wp) {
+               pthread_mutex_lock(&f->zbd_info->mutex);
+               zbd_close_zone(td, f, nz);
+               pthread_mutex_unlock(&f->zbd_info->mutex);
+               if (z->wp != z->start) {
                        dprint(FD_ZBD, "%s: resetting zone %u\n",
                               f->file_name, zbd_zone_nr(f, z));
                        if (zbd_reset_zone(td, f, z) < 0)
@@ -959,8 +987,8 @@ void zbd_file_reset(struct thread_data *td, struct fio_file *f)
         * writing any data to avoid that a zone reset has to be issued while
         * writing data, which causes data loss.
         */
-       zbd_reset_zones(td, f, zb, ze, td->o.verify != VERIFY_NONE &&
-                       td->runstate != TD_VERIFYING);
+       if (td->o.verify != VERIFY_NONE && td->runstate != TD_VERIFYING)
+               zbd_reset_zones(td, f, zb, ze);
        zbd_reset_write_cnt(td, f);
 }
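A worked example of the new two-phase setup: zbd_init_files() reads the per-file
zone information first, because the 'z' units cannot be resolved until the zone
size is known; zbd_recalc_options_with_zone_granularity() then multiplies each
*_nz count by that size. With a 256 MiB zone size, a job that parsed offset=1z and
size=2z carries start_offset_nz == 1 and size_nz == 2, so the recalculation sets
start_offset to 256 MiB and size to 512 MiB before zbd_setup_files() runs its
remaining checks on the final byte values.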
 
diff --git a/zbd.h b/zbd.h
index cc3ab6241e9b35782c0580d8b397edf1f2bba9b4..6453439313f8de4d5c049c371237762f05d8b7d6 100644 (file)
--- a/zbd.h
+++ b/zbd.h
@@ -87,6 +87,8 @@ struct zoned_block_device_info {
        struct fio_zone_info    zone_info[0];
 };
 
+int zbd_init_files(struct thread_data *td);
+void zbd_recalc_options_with_zone_granularity(struct thread_data *td);
 int zbd_setup_files(struct thread_data *td);
 void zbd_free_zone_info(struct fio_file *f);
 void zbd_file_reset(struct thread_data *td, struct fio_file *f);