Not all platforms have MSG_DONTWAIT
[fio.git] / engines / net.c
1 /*
2  * net engine
3  *
4  * IO engine that reads/writes to/from sockets.
5  *
6  */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <errno.h>
11 #include <assert.h>
12 #include <netinet/in.h>
13 #include <arpa/inet.h>
14 #include <netdb.h>
15 #include <sys/poll.h>
16 #include <sys/types.h>
17 #include <sys/socket.h>
18
19 #include "../fio.h"
20
21 struct netio_data {
22         int listenfd;
23         int send_to_net;
24         int use_splice;
25         int net_protocol;
26         int pipes[2];
27         char host[64];
28         struct sockaddr_in addr;
29 };
30
31 struct udp_close_msg {
32         uint32_t magic;
33         uint32_t cmd;
34 };
35
36 enum {
37         FIO_LINK_CLOSE = 0x89,
38         FIO_LINK_CLOSE_MAGIC = 0x6c696e6b,
39 };
40
41 /*
42  * Return -1 for error and 'nr events' for a positive number
43  * of events
44  */
45 static int poll_wait(struct thread_data *td, int fd, short events)
46 {
47         struct pollfd pfd;
48         int ret;
49
50         while (!td->terminate) {
51                 pfd.fd = fd;
52                 pfd.events = events;
53                 ret = poll(&pfd, 1, -1);
54                 if (ret < 0) {
55                         if (errno == EINTR)
56                                 break;
57
58                         td_verror(td, errno, "poll");
59                         return -1;
60                 } else if (!ret)
61                         continue;
62
63                 break;
64         }
65
66         if (pfd.revents & events)
67                 return 1;
68
69         return -1;
70 }
71
72 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
73 {
74         struct netio_data *nd = td->io_ops->data;
75
76         /*
77          * Make sure we don't see spurious reads to a receiver, and vice versa
78          */
79         if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
80             (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
81                 td_verror(td, EINVAL, "bad direction");
82                 return 1;
83         }
84                 
85         return 0;
86 }
87
88 #ifdef FIO_HAVE_SPLICE
89 static int splice_io_u(int fdin, int fdout, unsigned int len)
90 {
91         int bytes = 0;
92
93         while (len) {
94                 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
95
96                 if (ret < 0) {
97                         if (!bytes)
98                                 bytes = ret;
99
100                         break;
101                 } else if (!ret)
102                         break;
103
104                 bytes += ret;
105                 len -= ret;
106         }
107
108         return bytes;
109 }
110
111 /*
112  * Receive bytes from a socket and fill them into the internal pipe
113  */
114 static int splice_in(struct thread_data *td, struct io_u *io_u)
115 {
116         struct netio_data *nd = td->io_ops->data;
117
118         return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
119 }
120
121 /*
122  * Transmit 'len' bytes from the internal pipe
123  */
124 static int splice_out(struct thread_data *td, struct io_u *io_u,
125                       unsigned int len)
126 {
127         struct netio_data *nd = td->io_ops->data;
128
129         return splice_io_u(nd->pipes[0], io_u->file->fd, len);
130 }
131
132 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
133 {
134         struct iovec iov = {
135                 .iov_base = io_u->xfer_buf,
136                 .iov_len = len,
137         };
138         int bytes = 0;
139
140         while (iov.iov_len) {
141                 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
142
143                 if (ret < 0) {
144                         if (!bytes)
145                                 bytes = ret;
146                         break;
147                 } else if (!ret)
148                         break;
149
150                 iov.iov_len -= ret;
151                 iov.iov_base += ret;
152                 bytes += ret;
153         }
154
155         return bytes;
156
157 }
158
159 /*
160  * vmsplice() pipe to io_u buffer
161  */
162 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
163                              unsigned int len)
164 {
165         struct netio_data *nd = td->io_ops->data;
166
167         return vmsplice_io_u(io_u, nd->pipes[0], len);
168 }
169
170 /*
171  * vmsplice() io_u to pipe
172  */
173 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
174 {
175         struct netio_data *nd = td->io_ops->data;
176
177         return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
178 }
179
180 /*
181  * splice receive - transfer socket data into a pipe using splice, then map
182  * that pipe data into the io_u using vmsplice.
183  */
184 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
185 {
186         int ret;
187
188         ret = splice_in(td, io_u);
189         if (ret > 0)
190                 return vmsplice_io_u_out(td, io_u, ret);
191
192         return ret;
193 }
194
195 /*
196  * splice transmit - map data from the io_u into a pipe by using vmsplice,
197  * then transfer that pipe to a socket using splice.
198  */
199 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
200 {
201         int ret;
202
203         ret = vmsplice_io_u_in(td, io_u);
204         if (ret > 0)
205                 return splice_out(td, io_u, ret);
206
207         return ret;
208 }
209 #else
210 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
211 {
212         errno = EOPNOTSUPP;
213         return -1;
214 }
215
216 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
217 {
218         errno = EOPNOTSUPP;
219         return -1;
220 }
221 #endif
222
223 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
224 {
225         struct netio_data *nd = td->io_ops->data;
226         int ret, flags = 0;
227 #ifdef MSG_DONTWAIT
228         flags = MSG_DONTWAIT;
229 #endif
230
231         do {
232                 if (nd->net_protocol == IPPROTO_UDP) {
233                         struct sockaddr *to = (struct sockaddr *) &nd->addr;
234
235                         ret = sendto(io_u->file->fd, io_u->xfer_buf,
236                                         io_u->xfer_buflen, flags, to,
237                                         sizeof(*to));
238                 } else {
239                         /*
240                          * if we are going to write more, set MSG_MORE
241                          */
242 #ifdef MSG_MORE
243                         if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
244                             td->o.size)
245                                 flags |= MSG_MORE;
246 #endif
247                         ret = send(io_u->file->fd, io_u->xfer_buf,
248                                         io_u->xfer_buflen, flags);
249                 }
250                 if (ret > 0)
251                         break;
252
253                 ret = poll_wait(td, io_u->file->fd, POLLOUT);
254                 if (ret <= 0)
255                         break;
256
257 #ifdef MSG_DONTWAIT
258                 flags &= ~MSG_DONTWAIT;
259 #endif
260         } while (1);
261
262         return ret;
263 }
264
265 static int is_udp_close(struct io_u *io_u, int len)
266 {
267         struct udp_close_msg *msg;
268
269         if (len != sizeof(struct udp_close_msg))
270                 return 0;
271
272         msg = io_u->xfer_buf;
273         if (ntohl(msg->magic) != FIO_LINK_CLOSE_MAGIC)
274                 return 0;
275         if (ntohl(msg->cmd) != FIO_LINK_CLOSE)
276                 return 0;
277
278         return 1;
279 }
280
281 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
282 {
283         struct netio_data *nd = td->io_ops->data;
284         int ret, flags = 0;
285 #ifdef MSG_DONTWAIT
286         flags = MSG_DONTWAIT;
287 #endif
288
289         do {
290                 if (nd->net_protocol == IPPROTO_UDP) {
291                         socklen_t len = sizeof(nd->addr);
292                         struct sockaddr *from = (struct sockaddr *) &nd->addr;
293
294                         ret = recvfrom(io_u->file->fd, io_u->xfer_buf,
295                                         io_u->xfer_buflen, flags, from, &len);
296                         if (is_udp_close(io_u, ret)) {
297                                 td->done = 1;
298                                 return 0;
299                         }
300                 } else {
301                         ret = recv(io_u->file->fd, io_u->xfer_buf,
302                                         io_u->xfer_buflen, flags);
303                 }
304                 if (ret > 0)
305                         break;
306
307                 ret = poll_wait(td, io_u->file->fd, POLLIN);
308                 if (ret <= 0)
309                         break;
310 #ifdef MSG_DONTWAIT
311                 flags &= ~MSG_DONTWAIT;
312 #endif
313                 flags |= MSG_WAITALL;
314         } while (1);
315
316         return ret;
317 }
318
319 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
320 {
321         struct netio_data *nd = td->io_ops->data;
322         int ret;
323
324         fio_ro_check(td, io_u);
325
326         if (io_u->ddir == DDIR_WRITE) {
327                 if (!nd->use_splice || nd->net_protocol == IPPROTO_UDP)
328                         ret = fio_netio_send(td, io_u);
329                 else
330                         ret = fio_netio_splice_out(td, io_u);
331         } else if (io_u->ddir == DDIR_READ) {
332                 if (!nd->use_splice || nd->net_protocol == IPPROTO_UDP)
333                         ret = fio_netio_recv(td, io_u);
334                 else
335                         ret = fio_netio_splice_in(td, io_u);
336         } else
337                 ret = 0;        /* must be a SYNC */
338
339         if (ret != (int) io_u->xfer_buflen) {
340                 if (ret >= 0) {
341                         io_u->resid = io_u->xfer_buflen - ret;
342                         io_u->error = 0;
343                         return FIO_Q_COMPLETED;
344                 } else {
345                         int err = errno;
346
347                         if (io_u->ddir == DDIR_WRITE && err == EMSGSIZE)
348                                 return FIO_Q_BUSY;
349
350                         io_u->error = err;
351                 }
352         }
353
354         if (io_u->error)
355                 td_verror(td, io_u->error, "xfer");
356
357         return FIO_Q_COMPLETED;
358 }
359
360 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
361 {
362         struct netio_data *nd = td->io_ops->data;
363         int type;
364
365         if (nd->net_protocol == IPPROTO_TCP)
366                 type = SOCK_STREAM;
367         else
368                 type = SOCK_DGRAM;
369
370         f->fd = socket(AF_INET, type, nd->net_protocol);
371         if (f->fd < 0) {
372                 td_verror(td, errno, "socket");
373                 return 1;
374         }
375
376         if (nd->net_protocol == IPPROTO_UDP)
377                 return 0;
378
379         if (connect(f->fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
380                 td_verror(td, errno, "connect");
381                 return 1;
382         }
383
384         return 0;
385 }
386
387 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
388 {
389         struct netio_data *nd = td->io_ops->data;
390         socklen_t socklen = sizeof(nd->addr);
391
392         if (nd->net_protocol == IPPROTO_UDP) {
393                 f->fd = nd->listenfd;
394                 return 0;
395         }
396
397         log_info("fio: waiting for connection\n");
398
399         if (poll_wait(td, nd->listenfd, POLLIN) < 0)
400                 return 1;
401
402         f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
403         if (f->fd < 0) {
404                 td_verror(td, errno, "accept");
405                 return 1;
406         }
407
408         return 0;
409 }
410
411 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
412 {
413         if (td_read(td))
414                 return fio_netio_accept(td, f);
415         else
416                 return fio_netio_connect(td, f);
417 }
418
419 static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f)
420 {
421         struct netio_data *nd = td->io_ops->data;
422         struct udp_close_msg msg;
423         struct sockaddr *to = (struct sockaddr *) &nd->addr;
424         int ret;
425
426         msg.magic = htonl(FIO_LINK_CLOSE_MAGIC);
427         msg.cmd = htonl(FIO_LINK_CLOSE);
428
429         ret = sendto(f->fd, &msg, sizeof(msg), MSG_WAITALL, to,
430                         sizeof(nd->addr));
431         if (ret < 0)
432                 td_verror(td, errno, "sendto udp link close");
433 }
434
435 static int fio_netio_close_file(struct thread_data *td, struct fio_file *f)
436 {
437         struct netio_data *nd = td->io_ops->data;
438
439         /*
440          * If this is an UDP connection, notify the receiver that we are
441          * closing down the link
442          */
443         if (nd->net_protocol == IPPROTO_UDP)
444                 fio_netio_udp_close(td, f);
445
446         return generic_close_file(td, f);
447 }
448
449 static int fio_netio_setup_connect(struct thread_data *td, const char *host,
450                                    unsigned short port)
451 {
452         struct netio_data *nd = td->io_ops->data;
453
454         nd->addr.sin_family = AF_INET;
455         nd->addr.sin_port = htons(port);
456
457         if (inet_aton(host, &nd->addr.sin_addr) != 1) {
458                 struct hostent *hent;
459
460                 hent = gethostbyname(host);
461                 if (!hent) {
462                         td_verror(td, errno, "gethostbyname");
463                         return 1;
464                 }
465
466                 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
467         }
468
469         return 0;
470 }
471
472 static int fio_netio_setup_listen(struct thread_data *td, short port)
473 {
474         struct netio_data *nd = td->io_ops->data;
475         int fd, opt, type;
476
477         if (nd->net_protocol == IPPROTO_TCP)
478                 type = SOCK_STREAM;
479         else
480                 type = SOCK_DGRAM;
481
482         fd = socket(AF_INET, type, nd->net_protocol);
483         if (fd < 0) {
484                 td_verror(td, errno, "socket");
485                 return 1;
486         }
487
488         opt = 1;
489         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
490                 td_verror(td, errno, "setsockopt");
491                 return 1;
492         }
493 #ifdef SO_REUSEPORT
494         if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
495                 td_verror(td, errno, "setsockopt");
496                 return 1;
497         }
498 #endif
499
500         nd->addr.sin_family = AF_INET;
501         nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
502         nd->addr.sin_port = htons(port);
503
504         if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
505                 td_verror(td, errno, "bind");
506                 return 1;
507         }
508         if (nd->net_protocol == IPPROTO_TCP && listen(fd, 1) < 0) {
509                 td_verror(td, errno, "listen");
510                 return 1;
511         }
512
513         nd->listenfd = fd;
514         return 0;
515 }
516
517 static int fio_netio_init(struct thread_data *td)
518 {
519         struct netio_data *nd = td->io_ops->data;
520         unsigned int port;
521         char host[64], buf[128];
522         char *sep, *portp, *modep;
523         int ret;
524
525         if (td_rw(td)) {
526                 log_err("fio: network connections must be read OR write\n");
527                 return 1;
528         }
529         if (td_random(td)) {
530                 log_err("fio: network IO can't be random\n");
531                 return 1;
532         }
533
534         strcpy(buf, td->o.filename);
535
536         sep = strchr(buf, '/');
537         if (!sep)
538                 goto bad_host;
539
540         *sep = '\0';
541         sep++;
542         strcpy(host, buf);
543         if (!strlen(host))
544                 goto bad_host;
545
546         modep = NULL;
547         portp = sep;
548         sep = strchr(portp, '/');
549         if (sep) {
550                 *sep = '\0';
551                 modep = sep + 1;
552         }
553                 
554         port = strtol(portp, NULL, 10);
555         if (!port || port > 65535)
556                 goto bad_host;
557
558         if (modep) {
559                 if (!strncmp("tcp", modep, strlen(modep)) ||
560                     !strncmp("TCP", modep, strlen(modep)))
561                         nd->net_protocol = IPPROTO_TCP;
562                 else if (!strncmp("udp", modep, strlen(modep)) ||
563                          !strncmp("UDP", modep, strlen(modep)))
564                         nd->net_protocol = IPPROTO_UDP;
565                 else
566                         goto bad_host;
567         } else
568                 nd->net_protocol = IPPROTO_TCP;
569
570         if (td_read(td)) {
571                 nd->send_to_net = 0;
572                 ret = fio_netio_setup_listen(td, port);
573         } else {
574                 nd->send_to_net = 1;
575                 ret = fio_netio_setup_connect(td, host, port);
576         }
577
578         return ret;
579 bad_host:
580         log_err("fio: bad network host/port/protocol: %s\n", td->o.filename);
581         return 1;
582 }
583
584 static void fio_netio_cleanup(struct thread_data *td)
585 {
586         struct netio_data *nd = td->io_ops->data;
587
588         if (nd) {
589                 if (nd->listenfd != -1)
590                         close(nd->listenfd);
591                 if (nd->pipes[0] != -1)
592                         close(nd->pipes[0]);
593                 if (nd->pipes[1] != -1)
594                         close(nd->pipes[1]);
595
596                 free(nd);
597         }
598 }
599
600 static int fio_netio_setup(struct thread_data *td)
601 {
602         struct netio_data *nd;
603
604         if (!td->io_ops->data) {
605                 nd = malloc(sizeof(*nd));;
606
607                 memset(nd, 0, sizeof(*nd));
608                 nd->listenfd = -1;
609                 nd->pipes[0] = nd->pipes[1] = -1;
610                 td->io_ops->data = nd;
611         }
612
613         return 0;
614 }
615
616 #ifdef FIO_HAVE_SPLICE
617 static int fio_netio_setup_splice(struct thread_data *td)
618 {
619         struct netio_data *nd;
620
621         fio_netio_setup(td);
622
623         nd = td->io_ops->data;
624         if (nd) {
625                 if (pipe(nd->pipes) < 0)
626                         return 1;
627
628                 nd->use_splice = 1;
629                 return 0;
630         }
631
632         return 1;
633 }
634
635 static struct ioengine_ops ioengine_splice = {
636         .name           = "netsplice",
637         .version        = FIO_IOOPS_VERSION,
638         .prep           = fio_netio_prep,
639         .queue          = fio_netio_queue,
640         .setup          = fio_netio_setup_splice,
641         .init           = fio_netio_init,
642         .cleanup        = fio_netio_cleanup,
643         .open_file      = fio_netio_open_file,
644         .close_file     = generic_close_file,
645         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
646                           FIO_SIGQUIT | FIO_PIPEIO,
647 };
648 #endif
649
650 static struct ioengine_ops ioengine_rw = {
651         .name           = "net",
652         .version        = FIO_IOOPS_VERSION,
653         .prep           = fio_netio_prep,
654         .queue          = fio_netio_queue,
655         .setup          = fio_netio_setup,
656         .init           = fio_netio_init,
657         .cleanup        = fio_netio_cleanup,
658         .open_file      = fio_netio_open_file,
659         .close_file     = fio_netio_close_file,
660         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
661                           FIO_SIGQUIT | FIO_PIPEIO,
662 };
663
664 static void fio_init fio_netio_register(void)
665 {
666         register_ioengine(&ioengine_rw);
667 #ifdef FIO_HAVE_SPLICE
668         register_ioengine(&ioengine_splice);
669 #endif
670 }
671
672 static void fio_exit fio_netio_unregister(void)
673 {
674         unregister_ioengine(&ioengine_rw);
675 #ifdef FIO_HAVE_SPLICE
676         unregister_ioengine(&ioengine_splice);
677 #endif
678 }