core: Removing duplicated code
[fio.git] / client.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <unistd.h>
4 #include <limits.h>
5 #include <errno.h>
6 #include <fcntl.h>
7 #include <sys/poll.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <sys/wait.h>
11 #include <sys/socket.h>
12 #include <sys/un.h>
13 #include <netinet/in.h>
14 #include <arpa/inet.h>
15 #include <netdb.h>
16 #include <signal.h>
17 #ifdef CONFIG_ZLIB
18 #include <zlib.h>
19 #endif
20
21 #include "fio.h"
22 #include "client.h"
23 #include "server.h"
24 #include "flist.h"
25 #include "hash.h"
26
/*
 * Forward declarations for the command handlers that are wired into the
 * fio_client_ops table below.
 */
static void handle_du(struct fio_client *client, struct fio_net_cmd *cmd);
static void handle_ts(struct fio_client *client, struct fio_net_cmd *cmd);
static void handle_gs(struct fio_client *client, struct fio_net_cmd *cmd);
static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd);
static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd);
static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd);
static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd);
34
/*
 * Callback table for the command line client: each slot points at the
 * handler in this file for the matching server command. The last two
 * members are set from the FIO_CLIENT_DEF_ETA_MSEC and FIO_CLIENT_TYPE_CLI
 * constants.
 */
struct client_ops fio_client_ops = {
        .text           = handle_text,
        .disk_util      = handle_du,
        .thread_status  = handle_ts,
        .group_stats    = handle_gs,
        .stop           = handle_stop,
        .start          = handle_start,
        .eta            = display_thread_status,
        .probe          = handle_probe,
        .eta_msec       = FIO_CLIENT_DEF_ETA_MSEC,
        .client_type    = FIO_CLIENT_TYPE_CLI,
};
47
/* NOTE(review): presumably the time of the last ETA request - not used in this part of the file */
static struct timeval eta_tv;

/* All known clients, linked through fio_client->list */
static FLIST_HEAD(client_list);
/* NOTE(review): presumably outstanding ETA requests - not used in this part of the file */
static FLIST_HEAD(eta_list);

/*
 * Clients (linked through fio_client->arg_list) that also receive every
 * option passed to fio_client_add_cmd_option()
 */
static FLIST_HEAD(arg_list);

/* Running sums filled by handle_ts() when do_output_all_clients is set */
struct thread_stat client_ts;
struct group_run_stats client_gs;
/* Number of stat reports after which handle_ts() prints the summed stats */
int sum_stat_clients;

/* Number of stat reports summed so far by handle_ts() */
static int sum_stat_nr;
/* Non-zero makes handle_ts() sum the stats into client_ts/client_gs */
static int do_output_all_clients;

