options: split out option grouping code
[fio.git] / engines / rbd.c
1 /*
2  * rbd engine
3  *
4  * IO engine using Ceph's librbd to test RADOS Block Devices.
5  *
6  */
7
8 #include <rbd/librbd.h>
9
10 #include "../fio.h"
11 #include "../optgroup.h"
12
13 struct fio_rbd_iou {
14         struct io_u *io_u;
15         rbd_completion_t completion;
16         int io_seen;
17         int io_complete;
18 };
19
20 struct rbd_data {
21         rados_t cluster;
22         rados_ioctx_t io_ctx;
23         rbd_image_t image;
24         struct io_u **aio_events;
25         struct io_u **sort_events;
26 };
27
28 struct rbd_options {
29         void *pad;
30         char *rbd_name;
31         char *pool_name;
32         char *client_name;
33         int busy_poll;
34 };
35
36 static struct fio_option options[] = {
37         {
38                 .name           = "rbdname",
39                 .lname          = "rbd engine rbdname",
40                 .type           = FIO_OPT_STR_STORE,
41                 .help           = "RBD name for RBD engine",
42                 .off1           = offsetof(struct rbd_options, rbd_name),
43                 .category       = FIO_OPT_C_ENGINE,
44                 .group          = FIO_OPT_G_RBD,
45         },
46         {
47                 .name           = "pool",
48                 .lname          = "rbd engine pool",
49                 .type           = FIO_OPT_STR_STORE,
50                 .help           = "Name of the pool hosting the RBD for the RBD engine",
51                 .off1           = offsetof(struct rbd_options, pool_name),
52                 .category       = FIO_OPT_C_ENGINE,
53                 .group          = FIO_OPT_G_RBD,
54         },
55         {
56                 .name           = "clientname",
57                 .lname          = "rbd engine clientname",
58                 .type           = FIO_OPT_STR_STORE,
59                 .help           = "Name of the ceph client to access the RBD for the RBD engine",
60                 .off1           = offsetof(struct rbd_options, client_name),
61                 .category       = FIO_OPT_C_ENGINE,
62                 .group          = FIO_OPT_G_RBD,
63         },
64         {
65                 .name           = "busy_poll",
66                 .lname          = "Busy poll",
67                 .type           = FIO_OPT_BOOL,
68                 .help           = "Busy poll for completions instead of sleeping",
69                 .off1           = offsetof(struct rbd_options, busy_poll),
70                 .def            = "0",
71                 .category       = FIO_OPT_C_ENGINE,
72                 .group          = FIO_OPT_G_RBD,
73         },
74         {
75                 .name = NULL,
76         },
77 };
78
79 static int _fio_setup_rbd_data(struct thread_data *td,
80                                struct rbd_data **rbd_data_ptr)
81 {
82         struct rbd_data *rbd;
83
84         if (td->io_ops->data)
85                 return 0;
86
87         rbd = calloc(1, sizeof(struct rbd_data));
88         if (!rbd)
89                 goto failed;
90
91         rbd->aio_events = calloc(td->o.iodepth, sizeof(struct io_u *));
92         if (!rbd->aio_events)
93                 goto failed;
94
95         rbd->sort_events = calloc(td->o.iodepth, sizeof(struct io_u *));
96         if (!rbd->sort_events)
97                 goto failed;
98
99         *rbd_data_ptr = rbd;
100         return 0;
101
102 failed:
103         if (rbd)
104                 free(rbd);
105         return 1;
106
107 }
108
109 static int _fio_rbd_connect(struct thread_data *td)
110 {
111         struct rbd_data *rbd = td->io_ops->data;
112         struct rbd_options *o = td->eo;
113         int r;
114
115         r = rados_create(&rbd->cluster, o->client_name);
116         if (r < 0) {
117                 log_err("rados_create failed.\n");
118                 goto failed_early;
119         }
120
121         r = rados_conf_read_file(rbd->cluster, NULL);
122         if (r < 0) {
123                 log_err("rados_conf_read_file failed.\n");
124                 goto failed_early;
125         }
126
127         r = rados_connect(rbd->cluster);
128         if (r < 0) {
129                 log_err("rados_connect failed.\n");
130                 goto failed_shutdown;
131         }
132
133         r = rados_ioctx_create(rbd->cluster, o->pool_name, &rbd->io_ctx);
134         if (r < 0) {
135                 log_err("rados_ioctx_create failed.\n");
136                 goto failed_shutdown;
137         }
138
139         r = rbd_open(rbd->io_ctx, o->rbd_name, &rbd->image, NULL /*snap */ );
140         if (r < 0) {
141                 log_err("rbd_open failed.\n");
142                 goto failed_open;
143         }
144         return 0;
145
146 failed_open:
147         rados_ioctx_destroy(rbd->io_ctx);
148         rbd->io_ctx = NULL;
149 failed_shutdown:
150         rados_shutdown(rbd->cluster);
151         rbd->cluster = NULL;
152 failed_early:
153         return 1;
154 }
155
156 static void _fio_rbd_disconnect(struct rbd_data *rbd)
157 {
158         if (!rbd)
159                 return;
160
161         /* shutdown everything */
162         if (rbd->image) {
163                 rbd_close(rbd->image);
164                 rbd->image = NULL;
165         }
166
167         if (rbd->io_ctx) {
168                 rados_ioctx_destroy(rbd->io_ctx);
169                 rbd->io_ctx = NULL;
170         }
171
172         if (rbd->cluster) {
173                 rados_shutdown(rbd->cluster);
174                 rbd->cluster = NULL;
175         }
176 }
177
178 static void _fio_rbd_finish_aiocb(rbd_completion_t comp, void *data)
179 {
180         struct fio_rbd_iou *fri = data;
181         struct io_u *io_u = fri->io_u;
182         ssize_t ret;
183
184         /*
185          * Looks like return value is 0 for success, or < 0 for
186          * a specific error. So we have to assume that it can't do
187          * partial completions.
188          */
189         ret = rbd_aio_get_return_value(fri->completion);
190         if (ret < 0) {
191                 io_u->error = ret;
192                 io_u->resid = io_u->xfer_buflen;
193         } else
194                 io_u->error = 0;
195
196         fri->io_complete = 1;
197 }
198
199 static struct io_u *fio_rbd_event(struct thread_data *td, int event)
200 {
201         struct rbd_data *rbd = td->io_ops->data;
202
203         return rbd->aio_events[event];
204 }
205
206 static inline int fri_check_complete(struct rbd_data *rbd, struct io_u *io_u,
207                                      unsigned int *events)
208 {
209         struct fio_rbd_iou *fri = io_u->engine_data;
210
211         if (fri->io_complete) {
212                 fri->io_seen = 1;
213                 rbd->aio_events[*events] = io_u;
214                 (*events)++;
215
216                 rbd_aio_release(fri->completion);
217                 return 1;
218         }
219
220         return 0;
221 }
222
223 static inline int rbd_io_u_seen(struct io_u *io_u)
224 {
225         struct fio_rbd_iou *fri = io_u->engine_data;
226
227         return fri->io_seen;
228 }
229
230 static void rbd_io_u_wait_complete(struct io_u *io_u)
231 {
232         struct fio_rbd_iou *fri = io_u->engine_data;
233
234         rbd_aio_wait_for_complete(fri->completion);
235 }
236
237 static int rbd_io_u_cmp(const void *p1, const void *p2)
238 {
239         const struct io_u **a = (const struct io_u **) p1;
240         const struct io_u **b = (const struct io_u **) p2;
241         uint64_t at, bt;
242
243         at = utime_since_now(&(*a)->start_time);
244         bt = utime_since_now(&(*b)->start_time);
245
246         if (at < bt)
247                 return -1;
248         else if (at == bt)
249                 return 0;
250         else
251                 return 1;
252 }
253
254 static int rbd_iter_events(struct thread_data *td, unsigned int *events,
255                            unsigned int min_evts, int wait)
256 {
257         struct rbd_data *rbd = td->io_ops->data;
258         unsigned int this_events = 0;
259         struct io_u *io_u;
260         int i, sidx;
261
262         sidx = 0;
263         io_u_qiter(&td->io_u_all, io_u, i) {
264                 if (!(io_u->flags & IO_U_F_FLIGHT))
265                         continue;
266                 if (rbd_io_u_seen(io_u))
267                         continue;
268
269                 if (fri_check_complete(rbd, io_u, events))
270                         this_events++;
271                 else if (wait)
272                         rbd->sort_events[sidx++] = io_u;
273         }
274
275         if (!wait || !sidx)
276                 return this_events;
277
278         /*
279          * Sort events, oldest issue first, then wait on as many as we
280          * need in order of age. If we have enough events, stop waiting,
281          * and just check if any of the older ones are done.
282          */
283         if (sidx > 1)
284                 qsort(rbd->sort_events, sidx, sizeof(struct io_u *), rbd_io_u_cmp);
285
286         for (i = 0; i < sidx; i++) {
287                 io_u = rbd->sort_events[i];
288
289                 if (fri_check_complete(rbd, io_u, events)) {
290                         this_events++;
291                         continue;
292                 }
293
294                 /*
295                  * Stop waiting when we have enough, but continue checking
296                  * all pending IOs if they are complete.
297                  */
298                 if (*events >= min_evts)
299                         continue;
300
301                 rbd_io_u_wait_complete(io_u);
302
303                 if (fri_check_complete(rbd, io_u, events))
304                         this_events++;
305         }
306
307         return this_events;
308 }
309
310 static int fio_rbd_getevents(struct thread_data *td, unsigned int min,
311                              unsigned int max, const struct timespec *t)
312 {
313         unsigned int this_events, events = 0;
314         struct rbd_options *o = td->eo;
315         int wait = 0;
316
317         do {
318                 this_events = rbd_iter_events(td, &events, min, wait);
319
320                 if (events >= min)
321                         break;
322                 if (this_events)
323                         continue;
324
325                 if (!o->busy_poll)
326                         wait = 1;
327                 else
328                         nop;
329         } while (1);
330
331         return events;
332 }
333
334 static int fio_rbd_queue(struct thread_data *td, struct io_u *io_u)
335 {
336         struct rbd_data *rbd = td->io_ops->data;
337         struct fio_rbd_iou *fri = io_u->engine_data;
338         int r = -1;
339
340         fio_ro_check(td, io_u);
341
342         fri->io_seen = 0;
343         fri->io_complete = 0;
344
345         r = rbd_aio_create_completion(fri, _fio_rbd_finish_aiocb,
346                                                 &fri->completion);
347         if (r < 0) {
348                 log_err("rbd_aio_create_completion failed.\n");
349                 goto failed;
350         }
351
352         if (io_u->ddir == DDIR_WRITE) {
353                 r = rbd_aio_write(rbd->image, io_u->offset, io_u->xfer_buflen,
354                                          io_u->xfer_buf, fri->completion);
355                 if (r < 0) {
356                         log_err("rbd_aio_write failed.\n");
357                         goto failed_comp;
358                 }
359
360         } else if (io_u->ddir == DDIR_READ) {
361                 r = rbd_aio_read(rbd->image, io_u->offset, io_u->xfer_buflen,
362                                         io_u->xfer_buf, fri->completion);
363
364                 if (r < 0) {
365                         log_err("rbd_aio_read failed.\n");
366                         goto failed_comp;
367                 }
368         } else if (io_u->ddir == DDIR_TRIM) {
369                 r = rbd_aio_discard(rbd->image, io_u->offset,
370                                         io_u->xfer_buflen, fri->completion);
371                 if (r < 0) {
372                         log_err("rbd_aio_discard failed.\n");
373                         goto failed_comp;
374                 }
375         } else if (io_u->ddir == DDIR_SYNC) {
376                 r = rbd_aio_flush(rbd->image, fri->completion);
377                 if (r < 0) {
378                         log_err("rbd_flush failed.\n");
379                         goto failed_comp;
380                 }
381         } else {
382                 dprint(FD_IO, "%s: Warning: unhandled ddir: %d\n", __func__,
383                        io_u->ddir);
384                 goto failed_comp;
385         }
386
387         return FIO_Q_QUEUED;
388 failed_comp:
389         rbd_aio_release(fri->completion);
390 failed:
391         io_u->error = r;
392         td_verror(td, io_u->error, "xfer");
393         return FIO_Q_COMPLETED;
394 }
395
396 static int fio_rbd_init(struct thread_data *td)
397 {
398         int r;
399
400         r = _fio_rbd_connect(td);
401         if (r) {
402                 log_err("fio_rbd_connect failed, return code: %d .\n", r);
403                 goto failed;
404         }
405
406         return 0;
407
408 failed:
409         return 1;
410 }
411
412 static void fio_rbd_cleanup(struct thread_data *td)
413 {
414         struct rbd_data *rbd = td->io_ops->data;
415
416         if (rbd) {
417                 _fio_rbd_disconnect(rbd);
418                 free(rbd->aio_events);
419                 free(rbd->sort_events);
420                 free(rbd);
421         }
422 }
423
424 static int fio_rbd_setup(struct thread_data *td)
425 {
426         rbd_image_info_t info;
427         struct fio_file *f;
428         struct rbd_data *rbd = NULL;
429         int major, minor, extra;
430         int r;
431
432         /* log version of librbd. No cluster connection required. */
433         rbd_version(&major, &minor, &extra);
434         log_info("rbd engine: RBD version: %d.%d.%d\n", major, minor, extra);
435
436         /* allocate engine specific structure to deal with librbd. */
437         r = _fio_setup_rbd_data(td, &rbd);
438         if (r) {
439                 log_err("fio_setup_rbd_data failed.\n");
440                 goto cleanup;
441         }
442         td->io_ops->data = rbd;
443
444         /* librbd does not allow us to run first in the main thread and later
445          * in a fork child. It needs to be the same process context all the
446          * time. 
447          */
448         td->o.use_thread = 1;
449
450         /* connect in the main thread to determine to determine
451          * the size of the given RADOS block device. And disconnect
452          * later on.
453          */
454         r = _fio_rbd_connect(td);
455         if (r) {
456                 log_err("fio_rbd_connect failed.\n");
457                 goto cleanup;
458         }
459
460         /* get size of the RADOS block device */
461         r = rbd_stat(rbd->image, &info, sizeof(info));
462         if (r < 0) {
463                 log_err("rbd_status failed.\n");
464                 goto disconnect;
465         }
466         dprint(FD_IO, "rbd-engine: image size: %lu\n", info.size);
467
468         /* taken from "net" engine. Pretend we deal with files,
469          * even if we do not have any ideas about files.
470          * The size of the RBD is set instead of a artificial file.
471          */
472         if (!td->files_index) {
473                 add_file(td, td->o.filename ? : "rbd", 0, 0);
474                 td->o.nr_files = td->o.nr_files ? : 1;
475                 td->o.open_files++;
476         }
477         f = td->files[0];
478         f->real_file_size = info.size;
479
480         /* disconnect, then we were only connected to determine
481          * the size of the RBD.
482          */
483         _fio_rbd_disconnect(rbd);
484         return 0;
485
486 disconnect:
487         _fio_rbd_disconnect(rbd);
488 cleanup:
489         fio_rbd_cleanup(td);
490         return r;
491 }
492
493 static int fio_rbd_open(struct thread_data *td, struct fio_file *f)
494 {
495         return 0;
496 }
497
498 static int fio_rbd_invalidate(struct thread_data *td, struct fio_file *f)
499 {
500 #if defined(CONFIG_RBD_INVAL)
501         struct rbd_data *rbd = td->io_ops->data;
502
503         return rbd_invalidate_cache(rbd->image);
504 #else
505         return 0;
506 #endif
507 }
508
509 static void fio_rbd_io_u_free(struct thread_data *td, struct io_u *io_u)
510 {
511         struct fio_rbd_iou *fri = io_u->engine_data;
512
513         if (fri) {
514                 io_u->engine_data = NULL;
515                 free(fri);
516         }
517 }
518
519 static int fio_rbd_io_u_init(struct thread_data *td, struct io_u *io_u)
520 {
521         struct fio_rbd_iou *fri;
522
523         fri = calloc(1, sizeof(*fri));
524         fri->io_u = io_u;
525         io_u->engine_data = fri;
526         return 0;
527 }
528
529 static struct ioengine_ops ioengine = {
530         .name                   = "rbd",
531         .version                = FIO_IOOPS_VERSION,
532         .setup                  = fio_rbd_setup,
533         .init                   = fio_rbd_init,
534         .queue                  = fio_rbd_queue,
535         .getevents              = fio_rbd_getevents,
536         .event                  = fio_rbd_event,
537         .cleanup                = fio_rbd_cleanup,
538         .open_file              = fio_rbd_open,
539         .invalidate             = fio_rbd_invalidate,
540         .options                = options,
541         .io_u_init              = fio_rbd_io_u_init,
542         .io_u_free              = fio_rbd_io_u_free,
543         .option_struct_size     = sizeof(struct rbd_options),
544 };
545
546 static void fio_init fio_rbd_register(void)
547 {
548         register_ioengine(&ioengine);
549 }
550
551 static void fio_exit fio_rbd_unregister(void)
552 {
553         unregister_ioengine(&ioengine);
554 }