Reworked blktrace master/thread interface
[blktrace.git] / blktrace.c
1 /*
2  * block queue tracing application
3  *
4  * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
5  * Copyright (C) 2006 Jens Axboe <axboe@kernel.dk>
6  *
7  * Rewrite to have a single thread per CPU (managing all devices on that CPU)
8  *      Alan D. Brunelle <alan.brunelle@hp.com> - January 2009
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *
20  *  You should have received a copy of the GNU General Public License
21  *  along with this program; if not, write to the Free Software
22  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23  *
24  */
25
26 #include <errno.h>
27 #include <stdarg.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <string.h>
31 #include <fcntl.h>
32 #include <getopt.h>
33 #include <sched.h>
34 #include <unistd.h>
35 #include <poll.h>
36 #include <signal.h>
37 #include <pthread.h>
38 #include <locale.h>
39 #include <sys/ioctl.h>
40 #include <sys/types.h>
41 #include <sys/stat.h>
42 #include <sys/vfs.h>
43 #include <sys/mman.h>
44 #include <sys/param.h>
45 #include <sys/time.h>
46 #include <sys/resource.h>
47 #include <sys/socket.h>
48 #include <netinet/in.h>
49 #include <arpa/inet.h>
50 #include <netdb.h>
51 #include <sys/sendfile.h>
52
53 #include "btt/list.h"
54 #include "blktrace.h"
55
56 /*
57  * You may want to increase this even more, if you are logging at a high
58  * rate and see skipped/missed events
59  */
60 #define BUF_SIZE                (512 * 1024)
61 #define BUF_NR                  (4)
62
63 #define FILE_VBUF_SIZE          (128 * 1024)
64
65 #define DEBUGFS_TYPE            (0x64626720)
66 #define TRACE_NET_PORT          (8462)
67
68 enum {
69         Net_none = 0,
70         Net_server,
71         Net_client,
72 };
73
74 enum thread_status {
75         Th_running,
76         Th_leaving,
77         Th_error
78 };
79
80 /*
81  * Generic stats collected: nevents can be _roughly_ estimated by data_read
82  * (discounting pdu...)
83  *
84  * These fields are updated w/ pdc_dr_update & pdc_nev_update below.
85  */
struct pdc_stats {
        unsigned long long data_read;   /* total bytes read for this dev/cpu */
        unsigned long long nevents;     /* trace events seen (estimated) */
};
90
struct devpath {
        struct list_head head;          /* linkage on the global devpaths list */
        char *path;                     /* path to device special file */
        char *buts_name;                /* name returned from bt kernel code */
        struct pdc_stats *stats;        /* per-CPU stats array, ncpus entries */
        int fd, idx, ncpus;             /* device fd, device index, #CPUs */
        unsigned long long drops;       /* events dropped by the kernel */

        /*
         * For piped output only:
         *
         * Each tracer will have a tracer_devpath_head that it will add new
         * data onto. Its list is protected above (tracer_devpath_head.mutex)
         * and it will signal the processing thread using the dp_cond,
         * dp_mutex & dp_entries variables above.
         */
        struct tracer_devpath_head *heads;

        /*
         * For network server mode only:
         */
        struct cl_host *ch;             /* client host this device belongs to */
        u32 cl_id;                      /* id for the client's connection set */
        time_t cl_connect_time;         /* when the client connected */
        struct io_info *ios;            /* per-CPU input/output state */
};
117
118 /*
119  * For piped output to stdout we will have each tracer thread (one per dev)
120  * tack buffers read from the relay queues on a per-device list.
121  *
122  * The main thread will then collect trace buffers from each of lists in turn.
123  *
124  * We will use a mutex to guard each of the trace_buf list. The tracers
125  * can then signal the main thread using <dp_cond,dp_mutex> and
126  * dp_entries. (When dp_entries is 0, and a tracer adds an entry it will
127  * signal. When dp_entries is 0, the main thread will wait for that condition
128  * to be signalled.)
129  *
130  * adb: It may be better just to have a large buffer per tracer per dev,
131  * and then use it as a ring-buffer. This would certainly cut down a lot
132  * of malloc/free thrashing, at the cost of more memory movements (potentially).
133  */
/* One chunk of relay data read by a tracer, queued for the main thread. */
struct trace_buf {
        struct list_head head;          /* linkage on a tracer_devpath_head */
        struct devpath *dpp;            /* device this data came from */
        void *buf;                      /* payload, co-allocated with struct */
        int cpu, len;                   /* source CPU; valid bytes in buf */
};
140
/* Per-tracer, per-device queue of pending trace buffers. */
struct tracer_devpath_head {
        pthread_mutex_t mutex;          /* guards head (and prev) */
        struct list_head head;          /* queued trace_buf entries */
        struct trace_buf *prev;         /* held-back buffer -- NOTE(review): exact use not visible in this chunk */
};
146
147 /*
148  * Used to handle the mmap() interfaces for output file (containing traces)
149  */
struct mmap_info {
        void *fs_buf;                   /* currently mapped output window */
        unsigned long long fs_size, fs_max_size, fs_off, fs_buf_len;
        unsigned long buf_size, buf_nr; /* relay sub-buffer geometry */
        int pagesize;                   /* system page size, for alignment */
};
156
157 /*
158  * Each thread doing work on a (client) side of blktrace will have one
159  * of these. The ios array contains input/output information, pfds holds
160  * poll() data. The volatile's provide flags to/from the main executing
161  * thread.
162  */
struct tracer {
        struct list_head head;          /* linkage on the global tracers list */
        struct io_info *ios;            /* per-device input/output state */
        struct pollfd *pfds;            /* poll() descriptors for relay fds */
        pthread_t thread;               /* this tracer's thread */
        int cpu, nios;                  /* CPU handled; entries in ios/pfds */
        volatile int status, is_done;   /* thread status; master's stop flag */
};
171
172 /*
173  * networking stuff follows. we include a magic number so we know whether
174  * to endianness convert or not.
175  *
176  * The len field is overloaded:
177  *      0 - Indicates an "open" - allowing the server to set up for a dev/cpu
178  *      1 - Indicates a "close" - Shut down connection orderly
179  *
180  * The cpu field is overloaded on close: it will contain the number of drops.
181  */
struct blktrace_net_hdr {
        u32 magic;              /* same as trace magic */
        char buts_name[32];     /* trace name */
        u32 cpu;                /* for which cpu (overloaded: drops on close) */
        u32 max_cpus;           /* number of CPUs on the client */
        u32 len;                /* length of following trace data */
        u32 cl_id;              /* id for set of client per-cpu connections */
        u32 buf_size;           /* client buf_size for this trace  */
        u32 buf_nr;             /* client buf_nr for this trace  */
        u32 page_size;          /* client page_size for this trace  */
};
193
194 /*
195  * Each host encountered has one of these. The head is used to link this
196  * on to the network server's ch_list. Connections associated with this
197  * host are linked on conn_list, and any devices traced on that host
198  * are connected on the devpaths list.
199  */
struct cl_host {
        struct list_head head;          /* on the server's ch_list */
        struct list_head conn_list;     /* this host's connections */
        struct list_head devpaths;      /* devices traced on this host */
        struct net_server_s *ns;        /* back pointer to the server */
        char *hostname;                 /* client host name */
        struct in_addr cl_in_addr;      /* client's IPv4 address */
        int connects, ndevs, cl_opens;  /* connection/device/open counters */
};
209
210 /*
211  * Each connection (client to server socket ('fd')) has one of these. A
212  * back reference to the host ('ch'), and lists headers (for the host
213  * list, and the network server conn_list) are also included.
214  */
struct cl_conn {
        struct list_head ch_head, ns_head;      /* host / server linkage */
        struct cl_host *ch;                     /* owning host */
        int fd, ncpus;                          /* socket; client CPU count */
        time_t connect_time;                    /* when connection was made */
};
221
222 /*
223  * The network server requires some poll structures to be maintained -
224  * one per conection currently on conn_list. The nchs/ch_list values
225  * are for each host connected to this server. The addr field is used
226  * for scratch as new connections are established.
227  */
struct net_server_s {
        struct list_head conn_list;     /* all live connections */
        struct list_head ch_list;       /* all known client hosts */
        struct pollfd *pfds;            /* one pollfd per connection */
        int listen_fd, connects, nchs;  /* listen socket; counters */
        struct sockaddr_in addr;        /* scratch for new connections */
};
235
236 /*
237  * This structure is (generically) used to providide information
238  * for a read-to-write set of values.
239  *
240  * ifn & ifd represent input information
241  *
242  * ofn, ofd, ofp, obuf & mmap_info are used for output file (optionally).
243  */
struct io_info {
        struct devpath *dpp;    /* device this I/O state belongs to */
        FILE *ofp;              /* buffered output stream */
        char *obuf;             /* stream buffer -- presumably for setvbuf(); verify at use site */
        struct cl_conn *nc;     /* Server network connection */

        /*
         * mmap controlled output files
         */
        struct mmap_info mmap_info;

        /*
         * Client network fields
         */
        unsigned int ready;             /* NOTE(review): semantics not visible in this chunk -- confirm */
        unsigned long long data_queued; /* total bytes queued for sending */

