Merge branch 'arm-detect-pmull' of https://github.com/sitsofe/fio
[fio.git] / engines / rados.c
1 /*
2  *  Ceph Rados engine
3  *
4  * IO engine using Ceph's RADOS interface to test low-level performance of
5  * Ceph OSDs.
6  *
7  */
8
9 #include <rados/librados.h>
10 #include <pthread.h>
11 #include "fio.h"
12 #include "../optgroup.h"
13
14 struct rados_data {
15         rados_t cluster;
16         rados_ioctx_t io_ctx;
17         struct io_u **aio_events;
18         bool connected;
19         pthread_mutex_t completed_lock;
20         pthread_cond_t completed_more_io;
21         struct flist_head completed_operations;
22         uint64_t ops_scheduled;
23         uint64_t ops_completed;
24 };
25
26 struct fio_rados_iou {
27         struct flist_head list;
28         struct thread_data *td;
29         struct io_u *io_u;
30         rados_completion_t completion;
31         rados_write_op_t write_op;
32 };
33
34 /* fio configuration options read from the job file */
35 struct rados_options {
36         void *pad;
37         char *cluster_name;
38         char *pool_name;
39         char *client_name;
40         int busy_poll;
41         int touch_objects;
42 };
43
44 static struct fio_option options[] = {
45         {
46                 .name     = "clustername",
47                 .lname    = "ceph cluster name",
48                 .type     = FIO_OPT_STR_STORE,
49                 .help     = "Cluster name for ceph",
50                 .off1     = offsetof(struct rados_options, cluster_name),
51                 .category = FIO_OPT_C_ENGINE,
52                 .group    = FIO_OPT_G_RBD,
53         },
54         {
55                 .name     = "pool",
56                 .lname    = "pool name to use",
57                 .type     = FIO_OPT_STR_STORE,
58                 .help     = "Ceph pool name to benchmark against",
59                 .off1     = offsetof(struct rados_options, pool_name),
60                 .category = FIO_OPT_C_ENGINE,
61                 .group    = FIO_OPT_G_RBD,
62         },
63         {
64                 .name     = "clientname",
65                 .lname    = "rados engine clientname",
66                 .type     = FIO_OPT_STR_STORE,
67                 .help     = "Name of the ceph client to access RADOS engine",
68                 .off1     = offsetof(struct rados_options, client_name),
69                 .category = FIO_OPT_C_ENGINE,
70                 .group    = FIO_OPT_G_RBD,
71         },
72         {
73                 .name     = "busy_poll",
74                 .lname    = "busy poll mode",
75                 .type     = FIO_OPT_BOOL,
76                 .help     = "Busy poll for completions instead of sleeping",
77                 .off1     = offsetof(struct rados_options, busy_poll),
78                 .def      = "0",
79                 .category = FIO_OPT_C_ENGINE,
80                 .group    = FIO_OPT_G_RBD,
81         },
82         {
83                 .name     = "touch_objects",
84                 .lname    = "touch objects on start",
85                 .type     = FIO_OPT_BOOL,
86                 .help     = "Touch (create) objects on start",
87                 .off1     = offsetof(struct rados_options, touch_objects),
88                 .def      = "1",
89                 .category = FIO_OPT_C_ENGINE,
90                 .group    = FIO_OPT_G_RBD,
91         },
92         {
93                 .name     = NULL,
94         },
95 };
96
97 static int _fio_setup_rados_data(struct thread_data *td,
98                                 struct rados_data **rados_data_ptr)
99 {
100         struct rados_data *rados;
101
102         if (td->io_ops_data)
103                 return 0;
104
105         rados = calloc(1, sizeof(struct rados_data));
106         if (!rados)
107                 goto failed;
108
109         rados->connected = false;
110
111         rados->aio_events = calloc(td->o.iodepth, sizeof(struct io_u *));
112         if (!rados->aio_events)
113                 goto failed;
114         pthread_mutex_init(&rados->completed_lock, NULL);
115         pthread_cond_init(&rados->completed_more_io, NULL);
116         INIT_FLIST_HEAD(&rados->completed_operations);
117         rados->ops_scheduled = 0;
118         rados->ops_completed = 0;
119         *rados_data_ptr = rados;
120         return 0;
121
122 failed:
123         if (rados) {
124                 if (rados->aio_events)
125                         free(rados->aio_events);
126                 free(rados);
127         }
128         return 1;
129 }
130
131 static void _fio_rados_rm_objects(struct thread_data *td, struct rados_data *rados)
132 {
133         size_t i;
134         for (i = 0; i < td->o.nr_files; i++) {
135                 struct fio_file *f = td->files[i];
136                 rados_remove(rados->io_ctx, f->file_name);
137         }
138 }
139
140 static int _fio_rados_connect(struct thread_data *td)
141 {
142         struct rados_data *rados = td->io_ops_data;
143         struct rados_options *o = td->eo;
144         int r;
145         const uint64_t file_size =
146                 td->o.size / (td->o.nr_files ? td->o.nr_files : 1u);
147         struct fio_file *f;
148         uint32_t i;
149
150         if (o->cluster_name) {
151                 char *client_name = NULL;
152
153                 /*
154                 * If we specify cluser name, the rados_create2
155                 * will not assume 'client.'. name is considered
156                 * as a full type.id namestr
157                 */
158                 if (o->client_name) {
159                         if (!index(o->client_name, '.')) {
160                                 client_name = calloc(1, strlen("client.") +
161                                         strlen(o->client_name) + 1);
162                                 strcat(client_name, "client.");
163                                 strcat(client_name, o->client_name);
164                         } else {
165                                 client_name = o->client_name;
166                         }
167                 }
168
169                 r = rados_create2(&rados->cluster, o->cluster_name,
170                         client_name, 0);
171
172                 if (client_name && !index(o->client_name, '.'))
173                         free(client_name);
174         } else
175                 r = rados_create(&rados->cluster, o->client_name);
176
177         if (o->pool_name == NULL) {
178                 log_err("rados pool name must be provided.\n");
179                 goto failed_early;
180         }
181
182         if (r < 0) {
183                 log_err("rados_create failed.\n");
184                 goto failed_early;
185         }
186
187         r = rados_conf_read_file(rados->cluster, NULL);
188         if (r < 0) {
189                 log_err("rados_conf_read_file failed.\n");
190                 goto failed_early;
191         }
192
193         r = rados_connect(rados->cluster);
194         if (r < 0) {
195                 log_err("rados_connect failed.\n");
196                 goto failed_early;
197         }
198
199         r = rados_ioctx_create(rados->cluster, o->pool_name, &rados->io_ctx);
200         if (r < 0) {
201                 log_err("rados_ioctx_create failed.\n");
202                 goto failed_shutdown;
203         }
204
205         for (i = 0; i < td->o.nr_files; i++) {
206                 f = td->files[i];
207                 f->real_file_size = file_size;
208                 if (o->touch_objects) {
209                         r = rados_write(rados->io_ctx, f->file_name, "", 0, 0);
210                         if (r < 0) {
211                                 goto failed_obj_create;
212                         }
213                 }
214         }
215         return 0;
216
217 failed_obj_create:
218         _fio_rados_rm_objects(td, rados);
219         rados_ioctx_destroy(rados->io_ctx);
220         rados->io_ctx = NULL;
221 failed_shutdown:
222         rados_shutdown(rados->cluster);
223         rados->cluster = NULL;
224 failed_early:
225         return 1;
226 }
227
228 static void _fio_rados_disconnect(struct rados_data *rados)
229 {
230         if (!rados)
231                 return;
232
233         if (rados->io_ctx) {
234                 rados_ioctx_destroy(rados->io_ctx);
235                 rados->io_ctx = NULL;
236         }
237
238         if (rados->cluster) {
239                 rados_shutdown(rados->cluster);
240                 rados->cluster = NULL;
241         }
242 }
243
244 static void fio_rados_cleanup(struct thread_data *td)
245 {
246         struct rados_data *rados = td->io_ops_data;
247         if (rados) {
248                 pthread_mutex_lock(&rados->completed_lock);
249                 while (rados->ops_scheduled != rados->ops_completed)
250                         pthread_cond_wait(&rados->completed_more_io, &rados->completed_lock);
251                 pthread_mutex_unlock(&rados->completed_lock);
252                 _fio_rados_rm_objects(td, rados);
253                 _fio_rados_disconnect(rados);
254                 free(rados->aio_events);
255                 free(rados);
256         }
257 }
258
259 static void complete_callback(rados_completion_t cb, void *arg)
260 {
261         struct fio_rados_iou *fri = (struct fio_rados_iou *)arg;
262         struct rados_data *rados = fri->td->io_ops_data;
263         assert(fri->completion);
264         assert(rados_aio_is_complete(fri->completion));
265         pthread_mutex_lock(&rados->completed_lock);
266         flist_add_tail(&fri->list, &rados->completed_operations);
267         rados->ops_completed++;
268         pthread_mutex_unlock(&rados->completed_lock);
269         pthread_cond_signal(&rados->completed_more_io);
270 }
271
272 static enum fio_q_status fio_rados_queue(struct thread_data *td,
273                                          struct io_u *io_u)
274 {
275         struct rados_data *rados = td->io_ops_data;
276         struct fio_rados_iou *fri = io_u->engine_data;
277         char *object = io_u->file->file_name;
278         int r = -1;
279
280         fio_ro_check(td, io_u);
281
282         if (io_u->ddir == DDIR_WRITE) {
283                  r = rados_aio_create_completion(fri, complete_callback,
284                         NULL, &fri->completion);
285                 if (r < 0) {
286                         log_err("rados_aio_create_completion failed.\n");
287                         goto failed;
288                 }
289
290                 r = rados_aio_write(rados->io_ctx, object, fri->completion,
291                         io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
292                 if (r < 0) {
293                         log_err("rados_write failed.\n");
294                         goto failed_comp;
295                 }
296                 rados->ops_scheduled++;
297                 return FIO_Q_QUEUED;
298         } else if (io_u->ddir == DDIR_READ) {
299                 r = rados_aio_create_completion(fri, complete_callback,
300                         NULL, &fri->completion);
301                 if (r < 0) {
302                         log_err("rados_aio_create_completion failed.\n");
303                         goto failed;
304                 }
305                 r = rados_aio_read(rados->io_ctx, object, fri->completion,
306                         io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
307                 if (r < 0) {
308                         log_err("rados_aio_read failed.\n");
309                         goto failed_comp;
310                 }
311                 rados->ops_scheduled++;
312                 return FIO_Q_QUEUED;
313         } else if (io_u->ddir == DDIR_TRIM) {
314                 r = rados_aio_create_completion(fri, complete_callback,
315                         NULL , &fri->completion);
316                 if (r < 0) {
317                         log_err("rados_aio_create_completion failed.\n");
318                         goto failed;
319                 }
320                 fri->write_op = rados_create_write_op();
321                 if (fri->write_op == NULL) {
322                         log_err("rados_create_write_op failed.\n");
323                         goto failed_comp;
324                 }
325                 rados_write_op_zero(fri->write_op, io_u->offset,
326                         io_u->xfer_buflen);
327                 r = rados_aio_write_op_operate(fri->write_op, rados->io_ctx,
328                         fri->completion, object, NULL, 0);
329                 if (r < 0) {
330                         log_err("rados_aio_write_op_operate failed.\n");
331                         goto failed_write_op;
332                 }
333                 rados->ops_scheduled++;
334                 return FIO_Q_QUEUED;
335          }
336
337         log_err("WARNING: Only DDIR_READ, DDIR_WRITE and DDIR_TRIM are supported!");
338
339 failed_write_op:
340         rados_release_write_op(fri->write_op);
341 failed_comp:
342         rados_aio_release(fri->completion);
343 failed:
344         io_u->error = -r;
345         td_verror(td, io_u->error, "xfer");
346         return FIO_Q_COMPLETED;
347 }
348
349 static struct io_u *fio_rados_event(struct thread_data *td, int event)
350 {
351         struct rados_data *rados = td->io_ops_data;
352         return rados->aio_events[event];
353 }
354
355 int fio_rados_getevents(struct thread_data *td, unsigned int min,
356         unsigned int max, const struct timespec *t)
357 {
358         struct rados_data *rados = td->io_ops_data;
359         unsigned int events = 0;
360         struct fio_rados_iou *fri;
361
362         pthread_mutex_lock(&rados->completed_lock);
363         while (events < min) {
364                 while (flist_empty(&rados->completed_operations)) {
365                         pthread_cond_wait(&rados->completed_more_io, &rados->completed_lock);
366                 }
367                 assert(!flist_empty(&rados->completed_operations));
368                 
369                 fri = flist_first_entry(&rados->completed_operations, struct fio_rados_iou, list);
370                 assert(fri->completion);
371                 assert(rados_aio_is_complete(fri->completion));
372                 if (fri->write_op != NULL) {
373                         rados_release_write_op(fri->write_op);
374                         fri->write_op = NULL;
375                 }
376                 rados_aio_release(fri->completion);
377                 fri->completion = NULL;
378
379                 rados->aio_events[events] = fri->io_u;
380                 events ++;
381                 flist_del(&fri->list);
382                 if (events >= max) break;
383         }
384         pthread_mutex_unlock(&rados->completed_lock);
385         return events;
386 }
387
388 static int fio_rados_setup(struct thread_data *td)
389 {
390         struct rados_data *rados = NULL;
391         int r;
392         /* allocate engine specific structure to deal with librados. */
393         r = _fio_setup_rados_data(td, &rados);
394         if (r) {
395                 log_err("fio_setup_rados_data failed.\n");
396                 goto cleanup;
397         }
398         td->io_ops_data = rados;
399
400         /* Force single process mode.
401         */
402         td->o.use_thread = 1;
403
404         /* connect in the main thread to determine to determine
405         * the size of the given RADOS block device. And disconnect
406         * later on.
407         */
408         r = _fio_rados_connect(td);
409         if (r) {
410                 log_err("fio_rados_connect failed.\n");
411                 goto cleanup;
412         }
413         rados->connected = true;
414
415         return 0;
416 cleanup:
417         fio_rados_cleanup(td);
418         return r;
419 }
420
421 /* open/invalidate are noops. we set the FIO_DISKLESSIO flag in ioengine_ops to
422    prevent fio from creating the files
423 */
424 static int fio_rados_open(struct thread_data *td, struct fio_file *f)
425 {
426         return 0;
427 }
428 static int fio_rados_invalidate(struct thread_data *td, struct fio_file *f)
429 {
430         return 0;
431 }
432
433 static void fio_rados_io_u_free(struct thread_data *td, struct io_u *io_u)
434 {
435         struct fio_rados_iou *fri = io_u->engine_data;
436
437         if (fri) {
438                 io_u->engine_data = NULL;
439                 fri->td = NULL;
440                 if (fri->completion)
441                         rados_aio_release(fri->completion);
442                 if (fri->write_op)
443                         rados_release_write_op(fri->write_op);
444                 free(fri);
445         }
446 }
447
448 static int fio_rados_io_u_init(struct thread_data *td, struct io_u *io_u)
449 {
450         struct fio_rados_iou *fri;
451         fri = calloc(1, sizeof(*fri));
452         fri->io_u = io_u;
453         fri->td = td;
454         INIT_FLIST_HEAD(&fri->list);
455         io_u->engine_data = fri;
456         return 0;
457 }
458
459 /* ioengine_ops for get_ioengine() */
460 FIO_STATIC struct ioengine_ops ioengine = {
461         .name = "rados",
462         .version                = FIO_IOOPS_VERSION,
463         .flags                  = FIO_DISKLESSIO,
464         .setup                  = fio_rados_setup,
465         .queue                  = fio_rados_queue,
466         .getevents              = fio_rados_getevents,
467         .event                  = fio_rados_event,
468         .cleanup                = fio_rados_cleanup,
469         .open_file              = fio_rados_open,
470         .invalidate             = fio_rados_invalidate,
471         .options                = options,
472         .io_u_init              = fio_rados_io_u_init,
473         .io_u_free              = fio_rados_io_u_free,
474         .option_struct_size     = sizeof(struct rados_options),
475 };
476
477 static void fio_init fio_rados_register(void)
478 {
479         register_ioengine(&ioengine);
480 }
481
482 static void fio_exit fio_rados_unregister(void)
483 {
484         unregister_ioengine(&ioengine);
485 }