engines/io_uring: add verbose error for ENOSYS
[fio.git] / engines / rados.c
1 /*
2  *  Ceph Rados engine
3  *
4  * IO engine using Ceph's RADOS interface to test low-level performance of
5  * Ceph OSDs.
6  *
7  */
8
9 #include <rados/librados.h>
10 #include <pthread.h>
11 #include "fio.h"
12 #include "../optgroup.h"
13
14 struct rados_data {
15         rados_t cluster;
16         rados_ioctx_t io_ctx;
17         struct io_u **aio_events;
18         bool connected;
19         pthread_mutex_t completed_lock;
20         pthread_cond_t completed_more_io;
21         struct flist_head completed_operations;
22         uint64_t ops_scheduled;
23         uint64_t ops_completed;
24 };
25
26 struct fio_rados_iou {
27         struct flist_head list;
28         struct thread_data *td;
29         struct io_u *io_u;
30         rados_completion_t completion;
31         rados_write_op_t write_op;
32 };
33
34 /* fio configuration options read from the job file */
35 struct rados_options {
36         void *pad;
37         char *cluster_name;
38         char *pool_name;
39         char *client_name;
40         int busy_poll;
41 };
42
43 static struct fio_option options[] = {
44         {
45                 .name     = "clustername",
46                 .lname    = "ceph cluster name",
47                 .type     = FIO_OPT_STR_STORE,
48                 .help     = "Cluster name for ceph",
49                 .off1     = offsetof(struct rados_options, cluster_name),
50                 .category = FIO_OPT_C_ENGINE,
51                 .group    = FIO_OPT_G_RBD,
52         },
53         {
54                 .name     = "pool",
55                 .lname    = "pool name to use",
56                 .type     = FIO_OPT_STR_STORE,
57                 .help     = "Ceph pool name to benchmark against",
58                 .off1     = offsetof(struct rados_options, pool_name),
59                 .category = FIO_OPT_C_ENGINE,
60                 .group    = FIO_OPT_G_RBD,
61         },
62         {
63                 .name     = "clientname",
64                 .lname    = "rados engine clientname",
65                 .type     = FIO_OPT_STR_STORE,
66                 .help     = "Name of the ceph client to access RADOS engine",
67                 .off1     = offsetof(struct rados_options, client_name),
68                 .category = FIO_OPT_C_ENGINE,
69                 .group    = FIO_OPT_G_RBD,
70         },
71         {
72                 .name     = "busy_poll",
73                 .lname    = "busy poll mode",
74                 .type     = FIO_OPT_BOOL,
75                 .help     = "Busy poll for completions instead of sleeping",
76                 .off1     = offsetof(struct rados_options, busy_poll),
77                 .def      = "0",
78                 .category = FIO_OPT_C_ENGINE,
79                 .group    = FIO_OPT_G_RBD,
80         },
81         {
82                 .name     = NULL,
83         },
84 };
85
86 static int _fio_setup_rados_data(struct thread_data *td,
87                                 struct rados_data **rados_data_ptr)
88 {
89         struct rados_data *rados;
90
91         if (td->io_ops_data)
92                 return 0;
93
94         rados = calloc(1, sizeof(struct rados_data));
95         if (!rados)
96                 goto failed;
97
98         rados->connected = false;
99
100         rados->aio_events = calloc(td->o.iodepth, sizeof(struct io_u *));
101         if (!rados->aio_events)
102                 goto failed;
103         pthread_mutex_init(&rados->completed_lock, NULL);
104         pthread_cond_init(&rados->completed_more_io, NULL);
105         INIT_FLIST_HEAD(&rados->completed_operations);
106         rados->ops_scheduled = 0;
107         rados->ops_completed = 0;
108         *rados_data_ptr = rados;
109         return 0;
110
111 failed:
112         if (rados) {
113                 if (rados->aio_events)
114                         free(rados->aio_events);
115                 free(rados);
116         }
117         return 1;
118 }
119
120 static void _fio_rados_rm_objects(struct thread_data *td, struct rados_data *rados)
121 {
122         size_t i;
123         for (i = 0; i < td->o.nr_files; i++) {
124                 struct fio_file *f = td->files[i];
125                 rados_remove(rados->io_ctx, f->file_name);
126         }
127 }
128
129 static int _fio_rados_connect(struct thread_data *td)
130 {
131         struct rados_data *rados = td->io_ops_data;
132         struct rados_options *o = td->eo;
133         int r;
134         const uint64_t file_size =
135                 td->o.size / (td->o.nr_files ? td->o.nr_files : 1u);
136         struct fio_file *f;
137         uint32_t i;
138
139         if (o->cluster_name) {
140                 char *client_name = NULL;
141
142                 /*
143                 * If we specify cluser name, the rados_create2
144                 * will not assume 'client.'. name is considered
145                 * as a full type.id namestr
146                 */
147                 if (o->client_name) {
148                         if (!index(o->client_name, '.')) {
149                                 client_name = calloc(1, strlen("client.") +
150                                         strlen(o->client_name) + 1);
151                                 strcat(client_name, "client.");
152                                 strcat(client_name, o->client_name);
153                         } else {
154                                 client_name = o->client_name;
155                         }
156                 }
157
158                 r = rados_create2(&rados->cluster, o->cluster_name,
159                         client_name, 0);
160
161                 if (client_name && !index(o->client_name, '.'))
162                         free(client_name);
163         } else
164                 r = rados_create(&rados->cluster, o->client_name);
165
166         if (o->pool_name == NULL) {
167                 log_err("rados pool name must be provided.\n");
168                 goto failed_early;
169         }
170
171         if (r < 0) {
172                 log_err("rados_create failed.\n");
173                 goto failed_early;
174         }
175
176         r = rados_conf_read_file(rados->cluster, NULL);
177         if (r < 0) {
178                 log_err("rados_conf_read_file failed.\n");
179                 goto failed_early;
180         }
181
182         r = rados_connect(rados->cluster);
183         if (r < 0) {
184                 log_err("rados_connect failed.\n");
185                 goto failed_early;
186         }
187
188         r = rados_ioctx_create(rados->cluster, o->pool_name, &rados->io_ctx);
189         if (r < 0) {
190                 log_err("rados_ioctx_create failed.\n");
191                 goto failed_shutdown;
192         }
193
194         for (i = 0; i < td->o.nr_files; i++) {
195                 f = td->files[i];
196                 f->real_file_size = file_size;
197                 r = rados_write(rados->io_ctx, f->file_name, "", 0, 0);
198                 if (r < 0) {
199                         goto failed_obj_create;
200                 }
201         }
202         return 0;
203
204 failed_obj_create:
205         _fio_rados_rm_objects(td, rados);
206         rados_ioctx_destroy(rados->io_ctx);
207         rados->io_ctx = NULL;
208 failed_shutdown:
209         rados_shutdown(rados->cluster);
210         rados->cluster = NULL;
211 failed_early:
212         return 1;
213 }
214
215 static void _fio_rados_disconnect(struct rados_data *rados)
216 {
217         if (!rados)
218                 return;
219
220         if (rados->io_ctx) {
221                 rados_ioctx_destroy(rados->io_ctx);
222                 rados->io_ctx = NULL;
223         }
224
225         if (rados->cluster) {
226                 rados_shutdown(rados->cluster);
227                 rados->cluster = NULL;
228         }
229 }
230
231 static void fio_rados_cleanup(struct thread_data *td)
232 {
233         struct rados_data *rados = td->io_ops_data;
234         if (rados) {
235                 pthread_mutex_lock(&rados->completed_lock);
236                 while (rados->ops_scheduled != rados->ops_completed)
237                         pthread_cond_wait(&rados->completed_more_io, &rados->completed_lock);
238                 pthread_mutex_unlock(&rados->completed_lock);
239                 _fio_rados_rm_objects(td, rados);
240                 _fio_rados_disconnect(rados);
241                 free(rados->aio_events);
242                 free(rados);
243         }
244 }
245
246 static void complete_callback(rados_completion_t cb, void *arg)
247 {
248         struct fio_rados_iou *fri = (struct fio_rados_iou *)arg;
249         struct rados_data *rados = fri->td->io_ops_data;
250         assert(fri->completion);
251         assert(rados_aio_is_complete(fri->completion));
252         pthread_mutex_lock(&rados->completed_lock);
253         flist_add_tail(&fri->list, &rados->completed_operations);
254         rados->ops_completed++;
255         pthread_mutex_unlock(&rados->completed_lock);
256         pthread_cond_signal(&rados->completed_more_io);
257 }
258
259 static enum fio_q_status fio_rados_queue(struct thread_data *td,
260                                          struct io_u *io_u)
261 {
262         struct rados_data *rados = td->io_ops_data;
263         struct fio_rados_iou *fri = io_u->engine_data;
264         char *object = io_u->file->file_name;
265         int r = -1;
266
267         fio_ro_check(td, io_u);
268
269         if (io_u->ddir == DDIR_WRITE) {
270                  r = rados_aio_create_completion(fri, complete_callback,
271                         NULL, &fri->completion);
272                 if (r < 0) {
273                         log_err("rados_aio_create_completion failed.\n");
274                         goto failed;
275                 }
276
277                 r = rados_aio_write(rados->io_ctx, object, fri->completion,
278                         io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
279                 if (r < 0) {
280                         log_err("rados_write failed.\n");
281                         goto failed_comp;
282                 }
283                 rados->ops_scheduled++;
284                 return FIO_Q_QUEUED;
285         } else if (io_u->ddir == DDIR_READ) {
286                 r = rados_aio_create_completion(fri, complete_callback,
287                         NULL, &fri->completion);
288                 if (r < 0) {
289                         log_err("rados_aio_create_completion failed.\n");
290                         goto failed;
291                 }
292                 r = rados_aio_read(rados->io_ctx, object, fri->completion,
293                         io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
294                 if (r < 0) {
295                         log_err("rados_aio_read failed.\n");
296                         goto failed_comp;
297                 }
298                 rados->ops_scheduled++;
299                 return FIO_Q_QUEUED;
300         } else if (io_u->ddir == DDIR_TRIM) {
301                 r = rados_aio_create_completion(fri, complete_callback,
302                         NULL , &fri->completion);
303                 if (r < 0) {
304                         log_err("rados_aio_create_completion failed.\n");
305                         goto failed;
306                 }
307                 fri->write_op = rados_create_write_op();
308                 if (fri->write_op == NULL) {
309                         log_err("rados_create_write_op failed.\n");
310                         goto failed_comp;
311                 }
312                 rados_write_op_zero(fri->write_op, io_u->offset,
313                         io_u->xfer_buflen);
314                 r = rados_aio_write_op_operate(fri->write_op, rados->io_ctx,
315                         fri->completion, object, NULL, 0);
316                 if (r < 0) {
317                         log_err("rados_aio_write_op_operate failed.\n");
318                         goto failed_write_op;
319                 }
320                 rados->ops_scheduled++;
321                 return FIO_Q_QUEUED;
322          }
323
324         log_err("WARNING: Only DDIR_READ, DDIR_WRITE and DDIR_TRIM are supported!");
325
326 failed_write_op:
327         rados_release_write_op(fri->write_op);
328 failed_comp:
329         rados_aio_release(fri->completion);
330 failed:
331         io_u->error = -r;
332         td_verror(td, io_u->error, "xfer");
333         return FIO_Q_COMPLETED;
334 }
335
336 static struct io_u *fio_rados_event(struct thread_data *td, int event)
337 {
338         struct rados_data *rados = td->io_ops_data;
339         return rados->aio_events[event];
340 }
341
342 int fio_rados_getevents(struct thread_data *td, unsigned int min,
343         unsigned int max, const struct timespec *t)
344 {
345         struct rados_data *rados = td->io_ops_data;
346         unsigned int events = 0;
347         struct fio_rados_iou *fri;
348
349         pthread_mutex_lock(&rados->completed_lock);
350         while (events < min) {
351                 while (flist_empty(&rados->completed_operations)) {
352                         pthread_cond_wait(&rados->completed_more_io, &rados->completed_lock);
353                 }
354                 assert(!flist_empty(&rados->completed_operations));
355                 
356                 fri = flist_first_entry(&rados->completed_operations, struct fio_rados_iou, list);
357                 assert(fri->completion);
358                 assert(rados_aio_is_complete(fri->completion));
359                 if (fri->write_op != NULL) {
360                         rados_release_write_op(fri->write_op);
361                         fri->write_op = NULL;
362                 }
363                 rados_aio_release(fri->completion);
364                 fri->completion = NULL;
365
366                 rados->aio_events[events] = fri->io_u;
367                 events ++;
368                 flist_del(&fri->list);
369                 if (events >= max) break;
370         }
371         pthread_mutex_unlock(&rados->completed_lock);
372         return events;
373 }
374
375 static int fio_rados_setup(struct thread_data *td)
376 {
377         struct rados_data *rados = NULL;
378         int r;
379         /* allocate engine specific structure to deal with librados. */
380         r = _fio_setup_rados_data(td, &rados);
381         if (r) {
382                 log_err("fio_setup_rados_data failed.\n");
383                 goto cleanup;
384         }
385         td->io_ops_data = rados;
386
387         /* Force single process mode.
388         */
389         td->o.use_thread = 1;
390
391         /* connect in the main thread to determine to determine
392         * the size of the given RADOS block device. And disconnect
393         * later on.
394         */
395         r = _fio_rados_connect(td);
396         if (r) {
397                 log_err("fio_rados_connect failed.\n");
398                 goto cleanup;
399         }
400         rados->connected = true;
401
402         return 0;
403 cleanup:
404         fio_rados_cleanup(td);
405         return r;
406 }
407
408 /* open/invalidate are noops. we set the FIO_DISKLESSIO flag in ioengine_ops to
409    prevent fio from creating the files
410 */
411 static int fio_rados_open(struct thread_data *td, struct fio_file *f)
412 {
413         return 0;
414 }
415 static int fio_rados_invalidate(struct thread_data *td, struct fio_file *f)
416 {
417         return 0;
418 }
419
420 static void fio_rados_io_u_free(struct thread_data *td, struct io_u *io_u)
421 {
422         struct fio_rados_iou *fri = io_u->engine_data;
423
424         if (fri) {
425                 io_u->engine_data = NULL;
426                 fri->td = NULL;
427                 if (fri->completion)
428                         rados_aio_release(fri->completion);
429                 if (fri->write_op)
430                         rados_release_write_op(fri->write_op);
431                 free(fri);
432         }
433 }
434
435 static int fio_rados_io_u_init(struct thread_data *td, struct io_u *io_u)
436 {
437         struct fio_rados_iou *fri;
438         fri = calloc(1, sizeof(*fri));
439         fri->io_u = io_u;
440         fri->td = td;
441         INIT_FLIST_HEAD(&fri->list);
442         io_u->engine_data = fri;
443         return 0;
444 }
445
446 /* ioengine_ops for get_ioengine() */
447 FIO_STATIC struct ioengine_ops ioengine = {
448         .name = "rados",
449         .version                = FIO_IOOPS_VERSION,
450         .flags                  = FIO_DISKLESSIO,
451         .setup                  = fio_rados_setup,
452         .queue                  = fio_rados_queue,
453         .getevents              = fio_rados_getevents,
454         .event                  = fio_rados_event,
455         .cleanup                = fio_rados_cleanup,
456         .open_file              = fio_rados_open,
457         .invalidate             = fio_rados_invalidate,
458         .options                = options,
459         .io_u_init              = fio_rados_io_u_init,
460         .io_u_free              = fio_rados_io_u_free,
461         .option_struct_size     = sizeof(struct rados_options),
462 };
463
464 static void fio_init fio_rados_register(void)
465 {
466         register_ioengine(&ioengine);
467 }
468
469 static void fio_exit fio_rados_unregister(void)
470 {
471         unregister_ioengine(&ioengine);
472 }