HOWTO: update to include iolog replay
[fio.git] / engines / rbd.c
1 /*
2  * rbd engine
3  *
4  * IO engine using Ceph's librbd to test RADOS Block Devices.
5  *
6  */
7
8 #include <rbd/librbd.h>
9
10 #include "../fio.h"
11 #include "../optgroup.h"
12 #ifdef CONFIG_RBD_BLKIN
13 #include <zipkin_c.h>
14 #endif
15
16 struct fio_rbd_iou {
17         struct io_u *io_u;
18         rbd_completion_t completion;
19         int io_seen;
20         int io_complete;
21 #ifdef CONFIG_RBD_BLKIN
22         struct blkin_trace_info info;
23 #endif
24 };
25
26 struct rbd_data {
27         rados_t cluster;
28         rados_ioctx_t io_ctx;
29         rbd_image_t image;
30         struct io_u **aio_events;
31         struct io_u **sort_events;
32 };
33
34 struct rbd_options {
35         void *pad;
36         char *cluster_name;
37         char *rbd_name;
38         char *pool_name;
39         char *client_name;
40         int busy_poll;
41 };
42
43 static struct fio_option options[] = {
44         {
45                 .name           = "clustername",
46                 .lname          = "ceph cluster name",
47                 .type           = FIO_OPT_STR_STORE,
48                 .help           = "Cluster name for ceph",
49                 .off1           = offsetof(struct rbd_options, cluster_name),
50                 .category       = FIO_OPT_C_ENGINE,
51                 .group          = FIO_OPT_G_RBD,
52         },
53         {
54                 .name           = "rbdname",
55                 .lname          = "rbd engine rbdname",
56                 .type           = FIO_OPT_STR_STORE,
57                 .help           = "RBD name for RBD engine",
58                 .off1           = offsetof(struct rbd_options, rbd_name),
59                 .category       = FIO_OPT_C_ENGINE,
60                 .group          = FIO_OPT_G_RBD,
61         },
62         {
63                 .name           = "pool",
64                 .lname          = "rbd engine pool",
65                 .type           = FIO_OPT_STR_STORE,
66                 .help           = "Name of the pool hosting the RBD for the RBD engine",
67                 .off1           = offsetof(struct rbd_options, pool_name),
68                 .category       = FIO_OPT_C_ENGINE,
69                 .group          = FIO_OPT_G_RBD,
70         },
71         {
72                 .name           = "clientname",
73                 .lname          = "rbd engine clientname",
74                 .type           = FIO_OPT_STR_STORE,
75                 .help           = "Name of the ceph client to access the RBD for the RBD engine",
76                 .off1           = offsetof(struct rbd_options, client_name),
77                 .category       = FIO_OPT_C_ENGINE,
78                 .group          = FIO_OPT_G_RBD,
79         },
80         {
81                 .name           = "busy_poll",
82                 .lname          = "Busy poll",
83                 .type           = FIO_OPT_BOOL,
84                 .help           = "Busy poll for completions instead of sleeping",
85                 .off1           = offsetof(struct rbd_options, busy_poll),
86                 .def            = "0",
87                 .category       = FIO_OPT_C_ENGINE,
88                 .group          = FIO_OPT_G_RBD,
89         },
90         {
91                 .name = NULL,
92         },
93 };
94
95 static int _fio_setup_rbd_data(struct thread_data *td,
96                                struct rbd_data **rbd_data_ptr)
97 {
98         struct rbd_data *rbd;
99
100         if (td->io_ops_data)
101                 return 0;
102
103         rbd = calloc(1, sizeof(struct rbd_data));
104         if (!rbd)
105                 goto failed;
106
107         rbd->aio_events = calloc(td->o.iodepth, sizeof(struct io_u *));
108         if (!rbd->aio_events)
109                 goto failed;
110
111         rbd->sort_events = calloc(td->o.iodepth, sizeof(struct io_u *));
112         if (!rbd->sort_events)
113                 goto failed;
114
115         *rbd_data_ptr = rbd;
116         return 0;
117
118 failed:
119         if (rbd) {
120                 if (rbd->aio_events) 
121                         free(rbd->aio_events);
122                 if (rbd->sort_events)
123                         free(rbd->sort_events);
124                 free(rbd);
125         }
126         return 1;
127
128 }
129
130 static int _fio_rbd_connect(struct thread_data *td)
131 {
132         struct rbd_data *rbd = td->io_ops_data;
133         struct rbd_options *o = td->eo;
134         int r;
135
136         if (o->cluster_name) {
137                 char *client_name = NULL; 
138
139                 /*
140                  * If we specify cluser name, the rados_create2
141                  * will not assume 'client.'. name is considered
142                  * as a full type.id namestr
143                  */
144                 if (o->client_name) {
145                         if (!index(o->client_name, '.')) {
146                                 client_name = calloc(1, strlen("client.") +
147                                                     strlen(o->client_name) + 1);
148                                 strcat(client_name, "client.");
149                                 strcat(client_name, o->client_name);
150                         } else {
151                                 client_name = o->client_name;
152                         }
153                 }
154
155                 r = rados_create2(&rbd->cluster, o->cluster_name,
156                                  client_name, 0);
157
158                 if (client_name && !index(o->client_name, '.'))
159                         free(client_name);
160         } else
161                 r = rados_create(&rbd->cluster, o->client_name);
162         
163         if (r < 0) {
164                 log_err("rados_create failed.\n");
165                 goto failed_early;
166         }
167
168         r = rados_conf_read_file(rbd->cluster, NULL);
169         if (r < 0) {
170                 log_err("rados_conf_read_file failed.\n");
171                 goto failed_early;
172         }
173
174         r = rados_connect(rbd->cluster);
175         if (r < 0) {
176                 log_err("rados_connect failed.\n");
177                 goto failed_shutdown;
178         }
179
180         r = rados_ioctx_create(rbd->cluster, o->pool_name, &rbd->io_ctx);
181         if (r < 0) {
182                 log_err("rados_ioctx_create failed.\n");
183                 goto failed_shutdown;
184         }
185
186         r = rbd_open(rbd->io_ctx, o->rbd_name, &rbd->image, NULL /*snap */ );
187         if (r < 0) {
188                 log_err("rbd_open failed.\n");
189                 goto failed_open;
190         }
191         return 0;
192
193 failed_open:
194         rados_ioctx_destroy(rbd->io_ctx);
195         rbd->io_ctx = NULL;
196 failed_shutdown:
197         rados_shutdown(rbd->cluster);
198         rbd->cluster = NULL;
199 failed_early:
200         return 1;
201 }
202
203 static void _fio_rbd_disconnect(struct rbd_data *rbd)
204 {
205         if (!rbd)
206                 return;
207
208         /* shutdown everything */
209         if (rbd->image) {
210                 rbd_close(rbd->image);
211                 rbd->image = NULL;
212         }
213
214         if (rbd->io_ctx) {
215                 rados_ioctx_destroy(rbd->io_ctx);
216                 rbd->io_ctx = NULL;
217         }
218
219         if (rbd->cluster) {
220                 rados_shutdown(rbd->cluster);
221                 rbd->cluster = NULL;
222         }
223 }
224
225 static void _fio_rbd_finish_aiocb(rbd_completion_t comp, void *data)
226 {
227         struct fio_rbd_iou *fri = data;
228         struct io_u *io_u = fri->io_u;
229         ssize_t ret;
230
231         /*
232          * Looks like return value is 0 for success, or < 0 for
233          * a specific error. So we have to assume that it can't do
234          * partial completions.
235          */
236         ret = rbd_aio_get_return_value(fri->completion);
237         if (ret < 0) {
238                 io_u->error = ret;
239                 io_u->resid = io_u->xfer_buflen;
240         } else
241                 io_u->error = 0;
242
243         fri->io_complete = 1;
244 }
245
246 static struct io_u *fio_rbd_event(struct thread_data *td, int event)
247 {
248         struct rbd_data *rbd = td->io_ops_data;
249
250         return rbd->aio_events[event];
251 }
252
253 static inline int fri_check_complete(struct rbd_data *rbd, struct io_u *io_u,
254                                      unsigned int *events)
255 {
256         struct fio_rbd_iou *fri = io_u->engine_data;
257
258         if (fri->io_complete) {
259                 fri->io_seen = 1;
260                 rbd->aio_events[*events] = io_u;
261                 (*events)++;
262
263                 rbd_aio_release(fri->completion);
264                 return 1;
265         }
266
267         return 0;
268 }
269
270 static inline int rbd_io_u_seen(struct io_u *io_u)
271 {
272         struct fio_rbd_iou *fri = io_u->engine_data;
273
274         return fri->io_seen;
275 }
276
277 static void rbd_io_u_wait_complete(struct io_u *io_u)
278 {
279         struct fio_rbd_iou *fri = io_u->engine_data;
280
281         rbd_aio_wait_for_complete(fri->completion);
282 }
283
284 static int rbd_io_u_cmp(const void *p1, const void *p2)
285 {
286         const struct io_u **a = (const struct io_u **) p1;
287         const struct io_u **b = (const struct io_u **) p2;
288         uint64_t at, bt;
289
290         at = utime_since_now(&(*a)->start_time);
291         bt = utime_since_now(&(*b)->start_time);
292
293         if (at < bt)
294                 return -1;
295         else if (at == bt)
296                 return 0;
297         else
298                 return 1;
299 }
300
301 static int rbd_iter_events(struct thread_data *td, unsigned int *events,
302                            unsigned int min_evts, int wait)
303 {
304         struct rbd_data *rbd = td->io_ops_data;
305         unsigned int this_events = 0;
306         struct io_u *io_u;
307         int i, sidx;
308
309         sidx = 0;
310         io_u_qiter(&td->io_u_all, io_u, i) {
311                 if (!(io_u->flags & IO_U_F_FLIGHT))
312                         continue;
313                 if (rbd_io_u_seen(io_u))
314                         continue;
315
316                 if (fri_check_complete(rbd, io_u, events))
317                         this_events++;
318                 else if (wait)
319                         rbd->sort_events[sidx++] = io_u;
320         }
321
322         if (!wait || !sidx)
323                 return this_events;
324
325         /*
326          * Sort events, oldest issue first, then wait on as many as we
327          * need in order of age. If we have enough events, stop waiting,
328          * and just check if any of the older ones are done.
329          */
330         if (sidx > 1)
331                 qsort(rbd->sort_events, sidx, sizeof(struct io_u *), rbd_io_u_cmp);
332
333         for (i = 0; i < sidx; i++) {
334                 io_u = rbd->sort_events[i];
335
336                 if (fri_check_complete(rbd, io_u, events)) {
337                         this_events++;
338                         continue;
339                 }
340
341                 /*
342                  * Stop waiting when we have enough, but continue checking
343                  * all pending IOs if they are complete.
344                  */
345                 if (*events >= min_evts)
346                         continue;
347
348                 rbd_io_u_wait_complete(io_u);
349
350                 if (fri_check_complete(rbd, io_u, events))
351                         this_events++;
352         }
353
354         return this_events;
355 }
356
357 static int fio_rbd_getevents(struct thread_data *td, unsigned int min,
358                              unsigned int max, const struct timespec *t)
359 {
360         unsigned int this_events, events = 0;
361         struct rbd_options *o = td->eo;
362         int wait = 0;
363
364         do {
365                 this_events = rbd_iter_events(td, &events, min, wait);
366
367                 if (events >= min)
368                         break;
369                 if (this_events)
370                         continue;
371
372                 if (!o->busy_poll)
373                         wait = 1;
374                 else
375                         nop;
376         } while (1);
377
378         return events;
379 }
380
381 static int fio_rbd_queue(struct thread_data *td, struct io_u *io_u)
382 {
383         struct rbd_data *rbd = td->io_ops_data;
384         struct fio_rbd_iou *fri = io_u->engine_data;
385         int r = -1;
386
387         fio_ro_check(td, io_u);
388
389         fri->io_seen = 0;
390         fri->io_complete = 0;
391
392         r = rbd_aio_create_completion(fri, _fio_rbd_finish_aiocb,
393                                                 &fri->completion);
394         if (r < 0) {
395                 log_err("rbd_aio_create_completion failed.\n");
396                 goto failed;
397         }
398
399         if (io_u->ddir == DDIR_WRITE) {
400 #ifdef CONFIG_RBD_BLKIN
401                 blkin_init_trace_info(&fri->info);
402                 r = rbd_aio_write_traced(rbd->image, io_u->offset, io_u->xfer_buflen,
403                                          io_u->xfer_buf, fri->completion, &fri->info);
404 #else
405                 r = rbd_aio_write(rbd->image, io_u->offset, io_u->xfer_buflen,
406                                          io_u->xfer_buf, fri->completion);
407 #endif
408                 if (r < 0) {
409                         log_err("rbd_aio_write failed.\n");
410                         goto failed_comp;
411                 }
412
413         } else if (io_u->ddir == DDIR_READ) {
414 #ifdef CONFIG_RBD_BLKIN
415                 blkin_init_trace_info(&fri->info);
416                 r = rbd_aio_read_traced(rbd->image, io_u->offset, io_u->xfer_buflen,
417                                         io_u->xfer_buf, fri->completion, &fri->info);
418 #else
419                 r = rbd_aio_read(rbd->image, io_u->offset, io_u->xfer_buflen,
420                                         io_u->xfer_buf, fri->completion);
421 #endif
422
423                 if (r < 0) {
424                         log_err("rbd_aio_read failed.\n");
425                         goto failed_comp;
426                 }
427         } else if (io_u->ddir == DDIR_TRIM) {
428                 r = rbd_aio_discard(rbd->image, io_u->offset,
429                                         io_u->xfer_buflen, fri->completion);
430                 if (r < 0) {
431                         log_err("rbd_aio_discard failed.\n");
432                         goto failed_comp;
433                 }
434         } else if (io_u->ddir == DDIR_SYNC) {
435                 r = rbd_aio_flush(rbd->image, fri->completion);
436                 if (r < 0) {
437                         log_err("rbd_flush failed.\n");
438                         goto failed_comp;
439                 }
440         } else {
441                 dprint(FD_IO, "%s: Warning: unhandled ddir: %d\n", __func__,
442                        io_u->ddir);
443                 goto failed_comp;
444         }
445
446         return FIO_Q_QUEUED;
447 failed_comp:
448         rbd_aio_release(fri->completion);
449 failed:
450         io_u->error = r;
451         td_verror(td, io_u->error, "xfer");
452         return FIO_Q_COMPLETED;
453 }
454
455 static int fio_rbd_init(struct thread_data *td)
456 {
457         int r;
458
459         r = _fio_rbd_connect(td);
460         if (r) {
461                 log_err("fio_rbd_connect failed, return code: %d .\n", r);
462                 goto failed;
463         }
464
465         return 0;
466
467 failed:
468         return 1;
469 }
470
471 static void fio_rbd_cleanup(struct thread_data *td)
472 {
473         struct rbd_data *rbd = td->io_ops_data;
474
475         if (rbd) {
476                 _fio_rbd_disconnect(rbd);
477                 free(rbd->aio_events);
478                 free(rbd->sort_events);
479                 free(rbd);
480         }
481 }
482
483 static int fio_rbd_setup(struct thread_data *td)
484 {
485         rbd_image_info_t info;
486         struct fio_file *f;
487         struct rbd_data *rbd = NULL;
488         int major, minor, extra;
489         int r;
490
491         /* log version of librbd. No cluster connection required. */
492         rbd_version(&major, &minor, &extra);
493         log_info("rbd engine: RBD version: %d.%d.%d\n", major, minor, extra);
494
495         /* allocate engine specific structure to deal with librbd. */
496         r = _fio_setup_rbd_data(td, &rbd);
497         if (r) {
498                 log_err("fio_setup_rbd_data failed.\n");
499                 goto cleanup;
500         }
501         td->io_ops_data = rbd;
502
503         /* librbd does not allow us to run first in the main thread and later
504          * in a fork child. It needs to be the same process context all the
505          * time. 
506          */
507         td->o.use_thread = 1;
508
509         /* connect in the main thread to determine to determine
510          * the size of the given RADOS block device. And disconnect
511          * later on.
512          */
513         r = _fio_rbd_connect(td);
514         if (r) {
515                 log_err("fio_rbd_connect failed.\n");
516                 goto cleanup;
517         }
518
519         /* get size of the RADOS block device */
520         r = rbd_stat(rbd->image, &info, sizeof(info));
521         if (r < 0) {
522                 log_err("rbd_status failed.\n");
523                 goto disconnect;
524         }
525         dprint(FD_IO, "rbd-engine: image size: %lu\n", info.size);
526
527         /* taken from "net" engine. Pretend we deal with files,
528          * even if we do not have any ideas about files.
529          * The size of the RBD is set instead of a artificial file.
530          */
531         if (!td->files_index) {
532                 add_file(td, td->o.filename ? : "rbd", 0, 0);
533                 td->o.nr_files = td->o.nr_files ? : 1;
534                 td->o.open_files++;
535         }
536         f = td->files[0];
537         f->real_file_size = info.size;
538
539         /* disconnect, then we were only connected to determine
540          * the size of the RBD.
541          */
542         _fio_rbd_disconnect(rbd);
543         return 0;
544
545 disconnect:
546         _fio_rbd_disconnect(rbd);
547 cleanup:
548         fio_rbd_cleanup(td);
549         return r;
550 }
551
552 static int fio_rbd_open(struct thread_data *td, struct fio_file *f)
553 {
554         return 0;
555 }
556
557 static int fio_rbd_invalidate(struct thread_data *td, struct fio_file *f)
558 {
559 #if defined(CONFIG_RBD_INVAL)
560         struct rbd_data *rbd = td->io_ops_data;
561
562         return rbd_invalidate_cache(rbd->image);
563 #else
564         return 0;
565 #endif
566 }
567
568 static void fio_rbd_io_u_free(struct thread_data *td, struct io_u *io_u)
569 {
570         struct fio_rbd_iou *fri = io_u->engine_data;
571
572         if (fri) {
573                 io_u->engine_data = NULL;
574                 free(fri);
575         }
576 }
577
578 static int fio_rbd_io_u_init(struct thread_data *td, struct io_u *io_u)
579 {
580         struct fio_rbd_iou *fri;
581
582         fri = calloc(1, sizeof(*fri));
583         fri->io_u = io_u;
584         io_u->engine_data = fri;
585         return 0;
586 }
587
588 static struct ioengine_ops ioengine = {
589         .name                   = "rbd",
590         .version                = FIO_IOOPS_VERSION,
591         .setup                  = fio_rbd_setup,
592         .init                   = fio_rbd_init,
593         .queue                  = fio_rbd_queue,
594         .getevents              = fio_rbd_getevents,
595         .event                  = fio_rbd_event,
596         .cleanup                = fio_rbd_cleanup,
597         .open_file              = fio_rbd_open,
598         .invalidate             = fio_rbd_invalidate,
599         .options                = options,
600         .io_u_init              = fio_rbd_io_u_init,
601         .io_u_free              = fio_rbd_io_u_free,
602         .option_struct_size     = sizeof(struct rbd_options),
603 };
604
605 static void fio_init fio_rbd_register(void)
606 {
607         register_ioengine(&ioengine);
608 }
609
610 static void fio_exit fio_rbd_unregister(void)
611 {
612         unregister_ioengine(&ioengine);
613 }