net engine: add UDP support
[fio.git] / engines / net.c
1 /*
2  * net engine
3  *
4  * IO engine that reads/writes to/from sockets.
5  *
6  */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <errno.h>
11 #include <assert.h>
12 #include <netinet/in.h>
13 #include <arpa/inet.h>
14 #include <netdb.h>
15 #include <sys/poll.h>
16 #include <sys/types.h>
17 #include <sys/socket.h>
18
19 #include "../fio.h"
20
21 struct netio_data {
22         int listenfd;
23         int send_to_net;
24         int use_splice;
25         int net_protocol;
26         int pipes[2];
27         char host[64];
28         struct sockaddr_in addr;
29 };
30
31 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
32 {
33         struct netio_data *nd = td->io_ops->data;
34
35         /*
36          * Make sure we don't see spurious reads to a receiver, and vice versa
37          */
38         if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
39             (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
40                 td_verror(td, EINVAL, "bad direction");
41                 return 1;
42         }
43                 
44         return 0;
45 }
46
47 #ifdef FIO_HAVE_SPLICE
48 static int splice_io_u(int fdin, int fdout, unsigned int len)
49 {
50         int bytes = 0;
51
52         while (len) {
53                 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
54
55                 if (ret < 0) {
56                         if (!bytes)
57                                 bytes = ret;
58
59                         break;
60                 } else if (!ret)
61                         break;
62
63                 bytes += ret;
64                 len -= ret;
65         }
66
67         return bytes;
68 }
69
70 /*
71  * Receive bytes from a socket and fill them into the internal pipe
72  */
73 static int splice_in(struct thread_data *td, struct io_u *io_u)
74 {
75         struct netio_data *nd = td->io_ops->data;
76
77         return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
78 }
79
80 /*
81  * Transmit 'len' bytes from the internal pipe
82  */
83 static int splice_out(struct thread_data *td, struct io_u *io_u,
84                       unsigned int len)
85 {
86         struct netio_data *nd = td->io_ops->data;
87
88         return splice_io_u(nd->pipes[0], io_u->file->fd, len);
89 }
90
91 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
92 {
93         struct iovec iov = {
94                 .iov_base = io_u->xfer_buf,
95                 .iov_len = len,
96         };
97         int bytes = 0;
98
99         while (iov.iov_len) {
100                 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
101
102                 if (ret < 0) {
103                         if (!bytes)
104                                 bytes = ret;
105                         break;
106                 } else if (!ret)
107                         break;
108
109                 iov.iov_len -= ret;
110                 iov.iov_base += ret;
111                 bytes += ret;
112         }
113
114         return bytes;
115
116 }
117
118 /*
119  * vmsplice() pipe to io_u buffer
120  */
121 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
122                              unsigned int len)
123 {
124         struct netio_data *nd = td->io_ops->data;
125
126         return vmsplice_io_u(io_u, nd->pipes[0], len);
127 }
128
129 /*
130  * vmsplice() io_u to pipe
131  */
132 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
133 {
134         struct netio_data *nd = td->io_ops->data;
135
136         return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
137 }
138
139 /*
140  * splice receive - transfer socket data into a pipe using splice, then map
141  * that pipe data into the io_u using vmsplice.
142  */
143 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
144 {
145         int ret;
146
147         ret = splice_in(td, io_u);
148         if (ret > 0)
149                 return vmsplice_io_u_out(td, io_u, ret);
150
151         return ret;
152 }
153
154 /*
155  * splice transmit - map data from the io_u into a pipe by using vmsplice,
156  * then transfer that pipe to a socket using splice.
157  */
158 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
159 {
160         int ret;
161
162         ret = vmsplice_io_u_in(td, io_u);
163         if (ret > 0)
164                 return splice_out(td, io_u, ret);
165
166         return ret;
167 }
168 #else
169 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
170 {
171         errno = EOPNOTSUPP;
172         return -1;
173 }
174
175 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
176 {
177         errno = EOPNOTSUPP;
178         return -1;
179 }
180 #endif
181
182 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
183 {
184         struct netio_data *nd = td->io_ops->data;
185         int flags = 0;
186
187         /*
188          * if we are going to write more, set MSG_MORE
189          */
190 #ifdef MSG_MORE
191         if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen < td->o.size)
192                 flags = MSG_MORE;
193 #endif
194
195         if (nd->net_protocol == IPPROTO_UDP) {
196                 return sendto(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen,
197                                 0, &nd->addr, sizeof(nd->addr));
198         } else {
199                 return send(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen,
200                                 flags);
201         }
202 }
203
204 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
205 {
206         struct netio_data *nd = td->io_ops->data;
207         int flags = MSG_WAITALL;
208
209         if (nd->net_protocol == IPPROTO_UDP) {
210                 socklen_t len = sizeof(nd->addr);
211
212                 return recvfrom(io_u->file->fd, io_u->xfer_buf,
213                                 io_u->xfer_buflen, 0, &nd->addr, &len);
214         } else {
215                 return recv(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen,
216                                 flags);
217         }
218 }
219
220 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
221 {
222         struct netio_data *nd = td->io_ops->data;
223         int ret;
224
225         fio_ro_check(td, io_u);
226
227         if (io_u->ddir == DDIR_WRITE) {
228                 if (!nd->use_splice || nd->net_protocol == IPPROTO_UDP)
229                         ret = fio_netio_send(td, io_u);
230                 else
231                         ret = fio_netio_splice_out(td, io_u);
232         } else if (io_u->ddir == DDIR_READ) {
233                 if (!nd->use_splice || nd->net_protocol == IPPROTO_UDP)
234                         ret = fio_netio_recv(td, io_u);
235                 else
236                         ret = fio_netio_splice_in(td, io_u);
237         } else
238                 ret = 0;        /* must be a SYNC */
239
240         if (ret != (int) io_u->xfer_buflen) {
241                 if (ret >= 0) {
242                         io_u->resid = io_u->xfer_buflen - ret;
243                         io_u->error = 0;
244                         return FIO_Q_COMPLETED;
245                 } else {
246                         int err = errno;
247
248                         if (io_u->ddir == DDIR_WRITE && err == EMSGSIZE)
249                                 return FIO_Q_BUSY;
250
251                         io_u->error = err;
252                 }
253         }
254
255         if (io_u->error)
256                 td_verror(td, io_u->error, "xfer");
257
258         return FIO_Q_COMPLETED;
259 }
260
261 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
262 {
263         struct netio_data *nd = td->io_ops->data;
264         int type;
265
266         if (nd->net_protocol == IPPROTO_TCP)
267                 type = SOCK_STREAM;
268         else
269                 type = SOCK_DGRAM;
270
271         f->fd = socket(AF_INET, type, nd->net_protocol);
272         if (f->fd < 0) {
273                 td_verror(td, errno, "socket");
274                 return 1;
275         }
276
277         if (nd->net_protocol == IPPROTO_UDP)
278                 return 0;
279
280         if (connect(f->fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
281                 td_verror(td, errno, "connect");
282                 return 1;
283         }
284
285         return 0;
286 }
287
288 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
289 {
290         struct netio_data *nd = td->io_ops->data;
291         socklen_t socklen = sizeof(nd->addr);
292         struct pollfd pfd;
293         int ret;
294
295         if (nd->net_protocol == IPPROTO_UDP) {
296                 f->fd = nd->listenfd;
297                 return 0;
298         }
299
300         log_info("fio: waiting for connection\n");
301
302         /*
303          * Accept loop. poll for incoming events, accept them. Repeat until we
304          * have all connections.
305          */
306         while (!td->terminate) {
307                 pfd.fd = nd->listenfd;
308                 pfd.events = POLLIN;
309
310                 ret = poll(&pfd, 1, -1);
311                 if (ret < 0) {
312                         if (errno == EINTR)
313                                 continue;
314
315                         td_verror(td, errno, "poll");
316                         break;
317                 } else if (!ret)
318                         continue;
319
320                 /*
321                  * should be impossible
322                  */
323                 if (!(pfd.revents & POLLIN))
324                         continue;
325
326                 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
327                 if (f->fd < 0) {
328                         td_verror(td, errno, "accept");
329                         return 1;
330                 }
331                 break;
332         }
333
334         return 0;
335 }
336
337 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
338 {
339         if (td_read(td))
340                 return fio_netio_accept(td, f);
341         else
342                 return fio_netio_connect(td, f);
343 }
344
345 static int fio_netio_setup_connect(struct thread_data *td, const char *host,
346                                    unsigned short port)
347 {
348         struct netio_data *nd = td->io_ops->data;
349
350         nd->addr.sin_family = AF_INET;
351         nd->addr.sin_port = htons(port);
352
353         if (inet_aton(host, &nd->addr.sin_addr) != 1) {
354                 struct hostent *hent;
355
356                 hent = gethostbyname(host);
357                 if (!hent) {
358                         td_verror(td, errno, "gethostbyname");
359                         return 1;
360                 }
361
362                 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
363         }
364
365         return 0;
366 }
367
368 static int fio_netio_setup_listen(struct thread_data *td, short port)
369 {
370         struct netio_data *nd = td->io_ops->data;
371         int fd, opt, type;
372
373         if (nd->net_protocol == IPPROTO_TCP)
374                 type = SOCK_STREAM;
375         else
376                 type = SOCK_DGRAM;
377
378         fd = socket(AF_INET, type, nd->net_protocol);
379         if (fd < 0) {
380                 td_verror(td, errno, "socket");
381                 return 1;
382         }
383
384         opt = 1;
385         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
386                 td_verror(td, errno, "setsockopt");
387                 return 1;
388         }
389 #ifdef SO_REUSEPORT
390         if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
391                 td_verror(td, errno, "setsockopt");
392                 return 1;
393         }
394 #endif
395
396         nd->addr.sin_family = AF_INET;
397         nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
398         nd->addr.sin_port = htons(port);
399
400         if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
401                 td_verror(td, errno, "bind");
402                 return 1;
403         }
404         if (nd->net_protocol == IPPROTO_TCP && listen(fd, 1) < 0) {
405                 td_verror(td, errno, "listen");
406                 return 1;
407         }
408
409         nd->listenfd = fd;
410         return 0;
411 }
412
413 static int fio_netio_init(struct thread_data *td)
414 {
415         struct netio_data *nd = td->io_ops->data;
416         unsigned int port;
417         char host[64], buf[128];
418         char *sep, *portp, *modep;
419         int ret;
420
421         if (td_rw(td)) {
422                 log_err("fio: network connections must be read OR write\n");
423                 return 1;
424         }
425         if (td_random(td)) {
426                 log_err("fio: network IO can't be random\n");
427                 return 1;
428         }
429
430         strcpy(buf, td->o.filename);
431
432         sep = strchr(buf, '/');
433         if (!sep)
434                 goto bad_host;
435
436         *sep = '\0';
437         sep++;
438         strcpy(host, buf);
439         if (!strlen(host))
440                 goto bad_host;
441
442         modep = NULL;
443         portp = sep;
444         sep = strchr(portp, '/');
445         if (sep) {
446                 *sep = '\0';
447                 modep = sep + 1;
448         }
449                 
450         port = strtol(portp, NULL, 10);
451         if (!port || port > 65535)
452                 goto bad_host;
453
454         if (modep) {
455                 if (!strncmp("tcp", modep, strlen(modep)))
456                         nd->net_protocol = IPPROTO_TCP;
457                 else if (!strncmp("udp", modep, strlen(modep)))
458                         nd->net_protocol = IPPROTO_UDP;
459                 else
460                         goto bad_host;
461         } else
462                 nd->net_protocol = IPPROTO_TCP;
463
464         if (td_read(td)) {
465                 nd->send_to_net = 0;
466                 ret = fio_netio_setup_listen(td, port);
467         } else {
468                 nd->send_to_net = 1;
469                 ret = fio_netio_setup_connect(td, host, port);
470         }
471
472         return ret;
473 bad_host:
474         log_err("fio: bad network host/port/protocol: %s\n", td->o.filename);
475         return 1;
476 }
477
478 static void fio_netio_cleanup(struct thread_data *td)
479 {
480         struct netio_data *nd = td->io_ops->data;
481
482         if (nd) {
483                 if (nd->listenfd != -1)
484                         close(nd->listenfd);
485                 if (nd->pipes[0] != -1)
486                         close(nd->pipes[0]);
487                 if (nd->pipes[1] != -1)
488                         close(nd->pipes[1]);
489
490                 free(nd);
491         }
492 }
493
494 static int fio_netio_setup(struct thread_data *td)
495 {
496         struct netio_data *nd;
497
498         if (!td->io_ops->data) {
499                 nd = malloc(sizeof(*nd));;
500
501                 memset(nd, 0, sizeof(*nd));
502                 nd->listenfd = -1;
503                 nd->pipes[0] = nd->pipes[1] = -1;
504                 td->io_ops->data = nd;
505         }
506
507         return 0;
508 }
509
510 #ifdef FIO_HAVE_SPLICE
511 static int fio_netio_setup_splice(struct thread_data *td)
512 {
513         struct netio_data *nd;
514
515         fio_netio_setup(td);
516
517         nd = td->io_ops->data;
518         if (nd) {
519                 if (pipe(nd->pipes) < 0)
520                         return 1;
521
522                 nd->use_splice = 1;
523                 return 0;
524         }
525
526         return 1;
527 }
528
529 static struct ioengine_ops ioengine_splice = {
530         .name           = "netsplice",
531         .version        = FIO_IOOPS_VERSION,
532         .prep           = fio_netio_prep,
533         .queue          = fio_netio_queue,
534         .setup          = fio_netio_setup_splice,
535         .init           = fio_netio_init,
536         .cleanup        = fio_netio_cleanup,
537         .open_file      = fio_netio_open_file,
538         .close_file     = generic_close_file,
539         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
540                           FIO_SIGQUIT,
541 };
542 #endif
543
544 static struct ioengine_ops ioengine_rw = {
545         .name           = "net",
546         .version        = FIO_IOOPS_VERSION,
547         .prep           = fio_netio_prep,
548         .queue          = fio_netio_queue,
549         .setup          = fio_netio_setup,
550         .init           = fio_netio_init,
551         .cleanup        = fio_netio_cleanup,
552         .open_file      = fio_netio_open_file,
553         .close_file     = generic_close_file,
554         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
555                           FIO_SIGQUIT,
556 };
557
558 static void fio_init fio_netio_register(void)
559 {
560         register_ioengine(&ioengine_rw);
561 #ifdef FIO_HAVE_SPLICE
562         register_ioengine(&ioengine_splice);
563 #endif
564 }
565
566 static void fio_exit fio_netio_unregister(void)
567 {
568         unregister_ioengine(&ioengine_rw);
569 #ifdef FIO_HAVE_SPLICE
570         unregister_ioengine(&ioengine_splice);
571 #endif
572 }