/* Hash of clients keyed by socket fd: 1 << 7 = 128 buckets */
#define FIO_CLIENT_HASH_BITS    7
#define FIO_CLIENT_HASH_SZ      (1 << FIO_CLIENT_HASH_BITS)
#define FIO_CLIENT_HASH_MASK    (FIO_CLIENT_HASH_SZ - 1)
static struct flist_head client_hash[FIO_CLIENT_HASH_SZ];
66
67 static void fio_client_add_hash(struct fio_client *client)
68 {
69         int bucket = hash_long(client->fd, FIO_CLIENT_HASH_BITS);
70
71         bucket &= FIO_CLIENT_HASH_MASK;
72         flist_add(&client->hash_list, &client_hash[bucket]);
73 }
74
75 static void fio_client_remove_hash(struct fio_client *client)
76 {
77         if (!flist_empty(&client->hash_list))
78                 flist_del_init(&client->hash_list);
79 }
80
81 static void fio_init fio_client_hash_init(void)
82 {
83         int i;
84
85         for (i = 0; i < FIO_CLIENT_HASH_SZ; i++)
86                 INIT_FLIST_HEAD(&client_hash[i]);
87 }
88
89 static struct fio_client *find_client_by_fd(int fd)
90 {
91         int bucket = hash_long(fd, FIO_CLIENT_HASH_BITS) & FIO_CLIENT_HASH_MASK;
92         struct fio_client *client;
93         struct flist_head *entry;
94
95         flist_for_each(entry, &client_hash[bucket]) {
96                 client = flist_entry(entry, struct fio_client, hash_list);
97
98                 if (client->fd == fd) {
99                         client->refs++;
100                         return client;
101                 }
102         }
103
104         return NULL;
105 }
106
107 void fio_put_client(struct fio_client *client)
108 {
109         if (--client->refs)
110                 return;
111
112         free(client->hostname);
113         if (client->argv)
114                 free(client->argv);
115         if (client->name)
116                 free(client->name);
117         while (client->nr_ini_file)
118                 free(client->ini_file[--client->nr_ini_file]);
119         if (client->ini_file)
120                 free(client->ini_file);
121
122         if (!client->did_stat)
123                 sum_stat_clients -= client->nr_stat;
124
125         free(client);
126 }
127
128 static void remove_client(struct fio_client *client)
129 {
130         assert(client->refs);
131
132         dprint(FD_NET, "client: removed <%s>\n", client->hostname);
133
134         if (!flist_empty(&client->list))
135                 flist_del_init(&client->list);
136
137         fio_client_remove_hash(client);
138
139         if (!flist_empty(&client->eta_list)) {
140                 flist_del_init(&client->eta_list);
141                 fio_client_dec_jobs_eta(client->eta_in_flight, client->ops->eta);
142         }
143
144         close(client->fd);
145         client->fd = -1;
146
147         if (client->ops->removed)
148                 client->ops->removed(client);
149
150         nr_clients--;
151         fio_put_client(client);
152 }
153
154 struct fio_client *fio_get_client(struct fio_client *client)
155 {
156         client->refs++;
157         return client;
158 }
159
160 static void __fio_client_add_cmd_option(struct fio_client *client,
161                                         const char *opt)
162 {
163         int index;
164
165         index = client->argc++;
166         client->argv = realloc(client->argv, sizeof(char *) * client->argc);
167         client->argv[index] = strdup(opt);
168         dprint(FD_NET, "client: add cmd %d: %s\n", index, opt);
169 }
170
171 void fio_client_add_cmd_option(void *cookie, const char *opt)
172 {
173         struct fio_client *client = cookie;
174         struct flist_head *entry;
175
176         if (!client || !opt)
177                 return;
178
179         __fio_client_add_cmd_option(client, opt);
180
181         /*
182          * Duplicate arguments to shared client group
183          */
184         flist_for_each(entry, &arg_list) {
185                 client = flist_entry(entry, struct fio_client, arg_list);
186
187                 __fio_client_add_cmd_option(client, opt);
188         }
189 }
190
191 struct fio_client *fio_client_add_explicit(struct client_ops *ops,
192                                            const char *hostname, int type,
193                                            int port)
194 {
195         struct fio_client *client;
196
197         client = malloc(sizeof(*client));
198         memset(client, 0, sizeof(*client));
199
200         INIT_FLIST_HEAD(&client->list);
201         INIT_FLIST_HEAD(&client->hash_list);
202         INIT_FLIST_HEAD(&client->arg_list);
203         INIT_FLIST_HEAD(&client->eta_list);
204         INIT_FLIST_HEAD(&client->cmd_list);
205
206         client->hostname = strdup(hostname);
207
208         if (type == Fio_client_socket)
209                 client->is_sock = 1;
210         else {
211                 int ipv6;
212
213                 ipv6 = type == Fio_client_ipv6;
214                 if (fio_server_parse_host(hostname, &ipv6,
215                                                 &client->addr.sin_addr,
216                                                 &client->addr6.sin6_addr))
217                         goto err;
218
219                 client->port = port;
220         }
221
222         client->fd = -1;
223         client->ops = ops;
224         client->refs = 1;
225         client->type = ops->client_type;
226
227         __fio_client_add_cmd_option(client, "fio");
228
229         flist_add(&client->list, &client_list);
230         nr_clients++;
231         dprint(FD_NET, "client: added <%s>\n", client->hostname);
232         return client;
233 err:
234         free(client);
235         return NULL;
236 }
237
238 void fio_client_add_ini_file(void *cookie, const char *ini_file)
239 {
240         struct fio_client *client = cookie;
241         size_t new_size;
242
243         dprint(FD_NET, "client <%s>: add ini %s\n", client->hostname, ini_file);
244
245         new_size = (client->nr_ini_file + 1) * sizeof(char *);
246         client->ini_file = realloc(client->ini_file, new_size);
247         client->ini_file[client->nr_ini_file] = strdup(ini_file);
248         client->nr_ini_file++;
249 }
250
251 int fio_client_add(struct client_ops *ops, const char *hostname, void **cookie)
252 {
253         struct fio_client *existing = *cookie;
254         struct fio_client *client;
255
256         if (existing) {
257                 /*
258                  * We always add our "exec" name as the option, hence 1
259                  * means empty.
260                  */
261                 if (existing->argc == 1)
262                         flist_add_tail(&existing->arg_list, &arg_list);
263                 else {
264                         while (!flist_empty(&arg_list))
265                                 flist_del_init(arg_list.next);
266                 }
267         }
268
269         client = malloc(sizeof(*client));
270         memset(client, 0, sizeof(*client));
271
272         INIT_FLIST_HEAD(&client->list);
273         INIT_FLIST_HEAD(&client->hash_list);
274         INIT_FLIST_HEAD(&client->arg_list);
275         INIT_FLIST_HEAD(&client->eta_list);
276         INIT_FLIST_HEAD(&client->cmd_list);
277
278         if (fio_server_parse_string(hostname, &client->hostname,
279                                         &client->is_sock, &client->port,
280                                         &client->addr.sin_addr,
281                                         &client->addr6.sin6_addr,
282                                         &client->ipv6))
283                 return -1;
284
285         client->fd = -1;
286         client->ops = ops;
287         client->refs = 1;
288         client->type = ops->client_type;
289
290         __fio_client_add_cmd_option(client, "fio");
291
292         flist_add(&client->list, &client_list);
293         nr_clients++;
294         dprint(FD_NET, "client: added <%s>\n", client->hostname);
295         *cookie = client;
296         return 0;
297 }
298
299 static void probe_client(struct fio_client *client)
300 {
301         struct cmd_client_probe_pdu pdu;
302         uint64_t tag;
303
304         dprint(FD_NET, "client: send probe\n");
305
306 #ifdef CONFIG_ZLIB
307         pdu.flags = __le64_to_cpu(FIO_PROBE_FLAG_ZLIB);
308 #else
309         pdu.flags = 0;
310 #endif
311
312         fio_net_send_cmd(client->fd, FIO_NET_CMD_PROBE, &pdu, sizeof(pdu), &tag, &client->cmd_list);
313 }
314
315 static int fio_client_connect_ip(struct fio_client *client)
316 {
317         struct sockaddr *addr;
318         socklen_t socklen;
319         int fd, domain;
320
321         if (client->ipv6) {
322                 client->addr6.sin6_family = AF_INET6;
323                 client->addr6.sin6_port = htons(client->port);
324                 domain = AF_INET6;
325                 addr = (struct sockaddr *) &client->addr6;
326                 socklen = sizeof(client->addr6);
327         } else {
328                 client->addr.sin_family = AF_INET;
329                 client->addr.sin_port = htons(client->port);
330                 domain = AF_INET;
331                 addr = (struct sockaddr *) &client->addr;
332                 socklen = sizeof(client->addr);
333         }
334
335         fd = socket(domain, SOCK_STREAM, 0);
336         if (fd < 0) {
337                 int ret = -errno;
338
339                 log_err("fio: socket: %s\n", strerror(errno));
340                 return ret;
341         }
342
343         if (connect(fd, addr, socklen) < 0) {
344                 int ret = -errno;
345
346                 log_err("fio: connect: %s\n", strerror(errno));
347                 log_err("fio: failed to connect to %s:%u\n", client->hostname,
348                                                                 client->port);
349                 close(fd);
350                 return ret;
351         }
352
353         return fd;
354 }
355
356 static int fio_client_connect_sock(struct fio_client *client)
357 {
358         struct sockaddr_un *addr = &client->addr_un;
359         socklen_t len;
360         int fd;
361
362         memset(addr, 0, sizeof(*addr));
363         addr->sun_family = AF_UNIX;
364         strcpy(addr->sun_path, client->hostname);
365
366         fd = socket(AF_UNIX, SOCK_STREAM, 0);
367         if (fd < 0) {
368                 int ret = -errno;
369
370                 log_err("fio: socket: %s\n", strerror(errno));
371                 return ret;
372         }
373
374         len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
375         if (connect(fd, (struct sockaddr *) addr, len) < 0) {
376                 int ret = -errno;
377
378                 log_err("fio: connect; %s\n", strerror(errno));
379                 close(fd);
380                 return ret;
381         }
382
383         return fd;
384 }
385
386 int fio_client_connect(struct fio_client *client)
387 {
388         int fd;
389
390         dprint(FD_NET, "client: connect to host %s\n", client->hostname);
391
392         if (client->is_sock)
393                 fd = fio_client_connect_sock(client);
394         else
395                 fd = fio_client_connect_ip(client);
396
397         dprint(FD_NET, "client: %s connected %d\n", client->hostname, fd);
398
399         if (fd < 0)
400                 return fd;
401
402         client->fd = fd;
403         fio_client_add_hash(client);
404         client->state = Client_connected;
405
406         probe_client(client);
407         return 0;
408 }
409
410 int fio_client_terminate(struct fio_client *client)
411 {
412         return fio_net_send_quit(client->fd);
413 }
414
415 void fio_clients_terminate(void)
416 {
417         struct flist_head *entry;
418         struct fio_client *client;
419
420         dprint(FD_NET, "client: terminate clients\n");
421
422         flist_for_each(entry, &client_list) {
423                 client = flist_entry(entry, struct fio_client, list);
424                 fio_client_terminate(client);
425         }
426 }
427
428 static void sig_int(int sig)
429 {
430         dprint(FD_NET, "client: got signal %d\n", sig);
431         fio_clients_terminate();
432 }
433
/*
 * Signal handler installed for SIGUSR1: shows the run stats of the jobs
 * that are currently running. The signal number itself is not used.
 */
