posixaio: not all platforms have aio_fsync()
[fio.git] / engines / net.c
CommitLineData
/*
 * net engine
 *
 * IO engine that reads/writes to/from sockets.
 *
 */
7#include <stdio.h>
8#include <stdlib.h>
9#include <unistd.h>
10#include <errno.h>
11#include <assert.h>
12#include <netinet/in.h>
13#include <arpa/inet.h>
14#include <netdb.h>
5fdd124a 15#include <sys/poll.h>
ed92ac0c
JA
16
17#include "../fio.h"
ed92ac0c 18
b5af8293
JA
/*
 * Per-thread state shared by the "net" and "netsplice" engines. Allocated by
 * fio_netio_setup(), stored in td->io_ops->data and freed by
 * fio_netio_cleanup().
 */
struct netio_data {
	int listenfd;			/* receiver's listening socket; -1 when not open */
	int send_to_net;		/* 1 for a writing (sending) job, 0 for a reader */
	int use_splice;			/* set by the netsplice setup: move data via splice()/vmsplice() */
	int pipes[2];			/* internal pipe for splice mode, [0] read end, [1] write end; -1 when not open */
	char host[64];			/* NOTE(review): not referenced anywhere in this file */
	struct sockaddr_in addr;	/* reader: bound address, then the accepted peer; writer: target address */
};
ed92ac0c
JA
27
28static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
29{
b5af8293 30 struct netio_data *nd = td->io_ops->data;
ed92ac0c 31
7a6499da
JA
32 /*
33 * Make sure we don't see spurious reads to a receiver, and vice versa
34 */
b5af8293
JA
35 if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
36 (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
e1161c32 37 td_verror(td, EINVAL, "bad direction");
7a6499da 38 return 1;
ed92ac0c 39 }
7a6499da 40
f85ac25a 41 return 0;
ed92ac0c
JA
42}
43
5921e80c 44#ifdef FIO_HAVE_SPLICE
/*
 * Move up to 'len' bytes from fdin to fdout with splice(), looping over
 * partial transfers.
 *
 * Returns the number of bytes moved. If the very first splice() call fails,
 * its negative return value is passed back instead (errno is set). A later
 * failure, or splice() returning 0, ends the loop early with the count so far.
 */
static int splice_io_u(int fdin, int fdout, unsigned int len)
{
	int moved = 0;

	while (len > 0) {
		int n = splice(fdin, NULL, fdout, NULL, len, 0);

		if (n == 0)
			break;
		if (n < 0)
			return moved ? moved : n;

		moved += n;
		len -= n;
	}

	return moved;
}
66
67/*
cd963e18 68 * Receive bytes from a socket and fill them into the internal pipe
9cce02e8 69 */
cd963e18 70static int splice_in(struct thread_data *td, struct io_u *io_u)
9cce02e8
JA
71{
72 struct netio_data *nd = td->io_ops->data;
9cce02e8 73
cd963e18 74 return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
9cce02e8
JA
75}
76
77/*
cd963e18 78 * Transmit 'len' bytes from the internal pipe
9cce02e8 79 */
cd963e18
JA
80static int splice_out(struct thread_data *td, struct io_u *io_u,
81 unsigned int len)
9cce02e8
JA
82{
83 struct netio_data *nd = td->io_ops->data;
cd963e18
JA
84
85 return splice_io_u(nd->pipes[0], io_u->file->fd, len);
86}
87
88static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
89{
9cce02e8
JA
90 struct iovec iov = {
91 .iov_base = io_u->xfer_buf,
92 .iov_len = len,
93 };
94 int bytes = 0;
95
96 while (iov.iov_len) {
cd963e18 97 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
9cce02e8
JA
98
99 if (ret < 0) {
100 if (!bytes)
101 bytes = ret;
102 break;
103 } else if (!ret)
104 break;
105
106 iov.iov_len -= ret;
cd963e18 107 iov.iov_base += ret;
f657a2fb 108 bytes += ret;
9cce02e8
JA
109 }
110
111 return bytes;
cd963e18 112
9cce02e8
JA
113}
114
115/*
cd963e18 116 * vmsplice() pipe to io_u buffer
9cce02e8 117 */
cd963e18
JA
118static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
119 unsigned int len)
9cce02e8
JA
120{
121 struct netio_data *nd = td->io_ops->data;
9cce02e8 122
cd963e18
JA
123 return vmsplice_io_u(io_u, nd->pipes[0], len);
124}
9cce02e8 125
cd963e18
JA
126/*
127 * vmsplice() io_u to pipe
128 */
129static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
130{
131 struct netio_data *nd = td->io_ops->data;
ed92ac0c 132
cd963e18 133 return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
9cce02e8
JA
134}
135
cd963e18
JA
/*
 * Splice receive: move socket data into the internal pipe with splice(),
 * then transfer whatever arrived from the pipe into the io_u with vmsplice().
 * Returns the vmsplice stage's result, or the splice stage's result if that
 * moved nothing or failed.
 */
