Add support for DDN's Infinite Memory Engine
[fio.git] / engines / ime.c
1 /*
2  * FIO engines for DDN's Infinite Memory Engine.
3  * This file defines 3 engines: ime_psync, ime_psyncv, and ime_aio
4  *
5  * Copyright (C) 2018      DataDirect Networks. All rights reserved.
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License,
9  * version 2 as published by the Free Software Foundation..
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  */
16
17 /*
18  * Some details about the new engines are given below:
19  *
20  *
21  * ime_psync:
22  * Most basic engine that issues calls to ime_native whenever an IO is queued.
23  *
24  * ime_psyncv:
25  * This engine tries to queue the IOs (by creating iovecs) if asked by FIO (via
26  * iodepth_batch). It refuses to queue when the iovecs can't be appended, and
27  * waits for FIO to issue a commit. After a call to commit and get_events, new
28  * IOs can be queued.
29  *
30  * ime_aio:
31  * This engine tries to queue the IOs (by creating iovecs) if asked by FIO (via
32  * iodepth_batch). When the iovecs can't be appended to the current request, a
33  * new request for IME is created. These requests will be issued to IME when
34  * commit is called. Contrary to ime_psyncv, there can be several requests at
35  * once. We don't need to wait for a request to terminate before creating a new
36  * one.
37  */
38
39 #include <stdio.h>
40 #include <stdlib.h>
41 #include <errno.h>
42 #include <linux/limits.h>
43 #include <ime_native.h>
44
45 #include "../fio.h"
46
47
48 /**************************************************************
49  *              Types and constants definitions
50  *
51  **************************************************************/
52
53 /* define constants for async IOs */
54 #define FIO_IME_IN_PROGRESS -1
55 #define FIO_IME_REQ_ERROR   -2
56
57 /* This flag is used when some jobs were created using threads. In that
58    case, IME can't be finalized in the engine-specific cleanup function,
59    because other threads might still use IME. Instead, IME is finalized
60    in the destructor (see fio_ime_unregister), only when the flag
61    fio_ime_is_initialized is true (which means at least one thread has
62    initialized IME). */
63 static bool fio_ime_is_initialized = false;
64
65 struct imesio_req {
66         int                     fd;             /* File descriptor */
67         enum fio_ddir   ddir;   /* Type of IO (read or write) */
68         off_t                   offset; /* File offset */
69 };
70 struct imeaio_req {
71         struct ime_aiocb        iocb;                   /* IME aio request */
72         ssize_t                 status;                 /* Status of the IME request */
73         enum fio_ddir           ddir;                   /* Type of IO (read or write) */
74         pthread_cond_t          cond_endio;             /* Condition var to notify FIO */
75         pthread_mutex_t         status_mutex;   /* Mutex for cond_endio */
76 };
77
78 /* This structure will be used for 2 engines: ime_psyncv and ime_aio */
79 struct ime_data {
80         union {
81                 struct imeaio_req       *aioreqs;       /* array of aio requests */
82                 struct imesio_req       *sioreq;        /* pointer to the only syncio request */
83         };
84         struct iovec    *iovecs;                /* array of queued iovecs */
85         struct io_u     **io_us;                /* array of queued io_u pointers */
86         struct io_u     **event_io_us;  /* array of the events retieved afer get_events*/
87         unsigned int    queued;                 /* iovecs/io_us in the queue */
88         unsigned int    events;                 /* number of committed iovecs/io_us */
89
90         /* variables used to implement a "ring" queue */
91         unsigned int depth;                     /* max entries in the queue */
92         unsigned int head;                      /* index used to append */
93         unsigned int tail;                      /* index used to pop */
94         unsigned int cur_commit;        /* index of the first uncommitted req */
95
96         /* offset used by the last iovec (used to check if the iovecs can be appended)*/
97         unsigned long long      last_offset;
98
99         /* The variables below are used for aio only */
100         struct imeaio_req       *last_req; /* last request awaiting committing */
101 };
102
103
104 /**************************************************************
105  *         Private functions for queueing/unqueueing
106  *
107  **************************************************************/
108
109 static void fio_ime_queue_incr (struct ime_data *ime_d)
110 {
111         ime_d->head = (ime_d->head + 1) % ime_d->depth;
112         ime_d->queued++;
113 }
114 static void fio_ime_queue_red (struct ime_data *ime_d)
115 {
116         ime_d->tail = (ime_d->tail + 1) % ime_d->depth;
117         ime_d->queued--;
118         ime_d->events--;
119 }
120 static void fio_ime_queue_commit (struct ime_data *ime_d, int iovcnt)
121 {
122         ime_d->cur_commit = (ime_d->cur_commit + iovcnt) % ime_d->depth;
123         ime_d->events += iovcnt;
124 }
125 static void fio_ime_queue_reset (struct ime_data *ime_d)
126 {
127         ime_d->head = 0;
128         ime_d->tail = 0;
129         ime_d->cur_commit = 0;
130         ime_d->queued = 0;
131         ime_d->events = 0;
132 }
133
134
135 /**************************************************************
136  *                   General IME functions
137  *             (needed for both sync and async IOs)
138  **************************************************************/
139
140 static char *fio_set_ime_filename(char* filename)
141 {
142         static __thread char ime_filename[PATH_MAX];
143         if (snprintf(ime_filename, PATH_MAX, "%s%s", DEFAULT_IME_FILE_PREFIX, filename) < PATH_MAX)
144                 return ime_filename;
145         else
146                 return NULL;
147 }
148
149 static int fio_ime_get_file_size(struct thread_data *td, struct fio_file *f)
150 {
151         struct stat buf;
152         int ret;
153         char *ime_filename;
154
155         dprint(FD_FILE, "get file size %s\n", f->file_name);
156
157         ime_filename = fio_set_ime_filename(f->file_name);
158         if (ime_filename == NULL)
159                 return 1;
160         ret = ime_native_stat(ime_filename, &buf);
161         if (ret == -1) {
162                 td_verror(td, errno, "fstat");
163                 return 1;
164         }
165
166         f->real_file_size = buf.st_size;
167         return 0;
168 }
169
170 /* This functions mimics the generic_file_open function, but issues
171    IME native calls instead of POSIX calls. */
172 static int fio_ime_open_file(struct thread_data *td, struct fio_file *f)
173 {
174         int flags = 0;
175         int ret;
176         uint64_t desired_fs;
177         char *ime_filename;
178
179         dprint(FD_FILE, "fd open %s\n", f->file_name);
180
181         if (td_trim(td)) {
182                 td_verror(td, EINVAL, "IME does not support TRIM operation");
183                 return 1;
184         }
185
186         if (td->o.oatomic) {
187                 td_verror(td, EINVAL, "IME does not support atomic IO");
188                 return 1;
189         }
190         if (td->o.odirect)
191                 flags |= O_DIRECT;
192         if (td->o.sync_io)
193                 flags |= O_SYNC;
194         if (td->o.create_on_open && td->o.allow_create)
195                 flags |= O_CREAT;
196
197         if (td_write(td)) {
198                 if (!read_only)
199                         flags |= O_RDWR;
200
201                 if (td->o.allow_create)
202                         flags |= O_CREAT;
203         }
204         else if (td_read(td)) {
205                 flags |= O_RDONLY;
206         }
207         else {
208                 /* We should never go here. */
209                 td_verror(td, EINVAL, "Unsopported open mode");
210                 return 1;
211         }
212
213         ime_filename = fio_set_ime_filename(f->file_name);
214         if (ime_filename == NULL)
215                 return 1;
216         f->fd = ime_native_open(ime_filename, flags, 0600);
217         if (f->fd == -1) {
218                 char buf[FIO_VERROR_SIZE];
219                 int __e = errno;
220
221                 snprintf(buf, sizeof(buf), "open(%s)", f->file_name);
222                 td_verror(td, __e, buf);
223                 return 1;
224         }
225
226         /* Now we need to make sure the real file size is sufficient for FIO
227            to do its things. This is normally done before the file open function
228            is called, but because FIO would use POSIX calls, we need to do it
229            ourselves */
230         ret = fio_ime_get_file_size(td, f);
231         if (ret < 0) {
232                 ime_native_close(f->fd);
233                 td_verror(td, errno, "ime_get_file_size");
234                 return 1;
235         }
236
237         desired_fs = f->io_size + f->file_offset;
238         if (td_write(td)) {
239                 dprint(FD_FILE, "Laying out file %s%s\n",
240                         DEFAULT_IME_FILE_PREFIX, f->file_name);
241                 if (!td->o.create_on_open &&
242                                 f->real_file_size < desired_fs &&
243                                 ime_native_ftruncate(f->fd, desired_fs) < 0) {
244                         ime_native_close(f->fd);
245                         td_verror(td, errno, "ime_native_ftruncate");
246                         return 1;
247                 }
248                 if (f->real_file_size < desired_fs)
249                         f->real_file_size = desired_fs;
250         }
251         else if (td_read(td) && f->real_file_size < desired_fs) {
252                 ime_native_close(f->fd);
253                 log_err("error: can't read %lu bytes from file with "
254                                                 "%lu bytes\n", desired_fs, f->real_file_size);
255                 return 1;
256         }
257
258         return 0;
259 }
260
261 static int fio_ime_close_file(struct thread_data fio_unused *td, struct fio_file *f)
262 {
263         int ret = 0;
264
265         dprint(FD_FILE, "fd close %s\n", f->file_name);
266
267         if (ime_native_close(f->fd) < 0)
268                 ret = errno;
269
270         f->fd = -1;
271         return ret;
272 }
273
274 static int fio_ime_unlink_file(struct thread_data *td, struct fio_file *f)
275 {
276         int ret;
277
278         char *ime_filename = fio_set_ime_filename(f->file_name);
279         if (ime_filename == NULL)
280                 return 1;
281         ret = unlink(ime_filename);
282
283         return ret < 0 ? errno : 0;
284 }
285
286 static struct io_u *fio_ime_event(struct thread_data *td, int event)
287 {
288         struct ime_data *ime_d = td->io_ops_data;
289         return ime_d->event_io_us[event];
290 }
291
292 /* Setup file used to replace get_file_sizes when settin up the file.
293    Instead we will set real_file_sie to 0 for each file. This way we
294    can avoid calling ime_native_init before the forks are created. */
295 static int fio_ime_setup(struct thread_data *td)
296 {
297         struct fio_file *f;
298         unsigned int i;
299
300         for_each_file(td, f, i) {
301                 dprint(FD_FILE, "setup: set file size to 0 for %p/%d/%s\n",
302                         f, i, f->file_name);
303                 f->real_file_size = 0;
304         }
305
306         return 0;
307 }
308
309 static int fio_ime_engine_init(struct thread_data *td)
310 {
311         struct fio_file *f;
312         unsigned int i;
313
314         dprint(FD_IO, "ime engine init\n");
315         if (fio_ime_is_initialized && !td->o.use_thread) {
316                 log_err("Warning: something might go wrong. Not all threads/forks were"
317                                 " created before the FIO jobs were initialized.\n");
318         }
319
320         ime_native_init();
321         fio_ime_is_initialized = true;
322
323         /* We have to temporarily set real_file_size so that
324            FIO can initialize properly. It will be corrected
325            on file open. */
326         for_each_file(td, f, i)
327                 f->real_file_size = f->io_size + f->file_offset;
328
329         return 0;
330 }
331
332 static void fio_ime_engine_finalize(struct thread_data *td)
333 {
334         /* Only finalize IME when using forks */
335         if (!td->o.use_thread) {
336                 if (ime_native_finalize() < 0)
337                         log_err("error in ime_native_finalize\n");
338                 fio_ime_is_initialized = false;
339         }
340 }
341
342
343 /**************************************************************
344  *             Private functions for blocking IOs
345  *                     (without iovecs)
346  **************************************************************/
347
348 /* Notice: this function comes from the sync engine */
349 /* It is used by the commit function to return a proper code and fill
350    some attributes in the io_u used for the IO. */
351 static int fio_ime_psync_end(struct thread_data *td, struct io_u *io_u, ssize_t ret)
352 {
353         if (ret != (ssize_t) io_u->xfer_buflen) {
354                 if (ret >= 0) {
355                         io_u->resid = io_u->xfer_buflen - ret;
356                         io_u->error = 0;
357                         return FIO_Q_COMPLETED;
358                 } else
359                         io_u->error = errno;
360         }
361
362         if (io_u->error) {
363                 io_u_log_error(td, io_u);
364                 td_verror(td, io_u->error, "xfer");
365         }
366
367         return FIO_Q_COMPLETED;
368 }
369
370 static enum fio_q_status fio_ime_psync_queue(struct thread_data *td,
371                                            struct io_u *io_u)
372 {
373         struct fio_file *f = io_u->file;
374         ssize_t ret;
375
376         fio_ro_check(td, io_u);
377
378         if (io_u->ddir == DDIR_READ)
379                 ret = ime_native_pread(f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
380         else if (io_u->ddir == DDIR_WRITE)
381                 ret = ime_native_pwrite(f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
382         else if (io_u->ddir == DDIR_SYNC)
383                 ret = ime_native_fsync(f->fd);
384         else {
385                 ret = io_u->xfer_buflen;
386                 io_u->error = EINVAL;
387         }
388
389         return fio_ime_psync_end(td, io_u, ret);
390 }
391
392
393 /**************************************************************
394  *             Private functions for blocking IOs
395  *                       (with iovecs)
396  **************************************************************/
397
398 static bool fio_ime_psyncv_can_queue(struct ime_data *ime_d, struct io_u *io_u)
399 {
400         /* We can only queue if:
401           - There are no queued iovecs
402           - Or if there is at least one:
403                  - There must be no event waiting for retrieval
404                  - The offsets must be contiguous
405                  - The ddir and fd must be the same */
406         return (ime_d->queued == 0 || (
407                         ime_d->events == 0 &&
408                         ime_d->last_offset == io_u->offset &&
409                         ime_d->sioreq->ddir == io_u->ddir &&
410                         ime_d->sioreq->fd == io_u->file->fd));
411 }
412
413 /* Before using this function, we should have already
414    ensured that the queue is not full */
415 static void fio_ime_psyncv_enqueue(struct ime_data *ime_d, struct io_u *io_u)
416 {
417         struct imesio_req *ioreq = ime_d->sioreq;
418         struct iovec *iov = &ime_d->iovecs[ime_d->head];
419
420         iov->iov_base = io_u->xfer_buf;
421         iov->iov_len = io_u->xfer_buflen;
422
423         if (ime_d->queued == 0) {
424                 ioreq->offset = io_u->offset;
425                 ioreq->ddir = io_u->ddir;
426                 ioreq->fd = io_u->file->fd;
427         }
428
429         ime_d->io_us[ime_d->head] = io_u;
430         ime_d->last_offset = io_u->offset + io_u->xfer_buflen;
431         fio_ime_queue_incr(ime_d);
432 }
433
434 /* Tries to queue an IO. It will fail if the IO can't be appended to the
435    current request or if the current request has been committed but not
436    yet retrieved by get_events. */
437 static enum fio_q_status fio_ime_psyncv_queue(struct thread_data *td,
438         struct io_u *io_u)
439 {
440         struct ime_data *ime_d = td->io_ops_data;
441
442         fio_ro_check(td, io_u);
443
444         if (ime_d->queued == ime_d->depth)
445                 return FIO_Q_BUSY;
446
447         if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
448                 if (!fio_ime_psyncv_can_queue(ime_d, io_u))
449                         return FIO_Q_BUSY;
450
451                 dprint(FD_IO, "queue: ddir=%d at %u commit=%u queued=%u events=%u\n",
452                         io_u->ddir, ime_d->head, ime_d->cur_commit,
453                         ime_d->queued, ime_d->events);
454                 fio_ime_psyncv_enqueue(ime_d, io_u);
455                 return FIO_Q_QUEUED;
456         }
457         else if (io_u->ddir == DDIR_SYNC) {
458                 if (ime_native_fsync(io_u->file->fd) < 0) {
459                         io_u->error = errno;
460                         td_verror(td, io_u->error, "fsync");
461                 }
462                 return FIO_Q_COMPLETED;
463         } else {
464                 io_u->error = EINVAL;
465                 td_verror(td, io_u->error, "wrong ddir");
466                 return FIO_Q_COMPLETED;
467         }
468 }
469
470 /* Notice: this function comes from the sync engine */
471 /* It is used by the commit function to return a proper code and fill
472    some attributes in the io_us appended to the current request. */
473 static int fio_ime_psyncv_end(struct thread_data *td, ssize_t bytes)
474 {
475         struct ime_data *ime_d = td->io_ops_data;
476         struct io_u *io_u;
477         unsigned int i;
478         int err = errno;
479
480         for (i = 0; i < ime_d->queued; i++) {
481                 io_u = ime_d->io_us[i];
482
483                 if (bytes == -1)
484                         io_u->error = err;
485                 else {
486                         unsigned int this_io;
487
488                         this_io = bytes;
489                         if (this_io > io_u->xfer_buflen)
490                                 this_io = io_u->xfer_buflen;
491
492                         io_u->resid = io_u->xfer_buflen - this_io;
493                         io_u->error = 0;
494                         bytes -= this_io;
495                 }
496         }
497
498         if (bytes == -1) {
499                 td_verror(td, err, "xfer psyncv");
500                 return -err;
501         }
502
503         return 0;
504 }
505
506 /* Commits the current request by calling ime_native (with one or several
507    iovecs). After this commit, the corresponding events (one per iovec)
508    can be retrieved by get_events. */
509 static int fio_ime_psyncv_commit(struct thread_data *td)
510 {
511         struct ime_data *ime_d = td->io_ops_data;
512         struct imesio_req *ioreq;
513         int ret = 0;
514
515         /* Exit if there are no (new) events to commit
516            or if the previous committed event haven't been retrieved */
517         if (!ime_d->queued || ime_d->events)
518                 return 0;
519
520         ioreq = ime_d->sioreq;
521         ime_d->events = ime_d->queued;
522         if (ioreq->ddir == DDIR_READ)
523                 ret = ime_native_preadv(ioreq->fd, ime_d->iovecs, ime_d->queued, ioreq->offset);
524         else
525                 ret = ime_native_pwritev(ioreq->fd, ime_d->iovecs, ime_d->queued, ioreq->offset);
526
527         dprint(FD_IO, "committed %d iovecs\n", ime_d->queued);
528
529         return fio_ime_psyncv_end(td, ret);
530 }
531
532 static int fio_ime_psyncv_getevents(struct thread_data *td, unsigned int min,
533                                 unsigned int max, const struct timespec *t)
534 {
535         struct ime_data *ime_d = td->io_ops_data;
536         struct io_u *io_u;
537         int events = 0;
538         unsigned int count;
539
540         if (ime_d->events) {
541                 for (count = 0; count < ime_d->events; count++) {
542                         io_u = ime_d->io_us[count];
543                         ime_d->event_io_us[events] = io_u;
544                         events++;
545                 }
546                 fio_ime_queue_reset(ime_d);
547         }
548
549         dprint(FD_IO, "getevents(%u,%u) ret=%d queued=%u events=%u\n",
550                 min, max, events, ime_d->queued, ime_d->events);
551         return events;
552 }
553
554 static int fio_ime_psyncv_init(struct thread_data *td)
555 {
556         struct ime_data *ime_d;
557
558         if (fio_ime_engine_init(td) < 0)
559                 return 1;
560
561         ime_d = calloc(1, sizeof(*ime_d));
562
563         ime_d->sioreq = malloc(sizeof(struct imesio_req));
564         ime_d->iovecs = malloc(td->o.iodepth * sizeof(struct iovec));
565         ime_d->io_us = malloc(2 * td->o.iodepth * sizeof(struct io_u *));
566         ime_d->event_io_us = ime_d->io_us + td->o.iodepth;
567
568         ime_d->depth = td->o.iodepth;
569
570         td->io_ops_data = ime_d;
571         return 0;
572 }
573
574 static void fio_ime_psyncv_clean(struct thread_data *td)
575 {
576         struct ime_data *ime_d = td->io_ops_data;
577
578         if (ime_d) {
579                 free(ime_d->sioreq);
580                 free(ime_d->iovecs);
581                 free(ime_d->io_us);
582                 free(ime_d);
583                 td->io_ops_data = NULL;
584         }
585
586         fio_ime_engine_finalize(td);
587 }
588
589
590 /**************************************************************
591  *           Private functions for non-blocking IOs
592  *
593  **************************************************************/
594
595 void fio_ime_aio_complete_cb  (struct ime_aiocb *aiocb, int err,
596                                                            ssize_t bytes)
597 {
598         struct imeaio_req *ioreq = (struct imeaio_req *) aiocb->user_context;
599
600         pthread_mutex_lock(&ioreq->status_mutex);
601         ioreq->status = err == 0 ? bytes : FIO_IME_REQ_ERROR;
602         pthread_mutex_unlock(&ioreq->status_mutex);
603
604         pthread_cond_signal(&ioreq->cond_endio);
605 }
606
607 static bool fio_ime_aio_can_queue (struct ime_data *ime_d, struct io_u *io_u)
608 {
609         /* So far we can queue in any case. */
610         return true;
611 }
612 static bool fio_ime_aio_can_append (struct ime_data *ime_d, struct io_u *io_u)
613 {
614         /* We can only append if:
615                 - The iovecs will be contiguous in the array
616                 - There is already a queued iovec
617                 - The offsets are contiguous
618                 - The ddir and fs are the same */
619         return (ime_d->head != 0 &&
620                         ime_d->queued - ime_d->events > 0 &&
621                         ime_d->last_offset == io_u->offset &&
622                         ime_d->last_req->ddir == io_u->ddir &&
623                         ime_d->last_req->iocb.fd == io_u->file->fd);
624 }
625
626 /* Before using this function, we should have already
627    ensured that the queue is not full */
628 static void fio_ime_aio_enqueue(struct ime_data *ime_d, struct io_u *io_u)
629 {
630         struct imeaio_req *ioreq = &ime_d->aioreqs[ime_d->head];
631         struct ime_aiocb *iocb = &ioreq->iocb;
632         struct iovec *iov = &ime_d->iovecs[ime_d->head];
633
634         iov->iov_base = io_u->xfer_buf;
635         iov->iov_len = io_u->xfer_buflen;
636
637         if (fio_ime_aio_can_append(ime_d, io_u))
638                 ime_d->last_req->iocb.iovcnt++;
639         else {
640                 ioreq->status = FIO_IME_IN_PROGRESS;
641                 ioreq->ddir = io_u->ddir;
642                 ime_d->last_req = ioreq;
643
644                 iocb->complete_cb = &fio_ime_aio_complete_cb;
645                 iocb->fd = io_u->file->fd;
646                 iocb->file_offset = io_u->offset;
647                 iocb->iov = iov;
648                 iocb->iovcnt = 1;
649                 iocb->flags = 0;
650                 iocb->user_context = (intptr_t) ioreq;
651         }
652
653         ime_d->io_us[ime_d->head] = io_u;
654         ime_d->last_offset = io_u->offset + io_u->xfer_buflen;
655         fio_ime_queue_incr(ime_d);
656 }
657
658 /* Tries to queue an IO. It will create a new request if the IO can't be
659    appended to the current request. It will fail if the queue can't contain
660    any more io_u/iovec. In this case, commit and then get_events need to be
661    called. */
662 static enum fio_q_status fio_ime_aio_queue(struct thread_data *td,
663                 struct io_u *io_u)
664 {
665         struct ime_data *ime_d = td->io_ops_data;
666
667         fio_ro_check(td, io_u);
668
669         dprint(FD_IO, "queue: ddir=%d at %u commit=%u queued=%u events=%u\n",
670                 io_u->ddir, ime_d->head, ime_d->cur_commit,
671                 ime_d->queued, ime_d->events);
672
673         if (ime_d->queued == ime_d->depth)
674                 return FIO_Q_BUSY;
675
676         if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
677                 if (!fio_ime_aio_can_queue(ime_d, io_u))
678                         return FIO_Q_BUSY;
679
680                 fio_ime_aio_enqueue(ime_d, io_u);
681                 return FIO_Q_QUEUED;
682         }
683         else if (io_u->ddir == DDIR_SYNC) {
684                 if (ime_native_fsync(io_u->file->fd) < 0) {
685                         io_u->error = errno;
686                         td_verror(td, io_u->error, "fsync");
687                 }
688                 return FIO_Q_COMPLETED;
689         } else {
690                 io_u->error = EINVAL;
691                 td_verror(td, io_u->error, "wrong ddir");
692                 return FIO_Q_COMPLETED;
693         }
694 }
695
696 static int fio_ime_aio_commit(struct thread_data *td)
697 {
698         struct ime_data *ime_d = td->io_ops_data;
699         struct imeaio_req *ioreq;
700         int ret = 0;
701
702         /* Loop while there are events to commit */
703         while (ime_d->queued - ime_d->events) {
704                 ioreq = &ime_d->aioreqs[ime_d->cur_commit];
705                 if (ioreq->ddir == DDIR_READ)
706                         ret = ime_native_aio_read(&ioreq->iocb);
707                 else
708                         ret = ime_native_aio_write(&ioreq->iocb);
709
710                 fio_ime_queue_commit(ime_d, ioreq->iocb.iovcnt);
711
712                 /* fio needs a negative error code */
713                 if (ret < 0) {
714                         ioreq->status = FIO_IME_REQ_ERROR;
715                         return -errno;
716                 }
717
718                 io_u_mark_submit(td, ioreq->iocb.iovcnt);
719                 dprint(FD_IO, "committed %d iovecs commit=%u queued=%u events=%u\n",
720                         ioreq->iocb.iovcnt, ime_d->cur_commit,
721                         ime_d->queued, ime_d->events);
722         }
723
724         return 0;
725 }
726
727 static int fio_ime_aio_getevents(struct thread_data *td, unsigned int min,
728                                 unsigned int max, const struct timespec *t)
729 {
730         struct ime_data *ime_d = td->io_ops_data;
731         struct imeaio_req *ioreq;
732         struct io_u *io_u;
733         int events = 0;
734         unsigned int count;
735         ssize_t bytes;
736
737         while (ime_d->events) {
738                 ioreq = &ime_d->aioreqs[ime_d->tail];
739
740                 /* Break if we already got events, and if we will
741                    exceed max if we append the next events */
742                 if (events && events + ioreq->iocb.iovcnt > max)
743                         break;
744
745                 if (ioreq->status != FIO_IME_IN_PROGRESS) {
746
747                         bytes = ioreq->status;
748                         for (count = 0; count < ioreq->iocb.iovcnt; count++) {
749                                 io_u = ime_d->io_us[ime_d->tail];
750                                 ime_d->event_io_us[events] = io_u;
751                                 events++;
752                                 fio_ime_queue_red(ime_d);
753
754                                 if (ioreq->status == FIO_IME_REQ_ERROR)
755                                         io_u->error = EIO;
756                                 else {
757                                         io_u->resid = bytes > io_u->xfer_buflen ?
758                                                                         0 : io_u->xfer_buflen - bytes;
759                                         io_u->error = 0;
760                                         bytes -= io_u->xfer_buflen - io_u->resid;
761                                 }
762                         }
763                 } else {
764                         pthread_mutex_lock(&ioreq->status_mutex);
765                         while (ioreq->status == FIO_IME_IN_PROGRESS) {
766                                 pthread_cond_wait(&ioreq->cond_endio, &ioreq->status_mutex);
767                         }
768                         pthread_mutex_unlock(&ioreq->status_mutex);
769                 }
770
771         }
772
773         dprint(FD_IO, "getevents(%u,%u) ret=%d queued=%u events=%u\n", min, max,
774                 events, ime_d->queued, ime_d->events);
775         return events;
776 }
777
778 static int fio_ime_aio_init(struct thread_data *td)
779 {
780         struct ime_data *ime_d;
781         struct imeaio_req *ioreq;
782         unsigned int i;
783
784         if (fio_ime_engine_init(td) < 0)
785                 return 1;
786
787         ime_d = calloc(1, sizeof(*ime_d));
788
789         ime_d->aioreqs = malloc(td->o.iodepth * sizeof(struct imeaio_req));
790         ime_d->iovecs = malloc(td->o.iodepth * sizeof(struct iovec));
791         ime_d->io_us = malloc(2 * td->o.iodepth * sizeof(struct io_u *));
792         ime_d->event_io_us = ime_d->io_us + td->o.iodepth;
793
794         ime_d->depth = td->o.iodepth;
795         for (i = 0; i < ime_d->depth; i++) {
796                 ioreq = &ime_d->aioreqs[i];
797                 pthread_cond_init(&ioreq->cond_endio, NULL);
798                 pthread_mutex_init(&ioreq->status_mutex, NULL);
799         }
800
801         td->io_ops_data = ime_d;
802         return 0;
803 }
804
805 static void fio_ime_aio_clean(struct thread_data *td)
806 {
807         struct ime_data *ime_d = td->io_ops_data;
808         struct imeaio_req *ioreq;
809         unsigned int i;
810
811         if (ime_d) {
812                 for (i = 0; i < ime_d->depth; i++) {
813                         ioreq = &ime_d->aioreqs[i];
814                         pthread_cond_destroy(&ioreq->cond_endio);
815                         pthread_mutex_destroy(&ioreq->status_mutex);
816                 }
817                 free(ime_d->aioreqs);
818                 free(ime_d->iovecs);
819                 free(ime_d->io_us);
820                 free(ime_d);
821                 td->io_ops_data = NULL;
822         }
823
824         fio_ime_engine_finalize(td);
825 }
826
827
828 /**************************************************************
829  *                   IO engines definitions
830  *
831  **************************************************************/
832
833 /* The FIO_DISKLESSIO flag used for these engines is necessary to prevent
834    FIO from using POSIX calls. See fio_ime_open_file for more details. */
835
836 static struct ioengine_ops ioengine_prw = {
837         .name           = "ime_psync",
838         .version        = FIO_IOOPS_VERSION,
839         .setup          = fio_ime_setup,
840         .init           = fio_ime_engine_init,
841         .cleanup        = fio_ime_engine_finalize,
842         .queue          = fio_ime_psync_queue,
843         .open_file      = fio_ime_open_file,
844         .close_file     = fio_ime_close_file,
845         .get_file_size  = fio_ime_get_file_size,
846         .unlink_file    = fio_ime_unlink_file,
847         .flags          = FIO_SYNCIO | FIO_DISKLESSIO,
848 };
849
850 static struct ioengine_ops ioengine_pvrw = {
851         .name           = "ime_psyncv",
852         .version        = FIO_IOOPS_VERSION,
853         .setup          = fio_ime_setup,
854         .init           = fio_ime_psyncv_init,
855         .cleanup        = fio_ime_psyncv_clean,
856         .queue          = fio_ime_psyncv_queue,
857         .commit         = fio_ime_psyncv_commit,
858         .getevents      = fio_ime_psyncv_getevents,
859         .event          = fio_ime_event,
860         .open_file      = fio_ime_open_file,
861         .close_file     = fio_ime_close_file,
862         .get_file_size  = fio_ime_get_file_size,
863         .unlink_file    = fio_ime_unlink_file,
864         .flags          = FIO_SYNCIO | FIO_DISKLESSIO,
865 };
866
867 static struct ioengine_ops ioengine_aio = {
868         .name           = "ime_aio",
869         .version        = FIO_IOOPS_VERSION,
870         .setup          = fio_ime_setup,
871         .init           = fio_ime_aio_init,
872         .cleanup        = fio_ime_aio_clean,
873         .queue          = fio_ime_aio_queue,
874         .commit         = fio_ime_aio_commit,
875         .getevents      = fio_ime_aio_getevents,
876         .event          = fio_ime_event,
877         .open_file      = fio_ime_open_file,
878         .close_file     = fio_ime_close_file,
879         .get_file_size  = fio_ime_get_file_size,
880         .unlink_file    = fio_ime_unlink_file,
881         .flags          = FIO_DISKLESSIO,
882 };
883
884 static void fio_init fio_ime_register(void)
885 {
886         register_ioengine(&ioengine_prw);
887         register_ioengine(&ioengine_pvrw);
888         register_ioengine(&ioengine_aio);
889 }
890
891 static void fio_exit fio_ime_unregister(void)
892 {
893         unregister_ioengine(&ioengine_prw);
894         unregister_ioengine(&ioengine_pvrw);
895         unregister_ioengine(&ioengine_aio);
896         if (fio_ime_is_initialized && ime_native_finalize() < 0) {
897                 log_err("Warning: IME did not finalize properly\n");
898         }
899 }