93693f6d803233c7142e97305d8cecd32bc4ee90
[fio.git] / engines / net.c
1 /*
2  * net engine
3  *
4  * IO engine that reads/writes to/from sockets.
5  *
6  */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <errno.h>
11 #include <assert.h>
12 #include <netinet/in.h>
13 #include <arpa/inet.h>
14 #include <netdb.h>
15 #include <sys/poll.h>
16 #include <sys/types.h>
17 #include <sys/socket.h>
18
19 #include "../fio.h"
20
21 struct netio_data {
22         int listenfd;
23         int send_to_net;
24         int use_splice;
25         int net_protocol;
26         int pipes[2];
27         char host[64];
28         struct sockaddr_in addr;
29 };
30
31 struct udp_close_msg {
32         uint32_t magic;
33         uint32_t cmd;
34 };
35
36 enum {
37         FIO_LINK_CLOSE = 0x89,
38         FIO_LINK_CLOSE_MAGIC = 0x6c696e6b,
39 };
40
41 /*
42  * Return -1 for error and 'nr events' for a positive number
43  * of events
44  */
45 static int poll_wait(struct thread_data *td, int fd, short events)
46 {
47         struct pollfd pfd;
48         int ret;
49
50         while (!td->terminate) {
51                 pfd.fd = fd;
52                 pfd.events = events;
53                 ret = poll(&pfd, 1, -1);
54                 if (ret < 0) {
55                         if (errno == EINTR)
56                                 break;
57
58                         td_verror(td, errno, "poll");
59                         return -1;
60                 } else if (!ret)
61                         continue;
62
63                 break;
64         }
65
66         if (pfd.revents & events)
67                 return 1;
68
69         return -1;
70 }
71
72 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
73 {
74         struct netio_data *nd = td->io_ops->data;
75
76         /*
77          * Make sure we don't see spurious reads to a receiver, and vice versa
78          */
79         if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
80             (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
81                 td_verror(td, EINVAL, "bad direction");
82                 return 1;
83         }
84                 
85         return 0;
86 }
87
88 #ifdef FIO_HAVE_SPLICE
89 static int splice_io_u(int fdin, int fdout, unsigned int len)
90 {
91         int bytes = 0;
92
93         while (len) {
94                 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
95
96                 if (ret < 0) {
97                         if (!bytes)
98                                 bytes = ret;
99
100                         break;
101                 } else if (!ret)
102                         break;
103
104                 bytes += ret;
105                 len -= ret;
106         }
107
108         return bytes;
109 }
110
111 /*
112  * Receive bytes from a socket and fill them into the internal pipe
113  */
114 static int splice_in(struct thread_data *td, struct io_u *io_u)
115 {
116         struct netio_data *nd = td->io_ops->data;
117
118         return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
119 }
120
121 /*
122  * Transmit 'len' bytes from the internal pipe
123  */
124 static int splice_out(struct thread_data *td, struct io_u *io_u,
125                       unsigned int len)
126 {
127         struct netio_data *nd = td->io_ops->data;
128
129         return splice_io_u(nd->pipes[0], io_u->file->fd, len);
130 }
131
132 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
133 {
134         struct iovec iov = {
135                 .iov_base = io_u->xfer_buf,
136                 .iov_len = len,
137         };
138         int bytes = 0;
139
140         while (iov.iov_len) {
141                 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
142
143                 if (ret < 0) {
144                         if (!bytes)
145                                 bytes = ret;
146                         break;
147                 } else if (!ret)
148                         break;
149
150                 iov.iov_len -= ret;
151                 iov.iov_base += ret;
152                 bytes += ret;
153         }
154
155         return bytes;
156
157 }
158
159 /*
160  * vmsplice() pipe to io_u buffer
161  */
162 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
163                              unsigned int len)
164 {
165         struct netio_data *nd = td->io_ops->data;
166
167         return vmsplice_io_u(io_u, nd->pipes[0], len);
168 }
169
170 /*
171  * vmsplice() io_u to pipe
172  */
173 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
174 {
175         struct netio_data *nd = td->io_ops->data;
176
177         return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
178 }
179
180 /*
181  * splice receive - transfer socket data into a pipe using splice, then map
182  * that pipe data into the io_u using vmsplice.
183  */
184 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
185 {
186         int ret;
187
188         ret = splice_in(td, io_u);
189         if (ret > 0)
190                 return vmsplice_io_u_out(td, io_u, ret);
191
192         return ret;
193 }
194
195 /*
196  * splice transmit - map data from the io_u into a pipe by using vmsplice,
197  * then transfer that pipe to a socket using splice.
198  */
199 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
200 {
201         int ret;
202
203         ret = vmsplice_io_u_in(td, io_u);
204         if (ret > 0)
205                 return splice_out(td, io_u, ret);
206
207         return ret;
208 }
209 #else
210 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
211 {
212         errno = EOPNOTSUPP;
213         return -1;
214 }
215
216 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
217 {
218         errno = EOPNOTSUPP;
219         return -1;
220 }
221 #endif
222
223 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
224 {
225         struct netio_data *nd = td->io_ops->data;
226         int ret, flags = MSG_DONTWAIT;
227
228         do {
229                 if (nd->net_protocol == IPPROTO_UDP) {
230                         ret = sendto(io_u->file->fd, io_u->xfer_buf,
231                                         io_u->xfer_buflen, flags, &nd->addr,
232                                         sizeof(nd->addr));
233                 } else {
234                         /*
235                          * if we are going to write more, set MSG_MORE
236                          */
237 #ifdef MSG_MORE
238                         if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
239                             td->o.size)
240                                 flags |= MSG_MORE;
241 #endif
242                         ret = send(io_u->file->fd, io_u->xfer_buf,
243                                         io_u->xfer_buflen, flags);
244                 }
245                 if (ret > 0)
246                         break;
247
248                 ret = poll_wait(td, io_u->file->fd, POLLOUT);
249                 if (ret <= 0)
250                         break;
251
252                 flags &= ~MSG_DONTWAIT;
253         } while (1);
254
255         return ret;
256 }
257
258 static int is_udp_close(struct io_u *io_u, int len)
259 {
260         struct udp_close_msg *msg;
261
262         if (len != sizeof(struct udp_close_msg))
263                 return 0;
264
265         msg = io_u->xfer_buf;
266         if (ntohl(msg->magic) != FIO_LINK_CLOSE_MAGIC)
267                 return 0;
268         if (ntohl(msg->cmd) != FIO_LINK_CLOSE)
269                 return 0;
270
271         return 1;
272 }
273
274 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
275 {
276         struct netio_data *nd = td->io_ops->data;
277         int ret, flags = MSG_DONTWAIT;
278
279         do {
280                 if (nd->net_protocol == IPPROTO_UDP) {
281                         socklen_t len = sizeof(nd->addr);
282
283                         ret = recvfrom(io_u->file->fd, io_u->xfer_buf,
284                                         io_u->xfer_buflen, flags, &nd->addr,
285                                         &len);
286                         if (is_udp_close(io_u, ret)) {
287                                 td->done = 1;
288                                 return 0;
289                         }
290                 } else {
291                         ret = recv(io_u->file->fd, io_u->xfer_buf,
292                                         io_u->xfer_buflen, flags);
293                 }
294                 if (ret > 0)
295                         break;
296
297                 ret = poll_wait(td, io_u->file->fd, POLLIN);
298                 if (ret <= 0)
299                         break;
300                 flags &= ~MSG_DONTWAIT;
301                 flags |= MSG_WAITALL;
302         } while (1);
303
304         return ret;
305 }
306
307 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
308 {
309         struct netio_data *nd = td->io_ops->data;
310         int ret;
311
312         fio_ro_check(td, io_u);
313
314         if (io_u->ddir == DDIR_WRITE) {
315                 if (!nd->use_splice || nd->net_protocol == IPPROTO_UDP)
316                         ret = fio_netio_send(td, io_u);
317                 else
318                         ret = fio_netio_splice_out(td, io_u);
319         } else if (io_u->ddir == DDIR_READ) {
320                 if (!nd->use_splice || nd->net_protocol == IPPROTO_UDP)
321                         ret = fio_netio_recv(td, io_u);
322                 else
323                         ret = fio_netio_splice_in(td, io_u);
324         } else
325                 ret = 0;        /* must be a SYNC */
326
327         if (ret != (int) io_u->xfer_buflen) {
328                 if (ret >= 0) {
329                         io_u->resid = io_u->xfer_buflen - ret;
330                         io_u->error = 0;
331                         return FIO_Q_COMPLETED;
332                 } else {
333                         int err = errno;
334
335                         if (io_u->ddir == DDIR_WRITE && err == EMSGSIZE)
336                                 return FIO_Q_BUSY;
337
338                         io_u->error = err;
339                 }
340         }
341
342         if (io_u->error)
343                 td_verror(td, io_u->error, "xfer");
344
345         return FIO_Q_COMPLETED;
346 }
347
348 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
349 {
350         struct netio_data *nd = td->io_ops->data;
351         int type;
352
353         if (nd->net_protocol == IPPROTO_TCP)
354                 type = SOCK_STREAM;
355         else
356                 type = SOCK_DGRAM;
357
358         f->fd = socket(AF_INET, type, nd->net_protocol);
359         if (f->fd < 0) {
360                 td_verror(td, errno, "socket");
361                 return 1;
362         }
363
364         if (nd->net_protocol == IPPROTO_UDP)
365                 return 0;
366
367         if (connect(f->fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
368                 td_verror(td, errno, "connect");
369                 return 1;
370         }
371
372         return 0;
373 }
374
375 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
376 {
377         struct netio_data *nd = td->io_ops->data;
378         socklen_t socklen = sizeof(nd->addr);
379
380         if (nd->net_protocol == IPPROTO_UDP) {
381                 f->fd = nd->listenfd;
382                 return 0;
383         }
384
385         log_info("fio: waiting for connection\n");
386
387         if (poll_wait(td, nd->listenfd, POLLIN) < 0)
388                 return 1;
389
390         f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
391         if (f->fd < 0) {
392                 td_verror(td, errno, "accept");
393                 return 1;
394         }
395
396         return 0;
397 }
398
399 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
400 {
401         if (td_read(td))
402                 return fio_netio_accept(td, f);
403         else
404                 return fio_netio_connect(td, f);
405 }
406
407 static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f)
408 {
409         struct netio_data *nd = td->io_ops->data;
410         struct udp_close_msg msg;
411         int ret;
412
413         msg.magic = htonl(FIO_LINK_CLOSE_MAGIC);
414         msg.cmd = htonl(FIO_LINK_CLOSE);
415
416         ret = sendto(f->fd, &msg, sizeof(msg), MSG_WAITALL, &nd->addr,
417                         sizeof(nd->addr));
418         if (ret < 0)
419                 td_verror(td, errno, "sendto udp link close");
420 }
421
422 static int fio_netio_close_file(struct thread_data *td, struct fio_file *f)
423 {
424         struct netio_data *nd = td->io_ops->data;
425
426         /*
427          * If this is an UDP connection, notify the receiver that we are
428          * closing down the link
429          */
430         if (nd->net_protocol == IPPROTO_UDP)
431                 fio_netio_udp_close(td, f);
432
433         return generic_close_file(td, f);
434 }
435
436 static int fio_netio_setup_connect(struct thread_data *td, const char *host,
437                                    unsigned short port)
438 {
439         struct netio_data *nd = td->io_ops->data;
440
441         nd->addr.sin_family = AF_INET;
442         nd->addr.sin_port = htons(port);
443
444         if (inet_aton(host, &nd->addr.sin_addr) != 1) {
445                 struct hostent *hent;
446
447                 hent = gethostbyname(host);
448                 if (!hent) {
449                         td_verror(td, errno, "gethostbyname");
450                         return 1;
451                 }
452
453                 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
454         }
455
456         return 0;
457 }
458
459 static int fio_netio_setup_listen(struct thread_data *td, short port)
460 {
461         struct netio_data *nd = td->io_ops->data;
462         int fd, opt, type;
463
464         if (nd->net_protocol == IPPROTO_TCP)
465                 type = SOCK_STREAM;
466         else
467                 type = SOCK_DGRAM;
468
469         fd = socket(AF_INET, type, nd->net_protocol);
470         if (fd < 0) {
471                 td_verror(td, errno, "socket");
472                 return 1;
473         }
474
475         opt = 1;
476         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
477                 td_verror(td, errno, "setsockopt");
478                 return 1;
479         }
480 #ifdef SO_REUSEPORT
481         if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
482                 td_verror(td, errno, "setsockopt");
483                 return 1;
484         }
485 #endif
486
487         nd->addr.sin_family = AF_INET;
488         nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
489         nd->addr.sin_port = htons(port);
490
491         if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
492                 td_verror(td, errno, "bind");
493                 return 1;
494         }
495         if (nd->net_protocol == IPPROTO_TCP && listen(fd, 1) < 0) {
496                 td_verror(td, errno, "listen");
497                 return 1;
498         }
499
500         nd->listenfd = fd;
501         return 0;
502 }
503
504 static int fio_netio_init(struct thread_data *td)
505 {
506         struct netio_data *nd = td->io_ops->data;
507         unsigned int port;
508         char host[64], buf[128];
509         char *sep, *portp, *modep;
510         int ret;
511
512         if (td_rw(td)) {
513                 log_err("fio: network connections must be read OR write\n");
514                 return 1;
515         }
516         if (td_random(td)) {
517                 log_err("fio: network IO can't be random\n");
518                 return 1;
519         }
520
521         strcpy(buf, td->o.filename);
522
523         sep = strchr(buf, '/');
524         if (!sep)
525                 goto bad_host;
526
527         *sep = '\0';
528         sep++;
529         strcpy(host, buf);
530         if (!strlen(host))
531                 goto bad_host;
532
533         modep = NULL;
534         portp = sep;
535         sep = strchr(portp, '/');
536         if (sep) {
537                 *sep = '\0';
538                 modep = sep + 1;
539         }
540                 
541         port = strtol(portp, NULL, 10);
542         if (!port || port > 65535)
543                 goto bad_host;
544
545         if (modep) {
546                 if (!strncmp("tcp", modep, strlen(modep)) ||
547                     !strncmp("TCP", modep, strlen(modep)))
548                         nd->net_protocol = IPPROTO_TCP;
549                 else if (!strncmp("udp", modep, strlen(modep)) ||
550                          !strncmp("UDP", modep, strlen(modep)))
551                         nd->net_protocol = IPPROTO_UDP;
552                 else
553                         goto bad_host;
554         } else
555                 nd->net_protocol = IPPROTO_TCP;
556
557         if (td_read(td)) {
558                 nd->send_to_net = 0;
559                 ret = fio_netio_setup_listen(td, port);
560         } else {
561                 nd->send_to_net = 1;
562                 ret = fio_netio_setup_connect(td, host, port);
563         }
564
565         return ret;
566 bad_host:
567         log_err("fio: bad network host/port/protocol: %s\n", td->o.filename);
568         return 1;
569 }
570
571 static void fio_netio_cleanup(struct thread_data *td)
572 {
573         struct netio_data *nd = td->io_ops->data;
574
575         if (nd) {
576                 if (nd->listenfd != -1)
577                         close(nd->listenfd);
578                 if (nd->pipes[0] != -1)
579                         close(nd->pipes[0]);
580                 if (nd->pipes[1] != -1)
581                         close(nd->pipes[1]);
582
583                 free(nd);
584         }
585 }
586
587 static int fio_netio_setup(struct thread_data *td)
588 {
589         struct netio_data *nd;
590
591         if (!td->io_ops->data) {
592                 nd = malloc(sizeof(*nd));;
593
594                 memset(nd, 0, sizeof(*nd));
595                 nd->listenfd = -1;
596                 nd->pipes[0] = nd->pipes[1] = -1;
597                 td->io_ops->data = nd;
598         }
599
600         return 0;
601 }
602
603 #ifdef FIO_HAVE_SPLICE
604 static int fio_netio_setup_splice(struct thread_data *td)
605 {
606         struct netio_data *nd;
607
608         fio_netio_setup(td);
609
610         nd = td->io_ops->data;
611         if (nd) {
612                 if (pipe(nd->pipes) < 0)
613                         return 1;
614
615                 nd->use_splice = 1;
616                 return 0;
617         }
618
619         return 1;
620 }
621
622 static struct ioengine_ops ioengine_splice = {
623         .name           = "netsplice",
624         .version        = FIO_IOOPS_VERSION,
625         .prep           = fio_netio_prep,
626         .queue          = fio_netio_queue,
627         .setup          = fio_netio_setup_splice,
628         .init           = fio_netio_init,
629         .cleanup        = fio_netio_cleanup,
630         .open_file      = fio_netio_open_file,
631         .close_file     = generic_close_file,
632         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
633                           FIO_SIGQUIT,
634 };
635 #endif
636
637 static struct ioengine_ops ioengine_rw = {
638         .name           = "net",
639         .version        = FIO_IOOPS_VERSION,
640         .prep           = fio_netio_prep,
641         .queue          = fio_netio_queue,
642         .setup          = fio_netio_setup,
643         .init           = fio_netio_init,
644         .cleanup        = fio_netio_cleanup,
645         .open_file      = fio_netio_open_file,
646         .close_file     = fio_netio_close_file,
647         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
648                           FIO_SIGQUIT,
649 };
650
651 static void fio_init fio_netio_register(void)
652 {
653         register_ioengine(&ioengine_rw);
654 #ifdef FIO_HAVE_SPLICE
655         register_ioengine(&ioengine_splice);
656 #endif
657 }
658
659 static void fio_exit fio_netio_unregister(void)
660 {
661         unregister_ioengine(&ioengine_rw);
662 #ifdef FIO_HAVE_SPLICE
663         unregister_ioengine(&ioengine_splice);
664 #endif
665 }