1/*
2 * FIO engines for DDN's Infinite Memory Engine.
3 * This file defines 3 engines: ime_psync, ime_psyncv, and ime_aio
4 *
5 * Copyright (C) 2018 DataDirect Networks. All rights reserved.
6 *
7 * This program is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU General Public License,
 9 * version 2 as published by the Free Software Foundation.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 */
16
17/*
18 * Some details about the new engines are given below:
19 *
20 *
21 * ime_psync:
22 * Most basic engine that issues calls to ime_native whenever an IO is queued.
23 *
24 * ime_psyncv:
25 * This engine tries to queue the IOs (by creating iovecs) if asked by FIO (via
26 * iodepth_batch). It refuses to queue when the iovecs can't be appended, and
27 * waits for FIO to issue a commit. After a call to commit and get_events, new
28 * IOs can be queued.
29 *
30 * ime_aio:
31 * This engine tries to queue the IOs (by creating iovecs) if asked by FIO (via
32 * iodepth_batch). When the iovecs can't be appended to the current request, a
33 * new request for IME is created. These requests will be issued to IME when
34 * commit is called. Contrary to ime_psyncv, there can be several requests at
35 * once. We don't need to wait for a request to terminate before creating a new
36 * one.
37 */
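/*
 * Illustrative usage: a minimal fio job sketch selecting one of the
 * engines above. The option names are standard fio options; the job
 * name, path and sizes below are only examples, and a real IME
 * deployment may need additional client-side setup.
 *
 *	[ime-example]
 *	ioengine=ime_aio	; or ime_psync / ime_psyncv
 *	rw=write
 *	bs=1m
 *	size=1g
 *	iodepth=16
 *	iodepth_batch=8		; lets ime_psyncv/ime_aio build up iovecs
 *	filename=/ime/fio-testfile
 */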
38
39#include <stdio.h>
40#include <stdlib.h>
41#include <errno.h>
42#include <linux/limits.h>
43#include <ime_native.h>
44
45#include "../fio.h"
46
47
48/**************************************************************
49 * Types and constants definitions
50 *
51 **************************************************************/
52
53/* define constants for async IOs */
54#define FIO_IME_IN_PROGRESS -1
55#define FIO_IME_REQ_ERROR -2
56
57/* This flag is used when some jobs were created using threads. In that
58 case, IME can't be finalized in the engine-specific cleanup function,
59 because other threads might still use IME. Instead, IME is finalized
60 in the destructor (see fio_ime_unregister), only when the flag
61 fio_ime_is_initialized is true (which means at least one thread has
62 initialized IME). */
63static bool fio_ime_is_initialized = false;
64
65struct imesio_req {
66 int fd; /* File descriptor */
67 enum fio_ddir ddir; /* Type of IO (read or write) */
68 off_t offset; /* File offset */
69};
70struct imeaio_req {
71 struct ime_aiocb iocb; /* IME aio request */
72 ssize_t status; /* Status of the IME request */
73 enum fio_ddir ddir; /* Type of IO (read or write) */
74 pthread_cond_t cond_endio; /* Condition var to notify FIO */
75 pthread_mutex_t status_mutex; /* Mutex for cond_endio */
76};
77
78/* This structure will be used for 2 engines: ime_psyncv and ime_aio */
79struct ime_data {
80 union {
81 struct imeaio_req *aioreqs; /* array of aio requests */
82 struct imesio_req *sioreq; /* pointer to the only syncio request */
83 };
84 struct iovec *iovecs; /* array of queued iovecs */
85 struct io_u **io_us; /* array of queued io_u pointers */
 86	struct io_u **event_io_us;	/* array of the events retrieved after get_events */
87 unsigned int queued; /* iovecs/io_us in the queue */
88 unsigned int events; /* number of committed iovecs/io_us */
89
90 /* variables used to implement a "ring" queue */
91 unsigned int depth; /* max entries in the queue */
92 unsigned int head; /* index used to append */
93 unsigned int tail; /* index used to pop */
94 unsigned int cur_commit; /* index of the first uncommitted req */
95
96 /* offset used by the last iovec (used to check if the iovecs can be appended)*/
97 unsigned long long last_offset;
98
99 /* The variables below are used for aio only */
 100	struct imeaio_req *last_req; /* last request awaiting commit */
101};
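/*
 * Worked example of the ring queue above (values are illustrative):
 * with depth = 4, queueing three io_us advances head from 0 to 3 and
 * sets queued = 3, while tail, cur_commit and events stay 0. Committing
 * those three iovecs moves cur_commit to 3 and sets events = 3.
 * Retrieving them either pops each one from tail (0 -> 3, as the aio
 * engine does) or resets all indices to 0 (as the psyncv engine does);
 * either way queued and events drop back to 0. All indices wrap modulo
 * depth, which is what makes the queue a ring.
 */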
102
103
104/**************************************************************
105 * Private functions for queueing/unqueueing
106 *
107 **************************************************************/
108
109static void fio_ime_queue_incr (struct ime_data *ime_d)
110{
111 ime_d->head = (ime_d->head + 1) % ime_d->depth;
112 ime_d->queued++;
113}
 114
115static void fio_ime_queue_red (struct ime_data *ime_d)
116{
117 ime_d->tail = (ime_d->tail + 1) % ime_d->depth;
118 ime_d->queued--;
119 ime_d->events--;
120}
 121
122static void fio_ime_queue_commit (struct ime_data *ime_d, int iovcnt)
123{
124 ime_d->cur_commit = (ime_d->cur_commit + iovcnt) % ime_d->depth;
125 ime_d->events += iovcnt;
126}
 127
128static void fio_ime_queue_reset (struct ime_data *ime_d)
129{
130 ime_d->head = 0;
131 ime_d->tail = 0;
132 ime_d->cur_commit = 0;
133 ime_d->queued = 0;
134 ime_d->events = 0;
135}
136
137/**************************************************************
138 * General IME functions
139 * (needed for both sync and async IOs)
140 **************************************************************/
141
142static char *fio_set_ime_filename(char* filename)
143{
144 static __thread char ime_filename[PATH_MAX];
145 int ret;
146
147 ret = snprintf(ime_filename, PATH_MAX, "%s%s", DEFAULT_IME_FILE_PREFIX, filename);
148 if (ret < PATH_MAX)
 149 return ime_filename;
150
151 return NULL;
152}
153
154static int fio_ime_get_file_size(struct thread_data *td, struct fio_file *f)
155{
156 struct stat buf;
157 int ret;
158 char *ime_filename;
159
160 dprint(FD_FILE, "get file size %s\n", f->file_name);
161
162 ime_filename = fio_set_ime_filename(f->file_name);
163 if (ime_filename == NULL)
164 return 1;
165 ret = ime_native_stat(ime_filename, &buf);
166 if (ret == -1) {
167 td_verror(td, errno, "fstat");
168 return 1;
169 }
170
171 f->real_file_size = buf.st_size;
172 return 0;
173}
174
 175/* This function mimics the generic_file_open function, but issues
176 IME native calls instead of POSIX calls. */
177static int fio_ime_open_file(struct thread_data *td, struct fio_file *f)
178{
179 int flags = 0;
180 int ret;
181 uint64_t desired_fs;
182 char *ime_filename;
183
184 dprint(FD_FILE, "fd open %s\n", f->file_name);
185
186 if (td_trim(td)) {
187 td_verror(td, EINVAL, "IME does not support TRIM operation");
188 return 1;
189 }
190
191 if (td->o.oatomic) {
192 td_verror(td, EINVAL, "IME does not support atomic IO");
193 return 1;
194 }
195 if (td->o.odirect)
196 flags |= O_DIRECT;
 197 flags |= td->o.sync_io;
198 if (td->o.create_on_open && td->o.allow_create)
199 flags |= O_CREAT;
200
201 if (td_write(td)) {
202 if (!read_only)
203 flags |= O_RDWR;
204
205 if (td->o.allow_create)
206 flags |= O_CREAT;
 207 } else if (td_read(td)) {
 208 flags |= O_RDONLY;
 209 } else {
 210 /* We should never get here. */
 211 td_verror(td, EINVAL, "Unsupported open mode");
212 return 1;
213 }
214
215 ime_filename = fio_set_ime_filename(f->file_name);
216 if (ime_filename == NULL)
217 return 1;
218 f->fd = ime_native_open(ime_filename, flags, 0600);
219 if (f->fd == -1) {
220 char buf[FIO_VERROR_SIZE];
221 int __e = errno;
222
223 snprintf(buf, sizeof(buf), "open(%s)", f->file_name);
224 td_verror(td, __e, buf);
225 return 1;
226 }
227
228 /* Now we need to make sure the real file size is sufficient for FIO
 229 to do its job. This is normally done before the file open function
 230 is called, but because FIO would use POSIX calls, we need to do it
 231 ourselves. */
232 ret = fio_ime_get_file_size(td, f);
233 if (ret < 0) {
234 ime_native_close(f->fd);
235 td_verror(td, errno, "ime_get_file_size");
236 return 1;
237 }
238
239 desired_fs = f->io_size + f->file_offset;
240 if (td_write(td)) {
241 dprint(FD_FILE, "Laying out file %s%s\n",
242 DEFAULT_IME_FILE_PREFIX, f->file_name);
243 if (!td->o.create_on_open &&
244 f->real_file_size < desired_fs &&
245 ime_native_ftruncate(f->fd, desired_fs) < 0) {
246 ime_native_close(f->fd);
247 td_verror(td, errno, "ime_native_ftruncate");
248 return 1;
249 }
250 if (f->real_file_size < desired_fs)
251 f->real_file_size = desired_fs;
 252 } else if (td_read(td) && f->real_file_size < desired_fs) {
253 ime_native_close(f->fd);
254 log_err("error: can't read %lu bytes from file with "
255 "%lu bytes\n", desired_fs, f->real_file_size);
256 return 1;
257 }
258
259 return 0;
260}
261
262static int fio_ime_close_file(struct thread_data fio_unused *td, struct fio_file *f)
263{
264 int ret = 0;
265
266 dprint(FD_FILE, "fd close %s\n", f->file_name);
267
268 if (ime_native_close(f->fd) < 0)
269 ret = errno;
270
271 f->fd = -1;
272 return ret;
273}
274
275static int fio_ime_unlink_file(struct thread_data *td, struct fio_file *f)
276{
 277 char *ime_filename = fio_set_ime_filename(f->file_name);
278 int ret;
279
280 if (ime_filename == NULL)
281 return 1;
 282
 283 ret = unlink(ime_filename);
284 return ret < 0 ? errno : 0;
285}
286
287static struct io_u *fio_ime_event(struct thread_data *td, int event)
288{
289 struct ime_data *ime_d = td->io_ops_data;
 290
291 return ime_d->event_io_us[event];
292}
293
 294/* Setup function used in place of get_file_sizes when setting up the files.
 295 Instead we will set real_file_size to 0 for each file. This way we
 296 can avoid calling ime_native_init before the forks are created. */
297static int fio_ime_setup(struct thread_data *td)
298{
299 struct fio_file *f;
300 unsigned int i;
301
302 for_each_file(td, f, i) {
303 dprint(FD_FILE, "setup: set file size to 0 for %p/%d/%s\n",
304 f, i, f->file_name);
305 f->real_file_size = 0;
306 }
307
308 return 0;
309}
310
311static int fio_ime_engine_init(struct thread_data *td)
312{
313 struct fio_file *f;
314 unsigned int i;
315
316 dprint(FD_IO, "ime engine init\n");
317 if (fio_ime_is_initialized && !td->o.use_thread) {
318 log_err("Warning: something might go wrong. Not all threads/forks were"
319 " created before the FIO jobs were initialized.\n");
320 }
321
322 ime_native_init();
323 fio_ime_is_initialized = true;
324
325 /* We have to temporarily set real_file_size so that
326 FIO can initialize properly. It will be corrected
327 on file open. */
328 for_each_file(td, f, i)
329 f->real_file_size = f->io_size + f->file_offset;
330
331 return 0;
332}
333
334static void fio_ime_engine_finalize(struct thread_data *td)
335{
336 /* Only finalize IME when using forks */
337 if (!td->o.use_thread) {
338 if (ime_native_finalize() < 0)
339 log_err("error in ime_native_finalize\n");
340 fio_ime_is_initialized = false;
341 }
342}
343
344
345/**************************************************************
346 * Private functions for blocking IOs
347 * (without iovecs)
348 **************************************************************/
349
350/* Notice: this function comes from the sync engine */
351/* It is used by the commit function to return a proper code and fill
352 some attributes in the io_u used for the IO. */
353static int fio_ime_psync_end(struct thread_data *td, struct io_u *io_u, ssize_t ret)
354{
355 if (ret != (ssize_t) io_u->xfer_buflen) {
356 if (ret >= 0) {
357 io_u->resid = io_u->xfer_buflen - ret;
358 io_u->error = 0;
359 return FIO_Q_COMPLETED;
360 } else
361 io_u->error = errno;
362 }
363
364 if (io_u->error) {
365 io_u_log_error(td, io_u);
366 td_verror(td, io_u->error, "xfer");
367 }
368
369 return FIO_Q_COMPLETED;
370}
371
372static enum fio_q_status fio_ime_psync_queue(struct thread_data *td,
373 struct io_u *io_u)
374{
375 struct fio_file *f = io_u->file;
376 ssize_t ret;
377
378 fio_ro_check(td, io_u);
379
380 if (io_u->ddir == DDIR_READ)
381 ret = ime_native_pread(f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
382 else if (io_u->ddir == DDIR_WRITE)
383 ret = ime_native_pwrite(f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
384 else if (io_u->ddir == DDIR_SYNC)
385 ret = ime_native_fsync(f->fd);
386 else {
387 ret = io_u->xfer_buflen;
388 io_u->error = EINVAL;
389 }
390
391 return fio_ime_psync_end(td, io_u, ret);
392}
393
394
395/**************************************************************
396 * Private functions for blocking IOs
397 * (with iovecs)
398 **************************************************************/
399
400static bool fio_ime_psyncv_can_queue(struct ime_data *ime_d, struct io_u *io_u)
401{
402 /* We can only queue if:
403 - There are no queued iovecs
404 - Or if there is at least one:
405 - There must be no event waiting for retrieval
406 - The offsets must be contiguous
407 - The ddir and fd must be the same */
408 return (ime_d->queued == 0 || (
409 ime_d->events == 0 &&
410 ime_d->last_offset == io_u->offset &&
411 ime_d->sioreq->ddir == io_u->ddir &&
412 ime_d->sioreq->fd == io_u->file->fd));
413}
414
415/* Before using this function, we should have already
416 ensured that the queue is not full */
417static void fio_ime_psyncv_enqueue(struct ime_data *ime_d, struct io_u *io_u)
418{
419 struct imesio_req *ioreq = ime_d->sioreq;
420 struct iovec *iov = &ime_d->iovecs[ime_d->head];
421
422 iov->iov_base = io_u->xfer_buf;
423 iov->iov_len = io_u->xfer_buflen;
424
425 if (ime_d->queued == 0) {
426 ioreq->offset = io_u->offset;
427 ioreq->ddir = io_u->ddir;
428 ioreq->fd = io_u->file->fd;
429 }
430
431 ime_d->io_us[ime_d->head] = io_u;
432 ime_d->last_offset = io_u->offset + io_u->xfer_buflen;
433 fio_ime_queue_incr(ime_d);
434}
435
436/* Tries to queue an IO. It will fail if the IO can't be appended to the
437 current request or if the current request has been committed but not
438 yet retrieved by get_events. */
439static enum fio_q_status fio_ime_psyncv_queue(struct thread_data *td,
440 struct io_u *io_u)
441{
442 struct ime_data *ime_d = td->io_ops_data;
443
444 fio_ro_check(td, io_u);
445
446 if (ime_d->queued == ime_d->depth)
447 return FIO_Q_BUSY;
448
449 if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
450 if (!fio_ime_psyncv_can_queue(ime_d, io_u))
451 return FIO_Q_BUSY;
452
453 dprint(FD_IO, "queue: ddir=%d at %u commit=%u queued=%u events=%u\n",
454 io_u->ddir, ime_d->head, ime_d->cur_commit,
455 ime_d->queued, ime_d->events);
456 fio_ime_psyncv_enqueue(ime_d, io_u);
457 return FIO_Q_QUEUED;
 458 } else if (io_u->ddir == DDIR_SYNC) {
459 if (ime_native_fsync(io_u->file->fd) < 0) {
460 io_u->error = errno;
461 td_verror(td, io_u->error, "fsync");
462 }
463 return FIO_Q_COMPLETED;
464 } else {
465 io_u->error = EINVAL;
466 td_verror(td, io_u->error, "wrong ddir");
467 return FIO_Q_COMPLETED;
468 }
469}
470
471/* Notice: this function comes from the sync engine */
472/* It is used by the commit function to return a proper code and fill
473 some attributes in the io_us appended to the current request. */
474static int fio_ime_psyncv_end(struct thread_data *td, ssize_t bytes)
475{
476 struct ime_data *ime_d = td->io_ops_data;
477 struct io_u *io_u;
478 unsigned int i;
479 int err = errno;
480
481 for (i = 0; i < ime_d->queued; i++) {
482 io_u = ime_d->io_us[i];
483
484 if (bytes == -1)
485 io_u->error = err;
486 else {
487 unsigned int this_io;
488
489 this_io = bytes;
490 if (this_io > io_u->xfer_buflen)
491 this_io = io_u->xfer_buflen;
492
493 io_u->resid = io_u->xfer_buflen - this_io;
494 io_u->error = 0;
495 bytes -= this_io;
496 }
497 }
498
499 if (bytes == -1) {
500 td_verror(td, err, "xfer psyncv");
501 return -err;
502 }
503
504 return 0;
505}
506
507/* Commits the current request by calling ime_native (with one or several
508 iovecs). After this commit, the corresponding events (one per iovec)
509 can be retrieved by get_events. */
510static int fio_ime_psyncv_commit(struct thread_data *td)
511{
512 struct ime_data *ime_d = td->io_ops_data;
513 struct imesio_req *ioreq;
514 int ret = 0;
515
516 /* Exit if there are no (new) events to commit
 517 or if the previously committed events haven't been retrieved */
518 if (!ime_d->queued || ime_d->events)
519 return 0;
520
521 ioreq = ime_d->sioreq;
522 ime_d->events = ime_d->queued;
523 if (ioreq->ddir == DDIR_READ)
524 ret = ime_native_preadv(ioreq->fd, ime_d->iovecs, ime_d->queued, ioreq->offset);
525 else
526 ret = ime_native_pwritev(ioreq->fd, ime_d->iovecs, ime_d->queued, ioreq->offset);
527
528 dprint(FD_IO, "committed %d iovecs\n", ime_d->queued);
529
530 return fio_ime_psyncv_end(td, ret);
531}
532
533static int fio_ime_psyncv_getevents(struct thread_data *td, unsigned int min,
534 unsigned int max, const struct timespec *t)
535{
536 struct ime_data *ime_d = td->io_ops_data;
537 struct io_u *io_u;
538 int events = 0;
539 unsigned int count;
540
541 if (ime_d->events) {
542 for (count = 0; count < ime_d->events; count++) {
543 io_u = ime_d->io_us[count];
544 ime_d->event_io_us[events] = io_u;
545 events++;
546 }
547 fio_ime_queue_reset(ime_d);
548 }
549
550 dprint(FD_IO, "getevents(%u,%u) ret=%d queued=%u events=%u\n",
551 min, max, events, ime_d->queued, ime_d->events);
552 return events;
553}
554
555static int fio_ime_psyncv_init(struct thread_data *td)
556{
557 struct ime_data *ime_d;
558
559 if (fio_ime_engine_init(td) < 0)
560 return 1;
561
562 ime_d = calloc(1, sizeof(*ime_d));
563
564 ime_d->sioreq = malloc(sizeof(struct imesio_req));
565 ime_d->iovecs = malloc(td->o.iodepth * sizeof(struct iovec));
566 ime_d->io_us = malloc(2 * td->o.iodepth * sizeof(struct io_u *));
567 ime_d->event_io_us = ime_d->io_us + td->o.iodepth;
568
569 ime_d->depth = td->o.iodepth;
570
571 td->io_ops_data = ime_d;
572 return 0;
573}
574
575static void fio_ime_psyncv_clean(struct thread_data *td)
576{
577 struct ime_data *ime_d = td->io_ops_data;
578
579 if (ime_d) {
580 free(ime_d->sioreq);
581 free(ime_d->iovecs);
582 free(ime_d->io_us);
583 free(ime_d);
584 td->io_ops_data = NULL;
585 }
586
587 fio_ime_engine_finalize(td);
588}
589
590
591/**************************************************************
592 * Private functions for non-blocking IOs
593 *
594 **************************************************************/
595
596void fio_ime_aio_complete_cb (struct ime_aiocb *aiocb, int err,
597 ssize_t bytes)
598{
599 struct imeaio_req *ioreq = (struct imeaio_req *) aiocb->user_context;
600
601 pthread_mutex_lock(&ioreq->status_mutex);
602 ioreq->status = err == 0 ? bytes : FIO_IME_REQ_ERROR;
603 pthread_mutex_unlock(&ioreq->status_mutex);
604
605 pthread_cond_signal(&ioreq->cond_endio);
606}
607
608static bool fio_ime_aio_can_queue (struct ime_data *ime_d, struct io_u *io_u)
609{
610 /* So far we can queue in any case. */
611 return true;
612}
613static bool fio_ime_aio_can_append (struct ime_data *ime_d, struct io_u *io_u)
614{
615 /* We can only append if:
616 - The iovecs will be contiguous in the array
617 - There is already a queued iovec
618 - The offsets are contiguous
 619 - The ddir and fd are the same */
620 return (ime_d->head != 0 &&
621 ime_d->queued - ime_d->events > 0 &&
622 ime_d->last_offset == io_u->offset &&
623 ime_d->last_req->ddir == io_u->ddir &&
624 ime_d->last_req->iocb.fd == io_u->file->fd);
625}
626
627/* Before using this function, we should have already
628 ensured that the queue is not full */
629static void fio_ime_aio_enqueue(struct ime_data *ime_d, struct io_u *io_u)
630{
631 struct imeaio_req *ioreq = &ime_d->aioreqs[ime_d->head];
632 struct ime_aiocb *iocb = &ioreq->iocb;
633 struct iovec *iov = &ime_d->iovecs[ime_d->head];
634
635 iov->iov_base = io_u->xfer_buf;
636 iov->iov_len = io_u->xfer_buflen;
637
638 if (fio_ime_aio_can_append(ime_d, io_u))
639 ime_d->last_req->iocb.iovcnt++;
640 else {
641 ioreq->status = FIO_IME_IN_PROGRESS;
642 ioreq->ddir = io_u->ddir;
643 ime_d->last_req = ioreq;
644
645 iocb->complete_cb = &fio_ime_aio_complete_cb;
646 iocb->fd = io_u->file->fd;
647 iocb->file_offset = io_u->offset;
648 iocb->iov = iov;
649 iocb->iovcnt = 1;
650 iocb->flags = 0;
651 iocb->user_context = (intptr_t) ioreq;
652 }
653
654 ime_d->io_us[ime_d->head] = io_u;
655 ime_d->last_offset = io_u->offset + io_u->xfer_buflen;
656 fio_ime_queue_incr(ime_d);
657}
658
659/* Tries to queue an IO. It will create a new request if the IO can't be
660 appended to the current request. It will fail if the queue can't contain
661 any more io_u/iovec. In this case, commit and then get_events need to be
662 called. */
663static enum fio_q_status fio_ime_aio_queue(struct thread_data *td,
664 struct io_u *io_u)
665{
666 struct ime_data *ime_d = td->io_ops_data;
667
668 fio_ro_check(td, io_u);
669
670 dprint(FD_IO, "queue: ddir=%d at %u commit=%u queued=%u events=%u\n",
671 io_u->ddir, ime_d->head, ime_d->cur_commit,
672 ime_d->queued, ime_d->events);
673
674 if (ime_d->queued == ime_d->depth)
675 return FIO_Q_BUSY;
676
677 if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
678 if (!fio_ime_aio_can_queue(ime_d, io_u))
679 return FIO_Q_BUSY;
680
681 fio_ime_aio_enqueue(ime_d, io_u);
682 return FIO_Q_QUEUED;
 683 } else if (io_u->ddir == DDIR_SYNC) {
684 if (ime_native_fsync(io_u->file->fd) < 0) {
685 io_u->error = errno;
686 td_verror(td, io_u->error, "fsync");
687 }
688 return FIO_Q_COMPLETED;
689 } else {
690 io_u->error = EINVAL;
691 td_verror(td, io_u->error, "wrong ddir");
692 return FIO_Q_COMPLETED;
693 }
694}
695
696static int fio_ime_aio_commit(struct thread_data *td)
697{
698 struct ime_data *ime_d = td->io_ops_data;
699 struct imeaio_req *ioreq;
700 int ret = 0;
701
702 /* Loop while there are events to commit */
703 while (ime_d->queued - ime_d->events) {
704 ioreq = &ime_d->aioreqs[ime_d->cur_commit];
705 if (ioreq->ddir == DDIR_READ)
706 ret = ime_native_aio_read(&ioreq->iocb);
707 else
708 ret = ime_native_aio_write(&ioreq->iocb);
709
710 fio_ime_queue_commit(ime_d, ioreq->iocb.iovcnt);
711
712 /* fio needs a negative error code */
713 if (ret < 0) {
714 ioreq->status = FIO_IME_REQ_ERROR;
715 return -errno;
716 }
717
718 io_u_mark_submit(td, ioreq->iocb.iovcnt);
719 dprint(FD_IO, "committed %d iovecs commit=%u queued=%u events=%u\n",
720 ioreq->iocb.iovcnt, ime_d->cur_commit,
721 ime_d->queued, ime_d->events);
722 }
723
724 return 0;
725}
726
727static int fio_ime_aio_getevents(struct thread_data *td, unsigned int min,
728 unsigned int max, const struct timespec *t)
729{
730 struct ime_data *ime_d = td->io_ops_data;
731 struct imeaio_req *ioreq;
732 struct io_u *io_u;
733 int events = 0;
734 unsigned int count;
735 ssize_t bytes;
736
737 while (ime_d->events) {
738 ioreq = &ime_d->aioreqs[ime_d->tail];
739
 740 /* Break if we already got events and appending the next
 741 request's events would exceed max */
742 if (events && events + ioreq->iocb.iovcnt > max)
743 break;
744
745 if (ioreq->status != FIO_IME_IN_PROGRESS) {
746
747 bytes = ioreq->status;
748 for (count = 0; count < ioreq->iocb.iovcnt; count++) {
749 io_u = ime_d->io_us[ime_d->tail];
750 ime_d->event_io_us[events] = io_u;
751 events++;
752 fio_ime_queue_red(ime_d);
753
754 if (ioreq->status == FIO_IME_REQ_ERROR)
755 io_u->error = EIO;
756 else {
757 io_u->resid = bytes > io_u->xfer_buflen ?
758 0 : io_u->xfer_buflen - bytes;
759 io_u->error = 0;
760 bytes -= io_u->xfer_buflen - io_u->resid;
761 }
762 }
763 } else {
764 pthread_mutex_lock(&ioreq->status_mutex);
 765 while (ioreq->status == FIO_IME_IN_PROGRESS)
 766 pthread_cond_wait(&ioreq->cond_endio, &ioreq->status_mutex);
767 pthread_mutex_unlock(&ioreq->status_mutex);
768 }
769
770 }
771
772 dprint(FD_IO, "getevents(%u,%u) ret=%d queued=%u events=%u\n", min, max,
773 events, ime_d->queued, ime_d->events);
774 return events;
775}
776
777static int fio_ime_aio_init(struct thread_data *td)
778{
779 struct ime_data *ime_d;
780 struct imeaio_req *ioreq;
781 unsigned int i;
782
783 if (fio_ime_engine_init(td) < 0)
784 return 1;
785
786 ime_d = calloc(1, sizeof(*ime_d));
787
788 ime_d->aioreqs = malloc(td->o.iodepth * sizeof(struct imeaio_req));
789 ime_d->iovecs = malloc(td->o.iodepth * sizeof(struct iovec));
790 ime_d->io_us = malloc(2 * td->o.iodepth * sizeof(struct io_u *));
791 ime_d->event_io_us = ime_d->io_us + td->o.iodepth;
792
793 ime_d->depth = td->o.iodepth;
794 for (i = 0; i < ime_d->depth; i++) {
795 ioreq = &ime_d->aioreqs[i];
796 pthread_cond_init(&ioreq->cond_endio, NULL);
797 pthread_mutex_init(&ioreq->status_mutex, NULL);
798 }
799
800 td->io_ops_data = ime_d;
801 return 0;
802}
803
804static void fio_ime_aio_clean(struct thread_data *td)
805{
806 struct ime_data *ime_d = td->io_ops_data;
807 struct imeaio_req *ioreq;
808 unsigned int i;
809
810 if (ime_d) {
811 for (i = 0; i < ime_d->depth; i++) {
812 ioreq = &ime_d->aioreqs[i];
813 pthread_cond_destroy(&ioreq->cond_endio);
814 pthread_mutex_destroy(&ioreq->status_mutex);
815 }
816 free(ime_d->aioreqs);
817 free(ime_d->iovecs);
818 free(ime_d->io_us);
819 free(ime_d);
820 td->io_ops_data = NULL;
821 }
822
823 fio_ime_engine_finalize(td);
824}
825
826
827/**************************************************************
828 * IO engines definitions
829 *
830 **************************************************************/
831
832/* The FIO_DISKLESSIO flag used for these engines is necessary to prevent
833 FIO from using POSIX calls. See fio_ime_open_file for more details. */
834
835static struct ioengine_ops ioengine_prw = {
836 .name = "ime_psync",
837 .version = FIO_IOOPS_VERSION,
838 .setup = fio_ime_setup,
839 .init = fio_ime_engine_init,
840 .cleanup = fio_ime_engine_finalize,
841 .queue = fio_ime_psync_queue,
842 .open_file = fio_ime_open_file,
843 .close_file = fio_ime_close_file,
844 .get_file_size = fio_ime_get_file_size,
845 .unlink_file = fio_ime_unlink_file,
846 .flags = FIO_SYNCIO | FIO_DISKLESSIO,
847};
848
849static struct ioengine_ops ioengine_pvrw = {
850 .name = "ime_psyncv",
851 .version = FIO_IOOPS_VERSION,
852 .setup = fio_ime_setup,
853 .init = fio_ime_psyncv_init,
854 .cleanup = fio_ime_psyncv_clean,
855 .queue = fio_ime_psyncv_queue,
856 .commit = fio_ime_psyncv_commit,
857 .getevents = fio_ime_psyncv_getevents,
858 .event = fio_ime_event,
859 .open_file = fio_ime_open_file,
860 .close_file = fio_ime_close_file,
861 .get_file_size = fio_ime_get_file_size,
862 .unlink_file = fio_ime_unlink_file,
863 .flags = FIO_SYNCIO | FIO_DISKLESSIO,
864};
865
866static struct ioengine_ops ioengine_aio = {
867 .name = "ime_aio",
868 .version = FIO_IOOPS_VERSION,
869 .setup = fio_ime_setup,
870 .init = fio_ime_aio_init,
871 .cleanup = fio_ime_aio_clean,
872 .queue = fio_ime_aio_queue,
873 .commit = fio_ime_aio_commit,
874 .getevents = fio_ime_aio_getevents,
875 .event = fio_ime_event,
876 .open_file = fio_ime_open_file,
877 .close_file = fio_ime_close_file,
878 .get_file_size = fio_ime_get_file_size,
879 .unlink_file = fio_ime_unlink_file,
880 .flags = FIO_DISKLESSIO,
881};
882
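/*
 * Once the constructor below has registered the engines, they can be
 * selected by name like any other fio engine. A hypothetical example
 * (the path is only a placeholder):
 *
 *	fio --name=imetest --ioengine=ime_psync --rw=randread --bs=4k \
 *		--size=256m --filename=/ime/testfile
 */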
883static void fio_init fio_ime_register(void)
884{
885 register_ioengine(&ioengine_prw);
886 register_ioengine(&ioengine_pvrw);
887 register_ioengine(&ioengine_aio);
888}
889
890static void fio_exit fio_ime_unregister(void)
891{
892 unregister_ioengine(&ioengine_prw);
893 unregister_ioengine(&ioengine_pvrw);
894 unregister_ioengine(&ioengine_aio);
895
896 if (fio_ime_is_initialized && ime_native_finalize() < 0)
 897 log_err("Warning: IME did not finalize properly\n");
 898}