docs: update for new data placement options
[fio.git] / engines / ime.c
1 /*
2  * FIO engines for DDN's Infinite Memory Engine.
3  * This file defines 3 engines: ime_psync, ime_psyncv, and ime_aio
4  *
5  * Copyright (C) 2018      DataDirect Networks. All rights reserved.
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License,
9  * version 2 as published by the Free Software Foundation..
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  */
16
17 /*
18  * Some details about the new engines are given below:
19  *
20  *
21  * ime_psync:
22  * Most basic engine that issues calls to ime_native whenever an IO is queued.
23  *
24  * ime_psyncv:
25  * This engine tries to queue the IOs (by creating iovecs) if asked by FIO (via
26  * iodepth_batch). It refuses to queue when the iovecs can't be appended, and
27  * waits for FIO to issue a commit. After a call to commit and get_events, new
28  * IOs can be queued.
29  *
30  * ime_aio:
31  * This engine tries to queue the IOs (by creating iovecs) if asked by FIO (via
32  * iodepth_batch). When the iovecs can't be appended to the current request, a
33  * new request for IME is created. These requests will be issued to IME when
34  * commit is called. Contrary to ime_psyncv, there can be several requests at
35  * once. We don't need to wait for a request to terminate before creating a new
36  * one.
37  */
38
39 #include <stdio.h>
40 #include <stdlib.h>
41 #include <errno.h>
42 #include <linux/limits.h>
43 #include <ime_native.h>
44
45 #include "../fio.h"
46
47
48 /**************************************************************
49  *              Types and constants definitions
50  *
51  **************************************************************/
52
53 /* define constants for async IOs */
54 #define FIO_IME_IN_PROGRESS -1
55 #define FIO_IME_REQ_ERROR   -2
56
57 /* This flag is used when some jobs were created using threads. In that
58    case, IME can't be finalized in the engine-specific cleanup function,
59    because other threads might still use IME. Instead, IME is finalized
60    in the destructor (see fio_ime_unregister), only when the flag
61    fio_ime_is_initialized is true (which means at least one thread has
62    initialized IME). */
63 static bool fio_ime_is_initialized = false;
64
65 struct imesio_req {
66         int                     fd;             /* File descriptor */
67         enum fio_ddir   ddir;   /* Type of IO (read or write) */
68         off_t                   offset; /* File offset */
69 };
70 struct imeaio_req {
71         struct ime_aiocb        iocb;                   /* IME aio request */
72         ssize_t                 status;                 /* Status of the IME request */
73         enum fio_ddir           ddir;                   /* Type of IO (read or write) */
74         pthread_cond_t          cond_endio;             /* Condition var to notify FIO */
75         pthread_mutex_t         status_mutex;   /* Mutex for cond_endio */
76 };
77
78 /* This structure will be used for 2 engines: ime_psyncv and ime_aio */
79 struct ime_data {
80         union {
81                 struct imeaio_req       *aioreqs;       /* array of aio requests */
82                 struct imesio_req       *sioreq;        /* pointer to the only syncio request */
83         };
84         struct iovec    *iovecs;                /* array of queued iovecs */
85         struct io_u     **io_us;                /* array of queued io_u pointers */
86         struct io_u     **event_io_us;  /* array of the events retrieved after get_events*/
87         unsigned int    queued;                 /* iovecs/io_us in the queue */
88         unsigned int    events;                 /* number of committed iovecs/io_us */
89
90         /* variables used to implement a "ring" queue */
91         unsigned int depth;                     /* max entries in the queue */
92         unsigned int head;                      /* index used to append */
93         unsigned int tail;                      /* index used to pop */
94         unsigned int cur_commit;        /* index of the first uncommitted req */
95
96         /* offset used by the last iovec (used to check if the iovecs can be appended)*/
97         unsigned long long      last_offset;
98
99         /* The variables below are used for aio only */
100         struct imeaio_req       *last_req; /* last request awaiting committing */
101 };
102
103
104 /**************************************************************
105  *         Private functions for queueing/unqueueing
106  *
107  **************************************************************/
108
109 static void fio_ime_queue_incr (struct ime_data *ime_d)
110 {
111         ime_d->head = (ime_d->head + 1) % ime_d->depth;
112         ime_d->queued++;
113 }
114
115 static void fio_ime_queue_red (struct ime_data *ime_d)
116 {
117         ime_d->tail = (ime_d->tail + 1) % ime_d->depth;
118         ime_d->queued--;
119         ime_d->events--;
120 }
121
122 static void fio_ime_queue_commit (struct ime_data *ime_d, int iovcnt)
123 {
124         ime_d->cur_commit = (ime_d->cur_commit + iovcnt) % ime_d->depth;
125         ime_d->events += iovcnt;
126 }
127
128 static void fio_ime_queue_reset (struct ime_data *ime_d)
129 {
130         ime_d->head = 0;
131         ime_d->tail = 0;
132         ime_d->cur_commit = 0;
133         ime_d->queued = 0;
134         ime_d->events = 0;
135 }
136
137 /**************************************************************
138  *                   General IME functions
139  *             (needed for both sync and async IOs)
140  **************************************************************/
141
142 static char *fio_set_ime_filename(char* filename)
143 {
144         static __thread char ime_filename[PATH_MAX];
145         int ret;
146
147         ret = snprintf(ime_filename, PATH_MAX, "%s%s", DEFAULT_IME_FILE_PREFIX, filename);
148         if (ret < PATH_MAX)
149                 return ime_filename;
150
151         return NULL;
152 }
153
154 static int fio_ime_get_file_size(struct thread_data *td, struct fio_file *f)
155 {
156         struct stat buf;
157         int ret;
158         char *ime_filename;
159
160         dprint(FD_FILE, "get file size %s\n", f->file_name);
161
162         ime_filename = fio_set_ime_filename(f->file_name);
163         if (ime_filename == NULL)
164                 return 1;
165         ret = ime_native_stat(ime_filename, &buf);
166         if (ret == -1) {
167                 td_verror(td, errno, "fstat");
168                 return 1;
169         }
170
171         f->real_file_size = buf.st_size;
172         return 0;
173 }
174
175 /* This functions mimics the generic_file_open function, but issues
176    IME native calls instead of POSIX calls. */
177 static int fio_ime_open_file(struct thread_data *td, struct fio_file *f)
178 {
179         int flags = 0;
180         int ret;
181         uint64_t desired_fs;
182         char *ime_filename;
183
184         dprint(FD_FILE, "fd open %s\n", f->file_name);
185
186         if (td_trim(td)) {
187                 td_verror(td, EINVAL, "IME does not support TRIM operation");
188                 return 1;
189         }
190
191         if (td->o.odirect)
192                 flags |= O_DIRECT;
193         flags |= td->o.sync_io;
194         if (td->o.create_on_open && td->o.allow_create)
195                 flags |= O_CREAT;
196
197         if (td_write(td)) {
198                 if (!read_only)
199                         flags |= O_RDWR;
200
201                 if (td->o.allow_create)
202                         flags |= O_CREAT;
203         } else if (td_read(td)) {
204                 flags |= O_RDONLY;
205         } else {
206                 /* We should never go here. */
207                 td_verror(td, EINVAL, "Unsopported open mode");
208                 return 1;
209         }
210
211         ime_filename = fio_set_ime_filename(f->file_name);
212         if (ime_filename == NULL)
213                 return 1;
214         f->fd = ime_native_open(ime_filename, flags, 0600);
215         if (f->fd == -1) {
216                 char buf[FIO_VERROR_SIZE];
217                 int __e = errno;
218
219                 snprintf(buf, sizeof(buf), "open(%s)", f->file_name);
220                 td_verror(td, __e, buf);
221                 return 1;
222         }
223
224         /* Now we need to make sure the real file size is sufficient for FIO
225            to do its things. This is normally done before the file open function
226            is called, but because FIO would use POSIX calls, we need to do it
227            ourselves */
228         ret = fio_ime_get_file_size(td, f);
229         if (ret < 0) {
230                 ime_native_close(f->fd);
231                 td_verror(td, errno, "ime_get_file_size");
232                 return 1;
233         }
234
235         desired_fs = f->io_size + f->file_offset;
236         if (td_write(td)) {
237                 dprint(FD_FILE, "Laying out file %s%s\n",
238                         DEFAULT_IME_FILE_PREFIX, f->file_name);
239                 if (!td->o.create_on_open &&
240                                 f->real_file_size < desired_fs &&
241                                 ime_native_ftruncate(f->fd, desired_fs) < 0) {
242                         ime_native_close(f->fd);
243                         td_verror(td, errno, "ime_native_ftruncate");
244                         return 1;
245                 }
246                 if (f->real_file_size < desired_fs)
247                         f->real_file_size = desired_fs;
248         } else if (td_read(td) && f->real_file_size < desired_fs) {
249                 ime_native_close(f->fd);
250                 log_err("error: can't read %lu bytes from file with "
251                                                 "%lu bytes\n", desired_fs, f->real_file_size);
252                 return 1;
253         }
254
255         return 0;
256 }
257
258 static int fio_ime_close_file(struct thread_data fio_unused *td, struct fio_file *f)
259 {
260         int ret = 0;
261
262         dprint(FD_FILE, "fd close %s\n", f->file_name);
263
264         if (ime_native_close(f->fd) < 0)
265                 ret = errno;
266
267         f->fd = -1;
268         return ret;
269 }
270
271 static int fio_ime_unlink_file(struct thread_data *td, struct fio_file *f)
272 {
273         char *ime_filename = fio_set_ime_filename(f->file_name);
274         int ret;
275
276         if (ime_filename == NULL)
277                 return 1;
278
279         ret = unlink(ime_filename);
280         return ret < 0 ? errno : 0;
281 }
282
283 static struct io_u *fio_ime_event(struct thread_data *td, int event)
284 {
285         struct ime_data *ime_d = td->io_ops_data;
286
287         return ime_d->event_io_us[event];
288 }
289
290 /* Setup file used to replace get_file_sizes when settin up the file.
291    Instead we will set real_file_sie to 0 for each file. This way we
292    can avoid calling ime_native_init before the forks are created. */
293 static int fio_ime_setup(struct thread_data *td)
294 {
295         struct fio_file *f;
296         unsigned int i;
297
298         for_each_file(td, f, i) {
299                 dprint(FD_FILE, "setup: set file size to 0 for %p/%d/%s\n",
300                         f, i, f->file_name);
301                 f->real_file_size = 0;
302         }
303
304         return 0;
305 }
306
307 static int fio_ime_engine_init(struct thread_data *td)
308 {
309         struct fio_file *f;
310         unsigned int i;
311
312         dprint(FD_IO, "ime engine init\n");
313         if (fio_ime_is_initialized && !td->o.use_thread) {
314                 log_err("Warning: something might go wrong. Not all threads/forks were"
315                                 " created before the FIO jobs were initialized.\n");
316         }
317
318         ime_native_init();
319         fio_ime_is_initialized = true;
320
321         /* We have to temporarily set real_file_size so that
322            FIO can initialize properly. It will be corrected
323            on file open. */
324         for_each_file(td, f, i)
325                 f->real_file_size = f->io_size + f->file_offset;
326
327         return 0;
328 }
329
330 static void fio_ime_engine_finalize(struct thread_data *td)
331 {
332         /* Only finalize IME when using forks */
333         if (!td->o.use_thread) {
334                 if (ime_native_finalize() < 0)
335                         log_err("error in ime_native_finalize\n");
336                 fio_ime_is_initialized = false;
337         }
338 }
339
340
341 /**************************************************************
342  *             Private functions for blocking IOs
343  *                     (without iovecs)
344  **************************************************************/
345
346 /* Notice: this function comes from the sync engine */
347 /* It is used by the commit function to return a proper code and fill
348    some attributes in the io_u used for the IO. */
349 static int fio_ime_psync_end(struct thread_data *td, struct io_u *io_u, ssize_t ret)
350 {
351         if (ret != (ssize_t) io_u->xfer_buflen) {
352                 if (ret >= 0) {
353                         io_u->resid = io_u->xfer_buflen - ret;
354                         io_u->error = 0;
355                         return FIO_Q_COMPLETED;
356                 } else
357                         io_u->error = errno;
358         }
359
360         if (io_u->error) {
361                 io_u_log_error(td, io_u);
362                 td_verror(td, io_u->error, "xfer");
363         }
364
365         return FIO_Q_COMPLETED;
366 }
367
368 static enum fio_q_status fio_ime_psync_queue(struct thread_data *td,
369                                            struct io_u *io_u)
370 {
371         struct fio_file *f = io_u->file;
372         ssize_t ret;
373
374         fio_ro_check(td, io_u);
375
376         if (io_u->ddir == DDIR_READ)
377                 ret = ime_native_pread(f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
378         else if (io_u->ddir == DDIR_WRITE)
379                 ret = ime_native_pwrite(f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
380         else if (io_u->ddir == DDIR_SYNC)
381                 ret = ime_native_fsync(f->fd);
382         else {
383                 ret = io_u->xfer_buflen;
384                 io_u->error = EINVAL;
385         }
386
387         return fio_ime_psync_end(td, io_u, ret);
388 }
389
390
391 /**************************************************************
392  *             Private functions for blocking IOs
393  *                       (with iovecs)
394  **************************************************************/
395
396 static bool fio_ime_psyncv_can_queue(struct ime_data *ime_d, struct io_u *io_u)
397 {
398         /* We can only queue if:
399           - There are no queued iovecs
400           - Or if there is at least one:
401                  - There must be no event waiting for retrieval
402                  - The offsets must be contiguous
403                  - The ddir and fd must be the same */
404         return (ime_d->queued == 0 || (
405                         ime_d->events == 0 &&
406                         ime_d->last_offset == io_u->offset &&
407                         ime_d->sioreq->ddir == io_u->ddir &&
408                         ime_d->sioreq->fd == io_u->file->fd));
409 }
410
411 /* Before using this function, we should have already
412    ensured that the queue is not full */
413 static void fio_ime_psyncv_enqueue(struct ime_data *ime_d, struct io_u *io_u)
414 {
415         struct imesio_req *ioreq = ime_d->sioreq;
416         struct iovec *iov = &ime_d->iovecs[ime_d->head];
417
418         iov->iov_base = io_u->xfer_buf;
419         iov->iov_len = io_u->xfer_buflen;
420
421         if (ime_d->queued == 0) {
422                 ioreq->offset = io_u->offset;
423                 ioreq->ddir = io_u->ddir;
424                 ioreq->fd = io_u->file->fd;
425         }
426
427         ime_d->io_us[ime_d->head] = io_u;
428         ime_d->last_offset = io_u->offset + io_u->xfer_buflen;
429         fio_ime_queue_incr(ime_d);
430 }
431
432 /* Tries to queue an IO. It will fail if the IO can't be appended to the
433    current request or if the current request has been committed but not
434    yet retrieved by get_events. */
435 static enum fio_q_status fio_ime_psyncv_queue(struct thread_data *td,
436         struct io_u *io_u)
437 {
438         struct ime_data *ime_d = td->io_ops_data;
439
440         fio_ro_check(td, io_u);
441
442         if (ime_d->queued == ime_d->depth)
443                 return FIO_Q_BUSY;
444
445         if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
446                 if (!fio_ime_psyncv_can_queue(ime_d, io_u))
447                         return FIO_Q_BUSY;
448
449                 dprint(FD_IO, "queue: ddir=%d at %u commit=%u queued=%u events=%u\n",
450                         io_u->ddir, ime_d->head, ime_d->cur_commit,
451                         ime_d->queued, ime_d->events);
452                 fio_ime_psyncv_enqueue(ime_d, io_u);
453                 return FIO_Q_QUEUED;
454         } else if (io_u->ddir == DDIR_SYNC) {
455                 if (ime_native_fsync(io_u->file->fd) < 0) {
456                         io_u->error = errno;
457                         td_verror(td, io_u->error, "fsync");
458                 }
459                 return FIO_Q_COMPLETED;
460         } else {
461                 io_u->error = EINVAL;
462                 td_verror(td, io_u->error, "wrong ddir");
463                 return FIO_Q_COMPLETED;
464         }
465 }
466
467 /* Notice: this function comes from the sync engine */
468 /* It is used by the commit function to return a proper code and fill
469    some attributes in the io_us appended to the current request. */
470 static int fio_ime_psyncv_end(struct thread_data *td, ssize_t bytes)
471 {
472         struct ime_data *ime_d = td->io_ops_data;
473         struct io_u *io_u;
474         unsigned int i;
475         int err = errno;
476
477         for (i = 0; i < ime_d->queued; i++) {
478                 io_u = ime_d->io_us[i];
479
480                 if (bytes == -1)
481                         io_u->error = err;
482                 else {
483                         unsigned int this_io;
484
485                         this_io = bytes;
486                         if (this_io > io_u->xfer_buflen)
487                                 this_io = io_u->xfer_buflen;
488
489                         io_u->resid = io_u->xfer_buflen - this_io;
490                         io_u->error = 0;
491                         bytes -= this_io;
492                 }
493         }
494
495         if (bytes == -1) {
496                 td_verror(td, err, "xfer psyncv");
497                 return -err;
498         }
499
500         return 0;
501 }
502
503 /* Commits the current request by calling ime_native (with one or several
504    iovecs). After this commit, the corresponding events (one per iovec)
505    can be retrieved by get_events. */
506 static int fio_ime_psyncv_commit(struct thread_data *td)
507 {
508         struct ime_data *ime_d = td->io_ops_data;
509         struct imesio_req *ioreq;
510         int ret = 0;
511
512         /* Exit if there are no (new) events to commit
513            or if the previous committed event haven't been retrieved */
514         if (!ime_d->queued || ime_d->events)
515                 return 0;
516
517         ioreq = ime_d->sioreq;
518         ime_d->events = ime_d->queued;
519         if (ioreq->ddir == DDIR_READ)
520                 ret = ime_native_preadv(ioreq->fd, ime_d->iovecs, ime_d->queued, ioreq->offset);
521         else
522                 ret = ime_native_pwritev(ioreq->fd, ime_d->iovecs, ime_d->queued, ioreq->offset);
523
524         dprint(FD_IO, "committed %d iovecs\n", ime_d->queued);
525
526         return fio_ime_psyncv_end(td, ret);
527 }
528
529 static int fio_ime_psyncv_getevents(struct thread_data *td, unsigned int min,
530                                 unsigned int max, const struct timespec *t)
531 {
532         struct ime_data *ime_d = td->io_ops_data;
533         struct io_u *io_u;
534         int events = 0;
535         unsigned int count;
536
537         if (ime_d->events) {
538                 for (count = 0; count < ime_d->events; count++) {
539                         io_u = ime_d->io_us[count];
540                         ime_d->event_io_us[events] = io_u;
541                         events++;
542                 }
543                 fio_ime_queue_reset(ime_d);
544         }
545
546         dprint(FD_IO, "getevents(%u,%u) ret=%d queued=%u events=%u\n",
547                 min, max, events, ime_d->queued, ime_d->events);
548         return events;
549 }
550
551 static int fio_ime_psyncv_init(struct thread_data *td)
552 {
553         struct ime_data *ime_d;
554
555         if (fio_ime_engine_init(td) < 0)
556                 return 1;
557
558         ime_d = calloc(1, sizeof(*ime_d));
559
560         ime_d->sioreq = malloc(sizeof(struct imesio_req));
561         ime_d->iovecs = malloc(td->o.iodepth * sizeof(struct iovec));
562         ime_d->io_us = malloc(2 * td->o.iodepth * sizeof(struct io_u *));
563         ime_d->event_io_us = ime_d->io_us + td->o.iodepth;
564
565         ime_d->depth = td->o.iodepth;
566
567         td->io_ops_data = ime_d;
568         return 0;
569 }
570
571 static void fio_ime_psyncv_clean(struct thread_data *td)
572 {
573         struct ime_data *ime_d = td->io_ops_data;
574
575         if (ime_d) {
576                 free(ime_d->sioreq);
577                 free(ime_d->iovecs);
578                 free(ime_d->io_us);
579                 free(ime_d);
580                 td->io_ops_data = NULL;
581         }
582
583         fio_ime_engine_finalize(td);
584 }
585
586
587 /**************************************************************
588  *           Private functions for non-blocking IOs
589  *
590  **************************************************************/
591
592 void fio_ime_aio_complete_cb  (struct ime_aiocb *aiocb, int err,
593                                                            ssize_t bytes)
594 {
595         struct imeaio_req *ioreq = (struct imeaio_req *) aiocb->user_context;
596
597         pthread_mutex_lock(&ioreq->status_mutex);
598         ioreq->status = err == 0 ? bytes : FIO_IME_REQ_ERROR;
599         pthread_mutex_unlock(&ioreq->status_mutex);
600
601         pthread_cond_signal(&ioreq->cond_endio);
602 }
603
604 static bool fio_ime_aio_can_queue (struct ime_data *ime_d, struct io_u *io_u)
605 {
606         /* So far we can queue in any case. */
607         return true;
608 }
609 static bool fio_ime_aio_can_append (struct ime_data *ime_d, struct io_u *io_u)
610 {
611         /* We can only append if:
612                 - The iovecs will be contiguous in the array
613                 - There is already a queued iovec
614                 - The offsets are contiguous
615                 - The ddir and fs are the same */
616         return (ime_d->head != 0 &&
617                         ime_d->queued - ime_d->events > 0 &&
618                         ime_d->last_offset == io_u->offset &&
619                         ime_d->last_req->ddir == io_u->ddir &&
620                         ime_d->last_req->iocb.fd == io_u->file->fd);
621 }
622
623 /* Before using this function, we should have already
624    ensured that the queue is not full */
625 static void fio_ime_aio_enqueue(struct ime_data *ime_d, struct io_u *io_u)
626 {
627         struct imeaio_req *ioreq = &ime_d->aioreqs[ime_d->head];
628         struct ime_aiocb *iocb = &ioreq->iocb;
629         struct iovec *iov = &ime_d->iovecs[ime_d->head];
630
631         iov->iov_base = io_u->xfer_buf;
632         iov->iov_len = io_u->xfer_buflen;
633
634         if (fio_ime_aio_can_append(ime_d, io_u))
635                 ime_d->last_req->iocb.iovcnt++;
636         else {
637                 ioreq->status = FIO_IME_IN_PROGRESS;
638                 ioreq->ddir = io_u->ddir;
639                 ime_d->last_req = ioreq;
640
641                 iocb->complete_cb = &fio_ime_aio_complete_cb;
642                 iocb->fd = io_u->file->fd;
643                 iocb->file_offset = io_u->offset;
644                 iocb->iov = iov;
645                 iocb->iovcnt = 1;
646                 iocb->flags = 0;
647                 iocb->user_context = (intptr_t) ioreq;
648         }
649
650         ime_d->io_us[ime_d->head] = io_u;
651         ime_d->last_offset = io_u->offset + io_u->xfer_buflen;
652         fio_ime_queue_incr(ime_d);
653 }
654
655 /* Tries to queue an IO. It will create a new request if the IO can't be
656    appended to the current request. It will fail if the queue can't contain
657    any more io_u/iovec. In this case, commit and then get_events need to be
658    called. */
659 static enum fio_q_status fio_ime_aio_queue(struct thread_data *td,
660                 struct io_u *io_u)
661 {
662         struct ime_data *ime_d = td->io_ops_data;
663
664         fio_ro_check(td, io_u);
665
666         dprint(FD_IO, "queue: ddir=%d at %u commit=%u queued=%u events=%u\n",
667                 io_u->ddir, ime_d->head, ime_d->cur_commit,
668                 ime_d->queued, ime_d->events);
669
670         if (ime_d->queued == ime_d->depth)
671                 return FIO_Q_BUSY;
672
673         if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
674                 if (!fio_ime_aio_can_queue(ime_d, io_u))
675                         return FIO_Q_BUSY;
676
677                 fio_ime_aio_enqueue(ime_d, io_u);
678                 return FIO_Q_QUEUED;
679         } else if (io_u->ddir == DDIR_SYNC) {
680                 if (ime_native_fsync(io_u->file->fd) < 0) {
681                         io_u->error = errno;
682                         td_verror(td, io_u->error, "fsync");
683                 }
684                 return FIO_Q_COMPLETED;
685         } else {
686                 io_u->error = EINVAL;
687                 td_verror(td, io_u->error, "wrong ddir");
688                 return FIO_Q_COMPLETED;
689         }
690 }
691
692 static int fio_ime_aio_commit(struct thread_data *td)
693 {
694         struct ime_data *ime_d = td->io_ops_data;
695         struct imeaio_req *ioreq;
696         int ret = 0;
697
698         /* Loop while there are events to commit */
699         while (ime_d->queued - ime_d->events) {
700                 ioreq = &ime_d->aioreqs[ime_d->cur_commit];
701                 if (ioreq->ddir == DDIR_READ)
702                         ret = ime_native_aio_read(&ioreq->iocb);
703                 else
704                         ret = ime_native_aio_write(&ioreq->iocb);
705
706                 fio_ime_queue_commit(ime_d, ioreq->iocb.iovcnt);
707
708                 /* fio needs a negative error code */
709                 if (ret < 0) {
710                         ioreq->status = FIO_IME_REQ_ERROR;
711                         return -errno;
712                 }
713
714                 io_u_mark_submit(td, ioreq->iocb.iovcnt);
715                 dprint(FD_IO, "committed %d iovecs commit=%u queued=%u events=%u\n",
716                         ioreq->iocb.iovcnt, ime_d->cur_commit,
717                         ime_d->queued, ime_d->events);
718         }
719
720         return 0;
721 }
722
723 static int fio_ime_aio_getevents(struct thread_data *td, unsigned int min,
724                                 unsigned int max, const struct timespec *t)
725 {
726         struct ime_data *ime_d = td->io_ops_data;
727         struct imeaio_req *ioreq;
728         struct io_u *io_u;
729         int events = 0;
730         unsigned int count;
731         ssize_t bytes;
732
733         while (ime_d->events) {
734                 ioreq = &ime_d->aioreqs[ime_d->tail];
735
736                 /* Break if we already got events, and if we will
737                    exceed max if we append the next events */
738                 if (events && events + ioreq->iocb.iovcnt > max)
739                         break;
740
741                 if (ioreq->status != FIO_IME_IN_PROGRESS) {
742
743                         bytes = ioreq->status;
744                         for (count = 0; count < ioreq->iocb.iovcnt; count++) {
745                                 io_u = ime_d->io_us[ime_d->tail];
746                                 ime_d->event_io_us[events] = io_u;
747                                 events++;
748                                 fio_ime_queue_red(ime_d);
749
750                                 if (ioreq->status == FIO_IME_REQ_ERROR)
751                                         io_u->error = EIO;
752                                 else {
753                                         io_u->resid = bytes > io_u->xfer_buflen ?
754                                                                         0 : io_u->xfer_buflen - bytes;
755                                         io_u->error = 0;
756                                         bytes -= io_u->xfer_buflen - io_u->resid;
757                                 }
758                         }
759                 } else {
760                         pthread_mutex_lock(&ioreq->status_mutex);
761                         while (ioreq->status == FIO_IME_IN_PROGRESS)
762                                 pthread_cond_wait(&ioreq->cond_endio, &ioreq->status_mutex);
763                         pthread_mutex_unlock(&ioreq->status_mutex);
764                 }
765
766         }
767
768         dprint(FD_IO, "getevents(%u,%u) ret=%d queued=%u events=%u\n", min, max,
769                 events, ime_d->queued, ime_d->events);
770         return events;
771 }
772
773 static int fio_ime_aio_init(struct thread_data *td)
774 {
775         struct ime_data *ime_d;
776         struct imeaio_req *ioreq;
777         unsigned int i;
778
779         if (fio_ime_engine_init(td) < 0)
780                 return 1;
781
782         ime_d = calloc(1, sizeof(*ime_d));
783
784         ime_d->aioreqs = malloc(td->o.iodepth * sizeof(struct imeaio_req));
785         ime_d->iovecs = malloc(td->o.iodepth * sizeof(struct iovec));
786         ime_d->io_us = malloc(2 * td->o.iodepth * sizeof(struct io_u *));
787         ime_d->event_io_us = ime_d->io_us + td->o.iodepth;
788
789         ime_d->depth = td->o.iodepth;
790         for (i = 0; i < ime_d->depth; i++) {
791                 ioreq = &ime_d->aioreqs[i];
792                 pthread_cond_init(&ioreq->cond_endio, NULL);
793                 pthread_mutex_init(&ioreq->status_mutex, NULL);
794         }
795
796         td->io_ops_data = ime_d;
797         return 0;
798 }
799
800 static void fio_ime_aio_clean(struct thread_data *td)
801 {
802         struct ime_data *ime_d = td->io_ops_data;
803         struct imeaio_req *ioreq;
804         unsigned int i;
805
806         if (ime_d) {
807                 for (i = 0; i < ime_d->depth; i++) {
808                         ioreq = &ime_d->aioreqs[i];
809                         pthread_cond_destroy(&ioreq->cond_endio);
810                         pthread_mutex_destroy(&ioreq->status_mutex);
811                 }
812                 free(ime_d->aioreqs);
813                 free(ime_d->iovecs);
814                 free(ime_d->io_us);
815                 free(ime_d);
816                 td->io_ops_data = NULL;
817         }
818
819         fio_ime_engine_finalize(td);
820 }
821
822
823 /**************************************************************
824  *                   IO engines definitions
825  *
826  **************************************************************/
827
828 /* The FIO_DISKLESSIO flag used for these engines is necessary to prevent
829    FIO from using POSIX calls. See fio_ime_open_file for more details. */
830
831 static struct ioengine_ops ioengine_prw = {
832         .name           = "ime_psync",
833         .version        = FIO_IOOPS_VERSION,
834         .setup          = fio_ime_setup,
835         .init           = fio_ime_engine_init,
836         .cleanup        = fio_ime_engine_finalize,
837         .queue          = fio_ime_psync_queue,
838         .open_file      = fio_ime_open_file,
839         .close_file     = fio_ime_close_file,
840         .get_file_size  = fio_ime_get_file_size,
841         .unlink_file    = fio_ime_unlink_file,
842         .flags          = FIO_SYNCIO | FIO_DISKLESSIO,
843 };
844
845 static struct ioengine_ops ioengine_pvrw = {
846         .name           = "ime_psyncv",
847         .version        = FIO_IOOPS_VERSION,
848         .setup          = fio_ime_setup,
849         .init           = fio_ime_psyncv_init,
850         .cleanup        = fio_ime_psyncv_clean,
851         .queue          = fio_ime_psyncv_queue,
852         .commit         = fio_ime_psyncv_commit,
853         .getevents      = fio_ime_psyncv_getevents,
854         .event          = fio_ime_event,
855         .open_file      = fio_ime_open_file,
856         .close_file     = fio_ime_close_file,
857         .get_file_size  = fio_ime_get_file_size,
858         .unlink_file    = fio_ime_unlink_file,
859         .flags          = FIO_SYNCIO | FIO_DISKLESSIO,
860 };
861
862 static struct ioengine_ops ioengine_aio = {
863         .name           = "ime_aio",
864         .version        = FIO_IOOPS_VERSION,
865         .setup          = fio_ime_setup,
866         .init           = fio_ime_aio_init,
867         .cleanup        = fio_ime_aio_clean,
868         .queue          = fio_ime_aio_queue,
869         .commit         = fio_ime_aio_commit,
870         .getevents      = fio_ime_aio_getevents,
871         .event          = fio_ime_event,
872         .open_file      = fio_ime_open_file,
873         .close_file     = fio_ime_close_file,
874         .get_file_size  = fio_ime_get_file_size,
875         .unlink_file    = fio_ime_unlink_file,
876         .flags          = FIO_DISKLESSIO,
877 };
878
879 static void fio_init fio_ime_register(void)
880 {
881         register_ioengine(&ioengine_prw);
882         register_ioengine(&ioengine_pvrw);
883         register_ioengine(&ioengine_aio);
884 }
885
886 static void fio_exit fio_ime_unregister(void)
887 {
888         unregister_ioengine(&ioengine_prw);
889         unregister_ioengine(&ioengine_pvrw);
890         unregister_ioengine(&ioengine_aio);
891
892         if (fio_ime_is_initialized && ime_native_finalize() < 0)
893                 log_err("Warning: IME did not finalize properly\n");
894 }