engines/rbd: use rbd_aio_is_complete() and add TRIM support
[fio.git] / engines / rbd.c
1 /*
2  * rbd engine
3  *
4  * IO engine using Ceph's librbd to test RADOS Block Devices.
5  *
6  */
7
8 #include <rbd/librbd.h>
9
10 #include "../fio.h"
11
12 struct fio_rbd_iou {
13         struct io_u *io_u;
14         rbd_completion_t completion;
15         int io_seen;
16 };
17
18 struct rbd_data {
19         rados_t cluster;
20         rados_ioctx_t io_ctx;
21         rbd_image_t image;
22         struct io_u **aio_events;
23 };
24
25 struct rbd_options {
26         struct thread_data *td;
27         char *rbd_name;
28         char *pool_name;
29         char *client_name;
30 };
31
32 static struct fio_option options[] = {
33         {
34                 .name           = "rbdname",
35                 .lname          = "rbd engine rbdname",
36                 .type           = FIO_OPT_STR_STORE,
37                 .help           = "RBD name for RBD engine",
38                 .off1           = offsetof(struct rbd_options, rbd_name),
39                 .category       = FIO_OPT_C_ENGINE,
40                 .group          = FIO_OPT_G_RBD,
41         },
42         {
43                 .name     = "pool",
44                 .lname    = "rbd engine pool",
45                 .type     = FIO_OPT_STR_STORE,
46                 .help     = "Name of the pool hosting the RBD for the RBD engine",
47                 .off1     = offsetof(struct rbd_options, pool_name),
48                 .category = FIO_OPT_C_ENGINE,
49                 .group    = FIO_OPT_G_RBD,
50         },
51         {
52                 .name     = "clientname",
53                 .lname    = "rbd engine clientname",
54                 .type     = FIO_OPT_STR_STORE,
55                 .help     = "Name of the ceph client to access the RBD for the RBD engine",
56                 .off1     = offsetof(struct rbd_options, client_name),
57                 .category = FIO_OPT_C_ENGINE,
58                 .group    = FIO_OPT_G_RBD,
59         },
60         {
61                 .name = NULL,
62         },
63 };
64
65 static int _fio_setup_rbd_data(struct thread_data *td,
66                                struct rbd_data **rbd_data_ptr)
67 {
68         struct rbd_data *rbd_data;
69
70         if (td->io_ops->data)
71                 return 0;
72
73         rbd_data = malloc(sizeof(struct rbd_data));
74         if (!rbd_data)
75                 goto failed;
76
77         memset(rbd_data, 0, sizeof(struct rbd_data));
78
79         rbd_data->aio_events = malloc(td->o.iodepth * sizeof(struct io_u *));
80         if (!rbd_data->aio_events)
81                 goto failed;
82
83         memset(rbd_data->aio_events, 0, td->o.iodepth * sizeof(struct io_u *));
84
85         *rbd_data_ptr = rbd_data;
86
87         return 0;
88
89 failed:
90         if (rbd_data)
91                 free(rbd_data);
92         return 1;
93
94 }
95
96 static int _fio_rbd_connect(struct thread_data *td)
97 {
98         struct rbd_data *rbd_data = td->io_ops->data;
99         struct rbd_options *o = td->eo;
100         int r;
101
102         r = rados_create(&rbd_data->cluster, o->client_name);
103         if (r < 0) {
104                 log_err("rados_create failed.\n");
105                 goto failed_early;
106         }
107
108         r = rados_conf_read_file(rbd_data->cluster, NULL);
109         if (r < 0) {
110                 log_err("rados_conf_read_file failed.\n");
111                 goto failed_early;
112         }
113
114         r = rados_connect(rbd_data->cluster);
115         if (r < 0) {
116                 log_err("rados_connect failed.\n");
117                 goto failed_shutdown;
118         }
119
120         r = rados_ioctx_create(rbd_data->cluster, o->pool_name,
121                                &rbd_data->io_ctx);
122         if (r < 0) {
123                 log_err("rados_ioctx_create failed.\n");
124                 goto failed_shutdown;
125         }
126
127         r = rbd_open(rbd_data->io_ctx, o->rbd_name, &rbd_data->image,
128                      NULL /*snap */ );
129         if (r < 0) {
130                 log_err("rbd_open failed.\n");
131                 goto failed_open;
132         }
133         return 0;
134
135 failed_open:
136         rados_ioctx_destroy(rbd_data->io_ctx);
137         rbd_data->io_ctx = NULL;
138 failed_shutdown:
139         rados_shutdown(rbd_data->cluster);
140         rbd_data->cluster = NULL;
141 failed_early:
142         return 1;
143 }
144
145 static void _fio_rbd_disconnect(struct rbd_data *rbd_data)
146 {
147         if (!rbd_data)
148                 return;
149
150         /* shutdown everything */
151         if (rbd_data->image) {
152                 rbd_close(rbd_data->image);
153                 rbd_data->image = NULL;
154         }
155
156         if (rbd_data->io_ctx) {
157                 rados_ioctx_destroy(rbd_data->io_ctx);
158                 rbd_data->io_ctx = NULL;
159         }
160
161         if (rbd_data->cluster) {
162                 rados_shutdown(rbd_data->cluster);
163                 rbd_data->cluster = NULL;
164         }
165 }
166
167 static void _fio_rbd_finish_aiocb(rbd_completion_t comp, void *data)
168 {
169         struct fio_rbd_iou *fri = data;
170         struct io_u *io_u = fri->io_u;
171         ssize_t ret;
172
173         /*
174          * Looks like return value is 0 for success, or < 0 for
175          * a specific error. So we have to assume that it can't do
176          * partial completions.
177          */
178         ret = rbd_aio_get_return_value(fri->completion);
179         if (ret < 0) {
180                 io_u->error = ret;
181                 io_u->resid = io_u->xfer_buflen;
182         } else
183                 io_u->error = 0;
184 }
185
186 static struct io_u *fio_rbd_event(struct thread_data *td, int event)
187 {
188         struct rbd_data *rbd_data = td->io_ops->data;
189
190         return rbd_data->aio_events[event];
191 }
192
193 static inline int fri_check_complete(struct rbd_data *rbd_data,
194                                      struct io_u *io_u,
195                                      unsigned int *events)
196 {
197         struct fio_rbd_iou *fri = io_u->engine_data;
198
199         if (rbd_aio_is_complete(fri->completion)) {
200                 fri->io_seen = 1;
201                 rbd_data->aio_events[*events] = io_u;
202                 (*events)++;
203
204                 rbd_aio_release(fri->completion);
205                 return 1;
206         }
207
208         return 0;
209 }
210
211 static int rbd_iter_events(struct thread_data *td, unsigned int *events,
212                            unsigned int min_evts, int wait)
213 {
214         struct rbd_data *rbd_data = td->io_ops->data;
215         unsigned int this_events = 0;
216         struct io_u *io_u;
217         int i;
218
219         io_u_qiter(&td->io_u_all, io_u, i) {
220                 struct fio_rbd_iou *fri = io_u->engine_data;
221
222                 if (!(io_u->flags & IO_U_F_FLIGHT))
223                         continue;
224                 if (fri->io_seen)
225                         continue;
226
227                 if (fri_check_complete(rbd_data, io_u, events))
228                         this_events++;
229                 else if (wait) {
230                         rbd_aio_wait_for_complete(fri->completion);
231
232                         if (fri_check_complete(rbd_data, io_u, events))
233                                 this_events++;
234                 }
235                 if (*events >= min_evts)
236                         break;
237         }
238
239         return this_events;
240 }
241
242 static int fio_rbd_getevents(struct thread_data *td, unsigned int min,
243                              unsigned int max, const struct timespec *t)
244 {
245         unsigned int this_events, events = 0;
246         int wait = 0;
247
248         do {
249                 this_events = rbd_iter_events(td, &events, min, wait);
250
251                 if (events >= min)
252                         break;
253                 if (this_events)
254                         continue;
255
256                 wait = 1;
257         } while (1);
258
259         return events;
260 }
261
262 static int fio_rbd_queue(struct thread_data *td, struct io_u *io_u)
263 {
264         struct rbd_data *rbd_data = td->io_ops->data;
265         struct fio_rbd_iou *fri = io_u->engine_data;
266         int r = -1;
267
268         fio_ro_check(td, io_u);
269
270         fri->io_seen = 0;
271
272         r = rbd_aio_create_completion(fri, _fio_rbd_finish_aiocb,
273                                                 &fri->completion);
274         if (r < 0) {
275                 log_err("rbd_aio_create_completion failed.\n");
276                 goto failed;
277         }
278
279         if (io_u->ddir == DDIR_WRITE) {
280                 r = rbd_aio_write(rbd_data->image, io_u->offset,
281                                   io_u->xfer_buflen, io_u->xfer_buf,
282                                   fri->completion);
283                 if (r < 0) {
284                         log_err("rbd_aio_write failed.\n");
285                         goto failed_comp;
286                 }
287
288         } else if (io_u->ddir == DDIR_READ) {
289                 r = rbd_aio_read(rbd_data->image, io_u->offset,
290                                  io_u->xfer_buflen, io_u->xfer_buf,
291                                  fri->completion);
292
293                 if (r < 0) {
294                         log_err("rbd_aio_read failed.\n");
295                         goto failed_comp;
296                 }
297         } else if (io_u->ddir == DDIR_TRIM) {
298                 r = rbd_aio_discard(rbd_data->image, io_u->offset,
299                                  io_u->xfer_buflen, fri->completion);
300                 if (r < 0) {
301                         log_err("rbd_aio_discard failed.\n");
302                         goto failed_comp;
303                 }
304         } else if (io_u->ddir == DDIR_SYNC) {
305                 r = rbd_aio_flush(rbd_data->image, fri->completion);
306                 if (r < 0) {
307                         log_err("rbd_flush failed.\n");
308                         goto failed_comp;
309                 }
310         } else {
311                 dprint(FD_IO, "%s: Warning: unhandled ddir: %d\n", __func__,
312                        io_u->ddir);
313                 goto failed_comp;
314         }
315
316         return FIO_Q_QUEUED;
317 failed_comp:
318         rbd_aio_release(fri->completion);
319 failed:
320         io_u->error = r;
321         td_verror(td, io_u->error, "xfer");
322         return FIO_Q_COMPLETED;
323 }
324
325 static int fio_rbd_init(struct thread_data *td)
326 {
327         int r;
328
329         r = _fio_rbd_connect(td);
330         if (r) {
331                 log_err("fio_rbd_connect failed, return code: %d .\n", r);
332                 goto failed;
333         }
334
335         return 0;
336
337 failed:
338         return 1;
339 }
340
341 static void fio_rbd_cleanup(struct thread_data *td)
342 {
343         struct rbd_data *rbd_data = td->io_ops->data;
344
345         if (rbd_data) {
346                 _fio_rbd_disconnect(rbd_data);
347                 free(rbd_data->aio_events);
348                 free(rbd_data);
349         }
350
351 }
352
353 static int fio_rbd_setup(struct thread_data *td)
354 {
355         int r = 0;
356         rbd_image_info_t info;
357         struct fio_file *f;
358         struct rbd_data *rbd_data = NULL;
359         int major, minor, extra;
360
361         /* log version of librbd. No cluster connection required. */
362         rbd_version(&major, &minor, &extra);
363         log_info("rbd engine: RBD version: %d.%d.%d\n", major, minor, extra);
364
365         /* allocate engine specific structure to deal with librbd. */
366         r = _fio_setup_rbd_data(td, &rbd_data);
367         if (r) {
368                 log_err("fio_setup_rbd_data failed.\n");
369                 goto cleanup;
370         }
371         td->io_ops->data = rbd_data;
372
373         /* librbd does not allow us to run first in the main thread and later
374          * in a fork child. It needs to be the same process context all the
375          * time. 
376          */
377         td->o.use_thread = 1;
378
379         /* connect in the main thread to determine to determine
380          * the size of the given RADOS block device. And disconnect
381          * later on.
382          */
383         r = _fio_rbd_connect(td);
384         if (r) {
385                 log_err("fio_rbd_connect failed.\n");
386                 goto cleanup;
387         }
388
389         /* get size of the RADOS block device */
390         r = rbd_stat(rbd_data->image, &info, sizeof(info));
391         if (r < 0) {
392                 log_err("rbd_status failed.\n");
393                 goto disconnect;
394         }
395         dprint(FD_IO, "rbd-engine: image size: %lu\n", info.size);
396
397         /* taken from "net" engine. Pretend we deal with files,
398          * even if we do not have any ideas about files.
399          * The size of the RBD is set instead of a artificial file.
400          */
401         if (!td->files_index) {
402                 add_file(td, td->o.filename ? : "rbd", 0, 0);
403                 td->o.nr_files = td->o.nr_files ? : 1;
404                 td->o.open_files++;
405         }
406         f = td->files[0];
407         f->real_file_size = info.size;
408
409         /* disconnect, then we were only connected to determine
410          * the size of the RBD.
411          */
412         _fio_rbd_disconnect(rbd_data);
413         return 0;
414
415 disconnect:
416         _fio_rbd_disconnect(rbd_data);
417 cleanup:
418         fio_rbd_cleanup(td);
419         return r;
420 }
421
422 static int fio_rbd_open(struct thread_data *td, struct fio_file *f)
423 {
424         return 0;
425 }
426
427 static int fio_rbd_invalidate(struct thread_data *td, struct fio_file *f)
428 {
429 #if defined(CONFIG_RBD_INVAL)
430         struct rbd_data *rbd_data = td->io_ops->data;
431
432         return rbd_invalidate_cache(rbd_data->image);
433 #else
434         return 0;
435 #endif
436 }
437
438 static void fio_rbd_io_u_free(struct thread_data *td, struct io_u *io_u)
439 {
440         struct fio_rbd_iou *fri = io_u->engine_data;
441
442         if (fri) {
443                 io_u->engine_data = NULL;
444                 free(fri);
445         }
446 }
447
448 static int fio_rbd_io_u_init(struct thread_data *td, struct io_u *io_u)
449 {
450         struct fio_rbd_iou *fri;
451
452         fri = calloc(1, sizeof(*fri));
453         fri->io_u = io_u;
454         io_u->engine_data = fri;
455         return 0;
456 }
457
458 static struct ioengine_ops ioengine = {
459         .name                   = "rbd",
460         .version                = FIO_IOOPS_VERSION,
461         .setup                  = fio_rbd_setup,
462         .init                   = fio_rbd_init,
463         .queue                  = fio_rbd_queue,
464         .getevents              = fio_rbd_getevents,
465         .event                  = fio_rbd_event,
466         .cleanup                = fio_rbd_cleanup,
467         .open_file              = fio_rbd_open,
468         .invalidate             = fio_rbd_invalidate,
469         .options                = options,
470         .io_u_init              = fio_rbd_io_u_init,
471         .io_u_free              = fio_rbd_io_u_free,
472         .option_struct_size     = sizeof(struct rbd_options),
473 };
474
475 static void fio_init fio_rbd_register(void)
476 {
477         register_ioengine(&ioengine);
478 }
479
480 static void fio_exit fio_rbd_unregister(void)
481 {
482         unregister_ioengine(&ioengine);
483 }