Add support for blkin tracing in rbd engine
[fio.git] / engines / rbd.c
CommitLineData
fc5c0345
DG
1/*
2 * rbd engine
3 *
4 * IO engine using Ceph's librbd to test RADOS Block Devices.
5 *
6 */
7
#include <errno.h>

#include <rbd/librbd.h>

#include "../fio.h"
#include "../optgroup.h"
#ifdef CONFIG_RBD_BLKIN
#include <zipkin_c.h>
#endif
fc5c0345
DG
15
16struct fio_rbd_iou {
17 struct io_u *io_u;
d8b64af2 18 rbd_completion_t completion;
d8b64af2 19 int io_seen;
b8ecbef6 20 int io_complete;
a4c4c346 21 struct blkin_trace_info info;
fc5c0345
DG
22};
23
24struct rbd_data {
25 rados_t cluster;
26 rados_ioctx_t io_ctx;
27 rbd_image_t image;
28 struct io_u **aio_events;
6f9961ac 29 struct io_u **sort_events;
fc5c0345
DG
30};
31
/*
 * Values of the engine-specific job options (see the options[] table).
 */
struct rbd_options {
	void *pad;		/* NOTE(review): presumably keeps real options off offset 0 - confirm */
	char *cluster_name;	/* "clustername" */
	char *rbd_name;		/* "rbdname" */
	char *pool_name;	/* "pool" */
	char *client_name;	/* "clientname" */
	int busy_poll;		/* "busy_poll" */
};
40
41static struct fio_option options[] = {
6e20c6e7
T
42 {
43 .name = "clustername",
44 .lname = "ceph cluster name",
45 .type = FIO_OPT_STR_STORE,
46 .help = "Cluster name for ceph",
47 .off1 = offsetof(struct rbd_options, cluster_name),
48 .category = FIO_OPT_C_ENGINE,
49 .group = FIO_OPT_G_RBD,
50 },
fc5c0345 51 {
d8b64af2
JA
52 .name = "rbdname",
53 .lname = "rbd engine rbdname",
54 .type = FIO_OPT_STR_STORE,
55 .help = "RBD name for RBD engine",
56 .off1 = offsetof(struct rbd_options, rbd_name),
57 .category = FIO_OPT_C_ENGINE,
58 .group = FIO_OPT_G_RBD,
59 },
fc5c0345 60 {
d7d702c7
JA
61 .name = "pool",
62 .lname = "rbd engine pool",
63 .type = FIO_OPT_STR_STORE,
64 .help = "Name of the pool hosting the RBD for the RBD engine",
65 .off1 = offsetof(struct rbd_options, pool_name),
66 .category = FIO_OPT_C_ENGINE,
67 .group = FIO_OPT_G_RBD,
d8b64af2 68 },
fc5c0345 69 {
d7d702c7
JA
70 .name = "clientname",
71 .lname = "rbd engine clientname",
72 .type = FIO_OPT_STR_STORE,
73 .help = "Name of the ceph client to access the RBD for the RBD engine",
74 .off1 = offsetof(struct rbd_options, client_name),
75 .category = FIO_OPT_C_ENGINE,
76 .group = FIO_OPT_G_RBD,
77 },
78 {
79 .name = "busy_poll",
80 .lname = "Busy poll",
81 .type = FIO_OPT_BOOL,
82 .help = "Busy poll for completions instead of sleeping",
fea585d4 83 .off1 = offsetof(struct rbd_options, busy_poll),
d7d702c7
JA
84 .def = "0",
85 .category = FIO_OPT_C_ENGINE,
86 .group = FIO_OPT_G_RBD,
d8b64af2 87 },
fc5c0345 88 {
d8b64af2
JA
89 .name = NULL,
90 },
fc5c0345
DG
91};
92
93static int _fio_setup_rbd_data(struct thread_data *td,
94 struct rbd_data **rbd_data_ptr)
95{
6f9961ac 96 struct rbd_data *rbd;
fc5c0345
DG
97
98 if (td->io_ops->data)
99 return 0;
100
6f9961ac
JA
101 rbd = calloc(1, sizeof(struct rbd_data));
102 if (!rbd)
fc5c0345
DG
103 goto failed;
104
6f9961ac
JA
105 rbd->aio_events = calloc(td->o.iodepth, sizeof(struct io_u *));
106 if (!rbd->aio_events)
fc5c0345
DG
107 goto failed;
108
6f9961ac
JA
109 rbd->sort_events = calloc(td->o.iodepth, sizeof(struct io_u *));
110 if (!rbd->sort_events)
111 goto failed;
fc5c0345 112
6f9961ac 113 *rbd_data_ptr = rbd;
fc5c0345
DG
114 return 0;
115
116failed:
6f9961ac
JA
117 if (rbd)
118 free(rbd);
fc5c0345
DG
119 return 1;
120
121}
122
123static int _fio_rbd_connect(struct thread_data *td)
124{
6f9961ac 125 struct rbd_data *rbd = td->io_ops->data;
fc5c0345
DG
126 struct rbd_options *o = td->eo;
127 int r;
128
6e20c6e7
T
129 if (o->cluster_name) {
130 char *client_name = NULL;
131
132 /*
133 * If we specify cluser name, the rados_creat2
134 * will not assume 'client.'. name is considered
135 * as a full type.id namestr
136 */
137 if (!index(o->client_name, '.')) {
138 client_name = calloc(1, strlen("client.") +
139 strlen(o->client_name) + 1);
140 strcat(client_name, "client.");
141 o->client_name = strcat(client_name, o->client_name);
142 }
143 r = rados_create2(&rbd->cluster, o->cluster_name,
144 o->client_name, 0);
145 } else
146 r = rados_create(&rbd->cluster, o->client_name);
147
fc5c0345
DG
148 if (r < 0) {
149 log_err("rados_create failed.\n");
150 goto failed_early;
151 }
152
6f9961ac 153 r = rados_conf_read_file(rbd->cluster, NULL);
fc5c0345
DG
154 if (r < 0) {
155 log_err("rados_conf_read_file failed.\n");
156 goto failed_early;
157 }
158
6f9961ac 159 r = rados_connect(rbd->cluster);
fc5c0345
DG
160 if (r < 0) {
161 log_err("rados_connect failed.\n");
162 goto failed_shutdown;
163 }
164
6f9961ac 165 r = rados_ioctx_create(rbd->cluster, o->pool_name, &rbd->io_ctx);
fc5c0345
DG
166 if (r < 0) {
167 log_err("rados_ioctx_create failed.\n");
168 goto failed_shutdown;
169 }
170
6f9961ac 171 r = rbd_open(rbd->io_ctx, o->rbd_name, &rbd->image, NULL /*snap */ );
fc5c0345
DG
172 if (r < 0) {
173 log_err("rbd_open failed.\n");
174 goto failed_open;
175 }
176 return 0;
177
178failed_open:
6f9961ac
JA
179 rados_ioctx_destroy(rbd->io_ctx);
180 rbd->io_ctx = NULL;
fc5c0345 181failed_shutdown:
6f9961ac
JA
182 rados_shutdown(rbd->cluster);
183 rbd->cluster = NULL;
fc5c0345
DG
184failed_early:
185 return 1;
186}
187
6f9961ac 188static void _fio_rbd_disconnect(struct rbd_data *rbd)
fc5c0345 189{
6f9961ac 190 if (!rbd)
fc5c0345
DG
191 return;
192
193 /* shutdown everything */
6f9961ac
JA
194 if (rbd->image) {
195 rbd_close(rbd->image);
196 rbd->image = NULL;
fc5c0345
DG
197 }
198
6f9961ac
JA
199 if (rbd->io_ctx) {
200 rados_ioctx_destroy(rbd->io_ctx);
201 rbd->io_ctx = NULL;
fc5c0345
DG
202 }
203
6f9961ac
JA
204 if (rbd->cluster) {
205 rados_shutdown(rbd->cluster);
206 rbd->cluster = NULL;
fc5c0345
DG
207 }
208}
209
d8b64af2 210static void _fio_rbd_finish_aiocb(rbd_completion_t comp, void *data)
fc5c0345 211{
dbf388d2
JA
212 struct fio_rbd_iou *fri = data;
213 struct io_u *io_u = fri->io_u;
d8b64af2 214 ssize_t ret;
fc5c0345 215
d8b64af2
JA
216 /*
217 * Looks like return value is 0 for success, or < 0 for
218 * a specific error. So we have to assume that it can't do
219 * partial completions.
220 */
221 ret = rbd_aio_get_return_value(fri->completion);
222 if (ret < 0) {
223 io_u->error = ret;
224 io_u->resid = io_u->xfer_buflen;
225 } else
226 io_u->error = 0;
20cf5aab
JD
227
228 fri->io_complete = 1;
d8b64af2 229}
fc5c0345 230
d8b64af2
JA
231static struct io_u *fio_rbd_event(struct thread_data *td, int event)
232{
6f9961ac 233 struct rbd_data *rbd = td->io_ops->data;
fc5c0345 234
6f9961ac 235 return rbd->aio_events[event];
fc5c0345
DG
236}
237
6f9961ac 238static inline int fri_check_complete(struct rbd_data *rbd, struct io_u *io_u,
d8b64af2 239 unsigned int *events)
fc5c0345 240{
d8b64af2 241 struct fio_rbd_iou *fri = io_u->engine_data;
fc5c0345 242
b8ecbef6 243 if (fri->io_complete) {
d8b64af2 244 fri->io_seen = 1;
6f9961ac 245 rbd->aio_events[*events] = io_u;
d8b64af2 246 (*events)++;
fc5c0345 247
d8b64af2
JA
248 rbd_aio_release(fri->completion);
249 return 1;
250 }
fc5c0345 251
d8b64af2 252 return 0;
fc5c0345
DG
253}
254
6f9961ac
JA
255static inline int rbd_io_u_seen(struct io_u *io_u)
256{
257 struct fio_rbd_iou *fri = io_u->engine_data;
258
259 return fri->io_seen;
260}
261
262static void rbd_io_u_wait_complete(struct io_u *io_u)
263{
264 struct fio_rbd_iou *fri = io_u->engine_data;
265
266 rbd_aio_wait_for_complete(fri->completion);
267}
268
269static int rbd_io_u_cmp(const void *p1, const void *p2)
270{
271 const struct io_u **a = (const struct io_u **) p1;
272 const struct io_u **b = (const struct io_u **) p2;
273 uint64_t at, bt;
274
275 at = utime_since_now(&(*a)->start_time);
276 bt = utime_since_now(&(*b)->start_time);
277
278 if (at < bt)
279 return -1;
280 else if (at == bt)
281 return 0;
282 else
283 return 1;
284}
285
d8b64af2
JA
286static int rbd_iter_events(struct thread_data *td, unsigned int *events,
287 unsigned int min_evts, int wait)
82340a9f 288{
6f9961ac 289 struct rbd_data *rbd = td->io_ops->data;
d8b64af2
JA
290 unsigned int this_events = 0;
291 struct io_u *io_u;
6f9961ac 292 int i, sidx;
82340a9f 293
6f9961ac 294 sidx = 0;
d8b64af2 295 io_u_qiter(&td->io_u_all, io_u, i) {
d8b64af2
JA
296 if (!(io_u->flags & IO_U_F_FLIGHT))
297 continue;
6f9961ac 298 if (rbd_io_u_seen(io_u))
d8b64af2 299 continue;
82340a9f 300
6f9961ac 301 if (fri_check_complete(rbd, io_u, events))
d8b64af2 302 this_events++;
6f9961ac
JA
303 else if (wait)
304 rbd->sort_events[sidx++] = io_u;
305 }
82340a9f 306
6f9961ac
JA
307 if (!wait || !sidx)
308 return this_events;
309
310 /*
311 * Sort events, oldest issue first, then wait on as many as we
312 * need in order of age. If we have enough events, stop waiting,
313 * and just check if any of the older ones are done.
314 */
315 if (sidx > 1)
316 qsort(rbd->sort_events, sidx, sizeof(struct io_u *), rbd_io_u_cmp);
317
318 for (i = 0; i < sidx; i++) {
319 io_u = rbd->sort_events[i];
320
321 if (fri_check_complete(rbd, io_u, events)) {
322 this_events++;
323 continue;
d8b64af2 324 }
6f9961ac
JA
325
326 /*
327 * Stop waiting when we have enough, but continue checking
328 * all pending IOs if they are complete.
329 */
d8b64af2 330 if (*events >= min_evts)
6f9961ac
JA
331 continue;
332
333 rbd_io_u_wait_complete(io_u);
334
335 if (fri_check_complete(rbd, io_u, events))
336 this_events++;
d8b64af2 337 }
fc5c0345 338
d8b64af2 339 return this_events;
fc5c0345
DG
340}
341
342static int fio_rbd_getevents(struct thread_data *td, unsigned int min,
1f440ece 343 unsigned int max, const struct timespec *t)
fc5c0345 344{
d8b64af2 345 unsigned int this_events, events = 0;
d7d702c7 346 struct rbd_options *o = td->eo;
d8b64af2 347 int wait = 0;
fc5c0345
DG
348
349 do {
d8b64af2 350 this_events = rbd_iter_events(td, &events, min, wait);
fc5c0345 351
d8b64af2 352 if (events >= min)
fc5c0345 353 break;
d8b64af2
JA
354 if (this_events)
355 continue;
fc5c0345 356
d7d702c7
JA
357 if (!o->busy_poll)
358 wait = 1;
359 else
360 nop;
fc5c0345
DG
361 } while (1);
362
363 return events;
364}
365
366static int fio_rbd_queue(struct thread_data *td, struct io_u *io_u)
367{
6f9961ac 368 struct rbd_data *rbd = td->io_ops->data;
d8b64af2
JA
369 struct fio_rbd_iou *fri = io_u->engine_data;
370 int r = -1;
fc5c0345
DG
371
372 fio_ro_check(td, io_u);
373
d8b64af2 374 fri->io_seen = 0;
b8ecbef6 375 fri->io_complete = 0;
d8b64af2 376
dbf388d2 377 r = rbd_aio_create_completion(fri, _fio_rbd_finish_aiocb,
d8b64af2 378 &fri->completion);
dbf388d2
JA
379 if (r < 0) {
380 log_err("rbd_aio_create_completion failed.\n");
381 goto failed;
382 }
fc5c0345 383
dbf388d2 384 if (io_u->ddir == DDIR_WRITE) {
a4c4c346 385#ifdef CONFIG_RBD_BLKIN
386 blkin_init_trace_info(&fri->info);
387 r = rbd_aio_write_traced(rbd->image, io_u->offset, io_u->xfer_buflen,
388 io_u->xfer_buf, fri->completion, &fri->info);
389#else
6f9961ac
JA
390 r = rbd_aio_write(rbd->image, io_u->offset, io_u->xfer_buflen,
391 io_u->xfer_buf, fri->completion);
a4c4c346 392#endif
fc5c0345
DG
393 if (r < 0) {
394 log_err("rbd_aio_write failed.\n");
dbf388d2 395 goto failed_comp;
fc5c0345
DG
396 }
397
398 } else if (io_u->ddir == DDIR_READ) {
a4c4c346 399#ifdef CONFIG_RBD_BLKIN
400 blkin_init_trace_info(&fri->info);
401 r = rbd_aio_read_traced(rbd->image, io_u->offset, io_u->xfer_buflen,
402 io_u->xfer_buf, fri->completion, &fri->info);
403#else
6f9961ac
JA
404 r = rbd_aio_read(rbd->image, io_u->offset, io_u->xfer_buflen,
405 io_u->xfer_buf, fri->completion);
a4c4c346 406#endif
fc5c0345
DG
407
408 if (r < 0) {
409 log_err("rbd_aio_read failed.\n");
dbf388d2 410 goto failed_comp;
fc5c0345 411 }
dbf388d2 412 } else if (io_u->ddir == DDIR_TRIM) {
6f9961ac
JA
413 r = rbd_aio_discard(rbd->image, io_u->offset,
414 io_u->xfer_buflen, fri->completion);
82340a9f 415 if (r < 0) {
dbf388d2
JA
416 log_err("rbd_aio_discard failed.\n");
417 goto failed_comp;
82340a9f 418 }
dbf388d2 419 } else if (io_u->ddir == DDIR_SYNC) {
6f9961ac 420 r = rbd_aio_flush(rbd->image, fri->completion);
fc5c0345
DG
421 if (r < 0) {
422 log_err("rbd_flush failed.\n");
dbf388d2 423 goto failed_comp;
fc5c0345 424 }
fc5c0345
DG
425 } else {
426 dprint(FD_IO, "%s: Warning: unhandled ddir: %d\n", __func__,
427 io_u->ddir);
dbf388d2 428 goto failed_comp;
fc5c0345
DG
429 }
430
431 return FIO_Q_QUEUED;
dbf388d2
JA
432failed_comp:
433 rbd_aio_release(fri->completion);
fc5c0345
DG
434failed:
435 io_u->error = r;
436 td_verror(td, io_u->error, "xfer");
437 return FIO_Q_COMPLETED;
438}
439
/*
 * init hook: connect to the cluster and open the image.
 * Returns 0 on success, 1 if the connection could not be set up.
 */
