glusterfs: update for new API
[fio.git] / engines / net.c
1 /*
2  * net engine
3  *
4  * IO engine that reads/writes to/from sockets.
5  *
6  */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <signal.h>
11 #include <errno.h>
12 #include <netinet/in.h>
13 #include <netinet/tcp.h>
14 #include <arpa/inet.h>
15 #include <netdb.h>
16 #include <poll.h>
17 #include <sys/stat.h>
18 #include <sys/socket.h>
19 #include <sys/un.h>
20
21 #include "../fio.h"
22 #include "../verify.h"
23 #include "../optgroup.h"
24
25 struct netio_data {
26         int listenfd;
27         int use_splice;
28         int seq_off;
29         int pipes[2];
30         struct sockaddr_in addr;
31         struct sockaddr_in6 addr6;
32         struct sockaddr_un addr_un;
33         uint64_t udp_send_seq;
34         uint64_t udp_recv_seq;
35 };
36
37 struct netio_options {
38         struct thread_data *td;
39         unsigned int port;
40         unsigned int proto;
41         unsigned int listen;
42         unsigned int pingpong;
43         unsigned int nodelay;
44         unsigned int ttl;
45         unsigned int window_size;
46         unsigned int mss;
47         char *intfc;
48 };
49
50 struct udp_close_msg {
51         uint32_t magic;
52         uint32_t cmd;
53 };
54
55 struct udp_seq {
56         uint64_t magic;
57         uint64_t seq;
58         uint64_t bs;
59 };
60
61 enum {
62         FIO_LINK_CLOSE = 0x89,
63         FIO_LINK_OPEN_CLOSE_MAGIC = 0x6c696e6b,
64         FIO_LINK_OPEN = 0x98,
65         FIO_UDP_SEQ_MAGIC = 0x657375716e556563ULL,
66
67         FIO_TYPE_TCP    = 1,
68         FIO_TYPE_UDP    = 2,
69         FIO_TYPE_UNIX   = 3,
70         FIO_TYPE_TCP_V6 = 4,
71         FIO_TYPE_UDP_V6 = 5,
72 };
73
74 static int str_hostname_cb(void *data, const char *input);
75 static struct fio_option options[] = {
76         {
77                 .name   = "hostname",
78                 .lname  = "net engine hostname",
79                 .type   = FIO_OPT_STR_STORE,
80                 .cb     = str_hostname_cb,
81                 .help   = "Hostname for net IO engine",
82                 .category = FIO_OPT_C_ENGINE,
83                 .group  = FIO_OPT_G_NETIO,
84         },
85         {
86                 .name   = "port",
87                 .lname  = "net engine port",
88                 .type   = FIO_OPT_INT,
89                 .off1   = offsetof(struct netio_options, port),
90                 .minval = 1,
91                 .maxval = 65535,
92                 .help   = "Port to use for TCP or UDP net connections",
93                 .category = FIO_OPT_C_ENGINE,
94                 .group  = FIO_OPT_G_NETIO,
95         },
96         {
97                 .name   = "protocol",
98                 .lname  = "net engine protocol",
99                 .alias  = "proto",
100                 .type   = FIO_OPT_STR,
101                 .off1   = offsetof(struct netio_options, proto),
102                 .help   = "Network protocol to use",
103                 .def    = "tcp",
104                 .posval = {
105                           { .ival = "tcp",
106                             .oval = FIO_TYPE_TCP,
107                             .help = "Transmission Control Protocol",
108                           },
109 #ifdef CONFIG_IPV6
110                           { .ival = "tcpv6",
111                             .oval = FIO_TYPE_TCP_V6,
112                             .help = "Transmission Control Protocol V6",
113                           },
114 #endif
115                           { .ival = "udp",
116                             .oval = FIO_TYPE_UDP,
117                             .help = "User Datagram Protocol",
118                           },
119 #ifdef CONFIG_IPV6
120                           { .ival = "udpv6",
121                             .oval = FIO_TYPE_UDP_V6,
122                             .help = "User Datagram Protocol V6",
123                           },
124 #endif
125                           { .ival = "unix",
126                             .oval = FIO_TYPE_UNIX,
127                             .help = "UNIX domain socket",
128                           },
129                 },
130                 .category = FIO_OPT_C_ENGINE,
131                 .group  = FIO_OPT_G_NETIO,
132         },
133 #ifdef CONFIG_TCP_NODELAY
134         {
135                 .name   = "nodelay",
136                 .lname  = "No Delay",
137                 .type   = FIO_OPT_BOOL,
138                 .off1   = offsetof(struct netio_options, nodelay),
139                 .help   = "Use TCP_NODELAY on TCP connections",
140                 .category = FIO_OPT_C_ENGINE,
141                 .group  = FIO_OPT_G_NETIO,
142         },
143 #endif
144         {
145                 .name   = "listen",
146                 .lname  = "net engine listen",
147                 .type   = FIO_OPT_STR_SET,
148                 .off1   = offsetof(struct netio_options, listen),
149                 .help   = "Listen for incoming TCP connections",
150                 .category = FIO_OPT_C_ENGINE,
151                 .group  = FIO_OPT_G_NETIO,
152         },
153         {
154                 .name   = "pingpong",
155                 .lname  = "Ping Pong",
156                 .type   = FIO_OPT_STR_SET,
157                 .off1   = offsetof(struct netio_options, pingpong),
158                 .help   = "Ping-pong IO requests",
159                 .category = FIO_OPT_C_ENGINE,
160                 .group  = FIO_OPT_G_NETIO,
161         },
162         {
163                 .name   = "interface",
164                 .lname  = "net engine interface",
165                 .type   = FIO_OPT_STR_STORE,
166                 .off1   = offsetof(struct netio_options, intfc),
167                 .help   = "Network interface to use",
168                 .category = FIO_OPT_C_ENGINE,
169                 .group  = FIO_OPT_G_NETIO,
170         },
171         {
172                 .name   = "ttl",
173                 .lname  = "net engine multicast ttl",
174                 .type   = FIO_OPT_INT,
175                 .off1   = offsetof(struct netio_options, ttl),
176                 .def    = "1",
177                 .minval = 0,
178                 .help   = "Time-to-live value for outgoing UDP multicast packets",
179                 .category = FIO_OPT_C_ENGINE,
180                 .group  = FIO_OPT_G_NETIO,
181         },
182 #ifdef CONFIG_NET_WINDOWSIZE
183         {
184                 .name   = "window_size",
185                 .lname  = "Window Size",
186                 .type   = FIO_OPT_INT,
187                 .off1   = offsetof(struct netio_options, window_size),
188                 .minval = 0,
189                 .help   = "Set socket buffer window size",
190                 .category = FIO_OPT_C_ENGINE,
191                 .group  = FIO_OPT_G_NETIO,
192         },
193 #endif
194 #ifdef CONFIG_NET_MSS
195         {
196                 .name   = "mss",
197                 .lname  = "Maximum segment size",
198                 .type   = FIO_OPT_INT,
199                 .off1   = offsetof(struct netio_options, mss),
200                 .minval = 0,
201                 .help   = "Set TCP maximum segment size",
202                 .category = FIO_OPT_C_ENGINE,
203                 .group  = FIO_OPT_G_NETIO,
204         },
205 #endif
206         {
207                 .name   = NULL,
208         },
209 };
210
211 static inline int is_udp(struct netio_options *o)
212 {
213         return o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_UDP_V6;
214 }
215
216 static inline int is_tcp(struct netio_options *o)
217 {
218         return o->proto == FIO_TYPE_TCP || o->proto == FIO_TYPE_TCP_V6;
219 }
220
221 static inline int is_ipv6(struct netio_options *o)
222 {
223         return o->proto == FIO_TYPE_UDP_V6 || o->proto == FIO_TYPE_TCP_V6;
224 }
225
226 static int set_window_size(struct thread_data *td, int fd)
227 {
228 #ifdef CONFIG_NET_WINDOWSIZE
229         struct netio_options *o = td->eo;
230         unsigned int wss;
231         int snd, rcv, ret;
232
233         if (!o->window_size)
234                 return 0;
235
236         rcv = o->listen || o->pingpong;
237         snd = !o->listen || o->pingpong;
238         wss = o->window_size;
239         ret = 0;
240
241         if (rcv) {
242                 ret = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *) &wss,
243                                         sizeof(wss));
244                 if (ret < 0)
245                         td_verror(td, errno, "rcvbuf window size");
246         }
247         if (snd && !ret) {
248                 ret = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *) &wss,
249                                         sizeof(wss));
250                 if (ret < 0)
251                         td_verror(td, errno, "sndbuf window size");
252         }
253
254         return ret;
255 #else
256         td_verror(td, -EINVAL, "setsockopt window size");
257         return -1;
258 #endif
259 }
260
261 static int set_mss(struct thread_data *td, int fd)
262 {
263 #ifdef CONFIG_NET_MSS
264         struct netio_options *o = td->eo;
265         unsigned int mss;
266         int ret;
267
268         if (!o->mss || !is_tcp(o))
269                 return 0;
270
271         mss = o->mss;
272         ret = setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG, (void *) &mss,
273                                 sizeof(mss));
274         if (ret < 0)
275                 td_verror(td, errno, "setsockopt TCP_MAXSEG");
276
277         return ret;
278 #else
279         td_verror(td, -EINVAL, "setsockopt TCP_MAXSEG");
280         return -1;
281 #endif
282 }
283
284
285 /*
286  * Return -1 for error and 'nr events' for a positive number
287  * of events
288  */
289 static int poll_wait(struct thread_data *td, int fd, short events)
290 {
291         struct pollfd pfd;
292         int ret;
293
294         while (!td->terminate) {
295                 pfd.fd = fd;
296                 pfd.events = events;
297                 ret = poll(&pfd, 1, -1);
298                 if (ret < 0) {
299                         if (errno == EINTR)
300                                 break;
301
302                         td_verror(td, errno, "poll");
303                         return -1;
304                 } else if (!ret)
305                         continue;
306
307                 break;
308         }
309
310         if (pfd.revents & events)
311                 return 1;
312
313         return -1;
314 }
315
316 static int fio_netio_is_multicast(const char *mcaddr)
317 {
318         in_addr_t addr = inet_network(mcaddr);
319         if (addr == -1)
320                 return 0;
321
322         if (inet_network("224.0.0.0") <= addr &&
323             inet_network("239.255.255.255") >= addr)
324                 return 1;
325
326         return 0;
327 }
328
329
330 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
331 {
332         struct netio_options *o = td->eo;
333
334         /*
335          * Make sure we don't see spurious reads to a receiver, and vice versa
336          */
337         if (is_tcp(o))
338                 return 0;
339
340         if ((o->listen && io_u->ddir == DDIR_WRITE) ||
341             (!o->listen && io_u->ddir == DDIR_READ)) {
342                 td_verror(td, EINVAL, "bad direction");
343                 return 1;
344         }
345
346         return 0;
347 }
348
349 #ifdef CONFIG_LINUX_SPLICE
350 static int splice_io_u(int fdin, int fdout, unsigned int len)
351 {
352         int bytes = 0;
353
354         while (len) {
355                 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
356
357                 if (ret < 0) {
358                         if (!bytes)
359                                 bytes = ret;
360
361                         break;
362                 } else if (!ret)
363                         break;
364
365                 bytes += ret;
366                 len -= ret;
367         }
368
369         return bytes;
370 }
371
372 /*
373  * Receive bytes from a socket and fill them into the internal pipe
374  */
375 static int splice_in(struct thread_data *td, struct io_u *io_u)
376 {
377         struct netio_data *nd = td->io_ops_data;
378
379         return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
380 }
381
382 /*
383  * Transmit 'len' bytes from the internal pipe
384  */
385 static int splice_out(struct thread_data *td, struct io_u *io_u,
386                       unsigned int len)
387 {
388         struct netio_data *nd = td->io_ops_data;
389
390         return splice_io_u(nd->pipes[0], io_u->file->fd, len);
391 }
392
393 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
394 {
395         struct iovec iov = {
396                 .iov_base = io_u->xfer_buf,
397                 .iov_len = len,
398         };
399         int bytes = 0;
400
401         while (iov.iov_len) {
402                 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
403
404                 if (ret < 0) {
405                         if (!bytes)
406                                 bytes = ret;
407                         break;
408                 } else if (!ret)
409                         break;
410
411                 iov.iov_len -= ret;
412                 iov.iov_base += ret;
413                 bytes += ret;
414         }
415
416         return bytes;
417
418 }
419
420 /*
421  * vmsplice() pipe to io_u buffer
422  */
423 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
424                              unsigned int len)
425 {
426         struct netio_data *nd = td->io_ops_data;
427
428         return vmsplice_io_u(io_u, nd->pipes[0], len);
429 }
430
431 /*
432  * vmsplice() io_u to pipe
433  */
434 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
435 {
436         struct netio_data *nd = td->io_ops_data;
437
438         return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
439 }
440
441 /*
442  * splice receive - transfer socket data into a pipe using splice, then map
443  * that pipe data into the io_u using vmsplice.
444  */
445 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
446 {
447         int ret;
448
449         ret = splice_in(td, io_u);
450         if (ret > 0)
451                 return vmsplice_io_u_out(td, io_u, ret);
452
453         return ret;
454 }
455
456 /*
457  * splice transmit - map data from the io_u into a pipe by using vmsplice,
458  * then transfer that pipe to a socket using splice.
459  */
460 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
461 {
462         int ret;
463
464         ret = vmsplice_io_u_in(td, io_u);
465         if (ret > 0)
466                 return splice_out(td, io_u, ret);
467
468         return ret;
469 }
470 #else
471 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
472 {
473         errno = EOPNOTSUPP;
474         return -1;
475 }
476
477 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
478 {
479         errno = EOPNOTSUPP;
480         return -1;
481 }
482 #endif
483
484 static void store_udp_seq(struct netio_data *nd, struct io_u *io_u)
485 {
486         struct udp_seq *us;
487
488         if (io_u->xfer_buflen < sizeof(*us))
489                 return;
490
491         us = io_u->xfer_buf + io_u->xfer_buflen - sizeof(*us);
492         us->magic = cpu_to_le64((uint64_t) FIO_UDP_SEQ_MAGIC);
493         us->bs = cpu_to_le64((uint64_t) io_u->xfer_buflen);
494         us->seq = cpu_to_le64(nd->udp_send_seq++);
495 }
496
497 static void verify_udp_seq(struct thread_data *td, struct netio_data *nd,
498                            struct io_u *io_u)
499 {
500         struct udp_seq *us;
501         uint64_t seq;
502
503         if (io_u->xfer_buflen < sizeof(*us))
504                 return;
505
506         if (nd->seq_off)
507                 return;
508
509         us = io_u->xfer_buf + io_u->xfer_buflen - sizeof(*us);
510         if (le64_to_cpu(us->magic) != FIO_UDP_SEQ_MAGIC)
511                 return;
512         if (le64_to_cpu(us->bs) != io_u->xfer_buflen) {
513                 nd->seq_off = 1;
514                 return;
515         }
516
517         seq = le64_to_cpu(us->seq);
518
519         if (seq != nd->udp_recv_seq)
520                 td->ts.drop_io_u[io_u->ddir] += seq - nd->udp_recv_seq;
521
522         nd->udp_recv_seq = seq + 1;
523 }
524
525 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
526 {
527         struct netio_data *nd = td->io_ops_data;
528         struct netio_options *o = td->eo;
529         int ret, flags = 0;
530
531         do {
532                 if (is_udp(o)) {
533                         const struct sockaddr *to;
534                         socklen_t len;
535
536                         if (is_ipv6(o)) {
537                                 to = (struct sockaddr *) &nd->addr6;
538                                 len = sizeof(nd->addr6);
539                         } else {
540                                 to = (struct sockaddr *) &nd->addr;
541                                 len = sizeof(nd->addr);
542                         }
543
544                         if (td->o.verify == VERIFY_NONE)
545                                 store_udp_seq(nd, io_u);
546
547                         ret = sendto(io_u->file->fd, io_u->xfer_buf,
548                                         io_u->xfer_buflen, flags, to, len);
549                 } else {
550                         /*
551                          * if we are going to write more, set MSG_MORE
552                          */
553 #ifdef MSG_MORE
554                         if ((td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
555                             td->o.size) && !o->pingpong)
556                                 flags |= MSG_MORE;
557 #endif
558                         ret = send(io_u->file->fd, io_u->xfer_buf,
559                                         io_u->xfer_buflen, flags);
560                 }
561                 if (ret > 0)
562                         break;
563
564                 ret = poll_wait(td, io_u->file->fd, POLLOUT);
565                 if (ret <= 0)
566                         break;
567         } while (1);
568
569         return ret;
570 }
571
572 static int is_close_msg(struct io_u *io_u, int len)
573 {
574         struct udp_close_msg *msg;
575
576         if (len != sizeof(struct udp_close_msg))
577                 return 0;
578
579         msg = io_u->xfer_buf;
580         if (le32_to_cpu(msg->magic) != FIO_LINK_OPEN_CLOSE_MAGIC)
581                 return 0;
582         if (le32_to_cpu(msg->cmd) != FIO_LINK_CLOSE)
583                 return 0;
584
585         return 1;
586 }
587
588 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
589 {
590         struct netio_data *nd = td->io_ops_data;
591         struct netio_options *o = td->eo;
592         int ret, flags = 0;
593
594         do {
595                 if (is_udp(o)) {
596                         struct sockaddr *from;
597                         socklen_t l, *len = &l;
598
599                         if (o->listen) {
600                                 if (!is_ipv6(o)) {
601                                         from = (struct sockaddr *) &nd->addr;
602                                         *len = sizeof(nd->addr);
603                                 } else {
604                                         from = (struct sockaddr *) &nd->addr6;
605                                         *len = sizeof(nd->addr6);
606                                 }
607                         } else {
608                                 from = NULL;
609                                 len = NULL;
610                         }
611
612                         ret = recvfrom(io_u->file->fd, io_u->xfer_buf,
613                                         io_u->xfer_buflen, flags, from, len);
614
615                         if (is_close_msg(io_u, ret)) {
616                                 td->done = 1;
617                                 return 0;
618                         }
619                 } else {
620                         ret = recv(io_u->file->fd, io_u->xfer_buf,
621                                         io_u->xfer_buflen, flags);
622
623                         if (is_close_msg(io_u, ret)) {
624                                 td->done = 1;
625                                 return 0;
626                         }
627                 }
628                 if (ret > 0)
629                         break;
630                 else if (!ret && (flags & MSG_WAITALL))
631                         break;
632
633                 ret = poll_wait(td, io_u->file->fd, POLLIN);
634                 if (ret <= 0)
635                         break;
636                 flags |= MSG_WAITALL;
637         } while (1);
638
639         if (is_udp(o) && td->o.verify == VERIFY_NONE)
640                 verify_udp_seq(td, nd, io_u);
641
642         return ret;
643 }
644
645 static enum fio_q_status __fio_netio_queue(struct thread_data *td,
646                                            struct io_u *io_u,
647                                            enum fio_ddir ddir)
648 {
649         struct netio_data *nd = td->io_ops_data;
650         struct netio_options *o = td->eo;
651         int ret;
652
653         if (ddir == DDIR_WRITE) {
654                 if (!nd->use_splice || is_udp(o) ||
655                     o->proto == FIO_TYPE_UNIX)
656                         ret = fio_netio_send(td, io_u);
657                 else
658                         ret = fio_netio_splice_out(td, io_u);
659         } else if (ddir == DDIR_READ) {
660                 if (!nd->use_splice || is_udp(o) ||
661                     o->proto == FIO_TYPE_UNIX)
662                         ret = fio_netio_recv(td, io_u);
663                 else
664                         ret = fio_netio_splice_in(td, io_u);
665         } else
666                 ret = 0;        /* must be a SYNC */
667
668         if (ret != (int) io_u->xfer_buflen) {
669                 if (ret > 0) {
670                         io_u->resid = io_u->xfer_buflen - ret;
671                         io_u->error = 0;
672                         return FIO_Q_COMPLETED;
673                 } else if (!ret)
674                         return FIO_Q_BUSY;
675                 else {
676                         int err = errno;
677
678                         if (ddir == DDIR_WRITE && err == EMSGSIZE)
679                                 return FIO_Q_BUSY;
680
681                         io_u->error = err;
682                 }
683         }
684
685         if (io_u->error)
686                 td_verror(td, io_u->error, "xfer");
687
688         return FIO_Q_COMPLETED;
689 }
690
691 static enum fio_q_status fio_netio_queue(struct thread_data *td,
692                                          struct io_u *io_u)
693 {
694         struct netio_options *o = td->eo;
695         int ret;
696
697         fio_ro_check(td, io_u);
698
699         ret = __fio_netio_queue(td, io_u, io_u->ddir);
700         if (!o->pingpong || ret != FIO_Q_COMPLETED)
701                 return ret;
702
703         /*
704          * For ping-pong mode, receive or send reply as needed
705          */
706         if (td_read(td) && io_u->ddir == DDIR_READ)
707                 ret = __fio_netio_queue(td, io_u, DDIR_WRITE);
708         else if (td_write(td) && io_u->ddir == DDIR_WRITE)
709                 ret = __fio_netio_queue(td, io_u, DDIR_READ);
710
711         return ret;
712 }
713
714 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
715 {
716         struct netio_data *nd = td->io_ops_data;
717         struct netio_options *o = td->eo;
718         int type, domain;
719
720         if (o->proto == FIO_TYPE_TCP) {
721                 domain = AF_INET;
722                 type = SOCK_STREAM;
723         } else if (o->proto == FIO_TYPE_TCP_V6) {
724                 domain = AF_INET6;
725                 type = SOCK_STREAM;
726         } else if (o->proto == FIO_TYPE_UDP) {
727                 domain = AF_INET;
728                 type = SOCK_DGRAM;
729         } else if (o->proto == FIO_TYPE_UDP_V6) {
730                 domain = AF_INET6;
731                 type = SOCK_DGRAM;
732         } else if (o->proto == FIO_TYPE_UNIX) {
733                 domain = AF_UNIX;
734                 type = SOCK_STREAM;
735         } else {
736                 log_err("fio: bad network type %d\n", o->proto);
737                 f->fd = -1;
738                 return 1;
739         }
740
741         f->fd = socket(domain, type, 0);
742         if (f->fd < 0) {
743                 td_verror(td, errno, "socket");
744                 return 1;
745         }
746
747 #ifdef CONFIG_TCP_NODELAY
748         if (o->nodelay && is_tcp(o)) {
749                 int optval = 1;
750
751                 if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) {
752                         log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno));
753                         return 1;
754                 }
755         }
756 #endif
757
758         if (set_window_size(td, f->fd)) {
759                 close(f->fd);
760                 return 1;
761         }
762         if (set_mss(td, f->fd)) {
763                 close(f->fd);
764                 return 1;
765         }
766
767         if (is_udp(o)) {
768                 if (!fio_netio_is_multicast(td->o.filename))
769                         return 0;
770                 if (is_ipv6(o)) {
771                         log_err("fio: multicast not supported on IPv6\n");
772                         close(f->fd);
773                         return 1;
774                 }
775
776                 if (o->intfc) {
777                         struct in_addr interface_addr;
778
779                         if (inet_aton(o->intfc, &interface_addr) == 0) {
780                                 log_err("fio: interface not valid interface IP\n");
781                                 close(f->fd);
782                                 return 1;
783                         }
784                         if (setsockopt(f->fd, IPPROTO_IP, IP_MULTICAST_IF, (const char*)&interface_addr, sizeof(interface_addr)) < 0) {
785                                 td_verror(td, errno, "setsockopt IP_MULTICAST_IF");
786                                 close(f->fd);
787                                 return 1;
788                         }
789                 }
790                 if (setsockopt(f->fd, IPPROTO_IP, IP_MULTICAST_TTL, (const char*)&o->ttl, sizeof(o->ttl)) < 0) {
791                         td_verror(td, errno, "setsockopt IP_MULTICAST_TTL");
792                         close(f->fd);
793                         return 1;
794                 }
795                 return 0;
796         } else if (o->proto == FIO_TYPE_TCP) {
797                 socklen_t len = sizeof(nd->addr);
798
799                 if (connect(f->fd, (struct sockaddr *) &nd->addr, len) < 0) {
800                         td_verror(td, errno, "connect");
801                         close(f->fd);
802                         return 1;
803                 }
804         } else if (o->proto == FIO_TYPE_TCP_V6) {
805                 socklen_t len = sizeof(nd->addr6);
806
807                 if (connect(f->fd, (struct sockaddr *) &nd->addr6, len) < 0) {
808                         td_verror(td, errno, "connect");
809                         close(f->fd);
810                         return 1;
811                 }
812
813         } else {
814                 struct sockaddr_un *addr = &nd->addr_un;
815                 socklen_t len;
816
817                 len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
818
819                 if (connect(f->fd, (struct sockaddr *) addr, len) < 0) {
820                         td_verror(td, errno, "connect");
821                         close(f->fd);
822                         return 1;
823                 }
824         }
825
826         return 0;
827 }
828
829 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
830 {
831         struct netio_data *nd = td->io_ops_data;
832         struct netio_options *o = td->eo;
833         socklen_t socklen;
834         int state;
835
836         if (is_udp(o)) {
837                 f->fd = nd->listenfd;
838                 return 0;
839         }
840
841         state = td->runstate;
842         td_set_runstate(td, TD_SETTING_UP);
843
844         log_info("fio: waiting for connection\n");
845
846         if (poll_wait(td, nd->listenfd, POLLIN) < 0)
847                 goto err;
848
849         if (o->proto == FIO_TYPE_TCP) {
850                 socklen = sizeof(nd->addr);
851                 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
852         } else {
853                 socklen = sizeof(nd->addr6);
854                 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr6, &socklen);
855         }
856
857         if (f->fd < 0) {
858                 td_verror(td, errno, "accept");
859                 goto err;
860         }
861
862 #ifdef CONFIG_TCP_NODELAY
863         if (o->nodelay && is_tcp(o)) {
864                 int optval = 1;
865
866                 if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) {
867                         log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno));
868                         return 1;
869                 }
870         }
871 #endif
872
873         reset_all_stats(td);
874         td_set_runstate(td, state);
875         return 0;
876 err:
877         td_set_runstate(td, state);
878         return 1;
879 }
880
881 static void fio_netio_send_close(struct thread_data *td, struct fio_file *f)
882 {
883         struct netio_data *nd = td->io_ops_data;
884         struct netio_options *o = td->eo;
885         struct udp_close_msg msg;
886         struct sockaddr *to;
887         socklen_t len;
888         int ret;
889
890         if (is_ipv6(o)) {
891                 to = (struct sockaddr *) &nd->addr6;
892                 len = sizeof(nd->addr6);
893         } else {
894                 to = (struct sockaddr *) &nd->addr;
895                 len = sizeof(nd->addr);
896         }
897
898         msg.magic = cpu_to_le32((uint32_t) FIO_LINK_OPEN_CLOSE_MAGIC);
899         msg.cmd = cpu_to_le32((uint32_t) FIO_LINK_CLOSE);
900
901         ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, len);
902         if (ret < 0)
903                 td_verror(td, errno, "sendto udp link close");
904 }
905
906 static int fio_netio_close_file(struct thread_data *td, struct fio_file *f)
907 {
908         /*
909          * Notify the receiver that we are closing down the link
910          */
911         fio_netio_send_close(td, f);
912
913         return generic_close_file(td, f);
914 }
915
916 static int fio_netio_udp_recv_open(struct thread_data *td, struct fio_file *f)
917 {
918         struct netio_data *nd = td->io_ops_data;
919         struct netio_options *o = td->eo;
920         struct udp_close_msg msg;
921         struct sockaddr *to;
922         socklen_t len;
923         int ret;
924
925         if (is_ipv6(o)) {
926                 len = sizeof(nd->addr6);
927                 to = (struct sockaddr *) &nd->addr6;
928         } else {
929                 len = sizeof(nd->addr);
930                 to = (struct sockaddr *) &nd->addr;
931         }
932
933         ret = recvfrom(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, &len);
934         if (ret < 0) {
935                 td_verror(td, errno, "recvfrom udp link open");
936                 return ret;
937         }
938
939         if (ntohl(msg.magic) != FIO_LINK_OPEN_CLOSE_MAGIC ||
940             ntohl(msg.cmd) != FIO_LINK_OPEN) {
941                 log_err("fio: bad udp open magic %x/%x\n", ntohl(msg.magic),
942                                                                 ntohl(msg.cmd));
943                 return -1;
944         }
945
946         fio_gettime(&td->start, NULL);
947         return 0;
948 }
949
950 static int fio_netio_send_open(struct thread_data *td, struct fio_file *f)
951 {
952         struct netio_data *nd = td->io_ops_data;
953         struct netio_options *o = td->eo;
954         struct udp_close_msg msg;
955         struct sockaddr *to;
956         socklen_t len;
957         int ret;
958
959         if (is_ipv6(o)) {
960                 len = sizeof(nd->addr6);
961                 to = (struct sockaddr *) &nd->addr6;
962         } else {
963                 len = sizeof(nd->addr);
964                 to = (struct sockaddr *) &nd->addr;
965         }
966
967         msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
968         msg.cmd = htonl(FIO_LINK_OPEN);
969
970         ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, len);
971         if (ret < 0) {
972                 td_verror(td, errno, "sendto udp link open");
973                 return ret;
974         }
975
976         return 0;
977 }
978
979 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
980 {
981         int ret;
982         struct netio_options *o = td->eo;
983
984         if (o->listen)
985                 ret = fio_netio_accept(td, f);
986         else
987                 ret = fio_netio_connect(td, f);
988
989         if (ret) {
990                 f->fd = -1;
991                 return ret;
992         }
993
994         if (is_udp(o)) {
995                 if (td_write(td))
996                         ret = fio_netio_send_open(td, f);
997                 else {
998                         int state;
999
1000                         state = td->runstate;
1001                         td_set_runstate(td, TD_SETTING_UP);
1002                         ret = fio_netio_udp_recv_open(td, f);
1003                         td_set_runstate(td, state);
1004                 }
1005         }
1006
1007         if (ret)
1008                 fio_netio_close_file(td, f);
1009
1010         return ret;
1011 }
1012
1013 static int fio_fill_addr(struct thread_data *td, const char *host, int af,
1014                          void *dst, struct addrinfo **res)
1015 {
1016         struct netio_options *o = td->eo;
1017         struct addrinfo hints;
1018         int ret;
1019
1020         if (inet_pton(af, host, dst))
1021                 return 0;
1022
1023         memset(&hints, 0, sizeof(hints));
1024
1025         if (is_tcp(o))
1026                 hints.ai_socktype = SOCK_STREAM;
1027         else
1028                 hints.ai_socktype = SOCK_DGRAM;
1029
1030         if (is_ipv6(o))
1031                 hints.ai_family = AF_INET6;
1032         else
1033                 hints.ai_family = AF_INET;
1034
1035         ret = getaddrinfo(host, NULL, &hints, res);
1036         if (ret) {
1037                 int e = EINVAL;
1038                 char str[128];
1039
1040                 if (ret == EAI_SYSTEM)
1041                         e = errno;
1042
1043                 snprintf(str, sizeof(str), "getaddrinfo: %s", gai_strerror(ret));
1044                 td_verror(td, e, str);
1045                 return 1;
1046         }
1047
1048         return 0;
1049 }
1050
1051 static int fio_netio_setup_connect_inet(struct thread_data *td,
1052                                         const char *host, unsigned short port)
1053 {
1054         struct netio_data *nd = td->io_ops_data;
1055         struct netio_options *o = td->eo;
1056         struct addrinfo *res = NULL;
1057         void *dst, *src;
1058         int af, len;
1059
1060         if (!host) {
1061                 log_err("fio: connect with no host to connect to.\n");
1062                 if (td_read(td))
1063                         log_err("fio: did you forget to set 'listen'?\n");
1064
1065                 td_verror(td, EINVAL, "no hostname= set");
1066                 return 1;
1067         }
1068
1069         nd->addr.sin_family = AF_INET;
1070         nd->addr.sin_port = htons(port);
1071         nd->addr6.sin6_family = AF_INET6;
1072         nd->addr6.sin6_port = htons(port);
1073
1074         if (is_ipv6(o)) {
1075                 af = AF_INET6;
1076                 dst = &nd->addr6.sin6_addr;
1077         } else {
1078                 af = AF_INET;
1079                 dst = &nd->addr.sin_addr;
1080         }
1081
1082         if (fio_fill_addr(td, host, af, dst, &res))
1083                 return 1;
1084
1085         if (!res)
1086                 return 0;
1087
1088         if (is_ipv6(o)) {
1089                 len = sizeof(nd->addr6.sin6_addr);
1090                 src = &((struct sockaddr_in6 *) res->ai_addr)->sin6_addr;
1091         } else {
1092                 len = sizeof(nd->addr.sin_addr);
1093                 src = &((struct sockaddr_in *) res->ai_addr)->sin_addr;
1094         }
1095
1096         memcpy(dst, src, len);
1097         freeaddrinfo(res);
1098         return 0;
1099 }
1100
1101 static int fio_netio_setup_connect_unix(struct thread_data *td,
1102                                         const char *path)
1103 {
1104         struct netio_data *nd = td->io_ops_data;
1105         struct sockaddr_un *soun = &nd->addr_un;
1106
1107         soun->sun_family = AF_UNIX;
1108         memset(soun->sun_path, 0, sizeof(soun->sun_path));
1109         strncpy(soun->sun_path, path, sizeof(soun->sun_path) - 1);
1110         return 0;
1111 }
1112
1113 static int fio_netio_setup_connect(struct thread_data *td)
1114 {
1115         struct netio_options *o = td->eo;
1116
1117         if (is_udp(o) || is_tcp(o))
1118                 return fio_netio_setup_connect_inet(td, td->o.filename,o->port);
1119         else
1120                 return fio_netio_setup_connect_unix(td, td->o.filename);
1121 }
1122
1123 static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path)
1124 {
1125         struct netio_data *nd = td->io_ops_data;
1126         struct sockaddr_un *addr = &nd->addr_un;
1127         mode_t mode;
1128         int len, fd;
1129
1130         fd = socket(AF_UNIX, SOCK_STREAM, 0);
1131         if (fd < 0) {
1132                 log_err("fio: socket: %s\n", strerror(errno));
1133                 return -1;
1134         }
1135
1136         mode = umask(000);
1137
1138         memset(addr, 0, sizeof(*addr));
1139         addr->sun_family = AF_UNIX;
1140         strncpy(addr->sun_path, path, sizeof(addr->sun_path) - 1);
1141         unlink(path);
1142
1143         len = sizeof(addr->sun_family) + strlen(path) + 1;
1144
1145         if (bind(fd, (struct sockaddr *) addr, len) < 0) {
1146                 log_err("fio: bind: %s\n", strerror(errno));
1147                 close(fd);
1148                 return -1;
1149         }
1150
1151         umask(mode);
1152         nd->listenfd = fd;
1153         return 0;
1154 }
1155
1156 static int fio_netio_setup_listen_inet(struct thread_data *td, short port)
1157 {
1158         struct netio_data *nd = td->io_ops_data;
1159         struct netio_options *o = td->eo;
1160         struct ip_mreq mr;
1161         struct sockaddr_in sin;
1162         struct sockaddr *saddr;
1163         int fd, opt, type, domain;
1164         socklen_t len;
1165
1166         memset(&sin, 0, sizeof(sin));
1167
1168         if (o->proto == FIO_TYPE_TCP) {
1169                 type = SOCK_STREAM;
1170                 domain = AF_INET;
1171         } else if (o->proto == FIO_TYPE_TCP_V6) {
1172                 type = SOCK_STREAM;
1173                 domain = AF_INET6;
1174         } else if (o->proto == FIO_TYPE_UDP) {
1175                 type = SOCK_DGRAM;
1176                 domain = AF_INET;
1177         } else if (o->proto == FIO_TYPE_UDP_V6) {
1178                 type = SOCK_DGRAM;
1179                 domain = AF_INET6;
1180         } else {
1181                 log_err("fio: unknown proto %d\n", o->proto);
1182                 return 1;
1183         }
1184
1185         fd = socket(domain, type, 0);
1186         if (fd < 0) {
1187                 td_verror(td, errno, "socket");
1188                 return 1;
1189         }
1190
1191         opt = 1;
1192         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *) &opt, sizeof(opt)) < 0) {
1193                 td_verror(td, errno, "setsockopt");
1194                 close(fd);
1195                 return 1;
1196         }
1197 #ifdef SO_REUSEPORT
1198         if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, (void *) &opt, sizeof(opt)) < 0) {
1199                 td_verror(td, errno, "setsockopt");
1200                 close(fd);
1201                 return 1;
1202         }
1203 #endif
1204
1205         if (set_window_size(td, fd)) {
1206                 close(fd);
1207                 return 1;
1208         }
1209         if (set_mss(td, fd)) {
1210                 close(fd);
1211                 return 1;
1212         }
1213
1214         if (td->o.filename) {
1215                 if (!is_udp(o) || !fio_netio_is_multicast(td->o.filename)) {
1216                         log_err("fio: hostname not valid for non-multicast inbound network IO\n");
1217                         close(fd);
1218                         return 1;
1219                 }
1220                 if (is_ipv6(o)) {
1221                         log_err("fio: IPv6 not supported for multicast network IO\n");
1222                         close(fd);
1223                         return 1;
1224                 }
1225
1226                 inet_aton(td->o.filename, &sin.sin_addr);
1227
1228                 mr.imr_multiaddr = sin.sin_addr;
1229                 if (o->intfc) {
1230                         if (inet_aton(o->intfc, &mr.imr_interface) == 0) {
1231                                 log_err("fio: interface not valid interface IP\n");
1232                                 close(fd);
1233                                 return 1;
1234                         }
1235                 } else {
1236                         mr.imr_interface.s_addr = htonl(INADDR_ANY);
1237                 }
1238
1239                 if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const char*)&mr, sizeof(mr)) < 0) {
1240                         td_verror(td, errno, "setsockopt IP_ADD_MEMBERSHIP");
1241                         close(fd);
1242                         return 1;
1243                 }
1244         }
1245
1246         if (!is_ipv6(o)) {
1247                 saddr = (struct sockaddr *) &nd->addr;
1248                 len = sizeof(nd->addr);
1249
1250                 nd->addr.sin_family = AF_INET;
1251                 nd->addr.sin_addr.s_addr = sin.sin_addr.s_addr ? sin.sin_addr.s_addr : htonl(INADDR_ANY);
1252                 nd->addr.sin_port = htons(port);
1253         } else {
1254                 saddr = (struct sockaddr *) &nd->addr6;
1255                 len = sizeof(nd->addr6);
1256
1257                 nd->addr6.sin6_family = AF_INET6;
1258                 nd->addr6.sin6_addr = in6addr_any;
1259                 nd->addr6.sin6_port = htons(port);
1260         }
1261
1262         if (bind(fd, saddr, len) < 0) {
1263                 close(fd);
1264                 td_verror(td, errno, "bind");
1265                 return 1;
1266         }
1267
1268         nd->listenfd = fd;
1269         return 0;
1270 }
1271
1272 static int fio_netio_setup_listen(struct thread_data *td)
1273 {
1274         struct netio_data *nd = td->io_ops_data;
1275         struct netio_options *o = td->eo;
1276         int ret;
1277
1278         if (is_udp(o) || is_tcp(o))
1279                 ret = fio_netio_setup_listen_inet(td, o->port);
1280         else
1281                 ret = fio_netio_setup_listen_unix(td, td->o.filename);
1282
1283         if (ret)
1284                 return ret;
1285         if (is_udp(o))
1286                 return 0;
1287
1288         if (listen(nd->listenfd, 10) < 0) {
1289                 td_verror(td, errno, "listen");
1290                 nd->listenfd = -1;
1291                 return 1;
1292         }
1293
1294         return 0;
1295 }
1296
1297 static int fio_netio_init(struct thread_data *td)
1298 {
1299         struct netio_options *o = td->eo;
1300         int ret;
1301
1302 #ifdef WIN32
1303         WSADATA wsd;
1304         WSAStartup(MAKEWORD(2,2), &wsd);
1305 #endif
1306
1307         if (td_random(td)) {
1308                 log_err("fio: network IO can't be random\n");
1309                 return 1;
1310         }
1311
1312         if (o->proto == FIO_TYPE_UNIX && o->port) {
1313                 log_err("fio: network IO port not valid with unix socket\n");
1314                 return 1;
1315         } else if (o->proto != FIO_TYPE_UNIX && !o->port) {
1316                 log_err("fio: network IO requires port for tcp or udp\n");
1317                 return 1;
1318         }
1319
1320         o->port += td->subjob_number;
1321
1322         if (!is_tcp(o)) {
1323                 if (o->listen) {
1324                         log_err("fio: listen only valid for TCP proto IO\n");
1325                         return 1;
1326                 }
1327                 if (td_rw(td)) {
1328                         log_err("fio: datagram network connections must be"
1329                                    " read OR write\n");
1330                         return 1;
1331                 }
1332                 if (o->proto == FIO_TYPE_UNIX && !td->o.filename) {
1333                         log_err("fio: UNIX sockets need host/filename\n");
1334                         return 1;
1335                 }
1336                 o->listen = td_read(td);
1337         }
1338
1339         if (o->listen)
1340                 ret = fio_netio_setup_listen(td);
1341         else
1342                 ret = fio_netio_setup_connect(td);
1343
1344         return ret;
1345 }
1346
1347 static void fio_netio_cleanup(struct thread_data *td)
1348 {
1349         struct netio_data *nd = td->io_ops_data;
1350
1351         if (nd) {
1352                 if (nd->listenfd != -1)
1353                         close(nd->listenfd);
1354                 if (nd->pipes[0] != -1)
1355                         close(nd->pipes[0]);
1356                 if (nd->pipes[1] != -1)
1357                         close(nd->pipes[1]);
1358
1359                 free(nd);
1360         }
1361 }
1362
1363 static int fio_netio_setup(struct thread_data *td)
1364 {
1365         struct netio_data *nd;
1366
1367         if (!td->files_index) {
1368                 add_file(td, td->o.filename ?: "net", 0, 0);
1369                 td->o.nr_files = td->o.nr_files ?: 1;
1370                 td->o.open_files++;
1371         }
1372
1373         if (!td->io_ops_data) {
1374                 nd = malloc(sizeof(*nd));
1375
1376                 memset(nd, 0, sizeof(*nd));
1377                 nd->listenfd = -1;
1378                 nd->pipes[0] = nd->pipes[1] = -1;
1379                 td->io_ops_data = nd;
1380         }
1381
1382         return 0;
1383 }
1384
1385 static void fio_netio_terminate(struct thread_data *td)
1386 {
1387         kill(td->pid, SIGTERM);
1388 }
1389
1390 #ifdef CONFIG_LINUX_SPLICE
1391 static int fio_netio_setup_splice(struct thread_data *td)
1392 {
1393         struct netio_data *nd;
1394
1395         fio_netio_setup(td);
1396
1397         nd = td->io_ops_data;
1398         if (nd) {
1399                 if (pipe(nd->pipes) < 0)
1400                         return 1;
1401
1402                 nd->use_splice = 1;
1403                 return 0;
1404         }
1405
1406         return 1;
1407 }
1408
1409 static struct ioengine_ops ioengine_splice = {
1410         .name                   = "netsplice",
1411         .version                = FIO_IOOPS_VERSION,
1412         .prep                   = fio_netio_prep,
1413         .queue                  = fio_netio_queue,
1414         .setup                  = fio_netio_setup_splice,
1415         .init                   = fio_netio_init,
1416         .cleanup                = fio_netio_cleanup,
1417         .open_file              = fio_netio_open_file,
1418         .close_file             = fio_netio_close_file,
1419         .terminate              = fio_netio_terminate,
1420         .options                = options,
1421         .option_struct_size     = sizeof(struct netio_options),
1422         .flags                  = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
1423                                   FIO_PIPEIO,
1424 };
1425 #endif
1426
1427 static struct ioengine_ops ioengine_rw = {
1428         .name                   = "net",
1429         .version                = FIO_IOOPS_VERSION,
1430         .prep                   = fio_netio_prep,
1431         .queue                  = fio_netio_queue,
1432         .setup                  = fio_netio_setup,
1433         .init                   = fio_netio_init,
1434         .cleanup                = fio_netio_cleanup,
1435         .open_file              = fio_netio_open_file,
1436         .close_file             = fio_netio_close_file,
1437         .terminate              = fio_netio_terminate,
1438         .options                = options,
1439         .option_struct_size     = sizeof(struct netio_options),
1440         .flags                  = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
1441                                   FIO_PIPEIO | FIO_BIT_BASED,
1442 };
1443
1444 static int str_hostname_cb(void *data, const char *input)
1445 {
1446         struct netio_options *o = data;
1447
1448         if (o->td->o.filename)
1449                 free(o->td->o.filename);
1450         o->td->o.filename = strdup(input);
1451         return 0;
1452 }
1453
1454 static void fio_init fio_netio_register(void)
1455 {
1456         register_ioengine(&ioengine_rw);
1457 #ifdef CONFIG_LINUX_SPLICE
1458         register_ioengine(&ioengine_splice);
1459 #endif
1460 }
1461
1462 static void fio_exit fio_netio_unregister(void)
1463 {
1464         unregister_ioengine(&ioengine_rw);
1465 #ifdef CONFIG_LINUX_SPLICE
1466         unregister_ioengine(&ioengine_splice);
1467 #endif
1468 }