rbd: fix crash with zero sized image
[fio.git] / engines / rbd.c
1 /*
2  * rbd engine
3  *
4  * IO engine using Ceph's librbd to test RADOS Block Devices.
5  *
6  */
7
8 #include <rbd/librbd.h>
9
10 #include "../fio.h"
11 #include "../optgroup.h"
12 #ifdef CONFIG_RBD_BLKIN
13 #include <zipkin_c.h>
14 #endif
15
16 #ifdef CONFIG_RBD_POLL
17 /* add for poll */
18 #include <poll.h>
19 #include <sys/eventfd.h>
20 #endif
21
22 struct fio_rbd_iou {
23         struct io_u *io_u;
24         rbd_completion_t completion;
25         int io_seen;
26         int io_complete;
27 #ifdef CONFIG_RBD_BLKIN
28         struct blkin_trace_info info;
29 #endif
30 };
31
32 struct rbd_data {
33         rados_t cluster;
34         rados_ioctx_t io_ctx;
35         rbd_image_t image;
36         struct io_u **aio_events;
37         struct io_u **sort_events;
38         int fd; /* add for poll */
39 };
40
41 struct rbd_options {
42         void *pad;
43         char *cluster_name;
44         char *rbd_name;
45         char *pool_name;
46         char *client_name;
47         int busy_poll;
48 };
49
50 static struct fio_option options[] = {
51         {
52                 .name           = "clustername",
53                 .lname          = "ceph cluster name",
54                 .type           = FIO_OPT_STR_STORE,
55                 .help           = "Cluster name for ceph",
56                 .off1           = offsetof(struct rbd_options, cluster_name),
57                 .category       = FIO_OPT_C_ENGINE,
58                 .group          = FIO_OPT_G_RBD,
59         },
60         {
61                 .name           = "rbdname",
62                 .lname          = "rbd engine rbdname",
63                 .type           = FIO_OPT_STR_STORE,
64                 .help           = "RBD name for RBD engine",
65                 .off1           = offsetof(struct rbd_options, rbd_name),
66                 .category       = FIO_OPT_C_ENGINE,
67                 .group          = FIO_OPT_G_RBD,
68         },
69         {
70                 .name           = "pool",
71                 .lname          = "rbd engine pool",
72                 .type           = FIO_OPT_STR_STORE,
73                 .help           = "Name of the pool hosting the RBD for the RBD engine",
74                 .off1           = offsetof(struct rbd_options, pool_name),
75                 .category       = FIO_OPT_C_ENGINE,
76                 .group          = FIO_OPT_G_RBD,
77         },
78         {
79                 .name           = "clientname",
80                 .lname          = "rbd engine clientname",
81                 .type           = FIO_OPT_STR_STORE,
82                 .help           = "Name of the ceph client to access the RBD for the RBD engine",
83                 .off1           = offsetof(struct rbd_options, client_name),
84                 .category       = FIO_OPT_C_ENGINE,
85                 .group          = FIO_OPT_G_RBD,
86         },
87         {
88                 .name           = "busy_poll",
89                 .lname          = "Busy poll",
90                 .type           = FIO_OPT_BOOL,
91                 .help           = "Busy poll for completions instead of sleeping",
92                 .off1           = offsetof(struct rbd_options, busy_poll),
93                 .def            = "0",
94                 .category       = FIO_OPT_C_ENGINE,
95                 .group          = FIO_OPT_G_RBD,
96         },
97         {
98                 .name = NULL,
99         },
100 };
101
102 static int _fio_setup_rbd_data(struct thread_data *td,
103                                struct rbd_data **rbd_data_ptr)
104 {
105         struct rbd_data *rbd;
106
107         if (td->io_ops_data)
108                 return 0;
109
110         rbd = calloc(1, sizeof(struct rbd_data));
111         if (!rbd)
112                 goto failed;
113
114         /* add for poll, init fd: -1 */
115         rbd->fd = -1;
116
117         rbd->aio_events = calloc(td->o.iodepth, sizeof(struct io_u *));
118         if (!rbd->aio_events)
119                 goto failed;
120
121         rbd->sort_events = calloc(td->o.iodepth, sizeof(struct io_u *));
122         if (!rbd->sort_events)
123                 goto failed;
124
125         *rbd_data_ptr = rbd;
126         return 0;
127
128 failed:
129         if (rbd) {
130                 if (rbd->aio_events) 
131                         free(rbd->aio_events);
132                 if (rbd->sort_events)
133                         free(rbd->sort_events);
134                 free(rbd);
135         }
136         return 1;
137
138 }
139
140 #ifdef CONFIG_RBD_POLL
141 static bool _fio_rbd_setup_poll(struct rbd_data *rbd)
142 {
143         int r;
144
145         /* add for rbd poll */
146         rbd->fd = eventfd(0, EFD_NONBLOCK);
147         if (rbd->fd < 0) {
148                 log_err("eventfd failed.\n");
149                 return false;
150         }
151
152         r = rbd_set_image_notification(rbd->image, rbd->fd, EVENT_TYPE_EVENTFD);
153         if (r < 0) {
154                 log_err("rbd_set_image_notification failed.\n");
155                 close(rbd->fd);
156                 rbd->fd = -1;
157                 return false;
158         }
159
160         return true;
161 }
162 #else
163 static bool _fio_rbd_setup_poll(struct rbd_data *rbd)
164 {
165         return true;
166 }
167 #endif
168
169 static int _fio_rbd_connect(struct thread_data *td)
170 {
171         struct rbd_data *rbd = td->io_ops_data;
172         struct rbd_options *o = td->eo;
173         int r;
174
175         if (o->cluster_name) {
176                 char *client_name = NULL; 
177
178                 /*
179                  * If we specify cluser name, the rados_create2
180                  * will not assume 'client.'. name is considered
181                  * as a full type.id namestr
182                  */
183                 if (o->client_name) {
184                         if (!index(o->client_name, '.')) {
185                                 client_name = calloc(1, strlen("client.") +
186                                                     strlen(o->client_name) + 1);
187                                 strcat(client_name, "client.");
188                                 strcat(client_name, o->client_name);
189                         } else {
190                                 client_name = o->client_name;
191                         }
192                 }
193
194                 r = rados_create2(&rbd->cluster, o->cluster_name,
195                                  client_name, 0);
196
197                 if (client_name && !index(o->client_name, '.'))
198                         free(client_name);
199         } else
200                 r = rados_create(&rbd->cluster, o->client_name);
201         
202         if (r < 0) {
203                 log_err("rados_create failed.\n");
204                 goto failed_early;
205         }
206
207         r = rados_conf_read_file(rbd->cluster, NULL);
208         if (r < 0) {
209                 log_err("rados_conf_read_file failed.\n");
210                 goto failed_early;
211         }
212
213         r = rados_connect(rbd->cluster);
214         if (r < 0) {
215                 log_err("rados_connect failed.\n");
216                 goto failed_shutdown;
217         }
218
219         r = rados_ioctx_create(rbd->cluster, o->pool_name, &rbd->io_ctx);
220         if (r < 0) {
221                 log_err("rados_ioctx_create failed.\n");
222                 goto failed_shutdown;
223         }
224
225         r = rbd_open(rbd->io_ctx, o->rbd_name, &rbd->image, NULL /*snap */ );
226         if (r < 0) {
227                 log_err("rbd_open failed.\n");
228                 goto failed_open;
229         }
230
231         if (!_fio_rbd_setup_poll(rbd))
232                 goto failed_poll;
233
234         return 0;
235
236 failed_poll:
237         rbd_close(rbd->image);
238         rbd->image = NULL;
239 failed_open:
240         rados_ioctx_destroy(rbd->io_ctx);
241         rbd->io_ctx = NULL;
242 failed_shutdown:
243         rados_shutdown(rbd->cluster);
244         rbd->cluster = NULL;
245 failed_early:
246         return 1;
247 }
248
249 static void _fio_rbd_disconnect(struct rbd_data *rbd)
250 {
251         if (!rbd)
252                 return;
253
254         /* close eventfd */
255         if (rbd->fd != -1) {
256                 close(rbd->fd);
257                 rbd->fd = -1;
258         }
259
260         /* shutdown everything */
261         if (rbd->image) {
262                 rbd_close(rbd->image);
263                 rbd->image = NULL;
264         }
265
266         if (rbd->io_ctx) {
267                 rados_ioctx_destroy(rbd->io_ctx);
268                 rbd->io_ctx = NULL;
269         }
270
271         if (rbd->cluster) {
272                 rados_shutdown(rbd->cluster);
273                 rbd->cluster = NULL;
274         }
275 }
276
277 static void _fio_rbd_finish_aiocb(rbd_completion_t comp, void *data)
278 {
279         struct fio_rbd_iou *fri = data;
280         struct io_u *io_u = fri->io_u;
281         ssize_t ret;
282
283         /*
284          * Looks like return value is 0 for success, or < 0 for
285          * a specific error. So we have to assume that it can't do
286          * partial completions.
287          */
288         ret = rbd_aio_get_return_value(fri->completion);
289         if (ret < 0) {
290                 io_u->error = ret;
291                 io_u->resid = io_u->xfer_buflen;
292         } else
293                 io_u->error = 0;
294
295         fri->io_complete = 1;
296 }
297
298 static struct io_u *fio_rbd_event(struct thread_data *td, int event)
299 {
300         struct rbd_data *rbd = td->io_ops_data;
301
302         return rbd->aio_events[event];
303 }
304
305 static inline int fri_check_complete(struct rbd_data *rbd, struct io_u *io_u,
306                                      unsigned int *events)
307 {
308         struct fio_rbd_iou *fri = io_u->engine_data;
309
310         if (fri->io_complete) {
311                 fri->io_seen = 1;
312                 rbd->aio_events[*events] = io_u;
313                 (*events)++;
314
315                 rbd_aio_release(fri->completion);
316                 return 1;
317         }
318
319         return 0;
320 }
321
322 static inline int rbd_io_u_seen(struct io_u *io_u)
323 {
324         struct fio_rbd_iou *fri = io_u->engine_data;
325
326         return fri->io_seen;
327 }
328
329 static void rbd_io_u_wait_complete(struct io_u *io_u)
330 {
331         struct fio_rbd_iou *fri = io_u->engine_data;
332
333         rbd_aio_wait_for_complete(fri->completion);
334 }
335
336 static int rbd_io_u_cmp(const void *p1, const void *p2)
337 {
338         const struct io_u **a = (const struct io_u **) p1;
339         const struct io_u **b = (const struct io_u **) p2;
340         uint64_t at, bt;
341
342         at = utime_since_now(&(*a)->start_time);
343         bt = utime_since_now(&(*b)->start_time);
344
345         if (at < bt)
346                 return -1;
347         else if (at == bt)
348                 return 0;
349         else
350                 return 1;
351 }
352
353 static int rbd_iter_events(struct thread_data *td, unsigned int *events,
354                            unsigned int min_evts, int wait)
355 {
356         struct rbd_data *rbd = td->io_ops_data;
357         unsigned int this_events = 0;
358         struct io_u *io_u;
359         int i, sidx = 0;
360
361 #ifdef CONFIG_RBD_POLL
362         int ret = 0;
363         int event_num = 0;
364         struct fio_rbd_iou *fri = NULL;
365         rbd_completion_t comps[min_evts];
366
367         struct pollfd pfd;
368         pfd.fd = rbd->fd;
369         pfd.events = POLLIN;
370
371         ret = poll(&pfd, 1, -1);
372         if (ret <= 0)
373                 return 0;
374
375         assert(pfd.revents & POLLIN);
376
377         event_num = rbd_poll_io_events(rbd->image, comps, min_evts);
378
379         for (i = 0; i < event_num; i++) {
380                 fri = rbd_aio_get_arg(comps[i]);
381                 io_u = fri->io_u;
382 #else
383         io_u_qiter(&td->io_u_all, io_u, i) {
384 #endif
385                 if (!(io_u->flags & IO_U_F_FLIGHT))
386                         continue;
387                 if (rbd_io_u_seen(io_u))
388                         continue;
389
390                 if (fri_check_complete(rbd, io_u, events))
391                         this_events++;
392                 else if (wait)
393                         rbd->sort_events[sidx++] = io_u;
394         }
395
396         if (!wait || !sidx)
397                 return this_events;
398
399         /*
400          * Sort events, oldest issue first, then wait on as many as we
401          * need in order of age. If we have enough events, stop waiting,
402          * and just check if any of the older ones are done.
403          */
404         if (sidx > 1)
405                 qsort(rbd->sort_events, sidx, sizeof(struct io_u *), rbd_io_u_cmp);
406
407         for (i = 0; i < sidx; i++) {
408                 io_u = rbd->sort_events[i];
409
410                 if (fri_check_complete(rbd, io_u, events)) {
411                         this_events++;
412                         continue;
413                 }
414
415                 /*
416                  * Stop waiting when we have enough, but continue checking
417                  * all pending IOs if they are complete.
418                  */
419                 if (*events >= min_evts)
420                         continue;
421
422                 rbd_io_u_wait_complete(io_u);
423
424                 if (fri_check_complete(rbd, io_u, events))
425                         this_events++;
426         }
427
428         return this_events;
429 }
430
431 static int fio_rbd_getevents(struct thread_data *td, unsigned int min,
432                              unsigned int max, const struct timespec *t)
433 {
434         unsigned int this_events, events = 0;
435         struct rbd_options *o = td->eo;
436         int wait = 0;
437
438         do {
439                 this_events = rbd_iter_events(td, &events, min, wait);
440
441                 if (events >= min)
442                         break;
443                 if (this_events)
444                         continue;
445
446                 if (!o->busy_poll)
447                         wait = 1;
448                 else
449                         nop;
450         } while (1);
451
452         return events;
453 }
454
455 static int fio_rbd_queue(struct thread_data *td, struct io_u *io_u)
456 {
457         struct rbd_data *rbd = td->io_ops_data;
458         struct fio_rbd_iou *fri = io_u->engine_data;
459         int r = -1;
460
461         fio_ro_check(td, io_u);
462
463         fri->io_seen = 0;
464         fri->io_complete = 0;
465
466         r = rbd_aio_create_completion(fri, _fio_rbd_finish_aiocb,
467                                                 &fri->completion);
468         if (r < 0) {
469                 log_err("rbd_aio_create_completion failed.\n");
470                 goto failed;
471         }
472
473         if (io_u->ddir == DDIR_WRITE) {
474 #ifdef CONFIG_RBD_BLKIN
475                 blkin_init_trace_info(&fri->info);
476                 r = rbd_aio_write_traced(rbd->image, io_u->offset, io_u->xfer_buflen,
477                                          io_u->xfer_buf, fri->completion, &fri->info);
478 #else
479                 r = rbd_aio_write(rbd->image, io_u->offset, io_u->xfer_buflen,
480                                          io_u->xfer_buf, fri->completion);
481 #endif
482                 if (r < 0) {
483                         log_err("rbd_aio_write failed.\n");
484                         goto failed_comp;
485                 }
486
487         } else if (io_u->ddir == DDIR_READ) {
488 #ifdef CONFIG_RBD_BLKIN
489                 blkin_init_trace_info(&fri->info);
490                 r = rbd_aio_read_traced(rbd->image, io_u->offset, io_u->xfer_buflen,
491                                         io_u->xfer_buf, fri->completion, &fri->info);
492 #else
493                 r = rbd_aio_read(rbd->image, io_u->offset, io_u->xfer_buflen,
494                                         io_u->xfer_buf, fri->completion);
495 #endif
496
497                 if (r < 0) {
498                         log_err("rbd_aio_read failed.\n");
499                         goto failed_comp;
500                 }
501         } else if (io_u->ddir == DDIR_TRIM) {
502                 r = rbd_aio_discard(rbd->image, io_u->offset,
503                                         io_u->xfer_buflen, fri->completion);
504                 if (r < 0) {
505                         log_err("rbd_aio_discard failed.\n");
506                         goto failed_comp;
507                 }
508         } else if (io_u->ddir == DDIR_SYNC) {
509                 r = rbd_aio_flush(rbd->image, fri->completion);
510                 if (r < 0) {
511                         log_err("rbd_flush failed.\n");
512                         goto failed_comp;
513                 }
514         } else {
515                 dprint(FD_IO, "%s: Warning: unhandled ddir: %d\n", __func__,
516                        io_u->ddir);
517                 goto failed_comp;
518         }
519
520         return FIO_Q_QUEUED;
521 failed_comp:
522         rbd_aio_release(fri->completion);
523 failed:
524         io_u->error = r;
525         td_verror(td, io_u->error, "xfer");
526         return FIO_Q_COMPLETED;
527 }
528
529 static int fio_rbd_init(struct thread_data *td)
530 {
531         int r;
532
533         r = _fio_rbd_connect(td);
534         if (r) {
535                 log_err("fio_rbd_connect failed, return code: %d .\n", r);
536                 goto failed;
537         }
538
539         return 0;
540
541 failed:
542         return 1;
543 }
544
545 static void fio_rbd_cleanup(struct thread_data *td)
546 {
547         struct rbd_data *rbd = td->io_ops_data;
548
549         if (rbd) {
550                 _fio_rbd_disconnect(rbd);
551                 free(rbd->aio_events);
552                 free(rbd->sort_events);
553                 free(rbd);
554         }
555 }
556
557 static int fio_rbd_setup(struct thread_data *td)
558 {
559         rbd_image_info_t info;
560         struct fio_file *f;
561         struct rbd_data *rbd = NULL;
562         int major, minor, extra;
563         int r;
564
565         /* log version of librbd. No cluster connection required. */
566         rbd_version(&major, &minor, &extra);
567         log_info("rbd engine: RBD version: %d.%d.%d\n", major, minor, extra);
568
569         /* allocate engine specific structure to deal with librbd. */
570         r = _fio_setup_rbd_data(td, &rbd);
571         if (r) {
572                 log_err("fio_setup_rbd_data failed.\n");
573                 goto cleanup;
574         }
575         td->io_ops_data = rbd;
576
577         /* librbd does not allow us to run first in the main thread and later
578          * in a fork child. It needs to be the same process context all the
579          * time. 
580          */
581         td->o.use_thread = 1;
582
583         /* connect in the main thread to determine to determine
584          * the size of the given RADOS block device. And disconnect
585          * later on.
586          */
587         r = _fio_rbd_connect(td);
588         if (r) {
589                 log_err("fio_rbd_connect failed.\n");
590                 goto cleanup;
591         }
592
593         /* get size of the RADOS block device */
594         r = rbd_stat(rbd->image, &info, sizeof(info));
595         if (r < 0) {
596                 log_err("rbd_status failed.\n");
597                 goto disconnect;
598         } else if (info.size == 0) {
599                 log_err("image size should be larger than zero.\n");
600                 r = -EINVAL;
601                 goto disconnect;
602         }
603
604         dprint(FD_IO, "rbd-engine: image size: %lu\n", info.size);
605
606         /* taken from "net" engine. Pretend we deal with files,
607          * even if we do not have any ideas about files.
608          * The size of the RBD is set instead of a artificial file.
609          */
610         if (!td->files_index) {
611                 add_file(td, td->o.filename ? : "rbd", 0, 0);
612                 td->o.nr_files = td->o.nr_files ? : 1;
613                 td->o.open_files++;
614         }
615         f = td->files[0];
616         f->real_file_size = info.size;
617
618         /* disconnect, then we were only connected to determine
619          * the size of the RBD.
620          */
621         _fio_rbd_disconnect(rbd);
622         return 0;
623
624 disconnect:
625         _fio_rbd_disconnect(rbd);
626 cleanup:
627         fio_rbd_cleanup(td);
628         return r;
629 }
630
631 static int fio_rbd_open(struct thread_data *td, struct fio_file *f)
632 {
633         return 0;
634 }
635
636 static int fio_rbd_invalidate(struct thread_data *td, struct fio_file *f)
637 {
638 #if defined(CONFIG_RBD_INVAL)
639         struct rbd_data *rbd = td->io_ops_data;
640
641         return rbd_invalidate_cache(rbd->image);
642 #else
643         return 0;
644 #endif
645 }
646
647 static void fio_rbd_io_u_free(struct thread_data *td, struct io_u *io_u)
648 {
649         struct fio_rbd_iou *fri = io_u->engine_data;
650
651         if (fri) {
652                 io_u->engine_data = NULL;
653                 free(fri);
654         }
655 }
656
657 static int fio_rbd_io_u_init(struct thread_data *td, struct io_u *io_u)
658 {
659         struct fio_rbd_iou *fri;
660
661         fri = calloc(1, sizeof(*fri));
662         fri->io_u = io_u;
663         io_u->engine_data = fri;
664         return 0;
665 }
666
667 static struct ioengine_ops ioengine = {
668         .name                   = "rbd",
669         .version                = FIO_IOOPS_VERSION,
670         .setup                  = fio_rbd_setup,
671         .init                   = fio_rbd_init,
672         .queue                  = fio_rbd_queue,
673         .getevents              = fio_rbd_getevents,
674         .event                  = fio_rbd_event,
675         .cleanup                = fio_rbd_cleanup,
676         .open_file              = fio_rbd_open,
677         .invalidate             = fio_rbd_invalidate,
678         .options                = options,
679         .io_u_init              = fio_rbd_io_u_init,
680         .io_u_free              = fio_rbd_io_u_free,
681         .option_struct_size     = sizeof(struct rbd_options),
682 };
683
684 static void fio_init fio_rbd_register(void)
685 {
686         register_ioengine(&ioengine);
687 }
688
689 static void fio_exit fio_rbd_unregister(void)
690 {
691         unregister_ioengine(&ioengine);
692 }