gfio: make the end results window a scrolled one
[fio.git] / client.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <unistd.h>
4 #include <limits.h>
5 #include <errno.h>
6 #include <fcntl.h>
7 #include <sys/poll.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <sys/wait.h>
11 #include <sys/socket.h>
12 #include <sys/un.h>
13 #include <netinet/in.h>
14 #include <arpa/inet.h>
15 #include <netdb.h>
16 #include <signal.h>
17
18 #include "fio.h"
19 #include "client.h"
20 #include "server.h"
21 #include "flist.h"
22 #include "hash.h"
23
/*
 * Result handlers used by the default client_ops table below; they print
 * what the server reports (text output, disk utilization, thread and group
 * stats, probe replies).
 */
24 static void handle_du(struct fio_client *client, struct fio_net_cmd *cmd);
25 static void handle_ts(struct fio_client *client, struct fio_net_cmd *cmd);
26 static void handle_gs(struct fio_client *client, struct fio_net_cmd *cmd);
27 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd);
28 static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd);
29
/*
 * Default callback table. fio_handle_client() dispatches received commands
 * through client->ops; the .quit and .add_job hooks are not set here and the
 * dispatcher checks them for NULL before calling.
 * NOTE(review): presumably other front-ends (e.g. gfio) pass their own
 * client_ops to fio_client_add*() — confirm.
 */
30 struct client_ops fio_client_ops = {
31         .text_op        = handle_text,
32         .disk_util      = handle_du,
33         .thread_status  = handle_ts,
34         .group_stats    = handle_gs,
35         .eta            = display_thread_status,
36         .probe          = handle_probe,
37 };
38
/*
 * NOTE(review): eta_tv is not referenced in the visible part of this file;
 * presumably it records when ETAs were last requested — confirm.
 */
39 static struct timeval eta_tv;
40
/* Connection lifecycle states stored in fio_client->state. */
41 enum {
42         Client_created          = 0,
43         Client_connected        = 1,
44         Client_started          = 2,
45         Client_running          = 3,
46         Client_stopped          = 4,
47         Client_exited           = 5,
48 };
49
/* Every registered client, linked through fio_client->list. */
50 static FLIST_HEAD(client_list);
/*
 * NOTE(review): global list head for clients with an ETA request in flight;
 * clients are presumably added in request_client_etas() (outside this view).
 */
51 static FLIST_HEAD(eta_list);
52
/*
 * Clients that share the command line options of the client currently being
 * configured; fio_client_add_cmd_option() copies each option to them.
 */
53 static FLIST_HEAD(arg_list);
54
/* Running "All clients" aggregate built up by handle_ts(). */
55 struct thread_stat client_ts;
56 struct group_run_stats client_gs;
/*
 * Number of clients whose stats are summed; handle_ts() prints the aggregate
 * once this many reports have arrived. Decremented by remove_client().
 */
57 int sum_stat_clients;
58
/* Number of thread-stat reports folded into client_ts so far. */
59 static int sum_stat_nr;
60
/* Hash table of connected clients keyed by socket fd (see find_client_by_fd()). */
61 #define FIO_CLIENT_HASH_BITS    7
62 #define FIO_CLIENT_HASH_SZ      (1 << FIO_CLIENT_HASH_BITS)
63 #define FIO_CLIENT_HASH_MASK    (FIO_CLIENT_HASH_SZ - 1)
64 static struct flist_head client_hash[FIO_CLIENT_HASH_SZ];
65
66 static void fio_client_add_hash(struct fio_client *client)
67 {
68         int bucket = hash_long(client->fd, FIO_CLIENT_HASH_BITS);
69
70         bucket &= FIO_CLIENT_HASH_MASK;
71         flist_add(&client->hash_list, &client_hash[bucket]);
72 }
73
74 static void fio_client_remove_hash(struct fio_client *client)
75 {
76         if (!flist_empty(&client->hash_list))
77                 flist_del_init(&client->hash_list);
78 }
79
80 static void fio_init fio_client_hash_init(void)
81 {
82         int i;
83
84         for (i = 0; i < FIO_CLIENT_HASH_SZ; i++)
85                 INIT_FLIST_HEAD(&client_hash[i]);
86 }
87
88 static struct fio_client *find_client_by_fd(int fd)
89 {
90         int bucket = hash_long(fd, FIO_CLIENT_HASH_BITS) & FIO_CLIENT_HASH_MASK;
91         struct fio_client *client;
92         struct flist_head *entry;
93
94         flist_for_each(entry, &client_hash[bucket]) {
95                 client = flist_entry(entry, struct fio_client, hash_list);
96
97                 if (client->fd == fd) {
98                         client->refs++;
99                         return client;
100                 }
101         }
102
103         return NULL;
104 }
105
106 static void remove_client(struct fio_client *client)
107 {
108         assert(client->refs);
109
110         if (--client->refs)
111                 return;
112
113         dprint(FD_NET, "client: removed <%s>\n", client->hostname);
114         flist_del(&client->list);
115
116         fio_client_remove_hash(client);
117
118         if (!flist_empty(&client->eta_list)) {
119                 flist_del_init(&client->eta_list);
120                 fio_client_dec_jobs_eta(client->eta_in_flight, client->ops->eta);
121         }
122
123         free(client->hostname);
124         if (client->argv)
125                 free(client->argv);
126         if (client->name)
127                 free(client->name);
128
129         free(client);
130         nr_clients--;
131         sum_stat_clients--;
132 }
133
/*
 * Drop a client reference. This simply forwards to remove_client(), which
 * frees the client once its reference count reaches zero.
 * NOTE(review): presumably the counterpart to find_client_by_fd(), which
 * takes a reference on lookup — confirm at call sites.
 */
