This commit / feature adds completion latency histogram output to fio, piggybacking
[fio.git] / engines / rbd.c
1 /*
2  * rbd engine
3  *
4  * IO engine using Ceph's librbd to test RADOS Block Devices.
5  *
6  */
7
8 #include <rbd/librbd.h>
9
10 #include "../fio.h"
11 #include "../optgroup.h"
12
13 struct fio_rbd_iou {
14         struct io_u *io_u;
15         rbd_completion_t completion;
16         int io_seen;
17         int io_complete;
18 };
19
20 struct rbd_data {
21         rados_t cluster;
22         rados_ioctx_t io_ctx;
23         rbd_image_t image;
24         struct io_u **aio_events;
25         struct io_u **sort_events;
26 };
27
28 struct rbd_options {
29         void *pad;
30         char *cluster_name;
31         char *rbd_name;
32         char *pool_name;
33         char *client_name;
34         int busy_poll;
35 };
36
37 static struct fio_option options[] = {
38         {
39                 .name           = "clustername",
40                 .lname          = "ceph cluster name",
41                 .type           = FIO_OPT_STR_STORE,
42                 .help           = "Cluster name for ceph",
43                 .off1           = offsetof(struct rbd_options, cluster_name),
44                 .category       = FIO_OPT_C_ENGINE,
45                 .group          = FIO_OPT_G_RBD,
46         },
47         {
48                 .name           = "rbdname",
49                 .lname          = "rbd engine rbdname",
50                 .type           = FIO_OPT_STR_STORE,
51                 .help           = "RBD name for RBD engine",
52                 .off1           = offsetof(struct rbd_options, rbd_name),
53                 .category       = FIO_OPT_C_ENGINE,
54                 .group          = FIO_OPT_G_RBD,
55         },
56         {
57                 .name           = "pool",
58                 .lname          = "rbd engine pool",
59                 .type           = FIO_OPT_STR_STORE,
60                 .help           = "Name of the pool hosting the RBD for the RBD engine",
61                 .off1           = offsetof(struct rbd_options, pool_name),
62                 .category       = FIO_OPT_C_ENGINE,
63                 .group          = FIO_OPT_G_RBD,
64         },
65         {
66                 .name           = "clientname",
67                 .lname          = "rbd engine clientname",
68                 .type           = FIO_OPT_STR_STORE,
69                 .help           = "Name of the ceph client to access the RBD for the RBD engine",
70                 .off1           = offsetof(struct rbd_options, client_name),
71                 .category       = FIO_OPT_C_ENGINE,
72                 .group          = FIO_OPT_G_RBD,
73         },
74         {
75                 .name           = "busy_poll",
76                 .lname          = "Busy poll",
77                 .type           = FIO_OPT_BOOL,
78                 .help           = "Busy poll for completions instead of sleeping",
79                 .off1           = offsetof(struct rbd_options, busy_poll),
80                 .def            = "0",
81                 .category       = FIO_OPT_C_ENGINE,
82                 .group          = FIO_OPT_G_RBD,
83         },
84         {
85                 .name = NULL,
86         },
87 };
88
89 static int _fio_setup_rbd_data(struct thread_data *td,
90                                struct rbd_data **rbd_data_ptr)
91 {
92         struct rbd_data *rbd;
93
94         if (td->io_ops_data)
95                 return 0;
96
97         rbd = calloc(1, sizeof(struct rbd_data));
98         if (!rbd)
99                 goto failed;
100
101         rbd->aio_events = calloc(td->o.iodepth, sizeof(struct io_u *));
102         if (!rbd->aio_events)
103                 goto failed;
104
105         rbd->sort_events = calloc(td->o.iodepth, sizeof(struct io_u *));
106         if (!rbd->sort_events)
107                 goto failed;
108
109         *rbd_data_ptr = rbd;
110         return 0;
111
112 failed:
113         if (rbd) {
114                 if (rbd->aio_events) 
115                         free(rbd->aio_events);
116                 if (rbd->sort_events)
117                         free(rbd->sort_events);
118                 free(rbd);
119         }
120         return 1;
121
122 }
123
124 static int _fio_rbd_connect(struct thread_data *td)
125 {
126         struct rbd_data *rbd = td->io_ops_data;
127         struct rbd_options *o = td->eo;
128         int r;
129
130         if (o->cluster_name) {
131                 char *client_name = NULL; 
132
133                 /*
134                  * If we specify cluser name, the rados_create2
135                  * will not assume 'client.'. name is considered
136                  * as a full type.id namestr
137                  */
138                 if (o->client_name) {
139                         if (!index(o->client_name, '.')) {
140                                 client_name = calloc(1, strlen("client.") +
141                                                     strlen(o->client_name) + 1);
142                                 strcat(client_name, "client.");
143                                 strcat(client_name, o->client_name);
144                         } else {
145                                 client_name = o->client_name;
146                         }
147                 }
148
149                 r = rados_create2(&rbd->cluster, o->cluster_name,
150                                  client_name, 0);
151
152                 if (client_name && !index(o->client_name, '.'))
153                         free(client_name);
154         } else
155                 r = rados_create(&rbd->cluster, o->client_name);
156         
157         if (r < 0) {
158                 log_err("rados_create failed.\n");
159                 goto failed_early;
160         }
161
162         r = rados_conf_read_file(rbd->cluster, NULL);
163         if (r < 0) {
164                 log_err("rados_conf_read_file failed.\n");
165                 goto failed_early;
166         }
167
168         r = rados_connect(rbd->cluster);
169         if (r < 0) {
170                 log_err("rados_connect failed.\n");
171                 goto failed_shutdown;
172         }
173
174         r = rados_ioctx_create(rbd->cluster, o->pool_name, &rbd->io_ctx);
175         if (r < 0) {
176                 log_err("rados_ioctx_create failed.\n");
177                 goto failed_shutdown;
178         }
179
180         r = rbd_open(rbd->io_ctx, o->rbd_name, &rbd->image, NULL /*snap */ );
181         if (r < 0) {
182                 log_err("rbd_open failed.\n");
183                 goto failed_open;
184         }
185         return 0;
186
187 failed_open:
188         rados_ioctx_destroy(rbd->io_ctx);
189         rbd->io_ctx = NULL;
190 failed_shutdown:
191         rados_shutdown(rbd->cluster);
192         rbd->cluster = NULL;
193 failed_early:
194         return 1;
195 }
196
197 static void _fio_rbd_disconnect(struct rbd_data *rbd)
198 {
199         if (!rbd)
200                 return;
201
202         /* shutdown everything */
203         if (rbd->image) {
204                 rbd_close(rbd->image);
205                 rbd->image = NULL;
206         }
207
208         if (rbd->io_ctx) {
209                 rados_ioctx_destroy(rbd->io_ctx);
210                 rbd->io_ctx = NULL;
211         }
212
213         if (rbd->cluster) {
214                 rados_shutdown(rbd->cluster);
215                 rbd->cluster = NULL;
216         }
217 }
218
219 static void _fio_rbd_finish_aiocb(rbd_completion_t comp, void *data)
220 {
221         struct fio_rbd_iou *fri = data;
222         struct io_u *io_u = fri->io_u;
223         ssize_t ret;
224
225         /*
226          * Looks like return value is 0 for success, or < 0 for
227          * a specific error. So we have to assume that it can't do
228          * partial completions.
229          */
230         ret = rbd_aio_get_return_value(fri->completion);
231         if (ret < 0) {
232                 io_u->error = ret;
233                 io_u->resid = io_u->xfer_buflen;
234         } else
235                 io_u->error = 0;
236
237         fri->io_complete = 1;
238 }
239
240 static struct io_u *fio_rbd_event(struct thread_data *td, int event)
241 {
242         struct rbd_data *rbd = td->io_ops_data;
243
244         return rbd->aio_events[event];
245 }
246
247 static inline int fri_check_complete(struct rbd_data *rbd, struct io_u *io_u,
248                                      unsigned int *events)
249 {
250         struct fio_rbd_iou *fri = io_u->engine_data;
251
252         if (fri->io_complete) {
253                 fri->io_seen = 1;
254                 rbd->aio_events[*events] = io_u;
255                 (*events)++;
256
257                 rbd_aio_release(fri->completion);
258                 return 1;
259         }
260
261         return 0;
262 }
263
264 static inline int rbd_io_u_seen(struct io_u *io_u)
265 {
266         struct fio_rbd_iou *fri = io_u->engine_data;
267
268         return fri->io_seen;
269 }
270
271 static void rbd_io_u_wait_complete(struct io_u *io_u)
272 {
273         struct fio_rbd_iou *fri = io_u->engine_data;
274
275         rbd_aio_wait_for_complete(fri->completion);
276 }
277
278 static int rbd_io_u_cmp(const void *p1, const void *p2)
279 {
280         const struct io_u **a = (const struct io_u **) p1;
281         const struct io_u **b = (const struct io_u **) p2;
282         uint64_t at, bt;
283
284         at = utime_since_now(&(*a)->start_time);
285         bt = utime_since_now(&(*b)->start_time);
286
287         if (at < bt)
288                 return -1;
289         else if (at == bt)
290                 return 0;
291         else
292                 return 1;
293 }
294
295 static int rbd_iter_events(struct thread_data *td, unsigned int *events,
296                            unsigned int min_evts, int wait)
297 {
298         struct rbd_data *rbd = td->io_ops_data;
299         unsigned int this_events = 0;
300         struct io_u *io_u;
301         int i, sidx;
302
303         sidx = 0;
304         io_u_qiter(&td->io_u_all, io_u, i) {
305                 if (!(io_u->flags & IO_U_F_FLIGHT))
306                         continue;
307                 if (rbd_io_u_seen(io_u))
308                         continue;
309
310                 if (fri_check_complete(rbd, io_u, events))
311                         this_events++;
312                 else if (wait)
313                         rbd->sort_events[sidx++] = io_u;
314         }
315
316         if (!wait || !sidx)
317                 return this_events;
318
319         /*
320          * Sort events, oldest issue first, then wait on as many as we
321          * need in order of age. If we have enough events, stop waiting,
322          * and just check if any of the older ones are done.
323          */
324         if (sidx > 1)
325                 qsort(rbd->sort_events, sidx, sizeof(struct io_u *), rbd_io_u_cmp);
326
327         for (i = 0; i < sidx; i++) {
328                 io_u = rbd->sort_events[i];
329
330                 if (fri_check_complete(rbd, io_u, events)) {
331                         this_events++;
332                         continue;
333                 }
334
335                 /*
336                  * Stop waiting when we have enough, but continue checking
337                  * all pending IOs if they are complete.
338                  */
339                 if (*events >= min_evts)
340                         continue;
341
342                 rbd_io_u_wait_complete(io_u);
343
344                 if (fri_check_complete(rbd, io_u, events))
345                         this_events++;
346         }
347
348         return this_events;
349 }
350
351 static int fio_rbd_getevents(struct thread_data *td, unsigned int min,
352                              unsigned int max, const struct timespec *t)
353 {
354         unsigned int this_events, events = 0;
355         struct rbd_options *o = td->eo;
356         int wait = 0;
357
358         do {
359                 this_events = rbd_iter_events(td, &events, min, wait);
360
361                 if (events >= min)
362                         break;
363                 if (this_events)
364                         continue;
365
366                 if (!o->busy_poll)
367                         wait = 1;
368                 else
369                         nop;
370         } while (1);
371
372         return events;
373 }
374
375 static int fio_rbd_queue(struct thread_data *td, struct io_u *io_u)
376 {
377         struct rbd_data *rbd = td->io_ops_data;
378         struct fio_rbd_iou *fri = io_u->engine_data;
379         int r = -1;
380
381         fio_ro_check(td, io_u);
382
383         fri->io_seen = 0;
384         fri->io_complete = 0;
385
386         r = rbd_aio_create_completion(fri, _fio_rbd_finish_aiocb,
387                                                 &fri->completion);
388         if (r < 0) {
389                 log_err("rbd_aio_create_completion failed.\n");
390                 goto failed;
391         }
392
393         if (io_u->ddir == DDIR_WRITE) {
394                 r = rbd_aio_write(rbd->image, io_u->offset, io_u->xfer_buflen,
395                                          io_u->xfer_buf, fri->completion);
396                 if (r < 0) {
397                         log_err("rbd_aio_write failed.\n");
398                         goto failed_comp;
399                 }
400
401         } else if (io_u->ddir == DDIR_READ) {
402                 r = rbd_aio_read(rbd->image, io_u->offset, io_u->xfer_buflen,
403                                         io_u->xfer_buf, fri->completion);
404
405                 if (r < 0) {
406                         log_err("rbd_aio_read failed.\n");
407                         goto failed_comp;
408                 }
409         } else if (io_u->ddir == DDIR_TRIM) {
410                 r = rbd_aio_discard(rbd->image, io_u->offset,
411                                         io_u->xfer_buflen, fri->completion);
412                 if (r < 0) {
413                         log_err("rbd_aio_discard failed.\n");
414                         goto failed_comp;
415                 }
416         } else if (io_u->ddir == DDIR_SYNC) {
417                 r = rbd_aio_flush(rbd->image, fri->completion);
418                 if (r < 0) {
419                         log_err("rbd_flush failed.\n");
420                         goto failed_comp;
421                 }
422         } else {
423                 dprint(FD_IO, "%s: Warning: unhandled ddir: %d\n", __func__,
424                        io_u->ddir);
425                 goto failed_comp;
426         }
427
428         return FIO_Q_QUEUED;
429 failed_comp:
430         rbd_aio_release(fri->completion);
431 failed:
432         io_u->error = r;
433         td_verror(td, io_u->error, "xfer");
434         return FIO_Q_COMPLETED;
435 }
436
437 static int fio_rbd_init(struct thread_data *td)
438 {
439         int r;
440
441         r = _fio_rbd_connect(td);
442         if (r) {
443                 log_err("fio_rbd_connect failed, return code: %d .\n", r);
444                 goto failed;
445         }
446
447         return 0;
448
449 failed:
450         return 1;
451 }
452
453 static void fio_rbd_cleanup(struct thread_data *td)
454 {
455         struct rbd_data *rbd = td->io_ops_data;
456
457         if (rbd) {
458                 _fio_rbd_disconnect(rbd);
459                 free(rbd->aio_events);
460                 free(rbd->sort_events);
461                 free(rbd);
462         }
463 }
464
465 static int fio_rbd_setup(struct thread_data *td)
466 {
467         rbd_image_info_t info;
468         struct fio_file *f;
469         struct rbd_data *rbd = NULL;
470         int major, minor, extra;
471         int r;
472
473         /* log version of librbd. No cluster connection required. */
474         rbd_version(&major, &minor, &extra);
475         log_info("rbd engine: RBD version: %d.%d.%d\n", major, minor, extra);
476
477         /* allocate engine specific structure to deal with librbd. */
478         r = _fio_setup_rbd_data(td, &rbd);
479         if (r) {
480                 log_err("fio_setup_rbd_data failed.\n");
481                 goto cleanup;
482         }
483         td->io_ops_data = rbd;
484
485         /* librbd does not allow us to run first in the main thread and later
486          * in a fork child. It needs to be the same process context all the
487          * time. 
488          */
489         td->o.use_thread = 1;
490
491         /* connect in the main thread to determine to determine
492          * the size of the given RADOS block device. And disconnect
493          * later on.
494          */
495         r = _fio_rbd_connect(td);
496         if (r) {
497                 log_err("fio_rbd_connect failed.\n");
498                 goto cleanup;
499         }
500
501         /* get size of the RADOS block device */
502         r = rbd_stat(rbd->image, &info, sizeof(info));
503         if (r < 0) {
504                 log_err("rbd_status failed.\n");
505                 goto disconnect;
506         }
507         dprint(FD_IO, "rbd-engine: image size: %lu\n", info.size);
508
509         /* taken from "net" engine. Pretend we deal with files,
510          * even if we do not have any ideas about files.
511          * The size of the RBD is set instead of a artificial file.
512          */
513         if (!td->files_index) {
514                 add_file(td, td->o.filename ? : "rbd", 0, 0);
515                 td->o.nr_files = td->o.nr_files ? : 1;
516                 td->o.open_files++;
517         }
518         f = td->files[0];
519         f->real_file_size = info.size;
520
521         /* disconnect, then we were only connected to determine
522          * the size of the RBD.
523          */
524         _fio_rbd_disconnect(rbd);
525         return 0;
526
527 disconnect:
528         _fio_rbd_disconnect(rbd);
529 cleanup:
530         fio_rbd_cleanup(td);
531         return r;
532 }
533
534 static int fio_rbd_open(struct thread_data *td, struct fio_file *f)
535 {
536         return 0;
537 }
538
539 static int fio_rbd_invalidate(struct thread_data *td, struct fio_file *f)
540 {
541 #if defined(CONFIG_RBD_INVAL)
542         struct rbd_data *rbd = td->io_ops_data;
543
544         return rbd_invalidate_cache(rbd->image);
545 #else
546         return 0;
547 #endif
548 }
549
550 static void fio_rbd_io_u_free(struct thread_data *td, struct io_u *io_u)
551 {
552         struct fio_rbd_iou *fri = io_u->engine_data;
553
554         if (fri) {
555                 io_u->engine_data = NULL;
556                 free(fri);
557         }
558 }
559
560 static int fio_rbd_io_u_init(struct thread_data *td, struct io_u *io_u)
561 {
562         struct fio_rbd_iou *fri;
563
564         fri = calloc(1, sizeof(*fri));
565         fri->io_u = io_u;
566         io_u->engine_data = fri;
567         return 0;
568 }
569
570 static struct ioengine_ops ioengine = {
571         .name                   = "rbd",
572         .version                = FIO_IOOPS_VERSION,
573         .setup                  = fio_rbd_setup,
574         .init                   = fio_rbd_init,
575         .queue                  = fio_rbd_queue,
576         .getevents              = fio_rbd_getevents,
577         .event                  = fio_rbd_event,
578         .cleanup                = fio_rbd_cleanup,
579         .open_file              = fio_rbd_open,
580         .invalidate             = fio_rbd_invalidate,
581         .options                = options,
582         .io_u_init              = fio_rbd_io_u_init,
583         .io_u_free              = fio_rbd_io_u_free,
584         .option_struct_size     = sizeof(struct rbd_options),
585 };
586
587 static void fio_init fio_rbd_register(void)
588 {
589         register_ioengine(&ioengine);
590 }
591
592 static void fio_exit fio_rbd_unregister(void)
593 {
594         unregister_ioengine(&ioengine);
595 }