        /*
         * Input/output file descriptors & names
         */
        int ifd, ofd;
        char ifn[MAXPATHLEN + 64];
        char ofn[MAXPATHLEN + 64];
};
268
269 static char blktrace_version[] = "2.0.0";
270
271 /*
272  * Linkage to blktrace helper routines (trace conversions)
273  */
274 int data_is_native = -1;
275
276 static int ncpus;
277 static int pagesize;
278 static int act_mask = ~0U;
279 static char *debugfs_path = "/sys/kernel/debug";
280 static char *output_name;
281 static char *output_dir;
282 static int kill_running_trace;
283 static int stop_watch;
284 static unsigned long buf_size = BUF_SIZE;
285 static unsigned long buf_nr = BUF_NR;
286 static LIST_HEAD(devpaths);
287 static LIST_HEAD(tracers);
288 static int ndevs;
289 static volatile int done;
290 static FILE *pfp;
291 static int piped_output;
292
293 /*
294  * tracer threads add entries, the main thread takes them off and processes
295  * them. These protect the dp_entries variable.
296  */
297 static pthread_cond_t dp_cond = PTHREAD_COND_INITIALIZER;
298 static pthread_mutex_t dp_mutex = PTHREAD_MUTEX_INITIALIZER;
299 static volatile int dp_entries;
300
301 /*
302  * These synchronize master / thread interactions.
303  */
304 static pthread_cond_t mt_cond = PTHREAD_COND_INITIALIZER;
305 static pthread_mutex_t mt_mutex = PTHREAD_MUTEX_INITIALIZER;
306 static volatile int nthreads_running;
307 static volatile int nthreads_leaving;
308 static volatile int nthreads_error;
309 static volatile int tracers_run;
310
311 /*
312  * network cmd line params
313  */
314 static struct sockaddr_in hostname_addr;
315 static char hostname[MAXHOSTNAMELEN];
316 static int net_port = TRACE_NET_PORT;
317 static int net_use_sendfile = 1;
318 static int net_mode;
319 static int *cl_fds;
320
321 static int (*handle_pfds)(struct tracer *, int, int);
322 static int (*handle_list)(struct tracer_devpath_head *, struct list_head *);
323
324 #define S_OPTS  "d:a:A:r:o:kw:vVb:n:D:lh:p:sI:"
325 static struct option l_opts[] = {
326         {
327                 .name = "dev",
328                 .has_arg = required_argument,
329                 .flag = NULL,
330                 .val = 'd'
331         },
332         {
333                 .name = "input-devs",
334                 .has_arg = required_argument,
335                 .flag = NULL,
336                 .val = 'I'
337         },
338         {
339                 .name = "act-mask",
340                 .has_arg = required_argument,
341                 .flag = NULL,
342                 .val = 'a'
343         },
344         {
345                 .name = "set-mask",
346                 .has_arg = required_argument,
347                 .flag = NULL,
348                 .val = 'A'
349         },
350         {
351                 .name = "relay",
352                 .has_arg = required_argument,
353                 .flag = NULL,
354                 .val = 'r'
355         },
356         {
357                 .name = "output",
358                 .has_arg = required_argument,
359                 .flag = NULL,
360                 .val = 'o'
361         },
362         {
363                 .name = "kill",
364                 .has_arg = no_argument,
365                 .flag = NULL,
366                 .val = 'k'
367         },
368         {
369                 .name = "stopwatch",
370                 .has_arg = required_argument,
371                 .flag = NULL,
372                 .val = 'w'
373         },
374         {
375                 .name = "version",
376                 .has_arg = no_argument,
377                 .flag = NULL,
378                 .val = 'v'
379         },
380         {
381                 .name = "version",
382                 .has_arg = no_argument,
383                 .flag = NULL,
384                 .val = 'V'
385         },
386         {
387                 .name = "buffer-size",
388                 .has_arg = required_argument,
389                 .flag = NULL,
390                 .val = 'b'
391         },
392         {
393                 .name = "num-sub-buffers",
394                 .has_arg = required_argument,
395                 .flag = NULL,
396                 .val = 'n'
397         },
398         {
399                 .name = "output-dir",
400                 .has_arg = required_argument,
401                 .flag = NULL,
402                 .val = 'D'
403         },
404         {
405                 .name = "listen",
406                 .has_arg = no_argument,
407                 .flag = NULL,
408                 .val = 'l'
409         },
410         {
411                 .name = "host",
412                 .has_arg = required_argument,
413                 .flag = NULL,
414                 .val = 'h'
415         },
416         {
417                 .name = "port",
418                 .has_arg = required_argument,
419                 .flag = NULL,
420                 .val = 'p'
421         },
422         {
423                 .name = "no-sendfile",
424                 .has_arg = no_argument,
425                 .flag = NULL,
426                 .val = 's'
427         },
428         {
429                 .name = NULL,
430         }
431 };
432
433 static char usage_str[] = \
434         "-d <dev> [ -r debugfs path ] [ -o <output> ] [-k ] [ -w time ]\n" \
435         "[ -a action ] [ -A action mask ] [ -I  <devs file> ] [ -v ]\n\n" \
436         "\t-d Use specified device. May also be given last after options\n" \
437         "\t-r Path to mounted debugfs, defaults to /sys/kernel/debug\n" \
438         "\t-o File(s) to send output to\n" \
439         "\t-D Directory to prepend to output file names\n" \
440         "\t-k Kill a running trace\n" \
441         "\t-w Stop after defined time, in seconds\n" \
442         "\t-a Only trace specified actions. See documentation\n" \
443         "\t-A Give trace mask as a single value. See documentation\n" \
444         "\t-b Sub buffer size in KiB\n" \
445         "\t-n Number of sub buffers\n" \
446         "\t-l Run in network listen mode (blktrace server)\n" \
447         "\t-h Run in network client mode, connecting to the given host\n" \
448         "\t-p Network port to use (default 8462)\n" \
449         "\t-s Make the network client NOT use sendfile() to transfer data\n" \
450         "\t-I Add devices found in <devs file>\n" \
451         "\t-V Print program version info\n\n";
452
/* Reset both the requested and returned event masks of a pollfd. */
static void clear_events(struct pollfd *pfd)
{
        pfd->revents = 0;
        pfd->events = 0;
}
458
459 static inline int net_client_use_sendfile(void)
460 {
461         return net_mode == Net_client && net_use_sendfile;
462 }
463
464 static inline int net_client_use_send(void)
465 {
466         return net_mode == Net_client && !net_use_sendfile;
467 }
468
469 static inline int use_tracer_devpaths(void)
470 {
471         return piped_output || net_client_use_send();
472 }
473
/* Two IPv4 addresses are equal iff their raw s_addr values match. */
static inline int in_addr_eq(struct in_addr a, struct in_addr b)
{
        return !(a.s_addr ^ b.s_addr);
}
478
479 static inline void pdc_dr_update(struct devpath *dpp, int cpu, int data_read)
480 {
481         dpp->stats[cpu].data_read += data_read;
482 }
483
484 static inline void pdc_nev_update(struct devpath *dpp, int cpu, int nevents)
485 {
486         dpp->stats[cpu].nevents += nevents;
487 }
488
489 static void show_usage(char *prog)
490 {
491         fprintf(stderr, "Usage: %s %s %s", prog, blktrace_version, usage_str);
492 }
493
/*
 * Create a timespec 'msec' milliseconds into the future
 */
static inline void make_timespec(struct timespec *tsp, long delta_msec)
{
        struct timeval now;

        gettimeofday(&now, NULL);
        tsp->tv_sec = now.tv_sec;
        tsp->tv_nsec = 1000L * now.tv_usec;

        tsp->tv_nsec += (delta_msec * 1000000L);
        /*
         * tv_nsec must end up in [0, 999999999]. Use >= (the old test
         * used >) so an exact one-second overflow is carried too.
         */
        if (tsp->tv_nsec >= 1000000000L) {
                long secs = tsp->tv_nsec / 1000000000L;

                tsp->tv_sec += secs;
                tsp->tv_nsec -= (secs * 1000000000L);
        }
}
513
/*
 * Condition wait bounded to ~50ms, so callers' while-loops re-check
 * their predicate even if a wakeup is missed.
 */
static void t_pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
{
        struct timespec abstime;

        make_timespec(&abstime, 50);
        (void) pthread_cond_timedwait(cond, mutex, &abstime);
}
524
/*
 * Master -> tracers: allow all tracer threads parked in
 * tracer_wait_unblock() to start their main loop.
 */
static void unblock_tracers(void)
{
        pthread_mutex_lock(&mt_mutex);
        tracers_run = 1;
        pthread_cond_broadcast(&mt_cond);
        pthread_mutex_unlock(&mt_mutex);
}
532
/*
 * Tracer side: block until the master sets tracers_run, or until this
 * thread is told to exit via tp->is_done.
 */
static void tracer_wait_unblock(struct tracer *tp)
{
        pthread_mutex_lock(&mt_mutex);
        while (!tp->is_done && !tracers_run)
                pthread_cond_wait(&mt_cond, &mt_mutex);
        pthread_mutex_unlock(&mt_mutex);
}
540
/*
 * Tracer -> master: record this thread's state transition (running,
 * error or leaving) in the shared counters and wake the master, which
 * waits on those counters in wait_tracers_ready()/wait_tracers_leaving().
 */
static void tracer_signal_ready(struct tracer *tp,
                                enum thread_status th_status,
                                int status)
{
        pthread_mutex_lock(&mt_mutex);
        tp->status = status;    /* per-thread status code for the master */

        if (th_status == Th_running)
                nthreads_running++;
        else if (th_status == Th_error)
                nthreads_error++;
        else
                nthreads_leaving++;

        pthread_cond_signal(&mt_cond);
        pthread_mutex_unlock(&mt_mutex);
}
558
/*
 * Master: wait until every started tracer has checked in as either
 * running or failed. The timed wait guards against a lost wakeup.
 */
static void wait_tracers_ready(int ncpus_started)
{
        pthread_mutex_lock(&mt_mutex);
        while ((nthreads_running + nthreads_error) < ncpus_started)
                t_pthread_cond_wait(&mt_cond, &mt_mutex);
        pthread_mutex_unlock(&mt_mutex);
}
566
/*
 * Master: wait until every running tracer has signalled that it is
 * leaving. The timed wait guards against a lost wakeup.
 */
static void wait_tracers_leaving(void)
{
        pthread_mutex_lock(&mt_mutex);
        while (nthreads_leaving < nthreads_running)
                t_pthread_cond_wait(&mt_cond, &mt_mutex);
        pthread_mutex_unlock(&mt_mutex);
}
574
575 static void init_mmap_info(struct mmap_info *mip)
576 {
577         mip->buf_size = buf_size;
578         mip->buf_nr = buf_nr;
579         mip->pagesize = pagesize;
580 }
581
/* Orderly shutdown of a socket; marks the slot unused with -1. */
static void net_close_connection(int *fd)
{
        int sock = *fd;

        *fd = -1;
        shutdown(sock, SHUT_RDWR);
        close(sock);
}
588
589 static void dpp_free(struct devpath *dpp)
590 {
591         if (dpp->stats)
592                 free(dpp->stats);
593         if (dpp->ios)
594                 free(dpp->ios);
595         if (dpp->path)
596                 free(dpp->path);
597         if (dpp->buts_name)
598                 free(dpp->buts_name);
599         free(dpp);
600 }
601
/*
 * Pin the calling process onto the given CPU.
 * Returns 0 on success, the errno value on failure.
 */
static int lock_on_cpu(int cpu)
{
        cpu_set_t mask;

        CPU_ZERO(&mask);
        CPU_SET(cpu, &mask);

        return sched_setaffinity(getpid(), sizeof(mask), &mask) < 0 ? errno : 0;
}
613
/*
 * Bump the soft limit for 'resource'; if that meets or exceeds the hard
 * limit, push the hard limit up as well. Returns 1 when the new limits
 * were installed, 0 otherwise (errno is restored on failure so callers'
 * retry loops see the original error).
 */