134 static void put_client(struct fio_client *client)
135 {
136         remove_client(client);
137 }
138
139 static void __fio_client_add_cmd_option(struct fio_client *client,
140                                         const char *opt)
141 {
142         int index;
143
144         index = client->argc++;
145         client->argv = realloc(client->argv, sizeof(char *) * client->argc);
146         client->argv[index] = strdup(opt);
147         dprint(FD_NET, "client: add cmd %d: %s\n", index, opt);
148 }
149
150 void fio_client_add_cmd_option(void *cookie, const char *opt)
151 {
152         struct fio_client *client = cookie;
153         struct flist_head *entry;
154
155         if (!client || !opt)
156                 return;
157
158         __fio_client_add_cmd_option(client, opt);
159
160         /*
161          * Duplicate arguments to shared client group
162          */
163         flist_for_each(entry, &arg_list) {
164                 client = flist_entry(entry, struct fio_client, arg_list);
165
166                 __fio_client_add_cmd_option(client, opt);
167         }
168 }
169
170 struct fio_client *fio_client_add_explicit(struct client_ops *ops,
171                                            const char *hostname, int type,
172                                            int port)
173 {
174         struct fio_client *client;
175
176         client = malloc(sizeof(*client));
177         memset(client, 0, sizeof(*client));
178
179         INIT_FLIST_HEAD(&client->list);
180         INIT_FLIST_HEAD(&client->hash_list);
181         INIT_FLIST_HEAD(&client->arg_list);
182         INIT_FLIST_HEAD(&client->eta_list);
183         INIT_FLIST_HEAD(&client->cmd_list);
184
185         client->hostname = strdup(hostname);
186
187         if (type == Fio_client_socket)
188                 client->is_sock = 1;
189         else {
190                 int ipv6;
191
192                 ipv6 = type == Fio_client_ipv6;
193                 if (fio_server_parse_host(hostname, &ipv6,
194                                                 &client->addr.sin_addr,
195                                                 &client->addr6.sin6_addr))
196                         goto err;
197
198                 client->port = port;
199         }
200
201         client->fd = -1;
202         client->ops = ops;
203         client->refs = 1;
204
205         __fio_client_add_cmd_option(client, "fio");
206
207         flist_add(&client->list, &client_list);
208         nr_clients++;
209         dprint(FD_NET, "client: added <%s>\n", client->hostname);
210         return client;
211 err:
212         free(client);
213         return NULL;
214 }
215
216 int fio_client_add(struct client_ops *ops, const char *hostname, void **cookie)
217 {
218         struct fio_client *existing = *cookie;
219         struct fio_client *client;
220
221         if (existing) {
222                 /*
223                  * We always add our "exec" name as the option, hence 1
224                  * means empty.
225                  */
226                 if (existing->argc == 1)
227                         flist_add_tail(&existing->arg_list, &arg_list);
228                 else {
229                         while (!flist_empty(&arg_list))
230                                 flist_del_init(arg_list.next);
231                 }
232         }
233
234         client = malloc(sizeof(*client));
235         memset(client, 0, sizeof(*client));
236
237         INIT_FLIST_HEAD(&client->list);
238         INIT_FLIST_HEAD(&client->hash_list);
239         INIT_FLIST_HEAD(&client->arg_list);
240         INIT_FLIST_HEAD(&client->eta_list);
241         INIT_FLIST_HEAD(&client->cmd_list);
242
243         if (fio_server_parse_string(hostname, &client->hostname,
244                                         &client->is_sock, &client->port,
245                                         &client->addr.sin_addr,
246                                         &client->addr6.sin6_addr,
247                                         &client->ipv6))
248                 return -1;
249
250         client->fd = -1;
251         client->ops = ops;
252         client->refs = 1;
253
254         __fio_client_add_cmd_option(client, "fio");
255
256         flist_add(&client->list, &client_list);
257         nr_clients++;
258         dprint(FD_NET, "client: added <%s>\n", client->hostname);
259         *cookie = client;
260         return 0;
261 }
262
263 static int fio_client_connect_ip(struct fio_client *client)
264 {
265         struct sockaddr *addr;
266         fio_socklen_t socklen;
267         int fd, domain;
268
269         if (client->ipv6) {
270                 client->addr6.sin6_family = AF_INET6;
271                 client->addr6.sin6_port = htons(client->port);
272                 domain = AF_INET6;
273                 addr = (struct sockaddr *) &client->addr6;
274                 socklen = sizeof(client->addr6);
275         } else {
276                 client->addr.sin_family = AF_INET;
277                 client->addr.sin_port = htons(client->port);
278                 domain = AF_INET;
279                 addr = (struct sockaddr *) &client->addr;
280                 socklen = sizeof(client->addr);
281         }
282
283         fd = socket(domain, SOCK_STREAM, 0);
284         if (fd < 0) {
285                 log_err("fio: socket: %s\n", strerror(errno));
286                 return -1;
287         }
288
289         if (connect(fd, addr, socklen) < 0) {
290                 log_err("fio: connect: %s\n", strerror(errno));
291                 log_err("fio: failed to connect to %s:%u\n", client->hostname,
292                                                                 client->port);
293                 close(fd);
294                 return -1;
295         }
296
297         return fd;
298 }
299
300 static int fio_client_connect_sock(struct fio_client *client)
301 {
302         struct sockaddr_un *addr = &client->addr_un;
303         fio_socklen_t len;
304         int fd;
305
306         memset(addr, 0, sizeof(*addr));
307         addr->sun_family = AF_UNIX;
308         strcpy(addr->sun_path, client->hostname);
309
310         fd = socket(AF_UNIX, SOCK_STREAM, 0);
311         if (fd < 0) {
312                 log_err("fio: socket: %s\n", strerror(errno));
313                 return -1;
314         }
315
316         len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
317         if (connect(fd, (struct sockaddr *) addr, len) < 0) {
318                 log_err("fio: connect; %s\n", strerror(errno));
319                 close(fd);
320                 return -1;
321         }
322
323         return fd;
324 }
325
326 static int fio_client_connect(struct fio_client *client)
327 {
328         int fd;
329
330         dprint(FD_NET, "client: connect to host %s\n", client->hostname);
331
332         if (client->is_sock)
333                 fd = fio_client_connect_sock(client);
334         else
335                 fd = fio_client_connect_ip(client);
336
337         dprint(FD_NET, "client: %s connected %d\n", client->hostname, fd);
338
339         if (fd < 0)
340                 return 1;
341
342         client->fd = fd;
343         fio_client_add_hash(client);
344         client->state = Client_connected;
345         return 0;
346 }
347
348 void fio_clients_terminate(void)
349 {
350         struct flist_head *entry;
351         struct fio_client *client;
352
353         dprint(FD_NET, "client: terminate clients\n");
354
355         flist_for_each(entry, &client_list) {
356                 client = flist_entry(entry, struct fio_client, list);
357
358                 fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_QUIT, 0, NULL);
359         }
360 }
361
362 static void sig_int(int sig)
363 {
364         dprint(FD_NET, "client: got signal %d\n", sig);
365         fio_clients_terminate();
366 }
367
368 static void client_signal_handler(void)
369 {
370         struct sigaction act;
371
372         memset(&act, 0, sizeof(act));
373         act.sa_handler = sig_int;
374         act.sa_flags = SA_RESTART;
375         sigaction(SIGINT, &act, NULL);
376
377         memset(&act, 0, sizeof(act));
378         act.sa_handler = sig_int;
379         act.sa_flags = SA_RESTART;
380         sigaction(SIGTERM, &act, NULL);
381 }
382
383 static void probe_client(struct fio_client *client)
384 {
385         dprint(FD_NET, "client: send probe\n");
386
387         fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_PROBE, 0, &client->cmd_list);
388 }
389
390 static int send_client_cmd_line(struct fio_client *client)
391 {
392         struct cmd_single_line_pdu *cslp;
393         struct cmd_line_pdu *clp;
394         unsigned long offset;
395         unsigned int *lens;
396         void *pdu;
397         size_t mem;
398         int i, ret;
399
400         dprint(FD_NET, "client: send cmdline %d\n", client->argc);
401
402         lens = malloc(client->argc * sizeof(unsigned int));
403
404         /*
405          * Find out how much mem we need
406          */
407         for (i = 0, mem = 0; i < client->argc; i++) {
408                 lens[i] = strlen(client->argv[i]) + 1;
409                 mem += lens[i];
410         }
411
412         /*
413          * We need one cmd_line_pdu, and argc number of cmd_single_line_pdu
414          */
415         mem += sizeof(*clp) + (client->argc * sizeof(*cslp));
416
417         pdu = malloc(mem);
418         clp = pdu;
419         offset = sizeof(*clp);
420
421         for (i = 0; i < client->argc; i++) {
422                 uint16_t arg_len = lens[i];
423
424                 cslp = pdu + offset;
425                 strcpy((char *) cslp->text, client->argv[i]);
426                 cslp->len = cpu_to_le16(arg_len);
427                 offset += sizeof(*cslp) + arg_len;
428         }
429
430         free(lens);
431         clp->lines = cpu_to_le16(client->argc);
432         ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOBLINE, pdu, mem, 0);
433         free(pdu);
434         return ret;
435 }
436
437 int fio_clients_connect(void)
438 {
439         struct fio_client *client;
440         struct flist_head *entry, *tmp;
441         int ret;
442
443 #ifdef WIN32
444         WSADATA wsd;
445         WSAStartup(MAKEWORD(2,2), &wsd);
446 #endif
447
448         dprint(FD_NET, "client: connect all\n");
449
450         client_signal_handler();
451
452         flist_for_each_safe(entry, tmp, &client_list) {
453                 client = flist_entry(entry, struct fio_client, list);
454
455                 ret = fio_client_connect(client);
456                 if (ret) {
457                         remove_client(client);
458                         continue;
459                 }
460
461                 probe_client(client);
462
463                 if (client->argc > 1)
464                         send_client_cmd_line(client);
465         }
466
467         return !nr_clients;
468 }
469
470 /*
471  * Send file contents to server backend. We could use sendfile(), but to remain
472  * more portable lets just read/write the darn thing.
473  */
474 static int fio_client_send_ini(struct fio_client *client, const char *filename)
475 {
476         struct stat sb;
477         char *p, *buf;
478         off_t len;
479         int fd, ret;
480
481         dprint(FD_NET, "send ini %s to %s\n", filename, client->hostname);
482
483         fd = open(filename, O_RDONLY);
484         if (fd < 0) {
485                 log_err("fio: job file <%s> open: %s\n", filename, strerror(errno));
486                 return 1;
487         }
488
489         if (fstat(fd, &sb) < 0) {
490                 log_err("fio: job file stat: %s\n", strerror(errno));
491                 close(fd);
492                 return 1;
493         }
494
495         buf = malloc(sb.st_size);
496
497         len = sb.st_size;
498         p = buf;
499         do {
500                 ret = read(fd, p, len);
501                 if (ret > 0) {
502                         len -= ret;
503                         if (!len)
504                                 break;
505                         p += ret;
506                         continue;
507                 } else if (!ret)
508                         break;
509                 else if (errno == EAGAIN || errno == EINTR)
510                         continue;
511         } while (1);
512
513         if (len) {
514                 log_err("fio: failed reading job file %s\n", filename);
515                 close(fd);
516                 free(buf);
517                 return 1;
518         }
519
520         client->sent_job = 1;
521         ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, buf, sb.st_size, 0);
522         free(buf);
523         close(fd);
524         return ret;
525 }
526
527 int fio_clients_send_ini(const char *filename)
528 {
529         struct fio_client *client;
530         struct flist_head *entry, *tmp;
531
532         flist_for_each_safe(entry, tmp, &client_list) {
533                 client = flist_entry(entry, struct fio_client, list);
534
535                 if (fio_client_send_ini(client, filename))
536                         remove_client(client);
537
538                 client->sent_job = 1;
539         }
540
541         return !nr_clients;
542 }
543
544 static void convert_io_stat(struct io_stat *dst, struct io_stat *src)
545 {
546         dst->max_val    = le64_to_cpu(src->max_val);
547         dst->min_val    = le64_to_cpu(src->min_val);
548         dst->samples    = le64_to_cpu(src->samples);
549
550         /*
551          * Floats arrive as IEEE 754 encoded uint64_t, convert back to double
552          */
553         dst->mean.u.f   = fio_uint64_to_double(le64_to_cpu(dst->mean.u.i));
554         dst->S.u.f      = fio_uint64_to_double(le64_to_cpu(dst->S.u.i));
555 }
556
/*
 * Convert a thread_stat received from a server from little-endian wire
 * format into host byte order in @dst.
 *
 * fio_handle_client() calls this in place (dst == src). Each assignment reads
 * a field from @src and writes the same field of @dst, so in-place use is
 * safe. Members not listed here (e.g. the name string) are not copied, so a
 * call with dst != src leaves them untouched in @dst.
 *
 * NOTE(review): the le16/le32/le64 helper used for each field has to match
 * that field's width in struct thread_stat, which is declared elsewhere.
 */
