2d8a2bb151fbc52db553cf5cca5c59871181cac7
[fio.git] / client.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <unistd.h>
4 #include <errno.h>
5 #include <fcntl.h>
6 #include <poll.h>
7 #include <sys/types.h>
8 #include <sys/stat.h>
9 #include <sys/socket.h>
10 #include <sys/un.h>
11 #include <netinet/in.h>
12 #include <arpa/inet.h>
13 #include <netdb.h>
14 #include <signal.h>
15 #ifdef CONFIG_ZLIB
16 #include <zlib.h>
17 #endif
18
19 #include "fio.h"
20 #include "client.h"
21 #include "server.h"
22 #include "flist.h"
23 #include "hash.h"
24 #include "verify-state.h"
25
26 static void handle_du(struct fio_client *client, struct fio_net_cmd *cmd);
27 static void handle_ts(struct fio_client *client, struct fio_net_cmd *cmd);
28 static void handle_gs(struct fio_client *client, struct fio_net_cmd *cmd);
29 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd);
30 static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd);
31 static void handle_stop(struct fio_client *client);
32 static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd);
33
34 static void convert_text(struct fio_net_cmd *cmd);
35 static void client_display_thread_status(struct jobs_eta *je);
36
37 struct client_ops fio_client_ops = {
38         .text           = handle_text,
39         .disk_util      = handle_du,
40         .thread_status  = handle_ts,
41         .group_stats    = handle_gs,
42         .stop           = handle_stop,
43         .start          = handle_start,
44         .eta            = client_display_thread_status,
45         .probe          = handle_probe,
46         .eta_msec       = FIO_CLIENT_DEF_ETA_MSEC,
47         .client_type    = FIO_CLIENT_TYPE_CLI,
48 };
49
50 static struct timespec eta_ts;
51
52 static FLIST_HEAD(client_list);
53 static FLIST_HEAD(eta_list);
54
55 static FLIST_HEAD(arg_list);
56
57 struct thread_stat client_ts;
58 struct group_run_stats client_gs;
59 int sum_stat_clients;
60
61 static int sum_stat_nr;
62 static struct json_object *root = NULL;
63 static struct json_object *job_opt_object = NULL;
64 static struct json_array *clients_array = NULL;
65 static struct json_array *du_array = NULL;
66
67 static int error_clients;
68
69 #define FIO_CLIENT_HASH_BITS    7
70 #define FIO_CLIENT_HASH_SZ      (1 << FIO_CLIENT_HASH_BITS)
71 #define FIO_CLIENT_HASH_MASK    (FIO_CLIENT_HASH_SZ - 1)
72 static struct flist_head client_hash[FIO_CLIENT_HASH_SZ];
73
74 static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *, bool *);
75
76 static void fio_client_add_hash(struct fio_client *client)
77 {
78         int bucket = hash_long(client->fd, FIO_CLIENT_HASH_BITS);
79
80         bucket &= FIO_CLIENT_HASH_MASK;
81         flist_add(&client->hash_list, &client_hash[bucket]);
82 }
83
84 static void fio_client_remove_hash(struct fio_client *client)
85 {
86         if (!flist_empty(&client->hash_list))
87                 flist_del_init(&client->hash_list);
88 }
89
90 static void fio_init fio_client_hash_init(void)
91 {
92         int i;
93
94         for (i = 0; i < FIO_CLIENT_HASH_SZ; i++)
95                 INIT_FLIST_HEAD(&client_hash[i]);
96 }
97
/*
 * Read exactly @size bytes from @fd into @data, retrying on EAGAIN/EINTR.
 *
 * Returns 0 when all @size bytes were read, or EAGAIN when end-of-file or a
 * non-transient read error stopped the transfer early.
 */
