d6821a4092e7898aa0fea956a6069931fde3a8b2
[fio.git] / engines / net.c
1 /*
2  * net engine
3  *
4  * IO engine that reads/writes to/from sockets.
5  *
6  */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <errno.h>
11 #include <assert.h>
12 #include <netinet/in.h>
13 #include <arpa/inet.h>
14 #include <netdb.h>
15 #include <sys/poll.h>
16 #include <sys/types.h>
17 #include <sys/stat.h>
18 #include <sys/socket.h>
19 #include <sys/un.h>
20
21 #include "../fio.h"
22
23 struct netio_data {
24         int listenfd;
25         int send_to_net;
26         int use_splice;
27         int type;
28         int pipes[2];
29         char host[64];
30         struct sockaddr_in addr;
31         struct sockaddr_un addr_un;
32 };
33
34 struct udp_close_msg {
35         uint32_t magic;
36         uint32_t cmd;
37 };
38
39 enum {
40         FIO_LINK_CLOSE = 0x89,
41         FIO_LINK_CLOSE_MAGIC = 0x6c696e6b,
42
43         FIO_TYPE_TCP    = 1,
44         FIO_TYPE_UDP    = 2,
45         FIO_TYPE_UNIX   = 3,
46 };
47
48 /*
49  * Return -1 for error and 'nr events' for a positive number
50  * of events
51  */
52 static int poll_wait(struct thread_data *td, int fd, short events)
53 {
54         struct pollfd pfd;
55         int ret;
56
57         while (!td->terminate) {
58                 pfd.fd = fd;
59                 pfd.events = events;
60                 ret = poll(&pfd, 1, -1);
61                 if (ret < 0) {
62                         if (errno == EINTR)
63                                 break;
64
65                         td_verror(td, errno, "poll");
66                         return -1;
67                 } else if (!ret)
68                         continue;
69
70                 break;
71         }
72
73         if (pfd.revents & events)
74                 return 1;
75
76         return -1;
77 }
78
79 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
80 {
81         struct netio_data *nd = td->io_ops->data;
82
83         /*
84          * Make sure we don't see spurious reads to a receiver, and vice versa
85          */
86         if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
87             (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
88                 td_verror(td, EINVAL, "bad direction");
89                 return 1;
90         }
91                 
92         return 0;
93 }
94
95 #ifdef FIO_HAVE_SPLICE
96 static int splice_io_u(int fdin, int fdout, unsigned int len)
97 {
98         int bytes = 0;
99
100         while (len) {
101                 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
102
103                 if (ret < 0) {
104                         if (!bytes)
105                                 bytes = ret;
106
107                         break;
108                 } else if (!ret)
109                         break;
110
111                 bytes += ret;
112                 len -= ret;
113         }
114
115         return bytes;
116 }
117
118 /*
119  * Receive bytes from a socket and fill them into the internal pipe
120  */
121 static int splice_in(struct thread_data *td, struct io_u *io_u)
122 {
123         struct netio_data *nd = td->io_ops->data;
124
125         return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
126 }
127
128 /*
129  * Transmit 'len' bytes from the internal pipe
130  */
131 static int splice_out(struct thread_data *td, struct io_u *io_u,
132                       unsigned int len)
133 {
134         struct netio_data *nd = td->io_ops->data;
135
136         return splice_io_u(nd->pipes[0], io_u->file->fd, len);
137 }
138
139 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
140 {
141         struct iovec iov = {
142                 .iov_base = io_u->xfer_buf,
143                 .iov_len = len,
144         };
145         int bytes = 0;
146
147         while (iov.iov_len) {
148                 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
149
150                 if (ret < 0) {
151                         if (!bytes)
152                                 bytes = ret;
153                         break;
154                 } else if (!ret)
155                         break;
156
157                 iov.iov_len -= ret;
158                 iov.iov_base += ret;
159                 bytes += ret;
160         }
161
162         return bytes;
163
164 }
165
166 /*
167  * vmsplice() pipe to io_u buffer
168  */
169 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
170                              unsigned int len)
171 {
172         struct netio_data *nd = td->io_ops->data;
173
174         return vmsplice_io_u(io_u, nd->pipes[0], len);
175 }
176
177 /*
178  * vmsplice() io_u to pipe
179  */
180 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
181 {
182         struct netio_data *nd = td->io_ops->data;
183
184         return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
185 }
186
187 /*
188  * splice receive - transfer socket data into a pipe using splice, then map
189  * that pipe data into the io_u using vmsplice.
190  */
191 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
192 {
193         int ret;
194
195         ret = splice_in(td, io_u);
196         if (ret > 0)
197                 return vmsplice_io_u_out(td, io_u, ret);
198
199         return ret;
200 }
201
202 /*
203  * splice transmit - map data from the io_u into a pipe by using vmsplice,
204  * then transfer that pipe to a socket using splice.
205  */
206 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
207 {
208         int ret;
209
210         ret = vmsplice_io_u_in(td, io_u);
211         if (ret > 0)
212                 return splice_out(td, io_u, ret);
213
214         return ret;
215 }
216 #else
217 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
218 {
219         errno = EOPNOTSUPP;
220         return -1;
221 }
222
223 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
224 {
225         errno = EOPNOTSUPP;
226         return -1;
227 }
228 #endif
229
230 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
231 {
232         struct netio_data *nd = td->io_ops->data;
233         int ret, flags = OS_MSG_DONTWAIT;
234
235         do {
236                 if (nd->type == FIO_TYPE_UDP) {
237                         struct sockaddr *to = (struct sockaddr *) &nd->addr;
238
239                         ret = sendto(io_u->file->fd, io_u->xfer_buf,
240                                         io_u->xfer_buflen, flags, to,
241                                         sizeof(*to));
242                 } else {
243                         /*
244                          * if we are going to write more, set MSG_MORE
245                          */
246 #ifdef MSG_MORE
247                         if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
248                             td->o.size)
249                                 flags |= MSG_MORE;
250 #endif
251                         ret = send(io_u->file->fd, io_u->xfer_buf,
252                                         io_u->xfer_buflen, flags);
253                 }
254                 if (ret > 0)
255                         break;
256
257                 ret = poll_wait(td, io_u->file->fd, POLLOUT);
258                 if (ret <= 0)
259                         break;
260
261                 flags &= ~OS_MSG_DONTWAIT;
262         } while (1);
263
264         return ret;
265 }
266
267 static int is_udp_close(struct io_u *io_u, int len)
268 {
269         struct udp_close_msg *msg;
270
271         if (len != sizeof(struct udp_close_msg))
272                 return 0;
273
274         msg = io_u->xfer_buf;
275         if (ntohl(msg->magic) != FIO_LINK_CLOSE_MAGIC)
276                 return 0;
277         if (ntohl(msg->cmd) != FIO_LINK_CLOSE)
278                 return 0;
279
280         return 1;
281 }
282
283 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
284 {
285         struct netio_data *nd = td->io_ops->data;
286         int ret, flags = OS_MSG_DONTWAIT;
287
288         do {
289                 if (nd->type == FIO_TYPE_UDP) {
290                         fio_socklen_t len = sizeof(nd->addr);
291                         struct sockaddr *from = (struct sockaddr *) &nd->addr;
292
293                         ret = recvfrom(io_u->file->fd, io_u->xfer_buf,
294                                         io_u->xfer_buflen, flags, from, &len);
295                         if (is_udp_close(io_u, ret)) {
296                                 td->done = 1;
297                                 return 0;
298                         }
299                 } else {
300                         ret = recv(io_u->file->fd, io_u->xfer_buf,
301                                         io_u->xfer_buflen, flags);
302                 }
303                 if (ret > 0)
304                         break;
305
306                 ret = poll_wait(td, io_u->file->fd, POLLIN);
307                 if (ret <= 0)
308                         break;
309                 flags &= ~OS_MSG_DONTWAIT;
310                 flags |= MSG_WAITALL;
311         } while (1);
312
313         return ret;
314 }
315
316 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
317 {
318         struct netio_data *nd = td->io_ops->data;
319         int ret;
320
321         fio_ro_check(td, io_u);
322
323         if (io_u->ddir == DDIR_WRITE) {
324                 if (!nd->use_splice || nd->type == FIO_TYPE_UDP ||
325                     nd->type == FIO_TYPE_UNIX) 
326                         ret = fio_netio_send(td, io_u);
327                 else
328                         ret = fio_netio_splice_out(td, io_u);
329         } else if (io_u->ddir == DDIR_READ) {
330                 if (!nd->use_splice || nd->type == FIO_TYPE_UDP ||
331                     nd->type == FIO_TYPE_UDP)
332                         ret = fio_netio_recv(td, io_u);
333                 else
334                         ret = fio_netio_splice_in(td, io_u);
335         } else
336                 ret = 0;        /* must be a SYNC */
337
338         if (ret != (int) io_u->xfer_buflen) {
339                 if (ret >= 0) {
340                         io_u->resid = io_u->xfer_buflen - ret;
341                         io_u->error = 0;
342                         return FIO_Q_COMPLETED;
343                 } else {
344                         int err = errno;
345
346                         if (io_u->ddir == DDIR_WRITE && err == EMSGSIZE)
347                                 return FIO_Q_BUSY;
348
349                         io_u->error = err;
350                 }
351         }
352
353         if (io_u->error)
354                 td_verror(td, io_u->error, "xfer");
355
356         return FIO_Q_COMPLETED;
357 }
358
359 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
360 {
361         struct netio_data *nd = td->io_ops->data;
362         int type, domain;
363
364         if (nd->type == FIO_TYPE_TCP) {
365                 domain = AF_INET;
366                 type = SOCK_STREAM;
367         } else if (nd->type == FIO_TYPE_UDP) {
368                 domain = AF_INET;
369                 type = SOCK_DGRAM;
370         } else if (nd->type == FIO_TYPE_UNIX) {
371                 domain = AF_UNIX;
372                 type = SOCK_STREAM;
373         } else {
374                 log_err("fio: bad network type %d\n", nd->type);
375                 f->fd = -1;
376                 return 1;
377         }
378
379         f->fd = socket(domain, type, 0);
380         if (f->fd < 0) {
381                 td_verror(td, errno, "socket");
382                 return 1;
383         }
384
385         if (nd->type == FIO_TYPE_UDP)
386                 return 0;
387         else if (nd->type == FIO_TYPE_TCP) {
388                 fio_socklen_t len = sizeof(nd->addr);
389
390                 if (connect(f->fd, (struct sockaddr *) &nd->addr, len) < 0) {
391                         td_verror(td, errno, "connect");
392                         close(f->fd);
393                         return 1;
394                 }
395         } else {
396                 struct sockaddr_un *addr = &nd->addr_un;
397                 fio_socklen_t len;
398
399                 len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
400
401                 if (connect(f->fd, (struct sockaddr *) addr, len) < 0) {
402                         td_verror(td, errno, "connect");
403                         close(f->fd);
404                         return 1;
405                 }
406         }
407
408         return 0;
409 }
410
411 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
412 {
413         struct netio_data *nd = td->io_ops->data;
414         fio_socklen_t socklen = sizeof(nd->addr);
415
416         if (nd->type == FIO_TYPE_UDP) {
417                 f->fd = nd->listenfd;
418                 return 0;
419         }
420
421         log_info("fio: waiting for connection\n");
422
423         if (poll_wait(td, nd->listenfd, POLLIN) < 0)
424                 return 1;
425
426         f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
427         if (f->fd < 0) {
428                 td_verror(td, errno, "accept");
429                 return 1;
430         }
431
432         return 0;
433 }
434
435 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
436 {
437         int ret;
438
439         if (td_read(td))
440                 ret = fio_netio_accept(td, f);
441         else
442                 ret = fio_netio_connect(td, f);
443
444         if (ret)
445                 f->fd = -1;
446         return ret;
447 }
448
449 static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f)
450 {
451         struct netio_data *nd = td->io_ops->data;
452         struct udp_close_msg msg;
453         struct sockaddr *to = (struct sockaddr *) &nd->addr;
454         int ret;
455
456         msg.magic = htonl(FIO_LINK_CLOSE_MAGIC);
457         msg.cmd = htonl(FIO_LINK_CLOSE);
458
459         ret = sendto(f->fd, &msg, sizeof(msg), MSG_WAITALL, to,
460                         sizeof(nd->addr));
461         if (ret < 0)
462                 td_verror(td, errno, "sendto udp link close");
463 }
464
465 static int fio_netio_close_file(struct thread_data *td, struct fio_file *f)
466 {
467         struct netio_data *nd = td->io_ops->data;
468
469         /*
470          * If this is an UDP connection, notify the receiver that we are
471          * closing down the link
472          */
473         if (nd->type == FIO_TYPE_UDP)
474                 fio_netio_udp_close(td, f);
475
476         return generic_close_file(td, f);
477 }
478
479 static int fio_netio_setup_connect_inet(struct thread_data *td,
480                                         const char *host, unsigned short port)
481 {
482         struct netio_data *nd = td->io_ops->data;
483
484         nd->addr.sin_family = AF_INET;
485         nd->addr.sin_port = htons(port);
486
487         if (inet_aton(host, &nd->addr.sin_addr) != 1) {
488                 struct hostent *hent;
489
490                 hent = gethostbyname(host);
491                 if (!hent) {
492                         td_verror(td, errno, "gethostbyname");
493                         return 1;
494                 }
495
496                 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
497         }
498
499         return 0;
500 }
501
502 static int fio_netio_setup_connect_unix(struct thread_data *td,
503                                         const char *path)
504 {
505         struct netio_data *nd = td->io_ops->data;
506         struct sockaddr_un *soun = &nd->addr_un;
507
508         soun->sun_family = AF_UNIX;
509         strcpy(soun->sun_path, path);
510         return 0;
511 }
512
513 static int fio_netio_setup_connect(struct thread_data *td, const char *host,
514                                    unsigned short port)
515 {
516         struct netio_data *nd = td->io_ops->data;
517
518         if (nd->type == FIO_TYPE_UDP || nd->type == FIO_TYPE_TCP)
519                 return fio_netio_setup_connect_inet(td, host, port);
520         else
521                 return fio_netio_setup_connect_unix(td, host);
522 }
523
524 static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path)
525 {
526         struct netio_data *nd = td->io_ops->data;
527         struct sockaddr_un *addr = &nd->addr_un;
528         mode_t mode;
529         int len, fd;
530
531         fd = socket(AF_UNIX, SOCK_STREAM, 0);
532         if (fd < 0) {
533                 log_err("fio: socket: %s\n", strerror(errno));
534                 return -1;
535         }
536
537         mode = umask(000);
538
539         memset(addr, 0, sizeof(*addr));
540         addr->sun_family = AF_UNIX;
541         strcpy(addr->sun_path, path);
542         unlink(path);
543
544         len = sizeof(addr->sun_family) + strlen(path) + 1;
545
546         if (bind(fd, (struct sockaddr *) addr, len) < 0) {
547                 log_err("fio: bind: %s\n", strerror(errno));
548                 close(fd);
549                 return -1;
550         }
551
552         umask(mode);
553         nd->listenfd = fd;
554         return 0;
555 }
556
557 static int fio_netio_setup_listen_inet(struct thread_data *td, short port)
558 {
559         struct netio_data *nd = td->io_ops->data;
560         int fd, opt, type;
561
562         if (nd->type == FIO_TYPE_TCP)
563                 type = SOCK_STREAM;
564         else
565                 type = SOCK_DGRAM;
566
567         fd = socket(AF_INET, type, 0);
568         if (fd < 0) {
569                 td_verror(td, errno, "socket");
570                 return 1;
571         }
572
573         opt = 1;
574         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
575                 td_verror(td, errno, "setsockopt");
576                 return 1;
577         }
578 #ifdef SO_REUSEPORT
579         if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
580                 td_verror(td, errno, "setsockopt");
581                 return 1;
582         }
583 #endif
584
585         nd->addr.sin_family = AF_INET;
586         nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
587         nd->addr.sin_port = htons(port);
588
589         if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
590                 td_verror(td, errno, "bind");
591                 return 1;
592         }
593
594         nd->listenfd = fd;
595         return 0;
596 }
597
598 static int fio_netio_setup_listen(struct thread_data *td, const char *path,
599                                   short port)
600 {
601         struct netio_data *nd = td->io_ops->data;
602         int ret;
603
604         if (nd->type == FIO_TYPE_UDP || nd->type == FIO_TYPE_TCP)
605                 ret = fio_netio_setup_listen_inet(td, port);
606         else
607                 ret = fio_netio_setup_listen_unix(td, path);
608
609         if (ret)
610                 return ret;
611         if (nd->type == FIO_TYPE_UDP)
612                 return 0;
613
614         if (listen(nd->listenfd, 10) < 0) {
615                 td_verror(td, errno, "listen");
616                 nd->listenfd = -1;
617                 return 1;
618         }
619
620         return 0;
621 }
622
623 static int fio_netio_init(struct thread_data *td)
624 {
625         struct netio_data *nd = td->io_ops->data;
626         unsigned int port;
627         char host[64], buf[128];
628         char *sep, *portp, *modep;
629         int ret;
630
631         if (td_rw(td)) {
632                 log_err("fio: network connections must be read OR write\n");
633                 return 1;
634         }
635         if (td_random(td)) {
636                 log_err("fio: network IO can't be random\n");
637                 return 1;
638         }
639
640         strcpy(buf, td->o.filename);
641
642         sep = strchr(buf, ',');
643         if (!sep)
644                 goto bad_host;
645
646         *sep = '\0';
647         sep++;
648         strcpy(host, buf);
649         if (!strlen(host))
650                 goto bad_host;
651
652         modep = NULL;
653         portp = sep;
654         sep = strchr(portp, ',');
655         if (sep) {
656                 *sep = '\0';
657                 modep = sep + 1;
658         }
659
660         if (!strncmp("tcp", modep, strlen(modep)) ||
661             !strncmp("TCP", modep, strlen(modep)))
662                 nd->type = FIO_TYPE_TCP;
663         else if (!strncmp("udp", modep, strlen(modep)) ||
664                  !strncmp("UDP", modep, strlen(modep)))
665                 nd->type = FIO_TYPE_UDP;
666         else if (!strncmp("unix", modep, strlen(modep)) ||
667                  !strncmp("UNIX", modep, strlen(modep)))
668                 nd->type = FIO_TYPE_UNIX;
669         else
670                 goto bad_host;
671
672         if (nd->type != FIO_TYPE_UNIX) {
673                 port = strtol(portp, NULL, 10);
674                 if (!port || port > 65535)
675                         goto bad_host;
676         } else
677                 port = 0;
678
679         if (td_read(td)) {
680                 nd->send_to_net = 0;
681                 ret = fio_netio_setup_listen(td, host, port);
682         } else {
683                 nd->send_to_net = 1;
684                 ret = fio_netio_setup_connect(td, host, port);
685         }
686
687         return ret;
688 bad_host:
689         log_err("fio: bad network host/port/protocol: %s\n", td->o.filename);
690         return 1;
691 }
692
693 static void fio_netio_cleanup(struct thread_data *td)
694 {
695         struct netio_data *nd = td->io_ops->data;
696
697         if (nd) {
698                 if (nd->listenfd != -1)
699                         close(nd->listenfd);
700                 if (nd->pipes[0] != -1)
701                         close(nd->pipes[0]);
702                 if (nd->pipes[1] != -1)
703                         close(nd->pipes[1]);
704
705                 free(nd);
706         }
707 }
708
709 static int fio_netio_setup(struct thread_data *td)
710 {
711         struct netio_data *nd;
712
713         if (!td->io_ops->data) {
714                 nd = malloc(sizeof(*nd));;
715
716                 memset(nd, 0, sizeof(*nd));
717                 nd->listenfd = -1;
718                 nd->pipes[0] = nd->pipes[1] = -1;
719                 td->io_ops->data = nd;
720         }
721
722         return 0;
723 }
724
725 #ifdef FIO_HAVE_SPLICE
726 static int fio_netio_setup_splice(struct thread_data *td)
727 {
728         struct netio_data *nd;
729
730         fio_netio_setup(td);
731
732         nd = td->io_ops->data;
733         if (nd) {
734                 if (pipe(nd->pipes) < 0)
735                         return 1;
736
737                 nd->use_splice = 1;
738                 return 0;
739         }
740
741         return 1;
742 }
743
744 static struct ioengine_ops ioengine_splice = {
745         .name           = "netsplice",
746         .version        = FIO_IOOPS_VERSION,
747         .prep           = fio_netio_prep,
748         .queue          = fio_netio_queue,
749         .setup          = fio_netio_setup_splice,
750         .init           = fio_netio_init,
751         .cleanup        = fio_netio_cleanup,
752         .open_file      = fio_netio_open_file,
753         .close_file     = generic_close_file,
754         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
755                           FIO_SIGTERM | FIO_PIPEIO,
756 };
757 #endif
758
759 static struct ioengine_ops ioengine_rw = {
760         .name           = "net",
761         .version        = FIO_IOOPS_VERSION,
762         .prep           = fio_netio_prep,
763         .queue          = fio_netio_queue,
764         .setup          = fio_netio_setup,
765         .init           = fio_netio_init,
766         .cleanup        = fio_netio_cleanup,
767         .open_file      = fio_netio_open_file,
768         .close_file     = fio_netio_close_file,
769         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
770                           FIO_SIGTERM | FIO_PIPEIO,
771 };
772
773 static void fio_init fio_netio_register(void)
774 {
775         register_ioengine(&ioengine_rw);
776 #ifdef FIO_HAVE_SPLICE
777         register_ioengine(&ioengine_splice);
778 #endif
779 }
780
781 static void fio_exit fio_netio_unregister(void)
782 {
783         unregister_ioengine(&ioengine_rw);
784 #ifdef FIO_HAVE_SPLICE
785         unregister_ioengine(&ioengine_splice);
786 #endif
787 }