4 * IO engine that reads/writes to/from sockets.
12 #include <netinet/in.h>
13 #include <arpa/inet.h>
16 #include <sys/types.h>
18 #include <sys/socket.h>
/*
 * Socket address members of the engine state that the functions below
 * reach through td->io_ops->data: addr is used for inet (TCP/UDP)
 * endpoints and addr_un for UNIX domain endpoints. The enclosing struct
 * header is not part of this view.
 */
27 struct sockaddr_in addr;
28 struct sockaddr_un addr_un;
/*
 * Engine-specific job options. td is the back pointer that the hostname
 * callback follows to reach the job options. Besides the members visible
 * here, the option table and the rest of the file also use port, proto
 * and listen.
 */
31 struct netio_options {
32 struct thread_data *td;
36 unsigned int pingpong;
/*
 * Small control message used for the UDP link open/close handshake. The
 * code in this file fills in its magic and cmd members with htonl() before
 * sending and decodes them with ntohl() on receipt.
 */
39 struct udp_close_msg {
/*
 * Constants for the UDP link handshake: FIO_LINK_CLOSE travels in the cmd
 * member of struct udp_close_msg and FIO_LINK_OPEN_CLOSE_MAGIC in its
 * magic member. 0x6c696e6b is the ASCII byte sequence l-i-n-k.
 */
45 FIO_LINK_CLOSE = 0x89,
46 FIO_LINK_OPEN_CLOSE_MAGIC = 0x6c696e6b,
/*
 * Forward declaration: the option table below refers to this callback,
 * which is defined near the end of the file.
 */
54 static int str_hostname_cb(void *data, const char *input);
/*
 * Job options understood by the net engines. Entries either store straight
 * into struct netio_options through off1 (port, proto, listen, pingpong)
 * or, for the hostname entry, go through str_hostname_cb. The protocol
 * entry offers TCP, UDP and UNIX domain socket values. Only part of each
 * initializer is visible in this view.
 */
55 static struct fio_option options[] = {
58 .type = FIO_OPT_STR_STORE,
59 .cb = str_hostname_cb,
60 .help = "Hostname for net IO engine",
65 .off1 = offsetof(struct netio_options, port),
68 .help = "Port to use for TCP or UDP net connections",
74 .off1 = offsetof(struct netio_options, proto),
75 .help = "Network protocol to use",
80 .help = "Transmission Control Protocol",
84 .help = "User Datagram Protocol",
87 .oval = FIO_TYPE_UNIX,
88 .help = "UNIX domain socket",
94 .type = FIO_OPT_STR_SET,
95 .off1 = offsetof(struct netio_options, listen),
96 .help = "Listen for incoming TCP connections",
100 .type = FIO_OPT_STR_SET,
101 .off1 = offsetof(struct netio_options, pingpong),
102 .help = "Ping-pong IO requests",
110 * Return -1 for error and 'nr events' for a positive number
/*
 * Wait in poll() on a single descriptor for the requested events. The loop
 * keeps going until td->terminate is set; poll() is called with a timeout
 * of -1 (no timeout). A poll failure is reported through td_verror, and
 * the returned revents are tested against the requested event mask. The
 * exact return statements are not part of this view.
 */
113 static int poll_wait(struct thread_data *td, int fd, short events)
118 while (!td->terminate) {
121 ret = poll(&pfd, 1, -1);
126 td_verror(td, errno, "poll");
134 if (pfd.revents & events)
/*
 * Prep hook: sanity check the direction of an io_u. A write issued by a
 * listening job, or a read issued by a connecting job, is flagged with
 * EINVAL and the label bad direction. What follows the TCP protocol test
 * is outside this view - presumably an early exit for TCP; confirm
 * against the full file.
 */
140 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
142 struct netio_options *o = td->eo;
145 * Make sure we don't see spurious reads to a receiver, and vice versa
147 if (o->proto == FIO_TYPE_TCP)
150 if ((o->listen && io_u->ddir == DDIR_WRITE) ||
151 (!o->listen && io_u->ddir == DDIR_READ)) {
152 td_verror(td, EINVAL, "bad direction");
159 #ifdef FIO_HAVE_SPLICE
/*
 * Move up to len bytes from fdin to fdout with splice(), passing NULL for
 * both offsets and 0 for the flags. How the splice result is handled
 * (short transfers, errors) is not part of this view.
 */
160 static int splice_io_u(int fdin, int fdout, unsigned int len)
165 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
183 * Receive bytes from a socket and fill them into the internal pipe
/*
 * Splice io_u->xfer_buflen bytes from the descriptor of the io_u file into
 * nd->pipes[1] of the internal pipe. Returns whatever splice_io_u returns.
 */
185 static int splice_in(struct thread_data *td, struct io_u *io_u)
187 struct netio_data *nd = td->io_ops->data;
189 return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
193 * Transmit 'len' bytes from the internal pipe
/*
 * Splice len bytes from nd->pipes[0] of the internal pipe to the
 * descriptor of the io_u file. Returns whatever splice_io_u returns.
 * The remainder of the parameter list is on a line outside this view.
 */
195 static int splice_out(struct thread_data *td, struct io_u *io_u,
198 struct netio_data *nd = td->io_ops->data;
200 return splice_io_u(nd->pipes[0], io_u->file->fd, len);
/*
 * vmsplice() between the io_u transfer buffer and the pipe descriptor fd.
 * A single iovec starts at io_u->xfer_buf and the call is repeated while
 * iov.iov_len is non-zero, with SPLICE_F_MOVE as the flag. The loop body
 * that advances the iovec and the error handling are not in this view.
 */
203 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
206 .iov_base = io_u->xfer_buf,
211 while (iov.iov_len) {
212 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
231 * vmsplice() pipe to io_u buffer
/*
 * Run vmsplice_io_u for len bytes between nd->pipes[0] and the io_u
 * buffer. The remainder of the parameter list is on a line outside this
 * view.
 */
233 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
236 struct netio_data *nd = td->io_ops->data;
238 return vmsplice_io_u(io_u, nd->pipes[0], len);
242 * vmsplice() io_u to pipe
/*
 * Run vmsplice_io_u for the full io_u->xfer_buflen between the io_u buffer
 * and nd->pipes[1].
 */
244 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
246 struct netio_data *nd = td->io_ops->data;
248 return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
252 * splice receive - transfer socket data into a pipe using splice, then map
253 * that pipe data into the io_u using vmsplice.
/*
 * Receive path of the splice engine: first splice_in() from the socket
 * into the pipe, then vmsplice_io_u_out() for the byte count the first
 * step returned. The test applied to that count between the two calls is
 * not part of this view.
 */
255 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
259 ret = splice_in(td, io_u);
261 return vmsplice_io_u_out(td, io_u, ret);
267 * splice transmit - map data from the io_u into a pipe by using vmsplice,
268 * then transfer that pipe to a socket using splice.
/*
 * Transmit path of the splice engine: first vmsplice_io_u_in() from the
 * io_u buffer into the pipe, then splice_out() to the socket for the byte
 * count the first step returned. The test applied to that count between
 * the two calls is not part of this view.
 */
270 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
274 ret = vmsplice_io_u_in(td, io_u);
276 return splice_out(td, io_u, ret);
/*
 * Second definition of the splice receive entry point. NOTE(review):
 * presumably the fallback built when FIO_HAVE_SPLICE is not defined; its
 * body and the preprocessor branch are not visible here - confirm.
 */
281 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
/*
 * Second definition of the splice transmit entry point. NOTE(review):
 * presumably the fallback built when FIO_HAVE_SPLICE is not defined; its
 * body and the preprocessor branch are not visible here - confirm.
 */
287 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
/*
 * Transmit one io_u buffer on the socket.
 *
 * UDP: sendto() towards the address stored in nd->addr.
 * Otherwise: send(). Per the in-line comment, MSG_MORE is requested when
 * the bytes written so far plus this buffer stay below td->o.size and
 * ping-pong mode is off.
 *
 * poll_wait() for POLLOUT is part of the flow as well; the surrounding
 * loop and the conditions that lead to it are not in this view.
 */
294 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
296 struct netio_data *nd = td->io_ops->data;
297 struct netio_options *o = td->eo;
301 if (o->proto == FIO_TYPE_UDP) {
302 struct sockaddr *to = (struct sockaddr *) &nd->addr;
304 ret = sendto(io_u->file->fd, io_u->xfer_buf,
305 io_u->xfer_buflen, flags, to,
309 * if we are going to write more, set MSG_MORE
312 if ((td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
313 td->o.size) && !o->pingpong)
316 ret = send(io_u->file->fd, io_u->xfer_buf,
317 io_u->xfer_buflen, flags);
322 ret = poll_wait(td, io_u->file->fd, POLLOUT);
/*
 * Decide whether a received datagram is the UDP link-close message. Three
 * tests are applied to the io_u buffer: the length must equal
 * sizeof(struct udp_close_msg), and after ntohl() the magic must be
 * FIO_LINK_OPEN_CLOSE_MAGIC and the cmd must be FIO_LINK_CLOSE. The return
 * statements themselves are not in this view.
 */
330 static int is_udp_close(struct io_u *io_u, int len)
332 struct udp_close_msg *msg;
334 if (len != sizeof(struct udp_close_msg))
337 msg = io_u->xfer_buf;
338 if (ntohl(msg->magic) != FIO_LINK_OPEN_CLOSE_MAGIC)
340 if (ntohl(msg->cmd) != FIO_LINK_CLOSE)
/*
 * Receive into one io_u buffer from the socket.
 *
 * UDP: recvfrom(), which also writes the sender address into nd->addr;
 * the datagram is then checked with is_udp_close().
 * Otherwise: recv().
 *
 * A zero return while MSG_WAITALL is set is singled out, poll_wait() for
 * POLLIN is used, and MSG_WAITALL is OR-ed into the flags along the way.
 * The loop structure tying these together is not in this view.
 */
346 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
348 struct netio_data *nd = td->io_ops->data;
349 struct netio_options *o = td->eo;
353 if (o->proto == FIO_TYPE_UDP) {
354 fio_socklen_t len = sizeof(nd->addr);
355 struct sockaddr *from = (struct sockaddr *) &nd->addr;
357 ret = recvfrom(io_u->file->fd, io_u->xfer_buf,
358 io_u->xfer_buflen, flags, from, &len);
359 if (is_udp_close(io_u, ret)) {
364 ret = recv(io_u->file->fd, io_u->xfer_buf,
365 io_u->xfer_buflen, flags);
369 else if (!ret && (flags & MSG_WAITALL))
372 ret = poll_wait(td, io_u->file->fd, POLLIN);
375 flags |= MSG_WAITALL;
/*
 * Perform one transfer in the given direction and complete the io_u.
 *
 * Writes go through fio_netio_send and reads through fio_netio_recv when
 * splice is not in use or the protocol is UDP or UNIX; otherwise the
 * splice variants are used. Any other direction is treated as a sync and
 * yields 0.
 *
 * When the byte count differs from io_u->xfer_buflen the shortfall is
 * stored in io_u->resid. EMSGSIZE on a write gets special treatment and
 * errors are reported through td_verror with the label xfer. Both visible
 * return statements yield FIO_Q_COMPLETED. The guards around the resid
 * and error paths, and the last parameter, are on lines outside this view.
 */
381 static int __fio_netio_queue(struct thread_data *td, struct io_u *io_u,
384 struct netio_data *nd = td->io_ops->data;
385 struct netio_options *o = td->eo;
388 if (ddir == DDIR_WRITE) {
389 if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
390 o->proto == FIO_TYPE_UNIX)
391 ret = fio_netio_send(td, io_u);
393 ret = fio_netio_splice_out(td, io_u);
394 } else if (ddir == DDIR_READ) {
395 if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
396 o->proto == FIO_TYPE_UNIX)
397 ret = fio_netio_recv(td, io_u);
399 ret = fio_netio_splice_in(td, io_u);
401 ret = 0; /* must be a SYNC */
403 if (ret != (int) io_u->xfer_buflen) {
405 io_u->resid = io_u->xfer_buflen - ret;
407 return FIO_Q_COMPLETED;
411 if (ddir == DDIR_WRITE && err == EMSGSIZE)
419 td_verror(td, io_u->error, "xfer");
421 return FIO_Q_COMPLETED;
/*
 * Queue hook. After fio_ro_check() the io_u is transferred in its own
 * direction through __fio_netio_queue. The ping-pong step below is
 * skipped unless ping-pong mode is on and that transfer returned
 * FIO_Q_COMPLETED. In ping-pong mode a reading job follows its read with
 * a write of the same io_u, and a writing job follows its write with a
 * read.
 */
424 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
426 struct netio_options *o = td->eo;
429 fio_ro_check(td, io_u);
431 ret = __fio_netio_queue(td, io_u, io_u->ddir);
432 if (!o->pingpong || ret != FIO_Q_COMPLETED)
436 * For ping-pong mode, receive or send reply as needed
438 if (td_read(td) && io_u->ddir == DDIR_READ)
439 ret = __fio_netio_queue(td, io_u, DDIR_WRITE);
440 else if (td_write(td) && io_u->ddir == DDIR_WRITE)
441 ret = __fio_netio_queue(td, io_u, DDIR_READ);
/*
 * Create the socket for the connecting side and store it in f->fd.
 *
 * The protocol selects domain and type for socket(); an unknown protocol
 * is logged as a bad network type. TCP connects to nd->addr. The UNIX
 * branch connects to nd->addr_un with a length of the family field plus
 * the path plus its terminator. What the UDP branch does after socket()
 * is not visible here - presumably nothing more; confirm.
 * Failures of socket() and connect() are reported through td_verror.
 */
446 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
448 struct netio_data *nd = td->io_ops->data;
449 struct netio_options *o = td->eo;
452 if (o->proto == FIO_TYPE_TCP) {
455 } else if (o->proto == FIO_TYPE_UDP) {
458 } else if (o->proto == FIO_TYPE_UNIX) {
462 log_err("fio: bad network type %d\n", o->proto);
467 f->fd = socket(domain, type, 0);
469 td_verror(td, errno, "socket");
473 if (o->proto == FIO_TYPE_UDP)
475 else if (o->proto == FIO_TYPE_TCP) {
476 fio_socklen_t len = sizeof(nd->addr);
478 if (connect(f->fd, (struct sockaddr *) &nd->addr, len) < 0) {
479 td_verror(td, errno, "connect");
484 struct sockaddr_un *addr = &nd->addr_un;
487 len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
489 if (connect(f->fd, (struct sockaddr *) addr, len) < 0) {
490 td_verror(td, errno, "connect");
/*
 * Obtain the data socket for the listening side.
 *
 * UDP has no accept step: f->fd simply becomes nd->listenfd.
 * Otherwise the job is put into TD_SETTING_UP while it waits, via
 * poll_wait(), for the listening socket to become readable and then calls
 * accept(); the peer address lands in nd->addr. The saved run state is
 * restored on both visible exit paths. An accept() failure is reported
 * through td_verror.
 */
499 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
501 struct netio_data *nd = td->io_ops->data;
502 struct netio_options *o = td->eo;
503 fio_socklen_t socklen = sizeof(nd->addr);
506 if (o->proto == FIO_TYPE_UDP) {
507 f->fd = nd->listenfd;
511 state = td->runstate;
512 td_set_runstate(td, TD_SETTING_UP);
514 log_info("fio: waiting for connection\n");
516 if (poll_wait(td, nd->listenfd, POLLIN) < 0)
519 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
521 td_verror(td, errno, "accept");
525 td_set_runstate(td, state);
528 td_set_runstate(td, state);
/*
 * Send the UDP link-close message (magic plus FIO_LINK_CLOSE, both in
 * network byte order) to the address in nd->addr. A failure is reported
 * through td_verror.
 *
 * NOTE(review): MSG_WAITALL is passed to sendto(); it is normally a
 * receive-side flag - confirm it is intended here.
 */
532 static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f)
534 struct netio_data *nd = td->io_ops->data;
535 struct udp_close_msg msg;
536 struct sockaddr *to = (struct sockaddr *) &nd->addr;
539 msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
540 msg.cmd = htonl(FIO_LINK_CLOSE);
542 ret = sendto(f->fd, &msg, sizeof(msg), MSG_WAITALL, to,
545 td_verror(td, errno, "sendto udp link close");
/*
 * close_file hook: for UDP, first tell the peer the link is going away
 * via fio_netio_udp_close(), then hand off to generic_close_file() and
 * return its result.
 */
548 static int fio_netio_close_file(struct thread_data *td, struct fio_file *f)
550 struct netio_options *o = td->eo;
553 * If this is an UDP connection, notify the receiver that we are
554 * closing down the link
556 if (o->proto == FIO_TYPE_UDP)
557 fio_netio_udp_close(td, f);
559 return generic_close_file(td, f);
/*
 * Wait for the UDP link-open message from the peer. recvfrom() also
 * stores the sender address in nd->addr. The message is accepted only if,
 * after ntohl(), its magic is FIO_LINK_OPEN_CLOSE_MAGIC and its cmd is
 * FIO_LINK_OPEN; anything else is logged as a bad udp open magic.
 */
562 static int fio_netio_udp_recv_open(struct thread_data *td, struct fio_file *f)
564 struct netio_data *nd = td->io_ops->data;
565 struct udp_close_msg msg;
566 struct sockaddr *to = (struct sockaddr *) &nd->addr;
567 fio_socklen_t len = sizeof(nd->addr);
570 ret = recvfrom(f->fd, &msg, sizeof(msg), MSG_WAITALL, to, &len);
/* the failing call on this path is recvfrom(), so label the error as such */
572 td_verror(td, errno, "recvfrom udp link open");
576 if (ntohl(msg.magic) != FIO_LINK_OPEN_CLOSE_MAGIC ||
577 ntohl(msg.cmd) != FIO_LINK_OPEN) {
578 log_err("fio: bad udp open magic %x/%x\n", ntohl(msg.magic),
/*
 * Send the UDP link-open message (magic plus FIO_LINK_OPEN, both in
 * network byte order) to the address in nd->addr. A failure is reported
 * through td_verror.
 *
 * NOTE(review): MSG_WAITALL is passed to sendto(); it is normally a
 * receive-side flag - confirm it is intended here.
 */
586 static int fio_netio_udp_send_open(struct thread_data *td, struct fio_file *f)
588 struct netio_data *nd = td->io_ops->data;
589 struct udp_close_msg msg;
590 struct sockaddr *to = (struct sockaddr *) &nd->addr;
593 msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
594 msg.cmd = htonl(FIO_LINK_OPEN);
596 ret = sendto(f->fd, &msg, sizeof(msg), MSG_WAITALL, to,
599 td_verror(td, errno, "sendto udp link open");
/*
 * open_file hook: obtain the socket through fio_netio_accept() or
 * fio_netio_connect(), then, for UDP, run the link-open handshake. One
 * side sends the open message; the other waits for it with the run state
 * temporarily set to TD_SETTING_UP. fio_netio_close_file() is called on a
 * path whose guard is not visible - presumably handshake failure; confirm.
 * The tests that pick accept versus connect and send versus receive are
 * also outside this view.
 */
606 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
609 struct netio_options *o = td->eo;
612 ret = fio_netio_accept(td, f);
614 ret = fio_netio_connect(td, f);
621 if (o->proto == FIO_TYPE_UDP) {
623 ret = fio_netio_udp_send_open(td, f);
627 state = td->runstate;
628 td_set_runstate(td, TD_SETTING_UP);
629 ret = fio_netio_udp_recv_open(td, f);
630 td_set_runstate(td, state);
635 fio_netio_close_file(td, f);
/*
 * Fill nd->addr for an outgoing inet connection. A missing host is
 * reported with two log lines and EINVAL. Otherwise the family is set to
 * AF_INET and the port is stored with htons(). The host is first tried as
 * a dotted address with inet_aton(); if that fails it is resolved with
 * gethostbyname() and the first address is copied in.
 *
 * NOTE(review): gethostbyname() failures are passed to td_verror with
 * errno; the resolver normally reports through h_errno - confirm.
 * NOTE(review): the memcpy length is a literal 4 rather than being tied
 * to the size of sin_addr or to hent->h_length.
 */
640 static int fio_netio_setup_connect_inet(struct thread_data *td,
641 const char *host, unsigned short port)
643 struct netio_data *nd = td->io_ops->data;
646 log_err("fio: connect with no host to connect to.\n");
648 log_err("fio: did you forget to set 'listen'?\n");
650 td_verror(td, EINVAL, "no hostname= set");
654 nd->addr.sin_family = AF_INET;
655 nd->addr.sin_port = htons(port);
657 if (inet_aton(host, &nd->addr.sin_addr) != 1) {
658 struct hostent *hent;
660 hent = gethostbyname(host);
662 td_verror(td, errno, "gethostbyname");
666 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
/*
 * Fill nd->addr_un for an outgoing UNIX domain connection: family AF_UNIX
 * and the given path copied into sun_path.
 *
 * NOTE(review): strcpy() is unbounded while sun_path is a fixed-size
 * array; no length check on path is visible in this view - confirm one
 * exists or add one.
 */
672 static int fio_netio_setup_connect_unix(struct thread_data *td,
675 struct netio_data *nd = td->io_ops->data;
676 struct sockaddr_un *soun = &nd->addr_un;
678 soun->sun_family = AF_UNIX;
679 strcpy(soun->sun_path, path);
/*
 * Prepare the peer address for a connecting job. UDP and TCP use the inet
 * helper with td->o.filename as host and o->port as port; any other
 * protocol uses the UNIX helper with td->o.filename as the socket path.
 */
683 static int fio_netio_setup_connect(struct thread_data *td)
685 struct netio_options *o = td->eo;
687 if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
688 return fio_netio_setup_connect_inet(td, td->o.filename,o->port);
690 return fio_netio_setup_connect_unix(td, td->o.filename);
/*
 * Create an AF_UNIX stream socket and bind it to path. nd->addr_un is
 * zeroed, given family AF_UNIX and the path, and bind() is called with a
 * length of the family field plus the path plus its terminator. socket()
 * and bind() failures are logged with log_err and strerror(errno).
 *
 * NOTE(review): strcpy() into sun_path is unbounded; no length check on
 * path is visible in this view - confirm one exists or add one.
 */
693 static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path)
695 struct netio_data *nd = td->io_ops->data;
696 struct sockaddr_un *addr = &nd->addr_un;
700 fd = socket(AF_UNIX, SOCK_STREAM, 0);
702 log_err("fio: socket: %s\n", strerror(errno));
708 memset(addr, 0, sizeof(*addr));
709 addr->sun_family = AF_UNIX;
710 strcpy(addr->sun_path, path);
713 len = sizeof(addr->sun_family) + strlen(path) + 1;
715 if (bind(fd, (struct sockaddr *) addr, len) < 0) {
716 log_err("fio: bind: %s\n", strerror(errno));
/*
 * Create an AF_INET socket (type chosen from the protocol), enable
 * SO_REUSEADDR and SO_REUSEPORT on it, and bind it to INADDR_ANY on the
 * given port. Each failing step is reported through td_verror.
 *
 * NOTE(review): port is a signed short here while the connect-side helper
 * takes unsigned short; ports above 32767 rely on the conversion done by
 * htons() - confirm this is intended.
 */
726 static int fio_netio_setup_listen_inet(struct thread_data *td, short port)
728 struct netio_data *nd = td->io_ops->data;
729 struct netio_options *o = td->eo;
732 if (o->proto == FIO_TYPE_TCP)
737 fd = socket(AF_INET, type, 0);
739 td_verror(td, errno, "socket");
744 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
745 td_verror(td, errno, "setsockopt");
749 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
750 td_verror(td, errno, "setsockopt");
755 nd->addr.sin_family = AF_INET;
756 nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
757 nd->addr.sin_port = htons(port);
759 if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
760 td_verror(td, errno, "bind");
/*
 * Prepare the listening side. UDP and TCP use the inet helper with
 * o->port; any other protocol uses the UNIX helper with td->o.filename.
 * UDP is singled out before the listen() call - presumably to skip it;
 * confirm. Other protocols call listen() on nd->listenfd with a backlog
 * of 10, and a failure there is reported through td_verror.
 */
768 static int fio_netio_setup_listen(struct thread_data *td)
770 struct netio_data *nd = td->io_ops->data;
771 struct netio_options *o = td->eo;
774 if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
775 ret = fio_netio_setup_listen_inet(td, o->port);
777 ret = fio_netio_setup_listen_unix(td, td->o.filename);
781 if (o->proto == FIO_TYPE_UDP)
784 if (listen(nd->listenfd, 10) < 0) {
785 td_verror(td, errno, "listen");
/*
 * init hook: validate the job and option combination, then prepare either
 * the listening or the connecting side.
 *
 * The visible error messages show what is rejected: random IO, a port
 * given together with a UNIX socket, a missing port for tcp or udp, the
 * listen option on a non-TCP protocol, a UNIX socket without a filename,
 * and a hostname on an inbound non-UNIX job. o->listen is also derived
 * from td_read(td) on a path whose guard is only partly visible here.
 * The WSAStartup call is presumably Windows-only; its preprocessor guard
 * is not in this view.
 */
793 static int fio_netio_init(struct thread_data *td)
795 struct netio_options *o = td->eo;
800 WSAStartup(MAKEWORD(2,2), &wsd);
804 log_err("fio: network IO can't be random\n");
808 if (o->proto == FIO_TYPE_UNIX && o->port) {
809 log_err("fio: network IO port not valid with unix socket\n");
811 } else if (o->proto != FIO_TYPE_UNIX && !o->port) {
812 log_err("fio: network IO requires port for tcp or udp\n");
816 if (o->proto != FIO_TYPE_TCP) {
818 log_err("fio: listen only valid for TCP proto IO\n");
822 log_err("fio: datagram network connections must be"
826 if (o->proto == FIO_TYPE_UNIX && !td->o.filename) {
827 log_err("fio: UNIX sockets need host/filename\n");
830 o->listen = td_read(td);
833 if (o->proto != FIO_TYPE_UNIX && o->listen && td->o.filename) {
834 log_err("fio: hostname not valid for inbound network IO\n");
839 ret = fio_netio_setup_listen(td);
841 ret = fio_netio_setup_connect(td);
/*
 * cleanup hook: releases the engine state. The listening socket and both
 * pipe ends are each tested against -1 (the value that marks an unused
 * descriptor in this file) before being dealt with; the statements under
 * those tests are not in this view - presumably close() calls.
 */
846 static void fio_netio_cleanup(struct thread_data *td)
848 struct netio_data *nd = td->io_ops->data;
851 if (nd->listenfd != -1)
853 if (nd->pipes[0] != -1)
855 if (nd->pipes[1] != -1)
/*
 * setup hook for the plain engine. If the job has no files yet, one is
 * added (named after td->o.filename, or net when that is unset) and
 * nr_files defaults to 1. On first use the engine state is allocated,
 * zeroed, its pipe descriptors are marked unused (-1) and it is attached
 * to td->io_ops->data.
 *
 * NOTE(review): no NULL check on the malloc() result is visible before
 * the memset() - confirm against the full file.
 */
862 static int fio_netio_setup(struct thread_data *td)
864 struct netio_data *nd;
866 if (!td->files_index) {
867 add_file(td, td->o.filename ?: "net");
868 td->o.nr_files = td->o.nr_files ?: 1;
871 if (!td->io_ops->data) {
872 nd = malloc(sizeof(*nd));
874 memset(nd, 0, sizeof(*nd));
876 nd->pipes[0] = nd->pipes[1] = -1;
877 td->io_ops->data = nd;
883 #ifdef FIO_HAVE_SPLICE
/*
 * setup hook for the splice engine: picks up the engine state from
 * td->io_ops->data and creates the internal pipe with pipe(). The steps
 * before the state lookup and the handling of a pipe() failure are not in
 * this view.
 */
884 static int fio_netio_setup_splice(struct thread_data *td)
886 struct netio_data *nd;
890 nd = td->io_ops->data;
892 if (pipe(nd->pipes) < 0)
/*
 * Operations table for the splice-based engine variant. Among the visible
 * members it differs from ioengine_rw only in .setup, which also creates
 * the internal pipe.
 *
 * close_file goes through fio_netio_close_file rather than straight to
 * generic_close_file. __fio_netio_queue falls back to send/recv for UDP
 * on this engine too, and open_file runs the UDP link-open handshake, so
 * the matching link-close message has to be sent on close. For other
 * protocols fio_netio_close_file just forwards to generic_close_file.
 */
902 static struct ioengine_ops ioengine_splice = {
904 .version = FIO_IOOPS_VERSION,
905 .prep = fio_netio_prep,
906 .queue = fio_netio_queue,
907 .setup = fio_netio_setup_splice,
908 .init = fio_netio_init,
909 .cleanup = fio_netio_cleanup,
910 .open_file = fio_netio_open_file,
911 .close_file = fio_netio_close_file,
913 .option_struct_size = sizeof(struct netio_options),
914 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
915 FIO_SIGTERM | FIO_PIPEIO,
/*
 * Operations table for the plain send/recv engine variant. It wires the
 * prep, queue, setup, init, cleanup, open_file and close_file hooks to
 * the functions in this file and declares the size of the per-job option
 * struct. It is registered in fio_netio_register.
 */
919 static struct ioengine_ops ioengine_rw = {
921 .version = FIO_IOOPS_VERSION,
922 .prep = fio_netio_prep,
923 .queue = fio_netio_queue,
924 .setup = fio_netio_setup,
925 .init = fio_netio_init,
926 .cleanup = fio_netio_cleanup,
927 .open_file = fio_netio_open_file,
928 .close_file = fio_netio_close_file,
930 .option_struct_size = sizeof(struct netio_options),
931 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
932 FIO_SIGTERM | FIO_PIPEIO,
/*
 * Option callback for the hostname entry: data is the struct
 * netio_options of the job. Any filename already set on the job is freed
 * and replaced with a strdup() copy of input, so the hostname ends up in
 * td->o.filename. The strdup() result is stored without a visible check.
 */
935 static int str_hostname_cb(void *data, const char *input)
937 struct netio_options *o = data;
939 if (o->td->o.filename)
940 free(o->td->o.filename);
941 o->td->o.filename = strdup(input);
/*
 * Registers the engines with fio (marked fio_init): always the plain
 * engine, and the splice engine too when FIO_HAVE_SPLICE is defined.
 */
945 static void fio_init fio_netio_register(void)
947 register_ioengine(&ioengine_rw);
948 #ifdef FIO_HAVE_SPLICE
949 register_ioengine(&ioengine_splice);
/*
 * Counterpart of fio_netio_register (marked fio_exit): unregisters the
 * plain engine, and the splice engine too when FIO_HAVE_SPLICE is defined.
 */
953 static void fio_exit fio_netio_unregister(void)
955 unregister_ioengine(&ioengine_rw);
956 #ifdef FIO_HAVE_SPLICE
957 unregister_ioengine(&ioengine_splice);