static int increase_limit(int resource, rlim_t increase)
{
        struct rlimit rlim;
        int save_errno = errno;

        if (getrlimit(resource, &rlim) == 0) {
                rlim.rlim_cur += increase;
                if (rlim.rlim_cur >= rlim.rlim_max)
                        rlim.rlim_max = rlim.rlim_cur + increase;

                if (setrlimit(resource, &rlim) == 0)
                        return 1;
        }

        errno = save_errno;
        return 0;
}
631
/* Out of file descriptors? Raise RLIMIT_NOFILE and ask caller to retry. */
static int handle_open_failure(void)
{
        switch (errno) {
        case ENFILE:
        case EMFILE:
                return increase_limit(RLIMIT_NOFILE, 16);
        default:
                return 0;
        }
}
638
639 static int handle_mem_failure(size_t length)
640 {
641         if (errno == ENFILE)
642                 return handle_open_failure();
643         else if (errno == ENOMEM)
644                 return increase_limit(RLIMIT_MEMLOCK, 2 * length);
645         return 0;
646 }
647
/* fopen() that raises the fd limit and retries on descriptor exhaustion. */
static FILE *my_fopen(const char *path, const char *mode)
{
        FILE *fp = fopen(path, mode);

        while (fp == NULL && handle_open_failure())
                fp = fopen(path, mode);

        return fp;
}
658
/* open() that raises the fd limit and retries on descriptor exhaustion. */
static int my_open(const char *path, int flags)
{
        int fd = open(path, flags);

        while (fd < 0 && handle_open_failure())
                fd = open(path, flags);

        return fd;
}
669
/* socket() that raises the fd limit and retries on descriptor exhaustion. */
static int my_socket(int domain, int type, int protocol)
{
        int fd = socket(domain, type, protocol);

        while (fd < 0 && handle_open_failure())
                fd = socket(domain, type, protocol);

        return fd;
}
680
/* accept() that raises the fd limit and retries on descriptor exhaustion. */
static int my_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)
{
        int fd = accept(sockfd, addr, addrlen);

        while (fd < 0 && handle_open_failure())
                fd = accept(sockfd, addr, addrlen);

        return fd;
}
691
/* mmap() that bumps the relevant rlimit and retries on ENOMEM/ENFILE. */
static void *my_mmap(void *addr, size_t length, int prot, int flags, int fd,
                     off_t offset)
{
        void *new = mmap(addr, length, prot, flags, fd, offset);

        while (new == MAP_FAILED && handle_mem_failure(length))
                new = mmap(addr, length, prot, flags, fd, offset);

        return new;
}
703
/* mlock() that bumps RLIMIT_MEMLOCK and retries on limit failures. */
static int my_mlock(const void *addr, size_t len)
{
        int ret = mlock(addr, len);

        while (ret < 0 && handle_mem_failure(len))
                ret = mlock(addr, len);

        return ret;
}
714
/*
 * Stop and tear down the kernel-side trace behind 'fd'.
 * Returns the BLKTRACETEARDOWN ioctl result.
 */
static int __stop_trace(int fd)
{
        /*
         * Should be stopped, don't complain if it isn't
         */
        ioctl(fd, BLKTRACESTOP);
        return ioctl(fd, BLKTRACETEARDOWN);
}
723
/*
 * Write 'len' bytes to the global output stream (pfp).
 * Returns 0 on success, 1 on failure (stream error state cleared).
 */
static int write_data(char *buf, int len)
{
        int ret;

rewrite:
        ret = fwrite(buf, len, 1, pfp);
        if (ferror(pfp) || ret != 1) {
                /* An interrupting signal just means: clear and retry. */
                if (errno == EINTR) {
                        clearerr(pfp);
                        goto rewrite;
                }

                /*
                 * For piped output, EPIPE/EBADF only mean the consumer
                 * went away -- don't spam stderr for that.
                 */
                if (!piped_output || (errno != EPIPE && errno != EBADF)) {
                        fprintf(stderr, "write(%d) failed: %d/%s\n",
                                len, errno, strerror(errno));
                }
                goto err;
        }

        fflush(pfp);
        return 0;

err:
        clearerr(pfp);
        return 1;
}
750
751 /*
752  * Returns the number of bytes read (successfully)
753  */
754 static int __net_recv_data(int fd, void *buf, unsigned int len)
755 {
756         unsigned int bytes_left = len;
757
758         while (bytes_left && !done) {
759                 int ret = recv(fd, buf, bytes_left, MSG_WAITALL);
760
761                 if (ret == 0)
762                         break;
763                 else if (ret < 0) {
764                         if (errno != EAGAIN) {
765                                 perror("server: net_recv_data: recv failed");
766                                 break;
767                         } else
768                                 break;
769                 } else {
770                         buf += ret;
771                         bytes_left -= ret;
772                 }
773         }
774
775         return len - bytes_left;
776 }
777
/*
 * Public receive wrapper; returns the number of bytes read successfully.
 */
static int net_recv_data(int fd, void *buf, unsigned int len)
{
        return __net_recv_data(fd, buf, len);
}
782
/*
 * Returns number of bytes written
 */
static int net_send_data(int fd, void *buf, unsigned int buf_len)
{
        int ret;
        unsigned char *p = buf;         /* 'void *' arithmetic is non-standard */
        unsigned int bytes_left = buf_len;

        while (bytes_left) {
                ret = send(fd, p, bytes_left, 0);
                if (ret < 0) {
                        perror("send");
                        break;
                }

                p += ret;
                bytes_left -= ret;
        }

        return buf_len - bytes_left;
}
804
805 static int net_send_header(int fd, int cpu, char *buts_name, int len)
806 {
807         struct blktrace_net_hdr hdr;
808
809         memset(&hdr, 0, sizeof(hdr));
810
811         hdr.magic = BLK_IO_TRACE_MAGIC;
812         strncpy(hdr.buts_name, buts_name, sizeof(hdr.buts_name));
813         hdr.buts_name[sizeof(hdr.buts_name)-1] = '\0';
814         hdr.cpu = cpu;
815         hdr.max_cpus = ncpus;
816         hdr.len = len;
817         hdr.cl_id = getpid();
818         hdr.buf_size = buf_size;
819         hdr.buf_nr = buf_nr;
820         hdr.page_size = pagesize;
821
822         return net_send_data(fd, &hdr, sizeof(hdr)) != sizeof(hdr);
823 }
824
825 static void net_send_open_close(int fd, int cpu, char *buts_name, int len)
826 {
827         struct blktrace_net_hdr ret_hdr;
828
829         net_send_header(fd, cpu, buts_name, len);
830         net_recv_data(fd, &ret_hdr, sizeof(ret_hdr));
831 }
832
/* len == 0 marks an "open" in the overloaded header protocol. */
static void net_send_open(int fd, int cpu, char *buts_name)
{
        net_send_open_close(fd, cpu, buts_name, 0);
}
837
/*
 * Tell the server this trace is finished (len == 1 marks a "close").
 */
static void net_send_close(int fd, char *buts_name, int drops)
{
        /*
         * Overload CPU w/ number of drops
         *
         * XXX: Need to clear/set done around call - done=1 (which
         * is true here) stops reads from happening... :-(
         */
        done = 0;
        net_send_open_close(fd, drops, buts_name, 1);
        done = 1;
}
850
/* Server side: len == 2 acknowledges a prior open/close request. */
static void ack_open_close(int fd, char *buts_name)
{
        net_send_header(fd, 0, buts_name, 2);
}
855
/*
 * Client shutdown: report each traced device's drop count to the server.
 */
