net engine: use poll() always for sending/receiving
[fio.git] / engines / net.c
1 /*
2  * net engine
3  *
4  * IO engine that reads/writes to/from sockets.
5  *
6  */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <errno.h>
11 #include <assert.h>
12 #include <netinet/in.h>
13 #include <arpa/inet.h>
14 #include <netdb.h>
15 #include <sys/poll.h>
16 #include <sys/types.h>
17 #include <sys/socket.h>
18
19 #include "../fio.h"
20
21 struct netio_data {
22         int listenfd;
23         int send_to_net;
24         int use_splice;
25         int net_protocol;
26         int pipes[2];
27         char host[64];
28         struct sockaddr_in addr;
29 };
30
31 /*
32  * Return -1 for error and 'nr events' for a positive number
33  * of events
34  */
35 static int poll_wait(struct thread_data *td, int fd, short events)
36 {
37         struct pollfd pfd;
38         int ret;
39
40         while (!td->terminate) {
41                 pfd.fd = fd;
42                 pfd.events = events;
43                 ret = poll(&pfd, 1, -1);
44                 if (ret < 0) {
45                         if (errno == EINTR)
46                                 continue;
47
48                         td_verror(td, errno, "poll");
49                         return -1;
50                 } else if (!ret)
51                         continue;
52
53                 break;
54         }
55
56         if (pfd.revents & events)
57                 return 1;
58         else if (td->terminate)
59                 return 1;
60
61         return -1;
62 }
63
64 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
65 {
66         struct netio_data *nd = td->io_ops->data;
67
68         /*
69          * Make sure we don't see spurious reads to a receiver, and vice versa
70          */
71         if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
72             (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
73                 td_verror(td, EINVAL, "bad direction");
74                 return 1;
75         }
76                 
77         return 0;
78 }
79
80 #ifdef FIO_HAVE_SPLICE
81 static int splice_io_u(int fdin, int fdout, unsigned int len)
82 {
83         int bytes = 0;
84
85         while (len) {
86                 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
87
88                 if (ret < 0) {
89                         if (!bytes)
90                                 bytes = ret;
91
92                         break;
93                 } else if (!ret)
94                         break;
95
96                 bytes += ret;
97                 len -= ret;
98         }
99
100         return bytes;
101 }
102
103 /*
104  * Receive bytes from a socket and fill them into the internal pipe
105  */
106 static int splice_in(struct thread_data *td, struct io_u *io_u)
107 {
108         struct netio_data *nd = td->io_ops->data;
109
110         return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
111 }
112
113 /*
114  * Transmit 'len' bytes from the internal pipe
115  */
116 static int splice_out(struct thread_data *td, struct io_u *io_u,
117                       unsigned int len)
118 {
119         struct netio_data *nd = td->io_ops->data;
120
121         return splice_io_u(nd->pipes[0], io_u->file->fd, len);
122 }
123
124 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
125 {
126         struct iovec iov = {
127                 .iov_base = io_u->xfer_buf,
128                 .iov_len = len,
129         };
130         int bytes = 0;
131
132         while (iov.iov_len) {
133                 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
134
135                 if (ret < 0) {
136                         if (!bytes)
137                                 bytes = ret;
138                         break;
139                 } else if (!ret)
140                         break;
141
142                 iov.iov_len -= ret;
143                 iov.iov_base += ret;
144                 bytes += ret;
145         }
146
147         return bytes;
148
149 }
150
151 /*
152  * vmsplice() pipe to io_u buffer
153  */
154 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
155                              unsigned int len)
156 {
157         struct netio_data *nd = td->io_ops->data;
158
159         return vmsplice_io_u(io_u, nd->pipes[0], len);
160 }
161
162 /*
163  * vmsplice() io_u to pipe
164  */
165 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
166 {
167         struct netio_data *nd = td->io_ops->data;
168
169         return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
170 }
171
172 /*
173  * splice receive - transfer socket data into a pipe using splice, then map
174  * that pipe data into the io_u using vmsplice.
175  */
176 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
177 {
178         int ret;
179
180         ret = splice_in(td, io_u);
181         if (ret > 0)
182                 return vmsplice_io_u_out(td, io_u, ret);
183
184         return ret;
185 }
186
187 /*
188  * splice transmit - map data from the io_u into a pipe by using vmsplice,
189  * then transfer that pipe to a socket using splice.
190  */
191 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
192 {
193         int ret;
194
195         ret = vmsplice_io_u_in(td, io_u);
196         if (ret > 0)
197                 return splice_out(td, io_u, ret);
198
199         return ret;
200 }
201 #else
202 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
203 {
204         errno = EOPNOTSUPP;
205         return -1;
206 }
207
208 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
209 {
210         errno = EOPNOTSUPP;
211         return -1;
212 }
213 #endif
214
215 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
216 {
217         struct netio_data *nd = td->io_ops->data;
218         int ret, flags = 0;
219
220         ret = poll_wait(td, io_u->file->fd, POLLOUT);
221         if (ret <= 0)
222                 return ret;
223
224         /*
225          * if we are going to write more, set MSG_MORE
226          */
227 #ifdef MSG_MORE
228         if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen < td->o.size)
229                 flags = MSG_MORE;
230 #endif
231
232         if (nd->net_protocol == IPPROTO_UDP) {
233                 return sendto(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen,
234                                 flags, &nd->addr, sizeof(nd->addr));
235         } else {
236                 return send(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen,
237                                 flags);
238         }
239 }
240
241 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
242 {
243         struct netio_data *nd = td->io_ops->data;
244         int ret, flags = MSG_WAITALL;
245
246         ret = poll_wait(td, io_u->file->fd, POLLIN);
247         if (ret <= 0)
248                 return ret;
249
250         if (nd->net_protocol == IPPROTO_UDP) {
251                 socklen_t len = sizeof(nd->addr);
252
253                 return recvfrom(io_u->file->fd, io_u->xfer_buf,
254                                 io_u->xfer_buflen, 0, &nd->addr, &len);
255         } else {
256                 return recv(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen,
257                                 flags);
258         }
259 }
260
261 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
262 {
263         struct netio_data *nd = td->io_ops->data;
264         int ret;
265
266         fio_ro_check(td, io_u);
267
268         if (io_u->ddir == DDIR_WRITE) {
269                 if (!nd->use_splice || nd->net_protocol == IPPROTO_UDP)
270                         ret = fio_netio_send(td, io_u);
271                 else
272                         ret = fio_netio_splice_out(td, io_u);
273         } else if (io_u->ddir == DDIR_READ) {
274                 if (!nd->use_splice || nd->net_protocol == IPPROTO_UDP)
275                         ret = fio_netio_recv(td, io_u);
276                 else
277                         ret = fio_netio_splice_in(td, io_u);
278         } else
279                 ret = 0;        /* must be a SYNC */
280
281         if (ret != (int) io_u->xfer_buflen) {
282                 if (ret >= 0) {
283                         io_u->resid = io_u->xfer_buflen - ret;
284                         io_u->error = 0;
285                         return FIO_Q_COMPLETED;
286                 } else {
287                         int err = errno;
288
289                         if (io_u->ddir == DDIR_WRITE && err == EMSGSIZE)
290                                 return FIO_Q_BUSY;
291
292                         io_u->error = err;
293                 }
294         }
295
296         if (io_u->error)
297                 td_verror(td, io_u->error, "xfer");
298
299         return FIO_Q_COMPLETED;
300 }
301
302 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
303 {
304         struct netio_data *nd = td->io_ops->data;
305         int type;
306
307         if (nd->net_protocol == IPPROTO_TCP)
308                 type = SOCK_STREAM;
309         else
310                 type = SOCK_DGRAM;
311
312         f->fd = socket(AF_INET, type, nd->net_protocol);
313         if (f->fd < 0) {
314                 td_verror(td, errno, "socket");
315                 return 1;
316         }
317
318         if (nd->net_protocol == IPPROTO_UDP)
319                 return 0;
320
321         if (connect(f->fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
322                 td_verror(td, errno, "connect");
323                 return 1;
324         }
325
326         return 0;
327 }
328
329 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
330 {
331         struct netio_data *nd = td->io_ops->data;
332         socklen_t socklen = sizeof(nd->addr);
333
334         if (nd->net_protocol == IPPROTO_UDP) {
335                 f->fd = nd->listenfd;
336                 return 0;
337         }
338
339         log_info("fio: waiting for connection\n");
340
341         if (poll_wait(td, nd->listenfd, POLLIN) < 0)
342                 return 1;
343
344         f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
345         if (f->fd < 0) {
346                 td_verror(td, errno, "accept");
347                 return 1;
348         }
349
350         return 0;
351 }
352
353 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
354 {
355         if (td_read(td))
356                 return fio_netio_accept(td, f);
357         else
358                 return fio_netio_connect(td, f);
359 }
360
361 static int fio_netio_setup_connect(struct thread_data *td, const char *host,
362                                    unsigned short port)
363 {
364         struct netio_data *nd = td->io_ops->data;
365
366         nd->addr.sin_family = AF_INET;
367         nd->addr.sin_port = htons(port);
368
369         if (inet_aton(host, &nd->addr.sin_addr) != 1) {
370                 struct hostent *hent;
371
372                 hent = gethostbyname(host);
373                 if (!hent) {
374                         td_verror(td, errno, "gethostbyname");
375                         return 1;
376                 }
377
378                 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
379         }
380
381         return 0;
382 }
383
384 static int fio_netio_setup_listen(struct thread_data *td, short port)
385 {
386         struct netio_data *nd = td->io_ops->data;
387         int fd, opt, type;
388
389         if (nd->net_protocol == IPPROTO_TCP)
390                 type = SOCK_STREAM;
391         else
392                 type = SOCK_DGRAM;
393
394         fd = socket(AF_INET, type, nd->net_protocol);
395         if (fd < 0) {
396                 td_verror(td, errno, "socket");
397                 return 1;
398         }
399
400         opt = 1;
401         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
402                 td_verror(td, errno, "setsockopt");
403                 return 1;
404         }
405 #ifdef SO_REUSEPORT
406         if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
407                 td_verror(td, errno, "setsockopt");
408                 return 1;
409         }
410 #endif
411
412         nd->addr.sin_family = AF_INET;
413         nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
414         nd->addr.sin_port = htons(port);
415
416         if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
417                 td_verror(td, errno, "bind");
418                 return 1;
419         }
420         if (nd->net_protocol == IPPROTO_TCP && listen(fd, 1) < 0) {
421                 td_verror(td, errno, "listen");
422                 return 1;
423         }
424
425         nd->listenfd = fd;
426         return 0;
427 }
428
429 static int fio_netio_init(struct thread_data *td)
430 {
431         struct netio_data *nd = td->io_ops->data;
432         unsigned int port;
433         char host[64], buf[128];
434         char *sep, *portp, *modep;
435         int ret;
436
437         if (td_rw(td)) {
438                 log_err("fio: network connections must be read OR write\n");
439                 return 1;
440         }
441         if (td_random(td)) {
442                 log_err("fio: network IO can't be random\n");
443                 return 1;
444         }
445
446         strcpy(buf, td->o.filename);
447
448         sep = strchr(buf, '/');
449         if (!sep)
450                 goto bad_host;
451
452         *sep = '\0';
453         sep++;
454         strcpy(host, buf);
455         if (!strlen(host))
456                 goto bad_host;
457
458         modep = NULL;
459         portp = sep;
460         sep = strchr(portp, '/');
461         if (sep) {
462                 *sep = '\0';
463                 modep = sep + 1;
464         }
465                 
466         port = strtol(portp, NULL, 10);
467         if (!port || port > 65535)
468                 goto bad_host;
469
470         if (modep) {
471                 if (!strncmp("tcp", modep, strlen(modep)) ||
472                     !strncmp("TCP", modep, strlen(modep)))
473                         nd->net_protocol = IPPROTO_TCP;
474                 else if (!strncmp("udp", modep, strlen(modep)) ||
475                          !strncmp("UDP", modep, strlen(modep)))
476                         nd->net_protocol = IPPROTO_UDP;
477                 else
478                         goto bad_host;
479         } else
480                 nd->net_protocol = IPPROTO_TCP;
481
482         if (td_read(td)) {
483                 nd->send_to_net = 0;
484                 ret = fio_netio_setup_listen(td, port);
485         } else {
486                 nd->send_to_net = 1;
487                 ret = fio_netio_setup_connect(td, host, port);
488         }
489
490         return ret;
491 bad_host:
492         log_err("fio: bad network host/port/protocol: %s\n", td->o.filename);
493         return 1;
494 }
495
496 static void fio_netio_cleanup(struct thread_data *td)
497 {
498         struct netio_data *nd = td->io_ops->data;
499
500         if (nd) {
501                 if (nd->listenfd != -1)
502                         close(nd->listenfd);
503                 if (nd->pipes[0] != -1)
504                         close(nd->pipes[0]);
505                 if (nd->pipes[1] != -1)
506                         close(nd->pipes[1]);
507
508                 free(nd);
509         }
510 }
511
512 static int fio_netio_setup(struct thread_data *td)
513 {
514         struct netio_data *nd;
515
516         if (!td->io_ops->data) {
517                 nd = malloc(sizeof(*nd));;
518
519                 memset(nd, 0, sizeof(*nd));
520                 nd->listenfd = -1;
521                 nd->pipes[0] = nd->pipes[1] = -1;
522                 td->io_ops->data = nd;
523         }
524
525         return 0;
526 }
527
528 #ifdef FIO_HAVE_SPLICE
529 static int fio_netio_setup_splice(struct thread_data *td)
530 {
531         struct netio_data *nd;
532
533         fio_netio_setup(td);
534
535         nd = td->io_ops->data;
536         if (nd) {
537                 if (pipe(nd->pipes) < 0)
538                         return 1;
539
540                 nd->use_splice = 1;
541                 return 0;
542         }
543
544         return 1;
545 }
546
547 static struct ioengine_ops ioengine_splice = {
548         .name           = "netsplice",
549         .version        = FIO_IOOPS_VERSION,
550         .prep           = fio_netio_prep,
551         .queue          = fio_netio_queue,
552         .setup          = fio_netio_setup_splice,
553         .init           = fio_netio_init,
554         .cleanup        = fio_netio_cleanup,
555         .open_file      = fio_netio_open_file,
556         .close_file     = generic_close_file,
557         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
558                           FIO_SIGQUIT,
559 };
560 #endif
561
562 static struct ioengine_ops ioengine_rw = {
563         .name           = "net",
564         .version        = FIO_IOOPS_VERSION,
565         .prep           = fio_netio_prep,
566         .queue          = fio_netio_queue,
567         .setup          = fio_netio_setup,
568         .init           = fio_netio_init,
569         .cleanup        = fio_netio_cleanup,
570         .open_file      = fio_netio_open_file,
571         .close_file     = generic_close_file,
572         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
573                           FIO_SIGQUIT,
574 };
575
576 static void fio_init fio_netio_register(void)
577 {
578         register_ioengine(&ioengine_rw);
579 #ifdef FIO_HAVE_SPLICE
580         register_ioengine(&ioengine_splice);
581 #endif
582 }
583
584 static void fio_exit fio_netio_unregister(void)
585 {
586         unregister_ioengine(&ioengine_rw);
587 #ifdef FIO_HAVE_SPLICE
588         unregister_ioengine(&ioengine_splice);
589 #endif
590 }