static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
{
struct netio_data *nd = td->io_ops->data;
- struct fio_file *f = io_u->file;
/*
* Make sure we don't see spurious reads to a receiver, and vice versa
return 1;
}
- if (io_u->ddir == DDIR_SYNC)
- return 0;
- if (io_u->offset == f->last_completed_pos)
- return 0;
-
- /*
- * If offset is different from last end position, it's a seek.
- * As network io is purely sequential, we don't allow seeks.
- */
- td_verror(td, EINVAL, "cannot seek");
- return 1;
+ return 0;
}
-/*
- * Receive bytes from a socket and fill them into the internal pipe
- */
-static int splice_in(struct thread_data *td, struct io_u *io_u)
+static int splice_io_u(int fdin, int fdout, unsigned int len)
{
- struct netio_data *nd = td->io_ops->data;
- unsigned int len = io_u->xfer_buflen;
- struct fio_file *f = io_u->file;
int bytes = 0;
while (len) {
- int ret = splice(nd->pipes[1], NULL, f->fd, NULL, len, 0);
+ int ret = splice(fdin, NULL, fdout, NULL, len, 0);
if (ret < 0) {
if (!bytes)
break;
bytes += ret;
+ len -= ret;
}
return bytes;
}
+/*
+ * Receive bytes from a socket and fill them into the internal pipe
+ */
+static int splice_in(struct thread_data *td, struct io_u *io_u)
+{
+ struct netio_data *nd = td->io_ops->data;
+
+ return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
+}
+
/*
* Transmit 'len' bytes from the internal pipe
*/
unsigned int len)
{
struct netio_data *nd = td->io_ops->data;
- struct fio_file *f = io_u->file;
+
+ return splice_io_u(nd->pipes[0], io_u->file->fd, len);
+}
+
+static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
+{
+ struct iovec iov = {
+ .iov_base = io_u->xfer_buf,
+ .iov_len = len,
+ };
int bytes = 0;
- while (len) {
- int ret = splice(nd->pipes[0], NULL, f->fd, NULL, len, 0);
+ while (iov.iov_len) {
+ int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
if (ret < 0) {
if (!bytes)
bytes = ret;
-
break;
} else if (!ret)
break;
+ iov.iov_len -= ret;
+ iov.iov_base += ret;
bytes += ret;
- len -= ret;
}
return bytes;
+
}
/*
unsigned int len)
{
struct netio_data *nd = td->io_ops->data;
- struct iovec iov = {
- .iov_base = io_u->xfer_buf,
- .iov_len = len,
- };
- int bytes = 0;
-
- while (iov.iov_len) {
- int ret = vmsplice(nd->pipes[0], &iov, 1, 0);
-
- if (ret < 0) {
- if (!bytes)
- bytes = ret;
- break;
- } else if (!ret)
- break;
- iov.iov_len -= ret;
- if (iov.iov_len)
- iov.iov_base += ret;
- }
-
- return bytes;
+ return vmsplice_io_u(io_u, nd->pipes[0], len);
}
/*
static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
{
struct netio_data *nd = td->io_ops->data;
- struct iovec iov = {
- .iov_base = io_u->xfer_buf,
- .iov_len = io_u->xfer_buflen,
- };
- unsigned int bytes = 0;
-
- while (iov.iov_len) {
- int ret = vmsplice(nd->pipes[1], &iov, 1, 0);
- if (ret < 0)
- return -1;
- else if (!ret)
- return bytes;
-
- iov.iov_len -= ret;
- bytes += ret;
- if (iov.iov_len)
- iov.iov_base += ret;
- }
-
- return bytes;
+ return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
}
+/*
+ * splice receive - transfer socket data into a pipe using splice, then map
+ * that pipe data into the io_u using vmsplice.
+ */
static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
{
int ret;
ret = splice_in(td, io_u);
- if (ret <= 0)
- return ret;
+ if (ret > 0)
+ return vmsplice_io_u_out(td, io_u, ret);
- return vmsplice_io_u_out(td, io_u, ret);
+ return ret;
}
+/*
+ * splice transmit - map data from the io_u into a pipe by using vmsplice,
+ * then transfer that pipe to a socket using splice.
+ */
static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
{
int ret;
ret = vmsplice_io_u_in(td, io_u);
- if (ret <= 0)
- return ret;
+ if (ret > 0)
+ return splice_out(td, io_u, ret);
- return splice_out(td, io_u, ret);
+ return ret;
}
static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
struct netio_data *nd = td->io_ops->data;
int ret;
+ fio_ro_check(td, io_u);
+
if (io_u->ddir == DDIR_WRITE) {
if (nd->use_splice)
ret = fio_netio_splice_out(td, io_u);
return 0;
}
-
static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
{
if (td_read(td))
struct netio_data *nd = td->io_ops->data;
if (nd) {
+ if (nd->listenfd != -1)
+ close(nd->listenfd);
+ if (nd->pipes[0] != -1)
+ close(nd->pipes[0]);
+ if (nd->pipes[1] != -1)
+ close(nd->pipes[1]);
+
free(nd);
td->io_ops->data = NULL;
}
memset(nd, 0, sizeof(*nd));
nd->listenfd = -1;
+ nd->pipes[0] = nd->pipes[1] = -1;
td->io_ops->data = nd;
}
.cleanup = fio_netio_cleanup,
.open_file = fio_netio_open_file,
.close_file = generic_close_file,
- .flags = FIO_SYNCIO | FIO_DISKLESSIO,
+ .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
+ FIO_SIGQUIT,
};
static struct ioengine_ops ioengine_splice = {
.cleanup = fio_netio_cleanup,
.open_file = fio_netio_open_file,
.close_file = generic_close_file,
- .flags = FIO_SYNCIO | FIO_DISKLESSIO,
+ .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
+ FIO_SIGQUIT,
};
static void fio_init fio_netio_register(void)