static void net_send_drops(int fd)
{
        struct list_head *p;

        __list_for_each(p, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);

                net_send_close(fd, dpp->buts_name, dpp->drops);
        }
}
866
867 /*
868  * Returns:
869  *       0: "EOF"
870  *       1: OK
871  *      -1: Error
872  */
873 static int net_get_header(struct cl_conn *nc, struct blktrace_net_hdr *bnh)
874 {
875         int bytes_read;
876         int fl = fcntl(nc->fd, F_GETFL);
877
878         fcntl(nc->fd, F_SETFL, fl | O_NONBLOCK);
879         bytes_read = __net_recv_data(nc->fd, bnh, sizeof(*bnh));
880         fcntl(nc->fd, F_SETFL, fl & ~O_NONBLOCK);
881
882         if (bytes_read == sizeof(*bnh))
883                 return 1;
884         else if (bytes_read == 0)
885                 return 0;
886         return -1;
887 }
888
889 static int net_setup_addr(void)
890 {
891         struct sockaddr_in *addr = &hostname_addr;
892
893         memset(addr, 0, sizeof(*addr));
894         addr->sin_family = AF_INET;
895         addr->sin_port = htons(net_port);
896
897         if (inet_aton(hostname, &addr->sin_addr) != 1) {
898                 struct hostent *hent;
899 retry:
900                 hent = gethostbyname(hostname);
901                 if (!hent) {
902                         if (h_errno == TRY_AGAIN) {
903                                 usleep(100);
904                                 goto retry;
905                         } else if (h_errno == NO_RECOVERY) {
906                                 fprintf(stderr, "gethostbyname(%s)"
907                                         "non-recoverable error encountered\n",
908                                         hostname);
909                         } else {
910                                 /*
911                                  * HOST_NOT_FOUND, NO_ADDRESS or NO_DATA
912                                  */
913                                 fprintf(stderr, "Host %s not found\n",
914                                         hostname);
915                         }
916                         return 1;
917                 }
918
919                 memcpy(&addr->sin_addr, hent->h_addr, 4);
920                 strcpy(hostname, hent->h_name);
921         }
922
923         return 0;
924 }
925
926 static int net_setup_client(void)
927 {
928         int fd;
929         struct sockaddr_in *addr = &hostname_addr;
930
931         fd = my_socket(AF_INET, SOCK_STREAM, 0);
932         if (fd < 0) {
933                 perror("client: socket");
934                 return -1;
935         }
936
937         if (connect(fd, (struct sockaddr *)addr, sizeof(*addr)) < 0) {
938                 if (errno == ECONNREFUSED)
939                         fprintf(stderr,
940                                 "\nclient: Connection to %s refused, "
941                                 "perhaps the server is not started?\n\n",
942                                 hostname);
943                 else
944                         perror("client: connect");
945                 close(fd);
946                 return -1;
947         }
948
949         return fd;
950 }
951
952 static int open_client_connections(void)
953 {
954         int cpu;
955
956         cl_fds = calloc(ncpus, sizeof(*cl_fds));
957         for (cpu = 0; cpu < ncpus; cpu++) {
958                 cl_fds[cpu] = net_setup_client();
959                 if (cl_fds[cpu] < 0)
960                         goto err;
961         }
962         return 0;
963
964 err:
965         while (cpu > 0)
966                 close(cl_fds[cpu--]);
967         free(cl_fds);
968         return 1;
969 }
970
971 static void close_client_connections(void)
972 {
973         if (cl_fds) {
974                 int cpu, *fdp;
975
976                 for (cpu = 0, fdp = cl_fds; cpu < ncpus; cpu++, fdp++) {
977                         if (*fdp >= 0) {
978                                 net_send_drops(*fdp);
979                                 net_close_connection(fdp);
980                         }
981                 }
982                 free(cl_fds);
983         }
984 }
985
986 static void setup_buts(void)
987 {
988         struct list_head *p;
989
990         __list_for_each(p, &devpaths) {
991                 struct blk_user_trace_setup buts;
992                 struct devpath *dpp = list_entry(p, struct devpath, head);
993
994                 memset(&buts, 0, sizeof(buts));
995                 buts.buf_size = buf_size;
996                 buts.buf_nr = buf_nr;
997                 buts.act_mask = act_mask;
998
999                 if (ioctl(dpp->fd, BLKTRACESETUP, &buts) < 0) {
1000                         fprintf(stderr, "BLKTRACESETUP(2) %s failed: %d/%s\n",
1001                                 dpp->path, errno, strerror(errno));
1002                 }
1003                 else {
1004                         dpp->ncpus = ncpus;
1005                         dpp->buts_name = strdup(buts.name);
1006                         if (dpp->stats)
1007                                 free(dpp->stats);
1008                         dpp->stats = calloc(dpp->ncpus, sizeof(*dpp->stats));
1009                         memset(dpp->stats, 0, dpp->ncpus * sizeof(*dpp->stats));
1010                 }
1011         }
1012 }
1013
1014 static void start_buts(void)
1015 {
1016         struct list_head *p;
1017
1018         __list_for_each(p, &devpaths) {
1019                 struct devpath *dpp = list_entry(p, struct devpath, head);
1020
1021                 if (ioctl(dpp->fd, BLKTRACESTART) < 0) {
1022                         fprintf(stderr, "BLKTRACESTART %s failed: %d/%s\n",
1023                                 dpp->path, errno, strerror(errno));
1024                 }
1025         }
1026 }
1027
1028 static int get_drops(struct devpath *dpp)
1029 {
1030         int fd, drops = 0;
1031         char fn[MAXPATHLEN + 64], tmp[256];
1032
1033         snprintf(fn, sizeof(fn), "%s/block/%s/dropped", debugfs_path,
1034                  dpp->buts_name);
1035
1036         fd = my_open(fn, O_RDONLY);
1037         if (fd < 0) {
1038                 /*
1039                  * This may be ok: the kernel may not support
1040                  * dropped counts.
1041                  */
1042                 if (errno != ENOENT)
1043                         fprintf(stderr, "Could not open %s: %d/%s\n",
1044                                 fn, errno, strerror(errno));
1045                 return 0;
1046         } else if (read(fd, tmp, sizeof(tmp)) < 0) {
1047                 fprintf(stderr, "Could not read %s: %d/%s\n",
1048                         fn, errno, strerror(errno));
1049         } else
1050                 drops = atoi(tmp);
1051         close(fd);
1052
1053         return drops;
1054 }
1055
1056 static void get_all_drops(void)
1057 {
1058         struct list_head *p;
1059
1060         __list_for_each(p, &devpaths) {
1061                 struct devpath *dpp = list_entry(p, struct devpath, head);
1062                 dpp->drops = get_drops(dpp);
1063         }
1064 }
1065
1066 static inline struct trace_buf *alloc_trace_buf(int cpu, int bufsize)
1067 {
1068         struct trace_buf *tbp;
1069
1070         tbp = malloc(sizeof(*tbp) + bufsize);
1071         INIT_LIST_HEAD(&tbp->head);
1072         tbp->len = 0;
1073         tbp->buf = (void *)(tbp + 1);
1074         tbp->cpu = cpu;
1075         tbp->dpp = NULL;        /* Will be set when tbp is added */
1076
1077         return tbp;
1078 }
1079
1080 static void free_tracer_heads(struct devpath *dpp)
1081 {
1082         int cpu;
1083         struct tracer_devpath_head *hd;
1084
1085         for (cpu = 0, hd = dpp->heads; cpu < ncpus; cpu++, hd++) {
1086                 if (hd->prev)
1087                         free(hd->prev);
1088                 pthread_mutex_destroy(&hd->mutex);
1089         }
1090         free(dpp->heads);
1091 }
1092
1093 static int setup_tracer_devpaths(void)
1094 {
1095         struct list_head *p;
1096
1097         if (net_client_use_send())
1098                 if (open_client_connections())
1099                         return 1;
1100
1101         __list_for_each(p, &devpaths) {
1102                 int cpu;
1103                 struct tracer_devpath_head *hd;
1104                 struct devpath *dpp = list_entry(p, struct devpath, head);
1105
1106                 dpp->heads = calloc(ncpus, sizeof(struct tracer_devpath_head));
1107                 for (cpu = 0, hd = dpp->heads; cpu < ncpus; cpu++, hd++) {
1108                         INIT_LIST_HEAD(&hd->head);
1109                         pthread_mutex_init(&hd->mutex, NULL);
1110                         hd->prev = NULL;
1111                 }
1112         }
1113
1114         return 0;
1115 }
1116
1117 static inline void add_trace_buf(struct devpath *dpp, int cpu,
1118                                                 struct trace_buf **tbpp)
1119 {
1120         struct trace_buf *tbp = *tbpp;
1121         struct tracer_devpath_head *hd = &dpp->heads[cpu];
1122
1123         tbp->dpp = dpp;
1124
1125         pthread_mutex_lock(&hd->mutex);
1126         list_add_tail(&tbp->head, &hd->head);
1127         pthread_mutex_unlock(&hd->mutex);
1128
1129         *tbpp = alloc_trace_buf(cpu, buf_size);
1130 }
1131
1132 static inline void incr_entries(int entries_handled)
1133 {
1134         pthread_mutex_lock(&dp_mutex);
1135         if (dp_entries == 0)
1136                 pthread_cond_signal(&dp_cond);
1137         dp_entries += entries_handled;
1138         pthread_mutex_unlock(&dp_mutex);
1139 }
1140
1141 static int add_devpath(char *path)
1142 {
1143         int fd;
1144         struct devpath *dpp;
1145
1146         /*
1147          * Verify device is valid before going too far
1148          */
1149         fd = my_open(path, O_RDONLY | O_NONBLOCK);
1150         if (fd < 0) {
1151                 fprintf(stderr, "Invalid path %s specified: %d/%s\n",
1152                         path, errno, strerror(errno));
1153                 return 1;
1154         }
1155
1156         dpp = malloc(sizeof(*dpp));
1157         memset(dpp, 0, sizeof(*dpp));
1158         dpp->path = strdup(path);
1159         dpp->fd = fd;
1160         dpp->idx = ndevs++;
1161         list_add_tail(&dpp->head, &devpaths);
1162
1163         return 0;
1164 }
1165
1166 static void rel_devpaths(void)
1167 {
1168         struct list_head *p, *q;
1169
1170         list_for_each_safe(p, q, &devpaths) {
1171                 struct devpath *dpp = list_entry(p, struct devpath, head);
1172
1173                 list_del(&dpp->head);
1174                 __stop_trace(dpp->fd);
1175                 close(dpp->fd);
1176
1177                 if (dpp->heads)
1178                         free_tracer_heads(dpp);
1179
1180                 dpp_free(dpp);
1181                 ndevs--;
1182         }
1183 }
1184
1185 static int flush_subbuf_net(struct trace_buf *tbp)
1186 {
1187         int fd = cl_fds[tbp->cpu];
1188         struct devpath *dpp = tbp->dpp;
1189
1190         if (net_send_header(fd, tbp->cpu, dpp->buts_name, tbp->len))
1191                 return 1;
1192
1193         if (net_send_data(fd, tbp->buf, tbp->len) != tbp->len)
1194                 return 1;
1195
1196         return 0;
1197 }
1198
1199 static int
1200 handle_list_net(__attribute__((__unused__))struct tracer_devpath_head *hd,
1201                 struct list_head *list)
1202 {
1203         struct trace_buf *tbp;
1204         struct list_head *p, *q;
1205         int entries_handled = 0;
1206
1207         list_for_each_safe(p, q, list) {
1208                 tbp = list_entry(p, struct trace_buf, head);
1209
1210                 list_del(&tbp->head);
1211                 entries_handled++;
1212
1213                 if (cl_fds[tbp->cpu] >= 0) {
1214                         if (flush_subbuf_net(tbp)) {
1215                                 close(cl_fds[tbp->cpu]);
1216                                 cl_fds[tbp->cpu] = -1;
1217                         }
1218                 }
1219
1220                 free(tbp);
1221         }
1222
1223         return entries_handled;
1224 }
1225
1226 static int handle_list_file(struct tracer_devpath_head *hd,
1227                             struct list_head *list)
1228 {
1229         int off, t_len, nevents;
1230         struct blk_io_trace *t;
1231         struct list_head *p, *q;
1232         int entries_handled = 0;
1233         struct trace_buf *tbp, *prev;
1234
1235         prev = hd->prev;
1236         list_for_each_safe(p, q, list) {
1237                 tbp = list_entry(p, struct trace_buf, head);
1238                 list_del(&tbp->head);
1239                 entries_handled++;
1240
1241                 /*
1242                  * If there was some leftover before, tack this new
1243                  * entry onto the tail of the previous one.
1244                  */
1245                 if (prev) {
1246                         unsigned long tot_len;
1247                         struct trace_buf *tmp = tbp;
1248
1249                         tbp = prev;
1250                         prev = NULL;
1251
1252                         tot_len = tbp->len + tmp->len;
1253                         if (tot_len > buf_size) {
1254                                 /*
1255                                  * tbp->head isn't connected (it was 'prev'
1256                                  * so it had been taken off of the list
1257                                  * before). Therefore, we can realloc
1258                                  * the whole structures, as the other fields
1259                                  * are "static".
1260                                  */
1261                                 tbp = realloc(tbp->buf, sizeof(*tbp) + tot_len);
1262                                 tbp->buf = (void *)(tbp + 1);
1263                         }
1264
1265                         memcpy(tbp->buf + tbp->len, tmp->buf, tmp->len);
1266                         tbp->len = tot_len;
1267
1268                         free(tmp);
1269                 }
1270
1271                 /*
1272                  * See how many whole traces there are - send them
1273                  * all out in one go.
1274                  */
1275                 off = 0;
1276                 nevents = 0;
1277                 while (off + (int)sizeof(*t) <= tbp->len) {
1278                         t = (struct blk_io_trace *)(tbp->buf + off);
1279                         t_len = sizeof(*t) + t->pdu_len;
1280                         if (off + t_len > tbp->len)
1281                                 break;
1282
1283                         off += t_len;
1284                         nevents++;
1285                 }
1286                 if (nevents)
1287                         pdc_nev_update(tbp->dpp, tbp->cpu, nevents);
1288
1289                 /*
1290                  * Write any full set of traces, any remaining data is kept
1291                  * for the next pass.
1292                  */
1293                 if (off) {
1294                         if (write_data(tbp->buf, off) || off == tbp->len)
1295                                 free(tbp);
1296                         else {
1297                                 /*
1298                                  * Move valid data to beginning of buffer
1299                                  */
1300                                 tbp->len -= off;
1301                                 memmove(tbp->buf, tbp->buf + off, tbp->len);
1302                                 prev = tbp;
1303                         }
1304                 } else
1305                         prev = tbp;
1306         }
1307         hd->prev = prev;
1308
1309         return entries_handled;
1310 }
1311
/*
 * One pass over every device's per-CPU trace buf lists: detach each
 * non-empty list under its mutex, then process the stolen entries
 * outside the lock. Finishes by decrementing the global dp_entries
 * count by the number of bufs handled.
 */
static void __process_trace_bufs(void)
{
	int cpu;
	struct list_head *p;
	struct list_head list;
	int handled = 0;

	__list_for_each(p, &devpaths) {
		struct devpath *dpp = list_entry(p, struct devpath, head);
		struct tracer_devpath_head *hd = dpp->heads;

		for (cpu = 0; cpu < ncpus; cpu++, hd++) {
			pthread_mutex_lock(&hd->mutex);
			if (list_empty(&hd->head)) {
				pthread_mutex_unlock(&hd->mutex);
				continue;
			}

			/*
			 * Steal the whole list so the tracer thread is
			 * only briefly blocked on hd->mutex.
			 */
			list_replace_init(&hd->head, &list);
			pthread_mutex_unlock(&hd->mutex);

			/* handle_list runs without hd->mutex held */
			handled += handle_list(hd, &list);
		}
	}

	if (handled) {
		pthread_mutex_lock(&dp_mutex);
		dp_entries -= handled;
		pthread_mutex_unlock(&dp_mutex);
	}
}
1343
/*
 * Main loop for the trace buf processing context: sleep on dp_cond
 * until tracer threads queue entries (dp_entries != 0) or shutdown is
 * flagged (done), then drain everything queued so far.
 */
