/*
 * FIO engines for DDN's Infinite Memory Engine.
 * This file defines 3 engines: ime_psync, ime_psyncv, and ime_aio
 *
 * Copyright (C) 2018 DataDirect Networks. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License,
 * version 2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */

/*
 * Some details about these engines are given below:
 *
 * ime_psync:
 * Most basic engine that issues calls to ime_native whenever an IO is queued.
 *
 * ime_psyncv:
 * This engine tries to queue the IOs (by creating iovecs) if asked by FIO (via
 * iodepth_batch). It refuses to queue when the iovecs can't be appended, and
 * waits for FIO to issue a commit. After a call to commit and get_events, new
 * IOs can be queued.
 *
 * ime_aio:
 * This engine tries to queue the IOs (by creating iovecs) if asked by FIO (via
 * iodepth_batch). When the iovecs can't be appended to the current request, a
 * new request for IME is created. These requests will be issued to IME when
 * commit is called. Unlike ime_psyncv, several requests can be in flight at
 * once; there is no need to wait for a request to terminate before creating a
 * new one.
 */
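
/*
 * Illustrative job file (an example added for clarity, not part of the
 * original source). It assumes an IME-backed path; the section name,
 * path and sizes below are placeholders only:
 *
 *	[ime-test]
 *	ioengine=ime_aio
 *	iodepth=16
 *	iodepth_batch=8
 *	rw=write
 *	bs=1m
 *	size=1g
 *	filename=/mnt/ime/testfile
 *
 * Any of the three engine names defined in this file (ime_psync,
 * ime_psyncv, ime_aio) can be used as the ioengine value.
 */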

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <linux/limits.h>
#include <ime_native.h>

#include "../fio.h"


/**************************************************************
 * Types and constants definitions
 *
 **************************************************************/

/* define constants for async IOs */
#define FIO_IME_IN_PROGRESS	-1
#define FIO_IME_REQ_ERROR	-2

/* This flag is used when some jobs were created using threads. In that
   case, IME can't be finalized in the engine-specific cleanup function,
   because other threads might still use IME. Instead, IME is finalized
   in the destructor (see fio_ime_unregister), only when the flag
   fio_ime_is_initialized is true (which means at least one thread has
   initialized IME). */
static bool fio_ime_is_initialized = false;

struct imesio_req {
        int             fd;             /* File descriptor */
        enum fio_ddir   ddir;           /* Type of IO (read or write) */
        off_t           offset;         /* File offset */
};
struct imeaio_req {
        struct ime_aiocb        iocb;           /* IME aio request */
        ssize_t                 status;         /* Status of the IME request */
        enum fio_ddir           ddir;           /* Type of IO (read or write) */
        pthread_cond_t          cond_endio;     /* Condition var to notify FIO */
        pthread_mutex_t         status_mutex;   /* Mutex for cond_endio */
};

/* This structure will be used for 2 engines: ime_psyncv and ime_aio */
struct ime_data {
        union {
                struct imeaio_req       *aioreqs;       /* array of aio requests */
                struct imesio_req       *sioreq;        /* pointer to the only syncio request */
        };
        struct iovec    *iovecs;        /* array of queued iovecs */
        struct io_u     **io_us;        /* array of queued io_u pointers */
        struct io_u     **event_io_us;  /* array of the events retrieved after get_events */
        unsigned int    queued;         /* iovecs/io_us in the queue */
        unsigned int    events;         /* number of committed iovecs/io_us */

        /* variables used to implement a "ring" queue */
        unsigned int    depth;          /* max entries in the queue */
        unsigned int    head;           /* index used to append */
        unsigned int    tail;           /* index used to pop */
        unsigned int    cur_commit;     /* index of the first uncommitted req */

        /* offset used by the last iovec (used to check if the iovecs can be appended) */
        unsigned long long      last_offset;

        /* The variables below are used for aio only */
        struct imeaio_req       *last_req;      /* last request awaiting commit */
};
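
/*
 * Illustrative layout of the ring queue above (added for clarity, not
 * part of the original source). With depth=8, after queueing 5 IOs,
 * committing the first 3 and retrieving no events yet, the indices
 * would look like this:
 *
 *	index:      0   1   2   3   4   5   6   7
 *	            |-committed-|--queued--|
 *	tail       = 0  (next event to pop)
 *	cur_commit = 3  (first uncommitted entry)
 *	head       = 5  (next free slot for an append)
 *	queued = 5, events = 3
 */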


/**************************************************************
 * Private functions for queueing/unqueueing
 *
 **************************************************************/

/* Append one entry: advance the head index and account for it */
static void fio_ime_queue_incr(struct ime_data *ime_d)
{
        ime_d->head = (ime_d->head + 1) % ime_d->depth;
        ime_d->queued++;
}

/* Pop one completed entry: advance the tail index and drop it from
   both the queued and event counts */
static void fio_ime_queue_red(struct ime_data *ime_d)
{
        ime_d->tail = (ime_d->tail + 1) % ime_d->depth;
        ime_d->queued--;
        ime_d->events--;
}

/* Mark iovcnt queued entries as committed, so that their events become
   retrievable by get_events */
static void fio_ime_queue_commit(struct ime_data *ime_d, int iovcnt)
{
        ime_d->cur_commit = (ime_d->cur_commit + iovcnt) % ime_d->depth;
        ime_d->events += iovcnt;
}

static void fio_ime_queue_reset(struct ime_data *ime_d)
{
        ime_d->head = 0;
        ime_d->tail = 0;
        ime_d->cur_commit = 0;
        ime_d->queued = 0;
        ime_d->events = 0;
}

/**************************************************************
 * General IME functions
 * (needed for both sync and async IOs)
 **************************************************************/

static char *fio_set_ime_filename(char *filename)
{
        static __thread char ime_filename[PATH_MAX];
        int ret;

        ret = snprintf(ime_filename, PATH_MAX, "%s%s", DEFAULT_IME_FILE_PREFIX, filename);
        if (ret < PATH_MAX)
                return ime_filename;

        return NULL;
}

static int fio_ime_get_file_size(struct thread_data *td, struct fio_file *f)
{
        struct stat buf;
        int ret;
        char *ime_filename;

        dprint(FD_FILE, "get file size %s\n", f->file_name);

        ime_filename = fio_set_ime_filename(f->file_name);
        if (ime_filename == NULL)
                return 1;
        ret = ime_native_stat(ime_filename, &buf);
        if (ret == -1) {
                td_verror(td, errno, "fstat");
                return 1;
        }

        f->real_file_size = buf.st_size;
        return 0;
}

/* This function mimics the generic_file_open function, but issues
   IME native calls instead of POSIX calls. */
static int fio_ime_open_file(struct thread_data *td, struct fio_file *f)
{
        int flags = 0;
        int ret;
        uint64_t desired_fs;
        char *ime_filename;

        dprint(FD_FILE, "fd open %s\n", f->file_name);

        if (td_trim(td)) {
                td_verror(td, EINVAL, "IME does not support TRIM operation");
                return 1;
        }

        if (td->o.odirect)
                flags |= O_DIRECT;
        flags |= td->o.sync_io;
        if (td->o.create_on_open && td->o.allow_create)
                flags |= O_CREAT;

        if (td_write(td)) {
                if (!read_only)
                        flags |= O_RDWR;

                if (td->o.allow_create)
                        flags |= O_CREAT;
        } else if (td_read(td)) {
                flags |= O_RDONLY;
        } else {
                /* We should never go here. */
                td_verror(td, EINVAL, "Unsupported open mode");
                return 1;
        }

        ime_filename = fio_set_ime_filename(f->file_name);
        if (ime_filename == NULL)
                return 1;
        f->fd = ime_native_open(ime_filename, flags, 0600);
        if (f->fd == -1) {
                char buf[FIO_VERROR_SIZE];
                int __e = errno;

                snprintf(buf, sizeof(buf), "open(%s)", f->file_name);
                td_verror(td, __e, buf);
                return 1;
        }

        /* Now we need to make sure the real file size is sufficient for FIO
           to do its job. This is normally done before the file open function
           is called, but because FIO would use POSIX calls, we need to do it
           ourselves. */
        ret = fio_ime_get_file_size(td, f);
        if (ret) {
                ime_native_close(f->fd);
                td_verror(td, errno, "ime_get_file_size");
                return 1;
        }

        desired_fs = f->io_size + f->file_offset;
        if (td_write(td)) {
                dprint(FD_FILE, "Laying out file %s%s\n",
                        DEFAULT_IME_FILE_PREFIX, f->file_name);
                if (!td->o.create_on_open &&
                                f->real_file_size < desired_fs &&
                                ime_native_ftruncate(f->fd, desired_fs) < 0) {
                        ime_native_close(f->fd);
                        td_verror(td, errno, "ime_native_ftruncate");
                        return 1;
                }
                if (f->real_file_size < desired_fs)
                        f->real_file_size = desired_fs;
        } else if (td_read(td) && f->real_file_size < desired_fs) {
                ime_native_close(f->fd);
                log_err("error: can't read %llu bytes from file with "
                        "%llu bytes\n", (unsigned long long) desired_fs,
                        (unsigned long long) f->real_file_size);
                return 1;
        }

        return 0;
}

static int fio_ime_close_file(struct thread_data fio_unused *td, struct fio_file *f)
{
        int ret = 0;

        dprint(FD_FILE, "fd close %s\n", f->file_name);

        if (ime_native_close(f->fd) < 0)
                ret = errno;

        f->fd = -1;
        return ret;
}

static int fio_ime_unlink_file(struct thread_data *td, struct fio_file *f)
{
        char *ime_filename = fio_set_ime_filename(f->file_name);
        int ret;

        if (ime_filename == NULL)
                return 1;

        ret = unlink(ime_filename);
        return ret < 0 ? errno : 0;
}

static struct io_u *fio_ime_event(struct thread_data *td, int event)
{
        struct ime_data *ime_d = td->io_ops_data;

        return ime_d->event_io_us[event];
}

/* This setup function is used in place of get_file_sizes when setting up
   the files. Instead, we set real_file_size to 0 for each file. This way
   we can avoid calling ime_native_init before the forks are created. */
static int fio_ime_setup(struct thread_data *td)
{
        struct fio_file *f;
        unsigned int i;

        for_each_file(td, f, i) {
                dprint(FD_FILE, "setup: set file size to 0 for %p/%d/%s\n",
                        f, i, f->file_name);
                f->real_file_size = 0;
        }

        return 0;
}

static int fio_ime_engine_init(struct thread_data *td)
{
        struct fio_file *f;
        unsigned int i;

        dprint(FD_IO, "ime engine init\n");
        if (fio_ime_is_initialized && !td->o.use_thread) {
                log_err("Warning: something might go wrong. Not all threads/forks were"
                        " created before the FIO jobs were initialized.\n");
        }

        ime_native_init();
        fio_ime_is_initialized = true;

        /* We have to temporarily set real_file_size so that
           FIO can initialize properly. It will be corrected
           on file open. */
        for_each_file(td, f, i)
                f->real_file_size = f->io_size + f->file_offset;

        return 0;
}

static void fio_ime_engine_finalize(struct thread_data *td)
{
        /* Only finalize IME when using forks */
        if (!td->o.use_thread) {
                if (ime_native_finalize() < 0)
                        log_err("error in ime_native_finalize\n");
                fio_ime_is_initialized = false;
        }
}


/**************************************************************
 * Private functions for blocking IOs
 * (without iovecs)
 **************************************************************/

/* Notice: this function comes from the sync engine */
/* It is used by the commit function to return a proper code and fill
   some attributes in the io_u used for the IO. */
static int fio_ime_psync_end(struct thread_data *td, struct io_u *io_u, ssize_t ret)
{
        if (ret != (ssize_t) io_u->xfer_buflen) {
                if (ret >= 0) {
                        io_u->resid = io_u->xfer_buflen - ret;
                        io_u->error = 0;
                        return FIO_Q_COMPLETED;
                } else
                        io_u->error = errno;
        }

        if (io_u->error) {
                io_u_log_error(td, io_u);
                td_verror(td, io_u->error, "xfer");
        }

        return FIO_Q_COMPLETED;
}

static enum fio_q_status fio_ime_psync_queue(struct thread_data *td,
                                             struct io_u *io_u)
{
        struct fio_file *f = io_u->file;
        ssize_t ret;

        fio_ro_check(td, io_u);

        if (io_u->ddir == DDIR_READ)
                ret = ime_native_pread(f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
        else if (io_u->ddir == DDIR_WRITE)
                ret = ime_native_pwrite(f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
        else if (io_u->ddir == DDIR_SYNC)
                ret = ime_native_fsync(f->fd);
        else {
                ret = io_u->xfer_buflen;
                io_u->error = EINVAL;
        }

        return fio_ime_psync_end(td, io_u, ret);
}


/**************************************************************
 * Private functions for blocking IOs
 * (with iovecs)
 **************************************************************/

static bool fio_ime_psyncv_can_queue(struct ime_data *ime_d, struct io_u *io_u)
{
        /* We can only queue if:
           - There are no queued iovecs
           - Or if there is at least one:
             - There must be no event waiting for retrieval
             - The offsets must be contiguous
             - The ddir and fd must be the same */
        return (ime_d->queued == 0 || (
                ime_d->events == 0 &&
                ime_d->last_offset == io_u->offset &&
                ime_d->sioreq->ddir == io_u->ddir &&
                ime_d->sioreq->fd == io_u->file->fd));
}

/* Before using this function, we should have already
   ensured that the queue is not full */
static void fio_ime_psyncv_enqueue(struct ime_data *ime_d, struct io_u *io_u)
{
        struct imesio_req *ioreq = ime_d->sioreq;
        struct iovec *iov = &ime_d->iovecs[ime_d->head];

        iov->iov_base = io_u->xfer_buf;
        iov->iov_len = io_u->xfer_buflen;

        if (ime_d->queued == 0) {
                ioreq->offset = io_u->offset;
                ioreq->ddir = io_u->ddir;
                ioreq->fd = io_u->file->fd;
        }

        ime_d->io_us[ime_d->head] = io_u;
        ime_d->last_offset = io_u->offset + io_u->xfer_buflen;
        fio_ime_queue_incr(ime_d);
}

/* Tries to queue an IO. It will fail if the IO can't be appended to the
   current request or if the current request has been committed but not
   yet retrieved by get_events. */
static enum fio_q_status fio_ime_psyncv_queue(struct thread_data *td,
                                              struct io_u *io_u)
{
        struct ime_data *ime_d = td->io_ops_data;

        fio_ro_check(td, io_u);

        if (ime_d->queued == ime_d->depth)
                return FIO_Q_BUSY;

        if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
                if (!fio_ime_psyncv_can_queue(ime_d, io_u))
                        return FIO_Q_BUSY;

                dprint(FD_IO, "queue: ddir=%d at %u commit=%u queued=%u events=%u\n",
                        io_u->ddir, ime_d->head, ime_d->cur_commit,
                        ime_d->queued, ime_d->events);
                fio_ime_psyncv_enqueue(ime_d, io_u);
                return FIO_Q_QUEUED;
        } else if (io_u->ddir == DDIR_SYNC) {
                if (ime_native_fsync(io_u->file->fd) < 0) {
                        io_u->error = errno;
                        td_verror(td, io_u->error, "fsync");
                }
                return FIO_Q_COMPLETED;
        } else {
                io_u->error = EINVAL;
                td_verror(td, io_u->error, "wrong ddir");
                return FIO_Q_COMPLETED;
        }
}

/* Notice: this function comes from the sync engine */
/* It is used by the commit function to return a proper code and fill
   some attributes in the io_us appended to the current request. */
static int fio_ime_psyncv_end(struct thread_data *td, ssize_t bytes)
{
        struct ime_data *ime_d = td->io_ops_data;
        struct io_u *io_u;
        unsigned int i;
        int err = errno;

        for (i = 0; i < ime_d->queued; i++) {
                io_u = ime_d->io_us[i];

                if (bytes == -1)
                        io_u->error = err;
                else {
                        unsigned int this_io;

                        this_io = bytes;
                        if (this_io > io_u->xfer_buflen)
                                this_io = io_u->xfer_buflen;

                        io_u->resid = io_u->xfer_buflen - this_io;
                        io_u->error = 0;
                        bytes -= this_io;
                }
        }

        if (bytes == -1) {
                td_verror(td, err, "xfer psyncv");
                return -err;
        }

        return 0;
}

/* Commits the current request by calling ime_native (with one or several
   iovecs). After this commit, the corresponding events (one per iovec)
   can be retrieved by get_events. */
static int fio_ime_psyncv_commit(struct thread_data *td)
{
        struct ime_data *ime_d = td->io_ops_data;
        struct imesio_req *ioreq;
        int ret = 0;

        /* Exit if there are no (new) events to commit
           or if the previously committed events haven't been retrieved */
        if (!ime_d->queued || ime_d->events)
                return 0;

        ioreq = ime_d->sioreq;
        ime_d->events = ime_d->queued;
        if (ioreq->ddir == DDIR_READ)
                ret = ime_native_preadv(ioreq->fd, ime_d->iovecs, ime_d->queued, ioreq->offset);
        else
                ret = ime_native_pwritev(ioreq->fd, ime_d->iovecs, ime_d->queued, ioreq->offset);

        dprint(FD_IO, "committed %d iovecs\n", ime_d->queued);

        return fio_ime_psyncv_end(td, ret);
}

static int fio_ime_psyncv_getevents(struct thread_data *td, unsigned int min,
                                    unsigned int max, const struct timespec *t)
{
        struct ime_data *ime_d = td->io_ops_data;
        struct io_u *io_u;
        int events = 0;
        unsigned int count;

        if (ime_d->events) {
                for (count = 0; count < ime_d->events; count++) {
                        io_u = ime_d->io_us[count];
                        ime_d->event_io_us[events] = io_u;
                        events++;
                }
                fio_ime_queue_reset(ime_d);
        }

        dprint(FD_IO, "getevents(%u,%u) ret=%d queued=%u events=%u\n",
                min, max, events, ime_d->queued, ime_d->events);
        return events;
}

static int fio_ime_psyncv_init(struct thread_data *td)
{
        struct ime_data *ime_d;

        if (fio_ime_engine_init(td) < 0)
                return 1;

        ime_d = calloc(1, sizeof(*ime_d));

        ime_d->sioreq = malloc(sizeof(struct imesio_req));
        ime_d->iovecs = malloc(td->o.iodepth * sizeof(struct iovec));
        ime_d->io_us = malloc(2 * td->o.iodepth * sizeof(struct io_u *));
        ime_d->event_io_us = ime_d->io_us + td->o.iodepth;

        ime_d->depth = td->o.iodepth;

        td->io_ops_data = ime_d;
        return 0;
}

static void fio_ime_psyncv_clean(struct thread_data *td)
{
        struct ime_data *ime_d = td->io_ops_data;

        if (ime_d) {
                free(ime_d->sioreq);
                free(ime_d->iovecs);
                free(ime_d->io_us);
                free(ime_d);
                td->io_ops_data = NULL;
        }

        fio_ime_engine_finalize(td);
}


/**************************************************************
 * Private functions for non-blocking IOs
 *
 **************************************************************/

void fio_ime_aio_complete_cb(struct ime_aiocb *aiocb, int err,
                             ssize_t bytes)
{
        struct imeaio_req *ioreq = (struct imeaio_req *) aiocb->user_context;

        pthread_mutex_lock(&ioreq->status_mutex);
        ioreq->status = err == 0 ? bytes : FIO_IME_REQ_ERROR;
        pthread_mutex_unlock(&ioreq->status_mutex);

        pthread_cond_signal(&ioreq->cond_endio);
}

static bool fio_ime_aio_can_queue(struct ime_data *ime_d, struct io_u *io_u)
{
        /* So far we can queue in any case. */
        return true;
}

static bool fio_ime_aio_can_append(struct ime_data *ime_d, struct io_u *io_u)
{
        /* We can only append if:
           - The iovecs will be contiguous in the array
           - There is already a queued iovec
           - The offsets are contiguous
           - The ddir and fd are the same */
        return (ime_d->head != 0 &&
                ime_d->queued - ime_d->events > 0 &&
                ime_d->last_offset == io_u->offset &&
                ime_d->last_req->ddir == io_u->ddir &&
                ime_d->last_req->iocb.fd == io_u->file->fd);
}

/* Before using this function, we should have already
   ensured that the queue is not full */
static void fio_ime_aio_enqueue(struct ime_data *ime_d, struct io_u *io_u)
{
        struct imeaio_req *ioreq = &ime_d->aioreqs[ime_d->head];
        struct ime_aiocb *iocb = &ioreq->iocb;
        struct iovec *iov = &ime_d->iovecs[ime_d->head];

        iov->iov_base = io_u->xfer_buf;
        iov->iov_len = io_u->xfer_buflen;

        if (fio_ime_aio_can_append(ime_d, io_u))
                ime_d->last_req->iocb.iovcnt++;
        else {
                ioreq->status = FIO_IME_IN_PROGRESS;
                ioreq->ddir = io_u->ddir;
                ime_d->last_req = ioreq;

                iocb->complete_cb = &fio_ime_aio_complete_cb;
                iocb->fd = io_u->file->fd;
                iocb->file_offset = io_u->offset;
                iocb->iov = iov;
                iocb->iovcnt = 1;
                iocb->flags = 0;
                iocb->user_context = (intptr_t) ioreq;
        }

        ime_d->io_us[ime_d->head] = io_u;
        ime_d->last_offset = io_u->offset + io_u->xfer_buflen;
        fio_ime_queue_incr(ime_d);
}

/* Tries to queue an IO. It will create a new request if the IO can't be
   appended to the current request. It will fail if the queue can't contain
   any more io_u/iovec. In this case, commit and then get_events need to be
   called. */
static enum fio_q_status fio_ime_aio_queue(struct thread_data *td,
                                           struct io_u *io_u)
{
        struct ime_data *ime_d = td->io_ops_data;

        fio_ro_check(td, io_u);

        dprint(FD_IO, "queue: ddir=%d at %u commit=%u queued=%u events=%u\n",
                io_u->ddir, ime_d->head, ime_d->cur_commit,
                ime_d->queued, ime_d->events);

        if (ime_d->queued == ime_d->depth)
                return FIO_Q_BUSY;

        if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
                if (!fio_ime_aio_can_queue(ime_d, io_u))
                        return FIO_Q_BUSY;

                fio_ime_aio_enqueue(ime_d, io_u);
                return FIO_Q_QUEUED;
        } else if (io_u->ddir == DDIR_SYNC) {
                if (ime_native_fsync(io_u->file->fd) < 0) {
                        io_u->error = errno;
                        td_verror(td, io_u->error, "fsync");
                }
                return FIO_Q_COMPLETED;
        } else {
                io_u->error = EINVAL;
                td_verror(td, io_u->error, "wrong ddir");
                return FIO_Q_COMPLETED;
        }
}

static int fio_ime_aio_commit(struct thread_data *td)
{
        struct ime_data *ime_d = td->io_ops_data;
        struct imeaio_req *ioreq;
        int ret = 0;

        /* Loop while there are queued IOs that haven't been committed yet */
        while (ime_d->queued - ime_d->events) {
                ioreq = &ime_d->aioreqs[ime_d->cur_commit];
                if (ioreq->ddir == DDIR_READ)
                        ret = ime_native_aio_read(&ioreq->iocb);
                else
                        ret = ime_native_aio_write(&ioreq->iocb);

                fio_ime_queue_commit(ime_d, ioreq->iocb.iovcnt);

                /* fio needs a negative error code */
                if (ret < 0) {
                        ioreq->status = FIO_IME_REQ_ERROR;
                        return -errno;
                }

                io_u_mark_submit(td, ioreq->iocb.iovcnt);
                dprint(FD_IO, "committed %d iovecs commit=%u queued=%u events=%u\n",
                        ioreq->iocb.iovcnt, ime_d->cur_commit,
                        ime_d->queued, ime_d->events);
        }

        return 0;
}

static int fio_ime_aio_getevents(struct thread_data *td, unsigned int min,
                                 unsigned int max, const struct timespec *t)
{
        struct ime_data *ime_d = td->io_ops_data;
        struct imeaio_req *ioreq;
        struct io_u *io_u;
        int events = 0;
        unsigned int count;
        ssize_t bytes;

        while (ime_d->events) {
                ioreq = &ime_d->aioreqs[ime_d->tail];

                /* Stop if we already have some events and appending the
                   events of the next request would exceed max */
                if (events && events + ioreq->iocb.iovcnt > max)
                        break;

                if (ioreq->status != FIO_IME_IN_PROGRESS) {

                        bytes = ioreq->status;
                        for (count = 0; count < ioreq->iocb.iovcnt; count++) {
                                io_u = ime_d->io_us[ime_d->tail];
                                ime_d->event_io_us[events] = io_u;
                                events++;
                                fio_ime_queue_red(ime_d);

                                if (ioreq->status == FIO_IME_REQ_ERROR)
                                        io_u->error = EIO;
                                else {
                                        io_u->resid = bytes > io_u->xfer_buflen ?
                                                        0 : io_u->xfer_buflen - bytes;
                                        io_u->error = 0;
                                        bytes -= io_u->xfer_buflen - io_u->resid;
                                }
                        }
                } else {
                        pthread_mutex_lock(&ioreq->status_mutex);
                        while (ioreq->status == FIO_IME_IN_PROGRESS)
                                pthread_cond_wait(&ioreq->cond_endio, &ioreq->status_mutex);
                        pthread_mutex_unlock(&ioreq->status_mutex);
                }

        }

        dprint(FD_IO, "getevents(%u,%u) ret=%d queued=%u events=%u\n", min, max,
                events, ime_d->queued, ime_d->events);
        return events;
}

static int fio_ime_aio_init(struct thread_data *td)
{
        struct ime_data *ime_d;
        struct imeaio_req *ioreq;
        unsigned int i;

        if (fio_ime_engine_init(td) < 0)
                return 1;

        ime_d = calloc(1, sizeof(*ime_d));

        ime_d->aioreqs = malloc(td->o.iodepth * sizeof(struct imeaio_req));
        ime_d->iovecs = malloc(td->o.iodepth * sizeof(struct iovec));
        ime_d->io_us = malloc(2 * td->o.iodepth * sizeof(struct io_u *));
        ime_d->event_io_us = ime_d->io_us + td->o.iodepth;

        ime_d->depth = td->o.iodepth;
        for (i = 0; i < ime_d->depth; i++) {
                ioreq = &ime_d->aioreqs[i];
                pthread_cond_init(&ioreq->cond_endio, NULL);
                pthread_mutex_init(&ioreq->status_mutex, NULL);
        }

        td->io_ops_data = ime_d;
        return 0;
}

static void fio_ime_aio_clean(struct thread_data *td)
{
        struct ime_data *ime_d = td->io_ops_data;
        struct imeaio_req *ioreq;
        unsigned int i;

        if (ime_d) {
                for (i = 0; i < ime_d->depth; i++) {
                        ioreq = &ime_d->aioreqs[i];
                        pthread_cond_destroy(&ioreq->cond_endio);
                        pthread_mutex_destroy(&ioreq->status_mutex);
                }
                free(ime_d->aioreqs);
                free(ime_d->iovecs);
                free(ime_d->io_us);
                free(ime_d);
                td->io_ops_data = NULL;
        }

        fio_ime_engine_finalize(td);
}


/**************************************************************
 * IO engines definitions
 *
 **************************************************************/

/* The FIO_DISKLESSIO flag used for these engines is necessary to prevent
   FIO from using POSIX calls. See fio_ime_open_file for more details. */

static struct ioengine_ops ioengine_prw = {
        .name           = "ime_psync",
        .version        = FIO_IOOPS_VERSION,
        .setup          = fio_ime_setup,
        .init           = fio_ime_engine_init,
        .cleanup        = fio_ime_engine_finalize,
        .queue          = fio_ime_psync_queue,
        .open_file      = fio_ime_open_file,
        .close_file     = fio_ime_close_file,
        .get_file_size  = fio_ime_get_file_size,
        .unlink_file    = fio_ime_unlink_file,
        .flags          = FIO_SYNCIO | FIO_DISKLESSIO,
};

static struct ioengine_ops ioengine_pvrw = {
        .name           = "ime_psyncv",
        .version        = FIO_IOOPS_VERSION,
        .setup          = fio_ime_setup,
        .init           = fio_ime_psyncv_init,
        .cleanup        = fio_ime_psyncv_clean,
        .queue          = fio_ime_psyncv_queue,
        .commit         = fio_ime_psyncv_commit,
        .getevents      = fio_ime_psyncv_getevents,
        .event          = fio_ime_event,
        .open_file      = fio_ime_open_file,
        .close_file     = fio_ime_close_file,
        .get_file_size  = fio_ime_get_file_size,
        .unlink_file    = fio_ime_unlink_file,
        .flags          = FIO_SYNCIO | FIO_DISKLESSIO,
};

static struct ioengine_ops ioengine_aio = {
        .name           = "ime_aio",
        .version        = FIO_IOOPS_VERSION,
        .setup          = fio_ime_setup,
        .init           = fio_ime_aio_init,
        .cleanup        = fio_ime_aio_clean,
        .queue          = fio_ime_aio_queue,
        .commit         = fio_ime_aio_commit,
        .getevents      = fio_ime_aio_getevents,
        .event          = fio_ime_event,
        .open_file      = fio_ime_open_file,
        .close_file     = fio_ime_close_file,
        .get_file_size  = fio_ime_get_file_size,
        .unlink_file    = fio_ime_unlink_file,
        .flags          = FIO_DISKLESSIO,
};

static void fio_init fio_ime_register(void)
{
        register_ioengine(&ioengine_prw);
        register_ioengine(&ioengine_pvrw);
        register_ioengine(&ioengine_aio);
}

static void fio_exit fio_ime_unregister(void)
{
        unregister_ioengine(&ioengine_prw);
        unregister_ioengine(&ioengine_pvrw);
        unregister_ioengine(&ioengine_aio);

        if (fio_ime_is_initialized && ime_native_finalize() < 0)
                log_err("Warning: IME did not finalize properly\n");
}