engines/io_uring: add verbose error for ENOSYS
[fio.git] / engines / rados.c
index f3795c5742ecfd3bd36ebe5f321ce1450d3d4cff..42ee48ff02b3f6371027ab4cbcbc304aefbbea10 100644 (file)
 #include "fio.h"
 #include "../optgroup.h"
 
-struct fio_rados_iou {
-       struct thread_data *td;
-       struct io_u *io_u;
-       rados_completion_t completion;
-       rados_write_op_t write_op;
-};
-
 struct rados_data {
        rados_t cluster;
        rados_ioctx_t io_ctx;
-       char **objects;
-       size_t object_count;
        struct io_u **aio_events;
        bool connected;
+       pthread_mutex_t completed_lock;
+       pthread_cond_t completed_more_io;
+       struct flist_head completed_operations;
+       uint64_t ops_scheduled;
+       uint64_t ops_completed;
+};
+
+struct fio_rados_iou {
+       struct flist_head list;
+       struct thread_data *td;
+       struct io_u *io_u;
+       rados_completion_t completion;
+       rados_write_op_t write_op;
 };
 
 /* fio configuration options read from the job file */
@@ -96,18 +100,16 @@ static int _fio_setup_rados_data(struct thread_data *td,
        rados->aio_events = calloc(td->o.iodepth, sizeof(struct io_u *));
        if (!rados->aio_events)
                goto failed;
-
-       rados->object_count = td->o.nr_files;
-       rados->objects = calloc(rados->object_count, sizeof(char*));
-       if (!rados->objects)
-               goto failed;
-
+       pthread_mutex_init(&rados->completed_lock, NULL);
+       pthread_cond_init(&rados->completed_more_io, NULL);
+       INIT_FLIST_HEAD(&rados->completed_operations);
+       rados->ops_scheduled = 0;
+       rados->ops_completed = 0;
        *rados_data_ptr = rados;
        return 0;
 
 failed:
        if (rados) {
-               rados->object_count = 0;
                if (rados->aio_events)
                        free(rados->aio_events);
                free(rados);
@@ -115,15 +117,12 @@ failed:
        return 1;
 }
 
-static void _fio_rados_rm_objects(struct rados_data *rados)
+static void _fio_rados_rm_objects(struct thread_data *td, struct rados_data *rados)
 {
        size_t i;
-       for (i = 0; i < rados->object_count; ++i) {
-               if (rados->objects[i]) {
-                       rados_remove(rados->io_ctx, rados->objects[i]);
-                       free(rados->objects[i]);
-                       rados->objects[i] = NULL;
-               }
+       for (i = 0; i < td->o.nr_files; i++) {
+               struct fio_file *f = td->files[i];
+               rados_remove(rados->io_ctx, f->file_name);
        }
 }
 
@@ -136,7 +135,6 @@ static int _fio_rados_connect(struct thread_data *td)
                td->o.size / (td->o.nr_files ? td->o.nr_files : 1u);
        struct fio_file *f;
        uint32_t i;
-       size_t oname_len = 0;
 
        if (o->cluster_name) {
                char *client_name = NULL;
@@ -165,6 +163,11 @@ static int _fio_rados_connect(struct thread_data *td)
        } else
                r = rados_create(&rados->cluster, o->client_name);
 
+       if (o->pool_name == NULL) {
+               log_err("rados pool name must be provided.\n");
+               goto failed_early;
+       }
+
        if (r < 0) {
                log_err("rados_create failed.\n");
                goto failed_early;
@@ -188,30 +191,18 @@ static int _fio_rados_connect(struct thread_data *td)
                goto failed_shutdown;
        }
 
-       for (i = 0; i < rados->object_count; i++) {
+       for (i = 0; i < td->o.nr_files; i++) {
                f = td->files[i];
                f->real_file_size = file_size;
-               f->engine_pos = i;
-
-               oname_len = strlen(f->file_name) + 32;
-               rados->objects[i] = malloc(oname_len);
-               /* vary objects for different jobs */
-               snprintf(rados->objects[i], oname_len - 1,
-                       "fio_rados_bench.%s.%x",
-                       f->file_name, td->thread_number);
-               r = rados_write(rados->io_ctx, rados->objects[i], "", 0, 0);
+               r = rados_write(rados->io_ctx, f->file_name, "", 0, 0);
                if (r < 0) {
-                       free(rados->objects[i]);
-                       rados->objects[i] = NULL;
-                       log_err("error creating object.\n");
                        goto failed_obj_create;
                }
        }
-
-  return 0;
+       return 0;
 
 failed_obj_create:
-       _fio_rados_rm_objects(rados);
+       _fio_rados_rm_objects(td, rados);
        rados_ioctx_destroy(rados->io_ctx);
        rados->io_ctx = NULL;
 failed_shutdown:
@@ -226,8 +217,6 @@ static void _fio_rados_disconnect(struct rados_data *rados)
        if (!rados)
                return;
 
-       _fio_rados_rm_objects(rados);
-
        if (rados->io_ctx) {
                rados_ioctx_destroy(rados->io_ctx);
                rados->io_ctx = NULL;
@@ -242,27 +231,43 @@ static void _fio_rados_disconnect(struct rados_data *rados)
 static void fio_rados_cleanup(struct thread_data *td)
 {
        struct rados_data *rados = td->io_ops_data;
-
        if (rados) {
+               pthread_mutex_lock(&rados->completed_lock);
+               while (rados->ops_scheduled != rados->ops_completed)
+                       pthread_cond_wait(&rados->completed_more_io, &rados->completed_lock);
+               pthread_mutex_unlock(&rados->completed_lock);
+               _fio_rados_rm_objects(td, rados);
                _fio_rados_disconnect(rados);
-               free(rados->objects);
                free(rados->aio_events);
                free(rados);
        }
 }
 
-static enum fio_q_status
-fio_rados_queue(struct thread_data *td, struct io_u *io_u)
+static void complete_callback(rados_completion_t cb, void *arg)
+{
+       struct fio_rados_iou *fri = (struct fio_rados_iou *)arg;
+       struct rados_data *rados = fri->td->io_ops_data;
+       assert(fri->completion);
+       assert(rados_aio_is_complete(fri->completion));
+       pthread_mutex_lock(&rados->completed_lock);
+       flist_add_tail(&fri->list, &rados->completed_operations);
+       rados->ops_completed++;
+       pthread_mutex_unlock(&rados->completed_lock);
+       pthread_cond_signal(&rados->completed_more_io);
+}
+
+static enum fio_q_status fio_rados_queue(struct thread_data *td,
+                                        struct io_u *io_u)
 {
        struct rados_data *rados = td->io_ops_data;
        struct fio_rados_iou *fri = io_u->engine_data;
-       char *object = rados->objects[io_u->file->engine_pos];
+       char *object = io_u->file->file_name;
        int r = -1;
 
        fio_ro_check(td, io_u);
 
        if (io_u->ddir == DDIR_WRITE) {
-                r = rados_aio_create_completion(fri, NULL,
+                r = rados_aio_create_completion(fri, complete_callback,
                        NULL, &fri->completion);
                if (r < 0) {
                        log_err("rados_aio_create_completion failed.\n");
@@ -275,9 +280,10 @@ fio_rados_queue(struct thread_data *td, struct io_u *io_u)
                        log_err("rados_write failed.\n");
                        goto failed_comp;
                }
+               rados->ops_scheduled++;
                return FIO_Q_QUEUED;
        } else if (io_u->ddir == DDIR_READ) {
-               r = rados_aio_create_completion(fri, NULL,
+               r = rados_aio_create_completion(fri, complete_callback,
                        NULL, &fri->completion);
                if (r < 0) {
                        log_err("rados_aio_create_completion failed.\n");
@@ -289,9 +295,10 @@ fio_rados_queue(struct thread_data *td, struct io_u *io_u)
                        log_err("rados_aio_read failed.\n");
                        goto failed_comp;
                }
+               rados->ops_scheduled++;
                return FIO_Q_QUEUED;
        } else if (io_u->ddir == DDIR_TRIM) {
-               r = rados_aio_create_completion(fri, NULL,
+               r = rados_aio_create_completion(fri, complete_callback,
                        NULL , &fri->completion);
                if (r < 0) {
                        log_err("rados_aio_create_completion failed.\n");
@@ -310,6 +317,7 @@ fio_rados_queue(struct thread_data *td, struct io_u *io_u)
                        log_err("rados_aio_write_op_operate failed.\n");
                        goto failed_write_op;
                }
+               rados->ops_scheduled++;
                return FIO_Q_QUEUED;
         }
 
@@ -335,50 +343,33 @@ int fio_rados_getevents(struct thread_data *td, unsigned int min,
        unsigned int max, const struct timespec *t)
 {
        struct rados_data *rados = td->io_ops_data;
-       struct rados_options *o = td->eo;
-       int busy_poll = o->busy_poll;
        unsigned int events = 0;
-       struct io_u *u;
        struct fio_rados_iou *fri;
-       unsigned int i;
-       rados_completion_t first_unfinished;
-       int observed_new = 0;
-
-       /* loop through inflight ios until we find 'min' completions */
-       do {
-               first_unfinished = NULL;
-               io_u_qiter(&td->io_u_all, u, i) {
-                       if (!(u->flags & IO_U_F_FLIGHT))
-                               continue;
-
-                       fri = u->engine_data;
-                       if (fri->completion) {
-                               if (rados_aio_is_complete(fri->completion)) {
-                                       if (fri->write_op != NULL) {
-                                               rados_release_write_op(fri->write_op);
-                                               fri->write_op = NULL;
-                                       }
-                                       rados_aio_release(fri->completion);
-                                       fri->completion = NULL;
-                                       rados->aio_events[events] = u;
-                                       events++;
-                                       observed_new = 1;
-                               } else if (first_unfinished == NULL) {
-                                       first_unfinished = fri->completion;
-                               }
-                       }
-                       if (events >= max)
-                               break;
+
+       pthread_mutex_lock(&rados->completed_lock);
+       while (events < min) {
+               while (flist_empty(&rados->completed_operations)) {
+                       pthread_cond_wait(&rados->completed_more_io, &rados->completed_lock);
                }
-               if (events >= min)
-                       return events;
-               if (first_unfinished == NULL || busy_poll)
-                       continue;
-
-               if (!observed_new)
-                       rados_aio_wait_for_complete(first_unfinished);
-       } while (1);
-  return events;
+               assert(!flist_empty(&rados->completed_operations));
+               
+               fri = flist_first_entry(&rados->completed_operations, struct fio_rados_iou, list);
+               assert(fri->completion);
+               assert(rados_aio_is_complete(fri->completion));
+               if (fri->write_op != NULL) {
+                       rados_release_write_op(fri->write_op);
+                       fri->write_op = NULL;
+               }
+               rados_aio_release(fri->completion);
+               fri->completion = NULL;
+
+               rados->aio_events[events] = fri->io_u;
+               events ++;
+               flist_del(&fri->list);
+               if (events >= max) break;
+       }
+       pthread_mutex_unlock(&rados->completed_lock);
+       return events;
 }
 
 static int fio_rados_setup(struct thread_data *td)
@@ -447,12 +438,13 @@ static int fio_rados_io_u_init(struct thread_data *td, struct io_u *io_u)
        fri = calloc(1, sizeof(*fri));
        fri->io_u = io_u;
        fri->td = td;
+       INIT_FLIST_HEAD(&fri->list);
        io_u->engine_data = fri;
        return 0;
 }
 
 /* ioengine_ops for get_ioengine() */
-static struct ioengine_ops ioengine = {
+FIO_STATIC struct ioengine_ops ioengine = {
        .name = "rados",
        .version                = FIO_IOOPS_VERSION,
        .flags                  = FIO_DISKLESSIO,