4 * IO engine that reads/writes to/from sockets.
12 #include <netinet/in.h>
13 #include <arpa/inet.h>
16 #include <sys/types.h>
17 #include <sys/socket.h>
27 struct sockaddr_in addr;
30 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
32 struct netio_data *nd = td->io_ops->data;
35 * Make sure we don't see spurious reads to a receiver, and vice versa
37 if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
38 (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
39 td_verror(td, EINVAL, "bad direction");
46 #ifdef FIO_HAVE_SPLICE
/*
 * Move up to 'len' bytes from fdin to fdout with splice(2), retrying on
 * short transfers. Returns the total byte count moved, 0 on EOF, or the
 * (negative) splice return if nothing was moved before an error.
 */
static int splice_io_u(int fdin, int fdout, unsigned int len)
{
	int bytes = 0;

	while (len) {
		int ret = splice(fdin, NULL, fdout, NULL, len, 0);

		if (ret < 0) {
			/* report the error only if no data moved yet */
			if (!bytes)
				bytes = ret;
			break;
		} else if (!ret)
			break;

		bytes += ret;
		len -= ret;
	}

	return bytes;
}
70 * Receive bytes from a socket and fill them into the internal pipe
72 static int splice_in(struct thread_data *td, struct io_u *io_u)
74 struct netio_data *nd = td->io_ops->data;
76 return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
80 * Transmit 'len' bytes from the internal pipe
82 static int splice_out(struct thread_data *td, struct io_u *io_u,
85 struct netio_data *nd = td->io_ops->data;
87 return splice_io_u(nd->pipes[0], io_u->file->fd, len);
90 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
93 .iov_base = io_u->xfer_buf,
99 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
118 * vmsplice() pipe to io_u buffer
120 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
123 struct netio_data *nd = td->io_ops->data;
125 return vmsplice_io_u(io_u, nd->pipes[0], len);
129 * vmsplice() io_u to pipe
131 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
133 struct netio_data *nd = td->io_ops->data;
135 return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
/*
 * splice receive - transfer socket data into a pipe using splice, then map
 * that pipe data into the io_u using vmsplice.
 */
static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
{
	int ret = splice_in(td, io_u);

	if (ret > 0)
		return vmsplice_io_u_out(td, io_u, ret);

	/* error or EOF from the splice step — pass it straight up */
	return ret;
}
/*
 * splice transmit - map data from the io_u into a pipe by using vmsplice,
 * then transfer that pipe to a socket using splice.
 */
static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
{
	int ret = vmsplice_io_u_in(td, io_u);

	if (ret > 0)
		return splice_out(td, io_u, ret);

	/* error or EOF from the vmsplice step — pass it straight up */
	return ret;
}
static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
{
	/* splice support not compiled in on this platform */
	errno = EOPNOTSUPP;
	return -1;
}
static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
{
	/* splice support not compiled in on this platform */
	errno = EOPNOTSUPP;
	return -1;
}
181 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
186 * if we are going to write more, set MSG_MORE
189 if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen < td->o.size)
193 return send(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
196 static int fio_netio_recv(struct io_u *io_u)
198 int flags = MSG_WAITALL;
200 return recv(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
203 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
205 struct netio_data *nd = td->io_ops->data;
208 fio_ro_check(td, io_u);
210 if (io_u->ddir == DDIR_WRITE) {
212 ret = fio_netio_splice_out(td, io_u);
214 ret = fio_netio_send(td, io_u);
215 } else if (io_u->ddir == DDIR_READ) {
217 ret = fio_netio_splice_in(td, io_u);
219 ret = fio_netio_recv(io_u);
221 ret = 0; /* must be a SYNC */
223 if (ret != (int) io_u->xfer_buflen) {
225 io_u->resid = io_u->xfer_buflen - ret;
227 return FIO_Q_COMPLETED;
233 td_verror(td, io_u->error, "xfer");
235 return FIO_Q_COMPLETED;
238 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
240 struct netio_data *nd = td->io_ops->data;
242 f->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
244 td_verror(td, errno, "socket");
248 if (connect(f->fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
249 td_verror(td, errno, "connect");
256 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
258 struct netio_data *nd = td->io_ops->data;
259 socklen_t socklen = sizeof(nd->addr);
263 log_info("fio: waiting for connection\n");
266 * Accept loop. poll for incoming events, accept them. Repeat until we
267 * have all connections.
269 while (!td->terminate) {
270 pfd.fd = nd->listenfd;
273 ret = poll(&pfd, 1, -1);
278 td_verror(td, errno, "poll");
284 * should be impossible
286 if (!(pfd.revents & POLLIN))
289 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
291 td_verror(td, errno, "accept");
300 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
303 return fio_netio_accept(td, f);
305 return fio_netio_connect(td, f);
308 static int fio_netio_setup_connect(struct thread_data *td, const char *host,
311 struct netio_data *nd = td->io_ops->data;
313 nd->addr.sin_family = AF_INET;
314 nd->addr.sin_port = htons(port);
316 if (inet_aton(host, &nd->addr.sin_addr) != 1) {
317 struct hostent *hent;
319 hent = gethostbyname(host);
321 td_verror(td, errno, "gethostbyname");
325 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
331 static int fio_netio_setup_listen(struct thread_data *td, short port)
333 struct netio_data *nd = td->io_ops->data;
336 fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
338 td_verror(td, errno, "socket");
343 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
344 td_verror(td, errno, "setsockopt");
348 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
349 td_verror(td, errno, "setsockopt");
354 nd->addr.sin_family = AF_INET;
355 nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
356 nd->addr.sin_port = htons(port);
358 if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
359 td_verror(td, errno, "bind");
362 if (listen(fd, 1) < 0) {
363 td_verror(td, errno, "listen");
371 static int fio_netio_init(struct thread_data *td)
373 struct netio_data *nd = td->io_ops->data;
375 char host[64], buf[128];
380 log_err("fio: network connections must be read OR write\n");
384 log_err("fio: network IO can't be random\n");
388 strcpy(buf, td->o.filename);
390 sep = strchr(buf, '/');
400 port = strtol(sep, NULL, 10);
401 if (!port || port > 65535)
406 ret = fio_netio_setup_listen(td, port);
409 ret = fio_netio_setup_connect(td, host, port);
414 log_err("fio: bad network host/port: %s\n", td->o.filename);
418 static void fio_netio_cleanup(struct thread_data *td)
420 struct netio_data *nd = td->io_ops->data;
423 if (nd->listenfd != -1)
425 if (nd->pipes[0] != -1)
427 if (nd->pipes[1] != -1)
434 static int fio_netio_setup(struct thread_data *td)
436 struct netio_data *nd;
438 if (!td->io_ops->data) {
439 nd = malloc(sizeof(*nd));;
441 memset(nd, 0, sizeof(*nd));
443 nd->pipes[0] = nd->pipes[1] = -1;
444 td->io_ops->data = nd;
450 #ifdef FIO_HAVE_SPLICE
451 static int fio_netio_setup_splice(struct thread_data *td)
453 struct netio_data *nd;
457 nd = td->io_ops->data;
459 if (pipe(nd->pipes) < 0)
469 static struct ioengine_ops ioengine_splice = {
471 .version = FIO_IOOPS_VERSION,
472 .prep = fio_netio_prep,
473 .queue = fio_netio_queue,
474 .setup = fio_netio_setup_splice,
475 .init = fio_netio_init,
476 .cleanup = fio_netio_cleanup,
477 .open_file = fio_netio_open_file,
478 .close_file = generic_close_file,
479 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
484 static struct ioengine_ops ioengine_rw = {
486 .version = FIO_IOOPS_VERSION,
487 .prep = fio_netio_prep,
488 .queue = fio_netio_queue,
489 .setup = fio_netio_setup,
490 .init = fio_netio_init,
491 .cleanup = fio_netio_cleanup,
492 .open_file = fio_netio_open_file,
493 .close_file = generic_close_file,
494 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
498 static void fio_init fio_netio_register(void)
500 register_ioengine(&ioengine_rw);
501 #ifdef FIO_HAVE_SPLICE
502 register_ioengine(&ioengine_splice);
506 static void fio_exit fio_netio_unregister(void)
508 unregister_ioengine(&ioengine_rw);
509 #ifdef FIO_HAVE_SPLICE
510 unregister_ioengine(&ioengine_splice);