X-Git-Url: https://git.kernel.dk/?a=blobdiff_plain;f=engines%2Fdfs.c;fp=engines%2Fdfs.c;h=0343b101e92c2899db26c7dddecef696ae70e7b0;hb=c363fdd7fb54713bc04582df300722e97ac31348;hp=0000000000000000000000000000000000000000;hpb=ea8c20c3416d03233879f0d96d745606bd66125b;p=fio.git diff --git a/engines/dfs.c b/engines/dfs.c new file mode 100644 index 00000000..0343b101 --- /dev/null +++ b/engines/dfs.c @@ -0,0 +1,583 @@ +/** + * FIO engine for DAOS File System (dfs). + * + * (C) Copyright 2020-2021 Intel Corporation. + */ + +#include +#include + +#include +#include + +static bool daos_initialized; +static int num_threads; +static pthread_mutex_t daos_mutex = PTHREAD_MUTEX_INITIALIZER; +daos_handle_t poh; /* pool handle */ +daos_handle_t coh; /* container handle */ +daos_oclass_id_t cid = OC_UNKNOWN; /* object class */ +dfs_t *dfs; /* dfs mount reference */ + +struct daos_iou { + struct io_u *io_u; + daos_event_t ev; + d_sg_list_t sgl; + d_iov_t iov; + daos_size_t size; + bool complete; +}; + +struct daos_data { + daos_handle_t eqh; + dfs_obj_t *obj; + struct io_u **io_us; + int queued; + int num_ios; +}; + +struct daos_fio_options { + void *pad; + char *pool; /* Pool UUID */ + char *cont; /* Container UUID */ + daos_size_t chsz; /* Chunk size */ + char *oclass; /* object class */ +#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1 + char *svcl; /* service replica list, deprecated */ +#endif +}; + +static struct fio_option options[] = { + { + .name = "pool", + .lname = "pool uuid", + .type = FIO_OPT_STR_STORE, + .off1 = offsetof(struct daos_fio_options, pool), + .help = "DAOS pool uuid", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_DFS, + }, + { + .name = "cont", + .lname = "container uuid", + .type = FIO_OPT_STR_STORE, + .off1 = offsetof(struct daos_fio_options, cont), + .help = "DAOS container uuid", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_DFS, + }, + { + .name = "chunk_size", + .lname = "DFS chunk size", + .type = FIO_OPT_ULL, + .off1 = offsetof(struct daos_fio_options, chsz), + .help = "DFS chunk size in bytes", + .def = "0", /* use container default */ + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_DFS, + }, + { + .name = "object_class", + .lname = "object class", + .type = FIO_OPT_STR_STORE, + .off1 = offsetof(struct daos_fio_options, oclass), + .help = "DAOS object class", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_DFS, + }, +#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1 + { + .name = "svcl", + .lname = "List of service ranks", + .type = FIO_OPT_STR_STORE, + .off1 = offsetof(struct daos_fio_options, svcl), + .help = "List of pool replicated service ranks", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_DFS, + }, +#endif + { + .name = NULL, + }, +}; + +static int daos_fio_global_init(struct thread_data *td) +{ + struct daos_fio_options *eo = td->eo; + uuid_t pool_uuid, co_uuid; + daos_pool_info_t pool_info; + daos_cont_info_t co_info; + int rc = 0; + +#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1 + if (!eo->pool || !eo->cont || !eo->svcl) { +#else + if (!eo->pool || !eo->cont) { +#endif + log_err("Missing required DAOS options\n"); + return EINVAL; + } + + rc = daos_init(); + if (rc != -DER_ALREADY && rc) { + log_err("Failed to initialize daos %d\n", rc); + td_verror(td, rc, "daos_init"); + return rc; + } + + rc = uuid_parse(eo->pool, pool_uuid); + if (rc) { + log_err("Failed to parse 'Pool uuid': %s\n", eo->pool); + td_verror(td, EINVAL, "uuid_parse(eo->pool)"); + return EINVAL; + } + + rc = uuid_parse(eo->cont, co_uuid); + if (rc) { + log_err("Failed to parse 'Cont uuid': %s\n", eo->cont); + td_verror(td, EINVAL, "uuid_parse(eo->cont)"); + return EINVAL; + } + + /* Connect to the DAOS pool */ +#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1 + d_rank_list_t *svcl = NULL; + + svcl = daos_rank_list_parse(eo->svcl, ":"); + if (svcl == NULL) { + log_err("Failed to parse svcl\n"); + td_verror(td, EINVAL, "daos_rank_list_parse"); + return EINVAL; + } + + rc = daos_pool_connect(pool_uuid, NULL, svcl, DAOS_PC_RW, + &poh, &pool_info, NULL); + d_rank_list_free(svcl); +#else + rc = daos_pool_connect(pool_uuid, NULL, DAOS_PC_RW, &poh, &pool_info, + NULL); +#endif + if (rc) { + log_err("Failed to connect to pool %d\n", rc); + td_verror(td, rc, "daos_pool_connect"); + return rc; + } + + /* Open the DAOS container */ + rc = daos_cont_open(poh, co_uuid, DAOS_COO_RW, &coh, &co_info, NULL); + if (rc) { + log_err("Failed to open container: %d\n", rc); + td_verror(td, rc, "daos_cont_open"); + (void)daos_pool_disconnect(poh, NULL); + return rc; + } + + /* Mount encapsulated filesystem */ + rc = dfs_mount(poh, coh, O_RDWR, &dfs); + if (rc) { + log_err("Failed to mount DFS namespace: %d\n", rc); + td_verror(td, rc, "dfs_mount"); + (void)daos_pool_disconnect(poh, NULL); + (void)daos_cont_close(coh, NULL); + return rc; + } + + /* Retrieve object class to use, if specified */ + if (eo->oclass) + cid = daos_oclass_name2id(eo->oclass); + + return 0; +} + +static int daos_fio_global_cleanup() +{ + int rc; + int ret = 0; + + rc = dfs_umount(dfs); + if (rc) { + log_err("failed to umount dfs: %d\n", rc); + ret = rc; + } + rc = daos_cont_close(coh, NULL); + if (rc) { + log_err("failed to close container: %d\n", rc); + if (ret == 0) + ret = rc; + } + rc = daos_pool_disconnect(poh, NULL); + if (rc) { + log_err("failed to disconnect pool: %d\n", rc); + if (ret == 0) + ret = rc; + } + rc = daos_fini(); + if (rc) { + log_err("failed to finalize daos: %d\n", rc); + if (ret == 0) + ret = rc; + } + + return ret; +} + +static int daos_fio_setup(struct thread_data *td) +{ + return 0; +} + +static int daos_fio_init(struct thread_data *td) +{ + struct daos_data *dd; + int rc = 0; + + pthread_mutex_lock(&daos_mutex); + + dd = malloc(sizeof(*dd)); + if (dd == NULL) { + log_err("Failed to allocate DAOS-private data\n"); + rc = ENOMEM; + goto out; + } + + dd->queued = 0; + dd->num_ios = td->o.iodepth; + dd->io_us = calloc(dd->num_ios, sizeof(struct io_u *)); + if (dd->io_us == NULL) { + log_err("Failed to allocate IO queue\n"); + rc = ENOMEM; + goto out; + } + + /* initialize DAOS stack if not already up */ + if (!daos_initialized) { + rc = daos_fio_global_init(td); + if (rc) + goto out; + daos_initialized = true; + } + + rc = daos_eq_create(&dd->eqh); + if (rc) { + log_err("Failed to create event queue: %d\n", rc); + td_verror(td, rc, "daos_eq_create"); + goto out; + } + + td->io_ops_data = dd; + num_threads++; +out: + if (rc) { + if (dd) { + free(dd->io_us); + free(dd); + } + if (num_threads == 0 && daos_initialized) { + /* don't clobber error return value */ + (void)daos_fio_global_cleanup(); + daos_initialized = false; + } + } + pthread_mutex_unlock(&daos_mutex); + return rc; +} + +static void daos_fio_cleanup(struct thread_data *td) +{ + struct daos_data *dd = td->io_ops_data; + int rc; + + if (dd == NULL) + return; + + rc = daos_eq_destroy(dd->eqh, DAOS_EQ_DESTROY_FORCE); + if (rc < 0) { + log_err("failed to destroy event queue: %d\n", rc); + td_verror(td, rc, "daos_eq_destroy"); + } + + free(dd->io_us); + free(dd); + + pthread_mutex_lock(&daos_mutex); + num_threads--; + if (daos_initialized && num_threads == 0) { + int ret; + + ret = daos_fio_global_cleanup(); + if (ret < 0 && rc == 0) { + log_err("failed to clean up: %d\n", ret); + td_verror(td, ret, "daos_fio_global_cleanup"); + } + daos_initialized = false; + } + pthread_mutex_unlock(&daos_mutex); +} + +static int daos_fio_get_file_size(struct thread_data *td, struct fio_file *f) +{ + char *file_name = f->file_name; + struct stat stbuf = {0}; + int rc; + + dprint(FD_FILE, "dfs stat %s\n", f->file_name); + + if (!daos_initialized) + return 0; + + rc = dfs_stat(dfs, NULL, file_name, &stbuf); + if (rc) { + log_err("Failed to stat %s: %d\n", f->file_name, rc); + td_verror(td, rc, "dfs_stat"); + return rc; + } + + f->real_file_size = stbuf.st_size; + return 0; +} + +static int daos_fio_close(struct thread_data *td, struct fio_file *f) +{ + struct daos_data *dd = td->io_ops_data; + int rc; + + dprint(FD_FILE, "dfs release %s\n", f->file_name); + + rc = dfs_release(dd->obj); + if (rc) { + log_err("Failed to release %s: %d\n", f->file_name, rc); + td_verror(td, rc, "dfs_release"); + return rc; + } + + return 0; +} + +static int daos_fio_open(struct thread_data *td, struct fio_file *f) +{ + struct daos_data *dd = td->io_ops_data; + struct daos_fio_options *eo = td->eo; + int flags = 0; + int rc; + + dprint(FD_FILE, "dfs open %s (%s/%d/%d)\n", + f->file_name, td_write(td) & !read_only ? "rw" : "r", + td->o.create_on_open, td->o.allow_create); + + if (td->o.create_on_open && td->o.allow_create) + flags |= O_CREAT; + + if (td_write(td)) { + if (!read_only) + flags |= O_RDWR; + if (td->o.allow_create) + flags |= O_CREAT; + } else if (td_read(td)) { + flags |= O_RDONLY; + } + + rc = dfs_open(dfs, NULL, f->file_name, + S_IFREG | S_IRUSR | S_IWUSR, + flags, cid, eo->chsz, NULL, &dd->obj); + if (rc) { + log_err("Failed to open %s: %d\n", f->file_name, rc); + td_verror(td, rc, "dfs_open"); + return rc; + } + + return 0; +} + +static int daos_fio_unlink(struct thread_data *td, struct fio_file *f) +{ + int rc; + + dprint(FD_FILE, "dfs remove %s\n", f->file_name); + + rc = dfs_remove(dfs, NULL, f->file_name, false, NULL); + if (rc) { + log_err("Failed to remove %s: %d\n", f->file_name, rc); + td_verror(td, rc, "dfs_remove"); + return rc; + } + + return 0; +} + +static int daos_fio_invalidate(struct thread_data *td, struct fio_file *f) +{ + dprint(FD_FILE, "dfs invalidate %s\n", f->file_name); + return 0; +} + +static void daos_fio_io_u_free(struct thread_data *td, struct io_u *io_u) +{ + struct daos_iou *io = io_u->engine_data; + + if (io) { + io_u->engine_data = NULL; + free(io); + } +} + +static int daos_fio_io_u_init(struct thread_data *td, struct io_u *io_u) +{ + struct daos_iou *io; + + io = malloc(sizeof(struct daos_iou)); + if (!io) { + td_verror(td, ENOMEM, "malloc"); + return ENOMEM; + } + io->io_u = io_u; + io_u->engine_data = io; + return 0; +} + +static struct io_u * daos_fio_event(struct thread_data *td, int event) +{ + struct daos_data *dd = td->io_ops_data; + + return dd->io_us[event]; +} + +static int daos_fio_getevents(struct thread_data *td, unsigned int min, + unsigned int max, const struct timespec *t) +{ + struct daos_data *dd = td->io_ops_data; + daos_event_t *evp[max]; + unsigned int events = 0; + int i; + int rc; + + while (events < min) { + rc = daos_eq_poll(dd->eqh, 0, DAOS_EQ_NOWAIT, max, evp); + if (rc < 0) { + log_err("Event poll failed: %d\n", rc); + td_verror(td, rc, "daos_eq_poll"); + return events; + } + + for (i = 0; i < rc; i++) { + struct daos_iou *io; + struct io_u *io_u; + + io = container_of(evp[i], struct daos_iou, ev); + if (io->complete) + log_err("Completion on already completed I/O\n"); + + io_u = io->io_u; + if (io->ev.ev_error) + io_u->error = io->ev.ev_error; + else + io_u->resid = 0; + + dd->io_us[events] = io_u; + dd->queued--; + daos_event_fini(&io->ev); + io->complete = true; + events++; + } + } + + dprint(FD_IO, "dfs eq_pool returning %d (%u/%u)\n", events, min, max); + + return events; +} + +static enum fio_q_status daos_fio_queue(struct thread_data *td, + struct io_u *io_u) +{ + struct daos_data *dd = td->io_ops_data; + struct daos_iou *io = io_u->engine_data; + daos_off_t offset = io_u->offset; + int rc; + + if (dd->queued == td->o.iodepth) + return FIO_Q_BUSY; + + io->sgl.sg_nr = 1; + io->sgl.sg_nr_out = 0; + d_iov_set(&io->iov, io_u->xfer_buf, io_u->xfer_buflen); + io->sgl.sg_iovs = &io->iov; + io->size = io_u->xfer_buflen; + + io->complete = false; + rc = daos_event_init(&io->ev, dd->eqh, NULL); + if (rc) { + log_err("Event init failed: %d\n", rc); + io_u->error = rc; + return FIO_Q_COMPLETED; + } + + switch (io_u->ddir) { + case DDIR_WRITE: + rc = dfs_write(dfs, dd->obj, &io->sgl, offset, &io->ev); + if (rc) { + log_err("dfs_write failed: %d\n", rc); + io_u->error = rc; + return FIO_Q_COMPLETED; + } + break; + case DDIR_READ: + rc = dfs_read(dfs, dd->obj, &io->sgl, offset, &io->size, + &io->ev); + if (rc) { + log_err("dfs_read failed: %d\n", rc); + io_u->error = rc; + return FIO_Q_COMPLETED; + } + break; + case DDIR_SYNC: + io_u->error = 0; + return FIO_Q_COMPLETED; + default: + dprint(FD_IO, "Invalid IO type: %d\n", io_u->ddir); + io_u->error = -DER_INVAL; + return FIO_Q_COMPLETED; + } + + dd->queued++; + return FIO_Q_QUEUED; +} + +static int daos_fio_prep(struct thread_data fio_unused *td, struct io_u *io_u) +{ + return 0; +} + +/* ioengine_ops for get_ioengine() */ +FIO_STATIC struct ioengine_ops ioengine = { + .name = "dfs", + .version = FIO_IOOPS_VERSION, + .flags = FIO_DISKLESSIO | FIO_NODISKUTIL, + + .setup = daos_fio_setup, + .init = daos_fio_init, + .prep = daos_fio_prep, + .cleanup = daos_fio_cleanup, + + .open_file = daos_fio_open, + .invalidate = daos_fio_invalidate, + .get_file_size = daos_fio_get_file_size, + .close_file = daos_fio_close, + .unlink_file = daos_fio_unlink, + + .queue = daos_fio_queue, + .getevents = daos_fio_getevents, + .event = daos_fio_event, + .io_u_init = daos_fio_io_u_init, + .io_u_free = daos_fio_io_u_free, + + .option_struct_size = sizeof(struct daos_fio_options), + .options = options, +}; + +static void fio_init fio_dfs_register(void) +{ + register_ioengine(&ioengine); +} + +static void fio_exit fio_dfs_unregister(void) +{ + unregister_ioengine(&ioengine); +}