static int read_data(int fd, void *data, size_t size)
{
	/* Arithmetic on void * is a GNU extension; use a byte pointer. */
	unsigned char *dst = data;

	while (size) {
		ssize_t ret = read(fd, dst, size);

		if (ret < 0) {
			if (errno == EAGAIN || errno == EINTR)
				continue;
			break;
		}
		if (ret == 0)
			break;

		dst += ret;
		size -= (size_t) ret;
	}

	return size ? EAGAIN : 0;
}
121
122 static int read_ini_data(int fd, void *data, size_t size)
123 {
124         char *p = data;
125         int ret = 0;
126         FILE *fp;
127         int dupfd;
128
129         dupfd = dup(fd);
130         if (dupfd < 0)
131                 return errno;
132
133         fp = fdopen(dupfd, "r");
134         if (!fp) {
135                 ret = errno;
136                 close(dupfd);
137                 goto out;
138         }
139
140         while (1) {
141                 ssize_t len;
142                 char buf[OPT_LEN_MAX+1], *sub;
143
144                 if (!fgets(buf, sizeof(buf), fp)) {
145                         if (ferror(fp)) {
146                                 if (errno == EAGAIN || errno == EINTR)
147                                         continue;
148                                 ret = errno;
149                         }
150                         break;
151                 }
152
153                 sub = fio_option_dup_subs(buf);
154                 len = strlen(sub);
155                 if (len + 1 > size) {
156                         log_err("fio: no space left to read data\n");
157                         free(sub);
158                         ret = ENOSPC;
159                         break;
160                 }
161
162                 memcpy(p, sub, len);
163                 free(sub);
164                 p += len;
165                 *p = '\0';
166                 size -= len;
167         }
168
169         fclose(fp);
170 out:
171         return ret;
172 }
173
174 static void fio_client_json_init(void)
175 {
176         char time_buf[32];
177         time_t time_p;
178
179         if (!(output_format & FIO_OUTPUT_JSON))
180                 return;
181
182         time(&time_p);
183         os_ctime_r((const time_t *) &time_p, time_buf, sizeof(time_buf));
184         time_buf[strlen(time_buf) - 1] = '\0';
185
186         root = json_create_object();
187         json_object_add_value_string(root, "fio version", fio_version_string);
188         json_object_add_value_int(root, "timestamp", time_p);
189         json_object_add_value_string(root, "time", time_buf);
190
191         job_opt_object = json_create_object();
192         json_object_add_value_object(root, "global options", job_opt_object);
193         clients_array = json_create_array();
194         json_object_add_value_array(root, "client_stats", clients_array);
195         du_array = json_create_array();
196         json_object_add_value_array(root, "disk_util", du_array);
197 }
198
199 static void fio_client_json_fini(void)
200 {
201         if (!(output_format & FIO_OUTPUT_JSON))
202                 return;
203
204         log_info("\n");
205         json_print_object(root, NULL);
206         log_info("\n");
207         json_free_object(root);
208         root = NULL;
209         clients_array = NULL;
210         du_array = NULL;
211 }
212
213 static struct fio_client *find_client_by_fd(int fd)
214 {
215         int bucket = hash_long(fd, FIO_CLIENT_HASH_BITS) & FIO_CLIENT_HASH_MASK;
216         struct fio_client *client;
217         struct flist_head *entry;
218
219         flist_for_each(entry, &client_hash[bucket]) {
220                 client = flist_entry(entry, struct fio_client, hash_list);
221
222                 if (client->fd == fd) {
223                         client->refs++;
224                         return client;
225                 }
226         }
227
228         return NULL;
229 }
230
231 void fio_put_client(struct fio_client *client)
232 {
233         if (--client->refs)
234                 return;
235
236         free(client->hostname);
237         if (client->argv)
238                 free(client->argv);
239         if (client->name)
240                 free(client->name);
241         while (client->nr_files) {
242                 struct client_file *cf = &client->files[--client->nr_files];
243
244                 free(cf->file);
245         }
246         if (client->files)
247                 free(client->files);
248         if (client->opt_lists)
249                 free(client->opt_lists);
250
251         if (!client->did_stat)
252                 sum_stat_clients--;
253
254         if (client->error)
255                 error_clients++;
256
257         free(client);
258 }
259
260 static int fio_client_dec_jobs_eta(struct client_eta *eta, client_eta_op eta_fn)
261 {
262         if (!--eta->pending) {
263                 eta_fn(&eta->eta);
264                 free(eta);
265                 return 0;
266         }
267
268         return 1;
269 }
270
271 static void fio_drain_client_text(struct fio_client *client)
272 {
273         do {
274                 struct fio_net_cmd *cmd;
275
276                 cmd = fio_net_recv_cmd(client->fd, false);
277                 if (!cmd)
278                         break;
279
280                 if (cmd->opcode == FIO_NET_CMD_TEXT) {
281                         convert_text(cmd);
282                         client->ops->text(client, cmd);
283                 }
284
285                 free(cmd);
286         } while (1);
287 }
288
289 static void remove_client(struct fio_client *client)
290 {
291         assert(client->refs);
292
293         dprint(FD_NET, "client: removed <%s>\n", client->hostname);
294
295         fio_drain_client_text(client);
296
297         if (!flist_empty(&client->list))
298                 flist_del_init(&client->list);
299
300         fio_client_remove_hash(client);
301
302         if (!flist_empty(&client->eta_list)) {
303                 flist_del_init(&client->eta_list);
304                 fio_client_dec_jobs_eta(client->eta_in_flight, client->ops->eta);
305         }
306
307         close(client->fd);
308         client->fd = -1;
309
310         if (client->ops->removed)
311                 client->ops->removed(client);
312
313         nr_clients--;
314         fio_put_client(client);
315 }
316
317 struct fio_client *fio_get_client(struct fio_client *client)
318 {
319         client->refs++;
320         return client;
321 }
322
323 static void __fio_client_add_cmd_option(struct fio_client *client,
324                                         const char *opt)
325 {
326         int index;
327
328         index = client->argc++;
329         client->argv = realloc(client->argv, sizeof(char *) * client->argc);
330         client->argv[index] = strdup(opt);
331         dprint(FD_NET, "client: add cmd %d: %s\n", index, opt);
332 }
333
334 void fio_client_add_cmd_option(void *cookie, const char *opt)
335 {
336         struct fio_client *client = cookie;
337         struct flist_head *entry;
338
339         if (!client || !opt)
340                 return;
341
342         __fio_client_add_cmd_option(client, opt);
343
344         /*
345          * Duplicate arguments to shared client group
346          */
347         flist_for_each(entry, &arg_list) {
348                 client = flist_entry(entry, struct fio_client, arg_list);
349
350                 __fio_client_add_cmd_option(client, opt);
351         }
352 }
353
354 struct fio_client *fio_client_add_explicit(struct client_ops *ops,
355                                            const char *hostname, int type,
356                                            int port)
357 {
358         struct fio_client *client;
359
360         client = malloc(sizeof(*client));
361         memset(client, 0, sizeof(*client));
362
363         INIT_FLIST_HEAD(&client->list);
364         INIT_FLIST_HEAD(&client->hash_list);
365         INIT_FLIST_HEAD(&client->arg_list);
366         INIT_FLIST_HEAD(&client->eta_list);
367         INIT_FLIST_HEAD(&client->cmd_list);
368
369         client->hostname = strdup(hostname);
370
371         if (type == Fio_client_socket)
372                 client->is_sock = true;
373         else {
374                 int ipv6;
375
376                 ipv6 = type == Fio_client_ipv6;
377                 if (fio_server_parse_host(hostname, ipv6,
378                                                 &client->addr.sin_addr,
379                                                 &client->addr6.sin6_addr))
380                         goto err;
381
382                 client->port = port;
383         }
384
385         client->fd = -1;
386         client->ops = ops;
387         client->refs = 1;
388         client->type = ops->client_type;
389
390         __fio_client_add_cmd_option(client, "fio");
391
392         flist_add(&client->list, &client_list);
393         nr_clients++;
394         dprint(FD_NET, "client: added <%s>\n", client->hostname);
395         return client;
396 err:
397         free(client);
398         return NULL;
399 }
400
401 int fio_client_add_ini_file(void *cookie, const char *ini_file, bool remote)
402 {
403         struct fio_client *client = cookie;
404         struct client_file *cf;
405         size_t new_size;
406         void *new_files;
407
408         if (!client)
409                 return 1;
410
411         dprint(FD_NET, "client <%s>: add ini %s\n", client->hostname, ini_file);
412
413         new_size = (client->nr_files + 1) * sizeof(struct client_file);
414         new_files = realloc(client->files, new_size);
415         if (!new_files)
416                 return 1;
417
418         client->files = new_files;
419         cf = &client->files[client->nr_files];
420         cf->file = strdup(ini_file);
421         cf->remote = remote;
422         client->nr_files++;
423         return 0;
424 }
425
426 int fio_client_add(struct client_ops *ops, const char *hostname, void **cookie)
427 {
428         struct fio_client *existing = *cookie;
429         struct fio_client *client;
430
431         if (existing) {
432                 /*
433                  * We always add our "exec" name as the option, hence 1
434                  * means empty.
435                  */
436                 if (existing->argc == 1)
437                         flist_add_tail(&existing->arg_list, &arg_list);
438                 else {
439                         while (!flist_empty(&arg_list))
440                                 flist_del_init(arg_list.next);
441                 }
442         }
443
444         client = malloc(sizeof(*client));
445         memset(client, 0, sizeof(*client));
446
447         INIT_FLIST_HEAD(&client->list);
448         INIT_FLIST_HEAD(&client->hash_list);
449         INIT_FLIST_HEAD(&client->arg_list);
450         INIT_FLIST_HEAD(&client->eta_list);
451         INIT_FLIST_HEAD(&client->cmd_list);
452
453         if (fio_server_parse_string(hostname, &client->hostname,
454                                         &client->is_sock, &client->port,
455                                         &client->addr.sin_addr,
456                                         &client->addr6.sin6_addr,
457                                         &client->ipv6))
458                 return -1;
459
460         client->fd = -1;
461         client->ops = ops;
462         client->refs = 1;
463         client->type = ops->client_type;
464
465         __fio_client_add_cmd_option(client, "fio");
466
467         flist_add(&client->list, &client_list);
468         nr_clients++;
469         dprint(FD_NET, "client: added <%s>\n", client->hostname);
470         *cookie = client;
471         return 0;
472 }
473
474 static const char *server_name(struct fio_client *client, char *buf,
475                                size_t bufsize)
476 {
477         const char *from;
478
479         if (client->ipv6)
480                 from = inet_ntop(AF_INET6, (struct sockaddr *) &client->addr6.sin6_addr, buf, bufsize);
481         else if (client->is_sock)
482                 from = "sock";
483         else
484                 from = inet_ntop(AF_INET, (struct sockaddr *) &client->addr.sin_addr, buf, bufsize);
485
486         return from;
487 }
488
489 static void probe_client(struct fio_client *client)
490 {
491         struct cmd_client_probe_pdu pdu;
492         const char *sname;
493         uint64_t tag;
494         char buf[64];
495
496         dprint(FD_NET, "client: send probe\n");
497
498 #ifdef CONFIG_ZLIB
499         pdu.flags = __le64_to_cpu(FIO_PROBE_FLAG_ZLIB);
500 #else
501         pdu.flags = 0;
502 #endif
503
504         sname = server_name(client, buf, sizeof(buf));
505         memset(pdu.server, 0, sizeof(pdu.server));
506         strncpy((char *) pdu.server, sname, sizeof(pdu.server) - 1);
507
508         fio_net_send_cmd(client->fd, FIO_NET_CMD_PROBE, &pdu, sizeof(pdu), &tag, &client->cmd_list);
509 }
510
511 static int fio_client_connect_ip(struct fio_client *client)
512 {
513         struct sockaddr *addr;
514         socklen_t socklen;
515         int fd, domain;
516
517         if (client->ipv6) {
518                 client->addr6.sin6_family = AF_INET6;
519                 client->addr6.sin6_port = htons(client->port);
520                 domain = AF_INET6;
521                 addr = (struct sockaddr *) &client->addr6;
522                 socklen = sizeof(client->addr6);
523         } else {
524                 client->addr.sin_family = AF_INET;
525                 client->addr.sin_port = htons(client->port);
526                 domain = AF_INET;
527                 addr = (struct sockaddr *) &client->addr;
528                 socklen = sizeof(client->addr);
529         }
530
531         fd = socket(domain, SOCK_STREAM, 0);
532         if (fd < 0) {
533                 int ret = -errno;
534
535                 log_err("fio: socket: %s\n", strerror(errno));
536                 return ret;
537         }
538
539         if (connect(fd, addr, socklen) < 0) {
540                 int ret = -errno;
541
542                 log_err("fio: connect: %s\n", strerror(errno));
543                 log_err("fio: failed to connect to %s:%u\n", client->hostname,
544                                                                 client->port);
545                 close(fd);
546                 return ret;
547         }
548
549         return fd;
550 }
551
552 static int fio_client_connect_sock(struct fio_client *client)
553 {
554         struct sockaddr_un *addr = &client->addr_un;
555         socklen_t len;
556         int fd;
557
558         memset(addr, 0, sizeof(*addr));
559         addr->sun_family = AF_UNIX;
560         strncpy(addr->sun_path, client->hostname, sizeof(addr->sun_path) - 1);
561
562         fd = socket(AF_UNIX, SOCK_STREAM, 0);
563         if (fd < 0) {
564                 int ret = -errno;
565
566                 log_err("fio: socket: %s\n", strerror(errno));
567                 return ret;
568         }
569
570         len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
571         if (connect(fd, (struct sockaddr *) addr, len) < 0) {
572                 int ret = -errno;
573
574                 log_err("fio: connect; %s\n", strerror(errno));
575                 close(fd);
576                 return ret;
577         }
578
579         return fd;
580 }
581
582 int fio_client_connect(struct fio_client *client)
583 {
584         int fd;
585
586         dprint(FD_NET, "client: connect to host %s\n", client->hostname);
587
588         if (client->is_sock)
589                 fd = fio_client_connect_sock(client);
590         else
591                 fd = fio_client_connect_ip(client);
592
593         dprint(FD_NET, "client: %s connected %d\n", client->hostname, fd);
594
595         if (fd < 0)
596                 return fd;
597
598         client->fd = fd;
599         fio_client_add_hash(client);
600         client->state = Client_connected;
601
602         probe_client(client);
603         return 0;
604 }
605
606 int fio_client_terminate(struct fio_client *client)
607 {
608         return fio_net_send_quit(client->fd);
609 }
610
611 static void fio_clients_terminate(void)
612 {
613         struct flist_head *entry;
614         struct fio_client *client;
615
616         dprint(FD_NET, "client: terminate clients\n");
617
618         flist_for_each(entry, &client_list) {
619                 client = flist_entry(entry, struct fio_client, list);
620                 fio_client_terminate(client);
621         }
622 }
623
624 static void sig_int(int sig)
625 {
626         dprint(FD_NET, "client: got signal %d\n", sig);
627         fio_clients_terminate();
628 }
629
630 static void client_signal_handler(void)
631 {
632         struct sigaction act;
633
634         memset(&act, 0, sizeof(act));
635         act.sa_handler = sig_int;
636         act.sa_flags = SA_RESTART;
637         sigaction(SIGINT, &act, NULL);
638
639         memset(&act, 0, sizeof(act));
640         act.sa_handler = sig_int;
641         act.sa_flags = SA_RESTART;
642         sigaction(SIGTERM, &act, NULL);
643
644 /* Windows uses SIGBREAK as a quit signal from other applications */
645 #ifdef WIN32
646         memset(&act, 0, sizeof(act));
647         act.sa_handler = sig_int;
648         act.sa_flags = SA_RESTART;
649         sigaction(SIGBREAK, &act, NULL);
650 #endif
651
652         memset(&act, 0, sizeof(act));
653         act.sa_handler = sig_show_status;
654         act.sa_flags = SA_RESTART;
655         sigaction(SIGUSR1, &act, NULL);
656 }
657
658 static int send_client_cmd_line(struct fio_client *client)
659 {
660         struct cmd_single_line_pdu *cslp;
661         struct cmd_line_pdu *clp;
662         unsigned long offset;
663         unsigned int *lens;
664         void *pdu;
665         size_t mem;
666         int i, ret;
667
668         dprint(FD_NET, "client: send cmdline %d\n", client->argc);
669
670         lens = malloc(client->argc * sizeof(unsigned int));
671
672         /*
673          * Find out how much mem we need
674          */
675         for (i = 0, mem = 0; i < client->argc; i++) {
676                 lens[i] = strlen(client->argv[i]) + 1;
677                 mem += lens[i];
678         }
679
680         /*
681          * We need one cmd_line_pdu, and argc number of cmd_single_line_pdu
682          */
683         mem += sizeof(*clp) + (client->argc * sizeof(*cslp));
684
685         pdu = malloc(mem);
686         clp = pdu;
687         offset = sizeof(*clp);
688
689         for (i = 0; i < client->argc; i++) {
690                 uint16_t arg_len = lens[i];
691
692                 cslp = pdu + offset;
693                 strcpy((char *) cslp->text, client->argv[i]);
694                 cslp->len = cpu_to_le16(arg_len);
695                 offset += sizeof(*cslp) + arg_len;
696         }
697
698         free(lens);
699         clp->lines = cpu_to_le16(client->argc);
700         clp->client_type = __cpu_to_le16(client->type);
701         ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOBLINE, pdu, mem, NULL, NULL);
702         free(pdu);
703         return ret;
704 }
705
706 int fio_clients_connect(void)
707 {
708         struct fio_client *client;
709         struct flist_head *entry, *tmp;
710         int ret;
711
712 #ifdef WIN32
713         WSADATA wsd;
714         WSAStartup(MAKEWORD(2, 2), &wsd);
715 #endif
716
717         dprint(FD_NET, "client: connect all\n");
718
719         client_signal_handler();
720
721         flist_for_each_safe(entry, tmp, &client_list) {
722                 client = flist_entry(entry, struct fio_client, list);
723
724                 ret = fio_client_connect(client);
725                 if (ret) {
726                         remove_client(client);
727                         continue;
728                 }
729
730                 if (client->argc > 1)
731                         send_client_cmd_line(client);
732         }
733
734         return !nr_clients;
735 }
736
737 int fio_start_client(struct fio_client *client)
738 {
739         dprint(FD_NET, "client: start %s\n", client->hostname);
740         return fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_RUN, 0, NULL);
741 }
742
743 int fio_start_all_clients(void)
744 {
745         struct fio_client *client;
746         struct flist_head *entry, *tmp;
747         int ret;
748
749         dprint(FD_NET, "client: start all\n");
750
751         fio_client_json_init();
752
753         flist_for_each_safe(entry, tmp, &client_list) {
754                 client = flist_entry(entry, struct fio_client, list);
755
756                 ret = fio_start_client(client);
757                 if (ret) {
758                         remove_client(client);
759                         continue;
760                 }
761         }
762
763         return flist_empty(&client_list);
764 }
765
766 static int __fio_client_send_remote_ini(struct fio_client *client,
767                                         const char *filename)
768 {
769         struct cmd_load_file_pdu *pdu;
770         size_t p_size;
771         int ret;
772
773         dprint(FD_NET, "send remote ini %s to %s\n", filename, client->hostname);
774
775         p_size = sizeof(*pdu) + strlen(filename) + 1;
776         pdu = malloc(p_size);
777         memset(pdu, 0, p_size);
778         pdu->name_len = strlen(filename);
779         strcpy((char *) pdu->file, filename);
780         pdu->client_type = cpu_to_le16((uint16_t) client->type);
781
782         client->sent_job = true;
783         ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_LOAD_FILE, pdu, p_size,NULL, NULL);
784         free(pdu);
785         return ret;
786 }
787
788 /*
789  * Send file contents to server backend. We could use sendfile(), but to remain
790  * more portable lets just read/write the darn thing.
791  */
792 static int __fio_client_send_local_ini(struct fio_client *client,
793                                        const char *filename)
794 {
795         struct cmd_job_pdu *pdu;
796         size_t p_size;
797         struct stat sb;
798         char *p;
799         void *buf;
800         off_t len;
801         int fd, ret;
802
803         dprint(FD_NET, "send ini %s to %s\n", filename, client->hostname);
804
805         fd = open(filename, O_RDONLY);
806         if (fd < 0) {
807                 ret = -errno;
808                 log_err("fio: job file <%s> open: %s\n", filename, strerror(errno));
809                 return ret;
810         }
811
812         if (fstat(fd, &sb) < 0) {
813                 ret = -errno;
814                 log_err("fio: job file stat: %s\n", strerror(errno));
815                 close(fd);
816                 return ret;
817         }
818
819         /*
820          * Add extra space for variable expansion, but doesn't guarantee.
821          */
822         sb.st_size += OPT_LEN_MAX;
823         p_size = sb.st_size + sizeof(*pdu);
824         pdu = malloc(p_size);
825         buf = pdu->buf;
826
827         len = sb.st_size;
828         p = buf;
829         if (read_ini_data(fd, p, len)) {
830                 log_err("fio: failed reading job file %s\n", filename);
831                 close(fd);
832                 free(pdu);
833                 return 1;
834         }
835
836         pdu->buf_len = __cpu_to_le32(sb.st_size);
837         pdu->client_type = cpu_to_le32(client->type);
838
839         client->sent_job = true;
840         ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, pdu, p_size, NULL, NULL);
841         free(pdu);
842         close(fd);
843         return ret;
844 }
845
846 int fio_client_send_ini(struct fio_client *client, const char *filename,
847                         bool remote)
848 {
849         int ret;
850
851         if (!remote)
852                 ret = __fio_client_send_local_ini(client, filename);
853         else
854                 ret = __fio_client_send_remote_ini(client, filename);
855
856         if (!ret)
857                 client->sent_job = true;
858
859         return ret;
860 }
861
862 static int fio_client_send_cf(struct fio_client *client,
863                               struct client_file *cf)
864 {
865         return fio_client_send_ini(client, cf->file, cf->remote);
866 }
867
868 int fio_clients_send_ini(const char *filename)
869 {
870         struct fio_client *client;
871         struct flist_head *entry, *tmp;
872
873         flist_for_each_safe(entry, tmp, &client_list) {
874                 bool failed = false;
875
876                 client = flist_entry(entry, struct fio_client, list);
877
878                 if (client->nr_files) {
879                         int i;
880
881                         for (i = 0; i < client->nr_files; i++) {
882                                 struct client_file *cf;
883
884                                 cf = &client->files[i];
885
886                                 if (fio_client_send_cf(client, cf)) {
887                                         failed = true;
888                                         remove_client(client);
889                                         break;
890                                 }
891                         }
892                 }
893                 if (client->sent_job || failed)
894                         continue;
895                 if (!filename || fio_client_send_ini(client, filename, 0))
896                         remove_client(client);
897         }
898
899         return !nr_clients;
900 }
901
902 int fio_client_update_options(struct fio_client *client,
903                               struct thread_options *o, uint64_t *tag)
904 {
905         struct cmd_add_job_pdu pdu;
906
907         pdu.thread_number = cpu_to_le32(client->thread_number);
908         pdu.groupid = cpu_to_le32(client->groupid);
909         convert_thread_options_to_net(&pdu.top, o);
910
911         return fio_net_send_cmd(client->fd, FIO_NET_CMD_UPDATE_JOB, &pdu, sizeof(pdu), tag, &client->cmd_list);
912 }
913
914 static void convert_io_stat(struct io_stat *dst, struct io_stat *src)
915 {
916         dst->max_val    = le64_to_cpu(src->max_val);
917         dst->min_val    = le64_to_cpu(src->min_val);
918         dst->samples    = le64_to_cpu(src->samples);
919
920         /*
921          * Floats arrive as IEEE 754 encoded uint64_t, convert back to double
922          */
923         dst->mean.u.f   = fio_uint64_to_double(le64_to_cpu(dst->mean.u.i));
924         dst->S.u.f      = fio_uint64_to_double(le64_to_cpu(dst->S.u.i));
925 }
926
927 static void convert_ts(struct thread_stat *dst, struct thread_stat *src)
928 {
929         int i, j;
930
931         dst->error              = le32_to_cpu(src->error);
932         dst->thread_number      = le32_to_cpu(src->thread_number);
933         dst->groupid            = le32_to_cpu(src->groupid);
934         dst->pid                = le32_to_cpu(src->pid);
935         dst->members            = le32_to_cpu(src->members);
936         dst->unified_rw_rep     = le32_to_cpu(src->unified_rw_rep);
937
938         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
939                 convert_io_stat(&dst->clat_stat[i], &src->clat_stat[i]);
940                 convert_io_stat(&dst->slat_stat[i], &src->slat_stat[i]);
941                 convert_io_stat(&dst->lat_stat[i], &src->lat_stat[i]);
942                 convert_io_stat(&dst->bw_stat[i], &src->bw_stat[i]);
943                 convert_io_stat(&dst->iops_stat[i], &src->iops_stat[i]);
944         }
945
946         dst->usr_time           = le64_to_cpu(src->usr_time);
947         dst->sys_time           = le64_to_cpu(src->sys_time);
948         dst->ctx                = le64_to_cpu(src->ctx);
949         dst->minf               = le64_to_cpu(src->minf);
950         dst->majf               = le64_to_cpu(src->majf);
951         dst->clat_percentiles   = le32_to_cpu(src->clat_percentiles);
952         dst->lat_percentiles    = le32_to_cpu(src->lat_percentiles);
953         dst->percentile_precision = le64_to_cpu(src->percentile_precision);
954
955         for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) {
956                 fio_fp64_t *fps = &src->percentile_list[i];
957                 fio_fp64_t *fpd = &dst->percentile_list[i];
958
959                 fpd->u.f = fio_uint64_to_double(le64_to_cpu(fps->u.i));
960         }
961
962         for (i = 0; i < FIO_IO_U_MAP_NR; i++) {
963                 dst->io_u_map[i]        = le64_to_cpu(src->io_u_map[i]);
964                 dst->io_u_submit[i]     = le64_to_cpu(src->io_u_submit[i]);
965                 dst->io_u_complete[i]   = le64_to_cpu(src->io_u_complete[i]);
966         }
967
968         for (i = 0; i < FIO_IO_U_LAT_N_NR; i++)
969                 dst->io_u_lat_n[i]      = le64_to_cpu(src->io_u_lat_n[i]);
970         for (i = 0; i < FIO_IO_U_LAT_U_NR; i++)
971                 dst->io_u_lat_u[i]      = le64_to_cpu(src->io_u_lat_u[i]);
972         for (i = 0; i < FIO_IO_U_LAT_M_NR; i++)
973                 dst->io_u_lat_m[i]      = le64_to_cpu(src->io_u_lat_m[i]);
974
975         for (i = 0; i < DDIR_RWDIR_CNT; i++)
976                 for (j = 0; j < FIO_IO_U_PLAT_NR; j++)
977                         dst->io_u_plat[i][j] = le64_to_cpu(src->io_u_plat[i][j]);
978
979         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
980                 dst->total_io_u[i]      = le64_to_cpu(src->total_io_u[i]);
981                 dst->short_io_u[i]      = le64_to_cpu(src->short_io_u[i]);
982                 dst->drop_io_u[i]       = le64_to_cpu(src->drop_io_u[i]);
983         }
984
985         dst->total_submit       = le64_to_cpu(src->total_submit);
986         dst->total_complete     = le64_to_cpu(src->total_complete);
987
988         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
989                 dst->io_bytes[i]        = le64_to_cpu(src->io_bytes[i]);
990                 dst->runtime[i]         = le64_to_cpu(src->runtime[i]);
991         }
992
993         dst->total_run_time     = le64_to_cpu(src->total_run_time);
994         dst->continue_on_error  = le16_to_cpu(src->continue_on_error);
995         dst->total_err_count    = le64_to_cpu(src->total_err_count);
996         dst->first_error        = le32_to_cpu(src->first_error);
997         dst->kb_base            = le32_to_cpu(src->kb_base);
998         dst->unit_base          = le32_to_cpu(src->unit_base);
999
1000         dst->sig_figs           = le32_to_cpu(src->sig_figs);
1001
1002         dst->latency_depth      = le32_to_cpu(src->latency_depth);
1003         dst->latency_target     = le64_to_cpu(src->latency_target);
1004         dst->latency_window     = le64_to_cpu(src->latency_window);
1005         dst->latency_percentile.u.f = fio_uint64_to_double(le64_to_cpu(src->latency_percentile.u.i));
1006
1007         dst->nr_block_infos     = le64_to_cpu(src->nr_block_infos);
1008         for (i = 0; i < dst->nr_block_infos; i++)
1009                 dst->block_infos[i] = le32_to_cpu(src->block_infos[i]);
1010
1011         dst->ss_dur             = le64_to_cpu(src->ss_dur);
1012         dst->ss_state           = le32_to_cpu(src->ss_state);
1013         dst->ss_head            = le32_to_cpu(src->ss_head);
1014         dst->ss_limit.u.f       = fio_uint64_to_double(le64_to_cpu(src->ss_limit.u.i));
1015         dst->ss_slope.u.f       = fio_uint64_to_double(le64_to_cpu(src->ss_slope.u.i));
1016         dst->ss_deviation.u.f   = fio_uint64_to_double(le64_to_cpu(src->ss_deviation.u.i));
1017         dst->ss_criterion.u.f   = fio_uint64_to_double(le64_to_cpu(src->ss_criterion.u.i));
1018
1019         if (dst->ss_state & FIO_SS_DATA) {
1020                 for (i = 0; i < dst->ss_dur; i++ ) {
1021                         dst->ss_iops_data[i] = le64_to_cpu(src->ss_iops_data[i]);
1022                         dst->ss_bw_data[i] = le64_to_cpu(src->ss_bw_data[i]);
1023                 }
1024         }
1025 }
1026
1027 static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src)
1028 {
1029         int i;
1030
1031         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
1032                 dst->max_run[i]         = le64_to_cpu(src->max_run[i]);
1033                 dst->min_run[i]         = le64_to_cpu(src->min_run[i]);
1034                 dst->max_bw[i]          = le64_to_cpu(src->max_bw[i]);
1035                 dst->min_bw[i]          = le64_to_cpu(src->min_bw[i]);
1036                 dst->iobytes[i]         = le64_to_cpu(src->iobytes[i]);
1037                 dst->agg[i]             = le64_to_cpu(src->agg[i]);
1038         }
1039
1040         dst->kb_base    = le32_to_cpu(src->kb_base);
1041         dst->unit_base  = le32_to_cpu(src->unit_base);
1042         dst->sig_figs   = le32_to_cpu(src->sig_figs);
1043         dst->groupid    = le32_to_cpu(src->groupid);
1044         dst->unified_rw_rep     = le32_to_cpu(src->unified_rw_rep);
1045 }
1046
1047 static void json_object_add_client_info(struct json_object *obj,
1048                                         struct fio_client *client)
1049 {
1050         const char *hostname = client->hostname ? client->hostname : "";
1051
1052         json_object_add_value_string(obj, "hostname", hostname);
1053         json_object_add_value_int(obj, "port", client->port);
1054 }
1055
1056 static void handle_ts(struct fio_client *client, struct fio_net_cmd *cmd)
1057 {
1058         struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload;
1059         struct flist_head *opt_list = NULL;
1060         struct json_object *tsobj;
1061
1062         if (output_format & FIO_OUTPUT_TERSE)
1063                 return;
1064
1065         if (client->opt_lists && p->ts.thread_number <= client->jobs)
1066                 opt_list = &client->opt_lists[p->ts.thread_number - 1];
1067
1068         tsobj = show_thread_status(&p->ts, &p->rs, opt_list, NULL);
1069         client->did_stat = true;
1070         if (tsobj) {
1071                 json_object_add_client_info(tsobj, client);
1072                 json_array_add_value_object(clients_array, tsobj);
1073         }
1074
1075         if (sum_stat_clients <= 1)
1076                 return;
1077
1078         sum_thread_stats(&client_ts, &p->ts, sum_stat_nr == 1);
1079         sum_group_stats(&client_gs, &p->rs);
1080
1081         client_ts.members++;
1082         client_ts.thread_number = p->ts.thread_number;
1083         client_ts.groupid = p->ts.groupid;
1084         client_ts.unified_rw_rep = p->ts.unified_rw_rep;
1085         client_ts.sig_figs = p->ts.sig_figs;
1086
1087         if (++sum_stat_nr == sum_stat_clients) {
1088                 strcpy(client_ts.name, "All clients");
1089                 tsobj = show_thread_status(&client_ts, &client_gs, NULL, NULL);
1090                 if (tsobj) {
1091                         json_object_add_client_info(tsobj, client);
1092                         json_array_add_value_object(clients_array, tsobj);
1093                 }
1094         }
1095 }
1096
1097 static void handle_gs(struct fio_client *client, struct fio_net_cmd *cmd)
1098 {
1099         struct group_run_stats *gs = (struct group_run_stats *) cmd->payload;
1100
1101         if (output_format & FIO_OUTPUT_TERSE)
1102                 return;
1103
1104         if (output_format & FIO_OUTPUT_NORMAL)
1105                 show_group_stats(gs, NULL);
1106 }
1107
1108 static void handle_job_opt(struct fio_client *client, struct fio_net_cmd *cmd)
1109 {
1110         struct cmd_job_option *pdu = (struct cmd_job_option *) cmd->payload;
1111         struct print_option *p;
1112
1113         if (!job_opt_object)
1114                 return;
1115
1116         pdu->global = le16_to_cpu(pdu->global);
1117         pdu->truncated = le16_to_cpu(pdu->truncated);
1118         pdu->groupid = le32_to_cpu(pdu->groupid);
1119
1120         p = malloc(sizeof(*p));
1121         p->name = strdup((char *) pdu->name);
1122         if (pdu->value[0] != '\0')
1123                 p->value = strdup((char *) pdu->value);
1124         else
1125                 p->value = NULL;
1126
1127         if (pdu->global) {
1128                 const char *pos = "";
1129
1130                 if (p->value)
1131                         pos = p->value;
1132
1133                 json_object_add_value_string(job_opt_object, p->name, pos);
1134         } else if (client->opt_lists) {
1135                 struct flist_head *opt_list = &client->opt_lists[pdu->groupid];
1136
1137                 flist_add_tail(&p->list, opt_list);
1138         }
1139 }
1140
1141 static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd)
1142 {
1143         struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmd->payload;
1144         const char *buf = (const char *) pdu->buf;
1145         const char *name;
1146         int fio_unused ret;
1147         struct buf_output out;
1148
1149         buf_output_init(&out);
1150
1151         name = client->name ? client->name : client->hostname;
1152
1153         if (!client->skip_newline && !(output_format & FIO_OUTPUT_TERSE))
1154                 log_buf(&out, "<%s> ", name);
1155         log_buf(&out, "%s", buf);
1156         log_info_buf(out.buf, out.buflen);
1157         client->skip_newline = strchr(buf, '\n') == NULL;
1158 }
1159
1160 static void convert_agg(struct disk_util_agg *agg)
1161 {
1162         int i;
1163
1164         for (i = 0; i < 2; i++) {
1165                 agg->ios[i]     = le64_to_cpu(agg->ios[i]);
1166                 agg->merges[i]  = le64_to_cpu(agg->merges[i]);
1167                 agg->sectors[i] = le64_to_cpu(agg->sectors[i]);
1168                 agg->ticks[i]   = le64_to_cpu(agg->ticks[i]);
1169         }
1170
1171         agg->io_ticks           = le64_to_cpu(agg->io_ticks);
1172         agg->time_in_queue      = le64_to_cpu(agg->time_in_queue);
1173         agg->slavecount         = le32_to_cpu(agg->slavecount);
1174         agg->max_util.u.f       = fio_uint64_to_double(le64_to_cpu(agg->max_util.u.i));
1175 }
1176
1177 static void convert_dus(struct disk_util_stat *dus)
1178 {
1179         int i;
1180
1181         for (i = 0; i < 2; i++) {
1182                 dus->s.ios[i]           = le64_to_cpu(dus->s.ios[i]);
1183                 dus->s.merges[i]        = le64_to_cpu(dus->s.merges[i]);
1184                 dus->s.sectors[i]       = le64_to_cpu(dus->s.sectors[i]);
1185                 dus->s.ticks[i]         = le64_to_cpu(dus->s.ticks[i]);
1186         }
1187
1188         dus->s.io_ticks         = le64_to_cpu(dus->s.io_ticks);
1189         dus->s.time_in_queue    = le64_to_cpu(dus->s.time_in_queue);
1190         dus->s.msec             = le64_to_cpu(dus->s.msec);
1191 }
1192
1193 static void handle_du(struct fio_client *client, struct fio_net_cmd *cmd)
1194 {
1195         struct cmd_du_pdu *du = (struct cmd_du_pdu *) cmd->payload;
1196
1197         if (output_format & FIO_OUTPUT_TERSE)
1198                 return;
1199
1200         if (!client->disk_stats_shown) {
1201                 client->disk_stats_shown = true;
1202                 if (!(output_format & FIO_OUTPUT_JSON))
1203                         log_info("\nDisk stats (read/write):\n");
1204         }
1205
1206         if (output_format & FIO_OUTPUT_JSON) {
1207                 struct json_object *duobj;
1208                 json_array_add_disk_util(&du->dus, &du->agg, du_array);
1209                 duobj = json_array_last_value_object(du_array);
1210                 json_object_add_client_info(duobj, client);
1211         }
1212         if (output_format & FIO_OUTPUT_NORMAL)
1213                 print_disk_util(&du->dus, &du->agg, 0, NULL);
1214 }
1215
1216 static void convert_jobs_eta(struct jobs_eta *je)
1217 {
1218         int i;
1219
1220         je->nr_running          = le32_to_cpu(je->nr_running);
1221         je->nr_ramp             = le32_to_cpu(je->nr_ramp);
1222         je->nr_pending          = le32_to_cpu(je->nr_pending);
1223         je->nr_setting_up       = le32_to_cpu(je->nr_setting_up);
1224         je->files_open          = le32_to_cpu(je->files_open);
1225
1226         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
1227                 je->m_rate[i]   = le64_to_cpu(je->m_rate[i]);
1228                 je->t_rate[i]   = le64_to_cpu(je->t_rate[i]);
1229                 je->m_iops[i]   = le32_to_cpu(je->m_iops[i]);
1230                 je->t_iops[i]   = le32_to_cpu(je->t_iops[i]);
1231                 je->rate[i]     = le64_to_cpu(je->rate[i]);
1232                 je->iops[i]     = le32_to_cpu(je->iops[i]);
1233         }
1234
1235         je->elapsed_sec         = le64_to_cpu(je->elapsed_sec);
1236         je->eta_sec             = le64_to_cpu(je->eta_sec);
1237         je->nr_threads          = le32_to_cpu(je->nr_threads);
1238         je->is_pow2             = le32_to_cpu(je->is_pow2);
1239         je->unit_base           = le32_to_cpu(je->unit_base);
1240         je->sig_figs            = le32_to_cpu(je->sig_figs);
1241 }
1242
1243 void fio_client_sum_jobs_eta(struct jobs_eta *dst, struct jobs_eta *je)
1244 {
1245         int i;
1246
1247         dst->nr_running         += je->nr_running;
1248         dst->nr_ramp            += je->nr_ramp;
1249         dst->nr_pending         += je->nr_pending;
1250         dst->nr_setting_up      += je->nr_setting_up;
1251         dst->files_open         += je->files_open;
1252
1253         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
1254                 dst->m_rate[i]  += je->m_rate[i];
1255                 dst->t_rate[i]  += je->t_rate[i];
1256                 dst->m_iops[i]  += je->m_iops[i];
1257                 dst->t_iops[i]  += je->t_iops[i];
1258                 dst->rate[i]    += je->rate[i];
1259                 dst->iops[i]    += je->iops[i];
1260         }
1261
1262         dst->elapsed_sec        += je->elapsed_sec;
1263
1264         if (je->eta_sec > dst->eta_sec)
1265                 dst->eta_sec = je->eta_sec;
1266
1267         dst->nr_threads         += je->nr_threads;
1268
1269         /*
1270          * This wont be correct for multiple strings, but at least it
1271          * works for the basic cases.
1272          */
1273         strcpy((char *) dst->run_str, (char *) je->run_str);
1274 }
1275
1276 static bool remove_reply_cmd(struct fio_client *client, struct fio_net_cmd *cmd)
1277 {
1278         struct fio_net_cmd_reply *reply = NULL;
1279         struct flist_head *entry;
1280
1281         flist_for_each(entry, &client->cmd_list) {
1282                 reply = flist_entry(entry, struct fio_net_cmd_reply, list);
1283
1284                 if (cmd->tag == (uintptr_t) reply)
1285                         break;
1286
1287                 reply = NULL;
1288         }
1289
1290         if (!reply) {
1291                 log_err("fio: client: unable to find matching tag (%llx)\n", (unsigned long long) cmd->tag);
1292                 return false;
1293         }
1294
1295         flist_del(&reply->list);
1296         cmd->tag = reply->saved_tag;
1297         free(reply);
1298         return true;
1299 }
1300
1301 int fio_client_wait_for_reply(struct fio_client *client, uint64_t tag)
1302 {
1303         do {
1304                 struct fio_net_cmd_reply *reply = NULL;
1305                 struct flist_head *entry;
1306
1307                 flist_for_each(entry, &client->cmd_list) {
1308                         reply = flist_entry(entry, struct fio_net_cmd_reply, list);
1309
1310                         if (tag == (uintptr_t) reply)
1311                                 break;
1312
1313                         reply = NULL;
1314                 }
1315
1316                 if (!reply)
1317                         break;
1318
1319                 usleep(1000);
1320         } while (1);
1321
1322         return 0;
1323 }
1324
1325 static void handle_eta(struct fio_client *client, struct fio_net_cmd *cmd)
1326 {
1327         struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
1328         struct client_eta *eta = (struct client_eta *) (uintptr_t) cmd->tag;
1329
1330         dprint(FD_NET, "client: got eta tag %p, %d\n", eta, eta->pending);
1331
1332         assert(client->eta_in_flight == eta);
1333
1334         client->eta_in_flight = NULL;
1335         flist_del_init(&client->eta_list);
1336         client->eta_timeouts = 0;
1337
1338         if (client->ops->jobs_eta)
1339                 client->ops->jobs_eta(client, je);
1340
1341         fio_client_sum_jobs_eta(&eta->eta, je);
1342         fio_client_dec_jobs_eta(eta, client->ops->eta);
1343 }
1344
1345 static void client_flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
1346                                       uint64_t sample_size)
1347 {
1348         struct io_sample *s;
1349         int log_offset;
1350         uint64_t i, j, nr_samples;
1351         struct io_u_plat_entry *entry;
1352         uint64_t *io_u_plat;
1353
1354         int stride = 1 << hist_coarseness;
1355
1356         if (!sample_size)
1357                 return;
1358
1359         s = __get_sample(samples, 0, 0);
1360         log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
1361
1362         nr_samples = sample_size / __log_entry_sz(log_offset);
1363
1364         for (i = 0; i < nr_samples; i++) {
1365
1366                 s = (struct io_sample *)((char *)__get_sample(samples, log_offset, i) +
1367                         i * sizeof(struct io_u_plat_entry));
1368
1369                 entry = s->data.plat_entry;
1370                 io_u_plat = entry->io_u_plat;
1371
1372                 fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
1373                                                 io_sample_ddir(s), (unsigned long long) s->bs);
1374                 for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
1375                         fprintf(f, "%llu, ", (unsigned long long)hist_sum(j, stride, io_u_plat, NULL));
1376                 }
1377                 fprintf(f, "%llu\n", (unsigned long long)
1378                         hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat, NULL));
1379
1380         }
1381 }
1382
1383 static int fio_client_handle_iolog(struct fio_client *client,
1384                                    struct fio_net_cmd *cmd)
1385 {
1386         struct cmd_iolog_pdu *pdu = NULL;
1387         bool store_direct;
1388         char *log_pathname = NULL;
1389         int ret = 0;
1390
1391         pdu = convert_iolog(cmd, &store_direct);
1392         if (!pdu) {
1393                 log_err("fio: failed converting IO log\n");
1394                 ret = 1;
1395                 goto out;
1396         }
1397
1398         /* allocate buffer big enough for next sprintf() call */
1399         log_pathname = malloc(10 + strlen((char *)pdu->name) +
1400                         strlen(client->hostname));
1401         if (!log_pathname) {
1402                 log_err("fio: memory allocation of unique pathname failed\n");
1403                 ret = -1;
1404                 goto out;
1405         }
1406         /* generate a unique pathname for the log file using hostname */
1407         sprintf(log_pathname, "%s.%s", pdu->name, client->hostname);
1408
1409         if (store_direct) {
1410                 ssize_t wrote;
1411                 size_t sz;
1412                 int fd;
1413
1414                 fd = open((const char *) log_pathname,
1415                                 O_WRONLY | O_CREAT | O_TRUNC, 0644);
1416                 if (fd < 0) {
1417                         log_err("fio: open log %s: %s\n",
1418                                 log_pathname, strerror(errno));
1419                         ret = 1;
1420                         goto out;
1421                 }
1422
1423                 sz = cmd->pdu_len - sizeof(*pdu);
1424                 wrote = write(fd, pdu->samples, sz);
1425                 close(fd);
1426
1427                 if (wrote != sz) {
1428                         log_err("fio: short write on compressed log\n");
1429                         ret = 1;
1430                         goto out;
1431                 }
1432
1433                 ret = 0;
1434         } else {
1435                 FILE *f;
1436                 f = fopen((const char *) log_pathname, "w");
1437                 if (!f) {
1438                         log_err("fio: fopen log %s : %s\n",
1439                                 log_pathname, strerror(errno));
1440                         ret = 1;
1441                         goto out;
1442                 }
1443
1444                 if (pdu->log_type == IO_LOG_TYPE_HIST) {
1445                         client_flush_hist_samples(f, pdu->log_hist_coarseness, pdu->samples,
1446                                            pdu->nr_samples * sizeof(struct io_sample));
1447                 } else {
1448                         flush_samples(f, pdu->samples,
1449                                         pdu->nr_samples * sizeof(struct io_sample));
1450                 }
1451                 fclose(f);
1452                 ret = 0;
1453         }
1454
1455 out:
1456         if (pdu && pdu != (void *) cmd->payload)
1457                 free(pdu);
1458
1459         if (log_pathname)
1460                 free(log_pathname);
1461
1462         return ret;
1463 }
1464
1465 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd)
1466 {
1467         struct cmd_probe_reply_pdu *probe = (struct cmd_probe_reply_pdu *) cmd->payload;
1468         const char *os, *arch;
1469         char bit[16];
1470
1471         if (output_format & FIO_OUTPUT_TERSE)
1472                 return;
1473
1474         os = fio_get_os_string(probe->os);
1475         if (!os)
1476                 os = "unknown";
1477
1478         arch = fio_get_arch_string(probe->arch);
1479         if (!arch)
1480                 os = "unknown";
1481
1482         sprintf(bit, "%d-bit", probe->bpp * 8);
1483         probe->flags = le64_to_cpu(probe->flags);
1484
1485         if (!(output_format & FIO_OUTPUT_JSON))
1486                 log_info("hostname=%s, be=%u, %s, os=%s, arch=%s, fio=%s, flags=%lx\n",
1487                         probe->hostname, probe->bigendian, bit, os, arch,
1488                         probe->fio_version, (unsigned long) probe->flags);
1489
1490         if (!client->name)
1491                 client->name = strdup((char *) probe->hostname);
1492 }
1493
1494 static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd)
1495 {
1496         struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload;
1497
1498         client->state = Client_started;
1499         client->jobs = le32_to_cpu(pdu->jobs);
1500         client->nr_stat = le32_to_cpu(pdu->stat_outputs);
1501
1502         if (client->jobs) {
1503                 int i;
1504
1505                 if (client->opt_lists)
1506                         free(client->opt_lists);
1507
1508                 client->opt_lists = malloc(client->jobs * sizeof(struct flist_head));
1509                 for (i = 0; i < client->jobs; i++)
1510                         INIT_FLIST_HEAD(&client->opt_lists[i]);
1511         }
1512
1513         sum_stat_clients += client->nr_stat;
1514 }
1515
1516 static void handle_stop(struct fio_client *client)
1517 {
1518         if (client->error)
1519                 log_info("client <%s>: exited with error %d\n", client->hostname, client->error);
1520 }
1521
1522 static void convert_stop(struct fio_net_cmd *cmd)
1523 {
1524         struct cmd_end_pdu *pdu = (struct cmd_end_pdu *) cmd->payload;
1525
1526         pdu->error = le32_to_cpu(pdu->error);
1527 }
1528
1529 static void convert_text(struct fio_net_cmd *cmd)
1530 {
1531         struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmd->payload;
1532
1533         pdu->level      = le32_to_cpu(pdu->level);
1534         pdu->buf_len    = le32_to_cpu(pdu->buf_len);
1535         pdu->log_sec    = le64_to_cpu(pdu->log_sec);
1536         pdu->log_usec   = le64_to_cpu(pdu->log_usec);
1537 }
1538
/*
 * Inflate a zlib-compressed IO log payload.
 *
 * The cmd_iolog_pdu header is sent uncompressed (still in wire byte order);
 * the samples after it are one zlib stream. Returns a newly allocated pdu
 * (header copied verbatim, samples inflated after it) that the caller must
 * free, or NULL on failure or when built without zlib.
 */
