[PATCH] Improve run_str[] updates and printing
[fio.git] / ioengines.c
1 /*
2  * The io parts of the fio tool, includes workers for sync and mmap'ed
3  * io, as well as both posix and linux libaio support.
4  *
5  * sync io is implemented on top of aio.
6  *
7  * This is not really specific to fio, if the get_io_u/put_io_u and
8  * structures was pulled into this as well it would be a perfectly
9  * generic io engine that could be used for other projects.
10  *
11  */
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <unistd.h>
15 #include <errno.h>
16 #include <assert.h>
17 #include <time.h>
18 #include <string.h>
19 #include <sys/mman.h>
20 #include <sys/poll.h>
21 #include "fio.h"
22 #include "os.h"
23
24 static int fill_timespec(struct timespec *ts)
25 {
26 #ifdef _POSIX_TIMERS
27         if (!clock_gettime(CLOCK_MONOTONIC, ts))
28                 return 0;
29
30         perror("clock_gettime");
31 #endif
32         return 1;
33 }
34
35 static unsigned long long ts_utime_since_now(struct timespec *t)
36 {
37         long long sec, nsec;
38         struct timespec now;
39
40         if (fill_timespec(&now))
41                 return 0;
42         
43         sec = now.tv_sec - t->tv_sec;
44         nsec = now.tv_nsec - t->tv_nsec;
45         if (sec > 0 && nsec < 0) {
46                 sec--;
47                 nsec += 1000000000;
48         }
49
50         sec *= 1000000;
51         nsec /= 1000;
52         return sec + nsec;
53 }
54
55 static int fio_io_sync(struct thread_data *td)
56 {
57         return fsync(td->fd);
58 }
59
60 #ifdef FIO_HAVE_LIBAIO
61
62 #define ev_to_iou(ev)   (struct io_u *) ((unsigned long) (ev)->obj)
63
64 struct libaio_data {
65         io_context_t aio_ctx;
66         struct io_event *aio_events;
67 };
68
69 static int fio_libaio_io_prep(struct thread_data *td, struct io_u *io_u)
70 {
71         if (io_u->ddir == DDIR_READ)
72                 io_prep_pread(&io_u->iocb, td->fd, io_u->buf, io_u->buflen, io_u->offset);
73         else
74                 io_prep_pwrite(&io_u->iocb, td->fd, io_u->buf, io_u->buflen, io_u->offset);
75
76         return 0;
77 }
78
79 static struct io_u *fio_libaio_event(struct thread_data *td, int event)
80 {
81         struct libaio_data *ld = td->io_data;
82
83         return ev_to_iou(ld->aio_events + event);
84 }
85
86 static int fio_libaio_getevents(struct thread_data *td, int min, int max,
87                                 struct timespec *t)
88 {
89         struct libaio_data *ld = td->io_data;
90         int r;
91
92         do {
93                 r = io_getevents(ld->aio_ctx, min, max, ld->aio_events, t);
94                 if (r == -EAGAIN) {
95                         usleep(100);
96                         continue;
97                 } else if (r == -EINTR)
98                         continue;
99                 else
100                         break;
101         } while (1);
102
103         return r;
104 }
105
106 static int fio_libaio_queue(struct thread_data *td, struct io_u *io_u)
107 {
108         struct libaio_data *ld = td->io_data;
109         struct iocb *iocb = &io_u->iocb;
110         int ret;
111
112         do {
113                 ret = io_submit(ld->aio_ctx, 1, &iocb);
114                 if (ret == 1)
115                         return 0;
116                 else if (ret == -EAGAIN)
117                         usleep(100);
118                 else if (ret == -EINTR)
119                         continue;
120                 else
121                         break;
122         } while (1);
123
124         return ret;
125
126 }
127
128 static int fio_libaio_cancel(struct thread_data *td, struct io_u *io_u)
129 {
130         struct libaio_data *ld = td->io_data;
131
132         return io_cancel(ld->aio_ctx, &io_u->iocb, ld->aio_events);
133 }
134
135 static void fio_libaio_cleanup(struct thread_data *td)
136 {
137         struct libaio_data *ld = td->io_data;
138
139         if (ld) {
140                 io_destroy(ld->aio_ctx);
141                 if (ld->aio_events)
142                         free(ld->aio_events);
143
144                 free(ld);
145                 td->io_data = NULL;
146         }
147 }
148
149 int fio_libaio_init(struct thread_data *td)
150 {
151         struct libaio_data *ld = malloc(sizeof(*ld));
152
153         memset(ld, 0, sizeof(*ld));
154         if (io_queue_init(td->iodepth, &ld->aio_ctx)) {
155                 td_verror(td, errno);
156                 return 1;
157         }
158
159         td->io_prep = fio_libaio_io_prep;
160         td->io_queue = fio_libaio_queue;
161         td->io_getevents = fio_libaio_getevents;
162         td->io_event = fio_libaio_event;
163         td->io_cancel = fio_libaio_cancel;
164         td->io_cleanup = fio_libaio_cleanup;
165         td->io_sync = fio_io_sync;
166
167         ld->aio_events = malloc(td->iodepth * sizeof(struct io_event));
168         td->io_data = ld;
169         return 0;
170 }
171
172 #else /* FIO_HAVE_LIBAIO */
173
174 int fio_libaio_init(struct thread_data *td)
175 {
176         return EINVAL;
177 }
178
179 #endif /* FIO_HAVE_LIBAIO */
180
181 #ifdef FIO_HAVE_POSIXAIO
182
183 struct posixaio_data {
184         struct io_u **aio_events;
185 };
186
187 static int fio_posixaio_cancel(struct thread_data *td, struct io_u *io_u)
188 {
189         int r = aio_cancel(td->fd, &io_u->aiocb);
190
191         if (r == 1 || r == AIO_CANCELED)
192                 return 0;
193
194         return 1;
195 }
196
197 static int fio_posixaio_prep(struct thread_data *td, struct io_u *io_u)
198 {
199         struct aiocb *aiocb = &io_u->aiocb;
200
201         aiocb->aio_fildes = td->fd;
202         aiocb->aio_buf = io_u->buf;
203         aiocb->aio_nbytes = io_u->buflen;
204         aiocb->aio_offset = io_u->offset;
205
206         io_u->seen = 0;
207         return 0;
208 }
209
210 static int fio_posixaio_getevents(struct thread_data *td, int min, int max,
211                                   struct timespec *t)
212 {
213         struct posixaio_data *pd = td->io_data;
214         struct list_head *entry;
215         struct timespec start;
216         int r, have_timeout = 0;
217
218         if (t && !fill_timespec(&start))
219                 have_timeout = 1;
220
221         r = 0;
222 restart:
223         list_for_each(entry, &td->io_u_busylist) {
224                 struct io_u *io_u = list_entry(entry, struct io_u, list);
225                 int err;
226
227                 if (io_u->seen)
228                         continue;
229
230                 err = aio_error(&io_u->aiocb);
231                 switch (err) {
232                         default:
233                                 io_u->error = err;
234                         case ECANCELED:
235                         case 0:
236                                 pd->aio_events[r++] = io_u;
237                                 io_u->seen = 1;
238                                 break;
239                         case EINPROGRESS:
240                                 break;
241                 }
242
243                 if (r >= max)
244                         break;
245         }
246
247         if (r >= min)
248                 return r;
249
250         if (have_timeout) {
251                 unsigned long long usec;
252
253                 usec = (t->tv_sec * 1000000) + (t->tv_nsec / 1000);
254                 if (ts_utime_since_now(&start) > usec)
255                         return r;
256         }
257
258         /*
259          * hrmpf, we need to wait for more. we should use aio_suspend, for
260          * now just sleep a little and recheck status of busy-and-not-seen
261          */
262         usleep(1000);
263         goto restart;
264 }
265
266 static struct io_u *fio_posixaio_event(struct thread_data *td, int event)
267 {
268         struct posixaio_data *pd = td->io_data;
269
270         return pd->aio_events[event];
271 }
272
273 static int fio_posixaio_queue(struct thread_data *td, struct io_u *io_u)
274 {
275         struct aiocb *aiocb = &io_u->aiocb;
276         int ret;
277
278         if (io_u->ddir == DDIR_READ)
279                 ret = aio_read(aiocb);
280         else
281                 ret = aio_write(aiocb);
282
283         if (ret)
284                 io_u->error = errno;
285                 
286         return io_u->error;
287 }
288
289 static void fio_posixaio_cleanup(struct thread_data *td)
290 {
291         struct posixaio_data *pd = td->io_data;
292
293         if (pd) {
294                 free(pd->aio_events);
295                 free(pd);
296                 td->io_data = NULL;
297         }
298 }
299
300 int fio_posixaio_init(struct thread_data *td)
301 {
302         struct posixaio_data *pd = malloc(sizeof(*pd));
303
304         pd->aio_events = malloc(td->iodepth * sizeof(struct io_u *));
305
306         td->io_prep = fio_posixaio_prep;
307         td->io_queue = fio_posixaio_queue;
308         td->io_getevents = fio_posixaio_getevents;
309         td->io_event = fio_posixaio_event;
310         td->io_cancel = fio_posixaio_cancel;
311         td->io_cleanup = fio_posixaio_cleanup;
312         td->io_sync = fio_io_sync;
313
314         td->io_data = pd;
315         return 0;
316 }
317
318 #else /* FIO_HAVE_POSIXAIO */
319
320 int fio_posixaio_init(struct thread_data *td)
321 {
322         return EINVAL;
323 }
324
325 #endif /* FIO_HAVE_POSIXAIO */
326
327 struct syncio_data {
328         struct io_u *last_io_u;
329 };
330
331 static int fio_syncio_getevents(struct thread_data *td, int min, int max,
332                                 struct timespec *t)
333 {
334         assert(max <= 1);
335
336         /*
337          * we can only have one finished io_u for sync io, since the depth
338          * is always 1
339          */
340         if (list_empty(&td->io_u_busylist))
341                 return 0;
342
343         return 1;
344 }
345
346 static struct io_u *fio_syncio_event(struct thread_data *td, int event)
347 {
348         struct syncio_data *sd = td->io_data;
349
350         assert(event == 0);
351
352         return sd->last_io_u;
353 }
354
355 static int fio_syncio_prep(struct thread_data *td, struct io_u *io_u)
356 {
357         if (lseek(td->fd, io_u->offset, SEEK_SET) == -1) {
358                 td_verror(td, errno);
359                 return 1;
360         }
361
362         return 0;
363 }
364
365 static int fio_syncio_queue(struct thread_data *td, struct io_u *io_u)
366 {
367         struct syncio_data *sd = td->io_data;
368         int ret;
369
370         if (io_u->ddir == DDIR_READ)
371                 ret = read(td->fd, io_u->buf, io_u->buflen);
372         else
373                 ret = write(td->fd, io_u->buf, io_u->buflen);
374
375         if ((unsigned int) ret != io_u->buflen) {
376                 if (ret > 0) {
377                         io_u->resid = io_u->buflen - ret;
378                         io_u->error = EIO;
379                 } else
380                         io_u->error = errno;
381         }
382
383         if (!io_u->error)
384                 sd->last_io_u = io_u;
385
386         return io_u->error;
387 }
388
389 static void fio_syncio_cleanup(struct thread_data *td)
390 {
391         if (td->io_data) {
392                 free(td->io_data);
393                 td->io_data = NULL;
394         }
395 }
396
397 int fio_syncio_init(struct thread_data *td)
398 {
399         struct syncio_data *sd = malloc(sizeof(*sd));
400
401         td->io_prep = fio_syncio_prep;
402         td->io_queue = fio_syncio_queue;
403         td->io_getevents = fio_syncio_getevents;
404         td->io_event = fio_syncio_event;
405         td->io_cancel = NULL;
406         td->io_cleanup = fio_syncio_cleanup;
407         td->io_sync = fio_io_sync;
408
409         sd->last_io_u = NULL;
410         td->io_data = sd;
411         return 0;
412 }
413
414 static int fio_mmapio_queue(struct thread_data *td, struct io_u *io_u)
415 {
416         unsigned long long real_off = io_u->offset - td->file_offset;
417         struct syncio_data *sd = td->io_data;
418
419         if (io_u->ddir == DDIR_READ)
420                 memcpy(io_u->buf, td->mmap + real_off, io_u->buflen);
421         else
422                 memcpy(td->mmap + real_off, io_u->buf, io_u->buflen);
423
424         /*
425          * not really direct, but should drop the pages from the cache
426          */
427         if (td->odirect) {
428                 if (msync(td->mmap + real_off, io_u->buflen, MS_SYNC) < 0)
429                         io_u->error = errno;
430                 if (madvise(td->mmap + real_off, io_u->buflen,  MADV_DONTNEED) < 0)
431                         io_u->error = errno;
432         }
433
434         if (!io_u->error)
435                 sd->last_io_u = io_u;
436
437         return io_u->error;
438 }
439
440 static int fio_mmapio_sync(struct thread_data *td)
441 {
442         return msync(td->mmap, td->file_size, MS_SYNC);
443 }
444
445 int fio_mmapio_init(struct thread_data *td)
446 {
447         struct syncio_data *sd = malloc(sizeof(*sd));
448
449         td->io_prep = NULL;
450         td->io_queue = fio_mmapio_queue;
451         td->io_getevents = fio_syncio_getevents;
452         td->io_event = fio_syncio_event;
453         td->io_cancel = NULL;
454         td->io_cleanup = fio_syncio_cleanup;
455         td->io_sync = fio_mmapio_sync;
456
457         sd->last_io_u = NULL;
458         td->io_data = sd;
459         return 0;
460 }
461
462 #ifdef FIO_HAVE_SGIO
463
464 struct sgio_cmd {
465         unsigned char cdb[10];
466         int nr;
467 };
468
469 struct sgio_data {
470         struct sgio_cmd *cmds;
471         struct io_u **events;
472         unsigned int bs;
473 };
474
475 static void sgio_hdr_init(struct sgio_data *sd, struct sg_io_hdr *hdr,
476                           struct io_u *io_u, int fs)
477 {
478         struct sgio_cmd *sc = &sd->cmds[io_u->index];
479
480         memset(hdr, 0, sizeof(*hdr));
481         memset(sc->cdb, 0, sizeof(sc->cdb));
482
483         hdr->interface_id = 'S';
484         hdr->cmdp = sc->cdb;
485         hdr->cmd_len = sizeof(sc->cdb);
486         hdr->pack_id = io_u->index;
487         hdr->usr_ptr = io_u;
488
489         if (fs) {
490                 hdr->dxferp = io_u->buf;
491                 hdr->dxfer_len = io_u->buflen;
492         }
493 }
494
495 static int fio_sgio_getevents(struct thread_data *td, int min, int max,
496                               struct timespec *t)
497 {
498         struct sgio_data *sd = td->io_data;
499         struct pollfd pfd = { .fd = td->fd, .events = POLLIN };
500         void *buf = malloc(max * sizeof(struct sg_io_hdr));
501         int left = max, ret, events, i, r = 0, fl = 0;
502
503         /*
504          * don't block for !events
505          */
506         if (!min) {
507                 fl = fcntl(td->fd, F_GETFL);
508                 fcntl(td->fd, F_SETFL, fl | O_NONBLOCK);
509         }
510
511         while (left) {
512                 do {
513                         if (!min)
514                                 break;
515                         poll(&pfd, 1, -1);
516                         if (pfd.revents & POLLIN)
517                                 break;
518                 } while (1);
519
520                 ret = read(td->fd, buf, left * sizeof(struct sg_io_hdr));
521                 if (ret < 0) {
522                         if (errno == EAGAIN)
523                                 break;
524                         td_verror(td, errno);
525                         r = -1;
526                         break;
527                 } else if (!ret)
528                         break;
529
530                 events = ret / sizeof(struct sg_io_hdr);
531                 left -= events;
532                 r += events;
533
534                 for (i = 0; i < events; i++) {
535                         struct sg_io_hdr *hdr = (struct sg_io_hdr *) buf + i;
536
537                         sd->events[i] = hdr->usr_ptr;
538                 }
539         }
540
541         if (!min)
542                 fcntl(td->fd, F_SETFL, fl);
543
544         free(buf);
545         return r;
546 }
547
548 static int fio_sgio_ioctl_doio(struct thread_data *td, struct io_u *io_u)
549 {
550         struct sgio_data *sd = td->io_data;
551         struct sg_io_hdr *hdr = &io_u->hdr;
552
553         sd->events[0] = io_u;
554
555         return ioctl(td->fd, SG_IO, hdr);
556 }
557
558 static int fio_sgio_rw_doio(struct thread_data *td, struct io_u *io_u, int sync)
559 {
560         struct sg_io_hdr *hdr = &io_u->hdr;
561         int ret;
562
563         ret = write(td->fd, hdr, sizeof(*hdr));
564         if (ret < 0)
565                 return errno;
566
567         if (sync) {
568                 ret = read(td->fd, hdr, sizeof(*hdr));
569                 if (ret < 0)
570                         return errno;
571         }
572
573         return 0;
574 }
575
576 static int fio_sgio_doio(struct thread_data *td, struct io_u *io_u, int sync)
577 {
578         if (td->filetype == FIO_TYPE_BD)
579                 return fio_sgio_ioctl_doio(td, io_u);
580
581         return fio_sgio_rw_doio(td, io_u, sync);
582 }
583
584 static int fio_sgio_sync(struct thread_data *td)
585 {
586         struct sgio_data *sd = td->io_data;
587         struct sg_io_hdr *hdr;
588         struct io_u *io_u;
589         int ret;
590
591         io_u = __get_io_u(td);
592         if (!io_u)
593                 return ENOMEM;
594
595         hdr = &io_u->hdr;
596         sgio_hdr_init(sd, hdr, io_u, 0);
597         hdr->dxfer_direction = SG_DXFER_NONE;
598
599         hdr->cmdp[0] = 0x35;
600
601         ret = fio_sgio_doio(td, io_u, 1);
602         put_io_u(td, io_u);
603         return ret;
604 }
605
606 static int fio_sgio_prep(struct thread_data *td, struct io_u *io_u)
607 {
608         struct sg_io_hdr *hdr = &io_u->hdr;
609         struct sgio_data *sd = td->io_data;
610         int nr_blocks, lba;
611
612         if (io_u->buflen & (sd->bs - 1)) {
613                 fprintf(stderr, "read/write not sector aligned\n");
614                 return EINVAL;
615         }
616
617         sgio_hdr_init(sd, hdr, io_u, 1);
618
619         if (io_u->ddir == DDIR_READ) {
620                 hdr->dxfer_direction = SG_DXFER_FROM_DEV;
621                 hdr->cmdp[0] = 0x28;
622         } else {
623                 hdr->dxfer_direction = SG_DXFER_TO_DEV;
624                 hdr->cmdp[0] = 0x2a;
625         }
626
627         nr_blocks = io_u->buflen / sd->bs;
628         lba = io_u->offset / sd->bs;
629         hdr->cmdp[2] = (lba >> 24) & 0xff;
630         hdr->cmdp[3] = (lba >> 16) & 0xff;
631         hdr->cmdp[4] = (lba >>  8) & 0xff;
632         hdr->cmdp[5] = lba & 0xff;
633         hdr->cmdp[7] = (nr_blocks >> 8) & 0xff;
634         hdr->cmdp[8] = nr_blocks & 0xff;
635         return 0;
636 }
637
638 static int fio_sgio_queue(struct thread_data *td, struct io_u *io_u)
639 {
640         struct sg_io_hdr *hdr = &io_u->hdr;
641         int ret;
642
643         ret = fio_sgio_doio(td, io_u, 0);
644
645         if (ret < 0)
646                 io_u->error = errno;
647         else if (hdr->status) {
648                 io_u->resid = hdr->resid;
649                 io_u->error = EIO;
650         }
651
652         return io_u->error;
653 }
654
655 static struct io_u *fio_sgio_event(struct thread_data *td, int event)
656 {
657         struct sgio_data *sd = td->io_data;
658
659         return sd->events[event];
660 }
661
662 static int fio_sgio_get_bs(struct thread_data *td, unsigned int *bs)
663 {
664         struct sgio_data *sd = td->io_data;
665         struct io_u *io_u;
666         struct sg_io_hdr *hdr;
667         unsigned char buf[8];
668         int ret;
669
670         io_u = __get_io_u(td);
671         assert(io_u);
672
673         hdr = &io_u->hdr;
674         sgio_hdr_init(sd, hdr, io_u, 0);
675         memset(buf, 0, sizeof(buf));
676
677         hdr->cmdp[0] = 0x25;
678         hdr->dxfer_direction = SG_DXFER_FROM_DEV;
679         hdr->dxferp = buf;
680         hdr->dxfer_len = sizeof(buf);
681
682         ret = fio_sgio_doio(td, io_u, 1);
683         if (ret) {
684                 put_io_u(td, io_u);
685                 return ret;
686         }
687
688         *bs = (buf[4] << 24) | (buf[5] << 16) | (buf[6] << 8) | buf[7];
689         put_io_u(td, io_u);
690         return 0;
691 }
692
693 int fio_sgio_init(struct thread_data *td)
694 {
695         struct sgio_data *sd;
696         unsigned int bs;
697         int ret;
698
699         sd = malloc(sizeof(*sd));
700         sd->cmds = malloc(td->iodepth * sizeof(struct sgio_cmd));
701         sd->events = malloc(td->iodepth * sizeof(struct io_u *));
702         td->io_data = sd;
703
704         if (td->filetype == FIO_TYPE_BD) {
705                 if (ioctl(td->fd, BLKSSZGET, &bs) < 0) {
706                         td_verror(td, errno);
707                         return 1;
708                 }
709         } else if (td->filetype == FIO_TYPE_CHAR) {
710                 int version;
711
712                 if (ioctl(td->fd, SG_GET_VERSION_NUM, &version) < 0) {
713                         td_verror(td, errno);
714                         return 1;
715                 }
716
717                 ret = fio_sgio_get_bs(td, &bs);
718                 if (ret)
719                         return ret;
720         } else {
721                 fprintf(stderr, "ioengine sgio only works on block devices\n");
722                 return 1;
723         }
724
725         sd->bs = bs;
726
727         td->io_prep = fio_sgio_prep;
728         td->io_queue = fio_sgio_queue;
729
730         if (td->filetype == FIO_TYPE_BD)
731                 td->io_getevents = fio_syncio_getevents;
732         else
733                 td->io_getevents = fio_sgio_getevents;
734
735         td->io_event = fio_sgio_event;
736         td->io_cancel = NULL;
737         td->io_cleanup = fio_syncio_cleanup;
738         td->io_sync = fio_sgio_sync;
739
740         /*
741          * we want to do it, regardless of whether odirect is set or not
742          */
743         td->override_sync = 1;
744         return 0;
745 }
746
747 #else /* FIO_HAVE_SGIO */
748
749 int fio_sgio_init(struct thread_data *td)
750 {
751         return EINVAL;
752 }
753
754 #endif /* FIO_HAVE_SGIO */
755
756 #ifdef FIO_HAVE_SPLICE
757 struct spliceio_data {
758         struct io_u *last_io_u;
759         int pipe[2];
760 };
761
762 static struct io_u *fio_spliceio_event(struct thread_data *td, int event)
763 {
764         struct spliceio_data *sd = td->io_data;
765
766         assert(event == 0);
767
768         return sd->last_io_u;
769 }
770
771 /*
772  * For splice reading, we unfortunately cannot (yet) vmsplice the other way.
773  * So just splice the data from the file into the pipe, and use regular
774  * read to fill the buffer. Doesn't make a lot of sense, but...
775  */
776 static int fio_splice_read(struct thread_data *td, struct io_u *io_u)
777 {
778         struct spliceio_data *sd = td->io_data;
779         int ret, ret2, buflen;
780         off_t offset;
781         void *p;
782
783         offset = io_u->offset;
784         buflen = io_u->buflen;
785         p = io_u->buf;
786         while (buflen) {
787                 int this_len = buflen;
788
789                 if (this_len > SPLICE_DEF_SIZE)
790                         this_len = SPLICE_DEF_SIZE;
791
792                 ret = splice(td->fd, &offset, sd->pipe[1], NULL, this_len, SPLICE_F_MORE);
793                 if (ret < 0) {
794                         if (errno == ENODATA || errno == EAGAIN)
795                                 continue;
796
797                         return errno;
798                 }
799
800                 buflen -= ret;
801
802                 while (ret) {
803                         ret2 = read(sd->pipe[0], p, ret);
804                         if (ret2 < 0)
805                                 return errno;
806
807                         ret -= ret2;
808                         p += ret2;
809                 }
810         }
811
812         return io_u->buflen;
813 }
814
815 /*
816  * For splice writing, we can vmsplice our data buffer directly into a
817  * pipe and then splice that to a file.
818  */
819 static int fio_splice_write(struct thread_data *td, struct io_u *io_u)
820 {
821         struct spliceio_data *sd = td->io_data;
822         struct iovec iov[1] = {
823                 {
824                         .iov_base = io_u->buf,
825                         .iov_len = io_u->buflen,
826                 }
827         };
828         struct pollfd pfd = { .fd = sd->pipe[1], .events = POLLOUT, };
829         off_t off = io_u->offset;
830         int ret, ret2;
831
832         while (iov[0].iov_len) {
833                 if (poll(&pfd, 1, -1) < 0)
834                         return errno;
835
836                 ret = vmsplice(sd->pipe[1], iov, 1, SPLICE_F_NONBLOCK);
837                 if (ret < 0)
838                         return errno;
839
840                 iov[0].iov_len -= ret;
841                 iov[0].iov_base += ret;
842
843                 while (ret) {
844                         ret2 = splice(sd->pipe[0], NULL, td->fd, &off, ret, 0);
845                         if (ret2 < 0)
846                                 return errno;
847
848                         ret -= ret2;
849                 }
850         }
851
852         return io_u->buflen;
853 }
854
855 static int fio_spliceio_queue(struct thread_data *td, struct io_u *io_u)
856 {
857         struct spliceio_data *sd = td->io_data;
858         int ret;
859
860         if (io_u->ddir == DDIR_READ)
861                 ret = fio_splice_read(td, io_u);
862         else
863                 ret = fio_splice_write(td, io_u);
864
865         if ((unsigned int) ret != io_u->buflen) {
866                 if (ret > 0) {
867                         io_u->resid = io_u->buflen - ret;
868                         io_u->error = ENODATA;
869                 } else
870                         io_u->error = errno;
871         }
872
873         if (!io_u->error)
874                 sd->last_io_u = io_u;
875
876         return io_u->error;
877 }
878
879 static void fio_spliceio_cleanup(struct thread_data *td)
880 {
881         struct spliceio_data *sd = td->io_data;
882
883         if (sd) {
884                 close(sd->pipe[0]);
885                 close(sd->pipe[1]);
886                 free(sd);
887                 td->io_data = NULL;
888         }
889 }
890
891 int fio_spliceio_init(struct thread_data *td)
892 {
893         struct spliceio_data *sd = malloc(sizeof(*sd));
894
895         td->io_queue = fio_spliceio_queue;
896         td->io_getevents = fio_syncio_getevents;
897         td->io_event = fio_spliceio_event;
898         td->io_cancel = NULL;
899         td->io_cleanup = fio_spliceio_cleanup;
900         td->io_sync = fio_io_sync;
901
902         sd->last_io_u = NULL;
903         if (pipe(sd->pipe) < 0) {
904                 td_verror(td, errno);
905                 free(sd);
906                 return 1;
907         }
908
909         td->io_data = sd;
910         return 0;
911 }
912
913 #else /* FIO_HAVE_SPLICE */
914
915 int fio_spliceio_init(struct thread_data *td)
916 {
917         return EINVAL;
918 }
919
920 #endif /* FIO_HAVE_SPLICE */