server: idle loop support
[fio.git] / client.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <unistd.h>
4 #include <limits.h>
5 #include <errno.h>
6 #include <fcntl.h>
7 #include <sys/poll.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <sys/wait.h>
11 #include <sys/mman.h>
12 #include <netinet/in.h>
13 #include <arpa/inet.h>
14 #include <netdb.h>
15
16 #include "fio.h"
17 #include "server.h"
18 #include "crc/crc32.h"
19 #include "flist.h"
20
21 struct fio_client {
22         struct flist_head list;
23         struct sockaddr_in addr;
24         char *hostname;
25         int fd;
26 };
27
28 static FLIST_HEAD(client_list);
29
30 static struct fio_client *find_client_by_fd(int fd)
31 {
32         struct fio_client *client;
33         struct flist_head *entry;
34
35         flist_for_each(entry, &client_list) {
36                 client = flist_entry(entry, struct fio_client, list);
37
38                 if (client->fd == fd)
39                         return client;
40         }
41
42         return NULL;
43 }
44
#if 0
/*
 * Currently compiled out. Walks client_list and returns the first client
 * whose hostname string compares equal to 'name', or NULL if none does.
 */
static struct fio_client *find_client_by_name(const char *name)
{
	struct flist_head *pos;

	flist_for_each(pos, &client_list) {
		struct fio_client *c = flist_entry(pos, struct fio_client, list);

		if (strcmp(name, c->hostname) == 0)
			return c;
	}

	return NULL;
}
#endif
61
62 static void remove_client(struct fio_client *client)
63 {
64         dprint(FD_NET, "removed client <%s>\n", client->hostname);
65         flist_del(&client->list);
66         nr_clients--;
67         free(client->hostname);
68         free(client);
69 }
70
71 void fio_client_add(const char *hostname)
72 {
73         struct fio_client *client;
74
75         dprint(FD_NET, "added client <%s>\n", hostname);
76         client = malloc(sizeof(*client));
77         memset(client, 0, sizeof(*client));
78         client->hostname = strdup(hostname);
79         client->fd = -1;
80         flist_add(&client->list, &client_list);
81         nr_clients++;
82 }
83
84 static int fio_client_connect(struct fio_client *client)
85 {
86         int fd;
87
88         dprint(FD_NET, "connect to host %s\n", client->hostname);
89
90         memset(&client->addr, 0, sizeof(client->addr));
91         client->addr.sin_family = AF_INET;
92         client->addr.sin_port = htons(fio_net_port);
93
94         if (inet_aton(client->hostname, &client->addr.sin_addr) != 1) {
95                 struct hostent *hent;
96
97                 hent = gethostbyname(client->hostname);
98                 if (!hent) {
99                         log_err("fio: gethostbyname: %s\n", strerror(errno));
100                         return 1;
101                 }
102
103                 memcpy(&client->addr.sin_addr, hent->h_addr, 4);
104         }
105
106         fd = socket(AF_INET, SOCK_STREAM, 0);
107         if (fd < 0) {
108                 log_err("fio: socket: %s\n", strerror(errno));
109                 return 1;
110         }
111
112         if (connect(fd, (struct sockaddr *) &client->addr, sizeof(client->addr)) < 0) {
113                 log_err("fio: connect: %s\n", strerror(errno));
114                 log_err("fio: failed to connect to %s\n", client->hostname);
115                 return 1;
116         }
117
118         client->fd = fd;
119         return 0;
120 }
121
122 void fio_clients_terminate(void)
123 {
124         struct flist_head *entry;
125         struct fio_client *client;
126
127         flist_for_each(entry, &client_list) {
128                 client = flist_entry(entry, struct fio_client, list);
129
130                 fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_QUIT, 0);
131         }
132 }
133
/*
 * Signal handler: whichever signal arrived, ask all clients to quit.
 */
