4 * IO engine that reads/writes to/from sockets.
13 #include <netinet/in.h>
14 #include <arpa/inet.h>
17 #include <sys/types.h>
19 #include <sys/socket.h>
28 struct sockaddr_in addr;
29 struct sockaddr_un addr_un;
32 struct netio_options {
33 struct thread_data *td;
37 unsigned int pingpong;
40 struct udp_close_msg {
46 FIO_LINK_CLOSE = 0x89,
47 FIO_LINK_OPEN_CLOSE_MAGIC = 0x6c696e6b,
55 static int str_hostname_cb(void *data, const char *input);
56 static struct fio_option options[] = {
59 .type = FIO_OPT_STR_STORE,
60 .cb = str_hostname_cb,
61 .help = "Hostname for net IO engine",
66 .off1 = offsetof(struct netio_options, port),
69 .help = "Port to use for TCP or UDP net connections",
75 .off1 = offsetof(struct netio_options, proto),
76 .help = "Network protocol to use",
81 .help = "Transmission Control Protocol",
85 .help = "User Datagram Protocol",
88 .oval = FIO_TYPE_UNIX,
89 .help = "UNIX domain socket",
95 .type = FIO_OPT_STR_SET,
96 .off1 = offsetof(struct netio_options, listen),
97 .help = "Listen for incoming TCP connections",
101 .type = FIO_OPT_STR_SET,
102 .off1 = offsetof(struct netio_options, pingpong),
103 .help = "Ping-pong IO requests",
111 * Return -1 for error and 'nr events' for a positive number
114 static int poll_wait(struct thread_data *td, int fd, short events)
119 while (!td->terminate) {
122 ret = poll(&pfd, 1, -1);
127 td_verror(td, errno, "poll");
135 if (pfd.revents & events)
141 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
143 struct netio_options *o = td->eo;
146 * Make sure we don't see spurious reads to a receiver, and vice versa
148 if (o->proto == FIO_TYPE_TCP)
151 if ((o->listen && io_u->ddir == DDIR_WRITE) ||
152 (!o->listen && io_u->ddir == DDIR_READ)) {
153 td_verror(td, EINVAL, "bad direction");
160 #ifdef FIO_HAVE_SPLICE
161 static int splice_io_u(int fdin, int fdout, unsigned int len)
166 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
184 * Receive bytes from a socket and fill them into the internal pipe
186 static int splice_in(struct thread_data *td, struct io_u *io_u)
188 struct netio_data *nd = td->io_ops->data;
190 return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
194 * Transmit 'len' bytes from the internal pipe
196 static int splice_out(struct thread_data *td, struct io_u *io_u,
199 struct netio_data *nd = td->io_ops->data;
201 return splice_io_u(nd->pipes[0], io_u->file->fd, len);
204 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
207 .iov_base = io_u->xfer_buf,
212 while (iov.iov_len) {
213 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
232 * vmsplice() pipe to io_u buffer
234 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
237 struct netio_data *nd = td->io_ops->data;
239 return vmsplice_io_u(io_u, nd->pipes[0], len);
243 * vmsplice() io_u to pipe
245 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
247 struct netio_data *nd = td->io_ops->data;
249 return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
253 * splice receive - transfer socket data into a pipe using splice, then map
254 * that pipe data into the io_u using vmsplice.
256 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
260 ret = splice_in(td, io_u);
262 return vmsplice_io_u_out(td, io_u, ret);
268 * splice transmit - map data from the io_u into a pipe by using vmsplice,
269 * then transfer that pipe to a socket using splice.
271 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
275 ret = vmsplice_io_u_in(td, io_u);
277 return splice_out(td, io_u, ret);
282 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
288 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
295 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
297 struct netio_data *nd = td->io_ops->data;
298 struct netio_options *o = td->eo;
302 if (o->proto == FIO_TYPE_UDP) {
303 struct sockaddr *to = (struct sockaddr *) &nd->addr;
305 ret = sendto(io_u->file->fd, io_u->xfer_buf,
306 io_u->xfer_buflen, flags, to,
310 * if we are going to write more, set MSG_MORE
313 if ((td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
314 td->o.size) && !o->pingpong)
317 ret = send(io_u->file->fd, io_u->xfer_buf,
318 io_u->xfer_buflen, flags);
323 ret = poll_wait(td, io_u->file->fd, POLLOUT);
331 static int is_udp_close(struct io_u *io_u, int len)
333 struct udp_close_msg *msg;
335 if (len != sizeof(struct udp_close_msg))
338 msg = io_u->xfer_buf;
339 if (ntohl(msg->magic) != FIO_LINK_OPEN_CLOSE_MAGIC)
341 if (ntohl(msg->cmd) != FIO_LINK_CLOSE)
347 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
349 struct netio_data *nd = td->io_ops->data;
350 struct netio_options *o = td->eo;
354 if (o->proto == FIO_TYPE_UDP) {
355 fio_socklen_t len = sizeof(nd->addr);
356 struct sockaddr *from = (struct sockaddr *) &nd->addr;
358 ret = recvfrom(io_u->file->fd, io_u->xfer_buf,
359 io_u->xfer_buflen, flags, from, &len);
360 if (is_udp_close(io_u, ret)) {
365 ret = recv(io_u->file->fd, io_u->xfer_buf,
366 io_u->xfer_buflen, flags);
370 else if (!ret && (flags & MSG_WAITALL))
373 ret = poll_wait(td, io_u->file->fd, POLLIN);
376 flags |= MSG_WAITALL;
382 static int __fio_netio_queue(struct thread_data *td, struct io_u *io_u,
385 struct netio_data *nd = td->io_ops->data;
386 struct netio_options *o = td->eo;
389 if (ddir == DDIR_WRITE) {
390 if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
391 o->proto == FIO_TYPE_UNIX)
392 ret = fio_netio_send(td, io_u);
394 ret = fio_netio_splice_out(td, io_u);
395 } else if (ddir == DDIR_READ) {
396 if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
397 o->proto == FIO_TYPE_UNIX)
398 ret = fio_netio_recv(td, io_u);
400 ret = fio_netio_splice_in(td, io_u);
402 ret = 0; /* must be a SYNC */
404 if (ret != (int) io_u->xfer_buflen) {
406 io_u->resid = io_u->xfer_buflen - ret;
408 return FIO_Q_COMPLETED;
412 if (ddir == DDIR_WRITE && err == EMSGSIZE)
420 td_verror(td, io_u->error, "xfer");
422 return FIO_Q_COMPLETED;
425 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
427 struct netio_options *o = td->eo;
430 fio_ro_check(td, io_u);
432 ret = __fio_netio_queue(td, io_u, io_u->ddir);
433 if (!o->pingpong || ret != FIO_Q_COMPLETED)
437 * For ping-pong mode, receive or send reply as needed
439 if (td_read(td) && io_u->ddir == DDIR_READ)
440 ret = __fio_netio_queue(td, io_u, DDIR_WRITE);
441 else if (td_write(td) && io_u->ddir == DDIR_WRITE)
442 ret = __fio_netio_queue(td, io_u, DDIR_READ);
447 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
449 struct netio_data *nd = td->io_ops->data;
450 struct netio_options *o = td->eo;
453 if (o->proto == FIO_TYPE_TCP) {
456 } else if (o->proto == FIO_TYPE_UDP) {
459 } else if (o->proto == FIO_TYPE_UNIX) {
463 log_err("fio: bad network type %d\n", o->proto);
468 f->fd = socket(domain, type, 0);
470 td_verror(td, errno, "socket");
474 if (o->proto == FIO_TYPE_UDP)
476 else if (o->proto == FIO_TYPE_TCP) {
477 fio_socklen_t len = sizeof(nd->addr);
479 if (connect(f->fd, (struct sockaddr *) &nd->addr, len) < 0) {
480 td_verror(td, errno, "connect");
485 struct sockaddr_un *addr = &nd->addr_un;
488 len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
490 if (connect(f->fd, (struct sockaddr *) addr, len) < 0) {
491 td_verror(td, errno, "connect");
500 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
502 struct netio_data *nd = td->io_ops->data;
503 struct netio_options *o = td->eo;
504 fio_socklen_t socklen = sizeof(nd->addr);
507 if (o->proto == FIO_TYPE_UDP) {
508 f->fd = nd->listenfd;
512 state = td->runstate;
513 td_set_runstate(td, TD_SETTING_UP);
515 log_info("fio: waiting for connection\n");
517 if (poll_wait(td, nd->listenfd, POLLIN) < 0)
520 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
522 td_verror(td, errno, "accept");
527 td_set_runstate(td, state);
530 td_set_runstate(td, state);
534 static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f)
536 struct netio_data *nd = td->io_ops->data;
537 struct udp_close_msg msg;
538 struct sockaddr *to = (struct sockaddr *) &nd->addr;
541 msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
542 msg.cmd = htonl(FIO_LINK_CLOSE);
544 ret = sendto(f->fd, &msg, sizeof(msg), MSG_WAITALL, to,
547 td_verror(td, errno, "sendto udp link close");
550 static int fio_netio_close_file(struct thread_data *td, struct fio_file *f)
552 struct netio_options *o = td->eo;
555 * If this is an UDP connection, notify the receiver that we are
556 * closing down the link
558 if (o->proto == FIO_TYPE_UDP)
559 fio_netio_udp_close(td, f);
561 return generic_close_file(td, f);
564 static int fio_netio_udp_recv_open(struct thread_data *td, struct fio_file *f)
566 struct netio_data *nd = td->io_ops->data;
567 struct udp_close_msg msg;
568 struct sockaddr *to = (struct sockaddr *) &nd->addr;
569 fio_socklen_t len = sizeof(nd->addr);
572 ret = recvfrom(f->fd, &msg, sizeof(msg), MSG_WAITALL, to, &len);
574 td_verror(td, errno, "sendto udp link open");
578 if (ntohl(msg.magic) != FIO_LINK_OPEN_CLOSE_MAGIC ||
579 ntohl(msg.cmd) != FIO_LINK_OPEN) {
580 log_err("fio: bad udp open magic %x/%x\n", ntohl(msg.magic),
588 static int fio_netio_udp_send_open(struct thread_data *td, struct fio_file *f)
590 struct netio_data *nd = td->io_ops->data;
591 struct udp_close_msg msg;
592 struct sockaddr *to = (struct sockaddr *) &nd->addr;
595 msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
596 msg.cmd = htonl(FIO_LINK_OPEN);
598 ret = sendto(f->fd, &msg, sizeof(msg), MSG_WAITALL, to,
601 td_verror(td, errno, "sendto udp link open");
608 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
611 struct netio_options *o = td->eo;
614 ret = fio_netio_accept(td, f);
616 ret = fio_netio_connect(td, f);
623 if (o->proto == FIO_TYPE_UDP) {
625 ret = fio_netio_udp_send_open(td, f);
629 state = td->runstate;
630 td_set_runstate(td, TD_SETTING_UP);
631 ret = fio_netio_udp_recv_open(td, f);
632 td_set_runstate(td, state);
637 fio_netio_close_file(td, f);
642 static int fio_netio_setup_connect_inet(struct thread_data *td,
643 const char *host, unsigned short port)
645 struct netio_data *nd = td->io_ops->data;
648 log_err("fio: connect with no host to connect to.\n");
650 log_err("fio: did you forget to set 'listen'?\n");
652 td_verror(td, EINVAL, "no hostname= set");
656 nd->addr.sin_family = AF_INET;
657 nd->addr.sin_port = htons(port);
659 if (inet_aton(host, &nd->addr.sin_addr) != 1) {
660 struct hostent *hent;
662 hent = gethostbyname(host);
664 td_verror(td, errno, "gethostbyname");
668 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
674 static int fio_netio_setup_connect_unix(struct thread_data *td,
677 struct netio_data *nd = td->io_ops->data;
678 struct sockaddr_un *soun = &nd->addr_un;
680 soun->sun_family = AF_UNIX;
681 strcpy(soun->sun_path, path);
685 static int fio_netio_setup_connect(struct thread_data *td)
687 struct netio_options *o = td->eo;
689 if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
690 return fio_netio_setup_connect_inet(td, td->o.filename,o->port);
692 return fio_netio_setup_connect_unix(td, td->o.filename);
695 static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path)
697 struct netio_data *nd = td->io_ops->data;
698 struct sockaddr_un *addr = &nd->addr_un;
702 fd = socket(AF_UNIX, SOCK_STREAM, 0);
704 log_err("fio: socket: %s\n", strerror(errno));
710 memset(addr, 0, sizeof(*addr));
711 addr->sun_family = AF_UNIX;
712 strcpy(addr->sun_path, path);
715 len = sizeof(addr->sun_family) + strlen(path) + 1;
717 if (bind(fd, (struct sockaddr *) addr, len) < 0) {
718 log_err("fio: bind: %s\n", strerror(errno));
728 static int fio_netio_setup_listen_inet(struct thread_data *td, short port)
730 struct netio_data *nd = td->io_ops->data;
731 struct netio_options *o = td->eo;
734 if (o->proto == FIO_TYPE_TCP)
739 fd = socket(AF_INET, type, 0);
741 td_verror(td, errno, "socket");
746 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
747 td_verror(td, errno, "setsockopt");
751 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
752 td_verror(td, errno, "setsockopt");
757 nd->addr.sin_family = AF_INET;
758 nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
759 nd->addr.sin_port = htons(port);
761 if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
762 td_verror(td, errno, "bind");
770 static int fio_netio_setup_listen(struct thread_data *td)
772 struct netio_data *nd = td->io_ops->data;
773 struct netio_options *o = td->eo;
776 if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
777 ret = fio_netio_setup_listen_inet(td, o->port);
779 ret = fio_netio_setup_listen_unix(td, td->o.filename);
783 if (o->proto == FIO_TYPE_UDP)
786 if (listen(nd->listenfd, 10) < 0) {
787 td_verror(td, errno, "listen");
795 static int fio_netio_init(struct thread_data *td)
797 struct netio_options *o = td->eo;
802 WSAStartup(MAKEWORD(2,2), &wsd);
806 log_err("fio: network IO can't be random\n");
810 if (o->proto == FIO_TYPE_UNIX && o->port) {
811 log_err("fio: network IO port not valid with unix socket\n");
813 } else if (o->proto != FIO_TYPE_UNIX && !o->port) {
814 log_err("fio: network IO requires port for tcp or udp\n");
818 if (o->proto != FIO_TYPE_TCP) {
820 log_err("fio: listen only valid for TCP proto IO\n");
824 log_err("fio: datagram network connections must be"
828 if (o->proto == FIO_TYPE_UNIX && !td->o.filename) {
829 log_err("fio: UNIX sockets need host/filename\n");
832 o->listen = td_read(td);
835 if (o->proto != FIO_TYPE_UNIX && o->listen && td->o.filename) {
836 log_err("fio: hostname not valid for inbound network IO\n");
841 ret = fio_netio_setup_listen(td);
843 ret = fio_netio_setup_connect(td);
848 static void fio_netio_cleanup(struct thread_data *td)
850 struct netio_data *nd = td->io_ops->data;
853 if (nd->listenfd != -1)
855 if (nd->pipes[0] != -1)
857 if (nd->pipes[1] != -1)
864 static int fio_netio_setup(struct thread_data *td)
866 struct netio_data *nd;
868 if (!td->files_index) {
869 add_file(td, td->o.filename ?: "net");
870 td->o.nr_files = td->o.nr_files ?: 1;
873 if (!td->io_ops->data) {
874 nd = malloc(sizeof(*nd));;
876 memset(nd, 0, sizeof(*nd));
878 nd->pipes[0] = nd->pipes[1] = -1;
879 td->io_ops->data = nd;
885 static void fio_netio_terminate(struct thread_data *td)
887 kill(td->pid, SIGUSR2);
890 #ifdef FIO_HAVE_SPLICE
891 static int fio_netio_setup_splice(struct thread_data *td)
893 struct netio_data *nd;
897 nd = td->io_ops->data;
899 if (pipe(nd->pipes) < 0)
909 static struct ioengine_ops ioengine_splice = {
911 .version = FIO_IOOPS_VERSION,
912 .prep = fio_netio_prep,
913 .queue = fio_netio_queue,
914 .setup = fio_netio_setup_splice,
915 .init = fio_netio_init,
916 .cleanup = fio_netio_cleanup,
917 .open_file = fio_netio_open_file,
918 .close_file = fio_netio_close_file,
919 .terminate = fio_netio_terminate,
921 .option_struct_size = sizeof(struct netio_options),
922 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
927 static struct ioengine_ops ioengine_rw = {
929 .version = FIO_IOOPS_VERSION,
930 .prep = fio_netio_prep,
931 .queue = fio_netio_queue,
932 .setup = fio_netio_setup,
933 .init = fio_netio_init,
934 .cleanup = fio_netio_cleanup,
935 .open_file = fio_netio_open_file,
936 .close_file = fio_netio_close_file,
937 .terminate = fio_netio_terminate,
939 .option_struct_size = sizeof(struct netio_options),
940 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
944 static int str_hostname_cb(void *data, const char *input)
946 struct netio_options *o = data;
948 if (o->td->o.filename)
949 free(o->td->o.filename);
950 o->td->o.filename = strdup(input);
954 static void fio_init fio_netio_register(void)
956 register_ioengine(&ioengine_rw);
957 #ifdef FIO_HAVE_SPLICE
958 register_ioengine(&ioengine_splice);
962 static void fio_exit fio_netio_unregister(void)
964 unregister_ioengine(&ioengine_rw);
965 #ifdef FIO_HAVE_SPLICE
966 unregister_ioengine(&ioengine_splice);