static struct cmd_iolog_pdu *convert_iolog_gz(struct fio_net_cmd *cmd,
					      struct cmd_iolog_pdu *pdu)
{
#ifdef CONFIG_ZLIB
	struct cmd_iolog_pdu *ret = NULL;
	z_stream stream;
	uint64_t nr_samples;
	size_t entry_sz, total;
	char *p;

	stream.zalloc = Z_NULL;
	stream.zfree = Z_NULL;
	stream.opaque = Z_NULL;
	stream.avail_in = 0;
	stream.next_in = Z_NULL;

	if (inflateInit(&stream) != Z_OK)
		return NULL;

	if (cmd->pdu_len < sizeof(*pdu)) {
		log_err("fio: IO log payload too short\n");
		goto err;
	}

	/*
	 * Get header first, it's not compressed (and still little-endian).
	 * Histogram samples are each followed by an io_u_plat_entry.
	 */
	nr_samples = le64_to_cpu(pdu->nr_samples);

	entry_sz = __log_entry_sz(le32_to_cpu(pdu->log_offset));
	if (le32_to_cpu(pdu->log_type) == IO_LOG_TYPE_HIST)
		entry_sz += sizeof(struct io_u_plat_entry);

	/* nr_samples comes off the wire: don't let the size computation wrap */
	if (nr_samples > (SIZE_MAX - sizeof(*pdu)) / entry_sz) {
		log_err("fio: IO log sample count %llu too large\n",
			(unsigned long long) nr_samples);
		goto err;
	}
	total = nr_samples * entry_sz;