static void sig_int(int sig)
{
	(void) sig;	/* the signal number itself is not needed */

	fio_clients_terminate();
}
138
139 static void client_signal_handler(void)
140 {
141         struct sigaction act;
142
143         memset(&act, 0, sizeof(act));
144         act.sa_handler = sig_int;
145         act.sa_flags = SA_RESTART;
146         sigaction(SIGINT, &act, NULL);
147
148         memset(&act, 0, sizeof(act));
149         act.sa_handler = sig_int;
150         act.sa_flags = SA_RESTART;
151         sigaction(SIGTERM, &act, NULL);
152 }
153
154 int fio_clients_connect(void)
155 {
156         struct fio_client *client;
157         struct flist_head *entry, *tmp;
158         int ret;
159
160         client_signal_handler();
161
162         flist_for_each_safe(entry, tmp, &client_list) {
163                 client = flist_entry(entry, struct fio_client, list);
164
165                 ret = fio_client_connect(client);
166                 if (ret)
167                         remove_client(client);
168         }
169
170         return !nr_clients;
171 }
172
173 static int send_file_buf(struct fio_client *client, char *buf, off_t size)
174 {
175         return fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, buf, size);
176 }
177
178 /*
179  * Send file contents to server backend. We could use sendfile(), but to remain
180  * more portable lets just read/write the darn thing.
181  */
182 static int fio_client_send_ini(struct fio_client *client, const char *filename)
183 {
184         struct stat sb;
185         char *p, *buf;
186         off_t len;
187         int fd, ret;
188
189         dprint(FD_NET, "send ini %s to %s\n", filename, client->hostname);
190
191         fd = open(filename, O_RDONLY);
192         if (fd < 0) {
193                 log_err("fio: job file open: %s\n", strerror(errno));
194                 return 1;
195         }
196
197         if (fstat(fd, &sb) < 0) {
198                 log_err("fio: job file stat: %s\n", strerror(errno));
199                 return 1;
200         }
201
202         buf = malloc(sb.st_size);
203
204         len = sb.st_size;
205         p = buf;
206         do {
207                 ret = read(fd, p, len);
208                 if (ret > 0) {
209                         len -= ret;
210                         if (!len)
211                                 break;
212                         p += ret;
213                         continue;
214                 } else if (!ret)
215                         break;
216                 else if (errno == EAGAIN || errno == EINTR)
217                         continue;
218         } while (1);
219
220         ret = send_file_buf(client, buf, sb.st_size);
221         free(buf);
222         return ret;
223 }
224
225 int fio_clients_send_ini(const char *filename)
226 {
227         struct fio_client *client;
228         struct flist_head *entry, *tmp;
229
230         flist_for_each_safe(entry, tmp, &client_list) {
231                 client = flist_entry(entry, struct fio_client, list);
232
233                 if (fio_client_send_ini(client, filename))
234                         remove_client(client);
235         }
236
237         return !nr_clients;
238 }
239
240 static void convert_io_stat(struct io_stat *dst, struct io_stat *src)
241 {
242         dst->max_val    = le64_to_cpu(src->max_val);
243         dst->min_val    = le64_to_cpu(src->min_val);
244         dst->samples    = le64_to_cpu(src->samples);
245         /* FIXME */
246         dst->mean       = __le64_to_cpu(src->mean);
247         dst->S          = __le64_to_cpu(src->S);
248 }
249
250 static void convert_ts(struct thread_stat *dst, struct thread_stat *src)
251 {
252         int i, j;
253
254         dst->error      = le32_to_cpu(src->error);
255         dst->groupid    = le32_to_cpu(src->groupid);
256         dst->pid        = le32_to_cpu(src->pid);
257         dst->members    = le32_to_cpu(src->members);
258
259         for (i = 0; i < 2; i++) {
260                 convert_io_stat(&dst->clat_stat[i], &src->clat_stat[i]);
261                 convert_io_stat(&dst->slat_stat[i], &src->slat_stat[i]);
262                 convert_io_stat(&dst->lat_stat[i], &src->lat_stat[i]);
263                 convert_io_stat(&dst->bw_stat[i], &src->bw_stat[i]);
264         }
265
266         dst->usr_time           = le64_to_cpu(src->usr_time);
267         dst->sys_time           = le64_to_cpu(src->sys_time);
268         dst->ctx                = le64_to_cpu(src->ctx);
269         dst->minf               = le64_to_cpu(src->minf);
270         dst->majf               = le64_to_cpu(src->majf);
271         dst->clat_percentiles   = le64_to_cpu(src->clat_percentiles);
272         dst->percentile_list    = NULL;
273
274         for (i = 0; i < FIO_IO_U_MAP_NR; i++) {
275                 dst->io_u_map[i]        = le32_to_cpu(src->io_u_map[i]);
276                 dst->io_u_submit[i]     = le32_to_cpu(src->io_u_submit[i]);
277                 dst->io_u_complete[i]   = le32_to_cpu(src->io_u_complete[i]);
278         }
279
280         for (i = 0; i < FIO_IO_U_LAT_U_NR; i++) {
281                 dst->io_u_lat_u[i]      = le32_to_cpu(src->io_u_lat_u[i]);
282                 dst->io_u_lat_m[i]      = le32_to_cpu(src->io_u_lat_m[i]);
283         }
284
285         for (i = 0; i < 2; i++)
286                 for (j = 0; j < FIO_IO_U_PLAT_NR; j++)
287                         dst->io_u_plat[i][j] = le32_to_cpu(src->io_u_plat[i][j]);
288
289         for (i = 0; i < 3; i++) {
290                 dst->total_io_u[i]      = le64_to_cpu(src->total_io_u[i]);
291                 dst->short_io_u[i]      = le64_to_cpu(src->short_io_u[i]);
292         }
293
294         dst->total_submit       = le64_to_cpu(src->total_submit);
295         dst->total_complete     = le64_to_cpu(src->total_complete);
296
297         for (i = 0; i < 2; i++) {
298                 dst->io_bytes[i]        = le64_to_cpu(src->io_bytes[i]);
299                 dst->runtime[i]         = le64_to_cpu(src->runtime[i]);
300         }
301
302         dst->total_run_time     = le64_to_cpu(src->total_run_time);
303         dst->continue_on_error  = le16_to_cpu(src->continue_on_error);
304         dst->total_err_count    = le64_to_cpu(src->total_err_count);
305         dst->first_error        = le32_to_cpu(src->first_error);
306         dst->kb_base            = le32_to_cpu(src->kb_base);
307 }
308
309 static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src)
310 {
311         int i;
312
313         for (i = 0; i < 2; i++) {
314                 dst->max_run[i]         = le64_to_cpu(src->max_run[i]);
315                 dst->min_run[i]         = le64_to_cpu(src->min_run[i]);
316                 dst->max_bw[i]          = le64_to_cpu(src->max_bw[i]);
317                 dst->min_bw[i]          = le64_to_cpu(src->min_bw[i]);
318                 dst->io_kb[i]           = le64_to_cpu(src->io_kb[i]);
319                 dst->agg[i]             = le64_to_cpu(src->agg[i]);
320         }
321
322         dst->kb_base    = le32_to_cpu(src->kb_base);
323         dst->groupid    = le32_to_cpu(src->groupid);
324 }
325
326 static void handle_ts(struct fio_net_cmd *cmd)
327 {
328         struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload;
329
330         convert_ts(&p->ts, &p->ts);
331         convert_gs(&p->rs, &p->rs);
332
333         show_thread_status(&p->ts, &p->rs);
334 }
335
336 static void handle_gs(struct fio_net_cmd *cmd)
337 {
338         struct group_run_stats *gs = (struct group_run_stats *) cmd->payload;
339
340         convert_gs(gs, gs);
341         show_group_stats(gs);
342 }
343
344 static void handle_eta(struct fio_net_cmd *cmd)
345 {
346         struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
347         int i;
348
349         je->nr_running          = le32_to_cpu(je->nr_running);
350         je->nr_ramp             = le32_to_cpu(je->nr_ramp);
351         je->nr_pending          = le32_to_cpu(je->nr_pending);
352         je->files_open          = le32_to_cpu(je->files_open);
353         je->m_rate              = le32_to_cpu(je->m_rate);
354         je->t_rate              = le32_to_cpu(je->t_rate);
355         je->m_iops              = le32_to_cpu(je->m_iops);
356         je->t_iops              = le32_to_cpu(je->t_iops);
357
358         for (i = 0; i < 2; i++) {
359                 je->rate[i]     = le32_to_cpu(je->rate[i]);
360                 je->iops[i]     = le32_to_cpu(je->iops[i]);
361         }
362
363         je->elapsed_sec         = le32_to_cpu(je->nr_running);
364         je->eta_sec             = le64_to_cpu(je->eta_sec);
365
366         display_thread_status(je);
367 }
368
369 static void handle_probe(struct fio_net_cmd *cmd)
370 {
371         struct cmd_probe_pdu *probe = (struct cmd_probe_pdu *) cmd->payload;
372
373         log_info("Probe: %s: %u.%u.%u\n", probe->hostname, probe->fio_major,
374                                         probe->fio_minor, probe->fio_patch);
375 }
376
377 static int handle_client(struct fio_client *client)
378 {
379         struct fio_net_cmd *cmd;
380         int done = 0;
381
382         while ((cmd = fio_net_recv_cmd(client->fd, 1)) != NULL) {
383                 dprint(FD_NET, "%s: got cmd op %d\n", client->hostname,
384                                                         cmd->opcode);
385
386                 switch (cmd->opcode) {
387                 case FIO_NET_CMD_QUIT:
388                         remove_client(client);
389                         free(cmd);
390                         done = 1;
391                         break;
392                 case FIO_NET_CMD_TEXT:
393                         fwrite(cmd->payload, cmd->pdu_len, 1, stdout);
394                         fflush(stdout);
395                         free(cmd);
396                         break;
397                 case FIO_NET_CMD_TS:
398                         handle_ts(cmd);
399                         free(cmd);
400                         break;
401                 case FIO_NET_CMD_GS:
402                         handle_gs(cmd);
403                         free(cmd);
404                         break;
405                 case FIO_NET_CMD_ETA:
406                         handle_eta(cmd);
407                         free(cmd);
408                         break;
409                 case FIO_NET_CMD_PROBE:
410                         handle_probe(cmd);
411                         free(cmd);
412                         break;
413                 default:
414                         log_err("fio: unknown client op: %d\n", cmd->opcode);
415                         free(cmd);
416                         break;
417                 }
418
419                 if (done)
420                         break;
421         }
422
423         return 0;
424 }
425
426 int fio_handle_clients(void)
427 {
428         struct fio_client *client;
429         struct flist_head *entry;
430         struct pollfd *pfds;
431         int i, ret = 0;
432
433         pfds = malloc(nr_clients * sizeof(struct pollfd));
434
435         while (!exit_backend && nr_clients) {
436                 i = 0;
437                 flist_for_each(entry, &client_list) {
438                         client = flist_entry(entry, struct fio_client, list);
439
440                         pfds[i].fd = client->fd;
441                         pfds[i].events = POLLIN;
442                         i++;
443                 }
444
445                 assert(i == nr_clients);
446
447                 ret = poll(pfds, nr_clients, 100);
448                 if (ret < 0) {
449                         if (errno == EINTR)
450                                 continue;
451                         log_err("fio: poll clients: %s\n", strerror(errno));
452                         break;
453                 } else if (!ret)
454                         continue;
455
456                 for (i = 0; i < nr_clients; i++) {
457                         if (!(pfds[i].revents & POLLIN))
458                                 continue;
459
460                         client = find_client_by_fd(pfds[i].fd);
461                         if (!client) {
462                                 log_err("fio: unknown client\n");
463                                 continue;
464                         }
465                         handle_client(client);
466                 }
467         }
468
469         free(pfds);
470         return 0;
471 }