4 * IO engine that reads/writes to/from sockets.
12 #include <netinet/in.h>
13 #include <arpa/inet.h>
16 #include <sys/types.h>
18 #include <sys/socket.h>
/*
 * Peer addresses kept in the engine's private data: an Internet (AF_INET)
 * socket address and a UNIX domain socket address. Code in this file
 * reaches them as nd->addr and nd->addr_un through a struct netio_data
 * pointer taken from td->io_ops->data.
 */
27 struct sockaddr_in addr;
28 struct sockaddr_un addr_un;
/*
 * Option storage for this engine, reached elsewhere in this file as td->eo.
 * The option table takes offsets of the port, proto and listen members;
 * the td back-pointer is what str_hostname_cb() uses to reach the owning
 * thread's o.filename.
 */
31 struct netio_options {
32 struct thread_data *td;
/*
 * Control message exchanged on UDP links. Despite the name it is used for
 * both the "open" and the "close" notification in this file. Its magic and
 * cmd members are written with htonl() by the senders and read back with
 * ntohl() by the receivers.
 */
38 struct udp_close_msg {
/*
 * Values carried in struct udp_close_msg: the close command code and the
 * magic shared by open/close messages. The magic's bytes 0x6c 0x69 0x6e 0x6b
 * spell "link" in ASCII.
 */
44 FIO_LINK_CLOSE = 0x89,
45 FIO_LINK_OPEN_CLOSE_MAGIC = 0x6c696e6b,
/*
 * Forward declaration: the option table refers to this callback before its
 * definition further down in the file.
 */
53 static int str_hostname_cb(void *data, const char *input);
/*
 * Engine-specific option table. The visible entries are:
 *  - a FIO_OPT_STR_STORE option handled by str_hostname_cb (hostname),
 *  - an option stored in netio_options.port (TCP/UDP port),
 *  - a protocol selector stored in netio_options.proto whose visible
 *    choices are described as TCP, UDP and UNIX domain socket,
 *  - a FIO_OPT_STR_SET option stored in netio_options.listen.
 * NOTE(review): option names, remaining fields and the terminator entry are
 * on lines not visible in this view.
 */
54 static struct fio_option options[] = {
57 .type = FIO_OPT_STR_STORE,
58 .cb = str_hostname_cb,
59 .help = "Hostname for net IO engine",
64 .off1 = offsetof(struct netio_options, port),
67 .help = "Port to use for TCP or UDP net connections",
73 .off1 = offsetof(struct netio_options, proto),
74 .help = "Network protocol to use",
79 .help = "Transmission Control Protocol",
83 .help = "User Datagram Protocol",
86 .oval = FIO_TYPE_UNIX,
87 .help = "UNIX domain socket",
93 .type = FIO_OPT_STR_SET,
94 .off1 = offsetof(struct netio_options, listen),
95 .help = "Listen for incoming TCP connections",
103 * Return -1 for error and 'nr events' for a positive number
/*
 * Wait for any of 'events' to become ready on fd.
 *
 * Loops for as long as td->terminate is not set, blocking in poll() on a
 * single descriptor with an infinite timeout (-1). A failure is recorded on
 * the thread with td_verror(..., "poll"), and the loop tests pfd.revents
 * against the requested event mask.
 * NOTE(review): the setup of pfd, the conditions guarding the error path
 * and the actual return statements are on lines not visible here.
 */
106 static int poll_wait(struct thread_data *td, int fd, short events)
111 while (!td->terminate) {
114 ret = poll(&pfd, 1, -1);
119 td_verror(td, errno, "poll");
127 if (pfd.revents & events)
/*
 * Per-io_u preparation hook (installed as .prep in both engine tables).
 *
 * TCP is tested first. The direction check that follows flags EINVAL
 * ("bad direction") when a listening job is handed a write or a
 * non-listening job is handed a read.
 * NOTE(review): what the TCP branch does and the return values are on lines
 * not visible here.
 */
133 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
135 struct netio_options *o = td->eo;
138 * Make sure we don't see spurious reads to a receiver, and vice versa
140 if (o->proto == FIO_TYPE_TCP)
143 if ((o->listen && io_u->ddir == DDIR_WRITE) ||
144 (!o->listen && io_u->ddir == DDIR_READ)) {
145 td_verror(td, EINVAL, "bad direction");
152 #ifdef FIO_HAVE_SPLICE
/*
 * Move data from fdin to fdout with splice(), requesting len bytes, passing
 * NULL for both offsets and 0 for the flags.
 * NOTE(review): any surrounding loop, the error handling and the return
 * value are on lines not visible here.
 */
153 static int splice_io_u(int fdin, int fdout, unsigned int len)
158 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
176 * Receive bytes from a socket and fill them into the internal pipe
/*
 * Splice io_u->xfer_buflen bytes from the io_u's file descriptor into
 * nd->pipes[1] (the write end of the engine's pipe pair) and return the
 * result of splice_io_u().
 */
178 static int splice_in(struct thread_data *td, struct io_u *io_u)
180 struct netio_data *nd = td->io_ops->data;
182 return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
186 * Transmit 'len' bytes from the internal pipe
/*
 * Splice len bytes from nd->pipes[0] (the read end of the engine's pipe
 * pair) to the io_u's file descriptor and return the result of
 * splice_io_u().
 */
188 static int splice_out(struct thread_data *td, struct io_u *io_u,
191 struct netio_data *nd = td->io_ops->data;
193 return splice_io_u(nd->pipes[0], io_u->file->fd, len);
/*
 * vmsplice() the io_u transfer buffer (io_u->xfer_buf) against the pipe
 * descriptor fd, one iovec at a time with SPLICE_F_MOVE, repeating until
 * the iovec's remaining length reaches zero.
 * NOTE(review): the iov_len initialiser, the advance of the iovec after a
 * partial transfer and the error/return handling are not visible here.
 */
196 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
199 .iov_base = io_u->xfer_buf,
204 while (iov.iov_len) {
205 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
224 * vmsplice() pipe to io_u buffer
/*
 * Run vmsplice_io_u() for len bytes against nd->pipes[0], the read end of
 * the engine's pipe pair, and return its result.
 */
226 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
229 struct netio_data *nd = td->io_ops->data;
231 return vmsplice_io_u(io_u, nd->pipes[0], len);
235 * vmsplice() io_u to pipe
/*
 * Run vmsplice_io_u() for the whole transfer (io_u->xfer_buflen bytes)
 * against nd->pipes[1], the write end of the engine's pipe pair, and return
 * its result.
 */
237 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
239 struct netio_data *nd = td->io_ops->data;
241 return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
245 * splice receive - transfer socket data into a pipe using splice, then map
246 * that pipe data into the io_u using vmsplice.
/*
 * Two-stage receive: splice_in() first, then vmsplice_io_u_out() with the
 * byte count splice_in() returned.
 * NOTE(review): the condition deciding whether the second stage runs, and
 * the alternative return, are on lines not visible here.
 */
248 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
252 ret = splice_in(td, io_u);
254 return vmsplice_io_u_out(td, io_u, ret);
260 * splice transmit - map data from the io_u into a pipe by using vmsplice,
261 * then transfer that pipe to a socket using splice.
/*
 * Two-stage transmit: vmsplice_io_u_in() first, then splice_out() with the
 * byte count vmsplice_io_u_in() returned.
 * NOTE(review): the condition deciding whether the second stage runs, and
 * the alternative return, are on lines not visible here.
 */
263 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
267 ret = vmsplice_io_u_in(td, io_u);
269 return splice_out(td, io_u, ret);
/*
 * Second definition of fio_netio_splice_in() in this file.
 * NOTE(review): presumably the fallback compiled when FIO_HAVE_SPLICE is
 * not defined; neither the guarding directive nor the body is visible here.
 */
274 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
/*
 * Second definition of fio_netio_splice_out() in this file.
 * NOTE(review): presumably the fallback compiled when FIO_HAVE_SPLICE is
 * not defined; neither the guarding directive nor the body is visible here.
 */
280 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
/*
 * Transmit one io_u over the socket.
 *
 * For UDP the buffer is sent with sendto() to the address kept in nd->addr;
 * the other visible path uses send() on the file's descriptor. Flags start
 * out as OS_MSG_DONTWAIT. The visible lines also wait for POLLOUT through
 * poll_wait() and then strip OS_MSG_DONTWAIT from the flags.
 * NOTE(review): the retry loop, the MSG_MORE decision that compares bytes
 * written so far against a limit, and the return value are on lines not
 * visible here.
 */
287 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
289 struct netio_data *nd = td->io_ops->data;
290 struct netio_options *o = td->eo;
291 int ret, flags = OS_MSG_DONTWAIT;
294 if (o->proto == FIO_TYPE_UDP) {
295 struct sockaddr *to = (struct sockaddr *) &nd->addr;
297 ret = sendto(io_u->file->fd, io_u->xfer_buf,
298 io_u->xfer_buflen, flags, to,
302 * if we are going to write more, set MSG_MORE
305 if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
309 ret = send(io_u->file->fd, io_u->xfer_buf,
310 io_u->xfer_buflen, flags);
315 ret = poll_wait(td, io_u->file->fd, POLLOUT);
319 flags &= ~OS_MSG_DONTWAIT;
/*
 * Test whether the len bytes just received into io_u->xfer_buf form the UDP
 * close message: the length must equal sizeof(struct udp_close_msg), and
 * the magic and cmd fields (converted with ntohl()) must match
 * FIO_LINK_OPEN_CLOSE_MAGIC and FIO_LINK_CLOSE.
 * NOTE(review): the values returned for match / no match are on lines not
 * visible here.
 */
325 static int is_udp_close(struct io_u *io_u, int len)
327 struct udp_close_msg *msg;
329 if (len != sizeof(struct udp_close_msg))
332 msg = io_u->xfer_buf;
333 if (ntohl(msg->magic) != FIO_LINK_OPEN_CLOSE_MAGIC)
335 if (ntohl(msg->cmd) != FIO_LINK_CLOSE)
/*
 * Receive one io_u from the socket.
 *
 * For UDP the data is read with recvfrom(), which also stores the sender's
 * address into nd->addr, and the result is passed to is_udp_close(). The
 * other visible path uses recv(). Flags start out as OS_MSG_DONTWAIT; the
 * visible lines wait for POLLIN through poll_wait() and then switch the
 * flags from OS_MSG_DONTWAIT to MSG_WAITALL. A zero-byte result while
 * MSG_WAITALL is set is special-cased.
 * NOTE(review): the loop structure, what happens on a close message or a
 * zero-byte read, and the return value are on lines not visible here.
 */
341 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
343 struct netio_data *nd = td->io_ops->data;
344 struct netio_options *o = td->eo;
345 int ret, flags = OS_MSG_DONTWAIT;
348 if (o->proto == FIO_TYPE_UDP) {
349 fio_socklen_t len = sizeof(nd->addr);
350 struct sockaddr *from = (struct sockaddr *) &nd->addr;
352 ret = recvfrom(io_u->file->fd, io_u->xfer_buf,
353 io_u->xfer_buflen, flags, from, &len);
354 if (is_udp_close(io_u, ret)) {
359 ret = recv(io_u->file->fd, io_u->xfer_buf,
360 io_u->xfer_buflen, flags);
364 else if (!ret && (flags & MSG_WAITALL))
367 ret = poll_wait(td, io_u->file->fd, POLLIN);
370 flags &= ~OS_MSG_DONTWAIT;
371 flags |= MSG_WAITALL;
/*
 * Queue hook (installed as .queue in both engine tables); returns
 * FIO_Q_COMPLETED on the visible return paths.
 *
 * Writes and reads go through fio_netio_send()/fio_netio_recv() when splice
 * is not in use or the protocol is UDP or UNIX; otherwise they go through
 * fio_netio_splice_out()/fio_netio_splice_in(). Any other direction is
 * treated as a sync and yields ret = 0. When the result differs from the
 * requested length, the shortfall is stored in io_u->resid; a write failing
 * with EMSGSIZE is singled out, and errors are reported with
 * td_verror(..., "xfer").
 * NOTE(review): the exact conditions separating short transfers from
 * errors, and how io_u->error is set, are on lines not visible here.
 */
377 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
379 struct netio_data *nd = td->io_ops->data;
380 struct netio_options *o = td->eo;
383 fio_ro_check(td, io_u);
385 if (io_u->ddir == DDIR_WRITE) {
386 if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
387 o->proto == FIO_TYPE_UNIX)
388 ret = fio_netio_send(td, io_u);
390 ret = fio_netio_splice_out(td, io_u);
391 } else if (io_u->ddir == DDIR_READ) {
392 if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
393 o->proto == FIO_TYPE_UNIX)
394 ret = fio_netio_recv(td, io_u);
396 ret = fio_netio_splice_in(td, io_u);
398 ret = 0; /* must be a SYNC */
400 if (ret != (int) io_u->xfer_buflen) {
402 io_u->resid = io_u->xfer_buflen - ret;
404 return FIO_Q_COMPLETED;
408 if (io_u->ddir == DDIR_WRITE && err == EMSGSIZE)
416 td_verror(td, io_u->error, "xfer");
418 return FIO_Q_COMPLETED;
/*
 * Client-side open: create the socket for f and connect it.
 *
 * The protocol (TCP, UDP or UNIX) selects the socket domain and type; any
 * other value is logged as a bad network type. After socket() succeeds, TCP
 * connects to nd->addr and the remaining (UNIX) path connects to
 * nd->addr_un using a length of sizeof(sun_family) + strlen(sun_path) + 1.
 * No connect() call is visible for UDP.
 * NOTE(review): the domain/type assignments, the UDP branch body, fd
 * cleanup on connect failure and the return values are not visible here.
 */
421 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
423 struct netio_data *nd = td->io_ops->data;
424 struct netio_options *o = td->eo;
427 if (o->proto == FIO_TYPE_TCP) {
430 } else if (o->proto == FIO_TYPE_UDP) {
433 } else if (o->proto == FIO_TYPE_UNIX) {
437 log_err("fio: bad network type %d\n", o->proto);
442 f->fd = socket(domain, type, 0);
444 td_verror(td, errno, "socket");
448 if (o->proto == FIO_TYPE_UDP)
450 else if (o->proto == FIO_TYPE_TCP) {
451 fio_socklen_t len = sizeof(nd->addr);
453 if (connect(f->fd, (struct sockaddr *) &nd->addr, len) < 0) {
454 td_verror(td, errno, "connect");
459 struct sockaddr_un *addr = &nd->addr_un;
462 len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
464 if (connect(f->fd, (struct sockaddr *) addr, len) < 0) {
465 td_verror(td, errno, "connect");
/*
 * Server-side open: obtain the descriptor for f from the listening socket.
 *
 * For UDP the listening descriptor itself becomes f->fd. Otherwise the
 * thread's run state is saved and switched to TD_SETTING_UP, a "waiting for
 * connection" message is logged, poll_wait() blocks for POLLIN on the
 * listening socket, and accept() stores the peer address in nd->addr. The
 * saved run state is restored on the two visible exit paths.
 * NOTE(review): the return values and the jump targets of the error paths
 * are on lines not visible here.
 */
474 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
476 struct netio_data *nd = td->io_ops->data;
477 struct netio_options *o = td->eo;
478 fio_socklen_t socklen = sizeof(nd->addr);
481 if (o->proto == FIO_TYPE_UDP) {
482 f->fd = nd->listenfd;
486 state = td->runstate;
487 td_set_runstate(td, TD_SETTING_UP);
489 log_info("fio: waiting for connection\n");
491 if (poll_wait(td, nd->listenfd, POLLIN) < 0)
494 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
496 td_verror(td, errno, "accept");
500 td_set_runstate(td, state);
503 td_set_runstate(td, state);
/*
 * Send the UDP "close" control message (magic + FIO_LINK_CLOSE, both in
 * network byte order) to the peer address in nd->addr; a failure is
 * recorded with td_verror().
 * NOTE(review): MSG_WAITALL is passed to sendto(); POSIX documents that
 * flag for the receive calls only — confirm it is intended here.
 */
507 static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f)
509 struct netio_data *nd = td->io_ops->data;
510 struct udp_close_msg msg;
511 struct sockaddr *to = (struct sockaddr *) &nd->addr;
514 msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
515 msg.cmd = htonl(FIO_LINK_CLOSE);
517 ret = sendto(f->fd, &msg, sizeof(msg), MSG_WAITALL, to,
520 td_verror(td, errno, "sendto udp link close");
/*
 * Close hook: for UDP, first tell the receiver the link is going away via
 * fio_netio_udp_close(), then hand off to generic_close_file() and return
 * its result.
 */
523 static int fio_netio_close_file(struct thread_data *td, struct fio_file *f)
525 struct netio_options *o = td->eo;
528 * If this is an UDP connection, notify the receiver that we are
529 * closing down the link
531 if (o->proto == FIO_TYPE_UDP)
532 fio_netio_udp_close(td, f);
534 return generic_close_file(td, f);
/*
 * Receiver side of the UDP open handshake: block in recvfrom() with
 * MSG_WAITALL for one struct udp_close_msg, recording the sender's address
 * in nd->addr, then verify that the magic and command (after ntohl()) are
 * FIO_LINK_OPEN_CLOSE_MAGIC and FIO_LINK_OPEN. A mismatch is logged.
 * NOTE(review): the td_verror() label reads "sendto udp link open" although
 * the failing call is recvfrom() — looks like a copy/paste slip.
 * NOTE(review): the return values are on lines not visible here.
 */
537 static int fio_netio_udp_recv_open(struct thread_data *td, struct fio_file *f)
539 struct netio_data *nd = td->io_ops->data;
540 struct udp_close_msg msg;
541 struct sockaddr *to = (struct sockaddr *) &nd->addr;
542 fio_socklen_t len = sizeof(nd->addr);
545 ret = recvfrom(f->fd, &msg, sizeof(msg), MSG_WAITALL, to, &len);
547 td_verror(td, errno, "sendto udp link open");
551 if (ntohl(msg.magic) != FIO_LINK_OPEN_CLOSE_MAGIC ||
552 ntohl(msg.cmd) != FIO_LINK_OPEN) {
553 log_err("fio: bad udp open magic %x/%x\n", ntohl(msg.magic),
/*
 * Sender side of the UDP open handshake: send one struct udp_close_msg
 * carrying the magic and FIO_LINK_OPEN (both in network byte order) to the
 * peer address in nd->addr; a failure is recorded with td_verror().
 * NOTE(review): MSG_WAITALL is passed to sendto(); POSIX documents that
 * flag for the receive calls only — confirm it is intended here.
 * NOTE(review): the return values are on lines not visible here.
 */
561 static int fio_netio_udp_send_open(struct thread_data *td, struct fio_file *f)
563 struct netio_data *nd = td->io_ops->data;
564 struct udp_close_msg msg;
565 struct sockaddr *to = (struct sockaddr *) &nd->addr;
568 msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
569 msg.cmd = htonl(FIO_LINK_OPEN);
571 ret = sendto(f->fd, &msg, sizeof(msg), MSG_WAITALL, to,
574 td_verror(td, errno, "sendto udp link open");
/*
 * Open hook (installed as .open_file in both engine tables).
 *
 * Establishes the descriptor through fio_netio_accept() or
 * fio_netio_connect(). For UDP it then runs the open handshake: one side
 * calls fio_netio_udp_send_open(), the other calls
 * fio_netio_udp_recv_open() with the run state temporarily set to
 * TD_SETTING_UP. fio_netio_close_file() is called on one visible path.
 * NOTE(review): the conditions choosing accept vs. connect and send vs.
 * recv, and the one guarding the close call, are not visible here.
 */
581 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
584 struct netio_options *o = td->eo;
587 ret = fio_netio_accept(td, f);
589 ret = fio_netio_connect(td, f);
596 if (o->proto == FIO_TYPE_UDP) {
598 ret = fio_netio_udp_send_open(td, f);
602 state = td->runstate;
603 td_set_runstate(td, TD_SETTING_UP);
604 ret = fio_netio_udp_recv_open(td, f);
605 td_set_runstate(td, state);
610 fio_netio_close_file(td, f);
/*
 * Fill nd->addr for an outbound TCP/UDP connection to host:port.
 *
 * A missing host is reported (with a hint about 'listen') and flagged as
 * EINVAL. Otherwise the address family is set to AF_INET and the port is
 * stored in network byte order. The host is first parsed as a dotted
 * address with inet_aton(); if that fails it is resolved with
 * gethostbyname() and the first address is copied into sin_addr.
 * NOTE(review): gethostbyname() reports failures through h_errno rather
 * than errno, so the td_verror(errno, "gethostbyname") call may carry a
 * stale error code — confirm.
 * NOTE(review): the memcpy length is a literal 4 (the size of an IPv4
 * address) instead of hent->h_length or sizeof(sin_addr).
 */
615 static int fio_netio_setup_connect_inet(struct thread_data *td,
616 const char *host, unsigned short port)
618 struct netio_data *nd = td->io_ops->data;
621 log_err("fio: connect with no host to connect to.\n");
623 log_err("fio: did you forget to set 'listen'?\n");
625 td_verror(td, EINVAL, "no hostname= set");
629 nd->addr.sin_family = AF_INET;
630 nd->addr.sin_port = htons(port);
632 if (inet_aton(host, &nd->addr.sin_addr) != 1) {
633 struct hostent *hent;
635 hent = gethostbyname(host);
637 td_verror(td, errno, "gethostbyname");
641 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
/*
 * Fill nd->addr_un for an outbound UNIX domain socket connection: set the
 * family to AF_UNIX and copy the path into sun_path.
 * NOTE(review): strcpy() is unbounded; no length check against
 * sizeof(sun_path) is visible in this view, so an over-long path could
 * overflow the buffer — confirm.
 */
647 static int fio_netio_setup_connect_unix(struct thread_data *td,
650 struct netio_data *nd = td->io_ops->data;
651 struct sockaddr_un *soun = &nd->addr_un;
653 soun->sun_family = AF_UNIX;
654 strcpy(soun->sun_path, path);
/*
 * Prepare the peer address for a connecting job: UDP and TCP use the inet
 * helper with td->o.filename as the host and o->port as the port; anything
 * else uses the UNIX helper with td->o.filename as the socket path.
 */
658 static int fio_netio_setup_connect(struct thread_data *td)
660 struct netio_options *o = td->eo;
662 if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
663 return fio_netio_setup_connect_inet(td, td->o.filename,o->port);
665 return fio_netio_setup_connect_unix(td, td->o.filename);
/*
 * Create an AF_UNIX stream socket and bind it to path. nd->addr_un is
 * zeroed, given the AF_UNIX family and the path, and bound with a length of
 * sizeof(sun_family) + strlen(path) + 1. socket() and bind() failures are
 * logged with strerror(errno).
 * NOTE(review): strcpy() is unbounded; no length check against
 * sizeof(sun_path) is visible in this view — confirm.
 * NOTE(review): where the new descriptor is stored, fd cleanup on bind
 * failure and the return values are on lines not visible here.
 */
668 static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path)
670 struct netio_data *nd = td->io_ops->data;
671 struct sockaddr_un *addr = &nd->addr_un;
675 fd = socket(AF_UNIX, SOCK_STREAM, 0);
677 log_err("fio: socket: %s\n", strerror(errno));
683 memset(addr, 0, sizeof(*addr));
684 addr->sun_family = AF_UNIX;
685 strcpy(addr->sun_path, path);
688 len = sizeof(addr->sun_family) + strlen(path) + 1;
690 if (bind(fd, (struct sockaddr *) addr, len) < 0) {
691 log_err("fio: bind: %s\n", strerror(errno));
/*
 * Create an AF_INET socket (type chosen from the protocol), set
 * SO_REUSEADDR and SO_REUSEPORT on it, and bind it to INADDR_ANY on the
 * given port. Each failing call is recorded with td_verror().
 * NOTE(review): port is declared as a signed short here, while the
 * connect-side helper takes unsigned short; ports above 32767 rely on the
 * conversion back to 16 bits inside htons() — confirm this is intended.
 * NOTE(review): the socket type assignments, any #ifdef around
 * SO_REUSEPORT, where fd is stored and the return values are not visible.
 */
701 static int fio_netio_setup_listen_inet(struct thread_data *td, short port)
703 struct netio_data *nd = td->io_ops->data;
704 struct netio_options *o = td->eo;
707 if (o->proto == FIO_TYPE_TCP)
712 fd = socket(AF_INET, type, 0);
714 td_verror(td, errno, "socket");
719 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
720 td_verror(td, errno, "setsockopt");
724 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
725 td_verror(td, errno, "setsockopt");
730 nd->addr.sin_family = AF_INET;
731 nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
732 nd->addr.sin_port = htons(port);
734 if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
735 td_verror(td, errno, "bind");
/*
 * Prepare the listening socket for a receiving job: UDP and TCP use the
 * inet helper with o->port; anything else uses the UNIX helper with
 * td->o.filename as the socket path. UDP is tested separately before
 * listen() is called on nd->listenfd with a backlog of 10; a listen()
 * failure is recorded with td_verror().
 * NOTE(review): the handling of a failed helper call, the body of the UDP
 * test and the return values are on lines not visible here.
 */
743 static int fio_netio_setup_listen(struct thread_data *td)
745 struct netio_data *nd = td->io_ops->data;
746 struct netio_options *o = td->eo;
749 if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
750 ret = fio_netio_setup_listen_inet(td, o->port);
752 ret = fio_netio_setup_listen_unix(td, td->o.filename);
756 if (o->proto == FIO_TYPE_UDP)
759 if (listen(nd->listenfd, 10) < 0) {
760 td_verror(td, errno, "listen");
/*
 * Init hook (installed as .init in both engine tables): validate the job's
 * options, then set up either the listening or the connecting side.
 *
 * Checks visible here, each with its own log message:
 *  - random IO is rejected,
 *  - a port must not be given with a UNIX socket and must be given for
 *    TCP/UDP,
 *  - for non-TCP protocols, 'listen' is rejected, and a UNIX socket
 *    requires a filename; o->listen is then derived from td_read(td),
 *  - a hostname is rejected for inbound non-UNIX jobs.
 * Finally fio_netio_setup_listen() or fio_netio_setup_connect() is called.
 * WSAStartup() requests Winsock 2.2.
 * NOTE(review): the guarding #ifdef for WSAStartup, the exact conditions of
 * several checks and the return values are on lines not visible here.
 */
768 static int fio_netio_init(struct thread_data *td)
770 struct netio_options *o = td->eo;
775 WSAStartup(MAKEWORD(2,2), &wsd);
779 log_err("fio: network IO can't be random\n");
783 if (o->proto == FIO_TYPE_UNIX && o->port) {
784 log_err("fio: network IO port not valid with unix socket\n");
786 } else if (o->proto != FIO_TYPE_UNIX && !o->port) {
787 log_err("fio: network IO requires port for tcp or udp\n");
791 if (o->proto != FIO_TYPE_TCP) {
793 log_err("fio: listen only valid for TCP proto IO\n");
797 log_err("fio: datagram network connections must be"
801 if (o->proto == FIO_TYPE_UNIX && !td->o.filename) {
802 log_err("fio: UNIX sockets need host/filename\n");
805 o->listen = td_read(td);
808 if (o->proto != FIO_TYPE_UNIX && o->listen && td->o.filename) {
809 log_err("fio: hostname not valid for inbound network IO\n");
814 ret = fio_netio_setup_listen(td);
816 ret = fio_netio_setup_connect(td);
/*
 * Cleanup hook (installed as .cleanup in both engine tables). Tests the
 * listening descriptor and both pipe descriptors against -1, the value
 * used in this file to mark an unopened descriptor.
 * NOTE(review): the actions taken for each open descriptor, any NULL check
 * on nd and the release of nd itself are on lines not visible here.
 */
821 static void fio_netio_cleanup(struct thread_data *td)
823 struct netio_data *nd = td->io_ops->data;
826 if (nd->listenfd != -1)
828 if (nd->pipes[0] != -1)
830 if (nd->pipes[1] != -1)
/*
 * Setup hook for the plain read/write engine (also called state-wise by
 * nothing else visible here).
 *
 * If the job has no files yet, one is added, named after td->o.filename or
 * "net" when no filename is set, and nr_files defaults to 1. If the engine
 * has no private data, a zeroed struct netio_data is allocated, both pipe
 * descriptors are marked unopened (-1) and the struct is attached to
 * td->io_ops->data.
 * NOTE(review): no NULL check on the malloc() result is visible before the
 * memset(); the doubled ';' after malloc() is a harmless empty statement.
 * NOTE(review): the return value is on a line not visible here.
 */
837 static int fio_netio_setup(struct thread_data *td)
839 struct netio_data *nd;
841 if (!td->files_index) {
842 add_file(td, td->o.filename ?: "net");
843 td->o.nr_files = td->o.nr_files ?: 1;
846 if (!td->io_ops->data) {
847 nd = malloc(sizeof(*nd));;
849 memset(nd, 0, sizeof(*nd));
851 nd->pipes[0] = nd->pipes[1] = -1;
852 td->io_ops->data = nd;
858 #ifdef FIO_HAVE_SPLICE
/*
 * Setup hook for the splice engine: takes the engine's private data from
 * td->io_ops->data and creates the pipe pair in nd->pipes with pipe().
 * NOTE(review): whatever populates td->io_ops->data beforehand, the
 * handling of a pipe() failure and the return values are on lines not
 * visible here.
 */
859 static int fio_netio_setup_splice(struct thread_data *td)
861 struct netio_data *nd;
865 nd = td->io_ops->data;
867 if (pipe(nd->pipes) < 0)
/*
 * Operations table for the splice-based variant of the engine. It shares
 * prep, queue, init, cleanup and open_file with ioengine_rw and uses
 * fio_netio_setup_splice as its setup hook.
 * NOTE(review): close_file is generic_close_file here, whereas ioengine_rw
 * uses fio_netio_close_file, which also sends the UDP close message.
 * Since fio_netio_queue falls back to send/recv for UDP under this engine
 * too, confirm the difference is intended.
 */
877 static struct ioengine_ops ioengine_splice = {
879 .version = FIO_IOOPS_VERSION,
880 .prep = fio_netio_prep,
881 .queue = fio_netio_queue,
882 .setup = fio_netio_setup_splice,
883 .init = fio_netio_init,
884 .cleanup = fio_netio_cleanup,
885 .open_file = fio_netio_open_file,
886 .close_file = generic_close_file,
888 .option_struct_size = sizeof(struct netio_options),
889 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
890 FIO_SIGTERM | FIO_PIPEIO,
/*
 * Operations table for the plain send/recv variant of the engine. Uses
 * fio_netio_setup as its setup hook and fio_netio_close_file as its close
 * hook, so a UDP close message is sent before the descriptor is closed.
 */
894 static struct ioengine_ops ioengine_rw = {
896 .version = FIO_IOOPS_VERSION,
897 .prep = fio_netio_prep,
898 .queue = fio_netio_queue,
899 .setup = fio_netio_setup,
900 .init = fio_netio_init,
901 .cleanup = fio_netio_cleanup,
902 .open_file = fio_netio_open_file,
903 .close_file = fio_netio_close_file,
905 .option_struct_size = sizeof(struct netio_options),
906 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
907 FIO_SIGTERM | FIO_PIPEIO,
/*
 * Option callback for the hostname option: replaces the owning thread's
 * o.filename with a heap copy of input, freeing any previous value first.
 * data is the engine's struct netio_options.
 * NOTE(review): the strdup() result is not checked on the visible lines,
 * and the NULL guard before free() is redundant (free(NULL) is a no-op).
 * NOTE(review): the return value is on a line not visible here.
 */
910 static int str_hostname_cb(void *data, const char *input)
912 struct netio_options *o = data;
914 if (o->td->o.filename)
915 free(o->td->o.filename);
916 o->td->o.filename = strdup(input);
/*
 * Registration routine (tagged fio_init): registers the read/write engine
 * and, when FIO_HAVE_SPLICE is defined, the splice engine as well.
 */
920 static void fio_init fio_netio_register(void)
922 register_ioengine(&ioengine_rw);
923 #ifdef FIO_HAVE_SPLICE
924 register_ioengine(&ioengine_splice);
/*
 * Teardown routine (tagged fio_exit): unregisters the read/write engine
 * and, when FIO_HAVE_SPLICE is defined, the splice engine as well.
 */
928 static void fio_exit fio_netio_unregister(void)
930 unregister_ioengine(&ioengine_rw);
931 #ifdef FIO_HAVE_SPLICE
932 unregister_ioengine(&ioengine_splice);