--- /dev/null
+/**
+ * FIO engine for DAOS File System (dfs).
+ *
+ * (C) Copyright 2020-2021 Intel Corporation.
+ */
+
+#include <fio.h>
+#include <optgroup.h>
+
+#include <daos.h>
+#include <daos_fs.h>
+
+static bool daos_initialized;
+static int num_threads;
+static pthread_mutex_t daos_mutex = PTHREAD_MUTEX_INITIALIZER;
+daos_handle_t poh; /* pool handle */
+daos_handle_t coh; /* container handle */
+daos_oclass_id_t cid = OC_UNKNOWN; /* object class */
+dfs_t *dfs; /* dfs mount reference */
+
+struct daos_iou {
+ struct io_u *io_u;
+ daos_event_t ev;
+ d_sg_list_t sgl;
+ d_iov_t iov;
+ daos_size_t size;
+ bool complete;
+};
+
+struct daos_data {
+ daos_handle_t eqh;
+ dfs_obj_t *obj;
+ struct io_u **io_us;
+ int queued;
+ int num_ios;
+};
+
+struct daos_fio_options {
+ void *pad;
+ char *pool; /* Pool UUID */
+ char *cont; /* Container UUID */
+ daos_size_t chsz; /* Chunk size */
+ char *oclass; /* object class */
+#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
+ char *svcl; /* service replica list, deprecated */
+#endif
+};
+
+static struct fio_option options[] = {
+ {
+ .name = "pool",
+ .lname = "pool uuid",
+ .type = FIO_OPT_STR_STORE,
+ .off1 = offsetof(struct daos_fio_options, pool),
+ .help = "DAOS pool uuid",
+ .category = FIO_OPT_C_ENGINE,
+ .group = FIO_OPT_G_DFS,
+ },
+ {
+ .name = "cont",
+ .lname = "container uuid",
+ .type = FIO_OPT_STR_STORE,
+ .off1 = offsetof(struct daos_fio_options, cont),
+ .help = "DAOS container uuid",
+ .category = FIO_OPT_C_ENGINE,
+ .group = FIO_OPT_G_DFS,
+ },
+ {
+ .name = "chunk_size",
+ .lname = "DFS chunk size",
+ .type = FIO_OPT_ULL,
+ .off1 = offsetof(struct daos_fio_options, chsz),
+ .help = "DFS chunk size in bytes",
+ .def = "0", /* use container default */
+ .category = FIO_OPT_C_ENGINE,
+ .group = FIO_OPT_G_DFS,
+ },
+ {
+ .name = "object_class",
+ .lname = "object class",
+ .type = FIO_OPT_STR_STORE,
+ .off1 = offsetof(struct daos_fio_options, oclass),
+ .help = "DAOS object class",
+ .category = FIO_OPT_C_ENGINE,
+ .group = FIO_OPT_G_DFS,
+ },
+#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
+ {
+ .name = "svcl",
+ .lname = "List of service ranks",
+ .type = FIO_OPT_STR_STORE,
+ .off1 = offsetof(struct daos_fio_options, svcl),
+ .help = "List of pool replicated service ranks",
+ .category = FIO_OPT_C_ENGINE,
+ .group = FIO_OPT_G_DFS,
+ },
+#endif
+ {
+ .name = NULL,
+ },
+};
+
+static int daos_fio_global_init(struct thread_data *td)
+{
+ struct daos_fio_options *eo = td->eo;
+ uuid_t pool_uuid, co_uuid;
+ daos_pool_info_t pool_info;
+ daos_cont_info_t co_info;
+ int rc = 0;
+
+#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
+ if (!eo->pool || !eo->cont || !eo->svcl) {
+#else
+ if (!eo->pool || !eo->cont) {
+#endif
+ log_err("Missing required DAOS options\n");
+ return EINVAL;
+ }
+
+ rc = daos_init();
+ if (rc != -DER_ALREADY && rc) {
+ log_err("Failed to initialize daos %d\n", rc);
+ td_verror(td, rc, "daos_init");
+ return rc;
+ }
+
+ rc = uuid_parse(eo->pool, pool_uuid);
+ if (rc) {
+ log_err("Failed to parse 'Pool uuid': %s\n", eo->pool);
+ td_verror(td, EINVAL, "uuid_parse(eo->pool)");
+ return EINVAL;
+ }
+
+ rc = uuid_parse(eo->cont, co_uuid);
+ if (rc) {
+ log_err("Failed to parse 'Cont uuid': %s\n", eo->cont);
+ td_verror(td, EINVAL, "uuid_parse(eo->cont)");
+ return EINVAL;
+ }
+
+ /* Connect to the DAOS pool */
+#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
+ d_rank_list_t *svcl = NULL;
+
+ svcl = daos_rank_list_parse(eo->svcl, ":");
+ if (svcl == NULL) {
+ log_err("Failed to parse svcl\n");
+ td_verror(td, EINVAL, "daos_rank_list_parse");
+ return EINVAL;
+ }
+
+ rc = daos_pool_connect(pool_uuid, NULL, svcl, DAOS_PC_RW,
+ &poh, &pool_info, NULL);
+ d_rank_list_free(svcl);
+#else
+ rc = daos_pool_connect(pool_uuid, NULL, DAOS_PC_RW, &poh, &pool_info,
+ NULL);
+#endif
+ if (rc) {
+ log_err("Failed to connect to pool %d\n", rc);
+ td_verror(td, rc, "daos_pool_connect");
+ return rc;
+ }
+
+ /* Open the DAOS container */
+ rc = daos_cont_open(poh, co_uuid, DAOS_COO_RW, &coh, &co_info, NULL);
+ if (rc) {
+ log_err("Failed to open container: %d\n", rc);
+ td_verror(td, rc, "daos_cont_open");
+ (void)daos_pool_disconnect(poh, NULL);
+ return rc;
+ }
+
+ /* Mount encapsulated filesystem */
+ rc = dfs_mount(poh, coh, O_RDWR, &dfs);
+ if (rc) {
+ log_err("Failed to mount DFS namespace: %d\n", rc);
+ td_verror(td, rc, "dfs_mount");
+ (void)daos_pool_disconnect(poh, NULL);
+ (void)daos_cont_close(coh, NULL);
+ return rc;
+ }
+
+ /* Retrieve object class to use, if specified */
+ if (eo->oclass)
+ cid = daos_oclass_name2id(eo->oclass);
+
+ return 0;
+}
+
+static int daos_fio_global_cleanup()
+{
+ int rc;
+ int ret = 0;
+
+ rc = dfs_umount(dfs);
+ if (rc) {
+ log_err("failed to umount dfs: %d\n", rc);
+ ret = rc;
+ }
+ rc = daos_cont_close(coh, NULL);
+ if (rc) {
+ log_err("failed to close container: %d\n", rc);
+ if (ret == 0)
+ ret = rc;
+ }
+ rc = daos_pool_disconnect(poh, NULL);
+ if (rc) {
+ log_err("failed to disconnect pool: %d\n", rc);
+ if (ret == 0)
+ ret = rc;
+ }
+ rc = daos_fini();
+ if (rc) {
+ log_err("failed to finalize daos: %d\n", rc);
+ if (ret == 0)
+ ret = rc;
+ }
+
+ return ret;
+}
+
+static int daos_fio_setup(struct thread_data *td)
+{
+ return 0;
+}
+
+static int daos_fio_init(struct thread_data *td)
+{
+ struct daos_data *dd;
+ int rc = 0;
+
+ pthread_mutex_lock(&daos_mutex);
+
+ dd = malloc(sizeof(*dd));
+ if (dd == NULL) {
+ log_err("Failed to allocate DAOS-private data\n");
+ rc = ENOMEM;
+ goto out;
+ }
+
+ dd->queued = 0;
+ dd->num_ios = td->o.iodepth;
+ dd->io_us = calloc(dd->num_ios, sizeof(struct io_u *));
+ if (dd->io_us == NULL) {
+ log_err("Failed to allocate IO queue\n");
+ rc = ENOMEM;
+ goto out;
+ }
+
+ /* initialize DAOS stack if not already up */
+ if (!daos_initialized) {
+ rc = daos_fio_global_init(td);
+ if (rc)
+ goto out;
+ daos_initialized = true;
+ }
+
+ rc = daos_eq_create(&dd->eqh);
+ if (rc) {
+ log_err("Failed to create event queue: %d\n", rc);
+ td_verror(td, rc, "daos_eq_create");
+ goto out;
+ }
+
+ td->io_ops_data = dd;
+ num_threads++;
+out:
+ if (rc) {
+ if (dd) {
+ free(dd->io_us);
+ free(dd);
+ }
+ if (num_threads == 0 && daos_initialized) {
+ /* don't clobber error return value */
+ (void)daos_fio_global_cleanup();
+ daos_initialized = false;
+ }
+ }
+ pthread_mutex_unlock(&daos_mutex);
+ return rc;
+}
+
+static void daos_fio_cleanup(struct thread_data *td)
+{
+ struct daos_data *dd = td->io_ops_data;
+ int rc;
+
+ if (dd == NULL)
+ return;
+
+ rc = daos_eq_destroy(dd->eqh, DAOS_EQ_DESTROY_FORCE);
+ if (rc < 0) {
+ log_err("failed to destroy event queue: %d\n", rc);
+ td_verror(td, rc, "daos_eq_destroy");
+ }
+
+ free(dd->io_us);
+ free(dd);
+
+ pthread_mutex_lock(&daos_mutex);
+ num_threads--;
+ if (daos_initialized && num_threads == 0) {
+ int ret;
+
+ ret = daos_fio_global_cleanup();
+ if (ret < 0 && rc == 0) {
+ log_err("failed to clean up: %d\n", ret);
+ td_verror(td, ret, "daos_fio_global_cleanup");
+ }
+ daos_initialized = false;
+ }
+ pthread_mutex_unlock(&daos_mutex);
+}
+
+static int daos_fio_get_file_size(struct thread_data *td, struct fio_file *f)
+{
+ char *file_name = f->file_name;
+ struct stat stbuf = {0};
+ int rc;
+
+ dprint(FD_FILE, "dfs stat %s\n", f->file_name);
+
+ if (!daos_initialized)
+ return 0;
+
+ rc = dfs_stat(dfs, NULL, file_name, &stbuf);
+ if (rc) {
+ log_err("Failed to stat %s: %d\n", f->file_name, rc);
+ td_verror(td, rc, "dfs_stat");
+ return rc;
+ }
+
+ f->real_file_size = stbuf.st_size;
+ return 0;
+}
+
+static int daos_fio_close(struct thread_data *td, struct fio_file *f)
+{
+ struct daos_data *dd = td->io_ops_data;
+ int rc;
+
+ dprint(FD_FILE, "dfs release %s\n", f->file_name);
+
+ rc = dfs_release(dd->obj);
+ if (rc) {
+ log_err("Failed to release %s: %d\n", f->file_name, rc);
+ td_verror(td, rc, "dfs_release");
+ return rc;
+ }
+
+ return 0;
+}
+
+static int daos_fio_open(struct thread_data *td, struct fio_file *f)
+{
+ struct daos_data *dd = td->io_ops_data;
+ struct daos_fio_options *eo = td->eo;
+ int flags = 0;
+ int rc;
+
+ dprint(FD_FILE, "dfs open %s (%s/%d/%d)\n",
+ f->file_name, td_write(td) & !read_only ? "rw" : "r",
+ td->o.create_on_open, td->o.allow_create);
+
+ if (td->o.create_on_open && td->o.allow_create)
+ flags |= O_CREAT;
+
+ if (td_write(td)) {
+ if (!read_only)
+ flags |= O_RDWR;
+ if (td->o.allow_create)
+ flags |= O_CREAT;
+ } else if (td_read(td)) {
+ flags |= O_RDONLY;
+ }
+
+ rc = dfs_open(dfs, NULL, f->file_name,
+ S_IFREG | S_IRUSR | S_IWUSR,
+ flags, cid, eo->chsz, NULL, &dd->obj);
+ if (rc) {
+ log_err("Failed to open %s: %d\n", f->file_name, rc);
+ td_verror(td, rc, "dfs_open");
+ return rc;
+ }
+
+ return 0;
+}
+
+static int daos_fio_unlink(struct thread_data *td, struct fio_file *f)
+{
+ int rc;
+
+ dprint(FD_FILE, "dfs remove %s\n", f->file_name);
+
+ rc = dfs_remove(dfs, NULL, f->file_name, false, NULL);
+ if (rc) {
+ log_err("Failed to remove %s: %d\n", f->file_name, rc);
+ td_verror(td, rc, "dfs_remove");
+ return rc;
+ }
+
+ return 0;
+}
+
+static int daos_fio_invalidate(struct thread_data *td, struct fio_file *f)
+{
+ dprint(FD_FILE, "dfs invalidate %s\n", f->file_name);
+ return 0;
+}
+
+static void daos_fio_io_u_free(struct thread_data *td, struct io_u *io_u)
+{
+ struct daos_iou *io = io_u->engine_data;
+
+ if (io) {
+ io_u->engine_data = NULL;
+ free(io);
+ }
+}
+
+static int daos_fio_io_u_init(struct thread_data *td, struct io_u *io_u)
+{
+ struct daos_iou *io;
+
+ io = malloc(sizeof(struct daos_iou));
+ if (!io) {
+ td_verror(td, ENOMEM, "malloc");
+ return ENOMEM;
+ }
+ io->io_u = io_u;
+ io_u->engine_data = io;
+ return 0;
+}
+
+static struct io_u * daos_fio_event(struct thread_data *td, int event)
+{
+ struct daos_data *dd = td->io_ops_data;
+
+ return dd->io_us[event];
+}
+
+static int daos_fio_getevents(struct thread_data *td, unsigned int min,
+ unsigned int max, const struct timespec *t)
+{
+ struct daos_data *dd = td->io_ops_data;
+ daos_event_t *evp[max];
+ unsigned int events = 0;
+ int i;
+ int rc;
+
+ while (events < min) {
+ rc = daos_eq_poll(dd->eqh, 0, DAOS_EQ_NOWAIT, max, evp);
+ if (rc < 0) {
+ log_err("Event poll failed: %d\n", rc);
+ td_verror(td, rc, "daos_eq_poll");
+ return events;
+ }
+
+ for (i = 0; i < rc; i++) {
+ struct daos_iou *io;
+ struct io_u *io_u;
+
+ io = container_of(evp[i], struct daos_iou, ev);
+ if (io->complete)
+ log_err("Completion on already completed I/O\n");
+
+ io_u = io->io_u;
+ if (io->ev.ev_error)
+ io_u->error = io->ev.ev_error;
+ else
+ io_u->resid = 0;
+
+ dd->io_us[events] = io_u;
+ dd->queued--;
+ daos_event_fini(&io->ev);
+ io->complete = true;
+ events++;
+ }
+ }
+
+ dprint(FD_IO, "dfs eq_pool returning %d (%u/%u)\n", events, min, max);
+
+ return events;
+}
+
+static enum fio_q_status daos_fio_queue(struct thread_data *td,
+ struct io_u *io_u)
+{
+ struct daos_data *dd = td->io_ops_data;
+ struct daos_iou *io = io_u->engine_data;
+ daos_off_t offset = io_u->offset;
+ int rc;
+
+ if (dd->queued == td->o.iodepth)
+ return FIO_Q_BUSY;
+
+ io->sgl.sg_nr = 1;
+ io->sgl.sg_nr_out = 0;
+ d_iov_set(&io->iov, io_u->xfer_buf, io_u->xfer_buflen);
+ io->sgl.sg_iovs = &io->iov;
+ io->size = io_u->xfer_buflen;
+
+ io->complete = false;
+ rc = daos_event_init(&io->ev, dd->eqh, NULL);
+ if (rc) {
+ log_err("Event init failed: %d\n", rc);
+ io_u->error = rc;
+ return FIO_Q_COMPLETED;
+ }
+
+ switch (io_u->ddir) {
+ case DDIR_WRITE:
+ rc = dfs_write(dfs, dd->obj, &io->sgl, offset, &io->ev);
+ if (rc) {
+ log_err("dfs_write failed: %d\n", rc);
+ io_u->error = rc;
+ return FIO_Q_COMPLETED;
+ }
+ break;
+ case DDIR_READ:
+ rc = dfs_read(dfs, dd->obj, &io->sgl, offset, &io->size,
+ &io->ev);
+ if (rc) {
+ log_err("dfs_read failed: %d\n", rc);
+ io_u->error = rc;
+ return FIO_Q_COMPLETED;
+ }
+ break;
+ case DDIR_SYNC:
+ io_u->error = 0;
+ return FIO_Q_COMPLETED;
+ default:
+ dprint(FD_IO, "Invalid IO type: %d\n", io_u->ddir);
+ io_u->error = -DER_INVAL;
+ return FIO_Q_COMPLETED;
+ }
+
+ dd->queued++;
+ return FIO_Q_QUEUED;
+}
+
+static int daos_fio_prep(struct thread_data fio_unused *td, struct io_u *io_u)
+{
+ return 0;
+}
+
+/* ioengine_ops for get_ioengine() */
+FIO_STATIC struct ioengine_ops ioengine = {
+ .name = "dfs",
+ .version = FIO_IOOPS_VERSION,
+ .flags = FIO_DISKLESSIO | FIO_NODISKUTIL,
+
+ .setup = daos_fio_setup,
+ .init = daos_fio_init,
+ .prep = daos_fio_prep,
+ .cleanup = daos_fio_cleanup,
+
+ .open_file = daos_fio_open,
+ .invalidate = daos_fio_invalidate,
+ .get_file_size = daos_fio_get_file_size,
+ .close_file = daos_fio_close,
+ .unlink_file = daos_fio_unlink,
+
+ .queue = daos_fio_queue,
+ .getevents = daos_fio_getevents,
+ .event = daos_fio_event,
+ .io_u_init = daos_fio_io_u_init,
+ .io_u_free = daos_fio_io_u_free,
+
+ .option_struct_size = sizeof(struct daos_fio_options),
+ .options = options,
+};
+
+static void fio_init fio_dfs_register(void)
+{
+ register_ioengine(&ioengine);
+}
+
+static void fio_exit fio_dfs_unregister(void)
+{
+ unregister_ioengine(&ioengine);
+}