net: Allow setting network interface to use for multicast
[fio.git] / engines / net.c
1 /*
2  * net engine
3  *
4  * IO engine that reads/writes to/from sockets.
5  *
6  */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <signal.h>
11 #include <errno.h>
12 #include <assert.h>
13 #include <netinet/in.h>
14 #include <netinet/tcp.h>
15 #include <arpa/inet.h>
16 #include <netdb.h>
17 #include <sys/poll.h>
18 #include <sys/types.h>
19 #include <sys/stat.h>
20 #include <sys/socket.h>
21 #include <sys/un.h>
22
23 #include "../fio.h"
24
25 struct netio_data {
26         int listenfd;
27         int use_splice;
28         int pipes[2];
29         struct sockaddr_in addr;
30         struct sockaddr_un addr_un;
31 };
32
33 struct netio_options {
34         struct thread_data *td;
35         unsigned int port;
36         unsigned int proto;
37         unsigned int listen;
38         unsigned int pingpong;
39         unsigned int nodelay;
40         char * interface;
41 };
42
43 struct udp_close_msg {
44         uint32_t magic;
45         uint32_t cmd;
46 };
47
48 enum {
49         FIO_LINK_CLOSE = 0x89,
50         FIO_LINK_OPEN_CLOSE_MAGIC = 0x6c696e6b,
51         FIO_LINK_OPEN = 0x98,
52
53         FIO_TYPE_TCP    = 1,
54         FIO_TYPE_UDP    = 2,
55         FIO_TYPE_UNIX   = 3,
56 };
57
58 static int str_hostname_cb(void *data, const char *input);
59 static struct fio_option options[] = {
60         {
61                 .name   = "hostname",
62                 .lname  = "net engine hostname",
63                 .type   = FIO_OPT_STR_STORE,
64                 .cb     = str_hostname_cb,
65                 .help   = "Hostname for net IO engine",
66                 .category = FIO_OPT_C_ENGINE,
67                 .group  = FIO_OPT_G_NETIO,
68         },
69         {
70                 .name   = "port",
71                 .lname  = "net engine port",
72                 .type   = FIO_OPT_INT,
73                 .off1   = offsetof(struct netio_options, port),
74                 .minval = 1,
75                 .maxval = 65535,
76                 .help   = "Port to use for TCP or UDP net connections",
77                 .category = FIO_OPT_C_ENGINE,
78                 .group  = FIO_OPT_G_NETIO,
79         },
80         {
81                 .name   = "protocol",
82                 .lname  = "net engine protocol",
83                 .alias  = "proto",
84                 .type   = FIO_OPT_STR,
85                 .off1   = offsetof(struct netio_options, proto),
86                 .help   = "Network protocol to use",
87                 .def    = "tcp",
88                 .posval = {
89                           { .ival = "tcp",
90                             .oval = FIO_TYPE_TCP,
91                             .help = "Transmission Control Protocol",
92                           },
93                           { .ival = "udp",
94                             .oval = FIO_TYPE_UDP,
95                             .help = "User Datagram Protocol",
96                           },
97                           { .ival = "unix",
98                             .oval = FIO_TYPE_UNIX,
99                             .help = "UNIX domain socket",
100                           },
101                 },
102                 .category = FIO_OPT_C_ENGINE,
103                 .group  = FIO_OPT_G_NETIO,
104         },
105 #ifdef CONFIG_TCP_NODELAY
106         {
107                 .name   = "nodelay",
108                 .type   = FIO_OPT_BOOL,
109                 .off1   = offsetof(struct netio_options, nodelay),
110                 .help   = "Use TCP_NODELAY on TCP connections",
111                 .category = FIO_OPT_C_ENGINE,
112                 .group  = FIO_OPT_G_NETIO,
113         },
114 #endif
115         {
116                 .name   = "listen",
117                 .lname  = "net engine listen",
118                 .type   = FIO_OPT_STR_SET,
119                 .off1   = offsetof(struct netio_options, listen),
120                 .help   = "Listen for incoming TCP connections",
121                 .category = FIO_OPT_C_ENGINE,
122                 .group  = FIO_OPT_G_NETIO,
123         },
124         {
125                 .name   = "pingpong",
126                 .type   = FIO_OPT_STR_SET,
127                 .off1   = offsetof(struct netio_options, pingpong),
128                 .help   = "Ping-pong IO requests",
129                 .category = FIO_OPT_C_ENGINE,
130                 .group  = FIO_OPT_G_NETIO,
131         },
132         {
133                 .name   = "interface",
134                 .lname  = "net engine interface",
135                 .type   = FIO_OPT_STR_STORE,
136                 .off1   = offsetof(struct netio_options, interface),
137                 .help   = "Network interface to use",
138                 .category = FIO_OPT_C_ENGINE,
139                 .group  = FIO_OPT_G_NETIO,
140         },
141         {
142                 .name   = NULL,
143         },
144 };
145
146 /*
147  * Return -1 for error and 'nr events' for a positive number
148  * of events
149  */
150 static int poll_wait(struct thread_data *td, int fd, short events)
151 {
152         struct pollfd pfd;
153         int ret;
154
155         while (!td->terminate) {
156                 pfd.fd = fd;
157                 pfd.events = events;
158                 ret = poll(&pfd, 1, -1);
159                 if (ret < 0) {
160                         if (errno == EINTR)
161                                 break;
162
163                         td_verror(td, errno, "poll");
164                         return -1;
165                 } else if (!ret)
166                         continue;
167
168                 break;
169         }
170
171         if (pfd.revents & events)
172                 return 1;
173
174         return -1;
175 }
176
177 static int fio_netio_is_multicast(const char *mcaddr)
178 {
179         in_addr_t addr = inet_network(mcaddr);
180         if (addr == -1)
181                 return 0;
182
183         if (inet_network("224.0.0.0") <= addr &&
184             inet_network("239.255.255.255") >= addr)
185                 return 1;
186
187         return 0;
188 }
189
190
191 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
192 {
193         struct netio_options *o = td->eo;
194
195         /*
196          * Make sure we don't see spurious reads to a receiver, and vice versa
197          */
198         if (o->proto == FIO_TYPE_TCP)
199                 return 0;
200
201         if ((o->listen && io_u->ddir == DDIR_WRITE) ||
202             (!o->listen && io_u->ddir == DDIR_READ)) {
203                 td_verror(td, EINVAL, "bad direction");
204                 return 1;
205         }
206
207         return 0;
208 }
209
210 #ifdef CONFIG_LINUX_SPLICE
211 static int splice_io_u(int fdin, int fdout, unsigned int len)
212 {
213         int bytes = 0;
214
215         while (len) {
216                 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
217
218                 if (ret < 0) {
219                         if (!bytes)
220                                 bytes = ret;
221
222                         break;
223                 } else if (!ret)
224                         break;
225
226                 bytes += ret;
227                 len -= ret;
228         }
229
230         return bytes;
231 }
232
233 /*
234  * Receive bytes from a socket and fill them into the internal pipe
235  */
236 static int splice_in(struct thread_data *td, struct io_u *io_u)
237 {
238         struct netio_data *nd = td->io_ops->data;
239
240         return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
241 }
242
243 /*
244  * Transmit 'len' bytes from the internal pipe
245  */
246 static int splice_out(struct thread_data *td, struct io_u *io_u,
247                       unsigned int len)
248 {
249         struct netio_data *nd = td->io_ops->data;
250
251         return splice_io_u(nd->pipes[0], io_u->file->fd, len);
252 }
253
254 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
255 {
256         struct iovec iov = {
257                 .iov_base = io_u->xfer_buf,
258                 .iov_len = len,
259         };
260         int bytes = 0;
261
262         while (iov.iov_len) {
263                 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
264
265                 if (ret < 0) {
266                         if (!bytes)
267                                 bytes = ret;
268                         break;
269                 } else if (!ret)
270                         break;
271
272                 iov.iov_len -= ret;
273                 iov.iov_base += ret;
274                 bytes += ret;
275         }
276
277         return bytes;
278
279 }
280
281 /*
282  * vmsplice() pipe to io_u buffer
283  */
284 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
285                              unsigned int len)
286 {
287         struct netio_data *nd = td->io_ops->data;
288
289         return vmsplice_io_u(io_u, nd->pipes[0], len);
290 }
291
292 /*
293  * vmsplice() io_u to pipe
294  */
295 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
296 {
297         struct netio_data *nd = td->io_ops->data;
298
299         return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
300 }
301
302 /*
303  * splice receive - transfer socket data into a pipe using splice, then map
304  * that pipe data into the io_u using vmsplice.
305  */
306 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
307 {
308         int ret;
309
310         ret = splice_in(td, io_u);
311         if (ret > 0)
312                 return vmsplice_io_u_out(td, io_u, ret);
313
314         return ret;
315 }
316
317 /*
318  * splice transmit - map data from the io_u into a pipe by using vmsplice,
319  * then transfer that pipe to a socket using splice.
320  */
321 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
322 {
323         int ret;
324
325         ret = vmsplice_io_u_in(td, io_u);
326         if (ret > 0)
327                 return splice_out(td, io_u, ret);
328
329         return ret;
330 }
331 #else
332 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
333 {
334         errno = EOPNOTSUPP;
335         return -1;
336 }
337
338 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
339 {
340         errno = EOPNOTSUPP;
341         return -1;
342 }
343 #endif
344
345 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
346 {
347         struct netio_data *nd = td->io_ops->data;
348         struct netio_options *o = td->eo;
349         int ret, flags = 0;
350
351         do {
352                 if (o->proto == FIO_TYPE_UDP) {
353                         struct sockaddr *to = (struct sockaddr *) &nd->addr;
354
355                         ret = sendto(io_u->file->fd, io_u->xfer_buf,
356                                         io_u->xfer_buflen, flags, to,
357                                         sizeof(*to));
358                 } else {
359                         /*
360                          * if we are going to write more, set MSG_MORE
361                          */
362 #ifdef MSG_MORE
363                         if ((td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
364                             td->o.size) && !o->pingpong)
365                                 flags |= MSG_MORE;
366 #endif
367                         ret = send(io_u->file->fd, io_u->xfer_buf,
368                                         io_u->xfer_buflen, flags);
369                 }
370                 if (ret > 0)
371                         break;
372
373                 ret = poll_wait(td, io_u->file->fd, POLLOUT);
374                 if (ret <= 0)
375                         break;
376         } while (1);
377
378         return ret;
379 }
380
381 static int is_udp_close(struct io_u *io_u, int len)
382 {
383         struct udp_close_msg *msg;
384
385         if (len != sizeof(struct udp_close_msg))
386                 return 0;
387
388         msg = io_u->xfer_buf;
389         if (ntohl(msg->magic) != FIO_LINK_OPEN_CLOSE_MAGIC)
390                 return 0;
391         if (ntohl(msg->cmd) != FIO_LINK_CLOSE)
392                 return 0;
393
394         return 1;
395 }
396
397 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
398 {
399         struct netio_data *nd = td->io_ops->data;
400         struct netio_options *o = td->eo;
401         int ret, flags = 0;
402
403         do {
404                 if (o->proto == FIO_TYPE_UDP) {
405                         socklen_t l;
406                         socklen_t *len = &l;
407                         struct sockaddr *from;
408
409                         if (o->listen) {
410                                 from = (struct sockaddr *) &nd->addr;
411                                 *len = sizeof(nd->addr);
412                         } else {
413                                 from = NULL;
414                                 len = NULL;
415                         }
416
417                         ret = recvfrom(io_u->file->fd, io_u->xfer_buf,
418                                         io_u->xfer_buflen, flags, from, len);
419                         if (is_udp_close(io_u, ret)) {
420                                 td->done = 1;
421                                 return 0;
422                         }
423                 } else {
424                         ret = recv(io_u->file->fd, io_u->xfer_buf,
425                                         io_u->xfer_buflen, flags);
426                 }
427                 if (ret > 0)
428                         break;
429                 else if (!ret && (flags & MSG_WAITALL))
430                         break;
431
432                 ret = poll_wait(td, io_u->file->fd, POLLIN);
433                 if (ret <= 0)
434                         break;
435                 flags |= MSG_WAITALL;
436         } while (1);
437
438         return ret;
439 }
440
441 static int __fio_netio_queue(struct thread_data *td, struct io_u *io_u,
442                              enum fio_ddir ddir)
443 {
444         struct netio_data *nd = td->io_ops->data;
445         struct netio_options *o = td->eo;
446         int ret;
447
448         if (ddir == DDIR_WRITE) {
449                 if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
450                     o->proto == FIO_TYPE_UNIX)
451                         ret = fio_netio_send(td, io_u);
452                 else
453                         ret = fio_netio_splice_out(td, io_u);
454         } else if (ddir == DDIR_READ) {
455                 if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
456                     o->proto == FIO_TYPE_UNIX)
457                         ret = fio_netio_recv(td, io_u);
458                 else
459                         ret = fio_netio_splice_in(td, io_u);
460         } else
461                 ret = 0;        /* must be a SYNC */
462
463         if (ret != (int) io_u->xfer_buflen) {
464                 if (ret >= 0) {
465                         io_u->resid = io_u->xfer_buflen - ret;
466                         io_u->error = 0;
467                         return FIO_Q_COMPLETED;
468                 } else {
469                         int err = errno;
470
471                         if (ddir == DDIR_WRITE && err == EMSGSIZE)
472                                 return FIO_Q_BUSY;
473
474                         io_u->error = err;
475                 }
476         }
477
478         if (io_u->error)
479                 td_verror(td, io_u->error, "xfer");
480
481         return FIO_Q_COMPLETED;
482 }
483
484 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
485 {
486         struct netio_options *o = td->eo;
487         int ret;
488
489         fio_ro_check(td, io_u);
490
491         ret = __fio_netio_queue(td, io_u, io_u->ddir);
492         if (!o->pingpong || ret != FIO_Q_COMPLETED)
493                 return ret;
494
495         /*
496          * For ping-pong mode, receive or send reply as needed
497          */
498         if (td_read(td) && io_u->ddir == DDIR_READ)
499                 ret = __fio_netio_queue(td, io_u, DDIR_WRITE);
500         else if (td_write(td) && io_u->ddir == DDIR_WRITE)
501                 ret = __fio_netio_queue(td, io_u, DDIR_READ);
502
503         return ret;
504 }
505
506 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
507 {
508         struct netio_data *nd = td->io_ops->data;
509         struct netio_options *o = td->eo;
510         int type, domain;
511
512         if (o->proto == FIO_TYPE_TCP) {
513                 domain = AF_INET;
514                 type = SOCK_STREAM;
515         } else if (o->proto == FIO_TYPE_UDP) {
516                 domain = AF_INET;
517                 type = SOCK_DGRAM;
518         } else if (o->proto == FIO_TYPE_UNIX) {
519                 domain = AF_UNIX;
520                 type = SOCK_STREAM;
521         } else {
522                 log_err("fio: bad network type %d\n", o->proto);
523                 f->fd = -1;
524                 return 1;
525         }
526
527         f->fd = socket(domain, type, 0);
528         if (f->fd < 0) {
529                 td_verror(td, errno, "socket");
530                 return 1;
531         }
532
533 #ifdef CONFIG_TCP_NODELAY
534         if (o->nodelay && o->proto == FIO_TYPE_TCP) {
535                 int optval = 1;
536
537                 if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) {
538                         log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno));
539                         return 1;
540                 }
541         }
542 #endif
543
544         if (o->proto == FIO_TYPE_UDP) {
545                 if (o->interface && fio_netio_is_multicast(td->o.filename)) {
546                         struct in_addr interface_addr;
547                         if (inet_aton(o->interface, &interface_addr) == 0) {
548                                 log_err("fio: interface not valid interface IP\n");
549                                 close(f->fd);
550                                 return 1;
551                         }
552                         if (setsockopt(f->fd, IPPROTO_IP, IP_MULTICAST_IF, &interface_addr, sizeof(interface_addr)) < 0) {
553                                 td_verror(td, errno, "setsockopt IP_MULTICAST_IF");
554                                 close(f->fd);
555                                 return 1;
556                         }
557                 }
558                 return 0;
559         } else if (o->proto == FIO_TYPE_TCP) {
560                 socklen_t len = sizeof(nd->addr);
561
562                 if (connect(f->fd, (struct sockaddr *) &nd->addr, len) < 0) {
563                         td_verror(td, errno, "connect");
564                         close(f->fd);
565                         return 1;
566                 }
567         } else {
568                 struct sockaddr_un *addr = &nd->addr_un;
569                 socklen_t len;
570
571                 len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
572
573                 if (connect(f->fd, (struct sockaddr *) addr, len) < 0) {
574                         td_verror(td, errno, "connect");
575                         close(f->fd);
576                         return 1;
577                 }
578         }
579
580         return 0;
581 }
582
583 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
584 {
585         struct netio_data *nd = td->io_ops->data;
586         struct netio_options *o = td->eo;
587         socklen_t socklen = sizeof(nd->addr);
588         int state;
589
590         if (o->proto == FIO_TYPE_UDP) {
591                 f->fd = nd->listenfd;
592                 return 0;
593         }
594
595         state = td->runstate;
596         td_set_runstate(td, TD_SETTING_UP);
597
598         log_info("fio: waiting for connection\n");
599
600         if (poll_wait(td, nd->listenfd, POLLIN) < 0)
601                 goto err;
602
603         f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
604         if (f->fd < 0) {
605                 td_verror(td, errno, "accept");
606                 goto err;
607         }
608
609 #ifdef CONFIG_TCP_NODELAY
610         if (o->nodelay && o->proto == FIO_TYPE_TCP) {
611                 int optval = 1;
612
613                 if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) {
614                         log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno));
615                         return 1;
616                 }
617         }
618 #endif
619
620         reset_all_stats(td);
621         td_set_runstate(td, state);
622         return 0;
623 err:
624         td_set_runstate(td, state);
625         return 1;
626 }
627
628 static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f)
629 {
630         struct netio_data *nd = td->io_ops->data;
631         struct udp_close_msg msg;
632         struct sockaddr *to = (struct sockaddr *) &nd->addr;
633         int ret;
634
635         msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
636         msg.cmd = htonl(FIO_LINK_CLOSE);
637
638         ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to,
639                         sizeof(nd->addr));
640         if (ret < 0)
641                 td_verror(td, errno, "sendto udp link close");
642 }
643
644 static int fio_netio_close_file(struct thread_data *td, struct fio_file *f)
645 {
646         struct netio_options *o = td->eo;
647
648         /*
649          * If this is an UDP connection, notify the receiver that we are
650          * closing down the link
651          */
652         if (o->proto == FIO_TYPE_UDP)
653                 fio_netio_udp_close(td, f);
654
655         return generic_close_file(td, f);
656 }
657
658 static int fio_netio_udp_recv_open(struct thread_data *td, struct fio_file *f)
659 {
660         struct netio_data *nd = td->io_ops->data;
661         struct udp_close_msg msg;
662         struct sockaddr *to = (struct sockaddr *) &nd->addr;
663         socklen_t len = sizeof(nd->addr);
664         int ret;
665
666         ret = recvfrom(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, &len);
667         if (ret < 0) {
668                 td_verror(td, errno, "recvfrom udp link open");
669                 return ret;
670         }
671
672         if (ntohl(msg.magic) != FIO_LINK_OPEN_CLOSE_MAGIC ||
673             ntohl(msg.cmd) != FIO_LINK_OPEN) {
674                 log_err("fio: bad udp open magic %x/%x\n", ntohl(msg.magic),
675                                                                 ntohl(msg.cmd));
676                 return -1;
677         }
678
679         return 0;
680 }
681
682 static int fio_netio_udp_send_open(struct thread_data *td, struct fio_file *f)
683 {
684         struct netio_data *nd = td->io_ops->data;
685         struct udp_close_msg msg;
686         struct sockaddr *to = (struct sockaddr *) &nd->addr;
687         int ret;
688
689         msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
690         msg.cmd = htonl(FIO_LINK_OPEN);
691
692         ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to,
693                         sizeof(nd->addr));
694         if (ret < 0) {
695                 td_verror(td, errno, "sendto udp link open");
696                 return ret;
697         }
698
699         return 0;
700 }
701
702 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
703 {
704         int ret;
705         struct netio_options *o = td->eo;
706
707         if (o->listen)
708                 ret = fio_netio_accept(td, f);
709         else
710                 ret = fio_netio_connect(td, f);
711
712         if (ret) {
713                 f->fd = -1;
714                 return ret;
715         }
716
717         if (o->proto == FIO_TYPE_UDP) {
718                 if (td_write(td))
719                         ret = fio_netio_udp_send_open(td, f);
720                 else {
721                         int state;
722
723                         state = td->runstate;
724                         td_set_runstate(td, TD_SETTING_UP);
725                         ret = fio_netio_udp_recv_open(td, f);
726                         td_set_runstate(td, state);
727                 }
728         }
729
730         if (ret)
731                 fio_netio_close_file(td, f);
732
733         return ret;
734 }
735
736 static int fio_netio_setup_connect_inet(struct thread_data *td,
737                                         const char *host, unsigned short port)
738 {
739         struct netio_data *nd = td->io_ops->data;
740
741         if (!host) {
742                 log_err("fio: connect with no host to connect to.\n");
743                 if (td_read(td))
744                         log_err("fio: did you forget to set 'listen'?\n");
745
746                 td_verror(td, EINVAL, "no hostname= set");
747                 return 1;
748         }
749
750         nd->addr.sin_family = AF_INET;
751         nd->addr.sin_port = htons(port);
752
753         if (inet_aton(host, &nd->addr.sin_addr) != 1) {
754                 struct hostent *hent;
755
756                 hent = gethostbyname(host);
757                 if (!hent) {
758                         td_verror(td, errno, "gethostbyname");
759                         return 1;
760                 }
761
762                 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
763         }
764
765         return 0;
766 }
767
768 static int fio_netio_setup_connect_unix(struct thread_data *td,
769                                         const char *path)
770 {
771         struct netio_data *nd = td->io_ops->data;
772         struct sockaddr_un *soun = &nd->addr_un;
773
774         soun->sun_family = AF_UNIX;
775         strcpy(soun->sun_path, path);
776         return 0;
777 }
778
779 static int fio_netio_setup_connect(struct thread_data *td)
780 {
781         struct netio_options *o = td->eo;
782
783         if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
784                 return fio_netio_setup_connect_inet(td, td->o.filename,o->port);
785         else
786                 return fio_netio_setup_connect_unix(td, td->o.filename);
787 }
788
789 static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path)
790 {
791         struct netio_data *nd = td->io_ops->data;
792         struct sockaddr_un *addr = &nd->addr_un;
793         mode_t mode;
794         int len, fd;
795
796         fd = socket(AF_UNIX, SOCK_STREAM, 0);
797         if (fd < 0) {
798                 log_err("fio: socket: %s\n", strerror(errno));
799                 return -1;
800         }
801
802         mode = umask(000);
803
804         memset(addr, 0, sizeof(*addr));
805         addr->sun_family = AF_UNIX;
806         strcpy(addr->sun_path, path);
807         unlink(path);
808
809         len = sizeof(addr->sun_family) + strlen(path) + 1;
810
811         if (bind(fd, (struct sockaddr *) addr, len) < 0) {
812                 log_err("fio: bind: %s\n", strerror(errno));
813                 close(fd);
814                 return -1;
815         }
816
817         umask(mode);
818         nd->listenfd = fd;
819         return 0;
820 }
821
822 static int fio_netio_setup_listen_inet(struct thread_data *td, short port)
823 {
824         struct netio_data *nd = td->io_ops->data;
825         struct netio_options *o = td->eo;
826         struct ip_mreq mr;
827         struct sockaddr_in sin;
828         int fd, opt, type;
829
830         memset(&sin, 0, sizeof(sin));
831         if (o->proto == FIO_TYPE_TCP)
832                 type = SOCK_STREAM;
833         else
834                 type = SOCK_DGRAM;
835
836         fd = socket(AF_INET, type, 0);
837         if (fd < 0) {
838                 td_verror(td, errno, "socket");
839                 return 1;
840         }
841
842         opt = 1;
843         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *) &opt, sizeof(opt)) < 0) {
844                 td_verror(td, errno, "setsockopt");
845                 close(fd);
846                 return 1;
847         }
848 #ifdef SO_REUSEPORT
849         if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, (void *) &opt, sizeof(opt)) < 0) {
850                 td_verror(td, errno, "setsockopt");
851                 close(fd);
852                 return 1;
853         }
854 #endif
855
856         if (td->o.filename){
857                 if(o->proto != FIO_TYPE_UDP ||
858                    !fio_netio_is_multicast(td->o.filename)) {
859                         log_err("fio: hostname not valid for non-multicast inbound network IO\n");
860                         close(fd);
861                         return 1;
862                 }
863
864                 inet_aton(td->o.filename, &sin.sin_addr);
865
866                 mr.imr_multiaddr = sin.sin_addr;
867                 if (o->interface) {
868                         if (inet_aton(o->interface, &mr.imr_interface) == 0) {
869                                 log_err("fio: interface not valid interface IP\n");
870                                 close(fd);
871                                 return 1;
872                         }
873                 } else {
874                         mr.imr_interface.s_addr = htonl(INADDR_ANY);
875                 }
876                 if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr, sizeof(mr)) < 0) {
877                         td_verror(td, errno, "setsockopt IP_ADD_MEMBERSHIP");
878                         close(fd);
879                         return 1;
880                 }
881         }
882
883         nd->addr.sin_family = AF_INET;
884         nd->addr.sin_addr.s_addr = sin.sin_addr.s_addr ? sin.sin_addr.s_addr : htonl(INADDR_ANY);
885         nd->addr.sin_port = htons(port);
886
887         if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
888                 td_verror(td, errno, "bind");
889                 return 1;
890         }
891
892         nd->listenfd = fd;
893         return 0;
894 }
895
896 static int fio_netio_setup_listen(struct thread_data *td)
897 {
898         struct netio_data *nd = td->io_ops->data;
899         struct netio_options *o = td->eo;
900         int ret;
901
902         if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
903                 ret = fio_netio_setup_listen_inet(td, o->port);
904         else
905                 ret = fio_netio_setup_listen_unix(td, td->o.filename);
906
907         if (ret)
908                 return ret;
909         if (o->proto == FIO_TYPE_UDP)
910                 return 0;
911
912         if (listen(nd->listenfd, 10) < 0) {
913                 td_verror(td, errno, "listen");
914                 nd->listenfd = -1;
915                 return 1;
916         }
917
918         return 0;
919 }
920
921 static int fio_netio_init(struct thread_data *td)
922 {
923         struct netio_options *o = td->eo;
924         int ret;
925
926 #ifdef WIN32
927         WSADATA wsd;
928         WSAStartup(MAKEWORD(2,2), &wsd);
929 #endif
930
931         if (td_random(td)) {
932                 log_err("fio: network IO can't be random\n");
933                 return 1;
934         }
935
936         if (o->proto == FIO_TYPE_UNIX && o->port) {
937                 log_err("fio: network IO port not valid with unix socket\n");
938                 return 1;
939         } else if (o->proto != FIO_TYPE_UNIX && !o->port) {
940                 log_err("fio: network IO requires port for tcp or udp\n");
941                 return 1;
942         }
943
944         if (o->proto != FIO_TYPE_TCP) {
945                 if (o->listen) {
946                         log_err("fio: listen only valid for TCP proto IO\n");
947                         return 1;
948                 }
949                 if (td_rw(td)) {
950                         log_err("fio: datagram network connections must be"
951                                    " read OR write\n");
952                         return 1;
953                 }
954                 if (o->proto == FIO_TYPE_UNIX && !td->o.filename) {
955                         log_err("fio: UNIX sockets need host/filename\n");
956                         return 1;
957                 }
958                 o->listen = td_read(td);
959         }
960
961         if (o->listen)
962                 ret = fio_netio_setup_listen(td);
963         else
964                 ret = fio_netio_setup_connect(td);
965
966         return ret;
967 }
968
969 static void fio_netio_cleanup(struct thread_data *td)
970 {
971         struct netio_data *nd = td->io_ops->data;
972
973         if (nd) {
974                 if (nd->listenfd != -1)
975                         close(nd->listenfd);
976                 if (nd->pipes[0] != -1)
977                         close(nd->pipes[0]);
978                 if (nd->pipes[1] != -1)
979                         close(nd->pipes[1]);
980
981                 free(nd);
982         }
983 }
984
985 static int fio_netio_setup(struct thread_data *td)
986 {
987         struct netio_data *nd;
988
989         if (!td->files_index) {
990                 add_file(td, td->o.filename ?: "net");
991                 td->o.nr_files = td->o.nr_files ?: 1;
992         }
993
994         if (!td->io_ops->data) {
995                 nd = malloc(sizeof(*nd));;
996
997                 memset(nd, 0, sizeof(*nd));
998                 nd->listenfd = -1;
999                 nd->pipes[0] = nd->pipes[1] = -1;
1000                 td->io_ops->data = nd;
1001         }
1002
1003         return 0;
1004 }
1005
1006 static void fio_netio_terminate(struct thread_data *td)
1007 {
1008         kill(td->pid, SIGUSR2);
1009 }
1010
1011 #ifdef CONFIG_LINUX_SPLICE
1012 static int fio_netio_setup_splice(struct thread_data *td)
1013 {
1014         struct netio_data *nd;
1015
1016         fio_netio_setup(td);
1017
1018         nd = td->io_ops->data;
1019         if (nd) {
1020                 if (pipe(nd->pipes) < 0)
1021                         return 1;
1022
1023                 nd->use_splice = 1;
1024                 return 0;
1025         }
1026
1027         return 1;
1028 }
1029
1030 static struct ioengine_ops ioengine_splice = {
1031         .name                   = "netsplice",
1032         .version                = FIO_IOOPS_VERSION,
1033         .prep                   = fio_netio_prep,
1034         .queue                  = fio_netio_queue,
1035         .setup                  = fio_netio_setup_splice,
1036         .init                   = fio_netio_init,
1037         .cleanup                = fio_netio_cleanup,
1038         .open_file              = fio_netio_open_file,
1039         .close_file             = fio_netio_close_file,
1040         .terminate              = fio_netio_terminate,
1041         .options                = options,
1042         .option_struct_size     = sizeof(struct netio_options),
1043         .flags                  = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
1044                                   FIO_PIPEIO,
1045 };
1046 #endif
1047
1048 static struct ioengine_ops ioengine_rw = {
1049         .name                   = "net",
1050         .version                = FIO_IOOPS_VERSION,
1051         .prep                   = fio_netio_prep,
1052         .queue                  = fio_netio_queue,
1053         .setup                  = fio_netio_setup,
1054         .init                   = fio_netio_init,
1055         .cleanup                = fio_netio_cleanup,
1056         .open_file              = fio_netio_open_file,
1057         .close_file             = fio_netio_close_file,
1058         .terminate              = fio_netio_terminate,
1059         .options                = options,
1060         .option_struct_size     = sizeof(struct netio_options),
1061         .flags                  = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
1062                                   FIO_PIPEIO | FIO_BIT_BASED,
1063 };
1064
1065 static int str_hostname_cb(void *data, const char *input)
1066 {
1067         struct netio_options *o = data;
1068
1069         if (o->td->o.filename)
1070                 free(o->td->o.filename);
1071         o->td->o.filename = strdup(input);
1072         return 0;
1073 }
1074
1075 static void fio_init fio_netio_register(void)
1076 {
1077         register_ioengine(&ioengine_rw);
1078 #ifdef CONFIG_LINUX_SPLICE
1079         register_ioengine(&ioengine_splice);
1080 #endif
1081 }
1082
1083 static void fio_exit fio_netio_unregister(void)
1084 {
1085         unregister_ioengine(&ioengine_rw);
1086 #ifdef CONFIG_LINUX_SPLICE
1087         unregister_ioengine(&ioengine_splice);
1088 #endif
1089 }