engines/rados: use fio provided file names
[fio.git] / engines / rados.c
1 /*
2  *  Ceph Rados engine
3  *
4  * IO engine using Ceph's RADOS interface to test low-level performance of
5  * Ceph OSDs.
6  *
7  */
8
9 #include <rados/librados.h>
10 #include <pthread.h>
11 #include "fio.h"
12 #include "../optgroup.h"
13
14 struct fio_rados_iou {
15         struct thread_data *td;
16         struct io_u *io_u;
17         rados_completion_t completion;
18         rados_write_op_t write_op;
19 };
20
21 struct rados_data {
22         rados_t cluster;
23         rados_ioctx_t io_ctx;
24         struct io_u **aio_events;
25         bool connected;
26 };
27
28 /* fio configuration options read from the job file */
29 struct rados_options {
30         void *pad;
31         char *cluster_name;
32         char *pool_name;
33         char *client_name;
34         int busy_poll;
35 };
36
37 static struct fio_option options[] = {
38         {
39                 .name     = "clustername",
40                 .lname    = "ceph cluster name",
41                 .type     = FIO_OPT_STR_STORE,
42                 .help     = "Cluster name for ceph",
43                 .off1     = offsetof(struct rados_options, cluster_name),
44                 .category = FIO_OPT_C_ENGINE,
45                 .group    = FIO_OPT_G_RBD,
46         },
47         {
48                 .name     = "pool",
49                 .lname    = "pool name to use",
50                 .type     = FIO_OPT_STR_STORE,
51                 .help     = "Ceph pool name to benchmark against",
52                 .off1     = offsetof(struct rados_options, pool_name),
53                 .category = FIO_OPT_C_ENGINE,
54                 .group    = FIO_OPT_G_RBD,
55         },
56         {
57                 .name     = "clientname",
58                 .lname    = "rados engine clientname",
59                 .type     = FIO_OPT_STR_STORE,
60                 .help     = "Name of the ceph client to access RADOS engine",
61                 .off1     = offsetof(struct rados_options, client_name),
62                 .category = FIO_OPT_C_ENGINE,
63                 .group    = FIO_OPT_G_RBD,
64         },
65         {
66                 .name     = "busy_poll",
67                 .lname    = "busy poll mode",
68                 .type     = FIO_OPT_BOOL,
69                 .help     = "Busy poll for completions instead of sleeping",
70                 .off1     = offsetof(struct rados_options, busy_poll),
71                 .def      = "0",
72                 .category = FIO_OPT_C_ENGINE,
73                 .group    = FIO_OPT_G_RBD,
74         },
75         {
76                 .name     = NULL,
77         },
78 };
79
80 static int _fio_setup_rados_data(struct thread_data *td,
81                                 struct rados_data **rados_data_ptr)
82 {
83         struct rados_data *rados;
84
85         if (td->io_ops_data)
86                 return 0;
87
88         rados = calloc(1, sizeof(struct rados_data));
89         if (!rados)
90                 goto failed;
91
92         rados->connected = false;
93
94         rados->aio_events = calloc(td->o.iodepth, sizeof(struct io_u *));
95         if (!rados->aio_events)
96                 goto failed;
97         *rados_data_ptr = rados;
98         return 0;
99
100 failed:
101         if (rados) {
102                 if (rados->aio_events)
103                         free(rados->aio_events);
104                 free(rados);
105         }
106         return 1;
107 }
108
109 static void _fio_rados_rm_objects(struct thread_data *td, struct rados_data *rados)
110 {
111         size_t i;
112         for (i = 0; i < td->o.nr_files; i++) {
113                 struct fio_file *f = td->files[i];
114                 rados_remove(rados->io_ctx, f->file_name);
115         }
116 }
117
118 static int _fio_rados_connect(struct thread_data *td)
119 {
120         struct rados_data *rados = td->io_ops_data;
121         struct rados_options *o = td->eo;
122         int r;
123         const uint64_t file_size =
124                 td->o.size / (td->o.nr_files ? td->o.nr_files : 1u);
125         struct fio_file *f;
126         uint32_t i;
127
128         if (o->cluster_name) {
129                 char *client_name = NULL;
130
131                 /*
132                 * If we specify cluser name, the rados_create2
133                 * will not assume 'client.'. name is considered
134                 * as a full type.id namestr
135                 */
136                 if (o->client_name) {
137                         if (!index(o->client_name, '.')) {
138                                 client_name = calloc(1, strlen("client.") +
139                                         strlen(o->client_name) + 1);
140                                 strcat(client_name, "client.");
141                                 strcat(client_name, o->client_name);
142                         } else {
143                                 client_name = o->client_name;
144                         }
145                 }
146
147                 r = rados_create2(&rados->cluster, o->cluster_name,
148                         client_name, 0);
149
150                 if (client_name && !index(o->client_name, '.'))
151                         free(client_name);
152         } else
153                 r = rados_create(&rados->cluster, o->client_name);
154
155         if (o->pool_name == NULL) {
156                 log_err("rados pool name must be provided.\n");
157                 goto failed_early;
158         }
159
160         if (r < 0) {
161                 log_err("rados_create failed.\n");
162                 goto failed_early;
163         }
164
165         r = rados_conf_read_file(rados->cluster, NULL);
166         if (r < 0) {
167                 log_err("rados_conf_read_file failed.\n");
168                 goto failed_early;
169         }
170
171         r = rados_connect(rados->cluster);
172         if (r < 0) {
173                 log_err("rados_connect failed.\n");
174                 goto failed_early;
175         }
176
177         r = rados_ioctx_create(rados->cluster, o->pool_name, &rados->io_ctx);
178         if (r < 0) {
179                 log_err("rados_ioctx_create failed.\n");
180                 goto failed_shutdown;
181         }
182
183         for (i = 0; i < td->o.nr_files; i++) {
184                 f = td->files[i];
185                 f->real_file_size = file_size;
186                 r = rados_write(rados->io_ctx, f->file_name, "", 0, 0);
187                 if (r < 0) {
188                         goto failed_obj_create;
189                 }
190         }
191         return 0;
192
193 failed_obj_create:
194         _fio_rados_rm_objects(td, rados);
195         rados_ioctx_destroy(rados->io_ctx);
196         rados->io_ctx = NULL;
197 failed_shutdown:
198         rados_shutdown(rados->cluster);
199         rados->cluster = NULL;
200 failed_early:
201         return 1;
202 }
203
204 static void _fio_rados_disconnect(struct rados_data *rados)
205 {
206         if (!rados)
207                 return;
208
209         if (rados->io_ctx) {
210                 rados_ioctx_destroy(rados->io_ctx);
211                 rados->io_ctx = NULL;
212         }
213
214         if (rados->cluster) {
215                 rados_shutdown(rados->cluster);
216                 rados->cluster = NULL;
217         }
218 }
219
220 static void fio_rados_cleanup(struct thread_data *td)
221 {
222         struct rados_data *rados = td->io_ops_data;
223
224         if (rados) {
225                 _fio_rados_rm_objects(td, rados);
226                 _fio_rados_disconnect(rados);
227                 free(rados->aio_events);
228                 free(rados);
229         }
230 }
231
232 static enum fio_q_status fio_rados_queue(struct thread_data *td,
233                                          struct io_u *io_u)
234 {
235         struct rados_data *rados = td->io_ops_data;
236         struct fio_rados_iou *fri = io_u->engine_data;
237         char *object = io_u->file->file_name;
238         int r = -1;
239
240         fio_ro_check(td, io_u);
241
242         if (io_u->ddir == DDIR_WRITE) {
243                  r = rados_aio_create_completion(fri, NULL,
244                         NULL, &fri->completion);
245                 if (r < 0) {
246                         log_err("rados_aio_create_completion failed.\n");
247                         goto failed;
248                 }
249
250                 r = rados_aio_write(rados->io_ctx, object, fri->completion,
251                         io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
252                 if (r < 0) {
253                         log_err("rados_write failed.\n");
254                         goto failed_comp;
255                 }
256                 return FIO_Q_QUEUED;
257         } else if (io_u->ddir == DDIR_READ) {
258                 r = rados_aio_create_completion(fri, NULL,
259                         NULL, &fri->completion);
260                 if (r < 0) {
261                         log_err("rados_aio_create_completion failed.\n");
262                         goto failed;
263                 }
264                 r = rados_aio_read(rados->io_ctx, object, fri->completion,
265                         io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
266                 if (r < 0) {
267                         log_err("rados_aio_read failed.\n");
268                         goto failed_comp;
269                 }
270                 return FIO_Q_QUEUED;
271         } else if (io_u->ddir == DDIR_TRIM) {
272                 r = rados_aio_create_completion(fri, NULL,
273                         NULL , &fri->completion);
274                 if (r < 0) {
275                         log_err("rados_aio_create_completion failed.\n");
276                         goto failed;
277                 }
278                 fri->write_op = rados_create_write_op();
279                 if (fri->write_op == NULL) {
280                         log_err("rados_create_write_op failed.\n");
281                         goto failed_comp;
282                 }
283                 rados_write_op_zero(fri->write_op, io_u->offset,
284                         io_u->xfer_buflen);
285                 r = rados_aio_write_op_operate(fri->write_op, rados->io_ctx,
286                         fri->completion, object, NULL, 0);
287                 if (r < 0) {
288                         log_err("rados_aio_write_op_operate failed.\n");
289                         goto failed_write_op;
290                 }
291                 return FIO_Q_QUEUED;
292          }
293
294         log_err("WARNING: Only DDIR_READ, DDIR_WRITE and DDIR_TRIM are supported!");
295
296 failed_write_op:
297         rados_release_write_op(fri->write_op);
298 failed_comp:
299         rados_aio_release(fri->completion);
300 failed:
301         io_u->error = -r;
302         td_verror(td, io_u->error, "xfer");
303         return FIO_Q_COMPLETED;
304 }
305
306 static struct io_u *fio_rados_event(struct thread_data *td, int event)
307 {
308         struct rados_data *rados = td->io_ops_data;
309         return rados->aio_events[event];
310 }
311
312 int fio_rados_getevents(struct thread_data *td, unsigned int min,
313         unsigned int max, const struct timespec *t)
314 {
315         struct rados_data *rados = td->io_ops_data;
316         struct rados_options *o = td->eo;
317         int busy_poll = o->busy_poll;
318         unsigned int events = 0;
319         struct io_u *u;
320         struct fio_rados_iou *fri;
321         unsigned int i;
322         rados_completion_t first_unfinished;
323         int observed_new = 0;
324
325         /* loop through inflight ios until we find 'min' completions */
326         do {
327                 first_unfinished = NULL;
328                 io_u_qiter(&td->io_u_all, u, i) {
329                         if (!(u->flags & IO_U_F_FLIGHT))
330                                 continue;
331
332                         fri = u->engine_data;
333                         if (fri->completion) {
334                                 if (rados_aio_is_complete(fri->completion)) {
335                                         if (fri->write_op != NULL) {
336                                                 rados_release_write_op(fri->write_op);
337                                                 fri->write_op = NULL;
338                                         }
339                                         rados_aio_release(fri->completion);
340                                         fri->completion = NULL;
341                                         rados->aio_events[events] = u;
342                                         events++;
343                                         observed_new = 1;
344                                 } else if (first_unfinished == NULL) {
345                                         first_unfinished = fri->completion;
346                                 }
347                         }
348                         if (events >= max)
349                                 break;
350                 }
351                 if (events >= min)
352                         return events;
353                 if (first_unfinished == NULL || busy_poll)
354                         continue;
355
356                 if (!observed_new)
357                         rados_aio_wait_for_complete(first_unfinished);
358         } while (1);
359   return events;
360 }
361
362 static int fio_rados_setup(struct thread_data *td)
363 {
364         struct rados_data *rados = NULL;
365         int r;
366         /* allocate engine specific structure to deal with librados. */
367         r = _fio_setup_rados_data(td, &rados);
368         if (r) {
369                 log_err("fio_setup_rados_data failed.\n");
370                 goto cleanup;
371         }
372         td->io_ops_data = rados;
373
374         /* Force single process mode.
375         */
376         td->o.use_thread = 1;
377
378         /* connect in the main thread to determine to determine
379         * the size of the given RADOS block device. And disconnect
380         * later on.
381         */
382         r = _fio_rados_connect(td);
383         if (r) {
384                 log_err("fio_rados_connect failed.\n");
385                 goto cleanup;
386         }
387         rados->connected = true;
388
389         return 0;
390 cleanup:
391         fio_rados_cleanup(td);
392         return r;
393 }
394
395 /* open/invalidate are noops. we set the FIO_DISKLESSIO flag in ioengine_ops to
396    prevent fio from creating the files
397 */
398 static int fio_rados_open(struct thread_data *td, struct fio_file *f)
399 {
400         return 0;
401 }
402 static int fio_rados_invalidate(struct thread_data *td, struct fio_file *f)
403 {
404         return 0;
405 }
406
407 static void fio_rados_io_u_free(struct thread_data *td, struct io_u *io_u)
408 {
409         struct fio_rados_iou *fri = io_u->engine_data;
410
411         if (fri) {
412                 io_u->engine_data = NULL;
413                 fri->td = NULL;
414                 if (fri->completion)
415                         rados_aio_release(fri->completion);
416                 if (fri->write_op)
417                         rados_release_write_op(fri->write_op);
418                 free(fri);
419         }
420 }
421
422 static int fio_rados_io_u_init(struct thread_data *td, struct io_u *io_u)
423 {
424         struct fio_rados_iou *fri;
425         fri = calloc(1, sizeof(*fri));
426         fri->io_u = io_u;
427         fri->td = td;
428         io_u->engine_data = fri;
429         return 0;
430 }
431
432 /* ioengine_ops for get_ioengine() */
433 static struct ioengine_ops ioengine = {
434         .name = "rados",
435         .version                = FIO_IOOPS_VERSION,
436         .flags                  = FIO_DISKLESSIO,
437         .setup                  = fio_rados_setup,
438         .queue                  = fio_rados_queue,
439         .getevents              = fio_rados_getevents,
440         .event                  = fio_rados_event,
441         .cleanup                = fio_rados_cleanup,
442         .open_file              = fio_rados_open,
443         .invalidate             = fio_rados_invalidate,
444         .options                = options,
445         .io_u_init              = fio_rados_io_u_init,
446         .io_u_free              = fio_rados_io_u_free,
447         .option_struct_size     = sizeof(struct rados_options),
448 };
449
450 static void fio_init fio_rados_register(void)
451 {
452         register_ioengine(&ioengine);
453 }
454
455 static void fio_exit fio_rados_unregister(void)
456 {
457         unregister_ioengine(&ioengine);
458 }