Fix bug with random IO and network connections
[fio.git] / engines / net.c
CommitLineData
ed92ac0c 1/*
da751ca9
JA
2 * net engine
3 *
4 * IO engine that reads/writes to/from sockets.
5 *
ed92ac0c
JA
6 */
7#include <stdio.h>
8#include <stdlib.h>
9#include <unistd.h>
10#include <errno.h>
11#include <assert.h>
12#include <netinet/in.h>
13#include <arpa/inet.h>
14#include <netdb.h>
5fdd124a 15#include <sys/poll.h>
ed92ac0c
JA
16
17#include "../fio.h"
ed92ac0c 18
b5af8293
JA
19struct netio_data {
20 int listenfd;
21 int send_to_net;
22 char host[64];
23 struct sockaddr_in addr;
24};
ed92ac0c
JA
25
26static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
27{
b5af8293 28 struct netio_data *nd = td->io_ops->data;
ed92ac0c
JA
29 struct fio_file *f = io_u->file;
30
7a6499da
JA
31 /*
32 * Make sure we don't see spurious reads to a receiver, and vice versa
33 */
b5af8293
JA
34 if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
35 (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
e1161c32 36 td_verror(td, EINVAL, "bad direction");
7a6499da 37 return 1;
ed92ac0c 38 }
7a6499da 39
ed92ac0c
JA
40 if (io_u->ddir == DDIR_SYNC)
41 return 0;
42 if (io_u->offset == f->last_completed_pos)
43 return 0;
44
e01547d2
JA
45 /*
46 * If offset is different from last end position, it's a seek.
47 * As network io is purely sequential, we don't allow seeks.
48 */
e1161c32 49 td_verror(td, EINVAL, "cannot seek");
ed92ac0c
JA
50 return 1;
51}
52
53static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
54{
ed92ac0c 55 struct fio_file *f = io_u->file;
d4f12dd0 56 int ret, flags = 0;
7a6499da
JA
57
58 if (io_u->ddir == DDIR_WRITE) {
7a6499da
JA
59 /*
60 * if we are going to write more, set MSG_MORE
61 */
62 if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
7bb48f84 63 td->o.size)
7a6499da 64 flags = MSG_MORE;
ed92ac0c 65
7a6499da 66 ret = send(f->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
d4f12dd0
JA
67 } else if (io_u->ddir == DDIR_READ) {
68 flags = MSG_WAITALL;
69 ret = recv(f->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
70 } else
7a6499da 71 ret = 0; /* must be a SYNC */
ed92ac0c 72
cec6b55d 73 if (ret != (int) io_u->xfer_buflen) {
22819ec2 74 if (ret >= 0) {
cec6b55d
JA
75 io_u->resid = io_u->xfer_buflen - ret;
76 io_u->error = 0;
36167d82 77 return FIO_Q_COMPLETED;
ed92ac0c
JA
78 } else
79 io_u->error = errno;
80 }
81
36167d82 82 if (io_u->error)
e1161c32 83 td_verror(td, io_u->error, "xfer");
ed92ac0c 84
36167d82 85 return FIO_Q_COMPLETED;
ed92ac0c
JA
86}
87
b5af8293 88static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
ed92ac0c 89{
b5af8293 90 struct netio_data *nd = td->io_ops->data;
ed92ac0c 91
b5af8293
JA
92 f->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
93 if (f->fd < 0) {
94 td_verror(td, errno, "socket");
95 return 1;
ed92ac0c
JA
96 }
97
b5af8293
JA
98 if (connect(f->fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
99 td_verror(td, errno, "connect");
100 return 1;
ed92ac0c
JA
101 }
102
103 return 0;
ed92ac0c
JA
104}
105
b5af8293 106static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
5fdd124a 107{
b5af8293
JA
108 struct netio_data *nd = td->io_ops->data;
109 socklen_t socklen = sizeof(nd->addr);
5fdd124a 110 struct pollfd pfd;
b5af8293 111 int ret;
5fdd124a 112
6d86144d 113 log_info("fio: waiting for connection\n");
5fdd124a
JA
114
115 /*
116 * Accept loop. poll for incoming events, accept them. Repeat until we
117 * have all connections.
118 */
b5af8293
JA
119 while (!td->terminate) {
120 pfd.fd = nd->listenfd;
5fdd124a
JA
121 pfd.events = POLLIN;
122
123 ret = poll(&pfd, 1, -1);
124 if (ret < 0) {
125 if (errno == EINTR)
126 continue;
127
e1161c32 128 td_verror(td, errno, "poll");
5fdd124a
JA
129 break;
130 } else if (!ret)
131 continue;
132
0c09442b
JA
133 /*
134 * should be impossible
135 */
136 if (!(pfd.revents & POLLIN))
137 continue;
138
b5af8293
JA
139 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
140 if (f->fd < 0) {
141 td_verror(td, errno, "accept");
142 return 1;
143 }
144 break;
145 }
5fdd124a 146
b5af8293
JA
147 return 0;
148}
149
150
151static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
152{
153 if (td_read(td))
154 return fio_netio_accept(td, f);
155 else
156 return fio_netio_connect(td, f);
157}
158
159static int fio_netio_setup_connect(struct thread_data *td, const char *host,
160 unsigned short port)
161{
162 struct netio_data *nd = td->io_ops->data;
163
164 nd->addr.sin_family = AF_INET;
165 nd->addr.sin_port = htons(port);
166
167 if (inet_aton(host, &nd->addr.sin_addr) != 1) {
168 struct hostent *hent;
169
170 hent = gethostbyname(host);
171 if (!hent) {
172 td_verror(td, errno, "gethostbyname");
173 return 1;
5fdd124a 174 }
b5af8293
JA
175
176 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
5fdd124a
JA
177 }
178
179 return 0;
180}
181
b5af8293 182static int fio_netio_setup_listen(struct thread_data *td, short port)
ed92ac0c 183{
b5af8293 184 struct netio_data *nd = td->io_ops->data;
5fdd124a 185 int fd, opt;
ed92ac0c 186
6bedbfaf 187 fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
ed92ac0c 188 if (fd < 0) {
e1161c32 189 td_verror(td, errno, "socket");
ed92ac0c
JA
190 return 1;
191 }
192
193 opt = 1;
194 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
e1161c32 195 td_verror(td, errno, "setsockopt");
ed92ac0c
JA
196 return 1;
197 }
6bedbfaf
JA
198#ifdef SO_REUSEPORT
199 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
e1161c32 200 td_verror(td, errno, "setsockopt");
6bedbfaf
JA
201 return 1;
202 }
203#endif
ed92ac0c 204
b5af8293
JA
205 nd->addr.sin_family = AF_INET;
206 nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
207 nd->addr.sin_port = htons(port);
ed92ac0c 208
b5af8293 209 if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
e1161c32 210 td_verror(td, errno, "bind");
ed92ac0c
JA
211 return 1;
212 }
213 if (listen(fd, 1) < 0) {
e1161c32 214 td_verror(td, errno, "listen");
ed92ac0c
JA
215 return 1;
216 }
217
b5af8293
JA
218 nd->listenfd = fd;
219 return 0;
ed92ac0c
JA
220}
221
9bec88e1 222static int fio_netio_init(struct thread_data *td)
ed92ac0c 223{
b5af8293 224 struct netio_data *nd = td->io_ops->data;
e01547d2 225 unsigned short port;
b5af8293 226 char host[64], buf[128];
ed92ac0c 227 char *sep;
af52b345 228 int ret;
ed92ac0c 229
413dd459 230 if (td_rw(td)) {
ed92ac0c
JA
231 log_err("fio: network connections must be read OR write\n");
232 return 1;
233 }
16d55aae
JA
234 if (td_random(td)) {
235 log_err("fio: network IO can't be random\n");
236 return 1;
237 }
ed92ac0c 238
2dc1bbeb 239 strcpy(buf, td->o.filename);
ed92ac0c 240
9f9214f2 241 sep = strchr(buf, '/');
ed92ac0c 242 if (!sep) {
2dc1bbeb 243 log_err("fio: bad network host/port <<%s>>\n", td->o.filename);
ed92ac0c
JA
244 return 1;
245 }
246
247 *sep = '\0';
248 sep++;
249 strcpy(host, buf);
e01547d2 250 port = atoi(sep);
ed92ac0c 251
413dd459 252 if (td_read(td)) {
b5af8293 253 nd->send_to_net = 0;
ed92ac0c
JA
254 ret = fio_netio_setup_listen(td, port);
255 } else {
b5af8293 256 nd->send_to_net = 1;
ed92ac0c
JA
257 ret = fio_netio_setup_connect(td, host, port);
258 }
259
7bb48f84 260 return ret;
ed92ac0c
JA
261}
262
b5af8293 263static void fio_netio_cleanup(struct thread_data *td)
9bec88e1 264{
b5af8293
JA
265 struct netio_data *nd = td->io_ops->data;
266
267 if (nd) {
268 free(nd);
269 td->io_ops->data = NULL;
270 }
271}
272
273static int fio_netio_setup(struct thread_data *td)
274{
7bb48f84 275 struct netio_data *nd;
7bb48f84
JA
276
277 if (!td->io_ops->data) {
278 nd = malloc(sizeof(*nd));;
279
280 memset(nd, 0, sizeof(*nd));
281 nd->listenfd = -1;
282 td->io_ops->data = nd;
7bb48f84 283 }
b5af8293 284
9bec88e1
JA
285 return 0;
286}
287
ed92ac0c
JA
288static struct ioengine_ops ioengine = {
289 .name = "net",
290 .version = FIO_IOOPS_VERSION,
ed92ac0c
JA
291 .prep = fio_netio_prep,
292 .queue = fio_netio_queue,
ed92ac0c 293 .setup = fio_netio_setup,
9bec88e1 294 .init = fio_netio_init,
b5af8293
JA
295 .cleanup = fio_netio_cleanup,
296 .open_file = fio_netio_open_file,
297 .close_file = generic_close_file,
298 .flags = FIO_SYNCIO | FIO_DISKLESSIO,
ed92ac0c
JA
299};
300
301static void fio_init fio_netio_register(void)
302{
303 register_ioengine(&ioengine);
304}
305
306static void fio_exit fio_netio_unregister(void)
307{
308 unregister_ioengine(&ioengine);
309}