[PATCH] Juggle some includes, add cscope make target
[fio.git] / fio-io.c
1 /*
2  * The io parts of the fio tool, includes workers for sync and mmap'ed
3  * io, as well as both posix and linux libaio support.
4  *
5  * sync io is implemented on top of aio.
6  *
7  * This is not really specific to fio, if the get_io_u/put_io_u and
8  * structures was pulled into this as well it would be a perfectly
9  * generic io engine that could be used for other projects.
10  *
11  */
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <unistd.h>
15 #include <errno.h>
16 #include <assert.h>
17 #include <time.h>
18 #include <sys/mman.h>
19 #include <sys/poll.h>
20 #include "fio.h"
21 #include "os.h"
22
23 #ifdef FIO_HAVE_LIBAIO
24
25 #define ev_to_iou(ev)   (struct io_u *) ((unsigned long) (ev)->obj)
26
27 static int fio_io_sync(struct thread_data *td)
28 {
29         return fsync(td->fd);
30 }
31
32 static int fill_timespec(struct timespec *ts)
33 {
34 #ifdef _POSIX_TIMERS
35         if (!clock_gettime(CLOCK_MONOTONIC, ts))
36                 return 0;
37
38         perror("clock_gettime");
39 #endif
40         return 1;
41 }
42
43 static unsigned long long ts_utime_since_now(struct timespec *t)
44 {
45         long long sec, nsec;
46         struct timespec now;
47
48         if (fill_timespec(&now))
49                 return 0;
50         
51         sec = now.tv_sec - t->tv_sec;
52         nsec = now.tv_nsec - t->tv_nsec;
53         if (sec > 0 && nsec < 0) {
54                 sec--;
55                 nsec += 1000000000;
56         }
57
58         sec *= 1000000;
59         nsec /= 1000;
60         return sec + nsec;
61 }
62
63 struct libaio_data {
64         io_context_t aio_ctx;
65         struct io_event *aio_events;
66 };
67
68 static int fio_libaio_io_prep(struct thread_data *td, struct io_u *io_u)
69 {
70         if (io_u->ddir == DDIR_READ)
71                 io_prep_pread(&io_u->iocb, td->fd, io_u->buf, io_u->buflen, io_u->offset);
72         else
73                 io_prep_pwrite(&io_u->iocb, td->fd, io_u->buf, io_u->buflen, io_u->offset);
74
75         return 0;
76 }
77
78 static struct io_u *fio_libaio_event(struct thread_data *td, int event)
79 {
80         struct libaio_data *ld = td->io_data;
81
82         return ev_to_iou(ld->aio_events + event);
83 }
84
85 static int fio_libaio_getevents(struct thread_data *td, int min, int max,
86                                 struct timespec *t)
87 {
88         struct libaio_data *ld = td->io_data;
89         int r;
90
91         do {
92                 r = io_getevents(ld->aio_ctx, min, max, ld->aio_events, t);
93                 if (r == -EAGAIN) {
94                         usleep(100);
95                         continue;
96                 } else if (r == -EINTR)
97                         continue;
98                 else
99                         break;
100         } while (1);
101
102         return r;
103 }
104
105 static int fio_libaio_queue(struct thread_data *td, struct io_u *io_u)
106 {
107         struct libaio_data *ld = td->io_data;
108         struct iocb *iocb = &io_u->iocb;
109         int ret;
110
111         do {
112                 ret = io_submit(ld->aio_ctx, 1, &iocb);
113                 if (ret == 1)
114                         return 0;
115                 else if (ret == -EAGAIN)
116                         usleep(100);
117                 else if (ret == -EINTR)
118                         continue;
119                 else
120                         break;
121         } while (1);
122
123         return ret;
124
125 }
126
127 static int fio_libaio_cancel(struct thread_data *td, struct io_u *io_u)
128 {
129         struct libaio_data *ld = td->io_data;
130
131         return io_cancel(ld->aio_ctx, &io_u->iocb, ld->aio_events);
132 }
133
134 static void fio_libaio_cleanup(struct thread_data *td)
135 {
136         struct libaio_data *ld = td->io_data;
137
138         if (ld) {
139                 io_destroy(ld->aio_ctx);
140                 if (ld->aio_events)
141                         free(ld->aio_events);
142
143                 free(ld);
144                 td->io_data = NULL;
145         }
146 }
147
148 int fio_libaio_init(struct thread_data *td)
149 {
150         struct libaio_data *ld = malloc(sizeof(*ld));
151
152         memset(ld, 0, sizeof(*ld));
153         if (io_queue_init(td->iodepth, &ld->aio_ctx)) {
154                 td_verror(td, errno);
155                 return 1;
156         }
157
158         td->io_prep = fio_libaio_io_prep;
159         td->io_queue = fio_libaio_queue;
160         td->io_getevents = fio_libaio_getevents;
161         td->io_event = fio_libaio_event;
162         td->io_cancel = fio_libaio_cancel;
163         td->io_cleanup = fio_libaio_cleanup;
164         td->io_sync = fio_io_sync;
165
166         ld->aio_events = malloc(td->iodepth * sizeof(struct io_event));
167         td->io_data = ld;
168         return 0;
169 }
170
171 #else /* FIO_HAVE_LIBAIO */
172
173 int fio_libaio_init(struct thread_data *td)
174 {
175         return EINVAL;
176 }
177
178 #endif /* FIO_HAVE_LIBAIO */
179
180 #ifdef FIO_HAVE_POSIXAIO
181
182 struct posixaio_data {
183         struct io_u **aio_events;
184 };
185
186 static int fio_posixaio_cancel(struct thread_data *td, struct io_u *io_u)
187 {
188         int r = aio_cancel(td->fd, &io_u->aiocb);
189
190         if (r == 1 || r == AIO_CANCELED)
191                 return 0;
192
193         return 1;
194 }
195
196 static int fio_posixaio_prep(struct thread_data *td, struct io_u *io_u)
197 {
198         struct aiocb *aiocb = &io_u->aiocb;
199
200         aiocb->aio_fildes = td->fd;
201         aiocb->aio_buf = io_u->buf;
202         aiocb->aio_nbytes = io_u->buflen;
203         aiocb->aio_offset = io_u->offset;
204
205         io_u->seen = 0;
206         return 0;
207 }
208
209 static int fio_posixaio_getevents(struct thread_data *td, int min, int max,
210                                   struct timespec *t)
211 {
212         struct posixaio_data *pd = td->io_data;
213         struct list_head *entry;
214         struct timespec start;
215         int r, have_timeout = 0;
216
217         if (t && !fill_timespec(&start))
218                 have_timeout = 1;
219
220         r = 0;
221 restart:
222         list_for_each(entry, &td->io_u_busylist) {
223                 struct io_u *io_u = list_entry(entry, struct io_u, list);
224                 int err;
225
226                 if (io_u->seen)
227                         continue;
228
229                 err = aio_error(&io_u->aiocb);
230                 switch (err) {
231                         default:
232                                 io_u->error = err;
233                         case ECANCELED:
234                         case 0:
235                                 pd->aio_events[r++] = io_u;
236                                 io_u->seen = 1;
237                                 break;
238                         case EINPROGRESS:
239                                 break;
240                 }
241
242                 if (r >= max)
243                         break;
244         }
245
246         if (r >= min)
247                 return r;
248
249         if (have_timeout) {
250                 unsigned long long usec;
251
252                 usec = (t->tv_sec * 1000000) + (t->tv_nsec / 1000);
253                 if (ts_utime_since_now(&start) > usec)
254                         return r;
255         }
256
257         /*
258          * hrmpf, we need to wait for more. we should use aio_suspend, for
259          * now just sleep a little and recheck status of busy-and-not-seen
260          */
261         usleep(1000);
262         goto restart;
263 }
264
265 static struct io_u *fio_posixaio_event(struct thread_data *td, int event)
266 {
267         struct posixaio_data *pd = td->io_data;
268
269         return pd->aio_events[event];
270 }
271
272 static int fio_posixaio_queue(struct thread_data *td, struct io_u *io_u)
273 {
274         struct aiocb *aiocb = &io_u->aiocb;
275         int ret;
276
277         if (io_u->ddir == DDIR_READ)
278                 ret = aio_read(aiocb);
279         else
280                 ret = aio_write(aiocb);
281
282         if (ret)
283                 io_u->error = errno;
284                 
285         return io_u->error;
286 }
287
288 static void fio_posixaio_cleanup(struct thread_data *td)
289 {
290         struct posixaio_data *pd = td->io_data;
291
292         if (pd) {
293                 free(pd->aio_events);
294                 free(pd);
295                 td->io_data = NULL;
296         }
297 }
298
299 int fio_posixaio_init(struct thread_data *td)
300 {
301         struct posixaio_data *pd = malloc(sizeof(*pd));
302
303         pd->aio_events = malloc(td->iodepth * sizeof(struct io_u *));
304
305         td->io_prep = fio_posixaio_prep;
306         td->io_queue = fio_posixaio_queue;
307         td->io_getevents = fio_posixaio_getevents;
308         td->io_event = fio_posixaio_event;
309         td->io_cancel = fio_posixaio_cancel;
310         td->io_cleanup = fio_posixaio_cleanup;
311         td->io_sync = fio_io_sync;
312
313         td->io_data = pd;
314         return 0;
315 }
316
317 #else /* FIO_HAVE_POSIXAIO */
318
319 int fio_posixaio_init(struct thread_data *td)
320 {
321         return EINVAL;
322 }
323
324 #endif /* FIO_HAVE_POSIXAIO */
325
326 struct syncio_data {
327         struct io_u *last_io_u;
328 };
329
330 static int fio_syncio_getevents(struct thread_data *td, int min, int max,
331                                 struct timespec *t)
332 {
333         assert(max <= 1);
334
335         /*
336          * we can only have one finished io_u for sync io, since the depth
337          * is always 1
338          */
339         if (list_empty(&td->io_u_busylist))
340                 return 0;
341
342         return 1;
343 }
344
345 static struct io_u *fio_syncio_event(struct thread_data *td, int event)
346 {
347         struct syncio_data *sd = td->io_data;
348
349         assert(event == 0);
350
351         return sd->last_io_u;
352 }
353
354 static int fio_syncio_prep(struct thread_data *td, struct io_u *io_u)
355 {
356         if (lseek(td->fd, io_u->offset, SEEK_SET) == -1) {
357                 td_verror(td, errno);
358                 return 1;
359         }
360
361         return 0;
362 }
363
364 static int fio_syncio_queue(struct thread_data *td, struct io_u *io_u)
365 {
366         struct syncio_data *sd = td->io_data;
367         int ret;
368
369         if (io_u->ddir == DDIR_READ)
370                 ret = read(td->fd, io_u->buf, io_u->buflen);
371         else
372                 ret = write(td->fd, io_u->buf, io_u->buflen);
373
374         if ((unsigned int) ret != io_u->buflen) {
375                 if (ret > 0) {
376                         io_u->resid = io_u->buflen - ret;
377                         io_u->error = ENODATA;
378                 } else
379                         io_u->error = errno;
380         }
381
382         if (!io_u->error)
383                 sd->last_io_u = io_u;
384
385         return io_u->error;
386 }
387
388 static void fio_syncio_cleanup(struct thread_data *td)
389 {
390         if (td->io_data) {
391                 free(td->io_data);
392                 td->io_data = NULL;
393         }
394 }
395
396 int fio_syncio_init(struct thread_data *td)
397 {
398         struct syncio_data *sd = malloc(sizeof(*sd));
399
400         td->io_prep = fio_syncio_prep;
401         td->io_queue = fio_syncio_queue;
402         td->io_getevents = fio_syncio_getevents;
403         td->io_event = fio_syncio_event;
404         td->io_cancel = NULL;
405         td->io_cleanup = fio_syncio_cleanup;
406         td->io_sync = fio_io_sync;
407
408         sd->last_io_u = NULL;
409         td->io_data = sd;
410         return 0;
411 }
412
413 static int fio_mmapio_queue(struct thread_data *td, struct io_u *io_u)
414 {
415         unsigned long long real_off = io_u->offset - td->file_offset;
416         struct syncio_data *sd = td->io_data;
417
418         if (io_u->ddir == DDIR_READ)
419                 memcpy(io_u->buf, td->mmap + real_off, io_u->buflen);
420         else
421                 memcpy(td->mmap + real_off, io_u->buf, io_u->buflen);
422
423         /*
424          * not really direct, but should drop the pages from the cache
425          */
426         if (td->odirect) {
427                 if (msync(td->mmap + real_off, io_u->buflen, MS_SYNC) < 0)
428                         io_u->error = errno;
429                 if (madvise(td->mmap + real_off, io_u->buflen,  MADV_DONTNEED) < 0)
430                         io_u->error = errno;
431         }
432
433         if (!io_u->error)
434                 sd->last_io_u = io_u;
435
436         return io_u->error;
437 }
438
439 static int fio_mmapio_sync(struct thread_data *td)
440 {
441         return msync(td->mmap, td->file_size, MS_SYNC);
442 }
443
444 int fio_mmapio_init(struct thread_data *td)
445 {
446         struct syncio_data *sd = malloc(sizeof(*sd));
447
448         td->io_prep = NULL;
449         td->io_queue = fio_mmapio_queue;
450         td->io_getevents = fio_syncio_getevents;
451         td->io_event = fio_syncio_event;
452         td->io_cancel = NULL;
453         td->io_cleanup = fio_syncio_cleanup;
454         td->io_sync = fio_mmapio_sync;
455
456         sd->last_io_u = NULL;
457         td->io_data = sd;
458         return 0;
459 }
460
461 #ifdef FIO_HAVE_SGIO
462
463 struct sgio_cmd {
464         unsigned char cdb[10];
465         int nr;
466 };
467
468 struct sgio_data {
469         struct sgio_cmd *cmds;
470         struct io_u **events;
471         unsigned int bs;
472 };
473
474 static void sgio_hdr_init(struct sgio_data *sd, struct sg_io_hdr *hdr,
475                           struct io_u *io_u, int fs)
476 {
477         struct sgio_cmd *sc = &sd->cmds[io_u->index];
478
479         memset(hdr, 0, sizeof(*hdr));
480         memset(sc->cdb, 0, sizeof(sc->cdb));
481
482         hdr->interface_id = 'S';
483         hdr->cmdp = sc->cdb;
484         hdr->cmd_len = sizeof(sc->cdb);
485         hdr->pack_id = io_u->index;
486         hdr->usr_ptr = io_u;
487
488         if (fs) {
489                 hdr->dxferp = io_u->buf;
490                 hdr->dxfer_len = io_u->buflen;
491         }
492 }
493
494 static int fio_sgio_getevents(struct thread_data *td, int min, int max,
495                               struct timespec *t)
496 {
497         struct sgio_data *sd = td->io_data;
498         struct pollfd pfd = { .fd = td->fd, .events = POLLIN };
499         void *buf = malloc(max * sizeof(struct sg_io_hdr));
500         int left = max, ret, events, i, r = 0, fl = 0;
501
502         /*
503          * don't block for !events
504          */
505         if (!min) {
506                 fl = fcntl(td->fd, F_GETFL);
507                 fcntl(td->fd, F_SETFL, fl | O_NONBLOCK);
508         }
509
510         while (left) {
511                 do {
512                         if (!min)
513                                 break;
514                         poll(&pfd, 1, -1);
515                         if (pfd.revents & POLLIN)
516                                 break;
517                 } while (1);
518
519                 ret = read(td->fd, buf, left * sizeof(struct sg_io_hdr));
520                 if (ret < 0) {
521                         if (errno == EAGAIN)
522                                 break;
523                         td_verror(td, errno);
524                         r = -1;
525                         break;
526                 } else if (!ret)
527                         break;
528
529                 events = ret / sizeof(struct sg_io_hdr);
530                 left -= events;
531                 r += events;
532
533                 for (i = 0; i < events; i++) {
534                         struct sg_io_hdr *hdr = (struct sg_io_hdr *) buf + i;
535
536                         sd->events[i] = hdr->usr_ptr;
537                 }
538         }
539
540         if (!min)
541                 fcntl(td->fd, F_SETFL, fl);
542
543         free(buf);
544         return r;
545 }
546
547 static int fio_sgio_ioctl_doio(struct thread_data *td, struct io_u *io_u)
548 {
549         struct sgio_data *sd = td->io_data;
550         struct sg_io_hdr *hdr = &io_u->hdr;
551
552         sd->events[0] = io_u;
553
554         return ioctl(td->fd, SG_IO, hdr);
555 }
556
557 static int fio_sgio_rw_doio(struct thread_data *td, struct io_u *io_u, int sync)
558 {
559         struct sg_io_hdr *hdr = &io_u->hdr;
560         int ret;
561
562         ret = write(td->fd, hdr, sizeof(*hdr));
563         if (ret < 0)
564                 return errno;
565
566         if (sync) {
567                 ret = read(td->fd, hdr, sizeof(*hdr));
568                 if (ret < 0)
569                         return errno;
570         }
571
572         return 0;
573 }
574
575 static int fio_sgio_doio(struct thread_data *td, struct io_u *io_u, int sync)
576 {
577         if (td->filetype == FIO_TYPE_BD)
578                 return fio_sgio_ioctl_doio(td, io_u);
579
580         return fio_sgio_rw_doio(td, io_u, sync);
581 }
582
583 static int fio_sgio_sync(struct thread_data *td)
584 {
585         struct sgio_data *sd = td->io_data;
586         struct sg_io_hdr *hdr;
587         struct io_u *io_u;
588         int ret;
589
590         io_u = __get_io_u(td);
591         if (!io_u)
592                 return ENOMEM;
593
594         hdr = &io_u->hdr;
595         sgio_hdr_init(sd, hdr, io_u, 0);
596         hdr->dxfer_direction = SG_DXFER_NONE;
597
598         hdr->cmdp[0] = 0x35;
599
600         ret = fio_sgio_doio(td, io_u, 1);
601         put_io_u(td, io_u);
602         return ret;
603 }
604
605 static int fio_sgio_prep(struct thread_data *td, struct io_u *io_u)
606 {
607         struct sg_io_hdr *hdr = &io_u->hdr;
608         struct sgio_data *sd = td->io_data;
609         int nr_blocks, lba;
610
611         if (io_u->buflen & (sd->bs - 1)) {
612                 fprintf(stderr, "read/write not sector aligned\n");
613                 return EINVAL;
614         }
615
616         sgio_hdr_init(sd, hdr, io_u, 1);
617
618         if (io_u->ddir == DDIR_READ) {
619                 hdr->dxfer_direction = SG_DXFER_FROM_DEV;
620                 hdr->cmdp[0] = 0x28;
621         } else {
622                 hdr->dxfer_direction = SG_DXFER_TO_DEV;
623                 hdr->cmdp[0] = 0x2a;
624         }
625
626         nr_blocks = io_u->buflen / sd->bs;
627         lba = io_u->offset / sd->bs;
628         hdr->cmdp[2] = (lba >> 24) & 0xff;
629         hdr->cmdp[3] = (lba >> 16) & 0xff;
630         hdr->cmdp[4] = (lba >>  8) & 0xff;
631         hdr->cmdp[5] = lba & 0xff;
632         hdr->cmdp[7] = (nr_blocks >> 8) & 0xff;
633         hdr->cmdp[8] = nr_blocks & 0xff;
634         return 0;
635 }
636
637 static int fio_sgio_queue(struct thread_data *td, struct io_u *io_u)
638 {
639         struct sg_io_hdr *hdr = &io_u->hdr;
640         int ret;
641
642         ret = fio_sgio_doio(td, io_u, 0);
643
644         if (ret < 0)
645                 io_u->error = errno;
646         else if (hdr->status) {
647                 io_u->resid = hdr->resid;
648                 io_u->error = EIO;
649         }
650
651         return io_u->error;
652 }
653
654 static struct io_u *fio_sgio_event(struct thread_data *td, int event)
655 {
656         struct sgio_data *sd = td->io_data;
657
658         return sd->events[event];
659 }
660
661 static int fio_sgio_get_bs(struct thread_data *td, unsigned int *bs)
662 {
663         struct sgio_data *sd = td->io_data;
664         struct io_u *io_u;
665         struct sg_io_hdr *hdr;
666         unsigned char buf[8];
667         int ret;
668
669         io_u = __get_io_u(td);
670         assert(io_u);
671
672         hdr = &io_u->hdr;
673         sgio_hdr_init(sd, hdr, io_u, 0);
674         memset(buf, 0, sizeof(buf));
675
676         hdr->cmdp[0] = 0x25;
677         hdr->dxfer_direction = SG_DXFER_FROM_DEV;
678         hdr->dxferp = buf;
679         hdr->dxfer_len = sizeof(buf);
680
681         ret = fio_sgio_doio(td, io_u, 1);
682         if (ret) {
683                 put_io_u(td, io_u);
684                 return ret;
685         }
686
687         *bs = (buf[4] << 24) | (buf[5] << 16) | (buf[6] << 8) | buf[7];
688         put_io_u(td, io_u);
689         return 0;
690 }
691
692 int fio_sgio_init(struct thread_data *td)
693 {
694         struct sgio_data *sd;
695         unsigned int bs;
696         int ret;
697
698         sd = malloc(sizeof(*sd));
699         sd->cmds = malloc(td->iodepth * sizeof(struct sgio_cmd));
700         sd->events = malloc(td->iodepth * sizeof(struct io_u *));
701         td->io_data = sd;
702
703         if (td->filetype == FIO_TYPE_BD) {
704                 if (ioctl(td->fd, BLKSSZGET, &bs) < 0) {
705                         td_verror(td, errno);
706                         return 1;
707                 }
708         } else if (td->filetype == FIO_TYPE_CHAR) {
709                 int version;
710
711                 if (ioctl(td->fd, SG_GET_VERSION_NUM, &version) < 0) {
712                         td_verror(td, errno);
713                         return 1;
714                 }
715
716                 ret = fio_sgio_get_bs(td, &bs);
717                 if (ret)
718                         return ret;
719         } else {
720                 fprintf(stderr, "ioengine sgio only works on block devices\n");
721                 return 1;
722         }
723
724         sd->bs = bs;
725
726         td->io_prep = fio_sgio_prep;
727         td->io_queue = fio_sgio_queue;
728
729         if (td->filetype == FIO_TYPE_BD)
730                 td->io_getevents = fio_syncio_getevents;
731         else
732                 td->io_getevents = fio_sgio_getevents;
733
734         td->io_event = fio_sgio_event;
735         td->io_cancel = NULL;
736         td->io_cleanup = fio_syncio_cleanup;
737         td->io_sync = fio_sgio_sync;
738
739         /*
740          * we want to do it, regardless of whether odirect is set or not
741          */
742         td->override_sync = 1;
743         return 0;
744 }
745
746 #else /* FIO_HAVE_SGIO */
747
748 int fio_sgio_init(struct thread_data *td)
749 {
750         return EINVAL;
751 }
752
753 #endif /* FIO_HAVE_SGIO */
754
755 #ifdef FIO_HAVE_SPLICE
756 struct spliceio_data {
757         struct io_u *last_io_u;
758         int pipe[2];
759 };
760
761 static struct io_u *fio_spliceio_event(struct thread_data *td, int event)
762 {
763         struct spliceio_data *sd = td->io_data;
764
765         assert(event == 0);
766
767         return sd->last_io_u;
768 }
769
770 /*
771  * For splice reading, we unfortunately cannot (yet) vmsplice the other way.
772  * So just splice the data from the file into the pipe, and use regular
773  * read to fill the buffer. Doesn't make a lot of sense, but...
774  */
775 static int fio_splice_read(struct thread_data *td, struct io_u *io_u)
776 {
777         struct spliceio_data *sd = td->io_data;
778         int ret, ret2, buflen;
779         off_t offset;
780         void *p;
781
782         offset = io_u->offset;
783         buflen = io_u->buflen;
784         p = io_u->buf;
785         while (buflen) {
786                 int this_len = buflen;
787
788                 if (this_len > SPLICE_DEF_SIZE)
789                         this_len = SPLICE_DEF_SIZE;
790
791                 ret = splice(td->fd, &offset, sd->pipe[1], NULL, this_len, SPLICE_F_MORE);
792                 if (ret < 0) {
793                         if (errno == ENODATA || errno == EAGAIN)
794                                 continue;
795
796                         return errno;
797                 }
798
799                 buflen -= ret;
800
801                 while (ret) {
802                         ret2 = read(sd->pipe[0], p, ret);
803                         if (ret2 < 0)
804                                 return errno;
805
806                         ret -= ret2;
807                         p += ret2;
808                 }
809         }
810
811         return io_u->buflen;
812 }
813
814 /*
815  * For splice writing, we can vmsplice our data buffer directly into a
816  * pipe and then splice that to a file.
817  */
818 static int fio_splice_write(struct thread_data *td, struct io_u *io_u)
819 {
820         struct spliceio_data *sd = td->io_data;
821         struct iovec iov[1] = {
822                 {
823                         .iov_base = io_u->buf,
824                         .iov_len = io_u->buflen,
825                 }
826         };
827         struct pollfd pfd = { .fd = sd->pipe[1], .events = POLLOUT, };
828         off_t off = io_u->offset;
829         int ret, ret2;
830
831         while (iov[0].iov_len) {
832                 if (poll(&pfd, 1, -1) < 0)
833                         return errno;
834
835                 ret = vmsplice(sd->pipe[1], iov, 1, SPLICE_F_NONBLOCK);
836                 if (ret < 0)
837                         return errno;
838
839                 iov[0].iov_len -= ret;
840                 iov[0].iov_base += ret;
841
842                 while (ret) {
843                         ret2 = splice(sd->pipe[0], NULL, td->fd, &off, ret, 0);
844                         if (ret2 < 0)
845                                 return errno;
846
847                         ret -= ret2;
848                 }
849         }
850
851         return io_u->buflen;
852 }
853
854 static int fio_spliceio_queue(struct thread_data *td, struct io_u *io_u)
855 {
856         struct spliceio_data *sd = td->io_data;
857         int ret;
858
859         if (io_u->ddir == DDIR_READ)
860                 ret = fio_splice_read(td, io_u);
861         else
862                 ret = fio_splice_write(td, io_u);
863
864         if ((unsigned int) ret != io_u->buflen) {
865                 if (ret > 0) {
866                         io_u->resid = io_u->buflen - ret;
867                         io_u->error = ENODATA;
868                 } else
869                         io_u->error = errno;
870         }
871
872         if (!io_u->error)
873                 sd->last_io_u = io_u;
874
875         return io_u->error;
876 }
877
878 static void fio_spliceio_cleanup(struct thread_data *td)
879 {
880         struct spliceio_data *sd = td->io_data;
881
882         if (sd) {
883                 close(sd->pipe[0]);
884                 close(sd->pipe[1]);
885                 free(sd);
886                 td->io_data = NULL;
887         }
888 }
889
890 int fio_spliceio_init(struct thread_data *td)
891 {
892         struct spliceio_data *sd = malloc(sizeof(*sd));
893
894         td->io_queue = fio_spliceio_queue;
895         td->io_getevents = fio_syncio_getevents;
896         td->io_event = fio_spliceio_event;
897         td->io_cancel = NULL;
898         td->io_cleanup = fio_spliceio_cleanup;
899         td->io_sync = fio_io_sync;
900
901         sd->last_io_u = NULL;
902         if (pipe(sd->pipe) < 0) {
903                 td_verror(td, errno);
904                 free(sd);
905                 return 1;
906         }
907
908         td->io_data = sd;
909         return 0;
910 }
911
912 #else /* FIO_HAVE_SPLICE */
913
914 int fio_spliceio_init(struct thread_data *td)
915 {
916         return EINVAL;
917 }
918
919 #endif /* FIO_HAVE_SPLICE */