4 * IO engine that reads/writes to/from sockets.
12 #include <netinet/in.h>
13 #include <arpa/inet.h>
16 #include <sys/types.h>
18 #include <sys/socket.h>
27 struct sockaddr_in addr;
28 struct sockaddr_un addr_un;
31 struct netio_options {
32 struct thread_data *td;
38 struct udp_close_msg {
44 FIO_LINK_CLOSE = 0x89,
45 FIO_LINK_CLOSE_MAGIC = 0x6c696e6b,
52 static int str_hostname_cb(void *data, const char *input);
53 static struct fio_option options[] = {
56 .type = FIO_OPT_STR_STORE,
57 .cb = str_hostname_cb,
58 .help = "Hostname for net IO engine",
59 .category = FIO_OPT_G_IO_ENG,
64 .off1 = offsetof(struct netio_options, port),
67 .help = "Port to use for TCP or UDP net connections",
68 .category = FIO_OPT_G_IO_ENG,
74 .off1 = offsetof(struct netio_options, proto),
75 .help = "Network protocol to use",
77 .category = FIO_OPT_G_IO_ENG,
81 .help = "Transmission Control Protocol",
85 .help = "Unreliable Datagram Protocol",
88 .oval = FIO_TYPE_UNIX,
89 .help = "UNIX domain socket",
95 .type = FIO_OPT_STR_SET,
96 .off1 = offsetof(struct netio_options, listen),
97 .help = "Listen for incoming TCP connections",
98 .category = FIO_OPT_G_IO_ENG,
106 * Return -1 for error and 'nr events' for a positive number
109 static int poll_wait(struct thread_data *td, int fd, short events)
114 while (!td->terminate) {
117 ret = poll(&pfd, 1, -1);
122 td_verror(td, errno, "poll");
130 if (pfd.revents & events)
136 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
138 struct netio_options *o = td->eo;
141 * Make sure we don't see spurious reads to a receiver, and vice versa
143 if (o->proto == FIO_TYPE_TCP)
146 if ((o->listen && io_u->ddir == DDIR_WRITE) ||
147 (!o->listen && io_u->ddir == DDIR_READ)) {
148 td_verror(td, EINVAL, "bad direction");
155 #ifdef FIO_HAVE_SPLICE
156 static int splice_io_u(int fdin, int fdout, unsigned int len)
161 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
179 * Receive bytes from a socket and fill them into the internal pipe
181 static int splice_in(struct thread_data *td, struct io_u *io_u)
183 struct netio_data *nd = td->io_ops->data;
185 return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
189 * Transmit 'len' bytes from the internal pipe
191 static int splice_out(struct thread_data *td, struct io_u *io_u,
194 struct netio_data *nd = td->io_ops->data;
196 return splice_io_u(nd->pipes[0], io_u->file->fd, len);
199 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
202 .iov_base = io_u->xfer_buf,
207 while (iov.iov_len) {
208 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
227 * vmsplice() pipe to io_u buffer
229 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
232 struct netio_data *nd = td->io_ops->data;
234 return vmsplice_io_u(io_u, nd->pipes[0], len);
238 * vmsplice() io_u to pipe
240 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
242 struct netio_data *nd = td->io_ops->data;
244 return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
248 * splice receive - transfer socket data into a pipe using splice, then map
249 * that pipe data into the io_u using vmsplice.
251 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
255 ret = splice_in(td, io_u);
257 return vmsplice_io_u_out(td, io_u, ret);
263 * splice transmit - map data from the io_u into a pipe by using vmsplice,
264 * then transfer that pipe to a socket using splice.
266 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
270 ret = vmsplice_io_u_in(td, io_u);
272 return splice_out(td, io_u, ret);
277 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
283 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
290 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
292 struct netio_data *nd = td->io_ops->data;
293 struct netio_options *o = td->eo;
294 int ret, flags = OS_MSG_DONTWAIT;
297 if (o->proto == FIO_TYPE_UDP) {
298 struct sockaddr *to = (struct sockaddr *) &nd->addr;
300 ret = sendto(io_u->file->fd, io_u->xfer_buf,
301 io_u->xfer_buflen, flags, to,
305 * if we are going to write more, set MSG_MORE
308 if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
312 ret = send(io_u->file->fd, io_u->xfer_buf,
313 io_u->xfer_buflen, flags);
318 ret = poll_wait(td, io_u->file->fd, POLLOUT);
322 flags &= ~OS_MSG_DONTWAIT;
328 static int is_udp_close(struct io_u *io_u, int len)
330 struct udp_close_msg *msg;
332 if (len != sizeof(struct udp_close_msg))
335 msg = io_u->xfer_buf;
336 if (ntohl(msg->magic) != FIO_LINK_CLOSE_MAGIC)
338 if (ntohl(msg->cmd) != FIO_LINK_CLOSE)
344 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
346 struct netio_data *nd = td->io_ops->data;
347 struct netio_options *o = td->eo;
348 int ret, flags = OS_MSG_DONTWAIT;
351 if (o->proto == FIO_TYPE_UDP) {
352 fio_socklen_t len = sizeof(nd->addr);
353 struct sockaddr *from = (struct sockaddr *) &nd->addr;
355 ret = recvfrom(io_u->file->fd, io_u->xfer_buf,
356 io_u->xfer_buflen, flags, from, &len);
357 if (is_udp_close(io_u, ret)) {
362 ret = recv(io_u->file->fd, io_u->xfer_buf,
363 io_u->xfer_buflen, flags);
368 ret = poll_wait(td, io_u->file->fd, POLLIN);
371 flags &= ~OS_MSG_DONTWAIT;
372 flags |= MSG_WAITALL;
378 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
380 struct netio_data *nd = td->io_ops->data;
381 struct netio_options *o = td->eo;
384 fio_ro_check(td, io_u);
386 if (io_u->ddir == DDIR_WRITE) {
387 if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
388 o->proto == FIO_TYPE_UNIX)
389 ret = fio_netio_send(td, io_u);
391 ret = fio_netio_splice_out(td, io_u);
392 } else if (io_u->ddir == DDIR_READ) {
393 if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
394 o->proto == FIO_TYPE_UNIX)
395 ret = fio_netio_recv(td, io_u);
397 ret = fio_netio_splice_in(td, io_u);
399 ret = 0; /* must be a SYNC */
401 if (ret != (int) io_u->xfer_buflen) {
403 io_u->resid = io_u->xfer_buflen - ret;
405 return FIO_Q_COMPLETED;
409 if (io_u->ddir == DDIR_WRITE && err == EMSGSIZE)
417 td_verror(td, io_u->error, "xfer");
419 return FIO_Q_COMPLETED;
422 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
424 struct netio_data *nd = td->io_ops->data;
425 struct netio_options *o = td->eo;
428 if (o->proto == FIO_TYPE_TCP) {
431 } else if (o->proto == FIO_TYPE_UDP) {
434 } else if (o->proto == FIO_TYPE_UNIX) {
438 log_err("fio: bad network type %d\n", o->proto);
443 f->fd = socket(domain, type, 0);
445 td_verror(td, errno, "socket");
449 if (o->proto == FIO_TYPE_UDP)
451 else if (o->proto == FIO_TYPE_TCP) {
452 fio_socklen_t len = sizeof(nd->addr);
454 if (connect(f->fd, (struct sockaddr *) &nd->addr, len) < 0) {
455 td_verror(td, errno, "connect");
460 struct sockaddr_un *addr = &nd->addr_un;
463 len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
465 if (connect(f->fd, (struct sockaddr *) addr, len) < 0) {
466 td_verror(td, errno, "connect");
475 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
477 struct netio_data *nd = td->io_ops->data;
478 struct netio_options *o = td->eo;
479 fio_socklen_t socklen = sizeof(nd->addr);
481 if (o->proto == FIO_TYPE_UDP) {
482 f->fd = nd->listenfd;
486 log_info("fio: waiting for connection\n");
488 if (poll_wait(td, nd->listenfd, POLLIN) < 0)
491 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
493 td_verror(td, errno, "accept");
500 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
503 struct netio_options *o = td->eo;
506 ret = fio_netio_accept(td, f);
508 ret = fio_netio_connect(td, f);
515 static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f)
517 struct netio_data *nd = td->io_ops->data;
518 struct udp_close_msg msg;
519 struct sockaddr *to = (struct sockaddr *) &nd->addr;
522 msg.magic = htonl(FIO_LINK_CLOSE_MAGIC);
523 msg.cmd = htonl(FIO_LINK_CLOSE);
525 ret = sendto(f->fd, &msg, sizeof(msg), MSG_WAITALL, to,
528 td_verror(td, errno, "sendto udp link close");
531 static int fio_netio_close_file(struct thread_data *td, struct fio_file *f)
533 struct netio_options *o = td->eo;
536 * If this is an UDP connection, notify the receiver that we are
537 * closing down the link
539 if (o->proto == FIO_TYPE_UDP)
540 fio_netio_udp_close(td, f);
542 return generic_close_file(td, f);
545 static int fio_netio_setup_connect_inet(struct thread_data *td,
546 const char *host, unsigned short port)
548 struct netio_data *nd = td->io_ops->data;
550 nd->addr.sin_family = AF_INET;
551 nd->addr.sin_port = htons(port);
553 if (inet_aton(host, &nd->addr.sin_addr) != 1) {
554 struct hostent *hent;
556 hent = gethostbyname(host);
558 td_verror(td, errno, "gethostbyname");
562 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
568 static int fio_netio_setup_connect_unix(struct thread_data *td,
571 struct netio_data *nd = td->io_ops->data;
572 struct sockaddr_un *soun = &nd->addr_un;
574 soun->sun_family = AF_UNIX;
575 strcpy(soun->sun_path, path);
579 static int fio_netio_setup_connect(struct thread_data *td)
581 struct netio_options *o = td->eo;
583 if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
584 return fio_netio_setup_connect_inet(td, td->o.filename,o->port);
586 return fio_netio_setup_connect_unix(td, td->o.filename);
589 static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path)
591 struct netio_data *nd = td->io_ops->data;
592 struct sockaddr_un *addr = &nd->addr_un;
596 fd = socket(AF_UNIX, SOCK_STREAM, 0);
598 log_err("fio: socket: %s\n", strerror(errno));
604 memset(addr, 0, sizeof(*addr));
605 addr->sun_family = AF_UNIX;
606 strcpy(addr->sun_path, path);
609 len = sizeof(addr->sun_family) + strlen(path) + 1;
611 if (bind(fd, (struct sockaddr *) addr, len) < 0) {
612 log_err("fio: bind: %s\n", strerror(errno));
622 static int fio_netio_setup_listen_inet(struct thread_data *td, short port)
624 struct netio_data *nd = td->io_ops->data;
625 struct netio_options *o = td->eo;
628 if (o->proto == FIO_TYPE_TCP)
633 fd = socket(AF_INET, type, 0);
635 td_verror(td, errno, "socket");
640 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
641 td_verror(td, errno, "setsockopt");
645 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
646 td_verror(td, errno, "setsockopt");
651 nd->addr.sin_family = AF_INET;
652 nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
653 nd->addr.sin_port = htons(port);
655 if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
656 td_verror(td, errno, "bind");
664 static int fio_netio_setup_listen(struct thread_data *td)
666 struct netio_data *nd = td->io_ops->data;
667 struct netio_options *o = td->eo;
670 if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
671 ret = fio_netio_setup_listen_inet(td, o->port);
673 ret = fio_netio_setup_listen_unix(td, td->o.filename);
677 if (o->proto == FIO_TYPE_UDP)
680 if (listen(nd->listenfd, 10) < 0) {
681 td_verror(td, errno, "listen");
689 static int fio_netio_init(struct thread_data *td)
691 struct netio_options *o = td->eo;
695 log_err("fio: network IO can't be random\n");
699 if (o->proto == FIO_TYPE_UNIX && o->port) {
700 log_err("fio: network IO port not valid with unix socket\n");
702 } else if (o->proto != FIO_TYPE_UNIX && !o->port) {
703 log_err("fio: network IO requires port for tcp or udp\n");
707 if (o->proto != FIO_TYPE_TCP) {
709 log_err("fio: listen only valid for TCP proto IO\n");
713 log_err("fio: datagram network connections must be"
717 if (o->proto == FIO_TYPE_UNIX && !td->o.filename) {
718 log_err("fio: UNIX sockets need host/filename\n");
721 o->listen = td_read(td);
724 if (o->proto != FIO_TYPE_UNIX && o->listen && td->o.filename) {
725 log_err("fio: hostname not valid for inbound network IO\n");
730 ret = fio_netio_setup_listen(td);
732 ret = fio_netio_setup_connect(td);
737 static void fio_netio_cleanup(struct thread_data *td)
739 struct netio_data *nd = td->io_ops->data;
742 if (nd->listenfd != -1)
744 if (nd->pipes[0] != -1)
746 if (nd->pipes[1] != -1)
753 static int fio_netio_setup(struct thread_data *td)
755 struct netio_data *nd;
757 if (!td->files_index) {
758 add_file(td, td->o.filename ?: "net");
759 td->o.nr_files = td->o.nr_files ?: 1;
762 if (!td->io_ops->data) {
763 nd = malloc(sizeof(*nd));;
765 memset(nd, 0, sizeof(*nd));
767 nd->pipes[0] = nd->pipes[1] = -1;
768 td->io_ops->data = nd;
774 #ifdef FIO_HAVE_SPLICE
775 static int fio_netio_setup_splice(struct thread_data *td)
777 struct netio_data *nd;
781 nd = td->io_ops->data;
783 if (pipe(nd->pipes) < 0)
793 static struct ioengine_ops ioengine_splice = {
795 .version = FIO_IOOPS_VERSION,
796 .prep = fio_netio_prep,
797 .queue = fio_netio_queue,
798 .setup = fio_netio_setup_splice,
799 .init = fio_netio_init,
800 .cleanup = fio_netio_cleanup,
801 .open_file = fio_netio_open_file,
802 .close_file = generic_close_file,
804 .option_struct_size = sizeof(struct netio_options),
805 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
806 FIO_SIGTERM | FIO_PIPEIO,
810 static struct ioengine_ops ioengine_rw = {
812 .version = FIO_IOOPS_VERSION,
813 .prep = fio_netio_prep,
814 .queue = fio_netio_queue,
815 .setup = fio_netio_setup,
816 .init = fio_netio_init,
817 .cleanup = fio_netio_cleanup,
818 .open_file = fio_netio_open_file,
819 .close_file = fio_netio_close_file,
821 .option_struct_size = sizeof(struct netio_options),
822 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
823 FIO_SIGTERM | FIO_PIPEIO,
826 static int str_hostname_cb(void *data, const char *input)
828 struct netio_options *o = data;
830 if (o->td->o.filename)
831 free(o->td->o.filename);
832 o->td->o.filename = strdup(input);
836 static void fio_init fio_netio_register(void)
838 register_ioengine(&ioengine_rw);
839 #ifdef FIO_HAVE_SPLICE
840 register_ioengine(&ioengine_splice);
844 static void fio_exit fio_netio_unregister(void)
846 unregister_ioengine(&ioengine_rw);
847 #ifdef FIO_HAVE_SPLICE
848 unregister_ioengine(&ioengine_splice);