client: display summed total of all clients when all stats have been received
[fio.git] / client.c
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include <limits.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/poll.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <signal.h>

#include "fio.h"
#include "server.h"
#include "flist.h"
#include "hash.h"
22
/*
 * Aggregate for one round of ETA requests. Each client's ETA reply is
 * summed into 'eta'; 'pending' starts at the number of clients and is
 * decremented once per client (reply, skip, or removal). When it reaches
 * zero the summed ETA is displayed and the struct is freed (dec_jobs_eta()).
 */
struct client_eta {
        struct jobs_eta eta;
        unsigned int pending;
};
27
/*
 * Per-connection state for one remote fio backend this frontend drives.
 */
struct fio_client {
        struct flist_head list;         /* node on client_list */
        struct flist_head hash_list;    /* node in client_hash[], hashed by fd */
        struct flist_head arg_list;     /* node on the shared-options arg_list */
        struct sockaddr_in addr;        /* IPv4 address, used when !is_sock */
        struct sockaddr_un addr_un;     /* unix socket address, used when is_sock */
        char *hostname;                 /* host name, or socket path when is_sock */
        int port;
        int fd;                         /* connected socket; -1 until connected */

        char *name;                     /* hostname reported in the probe reply */

        int state;                      /* one of the Client_* values */

        /*
         * Set when the last TEXT payload contained no newline; suppresses the
         * "<name> " prefix on the next TEXT payload.
         */
        int skip_newline;
        int is_sock;                    /* connect via unix socket instead of TCP */

        /* node on eta_list while an ETA request to this client is outstanding */
        struct flist_head eta_list;
        /* aggregate the outstanding ETA request belongs to */
        struct client_eta *eta_in_flight;

        /*
         * Sent commands tracked for a reply; entries are dropped when the
         * matching reply arrives (remove_reply_cmd()) or on timeout.
         */
        struct flist_head cmd_list;

        uint16_t argc;                  /* entries in argv; argv[0] is "fio" */
        char **argv;                    /* options sent to the backend as a job line */
};
53
/* Time of the last ETA request round; see fio_handle_clients(). */
static struct timeval eta_tv;

/* Values stored in fio_client->state. */
enum {
        Client_created          = 0,
        Client_connected        = 1,
        Client_started          = 2,
        Client_stopped          = 3,
        Client_exited           = 4,
};

/* All known clients, linked through fio_client->list. */
static FLIST_HEAD(client_list);
/* Clients with an ETA request in flight, linked through fio_client->eta_list. */
static FLIST_HEAD(eta_list);

/*
 * Previously added clients that had no options of their own when the next
 * client was added (see fio_client_add()); options later given through
 * fio_client_add_cmd_option() are copied to each of them as well.
 */
static FLIST_HEAD(arg_list);

/*
 * Running sum of the thread/group stats received from clients, printed as
 * "All clients" once sum_stat_clients reports have been summed (handle_ts()).
 */
static struct thread_stat client_ts;
static struct group_run_stats client_gs;
/* Number of reports to sum; set from nr_clients in fio_handle_clients(). */
static int sum_stat_clients;
/* Number of reports summed so far. */
static int sum_stat_nr;

/* Clients are hashed by socket fd so poll() results can be mapped back. */
#define FIO_CLIENT_HASH_BITS    7
#define FIO_CLIENT_HASH_SZ      (1 << FIO_CLIENT_HASH_BITS)
#define FIO_CLIENT_HASH_MASK    (FIO_CLIENT_HASH_SZ - 1)
static struct flist_head client_hash[FIO_CLIENT_HASH_SZ];

