net engine: fix listenfd/pipe fd leaks
[fio.git] / engines / net.c
1 /*
2  * net engine
3  *
4  * IO engine that reads/writes to/from sockets.
5  *
6  */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <errno.h>
11 #include <assert.h>
12 #include <netinet/in.h>
13 #include <arpa/inet.h>
14 #include <netdb.h>
15 #include <sys/poll.h>
16
17 #include "../fio.h"
18
19 struct netio_data {
20         int listenfd;
21         int send_to_net;
22         int use_splice;
23         int pipes[2];
24         char host[64];
25         struct sockaddr_in addr;
26 };
27
28 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
29 {
30         struct netio_data *nd = td->io_ops->data;
31         struct fio_file *f = io_u->file;
32
33         /*
34          * Make sure we don't see spurious reads to a receiver, and vice versa
35          */
36         if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
37             (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
38                 td_verror(td, EINVAL, "bad direction");
39                 return 1;
40         }
41                 
42         if (io_u->ddir == DDIR_SYNC)
43                 return 0;
44         if (io_u->offset == f->last_completed_pos)
45                 return 0;
46
47         /*
48          * If offset is different from last end position, it's a seek.
49          * As network io is purely sequential, we don't allow seeks.
50          */
51         td_verror(td, EINVAL, "cannot seek");
52         return 1;
53 }
54
55 /*
56  * Receive bytes from a socket and fill them into the internal pipe
57  */
58 static int splice_in(struct thread_data *td, struct io_u *io_u)
59 {
60         struct netio_data *nd = td->io_ops->data;
61         unsigned int len = io_u->xfer_buflen;
62         struct fio_file *f = io_u->file;
63         int bytes = 0;
64
65         while (len) {
66                 int ret = splice(f->fd, NULL, nd->pipes[1], NULL, len, 0);
67
68                 if (ret < 0) {
69                         if (!bytes)
70                                 bytes = ret;
71
72                         break;
73                 } else if (!ret)
74                         break;
75
76                 bytes += ret;
77                 len -= ret;
78         }
79
80         return bytes;
81 }
82
83 /*
84  * Transmit 'len' bytes from the internal pipe
85  */
86 static int splice_out(struct thread_data *td, struct io_u *io_u,
87                       unsigned int len)
88 {
89         struct netio_data *nd = td->io_ops->data;
90         struct fio_file *f = io_u->file;
91         int bytes = 0;
92
93         while (len) {
94                 int ret = splice(nd->pipes[0], NULL, f->fd, NULL, len, 0);
95
96                 if (ret < 0) {
97                         if (!bytes)
98                                 bytes = ret;
99                         
100                         break;
101                 } else if (!ret)
102                         break;
103
104                 bytes += ret;
105                 len -= ret;
106         }
107
108         return bytes;
109 }
110
111 /*
112  * vmsplice() pipe to io_u buffer
113  */
114 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
115                              unsigned int len)
116 {
117         struct netio_data *nd = td->io_ops->data;
118         struct iovec iov = {
119                 .iov_base = io_u->xfer_buf,
120                 .iov_len = len,
121         };
122         int bytes = 0;
123
124         while (iov.iov_len) {
125                 int ret = vmsplice(nd->pipes[0], &iov, 1, SPLICE_F_MOVE);
126
127                 if (ret < 0) {
128                         if (!bytes)
129                                 bytes = ret;
130                         break;
131                 } else if (!ret)
132                         break;
133
134                 iov.iov_len -= ret;
135                 bytes += ret;
136                 if (iov.iov_len)
137                         iov.iov_base += ret;
138         }
139
140         return bytes;
141 }
142
143 /*
144  * vmsplice() io_u to pipe
145  */
146 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
147 {
148         struct netio_data *nd = td->io_ops->data;
149         struct iovec iov = {
150                 .iov_base = io_u->xfer_buf,
151                 .iov_len = io_u->xfer_buflen,
152         };
153         unsigned int bytes = 0;
154
155         while (iov.iov_len) {
156                 int ret = vmsplice(nd->pipes[1], &iov, 1, SPLICE_F_MOVE);
157
158                 if (ret < 0)
159                         return -1;
160                 else if (!ret)
161                         return bytes;
162
163                 iov.iov_len -= ret;
164                 bytes += ret;
165                 if (iov.iov_len)
166                         iov.iov_base += ret;
167         }
168
169         return bytes;
170 }
171
172 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
173 {
174         int ret;
175
176         ret = splice_in(td, io_u);
177         if (ret <= 0)
178                 return ret;
179
180         return vmsplice_io_u_out(td, io_u, ret);
181 }
182
183 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
184 {
185         int ret;
186
187         ret = vmsplice_io_u_in(td, io_u);
188         if (ret <= 0)
189                 return ret;
190
191         return splice_out(td, io_u, ret);
192 }
193
194 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
195 {
196         int flags = 0;
197
198         /*
199          * if we are going to write more, set MSG_MORE
200          */
201         if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen < td->o.size)
202                 flags = MSG_MORE;
203
204         return send(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
205 }
206
207 static int fio_netio_recv(struct io_u *io_u)
208 {
209         int flags = MSG_WAITALL;
210
211         return recv(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
212 }
213
214 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
215 {
216         struct netio_data *nd = td->io_ops->data;
217         int ret;
218
219         if (io_u->ddir == DDIR_WRITE) {
220                 if (nd->use_splice)
221                         ret = fio_netio_splice_out(td, io_u);
222                 else
223                         ret = fio_netio_send(td, io_u);
224         } else if (io_u->ddir == DDIR_READ) {
225                 if (nd->use_splice)
226                         ret = fio_netio_splice_in(td, io_u);
227                 else
228                         ret = fio_netio_recv(io_u);
229         } else
230                 ret = 0;        /* must be a SYNC */
231
232         if (ret != (int) io_u->xfer_buflen) {
233                 if (ret >= 0) {
234                         io_u->resid = io_u->xfer_buflen - ret;
235                         io_u->error = 0;
236                         return FIO_Q_COMPLETED;
237                 } else
238                         io_u->error = errno;
239         }
240
241         if (io_u->error)
242                 td_verror(td, io_u->error, "xfer");
243
244         return FIO_Q_COMPLETED;
245 }
246
247 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
248 {
249         struct netio_data *nd = td->io_ops->data;
250
251         f->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
252         if (f->fd < 0) {
253                 td_verror(td, errno, "socket");
254                 return 1;
255         }
256
257         if (connect(f->fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
258                 td_verror(td, errno, "connect");
259                 return 1;
260         }
261
262         return 0;
263 }
264
265 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
266 {
267         struct netio_data *nd = td->io_ops->data;
268         socklen_t socklen = sizeof(nd->addr);
269         struct pollfd pfd;
270         int ret;
271
272         log_info("fio: waiting for connection\n");
273
274         /*
275          * Accept loop. poll for incoming events, accept them. Repeat until we
276          * have all connections.
277          */
278         while (!td->terminate) {
279                 pfd.fd = nd->listenfd;
280                 pfd.events = POLLIN;
281
282                 ret = poll(&pfd, 1, -1);
283                 if (ret < 0) {
284                         if (errno == EINTR)
285                                 continue;
286
287                         td_verror(td, errno, "poll");
288                         break;
289                 } else if (!ret)
290                         continue;
291
292                 /*
293                  * should be impossible
294                  */
295                 if (!(pfd.revents & POLLIN))
296                         continue;
297
298                 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
299                 if (f->fd < 0) {
300                         td_verror(td, errno, "accept");
301                         return 1;
302                 }
303                 break;
304         }
305
306         return 0;
307 }
308
309
310 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
311 {
312         if (td_read(td))
313                 return fio_netio_accept(td, f);
314         else
315                 return fio_netio_connect(td, f);
316 }
317
318 static int fio_netio_setup_connect(struct thread_data *td, const char *host,
319                                    unsigned short port)
320 {
321         struct netio_data *nd = td->io_ops->data;
322
323         nd->addr.sin_family = AF_INET;
324         nd->addr.sin_port = htons(port);
325
326         if (inet_aton(host, &nd->addr.sin_addr) != 1) {
327                 struct hostent *hent;
328
329                 hent = gethostbyname(host);
330                 if (!hent) {
331                         td_verror(td, errno, "gethostbyname");
332                         return 1;
333                 }
334
335                 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
336         }
337
338         return 0;
339 }
340
341 static int fio_netio_setup_listen(struct thread_data *td, short port)
342 {
343         struct netio_data *nd = td->io_ops->data;
344         int fd, opt;
345
346         fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
347         if (fd < 0) {
348                 td_verror(td, errno, "socket");
349                 return 1;
350         }
351
352         opt = 1;
353         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
354                 td_verror(td, errno, "setsockopt");
355                 return 1;
356         }
357 #ifdef SO_REUSEPORT
358         if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
359                 td_verror(td, errno, "setsockopt");
360                 return 1;
361         }
362 #endif
363
364         nd->addr.sin_family = AF_INET;
365         nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
366         nd->addr.sin_port = htons(port);
367
368         if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
369                 td_verror(td, errno, "bind");
370                 return 1;
371         }
372         if (listen(fd, 1) < 0) {
373                 td_verror(td, errno, "listen");
374                 return 1;
375         }
376
377         nd->listenfd = fd;
378         return 0;
379 }
380
381 static int fio_netio_init(struct thread_data *td)
382 {
383         struct netio_data *nd = td->io_ops->data;
384         unsigned short port;
385         char host[64], buf[128];
386         char *sep;
387         int ret;
388
389         if (td_rw(td)) {
390                 log_err("fio: network connections must be read OR write\n");
391                 return 1;
392         }
393         if (td_random(td)) {
394                 log_err("fio: network IO can't be random\n");
395                 return 1;
396         }
397
398         strcpy(buf, td->o.filename);
399
400         sep = strchr(buf, '/');
401         if (!sep) {
402                 log_err("fio: bad network host/port <<%s>>\n", td->o.filename);
403                 return 1;
404         }
405
406         *sep = '\0';
407         sep++;
408         strcpy(host, buf);
409         port = atoi(sep);
410
411         if (td_read(td)) {
412                 nd->send_to_net = 0;
413                 ret = fio_netio_setup_listen(td, port);
414         } else {
415                 nd->send_to_net = 1;
416                 ret = fio_netio_setup_connect(td, host, port);
417         }
418
419         return ret;
420 }
421
422 static void fio_netio_cleanup(struct thread_data *td)
423 {
424         struct netio_data *nd = td->io_ops->data;
425
426         if (nd) {
427                 if (nd->listenfd != -1)
428                         close(nd->listenfd);
429                 if (nd->pipes[0] != -1)
430                         close(nd->pipes[0]);
431                 if (nd->pipes[1] != -1)
432                         close(nd->pipes[1]);
433
434                 free(nd);
435                 td->io_ops->data = NULL;
436         }
437 }
438
439 static int fio_netio_setup(struct thread_data *td)
440 {
441         struct netio_data *nd;
442
443         if (!td->io_ops->data) {
444                 nd = malloc(sizeof(*nd));;
445
446                 memset(nd, 0, sizeof(*nd));
447                 nd->listenfd = -1;
448                 nd->pipes[0] = nd->pipes[1] = -1;
449                 td->io_ops->data = nd;
450         }
451
452         return 0;
453 }
454
455 static int fio_netio_setup_splice(struct thread_data *td)
456 {
457         struct netio_data *nd;
458
459         fio_netio_setup(td);
460
461         nd = td->io_ops->data;
462         if (nd) {
463                 if (pipe(nd->pipes) < 0)
464                         return 1;
465
466                 nd->use_splice = 1;
467                 return 0;
468         }
469
470         return 1;
471 }
472
473 static struct ioengine_ops ioengine_rw = {
474         .name           = "net",
475         .version        = FIO_IOOPS_VERSION,
476         .prep           = fio_netio_prep,
477         .queue          = fio_netio_queue,
478         .setup          = fio_netio_setup,
479         .init           = fio_netio_init,
480         .cleanup        = fio_netio_cleanup,
481         .open_file      = fio_netio_open_file,
482         .close_file     = generic_close_file,
483         .flags          = FIO_SYNCIO | FIO_DISKLESSIO,
484 };
485
486 static struct ioengine_ops ioengine_splice = {
487         .name           = "netsplice",
488         .version        = FIO_IOOPS_VERSION,
489         .prep           = fio_netio_prep,
490         .queue          = fio_netio_queue,
491         .setup          = fio_netio_setup_splice,
492         .init           = fio_netio_init,
493         .cleanup        = fio_netio_cleanup,
494         .open_file      = fio_netio_open_file,
495         .close_file     = generic_close_file,
496         .flags          = FIO_SYNCIO | FIO_DISKLESSIO,
497 };
498
499 static void fio_init fio_netio_register(void)
500 {
501         register_ioengine(&ioengine_rw);
502         register_ioengine(&ioengine_splice);
503 }
504
505 static void fio_exit fio_netio_unregister(void)
506 {
507         unregister_ioengine(&ioengine_rw);
508         unregister_ioengine(&ioengine_splice);
509 }