docs: update for new data placement options
[fio.git] / engines / ime.c
1 /*
2  * FIO engines for DDN's Infinite Memory Engine.
3  * This file defines 3 engines: ime_psync, ime_psyncv, and ime_aio
4  *
5  * Copyright (C) 2018      DataDirect Networks. All rights reserved.
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License,
9  * version 2 as published by the Free Software Foundation..
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  */
16
17 /*
18  * Some details about the new engines are given below:
19  *
20  *
21  * ime_psync:
22  * Most basic engine that issues calls to ime_native whenever an IO is queued.
23  *
24  * ime_psyncv:
25  * This engine tries to queue the IOs (by creating iovecs) if asked by FIO (via
26  * iodepth_batch). It refuses to queue when the iovecs can't be appended, and
27  * waits for FIO to issue a commit. After a call to commit and get_events, new
28  * IOs can be queued.
29  *
30  * ime_aio:
31  * This engine tries to queue the IOs (by creating iovecs) if asked by FIO (via
32  * iodepth_batch). When the iovecs can't be appended to the current request, a
33  * new request for IME is created. These requests will be issued to IME when
34  * commit is called. Contrary to ime_psyncv, there can be several requests at
35  * once. We don't need to wait for a request to terminate before creating a new
36  * one.
37  */
38
39 #include <stdio.h>
40 #include <stdlib.h>
41 #include <errno.h>
42 #include <linux/limits.h>
43 #include <ime_native.h>
44
45 #include "../fio.h"
46
47
48 /**************************************************************
49  *              Types and constants definitions
50  *
51  **************************************************************/
52
53 /* define constants for async IOs */
54 #define FIO_IME_IN_PROGRESS -1
55 #define FIO_IME_REQ_ERROR   -2
56
57 /* This flag is used when some jobs were created using threads. In that
58    case, IME can't be finalized in the engine-specific cleanup function,
59    because other threads might still use IME. Instead, IME is finalized
60    in the destructor (see fio_ime_unregister), only when the flag
61    fio_ime_is_initialized is true (which means at least one thread has
62    initialized IME). */
63 static bool fio_ime_is_initialized = false;
64
65 struct imesio_req {
66         int                     fd;             /* File descriptor */
67         enum fio_ddir   ddir;   /* Type of IO (read or write) */
68         off_t                   offset; /* File offset */
69 };
70 struct imeaio_req {
71         struct ime_aiocb        iocb;                   /* IME aio request */
72         ssize_t                 status;                 /* Status of the IME request */
73         enum fio_ddir           ddir;                   /* Type of IO (read or write) */
74         pthread_cond_t          cond_endio;             /* Condition var to notify FIO */
75         pthread_mutex_t         status_mutex;   /* Mutex for cond_endio */
76 };
77
78 /* This structure will be used for 2 engines: ime_psyncv and ime_aio */
79 struct ime_data {
80         union {
81                 struct imeaio_req       *aioreqs;       /* array of aio requests */
82                 struct imesio_req       *sioreq;        /* pointer to the only syncio request */
83         };
84         struct iovec    *iovecs;                /* array of queued iovecs */
85         struct io_u     **io_us;                /* array of queued io_u pointers */
86         struct io_u     **event_io_us;  /* array of the events retieved afer get_events*/
87         unsigned int    queued;                 /* iovecs/io_us in the queue */
88         unsigned int    events;                 /* number of committed iovecs/io_us */
89
90         /* variables used to implement a "ring" queue */
91         unsigned int depth;                     /* max entries in the queue */
92         unsigned int head;                      /* index used to append */
93         unsigned int tail;                      /* index used to pop */
94         unsigned int cur_commit;        /* index of the first uncommitted req */
95
96         /* offset used by the last iovec (used to check if the iovecs can be appended)*/
97         unsigned long long      last_offset;
98
99         /* The variables below are used for aio only */
100         struct imeaio_req       *last_req; /* last request awaiting committing */
101 };
102
103
104 /**************************************************************
105  *         Private functions for queueing/unqueueing
106  *
107  **************************************************************/
108
109 static void fio_ime_queue_incr (struct ime_data *ime_d)
110 {
111         ime_d->head = (ime_d->head + 1) % ime_d->depth;
112         ime_d->queued++;
113 }
114
115 static void fio_ime_queue_red (struct ime_data *ime_d)
116 {
117         ime_d->tail = (ime_d->tail + 1) % ime_d->depth;
118         ime_d->queued--;
119         ime_d->events--;
120 }
121
122 static void fio_ime_queue_commit (struct ime_data *ime_d, int iovcnt)
123 {
124         ime_d->cur_commit = (ime_d->cur_commit + iovcnt) % ime_d->depth;
125         ime_d->events += iovcnt;
126 }
127
128 static void fio_ime_queue_reset (struct ime_data *ime_d)
129 {
130         ime_d->head = 0;
131         ime_d->tail = 0;
132         ime_d->cur_commit = 0;
133         ime_d->queued = 0;
134         ime_d->events = 0;
135 }
136
137 /**************************************************************
138  *                   General IME functions
139  *             (needed for both sync and async IOs)
140  **************************************************************/
141
142 static char *fio_set_ime_filename(char* filename)
143 {
144         static __thread char ime_filename[PATH_MAX];
145         int ret;
146
147         ret = snprintf(ime_filename, PATH_MAX, "%s%s", DEFAULT_IME_FILE_PREFIX, filename);
148         if (ret < PATH_MAX)
149                 return ime_filename;
150
151         return NULL;
152 }
153
154 static int fio_ime_get_file_size(struct thread_data *td, struct fio_file *f)
155 {
156         struct stat buf;
157         int ret;
158         char *ime_filename;
159
160         dprint(FD_FILE, "get file size %s\n", f->file_name);
161
162         ime_filename = fio_set_ime_filename(f->file_name);
163         if (ime_filename == NULL)
164                 return 1;
165         ret = ime_native_stat(ime_filename, &buf);
166         if (ret == -1) {
167                 td_verror(td, errno, "fstat");
168                 return 1;
169         }
170
171         f->real_file_size = buf.st_size;
172         return 0;
173 }
174
175 /* This functions mimics the generic_file_open function, but issues
176    IME native calls instead of POSIX calls. */
177 static int fio_ime_open_file(struct thread_data *td, struct fio_file *f)
178 {
179         int flags = 0;
180         int ret;
181         uint64_t desired_fs;
182         char *ime_filename;
183
184         dprint(FD_FILE, "fd open %s\n", f->file_name);
185
186         if (td_trim(td)) {
187                 td_verror(td, EINVAL, "IME does not support TRIM operation");
188                 return 1;
189         }
190
191         if (td->o.oatomic) {
192                 td_verror(td, EINVAL, "IME does not support atomic IO");
193                 return 1;
194         }
195         if (td->o.odirect)
196                 flags |= O_DIRECT;
197         flags |= td->o.sync_io;
198         if (td->o.create_on_open && td->o.allow_create)
199                 flags |= O_CREAT;
200
201         if (td_write(td)) {
202                 if (!read_only)
203                         flags |= O_RDWR;
204
205                 if (td->o.allow_create)
206                         flags |= O_CREAT;
207         } else if (td_read(td)) {
208                 flags |= O_RDONLY;
209         } else {
210                 /* We should never go here. */
211                 td_verror(td, EINVAL, "Unsopported open mode");
212                 return 1;
213         }
214
215         ime_filename = fio_set_ime_filename(f->file_name);
216         if (ime_filename == NULL)
217                 return 1;
218         f->fd = ime_native_open(ime_filename, flags, 0600);
219         if (f->fd == -1) {
220                 char buf[FIO_VERROR_SIZE];
221                 int __e = errno;
222
223                 snprintf(buf, sizeof(buf), "open(%s)", f->file_name);
224                 td_verror(td, __e, buf);
225                 return 1;
226         }
227
228         /* Now we need to make sure the real file size is sufficient for FIO
229            to do its things. This is normally done before the file open function
230            is called, but because FIO would use POSIX calls, we need to do it
231            ourselves */
232         ret = fio_ime_get_file_size(td, f);
233         if (ret < 0) {
234                 ime_native_close(f->fd);
235                 td_verror(td, errno, "ime_get_file_size");
236                 return 1;
237         }
238
239         desired_fs = f->io_size + f->file_offset;
240         if (td_write(td)) {
241                 dprint(FD_FILE, "Laying out file %s%s\n",
242                         DEFAULT_IME_FILE_PREFIX, f->file_name);
243                 if (!td->o.create_on_open &&
244                                 f->real_file_size < desired_fs &&
245                                 ime_native_ftruncate(f->fd, desired_fs) < 0) {
246                         ime_native_close(f->fd);
247                         td_verror(td, errno, "ime_native_ftruncate");
248                         return 1;
249                 }
250                 if (f->real_file_size < desired_fs)
251                         f->real_file_size = desired_fs;
252         } else if (td_read(td) && f->real_file_size < desired_fs) {
253                 ime_native_close(f->fd);
254                 log_err("error: can't read %lu bytes from file with "
255                                                 "%lu bytes\n", desired_fs, f->real_file_size);
256                 return 1;
257         }
258
259         return 0;
260 }
261
262 static int fio_ime_close_file(struct thread_data fio_unused *td, struct fio_file *f)
263 {
264         int ret = 0;
265
266         dprint(FD_FILE, "fd close %s\n", f->file_name);
267
268         if (ime_native_close(f->fd) < 0)
269                 ret = errno;
270
271         f->fd = -1;
272         return ret;
273 }
274
275 static int fio_ime_unlink_file(struct thread_data *td, struct fio_file *f)
276 {
277         char *ime_filename = fio_set_ime_filename(f->file_name);
278         int ret;
279
280         if (ime_filename == NULL)
281                 return 1;
282
283         ret = unlink(ime_filename);
284         return ret < 0 ? errno : 0;
285 }
286
287 static struct io_u *fio_ime_event(struct thread_data *td, int event)
288 {
289         struct ime_data *ime_d = td->io_ops_data;
290
291         return ime_d->event_io_us[event];
292 }
293
294 /* Setup file used to replace get_file_sizes when settin up the file.
295    Instead we will set real_file_sie to 0 for each file. This way we
296    can avoid calling ime_native_init before the forks are created. */
297 static int fio_ime_setup(struct thread_data *td)
298 {
299         struct fio_file *f;
300         unsigned int i;
301
302         for_each_file(td, f, i) {
303                 dprint(FD_FILE, "setup: set file size to 0 for %p/%d/%s\n",
304                         f, i, f->file_name);
305                 f->real_file_size = 0;
306         }
307
308         return 0;
309 }
310
311 static int fio_ime_engine_init(struct thread_data *td)
312 {
313         struct fio_file *f;
314         unsigned int i;
315
316         dprint(FD_IO, "ime engine init\n");
317         if (fio_ime_is_initialized && !td->o.use_thread) {
318                 log_err("Warning: something might go wrong. Not all threads/forks were"
319                                 " created before the FIO jobs were initialized.\n");
320         }
321
322         ime_native_init();
323         fio_ime_is_initialized = true;
324
325         /* We have to temporarily set real_file_size so that
326            FIO can initialize properly. It will be corrected
327            on file open. */
328         for_each_file(td, f, i)
329                 f->real_file_size = f->io_size + f->file_offset;
330
331         return 0;
332 }
333
334 static void fio_ime_engine_finalize(struct thread_data *td)
335 {
336         /* Only finalize IME when using forks */
337         if (!td->o.use_thread) {
338                 if (ime_native_finalize() < 0)
339                         log_err("error in ime_native_finalize\n");
340                 fio_ime_is_initialized = false;
341         }
342 }
343
344
345 /**************************************************************
346  *             Private functions for blocking IOs
347  *                     (without iovecs)
348  **************************************************************/
349
350 /* Notice: this function comes from the sync engine */
351 /* It is used by the commit function to return a proper code and fill
352    some attributes in the io_u used for the IO. */
353 static int fio_ime_psync_end(struct thread_data *td, struct io_u *io_u, ssize_t ret)
354 {
355         if (ret != (ssize_t) io_u->xfer_buflen) {
356                 if (ret >= 0) {
357                         io_u->resid = io_u->xfer_buflen - ret;
358                         io_u->error = 0;
359                         return FIO_Q_COMPLETED;
360                 } else
361                         io_u->error = errno;
362         }
363
364         if (io_u->error) {
365                 io_u_log_error(td, io_u);
366                 td_verror(td, io_u->error, "xfer");
367         }
368
369         return FIO_Q_COMPLETED;
370 }
371
372 static enum fio_q_status fio_ime_psync_queue(struct thread_data *td,
373                                            struct io_u *io_u)
374 {
375         struct fio_file *f = io_u->file;
376         ssize_t ret;
377
378         fio_ro_check(td, io_u);
379
380         if (io_u->ddir == DDIR_READ)
381                 ret = ime_native_pread(f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
382         else if (io_u->ddir == DDIR_WRITE)
383                 ret = ime_native_pwrite(f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
384         else if (io_u->ddir == DDIR_SYNC)
385                 ret = ime_native_fsync(f->fd);
386         else {
387                 ret = io_u->xfer_buflen;
388                 io_u->error = EINVAL;
389         }
390
391         return fio_ime_psync_end(td, io_u, ret);
392 }
393
394
395 /**************************************************************
396  *             Private functions for blocking IOs
397  *                       (with iovecs)
398  **************************************************************/
399
400 static bool fio_ime_psyncv_can_queue(struct ime_data *ime_d, struct io_u *io_u)
401 {
402         /* We can only queue if:
403           - There are no queued iovecs
404           - Or if there is at least one:
405                  - There must be no event waiting for retrieval
406                  - The offsets must be contiguous
407                  - The ddir and fd must be the same */
408         return (ime_d->queued == 0 || (
409                         ime_d->events == 0 &&
410                         ime_d->last_offset == io_u->offset &&
411                         ime_d->sioreq->ddir == io_u->ddir &&
412                         ime_d->sioreq->fd == io_u->file->fd));
413 }
414
415 /* Before using this function, we should have already
416    ensured that the queue is not full */
417 static void fio_ime_psyncv_enqueue(struct ime_data *ime_d, struct io_u *io_u)
418 {
419         struct imesio_req *ioreq = ime_d->sioreq;
420         struct iovec *iov = &ime_d->iovecs[ime_d->head];
421
422         iov->iov_base = io_u->xfer_buf;
423         iov->iov_len = io_u->xfer_buflen;
424
425         if (ime_d->queued == 0) {
426                 ioreq->offset = io_u->offset;
427                 ioreq->ddir = io_u->ddir;
428                 ioreq->fd = io_u->file->fd;
429         }
430
431         ime_d->io_us[ime_d->head] = io_u;
432         ime_d->last_offset = io_u->offset + io_u->xfer_buflen;
433         fio_ime_queue_incr(ime_d);
434 }
435
436 /* Tries to queue an IO. It will fail if the IO can't be appended to the
437    current request or if the current request has been committed but not
438    yet retrieved by get_events. */
439 static enum fio_q_status fio_ime_psyncv_queue(struct thread_data *td,
440         struct io_u *io_u)
441 {
442         struct ime_data *ime_d = td->io_ops_data;
443
444         fio_ro_check(td, io_u);
445
446         if (ime_d->queued == ime_d->depth)
447                 return FIO_Q_BUSY;
448
449         if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
450                 if (!fio_ime_psyncv_can_queue(ime_d, io_u))
451                         return FIO_Q_BUSY;
452
453                 dprint(FD_IO, "queue: ddir=%d at %u commit=%u queued=%u events=%u\n",
454                         io_u->ddir, ime_d->head, ime_d->cur_commit,
455                         ime_d->queued, ime_d->events);
456                 fio_ime_psyncv_enqueue(ime_d, io_u);
457                 return FIO_Q_QUEUED;
458         } else if (io_u->ddir == DDIR_SYNC) {
459                 if (ime_native_fsync(io_u->file->fd) < 0) {
460                         io_u->error = errno;
461                         td_verror(td, io_u->error, "fsync");
462                 }
463                 return FIO_Q_COMPLETED;
464         } else {
465                 io_u->error = EINVAL;
466                 td_verror(td, io_u->error, "wrong ddir");
467                 return FIO_Q_COMPLETED;
468         }
469 }
470
471 /* Notice: this function comes from the sync engine */
472 /* It is used by the commit function to return a proper code and fill
473    some attributes in the io_us appended to the current request. */
474 static int fio_ime_psyncv_end(struct thread_data *td, ssize_t bytes)
475 {
476         struct ime_data *ime_d = td->io_ops_data;
477         struct io_u *io_u;
478         unsigned int i;
479         int err = errno;
480
481         for (i = 0; i < ime_d->queued; i++) {
482                 io_u = ime_d->io_us[i];
483
484                 if (bytes == -1)
485                         io_u->error = err;
486                 else {
487                         unsigned int this_io;
488
489                         this_io = bytes;
490                         if (this_io > io_u->xfer_buflen)
491                                 this_io = io_u->xfer_buflen;
492
493                         io_u->resid = io_u->xfer_buflen - this_io;
494                         io_u->error = 0;
495                         bytes -= this_io;
496                 }
497         }
498
499         if (bytes == -1) {
500                 td_verror(td, err, "xfer psyncv");
501                 return -err;
502         }
503
504         return 0;
505 }
506
507 /* Commits the current request by calling ime_native (with one or several
508    iovecs). After this commit, the corresponding events (one per iovec)
509    can be retrieved by get_events. */
510 static int fio_ime_psyncv_commit(struct thread_data *td)
511 {
512         struct ime_data *ime_d = td->io_ops_data;
513         struct imesio_req *ioreq;
514         int ret = 0;
515
516         /* Exit if there are no (new) events to commit
517            or if the previous committed event haven't been retrieved */
518         if (!ime_d->queued || ime_d->events)
519                 return 0;
520
521         ioreq = ime_d->sioreq;
522         ime_d->events = ime_d->queued;
523         if (ioreq->ddir == DDIR_READ)
524                 ret = ime_native_preadv(ioreq->fd, ime_d->iovecs, ime_d->queued, ioreq->offset);
525         else
526                 ret = ime_native_pwritev(ioreq->fd, ime_d->iovecs, ime_d->queued, ioreq->offset);
527
528         dprint(FD_IO, "committed %d iovecs\n", ime_d->queued);
529
530         return fio_ime_psyncv_end(td, ret);
531 }
532
533 static int fio_ime_psyncv_getevents(struct thread_data *td, unsigned int min,
534                                 unsigned int max, const struct timespec *t)
535 {
536         struct ime_data *ime_d = td->io_ops_data;
537         struct io_u *io_u;
538         int events = 0;
539         unsigned int count;
540
541         if (ime_d->events) {
542                 for (count = 0; count < ime_d->events; count++) {
543                         io_u = ime_d->io_us[count];
544                         ime_d->event_io_us[events] = io_u;
545                         events++;
546                 }
547                 fio_ime_queue_reset(ime_d);
548         }
549
550         dprint(FD_IO, "getevents(%u,%u) ret=%d queued=%u events=%u\n",
551                 min, max, events, ime_d->queued, ime_d->events);
552         return events;
553 }
554
555 static int fio_ime_psyncv_init(struct thread_data *td)
556 {
557         struct ime_data *ime_d;
558
559         if (fio_ime_engine_init(td) < 0)
560                 return 1;
561
562         ime_d = calloc(1, sizeof(*ime_d));
563
564         ime_d->sioreq = malloc(sizeof(struct imesio_req));
565         ime_d->iovecs = malloc(td->o.iodepth * sizeof(struct iovec));
566         ime_d->io_us = malloc(2 * td->o.iodepth * sizeof(struct io_u *));
567         ime_d->event_io_us = ime_d->io_us + td->o.iodepth;
568
569         ime_d->depth = td->o.iodepth;
570
571         td->io_ops_data = ime_d;
572         return 0;
573 }
574
575 static void fio_ime_psyncv_clean(struct thread_data *td)
576 {
577         struct ime_data *ime_d = td->io_ops_data;
578
579         if (ime_d) {
580                 free(ime_d->sioreq);
581                 free(ime_d->iovecs);
582                 free(ime_d->io_us);
583                 free(ime_d);
584                 td->io_ops_data = NULL;
585         }
586
587         fio_ime_engine_finalize(td);
588 }
589
590
591 /**************************************************************
592  *           Private functions for non-blocking IOs
593  *
594  **************************************************************/
595
596 void fio_ime_aio_complete_cb  (struct ime_aiocb *aiocb, int err,
597                                                            ssize_t bytes)
598 {
599         struct imeaio_req *ioreq = (struct imeaio_req *) aiocb->user_context;
600
601         pthread_mutex_lock(&ioreq->status_mutex);
602         ioreq->status = err == 0 ? bytes : FIO_IME_REQ_ERROR;
603         pthread_mutex_unlock(&ioreq->status_mutex);
604
605         pthread_cond_signal(&ioreq->cond_endio);
606 }
607
608 static bool fio_ime_aio_can_queue (struct ime_data *ime_d, struct io_u *io_u)
609 {
610         /* So far we can queue in any case. */
611         return true;
612 }
613 static bool fio_ime_aio_can_append (struct ime_data *ime_d, struct io_u *io_u)
614 {
615         /* We can only append if:
616                 - The iovecs will be contiguous in the array
617                 - There is already a queued iovec
618                 - The offsets are contiguous
619                 - The ddir and fs are the same */
620         return (ime_d->head != 0 &&
621                         ime_d->queued - ime_d->events > 0 &&
622                         ime_d->last_offset == io_u->offset &&
623                         ime_d->last_req->ddir == io_u->ddir &&
624                         ime_d->last_req->iocb.fd == io_u->file->fd);
625 }
626
627 /* Before using this function, we should have already
628    ensured that the queue is not full */
629 static void fio_ime_aio_enqueue(struct ime_data *ime_d, struct io_u *io_u)
630 {
631         struct imeaio_req *ioreq = &ime_d->aioreqs[ime_d->head];
632         struct ime_aiocb *iocb = &ioreq->iocb;
633         struct iovec *iov = &ime_d->iovecs[ime_d->head];
634
635         iov->iov_base = io_u->xfer_buf;
636         iov->iov_len = io_u->xfer_buflen;
637
638         if (fio_ime_aio_can_append(ime_d, io_u))
639                 ime_d->last_req->iocb.iovcnt++;
640         else {
641                 ioreq->status = FIO_IME_IN_PROGRESS;
642                 ioreq->ddir = io_u->ddir;
643                 ime_d->last_req = ioreq;
644
645                 iocb->complete_cb = &fio_ime_aio_complete_cb;
646                 iocb->fd = io_u->file->fd;
647                 iocb->file_offset = io_u->offset;
648                 iocb->iov = iov;
649                 iocb->iovcnt = 1;
650                 iocb->flags = 0;
651                 iocb->user_context = (intptr_t) ioreq;
652         }
653
654         ime_d->io_us[ime_d->head] = io_u;
655         ime_d->last_offset = io_u->offset + io_u->xfer_buflen;
656         fio_ime_queue_incr(ime_d);
657 }
658
659 /* Tries to queue an IO. It will create a new request if the IO can't be
660    appended to the current request. It will fail if the queue can't contain
661    any more io_u/iovec. In this case, commit and then get_events need to be
662    called. */
663 static enum fio_q_status fio_ime_aio_queue(struct thread_data *td,
664                 struct io_u *io_u)
665 {
666         struct ime_data *ime_d = td->io_ops_data;
667
668         fio_ro_check(td, io_u);
669
670         dprint(FD_IO, "queue: ddir=%d at %u commit=%u queued=%u events=%u\n",
671                 io_u->ddir, ime_d->head, ime_d->cur_commit,
672                 ime_d->queued, ime_d->events);
673
674         if (ime_d->queued == ime_d->depth)
675                 return FIO_Q_BUSY;
676
677         if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
678                 if (!fio_ime_aio_can_queue(ime_d, io_u))
679                         return FIO_Q_BUSY;
680
681                 fio_ime_aio_enqueue(ime_d, io_u);
682                 return FIO_Q_QUEUED;
683         } else if (io_u->ddir == DDIR_SYNC) {
684                 if (ime_native_fsync(io_u->file->fd) < 0) {
685                         io_u->error = errno;
686                         td_verror(td, io_u->error, "fsync");
687                 }
688                 return FIO_Q_COMPLETED;
689         } else {
690                 io_u->error = EINVAL;
691                 td_verror(td, io_u->error, "wrong ddir");
692                 return FIO_Q_COMPLETED;
693         }
694 }
695
696 static int fio_ime_aio_commit(struct thread_data *td)
697 {
698         struct ime_data *ime_d = td->io_ops_data;
699         struct imeaio_req *ioreq;
700         int ret = 0;
701
702         /* Loop while there are events to commit */
703         while (ime_d->queued - ime_d->events) {
704                 ioreq = &ime_d->aioreqs[ime_d->cur_commit];
705                 if (ioreq->ddir == DDIR_READ)
706                         ret = ime_native_aio_read(&ioreq->iocb);
707                 else
708                         ret = ime_native_aio_write(&ioreq->iocb);
709
710                 fio_ime_queue_commit(ime_d, ioreq->iocb.iovcnt);
711
712                 /* fio needs a negative error code */
713                 if (ret < 0) {
714                         ioreq->status = FIO_IME_REQ_ERROR;
715                         return -errno;
716                 }
717
718                 io_u_mark_submit(td, ioreq->iocb.iovcnt);
719                 dprint(FD_IO, "committed %d iovecs commit=%u queued=%u events=%u\n",
720                         ioreq->iocb.iovcnt, ime_d->cur_commit,
721                         ime_d->queued, ime_d->events);
722         }
723
724         return 0;
725 }
726
727 static int fio_ime_aio_getevents(struct thread_data *td, unsigned int min,
728                                 unsigned int max, const struct timespec *t)
729 {
730         struct ime_data *ime_d = td->io_ops_data;
731         struct imeaio_req *ioreq;
732         struct io_u *io_u;
733         int events = 0;
734         unsigned int count;
735         ssize_t bytes;
736
737         while (ime_d->events) {
738                 ioreq = &ime_d->aioreqs[ime_d->tail];
739
740                 /* Break if we already got events, and if we will
741                    exceed max if we append the next events */
742                 if (events && events + ioreq->iocb.iovcnt > max)
743                         break;
744
745                 if (ioreq->status != FIO_IME_IN_PROGRESS) {
746
747                         bytes = ioreq->status;
748                         for (count = 0; count < ioreq->iocb.iovcnt; count++) {
749                                 io_u = ime_d->io_us[ime_d->tail];
750                                 ime_d->event_io_us[events] = io_u;
751                                 events++;
752                                 fio_ime_queue_red(ime_d);
753
754                                 if (ioreq->status == FIO_IME_REQ_ERROR)
755                                         io_u->error = EIO;
756                                 else {
757                                         io_u->resid = bytes > io_u->xfer_buflen ?
758                                                                         0 : io_u->xfer_buflen - bytes;
759                                         io_u->error = 0;
760                                         bytes -= io_u->xfer_buflen - io_u->resid;
761                                 }
762                         }
763                 } else {
764                         pthread_mutex_lock(&ioreq->status_mutex);
765                         while (ioreq->status == FIO_IME_IN_PROGRESS)
766                                 pthread_cond_wait(&ioreq->cond_endio, &ioreq->status_mutex);
767                         pthread_mutex_unlock(&ioreq->status_mutex);
768                 }
769
770         }
771
772         dprint(FD_IO, "getevents(%u,%u) ret=%d queued=%u events=%u\n", min, max,
773                 events, ime_d->queued, ime_d->events);
774         return events;
775 }
776
777 static int fio_ime_aio_init(struct thread_data *td)
778 {
779         struct ime_data *ime_d;
780         struct imeaio_req *ioreq;
781         unsigned int i;
782
783         if (fio_ime_engine_init(td) < 0)
784                 return 1;
785
786         ime_d = calloc(1, sizeof(*ime_d));
787
788         ime_d->aioreqs = malloc(td->o.iodepth * sizeof(struct imeaio_req));
789         ime_d->iovecs = malloc(td->o.iodepth * sizeof(struct iovec));
790         ime_d->io_us = malloc(2 * td->o.iodepth * sizeof(struct io_u *));
791         ime_d->event_io_us = ime_d->io_us + td->o.iodepth;
792
793         ime_d->depth = td->o.iodepth;
794         for (i = 0; i < ime_d->depth; i++) {
795                 ioreq = &ime_d->aioreqs[i];
796                 pthread_cond_init(&ioreq->cond_endio, NULL);
797                 pthread_mutex_init(&ioreq->status_mutex, NULL);
798         }
799
800         td->io_ops_data = ime_d;
801         return 0;
802 }
803
804 static void fio_ime_aio_clean(struct thread_data *td)
805 {
806         struct ime_data *ime_d = td->io_ops_data;
807         struct imeaio_req *ioreq;
808         unsigned int i;
809
810         if (ime_d) {
811                 for (i = 0; i < ime_d->depth; i++) {
812                         ioreq = &ime_d->aioreqs[i];
813                         pthread_cond_destroy(&ioreq->cond_endio);
814                         pthread_mutex_destroy(&ioreq->status_mutex);
815                 }
816                 free(ime_d->aioreqs);
817                 free(ime_d->iovecs);
818                 free(ime_d->io_us);
819                 free(ime_d);
820                 td->io_ops_data = NULL;
821         }
822
823         fio_ime_engine_finalize(td);
824 }
825
826
827 /**************************************************************
828  *                   IO engines definitions
829  *
830  **************************************************************/
831
832 /* The FIO_DISKLESSIO flag used for these engines is necessary to prevent
833    FIO from using POSIX calls. See fio_ime_open_file for more details. */
834
835 static struct ioengine_ops ioengine_prw = {
836         .name           = "ime_psync",
837         .version        = FIO_IOOPS_VERSION,
838         .setup          = fio_ime_setup,
839         .init           = fio_ime_engine_init,
840         .cleanup        = fio_ime_engine_finalize,
841         .queue          = fio_ime_psync_queue,
842         .open_file      = fio_ime_open_file,
843         .close_file     = fio_ime_close_file,
844         .get_file_size  = fio_ime_get_file_size,
845         .unlink_file    = fio_ime_unlink_file,
846         .flags          = FIO_SYNCIO | FIO_DISKLESSIO,
847 };
848
849 static struct ioengine_ops ioengine_pvrw = {
850         .name           = "ime_psyncv",
851         .version        = FIO_IOOPS_VERSION,
852         .setup          = fio_ime_setup,
853         .init           = fio_ime_psyncv_init,
854         .cleanup        = fio_ime_psyncv_clean,
855         .queue          = fio_ime_psyncv_queue,
856         .commit         = fio_ime_psyncv_commit,
857         .getevents      = fio_ime_psyncv_getevents,
858         .event          = fio_ime_event,
859         .open_file      = fio_ime_open_file,
860         .close_file     = fio_ime_close_file,
861         .get_file_size  = fio_ime_get_file_size,
862         .unlink_file    = fio_ime_unlink_file,
863         .flags          = FIO_SYNCIO | FIO_DISKLESSIO,
864 };
865
866 static struct ioengine_ops ioengine_aio = {
867         .name           = "ime_aio",
868         .version        = FIO_IOOPS_VERSION,
869         .setup          = fio_ime_setup,
870         .init           = fio_ime_aio_init,
871         .cleanup        = fio_ime_aio_clean,
872         .queue          = fio_ime_aio_queue,
873         .commit         = fio_ime_aio_commit,
874         .getevents      = fio_ime_aio_getevents,
875         .event          = fio_ime_event,
876         .open_file      = fio_ime_open_file,
877         .close_file     = fio_ime_close_file,
878         .get_file_size  = fio_ime_get_file_size,
879         .unlink_file    = fio_ime_unlink_file,
880         .flags          = FIO_DISKLESSIO,
881 };
882
883 static void fio_init fio_ime_register(void)
884 {
885         register_ioengine(&ioengine_prw);
886         register_ioengine(&ioengine_pvrw);
887         register_ioengine(&ioengine_aio);
888 }
889
890 static void fio_exit fio_ime_unregister(void)
891 {
892         unregister_ioengine(&ioengine_prw);
893         unregister_ioengine(&ioengine_pvrw);
894         unregister_ioengine(&ioengine_aio);
895
896         if (fio_ime_is_initialized && ime_native_finalize() < 0)
897                 log_err("Warning: IME did not finalize properly\n");
898 }