engines/dfs: add DAOS File System (dfs) engine
[fio.git] / engines / dfs.c
diff --git a/engines/dfs.c b/engines/dfs.c
new file mode 100644 (file)
index 0000000..0343b10
--- /dev/null
@@ -0,0 +1,583 @@
+/**
+ * FIO engine for DAOS File System (dfs).
+ *
+ * (C) Copyright 2020-2021 Intel Corporation.
+ */
+
+#include <fio.h>
+#include <optgroup.h>
+
+#include <daos.h>
+#include <daos_fs.h>
+
+static bool            daos_initialized;
+static int             num_threads;
+static pthread_mutex_t daos_mutex = PTHREAD_MUTEX_INITIALIZER;
+daos_handle_t          poh;  /* pool handle */
+daos_handle_t          coh;  /* container handle */
+daos_oclass_id_t       cid = OC_UNKNOWN;  /* object class */
+dfs_t                  *dfs; /* dfs mount reference */
+
+struct daos_iou {
+       struct io_u     *io_u;
+       daos_event_t    ev;
+       d_sg_list_t     sgl;
+       d_iov_t         iov;
+       daos_size_t     size;
+       bool            complete;
+};
+
+struct daos_data {
+       daos_handle_t   eqh;
+       dfs_obj_t       *obj;
+       struct io_u     **io_us;
+       int             queued;
+       int             num_ios;
+};
+
+struct daos_fio_options {
+       void            *pad;
+       char            *pool;   /* Pool UUID */
+       char            *cont;   /* Container UUID */
+       daos_size_t     chsz;    /* Chunk size */
+       char            *oclass; /* object class */
+#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
+       char            *svcl;   /* service replica list, deprecated */
+#endif
+};
+
+static struct fio_option options[] = {
+       {
+               .name           = "pool",
+               .lname          = "pool uuid",
+               .type           = FIO_OPT_STR_STORE,
+               .off1           = offsetof(struct daos_fio_options, pool),
+               .help           = "DAOS pool uuid",
+               .category       = FIO_OPT_C_ENGINE,
+               .group          = FIO_OPT_G_DFS,
+       },
+       {
+               .name           = "cont",
+               .lname          = "container uuid",
+               .type           = FIO_OPT_STR_STORE,
+               .off1           = offsetof(struct daos_fio_options, cont),
+               .help           = "DAOS container uuid",
+               .category       = FIO_OPT_C_ENGINE,
+               .group          = FIO_OPT_G_DFS,
+       },
+       {
+               .name           = "chunk_size",
+               .lname          = "DFS chunk size",
+               .type           = FIO_OPT_ULL,
+               .off1           = offsetof(struct daos_fio_options, chsz),
+               .help           = "DFS chunk size in bytes",
+               .def            = "0", /* use container default */
+               .category       = FIO_OPT_C_ENGINE,
+               .group          = FIO_OPT_G_DFS,
+       },
+       {
+               .name           = "object_class",
+               .lname          = "object class",
+               .type           = FIO_OPT_STR_STORE,
+               .off1           = offsetof(struct daos_fio_options, oclass),
+               .help           = "DAOS object class",
+               .category       = FIO_OPT_C_ENGINE,
+               .group          = FIO_OPT_G_DFS,
+       },
+#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
+       {
+               .name           = "svcl",
+               .lname          = "List of service ranks",
+               .type           = FIO_OPT_STR_STORE,
+               .off1           = offsetof(struct daos_fio_options, svcl),
+               .help           = "List of pool replicated service ranks",
+               .category       = FIO_OPT_C_ENGINE,
+               .group          = FIO_OPT_G_DFS,
+       },
+#endif
+       {
+               .name           = NULL,
+       },
+};
+
+static int daos_fio_global_init(struct thread_data *td)
+{
+       struct daos_fio_options *eo = td->eo;
+       uuid_t                  pool_uuid, co_uuid;
+       daos_pool_info_t        pool_info;
+       daos_cont_info_t        co_info;
+       int                     rc = 0;
+
+#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
+       if (!eo->pool || !eo->cont || !eo->svcl) {
+#else
+       if (!eo->pool || !eo->cont) {
+#endif
+               log_err("Missing required DAOS options\n");
+               return EINVAL;
+       }
+
+       rc = daos_init();
+       if (rc != -DER_ALREADY && rc) {
+               log_err("Failed to initialize daos %d\n", rc);
+               td_verror(td, rc, "daos_init");
+               return rc;
+       }
+
+       rc = uuid_parse(eo->pool, pool_uuid);
+       if (rc) {
+               log_err("Failed to parse 'Pool uuid': %s\n", eo->pool);
+               td_verror(td, EINVAL, "uuid_parse(eo->pool)");
+               return EINVAL;
+       }
+
+       rc = uuid_parse(eo->cont, co_uuid);
+       if (rc) {
+               log_err("Failed to parse 'Cont uuid': %s\n", eo->cont);
+               td_verror(td, EINVAL, "uuid_parse(eo->cont)");
+               return EINVAL;
+       }
+
+       /* Connect to the DAOS pool */
+#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
+       d_rank_list_t *svcl = NULL;
+
+       svcl = daos_rank_list_parse(eo->svcl, ":");
+       if (svcl == NULL) {
+               log_err("Failed to parse svcl\n");
+               td_verror(td, EINVAL, "daos_rank_list_parse");
+               return EINVAL;
+       }
+
+       rc = daos_pool_connect(pool_uuid, NULL, svcl, DAOS_PC_RW,
+                       &poh, &pool_info, NULL);
+       d_rank_list_free(svcl);
+#else
+       rc = daos_pool_connect(pool_uuid, NULL, DAOS_PC_RW, &poh, &pool_info,
+                              NULL);
+#endif
+       if (rc) {
+               log_err("Failed to connect to pool %d\n", rc);
+               td_verror(td, rc, "daos_pool_connect");
+               return rc;
+       }
+
+       /* Open the DAOS container */
+       rc = daos_cont_open(poh, co_uuid, DAOS_COO_RW, &coh, &co_info, NULL);
+       if (rc) {
+               log_err("Failed to open container: %d\n", rc);
+               td_verror(td, rc, "daos_cont_open");
+               (void)daos_pool_disconnect(poh, NULL);
+               return rc;
+       }
+
+       /* Mount encapsulated filesystem */
+       rc = dfs_mount(poh, coh, O_RDWR, &dfs);
+       if (rc) {
+               log_err("Failed to mount DFS namespace: %d\n", rc);
+               td_verror(td, rc, "dfs_mount");
+               (void)daos_pool_disconnect(poh, NULL);
+               (void)daos_cont_close(coh, NULL);
+               return rc;
+       }
+
+       /* Retrieve object class to use, if specified */
+       if (eo->oclass)
+               cid = daos_oclass_name2id(eo->oclass);
+
+       return 0;
+}
+
+static int daos_fio_global_cleanup()
+{
+       int rc;
+       int ret = 0;
+
+       rc = dfs_umount(dfs);
+       if (rc) {
+               log_err("failed to umount dfs: %d\n", rc);
+               ret = rc;
+       }
+       rc = daos_cont_close(coh, NULL);
+       if (rc) {
+               log_err("failed to close container: %d\n", rc);
+               if (ret == 0)
+                       ret = rc;
+       }
+       rc = daos_pool_disconnect(poh, NULL);
+       if (rc) {
+               log_err("failed to disconnect pool: %d\n", rc);
+               if (ret == 0)
+                       ret = rc;
+       }
+       rc = daos_fini();
+       if (rc) {
+               log_err("failed to finalize daos: %d\n", rc);
+               if (ret == 0)
+                       ret = rc;
+       }
+
+       return ret;
+}
+
+static int daos_fio_setup(struct thread_data *td)
+{
+       return 0;
+}
+
+static int daos_fio_init(struct thread_data *td)
+{
+       struct daos_data        *dd;
+       int                     rc = 0;
+
+       pthread_mutex_lock(&daos_mutex);
+
+       dd = malloc(sizeof(*dd));
+       if (dd == NULL) {
+               log_err("Failed to allocate DAOS-private data\n");
+               rc = ENOMEM;
+               goto out;
+       }
+
+       dd->queued      = 0;
+       dd->num_ios     = td->o.iodepth;
+       dd->io_us       = calloc(dd->num_ios, sizeof(struct io_u *));
+       if (dd->io_us == NULL) {
+               log_err("Failed to allocate IO queue\n");
+               rc = ENOMEM;
+               goto out;
+       }
+
+       /* initialize DAOS stack if not already up */
+       if (!daos_initialized) {
+               rc = daos_fio_global_init(td);
+               if (rc)
+                       goto out;
+               daos_initialized = true;
+       }
+
+       rc = daos_eq_create(&dd->eqh);
+       if (rc) {
+               log_err("Failed to create event queue: %d\n", rc);
+               td_verror(td, rc, "daos_eq_create");
+               goto out;
+       }
+
+       td->io_ops_data = dd;
+       num_threads++;
+out:
+       if (rc) {
+               if (dd) {
+                       free(dd->io_us);
+                       free(dd);
+               }
+               if (num_threads == 0 && daos_initialized) {
+                       /* don't clobber error return value */
+                       (void)daos_fio_global_cleanup();
+                       daos_initialized = false;
+               }
+       }
+       pthread_mutex_unlock(&daos_mutex);
+       return rc;
+}
+
+static void daos_fio_cleanup(struct thread_data *td)
+{
+       struct daos_data        *dd = td->io_ops_data;
+       int                     rc;
+
+       if (dd == NULL)
+               return;
+
+       rc = daos_eq_destroy(dd->eqh, DAOS_EQ_DESTROY_FORCE);
+       if (rc < 0) {
+               log_err("failed to destroy event queue: %d\n", rc);
+               td_verror(td, rc, "daos_eq_destroy");
+       }
+
+       free(dd->io_us);
+       free(dd);
+
+       pthread_mutex_lock(&daos_mutex);
+       num_threads--;
+       if (daos_initialized && num_threads == 0) {
+               int ret;
+
+               ret = daos_fio_global_cleanup();
+               if (ret < 0 && rc == 0) {
+                       log_err("failed to clean up: %d\n", ret);
+                       td_verror(td, ret, "daos_fio_global_cleanup");
+               }
+               daos_initialized = false;
+       }
+       pthread_mutex_unlock(&daos_mutex);
+}
+
+static int daos_fio_get_file_size(struct thread_data *td, struct fio_file *f)
+{
+       char            *file_name = f->file_name;
+       struct stat     stbuf = {0};
+       int             rc;
+
+       dprint(FD_FILE, "dfs stat %s\n", f->file_name);
+
+       if (!daos_initialized)
+               return 0;
+
+       rc = dfs_stat(dfs, NULL, file_name, &stbuf);
+       if (rc) {
+               log_err("Failed to stat %s: %d\n", f->file_name, rc);
+               td_verror(td, rc, "dfs_stat");
+               return rc;
+       }
+
+       f->real_file_size = stbuf.st_size;
+       return 0;
+}
+
+static int daos_fio_close(struct thread_data *td, struct fio_file *f)
+{
+       struct daos_data        *dd = td->io_ops_data;
+       int                     rc;
+
+       dprint(FD_FILE, "dfs release %s\n", f->file_name);
+
+       rc = dfs_release(dd->obj);
+       if (rc) {
+               log_err("Failed to release %s: %d\n", f->file_name, rc);
+               td_verror(td, rc, "dfs_release");
+               return rc;
+       }
+
+       return 0;
+}
+
+static int daos_fio_open(struct thread_data *td, struct fio_file *f)
+{
+       struct daos_data        *dd = td->io_ops_data;
+       struct daos_fio_options *eo = td->eo;
+       int                     flags = 0;
+       int                     rc;
+
+       dprint(FD_FILE, "dfs open %s (%s/%d/%d)\n",
+              f->file_name, td_write(td) & !read_only ? "rw" : "r",
+              td->o.create_on_open, td->o.allow_create);
+
+       if (td->o.create_on_open && td->o.allow_create)
+               flags |= O_CREAT;
+
+       if (td_write(td)) {
+               if (!read_only)
+                       flags |= O_RDWR;
+               if (td->o.allow_create)
+                       flags |= O_CREAT;
+       } else if (td_read(td)) {
+               flags |= O_RDONLY;
+       }
+
+       rc = dfs_open(dfs, NULL, f->file_name,
+                     S_IFREG | S_IRUSR | S_IWUSR,
+                     flags, cid, eo->chsz, NULL, &dd->obj);
+       if (rc) {
+               log_err("Failed to open %s: %d\n", f->file_name, rc);
+               td_verror(td, rc, "dfs_open");
+               return rc;
+       }
+
+       return 0;
+}
+
+static int daos_fio_unlink(struct thread_data *td, struct fio_file *f)
+{
+       int rc;
+
+       dprint(FD_FILE, "dfs remove %s\n", f->file_name);
+
+       rc = dfs_remove(dfs, NULL, f->file_name, false, NULL);
+       if (rc) {
+               log_err("Failed to remove %s: %d\n", f->file_name, rc);
+               td_verror(td, rc, "dfs_remove");
+               return rc;
+       }
+
+       return 0;
+}
+
+static int daos_fio_invalidate(struct thread_data *td, struct fio_file *f)
+{
+       dprint(FD_FILE, "dfs invalidate %s\n", f->file_name);
+       return 0;
+}
+
+static void daos_fio_io_u_free(struct thread_data *td, struct io_u *io_u)
+{
+       struct daos_iou *io = io_u->engine_data;
+
+       if (io) {
+               io_u->engine_data = NULL;
+               free(io);
+       }
+}
+
+static int daos_fio_io_u_init(struct thread_data *td, struct io_u *io_u)
+{
+       struct daos_iou *io;
+
+       io = malloc(sizeof(struct daos_iou));
+       if (!io) {
+               td_verror(td, ENOMEM, "malloc");
+               return ENOMEM;
+       }
+       io->io_u = io_u;
+       io_u->engine_data = io;
+       return 0;
+}
+
+static struct io_u * daos_fio_event(struct thread_data *td, int event)
+{
+       struct daos_data *dd = td->io_ops_data;
+
+       return dd->io_us[event];
+}
+
+static int daos_fio_getevents(struct thread_data *td, unsigned int min,
+                             unsigned int max, const struct timespec *t)
+{
+       struct daos_data        *dd = td->io_ops_data;
+       daos_event_t            *evp[max];
+       unsigned int            events = 0;
+       int                     i;
+       int                     rc;
+
+       while (events < min) {
+               rc = daos_eq_poll(dd->eqh, 0, DAOS_EQ_NOWAIT, max, evp);
+               if (rc < 0) {
+                       log_err("Event poll failed: %d\n", rc);
+                       td_verror(td, rc, "daos_eq_poll");
+                       return events;
+               }
+
+               for (i = 0; i < rc; i++) {
+                       struct daos_iou *io;
+                       struct io_u     *io_u;
+
+                       io = container_of(evp[i], struct daos_iou, ev);
+                       if (io->complete)
+                               log_err("Completion on already completed I/O\n");
+
+                       io_u = io->io_u;
+                       if (io->ev.ev_error)
+                               io_u->error = io->ev.ev_error;
+                       else
+                               io_u->resid = 0;
+
+                       dd->io_us[events] = io_u;
+                       dd->queued--;
+                       daos_event_fini(&io->ev);
+                       io->complete = true;
+                       events++;
+               }
+       }
+
+       dprint(FD_IO, "dfs eq_pool returning %d (%u/%u)\n", events, min, max);
+
+       return events;
+}
+
+static enum fio_q_status daos_fio_queue(struct thread_data *td,
+                                       struct io_u *io_u)
+{
+       struct daos_data        *dd = td->io_ops_data;
+       struct daos_iou         *io = io_u->engine_data;
+       daos_off_t              offset = io_u->offset;
+       int                     rc;
+
+       if (dd->queued == td->o.iodepth)
+               return FIO_Q_BUSY;
+
+       io->sgl.sg_nr = 1;
+       io->sgl.sg_nr_out = 0;
+       d_iov_set(&io->iov, io_u->xfer_buf, io_u->xfer_buflen);
+       io->sgl.sg_iovs = &io->iov;
+       io->size = io_u->xfer_buflen;
+
+       io->complete = false;
+       rc = daos_event_init(&io->ev, dd->eqh, NULL);
+       if (rc) {
+               log_err("Event init failed: %d\n", rc);
+               io_u->error = rc;
+               return FIO_Q_COMPLETED;
+       }
+
+       switch (io_u->ddir) {
+       case DDIR_WRITE:
+               rc = dfs_write(dfs, dd->obj, &io->sgl, offset, &io->ev);
+               if (rc) {
+                       log_err("dfs_write failed: %d\n", rc);
+                       io_u->error = rc;
+                       return FIO_Q_COMPLETED;
+               }
+               break;
+       case DDIR_READ:
+               rc = dfs_read(dfs, dd->obj, &io->sgl, offset, &io->size,
+                             &io->ev);
+               if (rc) {
+                       log_err("dfs_read failed: %d\n", rc);
+                       io_u->error = rc;
+                       return FIO_Q_COMPLETED;
+               }
+               break;
+       case DDIR_SYNC:
+               io_u->error = 0;
+               return FIO_Q_COMPLETED;
+       default:
+               dprint(FD_IO, "Invalid IO type: %d\n", io_u->ddir);
+               io_u->error = -DER_INVAL;
+               return FIO_Q_COMPLETED;
+       }
+
+       dd->queued++;
+       return FIO_Q_QUEUED;
+}
+
+static int daos_fio_prep(struct thread_data fio_unused *td, struct io_u *io_u)
+{
+       return 0;
+}
+
+/* ioengine_ops for get_ioengine() */
+FIO_STATIC struct ioengine_ops ioengine = {
+       .name                   = "dfs",
+       .version                = FIO_IOOPS_VERSION,
+       .flags                  = FIO_DISKLESSIO | FIO_NODISKUTIL,
+
+       .setup                  = daos_fio_setup,
+       .init                   = daos_fio_init,
+       .prep                   = daos_fio_prep,
+       .cleanup                = daos_fio_cleanup,
+
+       .open_file              = daos_fio_open,
+       .invalidate             = daos_fio_invalidate,
+       .get_file_size          = daos_fio_get_file_size,
+       .close_file             = daos_fio_close,
+       .unlink_file            = daos_fio_unlink,
+
+       .queue                  = daos_fio_queue,
+       .getevents              = daos_fio_getevents,
+       .event                  = daos_fio_event,
+       .io_u_init              = daos_fio_io_u_init,
+       .io_u_free              = daos_fio_io_u_free,
+
+       .option_struct_size     = sizeof(struct daos_fio_options),
+       .options                = options,
+};
+
+static void fio_init fio_dfs_register(void)
+{
+       register_ioengine(&ioengine);
+}
+
+static void fio_exit fio_dfs_unregister(void)
+{
+       unregister_ioengine(&ioengine);
+}