static void process_trace_bufs(void)
{
	while (!done) {
		pthread_mutex_lock(&dp_mutex);
		while (!done && dp_entries == 0)
			t_pthread_cond_wait(&dp_cond, &dp_mutex);
		pthread_mutex_unlock(&dp_mutex);

		__process_trace_bufs();
	}
}
1355
/*
 * Final drain after all tracer threads have exited: keep processing
 * until every queued trace buf has been consumed.
 */
static void clean_trace_bufs(void)
{
	/*
	 * No mutex needed here: we're only reading from the lists,
	 * tracers are done
	 */
	while (dp_entries)
		__process_trace_bufs();
}
1365
/*
 * Report a failed relay-file read; EAGAIN is expected with
 * non-blocking fds and stays silent.
 */
static inline void read_err(int cpu, char *ifn)
{
	if (errno == EAGAIN)
		return;

	fprintf(stderr, "Thread %d failed read of %s: %d/%s\n",
		cpu, ifn, errno, strerror(errno));
}
1372
1373 static int net_sendfile(struct io_info *iop)
1374 {
1375         int ret;
1376
1377         ret = sendfile(iop->ofd, iop->ifd, NULL, iop->ready);
1378         if (ret < 0) {
1379                 perror("sendfile");
1380                 return 1;
1381         } else if (ret < (int)iop->ready) {
1382                 fprintf(stderr, "short sendfile send (%d of %d)\n",
1383                         ret, iop->ready);
1384                 return 1;
1385         }
1386
1387         return 0;
1388 }
1389
1390 static inline int net_sendfile_data(struct tracer *tp, struct io_info *iop)
1391 {
1392         struct devpath *dpp = iop->dpp;
1393
1394         if (net_send_header(iop->ofd, tp->cpu, dpp->buts_name, iop->ready))
1395                 return 1;
1396         return net_sendfile(iop);
1397 }
1398
1399 static int handle_pfds_netclient(struct tracer *tp, int nevs, int force_read)
1400 {
1401         struct stat sb;
1402         int i, nentries = 0;
1403         struct pdc_stats *sp;
1404         struct pollfd *pfd = tp->pfds;
1405         struct io_info *iop = tp->ios;
1406
1407         for (i = 0; nevs > 0 && i < ndevs; i++, pfd++, iop++, sp++) {
1408                 if (pfd->revents & POLLIN || force_read) {
1409                         if (fstat(iop->ifd, &sb) < 0) {
1410                                 perror(iop->ifn);
1411                                 pfd->events = 0;
1412                         } else if (sb.st_size > (off_t)iop->data_queued) {
1413                                 iop->ready = sb.st_size - iop->data_queued;
1414                                 iop->data_queued = sb.st_size;
1415                                 if (!net_sendfile_data(tp, iop)) {
1416                                         pdc_dr_update(iop->dpp, tp->cpu,
1417                                                       iop->ready);
1418                                         nentries++;
1419                                 } else
1420                                         clear_events(pfd);
1421                         }
1422                         nevs--;
1423                 }
1424         }
1425
1426         if (nentries)
1427                 incr_entries(nentries);
1428
1429         return nentries;
1430 }
1431
/*
 * Poll-event handler for buffered ("tracer devpath") mode: read each
 * ready relay channel into a freshly allocated trace buf and queue it
 * on the per-device/per-CPU list for the processing thread.
 *
 * nevs bounds how many poll events to consume (ignored for piped
 * output); force_read makes every descriptor be read regardless of
 * revents. Returns the number of bufs queued.
 */
static int handle_pfds_entries(struct tracer *tp, int nevs, int force_read)
{
	int i, nentries = 0;
	struct trace_buf *tbp;
	struct pollfd *pfd = tp->pfds;
	struct io_info *iop = tp->ios;

	tbp = alloc_trace_buf(tp->cpu, buf_size);
	for (i = 0; i < ndevs; i++, pfd++, iop++) {
		if (pfd->revents & POLLIN || force_read) {
			/* NOTE(review): assumes tbp->len is signed so a -1
			 * read() result survives the assignment — confirm
			 * the field type in blktrace.h */
			tbp->len = read(iop->ifd, tbp->buf, buf_size);
			if (tbp->len > 0) {
				pdc_dr_update(iop->dpp, tp->cpu, tbp->len);
				/* hands tbp off and replaces it with a
				 * fresh empty buf */
				add_trace_buf(iop->dpp, tp->cpu, &tbp);
				nentries++;
			} else if (tbp->len == 0) {
				/*
				 * Short reads after we're done stop us
				 * from trying reads.
				 */
				if (tp->is_done)
					clear_events(pfd);
			} else {
				read_err(tp->cpu, iop->ifn);
				if (errno != EAGAIN || tp->is_done)
					clear_events(pfd);
			}
			if (!piped_output && --nevs == 0)
				break;
		}
	}
	/* the last allocated buf is always unused at this point */
	free(tbp);

	if (nentries)
		incr_entries(nentries);

	return nentries;
}
1470
1471 static int fill_ofname(struct io_info *iop, int cpu)
1472 {
1473         int len;
1474         struct stat sb;
1475         char *dst = iop->ofn;
1476
1477         if (output_dir)
1478                 len = snprintf(iop->ofn, sizeof(iop->ofn), "%s/", output_dir);
1479         else
1480                 len = snprintf(iop->ofn, sizeof(iop->ofn), "./");
1481
1482         if (net_mode == Net_server) {
1483                 struct cl_conn *nc = iop->nc;
1484
1485                 len += sprintf(dst + len, "%s-", nc->ch->hostname);
1486                 len += strftime(dst + len, 64, "%F-%T/",
1487                                 gmtime(&iop->dpp->cl_connect_time));
1488         }
1489
1490         if (stat(iop->ofn, &sb) < 0) {
1491                 if (errno != ENOENT) {
1492                         fprintf(stderr,
1493                                 "Destination dir %s stat failed: %d/%s\n",
1494                                 iop->ofn, errno, strerror(errno));
1495                         return 1;
1496                 }
1497                 if (mkdir(iop->ofn, 0755) < 0) {
1498                         fprintf(stderr,
1499                                 "Destination dir %s can't be made: %d/%s\n",
1500                                 iop->ofn, errno, strerror(errno));
1501                         return 1;
1502                 }
1503         }
1504
1505         if (output_name)
1506                 snprintf(iop->ofn + len, sizeof(iop->ofn), "%s.blktrace.%d",
1507                          output_name, cpu);
1508         else
1509                 snprintf(iop->ofn + len, sizeof(iop->ofn), "%s.blktrace.%d",
1510                          iop->dpp->buts_name, cpu);
1511
1512         return 0;
1513 }
1514
1515 static int set_vbuf(struct io_info *iop, int mode, size_t size)
1516 {
1517         iop->obuf = malloc(size);
1518         if (setvbuf(iop->ofp, iop->obuf, mode, size) < 0) {
1519                 fprintf(stderr, "setvbuf(%s, %d) failed: %d/%s\n",
1520                         iop->dpp->path, (int)size, errno,
1521                         strerror(errno));
1522                 free(iop->obuf);
1523                 return 1;
1524         }
1525
1526         return 0;
1527 }
1528
1529 static int iop_open(struct io_info *iop, int cpu)
1530 {
1531         iop->ofd = -1;
1532         if (fill_ofname(iop, cpu))
1533                 return 1;
1534
1535         iop->ofp = my_fopen(iop->ofn, "w+");
1536         if (iop->ofp == NULL) {
1537                 fprintf(stderr, "Open output file %s failed: %d/%s\n",
1538                         iop->ofn, errno, strerror(errno));
1539                 return 1;
1540         }
1541         if (set_vbuf(iop, _IOLBF, FILE_VBUF_SIZE)) {
1542                 fprintf(stderr, "set_vbuf for file %s failed: %d/%s\n",
1543                         iop->ofn, errno, strerror(errno));
1544                 fclose(iop->ofp);
1545                 return 1;
1546         }
1547
1548         iop->ofd = fileno(iop->ofp);
1549         return 0;
1550 }
1551
1552 static void close_iop(struct io_info *iop)
1553 {
1554         struct mmap_info *mip = &iop->mmap_info;
1555
1556         if (mip->fs_buf)
1557                 munmap(mip->fs_buf, mip->fs_buf_len);
1558
1559         if (!piped_output) {
1560                 if (ftruncate(fileno(iop->ofp), mip->fs_size) < 0) {
1561                         fprintf(stderr,
1562                                 "Ignoring err: ftruncate(%s): %d/%s\n",
1563                                 iop->ofn, errno, strerror(errno));
1564                 }
1565         }
1566
1567         if (iop->ofp)
1568                 fclose(iop->ofp);
1569         if (iop->obuf)
1570                 free(iop->obuf);
1571 }
1572
1573 static void close_ios(struct tracer *tp)
1574 {
1575         while (tp->nios > 0) {
1576                 struct io_info *iop = &tp->ios[--tp->nios];
1577
1578                 iop->dpp->drops = get_drops(iop->dpp);
1579                 if (iop->ifd >= 0)
1580                         close(iop->ifd);
1581
1582                 if (iop->ofp)
1583                         close_iop(iop);
1584                 else if (iop->ofd >= 0) {
1585                         struct devpath *dpp = iop->dpp;
1586
1587                         net_send_close(iop->ofd, dpp->buts_name, dpp->drops);
1588                         net_close_connection(&iop->ofd);
1589                 }
1590         }
1591
1592         free(tp->ios);
1593         free(tp->pfds);
1594 }
1595
1596 static int open_ios(struct tracer *tp)
1597 {
1598         struct pollfd *pfd;
1599         struct io_info *iop;
1600         struct list_head *p;
1601
1602         tp->ios = calloc(ndevs, sizeof(struct io_info));
1603         tp->pfds = calloc(ndevs, sizeof(struct pollfd));
1604
1605         memset(tp->ios, 0, ndevs * sizeof(struct io_info));
1606         memset(tp->pfds, 0, ndevs * sizeof(struct pollfd));
1607
1608         tp->nios = 0;
1609         iop = tp->ios;
1610         pfd = tp->pfds;
1611         __list_for_each(p, &devpaths) {
1612                 struct devpath *dpp = list_entry(p, struct devpath, head);
1613
1614                 iop->dpp = dpp;
1615                 iop->ofd = -1;
1616                 snprintf(iop->ifn, sizeof(iop->ifn), "%s/block/%s/trace%d",
1617                         debugfs_path, dpp->buts_name, tp->cpu);
1618
1619                 iop->ifd = my_open(iop->ifn, O_RDONLY | O_NONBLOCK);
1620                 if (iop->ifd < 0) {
1621                         fprintf(stderr, "Thread %d failed open %s: %d/%s\n",
1622                                 tp->cpu, iop->ifn, errno, strerror(errno));
1623                         return 1;
1624                 }
1625
1626                 init_mmap_info(&iop->mmap_info);
1627
1628                 pfd->fd = iop->ifd;
1629                 pfd->events = POLLIN;
1630
1631                 if (piped_output)
1632                         ;
1633                 else if (net_client_use_sendfile()) {
1634                         iop->ofd = net_setup_client();
1635                         if (iop->ofd < 0)
1636                                 goto err;
1637                         net_send_open(iop->ofd, tp->cpu, dpp->buts_name);
1638                 } else if (net_mode == Net_none) {
1639                         if (iop_open(iop, tp->cpu))
1640                                 goto err;
1641                 } else {
1642                         /*
1643                          * This ensures that the server knows about all
1644                          * connections & devices before _any_ closes
1645                          */
1646                         net_send_open(cl_fds[tp->cpu], tp->cpu, dpp->buts_name);
1647                 }
1648
1649                 pfd++;
1650                 iop++;
1651                 tp->nios++;
1652         }
1653
1654         return 0;
1655
1656 err:
1657         close(iop->ifd);        /* tp->nios _not_ bumped */
1658         close_ios(tp);
1659         return 1;
1660 }
1661
/*
 * Ensure the output file's mmap window has at least maxlen bytes of
 * room at mip->fs_off; if not, grow the file with ftruncate() and map
 * a fresh window beginning at the page containing the current end of
 * valid data.
 *
 * Returns 0 on success, 1 on ftruncate/mmap failure.
 */
