Merge branch 'appveyor-artifacts' of https://github.com/vincentkfu/fio
[fio.git] / engines / rados.c
1 /*
2  *  Ceph Rados engine
3  *
4  * IO engine using Ceph's RADOS interface to test low-level performance of
5  * Ceph OSDs.
6  *
7  */
8
9 #include <rados/librados.h>
10 #include <pthread.h>
11 #include "fio.h"
12 #include "../optgroup.h"
13
14 struct rados_data {
15         rados_t cluster;
16         rados_ioctx_t io_ctx;
17         struct io_u **aio_events;
18         bool connected;
19         pthread_mutex_t completed_lock;
20         pthread_cond_t completed_more_io;
21         struct flist_head completed_operations;
22 };
23
24 struct fio_rados_iou {
25         struct flist_head list;
26         struct thread_data *td;
27         struct io_u *io_u;
28         rados_completion_t completion;
29         rados_write_op_t write_op;
30 };
31
32 /* fio configuration options read from the job file */
33 struct rados_options {
34         void *pad;
35         char *cluster_name;
36         char *pool_name;
37         char *client_name;
38         int busy_poll;
39 };
40
41 static struct fio_option options[] = {
42         {
43                 .name     = "clustername",
44                 .lname    = "ceph cluster name",
45                 .type     = FIO_OPT_STR_STORE,
46                 .help     = "Cluster name for ceph",
47                 .off1     = offsetof(struct rados_options, cluster_name),
48                 .category = FIO_OPT_C_ENGINE,
49                 .group    = FIO_OPT_G_RBD,
50         },
51         {
52                 .name     = "pool",
53                 .lname    = "pool name to use",
54                 .type     = FIO_OPT_STR_STORE,
55                 .help     = "Ceph pool name to benchmark against",
56                 .off1     = offsetof(struct rados_options, pool_name),
57                 .category = FIO_OPT_C_ENGINE,
58                 .group    = FIO_OPT_G_RBD,
59         },
60         {
61                 .name     = "clientname",
62                 .lname    = "rados engine clientname",
63                 .type     = FIO_OPT_STR_STORE,
64                 .help     = "Name of the ceph client to access RADOS engine",
65                 .off1     = offsetof(struct rados_options, client_name),
66                 .category = FIO_OPT_C_ENGINE,
67                 .group    = FIO_OPT_G_RBD,
68         },
69         {
70                 .name     = "busy_poll",
71                 .lname    = "busy poll mode",
72                 .type     = FIO_OPT_BOOL,
73                 .help     = "Busy poll for completions instead of sleeping",
74                 .off1     = offsetof(struct rados_options, busy_poll),
75                 .def      = "0",
76                 .category = FIO_OPT_C_ENGINE,
77                 .group    = FIO_OPT_G_RBD,
78         },
79         {
80                 .name     = NULL,
81         },
82 };
83
84 static int _fio_setup_rados_data(struct thread_data *td,
85                                 struct rados_data **rados_data_ptr)
86 {
87         struct rados_data *rados;
88
89         if (td->io_ops_data)
90                 return 0;
91
92         rados = calloc(1, sizeof(struct rados_data));
93         if (!rados)
94                 goto failed;
95
96         rados->connected = false;
97
98         rados->aio_events = calloc(td->o.iodepth, sizeof(struct io_u *));
99         if (!rados->aio_events)
100                 goto failed;
101         pthread_mutex_init(&rados->completed_lock, NULL);
102         pthread_cond_init(&rados->completed_more_io, NULL);
103         INIT_FLIST_HEAD(&rados->completed_operations);
104         *rados_data_ptr = rados;
105         return 0;
106
107 failed:
108         if (rados) {
109                 if (rados->aio_events)
110                         free(rados->aio_events);
111                 free(rados);
112         }
113         return 1;
114 }
115
116 static void _fio_rados_rm_objects(struct thread_data *td, struct rados_data *rados)
117 {
118         size_t i;
119         for (i = 0; i < td->o.nr_files; i++) {
120                 struct fio_file *f = td->files[i];
121                 rados_remove(rados->io_ctx, f->file_name);
122         }
123 }
124
125 static int _fio_rados_connect(struct thread_data *td)
126 {
127         struct rados_data *rados = td->io_ops_data;
128         struct rados_options *o = td->eo;
129         int r;
130         const uint64_t file_size =
131                 td->o.size / (td->o.nr_files ? td->o.nr_files : 1u);
132         struct fio_file *f;
133         uint32_t i;
134
135         if (o->cluster_name) {
136                 char *client_name = NULL;
137
138                 /*
139                 * If we specify cluser name, the rados_create2
140                 * will not assume 'client.'. name is considered
141                 * as a full type.id namestr
142                 */
143                 if (o->client_name) {
144                         if (!index(o->client_name, '.')) {
145                                 client_name = calloc(1, strlen("client.") +
146                                         strlen(o->client_name) + 1);
147                                 strcat(client_name, "client.");
148                                 strcat(client_name, o->client_name);
149                         } else {
150                                 client_name = o->client_name;
151                         }
152                 }
153
154                 r = rados_create2(&rados->cluster, o->cluster_name,
155                         client_name, 0);
156
157                 if (client_name && !index(o->client_name, '.'))
158                         free(client_name);
159         } else
160                 r = rados_create(&rados->cluster, o->client_name);
161
162         if (o->pool_name == NULL) {
163                 log_err("rados pool name must be provided.\n");
164                 goto failed_early;
165         }
166
167         if (r < 0) {
168                 log_err("rados_create failed.\n");
169                 goto failed_early;
170         }
171
172         r = rados_conf_read_file(rados->cluster, NULL);
173         if (r < 0) {
174                 log_err("rados_conf_read_file failed.\n");
175                 goto failed_early;
176         }
177
178         r = rados_connect(rados->cluster);
179         if (r < 0) {
180                 log_err("rados_connect failed.\n");
181                 goto failed_early;
182         }
183
184         r = rados_ioctx_create(rados->cluster, o->pool_name, &rados->io_ctx);
185         if (r < 0) {
186                 log_err("rados_ioctx_create failed.\n");
187                 goto failed_shutdown;
188         }
189
190         for (i = 0; i < td->o.nr_files; i++) {
191                 f = td->files[i];
192                 f->real_file_size = file_size;
193                 r = rados_write(rados->io_ctx, f->file_name, "", 0, 0);
194                 if (r < 0) {
195                         goto failed_obj_create;
196                 }
197         }
198         return 0;
199
200 failed_obj_create:
201         _fio_rados_rm_objects(td, rados);
202         rados_ioctx_destroy(rados->io_ctx);
203         rados->io_ctx = NULL;
204 failed_shutdown:
205         rados_shutdown(rados->cluster);
206         rados->cluster = NULL;
207 failed_early:
208         return 1;
209 }
210
211 static void _fio_rados_disconnect(struct rados_data *rados)
212 {
213         if (!rados)
214                 return;
215
216         if (rados->io_ctx) {
217                 rados_ioctx_destroy(rados->io_ctx);
218                 rados->io_ctx = NULL;
219         }
220
221         if (rados->cluster) {
222                 rados_shutdown(rados->cluster);
223                 rados->cluster = NULL;
224         }
225 }
226
227 static void fio_rados_cleanup(struct thread_data *td)
228 {
229         struct rados_data *rados = td->io_ops_data;
230
231         if (rados) {
232                 _fio_rados_rm_objects(td, rados);
233                 _fio_rados_disconnect(rados);
234                 free(rados->aio_events);
235                 free(rados);
236         }
237 }
238
239 static void complete_callback(rados_completion_t cb, void *arg)
240 {
241         struct fio_rados_iou *fri = (struct fio_rados_iou *)arg;
242         struct rados_data *rados = fri->td->io_ops_data;
243         assert(fri->completion);
244         assert(rados_aio_is_complete(fri->completion));
245         pthread_mutex_lock(&rados->completed_lock);
246         flist_add_tail(&fri->list, &rados->completed_operations);
247         pthread_mutex_unlock(&rados->completed_lock);
248         pthread_cond_signal(&rados->completed_more_io);
249 }
250
251 static enum fio_q_status fio_rados_queue(struct thread_data *td,
252                                          struct io_u *io_u)
253 {
254         struct rados_data *rados = td->io_ops_data;
255         struct fio_rados_iou *fri = io_u->engine_data;
256         char *object = io_u->file->file_name;
257         int r = -1;
258
259         fio_ro_check(td, io_u);
260
261         if (io_u->ddir == DDIR_WRITE) {
262                  r = rados_aio_create_completion(fri, complete_callback,
263                         NULL, &fri->completion);
264                 if (r < 0) {
265                         log_err("rados_aio_create_completion failed.\n");
266                         goto failed;
267                 }
268
269                 r = rados_aio_write(rados->io_ctx, object, fri->completion,
270                         io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
271                 if (r < 0) {
272                         log_err("rados_write failed.\n");
273                         goto failed_comp;
274                 }
275                 return FIO_Q_QUEUED;
276         } else if (io_u->ddir == DDIR_READ) {
277                 r = rados_aio_create_completion(fri, complete_callback,
278                         NULL, &fri->completion);
279                 if (r < 0) {
280                         log_err("rados_aio_create_completion failed.\n");
281                         goto failed;
282                 }
283                 r = rados_aio_read(rados->io_ctx, object, fri->completion,
284                         io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
285                 if (r < 0) {
286                         log_err("rados_aio_read failed.\n");
287                         goto failed_comp;
288                 }
289                 return FIO_Q_QUEUED;
290         } else if (io_u->ddir == DDIR_TRIM) {
291                 r = rados_aio_create_completion(fri, complete_callback,
292                         NULL , &fri->completion);
293                 if (r < 0) {
294                         log_err("rados_aio_create_completion failed.\n");
295                         goto failed;
296                 }
297                 fri->write_op = rados_create_write_op();
298                 if (fri->write_op == NULL) {
299                         log_err("rados_create_write_op failed.\n");
300                         goto failed_comp;
301                 }
302                 rados_write_op_zero(fri->write_op, io_u->offset,
303                         io_u->xfer_buflen);
304                 r = rados_aio_write_op_operate(fri->write_op, rados->io_ctx,
305                         fri->completion, object, NULL, 0);
306                 if (r < 0) {
307                         log_err("rados_aio_write_op_operate failed.\n");
308                         goto failed_write_op;
309                 }
310                 return FIO_Q_QUEUED;
311          }
312
313         log_err("WARNING: Only DDIR_READ, DDIR_WRITE and DDIR_TRIM are supported!");
314
315 failed_write_op:
316         rados_release_write_op(fri->write_op);
317 failed_comp:
318         rados_aio_release(fri->completion);
319 failed:
320         io_u->error = -r;
321         td_verror(td, io_u->error, "xfer");
322         return FIO_Q_COMPLETED;
323 }
324
325 static struct io_u *fio_rados_event(struct thread_data *td, int event)
326 {
327         struct rados_data *rados = td->io_ops_data;
328         return rados->aio_events[event];
329 }
330
331 int fio_rados_getevents(struct thread_data *td, unsigned int min,
332         unsigned int max, const struct timespec *t)
333 {
334         struct rados_data *rados = td->io_ops_data;
335         unsigned int events = 0;
336         struct fio_rados_iou *fri;
337
338         pthread_mutex_lock(&rados->completed_lock);
339         while (events < min) {
340                 while (flist_empty(&rados->completed_operations)) {
341                         pthread_cond_wait(&rados->completed_more_io, &rados->completed_lock);
342                 }
343                 assert(!flist_empty(&rados->completed_operations));
344                 
345                 fri = flist_first_entry(&rados->completed_operations, struct fio_rados_iou, list);
346                 assert(fri->completion);
347                 assert(rados_aio_is_complete(fri->completion));
348                 if (fri->write_op != NULL) {
349                         rados_release_write_op(fri->write_op);
350                         fri->write_op = NULL;
351                 }
352                 rados_aio_release(fri->completion);
353                 fri->completion = NULL;
354
355                 rados->aio_events[events] = fri->io_u;
356                 events ++;
357                 flist_del(&fri->list);
358                 if (events >= max) break;
359         }
360         pthread_mutex_unlock(&rados->completed_lock);
361         return events;
362 }
363
364 static int fio_rados_setup(struct thread_data *td)
365 {
366         struct rados_data *rados = NULL;
367         int r;
368         /* allocate engine specific structure to deal with librados. */
369         r = _fio_setup_rados_data(td, &rados);
370         if (r) {
371                 log_err("fio_setup_rados_data failed.\n");
372                 goto cleanup;
373         }
374         td->io_ops_data = rados;
375
376         /* Force single process mode.
377         */
378         td->o.use_thread = 1;
379
380         /* connect in the main thread to determine to determine
381         * the size of the given RADOS block device. And disconnect
382         * later on.
383         */
384         r = _fio_rados_connect(td);
385         if (r) {
386                 log_err("fio_rados_connect failed.\n");
387                 goto cleanup;
388         }
389         rados->connected = true;
390
391         return 0;
392 cleanup:
393         fio_rados_cleanup(td);
394         return r;
395 }
396
397 /* open/invalidate are noops. we set the FIO_DISKLESSIO flag in ioengine_ops to
398    prevent fio from creating the files
399 */
400 static int fio_rados_open(struct thread_data *td, struct fio_file *f)
401 {
402         return 0;
403 }
404 static int fio_rados_invalidate(struct thread_data *td, struct fio_file *f)
405 {
406         return 0;
407 }
408
409 static void fio_rados_io_u_free(struct thread_data *td, struct io_u *io_u)
410 {
411         struct fio_rados_iou *fri = io_u->engine_data;
412
413         if (fri) {
414                 io_u->engine_data = NULL;
415                 fri->td = NULL;
416                 if (fri->completion)
417                         rados_aio_release(fri->completion);
418                 if (fri->write_op)
419                         rados_release_write_op(fri->write_op);
420                 free(fri);
421         }
422 }
423
424 static int fio_rados_io_u_init(struct thread_data *td, struct io_u *io_u)
425 {
426         struct fio_rados_iou *fri;
427         fri = calloc(1, sizeof(*fri));
428         fri->io_u = io_u;
429         fri->td = td;
430         INIT_FLIST_HEAD(&fri->list);
431         io_u->engine_data = fri;
432         return 0;
433 }
434
435 /* ioengine_ops for get_ioengine() */
436 static struct ioengine_ops ioengine = {
437         .name = "rados",
438         .version                = FIO_IOOPS_VERSION,
439         .flags                  = FIO_DISKLESSIO,
440         .setup                  = fio_rados_setup,
441         .queue                  = fio_rados_queue,
442         .getevents              = fio_rados_getevents,
443         .event                  = fio_rados_event,
444         .cleanup                = fio_rados_cleanup,
445         .open_file              = fio_rados_open,
446         .invalidate             = fio_rados_invalidate,
447         .options                = options,
448         .io_u_init              = fio_rados_io_u_init,
449         .io_u_free              = fio_rados_io_u_free,
450         .option_struct_size     = sizeof(struct rados_options),
451 };
452
453 static void fio_init fio_rados_register(void)
454 {
455         register_ioengine(&ioengine);
456 }
457
458 static void fio_exit fio_rados_unregister(void)
459 {
460         unregister_ioengine(&ioengine);
461 }