557 static void convert_ts(struct thread_stat *dst, struct thread_stat *src)
558 {
559         int i, j;
560
561         dst->error      = le32_to_cpu(src->error);
562         dst->groupid    = le32_to_cpu(src->groupid);
563         dst->pid        = le32_to_cpu(src->pid);
564         dst->members    = le32_to_cpu(src->members);
565
        /* Latency and bandwidth statistics, two entries each. */
566         for (i = 0; i < 2; i++) {
567                 convert_io_stat(&dst->clat_stat[i], &src->clat_stat[i]);
568                 convert_io_stat(&dst->slat_stat[i], &src->slat_stat[i]);
569                 convert_io_stat(&dst->lat_stat[i], &src->lat_stat[i]);
570                 convert_io_stat(&dst->bw_stat[i], &src->bw_stat[i]);
571         }
572
573         dst->usr_time           = le64_to_cpu(src->usr_time);
574         dst->sys_time           = le64_to_cpu(src->sys_time);
575         dst->ctx                = le64_to_cpu(src->ctx);
576         dst->minf               = le64_to_cpu(src->minf);
577         dst->majf               = le64_to_cpu(src->majf);
578         dst->clat_percentiles   = le64_to_cpu(src->clat_percentiles);
579
        /* Percentile values travel as IEEE 754 bit patterns in a uint64_t. */
580         for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) {
581                 fio_fp64_t *fps = &src->percentile_list[i];
582                 fio_fp64_t *fpd = &dst->percentile_list[i];
583
584                 fpd->u.f = fio_uint64_to_double(le64_to_cpu(fps->u.i));
585         }
586
587         for (i = 0; i < FIO_IO_U_MAP_NR; i++) {
588                 dst->io_u_map[i]        = le32_to_cpu(src->io_u_map[i]);
589                 dst->io_u_submit[i]     = le32_to_cpu(src->io_u_submit[i]);
590                 dst->io_u_complete[i]   = le32_to_cpu(src->io_u_complete[i]);
591         }
592
593         for (i = 0; i < FIO_IO_U_LAT_U_NR; i++) {
594                 dst->io_u_lat_u[i]      = le32_to_cpu(src->io_u_lat_u[i]);
595                 dst->io_u_lat_m[i]      = le32_to_cpu(src->io_u_lat_m[i]);
596         }
597
        /* Two rows of FIO_IO_U_PLAT_NR histogram buckets. */
