Add support for blkin tracing in rbd engine
[fio.git] / engines / rbd.c
1 /*
2  * rbd engine
3  *
4  * IO engine using Ceph's librbd to test RADOS Block Devices.
5  *
6  */
7
8 #include <rbd/librbd.h>
9
10 #include "../fio.h"
11 #include "../optgroup.h"
12 #ifdef CONFIG_RBD_BLKIN
13 #include <zipkin_c.h>
14 #endif
15
16 struct fio_rbd_iou {
17         struct io_u *io_u;
18         rbd_completion_t completion;
19         int io_seen;
20         int io_complete;
21         struct blkin_trace_info info;
22 };
23
24 struct rbd_data {
25         rados_t cluster;
26         rados_ioctx_t io_ctx;
27         rbd_image_t image;
28         struct io_u **aio_events;
29         struct io_u **sort_events;
30 };
31
32 struct rbd_options {
33         void *pad;
34         char *cluster_name;
35         char *rbd_name;
36         char *pool_name;
37         char *client_name;
38         int busy_poll;
39 };
40
41 static struct fio_option options[] = {
42         {
43                 .name           = "clustername",
44                 .lname          = "ceph cluster name",
45                 .type           = FIO_OPT_STR_STORE,
46                 .help           = "Cluster name for ceph",
47                 .off1           = offsetof(struct rbd_options, cluster_name),
48                 .category       = FIO_OPT_C_ENGINE,
49                 .group          = FIO_OPT_G_RBD,
50         },
51         {
52                 .name           = "rbdname",
53                 .lname          = "rbd engine rbdname",
54                 .type           = FIO_OPT_STR_STORE,
55                 .help           = "RBD name for RBD engine",
56                 .off1           = offsetof(struct rbd_options, rbd_name),
57                 .category       = FIO_OPT_C_ENGINE,
58                 .group          = FIO_OPT_G_RBD,
59         },
60         {
61                 .name           = "pool",
62                 .lname          = "rbd engine pool",
63                 .type           = FIO_OPT_STR_STORE,
64                 .help           = "Name of the pool hosting the RBD for the RBD engine",
65                 .off1           = offsetof(struct rbd_options, pool_name),
66                 .category       = FIO_OPT_C_ENGINE,
67                 .group          = FIO_OPT_G_RBD,
68         },
69         {
70                 .name           = "clientname",
71                 .lname          = "rbd engine clientname",
72                 .type           = FIO_OPT_STR_STORE,
73                 .help           = "Name of the ceph client to access the RBD for the RBD engine",
74                 .off1           = offsetof(struct rbd_options, client_name),
75                 .category       = FIO_OPT_C_ENGINE,
76                 .group          = FIO_OPT_G_RBD,
77         },
78         {
79                 .name           = "busy_poll",
80                 .lname          = "Busy poll",
81                 .type           = FIO_OPT_BOOL,
82                 .help           = "Busy poll for completions instead of sleeping",
83                 .off1           = offsetof(struct rbd_options, busy_poll),
84                 .def            = "0",
85                 .category       = FIO_OPT_C_ENGINE,
86                 .group          = FIO_OPT_G_RBD,
87         },
88         {
89                 .name = NULL,
90         },
91 };
92
93 static int _fio_setup_rbd_data(struct thread_data *td,
94                                struct rbd_data **rbd_data_ptr)
95 {
96         struct rbd_data *rbd;
97
98         if (td->io_ops->data)
99                 return 0;
100
101         rbd = calloc(1, sizeof(struct rbd_data));
102         if (!rbd)
103                 goto failed;
104
105         rbd->aio_events = calloc(td->o.iodepth, sizeof(struct io_u *));
106         if (!rbd->aio_events)
107                 goto failed;
108
109         rbd->sort_events = calloc(td->o.iodepth, sizeof(struct io_u *));
110         if (!rbd->sort_events)
111                 goto failed;
112
113         *rbd_data_ptr = rbd;
114         return 0;
115
116 failed:
117         if (rbd)
118                 free(rbd);
119         return 1;
120
121 }
122
123 static int _fio_rbd_connect(struct thread_data *td)
124 {
125         struct rbd_data *rbd = td->io_ops->data;
126         struct rbd_options *o = td->eo;
127         int r;
128
129         if (o->cluster_name) {
130                 char *client_name = NULL; 
131
132                 /*
133                  * If we specify cluser name, the rados_creat2
134                  * will not assume 'client.'. name is considered
135                  * as a full type.id namestr
136                  */
137                 if (!index(o->client_name, '.')) {
138                         client_name = calloc(1, strlen("client.") +
139                                                 strlen(o->client_name) + 1);
140                         strcat(client_name, "client.");
141                         o->client_name = strcat(client_name, o->client_name);
142                 }
143                 r = rados_create2(&rbd->cluster, o->cluster_name,
144                                         o->client_name, 0);
145         } else
146                 r = rados_create(&rbd->cluster, o->client_name);
147         
148         if (r < 0) {
149                 log_err("rados_create failed.\n");
150                 goto failed_early;
151         }
152
153         r = rados_conf_read_file(rbd->cluster, NULL);
154         if (r < 0) {
155                 log_err("rados_conf_read_file failed.\n");
156                 goto failed_early;
157         }
158
159         r = rados_connect(rbd->cluster);
160         if (r < 0) {
161                 log_err("rados_connect failed.\n");
162                 goto failed_shutdown;
163         }
164
165         r = rados_ioctx_create(rbd->cluster, o->pool_name, &rbd->io_ctx);
166         if (r < 0) {
167                 log_err("rados_ioctx_create failed.\n");
168                 goto failed_shutdown;
169         }
170
171         r = rbd_open(rbd->io_ctx, o->rbd_name, &rbd->image, NULL /*snap */ );
172         if (r < 0) {
173                 log_err("rbd_open failed.\n");
174                 goto failed_open;
175         }
176         return 0;
177
178 failed_open:
179         rados_ioctx_destroy(rbd->io_ctx);
180         rbd->io_ctx = NULL;
181 failed_shutdown:
182         rados_shutdown(rbd->cluster);
183         rbd->cluster = NULL;
184 failed_early:
185         return 1;
186 }
187
188 static void _fio_rbd_disconnect(struct rbd_data *rbd)
189 {
190         if (!rbd)
191                 return;
192
193         /* shutdown everything */
194         if (rbd->image) {
195                 rbd_close(rbd->image);
196                 rbd->image = NULL;
197         }
198
199         if (rbd->io_ctx) {
200                 rados_ioctx_destroy(rbd->io_ctx);
201                 rbd->io_ctx = NULL;
202         }
203
204         if (rbd->cluster) {
205                 rados_shutdown(rbd->cluster);
206                 rbd->cluster = NULL;
207         }
208 }
209
210 static void _fio_rbd_finish_aiocb(rbd_completion_t comp, void *data)
211 {
212         struct fio_rbd_iou *fri = data;
213         struct io_u *io_u = fri->io_u;
214         ssize_t ret;
215
216         /*
217          * Looks like return value is 0 for success, or < 0 for
218          * a specific error. So we have to assume that it can't do
219          * partial completions.
220          */
221         ret = rbd_aio_get_return_value(fri->completion);
222         if (ret < 0) {
223                 io_u->error = ret;
224                 io_u->resid = io_u->xfer_buflen;
225         } else
226                 io_u->error = 0;
227
228         fri->io_complete = 1;
229 }
230
231 static struct io_u *fio_rbd_event(struct thread_data *td, int event)
232 {
233         struct rbd_data *rbd = td->io_ops->data;
234
235         return rbd->aio_events[event];
236 }
237
238 static inline int fri_check_complete(struct rbd_data *rbd, struct io_u *io_u,
239                                      unsigned int *events)
240 {
241         struct fio_rbd_iou *fri = io_u->engine_data;
242
243         if (fri->io_complete) {
244                 fri->io_seen = 1;
245                 rbd->aio_events[*events] = io_u;
246                 (*events)++;
247
248                 rbd_aio_release(fri->completion);
249                 return 1;
250         }
251
252         return 0;
253 }
254
255 static inline int rbd_io_u_seen(struct io_u *io_u)
256 {
257         struct fio_rbd_iou *fri = io_u->engine_data;
258
259         return fri->io_seen;
260 }
261
262 static void rbd_io_u_wait_complete(struct io_u *io_u)
263 {
264         struct fio_rbd_iou *fri = io_u->engine_data;
265
266         rbd_aio_wait_for_complete(fri->completion);
267 }
268
269 static int rbd_io_u_cmp(const void *p1, const void *p2)
270 {
271         const struct io_u **a = (const struct io_u **) p1;
272         const struct io_u **b = (const struct io_u **) p2;
273         uint64_t at, bt;
274
275         at = utime_since_now(&(*a)->start_time);
276         bt = utime_since_now(&(*b)->start_time);
277
278         if (at < bt)
279                 return -1;
280         else if (at == bt)
281                 return 0;
282         else
283                 return 1;
284 }
285
286 static int rbd_iter_events(struct thread_data *td, unsigned int *events,
287                            unsigned int min_evts, int wait)
288 {
289         struct rbd_data *rbd = td->io_ops->data;
290         unsigned int this_events = 0;
291         struct io_u *io_u;
292         int i, sidx;
293
294         sidx = 0;
295         io_u_qiter(&td->io_u_all, io_u, i) {
296                 if (!(io_u->flags & IO_U_F_FLIGHT))
297                         continue;
298                 if (rbd_io_u_seen(io_u))
299                         continue;
300
301                 if (fri_check_complete(rbd, io_u, events))
302                         this_events++;
303                 else if (wait)
304                         rbd->sort_events[sidx++] = io_u;
305         }
306
307         if (!wait || !sidx)
308                 return this_events;
309
310         /*
311          * Sort events, oldest issue first, then wait on as many as we
312          * need in order of age. If we have enough events, stop waiting,
313          * and just check if any of the older ones are done.
314          */
315         if (sidx > 1)
316                 qsort(rbd->sort_events, sidx, sizeof(struct io_u *), rbd_io_u_cmp);
317
318         for (i = 0; i < sidx; i++) {
319                 io_u = rbd->sort_events[i];
320
321                 if (fri_check_complete(rbd, io_u, events)) {
322                         this_events++;
323                         continue;
324                 }
325
326                 /*
327                  * Stop waiting when we have enough, but continue checking
328                  * all pending IOs if they are complete.
329                  */
330                 if (*events >= min_evts)
331                         continue;
332
333                 rbd_io_u_wait_complete(io_u);
334
335                 if (fri_check_complete(rbd, io_u, events))
336                         this_events++;
337         }
338
339         return this_events;
340 }
341
342 static int fio_rbd_getevents(struct thread_data *td, unsigned int min,
343                              unsigned int max, const struct timespec *t)
344 {
345         unsigned int this_events, events = 0;
346         struct rbd_options *o = td->eo;
347         int wait = 0;
348
349         do {
350                 this_events = rbd_iter_events(td, &events, min, wait);
351
352                 if (events >= min)
353                         break;
354                 if (this_events)
355                         continue;
356
357                 if (!o->busy_poll)
358                         wait = 1;
359                 else
360                         nop;
361         } while (1);
362
363         return events;
364 }
365
366 static int fio_rbd_queue(struct thread_data *td, struct io_u *io_u)
367 {
368         struct rbd_data *rbd = td->io_ops->data;
369         struct fio_rbd_iou *fri = io_u->engine_data;
370         int r = -1;
371
372         fio_ro_check(td, io_u);
373
374         fri->io_seen = 0;
375         fri->io_complete = 0;
376
377         r = rbd_aio_create_completion(fri, _fio_rbd_finish_aiocb,
378                                                 &fri->completion);
379         if (r < 0) {
380                 log_err("rbd_aio_create_completion failed.\n");
381                 goto failed;
382         }
383
384         if (io_u->ddir == DDIR_WRITE) {
385 #ifdef CONFIG_RBD_BLKIN
386                 blkin_init_trace_info(&fri->info);
387                 r = rbd_aio_write_traced(rbd->image, io_u->offset, io_u->xfer_buflen,
388                                          io_u->xfer_buf, fri->completion, &fri->info);
389 #else
390                 r = rbd_aio_write(rbd->image, io_u->offset, io_u->xfer_buflen,
391                                          io_u->xfer_buf, fri->completion);
392 #endif
393                 if (r < 0) {
394                         log_err("rbd_aio_write failed.\n");
395                         goto failed_comp;
396                 }
397
398         } else if (io_u->ddir == DDIR_READ) {
399 #ifdef CONFIG_RBD_BLKIN
400                 blkin_init_trace_info(&fri->info);
401                 r = rbd_aio_read_traced(rbd->image, io_u->offset, io_u->xfer_buflen,
402                                         io_u->xfer_buf, fri->completion, &fri->info);
403 #else
404                 r = rbd_aio_read(rbd->image, io_u->offset, io_u->xfer_buflen,
405                                         io_u->xfer_buf, fri->completion);
406 #endif
407
408                 if (r < 0) {
409                         log_err("rbd_aio_read failed.\n");
410                         goto failed_comp;
411                 }
412         } else if (io_u->ddir == DDIR_TRIM) {
413                 r = rbd_aio_discard(rbd->image, io_u->offset,
414                                         io_u->xfer_buflen, fri->completion);
415                 if (r < 0) {
416                         log_err("rbd_aio_discard failed.\n");
417                         goto failed_comp;
418                 }
419         } else if (io_u->ddir == DDIR_SYNC) {
420                 r = rbd_aio_flush(rbd->image, fri->completion);
421                 if (r < 0) {
422                         log_err("rbd_flush failed.\n");
423                         goto failed_comp;
424                 }
425         } else {
426                 dprint(FD_IO, "%s: Warning: unhandled ddir: %d\n", __func__,
427                        io_u->ddir);
428                 goto failed_comp;
429         }
430
431         return FIO_Q_QUEUED;
432 failed_comp:
433         rbd_aio_release(fri->completion);
434 failed:
435         io_u->error = r;
436         td_verror(td, io_u->error, "xfer");
437         return FIO_Q_COMPLETED;
438 }
439
440 static int fio_rbd_init(struct thread_data *td)
441 {
442         int r;
443
444         r = _fio_rbd_connect(td);
445         if (r) {
446                 log_err("fio_rbd_connect failed, return code: %d .\n", r);
447                 goto failed;
448         }
449
450         return 0;
451
452 failed:
453         return 1;
454 }
455
456 static void fio_rbd_cleanup(struct thread_data *td)
457 {
458         struct rbd_data *rbd = td->io_ops->data;
459
460         if (rbd) {
461                 _fio_rbd_disconnect(rbd);
462                 free(rbd->aio_events);
463                 free(rbd->sort_events);
464                 free(rbd);
465         }
466 }
467
468 static int fio_rbd_setup(struct thread_data *td)
469 {
470         rbd_image_info_t info;
471         struct fio_file *f;
472         struct rbd_data *rbd = NULL;
473         int major, minor, extra;
474         int r;
475
476         /* log version of librbd. No cluster connection required. */
477         rbd_version(&major, &minor, &extra);
478         log_info("rbd engine: RBD version: %d.%d.%d\n", major, minor, extra);
479
480         /* allocate engine specific structure to deal with librbd. */
481         r = _fio_setup_rbd_data(td, &rbd);
482         if (r) {
483                 log_err("fio_setup_rbd_data failed.\n");
484                 goto cleanup;
485         }
486         td->io_ops->data = rbd;
487
488         /* librbd does not allow us to run first in the main thread and later
489          * in a fork child. It needs to be the same process context all the
490          * time. 
491          */
492         td->o.use_thread = 1;
493
494         /* connect in the main thread to determine to determine
495          * the size of the given RADOS block device. And disconnect
496          * later on.
497          */
498         r = _fio_rbd_connect(td);
499         if (r) {
500                 log_err("fio_rbd_connect failed.\n");
501                 goto cleanup;
502         }
503
504         /* get size of the RADOS block device */
505         r = rbd_stat(rbd->image, &info, sizeof(info));
506         if (r < 0) {
507                 log_err("rbd_status failed.\n");
508                 goto disconnect;
509         }
510         dprint(FD_IO, "rbd-engine: image size: %lu\n", info.size);
511
512         /* taken from "net" engine. Pretend we deal with files,
513          * even if we do not have any ideas about files.
514          * The size of the RBD is set instead of a artificial file.
515          */
516         if (!td->files_index) {
517                 add_file(td, td->o.filename ? : "rbd", 0, 0);
518                 td->o.nr_files = td->o.nr_files ? : 1;
519                 td->o.open_files++;
520         }
521         f = td->files[0];
522         f->real_file_size = info.size;
523
524         /* disconnect, then we were only connected to determine
525          * the size of the RBD.
526          */
527         _fio_rbd_disconnect(rbd);
528         return 0;
529
530 disconnect:
531         _fio_rbd_disconnect(rbd);
532 cleanup:
533         fio_rbd_cleanup(td);
534         return r;
535 }
536
537 static int fio_rbd_open(struct thread_data *td, struct fio_file *f)
538 {
539         return 0;
540 }
541
542 static int fio_rbd_invalidate(struct thread_data *td, struct fio_file *f)
543 {
544 #if defined(CONFIG_RBD_INVAL)
545         struct rbd_data *rbd = td->io_ops->data;
546
547         return rbd_invalidate_cache(rbd->image);
548 #else
549         return 0;
550 #endif
551 }
552
553 static void fio_rbd_io_u_free(struct thread_data *td, struct io_u *io_u)
554 {
555         struct fio_rbd_iou *fri = io_u->engine_data;
556
557         if (fri) {
558                 io_u->engine_data = NULL;
559                 free(fri);
560         }
561 }
562
563 static int fio_rbd_io_u_init(struct thread_data *td, struct io_u *io_u)
564 {
565         struct fio_rbd_iou *fri;
566
567         fri = calloc(1, sizeof(*fri));
568         fri->io_u = io_u;
569         io_u->engine_data = fri;
570         return 0;
571 }
572
573 static struct ioengine_ops ioengine = {
574         .name                   = "rbd",
575         .version                = FIO_IOOPS_VERSION,
576         .setup                  = fio_rbd_setup,
577         .init                   = fio_rbd_init,
578         .queue                  = fio_rbd_queue,
579         .getevents              = fio_rbd_getevents,
580         .event                  = fio_rbd_event,
581         .cleanup                = fio_rbd_cleanup,
582         .open_file              = fio_rbd_open,
583         .invalidate             = fio_rbd_invalidate,
584         .options                = options,
585         .io_u_init              = fio_rbd_io_u_init,
586         .io_u_free              = fio_rbd_io_u_free,
587         .option_struct_size     = sizeof(struct rbd_options),
588 };
589
590 static void fio_init fio_rbd_register(void)
591 {
592         register_ioengine(&ioengine);
593 }
594
595 static void fio_exit fio_rbd_unregister(void)
596 {
597         unregister_ioengine(&ioengine);
598 }