4 * IO engine that reads/writes to/from sockets.
12 #include <netinet/in.h>
13 #include <arpa/inet.h>
16 #include <sys/types.h>
18 #include <sys/socket.h>
27 struct sockaddr_in addr;
28 struct sockaddr_un addr_un;
31 struct netio_options {
32 struct thread_data *td;
38 struct udp_close_msg {
44 FIO_LINK_CLOSE = 0x89,
45 FIO_LINK_CLOSE_MAGIC = 0x6c696e6b,
52 static int str_hostname_cb(void *data, const char *input);
53 static struct fio_option options[] = {
56 .type = FIO_OPT_STR_STORE,
57 .cb = str_hostname_cb,
58 .help = "Hostname for net IO engine",
63 .off1 = offsetof(struct netio_options, port),
66 .help = "Port to use for TCP or UDP net connections",
72 .off1 = offsetof(struct netio_options, proto),
73 .help = "Network protocol to use",
78 .help = "Transmission Control Protocol",
82 .help = "User Datagram Protocol",
85 .oval = FIO_TYPE_UNIX,
86 .help = "UNIX domain socket",
92 .type = FIO_OPT_STR_SET,
93 .off1 = offsetof(struct netio_options, listen),
94 .help = "Listen for incoming TCP connections",
102 * Return -1 for error and 'nr events' for a positive number
105 static int poll_wait(struct thread_data *td, int fd, short events)
110 while (!td->terminate) {
113 ret = poll(&pfd, 1, -1);
118 td_verror(td, errno, "poll");
126 if (pfd.revents & events)
132 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
134 struct netio_options *o = td->eo;
137 * Make sure we don't see spurious reads to a receiver, and vice versa
139 if (o->proto == FIO_TYPE_TCP)
142 if ((o->listen && io_u->ddir == DDIR_WRITE) ||
143 (!o->listen && io_u->ddir == DDIR_READ)) {
144 td_verror(td, EINVAL, "bad direction");
151 #ifdef FIO_HAVE_SPLICE
152 static int splice_io_u(int fdin, int fdout, unsigned int len)
157 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
175 * Receive bytes from a socket and fill them into the internal pipe
177 static int splice_in(struct thread_data *td, struct io_u *io_u)
179 struct netio_data *nd = td->io_ops->data;
181 return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
185 * Transmit 'len' bytes from the internal pipe
187 static int splice_out(struct thread_data *td, struct io_u *io_u,
190 struct netio_data *nd = td->io_ops->data;
192 return splice_io_u(nd->pipes[0], io_u->file->fd, len);
195 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
198 .iov_base = io_u->xfer_buf,
203 while (iov.iov_len) {
204 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
223 * vmsplice() pipe to io_u buffer
225 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
228 struct netio_data *nd = td->io_ops->data;
230 return vmsplice_io_u(io_u, nd->pipes[0], len);
234 * vmsplice() io_u to pipe
236 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
238 struct netio_data *nd = td->io_ops->data;
240 return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
244 * splice receive - transfer socket data into a pipe using splice, then map
245 * that pipe data into the io_u using vmsplice.
247 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
251 ret = splice_in(td, io_u);
253 return vmsplice_io_u_out(td, io_u, ret);
259 * splice transmit - map data from the io_u into a pipe by using vmsplice,
260 * then transfer that pipe to a socket using splice.
262 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
266 ret = vmsplice_io_u_in(td, io_u);
268 return splice_out(td, io_u, ret);
273 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
279 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
286 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
288 struct netio_data *nd = td->io_ops->data;
289 struct netio_options *o = td->eo;
290 int ret, flags = OS_MSG_DONTWAIT;
293 if (o->proto == FIO_TYPE_UDP) {
294 struct sockaddr *to = (struct sockaddr *) &nd->addr;
296 ret = sendto(io_u->file->fd, io_u->xfer_buf,
297 io_u->xfer_buflen, flags, to,
301 * if we are going to write more, set MSG_MORE
304 if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
308 ret = send(io_u->file->fd, io_u->xfer_buf,
309 io_u->xfer_buflen, flags);
314 ret = poll_wait(td, io_u->file->fd, POLLOUT);
318 flags &= ~OS_MSG_DONTWAIT;
324 static int is_udp_close(struct io_u *io_u, int len)
326 struct udp_close_msg *msg;
328 if (len != sizeof(struct udp_close_msg))
331 msg = io_u->xfer_buf;
332 if (ntohl(msg->magic) != FIO_LINK_CLOSE_MAGIC)
334 if (ntohl(msg->cmd) != FIO_LINK_CLOSE)
340 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
342 struct netio_data *nd = td->io_ops->data;
343 struct netio_options *o = td->eo;
344 int ret, flags = OS_MSG_DONTWAIT;
347 if (o->proto == FIO_TYPE_UDP) {
348 fio_socklen_t len = sizeof(nd->addr);
349 struct sockaddr *from = (struct sockaddr *) &nd->addr;
351 ret = recvfrom(io_u->file->fd, io_u->xfer_buf,
352 io_u->xfer_buflen, flags, from, &len);
353 if (is_udp_close(io_u, ret)) {
358 ret = recv(io_u->file->fd, io_u->xfer_buf,
359 io_u->xfer_buflen, flags);
363 else if (!ret && (flags & MSG_WAITALL))
366 ret = poll_wait(td, io_u->file->fd, POLLIN);
369 flags &= ~OS_MSG_DONTWAIT;
370 flags |= MSG_WAITALL;
376 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
378 struct netio_data *nd = td->io_ops->data;
379 struct netio_options *o = td->eo;
382 fio_ro_check(td, io_u);
384 if (io_u->ddir == DDIR_WRITE) {
385 if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
386 o->proto == FIO_TYPE_UNIX)
387 ret = fio_netio_send(td, io_u);
389 ret = fio_netio_splice_out(td, io_u);
390 } else if (io_u->ddir == DDIR_READ) {
391 if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
392 o->proto == FIO_TYPE_UNIX)
393 ret = fio_netio_recv(td, io_u);
395 ret = fio_netio_splice_in(td, io_u);
397 ret = 0; /* must be a SYNC */
399 if (ret != (int) io_u->xfer_buflen) {
401 io_u->resid = io_u->xfer_buflen - ret;
403 return FIO_Q_COMPLETED;
407 if (io_u->ddir == DDIR_WRITE && err == EMSGSIZE)
415 td_verror(td, io_u->error, "xfer");
417 return FIO_Q_COMPLETED;
420 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
422 struct netio_data *nd = td->io_ops->data;
423 struct netio_options *o = td->eo;
426 if (o->proto == FIO_TYPE_TCP) {
429 } else if (o->proto == FIO_TYPE_UDP) {
432 } else if (o->proto == FIO_TYPE_UNIX) {
436 log_err("fio: bad network type %d\n", o->proto);
441 f->fd = socket(domain, type, 0);
443 td_verror(td, errno, "socket");
447 if (o->proto == FIO_TYPE_UDP)
449 else if (o->proto == FIO_TYPE_TCP) {
450 fio_socklen_t len = sizeof(nd->addr);
452 if (connect(f->fd, (struct sockaddr *) &nd->addr, len) < 0) {
453 td_verror(td, errno, "connect");
458 struct sockaddr_un *addr = &nd->addr_un;
461 len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
463 if (connect(f->fd, (struct sockaddr *) addr, len) < 0) {
464 td_verror(td, errno, "connect");
473 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
475 struct netio_data *nd = td->io_ops->data;
476 struct netio_options *o = td->eo;
477 fio_socklen_t socklen = sizeof(nd->addr);
480 if (o->proto == FIO_TYPE_UDP) {
481 f->fd = nd->listenfd;
485 state = td->runstate;
486 td_set_runstate(td, TD_SETTING_UP);
488 log_info("fio: waiting for connection\n");
490 if (poll_wait(td, nd->listenfd, POLLIN) < 0)
493 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
495 td_verror(td, errno, "accept");
499 td_set_runstate(td, state);
502 td_set_runstate(td, state);
506 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
509 struct netio_options *o = td->eo;
512 ret = fio_netio_accept(td, f);
514 ret = fio_netio_connect(td, f);
521 static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f)
523 struct netio_data *nd = td->io_ops->data;
524 struct udp_close_msg msg;
525 struct sockaddr *to = (struct sockaddr *) &nd->addr;
528 msg.magic = htonl(FIO_LINK_CLOSE_MAGIC);
529 msg.cmd = htonl(FIO_LINK_CLOSE);
531 ret = sendto(f->fd, &msg, sizeof(msg), MSG_WAITALL, to,
534 td_verror(td, errno, "sendto udp link close");
537 static int fio_netio_close_file(struct thread_data *td, struct fio_file *f)
539 struct netio_options *o = td->eo;
542 * If this is an UDP connection, notify the receiver that we are
543 * closing down the link
545 if (o->proto == FIO_TYPE_UDP)
546 fio_netio_udp_close(td, f);
548 return generic_close_file(td, f);
551 static int fio_netio_setup_connect_inet(struct thread_data *td,
552 const char *host, unsigned short port)
554 struct netio_data *nd = td->io_ops->data;
557 log_err("fio: connect with no host to connect to.\n");
559 log_err("fio: did you forget to set 'listen'?\n");
561 td_verror(td, EINVAL, "no hostname= set");
565 nd->addr.sin_family = AF_INET;
566 nd->addr.sin_port = htons(port);
568 if (inet_aton(host, &nd->addr.sin_addr) != 1) {
569 struct hostent *hent;
571 hent = gethostbyname(host);
573 td_verror(td, errno, "gethostbyname");
577 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
583 static int fio_netio_setup_connect_unix(struct thread_data *td,
586 struct netio_data *nd = td->io_ops->data;
587 struct sockaddr_un *soun = &nd->addr_un;
589 soun->sun_family = AF_UNIX;
590 strcpy(soun->sun_path, path);
594 static int fio_netio_setup_connect(struct thread_data *td)
596 struct netio_options *o = td->eo;
598 if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
599 return fio_netio_setup_connect_inet(td, td->o.filename,o->port);
601 return fio_netio_setup_connect_unix(td, td->o.filename);
604 static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path)
606 struct netio_data *nd = td->io_ops->data;
607 struct sockaddr_un *addr = &nd->addr_un;
611 fd = socket(AF_UNIX, SOCK_STREAM, 0);
613 log_err("fio: socket: %s\n", strerror(errno));
619 memset(addr, 0, sizeof(*addr));
620 addr->sun_family = AF_UNIX;
621 strcpy(addr->sun_path, path);
624 len = sizeof(addr->sun_family) + strlen(path) + 1;
626 if (bind(fd, (struct sockaddr *) addr, len) < 0) {
627 log_err("fio: bind: %s\n", strerror(errno));
637 static int fio_netio_setup_listen_inet(struct thread_data *td, short port)
639 struct netio_data *nd = td->io_ops->data;
640 struct netio_options *o = td->eo;
643 if (o->proto == FIO_TYPE_TCP)
648 fd = socket(AF_INET, type, 0);
650 td_verror(td, errno, "socket");
655 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
656 td_verror(td, errno, "setsockopt");
660 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
661 td_verror(td, errno, "setsockopt");
666 nd->addr.sin_family = AF_INET;
667 nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
668 nd->addr.sin_port = htons(port);
670 if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
671 td_verror(td, errno, "bind");
679 static int fio_netio_setup_listen(struct thread_data *td)
681 struct netio_data *nd = td->io_ops->data;
682 struct netio_options *o = td->eo;
685 if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
686 ret = fio_netio_setup_listen_inet(td, o->port);
688 ret = fio_netio_setup_listen_unix(td, td->o.filename);
692 if (o->proto == FIO_TYPE_UDP)
695 if (listen(nd->listenfd, 10) < 0) {
696 td_verror(td, errno, "listen");
704 static int fio_netio_init(struct thread_data *td)
706 struct netio_options *o = td->eo;
711 WSAStartup(MAKEWORD(2,2), &wsd);
715 log_err("fio: network IO can't be random\n");
719 if (o->proto == FIO_TYPE_UNIX && o->port) {
720 log_err("fio: network IO port not valid with unix socket\n");
722 } else if (o->proto != FIO_TYPE_UNIX && !o->port) {
723 log_err("fio: network IO requires port for tcp or udp\n");
727 if (o->proto != FIO_TYPE_TCP) {
729 log_err("fio: listen only valid for TCP proto IO\n");
733 log_err("fio: datagram network connections must be"
737 if (o->proto == FIO_TYPE_UNIX && !td->o.filename) {
738 log_err("fio: UNIX sockets need host/filename\n");
741 o->listen = td_read(td);
744 if (o->proto != FIO_TYPE_UNIX && o->listen && td->o.filename) {
745 log_err("fio: hostname not valid for inbound network IO\n");
750 ret = fio_netio_setup_listen(td);
752 ret = fio_netio_setup_connect(td);
757 static void fio_netio_cleanup(struct thread_data *td)
759 struct netio_data *nd = td->io_ops->data;
762 if (nd->listenfd != -1)
764 if (nd->pipes[0] != -1)
766 if (nd->pipes[1] != -1)
773 static int fio_netio_setup(struct thread_data *td)
775 struct netio_data *nd;
777 if (!td->files_index) {
778 add_file(td, td->o.filename ?: "net");
779 td->o.nr_files = td->o.nr_files ?: 1;
782 if (!td->io_ops->data) {
783 nd = malloc(sizeof(*nd));;
785 memset(nd, 0, sizeof(*nd));
787 nd->pipes[0] = nd->pipes[1] = -1;
788 td->io_ops->data = nd;
794 #ifdef FIO_HAVE_SPLICE
795 static int fio_netio_setup_splice(struct thread_data *td)
797 struct netio_data *nd;
801 nd = td->io_ops->data;
803 if (pipe(nd->pipes) < 0)
813 static struct ioengine_ops ioengine_splice = {
815 .version = FIO_IOOPS_VERSION,
816 .prep = fio_netio_prep,
817 .queue = fio_netio_queue,
818 .setup = fio_netio_setup_splice,
819 .init = fio_netio_init,
820 .cleanup = fio_netio_cleanup,
821 .open_file = fio_netio_open_file,
822 .close_file = generic_close_file,
824 .option_struct_size = sizeof(struct netio_options),
825 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
826 FIO_SIGTERM | FIO_PIPEIO,
830 static struct ioengine_ops ioengine_rw = {
832 .version = FIO_IOOPS_VERSION,
833 .prep = fio_netio_prep,
834 .queue = fio_netio_queue,
835 .setup = fio_netio_setup,
836 .init = fio_netio_init,
837 .cleanup = fio_netio_cleanup,
838 .open_file = fio_netio_open_file,
839 .close_file = fio_netio_close_file,
841 .option_struct_size = sizeof(struct netio_options),
842 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
843 FIO_SIGTERM | FIO_PIPEIO,
846 static int str_hostname_cb(void *data, const char *input)
848 struct netio_options *o = data;
850 if (o->td->o.filename)
851 free(o->td->o.filename);
852 o->td->o.filename = strdup(input);
856 static void fio_init fio_netio_register(void)
858 register_ioengine(&ioengine_rw);
859 #ifdef FIO_HAVE_SPLICE
860 register_ioengine(&ioengine_splice);
864 static void fio_exit fio_netio_unregister(void)
866 unregister_ioengine(&ioengine_rw);
867 #ifdef FIO_HAVE_SPLICE
868 unregister_ioengine(&ioengine_splice);