test: add test for readwrite issue
[fio.git] / engines / dfs.c
CommitLineData
c363fdd7
JL
1/**
2 * FIO engine for DAOS File System (dfs).
3 *
4 * (C) Copyright 2020-2021 Intel Corporation.
5 */
6
7#include <fio.h>
8#include <optgroup.h>
9
10#include <daos.h>
11#include <daos_fs.h>
12
13static bool daos_initialized;
14static int num_threads;
15static pthread_mutex_t daos_mutex = PTHREAD_MUTEX_INITIALIZER;
16daos_handle_t poh; /* pool handle */
17daos_handle_t coh; /* container handle */
18daos_oclass_id_t cid = OC_UNKNOWN; /* object class */
19dfs_t *dfs; /* dfs mount reference */
20
21struct daos_iou {
22 struct io_u *io_u;
23 daos_event_t ev;
24 d_sg_list_t sgl;
25 d_iov_t iov;
26 daos_size_t size;
27 bool complete;
28};
29
30struct daos_data {
31 daos_handle_t eqh;
32 dfs_obj_t *obj;
33 struct io_u **io_us;
34 int queued;
35 int num_ios;
36};
37
38struct daos_fio_options {
39 void *pad;
40 char *pool; /* Pool UUID */
41 char *cont; /* Container UUID */
42 daos_size_t chsz; /* Chunk size */
43 char *oclass; /* object class */
44#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
45 char *svcl; /* service replica list, deprecated */
46#endif
47};
48
49static struct fio_option options[] = {
50 {
51 .name = "pool",
2819492b 52 .lname = "pool uuid or label",
c363fdd7
JL
53 .type = FIO_OPT_STR_STORE,
54 .off1 = offsetof(struct daos_fio_options, pool),
2819492b 55 .help = "DAOS pool uuid or label",
c363fdd7
JL
56 .category = FIO_OPT_C_ENGINE,
57 .group = FIO_OPT_G_DFS,
58 },
59 {
60 .name = "cont",
2819492b 61 .lname = "container uuid or label",
c363fdd7
JL
62 .type = FIO_OPT_STR_STORE,
63 .off1 = offsetof(struct daos_fio_options, cont),
2819492b 64 .help = "DAOS container uuid or label",
c363fdd7
JL
65 .category = FIO_OPT_C_ENGINE,
66 .group = FIO_OPT_G_DFS,
67 },
68 {
69 .name = "chunk_size",
70 .lname = "DFS chunk size",
71 .type = FIO_OPT_ULL,
72 .off1 = offsetof(struct daos_fio_options, chsz),
73 .help = "DFS chunk size in bytes",
74 .def = "0", /* use container default */
75 .category = FIO_OPT_C_ENGINE,
76 .group = FIO_OPT_G_DFS,
77 },
78 {
79 .name = "object_class",
80 .lname = "object class",
81 .type = FIO_OPT_STR_STORE,
82 .off1 = offsetof(struct daos_fio_options, oclass),
83 .help = "DAOS object class",
84 .category = FIO_OPT_C_ENGINE,
85 .group = FIO_OPT_G_DFS,
86 },
87#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
88 {
89 .name = "svcl",
90 .lname = "List of service ranks",
91 .type = FIO_OPT_STR_STORE,
92 .off1 = offsetof(struct daos_fio_options, svcl),
93 .help = "List of pool replicated service ranks",
94 .category = FIO_OPT_C_ENGINE,
95 .group = FIO_OPT_G_DFS,
96 },
97#endif
98 {
99 .name = NULL,
100 },
101};
102
103static int daos_fio_global_init(struct thread_data *td)
104{
105 struct daos_fio_options *eo = td->eo;
c363fdd7
JL
106 daos_pool_info_t pool_info;
107 daos_cont_info_t co_info;
108 int rc = 0;
109
110#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
111 if (!eo->pool || !eo->cont || !eo->svcl) {
112#else
113 if (!eo->pool || !eo->cont) {
114#endif
115 log_err("Missing required DAOS options\n");
116 return EINVAL;
117 }
118
119 rc = daos_init();
120 if (rc != -DER_ALREADY && rc) {
121 log_err("Failed to initialize daos %d\n", rc);
122 td_verror(td, rc, "daos_init");
123 return rc;
124 }
125
2819492b
JL
126#if !defined(DAOS_API_VERSION_MAJOR) || \
127 (DAOS_API_VERSION_MAJOR == 1 && DAOS_API_VERSION_MINOR < 3)
128 uuid_t pool_uuid, co_uuid;
129
c363fdd7
JL
130 rc = uuid_parse(eo->pool, pool_uuid);
131 if (rc) {
132 log_err("Failed to parse 'Pool uuid': %s\n", eo->pool);
133 td_verror(td, EINVAL, "uuid_parse(eo->pool)");
134 return EINVAL;
135 }
136
137 rc = uuid_parse(eo->cont, co_uuid);
138 if (rc) {
139 log_err("Failed to parse 'Cont uuid': %s\n", eo->cont);
140 td_verror(td, EINVAL, "uuid_parse(eo->cont)");
141 return EINVAL;
142 }
2819492b 143#endif
c363fdd7
JL
144
145 /* Connect to the DAOS pool */
146#if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
147 d_rank_list_t *svcl = NULL;
148
149 svcl = daos_rank_list_parse(eo->svcl, ":");
150 if (svcl == NULL) {
151 log_err("Failed to parse svcl\n");
152 td_verror(td, EINVAL, "daos_rank_list_parse");
153 return EINVAL;
154 }
155
156 rc = daos_pool_connect(pool_uuid, NULL, svcl, DAOS_PC_RW,
157 &poh, &pool_info, NULL);
158 d_rank_list_free(svcl);
2819492b 159#elif (DAOS_API_VERSION_MAJOR == 1 && DAOS_API_VERSION_MINOR < 3)
c363fdd7
JL
160 rc = daos_pool_connect(pool_uuid, NULL, DAOS_PC_RW, &poh, &pool_info,
161 NULL);
2819492b
JL
162#else
163 rc = daos_pool_connect(eo->pool, NULL, DAOS_PC_RW, &poh, &pool_info,
164 NULL);
c363fdd7
JL
165#endif
166 if (rc) {
167 log_err("Failed to connect to pool %d\n", rc);
168 td_verror(td, rc, "daos_pool_connect");
169 return rc;
170 }
171
172 /* Open the DAOS container */
2819492b
JL
173#if !defined(DAOS_API_VERSION_MAJOR) || \
174 (DAOS_API_VERSION_MAJOR == 1 && DAOS_API_VERSION_MINOR < 3)
c363fdd7 175 rc = daos_cont_open(poh, co_uuid, DAOS_COO_RW, &coh, &co_info, NULL);
2819492b
JL
176#else
177 rc = daos_cont_open(poh, eo->cont, DAOS_COO_RW, &coh, &co_info, NULL);
178#endif
c363fdd7
JL
179 if (rc) {
180 log_err("Failed to open container: %d\n", rc);
181 td_verror(td, rc, "daos_cont_open");
182 (void)daos_pool_disconnect(poh, NULL);
183 return rc;
184 }
185
186 /* Mount encapsulated filesystem */
187 rc = dfs_mount(poh, coh, O_RDWR, &dfs);
188 if (rc) {
189 log_err("Failed to mount DFS namespace: %d\n", rc);
190 td_verror(td, rc, "dfs_mount");
191 (void)daos_pool_disconnect(poh, NULL);
192 (void)daos_cont_close(coh, NULL);
193 return rc;
194 }
195
196 /* Retrieve object class to use, if specified */
197 if (eo->oclass)
198 cid = daos_oclass_name2id(eo->oclass);
199
200 return 0;
201}
202
203static int daos_fio_global_cleanup()
204{
205 int rc;
206 int ret = 0;
207
208 rc = dfs_umount(dfs);
209 if (rc) {
210 log_err("failed to umount dfs: %d\n", rc);
211 ret = rc;
212 }
213 rc = daos_cont_close(coh, NULL);
214 if (rc) {
215 log_err("failed to close container: %d\n", rc);
216 if (ret == 0)
217 ret = rc;
218 }
219 rc = daos_pool_disconnect(poh, NULL);
220 if (rc) {
221 log_err("failed to disconnect pool: %d\n", rc);
222 if (ret == 0)
223 ret = rc;
224 }
225 rc = daos_fini();
226 if (rc) {
227 log_err("failed to finalize daos: %d\n", rc);
228 if (ret == 0)
229 ret = rc;
230 }
231
232 return ret;
233}
234
/*
 * ioengine .setup hook. This engine has nothing to prepare at this
 * stage, so it only reports success.
 */
static int daos_fio_setup(struct thread_data *td)
{
	(void)td;

	return 0;
}
239
240static int daos_fio_init(struct thread_data *td)
241{
242 struct daos_data *dd;
243 int rc = 0;
244
245 pthread_mutex_lock(&daos_mutex);
246
247 dd = malloc(sizeof(*dd));
248 if (dd == NULL) {
249 log_err("Failed to allocate DAOS-private data\n");
250 rc = ENOMEM;
251 goto out;
252 }
253
254 dd->queued = 0;
255 dd->num_ios = td->o.iodepth;
256 dd->io_us = calloc(dd->num_ios, sizeof(struct io_u *));
257 if (dd->io_us == NULL) {
258 log_err("Failed to allocate IO queue\n");
259 rc = ENOMEM;
260 goto out;
261 }
262
263 /* initialize DAOS stack if not already up */
264 if (!daos_initialized) {
265 rc = daos_fio_global_init(td);
266 if (rc)
267 goto out;
268 daos_initialized = true;
269 }
270
271 rc = daos_eq_create(&dd->eqh);
272 if (rc) {
273 log_err("Failed to create event queue: %d\n", rc);
274 td_verror(td, rc, "daos_eq_create");
275 goto out;
276 }
277
278 td->io_ops_data = dd;
279 num_threads++;
280out:
281 if (rc) {
282 if (dd) {
283 free(dd->io_us);
284 free(dd);
285 }
286 if (num_threads == 0 && daos_initialized) {
287 /* don't clobber error return value */
288 (void)daos_fio_global_cleanup();
289 daos_initialized = false;
290 }
291 }
292 pthread_mutex_unlock(&daos_mutex);
293 return rc;
294}
295
296static void daos_fio_cleanup(struct thread_data *td)
297{
298 struct daos_data *dd = td->io_ops_data;
299 int rc;
300
301 if (dd == NULL)
302 return;
303
304 rc = daos_eq_destroy(dd->eqh, DAOS_EQ_DESTROY_FORCE);
305 if (rc < 0) {
306 log_err("failed to destroy event queue: %d\n", rc);
307 td_verror(td, rc, "daos_eq_destroy");
308 }
309
310 free(dd->io_us);
311 free(dd);
312
313 pthread_mutex_lock(&daos_mutex);
314 num_threads--;
315 if (daos_initialized && num_threads == 0) {
316 int ret;
317
318 ret = daos_fio_global_cleanup();
319 if (ret < 0 && rc == 0) {
320 log_err("failed to clean up: %d\n", ret);
321 td_verror(td, ret, "daos_fio_global_cleanup");
322 }
323 daos_initialized = false;
324 }
325 pthread_mutex_unlock(&daos_mutex);
326}
327
328static int daos_fio_get_file_size(struct thread_data *td, struct fio_file *f)
329{
330 char *file_name = f->file_name;
331 struct stat stbuf = {0};
332 int rc;
333
334 dprint(FD_FILE, "dfs stat %s\n", f->file_name);
335
336 if (!daos_initialized)
337 return 0;
338
339 rc = dfs_stat(dfs, NULL, file_name, &stbuf);
340 if (rc) {
341 log_err("Failed to stat %s: %d\n", f->file_name, rc);
342 td_verror(td, rc, "dfs_stat");
343 return rc;
344 }
345
346 f->real_file_size = stbuf.st_size;
347 return 0;
348}
349
350static int daos_fio_close(struct thread_data *td, struct fio_file *f)
351{
352 struct daos_data *dd = td->io_ops_data;
353 int rc;
354
355 dprint(FD_FILE, "dfs release %s\n", f->file_name);
356
357 rc = dfs_release(dd->obj);
358 if (rc) {
359 log_err("Failed to release %s: %d\n", f->file_name, rc);
360 td_verror(td, rc, "dfs_release");
361 return rc;
362 }
363
364 return 0;
365}
366
367static int daos_fio_open(struct thread_data *td, struct fio_file *f)
368{
369 struct daos_data *dd = td->io_ops_data;
370 struct daos_fio_options *eo = td->eo;
371 int flags = 0;
372 int rc;
373
374 dprint(FD_FILE, "dfs open %s (%s/%d/%d)\n",
375 f->file_name, td_write(td) & !read_only ? "rw" : "r",
376 td->o.create_on_open, td->o.allow_create);
377
378 if (td->o.create_on_open && td->o.allow_create)
379 flags |= O_CREAT;
380
381 if (td_write(td)) {
382 if (!read_only)
383 flags |= O_RDWR;
384 if (td->o.allow_create)
385 flags |= O_CREAT;
386 } else if (td_read(td)) {
387 flags |= O_RDONLY;
388 }
389
390 rc = dfs_open(dfs, NULL, f->file_name,
391 S_IFREG | S_IRUSR | S_IWUSR,
392 flags, cid, eo->chsz, NULL, &dd->obj);
393 if (rc) {
394 log_err("Failed to open %s: %d\n", f->file_name, rc);
395 td_verror(td, rc, "dfs_open");
396 return rc;
397 }
398
399 return 0;
400}
401
402static int daos_fio_unlink(struct thread_data *td, struct fio_file *f)
403{
404 int rc;
405
406 dprint(FD_FILE, "dfs remove %s\n", f->file_name);
407
408 rc = dfs_remove(dfs, NULL, f->file_name, false, NULL);
409 if (rc) {
410 log_err("Failed to remove %s: %d\n", f->file_name, rc);
411 td_verror(td, rc, "dfs_remove");
412 return rc;
413 }
414
415 return 0;
416}
417
418static int daos_fio_invalidate(struct thread_data *td, struct fio_file *f)
419{
420 dprint(FD_FILE, "dfs invalidate %s\n", f->file_name);
421 return 0;
422}
423
424static void daos_fio_io_u_free(struct thread_data *td, struct io_u *io_u)
425{
426 struct daos_iou *io = io_u->engine_data;
427
428 if (io) {
429 io_u->engine_data = NULL;
430 free(io);
431 }
432}
433
434static int daos_fio_io_u_init(struct thread_data *td, struct io_u *io_u)
435{
436 struct daos_iou *io;
437
438 io = malloc(sizeof(struct daos_iou));
439 if (!io) {
440 td_verror(td, ENOMEM, "malloc");
441 return ENOMEM;
442 }
443 io->io_u = io_u;
444 io_u->engine_data = io;
445 return 0;
446}
447
448static struct io_u * daos_fio_event(struct thread_data *td, int event)
449{
450 struct daos_data *dd = td->io_ops_data;
451
452 return dd->io_us[event];
453}
454
455static int daos_fio_getevents(struct thread_data *td, unsigned int min,
456 unsigned int max, const struct timespec *t)
457{
458 struct daos_data *dd = td->io_ops_data;
459 daos_event_t *evp[max];
460 unsigned int events = 0;
461 int i;
462 int rc;
463
464 while (events < min) {
465 rc = daos_eq_poll(dd->eqh, 0, DAOS_EQ_NOWAIT, max, evp);
466 if (rc < 0) {
467 log_err("Event poll failed: %d\n", rc);
468 td_verror(td, rc, "daos_eq_poll");
469 return events;
470 }
471
472 for (i = 0; i < rc; i++) {
473 struct daos_iou *io;
474 struct io_u *io_u;
475
476 io = container_of(evp[i], struct daos_iou, ev);
477 if (io->complete)
478 log_err("Completion on already completed I/O\n");
479
480 io_u = io->io_u;
481 if (io->ev.ev_error)
482 io_u->error = io->ev.ev_error;
483 else
484 io_u->resid = 0;
485
486 dd->io_us[events] = io_u;
487 dd->queued--;
488 daos_event_fini(&io->ev);
489 io->complete = true;
490 events++;
491 }
492 }
493
494 dprint(FD_IO, "dfs eq_pool returning %d (%u/%u)\n", events, min, max);
495
496 return events;
497}
498
499static enum fio_q_status daos_fio_queue(struct thread_data *td,
500 struct io_u *io_u)
501{
502 struct daos_data *dd = td->io_ops_data;
503 struct daos_iou *io = io_u->engine_data;
504 daos_off_t offset = io_u->offset;
505 int rc;
506
507 if (dd->queued == td->o.iodepth)
508 return FIO_Q_BUSY;
509
510 io->sgl.sg_nr = 1;
511 io->sgl.sg_nr_out = 0;
512 d_iov_set(&io->iov, io_u->xfer_buf, io_u->xfer_buflen);
513 io->sgl.sg_iovs = &io->iov;
514 io->size = io_u->xfer_buflen;
515
516 io->complete = false;
517 rc = daos_event_init(&io->ev, dd->eqh, NULL);
518 if (rc) {
519 log_err("Event init failed: %d\n", rc);
520 io_u->error = rc;
521 return FIO_Q_COMPLETED;
522 }
523
524 switch (io_u->ddir) {
525 case DDIR_WRITE:
526 rc = dfs_write(dfs, dd->obj, &io->sgl, offset, &io->ev);
527 if (rc) {
528 log_err("dfs_write failed: %d\n", rc);
529 io_u->error = rc;
530 return FIO_Q_COMPLETED;
531 }
532 break;
533 case DDIR_READ:
534 rc = dfs_read(dfs, dd->obj, &io->sgl, offset, &io->size,
535 &io->ev);
536 if (rc) {
537 log_err("dfs_read failed: %d\n", rc);
538 io_u->error = rc;
539 return FIO_Q_COMPLETED;
540 }
541 break;
542 case DDIR_SYNC:
543 io_u->error = 0;
544 return FIO_Q_COMPLETED;
545 default:
546 dprint(FD_IO, "Invalid IO type: %d\n", io_u->ddir);
547 io_u->error = -DER_INVAL;
548 return FIO_Q_COMPLETED;
549 }
550
551 dd->queued++;
552 return FIO_Q_QUEUED;
553}
554
555static int daos_fio_prep(struct thread_data fio_unused *td, struct io_u *io_u)
556{
557 return 0;
558}
559
560/* ioengine_ops for get_ioengine() */
561FIO_STATIC struct ioengine_ops ioengine = {
562 .name = "dfs",
563 .version = FIO_IOOPS_VERSION,
564 .flags = FIO_DISKLESSIO | FIO_NODISKUTIL,
565
566 .setup = daos_fio_setup,
567 .init = daos_fio_init,
568 .prep = daos_fio_prep,
569 .cleanup = daos_fio_cleanup,
570
571 .open_file = daos_fio_open,
572 .invalidate = daos_fio_invalidate,
573 .get_file_size = daos_fio_get_file_size,
574 .close_file = daos_fio_close,
575 .unlink_file = daos_fio_unlink,
576
577 .queue = daos_fio_queue,
578 .getevents = daos_fio_getevents,
579 .event = daos_fio_event,
580 .io_u_init = daos_fio_io_u_init,
581 .io_u_free = daos_fio_io_u_free,
582
583 .option_struct_size = sizeof(struct daos_fio_options),
584 .options = options,
585};
586
587static void fio_init fio_dfs_register(void)
588{
589 register_ioengine(&ioengine);
590}
591
592static void fio_exit fio_dfs_unregister(void)
593{
594 unregister_ioengine(&ioengine);
595}