static int fio_rbd_init(struct thread_data *td)
{
	int r = _fio_rbd_connect(td);

	if (!r)
		return 0;

	log_err("fio_rbd_connect failed, return code: %d .\n", r);
	return 1;
}
455
456static void fio_rbd_cleanup(struct thread_data *td)
457{
6f9961ac 458 struct rbd_data *rbd = td->io_ops->data;
fc5c0345 459
6f9961ac
JA
460 if (rbd) {
461 _fio_rbd_disconnect(rbd);
462 free(rbd->aio_events);
463 free(rbd->sort_events);
464 free(rbd);
fc5c0345 465 }
fc5c0345
DG
466}
467
468static int fio_rbd_setup(struct thread_data *td)
469{
fc5c0345
DG
470 rbd_image_info_t info;
471 struct fio_file *f;
6f9961ac 472 struct rbd_data *rbd = NULL;
fc5c0345 473 int major, minor, extra;
6f9961ac 474 int r;
fc5c0345
DG
475
476 /* log version of librbd. No cluster connection required. */
477 rbd_version(&major, &minor, &extra);
478 log_info("rbd engine: RBD version: %d.%d.%d\n", major, minor, extra);
479
480 /* allocate engine specific structure to deal with librbd. */
6f9961ac 481 r = _fio_setup_rbd_data(td, &rbd);
fc5c0345
DG
482 if (r) {
483 log_err("fio_setup_rbd_data failed.\n");
484 goto cleanup;
485 }
6f9961ac 486 td->io_ops->data = rbd;
fc5c0345 487
d8b64af2
JA
488 /* librbd does not allow us to run first in the main thread and later
489 * in a fork child. It needs to be the same process context all the
490 * time.
fc5c0345
DG
491 */
492 td->o.use_thread = 1;
493
494 /* connect in the main thread to determine to determine
495 * the size of the given RADOS block device. And disconnect
496 * later on.
497 */
498 r = _fio_rbd_connect(td);
499 if (r) {
500 log_err("fio_rbd_connect failed.\n");
501 goto cleanup;
502 }
503
504 /* get size of the RADOS block device */
6f9961ac 505 r = rbd_stat(rbd->image, &info, sizeof(info));
fc5c0345
DG
506 if (r < 0) {
507 log_err("rbd_status failed.\n");
508 goto disconnect;
509 }
510 dprint(FD_IO, "rbd-engine: image size: %lu\n", info.size);
511
512 /* taken from "net" engine. Pretend we deal with files,
513 * even if we do not have any ideas about files.
514 * The size of the RBD is set instead of a artificial file.
515 */
516 if (!td->files_index) {
5903e7b7 517 add_file(td, td->o.filename ? : "rbd", 0, 0);
fc5c0345 518 td->o.nr_files = td->o.nr_files ? : 1;
b53f2c54 519 td->o.open_files++;
fc5c0345
DG
520 }
521 f = td->files[0];
522 f->real_file_size = info.size;
523
524 /* disconnect, then we were only connected to determine
525 * the size of the RBD.
526 */
6f9961ac 527 _fio_rbd_disconnect(rbd);
fc5c0345
DG
528 return 0;
529
530disconnect:
6f9961ac 531 _fio_rbd_disconnect(rbd);
fc5c0345
DG
532cleanup:
533 fio_rbd_cleanup(td);
534 return r;
535}
536
/*
 * open_file hook: nothing to do per file, always succeeds. The image
 * itself is opened by _fio_rbd_connect().
 */
