X-Git-Url: https://git.kernel.dk/?a=blobdiff_plain;f=engines%2Frados.c;h=42ee48ff02b3f6371027ab4cbcbc304aefbbea10;hb=8e4b2e55512f1b75e99a9c4fe1fd7af5e05ecc4d;hp=f3795c5742ecfd3bd36ebe5f321ce1450d3d4cff;hpb=4634d057febe9b4d7028e6f02963a8fd8996ac6a;p=fio.git diff --git a/engines/rados.c b/engines/rados.c index f3795c57..42ee48ff 100644 --- a/engines/rados.c +++ b/engines/rados.c @@ -11,20 +11,24 @@ #include "fio.h" #include "../optgroup.h" -struct fio_rados_iou { - struct thread_data *td; - struct io_u *io_u; - rados_completion_t completion; - rados_write_op_t write_op; -}; - struct rados_data { rados_t cluster; rados_ioctx_t io_ctx; - char **objects; - size_t object_count; struct io_u **aio_events; bool connected; + pthread_mutex_t completed_lock; + pthread_cond_t completed_more_io; + struct flist_head completed_operations; + uint64_t ops_scheduled; + uint64_t ops_completed; +}; + +struct fio_rados_iou { + struct flist_head list; + struct thread_data *td; + struct io_u *io_u; + rados_completion_t completion; + rados_write_op_t write_op; }; /* fio configuration options read from the job file */ @@ -96,18 +100,16 @@ static int _fio_setup_rados_data(struct thread_data *td, rados->aio_events = calloc(td->o.iodepth, sizeof(struct io_u *)); if (!rados->aio_events) goto failed; - - rados->object_count = td->o.nr_files; - rados->objects = calloc(rados->object_count, sizeof(char*)); - if (!rados->objects) - goto failed; - + pthread_mutex_init(&rados->completed_lock, NULL); + pthread_cond_init(&rados->completed_more_io, NULL); + INIT_FLIST_HEAD(&rados->completed_operations); + rados->ops_scheduled = 0; + rados->ops_completed = 0; *rados_data_ptr = rados; return 0; failed: if (rados) { - rados->object_count = 0; if (rados->aio_events) free(rados->aio_events); free(rados); @@ -115,15 +117,12 @@ failed: return 1; } -static void _fio_rados_rm_objects(struct rados_data *rados) +static void _fio_rados_rm_objects(struct thread_data *td, struct rados_data *rados) { size_t i; - for (i = 0; i < rados->object_count; ++i) { - if (rados->objects[i]) { - rados_remove(rados->io_ctx, rados->objects[i]); - free(rados->objects[i]); - rados->objects[i] = NULL; - } + for (i = 0; i < td->o.nr_files; i++) { + struct fio_file *f = td->files[i]; + rados_remove(rados->io_ctx, f->file_name); } } @@ -136,7 +135,6 @@ static int _fio_rados_connect(struct thread_data *td) td->o.size / (td->o.nr_files ? td->o.nr_files : 1u); struct fio_file *f; uint32_t i; - size_t oname_len = 0; if (o->cluster_name) { char *client_name = NULL; @@ -165,6 +163,11 @@ static int _fio_rados_connect(struct thread_data *td) } else r = rados_create(&rados->cluster, o->client_name); + if (o->pool_name == NULL) { + log_err("rados pool name must be provided.\n"); + goto failed_early; + } + if (r < 0) { log_err("rados_create failed.\n"); goto failed_early; @@ -188,30 +191,18 @@ static int _fio_rados_connect(struct thread_data *td) goto failed_shutdown; } - for (i = 0; i < rados->object_count; i++) { + for (i = 0; i < td->o.nr_files; i++) { f = td->files[i]; f->real_file_size = file_size; - f->engine_pos = i; - - oname_len = strlen(f->file_name) + 32; - rados->objects[i] = malloc(oname_len); - /* vary objects for different jobs */ - snprintf(rados->objects[i], oname_len - 1, - "fio_rados_bench.%s.%x", - f->file_name, td->thread_number); - r = rados_write(rados->io_ctx, rados->objects[i], "", 0, 0); + r = rados_write(rados->io_ctx, f->file_name, "", 0, 0); if (r < 0) { - free(rados->objects[i]); - rados->objects[i] = NULL; - log_err("error creating object.\n"); goto failed_obj_create; } } - - return 0; + return 0; failed_obj_create: - _fio_rados_rm_objects(rados); + _fio_rados_rm_objects(td, rados); rados_ioctx_destroy(rados->io_ctx); rados->io_ctx = NULL; failed_shutdown: @@ -226,8 +217,6 @@ static void _fio_rados_disconnect(struct rados_data *rados) if (!rados) return; - _fio_rados_rm_objects(rados); - if (rados->io_ctx) { rados_ioctx_destroy(rados->io_ctx); rados->io_ctx = NULL; @@ -242,27 +231,43 @@ static void _fio_rados_disconnect(struct rados_data *rados) static void fio_rados_cleanup(struct thread_data *td) { struct rados_data *rados = td->io_ops_data; - if (rados) { + pthread_mutex_lock(&rados->completed_lock); + while (rados->ops_scheduled != rados->ops_completed) + pthread_cond_wait(&rados->completed_more_io, &rados->completed_lock); + pthread_mutex_unlock(&rados->completed_lock); + _fio_rados_rm_objects(td, rados); _fio_rados_disconnect(rados); - free(rados->objects); free(rados->aio_events); free(rados); } } -static enum fio_q_status -fio_rados_queue(struct thread_data *td, struct io_u *io_u) +static void complete_callback(rados_completion_t cb, void *arg) +{ + struct fio_rados_iou *fri = (struct fio_rados_iou *)arg; + struct rados_data *rados = fri->td->io_ops_data; + assert(fri->completion); + assert(rados_aio_is_complete(fri->completion)); + pthread_mutex_lock(&rados->completed_lock); + flist_add_tail(&fri->list, &rados->completed_operations); + rados->ops_completed++; + pthread_mutex_unlock(&rados->completed_lock); + pthread_cond_signal(&rados->completed_more_io); +} + +static enum fio_q_status fio_rados_queue(struct thread_data *td, + struct io_u *io_u) { struct rados_data *rados = td->io_ops_data; struct fio_rados_iou *fri = io_u->engine_data; - char *object = rados->objects[io_u->file->engine_pos]; + char *object = io_u->file->file_name; int r = -1; fio_ro_check(td, io_u); if (io_u->ddir == DDIR_WRITE) { - r = rados_aio_create_completion(fri, NULL, + r = rados_aio_create_completion(fri, complete_callback, NULL, &fri->completion); if (r < 0) { log_err("rados_aio_create_completion failed.\n"); @@ -275,9 +280,10 @@ fio_rados_queue(struct thread_data *td, struct io_u *io_u) log_err("rados_write failed.\n"); goto failed_comp; } + rados->ops_scheduled++; return FIO_Q_QUEUED; } else if (io_u->ddir == DDIR_READ) { - r = rados_aio_create_completion(fri, NULL, + r = rados_aio_create_completion(fri, complete_callback, NULL, &fri->completion); if (r < 0) { log_err("rados_aio_create_completion failed.\n"); @@ -289,9 +295,10 @@ fio_rados_queue(struct thread_data *td, struct io_u *io_u) log_err("rados_aio_read failed.\n"); goto failed_comp; } + rados->ops_scheduled++; return FIO_Q_QUEUED; } else if (io_u->ddir == DDIR_TRIM) { - r = rados_aio_create_completion(fri, NULL, + r = rados_aio_create_completion(fri, complete_callback, NULL , &fri->completion); if (r < 0) { log_err("rados_aio_create_completion failed.\n"); @@ -310,6 +317,7 @@ fio_rados_queue(struct thread_data *td, struct io_u *io_u) log_err("rados_aio_write_op_operate failed.\n"); goto failed_write_op; } + rados->ops_scheduled++; return FIO_Q_QUEUED; } @@ -335,50 +343,33 @@ int fio_rados_getevents(struct thread_data *td, unsigned int min, unsigned int max, const struct timespec *t) { struct rados_data *rados = td->io_ops_data; - struct rados_options *o = td->eo; - int busy_poll = o->busy_poll; unsigned int events = 0; - struct io_u *u; struct fio_rados_iou *fri; - unsigned int i; - rados_completion_t first_unfinished; - int observed_new = 0; - - /* loop through inflight ios until we find 'min' completions */ - do { - first_unfinished = NULL; - io_u_qiter(&td->io_u_all, u, i) { - if (!(u->flags & IO_U_F_FLIGHT)) - continue; - - fri = u->engine_data; - if (fri->completion) { - if (rados_aio_is_complete(fri->completion)) { - if (fri->write_op != NULL) { - rados_release_write_op(fri->write_op); - fri->write_op = NULL; - } - rados_aio_release(fri->completion); - fri->completion = NULL; - rados->aio_events[events] = u; - events++; - observed_new = 1; - } else if (first_unfinished == NULL) { - first_unfinished = fri->completion; - } - } - if (events >= max) - break; + + pthread_mutex_lock(&rados->completed_lock); + while (events < min) { + while (flist_empty(&rados->completed_operations)) { + pthread_cond_wait(&rados->completed_more_io, &rados->completed_lock); } - if (events >= min) - return events; - if (first_unfinished == NULL || busy_poll) - continue; - - if (!observed_new) - rados_aio_wait_for_complete(first_unfinished); - } while (1); - return events; + assert(!flist_empty(&rados->completed_operations)); + + fri = flist_first_entry(&rados->completed_operations, struct fio_rados_iou, list); + assert(fri->completion); + assert(rados_aio_is_complete(fri->completion)); + if (fri->write_op != NULL) { + rados_release_write_op(fri->write_op); + fri->write_op = NULL; + } + rados_aio_release(fri->completion); + fri->completion = NULL; + + rados->aio_events[events] = fri->io_u; + events ++; + flist_del(&fri->list); + if (events >= max) break; + } + pthread_mutex_unlock(&rados->completed_lock); + return events; } static int fio_rados_setup(struct thread_data *td) @@ -447,12 +438,13 @@ static int fio_rados_io_u_init(struct thread_data *td, struct io_u *io_u) fri = calloc(1, sizeof(*fri)); fri->io_u = io_u; fri->td = td; + INIT_FLIST_HEAD(&fri->list); io_u->engine_data = fri; return 0; } /* ioengine_ops for get_ioengine() */ -static struct ioengine_ops ioengine = { +FIO_STATIC struct ioengine_ops ioengine = { .name = "rados", .version = FIO_IOOPS_VERSION, .flags = FIO_DISKLESSIO,