87ed360f7c26a25fb401aac833b9fe05d0c288f4
[fio.git] / engines / rbd.c
1 /*
2  * rbd engine
3  *
4  * IO engine using Ceph's librbd to test RADOS Block Devices.
5  *
6  */
7
8 #include <rbd/librbd.h>
9
10 #include "../fio.h"
11 #include "../optgroup.h"
12
13 struct fio_rbd_iou {
14         struct io_u *io_u;
15         rbd_completion_t completion;
16         int io_seen;
17         int io_complete;
18 };
19
20 struct rbd_data {
21         rados_t cluster;
22         rados_ioctx_t io_ctx;
23         rbd_image_t image;
24         struct io_u **aio_events;
25         struct io_u **sort_events;
26 };
27
28 struct rbd_options {
29         void *pad;
30         char *cluster_name;
31         char *rbd_name;
32         char *pool_name;
33         char *client_name;
34         int busy_poll;
35 };
36
37 static struct fio_option options[] = {
38         {
39                 .name           = "clustername",
40                 .lname          = "ceph cluster name",
41                 .type           = FIO_OPT_STR_STORE,
42                 .help           = "Cluster name for ceph",
43                 .off1           = offsetof(struct rbd_options, cluster_name),
44                 .category       = FIO_OPT_C_ENGINE,
45                 .group          = FIO_OPT_G_RBD,
46         },
47         {
48                 .name           = "rbdname",
49                 .lname          = "rbd engine rbdname",
50                 .type           = FIO_OPT_STR_STORE,
51                 .help           = "RBD name for RBD engine",
52                 .off1           = offsetof(struct rbd_options, rbd_name),
53                 .category       = FIO_OPT_C_ENGINE,
54                 .group          = FIO_OPT_G_RBD,
55         },
56         {
57                 .name           = "pool",
58                 .lname          = "rbd engine pool",
59                 .type           = FIO_OPT_STR_STORE,
60                 .help           = "Name of the pool hosting the RBD for the RBD engine",
61                 .off1           = offsetof(struct rbd_options, pool_name),
62                 .category       = FIO_OPT_C_ENGINE,
63                 .group          = FIO_OPT_G_RBD,
64         },
65         {
66                 .name           = "clientname",
67                 .lname          = "rbd engine clientname",
68                 .type           = FIO_OPT_STR_STORE,
69                 .help           = "Name of the ceph client to access the RBD for the RBD engine",
70                 .off1           = offsetof(struct rbd_options, client_name),
71                 .category       = FIO_OPT_C_ENGINE,
72                 .group          = FIO_OPT_G_RBD,
73         },
74         {
75                 .name           = "busy_poll",
76                 .lname          = "Busy poll",
77                 .type           = FIO_OPT_BOOL,
78                 .help           = "Busy poll for completions instead of sleeping",
79                 .off1           = offsetof(struct rbd_options, busy_poll),
80                 .def            = "0",
81                 .category       = FIO_OPT_C_ENGINE,
82                 .group          = FIO_OPT_G_RBD,
83         },
84         {
85                 .name = NULL,
86         },
87 };
88
89 static int _fio_setup_rbd_data(struct thread_data *td,
90                                struct rbd_data **rbd_data_ptr)
91 {
92         struct rbd_data *rbd;
93
94         if (td->io_ops->data)
95                 return 0;
96
97         rbd = calloc(1, sizeof(struct rbd_data));
98         if (!rbd)
99                 goto failed;
100
101         rbd->aio_events = calloc(td->o.iodepth, sizeof(struct io_u *));
102         if (!rbd->aio_events)
103                 goto failed;
104
105         rbd->sort_events = calloc(td->o.iodepth, sizeof(struct io_u *));
106         if (!rbd->sort_events)
107                 goto failed;
108
109         *rbd_data_ptr = rbd;
110         return 0;
111
112 failed:
113         if (rbd)
114                 free(rbd);
115         return 1;
116
117 }
118
119 static int _fio_rbd_connect(struct thread_data *td)
120 {
121         struct rbd_data *rbd = td->io_ops->data;
122         struct rbd_options *o = td->eo;
123         int r;
124
125         if (o->cluster_name) {
126                 char *client_name = NULL; 
127
128                 /*
129                  * If we specify cluser name, the rados_creat2
130                  * will not assume 'client.'. name is considered
131                  * as a full type.id namestr
132                  */
133                 if (!index(o->client_name, '.')) {
134                         client_name = calloc(1, strlen("client.") +
135                                                 strlen(o->client_name) + 1);
136                         strcat(client_name, "client.");
137                         o->client_name = strcat(client_name, o->client_name);
138                 }
139                 r = rados_create2(&rbd->cluster, o->cluster_name,
140                                         o->client_name, 0);
141         } else
142                 r = rados_create(&rbd->cluster, o->client_name);
143         
144         if (r < 0) {
145                 log_err("rados_create failed.\n");
146                 goto failed_early;
147         }
148
149         r = rados_conf_read_file(rbd->cluster, NULL);
150         if (r < 0) {
151                 log_err("rados_conf_read_file failed.\n");
152                 goto failed_early;
153         }
154
155         r = rados_connect(rbd->cluster);
156         if (r < 0) {
157                 log_err("rados_connect failed.\n");
158                 goto failed_shutdown;
159         }
160
161         r = rados_ioctx_create(rbd->cluster, o->pool_name, &rbd->io_ctx);
162         if (r < 0) {
163                 log_err("rados_ioctx_create failed.\n");
164                 goto failed_shutdown;
165         }
166
167         r = rbd_open(rbd->io_ctx, o->rbd_name, &rbd->image, NULL /*snap */ );
168         if (r < 0) {
169                 log_err("rbd_open failed.\n");
170                 goto failed_open;
171         }
172         return 0;
173
174 failed_open:
175         rados_ioctx_destroy(rbd->io_ctx);
176         rbd->io_ctx = NULL;
177 failed_shutdown:
178         rados_shutdown(rbd->cluster);
179         rbd->cluster = NULL;
180 failed_early:
181         return 1;
182 }
183
184 static void _fio_rbd_disconnect(struct rbd_data *rbd)
185 {
186         if (!rbd)
187                 return;
188
189         /* shutdown everything */
190         if (rbd->image) {
191                 rbd_close(rbd->image);
192                 rbd->image = NULL;
193         }
194
195         if (rbd->io_ctx) {
196                 rados_ioctx_destroy(rbd->io_ctx);
197                 rbd->io_ctx = NULL;
198         }
199
200         if (rbd->cluster) {
201                 rados_shutdown(rbd->cluster);
202                 rbd->cluster = NULL;
203         }
204 }
205
206 static void _fio_rbd_finish_aiocb(rbd_completion_t comp, void *data)
207 {
208         struct fio_rbd_iou *fri = data;
209         struct io_u *io_u = fri->io_u;
210         ssize_t ret;
211
212         /*
213          * Looks like return value is 0 for success, or < 0 for
214          * a specific error. So we have to assume that it can't do
215          * partial completions.
216          */
217         ret = rbd_aio_get_return_value(fri->completion);
218         if (ret < 0) {
219                 io_u->error = ret;
220                 io_u->resid = io_u->xfer_buflen;
221         } else
222                 io_u->error = 0;
223
224         fri->io_complete = 1;
225 }
226
227 static struct io_u *fio_rbd_event(struct thread_data *td, int event)
228 {
229         struct rbd_data *rbd = td->io_ops->data;
230
231         return rbd->aio_events[event];
232 }
233
234 static inline int fri_check_complete(struct rbd_data *rbd, struct io_u *io_u,
235                                      unsigned int *events)
236 {
237         struct fio_rbd_iou *fri = io_u->engine_data;
238
239         if (fri->io_complete) {
240                 fri->io_seen = 1;
241                 rbd->aio_events[*events] = io_u;
242                 (*events)++;
243
244                 rbd_aio_release(fri->completion);
245                 return 1;
246         }
247
248         return 0;
249 }
250
251 static inline int rbd_io_u_seen(struct io_u *io_u)
252 {
253         struct fio_rbd_iou *fri = io_u->engine_data;
254
255         return fri->io_seen;
256 }
257
258 static void rbd_io_u_wait_complete(struct io_u *io_u)
259 {
260         struct fio_rbd_iou *fri = io_u->engine_data;
261
262         rbd_aio_wait_for_complete(fri->completion);
263 }
264
265 static int rbd_io_u_cmp(const void *p1, const void *p2)
266 {
267         const struct io_u **a = (const struct io_u **) p1;
268         const struct io_u **b = (const struct io_u **) p2;
269         uint64_t at, bt;
270
271         at = utime_since_now(&(*a)->start_time);
272         bt = utime_since_now(&(*b)->start_time);
273
274         if (at < bt)
275                 return -1;
276         else if (at == bt)
277                 return 0;
278         else
279                 return 1;
280 }
281
282 static int rbd_iter_events(struct thread_data *td, unsigned int *events,
283                            unsigned int min_evts, int wait)
284 {
285         struct rbd_data *rbd = td->io_ops->data;
286         unsigned int this_events = 0;
287         struct io_u *io_u;
288         int i, sidx;
289
290         sidx = 0;
291         io_u_qiter(&td->io_u_all, io_u, i) {
292                 if (!(io_u->flags & IO_U_F_FLIGHT))
293                         continue;
294                 if (rbd_io_u_seen(io_u))
295                         continue;
296
297                 if (fri_check_complete(rbd, io_u, events))
298                         this_events++;
299                 else if (wait)
300                         rbd->sort_events[sidx++] = io_u;
301         }
302
303         if (!wait || !sidx)
304                 return this_events;
305
306         /*
307          * Sort events, oldest issue first, then wait on as many as we
308          * need in order of age. If we have enough events, stop waiting,
309          * and just check if any of the older ones are done.
310          */
311         if (sidx > 1)
312                 qsort(rbd->sort_events, sidx, sizeof(struct io_u *), rbd_io_u_cmp);
313
314         for (i = 0; i < sidx; i++) {
315                 io_u = rbd->sort_events[i];
316
317                 if (fri_check_complete(rbd, io_u, events)) {
318                         this_events++;
319                         continue;
320                 }
321
322                 /*
323                  * Stop waiting when we have enough, but continue checking
324                  * all pending IOs if they are complete.
325                  */
326                 if (*events >= min_evts)
327                         continue;
328
329                 rbd_io_u_wait_complete(io_u);
330
331                 if (fri_check_complete(rbd, io_u, events))
332                         this_events++;
333         }
334
335         return this_events;
336 }
337
338 static int fio_rbd_getevents(struct thread_data *td, unsigned int min,
339                              unsigned int max, const struct timespec *t)
340 {
341         unsigned int this_events, events = 0;
342         struct rbd_options *o = td->eo;
343         int wait = 0;
344
345         do {
346                 this_events = rbd_iter_events(td, &events, min, wait);
347
348                 if (events >= min)
349                         break;
350                 if (this_events)
351                         continue;
352
353                 if (!o->busy_poll)
354                         wait = 1;
355                 else
356                         nop;
357         } while (1);
358
359         return events;
360 }
361
362 static int fio_rbd_queue(struct thread_data *td, struct io_u *io_u)
363 {
364         struct rbd_data *rbd = td->io_ops->data;
365         struct fio_rbd_iou *fri = io_u->engine_data;
366         int r = -1;
367
368         fio_ro_check(td, io_u);
369
370         fri->io_seen = 0;
371         fri->io_complete = 0;
372
373         r = rbd_aio_create_completion(fri, _fio_rbd_finish_aiocb,
374                                                 &fri->completion);
375         if (r < 0) {
376                 log_err("rbd_aio_create_completion failed.\n");
377                 goto failed;
378         }
379
380         if (io_u->ddir == DDIR_WRITE) {
381                 r = rbd_aio_write(rbd->image, io_u->offset, io_u->xfer_buflen,
382                                          io_u->xfer_buf, fri->completion);
383                 if (r < 0) {
384                         log_err("rbd_aio_write failed.\n");
385                         goto failed_comp;
386                 }
387
388         } else if (io_u->ddir == DDIR_READ) {
389                 r = rbd_aio_read(rbd->image, io_u->offset, io_u->xfer_buflen,
390                                         io_u->xfer_buf, fri->completion);
391
392                 if (r < 0) {
393                         log_err("rbd_aio_read failed.\n");
394                         goto failed_comp;
395                 }
396         } else if (io_u->ddir == DDIR_TRIM) {
397                 r = rbd_aio_discard(rbd->image, io_u->offset,
398                                         io_u->xfer_buflen, fri->completion);
399                 if (r < 0) {
400                         log_err("rbd_aio_discard failed.\n");
401                         goto failed_comp;
402                 }
403         } else if (io_u->ddir == DDIR_SYNC) {
404                 r = rbd_aio_flush(rbd->image, fri->completion);
405                 if (r < 0) {
406                         log_err("rbd_flush failed.\n");
407                         goto failed_comp;
408                 }
409         } else {
410                 dprint(FD_IO, "%s: Warning: unhandled ddir: %d\n", __func__,
411                        io_u->ddir);
412                 goto failed_comp;
413         }
414
415         return FIO_Q_QUEUED;
416 failed_comp:
417         rbd_aio_release(fri->completion);
418 failed:
419         io_u->error = r;
420         td_verror(td, io_u->error, "xfer");
421         return FIO_Q_COMPLETED;
422 }
423
424 static int fio_rbd_init(struct thread_data *td)
425 {
426         int r;
427
428         r = _fio_rbd_connect(td);
429         if (r) {
430                 log_err("fio_rbd_connect failed, return code: %d .\n", r);
431                 goto failed;
432         }
433
434         return 0;
435
436 failed:
437         return 1;
438 }
439
440 static void fio_rbd_cleanup(struct thread_data *td)
441 {
442         struct rbd_data *rbd = td->io_ops->data;
443
444         if (rbd) {
445                 _fio_rbd_disconnect(rbd);
446                 free(rbd->aio_events);
447                 free(rbd->sort_events);
448                 free(rbd);
449         }
450 }
451
452 static int fio_rbd_setup(struct thread_data *td)
453 {
454         rbd_image_info_t info;
455         struct fio_file *f;
456         struct rbd_data *rbd = NULL;
457         int major, minor, extra;
458         int r;
459
460         /* log version of librbd. No cluster connection required. */
461         rbd_version(&major, &minor, &extra);
462         log_info("rbd engine: RBD version: %d.%d.%d\n", major, minor, extra);
463
464         /* allocate engine specific structure to deal with librbd. */
465         r = _fio_setup_rbd_data(td, &rbd);
466         if (r) {
467                 log_err("fio_setup_rbd_data failed.\n");
468                 goto cleanup;
469         }
470         td->io_ops->data = rbd;
471
472         /* librbd does not allow us to run first in the main thread and later
473          * in a fork child. It needs to be the same process context all the
474          * time. 
475          */
476         td->o.use_thread = 1;
477
478         /* connect in the main thread to determine to determine
479          * the size of the given RADOS block device. And disconnect
480          * later on.
481          */
482         r = _fio_rbd_connect(td);
483         if (r) {
484                 log_err("fio_rbd_connect failed.\n");
485                 goto cleanup;
486         }
487
488         /* get size of the RADOS block device */
489         r = rbd_stat(rbd->image, &info, sizeof(info));
490         if (r < 0) {
491                 log_err("rbd_status failed.\n");
492                 goto disconnect;
493         }
494         dprint(FD_IO, "rbd-engine: image size: %lu\n", info.size);
495
496         /* taken from "net" engine. Pretend we deal with files,
497          * even if we do not have any ideas about files.
498          * The size of the RBD is set instead of a artificial file.
499          */
500         if (!td->files_index) {
501                 add_file(td, td->o.filename ? : "rbd", 0, 0);
502                 td->o.nr_files = td->o.nr_files ? : 1;
503                 td->o.open_files++;
504         }
505         f = td->files[0];
506         f->real_file_size = info.size;
507
508         /* disconnect, then we were only connected to determine
509          * the size of the RBD.
510          */
511         _fio_rbd_disconnect(rbd);
512         return 0;
513
514 disconnect:
515         _fio_rbd_disconnect(rbd);
516 cleanup:
517         fio_rbd_cleanup(td);
518         return r;
519 }
520
521 static int fio_rbd_open(struct thread_data *td, struct fio_file *f)
522 {
523         return 0;
524 }
525
526 static int fio_rbd_invalidate(struct thread_data *td, struct fio_file *f)
527 {
528 #if defined(CONFIG_RBD_INVAL)
529         struct rbd_data *rbd = td->io_ops->data;
530
531         return rbd_invalidate_cache(rbd->image);
532 #else
533         return 0;
534 #endif
535 }
536
537 static void fio_rbd_io_u_free(struct thread_data *td, struct io_u *io_u)
538 {
539         struct fio_rbd_iou *fri = io_u->engine_data;
540
541         if (fri) {
542                 io_u->engine_data = NULL;
543                 free(fri);
544         }
545 }
546
547 static int fio_rbd_io_u_init(struct thread_data *td, struct io_u *io_u)
548 {
549         struct fio_rbd_iou *fri;
550
551         fri = calloc(1, sizeof(*fri));
552         fri->io_u = io_u;
553         io_u->engine_data = fri;
554         return 0;
555 }
556
557 static struct ioengine_ops ioengine = {
558         .name                   = "rbd",
559         .version                = FIO_IOOPS_VERSION,
560         .setup                  = fio_rbd_setup,
561         .init                   = fio_rbd_init,
562         .queue                  = fio_rbd_queue,
563         .getevents              = fio_rbd_getevents,
564         .event                  = fio_rbd_event,
565         .cleanup                = fio_rbd_cleanup,
566         .open_file              = fio_rbd_open,
567         .invalidate             = fio_rbd_invalidate,
568         .options                = options,
569         .io_u_init              = fio_rbd_io_u_init,
570         .io_u_free              = fio_rbd_io_u_free,
571         .option_struct_size     = sizeof(struct rbd_options),
572 };
573
574 static void fio_init fio_rbd_register(void)
575 {
576         register_ioengine(&ioengine);
577 }
578
579 static void fio_exit fio_rbd_unregister(void)
580 {
581         unregister_ioengine(&ioengine);
582 }