stat: make add lat percentile functions inline
[fio.git] / engines / rados.c
CommitLineData
d5f9b0ea
IF
1/*
2 * Ceph Rados engine
3 *
4 * IO engine using Ceph's RADOS interface to test low-level performance of
5 * Ceph OSDs.
6 *
7 */
8
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <rados/librados.h>
#include "fio.h"
#include "../optgroup.h"
13
a21dafb0 14struct rados_data {
1e30d8d0
AK
15 rados_t cluster;
16 rados_ioctx_t io_ctx;
17 struct io_u **aio_events;
18 bool connected;
19 pthread_mutex_t completed_lock;
95625c6d 20 pthread_cond_t completed_more_io;
1e30d8d0
AK
21 struct flist_head completed_operations;
22 uint64_t ops_scheduled;
23 uint64_t ops_completed;
a21dafb0
AK
24};
25
d5f9b0ea 26struct fio_rados_iou {
a21dafb0 27 struct flist_head list;
d5f9b0ea
IF
28 struct thread_data *td;
29 struct io_u *io_u;
30 rados_completion_t completion;
31 rados_write_op_t write_op;
32};
33
d5f9b0ea
IF
/*
 * Engine options parsed from the job file; each field is filled in through
 * the offsets recorded in the options[] table.
 */
