faa08d5f6741b10641c36f12de933f88406846fe
[fio.git] / engines / net.c
1 /*
2  * net engine
3  *
4  * IO engine that reads/writes to/from sockets.
5  *
6  */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <errno.h>
11 #include <assert.h>
12 #include <netinet/in.h>
13 #include <arpa/inet.h>
14 #include <netdb.h>
15 #include <sys/poll.h>
16 #include <sys/types.h>
17 #include <sys/stat.h>
18 #include <sys/socket.h>
19 #include <sys/un.h>
20
21 #include "../fio.h"
22
23 struct netio_data {
24         int listenfd;
25         int send_to_net;
26         int use_splice;
27         int type;
28         int pipes[2];
29         char host[64];
30         struct sockaddr_in addr;
31         struct sockaddr_un addr_un;
32 };
33
34 struct udp_close_msg {
35         uint32_t magic;
36         uint32_t cmd;
37 };
38
39 enum {
40         FIO_LINK_CLOSE = 0x89,
41         FIO_LINK_CLOSE_MAGIC = 0x6c696e6b,
42
43         FIO_TYPE_TCP    = 1,
44         FIO_TYPE_UDP    = 2,
45         FIO_TYPE_UNIX   = 3,
46 };
47
48 /*
49  * Return -1 for error and 'nr events' for a positive number
50  * of events
51  */
52 static int poll_wait(struct thread_data *td, int fd, short events)
53 {
54         struct pollfd pfd;
55         int ret;
56
57         while (!td->terminate) {
58                 pfd.fd = fd;
59                 pfd.events = events;
60                 ret = poll(&pfd, 1, -1);
61                 if (ret < 0) {
62                         if (errno == EINTR)
63                                 break;
64
65                         td_verror(td, errno, "poll");
66                         return -1;
67                 } else if (!ret)
68                         continue;
69
70                 break;
71         }
72
73         if (pfd.revents & events)
74                 return 1;
75
76         return -1;
77 }
78
79 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
80 {
81         struct netio_data *nd = td->io_ops->data;
82
83         /*
84          * Make sure we don't see spurious reads to a receiver, and vice versa
85          */
86         if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
87             (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
88                 td_verror(td, EINVAL, "bad direction");
89                 return 1;
90         }
91                 
92         return 0;
93 }
94
95 #ifdef FIO_HAVE_SPLICE
96 static int splice_io_u(int fdin, int fdout, unsigned int len)
97 {
98         int bytes = 0;
99
100         while (len) {
101                 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
102
103                 if (ret < 0) {
104                         if (!bytes)
105                                 bytes = ret;
106
107                         break;
108                 } else if (!ret)
109                         break;
110
111                 bytes += ret;
112                 len -= ret;
113         }
114
115         return bytes;
116 }
117
118 /*
119  * Receive bytes from a socket and fill them into the internal pipe
120  */
121 static int splice_in(struct thread_data *td, struct io_u *io_u)
122 {
123         struct netio_data *nd = td->io_ops->data;
124
125         return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
126 }
127
128 /*
129  * Transmit 'len' bytes from the internal pipe
130  */
131 static int splice_out(struct thread_data *td, struct io_u *io_u,
132                       unsigned int len)
133 {
134         struct netio_data *nd = td->io_ops->data;
135
136         return splice_io_u(nd->pipes[0], io_u->file->fd, len);
137 }
138
139 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
140 {
141         struct iovec iov = {
142                 .iov_base = io_u->xfer_buf,
143                 .iov_len = len,
144         };
145         int bytes = 0;
146
147         while (iov.iov_len) {
148                 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
149
150                 if (ret < 0) {
151                         if (!bytes)
152                                 bytes = ret;
153                         break;
154                 } else if (!ret)
155                         break;
156
157                 iov.iov_len -= ret;
158                 iov.iov_base += ret;
159                 bytes += ret;
160         }
161
162         return bytes;
163
164 }
165
166 /*
167  * vmsplice() pipe to io_u buffer
168  */
169 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
170                              unsigned int len)
171 {
172         struct netio_data *nd = td->io_ops->data;
173
174         return vmsplice_io_u(io_u, nd->pipes[0], len);
175 }
176
177 /*
178  * vmsplice() io_u to pipe
179  */
180 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
181 {
182         struct netio_data *nd = td->io_ops->data;
183
184         return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
185 }
186
187 /*
188  * splice receive - transfer socket data into a pipe using splice, then map
189  * that pipe data into the io_u using vmsplice.
190  */
191 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
192 {
193         int ret;
194
195         ret = splice_in(td, io_u);
196         if (ret > 0)
197                 return vmsplice_io_u_out(td, io_u, ret);
198
199         return ret;
200 }
201
202 /*
203  * splice transmit - map data from the io_u into a pipe by using vmsplice,
204  * then transfer that pipe to a socket using splice.
205  */
206 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
207 {
208         int ret;
209
210         ret = vmsplice_io_u_in(td, io_u);
211         if (ret > 0)
212                 return splice_out(td, io_u, ret);
213
214         return ret;
215 }
216 #else
217 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
218 {
219         errno = EOPNOTSUPP;
220         return -1;
221 }
222
223 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
224 {
225         errno = EOPNOTSUPP;
226         return -1;
227 }
228 #endif
229
230 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
231 {
232         struct netio_data *nd = td->io_ops->data;
233         int ret, flags = OS_MSG_DONTWAIT;
234
235         do {
236                 if (nd->type == FIO_TYPE_UDP) {
237                         struct sockaddr *to = (struct sockaddr *) &nd->addr;
238
239                         ret = sendto(io_u->file->fd, io_u->xfer_buf,
240                                         io_u->xfer_buflen, flags, to,
241                                         sizeof(*to));
242                 } else {
243                         /*
244                          * if we are going to write more, set MSG_MORE
245                          */
246 #ifdef MSG_MORE
247                         if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
248                             td->o.size)
249                                 flags |= MSG_MORE;
250 #endif
251                         ret = send(io_u->file->fd, io_u->xfer_buf,
252                                         io_u->xfer_buflen, flags);
253                 }
254                 if (ret > 0)
255                         break;
256
257                 ret = poll_wait(td, io_u->file->fd, POLLOUT);
258                 if (ret <= 0)
259                         break;
260
261                 flags &= ~OS_MSG_DONTWAIT;
262         } while (1);
263
264         return ret;
265 }
266
267 static int is_udp_close(struct io_u *io_u, int len)
268 {
269         struct udp_close_msg *msg;
270
271         if (len != sizeof(struct udp_close_msg))
272                 return 0;
273
274         msg = io_u->xfer_buf;
275         if (ntohl(msg->magic) != FIO_LINK_CLOSE_MAGIC)
276                 return 0;
277         if (ntohl(msg->cmd) != FIO_LINK_CLOSE)
278                 return 0;
279
280         return 1;
281 }
282
283 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
284 {
285         struct netio_data *nd = td->io_ops->data;
286         int ret, flags = OS_MSG_DONTWAIT;
287
288         do {
289                 if (nd->type == FIO_TYPE_UDP) {
290                         fio_socklen_t len = sizeof(nd->addr);
291                         struct sockaddr *from = (struct sockaddr *) &nd->addr;
292
293                         ret = recvfrom(io_u->file->fd, io_u->xfer_buf,
294                                         io_u->xfer_buflen, flags, from, &len);
295                         if (is_udp_close(io_u, ret)) {
296                                 td->done = 1;
297                                 return 0;
298                         }
299                 } else {
300                         ret = recv(io_u->file->fd, io_u->xfer_buf,
301                                         io_u->xfer_buflen, flags);
302                 }
303                 if (ret > 0)
304                         break;
305
306                 ret = poll_wait(td, io_u->file->fd, POLLIN);
307                 if (ret <= 0)
308                         break;
309                 flags &= ~OS_MSG_DONTWAIT;
310                 flags |= MSG_WAITALL;
311         } while (1);
312
313         return ret;
314 }
315
316 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
317 {
318         struct netio_data *nd = td->io_ops->data;
319         int ret;
320
321         fio_ro_check(td, io_u);
322
323         if (io_u->ddir == DDIR_WRITE) {
324                 if (!nd->use_splice || nd->type == FIO_TYPE_UDP ||
325                     nd->type == FIO_TYPE_UNIX) 
326                         ret = fio_netio_send(td, io_u);
327                 else
328                         ret = fio_netio_splice_out(td, io_u);
329         } else if (io_u->ddir == DDIR_READ) {
330                 if (!nd->use_splice || nd->type == FIO_TYPE_UDP ||
331                     nd->type == FIO_TYPE_UDP)
332                         ret = fio_netio_recv(td, io_u);
333                 else
334                         ret = fio_netio_splice_in(td, io_u);
335         } else
336                 ret = 0;        /* must be a SYNC */
337
338         if (ret != (int) io_u->xfer_buflen) {
339                 if (ret >= 0) {
340                         io_u->resid = io_u->xfer_buflen - ret;
341                         io_u->error = 0;
342                         return FIO_Q_COMPLETED;
343                 } else {
344                         int err = errno;
345
346                         if (io_u->ddir == DDIR_WRITE && err == EMSGSIZE)
347                                 return FIO_Q_BUSY;
348
349                         io_u->error = err;
350                 }
351         }
352
353         if (io_u->error)
354                 td_verror(td, io_u->error, "xfer");
355
356         return FIO_Q_COMPLETED;
357 }
358
359 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
360 {
361         struct netio_data *nd = td->io_ops->data;
362         int type, domain;
363
364         if (nd->type == FIO_TYPE_TCP) {
365                 domain = AF_INET;
366                 type = SOCK_STREAM;
367         } else if (nd->type == FIO_TYPE_UDP) {
368                 domain = AF_INET;
369                 type = SOCK_DGRAM;
370         } else if (nd->type == FIO_TYPE_UNIX) {
371                 domain = AF_UNIX;
372                 type = SOCK_STREAM;
373         } else {
374                 log_err("fio: bad network type %d\n", nd->type);
375                 f->fd = -1;
376                 return 1;
377         }
378
379         f->fd = socket(domain, type, 0);
380         if (f->fd < 0) {
381                 td_verror(td, errno, "socket");
382                 return 1;
383         }
384
385         if (nd->type == FIO_TYPE_UDP)
386                 return 0;
387         else if (nd->type == FIO_TYPE_TCP) {
388                 fio_socklen_t len = sizeof(nd->addr);
389
390                 if (connect(f->fd, (struct sockaddr *) &nd->addr, len) < 0) {
391                         td_verror(td, errno, "connect");
392                         return 1;
393                 }
394         } else {
395                 struct sockaddr_un *addr = &nd->addr_un;
396                 fio_socklen_t len;
397
398                 len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
399
400                 if (connect(f->fd, (struct sockaddr *) addr, len) < 0) {
401                         td_verror(td, errno, "connect");
402                         return 1;
403                 }
404         }
405
406         return 0;
407 }
408
409 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
410 {
411         struct netio_data *nd = td->io_ops->data;
412         fio_socklen_t socklen = sizeof(nd->addr);
413
414         if (nd->type == FIO_TYPE_UDP) {
415                 f->fd = nd->listenfd;
416                 return 0;
417         }
418
419         log_info("fio: waiting for connection\n");
420
421         if (poll_wait(td, nd->listenfd, POLLIN) < 0)
422                 return 1;
423
424         f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
425         if (f->fd < 0) {
426                 td_verror(td, errno, "accept");
427                 return 1;
428         }
429
430         return 0;
431 }
432
433 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
434 {
435         int ret;
436
437         if (td_read(td))
438                 ret = fio_netio_accept(td, f);
439         else
440                 ret = fio_netio_connect(td, f);
441
442         if (ret)
443                 f->fd = -1;
444         return ret;
445 }
446
447 static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f)
448 {
449         struct netio_data *nd = td->io_ops->data;
450         struct udp_close_msg msg;
451         struct sockaddr *to = (struct sockaddr *) &nd->addr;
452         int ret;
453
454         msg.magic = htonl(FIO_LINK_CLOSE_MAGIC);
455         msg.cmd = htonl(FIO_LINK_CLOSE);
456
457         ret = sendto(f->fd, &msg, sizeof(msg), MSG_WAITALL, to,
458                         sizeof(nd->addr));
459         if (ret < 0)
460                 td_verror(td, errno, "sendto udp link close");
461 }
462
463 static int fio_netio_close_file(struct thread_data *td, struct fio_file *f)
464 {
465         struct netio_data *nd = td->io_ops->data;
466
467         /*
468          * If this is an UDP connection, notify the receiver that we are
469          * closing down the link
470          */
471         if (nd->type == FIO_TYPE_UDP)
472                 fio_netio_udp_close(td, f);
473
474         return generic_close_file(td, f);
475 }
476
477 static int fio_netio_setup_connect_inet(struct thread_data *td,
478                                         const char *host, unsigned short port)
479 {
480         struct netio_data *nd = td->io_ops->data;
481
482         nd->addr.sin_family = AF_INET;
483         nd->addr.sin_port = htons(port);
484
485         if (inet_aton(host, &nd->addr.sin_addr) != 1) {
486                 struct hostent *hent;
487
488                 hent = gethostbyname(host);
489                 if (!hent) {
490                         td_verror(td, errno, "gethostbyname");
491                         return 1;
492                 }
493
494                 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
495         }
496
497         return 0;
498 }
499
500 static int fio_netio_setup_connect_unix(struct thread_data *td,
501                                         const char *path)
502 {
503         struct netio_data *nd = td->io_ops->data;
504         struct sockaddr_un *soun = &nd->addr_un;
505
506         soun->sun_family = AF_UNIX;
507         strcpy(soun->sun_path, path);
508         return 0;
509 }
510
511 static int fio_netio_setup_connect(struct thread_data *td, const char *host,
512                                    unsigned short port)
513 {
514         struct netio_data *nd = td->io_ops->data;
515
516         if (nd->type == FIO_TYPE_UDP || nd->type == FIO_TYPE_TCP)
517                 return fio_netio_setup_connect_inet(td, host, port);
518         else
519                 return fio_netio_setup_connect_unix(td, host);
520 }
521
522 static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path)
523 {
524         struct netio_data *nd = td->io_ops->data;
525         struct sockaddr_un *addr = &nd->addr_un;
526         mode_t mode;
527         int len, fd;
528
529         fd = socket(AF_UNIX, SOCK_STREAM, 0);
530         if (fd < 0) {
531                 log_err("fio: socket: %s\n", strerror(errno));
532                 return -1;
533         }
534
535         mode = umask(000);
536
537         memset(addr, 0, sizeof(*addr));
538         addr->sun_family = AF_UNIX;
539         strcpy(addr->sun_path, path);
540         unlink(path);
541
542         len = sizeof(addr->sun_family) + strlen(path) + 1;
543
544         if (bind(fd, (struct sockaddr *) addr, len) < 0) {
545                 log_err("fio: bind: %s\n", strerror(errno));
546                 return -1;
547         }
548
549         umask(mode);
550         nd->listenfd = fd;
551         return 0;
552 }
553
554 static int fio_netio_setup_listen_inet(struct thread_data *td, short port)
555 {
556         struct netio_data *nd = td->io_ops->data;
557         int fd, opt, type;
558
559         if (nd->type == FIO_TYPE_TCP)
560                 type = SOCK_STREAM;
561         else
562                 type = SOCK_DGRAM;
563
564         fd = socket(AF_INET, type, 0);
565         if (fd < 0) {
566                 td_verror(td, errno, "socket");
567                 return 1;
568         }
569
570         opt = 1;
571         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
572                 td_verror(td, errno, "setsockopt");
573                 return 1;
574         }
575 #ifdef SO_REUSEPORT
576         if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
577                 td_verror(td, errno, "setsockopt");
578                 return 1;
579         }
580 #endif
581
582         nd->addr.sin_family = AF_INET;
583         nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
584         nd->addr.sin_port = htons(port);
585
586         if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
587                 td_verror(td, errno, "bind");
588                 return 1;
589         }
590
591         nd->listenfd = fd;
592         return 0;
593 }
594
595 static int fio_netio_setup_listen(struct thread_data *td, const char *path,
596                                   short port)
597 {
598         struct netio_data *nd = td->io_ops->data;
599         int ret;
600
601         if (nd->type == FIO_TYPE_UDP || nd->type == FIO_TYPE_TCP)
602                 ret = fio_netio_setup_listen_inet(td, port);
603         else
604                 ret = fio_netio_setup_listen_unix(td, path);
605
606         if (ret)
607                 return ret;
608         if (nd->type == FIO_TYPE_UDP)
609                 return 0;
610
611         if (listen(nd->listenfd, 10) < 0) {
612                 td_verror(td, errno, "listen");
613                 nd->listenfd = -1;
614                 return 1;
615         }
616
617         return 0;
618 }
619
620 static int fio_netio_init(struct thread_data *td)
621 {
622         struct netio_data *nd = td->io_ops->data;
623         unsigned int port;
624         char host[64], buf[128];
625         char *sep, *portp, *modep;
626         int ret;
627
628         if (td_rw(td)) {
629                 log_err("fio: network connections must be read OR write\n");
630                 return 1;
631         }
632         if (td_random(td)) {
633                 log_err("fio: network IO can't be random\n");
634                 return 1;
635         }
636
637         strcpy(buf, td->o.filename);
638
639         sep = strchr(buf, ',');
640         if (!sep)
641                 goto bad_host;
642
643         *sep = '\0';
644         sep++;
645         strcpy(host, buf);
646         if (!strlen(host))
647                 goto bad_host;
648
649         modep = NULL;
650         portp = sep;
651         sep = strchr(portp, ',');
652         if (sep) {
653                 *sep = '\0';
654                 modep = sep + 1;
655         }
656
657         if (!strncmp("tcp", modep, strlen(modep)) ||
658             !strncmp("TCP", modep, strlen(modep)))
659                 nd->type = FIO_TYPE_TCP;
660         else if (!strncmp("udp", modep, strlen(modep)) ||
661                  !strncmp("UDP", modep, strlen(modep)))
662                 nd->type = FIO_TYPE_UDP;
663         else if (!strncmp("unix", modep, strlen(modep)) ||
664                  !strncmp("UNIX", modep, strlen(modep)))
665                 nd->type = FIO_TYPE_UNIX;
666         else
667                 goto bad_host;
668
669         if (nd->type != FIO_TYPE_UNIX) {
670                 port = strtol(portp, NULL, 10);
671                 if (!port || port > 65535)
672                         goto bad_host;
673         } else
674                 port = 0;
675
676         if (td_read(td)) {
677                 nd->send_to_net = 0;
678                 ret = fio_netio_setup_listen(td, host, port);
679         } else {
680                 nd->send_to_net = 1;
681                 ret = fio_netio_setup_connect(td, host, port);
682         }
683
684         return ret;
685 bad_host:
686         log_err("fio: bad network host/port/protocol: %s\n", td->o.filename);
687         return 1;
688 }
689
690 static void fio_netio_cleanup(struct thread_data *td)
691 {
692         struct netio_data *nd = td->io_ops->data;
693
694         if (nd) {
695                 if (nd->listenfd != -1)
696                         close(nd->listenfd);
697                 if (nd->pipes[0] != -1)
698                         close(nd->pipes[0]);
699                 if (nd->pipes[1] != -1)
700                         close(nd->pipes[1]);
701
702                 free(nd);
703         }
704 }
705
706 static int fio_netio_setup(struct thread_data *td)
707 {
708         struct netio_data *nd;
709
710         if (!td->io_ops->data) {
711                 nd = malloc(sizeof(*nd));;
712
713                 memset(nd, 0, sizeof(*nd));
714                 nd->listenfd = -1;
715                 nd->pipes[0] = nd->pipes[1] = -1;
716                 td->io_ops->data = nd;
717         }
718
719         return 0;
720 }
721
722 #ifdef FIO_HAVE_SPLICE
723 static int fio_netio_setup_splice(struct thread_data *td)
724 {
725         struct netio_data *nd;
726
727         fio_netio_setup(td);
728
729         nd = td->io_ops->data;
730         if (nd) {
731                 if (pipe(nd->pipes) < 0)
732                         return 1;
733
734                 nd->use_splice = 1;
735                 return 0;
736         }
737
738         return 1;
739 }
740
741 static struct ioengine_ops ioengine_splice = {
742         .name           = "netsplice",
743         .version        = FIO_IOOPS_VERSION,
744         .prep           = fio_netio_prep,
745         .queue          = fio_netio_queue,
746         .setup          = fio_netio_setup_splice,
747         .init           = fio_netio_init,
748         .cleanup        = fio_netio_cleanup,
749         .open_file      = fio_netio_open_file,
750         .close_file     = generic_close_file,
751         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
752                           FIO_SIGTERM | FIO_PIPEIO,
753 };
754 #endif
755
756 static struct ioengine_ops ioengine_rw = {
757         .name           = "net",
758         .version        = FIO_IOOPS_VERSION,
759         .prep           = fio_netio_prep,
760         .queue          = fio_netio_queue,
761         .setup          = fio_netio_setup,
762         .init           = fio_netio_init,
763         .cleanup        = fio_netio_cleanup,
764         .open_file      = fio_netio_open_file,
765         .close_file     = fio_netio_close_file,
766         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
767                           FIO_SIGTERM | FIO_PIPEIO,
768 };
769
770 static void fio_init fio_netio_register(void)
771 {
772         register_ioengine(&ioengine_rw);
773 #ifdef FIO_HAVE_SPLICE
774         register_ioengine(&ioengine_splice);
775 #endif
776 }
777
778 static void fio_exit fio_netio_unregister(void)
779 {
780         unregister_ioengine(&ioengine_rw);
781 #ifdef FIO_HAVE_SPLICE
782         unregister_ioengine(&ioengine_splice);
783 #endif
784 }