t/nvmept_trim: increase transfer size for some tests
[fio.git] / engines / rados.c
1 /*
2  *  Ceph Rados engine
3  *
4  * IO engine using Ceph's RADOS interface to test low-level performance of
5  * Ceph OSDs.
6  *
7  */
8
9 #include <rados/librados.h>
10 #include <pthread.h>
11 #include "fio.h"
12 #include "../optgroup.h"
13
14 struct rados_data {
15         rados_t cluster;
16         rados_ioctx_t io_ctx;
17         struct io_u **aio_events;
18         bool connected;
19         pthread_mutex_t completed_lock;
20         pthread_cond_t completed_more_io;
21         struct flist_head completed_operations;
22         uint64_t ops_scheduled;
23         uint64_t ops_completed;
24 };
25
26 struct fio_rados_iou {
27         struct flist_head list;
28         struct thread_data *td;
29         struct io_u *io_u;
30         rados_completion_t completion;
31         rados_write_op_t write_op;
32 };
33
34 /* fio configuration options read from the job file */
35 struct rados_options {
36         void *pad;
37         char *cluster_name;
38         char *pool_name;
39         char *client_name;
40         char *conf;
41         int busy_poll;
42         int touch_objects;
43 };
44
45 static struct fio_option options[] = {
46         {
47                 .name     = "clustername",
48                 .lname    = "ceph cluster name",
49                 .type     = FIO_OPT_STR_STORE,
50                 .help     = "Cluster name for ceph",
51                 .off1     = offsetof(struct rados_options, cluster_name),
52                 .category = FIO_OPT_C_ENGINE,
53                 .group    = FIO_OPT_G_RBD,
54         },
55         {
56                 .name     = "pool",
57                 .lname    = "pool name to use",
58                 .type     = FIO_OPT_STR_STORE,
59                 .help     = "Ceph pool name to benchmark against",
60                 .off1     = offsetof(struct rados_options, pool_name),
61                 .category = FIO_OPT_C_ENGINE,
62                 .group    = FIO_OPT_G_RBD,
63         },
64         {
65                 .name     = "clientname",
66                 .lname    = "rados engine clientname",
67                 .type     = FIO_OPT_STR_STORE,
68                 .help     = "Name of the ceph client to access RADOS engine",
69                 .off1     = offsetof(struct rados_options, client_name),
70                 .category = FIO_OPT_C_ENGINE,
71                 .group    = FIO_OPT_G_RBD,
72         },
73         {
74                 .name     = "conf",
75                 .lname    = "ceph configuration file path",
76                 .type     = FIO_OPT_STR_STORE,
77                 .help     = "Path of the ceph configuration file",
78                 .off1     = offsetof(struct rados_options, conf),
79                 .def      = "/etc/ceph/ceph.conf",
80                 .category = FIO_OPT_C_ENGINE,
81                 .group    = FIO_OPT_G_RBD,
82         },
83         {
84                 .name     = "busy_poll",
85                 .lname    = "busy poll mode",
86                 .type     = FIO_OPT_BOOL,
87                 .help     = "Busy poll for completions instead of sleeping",
88                 .off1     = offsetof(struct rados_options, busy_poll),
89                 .def      = "0",
90                 .category = FIO_OPT_C_ENGINE,
91                 .group    = FIO_OPT_G_RBD,
92         },
93         {
94                 .name     = "touch_objects",
95                 .lname    = "touch objects on start",
96                 .type     = FIO_OPT_BOOL,
97                 .help     = "Touch (create) objects on start",
98                 .off1     = offsetof(struct rados_options, touch_objects),
99                 .def      = "1",
100                 .category = FIO_OPT_C_ENGINE,
101                 .group    = FIO_OPT_G_RBD,
102         },
103         {
104                 .name     = NULL,
105         },
106 };
107
108 static int _fio_setup_rados_data(struct thread_data *td,
109                                 struct rados_data **rados_data_ptr)
110 {
111         struct rados_data *rados;
112
113         if (td->io_ops_data)
114                 return 0;
115
116         rados = calloc(1, sizeof(struct rados_data));
117         if (!rados)
118                 goto failed;
119
120         rados->connected = false;
121
122         rados->aio_events = calloc(td->o.iodepth, sizeof(struct io_u *));
123         if (!rados->aio_events)
124                 goto failed;
125         pthread_mutex_init(&rados->completed_lock, NULL);
126         pthread_cond_init(&rados->completed_more_io, NULL);
127         INIT_FLIST_HEAD(&rados->completed_operations);
128         rados->ops_scheduled = 0;
129         rados->ops_completed = 0;
130         *rados_data_ptr = rados;
131         return 0;
132
133 failed:
134         if (rados) {
135                 if (rados->aio_events)
136                         free(rados->aio_events);
137                 free(rados);
138         }
139         return 1;
140 }
141
142 static void _fio_rados_rm_objects(struct thread_data *td, struct rados_data *rados)
143 {
144         size_t i;
145         for (i = 0; i < td->o.nr_files; i++) {
146                 struct fio_file *f = td->files[i];
147                 rados_remove(rados->io_ctx, f->file_name);
148         }
149 }
150
151 static int _fio_rados_connect(struct thread_data *td)
152 {
153         struct rados_data *rados = td->io_ops_data;
154         struct rados_options *o = td->eo;
155         int r;
156         const uint64_t file_size =
157                 td->o.size / (td->o.nr_files ? td->o.nr_files : 1u);
158         struct fio_file *f;
159         uint32_t i;
160
161         if (o->cluster_name) {
162                 char *client_name = NULL;
163
164                 /*
165                 * If we specify cluster name, the rados_create2
166                 * will not assume 'client.'. name is considered
167                 * as a full type.id namestr
168                 */
169                 if (o->client_name) {
170                         if (!index(o->client_name, '.')) {
171                                 client_name = calloc(1, strlen("client.") +
172                                         strlen(o->client_name) + 1);
173                                 strcat(client_name, "client.");
174                                 strcat(client_name, o->client_name);
175                         } else {
176                                 client_name = o->client_name;
177                         }
178                 }
179
180                 r = rados_create2(&rados->cluster, o->cluster_name,
181                         client_name, 0);
182
183                 if (client_name && !index(o->client_name, '.'))
184                         free(client_name);
185         } else
186                 r = rados_create(&rados->cluster, o->client_name);
187
188         if (o->pool_name == NULL) {
189                 log_err("rados pool name must be provided.\n");
190                 goto failed_early;
191         }
192
193         if (r < 0) {
194                 log_err("rados_create failed.\n");
195                 goto failed_early;
196         }
197
198         r = rados_conf_read_file(rados->cluster, o->conf);
199         if (r < 0) {
200                 log_err("rados_conf_read_file failed.\n");
201                 goto failed_early;
202         }
203
204         r = rados_connect(rados->cluster);
205         if (r < 0) {
206                 log_err("rados_connect failed.\n");
207                 goto failed_early;
208         }
209
210         r = rados_ioctx_create(rados->cluster, o->pool_name, &rados->io_ctx);
211         if (r < 0) {
212                 log_err("rados_ioctx_create failed.\n");
213                 goto failed_shutdown;
214         }
215
216         for (i = 0; i < td->o.nr_files; i++) {
217                 f = td->files[i];
218                 f->real_file_size = file_size;
219                 if (o->touch_objects) {
220                         r = rados_write(rados->io_ctx, f->file_name, "", 0, 0);
221                         if (r < 0) {
222                                 goto failed_obj_create;
223                         }
224                 }
225         }
226         return 0;
227
228 failed_obj_create:
229         _fio_rados_rm_objects(td, rados);
230         rados_ioctx_destroy(rados->io_ctx);
231         rados->io_ctx = NULL;
232 failed_shutdown:
233         rados_shutdown(rados->cluster);
234         rados->cluster = NULL;
235 failed_early:
236         return 1;
237 }
238
239 static void _fio_rados_disconnect(struct rados_data *rados)
240 {
241         if (!rados)
242                 return;
243
244         if (rados->io_ctx) {
245                 rados_ioctx_destroy(rados->io_ctx);
246                 rados->io_ctx = NULL;
247         }
248
249         if (rados->cluster) {
250                 rados_shutdown(rados->cluster);
251                 rados->cluster = NULL;
252         }
253 }
254
255 static void fio_rados_cleanup(struct thread_data *td)
256 {
257         struct rados_data *rados = td->io_ops_data;
258         if (rados) {
259                 pthread_mutex_lock(&rados->completed_lock);
260                 while (rados->ops_scheduled != rados->ops_completed)
261                         pthread_cond_wait(&rados->completed_more_io, &rados->completed_lock);
262                 pthread_mutex_unlock(&rados->completed_lock);
263                 _fio_rados_rm_objects(td, rados);
264                 _fio_rados_disconnect(rados);
265                 free(rados->aio_events);
266                 free(rados);
267         }
268 }
269
270 static void complete_callback(rados_completion_t cb, void *arg)
271 {
272         struct fio_rados_iou *fri = (struct fio_rados_iou *)arg;
273         struct rados_data *rados = fri->td->io_ops_data;
274         assert(fri->completion);
275         assert(rados_aio_is_complete(fri->completion));
276         pthread_mutex_lock(&rados->completed_lock);
277         flist_add_tail(&fri->list, &rados->completed_operations);
278         rados->ops_completed++;
279         pthread_mutex_unlock(&rados->completed_lock);
280         pthread_cond_signal(&rados->completed_more_io);
281 }
282
283 static enum fio_q_status fio_rados_queue(struct thread_data *td,
284                                          struct io_u *io_u)
285 {
286         struct rados_data *rados = td->io_ops_data;
287         struct fio_rados_iou *fri = io_u->engine_data;
288         char *object = io_u->file->file_name;
289         int r = -1;
290
291         fio_ro_check(td, io_u);
292
293         if (io_u->ddir == DDIR_WRITE) {
294                  r = rados_aio_create_completion(fri, complete_callback,
295                         NULL, &fri->completion);
296                 if (r < 0) {
297                         log_err("rados_aio_create_completion failed.\n");
298                         goto failed;
299                 }
300
301                 r = rados_aio_write(rados->io_ctx, object, fri->completion,
302                         io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
303                 if (r < 0) {
304                         log_err("rados_write failed.\n");
305                         goto failed_comp;
306                 }
307                 rados->ops_scheduled++;
308                 return FIO_Q_QUEUED;
309         } else if (io_u->ddir == DDIR_READ) {
310                 r = rados_aio_create_completion(fri, complete_callback,
311                         NULL, &fri->completion);
312                 if (r < 0) {
313                         log_err("rados_aio_create_completion failed.\n");
314                         goto failed;
315                 }
316                 r = rados_aio_read(rados->io_ctx, object, fri->completion,
317                         io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
318                 if (r < 0) {
319                         log_err("rados_aio_read failed.\n");
320                         goto failed_comp;
321                 }
322                 rados->ops_scheduled++;
323                 return FIO_Q_QUEUED;
324         } else if (io_u->ddir == DDIR_TRIM) {
325                 r = rados_aio_create_completion(fri, complete_callback,
326                         NULL , &fri->completion);
327                 if (r < 0) {
328                         log_err("rados_aio_create_completion failed.\n");
329                         goto failed;
330                 }
331                 fri->write_op = rados_create_write_op();
332                 if (fri->write_op == NULL) {
333                         log_err("rados_create_write_op failed.\n");
334                         goto failed_comp;
335                 }
336                 rados_write_op_zero(fri->write_op, io_u->offset,
337                         io_u->xfer_buflen);
338                 r = rados_aio_write_op_operate(fri->write_op, rados->io_ctx,
339                         fri->completion, object, NULL, 0);
340                 if (r < 0) {
341                         log_err("rados_aio_write_op_operate failed.\n");
342                         goto failed_write_op;
343                 }
344                 rados->ops_scheduled++;
345                 return FIO_Q_QUEUED;
346          }
347
348         log_err("WARNING: Only DDIR_READ, DDIR_WRITE and DDIR_TRIM are supported!");
349
350 failed_write_op:
351         rados_release_write_op(fri->write_op);
352 failed_comp:
353         rados_aio_release(fri->completion);
354 failed:
355         io_u->error = -r;
356         td_verror(td, io_u->error, "xfer");
357         return FIO_Q_COMPLETED;
358 }
359
360 static struct io_u *fio_rados_event(struct thread_data *td, int event)
361 {
362         struct rados_data *rados = td->io_ops_data;
363         return rados->aio_events[event];
364 }
365
366 int fio_rados_getevents(struct thread_data *td, unsigned int min,
367         unsigned int max, const struct timespec *t)
368 {
369         struct rados_data *rados = td->io_ops_data;
370         unsigned int events = 0;
371         struct fio_rados_iou *fri;
372
373         pthread_mutex_lock(&rados->completed_lock);
374         while (events < min) {
375                 while (flist_empty(&rados->completed_operations)) {
376                         pthread_cond_wait(&rados->completed_more_io, &rados->completed_lock);
377                 }
378                 assert(!flist_empty(&rados->completed_operations));
379                 
380                 fri = flist_first_entry(&rados->completed_operations, struct fio_rados_iou, list);
381                 assert(fri->completion);
382                 assert(rados_aio_is_complete(fri->completion));
383                 if (fri->write_op != NULL) {
384                         rados_release_write_op(fri->write_op);
385                         fri->write_op = NULL;
386                 }
387                 rados_aio_release(fri->completion);
388                 fri->completion = NULL;
389
390                 rados->aio_events[events] = fri->io_u;
391                 events ++;
392                 flist_del(&fri->list);
393                 if (events >= max) break;
394         }
395         pthread_mutex_unlock(&rados->completed_lock);
396         return events;
397 }
398
399 static int fio_rados_setup(struct thread_data *td)
400 {
401         struct rados_data *rados = NULL;
402         int r;
403         /* allocate engine specific structure to deal with librados. */
404         r = _fio_setup_rados_data(td, &rados);
405         if (r) {
406                 log_err("fio_setup_rados_data failed.\n");
407                 goto cleanup;
408         }
409         td->io_ops_data = rados;
410
411         /* Force single process mode.
412         */
413         td->o.use_thread = 1;
414
415         /* connect in the main thread to determine to determine
416         * the size of the given RADOS block device. And disconnect
417         * later on.
418         */
419         r = _fio_rados_connect(td);
420         if (r) {
421                 log_err("fio_rados_connect failed.\n");
422                 goto cleanup;
423         }
424         rados->connected = true;
425
426         return 0;
427 cleanup:
428         fio_rados_cleanup(td);
429         return r;
430 }
431
432 /* open/invalidate are noops. we set the FIO_DISKLESSIO flag in ioengine_ops to
433    prevent fio from creating the files
434 */
435 static int fio_rados_open(struct thread_data *td, struct fio_file *f)
436 {
437         return 0;
438 }
439 static int fio_rados_invalidate(struct thread_data *td, struct fio_file *f)
440 {
441         return 0;
442 }
443
444 static void fio_rados_io_u_free(struct thread_data *td, struct io_u *io_u)
445 {
446         struct fio_rados_iou *fri = io_u->engine_data;
447
448         if (fri) {
449                 io_u->engine_data = NULL;
450                 fri->td = NULL;
451                 if (fri->completion)
452                         rados_aio_release(fri->completion);
453                 if (fri->write_op)
454                         rados_release_write_op(fri->write_op);
455                 free(fri);
456         }
457 }
458
459 static int fio_rados_io_u_init(struct thread_data *td, struct io_u *io_u)
460 {
461         struct fio_rados_iou *fri;
462         fri = calloc(1, sizeof(*fri));
463         fri->io_u = io_u;
464         fri->td = td;
465         INIT_FLIST_HEAD(&fri->list);
466         io_u->engine_data = fri;
467         return 0;
468 }
469
470 /* ioengine_ops for get_ioengine() */
471 FIO_STATIC struct ioengine_ops ioengine = {
472         .name = "rados",
473         .version                = FIO_IOOPS_VERSION,
474         .flags                  = FIO_DISKLESSIO,
475         .setup                  = fio_rados_setup,
476         .queue                  = fio_rados_queue,
477         .getevents              = fio_rados_getevents,
478         .event                  = fio_rados_event,
479         .cleanup                = fio_rados_cleanup,
480         .open_file              = fio_rados_open,
481         .invalidate             = fio_rados_invalidate,
482         .options                = options,
483         .io_u_init              = fio_rados_io_u_init,
484         .io_u_free              = fio_rados_io_u_free,
485         .option_struct_size     = sizeof(struct rados_options),
486 };
487
488 static void fio_init fio_rados_register(void)
489 {
490         register_ioengine(&ioengine);
491 }
492
493 static void fio_exit fio_rados_unregister(void)
494 {
495         unregister_ioengine(&ioengine);
496 }