9a327da8a442659f1098711671227102c27126da
[fio.git] / engines / net.c
1 /*
2  * net engine
3  *
4  * IO engine that reads/writes to/from sockets.
5  *
6  */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <signal.h>
11 #include <errno.h>
12 #include <assert.h>
13 #include <netinet/in.h>
14 #include <netinet/tcp.h>
15 #include <arpa/inet.h>
16 #include <netdb.h>
17 #include <sys/poll.h>
18 #include <sys/types.h>
19 #include <sys/stat.h>
20 #include <sys/socket.h>
21 #include <sys/un.h>
22
23 #include "../fio.h"
24
25 struct netio_data {
26         int listenfd;
27         int use_splice;
28         int pipes[2];
29         struct sockaddr_in addr;
30         struct sockaddr_in6 addr6;
31         struct sockaddr_un addr_un;
32 };
33
34 struct netio_options {
35         struct thread_data *td;
36         unsigned int port;
37         unsigned int proto;
38         unsigned int listen;
39         unsigned int pingpong;
40         unsigned int nodelay;
41         unsigned int ttl;
42         unsigned int window_size;
43         unsigned int mss;
44         char *intfc;
45 };
46
47 struct udp_close_msg {
48         uint32_t magic;
49         uint32_t cmd;
50 };
51
52 enum {
53         FIO_LINK_CLOSE = 0x89,
54         FIO_LINK_OPEN_CLOSE_MAGIC = 0x6c696e6b,
55         FIO_LINK_OPEN = 0x98,
56
57         FIO_TYPE_TCP    = 1,
58         FIO_TYPE_UDP    = 2,
59         FIO_TYPE_UNIX   = 3,
60         FIO_TYPE_TCP_V6 = 4,
61         FIO_TYPE_UDP_V6 = 5,
62 };
63
64 static int str_hostname_cb(void *data, const char *input);
65 static struct fio_option options[] = {
66         {
67                 .name   = "hostname",
68                 .lname  = "net engine hostname",
69                 .type   = FIO_OPT_STR_STORE,
70                 .cb     = str_hostname_cb,
71                 .help   = "Hostname for net IO engine",
72                 .category = FIO_OPT_C_ENGINE,
73                 .group  = FIO_OPT_G_NETIO,
74         },
75         {
76                 .name   = "port",
77                 .lname  = "net engine port",
78                 .type   = FIO_OPT_INT,
79                 .off1   = offsetof(struct netio_options, port),
80                 .minval = 1,
81                 .maxval = 65535,
82                 .help   = "Port to use for TCP or UDP net connections",
83                 .category = FIO_OPT_C_ENGINE,
84                 .group  = FIO_OPT_G_NETIO,
85         },
86         {
87                 .name   = "protocol",
88                 .lname  = "net engine protocol",
89                 .alias  = "proto",
90                 .type   = FIO_OPT_STR,
91                 .off1   = offsetof(struct netio_options, proto),
92                 .help   = "Network protocol to use",
93                 .def    = "tcp",
94                 .posval = {
95                           { .ival = "tcp",
96                             .oval = FIO_TYPE_TCP,
97                             .help = "Transmission Control Protocol",
98                           },
99 #ifdef CONFIG_IPV6
100                           { .ival = "tcpv6",
101                             .oval = FIO_TYPE_TCP_V6,
102                             .help = "Transmission Control Protocol V6",
103                           },
104 #endif
105                           { .ival = "udp",
106                             .oval = FIO_TYPE_UDP,
107                             .help = "User Datagram Protocol",
108                           },
109 #ifdef CONFIG_IPV6
110                           { .ival = "udpv6",
111                             .oval = FIO_TYPE_UDP_V6,
112                             .help = "User Datagram Protocol V6",
113                           },
114 #endif
115                           { .ival = "unix",
116                             .oval = FIO_TYPE_UNIX,
117                             .help = "UNIX domain socket",
118                           },
119                 },
120                 .category = FIO_OPT_C_ENGINE,
121                 .group  = FIO_OPT_G_NETIO,
122         },
123 #ifdef CONFIG_TCP_NODELAY
124         {
125                 .name   = "nodelay",
126                 .type   = FIO_OPT_BOOL,
127                 .off1   = offsetof(struct netio_options, nodelay),
128                 .help   = "Use TCP_NODELAY on TCP connections",
129                 .category = FIO_OPT_C_ENGINE,
130                 .group  = FIO_OPT_G_NETIO,
131         },
132 #endif
133         {
134                 .name   = "listen",
135                 .lname  = "net engine listen",
136                 .type   = FIO_OPT_STR_SET,
137                 .off1   = offsetof(struct netio_options, listen),
138                 .help   = "Listen for incoming TCP connections",
139                 .category = FIO_OPT_C_ENGINE,
140                 .group  = FIO_OPT_G_NETIO,
141         },
142         {
143                 .name   = "pingpong",
144                 .type   = FIO_OPT_STR_SET,
145                 .off1   = offsetof(struct netio_options, pingpong),
146                 .help   = "Ping-pong IO requests",
147                 .category = FIO_OPT_C_ENGINE,
148                 .group  = FIO_OPT_G_NETIO,
149         },
150         {
151                 .name   = "interface",
152                 .lname  = "net engine interface",
153                 .type   = FIO_OPT_STR_STORE,
154                 .off1   = offsetof(struct netio_options, intfc),
155                 .help   = "Network interface to use",
156                 .category = FIO_OPT_C_ENGINE,
157                 .group  = FIO_OPT_G_NETIO,
158         },
159         {
160                 .name   = "ttl",
161                 .lname  = "net engine multicast ttl",
162                 .type   = FIO_OPT_INT,
163                 .off1   = offsetof(struct netio_options, ttl),
164                 .def    = "1",
165                 .minval = 0,
166                 .help   = "Time-to-live value for outgoing UDP multicast packets",
167                 .category = FIO_OPT_C_ENGINE,
168                 .group  = FIO_OPT_G_NETIO,
169         },
170 #ifdef CONFIG_NET_WINDOWSIZE
171         {
172                 .name   = "window_size",
173                 .lname  = "Window Size",
174                 .type   = FIO_OPT_INT,
175                 .off1   = offsetof(struct netio_options, window_size),
176                 .minval = 0,
177                 .help   = "Set socket buffer window size",
178                 .category = FIO_OPT_C_ENGINE,
179                 .group  = FIO_OPT_G_NETIO,
180         },
181 #endif
182 #ifdef CONFIG_NET_MSS
183         {
184                 .name   = "mss",
185                 .lname  = "Maximum segment size",
186                 .type   = FIO_OPT_INT,
187                 .off1   = offsetof(struct netio_options, mss),
188                 .minval = 0,
189                 .help   = "Set TCP maximum segment size",
190                 .category = FIO_OPT_C_ENGINE,
191                 .group  = FIO_OPT_G_NETIO,
192         },
193 #endif
194         {
195                 .name   = NULL,
196         },
197 };
198
199 static inline int is_udp(struct netio_options *o)
200 {
201         return o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_UDP_V6;
202 }
203
204 static inline int is_tcp(struct netio_options *o)
205 {
206         return o->proto == FIO_TYPE_TCP || o->proto == FIO_TYPE_TCP_V6;
207 }
208
209 static inline int is_ipv6(struct netio_options *o)
210 {
211         return o->proto == FIO_TYPE_UDP_V6 || o->proto == FIO_TYPE_TCP_V6;
212 }
213
214 static int set_window_size(struct thread_data *td, int fd)
215 {
216 #ifdef CONFIG_NET_WINDOWSIZE
217         struct netio_options *o = td->eo;
218         unsigned int wss;
219         int snd, rcv, ret;
220
221         if (!o->window_size)
222                 return 0;
223
224         rcv = o->listen || o->pingpong;
225         snd = !o->listen || o->pingpong;
226         wss = o->window_size;
227         ret = 0;
228
229         if (rcv) {
230                 ret = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *) &wss,
231                                         sizeof(wss));
232                 if (ret < 0)
233                         td_verror(td, errno, "rcvbuf window size");
234         }
235         if (snd && !ret) {
236                 ret = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *) &wss,
237                                         sizeof(wss));
238                 if (ret < 0)
239                         td_verror(td, errno, "sndbuf window size");
240         }
241
242         return ret;
243 #else
244         td_verror(td, -EINVAL, "setsockopt window size");
245         return -1;
246 #endif
247 }
248
249 static int set_mss(struct thread_data *td, int fd)
250 {
251 #ifdef CONFIG_NET_MSS
252         struct netio_options *o = td->eo;
253         unsigned int mss;
254         int ret;
255
256         if (!o->mss || !is_tcp(o))
257                 return 0;
258
259         mss = o->mss;
260         ret = setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG, (void *) &mss,
261                                 sizeof(mss));
262         if (ret < 0)
263                 td_verror(td, errno, "setsockopt TCP_MAXSEG");
264
265         return ret;
266 #else
267         td_verror(td, -EINVAL, "setsockopt TCP_MAXSEG");
268         return -1;
269 #endif
270 }
271
272
273 /*
274  * Return -1 for error and 'nr events' for a positive number
275  * of events
276  */
277 static int poll_wait(struct thread_data *td, int fd, short events)
278 {
279         struct pollfd pfd;
280         int ret;
281
282         while (!td->terminate) {
283                 pfd.fd = fd;
284                 pfd.events = events;
285                 ret = poll(&pfd, 1, -1);
286                 if (ret < 0) {
287                         if (errno == EINTR)
288                                 break;
289
290                         td_verror(td, errno, "poll");
291                         return -1;
292                 } else if (!ret)
293                         continue;
294
295                 break;
296         }
297
298         if (pfd.revents & events)
299                 return 1;
300
301         return -1;
302 }
303
304 static int fio_netio_is_multicast(const char *mcaddr)
305 {
306         in_addr_t addr = inet_network(mcaddr);
307         if (addr == -1)
308                 return 0;
309
310         if (inet_network("224.0.0.0") <= addr &&
311             inet_network("239.255.255.255") >= addr)
312                 return 1;
313
314         return 0;
315 }
316
317
318 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
319 {
320         struct netio_options *o = td->eo;
321
322         /*
323          * Make sure we don't see spurious reads to a receiver, and vice versa
324          */
325         if (is_tcp(o))
326                 return 0;
327
328         if ((o->listen && io_u->ddir == DDIR_WRITE) ||
329             (!o->listen && io_u->ddir == DDIR_READ)) {
330                 td_verror(td, EINVAL, "bad direction");
331                 return 1;
332         }
333
334         return 0;
335 }
336
337 #ifdef CONFIG_LINUX_SPLICE
338 static int splice_io_u(int fdin, int fdout, unsigned int len)
339 {
340         int bytes = 0;
341
342         while (len) {
343                 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
344
345                 if (ret < 0) {
346                         if (!bytes)
347                                 bytes = ret;
348
349                         break;
350                 } else if (!ret)
351                         break;
352
353                 bytes += ret;
354                 len -= ret;
355         }
356
357         return bytes;
358 }
359
360 /*
361  * Receive bytes from a socket and fill them into the internal pipe
362  */
363 static int splice_in(struct thread_data *td, struct io_u *io_u)
364 {
365         struct netio_data *nd = td->io_ops->data;
366
367         return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
368 }
369
370 /*
371  * Transmit 'len' bytes from the internal pipe
372  */
373 static int splice_out(struct thread_data *td, struct io_u *io_u,
374                       unsigned int len)
375 {
376         struct netio_data *nd = td->io_ops->data;
377
378         return splice_io_u(nd->pipes[0], io_u->file->fd, len);
379 }
380
381 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
382 {
383         struct iovec iov = {
384                 .iov_base = io_u->xfer_buf,
385                 .iov_len = len,
386         };
387         int bytes = 0;
388
389         while (iov.iov_len) {
390                 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
391
392                 if (ret < 0) {
393                         if (!bytes)
394                                 bytes = ret;
395                         break;
396                 } else if (!ret)
397                         break;
398
399                 iov.iov_len -= ret;
400                 iov.iov_base += ret;
401                 bytes += ret;
402         }
403
404         return bytes;
405
406 }
407
408 /*
409  * vmsplice() pipe to io_u buffer
410  */
411 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
412                              unsigned int len)
413 {
414         struct netio_data *nd = td->io_ops->data;
415
416         return vmsplice_io_u(io_u, nd->pipes[0], len);
417 }
418
419 /*
420  * vmsplice() io_u to pipe
421  */
422 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
423 {
424         struct netio_data *nd = td->io_ops->data;
425
426         return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
427 }
428
429 /*
430  * splice receive - transfer socket data into a pipe using splice, then map
431  * that pipe data into the io_u using vmsplice.
432  */
433 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
434 {
435         int ret;
436
437         ret = splice_in(td, io_u);
438         if (ret > 0)
439                 return vmsplice_io_u_out(td, io_u, ret);
440
441         return ret;
442 }
443
444 /*
445  * splice transmit - map data from the io_u into a pipe by using vmsplice,
446  * then transfer that pipe to a socket using splice.
447  */
448 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
449 {
450         int ret;
451
452         ret = vmsplice_io_u_in(td, io_u);
453         if (ret > 0)
454                 return splice_out(td, io_u, ret);
455
456         return ret;
457 }
458 #else
459 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
460 {
461         errno = EOPNOTSUPP;
462         return -1;
463 }
464
465 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
466 {
467         errno = EOPNOTSUPP;
468         return -1;
469 }
470 #endif
471
472 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
473 {
474         struct netio_data *nd = td->io_ops->data;
475         struct netio_options *o = td->eo;
476         int ret, flags = 0;
477
478         do {
479                 if (is_udp(o)) {
480                         const struct sockaddr *to;
481                         socklen_t len;
482
483                         if (is_ipv6(o)) {
484                                 to = (struct sockaddr *) &nd->addr6;
485                                 len = sizeof(nd->addr6);
486                         } else {
487                                 to = (struct sockaddr *) &nd->addr;
488                                 len = sizeof(nd->addr);
489                         }
490
491                         ret = sendto(io_u->file->fd, io_u->xfer_buf,
492                                         io_u->xfer_buflen, flags, to, len);
493                 } else {
494                         /*
495                          * if we are going to write more, set MSG_MORE
496                          */
497 #ifdef MSG_MORE
498                         if ((td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
499                             td->o.size) && !o->pingpong)
500                                 flags |= MSG_MORE;
501 #endif
502                         ret = send(io_u->file->fd, io_u->xfer_buf,
503                                         io_u->xfer_buflen, flags);
504                 }
505                 if (ret > 0)
506                         break;
507
508                 ret = poll_wait(td, io_u->file->fd, POLLOUT);
509                 if (ret <= 0)
510                         break;
511         } while (1);
512
513         return ret;
514 }
515
516 static int is_udp_close(struct io_u *io_u, int len)
517 {
518         struct udp_close_msg *msg;
519
520         if (len != sizeof(struct udp_close_msg))
521                 return 0;
522
523         msg = io_u->xfer_buf;
524         if (ntohl(msg->magic) != FIO_LINK_OPEN_CLOSE_MAGIC)
525                 return 0;
526         if (ntohl(msg->cmd) != FIO_LINK_CLOSE)
527                 return 0;
528
529         return 1;
530 }
531
532 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
533 {
534         struct netio_data *nd = td->io_ops->data;
535         struct netio_options *o = td->eo;
536         int ret, flags = 0;
537
538         do {
539                 if (is_udp(o)) {
540                         struct sockaddr *from;
541                         socklen_t l, *len = &l;
542
543                         if (o->listen) {
544                                 if (!is_ipv6(o)) {
545                                         from = (struct sockaddr *) &nd->addr;
546                                         *len = sizeof(nd->addr);
547                                 } else {
548                                         from = (struct sockaddr *) &nd->addr6;
549                                         *len = sizeof(nd->addr6);
550                                 }
551                         } else {
552                                 from = NULL;
553                                 len = NULL;
554                         }
555
556                         ret = recvfrom(io_u->file->fd, io_u->xfer_buf,
557                                         io_u->xfer_buflen, flags, from, len);
558                         if (is_udp_close(io_u, ret)) {
559                                 td->done = 1;
560                                 return 0;
561                         }
562                 } else {
563                         ret = recv(io_u->file->fd, io_u->xfer_buf,
564                                         io_u->xfer_buflen, flags);
565                 }
566                 if (ret > 0)
567                         break;
568                 else if (!ret && (flags & MSG_WAITALL))
569                         break;
570
571                 ret = poll_wait(td, io_u->file->fd, POLLIN);
572                 if (ret <= 0)
573                         break;
574                 flags |= MSG_WAITALL;
575         } while (1);
576
577         return ret;
578 }
579
580 static int __fio_netio_queue(struct thread_data *td, struct io_u *io_u,
581                              enum fio_ddir ddir)
582 {
583         struct netio_data *nd = td->io_ops->data;
584         struct netio_options *o = td->eo;
585         int ret;
586
587         if (ddir == DDIR_WRITE) {
588                 if (!nd->use_splice || is_udp(o) ||
589                     o->proto == FIO_TYPE_UNIX)
590                         ret = fio_netio_send(td, io_u);
591                 else
592                         ret = fio_netio_splice_out(td, io_u);
593         } else if (ddir == DDIR_READ) {
594                 if (!nd->use_splice || is_udp(o) ||
595                     o->proto == FIO_TYPE_UNIX)
596                         ret = fio_netio_recv(td, io_u);
597                 else
598                         ret = fio_netio_splice_in(td, io_u);
599         } else
600                 ret = 0;        /* must be a SYNC */
601
602         if (ret != (int) io_u->xfer_buflen) {
603                 if (ret > 0) {
604                         io_u->resid = io_u->xfer_buflen - ret;
605                         io_u->error = 0;
606                         return FIO_Q_COMPLETED;
607                 } else if (!ret)
608                         return FIO_Q_BUSY;
609                 else {
610                         int err = errno;
611
612                         if (ddir == DDIR_WRITE && err == EMSGSIZE)
613                                 return FIO_Q_BUSY;
614
615                         io_u->error = err;
616                 }
617         }
618
619         if (io_u->error)
620                 td_verror(td, io_u->error, "xfer");
621
622         return FIO_Q_COMPLETED;
623 }
624
625 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
626 {
627         struct netio_options *o = td->eo;
628         int ret;
629
630         fio_ro_check(td, io_u);
631
632         ret = __fio_netio_queue(td, io_u, io_u->ddir);
633         if (!o->pingpong || ret != FIO_Q_COMPLETED)
634                 return ret;
635
636         /*
637          * For ping-pong mode, receive or send reply as needed
638          */
639         if (td_read(td) && io_u->ddir == DDIR_READ)
640                 ret = __fio_netio_queue(td, io_u, DDIR_WRITE);
641         else if (td_write(td) && io_u->ddir == DDIR_WRITE)
642                 ret = __fio_netio_queue(td, io_u, DDIR_READ);
643
644         return ret;
645 }
646
647 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
648 {
649         struct netio_data *nd = td->io_ops->data;
650         struct netio_options *o = td->eo;
651         int type, domain;
652
653         if (o->proto == FIO_TYPE_TCP) {
654                 domain = AF_INET;
655                 type = SOCK_STREAM;
656         } else if (o->proto == FIO_TYPE_TCP_V6) {
657                 domain = AF_INET6;
658                 type = SOCK_STREAM;
659         } else if (o->proto == FIO_TYPE_UDP) {
660                 domain = AF_INET;
661                 type = SOCK_DGRAM;
662         } else if (o->proto == FIO_TYPE_UDP_V6) {
663                 domain = AF_INET6;
664                 type = SOCK_DGRAM;
665         } else if (o->proto == FIO_TYPE_UNIX) {
666                 domain = AF_UNIX;
667                 type = SOCK_STREAM;
668         } else {
669                 log_err("fio: bad network type %d\n", o->proto);
670                 f->fd = -1;
671                 return 1;
672         }
673
674         f->fd = socket(domain, type, 0);
675         if (f->fd < 0) {
676                 td_verror(td, errno, "socket");
677                 return 1;
678         }
679
680 #ifdef CONFIG_TCP_NODELAY
681         if (o->nodelay && is_tcp(o)) {
682                 int optval = 1;
683
684                 if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) {
685                         log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno));
686                         return 1;
687                 }
688         }
689 #endif
690
691         if (set_window_size(td, f->fd)) {
692                 close(f->fd);
693                 return 1;
694         }
695         if (set_mss(td, f->fd)) {
696                 close(f->fd);
697                 return 1;
698         }
699
700         if (is_udp(o)) {
701                 if (!fio_netio_is_multicast(td->o.filename))
702                         return 0;
703                 if (is_ipv6(o)) {
704                         log_err("fio: multicast not supported on IPv6\n");
705                         close(f->fd);
706                         return 1;
707                 }
708
709                 if (o->intfc) {
710                         struct in_addr interface_addr;
711
712                         if (inet_aton(o->intfc, &interface_addr) == 0) {
713                                 log_err("fio: interface not valid interface IP\n");
714                                 close(f->fd);
715                                 return 1;
716                         }
717                         if (setsockopt(f->fd, IPPROTO_IP, IP_MULTICAST_IF, (const char*)&interface_addr, sizeof(interface_addr)) < 0) {
718                                 td_verror(td, errno, "setsockopt IP_MULTICAST_IF");
719                                 close(f->fd);
720                                 return 1;
721                         }
722                 }
723                 if (setsockopt(f->fd, IPPROTO_IP, IP_MULTICAST_TTL, (const char*)&o->ttl, sizeof(o->ttl)) < 0) {
724                         td_verror(td, errno, "setsockopt IP_MULTICAST_TTL");
725                         close(f->fd);
726                         return 1;
727                 }
728                 return 0;
729         } else if (o->proto == FIO_TYPE_TCP) {
730                 socklen_t len = sizeof(nd->addr);
731
732                 if (connect(f->fd, (struct sockaddr *) &nd->addr, len) < 0) {
733                         td_verror(td, errno, "connect");
734                         close(f->fd);
735                         return 1;
736                 }
737         } else if (o->proto == FIO_TYPE_TCP_V6) {
738                 socklen_t len = sizeof(nd->addr6);
739
740                 if (connect(f->fd, (struct sockaddr *) &nd->addr6, len) < 0) {
741                         td_verror(td, errno, "connect");
742                         close(f->fd);
743                         return 1;
744                 }
745
746         } else {
747                 struct sockaddr_un *addr = &nd->addr_un;
748                 socklen_t len;
749
750                 len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
751
752                 if (connect(f->fd, (struct sockaddr *) addr, len) < 0) {
753                         td_verror(td, errno, "connect");
754                         close(f->fd);
755                         return 1;
756                 }
757         }
758
759         return 0;
760 }
761
762 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
763 {
764         struct netio_data *nd = td->io_ops->data;
765         struct netio_options *o = td->eo;
766         socklen_t socklen;
767         int state;
768
769         if (is_udp(o)) {
770                 f->fd = nd->listenfd;
771                 return 0;
772         }
773
774         state = td->runstate;
775         td_set_runstate(td, TD_SETTING_UP);
776
777         log_info("fio: waiting for connection\n");
778
779         if (poll_wait(td, nd->listenfd, POLLIN) < 0)
780                 goto err;
781
782         if (o->proto == FIO_TYPE_TCP) {
783                 socklen = sizeof(nd->addr);
784                 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
785         } else {
786                 socklen = sizeof(nd->addr6);
787                 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr6, &socklen);
788         }
789
790         if (f->fd < 0) {
791                 td_verror(td, errno, "accept");
792                 goto err;
793         }
794
795 #ifdef CONFIG_TCP_NODELAY
796         if (o->nodelay && is_tcp(o)) {
797                 int optval = 1;
798
799                 if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) {
800                         log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno));
801                         return 1;
802                 }
803         }
804 #endif
805
806         reset_all_stats(td);
807         td_set_runstate(td, state);
808         return 0;
809 err:
810         td_set_runstate(td, state);
811         return 1;
812 }
813
814 static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f)
815 {
816         struct netio_data *nd = td->io_ops->data;
817         struct netio_options *o = td->eo;
818         struct udp_close_msg msg;
819         struct sockaddr *to;
820         socklen_t len;
821         int ret;
822
823         if (is_ipv6(o)) {
824                 to = (struct sockaddr *) &nd->addr6;
825                 len = sizeof(nd->addr6);
826         } else {
827                 to = (struct sockaddr *) &nd->addr;
828                 len = sizeof(nd->addr);
829         }
830
831         msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
832         msg.cmd = htonl(FIO_LINK_CLOSE);
833
834         ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, len);
835         if (ret < 0)
836                 td_verror(td, errno, "sendto udp link close");
837 }
838
839 static int fio_netio_close_file(struct thread_data *td, struct fio_file *f)
840 {
841         struct netio_options *o = td->eo;
842
843         /*
844          * If this is an UDP connection, notify the receiver that we are
845          * closing down the link
846          */
847         if (is_udp(o))
848                 fio_netio_udp_close(td, f);
849
850         return generic_close_file(td, f);
851 }
852
853 static int fio_netio_udp_recv_open(struct thread_data *td, struct fio_file *f)
854 {
855         struct netio_data *nd = td->io_ops->data;
856         struct netio_options *o = td->eo;
857         struct udp_close_msg msg;
858         struct sockaddr *to;
859         socklen_t len;
860         int ret;
861
862         if (is_ipv6(o)) {
863                 len = sizeof(nd->addr6);
864                 to = (struct sockaddr *) &nd->addr6;
865         } else {
866                 len = sizeof(nd->addr);
867                 to = (struct sockaddr *) &nd->addr;
868         }
869
870         ret = recvfrom(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, &len);
871         if (ret < 0) {
872                 td_verror(td, errno, "recvfrom udp link open");
873                 return ret;
874         }
875
876         if (ntohl(msg.magic) != FIO_LINK_OPEN_CLOSE_MAGIC ||
877             ntohl(msg.cmd) != FIO_LINK_OPEN) {
878                 log_err("fio: bad udp open magic %x/%x\n", ntohl(msg.magic),
879                                                                 ntohl(msg.cmd));
880                 return -1;
881         }
882
883         return 0;
884 }
885
886 static int fio_netio_udp_send_open(struct thread_data *td, struct fio_file *f)
887 {
888         struct netio_data *nd = td->io_ops->data;
889         struct netio_options *o = td->eo;
890         struct udp_close_msg msg;
891         struct sockaddr *to;
892         socklen_t len;
893         int ret;
894
895         if (is_ipv6(o)) {
896                 len = sizeof(nd->addr6);
897                 to = (struct sockaddr *) &nd->addr6;
898         } else {
899                 len = sizeof(nd->addr);
900                 to = (struct sockaddr *) &nd->addr;
901         }
902
903         msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
904         msg.cmd = htonl(FIO_LINK_OPEN);
905
906         ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, len);
907         if (ret < 0) {
908                 td_verror(td, errno, "sendto udp link open");
909                 return ret;
910         }
911
912         return 0;
913 }
914
915 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
916 {
917         int ret;
918         struct netio_options *o = td->eo;
919
920         if (o->listen)
921                 ret = fio_netio_accept(td, f);
922         else
923                 ret = fio_netio_connect(td, f);
924
925         if (ret) {
926                 f->fd = -1;
927                 return ret;
928         }
929
930         if (is_udp(o)) {
931                 if (td_write(td))
932                         ret = fio_netio_udp_send_open(td, f);
933                 else {
934                         int state;
935
936                         state = td->runstate;
937                         td_set_runstate(td, TD_SETTING_UP);
938                         ret = fio_netio_udp_recv_open(td, f);
939                         td_set_runstate(td, state);
940                 }
941         }
942
943         if (ret)
944                 fio_netio_close_file(td, f);
945
946         return ret;
947 }
948
949 static int fio_fill_addr(struct thread_data *td, const char *host, int af,
950                          void *dst, struct addrinfo **res)
951 {
952         struct netio_options *o = td->eo;
953         struct addrinfo hints;
954         int ret;
955
956         if (inet_pton(af, host, dst))
957                 return 0;
958
959         memset(&hints, 0, sizeof(hints));
960
961         if (is_tcp(o))
962                 hints.ai_socktype = SOCK_STREAM;
963         else
964                 hints.ai_socktype = SOCK_DGRAM;
965
966         if (is_ipv6(o))
967                 hints.ai_family = AF_INET6;
968         else
969                 hints.ai_family = AF_INET;
970
971         ret = getaddrinfo(host, NULL, &hints, res);
972         if (ret) {
973                 int e = EINVAL;
974                 char str[128];
975
976                 if (ret == EAI_SYSTEM)
977                         e = errno;
978
979                 snprintf(str, sizeof(str), "getaddrinfo: %s", gai_strerror(ret));
980                 td_verror(td, e, str);
981                 return 1;
982         }
983
984         return 0;
985 }
986
987 static int fio_netio_setup_connect_inet(struct thread_data *td,
988                                         const char *host, unsigned short port)
989 {
990         struct netio_data *nd = td->io_ops->data;
991         struct netio_options *o = td->eo;
992         struct addrinfo *res = NULL;
993         void *dst, *src;
994         int af, len;
995
996         if (!host) {
997                 log_err("fio: connect with no host to connect to.\n");
998                 if (td_read(td))
999                         log_err("fio: did you forget to set 'listen'?\n");
1000
1001                 td_verror(td, EINVAL, "no hostname= set");
1002                 return 1;
1003         }
1004
1005         nd->addr.sin_family = AF_INET;
1006         nd->addr.sin_port = htons(port);
1007         nd->addr6.sin6_family = AF_INET6;
1008         nd->addr6.sin6_port = htons(port);
1009
1010         if (is_ipv6(o)) {
1011                 af = AF_INET6;
1012                 dst = &nd->addr6.sin6_addr;
1013         } else {
1014                 af = AF_INET;
1015                 dst = &nd->addr.sin_addr;
1016         }
1017
1018         if (fio_fill_addr(td, host, af, dst, &res))
1019                 return 1;
1020
1021         if (!res)
1022                 return 0;
1023
1024         if (is_ipv6(o)) {
1025                 len = sizeof(nd->addr6.sin6_addr);
1026                 src = &((struct sockaddr_in6 *) res->ai_addr)->sin6_addr;
1027         } else {
1028                 len = sizeof(nd->addr.sin_addr);
1029                 src = &((struct sockaddr_in *) res->ai_addr)->sin_addr;
1030         }
1031
1032         memcpy(dst, src, len);
1033         freeaddrinfo(res);
1034         return 0;
1035 }
1036
1037 static int fio_netio_setup_connect_unix(struct thread_data *td,
1038                                         const char *path)
1039 {
1040         struct netio_data *nd = td->io_ops->data;
1041         struct sockaddr_un *soun = &nd->addr_un;
1042
1043         soun->sun_family = AF_UNIX;
1044         memset(soun->sun_path, 0, sizeof(soun->sun_path));
1045         strncpy(soun->sun_path, path, sizeof(soun->sun_path) - 1);
1046         return 0;
1047 }
1048
1049 static int fio_netio_setup_connect(struct thread_data *td)
1050 {
1051         struct netio_options *o = td->eo;
1052
1053         if (is_udp(o) || is_tcp(o))
1054                 return fio_netio_setup_connect_inet(td, td->o.filename,o->port);
1055         else
1056                 return fio_netio_setup_connect_unix(td, td->o.filename);
1057 }
1058
1059 static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path)
1060 {
1061         struct netio_data *nd = td->io_ops->data;
1062         struct sockaddr_un *addr = &nd->addr_un;
1063         mode_t mode;
1064         int len, fd;
1065
1066         fd = socket(AF_UNIX, SOCK_STREAM, 0);
1067         if (fd < 0) {
1068                 log_err("fio: socket: %s\n", strerror(errno));
1069                 return -1;
1070         }
1071
1072         mode = umask(000);
1073
1074         memset(addr, 0, sizeof(*addr));
1075         addr->sun_family = AF_UNIX;
1076         strncpy(addr->sun_path, path, sizeof(addr->sun_path) - 1);
1077         unlink(path);
1078
1079         len = sizeof(addr->sun_family) + strlen(path) + 1;
1080
1081         if (bind(fd, (struct sockaddr *) addr, len) < 0) {
1082                 log_err("fio: bind: %s\n", strerror(errno));
1083                 close(fd);
1084                 return -1;
1085         }
1086
1087         umask(mode);
1088         nd->listenfd = fd;
1089         return 0;
1090 }
1091
1092 static int fio_netio_setup_listen_inet(struct thread_data *td, short port)
1093 {
1094         struct netio_data *nd = td->io_ops->data;
1095         struct netio_options *o = td->eo;
1096         struct ip_mreq mr;
1097         struct sockaddr_in sin;
1098         struct sockaddr *saddr;
1099         int fd, opt, type, domain;
1100         socklen_t len;
1101
1102         memset(&sin, 0, sizeof(sin));
1103
1104         if (o->proto == FIO_TYPE_TCP) {
1105                 type = SOCK_STREAM;
1106                 domain = AF_INET;
1107         } else if (o->proto == FIO_TYPE_TCP_V6) {
1108                 type = SOCK_STREAM;
1109                 domain = AF_INET6;
1110         } else if (o->proto == FIO_TYPE_UDP) {
1111                 type = SOCK_DGRAM;
1112                 domain = AF_INET;
1113         } else if (o->proto == FIO_TYPE_UDP_V6) {
1114                 type = SOCK_DGRAM;
1115                 domain = AF_INET6;
1116         } else {
1117                 log_err("fio: unknown proto %d\n", o->proto);
1118                 return 1;
1119         }
1120
1121         fd = socket(domain, type, 0);
1122         if (fd < 0) {
1123                 td_verror(td, errno, "socket");
1124                 return 1;
1125         }
1126
1127         opt = 1;
1128         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *) &opt, sizeof(opt)) < 0) {
1129                 td_verror(td, errno, "setsockopt");
1130                 close(fd);
1131                 return 1;
1132         }
1133 #ifdef SO_REUSEPORT
1134         if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, (void *) &opt, sizeof(opt)) < 0) {
1135                 td_verror(td, errno, "setsockopt");
1136                 close(fd);
1137                 return 1;
1138         }
1139 #endif
1140
1141         if (set_window_size(td, fd)) {
1142                 close(fd);
1143                 return 1;
1144         }
1145         if (set_mss(td, fd)) {
1146                 close(fd);
1147                 return 1;
1148         }
1149
1150         if (td->o.filename) {
1151                 if (!is_udp(o) || !fio_netio_is_multicast(td->o.filename)) {
1152                         log_err("fio: hostname not valid for non-multicast inbound network IO\n");
1153                         close(fd);
1154                         return 1;
1155                 }
1156                 if (is_ipv6(o)) {
1157                         log_err("fio: IPv6 not supported for multicast network IO");
1158                         close(fd);
1159                         return 1;
1160                 }
1161
1162                 inet_aton(td->o.filename, &sin.sin_addr);
1163
1164                 mr.imr_multiaddr = sin.sin_addr;
1165                 if (o->intfc) {
1166                         if (inet_aton(o->intfc, &mr.imr_interface) == 0) {
1167                                 log_err("fio: interface not valid interface IP\n");
1168                                 close(fd);
1169                                 return 1;
1170                         }
1171                 } else {
1172                         mr.imr_interface.s_addr = htonl(INADDR_ANY);
1173                 }
1174
1175                 if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const char*)&mr, sizeof(mr)) < 0) {
1176                         td_verror(td, errno, "setsockopt IP_ADD_MEMBERSHIP");
1177                         close(fd);
1178                         return 1;
1179                 }
1180         }
1181
1182         if (!is_ipv6(o)) {
1183                 saddr = (struct sockaddr *) &nd->addr;
1184                 len = sizeof(nd->addr);
1185
1186                 nd->addr.sin_family = AF_INET;
1187                 nd->addr.sin_addr.s_addr = sin.sin_addr.s_addr ? sin.sin_addr.s_addr : htonl(INADDR_ANY);
1188                 nd->addr.sin_port = htons(port);
1189         } else {
1190                 saddr = (struct sockaddr *) &nd->addr6;
1191                 len = sizeof(nd->addr6);
1192
1193                 nd->addr6.sin6_family = AF_INET6;
1194                 nd->addr6.sin6_addr = in6addr_any;
1195                 nd->addr6.sin6_port = htons(port);
1196         }
1197
1198         if (bind(fd, saddr, len) < 0) {
1199                 close(fd);
1200                 td_verror(td, errno, "bind");
1201                 return 1;
1202         }
1203
1204         nd->listenfd = fd;
1205         return 0;
1206 }
1207
1208 static int fio_netio_setup_listen(struct thread_data *td)
1209 {
1210         struct netio_data *nd = td->io_ops->data;
1211         struct netio_options *o = td->eo;
1212         int ret;
1213
1214         if (is_udp(o) || is_tcp(o))
1215                 ret = fio_netio_setup_listen_inet(td, o->port);
1216         else
1217                 ret = fio_netio_setup_listen_unix(td, td->o.filename);
1218
1219         if (ret)
1220                 return ret;
1221         if (is_udp(o))
1222                 return 0;
1223
1224         if (listen(nd->listenfd, 10) < 0) {
1225                 td_verror(td, errno, "listen");
1226                 nd->listenfd = -1;
1227                 return 1;
1228         }
1229
1230         return 0;
1231 }
1232
1233 static int fio_netio_init(struct thread_data *td)
1234 {
1235         struct netio_options *o = td->eo;
1236         int ret;
1237
1238 #ifdef WIN32
1239         WSADATA wsd;
1240         WSAStartup(MAKEWORD(2,2), &wsd);
1241 #endif
1242
1243         if (td_random(td)) {
1244                 log_err("fio: network IO can't be random\n");
1245                 return 1;
1246         }
1247
1248         if (o->proto == FIO_TYPE_UNIX && o->port) {
1249                 log_err("fio: network IO port not valid with unix socket\n");
1250                 return 1;
1251         } else if (o->proto != FIO_TYPE_UNIX && !o->port) {
1252                 log_err("fio: network IO requires port for tcp or udp\n");
1253                 return 1;
1254         }
1255
1256         if (!is_tcp(o)) {
1257                 if (o->listen) {
1258                         log_err("fio: listen only valid for TCP proto IO\n");
1259                         return 1;
1260                 }
1261                 if (td_rw(td)) {
1262                         log_err("fio: datagram network connections must be"
1263                                    " read OR write\n");
1264                         return 1;
1265                 }
1266                 if (o->proto == FIO_TYPE_UNIX && !td->o.filename) {
1267                         log_err("fio: UNIX sockets need host/filename\n");
1268                         return 1;
1269                 }
1270                 o->listen = td_read(td);
1271         }
1272
1273         if (o->listen)
1274                 ret = fio_netio_setup_listen(td);
1275         else
1276                 ret = fio_netio_setup_connect(td);
1277
1278         return ret;
1279 }
1280
1281 static void fio_netio_cleanup(struct thread_data *td)
1282 {
1283         struct netio_data *nd = td->io_ops->data;
1284
1285         if (nd) {
1286                 if (nd->listenfd != -1)
1287                         close(nd->listenfd);
1288                 if (nd->pipes[0] != -1)
1289                         close(nd->pipes[0]);
1290                 if (nd->pipes[1] != -1)
1291                         close(nd->pipes[1]);
1292
1293                 free(nd);
1294         }
1295 }
1296
1297 static int fio_netio_setup(struct thread_data *td)
1298 {
1299         struct netio_data *nd;
1300
1301         if (!td->files_index) {
1302                 add_file(td, td->o.filename ?: "net", 0, 0);
1303                 td->o.nr_files = td->o.nr_files ?: 1;
1304                 td->o.open_files++;
1305         }
1306
1307         if (!td->io_ops->data) {
1308                 nd = malloc(sizeof(*nd));;
1309
1310                 memset(nd, 0, sizeof(*nd));
1311                 nd->listenfd = -1;
1312                 nd->pipes[0] = nd->pipes[1] = -1;
1313                 td->io_ops->data = nd;
1314         }
1315
1316         return 0;
1317 }
1318
1319 static void fio_netio_terminate(struct thread_data *td)
1320 {
1321         kill(td->pid, SIGTERM);
1322 }
1323
1324 #ifdef CONFIG_LINUX_SPLICE
1325 static int fio_netio_setup_splice(struct thread_data *td)
1326 {
1327         struct netio_data *nd;
1328
1329         fio_netio_setup(td);
1330
1331         nd = td->io_ops->data;
1332         if (nd) {
1333                 if (pipe(nd->pipes) < 0)
1334                         return 1;
1335
1336                 nd->use_splice = 1;
1337                 return 0;
1338         }
1339
1340         return 1;
1341 }
1342
1343 static struct ioengine_ops ioengine_splice = {
1344         .name                   = "netsplice",
1345         .version                = FIO_IOOPS_VERSION,
1346         .prep                   = fio_netio_prep,
1347         .queue                  = fio_netio_queue,
1348         .setup                  = fio_netio_setup_splice,
1349         .init                   = fio_netio_init,
1350         .cleanup                = fio_netio_cleanup,
1351         .open_file              = fio_netio_open_file,
1352         .close_file             = fio_netio_close_file,
1353         .terminate              = fio_netio_terminate,
1354         .options                = options,
1355         .option_struct_size     = sizeof(struct netio_options),
1356         .flags                  = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
1357                                   FIO_PIPEIO,
1358 };
1359 #endif
1360
1361 static struct ioengine_ops ioengine_rw = {
1362         .name                   = "net",
1363         .version                = FIO_IOOPS_VERSION,
1364         .prep                   = fio_netio_prep,
1365         .queue                  = fio_netio_queue,
1366         .setup                  = fio_netio_setup,
1367         .init                   = fio_netio_init,
1368         .cleanup                = fio_netio_cleanup,
1369         .open_file              = fio_netio_open_file,
1370         .close_file             = fio_netio_close_file,
1371         .terminate              = fio_netio_terminate,
1372         .options                = options,
1373         .option_struct_size     = sizeof(struct netio_options),
1374         .flags                  = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
1375                                   FIO_PIPEIO | FIO_BIT_BASED,
1376 };
1377
1378 static int str_hostname_cb(void *data, const char *input)
1379 {
1380         struct netio_options *o = data;
1381
1382         if (o->td->o.filename)
1383                 free(o->td->o.filename);
1384         o->td->o.filename = strdup(input);
1385         return 0;
1386 }
1387
1388 static void fio_init fio_netio_register(void)
1389 {
1390         register_ioengine(&ioengine_rw);
1391 #ifdef CONFIG_LINUX_SPLICE
1392         register_ioengine(&ioengine_splice);
1393 #endif
1394 }
1395
1396 static void fio_exit fio_netio_unregister(void)
1397 {
1398         unregister_ioengine(&ioengine_rw);
1399 #ifdef CONFIG_LINUX_SPLICE
1400         unregister_ioengine(&ioengine_splice);
1401 #endif
1402 }