	ret = malloc(total + sizeof(*pdu));
	if (!ret) {
		log_err("fio: failed allocating decompressed IO log\n");
		goto err;
	}

	/* header keeps wire byte order; convert_iolog() converts it */
	memcpy(ret, pdu, sizeof(*pdu));

	p = (char *) ret + sizeof(*pdu);

	stream.avail_in = cmd->pdu_len - sizeof(*pdu);
	stream.next_in = (void *)((char *) pdu + sizeof(*pdu));
	while (stream.avail_in) {
		unsigned int this_chunk = 65536;
		unsigned int this_len;
		int err;

		if (this_chunk > total)
			this_chunk = total;

		stream.avail_out = this_chunk;
		stream.next_out = (void *)p;
		err = inflate(&stream, Z_NO_FLUSH);
		if (err != Z_OK && err != Z_STREAM_END) {
			log_err("fio: inflate error %d\n", err);
			free(ret);
			ret = NULL;
			goto err;
		}

		this_len = this_chunk - stream.avail_out;
		p += this_len;
		total -= this_len;

		/*
		 * After the end of the stream inflate() makes no further
		 * progress, so stop even if trailing input remains.
		 */
		if (err == Z_STREAM_END)
			break;
	}

err:
	inflateEnd(&stream);
	return ret;
#else
	return NULL;
#endif
}
1608
1609 /*
1610  * This has been compressed on the server side, since it can be big.
1611  * Uncompress here.
1612  */
1613 static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *cmd,
1614                                            bool *store_direct)
1615 {
1616         struct cmd_iolog_pdu *pdu = (struct cmd_iolog_pdu *) cmd->payload;
1617         struct cmd_iolog_pdu *ret;
1618         uint64_t i;
1619         int compressed;
1620         void *samples;
1621
1622         *store_direct = false;
1623
1624         /*
1625          * Convert if compressed and we support it. If it's not
1626          * compressed, we need not do anything.
1627          */
1628         compressed = le32_to_cpu(pdu->compressed);
1629         if (compressed == XMIT_COMPRESSED) {
1630 #ifndef CONFIG_ZLIB
1631                 log_err("fio: server sent compressed data by mistake\n");
1632                 return NULL;
1633 #endif
1634                 ret = convert_iolog_gz(cmd, pdu);
1635                 if (!ret) {
1636                         log_err("fio: failed decompressing log\n");
1637                         return NULL;
1638                 }
1639         } else if (compressed == STORE_COMPRESSED) {
1640                 *store_direct = true;
1641                 ret = pdu;
1642         } else
1643                 ret = pdu;
1644
1645         ret->nr_samples         = le64_to_cpu(ret->nr_samples);
1646         ret->thread_number      = le32_to_cpu(ret->thread_number);
1647         ret->log_type           = le32_to_cpu(ret->log_type);
1648         ret->compressed         = le32_to_cpu(ret->compressed);
1649         ret->log_offset         = le32_to_cpu(ret->log_offset);
1650         ret->log_hist_coarseness = le32_to_cpu(ret->log_hist_coarseness);
1651
1652         if (*store_direct)
1653                 return ret;
1654
1655         samples = &ret->samples[0];
1656         for (i = 0; i < ret->nr_samples; i++) {
1657                 struct io_sample *s;
1658
1659                 s = __get_sample(samples, ret->log_offset, i);
1660                 if (ret->log_type == IO_LOG_TYPE_HIST)
1661                         s = (struct io_sample *)((char *)s + sizeof(struct io_u_plat_entry) * i);
1662
1663                 s->time         = le64_to_cpu(s->time);
1664                 s->data.val     = le64_to_cpu(s->data.val);
1665                 s->__ddir       = le32_to_cpu(s->__ddir);
1666                 s->bs           = le64_to_cpu(s->bs);
1667
1668                 if (ret->log_offset) {
1669                         struct io_sample_offset *so = (void *) s;
1670
1671                         so->offset = le64_to_cpu(so->offset);
1672                 }
1673
1674                 if (ret->log_type == IO_LOG_TYPE_HIST) {
1675                         s->data.plat_entry = (struct io_u_plat_entry *)(((char *)s) + sizeof(*s));
1676                         s->data.plat_entry->list.next = NULL;
1677                         s->data.plat_entry->list.prev = NULL;
1678                 }
1679         }
1680
1681         return ret;
1682 }
1683
1684 static void sendfile_reply(int fd, struct cmd_sendfile_reply *rep,
1685                            size_t size, uint64_t tag)
1686 {
1687         rep->error = cpu_to_le32(rep->error);
1688         fio_net_send_cmd(fd, FIO_NET_CMD_SENDFILE, rep, size, &tag, NULL);
1689 }
1690
1691 static int fio_send_file(struct fio_client *client, struct cmd_sendfile *pdu,
1692                          uint64_t tag)
1693 {
1694         struct cmd_sendfile_reply *rep;
1695         struct stat sb;
1696         size_t size;
1697         int fd;
1698
1699         size = sizeof(*rep);
1700         rep = malloc(size);
1701
1702         if (stat((char *)pdu->path, &sb) < 0) {
1703 fail:
1704                 rep->error = errno;
1705                 sendfile_reply(client->fd, rep, size, tag);
1706                 free(rep);
1707                 return 1;
1708         }
1709
1710         size += sb.st_size;
1711         rep = realloc(rep, size);
1712         rep->size = cpu_to_le32((uint32_t) sb.st_size);
1713
1714         fd = open((char *)pdu->path, O_RDONLY);
1715         if (fd == -1 )
1716                 goto fail;
1717
1718         rep->error = read_data(fd, &rep->data, sb.st_size);
1719         sendfile_reply(client->fd, rep, size, tag);
1720         free(rep);
1721         close(fd);
1722         return 0;
1723 }
1724
1725 int fio_handle_client(struct fio_client *client)
1726 {
1727         struct client_ops *ops = client->ops;
1728         struct fio_net_cmd *cmd;
1729         int size;
1730
1731         dprint(FD_NET, "client: handle %s\n", client->hostname);
1732
1733         cmd = fio_net_recv_cmd(client->fd, true);
1734         if (!cmd)
1735                 return 0;
1736
1737         dprint(FD_NET, "client: got cmd op %s from %s (pdu=%u)\n",
1738                 fio_server_op(cmd->opcode), client->hostname, cmd->pdu_len);
1739
1740         client->last_cmd = cmd->opcode;
1741
1742         switch (cmd->opcode) {
1743         case FIO_NET_CMD_QUIT:
1744                 if (ops->quit)
1745                         ops->quit(client, cmd);
1746                 remove_client(client);
1747                 break;
1748         case FIO_NET_CMD_TEXT:
1749                 convert_text(cmd);
1750                 ops->text(client, cmd);
1751                 break;
1752         case FIO_NET_CMD_DU: {
1753                 struct cmd_du_pdu *du = (struct cmd_du_pdu *) cmd->payload;
1754
1755                 convert_dus(&du->dus);
1756                 convert_agg(&du->agg);
1757
1758                 ops->disk_util(client, cmd);
1759                 break;
1760                 }
1761         case FIO_NET_CMD_TS: {
1762                 struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload;
1763
1764                 dprint(FD_NET, "client: ts->ss_state = %u\n", (unsigned int) le32_to_cpu(p->ts.ss_state));
1765                 if (le32_to_cpu(p->ts.ss_state) & FIO_SS_DATA) {
1766                         dprint(FD_NET, "client: received steadystate ring buffers\n");
1767
1768                         size = le64_to_cpu(p->ts.ss_dur);
1769                         p->ts.ss_iops_data = (uint64_t *) ((struct cmd_ts_pdu *)cmd->payload + 1);
1770                         p->ts.ss_bw_data = p->ts.ss_iops_data + size;
1771                 }
1772
1773                 convert_ts(&p->ts, &p->ts);
1774                 convert_gs(&p->rs, &p->rs);
1775
1776                 ops->thread_status(client, cmd);
1777                 break;
1778                 }
1779         case FIO_NET_CMD_GS: {
1780                 struct group_run_stats *gs = (struct group_run_stats *) cmd->payload;
1781
1782                 convert_gs(gs, gs);
1783
1784                 ops->group_stats(client, cmd);
1785                 break;
1786                 }
1787         case FIO_NET_CMD_ETA: {
1788                 struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
1789
1790                 if (!remove_reply_cmd(client, cmd))
1791                         break;
1792                 convert_jobs_eta(je);
1793                 handle_eta(client, cmd);
1794                 break;
1795                 }
1796         case FIO_NET_CMD_PROBE:
1797                 remove_reply_cmd(client, cmd);
1798                 ops->probe(client, cmd);
1799                 break;
1800         case FIO_NET_CMD_SERVER_START:
1801                 client->state = Client_running;
1802                 if (ops->job_start)
1803                         ops->job_start(client, cmd);
1804                 break;
1805         case FIO_NET_CMD_START: {
1806                 struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload;
1807
1808                 pdu->jobs = le32_to_cpu(pdu->jobs);
1809                 ops->start(client, cmd);
1810                 break;
1811                 }
1812         case FIO_NET_CMD_STOP: {
1813                 struct cmd_end_pdu *pdu = (struct cmd_end_pdu *) cmd->payload;
1814
1815                 convert_stop(cmd);
1816                 client->state = Client_stopped;
1817                 client->error = le32_to_cpu(pdu->error);
1818                 client->signal = le32_to_cpu(pdu->signal);
1819                 ops->stop(client);
1820                 break;
1821                 }
1822         case FIO_NET_CMD_ADD_JOB: {
1823                 struct cmd_add_job_pdu *pdu = (struct cmd_add_job_pdu *) cmd->payload;
1824
1825                 client->thread_number = le32_to_cpu(pdu->thread_number);
1826                 client->groupid = le32_to_cpu(pdu->groupid);
1827
1828                 if (ops->add_job)
1829                         ops->add_job(client, cmd);
1830                 break;
1831                 }
1832         case FIO_NET_CMD_IOLOG:
1833                 fio_client_handle_iolog(client, cmd);
1834                 break;
1835         case FIO_NET_CMD_UPDATE_JOB:
1836                 ops->update_job(client, cmd);
1837                 remove_reply_cmd(client, cmd);
1838                 break;
1839         case FIO_NET_CMD_VTRIGGER: {
1840                 struct all_io_list *pdu = (struct all_io_list *) cmd->payload;
1841                 char buf[128];
1842                 int off = 0;
1843
1844                 if (aux_path) {
1845                         strcpy(buf, aux_path);
1846                         off = strlen(buf);
1847                 }
1848
1849                 __verify_save_state(pdu, server_name(client, &buf[off], sizeof(buf) - off));
1850                 exec_trigger(trigger_cmd);
1851                 break;
1852                 }
1853         case FIO_NET_CMD_SENDFILE: {
1854                 struct cmd_sendfile *pdu = (struct cmd_sendfile *) cmd->payload;
1855                 fio_send_file(client, pdu, cmd->tag);
1856                 break;
1857                 }
1858         case FIO_NET_CMD_JOB_OPT: {
1859                 handle_job_opt(client, cmd);
1860                 break;
1861         }
1862         default:
1863                 log_err("fio: unknown client op: %s\n", fio_server_op(cmd->opcode));
1864                 break;
1865         }
1866
1867         free(cmd);
1868         return 1;
1869 }
1870
1871 int fio_clients_send_trigger(const char *cmd)
1872 {
1873         struct flist_head *entry;
1874         struct fio_client *client;
1875         size_t slen;
1876
1877         dprint(FD_NET, "client: send vtrigger: %s\n", cmd);
1878
1879         if (!cmd)
1880                 slen = 0;
1881         else
1882                 slen = strlen(cmd);
1883
1884         flist_for_each(entry, &client_list) {
1885                 struct cmd_vtrigger_pdu *pdu;
1886
1887                 client = flist_entry(entry, struct fio_client, list);
1888
1889                 pdu = malloc(sizeof(*pdu) + slen);
1890                 pdu->len = cpu_to_le16((uint16_t) slen);
1891                 if (slen)
1892                         memcpy(pdu->cmd, cmd, slen);
1893                 fio_net_send_cmd(client->fd, FIO_NET_CMD_VTRIGGER, pdu,
1894                                         sizeof(*pdu) + slen, NULL, NULL);
1895                 free(pdu);
1896         }
1897
1898         return 0;
1899 }
1900
1901 static void request_client_etas(struct client_ops *ops)
1902 {
1903         struct fio_client *client;
1904         struct flist_head *entry;
1905         struct client_eta *eta;
1906         int skipped = 0;
1907
1908         if (eta_print == FIO_ETA_NEVER)
1909                 return;
1910
1911         dprint(FD_NET, "client: request eta (%d)\n", nr_clients);
1912
1913         eta = calloc(1, sizeof(*eta) + __THREAD_RUNSTR_SZ(REAL_MAX_JOBS));
1914         eta->pending = nr_clients;
1915
1916         flist_for_each(entry, &client_list) {
1917                 client = flist_entry(entry, struct fio_client, list);
1918
1919                 if (!flist_empty(&client->eta_list)) {
1920                         skipped++;
1921                         continue;
1922                 }
1923                 if (client->state != Client_running)
1924                         continue;
1925
1926                 assert(!client->eta_in_flight);
1927                 flist_add_tail(&client->eta_list, &eta_list);
1928                 client->eta_in_flight = eta;
1929                 fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_SEND_ETA,
1930                                         (uintptr_t) eta, &client->cmd_list);
1931         }
1932
1933         while (skipped--) {
1934                 if (!fio_client_dec_jobs_eta(eta, ops->eta))
1935                         break;
1936         }
1937
1938         dprint(FD_NET, "client: requested eta tag %p\n", eta);
1939 }
1940
1941 /*
1942  * A single SEND_ETA timeout isn't fatal. Attempt to recover.
1943  */
1944 static int handle_cmd_timeout(struct fio_client *client,
1945                               struct fio_net_cmd_reply *reply)
1946 {
1947         uint16_t reply_opcode = reply->opcode;
1948
1949         flist_del(&reply->list);
1950         free(reply);
1951
1952         if (reply_opcode != FIO_NET_CMD_SEND_ETA)
1953                 return 1;
1954
1955         log_info("client <%s>: timeout on SEND_ETA\n", client->hostname);
1956
1957         flist_del_init(&client->eta_list);
1958         if (client->eta_in_flight) {
1959                 fio_client_dec_jobs_eta(client->eta_in_flight, client->ops->eta);
1960                 client->eta_in_flight = NULL;
1961         }
1962
1963         /*
1964          * If we fail 5 in a row, give up...
1965          */
1966         if (client->eta_timeouts++ > 5)
1967                 return 1;
1968
1969         return 0;
1970 }
1971
1972 static int client_check_cmd_timeout(struct fio_client *client,
1973                                     struct timespec *now)
1974 {
1975         struct fio_net_cmd_reply *reply;
1976         struct flist_head *entry, *tmp;
1977         int ret = 0;
1978
1979         flist_for_each_safe(entry, tmp, &client->cmd_list) {
1980                 unsigned int op;
1981
1982                 reply = flist_entry(entry, struct fio_net_cmd_reply, list);
1983
1984                 if (mtime_since(&reply->ts, now) < FIO_NET_CLIENT_TIMEOUT)
1985                         continue;
1986
1987                 op = reply->opcode;
1988                 if (!handle_cmd_timeout(client, reply))
1989                         continue;
1990
1991                 log_err("fio: client %s, timeout on cmd %s\n", client->hostname,
1992                                                 fio_server_op(op));
1993                 ret = 1;
1994         }
1995
1996         return flist_empty(&client->cmd_list) && ret;
1997 }
1998
1999 static int fio_check_clients_timed_out(void)
2000 {
2001         struct fio_client *client;
2002         struct flist_head *entry, *tmp;
2003         struct timespec ts;
2004         int ret = 0;
2005
2006         fio_gettime(&ts, NULL);
2007
2008         flist_for_each_safe(entry, tmp, &client_list) {
2009                 client = flist_entry(entry, struct fio_client, list);
2010
2011                 if (flist_empty(&client->cmd_list))
2012                         continue;
2013
2014                 if (!client_check_cmd_timeout(client, &ts))
2015                         continue;
2016
2017                 if (client->ops->timed_out)
2018                         client->ops->timed_out(client);
2019                 else
2020                         log_err("fio: client %s timed out\n", client->hostname);
2021
2022                 if (client->last_cmd != FIO_NET_CMD_VTRIGGER)
2023                         client->error = ETIMEDOUT;
2024                 else
2025                         log_info("fio: ignoring timeout due to vtrigger\n");
2026                 remove_client(client);
2027                 ret = 1;
2028         }
2029
2030         return ret;
2031 }
2032
2033 int fio_handle_clients(struct client_ops *ops)
2034 {
2035         struct pollfd *pfds;
2036         int i, ret = 0, retval = 0;
2037
2038         fio_gettime(&eta_ts, NULL);
2039
2040         pfds = malloc(nr_clients * sizeof(struct pollfd));
2041
2042         init_thread_stat(&client_ts);
2043         init_group_run_stat(&client_gs);
2044
2045         while (!exit_backend && nr_clients) {
2046                 struct flist_head *entry, *tmp;
2047                 struct fio_client *client;
2048
2049                 i = 0;
2050                 flist_for_each_safe(entry, tmp, &client_list) {
2051                         client = flist_entry(entry, struct fio_client, list);
2052
2053                         if (!client->sent_job && !client->ops->stay_connected &&
2054                             flist_empty(&client->cmd_list)) {
2055                                 remove_client(client);
2056                                 continue;
2057                         }
2058
2059                         pfds[i].fd = client->fd;
2060                         pfds[i].events = POLLIN;
2061                         i++;
2062                 }
2063
2064                 if (!nr_clients)
2065                         break;
2066
2067                 assert(i == nr_clients);
2068
2069                 do {
2070                         struct timespec ts;
2071                         int timeout;
2072
2073                         fio_gettime(&ts, NULL);
2074                         if (eta_time_within_slack(mtime_since(&eta_ts, &ts))) {
2075                                 request_client_etas(ops);
2076                                 memcpy(&eta_ts, &ts, sizeof(ts));
2077
2078                                 if (fio_check_clients_timed_out())
2079                                         break;
2080                         }
2081
2082                         check_trigger_file();
2083
2084                         timeout = min(100u, ops->eta_msec);
2085
2086                         ret = poll(pfds, nr_clients, timeout);
2087                         if (ret < 0) {
2088                                 if (errno == EINTR)
2089                                         continue;
2090                                 log_err("fio: poll clients: %s\n", strerror(errno));
2091                                 break;
2092                         } else if (!ret)
2093                                 continue;
2094                 } while (ret <= 0);
2095
2096                 for (i = 0; i < nr_clients; i++) {
2097                         if (!(pfds[i].revents & POLLIN))
2098                                 continue;
2099
2100                         client = find_client_by_fd(pfds[i].fd);
2101                         if (!client) {
2102                                 log_err("fio: unknown client fd %ld\n", (long) pfds[i].fd);
2103                                 continue;
2104                         }
2105                         if (!fio_handle_client(client)) {
2106                                 log_info("client: host=%s disconnected\n",
2107                                                 client->hostname);
2108                                 remove_client(client);
2109                                 retval = 1;
2110                         } else if (client->error)
2111                                 retval = 1;
2112                         fio_put_client(client);
2113                 }
2114         }
2115
2116         fio_client_json_fini();
2117
2118         free(pfds);
2119         return retval || error_clients;
2120 }
2121
2122 static void client_display_thread_status(struct jobs_eta *je)
2123 {
2124         if (!(output_format & FIO_OUTPUT_JSON))
2125                 display_thread_status(je);
2126 }