363be65bea58ded83e37eb7b867283d6aada67f8
[fio.git] / engines / rbd.c
1 /*
2  * rbd engine
3  *
4  * IO engine using Ceph's librbd to test RADOS Block Devices.
5  *
6  */
7
8 #include <rbd/librbd.h>
9
10 #include "../fio.h"
11
12 struct fio_rbd_iou {
13         struct io_u *io_u;
14         rbd_completion_t completion;
15         int io_seen;
16 };
17
18 struct rbd_data {
19         rados_t cluster;
20         rados_ioctx_t io_ctx;
21         rbd_image_t image;
22         struct io_u **aio_events;
23 };
24
25 struct rbd_options {
26         struct thread_data *td;
27         char *rbd_name;
28         char *pool_name;
29         char *client_name;
30         int busy_poll;
31 };
32
33 static struct fio_option options[] = {
34         {
35                 .name           = "rbdname",
36                 .lname          = "rbd engine rbdname",
37                 .type           = FIO_OPT_STR_STORE,
38                 .help           = "RBD name for RBD engine",
39                 .off1           = offsetof(struct rbd_options, rbd_name),
40                 .category       = FIO_OPT_C_ENGINE,
41                 .group          = FIO_OPT_G_RBD,
42         },
43         {
44                 .name           = "pool",
45                 .lname          = "rbd engine pool",
46                 .type           = FIO_OPT_STR_STORE,
47                 .help           = "Name of the pool hosting the RBD for the RBD engine",
48                 .off1           = offsetof(struct rbd_options, pool_name),
49                 .category       = FIO_OPT_C_ENGINE,
50                 .group          = FIO_OPT_G_RBD,
51         },
52         {
53                 .name           = "clientname",
54                 .lname          = "rbd engine clientname",
55                 .type           = FIO_OPT_STR_STORE,
56                 .help           = "Name of the ceph client to access the RBD for the RBD engine",
57                 .off1           = offsetof(struct rbd_options, client_name),
58                 .category       = FIO_OPT_C_ENGINE,
59                 .group          = FIO_OPT_G_RBD,
60         },
61         {
62                 .name           = "busy_poll",
63                 .lname          = "Busy poll",
64                 .type           = FIO_OPT_BOOL,
65                 .help           = "Busy poll for completions instead of sleeping",
66                 .off1           = offsetof(struct rbd_options, busy_poll),
67                 .def            = "0",
68                 .category       = FIO_OPT_C_ENGINE,
69                 .group          = FIO_OPT_G_RBD,
70         },
71         {
72                 .name = NULL,
73         },
74 };
75
76 static int _fio_setup_rbd_data(struct thread_data *td,
77                                struct rbd_data **rbd_data_ptr)
78 {
79         struct rbd_data *rbd_data;
80
81         if (td->io_ops->data)
82                 return 0;
83
84         rbd_data = malloc(sizeof(struct rbd_data));
85         if (!rbd_data)
86                 goto failed;
87
88         memset(rbd_data, 0, sizeof(struct rbd_data));
89
90         rbd_data->aio_events = malloc(td->o.iodepth * sizeof(struct io_u *));
91         if (!rbd_data->aio_events)
92                 goto failed;
93
94         memset(rbd_data->aio_events, 0, td->o.iodepth * sizeof(struct io_u *));
95
96         *rbd_data_ptr = rbd_data;
97
98         return 0;
99
100 failed:
101         if (rbd_data)
102                 free(rbd_data);
103         return 1;
104
105 }
106
107 static int _fio_rbd_connect(struct thread_data *td)
108 {
109         struct rbd_data *rbd_data = td->io_ops->data;
110         struct rbd_options *o = td->eo;
111         int r;
112
113         r = rados_create(&rbd_data->cluster, o->client_name);
114         if (r < 0) {
115                 log_err("rados_create failed.\n");
116                 goto failed_early;
117         }
118
119         r = rados_conf_read_file(rbd_data->cluster, NULL);
120         if (r < 0) {
121                 log_err("rados_conf_read_file failed.\n");
122                 goto failed_early;
123         }
124
125         r = rados_connect(rbd_data->cluster);
126         if (r < 0) {
127                 log_err("rados_connect failed.\n");
128                 goto failed_shutdown;
129         }
130
131         r = rados_ioctx_create(rbd_data->cluster, o->pool_name,
132                                &rbd_data->io_ctx);
133         if (r < 0) {
134                 log_err("rados_ioctx_create failed.\n");
135                 goto failed_shutdown;
136         }
137
138         r = rbd_open(rbd_data->io_ctx, o->rbd_name, &rbd_data->image,
139                      NULL /*snap */ );
140         if (r < 0) {
141                 log_err("rbd_open failed.\n");
142                 goto failed_open;
143         }
144         return 0;
145
146 failed_open:
147         rados_ioctx_destroy(rbd_data->io_ctx);
148         rbd_data->io_ctx = NULL;
149 failed_shutdown:
150         rados_shutdown(rbd_data->cluster);
151         rbd_data->cluster = NULL;
152 failed_early:
153         return 1;
154 }
155
156 static void _fio_rbd_disconnect(struct rbd_data *rbd_data)
157 {
158         if (!rbd_data)
159                 return;
160
161         /* shutdown everything */
162         if (rbd_data->image) {
163                 rbd_close(rbd_data->image);
164                 rbd_data->image = NULL;
165         }
166
167         if (rbd_data->io_ctx) {
168                 rados_ioctx_destroy(rbd_data->io_ctx);
169                 rbd_data->io_ctx = NULL;
170         }
171
172         if (rbd_data->cluster) {
173                 rados_shutdown(rbd_data->cluster);
174                 rbd_data->cluster = NULL;
175         }
176 }
177
178 static void _fio_rbd_finish_aiocb(rbd_completion_t comp, void *data)
179 {
180         struct fio_rbd_iou *fri = data;
181         struct io_u *io_u = fri->io_u;
182         ssize_t ret;
183
184         /*
185          * Looks like return value is 0 for success, or < 0 for
186          * a specific error. So we have to assume that it can't do
187          * partial completions.
188          */
189         ret = rbd_aio_get_return_value(fri->completion);
190         if (ret < 0) {
191                 io_u->error = ret;
192                 io_u->resid = io_u->xfer_buflen;
193         } else
194                 io_u->error = 0;
195 }
196
197 static struct io_u *fio_rbd_event(struct thread_data *td, int event)
198 {
199         struct rbd_data *rbd_data = td->io_ops->data;
200
201         return rbd_data->aio_events[event];
202 }
203
204 static inline int fri_check_complete(struct rbd_data *rbd_data,
205                                      struct io_u *io_u,
206                                      unsigned int *events)
207 {
208         struct fio_rbd_iou *fri = io_u->engine_data;
209
210         if (rbd_aio_is_complete(fri->completion)) {
211                 fri->io_seen = 1;
212                 rbd_data->aio_events[*events] = io_u;
213                 (*events)++;
214
215                 rbd_aio_release(fri->completion);
216                 return 1;
217         }
218
219         return 0;
220 }
221
222 static int rbd_iter_events(struct thread_data *td, unsigned int *events,
223                            unsigned int min_evts, int wait)
224 {
225         struct rbd_data *rbd_data = td->io_ops->data;
226         unsigned int this_events = 0;
227         struct io_u *io_u;
228         int i;
229
230         io_u_qiter(&td->io_u_all, io_u, i) {
231                 struct fio_rbd_iou *fri = io_u->engine_data;
232
233                 if (!(io_u->flags & IO_U_F_FLIGHT))
234                         continue;
235                 if (fri->io_seen)
236                         continue;
237
238                 if (fri_check_complete(rbd_data, io_u, events))
239                         this_events++;
240                 else if (wait) {
241                         rbd_aio_wait_for_complete(fri->completion);
242
243                         if (fri_check_complete(rbd_data, io_u, events))
244                                 this_events++;
245                 }
246                 if (*events >= min_evts)
247                         break;
248         }
249
250         return this_events;
251 }
252
253 static int fio_rbd_getevents(struct thread_data *td, unsigned int min,
254                              unsigned int max, const struct timespec *t)
255 {
256         unsigned int this_events, events = 0;
257         struct rbd_options *o = td->eo;
258         int wait = 0;
259
260         do {
261                 this_events = rbd_iter_events(td, &events, min, wait);
262
263                 if (events >= min)
264                         break;
265                 if (this_events)
266                         continue;
267
268                 if (!o->busy_poll)
269                         wait = 1;
270                 else
271                         nop;
272         } while (1);
273
274         return events;
275 }
276
277 static int fio_rbd_queue(struct thread_data *td, struct io_u *io_u)
278 {
279         struct rbd_data *rbd_data = td->io_ops->data;
280         struct fio_rbd_iou *fri = io_u->engine_data;
281         int r = -1;
282
283         fio_ro_check(td, io_u);
284
285         fri->io_seen = 0;
286
287         r = rbd_aio_create_completion(fri, _fio_rbd_finish_aiocb,
288                                                 &fri->completion);
289         if (r < 0) {
290                 log_err("rbd_aio_create_completion failed.\n");
291                 goto failed;
292         }
293
294         if (io_u->ddir == DDIR_WRITE) {
295                 r = rbd_aio_write(rbd_data->image, io_u->offset,
296                                   io_u->xfer_buflen, io_u->xfer_buf,
297                                   fri->completion);
298                 if (r < 0) {
299                         log_err("rbd_aio_write failed.\n");
300                         goto failed_comp;
301                 }
302
303         } else if (io_u->ddir == DDIR_READ) {
304                 r = rbd_aio_read(rbd_data->image, io_u->offset,
305                                  io_u->xfer_buflen, io_u->xfer_buf,
306                                  fri->completion);
307
308                 if (r < 0) {
309                         log_err("rbd_aio_read failed.\n");
310                         goto failed_comp;
311                 }
312         } else if (io_u->ddir == DDIR_TRIM) {
313                 r = rbd_aio_discard(rbd_data->image, io_u->offset,
314                                  io_u->xfer_buflen, fri->completion);
315                 if (r < 0) {
316                         log_err("rbd_aio_discard failed.\n");
317                         goto failed_comp;
318                 }
319         } else if (io_u->ddir == DDIR_SYNC) {
320                 r = rbd_aio_flush(rbd_data->image, fri->completion);
321                 if (r < 0) {
322                         log_err("rbd_flush failed.\n");
323                         goto failed_comp;
324                 }
325         } else {
326                 dprint(FD_IO, "%s: Warning: unhandled ddir: %d\n", __func__,
327                        io_u->ddir);
328                 goto failed_comp;
329         }
330
331         return FIO_Q_QUEUED;
332 failed_comp:
333         rbd_aio_release(fri->completion);
334 failed:
335         io_u->error = r;
336         td_verror(td, io_u->error, "xfer");
337         return FIO_Q_COMPLETED;
338 }
339
340 static int fio_rbd_init(struct thread_data *td)
341 {
342         int r;
343
344         r = _fio_rbd_connect(td);
345         if (r) {
346                 log_err("fio_rbd_connect failed, return code: %d .\n", r);
347                 goto failed;
348         }
349
350         return 0;
351
352 failed:
353         return 1;
354 }
355
356 static void fio_rbd_cleanup(struct thread_data *td)
357 {
358         struct rbd_data *rbd_data = td->io_ops->data;
359
360         if (rbd_data) {
361                 _fio_rbd_disconnect(rbd_data);
362                 free(rbd_data->aio_events);
363                 free(rbd_data);
364         }
365
366 }
367
368 static int fio_rbd_setup(struct thread_data *td)
369 {
370         int r = 0;
371         rbd_image_info_t info;
372         struct fio_file *f;
373         struct rbd_data *rbd_data = NULL;
374         int major, minor, extra;
375
376         /* log version of librbd. No cluster connection required. */
377         rbd_version(&major, &minor, &extra);
378         log_info("rbd engine: RBD version: %d.%d.%d\n", major, minor, extra);
379
380         /* allocate engine specific structure to deal with librbd. */
381         r = _fio_setup_rbd_data(td, &rbd_data);
382         if (r) {
383                 log_err("fio_setup_rbd_data failed.\n");
384                 goto cleanup;
385         }
386         td->io_ops->data = rbd_data;
387
388         /* librbd does not allow us to run first in the main thread and later
389          * in a fork child. It needs to be the same process context all the
390          * time. 
391          */
392         td->o.use_thread = 1;
393
394         /* connect in the main thread to determine to determine
395          * the size of the given RADOS block device. And disconnect
396          * later on.
397          */
398         r = _fio_rbd_connect(td);
399         if (r) {
400                 log_err("fio_rbd_connect failed.\n");
401                 goto cleanup;
402         }
403
404         /* get size of the RADOS block device */
405         r = rbd_stat(rbd_data->image, &info, sizeof(info));
406         if (r < 0) {
407                 log_err("rbd_status failed.\n");
408                 goto disconnect;
409         }
410         dprint(FD_IO, "rbd-engine: image size: %lu\n", info.size);
411
412         /* taken from "net" engine. Pretend we deal with files,
413          * even if we do not have any ideas about files.
414          * The size of the RBD is set instead of a artificial file.
415          */
416         if (!td->files_index) {
417                 add_file(td, td->o.filename ? : "rbd", 0, 0);
418                 td->o.nr_files = td->o.nr_files ? : 1;
419                 td->o.open_files++;
420         }
421         f = td->files[0];
422         f->real_file_size = info.size;
423
424         /* disconnect, then we were only connected to determine
425          * the size of the RBD.
426          */
427         _fio_rbd_disconnect(rbd_data);
428         return 0;
429
430 disconnect:
431         _fio_rbd_disconnect(rbd_data);
432 cleanup:
433         fio_rbd_cleanup(td);
434         return r;
435 }
436
437 static int fio_rbd_open(struct thread_data *td, struct fio_file *f)
438 {
439         return 0;
440 }
441
442 static int fio_rbd_invalidate(struct thread_data *td, struct fio_file *f)
443 {
444 #if defined(CONFIG_RBD_INVAL)
445         struct rbd_data *rbd_data = td->io_ops->data;
446
447         return rbd_invalidate_cache(rbd_data->image);
448 #else
449         return 0;
450 #endif
451 }
452
453 static void fio_rbd_io_u_free(struct thread_data *td, struct io_u *io_u)
454 {
455         struct fio_rbd_iou *fri = io_u->engine_data;
456
457         if (fri) {
458                 io_u->engine_data = NULL;
459                 free(fri);
460         }
461 }
462
463 static int fio_rbd_io_u_init(struct thread_data *td, struct io_u *io_u)
464 {
465         struct fio_rbd_iou *fri;
466
467         fri = calloc(1, sizeof(*fri));
468         fri->io_u = io_u;
469         io_u->engine_data = fri;
470         return 0;
471 }
472
473 static struct ioengine_ops ioengine = {
474         .name                   = "rbd",
475         .version                = FIO_IOOPS_VERSION,
476         .setup                  = fio_rbd_setup,
477         .init                   = fio_rbd_init,
478         .queue                  = fio_rbd_queue,
479         .getevents              = fio_rbd_getevents,
480         .event                  = fio_rbd_event,
481         .cleanup                = fio_rbd_cleanup,
482         .open_file              = fio_rbd_open,
483         .invalidate             = fio_rbd_invalidate,
484         .options                = options,
485         .io_u_init              = fio_rbd_io_u_init,
486         .io_u_free              = fio_rbd_io_u_free,
487         .option_struct_size     = sizeof(struct rbd_options),
488 };
489
490 static void fio_init fio_rbd_register(void)
491 {
492         register_ioengine(&ioengine);
493 }
494
495 static void fio_exit fio_rbd_unregister(void)
496 {
497         unregister_ioengine(&ioengine);
498 }