Refactor #includes and headers
[fio.git] / engines / net.c
1 /*
2  * net engine
3  *
4  * IO engine that reads/writes to/from sockets.
5  *
6  */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <signal.h>
11 #include <errno.h>
12 #include <netinet/in.h>
13 #include <netinet/tcp.h>
14 #include <arpa/inet.h>
15 #include <netdb.h>
16 #include <sys/poll.h>
17 #include <sys/stat.h>
18 #include <sys/socket.h>
19 #include <sys/un.h>
20
21 #include "../fio.h"
22 #include "../verify.h"
23 #include "../optgroup.h"
24
25 struct netio_data {
26         int listenfd;
27         int use_splice;
28         int seq_off;
29         int pipes[2];
30         struct sockaddr_in addr;
31         struct sockaddr_in6 addr6;
32         struct sockaddr_un addr_un;
33         uint64_t udp_send_seq;
34         uint64_t udp_recv_seq;
35 };
36
37 struct netio_options {
38         struct thread_data *td;
39         unsigned int port;
40         unsigned int proto;
41         unsigned int listen;
42         unsigned int pingpong;
43         unsigned int nodelay;
44         unsigned int ttl;
45         unsigned int window_size;
46         unsigned int mss;
47         char *intfc;
48 };
49
50 struct udp_close_msg {
51         uint32_t magic;
52         uint32_t cmd;
53 };
54
55 struct udp_seq {
56         uint64_t magic;
57         uint64_t seq;
58         uint64_t bs;
59 };
60
61 enum {
62         FIO_LINK_CLOSE = 0x89,
63         FIO_LINK_OPEN_CLOSE_MAGIC = 0x6c696e6b,
64         FIO_LINK_OPEN = 0x98,
65         FIO_UDP_SEQ_MAGIC = 0x657375716e556563ULL,
66
67         FIO_TYPE_TCP    = 1,
68         FIO_TYPE_UDP    = 2,
69         FIO_TYPE_UNIX   = 3,
70         FIO_TYPE_TCP_V6 = 4,
71         FIO_TYPE_UDP_V6 = 5,
72 };
73
74 static int str_hostname_cb(void *data, const char *input);
75 static struct fio_option options[] = {
76         {
77                 .name   = "hostname",
78                 .lname  = "net engine hostname",
79                 .type   = FIO_OPT_STR_STORE,
80                 .cb     = str_hostname_cb,
81                 .help   = "Hostname for net IO engine",
82                 .category = FIO_OPT_C_ENGINE,
83                 .group  = FIO_OPT_G_NETIO,
84         },
85         {
86                 .name   = "port",
87                 .lname  = "net engine port",
88                 .type   = FIO_OPT_INT,
89                 .off1   = offsetof(struct netio_options, port),
90                 .minval = 1,
91                 .maxval = 65535,
92                 .help   = "Port to use for TCP or UDP net connections",
93                 .category = FIO_OPT_C_ENGINE,
94                 .group  = FIO_OPT_G_NETIO,
95         },
96         {
97                 .name   = "protocol",
98                 .lname  = "net engine protocol",
99                 .alias  = "proto",
100                 .type   = FIO_OPT_STR,
101                 .off1   = offsetof(struct netio_options, proto),
102                 .help   = "Network protocol to use",
103                 .def    = "tcp",
104                 .posval = {
105                           { .ival = "tcp",
106                             .oval = FIO_TYPE_TCP,
107                             .help = "Transmission Control Protocol",
108                           },
109 #ifdef CONFIG_IPV6
110                           { .ival = "tcpv6",
111                             .oval = FIO_TYPE_TCP_V6,
112                             .help = "Transmission Control Protocol V6",
113                           },
114 #endif
115                           { .ival = "udp",
116                             .oval = FIO_TYPE_UDP,
117                             .help = "User Datagram Protocol",
118                           },
119 #ifdef CONFIG_IPV6
120                           { .ival = "udpv6",
121                             .oval = FIO_TYPE_UDP_V6,
122                             .help = "User Datagram Protocol V6",
123                           },
124 #endif
125                           { .ival = "unix",
126                             .oval = FIO_TYPE_UNIX,
127                             .help = "UNIX domain socket",
128                           },
129                 },
130                 .category = FIO_OPT_C_ENGINE,
131                 .group  = FIO_OPT_G_NETIO,
132         },
133 #ifdef CONFIG_TCP_NODELAY
134         {
135                 .name   = "nodelay",
136                 .lname  = "No Delay",
137                 .type   = FIO_OPT_BOOL,
138                 .off1   = offsetof(struct netio_options, nodelay),
139                 .help   = "Use TCP_NODELAY on TCP connections",
140                 .category = FIO_OPT_C_ENGINE,
141                 .group  = FIO_OPT_G_NETIO,
142         },
143 #endif
144         {
145                 .name   = "listen",
146                 .lname  = "net engine listen",
147                 .type   = FIO_OPT_STR_SET,
148                 .off1   = offsetof(struct netio_options, listen),
149                 .help   = "Listen for incoming TCP connections",
150                 .category = FIO_OPT_C_ENGINE,
151                 .group  = FIO_OPT_G_NETIO,
152         },
153         {
154                 .name   = "pingpong",
155                 .lname  = "Ping Pong",
156                 .type   = FIO_OPT_STR_SET,
157                 .off1   = offsetof(struct netio_options, pingpong),
158                 .help   = "Ping-pong IO requests",
159                 .category = FIO_OPT_C_ENGINE,
160                 .group  = FIO_OPT_G_NETIO,
161         },
162         {
163                 .name   = "interface",
164                 .lname  = "net engine interface",
165                 .type   = FIO_OPT_STR_STORE,
166                 .off1   = offsetof(struct netio_options, intfc),
167                 .help   = "Network interface to use",
168                 .category = FIO_OPT_C_ENGINE,
169                 .group  = FIO_OPT_G_NETIO,
170         },
171         {
172                 .name   = "ttl",
173                 .lname  = "net engine multicast ttl",
174                 .type   = FIO_OPT_INT,
175                 .off1   = offsetof(struct netio_options, ttl),
176                 .def    = "1",
177                 .minval = 0,
178                 .help   = "Time-to-live value for outgoing UDP multicast packets",
179                 .category = FIO_OPT_C_ENGINE,
180                 .group  = FIO_OPT_G_NETIO,
181         },
182 #ifdef CONFIG_NET_WINDOWSIZE
183         {
184                 .name   = "window_size",
185                 .lname  = "Window Size",
186                 .type   = FIO_OPT_INT,
187                 .off1   = offsetof(struct netio_options, window_size),
188                 .minval = 0,
189                 .help   = "Set socket buffer window size",
190                 .category = FIO_OPT_C_ENGINE,
191                 .group  = FIO_OPT_G_NETIO,
192         },
193 #endif
194 #ifdef CONFIG_NET_MSS
195         {
196                 .name   = "mss",
197                 .lname  = "Maximum segment size",
198                 .type   = FIO_OPT_INT,
199                 .off1   = offsetof(struct netio_options, mss),
200                 .minval = 0,
201                 .help   = "Set TCP maximum segment size",
202                 .category = FIO_OPT_C_ENGINE,
203                 .group  = FIO_OPT_G_NETIO,
204         },
205 #endif
206         {
207                 .name   = NULL,
208         },
209 };
210
211 static inline int is_udp(struct netio_options *o)
212 {
213         return o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_UDP_V6;
214 }
215
216 static inline int is_tcp(struct netio_options *o)
217 {
218         return o->proto == FIO_TYPE_TCP || o->proto == FIO_TYPE_TCP_V6;
219 }
220
221 static inline int is_ipv6(struct netio_options *o)
222 {
223         return o->proto == FIO_TYPE_UDP_V6 || o->proto == FIO_TYPE_TCP_V6;
224 }
225
226 static int set_window_size(struct thread_data *td, int fd)
227 {
228 #ifdef CONFIG_NET_WINDOWSIZE
229         struct netio_options *o = td->eo;
230         unsigned int wss;
231         int snd, rcv, ret;
232
233         if (!o->window_size)
234                 return 0;
235
236         rcv = o->listen || o->pingpong;
237         snd = !o->listen || o->pingpong;
238         wss = o->window_size;
239         ret = 0;
240
241         if (rcv) {
242                 ret = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *) &wss,
243                                         sizeof(wss));
244                 if (ret < 0)
245                         td_verror(td, errno, "rcvbuf window size");
246         }
247         if (snd && !ret) {
248                 ret = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *) &wss,
249                                         sizeof(wss));
250                 if (ret < 0)
251                         td_verror(td, errno, "sndbuf window size");
252         }
253
254         return ret;
255 #else
256         td_verror(td, -EINVAL, "setsockopt window size");
257         return -1;
258 #endif
259 }
260
261 static int set_mss(struct thread_data *td, int fd)
262 {
263 #ifdef CONFIG_NET_MSS
264         struct netio_options *o = td->eo;
265         unsigned int mss;
266         int ret;
267
268         if (!o->mss || !is_tcp(o))
269                 return 0;
270
271         mss = o->mss;
272         ret = setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG, (void *) &mss,
273                                 sizeof(mss));
274         if (ret < 0)
275                 td_verror(td, errno, "setsockopt TCP_MAXSEG");
276
277         return ret;
278 #else
279         td_verror(td, -EINVAL, "setsockopt TCP_MAXSEG");
280         return -1;
281 #endif
282 }
283
284
285 /*
286  * Return -1 for error and 'nr events' for a positive number
287  * of events
288  */
289 static int poll_wait(struct thread_data *td, int fd, short events)
290 {
291         struct pollfd pfd;
292         int ret;
293
294         while (!td->terminate) {
295                 pfd.fd = fd;
296                 pfd.events = events;
297                 ret = poll(&pfd, 1, -1);
298                 if (ret < 0) {
299                         if (errno == EINTR)
300                                 break;
301
302                         td_verror(td, errno, "poll");
303                         return -1;
304                 } else if (!ret)
305                         continue;
306
307                 break;
308         }
309
310         if (pfd.revents & events)
311                 return 1;
312
313         return -1;
314 }
315
316 static int fio_netio_is_multicast(const char *mcaddr)
317 {
318         in_addr_t addr = inet_network(mcaddr);
319         if (addr == -1)
320                 return 0;
321
322         if (inet_network("224.0.0.0") <= addr &&
323             inet_network("239.255.255.255") >= addr)
324                 return 1;
325
326         return 0;
327 }
328
329
330 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
331 {
332         struct netio_options *o = td->eo;
333
334         /*
335          * Make sure we don't see spurious reads to a receiver, and vice versa
336          */
337         if (is_tcp(o))
338                 return 0;
339
340         if ((o->listen && io_u->ddir == DDIR_WRITE) ||
341             (!o->listen && io_u->ddir == DDIR_READ)) {
342                 td_verror(td, EINVAL, "bad direction");
343                 return 1;
344         }
345
346         return 0;
347 }
348
349 #ifdef CONFIG_LINUX_SPLICE
350 static int splice_io_u(int fdin, int fdout, unsigned int len)
351 {
352         int bytes = 0;
353
354         while (len) {
355                 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
356
357                 if (ret < 0) {
358                         if (!bytes)
359                                 bytes = ret;
360
361                         break;
362                 } else if (!ret)
363                         break;
364
365                 bytes += ret;
366                 len -= ret;
367         }
368
369         return bytes;
370 }
371
372 /*
373  * Receive bytes from a socket and fill them into the internal pipe
374  */
375 static int splice_in(struct thread_data *td, struct io_u *io_u)
376 {
377         struct netio_data *nd = td->io_ops_data;
378
379         return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
380 }
381
382 /*
383  * Transmit 'len' bytes from the internal pipe
384  */
385 static int splice_out(struct thread_data *td, struct io_u *io_u,
386                       unsigned int len)
387 {
388         struct netio_data *nd = td->io_ops_data;
389
390         return splice_io_u(nd->pipes[0], io_u->file->fd, len);
391 }
392
393 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
394 {
395         struct iovec iov = {
396                 .iov_base = io_u->xfer_buf,
397                 .iov_len = len,
398         };
399         int bytes = 0;
400
401         while (iov.iov_len) {
402                 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
403
404                 if (ret < 0) {
405                         if (!bytes)
406                                 bytes = ret;
407                         break;
408                 } else if (!ret)
409                         break;
410
411                 iov.iov_len -= ret;
412                 iov.iov_base += ret;
413                 bytes += ret;
414         }
415
416         return bytes;
417
418 }
419
420 /*
421  * vmsplice() pipe to io_u buffer
422  */
423 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
424                              unsigned int len)
425 {
426         struct netio_data *nd = td->io_ops_data;
427
428         return vmsplice_io_u(io_u, nd->pipes[0], len);
429 }
430
431 /*
432  * vmsplice() io_u to pipe
433  */
434 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
435 {
436         struct netio_data *nd = td->io_ops_data;
437
438         return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
439 }
440
441 /*
442  * splice receive - transfer socket data into a pipe using splice, then map
443  * that pipe data into the io_u using vmsplice.
444  */
445 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
446 {
447         int ret;
448
449         ret = splice_in(td, io_u);
450         if (ret > 0)
451                 return vmsplice_io_u_out(td, io_u, ret);
452
453         return ret;
454 }
455
456 /*
457  * splice transmit - map data from the io_u into a pipe by using vmsplice,
458  * then transfer that pipe to a socket using splice.
459  */
460 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
461 {
462         int ret;
463
464         ret = vmsplice_io_u_in(td, io_u);
465         if (ret > 0)
466                 return splice_out(td, io_u, ret);
467
468         return ret;
469 }
470 #else
471 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
472 {
473         errno = EOPNOTSUPP;
474         return -1;
475 }
476
477 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
478 {
479         errno = EOPNOTSUPP;
480         return -1;
481 }
482 #endif
483
484 static void store_udp_seq(struct netio_data *nd, struct io_u *io_u)
485 {
486         struct udp_seq *us;
487
488         if (io_u->xfer_buflen < sizeof(*us))
489                 return;
490
491         us = io_u->xfer_buf + io_u->xfer_buflen - sizeof(*us);
492         us->magic = cpu_to_le64((uint64_t) FIO_UDP_SEQ_MAGIC);
493         us->bs = cpu_to_le64((uint64_t) io_u->xfer_buflen);
494         us->seq = cpu_to_le64(nd->udp_send_seq++);
495 }
496
497 static void verify_udp_seq(struct thread_data *td, struct netio_data *nd,
498                            struct io_u *io_u)
499 {
500         struct udp_seq *us;
501         uint64_t seq;
502
503         if (io_u->xfer_buflen < sizeof(*us))
504                 return;
505
506         if (nd->seq_off)
507                 return;
508
509         us = io_u->xfer_buf + io_u->xfer_buflen - sizeof(*us);
510         if (le64_to_cpu(us->magic) != FIO_UDP_SEQ_MAGIC)
511                 return;
512         if (le64_to_cpu(us->bs) != io_u->xfer_buflen) {
513                 nd->seq_off = 1;
514                 return;
515         }
516
517         seq = le64_to_cpu(us->seq);
518
519         if (seq != nd->udp_recv_seq)
520                 td->ts.drop_io_u[io_u->ddir] += seq - nd->udp_recv_seq;
521
522         nd->udp_recv_seq = seq + 1;
523 }
524
525 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
526 {
527         struct netio_data *nd = td->io_ops_data;
528         struct netio_options *o = td->eo;
529         int ret, flags = 0;
530
531         do {
532                 if (is_udp(o)) {
533                         const struct sockaddr *to;
534                         socklen_t len;
535
536                         if (is_ipv6(o)) {
537                                 to = (struct sockaddr *) &nd->addr6;
538                                 len = sizeof(nd->addr6);
539                         } else {
540                                 to = (struct sockaddr *) &nd->addr;
541                                 len = sizeof(nd->addr);
542                         }
543
544                         if (td->o.verify == VERIFY_NONE)
545                                 store_udp_seq(nd, io_u);
546
547                         ret = sendto(io_u->file->fd, io_u->xfer_buf,
548                                         io_u->xfer_buflen, flags, to, len);
549                 } else {
550                         /*
551                          * if we are going to write more, set MSG_MORE
552                          */
553 #ifdef MSG_MORE
554                         if ((td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
555                             td->o.size) && !o->pingpong)
556                                 flags |= MSG_MORE;
557 #endif
558                         ret = send(io_u->file->fd, io_u->xfer_buf,
559                                         io_u->xfer_buflen, flags);
560                 }
561                 if (ret > 0)
562                         break;
563
564                 ret = poll_wait(td, io_u->file->fd, POLLOUT);
565                 if (ret <= 0)
566                         break;
567         } while (1);
568
569         return ret;
570 }
571
572 static int is_close_msg(struct io_u *io_u, int len)
573 {
574         struct udp_close_msg *msg;
575
576         if (len != sizeof(struct udp_close_msg))
577                 return 0;
578
579         msg = io_u->xfer_buf;
580         if (le32_to_cpu(msg->magic) != FIO_LINK_OPEN_CLOSE_MAGIC)
581                 return 0;
582         if (le32_to_cpu(msg->cmd) != FIO_LINK_CLOSE)
583                 return 0;
584
585         return 1;
586 }
587
588 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
589 {
590         struct netio_data *nd = td->io_ops_data;
591         struct netio_options *o = td->eo;
592         int ret, flags = 0;
593
594         do {
595                 if (is_udp(o)) {
596                         struct sockaddr *from;
597                         socklen_t l, *len = &l;
598
599                         if (o->listen) {
600                                 if (!is_ipv6(o)) {
601                                         from = (struct sockaddr *) &nd->addr;
602                                         *len = sizeof(nd->addr);
603                                 } else {
604                                         from = (struct sockaddr *) &nd->addr6;
605                                         *len = sizeof(nd->addr6);
606                                 }
607                         } else {
608                                 from = NULL;
609                                 len = NULL;
610                         }
611
612                         ret = recvfrom(io_u->file->fd, io_u->xfer_buf,
613                                         io_u->xfer_buflen, flags, from, len);
614
615                         if (is_close_msg(io_u, ret)) {
616                                 td->done = 1;
617                                 return 0;
618                         }
619                 } else {
620                         ret = recv(io_u->file->fd, io_u->xfer_buf,
621                                         io_u->xfer_buflen, flags);
622
623                         if (is_close_msg(io_u, ret)) {
624                                 td->done = 1;
625                                 return 0;
626                         }
627                 }
628                 if (ret > 0)
629                         break;
630                 else if (!ret && (flags & MSG_WAITALL))
631                         break;
632
633                 ret = poll_wait(td, io_u->file->fd, POLLIN);
634                 if (ret <= 0)
635                         break;
636                 flags |= MSG_WAITALL;
637         } while (1);
638
639         if (is_udp(o) && td->o.verify == VERIFY_NONE)
640                 verify_udp_seq(td, nd, io_u);
641
642         return ret;
643 }
644
645 static int __fio_netio_queue(struct thread_data *td, struct io_u *io_u,
646                              enum fio_ddir ddir)
647 {
648         struct netio_data *nd = td->io_ops_data;
649         struct netio_options *o = td->eo;
650         int ret;
651
652         if (ddir == DDIR_WRITE) {
653                 if (!nd->use_splice || is_udp(o) ||
654                     o->proto == FIO_TYPE_UNIX)
655                         ret = fio_netio_send(td, io_u);
656                 else
657                         ret = fio_netio_splice_out(td, io_u);
658         } else if (ddir == DDIR_READ) {
659                 if (!nd->use_splice || is_udp(o) ||
660                     o->proto == FIO_TYPE_UNIX)
661                         ret = fio_netio_recv(td, io_u);
662                 else
663                         ret = fio_netio_splice_in(td, io_u);
664         } else
665                 ret = 0;        /* must be a SYNC */
666
667         if (ret != (int) io_u->xfer_buflen) {
668                 if (ret > 0) {
669                         io_u->resid = io_u->xfer_buflen - ret;
670                         io_u->error = 0;
671                         return FIO_Q_COMPLETED;
672                 } else if (!ret)
673                         return FIO_Q_BUSY;
674                 else {
675                         int err = errno;
676
677                         if (ddir == DDIR_WRITE && err == EMSGSIZE)
678                                 return FIO_Q_BUSY;
679
680                         io_u->error = err;
681                 }
682         }
683
684         if (io_u->error)
685                 td_verror(td, io_u->error, "xfer");
686
687         return FIO_Q_COMPLETED;
688 }
689
690 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
691 {
692         struct netio_options *o = td->eo;
693         int ret;
694
695         fio_ro_check(td, io_u);
696
697         ret = __fio_netio_queue(td, io_u, io_u->ddir);
698         if (!o->pingpong || ret != FIO_Q_COMPLETED)
699                 return ret;
700
701         /*
702          * For ping-pong mode, receive or send reply as needed
703          */
704         if (td_read(td) && io_u->ddir == DDIR_READ)
705                 ret = __fio_netio_queue(td, io_u, DDIR_WRITE);
706         else if (td_write(td) && io_u->ddir == DDIR_WRITE)
707                 ret = __fio_netio_queue(td, io_u, DDIR_READ);
708
709         return ret;
710 }
711
712 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
713 {
714         struct netio_data *nd = td->io_ops_data;
715         struct netio_options *o = td->eo;
716         int type, domain;
717
718         if (o->proto == FIO_TYPE_TCP) {
719                 domain = AF_INET;
720                 type = SOCK_STREAM;
721         } else if (o->proto == FIO_TYPE_TCP_V6) {
722                 domain = AF_INET6;
723                 type = SOCK_STREAM;
724         } else if (o->proto == FIO_TYPE_UDP) {
725                 domain = AF_INET;
726                 type = SOCK_DGRAM;
727         } else if (o->proto == FIO_TYPE_UDP_V6) {
728                 domain = AF_INET6;
729                 type = SOCK_DGRAM;
730         } else if (o->proto == FIO_TYPE_UNIX) {
731                 domain = AF_UNIX;
732                 type = SOCK_STREAM;
733         } else {
734                 log_err("fio: bad network type %d\n", o->proto);
735                 f->fd = -1;
736                 return 1;
737         }
738
739         f->fd = socket(domain, type, 0);
740         if (f->fd < 0) {
741                 td_verror(td, errno, "socket");
742                 return 1;
743         }
744
745 #ifdef CONFIG_TCP_NODELAY
746         if (o->nodelay && is_tcp(o)) {
747                 int optval = 1;
748
749                 if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) {
750                         log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno));
751                         return 1;
752                 }
753         }
754 #endif
755
756         if (set_window_size(td, f->fd)) {
757                 close(f->fd);
758                 return 1;
759         }
760         if (set_mss(td, f->fd)) {
761                 close(f->fd);
762                 return 1;
763         }
764
765         if (is_udp(o)) {
766                 if (!fio_netio_is_multicast(td->o.filename))
767                         return 0;
768                 if (is_ipv6(o)) {
769                         log_err("fio: multicast not supported on IPv6\n");
770                         close(f->fd);
771                         return 1;
772                 }
773
774                 if (o->intfc) {
775                         struct in_addr interface_addr;
776
777                         if (inet_aton(o->intfc, &interface_addr) == 0) {
778                                 log_err("fio: interface not valid interface IP\n");
779                                 close(f->fd);
780                                 return 1;
781                         }
782                         if (setsockopt(f->fd, IPPROTO_IP, IP_MULTICAST_IF, (const char*)&interface_addr, sizeof(interface_addr)) < 0) {
783                                 td_verror(td, errno, "setsockopt IP_MULTICAST_IF");
784                                 close(f->fd);
785                                 return 1;
786                         }
787                 }
788                 if (setsockopt(f->fd, IPPROTO_IP, IP_MULTICAST_TTL, (const char*)&o->ttl, sizeof(o->ttl)) < 0) {
789                         td_verror(td, errno, "setsockopt IP_MULTICAST_TTL");
790                         close(f->fd);
791                         return 1;
792                 }
793                 return 0;
794         } else if (o->proto == FIO_TYPE_TCP) {
795                 socklen_t len = sizeof(nd->addr);
796
797                 if (connect(f->fd, (struct sockaddr *) &nd->addr, len) < 0) {
798                         td_verror(td, errno, "connect");
799                         close(f->fd);
800                         return 1;
801                 }
802         } else if (o->proto == FIO_TYPE_TCP_V6) {
803                 socklen_t len = sizeof(nd->addr6);
804
805                 if (connect(f->fd, (struct sockaddr *) &nd->addr6, len) < 0) {
806                         td_verror(td, errno, "connect");
807                         close(f->fd);
808                         return 1;
809                 }
810
811         } else {
812                 struct sockaddr_un *addr = &nd->addr_un;
813                 socklen_t len;
814
815                 len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
816
817                 if (connect(f->fd, (struct sockaddr *) addr, len) < 0) {
818                         td_verror(td, errno, "connect");
819                         close(f->fd);
820                         return 1;
821                 }
822         }
823
824         return 0;
825 }
826
827 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
828 {
829         struct netio_data *nd = td->io_ops_data;
830         struct netio_options *o = td->eo;
831         socklen_t socklen;
832         int state;
833
834         if (is_udp(o)) {
835                 f->fd = nd->listenfd;
836                 return 0;
837         }
838
839         state = td->runstate;
840         td_set_runstate(td, TD_SETTING_UP);
841
842         log_info("fio: waiting for connection\n");
843
844         if (poll_wait(td, nd->listenfd, POLLIN) < 0)
845                 goto err;
846
847         if (o->proto == FIO_TYPE_TCP) {
848                 socklen = sizeof(nd->addr);
849                 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
850         } else {
851                 socklen = sizeof(nd->addr6);
852                 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr6, &socklen);
853         }
854
855         if (f->fd < 0) {
856                 td_verror(td, errno, "accept");
857                 goto err;
858         }
859
860 #ifdef CONFIG_TCP_NODELAY
861         if (o->nodelay && is_tcp(o)) {
862                 int optval = 1;
863
864                 if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) {
865                         log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno));
866                         return 1;
867                 }
868         }
869 #endif
870
871         reset_all_stats(td);
872         td_set_runstate(td, state);
873         return 0;
874 err:
875         td_set_runstate(td, state);
876         return 1;
877 }
878
879 static void fio_netio_send_close(struct thread_data *td, struct fio_file *f)
880 {
881         struct netio_data *nd = td->io_ops_data;
882         struct netio_options *o = td->eo;
883         struct udp_close_msg msg;
884         struct sockaddr *to;
885         socklen_t len;
886         int ret;
887
888         if (is_ipv6(o)) {
889                 to = (struct sockaddr *) &nd->addr6;
890                 len = sizeof(nd->addr6);
891         } else {
892                 to = (struct sockaddr *) &nd->addr;
893                 len = sizeof(nd->addr);
894         }
895
896         msg.magic = cpu_to_le32((uint32_t) FIO_LINK_OPEN_CLOSE_MAGIC);
897         msg.cmd = cpu_to_le32((uint32_t) FIO_LINK_CLOSE);
898
899         ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, len);
900         if (ret < 0)
901                 td_verror(td, errno, "sendto udp link close");
902 }
903
904 static int fio_netio_close_file(struct thread_data *td, struct fio_file *f)
905 {
906         /*
907          * Notify the receiver that we are closing down the link
908          */
909         fio_netio_send_close(td, f);
910
911         return generic_close_file(td, f);
912 }
913
914 static int fio_netio_udp_recv_open(struct thread_data *td, struct fio_file *f)
915 {
916         struct netio_data *nd = td->io_ops_data;
917         struct netio_options *o = td->eo;
918         struct udp_close_msg msg;
919         struct sockaddr *to;
920         socklen_t len;
921         int ret;
922
923         if (is_ipv6(o)) {
924                 len = sizeof(nd->addr6);
925                 to = (struct sockaddr *) &nd->addr6;
926         } else {
927                 len = sizeof(nd->addr);
928                 to = (struct sockaddr *) &nd->addr;
929         }
930
931         ret = recvfrom(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, &len);
932         if (ret < 0) {
933                 td_verror(td, errno, "recvfrom udp link open");
934                 return ret;
935         }
936
937         if (ntohl(msg.magic) != FIO_LINK_OPEN_CLOSE_MAGIC ||
938             ntohl(msg.cmd) != FIO_LINK_OPEN) {
939                 log_err("fio: bad udp open magic %x/%x\n", ntohl(msg.magic),
940                                                                 ntohl(msg.cmd));
941                 return -1;
942         }
943
944         fio_gettime(&td->start, NULL);
945         return 0;
946 }
947
948 static int fio_netio_send_open(struct thread_data *td, struct fio_file *f)
949 {
950         struct netio_data *nd = td->io_ops_data;
951         struct netio_options *o = td->eo;
952         struct udp_close_msg msg;
953         struct sockaddr *to;
954         socklen_t len;
955         int ret;
956
957         if (is_ipv6(o)) {
958                 len = sizeof(nd->addr6);
959                 to = (struct sockaddr *) &nd->addr6;
960         } else {
961                 len = sizeof(nd->addr);
962                 to = (struct sockaddr *) &nd->addr;
963         }
964
965         msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
966         msg.cmd = htonl(FIO_LINK_OPEN);
967
968         ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, len);
969         if (ret < 0) {
970                 td_verror(td, errno, "sendto udp link open");
971                 return ret;
972         }
973
974         return 0;
975 }
976
977 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
978 {
979         int ret;
980         struct netio_options *o = td->eo;
981
982         if (o->listen)
983                 ret = fio_netio_accept(td, f);
984         else
985                 ret = fio_netio_connect(td, f);
986
987         if (ret) {
988                 f->fd = -1;
989                 return ret;
990         }
991
992         if (is_udp(o)) {
993                 if (td_write(td))
994                         ret = fio_netio_send_open(td, f);
995                 else {
996                         int state;
997
998                         state = td->runstate;
999                         td_set_runstate(td, TD_SETTING_UP);
1000                         ret = fio_netio_udp_recv_open(td, f);
1001                         td_set_runstate(td, state);
1002                 }
1003         }
1004
1005         if (ret)
1006                 fio_netio_close_file(td, f);
1007
1008         return ret;
1009 }
1010
1011 static int fio_fill_addr(struct thread_data *td, const char *host, int af,
1012                          void *dst, struct addrinfo **res)
1013 {
1014         struct netio_options *o = td->eo;
1015         struct addrinfo hints;
1016         int ret;
1017
1018         if (inet_pton(af, host, dst))
1019                 return 0;
1020
1021         memset(&hints, 0, sizeof(hints));
1022
1023         if (is_tcp(o))
1024                 hints.ai_socktype = SOCK_STREAM;
1025         else
1026                 hints.ai_socktype = SOCK_DGRAM;
1027
1028         if (is_ipv6(o))
1029                 hints.ai_family = AF_INET6;
1030         else
1031                 hints.ai_family = AF_INET;
1032
1033         ret = getaddrinfo(host, NULL, &hints, res);
1034         if (ret) {
1035                 int e = EINVAL;
1036                 char str[128];
1037
1038                 if (ret == EAI_SYSTEM)
1039                         e = errno;
1040
1041                 snprintf(str, sizeof(str), "getaddrinfo: %s", gai_strerror(ret));
1042                 td_verror(td, e, str);
1043                 return 1;
1044         }
1045
1046         return 0;
1047 }
1048
1049 static int fio_netio_setup_connect_inet(struct thread_data *td,
1050                                         const char *host, unsigned short port)
1051 {
1052         struct netio_data *nd = td->io_ops_data;
1053         struct netio_options *o = td->eo;
1054         struct addrinfo *res = NULL;
1055         void *dst, *src;
1056         int af, len;
1057
1058         if (!host) {
1059                 log_err("fio: connect with no host to connect to.\n");
1060                 if (td_read(td))
1061                         log_err("fio: did you forget to set 'listen'?\n");
1062
1063                 td_verror(td, EINVAL, "no hostname= set");
1064                 return 1;
1065         }
1066
1067         nd->addr.sin_family = AF_INET;
1068         nd->addr.sin_port = htons(port);
1069         nd->addr6.sin6_family = AF_INET6;
1070         nd->addr6.sin6_port = htons(port);
1071
1072         if (is_ipv6(o)) {
1073                 af = AF_INET6;
1074                 dst = &nd->addr6.sin6_addr;
1075         } else {
1076                 af = AF_INET;
1077                 dst = &nd->addr.sin_addr;
1078         }
1079
1080         if (fio_fill_addr(td, host, af, dst, &res))
1081                 return 1;
1082
1083         if (!res)
1084                 return 0;
1085
1086         if (is_ipv6(o)) {
1087                 len = sizeof(nd->addr6.sin6_addr);
1088                 src = &((struct sockaddr_in6 *) res->ai_addr)->sin6_addr;
1089         } else {
1090                 len = sizeof(nd->addr.sin_addr);
1091                 src = &((struct sockaddr_in *) res->ai_addr)->sin_addr;
1092         }
1093
1094         memcpy(dst, src, len);
1095         freeaddrinfo(res);
1096         return 0;
1097 }
1098
1099 static int fio_netio_setup_connect_unix(struct thread_data *td,
1100                                         const char *path)
1101 {
1102         struct netio_data *nd = td->io_ops_data;
1103         struct sockaddr_un *soun = &nd->addr_un;
1104
1105         soun->sun_family = AF_UNIX;
1106         memset(soun->sun_path, 0, sizeof(soun->sun_path));
1107         strncpy(soun->sun_path, path, sizeof(soun->sun_path) - 1);
1108         return 0;
1109 }
1110
1111 static int fio_netio_setup_connect(struct thread_data *td)
1112 {
1113         struct netio_options *o = td->eo;
1114
1115         if (is_udp(o) || is_tcp(o))
1116                 return fio_netio_setup_connect_inet(td, td->o.filename,o->port);
1117         else
1118                 return fio_netio_setup_connect_unix(td, td->o.filename);
1119 }
1120
1121 static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path)
1122 {
1123         struct netio_data *nd = td->io_ops_data;
1124         struct sockaddr_un *addr = &nd->addr_un;
1125         mode_t mode;
1126         int len, fd;
1127
1128         fd = socket(AF_UNIX, SOCK_STREAM, 0);
1129         if (fd < 0) {
1130                 log_err("fio: socket: %s\n", strerror(errno));
1131                 return -1;
1132         }
1133
1134         mode = umask(000);
1135
1136         memset(addr, 0, sizeof(*addr));
1137         addr->sun_family = AF_UNIX;
1138         strncpy(addr->sun_path, path, sizeof(addr->sun_path) - 1);
1139         unlink(path);
1140
1141         len = sizeof(addr->sun_family) + strlen(path) + 1;
1142
1143         if (bind(fd, (struct sockaddr *) addr, len) < 0) {
1144                 log_err("fio: bind: %s\n", strerror(errno));
1145                 close(fd);
1146                 return -1;
1147         }
1148
1149         umask(mode);
1150         nd->listenfd = fd;
1151         return 0;
1152 }
1153
1154 static int fio_netio_setup_listen_inet(struct thread_data *td, short port)
1155 {
1156         struct netio_data *nd = td->io_ops_data;
1157         struct netio_options *o = td->eo;
1158         struct ip_mreq mr;
1159         struct sockaddr_in sin;
1160         struct sockaddr *saddr;
1161         int fd, opt, type, domain;
1162         socklen_t len;
1163
1164         memset(&sin, 0, sizeof(sin));
1165
1166         if (o->proto == FIO_TYPE_TCP) {
1167                 type = SOCK_STREAM;
1168                 domain = AF_INET;
1169         } else if (o->proto == FIO_TYPE_TCP_V6) {
1170                 type = SOCK_STREAM;
1171                 domain = AF_INET6;
1172         } else if (o->proto == FIO_TYPE_UDP) {
1173                 type = SOCK_DGRAM;
1174                 domain = AF_INET;
1175         } else if (o->proto == FIO_TYPE_UDP_V6) {
1176                 type = SOCK_DGRAM;
1177                 domain = AF_INET6;
1178         } else {
1179                 log_err("fio: unknown proto %d\n", o->proto);
1180                 return 1;
1181         }
1182
1183         fd = socket(domain, type, 0);
1184         if (fd < 0) {
1185                 td_verror(td, errno, "socket");
1186                 return 1;
1187         }
1188
1189         opt = 1;
1190         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *) &opt, sizeof(opt)) < 0) {
1191                 td_verror(td, errno, "setsockopt");
1192                 close(fd);
1193                 return 1;
1194         }
1195 #ifdef SO_REUSEPORT
1196         if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, (void *) &opt, sizeof(opt)) < 0) {
1197                 td_verror(td, errno, "setsockopt");
1198                 close(fd);
1199                 return 1;
1200         }
1201 #endif
1202
1203         if (set_window_size(td, fd)) {
1204                 close(fd);
1205                 return 1;
1206         }
1207         if (set_mss(td, fd)) {
1208                 close(fd);
1209                 return 1;
1210         }
1211
1212         if (td->o.filename) {
1213                 if (!is_udp(o) || !fio_netio_is_multicast(td->o.filename)) {
1214                         log_err("fio: hostname not valid for non-multicast inbound network IO\n");
1215                         close(fd);
1216                         return 1;
1217                 }
1218                 if (is_ipv6(o)) {
1219                         log_err("fio: IPv6 not supported for multicast network IO\n");
1220                         close(fd);
1221                         return 1;
1222                 }
1223
1224                 inet_aton(td->o.filename, &sin.sin_addr);
1225
1226                 mr.imr_multiaddr = sin.sin_addr;
1227                 if (o->intfc) {
1228                         if (inet_aton(o->intfc, &mr.imr_interface) == 0) {
1229                                 log_err("fio: interface not valid interface IP\n");
1230                                 close(fd);
1231                                 return 1;
1232                         }
1233                 } else {
1234                         mr.imr_interface.s_addr = htonl(INADDR_ANY);
1235                 }
1236
1237                 if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const char*)&mr, sizeof(mr)) < 0) {
1238                         td_verror(td, errno, "setsockopt IP_ADD_MEMBERSHIP");
1239                         close(fd);
1240                         return 1;
1241                 }
1242         }
1243
1244         if (!is_ipv6(o)) {
1245                 saddr = (struct sockaddr *) &nd->addr;
1246                 len = sizeof(nd->addr);
1247
1248                 nd->addr.sin_family = AF_INET;
1249                 nd->addr.sin_addr.s_addr = sin.sin_addr.s_addr ? sin.sin_addr.s_addr : htonl(INADDR_ANY);
1250                 nd->addr.sin_port = htons(port);
1251         } else {
1252                 saddr = (struct sockaddr *) &nd->addr6;
1253                 len = sizeof(nd->addr6);
1254
1255                 nd->addr6.sin6_family = AF_INET6;
1256                 nd->addr6.sin6_addr = in6addr_any;
1257                 nd->addr6.sin6_port = htons(port);
1258         }
1259
1260         if (bind(fd, saddr, len) < 0) {
1261                 close(fd);
1262                 td_verror(td, errno, "bind");
1263                 return 1;
1264         }
1265
1266         nd->listenfd = fd;
1267         return 0;
1268 }
1269
1270 static int fio_netio_setup_listen(struct thread_data *td)
1271 {
1272         struct netio_data *nd = td->io_ops_data;
1273         struct netio_options *o = td->eo;
1274         int ret;
1275
1276         if (is_udp(o) || is_tcp(o))
1277                 ret = fio_netio_setup_listen_inet(td, o->port);
1278         else
1279                 ret = fio_netio_setup_listen_unix(td, td->o.filename);
1280
1281         if (ret)
1282                 return ret;
1283         if (is_udp(o))
1284                 return 0;
1285
1286         if (listen(nd->listenfd, 10) < 0) {
1287                 td_verror(td, errno, "listen");
1288                 nd->listenfd = -1;
1289                 return 1;
1290         }
1291
1292         return 0;
1293 }
1294
1295 static int fio_netio_init(struct thread_data *td)
1296 {
1297         struct netio_options *o = td->eo;
1298         int ret;
1299
1300 #ifdef WIN32
1301         WSADATA wsd;
1302         WSAStartup(MAKEWORD(2,2), &wsd);
1303 #endif
1304
1305         if (td_random(td)) {
1306                 log_err("fio: network IO can't be random\n");
1307                 return 1;
1308         }
1309
1310         if (o->proto == FIO_TYPE_UNIX && o->port) {
1311                 log_err("fio: network IO port not valid with unix socket\n");
1312                 return 1;
1313         } else if (o->proto != FIO_TYPE_UNIX && !o->port) {
1314                 log_err("fio: network IO requires port for tcp or udp\n");
1315                 return 1;
1316         }
1317
1318         o->port += td->subjob_number;
1319
1320         if (!is_tcp(o)) {
1321                 if (o->listen) {
1322                         log_err("fio: listen only valid for TCP proto IO\n");
1323                         return 1;
1324                 }
1325                 if (td_rw(td)) {
1326                         log_err("fio: datagram network connections must be"
1327                                    " read OR write\n");
1328                         return 1;
1329                 }
1330                 if (o->proto == FIO_TYPE_UNIX && !td->o.filename) {
1331                         log_err("fio: UNIX sockets need host/filename\n");
1332                         return 1;
1333                 }
1334                 o->listen = td_read(td);
1335         }
1336
1337         if (o->listen)
1338                 ret = fio_netio_setup_listen(td);
1339         else
1340                 ret = fio_netio_setup_connect(td);
1341
1342         return ret;
1343 }
1344
1345 static void fio_netio_cleanup(struct thread_data *td)
1346 {
1347         struct netio_data *nd = td->io_ops_data;
1348
1349         if (nd) {
1350                 if (nd->listenfd != -1)
1351                         close(nd->listenfd);
1352                 if (nd->pipes[0] != -1)
1353                         close(nd->pipes[0]);
1354                 if (nd->pipes[1] != -1)
1355                         close(nd->pipes[1]);
1356
1357                 free(nd);
1358         }
1359 }
1360
1361 static int fio_netio_setup(struct thread_data *td)
1362 {
1363         struct netio_data *nd;
1364
1365         if (!td->files_index) {
1366                 add_file(td, td->o.filename ?: "net", 0, 0);
1367                 td->o.nr_files = td->o.nr_files ?: 1;
1368                 td->o.open_files++;
1369         }
1370
1371         if (!td->io_ops_data) {
1372                 nd = malloc(sizeof(*nd));
1373
1374                 memset(nd, 0, sizeof(*nd));
1375                 nd->listenfd = -1;
1376                 nd->pipes[0] = nd->pipes[1] = -1;
1377                 td->io_ops_data = nd;
1378         }
1379
1380         return 0;
1381 }
1382
1383 static void fio_netio_terminate(struct thread_data *td)
1384 {
1385         kill(td->pid, SIGTERM);
1386 }
1387
1388 #ifdef CONFIG_LINUX_SPLICE
1389 static int fio_netio_setup_splice(struct thread_data *td)
1390 {
1391         struct netio_data *nd;
1392
1393         fio_netio_setup(td);
1394
1395         nd = td->io_ops_data;
1396         if (nd) {
1397                 if (pipe(nd->pipes) < 0)
1398                         return 1;
1399
1400                 nd->use_splice = 1;
1401                 return 0;
1402         }
1403
1404         return 1;
1405 }
1406
1407 static struct ioengine_ops ioengine_splice = {
1408         .name                   = "netsplice",
1409         .version                = FIO_IOOPS_VERSION,
1410         .prep                   = fio_netio_prep,
1411         .queue                  = fio_netio_queue,
1412         .setup                  = fio_netio_setup_splice,
1413         .init                   = fio_netio_init,
1414         .cleanup                = fio_netio_cleanup,
1415         .open_file              = fio_netio_open_file,
1416         .close_file             = fio_netio_close_file,
1417         .terminate              = fio_netio_terminate,
1418         .options                = options,
1419         .option_struct_size     = sizeof(struct netio_options),
1420         .flags                  = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
1421                                   FIO_PIPEIO,
1422 };
1423 #endif
1424
1425 static struct ioengine_ops ioengine_rw = {
1426         .name                   = "net",
1427         .version                = FIO_IOOPS_VERSION,
1428         .prep                   = fio_netio_prep,
1429         .queue                  = fio_netio_queue,
1430         .setup                  = fio_netio_setup,
1431         .init                   = fio_netio_init,
1432         .cleanup                = fio_netio_cleanup,
1433         .open_file              = fio_netio_open_file,
1434         .close_file             = fio_netio_close_file,
1435         .terminate              = fio_netio_terminate,
1436         .options                = options,
1437         .option_struct_size     = sizeof(struct netio_options),
1438         .flags                  = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
1439                                   FIO_PIPEIO | FIO_BIT_BASED,
1440 };
1441
1442 static int str_hostname_cb(void *data, const char *input)
1443 {
1444         struct netio_options *o = data;
1445
1446         if (o->td->o.filename)
1447                 free(o->td->o.filename);
1448         o->td->o.filename = strdup(input);
1449         return 0;
1450 }
1451
1452 static void fio_init fio_netio_register(void)
1453 {
1454         register_ioengine(&ioengine_rw);
1455 #ifdef CONFIG_LINUX_SPLICE
1456         register_ioengine(&ioengine_splice);
1457 #endif
1458 }
1459
1460 static void fio_exit fio_netio_unregister(void)
1461 {
1462         unregister_ioengine(&ioengine_rw);
1463 #ifdef CONFIG_LINUX_SPLICE
1464         unregister_ioengine(&ioengine_splice);
1465 #endif
1466 }