net engine: termination fixes
[fio.git] / engines / net.c
1 /*
2  * net engine
3  *
4  * IO engine that reads/writes to/from sockets.
5  *
6  */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <errno.h>
11 #include <assert.h>
12 #include <netinet/in.h>
13 #include <arpa/inet.h>
14 #include <netdb.h>
15 #include <sys/poll.h>
16 #include <sys/types.h>
17 #include <sys/socket.h>
18
19 #include "../fio.h"
20
21 struct netio_data {
22         int listenfd;
23         int send_to_net;
24         int use_splice;
25         int net_protocol;
26         int pipes[2];
27         char host[64];
28         struct sockaddr_in addr;
29 };
30
31 /*
32  * Return -1 for error and 'nr events' for a positive number
33  * of events
34  */
35 static int poll_wait(struct thread_data *td, int fd, short events)
36 {
37         struct pollfd pfd;
38         int ret;
39
40         while (!td->terminate) {
41                 pfd.fd = fd;
42                 pfd.events = events;
43                 ret = poll(&pfd, 1, -1);
44                 if (ret < 0) {
45                         if (errno == EINTR)
46                                 break;
47
48                         td_verror(td, errno, "poll");
49                         return -1;
50                 } else if (!ret)
51                         continue;
52
53                 break;
54         }
55
56         if (pfd.revents & events)
57                 return 1;
58
59         return -1;
60 }
61
62 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
63 {
64         struct netio_data *nd = td->io_ops->data;
65
66         /*
67          * Make sure we don't see spurious reads to a receiver, and vice versa
68          */
69         if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
70             (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
71                 td_verror(td, EINVAL, "bad direction");
72                 return 1;
73         }
74                 
75         return 0;
76 }
77
78 #ifdef FIO_HAVE_SPLICE
79 static int splice_io_u(int fdin, int fdout, unsigned int len)
80 {
81         int bytes = 0;
82
83         while (len) {
84                 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
85
86                 if (ret < 0) {
87                         if (!bytes)
88                                 bytes = ret;
89
90                         break;
91                 } else if (!ret)
92                         break;
93
94                 bytes += ret;
95                 len -= ret;
96         }
97
98         return bytes;
99 }
100
101 /*
102  * Receive bytes from a socket and fill them into the internal pipe
103  */
104 static int splice_in(struct thread_data *td, struct io_u *io_u)
105 {
106         struct netio_data *nd = td->io_ops->data;
107
108         return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
109 }
110
111 /*
112  * Transmit 'len' bytes from the internal pipe
113  */
114 static int splice_out(struct thread_data *td, struct io_u *io_u,
115                       unsigned int len)
116 {
117         struct netio_data *nd = td->io_ops->data;
118
119         return splice_io_u(nd->pipes[0], io_u->file->fd, len);
120 }
121
122 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
123 {
124         struct iovec iov = {
125                 .iov_base = io_u->xfer_buf,
126                 .iov_len = len,
127         };
128         int bytes = 0;
129
130         while (iov.iov_len) {
131                 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
132
133                 if (ret < 0) {
134                         if (!bytes)
135                                 bytes = ret;
136                         break;
137                 } else if (!ret)
138                         break;
139
140                 iov.iov_len -= ret;
141                 iov.iov_base += ret;
142                 bytes += ret;
143         }
144
145         return bytes;
146
147 }
148
149 /*
150  * vmsplice() pipe to io_u buffer
151  */
152 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
153                              unsigned int len)
154 {
155         struct netio_data *nd = td->io_ops->data;
156
157         return vmsplice_io_u(io_u, nd->pipes[0], len);
158 }
159
160 /*
161  * vmsplice() io_u to pipe
162  */
163 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
164 {
165         struct netio_data *nd = td->io_ops->data;
166
167         return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
168 }
169
170 /*
171  * splice receive - transfer socket data into a pipe using splice, then map
172  * that pipe data into the io_u using vmsplice.
173  */
174 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
175 {
176         int ret;
177
178         ret = splice_in(td, io_u);
179         if (ret > 0)
180                 return vmsplice_io_u_out(td, io_u, ret);
181
182         return ret;
183 }
184
185 /*
186  * splice transmit - map data from the io_u into a pipe by using vmsplice,
187  * then transfer that pipe to a socket using splice.
188  */
189 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
190 {
191         int ret;
192
193         ret = vmsplice_io_u_in(td, io_u);
194         if (ret > 0)
195                 return splice_out(td, io_u, ret);
196
197         return ret;
198 }
199 #else
200 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
201 {
202         errno = EOPNOTSUPP;
203         return -1;
204 }
205
206 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
207 {
208         errno = EOPNOTSUPP;
209         return -1;
210 }
211 #endif
212
213 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
214 {
215         struct netio_data *nd = td->io_ops->data;
216         int ret, flags = 0;
217
218         ret = poll_wait(td, io_u->file->fd, POLLOUT);
219         if (ret <= 0)
220                 return ret;
221
222         /*
223          * if we are going to write more, set MSG_MORE
224          */
225 #ifdef MSG_MORE
226         if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen < td->o.size)
227                 flags = MSG_MORE;
228 #endif
229
230         if (nd->net_protocol == IPPROTO_UDP) {
231                 return sendto(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen,
232                                 0, &nd->addr, sizeof(nd->addr));
233         } else {
234                 return send(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen,
235                                 flags);
236         }
237 }
238
239 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
240 {
241         struct netio_data *nd = td->io_ops->data;
242         int ret, flags = MSG_WAITALL;
243
244         ret = poll_wait(td, io_u->file->fd, POLLIN);
245         if (ret <= 0)
246                 return ret;
247
248         if (nd->net_protocol == IPPROTO_UDP) {
249                 socklen_t len = sizeof(nd->addr);
250
251                 return recvfrom(io_u->file->fd, io_u->xfer_buf,
252                                 io_u->xfer_buflen, 0, &nd->addr, &len);
253         } else {
254                 return recv(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen,
255                                 flags);
256         }
257 }
258
259 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
260 {
261         struct netio_data *nd = td->io_ops->data;
262         int ret;
263
264         fio_ro_check(td, io_u);
265
266         if (io_u->ddir == DDIR_WRITE) {
267                 if (!nd->use_splice || nd->net_protocol == IPPROTO_UDP)
268                         ret = fio_netio_send(td, io_u);
269                 else
270                         ret = fio_netio_splice_out(td, io_u);
271         } else if (io_u->ddir == DDIR_READ) {
272                 if (!nd->use_splice || nd->net_protocol == IPPROTO_UDP)
273                         ret = fio_netio_recv(td, io_u);
274                 else
275                         ret = fio_netio_splice_in(td, io_u);
276         } else
277                 ret = 0;        /* must be a SYNC */
278
279         if (ret != (int) io_u->xfer_buflen) {
280                 if (ret >= 0) {
281                         io_u->resid = io_u->xfer_buflen - ret;
282                         io_u->error = 0;
283                         return FIO_Q_COMPLETED;
284                 } else {
285                         int err = errno;
286
287                         if (io_u->ddir == DDIR_WRITE && err == EMSGSIZE)
288                                 return FIO_Q_BUSY;
289
290                         io_u->error = err;
291                 }
292         }
293
294         if (io_u->error)
295                 td_verror(td, io_u->error, "xfer");
296
297         return FIO_Q_COMPLETED;
298 }
299
300 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
301 {
302         struct netio_data *nd = td->io_ops->data;
303         int type;
304
305         if (nd->net_protocol == IPPROTO_TCP)
306                 type = SOCK_STREAM;
307         else
308                 type = SOCK_DGRAM;
309
310         f->fd = socket(AF_INET, type, nd->net_protocol);
311         if (f->fd < 0) {
312                 td_verror(td, errno, "socket");
313                 return 1;
314         }
315
316         if (nd->net_protocol == IPPROTO_UDP)
317                 return 0;
318
319         if (connect(f->fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
320                 td_verror(td, errno, "connect");
321                 return 1;
322         }
323
324         return 0;
325 }
326
327 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
328 {
329         struct netio_data *nd = td->io_ops->data;
330         socklen_t socklen = sizeof(nd->addr);
331
332         if (nd->net_protocol == IPPROTO_UDP) {
333                 f->fd = nd->listenfd;
334                 return 0;
335         }
336
337         log_info("fio: waiting for connection\n");
338
339         if (poll_wait(td, nd->listenfd, POLLIN) < 0)
340                 return 1;
341
342         f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
343         if (f->fd < 0) {
344                 td_verror(td, errno, "accept");
345                 return 1;
346         }
347
348         return 0;
349 }
350
351 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
352 {
353         if (td_read(td))
354                 return fio_netio_accept(td, f);
355         else
356                 return fio_netio_connect(td, f);
357 }
358
359 static int fio_netio_setup_connect(struct thread_data *td, const char *host,
360                                    unsigned short port)
361 {
362         struct netio_data *nd = td->io_ops->data;
363
364         nd->addr.sin_family = AF_INET;
365         nd->addr.sin_port = htons(port);
366
367         if (inet_aton(host, &nd->addr.sin_addr) != 1) {
368                 struct hostent *hent;
369
370                 hent = gethostbyname(host);
371                 if (!hent) {
372                         td_verror(td, errno, "gethostbyname");
373                         return 1;
374                 }
375
376                 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
377         }
378
379         return 0;
380 }
381
382 static int fio_netio_setup_listen(struct thread_data *td, short port)
383 {
384         struct netio_data *nd = td->io_ops->data;
385         int fd, opt, type;
386
387         if (nd->net_protocol == IPPROTO_TCP)
388                 type = SOCK_STREAM;
389         else
390                 type = SOCK_DGRAM;
391
392         fd = socket(AF_INET, type, nd->net_protocol);
393         if (fd < 0) {
394                 td_verror(td, errno, "socket");
395                 return 1;
396         }
397
398         opt = 1;
399         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
400                 td_verror(td, errno, "setsockopt");
401                 return 1;
402         }
403 #ifdef SO_REUSEPORT
404         if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
405                 td_verror(td, errno, "setsockopt");
406                 return 1;
407         }
408 #endif
409
410         nd->addr.sin_family = AF_INET;
411         nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
412         nd->addr.sin_port = htons(port);
413
414         if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
415                 td_verror(td, errno, "bind");
416                 return 1;
417         }
418         if (nd->net_protocol == IPPROTO_TCP && listen(fd, 1) < 0) {
419                 td_verror(td, errno, "listen");
420                 return 1;
421         }
422
423         nd->listenfd = fd;
424         return 0;
425 }
426
427 static int fio_netio_init(struct thread_data *td)
428 {
429         struct netio_data *nd = td->io_ops->data;
430         unsigned int port;
431         char host[64], buf[128];
432         char *sep, *portp, *modep;
433         int ret;
434
435         if (td_rw(td)) {
436                 log_err("fio: network connections must be read OR write\n");
437                 return 1;
438         }
439         if (td_random(td)) {
440                 log_err("fio: network IO can't be random\n");
441                 return 1;
442         }
443
444         strcpy(buf, td->o.filename);
445
446         sep = strchr(buf, '/');
447         if (!sep)
448                 goto bad_host;
449
450         *sep = '\0';
451         sep++;
452         strcpy(host, buf);
453         if (!strlen(host))
454                 goto bad_host;
455
456         modep = NULL;
457         portp = sep;
458         sep = strchr(portp, '/');
459         if (sep) {
460                 *sep = '\0';
461                 modep = sep + 1;
462         }
463                 
464         port = strtol(portp, NULL, 10);
465         if (!port || port > 65535)
466                 goto bad_host;
467
468         if (modep) {
469                 if (!strncmp("tcp", modep, strlen(modep)) ||
470                     !strncmp("TCP", modep, strlen(modep)))
471                         nd->net_protocol = IPPROTO_TCP;
472                 else if (!strncmp("udp", modep, strlen(modep)) ||
473                          !strncmp("UDP", modep, strlen(modep)))
474                         nd->net_protocol = IPPROTO_UDP;
475                 else
476                         goto bad_host;
477         } else
478                 nd->net_protocol = IPPROTO_TCP;
479
480         if (td_read(td)) {
481                 nd->send_to_net = 0;
482                 ret = fio_netio_setup_listen(td, port);
483         } else {
484                 nd->send_to_net = 1;
485                 ret = fio_netio_setup_connect(td, host, port);
486         }
487
488         return ret;
489 bad_host:
490         log_err("fio: bad network host/port/protocol: %s\n", td->o.filename);
491         return 1;
492 }
493
494 static void fio_netio_cleanup(struct thread_data *td)
495 {
496         struct netio_data *nd = td->io_ops->data;
497
498         if (nd) {
499                 if (nd->listenfd != -1)
500                         close(nd->listenfd);
501                 if (nd->pipes[0] != -1)
502                         close(nd->pipes[0]);
503                 if (nd->pipes[1] != -1)
504                         close(nd->pipes[1]);
505
506                 free(nd);
507         }
508 }
509
510 static int fio_netio_setup(struct thread_data *td)
511 {
512         struct netio_data *nd;
513
514         if (!td->io_ops->data) {
515                 nd = malloc(sizeof(*nd));;
516
517                 memset(nd, 0, sizeof(*nd));
518                 nd->listenfd = -1;
519                 nd->pipes[0] = nd->pipes[1] = -1;
520                 td->io_ops->data = nd;
521         }
522
523         return 0;
524 }
525
526 #ifdef FIO_HAVE_SPLICE
527 static int fio_netio_setup_splice(struct thread_data *td)
528 {
529         struct netio_data *nd;
530
531         fio_netio_setup(td);
532
533         nd = td->io_ops->data;
534         if (nd) {
535                 if (pipe(nd->pipes) < 0)
536                         return 1;
537
538                 nd->use_splice = 1;
539                 return 0;
540         }
541
542         return 1;
543 }
544
545 static struct ioengine_ops ioengine_splice = {
546         .name           = "netsplice",
547         .version        = FIO_IOOPS_VERSION,
548         .prep           = fio_netio_prep,
549         .queue          = fio_netio_queue,
550         .setup          = fio_netio_setup_splice,
551         .init           = fio_netio_init,
552         .cleanup        = fio_netio_cleanup,
553         .open_file      = fio_netio_open_file,
554         .close_file     = generic_close_file,
555         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
556                           FIO_SIGQUIT,
557 };
558 #endif
559
560 static struct ioengine_ops ioengine_rw = {
561         .name           = "net",
562         .version        = FIO_IOOPS_VERSION,
563         .prep           = fio_netio_prep,
564         .queue          = fio_netio_queue,
565         .setup          = fio_netio_setup,
566         .init           = fio_netio_init,
567         .cleanup        = fio_netio_cleanup,
568         .open_file      = fio_netio_open_file,
569         .close_file     = generic_close_file,
570         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
571                           FIO_SIGQUIT,
572 };
573
574 static void fio_init fio_netio_register(void)
575 {
576         register_ioengine(&ioengine_rw);
577 #ifdef FIO_HAVE_SPLICE
578         register_ioengine(&ioengine_splice);
579 #endif
580 }
581
582 static void fio_exit fio_netio_unregister(void)
583 {
584         unregister_ioengine(&ioengine_rw);
585 #ifdef FIO_HAVE_SPLICE
586         unregister_ioengine(&ioengine_splice);
587 #endif
588 }