4 * IO engine that reads/writes to/from sockets.
12 #include <netinet/in.h>
13 #include <arpa/inet.h>
16 #include <sys/types.h>
17 #include <sys/socket.h>
28 struct sockaddr_in addr;
32 * Return -1 for error and 'nr events' for a positive number
/*
 * poll_wait() - wait in poll(2) on one descriptor until the requested
 * events show up or the job is asked to terminate.
 *
 * td:     job context; the wait loop runs only while td->terminate is clear
 * fd:     descriptor to wait on (NOTE(review): the pollfd setup is not
 *         visible in this excerpt; presumably pfd carries fd and events)
 * events: poll event mask that is tested against pfd.revents
 *
 * poll() is called for a single pollfd with a timeout of -1, which means
 * it blocks without a time limit. td_verror() is invoked with errno and
 * the poll label; the condition guarding that call is not visible here.
 * NOTE(review): no return statement is visible in this excerpt, so the
 * return values are not restated; confirm against the full function.
 */
35 static int poll_wait(struct thread_data *td, int fd, short events)
40 while (!td->terminate) {
43 ret = poll(&pfd, 1, -1);
48 td_verror(td, errno, "poll");
/* Check whether any of the events the caller asked for has fired. */
56 if (pfd.revents & events)
/* Otherwise see whether termination was requested while waiting. */
58 else if (td->terminate)
/*
 * fio_netio_prep() - check that the direction of an io_u matches the role
 * of this connection before the io_u is queued.
 *
 * td:   job context; the private netio_data is read from td->io_ops->data
 * io_u: IO unit whose ddir field is inspected
 *
 * A DDIR_READ io_u while nd->send_to_net is set, or a DDIR_WRITE io_u
 * while it is clear, is reported through td_verror() with EINVAL and the
 * bad direction label.
 * NOTE(review): the return statements are not visible in this excerpt.
 */
64 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
66 struct netio_data *nd = td->io_ops->data;
69 * Make sure we don't see spurious reads to a receiver, and vice versa
71 if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
72 (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
73 td_verror(td, EINVAL, "bad direction");
80 #ifdef FIO_HAVE_SPLICE
/*
 * splice_io_u() - move data from fdin to fdout with splice(2).
 *
 * fdin:  source descriptor
 * fdout: destination descriptor
 * len:   number of bytes requested from splice()
 *
 * Both offset arguments are NULL and the flags argument is 0.
 * NOTE(review): only the splice() call itself is visible; any loop around
 * it and the handling of short transfers or errors are in lines not shown.
 */
81 static int splice_io_u(int fdin, int fdout, unsigned int len)
86 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
104 * Receive bytes from a socket and fill them into the internal pipe
/*
 * splice_in() - splice io_u->xfer_buflen bytes from the descriptor of the
 * io_u file into nd->pipes[1], the write end of the engine pipe.
 *
 * Returns whatever splice_io_u() returns.
 */
106 static int splice_in(struct thread_data *td, struct io_u *io_u)
108 struct netio_data *nd = td->io_ops->data;
110 return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
114 * Transmit 'len' bytes from the internal pipe
/*
 * splice_out() - splice len bytes from nd->pipes[0], the read end of the
 * engine pipe, to the descriptor of the io_u file.
 *
 * Returns whatever splice_io_u() returns.
 * NOTE(review): the len parameter is declared on a signature line that is
 * not visible in this excerpt.
 */
116 static int splice_out(struct thread_data *td, struct io_u *io_u,
119 struct netio_data *nd = td->io_ops->data;
121 return splice_io_u(nd->pipes[0], io_u->file->fd, len);
/*
 * vmsplice_io_u() - connect the io_u transfer buffer with fd through
 * vmsplice(2).
 *
 * io_u: IO unit whose xfer_buf is used as the iovec base
 * fd:   pipe descriptor handed to vmsplice()
 * len:  byte count (NOTE(review): presumably the initial iov_len; that
 *       initializer line is not visible here)
 *
 * vmsplice() is called with a single iovec and SPLICE_F_MOVE for as long
 * as iov.iov_len is non-zero.
 * NOTE(review): how the iovec is advanced and how errors are returned is
 * outside this excerpt.
 */
124 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
127 .iov_base = io_u->xfer_buf,
132 while (iov.iov_len) {
133 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
152 * vmsplice() pipe to io_u buffer
/*
 * vmsplice_io_u_out() - run vmsplice_io_u() for len bytes between the
 * io_u buffer and nd->pipes[0], the read end of the engine pipe.
 *
 * Returns whatever vmsplice_io_u() returns.
 * NOTE(review): the len parameter is declared on a signature line that is
 * not visible in this excerpt.
 */
154 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
157 struct netio_data *nd = td->io_ops->data;
159 return vmsplice_io_u(io_u, nd->pipes[0], len);
163 * vmsplice() io_u to pipe
/*
 * vmsplice_io_u_in() - run vmsplice_io_u() for io_u->xfer_buflen bytes
 * between the io_u buffer and nd->pipes[1], the write end of the engine
 * pipe.
 *
 * Returns whatever vmsplice_io_u() returns.
 */
165 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
167 struct netio_data *nd = td->io_ops->data;
169 return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
173 * splice receive - transfer socket data into a pipe using splice, then map
174 * that pipe data into the io_u using vmsplice.
/*
 * fio_netio_splice_in() - receive path of the splice variant.
 *
 * First splice_in() moves socket data into the engine pipe, then the
 * value it returned is passed as the length to vmsplice_io_u_out(), whose
 * result is returned.
 * NOTE(review): whatever check is applied to ret between the two calls is
 * not visible in this excerpt.
 */
176 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
180 ret = splice_in(td, io_u);
182 return vmsplice_io_u_out(td, io_u, ret);
188 * splice transmit - map data from the io_u into a pipe by using vmsplice,
189 * then transfer that pipe to a socket using splice.
/*
 * fio_netio_splice_out() - transmit path of the splice variant.
 *
 * First vmsplice_io_u_in() maps the io_u buffer into the engine pipe,
 * then the value it returned is passed as the length to splice_out(),
 * whose result is returned.
 * NOTE(review): whatever check is applied to ret between the two calls is
 * not visible in this excerpt.
 */
191 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
195 ret = vmsplice_io_u_in(td, io_u);
197 return splice_out(td, io_u, ret);
/*
 * Second definition of fio_netio_splice_in() with the same signature.
 * NOTE(review): presumably the fallback used when FIO_HAVE_SPLICE is not
 * defined; neither the preprocessor branch nor the body is visible here.
 */
202 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
/*
 * Second definition of fio_netio_splice_out() with the same signature.
 * NOTE(review): presumably the fallback used when FIO_HAVE_SPLICE is not
 * defined; neither the preprocessor branch nor the body is visible here.
 */
208 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
/*
 * fio_netio_send() - transmit the io_u buffer on the socket without
 * using splice.
 *
 * td:   job context; the private netio_data is read from td->io_ops->data
 * io_u: IO unit supplying the socket (io_u->file->fd), xfer_buf and
 *       xfer_buflen
 *
 * The socket is first waited on with poll_wait() and POLLOUT. For
 * IPPROTO_UDP the data goes out with sendto() towards nd->addr using a
 * literal flags value of 0; otherwise send() is used and its result is
 * returned directly.
 * NOTE(review): sendto() receives &nd->addr without a cast to a struct
 * sockaddr pointer, while connect() and bind() in this file do cast;
 * confirm this builds without an incompatible pointer warning.
 * NOTE(review): the flags variable, the handling of the poll_wait()
 * result and the tail of the send() call are not visible here.
 */
215 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
217 struct netio_data *nd = td->io_ops->data;
220 ret = poll_wait(td, io_u->file->fd, POLLOUT);
225 * if we are going to write more, set MSG_MORE
/* True while this transfer still leaves the job short of td->o.size. */
228 if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen < td->o.size)
232 if (nd->net_protocol == IPPROTO_UDP) {
233 return sendto(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen,
234 0, &nd->addr, sizeof(nd->addr));
236 return send(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen,
/*
 * fio_netio_recv() - receive into the io_u buffer from the socket
 * without using splice.
 *
 * td:   job context; the private netio_data is read from td->io_ops->data
 * io_u: IO unit supplying the socket (io_u->file->fd), xfer_buf and
 *       xfer_buflen
 *
 * The socket is first waited on with poll_wait() and POLLIN. For
 * IPPROTO_UDP the data is read with recvfrom() using a literal flags
 * value of 0, and the sender address is written into nd->addr; otherwise
 * recv() is used. The local flags variable starts out as MSG_WAITALL.
 * NOTE(review): recvfrom() receives &nd->addr without a cast to a struct
 * sockaddr pointer, while accept() in this file does cast; confirm this
 * builds without an incompatible pointer warning.
 * NOTE(review): the handling of the poll_wait() result and the tail of
 * the recv() call are not visible in this excerpt.
 */
241 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
243 struct netio_data *nd = td->io_ops->data;
244 int ret, flags = MSG_WAITALL;
246 ret = poll_wait(td, io_u->file->fd, POLLIN);
250 if (nd->net_protocol == IPPROTO_UDP) {
/* In and out argument: capacity of nd->addr on entry to recvfrom(). */
251 socklen_t len = sizeof(nd->addr);
253 return recvfrom(io_u->file->fd, io_u->xfer_buf,
254 io_u->xfer_buflen, 0, &nd->addr, &len);
256 return recv(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen,
/*
 * fio_netio_queue() - queue hook of the engine: perform one io_u and
 * report it as completed.
 *
 * td:   job context; the private netio_data is read from td->io_ops->data
 * io_u: IO unit to execute
 *
 * After fio_ro_check(), writes use fio_netio_send() and reads use
 * fio_netio_recv() whenever splice is disabled or the protocol is UDP;
 * otherwise the fio_netio_splice_out() and fio_netio_splice_in() helpers
 * are used. Any other direction yields a result of 0.
 * When the result differs from io_u->xfer_buflen the visible code stores
 * the shortfall in io_u->resid; a guard on a line not shown may restrict
 * when that happens. Both visible return statements yield
 * FIO_Q_COMPLETED.
 * NOTE(review): the error branch (the EMSGSIZE test for writes, the
 * assignment of io_u->error and the xfer td_verror call) is only partly
 * visible; confirm how failures are told apart from short transfers.
 */
261 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
263 struct netio_data *nd = td->io_ops->data;
266 fio_ro_check(td, io_u);
268 if (io_u->ddir == DDIR_WRITE) {
/* UDP never takes the splice path, even when use_splice is set. */
269 if (!nd->use_splice || nd->net_protocol == IPPROTO_UDP)
270 ret = fio_netio_send(td, io_u);
272 ret = fio_netio_splice_out(td, io_u);
273 } else if (io_u->ddir == DDIR_READ) {
274 if (!nd->use_splice || nd->net_protocol == IPPROTO_UDP)
275 ret = fio_netio_recv(td, io_u);
277 ret = fio_netio_splice_in(td, io_u);
279 ret = 0; /* must be a SYNC */
/* Anything other than a full-length transfer needs extra bookkeeping. */
281 if (ret != (int) io_u->xfer_buflen) {
283 io_u->resid = io_u->xfer_buflen - ret;
285 return FIO_Q_COMPLETED;
289 if (io_u->ddir == DDIR_WRITE && err == EMSGSIZE)
297 td_verror(td, io_u->error, "xfer");
299 return FIO_Q_COMPLETED;
/*
 * fio_netio_connect() - create the client side socket for file f and
 * connect it to nd->addr.
 *
 * td: job context; the private netio_data is read from td->io_ops->data
 * f:  fio file whose fd field receives the new AF_INET socket
 *
 * The socket type depends on whether the protocol is IPPROTO_TCP. Errors
 * from socket() and connect() are reported through td_verror() with
 * errno.
 * NOTE(review): the statement that follows the IPPROTO_UDP test is not
 * visible; presumably UDP returns before connect() is reached.
 * NOTE(review): it is not visible whether f->fd is closed when connect()
 * fails; confirm there is no descriptor leak on that path.
 */
302 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
304 struct netio_data *nd = td->io_ops->data;
307 if (nd->net_protocol == IPPROTO_TCP)
312 f->fd = socket(AF_INET, type, nd->net_protocol);
314 td_verror(td, errno, "socket");
318 if (nd->net_protocol == IPPROTO_UDP)
321 if (connect(f->fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
322 td_verror(td, errno, "connect");
/*
 * fio_netio_accept() - obtain the descriptor used for IO on the
 * receiving side.
 *
 * td: job context; the private netio_data is read from td->io_ops->data
 * f:  fio file whose fd field is filled in
 *
 * For IPPROTO_UDP the listening socket itself becomes f->fd. Otherwise a
 * waiting message is logged, poll_wait() waits for POLLIN on
 * nd->listenfd, and accept() provides f->fd while storing the peer
 * address in nd->addr. An accept() failure is reported through
 * td_verror() with errno.
 * NOTE(review): the return statements are not visible in this excerpt.
 */
329 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
331 struct netio_data *nd = td->io_ops->data;
/* In and out argument for accept(): capacity of nd->addr on entry. */
332 socklen_t socklen = sizeof(nd->addr);
334 if (nd->net_protocol == IPPROTO_UDP) {
335 f->fd = nd->listenfd;
339 log_info("fio: waiting for connection\n");
341 if (poll_wait(td, nd->listenfd, POLLIN) < 0)
344 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
346 td_verror(td, errno, "accept");
/*
 * fio_netio_open_file() - open_file hook of the engine: returns the
 * result of either fio_netio_accept() or fio_netio_connect() for file f.
 * NOTE(review): the condition that selects between the two is not
 * visible in this excerpt; presumably it is the data direction of the job.
 */
353 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
356 return fio_netio_accept(td, f);
358 return fio_netio_connect(td, f);
/*
 * fio_netio_setup_connect() - fill nd->addr with the IPv4 address and
 * port of the remote end.
 *
 * td:   job context; the private netio_data is read from td->io_ops->data
 * host: dotted-quad address or host name
 * port: port number in host byte order, converted with htons()
 *       (NOTE(review): declared on a signature line not visible here)
 *
 * host is first parsed with inet_aton(); when that fails the name is
 * resolved with gethostbyname() and 4 bytes of hent->h_addr are copied
 * into nd->addr.sin_addr.
 * NOTE(review): the gethostbyname failure is reported with errno, but
 * gethostbyname() reports its errors through h_errno; the value passed
 * to td_verror() may therefore be unrelated to the lookup failure.
 * NOTE(review): the copy length is the literal 4 rather than
 * hent->h_length or the size of sin_addr; confirm this is intended.
 */
361 static int fio_netio_setup_connect(struct thread_data *td, const char *host,
364 struct netio_data *nd = td->io_ops->data;
366 nd->addr.sin_family = AF_INET;
367 nd->addr.sin_port = htons(port);
/* Not a numeric IPv4 address: fall back to a name lookup. */
369 if (inet_aton(host, &nd->addr.sin_addr) != 1) {
370 struct hostent *hent;
372 hent = gethostbyname(host);
374 td_verror(td, errno, "gethostbyname");
378 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
/*
 * fio_netio_setup_listen() - create, configure and bind the receiving
 * socket.
 *
 * td:   job context; the private netio_data is read from td->io_ops->data
 * port: local port in host byte order, converted with htons()
 *
 * An AF_INET socket is created whose type depends on whether the
 * protocol is IPPROTO_TCP. SO_REUSEADDR and SO_REUSEPORT are set, the
 * socket is bound to INADDR_ANY on the given port, and for TCP listen()
 * is called with a backlog of 1. Each failing call is reported through
 * td_verror() with errno.
 * NOTE(review): port is a short, so values above 32767 cannot be held as
 * positive numbers; the result relies on the conversion done for htons().
 * NOTE(review): it is not visible whether fd is closed on the error
 * paths, whether SO_REUSEPORT is guarded by a preprocessor test, or
 * where fd is stored on success.
 */
384 static int fio_netio_setup_listen(struct thread_data *td, short port)
386 struct netio_data *nd = td->io_ops->data;
389 if (nd->net_protocol == IPPROTO_TCP)
394 fd = socket(AF_INET, type, nd->net_protocol);
396 td_verror(td, errno, "socket");
401 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
402 td_verror(td, errno, "setsockopt");
406 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
407 td_verror(td, errno, "setsockopt");
/* Accept traffic on every local IPv4 address. */
412 nd->addr.sin_family = AF_INET;
413 nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
414 nd->addr.sin_port = htons(port);
416 if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
417 td_verror(td, errno, "bind");
/* Only TCP needs listen(); short-circuit evaluation skips it for UDP. */
420 if (nd->net_protocol == IPPROTO_TCP && listen(fd, 1) < 0) {
421 td_verror(td, errno, "listen");
/*
 * fio_netio_init() - init hook of the engine: parse td->o.filename into
 * host, port and protocol, then set up the listening or connecting side.
 *
 * td: job context; the private netio_data is read from td->io_ops->data
 *
 * The filename is copied into a local buffer and split at slash
 * characters; the port part is converted with strtol() in base 10 and a
 * value of 0 or above 65535 is rejected. An optional mode part selects
 * IPPROTO_TCP or IPPROTO_UDP, and TCP is also assigned on a separate
 * path (NOTE(review): presumably the default when no mode is given).
 * Setup is then delegated to fio_netio_setup_listen() or
 * fio_netio_setup_connect(). Error messages are logged for mixed
 * read and write jobs, for random IO and for a bad host, port or
 * protocol.
 * NOTE(review): strcpy() copies td->o.filename into the 128 byte buf
 * with no length check; a longer filename would overflow the buffer.
 * Confirm the same for the 64 byte host buffer, whose copy is not shown.
 * NOTE(review): the strncmp() calls compare only strlen(modep) bytes, so
 * any prefix of tcp or udp matches, and an empty mode compares equal and
 * selects TCP.
 * NOTE(review): strtol() is called with a NULL end pointer, so trailing
 * characters after the port digits are not detected.
 */
429 static int fio_netio_init(struct thread_data *td)
431 struct netio_data *nd = td->io_ops->data;
433 char host[64], buf[128];
434 char *sep, *portp, *modep;
438 log_err("fio: network connections must be read OR write\n");
442 log_err("fio: network IO can't be random\n");
446 strcpy(buf, td->o.filename);
448 sep = strchr(buf, '/');
460 sep = strchr(portp, '/');
466 port = strtol(portp, NULL, 10);
467 if (!port || port > 65535)
471 if (!strncmp("tcp", modep, strlen(modep)) ||
472 !strncmp("TCP", modep, strlen(modep)))
473 nd->net_protocol = IPPROTO_TCP;
474 else if (!strncmp("udp", modep, strlen(modep)) ||
475 !strncmp("UDP", modep, strlen(modep)))
476 nd->net_protocol = IPPROTO_UDP;
480 nd->net_protocol = IPPROTO_TCP;
484 ret = fio_netio_setup_listen(td, port);
487 ret = fio_netio_setup_connect(td, host, port);
492 log_err("fio: bad network host/port/protocol: %s\n", td->o.filename);
/*
 * fio_netio_cleanup() - cleanup hook of the engine.
 *
 * td: job context; the private netio_data is read from td->io_ops->data
 *
 * The listening socket and both pipe ends are each tested against -1,
 * the value that marks an unused descriptor in this file.
 * NOTE(review): the statements guarded by these tests are not visible;
 * presumably each open descriptor is closed and nd is freed. It is also
 * not visible whether nd is checked for NULL before use.
 */
496 static void fio_netio_cleanup(struct thread_data *td)
498 struct netio_data *nd = td->io_ops->data;
501 if (nd->listenfd != -1)
503 if (nd->pipes[0] != -1)
505 if (nd->pipes[1] != -1)
/*
 * fio_netio_setup() - setup hook of the plain engine: allocate the
 * private netio_data once per td.
 *
 * td: job context; td->io_ops->data receives the new structure
 *
 * Nothing is allocated when td->io_ops->data is already set. A new
 * structure is zeroed, both pipe descriptors are marked unused with -1,
 * and the pointer is stored in td->io_ops->data.
 * NOTE(review): the malloc() line ends in two semicolons, which is a
 * harmless empty statement.
 * NOTE(review): no check of the malloc() result is visible before the
 * memset(); confirm a failed allocation cannot be dereferenced.
 * NOTE(review): the visible lines leave nd->listenfd at 0 after the
 * memset(), while fio_netio_cleanup() compares it with -1; confirm it
 * is set to -1 on a line not shown.
 */
512 static int fio_netio_setup(struct thread_data *td)
514 struct netio_data *nd;
516 if (!td->io_ops->data) {
517 nd = malloc(sizeof(*nd));;
519 memset(nd, 0, sizeof(*nd));
521 nd->pipes[0] = nd->pipes[1] = -1;
522 td->io_ops->data = nd;
528 #ifdef FIO_HAVE_SPLICE
/*
 * fio_netio_setup_splice() - setup hook of the splice engine: create the
 * pipe that the splice helpers in this file move data through.
 *
 * td: job context; the private netio_data is read from td->io_ops->data
 *
 * pipe() fills nd->pipes with the read end at index 0 and the write end
 * at index 1; a negative result is treated as failure.
 * NOTE(review): the lines before nd is read are not visible; presumably
 * fio_netio_setup() is called first so that td->io_ops->data exists.
 * The return values are not visible either.
 */
529 static int fio_netio_setup_splice(struct thread_data *td)
531 struct netio_data *nd;
535 nd = td->io_ops->data;
537 if (pipe(nd->pipes) < 0)
/*
 * Operations table for the splice variant of the network engine. The
 * visible hooks match those of ioengine_rw except for .setup, which is
 * fio_netio_setup_splice here.
 * NOTE(review): the .name field and the remainder of the .flags
 * expression are on lines not visible in this excerpt.
 */
547 static struct ioengine_ops ioengine_splice = {
549 .version = FIO_IOOPS_VERSION,
550 .prep = fio_netio_prep,
551 .queue = fio_netio_queue,
552 .setup = fio_netio_setup_splice,
553 .init = fio_netio_init,
554 .cleanup = fio_netio_cleanup,
555 .open_file = fio_netio_open_file,
556 .close_file = generic_close_file,
557 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
/*
 * Operations table for the plain send and receive variant of the network
 * engine; .setup is fio_netio_setup and closing is left to
 * generic_close_file.
 * NOTE(review): the .name field and the remainder of the .flags
 * expression are on lines not visible in this excerpt.
 */
562 static struct ioengine_ops ioengine_rw = {
564 .version = FIO_IOOPS_VERSION,
565 .prep = fio_netio_prep,
566 .queue = fio_netio_queue,
567 .setup = fio_netio_setup,
568 .init = fio_netio_init,
569 .cleanup = fio_netio_cleanup,
570 .open_file = fio_netio_open_file,
571 .close_file = generic_close_file,
572 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
/*
 * fio_netio_register() - register the engines defined in this file.
 *
 * ioengine_rw is always registered; ioengine_splice is registered only
 * when FIO_HAVE_SPLICE is defined.
 * NOTE(review): fio_init is defined elsewhere; presumably it marks the
 * function to run automatically at program start.
 */
576 static void fio_init fio_netio_register(void)
578 register_ioengine(&ioengine_rw);
579 #ifdef FIO_HAVE_SPLICE
580 register_ioengine(&ioengine_splice);
/*
 * fio_netio_unregister() - unregister the engines defined in this file.
 *
 * ioengine_rw is always unregistered; ioengine_splice is unregistered
 * only when FIO_HAVE_SPLICE is defined.
 * NOTE(review): fio_exit is defined elsewhere; presumably it marks the
 * function to run automatically at program exit.
 */
584 static void fio_exit fio_netio_unregister(void)
586 unregister_ioengine(&ioengine_rw);
587 #ifdef FIO_HAVE_SPLICE
588 unregister_ioengine(&ioengine_splice);