02fe655202ca6320617493e994df80693b6a3cf9
[fio.git] / engines / rbd.c
1 /*
2  * rbd engine
3  *
4  * IO engine using Ceph's librbd to test RADOS Block Devices.
5  *
6  */
7
8 #include <rbd/librbd.h>
9
10 #include "../fio.h"
11 #include "../optgroup.h"
12 #ifdef CONFIG_RBD_BLKIN
13 #include <zipkin_c.h>
14 #endif
15
16 #ifdef CONFIG_RBD_POLL
17 /* add for poll */
18 #include <poll.h>
19 #include <sys/eventfd.h>
20 #endif
21
22 struct fio_rbd_iou {
23         struct io_u *io_u;
24         rbd_completion_t completion;
25         int io_seen;
26         int io_complete;
27 #ifdef CONFIG_RBD_BLKIN
28         struct blkin_trace_info info;
29 #endif
30 };
31
32 struct rbd_data {
33         rados_t cluster;
34         rados_ioctx_t io_ctx;
35         rbd_image_t image;
36         struct io_u **aio_events;
37         struct io_u **sort_events;
38 #ifdef CONFIG_RBD_POLL
39         int fd; /* add for poll */
40 #endif
41 };
42
43 struct rbd_options {
44         void *pad;
45         char *cluster_name;
46         char *rbd_name;
47         char *pool_name;
48         char *client_name;
49         int busy_poll;
50 };
51
52 static struct fio_option options[] = {
53         {
54                 .name           = "clustername",
55                 .lname          = "ceph cluster name",
56                 .type           = FIO_OPT_STR_STORE,
57                 .help           = "Cluster name for ceph",
58                 .off1           = offsetof(struct rbd_options, cluster_name),
59                 .category       = FIO_OPT_C_ENGINE,
60                 .group          = FIO_OPT_G_RBD,
61         },
62         {
63                 .name           = "rbdname",
64                 .lname          = "rbd engine rbdname",
65                 .type           = FIO_OPT_STR_STORE,
66                 .help           = "RBD name for RBD engine",
67                 .off1           = offsetof(struct rbd_options, rbd_name),
68                 .category       = FIO_OPT_C_ENGINE,
69                 .group          = FIO_OPT_G_RBD,
70         },
71         {
72                 .name           = "pool",
73                 .lname          = "rbd engine pool",
74                 .type           = FIO_OPT_STR_STORE,
75                 .help           = "Name of the pool hosting the RBD for the RBD engine",
76                 .off1           = offsetof(struct rbd_options, pool_name),
77                 .category       = FIO_OPT_C_ENGINE,
78                 .group          = FIO_OPT_G_RBD,
79         },
80         {
81                 .name           = "clientname",
82                 .lname          = "rbd engine clientname",
83                 .type           = FIO_OPT_STR_STORE,
84                 .help           = "Name of the ceph client to access the RBD for the RBD engine",
85                 .off1           = offsetof(struct rbd_options, client_name),
86                 .category       = FIO_OPT_C_ENGINE,
87                 .group          = FIO_OPT_G_RBD,
88         },
89         {
90                 .name           = "busy_poll",
91                 .lname          = "Busy poll",
92                 .type           = FIO_OPT_BOOL,
93                 .help           = "Busy poll for completions instead of sleeping",
94                 .off1           = offsetof(struct rbd_options, busy_poll),
95                 .def            = "0",
96                 .category       = FIO_OPT_C_ENGINE,
97                 .group          = FIO_OPT_G_RBD,
98         },
99         {
100                 .name = NULL,
101         },
102 };
103
104 static int _fio_setup_rbd_data(struct thread_data *td,
105                                struct rbd_data **rbd_data_ptr)
106 {
107         struct rbd_data *rbd;
108
109         if (td->io_ops_data)
110                 return 0;
111
112         rbd = calloc(1, sizeof(struct rbd_data));
113         if (!rbd)
114                 goto failed;
115
116 #ifdef CONFIG_RBD_POLL
117         /* add for poll, init fd: -1 */
118         rbd->fd = -1;
119 #endif
120
121         rbd->aio_events = calloc(td->o.iodepth, sizeof(struct io_u *));
122         if (!rbd->aio_events)
123                 goto failed;
124
125         rbd->sort_events = calloc(td->o.iodepth, sizeof(struct io_u *));
126         if (!rbd->sort_events)
127                 goto failed;
128
129         *rbd_data_ptr = rbd;
130         return 0;
131
132 failed:
133         if (rbd) {
134                 if (rbd->aio_events) 
135                         free(rbd->aio_events);
136                 if (rbd->sort_events)
137                         free(rbd->sort_events);
138                 free(rbd);
139         }
140         return 1;
141
142 }
143
144 static int _fio_rbd_connect(struct thread_data *td)
145 {
146         struct rbd_data *rbd = td->io_ops_data;
147         struct rbd_options *o = td->eo;
148         int r;
149
150         if (o->cluster_name) {
151                 char *client_name = NULL; 
152
153                 /*
154                  * If we specify cluser name, the rados_create2
155                  * will not assume 'client.'. name is considered
156                  * as a full type.id namestr
157                  */
158                 if (o->client_name) {
159                         if (!index(o->client_name, '.')) {
160                                 client_name = calloc(1, strlen("client.") +
161                                                     strlen(o->client_name) + 1);
162                                 strcat(client_name, "client.");
163                                 strcat(client_name, o->client_name);
164                         } else {
165                                 client_name = o->client_name;
166                         }
167                 }
168
169                 r = rados_create2(&rbd->cluster, o->cluster_name,
170                                  client_name, 0);
171
172                 if (client_name && !index(o->client_name, '.'))
173                         free(client_name);
174         } else
175                 r = rados_create(&rbd->cluster, o->client_name);
176         
177         if (r < 0) {
178                 log_err("rados_create failed.\n");
179                 goto failed_early;
180         }
181
182         r = rados_conf_read_file(rbd->cluster, NULL);
183         if (r < 0) {
184                 log_err("rados_conf_read_file failed.\n");
185                 goto failed_early;
186         }
187
188         r = rados_connect(rbd->cluster);
189         if (r < 0) {
190                 log_err("rados_connect failed.\n");
191                 goto failed_shutdown;
192         }
193
194         r = rados_ioctx_create(rbd->cluster, o->pool_name, &rbd->io_ctx);
195         if (r < 0) {
196                 log_err("rados_ioctx_create failed.\n");
197                 goto failed_shutdown;
198         }
199
200         r = rbd_open(rbd->io_ctx, o->rbd_name, &rbd->image, NULL /*snap */ );
201         if (r < 0) {
202                 log_err("rbd_open failed.\n");
203                 goto failed_open;
204         }
205
206 #ifdef CONFIG_RBD_POLL
207         /* add for rbd poll */
208         rbd->fd = eventfd(0, EFD_NONBLOCK);
209         if (rbd->fd < 0) {
210                 log_err("eventfd failed.\n");
211                 goto failed_open;
212         }
213     
214         r = rbd_set_image_notification(rbd->image, rbd->fd, EVENT_TYPE_EVENTFD);
215         if (r < 0) {
216                 log_err("rbd_set_image_notification failed.\n");
217                 goto failed_notify;
218         }
219 #endif
220
221         return 0;
222
223 #ifdef CONFIG_RBD_POLL
224 failed_notify:
225         close(rbd->fd);
226         rbd->fd = -1;
227 #endif
228
229 failed_open:
230         rados_ioctx_destroy(rbd->io_ctx);
231         rbd->io_ctx = NULL;
232 failed_shutdown:
233         rados_shutdown(rbd->cluster);
234         rbd->cluster = NULL;
235 failed_early:
236         return 1;
237 }
238
239 static void _fio_rbd_disconnect(struct rbd_data *rbd)
240 {
241         if (!rbd)
242                 return;
243
244 #ifdef CONFIG_RBD_POLL
245         /* close eventfd */
246         if (rbd->fd >= 0) {
247                 close(rbd->fd);
248                 rbd->fd = -1;
249         }
250 #endif
251
252         /* shutdown everything */
253         if (rbd->image) {
254                 rbd_close(rbd->image);
255                 rbd->image = NULL;
256         }
257
258         if (rbd->io_ctx) {
259                 rados_ioctx_destroy(rbd->io_ctx);
260                 rbd->io_ctx = NULL;
261         }
262
263         if (rbd->cluster) {
264                 rados_shutdown(rbd->cluster);
265                 rbd->cluster = NULL;
266         }
267 }
268
269 static void _fio_rbd_finish_aiocb(rbd_completion_t comp, void *data)
270 {
271         struct fio_rbd_iou *fri = data;
272         struct io_u *io_u = fri->io_u;
273         ssize_t ret;
274
275         /*
276          * Looks like return value is 0 for success, or < 0 for
277          * a specific error. So we have to assume that it can't do
278          * partial completions.
279          */
280         ret = rbd_aio_get_return_value(fri->completion);
281         if (ret < 0) {
282                 io_u->error = ret;
283                 io_u->resid = io_u->xfer_buflen;
284         } else
285                 io_u->error = 0;
286
287         fri->io_complete = 1;
288 }
289
290 static struct io_u *fio_rbd_event(struct thread_data *td, int event)
291 {
292         struct rbd_data *rbd = td->io_ops_data;
293
294         return rbd->aio_events[event];
295 }
296
297 static inline int fri_check_complete(struct rbd_data *rbd, struct io_u *io_u,
298                                      unsigned int *events)
299 {
300         struct fio_rbd_iou *fri = io_u->engine_data;
301
302         if (fri->io_complete) {
303                 fri->io_seen = 1;
304                 rbd->aio_events[*events] = io_u;
305                 (*events)++;
306
307                 rbd_aio_release(fri->completion);
308                 return 1;
309         }
310
311         return 0;
312 }
313
314 static inline int rbd_io_u_seen(struct io_u *io_u)
315 {
316         struct fio_rbd_iou *fri = io_u->engine_data;
317
318         return fri->io_seen;
319 }
320
321 static void rbd_io_u_wait_complete(struct io_u *io_u)
322 {
323         struct fio_rbd_iou *fri = io_u->engine_data;
324
325         rbd_aio_wait_for_complete(fri->completion);
326 }
327
328 static int rbd_io_u_cmp(const void *p1, const void *p2)
329 {
330         const struct io_u **a = (const struct io_u **) p1;
331         const struct io_u **b = (const struct io_u **) p2;
332         uint64_t at, bt;
333
334         at = utime_since_now(&(*a)->start_time);
335         bt = utime_since_now(&(*b)->start_time);
336
337         if (at < bt)
338                 return -1;
339         else if (at == bt)
340                 return 0;
341         else
342                 return 1;
343 }
344
345 static int rbd_iter_events(struct thread_data *td, unsigned int *events,
346                            unsigned int min_evts, int wait)
347 {
348         struct rbd_data *rbd = td->io_ops_data;
349         unsigned int this_events = 0;
350         struct io_u *io_u;
351         int i, sidx = 0;
352
353 #ifdef CONFIG_RBD_POLL
354         int ret = 0;
355         int event_num = 0;
356         struct fio_rbd_iou *fri = NULL;
357         rbd_completion_t comps[min_evts];
358
359         struct pollfd pfd;
360         pfd.fd = rbd->fd;
361         pfd.events = POLLIN;
362
363         ret = poll(&pfd, 1, -1);
364         if (ret <= 0) {
365                 return 0;
366         }
367         
368         assert(pfd.revents & POLLIN);
369
370         event_num = rbd_poll_io_events(rbd->image, comps, min_evts);
371
372         for (i = 0; i < event_num; i++) {
373                 fri = rbd_aio_get_arg(comps[i]);
374                 io_u = fri->io_u;
375 #else
376         io_u_qiter(&td->io_u_all, io_u, i) {
377 #endif
378                 if (!(io_u->flags & IO_U_F_FLIGHT))
379                         continue;
380                 if (rbd_io_u_seen(io_u))
381                         continue;
382
383                 if (fri_check_complete(rbd, io_u, events))
384                         this_events++;
385                 else if (wait)
386                         rbd->sort_events[sidx++] = io_u;
387         }
388
389         if (!wait || !sidx)
390                 return this_events;
391
392         /*
393          * Sort events, oldest issue first, then wait on as many as we
394          * need in order of age. If we have enough events, stop waiting,
395          * and just check if any of the older ones are done.
396          */
397         if (sidx > 1)
398                 qsort(rbd->sort_events, sidx, sizeof(struct io_u *), rbd_io_u_cmp);
399
400         for (i = 0; i < sidx; i++) {
401                 io_u = rbd->sort_events[i];
402
403                 if (fri_check_complete(rbd, io_u, events)) {
404                         this_events++;
405                         continue;
406                 }
407
408                 /*
409                  * Stop waiting when we have enough, but continue checking
410                  * all pending IOs if they are complete.
411                  */
412                 if (*events >= min_evts)
413                         continue;
414
415                 rbd_io_u_wait_complete(io_u);
416
417                 if (fri_check_complete(rbd, io_u, events))
418                         this_events++;
419         }
420
421         return this_events;
422 }
423
424 static int fio_rbd_getevents(struct thread_data *td, unsigned int min,
425                              unsigned int max, const struct timespec *t)
426 {
427         unsigned int this_events, events = 0;
428         struct rbd_options *o = td->eo;
429         int wait = 0;
430
431         do {
432                 this_events = rbd_iter_events(td, &events, min, wait);
433
434                 if (events >= min)
435                         break;
436                 if (this_events)
437                         continue;
438
439                 if (!o->busy_poll)
440                         wait = 1;
441                 else
442                         nop;
443         } while (1);
444
445         return events;
446 }
447
448 static int fio_rbd_queue(struct thread_data *td, struct io_u *io_u)
449 {
450         struct rbd_data *rbd = td->io_ops_data;
451         struct fio_rbd_iou *fri = io_u->engine_data;
452         int r = -1;
453
454         fio_ro_check(td, io_u);
455
456         fri->io_seen = 0;
457         fri->io_complete = 0;
458
459         r = rbd_aio_create_completion(fri, _fio_rbd_finish_aiocb,
460                                                 &fri->completion);
461         if (r < 0) {
462                 log_err("rbd_aio_create_completion failed.\n");
463                 goto failed;
464         }
465
466         if (io_u->ddir == DDIR_WRITE) {
467 #ifdef CONFIG_RBD_BLKIN
468                 blkin_init_trace_info(&fri->info);
469                 r = rbd_aio_write_traced(rbd->image, io_u->offset, io_u->xfer_buflen,
470                                          io_u->xfer_buf, fri->completion, &fri->info);
471 #else
472                 r = rbd_aio_write(rbd->image, io_u->offset, io_u->xfer_buflen,
473                                          io_u->xfer_buf, fri->completion);
474 #endif
475                 if (r < 0) {
476                         log_err("rbd_aio_write failed.\n");
477                         goto failed_comp;
478                 }
479
480         } else if (io_u->ddir == DDIR_READ) {
481 #ifdef CONFIG_RBD_BLKIN
482                 blkin_init_trace_info(&fri->info);
483                 r = rbd_aio_read_traced(rbd->image, io_u->offset, io_u->xfer_buflen,
484                                         io_u->xfer_buf, fri->completion, &fri->info);
485 #else
486                 r = rbd_aio_read(rbd->image, io_u->offset, io_u->xfer_buflen,
487                                         io_u->xfer_buf, fri->completion);
488 #endif
489
490                 if (r < 0) {
491                         log_err("rbd_aio_read failed.\n");
492                         goto failed_comp;
493                 }
494         } else if (io_u->ddir == DDIR_TRIM) {
495                 r = rbd_aio_discard(rbd->image, io_u->offset,
496                                         io_u->xfer_buflen, fri->completion);
497                 if (r < 0) {
498                         log_err("rbd_aio_discard failed.\n");
499                         goto failed_comp;
500                 }
501         } else if (io_u->ddir == DDIR_SYNC) {
502                 r = rbd_aio_flush(rbd->image, fri->completion);
503                 if (r < 0) {
504                         log_err("rbd_flush failed.\n");
505                         goto failed_comp;
506                 }
507         } else {
508                 dprint(FD_IO, "%s: Warning: unhandled ddir: %d\n", __func__,
509                        io_u->ddir);
510                 goto failed_comp;
511         }
512
513         return FIO_Q_QUEUED;
514 failed_comp:
515         rbd_aio_release(fri->completion);
516 failed:
517         io_u->error = r;
518         td_verror(td, io_u->error, "xfer");
519         return FIO_Q_COMPLETED;
520 }
521
522 static int fio_rbd_init(struct thread_data *td)
523 {
524         int r;
525
526         r = _fio_rbd_connect(td);
527         if (r) {
528                 log_err("fio_rbd_connect failed, return code: %d .\n", r);
529                 goto failed;
530         }
531
532         return 0;
533
534 failed:
535         return 1;
536 }
537
538 static void fio_rbd_cleanup(struct thread_data *td)
539 {
540         struct rbd_data *rbd = td->io_ops_data;
541
542         if (rbd) {
543                 _fio_rbd_disconnect(rbd);
544                 free(rbd->aio_events);
545                 free(rbd->sort_events);
546                 free(rbd);
547         }
548 }
549
550 static int fio_rbd_setup(struct thread_data *td)
551 {
552         rbd_image_info_t info;
553         struct fio_file *f;
554         struct rbd_data *rbd = NULL;
555         int major, minor, extra;
556         int r;
557
558         /* log version of librbd. No cluster connection required. */
559         rbd_version(&major, &minor, &extra);
560         log_info("rbd engine: RBD version: %d.%d.%d\n", major, minor, extra);
561
562         /* allocate engine specific structure to deal with librbd. */
563         r = _fio_setup_rbd_data(td, &rbd);
564         if (r) {
565                 log_err("fio_setup_rbd_data failed.\n");
566                 goto cleanup;
567         }
568         td->io_ops_data = rbd;
569
570         /* librbd does not allow us to run first in the main thread and later
571          * in a fork child. It needs to be the same process context all the
572          * time. 
573          */
574         td->o.use_thread = 1;
575
576         /* connect in the main thread to determine to determine
577          * the size of the given RADOS block device. And disconnect
578          * later on.
579          */
580         r = _fio_rbd_connect(td);
581         if (r) {
582                 log_err("fio_rbd_connect failed.\n");
583                 goto cleanup;
584         }
585
586         /* get size of the RADOS block device */
587         r = rbd_stat(rbd->image, &info, sizeof(info));
588         if (r < 0) {
589                 log_err("rbd_status failed.\n");
590                 goto disconnect;
591         }
592         dprint(FD_IO, "rbd-engine: image size: %lu\n", info.size);
593
594         /* taken from "net" engine. Pretend we deal with files,
595          * even if we do not have any ideas about files.
596          * The size of the RBD is set instead of a artificial file.
597          */
598         if (!td->files_index) {
599                 add_file(td, td->o.filename ? : "rbd", 0, 0);
600                 td->o.nr_files = td->o.nr_files ? : 1;
601                 td->o.open_files++;
602         }
603         f = td->files[0];
604         f->real_file_size = info.size;
605
606         /* disconnect, then we were only connected to determine
607          * the size of the RBD.
608          */
609         _fio_rbd_disconnect(rbd);
610         return 0;
611
612 disconnect:
613         _fio_rbd_disconnect(rbd);
614 cleanup:
615         fio_rbd_cleanup(td);
616         return r;
617 }
618
619 static int fio_rbd_open(struct thread_data *td, struct fio_file *f)
620 {
621         return 0;
622 }
623
624 static int fio_rbd_invalidate(struct thread_data *td, struct fio_file *f)
625 {
626 #if defined(CONFIG_RBD_INVAL)
627         struct rbd_data *rbd = td->io_ops_data;
628
629         return rbd_invalidate_cache(rbd->image);
630 #else
631         return 0;
632 #endif
633 }
634
635 static void fio_rbd_io_u_free(struct thread_data *td, struct io_u *io_u)
636 {
637         struct fio_rbd_iou *fri = io_u->engine_data;
638
639         if (fri) {
640                 io_u->engine_data = NULL;
641                 free(fri);
642         }
643 }
644
645 static int fio_rbd_io_u_init(struct thread_data *td, struct io_u *io_u)
646 {
647         struct fio_rbd_iou *fri;
648
649         fri = calloc(1, sizeof(*fri));
650         fri->io_u = io_u;
651         io_u->engine_data = fri;
652         return 0;
653 }
654
655 static struct ioengine_ops ioengine = {
656         .name                   = "rbd",
657         .version                = FIO_IOOPS_VERSION,
658         .setup                  = fio_rbd_setup,
659         .init                   = fio_rbd_init,
660         .queue                  = fio_rbd_queue,
661         .getevents              = fio_rbd_getevents,
662         .event                  = fio_rbd_event,
663         .cleanup                = fio_rbd_cleanup,
664         .open_file              = fio_rbd_open,
665         .invalidate             = fio_rbd_invalidate,
666         .options                = options,
667         .io_u_init              = fio_rbd_io_u_init,
668         .io_u_free              = fio_rbd_io_u_free,
669         .option_struct_size     = sizeof(struct rbd_options),
670 };
671
672 static void fio_init fio_rbd_register(void)
673 {
674         register_ioengine(&ioengine);
675 }
676
677 static void fio_exit fio_rbd_unregister(void)
678 {
679         unregister_ioengine(&ioengine);
680 }