4 * IO engine that reads/writes to/from sockets.
13 #include <netinet/in.h>
14 #include <netinet/tcp.h>
15 #include <arpa/inet.h>
18 #include <sys/types.h>
20 #include <sys/socket.h>
29 struct sockaddr_in addr;
30 struct sockaddr_un addr_un;
33 struct netio_options {
34 struct thread_data *td;
38 unsigned int pingpong;
42 struct udp_close_msg {
48 FIO_LINK_CLOSE = 0x89,
49 FIO_LINK_OPEN_CLOSE_MAGIC = 0x6c696e6b,
57 static int str_hostname_cb(void *data, const char *input);
58 static struct fio_option options[] = {
61 .type = FIO_OPT_STR_STORE,
62 .cb = str_hostname_cb,
63 .help = "Hostname for net IO engine",
68 .off1 = offsetof(struct netio_options, port),
71 .help = "Port to use for TCP or UDP net connections",
77 .off1 = offsetof(struct netio_options, proto),
78 .help = "Network protocol to use",
83 .help = "Transmission Control Protocol",
87 .help = "User Datagram Protocol",
90 .oval = FIO_TYPE_UNIX,
91 .help = "UNIX domain socket",
98 .off1 = offsetof(struct netio_options, nodelay),
99 .help = "Use TCP_NODELAY on TCP connections",
103 .type = FIO_OPT_STR_SET,
104 .off1 = offsetof(struct netio_options, listen),
105 .help = "Listen for incoming TCP connections",
109 .type = FIO_OPT_STR_SET,
110 .off1 = offsetof(struct netio_options, pingpong),
111 .help = "Ping-pong IO requests",
119 * Return -1 for error and 'nr events' for a positive number
122 static int poll_wait(struct thread_data *td, int fd, short events)
127 while (!td->terminate) {
130 ret = poll(&pfd, 1, -1);
135 td_verror(td, errno, "poll");
143 if (pfd.revents & events)
149 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
151 struct netio_options *o = td->eo;
154 * Make sure we don't see spurious reads to a receiver, and vice versa
156 if (o->proto == FIO_TYPE_TCP)
159 if ((o->listen && io_u->ddir == DDIR_WRITE) ||
160 (!o->listen && io_u->ddir == DDIR_READ)) {
161 td_verror(td, EINVAL, "bad direction");
168 #ifdef CONFIG_LINUX_SPLICE
169 static int splice_io_u(int fdin, int fdout, unsigned int len)
174 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
192 * Receive bytes from a socket and fill them into the internal pipe
194 static int splice_in(struct thread_data *td, struct io_u *io_u)
196 struct netio_data *nd = td->io_ops->data;
198 return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
202 * Transmit 'len' bytes from the internal pipe
204 static int splice_out(struct thread_data *td, struct io_u *io_u,
207 struct netio_data *nd = td->io_ops->data;
209 return splice_io_u(nd->pipes[0], io_u->file->fd, len);
212 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
215 .iov_base = io_u->xfer_buf,
220 while (iov.iov_len) {
221 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
240 * vmsplice() pipe to io_u buffer
242 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
245 struct netio_data *nd = td->io_ops->data;
247 return vmsplice_io_u(io_u, nd->pipes[0], len);
251 * vmsplice() io_u to pipe
253 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
255 struct netio_data *nd = td->io_ops->data;
257 return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
261 * splice receive - transfer socket data into a pipe using splice, then map
262 * that pipe data into the io_u using vmsplice.
264 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
268 ret = splice_in(td, io_u);
270 return vmsplice_io_u_out(td, io_u, ret);
276 * splice transmit - map data from the io_u into a pipe by using vmsplice,
277 * then transfer that pipe to a socket using splice.
279 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
283 ret = vmsplice_io_u_in(td, io_u);
285 return splice_out(td, io_u, ret);
290 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
296 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
303 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
305 struct netio_data *nd = td->io_ops->data;
306 struct netio_options *o = td->eo;
310 if (o->proto == FIO_TYPE_UDP) {
311 struct sockaddr *to = (struct sockaddr *) &nd->addr;
313 ret = sendto(io_u->file->fd, io_u->xfer_buf,
314 io_u->xfer_buflen, flags, to,
318 * if we are going to write more, set MSG_MORE
321 if ((td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
322 td->o.size) && !o->pingpong)
325 ret = send(io_u->file->fd, io_u->xfer_buf,
326 io_u->xfer_buflen, flags);
331 ret = poll_wait(td, io_u->file->fd, POLLOUT);
339 static int is_udp_close(struct io_u *io_u, int len)
341 struct udp_close_msg *msg;
343 if (len != sizeof(struct udp_close_msg))
346 msg = io_u->xfer_buf;
347 if (ntohl(msg->magic) != FIO_LINK_OPEN_CLOSE_MAGIC)
349 if (ntohl(msg->cmd) != FIO_LINK_CLOSE)
355 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
357 struct netio_data *nd = td->io_ops->data;
358 struct netio_options *o = td->eo;
362 if (o->proto == FIO_TYPE_UDP) {
363 socklen_t len = sizeof(nd->addr);
364 struct sockaddr *from = (struct sockaddr *) &nd->addr;
366 ret = recvfrom(io_u->file->fd, io_u->xfer_buf,
367 io_u->xfer_buflen, flags, from, &len);
368 if (is_udp_close(io_u, ret)) {
373 ret = recv(io_u->file->fd, io_u->xfer_buf,
374 io_u->xfer_buflen, flags);
378 else if (!ret && (flags & MSG_WAITALL))
381 ret = poll_wait(td, io_u->file->fd, POLLIN);
384 flags |= MSG_WAITALL;
390 static int __fio_netio_queue(struct thread_data *td, struct io_u *io_u,
393 struct netio_data *nd = td->io_ops->data;
394 struct netio_options *o = td->eo;
397 if (ddir == DDIR_WRITE) {
398 if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
399 o->proto == FIO_TYPE_UNIX)
400 ret = fio_netio_send(td, io_u);
402 ret = fio_netio_splice_out(td, io_u);
403 } else if (ddir == DDIR_READ) {
404 if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
405 o->proto == FIO_TYPE_UNIX)
406 ret = fio_netio_recv(td, io_u);
408 ret = fio_netio_splice_in(td, io_u);
410 ret = 0; /* must be a SYNC */
412 if (ret != (int) io_u->xfer_buflen) {
414 io_u->resid = io_u->xfer_buflen - ret;
416 return FIO_Q_COMPLETED;
420 if (ddir == DDIR_WRITE && err == EMSGSIZE)
428 td_verror(td, io_u->error, "xfer");
430 return FIO_Q_COMPLETED;
433 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
435 struct netio_options *o = td->eo;
438 fio_ro_check(td, io_u);
440 ret = __fio_netio_queue(td, io_u, io_u->ddir);
441 if (!o->pingpong || ret != FIO_Q_COMPLETED)
445 * For ping-pong mode, receive or send reply as needed
447 if (td_read(td) && io_u->ddir == DDIR_READ)
448 ret = __fio_netio_queue(td, io_u, DDIR_WRITE);
449 else if (td_write(td) && io_u->ddir == DDIR_WRITE)
450 ret = __fio_netio_queue(td, io_u, DDIR_READ);
455 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
457 struct netio_data *nd = td->io_ops->data;
458 struct netio_options *o = td->eo;
459 int type, domain, optval;
461 if (o->proto == FIO_TYPE_TCP) {
464 } else if (o->proto == FIO_TYPE_UDP) {
467 } else if (o->proto == FIO_TYPE_UNIX) {
471 log_err("fio: bad network type %d\n", o->proto);
476 f->fd = socket(domain, type, 0);
478 td_verror(td, errno, "socket");
482 if (o->nodelay && o->proto == FIO_TYPE_TCP) {
484 if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) {
485 log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno));
490 if (o->proto == FIO_TYPE_UDP)
492 else if (o->proto == FIO_TYPE_TCP) {
493 socklen_t len = sizeof(nd->addr);
495 if (connect(f->fd, (struct sockaddr *) &nd->addr, len) < 0) {
496 td_verror(td, errno, "connect");
501 struct sockaddr_un *addr = &nd->addr_un;
504 len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
506 if (connect(f->fd, (struct sockaddr *) addr, len) < 0) {
507 td_verror(td, errno, "connect");
516 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
518 struct netio_data *nd = td->io_ops->data;
519 struct netio_options *o = td->eo;
520 socklen_t socklen = sizeof(nd->addr);
523 if (o->proto == FIO_TYPE_UDP) {
524 f->fd = nd->listenfd;
528 state = td->runstate;
529 td_set_runstate(td, TD_SETTING_UP);
531 log_info("fio: waiting for connection\n");
533 if (poll_wait(td, nd->listenfd, POLLIN) < 0)
536 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
538 td_verror(td, errno, "accept");
542 if (o->nodelay && o->proto == FIO_TYPE_TCP) {
544 if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) {
545 log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno));
551 td_set_runstate(td, state);
554 td_set_runstate(td, state);
558 static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f)
560 struct netio_data *nd = td->io_ops->data;
561 struct udp_close_msg msg;
562 struct sockaddr *to = (struct sockaddr *) &nd->addr;
565 msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
566 msg.cmd = htonl(FIO_LINK_CLOSE);
568 ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to,
571 td_verror(td, errno, "sendto udp link close");
574 static int fio_netio_close_file(struct thread_data *td, struct fio_file *f)
576 struct netio_options *o = td->eo;
579 * If this is an UDP connection, notify the receiver that we are
580 * closing down the link
582 if (o->proto == FIO_TYPE_UDP)
583 fio_netio_udp_close(td, f);
585 return generic_close_file(td, f);
588 static int fio_netio_udp_recv_open(struct thread_data *td, struct fio_file *f)
590 struct netio_data *nd = td->io_ops->data;
591 struct udp_close_msg msg;
592 struct sockaddr *to = (struct sockaddr *) &nd->addr;
593 socklen_t len = sizeof(nd->addr);
596 ret = recvfrom(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, &len);
598 td_verror(td, errno, "sendto udp link open");
602 if (ntohl(msg.magic) != FIO_LINK_OPEN_CLOSE_MAGIC ||
603 ntohl(msg.cmd) != FIO_LINK_OPEN) {
604 log_err("fio: bad udp open magic %x/%x\n", ntohl(msg.magic),
612 static int fio_netio_udp_send_open(struct thread_data *td, struct fio_file *f)
614 struct netio_data *nd = td->io_ops->data;
615 struct udp_close_msg msg;
616 struct sockaddr *to = (struct sockaddr *) &nd->addr;
619 msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
620 msg.cmd = htonl(FIO_LINK_OPEN);
622 ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to,
625 td_verror(td, errno, "sendto udp link open");
632 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
635 struct netio_options *o = td->eo;
638 ret = fio_netio_accept(td, f);
640 ret = fio_netio_connect(td, f);
647 if (o->proto == FIO_TYPE_UDP) {
649 ret = fio_netio_udp_send_open(td, f);
653 state = td->runstate;
654 td_set_runstate(td, TD_SETTING_UP);
655 ret = fio_netio_udp_recv_open(td, f);
656 td_set_runstate(td, state);
661 fio_netio_close_file(td, f);
666 static int fio_netio_setup_connect_inet(struct thread_data *td,
667 const char *host, unsigned short port)
669 struct netio_data *nd = td->io_ops->data;
672 log_err("fio: connect with no host to connect to.\n");
674 log_err("fio: did you forget to set 'listen'?\n");
676 td_verror(td, EINVAL, "no hostname= set");
680 nd->addr.sin_family = AF_INET;
681 nd->addr.sin_port = htons(port);
683 if (inet_aton(host, &nd->addr.sin_addr) != 1) {
684 struct hostent *hent;
686 hent = gethostbyname(host);
688 td_verror(td, errno, "gethostbyname");
692 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
698 static int fio_netio_setup_connect_unix(struct thread_data *td,
701 struct netio_data *nd = td->io_ops->data;
702 struct sockaddr_un *soun = &nd->addr_un;
704 soun->sun_family = AF_UNIX;
705 strcpy(soun->sun_path, path);
709 static int fio_netio_setup_connect(struct thread_data *td)
711 struct netio_options *o = td->eo;
713 if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
714 return fio_netio_setup_connect_inet(td, td->o.filename,o->port);
716 return fio_netio_setup_connect_unix(td, td->o.filename);
719 static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path)
721 struct netio_data *nd = td->io_ops->data;
722 struct sockaddr_un *addr = &nd->addr_un;
726 fd = socket(AF_UNIX, SOCK_STREAM, 0);
728 log_err("fio: socket: %s\n", strerror(errno));
734 memset(addr, 0, sizeof(*addr));
735 addr->sun_family = AF_UNIX;
736 strcpy(addr->sun_path, path);
739 len = sizeof(addr->sun_family) + strlen(path) + 1;
741 if (bind(fd, (struct sockaddr *) addr, len) < 0) {
742 log_err("fio: bind: %s\n", strerror(errno));
752 static int fio_netio_setup_listen_inet(struct thread_data *td, short port)
754 struct netio_data *nd = td->io_ops->data;
755 struct netio_options *o = td->eo;
758 if (o->proto == FIO_TYPE_TCP)
763 fd = socket(AF_INET, type, 0);
765 td_verror(td, errno, "socket");
770 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *) &opt, sizeof(opt)) < 0) {
771 td_verror(td, errno, "setsockopt");
775 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, (void *) &opt, sizeof(opt)) < 0) {
776 td_verror(td, errno, "setsockopt");
781 nd->addr.sin_family = AF_INET;
782 nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
783 nd->addr.sin_port = htons(port);
785 if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
786 td_verror(td, errno, "bind");
794 static int fio_netio_setup_listen(struct thread_data *td)
796 struct netio_data *nd = td->io_ops->data;
797 struct netio_options *o = td->eo;
800 if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
801 ret = fio_netio_setup_listen_inet(td, o->port);
803 ret = fio_netio_setup_listen_unix(td, td->o.filename);
807 if (o->proto == FIO_TYPE_UDP)
810 if (listen(nd->listenfd, 10) < 0) {
811 td_verror(td, errno, "listen");
819 static int fio_netio_init(struct thread_data *td)
821 struct netio_options *o = td->eo;
826 WSAStartup(MAKEWORD(2,2), &wsd);
830 log_err("fio: network IO can't be random\n");
834 if (o->proto == FIO_TYPE_UNIX && o->port) {
835 log_err("fio: network IO port not valid with unix socket\n");
837 } else if (o->proto != FIO_TYPE_UNIX && !o->port) {
838 log_err("fio: network IO requires port for tcp or udp\n");
842 if (o->proto != FIO_TYPE_TCP) {
844 log_err("fio: listen only valid for TCP proto IO\n");
848 log_err("fio: datagram network connections must be"
852 if (o->proto == FIO_TYPE_UNIX && !td->o.filename) {
853 log_err("fio: UNIX sockets need host/filename\n");
856 o->listen = td_read(td);
859 if (o->proto != FIO_TYPE_UNIX && o->listen && td->o.filename) {
860 log_err("fio: hostname not valid for inbound network IO\n");
865 ret = fio_netio_setup_listen(td);
867 ret = fio_netio_setup_connect(td);
872 static void fio_netio_cleanup(struct thread_data *td)
874 struct netio_data *nd = td->io_ops->data;
877 if (nd->listenfd != -1)
879 if (nd->pipes[0] != -1)
881 if (nd->pipes[1] != -1)
888 static int fio_netio_setup(struct thread_data *td)
890 struct netio_data *nd;
892 if (!td->files_index) {
893 add_file(td, td->o.filename ?: "net");
894 td->o.nr_files = td->o.nr_files ?: 1;
897 if (!td->io_ops->data) {
898 nd = malloc(sizeof(*nd));;
900 memset(nd, 0, sizeof(*nd));
902 nd->pipes[0] = nd->pipes[1] = -1;
903 td->io_ops->data = nd;
909 static void fio_netio_terminate(struct thread_data *td)
911 kill(td->pid, SIGUSR2);
914 #ifdef CONFIG_LINUX_SPLICE
915 static int fio_netio_setup_splice(struct thread_data *td)
917 struct netio_data *nd;
921 nd = td->io_ops->data;
923 if (pipe(nd->pipes) < 0)
933 static struct ioengine_ops ioengine_splice = {
935 .version = FIO_IOOPS_VERSION,
936 .prep = fio_netio_prep,
937 .queue = fio_netio_queue,
938 .setup = fio_netio_setup_splice,
939 .init = fio_netio_init,
940 .cleanup = fio_netio_cleanup,
941 .open_file = fio_netio_open_file,
942 .close_file = fio_netio_close_file,
943 .terminate = fio_netio_terminate,
945 .option_struct_size = sizeof(struct netio_options),
946 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
951 static struct ioengine_ops ioengine_rw = {
953 .version = FIO_IOOPS_VERSION,
954 .prep = fio_netio_prep,
955 .queue = fio_netio_queue,
956 .setup = fio_netio_setup,
957 .init = fio_netio_init,
958 .cleanup = fio_netio_cleanup,
959 .open_file = fio_netio_open_file,
960 .close_file = fio_netio_close_file,
961 .terminate = fio_netio_terminate,
963 .option_struct_size = sizeof(struct netio_options),
964 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
968 static int str_hostname_cb(void *data, const char *input)
970 struct netio_options *o = data;
972 if (o->td->o.filename)
973 free(o->td->o.filename);
974 o->td->o.filename = strdup(input);
978 static void fio_init fio_netio_register(void)
980 register_ioengine(&ioengine_rw);
981 #ifdef CONFIG_LINUX_SPLICE
982 register_ioengine(&ioengine_splice);
986 static void fio_exit fio_netio_unregister(void)
988 unregister_ioengine(&ioengine_rw);
989 #ifdef CONFIG_LINUX_SPLICE
990 unregister_ioengine(&ioengine_splice);