static void sig_show_status(int sig)
{
        (void) sig;

        show_running_run_stats();
}
438
439 static void client_signal_handler(void)
440 {
441         struct sigaction act;
442
443         memset(&act, 0, sizeof(act));
444         act.sa_handler = sig_int;
445         act.sa_flags = SA_RESTART;
446         sigaction(SIGINT, &act, NULL);
447
448         memset(&act, 0, sizeof(act));
449         act.sa_handler = sig_int;
450         act.sa_flags = SA_RESTART;
451         sigaction(SIGTERM, &act, NULL);
452
453 /* Windows uses SIGBREAK as a quit signal from other applications */
454 #ifdef WIN32
455         memset(&act, 0, sizeof(act));
456         act.sa_handler = sig_int;
457         act.sa_flags = SA_RESTART;
458         sigaction(SIGBREAK, &act, NULL);
459 #endif
460
461         memset(&act, 0, sizeof(act));
462         act.sa_handler = sig_show_status;
463         act.sa_flags = SA_RESTART;
464         sigaction(SIGUSR1, &act, NULL);
465 }
466
467 static int send_client_cmd_line(struct fio_client *client)
468 {
469         struct cmd_single_line_pdu *cslp;
470         struct cmd_line_pdu *clp;
471         unsigned long offset;
472         unsigned int *lens;
473         void *pdu;
474         size_t mem;
475         int i, ret;
476
477         dprint(FD_NET, "client: send cmdline %d\n", client->argc);
478
479         lens = malloc(client->argc * sizeof(unsigned int));
480
481         /*
482          * Find out how much mem we need
483          */
484         for (i = 0, mem = 0; i < client->argc; i++) {
485                 lens[i] = strlen(client->argv[i]) + 1;
486                 mem += lens[i];
487         }
488
489         /*
490          * We need one cmd_line_pdu, and argc number of cmd_single_line_pdu
491          */
492         mem += sizeof(*clp) + (client->argc * sizeof(*cslp));
493
494         pdu = malloc(mem);
495         clp = pdu;
496         offset = sizeof(*clp);
497
498         for (i = 0; i < client->argc; i++) {
499                 uint16_t arg_len = lens[i];
500
501                 cslp = pdu + offset;
502                 strcpy((char *) cslp->text, client->argv[i]);
503                 cslp->len = cpu_to_le16(arg_len);
504                 offset += sizeof(*cslp) + arg_len;
505         }
506
507         free(lens);
508         clp->lines = cpu_to_le16(client->argc);
509         clp->client_type = __cpu_to_le16(client->type);
510         ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOBLINE, pdu, mem, NULL, NULL);
511         free(pdu);
512         return ret;
513 }
514
515 int fio_clients_connect(void)
516 {
517         struct fio_client *client;
518         struct flist_head *entry, *tmp;
519         int ret;
520
521 #ifdef WIN32
522         WSADATA wsd;
523         WSAStartup(MAKEWORD(2, 2), &wsd);
524 #endif
525
526         dprint(FD_NET, "client: connect all\n");
527
528         client_signal_handler();
529
530         flist_for_each_safe(entry, tmp, &client_list) {
531                 client = flist_entry(entry, struct fio_client, list);
532
533                 ret = fio_client_connect(client);
534                 if (ret) {
535                         remove_client(client);
536                         continue;
537                 }
538
539                 if (client->argc > 1)
540                         send_client_cmd_line(client);
541         }
542
543         return !nr_clients;
544 }
545
546 int fio_start_client(struct fio_client *client)
547 {
548         dprint(FD_NET, "client: start %s\n", client->hostname);
549         return fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_RUN, 0, NULL);
550 }
551
552 int fio_start_all_clients(void)
553 {
554         struct fio_client *client;
555         struct flist_head *entry, *tmp;
556         int ret;
557
558         dprint(FD_NET, "client: start all\n");
559
560         flist_for_each_safe(entry, tmp, &client_list) {
561                 client = flist_entry(entry, struct fio_client, list);
562
563                 ret = fio_start_client(client);
564                 if (ret) {
565                         remove_client(client);
566                         continue;
567                 }
568         }
569
570         return flist_empty(&client_list);
571 }
572
573 /*
574  * Send file contents to server backend. We could use sendfile(), but to remain
575  * more portable lets just read/write the darn thing.
576  */
577 static int __fio_client_send_ini(struct fio_client *client, const char *filename)
578 {
579         struct cmd_job_pdu *pdu;
580         size_t p_size;
581         struct stat sb;
582         char *p;
583         void *buf;
584         off_t len;
585         int fd, ret;
586
587         dprint(FD_NET, "send ini %s to %s\n", filename, client->hostname);
588
589         fd = open(filename, O_RDONLY);
590         if (fd < 0) {
591                 int ret = -errno;
592
593                 log_err("fio: job file <%s> open: %s\n", filename, strerror(errno));
594                 return ret;
595         }
596
597         if (fstat(fd, &sb) < 0) {
598                 int ret = -errno;
599
600                 log_err("fio: job file stat: %s\n", strerror(errno));
601                 close(fd);
602                 return ret;
603         }
604
605         p_size = sb.st_size + sizeof(*pdu);
606         pdu = malloc(p_size);
607         buf = pdu->buf;
608
609         len = sb.st_size;
610         p = buf;
611         do {
612                 ret = read(fd, p, len);
613                 if (ret > 0) {
614                         len -= ret;
615                         if (!len)
616                                 break;
617                         p += ret;
618                         continue;
619                 } else if (!ret)
620                         break;
621                 else if (errno == EAGAIN || errno == EINTR)
622                         continue;
623         } while (1);
624
625         if (len) {
626                 log_err("fio: failed reading job file %s\n", filename);
627                 close(fd);
628                 free(buf);
629                 return 1;
630         }
631
632         pdu->buf_len = __cpu_to_le32(sb.st_size);
633         pdu->client_type = cpu_to_le32(client->type);
634
635         client->sent_job = 1;
636         ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, pdu, p_size, NULL, NULL);
637         free(pdu);
638         close(fd);
639         return ret;
640 }
641
642 int fio_client_send_ini(struct fio_client *client, const char *filename)
643 {
644         int ret;
645
646         ret = __fio_client_send_ini(client, filename);
647         if (!ret)
648                 client->sent_job = 1;
649
650         return ret;
651 }
652
653 int fio_clients_send_ini(const char *filename)
654 {
655         struct fio_client *client;
656         struct flist_head *entry, *tmp;
657
658         flist_for_each_safe(entry, tmp, &client_list) {
659                 client = flist_entry(entry, struct fio_client, list);
660
661                 if (client->nr_ini_file) {
662                         int i;
663
664                         for (i = 0; i < client->nr_ini_file; i++) {
665                                 const char *ini = client->ini_file[i];
666
667                                 if (fio_client_send_ini(client, ini)) {
668                                         remove_client(client);
669                                         break;
670                                 }
671                         }
672                 } else if (!filename || fio_client_send_ini(client, filename))
673                         remove_client(client);
674         }
675
676         return !nr_clients;
677 }
678
679 int fio_client_update_options(struct fio_client *client,
680                               struct thread_options *o, uint64_t *tag)
681 {
682         struct cmd_add_job_pdu pdu;
683
684         pdu.thread_number = cpu_to_le32(client->thread_number);
685         pdu.groupid = cpu_to_le32(client->groupid);
686         convert_thread_options_to_net(&pdu.top, o);
687         
688         return fio_net_send_cmd(client->fd, FIO_NET_CMD_UPDATE_JOB, &pdu, sizeof(pdu), tag, &client->cmd_list);
689 }
690
691 static void convert_io_stat(struct io_stat *dst, struct io_stat *src)
692 {
693         dst->max_val    = le64_to_cpu(src->max_val);
694         dst->min_val    = le64_to_cpu(src->min_val);
695         dst->samples    = le64_to_cpu(src->samples);
696
697         /*
698          * Floats arrive as IEEE 754 encoded uint64_t, convert back to double
699          */
700         dst->mean.u.f   = fio_uint64_to_double(le64_to_cpu(dst->mean.u.i));
701         dst->S.u.f      = fio_uint64_to_double(le64_to_cpu(dst->S.u.i));
702 }
703
704 static void convert_ts(struct thread_stat *dst, struct thread_stat *src)
705 {
706         int i, j;
707
708         dst->error              = le32_to_cpu(src->error);
709         dst->thread_number      = le32_to_cpu(src->thread_number);
710         dst->groupid            = le32_to_cpu(src->groupid);
711         dst->pid                = le32_to_cpu(src->pid);
712         dst->members            = le32_to_cpu(src->members);
713         dst->unified_rw_rep     = le32_to_cpu(src->unified_rw_rep);
714
715         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
716                 convert_io_stat(&dst->clat_stat[i], &src->clat_stat[i]);
717                 convert_io_stat(&dst->slat_stat[i], &src->slat_stat[i]);
718                 convert_io_stat(&dst->lat_stat[i], &src->lat_stat[i]);
719                 convert_io_stat(&dst->bw_stat[i], &src->bw_stat[i]);
720         }
721
722         dst->usr_time           = le64_to_cpu(src->usr_time);
723         dst->sys_time           = le64_to_cpu(src->sys_time);
724         dst->ctx                = le64_to_cpu(src->ctx);
725         dst->minf               = le64_to_cpu(src->minf);
726         dst->majf               = le64_to_cpu(src->majf);
727         dst->clat_percentiles   = le64_to_cpu(src->clat_percentiles);
728
729         for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) {
730                 fio_fp64_t *fps = &src->percentile_list[i];
731                 fio_fp64_t *fpd = &dst->percentile_list[i];
732
733                 fpd->u.f = fio_uint64_to_double(le64_to_cpu(fps->u.i));
734         }
735
736         for (i = 0; i < FIO_IO_U_MAP_NR; i++) {
737                 dst->io_u_map[i]        = le32_to_cpu(src->io_u_map[i]);
738                 dst->io_u_submit[i]     = le32_to_cpu(src->io_u_submit[i]);
739                 dst->io_u_complete[i]   = le32_to_cpu(src->io_u_complete[i]);
740         }
741
742         for (i = 0; i < FIO_IO_U_LAT_U_NR; i++) {
743                 dst->io_u_lat_u[i]      = le32_to_cpu(src->io_u_lat_u[i]);
744                 dst->io_u_lat_m[i]      = le32_to_cpu(src->io_u_lat_m[i]);
745         }
746
747         for (i = 0; i < DDIR_RWDIR_CNT; i++)
748                 for (j = 0; j < FIO_IO_U_PLAT_NR; j++)
749                         dst->io_u_plat[i][j] = le32_to_cpu(src->io_u_plat[i][j]);
750
751         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
752                 dst->total_io_u[i]      = le64_to_cpu(src->total_io_u[i]);
753                 dst->short_io_u[i]      = le64_to_cpu(src->short_io_u[i]);
754         }
755
756         dst->total_submit       = le64_to_cpu(src->total_submit);
757         dst->total_complete     = le64_to_cpu(src->total_complete);
758
759         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
760                 dst->io_bytes[i]        = le64_to_cpu(src->io_bytes[i]);
761                 dst->runtime[i]         = le64_to_cpu(src->runtime[i]);
762         }
763
764         dst->total_run_time     = le64_to_cpu(src->total_run_time);
765         dst->continue_on_error  = le16_to_cpu(src->continue_on_error);
766         dst->total_err_count    = le64_to_cpu(src->total_err_count);
767         dst->first_error        = le32_to_cpu(src->first_error);
768         dst->kb_base            = le32_to_cpu(src->kb_base);
769         dst->unit_base          = le32_to_cpu(src->unit_base);
770 }
771
772 static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src)
773 {
774         int i;
775
776         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
777                 dst->max_run[i]         = le64_to_cpu(src->max_run[i]);
778                 dst->min_run[i]         = le64_to_cpu(src->min_run[i]);
779                 dst->max_bw[i]          = le64_to_cpu(src->max_bw[i]);
780                 dst->min_bw[i]          = le64_to_cpu(src->min_bw[i]);
781                 dst->io_kb[i]           = le64_to_cpu(src->io_kb[i]);
782                 dst->agg[i]             = le64_to_cpu(src->agg[i]);
783         }
784
785         dst->kb_base    = le32_to_cpu(src->kb_base);
786         dst->unit_base  = le32_to_cpu(src->unit_base);
787         dst->groupid    = le32_to_cpu(src->groupid);
788         dst->unified_rw_rep     = le32_to_cpu(src->unified_rw_rep);
789 }
790
791 static void handle_ts(struct fio_client *client, struct fio_net_cmd *cmd)
792 {
793         struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload;
794
795         show_thread_status(&p->ts, &p->rs);
796         client->did_stat = 1;
797
798         if (!do_output_all_clients)
799                 return;
800
801         sum_thread_stats(&client_ts, &p->ts, sum_stat_nr);
802         sum_group_stats(&client_gs, &p->rs);
803
804         client_ts.members++;
805         client_ts.thread_number = p->ts.thread_number;
806         client_ts.groupid = p->ts.groupid;
807         client_ts.unified_rw_rep = p->ts.unified_rw_rep;
808
809         if (++sum_stat_nr == sum_stat_clients) {
810                 strcpy(client_ts.name, "All clients");
811                 show_thread_status(&client_ts, &client_gs);
812         }
813 }
814
815 static void handle_gs(struct fio_client *client, struct fio_net_cmd *cmd)
816 {
817         struct group_run_stats *gs = (struct group_run_stats *) cmd->payload;
818
819         show_group_stats(gs);
820 }
821
822 static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd)
823 {
824         struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmd->payload;
825         const char *buf = (const char *) pdu->buf;
826         const char *name;
827         int fio_unused ret;
828
829         name = client->name ? client->name : client->hostname;
830
831         if (!client->skip_newline)
832                 fprintf(f_out, "<%s> ", name);
833         ret = fwrite(buf, pdu->buf_len, 1, f_out);
834         fflush(f_out);
835         client->skip_newline = strchr(buf, '\n') == NULL;
836 }
837
838 static void convert_agg(struct disk_util_agg *agg)
839 {
840         int i;
841
842         for (i = 0; i < 2; i++) {
843                 agg->ios[i]     = le32_to_cpu(agg->ios[i]);
844                 agg->merges[i]  = le32_to_cpu(agg->merges[i]);
845                 agg->sectors[i] = le64_to_cpu(agg->sectors[i]);
846                 agg->ticks[i]   = le32_to_cpu(agg->ticks[i]);
847         }
848
849         agg->io_ticks           = le32_to_cpu(agg->io_ticks);
850         agg->time_in_queue      = le32_to_cpu(agg->time_in_queue);
851         agg->slavecount         = le32_to_cpu(agg->slavecount);
852         agg->max_util.u.f       = fio_uint64_to_double(__le64_to_cpu(agg->max_util.u.i));
853 }
854
855 static void convert_dus(struct disk_util_stat *dus)
856 {
857         int i;
858
859         for (i = 0; i < 2; i++) {
860                 dus->ios[i]     = le32_to_cpu(dus->ios[i]);
861                 dus->merges[i]  = le32_to_cpu(dus->merges[i]);
862                 dus->sectors[i] = le64_to_cpu(dus->sectors[i]);
863                 dus->ticks[i]   = le32_to_cpu(dus->ticks[i]);
864         }
865
866         dus->io_ticks           = le32_to_cpu(dus->io_ticks);
867         dus->time_in_queue      = le32_to_cpu(dus->time_in_queue);
868         dus->msec               = le64_to_cpu(dus->msec);
869 }
870
871 static void handle_du(struct fio_client *client, struct fio_net_cmd *cmd)
872 {
873         struct cmd_du_pdu *du = (struct cmd_du_pdu *) cmd->payload;
874
875         if (!client->disk_stats_shown) {
876                 client->disk_stats_shown = 1;
877                 log_info("\nDisk stats (read/write):\n");
878         }
879
880         print_disk_util(&du->dus, &du->agg, output_format == FIO_OUTPUT_TERSE);
881 }
882
883 static void convert_jobs_eta(struct jobs_eta *je)
884 {
885         int i;
886
887         je->nr_running          = le32_to_cpu(je->nr_running);
888         je->nr_ramp             = le32_to_cpu(je->nr_ramp);
889         je->nr_pending          = le32_to_cpu(je->nr_pending);
890         je->nr_setting_up       = le32_to_cpu(je->nr_setting_up);
891         je->files_open          = le32_to_cpu(je->files_open);
892
893         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
894                 je->m_rate[i]   = le32_to_cpu(je->m_rate[i]);
895                 je->t_rate[i]   = le32_to_cpu(je->t_rate[i]);
896                 je->m_iops[i]   = le32_to_cpu(je->m_iops[i]);
897                 je->t_iops[i]   = le32_to_cpu(je->t_iops[i]);
898                 je->rate[i]     = le32_to_cpu(je->rate[i]);
899                 je->iops[i]     = le32_to_cpu(je->iops[i]);
900         }
901
902         je->elapsed_sec         = le64_to_cpu(je->elapsed_sec);
903         je->eta_sec             = le64_to_cpu(je->eta_sec);
904         je->nr_threads          = le32_to_cpu(je->nr_threads);
905         je->is_pow2             = le32_to_cpu(je->is_pow2);
906         je->unit_base           = le32_to_cpu(je->unit_base);
907 }
908
909 void fio_client_sum_jobs_eta(struct jobs_eta *dst, struct jobs_eta *je)
910 {
911         int i;
912
913         dst->nr_running         += je->nr_running;
914         dst->nr_ramp            += je->nr_ramp;
915         dst->nr_pending         += je->nr_pending;
916         dst->nr_setting_up      += je->nr_setting_up;
917         dst->files_open         += je->files_open;
918
919         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
920                 dst->m_rate[i]  += je->m_rate[i];
921                 dst->t_rate[i]  += je->t_rate[i];
922                 dst->m_iops[i]  += je->m_iops[i];
923                 dst->t_iops[i]  += je->t_iops[i];
924                 dst->rate[i]    += je->rate[i];
925                 dst->iops[i]    += je->iops[i];
926         }
927
928         dst->elapsed_sec        += je->elapsed_sec;
929
930         if (je->eta_sec > dst->eta_sec)
931                 dst->eta_sec = je->eta_sec;
932
933         dst->nr_threads         += je->nr_threads;
934         /* we need to handle je->run_str too ... */
935 }
936
937 void fio_client_dec_jobs_eta(struct client_eta *eta, client_eta_op eta_fn)
938 {
939         if (!--eta->pending) {
940                 eta_fn(&eta->eta);
941                 free(eta);
942         }
943 }
944
945 static void remove_reply_cmd(struct fio_client *client, struct fio_net_cmd *cmd)
946 {
947         struct fio_net_cmd_reply *reply = NULL;
948         struct flist_head *entry;
949
950         flist_for_each(entry, &client->cmd_list) {
951                 reply = flist_entry(entry, struct fio_net_cmd_reply, list);
952
953                 if (cmd->tag == (uintptr_t) reply)
954                         break;
955
956                 reply = NULL;
957         }
958
959         if (!reply) {
960                 log_err("fio: client: unable to find matching tag (%llx)\n", (unsigned long long) cmd->tag);
961                 return;
962         }
963
964         flist_del(&reply->list);
965         cmd->tag = reply->saved_tag;
966         free(reply);
967 }
968
969 int fio_client_wait_for_reply(struct fio_client *client, uint64_t tag)
970 {
971         do {
972                 struct fio_net_cmd_reply *reply = NULL;
973                 struct flist_head *entry;
974
975                 flist_for_each(entry, &client->cmd_list) {
976                         reply = flist_entry(entry, struct fio_net_cmd_reply, list);
977
978                         if (tag == (uintptr_t) reply)
979                                 break;
980
981                         reply = NULL;
982                 }
983
984                 if (!reply)
985                         break;
986
987                 usleep(1000);
988         } while (1);
989
990         return 0;
991 }
992
993 static void handle_eta(struct fio_client *client, struct fio_net_cmd *cmd)
994 {
995         struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
996         struct client_eta *eta = (struct client_eta *) (uintptr_t) cmd->tag;
997
998         dprint(FD_NET, "client: got eta tag %p, %d\n", eta, eta->pending);
999
1000         assert(client->eta_in_flight == eta);
1001
1002         client->eta_in_flight = NULL;
1003         flist_del_init(&client->eta_list);
1004
1005         if (client->ops->jobs_eta)
1006                 client->ops->jobs_eta(client, je);
1007
1008         fio_client_sum_jobs_eta(&eta->eta, je);
1009         fio_client_dec_jobs_eta(eta, client->ops->eta);
1010 }
1011
1012 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd)
1013 {
1014         struct cmd_probe_reply_pdu *probe = (struct cmd_probe_reply_pdu *) cmd->payload;
1015         const char *os, *arch;
1016         char bit[16];
1017
1018         os = fio_get_os_string(probe->os);
1019         if (!os)
1020                 os = "unknown";
1021
1022         arch = fio_get_arch_string(probe->arch);
1023         if (!arch)
1024                 os = "unknown";
1025
1026         sprintf(bit, "%d-bit", probe->bpp * 8);
1027         probe->flags = le64_to_cpu(probe->flags);
1028
1029         log_info("hostname=%s, be=%u, %s, os=%s, arch=%s, fio=%s, flags=%lx\n",
1030                 probe->hostname, probe->bigendian, bit, os, arch,
1031                 probe->fio_version, (unsigned long) probe->flags);
1032
1033         if (!client->name)
1034                 client->name = strdup((char *) probe->hostname);
1035 }
1036
1037 static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd)
1038 {
1039         struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload;
1040
1041         client->state = Client_started;
1042         client->jobs = le32_to_cpu(pdu->jobs);
1043         client->nr_stat = le32_to_cpu(pdu->stat_outputs);
1044
1045         if (sum_stat_clients > 1)
1046                 do_output_all_clients = 1;
1047
1048         sum_stat_clients += client->nr_stat;
1049 }
1050
1051 static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd)
1052 {
1053         if (client->error)
1054                 log_info("client <%s>: exited with error %d\n", client->hostname, client->error);
1055 }
1056
1057 static void convert_stop(struct fio_net_cmd *cmd)
1058 {
1059         struct cmd_end_pdu *pdu = (struct cmd_end_pdu *) cmd->payload;
1060
1061         pdu->error = le32_to_cpu(pdu->error);
1062 }
1063
1064 static void convert_text(struct fio_net_cmd *cmd)
1065 {
1066         struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmd->payload;
1067
1068         pdu->level      = le32_to_cpu(pdu->level);
1069         pdu->buf_len    = le32_to_cpu(pdu->buf_len);
1070         pdu->log_sec    = le64_to_cpu(pdu->log_sec);
1071         pdu->log_usec   = le64_to_cpu(pdu->log_usec);
1072 }
1073
/*
 * Inflate a compressed io log payload.
 *
 * The cmd_iolog_pdu header at the start of the payload is not
 * compressed; the samples that follow it are. On success a newly
 * malloc'ed buffer is returned holding a verbatim copy of the header
 * (still in wire byte order) followed by the inflated samples; the
 * caller owns it. Returns NULL when zlib support is not compiled in,
 * the payload is shorter than the header, memory cannot be allocated,
 * or inflation fails.
 */