598         for (i = 0; i < 2; i++)
599                 for (j = 0; j < FIO_IO_U_PLAT_NR; j++)
600                         dst->io_u_plat[i][j] = le32_to_cpu(src->io_u_plat[i][j]);
601
602         for (i = 0; i < 3; i++) {
603                 dst->total_io_u[i]      = le64_to_cpu(src->total_io_u[i]);
604                 dst->short_io_u[i]      = le64_to_cpu(src->short_io_u[i]);
605         }
606
607         dst->total_submit       = le64_to_cpu(src->total_submit);
608         dst->total_complete     = le64_to_cpu(src->total_complete);
609
610         for (i = 0; i < 2; i++) {
611                 dst->io_bytes[i]        = le64_to_cpu(src->io_bytes[i]);
612                 dst->runtime[i]         = le64_to_cpu(src->runtime[i]);
613         }
614
615         dst->total_run_time     = le64_to_cpu(src->total_run_time);
616         dst->continue_on_error  = le16_to_cpu(src->continue_on_error);
617         dst->total_err_count    = le64_to_cpu(src->total_err_count);
618         dst->first_error        = le32_to_cpu(src->first_error);
619         dst->kb_base            = le32_to_cpu(src->kb_base);
620 }
621
622 static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src)
623 {
624         int i;
625
626         for (i = 0; i < 2; i++) {
627                 dst->max_run[i]         = le64_to_cpu(src->max_run[i]);
628                 dst->min_run[i]         = le64_to_cpu(src->min_run[i]);
629                 dst->max_bw[i]          = le64_to_cpu(src->max_bw[i]);
630                 dst->min_bw[i]          = le64_to_cpu(src->min_bw[i]);
631                 dst->io_kb[i]           = le64_to_cpu(src->io_kb[i]);
632                 dst->agg[i]             = le64_to_cpu(src->agg[i]);
633         }
634
635         dst->kb_base    = le32_to_cpu(src->kb_base);
636         dst->groupid    = le32_to_cpu(src->groupid);
637 }
638
639 static void handle_ts(struct fio_client *client, struct fio_net_cmd *cmd)
640 {
641         struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload;
642
643         show_thread_status(&p->ts, &p->rs);
644
645         if (sum_stat_clients == 1)
646                 return;
647
648         sum_thread_stats(&client_ts, &p->ts, sum_stat_nr);
649         sum_group_stats(&client_gs, &p->rs);
650
651         client_ts.members++;
652         client_ts.groupid = p->ts.groupid;
653
654         if (++sum_stat_nr == sum_stat_clients) {
655                 strcpy(client_ts.name, "All clients");
656                 show_thread_status(&client_ts, &client_gs);
657         }
658 }
659
660 static void handle_gs(struct fio_client *client, struct fio_net_cmd *cmd)
661 {
662         struct group_run_stats *gs = (struct group_run_stats *) cmd->payload;
663
664         show_group_stats(gs);
665 }
666
667 static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd)
668 {
669         struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmd->payload;
670         const char *buf = (const char *) pdu->buf;
671         const char *name;
672         int fio_unused ret;
673
674         name = client->name ? client->name : client->hostname;
675
676         if (!client->skip_newline)
677                 fprintf(f_out, "<%s> ", name);
678         ret = fwrite(buf, pdu->buf_len, 1, f_out);
679         fflush(f_out);
680         client->skip_newline = strchr(buf, '\n') == NULL;
681 }
682
683 static void convert_agg(struct disk_util_agg *agg)
684 {
685         int i;
686
687         for (i = 0; i < 2; i++) {
688                 agg->ios[i]     = le32_to_cpu(agg->ios[i]);
689                 agg->merges[i]  = le32_to_cpu(agg->merges[i]);
690                 agg->sectors[i] = le64_to_cpu(agg->sectors[i]);
691                 agg->ticks[i]   = le32_to_cpu(agg->ticks[i]);
692         }
693
694         agg->io_ticks           = le32_to_cpu(agg->io_ticks);
695         agg->time_in_queue      = le32_to_cpu(agg->time_in_queue);
696         agg->slavecount         = le32_to_cpu(agg->slavecount);
697         agg->max_util.u.f       = fio_uint64_to_double(__le64_to_cpu(agg->max_util.u.i));
698 }
699
700 static void convert_dus(struct disk_util_stat *dus)
701 {
702         int i;
703
704         for (i = 0; i < 2; i++) {
705                 dus->ios[i]     = le32_to_cpu(dus->ios[i]);
706                 dus->merges[i]  = le32_to_cpu(dus->merges[i]);
707                 dus->sectors[i] = le64_to_cpu(dus->sectors[i]);
708                 dus->ticks[i]   = le32_to_cpu(dus->ticks[i]);
709         }
710
711         dus->io_ticks           = le32_to_cpu(dus->io_ticks);
712         dus->time_in_queue      = le32_to_cpu(dus->time_in_queue);
713         dus->msec               = le64_to_cpu(dus->msec);
714 }
715
716 static void handle_du(struct fio_client *client, struct fio_net_cmd *cmd)
717 {
718         struct cmd_du_pdu *du = (struct cmd_du_pdu *) cmd->payload;
719
720         if (!client->disk_stats_shown) {
721                 client->disk_stats_shown = 1;
722                 log_info("\nDisk stats (read/write):\n");
723         }
724
725         print_disk_util(&du->dus, &du->agg, terse_output);
726 }
727
728 static void convert_jobs_eta(struct jobs_eta *je)
729 {
730         int i;
731
732         je->nr_running          = le32_to_cpu(je->nr_running);
733         je->nr_ramp             = le32_to_cpu(je->nr_ramp);
734         je->nr_pending          = le32_to_cpu(je->nr_pending);
735         je->files_open          = le32_to_cpu(je->files_open);
736
737         for (i = 0; i < 2; i++) {
738                 je->m_rate[i]           = le32_to_cpu(je->m_rate[i]);
739                 je->t_rate[i]           = le32_to_cpu(je->t_rate[i]);
740                 je->m_iops[i]           = le32_to_cpu(je->m_iops[i]);
741                 je->t_iops[i]           = le32_to_cpu(je->t_iops[i]);
742                 je->rate[i]     = le32_to_cpu(je->rate[i]);
743                 je->iops[i]     = le32_to_cpu(je->iops[i]);
744         }
745
746         je->elapsed_sec         = le64_to_cpu(je->elapsed_sec);
747         je->eta_sec             = le64_to_cpu(je->eta_sec);
748 }
749
750 void fio_client_sum_jobs_eta(struct jobs_eta *dst, struct jobs_eta *je)
751 {
752         int i;
753
754         dst->nr_running         += je->nr_running;
755         dst->nr_ramp            += je->nr_ramp;
756         dst->nr_pending         += je->nr_pending;
757         dst->files_open         += je->files_open;
758
759         for (i = 0; i < 2; i++) {
760                 dst->m_rate[i]  += je->m_rate[i];
761                 dst->t_rate[i]  += je->t_rate[i];
762                 dst->m_iops[i]  += je->m_iops[i];
763                 dst->t_iops[i]  += je->t_iops[i];
764                 dst->rate[i]    += je->rate[i];
765                 dst->iops[i]    += je->iops[i];
766         }
767
768         dst->elapsed_sec        += je->elapsed_sec;
769
770         if (je->eta_sec > dst->eta_sec)
771                 dst->eta_sec = je->eta_sec;
772 }
773
774 void fio_client_dec_jobs_eta(struct client_eta *eta, client_eta_op eta_fn)
775 {
776         if (!--eta->pending) {
777                 eta_fn(&eta->eta);
778                 free(eta);
779         }
780 }
781
782 static void remove_reply_cmd(struct fio_client *client, struct fio_net_cmd *cmd)
783 {
784         struct fio_net_int_cmd *icmd = NULL;
785         struct flist_head *entry;
786
787         flist_for_each(entry, &client->cmd_list) {
788                 icmd = flist_entry(entry, struct fio_net_int_cmd, list);
789
790                 if (cmd->tag == (uintptr_t) icmd)
791                         break;
792
793                 icmd = NULL;
794         }
795
796         if (!icmd) {
797                 log_err("fio: client: unable to find matching tag\n");
798                 return;
799         }
800
801         flist_del(&icmd->list);
802         cmd->tag = icmd->saved_tag;
803         free(icmd);
804 }
805
806 static void handle_eta(struct fio_client *client, struct fio_net_cmd *cmd)
807 {
808         struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
809         struct client_eta *eta = (struct client_eta *) (uintptr_t) cmd->tag;
810
811         dprint(FD_NET, "client: got eta tag %p, %d\n", eta, eta->pending);
812
813         assert(client->eta_in_flight == eta);
814
815         client->eta_in_flight = NULL;
816         flist_del_init(&client->eta_list);
817
818         fio_client_sum_jobs_eta(&eta->eta, je);
819         fio_client_dec_jobs_eta(eta, client->ops->eta);
820 }
821
822 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd)
823 {
824         struct cmd_probe_pdu *probe = (struct cmd_probe_pdu *) cmd->payload;
825         const char *os, *arch;
826         char bit[16];
827
828         os = fio_get_os_string(probe->os);
829         if (!os)
830                 os = "unknown";
831
832         arch = fio_get_arch_string(probe->arch);
833         if (!arch)
834                 os = "unknown";
835
836         sprintf(bit, "%d-bit", probe->bpp * 8);
837
838         log_info("hostname=%s, be=%u, %s, os=%s, arch=%s, fio=%u.%u.%u\n",
839                 probe->hostname, probe->bigendian, bit, os, arch,
840                 probe->fio_major, probe->fio_minor, probe->fio_patch);
841
842         if (!client->name)
843                 client->name = strdup((char *) probe->hostname);
844 }
845
846 static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd)
847 {
848         struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload;
849
850         client->state = Client_started;
851         client->jobs = le32_to_cpu(pdu->jobs);
852 }
853
854 static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd)
855 {
856         struct cmd_end_pdu *pdu = (struct cmd_end_pdu *) cmd->payload;
857
858         client->state = Client_stopped;
859         client->error = le32_to_cpu(pdu->error);
860
861         if (client->error)
862                 log_info("client <%s>: exited with error %d\n", client->hostname, client->error);
863 }
864
865 static void convert_text(struct fio_net_cmd *cmd)
866 {
867         struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmd->payload;
868
869         pdu->level      = le32_to_cpu(pdu->level);
870         pdu->buf_len    = le32_to_cpu(pdu->buf_len);
871         pdu->log_sec    = le64_to_cpu(pdu->log_sec);
872         pdu->log_usec   = le64_to_cpu(pdu->log_usec);
873 }
874
875 int fio_handle_client(struct fio_client *client)
876 {
877         struct client_ops *ops = client->ops;
878         struct fio_net_cmd *cmd;
879
880         dprint(FD_NET, "client: handle %s\n", client->hostname);
881
882         cmd = fio_net_recv_cmd(client->fd);
883         if (!cmd)
884                 return 0;
885
886         dprint(FD_NET, "client: got cmd op %s from %s\n",
887                                 fio_server_op(cmd->opcode), client->hostname);
888
889         switch (cmd->opcode) {
890         case FIO_NET_CMD_QUIT:
891                 if (ops->quit)
892                         ops->quit(client);
893                 remove_client(client);
894                 free(cmd);
895                 break;
896         case FIO_NET_CMD_TEXT:
897                 convert_text(cmd);
898                 ops->text_op(client, cmd);
899                 free(cmd);
900                 break;
901         case FIO_NET_CMD_DU: {
902                 struct cmd_du_pdu *du = (struct cmd_du_pdu *) cmd->payload;
903
904                 convert_dus(&du->dus);
905                 convert_agg(&du->agg);
906
907                 ops->disk_util(client, cmd);
908                 free(cmd);
909                 break;
910                 }
911         case FIO_NET_CMD_TS: {
912                 struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload;
913
914                 convert_ts(&p->ts, &p->ts);
915                 convert_gs(&p->rs, &p->rs);
916
917                 ops->thread_status(client, cmd);
918                 free(cmd);
919                 break;
920                 }
921         case FIO_NET_CMD_GS: {
922                 struct group_run_stats *gs = (struct group_run_stats *) cmd->payload;
923
924                 convert_gs(gs, gs);
925
926                 ops->group_stats(client, cmd);
927                 free(cmd);
928                 break;
929                 }
930         case FIO_NET_CMD_ETA: {
931                 struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
932
933                 remove_reply_cmd(client, cmd);
934                 convert_jobs_eta(je);
935                 handle_eta(client, cmd);
936                 free(cmd);
937                 break;
938                 }
939         case FIO_NET_CMD_PROBE:
940                 remove_reply_cmd(client, cmd);
941                 ops->probe(client, cmd);
942                 free(cmd);
943                 break;
944         case FIO_NET_CMD_RUN:
945                 client->state = Client_running;
946                 free(cmd);
947                 break;
948         case FIO_NET_CMD_START:
949                 handle_start(client, cmd);
950                 free(cmd);
951                 break;
952         case FIO_NET_CMD_STOP:
953                 handle_stop(client, cmd);
954                 free(cmd);
955                 break;
956         case FIO_NET_CMD_ADD_JOB:
957                 if (ops->add_job)
958                         ops->add_job(client, cmd);
959                 free(cmd);
960                 break;
961         default:
962                 log_err("fio: unknown client op: %s\n", fio_server_op(cmd->opcode));
963                 free(cmd);
964                 break;
965         }
966
967         return 1;
968 }
969
970 static void request_client_etas(struct client_ops *ops)
971 {
972         struct fio_client *client;
973         struct flist_head *entry;
974         struct client_eta *eta;
975         int skipped = 0;
976
977         dprint(FD_NET, "client: request eta (%d)\n", nr_clients);
978
979         eta = malloc(sizeof(*eta));
980         memset(&eta->eta, 0, sizeof(eta->eta));
981         eta->pending = nr_clients;
982
983         flist_for_each(entry, &client_list) {
984                 client = flist_entry(entry, struct fio_client, list);
985
986                 if (!flist_empty(&client->eta_list)) {
987                         skipped++;
988                         continue;
989                 }
990                 if (client->state != Client_running)
991                         continue;
992
993                 assert(!client->eta_in_flight);
994                 flist_add_tail(&client->eta_list, &eta_list);
995                 client->eta_in_flight = eta;
996                 fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_SEND_ETA,
997                                         (uintptr_t) eta, &client->cmd_list);
998         }
999
1000         while (skipped--)
1001                 fio_client_dec_jobs_eta(eta, ops->eta);
1002
1003         dprint(FD_NET, "client: requested eta tag %p\n", eta);
1004 }
1005
1006 static int client_check_cmd_timeout(struct fio_client *client,
1007                                     struct timeval *now)
1008 {
1009         struct fio_net_int_cmd *cmd;
1010         struct flist_head *entry, *tmp;
1011         int ret = 0;
1012
1013         flist_for_each_safe(entry, tmp, &client->cmd_list) {
1014                 cmd = flist_entry(entry, struct fio_net_int_cmd, list);
1015
1016                 if (mtime_since(&cmd->tv, now) < FIO_NET_CLIENT_TIMEOUT)
1017                         continue;
1018
1019                 log_err("fio: client %s, timeout on cmd %s\n", client->hostname,
1020                                                 fio_server_op(cmd->cmd.opcode));
1021                 flist_del(&cmd->list);
1022                 free(cmd);
1023                 ret = 1;
1024         }
1025
1026         return flist_empty(&client->cmd_list) && ret;
1027 }
1028
1029 static int fio_check_clients_timed_out(void)
1030 {
1031         struct fio_client *client;
1032         struct flist_head *entry, *tmp;
1033         struct timeval tv;
1034         int ret = 0;
1035
1036         gettimeofday(&tv, NULL);
1037
1038         flist_for_each_safe(entry, tmp, &client_list) {
1039                 client = flist_entry(entry, struct fio_client, list);
1040
1041                 if (flist_empty(&client->cmd_list))
1042                         continue;
1043
1044                 if (!client_check_cmd_timeout(client, &tv))
1045                         continue;
1046
1047                 if (client->ops->timed_out)
1048                         client->ops->timed_out(client);
1049                 else
1050                         log_err("fio: client %s timed out\n", client->hostname);
1051
1052                 remove_client(client);
1053                 ret = 1;
1054         }
1055
1056         return ret;
1057 }
1058
1059 int fio_handle_clients(struct client_ops *ops)
1060 {
1061         struct pollfd *pfds;
1062         int i, ret = 0, retval = 0;
1063
1064         gettimeofday(&eta_tv, NULL);
1065
1066         pfds = malloc(nr_clients * sizeof(struct pollfd));
1067
1068         sum_stat_clients = nr_clients;
1069         init_thread_stat(&client_ts);
1070         init_group_run_stat(&client_gs);
1071
1072         while (!exit_backend && nr_clients) {
1073                 struct flist_head *entry, *tmp;
1074                 struct fio_client *client;
1075
1076                 i = 0;
1077                 flist_for_each_safe(entry, tmp, &client_list) {
1078                         client = flist_entry(entry, struct fio_client, list);
1079
1080                         if (!client->sent_job && !client->ops->stay_connected &&
1081                             flist_empty(&client->cmd_list)) {
1082                                 remove_client(client);
1083                                 continue;
1084                         }
1085
1086                         pfds[i].fd = client->fd;
1087                         pfds[i].events = POLLIN;
1088                         i++;
1089                 }
1090
1091                 if (!nr_clients)
1092                         break;
1093
1094                 assert(i == nr_clients);
1095
1096                 do {
1097                         struct timeval tv;
1098
1099                         gettimeofday(&tv, NULL);
1100                         if (mtime_since(&eta_tv, &tv) >= 900) {
1101                                 request_client_etas(ops);
1102                                 memcpy(&eta_tv, &tv, sizeof(tv));
1103
1104                                 if (fio_check_clients_timed_out())
1105                                         break;
1106                         }
1107
1108                         ret = poll(pfds, nr_clients, 100);
1109                         if (ret < 0) {
1110                                 if (errno == EINTR)
1111                                         continue;
1112                                 log_err("fio: poll clients: %s\n", strerror(errno));
1113                                 break;
1114                         } else if (!ret)
1115                                 continue;
1116                 } while (ret <= 0);
1117
1118                 for (i = 0; i < nr_clients; i++) {
1119                         if (!(pfds[i].revents & POLLIN))
1120                                 continue;
1121
1122                         client = find_client_by_fd(pfds[i].fd);
1123                         if (!client) {
1124                                 log_err("fio: unknown client fd %d\n", pfds[i].fd);
1125                                 continue;
1126                         }
1127                         if (!fio_handle_client(client)) {
1128                                 log_info("client: host=%s disconnected\n",
1129                                                 client->hostname);
1130                                 remove_client(client);
1131                                 retval = 1;
1132                         } else if (client->error)
1133                                 retval = 1;
1134                         put_client(client);
1135                 }
1136         }
1137
1138         free(pfds);
1139         return retval;
1140 }