4 * IO engine using Ceph's RADOS interface to test low-level performance of
9 #include <rados/librados.h>
12 #include "../optgroup.h"
/*
 * Fields of the per-thread engine state (struct rados_data, allocated with
 * calloc in _fio_setup_rados_data()).
 */
/* Array of td->o.iodepth slots; filled by fio_rados_getevents() and read
 * back by fio_rados_event(). */
17 struct io_u **aio_events;
/* Held while completed_operations and ops_completed are updated or waited on. */
19 pthread_mutex_t completed_lock;
/* Signalled by complete_callback() each time an operation completes. */
20 pthread_cond_t completed_more_io;
/* List of fio_rados_iou entries whose librados completion has fired. */
21 struct flist_head completed_operations;
/* Incremented by fio_rados_queue() for each operation it submits. */
22 uint64_t ops_scheduled;
/* Incremented by complete_callback(); fio_rados_cleanup() waits until it
 * equals ops_scheduled. */
23 uint64_t ops_completed;
/*
 * Per-io_u engine state. Allocated in fio_rados_io_u_init(), stored in
 * io_u->engine_data, and handed to librados as the completion callback
 * argument in fio_rados_queue().
 */
26 struct fio_rados_iou {
/* Link used to chain this entry onto rados_data.completed_operations. */
27 struct flist_head list;
/* Owning fio thread; complete_callback() follows it to td->io_ops_data. */
28 struct thread_data *td;
/* librados completion handle; reset to NULL after fio_rados_getevents()
 * releases it. */
30 rados_completion_t completion;
/* Write operation created in fio_rados_queue() for DDIR_TRIM requests;
 * reset to NULL after it is released in fio_rados_getevents(). */
31 rados_write_op_t write_op;
34 /* fio configuration options read from the job file */
/*
 * The option table that follows stores into the cluster_name, pool_name,
 * client_name and busy_poll members of this structure via offsetof().
 */
35 struct rados_options {
/*
 * Job-file options understood by this engine. Every entry writes its value
 * into struct rados_options at the offset given by .off1, and all of them
 * are filed under FIO_OPT_C_ENGINE / FIO_OPT_G_RBD.
 */
43 static struct fio_option options[] = {
/* Cluster name; when set, _fio_rados_connect() takes the rados_create2() path. */
45 .name = "clustername",
46 .lname = "ceph cluster name",
47 .type = FIO_OPT_STR_STORE,
48 .help = "Cluster name for ceph",
49 .off1 = offsetof(struct rados_options, cluster_name),
50 .category = FIO_OPT_C_ENGINE,
51 .group = FIO_OPT_G_RBD,
/* Pool to run against; _fio_rados_connect() logs an error when it is NULL. */
55 .lname = "pool name to use",
56 .type = FIO_OPT_STR_STORE,
57 .help = "Ceph pool name to benchmark against",
58 .off1 = offsetof(struct rados_options, pool_name),
59 .category = FIO_OPT_C_ENGINE,
60 .group = FIO_OPT_G_RBD,
/* Client name used by _fio_rados_connect() when creating the cluster handle. */
64 .lname = "rados engine clientname",
65 .type = FIO_OPT_STR_STORE,
66 .help = "Name of the ceph client to access RADOS engine",
67 .off1 = offsetof(struct rados_options, client_name),
68 .category = FIO_OPT_C_ENGINE,
69 .group = FIO_OPT_G_RBD,
/* Busy-poll switch, stored in rados_options.busy_poll. */
73 .lname = "busy poll mode",
75 .help = "Busy poll for completions instead of sleeping",
76 .off1 = offsetof(struct rados_options, busy_poll),
78 .category = FIO_OPT_C_ENGINE,
79 .group = FIO_OPT_G_RBD,
/*
 * Allocate and initialise the per-thread rados_data and hand it back
 * through *rados_data_ptr.
 *
 * The visible success path produces: a zeroed rados_data with
 * connected = false, an aio_events array of td->o.iodepth slots, an
 * initialised mutex and condition variable, an empty completed_operations
 * list, and both operation counters at zero.
 *
 * NOTE(review): the return statements and the failure label are not
 * visible here; the trailing free of aio_events looks like the failure
 * path -- confirm.
 */
86 static int _fio_setup_rados_data(struct thread_data *td,
87 struct rados_data **rados_data_ptr)
89 struct rados_data *rados;
94 rados = calloc(1, sizeof(struct rados_data));
98 rados->connected = false;
/* One slot per queue-depth entry; fio_rados_getevents() fills these. */
100 rados->aio_events = calloc(td->o.iodepth, sizeof(struct io_u *));
101 if (!rados->aio_events)
/* Default attributes for both synchronisation objects. */
103 pthread_mutex_init(&rados->completed_lock, NULL);
104 pthread_cond_init(&rados->completed_more_io, NULL);
105 INIT_FLIST_HEAD(&rados->completed_operations);
106 rados->ops_scheduled = 0;
107 rados->ops_completed = 0;
/* Publish the fully initialised structure to the caller. */
108 *rados_data_ptr = rados;
113 if (rados->aio_events)
114 free(rados->aio_events);
/*
 * Remove, from the I/O context in rados, the object named after each of
 * the job's td->o.nr_files files. The result of rados_remove() is not
 * checked, so removal is best-effort.
 */
120 static void _fio_rados_rm_objects(struct thread_data *td, struct rados_data *rados)
123 for (i = 0; i < td->o.nr_files; i++) {
124 struct fio_file *f = td->files[i];
125 rados_remove(rados->io_ctx, f->file_name);
/*
 * Bring up the librados connection for this thread.
 *
 * Visible sequence: create the cluster handle (rados_create2() when a
 * cluster name is configured, rados_create() otherwise), read the Ceph
 * configuration with a NULL path, connect, open an I/O context on
 * o->pool_name, then issue a zero-length rados_write() for every job file
 * after setting its real_file_size. The tail of the function removes the
 * objects, destroys the I/O context and shuts the cluster down, clearing
 * both handles.
 *
 * NOTE(review): the error checks, labels and return statements between
 * these calls are not visible here; the tail looks like staged failure
 * cleanup -- confirm.
 */
129 static int _fio_rados_connect(struct thread_data *td)
131 struct rados_data *rados = td->io_ops_data;
132 struct rados_options *o = td->eo;
/* Per-file size: td->o.size split evenly, guarding against nr_files == 0. */
134 const uint64_t file_size =
135 td->o.size / (td->o.nr_files ? td->o.nr_files : 1u);
139 if (o->cluster_name) {
140 char *client_name = NULL;
143 * If we specify cluser name, the rados_create2
144 * will not assume 'client.'. name is considered
145 * as a full type.id namestr
147 if (o->client_name) {
/* A name without a '.' gets a "client." prefix in a freshly allocated buffer. */
148 if (!index(o->client_name, '.')) {
/* NOTE(review): the calloc result is passed straight to strcat with no
 * NULL check. */
149 client_name = calloc(1, strlen("client.") +
150 strlen(o->client_name) + 1);
151 strcat(client_name, "client.");
152 strcat(client_name, o->client_name);
/* Otherwise the configured name is used as-is (no allocation). */
154 client_name = o->client_name;
158 r = rados_create2(&rados->cluster, o->cluster_name,
/* Same condition as the allocation above, i.e. the temporary buffer case. */
161 if (client_name && !index(o->client_name, '.'))
164 r = rados_create(&rados->cluster, o->client_name);
/* NOTE(review): this check runs after the cluster handle has been created;
 * confirm the handle is released when the pool name is missing. */
166 if (o->pool_name == NULL) {
167 log_err("rados pool name must be provided.\n");
172 log_err("rados_create failed.\n");
/* NULL path: let librados pick its configuration file. */
176 r = rados_conf_read_file(rados->cluster, NULL);
178 log_err("rados_conf_read_file failed.\n");
182 r = rados_connect(rados->cluster);
184 log_err("rados_connect failed.\n");
188 r = rados_ioctx_create(rados->cluster, o->pool_name, &rados->io_ctx);
190 log_err("rados_ioctx_create failed.\n");
191 goto failed_shutdown;
194 for (i = 0; i < td->o.nr_files; i++) {
196 f->real_file_size = file_size;
/* Zero-length write at offset 0 to the object named after the file. */
197 r = rados_write(rados->io_ctx, f->file_name, "", 0, 0);
199 goto failed_obj_create;
205 _fio_rados_rm_objects(td, rados);
206 rados_ioctx_destroy(rados->io_ctx);
207 rados->io_ctx = NULL;
209 rados_shutdown(rados->cluster);
210 rados->cluster = NULL;
/*
 * Tear down the librados state held in rados: destroy the I/O context and,
 * when a cluster handle is present, shut it down. Both handles are reset
 * to NULL afterwards.
 */
215 static void _fio_rados_disconnect(struct rados_data *rados)
221 rados_ioctx_destroy(rados->io_ctx);
222 rados->io_ctx = NULL;
225 if (rados->cluster) {
226 rados_shutdown(rados->cluster);
227 rados->cluster = NULL;
/*
 * Engine cleanup hook (registered as .cleanup; also called from
 * fio_rados_setup()).
 *
 * Blocks until every scheduled operation has been reported complete, then
 * removes the per-file objects, disconnects from the cluster and frees the
 * aio_events array.
 */
231 static void fio_rados_cleanup(struct thread_data *td)
233 struct rados_data *rados = td->io_ops_data;
/* Drain in-flight operations: complete_callback() bumps ops_completed and
 * signals completed_more_io. */
235 pthread_mutex_lock(&rados->completed_lock);
236 while (rados->ops_scheduled != rados->ops_completed)
237 pthread_cond_wait(&rados->completed_more_io, &rados->completed_lock);
238 pthread_mutex_unlock(&rados->completed_lock);
239 _fio_rados_rm_objects(td, rados);
240 _fio_rados_disconnect(rados);
241 free(rados->aio_events);
/*
 * librados completion callback installed by fio_rados_queue().
 *
 * arg is the fio_rados_iou passed when the completion was created. The
 * entry is appended to completed_operations and ops_completed is bumped
 * while holding completed_lock; completed_more_io is signalled after the
 * lock is dropped. cb is not referenced in the visible code.
 */
246 static void complete_callback(rados_completion_t cb, void *arg)
248 struct fio_rados_iou *fri = (struct fio_rados_iou *)arg;
249 struct rados_data *rados = fri->td->io_ops_data;
/* Sanity checks: the completion must exist and report itself complete. */
250 assert(fri->completion);
251 assert(rados_aio_is_complete(fri->completion));
252 pthread_mutex_lock(&rados->completed_lock);
253 flist_add_tail(&fri->list, &rados->completed_operations);
254 rados->ops_completed++;
255 pthread_mutex_unlock(&rados->completed_lock);
/* Wakes waiters in fio_rados_getevents() and fio_rados_cleanup(). */
256 pthread_cond_signal(&rados->completed_more_io);
/*
 * Submit one io_u to librados asynchronously (registered as .queue).
 *
 * For each supported direction a completion is created with the io_u's
 * fio_rados_iou as callback argument and complete_callback() as the
 * callback, then:
 *   DDIR_WRITE -> rados_aio_write() of xfer_buf/xfer_buflen at io_u->offset
 *   DDIR_READ  -> rados_aio_read() into xfer_buf
 *   DDIR_TRIM  -> a write op that zeroes a range, run through
 *                 rados_aio_write_op_operate()
 * ops_scheduled is incremented after each submit call. Any other direction
 * logs a warning.
 *
 * The tail releases the write op and the completion, reports io_u->error
 * through td_verror() and returns FIO_Q_COMPLETED.
 *
 * NOTE(review): the error checks, labels and the success return value are
 * not visible here; the tail looks like the shared failure path -- confirm.
 */
259 static enum fio_q_status fio_rados_queue(struct thread_data *td,
262 struct rados_data *rados = td->io_ops_data;
263 struct fio_rados_iou *fri = io_u->engine_data;
/* The target object is named after the fio file. */
264 char *object = io_u->file->file_name;
267 fio_ro_check(td, io_u);
269 if (io_u->ddir == DDIR_WRITE) {
270 r = rados_aio_create_completion(fri, complete_callback,
271 NULL, &fri->completion);
273 log_err("rados_aio_create_completion failed.\n");
277 r = rados_aio_write(rados->io_ctx, object, fri->completion,
278 io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
280 log_err("rados_write failed.\n");
283 rados->ops_scheduled++;
285 } else if (io_u->ddir == DDIR_READ) {
286 r = rados_aio_create_completion(fri, complete_callback,
287 NULL, &fri->completion);
289 log_err("rados_aio_create_completion failed.\n");
292 r = rados_aio_read(rados->io_ctx, object, fri->completion,
293 io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
295 log_err("rados_aio_read failed.\n");
298 rados->ops_scheduled++;
300 } else if (io_u->ddir == DDIR_TRIM) {
301 r = rados_aio_create_completion(fri, complete_callback,
302 NULL , &fri->completion);
304 log_err("rados_aio_create_completion failed.\n");
/* Trim is expressed as a write op that zeroes the requested range. */
307 fri->write_op = rados_create_write_op();
308 if (fri->write_op == NULL) {
309 log_err("rados_create_write_op failed.\n");
312 rados_write_op_zero(fri->write_op, io_u->offset,
314 r = rados_aio_write_op_operate(fri->write_op, rados->io_ctx,
315 fri->completion, object, NULL, 0);
317 log_err("rados_aio_write_op_operate failed.\n");
318 goto failed_write_op;
320 rados->ops_scheduled++;
324 log_err("WARNING: Only DDIR_READ, DDIR_WRITE and DDIR_TRIM are supported!");
327 rados_release_write_op(fri->write_op);
329 rados_aio_release(fri->completion);
332 td_verror(td, io_u->error, "xfer");
333 return FIO_Q_COMPLETED;
/*
 * Return the io_u stored in slot `event` of aio_events (registered as
 * .event). The slots are filled by fio_rados_getevents(); no bounds check
 * on `event` is visible here.
 */
336 static struct io_u *fio_rados_event(struct thread_data *td, int event)
338 struct rados_data *rados = td->io_ops_data;
339 return rados->aio_events[event];
/*
 * Reap completed operations (registered as .getevents).
 *
 * While fewer than `min` events have been collected, the function waits on
 * completed_more_io whenever completed_operations is empty, then takes the
 * first entry, releases its write op (if any) and its completion, stores
 * its io_u in aio_events[events] and unlinks it from the list. The loop
 * also stops once `events` reaches `max`. The whole loop runs with
 * completed_lock held.
 *
 * `t` is not referenced in the visible code, so no timeout is applied here.
 *
 * NOTE(review): unlike the other engine hooks this function is not
 * declared static. The increment of `events` and the return statement are
 * not visible here.
 */
342 int fio_rados_getevents(struct thread_data *td, unsigned int min,
343 unsigned int max, const struct timespec *t)
345 struct rados_data *rados = td->io_ops_data;
346 unsigned int events = 0;
347 struct fio_rados_iou *fri;
349 pthread_mutex_lock(&rados->completed_lock);
350 while (events < min) {
/* Sleep until complete_callback() queues at least one finished operation. */
351 while (flist_empty(&rados->completed_operations)) {
352 pthread_cond_wait(&rados->completed_more_io, &rados->completed_lock);
354 assert(!flist_empty(&rados->completed_operations));
356 fri = flist_first_entry(&rados->completed_operations, struct fio_rados_iou, list);
357 assert(fri->completion);
358 assert(rados_aio_is_complete(fri->completion));
/* Only trim requests carry a write op. */
359 if (fri->write_op != NULL) {
360 rados_release_write_op(fri->write_op);
361 fri->write_op = NULL;
363 rados_aio_release(fri->completion);
364 fri->completion = NULL;
/* Expose the finished io_u to fio_rados_event(). */
366 rados->aio_events[events] = fri->io_u;
368 flist_del(&fri->list);
369 if (events >= max) break;
371 pthread_mutex_unlock(&rados->completed_lock);
/*
 * Engine setup hook (registered as .setup).
 *
 * Visible sequence: allocate the engine state with
 * _fio_setup_rados_data(), store it in td->io_ops_data, force
 * td->o.use_thread = 1, connect with _fio_rados_connect(), and mark the
 * state as connected. The final line calls fio_rados_cleanup(td).
 *
 * NOTE(review): the error checks, labels and return statements are not
 * visible here; the fio_rados_cleanup() call looks like the failure path
 * -- confirm.
 */
375 static int fio_rados_setup(struct thread_data *td)
377 struct rados_data *rados = NULL;
379 /* allocate engine specific structure to deal with librados. */
380 r = _fio_setup_rados_data(td, &rados);
382 log_err("fio_setup_rados_data failed.\n");
/* From here on the other hooks can reach the state through td. */
385 td->io_ops_data = rados;
387 /* Force single process mode.
389 td->o.use_thread = 1;
391 /* connect in the main thread to determine to determine
392 * the size of the given RADOS block device. And disconnect
395 r = _fio_rados_connect(td);
397 log_err("fio_rados_connect failed.\n");
400 rados->connected = true;
404 fio_rados_cleanup(td);
408 /* open/invalidate are no-ops. We set the FIO_DISKLESSIO flag in ioengine_ops to
409 prevent fio from creating the files
/* File-open hook, registered as .open_file in the ioengine table. */
411 static int fio_rados_open(struct thread_data *td, struct fio_file *f)
/* Cache-invalidate hook, registered as .invalidate in the ioengine table. */
415 static int fio_rados_invalidate(struct thread_data *td, struct fio_file *f)
/*
 * Per-io_u teardown hook (registered as .io_u_free).
 *
 * Detaches the engine data from the io_u and releases the librados
 * completion and write op it holds.
 *
 * NOTE(review): any NULL guards around the release calls and the freeing
 * of the fio_rados_iou itself are not visible here.
 */
420 static void fio_rados_io_u_free(struct thread_data *td, struct io_u *io_u)
422 struct fio_rados_iou *fri = io_u->engine_data;
425 io_u->engine_data = NULL;
428 rados_aio_release(fri->completion);
430 rados_release_write_op(fri->write_op);
/*
 * Per-io_u setup hook (registered as .io_u_init).
 *
 * Allocates a zeroed fio_rados_iou, initialises its list link and attaches
 * it to the io_u as engine_data.
 *
 * NOTE(review): the lines between the allocation and the list
 * initialisation, and the return statement, are not visible here.
 */
435 static int fio_rados_io_u_init(struct thread_data *td, struct io_u *io_u)
437 struct fio_rados_iou *fri;
438 fri = calloc(1, sizeof(*fri));
441 INIT_FLIST_HEAD(&fri->list);
442 io_u->engine_data = fri;
446 /* ioengine_ops for get_ioengine() */
/*
 * Hook table that fio_rados_register() passes to register_ioengine(); it
 * wires the functions in this file into fio.
 */
447 FIO_STATIC struct ioengine_ops ioengine = {
449 .version = FIO_IOOPS_VERSION,
/* Per the comment above the open/invalidate hooks, this flag keeps fio from
 * creating the files. */
450 .flags = FIO_DISKLESSIO,
451 .setup = fio_rados_setup,
452 .queue = fio_rados_queue,
453 .getevents = fio_rados_getevents,
454 .event = fio_rados_event,
455 .cleanup = fio_rados_cleanup,
456 .open_file = fio_rados_open,
457 .invalidate = fio_rados_invalidate,
459 .io_u_init = fio_rados_io_u_init,
460 .io_u_free = fio_rados_io_u_free,
/* Size of the per-job option block that td->eo is read as in
 * _fio_rados_connect(). */
461 .option_struct_size = sizeof(struct rados_options),
/* Registers the engine's hook table with fio; marked with fio_init. */
464 static void fio_init fio_rados_register(void)
466 register_ioengine(&ioengine);
/* Unregisters the engine's hook table from fio; marked with fio_exit. */
469 static void fio_exit fio_rados_unregister(void)
471 unregister_ioengine(&ioengine);