configure: add option to disable libnuma usage
[fio.git] / engines / net.c
1 /*
2  * net engine
3  *
4  * IO engine that reads/writes to/from sockets.
5  *
6  */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <signal.h>
11 #include <errno.h>
12 #include <assert.h>
13 #include <netinet/in.h>
14 #include <netinet/tcp.h>
15 #include <arpa/inet.h>
16 #include <netdb.h>
17 #include <sys/poll.h>
18 #include <sys/types.h>
19 #include <sys/stat.h>
20 #include <sys/socket.h>
21 #include <sys/un.h>
22
23 #include "../fio.h"
24
25 struct netio_data {
26         int listenfd;
27         int use_splice;
28         int pipes[2];
29         struct sockaddr_in addr;
30         struct sockaddr_un addr_un;
31 };
32
33 struct netio_options {
34         struct thread_data *td;
35         unsigned int port;
36         unsigned int proto;
37         unsigned int listen;
38         unsigned int pingpong;
39         unsigned int nodelay;
40         unsigned int ttl;
41         char * interface;
42 };
43
44 struct udp_close_msg {
45         uint32_t magic;
46         uint32_t cmd;
47 };
48
49 enum {
50         FIO_LINK_CLOSE = 0x89,
51         FIO_LINK_OPEN_CLOSE_MAGIC = 0x6c696e6b,
52         FIO_LINK_OPEN = 0x98,
53
54         FIO_TYPE_TCP    = 1,
55         FIO_TYPE_UDP    = 2,
56         FIO_TYPE_UNIX   = 3,
57 };
58
59 static int str_hostname_cb(void *data, const char *input);
60 static struct fio_option options[] = {
61         {
62                 .name   = "hostname",
63                 .lname  = "net engine hostname",
64                 .type   = FIO_OPT_STR_STORE,
65                 .cb     = str_hostname_cb,
66                 .help   = "Hostname for net IO engine",
67                 .category = FIO_OPT_C_ENGINE,
68                 .group  = FIO_OPT_G_NETIO,
69         },
70         {
71                 .name   = "port",
72                 .lname  = "net engine port",
73                 .type   = FIO_OPT_INT,
74                 .off1   = offsetof(struct netio_options, port),
75                 .minval = 1,
76                 .maxval = 65535,
77                 .help   = "Port to use for TCP or UDP net connections",
78                 .category = FIO_OPT_C_ENGINE,
79                 .group  = FIO_OPT_G_NETIO,
80         },
81         {
82                 .name   = "protocol",
83                 .lname  = "net engine protocol",
84                 .alias  = "proto",
85                 .type   = FIO_OPT_STR,
86                 .off1   = offsetof(struct netio_options, proto),
87                 .help   = "Network protocol to use",
88                 .def    = "tcp",
89                 .posval = {
90                           { .ival = "tcp",
91                             .oval = FIO_TYPE_TCP,
92                             .help = "Transmission Control Protocol",
93                           },
94                           { .ival = "udp",
95                             .oval = FIO_TYPE_UDP,
96                             .help = "User Datagram Protocol",
97                           },
98                           { .ival = "unix",
99                             .oval = FIO_TYPE_UNIX,
100                             .help = "UNIX domain socket",
101                           },
102                 },
103                 .category = FIO_OPT_C_ENGINE,
104                 .group  = FIO_OPT_G_NETIO,
105         },
106 #ifdef CONFIG_TCP_NODELAY
107         {
108                 .name   = "nodelay",
109                 .type   = FIO_OPT_BOOL,
110                 .off1   = offsetof(struct netio_options, nodelay),
111                 .help   = "Use TCP_NODELAY on TCP connections",
112                 .category = FIO_OPT_C_ENGINE,
113                 .group  = FIO_OPT_G_NETIO,
114         },
115 #endif
116         {
117                 .name   = "listen",
118                 .lname  = "net engine listen",
119                 .type   = FIO_OPT_STR_SET,
120                 .off1   = offsetof(struct netio_options, listen),
121                 .help   = "Listen for incoming TCP connections",
122                 .category = FIO_OPT_C_ENGINE,
123                 .group  = FIO_OPT_G_NETIO,
124         },
125         {
126                 .name   = "pingpong",
127                 .type   = FIO_OPT_STR_SET,
128                 .off1   = offsetof(struct netio_options, pingpong),
129                 .help   = "Ping-pong IO requests",
130                 .category = FIO_OPT_C_ENGINE,
131                 .group  = FIO_OPT_G_NETIO,
132         },
133         {
134                 .name   = "interface",
135                 .lname  = "net engine interface",
136                 .type   = FIO_OPT_STR_STORE,
137                 .off1   = offsetof(struct netio_options, interface),
138                 .help   = "Network interface to use",
139                 .category = FIO_OPT_C_ENGINE,
140                 .group  = FIO_OPT_G_NETIO,
141         },
142         {
143                 .name   = "ttl",
144                 .lname  = "net engine multicast ttl",
145                 .type   = FIO_OPT_INT,
146                 .off1   = offsetof(struct netio_options, ttl),
147                 .def    = "1",
148                 .minval = 0,
149                 .help   = "Time-to-live value for outgoing UDP multicast packets",
150                 .category = FIO_OPT_C_ENGINE,
151                 .group  = FIO_OPT_G_NETIO,
152         },
153         {
154                 .name   = NULL,
155         },
156 };
157
158 /*
159  * Return -1 for error and 'nr events' for a positive number
160  * of events
161  */
162 static int poll_wait(struct thread_data *td, int fd, short events)
163 {
164         struct pollfd pfd;
165         int ret;
166
167         while (!td->terminate) {
168                 pfd.fd = fd;
169                 pfd.events = events;
170                 ret = poll(&pfd, 1, -1);
171                 if (ret < 0) {
172                         if (errno == EINTR)
173                                 break;
174
175                         td_verror(td, errno, "poll");
176                         return -1;
177                 } else if (!ret)
178                         continue;
179
180                 break;
181         }
182
183         if (pfd.revents & events)
184                 return 1;
185
186         return -1;
187 }
188
189 static int fio_netio_is_multicast(const char *mcaddr)
190 {
191         in_addr_t addr = inet_network(mcaddr);
192         if (addr == -1)
193                 return 0;
194
195         if (inet_network("224.0.0.0") <= addr &&
196             inet_network("239.255.255.255") >= addr)
197                 return 1;
198
199         return 0;
200 }
201
202
203 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
204 {
205         struct netio_options *o = td->eo;
206
207         /*
208          * Make sure we don't see spurious reads to a receiver, and vice versa
209          */
210         if (o->proto == FIO_TYPE_TCP)
211                 return 0;
212
213         if ((o->listen && io_u->ddir == DDIR_WRITE) ||
214             (!o->listen && io_u->ddir == DDIR_READ)) {
215                 td_verror(td, EINVAL, "bad direction");
216                 return 1;
217         }
218
219         return 0;
220 }
221
222 #ifdef CONFIG_LINUX_SPLICE
223 static int splice_io_u(int fdin, int fdout, unsigned int len)
224 {
225         int bytes = 0;
226
227         while (len) {
228                 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
229
230                 if (ret < 0) {
231                         if (!bytes)
232                                 bytes = ret;
233
234                         break;
235                 } else if (!ret)
236                         break;
237
238                 bytes += ret;
239                 len -= ret;
240         }
241
242         return bytes;
243 }
244
245 /*
246  * Receive bytes from a socket and fill them into the internal pipe
247  */
248 static int splice_in(struct thread_data *td, struct io_u *io_u)
249 {
250         struct netio_data *nd = td->io_ops->data;
251
252         return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
253 }
254
255 /*
256  * Transmit 'len' bytes from the internal pipe
257  */
258 static int splice_out(struct thread_data *td, struct io_u *io_u,
259                       unsigned int len)
260 {
261         struct netio_data *nd = td->io_ops->data;
262
263         return splice_io_u(nd->pipes[0], io_u->file->fd, len);
264 }
265
266 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
267 {
268         struct iovec iov = {
269                 .iov_base = io_u->xfer_buf,
270                 .iov_len = len,
271         };
272         int bytes = 0;
273
274         while (iov.iov_len) {
275                 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
276
277                 if (ret < 0) {
278                         if (!bytes)
279                                 bytes = ret;
280                         break;
281                 } else if (!ret)
282                         break;
283
284                 iov.iov_len -= ret;
285                 iov.iov_base += ret;
286                 bytes += ret;
287         }
288
289         return bytes;
290
291 }
292
293 /*
294  * vmsplice() pipe to io_u buffer
295  */
296 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
297                              unsigned int len)
298 {
299         struct netio_data *nd = td->io_ops->data;
300
301         return vmsplice_io_u(io_u, nd->pipes[0], len);
302 }
303
304 /*
305  * vmsplice() io_u to pipe
306  */
307 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
308 {
309         struct netio_data *nd = td->io_ops->data;
310
311         return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
312 }
313
314 /*
315  * splice receive - transfer socket data into a pipe using splice, then map
316  * that pipe data into the io_u using vmsplice.
317  */
318 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
319 {
320         int ret;
321
322         ret = splice_in(td, io_u);
323         if (ret > 0)
324                 return vmsplice_io_u_out(td, io_u, ret);
325
326         return ret;
327 }
328
329 /*
330  * splice transmit - map data from the io_u into a pipe by using vmsplice,
331  * then transfer that pipe to a socket using splice.
332  */
333 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
334 {
335         int ret;
336
337         ret = vmsplice_io_u_in(td, io_u);
338         if (ret > 0)
339                 return splice_out(td, io_u, ret);
340
341         return ret;
342 }
343 #else
344 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
345 {
346         errno = EOPNOTSUPP;
347         return -1;
348 }
349
350 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
351 {
352         errno = EOPNOTSUPP;
353         return -1;
354 }
355 #endif
356
357 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
358 {
359         struct netio_data *nd = td->io_ops->data;
360         struct netio_options *o = td->eo;
361         int ret, flags = 0;
362
363         do {
364                 if (o->proto == FIO_TYPE_UDP) {
365                         struct sockaddr *to = (struct sockaddr *) &nd->addr;
366
367                         ret = sendto(io_u->file->fd, io_u->xfer_buf,
368                                         io_u->xfer_buflen, flags, to,
369                                         sizeof(*to));
370                 } else {
371                         /*
372                          * if we are going to write more, set MSG_MORE
373                          */
374 #ifdef MSG_MORE
375                         if ((td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
376                             td->o.size) && !o->pingpong)
377                                 flags |= MSG_MORE;
378 #endif
379                         ret = send(io_u->file->fd, io_u->xfer_buf,
380                                         io_u->xfer_buflen, flags);
381                 }
382                 if (ret > 0)
383                         break;
384
385                 ret = poll_wait(td, io_u->file->fd, POLLOUT);
386                 if (ret <= 0)
387                         break;
388         } while (1);
389
390         return ret;
391 }
392
393 static int is_udp_close(struct io_u *io_u, int len)
394 {
395         struct udp_close_msg *msg;
396
397         if (len != sizeof(struct udp_close_msg))
398                 return 0;
399
400         msg = io_u->xfer_buf;
401         if (ntohl(msg->magic) != FIO_LINK_OPEN_CLOSE_MAGIC)
402                 return 0;
403         if (ntohl(msg->cmd) != FIO_LINK_CLOSE)
404                 return 0;
405
406         return 1;
407 }
408
409 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
410 {
411         struct netio_data *nd = td->io_ops->data;
412         struct netio_options *o = td->eo;
413         int ret, flags = 0;
414
415         do {
416                 if (o->proto == FIO_TYPE_UDP) {
417                         socklen_t l;
418                         socklen_t *len = &l;
419                         struct sockaddr *from;
420
421                         if (o->listen) {
422                                 from = (struct sockaddr *) &nd->addr;
423                                 *len = sizeof(nd->addr);
424                         } else {
425                                 from = NULL;
426                                 len = NULL;
427                         }
428
429                         ret = recvfrom(io_u->file->fd, io_u->xfer_buf,
430                                         io_u->xfer_buflen, flags, from, len);
431                         if (is_udp_close(io_u, ret)) {
432                                 td->done = 1;
433                                 return 0;
434                         }
435                 } else {
436                         ret = recv(io_u->file->fd, io_u->xfer_buf,
437                                         io_u->xfer_buflen, flags);
438                 }
439                 if (ret > 0)
440                         break;
441                 else if (!ret && (flags & MSG_WAITALL))
442                         break;
443
444                 ret = poll_wait(td, io_u->file->fd, POLLIN);
445                 if (ret <= 0)
446                         break;
447                 flags |= MSG_WAITALL;
448         } while (1);
449
450         return ret;
451 }
452
453 static int __fio_netio_queue(struct thread_data *td, struct io_u *io_u,
454                              enum fio_ddir ddir)
455 {
456         struct netio_data *nd = td->io_ops->data;
457         struct netio_options *o = td->eo;
458         int ret;
459
460         if (ddir == DDIR_WRITE) {
461                 if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
462                     o->proto == FIO_TYPE_UNIX)
463                         ret = fio_netio_send(td, io_u);
464                 else
465                         ret = fio_netio_splice_out(td, io_u);
466         } else if (ddir == DDIR_READ) {
467                 if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
468                     o->proto == FIO_TYPE_UNIX)
469                         ret = fio_netio_recv(td, io_u);
470                 else
471                         ret = fio_netio_splice_in(td, io_u);
472         } else
473                 ret = 0;        /* must be a SYNC */
474
475         if (ret != (int) io_u->xfer_buflen) {
476                 if (ret >= 0) {
477                         io_u->resid = io_u->xfer_buflen - ret;
478                         io_u->error = 0;
479                         return FIO_Q_COMPLETED;
480                 } else {
481                         int err = errno;
482
483                         if (ddir == DDIR_WRITE && err == EMSGSIZE)
484                                 return FIO_Q_BUSY;
485
486                         io_u->error = err;
487                 }
488         }
489
490         if (io_u->error)
491                 td_verror(td, io_u->error, "xfer");
492
493         return FIO_Q_COMPLETED;
494 }
495
496 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
497 {
498         struct netio_options *o = td->eo;
499         int ret;
500
501         fio_ro_check(td, io_u);
502
503         ret = __fio_netio_queue(td, io_u, io_u->ddir);
504         if (!o->pingpong || ret != FIO_Q_COMPLETED)
505                 return ret;
506
507         /*
508          * For ping-pong mode, receive or send reply as needed
509          */
510         if (td_read(td) && io_u->ddir == DDIR_READ)
511                 ret = __fio_netio_queue(td, io_u, DDIR_WRITE);
512         else if (td_write(td) && io_u->ddir == DDIR_WRITE)
513                 ret = __fio_netio_queue(td, io_u, DDIR_READ);
514
515         return ret;
516 }
517
518 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
519 {
520         struct netio_data *nd = td->io_ops->data;
521         struct netio_options *o = td->eo;
522         int type, domain;
523
524         if (o->proto == FIO_TYPE_TCP) {
525                 domain = AF_INET;
526                 type = SOCK_STREAM;
527         } else if (o->proto == FIO_TYPE_UDP) {
528                 domain = AF_INET;
529                 type = SOCK_DGRAM;
530         } else if (o->proto == FIO_TYPE_UNIX) {
531                 domain = AF_UNIX;
532                 type = SOCK_STREAM;
533         } else {
534                 log_err("fio: bad network type %d\n", o->proto);
535                 f->fd = -1;
536                 return 1;
537         }
538
539         f->fd = socket(domain, type, 0);
540         if (f->fd < 0) {
541                 td_verror(td, errno, "socket");
542                 return 1;
543         }
544
545 #ifdef CONFIG_TCP_NODELAY
546         if (o->nodelay && o->proto == FIO_TYPE_TCP) {
547                 int optval = 1;
548
549                 if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) {
550                         log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno));
551                         return 1;
552                 }
553         }
554 #endif
555
556         if (o->proto == FIO_TYPE_UDP) {
557                 if (!fio_netio_is_multicast(td->o.filename))
558                         return 0;
559
560                 if (o->interface) {
561                         struct in_addr interface_addr;
562                         if (inet_aton(o->interface, &interface_addr) == 0) {
563                                 log_err("fio: interface not valid interface IP\n");
564                                 close(f->fd);
565                                 return 1;
566                         }
567                         if (setsockopt(f->fd, IPPROTO_IP, IP_MULTICAST_IF, &interface_addr, sizeof(interface_addr)) < 0) {
568                                 td_verror(td, errno, "setsockopt IP_MULTICAST_IF");
569                                 close(f->fd);
570                                 return 1;
571                         }
572                 }
573                 if (setsockopt(f->fd, IPPROTO_IP, IP_MULTICAST_TTL, &o->ttl, sizeof(o->ttl)) < 0) {
574                         td_verror(td, errno, "setsockopt IP_MULTICAST_TTL");
575                         close(f->fd);
576                         return 1;
577                 }
578                 return 0;
579         } else if (o->proto == FIO_TYPE_TCP) {
580                 socklen_t len = sizeof(nd->addr);
581
582                 if (connect(f->fd, (struct sockaddr *) &nd->addr, len) < 0) {
583                         td_verror(td, errno, "connect");
584                         close(f->fd);
585                         return 1;
586                 }
587         } else {
588                 struct sockaddr_un *addr = &nd->addr_un;
589                 socklen_t len;
590
591                 len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
592
593                 if (connect(f->fd, (struct sockaddr *) addr, len) < 0) {
594                         td_verror(td, errno, "connect");
595                         close(f->fd);
596                         return 1;
597                 }
598         }
599
600         return 0;
601 }
602
603 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
604 {
605         struct netio_data *nd = td->io_ops->data;
606         struct netio_options *o = td->eo;
607         socklen_t socklen = sizeof(nd->addr);
608         int state;
609
610         if (o->proto == FIO_TYPE_UDP) {
611                 f->fd = nd->listenfd;
612                 return 0;
613         }
614
615         state = td->runstate;
616         td_set_runstate(td, TD_SETTING_UP);
617
618         log_info("fio: waiting for connection\n");
619
620         if (poll_wait(td, nd->listenfd, POLLIN) < 0)
621                 goto err;
622
623         f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
624         if (f->fd < 0) {
625                 td_verror(td, errno, "accept");
626                 goto err;
627         }
628
629 #ifdef CONFIG_TCP_NODELAY
630         if (o->nodelay && o->proto == FIO_TYPE_TCP) {
631                 int optval = 1;
632
633                 if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) {
634                         log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno));
635                         return 1;
636                 }
637         }
638 #endif
639
640         reset_all_stats(td);
641         td_set_runstate(td, state);
642         return 0;
643 err:
644         td_set_runstate(td, state);
645         return 1;
646 }
647
648 static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f)
649 {
650         struct netio_data *nd = td->io_ops->data;
651         struct udp_close_msg msg;
652         struct sockaddr *to = (struct sockaddr *) &nd->addr;
653         int ret;
654
655         msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
656         msg.cmd = htonl(FIO_LINK_CLOSE);
657
658         ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to,
659                         sizeof(nd->addr));
660         if (ret < 0)
661                 td_verror(td, errno, "sendto udp link close");
662 }
663
664 static int fio_netio_close_file(struct thread_data *td, struct fio_file *f)
665 {
666         struct netio_options *o = td->eo;
667
668         /*
669          * If this is an UDP connection, notify the receiver that we are
670          * closing down the link
671          */
672         if (o->proto == FIO_TYPE_UDP)
673                 fio_netio_udp_close(td, f);
674
675         return generic_close_file(td, f);
676 }
677
678 static int fio_netio_udp_recv_open(struct thread_data *td, struct fio_file *f)
679 {
680         struct netio_data *nd = td->io_ops->data;
681         struct udp_close_msg msg;
682         struct sockaddr *to = (struct sockaddr *) &nd->addr;
683         socklen_t len = sizeof(nd->addr);
684         int ret;
685
686         ret = recvfrom(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, &len);
687         if (ret < 0) {
688                 td_verror(td, errno, "recvfrom udp link open");
689                 return ret;
690         }
691
692         if (ntohl(msg.magic) != FIO_LINK_OPEN_CLOSE_MAGIC ||
693             ntohl(msg.cmd) != FIO_LINK_OPEN) {
694                 log_err("fio: bad udp open magic %x/%x\n", ntohl(msg.magic),
695                                                                 ntohl(msg.cmd));
696                 return -1;
697         }
698
699         return 0;
700 }
701
702 static int fio_netio_udp_send_open(struct thread_data *td, struct fio_file *f)
703 {
704         struct netio_data *nd = td->io_ops->data;
705         struct udp_close_msg msg;
706         struct sockaddr *to = (struct sockaddr *) &nd->addr;
707         int ret;
708
709         msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
710         msg.cmd = htonl(FIO_LINK_OPEN);
711
712         ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to,
713                         sizeof(nd->addr));
714         if (ret < 0) {
715                 td_verror(td, errno, "sendto udp link open");
716                 return ret;
717         }
718
719         return 0;
720 }
721
722 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
723 {
724         int ret;
725         struct netio_options *o = td->eo;
726
727         if (o->listen)
728                 ret = fio_netio_accept(td, f);
729         else
730                 ret = fio_netio_connect(td, f);
731
732         if (ret) {
733                 f->fd = -1;
734                 return ret;
735         }
736
737         if (o->proto == FIO_TYPE_UDP) {
738                 if (td_write(td))
739                         ret = fio_netio_udp_send_open(td, f);
740                 else {
741                         int state;
742
743                         state = td->runstate;
744                         td_set_runstate(td, TD_SETTING_UP);
745                         ret = fio_netio_udp_recv_open(td, f);
746                         td_set_runstate(td, state);
747                 }
748         }
749
750         if (ret)
751                 fio_netio_close_file(td, f);
752
753         return ret;
754 }
755
756 static int fio_netio_setup_connect_inet(struct thread_data *td,
757                                         const char *host, unsigned short port)
758 {
759         struct netio_data *nd = td->io_ops->data;
760
761         if (!host) {
762                 log_err("fio: connect with no host to connect to.\n");
763                 if (td_read(td))
764                         log_err("fio: did you forget to set 'listen'?\n");
765
766                 td_verror(td, EINVAL, "no hostname= set");
767                 return 1;
768         }
769
770         nd->addr.sin_family = AF_INET;
771         nd->addr.sin_port = htons(port);
772
773         if (inet_aton(host, &nd->addr.sin_addr) != 1) {
774                 struct hostent *hent;
775
776                 hent = gethostbyname(host);
777                 if (!hent) {
778                         td_verror(td, errno, "gethostbyname");
779                         return 1;
780                 }
781
782                 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
783         }
784
785         return 0;
786 }
787
788 static int fio_netio_setup_connect_unix(struct thread_data *td,
789                                         const char *path)
790 {
791         struct netio_data *nd = td->io_ops->data;
792         struct sockaddr_un *soun = &nd->addr_un;
793
794         soun->sun_family = AF_UNIX;
795         strcpy(soun->sun_path, path);
796         return 0;
797 }
798
799 static int fio_netio_setup_connect(struct thread_data *td)
800 {
801         struct netio_options *o = td->eo;
802
803         if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
804                 return fio_netio_setup_connect_inet(td, td->o.filename,o->port);
805         else
806                 return fio_netio_setup_connect_unix(td, td->o.filename);
807 }
808
809 static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path)
810 {
811         struct netio_data *nd = td->io_ops->data;
812         struct sockaddr_un *addr = &nd->addr_un;
813         mode_t mode;
814         int len, fd;
815
816         fd = socket(AF_UNIX, SOCK_STREAM, 0);
817         if (fd < 0) {
818                 log_err("fio: socket: %s\n", strerror(errno));
819                 return -1;
820         }
821
822         mode = umask(000);
823
824         memset(addr, 0, sizeof(*addr));
825         addr->sun_family = AF_UNIX;
826         strcpy(addr->sun_path, path);
827         unlink(path);
828
829         len = sizeof(addr->sun_family) + strlen(path) + 1;
830
831         if (bind(fd, (struct sockaddr *) addr, len) < 0) {
832                 log_err("fio: bind: %s\n", strerror(errno));
833                 close(fd);
834                 return -1;
835         }
836
837         umask(mode);
838         nd->listenfd = fd;
839         return 0;
840 }
841
842 static int fio_netio_setup_listen_inet(struct thread_data *td, short port)
843 {
844         struct netio_data *nd = td->io_ops->data;
845         struct netio_options *o = td->eo;
846         struct ip_mreq mr;
847         struct sockaddr_in sin;
848         int fd, opt, type;
849
850         memset(&sin, 0, sizeof(sin));
851         if (o->proto == FIO_TYPE_TCP)
852                 type = SOCK_STREAM;
853         else
854                 type = SOCK_DGRAM;
855
856         fd = socket(AF_INET, type, 0);
857         if (fd < 0) {
858                 td_verror(td, errno, "socket");
859                 return 1;
860         }
861
862         opt = 1;
863         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *) &opt, sizeof(opt)) < 0) {
864                 td_verror(td, errno, "setsockopt");
865                 close(fd);
866                 return 1;
867         }
868 #ifdef SO_REUSEPORT
869         if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, (void *) &opt, sizeof(opt)) < 0) {
870                 td_verror(td, errno, "setsockopt");
871                 close(fd);
872                 return 1;
873         }
874 #endif
875
876         if (td->o.filename){
877                 if(o->proto != FIO_TYPE_UDP ||
878                    !fio_netio_is_multicast(td->o.filename)) {
879                         log_err("fio: hostname not valid for non-multicast inbound network IO\n");
880                         close(fd);
881                         return 1;
882                 }
883
884                 inet_aton(td->o.filename, &sin.sin_addr);
885
886                 mr.imr_multiaddr = sin.sin_addr;
887                 if (o->interface) {
888                         if (inet_aton(o->interface, &mr.imr_interface) == 0) {
889                                 log_err("fio: interface not valid interface IP\n");
890                                 close(fd);
891                                 return 1;
892                         }
893                 } else {
894                         mr.imr_interface.s_addr = htonl(INADDR_ANY);
895                 }
896                 if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr, sizeof(mr)) < 0) {
897                         td_verror(td, errno, "setsockopt IP_ADD_MEMBERSHIP");
898                         close(fd);
899                         return 1;
900                 }
901         }
902
903         nd->addr.sin_family = AF_INET;
904         nd->addr.sin_addr.s_addr = sin.sin_addr.s_addr ? sin.sin_addr.s_addr : htonl(INADDR_ANY);
905         nd->addr.sin_port = htons(port);
906
907         if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
908                 td_verror(td, errno, "bind");
909                 return 1;
910         }
911
912         nd->listenfd = fd;
913         return 0;
914 }
915
916 static int fio_netio_setup_listen(struct thread_data *td)
917 {
918         struct netio_data *nd = td->io_ops->data;
919         struct netio_options *o = td->eo;
920         int ret;
921
922         if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
923                 ret = fio_netio_setup_listen_inet(td, o->port);
924         else
925                 ret = fio_netio_setup_listen_unix(td, td->o.filename);
926
927         if (ret)
928                 return ret;
929         if (o->proto == FIO_TYPE_UDP)
930                 return 0;
931
932         if (listen(nd->listenfd, 10) < 0) {
933                 td_verror(td, errno, "listen");
934                 nd->listenfd = -1;
935                 return 1;
936         }
937
938         return 0;
939 }
940
941 static int fio_netio_init(struct thread_data *td)
942 {
943         struct netio_options *o = td->eo;
944         int ret;
945
946 #ifdef WIN32
947         WSADATA wsd;
948         WSAStartup(MAKEWORD(2,2), &wsd);
949 #endif
950
951         if (td_random(td)) {
952                 log_err("fio: network IO can't be random\n");
953                 return 1;
954         }
955
956         if (o->proto == FIO_TYPE_UNIX && o->port) {
957                 log_err("fio: network IO port not valid with unix socket\n");
958                 return 1;
959         } else if (o->proto != FIO_TYPE_UNIX && !o->port) {
960                 log_err("fio: network IO requires port for tcp or udp\n");
961                 return 1;
962         }
963
964         if (o->proto != FIO_TYPE_TCP) {
965                 if (o->listen) {
966                         log_err("fio: listen only valid for TCP proto IO\n");
967                         return 1;
968                 }
969                 if (td_rw(td)) {
970                         log_err("fio: datagram network connections must be"
971                                    " read OR write\n");
972                         return 1;
973                 }
974                 if (o->proto == FIO_TYPE_UNIX && !td->o.filename) {
975                         log_err("fio: UNIX sockets need host/filename\n");
976                         return 1;
977                 }
978                 o->listen = td_read(td);
979         }
980
981         if (o->listen)
982                 ret = fio_netio_setup_listen(td);
983         else
984                 ret = fio_netio_setup_connect(td);
985
986         return ret;
987 }
988
989 static void fio_netio_cleanup(struct thread_data *td)
990 {
991         struct netio_data *nd = td->io_ops->data;
992
993         if (nd) {
994                 if (nd->listenfd != -1)
995                         close(nd->listenfd);
996                 if (nd->pipes[0] != -1)
997                         close(nd->pipes[0]);
998                 if (nd->pipes[1] != -1)
999                         close(nd->pipes[1]);
1000
1001                 free(nd);
1002         }
1003 }
1004
1005 static int fio_netio_setup(struct thread_data *td)
1006 {
1007         struct netio_data *nd;
1008
1009         if (!td->files_index) {
1010                 add_file(td, td->o.filename ?: "net");
1011                 td->o.nr_files = td->o.nr_files ?: 1;
1012         }
1013
1014         if (!td->io_ops->data) {
1015                 nd = malloc(sizeof(*nd));;
1016
1017                 memset(nd, 0, sizeof(*nd));
1018                 nd->listenfd = -1;
1019                 nd->pipes[0] = nd->pipes[1] = -1;
1020                 td->io_ops->data = nd;
1021         }
1022
1023         return 0;
1024 }
1025
1026 static void fio_netio_terminate(struct thread_data *td)
1027 {
1028         kill(td->pid, SIGUSR2);
1029 }
1030
1031 #ifdef CONFIG_LINUX_SPLICE
1032 static int fio_netio_setup_splice(struct thread_data *td)
1033 {
1034         struct netio_data *nd;
1035
1036         fio_netio_setup(td);
1037
1038         nd = td->io_ops->data;
1039         if (nd) {
1040                 if (pipe(nd->pipes) < 0)
1041                         return 1;
1042
1043                 nd->use_splice = 1;
1044                 return 0;
1045         }
1046
1047         return 1;
1048 }
1049
1050 static struct ioengine_ops ioengine_splice = {
1051         .name                   = "netsplice",
1052         .version                = FIO_IOOPS_VERSION,
1053         .prep                   = fio_netio_prep,
1054         .queue                  = fio_netio_queue,
1055         .setup                  = fio_netio_setup_splice,
1056         .init                   = fio_netio_init,
1057         .cleanup                = fio_netio_cleanup,
1058         .open_file              = fio_netio_open_file,
1059         .close_file             = fio_netio_close_file,
1060         .terminate              = fio_netio_terminate,
1061         .options                = options,
1062         .option_struct_size     = sizeof(struct netio_options),
1063         .flags                  = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
1064                                   FIO_PIPEIO,
1065 };
1066 #endif
1067
1068 static struct ioengine_ops ioengine_rw = {
1069         .name                   = "net",
1070         .version                = FIO_IOOPS_VERSION,
1071         .prep                   = fio_netio_prep,
1072         .queue                  = fio_netio_queue,
1073         .setup                  = fio_netio_setup,
1074         .init                   = fio_netio_init,
1075         .cleanup                = fio_netio_cleanup,
1076         .open_file              = fio_netio_open_file,
1077         .close_file             = fio_netio_close_file,
1078         .terminate              = fio_netio_terminate,
1079         .options                = options,
1080         .option_struct_size     = sizeof(struct netio_options),
1081         .flags                  = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
1082                                   FIO_PIPEIO | FIO_BIT_BASED,
1083 };
1084
1085 static int str_hostname_cb(void *data, const char *input)
1086 {
1087         struct netio_options *o = data;
1088
1089         if (o->td->o.filename)
1090                 free(o->td->o.filename);
1091         o->td->o.filename = strdup(input);
1092         return 0;
1093 }
1094
1095 static void fio_init fio_netio_register(void)
1096 {
1097         register_ioengine(&ioengine_rw);
1098 #ifdef CONFIG_LINUX_SPLICE
1099         register_ioengine(&ioengine_splice);
1100 #endif
1101 }
1102
1103 static void fio_exit fio_netio_unregister(void)
1104 {
1105         unregister_ioengine(&ioengine_rw);
1106 #ifdef CONFIG_LINUX_SPLICE
1107         unregister_ioengine(&ioengine_splice);
1108 #endif
1109 }