Merge branch 'ime-support' of https://github.com/DDNStorage/fio-public into ddn-ime
[fio.git] / engines / ime.c
/*
 * FIO engines for DDN's Infinite Memory Engine.
 * This file defines 3 engines: ime_psync, ime_psyncv, and ime_aio
 *
 * Copyright (C) 2018 DataDirect Networks. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License,
 * version 2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */

/*
 * The engines are described in more detail below:
 *
 * ime_psync:
 * The most basic engine: it issues a call to ime_native whenever an IO is
 * queued.
 *
 * ime_psyncv:
 * This engine tries to queue the IOs (by creating iovecs) if asked by FIO (via
 * iodepth_batch). It refuses to queue when the iovecs can't be appended, and
 * waits for FIO to issue a commit. After a call to commit and get_events, new
 * IOs can be queued.
 *
 * ime_aio:
 * This engine tries to queue the IOs (by creating iovecs) if asked by FIO (via
 * iodepth_batch). When the iovecs can't be appended to the current request, a
 * new IME request is created. These requests are issued to IME when commit is
 * called. Contrary to ime_psyncv, several requests can be in flight at once:
 * we don't need to wait for a request to terminate before creating a new one.
 */
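/* A minimal example job (illustrative only, not part of the original source;
   the option names are standard fio options and the path is a placeholder)
   showing how one of these engines might be selected. With the batched
   engines (ime_psyncv, ime_aio), iodepth and iodepth_batch control how many
   IOs FIO queues before calling commit:

	[ime-seq-write]
	ioengine=ime_psyncv
	rw=write
	bs=1m
	size=1g
	iodepth=16
	iodepth_batch=8
	filename=/path/on/ime/mount/testfile

   Sequential access patterns produce contiguous offsets, which is what lets
   consecutive io_us be merged into a single iovec-based request. */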

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <linux/limits.h>
#include <ime_native.h>

#include "../fio.h"


/**************************************************************
 * Type and constant definitions
 *
 **************************************************************/

/* Constants for async IOs */
#define FIO_IME_IN_PROGRESS -1
#define FIO_IME_REQ_ERROR -2

/* This flag is used when some jobs were created using threads. In that
   case, IME can't be finalized in the engine-specific cleanup function,
   because other threads might still use IME. Instead, IME is finalized
   in the destructor (see fio_ime_unregister), but only when the flag
   fio_ime_is_initialized is true (which means at least one thread has
   initialized IME). */
static bool fio_ime_is_initialized = false;

struct imesio_req {
	int fd;			/* File descriptor */
	enum fio_ddir ddir;	/* Type of IO (read or write) */
	off_t offset;		/* File offset */
};

struct imeaio_req {
	struct ime_aiocb iocb;		/* IME aio request */
	ssize_t status;			/* Status of the IME request */
	enum fio_ddir ddir;		/* Type of IO (read or write) */
	pthread_cond_t cond_endio;	/* Condition var to notify FIO */
	pthread_mutex_t status_mutex;	/* Mutex for cond_endio */
};

/* This structure is used by two engines: ime_psyncv and ime_aio */
struct ime_data {
	union {
		struct imeaio_req *aioreqs;	/* array of aio requests */
		struct imesio_req *sioreq;	/* pointer to the only syncio request */
	};
	struct iovec *iovecs;		/* array of queued iovecs */
	struct io_u **io_us;		/* array of queued io_u pointers */
	struct io_u **event_io_us;	/* array of the events retrieved after get_events */
	unsigned int queued;		/* iovecs/io_us in the queue */
	unsigned int events;		/* number of committed iovecs/io_us */

	/* variables used to implement a "ring" queue */
	unsigned int depth;		/* max entries in the queue */
	unsigned int head;		/* index used to append */
	unsigned int tail;		/* index used to pop */
	unsigned int cur_commit;	/* index of the first uncommitted req */

	/* offset used by the last iovec (used to check if the iovecs can be appended) */
	unsigned long long last_offset;

	/* The variables below are used for aio only */
	struct imeaio_req *last_req;	/* last request awaiting commit */
};


/**************************************************************
 * Private functions for queueing/unqueueing
 *
 **************************************************************/

static void fio_ime_queue_incr(struct ime_data *ime_d)
{
	ime_d->head = (ime_d->head + 1) % ime_d->depth;
	ime_d->queued++;
}

static void fio_ime_queue_red(struct ime_data *ime_d)
{
	ime_d->tail = (ime_d->tail + 1) % ime_d->depth;
	ime_d->queued--;
	ime_d->events--;
}

static void fio_ime_queue_commit(struct ime_data *ime_d, int iovcnt)
{
	ime_d->cur_commit = (ime_d->cur_commit + iovcnt) % ime_d->depth;
	ime_d->events += iovcnt;
}

static void fio_ime_queue_reset(struct ime_data *ime_d)
{
	ime_d->head = 0;
	ime_d->tail = 0;
	ime_d->cur_commit = 0;
	ime_d->queued = 0;
	ime_d->events = 0;
}
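/* Illustration (added, not in the original source) of how the ring indices
   evolve, assuming depth = 4 and three IOs that are queued, committed and
   then reaped:

	initial state:   head=0 tail=0 cur_commit=0 queued=0 events=0
	3 x queue_incr:  head=3 tail=0 cur_commit=0 queued=3 events=0
	queue_commit(3): head=3 tail=0 cur_commit=3 queued=3 events=3
	3 x queue_red:   head=3 tail=3 cur_commit=3 queued=0 events=0

   head advances when an io_u is queued, cur_commit/events advance when the
   iovecs are handed to IME, and tail/queued/events shrink as the completed
   events are reaped by get_events. */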


/**************************************************************
 * General IME functions
 * (needed for both sync and async IOs)
 **************************************************************/

static char *fio_set_ime_filename(char *filename)
{
	static __thread char ime_filename[PATH_MAX];

	if (snprintf(ime_filename, PATH_MAX, "%s%s", DEFAULT_IME_FILE_PREFIX, filename) < PATH_MAX)
		return ime_filename;
	else
		return NULL;
}

static int fio_ime_get_file_size(struct thread_data *td, struct fio_file *f)
{
	struct stat buf;
	int ret;
	char *ime_filename;

	dprint(FD_FILE, "get file size %s\n", f->file_name);

	ime_filename = fio_set_ime_filename(f->file_name);
	if (ime_filename == NULL)
		return 1;

	ret = ime_native_stat(ime_filename, &buf);
	if (ret == -1) {
		td_verror(td, errno, "ime_native_stat");
		return 1;
	}

	f->real_file_size = buf.st_size;
	return 0;
}

/* This function mimics the generic_file_open function, but issues
   IME native calls instead of POSIX calls. */
static int fio_ime_open_file(struct thread_data *td, struct fio_file *f)
{
	int flags = 0;
	int ret;
	uint64_t desired_fs;
	char *ime_filename;

	dprint(FD_FILE, "fd open %s\n", f->file_name);

	if (td_trim(td)) {
		td_verror(td, EINVAL, "IME does not support TRIM operation");
		return 1;
	}

	if (td->o.oatomic) {
		td_verror(td, EINVAL, "IME does not support atomic IO");
		return 1;
	}

	if (td->o.odirect)
		flags |= O_DIRECT;
	if (td->o.sync_io)
		flags |= O_SYNC;
	if (td->o.create_on_open && td->o.allow_create)
		flags |= O_CREAT;

	if (td_write(td)) {
		if (!read_only)
			flags |= O_RDWR;

		if (td->o.allow_create)
			flags |= O_CREAT;
	} else if (td_read(td)) {
		flags |= O_RDONLY;
	} else {
		/* We should never get here. */
		td_verror(td, EINVAL, "Unsupported open mode");
		return 1;
	}

	ime_filename = fio_set_ime_filename(f->file_name);
	if (ime_filename == NULL)
		return 1;

	f->fd = ime_native_open(ime_filename, flags, 0600);
	if (f->fd == -1) {
		char buf[FIO_VERROR_SIZE];
		int __e = errno;

		snprintf(buf, sizeof(buf), "open(%s)", f->file_name);
		td_verror(td, __e, buf);
		return 1;
	}

	/* Now we need to make sure the real file size is large enough for FIO
	   to do its job. This is normally done before the file open function
	   is called, but because that path would use POSIX calls, we have to
	   do it ourselves. */
	ret = fio_ime_get_file_size(td, f);
	if (ret) {
		ime_native_close(f->fd);
		td_verror(td, errno, "ime_get_file_size");
		return 1;
	}

	desired_fs = f->io_size + f->file_offset;
	if (td_write(td)) {
		dprint(FD_FILE, "Laying out file %s%s\n",
			DEFAULT_IME_FILE_PREFIX, f->file_name);
		if (!td->o.create_on_open &&
		    f->real_file_size < desired_fs &&
		    ime_native_ftruncate(f->fd, desired_fs) < 0) {
			ime_native_close(f->fd);
			td_verror(td, errno, "ime_native_ftruncate");
			return 1;
		}
		if (f->real_file_size < desired_fs)
			f->real_file_size = desired_fs;
	} else if (td_read(td) && f->real_file_size < desired_fs) {
		ime_native_close(f->fd);
		log_err("error: can't read %llu bytes from a file that is only "
			"%llu bytes long\n", (unsigned long long) desired_fs,
			(unsigned long long) f->real_file_size);
		return 1;
	}

	return 0;
}

static int fio_ime_close_file(struct thread_data fio_unused *td, struct fio_file *f)
{
	int ret = 0;

	dprint(FD_FILE, "fd close %s\n", f->file_name);

	if (ime_native_close(f->fd) < 0)
		ret = errno;

	f->fd = -1;
	return ret;
}

static int fio_ime_unlink_file(struct thread_data *td, struct fio_file *f)
{
	char *ime_filename;
	int ret;

	ime_filename = fio_set_ime_filename(f->file_name);
	if (ime_filename == NULL)
		return 1;

	ret = unlink(ime_filename);
	return ret < 0 ? errno : 0;
}

static struct io_u *fio_ime_event(struct thread_data *td, int event)
{
	struct ime_data *ime_d = td->io_ops_data;

	return ime_d->event_io_us[event];
}

/* Setup function, used instead of the generic get_file_sizes step when the
   files are set up. It sets real_file_size to 0 for each file, so that we
   can avoid calling ime_native_init before the forks are created. */
static int fio_ime_setup(struct thread_data *td)
{
	struct fio_file *f;
	unsigned int i;

	for_each_file(td, f, i) {
		dprint(FD_FILE, "setup: set file size to 0 for %p/%d/%s\n",
			f, i, f->file_name);
		f->real_file_size = 0;
	}

	return 0;
}

static int fio_ime_engine_init(struct thread_data *td)
{
	struct fio_file *f;
	unsigned int i;

	dprint(FD_IO, "ime engine init\n");
	if (fio_ime_is_initialized && !td->o.use_thread) {
		log_err("Warning: something might go wrong. Not all threads/forks were"
			" created before the FIO jobs were initialized.\n");
	}

	ime_native_init();
	fio_ime_is_initialized = true;

	/* We have to temporarily set real_file_size so that
	   FIO can initialize properly. It will be corrected
	   on file open. */
	for_each_file(td, f, i)
		f->real_file_size = f->io_size + f->file_offset;

	return 0;
}

static void fio_ime_engine_finalize(struct thread_data *td)
{
	/* Only finalize IME when using forks */
	if (!td->o.use_thread) {
		if (ime_native_finalize() < 0)
			log_err("error in ime_native_finalize\n");
		fio_ime_is_initialized = false;
	}
}


/**************************************************************
 * Private functions for blocking IOs
 * (without iovecs)
 **************************************************************/

/* Notice: this function comes from the sync engine. */
/* It is used by the commit function to return a proper code and fill
   some attributes in the io_u used for the IO. */
static int fio_ime_psync_end(struct thread_data *td, struct io_u *io_u, ssize_t ret)
{
	if (ret != (ssize_t) io_u->xfer_buflen) {
		if (ret >= 0) {
			io_u->resid = io_u->xfer_buflen - ret;
			io_u->error = 0;
			return FIO_Q_COMPLETED;
		} else
			io_u->error = errno;
	}

	if (io_u->error) {
		io_u_log_error(td, io_u);
		td_verror(td, io_u->error, "xfer");
	}

	return FIO_Q_COMPLETED;
}
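/* Worked example (added for illustration, not in the original source): if
   xfer_buflen is 8192 and ime_native_pwrite() returns 4096, resid is set to
   4096 and the io_u still completes with FIO_Q_COMPLETED; fio then accounts
   for the short transfer. A negative return value stores errno in
   io_u->error instead. */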

static enum fio_q_status fio_ime_psync_queue(struct thread_data *td,
					     struct io_u *io_u)
{
	struct fio_file *f = io_u->file;
	ssize_t ret;

	fio_ro_check(td, io_u);

	if (io_u->ddir == DDIR_READ)
		ret = ime_native_pread(f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
	else if (io_u->ddir == DDIR_WRITE)
		ret = ime_native_pwrite(f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
	else if (io_u->ddir == DDIR_SYNC)
		ret = ime_native_fsync(f->fd);
	else {
		ret = io_u->xfer_buflen;
		io_u->error = EINVAL;
	}

	return fio_ime_psync_end(td, io_u, ret);
}


/**************************************************************
 * Private functions for blocking IOs
 * (with iovecs)
 **************************************************************/

static bool fio_ime_psyncv_can_queue(struct ime_data *ime_d, struct io_u *io_u)
{
	/* We can only queue if:
	   - There are no queued iovecs
	   - Or, if there is at least one:
	     - There must be no event waiting for retrieval
	     - The offsets must be contiguous
	     - The ddir and fd must be the same */
	return (ime_d->queued == 0 || (
		ime_d->events == 0 &&
		ime_d->last_offset == io_u->offset &&
		ime_d->sioreq->ddir == io_u->ddir &&
		ime_d->sioreq->fd == io_u->file->fd));
}
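/* In practice (illustration added here, not in the original source) this
   means a sequential workload such as rw=write with a fixed block size
   batches nicely: each new io_u starts where the previous one ended, so
   last_offset matches io_u->offset and the iovec chain keeps growing. A
   random workload (rw=randwrite) almost always breaks the chain, so the
   engine returns FIO_Q_BUSY and fio has to commit after nearly every io_u. */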

/* Before using this function, we should have already
   ensured that the queue is not full. */
static void fio_ime_psyncv_enqueue(struct ime_data *ime_d, struct io_u *io_u)
{
	struct imesio_req *ioreq = ime_d->sioreq;
	struct iovec *iov = &ime_d->iovecs[ime_d->head];

	iov->iov_base = io_u->xfer_buf;
	iov->iov_len = io_u->xfer_buflen;

	if (ime_d->queued == 0) {
		ioreq->offset = io_u->offset;
		ioreq->ddir = io_u->ddir;
		ioreq->fd = io_u->file->fd;
	}

	ime_d->io_us[ime_d->head] = io_u;
	ime_d->last_offset = io_u->offset + io_u->xfer_buflen;
	fio_ime_queue_incr(ime_d);
}

/* Tries to queue an IO. It will fail if the IO can't be appended to the
   current request or if the current request has been committed but not
   yet retrieved by get_events. */
static enum fio_q_status fio_ime_psyncv_queue(struct thread_data *td,
					      struct io_u *io_u)
{
	struct ime_data *ime_d = td->io_ops_data;

	fio_ro_check(td, io_u);

	if (ime_d->queued == ime_d->depth)
		return FIO_Q_BUSY;

	if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
		if (!fio_ime_psyncv_can_queue(ime_d, io_u))
			return FIO_Q_BUSY;

		dprint(FD_IO, "queue: ddir=%d at %u commit=%u queued=%u events=%u\n",
			io_u->ddir, ime_d->head, ime_d->cur_commit,
			ime_d->queued, ime_d->events);
		fio_ime_psyncv_enqueue(ime_d, io_u);
		return FIO_Q_QUEUED;
	} else if (io_u->ddir == DDIR_SYNC) {
		if (ime_native_fsync(io_u->file->fd) < 0) {
			io_u->error = errno;
			td_verror(td, io_u->error, "fsync");
		}
		return FIO_Q_COMPLETED;
	} else {
		io_u->error = EINVAL;
		td_verror(td, io_u->error, "wrong ddir");
		return FIO_Q_COMPLETED;
	}
}

/* Notice: this function comes from the sync engine. */
/* It is used by the commit function to return a proper code and fill
   some attributes in the io_us appended to the current request. */
static int fio_ime_psyncv_end(struct thread_data *td, ssize_t bytes)
{
	struct ime_data *ime_d = td->io_ops_data;
	struct io_u *io_u;
	unsigned int i;
	int err = errno;

	for (i = 0; i < ime_d->queued; i++) {
		io_u = ime_d->io_us[i];

		if (bytes == -1)
			io_u->error = err;
		else {
			unsigned int this_io;

			this_io = bytes;
			if (this_io > io_u->xfer_buflen)
				this_io = io_u->xfer_buflen;

			io_u->resid = io_u->xfer_buflen - this_io;
			io_u->error = 0;
			bytes -= this_io;
		}
	}

	if (bytes == -1) {
		td_verror(td, err, "xfer psyncv");
		return -err;
	}

	return 0;
}
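/* Worked example (illustration only, not in the original source): with three
   queued io_us of 4096 bytes each and ime_native_pwritev() returning 10240,
   the loop assigns this_io = 4096, 4096 and 2048 in turn, so the first two
   io_us complete fully and the third ends with resid = 2048. A return of -1
   propagates errno to every queued io_u instead. */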

/* Commits the current request by calling ime_native (with one or several
   iovecs). After this commit, the corresponding events (one per iovec)
   can be retrieved by get_events. */
static int fio_ime_psyncv_commit(struct thread_data *td)
{
	struct ime_data *ime_d = td->io_ops_data;
	struct imesio_req *ioreq;
	int ret = 0;

	/* Exit if there are no (new) events to commit
	   or if the previously committed events haven't been retrieved yet */
	if (!ime_d->queued || ime_d->events)
		return 0;

	ioreq = ime_d->sioreq;
	ime_d->events = ime_d->queued;
	if (ioreq->ddir == DDIR_READ)
		ret = ime_native_preadv(ioreq->fd, ime_d->iovecs, ime_d->queued, ioreq->offset);
	else
		ret = ime_native_pwritev(ioreq->fd, ime_d->iovecs, ime_d->queued, ioreq->offset);

	dprint(FD_IO, "committed %d iovecs\n", ime_d->queued);

	return fio_ime_psyncv_end(td, ret);
}

static int fio_ime_psyncv_getevents(struct thread_data *td, unsigned int min,
				    unsigned int max, const struct timespec *t)
{
	struct ime_data *ime_d = td->io_ops_data;
	struct io_u *io_u;
	int events = 0;
	unsigned int count;

	if (ime_d->events) {
		for (count = 0; count < ime_d->events; count++) {
			io_u = ime_d->io_us[count];
			ime_d->event_io_us[events] = io_u;
			events++;
		}
		fio_ime_queue_reset(ime_d);
	}

	dprint(FD_IO, "getevents(%u,%u) ret=%d queued=%u events=%u\n",
		min, max, events, ime_d->queued, ime_d->events);
	return events;
}

static int fio_ime_psyncv_init(struct thread_data *td)
{
	struct ime_data *ime_d;

	if (fio_ime_engine_init(td) < 0)
		return 1;

	ime_d = calloc(1, sizeof(*ime_d));

	ime_d->sioreq = malloc(sizeof(struct imesio_req));
	ime_d->iovecs = malloc(td->o.iodepth * sizeof(struct iovec));
	ime_d->io_us = malloc(2 * td->o.iodepth * sizeof(struct io_u *));
	ime_d->event_io_us = ime_d->io_us + td->o.iodepth;

	ime_d->depth = td->o.iodepth;

	td->io_ops_data = ime_d;
	return 0;
}

static void fio_ime_psyncv_clean(struct thread_data *td)
{
	struct ime_data *ime_d = td->io_ops_data;

	if (ime_d) {
		free(ime_d->sioreq);
		free(ime_d->iovecs);
		free(ime_d->io_us);
		free(ime_d);
		td->io_ops_data = NULL;
	}

	fio_ime_engine_finalize(td);
}


/**************************************************************
 * Private functions for non-blocking IOs
 *
 **************************************************************/

static void fio_ime_aio_complete_cb(struct ime_aiocb *aiocb, int err,
				    ssize_t bytes)
{
	struct imeaio_req *ioreq = (struct imeaio_req *) aiocb->user_context;

	pthread_mutex_lock(&ioreq->status_mutex);
	ioreq->status = err == 0 ? bytes : FIO_IME_REQ_ERROR;
	pthread_mutex_unlock(&ioreq->status_mutex);

	pthread_cond_signal(&ioreq->cond_endio);
}
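/* Note (added comment): this callback is invoked by IME when an aio request
   completes. It publishes the result under status_mutex and signals
   cond_endio; the matching wait is in fio_ime_aio_getevents(), which blocks
   on cond_endio while the request status is still FIO_IME_IN_PROGRESS. */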

static bool fio_ime_aio_can_queue(struct ime_data *ime_d, struct io_u *io_u)
{
	/* So far we can queue in any case. */
	return true;
}

static bool fio_ime_aio_can_append(struct ime_data *ime_d, struct io_u *io_u)
{
	/* We can only append if:
	   - The iovecs will be contiguous in the array
	   - There is already a queued iovec
	   - The offsets are contiguous
	   - The ddir and fd are the same */
	return (ime_d->head != 0 &&
		ime_d->queued - ime_d->events > 0 &&
		ime_d->last_offset == io_u->offset &&
		ime_d->last_req->ddir == io_u->ddir &&
		ime_d->last_req->iocb.fd == io_u->file->fd);
}

/* Before using this function, we should have already
   ensured that the queue is not full. */
static void fio_ime_aio_enqueue(struct ime_data *ime_d, struct io_u *io_u)
{
	struct imeaio_req *ioreq = &ime_d->aioreqs[ime_d->head];
	struct ime_aiocb *iocb = &ioreq->iocb;
	struct iovec *iov = &ime_d->iovecs[ime_d->head];

	iov->iov_base = io_u->xfer_buf;
	iov->iov_len = io_u->xfer_buflen;

	if (fio_ime_aio_can_append(ime_d, io_u))
		ime_d->last_req->iocb.iovcnt++;
	else {
		ioreq->status = FIO_IME_IN_PROGRESS;
		ioreq->ddir = io_u->ddir;
		ime_d->last_req = ioreq;

		iocb->complete_cb = &fio_ime_aio_complete_cb;
		iocb->fd = io_u->file->fd;
		iocb->file_offset = io_u->offset;
		iocb->iov = iov;
		iocb->iovcnt = 1;
		iocb->flags = 0;
		iocb->user_context = (intptr_t) ioreq;
	}

	ime_d->io_us[ime_d->head] = io_u;
	ime_d->last_offset = io_u->offset + io_u->xfer_buflen;
	fio_ime_queue_incr(ime_d);
}

/* Tries to queue an IO. It will create a new request if the IO can't be
   appended to the current request. It will fail if the queue can't contain
   any more io_u/iovec. In this case, commit and then get_events need to be
   called. */
static enum fio_q_status fio_ime_aio_queue(struct thread_data *td,
					   struct io_u *io_u)
{
	struct ime_data *ime_d = td->io_ops_data;

	fio_ro_check(td, io_u);

	dprint(FD_IO, "queue: ddir=%d at %u commit=%u queued=%u events=%u\n",
		io_u->ddir, ime_d->head, ime_d->cur_commit,
		ime_d->queued, ime_d->events);

	if (ime_d->queued == ime_d->depth)
		return FIO_Q_BUSY;

	if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
		if (!fio_ime_aio_can_queue(ime_d, io_u))
			return FIO_Q_BUSY;

		fio_ime_aio_enqueue(ime_d, io_u);
		return FIO_Q_QUEUED;
	} else if (io_u->ddir == DDIR_SYNC) {
		if (ime_native_fsync(io_u->file->fd) < 0) {
			io_u->error = errno;
			td_verror(td, io_u->error, "fsync");
		}
		return FIO_Q_COMPLETED;
	} else {
		io_u->error = EINVAL;
		td_verror(td, io_u->error, "wrong ddir");
		return FIO_Q_COMPLETED;
	}
}

static int fio_ime_aio_commit(struct thread_data *td)
{
	struct ime_data *ime_d = td->io_ops_data;
	struct imeaio_req *ioreq;
	int ret = 0;

	/* Loop while there are queued IOs that haven't been committed yet */
	while (ime_d->queued - ime_d->events) {
		ioreq = &ime_d->aioreqs[ime_d->cur_commit];
		if (ioreq->ddir == DDIR_READ)
			ret = ime_native_aio_read(&ioreq->iocb);
		else
			ret = ime_native_aio_write(&ioreq->iocb);

		fio_ime_queue_commit(ime_d, ioreq->iocb.iovcnt);

		/* fio needs a negative error code */
		if (ret < 0) {
			ioreq->status = FIO_IME_REQ_ERROR;
			return -errno;
		}

		io_u_mark_submit(td, ioreq->iocb.iovcnt);
		dprint(FD_IO, "committed %d iovecs commit=%u queued=%u events=%u\n",
			ioreq->iocb.iovcnt, ime_d->cur_commit,
			ime_d->queued, ime_d->events);
	}

	return 0;
}

static int fio_ime_aio_getevents(struct thread_data *td, unsigned int min,
				 unsigned int max, const struct timespec *t)
{
	struct ime_data *ime_d = td->io_ops_data;
	struct imeaio_req *ioreq;
	struct io_u *io_u;
	int events = 0;
	unsigned int count;
	ssize_t bytes;

	while (ime_d->events) {
		ioreq = &ime_d->aioreqs[ime_d->tail];

		/* Stop if we already have some events and appending the
		   next request's events would exceed max */
		if (events && events + ioreq->iocb.iovcnt > max)
			break;

		if (ioreq->status != FIO_IME_IN_PROGRESS) {
			bytes = ioreq->status;
			for (count = 0; count < ioreq->iocb.iovcnt; count++) {
				io_u = ime_d->io_us[ime_d->tail];
				ime_d->event_io_us[events] = io_u;
				events++;
				fio_ime_queue_red(ime_d);

				if (ioreq->status == FIO_IME_REQ_ERROR)
					io_u->error = EIO;
				else {
					io_u->resid = bytes > io_u->xfer_buflen ?
							0 : io_u->xfer_buflen - bytes;
					io_u->error = 0;
					bytes -= io_u->xfer_buflen - io_u->resid;
				}
			}
		} else {
			pthread_mutex_lock(&ioreq->status_mutex);
			while (ioreq->status == FIO_IME_IN_PROGRESS)
				pthread_cond_wait(&ioreq->cond_endio, &ioreq->status_mutex);
			pthread_mutex_unlock(&ioreq->status_mutex);
		}
	}

	dprint(FD_IO, "getevents(%u,%u) ret=%d queued=%u events=%u\n", min, max,
		events, ime_d->queued, ime_d->events);
	return events;
}

static int fio_ime_aio_init(struct thread_data *td)
{
	struct ime_data *ime_d;
	struct imeaio_req *ioreq;
	unsigned int i;

	if (fio_ime_engine_init(td) < 0)
		return 1;

	ime_d = calloc(1, sizeof(*ime_d));

	ime_d->aioreqs = malloc(td->o.iodepth * sizeof(struct imeaio_req));
	ime_d->iovecs = malloc(td->o.iodepth * sizeof(struct iovec));
	ime_d->io_us = malloc(2 * td->o.iodepth * sizeof(struct io_u *));
	ime_d->event_io_us = ime_d->io_us + td->o.iodepth;

	ime_d->depth = td->o.iodepth;
	for (i = 0; i < ime_d->depth; i++) {
		ioreq = &ime_d->aioreqs[i];
		pthread_cond_init(&ioreq->cond_endio, NULL);
		pthread_mutex_init(&ioreq->status_mutex, NULL);
	}

	td->io_ops_data = ime_d;
	return 0;
}

static void fio_ime_aio_clean(struct thread_data *td)
{
	struct ime_data *ime_d = td->io_ops_data;
	struct imeaio_req *ioreq;
	unsigned int i;

	if (ime_d) {
		for (i = 0; i < ime_d->depth; i++) {
			ioreq = &ime_d->aioreqs[i];
			pthread_cond_destroy(&ioreq->cond_endio);
			pthread_mutex_destroy(&ioreq->status_mutex);
		}
		free(ime_d->aioreqs);
		free(ime_d->iovecs);
		free(ime_d->io_us);
		free(ime_d);
		td->io_ops_data = NULL;
	}

	fio_ime_engine_finalize(td);
}


/**************************************************************
 * IO engine definitions
 *
 **************************************************************/

/* The FIO_DISKLESSIO flag used for these engines is necessary to prevent
   FIO from using POSIX calls. See fio_ime_open_file for more details. */

static struct ioengine_ops ioengine_prw = {
	.name		= "ime_psync",
	.version	= FIO_IOOPS_VERSION,
	.setup		= fio_ime_setup,
	.init		= fio_ime_engine_init,
	.cleanup	= fio_ime_engine_finalize,
	.queue		= fio_ime_psync_queue,
	.open_file	= fio_ime_open_file,
	.close_file	= fio_ime_close_file,
	.get_file_size	= fio_ime_get_file_size,
	.unlink_file	= fio_ime_unlink_file,
	.flags		= FIO_SYNCIO | FIO_DISKLESSIO,
};

static struct ioengine_ops ioengine_pvrw = {
	.name		= "ime_psyncv",
	.version	= FIO_IOOPS_VERSION,
	.setup		= fio_ime_setup,
	.init		= fio_ime_psyncv_init,
	.cleanup	= fio_ime_psyncv_clean,
	.queue		= fio_ime_psyncv_queue,
	.commit		= fio_ime_psyncv_commit,
	.getevents	= fio_ime_psyncv_getevents,
	.event		= fio_ime_event,
	.open_file	= fio_ime_open_file,
	.close_file	= fio_ime_close_file,
	.get_file_size	= fio_ime_get_file_size,
	.unlink_file	= fio_ime_unlink_file,
	.flags		= FIO_SYNCIO | FIO_DISKLESSIO,
};

static struct ioengine_ops ioengine_aio = {
	.name		= "ime_aio",
	.version	= FIO_IOOPS_VERSION,
	.setup		= fio_ime_setup,
	.init		= fio_ime_aio_init,
	.cleanup	= fio_ime_aio_clean,
	.queue		= fio_ime_aio_queue,
	.commit		= fio_ime_aio_commit,
	.getevents	= fio_ime_aio_getevents,
	.event		= fio_ime_event,
	.open_file	= fio_ime_open_file,
	.close_file	= fio_ime_close_file,
	.get_file_size	= fio_ime_get_file_size,
	.unlink_file	= fio_ime_unlink_file,
	.flags		= FIO_DISKLESSIO,
};

static void fio_init fio_ime_register(void)
{
	register_ioengine(&ioengine_prw);
	register_ioengine(&ioengine_pvrw);
	register_ioengine(&ioengine_aio);
}

static void fio_exit fio_ime_unregister(void)
{
	unregister_ioengine(&ioengine_prw);
	unregister_ioengine(&ioengine_pvrw);
	unregister_ioengine(&ioengine_aio);

	if (fio_ime_is_initialized && ime_native_finalize() < 0)
		log_err("Warning: IME did not finalize properly\n");
}