static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
{
	int got = splice_in(td, io_u);

	if (got <= 0)
		return got;

	return vmsplice_io_u_out(td, io_u, got);
}
150
cd963e18
JA
/*
 * Splice transmit: push the io_u's data into the internal pipe with
 * vmsplice(), then splice that many bytes from the pipe to the socket.
 * Returns the splice stage's result, or the vmsplice stage's result if that
 * moved nothing or failed.
 */
static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
{
	int queued = vmsplice_io_u_in(td, io_u);

	if (queued <= 0)
		return queued;

	return splice_out(td, io_u, queued);
}
5921e80c
JA
165#else
/*
 * Fallback used when splice support is not compiled in (FIO_HAVE_SPLICE
 * undefined): always fails with errno = EOPNOTSUPP and returns -1.
 */
static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
{
	(void) td;
	(void) io_u;

	errno = EOPNOTSUPP;
	return -1;
}
171
/*
 * Fallback used when splice support is not compiled in (FIO_HAVE_SPLICE
 * undefined): always fails with errno = EOPNOTSUPP and returns -1.
 */
static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
{
	(void) td;
	(void) io_u;

	errno = EOPNOTSUPP;
	return -1;
}
177#endif
9cce02e8
JA
178
179static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
180{
181 int flags = 0;
182
183 /*
184 * if we are going to write more, set MSG_MORE
185 */
5921e80c 186#ifdef MSG_MORE
9cce02e8
JA
187 if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen < td->o.size)
188 flags = MSG_MORE;
5921e80c 189#endif
9cce02e8
JA
190
191 return send(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
192}
193
194static int fio_netio_recv(struct io_u *io_u)
195{
196 int flags = MSG_WAITALL;
197
198 return recv(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
199}
200
201static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
202{
203 struct netio_data *nd = td->io_ops->data;
204 int ret;
205
7101d9c2
JA
206 fio_ro_check(td, io_u);
207
9cce02e8
JA
208 if (io_u->ddir == DDIR_WRITE) {
209 if (nd->use_splice)
210 ret = fio_netio_splice_out(td, io_u);
211 else
212 ret = fio_netio_send(td, io_u);
d4f12dd0 213 } else if (io_u->ddir == DDIR_READ) {
9cce02e8
JA
214 if (nd->use_splice)
215 ret = fio_netio_splice_in(td, io_u);
216 else
217 ret = fio_netio_recv(io_u);
d4f12dd0 218 } else
7a6499da 219 ret = 0; /* must be a SYNC */
ed92ac0c 220
cec6b55d 221 if (ret != (int) io_u->xfer_buflen) {
22819ec2 222 if (ret >= 0) {
cec6b55d
JA
223 io_u->resid = io_u->xfer_buflen - ret;
224 io_u->error = 0;
36167d82 225 return FIO_Q_COMPLETED;
ed92ac0c
JA
226 } else
227 io_u->error = errno;
228 }
229
36167d82 230 if (io_u->error)
e1161c32 231 td_verror(td, io_u->error, "xfer");
ed92ac0c 232
36167d82 233 return FIO_Q_COMPLETED;
ed92ac0c
JA
234}
235
b5af8293 236static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
ed92ac0c 237{
b5af8293 238 struct netio_data *nd = td->io_ops->data;
ed92ac0c 239
b5af8293
JA
240 f->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
241 if (f->fd < 0) {
242 td_verror(td, errno, "socket");
243 return 1;
ed92ac0c
JA
244 }
245
b5af8293
JA
246 if (connect(f->fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
247 td_verror(td, errno, "connect");
248 return 1;
ed92ac0c
JA
249 }
250
251 return 0;
ed92ac0c
JA
252}
253
b5af8293 254static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
5fdd124a 255{
b5af8293
JA
256 struct netio_data *nd = td->io_ops->data;
257 socklen_t socklen = sizeof(nd->addr);
5fdd124a 258 struct pollfd pfd;
b5af8293 259 int ret;
5fdd124a 260
6d86144d 261 log_info("fio: waiting for connection\n");
5fdd124a
JA
262
263 /*
264 * Accept loop. poll for incoming events, accept them. Repeat until we
265 * have all connections.
266 */
b5af8293
JA
267 while (!td->terminate) {
268 pfd.fd = nd->listenfd;
5fdd124a
JA
269 pfd.events = POLLIN;
270
271 ret = poll(&pfd, 1, -1);
272 if (ret < 0) {
273 if (errno == EINTR)
274 continue;
275
e1161c32 276 td_verror(td, errno, "poll");
5fdd124a
JA
277 break;
278 } else if (!ret)
279 continue;
280
0c09442b
JA
281 /*
282 * should be impossible
283 */
284 if (!(pfd.revents & POLLIN))
285 continue;
286
b5af8293
JA
287 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
288 if (f->fd < 0) {
289 td_verror(td, errno, "accept");
290 return 1;
291 }
292 break;
293 }
5fdd124a 294
b5af8293
JA
295 return 0;
296}
297
b5af8293
JA
/*
 * open_file hook: a reading job waits for the peer to connect
 * (fio_netio_accept), while a writing job connects out (fio_netio_connect).
 * Returns the chosen helper's result.
 */
static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
{
	return td_read(td) ? fio_netio_accept(td, f)
			   : fio_netio_connect(td, f);
}
305
306static int fio_netio_setup_connect(struct thread_data *td, const char *host,
307 unsigned short port)
308{
309 struct netio_data *nd = td->io_ops->data;
310
311 nd->addr.sin_family = AF_INET;
312 nd->addr.sin_port = htons(port);
313
314 if (inet_aton(host, &nd->addr.sin_addr) != 1) {
315 struct hostent *hent;
316
317 hent = gethostbyname(host);
318 if (!hent) {
319 td_verror(td, errno, "gethostbyname");
320 return 1;
5fdd124a 321 }
b5af8293
JA
322
323 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
5fdd124a
JA
324 }
325
326 return 0;
327}
328
b5af8293 329static int fio_netio_setup_listen(struct thread_data *td, short port)
ed92ac0c 330{
b5af8293 331 struct netio_data *nd = td->io_ops->data;
5fdd124a 332 int fd, opt;
ed92ac0c 333
6bedbfaf 334 fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
ed92ac0c 335 if (fd < 0) {
e1161c32 336 td_verror(td, errno, "socket");
ed92ac0c
JA
337 return 1;
338 }
339
340 opt = 1;
341 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
e1161c32 342 td_verror(td, errno, "setsockopt");
ed92ac0c
JA
343 return 1;
344 }
6bedbfaf
JA
345#ifdef SO_REUSEPORT
346 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
e1161c32 347 td_verror(td, errno, "setsockopt");
6bedbfaf
JA
348 return 1;
349 }
350#endif
ed92ac0c 351
b5af8293
JA
352 nd->addr.sin_family = AF_INET;
353 nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
354 nd->addr.sin_port = htons(port);
ed92ac0c 355
b5af8293 356 if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
e1161c32 357 td_verror(td, errno, "bind");
ed92ac0c
JA
358 return 1;
359 }
360 if (listen(fd, 1) < 0) {
e1161c32 361 td_verror(td, errno, "listen");
ed92ac0c
JA
362 return 1;
363 }
364
b5af8293
JA
365 nd->listenfd = fd;
366 return 0;
ed92ac0c
JA
367}
368
9bec88e1 369static int fio_netio_init(struct thread_data *td)
ed92ac0c 370{
b5af8293 371 struct netio_data *nd = td->io_ops->data;
443662ef 372 unsigned int port;
b5af8293 373 char host[64], buf[128];
ed92ac0c 374 char *sep;
af52b345 375 int ret;
ed92ac0c 376
413dd459 377 if (td_rw(td)) {
ed92ac0c
JA
378 log_err("fio: network connections must be read OR write\n");
379 return 1;
380 }
16d55aae
JA
381 if (td_random(td)) {
382 log_err("fio: network IO can't be random\n");
383 return 1;
384 }
ed92ac0c 385
2dc1bbeb 386 strcpy(buf, td->o.filename);
ed92ac0c 387
9f9214f2 388 sep = strchr(buf, '/');
443662ef
JA
389 if (!sep)
390 goto bad_host;
ed92ac0c
JA
391
392 *sep = '\0';
393 sep++;
394 strcpy(host, buf);
443662ef
JA
395 if (!strlen(host))
396 goto bad_host;
397
398 port = strtol(sep, NULL, 10);
399 if (!port || port > 65535)
400 goto bad_host;
ed92ac0c 401
413dd459 402 if (td_read(td)) {
b5af8293 403 nd->send_to_net = 0;
ed92ac0c
JA
404 ret = fio_netio_setup_listen(td, port);
405 } else {
b5af8293 406 nd->send_to_net = 1;
ed92ac0c
JA
407 ret = fio_netio_setup_connect(td, host, port);
408 }
409
7bb48f84 410 return ret;
443662ef
JA
411bad_host:
412 log_err("fio: bad network host/port: %s\n", td->o.filename);
413 return 1;
ed92ac0c
JA
414}
415
b5af8293 416static void fio_netio_cleanup(struct thread_data *td)
9bec88e1 417{
b5af8293
JA
418 struct netio_data *nd = td->io_ops->data;
419
420 if (nd) {
64b24cd8
JA
421 if (nd->listenfd != -1)
422 close(nd->listenfd);
423 if (nd->pipes[0] != -1)
424 close(nd->pipes[0]);
425 if (nd->pipes[1] != -1)
426 close(nd->pipes[1]);
427
b5af8293 428 free(nd);
b5af8293
JA
429 }
430}
431
432static int fio_netio_setup(struct thread_data *td)
433{
7bb48f84 434 struct netio_data *nd;
7bb48f84
JA
435
436 if (!td->io_ops->data) {
437 nd = malloc(sizeof(*nd));;
438
439 memset(nd, 0, sizeof(*nd));
440 nd->listenfd = -1;
64b24cd8 441 nd->pipes[0] = nd->pipes[1] = -1;
7bb48f84 442 td->io_ops->data = nd;
7bb48f84 443 }
b5af8293 444
9bec88e1
JA
445 return 0;
446}
447
5921e80c 448#ifdef FIO_HAVE_SPLICE
9cce02e8
JA
449static int fio_netio_setup_splice(struct thread_data *td)
450{
451 struct netio_data *nd;
452
453 fio_netio_setup(td);
454
455 nd = td->io_ops->data;
456 if (nd) {
457 if (pipe(nd->pipes) < 0)
458 return 1;
459
460 nd->use_splice = 1;
461 return 0;
462 }
463
464 return 1;
465}
466
5921e80c
JA
/*
 * "netsplice" engine: the same socket handling as "net", but its setup hook
 * also creates an internal pipe and sets use_splice, so queue() moves data
 * with splice()/vmsplice() instead of send()/recv().
 * NOTE(review): the FIO_* flag semantics live in fio.h; presumably they mark
 * the engine as synchronous, diskless and single-direction — confirm.
 */
