net engine: improve host/port parsing
[fio.git] / engines / net.c
1 /*
2  * net engine
3  *
4  * IO engine that reads/writes to/from sockets.
5  *
6  */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <errno.h>
11 #include <assert.h>
12 #include <netinet/in.h>
13 #include <arpa/inet.h>
14 #include <netdb.h>
15 #include <sys/poll.h>
16
17 #include "../fio.h"
18
19 struct netio_data {
20         int listenfd;
21         int send_to_net;
22         int use_splice;
23         int pipes[2];
24         char host[64];
25         struct sockaddr_in addr;
26 };
27
28 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
29 {
30         struct netio_data *nd = td->io_ops->data;
31
32         /*
33          * Make sure we don't see spurious reads to a receiver, and vice versa
34          */
35         if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
36             (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
37                 td_verror(td, EINVAL, "bad direction");
38                 return 1;
39         }
40                 
41         return 0;
42 }
43
44 static int splice_io_u(int fdin, int fdout, unsigned int len)
45 {
46         int bytes = 0;
47
48         while (len) {
49                 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
50
51                 if (ret < 0) {
52                         if (!bytes)
53                                 bytes = ret;
54
55                         break;
56                 } else if (!ret)
57                         break;
58
59                 bytes += ret;
60                 len -= ret;
61         }
62
63         return bytes;
64 }
65
66 /*
67  * Receive bytes from a socket and fill them into the internal pipe
68  */
69 static int splice_in(struct thread_data *td, struct io_u *io_u)
70 {
71         struct netio_data *nd = td->io_ops->data;
72
73         return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
74 }
75
76 /*
77  * Transmit 'len' bytes from the internal pipe
78  */
79 static int splice_out(struct thread_data *td, struct io_u *io_u,
80                       unsigned int len)
81 {
82         struct netio_data *nd = td->io_ops->data;
83
84         return splice_io_u(nd->pipes[0], io_u->file->fd, len);
85 }
86
87 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
88 {
89         struct iovec iov = {
90                 .iov_base = io_u->xfer_buf,
91                 .iov_len = len,
92         };
93         int bytes = 0;
94
95         while (iov.iov_len) {
96                 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
97
98                 if (ret < 0) {
99                         if (!bytes)
100                                 bytes = ret;
101                         break;
102                 } else if (!ret)
103                         break;
104
105                 iov.iov_len -= ret;
106                 iov.iov_base += ret;
107                 bytes += ret;
108         }
109
110         return bytes;
111
112 }
113
114 /*
115  * vmsplice() pipe to io_u buffer
116  */
117 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
118                              unsigned int len)
119 {
120         struct netio_data *nd = td->io_ops->data;
121
122         return vmsplice_io_u(io_u, nd->pipes[0], len);
123 }
124
125 /*
126  * vmsplice() io_u to pipe
127  */
128 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
129 {
130         struct netio_data *nd = td->io_ops->data;
131
132         return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
133 }
134
135 /*
136  * splice receive - transfer socket data into a pipe using splice, then map
137  * that pipe data into the io_u using vmsplice.
138  */
139 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
140 {
141         int ret;
142
143         ret = splice_in(td, io_u);
144         if (ret > 0)
145                 return vmsplice_io_u_out(td, io_u, ret);
146
147         return ret;
148 }
149
150 /*
151  * splice transmit - map data from the io_u into a pipe by using vmsplice,
152  * then transfer that pipe to a socket using splice.
153  */
154 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
155 {
156         int ret;
157
158         ret = vmsplice_io_u_in(td, io_u);
159         if (ret > 0)
160                 return splice_out(td, io_u, ret);
161
162         return ret;
163 }
164
165 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
166 {
167         int flags = 0;
168
169         /*
170          * if we are going to write more, set MSG_MORE
171          */
172         if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen < td->o.size)
173                 flags = MSG_MORE;
174
175         return send(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
176 }
177
178 static int fio_netio_recv(struct io_u *io_u)
179 {
180         int flags = MSG_WAITALL;
181
182         return recv(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
183 }
184
185 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
186 {
187         struct netio_data *nd = td->io_ops->data;
188         int ret;
189
190         fio_ro_check(td, io_u);
191
192         if (io_u->ddir == DDIR_WRITE) {
193                 if (nd->use_splice)
194                         ret = fio_netio_splice_out(td, io_u);
195                 else
196                         ret = fio_netio_send(td, io_u);
197         } else if (io_u->ddir == DDIR_READ) {
198                 if (nd->use_splice)
199                         ret = fio_netio_splice_in(td, io_u);
200                 else
201                         ret = fio_netio_recv(io_u);
202         } else
203                 ret = 0;        /* must be a SYNC */
204
205         if (ret != (int) io_u->xfer_buflen) {
206                 if (ret >= 0) {
207                         io_u->resid = io_u->xfer_buflen - ret;
208                         io_u->error = 0;
209                         return FIO_Q_COMPLETED;
210                 } else
211                         io_u->error = errno;
212         }
213
214         if (io_u->error)
215                 td_verror(td, io_u->error, "xfer");
216
217         return FIO_Q_COMPLETED;
218 }
219
220 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
221 {
222         struct netio_data *nd = td->io_ops->data;
223
224         f->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
225         if (f->fd < 0) {
226                 td_verror(td, errno, "socket");
227                 return 1;
228         }
229
230         if (connect(f->fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
231                 td_verror(td, errno, "connect");
232                 return 1;
233         }
234
235         return 0;
236 }
237
238 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
239 {
240         struct netio_data *nd = td->io_ops->data;
241         socklen_t socklen = sizeof(nd->addr);
242         struct pollfd pfd;
243         int ret;
244
245         log_info("fio: waiting for connection\n");
246
247         /*
248          * Accept loop. poll for incoming events, accept them. Repeat until we
249          * have all connections.
250          */
251         while (!td->terminate) {
252                 pfd.fd = nd->listenfd;
253                 pfd.events = POLLIN;
254
255                 ret = poll(&pfd, 1, -1);
256                 if (ret < 0) {
257                         if (errno == EINTR)
258                                 continue;
259
260                         td_verror(td, errno, "poll");
261                         break;
262                 } else if (!ret)
263                         continue;
264
265                 /*
266                  * should be impossible
267                  */
268                 if (!(pfd.revents & POLLIN))
269                         continue;
270
271                 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
272                 if (f->fd < 0) {
273                         td_verror(td, errno, "accept");
274                         return 1;
275                 }
276                 break;
277         }
278
279         return 0;
280 }
281
282 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
283 {
284         if (td_read(td))
285                 return fio_netio_accept(td, f);
286         else
287                 return fio_netio_connect(td, f);
288 }
289
290 static int fio_netio_setup_connect(struct thread_data *td, const char *host,
291                                    unsigned short port)
292 {
293         struct netio_data *nd = td->io_ops->data;
294
295         nd->addr.sin_family = AF_INET;
296         nd->addr.sin_port = htons(port);
297
298         if (inet_aton(host, &nd->addr.sin_addr) != 1) {
299                 struct hostent *hent;
300
301                 hent = gethostbyname(host);
302                 if (!hent) {
303                         td_verror(td, errno, "gethostbyname");
304                         return 1;
305                 }
306
307                 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
308         }
309
310         return 0;
311 }
312
313 static int fio_netio_setup_listen(struct thread_data *td, short port)
314 {
315         struct netio_data *nd = td->io_ops->data;
316         int fd, opt;
317
318         fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
319         if (fd < 0) {
320                 td_verror(td, errno, "socket");
321                 return 1;
322         }
323
324         opt = 1;
325         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
326                 td_verror(td, errno, "setsockopt");
327                 return 1;
328         }
329 #ifdef SO_REUSEPORT
330         if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
331                 td_verror(td, errno, "setsockopt");
332                 return 1;
333         }
334 #endif
335
336         nd->addr.sin_family = AF_INET;
337         nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
338         nd->addr.sin_port = htons(port);
339
340         if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
341                 td_verror(td, errno, "bind");
342                 return 1;
343         }
344         if (listen(fd, 1) < 0) {
345                 td_verror(td, errno, "listen");
346                 return 1;
347         }
348
349         nd->listenfd = fd;
350         return 0;
351 }
352
353 static int fio_netio_init(struct thread_data *td)
354 {
355         struct netio_data *nd = td->io_ops->data;
356         unsigned int port;
357         char host[64], buf[128];
358         char *sep;
359         int ret;
360
361         if (td_rw(td)) {
362                 log_err("fio: network connections must be read OR write\n");
363                 return 1;
364         }
365         if (td_random(td)) {
366                 log_err("fio: network IO can't be random\n");
367                 return 1;
368         }
369
370         strcpy(buf, td->o.filename);
371
372         sep = strchr(buf, '/');
373         if (!sep)
374                 goto bad_host;
375
376         *sep = '\0';
377         sep++;
378         strcpy(host, buf);
379         if (!strlen(host))
380                 goto bad_host;
381
382         port = strtol(sep, NULL, 10);
383         if (!port || port > 65535)
384                 goto bad_host;
385
386         if (td_read(td)) {
387                 nd->send_to_net = 0;
388                 ret = fio_netio_setup_listen(td, port);
389         } else {
390                 nd->send_to_net = 1;
391                 ret = fio_netio_setup_connect(td, host, port);
392         }
393
394         return ret;
395 bad_host:
396         log_err("fio: bad network host/port: %s\n", td->o.filename);
397         return 1;
398 }
399
400 static void fio_netio_cleanup(struct thread_data *td)
401 {
402         struct netio_data *nd = td->io_ops->data;
403
404         if (nd) {
405                 if (nd->listenfd != -1)
406                         close(nd->listenfd);
407                 if (nd->pipes[0] != -1)
408                         close(nd->pipes[0]);
409                 if (nd->pipes[1] != -1)
410                         close(nd->pipes[1]);
411
412                 free(nd);
413                 td->io_ops->data = NULL;
414         }
415 }
416
417 static int fio_netio_setup(struct thread_data *td)
418 {
419         struct netio_data *nd;
420
421         if (!td->io_ops->data) {
422                 nd = malloc(sizeof(*nd));;
423
424                 memset(nd, 0, sizeof(*nd));
425                 nd->listenfd = -1;
426                 nd->pipes[0] = nd->pipes[1] = -1;
427                 td->io_ops->data = nd;
428         }
429
430         return 0;
431 }
432
433 static int fio_netio_setup_splice(struct thread_data *td)
434 {
435         struct netio_data *nd;
436
437         fio_netio_setup(td);
438
439         nd = td->io_ops->data;
440         if (nd) {
441                 if (pipe(nd->pipes) < 0)
442                         return 1;
443
444                 nd->use_splice = 1;
445                 return 0;
446         }
447
448         return 1;
449 }
450
451 static struct ioengine_ops ioengine_rw = {
452         .name           = "net",
453         .version        = FIO_IOOPS_VERSION,
454         .prep           = fio_netio_prep,
455         .queue          = fio_netio_queue,
456         .setup          = fio_netio_setup,
457         .init           = fio_netio_init,
458         .cleanup        = fio_netio_cleanup,
459         .open_file      = fio_netio_open_file,
460         .close_file     = generic_close_file,
461         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
462                           FIO_SIGQUIT,
463 };
464
465 static struct ioengine_ops ioengine_splice = {
466         .name           = "netsplice",
467         .version        = FIO_IOOPS_VERSION,
468         .prep           = fio_netio_prep,
469         .queue          = fio_netio_queue,
470         .setup          = fio_netio_setup_splice,
471         .init           = fio_netio_init,
472         .cleanup        = fio_netio_cleanup,
473         .open_file      = fio_netio_open_file,
474         .close_file     = generic_close_file,
475         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
476                           FIO_SIGQUIT,
477 };
478
479 static void fio_init fio_netio_register(void)
480 {
481         register_ioengine(&ioengine_rw);
482         register_ioengine(&ioengine_splice);
483 }
484
485 static void fio_exit fio_netio_unregister(void)
486 {
487         unregister_ioengine(&ioengine_rw);
488         unregister_ioengine(&ioengine_splice);
489 }