/tags
/TAGS
/t/zbd/test-zbd-support.log.*
+/t/fuzz/fuzz_parseini
#!/bin/sh
GVF=FIO-VERSION-FILE
-DEF_VER=fio-3.25
+DEF_VER=fio-3.26
LF='
'
behaves in a similar fashion, except it sends the same offset 8 times before
generating a new offset.
-.. option:: unified_rw_reporting=bool
+.. option:: unified_rw_reporting=str
Fio normally reports statistics on a per data direction basis, meaning that
- reads, writes, and trims are accounted and reported separately. If this
- option is set fio sums the results and report them as "mixed" instead.
+ reads, writes, and trims are accounted and reported separately. This option
+ determines whether fio reports the results normally, summed together, or in
+ both forms.
+ Accepted values are:
+
+ **none**
+ Normal statistics reporting.
+
+ **mixed**
+ Statistics are summed per data direction and reported together.
+
+ **both**
+ Statistics are reported normally, followed by the mixed statistics.
+
+ **0**
+ Backward-compatible alias for **none**.
+
+ **1**
+ Backward-compatible alias for **mixed**.
+
+ **2**
+ Alias for **both**.
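+
+ For example, an illustrative job fragment (the option values are only
+ examples) that reports both the per-direction and the mixed statistics
+ could be::
+
+    [mixed-job]
+    rw=randrw
+    unified_rw_reporting=both
+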
.. option:: randrepeat=bool
and 'nrfiles', so that files will be created.
This engine is used to measure file lookup and metadata access.
+ **filedelete**
+ Simply delete the files by unlink() and do no I/O to them. You need to set 'filesize'
+ and 'nrfiles' so that the files will be created.
+ This engine is used to measure file deletion, as shown in the example below.
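+
+ A minimal illustrative job (the values below are only examples) could be::
+
+    [delete-files]
+    ioengine=filedelete
+    filesize=4k
+    nrfiles=100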
+
**libpmem**
Read and write using mmap I/O to a file on a filesystem
mounted with DAX on a persistent memory device through the PMDK
unless :option:`verify` is set or :option:`cuda_io` is `posix`.
:option:`iomem` must not be `cudamalloc`. This ioengine defines
engine specific options.
+ **dfs**
+ I/O engine supporting asynchronous read and write operations to the
+ DAOS File System (DFS) via libdfs.
I/O engine specific parameters
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
this will be the starting port number since fio will use a range of
ports.
- [rdma]
+ [rdma], [librpma_*]
The port to use for RDMA-CM communication. This should be the same value
on the client and the server side.
is a TCP listener or UDP reader, the hostname is not used and must be omitted
unless it is a valid UDP multicast address.
+.. option:: serverip=str : [librpma_*]
+
+ The IP address to be used for RDMA-CM based I/O.
+
+.. option:: direct_write_to_pmem=bool : [librpma_*]
+
+ Set to 1 only when Direct Write to PMem from the remote host is possible.
+ Otherwise, set to 0.
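+
+ For example, an illustrative pair of jobs for the librpma_apm engines (the
+ address, filename and sizes below are placeholders) could look like::
+
+    # server side; set direct_write_to_pmem=1 only if the platform supports it
+    [server]
+    ioengine=librpma_apm_server
+    serverip=192.168.0.1
+    direct_write_to_pmem=1
+    filename=/mnt/pmem/fio-workspace
+    size=100m
+
+    # client side
+    [client]
+    ioengine=librpma_apm_client
+    serverip=192.168.0.1
+    rw=write
+    bs=4k
+    iodepth=2
+    size=100m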
+
.. option:: interface=str : [netsplice] [net]
The IP address of the network interface used to send or receive UDP
Poll store instead of waiting for completion. Usually this provides better
throughput at the cost of higher (up to 100%) CPU utilization.
+.. option:: touch_objects=bool : [rados]
+
+ During initialization, touch (create if they do not exist) all objects
+ (files). Touching all objects affects Ceph caches and likely impacts test
+ results. Enabled by default.
+
.. option:: skip_bad=bool : [mtd]
Skip operations against known bad blocks.
GPU to RAM before a write and copied from RAM to GPU after a
read. :option:`verify` does not affect use of cudaMemcpy.
+.. option:: pool=str : [dfs]
+
+ Specify the UUID of the DAOS pool to connect to.
+
+.. option:: cont=str : [dfs]
+
+ Specify the UUID of the DAOS container to open.
+
+.. option:: chunk_size=int : [dfs]
+
+ Specify a different chunk size (in bytes) for the dfs file.
+ Use DAOS container's chunk size by default.
+
+.. option:: object_class=str : [dfs]
+
+ Specify a different object class for the dfs file.
+ Use DAOS container's object class by default.
+
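+ A minimal illustrative dfs job (the UUIDs below are placeholders) could be::
+
+    [dfs-test]
+    ioengine=dfs
+    pool=2a9df703-90aa-4ec0-a33c-80a9b0e6f71b
+    cont=a8b92755-bf54-4d5a-bd8b-d25e1a53f0e9
+    rw=write
+    bs=1m
+    size=128m
+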
I/O depth
~~~~~~~~~
true, fio will continue running and try to meet :option:`latency_target`
by adjusting queue depth.
-.. option:: max_latency=time
+.. option:: max_latency=time[,time][,time]
If set, fio will exit the job with an ETIMEDOUT error if it exceeds this
maximum latency. When the unit is omitted, the value is interpreted in
- microseconds.
+ microseconds. Comma-separated values may be specified for reads, writes,
+ and trims as described in :option:`blocksize`.
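+
+ For example (illustrative values), ``max_latency=250ms,500ms`` would make fio
+ exit the job if a read exceeds 250 milliseconds or a write exceeds 500
+ milliseconds of latency.
+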
.. option:: rate_cycle=int
pshared.c options.c \
smalloc.c filehash.c profile.c debug.c engines/cpu.c \
engines/mmap.c engines/sync.c engines/null.c engines/net.c \
- engines/ftruncate.c engines/filecreate.c engines/filestat.c \
+ engines/ftruncate.c engines/filecreate.c engines/filestat.c engines/filedelete.c \
server.c client.c iolog.c backend.c libfio.c flow.c cconv.c \
gettime-thread.c helpers.c json.c idletime.c td_error.c \
profiles/tiobench.c profiles/act.c io_u_queue.c filelock.c \
rdma_LIBS = -libverbs -lrdmacm
ENGINES += rdma
endif
+ifdef CONFIG_LIBRPMA_APM
+ librpma_apm_SRCS = engines/librpma_apm.c
+ librpma_fio_SRCS = engines/librpma_fio.c
+ librpma_apm_LIBS = -lrpma -lpmem
+ ENGINES += librpma_apm
+endif
+ifdef CONFIG_LIBRPMA_GPSPM
+ librpma_gpspm_SRCS = engines/librpma_gpspm.c engines/librpma_gpspm_flush.pb-c.c
+ librpma_fio_SRCS = engines/librpma_fio.c
+ librpma_gpspm_LIBS = -lrpma -lpmem -lprotobuf-c
+ ENGINES += librpma_gpspm
+endif
+ifdef librpma_fio_SRCS
+ SOURCE += $(librpma_fio_SRCS)
+endif
ifdef CONFIG_POSIXAIO
SOURCE += engines/posixaio.c
endif
http_LIBS = -lcurl -lssl -lcrypto
ENGINES += http
endif
+ifdef CONFIG_DFS
+ dfs_SRCS = engines/dfs.c
+ dfs_LIBS = -luuid -ldaos -ldfs
+ ENGINES += dfs
+endif
SOURCE += oslib/asprintf.c
ifndef CONFIG_STRSEP
SOURCE += oslib/strsep.c
return 0;
}
+#ifdef FIO_HAVE_IOSCHED_SWITCH
/*
- * This function is Linux specific.
+ * These functions are Linux specific.
* FIO_HAVE_IOSCHED_SWITCH enabled currently means it's Linux.
*/
-static int switch_ioscheduler(struct thread_data *td)
+static int set_ioscheduler(struct thread_data *td, struct fio_file *file)
{
-#ifdef FIO_HAVE_IOSCHED_SWITCH
char tmp[256], tmp2[128], *p;
FILE *f;
int ret;
- if (td_ioengine_flagged(td, FIO_DISKLESSIO))
- return 0;
-
- assert(td->files && td->files[0]);
- sprintf(tmp, "%s/queue/scheduler", td->files[0]->du->sysfs_root);
+ assert(file->du && file->du->sysfs_root);
+ sprintf(tmp, "%s/queue/scheduler", file->du->sysfs_root);
f = fopen(tmp, "r+");
if (!f) {
fclose(f);
return 0;
+}
+
+static int switch_ioscheduler(struct thread_data *td)
+{
+ struct fio_file *f;
+ unsigned int i;
+ int ret = 0;
+
+ if (td_ioengine_flagged(td, FIO_DISKLESSIO))
+ return 0;
+
+ assert(td->files && td->files[0]);
+
+ for_each_file(td, f, i) {
+
+ /* Only consider regular files and block device files */
+ switch (f->filetype) {
+ case FIO_TYPE_FILE:
+ case FIO_TYPE_BLOCK:
+ /*
+ * Make sure that the device hosting the file could
+ * be determined.
+ */
+ if (!f->du)
+ continue;
+ break;
+ case FIO_TYPE_CHAR:
+ case FIO_TYPE_PIPE:
+ default:
+ continue;
+ }
+
+ ret = set_ioscheduler(td, f);
+ if (ret)
+ return ret;
+ }
+
+ return 0;
+}
+
#else
+
+static int switch_ioscheduler(struct thread_data *td)
+{
return 0;
-#endif
}
+#endif /* FIO_HAVE_IOSCHED_SWITCH */
+
static bool keep_running(struct thread_data *td)
{
unsigned long long limit;
for_each_td(td, i) {
steadystate_free(td);
fio_options_free(td);
+ fio_dump_options_free(td);
if (td->rusage_sem) {
fio_sem_remove(td->rusage_sem);
td->rusage_sem = NULL;
o->rate_iops_min[i] = le32_to_cpu(top->rate_iops_min[i]);
o->perc_rand[i] = le32_to_cpu(top->perc_rand[i]);
+
+ o->max_latency[i] = le64_to_cpu(top->max_latency[i]);
}
o->ratecycle = le32_to_cpu(top->ratecycle);
o->sync_file_range = le32_to_cpu(top->sync_file_range);
o->latency_target = le64_to_cpu(top->latency_target);
o->latency_window = le64_to_cpu(top->latency_window);
- o->max_latency = le64_to_cpu(top->max_latency);
o->latency_percentile.u.f = fio_uint64_to_double(le64_to_cpu(top->latency_percentile.u.i));
o->latency_run = le32_to_cpu(top->latency_run);
o->compress_percentage = le32_to_cpu(top->compress_percentage);
top->sync_file_range = cpu_to_le32(o->sync_file_range);
top->latency_target = __cpu_to_le64(o->latency_target);
top->latency_window = __cpu_to_le64(o->latency_window);
- top->max_latency = __cpu_to_le64(o->max_latency);
top->latency_percentile.u.i = __cpu_to_le64(fio_double_to_uint64(o->latency_percentile.u.f));
top->latency_run = __cpu_to_le32(o->latency_run);
top->compress_percentage = cpu_to_le32(o->compress_percentage);
top->rate_iops_min[i] = cpu_to_le32(o->rate_iops_min[i]);
top->perc_rand[i] = cpu_to_le32(o->perc_rand[i]);
+
+ top->max_latency[i] = __cpu_to_le64(o->max_latency[i]);
}
memcpy(top->verify_pattern, o->verify_pattern, MAX_PATTERN_SIZE);
--- /dev/null
+#!/bin/bash -e
+
+# 11.02.2021 Merge pull request #866 from ldorau/rpma-mmap-memory-for-rpma_mr_reg-in-rpma_flush_apm_new
+LIBRPMA_VERSION=fbac593917e98f3f26abf14f4fad5a832b330f5c
+ZIP_FILE=rpma.zip
+
+WORKDIR=$(pwd)
+
+# install librpma
+wget -O $ZIP_FILE https://github.com/pmem/rpma/archive/${LIBRPMA_VERSION}.zip
+unzip $ZIP_FILE
+mkdir -p rpma-${LIBRPMA_VERSION}/build
+cd rpma-${LIBRPMA_VERSION}/build
+cmake .. -DCMAKE_BUILD_TYPE=Release \
+ -DCMAKE_INSTALL_PREFIX=/usr \
+ -DBUILD_DOC=OFF \
+ -DBUILD_EXAMPLES=OFF \
+ -DBUILD_TESTS=OFF
+make -j$(nproc)
+sudo make -j$(nproc) install
+cd $WORKDIR
+rm -rf $ZIP_FILE rpma-${LIBRPMA_VERSION}
--- /dev/null
+#!/bin/bash -e
+
+# pmdk v1.9.1 release
+PMDK_VERSION=1.9.1
+
+WORKDIR=$(pwd)
+
+#
+# The '/bin/sh' shell used by PMDK's 'make install'
+# does not know the exact location of clang
+# and fails with:
+# /bin/sh: 1: clang: not found
+# if CC is not set to the full path of clang.
+#
+export CC=$(which $CC)
+
+# Install PMDK libraries, because PMDK's libpmem
+# is a dependency of the librpma fio engine.
+# Install it from a release package
+# with already generated documentation,
+# in order to not install 'pandoc'.
+wget https://github.com/pmem/pmdk/releases/download/${PMDK_VERSION}/pmdk-${PMDK_VERSION}.tar.gz
+tar -xzf pmdk-${PMDK_VERSION}.tar.gz
+cd pmdk-${PMDK_VERSION}
+make -j$(nproc) NDCTL_ENABLE=n
+sudo make -j$(nproc) install prefix=/usr NDCTL_ENABLE=n
+cd $WORKDIR
+rm -rf pmdk-${PMDK_VERSION}
)
sudo apt-get -qq update
sudo apt-get install --no-install-recommends -qq -y "${pkgs[@]}"
+ # librpma is supported on the amd64 (x86_64) architecture for now
+ if [[ $CI_TARGET_ARCH == "amd64" ]]; then
+ # install libprotobuf-c-dev required by librpma_gpspm
+ sudo apt-get install --no-install-recommends -qq -y libprotobuf-c-dev
+ # PMDK libraries have to be installed, because
+ # libpmem is a dependency of the librpma fio engine
+ ci/travis-install-pmdk.sh
+ # install librpma from sources from GitHub
+ ci/travis-install-librpma.sh
+ fi
;;
"osx")
brew update >/dev/null 2>&1
libiscsi="no"
libnbd="no"
libzbc=""
+dfs=""
dynamic_engines="no"
prefix=/usr/local
;;
--dynamic-libengines) dynamic_engines="yes"
;;
+ --disable-dfs) dfs="no"
+ ;;
--help)
show_help="yes"
;;
echo "--disable-libzbc Disable libzbc even if found"
echo "--disable-tcmalloc Disable tcmalloc support"
echo "--dynamic-libengines Lib-based ioengines as dynamic libraries"
+ echo "--disable-dfs Disable DAOS File System support even if found"
exit $exit_val
fi
clock_gettime="yes" # clock_monotonic probe has dependency on this
clock_monotonic="yes"
sched_idle="yes"
+ pthread_condattr_setclock="no"
+ pthread_getaffinity="no"
;;
esac
##########################################
# POSIX pthread_condattr_setclock() probe
-if test "$pthread_condattr_setclock" != "yes" ; then
- pthread_condattr_setclock="no"
-fi
-cat > $TMPC <<EOF
+if test "$pthread_condattr_setclock" != "no" ; then
+ cat > $TMPC <<EOF
#include <pthread.h>
int main(void)
{
return 0;
}
EOF
-if compile_prog "" "$LIBS" "pthread_condattr_setclock" ; then
- pthread_condattr_setclock=yes
-elif compile_prog "" "$LIBS -lpthread" "pthread_condattr_setclock" ; then
- pthread_condattr_setclock=yes
- LIBS="$LIBS -lpthread"
+ if compile_prog "" "$LIBS" "pthread_condattr_setclock" ; then
+ pthread_condattr_setclock=yes
+ elif compile_prog "" "$LIBS -lpthread" "pthread_condattr_setclock" ; then
+ pthread_condattr_setclock=yes
+ LIBS="$LIBS -lpthread"
+ fi
fi
print_config "pthread_condattr_setclock()" "$pthread_condattr_setclock"
fi
print_config "pthread_sigmask()" "$pthread_sigmask"
+##########################################
+# pthread_getaffinity_np() probe
+if test "$pthread_getaffinity" != "yes" ; then
+ pthread_getaffinity="no"
+fi
+cat > $TMPC <<EOF
+#include <stddef.h> /* NULL */
+#include <signal.h> /* pthread_sigmask() */
+#include <pthread.h>
+int main(void)
+{
+ cpu_set_t set;
+ return pthread_getaffinity_np(pthread_self(), sizeof(set), &set);
+}
+EOF
+if compile_prog "" "$LIBS" "pthread_getaffinity" ; then
+ pthread_getaffinity="yes"
+elif compile_prog "" "$LIBS -lpthread" "pthread_getaffinity" ; then
+ pthread_getaffinity="yes"
+ LIBS="$LIBS -lpthread"
+fi
+print_config "pthread_getaffinity_np()" "$pthread_getaffinity"
+
##########################################
# solaris aio probe
if test "$solaris_aio" != "yes" ; then
fi
print_config "rdmacm" "$rdmacm"
+##########################################
+# librpma probe
+if test "$librpma" != "yes" ; then
+ librpma="no"
+fi
+cat > $TMPC << EOF
+#include <stdio.h>
+#include <librpma.h>
+int main(int argc, char **argv)
+{
+ enum rpma_conn_event event = RPMA_CONN_REJECTED;
+ (void) event; /* unused */
+ rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
+ return 0;
+}
+EOF
+if test "$disable_rdma" != "yes" && compile_prog "" "-lrpma" "rpma"; then
+ librpma="yes"
+fi
+print_config "librpma" "$librpma"
+
+##########################################
+# libprotobuf-c probe
+if test "$libprotobuf_c" != "yes" ; then
+ libprotobuf_c="no"
+fi
+cat > $TMPC << EOF
+#include <stdio.h>
+#include <protobuf-c/protobuf-c.h>
+#if !defined(PROTOBUF_C_VERSION_NUMBER)
+# error PROTOBUF_C_VERSION_NUMBER is not defined!
+#endif
+int main(int argc, char **argv)
+{
+ (void)protobuf_c_message_check(NULL);
+ return 0;
+}
+EOF
+if compile_prog "" "-lprotobuf-c" "protobuf_c"; then
+ libprotobuf_c="yes"
+fi
+print_config "libprotobuf_c" "$libprotobuf_c"
+
##########################################
# asprintf() and vasprintf() probes
if test "$have_asprintf" != "yes" ; then
fi
print_config "NBD engine" "$libnbd"
+##########################################
+# check for dfs (DAOS File System)
+if test "$dfs" != "no" ; then
+ cat > $TMPC << EOF
+#include <fcntl.h>
+#include <daos.h>
+#include <daos_fs.h>
+
+int main(int argc, char **argv)
+{
+ daos_handle_t poh;
+ daos_handle_t coh;
+ dfs_t *dfs;
+
+ (void) dfs_mount(poh, coh, O_RDWR, &dfs);
+
+ return 0;
+}
+EOF
+ if compile_prog "" "-luuid -ldfs -ldaos" "dfs"; then
+ dfs="yes"
+ else
+ dfs="no"
+ fi
+fi
+print_config "DAOS File System (dfs) Engine" "$dfs"
+
##########################################
# Check if we have lex/yacc available
yacc="no"
if test "$pthread_sigmask" = "yes" ; then
output_sym "CONFIG_PTHREAD_SIGMASK"
fi
+if test "$pthread_getaffinity" = "yes" ; then
+ output_sym "CONFIG_PTHREAD_GETAFFINITY"
+fi
if test "$have_asprintf" = "yes" ; then
output_sym "CONFIG_HAVE_ASPRINTF"
fi
if test "$libverbs" = "yes" -a "$rdmacm" = "yes" ; then
output_sym "CONFIG_RDMA"
fi
+# librpma is supported on the 'x86_64' architecture for now
+if test "$cpu" = "x86_64" -a "$libverbs" = "yes" -a "$rdmacm" = "yes" \
+ -a "$librpma" = "yes" -a "$libpmem" = "yes" ; then
+ output_sym "CONFIG_LIBRPMA_APM"
+fi
+if test "$cpu" = "x86_64" -a "$libverbs" = "yes" -a "$rdmacm" = "yes" \
+ -a "$librpma" = "yes" -a "$libpmem" = "yes" -a "$libprotobuf_c" = "yes" ; then
+ output_sym "CONFIG_LIBRPMA_GPSPM"
+fi
if test "$clock_gettime" = "yes" ; then
output_sym "CONFIG_CLOCK_GETTIME"
fi
if test "$clock_monotonic" = "yes" ; then
output_sym "CONFIG_CLOCK_MONOTONIC"
fi
-if test "$clock_monotonic_raw" = "yes" ; then
- output_sym "CONFIG_CLOCK_MONOTONIC_RAW"
-fi
-if test "$clock_monotonic_precise" = "yes" ; then
- output_sym "CONFIG_CLOCK_MONOTONIC_PRECISE"
-fi
if test "$clockid_t" = "yes"; then
output_sym "CONFIG_CLOCKID_T"
fi
if test "$libcufile" = "yes" ; then
output_sym "CONFIG_LIBCUFILE"
fi
+if test "$dfs" = "yes" ; then
+ output_sym "CONFIG_DFS"
+fi
if test "$march_set" = "no" && test "$build_native" = "yes" ; then
output_sym "CONFIG_BUILD_NATIVE"
fi
--- /dev/null
+/**
+ * FIO engine for DAOS File System (dfs).
+ *
+ * (C) Copyright 2020-2021 Intel Corporation.
+ */
+
+#include <fio.h>
+#include <optgroup.h>
+
+#include <daos.h>
+#include <daos_fs.h>
+
+static bool daos_initialized;
+static int num_threads;
+static pthread_mutex_t daos_mutex = PTHREAD_MUTEX_INITIALIZER;
+daos_handle_t poh; /* pool handle */
+daos_handle_t coh; /* container handle */
+daos_oclass_id_t cid = OC_UNKNOWN; /* object class */
+dfs_t *dfs; /* dfs mount reference */
+
+struct daos_iou {
+ struct io_u *io_u;
+ daos_event_t ev;
+ d_sg_list_t sgl;
+ d_iov_t iov;
+ daos_size_t size;
+ bool complete;
+};
+
+struct daos_data {
+ daos_handle_t eqh;
+ dfs_obj_t *obj;
+ struct io_u **io_us;
+ int queued;
+ int num_ios;
+};
+
+struct daos_fio_options {
+ void *pad;
+ char *pool; /* Pool UUID */
+ char *cont; /* Container UUID */
+ daos_size_t chsz; /* Chunk size */
+ char *oclass; /* object class */
+#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
+ char *svcl; /* service replica list, deprecated */
+#endif
+};
+
+static struct fio_option options[] = {
+ {
+ .name = "pool",
+ .lname = "pool uuid",
+ .type = FIO_OPT_STR_STORE,
+ .off1 = offsetof(struct daos_fio_options, pool),
+ .help = "DAOS pool uuid",
+ .category = FIO_OPT_C_ENGINE,
+ .group = FIO_OPT_G_DFS,
+ },
+ {
+ .name = "cont",
+ .lname = "container uuid",
+ .type = FIO_OPT_STR_STORE,
+ .off1 = offsetof(struct daos_fio_options, cont),
+ .help = "DAOS container uuid",
+ .category = FIO_OPT_C_ENGINE,
+ .group = FIO_OPT_G_DFS,
+ },
+ {
+ .name = "chunk_size",
+ .lname = "DFS chunk size",
+ .type = FIO_OPT_ULL,
+ .off1 = offsetof(struct daos_fio_options, chsz),
+ .help = "DFS chunk size in bytes",
+ .def = "0", /* use container default */
+ .category = FIO_OPT_C_ENGINE,
+ .group = FIO_OPT_G_DFS,
+ },
+ {
+ .name = "object_class",
+ .lname = "object class",
+ .type = FIO_OPT_STR_STORE,
+ .off1 = offsetof(struct daos_fio_options, oclass),
+ .help = "DAOS object class",
+ .category = FIO_OPT_C_ENGINE,
+ .group = FIO_OPT_G_DFS,
+ },
+#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
+ {
+ .name = "svcl",
+ .lname = "List of service ranks",
+ .type = FIO_OPT_STR_STORE,
+ .off1 = offsetof(struct daos_fio_options, svcl),
+ .help = "List of pool replicated service ranks",
+ .category = FIO_OPT_C_ENGINE,
+ .group = FIO_OPT_G_DFS,
+ },
+#endif
+ {
+ .name = NULL,
+ },
+};
+
+static int daos_fio_global_init(struct thread_data *td)
+{
+ struct daos_fio_options *eo = td->eo;
+ uuid_t pool_uuid, co_uuid;
+ daos_pool_info_t pool_info;
+ daos_cont_info_t co_info;
+ int rc = 0;
+
+#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
+ if (!eo->pool || !eo->cont || !eo->svcl) {
+#else
+ if (!eo->pool || !eo->cont) {
+#endif
+ log_err("Missing required DAOS options\n");
+ return EINVAL;
+ }
+
+ rc = daos_init();
+ if (rc != -DER_ALREADY && rc) {
+ log_err("Failed to initialize daos %d\n", rc);
+ td_verror(td, rc, "daos_init");
+ return rc;
+ }
+
+ rc = uuid_parse(eo->pool, pool_uuid);
+ if (rc) {
+ log_err("Failed to parse 'Pool uuid': %s\n", eo->pool);
+ td_verror(td, EINVAL, "uuid_parse(eo->pool)");
+ return EINVAL;
+ }
+
+ rc = uuid_parse(eo->cont, co_uuid);
+ if (rc) {
+ log_err("Failed to parse 'Cont uuid': %s\n", eo->cont);
+ td_verror(td, EINVAL, "uuid_parse(eo->cont)");
+ return EINVAL;
+ }
+
+ /* Connect to the DAOS pool */
+#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
+ d_rank_list_t *svcl = NULL;
+
+ svcl = daos_rank_list_parse(eo->svcl, ":");
+ if (svcl == NULL) {
+ log_err("Failed to parse svcl\n");
+ td_verror(td, EINVAL, "daos_rank_list_parse");
+ return EINVAL;
+ }
+
+ rc = daos_pool_connect(pool_uuid, NULL, svcl, DAOS_PC_RW,
+ &poh, &pool_info, NULL);
+ d_rank_list_free(svcl);
+#else
+ rc = daos_pool_connect(pool_uuid, NULL, DAOS_PC_RW, &poh, &pool_info,
+ NULL);
+#endif
+ if (rc) {
+ log_err("Failed to connect to pool %d\n", rc);
+ td_verror(td, rc, "daos_pool_connect");
+ return rc;
+ }
+
+ /* Open the DAOS container */
+ rc = daos_cont_open(poh, co_uuid, DAOS_COO_RW, &coh, &co_info, NULL);
+ if (rc) {
+ log_err("Failed to open container: %d\n", rc);
+ td_verror(td, rc, "daos_cont_open");
+ (void)daos_pool_disconnect(poh, NULL);
+ return rc;
+ }
+
+ /* Mount encapsulated filesystem */
+ rc = dfs_mount(poh, coh, O_RDWR, &dfs);
+ if (rc) {
+ log_err("Failed to mount DFS namespace: %d\n", rc);
+ td_verror(td, rc, "dfs_mount");
+ (void)daos_pool_disconnect(poh, NULL);
+ (void)daos_cont_close(coh, NULL);
+ return rc;
+ }
+
+ /* Retrieve object class to use, if specified */
+ if (eo->oclass)
+ cid = daos_oclass_name2id(eo->oclass);
+
+ return 0;
+}
+
+static int daos_fio_global_cleanup()
+{
+ int rc;
+ int ret = 0;
+
+ rc = dfs_umount(dfs);
+ if (rc) {
+ log_err("failed to umount dfs: %d\n", rc);
+ ret = rc;
+ }
+ rc = daos_cont_close(coh, NULL);
+ if (rc) {
+ log_err("failed to close container: %d\n", rc);
+ if (ret == 0)
+ ret = rc;
+ }
+ rc = daos_pool_disconnect(poh, NULL);
+ if (rc) {
+ log_err("failed to disconnect pool: %d\n", rc);
+ if (ret == 0)
+ ret = rc;
+ }
+ rc = daos_fini();
+ if (rc) {
+ log_err("failed to finalize daos: %d\n", rc);
+ if (ret == 0)
+ ret = rc;
+ }
+
+ return ret;
+}
+
+static int daos_fio_setup(struct thread_data *td)
+{
+ return 0;
+}
+
+static int daos_fio_init(struct thread_data *td)
+{
+ struct daos_data *dd;
+ int rc = 0;
+
+ pthread_mutex_lock(&daos_mutex);
+
+ dd = malloc(sizeof(*dd));
+ if (dd == NULL) {
+ log_err("Failed to allocate DAOS-private data\n");
+ rc = ENOMEM;
+ goto out;
+ }
+
+ dd->queued = 0;
+ dd->num_ios = td->o.iodepth;
+ dd->io_us = calloc(dd->num_ios, sizeof(struct io_u *));
+ if (dd->io_us == NULL) {
+ log_err("Failed to allocate IO queue\n");
+ rc = ENOMEM;
+ goto out;
+ }
+
+ /* initialize DAOS stack if not already up */
+ if (!daos_initialized) {
+ rc = daos_fio_global_init(td);
+ if (rc)
+ goto out;
+ daos_initialized = true;
+ }
+
+ rc = daos_eq_create(&dd->eqh);
+ if (rc) {
+ log_err("Failed to create event queue: %d\n", rc);
+ td_verror(td, rc, "daos_eq_create");
+ goto out;
+ }
+
+ td->io_ops_data = dd;
+ num_threads++;
+out:
+ if (rc) {
+ if (dd) {
+ free(dd->io_us);
+ free(dd);
+ }
+ if (num_threads == 0 && daos_initialized) {
+ /* don't clobber error return value */
+ (void)daos_fio_global_cleanup();
+ daos_initialized = false;
+ }
+ }
+ pthread_mutex_unlock(&daos_mutex);
+ return rc;
+}
+
+static void daos_fio_cleanup(struct thread_data *td)
+{
+ struct daos_data *dd = td->io_ops_data;
+ int rc;
+
+ if (dd == NULL)
+ return;
+
+ rc = daos_eq_destroy(dd->eqh, DAOS_EQ_DESTROY_FORCE);
+ if (rc < 0) {
+ log_err("failed to destroy event queue: %d\n", rc);
+ td_verror(td, rc, "daos_eq_destroy");
+ }
+
+ free(dd->io_us);
+ free(dd);
+
+ pthread_mutex_lock(&daos_mutex);
+ num_threads--;
+ if (daos_initialized && num_threads == 0) {
+ int ret;
+
+ ret = daos_fio_global_cleanup();
+ if (ret < 0 && rc == 0) {
+ log_err("failed to clean up: %d\n", ret);
+ td_verror(td, ret, "daos_fio_global_cleanup");
+ }
+ daos_initialized = false;
+ }
+ pthread_mutex_unlock(&daos_mutex);
+}
+
+static int daos_fio_get_file_size(struct thread_data *td, struct fio_file *f)
+{
+ char *file_name = f->file_name;
+ struct stat stbuf = {0};
+ int rc;
+
+ dprint(FD_FILE, "dfs stat %s\n", f->file_name);
+
+ if (!daos_initialized)
+ return 0;
+
+ rc = dfs_stat(dfs, NULL, file_name, &stbuf);
+ if (rc) {
+ log_err("Failed to stat %s: %d\n", f->file_name, rc);
+ td_verror(td, rc, "dfs_stat");
+ return rc;
+ }
+
+ f->real_file_size = stbuf.st_size;
+ return 0;
+}
+
+static int daos_fio_close(struct thread_data *td, struct fio_file *f)
+{
+ struct daos_data *dd = td->io_ops_data;
+ int rc;
+
+ dprint(FD_FILE, "dfs release %s\n", f->file_name);
+
+ rc = dfs_release(dd->obj);
+ if (rc) {
+ log_err("Failed to release %s: %d\n", f->file_name, rc);
+ td_verror(td, rc, "dfs_release");
+ return rc;
+ }
+
+ return 0;
+}
+
+static int daos_fio_open(struct thread_data *td, struct fio_file *f)
+{
+ struct daos_data *dd = td->io_ops_data;
+ struct daos_fio_options *eo = td->eo;
+ int flags = 0;
+ int rc;
+
+ dprint(FD_FILE, "dfs open %s (%s/%d/%d)\n",
+ f->file_name, td_write(td) && !read_only ? "rw" : "r",
+ td->o.create_on_open, td->o.allow_create);
+
+ if (td->o.create_on_open && td->o.allow_create)
+ flags |= O_CREAT;
+
+ if (td_write(td)) {
+ if (!read_only)
+ flags |= O_RDWR;
+ if (td->o.allow_create)
+ flags |= O_CREAT;
+ } else if (td_read(td)) {
+ flags |= O_RDONLY;
+ }
+
+ rc = dfs_open(dfs, NULL, f->file_name,
+ S_IFREG | S_IRUSR | S_IWUSR,
+ flags, cid, eo->chsz, NULL, &dd->obj);
+ if (rc) {
+ log_err("Failed to open %s: %d\n", f->file_name, rc);
+ td_verror(td, rc, "dfs_open");
+ return rc;
+ }
+
+ return 0;
+}
+
+static int daos_fio_unlink(struct thread_data *td, struct fio_file *f)
+{
+ int rc;
+
+ dprint(FD_FILE, "dfs remove %s\n", f->file_name);
+
+ rc = dfs_remove(dfs, NULL, f->file_name, false, NULL);
+ if (rc) {
+ log_err("Failed to remove %s: %d\n", f->file_name, rc);
+ td_verror(td, rc, "dfs_remove");
+ return rc;
+ }
+
+ return 0;
+}
+
+static int daos_fio_invalidate(struct thread_data *td, struct fio_file *f)
+{
+ dprint(FD_FILE, "dfs invalidate %s\n", f->file_name);
+ return 0;
+}
+
+static void daos_fio_io_u_free(struct thread_data *td, struct io_u *io_u)
+{
+ struct daos_iou *io = io_u->engine_data;
+
+ if (io) {
+ io_u->engine_data = NULL;
+ free(io);
+ }
+}
+
+static int daos_fio_io_u_init(struct thread_data *td, struct io_u *io_u)
+{
+ struct daos_iou *io;
+
+ io = malloc(sizeof(struct daos_iou));
+ if (!io) {
+ td_verror(td, ENOMEM, "malloc");
+ return ENOMEM;
+ }
+ io->io_u = io_u;
+ io_u->engine_data = io;
+ return 0;
+}
+
+static struct io_u * daos_fio_event(struct thread_data *td, int event)
+{
+ struct daos_data *dd = td->io_ops_data;
+
+ return dd->io_us[event];
+}
+
+static int daos_fio_getevents(struct thread_data *td, unsigned int min,
+ unsigned int max, const struct timespec *t)
+{
+ struct daos_data *dd = td->io_ops_data;
+ daos_event_t *evp[max];
+ unsigned int events = 0;
+ int i;
+ int rc;
+
+ while (events < min) {
+ rc = daos_eq_poll(dd->eqh, 0, DAOS_EQ_NOWAIT, max, evp);
+ if (rc < 0) {
+ log_err("Event poll failed: %d\n", rc);
+ td_verror(td, rc, "daos_eq_poll");
+ return events;
+ }
+
+ for (i = 0; i < rc; i++) {
+ struct daos_iou *io;
+ struct io_u *io_u;
+
+ io = container_of(evp[i], struct daos_iou, ev);
+ if (io->complete)
+ log_err("Completion on already completed I/O\n");
+
+ io_u = io->io_u;
+ if (io->ev.ev_error)
+ io_u->error = io->ev.ev_error;
+ else
+ io_u->resid = 0;
+
+ dd->io_us[events] = io_u;
+ dd->queued--;
+ daos_event_fini(&io->ev);
+ io->complete = true;
+ events++;
+ }
+ }
+
+ dprint(FD_IO, "dfs eq_poll returning %u (%u/%u)\n", events, min, max);
+
+ return events;
+}
+
+static enum fio_q_status daos_fio_queue(struct thread_data *td,
+ struct io_u *io_u)
+{
+ struct daos_data *dd = td->io_ops_data;
+ struct daos_iou *io = io_u->engine_data;
+ daos_off_t offset = io_u->offset;
+ int rc;
+
+ if (dd->queued == td->o.iodepth)
+ return FIO_Q_BUSY;
+
+ io->sgl.sg_nr = 1;
+ io->sgl.sg_nr_out = 0;
+ d_iov_set(&io->iov, io_u->xfer_buf, io_u->xfer_buflen);
+ io->sgl.sg_iovs = &io->iov;
+ io->size = io_u->xfer_buflen;
+
+ io->complete = false;
+ rc = daos_event_init(&io->ev, dd->eqh, NULL);
+ if (rc) {
+ log_err("Event init failed: %d\n", rc);
+ io_u->error = rc;
+ return FIO_Q_COMPLETED;
+ }
+
+ switch (io_u->ddir) {
+ case DDIR_WRITE:
+ rc = dfs_write(dfs, dd->obj, &io->sgl, offset, &io->ev);
+ if (rc) {
+ log_err("dfs_write failed: %d\n", rc);
+ io_u->error = rc;
+ return FIO_Q_COMPLETED;
+ }
+ break;
+ case DDIR_READ:
+ rc = dfs_read(dfs, dd->obj, &io->sgl, offset, &io->size,
+ &io->ev);
+ if (rc) {
+ log_err("dfs_read failed: %d\n", rc);
+ io_u->error = rc;
+ return FIO_Q_COMPLETED;
+ }
+ break;
+ case DDIR_SYNC:
+ io_u->error = 0;
+ return FIO_Q_COMPLETED;
+ default:
+ dprint(FD_IO, "Invalid IO type: %d\n", io_u->ddir);
+ io_u->error = -DER_INVAL;
+ return FIO_Q_COMPLETED;
+ }
+
+ dd->queued++;
+ return FIO_Q_QUEUED;
+}
+
+static int daos_fio_prep(struct thread_data fio_unused *td, struct io_u *io_u)
+{
+ return 0;
+}
+
+/* ioengine_ops for get_ioengine() */
+FIO_STATIC struct ioengine_ops ioengine = {
+ .name = "dfs",
+ .version = FIO_IOOPS_VERSION,
+ .flags = FIO_DISKLESSIO | FIO_NODISKUTIL,
+
+ .setup = daos_fio_setup,
+ .init = daos_fio_init,
+ .prep = daos_fio_prep,
+ .cleanup = daos_fio_cleanup,
+
+ .open_file = daos_fio_open,
+ .invalidate = daos_fio_invalidate,
+ .get_file_size = daos_fio_get_file_size,
+ .close_file = daos_fio_close,
+ .unlink_file = daos_fio_unlink,
+
+ .queue = daos_fio_queue,
+ .getevents = daos_fio_getevents,
+ .event = daos_fio_event,
+ .io_u_init = daos_fio_io_u_init,
+ .io_u_free = daos_fio_io_u_free,
+
+ .option_struct_size = sizeof(struct daos_fio_options),
+ .options = options,
+};
+
+static void fio_init fio_dfs_register(void)
+{
+ register_ioengine(&ioengine);
+}
+
+static void fio_exit fio_dfs_unregister(void)
+{
+ unregister_ioengine(&ioengine);
+}
dprint(FD_FILE, "fd open %s\n", f->file_name);
- if (f->filetype != FIO_TYPE_FILE) {
- log_err("fio: only files are supported fallocate \n");
+ if (f->filetype != FIO_TYPE_FILE && f->filetype != FIO_TYPE_BLOCK) {
+ log_err("fio: only files and block devices are supported for fallocate\n");
return 1;
}
if (!strcmp(f->file_name, "-")) {
dprint(FD_FILE, "fd open %s\n", f->file_name);
if (f->filetype != FIO_TYPE_FILE) {
- log_err("fio: only files are supported fallocate \n");
+ log_err("fio: only files are supported\n");
return 1;
}
if (!strcmp(f->file_name, "-")) {
--- /dev/null
+/*
+ * file delete engine
+ *
+ * IO engine that doesn't do any IO, just deletes files and tracks the latency
+ * of the file deletion.
+ */
+#include <stdio.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include "../fio.h"
+
+struct fc_data {
+ enum fio_ddir stat_ddir;
+};
+
+static int delete_file(struct thread_data *td, struct fio_file *f)
+{
+ struct timespec start;
+ int do_lat = !td->o.disable_lat;
+ int ret;
+
+ dprint(FD_FILE, "fd delete %s\n", f->file_name);
+
+ if (f->filetype != FIO_TYPE_FILE) {
+ log_err("fio: only files are supported\n");
+ return 1;
+ }
+ if (!strcmp(f->file_name, "-")) {
+ log_err("fio: can't read/write to stdin/out\n");
+ return 1;
+ }
+
+ if (do_lat)
+ fio_gettime(&start, NULL);
+
+ ret = unlink(f->file_name);
+
+ if (ret == -1) {
+ char buf[FIO_VERROR_SIZE];
+ int e = errno;
+
+ snprintf(buf, sizeof(buf), "delete(%s)", f->file_name);
+ td_verror(td, e, buf);
+ return 1;
+ }
+
+ if (do_lat) {
+ struct fc_data *data = td->io_ops_data;
+ uint64_t nsec;
+
+ nsec = ntime_since_now(&start);
+ add_clat_sample(td, data->stat_ddir, nsec, 0, 0, 0);
+ }
+
+ return 0;
+}
+
+static enum fio_q_status queue_io(struct thread_data *td, struct io_u fio_unused *io_u)
+{
+ return FIO_Q_COMPLETED;
+}
+
+static int init(struct thread_data *td)
+{
+ struct fc_data *data;
+
+ data = calloc(1, sizeof(*data));
+
+ if (td_read(td))
+ data->stat_ddir = DDIR_READ;
+ else if (td_write(td))
+ data->stat_ddir = DDIR_WRITE;
+
+ td->io_ops_data = data;
+ return 0;
+}
+
+static int delete_invalidate(struct thread_data *td, struct fio_file *f)
+{
+ /* do nothing because file not opened */
+ return 0;
+}
+
+static void cleanup(struct thread_data *td)
+{
+ struct fc_data *data = td->io_ops_data;
+
+ free(data);
+}
+
+static struct ioengine_ops ioengine = {
+ .name = "filedelete",
+ .version = FIO_IOOPS_VERSION,
+ .init = init,
+ .invalidate = delete_invalidate,
+ .cleanup = cleanup,
+ .queue = queue_io,
+ .get_file_size = generic_get_file_size,
+ .open_file = delete_file,
+ .flags = FIO_SYNCIO | FIO_FAKEIO |
+ FIO_NOSTATS | FIO_NOFILEHASH,
+};
+
+static void fio_init fio_filedelete_register(void)
+{
+ register_ioengine(&ioengine);
+}
+
+static void fio_exit fio_filedelete_unregister(void)
+{
+ unregister_ioengine(&ioengine);
+}
err = fio_ioring_queue_init(td);
if (err) {
- int __errno = errno;
+ int init_err = errno;
- if (__errno == ENOSYS)
+ if (init_err == ENOSYS)
log_err("fio: your kernel doesn't support io_uring\n");
- td_verror(td, __errno, "io_queue_init");
+ td_verror(td, init_err, "io_queue_init");
return 1;
}
--- /dev/null
+/*
+ * librpma_apm: IO engine that uses PMDK librpma to read and write data,
+ * based on Appliance Persistency Method
+ *
+ * Copyright 2020-2021, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#include "librpma_fio.h"
+
+/* client side implementation */
+
+static inline int client_io_flush(struct thread_data *td,
+ struct io_u *first_io_u, struct io_u *last_io_u,
+ unsigned long long int len);
+
+static int client_get_io_u_index(struct rpma_completion *cmpl,
+ unsigned int *io_u_index);
+
+static int client_init(struct thread_data *td)
+{
+ struct librpma_fio_client_data *ccd;
+ unsigned int sq_size;
+ uint32_t cq_size;
+ struct rpma_conn_cfg *cfg = NULL;
+ struct rpma_peer_cfg *pcfg = NULL;
+ int ret;
+
+ /* readwrite = trim / randtrim / trimwrite is not supported */
+ if (td_trim(td)) {
+ td_verror(td, EINVAL, "Unsupported mode.");
+ return -1;
+ }
+
+ /*
+ * Calculate the required queue sizes where:
+ * - the send queue (SQ) has to be big enough to accommodate
+ * all io_us (WRITEs) and all flush requests (FLUSHes)
+ * - the completion queue (CQ) has to be big enough to accommodate all
+ * success and error completions (cq_size = sq_size)
+ */
+ if (td_random(td) || td_rw(td)) {
+ /*
+ * sq_size = max(rand_read_sq_size, rand_write_sq_size)
+ * where rand_read_sq_size < rand_write_sq_size because read
+ * does not require flush afterwards
+ * rand_write_sq_size = N * (WRITE + FLUSH)
+ *
+ * Note: rw is no different from random write, since interleaving reads
+ * with writes in the extreme case forces a flush as often as when the
+ * writes are random.
+ */
+ sq_size = 2 * td->o.iodepth;
+ } else if (td_write(td)) {
+ /* sequential TD_DDIR_WRITE only */
+ if (td->o.sync_io) {
+ sq_size = 2; /* WRITE + FLUSH */
+ } else {
+ /*
+ * N * WRITE + B * FLUSH where:
+ * - B == ceil(iodepth / iodepth_batch)
+ * which is the number of batches for N writes
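+ * (e.g., with illustrative values iodepth=16 and iodepth_batch=4:
+ * B = ceil(16 / 4) = 4, so sq_size = 16 + 4 = 20)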
+ */
+ sq_size = td->o.iodepth + LIBRPMA_FIO_CEIL(td->o.iodepth,
+ td->o.iodepth_batch);
+ }
+ } else {
+ /* TD_DDIR_READ only */
+ if (td->o.sync_io) {
+ sq_size = 1; /* READ */
+ } else {
+ sq_size = td->o.iodepth; /* N x READ */
+ }
+ }
+ cq_size = sq_size;
+
+ /* create a connection configuration object */
+ if ((ret = rpma_conn_cfg_new(&cfg))) {
+ librpma_td_verror(td, ret, "rpma_conn_cfg_new");
+ return -1;
+ }
+
+ /* apply queue sizes */
+ if ((ret = rpma_conn_cfg_set_sq_size(cfg, sq_size))) {
+ librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
+ goto err_cfg_delete;
+ }
+ if ((ret = rpma_conn_cfg_set_cq_size(cfg, cq_size))) {
+ librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
+ goto err_cfg_delete;
+ }
+
+ if (librpma_fio_client_init(td, cfg))
+ goto err_cfg_delete;
+
+ ccd = td->io_ops_data;
+
+ if (ccd->server_mr_flush_type == RPMA_FLUSH_TYPE_PERSISTENT) {
+ if (!ccd->ws->direct_write_to_pmem) {
+ if (td->thread_number == 1)
+ log_err(
+ "Fio librpma engine will not work until the Direct Write to PMem on the server side is possible (direct_write_to_pmem)\n");
+ goto err_cleanup_common;
+ }
+
+ /* configure peer's direct write to pmem support */
+ if ((ret = rpma_peer_cfg_new(&pcfg))) {
+ librpma_td_verror(td, ret, "rpma_peer_cfg_new");
+ goto err_cleanup_common;
+ }
+
+ if ((ret = rpma_peer_cfg_set_direct_write_to_pmem(pcfg, true))) {
+ librpma_td_verror(td, ret,
+ "rpma_peer_cfg_set_direct_write_to_pmem");
+ (void) rpma_peer_cfg_delete(&pcfg);
+ goto err_cleanup_common;
+ }
+
+ if ((ret = rpma_conn_apply_remote_peer_cfg(ccd->conn, pcfg))) {
+ librpma_td_verror(td, ret,
+ "rpma_conn_apply_remote_peer_cfg");
+ (void) rpma_peer_cfg_delete(&pcfg);
+ goto err_cleanup_common;
+ }
+
+ (void) rpma_peer_cfg_delete(&pcfg);
+ } else if (td->thread_number == 1) {
+ /* XXX log_info mixes with the JSON output */
+ log_err(
+ "Note: Direct Write to PMem is not supported by default nor required if you use DRAM instead of PMem on the server side (direct_write_to_pmem).\n"
+ "Remember that flushing to DRAM does not make your data persistent and may be used only for experimental purposes.\n");
+ }
+
+ if ((ret = rpma_conn_cfg_delete(&cfg))) {
+ librpma_td_verror(td, ret, "rpma_conn_cfg_delete");
+ /* non fatal error - continue */
+ }
+
+ ccd->flush = client_io_flush;
+ ccd->get_io_u_index = client_get_io_u_index;
+
+ return 0;
+
+err_cleanup_common:
+ librpma_fio_client_cleanup(td);
+
+err_cfg_delete:
+ (void) rpma_conn_cfg_delete(&cfg);
+
+ return -1;
+}
+
+static void client_cleanup(struct thread_data *td)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+
+ if (ccd == NULL)
+ return;
+
+ free(ccd->client_data);
+
+ librpma_fio_client_cleanup(td);
+}
+
+static inline int client_io_flush(struct thread_data *td,
+ struct io_u *first_io_u, struct io_u *last_io_u,
+ unsigned long long int len)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+ size_t dst_offset = first_io_u->offset;
+ int ret;
+
+ if ((ret = rpma_flush(ccd->conn, ccd->server_mr, dst_offset, len,
+ ccd->server_mr_flush_type, RPMA_F_COMPLETION_ALWAYS,
+ (void *)(uintptr_t)last_io_u->index))) {
+ librpma_td_verror(td, ret, "rpma_flush");
+ return -1;
+ }
+
+ return 0;
+}
+
+static int client_get_io_u_index(struct rpma_completion *cmpl,
+ unsigned int *io_u_index)
+{
+ memcpy(io_u_index, &cmpl->op_context, sizeof(*io_u_index));
+
+ return 1;
+}
+
+FIO_STATIC struct ioengine_ops ioengine_client = {
+ .name = "librpma_apm_client",
+ .version = FIO_IOOPS_VERSION,
+ .init = client_init,
+ .post_init = librpma_fio_client_post_init,
+ .get_file_size = librpma_fio_client_get_file_size,
+ .open_file = librpma_fio_file_nop,
+ .queue = librpma_fio_client_queue,
+ .commit = librpma_fio_client_commit,
+ .getevents = librpma_fio_client_getevents,
+ .event = librpma_fio_client_event,
+ .errdetails = librpma_fio_client_errdetails,
+ .close_file = librpma_fio_file_nop,
+ .cleanup = client_cleanup,
+ .flags = FIO_DISKLESSIO,
+ .options = librpma_fio_options,
+ .option_struct_size = sizeof(struct librpma_fio_options_values),
+};
+
+/* server side implementation */
+
+static int server_open_file(struct thread_data *td, struct fio_file *f)
+{
+ return librpma_fio_server_open_file(td, f, NULL);
+}
+
+static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
+{
+ return FIO_Q_COMPLETED;
+}
+
+FIO_STATIC struct ioengine_ops ioengine_server = {
+ .name = "librpma_apm_server",
+ .version = FIO_IOOPS_VERSION,
+ .init = librpma_fio_server_init,
+ .open_file = server_open_file,
+ .close_file = librpma_fio_server_close_file,
+ .queue = server_queue,
+ .invalidate = librpma_fio_file_nop,
+ .cleanup = librpma_fio_server_cleanup,
+ .flags = FIO_SYNCIO,
+ .options = librpma_fio_options,
+ .option_struct_size = sizeof(struct librpma_fio_options_values),
+};
+
+/* register both engines */
+
+static void fio_init fio_librpma_apm_register(void)
+{
+ register_ioengine(&ioengine_client);
+ register_ioengine(&ioengine_server);
+}
+
+static void fio_exit fio_librpma_apm_unregister(void)
+{
+ unregister_ioengine(&ioengine_client);
+ unregister_ioengine(&ioengine_server);
+}
--- /dev/null
+/*
+ * librpma_fio: librpma_apm and librpma_gpspm engines' common part.
+ *
+ * Copyright 2021, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#include "librpma_fio.h"
+
+#include <libpmem.h>
+
+struct fio_option librpma_fio_options[] = {
+ {
+ .name = "serverip",
+ .lname = "rpma_server_ip",
+ .type = FIO_OPT_STR_STORE,
+ .off1 = offsetof(struct librpma_fio_options_values, server_ip),
+ .help = "IP address the server is listening on",
+ .def = "",
+ .category = FIO_OPT_C_ENGINE,
+ .group = FIO_OPT_G_LIBRPMA,
+ },
+ {
+ .name = "port",
+ .lname = "rpma_server port",
+ .type = FIO_OPT_STR_STORE,
+ .off1 = offsetof(struct librpma_fio_options_values, port),
+ .help = "port the server is listening on",
+ .def = "7204",
+ .category = FIO_OPT_C_ENGINE,
+ .group = FIO_OPT_G_LIBRPMA,
+ },
+ {
+ .name = "direct_write_to_pmem",
+ .lname = "Direct Write to PMem (via RDMA) from the remote host is possible",
+ .type = FIO_OPT_BOOL,
+ .off1 = offsetof(struct librpma_fio_options_values,
+ direct_write_to_pmem),
+ .help = "Set to true ONLY when Direct Write to PMem from the remote host is possible (https://pmem.io/rpma/documentation/basic-direct-write-to-pmem.html)",
+ .def = "",
+ .category = FIO_OPT_C_ENGINE,
+ .group = FIO_OPT_G_LIBRPMA,
+ },
+ {
+ .name = NULL,
+ },
+};
+
+int librpma_fio_td_port(const char *port_base_str, struct thread_data *td,
+ char *port_out)
+{
+ unsigned long int port_ul = strtoul(port_base_str, NULL, 10);
+ unsigned int port_new;
+
+ port_out[0] = '\0';
+
+ if (port_ul == ULONG_MAX) {
+ td_verror(td, errno, "strtoul");
+ return -1;
+ }
+ port_ul += td->thread_number - 1;
+ if (port_ul >= UINT_MAX) {
+ log_err("[%u] port number (%lu) bigger than UINT_MAX\n",
+ td->thread_number, port_ul);
+ return -1;
+ }
+
+ port_new = port_ul;
+ snprintf(port_out, LIBRPMA_FIO_PORT_STR_LEN_MAX - 1, "%u", port_new);
+
+ return 0;
+}
+
+char *librpma_fio_allocate_dram(struct thread_data *td, size_t size,
+ struct librpma_fio_mem *mem)
+{
+ char *mem_ptr = NULL;
+ int ret;
+
+ if ((ret = posix_memalign((void **)&mem_ptr, page_size, size))) {
+ log_err("fio: posix_memalign() failed\n");
+ td_verror(td, ret, "posix_memalign");
+ return NULL;
+ }
+
+ mem->mem_ptr = mem_ptr;
+ mem->size_mmap = 0;
+
+ return mem_ptr;
+}
+
+char *librpma_fio_allocate_pmem(struct thread_data *td, const char *filename,
+ size_t size, struct librpma_fio_mem *mem)
+{
+ size_t size_mmap = 0;
+ char *mem_ptr = NULL;
+ int is_pmem = 0;
+ size_t ws_offset;
+
+ if (size % page_size) {
+ log_err("fio: size (%zu) is not aligned to page size (%zu)\n",
+ size, page_size);
+ return NULL;
+ }
+
+ ws_offset = (td->thread_number - 1) * size;
+
+ if (!filename) {
+ log_err("fio: filename is not set\n");
+ return NULL;
+ }
+
+ /* map the file */
+ mem_ptr = pmem_map_file(filename, 0 /* len */, 0 /* flags */,
+ 0 /* mode */, &size_mmap, &is_pmem);
+ if (mem_ptr == NULL) {
+ log_err("fio: pmem_map_file(%s) failed\n", filename);
+ /* pmem_map_file() sets errno on failure */
+ td_verror(td, errno, "pmem_map_file");
+ return NULL;
+ }
+
+ /* pmem is expected */
+ if (!is_pmem) {
+ log_err("fio: %s is not located in persistent memory\n",
+ filename);
+ goto err_unmap;
+ }
+
+ /* check size of allocated persistent memory */
+ if (size_mmap < ws_offset + size) {
+ log_err(
+ "fio: %s is too small to handle so many threads (%zu < %zu)\n",
+ filename, size_mmap, ws_offset + size);
+ goto err_unmap;
+ }
+
+ log_info("fio: size of memory mapped from the file %s: %zu\n",
+ filename, size_mmap);
+
+ mem->mem_ptr = mem_ptr;
+ mem->size_mmap = size_mmap;
+
+ return mem_ptr + ws_offset;
+
+err_unmap:
+ (void) pmem_unmap(mem_ptr, size_mmap);
+ return NULL;
+}
+
+void librpma_fio_free(struct librpma_fio_mem *mem)
+{
+ if (mem->size_mmap)
+ (void) pmem_unmap(mem->mem_ptr, mem->size_mmap);
+ else
+ free(mem->mem_ptr);
+}
+
+#define LIBRPMA_FIO_RETRY_MAX_NO 10
+#define LIBRPMA_FIO_RETRY_DELAY_S 5
+
+int librpma_fio_client_init(struct thread_data *td,
+ struct rpma_conn_cfg *cfg)
+{
+ struct librpma_fio_client_data *ccd;
+ struct librpma_fio_options_values *o = td->eo;
+ struct ibv_context *dev = NULL;
+ char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
+ struct rpma_conn_req *req = NULL;
+ enum rpma_conn_event event;
+ struct rpma_conn_private_data pdata;
+ enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
+ int remote_flush_type;
+ int retry;
+ int ret;
+
+ /* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
+#ifdef FIO_INC_DEBUG
+ if ((1UL << FD_NET) & fio_debug)
+ log_level_aux = RPMA_LOG_LEVEL_INFO;
+#endif
+
+ /* configure logging thresholds to see more details */
+ rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
+ rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);
+
+ /* obtain an IBV context for a remote IP address */
+ if ((ret = rpma_utils_get_ibv_context(o->server_ip,
+ RPMA_UTIL_IBV_CONTEXT_REMOTE, &dev))) {
+ librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
+ return -1;
+ }
+
+ /* allocate client's data */
+ ccd = calloc(1, sizeof(*ccd));
+ if (ccd == NULL) {
+ td_verror(td, errno, "calloc");
+ return -1;
+ }
+
+ /* allocate all in-memory queues */
+ ccd->io_us_queued = calloc(td->o.iodepth, sizeof(*ccd->io_us_queued));
+ if (ccd->io_us_queued == NULL) {
+ td_verror(td, errno, "calloc");
+ goto err_free_ccd;
+ }
+
+ ccd->io_us_flight = calloc(td->o.iodepth, sizeof(*ccd->io_us_flight));
+ if (ccd->io_us_flight == NULL) {
+ td_verror(td, errno, "calloc");
+ goto err_free_io_u_queues;
+ }
+
+ ccd->io_us_completed = calloc(td->o.iodepth,
+ sizeof(*ccd->io_us_completed));
+ if (ccd->io_us_completed == NULL) {
+ td_verror(td, errno, "calloc");
+ goto err_free_io_u_queues;
+ }
+
+ /* create a new peer object */
+ if ((ret = rpma_peer_new(dev, &ccd->peer))) {
+ librpma_td_verror(td, ret, "rpma_peer_new");
+ goto err_free_io_u_queues;
+ }
+
+ /* create a connection request */
+ if (librpma_fio_td_port(o->port, td, port_td))
+ goto err_peer_delete;
+
+ for (retry = 0; retry < LIBRPMA_FIO_RETRY_MAX_NO; retry++) {
+ if ((ret = rpma_conn_req_new(ccd->peer, o->server_ip, port_td,
+ cfg, &req))) {
+ librpma_td_verror(td, ret, "rpma_conn_req_new");
+ goto err_peer_delete;
+ }
+
+ /*
+ * Connect the connection request
+ * and obtain the connection object.
+ */
+ if ((ret = rpma_conn_req_connect(&req, NULL, &ccd->conn))) {
+ librpma_td_verror(td, ret, "rpma_conn_req_connect");
+ goto err_req_delete;
+ }
+
+ /* wait for the connection to establish */
+ if ((ret = rpma_conn_next_event(ccd->conn, &event))) {
+ librpma_td_verror(td, ret, "rpma_conn_next_event");
+ goto err_conn_delete;
+ } else if (event == RPMA_CONN_ESTABLISHED) {
+ break;
+ } else if (event == RPMA_CONN_REJECTED) {
+ (void) rpma_conn_disconnect(ccd->conn);
+ (void) rpma_conn_delete(&ccd->conn);
+ if (retry < LIBRPMA_FIO_RETRY_MAX_NO - 1) {
+ log_err("Thread [%d]: Retrying (#%i) ...\n",
+ td->thread_number, retry + 1);
+ sleep(LIBRPMA_FIO_RETRY_DELAY_S);
+ } else {
+ log_err(
+ "Thread [%d]: The maximum number of retries exceeded. Closing.\n",
+ td->thread_number);
+ }
+ } else {
+ log_err(
+ "rpma_conn_next_event returned an unexpected event: (%s != RPMA_CONN_ESTABLISHED)\n",
+ rpma_utils_conn_event_2str(event));
+ goto err_conn_delete;
+ }
+ }
+
+ if (retry > 0)
+ log_err("Thread [%d]: Connected after retry #%i\n",
+ td->thread_number, retry);
+
+ if (ccd->conn == NULL)
+ goto err_peer_delete;
+
+ /* get the connection's private data sent from the server */
+ if ((ret = rpma_conn_get_private_data(ccd->conn, &pdata))) {
+ librpma_td_verror(td, ret, "rpma_conn_get_private_data");
+ goto err_conn_delete;
+ }
+
+ /* get the server's workspace representation */
+ ccd->ws = pdata.ptr;
+
+ /* create the server's memory representation */
+ if ((ret = rpma_mr_remote_from_descriptor(&ccd->ws->descriptor[0],
+ ccd->ws->mr_desc_size, &ccd->server_mr))) {
+ librpma_td_verror(td, ret, "rpma_mr_remote_from_descriptor");
+ goto err_conn_delete;
+ }
+
+ /* get the total size of the shared server memory */
+ if ((ret = rpma_mr_remote_get_size(ccd->server_mr, &ccd->ws_size))) {
+ librpma_td_verror(td, ret, "rpma_mr_remote_get_size");
+ goto err_conn_delete;
+ }
+
+ /* get flush type of the remote node */
+ if ((ret = rpma_mr_remote_get_flush_type(ccd->server_mr,
+ &remote_flush_type))) {
+ librpma_td_verror(td, ret, "rpma_mr_remote_get_flush_type");
+ goto err_conn_delete;
+ }
+
+ ccd->server_mr_flush_type =
+ (remote_flush_type & RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT) ?
+ RPMA_FLUSH_TYPE_PERSISTENT : RPMA_FLUSH_TYPE_VISIBILITY;
+
+ /*
+ * Ensure the io_us buffer allocation is page-size-aligned, which is required
+ * to register it for RDMA. The user-provided value is intentionally ignored.
+ */
+ td->o.mem_align = page_size;
+
+ td->io_ops_data = ccd;
+
+ return 0;
+
+err_conn_delete:
+ (void) rpma_conn_disconnect(ccd->conn);
+ (void) rpma_conn_delete(&ccd->conn);
+
+err_req_delete:
+ (void) rpma_conn_req_delete(&req);
+
+err_peer_delete:
+ (void) rpma_peer_delete(&ccd->peer);
+
+err_free_io_u_queues:
+ free(ccd->io_us_queued);
+ free(ccd->io_us_flight);
+ free(ccd->io_us_completed);
+
+err_free_ccd:
+ free(ccd);
+
+ return -1;
+}
+
+void librpma_fio_client_cleanup(struct thread_data *td)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+ enum rpma_conn_event ev;
+ int ret;
+
+ if (ccd == NULL)
+ return;
+
+ /* delete the iou's memory registration */
+ if ((ret = rpma_mr_dereg(&ccd->orig_mr)))
+ librpma_td_verror(td, ret, "rpma_mr_dereg");
+ /* delete the server's memory representation */
+ if ((ret = rpma_mr_remote_delete(&ccd->server_mr)))
+ librpma_td_verror(td, ret, "rpma_mr_remote_delete");
+ /* initiate disconnection */
+ if ((ret = rpma_conn_disconnect(ccd->conn)))
+ librpma_td_verror(td, ret, "rpma_conn_disconnect");
+ /* wait for the disconnection to complete */
+ if ((ret = rpma_conn_next_event(ccd->conn, &ev))) {
+ librpma_td_verror(td, ret, "rpma_conn_next_event");
+ } else if (ev != RPMA_CONN_CLOSED) {
+ log_err(
+ "client_cleanup received an unexpected event (%s != RPMA_CONN_CLOSED)\n",
+ rpma_utils_conn_event_2str(ev));
+ }
+ /* delete the connection */
+ if ((ret = rpma_conn_delete(&ccd->conn)))
+ librpma_td_verror(td, ret, "rpma_conn_delete");
+ /* delete the peer */
+ if ((ret = rpma_peer_delete(&ccd->peer)))
+ librpma_td_verror(td, ret, "rpma_peer_delete");
+ /* free the software queues */
+ free(ccd->io_us_queued);
+ free(ccd->io_us_flight);
+ free(ccd->io_us_completed);
+ free(ccd);
+ td->io_ops_data = NULL; /* zero ccd */
+}
+
+int librpma_fio_file_nop(struct thread_data *td, struct fio_file *f)
+{
+ /* NOP */
+ return 0;
+}
+
+int librpma_fio_client_post_init(struct thread_data *td)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+ size_t io_us_size;
+ int ret;
+
+ /*
+ * td->orig_buffer is not aligned. The engine requires aligned io_us
+ * so FIO aligns the address up using the formula below.
+ */
+ ccd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
+ td->o.mem_align;
+
+ /*
+ * Besides the space actually consumed by the io_us, td->orig_buffer_size
+ * includes paddings which can be omitted from the memory registration.
+ */
+ io_us_size = (unsigned long long)td_max_bs(td) *
+ (unsigned long long)td->o.iodepth;
+
+ if ((ret = rpma_mr_reg(ccd->peer, ccd->orig_buffer_aligned, io_us_size,
+ RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
+ RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
+ RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT, &ccd->orig_mr)))
+ librpma_td_verror(td, ret, "rpma_mr_reg");
+ return ret;
+}
+
+int librpma_fio_client_get_file_size(struct thread_data *td,
+ struct fio_file *f)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+
+ f->real_file_size = ccd->ws_size;
+ fio_file_set_size_known(f);
+
+ return 0;
+}
+
+static enum fio_q_status client_queue_sync(struct thread_data *td,
+ struct io_u *io_u)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+ struct rpma_completion cmpl;
+ unsigned io_u_index;
+ int ret;
+
+ /* execute io_u */
+ if (io_u->ddir == DDIR_READ) {
+ /* post an RDMA read operation */
+ if (librpma_fio_client_io_read(td, io_u,
+ RPMA_F_COMPLETION_ALWAYS))
+ goto err;
+ } else if (io_u->ddir == DDIR_WRITE) {
+ /* post an RDMA write operation */
+ if (librpma_fio_client_io_write(td, io_u))
+ goto err;
+ if (ccd->flush(td, io_u, io_u, io_u->xfer_buflen))
+ goto err;
+ } else {
+ log_err("unsupported IO mode: %s\n", io_ddir_name(io_u->ddir));
+ goto err;
+ }
+
+ do {
+ /* get a completion */
+ ret = rpma_conn_completion_get(ccd->conn, &cmpl);
+ if (ret == RPMA_E_NO_COMPLETION) {
+ /* lack of completion is not an error */
+ continue;
+ } else if (ret != 0) {
+ /* an error occurred */
+ librpma_td_verror(td, ret, "rpma_conn_completion_get");
+ goto err;
+ }
+
+ /* if io_us has completed with an error */
+ if (cmpl.op_status != IBV_WC_SUCCESS)
+ goto err;
+
+ if (cmpl.op == RPMA_OP_SEND)
+ ++ccd->op_send_completed;
+ else {
+ if (cmpl.op == RPMA_OP_RECV)
+ ++ccd->op_recv_completed;
+
+ break;
+ }
+ } while (1);
+
+ if (ccd->get_io_u_index(&cmpl, &io_u_index) != 1)
+ goto err;
+
+ if (io_u->index != io_u_index) {
+ log_err(
+ "no matching io_u for received completion found (io_u_index=%u)\n",
+ io_u_index);
+ goto err;
+ }
+
+ /* make sure all SENDs are completed before exit - clean up SQ */
+ if (librpma_fio_client_io_complete_all_sends(td))
+ goto err;
+
+ return FIO_Q_COMPLETED;
+
+err:
+ io_u->error = -1;
+ return FIO_Q_COMPLETED;
+}
+
+enum fio_q_status librpma_fio_client_queue(struct thread_data *td,
+ struct io_u *io_u)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+
+ if (ccd->io_u_queued_nr == (int)td->o.iodepth)
+ return FIO_Q_BUSY;
+
+ if (td->o.sync_io)
+ return client_queue_sync(td, io_u);
+
+ /* io_u -> queued[] */
+ ccd->io_us_queued[ccd->io_u_queued_nr] = io_u;
+ ccd->io_u_queued_nr++;
+
+ return FIO_Q_QUEUED;
+}
+
+int librpma_fio_client_commit(struct thread_data *td)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+ int flags = RPMA_F_COMPLETION_ON_ERROR;
+ struct timespec now;
+ bool fill_time;
+ int i;
+ struct io_u *flush_first_io_u = NULL;
+ unsigned long long int flush_len = 0;
+
+ if (!ccd->io_us_queued)
+ return -1;
+
+ /* execute all io_us from queued[] */
+ for (i = 0; i < ccd->io_u_queued_nr; i++) {
+ struct io_u *io_u = ccd->io_us_queued[i];
+
+ if (io_u->ddir == DDIR_READ) {
+ if (i + 1 == ccd->io_u_queued_nr ||
+ ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE)
+ flags = RPMA_F_COMPLETION_ALWAYS;
+ /* post an RDMA read operation */
+ if (librpma_fio_client_io_read(td, io_u, flags))
+ return -1;
+ } else if (io_u->ddir == DDIR_WRITE) {
+ /* post an RDMA write operation */
+ if (librpma_fio_client_io_write(td, io_u))
+ return -1;
+
+ /* cache the first io_u in the sequence */
+ if (flush_first_io_u == NULL)
+ flush_first_io_u = io_u;
+
+ /*
+ * the flush length is the sum of all io_u's creating
+ * the sequence
+ */
+ flush_len += io_u->xfer_buflen;
+
+ /*
+ * if io_u's are random the rpma_flush is required
+ * after each one of them
+ */
+ if (!td_random(td)) {
+ /*
+ * When the io_u's are sequential and
+ * the current io_u is not the last one and
+ * the next one is also a write operation
+ * the flush can be postponed by one io_u and
+ * cover all of them which build a continuous
+ * sequence.
+ */
+ if ((i + 1 < ccd->io_u_queued_nr) &&
+ (ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE))
+ continue;
+ }
+
+ /* flush all writes which build a continuous sequence */
+ if (ccd->flush(td, flush_first_io_u, io_u, flush_len))
+ return -1;
+
+ /*
+ * reset the flush parameters in preparation for
+ * the next one
+ */
+ flush_first_io_u = NULL;
+ flush_len = 0;
+ } else {
+ log_err("unsupported IO mode: %s\n",
+ io_ddir_name(io_u->ddir));
+ return -1;
+ }
+ }
+
+ if ((fill_time = fio_fill_issue_time(td)))
+ fio_gettime(&now, NULL);
+
+ /* move executed io_us from queued[] to flight[] */
+ for (i = 0; i < ccd->io_u_queued_nr; i++) {
+ struct io_u *io_u = ccd->io_us_queued[i];
+
+ /* FIO does not do this if the engine is asynchronous */
+ if (fill_time)
+ memcpy(&io_u->issue_time, &now, sizeof(now));
+
+ /* move executed io_us from queued[] to flight[] */
+ ccd->io_us_flight[ccd->io_u_flight_nr] = io_u;
+ ccd->io_u_flight_nr++;
+
+ /*
+ * FIO says:
+ * If an engine has the commit hook
+ * it has to call io_u_queued() itself.
+ */
+ io_u_queued(td, io_u);
+ }
+
+ /* FIO does not do this if an engine has the commit hook. */
+ io_u_mark_submit(td, ccd->io_u_queued_nr);
+ ccd->io_u_queued_nr = 0;
+
+ return 0;
+}
+
+/*
+ * RETURN VALUE
+ * - > 0 - a number of completed io_us
+ * - 0 - when no completions were received
+ * - (-1) - when an error occurred
+ */
+static int client_getevent_process(struct thread_data *td)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+ struct rpma_completion cmpl;
+ /* io_u->index of completed io_u (cmpl.op_context) */
+ unsigned int io_u_index;
+ /* # of completed io_us */
+ int cmpl_num = 0;
+ /* helpers */
+ struct io_u *io_u;
+ int i;
+ int ret;
+
+ /* get a completion */
+ if ((ret = rpma_conn_completion_get(ccd->conn, &cmpl))) {
+		if (ret == RPMA_E_NO_COMPLETION) {
+			/* lack of completion is not an error */
+			return 0;
+ }
+
+ /* an error occurred */
+ librpma_td_verror(td, ret, "rpma_conn_completion_get");
+ return -1;
+ }
+
+	/* if the io_u has completed with an error */
+ if (cmpl.op_status != IBV_WC_SUCCESS) {
+ td->error = cmpl.op_status;
+ return -1;
+ }
+
+ if (cmpl.op == RPMA_OP_SEND)
+ ++ccd->op_send_completed;
+ else if (cmpl.op == RPMA_OP_RECV)
+ ++ccd->op_recv_completed;
+
+ if ((ret = ccd->get_io_u_index(&cmpl, &io_u_index)) != 1)
+ return ret;
+
+ /* look for an io_u being completed */
+ for (i = 0; i < ccd->io_u_flight_nr; ++i) {
+ if (ccd->io_us_flight[i]->index == io_u_index) {
+ cmpl_num = i + 1;
+ break;
+ }
+ }
+
+ /* if no matching io_u has been found */
+ if (cmpl_num == 0) {
+ log_err(
+ "no matching io_u for received completion found (io_u_index=%u)\n",
+ io_u_index);
+ return -1;
+ }
+
+ /* move completed io_us to the completed in-memory queue */
+ for (i = 0; i < cmpl_num; ++i) {
+ /* get and prepare io_u */
+ io_u = ccd->io_us_flight[i];
+
+ /* append to the queue */
+ ccd->io_us_completed[ccd->io_u_completed_nr] = io_u;
+ ccd->io_u_completed_nr++;
+ }
+
+ /* remove completed io_us from the flight queue */
+ for (i = cmpl_num; i < ccd->io_u_flight_nr; ++i)
+ ccd->io_us_flight[i - cmpl_num] = ccd->io_us_flight[i];
+ ccd->io_u_flight_nr -= cmpl_num;
+
+ return cmpl_num;
+}
+
+int librpma_fio_client_getevents(struct thread_data *td, unsigned int min,
+ unsigned int max, const struct timespec *t)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+ /* total # of completed io_us */
+ int cmpl_num_total = 0;
+ /* # of completed io_us from a single event */
+ int cmpl_num;
+
+ do {
+ cmpl_num = client_getevent_process(td);
+ if (cmpl_num > 0) {
+ /* new completions collected */
+ cmpl_num_total += cmpl_num;
+ } else if (cmpl_num == 0) {
+ /*
+ * It is required to make sure that CQEs for SENDs
+ * will flow at least at the same pace as CQEs for RECVs.
+ */
+ if (cmpl_num_total >= min &&
+ ccd->op_send_completed >= ccd->op_recv_completed)
+ break;
+
+ /*
+ * To reduce CPU consumption one can use
+ * the rpma_conn_completion_wait() function.
+			 * Note this greatly increases the latency
+			 * and makes the results less stable.
+ * The bandwidth stays more or less the same.
+ */
+ } else {
+ /* an error occurred */
+ return -1;
+ }
+
+ /*
+	 * The expected max can be exceeded if CQEs for RECVs come up
+	 * faster than CQEs for SENDs. But it is required to make sure CQEs for
+	 * SENDs flow at least at the same pace as CQEs for RECVs.
+ */
+ } while (cmpl_num_total < max ||
+ ccd->op_send_completed < ccd->op_recv_completed);
+
+ /*
+ * All posted SENDs are completed and RECVs for them (responses) are
+ * completed. This is the initial situation so the counters are reset.
+ */
+ if (ccd->op_send_posted == ccd->op_send_completed &&
+ ccd->op_send_completed == ccd->op_recv_completed) {
+ ccd->op_send_posted = 0;
+ ccd->op_send_completed = 0;
+ ccd->op_recv_completed = 0;
+ }
+
+ return cmpl_num_total;
+}
+
+struct io_u *librpma_fio_client_event(struct thread_data *td, int event)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+ struct io_u *io_u;
+ int i;
+
+ /* get the first io_u from the queue */
+ io_u = ccd->io_us_completed[0];
+
+ /* remove the first io_u from the queue */
+ for (i = 1; i < ccd->io_u_completed_nr; ++i)
+ ccd->io_us_completed[i - 1] = ccd->io_us_completed[i];
+ ccd->io_u_completed_nr--;
+
+ dprint_io_u(io_u, "client_event");
+
+ return io_u;
+}
+
+char *librpma_fio_client_errdetails(struct io_u *io_u)
+{
+ /* get the string representation of an error */
+ enum ibv_wc_status status = io_u->error;
+ const char *status_str = ibv_wc_status_str(status);
+
+ char *details = strdup(status_str);
+ if (details == NULL) {
+ fprintf(stderr, "Error: %s\n", status_str);
+ fprintf(stderr, "Fatal error: out of memory. Aborting.\n");
+ abort();
+ }
+
+ /* FIO frees the returned string when it becomes obsolete */
+ return details;
+}
+
+int librpma_fio_server_init(struct thread_data *td)
+{
+ struct librpma_fio_options_values *o = td->eo;
+ struct librpma_fio_server_data *csd;
+ struct ibv_context *dev = NULL;
+ enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
+ int ret = -1;
+
+ /* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
+#ifdef FIO_INC_DEBUG
+ if ((1UL << FD_NET) & fio_debug)
+ log_level_aux = RPMA_LOG_LEVEL_INFO;
+#endif
+
+ /* configure logging thresholds to see more details */
+ rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
+ rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);
+
+	/* obtain an IBV context for a local IP address */
+ if ((ret = rpma_utils_get_ibv_context(o->server_ip,
+ RPMA_UTIL_IBV_CONTEXT_LOCAL, &dev))) {
+ librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
+ return -1;
+ }
+
+ /* allocate server's data */
+ csd = calloc(1, sizeof(*csd));
+ if (csd == NULL) {
+ td_verror(td, errno, "calloc");
+ return -1;
+ }
+
+ /* create a new peer object */
+ if ((ret = rpma_peer_new(dev, &csd->peer))) {
+ librpma_td_verror(td, ret, "rpma_peer_new");
+ goto err_free_csd;
+ }
+
+ td->io_ops_data = csd;
+
+ return 0;
+
+err_free_csd:
+ free(csd);
+
+ return -1;
+}
+
+void librpma_fio_server_cleanup(struct thread_data *td)
+{
+ struct librpma_fio_server_data *csd = td->io_ops_data;
+ int ret;
+
+ if (csd == NULL)
+ return;
+
+ /* free the peer */
+ if ((ret = rpma_peer_delete(&csd->peer)))
+ librpma_td_verror(td, ret, "rpma_peer_delete");
+
+ free(csd);
+}
+
+int librpma_fio_server_open_file(struct thread_data *td, struct fio_file *f,
+ struct rpma_conn_cfg *cfg)
+{
+ struct librpma_fio_server_data *csd = td->io_ops_data;
+ struct librpma_fio_options_values *o = td->eo;
+ enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
+ struct librpma_fio_workspace ws = {0};
+ struct rpma_conn_private_data pdata;
+ uint32_t max_msg_num;
+ struct rpma_conn_req *conn_req;
+ struct rpma_conn *conn;
+ struct rpma_mr_local *mr;
+ char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
+ struct rpma_ep *ep;
+ size_t mem_size = td->o.size;
+ size_t mr_desc_size;
+ void *ws_ptr;
+ int usage_mem_type;
+ int ret;
+
+ if (!f->file_name) {
+ log_err("fio: filename is not set\n");
+ return -1;
+ }
+
+ /* start a listening endpoint at addr:port */
+ if (librpma_fio_td_port(o->port, td, port_td))
+ return -1;
+
+ if ((ret = rpma_ep_listen(csd->peer, o->server_ip, port_td, &ep))) {
+ librpma_td_verror(td, ret, "rpma_ep_listen");
+ return -1;
+ }
+
+ if (strcmp(f->file_name, "malloc") == 0) {
+ /* allocation from DRAM using posix_memalign() */
+ ws_ptr = librpma_fio_allocate_dram(td, mem_size, &csd->mem);
+ usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_VISIBILITY;
+ } else {
+ /* allocation from PMEM using pmem_map_file() */
+ ws_ptr = librpma_fio_allocate_pmem(td, f->file_name,
+ mem_size, &csd->mem);
+ usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT;
+ }
+
+ if (ws_ptr == NULL)
+ goto err_ep_shutdown;
+
+ f->real_file_size = mem_size;
+
+ if ((ret = rpma_mr_reg(csd->peer, ws_ptr, mem_size,
+ RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
+ RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
+ usage_mem_type, &mr))) {
+ librpma_td_verror(td, ret, "rpma_mr_reg");
+ goto err_free;
+ }
+
+ /* get size of the memory region's descriptor */
+ if ((ret = rpma_mr_get_descriptor_size(mr, &mr_desc_size))) {
+ librpma_td_verror(td, ret, "rpma_mr_get_descriptor_size");
+ goto err_mr_dereg;
+ }
+
+ /* verify size of the memory region's descriptor */
+ if (mr_desc_size > LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE) {
+ log_err(
+ "size of the memory region's descriptor is too big (max=%i)\n",
+ LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE);
+ goto err_mr_dereg;
+ }
+
+ /* get the memory region's descriptor */
+ if ((ret = rpma_mr_get_descriptor(mr, &ws.descriptor[0]))) {
+ librpma_td_verror(td, ret, "rpma_mr_get_descriptor");
+ goto err_mr_dereg;
+ }
+
+ if (cfg != NULL) {
+ if ((ret = rpma_conn_cfg_get_rq_size(cfg, &max_msg_num))) {
+ librpma_td_verror(td, ret, "rpma_conn_cfg_get_rq_size");
+ goto err_mr_dereg;
+ }
+
+ /* verify whether iodepth fits into uint16_t */
+ if (max_msg_num > UINT16_MAX) {
+ log_err("fio: iodepth too big (%u > %u)\n",
+ max_msg_num, UINT16_MAX);
+			goto err_mr_dereg;
+ }
+
+ ws.max_msg_num = max_msg_num;
+ }
+
+ /* prepare a workspace description */
+ ws.direct_write_to_pmem = o->direct_write_to_pmem;
+ ws.mr_desc_size = mr_desc_size;
+ pdata.ptr = &ws;
+ pdata.len = sizeof(ws);
+
+ /* receive an incoming connection request */
+ if ((ret = rpma_ep_next_conn_req(ep, cfg, &conn_req))) {
+ librpma_td_verror(td, ret, "rpma_ep_next_conn_req");
+ goto err_mr_dereg;
+ }
+
+ if (csd->prepare_connection && csd->prepare_connection(td, conn_req))
+ goto err_req_delete;
+
+ /* accept the connection request and obtain the connection object */
+ if ((ret = rpma_conn_req_connect(&conn_req, &pdata, &conn))) {
+ librpma_td_verror(td, ret, "rpma_conn_req_connect");
+ goto err_req_delete;
+ }
+
+ /* wait for the connection to be established */
+ if ((ret = rpma_conn_next_event(conn, &conn_event))) {
+ librpma_td_verror(td, ret, "rpma_conn_next_event");
+ goto err_conn_delete;
+ } else if (conn_event != RPMA_CONN_ESTABLISHED) {
+		log_err("rpma_conn_next_event returned an unexpected event\n");
+ goto err_conn_delete;
+ }
+
+ /* end-point is no longer needed */
+ (void) rpma_ep_shutdown(&ep);
+
+ csd->ws_mr = mr;
+ csd->ws_ptr = ws_ptr;
+ csd->conn = conn;
+
+ return 0;
+
+err_conn_delete:
+ (void) rpma_conn_delete(&conn);
+
+err_req_delete:
+ (void) rpma_conn_req_delete(&conn_req);
+
+err_mr_dereg:
+ (void) rpma_mr_dereg(&mr);
+
+err_free:
+ librpma_fio_free(&csd->mem);
+
+err_ep_shutdown:
+ (void) rpma_ep_shutdown(&ep);
+
+ return -1;
+}
+
+int librpma_fio_server_close_file(struct thread_data *td, struct fio_file *f)
+{
+ struct librpma_fio_server_data *csd = td->io_ops_data;
+ enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
+ int rv = 0;
+ int ret;
+
+ /* wait for the connection to be closed */
+ ret = rpma_conn_next_event(csd->conn, &conn_event);
+ if (!ret && conn_event != RPMA_CONN_CLOSED) {
+		log_err("rpma_conn_next_event returned an unexpected event\n");
+ rv = -1;
+ }
+
+ if ((ret = rpma_conn_disconnect(csd->conn))) {
+ librpma_td_verror(td, ret, "rpma_conn_disconnect");
+ rv = -1;
+ }
+
+ if ((ret = rpma_conn_delete(&csd->conn))) {
+ librpma_td_verror(td, ret, "rpma_conn_delete");
+ rv = -1;
+ }
+
+ if ((ret = rpma_mr_dereg(&csd->ws_mr))) {
+ librpma_td_verror(td, ret, "rpma_mr_dereg");
+ rv = -1;
+ }
+
+ librpma_fio_free(&csd->mem);
+
+ return rv;
+}
--- /dev/null
+/*
+ * librpma_fio: librpma_apm and librpma_gpspm engines' common header.
+ *
+ * Copyright 2021, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation..
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#ifndef LIBRPMA_FIO_H
+#define LIBRPMA_FIO_H 1
+
+#include "../fio.h"
+#include "../optgroup.h"
+
+#include <librpma.h>
+
+/* servers' and clients' common */
+
+#define librpma_td_verror(td, err, func) \
+ td_vmsg((td), (err), rpma_err_2str(err), (func))
+
+/* ceil(a / b) = (a + b - 1) / b */
+#define LIBRPMA_FIO_CEIL(a, b) (((a) + (b) - 1) / (b))
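+/*
+ * For example, LIBRPMA_FIO_CEIL(10, 4) == (10 + 4 - 1) / 4 == 3,
+ * i.e. 10 items split into batches of up to 4 need 3 batches.
+ */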
+
+/* common option structure for server and client */
+struct librpma_fio_options_values {
+ /*
+ * FIO considers .off1 == 0 absent so the first meaningful field has to
+ * have padding ahead of it.
+ */
+ void *pad;
+ char *server_ip;
+ /* base server listening port */
+ char *port;
+ /* Direct Write to PMem is possible */
+ unsigned int direct_write_to_pmem;
+};
+
+extern struct fio_option librpma_fio_options[];
+
+/*
+ * Limited by the maximum length of the private data
+ * for rdma_connect() in case of RDMA_PS_TCP (28 bytes).
+ */
+#define LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE 24
+
+struct librpma_fio_workspace {
+ uint16_t max_msg_num; /* # of RQ slots */
+ uint8_t direct_write_to_pmem; /* Direct Write to PMem is possible */
+ uint8_t mr_desc_size; /* size of mr_desc in descriptor[] */
+ /* buffer containing mr_desc */
+ char descriptor[LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE];
+};
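+/*
+ * Note: the fixed-size fields of struct librpma_fio_workspace occupy
+ * 2 + 1 + 1 = 4 bytes, so the whole structure takes
+ * 4 + LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE = 28 bytes, which matches
+ * the RDMA_PS_TCP private data limit mentioned above.
+ */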
+
+#define LIBRPMA_FIO_PORT_STR_LEN_MAX 12
+
+int librpma_fio_td_port(const char *port_base_str, struct thread_data *td,
+ char *port_out);
+
+struct librpma_fio_mem {
+ /* memory buffer */
+ char *mem_ptr;
+
+ /* size of the mapped persistent memory */
+ size_t size_mmap;
+};
+
+char *librpma_fio_allocate_dram(struct thread_data *td, size_t size,
+ struct librpma_fio_mem *mem);
+
+char *librpma_fio_allocate_pmem(struct thread_data *td, const char *filename,
+ size_t size, struct librpma_fio_mem *mem);
+
+void librpma_fio_free(struct librpma_fio_mem *mem);
+
+/* clients' common */
+
+typedef int (*librpma_fio_flush_t)(struct thread_data *td,
+ struct io_u *first_io_u, struct io_u *last_io_u,
+ unsigned long long int len);
+
+/*
+ * RETURN VALUE
+ * - ( 1) - on success
+ * - ( 0) - skip
+ * - (-1) - on error
+ */
+typedef int (*librpma_fio_get_io_u_index_t)(struct rpma_completion *cmpl,
+ unsigned int *io_u_index);
+
+struct librpma_fio_client_data {
+ struct rpma_peer *peer;
+ struct rpma_conn *conn;
+
+ /* aligned td->orig_buffer */
+ char *orig_buffer_aligned;
+
+	/* memory registration of the io_us' base address (cd->orig_buffer_aligned) */
+ struct rpma_mr_local *orig_mr;
+
+ struct librpma_fio_workspace *ws;
+
+ /* a server's memory representation */
+ struct rpma_mr_remote *server_mr;
+ enum rpma_flush_type server_mr_flush_type;
+
+ /* remote workspace description */
+ size_t ws_size;
+
+ /* in-memory queues */
+ struct io_u **io_us_queued;
+ int io_u_queued_nr;
+ struct io_u **io_us_flight;
+ int io_u_flight_nr;
+ struct io_u **io_us_completed;
+ int io_u_completed_nr;
+
+ /* SQ control. Note: all of them have to be kept in sync. */
+ uint32_t op_send_posted;
+ uint32_t op_send_completed;
+ uint32_t op_recv_completed;
+
+ librpma_fio_flush_t flush;
+ librpma_fio_get_io_u_index_t get_io_u_index;
+
+ /* engine-specific client data */
+ void *client_data;
+};
+
+int librpma_fio_client_init(struct thread_data *td,
+ struct rpma_conn_cfg *cfg);
+void librpma_fio_client_cleanup(struct thread_data *td);
+
+int librpma_fio_file_nop(struct thread_data *td, struct fio_file *f);
+int librpma_fio_client_get_file_size(struct thread_data *td,
+ struct fio_file *f);
+
+int librpma_fio_client_post_init(struct thread_data *td);
+
+enum fio_q_status librpma_fio_client_queue(struct thread_data *td,
+ struct io_u *io_u);
+
+int librpma_fio_client_commit(struct thread_data *td);
+
+int librpma_fio_client_getevents(struct thread_data *td, unsigned int min,
+ unsigned int max, const struct timespec *t);
+
+struct io_u *librpma_fio_client_event(struct thread_data *td, int event);
+
+char *librpma_fio_client_errdetails(struct io_u *io_u);
+
+static inline int librpma_fio_client_io_read(struct thread_data *td,
+ struct io_u *io_u, int flags)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+ size_t dst_offset = (char *)(io_u->xfer_buf) - ccd->orig_buffer_aligned;
+ size_t src_offset = io_u->offset;
+ int ret;
+
+ if ((ret = rpma_read(ccd->conn, ccd->orig_mr, dst_offset,
+ ccd->server_mr, src_offset, io_u->xfer_buflen,
+ flags, (void *)(uintptr_t)io_u->index))) {
+ librpma_td_verror(td, ret, "rpma_read");
+ return -1;
+ }
+
+ return 0;
+}
+
+static inline int librpma_fio_client_io_write(struct thread_data *td,
+ struct io_u *io_u)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+ size_t src_offset = (char *)(io_u->xfer_buf) - ccd->orig_buffer_aligned;
+ size_t dst_offset = io_u->offset;
+ int ret;
+
+ if ((ret = rpma_write(ccd->conn, ccd->server_mr, dst_offset,
+ ccd->orig_mr, src_offset, io_u->xfer_buflen,
+ RPMA_F_COMPLETION_ON_ERROR,
+ (void *)(uintptr_t)io_u->index))) {
+ librpma_td_verror(td, ret, "rpma_write");
+ return -1;
+ }
+
+ return 0;
+}
+
+static inline int librpma_fio_client_io_complete_all_sends(
+ struct thread_data *td)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+ struct rpma_completion cmpl;
+ int ret;
+
+ while (ccd->op_send_posted != ccd->op_send_completed) {
+ /* get a completion */
+ ret = rpma_conn_completion_get(ccd->conn, &cmpl);
+ if (ret == RPMA_E_NO_COMPLETION) {
+ /* lack of completion is not an error */
+ continue;
+ } else if (ret != 0) {
+ /* an error occurred */
+ librpma_td_verror(td, ret, "rpma_conn_completion_get");
+ break;
+ }
+
+ if (cmpl.op_status != IBV_WC_SUCCESS)
+ return -1;
+
+ if (cmpl.op == RPMA_OP_SEND)
+ ++ccd->op_send_completed;
+ else {
+			log_err(
+				"A completion other than RPMA_OP_SEND was received while cleaning up the CQ from SENDs\n");
+ return -1;
+ }
+ }
+
+ /*
+ * All posted SENDs are completed and RECVs for them (responses) are
+ * completed. This is the initial situation so the counters are reset.
+ */
+ if (ccd->op_send_posted == ccd->op_send_completed &&
+ ccd->op_send_completed == ccd->op_recv_completed) {
+ ccd->op_send_posted = 0;
+ ccd->op_send_completed = 0;
+ ccd->op_recv_completed = 0;
+ }
+
+ return 0;
+}
+
+/* servers' common */
+
+typedef int (*librpma_fio_prepare_connection_t)(
+ struct thread_data *td,
+ struct rpma_conn_req *conn_req);
+
+struct librpma_fio_server_data {
+ struct rpma_peer *peer;
+
+ /* resources of an incoming connection */
+ struct rpma_conn *conn;
+
+ char *ws_ptr;
+ struct rpma_mr_local *ws_mr;
+ struct librpma_fio_mem mem;
+
+ /* engine-specific server data */
+ void *server_data;
+
+ librpma_fio_prepare_connection_t prepare_connection;
+};
+
+int librpma_fio_server_init(struct thread_data *td);
+
+void librpma_fio_server_cleanup(struct thread_data *td);
+
+int librpma_fio_server_open_file(struct thread_data *td,
+ struct fio_file *f, struct rpma_conn_cfg *cfg);
+
+int librpma_fio_server_close_file(struct thread_data *td,
+ struct fio_file *f);
+
+#endif /* LIBRPMA_FIO_H */
--- /dev/null
+/*
+ * librpma_gpspm: IO engine that uses PMDK librpma to write data,
+ * based on General Purpose Server Persistency Method
+ *
+ * Copyright 2020-2021, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation..
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#include "librpma_fio.h"
+
+#include <libpmem.h>
+
+/* Generated by the protocol buffer compiler from: librpma_gpspm_flush.proto */
+#include "librpma_gpspm_flush.pb-c.h"
+
+#define MAX_MSG_SIZE (512)
+#define IO_U_BUF_LEN (2 * MAX_MSG_SIZE)
+#define SEND_OFFSET (0)
+#define RECV_OFFSET (SEND_OFFSET + MAX_MSG_SIZE)
+
+#define GPSPM_FLUSH_REQUEST__LAST \
+ { PROTOBUF_C_MESSAGE_INIT(&gpspm_flush_request__descriptor), 0, 0, 0 }
+
+/*
+ * 'Flush_req_last' is the last flush request
+ * the client has to send to server to indicate
+ * that the client is done.
+ */
+static const GPSPMFlushRequest Flush_req_last = GPSPM_FLUSH_REQUEST__LAST;
+
+#define IS_NOT_THE_LAST_MESSAGE(flush_req) \
+ (flush_req->length != Flush_req_last.length || \
+ flush_req->offset != Flush_req_last.offset)
+
+/* client side implementation */
+
+/* get the next io_u message buffer in a round-robin fashion */
+#define IO_U_NEXT_BUF_OFF_CLIENT(cd) \
+ (IO_U_BUF_LEN * ((cd->msg_curr++) % cd->msg_num))
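+/*
+ * For example (illustrative values): with cd->msg_num == 4, successive
+ * calls return the offsets 0, IO_U_BUF_LEN, 2 * IO_U_BUF_LEN,
+ * 3 * IO_U_BUF_LEN and then wrap around to 0 again.
+ */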
+
+struct client_data {
+	/* memory for the send and receive message buffers */
+ char *io_us_msgs;
+
+ /* resources for messaging buffer */
+ uint32_t msg_num;
+ uint32_t msg_curr;
+ struct rpma_mr_local *msg_mr;
+};
+
+static inline int client_io_flush(struct thread_data *td,
+ struct io_u *first_io_u, struct io_u *last_io_u,
+ unsigned long long int len);
+
+static int client_get_io_u_index(struct rpma_completion *cmpl,
+ unsigned int *io_u_index);
+
+static int client_init(struct thread_data *td)
+{
+ struct librpma_fio_client_data *ccd;
+ struct client_data *cd;
+ uint32_t write_num;
+ struct rpma_conn_cfg *cfg = NULL;
+ int ret;
+
+ /*
+ * not supported:
+ * - readwrite = read / trim / randread / randtrim /
+	 *   rw / randrw / trimwrite
+ */
+ if (td_read(td) || td_trim(td)) {
+ td_verror(td, EINVAL, "Not supported mode.");
+ return -1;
+ }
+
+ /* allocate client's data */
+ cd = calloc(1, sizeof(*cd));
+ if (cd == NULL) {
+ td_verror(td, errno, "calloc");
+ return -1;
+ }
+
+ /*
+ * Calculate the required number of WRITEs and FLUSHes.
+ *
+ * Note: Each flush is a request (SEND) and response (RECV) pair.
+ */
+ if (td_random(td)) {
+ write_num = td->o.iodepth; /* WRITE * N */
+ cd->msg_num = td->o.iodepth; /* FLUSH * N */
+ } else {
+ if (td->o.sync_io) {
+ write_num = 1; /* WRITE */
+ cd->msg_num = 1; /* FLUSH */
+ } else {
+ write_num = td->o.iodepth; /* WRITE * N */
+ /*
+ * FLUSH * B where:
+ * - B == ceil(iodepth / iodepth_batch)
+ * which is the number of batches for N writes
+ */
+ cd->msg_num = LIBRPMA_FIO_CEIL(td->o.iodepth,
+ td->o.iodepth_batch);
+ }
+ }
+
+ /* create a connection configuration object */
+ if ((ret = rpma_conn_cfg_new(&cfg))) {
+ librpma_td_verror(td, ret, "rpma_conn_cfg_new");
+ goto err_free_cd;
+ }
+
+ /*
+ * Calculate the required queue sizes where:
+ * - the send queue (SQ) has to be big enough to accommodate
+ * all io_us (WRITEs) and all flush requests (SENDs)
+ * - the receive queue (RQ) has to be big enough to accommodate
+ * all flush responses (RECVs)
+ * - the completion queue (CQ) has to be big enough to accommodate all
+ * success and error completions (sq_size + rq_size)
+ */
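+	/*
+	 * For example (illustrative values only): a random workload with
+	 * iodepth=16 gives write_num == 16 and cd->msg_num == 16, hence
+	 * sq_size == 32, rq_size == 16 and cq_size == 48. A sequential
+	 * asynchronous workload with iodepth=16 and iodepth_batch=4 gives
+	 * write_num == 16 and cd->msg_num == ceil(16 / 4) == 4, hence
+	 * sq_size == 20, rq_size == 4 and cq_size == 24.
+	 */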
+ if ((ret = rpma_conn_cfg_set_sq_size(cfg, write_num + cd->msg_num))) {
+ librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
+ goto err_cfg_delete;
+ }
+ if ((ret = rpma_conn_cfg_set_rq_size(cfg, cd->msg_num))) {
+ librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
+ goto err_cfg_delete;
+ }
+ if ((ret = rpma_conn_cfg_set_cq_size(cfg, write_num + cd->msg_num * 2))) {
+ librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
+ goto err_cfg_delete;
+ }
+
+ if (librpma_fio_client_init(td, cfg))
+ goto err_cfg_delete;
+
+ ccd = td->io_ops_data;
+
+ if (ccd->ws->direct_write_to_pmem &&
+ ccd->server_mr_flush_type == RPMA_FLUSH_TYPE_PERSISTENT &&
+ td->thread_number == 1) {
+ /* XXX log_info mixes with the JSON output */
+ log_err(
+ "Note: The server side supports Direct Write to PMem and it is equipped with PMem (direct_write_to_pmem).\n"
+			"You can use the librpma_apm_client and librpma_apm_server engines for better performance instead of GPSPM.\n");
+ }
+
+ /* validate the server's RQ capacity */
+ if (cd->msg_num > ccd->ws->max_msg_num) {
+ log_err(
+ "server's RQ size (iodepth) too small to handle the client's workspace requirements (%u < %u)\n",
+ ccd->ws->max_msg_num, cd->msg_num);
+ goto err_cleanup_common;
+ }
+
+ if ((ret = rpma_conn_cfg_delete(&cfg))) {
+ librpma_td_verror(td, ret, "rpma_conn_cfg_delete");
+ /* non fatal error - continue */
+ }
+
+ ccd->flush = client_io_flush;
+ ccd->get_io_u_index = client_get_io_u_index;
+ ccd->client_data = cd;
+
+ return 0;
+
+err_cleanup_common:
+ librpma_fio_client_cleanup(td);
+
+err_cfg_delete:
+ (void) rpma_conn_cfg_delete(&cfg);
+
+err_free_cd:
+ free(cd);
+
+ return -1;
+}
+
+static int client_post_init(struct thread_data *td)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+ struct client_data *cd = ccd->client_data;
+ unsigned int io_us_msgs_size;
+ int ret;
+
+ /* message buffers initialization and registration */
+ io_us_msgs_size = cd->msg_num * IO_U_BUF_LEN;
+ if ((ret = posix_memalign((void **)&cd->io_us_msgs, page_size,
+ io_us_msgs_size))) {
+ td_verror(td, ret, "posix_memalign");
+ return ret;
+ }
+ if ((ret = rpma_mr_reg(ccd->peer, cd->io_us_msgs, io_us_msgs_size,
+ RPMA_MR_USAGE_SEND | RPMA_MR_USAGE_RECV,
+ &cd->msg_mr))) {
+ librpma_td_verror(td, ret, "rpma_mr_reg");
+ return ret;
+ }
+
+ return librpma_fio_client_post_init(td);
+}
+
+static void client_cleanup(struct thread_data *td)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+ struct client_data *cd;
+ size_t flush_req_size;
+ size_t io_u_buf_off;
+ size_t send_offset;
+ void *send_ptr;
+ int ret;
+
+ if (ccd == NULL)
+ return;
+
+ cd = ccd->client_data;
+ if (cd == NULL) {
+ librpma_fio_client_cleanup(td);
+ return;
+ }
+
+ /*
+	 * Make sure all SEND completions are collected so that there are free
+	 * slots in the SQ for the last SEND message.
+	 *
+	 * Note: even if any operation fails, we can still send the termination
+	 * notice.
+ */
+ (void) librpma_fio_client_io_complete_all_sends(td);
+
+ /* prepare the last flush message and pack it to the send buffer */
+ flush_req_size = gpspm_flush_request__get_packed_size(&Flush_req_last);
+ if (flush_req_size > MAX_MSG_SIZE) {
+ log_err(
+			"Packed flush request size is bigger than available send buffer space (%zu > %d)\n",
+ flush_req_size, MAX_MSG_SIZE);
+ } else {
+ io_u_buf_off = IO_U_NEXT_BUF_OFF_CLIENT(cd);
+ send_offset = io_u_buf_off + SEND_OFFSET;
+ send_ptr = cd->io_us_msgs + send_offset;
+ (void) gpspm_flush_request__pack(&Flush_req_last, send_ptr);
+
+ /* send the flush message */
+ if ((ret = rpma_send(ccd->conn, cd->msg_mr, send_offset,
+ flush_req_size, RPMA_F_COMPLETION_ALWAYS,
+ NULL)))
+ librpma_td_verror(td, ret, "rpma_send");
+
+ ++ccd->op_send_posted;
+
+ /* Wait for the SEND to complete */
+ (void) librpma_fio_client_io_complete_all_sends(td);
+ }
+
+ /* deregister the messaging buffer memory */
+ if ((ret = rpma_mr_dereg(&cd->msg_mr)))
+ librpma_td_verror(td, ret, "rpma_mr_dereg");
+
+ free(ccd->client_data);
+
+ librpma_fio_client_cleanup(td);
+}
+
+static inline int client_io_flush(struct thread_data *td,
+ struct io_u *first_io_u, struct io_u *last_io_u,
+ unsigned long long int len)
+{
+ struct librpma_fio_client_data *ccd = td->io_ops_data;
+ struct client_data *cd = ccd->client_data;
+ size_t io_u_buf_off = IO_U_NEXT_BUF_OFF_CLIENT(cd);
+ size_t send_offset = io_u_buf_off + SEND_OFFSET;
+ size_t recv_offset = io_u_buf_off + RECV_OFFSET;
+ void *send_ptr = cd->io_us_msgs + send_offset;
+ void *recv_ptr = cd->io_us_msgs + recv_offset;
+ GPSPMFlushRequest flush_req = GPSPM_FLUSH_REQUEST__INIT;
+ size_t flush_req_size = 0;
+ int ret;
+
+ /* prepare a response buffer */
+ if ((ret = rpma_recv(ccd->conn, cd->msg_mr, recv_offset, MAX_MSG_SIZE,
+ recv_ptr))) {
+ librpma_td_verror(td, ret, "rpma_recv");
+ return -1;
+ }
+
+ /* prepare a flush message and pack it to a send buffer */
+ flush_req.offset = first_io_u->offset;
+ flush_req.length = len;
+ flush_req.op_context = last_io_u->index;
+ flush_req_size = gpspm_flush_request__get_packed_size(&flush_req);
+ if (flush_req_size > MAX_MSG_SIZE) {
+ log_err(
+ "Packed flush request size is bigger than available send buffer space (%"
+			PRIu64 " > %d)\n", flush_req_size, MAX_MSG_SIZE);
+ return -1;
+ }
+ (void) gpspm_flush_request__pack(&flush_req, send_ptr);
+
+ /* send the flush message */
+ if ((ret = rpma_send(ccd->conn, cd->msg_mr, send_offset, flush_req_size,
+ RPMA_F_COMPLETION_ALWAYS, NULL))) {
+ librpma_td_verror(td, ret, "rpma_send");
+ return -1;
+ }
+
+ ++ccd->op_send_posted;
+
+ return 0;
+}
+
+static int client_get_io_u_index(struct rpma_completion *cmpl,
+ unsigned int *io_u_index)
+{
+ GPSPMFlushResponse *flush_resp;
+
+ if (cmpl->op != RPMA_OP_RECV)
+ return 0;
+
+ /* unpack a response from the received buffer */
+ flush_resp = gpspm_flush_response__unpack(NULL,
+ cmpl->byte_len, cmpl->op_context);
+ if (flush_resp == NULL) {
+ log_err("Cannot unpack the flush response buffer\n");
+ return -1;
+ }
+
+ memcpy(io_u_index, &flush_resp->op_context, sizeof(*io_u_index));
+
+ gpspm_flush_response__free_unpacked(flush_resp, NULL);
+
+ return 1;
+}
+
+FIO_STATIC struct ioengine_ops ioengine_client = {
+ .name = "librpma_gpspm_client",
+ .version = FIO_IOOPS_VERSION,
+ .init = client_init,
+ .post_init = client_post_init,
+ .get_file_size = librpma_fio_client_get_file_size,
+ .open_file = librpma_fio_file_nop,
+ .queue = librpma_fio_client_queue,
+ .commit = librpma_fio_client_commit,
+ .getevents = librpma_fio_client_getevents,
+ .event = librpma_fio_client_event,
+ .errdetails = librpma_fio_client_errdetails,
+ .close_file = librpma_fio_file_nop,
+ .cleanup = client_cleanup,
+ .flags = FIO_DISKLESSIO,
+ .options = librpma_fio_options,
+ .option_struct_size = sizeof(struct librpma_fio_options_values),
+};
+
+/* server side implementation */
+
+#define IO_U_BUFF_OFF_SERVER(i) ((i) * IO_U_BUF_LEN)
+
+struct server_data {
+ /* aligned td->orig_buffer */
+ char *orig_buffer_aligned;
+
+ /* resources for messaging buffer from DRAM allocated by fio */
+ struct rpma_mr_local *msg_mr;
+
+ uint32_t msg_sqe_available; /* # of free SQ slots */
+
+ /* in-memory queues */
+ struct rpma_completion *msgs_queued;
+ uint32_t msg_queued_nr;
+};
+
+static int server_init(struct thread_data *td)
+{
+ struct librpma_fio_server_data *csd;
+ struct server_data *sd;
+ int ret = -1;
+
+ if ((ret = librpma_fio_server_init(td)))
+ return ret;
+
+ csd = td->io_ops_data;
+
+ /* allocate server's data */
+ sd = calloc(1, sizeof(*sd));
+ if (sd == NULL) {
+ td_verror(td, errno, "calloc");
+ goto err_server_cleanup;
+ }
+
+ /* allocate in-memory queue */
+ sd->msgs_queued = calloc(td->o.iodepth, sizeof(*sd->msgs_queued));
+ if (sd->msgs_queued == NULL) {
+ td_verror(td, errno, "calloc");
+ goto err_free_sd;
+ }
+
+ /*
+	 * Ensure that a single io_u buffer can store both SEND and RECV messages
+	 * and that the io_us buffer allocation is page-size-aligned, which is
+	 * required to register it for RDMA. User-provided values are
+	 * intentionally ignored.
+ */
+ td->o.max_bs[DDIR_READ] = IO_U_BUF_LEN;
+ td->o.mem_align = page_size;
+
+ csd->server_data = sd;
+
+ return 0;
+
+err_free_sd:
+ free(sd);
+
+err_server_cleanup:
+ librpma_fio_server_cleanup(td);
+
+ return -1;
+}
+
+static int server_post_init(struct thread_data *td)
+{
+ struct librpma_fio_server_data *csd = td->io_ops_data;
+ struct server_data *sd = csd->server_data;
+ size_t io_us_size;
+ size_t io_u_buflen;
+ int ret;
+
+ /*
+ * td->orig_buffer is not aligned. The engine requires aligned io_us
+	 * so FIO aligns the address up using the formula below.
+ */
+ sd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
+ td->o.mem_align;
+
+ /*
+ * XXX
+ * Each io_u message buffer contains recv and send messages.
+ * Aligning each of those buffers may potentially give
+ * some performance benefits.
+ */
+ io_u_buflen = td_max_bs(td);
+
+ /* check whether io_u buffer is big enough */
+ if (io_u_buflen < IO_U_BUF_LEN) {
+ log_err(
+ "blocksize too small to accommodate assumed maximal request/response pair size (%" PRIu64 " < %d)\n",
+ io_u_buflen, IO_U_BUF_LEN);
+ return -1;
+ }
+
+ /*
+	 * Besides the space actually consumed by io_us, td->orig_buffer_size
+	 * contains paddings which can be omitted for the memory registration.
+ */
+ io_us_size = (unsigned long long)io_u_buflen *
+ (unsigned long long)td->o.iodepth;
+
+ if ((ret = rpma_mr_reg(csd->peer, sd->orig_buffer_aligned, io_us_size,
+ RPMA_MR_USAGE_SEND | RPMA_MR_USAGE_RECV,
+ &sd->msg_mr))) {
+ librpma_td_verror(td, ret, "rpma_mr_reg");
+ return -1;
+ }
+
+ return 0;
+}
+
+static void server_cleanup(struct thread_data *td)
+{
+ struct librpma_fio_server_data *csd = td->io_ops_data;
+ struct server_data *sd;
+ int ret;
+
+ if (csd == NULL)
+ return;
+
+ sd = csd->server_data;
+
+ if (sd != NULL) {
+ /* rpma_mr_dereg(messaging buffer from DRAM) */
+ if ((ret = rpma_mr_dereg(&sd->msg_mr)))
+ librpma_td_verror(td, ret, "rpma_mr_dereg");
+
+ free(sd->msgs_queued);
+ free(sd);
+ }
+
+ librpma_fio_server_cleanup(td);
+}
+
+static int prepare_connection(struct thread_data *td,
+ struct rpma_conn_req *conn_req)
+{
+ struct librpma_fio_server_data *csd = td->io_ops_data;
+ struct server_data *sd = csd->server_data;
+ int ret;
+ int i;
+
+	/* prepare buffers for flush requests */
+ sd->msg_sqe_available = td->o.iodepth;
+ for (i = 0; i < td->o.iodepth; i++) {
+ size_t offset_recv_msg = IO_U_BUFF_OFF_SERVER(i) + RECV_OFFSET;
+ if ((ret = rpma_conn_req_recv(conn_req, sd->msg_mr,
+ offset_recv_msg, MAX_MSG_SIZE,
+ (const void *)(uintptr_t)i))) {
+ librpma_td_verror(td, ret, "rpma_conn_req_recv");
+ return ret;
+ }
+ }
+
+ return 0;
+}
+
+static int server_open_file(struct thread_data *td, struct fio_file *f)
+{
+ struct librpma_fio_server_data *csd = td->io_ops_data;
+ struct rpma_conn_cfg *cfg = NULL;
+ uint16_t max_msg_num = td->o.iodepth;
+ int ret;
+
+ csd->prepare_connection = prepare_connection;
+
+ /* create a connection configuration object */
+ if ((ret = rpma_conn_cfg_new(&cfg))) {
+ librpma_td_verror(td, ret, "rpma_conn_cfg_new");
+ return -1;
+ }
+
+ /*
+ * Calculate the required queue sizes where:
+ * - the send queue (SQ) has to be big enough to accommodate
+ * all possible flush requests (SENDs)
+ * - the receive queue (RQ) has to be big enough to accommodate
+ * all flush responses (RECVs)
+ * - the completion queue (CQ) has to be big enough to accommodate
+ * all success and error completions (sq_size + rq_size)
+ */
+ if ((ret = rpma_conn_cfg_set_sq_size(cfg, max_msg_num))) {
+ librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
+ goto err_cfg_delete;
+ }
+ if ((ret = rpma_conn_cfg_set_rq_size(cfg, max_msg_num))) {
+ librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
+ goto err_cfg_delete;
+ }
+ if ((ret = rpma_conn_cfg_set_cq_size(cfg, max_msg_num * 2))) {
+ librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
+ goto err_cfg_delete;
+ }
+
+ ret = librpma_fio_server_open_file(td, f, cfg);
+
+err_cfg_delete:
+ (void) rpma_conn_cfg_delete(&cfg);
+
+ return ret;
+}
+
+static int server_qe_process(struct thread_data *td,
+ struct rpma_completion *cmpl)
+{
+ struct librpma_fio_server_data *csd = td->io_ops_data;
+ struct server_data *sd = csd->server_data;
+ GPSPMFlushRequest *flush_req;
+ GPSPMFlushResponse flush_resp = GPSPM_FLUSH_RESPONSE__INIT;
+ size_t flush_resp_size = 0;
+ size_t send_buff_offset;
+ size_t recv_buff_offset;
+ size_t io_u_buff_offset;
+ void *send_buff_ptr;
+ void *recv_buff_ptr;
+ void *op_ptr;
+ int msg_index;
+ int ret;
+
+ /* calculate SEND/RECV pair parameters */
+ msg_index = (int)(uintptr_t)cmpl->op_context;
+ io_u_buff_offset = IO_U_BUFF_OFF_SERVER(msg_index);
+ send_buff_offset = io_u_buff_offset + SEND_OFFSET;
+ recv_buff_offset = io_u_buff_offset + RECV_OFFSET;
+ send_buff_ptr = sd->orig_buffer_aligned + send_buff_offset;
+ recv_buff_ptr = sd->orig_buffer_aligned + recv_buff_offset;
+
+ /* unpack a flush request from the received buffer */
+ flush_req = gpspm_flush_request__unpack(NULL, cmpl->byte_len,
+ recv_buff_ptr);
+ if (flush_req == NULL) {
+ log_err("cannot unpack the flush request buffer\n");
+ goto err_terminate;
+ }
+
+ if (IS_NOT_THE_LAST_MESSAGE(flush_req)) {
+ op_ptr = csd->ws_ptr + flush_req->offset;
+ pmem_persist(op_ptr, flush_req->length);
+ } else {
+ /*
+ * This is the last message - the client is done.
+ */
+ gpspm_flush_request__free_unpacked(flush_req, NULL);
+ td->done = true;
+ return 0;
+ }
+
+ /* initiate the next receive operation */
+ if ((ret = rpma_recv(csd->conn, sd->msg_mr, recv_buff_offset,
+ MAX_MSG_SIZE,
+ (const void *)(uintptr_t)msg_index))) {
+ librpma_td_verror(td, ret, "rpma_recv");
+ goto err_free_unpacked;
+ }
+
+ /* prepare a flush response and pack it to a send buffer */
+ flush_resp.op_context = flush_req->op_context;
+ flush_resp_size = gpspm_flush_response__get_packed_size(&flush_resp);
+ if (flush_resp_size > MAX_MSG_SIZE) {
+ log_err(
+ "Size of the packed flush response is bigger than the available space of the send buffer (%"
+			PRIu64 " > %i)\n", flush_resp_size, MAX_MSG_SIZE);
+ goto err_free_unpacked;
+ }
+
+ (void) gpspm_flush_response__pack(&flush_resp, send_buff_ptr);
+
+ /* send the flush response */
+ if ((ret = rpma_send(csd->conn, sd->msg_mr, send_buff_offset,
+ flush_resp_size, RPMA_F_COMPLETION_ALWAYS, NULL))) {
+ librpma_td_verror(td, ret, "rpma_send");
+ goto err_free_unpacked;
+ }
+ --sd->msg_sqe_available;
+
+ gpspm_flush_request__free_unpacked(flush_req, NULL);
+
+ return 0;
+
+err_free_unpacked:
+ gpspm_flush_request__free_unpacked(flush_req, NULL);
+
+err_terminate:
+ td->terminate = true;
+
+ return -1;
+}
+
+static inline int server_queue_process(struct thread_data *td)
+{
+ struct librpma_fio_server_data *csd = td->io_ops_data;
+ struct server_data *sd = csd->server_data;
+ int ret;
+ int i;
+
+ /* min(# of queue entries, # of SQ entries available) */
+ uint32_t qes_to_process = min(sd->msg_queued_nr, sd->msg_sqe_available);
+ if (qes_to_process == 0)
+ return 0;
+
+ /* process queued completions */
+ for (i = 0; i < qes_to_process; ++i) {
+ if ((ret = server_qe_process(td, &sd->msgs_queued[i])))
+ return ret;
+ }
+
+ /* progress the queue */
+ for (i = 0; i < sd->msg_queued_nr - qes_to_process; ++i) {
+ memcpy(&sd->msgs_queued[i],
+ &sd->msgs_queued[qes_to_process + i],
+ sizeof(sd->msgs_queued[i]));
+ }
+
+ sd->msg_queued_nr -= qes_to_process;
+
+ return 0;
+}
+
+static int server_cmpl_process(struct thread_data *td)
+{
+ struct librpma_fio_server_data *csd = td->io_ops_data;
+ struct server_data *sd = csd->server_data;
+ struct rpma_completion *cmpl = &sd->msgs_queued[sd->msg_queued_nr];
+ int ret;
+
+ ret = rpma_conn_completion_get(csd->conn, cmpl);
+ if (ret == RPMA_E_NO_COMPLETION) {
+ /* lack of completion is not an error */
+ return 0;
+ } else if (ret != 0) {
+ librpma_td_verror(td, ret, "rpma_conn_completion_get");
+ goto err_terminate;
+ }
+
+ /* validate the completion */
+ if (cmpl->op_status != IBV_WC_SUCCESS)
+ goto err_terminate;
+
+ if (cmpl->op == RPMA_OP_RECV)
+ ++sd->msg_queued_nr;
+ else if (cmpl->op == RPMA_OP_SEND)
+ ++sd->msg_sqe_available;
+
+ return 0;
+
+err_terminate:
+ td->terminate = true;
+
+ return -1;
+}
+
+static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
+{
+ do {
+ if (server_cmpl_process(td))
+ return FIO_Q_BUSY;
+
+ if (server_queue_process(td))
+ return FIO_Q_BUSY;
+
+ } while (!td->done);
+
+ return FIO_Q_COMPLETED;
+}
+
+FIO_STATIC struct ioengine_ops ioengine_server = {
+ .name = "librpma_gpspm_server",
+ .version = FIO_IOOPS_VERSION,
+ .init = server_init,
+ .post_init = server_post_init,
+ .open_file = server_open_file,
+ .close_file = librpma_fio_server_close_file,
+ .queue = server_queue,
+ .invalidate = librpma_fio_file_nop,
+ .cleanup = server_cleanup,
+ .flags = FIO_SYNCIO,
+ .options = librpma_fio_options,
+ .option_struct_size = sizeof(struct librpma_fio_options_values),
+};
+
+/* register both engines */
+
+static void fio_init fio_librpma_gpspm_register(void)
+{
+ register_ioengine(&ioengine_client);
+ register_ioengine(&ioengine_server);
+}
+
+static void fio_exit fio_librpma_gpspm_unregister(void)
+{
+ unregister_ioengine(&ioengine_client);
+ unregister_ioengine(&ioengine_server);
+}
--- /dev/null
+/*
+ * Copyright 2020, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation..
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+/* Generated by the protocol buffer compiler. DO NOT EDIT! */
+/* Generated from: librpma_gpspm_flush.proto */
+
+/* Do not generate deprecated warnings for self */
+#ifndef PROTOBUF_C__NO_DEPRECATED
+#define PROTOBUF_C__NO_DEPRECATED
+#endif
+
+#include "librpma_gpspm_flush.pb-c.h"
+void gpspm_flush_request__init
+ (GPSPMFlushRequest *message)
+{
+ static const GPSPMFlushRequest init_value = GPSPM_FLUSH_REQUEST__INIT;
+ *message = init_value;
+}
+size_t gpspm_flush_request__get_packed_size
+ (const GPSPMFlushRequest *message)
+{
+ assert(message->base.descriptor == &gpspm_flush_request__descriptor);
+ return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message));
+}
+size_t gpspm_flush_request__pack
+ (const GPSPMFlushRequest *message,
+ uint8_t *out)
+{
+ assert(message->base.descriptor == &gpspm_flush_request__descriptor);
+ return protobuf_c_message_pack ((const ProtobufCMessage*)message, out);
+}
+size_t gpspm_flush_request__pack_to_buffer
+ (const GPSPMFlushRequest *message,
+ ProtobufCBuffer *buffer)
+{
+ assert(message->base.descriptor == &gpspm_flush_request__descriptor);
+ return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer);
+}
+GPSPMFlushRequest *
+ gpspm_flush_request__unpack
+ (ProtobufCAllocator *allocator,
+ size_t len,
+ const uint8_t *data)
+{
+ return (GPSPMFlushRequest *)
+ protobuf_c_message_unpack (&gpspm_flush_request__descriptor,
+ allocator, len, data);
+}
+void gpspm_flush_request__free_unpacked
+ (GPSPMFlushRequest *message,
+ ProtobufCAllocator *allocator)
+{
+ if(!message)
+ return;
+ assert(message->base.descriptor == &gpspm_flush_request__descriptor);
+ protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
+}
+void gpspm_flush_response__init
+ (GPSPMFlushResponse *message)
+{
+ static const GPSPMFlushResponse init_value = GPSPM_FLUSH_RESPONSE__INIT;
+ *message = init_value;
+}
+size_t gpspm_flush_response__get_packed_size
+ (const GPSPMFlushResponse *message)
+{
+ assert(message->base.descriptor == &gpspm_flush_response__descriptor);
+ return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message));
+}
+size_t gpspm_flush_response__pack
+ (const GPSPMFlushResponse *message,
+ uint8_t *out)
+{
+ assert(message->base.descriptor == &gpspm_flush_response__descriptor);
+ return protobuf_c_message_pack ((const ProtobufCMessage*)message, out);
+}
+size_t gpspm_flush_response__pack_to_buffer
+ (const GPSPMFlushResponse *message,
+ ProtobufCBuffer *buffer)
+{
+ assert(message->base.descriptor == &gpspm_flush_response__descriptor);
+ return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer);
+}
+GPSPMFlushResponse *
+ gpspm_flush_response__unpack
+ (ProtobufCAllocator *allocator,
+ size_t len,
+ const uint8_t *data)
+{
+ return (GPSPMFlushResponse *)
+ protobuf_c_message_unpack (&gpspm_flush_response__descriptor,
+ allocator, len, data);
+}
+void gpspm_flush_response__free_unpacked
+ (GPSPMFlushResponse *message,
+ ProtobufCAllocator *allocator)
+{
+ if(!message)
+ return;
+ assert(message->base.descriptor == &gpspm_flush_response__descriptor);
+ protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
+}
+static const ProtobufCFieldDescriptor gpspm_flush_request__field_descriptors[3] =
+{
+ {
+ "offset",
+ 1,
+ PROTOBUF_C_LABEL_REQUIRED,
+ PROTOBUF_C_TYPE_FIXED64,
+ 0, /* quantifier_offset */
+ offsetof(GPSPMFlushRequest, offset),
+ NULL,
+ NULL,
+ 0, /* flags */
+ 0,NULL,NULL /* reserved1,reserved2, etc */
+ },
+ {
+ "length",
+ 2,
+ PROTOBUF_C_LABEL_REQUIRED,
+ PROTOBUF_C_TYPE_FIXED64,
+ 0, /* quantifier_offset */
+ offsetof(GPSPMFlushRequest, length),
+ NULL,
+ NULL,
+ 0, /* flags */
+ 0,NULL,NULL /* reserved1,reserved2, etc */
+ },
+ {
+ "op_context",
+ 3,
+ PROTOBUF_C_LABEL_REQUIRED,
+ PROTOBUF_C_TYPE_FIXED64,
+ 0, /* quantifier_offset */
+ offsetof(GPSPMFlushRequest, op_context),
+ NULL,
+ NULL,
+ 0, /* flags */
+ 0,NULL,NULL /* reserved1,reserved2, etc */
+ },
+};
+static const unsigned gpspm_flush_request__field_indices_by_name[] = {
+ 1, /* field[1] = length */
+ 0, /* field[0] = offset */
+ 2, /* field[2] = op_context */
+};
+static const ProtobufCIntRange gpspm_flush_request__number_ranges[1 + 1] =
+{
+ { 1, 0 },
+ { 0, 3 }
+};
+const ProtobufCMessageDescriptor gpspm_flush_request__descriptor =
+{
+ PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC,
+ "GPSPM_flush_request",
+ "GPSPMFlushRequest",
+ "GPSPMFlushRequest",
+ "",
+ sizeof(GPSPMFlushRequest),
+ 3,
+ gpspm_flush_request__field_descriptors,
+ gpspm_flush_request__field_indices_by_name,
+ 1, gpspm_flush_request__number_ranges,
+ (ProtobufCMessageInit) gpspm_flush_request__init,
+ NULL,NULL,NULL /* reserved[123] */
+};
+static const ProtobufCFieldDescriptor gpspm_flush_response__field_descriptors[1] =
+{
+ {
+ "op_context",
+ 1,
+ PROTOBUF_C_LABEL_REQUIRED,
+ PROTOBUF_C_TYPE_FIXED64,
+ 0, /* quantifier_offset */
+ offsetof(GPSPMFlushResponse, op_context),
+ NULL,
+ NULL,
+ 0, /* flags */
+ 0,NULL,NULL /* reserved1,reserved2, etc */
+ },
+};
+static const unsigned gpspm_flush_response__field_indices_by_name[] = {
+ 0, /* field[0] = op_context */
+};
+static const ProtobufCIntRange gpspm_flush_response__number_ranges[1 + 1] =
+{
+ { 1, 0 },
+ { 0, 1 }
+};
+const ProtobufCMessageDescriptor gpspm_flush_response__descriptor =
+{
+ PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC,
+ "GPSPM_flush_response",
+ "GPSPMFlushResponse",
+ "GPSPMFlushResponse",
+ "",
+ sizeof(GPSPMFlushResponse),
+ 1,
+ gpspm_flush_response__field_descriptors,
+ gpspm_flush_response__field_indices_by_name,
+ 1, gpspm_flush_response__number_ranges,
+ (ProtobufCMessageInit) gpspm_flush_response__init,
+ NULL,NULL,NULL /* reserved[123] */
+};
--- /dev/null
+/*
+ * Copyright 2020, Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License,
+ * version 2 as published by the Free Software Foundation..
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+/* Generated by the protocol buffer compiler. DO NOT EDIT! */
+/* Generated from: librpma_gpspm_flush.proto */
+
+#ifndef PROTOBUF_C_GPSPM_5fflush_2eproto__INCLUDED
+#define PROTOBUF_C_GPSPM_5fflush_2eproto__INCLUDED
+
+#include <protobuf-c/protobuf-c.h>
+
+PROTOBUF_C__BEGIN_DECLS
+
+#if PROTOBUF_C_VERSION_NUMBER < 1000000
+# error This file was generated by a newer version of protoc-c which is incompatible with your libprotobuf-c headers. Please update your headers.
+#elif 1003003 < PROTOBUF_C_MIN_COMPILER_VERSION
+# error This file was generated by an older version of protoc-c which is incompatible with your libprotobuf-c headers. Please regenerate this file with a newer version of protoc-c.
+#endif
+
+
+typedef struct _GPSPMFlushRequest GPSPMFlushRequest;
+typedef struct _GPSPMFlushResponse GPSPMFlushResponse;
+
+
+/* --- enums --- */
+
+
+/* --- messages --- */
+
+struct _GPSPMFlushRequest
+{
+ ProtobufCMessage base;
+ uint64_t offset;
+ uint64_t length;
+ uint64_t op_context;
+};
+#define GPSPM_FLUSH_REQUEST__INIT \
+ { PROTOBUF_C_MESSAGE_INIT (&gpspm_flush_request__descriptor) \
+ , 0, 0, 0 }
+
+
+struct _GPSPMFlushResponse
+{
+ ProtobufCMessage base;
+ uint64_t op_context;
+};
+#define GPSPM_FLUSH_RESPONSE__INIT \
+ { PROTOBUF_C_MESSAGE_INIT (&gpspm_flush_response__descriptor) \
+ , 0 }
+
+
+/* GPSPMFlushRequest methods */
+void gpspm_flush_request__init
+ (GPSPMFlushRequest *message);
+size_t gpspm_flush_request__get_packed_size
+ (const GPSPMFlushRequest *message);
+size_t gpspm_flush_request__pack
+ (const GPSPMFlushRequest *message,
+ uint8_t *out);
+size_t gpspm_flush_request__pack_to_buffer
+ (const GPSPMFlushRequest *message,
+ ProtobufCBuffer *buffer);
+GPSPMFlushRequest *
+ gpspm_flush_request__unpack
+ (ProtobufCAllocator *allocator,
+ size_t len,
+ const uint8_t *data);
+void gpspm_flush_request__free_unpacked
+ (GPSPMFlushRequest *message,
+ ProtobufCAllocator *allocator);
+/* GPSPMFlushResponse methods */
+void gpspm_flush_response__init
+ (GPSPMFlushResponse *message);
+size_t gpspm_flush_response__get_packed_size
+ (const GPSPMFlushResponse *message);
+size_t gpspm_flush_response__pack
+ (const GPSPMFlushResponse *message,
+ uint8_t *out);
+size_t gpspm_flush_response__pack_to_buffer
+ (const GPSPMFlushResponse *message,
+ ProtobufCBuffer *buffer);
+GPSPMFlushResponse *
+ gpspm_flush_response__unpack
+ (ProtobufCAllocator *allocator,
+ size_t len,
+ const uint8_t *data);
+void gpspm_flush_response__free_unpacked
+ (GPSPMFlushResponse *message,
+ ProtobufCAllocator *allocator);
+/* --- per-message closures --- */
+
+typedef void (*GPSPMFlushRequest_Closure)
+ (const GPSPMFlushRequest *message,
+ void *closure_data);
+typedef void (*GPSPMFlushResponse_Closure)
+ (const GPSPMFlushResponse *message,
+ void *closure_data);
+
+/* --- services --- */
+
+
+/* --- descriptors --- */
+
+extern const ProtobufCMessageDescriptor gpspm_flush_request__descriptor;
+extern const ProtobufCMessageDescriptor gpspm_flush_response__descriptor;
+
+PROTOBUF_C__END_DECLS
+
+
+#endif /* PROTOBUF_C_GPSPM_5fflush_2eproto__INCLUDED */
--- /dev/null
+syntax = "proto2";
+
+message GPSPM_flush_request {
+ /* an offset of a region to be flushed within its memory registration */
+ required fixed64 offset = 1;
+ /* a length of a region to be flushed */
+ required fixed64 length = 2;
+ /* a user-defined operation context */
+ required fixed64 op_context = 3;
+}
+
+message GPSPM_flush_response {
+ /* the operation context of a completed request */
+ required fixed64 op_context = 1;
+}
char *pool_name;
char *client_name;
int busy_poll;
+ int touch_objects;
};
static struct fio_option options[] = {
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_RBD,
},
+ {
+ .name = "touch_objects",
+ .lname = "touch objects on start",
+ .type = FIO_OPT_BOOL,
+ .help = "Touch (create) objects on start",
+ .off1 = offsetof(struct rados_options, touch_objects),
+ .def = "1",
+ .category = FIO_OPT_C_ENGINE,
+ .group = FIO_OPT_G_RBD,
+ },
{
.name = NULL,
},
for (i = 0; i < td->o.nr_files; i++) {
f = td->files[i];
f->real_file_size = file_size;
- r = rados_write(rados->io_ctx, f->file_name, "", 0, 0);
- if (r < 0) {
- goto failed_obj_create;
+ if (o->touch_objects) {
+ r = rados_write(rados->io_ctx, f->file_name, "", 0, 0);
+ if (r < 0) {
+ goto failed_obj_create;
+ }
}
}
return 0;
else
this_rate = 0;
- if (unified_rw_rep) {
+ if (unified_rw_rep == UNIFIED_MIXED) {
rate[i] = 0;
rate[0] += this_rate;
} else
else
this_iops = 0;
- if (unified_rw_rep) {
+ if (unified_rw_rep == UNIFIED_MIXED) {
iops[i] = 0;
iops[0] += this_iops;
} else
--- /dev/null
+[global]
+ioengine=dfs
+pool=${POOL}
+cont=${CONT}
+filename_format=fio-test.$jobnum
+
+cpus_allowed_policy=split
+group_reporting=1
+time_based=0
+percentile_list=99.0:99.9:99.99:99.999:99.9999:100
+disable_slat=1
+disable_clat=1
+
+bs=1M
+size=100G
+iodepth=16
+numjobs=16
+
+[daos-seqwrite]
+rw=write
+stonewall
+
+[daos-seqread]
+rw=read
+stonewall
+
+[daos-randwrite]
+rw=randwrite
+stonewall
+
+[daos-randread]
+rw=randread
+stonewall
--- /dev/null
+# Example filedelete job
+
+# The 'filedelete' engine only does 'unlink(filename)'; the files are never open()ed.
+# 'filesize' must be set so that the files are created at the setup stage.
+# 'unlink' should stay at 0, since the files are deleted during the measurement.
+# Options that disable completion latency output, such as 'disable_clat' and 'gtod_reduce', must not be set.
+[global]
+ioengine=filedelete
+filesize=4k
+nrfiles=200
+unlink=0
+
+[t0]
+[t1]
+[t2]
+[t3]
+[t4]
+[t5]
--- /dev/null
+# Example of the librpma_apm_client job
+
+[global]
+ioengine=librpma_apm_client
+create_serialize=0 # (required) forces specific initiation sequence
+serverip=[serverip] # IP address the server is listening on
+port=7204 # port(s) the server will listen on, <port; port + numjobs - 1> will be used
+thread
+
+# The client will get a remote memory region description after establishing
+# a connection.
+
+[client]
+numjobs=1 # number of parallel connections
+group_reporting=1
+sync=1 # 1 is the best for latency measurements, 0 for bandwidth
+iodepth=2 # total number of ious
+iodepth_batch_submit=1 # number of ious to be submitted at once
+rw=write # read/write/randread/randwrite/readwrite/rw
+rwmixread=70 # % of a mixed workload that should be reads
+blocksize=4KiB
+ramp_time=15s # gives some time to stabilize the workload
+time_based
+runtime=60s # run the workload for the specified period of time
--- /dev/null
+# Example of the librpma_apm_server job
+
+[global]
+ioengine=librpma_apm_server
+create_serialize=0 # (required) forces specific initiation sequence
+kb_base=1000 # turn on the straight units handling (non-compatibility mode)
+serverip=[serverip] # IP address to listen on
+port=7204 # port(s) the server jobs will listen on, ports <port; port + numjobs - 1> will be used
+thread
+
+# The server side spawns one thread for each expected connection from
+# the client-side, opens and registers the range dedicated for this thread
+# (a workspace) from the provided memory.
+# Each of the server threads accepts a connection on the dedicated port
+# (different for each and every working thread), waits for it to end,
+# and then closes itself.
+
+[server]
+# set to 1 (true) ONLY when Direct Write to PMem from the remote host is possible
+# (https://pmem.io/rpma/documentation/basic-direct-write-to-pmem.html)
+direct_write_to_pmem=0
+
+numjobs=1 # number of expected incoming connections
+size=100MiB # size of workspace for a single connection
+filename=malloc # device dax or an existing fsdax file or "malloc" for allocation from DRAM
+# filename=/dev/dax1.0
--- /dev/null
+# Example of the librpma_gpspm_client job
+
+[global]
+ioengine=librpma_gpspm_client
+create_serialize=0 # (required) forces specific initiation sequence
+serverip=[serverip] # IP address the server is listening on
+port=7204 # port(s) the server will listen on, <port; port + numjobs - 1> will be used
+thread
+
+# The client will get a remote memory region description after establishing
+# a connection.
+
+[client]
+numjobs=1 # number of parallel connections
+group_reporting=1
+sync=1 # 1 is the best for latency measurements, 0 for bandwidth
+iodepth=2 # total number of ious
+iodepth_batch_submit=1 # number of ious to be submitted at once
+rw=write # write/randwrite
+blocksize=4KiB
+ramp_time=15s # gives some time to stabilize the workload
+time_based
+runtime=60s # run the workload for the specified period of time
--- /dev/null
+# Example of the librpma_gpspm_server job
+
+[global]
+ioengine=librpma_gpspm_server
+create_serialize=0 # (required) forces specific initiation sequence
+kb_base=1000 # turn on the straight units handling (non-compatibility mode)
+serverip=[serverip] # IP address to listen on
+port=7204 # port(s) the server jobs will listen on, ports <port; port + numjobs - 1> will be used
+thread
+
+# The server side spawns one thread for each expected connection from
+# the client-side, opens and registers the range dedicated for this thread
+# (a workspace) from the provided memory.
+# Each of the server threads accepts a connection on the dedicated port
+# (different for each and every working thread), accepts and executes flush
+# requests, and sends back a flush response for each of the requests.
+# When the client is done it sends the termination notice to the server's thread.
+
+[server]
+# set to 1 (true) ONLY when Direct Write to PMem from the remote host is possible
+# (https://pmem.io/rpma/documentation/basic-direct-write-to-pmem.html)
+direct_write_to_pmem=0
+numjobs=1 # number of expected incoming connections
+iodepth=2 # number of parallel GPSPM requests
+size=100MiB # size of workspace for a single connection
+filename=malloc # device dax or an existing fsdax file or "malloc" for allocation from DRAM
+# filename=/dev/dax1.0
+
+# The client will terminate the server when the client ends its job.
+time_based
+runtime=365d
if (o->read_iolog_file)
goto done;
+ if (td->o.zone_mode == ZONE_MODE_ZBD) {
+ err = zbd_init_files(td);
+ if (err)
+ goto err_out;
+ }
+ zbd_recalc_options_with_zone_granularity(td);
+
/*
* check sizes. if the files/devices do not exist and the size
* isn't passed to fio, abort.
}
done:
- if (o->create_only)
- td->done = 1;
-
- td_restore_runstate(td, old_state);
-
if (td->o.zone_mode == ZONE_MODE_ZBD) {
err = zbd_setup_files(td);
if (err)
goto err_out;
}
+
+ if (o->create_only)
+ td->done = 1;
+
+ td_restore_runstate(td, old_state);
+
return 0;
err_offset:
.PD
.RE
.P
+The `z' suffix specifies that the value is measured in zones.
+The value is recalculated once the block device's zone size becomes known.
+.P
If the option accepts an upper and lower range, use a colon ':' or
minus '\-' to separate such values. See \fIirange\fR parameter type.
If the lower value specified happens to be larger than the upper value
.TP
.BI ioscheduler \fR=\fPstr
Attempt to switch the device hosting the file to the specified I/O scheduler
-before running.
+before running. If the file is a pipe, a character device, or if the device
+hosting the file cannot be determined, this option is ignored.
.TP
.BI create_serialize \fR=\fPbool
If true, serialize the file creation for the jobs. This may be handy to
block device, the zone capacity is obtained from the device information and this
option is ignored.
.TP
-.BI zoneskip \fR=\fPint
+.BI zoneskip \fR=\fPint[z]
For \fBzonemode\fR=strided, the number of bytes to skip after \fBzonesize\fR
bytes of data have been transferred.
times before generating a new offset.
.RE
.TP
-.BI unified_rw_reporting \fR=\fPbool
+.BI unified_rw_reporting \fR=\fPstr
Fio normally reports statistics on a per data direction basis, meaning that
-reads, writes, and trims are accounted and reported separately. If this
-option is set fio sums the results and report them as "mixed" instead.
+reads, writes, and trims are accounted and reported separately. This option
+determines whether fio reports the results normally, summed together, or as
+both options.
+Accepted values are:
+.RS
+.TP
+.B none
+Normal statistics reporting.
+.TP
+.B mixed
+Statistics are summed per data direction and reported together.
+.TP
+.B both
+Statistics are reported normally, followed by the mixed statistics.
+.TP
+.B 0
+Backward-compatible alias for \fBnone\fR.
+.TP
+.B 1
+Backward-compatible alias for \fBmixed\fR.
+.TP
+.B 2
+Alias for \fBboth\fR.
+.RE
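+For example, \fBunified_rw_reporting\fR=both on a \fBrw\fR=randrw job reports
+the read and write statistics separately, followed by an additional "mixed"
+set of statistics covering both data directions combined.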
.TP
.BI randrepeat \fR=\fPbool
Seed the random number generator used for random I/O patterns in a
should be associated with them.
.RE
.TP
-.BI offset \fR=\fPint
+.BI offset \fR=\fPint[%|z]
Start I/O at the provided offset in the file, given as either a fixed size in
bytes or a percentage. If a percentage is given, the generated offset will be
aligned to the minimum \fBblocksize\fR or to the value of \fBoffset_align\fR if
is aligned upwards to this value. Defaults to 0 meaning that a percentage
offset is aligned to the minimum block size.
.TP
-.BI offset_increment \fR=\fPint
+.BI offset_increment \fR=\fPint[%|z]
If this is provided, then the real offset becomes `\fBoffset\fR + \fBoffset_increment\fR
* thread_number', where the thread number is a counter that starts at 0 and
is incremented for each sub-job (i.e. when \fBnumjobs\fR option is
simulate a smaller amount of memory. The amount specified is per worker.
.SS "I/O size"
.TP
-.BI size \fR=\fPint
+.BI size \fR=\fPint[%|z]
The total size of file I/O for each thread of this job. Fio will run until
this many bytes has been transferred, unless runtime is limited by other options
(such as \fBruntime\fR, for instance, or increased/decreased by \fBio_size\fR).
Can be combined with \fBoffset\fR to constrain the start and end range
that I/O will be done within.
.TP
-.BI io_size \fR=\fPint "\fR,\fB io_limit" \fR=\fPint
+.BI io_size \fR=\fPint[%|z] "\fR,\fB io_limit" \fR=\fPint[%|z]
Normally fio operates within the region set by \fBsize\fR, which means
that the \fBsize\fR option sets both the region and size of I/O to be
performed. Sometimes that is not what you want. With this option, it is
and 'nrfiles', so that files will be created.
This engine is to measure file lookup and meta data access.
.TP
+.B filedelete
+Simply delete the files by unlink() and do no I/O to them. You need to set 'filesize'
+and 'nrfiles', so that the files will be created.
+This engine is used to measure file deletion.
+.TP
.B libpmem
Read and write using mmap I/O to a file on a filesystem
mounted with DAX on a persistent memory device through the PMDK
I/O without transferring buffers between user-space and the kernel,
unless \fBverify\fR is set or \fBcuda_io\fR is \fBposix\fR. \fBiomem\fR must
not be \fBcudamalloc\fR. This ioengine defines engine specific options.
+.TP
+.B dfs
+I/O engine supporting asynchronous read and write operations to the DAOS File
+System (DFS) via libdfs.
.SS "I/O engine specific parameters"
In addition, there are some parameters which are only valid when a specific
\fBioengine\fR is in use. These are used identically to normal parameters,
this will be the starting port number since fio will use a range of
ports.
.TP
-.BI (rdma)port
+.BI (rdma,librpma_*)port
The port to use for RDMA-CM communication. This should be the same
value on the client and the server side.
.TP
If the job is a TCP listener or UDP reader, the hostname is not used
and must be omitted unless it is a valid UDP multicast address.
.TP
+.BI (librpma_*)serverip \fR=\fPstr
+The IP address to be used for RDMA-CM based I/O.
+.TP
+.BI (librpma_*_server)direct_write_to_pmem \fR=\fPbool
+Set to 1 only when Direct Write to PMem from the remote host is possible. Otherwise, set to 0.
+.TP
.BI (netsplice,net)interface \fR=\fPstr
The IP address of the network interface used to send or receive UDP
multicast.
Poll store instead of waiting for completion. Usually this provides better
throughput at cost of higher(up to 100%) CPU utilization.
.TP
+.BI (rados)touch_objects \fR=\fPbool
+During initialization, touch (create if they do not exist) all objects (files).
+Touching all objects affects ceph caches and likely impacts test results.
+Enabled by default.
+.TP
.BI (http)http_host \fR=\fPstr
Hostname to connect to. For S3, this could be the bucket name. Default
is \fBlocalhost\fR
the use of cudaMemcpy.
.RE
.RE
+.TP
+.BI (dfs)pool
+Specify the UUID of the DAOS pool to connect to.
+.TP
+.BI (dfs)cont
+Specify the UUID of the DAOS container to open.
+.TP
+.BI (dfs)chunk_size
+Specify a different chunk size (in bytes) for the dfs file.
+Use DAOS container's chunk size by default.
+.TP
+.BI (dfs)object_class
+Specify a different object class for the dfs file.
+Use DAOS container's object class by default.
.SS "I/O depth"
.TP
.BI iodepth \fR=\fPint
queue depth that meets \fBlatency_target\fR and exit. If true, fio will continue
running and try to meet \fBlatency_target\fR by adjusting queue depth.
.TP
-.BI max_latency \fR=\fPtime
+.BI max_latency \fR=\fPtime[,time][,time]
If set, fio will exit the job with an ETIMEDOUT error if it exceeds this
maximum latency. When the unit is omitted, the value is interpreted in
-microseconds.
+microseconds. Comma-separated values may be specified for reads, writes,
+and trims as described in \fBblocksize\fR.
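+For example, \fBmax_latency\fR=250000,500000 applies a 250 msec limit to reads
+and a 500 msec limit to writes.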
.TP
.BI rate_cycle \fR=\fPint
Average bandwidth for \fBrate\fR and \fBrate_min\fR over this number
int fio_monotonic_clocktest(int debug)
{
struct clock_thread *cthreads;
- unsigned int nr_cpus = cpus_online();
+ unsigned int seen_cpus, nr_cpus = cpus_online();
struct clock_entry *entries;
unsigned long nr_entries, tentries, failed = 0;
struct clock_entry *prev, *this;
uint32_t seq = 0;
unsigned int i;
+ os_cpu_mask_t mask;
+
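+	/*
+	 * Restrict the clock test to the CPUs in this thread's affinity mask;
+	 * when the affinity cannot be queried, fall back to testing all
+	 * online CPUs.
+	 */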
+#ifdef CONFIG_PTHREAD_GETAFFINITY
+ fio_get_thread_affinity(mask);
+#else
+ memset(&mask, 0, sizeof(mask));
+ for (i = 0; i < nr_cpus; i++)
+ fio_cpu_set(&mask, i);
+#endif
if (debug) {
log_info("cs: reliable_tsc: %s\n", tsc_reliable ? "yes" : "no");
if (debug)
log_info("cs: Testing %u CPUs\n", nr_cpus);
+ seen_cpus = 0;
for (i = 0; i < nr_cpus; i++) {
struct clock_thread *t = &cthreads[i];
+ if (!fio_cpu_isset(&mask, i))
+ continue;
t->cpu = i;
t->debug = debug;
t->seq = &seq;
t->nr_entries = nr_entries;
- t->entries = &entries[i * nr_entries];
+ t->entries = &entries[seen_cpus * nr_entries];
__fio_sem_init(&t->lock, FIO_SEM_LOCKED);
if (pthread_create(&t->thread, NULL, clock_thread_fn, t)) {
failed++;
nr_cpus = i;
break;
}
+ seen_cpus++;
}
for (i = 0; i < nr_cpus; i++) {
struct clock_thread *t = &cthreads[i];
+ if (!fio_cpu_isset(&mask, i))
+ continue;
fio_sem_up(&t->lock);
}
struct clock_thread *t = &cthreads[i];
void *ret;
+ if (!fio_cpu_isset(&mask, i))
+ continue;
pthread_join(t->thread, &ret);
if (ret)
failed++;
goto err;
}
+ tentries = nr_entries * seen_cpus;
qsort(entries, tentries, sizeof(struct clock_entry), clock_cmp);
/* silence silly gcc */
}
}
-static void fio_dump_options_free(struct thread_data *td)
-{
- while (!flist_empty(&td->opt_list)) {
- struct print_option *p;
-
- p = flist_first_entry(&td->opt_list, struct print_option, list);
- flist_del_init(&p->list);
- free(p->name);
- free(p->value);
- free(p);
- }
-}
-
static void copy_opt_list(struct thread_data *dst, struct thread_data *src)
{
struct flist_head *entry;
ret |= 1;
}
+ if (o->zone_mode == ZONE_MODE_ZBD && !o->create_serialize) {
+ log_err("fio: --zonemode=zbd and --create_serialize=0 are not compatible.\n");
+ ret |= 1;
+ }
+
if (o->zone_mode == ZONE_MODE_STRIDED && !o->zone_size) {
log_err("fio: --zonesize must be specified when using --zonemode=strided.\n");
ret |= 1;
/*
* Fix these up to be nsec internally
*/
- o->max_latency *= 1000ULL;
+ for_each_rw_ddir(ddir)
+ o->max_latency[ddir] *= 1000ULL;
+
o->latency_target *= 1000ULL;
return ret;
return 0;
}
-static void lat_fatal(struct thread_data *td, struct io_completion_data *icd,
+static void lat_fatal(struct thread_data *td, struct io_u *io_u, struct io_completion_data *icd,
unsigned long long tnsec, unsigned long long max_nsec)
{
- if (!td->error)
- log_err("fio: latency of %llu nsec exceeds specified max (%llu nsec)\n", tnsec, max_nsec);
+ if (!td->error) {
+ log_err("fio: latency of %llu nsec exceeds specified max (%llu nsec): %s %s %llu %llu\n",
+ tnsec, max_nsec,
+ io_u->file->file_name,
+ io_ddir_name(io_u->ddir),
+ io_u->offset, io_u->buflen);
+ }
td_verror(td, ETIMEDOUT, "max latency exceeded");
icd->error = ETIMEDOUT;
}
icd->error = ops->io_u_lat(td, tnsec);
}
- if (td->o.max_latency && tnsec > td->o.max_latency)
- lat_fatal(td, icd, tnsec, td->o.max_latency);
- if (td->o.latency_target && tnsec > td->o.latency_target) {
- if (lat_target_failed(td))
- lat_fatal(td, icd, tnsec, td->o.latency_target);
+ if (ddir_rw(idx)) {
+ if (td->o.max_latency[idx] && tnsec > td->o.max_latency[idx])
+ lat_fatal(td, io_u, icd, tnsec, td->o.max_latency[idx]);
+ if (td->o.latency_target && tnsec > td->o.latency_target) {
+ if (lat_target_failed(td))
+ lat_fatal(td, io_u, icd, tnsec, td->o.latency_target);
+ }
}
}
/*
* open iolog, check version, and call appropriate parser
*/
-static bool init_iolog_read(struct thread_data *td)
+static bool init_iolog_read(struct thread_data *td, char *fname)
{
- char buffer[256], *p, *fname;
+ char buffer[256], *p;
FILE *f = NULL;
- fname = get_name_by_idx(td->o.read_iolog_file, td->subjob_number);
dprint(FD_IO, "iolog: name=%s\n", fname);
if (is_socket(fname)) {
if (td->o.read_iolog_file) {
int need_swap;
+		char *fname = get_name_by_idx(td->o.read_iolog_file, td->subjob_number);
/*
* Check if it's a blktrace file and load that if possible.
* Otherwise assume it's a normal log file and load that.
*/
- if (is_blktrace(td->o.read_iolog_file, &need_swap))
- ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
+ if (is_blktrace(fname, &need_swap))
+ ret = load_blktrace(td, fname, need_swap);
else
- ret = init_iolog_read(td);
+ ret = init_iolog_read(td, fname);
} else if (td->o.write_iolog_file)
ret = init_iolog_write(td);
else
.name = "RDMA I/O engine", /* rdma */
.mask = FIO_OPT_G_RDMA,
},
+ {
+ .name = "librpma I/O engines", /* librpma_apm && librpma_gpspm */
+ .mask = FIO_OPT_G_LIBRPMA,
+ },
{
.name = "libaio I/O engine", /* libaio */
.mask = FIO_OPT_G_LIBAIO,
.name = "libcufile I/O engine", /* libcufile */
.mask = FIO_OPT_G_LIBCUFILE,
},
+ {
+ .name = "DAOS File System (dfs) I/O engine", /* dfs */
+ .mask = FIO_OPT_G_DFS,
+ },
{
.name = NULL,
},
__FIO_OPT_G_E4DEFRAG,
__FIO_OPT_G_NETIO,
__FIO_OPT_G_RDMA,
+ __FIO_OPT_G_LIBRPMA,
__FIO_OPT_G_LIBAIO,
__FIO_OPT_G_ACT,
__FIO_OPT_G_LATPROF,
__FIO_OPT_G_FILESTAT,
__FIO_OPT_G_NR,
__FIO_OPT_G_LIBCUFILE,
+ __FIO_OPT_G_DFS,
FIO_OPT_G_RATE = (1ULL << __FIO_OPT_G_RATE),
FIO_OPT_G_ZONE = (1ULL << __FIO_OPT_G_ZONE),
FIO_OPT_G_E4DEFRAG = (1ULL << __FIO_OPT_G_E4DEFRAG),
FIO_OPT_G_NETIO = (1ULL << __FIO_OPT_G_NETIO),
FIO_OPT_G_RDMA = (1ULL << __FIO_OPT_G_RDMA),
+ FIO_OPT_G_LIBRPMA = (1ULL << __FIO_OPT_G_LIBRPMA),
FIO_OPT_G_LIBAIO = (1ULL << __FIO_OPT_G_LIBAIO),
FIO_OPT_G_ACT = (1ULL << __FIO_OPT_G_ACT),
FIO_OPT_G_LATPROF = (1ULL << __FIO_OPT_G_LATPROF),
FIO_OPT_G_IOURING = (1ULL << __FIO_OPT_G_IOURING),
FIO_OPT_G_FILESTAT = (1ULL << __FIO_OPT_G_FILESTAT),
FIO_OPT_G_LIBCUFILE = (1ULL << __FIO_OPT_G_LIBCUFILE),
+ FIO_OPT_G_DFS = (1ULL << __FIO_OPT_G_DFS),
};
extern const struct opt_group *opt_group_from_mask(uint64_t *mask);
if (parse_is_percent(v)) {
td->o.start_offset = 0;
td->o.start_offset_percent = -1ULL - v;
+ td->o.start_offset_nz = 0;
dprint(FD_PARSE, "SET start_offset_percent %d\n",
td->o.start_offset_percent);
+ } else if (parse_is_zone(v)) {
+ td->o.start_offset = 0;
+ td->o.start_offset_percent = 0;
+ td->o.start_offset_nz = v - ZONE_BASE_VAL;
} else
td->o.start_offset = v;
if (parse_is_percent(v)) {
td->o.offset_increment = 0;
td->o.offset_increment_percent = -1ULL - v;
+ td->o.offset_increment_nz = 0;
dprint(FD_PARSE, "SET offset_increment_percent %d\n",
td->o.offset_increment_percent);
+ } else if (parse_is_zone(v)) {
+ td->o.offset_increment = 0;
+ td->o.offset_increment_percent = 0;
+ td->o.offset_increment_nz = v - ZONE_BASE_VAL;
} else
td->o.offset_increment = v;
td->o.size_percent = -1ULL - v;
dprint(FD_PARSE, "SET size_percent %d\n",
td->o.size_percent);
+ } else if (parse_is_zone(v)) {
+ td->o.size = 0;
+ td->o.size_percent = 0;
+ td->o.size_nz = v - ZONE_BASE_VAL;
} else
td->o.size = v;
}
dprint(FD_PARSE, "SET io_size_percent %d\n",
td->o.io_size_percent);
+ } else if (parse_is_zone(v)) {
+ td->o.io_size = 0;
+ td->o.io_size_percent = 0;
+ td->o.io_size_nz = v - ZONE_BASE_VAL;
} else
td->o.io_size = v;
return 0;
}
+static int str_zoneskip_cb(void *data, unsigned long long *__val)
+{
+ struct thread_data *td = cb_data_to_td(data);
+ unsigned long long v = *__val;
+
+ if (parse_is_zone(v)) {
+ td->o.zone_skip = 0;
+ td->o.zone_skip_nz = v - ZONE_BASE_VAL;
+ } else
+ td->o.zone_skip = v;
+
+ return 0;
+}
+
static int str_write_bw_log_cb(void *data, const char *str)
{
struct thread_data *td = cb_data_to_td(data);
.help = "RDMA IO engine",
},
#endif
+#ifdef CONFIG_LIBRPMA_APM
+ { .ival = "librpma_apm",
+ .help = "librpma IO engine in APM mode",
+ },
+#endif
+#ifdef CONFIG_LIBRPMA_GPSPM
+ { .ival = "librpma_gpspm",
+ .help = "librpma IO engine in GPSPM mode",
+ },
+#endif
#ifdef CONFIG_LINUX_EXT4_MOVE_EXTENT
{ .ival = "e4defrag",
.help = "ext4 defrag engine",
{ .ival = "nbd",
.help = "Network Block Device (NBD) IO engine"
},
+#ifdef CONFIG_DFS
+ { .ival = "dfs",
+ .help = "DAOS File System (dfs) IO engine",
+ },
+#endif
},
},
{
{
.name = "size",
.lname = "Size",
- .type = FIO_OPT_STR_VAL,
+ .type = FIO_OPT_STR_VAL_ZONE,
.cb = str_size_cb,
.off1 = offsetof(struct thread_options, size),
.help = "Total size of device or files",
- .interval = 1024 * 1024,
.category = FIO_OPT_C_IO,
.group = FIO_OPT_G_INVALID,
},
.name = "io_size",
.alias = "io_limit",
.lname = "IO Size",
- .type = FIO_OPT_STR_VAL,
+ .type = FIO_OPT_STR_VAL_ZONE,
.cb = str_io_size_cb,
.off1 = offsetof(struct thread_options, io_size),
.help = "Total size of I/O to be performed",
- .interval = 1024 * 1024,
.category = FIO_OPT_C_IO,
.group = FIO_OPT_G_INVALID,
},
.name = "offset",
.lname = "IO offset",
.alias = "fileoffset",
- .type = FIO_OPT_STR_VAL,
+ .type = FIO_OPT_STR_VAL_ZONE,
.cb = str_offset_cb,
.off1 = offsetof(struct thread_options, start_offset),
.help = "Start IO from this offset",
.def = "0",
- .interval = 1024 * 1024,
.category = FIO_OPT_C_IO,
.group = FIO_OPT_G_INVALID,
},
{
.name = "offset_increment",
.lname = "IO offset increment",
- .type = FIO_OPT_STR_VAL,
+ .type = FIO_OPT_STR_VAL_ZONE,
.cb = str_offset_increment_cb,
.off1 = offsetof(struct thread_options, offset_increment),
.help = "What is the increment from one offset to the next",
.parent = "offset",
.hide = 1,
.def = "0",
- .interval = 1024 * 1024,
.category = FIO_OPT_C_IO,
.group = FIO_OPT_G_INVALID,
},
{
.name = "zoneskip",
.lname = "Zone skip",
- .type = FIO_OPT_STR_VAL,
+ .type = FIO_OPT_STR_VAL_ZONE,
+ .cb = str_zoneskip_cb,
.off1 = offsetof(struct thread_options, zone_skip),
.help = "Space between IO zones",
.def = "0",
- .interval = 1024 * 1024,
.category = FIO_OPT_C_IO,
.group = FIO_OPT_G_ZONE,
},
{
.name = "max_latency",
.lname = "Max Latency (usec)",
- .type = FIO_OPT_STR_VAL_TIME,
- .off1 = offsetof(struct thread_options, max_latency),
+ .type = FIO_OPT_ULL,
+ .off1 = offsetof(struct thread_options, max_latency[DDIR_READ]),
+ .off2 = offsetof(struct thread_options, max_latency[DDIR_WRITE]),
+ .off3 = offsetof(struct thread_options, max_latency[DDIR_TRIM]),
.help = "Maximum tolerated IO latency (usec)",
.is_time = 1,
.category = FIO_OPT_C_IO,
{
.name = "unified_rw_reporting",
.lname = "Unified RW Reporting",
- .type = FIO_OPT_BOOL,
+ .type = FIO_OPT_STR,
.off1 = offsetof(struct thread_options, unified_rw_rep),
.help = "Unify reporting across data direction",
- .def = "0",
+ .def = "none",
.category = FIO_OPT_C_GENERAL,
.group = FIO_OPT_G_INVALID,
+ .posval = {
+ { .ival = "none",
+ .oval = UNIFIED_SPLIT,
+ .help = "Normal statistics reporting",
+ },
+ { .ival = "mixed",
+ .oval = UNIFIED_MIXED,
+ .help = "Statistics are summed per data direction and reported together",
+ },
+ { .ival = "both",
+ .oval = UNIFIED_BOTH,
+ .help = "Statistics are reported normally, followed by the mixed statistics"
+ },
+ /* Compatibility with former boolean values */
+ { .ival = "0",
+ .oval = UNIFIED_SPLIT,
+ .help = "Alias for 'none'",
+ },
+ { .ival = "1",
+ .oval = UNIFIED_MIXED,
+ .help = "Alias for 'mixed'",
+ },
+ { .ival = "2",
+ .oval = UNIFIED_BOTH,
+ .help = "Alias for 'both'",
+ },
+ },
},
{
.name = "continue_on_error",
}
}
+void fio_dump_options_free(struct thread_data *td)
+{
+ while (!flist_empty(&td->opt_list)) {
+ struct print_option *p;
+
+ p = flist_first_entry(&td->opt_list, struct print_option, list);
+ flist_del_init(&p->list);
+ free(p->name);
+ free(p->value);
+ free(p);
+ }
+}
+
struct fio_option *fio_option_find(const char *name)
{
return find_option(fio_options, name);
void del_opt_posval(const char *, const char *);
struct thread_data;
void fio_options_free(struct thread_data *);
+void fio_dump_options_free(struct thread_data *);
char *get_next_str(char **ptr);
int get_max_str_idx(char *input);
char* get_name_by_idx(char *input, int index);
sched_getaffinity((pid), (ptr))
#endif
+#define fio_get_thread_affinity(mask) \
+ pthread_getaffinity_np(pthread_self(), sizeof(mask), &(mask))
+
#define fio_cpu_clear(mask, cpu) (void) CPU_CLR((cpu), (mask))
#define fio_cpu_set(mask, cpu) (void) CPU_SET((cpu), (mask))
#define fio_cpu_isset(mask, cpu) (CPU_ISSET((cpu), (mask)) != 0)
"OPT_BOOL",
"OPT_FLOAT_LIST",
"OPT_STR_SET",
+ "OPT_STR_VAL_ZONE",
"OPT_DEPRECATED",
"OPT_SOFT_DEPRECATED",
"OPT_UNSUPPORTED",
fallthrough;
case FIO_OPT_ULL:
case FIO_OPT_INT:
- case FIO_OPT_STR_VAL: {
+ case FIO_OPT_STR_VAL:
+ case FIO_OPT_STR_VAL_ZONE:
+ {
fio_opt_str_val_fn *fn = o->cb;
char tmp[128], *p;
+ size_t len = strlen(ptr);
+
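+		/*
+		 * A trailing 'z' means the value is a zone count rather than
+		 * a byte value. Encode it as ZONE_BASE_VAL + count so that
+		 * the option callbacks can tell the two apart.
+		 */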
+ if (len > 0 && ptr[len - 1] == 'z') {
+ if (o->type == FIO_OPT_STR_VAL_ZONE) {
+ char *ep;
+ unsigned long long val;
+
+ errno = 0;
+ val = strtoul(ptr, &ep, 10);
+ if (errno == 0 && ep != ptr && *ep == 'z') {
+ ull = ZONE_BASE_VAL + (uint32_t)val;
+ ret = 0;
+ goto store_option_value;
+ } else {
+ log_err("%s: unexpected zone value '%s'\n",
+ o->name, ptr);
+ return 1;
+ }
+ } else {
+ log_err("%s: 'z' suffix isn't applicable\n",
+ o->name);
+ return 1;
+ }
+ }
if (!is_time && o->is_time)
is_time = o->is_time;
}
}
+store_option_value:
if (fn)
ret = fn(data, &ull);
else {
FIO_OPT_BOOL,
FIO_OPT_FLOAT_LIST,
FIO_OPT_STR_SET,
+ FIO_OPT_STR_VAL_ZONE,
FIO_OPT_DEPRECATED,
FIO_OPT_SOFT_DEPRECATED,
FIO_OPT_UNSUPPORTED, /* keep this last */
static inline int parse_is_percent(unsigned long long val)
{
- return val <= -1ULL && val >= (-1ULL - 100ULL);
+ return val >= -101ULL;
}
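+/*
+ * Zone counts given with the 'z' suffix are stored as ZONE_BASE_VAL + count,
+ * i.e. in the upper half of the 64-bit range, so that parse_is_zone() can
+ * recognize them and callers can recover the count by subtracting
+ * ZONE_BASE_VAL.
+ */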
+#define ZONE_BASE_VAL ((-1ULL >> 1) + 1)
static inline int parse_is_percent_uncapped(unsigned long long val)
{
- return (long long)val <= -1;
+ return ZONE_BASE_VAL + -1U < val;
+}
+
+static inline int parse_is_zone(unsigned long long val)
+{
+ return (val - ZONE_BASE_VAL) <= -1U;
}
struct print_option {
break;
}
flist_add_tail(&entry->list, &first->next);
- } while (ret != Z_STREAM_END);
+ }
ret = deflateEnd(&stream);
if (ret == Z_OK)
};
enum {
- FIO_SERVER_VER = 87,
+ FIO_SERVER_VER = 89,
FIO_SERVER_MAX_FRAGMENT_PDU = 1024,
FIO_SERVER_MAX_CMD_MB = 2048,
return true;
}
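+/*
+ * Sum the per-direction group results and print them as a single "MIXED"
+ * line. Used when unified_rw_reporting includes the mixed statistics.
+ */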
+void show_mixed_group_stats(struct group_run_stats *rs, struct buf_output *out)
+{
+ char *io, *agg, *min, *max;
+ char *ioalt, *aggalt, *minalt, *maxalt;
+ uint64_t io_mix = 0, agg_mix = 0, min_mix = -1, max_mix = 0, min_run = -1, max_run = 0;
+ int i;
+ const int i2p = is_power_of_2(rs->kb_base);
+
+ for (i = 0; i < DDIR_RWDIR_CNT; i++) {
+ if (!rs->max_run[i])
+ continue;
+ io_mix += rs->iobytes[i];
+ agg_mix += rs->agg[i];
+ min_mix = min_mix < rs->min_bw[i] ? min_mix : rs->min_bw[i];
+ max_mix = max_mix > rs->max_bw[i] ? max_mix : rs->max_bw[i];
+ min_run = min_run < rs->min_run[i] ? min_run : rs->min_run[i];
+ max_run = max_run > rs->max_run[i] ? max_run : rs->max_run[i];
+ }
+ io = num2str(io_mix, rs->sig_figs, 1, i2p, N2S_BYTE);
+ ioalt = num2str(io_mix, rs->sig_figs, 1, !i2p, N2S_BYTE);
+ agg = num2str(agg_mix, rs->sig_figs, 1, i2p, rs->unit_base);
+ aggalt = num2str(agg_mix, rs->sig_figs, 1, !i2p, rs->unit_base);
+ min = num2str(min_mix, rs->sig_figs, 1, i2p, rs->unit_base);
+ minalt = num2str(min_mix, rs->sig_figs, 1, !i2p, rs->unit_base);
+ max = num2str(max_mix, rs->sig_figs, 1, i2p, rs->unit_base);
+ maxalt = num2str(max_mix, rs->sig_figs, 1, !i2p, rs->unit_base);
+ log_buf(out, " MIXED: bw=%s (%s), %s-%s (%s-%s), io=%s (%s), run=%llu-%llumsec\n",
+ agg, aggalt, min, max, minalt, maxalt, io, ioalt,
+ (unsigned long long) min_run,
+ (unsigned long long) max_run);
+ free(io);
+ free(agg);
+ free(min);
+ free(max);
+ free(ioalt);
+ free(aggalt);
+ free(minalt);
+ free(maxalt);
+}
+
void show_group_stats(struct group_run_stats *rs, struct buf_output *out)
{
char *io, *agg, *min, *max;
max = num2str(rs->max_bw[i], rs->sig_figs, 1, i2p, rs->unit_base);
maxalt = num2str(rs->max_bw[i], rs->sig_figs, 1, !i2p, rs->unit_base);
log_buf(out, "%s: bw=%s (%s), %s-%s (%s-%s), io=%s (%s), run=%llu-%llumsec\n",
- rs->unified_rw_rep ? " MIXED" : str[i],
+ (rs->unified_rw_rep == UNIFIED_MIXED) ? " MIXED" : str[i],
agg, aggalt, min, max, minalt, maxalt, io, ioalt,
(unsigned long long) rs->min_run[i],
(unsigned long long) rs->max_run[i]);
free(minalt);
free(maxalt);
}
+
+	/* Need to aggregate statistics to show mixed values */
+ if (rs->unified_rw_rep == UNIFIED_BOTH)
+ show_mixed_group_stats(rs, out);
}
void stat_calc_dist(uint64_t *map, unsigned long total, double *io_u_dist)
return p_of_agg;
}
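+/*
+ * Aggregate the per-direction statistics of a thread into a temporary copy
+ * and display them as a single "mixed" data direction.
+ */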
+static void show_mixed_ddir_status(struct group_run_stats *rs, struct thread_stat *ts,
+ struct buf_output *out)
+{
+ unsigned long runt;
+ unsigned long long min, max, bw, iops;
+ double mean, dev;
+ char *io_p, *bw_p, *bw_p_alt, *iops_p, *post_st = NULL;
+ struct thread_stat *ts_lcl;
+
+ int i2p;
+ int ddir = 0, i;
+
+ /* Handle aggregation of Reads (ddir = 0), Writes (ddir = 1), and Trims (ddir = 2) */
+ ts_lcl = malloc(sizeof(struct thread_stat));
+ memset((void *)ts_lcl, 0, sizeof(struct thread_stat));
+ ts_lcl->unified_rw_rep = UNIFIED_MIXED; /* calculate mixed stats */
+ for (i = 0; i < DDIR_RWDIR_CNT; i++) {
+ ts_lcl->clat_stat[i].min_val = ULONG_MAX;
+ ts_lcl->slat_stat[i].min_val = ULONG_MAX;
+ ts_lcl->lat_stat[i].min_val = ULONG_MAX;
+ ts_lcl->bw_stat[i].min_val = ULONG_MAX;
+ ts_lcl->iops_stat[i].min_val = ULONG_MAX;
+ ts_lcl->clat_high_prio_stat[i].min_val = ULONG_MAX;
+ ts_lcl->clat_low_prio_stat[i].min_val = ULONG_MAX;
+ }
+ ts_lcl->sync_stat.min_val = ULONG_MAX;
+
+ sum_thread_stats(ts_lcl, ts, 1);
+
+ assert(ddir_rw(ddir));
+
+	if (!ts_lcl->runtime[ddir]) {
+		free(ts_lcl);
+		return;
+	}
+
+ i2p = is_power_of_2(rs->kb_base);
+ runt = ts_lcl->runtime[ddir];
+
+ bw = (1000 * ts_lcl->io_bytes[ddir]) / runt;
+ io_p = num2str(ts_lcl->io_bytes[ddir], ts->sig_figs, 1, i2p, N2S_BYTE);
+ bw_p = num2str(bw, ts->sig_figs, 1, i2p, ts->unit_base);
+ bw_p_alt = num2str(bw, ts->sig_figs, 1, !i2p, ts->unit_base);
+
+ iops = (1000 * ts_lcl->total_io_u[ddir]) / runt;
+ iops_p = num2str(iops, ts->sig_figs, 1, 0, N2S_NONE);
+
+ log_buf(out, " mixed: IOPS=%s, BW=%s (%s)(%s/%llumsec)%s\n",
+ iops_p, bw_p, bw_p_alt, io_p,
+ (unsigned long long) ts_lcl->runtime[ddir],
+ post_st ? : "");
+
+ free(post_st);
+ free(io_p);
+ free(bw_p);
+ free(bw_p_alt);
+ free(iops_p);
+
+ if (calc_lat(&ts_lcl->slat_stat[ddir], &min, &max, &mean, &dev))
+ display_lat("slat", min, max, mean, dev, out);
+ if (calc_lat(&ts_lcl->clat_stat[ddir], &min, &max, &mean, &dev))
+ display_lat("clat", min, max, mean, dev, out);
+ if (calc_lat(&ts_lcl->lat_stat[ddir], &min, &max, &mean, &dev))
+ display_lat(" lat", min, max, mean, dev, out);
+ if (calc_lat(&ts_lcl->clat_high_prio_stat[ddir], &min, &max, &mean, &dev)) {
+ display_lat(ts_lcl->lat_percentiles ? "high prio_lat" : "high prio_clat",
+ min, max, mean, dev, out);
+ if (calc_lat(&ts_lcl->clat_low_prio_stat[ddir], &min, &max, &mean, &dev))
+ display_lat(ts_lcl->lat_percentiles ? "low prio_lat" : "low prio_clat",
+ min, max, mean, dev, out);
+ }
+
+ if (ts->slat_percentiles && ts_lcl->slat_stat[ddir].samples > 0)
+ show_clat_percentiles(ts_lcl->io_u_plat[FIO_SLAT][ddir],
+ ts_lcl->slat_stat[ddir].samples,
+ ts->percentile_list,
+ ts->percentile_precision, "slat", out);
+ if (ts->clat_percentiles && ts_lcl->clat_stat[ddir].samples > 0)
+ show_clat_percentiles(ts_lcl->io_u_plat[FIO_CLAT][ddir],
+ ts_lcl->clat_stat[ddir].samples,
+ ts->percentile_list,
+ ts->percentile_precision, "clat", out);
+ if (ts->lat_percentiles && ts_lcl->lat_stat[ddir].samples > 0)
+ show_clat_percentiles(ts_lcl->io_u_plat[FIO_LAT][ddir],
+ ts_lcl->lat_stat[ddir].samples,
+ ts->percentile_list,
+ ts->percentile_precision, "lat", out);
+
+ if (ts->clat_percentiles || ts->lat_percentiles) {
+ const char *name = ts->lat_percentiles ? "lat" : "clat";
+ char prio_name[32];
+ uint64_t samples;
+
+ if (ts->lat_percentiles)
+ samples = ts_lcl->lat_stat[ddir].samples;
+ else
+ samples = ts_lcl->clat_stat[ddir].samples;
+
+ /* Only print this if some high and low priority stats were collected */
+ if (ts_lcl->clat_high_prio_stat[ddir].samples > 0 &&
+ ts_lcl->clat_low_prio_stat[ddir].samples > 0)
+ {
+ sprintf(prio_name, "high prio (%.2f%%) %s",
+ 100. * (double) ts_lcl->clat_high_prio_stat[ddir].samples / (double) samples,
+ name);
+ show_clat_percentiles(ts_lcl->io_u_plat_high_prio[ddir],
+ ts_lcl->clat_high_prio_stat[ddir].samples,
+ ts->percentile_list,
+ ts->percentile_precision, prio_name, out);
+
+ sprintf(prio_name, "low prio (%.2f%%) %s",
+ 100. * (double) ts_lcl->clat_low_prio_stat[ddir].samples / (double) samples,
+ name);
+ show_clat_percentiles(ts_lcl->io_u_plat_low_prio[ddir],
+ ts_lcl->clat_low_prio_stat[ddir].samples,
+ ts->percentile_list,
+ ts->percentile_precision, prio_name, out);
+ }
+ }
+
+ if (calc_lat(&ts_lcl->bw_stat[ddir], &min, &max, &mean, &dev)) {
+ double p_of_agg = 100.0, fkb_base = (double)rs->kb_base;
+ const char *bw_str;
+
+ if ((rs->unit_base == 1) && i2p)
+ bw_str = "Kibit";
+ else if (rs->unit_base == 1)
+ bw_str = "kbit";
+ else if (i2p)
+ bw_str = "KiB";
+ else
+ bw_str = "kB";
+
+ p_of_agg = convert_agg_kbytes_percent(rs, ddir, mean);
+
+ if (rs->unit_base == 1) {
+ min *= 8.0;
+ max *= 8.0;
+ mean *= 8.0;
+ dev *= 8.0;
+ }
+
+ if (mean > fkb_base * fkb_base) {
+ min /= fkb_base;
+ max /= fkb_base;
+ mean /= fkb_base;
+ dev /= fkb_base;
+ bw_str = (rs->unit_base == 1 ? "Mibit" : "MiB");
+ }
+
+ log_buf(out, " bw (%5s/s): min=%5llu, max=%5llu, per=%3.2f%%, "
+ "avg=%5.02f, stdev=%5.02f, samples=%" PRIu64 "\n",
+ bw_str, min, max, p_of_agg, mean, dev,
+ (&ts_lcl->bw_stat[ddir])->samples);
+ }
+ if (calc_lat(&ts_lcl->iops_stat[ddir], &min, &max, &mean, &dev)) {
+ log_buf(out, " iops : min=%5llu, max=%5llu, "
+ "avg=%5.02f, stdev=%5.02f, samples=%" PRIu64 "\n",
+ min, max, mean, dev, (&ts_lcl->iops_stat[ddir])->samples);
+ }
+
+ free(ts_lcl);
+}
+
static void show_ddir_status(struct group_run_stats *rs, struct thread_stat *ts,
int ddir, struct buf_output *out)
{
}
log_buf(out, " %s: IOPS=%s, BW=%s (%s)(%s/%llumsec)%s\n",
- rs->unified_rw_rep ? "mixed" : io_ddir_name(ddir),
+ (ts->unified_rw_rep == UNIFIED_MIXED) ? "mixed" : io_ddir_name(ddir),
iops_p, bw_p, bw_p_alt, io_p,
(unsigned long long) ts->runtime[ddir],
post_st ? : "");
show_ddir_status(rs, ts, ddir, out);
}
+ if (ts->unified_rw_rep == UNIFIED_BOTH)
+ show_mixed_ddir_status(rs, ts, out);
+
show_latencies(ts, out);
if (ts->sync_stat.samples)
&minv);
else
len = 0;
-
+
for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) {
if (i >= len) {
log_buf(out, ";0%%=0");
}
}
+static void show_mixed_ddir_status_terse(struct thread_stat *ts,
+ struct group_run_stats *rs,
+ int ver, struct buf_output *out)
+{
+ struct thread_stat *ts_lcl;
+ int i;
+
+ /* Handle aggregation of Reads (ddir = 0), Writes (ddir = 1), and Trims (ddir = 2) */
+ ts_lcl = malloc(sizeof(struct thread_stat));
+ memset((void *)ts_lcl, 0, sizeof(struct thread_stat));
+ ts_lcl->unified_rw_rep = UNIFIED_MIXED; /* calculate mixed stats */
+ for (i = 0; i < DDIR_RWDIR_CNT; i++) {
+ ts_lcl->clat_stat[i].min_val = ULONG_MAX;
+ ts_lcl->slat_stat[i].min_val = ULONG_MAX;
+ ts_lcl->lat_stat[i].min_val = ULONG_MAX;
+ ts_lcl->bw_stat[i].min_val = ULONG_MAX;
+ ts_lcl->iops_stat[i].min_val = ULONG_MAX;
+ ts_lcl->clat_high_prio_stat[i].min_val = ULONG_MAX;
+ ts_lcl->clat_low_prio_stat[i].min_val = ULONG_MAX;
+ }
+ ts_lcl->sync_stat.min_val = ULONG_MAX;
+ ts_lcl->lat_percentiles = ts->lat_percentiles;
+ ts_lcl->clat_percentiles = ts->clat_percentiles;
+ ts_lcl->slat_percentiles = ts->slat_percentiles;
+ ts_lcl->percentile_precision = ts->percentile_precision;
+ memcpy(ts_lcl->percentile_list, ts->percentile_list, sizeof(ts->percentile_list));
+
+ sum_thread_stats(ts_lcl, ts, 1);
+
+ /* add the aggregated stats to json parent */
+ show_ddir_status_terse(ts_lcl, rs, DDIR_READ, ver, out);
+ free(ts_lcl);
+}
+
static struct json_object *add_ddir_lat_json(struct thread_stat *ts, uint32_t percentiles,
struct io_stat *lat_stat, uint64_t *io_u_plat)
{
assert(ddir_rw(ddir) || ddir_sync(ddir));
- if (ts->unified_rw_rep && ddir != DDIR_READ)
+ if ((ts->unified_rw_rep == UNIFIED_MIXED) && ddir != DDIR_READ)
return;
dir_object = json_create_object();
json_object_add_value_object(parent,
- ts->unified_rw_rep ? "mixed" : io_ddir_name(ddir), dir_object);
+ (ts->unified_rw_rep == UNIFIED_MIXED) ? "mixed" : io_ddir_name(ddir), dir_object);
if (ddir_rw(ddir)) {
bw_bytes = 0;
}
}
+static void add_mixed_ddir_status_json(struct thread_stat *ts,
+ struct group_run_stats *rs, struct json_object *parent)
+{
+ struct thread_stat *ts_lcl;
+ int i;
+
+ /* Handle aggregation of Reads (ddir = 0), Writes (ddir = 1), and Trims (ddir = 2) */
+ ts_lcl = malloc(sizeof(struct thread_stat));
+ memset((void *)ts_lcl, 0, sizeof(struct thread_stat));
+ ts_lcl->unified_rw_rep = UNIFIED_MIXED; /* calculate mixed stats */
+ for (i = 0; i < DDIR_RWDIR_CNT; i++) {
+ ts_lcl->clat_stat[i].min_val = ULONG_MAX;
+ ts_lcl->slat_stat[i].min_val = ULONG_MAX;
+ ts_lcl->lat_stat[i].min_val = ULONG_MAX;
+ ts_lcl->bw_stat[i].min_val = ULONG_MAX;
+ ts_lcl->iops_stat[i].min_val = ULONG_MAX;
+ ts_lcl->clat_high_prio_stat[i].min_val = ULONG_MAX;
+ ts_lcl->clat_low_prio_stat[i].min_val = ULONG_MAX;
+ }
+ ts_lcl->sync_stat.min_val = ULONG_MAX;
+ ts_lcl->lat_percentiles = ts->lat_percentiles;
+ ts_lcl->clat_percentiles = ts->clat_percentiles;
+ ts_lcl->slat_percentiles = ts->slat_percentiles;
+ ts_lcl->percentile_precision = ts->percentile_precision;
+ memcpy(ts_lcl->percentile_list, ts->percentile_list, sizeof(ts->percentile_list));
+
+ sum_thread_stats(ts_lcl, ts, 1);
+
+ /* add the aggregated stats to json parent */
+ add_ddir_status_json(ts_lcl, rs, DDIR_READ, parent);
+ free(ts_lcl);
+}
+
static void show_thread_status_terse_all(struct thread_stat *ts,
struct group_run_stats *rs, int ver,
struct buf_output *out)
log_buf(out, "%d;%s;%s;%d;%d", ver, fio_version_string,
ts->name, ts->groupid, ts->error);
- /* Log Read Status */
+	/* Log Read Status, or mixed if unified_rw_rep is UNIFIED_MIXED */
show_ddir_status_terse(ts, rs, DDIR_READ, ver, out);
- /* Log Write Status */
- show_ddir_status_terse(ts, rs, DDIR_WRITE, ver, out);
- /* Log Trim Status */
- if (ver == 2 || ver == 4 || ver == 5)
- show_ddir_status_terse(ts, rs, DDIR_TRIM, ver, out);
-
+ if (ts->unified_rw_rep != UNIFIED_MIXED) {
+ /* Log Write Status */
+ show_ddir_status_terse(ts, rs, DDIR_WRITE, ver, out);
+ /* Log Trim Status */
+ if (ver == 2 || ver == 4 || ver == 5)
+ show_ddir_status_terse(ts, rs, DDIR_TRIM, ver, out);
+ }
+ if (ts->unified_rw_rep == UNIFIED_BOTH)
+ show_mixed_ddir_status_terse(ts, rs, ver, out);
/* CPU Usage */
if (ts->total_run_time) {
double runt = (double) ts->total_run_time;
add_ddir_status_json(ts, rs, DDIR_TRIM, root);
add_ddir_status_json(ts, rs, DDIR_SYNC, root);
+ if (ts->unified_rw_rep == UNIFIED_BOTH)
+ add_mixed_ddir_status_json(ts, rs, root);
+
/* CPU Usage */
if (ts->total_run_time) {
double runt = (double) ts->total_run_time;
int k, l, m;
for (l = 0; l < DDIR_RWDIR_CNT; l++) {
- if (!dst->unified_rw_rep) {
+		if (dst->unified_rw_rep != UNIFIED_MIXED) {
sum_stat(&dst->clat_stat[l], &src->clat_stat[l], first, false);
sum_stat(&dst->clat_high_prio_stat[l], &src->clat_high_prio_stat[l], first, false);
sum_stat(&dst->clat_low_prio_stat[l], &src->clat_low_prio_stat[l], first, false);
dst->io_u_lat_m[k] += src->io_u_lat_m[k];
for (k = 0; k < DDIR_RWDIR_CNT; k++) {
- if (!dst->unified_rw_rep) {
+		if (dst->unified_rw_rep != UNIFIED_MIXED) {
dst->total_io_u[k] += src->total_io_u[k];
dst->short_io_u[k] += src->short_io_u[k];
dst->drop_io_u[k] += src->drop_io_u[k];
for (k = 0; k < FIO_LAT_CNT; k++)
for (l = 0; l < DDIR_RWDIR_CNT; l++)
for (m = 0; m < FIO_IO_U_PLAT_NR; m++)
- if (!dst->unified_rw_rep)
+			if (dst->unified_rw_rep != UNIFIED_MIXED)
dst->io_u_plat[k][l][m] += src->io_u_plat[k][l][m];
else
dst->io_u_plat[k][0][m] += src->io_u_plat[k][l][m];
for (k = 0; k < DDIR_RWDIR_CNT; k++) {
for (m = 0; m < FIO_IO_U_PLAT_NR; m++) {
- if (!dst->unified_rw_rep) {
+			if (dst->unified_rw_rep != UNIFIED_MIXED) {
dst->io_u_plat_high_prio[k][m] += src->io_u_plat_high_prio[k][m];
dst->io_u_plat_low_prio[k][m] += src->io_u_plat_low_prio[k][m];
} else {
rs->kb_base = ts->kb_base;
rs->unit_base = ts->unit_base;
rs->sig_figs = ts->sig_figs;
- rs->unified_rw_rep += ts->unified_rw_rep;
+ rs->unified_rw_rep |= ts->unified_rw_rep;
for (j = 0; j < DDIR_RWDIR_CNT; j++) {
if (!ts->runtime[j])
#define FIO_JOBNAME_SIZE 128
#define FIO_JOBDESC_SIZE 256
#define FIO_VERROR_SIZE 128
+#define UNIFIED_SPLIT 0
+#define UNIFIED_MIXED 1
+#define UNIFIED_BOTH 2
enum fio_lat {
FIO_SLAT = 0,
next_tail = tail = *ring->tail;
do {
next_tail++;
- read_barrier();
- if (next_tail == *ring->head)
+ if (next_tail == atomic_load_acquire(ring->head))
break;
index = tail & sq_ring_mask;
tail = next_tail;
} while (prepped < max_ios);
- if (*ring->tail != tail) {
- *ring->tail = tail;
- write_barrier();
- }
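+	/*
+	 * Publish the new SQ tail with release semantics so that the SQE
+	 * setup above is visible to the kernel before it sees the new tail.
+	 */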
+ if (prepped)
+ atomic_store_release(ring->tail, tail);
return prepped;
}
struct file *f;
read_barrier();
- if (head == *ring->tail)
+ if (head == atomic_load_acquire(ring->tail))
break;
cqe = &ring->cqes[head & cq_ring_mask];
if (!do_nop) {
head++;
} while (1);
- s->inflight -= reaped;
- *ring->head = head;
- write_barrier();
+ if (reaped) {
+ s->inflight -= reaped;
+ atomic_store_release(ring->head, head);
+ }
return reaped;
}
prepped = 0;
do {
int to_wait, to_submit, this_reap, to_prep;
+ unsigned ring_flags = 0;
if (!prepped && s->inflight < depth) {
to_prep = min(depth - s->inflight, batch_submit);
* Only need to call io_uring_enter if we're not using SQ thread
* poll, or if IORING_SQ_NEED_WAKEUP is set.
*/
- if (!sq_thread_poll || (*ring->flags & IORING_SQ_NEED_WAKEUP)) {
+ if (sq_thread_poll)
+ ring_flags = atomic_load_acquire(ring->flags);
+ if (!sq_thread_poll || ring_flags & IORING_SQ_NEED_WAKEUP) {
unsigned flags = 0;
if (to_wait)
flags = IORING_ENTER_GETEVENTS;
- if ((*ring->flags & IORING_SQ_NEED_WAKEUP))
+ if (ring_flags & IORING_SQ_NEED_WAKEUP)
flags |= IORING_ENTER_SQ_WAKEUP;
ret = io_uring_enter(s, to_submit, to_wait, flags);
s->calls++;
+ } else {
+ /* for SQPOLL, we submitted it all effectively */
+ ret = to_submit;
}
/*
>> "${logfile}.${test_number}" 2>&1 || return $?
}
+# test 'z' suffix parsing only
+test55() {
+ local bs
+ bs=$((logical_block_size))
+
+ require_zbd || return $SKIP_TESTCASE
+ # offset=1z + offset_increment=10z + size=2z
+ require_seq_zones 13 || return $SKIP_TESTCASE
+
+ run_fio --name=j \
+ --filename=${dev} \
+ --direct=1 \
+ "$(ioengine "psync")" \
+ --zonemode=zbd \
+ --zonesize=${zone_size} \
+ --rw=write \
+ --bs=${bs} \
+ --numjobs=2 \
+ --offset_increment=10z \
+ --offset=1z \
+ --size=2z \
+ --io_size=3z \
+ ${job_var_opts[@]} --debug=zbd \
+ >> "${logfile}.${test_number}" 2>&1 || return $?
+}
+
+# test 'z' suffix parsing only
+test56() {
+ local bs
+ bs=$((logical_block_size))
+
+ require_regular_block_dev || return $SKIP_TESTCASE
+ require_seq_zones 10 || return $SKIP_TESTCASE
+
+ run_fio --name=j \
+ --filename=${dev} \
+ --direct=1 \
+ "$(ioengine "psync")" \
+ --zonemode=strided \
+ --zonesize=${zone_size} \
+ --rw=write \
+ --bs=${bs} \
+ --size=10z \
+ --zoneskip=2z \
+ ${job_var_opts[@]} --debug=zbd \
+ >> "${logfile}.${test_number}" 2>&1 || return $?
+}
+
+# Test that repeated async write job does not cause zone reset during writes
+# in-flight, when the block size is not a divisor of the zone size.
+test57() {
+ local bs off
+
+ require_zbd || return $SKIP_TESTCASE
+
+ bs=$((4096 * 7))
+ off=$((first_sequential_zone_sector * 512))
+
+ run_fio --name=job --filename="${dev}" --rw=randwrite --bs="${bs}" \
+ --offset="${off}" --size=$((4 * zone_size)) --iodepth=256 \
+ "$(ioengine "libaio")" --time_based=1 --runtime=30s \
+ --zonemode=zbd --direct=1 --zonesize="${zone_size}" \
+ ${job_var_opts[@]} \
+ >> "${logfile}.${test_number}" 2>&1 || return $?
+}
+
SECONDS=0
tests=()
dynamic_analyzer=()
unsigned long long size;
unsigned long long io_size;
unsigned int size_percent;
+ unsigned int size_nz;
unsigned int io_size_percent;
+ unsigned int io_size_nz;
unsigned int fill_device;
unsigned int file_append;
unsigned long long file_size_low;
unsigned long long file_size_high;
unsigned long long start_offset;
unsigned long long start_offset_align;
+ unsigned int start_offset_nz;
unsigned long long bs[DDIR_RWDIR_CNT];
unsigned long long ba[DDIR_RWDIR_CNT];
unsigned long long zone_size;
unsigned long long zone_capacity;
unsigned long long zone_skip;
+ uint32_t zone_skip_nz;
enum fio_zone_mode zone_mode;
unsigned long long lockmem;
enum fio_memtype mem_type;
unsigned int mem_align;
- unsigned long long max_latency;
+ unsigned long long max_latency[DDIR_RWDIR_CNT];
unsigned int exit_what;
unsigned int stonewall;
unsigned int gid;
unsigned int offset_increment_percent;
+ unsigned int offset_increment_nz;
unsigned long long offset_increment;
unsigned long long number_ios;
uint64_t size;
uint64_t io_size;
uint32_t size_percent;
+ uint32_t size_nz;
uint32_t io_size_percent;
+ uint32_t io_size_nz;
uint32_t fill_device;
uint32_t file_append;
uint32_t unique_filename;
+ uint32_t pad3;
uint64_t file_size_low;
uint64_t file_size_high;
uint64_t start_offset;
uint64_t start_offset_align;
+ uint32_t start_offset_nz;
+ uint32_t pad4;
uint64_t bs[DDIR_RWDIR_CNT];
uint64_t ba[DDIR_RWDIR_CNT];
struct zone_split zone_split[DDIR_RWDIR_CNT][ZONESPLIT_MAX];
uint32_t zone_split_nr[DDIR_RWDIR_CNT];
- uint8_t pad1[4];
-
fio_fp64_t zipf_theta;
fio_fp64_t pareto_h;
fio_fp64_t gauss_dev;
uint64_t zone_capacity;
uint64_t zone_skip;
uint64_t lockmem;
+ uint32_t zone_skip_nz;
uint32_t mem_type;
uint32_t mem_align;
uint32_t new_group;
uint32_t numjobs;
- uint8_t pad3[4];
-
/*
* We currently can't convert these, so don't enable them
*/
uint32_t gid;
uint32_t offset_increment_percent;
+ uint32_t offset_increment_nz;
uint64_t offset_increment;
uint64_t number_ios;
uint64_t latency_target;
uint64_t latency_window;
- uint64_t max_latency;
+ uint64_t max_latency[DDIR_RWDIR_CNT];
+ uint32_t pad5;
fio_fp64_t latency_percentile;
uint32_t latency_run;
return false;
}
- if (td->o.zone_skip &&
- (td->o.zone_skip < td->o.zone_size ||
- td->o.zone_skip % td->o.zone_size)) {
+ if (td->o.zone_skip % td->o.zone_size) {
log_err("%s: zoneskip %llu is not a multiple of the device zone size %llu.\n",
f->file_name, (unsigned long long) td->o.zone_skip,
(unsigned long long) td->o.zone_size);
{
struct thread_data *td;
struct fio_file *f;
- uint32_t zone_size;
int i, j, k;
for_each_td(td, i) {
for_each_file(td, f, j) {
+ uint64_t zone_size;
+
if (!f->zbd_info)
continue;
zone_size = f->zbd_info->zone_size;
for (k = 0; k < FIO_ARRAY_SIZE(td->o.bs); k++) {
if (td->o.verify != VERIFY_NONE &&
zone_size % td->o.bs[k] != 0) {
- log_info("%s: block size %llu is not a divisor of the zone size %d\n",
+ log_info("%s: block size %llu is not a divisor of the zone size %llu\n",
f->file_name, td->o.bs[k],
- zone_size);
+ (unsigned long long)zone_size);
return false;
}
}
static int zbd_reset_zone(struct thread_data *td, struct fio_file *f,
struct fio_zone_info *z);
-int zbd_setup_files(struct thread_data *td)
+int zbd_init_files(struct thread_data *td)
{
struct fio_file *f;
int i;
if (zbd_init_zone_info(td, f))
return 1;
}
+ return 0;
+}
+
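+/*
+ * Convert option values that were given in zones (the 'z' suffix) into byte
+ * values, now that the zone size of each file is known.
+ */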
+void zbd_recalc_options_with_zone_granularity(struct thread_data *td)
+{
+ struct fio_file *f;
+ int i;
+
+ for_each_file(td, f, i) {
+ struct zoned_block_device_info *zbd = f->zbd_info;
+		/* zonemode=strided doesn't get a per-file zone size. */
+ uint64_t zone_size = zbd ? zbd->zone_size : td->o.zone_size;
+
+ if (zone_size == 0)
+ continue;
+
+ if (td->o.size_nz > 0) {
+ td->o.size = td->o.size_nz * zone_size;
+ }
+ if (td->o.io_size_nz > 0) {
+ td->o.io_size = td->o.io_size_nz * zone_size;
+ }
+ if (td->o.start_offset_nz > 0) {
+ td->o.start_offset = td->o.start_offset_nz * zone_size;
+ }
+ if (td->o.offset_increment_nz > 0) {
+ td->o.offset_increment = td->o.offset_increment_nz * zone_size;
+ }
+ if (td->o.zone_skip_nz > 0) {
+ td->o.zone_skip = td->o.zone_skip_nz * zone_size;
+ }
+ }
+}
+
+int zbd_setup_files(struct thread_data *td)
+{
+ struct fio_file *f;
+ int i;
if (!zbd_using_direct_io()) {
log_err("Using direct I/O is mandatory for writing to ZBD drives\n\n");
* @f: fio file for which to reset zones
* @zb: first zone to reset.
* @ze: first zone not to reset.
- * @all_zones: whether to reset all zones or only those zones for which the
- * write pointer is not a multiple of td->o.min_bs[DDIR_WRITE].
*/
static int zbd_reset_zones(struct thread_data *td, struct fio_file *f,
struct fio_zone_info *const zb,
- struct fio_zone_info *const ze, bool all_zones)
+ struct fio_zone_info *const ze)
{
struct fio_zone_info *z;
const uint32_t min_bs = td->o.min_bs[DDIR_WRITE];
- bool reset_wp;
int res = 0;
assert(min_bs);
if (!z->has_wp)
continue;
zone_lock(td, f, z);
- if (all_zones) {
- pthread_mutex_lock(&f->zbd_info->mutex);
- zbd_close_zone(td, f, nz);
- pthread_mutex_unlock(&f->zbd_info->mutex);
-
- reset_wp = z->wp != z->start;
- } else {
- reset_wp = z->wp % min_bs != 0;
- }
- if (reset_wp) {
+ pthread_mutex_lock(&f->zbd_info->mutex);
+ zbd_close_zone(td, f, nz);
+ pthread_mutex_unlock(&f->zbd_info->mutex);
+ if (z->wp != z->start) {
dprint(FD_ZBD, "%s: resetting zone %u\n",
f->file_name, zbd_zone_nr(f, z));
if (zbd_reset_zone(td, f, z) < 0)
* writing any data to avoid that a zone reset has to be issued while
* writing data, which causes data loss.
*/
- zbd_reset_zones(td, f, zb, ze, td->o.verify != VERIFY_NONE &&
- td->runstate != TD_VERIFYING);
+ if (td->o.verify != VERIFY_NONE && td->runstate != TD_VERIFYING)
+ zbd_reset_zones(td, f, zb, ze);
zbd_reset_write_cnt(td, f);
}
struct fio_zone_info zone_info[0];
};
+int zbd_init_files(struct thread_data *td);
+void zbd_recalc_options_with_zone_granularity(struct thread_data *td);
int zbd_setup_files(struct thread_data *td);
void zbd_free_zone_info(struct fio_file *f);
void zbd_file_reset(struct thread_data *td, struct fio_file *f);