engines/rados: changed polling to completion callbacks methodology
authorAdam Kupczyk <akupczyk@redhat.com>
Mon, 25 Nov 2019 17:55:07 +0000 (12:55 -0500)
committerAdam Kupczyk <akupczyk@redhat.com>
Mon, 25 Nov 2019 18:43:05 +0000 (13:43 -0500)
Previously, getevent() function polled operations that have been submitted to check if some of them have already finished.
Now, each rados operation invokes completion callback. This callback adds to list of completed operations.
In this version getevent() only peeks if completed list has some operations.

There are two benefits:
1) small one - this works faster then previous version
2) big one - when there is a huge amount of operations in fly, getevent() might be more overloaded then producer. In this scenario some old operations are finished, but not picked up and properly noticed. This caused absurdely long execution times, when actually everything was working properly.

Signed-off-by: Adam Kupczyk <akupczyk@redhat.com>
engines/rados.c

index 86100dc4918c586c29d6da8914afc6fbea913e8c..cde538b982085054a1d678bd729f43609f9b8d3b 100644 (file)
 #include "fio.h"
 #include "../optgroup.h"
 
+struct rados_data {
+        rados_t cluster;
+        rados_ioctx_t io_ctx;
+        struct io_u **aio_events;
+        bool connected;
+        pthread_mutex_t completed_lock;
+        pthread_cond_t completed_more_io;
+        struct flist_head completed_operations;
+};
+
 struct fio_rados_iou {
+       struct flist_head list;
        struct thread_data *td;
        struct io_u *io_u;
        rados_completion_t completion;
        rados_write_op_t write_op;
 };
 
-struct rados_data {
-       rados_t cluster;
-       rados_ioctx_t io_ctx;
-       struct io_u **aio_events;
-       bool connected;
-};
-
 /* fio configuration options read from the job file */
 struct rados_options {
        void *pad;
@@ -94,6 +98,9 @@ static int _fio_setup_rados_data(struct thread_data *td,
        rados->aio_events = calloc(td->o.iodepth, sizeof(struct io_u *));
        if (!rados->aio_events)
                goto failed;
+       pthread_mutex_init(&rados->completed_lock, NULL);
+       pthread_cond_init(&rados->completed_more_io, NULL);
+       INIT_FLIST_HEAD(&rados->completed_operations);
        *rados_data_ptr = rados;
        return 0;
 
@@ -229,6 +236,18 @@ static void fio_rados_cleanup(struct thread_data *td)
        }
 }
 
+static void complete_callback(rados_completion_t cb, void *arg)
+{
+       struct fio_rados_iou *fri = (struct fio_rados_iou *)arg;
+       struct rados_data *rados = fri->td->io_ops_data;
+       assert(fri->completion);
+       assert(rados_aio_is_complete(fri->completion));
+       pthread_mutex_lock(&rados->completed_lock);
+       flist_add_tail(&fri->list, &rados->completed_operations);
+       pthread_mutex_unlock(&rados->completed_lock);
+       pthread_cond_signal(&rados->completed_more_io);
+}
+
 static enum fio_q_status fio_rados_queue(struct thread_data *td,
                                         struct io_u *io_u)
 {
@@ -240,7 +259,7 @@ static enum fio_q_status fio_rados_queue(struct thread_data *td,
        fio_ro_check(td, io_u);
 
        if (io_u->ddir == DDIR_WRITE) {
-                r = rados_aio_create_completion(fri, NULL,
+                r = rados_aio_create_completion(fri, complete_callback,
                        NULL, &fri->completion);
                if (r < 0) {
                        log_err("rados_aio_create_completion failed.\n");
@@ -255,7 +274,7 @@ static enum fio_q_status fio_rados_queue(struct thread_data *td,
                }
                return FIO_Q_QUEUED;
        } else if (io_u->ddir == DDIR_READ) {
-               r = rados_aio_create_completion(fri, NULL,
+               r = rados_aio_create_completion(fri, complete_callback,
                        NULL, &fri->completion);
                if (r < 0) {
                        log_err("rados_aio_create_completion failed.\n");
@@ -269,7 +288,7 @@ static enum fio_q_status fio_rados_queue(struct thread_data *td,
                }
                return FIO_Q_QUEUED;
        } else if (io_u->ddir == DDIR_TRIM) {
-               r = rados_aio_create_completion(fri, NULL,
+               r = rados_aio_create_completion(fri, complete_callback,
                        NULL , &fri->completion);
                if (r < 0) {
                        log_err("rados_aio_create_completion failed.\n");
@@ -313,50 +332,33 @@ int fio_rados_getevents(struct thread_data *td, unsigned int min,
        unsigned int max, const struct timespec *t)
 {
        struct rados_data *rados = td->io_ops_data;
-       struct rados_options *o = td->eo;
-       int busy_poll = o->busy_poll;
        unsigned int events = 0;
-       struct io_u *u;
        struct fio_rados_iou *fri;
-       unsigned int i;
-       rados_completion_t first_unfinished;
-       int observed_new = 0;
-
-       /* loop through inflight ios until we find 'min' completions */
-       do {
-               first_unfinished = NULL;
-               io_u_qiter(&td->io_u_all, u, i) {
-                       if (!(u->flags & IO_U_F_FLIGHT))
-                               continue;
-
-                       fri = u->engine_data;
-                       if (fri->completion) {
-                               if (rados_aio_is_complete(fri->completion)) {
-                                       if (fri->write_op != NULL) {
-                                               rados_release_write_op(fri->write_op);
-                                               fri->write_op = NULL;
-                                       }
-                                       rados_aio_release(fri->completion);
-                                       fri->completion = NULL;
-                                       rados->aio_events[events] = u;
-                                       events++;
-                                       observed_new = 1;
-                               } else if (first_unfinished == NULL) {
-                                       first_unfinished = fri->completion;
-                               }
-                       }
-                       if (events >= max)
-                               break;
+
+       pthread_mutex_lock(&rados->completed_lock);
+       while (events < min) {
+               while (flist_empty(&rados->completed_operations)) {
+                       pthread_cond_wait(&rados->completed_more_io, &rados->completed_lock);
                }
-               if (events >= min)
-                       return events;
-               if (first_unfinished == NULL || busy_poll)
-                       continue;
-
-               if (!observed_new)
-                       rados_aio_wait_for_complete(first_unfinished);
-       } while (1);
-  return events;
+               assert(!flist_empty(&rados->completed_operations));
+               
+               fri = flist_last_entry(&rados->completed_operations, struct fio_rados_iou, list);
+               assert(fri->completion);
+               assert(rados_aio_is_complete(fri->completion));
+               if (fri->write_op != NULL) {
+                       rados_release_write_op(fri->write_op);
+                       fri->write_op = NULL;
+               }
+               rados_aio_release(fri->completion);
+               fri->completion = NULL;
+
+               rados->aio_events[events] = fri->io_u;
+               events ++;
+               flist_del(&fri->list);
+               if (events >= max) break;
+       }
+       pthread_mutex_unlock(&rados->completed_lock);
+       return events;
 }
 
 static int fio_rados_setup(struct thread_data *td)
@@ -425,6 +427,7 @@ static int fio_rados_io_u_init(struct thread_data *td, struct io_u *io_u)
        fri = calloc(1, sizeof(*fri));
        fri->io_u = io_u;
        fri->td = td;
+       INIT_FLIST_HEAD(&fri->list);
        io_u->engine_data = fri;
        return 0;
 }