X-Git-Url: https://git.kernel.dk/?a=blobdiff_plain;f=engines%2Frados.c;h=42ee48ff02b3f6371027ab4cbcbc304aefbbea10;hb=c4f5c92fac8a39ffff29d57e99c3c0163358dd7a;hp=86100dc4918c586c29d6da8914afc6fbea913e8c;hpb=9a0ac6c7b69355ea095f06c6b0a08115c946dd61;p=fio.git diff --git a/engines/rados.c b/engines/rados.c index 86100dc4..42ee48ff 100644 --- a/engines/rados.c +++ b/engines/rados.c @@ -11,18 +11,24 @@ #include "fio.h" #include "../optgroup.h" -struct fio_rados_iou { - struct thread_data *td; - struct io_u *io_u; - rados_completion_t completion; - rados_write_op_t write_op; -}; - struct rados_data { rados_t cluster; rados_ioctx_t io_ctx; struct io_u **aio_events; bool connected; + pthread_mutex_t completed_lock; + pthread_cond_t completed_more_io; + struct flist_head completed_operations; + uint64_t ops_scheduled; + uint64_t ops_completed; +}; + +struct fio_rados_iou { + struct flist_head list; + struct thread_data *td; + struct io_u *io_u; + rados_completion_t completion; + rados_write_op_t write_op; }; /* fio configuration options read from the job file */ @@ -94,6 +100,11 @@ static int _fio_setup_rados_data(struct thread_data *td, rados->aio_events = calloc(td->o.iodepth, sizeof(struct io_u *)); if (!rados->aio_events) goto failed; + pthread_mutex_init(&rados->completed_lock, NULL); + pthread_cond_init(&rados->completed_more_io, NULL); + INIT_FLIST_HEAD(&rados->completed_operations); + rados->ops_scheduled = 0; + rados->ops_completed = 0; *rados_data_ptr = rados; return 0; @@ -220,8 +231,11 @@ static void _fio_rados_disconnect(struct rados_data *rados) static void fio_rados_cleanup(struct thread_data *td) { struct rados_data *rados = td->io_ops_data; - if (rados) { + pthread_mutex_lock(&rados->completed_lock); + while (rados->ops_scheduled != rados->ops_completed) + pthread_cond_wait(&rados->completed_more_io, &rados->completed_lock); + pthread_mutex_unlock(&rados->completed_lock); _fio_rados_rm_objects(td, rados); _fio_rados_disconnect(rados); free(rados->aio_events); @@ -229,6 +243,19 @@ static void fio_rados_cleanup(struct thread_data *td) } } +static void complete_callback(rados_completion_t cb, void *arg) +{ + struct fio_rados_iou *fri = (struct fio_rados_iou *)arg; + struct rados_data *rados = fri->td->io_ops_data; + assert(fri->completion); + assert(rados_aio_is_complete(fri->completion)); + pthread_mutex_lock(&rados->completed_lock); + flist_add_tail(&fri->list, &rados->completed_operations); + rados->ops_completed++; + pthread_mutex_unlock(&rados->completed_lock); + pthread_cond_signal(&rados->completed_more_io); +} + static enum fio_q_status fio_rados_queue(struct thread_data *td, struct io_u *io_u) { @@ -240,7 +267,7 @@ static enum fio_q_status fio_rados_queue(struct thread_data *td, fio_ro_check(td, io_u); if (io_u->ddir == DDIR_WRITE) { - r = rados_aio_create_completion(fri, NULL, + r = rados_aio_create_completion(fri, complete_callback, NULL, &fri->completion); if (r < 0) { log_err("rados_aio_create_completion failed.\n"); @@ -253,9 +280,10 @@ static enum fio_q_status fio_rados_queue(struct thread_data *td, log_err("rados_write failed.\n"); goto failed_comp; } + rados->ops_scheduled++; return FIO_Q_QUEUED; } else if (io_u->ddir == DDIR_READ) { - r = rados_aio_create_completion(fri, NULL, + r = rados_aio_create_completion(fri, complete_callback, NULL, &fri->completion); if (r < 0) { log_err("rados_aio_create_completion failed.\n"); @@ -267,9 +295,10 @@ static enum fio_q_status fio_rados_queue(struct thread_data *td, log_err("rados_aio_read failed.\n"); goto failed_comp; } + rados->ops_scheduled++; return FIO_Q_QUEUED; } else if (io_u->ddir == DDIR_TRIM) { - r = rados_aio_create_completion(fri, NULL, + r = rados_aio_create_completion(fri, complete_callback, NULL , &fri->completion); if (r < 0) { log_err("rados_aio_create_completion failed.\n"); @@ -288,6 +317,7 @@ static enum fio_q_status fio_rados_queue(struct thread_data *td, log_err("rados_aio_write_op_operate failed.\n"); goto failed_write_op; } + rados->ops_scheduled++; return FIO_Q_QUEUED; } @@ -313,50 +343,33 @@ int fio_rados_getevents(struct thread_data *td, unsigned int min, unsigned int max, const struct timespec *t) { struct rados_data *rados = td->io_ops_data; - struct rados_options *o = td->eo; - int busy_poll = o->busy_poll; unsigned int events = 0; - struct io_u *u; struct fio_rados_iou *fri; - unsigned int i; - rados_completion_t first_unfinished; - int observed_new = 0; - - /* loop through inflight ios until we find 'min' completions */ - do { - first_unfinished = NULL; - io_u_qiter(&td->io_u_all, u, i) { - if (!(u->flags & IO_U_F_FLIGHT)) - continue; - - fri = u->engine_data; - if (fri->completion) { - if (rados_aio_is_complete(fri->completion)) { - if (fri->write_op != NULL) { - rados_release_write_op(fri->write_op); - fri->write_op = NULL; - } - rados_aio_release(fri->completion); - fri->completion = NULL; - rados->aio_events[events] = u; - events++; - observed_new = 1; - } else if (first_unfinished == NULL) { - first_unfinished = fri->completion; - } - } - if (events >= max) - break; + + pthread_mutex_lock(&rados->completed_lock); + while (events < min) { + while (flist_empty(&rados->completed_operations)) { + pthread_cond_wait(&rados->completed_more_io, &rados->completed_lock); + } + assert(!flist_empty(&rados->completed_operations)); + + fri = flist_first_entry(&rados->completed_operations, struct fio_rados_iou, list); + assert(fri->completion); + assert(rados_aio_is_complete(fri->completion)); + if (fri->write_op != NULL) { + rados_release_write_op(fri->write_op); + fri->write_op = NULL; } - if (events >= min) - return events; - if (first_unfinished == NULL || busy_poll) - continue; - - if (!observed_new) - rados_aio_wait_for_complete(first_unfinished); - } while (1); - return events; + rados_aio_release(fri->completion); + fri->completion = NULL; + + rados->aio_events[events] = fri->io_u; + events ++; + flist_del(&fri->list); + if (events >= max) break; + } + pthread_mutex_unlock(&rados->completed_lock); + return events; } static int fio_rados_setup(struct thread_data *td) @@ -425,12 +438,13 @@ static int fio_rados_io_u_init(struct thread_data *td, struct io_u *io_u) fri = calloc(1, sizeof(*fri)); fri->io_u = io_u; fri->td = td; + INIT_FLIST_HEAD(&fri->list); io_u->engine_data = fri; return 0; } /* ioengine_ops for get_ioengine() */ -static struct ioengine_ops ioengine = { +FIO_STATIC struct ioengine_ops ioengine = { .name = "rados", .version = FIO_IOOPS_VERSION, .flags = FIO_DISKLESSIO,