fio: fix aio trim completion latencies
[fio.git] / engines / ime.c
1 /*
2  * FIO engines for DDN's Infinite Memory Engine.
3  * This file defines 3 engines: ime_psync, ime_psyncv, and ime_aio
4  *
5  * Copyright (C) 2018      DataDirect Networks. All rights reserved.
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License,
9  * version 2 as published by the Free Software Foundation..
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  */
16
17 /*
18  * Some details about the new engines are given below:
19  *
20  *
21  * ime_psync:
22  * Most basic engine that issues calls to ime_native whenever an IO is queued.
23  *
24  * ime_psyncv:
25  * This engine tries to queue the IOs (by creating iovecs) if asked by FIO (via
26  * iodepth_batch). It refuses to queue when the iovecs can't be appended, and
27  * waits for FIO to issue a commit. After a call to commit and get_events, new
28  * IOs can be queued.
29  *
30  * ime_aio:
31  * This engine tries to queue the IOs (by creating iovecs) if asked by FIO (via
32  * iodepth_batch). When the iovecs can't be appended to the current request, a
33  * new request for IME is created. These requests will be issued to IME when
34  * commit is called. Contrary to ime_psyncv, there can be several requests at
35  * once. We don't need to wait for a request to terminate before creating a new
36  * one.
37  */
38
39 #include <stdio.h>
40 #include <stdlib.h>
41 #include <errno.h>
42 #include <linux/limits.h>
43 #include <ime_native.h>
44
45 #include "../fio.h"
46
47
48 /**************************************************************
49  *              Types and constants definitions
50  *
51  **************************************************************/
52
53 /* define constants for async IOs */
54 #define FIO_IME_IN_PROGRESS -1
55 #define FIO_IME_REQ_ERROR   -2
56
57 /* This flag is used when some jobs were created using threads. In that
58    case, IME can't be finalized in the engine-specific cleanup function,
59    because other threads might still use IME. Instead, IME is finalized
60    in the destructor (see fio_ime_unregister), only when the flag
61    fio_ime_is_initialized is true (which means at least one thread has
62    initialized IME). */
63 static bool fio_ime_is_initialized = false;
64
65 struct imesio_req {
66         int                     fd;             /* File descriptor */
67         enum fio_ddir   ddir;   /* Type of IO (read or write) */
68         off_t                   offset; /* File offset */
69 };
70 struct imeaio_req {
71         struct ime_aiocb        iocb;                   /* IME aio request */
72         ssize_t                 status;                 /* Status of the IME request */
73         enum fio_ddir           ddir;                   /* Type of IO (read or write) */
74         pthread_cond_t          cond_endio;             /* Condition var to notify FIO */
75         pthread_mutex_t         status_mutex;   /* Mutex for cond_endio */
76 };
77
78 /* This structure will be used for 2 engines: ime_psyncv and ime_aio */
79 struct ime_data {
80         union {
81                 struct imeaio_req       *aioreqs;       /* array of aio requests */
82                 struct imesio_req       *sioreq;        /* pointer to the only syncio request */
83         };
84         struct iovec    *iovecs;                /* array of queued iovecs */
85         struct io_u     **io_us;                /* array of queued io_u pointers */
86         struct io_u     **event_io_us;  /* array of the events retieved afer get_events*/
87         unsigned int    queued;                 /* iovecs/io_us in the queue */
88         unsigned int    events;                 /* number of committed iovecs/io_us */
89
90         /* variables used to implement a "ring" queue */
91         unsigned int depth;                     /* max entries in the queue */
92         unsigned int head;                      /* index used to append */
93         unsigned int tail;                      /* index used to pop */
94         unsigned int cur_commit;        /* index of the first uncommitted req */
95
96         /* offset used by the last iovec (used to check if the iovecs can be appended)*/
97         unsigned long long      last_offset;
98
99         /* The variables below are used for aio only */
100         struct imeaio_req       *last_req; /* last request awaiting committing */
101 };
102
103
104 /**************************************************************
105  *         Private functions for queueing/unqueueing
106  *
107  **************************************************************/
108
109 static void fio_ime_queue_incr (struct ime_data *ime_d)
110 {
111         ime_d->head = (ime_d->head + 1) % ime_d->depth;
112         ime_d->queued++;
113 }
114
115 static void fio_ime_queue_red (struct ime_data *ime_d)
116 {
117         ime_d->tail = (ime_d->tail + 1) % ime_d->depth;
118         ime_d->queued--;
119         ime_d->events--;
120 }
121
122 static void fio_ime_queue_commit (struct ime_data *ime_d, int iovcnt)
123 {
124         ime_d->cur_commit = (ime_d->cur_commit + iovcnt) % ime_d->depth;
125         ime_d->events += iovcnt;
126 }
127
128 static void fio_ime_queue_reset (struct ime_data *ime_d)
129 {
130         ime_d->head = 0;
131         ime_d->tail = 0;
132         ime_d->cur_commit = 0;
133         ime_d->queued = 0;
134         ime_d->events = 0;
135 }
136
137 /**************************************************************
138  *                   General IME functions
139  *             (needed for both sync and async IOs)
140  **************************************************************/
141
142 static char *fio_set_ime_filename(char* filename)
143 {
144         static __thread char ime_filename[PATH_MAX];
145         int ret;
146
147         ret = snprintf(ime_filename, PATH_MAX, "%s%s", DEFAULT_IME_FILE_PREFIX, filename);
148         if (ret < PATH_MAX)
149                 return ime_filename;
150
151         return NULL;
152 }
153
154 static int fio_ime_get_file_size(struct thread_data *td, struct fio_file *f)
155 {
156         struct stat buf;
157         int ret;
158         char *ime_filename;
159
160         dprint(FD_FILE, "get file size %s\n", f->file_name);
161
162         ime_filename = fio_set_ime_filename(f->file_name);
163         if (ime_filename == NULL)
164                 return 1;
165         ret = ime_native_stat(ime_filename, &buf);
166         if (ret == -1) {
167                 td_verror(td, errno, "fstat");
168                 return 1;
169         }
170
171         f->real_file_size = buf.st_size;
172         return 0;
173 }
174
175 /* This functions mimics the generic_file_open function, but issues
176    IME native calls instead of POSIX calls. */
177 static int fio_ime_open_file(struct thread_data *td, struct fio_file *f)
178 {
179         int flags = 0;
180         int ret;
181         uint64_t desired_fs;
182         char *ime_filename;
183
184         dprint(FD_FILE, "fd open %s\n", f->file_name);
185
186         if (td_trim(td)) {
187                 td_verror(td, EINVAL, "IME does not support TRIM operation");
188                 return 1;
189         }
190
191         if (td->o.oatomic) {
192                 td_verror(td, EINVAL, "IME does not support atomic IO");
193                 return 1;
194         }
195         if (td->o.odirect)
196                 flags |= O_DIRECT;
197         if (td->o.sync_io)
198                 flags |= O_SYNC;
199         if (td->o.create_on_open && td->o.allow_create)
200                 flags |= O_CREAT;
201
202         if (td_write(td)) {
203                 if (!read_only)
204                         flags |= O_RDWR;
205
206                 if (td->o.allow_create)
207                         flags |= O_CREAT;
208         } else if (td_read(td)) {
209                 flags |= O_RDONLY;
210         } else {
211                 /* We should never go here. */
212                 td_verror(td, EINVAL, "Unsopported open mode");
213                 return 1;
214         }
215
216         ime_filename = fio_set_ime_filename(f->file_name);
217         if (ime_filename == NULL)
218                 return 1;
219         f->fd = ime_native_open(ime_filename, flags, 0600);
220         if (f->fd == -1) {
221                 char buf[FIO_VERROR_SIZE];
222                 int __e = errno;
223
224                 snprintf(buf, sizeof(buf), "open(%s)", f->file_name);
225                 td_verror(td, __e, buf);
226                 return 1;
227         }
228
229         /* Now we need to make sure the real file size is sufficient for FIO
230            to do its things. This is normally done before the file open function
231            is called, but because FIO would use POSIX calls, we need to do it
232            ourselves */
233         ret = fio_ime_get_file_size(td, f);
234         if (ret < 0) {
235                 ime_native_close(f->fd);
236                 td_verror(td, errno, "ime_get_file_size");
237                 return 1;
238         }
239
240         desired_fs = f->io_size + f->file_offset;
241         if (td_write(td)) {
242                 dprint(FD_FILE, "Laying out file %s%s\n",
243                         DEFAULT_IME_FILE_PREFIX, f->file_name);
244                 if (!td->o.create_on_open &&
245                                 f->real_file_size < desired_fs &&
246                                 ime_native_ftruncate(f->fd, desired_fs) < 0) {
247                         ime_native_close(f->fd);
248                         td_verror(td, errno, "ime_native_ftruncate");
249                         return 1;
250                 }
251                 if (f->real_file_size < desired_fs)
252                         f->real_file_size = desired_fs;
253         } else if (td_read(td) && f->real_file_size < desired_fs) {
254                 ime_native_close(f->fd);
255                 log_err("error: can't read %lu bytes from file with "
256                                                 "%lu bytes\n", desired_fs, f->real_file_size);
257                 return 1;
258         }
259
260         return 0;
261 }
262
263 static int fio_ime_close_file(struct thread_data fio_unused *td, struct fio_file *f)
264 {
265         int ret = 0;
266
267         dprint(FD_FILE, "fd close %s\n", f->file_name);
268
269         if (ime_native_close(f->fd) < 0)
270                 ret = errno;
271
272         f->fd = -1;
273         return ret;
274 }
275
276 static int fio_ime_unlink_file(struct thread_data *td, struct fio_file *f)
277 {
278         char *ime_filename = fio_set_ime_filename(f->file_name);
279         int ret;
280
281         if (ime_filename == NULL)
282                 return 1;
283
284         ret = unlink(ime_filename);
285         return ret < 0 ? errno : 0;
286 }
287
288 static struct io_u *fio_ime_event(struct thread_data *td, int event)
289 {
290         struct ime_data *ime_d = td->io_ops_data;
291
292         return ime_d->event_io_us[event];
293 }
294
295 /* Setup file used to replace get_file_sizes when settin up the file.
296    Instead we will set real_file_sie to 0 for each file. This way we
297    can avoid calling ime_native_init before the forks are created. */
298 static int fio_ime_setup(struct thread_data *td)
299 {
300         struct fio_file *f;
301         unsigned int i;
302
303         for_each_file(td, f, i) {
304                 dprint(FD_FILE, "setup: set file size to 0 for %p/%d/%s\n",
305                         f, i, f->file_name);
306                 f->real_file_size = 0;
307         }
308
309         return 0;
310 }
311
312 static int fio_ime_engine_init(struct thread_data *td)
313 {
314         struct fio_file *f;
315         unsigned int i;
316
317         dprint(FD_IO, "ime engine init\n");
318         if (fio_ime_is_initialized && !td->o.use_thread) {
319                 log_err("Warning: something might go wrong. Not all threads/forks were"
320                                 " created before the FIO jobs were initialized.\n");
321         }
322
323         ime_native_init();
324         fio_ime_is_initialized = true;
325
326         /* We have to temporarily set real_file_size so that
327            FIO can initialize properly. It will be corrected
328            on file open. */
329         for_each_file(td, f, i)
330                 f->real_file_size = f->io_size + f->file_offset;
331
332         return 0;
333 }
334
335 static void fio_ime_engine_finalize(struct thread_data *td)
336 {
337         /* Only finalize IME when using forks */
338         if (!td->o.use_thread) {
339                 if (ime_native_finalize() < 0)
340                         log_err("error in ime_native_finalize\n");
341                 fio_ime_is_initialized = false;
342         }
343 }
344
345
346 /**************************************************************
347  *             Private functions for blocking IOs
348  *                     (without iovecs)
349  **************************************************************/
350
351 /* Notice: this function comes from the sync engine */
352 /* It is used by the commit function to return a proper code and fill
353    some attributes in the io_u used for the IO. */
354 static int fio_ime_psync_end(struct thread_data *td, struct io_u *io_u, ssize_t ret)
355 {
356         if (ret != (ssize_t) io_u->xfer_buflen) {
357                 if (ret >= 0) {
358                         io_u->resid = io_u->xfer_buflen - ret;
359                         io_u->error = 0;
360                         return FIO_Q_COMPLETED;
361                 } else
362                         io_u->error = errno;
363         }
364
365         if (io_u->error) {
366                 io_u_log_error(td, io_u);
367                 td_verror(td, io_u->error, "xfer");
368         }
369
370         return FIO_Q_COMPLETED;
371 }
372
373 static enum fio_q_status fio_ime_psync_queue(struct thread_data *td,
374                                            struct io_u *io_u)
375 {
376         struct fio_file *f = io_u->file;
377         ssize_t ret;
378
379         fio_ro_check(td, io_u);
380
381         if (io_u->ddir == DDIR_READ)
382                 ret = ime_native_pread(f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
383         else if (io_u->ddir == DDIR_WRITE)
384                 ret = ime_native_pwrite(f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
385         else if (io_u->ddir == DDIR_SYNC)
386                 ret = ime_native_fsync(f->fd);
387         else {
388                 ret = io_u->xfer_buflen;
389                 io_u->error = EINVAL;
390         }
391
392         return fio_ime_psync_end(td, io_u, ret);
393 }
394
395
396 /**************************************************************
397  *             Private functions for blocking IOs
398  *                       (with iovecs)
399  **************************************************************/
400
401 static bool fio_ime_psyncv_can_queue(struct ime_data *ime_d, struct io_u *io_u)
402 {
403         /* We can only queue if:
404           - There are no queued iovecs
405           - Or if there is at least one:
406                  - There must be no event waiting for retrieval
407                  - The offsets must be contiguous
408                  - The ddir and fd must be the same */
409         return (ime_d->queued == 0 || (
410                         ime_d->events == 0 &&
411                         ime_d->last_offset == io_u->offset &&
412                         ime_d->sioreq->ddir == io_u->ddir &&
413                         ime_d->sioreq->fd == io_u->file->fd));
414 }
415
416 /* Before using this function, we should have already
417    ensured that the queue is not full */
418 static void fio_ime_psyncv_enqueue(struct ime_data *ime_d, struct io_u *io_u)
419 {
420         struct imesio_req *ioreq = ime_d->sioreq;
421         struct iovec *iov = &ime_d->iovecs[ime_d->head];
422
423         iov->iov_base = io_u->xfer_buf;
424         iov->iov_len = io_u->xfer_buflen;
425
426         if (ime_d->queued == 0) {
427                 ioreq->offset = io_u->offset;
428                 ioreq->ddir = io_u->ddir;
429                 ioreq->fd = io_u->file->fd;
430         }
431
432         ime_d->io_us[ime_d->head] = io_u;
433         ime_d->last_offset = io_u->offset + io_u->xfer_buflen;
434         fio_ime_queue_incr(ime_d);
435 }
436
437 /* Tries to queue an IO. It will fail if the IO can't be appended to the
438    current request or if the current request has been committed but not
439    yet retrieved by get_events. */
440 static enum fio_q_status fio_ime_psyncv_queue(struct thread_data *td,
441         struct io_u *io_u)
442 {
443         struct ime_data *ime_d = td->io_ops_data;
444
445         fio_ro_check(td, io_u);
446
447         if (ime_d->queued == ime_d->depth)
448                 return FIO_Q_BUSY;
449
450         if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
451                 if (!fio_ime_psyncv_can_queue(ime_d, io_u))
452                         return FIO_Q_BUSY;
453
454                 dprint(FD_IO, "queue: ddir=%d at %u commit=%u queued=%u events=%u\n",
455                         io_u->ddir, ime_d->head, ime_d->cur_commit,
456                         ime_d->queued, ime_d->events);
457                 fio_ime_psyncv_enqueue(ime_d, io_u);
458                 return FIO_Q_QUEUED;
459         } else if (io_u->ddir == DDIR_SYNC) {
460                 if (ime_native_fsync(io_u->file->fd) < 0) {
461                         io_u->error = errno;
462                         td_verror(td, io_u->error, "fsync");
463                 }
464                 return FIO_Q_COMPLETED;
465         } else {
466                 io_u->error = EINVAL;
467                 td_verror(td, io_u->error, "wrong ddir");
468                 return FIO_Q_COMPLETED;
469         }
470 }
471
472 /* Notice: this function comes from the sync engine */
473 /* It is used by the commit function to return a proper code and fill
474    some attributes in the io_us appended to the current request. */
475 static int fio_ime_psyncv_end(struct thread_data *td, ssize_t bytes)
476 {
477         struct ime_data *ime_d = td->io_ops_data;
478         struct io_u *io_u;
479         unsigned int i;
480         int err = errno;
481
482         for (i = 0; i < ime_d->queued; i++) {
483                 io_u = ime_d->io_us[i];
484
485                 if (bytes == -1)
486                         io_u->error = err;
487                 else {
488                         unsigned int this_io;
489
490                         this_io = bytes;
491                         if (this_io > io_u->xfer_buflen)
492                                 this_io = io_u->xfer_buflen;
493
494                         io_u->resid = io_u->xfer_buflen - this_io;
495                         io_u->error = 0;
496                         bytes -= this_io;
497                 }
498         }
499
500         if (bytes == -1) {
501                 td_verror(td, err, "xfer psyncv");
502                 return -err;
503         }
504
505         return 0;
506 }
507
508 /* Commits the current request by calling ime_native (with one or several
509    iovecs). After this commit, the corresponding events (one per iovec)
510    can be retrieved by get_events. */
511 static int fio_ime_psyncv_commit(struct thread_data *td)
512 {
513         struct ime_data *ime_d = td->io_ops_data;
514         struct imesio_req *ioreq;
515         int ret = 0;
516
517         /* Exit if there are no (new) events to commit
518            or if the previous committed event haven't been retrieved */
519         if (!ime_d->queued || ime_d->events)
520                 return 0;
521
522         ioreq = ime_d->sioreq;
523         ime_d->events = ime_d->queued;
524         if (ioreq->ddir == DDIR_READ)
525                 ret = ime_native_preadv(ioreq->fd, ime_d->iovecs, ime_d->queued, ioreq->offset);
526         else
527                 ret = ime_native_pwritev(ioreq->fd, ime_d->iovecs, ime_d->queued, ioreq->offset);
528
529         dprint(FD_IO, "committed %d iovecs\n", ime_d->queued);
530
531         return fio_ime_psyncv_end(td, ret);
532 }
533
534 static int fio_ime_psyncv_getevents(struct thread_data *td, unsigned int min,
535                                 unsigned int max, const struct timespec *t)
536 {
537         struct ime_data *ime_d = td->io_ops_data;
538         struct io_u *io_u;
539         int events = 0;
540         unsigned int count;
541
542         if (ime_d->events) {
543                 for (count = 0; count < ime_d->events; count++) {
544                         io_u = ime_d->io_us[count];
545                         ime_d->event_io_us[events] = io_u;
546                         events++;
547                 }
548                 fio_ime_queue_reset(ime_d);
549         }
550
551         dprint(FD_IO, "getevents(%u,%u) ret=%d queued=%u events=%u\n",
552                 min, max, events, ime_d->queued, ime_d->events);
553         return events;
554 }
555
556 static int fio_ime_psyncv_init(struct thread_data *td)
557 {
558         struct ime_data *ime_d;
559
560         if (fio_ime_engine_init(td) < 0)
561                 return 1;
562
563         ime_d = calloc(1, sizeof(*ime_d));
564
565         ime_d->sioreq = malloc(sizeof(struct imesio_req));
566         ime_d->iovecs = malloc(td->o.iodepth * sizeof(struct iovec));
567         ime_d->io_us = malloc(2 * td->o.iodepth * sizeof(struct io_u *));
568         ime_d->event_io_us = ime_d->io_us + td->o.iodepth;
569
570         ime_d->depth = td->o.iodepth;
571
572         td->io_ops_data = ime_d;
573         return 0;
574 }
575
576 static void fio_ime_psyncv_clean(struct thread_data *td)
577 {
578         struct ime_data *ime_d = td->io_ops_data;
579
580         if (ime_d) {
581                 free(ime_d->sioreq);
582                 free(ime_d->iovecs);
583                 free(ime_d->io_us);
584                 free(ime_d);
585                 td->io_ops_data = NULL;
586         }
587
588         fio_ime_engine_finalize(td);
589 }
590
591
592 /**************************************************************
593  *           Private functions for non-blocking IOs
594  *
595  **************************************************************/
596
597 void fio_ime_aio_complete_cb  (struct ime_aiocb *aiocb, int err,
598                                                            ssize_t bytes)
599 {
600         struct imeaio_req *ioreq = (struct imeaio_req *) aiocb->user_context;
601
602         pthread_mutex_lock(&ioreq->status_mutex);
603         ioreq->status = err == 0 ? bytes : FIO_IME_REQ_ERROR;
604         pthread_mutex_unlock(&ioreq->status_mutex);
605
606         pthread_cond_signal(&ioreq->cond_endio);
607 }
608
609 static bool fio_ime_aio_can_queue (struct ime_data *ime_d, struct io_u *io_u)
610 {
611         /* So far we can queue in any case. */
612         return true;
613 }
614 static bool fio_ime_aio_can_append (struct ime_data *ime_d, struct io_u *io_u)
615 {
616         /* We can only append if:
617                 - The iovecs will be contiguous in the array
618                 - There is already a queued iovec
619                 - The offsets are contiguous
620                 - The ddir and fs are the same */
621         return (ime_d->head != 0 &&
622                         ime_d->queued - ime_d->events > 0 &&
623                         ime_d->last_offset == io_u->offset &&
624                         ime_d->last_req->ddir == io_u->ddir &&
625                         ime_d->last_req->iocb.fd == io_u->file->fd);
626 }
627
628 /* Before using this function, we should have already
629    ensured that the queue is not full */
630 static void fio_ime_aio_enqueue(struct ime_data *ime_d, struct io_u *io_u)
631 {
632         struct imeaio_req *ioreq = &ime_d->aioreqs[ime_d->head];
633         struct ime_aiocb *iocb = &ioreq->iocb;
634         struct iovec *iov = &ime_d->iovecs[ime_d->head];
635
636         iov->iov_base = io_u->xfer_buf;
637         iov->iov_len = io_u->xfer_buflen;
638
639         if (fio_ime_aio_can_append(ime_d, io_u))
640                 ime_d->last_req->iocb.iovcnt++;
641         else {
642                 ioreq->status = FIO_IME_IN_PROGRESS;
643                 ioreq->ddir = io_u->ddir;
644                 ime_d->last_req = ioreq;
645
646                 iocb->complete_cb = &fio_ime_aio_complete_cb;
647                 iocb->fd = io_u->file->fd;
648                 iocb->file_offset = io_u->offset;
649                 iocb->iov = iov;
650                 iocb->iovcnt = 1;
651                 iocb->flags = 0;
652                 iocb->user_context = (intptr_t) ioreq;
653         }
654
655         ime_d->io_us[ime_d->head] = io_u;
656         ime_d->last_offset = io_u->offset + io_u->xfer_buflen;
657         fio_ime_queue_incr(ime_d);
658 }
659
660 /* Tries to queue an IO. It will create a new request if the IO can't be
661    appended to the current request. It will fail if the queue can't contain
662    any more io_u/iovec. In this case, commit and then get_events need to be
663    called. */
664 static enum fio_q_status fio_ime_aio_queue(struct thread_data *td,
665                 struct io_u *io_u)
666 {
667         struct ime_data *ime_d = td->io_ops_data;
668
669         fio_ro_check(td, io_u);
670
671         dprint(FD_IO, "queue: ddir=%d at %u commit=%u queued=%u events=%u\n",
672                 io_u->ddir, ime_d->head, ime_d->cur_commit,
673                 ime_d->queued, ime_d->events);
674
675         if (ime_d->queued == ime_d->depth)
676                 return FIO_Q_BUSY;
677
678         if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
679                 if (!fio_ime_aio_can_queue(ime_d, io_u))
680                         return FIO_Q_BUSY;
681
682                 fio_ime_aio_enqueue(ime_d, io_u);
683                 return FIO_Q_QUEUED;
684         } else if (io_u->ddir == DDIR_SYNC) {
685                 if (ime_native_fsync(io_u->file->fd) < 0) {
686                         io_u->error = errno;
687                         td_verror(td, io_u->error, "fsync");
688                 }
689                 return FIO_Q_COMPLETED;
690         } else {
691                 io_u->error = EINVAL;
692                 td_verror(td, io_u->error, "wrong ddir");
693                 return FIO_Q_COMPLETED;
694         }
695 }
696
697 static int fio_ime_aio_commit(struct thread_data *td)
698 {
699         struct ime_data *ime_d = td->io_ops_data;
700         struct imeaio_req *ioreq;
701         int ret = 0;
702
703         /* Loop while there are events to commit */
704         while (ime_d->queued - ime_d->events) {
705                 ioreq = &ime_d->aioreqs[ime_d->cur_commit];
706                 if (ioreq->ddir == DDIR_READ)
707                         ret = ime_native_aio_read(&ioreq->iocb);
708                 else
709                         ret = ime_native_aio_write(&ioreq->iocb);
710
711                 fio_ime_queue_commit(ime_d, ioreq->iocb.iovcnt);
712
713                 /* fio needs a negative error code */
714                 if (ret < 0) {
715                         ioreq->status = FIO_IME_REQ_ERROR;
716                         return -errno;
717                 }
718
719                 io_u_mark_submit(td, ioreq->iocb.iovcnt);
720                 dprint(FD_IO, "committed %d iovecs commit=%u queued=%u events=%u\n",
721                         ioreq->iocb.iovcnt, ime_d->cur_commit,
722                         ime_d->queued, ime_d->events);
723         }
724
725         return 0;
726 }
727
728 static int fio_ime_aio_getevents(struct thread_data *td, unsigned int min,
729                                 unsigned int max, const struct timespec *t)
730 {
731         struct ime_data *ime_d = td->io_ops_data;
732         struct imeaio_req *ioreq;
733         struct io_u *io_u;
734         int events = 0;
735         unsigned int count;
736         ssize_t bytes;
737
738         while (ime_d->events) {
739                 ioreq = &ime_d->aioreqs[ime_d->tail];
740
741                 /* Break if we already got events, and if we will
742                    exceed max if we append the next events */
743                 if (events && events + ioreq->iocb.iovcnt > max)
744                         break;
745
746                 if (ioreq->status != FIO_IME_IN_PROGRESS) {
747
748                         bytes = ioreq->status;
749                         for (count = 0; count < ioreq->iocb.iovcnt; count++) {
750                                 io_u = ime_d->io_us[ime_d->tail];
751                                 ime_d->event_io_us[events] = io_u;
752                                 events++;
753                                 fio_ime_queue_red(ime_d);
754
755                                 if (ioreq->status == FIO_IME_REQ_ERROR)
756                                         io_u->error = EIO;
757                                 else {
758                                         io_u->resid = bytes > io_u->xfer_buflen ?
759                                                                         0 : io_u->xfer_buflen - bytes;
760                                         io_u->error = 0;
761                                         bytes -= io_u->xfer_buflen - io_u->resid;
762                                 }
763                         }
764                 } else {
765                         pthread_mutex_lock(&ioreq->status_mutex);
766                         while (ioreq->status == FIO_IME_IN_PROGRESS)
767                                 pthread_cond_wait(&ioreq->cond_endio, &ioreq->status_mutex);
768                         pthread_mutex_unlock(&ioreq->status_mutex);
769                 }
770
771         }
772
773         dprint(FD_IO, "getevents(%u,%u) ret=%d queued=%u events=%u\n", min, max,
774                 events, ime_d->queued, ime_d->events);
775         return events;
776 }
777
778 static int fio_ime_aio_init(struct thread_data *td)
779 {
780         struct ime_data *ime_d;
781         struct imeaio_req *ioreq;
782         unsigned int i;
783
784         if (fio_ime_engine_init(td) < 0)
785                 return 1;
786
787         ime_d = calloc(1, sizeof(*ime_d));
788
789         ime_d->aioreqs = malloc(td->o.iodepth * sizeof(struct imeaio_req));
790         ime_d->iovecs = malloc(td->o.iodepth * sizeof(struct iovec));
791         ime_d->io_us = malloc(2 * td->o.iodepth * sizeof(struct io_u *));
792         ime_d->event_io_us = ime_d->io_us + td->o.iodepth;
793
794         ime_d->depth = td->o.iodepth;
795         for (i = 0; i < ime_d->depth; i++) {
796                 ioreq = &ime_d->aioreqs[i];
797                 pthread_cond_init(&ioreq->cond_endio, NULL);
798                 pthread_mutex_init(&ioreq->status_mutex, NULL);
799         }
800
801         td->io_ops_data = ime_d;
802         return 0;
803 }
804
805 static void fio_ime_aio_clean(struct thread_data *td)
806 {
807         struct ime_data *ime_d = td->io_ops_data;
808         struct imeaio_req *ioreq;
809         unsigned int i;
810
811         if (ime_d) {
812                 for (i = 0; i < ime_d->depth; i++) {
813                         ioreq = &ime_d->aioreqs[i];
814                         pthread_cond_destroy(&ioreq->cond_endio);
815                         pthread_mutex_destroy(&ioreq->status_mutex);
816                 }
817                 free(ime_d->aioreqs);
818                 free(ime_d->iovecs);
819                 free(ime_d->io_us);
820                 free(ime_d);
821                 td->io_ops_data = NULL;
822         }
823
824         fio_ime_engine_finalize(td);
825 }
826
827
828 /**************************************************************
829  *                   IO engines definitions
830  *
831  **************************************************************/
832
833 /* The FIO_DISKLESSIO flag used for these engines is necessary to prevent
834    FIO from using POSIX calls. See fio_ime_open_file for more details. */
835
836 static struct ioengine_ops ioengine_prw = {
837         .name           = "ime_psync",
838         .version        = FIO_IOOPS_VERSION,
839         .setup          = fio_ime_setup,
840         .init           = fio_ime_engine_init,
841         .cleanup        = fio_ime_engine_finalize,
842         .queue          = fio_ime_psync_queue,
843         .open_file      = fio_ime_open_file,
844         .close_file     = fio_ime_close_file,
845         .get_file_size  = fio_ime_get_file_size,
846         .unlink_file    = fio_ime_unlink_file,
847         .flags          = FIO_SYNCIO | FIO_DISKLESSIO,
848 };
849
850 static struct ioengine_ops ioengine_pvrw = {
851         .name           = "ime_psyncv",
852         .version        = FIO_IOOPS_VERSION,
853         .setup          = fio_ime_setup,
854         .init           = fio_ime_psyncv_init,
855         .cleanup        = fio_ime_psyncv_clean,
856         .queue          = fio_ime_psyncv_queue,
857         .commit         = fio_ime_psyncv_commit,
858         .getevents      = fio_ime_psyncv_getevents,
859         .event          = fio_ime_event,
860         .open_file      = fio_ime_open_file,
861         .close_file     = fio_ime_close_file,
862         .get_file_size  = fio_ime_get_file_size,
863         .unlink_file    = fio_ime_unlink_file,
864         .flags          = FIO_SYNCIO | FIO_DISKLESSIO,
865 };
866
867 static struct ioengine_ops ioengine_aio = {
868         .name           = "ime_aio",
869         .version        = FIO_IOOPS_VERSION,
870         .setup          = fio_ime_setup,
871         .init           = fio_ime_aio_init,
872         .cleanup        = fio_ime_aio_clean,
873         .queue          = fio_ime_aio_queue,
874         .commit         = fio_ime_aio_commit,
875         .getevents      = fio_ime_aio_getevents,
876         .event          = fio_ime_event,
877         .open_file      = fio_ime_open_file,
878         .close_file     = fio_ime_close_file,
879         .get_file_size  = fio_ime_get_file_size,
880         .unlink_file    = fio_ime_unlink_file,
881         .flags          = FIO_DISKLESSIO,
882 };
883
884 static void fio_init fio_ime_register(void)
885 {
886         register_ioengine(&ioengine_prw);
887         register_ioengine(&ioengine_pvrw);
888         register_ioengine(&ioengine_aio);
889 }
890
891 static void fio_exit fio_ime_unregister(void)
892 {
893         unregister_ioengine(&ioengine_prw);
894         unregister_ioengine(&ioengine_pvrw);
895         unregister_ioengine(&ioengine_aio);
896
897         if (fio_ime_is_initialized && ime_native_finalize() < 0)
898                 log_err("Warning: IME did not finalize properly\n");
899 }