Add dprint() to io_u requeue
[fio.git] / engines / net.c
... / ...
CommitLineData
1/*
2 * net engine
3 *
4 * IO engine that reads/writes to/from sockets.
5 *
6 */
7#include <stdio.h>
8#include <stdlib.h>
9#include <unistd.h>
10#include <errno.h>
11#include <assert.h>
12#include <netinet/in.h>
13#include <arpa/inet.h>
14#include <netdb.h>
15#include <sys/poll.h>
16
17#include "../fio.h"
18
/*
 * Per-engine private state, stored in td->io_ops->data.
 */
struct netio_data {
	int listenfd;			/* listening socket, -1 when not open */
	int send_to_net;		/* non-zero: we transmit; zero: we receive */
	int use_splice;			/* non-zero: use the splice/vmsplice path */
	int pipes[2];			/* internal pipe for splice ([0] read, [1] write) */
	char host[64];			/* not referenced by the code visible in this file */
	struct sockaddr_in addr;	/* address to connect to, or bind/accept on */
};
27
28static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
29{
30 struct netio_data *nd = td->io_ops->data;
31
32 /*
33 * Make sure we don't see spurious reads to a receiver, and vice versa
34 */
35 if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
36 (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
37 td_verror(td, EINVAL, "bad direction");
38 return 1;
39 }
40
41 return 0;
42}
43
44#ifdef FIO_HAVE_SPLICE
/*
 * splice() up to 'len' bytes from fdin to fdout, looping over short
 * transfers.
 *
 * Returns the number of bytes moved. If splice() fails before anything was
 * moved, its negative return value is passed back; if it fails after a
 * partial transfer, the partial byte count is returned instead. A zero
 * return from splice() ends the loop early.
 */
static int splice_io_u(int fdin, int fdout, unsigned int len)
{
	int total = 0;

	for (; len != 0; ) {
		int n = splice(fdin, NULL, fdout, NULL, len, 0);

		if (n == 0)
			break;

		if (n < 0) {
			/* only surface the error if nothing was transferred */
			if (total == 0)
				total = n;
			break;
		}

		total += n;
		len -= n;
	}

	return total;
}
66
67/*
68 * Receive bytes from a socket and fill them into the internal pipe
69 */
70static int splice_in(struct thread_data *td, struct io_u *io_u)
71{
72 struct netio_data *nd = td->io_ops->data;
73
74 return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
75}
76
77/*
78 * Transmit 'len' bytes from the internal pipe
79 */
80static int splice_out(struct thread_data *td, struct io_u *io_u,
81 unsigned int len)
82{
83 struct netio_data *nd = td->io_ops->data;
84
85 return splice_io_u(nd->pipes[0], io_u->file->fd, len);
86}
87
88static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
89{
90 struct iovec iov = {
91 .iov_base = io_u->xfer_buf,
92 .iov_len = len,
93 };
94 int bytes = 0;
95
96 while (iov.iov_len) {
97 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
98
99 if (ret < 0) {
100 if (!bytes)
101 bytes = ret;
102 break;
103 } else if (!ret)
104 break;
105
106 iov.iov_len -= ret;
107 iov.iov_base += ret;
108 bytes += ret;
109 }
110
111 return bytes;
112
113}
114
115/*
116 * vmsplice() pipe to io_u buffer
117 */
118static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
119 unsigned int len)
120{
121 struct netio_data *nd = td->io_ops->data;
122
123 return vmsplice_io_u(io_u, nd->pipes[0], len);
124}
125
126/*
127 * vmsplice() io_u to pipe
128 */
129static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
130{
131 struct netio_data *nd = td->io_ops->data;
132
133 return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
134}
135
/*
 * Splice receive: first splice socket data into the internal pipe, then
 * vmsplice exactly that many bytes between the pipe and the io_u buffer.
 *
 * If the first stage moved nothing or failed, its result (0 or negative)
 * is returned unchanged and the second stage is skipped.
 */
static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
{
	int got = splice_in(td, io_u);

	if (got <= 0)
		return got;

	return vmsplice_io_u_out(td, io_u, got);
}
150
/*
 * Splice transmit: first vmsplice the io_u buffer into the internal pipe,
 * then splice exactly that many bytes from the pipe to the socket.
 *
 * If the first stage moved nothing or failed, its result (0 or negative)
 * is returned unchanged and the second stage is skipped.
 */
static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
{
	int queued = vmsplice_io_u_in(td, io_u);

	if (queued <= 0)
		return queued;

	return splice_out(td, io_u, queued);
}
165#else
/*
 * Stub used when splice support is not compiled in (FIO_HAVE_SPLICE is
 * undefined). Always fails with -1 and errno set to EOPNOTSUPP.
 *
 * errno values are positive; the caller copies errno into io_u->error, so
 * a negated value here would produce a bogus error code.
 */