static struct cmd_iolog_pdu *convert_iolog_gz(struct fio_net_cmd *cmd,
					      struct cmd_iolog_pdu *pdu)
{
#ifdef CONFIG_ZLIB
	struct cmd_iolog_pdu *ret = NULL;
	z_stream stream;
	uint32_t nr_samples;
	size_t total;
	unsigned char *p;

	/* a payload without a full header would underflow avail_in below */
	if (cmd->pdu_len < sizeof(*pdu))
		return NULL;

	stream.zalloc = Z_NULL;
	stream.zfree = Z_NULL;
	stream.opaque = Z_NULL;
	stream.avail_in = 0;
	stream.next_in = Z_NULL;

	if (inflateInit(&stream) != Z_OK)
		return NULL;

	/*
	 * Get header first, it's not compressed
	 */
	nr_samples = le32_to_cpu(pdu->nr_samples);

	total = nr_samples * sizeof(struct io_sample);
	ret = malloc(total + sizeof(*pdu));
	if (!ret) {
		log_err("fio: out of memory decompressing log\n");
		goto out;
	}

	memcpy(ret, pdu, sizeof(*pdu));

	p = (unsigned char *) ret + sizeof(*pdu);

	stream.avail_in = cmd->pdu_len - sizeof(*pdu);
	stream.next_in = (unsigned char *) pdu + sizeof(*pdu);
	while (stream.avail_in) {
		unsigned int this_chunk = 65536;
		unsigned int this_len;
		int zret;

		/* never let inflate write past the space left for samples */
		if (this_chunk > total)
			this_chunk = total;

		stream.avail_out = this_chunk;
		stream.next_out = p;
		zret = inflate(&stream, Z_NO_FLUSH);

		/*
		 * Only Z_OK and Z_STREAM_END mean progress. Anything else
		 * (negative errors, or Z_NEED_DICT) would never consume the
		 * remaining input, so treat it as a failure.
		 */
		if (zret != Z_OK && zret != Z_STREAM_END) {
			log_err("fio: inflate error %d\n", zret);
			free(ret);
			ret = NULL;
			goto out;
		}

		this_len = this_chunk - stream.avail_out;
		p += this_len;
		total -= this_len;

		/*
		 * Once the stream has ended, inflate() keeps returning
		 * Z_STREAM_END without consuming input; stop here rather
		 * than spin on trailing bytes.
		 */
		if (zret == Z_STREAM_END)
			break;
	}

out:
	inflateEnd(&stream);
	return ret;
#else
	return NULL;
#endif
}
1139
1140 /*
1141  * This has been compressed on the server side, since it can be big.
1142  * Uncompress here.
1143  */
1144 static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *cmd)
1145 {
1146         struct cmd_iolog_pdu *pdu = (struct cmd_iolog_pdu *) cmd->payload;
1147         struct cmd_iolog_pdu *ret;
1148         int i;
1149
1150         /*
1151          * Convert if compressed and we support it. If it's not
1152          * compressed, we need not do anything.
1153          */
1154         if (le32_to_cpu(pdu->compressed)) {
1155 #ifndef CONFIG_ZLIB
1156                 log_err("fio: server sent compressed data by mistake\n");
1157                 return NULL;
1158 #endif
1159                 ret = convert_iolog_gz(cmd, pdu);
1160                 if (!ret) {
1161                         log_err("fio: failed decompressing log\n");
1162                         return NULL;
1163                 }
1164         } else
1165                 ret = pdu;
1166
1167         ret->thread_number      = le32_to_cpu(ret->thread_number);
1168         ret->nr_samples         = le32_to_cpu(ret->nr_samples);
1169         ret->log_type           = le32_to_cpu(ret->log_type);
1170         ret->compressed         = le32_to_cpu(ret->compressed);
1171
1172         for (i = 0; i < ret->nr_samples; i++) {
1173                 struct io_sample *s = &ret->samples[i];
1174
1175                 s->time = le64_to_cpu(s->time);
1176                 s->val  = le64_to_cpu(s->val);
1177                 s->ddir = le32_to_cpu(s->ddir);
1178                 s->bs   = le32_to_cpu(s->bs);
1179         }
1180
1181         return ret;
1182 }
1183
1184 int fio_handle_client(struct fio_client *client)
1185 {
1186         struct client_ops *ops = client->ops;
1187         struct fio_net_cmd *cmd;
1188
1189         dprint(FD_NET, "client: handle %s\n", client->hostname);
1190
1191         cmd = fio_net_recv_cmd(client->fd);
1192         if (!cmd)
1193                 return 0;
1194
1195         dprint(FD_NET, "client: got cmd op %s from %s (pdu=%u)\n",
1196                 fio_server_op(cmd->opcode), client->hostname, cmd->pdu_len);
1197
1198         switch (cmd->opcode) {
1199         case FIO_NET_CMD_QUIT:
1200                 if (ops->quit)
1201                         ops->quit(client, cmd);
1202                 remove_client(client);
1203                 free(cmd);
1204                 break;
1205         case FIO_NET_CMD_TEXT:
1206                 convert_text(cmd);
1207                 ops->text(client, cmd);
1208                 free(cmd);
1209                 break;
1210         case FIO_NET_CMD_DU: {
1211                 struct cmd_du_pdu *du = (struct cmd_du_pdu *) cmd->payload;
1212
1213                 convert_dus(&du->dus);
1214                 convert_agg(&du->agg);
1215
1216                 ops->disk_util(client, cmd);
1217                 free(cmd);
1218                 break;
1219                 }
1220         case FIO_NET_CMD_TS: {
1221                 struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload;
1222
1223                 convert_ts(&p->ts, &p->ts);
1224                 convert_gs(&p->rs, &p->rs);
1225
1226                 ops->thread_status(client, cmd);
1227                 free(cmd);
1228                 break;
1229                 }
1230         case FIO_NET_CMD_GS: {
1231                 struct group_run_stats *gs = (struct group_run_stats *) cmd->payload;
1232
1233                 convert_gs(gs, gs);
1234
1235                 ops->group_stats(client, cmd);
1236                 free(cmd);
1237                 break;
1238                 }
1239         case FIO_NET_CMD_ETA: {
1240                 struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
1241
1242                 remove_reply_cmd(client, cmd);
1243                 convert_jobs_eta(je);
1244                 handle_eta(client, cmd);
1245                 free(cmd);
1246                 break;
1247                 }
1248         case FIO_NET_CMD_PROBE:
1249                 remove_reply_cmd(client, cmd);
1250                 ops->probe(client, cmd);
1251                 free(cmd);
1252                 break;
1253         case FIO_NET_CMD_SERVER_START:
1254                 client->state = Client_running;
1255                 if (ops->job_start)
1256                         ops->job_start(client, cmd);
1257                 free(cmd);
1258                 break;
1259         case FIO_NET_CMD_START: {
1260                 struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload;
1261
1262                 pdu->jobs = le32_to_cpu(pdu->jobs);
1263                 ops->start(client, cmd);
1264                 free(cmd);
1265                 break;
1266                 }
1267         case FIO_NET_CMD_STOP: {
1268                 struct cmd_end_pdu *pdu = (struct cmd_end_pdu *) cmd->payload;
1269
1270                 convert_stop(cmd);
1271                 client->state = Client_stopped;
1272                 client->error = le32_to_cpu(pdu->error);
1273                 client->signal = le32_to_cpu(pdu->signal);
1274                 ops->stop(client, cmd);
1275                 free(cmd);
1276                 break;
1277                 }
1278         case FIO_NET_CMD_ADD_JOB: {
1279                 struct cmd_add_job_pdu *pdu = (struct cmd_add_job_pdu *) cmd->payload;
1280
1281                 client->thread_number = le32_to_cpu(pdu->thread_number);
1282                 client->groupid = le32_to_cpu(pdu->groupid);
1283
1284                 if (ops->add_job)
1285                         ops->add_job(client, cmd);
1286                 free(cmd);
1287                 break;
1288                 }
1289         case FIO_NET_CMD_IOLOG:
1290                 if (ops->iolog) {
1291                         struct cmd_iolog_pdu *pdu;
1292
1293                         pdu = convert_iolog(cmd);
1294                         ops->iolog(client, pdu);
1295                 }
1296                 free(cmd);
1297                 break;
1298         case FIO_NET_CMD_UPDATE_JOB:
1299                 ops->update_job(client, cmd);
1300                 remove_reply_cmd(client, cmd);
1301                 free(cmd);
1302                 break;
1303         default:
1304                 log_err("fio: unknown client op: %s\n", fio_server_op(cmd->opcode));
1305                 free(cmd);
1306                 break;
1307         }
1308
1309         return 1;
1310 }
1311
1312 static void request_client_etas(struct client_ops *ops)
1313 {
1314         struct fio_client *client;
1315         struct flist_head *entry;
1316         struct client_eta *eta;
1317         int skipped = 0;
1318
1319         dprint(FD_NET, "client: request eta (%d)\n", nr_clients);
1320
1321         eta = malloc(sizeof(*eta));
1322         memset(&eta->eta, 0, sizeof(eta->eta));
1323         eta->pending = nr_clients;
1324
1325         flist_for_each(entry, &client_list) {
1326                 client = flist_entry(entry, struct fio_client, list);
1327
1328                 if (!flist_empty(&client->eta_list)) {
1329                         skipped++;
1330                         continue;
1331                 }
1332                 if (client->state != Client_running)
1333                         continue;
1334
1335                 assert(!client->eta_in_flight);
1336                 flist_add_tail(&client->eta_list, &eta_list);
1337                 client->eta_in_flight = eta;
1338                 fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_SEND_ETA,
1339                                         (uintptr_t) eta, &client->cmd_list);
1340         }
1341
1342         while (skipped--)
1343                 fio_client_dec_jobs_eta(eta, ops->eta);
1344
1345         dprint(FD_NET, "client: requested eta tag %p\n", eta);
1346 }
1347
1348 static int client_check_cmd_timeout(struct fio_client *client,
1349                                     struct timeval *now)
1350 {
1351         struct fio_net_cmd_reply *reply;
1352         struct flist_head *entry, *tmp;
1353         int ret = 0;
1354
1355         flist_for_each_safe(entry, tmp, &client->cmd_list) {
1356                 reply = flist_entry(entry, struct fio_net_cmd_reply, list);
1357
1358                 if (mtime_since(&reply->tv, now) < FIO_NET_CLIENT_TIMEOUT)
1359                         continue;
1360
1361                 log_err("fio: client %s, timeout on cmd %s\n", client->hostname,
1362                                                 fio_server_op(reply->opcode));
1363                 flist_del(&reply->list);
1364                 free(reply);
1365                 ret = 1;
1366         }
1367
1368         return flist_empty(&client->cmd_list) && ret;
1369 }
1370
1371 static int fio_check_clients_timed_out(void)
1372 {
1373         struct fio_client *client;
1374         struct flist_head *entry, *tmp;
1375         struct timeval tv;
1376         int ret = 0;
1377
1378         fio_gettime(&tv, NULL);
1379
1380         flist_for_each_safe(entry, tmp, &client_list) {
1381                 client = flist_entry(entry, struct fio_client, list);
1382
1383                 if (flist_empty(&client->cmd_list))
1384                         continue;
1385
1386                 if (!client_check_cmd_timeout(client, &tv))
1387                         continue;
1388
1389                 if (client->ops->timed_out)
1390                         client->ops->timed_out(client);
1391                 else
1392                         log_err("fio: client %s timed out\n", client->hostname);
1393
1394                 remove_client(client);
1395                 ret = 1;
1396         }
1397
1398         return ret;
1399 }
1400
1401 int fio_handle_clients(struct client_ops *ops)
1402 {
1403         struct pollfd *pfds;
1404         int i, ret = 0, retval = 0;
1405
1406         fio_gettime(&eta_tv, NULL);
1407
1408         pfds = malloc(nr_clients * sizeof(struct pollfd));
1409
1410         init_thread_stat(&client_ts);
1411         init_group_run_stat(&client_gs);
1412
1413         while (!exit_backend && nr_clients) {
1414                 struct flist_head *entry, *tmp;
1415                 struct fio_client *client;
1416
1417                 i = 0;
1418                 flist_for_each_safe(entry, tmp, &client_list) {
1419                         client = flist_entry(entry, struct fio_client, list);
1420
1421                         if (!client->sent_job && !client->ops->stay_connected &&
1422                             flist_empty(&client->cmd_list)) {
1423                                 remove_client(client);
1424                                 continue;
1425                         }
1426
1427                         pfds[i].fd = client->fd;
1428                         pfds[i].events = POLLIN;
1429                         i++;
1430                 }
1431
1432                 if (!nr_clients)
1433                         break;
1434
1435                 assert(i == nr_clients);
1436
1437                 do {
1438                         struct timeval tv;
1439
1440                         fio_gettime(&tv, NULL);
1441                         if (mtime_since(&eta_tv, &tv) >= 900) {
1442                                 request_client_etas(ops);
1443                                 memcpy(&eta_tv, &tv, sizeof(tv));
1444
1445                                 if (fio_check_clients_timed_out())
1446                                         break;
1447                         }
1448
1449                         ret = poll(pfds, nr_clients, ops->eta_msec);
1450                         if (ret < 0) {
1451                                 if (errno == EINTR)
1452                                         continue;
1453                                 log_err("fio: poll clients: %s\n", strerror(errno));
1454                                 break;
1455                         } else if (!ret)
1456                                 continue;
1457                 } while (ret <= 0);
1458
1459                 for (i = 0; i < nr_clients; i++) {
1460                         if (!(pfds[i].revents & POLLIN))
1461                                 continue;
1462
1463                         client = find_client_by_fd(pfds[i].fd);
1464                         if (!client) {
1465                                 log_err("fio: unknown client fd %d\n", pfds[i].fd);
1466                                 continue;
1467                         }
1468                         if (!fio_handle_client(client)) {
1469                                 log_info("client: host=%s disconnected\n",
1470                                                 client->hostname);
1471                                 remove_client(client);
1472                                 retval = 1;
1473                         } else if (client->error)
1474                                 retval = 1;
1475                         fio_put_client(client);
1476                 }
1477         }
1478
1479         free(pfds);
1480         return retval;
1481 }