static int setup_mmap(int fd, unsigned int maxlen, struct mmap_info *mip)
{
	if (mip->fs_off + maxlen > mip->fs_buf_len) {
		/* new window spans at least 16 sub-buffers */
		unsigned long nr = max(16, mip->buf_nr);

		if (mip->fs_buf) {
			munlock(mip->fs_buf, mip->fs_buf_len);
			munmap(mip->fs_buf, mip->fs_buf_len);
			mip->fs_buf = NULL;
		}

		/*
		 * fs_off becomes the offset of the data end within its
		 * page, so the mmap file offset below stays page-aligned.
		 */
		mip->fs_off = mip->fs_size & (mip->pagesize - 1);
		mip->fs_buf_len = (nr * mip->buf_size) - mip->fs_off;
		mip->fs_max_size += mip->fs_buf_len;

		if (ftruncate(fd, mip->fs_max_size) < 0) {
			perror("__setup_mmap: ftruncate");
			return 1;
		}

		mip->fs_buf = my_mmap(NULL, mip->fs_buf_len, PROT_WRITE,
				      MAP_SHARED, fd,
				      mip->fs_size - mip->fs_off);
		if (mip->fs_buf == MAP_FAILED) {
			perror("__setup_mmap: mmap");
			return 1;
		}
		/* NOTE(review): my_mlock()'s result is ignored here —
		 * presumably locking is best-effort; confirm */
		my_mlock(mip->fs_buf, mip->fs_buf_len);
	}

	return 0;
}
1694
1695 static int handle_pfds_file(struct tracer *tp, int nevs, int force_read)
1696 {
1697         struct mmap_info *mip;
1698         int i, ret, nentries = 0;
1699         struct pollfd *pfd = tp->pfds;
1700         struct io_info *iop = tp->ios;
1701
1702         for (i = 0; nevs > 0 && i < ndevs; i++, pfd++, iop++) {
1703                 if (pfd->revents & POLLIN || force_read) {
1704                         mip = &iop->mmap_info;
1705
1706                         ret = setup_mmap(iop->ofd, buf_size, mip);
1707                         if (ret < 0) {
1708                                 pfd->events = 0;
1709                                 break;
1710                         }
1711
1712                         ret = read(iop->ifd, mip->fs_buf + mip->fs_off,
1713                                    buf_size);
1714                         if (ret > 0) {
1715                                 pdc_dr_update(iop->dpp, tp->cpu, ret);
1716                                 mip->fs_size += ret;
1717                                 mip->fs_off += ret;
1718                                 nentries++;
1719                         } else if (ret == 0) {
1720                                 /*
1721                                  * Short reads after we're done stop us
1722                                  * from trying reads.
1723                                  */
1724                                 if (tp->is_done)
1725                                         clear_events(pfd);
1726                         } else {
1727                                 read_err(tp->cpu, iop->ifn);
1728                                 if (errno != EAGAIN || tp->is_done)
1729                                         clear_events(pfd);
1730                         }
1731                         nevs--;
1732                 }
1733         }
1734
1735         return nentries;
1736 }
1737
1738 static void *thread_main(void *arg)
1739 {
1740         int ret, ndone, to_val;
1741         struct tracer *tp = arg;
1742
1743         ret = lock_on_cpu(tp->cpu);
1744         if (ret)
1745                 goto err;
1746
1747         ret = open_ios(tp);
1748         if (ret)
1749                 goto err;
1750
1751         if (piped_output)
1752                 to_val = 50;            /* Frequent partial handles */
1753         else
1754                 to_val = 500;           /* 1/2 second intervals */
1755
1756
1757         tracer_signal_ready(tp, Th_running, 0);
1758         tracer_wait_unblock(tp);
1759
1760         while (!tp->is_done) {
1761                 ndone = poll(tp->pfds, ndevs, to_val);
1762                 if (ndone || piped_output)
1763                         (void)handle_pfds(tp, ndone, piped_output);
1764                 else if (ndone < 0 && errno != EINTR)
1765                         fprintf(stderr, "Thread %d poll failed: %d/%s\n",
1766                                 tp->cpu, errno, strerror(errno));
1767         }
1768
1769         /*
1770          * Trace is stopped, pull data until we get a short read
1771          */
1772         while (handle_pfds(tp, ndevs, 1) > 0)
1773                 ;
1774         close_ios(tp);
1775         tracer_signal_ready(tp, Th_leaving, 0);
1776         return NULL;
1777
1778 err:
1779         tracer_signal_ready(tp, Th_error, ret);
1780         return NULL;
1781 }
1782
1783 static int start_tracer(int cpu)
1784 {
1785         struct tracer *tp;
1786
1787         tp = malloc(sizeof(*tp));
1788         memset(tp, 0, sizeof(*tp));
1789
1790         INIT_LIST_HEAD(&tp->head);
1791         tp->status = 0;
1792         tp->cpu = cpu;
1793
1794         if (pthread_create(&tp->thread, NULL, thread_main, tp)) {
1795                 fprintf(stderr, "FAILED to start thread on CPU %d: %d/%s\n",
1796                         cpu, errno, strerror(errno));
1797                 free(tp);
1798                 return 1;
1799         }
1800
1801         list_add_tail(&tp->head, &tracers);
1802         return 0;
1803 }
1804
1805 static void start_tracers(void)
1806 {
1807         int cpu;
1808         struct list_head *p;
1809
1810         for (cpu = 0; cpu < ncpus; cpu++)
1811                 if (start_tracer(cpu))
1812                         break;
1813
1814         wait_tracers_ready(cpu);
1815
1816         __list_for_each(p, &tracers) {
1817                 struct tracer *tp = list_entry(p, struct tracer, head);
1818                 if (tp->status)
1819                         fprintf(stderr,
1820                                 "FAILED to start thread on CPU %d: %d/%s\n",
1821                                 tp->cpu, tp->status, strerror(tp->status));
1822         }
1823 }
1824
1825 static void stop_tracers(void)
1826 {
1827         struct list_head *p;
1828
1829         /*
1830          * Stop the tracing - makes the tracer threads clean up quicker.
1831          */
1832         __list_for_each(p, &devpaths) {
1833                 struct devpath *dpp = list_entry(p, struct devpath, head);
1834                 (void)ioctl(dpp->fd, BLKTRACESTOP);
1835         }
1836
1837         /*
1838          * Tell each tracer to quit
1839          */
1840         __list_for_each(p, &tracers) {
1841                 struct tracer *tp = list_entry(p, struct tracer, head);
1842                 tp->is_done = 1;
1843         }
1844 }
1845
1846 static void del_tracers(void)
1847 {
1848         struct list_head *p, *q;
1849
1850         list_for_each_safe(p, q, &tracers) {
1851                 struct tracer *tp = list_entry(p, struct tracer, head);
1852
1853                 list_del(&tp->head);
1854                 free(tp);
1855         }
1856 }
1857
1858 static void wait_tracers(void)
1859 {
1860         struct list_head *p;
1861
1862         if (use_tracer_devpaths())
1863                 process_trace_bufs();
1864
1865         wait_tracers_leaving();
1866
1867         __list_for_each(p, &tracers) {
1868                 int ret;
1869                 struct tracer *tp = list_entry(p, struct tracer, head);
1870
1871                 ret = pthread_join(tp->thread, NULL);
1872                 if (ret)
1873                         fprintf(stderr, "Thread join %d failed %d\n",
1874                                 tp->cpu, ret);
1875         }
1876
1877         if (use_tracer_devpaths())
1878                 clean_trace_bufs();
1879
1880         get_all_drops();
1881 }
1882
/*
 * Orderly shutdown path: ignore further termination signals, then
 * stop, reap and free all tracers and release every device.
 */
static void exit_tracing(void)
{
	static const int sigs[] = { SIGINT, SIGHUP, SIGTERM, SIGALRM };
	size_t i;

	for (i = 0; i < sizeof(sigs) / sizeof(sigs[0]); i++)
		signal(sigs[i], SIG_IGN);

	stop_tracers();
	wait_tracers();
	del_tracers();
	rel_devpaths();
}
1895
/*
 * Signal handler (SIGINT et al.): flag the main loop to exit and ask
 * the kernel/tracer threads to stop tracing.
 *
 * NOTE(review): stop_tracers() walks shared lists and sets tracer
 * flags from signal context without synchronization — presumably safe
 * because the lists are not mutated once tracing has started; confirm.
 */
