Merge branch 'master' into gfio
[fio.git] / engines / net.c
1 /*
2  * net engine
3  *
4  * IO engine that reads/writes to/from sockets.
5  *
6  */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <signal.h>
11 #include <errno.h>
12 #include <assert.h>
13 #include <netinet/in.h>
14 #include <netinet/tcp.h>
15 #include <arpa/inet.h>
16 #include <netdb.h>
17 #include <sys/poll.h>
18 #include <sys/types.h>
19 #include <sys/stat.h>
20 #include <sys/socket.h>
21 #include <sys/un.h>
22
23 #include "../fio.h"
24
25 struct netio_data {
26         int listenfd;
27         int use_splice;
28         int pipes[2];
29         struct sockaddr_in addr;
30         struct sockaddr_un addr_un;
31 };
32
33 struct netio_options {
34         struct thread_data *td;
35         unsigned int port;
36         unsigned int proto;
37         unsigned int listen;
38         unsigned int pingpong;
39         unsigned int nodelay;
40 };
41
42 struct udp_close_msg {
43         uint32_t magic;
44         uint32_t cmd;
45 };
46
47 enum {
48         FIO_LINK_CLOSE = 0x89,
49         FIO_LINK_OPEN_CLOSE_MAGIC = 0x6c696e6b,
50         FIO_LINK_OPEN = 0x98,
51
52         FIO_TYPE_TCP    = 1,
53         FIO_TYPE_UDP    = 2,
54         FIO_TYPE_UNIX   = 3,
55 };
56
57 static int str_hostname_cb(void *data, const char *input);
58 static struct fio_option options[] = {
59         {
60                 .name   = "hostname",
61                 .lname  = "net engine hostname",
62                 .type   = FIO_OPT_STR_STORE,
63                 .cb     = str_hostname_cb,
64                 .help   = "Hostname for net IO engine",
65                 .category = FIO_OPT_C_ENGINE,
66                 .group  = FIO_OPT_G_NETIO,
67         },
68         {
69                 .name   = "port",
70                 .lname  = "net engine port",
71                 .type   = FIO_OPT_INT,
72                 .off1   = offsetof(struct netio_options, port),
73                 .minval = 1,
74                 .maxval = 65535,
75                 .help   = "Port to use for TCP or UDP net connections",
76                 .category = FIO_OPT_C_ENGINE,
77                 .group  = FIO_OPT_G_NETIO,
78         },
79         {
80                 .name   = "protocol",
81                 .lname  = "net engine protocol",
82                 .alias  = "proto",
83                 .type   = FIO_OPT_STR,
84                 .off1   = offsetof(struct netio_options, proto),
85                 .help   = "Network protocol to use",
86                 .def    = "tcp",
87                 .category = FIO_OPT_C_IO,
88                 .posval = {
89                           { .ival = "tcp",
90                             .oval = FIO_TYPE_TCP,
91                             .help = "Transmission Control Protocol",
92                           },
93                           { .ival = "udp",
94                             .oval = FIO_TYPE_UDP,
95                             .help = "User Datagram Protocol",
96                           },
97                           { .ival = "unix",
98                             .oval = FIO_TYPE_UNIX,
99                             .help = "UNIX domain socket",
100                           },
101                 },
102                 .category = FIO_OPT_C_ENGINE,
103                 .group  = FIO_OPT_G_NETIO,
104         },
105 #ifdef CONFIG_TCP_NODELAY
106         {
107                 .name   = "nodelay",
108                 .type   = FIO_OPT_BOOL,
109                 .off1   = offsetof(struct netio_options, nodelay),
110                 .help   = "Use TCP_NODELAY on TCP connections",
111                 .category = FIO_OPT_C_ENGINE,
112                 .group  = FIO_OPT_G_NETIO,
113         },
114 #endif
115         {
116                 .name   = "listen",
117                 .lname  = "net engine listen",
118                 .type   = FIO_OPT_STR_SET,
119                 .off1   = offsetof(struct netio_options, listen),
120                 .help   = "Listen for incoming TCP connections",
121                 .category = FIO_OPT_C_ENGINE,
122                 .group  = FIO_OPT_G_NETIO,
123         },
124         {
125                 .name   = "pingpong",
126                 .type   = FIO_OPT_STR_SET,
127                 .off1   = offsetof(struct netio_options, pingpong),
128                 .help   = "Ping-pong IO requests",
129                 .category = FIO_OPT_C_ENGINE,
130                 .group  = FIO_OPT_G_NETIO,
131         },
132         {
133                 .name   = NULL,
134         },
135 };
136
137 /*
138  * Return -1 for error and 'nr events' for a positive number
139  * of events
140  */
141 static int poll_wait(struct thread_data *td, int fd, short events)
142 {
143         struct pollfd pfd;
144         int ret;
145
146         while (!td->terminate) {
147                 pfd.fd = fd;
148                 pfd.events = events;
149                 ret = poll(&pfd, 1, -1);
150                 if (ret < 0) {
151                         if (errno == EINTR)
152                                 break;
153
154                         td_verror(td, errno, "poll");
155                         return -1;
156                 } else if (!ret)
157                         continue;
158
159                 break;
160         }
161
162         if (pfd.revents & events)
163                 return 1;
164
165         return -1;
166 }
167
168 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
169 {
170         struct netio_options *o = td->eo;
171
172         /*
173          * Make sure we don't see spurious reads to a receiver, and vice versa
174          */
175         if (o->proto == FIO_TYPE_TCP)
176                 return 0;
177
178         if ((o->listen && io_u->ddir == DDIR_WRITE) ||
179             (!o->listen && io_u->ddir == DDIR_READ)) {
180                 td_verror(td, EINVAL, "bad direction");
181                 return 1;
182         }
183
184         return 0;
185 }
186
187 #ifdef CONFIG_LINUX_SPLICE
188 static int splice_io_u(int fdin, int fdout, unsigned int len)
189 {
190         int bytes = 0;
191
192         while (len) {
193                 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
194
195                 if (ret < 0) {
196                         if (!bytes)
197                                 bytes = ret;
198
199                         break;
200                 } else if (!ret)
201                         break;
202
203                 bytes += ret;
204                 len -= ret;
205         }
206
207         return bytes;
208 }
209
210 /*
211  * Receive bytes from a socket and fill them into the internal pipe
212  */
213 static int splice_in(struct thread_data *td, struct io_u *io_u)
214 {
215         struct netio_data *nd = td->io_ops->data;
216
217         return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
218 }
219
220 /*
221  * Transmit 'len' bytes from the internal pipe
222  */
223 static int splice_out(struct thread_data *td, struct io_u *io_u,
224                       unsigned int len)
225 {
226         struct netio_data *nd = td->io_ops->data;
227
228         return splice_io_u(nd->pipes[0], io_u->file->fd, len);
229 }
230
231 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
232 {
233         struct iovec iov = {
234                 .iov_base = io_u->xfer_buf,
235                 .iov_len = len,
236         };
237         int bytes = 0;
238
239         while (iov.iov_len) {
240                 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
241
242                 if (ret < 0) {
243                         if (!bytes)
244                                 bytes = ret;
245                         break;
246                 } else if (!ret)
247                         break;
248
249                 iov.iov_len -= ret;
250                 iov.iov_base += ret;
251                 bytes += ret;
252         }
253
254         return bytes;
255
256 }
257
258 /*
259  * vmsplice() pipe to io_u buffer
260  */
261 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
262                              unsigned int len)
263 {
264         struct netio_data *nd = td->io_ops->data;
265
266         return vmsplice_io_u(io_u, nd->pipes[0], len);
267 }
268
269 /*
270  * vmsplice() io_u to pipe
271  */
272 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
273 {
274         struct netio_data *nd = td->io_ops->data;
275
276         return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
277 }
278
279 /*
280  * splice receive - transfer socket data into a pipe using splice, then map
281  * that pipe data into the io_u using vmsplice.
282  */
283 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
284 {
285         int ret;
286
287         ret = splice_in(td, io_u);
288         if (ret > 0)
289                 return vmsplice_io_u_out(td, io_u, ret);
290
291         return ret;
292 }
293
294 /*
295  * splice transmit - map data from the io_u into a pipe by using vmsplice,
296  * then transfer that pipe to a socket using splice.
297  */
298 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
299 {
300         int ret;
301
302         ret = vmsplice_io_u_in(td, io_u);
303         if (ret > 0)
304                 return splice_out(td, io_u, ret);
305
306         return ret;
307 }
308 #else
309 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
310 {
311         errno = EOPNOTSUPP;
312         return -1;
313 }
314
315 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
316 {
317         errno = EOPNOTSUPP;
318         return -1;
319 }
320 #endif
321
322 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
323 {
324         struct netio_data *nd = td->io_ops->data;
325         struct netio_options *o = td->eo;
326         int ret, flags = 0;
327
328         do {
329                 if (o->proto == FIO_TYPE_UDP) {
330                         struct sockaddr *to = (struct sockaddr *) &nd->addr;
331
332                         ret = sendto(io_u->file->fd, io_u->xfer_buf,
333                                         io_u->xfer_buflen, flags, to,
334                                         sizeof(*to));
335                 } else {
336                         /*
337                          * if we are going to write more, set MSG_MORE
338                          */
339 #ifdef MSG_MORE
340                         if ((td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
341                             td->o.size) && !o->pingpong)
342                                 flags |= MSG_MORE;
343 #endif
344                         ret = send(io_u->file->fd, io_u->xfer_buf,
345                                         io_u->xfer_buflen, flags);
346                 }
347                 if (ret > 0)
348                         break;
349
350                 ret = poll_wait(td, io_u->file->fd, POLLOUT);
351                 if (ret <= 0)
352                         break;
353         } while (1);
354
355         return ret;
356 }
357
358 static int is_udp_close(struct io_u *io_u, int len)
359 {
360         struct udp_close_msg *msg;
361
362         if (len != sizeof(struct udp_close_msg))
363                 return 0;
364
365         msg = io_u->xfer_buf;
366         if (ntohl(msg->magic) != FIO_LINK_OPEN_CLOSE_MAGIC)
367                 return 0;
368         if (ntohl(msg->cmd) != FIO_LINK_CLOSE)
369                 return 0;
370
371         return 1;
372 }
373
374 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
375 {
376         struct netio_data *nd = td->io_ops->data;
377         struct netio_options *o = td->eo;
378         int ret, flags = 0;
379
380         do {
381                 if (o->proto == FIO_TYPE_UDP) {
382                         socklen_t len = sizeof(nd->addr);
383                         struct sockaddr *from = (struct sockaddr *) &nd->addr;
384
385                         ret = recvfrom(io_u->file->fd, io_u->xfer_buf,
386                                         io_u->xfer_buflen, flags, from, &len);
387                         if (is_udp_close(io_u, ret)) {
388                                 td->done = 1;
389                                 return 0;
390                         }
391                 } else {
392                         ret = recv(io_u->file->fd, io_u->xfer_buf,
393                                         io_u->xfer_buflen, flags);
394                 }
395                 if (ret > 0)
396                         break;
397                 else if (!ret && (flags & MSG_WAITALL))
398                         break;
399
400                 ret = poll_wait(td, io_u->file->fd, POLLIN);
401                 if (ret <= 0)
402                         break;
403                 flags |= MSG_WAITALL;
404         } while (1);
405
406         return ret;
407 }
408
409 static int __fio_netio_queue(struct thread_data *td, struct io_u *io_u,
410                              enum fio_ddir ddir)
411 {
412         struct netio_data *nd = td->io_ops->data;
413         struct netio_options *o = td->eo;
414         int ret;
415
416         if (ddir == DDIR_WRITE) {
417                 if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
418                     o->proto == FIO_TYPE_UNIX)
419                         ret = fio_netio_send(td, io_u);
420                 else
421                         ret = fio_netio_splice_out(td, io_u);
422         } else if (ddir == DDIR_READ) {
423                 if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
424                     o->proto == FIO_TYPE_UNIX)
425                         ret = fio_netio_recv(td, io_u);
426                 else
427                         ret = fio_netio_splice_in(td, io_u);
428         } else
429                 ret = 0;        /* must be a SYNC */
430
431         if (ret != (int) io_u->xfer_buflen) {
432                 if (ret >= 0) {
433                         io_u->resid = io_u->xfer_buflen - ret;
434                         io_u->error = 0;
435                         return FIO_Q_COMPLETED;
436                 } else {
437                         int err = errno;
438
439                         if (ddir == DDIR_WRITE && err == EMSGSIZE)
440                                 return FIO_Q_BUSY;
441
442                         io_u->error = err;
443                 }
444         }
445
446         if (io_u->error)
447                 td_verror(td, io_u->error, "xfer");
448
449         return FIO_Q_COMPLETED;
450 }
451
452 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
453 {
454         struct netio_options *o = td->eo;
455         int ret;
456
457         fio_ro_check(td, io_u);
458
459         ret = __fio_netio_queue(td, io_u, io_u->ddir);
460         if (!o->pingpong || ret != FIO_Q_COMPLETED)
461                 return ret;
462
463         /*
464          * For ping-pong mode, receive or send reply as needed
465          */
466         if (td_read(td) && io_u->ddir == DDIR_READ)
467                 ret = __fio_netio_queue(td, io_u, DDIR_WRITE);
468         else if (td_write(td) && io_u->ddir == DDIR_WRITE)
469                 ret = __fio_netio_queue(td, io_u, DDIR_READ);
470
471         return ret;
472 }
473
474 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
475 {
476         struct netio_data *nd = td->io_ops->data;
477         struct netio_options *o = td->eo;
478         int type, domain;
479
480         if (o->proto == FIO_TYPE_TCP) {
481                 domain = AF_INET;
482                 type = SOCK_STREAM;
483         } else if (o->proto == FIO_TYPE_UDP) {
484                 domain = AF_INET;
485                 type = SOCK_DGRAM;
486         } else if (o->proto == FIO_TYPE_UNIX) {
487                 domain = AF_UNIX;
488                 type = SOCK_STREAM;
489         } else {
490                 log_err("fio: bad network type %d\n", o->proto);
491                 f->fd = -1;
492                 return 1;
493         }
494
495         f->fd = socket(domain, type, 0);
496         if (f->fd < 0) {
497                 td_verror(td, errno, "socket");
498                 return 1;
499         }
500
501 #ifdef CONFIG_TCP_NODELAY
502         if (o->nodelay && o->proto == FIO_TYPE_TCP) {
503                 int optval = 1;
504
505                 if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) {
506                         log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno));
507                         return 1;
508                 }
509         }
510 #endif
511
512         if (o->proto == FIO_TYPE_UDP)
513                 return 0;
514         else if (o->proto == FIO_TYPE_TCP) {
515                 socklen_t len = sizeof(nd->addr);
516
517                 if (connect(f->fd, (struct sockaddr *) &nd->addr, len) < 0) {
518                         td_verror(td, errno, "connect");
519                         close(f->fd);
520                         return 1;
521                 }
522         } else {
523                 struct sockaddr_un *addr = &nd->addr_un;
524                 socklen_t len;
525
526                 len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
527
528                 if (connect(f->fd, (struct sockaddr *) addr, len) < 0) {
529                         td_verror(td, errno, "connect");
530                         close(f->fd);
531                         return 1;
532                 }
533         }
534
535         return 0;
536 }
537
538 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
539 {
540         struct netio_data *nd = td->io_ops->data;
541         struct netio_options *o = td->eo;
542         socklen_t socklen = sizeof(nd->addr);
543         int state;
544
545         if (o->proto == FIO_TYPE_UDP) {
546                 f->fd = nd->listenfd;
547                 return 0;
548         }
549
550         state = td->runstate;
551         td_set_runstate(td, TD_SETTING_UP);
552
553         log_info("fio: waiting for connection\n");
554
555         if (poll_wait(td, nd->listenfd, POLLIN) < 0)
556                 goto err;
557
558         f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
559         if (f->fd < 0) {
560                 td_verror(td, errno, "accept");
561                 goto err;
562         }
563
564 #ifdef CONFIG_TCP_NODELAY
565         if (o->nodelay && o->proto == FIO_TYPE_TCP) {
566                 int optval = 1;
567
568                 if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) {
569                         log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno));
570                         return 1;
571                 }
572         }
573 #endif
574
575         reset_all_stats(td);
576         td_set_runstate(td, state);
577         return 0;
578 err:
579         td_set_runstate(td, state);
580         return 1;
581 }
582
583 static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f)
584 {
585         struct netio_data *nd = td->io_ops->data;
586         struct udp_close_msg msg;
587         struct sockaddr *to = (struct sockaddr *) &nd->addr;
588         int ret;
589
590         msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
591         msg.cmd = htonl(FIO_LINK_CLOSE);
592
593         ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to,
594                         sizeof(nd->addr));
595         if (ret < 0)
596                 td_verror(td, errno, "sendto udp link close");
597 }
598
599 static int fio_netio_close_file(struct thread_data *td, struct fio_file *f)
600 {
601         struct netio_options *o = td->eo;
602
603         /*
604          * If this is an UDP connection, notify the receiver that we are
605          * closing down the link
606          */
607         if (o->proto == FIO_TYPE_UDP)
608                 fio_netio_udp_close(td, f);
609
610         return generic_close_file(td, f);
611 }
612
613 static int fio_netio_udp_recv_open(struct thread_data *td, struct fio_file *f)
614 {
615         struct netio_data *nd = td->io_ops->data;
616         struct udp_close_msg msg;
617         struct sockaddr *to = (struct sockaddr *) &nd->addr;
618         socklen_t len = sizeof(nd->addr);
619         int ret;
620
621         ret = recvfrom(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, &len);
622         if (ret < 0) {
623                 td_verror(td, errno, "sendto udp link open");
624                 return ret;
625         }
626
627         if (ntohl(msg.magic) != FIO_LINK_OPEN_CLOSE_MAGIC ||
628             ntohl(msg.cmd) != FIO_LINK_OPEN) {
629                 log_err("fio: bad udp open magic %x/%x\n", ntohl(msg.magic),
630                                                                 ntohl(msg.cmd));
631                 return -1;
632         }
633
634         return 0;
635 }
636
637 static int fio_netio_udp_send_open(struct thread_data *td, struct fio_file *f)
638 {
639         struct netio_data *nd = td->io_ops->data;
640         struct udp_close_msg msg;
641         struct sockaddr *to = (struct sockaddr *) &nd->addr;
642         int ret;
643
644         msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
645         msg.cmd = htonl(FIO_LINK_OPEN);
646
647         ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to,
648                         sizeof(nd->addr));
649         if (ret < 0) {
650                 td_verror(td, errno, "sendto udp link open");
651                 return ret;
652         }
653
654         return 0;
655 }
656
657 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
658 {
659         int ret;
660         struct netio_options *o = td->eo;
661
662         if (o->listen)
663                 ret = fio_netio_accept(td, f);
664         else
665                 ret = fio_netio_connect(td, f);
666
667         if (ret) {
668                 f->fd = -1;
669                 return ret;
670         }
671
672         if (o->proto == FIO_TYPE_UDP) {
673                 if (td_write(td))
674                         ret = fio_netio_udp_send_open(td, f);
675                 else {
676                         int state;
677
678                         state = td->runstate;
679                         td_set_runstate(td, TD_SETTING_UP);
680                         ret = fio_netio_udp_recv_open(td, f);
681                         td_set_runstate(td, state);
682                 }
683         }
684
685         if (ret)
686                 fio_netio_close_file(td, f);
687
688         return ret;
689 }
690
691 static int fio_netio_setup_connect_inet(struct thread_data *td,
692                                         const char *host, unsigned short port)
693 {
694         struct netio_data *nd = td->io_ops->data;
695
696         if (!host) {
697                 log_err("fio: connect with no host to connect to.\n");
698                 if (td_read(td))
699                         log_err("fio: did you forget to set 'listen'?\n");
700
701                 td_verror(td, EINVAL, "no hostname= set");
702                 return 1;
703         }
704
705         nd->addr.sin_family = AF_INET;
706         nd->addr.sin_port = htons(port);
707
708         if (inet_aton(host, &nd->addr.sin_addr) != 1) {
709                 struct hostent *hent;
710
711                 hent = gethostbyname(host);
712                 if (!hent) {
713                         td_verror(td, errno, "gethostbyname");
714                         return 1;
715                 }
716
717                 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
718         }
719
720         return 0;
721 }
722
723 static int fio_netio_setup_connect_unix(struct thread_data *td,
724                                         const char *path)
725 {
726         struct netio_data *nd = td->io_ops->data;
727         struct sockaddr_un *soun = &nd->addr_un;
728
729         soun->sun_family = AF_UNIX;
730         strcpy(soun->sun_path, path);
731         return 0;
732 }
733
734 static int fio_netio_setup_connect(struct thread_data *td)
735 {
736         struct netio_options *o = td->eo;
737
738         if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
739                 return fio_netio_setup_connect_inet(td, td->o.filename,o->port);
740         else
741                 return fio_netio_setup_connect_unix(td, td->o.filename);
742 }
743
744 static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path)
745 {
746         struct netio_data *nd = td->io_ops->data;
747         struct sockaddr_un *addr = &nd->addr_un;
748         mode_t mode;
749         int len, fd;
750
751         fd = socket(AF_UNIX, SOCK_STREAM, 0);
752         if (fd < 0) {
753                 log_err("fio: socket: %s\n", strerror(errno));
754                 return -1;
755         }
756
757         mode = umask(000);
758
759         memset(addr, 0, sizeof(*addr));
760         addr->sun_family = AF_UNIX;
761         strcpy(addr->sun_path, path);
762         unlink(path);
763
764         len = sizeof(addr->sun_family) + strlen(path) + 1;
765
766         if (bind(fd, (struct sockaddr *) addr, len) < 0) {
767                 log_err("fio: bind: %s\n", strerror(errno));
768                 close(fd);
769                 return -1;
770         }
771
772         umask(mode);
773         nd->listenfd = fd;
774         return 0;
775 }
776
777 static int fio_netio_setup_listen_inet(struct thread_data *td, short port)
778 {
779         struct netio_data *nd = td->io_ops->data;
780         struct netio_options *o = td->eo;
781         int fd, opt, type;
782
783         if (o->proto == FIO_TYPE_TCP)
784                 type = SOCK_STREAM;
785         else
786                 type = SOCK_DGRAM;
787
788         fd = socket(AF_INET, type, 0);
789         if (fd < 0) {
790                 td_verror(td, errno, "socket");
791                 return 1;
792         }
793
794         opt = 1;
795         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *) &opt, sizeof(opt)) < 0) {
796                 td_verror(td, errno, "setsockopt");
797                 return 1;
798         }
799 #ifdef SO_REUSEPORT
800         if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, (void *) &opt, sizeof(opt)) < 0) {
801                 td_verror(td, errno, "setsockopt");
802                 return 1;
803         }
804 #endif
805
806         nd->addr.sin_family = AF_INET;
807         nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
808         nd->addr.sin_port = htons(port);
809
810         if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
811                 td_verror(td, errno, "bind");
812                 return 1;
813         }
814
815         nd->listenfd = fd;
816         return 0;
817 }
818
819 static int fio_netio_setup_listen(struct thread_data *td)
820 {
821         struct netio_data *nd = td->io_ops->data;
822         struct netio_options *o = td->eo;
823         int ret;
824
825         if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
826                 ret = fio_netio_setup_listen_inet(td, o->port);
827         else
828                 ret = fio_netio_setup_listen_unix(td, td->o.filename);
829
830         if (ret)
831                 return ret;
832         if (o->proto == FIO_TYPE_UDP)
833                 return 0;
834
835         if (listen(nd->listenfd, 10) < 0) {
836                 td_verror(td, errno, "listen");
837                 nd->listenfd = -1;
838                 return 1;
839         }
840
841         return 0;
842 }
843
844 static int fio_netio_init(struct thread_data *td)
845 {
846         struct netio_options *o = td->eo;
847         int ret;
848
849 #ifdef WIN32
850         WSADATA wsd;
851         WSAStartup(MAKEWORD(2,2), &wsd);
852 #endif
853
854         if (td_random(td)) {
855                 log_err("fio: network IO can't be random\n");
856                 return 1;
857         }
858
859         if (o->proto == FIO_TYPE_UNIX && o->port) {
860                 log_err("fio: network IO port not valid with unix socket\n");
861                 return 1;
862         } else if (o->proto != FIO_TYPE_UNIX && !o->port) {
863                 log_err("fio: network IO requires port for tcp or udp\n");
864                 return 1;
865         }
866
867         if (o->proto != FIO_TYPE_TCP) {
868                 if (o->listen) {
869                         log_err("fio: listen only valid for TCP proto IO\n");
870                         return 1;
871                 }
872                 if (td_rw(td)) {
873                         log_err("fio: datagram network connections must be"
874                                    " read OR write\n");
875                         return 1;
876                 }
877                 if (o->proto == FIO_TYPE_UNIX && !td->o.filename) {
878                         log_err("fio: UNIX sockets need host/filename\n");
879                         return 1;
880                 }
881                 o->listen = td_read(td);
882         }
883
884         if (o->proto != FIO_TYPE_UNIX && o->listen && td->o.filename) {
885                 log_err("fio: hostname not valid for inbound network IO\n");
886                 return 1;
887         }
888
889         if (o->listen)
890                 ret = fio_netio_setup_listen(td);
891         else
892                 ret = fio_netio_setup_connect(td);
893
894         return ret;
895 }
896
897 static void fio_netio_cleanup(struct thread_data *td)
898 {
899         struct netio_data *nd = td->io_ops->data;
900
901         if (nd) {
902                 if (nd->listenfd != -1)
903                         close(nd->listenfd);
904                 if (nd->pipes[0] != -1)
905                         close(nd->pipes[0]);
906                 if (nd->pipes[1] != -1)
907                         close(nd->pipes[1]);
908
909                 free(nd);
910         }
911 }
912
913 static int fio_netio_setup(struct thread_data *td)
914 {
915         struct netio_data *nd;
916
917         if (!td->files_index) {
918                 add_file(td, td->o.filename ?: "net");
919                 td->o.nr_files = td->o.nr_files ?: 1;
920         }
921
922         if (!td->io_ops->data) {
923                 nd = malloc(sizeof(*nd));;
924
925                 memset(nd, 0, sizeof(*nd));
926                 nd->listenfd = -1;
927                 nd->pipes[0] = nd->pipes[1] = -1;
928                 td->io_ops->data = nd;
929         }
930
931         return 0;
932 }
933
934 static void fio_netio_terminate(struct thread_data *td)
935 {
936         kill(td->pid, SIGUSR2);
937 }
938
939 #ifdef CONFIG_LINUX_SPLICE
940 static int fio_netio_setup_splice(struct thread_data *td)
941 {
942         struct netio_data *nd;
943
944         fio_netio_setup(td);
945
946         nd = td->io_ops->data;
947         if (nd) {
948                 if (pipe(nd->pipes) < 0)
949                         return 1;
950
951                 nd->use_splice = 1;
952                 return 0;
953         }
954
955         return 1;
956 }
957
958 static struct ioengine_ops ioengine_splice = {
959         .name                   = "netsplice",
960         .version                = FIO_IOOPS_VERSION,
961         .prep                   = fio_netio_prep,
962         .queue                  = fio_netio_queue,
963         .setup                  = fio_netio_setup_splice,
964         .init                   = fio_netio_init,
965         .cleanup                = fio_netio_cleanup,
966         .open_file              = fio_netio_open_file,
967         .close_file             = fio_netio_close_file,
968         .terminate              = fio_netio_terminate,
969         .options                = options,
970         .option_struct_size     = sizeof(struct netio_options),
971         .flags                  = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
972                                   FIO_PIPEIO,
973 };
974 #endif
975
976 static struct ioengine_ops ioengine_rw = {
977         .name                   = "net",
978         .version                = FIO_IOOPS_VERSION,
979         .prep                   = fio_netio_prep,
980         .queue                  = fio_netio_queue,
981         .setup                  = fio_netio_setup,
982         .init                   = fio_netio_init,
983         .cleanup                = fio_netio_cleanup,
984         .open_file              = fio_netio_open_file,
985         .close_file             = fio_netio_close_file,
986         .terminate              = fio_netio_terminate,
987         .options                = options,
988         .option_struct_size     = sizeof(struct netio_options),
989         .flags                  = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
990                                   FIO_PIPEIO | FIO_BIT_BASED,
991 };
992
993 static int str_hostname_cb(void *data, const char *input)
994 {
995         struct netio_options *o = data;
996
997         if (o->td->o.filename)
998                 free(o->td->o.filename);
999         o->td->o.filename = strdup(input);
1000         return 0;
1001 }
1002
1003 static void fio_init fio_netio_register(void)
1004 {
1005         register_ioengine(&ioengine_rw);
1006 #ifdef CONFIG_LINUX_SPLICE
1007         register_ioengine(&ioengine_splice);
1008 #endif
1009 }
1010
1011 static void fio_exit fio_netio_unregister(void)
1012 {
1013         unregister_ioengine(&ioengine_rw);
1014 #ifdef CONFIG_LINUX_SPLICE
1015         unregister_ioengine(&ioengine_splice);
1016 #endif
1017 }