static int fio_rbd_open(struct thread_data *td, struct fio_file *f)
{
	(void) td;
	(void) f;

	return 0;
}
541
d9b100fc
JA
/*
 * invalidate hook: returns the result of rbd_invalidate_cache() on the
 * open image when built with CONFIG_RBD_INVAL; otherwise does nothing and
 * returns 0.
 */
static int fio_rbd_invalidate(struct thread_data *td, struct fio_file *f)
{
#if !defined(CONFIG_RBD_INVAL)
	return 0;
#else
	struct rbd_data *rbd = td->io_ops->data;

	return rbd_invalidate_cache(rbd->image);
#endif
}
552
fc5c0345
DG
553static void fio_rbd_io_u_free(struct thread_data *td, struct io_u *io_u)
554{
d8b64af2 555 struct fio_rbd_iou *fri = io_u->engine_data;
fc5c0345 556
d8b64af2 557 if (fri) {
fc5c0345 558 io_u->engine_data = NULL;
d8b64af2 559 free(fri);
fc5c0345
DG
560 }
561}
562
563static int fio_rbd_io_u_init(struct thread_data *td, struct io_u *io_u)
564{
d8b64af2 565 struct fio_rbd_iou *fri;
fc5c0345 566
d8b64af2
JA
567 fri = calloc(1, sizeof(*fri));
568 fri->io_u = io_u;
569 io_u->engine_data = fri;
fc5c0345
DG
570 return 0;
571}
572
10aa136b 573static struct ioengine_ops ioengine = {
d9b100fc
JA
574 .name = "rbd",
575 .version = FIO_IOOPS_VERSION,
576 .setup = fio_rbd_setup,
577 .init = fio_rbd_init,
578 .queue = fio_rbd_queue,
579 .getevents = fio_rbd_getevents,
580 .event = fio_rbd_event,
581 .cleanup = fio_rbd_cleanup,
582 .open_file = fio_rbd_open,
583 .invalidate = fio_rbd_invalidate,
584 .options = options,
585 .io_u_init = fio_rbd_io_u_init,
586 .io_u_free = fio_rbd_io_u_free,
587 .option_struct_size = sizeof(struct rbd_options),
fc5c0345
DG
588};
589
590static void fio_init fio_rbd_register(void)
591{
592 register_ioengine(&ioengine);
593}
594
595static void fio_exit fio_rbd_unregister(void)
596{
597 unregister_ioengine(&ioengine);
598}