struct rados_options {
	/* NOTE(review): presumably keeps real options off offset 0 - confirm */
	void *pad;
	/* "clustername": ceph cluster name, NULL when not given */
	char *cluster_name;
	/* "pool": pool to benchmark against, mandatory */
	char *pool_name;
	/* "clientname": ceph client name, NULL when not given */
	char *client_name;
	/* "busy_poll": boolean, busy poll for completions instead of sleeping */
	int busy_poll;
	/* "touch_objects": boolean, create the objects when connecting */
	int touch_objects;
};
43
44static struct fio_option options[] = {
45 {
46 .name = "clustername",
47 .lname = "ceph cluster name",
48 .type = FIO_OPT_STR_STORE,
49 .help = "Cluster name for ceph",
50 .off1 = offsetof(struct rados_options, cluster_name),
51 .category = FIO_OPT_C_ENGINE,
52 .group = FIO_OPT_G_RBD,
53 },
54 {
55 .name = "pool",
56 .lname = "pool name to use",
57 .type = FIO_OPT_STR_STORE,
58 .help = "Ceph pool name to benchmark against",
59 .off1 = offsetof(struct rados_options, pool_name),
60 .category = FIO_OPT_C_ENGINE,
61 .group = FIO_OPT_G_RBD,
62 },
63 {
64 .name = "clientname",
65 .lname = "rados engine clientname",
66 .type = FIO_OPT_STR_STORE,
67 .help = "Name of the ceph client to access RADOS engine",
68 .off1 = offsetof(struct rados_options, client_name),
69 .category = FIO_OPT_C_ENGINE,
70 .group = FIO_OPT_G_RBD,
71 },
72 {
73 .name = "busy_poll",
74 .lname = "busy poll mode",
75 .type = FIO_OPT_BOOL,
76 .help = "Busy poll for completions instead of sleeping",
77 .off1 = offsetof(struct rados_options, busy_poll),
78 .def = "0",
79 .category = FIO_OPT_C_ENGINE,
80 .group = FIO_OPT_G_RBD,
81 },
2b728756
AK
82 {
83 .name = "touch_objects",
84 .lname = "touch objects on start",
85 .type = FIO_OPT_BOOL,
86 .help = "Touch (create) objects on start",
87 .off1 = offsetof(struct rados_options, touch_objects),
88 .def = "1",
89 .category = FIO_OPT_C_ENGINE,
90 .group = FIO_OPT_G_RBD,
91 },
d5f9b0ea
IF
92 {
93 .name = NULL,
94 },
95};
96
97static int _fio_setup_rados_data(struct thread_data *td,
98 struct rados_data **rados_data_ptr)
99{
100 struct rados_data *rados;
101
102 if (td->io_ops_data)
103 return 0;
104
105 rados = calloc(1, sizeof(struct rados_data));
106 if (!rados)
107 goto failed;
108
109 rados->connected = false;
110
111 rados->aio_events = calloc(td->o.iodepth, sizeof(struct io_u *));
112 if (!rados->aio_events)
113 goto failed;
a21dafb0
AK
114 pthread_mutex_init(&rados->completed_lock, NULL);
115 pthread_cond_init(&rados->completed_more_io, NULL);
116 INIT_FLIST_HEAD(&rados->completed_operations);
1e30d8d0
AK
117 rados->ops_scheduled = 0;
118 rados->ops_completed = 0;
d5f9b0ea
IF
119 *rados_data_ptr = rados;
120 return 0;
121
122failed:
123 if (rados) {
d5f9b0ea
IF
124 if (rados->aio_events)
125 free(rados->aio_events);
126 free(rados);
127 }
128 return 1;
129}
130
21f277b8 131static void _fio_rados_rm_objects(struct thread_data *td, struct rados_data *rados)
d5f9b0ea
IF
132{
133 size_t i;
21f277b8
AK
134 for (i = 0; i < td->o.nr_files; i++) {
135 struct fio_file *f = td->files[i];
136 rados_remove(rados->io_ctx, f->file_name);
d5f9b0ea
IF
137 }
138}
139
140static int _fio_rados_connect(struct thread_data *td)
141{
142 struct rados_data *rados = td->io_ops_data;
143 struct rados_options *o = td->eo;
144 int r;
145 const uint64_t file_size =
146 td->o.size / (td->o.nr_files ? td->o.nr_files : 1u);
147 struct fio_file *f;
148 uint32_t i;
d5f9b0ea
IF
149
150 if (o->cluster_name) {
151 char *client_name = NULL;
152
153 /*
154 * If we specify cluser name, the rados_create2
155 * will not assume 'client.'. name is considered
156 * as a full type.id namestr
157 */
158 if (o->client_name) {
159 if (!index(o->client_name, '.')) {
160 client_name = calloc(1, strlen("client.") +
161 strlen(o->client_name) + 1);
162 strcat(client_name, "client.");
163 strcat(client_name, o->client_name);
164 } else {
165 client_name = o->client_name;
166 }
167 }
168
169 r = rados_create2(&rados->cluster, o->cluster_name,
170 client_name, 0);
171
172 if (client_name && !index(o->client_name, '.'))
173 free(client_name);
174 } else
175 r = rados_create(&rados->cluster, o->client_name);
176
21f277b8
AK
177 if (o->pool_name == NULL) {
178 log_err("rados pool name must be provided.\n");
179 goto failed_early;
180 }
181
d5f9b0ea
IF
182 if (r < 0) {
183 log_err("rados_create failed.\n");
184 goto failed_early;
185 }
186
187 r = rados_conf_read_file(rados->cluster, NULL);
188 if (r < 0) {
189 log_err("rados_conf_read_file failed.\n");
190 goto failed_early;
191 }
192
193 r = rados_connect(rados->cluster);
194 if (r < 0) {
195 log_err("rados_connect failed.\n");
196 goto failed_early;
197 }
198
199 r = rados_ioctx_create(rados->cluster, o->pool_name, &rados->io_ctx);
200 if (r < 0) {
201 log_err("rados_ioctx_create failed.\n");
202 goto failed_shutdown;
203 }
204
21f277b8 205 for (i = 0; i < td->o.nr_files; i++) {
d5f9b0ea
IF
206 f = td->files[i];
207 f->real_file_size = file_size;
2b728756
AK
208 if (o->touch_objects) {
209 r = rados_write(rados->io_ctx, f->file_name, "", 0, 0);
210 if (r < 0) {
211 goto failed_obj_create;
212 }
d5f9b0ea
IF
213 }
214 }
21f277b8 215 return 0;
d5f9b0ea
IF
216
217failed_obj_create:
21f277b8 218 _fio_rados_rm_objects(td, rados);
d5f9b0ea
IF
219 rados_ioctx_destroy(rados->io_ctx);
220 rados->io_ctx = NULL;
221failed_shutdown:
222 rados_shutdown(rados->cluster);
223 rados->cluster = NULL;
224failed_early:
225 return 1;
226}
227
228static void _fio_rados_disconnect(struct rados_data *rados)
229{
230 if (!rados)
231 return;
232
d5f9b0ea
IF
233 if (rados->io_ctx) {
234 rados_ioctx_destroy(rados->io_ctx);
235 rados->io_ctx = NULL;
236 }
237
238 if (rados->cluster) {
239 rados_shutdown(rados->cluster);
240 rados->cluster = NULL;
241 }
242}
243
244static void fio_rados_cleanup(struct thread_data *td)
245{
246 struct rados_data *rados = td->io_ops_data;
d5f9b0ea 247 if (rados) {
1e30d8d0
AK
248 pthread_mutex_lock(&rados->completed_lock);
249 while (rados->ops_scheduled != rados->ops_completed)
250 pthread_cond_wait(&rados->completed_more_io, &rados->completed_lock);
251 pthread_mutex_unlock(&rados->completed_lock);
21f277b8 252 _fio_rados_rm_objects(td, rados);
d5f9b0ea 253 _fio_rados_disconnect(rados);
d5f9b0ea
IF
254 free(rados->aio_events);
255 free(rados);
256 }
257}
258
a21dafb0
AK
259static void complete_callback(rados_completion_t cb, void *arg)
260{
261 struct fio_rados_iou *fri = (struct fio_rados_iou *)arg;
262 struct rados_data *rados = fri->td->io_ops_data;
263 assert(fri->completion);
264 assert(rados_aio_is_complete(fri->completion));
265 pthread_mutex_lock(&rados->completed_lock);
266 flist_add_tail(&fri->list, &rados->completed_operations);
1e30d8d0 267 rados->ops_completed++;
a21dafb0
AK
268 pthread_mutex_unlock(&rados->completed_lock);
269 pthread_cond_signal(&rados->completed_more_io);
270}
271
2e4ef4fb
JA
272static enum fio_q_status fio_rados_queue(struct thread_data *td,
273 struct io_u *io_u)
d5f9b0ea
IF
274{
275 struct rados_data *rados = td->io_ops_data;
276 struct fio_rados_iou *fri = io_u->engine_data;
21f277b8 277 char *object = io_u->file->file_name;
d5f9b0ea
IF
278 int r = -1;
279
280 fio_ro_check(td, io_u);
281
282 if (io_u->ddir == DDIR_WRITE) {
a21dafb0 283 r = rados_aio_create_completion(fri, complete_callback,
d5f9b0ea
IF
284 NULL, &fri->completion);
285 if (r < 0) {
286 log_err("rados_aio_create_completion failed.\n");
287 goto failed;
288 }
289
290 r = rados_aio_write(rados->io_ctx, object, fri->completion,
291 io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
292 if (r < 0) {
293 log_err("rados_write failed.\n");
294 goto failed_comp;
295 }
1e30d8d0 296 rados->ops_scheduled++;
d5f9b0ea
IF
297 return FIO_Q_QUEUED;
298 } else if (io_u->ddir == DDIR_READ) {
a21dafb0 299 r = rados_aio_create_completion(fri, complete_callback,
d5f9b0ea
IF
300 NULL, &fri->completion);
301 if (r < 0) {
302 log_err("rados_aio_create_completion failed.\n");
303 goto failed;
304 }
305 r = rados_aio_read(rados->io_ctx, object, fri->completion,
306 io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
307 if (r < 0) {
308 log_err("rados_aio_read failed.\n");
309 goto failed_comp;
310 }
1e30d8d0 311 rados->ops_scheduled++;
d5f9b0ea
IF
312 return FIO_Q_QUEUED;
313 } else if (io_u->ddir == DDIR_TRIM) {
a21dafb0 314 r = rados_aio_create_completion(fri, complete_callback,
d5f9b0ea
IF
315 NULL , &fri->completion);
316 if (r < 0) {
317 log_err("rados_aio_create_completion failed.\n");
318 goto failed;
319 }
320 fri->write_op = rados_create_write_op();
321 if (fri->write_op == NULL) {
322 log_err("rados_create_write_op failed.\n");
323 goto failed_comp;
324 }
325 rados_write_op_zero(fri->write_op, io_u->offset,
326 io_u->xfer_buflen);
327 r = rados_aio_write_op_operate(fri->write_op, rados->io_ctx,
328 fri->completion, object, NULL, 0);
329 if (r < 0) {
330 log_err("rados_aio_write_op_operate failed.\n");
331 goto failed_write_op;
332 }
1e30d8d0 333 rados->ops_scheduled++;
d5f9b0ea
IF
334 return FIO_Q_QUEUED;
335 }
336
337 log_err("WARNING: Only DDIR_READ, DDIR_WRITE and DDIR_TRIM are supported!");
338
339failed_write_op:
340 rados_release_write_op(fri->write_op);
341failed_comp:
342 rados_aio_release(fri->completion);
343failed:
344 io_u->error = -r;
345 td_verror(td, io_u->error, "xfer");
346 return FIO_Q_COMPLETED;
347}
348
349static struct io_u *fio_rados_event(struct thread_data *td, int event)
350{
351 struct rados_data *rados = td->io_ops_data;
352 return rados->aio_events[event];
353}
354
355int fio_rados_getevents(struct thread_data *td, unsigned int min,
356 unsigned int max, const struct timespec *t)
357{
358 struct rados_data *rados = td->io_ops_data;
d5f9b0ea 359 unsigned int events = 0;
d5f9b0ea 360 struct fio_rados_iou *fri;
a21dafb0
AK
361
362 pthread_mutex_lock(&rados->completed_lock);
363 while (events < min) {
364 while (flist_empty(&rados->completed_operations)) {
365 pthread_cond_wait(&rados->completed_more_io, &rados->completed_lock);
d5f9b0ea 366 }
a21dafb0
AK
367 assert(!flist_empty(&rados->completed_operations));
368
6a3f4e6c 369 fri = flist_first_entry(&rados->completed_operations, struct fio_rados_iou, list);
a21dafb0
AK
370 assert(fri->completion);
371 assert(rados_aio_is_complete(fri->completion));
372 if (fri->write_op != NULL) {
373 rados_release_write_op(fri->write_op);
374 fri->write_op = NULL;
375 }
376 rados_aio_release(fri->completion);
377 fri->completion = NULL;
378
379 rados->aio_events[events] = fri->io_u;
380 events ++;
381 flist_del(&fri->list);
382 if (events >= max) break;
383 }
384 pthread_mutex_unlock(&rados->completed_lock);
385 return events;
d5f9b0ea
IF
386}
387
388static int fio_rados_setup(struct thread_data *td)
389{
390 struct rados_data *rados = NULL;
391 int r;
392 /* allocate engine specific structure to deal with librados. */
393 r = _fio_setup_rados_data(td, &rados);
394 if (r) {
395 log_err("fio_setup_rados_data failed.\n");
396 goto cleanup;
397 }
398 td->io_ops_data = rados;
399
400 /* Force single process mode.
401 */
402 td->o.use_thread = 1;
403
404 /* connect in the main thread to determine to determine
405 * the size of the given RADOS block device. And disconnect
406 * later on.
407 */
408 r = _fio_rados_connect(td);
409 if (r) {
410 log_err("fio_rados_connect failed.\n");
411 goto cleanup;
412 }
413 rados->connected = true;
414
415 return 0;
416cleanup:
417 fio_rados_cleanup(td);
418 return r;
419}
420
/*
 * Opening and invalidating files are no-ops for this engine: the
 * FIO_DISKLESSIO flag in the ioengine_ops below prevents fio from creating
 * the files itself.
 */
static int fio_rados_open(struct thread_data *td, struct fio_file *f)
{
	(void)td;
	(void)f;

	return 0;
}
/* ioengine invalidate hook: nothing to invalidate, always reports success. */
static int fio_rados_invalidate(struct thread_data *td, struct fio_file *f)
{
	(void)td;
	(void)f;

	return 0;
}
432
433static void fio_rados_io_u_free(struct thread_data *td, struct io_u *io_u)
434{
435 struct fio_rados_iou *fri = io_u->engine_data;
436
437 if (fri) {
438 io_u->engine_data = NULL;
439 fri->td = NULL;
440 if (fri->completion)
441 rados_aio_release(fri->completion);
442 if (fri->write_op)
443 rados_release_write_op(fri->write_op);
444 free(fri);
445 }
446}
447
448static int fio_rados_io_u_init(struct thread_data *td, struct io_u *io_u)
449{
450 struct fio_rados_iou *fri;
451 fri = calloc(1, sizeof(*fri));
452 fri->io_u = io_u;
453 fri->td = td;
a21dafb0 454 INIT_FLIST_HEAD(&fri->list);
d5f9b0ea
IF
455 io_u->engine_data = fri;
456 return 0;
457}
458
459/* ioengine_ops for get_ioengine() */
5a8a6a03 460FIO_STATIC struct ioengine_ops ioengine = {
d5f9b0ea
IF
461 .name = "rados",
462 .version = FIO_IOOPS_VERSION,
463 .flags = FIO_DISKLESSIO,
464 .setup = fio_rados_setup,
465 .queue = fio_rados_queue,
466 .getevents = fio_rados_getevents,
467 .event = fio_rados_event,
468 .cleanup = fio_rados_cleanup,
469 .open_file = fio_rados_open,
470 .invalidate = fio_rados_invalidate,
471 .options = options,
472 .io_u_init = fio_rados_io_u_init,
473 .io_u_free = fio_rados_io_u_free,
474 .option_struct_size = sizeof(struct rados_options),
475};
476
477static void fio_init fio_rados_register(void)
478{
479 register_ioengine(&ioengine);
480}
481
482static void fio_exit fio_rados_unregister(void)
483{
484 unregister_ioengine(&ioengine);
485}