static struct ioengine_ops ioengine_splice = {
	.name		= "netsplice",
	.version	= FIO_IOOPS_VERSION,
	.prep		= fio_netio_prep,
	.queue		= fio_netio_queue,
	.setup		= fio_netio_setup_splice,
	.init		= fio_netio_init,
	.cleanup	= fio_netio_cleanup,
	.open_file	= fio_netio_open_file,
	.close_file	= generic_close_file,
	.flags		= FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
			  FIO_SIGQUIT,
};
5921e80c 480#endif
ed92ac0c 481
5921e80c
JA
/*
 * "net" engine: data moves with plain send()/recv() on a TCP socket. A
 * reading job listens and accepts, a writing job connects (see
 * fio_netio_open_file).
 * NOTE(review): the FIO_* flag semantics live in fio.h; presumably they mark
 * the engine as synchronous, diskless and single-direction — confirm.
 */
static struct ioengine_ops ioengine_rw = {
	.name		= "net",
	.version	= FIO_IOOPS_VERSION,
	.prep		= fio_netio_prep,
	.queue		= fio_netio_queue,
	.setup		= fio_netio_setup,
	.init		= fio_netio_init,
	.cleanup	= fio_netio_cleanup,
	.open_file	= fio_netio_open_file,
	.close_file	= generic_close_file,
	.flags		= FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
			  FIO_SIGQUIT,
};
495
ed92ac0c
JA
/*
 * Register the "net" engine and, when FIO_HAVE_SPLICE is defined, the
 * "netsplice" engine.
 * NOTE(review): fio_init is a project macro, presumably marking this as a
 * constructor that runs at program start — confirm in fio.h.
 */
static void fio_init fio_netio_register(void)
{
	register_ioengine(&ioengine_rw);
#ifdef FIO_HAVE_SPLICE
	register_ioengine(&ioengine_splice);
#endif
}
503
/*
 * Unregister the engines registered by fio_netio_register().
 * NOTE(review): fio_exit is a project macro, presumably marking this as a
 * destructor that runs at program exit — confirm in fio.h.
 */
static void fio_exit fio_netio_unregister(void)
{
	unregister_ioengine(&ioengine_rw);
#ifdef FIO_HAVE_SPLICE
	unregister_ioengine(&ioengine_splice);
#endif
}