static void handle_sigint(__attribute__((__unused__)) int sig)
{
	done = 1;
	stop_tracers();
}
1901
1902 static void show_stats(struct list_head *devpaths)
1903 {
1904         FILE *ofp;
1905         struct list_head *p;
1906         unsigned long long nevents, data_read;
1907         unsigned long long total_drops = 0;
1908         unsigned long long total_events = 0;
1909
1910         if (piped_output)
1911                 ofp = my_fopen("/dev/null", "w");
1912         else
1913                 ofp = stdout;
1914
1915         __list_for_each(p, devpaths) {
1916                 int cpu;
1917                 struct pdc_stats *sp;
1918                 struct devpath *dpp = list_entry(p, struct devpath, head);
1919
1920                 if (net_mode == Net_server)
1921                         printf("server: end of run for %s:%s\n",
1922                                 dpp->ch->hostname, dpp->buts_name);
1923
1924                 data_read = 0;
1925                 nevents = 0;
1926
1927                 fprintf(ofp, "=== %s ===\n", dpp->buts_name);
1928                 for (cpu = 0, sp = dpp->stats; cpu < dpp->ncpus; cpu++, sp++) {
1929                         /*
1930                          * Estimate events if not known...
1931                          */
1932                         if (sp->nevents == 0) {
1933                                 sp->nevents = sp->data_read /
1934                                                 sizeof(struct blk_io_trace);
1935                         }
1936
1937                         fprintf(ofp,
1938                                 "  CPU%3d: %20llu events, %8llu KiB data\n",
1939                                 cpu, sp->nevents, (sp->data_read + 1023) >> 10);
1940
1941                         data_read += sp->data_read;
1942                         nevents += sp->nevents;
1943                 }
1944
1945                 fprintf(ofp, "  Total:  %20llu events (dropped %llu),"
1946                              " %8llu KiB data\n", nevents,
1947                              dpp->drops, (data_read + 1024) >> 10);
1948
1949                 total_drops += dpp->drops;
1950                 total_events += (nevents + dpp->drops);
1951         }
1952
1953         fflush(ofp);
1954         if (piped_output)
1955                 fclose(ofp);
1956
1957         if (total_drops) {
1958                 double drops_ratio = 1.0;
1959
1960                 if (total_events)
1961                         drops_ratio = (double)total_drops/(double)total_events;
1962
1963                 fprintf(stderr, "\nYou have %llu (%5.1lf%%) dropped events\n"
1964                                 "Consider using a larger buffer size (-b) "
1965                                 "and/or more buffers (-n)\n",
1966                         total_drops, 100.0 * drops_ratio);
1967         }
1968 }
1969
1970 static int handle_args(int argc, char *argv[])
1971 {
1972         int c, i;
1973         struct statfs st;
1974         int act_mask_tmp = 0;
1975
1976         while ((c = getopt_long(argc, argv, S_OPTS, l_opts, NULL)) >= 0) {
1977                 switch (c) {
1978                 case 'a':
1979                         i = find_mask_map(optarg);
1980                         if (i < 0) {
1981                                 fprintf(stderr, "Invalid action mask %s\n",
1982                                         optarg);
1983                                 return 1;
1984                         }
1985                         act_mask_tmp |= i;
1986                         break;
1987
1988                 case 'A':
1989                         if ((sscanf(optarg, "%x", &i) != 1) ||
1990                                                         !valid_act_opt(i)) {
1991                                 fprintf(stderr,
1992                                         "Invalid set action mask %s/0x%x\n",
1993                                         optarg, i);
1994                                 return 1;
1995                         }
1996                         act_mask_tmp = i;
1997                         break;
1998
1999                 case 'd':
2000                         if (add_devpath(optarg) != 0)
2001                                 return 1;
2002                         break;
2003
2004                 case 'I': {
2005                         char dev_line[256];
2006                         FILE *ifp = my_fopen(optarg, "r");
2007
2008                         if (!ifp) {
2009                                 fprintf(stderr,
2010                                         "Invalid file for devices %s\n",
2011                                         optarg);
2012                                 return 1;
2013                         }
2014
2015                         while (fscanf(ifp, "%s\n", dev_line) == 1)
2016                                 if (add_devpath(dev_line) != 0)
2017                                         return 1;
2018                         break;
2019                 }
2020
2021                 case 'r':
2022                         debugfs_path = optarg;
2023                         break;
2024
2025                 case 'o':
2026                         output_name = optarg;
2027                         break;
2028                 case 'k':
2029                         kill_running_trace = 1;
2030                         break;
2031                 case 'w':
2032                         stop_watch = atoi(optarg);
2033                         if (stop_watch <= 0) {
2034                                 fprintf(stderr,
2035                                         "Invalid stopwatch value (%d secs)\n",
2036                                         stop_watch);
2037                                 return 1;
2038                         }
2039                         break;
2040                 case 'V':
2041                 case 'v':
2042                         printf("%s version %s\n", argv[0], blktrace_version);
2043                         exit(0);
2044                         /*NOTREACHED*/
2045                 case 'b':
2046                         buf_size = strtoul(optarg, NULL, 10);
2047                         if (buf_size <= 0 || buf_size > 16*1024) {
2048                                 fprintf(stderr, "Invalid buffer size (%lu)\n",
2049                                         buf_size);
2050                                 return 1;
2051                         }
2052                         buf_size <<= 10;
2053                         break;
2054                 case 'n':
2055                         buf_nr = strtoul(optarg, NULL, 10);
2056                         if (buf_nr <= 0) {
2057                                 fprintf(stderr,
2058                                         "Invalid buffer nr (%lu)\n", buf_nr);
2059                                 return 1;
2060                         }
2061                         break;
2062                 case 'D':
2063                         output_dir = optarg;
2064                         break;
2065                 case 'h':
2066                         net_mode = Net_client;
2067                         strcpy(hostname, optarg);
2068                         break;
2069                 case 'l':
2070                         net_mode = Net_server;
2071                         break;
2072                 case 'p':
2073                         net_port = atoi(optarg);
2074                         break;
2075                 case 's':
2076                         net_use_sendfile = 0;
2077                         break;
2078                 default:
2079                         show_usage(argv[0]);
2080                         exit(1);
2081                         /*NOTREACHED*/
2082                 }
2083         }
2084
2085         while (optind < argc)
2086                 if (add_devpath(argv[optind++]) != 0)
2087                         return 1;
2088
2089         if (net_mode != Net_server && ndevs == 0) {
2090                 show_usage(argv[0]);
2091                 return 1;
2092         }
2093
2094         if (statfs(debugfs_path, &st) < 0 || st.f_type != (long)DEBUGFS_TYPE) {
2095                 fprintf(stderr, "Invalid debug path %s: %d/%s\n",
2096                         debugfs_path, errno, strerror(errno));
2097                 return 1;
2098         }
2099
2100         if (act_mask_tmp != 0)
2101                 act_mask = act_mask_tmp;
2102
2103         if (net_mode == Net_client && net_setup_addr())
2104                 return 1;
2105
2106         /*
2107          * Set up for appropriate PFD handler based upon output name.
2108          */
2109         if (net_client_use_sendfile())
2110                 handle_pfds = handle_pfds_netclient;
2111         else if (net_client_use_send())
2112                 handle_pfds = handle_pfds_entries;
2113         else if (output_name && (strcmp(output_name, "-") == 0)) {
2114                 piped_output = 1;
2115                 handle_pfds = handle_pfds_entries;
2116                 pfp = stdout;
2117                 setvbuf(pfp, NULL, _IONBF, 0);
2118         } else
2119                 handle_pfds = handle_pfds_file;
2120         return 0;
2121 }
2122
2123 static void ch_add_connection(struct net_server_s *ns, struct cl_host *ch,
2124                               int fd)
2125 {
2126         struct cl_conn *nc;
2127
2128         nc = malloc(sizeof(*nc));
2129         memset(nc, 0, sizeof(*nc));
2130
2131         time(&nc->connect_time);
2132         nc->ch = ch;
2133         nc->fd = fd;
2134         nc->ncpus = -1;
2135
2136         list_add_tail(&nc->ch_head, &ch->conn_list);
2137         ch->connects++;
2138
2139         list_add_tail(&nc->ns_head, &ns->conn_list);
2140         ns->connects++;
2141         ns->pfds = realloc(ns->pfds, (ns->connects+1) * sizeof(struct pollfd));
2142 }
2143
2144 static void ch_rem_connection(struct net_server_s *ns, struct cl_host *ch,
2145                               struct cl_conn *nc)
2146 {
2147         net_close_connection(&nc->fd);
2148
2149         list_del(&nc->ch_head);
2150         ch->connects--;
2151
2152         list_del(&nc->ns_head);
2153         ns->connects--;
2154         ns->pfds = realloc(ns->pfds, (ns->connects+1) * sizeof(struct pollfd));
2155
2156         free(nc);
2157 }
2158
2159 static struct cl_host *net_find_client_host(struct net_server_s *ns,
2160                                             struct in_addr cl_in_addr)
2161 {
2162         struct list_head *p;
2163
2164         __list_for_each(p, &ns->ch_list) {
2165                 struct cl_host *ch = list_entry(p, struct cl_host, head);
2166
2167                 if (in_addr_eq(ch->cl_in_addr, cl_in_addr))
2168                         return ch;
2169         }
2170
2171         return NULL;
2172 }
2173
2174 static struct cl_host *net_add_client_host(struct net_server_s *ns,
2175                                            struct sockaddr_in *addr)
2176 {
2177         struct cl_host *ch;
2178
2179         ch = malloc(sizeof(*ch));
2180         memset(ch, 0, sizeof(*ch));
2181
2182         ch->ns = ns;
2183         ch->cl_in_addr = addr->sin_addr;
2184         list_add_tail(&ch->head, &ns->ch_list);
2185         ns->nchs++;
2186
2187         ch->hostname = strdup(inet_ntoa(addr->sin_addr));
2188         printf("server: connection from %s\n", ch->hostname);
2189
2190         INIT_LIST_HEAD(&ch->conn_list);
2191         INIT_LIST_HEAD(&ch->devpaths);
2192
2193         return ch;
2194 }
2195
2196 static void device_done(struct devpath *dpp, int ncpus)
2197 {
2198         int cpu;
2199         struct io_info *iop;
2200
2201         for (cpu = 0, iop = dpp->ios; cpu < ncpus; cpu++, iop++)
2202                 close_iop(iop);
2203
2204         list_del(&dpp->head);
2205         dpp_free(dpp);
2206 }
2207
2208 static void net_ch_remove(struct cl_host *ch, int ncpus)
2209 {
2210         struct list_head *p, *q;
2211         struct net_server_s *ns = ch->ns;
2212
2213         list_for_each_safe(p, q, &ch->devpaths) {
2214                 struct devpath *dpp = list_entry(p, struct devpath, head);
2215                 device_done(dpp, ncpus);
2216         }
2217
2218         list_for_each_safe(p, q, &ch->conn_list) {
2219                 struct cl_conn *nc = list_entry(p, struct cl_conn, ch_head);
2220
2221                 ch_rem_connection(ns, ch, nc);
2222         }
2223
2224         list_del(&ch->head);
2225         ns->nchs--;
2226
2227         if (ch->hostname)
2228                 free(ch->hostname);
2229         free(ch);
2230 }
2231
2232 static void net_add_connection(struct net_server_s *ns)
2233 {
2234         int fd;
2235         struct cl_host *ch;
2236         socklen_t socklen = sizeof(ns->addr);
2237
2238         fd = my_accept(ns->listen_fd, (struct sockaddr *)&ns->addr, &socklen);
2239         if (fd < 0) {
2240                 /*
2241                  * This is OK: we just won't accept this connection,
2242                  * nothing fatal.
2243                  */
2244                 perror("accept");
2245         } else {
2246                 ch = net_find_client_host(ns, ns->addr.sin_addr);
2247                 if (!ch)
2248                         ch = net_add_client_host(ns, &ns->addr);
2249
2250                 ch_add_connection(ns, ch, fd);
2251         }
2252 }
2253
2254 static struct devpath *nc_add_dpp(struct cl_conn *nc,
2255                                   struct blktrace_net_hdr *bnh,
2256                                   time_t connect_time)
2257 {
2258         int cpu;
2259         struct io_info *iop;
2260         struct devpath *dpp;
2261
2262         dpp = malloc(sizeof(*dpp));
2263         memset(dpp, 0, sizeof(*dpp));
2264
2265         dpp->buts_name = strdup(bnh->buts_name);
2266         dpp->path = strdup(bnh->buts_name);
2267         dpp->fd = -1;
2268         dpp->ch = nc->ch;
2269         dpp->cl_id = bnh->cl_id;
2270         dpp->cl_connect_time = connect_time;
2271         dpp->ncpus = nc->ncpus;
2272         dpp->stats = calloc(dpp->ncpus, sizeof(*dpp->stats));
2273         memset(dpp->stats, 0, dpp->ncpus * sizeof(*dpp->stats));
2274
2275         list_add_tail(&dpp->head, &nc->ch->devpaths);
2276         nc->ch->ndevs++;
2277
2278         dpp->ios = calloc(nc->ncpus, sizeof(*iop));
2279         memset(dpp->ios, 0, ndevs * sizeof(*iop));
2280
2281         for (cpu = 0, iop = dpp->ios; cpu < nc->ncpus; cpu++, iop++) {
2282                 iop->dpp = dpp;
2283                 iop->nc = nc;
2284                 init_mmap_info(&iop->mmap_info);
2285
2286                 if (iop_open(iop, cpu))
2287                         goto err;
2288         }
2289
2290         return dpp;
2291
2292 err:
2293         /*
2294          * Need to unravel what's been done...
2295          */
2296         while (cpu >= 0)
2297                 close_iop(&dpp->ios[cpu--]);
2298         dpp_free(dpp);
2299
2300         return NULL;
2301 }
2302
2303 static struct devpath *nc_find_dpp(struct cl_conn *nc,
2304                                    struct blktrace_net_hdr *bnh)
2305 {
2306         struct list_head *p;
2307         time_t connect_time = nc->connect_time;
2308
2309         __list_for_each(p, &nc->ch->devpaths) {
2310                 struct devpath *dpp = list_entry(p, struct devpath, head);
2311
2312                 if (!strcmp(dpp->buts_name, bnh->buts_name))
2313                         return dpp;
2314
2315                 if (dpp->cl_id == bnh->cl_id)
2316                         connect_time = dpp->cl_connect_time;
2317         }
2318
2319         return nc_add_dpp(nc, bnh, connect_time);
2320 }
2321
2322 static void net_client_read_data(struct cl_conn *nc, struct devpath *dpp,
2323                                  struct blktrace_net_hdr *bnh)
2324 {
2325         int ret;
2326         struct io_info *iop = &dpp->ios[bnh->cpu];
2327         struct mmap_info *mip = &iop->mmap_info;
2328
2329         if (setup_mmap(iop->ofd, bnh->len, &iop->mmap_info)) {
2330                 fprintf(stderr, "ncd(%s:%d): mmap failed\n",
2331                         nc->ch->hostname, nc->fd);
2332                 exit(1);
2333         }
2334
2335         ret = net_recv_data(nc->fd, mip->fs_buf + mip->fs_off, bnh->len);
2336         if (ret > 0) {
2337                 pdc_dr_update(dpp, bnh->cpu, ret);
2338                 mip->fs_size += ret;
2339                 mip->fs_off += ret;
2340         } else if (ret < 0)
2341                 exit(1);
2342 }
2343
/*
 * Process one inbound message (header plus optional payload) from a
 * client connection.
 *
 * Returns 1 if we closed a host - invalidates other polling information
 * that may be present. Returns 0 otherwise, including the "no complete
 * header available yet" case.
 */
