4 * IO engine using Ceph's RADOS interface to test low-level performance of
9 #include <rados/librados.h>
12 #include "../optgroup.h"
17 struct io_u **aio_events;
19 pthread_mutex_t completed_lock;
20 pthread_cond_t completed_more_io;
21 struct flist_head completed_operations;
24 struct fio_rados_iou {
25 struct flist_head list;
26 struct thread_data *td;
28 rados_completion_t completion;
29 rados_write_op_t write_op;
32 /* fio configuration options read from the job file */
33 struct rados_options {
41 static struct fio_option options[] = {
43 .name = "clustername",
44 .lname = "ceph cluster name",
45 .type = FIO_OPT_STR_STORE,
46 .help = "Cluster name for ceph",
47 .off1 = offsetof(struct rados_options, cluster_name),
48 .category = FIO_OPT_C_ENGINE,
49 .group = FIO_OPT_G_RBD,
53 .lname = "pool name to use",
54 .type = FIO_OPT_STR_STORE,
55 .help = "Ceph pool name to benchmark against",
56 .off1 = offsetof(struct rados_options, pool_name),
57 .category = FIO_OPT_C_ENGINE,
58 .group = FIO_OPT_G_RBD,
62 .lname = "rados engine clientname",
63 .type = FIO_OPT_STR_STORE,
64 .help = "Name of the ceph client to access RADOS engine",
65 .off1 = offsetof(struct rados_options, client_name),
66 .category = FIO_OPT_C_ENGINE,
67 .group = FIO_OPT_G_RBD,
71 .lname = "busy poll mode",
73 .help = "Busy poll for completions instead of sleeping",
74 .off1 = offsetof(struct rados_options, busy_poll),
76 .category = FIO_OPT_C_ENGINE,
77 .group = FIO_OPT_G_RBD,
84 static int _fio_setup_rados_data(struct thread_data *td,
85 struct rados_data **rados_data_ptr)
87 struct rados_data *rados;
92 rados = calloc(1, sizeof(struct rados_data));
96 rados->connected = false;
98 rados->aio_events = calloc(td->o.iodepth, sizeof(struct io_u *));
99 if (!rados->aio_events)
101 pthread_mutex_init(&rados->completed_lock, NULL);
102 pthread_cond_init(&rados->completed_more_io, NULL);
103 INIT_FLIST_HEAD(&rados->completed_operations);
104 *rados_data_ptr = rados;
109 if (rados->aio_events)
110 free(rados->aio_events);
116 static void _fio_rados_rm_objects(struct thread_data *td, struct rados_data *rados)
119 for (i = 0; i < td->o.nr_files; i++) {
120 struct fio_file *f = td->files[i];
121 rados_remove(rados->io_ctx, f->file_name);
125 static int _fio_rados_connect(struct thread_data *td)
127 struct rados_data *rados = td->io_ops_data;
128 struct rados_options *o = td->eo;
130 const uint64_t file_size =
131 td->o.size / (td->o.nr_files ? td->o.nr_files : 1u);
135 if (o->cluster_name) {
136 char *client_name = NULL;
139 * If we specify cluser name, the rados_create2
140 * will not assume 'client.'. name is considered
141 * as a full type.id namestr
143 if (o->client_name) {
144 if (!index(o->client_name, '.')) {
145 client_name = calloc(1, strlen("client.") +
146 strlen(o->client_name) + 1);
147 strcat(client_name, "client.");
148 strcat(client_name, o->client_name);
150 client_name = o->client_name;
154 r = rados_create2(&rados->cluster, o->cluster_name,
157 if (client_name && !index(o->client_name, '.'))
160 r = rados_create(&rados->cluster, o->client_name);
162 if (o->pool_name == NULL) {
163 log_err("rados pool name must be provided.\n");
168 log_err("rados_create failed.\n");
172 r = rados_conf_read_file(rados->cluster, NULL);
174 log_err("rados_conf_read_file failed.\n");
178 r = rados_connect(rados->cluster);
180 log_err("rados_connect failed.\n");
184 r = rados_ioctx_create(rados->cluster, o->pool_name, &rados->io_ctx);
186 log_err("rados_ioctx_create failed.\n");
187 goto failed_shutdown;
190 for (i = 0; i < td->o.nr_files; i++) {
192 f->real_file_size = file_size;
193 r = rados_write(rados->io_ctx, f->file_name, "", 0, 0);
195 goto failed_obj_create;
201 _fio_rados_rm_objects(td, rados);
202 rados_ioctx_destroy(rados->io_ctx);
203 rados->io_ctx = NULL;
205 rados_shutdown(rados->cluster);
206 rados->cluster = NULL;
211 static void _fio_rados_disconnect(struct rados_data *rados)
217 rados_ioctx_destroy(rados->io_ctx);
218 rados->io_ctx = NULL;
221 if (rados->cluster) {
222 rados_shutdown(rados->cluster);
223 rados->cluster = NULL;
227 static void fio_rados_cleanup(struct thread_data *td)
229 struct rados_data *rados = td->io_ops_data;
232 _fio_rados_rm_objects(td, rados);
233 _fio_rados_disconnect(rados);
234 free(rados->aio_events);
239 static void complete_callback(rados_completion_t cb, void *arg)
241 struct fio_rados_iou *fri = (struct fio_rados_iou *)arg;
242 struct rados_data *rados = fri->td->io_ops_data;
243 assert(fri->completion);
244 assert(rados_aio_is_complete(fri->completion));
245 pthread_mutex_lock(&rados->completed_lock);
246 flist_add_tail(&fri->list, &rados->completed_operations);
247 pthread_mutex_unlock(&rados->completed_lock);
248 pthread_cond_signal(&rados->completed_more_io);
251 static enum fio_q_status fio_rados_queue(struct thread_data *td,
254 struct rados_data *rados = td->io_ops_data;
255 struct fio_rados_iou *fri = io_u->engine_data;
256 char *object = io_u->file->file_name;
259 fio_ro_check(td, io_u);
261 if (io_u->ddir == DDIR_WRITE) {
262 r = rados_aio_create_completion(fri, complete_callback,
263 NULL, &fri->completion);
265 log_err("rados_aio_create_completion failed.\n");
269 r = rados_aio_write(rados->io_ctx, object, fri->completion,
270 io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
272 log_err("rados_write failed.\n");
276 } else if (io_u->ddir == DDIR_READ) {
277 r = rados_aio_create_completion(fri, complete_callback,
278 NULL, &fri->completion);
280 log_err("rados_aio_create_completion failed.\n");
283 r = rados_aio_read(rados->io_ctx, object, fri->completion,
284 io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
286 log_err("rados_aio_read failed.\n");
290 } else if (io_u->ddir == DDIR_TRIM) {
291 r = rados_aio_create_completion(fri, complete_callback,
292 NULL , &fri->completion);
294 log_err("rados_aio_create_completion failed.\n");
297 fri->write_op = rados_create_write_op();
298 if (fri->write_op == NULL) {
299 log_err("rados_create_write_op failed.\n");
302 rados_write_op_zero(fri->write_op, io_u->offset,
304 r = rados_aio_write_op_operate(fri->write_op, rados->io_ctx,
305 fri->completion, object, NULL, 0);
307 log_err("rados_aio_write_op_operate failed.\n");
308 goto failed_write_op;
313 log_err("WARNING: Only DDIR_READ, DDIR_WRITE and DDIR_TRIM are supported!");
316 rados_release_write_op(fri->write_op);
318 rados_aio_release(fri->completion);
321 td_verror(td, io_u->error, "xfer");
322 return FIO_Q_COMPLETED;
325 static struct io_u *fio_rados_event(struct thread_data *td, int event)
327 struct rados_data *rados = td->io_ops_data;
328 return rados->aio_events[event];
331 int fio_rados_getevents(struct thread_data *td, unsigned int min,
332 unsigned int max, const struct timespec *t)
334 struct rados_data *rados = td->io_ops_data;
335 unsigned int events = 0;
336 struct fio_rados_iou *fri;
338 pthread_mutex_lock(&rados->completed_lock);
339 while (events < min) {
340 while (flist_empty(&rados->completed_operations)) {
341 pthread_cond_wait(&rados->completed_more_io, &rados->completed_lock);
343 assert(!flist_empty(&rados->completed_operations));
345 fri = flist_first_entry(&rados->completed_operations, struct fio_rados_iou, list);
346 assert(fri->completion);
347 assert(rados_aio_is_complete(fri->completion));
348 if (fri->write_op != NULL) {
349 rados_release_write_op(fri->write_op);
350 fri->write_op = NULL;
352 rados_aio_release(fri->completion);
353 fri->completion = NULL;
355 rados->aio_events[events] = fri->io_u;
357 flist_del(&fri->list);
358 if (events >= max) break;
360 pthread_mutex_unlock(&rados->completed_lock);
364 static int fio_rados_setup(struct thread_data *td)
366 struct rados_data *rados = NULL;
368 /* allocate engine specific structure to deal with librados. */
369 r = _fio_setup_rados_data(td, &rados);
371 log_err("fio_setup_rados_data failed.\n");
374 td->io_ops_data = rados;
376 /* Force single process mode.
378 td->o.use_thread = 1;
380 /* connect in the main thread to determine to determine
381 * the size of the given RADOS block device. And disconnect
384 r = _fio_rados_connect(td);
386 log_err("fio_rados_connect failed.\n");
389 rados->connected = true;
393 fio_rados_cleanup(td);
397 /* open/invalidate are noops. we set the FIO_DISKLESSIO flag in ioengine_ops to
398 prevent fio from creating the files
400 static int fio_rados_open(struct thread_data *td, struct fio_file *f)
404 static int fio_rados_invalidate(struct thread_data *td, struct fio_file *f)
409 static void fio_rados_io_u_free(struct thread_data *td, struct io_u *io_u)
411 struct fio_rados_iou *fri = io_u->engine_data;
414 io_u->engine_data = NULL;
417 rados_aio_release(fri->completion);
419 rados_release_write_op(fri->write_op);
424 static int fio_rados_io_u_init(struct thread_data *td, struct io_u *io_u)
426 struct fio_rados_iou *fri;
427 fri = calloc(1, sizeof(*fri));
430 INIT_FLIST_HEAD(&fri->list);
431 io_u->engine_data = fri;
435 /* ioengine_ops for get_ioengine() */
436 static struct ioengine_ops ioengine = {
438 .version = FIO_IOOPS_VERSION,
439 .flags = FIO_DISKLESSIO,
440 .setup = fio_rados_setup,
441 .queue = fio_rados_queue,
442 .getevents = fio_rados_getevents,
443 .event = fio_rados_event,
444 .cleanup = fio_rados_cleanup,
445 .open_file = fio_rados_open,
446 .invalidate = fio_rados_invalidate,
448 .io_u_init = fio_rados_io_u_init,
449 .io_u_free = fio_rados_io_u_free,
450 .option_struct_size = sizeof(struct rados_options),
453 static void fio_init fio_rados_register(void)
455 register_ioengine(&ioengine);
458 static void fio_exit fio_rados_unregister(void)
460 unregister_ioengine(&ioengine);