#include "fio.h"
#include "../optgroup.h"
-struct fio_rados_iou {
- struct thread_data *td;
- struct io_u *io_u;
- rados_completion_t completion;
- rados_write_op_t write_op;
-};
-
struct rados_data {
rados_t cluster;
rados_ioctx_t io_ctx;
struct io_u **aio_events;
bool connected;
+ pthread_mutex_t completed_lock;
+ pthread_cond_t completed_more_io;
+ struct flist_head completed_operations;
+ uint64_t ops_scheduled;
+ uint64_t ops_completed;
+};
+
+struct fio_rados_iou {
+ struct flist_head list;
+ struct thread_data *td;
+ struct io_u *io_u;
+ rados_completion_t completion;
+ rados_write_op_t write_op;
};
/* fio configuration options read from the job file */
char *pool_name;
char *client_name;
int busy_poll;
+ int touch_objects;
};
static struct fio_option options[] = {
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_RBD,
},
+ {
+ .name = "touch_objects",
+ .lname = "touch objects on start",
+ .type = FIO_OPT_BOOL,
+ .help = "Touch (create) objects on start",
+ .off1 = offsetof(struct rados_options, touch_objects),
+ .def = "1",
+ .category = FIO_OPT_C_ENGINE,
+ .group = FIO_OPT_G_RBD,
+ },
{
.name = NULL,
},
rados->aio_events = calloc(td->o.iodepth, sizeof(struct io_u *));
if (!rados->aio_events)
goto failed;
+ pthread_mutex_init(&rados->completed_lock, NULL);
+ pthread_cond_init(&rados->completed_more_io, NULL);
+ INIT_FLIST_HEAD(&rados->completed_operations);
+ rados->ops_scheduled = 0;
+ rados->ops_completed = 0;
*rados_data_ptr = rados;
return 0;
for (i = 0; i < td->o.nr_files; i++) {
f = td->files[i];
f->real_file_size = file_size;
- r = rados_write(rados->io_ctx, f->file_name, "", 0, 0);
- if (r < 0) {
- goto failed_obj_create;
+ if (o->touch_objects) {
+ r = rados_write(rados->io_ctx, f->file_name, "", 0, 0);
+ if (r < 0) {
+ goto failed_obj_create;
+ }
}
}
return 0;
static void fio_rados_cleanup(struct thread_data *td)
{
struct rados_data *rados = td->io_ops_data;
-
if (rados) {
+ pthread_mutex_lock(&rados->completed_lock);
+ while (rados->ops_scheduled != rados->ops_completed)
+ pthread_cond_wait(&rados->completed_more_io, &rados->completed_lock);
+ pthread_mutex_unlock(&rados->completed_lock);
_fio_rados_rm_objects(td, rados);
_fio_rados_disconnect(rados);
free(rados->aio_events);
}
}
+static void complete_callback(rados_completion_t cb, void *arg)
+{
+ struct fio_rados_iou *fri = (struct fio_rados_iou *)arg;
+ struct rados_data *rados = fri->td->io_ops_data;
+ assert(fri->completion);
+ assert(rados_aio_is_complete(fri->completion));
+ pthread_mutex_lock(&rados->completed_lock);
+ flist_add_tail(&fri->list, &rados->completed_operations);
+ rados->ops_completed++;
+ pthread_mutex_unlock(&rados->completed_lock);
+ pthread_cond_signal(&rados->completed_more_io);
+}
+
static enum fio_q_status fio_rados_queue(struct thread_data *td,
struct io_u *io_u)
{
fio_ro_check(td, io_u);
if (io_u->ddir == DDIR_WRITE) {
- r = rados_aio_create_completion(fri, NULL,
+ r = rados_aio_create_completion(fri, complete_callback,
NULL, &fri->completion);
if (r < 0) {
log_err("rados_aio_create_completion failed.\n");
log_err("rados_write failed.\n");
goto failed_comp;
}
+ rados->ops_scheduled++;
return FIO_Q_QUEUED;
} else if (io_u->ddir == DDIR_READ) {
- r = rados_aio_create_completion(fri, NULL,
+ r = rados_aio_create_completion(fri, complete_callback,
NULL, &fri->completion);
if (r < 0) {
log_err("rados_aio_create_completion failed.\n");
log_err("rados_aio_read failed.\n");
goto failed_comp;
}
+ rados->ops_scheduled++;
return FIO_Q_QUEUED;
} else if (io_u->ddir == DDIR_TRIM) {
- r = rados_aio_create_completion(fri, NULL,
+ r = rados_aio_create_completion(fri, complete_callback,
NULL , &fri->completion);
if (r < 0) {
log_err("rados_aio_create_completion failed.\n");
log_err("rados_aio_write_op_operate failed.\n");
goto failed_write_op;
}
+ rados->ops_scheduled++;
return FIO_Q_QUEUED;
}
unsigned int max, const struct timespec *t)
{
struct rados_data *rados = td->io_ops_data;
- struct rados_options *o = td->eo;
- int busy_poll = o->busy_poll;
unsigned int events = 0;
- struct io_u *u;
struct fio_rados_iou *fri;
- unsigned int i;
- rados_completion_t first_unfinished;
- int observed_new = 0;
-
- /* loop through inflight ios until we find 'min' completions */
- do {
- first_unfinished = NULL;
- io_u_qiter(&td->io_u_all, u, i) {
- if (!(u->flags & IO_U_F_FLIGHT))
- continue;
-
- fri = u->engine_data;
- if (fri->completion) {
- if (rados_aio_is_complete(fri->completion)) {
- if (fri->write_op != NULL) {
- rados_release_write_op(fri->write_op);
- fri->write_op = NULL;
- }
- rados_aio_release(fri->completion);
- fri->completion = NULL;
- rados->aio_events[events] = u;
- events++;
- observed_new = 1;
- } else if (first_unfinished == NULL) {
- first_unfinished = fri->completion;
- }
- }
- if (events >= max)
- break;
+
+ pthread_mutex_lock(&rados->completed_lock);
+ while (events < min) {
+ while (flist_empty(&rados->completed_operations)) {
+ pthread_cond_wait(&rados->completed_more_io, &rados->completed_lock);
}
- if (events >= min)
- return events;
- if (first_unfinished == NULL || busy_poll)
- continue;
-
- if (!observed_new)
- rados_aio_wait_for_complete(first_unfinished);
- } while (1);
- return events;
+ assert(!flist_empty(&rados->completed_operations));
+
+ fri = flist_first_entry(&rados->completed_operations, struct fio_rados_iou, list);
+ assert(fri->completion);
+ assert(rados_aio_is_complete(fri->completion));
+ if (fri->write_op != NULL) {
+ rados_release_write_op(fri->write_op);
+ fri->write_op = NULL;
+ }
+ rados_aio_release(fri->completion);
+ fri->completion = NULL;
+
+ rados->aio_events[events] = fri->io_u;
+ events ++;
+ flist_del(&fri->list);
+ if (events >= max) break;
+ }
+ pthread_mutex_unlock(&rados->completed_lock);
+ return events;
}
static int fio_rados_setup(struct thread_data *td)
fri = calloc(1, sizeof(*fri));
fri->io_u = io_u;
fri->td = td;
+ INIT_FLIST_HEAD(&fri->list);
io_u->engine_data = fri;
return 0;
}
/* ioengine_ops for get_ioengine() */
-static struct ioengine_ops ioengine = {
+FIO_STATIC struct ioengine_ops ioengine = {
.name = "rados",
.version = FIO_IOOPS_VERSION,
.flags = FIO_DISKLESSIO,