static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
{
	errno = EOPNOTSUPP;
	return -1;
}
171
/*
 * Stub used when splice support is not compiled in (FIO_HAVE_SPLICE is
 * undefined). Always fails with -1 and errno set to EOPNOTSUPP.
 *
 * errno values are positive; the caller copies errno into io_u->error, so
 * a negated value here would produce a bogus error code.
 */
static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
{
	errno = EOPNOTSUPP;
	return -1;
}
177#endif
178
179static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
180{
181 int flags = 0;
182
183 /*
184 * if we are going to write more, set MSG_MORE
185 */
186#ifdef MSG_MORE
187 if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen < td->o.size)
188 flags = MSG_MORE;
189#endif
190
191 return send(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
192}
193
194static int fio_netio_recv(struct io_u *io_u)
195{
196 int flags = MSG_WAITALL;
197
198 return recv(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
199}
200
201static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
202{
203 struct netio_data *nd = td->io_ops->data;
204 int ret;
205
206 fio_ro_check(td, io_u);
207
208 if (io_u->ddir == DDIR_WRITE) {
209 if (nd->use_splice)
210 ret = fio_netio_splice_out(td, io_u);
211 else
212 ret = fio_netio_send(td, io_u);
213 } else if (io_u->ddir == DDIR_READ) {
214 if (nd->use_splice)
215 ret = fio_netio_splice_in(td, io_u);
216 else
217 ret = fio_netio_recv(io_u);
218 } else
219 ret = 0; /* must be a SYNC */
220
221 if (ret != (int) io_u->xfer_buflen) {
222 if (ret >= 0) {
223 io_u->resid = io_u->xfer_buflen - ret;
224 io_u->error = 0;
225 return FIO_Q_COMPLETED;
226 } else
227 io_u->error = errno;
228 }
229
230 if (io_u->error)
231 td_verror(td, io_u->error, "xfer");
232
233 return FIO_Q_COMPLETED;
234}
235
236static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
237{
238 struct netio_data *nd = td->io_ops->data;
239
240 f->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
241 if (f->fd < 0) {
242 td_verror(td, errno, "socket");
243 return 1;
244 }
245
246 if (connect(f->fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
247 td_verror(td, errno, "connect");
248 return 1;
249 }
250
251 return 0;
252}
253
254static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
255{
256 struct netio_data *nd = td->io_ops->data;
257 socklen_t socklen = sizeof(nd->addr);
258 struct pollfd pfd;
259 int ret;
260
261 log_info("fio: waiting for connection\n");
262
263 /*
264 * Accept loop. poll for incoming events, accept them. Repeat until we
265 * have all connections.
266 */
267 while (!td->terminate) {
268 pfd.fd = nd->listenfd;
269 pfd.events = POLLIN;
270
271 ret = poll(&pfd, 1, -1);
272 if (ret < 0) {
273 if (errno == EINTR)
274 continue;
275
276 td_verror(td, errno, "poll");
277 break;
278 } else if (!ret)
279 continue;
280
281 /*
282 * should be impossible
283 */
284 if (!(pfd.revents & POLLIN))
285 continue;
286
287 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
288 if (f->fd < 0) {
289 td_verror(td, errno, "accept");
290 return 1;
291 }
292 break;
293 }
294
295 return 0;
296}
297
/*
 * Open hook: a reading job accepts an incoming connection, any other job
 * connects out. Returns the result of the chosen helper.
 */
static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
{
	if (!td_read(td))
		return fio_netio_connect(td, f);

	return fio_netio_accept(td, f);
}
305
306static int fio_netio_setup_connect(struct thread_data *td, const char *host,
307 unsigned short port)
308{
309 struct netio_data *nd = td->io_ops->data;
310
311 nd->addr.sin_family = AF_INET;
312 nd->addr.sin_port = htons(port);
313
314 if (inet_aton(host, &nd->addr.sin_addr) != 1) {
315 struct hostent *hent;
316
317 hent = gethostbyname(host);
318 if (!hent) {
319 td_verror(td, errno, "gethostbyname");
320 return 1;
321 }
322
323 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
324 }
325
326 return 0;
327}
328
329static int fio_netio_setup_listen(struct thread_data *td, short port)
330{
331 struct netio_data *nd = td->io_ops->data;
332 int fd, opt;
333
334 fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
335 if (fd < 0) {
336 td_verror(td, errno, "socket");
337 return 1;
338 }
339
340 opt = 1;
341 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
342 td_verror(td, errno, "setsockopt");
343 return 1;
344 }
345#ifdef SO_REUSEPORT
346 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
347 td_verror(td, errno, "setsockopt");
348 return 1;
349 }
350#endif
351
352 nd->addr.sin_family = AF_INET;
353 nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
354 nd->addr.sin_port = htons(port);
355
356 if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
357 td_verror(td, errno, "bind");
358 return 1;
359 }
360 if (listen(fd, 1) < 0) {
361 td_verror(td, errno, "listen");
362 return 1;
363 }
364
365 nd->listenfd = fd;
366 return 0;
367}
368
369static int fio_netio_init(struct thread_data *td)
370{
371 struct netio_data *nd = td->io_ops->data;
372 unsigned int port;
373 char host[64], buf[128];
374 char *sep;
375 int ret;
376
377 if (td_rw(td)) {
378 log_err("fio: network connections must be read OR write\n");
379 return 1;
380 }
381 if (td_random(td)) {
382 log_err("fio: network IO can't be random\n");
383 return 1;
384 }
385
386 strcpy(buf, td->o.filename);
387
388 sep = strchr(buf, '/');
389 if (!sep)
390 goto bad_host;
391
392 *sep = '\0';
393 sep++;
394 strcpy(host, buf);
395 if (!strlen(host))
396 goto bad_host;
397
398 port = strtol(sep, NULL, 10);
399 if (!port || port > 65535)
400 goto bad_host;
401
402 if (td_read(td)) {
403 nd->send_to_net = 0;
404 ret = fio_netio_setup_listen(td, port);
405 } else {
406 nd->send_to_net = 1;
407 ret = fio_netio_setup_connect(td, host, port);
408 }
409
410 return ret;
411bad_host:
412 log_err("fio: bad network host/port: %s\n", td->o.filename);
413 return 1;
414}
415
416static void fio_netio_cleanup(struct thread_data *td)
417{
418 struct netio_data *nd = td->io_ops->data;
419
420 if (nd) {
421 if (nd->listenfd != -1)
422 close(nd->listenfd);
423 if (nd->pipes[0] != -1)
424 close(nd->pipes[0]);
425 if (nd->pipes[1] != -1)
426 close(nd->pipes[1]);
427
428 free(nd);
429 }
430}
431
432static int fio_netio_setup(struct thread_data *td)
433{
434 struct netio_data *nd;
435
436 if (!td->io_ops->data) {
437 nd = malloc(sizeof(*nd));;
438
439 memset(nd, 0, sizeof(*nd));
440 nd->listenfd = -1;
441 nd->pipes[0] = nd->pipes[1] = -1;
442 td->io_ops->data = nd;
443 }
444
445 return 0;
446}
447
448#ifdef FIO_HAVE_SPLICE
449static int fio_netio_setup_splice(struct thread_data *td)
450{
451 struct netio_data *nd;
452
453 fio_netio_setup(td);
454
455 nd = td->io_ops->data;
456 if (nd) {
457 if (pipe(nd->pipes) < 0)
458 return 1;
459
460 nd->use_splice = 1;
461 return 0;
462 }
463
464 return 1;
465}
466
467static struct ioengine_ops ioengine_splice = {
468 .name = "netsplice",
469 .version = FIO_IOOPS_VERSION,
470 .prep = fio_netio_prep,
471 .queue = fio_netio_queue,
472 .setup = fio_netio_setup_splice,
473 .init = fio_netio_init,
474 .cleanup = fio_netio_cleanup,
475 .open_file = fio_netio_open_file,
476 .close_file = generic_close_file,
477 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
478 FIO_SIGQUIT,
479};
480#endif
481
482static struct ioengine_ops ioengine_rw = {
483 .name = "net",
484 .version = FIO_IOOPS_VERSION,
485 .prep = fio_netio_prep,
486 .queue = fio_netio_queue,
487 .setup = fio_netio_setup,
488 .init = fio_netio_init,
489 .cleanup = fio_netio_cleanup,
490 .open_file = fio_netio_open_file,
491 .close_file = generic_close_file,
492 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
493 FIO_SIGQUIT,
494};
495
496static void fio_init fio_netio_register(void)
497{
498 register_ioengine(&ioengine_rw);
499#ifdef FIO_HAVE_SPLICE
500 register_ioengine(&ioengine_splice);
501#endif
502}
503
504static void fio_exit fio_netio_unregister(void)
505{
506 unregister_ioengine(&ioengine_rw);
507#ifdef FIO_HAVE_SPLICE
508 unregister_ioengine(&ioengine_splice);
509#endif
510}