4 * IO engine that reads/writes to/from sockets.
13 #include <netinet/in.h>
14 #include <netinet/tcp.h>
15 #include <arpa/inet.h>
18 #include <sys/types.h>
20 #include <sys/socket.h>
/*
 * Address members of the per-thread engine data (reached elsewhere as
 * td->io_ops->data). 'addr' is the IPv4 address used by the TCP/UDP paths
 * (connect, sendto, recvfrom, bind, accept); 'addr_un' is the address used
 * by the UNIX-socket paths. The enclosing struct header and its other
 * members are not part of this excerpt.
 */
29 struct sockaddr_in addr;
30 struct sockaddr_un addr_un;
/*
 * Engine-private job options. The option table stores into this struct
 * through offsetof() offsets. Only some members are visible in this
 * excerpt; port, proto, nodelay and listen are referenced elsewhere.
 */
33 struct netio_options {
/* owning thread; str_hostname_cb() goes through it to reach td->o.filename */
34 struct thread_data *td;
/* ping-pong mode: fio_netio_queue() follows each transfer with one in the opposite direction */
38 unsigned int pingpong;
/*
 * Control datagram exchanged over UDP. Users of this struct fill in a
 * 'magic' and a 'cmd' member, both converted with htonl() before sending
 * and ntohl() after receiving. Despite the name it is used for link-open
 * as well as link-close messages.
 */
42 struct udp_close_msg {
/* 'cmd' value announcing that the sender is closing the UDP link */
48 FIO_LINK_CLOSE = 0x89,
/* 'magic' value marking a link open/close datagram; the bytes spell "link" in ASCII */
49 FIO_LINK_OPEN_CLOSE_MAGIC = 0x6c696e6b,
/* Forward declaration: the option table references this callback before its definition. */
57 static int str_hostname_cb(void *data, const char *input);
/*
 * Job-file options understood by the net engines. Entries that give .off1
 * store straight into struct netio_options; the hostname entry is routed
 * through a callback instead. The .name lines and entry delimiters are not
 * visible in this excerpt.
 */
58 static struct fio_option options[] = {
/* hostname: string option handled by str_hostname_cb() */
61 .lname = "net engine hostname",
62 .type = FIO_OPT_STR_STORE,
63 .cb = str_hostname_cb,
64 .help = "Hostname for net IO engine",
65 .category = FIO_OPT_C_ENGINE,
66 .group = FIO_OPT_G_NETIO,
/* port: stored in netio_options.port */
70 .lname = "net engine port",
72 .off1 = offsetof(struct netio_options, port),
75 .help = "Port to use for TCP or UDP net connections",
76 .category = FIO_OPT_C_ENGINE,
77 .group = FIO_OPT_G_NETIO,
/* protocol: stored in netio_options.proto; the help strings below describe the TCP, UDP and UNIX choices */
81 .lname = "net engine protocol",
84 .off1 = offsetof(struct netio_options, proto),
85 .help = "Network protocol to use",
90 .help = "Transmission Control Protocol",
94 .help = "User Datagram Protocol",
97 .oval = FIO_TYPE_UNIX,
98 .help = "UNIX domain socket",
101 .category = FIO_OPT_C_ENGINE,
102 .group = FIO_OPT_G_NETIO,
/* nodelay: boolean stored in netio_options.nodelay; only compiled in with CONFIG_TCP_NODELAY */
104 #ifdef CONFIG_TCP_NODELAY
107 .type = FIO_OPT_BOOL,
108 .off1 = offsetof(struct netio_options, nodelay),
109 .help = "Use TCP_NODELAY on TCP connections",
110 .category = FIO_OPT_C_ENGINE,
111 .group = FIO_OPT_G_NETIO,
/* listen: flag-style option stored in netio_options.listen */
116 .lname = "net engine listen",
117 .type = FIO_OPT_STR_SET,
118 .off1 = offsetof(struct netio_options, listen),
119 .help = "Listen for incoming TCP connections",
120 .category = FIO_OPT_C_ENGINE,
121 .group = FIO_OPT_G_NETIO,
/* ping-pong: flag-style option stored in netio_options.pingpong */
125 .type = FIO_OPT_STR_SET,
126 .off1 = offsetof(struct netio_options, pingpong),
127 .help = "Ping-pong IO requests",
128 .category = FIO_OPT_C_ENGINE,
129 .group = FIO_OPT_G_NETIO,
137 * Returns -1 on error, otherwise a positive 'nr events' count
/*
 * Wait in poll() on the single descriptor 'fd' until one of 'events' is
 * reported, looping for as long as td->terminate is clear. A poll()
 * failure is recorded on the thread with td_verror(). The return
 * statements and the error test are on lines elided from this excerpt.
 */
140 static int poll_wait(struct thread_data *td, int fd, short events)
145 while (!td->terminate) {
/* timeout of -1: block indefinitely, so td->terminate is only re-read after poll() returns */
148 ret = poll(&pfd, 1, -1);
153 td_verror(td, errno, "poll");
/* only the events the caller asked for are tested here */
161 if (pfd.revents & events)
/*
 * Test whether the IPv4 address string 'mcaddr' falls inside the range
 * 224.0.0.0 - 239.255.255.255 (inclusive on both ends). All three strings
 * are converted with inet_network() and compared as integers. The return
 * statements are elided from this excerpt.
 */
167 static int fio_netio_is_multicast(const char *mcaddr)
169 in_addr_t addr = inet_network(mcaddr);
173 if (inet_network("224.0.0.0") <= addr &&
174 inet_network("239.255.255.255") >= addr)
/*
 * Prep hook (installed as .prep in both engine tables): rejects an io_u
 * whose direction does not match the role of this job. A listening job
 * must not issue writes and a non-listening job must not issue reads;
 * either case is flagged as EINVAL with the text "bad direction".
 * TCP is tested first; what that test leads to is on an elided line.
 */
181 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
183 struct netio_options *o = td->eo;
186 * Make sure we don't see spurious reads to a receiver, and vice versa
188 if (o->proto == FIO_TYPE_TCP)
191 if ((o->listen && io_u->ddir == DDIR_WRITE) ||
192 (!o->listen && io_u->ddir == DDIR_READ)) {
193 td_verror(td, EINVAL, "bad direction");
200 #ifdef CONFIG_LINUX_SPLICE
/*
 * Move up to 'len' bytes from fdin to fdout with splice(). Both offset
 * pointers are NULL and no flags are passed. How the splice() result is
 * handled is on lines elided from this excerpt.
 */
201 static int splice_io_u(int fdin, int fdout, unsigned int len)
206 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
224 * Receive bytes from a socket and fill them into the internal pipe
/*
 * Splice io_u->xfer_buflen bytes from the file's descriptor into
 * nd->pipes[1] (the write end of the pipe created by pipe(nd->pipes)).
 * Returns whatever splice_io_u() returns.
 */
226 static int splice_in(struct thread_data *td, struct io_u *io_u)
228 struct netio_data *nd = td->io_ops->data;
230 return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
234 * Transmit 'len' bytes from the internal pipe
/*
 * Splice 'len' bytes from nd->pipes[0] (the read end of the internal pipe)
 * to the file's descriptor. Returns whatever splice_io_u() returns.
 * The rest of the parameter list is on an elided line.
 */
236 static int splice_out(struct thread_data *td, struct io_u *io_u,
239 struct netio_data *nd = td->io_ops->data;
241 return splice_io_u(nd->pipes[0], io_u->file->fd, len);
/*
 * vmsplice() between the io_u transfer buffer and the pipe descriptor
 * 'fd'. The iovec starts at io_u->xfer_buf and the call is repeated while
 * iov.iov_len is non-zero. The iovec length setup and the handling of each
 * vmsplice() result are on elided lines.
 */
244 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
247 .iov_base = io_u->xfer_buf,
252 while (iov.iov_len) {
/* one iovec per call, with SPLICE_F_MOVE */
253 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
272 * vmsplice() pipe to io_u buffer
/*
 * Run vmsplice_io_u() for 'len' bytes against nd->pipes[0], the read end
 * of the internal pipe. The rest of the parameter list is on an elided
 * line.
 */
274 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
277 struct netio_data *nd = td->io_ops->data;
279 return vmsplice_io_u(io_u, nd->pipes[0], len);
283 * vmsplice() io_u to pipe
/*
 * Run vmsplice_io_u() for the full io_u->xfer_buflen against nd->pipes[1],
 * the write end of the internal pipe.
 */
285 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
287 struct netio_data *nd = td->io_ops->data;
289 return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
293 * splice receive - transfer socket data into a pipe using splice, then map
294 * that pipe data into the io_u using vmsplice.
/*
 * Splice-based receive. Step one moves socket data into the internal pipe
 * with splice_in(). Step two passes the byte count from step one to
 * vmsplice_io_u_out(). The test between the two steps and the alternative
 * return path are on elided lines.
 */
296 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
300 ret = splice_in(td, io_u);
302 return vmsplice_io_u_out(td, io_u, ret);
308 * splice transmit - map data from the io_u into a pipe by using vmsplice,
309 * then transfer that pipe to a socket using splice.
/*
 * Splice-based transmit. Step one feeds the io_u buffer into the internal
 * pipe with vmsplice_io_u_in(). Step two passes the byte count from step
 * one to splice_out(), which forwards it to the socket. The test between
 * the two steps and the alternative return path are on elided lines.
 */
311 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
315 ret = vmsplice_io_u_in(td, io_u);
317 return splice_out(td, io_u, ret);
/*
 * Second definition of fio_netio_splice_in. The first one follows an
 * #ifdef CONFIG_LINUX_SPLICE, so this is presumably the variant for builds
 * without splice support - the body and the preprocessor branch are not
 * visible, TODO confirm.
 */
322 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
/*
 * Second definition of fio_netio_splice_out. The first one follows an
 * #ifdef CONFIG_LINUX_SPLICE, so this is presumably the variant for builds
 * without splice support - the body and the preprocessor branch are not
 * visible, TODO confirm.
 */
328 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
/*
 * Transmit io_u->xfer_buf (io_u->xfer_buflen bytes) on the file's socket.
 * UDP uses sendto() addressed to nd->addr; the other branch uses send().
 * poll_wait(POLLOUT) is used to wait for the socket to become writable.
 * Loop structure, result tests and the return are on elided lines.
 */
335 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
337 struct netio_data *nd = td->io_ops->data;
338 struct netio_options *o = td->eo;
342 if (o->proto == FIO_TYPE_UDP) {
/* datagrams go to the address stored in the engine data */
343 struct sockaddr *to = (struct sockaddr *) &nd->addr;
345 ret = sendto(io_u->file->fd, io_u->xfer_buf,
346 io_u->xfer_buflen, flags, to,
350 * if we are going to write more, set MSG_MORE
/*
 * "More to write" means: bytes written so far plus this buffer are still
 * below the job size, and ping-pong mode is off. The flag assignment
 * itself is on an elided line.
 */
353 if ((td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
354 td->o.size) && !o->pingpong)
357 ret = send(io_u->file->fd, io_u->xfer_buf,
358 io_u->xfer_buflen, flags);
363 ret = poll_wait(td, io_u->file->fd, POLLOUT);
/*
 * Check whether the 'len' bytes just received into io_u->xfer_buf form a
 * link-close control message. Three things are tested: the length must
 * equal sizeof(struct udp_close_msg), the magic word must be
 * FIO_LINK_OPEN_CLOSE_MAGIC, and the command must be FIO_LINK_CLOSE.
 * The return statements are on elided lines.
 */
371 static int is_udp_close(struct io_u *io_u, int len)
373 struct udp_close_msg *msg;
/* a negative 'len' (failed receive) also fails this size comparison */
375 if (len != sizeof(struct udp_close_msg))
378 msg = io_u->xfer_buf;
/* both fields are converted from network byte order before comparing */
379 if (ntohl(msg->magic) != FIO_LINK_OPEN_CLOSE_MAGIC)
381 if (ntohl(msg->cmd) != FIO_LINK_CLOSE)
/*
 * Receive into io_u->xfer_buf (up to io_u->xfer_buflen bytes) from the
 * file's socket. UDP uses recvfrom() and tests every datagram with
 * is_udp_close(); the other branch uses recv(). poll_wait(POLLIN) is used
 * to wait for data, and MSG_WAITALL is added to the flags afterwards.
 * Loop structure, the remaining result tests and the return are on elided
 * lines.
 */
387 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
389 struct netio_data *nd = td->io_ops->data;
390 struct netio_options *o = td->eo;
394 if (o->proto == FIO_TYPE_UDP) {
397 struct sockaddr *from;
/*
 * Sender address is captured into nd->addr. Whether this assignment is
 * conditional cannot be seen here (lines elided).
 */
400 from = (struct sockaddr *) &nd->addr;
401 *len = sizeof(nd->addr);
407 ret = recvfrom(io_u->file->fd, io_u->xfer_buf,
408 io_u->xfer_buflen, flags, from, len);
/* the peer announces shutdown with a control datagram */
409 if (is_udp_close(io_u, ret)) {
414 ret = recv(io_u->file->fd, io_u->xfer_buf,
415 io_u->xfer_buflen, flags);
/* zero bytes while already using MSG_WAITALL gets its own branch (body elided) */
419 else if (!ret && (flags & MSG_WAITALL))
422 ret = poll_wait(td, io_u->file->fd, POLLIN);
425 flags |= MSG_WAITALL;
/*
 * Perform one transfer for io_u in direction 'ddir'.
 *
 * Writes and reads use the plain send/recv helpers when splice is off or
 * the protocol is UDP or UNIX. The splice helper calls follow; the 'else'
 * lines joining them are elided. Any other direction is treated as a sync
 * and counts as zero bytes.
 *
 * A byte count different from xfer_buflen is handled as a short transfer
 * (io_u->resid gets the remainder) or as an error reported with
 * td_verror(..., "xfer"). The test separating the two is elided. Both
 * visible returns yield FIO_Q_COMPLETED.
 */
431 static int __fio_netio_queue(struct thread_data *td, struct io_u *io_u,
434 struct netio_data *nd = td->io_ops->data;
435 struct netio_options *o = td->eo;
438 if (ddir == DDIR_WRITE) {
439 if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
440 o->proto == FIO_TYPE_UNIX)
441 ret = fio_netio_send(td, io_u);
443 ret = fio_netio_splice_out(td, io_u);
444 } else if (ddir == DDIR_READ) {
445 if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
446 o->proto == FIO_TYPE_UNIX)
447 ret = fio_netio_recv(td, io_u);
449 ret = fio_netio_splice_in(td, io_u);
451 ret = 0; /* must be a SYNC */
/* xfer_buflen is cast to int so that a negative error return compares correctly */
453 if (ret != (int) io_u->xfer_buflen) {
455 io_u->resid = io_u->xfer_buflen - ret;
457 return FIO_Q_COMPLETED;
/* EMSGSIZE on a write is singled out; what is done about it is on elided lines */
461 if (ddir == DDIR_WRITE && err == EMSGSIZE)
469 td_verror(td, io_u->error, "xfer");
471 return FIO_Q_COMPLETED;
/*
 * Queue hook (installed as .queue in both engine tables). After
 * fio_ro_check(), the io_u is transferred in its own direction. If
 * ping-pong mode is off, or that transfer did not return FIO_Q_COMPLETED,
 * a branch is taken whose body (presumably the return) is elided.
 * Otherwise a second transfer of the same io_u is issued in the opposite
 * direction.
 */
474 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
476 struct netio_options *o = td->eo;
479 fio_ro_check(td, io_u);
481 ret = __fio_netio_queue(td, io_u, io_u->ddir);
482 if (!o->pingpong || ret != FIO_Q_COMPLETED)
486 * For ping-pong mode, receive or send reply as needed
/* a reading job answers each read with a write, a writing job follows each write with a read */
488 if (td_read(td) && io_u->ddir == DDIR_READ)
489 ret = __fio_netio_queue(td, io_u, DDIR_WRITE);
490 else if (td_write(td) && io_u->ddir == DDIR_WRITE)
491 ret = __fio_netio_queue(td, io_u, DDIR_READ);
/*
 * Client-side open: create the socket for 'f' and connect it to the
 * address prepared in the engine data.
 *
 * The socket domain and type are chosen per protocol (TCP, UDP, UNIX); the
 * assignments are elided. An unknown protocol is logged as "bad network
 * type". A socket() failure is reported with td_verror(). When built with
 * CONFIG_TCP_NODELAY and the nodelay option is on for TCP, TCP_NODELAY is
 * set. TCP connects to nd->addr and the last branch connects to
 * nd->addr_un. The body of the UDP branch is elided.
 */
496 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
498 struct netio_data *nd = td->io_ops->data;
499 struct netio_options *o = td->eo;
502 if (o->proto == FIO_TYPE_TCP) {
505 } else if (o->proto == FIO_TYPE_UDP) {
508 } else if (o->proto == FIO_TYPE_UNIX) {
512 log_err("fio: bad network type %d\n", o->proto);
517 f->fd = socket(domain, type, 0);
519 td_verror(td, errno, "socket");
523 #ifdef CONFIG_TCP_NODELAY
524 if (o->nodelay && o->proto == FIO_TYPE_TCP) {
527 if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) {
528 log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno));
534 if (o->proto == FIO_TYPE_UDP)
536 else if (o->proto == FIO_TYPE_TCP) {
537 socklen_t len = sizeof(nd->addr);
539 if (connect(f->fd, (struct sockaddr *) &nd->addr, len) < 0) {
540 td_verror(td, errno, "connect");
545 struct sockaddr_un *addr = &nd->addr_un;
/* address length: the family field plus the path string including its terminating NUL */
548 len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
550 if (connect(f->fd, (struct sockaddr *) addr, len) < 0) {
551 td_verror(td, errno, "connect");
/*
 * Server-side open. For UDP the listen socket itself becomes the file's
 * descriptor. Otherwise the current runstate is saved and the thread is
 * put into TD_SETTING_UP. It then waits with poll_wait() for the listen
 * socket to become readable and accept()s the connection; the peer address
 * lands in nd->addr. TCP_NODELAY is applied when configured and requested.
 * The saved runstate is restored in two places (presumably the success
 * and the error exit; the labels are elided).
 */
560 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
562 struct netio_data *nd = td->io_ops->data;
563 struct netio_options *o = td->eo;
564 socklen_t socklen = sizeof(nd->addr);
567 if (o->proto == FIO_TYPE_UDP) {
568 f->fd = nd->listenfd;
572 state = td->runstate;
573 td_set_runstate(td, TD_SETTING_UP);
575 log_info("fio: waiting for connection\n");
/* poll_wait() also gives up when the thread is told to terminate */
577 if (poll_wait(td, nd->listenfd, POLLIN) < 0)
580 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
582 td_verror(td, errno, "accept");
586 #ifdef CONFIG_TCP_NODELAY
587 if (o->nodelay && o->proto == FIO_TYPE_TCP) {
590 if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) {
591 log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno));
598 td_set_runstate(td, state);
601 td_set_runstate(td, state);
/*
 * Send a link-close control datagram (magic + FIO_LINK_CLOSE, both in
 * network byte order) to the address stored in nd->addr. A failure is
 * recorded with td_verror(); the test guarding that call is elided.
 */
605 static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f)
607 struct netio_data *nd = td->io_ops->data;
608 struct udp_close_msg msg;
609 struct sockaddr *to = (struct sockaddr *) &nd->addr;
612 msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
613 msg.cmd = htonl(FIO_LINK_CLOSE);
/* NOTE(review): MSG_WAITALL is documented as a receive flag; confirm it is intended on sendto() */
615 ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to,
618 td_verror(td, errno, "sendto udp link close");
/*
 * Close hook (installed as .close_file in both engine tables). For UDP a
 * link-close message is sent first; the actual close is delegated to
 * generic_close_file(), whose result is returned.
 */
621 static int fio_netio_close_file(struct thread_data *td, struct fio_file *f)
623 struct netio_options *o = td->eo;
626 * If this is an UDP connection, notify the receiver that we are
627 * closing down the link
629 if (o->proto == FIO_TYPE_UDP)
630 fio_netio_udp_close(td, f);
632 return generic_close_file(td, f);
/*
 * Receive the link-open control datagram from the peer. recvfrom() stores
 * the sender address in nd->addr. A receive failure is reported with
 * td_verror(). A message whose magic or command word does not match
 * FIO_LINK_OPEN_CLOSE_MAGIC / FIO_LINK_OPEN is logged as bad. The return
 * values are on elided lines.
 */
635 static int fio_netio_udp_recv_open(struct thread_data *td, struct fio_file *f)
637 struct netio_data *nd = td->io_ops->data;
638 struct udp_close_msg msg;
639 struct sockaddr *to = (struct sockaddr *) &nd->addr;
640 socklen_t len = sizeof(nd->addr);
/* 'to' is used here as the out-parameter that receives the sender address */
643 ret = recvfrom(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, &len);
645 td_verror(td, errno, "recvfrom udp link open");
649 if (ntohl(msg.magic) != FIO_LINK_OPEN_CLOSE_MAGIC ||
650 ntohl(msg.cmd) != FIO_LINK_OPEN) {
651 log_err("fio: bad udp open magic %x/%x\n", ntohl(msg.magic),
/*
 * Send a link-open control datagram (magic + FIO_LINK_OPEN, both in
 * network byte order) to the address stored in nd->addr. A failure is
 * recorded with td_verror(); the guarding test and the return values are
 * on elided lines.
 */
659 static int fio_netio_udp_send_open(struct thread_data *td, struct fio_file *f)
661 struct netio_data *nd = td->io_ops->data;
662 struct udp_close_msg msg;
663 struct sockaddr *to = (struct sockaddr *) &nd->addr;
666 msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
667 msg.cmd = htonl(FIO_LINK_OPEN);
/* NOTE(review): MSG_WAITALL is documented as a receive flag; confirm it is intended on sendto() */
669 ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to,
672 td_verror(td, errno, "sendto udp link open");
/*
 * Open hook (installed as .open_file in both engine tables). It
 * establishes the socket through fio_netio_accept() or
 * fio_netio_connect(); the selecting condition is elided (presumably
 * o->listen, TODO confirm). For UDP it then either sends or receives the
 * link-open message, with the thread in TD_SETTING_UP while receiving.
 * One path closes the file again through fio_netio_close_file(); its
 * trigger is on an elided line.
 */
679 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
682 struct netio_options *o = td->eo;
685 ret = fio_netio_accept(td, f);
687 ret = fio_netio_connect(td, f);
694 if (o->proto == FIO_TYPE_UDP) {
696 ret = fio_netio_udp_send_open(td, f);
/* save the runstate, wait for the open message in TD_SETTING_UP, then restore it */
700 state = td->runstate;
701 td_set_runstate(td, TD_SETTING_UP);
702 ret = fio_netio_udp_recv_open(td, f);
703 td_set_runstate(td, state);
708 fio_netio_close_file(td, f);
/*
 * Fill nd->addr for an outgoing IPv4 connection to host:port. A missing
 * host is reported as EINVAL ("no hostname= set") with a hint about
 * 'listen'. Otherwise the family and port are set, and the address comes
 * from inet_aton() or, if 'host' is not a numeric address, from
 * gethostbyname(). Guarding tests and returns are on elided lines.
 */
713 static int fio_netio_setup_connect_inet(struct thread_data *td,
714 const char *host, unsigned short port)
716 struct netio_data *nd = td->io_ops->data;
719 log_err("fio: connect with no host to connect to.\n");
721 log_err("fio: did you forget to set 'listen'?\n");
723 td_verror(td, EINVAL, "no hostname= set");
727 nd->addr.sin_family = AF_INET;
728 nd->addr.sin_port = htons(port);
/* inet_aton() returns 1 on success, so anything else falls back to a name lookup */
730 if (inet_aton(host, &nd->addr.sin_addr) != 1) {
731 struct hostent *hent;
733 hent = gethostbyname(host);
/* NOTE(review): gethostbyname() reports failures through h_errno, so errno may be stale here - confirm */
735 td_verror(td, errno, "gethostbyname");
/* copies 4 bytes, i.e. assumes the lookup returned an IPv4 address */
739 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
/*
 * Fill nd->addr_un for an outgoing UNIX-socket connection: family AF_UNIX
 * and the socket path copied into sun_path. The rest of the parameter list
 * and the return are on elided lines.
 */
745 static int fio_netio_setup_connect_unix(struct thread_data *td,
748 struct netio_data *nd = td->io_ops->data;
749 struct sockaddr_un *soun = &nd->addr_un;
751 soun->sun_family = AF_UNIX;
/* NOTE(review): unbounded copy; no length check against sizeof(sun_path) is visible - confirm none is needed */
752 strcpy(soun->sun_path, path);
/*
 * Prepare the destination address for a connecting job. UDP and TCP use
 * the IPv4 helper with td->o.filename as host and o->port as port. The
 * other helper treats td->o.filename as a UNIX socket path.
 */
756 static int fio_netio_setup_connect(struct thread_data *td)
758 struct netio_options *o = td->eo;
760 if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
761 return fio_netio_setup_connect_inet(td, td->o.filename,o->port);
763 return fio_netio_setup_connect_unix(td, td->o.filename);
/*
 * Create a stream UNIX socket and bind it to 'path', using nd->addr_un as
 * the address storage. socket() and bind() failures are logged with
 * log_err(). What is returned and where the descriptor is stored are on
 * elided lines.
 */
766 static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path)
768 struct netio_data *nd = td->io_ops->data;
769 struct sockaddr_un *addr = &nd->addr_un;
773 fd = socket(AF_UNIX, SOCK_STREAM, 0);
775 log_err("fio: socket: %s\n", strerror(errno));
781 memset(addr, 0, sizeof(*addr));
782 addr->sun_family = AF_UNIX;
/* NOTE(review): unbounded copy; no length check against sizeof(sun_path) is visible - confirm none is needed */
783 strcpy(addr->sun_path, path);
/* address length: the family field plus the path string including its terminating NUL */
786 len = sizeof(addr->sun_family) + strlen(path) + 1;
788 if (bind(fd, (struct sockaddr *) addr, len) < 0) {
789 log_err("fio: bind: %s\n", strerror(errno));
/*
 * Create and bind the IPv4 socket for a listening job on 'port'.
 *
 * The socket type depends on whether the protocol is TCP; the assignments
 * are elided. SO_REUSEADDR and SO_REUSEPORT are set. For a UDP job whose
 * filename is a multicast address, the socket joins that group on
 * INADDR_ANY; a filename given in any other case is rejected with an error
 * message. The socket is then bound to the multicast address if one was
 * parsed, else to INADDR_ANY. Every failing call is reported with
 * td_verror(). The enclosing conditions and returns are on elided lines.
 *
 * NOTE(review): 'port' is a signed short here but unsigned short in
 * fio_netio_setup_connect_inet(); confirm the signed type is intended.
 */
799 static int fio_netio_setup_listen_inet(struct thread_data *td, short port)
801 struct netio_data *nd = td->io_ops->data;
802 struct netio_options *o = td->eo;
804 struct sockaddr_in sin;
807 memset(&sin, 0, sizeof(sin));
808 if (o->proto == FIO_TYPE_TCP)
813 fd = socket(AF_INET, type, 0);
815 td_verror(td, errno, "socket");
820 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *) &opt, sizeof(opt)) < 0) {
821 td_verror(td, errno, "setsockopt");
826 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, (void *) &opt, sizeof(opt)) < 0) {
827 td_verror(td, errno, "setsockopt");
/* a hostname on the listening side is only accepted for UDP multicast */
834 if(o->proto != FIO_TYPE_UDP ||
835 !fio_netio_is_multicast(td->o.filename)) {
836 log_err("fio: hostname not valid for non-multicast inbound network IO\n");
/* NOTE(review): the inet_aton() result is not checked on this line - confirm the earlier multicast test makes that safe */
841 inet_aton(td->o.filename, &sin.sin_addr);
/* join the multicast group on any local interface */
843 mr.imr_multiaddr = sin.sin_addr;
844 mr.imr_interface.s_addr = htonl(INADDR_ANY);
845 if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr, sizeof(mr)) < 0) {
846 td_verror(td, errno, "setsockopt IP_ADD_MEMBERSHIP");
852 nd->addr.sin_family = AF_INET;
/* sin was zeroed above, so a zero address means no multicast address was parsed */
853 nd->addr.sin_addr.s_addr = sin.sin_addr.s_addr ? sin.sin_addr.s_addr : htonl(INADDR_ANY);
854 nd->addr.sin_port = htons(port);
856 if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
857 td_verror(td, errno, "bind");
/*
 * Prepare the listening socket for a listening job. UDP and TCP use the
 * IPv4 helper with o->port. The other helper treats td->o.filename as a
 * UNIX socket path. UDP is tested before listen() is called with a backlog
 * of 10 on nd->listenfd; the body of that UDP test, the result checks and
 * the returns are on elided lines.
 */
865 static int fio_netio_setup_listen(struct thread_data *td)
867 struct netio_data *nd = td->io_ops->data;
868 struct netio_options *o = td->eo;
871 if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
872 ret = fio_netio_setup_listen_inet(td, o->port);
874 ret = fio_netio_setup_listen_unix(td, td->o.filename);
878 if (o->proto == FIO_TYPE_UDP)
881 if (listen(nd->listenfd, 10) < 0) {
882 td_verror(td, errno, "listen");
/*
 * Init hook (installed as .init in both engine tables). It validates the
 * job's option combination and then prepares either the listening socket
 * or the destination address. Visible checks: random I/O is rejected, a
 * port must be absent for UNIX sockets and present for TCP/UDP, and UNIX
 * sockets need a filename. The conditions around several messages, the
 * listen/connect choice and the returns are on elided lines.
 */
890 static int fio_netio_init(struct thread_data *td)
892 struct netio_options *o = td->eo;
/* Winsock initialisation; presumably compiled only on Windows (guard not visible) */
897 WSAStartup(MAKEWORD(2,2), &wsd);
901 log_err("fio: network IO can't be random\n");
905 if (o->proto == FIO_TYPE_UNIX && o->port) {
906 log_err("fio: network IO port not valid with unix socket\n");
908 } else if (o->proto != FIO_TYPE_UNIX && !o->port) {
909 log_err("fio: network IO requires port for tcp or udp\n");
/* extra restrictions for the non-TCP protocols */
913 if (o->proto != FIO_TYPE_TCP) {
915 log_err("fio: listen only valid for TCP proto IO\n");
919 log_err("fio: datagram network connections must be"
923 if (o->proto == FIO_TYPE_UNIX && !td->o.filename) {
924 log_err("fio: UNIX sockets need host/filename\n");
/* here the listen role is derived from the job direction: readers listen */
927 o->listen = td_read(td);
931 ret = fio_netio_setup_listen(td);
933 ret = fio_netio_setup_connect(td);
/*
 * Cleanup hook (installed as .cleanup in both engine tables). It tests the
 * listen socket and both pipe ends against -1, the "not open" marker used
 * for the pipe ends in fio_netio_setup(). The action taken for each open
 * descriptor is on elided lines (presumably close(), TODO confirm).
 */
938 static void fio_netio_cleanup(struct thread_data *td)
940 struct netio_data *nd = td->io_ops->data;
943 if (nd->listenfd != -1)
945 if (nd->pipes[0] != -1)
947 if (nd->pipes[1] != -1)
/*
 * Setup hook (installed as .setup for the non-splice engine). If the job
 * has no files yet, one is added, named after td->o.filename or "net"
 * when no filename is set, and nr_files defaults to 1. If the engine data
 * has not been allocated yet, a zeroed struct netio_data is created with
 * both pipe descriptors marked unused (-1) and attached to
 * td->io_ops->data. The return is on an elided line.
 */
954 static int fio_netio_setup(struct thread_data *td)
956 struct netio_data *nd;
958 if (!td->files_index) {
/* "a ?: b" is the GNU shorthand for "a ? a : b" */
959 add_file(td, td->o.filename ?: "net");
960 td->o.nr_files = td->o.nr_files ?: 1;
963 if (!td->io_ops->data) {
/* NOTE(review): stray second ';' and no visible NULL check before the memset() below - confirm */
964 nd = malloc(sizeof(*nd));;
966 memset(nd, 0, sizeof(*nd));
968 nd->pipes[0] = nd->pipes[1] = -1;
969 td->io_ops->data = nd;
/*
 * Terminate hook (installed as .terminate in both engine tables): sends
 * SIGUSR2 to td->pid. Presumably this interrupts a thread blocked in the
 * infinite-timeout poll() of poll_wait() - TODO confirm.
 */
975 static void fio_netio_terminate(struct thread_data *td)
977 kill(td->pid, SIGUSR2);
980 #ifdef CONFIG_LINUX_SPLICE
/*
 * Setup hook for the splice engine: fetches the engine data from
 * td->io_ops->data and creates the internal pipe in nd->pipes. Any earlier
 * setup step, the handling of a pipe() failure and the return are on
 * elided lines.
 */
981 static int fio_netio_setup_splice(struct thread_data *td)
983 struct netio_data *nd;
987 nd = td->io_ops->data;
989 if (pipe(nd->pipes) < 0)
/*
 * Engine table for the "netsplice" engine. It uses the same hooks as the
 * plain engine except for .setup, which goes through
 * fio_netio_setup_splice(). The continuation of the .flags expression is
 * on an elided line.
 */
999 static struct ioengine_ops ioengine_splice = {
1000 .name = "netsplice",
1001 .version = FIO_IOOPS_VERSION,
1002 .prep = fio_netio_prep,
1003 .queue = fio_netio_queue,
1004 .setup = fio_netio_setup_splice,
1005 .init = fio_netio_init,
1006 .cleanup = fio_netio_cleanup,
1007 .open_file = fio_netio_open_file,
1008 .close_file = fio_netio_close_file,
1009 .terminate = fio_netio_terminate,
/* size of the per-job option struct that the option table's offsets index into */
1011 .option_struct_size = sizeof(struct netio_options),
1012 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
/*
 * Engine table for the plain (non-splice) net engine. It shares every hook
 * with the splice engine except .setup, which is fio_netio_setup(). The
 * .name line is elided from this excerpt.
 */
1017 static struct ioengine_ops ioengine_rw = {
1019 .version = FIO_IOOPS_VERSION,
1020 .prep = fio_netio_prep,
1021 .queue = fio_netio_queue,
1022 .setup = fio_netio_setup,
1023 .init = fio_netio_init,
1024 .cleanup = fio_netio_cleanup,
1025 .open_file = fio_netio_open_file,
1026 .close_file = fio_netio_close_file,
1027 .terminate = fio_netio_terminate,
/* size of the per-job option struct that the option table's offsets index into */
1029 .option_struct_size = sizeof(struct netio_options),
1030 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
1031 FIO_PIPEIO | FIO_BIT_BASED,
/*
 * Option callback for the hostname option. 'data' is the job's struct
 * netio_options. The job filename (td->o.filename) is replaced by a
 * heap copy of 'input', and any previous value is freed first. The return
 * is on an elided line.
 */
1034 static int str_hostname_cb(void *data, const char *input)
1036 struct netio_options *o = data;
/* NOTE(review): the guard is redundant because free(NULL) is a no-op, and the strdup() result is not checked on the visible lines */
1038 if (o->td->o.filename)
1039 free(o->td->o.filename);
1040 o->td->o.filename = strdup(input);
/*
 * Registers the engines with fio: always the plain engine, and the splice
 * engine only when built with CONFIG_LINUX_SPLICE. The function is tagged
 * with fio_init.
 */
1044 static void fio_init fio_netio_register(void)
1046 register_ioengine(&ioengine_rw);
1047 #ifdef CONFIG_LINUX_SPLICE
1048 register_ioengine(&ioengine_splice);
/*
 * Unregisters the engines: always the plain engine, and the splice engine
 * only when built with CONFIG_LINUX_SPLICE. The function is tagged with
 * fio_exit.
 */
1052 static void fio_exit fio_netio_unregister(void)
1054 unregister_ioengine(&ioengine_rw);
1055 #ifdef CONFIG_LINUX_SPLICE
1056 unregister_ioengine(&ioengine_splice);