2 * Transfer data over the net.
9 #include <netinet/in.h>
10 #include <arpa/inet.h>
21 struct sockaddr_in addr;
/*
 * Validate an io_u before it is queued on the network engine.
 *
 * td:   thread whose io_ops->data holds the netio_data for this engine
 * io_u: the I/O unit to check; its ddir, offset and file are read
 *
 * A read on a sending engine (nd->send_to_net set) or a write on a
 * receiving engine is reported through td_verror as EINVAL, bad direction.
 * DDIR_SYNC requests and requests whose offset equals the file
 * last_completed_pos are tested before the seek rejection; anything
 * reaching the final td_verror is reported as EINVAL, cannot seek.
 *
 * NOTE(review): the return statements are not visible in this excerpt,
 * so the exact return values are not documented here.
 */
24 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
26 struct netio_data *nd = td->io_ops->data;
27 struct fio_file *f = io_u->file;
30 * Make sure we don't see spurious reads to a receiver, and vice versa
32 if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
33 (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
34 td_verror(td, EINVAL, "bad direction");
38 if (io_u->ddir == DDIR_SYNC)
40 if (io_u->offset == f->last_completed_pos)
44 * If offset is different from last end position, it's a seek.
45 * As network io is purely sequential, we don't allow seeks.
47 td_verror(td, EINVAL, "cannot seek");
/*
 * Perform the transfer for one io_u on the socket io_u->file->fd.
 *
 * td:   owning thread; this_io_bytes[DDIR_WRITE] is read for the
 *       MSG_MORE decision and td_verror is used for error reporting
 * io_u: supplies direction, buffer (xfer_buf) and length (xfer_buflen);
 *       resid is written on a length mismatch
 *
 * DDIR_WRITE goes through send(), DDIR_READ through recv(), and any
 * other direction sets ret to 0 (treated as a sync). Both visible
 * return statements return FIO_Q_COMPLETED.
 *
 * NOTE(review): the right-hand side of the MSG_MORE size comparison and
 * the assignments to flags are not visible in this excerpt.
 * NOTE(review): the resid computation subtracts ret from xfer_buflen;
 * confirm it is only reached when ret is non-negative, since the guard
 * is not visible here.
 */
51 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
53 struct fio_file *f = io_u->file;
56 if (io_u->ddir == DDIR_WRITE) {
58 * if we are going to write more, set MSG_MORE
60 if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
64 ret = send(f->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
65 } else if (io_u->ddir == DDIR_READ) {
67 ret = recv(f->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
69 ret = 0; /* must be a SYNC */
/* Anything other than a full-length transfer is handled below. */
71 if (ret != (int) io_u->xfer_buflen) {
/* Record how many bytes of the request were not transferred. */
73 io_u->resid = io_u->xfer_buflen - ret;
75 return FIO_Q_COMPLETED;
81 td_verror(td, io_u->error, "xfer");
83 return FIO_Q_COMPLETED;
/*
 * Open an outgoing connection for file f.
 *
 * Creates an AF_INET / SOCK_STREAM / IPPROTO_TCP socket, stores the
 * descriptor in f->fd, and connects it to the address held in nd->addr.
 * Failures of socket() and connect() are reported through td_verror
 * with the current errno.
 *
 * NOTE(review): the socket failure test, the return statements, and any
 * cleanup of f->fd after a failed connect() are not visible in this
 * excerpt.
 */
86 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
88 struct netio_data *nd = td->io_ops->data;
90 f->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
92 td_verror(td, errno, "socket");
96 if (connect(f->fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
97 td_verror(td, errno, "connect");
/*
 * Wait for an incoming connection on nd->listenfd and accept it into
 * f->fd.
 *
 * Prints a waiting message to f_out, then loops for as long as
 * td->terminate is clear. Each iteration polls the listening descriptor
 * with an infinite timeout (-1) and, once POLLIN is reported, calls
 * accept(). accept() writes the peer address into nd->addr, replacing
 * whatever address was stored there, with socklen initialised to
 * sizeof(nd->addr). poll() and accept() failures are reported through
 * td_verror with errno.
 *
 * NOTE(review): the declaration of pfd, the events mask, the loop exit
 * and the return statements are not visible in this excerpt.
 */
104 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
106 struct netio_data *nd = td->io_ops->data;
107 socklen_t socklen = sizeof(nd->addr);
111 fprintf(f_out, "fio: waiting for connection\n");
114 * Accept loop. poll for incoming events, accept them. Repeat until we
115 * have all connections.
117 while (!td->terminate) {
118 pfd.fd = nd->listenfd;
/* Block on the single listening descriptor with no timeout. */
121 ret = poll(&pfd, 1, -1);
126 td_verror(td, errno, "poll");
132 * should be impossible
134 if (!(pfd.revents & POLLIN))
137 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
139 td_verror(td, errno, "accept");
/*
 * open_file hook: establish the connection backing file f.
 *
 * Returns the result of either fio_netio_accept() or
 * fio_netio_connect().
 *
 * NOTE(review): the condition choosing between the two is not visible
 * in this excerpt; presumably it depends on whether this side receives
 * or sends - confirm against the full source.
 */
149 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
152 return fio_netio_accept(td, f);
154 return fio_netio_connect(td, f);
/*
 * Fill in nd->addr as the destination for an outgoing connection.
 *
 * td:   thread whose io_ops->data holds the netio_data to fill in
 * host: dotted-quad address or host name
 * port: passed through htons() into sin_port (its declared type is on a
 *       parameter line not visible in this excerpt)
 *
 * The address family is set to AF_INET. The host string is first parsed
 * with inet_aton(); when that does not return 1 the name is resolved
 * with gethostbyname() and 4 bytes of hent->h_addr are copied into
 * sin_addr. A resolver failure is reported through td_verror.
 *
 * NOTE(review): gethostbyname() is documented to report failures via
 * h_errno, not errno, so the errno passed to td_verror may not describe
 * the resolver failure - confirm.
 * NOTE(review): the copy length is a literal 4 rather than
 * hent->h_length or sizeof(sin_addr).
 */
157 static int fio_netio_setup_connect(struct thread_data *td, const char *host,
160 struct netio_data *nd = td->io_ops->data;
162 nd->addr.sin_family = AF_INET;
163 nd->addr.sin_port = htons(port);
165 if (inet_aton(host, &nd->addr.sin_addr) != 1) {
166 struct hostent *hent;
168 hent = gethostbyname(host);
170 td_verror(td, errno, "gethostbyname");
174 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
/*
 * Create the listening socket for the receiving side.
 *
 * td:   thread whose io_ops->data holds the netio_data; nd->addr is
 *       filled with the local bind address
 * port: local TCP port, converted with htons()
 *
 * Steps visible here: create an AF_INET / SOCK_STREAM / IPPROTO_TCP
 * socket, set SO_REUSEADDR and SO_REUSEPORT from opt, bind to
 * INADDR_ANY on the given port, and listen with a backlog of 1. Every
 * failing call is reported through td_verror with errno.
 *
 * NOTE(review): the declaration and value of opt, the place where fd is
 * stored (presumably nd->listenfd), and the return statements are not
 * visible in this excerpt.
 * NOTE(review): port is declared short, so port numbers above 32767
 * reach this function as negative values before htons() - confirm the
 * caller and the conversion behave as intended.
 */
180 static int fio_netio_setup_listen(struct thread_data *td, short port)
182 struct netio_data *nd = td->io_ops->data;
185 fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
187 td_verror(td, errno, "socket");
192 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
193 td_verror(td, errno, "setsockopt");
197 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
198 td_verror(td, errno, "setsockopt");
/* Bind to every local interface on the requested port. */
203 nd->addr.sin_family = AF_INET;
204 nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
205 nd->addr.sin_port = htons(port);
207 if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
208 td_verror(td, errno, "bind");
211 if (listen(fd, 1) < 0) {
212 td_verror(td, errno, "listen");
/*
 * init hook: validate the job settings and prepare the network endpoint.
 *
 * Visible behaviour:
 *  - a zero td->total_file_size is rejected with a log_err message
 *    asking for size= to be set;
 *  - a second log_err states that a connection must be read OR write
 *    (the condition triggering it is not visible in this excerpt);
 *  - td->filename is copied into the local buf and searched for a
 *    slash separator; a log_err reports a bad host/port (the test that
 *    guards it is not visible);
 *  - either fio_netio_setup_listen() or fio_netio_setup_connect() is
 *    called with the parsed port (and host for the connect case);
 *  - io_size and total_io_size are set to total_file_size, and every
 *    file gets total_file_size / nr_files as both file_size and
 *    real_file_size.
 *
 * NOTE(review): buf is 128 bytes and is filled with strcpy() with no
 * length check visible in this excerpt; a longer td->filename would
 * overflow it - confirm whether the length is bounded elsewhere.
 * NOTE(review): how host and port are extracted from buf, and the
 * return statements, are not visible here.
 */
220 static int fio_netio_init(struct thread_data *td)
222 struct netio_data *nd = td->io_ops->data;
225 char host[64], buf[128];
230 if (!td->total_file_size) {
231 log_err("fio: need size= set\n");
236 log_err("fio: network connections must be read OR write\n");
240 strcpy(buf, td->filename);
242 sep = strchr(buf, '/');
244 log_err("fio: bad network host/port <<%s>>\n", td->filename);
255 ret = fio_netio_setup_listen(td, port);
258 ret = fio_netio_setup_connect(td, host, port);
264 td->io_size = td->total_file_size;
265 td->total_io_size = td->io_size;
/* Split the total size evenly across all files of the job. */
267 for_each_file(td, f, i) {
268 f->file_size = td->total_file_size / td->nr_files;
269 f->real_file_size = f->file_size;
/*
 * cleanup hook: detach the engine private data from the thread.
 *
 * Reads the netio_data pointer from td->io_ops->data and resets that
 * slot to NULL.
 *
 * NOTE(review): the statements between these two lines (presumably the
 * release of nd) are not visible in this excerpt - confirm.
 */
275 static void fio_netio_cleanup(struct thread_data *td)
277 struct netio_data *nd = td->io_ops->data;
281 td->io_ops->data = NULL;
/*
 * setup hook: allocate the engine private data.
 *
 * Allocates one netio_data with malloc(), zero-fills it, and stores the
 * pointer in td->io_ops->data for the other hooks to use.
 *
 * NOTE(review): no NULL check on the malloc() result is visible before
 * memset() in this excerpt; the return statement is also not visible.
 */
285 static int fio_netio_setup(struct thread_data *td)
287 struct netio_data *nd = malloc(sizeof(*nd));
289 memset(nd, 0, sizeof(*nd));
291 td->io_ops->data = nd;
/*
 * Operations table describing this engine to the core.
 *
 * Wires the prep, queue, setup, init, cleanup and open_file hooks to
 * the functions defined in this file; close_file uses
 * generic_close_file. The flags field combines FIO_SYNCIO and
 * FIO_DISKLESSIO.
 *
 * NOTE(review): the engine name field and the closing brace are not
 * visible in this excerpt.
 */
295 static struct ioengine_ops ioengine = {
297 .version = FIO_IOOPS_VERSION,
298 .prep = fio_netio_prep,
299 .queue = fio_netio_queue,
300 .setup = fio_netio_setup,
301 .init = fio_netio_init,
302 .cleanup = fio_netio_cleanup,
303 .open_file = fio_netio_open_file,
304 .close_file = generic_close_file,
305 .flags = FIO_SYNCIO | FIO_DISKLESSIO,
/*
 * Register the ioengine ops table with the core via register_ioengine().
 *
 * NOTE(review): fio_init is defined outside this excerpt; presumably it
 * marks the function to run automatically at load time - confirm.
 */
308 static void fio_init fio_netio_register(void)
310 register_ioengine(&ioengine);
/*
 * Remove the ioengine ops table from the core via unregister_ioengine().
 *
 * NOTE(review): fio_exit is defined outside this excerpt; presumably it
 * marks the function to run automatically at exit time - confirm.
 */
313 static void fio_exit fio_netio_unregister(void)
315 unregister_ioengine(&ioengine);