4 * IO engine that reads/writes to/from sockets.
12 #include <netinet/in.h>
13 #include <arpa/inet.h>
23 struct sockaddr_in addr;
26 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
28 struct netio_data *nd = td->io_ops->data;
29 struct fio_file *f = io_u->file;
32 * Make sure we don't see spurious reads to a receiver, and vice versa
34 if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
35 (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
36 td_verror(td, EINVAL, "bad direction");
40 if (io_u->ddir == DDIR_SYNC)
42 if (io_u->offset == f->last_completed_pos)
46 * If offset is different from last end position, it's a seek.
47 * As network io is purely sequential, we don't allow seeks.
49 td_verror(td, EINVAL, "cannot seek");
53 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
55 struct fio_file *f = io_u->file;
58 if (io_u->ddir == DDIR_WRITE) {
60 * if we are going to write more, set MSG_MORE
62 if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
66 ret = send(f->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
67 } else if (io_u->ddir == DDIR_READ) {
69 ret = recv(f->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
71 ret = 0; /* must be a SYNC */
73 if (ret != (int) io_u->xfer_buflen) {
75 io_u->resid = io_u->xfer_buflen - ret;
77 return FIO_Q_COMPLETED;
83 td_verror(td, io_u->error, "xfer");
85 return FIO_Q_COMPLETED;
88 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
90 struct netio_data *nd = td->io_ops->data;
92 f->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
94 td_verror(td, errno, "socket");
98 if (connect(f->fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
99 td_verror(td, errno, "connect");
106 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
108 struct netio_data *nd = td->io_ops->data;
109 socklen_t socklen = sizeof(nd->addr);
113 log_info("fio: waiting for connection\n");
116 * Accept loop. poll for incoming events, accept them. Repeat until we
117 * have all connections.
119 while (!td->terminate) {
120 pfd.fd = nd->listenfd;
123 ret = poll(&pfd, 1, -1);
128 td_verror(td, errno, "poll");
134 * should be impossible
136 if (!(pfd.revents & POLLIN))
139 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
141 td_verror(td, errno, "accept");
151 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
154 return fio_netio_accept(td, f);
156 return fio_netio_connect(td, f);
159 static int fio_netio_setup_connect(struct thread_data *td, const char *host,
162 struct netio_data *nd = td->io_ops->data;
164 nd->addr.sin_family = AF_INET;
165 nd->addr.sin_port = htons(port);
167 if (inet_aton(host, &nd->addr.sin_addr) != 1) {
168 struct hostent *hent;
170 hent = gethostbyname(host);
172 td_verror(td, errno, "gethostbyname");
176 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
182 static int fio_netio_setup_listen(struct thread_data *td, short port)
184 struct netio_data *nd = td->io_ops->data;
187 fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
189 td_verror(td, errno, "socket");
194 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
195 td_verror(td, errno, "setsockopt");
199 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
200 td_verror(td, errno, "setsockopt");
205 nd->addr.sin_family = AF_INET;
206 nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
207 nd->addr.sin_port = htons(port);
209 if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
210 td_verror(td, errno, "bind");
213 if (listen(fd, 1) < 0) {
214 td_verror(td, errno, "listen");
222 static int fio_netio_init(struct thread_data *td)
224 struct netio_data *nd = td->io_ops->data;
226 char host[64], buf[128];
231 log_err("fio: network connections must be read OR write\n");
235 log_err("fio: network IO can't be random\n");
239 strcpy(buf, td->o.filename);
241 sep = strchr(buf, '/');
243 log_err("fio: bad network host/port <<%s>>\n", td->o.filename);
254 ret = fio_netio_setup_listen(td, port);
257 ret = fio_netio_setup_connect(td, host, port);
263 static void fio_netio_cleanup(struct thread_data *td)
265 struct netio_data *nd = td->io_ops->data;
269 td->io_ops->data = NULL;
273 static int fio_netio_setup(struct thread_data *td)
275 struct netio_data *nd;
277 if (!td->io_ops->data) {
278 nd = malloc(sizeof(*nd));;
280 memset(nd, 0, sizeof(*nd));
282 td->io_ops->data = nd;
288 static struct ioengine_ops ioengine = {
290 .version = FIO_IOOPS_VERSION,
291 .prep = fio_netio_prep,
292 .queue = fio_netio_queue,
293 .setup = fio_netio_setup,
294 .init = fio_netio_init,
295 .cleanup = fio_netio_cleanup,
296 .open_file = fio_netio_open_file,
297 .close_file = generic_close_file,
298 .flags = FIO_SYNCIO | FIO_DISKLESSIO,
301 static void fio_init fio_netio_register(void)
303 register_ioengine(&ioengine);
306 static void fio_exit fio_netio_unregister(void)
308 unregister_ioengine(&ioengine);