Merge branch 'evelu-exec' of https://github.com/ErwanAliasr1/fio
[fio.git] / engines / dfs.c
1 /**
2  * FIO engine for DAOS File System (dfs).
3  *
4  * (C) Copyright 2020-2021 Intel Corporation.
5  */
6
7 #include <fio.h>
8 #include <optgroup.h>
9
10 #include <daos.h>
11 #include <daos_fs.h>
12
13 static bool             daos_initialized;
14 static int              num_threads;
15 static pthread_mutex_t  daos_mutex = PTHREAD_MUTEX_INITIALIZER;
16 daos_handle_t           poh;  /* pool handle */
17 daos_handle_t           coh;  /* container handle */
18 daos_oclass_id_t        cid = OC_UNKNOWN;  /* object class */
19 dfs_t                   *dfs; /* dfs mount reference */
20
21 struct daos_iou {
22         struct io_u     *io_u;
23         daos_event_t    ev;
24         d_sg_list_t     sgl;
25         d_iov_t         iov;
26         daos_size_t     size;
27         bool            complete;
28 };
29
30 struct daos_data {
31         daos_handle_t   eqh;
32         dfs_obj_t       *obj;
33         struct io_u     **io_us;
34         int             queued;
35         int             num_ios;
36 };
37
38 struct daos_fio_options {
39         void            *pad;
40         char            *pool;   /* Pool UUID */
41         char            *cont;   /* Container UUID */
42         daos_size_t     chsz;    /* Chunk size */
43         char            *oclass; /* object class */
44 #if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
45         char            *svcl;   /* service replica list, deprecated */
46 #endif
47 };
48
49 static struct fio_option options[] = {
50         {
51                 .name           = "pool",
52                 .lname          = "pool uuid",
53                 .type           = FIO_OPT_STR_STORE,
54                 .off1           = offsetof(struct daos_fio_options, pool),
55                 .help           = "DAOS pool uuid",
56                 .category       = FIO_OPT_C_ENGINE,
57                 .group          = FIO_OPT_G_DFS,
58         },
59         {
60                 .name           = "cont",
61                 .lname          = "container uuid",
62                 .type           = FIO_OPT_STR_STORE,
63                 .off1           = offsetof(struct daos_fio_options, cont),
64                 .help           = "DAOS container uuid",
65                 .category       = FIO_OPT_C_ENGINE,
66                 .group          = FIO_OPT_G_DFS,
67         },
68         {
69                 .name           = "chunk_size",
70                 .lname          = "DFS chunk size",
71                 .type           = FIO_OPT_ULL,
72                 .off1           = offsetof(struct daos_fio_options, chsz),
73                 .help           = "DFS chunk size in bytes",
74                 .def            = "0", /* use container default */
75                 .category       = FIO_OPT_C_ENGINE,
76                 .group          = FIO_OPT_G_DFS,
77         },
78         {
79                 .name           = "object_class",
80                 .lname          = "object class",
81                 .type           = FIO_OPT_STR_STORE,
82                 .off1           = offsetof(struct daos_fio_options, oclass),
83                 .help           = "DAOS object class",
84                 .category       = FIO_OPT_C_ENGINE,
85                 .group          = FIO_OPT_G_DFS,
86         },
87 #if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
88         {
89                 .name           = "svcl",
90                 .lname          = "List of service ranks",
91                 .type           = FIO_OPT_STR_STORE,
92                 .off1           = offsetof(struct daos_fio_options, svcl),
93                 .help           = "List of pool replicated service ranks",
94                 .category       = FIO_OPT_C_ENGINE,
95                 .group          = FIO_OPT_G_DFS,
96         },
97 #endif
98         {
99                 .name           = NULL,
100         },
101 };
102
103 static int daos_fio_global_init(struct thread_data *td)
104 {
105         struct daos_fio_options *eo = td->eo;
106         uuid_t                  pool_uuid, co_uuid;
107         daos_pool_info_t        pool_info;
108         daos_cont_info_t        co_info;
109         int                     rc = 0;
110
111 #if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
112         if (!eo->pool || !eo->cont || !eo->svcl) {
113 #else
114         if (!eo->pool || !eo->cont) {
115 #endif
116                 log_err("Missing required DAOS options\n");
117                 return EINVAL;
118         }
119
120         rc = daos_init();
121         if (rc != -DER_ALREADY && rc) {
122                 log_err("Failed to initialize daos %d\n", rc);
123                 td_verror(td, rc, "daos_init");
124                 return rc;
125         }
126
127         rc = uuid_parse(eo->pool, pool_uuid);
128         if (rc) {
129                 log_err("Failed to parse 'Pool uuid': %s\n", eo->pool);
130                 td_verror(td, EINVAL, "uuid_parse(eo->pool)");
131                 return EINVAL;
132         }
133
134         rc = uuid_parse(eo->cont, co_uuid);
135         if (rc) {
136                 log_err("Failed to parse 'Cont uuid': %s\n", eo->cont);
137                 td_verror(td, EINVAL, "uuid_parse(eo->cont)");
138                 return EINVAL;
139         }
140
141         /* Connect to the DAOS pool */
142 #if !defined(DAOS_API_VERSION_MAJOR) || DAOS_API_VERSION_MAJOR < 1
143         d_rank_list_t *svcl = NULL;
144
145         svcl = daos_rank_list_parse(eo->svcl, ":");
146         if (svcl == NULL) {
147                 log_err("Failed to parse svcl\n");
148                 td_verror(td, EINVAL, "daos_rank_list_parse");
149                 return EINVAL;
150         }
151
152         rc = daos_pool_connect(pool_uuid, NULL, svcl, DAOS_PC_RW,
153                         &poh, &pool_info, NULL);
154         d_rank_list_free(svcl);
155 #else
156         rc = daos_pool_connect(pool_uuid, NULL, DAOS_PC_RW, &poh, &pool_info,
157                                NULL);
158 #endif
159         if (rc) {
160                 log_err("Failed to connect to pool %d\n", rc);
161                 td_verror(td, rc, "daos_pool_connect");
162                 return rc;
163         }
164
165         /* Open the DAOS container */
166         rc = daos_cont_open(poh, co_uuid, DAOS_COO_RW, &coh, &co_info, NULL);
167         if (rc) {
168                 log_err("Failed to open container: %d\n", rc);
169                 td_verror(td, rc, "daos_cont_open");
170                 (void)daos_pool_disconnect(poh, NULL);
171                 return rc;
172         }
173
174         /* Mount encapsulated filesystem */
175         rc = dfs_mount(poh, coh, O_RDWR, &dfs);
176         if (rc) {
177                 log_err("Failed to mount DFS namespace: %d\n", rc);
178                 td_verror(td, rc, "dfs_mount");
179                 (void)daos_pool_disconnect(poh, NULL);
180                 (void)daos_cont_close(coh, NULL);
181                 return rc;
182         }
183
184         /* Retrieve object class to use, if specified */
185         if (eo->oclass)
186                 cid = daos_oclass_name2id(eo->oclass);
187
188         return 0;
189 }
190
191 static int daos_fio_global_cleanup()
192 {
193         int rc;
194         int ret = 0;
195
196         rc = dfs_umount(dfs);
197         if (rc) {
198                 log_err("failed to umount dfs: %d\n", rc);
199                 ret = rc;
200         }
201         rc = daos_cont_close(coh, NULL);
202         if (rc) {
203                 log_err("failed to close container: %d\n", rc);
204                 if (ret == 0)
205                         ret = rc;
206         }
207         rc = daos_pool_disconnect(poh, NULL);
208         if (rc) {
209                 log_err("failed to disconnect pool: %d\n", rc);
210                 if (ret == 0)
211                         ret = rc;
212         }
213         rc = daos_fini();
214         if (rc) {
215                 log_err("failed to finalize daos: %d\n", rc);
216                 if (ret == 0)
217                         ret = rc;
218         }
219
220         return ret;
221 }
222
223 static int daos_fio_setup(struct thread_data *td)
224 {
225         return 0;
226 }
227
228 static int daos_fio_init(struct thread_data *td)
229 {
230         struct daos_data        *dd;
231         int                     rc = 0;
232
233         pthread_mutex_lock(&daos_mutex);
234
235         dd = malloc(sizeof(*dd));
236         if (dd == NULL) {
237                 log_err("Failed to allocate DAOS-private data\n");
238                 rc = ENOMEM;
239                 goto out;
240         }
241
242         dd->queued      = 0;
243         dd->num_ios     = td->o.iodepth;
244         dd->io_us       = calloc(dd->num_ios, sizeof(struct io_u *));
245         if (dd->io_us == NULL) {
246                 log_err("Failed to allocate IO queue\n");
247                 rc = ENOMEM;
248                 goto out;
249         }
250
251         /* initialize DAOS stack if not already up */
252         if (!daos_initialized) {
253                 rc = daos_fio_global_init(td);
254                 if (rc)
255                         goto out;
256                 daos_initialized = true;
257         }
258
259         rc = daos_eq_create(&dd->eqh);
260         if (rc) {
261                 log_err("Failed to create event queue: %d\n", rc);
262                 td_verror(td, rc, "daos_eq_create");
263                 goto out;
264         }
265
266         td->io_ops_data = dd;
267         num_threads++;
268 out:
269         if (rc) {
270                 if (dd) {
271                         free(dd->io_us);
272                         free(dd);
273                 }
274                 if (num_threads == 0 && daos_initialized) {
275                         /* don't clobber error return value */
276                         (void)daos_fio_global_cleanup();
277                         daos_initialized = false;
278                 }
279         }
280         pthread_mutex_unlock(&daos_mutex);
281         return rc;
282 }
283
284 static void daos_fio_cleanup(struct thread_data *td)
285 {
286         struct daos_data        *dd = td->io_ops_data;
287         int                     rc;
288
289         if (dd == NULL)
290                 return;
291
292         rc = daos_eq_destroy(dd->eqh, DAOS_EQ_DESTROY_FORCE);
293         if (rc < 0) {
294                 log_err("failed to destroy event queue: %d\n", rc);
295                 td_verror(td, rc, "daos_eq_destroy");
296         }
297
298         free(dd->io_us);
299         free(dd);
300
301         pthread_mutex_lock(&daos_mutex);
302         num_threads--;
303         if (daos_initialized && num_threads == 0) {
304                 int ret;
305
306                 ret = daos_fio_global_cleanup();
307                 if (ret < 0 && rc == 0) {
308                         log_err("failed to clean up: %d\n", ret);
309                         td_verror(td, ret, "daos_fio_global_cleanup");
310                 }
311                 daos_initialized = false;
312         }
313         pthread_mutex_unlock(&daos_mutex);
314 }
315
316 static int daos_fio_get_file_size(struct thread_data *td, struct fio_file *f)
317 {
318         char            *file_name = f->file_name;
319         struct stat     stbuf = {0};
320         int             rc;
321
322         dprint(FD_FILE, "dfs stat %s\n", f->file_name);
323
324         if (!daos_initialized)
325                 return 0;
326
327         rc = dfs_stat(dfs, NULL, file_name, &stbuf);
328         if (rc) {
329                 log_err("Failed to stat %s: %d\n", f->file_name, rc);
330                 td_verror(td, rc, "dfs_stat");
331                 return rc;
332         }
333
334         f->real_file_size = stbuf.st_size;
335         return 0;
336 }
337
338 static int daos_fio_close(struct thread_data *td, struct fio_file *f)
339 {
340         struct daos_data        *dd = td->io_ops_data;
341         int                     rc;
342
343         dprint(FD_FILE, "dfs release %s\n", f->file_name);
344
345         rc = dfs_release(dd->obj);
346         if (rc) {
347                 log_err("Failed to release %s: %d\n", f->file_name, rc);
348                 td_verror(td, rc, "dfs_release");
349                 return rc;
350         }
351
352         return 0;
353 }
354
355 static int daos_fio_open(struct thread_data *td, struct fio_file *f)
356 {
357         struct daos_data        *dd = td->io_ops_data;
358         struct daos_fio_options *eo = td->eo;
359         int                     flags = 0;
360         int                     rc;
361
362         dprint(FD_FILE, "dfs open %s (%s/%d/%d)\n",
363                f->file_name, td_write(td) & !read_only ? "rw" : "r",
364                td->o.create_on_open, td->o.allow_create);
365
366         if (td->o.create_on_open && td->o.allow_create)
367                 flags |= O_CREAT;
368
369         if (td_write(td)) {
370                 if (!read_only)
371                         flags |= O_RDWR;
372                 if (td->o.allow_create)
373                         flags |= O_CREAT;
374         } else if (td_read(td)) {
375                 flags |= O_RDONLY;
376         }
377
378         rc = dfs_open(dfs, NULL, f->file_name,
379                       S_IFREG | S_IRUSR | S_IWUSR,
380                       flags, cid, eo->chsz, NULL, &dd->obj);
381         if (rc) {
382                 log_err("Failed to open %s: %d\n", f->file_name, rc);
383                 td_verror(td, rc, "dfs_open");
384                 return rc;
385         }
386
387         return 0;
388 }
389
390 static int daos_fio_unlink(struct thread_data *td, struct fio_file *f)
391 {
392         int rc;
393
394         dprint(FD_FILE, "dfs remove %s\n", f->file_name);
395
396         rc = dfs_remove(dfs, NULL, f->file_name, false, NULL);
397         if (rc) {
398                 log_err("Failed to remove %s: %d\n", f->file_name, rc);
399                 td_verror(td, rc, "dfs_remove");
400                 return rc;
401         }
402
403         return 0;
404 }
405
406 static int daos_fio_invalidate(struct thread_data *td, struct fio_file *f)
407 {
408         dprint(FD_FILE, "dfs invalidate %s\n", f->file_name);
409         return 0;
410 }
411
412 static void daos_fio_io_u_free(struct thread_data *td, struct io_u *io_u)
413 {
414         struct daos_iou *io = io_u->engine_data;
415
416         if (io) {
417                 io_u->engine_data = NULL;
418                 free(io);
419         }
420 }
421
422 static int daos_fio_io_u_init(struct thread_data *td, struct io_u *io_u)
423 {
424         struct daos_iou *io;
425
426         io = malloc(sizeof(struct daos_iou));
427         if (!io) {
428                 td_verror(td, ENOMEM, "malloc");
429                 return ENOMEM;
430         }
431         io->io_u = io_u;
432         io_u->engine_data = io;
433         return 0;
434 }
435
436 static struct io_u * daos_fio_event(struct thread_data *td, int event)
437 {
438         struct daos_data *dd = td->io_ops_data;
439
440         return dd->io_us[event];
441 }
442
443 static int daos_fio_getevents(struct thread_data *td, unsigned int min,
444                               unsigned int max, const struct timespec *t)
445 {
446         struct daos_data        *dd = td->io_ops_data;
447         daos_event_t            *evp[max];
448         unsigned int            events = 0;
449         int                     i;
450         int                     rc;
451
452         while (events < min) {
453                 rc = daos_eq_poll(dd->eqh, 0, DAOS_EQ_NOWAIT, max, evp);
454                 if (rc < 0) {
455                         log_err("Event poll failed: %d\n", rc);
456                         td_verror(td, rc, "daos_eq_poll");
457                         return events;
458                 }
459
460                 for (i = 0; i < rc; i++) {
461                         struct daos_iou *io;
462                         struct io_u     *io_u;
463
464                         io = container_of(evp[i], struct daos_iou, ev);
465                         if (io->complete)
466                                 log_err("Completion on already completed I/O\n");
467
468                         io_u = io->io_u;
469                         if (io->ev.ev_error)
470                                 io_u->error = io->ev.ev_error;
471                         else
472                                 io_u->resid = 0;
473
474                         dd->io_us[events] = io_u;
475                         dd->queued--;
476                         daos_event_fini(&io->ev);
477                         io->complete = true;
478                         events++;
479                 }
480         }
481
482         dprint(FD_IO, "dfs eq_pool returning %d (%u/%u)\n", events, min, max);
483
484         return events;
485 }
486
487 static enum fio_q_status daos_fio_queue(struct thread_data *td,
488                                         struct io_u *io_u)
489 {
490         struct daos_data        *dd = td->io_ops_data;
491         struct daos_iou         *io = io_u->engine_data;
492         daos_off_t              offset = io_u->offset;
493         int                     rc;
494
495         if (dd->queued == td->o.iodepth)
496                 return FIO_Q_BUSY;
497
498         io->sgl.sg_nr = 1;
499         io->sgl.sg_nr_out = 0;
500         d_iov_set(&io->iov, io_u->xfer_buf, io_u->xfer_buflen);
501         io->sgl.sg_iovs = &io->iov;
502         io->size = io_u->xfer_buflen;
503
504         io->complete = false;
505         rc = daos_event_init(&io->ev, dd->eqh, NULL);
506         if (rc) {
507                 log_err("Event init failed: %d\n", rc);
508                 io_u->error = rc;
509                 return FIO_Q_COMPLETED;
510         }
511
512         switch (io_u->ddir) {
513         case DDIR_WRITE:
514                 rc = dfs_write(dfs, dd->obj, &io->sgl, offset, &io->ev);
515                 if (rc) {
516                         log_err("dfs_write failed: %d\n", rc);
517                         io_u->error = rc;
518                         return FIO_Q_COMPLETED;
519                 }
520                 break;
521         case DDIR_READ:
522                 rc = dfs_read(dfs, dd->obj, &io->sgl, offset, &io->size,
523                               &io->ev);
524                 if (rc) {
525                         log_err("dfs_read failed: %d\n", rc);
526                         io_u->error = rc;
527                         return FIO_Q_COMPLETED;
528                 }
529                 break;
530         case DDIR_SYNC:
531                 io_u->error = 0;
532                 return FIO_Q_COMPLETED;
533         default:
534                 dprint(FD_IO, "Invalid IO type: %d\n", io_u->ddir);
535                 io_u->error = -DER_INVAL;
536                 return FIO_Q_COMPLETED;
537         }
538
539         dd->queued++;
540         return FIO_Q_QUEUED;
541 }
542
543 static int daos_fio_prep(struct thread_data fio_unused *td, struct io_u *io_u)
544 {
545         return 0;
546 }
547
548 /* ioengine_ops for get_ioengine() */
549 FIO_STATIC struct ioengine_ops ioengine = {
550         .name                   = "dfs",
551         .version                = FIO_IOOPS_VERSION,
552         .flags                  = FIO_DISKLESSIO | FIO_NODISKUTIL,
553
554         .setup                  = daos_fio_setup,
555         .init                   = daos_fio_init,
556         .prep                   = daos_fio_prep,
557         .cleanup                = daos_fio_cleanup,
558
559         .open_file              = daos_fio_open,
560         .invalidate             = daos_fio_invalidate,
561         .get_file_size          = daos_fio_get_file_size,
562         .close_file             = daos_fio_close,
563         .unlink_file            = daos_fio_unlink,
564
565         .queue                  = daos_fio_queue,
566         .getevents              = daos_fio_getevents,
567         .event                  = daos_fio_event,
568         .io_u_init              = daos_fio_io_u_init,
569         .io_u_free              = daos_fio_io_u_free,
570
571         .option_struct_size     = sizeof(struct daos_fio_options),
572         .options                = options,
573 };
574
575 static void fio_init fio_dfs_register(void)
576 {
577         register_ioengine(&ioengine);
578 }
579
580 static void fio_exit fio_dfs_unregister(void)
581 {
582         unregister_ioengine(&ioengine);
583 }