Move os/arch/compiler headers into directories
[fio.git] / engines / net.c
... / ...
CommitLineData
1/*
2 * net engine
3 *
4 * IO engine that reads/writes to/from sockets.
5 *
6 */
7#include <stdio.h>
8#include <stdlib.h>
9#include <unistd.h>
10#include <errno.h>
11#include <assert.h>
12#include <netinet/in.h>
13#include <arpa/inet.h>
14#include <netdb.h>
15#include <sys/poll.h>
16
17#include "../fio.h"
18
/*
 * Per-thread private state of the net engine, hung off td->io_ops->data.
 */
struct netio_data {
	int listenfd;			/* listening socket, -1 while unused */
	int send_to_net;		/* 1: we send (writer), 0: we receive */
	char host[64];			/* not referenced by the code in this file */
	struct sockaddr_in addr;	/* peer address (connect) or local/peer address (listen/accept) */
};
25
26static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
27{
28 struct netio_data *nd = td->io_ops->data;
29 struct fio_file *f = io_u->file;
30
31 /*
32 * Make sure we don't see spurious reads to a receiver, and vice versa
33 */
34 if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
35 (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
36 td_verror(td, EINVAL, "bad direction");
37 return 1;
38 }
39
40 if (io_u->ddir == DDIR_SYNC)
41 return 0;
42 if (io_u->offset == f->last_completed_pos)
43 return 0;
44
45 /*
46 * If offset is different from last end position, it's a seek.
47 * As network io is purely sequential, we don't allow seeks.
48 */
49 td_verror(td, EINVAL, "cannot seek");
50 return 1;
51}
52
53static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
54{
55 struct fio_file *f = io_u->file;
56 int ret, flags = 0;
57
58 if (io_u->ddir == DDIR_WRITE) {
59 /*
60 * if we are going to write more, set MSG_MORE
61 */
62 if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
63 td->o.size)
64 flags = MSG_MORE;
65
66 ret = send(f->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
67 } else if (io_u->ddir == DDIR_READ) {
68 flags = MSG_WAITALL;
69 ret = recv(f->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
70 } else
71 ret = 0; /* must be a SYNC */
72
73 if (ret != (int) io_u->xfer_buflen) {
74 if (ret >= 0) {
75 io_u->resid = io_u->xfer_buflen - ret;
76 io_u->error = 0;
77 return FIO_Q_COMPLETED;
78 } else
79 io_u->error = errno;
80 }
81
82 if (io_u->error)
83 td_verror(td, io_u->error, "xfer");
84
85 return FIO_Q_COMPLETED;
86}
87
88static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
89{
90 struct netio_data *nd = td->io_ops->data;
91
92 f->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
93 if (f->fd < 0) {
94 td_verror(td, errno, "socket");
95 return 1;
96 }
97
98 if (connect(f->fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
99 td_verror(td, errno, "connect");
100 return 1;
101 }
102
103 return 0;
104}
105
106static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
107{
108 struct netio_data *nd = td->io_ops->data;
109 socklen_t socklen = sizeof(nd->addr);
110 struct pollfd pfd;
111 int ret;
112
113 log_info("fio: waiting for connection\n");
114
115 /*
116 * Accept loop. poll for incoming events, accept them. Repeat until we
117 * have all connections.
118 */
119 while (!td->terminate) {
120 pfd.fd = nd->listenfd;
121 pfd.events = POLLIN;
122
123 ret = poll(&pfd, 1, -1);
124 if (ret < 0) {
125 if (errno == EINTR)
126 continue;
127
128 td_verror(td, errno, "poll");
129 break;
130 } else if (!ret)
131 continue;
132
133 /*
134 * should be impossible
135 */
136 if (!(pfd.revents & POLLIN))
137 continue;
138
139 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
140 if (f->fd < 0) {
141 td_verror(td, errno, "accept");
142 return 1;
143 }
144 break;
145 }
146
147 return 0;
148}
149
150
/*
 * open_file hook: a reading job waits for an incoming connection, any
 * other job connects out to the peer. Returns what the chosen helper
 * returns (0 on success, 1 on failure).
 */
static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
{
	if (!td_read(td))
		return fio_netio_connect(td, f);

	return fio_netio_accept(td, f);
}
158
159static int fio_netio_setup_connect(struct thread_data *td, const char *host,
160 unsigned short port)
161{
162 struct netio_data *nd = td->io_ops->data;
163
164 nd->addr.sin_family = AF_INET;
165 nd->addr.sin_port = htons(port);
166
167 if (inet_aton(host, &nd->addr.sin_addr) != 1) {
168 struct hostent *hent;
169
170 hent = gethostbyname(host);
171 if (!hent) {
172 td_verror(td, errno, "gethostbyname");
173 return 1;
174 }
175
176 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
177 }
178
179 return 0;
180}
181
182static int fio_netio_setup_listen(struct thread_data *td, short port)
183{
184 struct netio_data *nd = td->io_ops->data;
185 int fd, opt;
186
187 fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
188 if (fd < 0) {
189 td_verror(td, errno, "socket");
190 return 1;
191 }
192
193 opt = 1;
194 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
195 td_verror(td, errno, "setsockopt");
196 return 1;
197 }
198#ifdef SO_REUSEPORT
199 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
200 td_verror(td, errno, "setsockopt");
201 return 1;
202 }
203#endif
204
205 nd->addr.sin_family = AF_INET;
206 nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
207 nd->addr.sin_port = htons(port);
208
209 if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
210 td_verror(td, errno, "bind");
211 return 1;
212 }
213 if (listen(fd, 1) < 0) {
214 td_verror(td, errno, "listen");
215 return 1;
216 }
217
218 nd->listenfd = fd;
219 return 0;
220}
221
222static int fio_netio_init(struct thread_data *td)
223{
224 struct netio_data *nd = td->io_ops->data;
225 unsigned short port;
226 char host[64], buf[128];
227 char *sep;
228 int ret;
229
230 if (td_rw(td)) {
231 log_err("fio: network connections must be read OR write\n");
232 return 1;
233 }
234
235 strcpy(buf, td->o.filename);
236
237 sep = strchr(buf, '/');
238 if (!sep) {
239 log_err("fio: bad network host/port <<%s>>\n", td->o.filename);
240 return 1;
241 }
242
243 *sep = '\0';
244 sep++;
245 strcpy(host, buf);
246 port = atoi(sep);
247
248 if (td_read(td)) {
249 nd->send_to_net = 0;
250 ret = fio_netio_setup_listen(td, port);
251 } else {
252 nd->send_to_net = 1;
253 ret = fio_netio_setup_connect(td, host, port);
254 }
255
256 return ret;
257}
258
259static void fio_netio_cleanup(struct thread_data *td)
260{
261 struct netio_data *nd = td->io_ops->data;
262
263 if (nd) {
264 free(nd);
265 td->io_ops->data = NULL;
266 }
267}
268
269static int fio_netio_setup(struct thread_data *td)
270{
271 struct netio_data *nd;
272 struct fio_file *f;
273 unsigned int i;
274
275 if (!td->io_ops->data) {
276 nd = malloc(sizeof(*nd));;
277
278 memset(nd, 0, sizeof(*nd));
279 nd->listenfd = -1;
280 td->io_ops->data = nd;
281
282 for_each_file(td, f, i)
283 f->real_file_size = -1ULL;
284 }
285
286 return 0;
287}
288
289static struct ioengine_ops ioengine = {
290 .name = "net",
291 .version = FIO_IOOPS_VERSION,
292 .prep = fio_netio_prep,
293 .queue = fio_netio_queue,
294 .setup = fio_netio_setup,
295 .init = fio_netio_init,
296 .cleanup = fio_netio_cleanup,
297 .open_file = fio_netio_open_file,
298 .close_file = generic_close_file,
299 .flags = FIO_SYNCIO | FIO_DISKLESSIO,
300};
301
302static void fio_init fio_netio_register(void)
303{
304 register_ioengine(&ioengine);
305}
306
307static void fio_exit fio_netio_unregister(void)
308{
309 unregister_ioengine(&ioengine);
310}