Merge branch 'histogram-delta' of https://github.com/cronburg/fio into histogram
[fio.git] / engines / net.c
1 /*
2  * net engine
3  *
4  * IO engine that reads/writes to/from sockets.
5  *
6  */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <signal.h>
11 #include <errno.h>
12 #include <assert.h>
13 #include <netinet/in.h>
14 #include <netinet/tcp.h>
15 #include <arpa/inet.h>
16 #include <netdb.h>
17 #include <sys/poll.h>
18 #include <sys/types.h>
19 #include <sys/stat.h>
20 #include <sys/socket.h>
21 #include <sys/un.h>
22
23 #include "../fio.h"
24 #include "../verify.h"
25 #include "../optgroup.h"
26
27 struct netio_data {
28         int listenfd;
29         int use_splice;
30         int seq_off;
31         int pipes[2];
32         struct sockaddr_in addr;
33         struct sockaddr_in6 addr6;
34         struct sockaddr_un addr_un;
35         uint64_t udp_send_seq;
36         uint64_t udp_recv_seq;
37 };
38
39 struct netio_options {
40         struct thread_data *td;
41         unsigned int port;
42         unsigned int proto;
43         unsigned int listen;
44         unsigned int pingpong;
45         unsigned int nodelay;
46         unsigned int ttl;
47         unsigned int window_size;
48         unsigned int mss;
49         char *intfc;
50 };
51
52 struct udp_close_msg {
53         uint32_t magic;
54         uint32_t cmd;
55 };
56
57 struct udp_seq {
58         uint64_t magic;
59         uint64_t seq;
60         uint64_t bs;
61 };
62
63 enum {
64         FIO_LINK_CLOSE = 0x89,
65         FIO_LINK_OPEN_CLOSE_MAGIC = 0x6c696e6b,
66         FIO_LINK_OPEN = 0x98,
67         FIO_UDP_SEQ_MAGIC = 0x657375716e556563ULL,
68
69         FIO_TYPE_TCP    = 1,
70         FIO_TYPE_UDP    = 2,
71         FIO_TYPE_UNIX   = 3,
72         FIO_TYPE_TCP_V6 = 4,
73         FIO_TYPE_UDP_V6 = 5,
74 };
75
76 static int str_hostname_cb(void *data, const char *input);
77 static struct fio_option options[] = {
78         {
79                 .name   = "hostname",
80                 .lname  = "net engine hostname",
81                 .type   = FIO_OPT_STR_STORE,
82                 .cb     = str_hostname_cb,
83                 .help   = "Hostname for net IO engine",
84                 .category = FIO_OPT_C_ENGINE,
85                 .group  = FIO_OPT_G_NETIO,
86         },
87         {
88                 .name   = "port",
89                 .lname  = "net engine port",
90                 .type   = FIO_OPT_INT,
91                 .off1   = offsetof(struct netio_options, port),
92                 .minval = 1,
93                 .maxval = 65535,
94                 .help   = "Port to use for TCP or UDP net connections",
95                 .category = FIO_OPT_C_ENGINE,
96                 .group  = FIO_OPT_G_NETIO,
97         },
98         {
99                 .name   = "protocol",
100                 .lname  = "net engine protocol",
101                 .alias  = "proto",
102                 .type   = FIO_OPT_STR,
103                 .off1   = offsetof(struct netio_options, proto),
104                 .help   = "Network protocol to use",
105                 .def    = "tcp",
106                 .posval = {
107                           { .ival = "tcp",
108                             .oval = FIO_TYPE_TCP,
109                             .help = "Transmission Control Protocol",
110                           },
111 #ifdef CONFIG_IPV6
112                           { .ival = "tcpv6",
113                             .oval = FIO_TYPE_TCP_V6,
114                             .help = "Transmission Control Protocol V6",
115                           },
116 #endif
117                           { .ival = "udp",
118                             .oval = FIO_TYPE_UDP,
119                             .help = "User Datagram Protocol",
120                           },
121 #ifdef CONFIG_IPV6
122                           { .ival = "udpv6",
123                             .oval = FIO_TYPE_UDP_V6,
124                             .help = "User Datagram Protocol V6",
125                           },
126 #endif
127                           { .ival = "unix",
128                             .oval = FIO_TYPE_UNIX,
129                             .help = "UNIX domain socket",
130                           },
131                 },
132                 .category = FIO_OPT_C_ENGINE,
133                 .group  = FIO_OPT_G_NETIO,
134         },
135 #ifdef CONFIG_TCP_NODELAY
136         {
137                 .name   = "nodelay",
138                 .lname  = "No Delay",
139                 .type   = FIO_OPT_BOOL,
140                 .off1   = offsetof(struct netio_options, nodelay),
141                 .help   = "Use TCP_NODELAY on TCP connections",
142                 .category = FIO_OPT_C_ENGINE,
143                 .group  = FIO_OPT_G_NETIO,
144         },
145 #endif
146         {
147                 .name   = "listen",
148                 .lname  = "net engine listen",
149                 .type   = FIO_OPT_STR_SET,
150                 .off1   = offsetof(struct netio_options, listen),
151                 .help   = "Listen for incoming TCP connections",
152                 .category = FIO_OPT_C_ENGINE,
153                 .group  = FIO_OPT_G_NETIO,
154         },
155         {
156                 .name   = "pingpong",
157                 .lname  = "Ping Pong",
158                 .type   = FIO_OPT_STR_SET,
159                 .off1   = offsetof(struct netio_options, pingpong),
160                 .help   = "Ping-pong IO requests",
161                 .category = FIO_OPT_C_ENGINE,
162                 .group  = FIO_OPT_G_NETIO,
163         },
164         {
165                 .name   = "interface",
166                 .lname  = "net engine interface",
167                 .type   = FIO_OPT_STR_STORE,
168                 .off1   = offsetof(struct netio_options, intfc),
169                 .help   = "Network interface to use",
170                 .category = FIO_OPT_C_ENGINE,
171                 .group  = FIO_OPT_G_NETIO,
172         },
173         {
174                 .name   = "ttl",
175                 .lname  = "net engine multicast ttl",
176                 .type   = FIO_OPT_INT,
177                 .off1   = offsetof(struct netio_options, ttl),
178                 .def    = "1",
179                 .minval = 0,
180                 .help   = "Time-to-live value for outgoing UDP multicast packets",
181                 .category = FIO_OPT_C_ENGINE,
182                 .group  = FIO_OPT_G_NETIO,
183         },
184 #ifdef CONFIG_NET_WINDOWSIZE
185         {
186                 .name   = "window_size",
187                 .lname  = "Window Size",
188                 .type   = FIO_OPT_INT,
189                 .off1   = offsetof(struct netio_options, window_size),
190                 .minval = 0,
191                 .help   = "Set socket buffer window size",
192                 .category = FIO_OPT_C_ENGINE,
193                 .group  = FIO_OPT_G_NETIO,
194         },
195 #endif
196 #ifdef CONFIG_NET_MSS
197         {
198                 .name   = "mss",
199                 .lname  = "Maximum segment size",
200                 .type   = FIO_OPT_INT,
201                 .off1   = offsetof(struct netio_options, mss),
202                 .minval = 0,
203                 .help   = "Set TCP maximum segment size",
204                 .category = FIO_OPT_C_ENGINE,
205                 .group  = FIO_OPT_G_NETIO,
206         },
207 #endif
208         {
209                 .name   = NULL,
210         },
211 };
212
213 static inline int is_udp(struct netio_options *o)
214 {
215         return o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_UDP_V6;
216 }
217
218 static inline int is_tcp(struct netio_options *o)
219 {
220         return o->proto == FIO_TYPE_TCP || o->proto == FIO_TYPE_TCP_V6;
221 }
222
223 static inline int is_ipv6(struct netio_options *o)
224 {
225         return o->proto == FIO_TYPE_UDP_V6 || o->proto == FIO_TYPE_TCP_V6;
226 }
227
228 static int set_window_size(struct thread_data *td, int fd)
229 {
230 #ifdef CONFIG_NET_WINDOWSIZE
231         struct netio_options *o = td->eo;
232         unsigned int wss;
233         int snd, rcv, ret;
234
235         if (!o->window_size)
236                 return 0;
237
238         rcv = o->listen || o->pingpong;
239         snd = !o->listen || o->pingpong;
240         wss = o->window_size;
241         ret = 0;
242
243         if (rcv) {
244                 ret = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *) &wss,
245                                         sizeof(wss));
246                 if (ret < 0)
247                         td_verror(td, errno, "rcvbuf window size");
248         }
249         if (snd && !ret) {
250                 ret = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *) &wss,
251                                         sizeof(wss));
252                 if (ret < 0)
253                         td_verror(td, errno, "sndbuf window size");
254         }
255
256         return ret;
257 #else
258         td_verror(td, -EINVAL, "setsockopt window size");
259         return -1;
260 #endif
261 }
262
263 static int set_mss(struct thread_data *td, int fd)
264 {
265 #ifdef CONFIG_NET_MSS
266         struct netio_options *o = td->eo;
267         unsigned int mss;
268         int ret;
269
270         if (!o->mss || !is_tcp(o))
271                 return 0;
272
273         mss = o->mss;
274         ret = setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG, (void *) &mss,
275                                 sizeof(mss));
276         if (ret < 0)
277                 td_verror(td, errno, "setsockopt TCP_MAXSEG");
278
279         return ret;
280 #else
281         td_verror(td, -EINVAL, "setsockopt TCP_MAXSEG");
282         return -1;
283 #endif
284 }
285
286
287 /*
288  * Return -1 for error and 'nr events' for a positive number
289  * of events
290  */
291 static int poll_wait(struct thread_data *td, int fd, short events)
292 {
293         struct pollfd pfd;
294         int ret;
295
296         while (!td->terminate) {
297                 pfd.fd = fd;
298                 pfd.events = events;
299                 ret = poll(&pfd, 1, -1);
300                 if (ret < 0) {
301                         if (errno == EINTR)
302                                 break;
303
304                         td_verror(td, errno, "poll");
305                         return -1;
306                 } else if (!ret)
307                         continue;
308
309                 break;
310         }
311
312         if (pfd.revents & events)
313                 return 1;
314
315         return -1;
316 }
317
318 static int fio_netio_is_multicast(const char *mcaddr)
319 {
320         in_addr_t addr = inet_network(mcaddr);
321         if (addr == -1)
322                 return 0;
323
324         if (inet_network("224.0.0.0") <= addr &&
325             inet_network("239.255.255.255") >= addr)
326                 return 1;
327
328         return 0;
329 }
330
331
332 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
333 {
334         struct netio_options *o = td->eo;
335
336         /*
337          * Make sure we don't see spurious reads to a receiver, and vice versa
338          */
339         if (is_tcp(o))
340                 return 0;
341
342         if ((o->listen && io_u->ddir == DDIR_WRITE) ||
343             (!o->listen && io_u->ddir == DDIR_READ)) {
344                 td_verror(td, EINVAL, "bad direction");
345                 return 1;
346         }
347
348         return 0;
349 }
350
351 #ifdef CONFIG_LINUX_SPLICE
352 static int splice_io_u(int fdin, int fdout, unsigned int len)
353 {
354         int bytes = 0;
355
356         while (len) {
357                 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
358
359                 if (ret < 0) {
360                         if (!bytes)
361                                 bytes = ret;
362
363                         break;
364                 } else if (!ret)
365                         break;
366
367                 bytes += ret;
368                 len -= ret;
369         }
370
371         return bytes;
372 }
373
374 /*
375  * Receive bytes from a socket and fill them into the internal pipe
376  */
377 static int splice_in(struct thread_data *td, struct io_u *io_u)
378 {
379         struct netio_data *nd = td->io_ops_data;
380
381         return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
382 }
383
384 /*
385  * Transmit 'len' bytes from the internal pipe
386  */
387 static int splice_out(struct thread_data *td, struct io_u *io_u,
388                       unsigned int len)
389 {
390         struct netio_data *nd = td->io_ops_data;
391
392         return splice_io_u(nd->pipes[0], io_u->file->fd, len);
393 }
394
395 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
396 {
397         struct iovec iov = {
398                 .iov_base = io_u->xfer_buf,
399                 .iov_len = len,
400         };
401         int bytes = 0;
402
403         while (iov.iov_len) {
404                 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
405
406                 if (ret < 0) {
407                         if (!bytes)
408                                 bytes = ret;
409                         break;
410                 } else if (!ret)
411                         break;
412
413                 iov.iov_len -= ret;
414                 iov.iov_base += ret;
415                 bytes += ret;
416         }
417
418         return bytes;
419
420 }
421
422 /*
423  * vmsplice() pipe to io_u buffer
424  */
425 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
426                              unsigned int len)
427 {
428         struct netio_data *nd = td->io_ops_data;
429
430         return vmsplice_io_u(io_u, nd->pipes[0], len);
431 }
432
433 /*
434  * vmsplice() io_u to pipe
435  */
436 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
437 {
438         struct netio_data *nd = td->io_ops_data;
439
440         return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
441 }
442
443 /*
444  * splice receive - transfer socket data into a pipe using splice, then map
445  * that pipe data into the io_u using vmsplice.
446  */
447 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
448 {
449         int ret;
450
451         ret = splice_in(td, io_u);
452         if (ret > 0)
453                 return vmsplice_io_u_out(td, io_u, ret);
454
455         return ret;
456 }
457
458 /*
459  * splice transmit - map data from the io_u into a pipe by using vmsplice,
460  * then transfer that pipe to a socket using splice.
461  */
462 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
463 {
464         int ret;
465
466         ret = vmsplice_io_u_in(td, io_u);
467         if (ret > 0)
468                 return splice_out(td, io_u, ret);
469
470         return ret;
471 }
472 #else
473 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
474 {
475         errno = EOPNOTSUPP;
476         return -1;
477 }
478
479 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
480 {
481         errno = EOPNOTSUPP;
482         return -1;
483 }
484 #endif
485
486 static void store_udp_seq(struct netio_data *nd, struct io_u *io_u)
487 {
488         struct udp_seq *us;
489
490         if (io_u->xfer_buflen < sizeof(*us))
491                 return;
492
493         us = io_u->xfer_buf + io_u->xfer_buflen - sizeof(*us);
494         us->magic = cpu_to_le64((uint64_t) FIO_UDP_SEQ_MAGIC);
495         us->bs = cpu_to_le64((uint64_t) io_u->xfer_buflen);
496         us->seq = cpu_to_le64(nd->udp_send_seq++);
497 }
498
499 static void verify_udp_seq(struct thread_data *td, struct netio_data *nd,
500                            struct io_u *io_u)
501 {
502         struct udp_seq *us;
503         uint64_t seq;
504
505         if (io_u->xfer_buflen < sizeof(*us))
506                 return;
507
508         if (nd->seq_off)
509                 return;
510
511         us = io_u->xfer_buf + io_u->xfer_buflen - sizeof(*us);
512         if (le64_to_cpu(us->magic) != FIO_UDP_SEQ_MAGIC)
513                 return;
514         if (le64_to_cpu(us->bs) != io_u->xfer_buflen) {
515                 nd->seq_off = 1;
516                 return;
517         }
518
519         seq = le64_to_cpu(us->seq);
520
521         if (seq != nd->udp_recv_seq)
522                 td->ts.drop_io_u[io_u->ddir] += seq - nd->udp_recv_seq;
523
524         nd->udp_recv_seq = seq + 1;
525 }
526
527 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
528 {
529         struct netio_data *nd = td->io_ops_data;
530         struct netio_options *o = td->eo;
531         int ret, flags = 0;
532
533         do {
534                 if (is_udp(o)) {
535                         const struct sockaddr *to;
536                         socklen_t len;
537
538                         if (is_ipv6(o)) {
539                                 to = (struct sockaddr *) &nd->addr6;
540                                 len = sizeof(nd->addr6);
541                         } else {
542                                 to = (struct sockaddr *) &nd->addr;
543                                 len = sizeof(nd->addr);
544                         }
545
546                         if (td->o.verify == VERIFY_NONE)
547                                 store_udp_seq(nd, io_u);
548
549                         ret = sendto(io_u->file->fd, io_u->xfer_buf,
550                                         io_u->xfer_buflen, flags, to, len);
551                 } else {
552                         /*
553                          * if we are going to write more, set MSG_MORE
554                          */
555 #ifdef MSG_MORE
556                         if ((td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
557                             td->o.size) && !o->pingpong)
558                                 flags |= MSG_MORE;
559 #endif
560                         ret = send(io_u->file->fd, io_u->xfer_buf,
561                                         io_u->xfer_buflen, flags);
562                 }
563                 if (ret > 0)
564                         break;
565
566                 ret = poll_wait(td, io_u->file->fd, POLLOUT);
567                 if (ret <= 0)
568                         break;
569         } while (1);
570
571         return ret;
572 }
573
574 static int is_close_msg(struct io_u *io_u, int len)
575 {
576         struct udp_close_msg *msg;
577
578         if (len != sizeof(struct udp_close_msg))
579                 return 0;
580
581         msg = io_u->xfer_buf;
582         if (le32_to_cpu(msg->magic) != FIO_LINK_OPEN_CLOSE_MAGIC)
583                 return 0;
584         if (le32_to_cpu(msg->cmd) != FIO_LINK_CLOSE)
585                 return 0;
586
587         return 1;
588 }
589
590 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
591 {
592         struct netio_data *nd = td->io_ops_data;
593         struct netio_options *o = td->eo;
594         int ret, flags = 0;
595
596         do {
597                 if (is_udp(o)) {
598                         struct sockaddr *from;
599                         socklen_t l, *len = &l;
600
601                         if (o->listen) {
602                                 if (!is_ipv6(o)) {
603                                         from = (struct sockaddr *) &nd->addr;
604                                         *len = sizeof(nd->addr);
605                                 } else {
606                                         from = (struct sockaddr *) &nd->addr6;
607                                         *len = sizeof(nd->addr6);
608                                 }
609                         } else {
610                                 from = NULL;
611                                 len = NULL;
612                         }
613
614                         ret = recvfrom(io_u->file->fd, io_u->xfer_buf,
615                                         io_u->xfer_buflen, flags, from, len);
616
617                         if (is_close_msg(io_u, ret)) {
618                                 td->done = 1;
619                                 return 0;
620                         }
621                 } else {
622                         ret = recv(io_u->file->fd, io_u->xfer_buf,
623                                         io_u->xfer_buflen, flags);
624
625                         if (is_close_msg(io_u, ret)) {
626                                 td->done = 1;
627                                 return 0;
628                         }
629                 }
630                 if (ret > 0)
631                         break;
632                 else if (!ret && (flags & MSG_WAITALL))
633                         break;
634
635                 ret = poll_wait(td, io_u->file->fd, POLLIN);
636                 if (ret <= 0)
637                         break;
638                 flags |= MSG_WAITALL;
639         } while (1);
640
641         if (is_udp(o) && td->o.verify == VERIFY_NONE)
642                 verify_udp_seq(td, nd, io_u);
643
644         return ret;
645 }
646
647 static int __fio_netio_queue(struct thread_data *td, struct io_u *io_u,
648                              enum fio_ddir ddir)
649 {
650         struct netio_data *nd = td->io_ops_data;
651         struct netio_options *o = td->eo;
652         int ret;
653
654         if (ddir == DDIR_WRITE) {
655                 if (!nd->use_splice || is_udp(o) ||
656                     o->proto == FIO_TYPE_UNIX)
657                         ret = fio_netio_send(td, io_u);
658                 else
659                         ret = fio_netio_splice_out(td, io_u);
660         } else if (ddir == DDIR_READ) {
661                 if (!nd->use_splice || is_udp(o) ||
662                     o->proto == FIO_TYPE_UNIX)
663                         ret = fio_netio_recv(td, io_u);
664                 else
665                         ret = fio_netio_splice_in(td, io_u);
666         } else
667                 ret = 0;        /* must be a SYNC */
668
669         if (ret != (int) io_u->xfer_buflen) {
670                 if (ret > 0) {
671                         io_u->resid = io_u->xfer_buflen - ret;
672                         io_u->error = 0;
673                         return FIO_Q_COMPLETED;
674                 } else if (!ret)
675                         return FIO_Q_BUSY;
676                 else {
677                         int err = errno;
678
679                         if (ddir == DDIR_WRITE && err == EMSGSIZE)
680                                 return FIO_Q_BUSY;
681
682                         io_u->error = err;
683                 }
684         }
685
686         if (io_u->error)
687                 td_verror(td, io_u->error, "xfer");
688
689         return FIO_Q_COMPLETED;
690 }
691
692 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
693 {
694         struct netio_options *o = td->eo;
695         int ret;
696
697         fio_ro_check(td, io_u);
698
699         ret = __fio_netio_queue(td, io_u, io_u->ddir);
700         if (!o->pingpong || ret != FIO_Q_COMPLETED)
701                 return ret;
702
703         /*
704          * For ping-pong mode, receive or send reply as needed
705          */
706         if (td_read(td) && io_u->ddir == DDIR_READ)
707                 ret = __fio_netio_queue(td, io_u, DDIR_WRITE);
708         else if (td_write(td) && io_u->ddir == DDIR_WRITE)
709                 ret = __fio_netio_queue(td, io_u, DDIR_READ);
710
711         return ret;
712 }
713
714 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
715 {
716         struct netio_data *nd = td->io_ops_data;
717         struct netio_options *o = td->eo;
718         int type, domain;
719
720         if (o->proto == FIO_TYPE_TCP) {
721                 domain = AF_INET;
722                 type = SOCK_STREAM;
723         } else if (o->proto == FIO_TYPE_TCP_V6) {
724                 domain = AF_INET6;
725                 type = SOCK_STREAM;
726         } else if (o->proto == FIO_TYPE_UDP) {
727                 domain = AF_INET;
728                 type = SOCK_DGRAM;
729         } else if (o->proto == FIO_TYPE_UDP_V6) {
730                 domain = AF_INET6;
731                 type = SOCK_DGRAM;
732         } else if (o->proto == FIO_TYPE_UNIX) {
733                 domain = AF_UNIX;
734                 type = SOCK_STREAM;
735         } else {
736                 log_err("fio: bad network type %d\n", o->proto);
737                 f->fd = -1;
738                 return 1;
739         }
740
741         f->fd = socket(domain, type, 0);
742         if (f->fd < 0) {
743                 td_verror(td, errno, "socket");
744                 return 1;
745         }
746
747 #ifdef CONFIG_TCP_NODELAY
748         if (o->nodelay && is_tcp(o)) {
749                 int optval = 1;
750
751                 if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) {
752                         log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno));
753                         return 1;
754                 }
755         }
756 #endif
757
758         if (set_window_size(td, f->fd)) {
759                 close(f->fd);
760                 return 1;
761         }
762         if (set_mss(td, f->fd)) {
763                 close(f->fd);
764                 return 1;
765         }
766
767         if (is_udp(o)) {
768                 if (!fio_netio_is_multicast(td->o.filename))
769                         return 0;
770                 if (is_ipv6(o)) {
771                         log_err("fio: multicast not supported on IPv6\n");
772                         close(f->fd);
773                         return 1;
774                 }
775
776                 if (o->intfc) {
777                         struct in_addr interface_addr;
778
779                         if (inet_aton(o->intfc, &interface_addr) == 0) {
780                                 log_err("fio: interface not valid interface IP\n");
781                                 close(f->fd);
782                                 return 1;
783                         }
784                         if (setsockopt(f->fd, IPPROTO_IP, IP_MULTICAST_IF, (const char*)&interface_addr, sizeof(interface_addr)) < 0) {
785                                 td_verror(td, errno, "setsockopt IP_MULTICAST_IF");
786                                 close(f->fd);
787                                 return 1;
788                         }
789                 }
790                 if (setsockopt(f->fd, IPPROTO_IP, IP_MULTICAST_TTL, (const char*)&o->ttl, sizeof(o->ttl)) < 0) {
791                         td_verror(td, errno, "setsockopt IP_MULTICAST_TTL");
792                         close(f->fd);
793                         return 1;
794                 }
795                 return 0;
796         } else if (o->proto == FIO_TYPE_TCP) {
797                 socklen_t len = sizeof(nd->addr);
798
799                 if (connect(f->fd, (struct sockaddr *) &nd->addr, len) < 0) {
800                         td_verror(td, errno, "connect");
801                         close(f->fd);
802                         return 1;
803                 }
804         } else if (o->proto == FIO_TYPE_TCP_V6) {
805                 socklen_t len = sizeof(nd->addr6);
806
807                 if (connect(f->fd, (struct sockaddr *) &nd->addr6, len) < 0) {
808                         td_verror(td, errno, "connect");
809                         close(f->fd);
810                         return 1;
811                 }
812
813         } else {
814                 struct sockaddr_un *addr = &nd->addr_un;
815                 socklen_t len;
816
817                 len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
818
819                 if (connect(f->fd, (struct sockaddr *) addr, len) < 0) {
820                         td_verror(td, errno, "connect");
821                         close(f->fd);
822                         return 1;
823                 }
824         }
825
826         return 0;
827 }
828
829 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
830 {
831         struct netio_data *nd = td->io_ops_data;
832         struct netio_options *o = td->eo;
833         socklen_t socklen;
834         int state;
835
836         if (is_udp(o)) {
837                 f->fd = nd->listenfd;
838                 return 0;
839         }
840
841         state = td->runstate;
842         td_set_runstate(td, TD_SETTING_UP);
843
844         log_info("fio: waiting for connection\n");
845
846         if (poll_wait(td, nd->listenfd, POLLIN) < 0)
847                 goto err;
848
849         if (o->proto == FIO_TYPE_TCP) {
850                 socklen = sizeof(nd->addr);
851                 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
852         } else {
853                 socklen = sizeof(nd->addr6);
854                 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr6, &socklen);
855         }
856
857         if (f->fd < 0) {
858                 td_verror(td, errno, "accept");
859                 goto err;
860         }
861
862 #ifdef CONFIG_TCP_NODELAY
863         if (o->nodelay && is_tcp(o)) {
864                 int optval = 1;
865
866                 if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) {
867                         log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno));
868                         return 1;
869                 }
870         }
871 #endif
872
873         reset_all_stats(td);
874         td_set_runstate(td, state);
875         return 0;
876 err:
877         td_set_runstate(td, state);
878         return 1;
879 }
880
881 static void fio_netio_send_close(struct thread_data *td, struct fio_file *f)
882 {
883         struct netio_data *nd = td->io_ops_data;
884         struct netio_options *o = td->eo;
885         struct udp_close_msg msg;
886         struct sockaddr *to;
887         socklen_t len;
888         int ret;
889
890         if (is_ipv6(o)) {
891                 to = (struct sockaddr *) &nd->addr6;
892                 len = sizeof(nd->addr6);
893         } else {
894                 to = (struct sockaddr *) &nd->addr;
895                 len = sizeof(nd->addr);
896         }
897
898         msg.magic = cpu_to_le32((uint32_t) FIO_LINK_OPEN_CLOSE_MAGIC);
899         msg.cmd = cpu_to_le32((uint32_t) FIO_LINK_CLOSE);
900
901         ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, len);
902         if (ret < 0)
903                 td_verror(td, errno, "sendto udp link close");
904 }
905
906 static int fio_netio_close_file(struct thread_data *td, struct fio_file *f)
907 {
908         /*
909          * Notify the receiver that we are closing down the link
910          */
911         fio_netio_send_close(td, f);
912
913         return generic_close_file(td, f);
914 }
915
916 static int fio_netio_udp_recv_open(struct thread_data *td, struct fio_file *f)
917 {
918         struct netio_data *nd = td->io_ops_data;
919         struct netio_options *o = td->eo;
920         struct udp_close_msg msg;
921         struct sockaddr *to;
922         socklen_t len;
923         int ret;
924
925         if (is_ipv6(o)) {
926                 len = sizeof(nd->addr6);
927                 to = (struct sockaddr *) &nd->addr6;
928         } else {
929                 len = sizeof(nd->addr);
930                 to = (struct sockaddr *) &nd->addr;
931         }
932
933         ret = recvfrom(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, &len);
934         if (ret < 0) {
935                 td_verror(td, errno, "recvfrom udp link open");
936                 return ret;
937         }
938
939         if (ntohl(msg.magic) != FIO_LINK_OPEN_CLOSE_MAGIC ||
940             ntohl(msg.cmd) != FIO_LINK_OPEN) {
941                 log_err("fio: bad udp open magic %x/%x\n", ntohl(msg.magic),
942                                                                 ntohl(msg.cmd));
943                 return -1;
944         }
945
946         fio_gettime(&td->start, NULL);
947         return 0;
948 }
949
950 static int fio_netio_send_open(struct thread_data *td, struct fio_file *f)
951 {
952         struct netio_data *nd = td->io_ops_data;
953         struct netio_options *o = td->eo;
954         struct udp_close_msg msg;
955         struct sockaddr *to;
956         socklen_t len;
957         int ret;
958
959         if (is_ipv6(o)) {
960                 len = sizeof(nd->addr6);
961                 to = (struct sockaddr *) &nd->addr6;
962         } else {
963                 len = sizeof(nd->addr);
964                 to = (struct sockaddr *) &nd->addr;
965         }
966
967         msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
968         msg.cmd = htonl(FIO_LINK_OPEN);
969
970         ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, len);
971         if (ret < 0) {
972                 td_verror(td, errno, "sendto udp link open");
973                 return ret;
974         }
975
976         return 0;
977 }
978
979 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
980 {
981         int ret;
982         struct netio_options *o = td->eo;
983
984         if (o->listen)
985                 ret = fio_netio_accept(td, f);
986         else
987                 ret = fio_netio_connect(td, f);
988
989         if (ret) {
990                 f->fd = -1;
991                 return ret;
992         }
993
994         if (is_udp(o)) {
995                 if (td_write(td))
996                         ret = fio_netio_send_open(td, f);
997                 else {
998                         int state;
999
1000                         state = td->runstate;
1001                         td_set_runstate(td, TD_SETTING_UP);
1002                         ret = fio_netio_udp_recv_open(td, f);
1003                         td_set_runstate(td, state);
1004                 }
1005         }
1006
1007         if (ret)
1008                 fio_netio_close_file(td, f);
1009
1010         return ret;
1011 }
1012
1013 static int fio_fill_addr(struct thread_data *td, const char *host, int af,
1014                          void *dst, struct addrinfo **res)
1015 {
1016         struct netio_options *o = td->eo;
1017         struct addrinfo hints;
1018         int ret;
1019
1020         if (inet_pton(af, host, dst))
1021                 return 0;
1022
1023         memset(&hints, 0, sizeof(hints));
1024
1025         if (is_tcp(o))
1026                 hints.ai_socktype = SOCK_STREAM;
1027         else
1028                 hints.ai_socktype = SOCK_DGRAM;
1029
1030         if (is_ipv6(o))
1031                 hints.ai_family = AF_INET6;
1032         else
1033                 hints.ai_family = AF_INET;
1034
1035         ret = getaddrinfo(host, NULL, &hints, res);
1036         if (ret) {
1037                 int e = EINVAL;
1038                 char str[128];
1039
1040                 if (ret == EAI_SYSTEM)
1041                         e = errno;
1042
1043                 snprintf(str, sizeof(str), "getaddrinfo: %s", gai_strerror(ret));
1044                 td_verror(td, e, str);
1045                 return 1;
1046         }
1047
1048         return 0;
1049 }
1050
1051 static int fio_netio_setup_connect_inet(struct thread_data *td,
1052                                         const char *host, unsigned short port)
1053 {
1054         struct netio_data *nd = td->io_ops_data;
1055         struct netio_options *o = td->eo;
1056         struct addrinfo *res = NULL;
1057         void *dst, *src;
1058         int af, len;
1059
1060         if (!host) {
1061                 log_err("fio: connect with no host to connect to.\n");
1062                 if (td_read(td))
1063                         log_err("fio: did you forget to set 'listen'?\n");
1064
1065                 td_verror(td, EINVAL, "no hostname= set");
1066                 return 1;
1067         }
1068
1069         nd->addr.sin_family = AF_INET;
1070         nd->addr.sin_port = htons(port);
1071         nd->addr6.sin6_family = AF_INET6;
1072         nd->addr6.sin6_port = htons(port);
1073
1074         if (is_ipv6(o)) {
1075                 af = AF_INET6;
1076                 dst = &nd->addr6.sin6_addr;
1077         } else {
1078                 af = AF_INET;
1079                 dst = &nd->addr.sin_addr;
1080         }
1081
1082         if (fio_fill_addr(td, host, af, dst, &res))
1083                 return 1;
1084
1085         if (!res)
1086                 return 0;
1087
1088         if (is_ipv6(o)) {
1089                 len = sizeof(nd->addr6.sin6_addr);
1090                 src = &((struct sockaddr_in6 *) res->ai_addr)->sin6_addr;
1091         } else {
1092                 len = sizeof(nd->addr.sin_addr);
1093                 src = &((struct sockaddr_in *) res->ai_addr)->sin_addr;
1094         }
1095
1096         memcpy(dst, src, len);
1097         freeaddrinfo(res);
1098         return 0;
1099 }
1100
1101 static int fio_netio_setup_connect_unix(struct thread_data *td,
1102                                         const char *path)
1103 {
1104         struct netio_data *nd = td->io_ops_data;
1105         struct sockaddr_un *soun = &nd->addr_un;
1106
1107         soun->sun_family = AF_UNIX;
1108         memset(soun->sun_path, 0, sizeof(soun->sun_path));
1109         strncpy(soun->sun_path, path, sizeof(soun->sun_path) - 1);
1110         return 0;
1111 }
1112
1113 static int fio_netio_setup_connect(struct thread_data *td)
1114 {
1115         struct netio_options *o = td->eo;
1116
1117         if (is_udp(o) || is_tcp(o))
1118                 return fio_netio_setup_connect_inet(td, td->o.filename,o->port);
1119         else
1120                 return fio_netio_setup_connect_unix(td, td->o.filename);
1121 }
1122
1123 static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path)
1124 {
1125         struct netio_data *nd = td->io_ops_data;
1126         struct sockaddr_un *addr = &nd->addr_un;
1127         mode_t mode;
1128         int len, fd;
1129
1130         fd = socket(AF_UNIX, SOCK_STREAM, 0);
1131         if (fd < 0) {
1132                 log_err("fio: socket: %s\n", strerror(errno));
1133                 return -1;
1134         }
1135
1136         mode = umask(000);
1137
1138         memset(addr, 0, sizeof(*addr));
1139         addr->sun_family = AF_UNIX;
1140         strncpy(addr->sun_path, path, sizeof(addr->sun_path) - 1);
1141         unlink(path);
1142
1143         len = sizeof(addr->sun_family) + strlen(path) + 1;
1144
1145         if (bind(fd, (struct sockaddr *) addr, len) < 0) {
1146                 log_err("fio: bind: %s\n", strerror(errno));
1147                 close(fd);
1148                 return -1;
1149         }
1150
1151         umask(mode);
1152         nd->listenfd = fd;
1153         return 0;
1154 }
1155
1156 static int fio_netio_setup_listen_inet(struct thread_data *td, short port)
1157 {
1158         struct netio_data *nd = td->io_ops_data;
1159         struct netio_options *o = td->eo;
1160         struct ip_mreq mr;
1161         struct sockaddr_in sin;
1162         struct sockaddr *saddr;
1163         int fd, opt, type, domain;
1164         socklen_t len;
1165
1166         memset(&sin, 0, sizeof(sin));
1167
1168         if (o->proto == FIO_TYPE_TCP) {
1169                 type = SOCK_STREAM;
1170                 domain = AF_INET;
1171         } else if (o->proto == FIO_TYPE_TCP_V6) {
1172                 type = SOCK_STREAM;
1173                 domain = AF_INET6;
1174         } else if (o->proto == FIO_TYPE_UDP) {
1175                 type = SOCK_DGRAM;
1176                 domain = AF_INET;
1177         } else if (o->proto == FIO_TYPE_UDP_V6) {
1178                 type = SOCK_DGRAM;
1179                 domain = AF_INET6;
1180         } else {
1181                 log_err("fio: unknown proto %d\n", o->proto);
1182                 return 1;
1183         }
1184
1185         fd = socket(domain, type, 0);
1186         if (fd < 0) {
1187                 td_verror(td, errno, "socket");
1188                 return 1;
1189         }
1190
1191         opt = 1;
1192         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *) &opt, sizeof(opt)) < 0) {
1193                 td_verror(td, errno, "setsockopt");
1194                 close(fd);
1195                 return 1;
1196         }
1197 #ifdef SO_REUSEPORT
1198         if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, (void *) &opt, sizeof(opt)) < 0) {
1199                 td_verror(td, errno, "setsockopt");
1200                 close(fd);
1201                 return 1;
1202         }
1203 #endif
1204
1205         if (set_window_size(td, fd)) {
1206                 close(fd);
1207                 return 1;
1208         }
1209         if (set_mss(td, fd)) {
1210                 close(fd);
1211                 return 1;
1212         }
1213
1214         if (td->o.filename) {
1215                 if (!is_udp(o) || !fio_netio_is_multicast(td->o.filename)) {
1216                         log_err("fio: hostname not valid for non-multicast inbound network IO\n");
1217                         close(fd);
1218                         return 1;
1219                 }
1220                 if (is_ipv6(o)) {
1221                         log_err("fio: IPv6 not supported for multicast network IO");
1222                         close(fd);
1223                         return 1;
1224                 }
1225
1226                 inet_aton(td->o.filename, &sin.sin_addr);
1227
1228                 mr.imr_multiaddr = sin.sin_addr;
1229                 if (o->intfc) {
1230                         if (inet_aton(o->intfc, &mr.imr_interface) == 0) {
1231                                 log_err("fio: interface not valid interface IP\n");
1232                                 close(fd);
1233                                 return 1;
1234                         }
1235                 } else {
1236                         mr.imr_interface.s_addr = htonl(INADDR_ANY);
1237                 }
1238
1239                 if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const char*)&mr, sizeof(mr)) < 0) {
1240                         td_verror(td, errno, "setsockopt IP_ADD_MEMBERSHIP");
1241                         close(fd);
1242                         return 1;
1243                 }
1244         }
1245
1246         if (!is_ipv6(o)) {
1247                 saddr = (struct sockaddr *) &nd->addr;
1248                 len = sizeof(nd->addr);
1249
1250                 nd->addr.sin_family = AF_INET;
1251                 nd->addr.sin_addr.s_addr = sin.sin_addr.s_addr ? sin.sin_addr.s_addr : htonl(INADDR_ANY);
1252                 nd->addr.sin_port = htons(port);
1253         } else {
1254                 saddr = (struct sockaddr *) &nd->addr6;
1255                 len = sizeof(nd->addr6);
1256
1257                 nd->addr6.sin6_family = AF_INET6;
1258                 nd->addr6.sin6_addr = in6addr_any;
1259                 nd->addr6.sin6_port = htons(port);
1260         }
1261
1262         if (bind(fd, saddr, len) < 0) {
1263                 close(fd);
1264                 td_verror(td, errno, "bind");
1265                 return 1;
1266         }
1267
1268         nd->listenfd = fd;
1269         return 0;
1270 }
1271
1272 static int fio_netio_setup_listen(struct thread_data *td)
1273 {
1274         struct netio_data *nd = td->io_ops_data;
1275         struct netio_options *o = td->eo;
1276         int ret;
1277
1278         if (is_udp(o) || is_tcp(o))
1279                 ret = fio_netio_setup_listen_inet(td, o->port);
1280         else
1281                 ret = fio_netio_setup_listen_unix(td, td->o.filename);
1282
1283         if (ret)
1284                 return ret;
1285         if (is_udp(o))
1286                 return 0;
1287
1288         if (listen(nd->listenfd, 10) < 0) {
1289                 td_verror(td, errno, "listen");
1290                 nd->listenfd = -1;
1291                 return 1;
1292         }
1293
1294         return 0;
1295 }
1296
1297 static int fio_netio_init(struct thread_data *td)
1298 {
1299         struct netio_options *o = td->eo;
1300         int ret;
1301
1302 #ifdef WIN32
1303         WSADATA wsd;
1304         WSAStartup(MAKEWORD(2,2), &wsd);
1305 #endif
1306
1307         if (td_random(td)) {
1308                 log_err("fio: network IO can't be random\n");
1309                 return 1;
1310         }
1311
1312         if (o->proto == FIO_TYPE_UNIX && o->port) {
1313                 log_err("fio: network IO port not valid with unix socket\n");
1314                 return 1;
1315         } else if (o->proto != FIO_TYPE_UNIX && !o->port) {
1316                 log_err("fio: network IO requires port for tcp or udp\n");
1317                 return 1;
1318         }
1319
1320         o->port += td->subjob_number;
1321
1322         if (!is_tcp(o)) {
1323                 if (o->listen) {
1324                         log_err("fio: listen only valid for TCP proto IO\n");
1325                         return 1;
1326                 }
1327                 if (td_rw(td)) {
1328                         log_err("fio: datagram network connections must be"
1329                                    " read OR write\n");
1330                         return 1;
1331                 }
1332                 if (o->proto == FIO_TYPE_UNIX && !td->o.filename) {
1333                         log_err("fio: UNIX sockets need host/filename\n");
1334                         return 1;
1335                 }
1336                 o->listen = td_read(td);
1337         }
1338
1339         if (o->listen)
1340                 ret = fio_netio_setup_listen(td);
1341         else
1342                 ret = fio_netio_setup_connect(td);
1343
1344         return ret;
1345 }
1346
1347 static void fio_netio_cleanup(struct thread_data *td)
1348 {
1349         struct netio_data *nd = td->io_ops_data;
1350
1351         if (nd) {
1352                 if (nd->listenfd != -1)
1353                         close(nd->listenfd);
1354                 if (nd->pipes[0] != -1)
1355                         close(nd->pipes[0]);
1356                 if (nd->pipes[1] != -1)
1357                         close(nd->pipes[1]);
1358
1359                 free(nd);
1360         }
1361 }
1362
1363 static int fio_netio_setup(struct thread_data *td)
1364 {
1365         struct netio_data *nd;
1366
1367         if (!td->files_index) {
1368                 add_file(td, td->o.filename ?: "net", 0, 0);
1369                 td->o.nr_files = td->o.nr_files ?: 1;
1370                 td->o.open_files++;
1371         }
1372
1373         if (!td->io_ops_data) {
1374                 nd = malloc(sizeof(*nd));;
1375
1376                 memset(nd, 0, sizeof(*nd));
1377                 nd->listenfd = -1;
1378                 nd->pipes[0] = nd->pipes[1] = -1;
1379                 td->io_ops_data = nd;
1380         }
1381
1382         return 0;
1383 }
1384
1385 static void fio_netio_terminate(struct thread_data *td)
1386 {
1387         kill(td->pid, SIGTERM);
1388 }
1389
1390 #ifdef CONFIG_LINUX_SPLICE
1391 static int fio_netio_setup_splice(struct thread_data *td)
1392 {
1393         struct netio_data *nd;
1394
1395         fio_netio_setup(td);
1396
1397         nd = td->io_ops_data;
1398         if (nd) {
1399                 if (pipe(nd->pipes) < 0)
1400                         return 1;
1401
1402                 nd->use_splice = 1;
1403                 return 0;
1404         }
1405
1406         return 1;
1407 }
1408
1409 static struct ioengine_ops ioengine_splice = {
1410         .name                   = "netsplice",
1411         .version                = FIO_IOOPS_VERSION,
1412         .prep                   = fio_netio_prep,
1413         .queue                  = fio_netio_queue,
1414         .setup                  = fio_netio_setup_splice,
1415         .init                   = fio_netio_init,
1416         .cleanup                = fio_netio_cleanup,
1417         .open_file              = fio_netio_open_file,
1418         .close_file             = fio_netio_close_file,
1419         .terminate              = fio_netio_terminate,
1420         .options                = options,
1421         .option_struct_size     = sizeof(struct netio_options),
1422         .flags                  = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
1423                                   FIO_PIPEIO,
1424 };
1425 #endif
1426
1427 static struct ioengine_ops ioengine_rw = {
1428         .name                   = "net",
1429         .version                = FIO_IOOPS_VERSION,
1430         .prep                   = fio_netio_prep,
1431         .queue                  = fio_netio_queue,
1432         .setup                  = fio_netio_setup,
1433         .init                   = fio_netio_init,
1434         .cleanup                = fio_netio_cleanup,
1435         .open_file              = fio_netio_open_file,
1436         .close_file             = fio_netio_close_file,
1437         .terminate              = fio_netio_terminate,
1438         .options                = options,
1439         .option_struct_size     = sizeof(struct netio_options),
1440         .flags                  = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
1441                                   FIO_PIPEIO | FIO_BIT_BASED,
1442 };
1443
1444 static int str_hostname_cb(void *data, const char *input)
1445 {
1446         struct netio_options *o = data;
1447
1448         if (o->td->o.filename)
1449                 free(o->td->o.filename);
1450         o->td->o.filename = strdup(input);
1451         return 0;
1452 }
1453
1454 static void fio_init fio_netio_register(void)
1455 {
1456         register_ioengine(&ioengine_rw);
1457 #ifdef CONFIG_LINUX_SPLICE
1458         register_ioengine(&ioengine_splice);
1459 #endif
1460 }
1461
1462 static void fio_exit fio_netio_unregister(void)
1463 {
1464         unregister_ioengine(&ioengine_rw);
1465 #ifdef CONFIG_LINUX_SPLICE
1466         unregister_ioengine(&ioengine_splice);
1467 #endif
1468 }