4 * IO engine that reads/writes to/from sockets.
12 #include <netinet/in.h>
13 #include <arpa/inet.h>
16 #include <sys/types.h>
17 #include <sys/socket.h>
28 struct sockaddr_in addr;
32 * Return -1 for error and 'nr events' for a positive number
35 static int poll_wait(struct thread_data *td, int fd, short events)
40 while (!td->terminate) {
43 ret = poll(&pfd, 1, -1);
48 td_verror(td, errno, "poll");
56 if (pfd.revents & events)
62 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
64 struct netio_data *nd = td->io_ops->data;
67 * Make sure we don't see spurious reads to a receiver, and vice versa
69 if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
70 (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
71 td_verror(td, EINVAL, "bad direction");
78 #ifdef FIO_HAVE_SPLICE
79 static int splice_io_u(int fdin, int fdout, unsigned int len)
84 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
102 * Receive bytes from a socket and fill them into the internal pipe
104 static int splice_in(struct thread_data *td, struct io_u *io_u)
106 struct netio_data *nd = td->io_ops->data;
108 return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
112 * Transmit 'len' bytes from the internal pipe
114 static int splice_out(struct thread_data *td, struct io_u *io_u,
117 struct netio_data *nd = td->io_ops->data;
119 return splice_io_u(nd->pipes[0], io_u->file->fd, len);
122 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
125 .iov_base = io_u->xfer_buf,
130 while (iov.iov_len) {
131 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
150 * vmsplice() pipe to io_u buffer
152 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
155 struct netio_data *nd = td->io_ops->data;
157 return vmsplice_io_u(io_u, nd->pipes[0], len);
161 * vmsplice() io_u to pipe
163 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
165 struct netio_data *nd = td->io_ops->data;
167 return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
171 * splice receive - transfer socket data into a pipe using splice, then map
172 * that pipe data into the io_u using vmsplice.
174 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
178 ret = splice_in(td, io_u);
180 return vmsplice_io_u_out(td, io_u, ret);
186 * splice transmit - map data from the io_u into a pipe by using vmsplice,
187 * then transfer that pipe to a socket using splice.
189 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
193 ret = vmsplice_io_u_in(td, io_u);
195 return splice_out(td, io_u, ret);
200 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
206 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
213 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
215 struct netio_data *nd = td->io_ops->data;
218 ret = poll_wait(td, io_u->file->fd, POLLOUT);
223 * if we are going to write more, set MSG_MORE
226 if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen < td->o.size)
230 if (nd->net_protocol == IPPROTO_UDP) {
231 return sendto(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen,
232 0, &nd->addr, sizeof(nd->addr));
234 return send(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen,
239 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
241 struct netio_data *nd = td->io_ops->data;
242 int ret, flags = MSG_WAITALL;
244 ret = poll_wait(td, io_u->file->fd, POLLIN);
248 if (nd->net_protocol == IPPROTO_UDP) {
249 socklen_t len = sizeof(nd->addr);
251 return recvfrom(io_u->file->fd, io_u->xfer_buf,
252 io_u->xfer_buflen, 0, &nd->addr, &len);
254 return recv(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen,
259 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
261 struct netio_data *nd = td->io_ops->data;
264 fio_ro_check(td, io_u);
266 if (io_u->ddir == DDIR_WRITE) {
267 if (!nd->use_splice || nd->net_protocol == IPPROTO_UDP)
268 ret = fio_netio_send(td, io_u);
270 ret = fio_netio_splice_out(td, io_u);
271 } else if (io_u->ddir == DDIR_READ) {
272 if (!nd->use_splice || nd->net_protocol == IPPROTO_UDP)
273 ret = fio_netio_recv(td, io_u);
275 ret = fio_netio_splice_in(td, io_u);
277 ret = 0; /* must be a SYNC */
279 if (ret != (int) io_u->xfer_buflen) {
281 io_u->resid = io_u->xfer_buflen - ret;
283 return FIO_Q_COMPLETED;
287 if (io_u->ddir == DDIR_WRITE && err == EMSGSIZE)
295 td_verror(td, io_u->error, "xfer");
297 return FIO_Q_COMPLETED;
300 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
302 struct netio_data *nd = td->io_ops->data;
305 if (nd->net_protocol == IPPROTO_TCP)
310 f->fd = socket(AF_INET, type, nd->net_protocol);
312 td_verror(td, errno, "socket");
316 if (nd->net_protocol == IPPROTO_UDP)
319 if (connect(f->fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
320 td_verror(td, errno, "connect");
327 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
329 struct netio_data *nd = td->io_ops->data;
330 socklen_t socklen = sizeof(nd->addr);
332 if (nd->net_protocol == IPPROTO_UDP) {
333 f->fd = nd->listenfd;
337 log_info("fio: waiting for connection\n");
339 if (poll_wait(td, nd->listenfd, POLLIN) < 0)
342 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
344 td_verror(td, errno, "accept");
351 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
354 return fio_netio_accept(td, f);
356 return fio_netio_connect(td, f);
359 static int fio_netio_setup_connect(struct thread_data *td, const char *host,
362 struct netio_data *nd = td->io_ops->data;
364 nd->addr.sin_family = AF_INET;
365 nd->addr.sin_port = htons(port);
367 if (inet_aton(host, &nd->addr.sin_addr) != 1) {
368 struct hostent *hent;
370 hent = gethostbyname(host);
372 td_verror(td, errno, "gethostbyname");
376 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
382 static int fio_netio_setup_listen(struct thread_data *td, short port)
384 struct netio_data *nd = td->io_ops->data;
387 if (nd->net_protocol == IPPROTO_TCP)
392 fd = socket(AF_INET, type, nd->net_protocol);
394 td_verror(td, errno, "socket");
399 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
400 td_verror(td, errno, "setsockopt");
404 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
405 td_verror(td, errno, "setsockopt");
410 nd->addr.sin_family = AF_INET;
411 nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
412 nd->addr.sin_port = htons(port);
414 if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
415 td_verror(td, errno, "bind");
418 if (nd->net_protocol == IPPROTO_TCP && listen(fd, 1) < 0) {
419 td_verror(td, errno, "listen");
427 static int fio_netio_init(struct thread_data *td)
429 struct netio_data *nd = td->io_ops->data;
431 char host[64], buf[128];
432 char *sep, *portp, *modep;
436 log_err("fio: network connections must be read OR write\n");
440 log_err("fio: network IO can't be random\n");
444 strcpy(buf, td->o.filename);
446 sep = strchr(buf, '/');
458 sep = strchr(portp, '/');
464 port = strtol(portp, NULL, 10);
465 if (!port || port > 65535)
469 if (!strncmp("tcp", modep, strlen(modep)) ||
470 !strncmp("TCP", modep, strlen(modep)))
471 nd->net_protocol = IPPROTO_TCP;
472 else if (!strncmp("udp", modep, strlen(modep)) ||
473 !strncmp("UDP", modep, strlen(modep)))
474 nd->net_protocol = IPPROTO_UDP;
478 nd->net_protocol = IPPROTO_TCP;
482 ret = fio_netio_setup_listen(td, port);
485 ret = fio_netio_setup_connect(td, host, port);
490 log_err("fio: bad network host/port/protocol: %s\n", td->o.filename);
494 static void fio_netio_cleanup(struct thread_data *td)
496 struct netio_data *nd = td->io_ops->data;
499 if (nd->listenfd != -1)
501 if (nd->pipes[0] != -1)
503 if (nd->pipes[1] != -1)
510 static int fio_netio_setup(struct thread_data *td)
512 struct netio_data *nd;
514 if (!td->io_ops->data) {
515 nd = malloc(sizeof(*nd));;
517 memset(nd, 0, sizeof(*nd));
519 nd->pipes[0] = nd->pipes[1] = -1;
520 td->io_ops->data = nd;
526 #ifdef FIO_HAVE_SPLICE
527 static int fio_netio_setup_splice(struct thread_data *td)
529 struct netio_data *nd;
533 nd = td->io_ops->data;
535 if (pipe(nd->pipes) < 0)
545 static struct ioengine_ops ioengine_splice = {
547 .version = FIO_IOOPS_VERSION,
548 .prep = fio_netio_prep,
549 .queue = fio_netio_queue,
550 .setup = fio_netio_setup_splice,
551 .init = fio_netio_init,
552 .cleanup = fio_netio_cleanup,
553 .open_file = fio_netio_open_file,
554 .close_file = generic_close_file,
555 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
560 static struct ioengine_ops ioengine_rw = {
562 .version = FIO_IOOPS_VERSION,
563 .prep = fio_netio_prep,
564 .queue = fio_netio_queue,
565 .setup = fio_netio_setup,
566 .init = fio_netio_init,
567 .cleanup = fio_netio_cleanup,
568 .open_file = fio_netio_open_file,
569 .close_file = generic_close_file,
570 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
574 static void fio_init fio_netio_register(void)
576 register_ioengine(&ioengine_rw);
577 #ifdef FIO_HAVE_SPLICE
578 register_ioengine(&ioengine_splice);
582 static void fio_exit fio_netio_unregister(void)
584 unregister_ioengine(&ioengine_rw);
585 #ifdef FIO_HAVE_SPLICE
586 unregister_ioengine(&ioengine_splice);