static int net_client_data(struct cl_conn *nc)
{
	int ret;
	struct devpath *dpp;
	struct blktrace_net_hdr bnh;

	ret = net_get_header(nc, &bnh);
	if (ret == 0)
		return 0;	/* nothing to do yet */

	if (ret < 0) {
		fprintf(stderr, "ncd(%d): header read failed\n", nc->fd);
		exit(1);
	}

	/* First header seen determines the byte order for this run. */
	if (data_is_native == -1 && check_data_endianness(bnh.magic)) {
		fprintf(stderr, "ncd(%d): received data is bad\n", nc->fd);
		exit(1);
	}

	if (!data_is_native) {
		/* Client has opposite endianness: swap all header fields. */
		bnh.magic = be32_to_cpu(bnh.magic);
		bnh.cpu = be32_to_cpu(bnh.cpu);
		bnh.max_cpus = be32_to_cpu(bnh.max_cpus);
		bnh.len = be32_to_cpu(bnh.len);
		bnh.cl_id = be32_to_cpu(bnh.cl_id);
		bnh.buf_size = be32_to_cpu(bnh.buf_size);
		bnh.buf_nr = be32_to_cpu(bnh.buf_nr);
		bnh.page_size = be32_to_cpu(bnh.page_size);
	}

	/* Low byte masked off -- presumably the trace format version. */
	if ((bnh.magic & 0xffffff00) != BLK_IO_TRACE_MAGIC) {
		fprintf(stderr, "ncd(%s:%d): bad data magic\n",
			nc->ch->hostname, nc->fd);
		exit(1);
	}

	if (nc->ncpus == -1)
		nc->ncpus = bnh.max_cpus;

	/*
	 * len == 0 means the other end is sending us a new connection/dpp
	 * len == 1 means that the other end signalled end-of-run
	 */
	dpp = nc_find_dpp(nc, &bnh);
	if (bnh.len == 0) {
		/*
		 * Just adding in the dpp above is enough
		 */
		ack_open_close(nc->fd, dpp->buts_name);
		nc->ch->cl_opens++;
	} else if (bnh.len == 1) {
		/*
		 * overload cpu count with dropped events
		 */
		dpp->drops = bnh.cpu;

		ack_open_close(nc->fd, dpp->buts_name);
		if (--nc->ch->cl_opens == 0) {
			/* Last device closed: report stats, drop the host. */
			show_stats(&nc->ch->devpaths);
			net_ch_remove(nc->ch, nc->ncpus);
			return 1;
		}
	} else
		net_client_read_data(nc, dpp, &bnh);

	return 0;
}
2416
2417 static void handle_client_data(struct net_server_s *ns, int events)
2418 {
2419         struct cl_conn *nc;
2420         struct pollfd *pfd;
2421         struct list_head *p, *q;
2422
2423         pfd = &ns->pfds[1];
2424         list_for_each_safe(p, q, &ns->conn_list) {
2425                 if (pfd->revents & POLLIN) {
2426                         nc = list_entry(p, struct cl_conn, ns_head);
2427
2428                         if (net_client_data(nc) || --events == 0)
2429                                 break;
2430                 }
2431                 pfd++;
2432         }
2433 }
2434
2435 static void net_setup_pfds(struct net_server_s *ns)
2436 {
2437         struct pollfd *pfd;
2438         struct list_head *p;
2439
2440         ns->pfds[0].fd = ns->listen_fd;
2441         ns->pfds[0].events = POLLIN;
2442
2443         pfd = &ns->pfds[1];
2444         __list_for_each(p, &ns->conn_list) {
2445                 struct cl_conn *nc = list_entry(p, struct cl_conn, ns_head);
2446
2447                 pfd->fd = nc->fd;
2448                 pfd->events = POLLIN;
2449                 pfd++;
2450         }
2451 }
2452
2453 static int net_server_handle_connections(struct net_server_s *ns)
2454 {
2455         int events;
2456
2457         printf("server: waiting for connections...\n");
2458
2459         while (!done) {
2460                 net_setup_pfds(ns);
2461                 events = poll(ns->pfds, ns->connects + 1, -1);
2462                 if (events < 0) {
2463                         if (errno != EINTR) {
2464                                 perror("FATAL: poll error");
2465                                 return 1;
2466                         }
2467                 } else if (events > 0) {
2468                         if (ns->pfds[0].revents & POLLIN) {
2469                                 net_add_connection(ns);
2470                                 events--;
2471                         }
2472
2473                         if (events)
2474                                 handle_client_data(ns, events);
2475                 }
2476         }
2477
2478         return 0;
2479 }
2480
2481 static int net_server(void)
2482 {
2483         int fd, opt;
2484         int ret = 1;
2485         struct net_server_s net_server;
2486         struct net_server_s *ns = &net_server;
2487
2488         memset(ns, 0, sizeof(*ns));
2489         INIT_LIST_HEAD(&ns->ch_list);
2490         INIT_LIST_HEAD(&ns->conn_list);
2491         ns->pfds = malloc(sizeof(struct pollfd));
2492
2493         fd = my_socket(AF_INET, SOCK_STREAM, 0);
2494         if (fd < 0) {
2495                 perror("server: socket");
2496                 goto out;
2497         }
2498
2499         opt = 1;
2500         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
2501                 perror("setsockopt");
2502                 goto out;
2503         }
2504
2505         memset(&ns->addr, 0, sizeof(ns->addr));
2506         ns->addr.sin_family = AF_INET;
2507         ns->addr.sin_addr.s_addr = htonl(INADDR_ANY);
2508         ns->addr.sin_port = htons(net_port);
2509
2510         if (bind(fd, (struct sockaddr *) &ns->addr, sizeof(ns->addr)) < 0) {
2511                 perror("bind");
2512                 goto out;
2513         }
2514
2515         if (listen(fd, 1) < 0) {
2516                 perror("listen");
2517                 goto out;
2518         }
2519
2520         /*
2521          * The actual server looping is done here:
2522          */
2523         ns->listen_fd = fd;
2524         ret = net_server_handle_connections(ns);
2525
2526         /*
2527          * Clean up and return...
2528          */
2529 out:
2530         free(ns->pfds);
2531         return ret;
2532 }
2533
/*
 * Entry point: parse arguments, then run in one of three modes --
 * kill an existing trace (-k), act as a network server (-l), or
 * trace locally / forward to a remote server.
 */
int main(int argc, char *argv[])
{
	int ret = 0;

	setlocale(LC_NUMERIC, "en_US");
	pagesize = getpagesize();
	ncpus = sysconf(_SC_NPROCESSORS_ONLN);
	if (ncpus < 0) {
		fprintf(stderr, "sysconf(_SC_NPROCESSORS_ONLN) failed %d/%s\n",
			errno, strerror(errno));
		ret = 1;
		goto out;
	}

	if (handle_args(argc, argv)) {
		ret = 1;
		goto out;
	}

	/* Terminate tracing cleanly on the usual signals. */
	signal(SIGINT, handle_sigint);
	signal(SIGHUP, handle_sigint);
	signal(SIGTERM, handle_sigint);
	signal(SIGALRM, handle_sigint);
	signal(SIGPIPE, SIG_IGN);

	if (kill_running_trace) {
		/* -k: tear down any already-running traces and exit. */
		struct devpath *dpp;
		struct list_head *p;

		__list_for_each(p, &devpaths) {
			dpp = list_entry(p, struct devpath, head);
			if (__stop_trace(dpp->fd)) {
				fprintf(stderr,
					"BLKTRACETEARDOWN %s failed: %d/%s\n",
					dpp->path, errno, strerror(errno));
			}
		}
	} else if (net_mode == Net_server) {
		if (output_name) {
			fprintf(stderr, "-o ignored in server mode\n");
			output_name = NULL;
		}

		ret = net_server();
	} else {
		/* Local or client-side tracing. */
		atexit(exit_tracing);

		if (net_mode == Net_client)
			printf("blktrace: connecting to %s\n", hostname);

		setup_buts();

		if (use_tracer_devpaths()) {
			if (setup_tracer_devpaths())
				goto out;

			if (piped_output)
				handle_list = handle_list_file;
			else
				handle_list = handle_list_net;
		}

		start_tracers();
		if (nthreads_running == ncpus) {
			/* All per-CPU tracers came up: let them run. */
			unblock_tracers();
			start_buts();

			if (net_mode == Net_client)
				printf("blktrace: connected!\n");

			if (stop_watch)
				alarm(stop_watch);
		} else
			stop_tracers();

		wait_tracers();
		if (nthreads_running == ncpus)
			show_stats(&devpaths);

		if (net_client_use_send())
			close_client_connections();
		del_tracers();
	}

out:
	if (pfp)
		fclose(pfp);
	rel_devpaths();
	return ret;
}