467059e431bf091b967d6a07a9ef7fc011ec11a6
[fio.git] / engines / net.c
1 /*
2  * Transfer data over the net.
3  */
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <unistd.h>
7 #include <errno.h>
8 #include <assert.h>
9 #include <netinet/in.h>
10 #include <arpa/inet.h>
11 #include <netdb.h>
12 #include <sys/poll.h>
13
14 #include "../fio.h"
15 #include "../os.h"
16
17 #define send_to_net(td) ((td)->io_ops->priv)
18
19 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
20 {
21         struct fio_file *f = io_u->file;
22
23         /*
24          * Make sure we don't see spurious reads to a receiver, and vice versa
25          */
26         if ((send_to_net(td) && io_u->ddir == DDIR_READ) ||
27             (!send_to_net(td) && io_u->ddir == DDIR_WRITE)) {
28                 td_verror(td, EINVAL, "bad direction");
29                 return 1;
30         }
31                 
32         if (io_u->ddir == DDIR_SYNC)
33                 return 0;
34         if (io_u->offset == f->last_completed_pos)
35                 return 0;
36
37         /*
38          * If offset is different from last end position, it's a seek.
39          * As network io is purely sequential, we don't allow seeks.
40          */
41         td_verror(td, EINVAL, "cannot seek");
42         return 1;
43 }
44
45 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
46 {
47         struct fio_file *f = io_u->file;
48         int ret, flags = 0;
49
50         if (io_u->ddir == DDIR_WRITE) {
51                 /*
52                  * if we are going to write more, set MSG_MORE
53                  */
54                 if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
55                     td->io_size)
56                         flags = MSG_MORE;
57
58                 ret = send(f->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
59         } else if (io_u->ddir == DDIR_READ) {
60                 flags = MSG_WAITALL;
61                 ret = recv(f->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
62         } else
63                 ret = 0;        /* must be a SYNC */
64
65         if (ret != (int) io_u->xfer_buflen) {
66                 if (ret >= 0) {
67                         io_u->resid = io_u->xfer_buflen - ret;
68                         io_u->error = 0;
69                         return FIO_Q_COMPLETED;
70                 } else
71                         io_u->error = errno;
72         }
73
74         if (io_u->error)
75                 td_verror(td, io_u->error, "xfer");
76
77         return FIO_Q_COMPLETED;
78 }
79
80 static int fio_netio_setup_connect(struct thread_data *td, const char *host,
81                                    unsigned short port)
82 {
83         struct sockaddr_in addr;
84         struct fio_file *f;
85         int i;
86
87         memset(&addr, 0, sizeof(addr));
88         addr.sin_family = AF_INET;
89         addr.sin_port = htons(port);
90
91         if (inet_aton(host, &addr.sin_addr) != 1) {
92                 struct hostent *hent;
93
94                 hent = gethostbyname(host);
95                 if (!hent) {
96                         td_verror(td, errno, "gethostbyname");
97                         return 1;
98                 }
99
100                 memcpy(&addr.sin_addr, hent->h_addr, 4);
101         }
102
103         for_each_file(td, f, i) {
104                 f->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
105                 if (f->fd < 0) {
106                         td_verror(td, errno, "socket");
107                         return 1;
108                 }
109
110                 if (connect(f->fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
111                         td_verror(td, errno, "connect");
112                         return 1;
113                 }
114         }
115
116         return 0;
117
118 }
119
120 static int fio_netio_accept_connections(struct thread_data *td, int fd,
121                                         struct sockaddr_in *addr)
122 {
123         socklen_t socklen = sizeof(*addr);
124         unsigned int accepts = 0;
125         struct pollfd pfd;
126
127         fprintf(f_out, "fio: waiting for %u connections\n", td->nr_files);
128
129         /*
130          * Accept loop. poll for incoming events, accept them. Repeat until we
131          * have all connections.
132          */
133         while (!td->terminate && accepts < td->nr_files) {
134                 struct fio_file *f;
135                 int ret, i;
136
137                 pfd.fd = fd;
138                 pfd.events = POLLIN;
139
140                 ret = poll(&pfd, 1, -1);
141                 if (ret < 0) {
142                         if (errno == EINTR)
143                                 continue;
144
145                         td_verror(td, errno, "poll");
146                         break;
147                 } else if (!ret)
148                         continue;
149
150                 /*
151                  * should be impossible
152                  */
153                 if (!(pfd.revents & POLLIN))
154                         continue;
155
156                 for_each_file(td, f, i) {
157                         if (f->fd != -1)
158                                 continue;
159
160                         f->fd = accept(fd, (struct sockaddr *) addr, &socklen);
161                         if (f->fd < 0) {
162                                 td_verror(td, errno, "accept");
163                                 return 1;
164                         }
165                         accepts++;
166                         break;
167                 }
168         }
169
170         td->nr_open_files = accepts;
171         return 0;
172 }
173
174 static int fio_netio_setup_listen(struct thread_data *td, unsigned short port)
175 {
176         struct sockaddr_in addr;
177         int fd, opt;
178
179         fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
180         if (fd < 0) {
181                 td_verror(td, errno, "socket");
182                 return 1;
183         }
184
185         opt = 1;
186         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
187                 td_verror(td, errno, "setsockopt");
188                 return 1;
189         }
190 #ifdef SO_REUSEPORT
191         if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
192                 td_verror(td, errno, "setsockopt");
193                 return 1;
194         }
195 #endif
196
197         memset(&addr, 0, sizeof(addr));
198         addr.sin_family = AF_INET;
199         addr.sin_addr.s_addr = htonl(INADDR_ANY);
200         addr.sin_port = htons(port);
201
202         if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
203                 td_verror(td, errno, "bind");
204                 return 1;
205         }
206         if (listen(fd, 1) < 0) {
207                 td_verror(td, errno, "listen");
208                 return 1;
209         }
210
211         return fio_netio_accept_connections(td, fd, &addr);
212 }
213
214 static int fio_netio_setup(struct thread_data *td)
215 {
216         char host[64], buf[128];
217         unsigned short port;
218         struct fio_file *f;
219         char *sep;
220         int ret, i;
221
222         if (!td->total_file_size) {
223                 log_err("fio: need size= set\n");
224                 return 1;
225         }
226
227         if (td_rw(td)) {
228                 log_err("fio: network connections must be read OR write\n");
229                 return 1;
230         }
231
232         strcpy(buf, td->filename);
233
234         sep = strchr(buf, ':');
235         if (!sep) {
236                 log_err("fio: bad network host:port <<%s>>\n", td->filename);
237                 return 1;
238         }
239
240         *sep = '\0';
241         sep++;
242         strcpy(host, buf);
243         port = atoi(sep);
244
245         if (td_read(td)) {
246                 send_to_net(td) = 0;
247                 ret = fio_netio_setup_listen(td, port);
248         } else {
249                 send_to_net(td) = 1;
250                 ret = fio_netio_setup_connect(td, host, port);
251         }
252
253         if (ret)
254                 return ret;
255
256         td->io_size = td->total_file_size;
257         td->total_io_size = td->io_size;
258
259         for_each_file(td, f, i) {
260                 f->file_size = td->total_file_size / td->nr_files;
261                 f->real_file_size = f->file_size;
262         }
263
264         return 0;
265 }
266
267 static struct ioengine_ops ioengine = {
268         .name           = "net",
269         .version        = FIO_IOOPS_VERSION,
270         .prep           = fio_netio_prep,
271         .queue          = fio_netio_queue,
272         .setup          = fio_netio_setup,
273         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_SELFOPEN,
274 };
275
276 static void fio_init fio_netio_register(void)
277 {
278         register_ioengine(&ioengine);
279 }
280
281 static void fio_exit fio_netio_unregister(void)
282 {
283         unregister_ioengine(&ioengine);
284 }