fb554a4808745f278054515b05c62370f36c154c
[fio.git] / engines / net.c
1 /*
2  * net engine
3  *
4  * IO engine that reads/writes to/from sockets.
5  *
6  */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <errno.h>
11 #include <assert.h>
12 #include <netinet/in.h>
13 #include <arpa/inet.h>
14 #include <netdb.h>
15 #include <sys/poll.h>
16 #include <sys/types.h>
17 #include <sys/stat.h>
18 #include <sys/socket.h>
19 #include <sys/un.h>
20
21 #include "../fio.h"
22
23 struct netio_data {
24         int listenfd;
25         int use_splice;
26         int pipes[2];
27         struct sockaddr_in addr;
28         struct sockaddr_un addr_un;
29 };
30
31 struct netio_options {
32         struct thread_data *td;
33         unsigned int port;
34         unsigned int proto;
35         unsigned int listen;
36 };
37
38 struct udp_close_msg {
39         uint32_t magic;
40         uint32_t cmd;
41 };
42
43 enum {
44         FIO_LINK_CLOSE = 0x89,
45         FIO_LINK_OPEN_CLOSE_MAGIC = 0x6c696e6b,
46         FIO_LINK_OPEN = 0x98,
47
48         FIO_TYPE_TCP    = 1,
49         FIO_TYPE_UDP    = 2,
50         FIO_TYPE_UNIX   = 3,
51 };
52
53 static int str_hostname_cb(void *data, const char *input);
54 static struct fio_option options[] = {
55         {
56                 .name   = "hostname",
57                 .type   = FIO_OPT_STR_STORE,
58                 .cb     = str_hostname_cb,
59                 .help   = "Hostname for net IO engine",
60         },
61         {
62                 .name   = "port",
63                 .type   = FIO_OPT_INT,
64                 .off1   = offsetof(struct netio_options, port),
65                 .minval = 1,
66                 .maxval = 65535,
67                 .help   = "Port to use for TCP or UDP net connections",
68         },
69         {
70                 .name   = "protocol",
71                 .alias  = "proto",
72                 .type   = FIO_OPT_STR,
73                 .off1   = offsetof(struct netio_options, proto),
74                 .help   = "Network protocol to use",
75                 .def    = "tcp",
76                 .posval = {
77                           { .ival = "tcp",
78                             .oval = FIO_TYPE_TCP,
79                             .help = "Transmission Control Protocol",
80                           },
81                           { .ival = "udp",
82                             .oval = FIO_TYPE_UDP,
83                             .help = "User Datagram Protocol",
84                           },
85                           { .ival = "unix",
86                             .oval = FIO_TYPE_UNIX,
87                             .help = "UNIX domain socket",
88                           },
89                 },
90         },
91         {
92                 .name   = "listen",
93                 .type   = FIO_OPT_STR_SET,
94                 .off1   = offsetof(struct netio_options, listen),
95                 .help   = "Listen for incoming TCP connections",
96         },
97         {
98                 .name   = NULL,
99         },
100 };
101
102 /*
103  * Return -1 for error and 'nr events' for a positive number
104  * of events
105  */
106 static int poll_wait(struct thread_data *td, int fd, short events)
107 {
108         struct pollfd pfd;
109         int ret;
110
111         while (!td->terminate) {
112                 pfd.fd = fd;
113                 pfd.events = events;
114                 ret = poll(&pfd, 1, -1);
115                 if (ret < 0) {
116                         if (errno == EINTR)
117                                 break;
118
119                         td_verror(td, errno, "poll");
120                         return -1;
121                 } else if (!ret)
122                         continue;
123
124                 break;
125         }
126
127         if (pfd.revents & events)
128                 return 1;
129
130         return -1;
131 }
132
133 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
134 {
135         struct netio_options *o = td->eo;
136
137         /*
138          * Make sure we don't see spurious reads to a receiver, and vice versa
139          */
140         if (o->proto == FIO_TYPE_TCP)
141                 return 0;
142
143         if ((o->listen && io_u->ddir == DDIR_WRITE) ||
144             (!o->listen && io_u->ddir == DDIR_READ)) {
145                 td_verror(td, EINVAL, "bad direction");
146                 return 1;
147         }
148
149         return 0;
150 }
151
152 #ifdef FIO_HAVE_SPLICE
153 static int splice_io_u(int fdin, int fdout, unsigned int len)
154 {
155         int bytes = 0;
156
157         while (len) {
158                 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
159
160                 if (ret < 0) {
161                         if (!bytes)
162                                 bytes = ret;
163
164                         break;
165                 } else if (!ret)
166                         break;
167
168                 bytes += ret;
169                 len -= ret;
170         }
171
172         return bytes;
173 }
174
175 /*
176  * Receive bytes from a socket and fill them into the internal pipe
177  */
178 static int splice_in(struct thread_data *td, struct io_u *io_u)
179 {
180         struct netio_data *nd = td->io_ops->data;
181
182         return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
183 }
184
185 /*
186  * Transmit 'len' bytes from the internal pipe
187  */
188 static int splice_out(struct thread_data *td, struct io_u *io_u,
189                       unsigned int len)
190 {
191         struct netio_data *nd = td->io_ops->data;
192
193         return splice_io_u(nd->pipes[0], io_u->file->fd, len);
194 }
195
196 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
197 {
198         struct iovec iov = {
199                 .iov_base = io_u->xfer_buf,
200                 .iov_len = len,
201         };
202         int bytes = 0;
203
204         while (iov.iov_len) {
205                 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
206
207                 if (ret < 0) {
208                         if (!bytes)
209                                 bytes = ret;
210                         break;
211                 } else if (!ret)
212                         break;
213
214                 iov.iov_len -= ret;
215                 iov.iov_base += ret;
216                 bytes += ret;
217         }
218
219         return bytes;
220
221 }
222
223 /*
224  * vmsplice() pipe to io_u buffer
225  */
226 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
227                              unsigned int len)
228 {
229         struct netio_data *nd = td->io_ops->data;
230
231         return vmsplice_io_u(io_u, nd->pipes[0], len);
232 }
233
234 /*
235  * vmsplice() io_u to pipe
236  */
237 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
238 {
239         struct netio_data *nd = td->io_ops->data;
240
241         return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
242 }
243
244 /*
245  * splice receive - transfer socket data into a pipe using splice, then map
246  * that pipe data into the io_u using vmsplice.
247  */
248 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
249 {
250         int ret;
251
252         ret = splice_in(td, io_u);
253         if (ret > 0)
254                 return vmsplice_io_u_out(td, io_u, ret);
255
256         return ret;
257 }
258
259 /*
260  * splice transmit - map data from the io_u into a pipe by using vmsplice,
261  * then transfer that pipe to a socket using splice.
262  */
263 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
264 {
265         int ret;
266
267         ret = vmsplice_io_u_in(td, io_u);
268         if (ret > 0)
269                 return splice_out(td, io_u, ret);
270
271         return ret;
272 }
273 #else
274 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
275 {
276         errno = EOPNOTSUPP;
277         return -1;
278 }
279
280 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
281 {
282         errno = EOPNOTSUPP;
283         return -1;
284 }
285 #endif
286
287 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
288 {
289         struct netio_data *nd = td->io_ops->data;
290         struct netio_options *o = td->eo;
291         int ret, flags = OS_MSG_DONTWAIT;
292
293         do {
294                 if (o->proto == FIO_TYPE_UDP) {
295                         struct sockaddr *to = (struct sockaddr *) &nd->addr;
296
297                         ret = sendto(io_u->file->fd, io_u->xfer_buf,
298                                         io_u->xfer_buflen, flags, to,
299                                         sizeof(*to));
300                 } else {
301                         /*
302                          * if we are going to write more, set MSG_MORE
303                          */
304 #ifdef MSG_MORE
305                         if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
306                             td->o.size)
307                                 flags |= MSG_MORE;
308 #endif
309                         ret = send(io_u->file->fd, io_u->xfer_buf,
310                                         io_u->xfer_buflen, flags);
311                 }
312                 if (ret > 0)
313                         break;
314
315                 ret = poll_wait(td, io_u->file->fd, POLLOUT);
316                 if (ret <= 0)
317                         break;
318
319                 flags &= ~OS_MSG_DONTWAIT;
320         } while (1);
321
322         return ret;
323 }
324
325 static int is_udp_close(struct io_u *io_u, int len)
326 {
327         struct udp_close_msg *msg;
328
329         if (len != sizeof(struct udp_close_msg))
330                 return 0;
331
332         msg = io_u->xfer_buf;
333         if (ntohl(msg->magic) != FIO_LINK_OPEN_CLOSE_MAGIC)
334                 return 0;
335         if (ntohl(msg->cmd) != FIO_LINK_CLOSE)
336                 return 0;
337
338         return 1;
339 }
340
341 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
342 {
343         struct netio_data *nd = td->io_ops->data;
344         struct netio_options *o = td->eo;
345         int ret, flags = OS_MSG_DONTWAIT;
346
347         do {
348                 if (o->proto == FIO_TYPE_UDP) {
349                         fio_socklen_t len = sizeof(nd->addr);
350                         struct sockaddr *from = (struct sockaddr *) &nd->addr;
351
352                         ret = recvfrom(io_u->file->fd, io_u->xfer_buf,
353                                         io_u->xfer_buflen, flags, from, &len);
354                         if (is_udp_close(io_u, ret)) {
355                                 td->done = 1;
356                                 return 0;
357                         }
358                 } else {
359                         ret = recv(io_u->file->fd, io_u->xfer_buf,
360                                         io_u->xfer_buflen, flags);
361                 }
362                 if (ret > 0)
363                         break;
364                 else if (!ret && (flags & MSG_WAITALL))
365                         break;
366
367                 ret = poll_wait(td, io_u->file->fd, POLLIN);
368                 if (ret <= 0)
369                         break;
370                 flags &= ~OS_MSG_DONTWAIT;
371                 flags |= MSG_WAITALL;
372         } while (1);
373
374         return ret;
375 }
376
377 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
378 {
379         struct netio_data *nd = td->io_ops->data;
380         struct netio_options *o = td->eo;
381         int ret;
382
383         fio_ro_check(td, io_u);
384
385         if (io_u->ddir == DDIR_WRITE) {
386                 if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
387                     o->proto == FIO_TYPE_UNIX)
388                         ret = fio_netio_send(td, io_u);
389                 else
390                         ret = fio_netio_splice_out(td, io_u);
391         } else if (io_u->ddir == DDIR_READ) {
392                 if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
393                     o->proto == FIO_TYPE_UNIX)
394                         ret = fio_netio_recv(td, io_u);
395                 else
396                         ret = fio_netio_splice_in(td, io_u);
397         } else
398                 ret = 0;        /* must be a SYNC */
399
400         if (ret != (int) io_u->xfer_buflen) {
401                 if (ret >= 0) {
402                         io_u->resid = io_u->xfer_buflen - ret;
403                         io_u->error = 0;
404                         return FIO_Q_COMPLETED;
405                 } else {
406                         int err = errno;
407
408                         if (io_u->ddir == DDIR_WRITE && err == EMSGSIZE)
409                                 return FIO_Q_BUSY;
410
411                         io_u->error = err;
412                 }
413         }
414
415         if (io_u->error)
416                 td_verror(td, io_u->error, "xfer");
417
418         return FIO_Q_COMPLETED;
419 }
420
421 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
422 {
423         struct netio_data *nd = td->io_ops->data;
424         struct netio_options *o = td->eo;
425         int type, domain;
426
427         if (o->proto == FIO_TYPE_TCP) {
428                 domain = AF_INET;
429                 type = SOCK_STREAM;
430         } else if (o->proto == FIO_TYPE_UDP) {
431                 domain = AF_INET;
432                 type = SOCK_DGRAM;
433         } else if (o->proto == FIO_TYPE_UNIX) {
434                 domain = AF_UNIX;
435                 type = SOCK_STREAM;
436         } else {
437                 log_err("fio: bad network type %d\n", o->proto);
438                 f->fd = -1;
439                 return 1;
440         }
441
442         f->fd = socket(domain, type, 0);
443         if (f->fd < 0) {
444                 td_verror(td, errno, "socket");
445                 return 1;
446         }
447
448         if (o->proto == FIO_TYPE_UDP)
449                 return 0;
450         else if (o->proto == FIO_TYPE_TCP) {
451                 fio_socklen_t len = sizeof(nd->addr);
452
453                 if (connect(f->fd, (struct sockaddr *) &nd->addr, len) < 0) {
454                         td_verror(td, errno, "connect");
455                         close(f->fd);
456                         return 1;
457                 }
458         } else {
459                 struct sockaddr_un *addr = &nd->addr_un;
460                 fio_socklen_t len;
461
462                 len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
463
464                 if (connect(f->fd, (struct sockaddr *) addr, len) < 0) {
465                         td_verror(td, errno, "connect");
466                         close(f->fd);
467                         return 1;
468                 }
469         }
470
471         return 0;
472 }
473
474 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
475 {
476         struct netio_data *nd = td->io_ops->data;
477         struct netio_options *o = td->eo;
478         fio_socklen_t socklen = sizeof(nd->addr);
479         int state;
480
481         if (o->proto == FIO_TYPE_UDP) {
482                 f->fd = nd->listenfd;
483                 return 0;
484         }
485
486         state = td->runstate;
487         td_set_runstate(td, TD_SETTING_UP);
488
489         log_info("fio: waiting for connection\n");
490
491         if (poll_wait(td, nd->listenfd, POLLIN) < 0)
492                 goto err;
493
494         f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
495         if (f->fd < 0) {
496                 td_verror(td, errno, "accept");
497                 goto err;
498         }
499
500         td_set_runstate(td, state);
501         return 0;
502 err:
503         td_set_runstate(td, state);
504         return 1;
505 }
506
507 static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f)
508 {
509         struct netio_data *nd = td->io_ops->data;
510         struct udp_close_msg msg;
511         struct sockaddr *to = (struct sockaddr *) &nd->addr;
512         int ret;
513
514         msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
515         msg.cmd = htonl(FIO_LINK_CLOSE);
516
517         ret = sendto(f->fd, &msg, sizeof(msg), MSG_WAITALL, to,
518                         sizeof(nd->addr));
519         if (ret < 0)
520                 td_verror(td, errno, "sendto udp link close");
521 }
522
523 static int fio_netio_close_file(struct thread_data *td, struct fio_file *f)
524 {
525         struct netio_options *o = td->eo;
526
527         /*
528          * If this is an UDP connection, notify the receiver that we are
529          * closing down the link
530          */
531         if (o->proto == FIO_TYPE_UDP)
532                 fio_netio_udp_close(td, f);
533
534         return generic_close_file(td, f);
535 }
536
537 static int fio_netio_udp_recv_open(struct thread_data *td, struct fio_file *f)
538 {
539         struct netio_data *nd = td->io_ops->data;
540         struct udp_close_msg msg;
541         struct sockaddr *to = (struct sockaddr *) &nd->addr;
542         fio_socklen_t len = sizeof(nd->addr);
543         int ret;
544
545         ret = recvfrom(f->fd, &msg, sizeof(msg), MSG_WAITALL, to, &len);
546         if (ret < 0) {
547                 td_verror(td, errno, "sendto udp link open");
548                 return ret;
549         }
550
551         if (ntohl(msg.magic) != FIO_LINK_OPEN_CLOSE_MAGIC ||
552             ntohl(msg.cmd) != FIO_LINK_OPEN) {
553                 log_err("fio: bad udp open magic %x/%x\n", ntohl(msg.magic),
554                                                                 ntohl(msg.cmd));
555                 return -1;
556         }
557
558         return 0;
559 }
560
561 static int fio_netio_udp_send_open(struct thread_data *td, struct fio_file *f)
562 {
563         struct netio_data *nd = td->io_ops->data;
564         struct udp_close_msg msg;
565         struct sockaddr *to = (struct sockaddr *) &nd->addr;
566         int ret;
567
568         msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
569         msg.cmd = htonl(FIO_LINK_OPEN);
570
571         ret = sendto(f->fd, &msg, sizeof(msg), MSG_WAITALL, to,
572                         sizeof(nd->addr));
573         if (ret < 0) {
574                 td_verror(td, errno, "sendto udp link open");
575                 return ret;
576         }
577
578         return 0;
579 }
580
581 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
582 {
583         int ret;
584         struct netio_options *o = td->eo;
585
586         if (o->listen)
587                 ret = fio_netio_accept(td, f);
588         else
589                 ret = fio_netio_connect(td, f);
590
591         if (ret) {
592                 f->fd = -1;
593                 return ret;
594         }
595
596         if (o->proto == FIO_TYPE_UDP) {
597                 if (td_write(td))
598                         ret = fio_netio_udp_send_open(td, f);
599                 else {
600                         int state;
601
602                         state = td->runstate;
603                         td_set_runstate(td, TD_SETTING_UP);
604                         ret = fio_netio_udp_recv_open(td, f);
605                         td_set_runstate(td, state);
606                 }
607         }
608
609         if (ret)
610                 fio_netio_close_file(td, f);
611
612         return ret;
613 }
614
615 static int fio_netio_setup_connect_inet(struct thread_data *td,
616                                         const char *host, unsigned short port)
617 {
618         struct netio_data *nd = td->io_ops->data;
619
620         if (!host) {
621                 log_err("fio: connect with no host to connect to.\n");
622                 if (td_read(td))
623                         log_err("fio: did you forget to set 'listen'?\n");
624
625                 td_verror(td, EINVAL, "no hostname= set");
626                 return 1;
627         }
628
629         nd->addr.sin_family = AF_INET;
630         nd->addr.sin_port = htons(port);
631
632         if (inet_aton(host, &nd->addr.sin_addr) != 1) {
633                 struct hostent *hent;
634
635                 hent = gethostbyname(host);
636                 if (!hent) {
637                         td_verror(td, errno, "gethostbyname");
638                         return 1;
639                 }
640
641                 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
642         }
643
644         return 0;
645 }
646
647 static int fio_netio_setup_connect_unix(struct thread_data *td,
648                                         const char *path)
649 {
650         struct netio_data *nd = td->io_ops->data;
651         struct sockaddr_un *soun = &nd->addr_un;
652
653         soun->sun_family = AF_UNIX;
654         strcpy(soun->sun_path, path);
655         return 0;
656 }
657
658 static int fio_netio_setup_connect(struct thread_data *td)
659 {
660         struct netio_options *o = td->eo;
661
662         if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
663                 return fio_netio_setup_connect_inet(td, td->o.filename,o->port);
664         else
665                 return fio_netio_setup_connect_unix(td, td->o.filename);
666 }
667
668 static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path)
669 {
670         struct netio_data *nd = td->io_ops->data;
671         struct sockaddr_un *addr = &nd->addr_un;
672         mode_t mode;
673         int len, fd;
674
675         fd = socket(AF_UNIX, SOCK_STREAM, 0);
676         if (fd < 0) {
677                 log_err("fio: socket: %s\n", strerror(errno));
678                 return -1;
679         }
680
681         mode = umask(000);
682
683         memset(addr, 0, sizeof(*addr));
684         addr->sun_family = AF_UNIX;
685         strcpy(addr->sun_path, path);
686         unlink(path);
687
688         len = sizeof(addr->sun_family) + strlen(path) + 1;
689
690         if (bind(fd, (struct sockaddr *) addr, len) < 0) {
691                 log_err("fio: bind: %s\n", strerror(errno));
692                 close(fd);
693                 return -1;
694         }
695
696         umask(mode);
697         nd->listenfd = fd;
698         return 0;
699 }
700
701 static int fio_netio_setup_listen_inet(struct thread_data *td, short port)
702 {
703         struct netio_data *nd = td->io_ops->data;
704         struct netio_options *o = td->eo;
705         int fd, opt, type;
706
707         if (o->proto == FIO_TYPE_TCP)
708                 type = SOCK_STREAM;
709         else
710                 type = SOCK_DGRAM;
711
712         fd = socket(AF_INET, type, 0);
713         if (fd < 0) {
714                 td_verror(td, errno, "socket");
715                 return 1;
716         }
717
718         opt = 1;
719         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
720                 td_verror(td, errno, "setsockopt");
721                 return 1;
722         }
723 #ifdef SO_REUSEPORT
724         if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
725                 td_verror(td, errno, "setsockopt");
726                 return 1;
727         }
728 #endif
729
730         nd->addr.sin_family = AF_INET;
731         nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
732         nd->addr.sin_port = htons(port);
733
734         if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
735                 td_verror(td, errno, "bind");
736                 return 1;
737         }
738
739         nd->listenfd = fd;
740         return 0;
741 }
742
743 static int fio_netio_setup_listen(struct thread_data *td)
744 {
745         struct netio_data *nd = td->io_ops->data;
746         struct netio_options *o = td->eo;
747         int ret;
748
749         if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
750                 ret = fio_netio_setup_listen_inet(td, o->port);
751         else
752                 ret = fio_netio_setup_listen_unix(td, td->o.filename);
753
754         if (ret)
755                 return ret;
756         if (o->proto == FIO_TYPE_UDP)
757                 return 0;
758
759         if (listen(nd->listenfd, 10) < 0) {
760                 td_verror(td, errno, "listen");
761                 nd->listenfd = -1;
762                 return 1;
763         }
764
765         return 0;
766 }
767
768 static int fio_netio_init(struct thread_data *td)
769 {
770         struct netio_options *o = td->eo;
771         int ret;
772
773 #ifdef WIN32
774         WSADATA wsd;
775         WSAStartup(MAKEWORD(2,2), &wsd);
776 #endif
777
778         if (td_random(td)) {
779                 log_err("fio: network IO can't be random\n");
780                 return 1;
781         }
782
783         if (o->proto == FIO_TYPE_UNIX && o->port) {
784                 log_err("fio: network IO port not valid with unix socket\n");
785                 return 1;
786         } else if (o->proto != FIO_TYPE_UNIX && !o->port) {
787                 log_err("fio: network IO requires port for tcp or udp\n");
788                 return 1;
789         }
790
791         if (o->proto != FIO_TYPE_TCP) {
792                 if (o->listen) {
793                         log_err("fio: listen only valid for TCP proto IO\n");
794                         return 1;
795                 }
796                 if (td_rw(td)) {
797                         log_err("fio: datagram network connections must be"
798                                    " read OR write\n");
799                         return 1;
800                 }
801                 if (o->proto == FIO_TYPE_UNIX && !td->o.filename) {
802                         log_err("fio: UNIX sockets need host/filename\n");
803                         return 1;
804                 }
805                 o->listen = td_read(td);
806         }
807
808         if (o->proto != FIO_TYPE_UNIX && o->listen && td->o.filename) {
809                 log_err("fio: hostname not valid for inbound network IO\n");
810                 return 1;
811         }
812
813         if (o->listen)
814                 ret = fio_netio_setup_listen(td);
815         else
816                 ret = fio_netio_setup_connect(td);
817
818         return ret;
819 }
820
821 static void fio_netio_cleanup(struct thread_data *td)
822 {
823         struct netio_data *nd = td->io_ops->data;
824
825         if (nd) {
826                 if (nd->listenfd != -1)
827                         close(nd->listenfd);
828                 if (nd->pipes[0] != -1)
829                         close(nd->pipes[0]);
830                 if (nd->pipes[1] != -1)
831                         close(nd->pipes[1]);
832
833                 free(nd);
834         }
835 }
836
837 static int fio_netio_setup(struct thread_data *td)
838 {
839         struct netio_data *nd;
840
841         if (!td->files_index) {
842                 add_file(td, td->o.filename ?: "net");
843                 td->o.nr_files = td->o.nr_files ?: 1;
844         }
845
846         if (!td->io_ops->data) {
847                 nd = malloc(sizeof(*nd));;
848
849                 memset(nd, 0, sizeof(*nd));
850                 nd->listenfd = -1;
851                 nd->pipes[0] = nd->pipes[1] = -1;
852                 td->io_ops->data = nd;
853         }
854
855         return 0;
856 }
857
858 #ifdef FIO_HAVE_SPLICE
859 static int fio_netio_setup_splice(struct thread_data *td)
860 {
861         struct netio_data *nd;
862
863         fio_netio_setup(td);
864
865         nd = td->io_ops->data;
866         if (nd) {
867                 if (pipe(nd->pipes) < 0)
868                         return 1;
869
870                 nd->use_splice = 1;
871                 return 0;
872         }
873
874         return 1;
875 }
876
877 static struct ioengine_ops ioengine_splice = {
878         .name                   = "netsplice",
879         .version                = FIO_IOOPS_VERSION,
880         .prep                   = fio_netio_prep,
881         .queue                  = fio_netio_queue,
882         .setup                  = fio_netio_setup_splice,
883         .init                   = fio_netio_init,
884         .cleanup                = fio_netio_cleanup,
885         .open_file              = fio_netio_open_file,
886         .close_file             = generic_close_file,
887         .options                = options,
888         .option_struct_size     = sizeof(struct netio_options),
889         .flags                  = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
890                                   FIO_SIGTERM | FIO_PIPEIO,
891 };
892 #endif
893
894 static struct ioengine_ops ioengine_rw = {
895         .name                   = "net",
896         .version                = FIO_IOOPS_VERSION,
897         .prep                   = fio_netio_prep,
898         .queue                  = fio_netio_queue,
899         .setup                  = fio_netio_setup,
900         .init                   = fio_netio_init,
901         .cleanup                = fio_netio_cleanup,
902         .open_file              = fio_netio_open_file,
903         .close_file             = fio_netio_close_file,
904         .options                = options,
905         .option_struct_size     = sizeof(struct netio_options),
906         .flags                  = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
907                                   FIO_SIGTERM | FIO_PIPEIO,
908 };
909
910 static int str_hostname_cb(void *data, const char *input)
911 {
912         struct netio_options *o = data;
913
914         if (o->td->o.filename)
915                 free(o->td->o.filename);
916         o->td->o.filename = strdup(input);
917         return 0;
918 }
919
920 static void fio_init fio_netio_register(void)
921 {
922         register_ioengine(&ioengine_rw);
923 #ifdef FIO_HAVE_SPLICE
924         register_ioengine(&ioengine_splice);
925 #endif
926 }
927
928 static void fio_exit fio_netio_unregister(void)
929 {
930         unregister_ioengine(&ioengine_rw);
931 #ifdef FIO_HAVE_SPLICE
932         unregister_ioengine(&ioengine_splice);
933 #endif
934 }