net engine: missing includes
[fio.git] / engines / net.c
1 /*
2  * net engine
3  *
4  * IO engine that reads/writes to/from sockets.
5  *
6  */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <errno.h>
11 #include <assert.h>
12 #include <netinet/in.h>
13 #include <arpa/inet.h>
14 #include <netdb.h>
15 #include <sys/poll.h>
16 #include <sys/types.h>
17 #include <sys/socket.h>
18
19 #include "../fio.h"
20
21 struct netio_data {
22         int listenfd;
23         int send_to_net;
24         int use_splice;
25         int pipes[2];
26         char host[64];
27         struct sockaddr_in addr;
28 };
29
30 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
31 {
32         struct netio_data *nd = td->io_ops->data;
33
34         /*
35          * Make sure we don't see spurious reads to a receiver, and vice versa
36          */
37         if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
38             (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
39                 td_verror(td, EINVAL, "bad direction");
40                 return 1;
41         }
42                 
43         return 0;
44 }
45
46 #ifdef FIO_HAVE_SPLICE
47 static int splice_io_u(int fdin, int fdout, unsigned int len)
48 {
49         int bytes = 0;
50
51         while (len) {
52                 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
53
54                 if (ret < 0) {
55                         if (!bytes)
56                                 bytes = ret;
57
58                         break;
59                 } else if (!ret)
60                         break;
61
62                 bytes += ret;
63                 len -= ret;
64         }
65
66         return bytes;
67 }
68
69 /*
70  * Receive bytes from a socket and fill them into the internal pipe
71  */
72 static int splice_in(struct thread_data *td, struct io_u *io_u)
73 {
74         struct netio_data *nd = td->io_ops->data;
75
76         return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
77 }
78
79 /*
80  * Transmit 'len' bytes from the internal pipe
81  */
82 static int splice_out(struct thread_data *td, struct io_u *io_u,
83                       unsigned int len)
84 {
85         struct netio_data *nd = td->io_ops->data;
86
87         return splice_io_u(nd->pipes[0], io_u->file->fd, len);
88 }
89
90 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
91 {
92         struct iovec iov = {
93                 .iov_base = io_u->xfer_buf,
94                 .iov_len = len,
95         };
96         int bytes = 0;
97
98         while (iov.iov_len) {
99                 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
100
101                 if (ret < 0) {
102                         if (!bytes)
103                                 bytes = ret;
104                         break;
105                 } else if (!ret)
106                         break;
107
108                 iov.iov_len -= ret;
109                 iov.iov_base += ret;
110                 bytes += ret;
111         }
112
113         return bytes;
114
115 }
116
117 /*
118  * vmsplice() pipe to io_u buffer
119  */
120 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
121                              unsigned int len)
122 {
123         struct netio_data *nd = td->io_ops->data;
124
125         return vmsplice_io_u(io_u, nd->pipes[0], len);
126 }
127
128 /*
129  * vmsplice() io_u to pipe
130  */
131 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
132 {
133         struct netio_data *nd = td->io_ops->data;
134
135         return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
136 }
137
138 /*
139  * splice receive - transfer socket data into a pipe using splice, then map
140  * that pipe data into the io_u using vmsplice.
141  */
142 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
143 {
144         int ret;
145
146         ret = splice_in(td, io_u);
147         if (ret > 0)
148                 return vmsplice_io_u_out(td, io_u, ret);
149
150         return ret;
151 }
152
153 /*
154  * splice transmit - map data from the io_u into a pipe by using vmsplice,
155  * then transfer that pipe to a socket using splice.
156  */
157 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
158 {
159         int ret;
160
161         ret = vmsplice_io_u_in(td, io_u);
162         if (ret > 0)
163                 return splice_out(td, io_u, ret);
164
165         return ret;
166 }
167 #else
168 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
169 {
170         errno = EOPNOTSUPP;
171         return -1;
172 }
173
174 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
175 {
176         errno = EOPNOTSUPP;
177         return -1;
178 }
179 #endif
180
181 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
182 {
183         int flags = 0;
184
185         /*
186          * if we are going to write more, set MSG_MORE
187          */
188 #ifdef MSG_MORE
189         if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen < td->o.size)
190                 flags = MSG_MORE;
191 #endif
192
193         return send(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
194 }
195
196 static int fio_netio_recv(struct io_u *io_u)
197 {
198         int flags = MSG_WAITALL;
199
200         return recv(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
201 }
202
203 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
204 {
205         struct netio_data *nd = td->io_ops->data;
206         int ret;
207
208         fio_ro_check(td, io_u);
209
210         if (io_u->ddir == DDIR_WRITE) {
211                 if (nd->use_splice)
212                         ret = fio_netio_splice_out(td, io_u);
213                 else
214                         ret = fio_netio_send(td, io_u);
215         } else if (io_u->ddir == DDIR_READ) {
216                 if (nd->use_splice)
217                         ret = fio_netio_splice_in(td, io_u);
218                 else
219                         ret = fio_netio_recv(io_u);
220         } else
221                 ret = 0;        /* must be a SYNC */
222
223         if (ret != (int) io_u->xfer_buflen) {
224                 if (ret >= 0) {
225                         io_u->resid = io_u->xfer_buflen - ret;
226                         io_u->error = 0;
227                         return FIO_Q_COMPLETED;
228                 } else
229                         io_u->error = errno;
230         }
231
232         if (io_u->error)
233                 td_verror(td, io_u->error, "xfer");
234
235         return FIO_Q_COMPLETED;
236 }
237
238 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
239 {
240         struct netio_data *nd = td->io_ops->data;
241
242         f->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
243         if (f->fd < 0) {
244                 td_verror(td, errno, "socket");
245                 return 1;
246         }
247
248         if (connect(f->fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
249                 td_verror(td, errno, "connect");
250                 return 1;
251         }
252
253         return 0;
254 }
255
256 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
257 {
258         struct netio_data *nd = td->io_ops->data;
259         socklen_t socklen = sizeof(nd->addr);
260         struct pollfd pfd;
261         int ret;
262
263         log_info("fio: waiting for connection\n");
264
265         /*
266          * Accept loop. poll for incoming events, accept them. Repeat until we
267          * have all connections.
268          */
269         while (!td->terminate) {
270                 pfd.fd = nd->listenfd;
271                 pfd.events = POLLIN;
272
273                 ret = poll(&pfd, 1, -1);
274                 if (ret < 0) {
275                         if (errno == EINTR)
276                                 continue;
277
278                         td_verror(td, errno, "poll");
279                         break;
280                 } else if (!ret)
281                         continue;
282
283                 /*
284                  * should be impossible
285                  */
286                 if (!(pfd.revents & POLLIN))
287                         continue;
288
289                 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
290                 if (f->fd < 0) {
291                         td_verror(td, errno, "accept");
292                         return 1;
293                 }
294                 break;
295         }
296
297         return 0;
298 }
299
300 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
301 {
302         if (td_read(td))
303                 return fio_netio_accept(td, f);
304         else
305                 return fio_netio_connect(td, f);
306 }
307
308 static int fio_netio_setup_connect(struct thread_data *td, const char *host,
309                                    unsigned short port)
310 {
311         struct netio_data *nd = td->io_ops->data;
312
313         nd->addr.sin_family = AF_INET;
314         nd->addr.sin_port = htons(port);
315
316         if (inet_aton(host, &nd->addr.sin_addr) != 1) {
317                 struct hostent *hent;
318
319                 hent = gethostbyname(host);
320                 if (!hent) {
321                         td_verror(td, errno, "gethostbyname");
322                         return 1;
323                 }
324
325                 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
326         }
327
328         return 0;
329 }
330
331 static int fio_netio_setup_listen(struct thread_data *td, short port)
332 {
333         struct netio_data *nd = td->io_ops->data;
334         int fd, opt;
335
336         fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
337         if (fd < 0) {
338                 td_verror(td, errno, "socket");
339                 return 1;
340         }
341
342         opt = 1;
343         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
344                 td_verror(td, errno, "setsockopt");
345                 return 1;
346         }
347 #ifdef SO_REUSEPORT
348         if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
349                 td_verror(td, errno, "setsockopt");
350                 return 1;
351         }
352 #endif
353
354         nd->addr.sin_family = AF_INET;
355         nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
356         nd->addr.sin_port = htons(port);
357
358         if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
359                 td_verror(td, errno, "bind");
360                 return 1;
361         }
362         if (listen(fd, 1) < 0) {
363                 td_verror(td, errno, "listen");
364                 return 1;
365         }
366
367         nd->listenfd = fd;
368         return 0;
369 }
370
371 static int fio_netio_init(struct thread_data *td)
372 {
373         struct netio_data *nd = td->io_ops->data;
374         unsigned int port;
375         char host[64], buf[128];
376         char *sep;
377         int ret;
378
379         if (td_rw(td)) {
380                 log_err("fio: network connections must be read OR write\n");
381                 return 1;
382         }
383         if (td_random(td)) {
384                 log_err("fio: network IO can't be random\n");
385                 return 1;
386         }
387
388         strcpy(buf, td->o.filename);
389
390         sep = strchr(buf, '/');
391         if (!sep)
392                 goto bad_host;
393
394         *sep = '\0';
395         sep++;
396         strcpy(host, buf);
397         if (!strlen(host))
398                 goto bad_host;
399
400         port = strtol(sep, NULL, 10);
401         if (!port || port > 65535)
402                 goto bad_host;
403
404         if (td_read(td)) {
405                 nd->send_to_net = 0;
406                 ret = fio_netio_setup_listen(td, port);
407         } else {
408                 nd->send_to_net = 1;
409                 ret = fio_netio_setup_connect(td, host, port);
410         }
411
412         return ret;
413 bad_host:
414         log_err("fio: bad network host/port: %s\n", td->o.filename);
415         return 1;
416 }
417
418 static void fio_netio_cleanup(struct thread_data *td)
419 {
420         struct netio_data *nd = td->io_ops->data;
421
422         if (nd) {
423                 if (nd->listenfd != -1)
424                         close(nd->listenfd);
425                 if (nd->pipes[0] != -1)
426                         close(nd->pipes[0]);
427                 if (nd->pipes[1] != -1)
428                         close(nd->pipes[1]);
429
430                 free(nd);
431         }
432 }
433
434 static int fio_netio_setup(struct thread_data *td)
435 {
436         struct netio_data *nd;
437
438         if (!td->io_ops->data) {
439                 nd = malloc(sizeof(*nd));;
440
441                 memset(nd, 0, sizeof(*nd));
442                 nd->listenfd = -1;
443                 nd->pipes[0] = nd->pipes[1] = -1;
444                 td->io_ops->data = nd;
445         }
446
447         return 0;
448 }
449
450 #ifdef FIO_HAVE_SPLICE
451 static int fio_netio_setup_splice(struct thread_data *td)
452 {
453         struct netio_data *nd;
454
455         fio_netio_setup(td);
456
457         nd = td->io_ops->data;
458         if (nd) {
459                 if (pipe(nd->pipes) < 0)
460                         return 1;
461
462                 nd->use_splice = 1;
463                 return 0;
464         }
465
466         return 1;
467 }
468
469 static struct ioengine_ops ioengine_splice = {
470         .name           = "netsplice",
471         .version        = FIO_IOOPS_VERSION,
472         .prep           = fio_netio_prep,
473         .queue          = fio_netio_queue,
474         .setup          = fio_netio_setup_splice,
475         .init           = fio_netio_init,
476         .cleanup        = fio_netio_cleanup,
477         .open_file      = fio_netio_open_file,
478         .close_file     = generic_close_file,
479         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
480                           FIO_SIGQUIT,
481 };
482 #endif
483
484 static struct ioengine_ops ioengine_rw = {
485         .name           = "net",
486         .version        = FIO_IOOPS_VERSION,
487         .prep           = fio_netio_prep,
488         .queue          = fio_netio_queue,
489         .setup          = fio_netio_setup,
490         .init           = fio_netio_init,
491         .cleanup        = fio_netio_cleanup,
492         .open_file      = fio_netio_open_file,
493         .close_file     = generic_close_file,
494         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
495                           FIO_SIGQUIT,
496 };
497
498 static void fio_init fio_netio_register(void)
499 {
500         register_ioengine(&ioengine_rw);
501 #ifdef FIO_HAVE_SPLICE
502         register_ioengine(&ioengine_splice);
503 #endif
504 }
505
506 static void fio_exit fio_netio_unregister(void)
507 {
508         unregister_ioengine(&ioengine_rw);
509 #ifdef FIO_HAVE_SPLICE
510         unregister_ioengine(&ioengine_splice);
511 #endif
512 }