/*
 * FIO engines for DDN's Infinite Memory Engine.
 * This file defines 3 engines: ime_psync, ime_psyncv, and ime_aio
 *
 * Copyright (C) 2018 DataDirect Networks. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License,
 * version 2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */

/*
 * Some details about the new engines are given below:
 *
 *
 * ime_psync:
 * Most basic engine that issues calls to ime_native whenever an IO is queued.
 *
 * ime_psyncv:
 * This engine tries to queue the IOs (by creating iovecs) if asked by FIO (via
 * iodepth_batch). It refuses to queue when the iovecs can't be appended, and
 * waits for FIO to issue a commit. After a call to commit and get_events, new
 * IOs can be queued.
 *
 * ime_aio:
 * This engine tries to queue the IOs (by creating iovecs) if asked by FIO (via
 * iodepth_batch). When the iovecs can't be appended to the current request, a
 * new request for IME is created. These requests will be issued to IME when
 * commit is called. Contrary to ime_psyncv, there can be several requests at
 * once. We don't need to wait for a request to terminate before creating a new
 * one.
 */
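
/*
 * For reference, a minimal job file selecting one of these engines could
 * look like the illustrative sketch below (values are arbitrary, and the
 * options shown are standard fio options):
 *
 *	[ime-test]
 *	ioengine=ime_aio
 *	iodepth=32
 *	iodepth_batch=16
 *	rw=write
 *	bs=128k
 *	size=1g
 *	filename=/path/to/testfile
 *
 * ime_psync issues each IO synchronously as it is queued; ime_psyncv and
 * ime_aio use iodepth_batch to build iovec-based requests as described above.
 */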

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <linux/limits.h>
#include <ime_native.h>

#include "../fio.h"


/**************************************************************
 * Types and constants definitions
 *
 **************************************************************/

/* define constants for async IOs */
#define FIO_IME_IN_PROGRESS -1
#define FIO_IME_REQ_ERROR -2

/* This flag is used when some jobs were created using threads. In that
   case, IME can't be finalized in the engine-specific cleanup function,
   because other threads might still use IME. Instead, IME is finalized
   in the destructor (see fio_ime_unregister), only when the flag
   fio_ime_is_initialized is true (which means at least one thread has
   initialized IME). */
static bool fio_ime_is_initialized = false;

struct imesio_req {
	int fd;				/* File descriptor */
	enum fio_ddir ddir;		/* Type of IO (read or write) */
	off_t offset;			/* File offset */
};
struct imeaio_req {
	struct ime_aiocb iocb;		/* IME aio request */
	ssize_t status;			/* Status of the IME request */
	enum fio_ddir ddir;		/* Type of IO (read or write) */
	pthread_cond_t cond_endio;	/* Condition var to notify FIO */
	pthread_mutex_t status_mutex;	/* Mutex for cond_endio */
};

/* This structure will be used for 2 engines: ime_psyncv and ime_aio */
struct ime_data {
	union {
		struct imeaio_req *aioreqs;	/* array of aio requests */
		struct imesio_req *sioreq;	/* pointer to the only syncio request */
	};
	struct iovec *iovecs;		/* array of queued iovecs */
	struct io_u **io_us;		/* array of queued io_u pointers */
	struct io_u **event_io_us;	/* array of the events retrieved after get_events */
	unsigned int queued;		/* iovecs/io_us in the queue */
	unsigned int events;		/* number of committed iovecs/io_us */

	/* variables used to implement a "ring" queue */
	unsigned int depth;		/* max entries in the queue */
	unsigned int head;		/* index used to append */
	unsigned int tail;		/* index used to pop */
	unsigned int cur_commit;	/* index of the first uncommitted req */

	/* offset used by the last iovec (used to check if the iovecs can be appended) */
	unsigned long long last_offset;

	/* The variables below are used for aio only */
	struct imeaio_req *last_req;	/* last request awaiting commit */
};


/**************************************************************
 * Private functions for queueing/unqueueing
 *
 **************************************************************/

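/* Informal summary of the ring bookkeeping implemented by the helpers
   below (derived from the code itself): "head" is where the next
   io_u/iovec is appended, "tail" is where completed entries are popped,
   and "cur_commit" is the first entry that has not been committed yet.
   "queued" counts the entries currently in the ring, while "events"
   counts committed entries that have not been reaped yet. All indices
   wrap modulo "depth". */
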
static void fio_ime_queue_incr (struct ime_data *ime_d)
{
	ime_d->head = (ime_d->head + 1) % ime_d->depth;
	ime_d->queued++;
}

static void fio_ime_queue_red (struct ime_data *ime_d)
{
	ime_d->tail = (ime_d->tail + 1) % ime_d->depth;
	ime_d->queued--;
	ime_d->events--;
}

static void fio_ime_queue_commit (struct ime_data *ime_d, int iovcnt)
{
	ime_d->cur_commit = (ime_d->cur_commit + iovcnt) % ime_d->depth;
	ime_d->events += iovcnt;
}

static void fio_ime_queue_reset (struct ime_data *ime_d)
{
	ime_d->head = 0;
	ime_d->tail = 0;
	ime_d->cur_commit = 0;
	ime_d->queued = 0;
	ime_d->events = 0;
}

/**************************************************************
 * General IME functions
 * (needed for both sync and async IOs)
 **************************************************************/

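/* Note: every path handed to the IME native API below is first translated
   by fio_set_ime_filename(), which simply prepends DEFAULT_IME_FILE_PREFIX
   to the fio file name. */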
static char *fio_set_ime_filename(char* filename)
{
	static __thread char ime_filename[PATH_MAX];
	int ret;

	ret = snprintf(ime_filename, PATH_MAX, "%s%s", DEFAULT_IME_FILE_PREFIX, filename);
	if (ret < PATH_MAX)
		return ime_filename;

	return NULL;
}

static int fio_ime_get_file_size(struct thread_data *td, struct fio_file *f)
{
	struct stat buf;
	int ret;
	char *ime_filename;

	dprint(FD_FILE, "get file size %s\n", f->file_name);

	ime_filename = fio_set_ime_filename(f->file_name);
	if (ime_filename == NULL)
		return 1;
	ret = ime_native_stat(ime_filename, &buf);
	if (ret == -1) {
		td_verror(td, errno, "fstat");
		return 1;
	}

	f->real_file_size = buf.st_size;
	return 0;
}

/* This function mimics the generic_file_open function, but issues
   IME native calls instead of POSIX calls. */
static int fio_ime_open_file(struct thread_data *td, struct fio_file *f)
{
	int flags = 0;
	int ret;
	uint64_t desired_fs;
	char *ime_filename;

	dprint(FD_FILE, "fd open %s\n", f->file_name);

	if (td_trim(td)) {
		td_verror(td, EINVAL, "IME does not support TRIM operation");
		return 1;
	}

	if (td->o.oatomic) {
		td_verror(td, EINVAL, "IME does not support atomic IO");
		return 1;
	}
	if (td->o.odirect)
		flags |= O_DIRECT;
	if (td->o.sync_io)
		flags |= O_SYNC;
	if (td->o.create_on_open && td->o.allow_create)
		flags |= O_CREAT;

	if (td_write(td)) {
		if (!read_only)
			flags |= O_RDWR;

		if (td->o.allow_create)
			flags |= O_CREAT;
	} else if (td_read(td)) {
		flags |= O_RDONLY;
	} else {
		/* We should never get here. */
		td_verror(td, EINVAL, "Unsupported open mode");
		return 1;
	}

	ime_filename = fio_set_ime_filename(f->file_name);
	if (ime_filename == NULL)
		return 1;
	f->fd = ime_native_open(ime_filename, flags, 0600);
	if (f->fd == -1) {
		char buf[FIO_VERROR_SIZE];
		int __e = errno;

		snprintf(buf, sizeof(buf), "open(%s)", f->file_name);
		td_verror(td, __e, buf);
		return 1;
	}

	/* Now we need to make sure the real file size is sufficient for FIO
	   to do its thing. This is normally done before the file open function
	   is called, but because FIO would use POSIX calls, we need to do it
	   ourselves */
	ret = fio_ime_get_file_size(td, f);
	if (ret) {
		ime_native_close(f->fd);
		td_verror(td, errno, "ime_get_file_size");
		return 1;
	}

	desired_fs = f->io_size + f->file_offset;
	if (td_write(td)) {
		dprint(FD_FILE, "Laying out file %s%s\n",
			DEFAULT_IME_FILE_PREFIX, f->file_name);
		if (!td->o.create_on_open &&
				f->real_file_size < desired_fs &&
				ime_native_ftruncate(f->fd, desired_fs) < 0) {
			ime_native_close(f->fd);
			td_verror(td, errno, "ime_native_ftruncate");
			return 1;
		}
		if (f->real_file_size < desired_fs)
			f->real_file_size = desired_fs;
	} else if (td_read(td) && f->real_file_size < desired_fs) {
		ime_native_close(f->fd);
		log_err("error: can't read %llu bytes from file with "
			"%llu bytes\n", (unsigned long long) desired_fs,
			(unsigned long long) f->real_file_size);
		return 1;
	}

	return 0;
}

static int fio_ime_close_file(struct thread_data fio_unused *td, struct fio_file *f)
{
	int ret = 0;

	dprint(FD_FILE, "fd close %s\n", f->file_name);

	if (ime_native_close(f->fd) < 0)
		ret = errno;

	f->fd = -1;
	return ret;
}

static int fio_ime_unlink_file(struct thread_data *td, struct fio_file *f)
{
	char *ime_filename = fio_set_ime_filename(f->file_name);
	int ret;

	if (ime_filename == NULL)
		return 1;

	ret = unlink(ime_filename);
	return ret < 0 ? errno : 0;
}

static struct io_u *fio_ime_event(struct thread_data *td, int event)
{
	struct ime_data *ime_d = td->io_ops_data;

	return ime_d->event_io_us[event];
}

/* Setup function used in place of get_file_sizes when setting up the files.
   Instead we will set real_file_size to 0 for each file. This way we
   can avoid calling ime_native_init before the forks are created. */
static int fio_ime_setup(struct thread_data *td)
{
	struct fio_file *f;
	unsigned int i;

	for_each_file(td, f, i) {
		dprint(FD_FILE, "setup: set file size to 0 for %p/%d/%s\n",
			f, i, f->file_name);
		f->real_file_size = 0;
	}

	return 0;
}

static int fio_ime_engine_init(struct thread_data *td)
{
	struct fio_file *f;
	unsigned int i;

	dprint(FD_IO, "ime engine init\n");
	if (fio_ime_is_initialized && !td->o.use_thread) {
		log_err("Warning: something might go wrong. Not all threads/forks were"
			" created before the FIO jobs were initialized.\n");
	}

	ime_native_init();
	fio_ime_is_initialized = true;

	/* We have to temporarily set real_file_size so that
	   FIO can initialize properly. It will be corrected
	   on file open. */
	for_each_file(td, f, i)
		f->real_file_size = f->io_size + f->file_offset;

	return 0;
}

static void fio_ime_engine_finalize(struct thread_data *td)
{
	/* Only finalize IME when using forks */
	if (!td->o.use_thread) {
		if (ime_native_finalize() < 0)
			log_err("error in ime_native_finalize\n");
		fio_ime_is_initialized = false;
	}
}


/**************************************************************
 * Private functions for blocking IOs
 * (without iovecs)
 **************************************************************/

/* Notice: this function comes from the sync engine */
/* It is used by the queue function to return a proper code and fill
   some attributes in the io_u used for the IO. */
static int fio_ime_psync_end(struct thread_data *td, struct io_u *io_u, ssize_t ret)
{
	if (ret != (ssize_t) io_u->xfer_buflen) {
		if (ret >= 0) {
			io_u->resid = io_u->xfer_buflen - ret;
			io_u->error = 0;
			return FIO_Q_COMPLETED;
		} else
			io_u->error = errno;
	}

	if (io_u->error) {
		io_u_log_error(td, io_u);
		td_verror(td, io_u->error, "xfer");
	}

	return FIO_Q_COMPLETED;
}

static enum fio_q_status fio_ime_psync_queue(struct thread_data *td,
					     struct io_u *io_u)
{
	struct fio_file *f = io_u->file;
	ssize_t ret;

	fio_ro_check(td, io_u);

	if (io_u->ddir == DDIR_READ)
		ret = ime_native_pread(f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
	else if (io_u->ddir == DDIR_WRITE)
		ret = ime_native_pwrite(f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
	else if (io_u->ddir == DDIR_SYNC)
		ret = ime_native_fsync(f->fd);
	else {
		ret = io_u->xfer_buflen;
		io_u->error = EINVAL;
	}

	return fio_ime_psync_end(td, io_u, ret);
}


/**************************************************************
 * Private functions for blocking IOs
 * (with iovecs)
 **************************************************************/

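/* How ime_psyncv uses the ring helpers above (informal summary): queue()
   appends contiguous iovecs targeting a single fd/ddir, commit() issues
   them as one ime_native_preadv()/ime_native_pwritev() call, and
   getevents() then returns one event per queued io_u before resetting
   the ring. Only one such request can be outstanding at a time. */
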
static bool fio_ime_psyncv_can_queue(struct ime_data *ime_d, struct io_u *io_u)
{
	/* We can only queue if:
	   - There are no queued iovecs
	   - Or if there is at least one:
	     - There must be no event waiting for retrieval
	     - The offsets must be contiguous
	     - The ddir and fd must be the same */
	return (ime_d->queued == 0 || (
		ime_d->events == 0 &&
		ime_d->last_offset == io_u->offset &&
		ime_d->sioreq->ddir == io_u->ddir &&
		ime_d->sioreq->fd == io_u->file->fd));
}

/* Before using this function, we should have already
   ensured that the queue is not full */
static void fio_ime_psyncv_enqueue(struct ime_data *ime_d, struct io_u *io_u)
{
	struct imesio_req *ioreq = ime_d->sioreq;
	struct iovec *iov = &ime_d->iovecs[ime_d->head];

	iov->iov_base = io_u->xfer_buf;
	iov->iov_len = io_u->xfer_buflen;

	if (ime_d->queued == 0) {
		ioreq->offset = io_u->offset;
		ioreq->ddir = io_u->ddir;
		ioreq->fd = io_u->file->fd;
	}

	ime_d->io_us[ime_d->head] = io_u;
	ime_d->last_offset = io_u->offset + io_u->xfer_buflen;
	fio_ime_queue_incr(ime_d);
}

/* Tries to queue an IO. It will fail if the IO can't be appended to the
   current request or if the current request has been committed but not
   yet retrieved by get_events. */
static enum fio_q_status fio_ime_psyncv_queue(struct thread_data *td,
					      struct io_u *io_u)
{
	struct ime_data *ime_d = td->io_ops_data;

	fio_ro_check(td, io_u);

	if (ime_d->queued == ime_d->depth)
		return FIO_Q_BUSY;

	if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
		if (!fio_ime_psyncv_can_queue(ime_d, io_u))
			return FIO_Q_BUSY;

		dprint(FD_IO, "queue: ddir=%d at %u commit=%u queued=%u events=%u\n",
			io_u->ddir, ime_d->head, ime_d->cur_commit,
			ime_d->queued, ime_d->events);
		fio_ime_psyncv_enqueue(ime_d, io_u);
		return FIO_Q_QUEUED;
	} else if (io_u->ddir == DDIR_SYNC) {
		if (ime_native_fsync(io_u->file->fd) < 0) {
			io_u->error = errno;
			td_verror(td, io_u->error, "fsync");
		}
		return FIO_Q_COMPLETED;
	} else {
		io_u->error = EINVAL;
		td_verror(td, io_u->error, "wrong ddir");
		return FIO_Q_COMPLETED;
	}
}

/* Notice: this function comes from the sync engine */
/* It is used by the commit function to return a proper code and fill
   some attributes in the io_us appended to the current request. */
static int fio_ime_psyncv_end(struct thread_data *td, ssize_t bytes)
{
	struct ime_data *ime_d = td->io_ops_data;
	struct io_u *io_u;
	unsigned int i;
	int err = errno;

	for (i = 0; i < ime_d->queued; i++) {
		io_u = ime_d->io_us[i];

		if (bytes == -1)
			io_u->error = err;
		else {
			unsigned int this_io;

			this_io = bytes;
			if (this_io > io_u->xfer_buflen)
				this_io = io_u->xfer_buflen;

			io_u->resid = io_u->xfer_buflen - this_io;
			io_u->error = 0;
			bytes -= this_io;
		}
	}

	if (bytes == -1) {
		td_verror(td, err, "xfer psyncv");
		return -err;
	}

	return 0;
}

/* Commits the current request by calling ime_native (with one or several
   iovecs). After this commit, the corresponding events (one per iovec)
   can be retrieved by get_events. */
static int fio_ime_psyncv_commit(struct thread_data *td)
{
	struct ime_data *ime_d = td->io_ops_data;
	struct imesio_req *ioreq;
	int ret = 0;

	/* Exit if there are no (new) events to commit
	   or if the previously committed events haven't been retrieved */
	if (!ime_d->queued || ime_d->events)
		return 0;

	ioreq = ime_d->sioreq;
	ime_d->events = ime_d->queued;
	if (ioreq->ddir == DDIR_READ)
		ret = ime_native_preadv(ioreq->fd, ime_d->iovecs, ime_d->queued, ioreq->offset);
	else
		ret = ime_native_pwritev(ioreq->fd, ime_d->iovecs, ime_d->queued, ioreq->offset);

	dprint(FD_IO, "committed %d iovecs\n", ime_d->queued);

	return fio_ime_psyncv_end(td, ret);
}

static int fio_ime_psyncv_getevents(struct thread_data *td, unsigned int min,
				    unsigned int max, const struct timespec *t)
{
	struct ime_data *ime_d = td->io_ops_data;
	struct io_u *io_u;
	int events = 0;
	unsigned int count;

	if (ime_d->events) {
		for (count = 0; count < ime_d->events; count++) {
			io_u = ime_d->io_us[count];
			ime_d->event_io_us[events] = io_u;
			events++;
		}
		fio_ime_queue_reset(ime_d);
	}

	dprint(FD_IO, "getevents(%u,%u) ret=%d queued=%u events=%u\n",
		min, max, events, ime_d->queued, ime_d->events);
	return events;
}

static int fio_ime_psyncv_init(struct thread_data *td)
{
	struct ime_data *ime_d;

	if (fio_ime_engine_init(td) < 0)
		return 1;

	ime_d = calloc(1, sizeof(*ime_d));

	ime_d->sioreq = malloc(sizeof(struct imesio_req));
	ime_d->iovecs = malloc(td->o.iodepth * sizeof(struct iovec));
	ime_d->io_us = malloc(2 * td->o.iodepth * sizeof(struct io_u *));
	ime_d->event_io_us = ime_d->io_us + td->o.iodepth;

	ime_d->depth = td->o.iodepth;

	td->io_ops_data = ime_d;
	return 0;
}

static void fio_ime_psyncv_clean(struct thread_data *td)
{
	struct ime_data *ime_d = td->io_ops_data;

	if (ime_d) {
		free(ime_d->sioreq);
		free(ime_d->iovecs);
		free(ime_d->io_us);
		free(ime_d);
		td->io_ops_data = NULL;
	}

	fio_ime_engine_finalize(td);
}


/**************************************************************
 * Private functions for non-blocking IOs
 *
 **************************************************************/

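/* Completion protocol used by the ime_aio engine (informal summary):
   enqueue() groups contiguous iovecs into an ime_aiocb, commit() submits
   each iocb with ime_native_aio_read()/ime_native_aio_write(), and IME
   later invokes fio_ime_aio_complete_cb(), which stores the request status
   under status_mutex and signals cond_endio. getevents() reaps finished
   requests from the tail of the ring, blocking on cond_endio when the
   oldest request is still marked FIO_IME_IN_PROGRESS. */
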
void fio_ime_aio_complete_cb (struct ime_aiocb *aiocb, int err,
			      ssize_t bytes)
{
	struct imeaio_req *ioreq = (struct imeaio_req *) aiocb->user_context;

	pthread_mutex_lock(&ioreq->status_mutex);
	ioreq->status = err == 0 ? bytes : FIO_IME_REQ_ERROR;
	pthread_mutex_unlock(&ioreq->status_mutex);

	pthread_cond_signal(&ioreq->cond_endio);
}

static bool fio_ime_aio_can_queue (struct ime_data *ime_d, struct io_u *io_u)
{
	/* So far we can queue in any case. */
	return true;
}

static bool fio_ime_aio_can_append (struct ime_data *ime_d, struct io_u *io_u)
{
	/* We can only append if:
	   - The iovecs will be contiguous in the array
	   - There is already a queued iovec
	   - The offsets are contiguous
	   - The ddir and fd are the same */
	return (ime_d->head != 0 &&
		ime_d->queued - ime_d->events > 0 &&
		ime_d->last_offset == io_u->offset &&
		ime_d->last_req->ddir == io_u->ddir &&
		ime_d->last_req->iocb.fd == io_u->file->fd);
}

/* Before using this function, we should have already
   ensured that the queue is not full */
static void fio_ime_aio_enqueue(struct ime_data *ime_d, struct io_u *io_u)
{
	struct imeaio_req *ioreq = &ime_d->aioreqs[ime_d->head];
	struct ime_aiocb *iocb = &ioreq->iocb;
	struct iovec *iov = &ime_d->iovecs[ime_d->head];

	iov->iov_base = io_u->xfer_buf;
	iov->iov_len = io_u->xfer_buflen;

	if (fio_ime_aio_can_append(ime_d, io_u))
		ime_d->last_req->iocb.iovcnt++;
	else {
		ioreq->status = FIO_IME_IN_PROGRESS;
		ioreq->ddir = io_u->ddir;
		ime_d->last_req = ioreq;

		iocb->complete_cb = &fio_ime_aio_complete_cb;
		iocb->fd = io_u->file->fd;
		iocb->file_offset = io_u->offset;
		iocb->iov = iov;
		iocb->iovcnt = 1;
		iocb->flags = 0;
		iocb->user_context = (intptr_t) ioreq;
	}

	ime_d->io_us[ime_d->head] = io_u;
	ime_d->last_offset = io_u->offset + io_u->xfer_buflen;
	fio_ime_queue_incr(ime_d);
}

/* Tries to queue an IO. It will create a new request if the IO can't be
   appended to the current request. It will fail if the queue can't contain
   any more io_u/iovec. In this case, commit and then get_events need to be
   called. */
static enum fio_q_status fio_ime_aio_queue(struct thread_data *td,
					   struct io_u *io_u)
{
	struct ime_data *ime_d = td->io_ops_data;

	fio_ro_check(td, io_u);

	dprint(FD_IO, "queue: ddir=%d at %u commit=%u queued=%u events=%u\n",
		io_u->ddir, ime_d->head, ime_d->cur_commit,
		ime_d->queued, ime_d->events);

	if (ime_d->queued == ime_d->depth)
		return FIO_Q_BUSY;

	if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) {
		if (!fio_ime_aio_can_queue(ime_d, io_u))
			return FIO_Q_BUSY;

		fio_ime_aio_enqueue(ime_d, io_u);
		return FIO_Q_QUEUED;
	} else if (io_u->ddir == DDIR_SYNC) {
		if (ime_native_fsync(io_u->file->fd) < 0) {
			io_u->error = errno;
			td_verror(td, io_u->error, "fsync");
		}
		return FIO_Q_COMPLETED;
	} else {
		io_u->error = EINVAL;
		td_verror(td, io_u->error, "wrong ddir");
		return FIO_Q_COMPLETED;
	}
}

static int fio_ime_aio_commit(struct thread_data *td)
{
	struct ime_data *ime_d = td->io_ops_data;
	struct imeaio_req *ioreq;
	int ret = 0;

	/* Loop while there are queued IOs that haven't been committed yet */
	while (ime_d->queued - ime_d->events) {
		ioreq = &ime_d->aioreqs[ime_d->cur_commit];
		if (ioreq->ddir == DDIR_READ)
			ret = ime_native_aio_read(&ioreq->iocb);
		else
			ret = ime_native_aio_write(&ioreq->iocb);

		fio_ime_queue_commit(ime_d, ioreq->iocb.iovcnt);

		/* fio needs a negative error code */
		if (ret < 0) {
			ioreq->status = FIO_IME_REQ_ERROR;
			return -errno;
		}

		io_u_mark_submit(td, ioreq->iocb.iovcnt);
		dprint(FD_IO, "committed %d iovecs commit=%u queued=%u events=%u\n",
			ioreq->iocb.iovcnt, ime_d->cur_commit,
			ime_d->queued, ime_d->events);
	}

	return 0;
}

static int fio_ime_aio_getevents(struct thread_data *td, unsigned int min,
				 unsigned int max, const struct timespec *t)
{
	struct ime_data *ime_d = td->io_ops_data;
	struct imeaio_req *ioreq;
	struct io_u *io_u;
	int events = 0;
	unsigned int count;
	ssize_t bytes;

	while (ime_d->events) {
		ioreq = &ime_d->aioreqs[ime_d->tail];

		/* Break if we already got events and appending the next
		   request's events would exceed max */
		if (events && events + ioreq->iocb.iovcnt > max)
			break;

		if (ioreq->status != FIO_IME_IN_PROGRESS) {

			bytes = ioreq->status;
			for (count = 0; count < ioreq->iocb.iovcnt; count++) {
				io_u = ime_d->io_us[ime_d->tail];
				ime_d->event_io_us[events] = io_u;
				events++;
				fio_ime_queue_red(ime_d);

				if (ioreq->status == FIO_IME_REQ_ERROR)
					io_u->error = EIO;
				else {
					io_u->resid = bytes > io_u->xfer_buflen ?
							0 : io_u->xfer_buflen - bytes;
					io_u->error = 0;
					bytes -= io_u->xfer_buflen - io_u->resid;
				}
			}
		} else {
			pthread_mutex_lock(&ioreq->status_mutex);
			while (ioreq->status == FIO_IME_IN_PROGRESS)
				pthread_cond_wait(&ioreq->cond_endio, &ioreq->status_mutex);
			pthread_mutex_unlock(&ioreq->status_mutex);
		}

	}

	dprint(FD_IO, "getevents(%u,%u) ret=%d queued=%u events=%u\n", min, max,
		events, ime_d->queued, ime_d->events);
	return events;
}

static int fio_ime_aio_init(struct thread_data *td)
{
	struct ime_data *ime_d;
	struct imeaio_req *ioreq;
	unsigned int i;

	if (fio_ime_engine_init(td) < 0)
		return 1;

	ime_d = calloc(1, sizeof(*ime_d));

	ime_d->aioreqs = malloc(td->o.iodepth * sizeof(struct imeaio_req));
	ime_d->iovecs = malloc(td->o.iodepth * sizeof(struct iovec));
	ime_d->io_us = malloc(2 * td->o.iodepth * sizeof(struct io_u *));
	ime_d->event_io_us = ime_d->io_us + td->o.iodepth;

	ime_d->depth = td->o.iodepth;
	for (i = 0; i < ime_d->depth; i++) {
		ioreq = &ime_d->aioreqs[i];
		pthread_cond_init(&ioreq->cond_endio, NULL);
		pthread_mutex_init(&ioreq->status_mutex, NULL);
	}

	td->io_ops_data = ime_d;
	return 0;
}

static void fio_ime_aio_clean(struct thread_data *td)
{
	struct ime_data *ime_d = td->io_ops_data;
	struct imeaio_req *ioreq;
	unsigned int i;

	if (ime_d) {
		for (i = 0; i < ime_d->depth; i++) {
			ioreq = &ime_d->aioreqs[i];
			pthread_cond_destroy(&ioreq->cond_endio);
			pthread_mutex_destroy(&ioreq->status_mutex);
		}
		free(ime_d->aioreqs);
		free(ime_d->iovecs);
		free(ime_d->io_us);
		free(ime_d);
		td->io_ops_data = NULL;
	}

	fio_ime_engine_finalize(td);
}


/**************************************************************
 * IO engines definitions
 *
 **************************************************************/

/* The FIO_DISKLESSIO flag used for these engines is necessary to prevent
   FIO from using POSIX calls. See fio_ime_open_file for more details. */

static struct ioengine_ops ioengine_prw = {
	.name		= "ime_psync",
	.version	= FIO_IOOPS_VERSION,
	.setup		= fio_ime_setup,
	.init		= fio_ime_engine_init,
	.cleanup	= fio_ime_engine_finalize,
	.queue		= fio_ime_psync_queue,
	.open_file	= fio_ime_open_file,
	.close_file	= fio_ime_close_file,
	.get_file_size	= fio_ime_get_file_size,
	.unlink_file	= fio_ime_unlink_file,
	.flags		= FIO_SYNCIO | FIO_DISKLESSIO,
};

static struct ioengine_ops ioengine_pvrw = {
	.name		= "ime_psyncv",
	.version	= FIO_IOOPS_VERSION,
	.setup		= fio_ime_setup,
	.init		= fio_ime_psyncv_init,
	.cleanup	= fio_ime_psyncv_clean,
	.queue		= fio_ime_psyncv_queue,
	.commit		= fio_ime_psyncv_commit,
	.getevents	= fio_ime_psyncv_getevents,
	.event		= fio_ime_event,
	.open_file	= fio_ime_open_file,
	.close_file	= fio_ime_close_file,
	.get_file_size	= fio_ime_get_file_size,
	.unlink_file	= fio_ime_unlink_file,
	.flags		= FIO_SYNCIO | FIO_DISKLESSIO,
};

static struct ioengine_ops ioengine_aio = {
	.name		= "ime_aio",
	.version	= FIO_IOOPS_VERSION,
	.setup		= fio_ime_setup,
	.init		= fio_ime_aio_init,
	.cleanup	= fio_ime_aio_clean,
	.queue		= fio_ime_aio_queue,
	.commit		= fio_ime_aio_commit,
	.getevents	= fio_ime_aio_getevents,
	.event		= fio_ime_event,
	.open_file	= fio_ime_open_file,
	.close_file	= fio_ime_close_file,
	.get_file_size	= fio_ime_get_file_size,
	.unlink_file	= fio_ime_unlink_file,
	.flags		= FIO_DISKLESSIO,
};

static void fio_init fio_ime_register(void)
{
	register_ioengine(&ioengine_prw);
	register_ioengine(&ioengine_pvrw);
	register_ioengine(&ioengine_aio);
}

static void fio_exit fio_ime_unregister(void)
{
	unregister_ioengine(&ioengine_prw);
	unregister_ioengine(&ioengine_pvrw);
	unregister_ioengine(&ioengine_aio);

	if (fio_ime_is_initialized && ime_native_finalize() < 0)
		log_err("Warning: IME did not finalize properly\n");
}