static int handle_client(struct fio_client *client);
static void dec_jobs_eta(struct client_eta *eta);
81
82 static void fio_client_add_hash(struct fio_client *client)
83 {
84         int bucket = hash_long(client->fd, FIO_CLIENT_HASH_BITS);
85
86         bucket &= FIO_CLIENT_HASH_MASK;
87         flist_add(&client->hash_list, &client_hash[bucket]);
88 }
89
90 static void fio_client_remove_hash(struct fio_client *client)
91 {
92         if (!flist_empty(&client->hash_list))
93                 flist_del_init(&client->hash_list);
94 }
95
96 static void fio_init fio_client_hash_init(void)
97 {
98         int i;
99
100         for (i = 0; i < FIO_CLIENT_HASH_SZ; i++)
101                 INIT_FLIST_HEAD(&client_hash[i]);
102 }
103
104 static struct fio_client *find_client_by_fd(int fd)
105 {
106         int bucket = hash_long(fd, FIO_CLIENT_HASH_BITS) & FIO_CLIENT_HASH_MASK;
107         struct fio_client *client;
108         struct flist_head *entry;
109
110         flist_for_each(entry, &client_hash[bucket]) {
111                 client = flist_entry(entry, struct fio_client, hash_list);
112
113                 if (client->fd == fd)
114                         return client;
115         }
116
117         return NULL;
118 }
119
120 static void remove_client(struct fio_client *client)
121 {
122         dprint(FD_NET, "client: removed <%s>\n", client->hostname);
123         flist_del(&client->list);
124
125         fio_client_remove_hash(client);
126
127         if (!flist_empty(&client->eta_list)) {
128                 flist_del_init(&client->eta_list);
129                 dec_jobs_eta(client->eta_in_flight);
130         }
131
132         free(client->hostname);
133         if (client->argv)
134                 free(client->argv);
135         if (client->name)
136                 free(client->name);
137
138         free(client);
139         nr_clients--;
140 }
141
142 static void __fio_client_add_cmd_option(struct fio_client *client,
143                                         const char *opt)
144 {
145         int index;
146
147         index = client->argc++;
148         client->argv = realloc(client->argv, sizeof(char *) * client->argc);
149         client->argv[index] = strdup(opt);
150         dprint(FD_NET, "client: add cmd %d: %s\n", index, opt);
151 }
152
153 void fio_client_add_cmd_option(void *cookie, const char *opt)
154 {
155         struct fio_client *client = cookie;
156         struct flist_head *entry;
157
158         if (!client || !opt)
159                 return;
160
161         __fio_client_add_cmd_option(client, opt);
162
163         /*
164          * Duplicate arguments to shared client group
165          */
166         flist_for_each(entry, &arg_list) {
167                 client = flist_entry(entry, struct fio_client, arg_list);
168
169                 __fio_client_add_cmd_option(client, opt);
170         }
171 }
172
173 int fio_client_add(const char *hostname, void **cookie)
174 {
175         struct fio_client *existing = *cookie;
176         struct fio_client *client;
177
178         if (existing) {
179                 /*
180                  * We always add our "exec" name as the option, hence 1
181                  * means empty.
182                  */
183                 if (existing->argc == 1)
184                         flist_add_tail(&existing->arg_list, &arg_list);
185                 else {
186                         while (!flist_empty(&arg_list))
187                                 flist_del_init(arg_list.next);
188                 }
189         }
190
191         client = malloc(sizeof(*client));
192         memset(client, 0, sizeof(*client));
193
194         INIT_FLIST_HEAD(&client->list);
195         INIT_FLIST_HEAD(&client->hash_list);
196         INIT_FLIST_HEAD(&client->arg_list);
197         INIT_FLIST_HEAD(&client->eta_list);
198         INIT_FLIST_HEAD(&client->cmd_list);
199
200         if (fio_server_parse_string(hostname, &client->hostname,
201                                         &client->is_sock, &client->port,
202                                         &client->addr.sin_addr))
203                 return -1;
204
205         client->fd = -1;
206
207         __fio_client_add_cmd_option(client, "fio");
208
209         flist_add(&client->list, &client_list);
210         nr_clients++;
211         dprint(FD_NET, "client: added <%s>\n", client->hostname);
212         *cookie = client;
213         return 0;
214 }
215
216 static int fio_client_connect_ip(struct fio_client *client)
217 {
218         int fd;
219
220         client->addr.sin_family = AF_INET;
221         client->addr.sin_port = htons(client->port);
222
223         fd = socket(AF_INET, SOCK_STREAM, 0);
224         if (fd < 0) {
225                 log_err("fio: socket: %s\n", strerror(errno));
226                 return -1;
227         }
228
229         if (connect(fd, (struct sockaddr *) &client->addr, sizeof(client->addr)) < 0) {
230                 log_err("fio: connect: %s\n", strerror(errno));
231                 log_err("fio: failed to connect to %s:%u\n", client->hostname,
232                                                                 client->port);
233                 close(fd);
234                 return -1;
235         }
236
237         return fd;
238 }
239
240 static int fio_client_connect_sock(struct fio_client *client)
241 {
242         struct sockaddr_un *addr = &client->addr_un;
243         fio_socklen_t len;
244         int fd;
245
246         memset(addr, 0, sizeof(*addr));
247         addr->sun_family = AF_UNIX;
248         strcpy(addr->sun_path, client->hostname);
249
250         fd = socket(AF_UNIX, SOCK_STREAM, 0);
251         if (fd < 0) {
252                 log_err("fio: socket: %s\n", strerror(errno));
253                 return -1;
254         }
255
256         len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
257         if (connect(fd, (struct sockaddr *) addr, len) < 0) {
258                 log_err("fio: connect; %s\n", strerror(errno));
259                 close(fd);
260                 return -1;
261         }
262
263         return fd;
264 }
265
266 static int fio_client_connect(struct fio_client *client)
267 {
268         int fd;
269
270         dprint(FD_NET, "client: connect to host %s\n", client->hostname);
271
272         if (client->is_sock)
273                 fd = fio_client_connect_sock(client);
274         else
275                 fd = fio_client_connect_ip(client);
276
277         dprint(FD_NET, "client: %s connected %d\n", client->hostname, fd);
278
279         if (fd < 0)
280                 return 1;
281
282         client->fd = fd;
283         fio_client_add_hash(client);
284         client->state = Client_connected;
285         return 0;
286 }
287
288 void fio_clients_terminate(void)
289 {
290         struct flist_head *entry;
291         struct fio_client *client;
292
293         dprint(FD_NET, "client: terminate clients\n");
294
295         flist_for_each(entry, &client_list) {
296                 client = flist_entry(entry, struct fio_client, list);
297
298                 fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_QUIT, 0, NULL);
299         }
300 }
301
302 static void sig_int(int sig)
303 {
304         dprint(FD_NET, "client: got signal %d\n", sig);
305         fio_clients_terminate();
306 }
307
308 static void client_signal_handler(void)
309 {
310         struct sigaction act;
311
312         memset(&act, 0, sizeof(act));
313         act.sa_handler = sig_int;
314         act.sa_flags = SA_RESTART;
315         sigaction(SIGINT, &act, NULL);
316
317         memset(&act, 0, sizeof(act));
318         act.sa_handler = sig_int;
319         act.sa_flags = SA_RESTART;
320         sigaction(SIGTERM, &act, NULL);
321 }
322
323 static void probe_client(struct fio_client *client)
324 {
325         dprint(FD_NET, "client: send probe\n");
326
327         fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_PROBE, 0, &client->cmd_list);
328 }
329
330 static int send_client_cmd_line(struct fio_client *client)
331 {
332         struct cmd_single_line_pdu *cslp;
333         struct cmd_line_pdu *clp;
334         unsigned long offset;
335         unsigned int *lens;
336         void *pdu;
337         size_t mem;
338         int i, ret;
339
340         dprint(FD_NET, "client: send cmdline %d\n", client->argc);
341
342         lens = malloc(client->argc * sizeof(unsigned int));
343
344         /*
345          * Find out how much mem we need
346          */
347         for (i = 0, mem = 0; i < client->argc; i++) {
348                 lens[i] = strlen(client->argv[i]) + 1;
349                 mem += lens[i];
350         }
351
352         /*
353          * We need one cmd_line_pdu, and argc number of cmd_single_line_pdu
354          */
355         mem += sizeof(*clp) + (client->argc * sizeof(*cslp));
356
357         pdu = malloc(mem);
358         clp = pdu;
359         offset = sizeof(*clp);
360
361         for (i = 0; i < client->argc; i++) {
362                 uint16_t arg_len = lens[i];
363
364                 cslp = pdu + offset;
365                 strcpy((char *) cslp->text, client->argv[i]);
366                 cslp->len = cpu_to_le16(arg_len);
367                 offset += sizeof(*cslp) + arg_len;
368         }
369
370         free(lens);
371         clp->lines = cpu_to_le16(client->argc);
372         ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOBLINE, pdu, mem, 0);
373         free(pdu);
374         return ret;
375 }
376
377 int fio_clients_connect(void)
378 {
379         struct fio_client *client;
380         struct flist_head *entry, *tmp;
381         int ret;
382
383         dprint(FD_NET, "client: connect all\n");
384
385         client_signal_handler();
386
387         flist_for_each_safe(entry, tmp, &client_list) {
388                 client = flist_entry(entry, struct fio_client, list);
389
390                 ret = fio_client_connect(client);
391                 if (ret) {
392                         remove_client(client);
393                         continue;
394                 }
395
396                 probe_client(client);
397
398                 if (client->argc > 1)
399                         send_client_cmd_line(client);
400         }
401
402         return !nr_clients;
403 }
404
405 /*
406  * Send file contents to server backend. We could use sendfile(), but to remain
407  * more portable lets just read/write the darn thing.
408  */
409 static int fio_client_send_ini(struct fio_client *client, const char *filename)
410 {
411         struct stat sb;
412         char *p, *buf;
413         off_t len;
414         int fd, ret;
415
416         dprint(FD_NET, "send ini %s to %s\n", filename, client->hostname);
417
418         fd = open(filename, O_RDONLY);
419         if (fd < 0) {
420                 log_err("fio: job file <%s> open: %s\n", filename, strerror(errno));
421                 return 1;
422         }
423
424         if (fstat(fd, &sb) < 0) {
425                 log_err("fio: job file stat: %s\n", strerror(errno));
426                 close(fd);
427                 return 1;
428         }
429
430         buf = malloc(sb.st_size);
431
432         len = sb.st_size;
433         p = buf;
434         do {
435                 ret = read(fd, p, len);
436                 if (ret > 0) {
437                         len -= ret;
438                         if (!len)
439                                 break;
440                         p += ret;
441                         continue;
442                 } else if (!ret)
443                         break;
444                 else if (errno == EAGAIN || errno == EINTR)
445                         continue;
446         } while (1);
447
448         if (len) {
449                 log_err("fio: failed reading job file %s\n", filename);
450                 close(fd);
451                 free(buf);
452                 return 1;
453         }
454
455         ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, buf, sb.st_size, 0);
456         free(buf);
457         close(fd);
458         return ret;
459 }
460
461 int fio_clients_send_ini(const char *filename)
462 {
463         struct fio_client *client;
464         struct flist_head *entry, *tmp;
465
466         flist_for_each_safe(entry, tmp, &client_list) {
467                 client = flist_entry(entry, struct fio_client, list);
468
469                 if (fio_client_send_ini(client, filename))
470                         remove_client(client);
471         }
472
473         return !nr_clients;
474 }
475
476 static void convert_io_stat(struct io_stat *dst, struct io_stat *src)
477 {
478         dst->max_val    = le64_to_cpu(src->max_val);
479         dst->min_val    = le64_to_cpu(src->min_val);
480         dst->samples    = le64_to_cpu(src->samples);
481
482         /*
483          * Floats arrive as IEEE 754 encoded uint64_t, convert back to double
484          */
485         dst->mean.u.f   = fio_uint64_to_double(le64_to_cpu(dst->mean.u.i));
486         dst->S.u.f      = fio_uint64_to_double(le64_to_cpu(dst->S.u.i));
487 }
488
489 static void convert_ts(struct thread_stat *dst, struct thread_stat *src)
490 {
491         int i, j;
492
493         dst->error      = le32_to_cpu(src->error);
494         dst->groupid    = le32_to_cpu(src->groupid);
495         dst->pid        = le32_to_cpu(src->pid);
496         dst->members    = le32_to_cpu(src->members);
497
498         for (i = 0; i < 2; i++) {
499                 convert_io_stat(&dst->clat_stat[i], &src->clat_stat[i]);
500                 convert_io_stat(&dst->slat_stat[i], &src->slat_stat[i]);
501                 convert_io_stat(&dst->lat_stat[i], &src->lat_stat[i]);
502                 convert_io_stat(&dst->bw_stat[i], &src->bw_stat[i]);
503         }
504
505         dst->usr_time           = le64_to_cpu(src->usr_time);
506         dst->sys_time           = le64_to_cpu(src->sys_time);
507         dst->ctx                = le64_to_cpu(src->ctx);
508         dst->minf               = le64_to_cpu(src->minf);
509         dst->majf               = le64_to_cpu(src->majf);
510         dst->clat_percentiles   = le64_to_cpu(src->clat_percentiles);
511
512         for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) {
513                 fio_fp64_t *fps = &src->percentile_list[i];
514                 fio_fp64_t *fpd = &dst->percentile_list[i];
515
516                 fpd->u.f = fio_uint64_to_double(le64_to_cpu(fps->u.i));
517         }
518
519         for (i = 0; i < FIO_IO_U_MAP_NR; i++) {
520                 dst->io_u_map[i]        = le32_to_cpu(src->io_u_map[i]);
521                 dst->io_u_submit[i]     = le32_to_cpu(src->io_u_submit[i]);
522                 dst->io_u_complete[i]   = le32_to_cpu(src->io_u_complete[i]);
523         }
524
525         for (i = 0; i < FIO_IO_U_LAT_U_NR; i++) {
526                 dst->io_u_lat_u[i]      = le32_to_cpu(src->io_u_lat_u[i]);
527                 dst->io_u_lat_m[i]      = le32_to_cpu(src->io_u_lat_m[i]);
528         }
529
530         for (i = 0; i < 2; i++)
531                 for (j = 0; j < FIO_IO_U_PLAT_NR; j++)
532                         dst->io_u_plat[i][j] = le32_to_cpu(src->io_u_plat[i][j]);
533
534         for (i = 0; i < 3; i++) {
535                 dst->total_io_u[i]      = le64_to_cpu(src->total_io_u[i]);
536                 dst->short_io_u[i]      = le64_to_cpu(src->short_io_u[i]);
537         }
538
539         dst->total_submit       = le64_to_cpu(src->total_submit);
540         dst->total_complete     = le64_to_cpu(src->total_complete);
541
542         for (i = 0; i < 2; i++) {
543                 dst->io_bytes[i]        = le64_to_cpu(src->io_bytes[i]);
544                 dst->runtime[i]         = le64_to_cpu(src->runtime[i]);
545         }
546
547         dst->total_run_time     = le64_to_cpu(src->total_run_time);
548         dst->continue_on_error  = le16_to_cpu(src->continue_on_error);
549         dst->total_err_count    = le64_to_cpu(src->total_err_count);
550         dst->first_error        = le32_to_cpu(src->first_error);
551         dst->kb_base            = le32_to_cpu(src->kb_base);
552 }
553
554 static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src)
555 {
556         int i;
557
558         for (i = 0; i < 2; i++) {
559                 dst->max_run[i]         = le64_to_cpu(src->max_run[i]);
560                 dst->min_run[i]         = le64_to_cpu(src->min_run[i]);
561                 dst->max_bw[i]          = le64_to_cpu(src->max_bw[i]);
562                 dst->min_bw[i]          = le64_to_cpu(src->min_bw[i]);
563                 dst->io_kb[i]           = le64_to_cpu(src->io_kb[i]);
564                 dst->agg[i]             = le64_to_cpu(src->agg[i]);
565         }
566
567         dst->kb_base    = le32_to_cpu(src->kb_base);
568         dst->groupid    = le32_to_cpu(src->groupid);
569 }
570
571 static void handle_ts(struct fio_net_cmd *cmd)
572 {
573         struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload;
574
575         convert_ts(&p->ts, &p->ts);
576         convert_gs(&p->rs, &p->rs);
577
578         show_thread_status(&p->ts, &p->rs);
579
580         if (sum_stat_clients == 1)
581                 return;
582
583         sum_thread_stats(&client_ts, &p->ts, sum_stat_nr);
584         sum_group_stats(&client_gs, &p->rs);
585
586         client_ts.members++;
587         client_ts.groupid = p->ts.groupid;
588
589         if (++sum_stat_nr == sum_stat_clients) {
590                 strcpy(client_ts.name, "All clients");
591                 show_thread_status(&client_ts, &client_gs);
592         }
593 }
594
595 static void handle_gs(struct fio_net_cmd *cmd)
596 {
597         struct group_run_stats *gs = (struct group_run_stats *) cmd->payload;
598
599         convert_gs(gs, gs);
600         show_group_stats(gs);
601 }
602
603 static void convert_jobs_eta(struct jobs_eta *je)
604 {
605         int i;
606
607         je->nr_running          = le32_to_cpu(je->nr_running);
608         je->nr_ramp             = le32_to_cpu(je->nr_ramp);
609         je->nr_pending          = le32_to_cpu(je->nr_pending);
610         je->files_open          = le32_to_cpu(je->files_open);
611         je->m_rate              = le32_to_cpu(je->m_rate);
612         je->t_rate              = le32_to_cpu(je->t_rate);
613         je->m_iops              = le32_to_cpu(je->m_iops);
614         je->t_iops              = le32_to_cpu(je->t_iops);
615
616         for (i = 0; i < 2; i++) {
617                 je->rate[i]     = le32_to_cpu(je->rate[i]);
618                 je->iops[i]     = le32_to_cpu(je->iops[i]);
619         }
620
621         je->elapsed_sec         = le64_to_cpu(je->elapsed_sec);
622         je->eta_sec             = le64_to_cpu(je->eta_sec);
623 }
624
625 static void sum_jobs_eta(struct jobs_eta *dst, struct jobs_eta *je)
626 {
627         int i;
628
629         dst->nr_running         += je->nr_running;
630         dst->nr_ramp            += je->nr_ramp;
631         dst->nr_pending         += je->nr_pending;
632         dst->files_open         += je->files_open;
633         dst->m_rate             += je->m_rate;
634         dst->t_rate             += je->t_rate;
635         dst->m_iops             += je->m_iops;
636         dst->t_iops             += je->t_iops;
637
638         for (i = 0; i < 2; i++) {
639                 dst->rate[i]    += je->rate[i];
640                 dst->iops[i]    += je->iops[i];
641         }
642
643         dst->elapsed_sec        += je->elapsed_sec;
644
645         if (je->eta_sec > dst->eta_sec)
646                 dst->eta_sec = je->eta_sec;
647 }
648
649 static void dec_jobs_eta(struct client_eta *eta)
650 {
651         if (!--eta->pending) {
652                 display_thread_status(&eta->eta);
653                 free(eta);
654         }
655 }
656
657 static void remove_reply_cmd(struct fio_client *client, struct fio_net_cmd *cmd)
658 {
659         struct fio_net_int_cmd *icmd = NULL;
660         struct flist_head *entry;
661
662         flist_for_each(entry, &client->cmd_list) {
663                 icmd = flist_entry(entry, struct fio_net_int_cmd, list);
664
665                 if (cmd->tag == (uint64_t) icmd)
666                         break;
667
668                 icmd = NULL;
669         }
670
671         if (!icmd) {
672                 log_err("fio: client: unable to find matching tag\n");
673                 return;
674         }
675
676         flist_del(&icmd->list);
677         cmd->tag = icmd->saved_tag;
678         free(icmd);
679 }
680
681 static void handle_eta(struct fio_client *client, struct fio_net_cmd *cmd)
682 {
683         struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
684         struct client_eta *eta = (struct client_eta *) cmd->tag;
685
686         dprint(FD_NET, "client: got eta tag %p, %d\n", eta, eta->pending);
687
688         assert(client->eta_in_flight == eta);
689
690         client->eta_in_flight = NULL;
691         flist_del_init(&client->eta_list);
692
693         convert_jobs_eta(je);
694         sum_jobs_eta(&eta->eta, je);
695         dec_jobs_eta(eta);
696 }
697
698 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd)
699 {
700         struct cmd_probe_pdu *probe = (struct cmd_probe_pdu *) cmd->payload;
701         const char *os, *arch;
702
703         os = fio_get_os_string(probe->os);
704         if (!os)
705                 os = "unknown";
706
707         arch = fio_get_arch_string(probe->arch);
708         if (!arch)
709                 os = "unknown";
710
711         log_info("hostname=%s, be=%u, os=%s, arch=%s, fio=%u.%u.%u\n",
712                 probe->hostname, probe->bigendian, os, arch, probe->fio_major,
713                 probe->fio_minor, probe->fio_patch);
714
715         if (!client->name)
716                 client->name = strdup((char *) probe->hostname);
717 }
718
719 static int handle_client(struct fio_client *client)
720 {
721         struct fio_net_cmd *cmd;
722
723         dprint(FD_NET, "client: handle %s\n", client->hostname);
724
725         cmd = fio_net_recv_cmd(client->fd);
726         if (!cmd)
727                 return 0;
728
729         dprint(FD_NET, "client: got cmd op %s from %s\n",
730                                 fio_server_op(cmd->opcode), client->hostname);
731
732         switch (cmd->opcode) {
733         case FIO_NET_CMD_QUIT:
734                 remove_client(client);
735                 free(cmd);
736                 break;
737         case FIO_NET_CMD_TEXT: {
738                 const char *buf = (const char *) cmd->payload;
739                 const char *name;
740                 int fio_unused ret;
741
742                 name = client->name ? client->name : client->hostname;
743
744                 if (!client->skip_newline)
745                         fprintf(f_out, "<%s> ", name);
746                 ret = fwrite(buf, cmd->pdu_len, 1, f_out);
747                 fflush(f_out);
748                 client->skip_newline = strchr(buf, '\n') == NULL;
749                 free(cmd);
750                 break;
751                 }
752         case FIO_NET_CMD_TS:
753                 handle_ts(cmd);
754                 free(cmd);
755                 break;
756         case FIO_NET_CMD_GS:
757                 handle_gs(cmd);
758                 free(cmd);
759                 break;
760         case FIO_NET_CMD_ETA:
761                 remove_reply_cmd(client, cmd);
762                 handle_eta(client, cmd);
763                 free(cmd);
764                 break;
765         case FIO_NET_CMD_PROBE:
766                 remove_reply_cmd(client, cmd);
767                 handle_probe(client, cmd);
768                 free(cmd);
769                 break;
770         case FIO_NET_CMD_START:
771                 client->state = Client_started;
772                 free(cmd);
773                 break;
774         case FIO_NET_CMD_STOP:
775                 client->state = Client_stopped;
776                 free(cmd);
777                 break;
778         default:
779                 log_err("fio: unknown client op: %s\n", fio_server_op(cmd->opcode));
780                 free(cmd);
781                 break;
782         }
783
784         return 1;
785 }
786
787 static void request_client_etas(void)
788 {
789         struct fio_client *client;
790         struct flist_head *entry;
791         struct client_eta *eta;
792         int skipped = 0;
793
794         dprint(FD_NET, "client: request eta (%d)\n", nr_clients);
795
796         eta = malloc(sizeof(*eta));
797         memset(&eta->eta, 0, sizeof(eta->eta));
798         eta->pending = nr_clients;
799
800         flist_for_each(entry, &client_list) {
801                 client = flist_entry(entry, struct fio_client, list);
802
803                 if (!flist_empty(&client->eta_list)) {
804                         skipped++;
805                         continue;
806                 }
807
808                 assert(!client->eta_in_flight);
809                 flist_add_tail(&client->eta_list, &eta_list);
810                 client->eta_in_flight = eta;
811                 fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_SEND_ETA,
812                                         (uint64_t) eta, &client->cmd_list);
813         }
814
815         while (skipped--)
816                 dec_jobs_eta(eta);
817
818         dprint(FD_NET, "client: requested eta tag %p\n", eta);
819 }
820
821 static int client_check_cmd_timeout(struct fio_client *client,
822                                     struct timeval *now)
823 {
824         struct fio_net_int_cmd *cmd;
825         struct flist_head *entry, *tmp;
826         int ret = 0;
827
828         flist_for_each_safe(entry, tmp, &client->cmd_list) {
829                 cmd = flist_entry(entry, struct fio_net_int_cmd, list);
830
831                 if (mtime_since(&cmd->tv, now) < FIO_NET_CLIENT_TIMEOUT)
832                         continue;
833
834                 log_err("fio: client %s, timeout on cmd %s\n", client->hostname,
835                                                 fio_server_op(cmd->cmd.opcode));
836                 flist_del(&cmd->list);
837                 free(cmd);
838                 ret = 1;
839         }
840
841         return flist_empty(&client->cmd_list) && ret;
842 }
843
844 static int fio_client_timed_out(void)
845 {
846         struct fio_client *client;
847         struct flist_head *entry, *tmp;
848         struct timeval tv;
849         int ret = 0;
850
851         gettimeofday(&tv, NULL);
852
853         flist_for_each_safe(entry, tmp, &client_list) {
854                 client = flist_entry(entry, struct fio_client, list);
855
856                 if (flist_empty(&client->cmd_list))
857                         continue;
858
859                 if (!client_check_cmd_timeout(client, &tv))
860                         continue;
861
862                 log_err("fio: client %s timed out\n", client->hostname);
863                 remove_client(client);
864                 ret = 1;
865         }
866
867         return ret;
868 }
869
870 int fio_handle_clients(void)
871 {
872         struct fio_client *client;
873         struct flist_head *entry;
874         struct pollfd *pfds;
875         int i, ret = 0;
876
877         gettimeofday(&eta_tv, NULL);
878
879         pfds = malloc(nr_clients * sizeof(struct pollfd));
880
881         sum_stat_clients = nr_clients;
882         init_thread_stat(&client_ts);
883         init_group_run_stat(&client_gs);
884
885         while (!exit_backend && nr_clients) {
886                 i = 0;
887                 flist_for_each(entry, &client_list) {
888                         client = flist_entry(entry, struct fio_client, list);
889
890                         pfds[i].fd = client->fd;
891                         pfds[i].events = POLLIN;
892                         i++;
893                 }
894
895                 assert(i == nr_clients);
896
897                 do {
898                         struct timeval tv;
899
900                         gettimeofday(&tv, NULL);
901                         if (mtime_since(&eta_tv, &tv) >= 900) {
902                                 request_client_etas();
903                                 memcpy(&eta_tv, &tv, sizeof(tv));
904
905                                 if (fio_client_timed_out())
906                                         break;
907                         }
908
909                         ret = poll(pfds, nr_clients, 100);
910                         if (ret < 0) {
911                                 if (errno == EINTR)
912                                         continue;
913                                 log_err("fio: poll clients: %s\n", strerror(errno));
914                                 break;
915                         } else if (!ret)
916                                 continue;
917                 } while (ret <= 0);
918
919                 for (i = 0; i < nr_clients; i++) {
920                         if (!(pfds[i].revents & POLLIN))
921                                 continue;
922
923                         client = find_client_by_fd(pfds[i].fd);
924                         if (!client) {
925                                 log_err("fio: unknown client fd %d\n", pfds[i].fd);
926                                 continue;
927                         }
928                         if (!handle_client(client)) {
929                                 log_info("client: host=%s disconnected\n",
930                                                 client->hostname);
931                                 remove_client(client);
932                         }
933                 }
934         }
935
936         free(pfds);
937         return 0;
938 }