blktrace: avoid device duplication
/*
 * block queue tracing application
 *
 * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
 * Copyright (C) 2006 Jens Axboe <axboe@kernel.dk>
 *
 * Rewrite to have a single thread per CPU (managing all devices on that CPU)
 *	Alan D. Brunelle <alan.brunelle@hp.com> - January 2009
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 */

#include <errno.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <getopt.h>
#include <sched.h>
#include <unistd.h>
#include <poll.h>
#include <signal.h>
#include <pthread.h>
#include <locale.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/vfs.h>
#include <sys/mman.h>
#include <sys/param.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/sendfile.h>

#include "btt/list.h"
#include "blktrace.h"

/*
 * You may want to increase this even more, if you are logging at a high
 * rate and see skipped/missed events
 */
#define BUF_SIZE		(512 * 1024)
#define BUF_NR			(4)

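/*
 * Both defaults can be overridden at run time: -b gives the sub-buffer
 * size in KiB and -n the number of sub-buffers, so an (illustrative)
 * "blktrace -d /dev/sda -b 1024 -n 8" would use 8 x 1 MiB sub-buffers
 * instead of 4 x 512 KiB.
 */
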
#define FILE_VBUF_SIZE		(128 * 1024)

#define DEBUGFS_TYPE		(0x64626720)
#define TRACE_NET_PORT		(8462)

enum {
	Net_none = 0,
	Net_server,
	Net_client,
};

enum thread_status {
	Th_running,
	Th_leaving,
	Th_error
};

/*
 * Generic stats collected: nevents can be _roughly_ estimated by data_read
 * (discounting pdu...)
 *
 * These fields are updated w/ pdc_dr_update & pdc_nev_update below.
 */
struct pdc_stats {
	unsigned long long data_read;
	unsigned long long nevents;
};

struct devpath {
	struct list_head head;
	char *path;			/* path to device special file */
	char *buts_name;		/* name returned from bt kernel code */
	struct pdc_stats *stats;
	int fd, idx, ncpus;
	unsigned long long drops;

	/*
	 * For piped output only:
	 *
	 * Each tracer will have a tracer_devpath_head that it will add new
	 * data onto. Its list is protected by that head's mutex
	 * (tracer_devpath_head.mutex), and it signals the processing thread
	 * using the dp_cond, dp_mutex & dp_entries variables defined below.
	 */
	struct tracer_devpath_head *heads;

	/*
	 * For network server mode only:
	 */
	struct cl_host *ch;
	u32 cl_id;
	time_t cl_connect_time;
	struct io_info *ios;
};

/*
 * For piped output to stdout we will have each tracer thread (one per dev)
 * tack buffers read from the relay queues onto a per-device list.
 *
 * The main thread will then collect trace buffers from each of the lists
 * in turn.
 *
 * We will use a mutex to guard each of the trace_buf lists. The tracers
 * signal the main thread via <dp_cond,dp_mutex> and dp_entries: when a
 * tracer adds an entry while dp_entries is 0 it signals, and the main
 * thread waits on that condition while dp_entries is 0.
 *
 * adb: It may be better just to have a large buffer per tracer per dev,
 * and then use it as a ring buffer. This would certainly cut down a lot
 * of malloc/free thrashing, at the cost of (potentially) more memory
 * movement.
 */
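
/*
 * In sketch form, using the helpers defined later in this file, the
 * handoff is:
 *
 *    tracer thread (per cpu):              main thread:
 *      len = read(trace fd);                 wait_empty_entries();
 *      add_trace_buf(dpp, cpu, &tbp);        __process_trace_bufs():
 *      incr_entries(nentries);                 list_replace_init(...);
 *                                              handle_list(hd, &list);
 *                                              decr_entries(handled);
 */
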
struct trace_buf {
	struct list_head head;
	struct devpath *dpp;
	void *buf;
	int cpu, len;
};

struct tracer_devpath_head {
	pthread_mutex_t mutex;
	struct list_head head;
	struct trace_buf *prev;
};

/*
 * Used to handle the mmap() interfaces for output file (containing traces)
 */
struct mmap_info {
	void *fs_buf;
	unsigned long long fs_size, fs_max_size, fs_off, fs_buf_len;
	unsigned long buf_size, buf_nr;
	int pagesize;
};

/*
 * Each thread doing work on a (client) side of blktrace will have one
 * of these. The ios array contains input/output information, pfds holds
 * poll() data. The volatile's provide flags to/from the main executing
 * thread.
 */
struct tracer {
	struct list_head head;
	struct io_info *ios;
	struct pollfd *pfds;
	pthread_t thread;
	int cpu, nios;
	volatile int status, is_done;
};

/*
 * Networking stuff follows. We include a magic number so we know whether
 * or not to do endianness conversion.
 *
 * The len field is overloaded:
 *	0 - Indicates an "open" - allowing the server to set up for a dev/cpu
 *	1 - Indicates a "close" - shut the connection down in an orderly manner
 *	2 - Indicates the server's acknowledgement of an open or close
 *	    (see ack_open_close() below)
 *
 * The cpu field is overloaded on close: it will contain the number of drops.
 */
struct blktrace_net_hdr {
	u32 magic;		/* same as trace magic */
	char buts_name[32];	/* trace name */
	u32 cpu;		/* for which cpu */
	u32 max_cpus;
	u32 len;		/* length of following trace data */
	u32 cl_id;		/* id for set of client per-cpu connections */
	u32 buf_size;		/* client buf_size for this trace */
	u32 buf_nr;		/* client buf_nr for this trace */
	u32 page_size;		/* client page_size for this trace */
};
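
/*
 * The resulting exchange on the wire (driven by net_send_open_close(),
 * ack_open_close() and flush_subbuf_net()/net_sendfile_data() below) is
 * roughly:
 *
 *    client                                server
 *    hdr { len = 0 }                ->     set up dev/cpu ("open")
 *                                   <-     hdr { len = 2 } (ack)
 *    hdr { len = n } + n data bytes ->     store trace data
 *    ...
 *    hdr { len = 1, cpu = drops }   ->     tear down ("close")
 *                                   <-     hdr { len = 2 } (ack)
 */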

/*
 * Each host encountered has one of these. The head is used to link this
 * on to the network server's ch_list. Connections associated with this
 * host are linked on conn_list, and any devices traced on that host
 * are connected on the devpaths list.
 */
struct cl_host {
	struct list_head head;
	struct list_head conn_list;
	struct list_head devpaths;
	struct net_server_s *ns;
	char *hostname;
	struct in_addr cl_in_addr;
	int connects, ndevs, cl_opens;
};

/*
 * Each connection (client to server socket ('fd')) has one of these. A
 * back reference to the host ('ch'), and list heads (for the host
 * list, and the network server conn_list) are also included.
 */
struct cl_conn {
	struct list_head ch_head, ns_head;
	struct cl_host *ch;
	int fd, ncpus;
	time_t connect_time;
};

/*
 * The network server requires some poll structures to be maintained -
 * one per connection currently on conn_list. The nchs/ch_list values
 * are for each host connected to this server. The addr field is used
 * for scratch as new connections are established.
 */
struct net_server_s {
	struct list_head conn_list;
	struct list_head ch_list;
	struct pollfd *pfds;
	int listen_fd, connects, nchs;
	struct sockaddr_in addr;
};

/*
 * This structure is (generically) used to provide information
 * for a read-to-write set of values.
 *
 * ifn & ifd represent input information
 *
 * ofn, ofd, ofp, obuf & mmap_info are used for the output file (optional).
 */
struct io_info {
	struct devpath *dpp;
	FILE *ofp;
	char *obuf;
	struct cl_conn *nc;	/* Server network connection */

	/*
	 * mmap controlled output files
	 */
	struct mmap_info mmap_info;

	/*
	 * Client network fields
	 */
	unsigned int ready;
	unsigned long long data_queued;

	/*
	 * Input/output file descriptors & names
	 */
	int ifd, ofd;
	char ifn[MAXPATHLEN + 64];
	char ofn[MAXPATHLEN + 64];
};

static char blktrace_version[] = "2.0.0";

/*
 * Linkage to blktrace helper routines (trace conversions)
 */
int data_is_native = -1;

static int ndevs;
static int ncpus;
static int pagesize;
static int act_mask = ~0U;
static int kill_running_trace;
static int stop_watch;
static int piped_output;

static char *debugfs_path = "/sys/kernel/debug";
static char *output_name;
static char *output_dir;

static unsigned long buf_size = BUF_SIZE;
static unsigned long buf_nr = BUF_NR;

static FILE *pfp;

static LIST_HEAD(devpaths);
static LIST_HEAD(tracers);

static volatile int done;

/*
 * tracer threads add entries, the main thread takes them off and processes
 * them. These protect the dp_entries variable.
 */
static pthread_cond_t dp_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t dp_mutex = PTHREAD_MUTEX_INITIALIZER;
static volatile int dp_entries;

/*
 * These synchronize master / thread interactions.
 */
static pthread_cond_t mt_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t mt_mutex = PTHREAD_MUTEX_INITIALIZER;
static volatile int nthreads_running;
static volatile int nthreads_leaving;
static volatile int nthreads_error;
static volatile int tracers_run;

/*
 * network cmd line params
 */
static struct sockaddr_in hostname_addr;
static char hostname[MAXHOSTNAMELEN];
static int net_port = TRACE_NET_PORT;
static int net_use_sendfile = 1;
static int net_mode;
static int *cl_fds;

static int (*handle_pfds)(struct tracer *, int, int);
static int (*handle_list)(struct tracer_devpath_head *, struct list_head *);

#define S_OPTS	"d:a:A:r:o:kw:vVb:n:D:lh:p:sI:"
static struct option l_opts[] = {
	{
		.name = "dev",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'd'
	},
	{
		.name = "input-devs",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'I'
	},
	{
		.name = "act-mask",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'a'
	},
	{
		.name = "set-mask",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'A'
	},
	{
		.name = "relay",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'r'
	},
	{
		.name = "output",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'o'
	},
	{
		.name = "kill",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 'k'
	},
	{
		.name = "stopwatch",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'w'
	},
	{
		.name = "version",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 'v'
	},
	{
		.name = "version",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 'V'
	},
	{
		.name = "buffer-size",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'b'
	},
	{
		.name = "num-sub-buffers",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'n'
	},
	{
		.name = "output-dir",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'D'
	},
	{
		.name = "listen",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 'l'
	},
	{
		.name = "host",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'h'
	},
	{
		.name = "port",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'p'
	},
	{
		.name = "no-sendfile",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 's'
	},
	{
		.name = NULL,
	}
};

static char usage_str[] = \
	"-d <dev> [ -r debugfs path ] [ -o <output> ] [ -k ] [ -w time ]\n" \
	"[ -a action ] [ -A action mask ] [ -I <devs file> ] [ -v ]\n\n" \
	"\t-d Use specified device. May also be given last after options\n" \
	"\t-r Path to mounted debugfs, defaults to /sys/kernel/debug\n" \
	"\t-o File(s) to send output to\n" \
	"\t-D Directory to prepend to output file names\n" \
	"\t-k Kill a running trace\n" \
	"\t-w Stop after defined time, in seconds\n" \
	"\t-a Only trace specified actions. See documentation\n" \
	"\t-A Give trace mask as a single value. See documentation\n" \
	"\t-b Sub buffer size in KiB\n" \
	"\t-n Number of sub buffers\n" \
	"\t-l Run in network listen mode (blktrace server)\n" \
	"\t-h Run in network client mode, connecting to the given host\n" \
	"\t-p Network port to use (default 8462)\n" \
	"\t-s Make the network client NOT use sendfile() to transfer data\n" \
	"\t-I Add devices found in <devs file>\n" \
	"\t-V Print program version info\n\n";

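/*
 * Typical invocations (device names are illustrative placeholders):
 *
 *	blktrace -d /dev/sda			trace to sda.blktrace.<cpu>
 *	blktrace -d /dev/sda -o - | blkparse -i -
 *						pipe live data to blkparse
 *	blktrace -l				run as a network server
 *	blktrace -d /dev/sda -h <server>	ship traces to <server>:8462
 */
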
static void clear_events(struct pollfd *pfd)
{
	pfd->events = 0;
	pfd->revents = 0;
}

static inline int net_client_use_sendfile(void)
{
	return net_mode == Net_client && net_use_sendfile;
}

static inline int net_client_use_send(void)
{
	return net_mode == Net_client && !net_use_sendfile;
}

static inline int use_tracer_devpaths(void)
{
	return piped_output || net_client_use_send();
}

static inline int in_addr_eq(struct in_addr a, struct in_addr b)
{
	return a.s_addr == b.s_addr;
}

static inline void pdc_dr_update(struct devpath *dpp, int cpu, int data_read)
{
	dpp->stats[cpu].data_read += data_read;
}

static inline void pdc_nev_update(struct devpath *dpp, int cpu, int nevents)
{
	dpp->stats[cpu].nevents += nevents;
}

static void show_usage(char *prog)
{
	fprintf(stderr, "Usage: %s %s %s", prog, blktrace_version, usage_str);
}

/*
 * Create a timespec 'msec' milliseconds into the future
 */
static inline void make_timespec(struct timespec *tsp, long delta_msec)
{
	struct timeval now;

	gettimeofday(&now, NULL);
	tsp->tv_sec = now.tv_sec;
	tsp->tv_nsec = 1000L * now.tv_usec;

	tsp->tv_nsec += (delta_msec * 1000000L);
	if (tsp->tv_nsec >= 1000000000L) {
		long secs = tsp->tv_nsec / 1000000000L;

		tsp->tv_sec += secs;
		tsp->tv_nsec -= (secs * 1000000000L);
	}
}

/*
 * Add a timer to ensure wait ends
 */
static void t_pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
{
	struct timespec ts;

	make_timespec(&ts, 50);
	pthread_cond_timedwait(cond, mutex, &ts);
}

static void unblock_tracers(void)
{
	pthread_mutex_lock(&mt_mutex);
	tracers_run = 1;
	pthread_cond_broadcast(&mt_cond);
	pthread_mutex_unlock(&mt_mutex);
}

static void tracer_wait_unblock(struct tracer *tp)
{
	pthread_mutex_lock(&mt_mutex);
	while (!tp->is_done && !tracers_run)
		pthread_cond_wait(&mt_cond, &mt_mutex);
	pthread_mutex_unlock(&mt_mutex);
}

static void tracer_signal_ready(struct tracer *tp,
				enum thread_status th_status,
				int status)
{
	pthread_mutex_lock(&mt_mutex);
	tp->status = status;

	if (th_status == Th_running)
		nthreads_running++;
	else if (th_status == Th_error)
		nthreads_error++;
	else
		nthreads_leaving++;

	pthread_cond_signal(&mt_cond);
	pthread_mutex_unlock(&mt_mutex);
}

static void wait_tracers_ready(int ncpus_started)
{
	pthread_mutex_lock(&mt_mutex);
	while ((nthreads_running + nthreads_error) < ncpus_started)
		t_pthread_cond_wait(&mt_cond, &mt_mutex);
	pthread_mutex_unlock(&mt_mutex);
}

static void wait_tracers_leaving(void)
{
	pthread_mutex_lock(&mt_mutex);
	while (nthreads_leaving < nthreads_running)
		t_pthread_cond_wait(&mt_cond, &mt_mutex);
	pthread_mutex_unlock(&mt_mutex);
}

static void init_mmap_info(struct mmap_info *mip)
{
	mip->buf_size = buf_size;
	mip->buf_nr = buf_nr;
	mip->pagesize = pagesize;
}

static void net_close_connection(int *fd)
{
	shutdown(*fd, SHUT_RDWR);
	close(*fd);
	*fd = -1;
}

static void dpp_free(struct devpath *dpp)
{
	if (dpp->stats)
		free(dpp->stats);
	if (dpp->ios)
		free(dpp->ios);
	if (dpp->path)
		free(dpp->path);
	if (dpp->buts_name)
		free(dpp->buts_name);
	free(dpp);
}

static int lock_on_cpu(int cpu)
{
	cpu_set_t cpu_mask;

	CPU_ZERO(&cpu_mask);
	CPU_SET(cpu, &cpu_mask);
	if (sched_setaffinity(0, sizeof(cpu_mask), &cpu_mask) < 0)
		return errno;

	return 0;
}

static int increase_limit(int resource, rlim_t increase)
{
	struct rlimit rlim;
	int save_errno = errno;

	if (!getrlimit(resource, &rlim)) {
		rlim.rlim_cur += increase;
		if (rlim.rlim_cur >= rlim.rlim_max)
			rlim.rlim_max = rlim.rlim_cur + increase;

		if (!setrlimit(resource, &rlim))
			return 1;
	}

	errno = save_errno;
	return 0;
}

static int handle_open_failure(void)
{
	if (errno == ENFILE || errno == EMFILE)
		return increase_limit(RLIMIT_NOFILE, 16);
	return 0;
}

static int handle_mem_failure(size_t length)
{
	if (errno == ENFILE)
		return handle_open_failure();
	else if (errno == ENOMEM)
		return increase_limit(RLIMIT_MEMLOCK, 2 * length);
	return 0;
}
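
/*
 * The my_*() wrappers below retry a failed call after raising the
 * relevant resource limit: open()/fopen()/socket()/accept() retry once
 * ENFILE/EMFILE has bumped RLIMIT_NOFILE by 16, and mmap()/mlock() retry
 * once ENOMEM has bumped RLIMIT_MEMLOCK by twice the requested length.
 */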

static FILE *my_fopen(const char *path, const char *mode)
{
	FILE *fp;

	do {
		fp = fopen(path, mode);
	} while (fp == NULL && handle_open_failure());

	return fp;
}

static int my_open(const char *path, int flags)
{
	int fd;

	do {
		fd = open(path, flags);
	} while (fd < 0 && handle_open_failure());

	return fd;
}

static int my_socket(int domain, int type, int protocol)
{
	int fd;

	do {
		fd = socket(domain, type, protocol);
	} while (fd < 0 && handle_open_failure());

	return fd;
}

static int my_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)
{
	int fd;

	do {
		fd = accept(sockfd, addr, addrlen);
	} while (fd < 0 && handle_open_failure());

	return fd;
}

static void *my_mmap(void *addr, size_t length, int prot, int flags, int fd,
		     off_t offset)
{
	void *new;

	do {
		new = mmap(addr, length, prot, flags, fd, offset);
	} while (new == MAP_FAILED && handle_mem_failure(length));

	return new;
}

static int my_mlock(const void *addr, size_t len)
{
	int ret;

	do {
		ret = mlock(addr, len);
	} while (ret < 0 && handle_mem_failure(len));

	return ret;
}

static int setup_mmap(int fd, unsigned int maxlen, struct mmap_info *mip)
{
	if (mip->fs_off + maxlen > mip->fs_buf_len) {
		unsigned long nr = max(16, mip->buf_nr);

		if (mip->fs_buf) {
			munlock(mip->fs_buf, mip->fs_buf_len);
			munmap(mip->fs_buf, mip->fs_buf_len);
			mip->fs_buf = NULL;
		}

		mip->fs_off = mip->fs_size & (mip->pagesize - 1);
		mip->fs_buf_len = (nr * mip->buf_size) - mip->fs_off;
		mip->fs_max_size += mip->fs_buf_len;

		if (ftruncate(fd, mip->fs_max_size) < 0) {
			perror("setup_mmap: ftruncate");
			return 1;
		}

		mip->fs_buf = my_mmap(NULL, mip->fs_buf_len, PROT_WRITE,
				      MAP_SHARED, fd,
				      mip->fs_size - mip->fs_off);
		if (mip->fs_buf == MAP_FAILED) {
			perror("setup_mmap: mmap");
			return 1;
		}
		my_mlock(mip->fs_buf, mip->fs_buf_len);
	}

	return 0;
}
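
/*
 * Worked example of the arithmetic above, assuming a 4096-byte page and
 * the default 4 x 512 KiB sub-buffers (so nr is raised to 16): if
 * fs_size is 12588 bytes when the current window is exhausted, then
 * fs_off = 12588 & 4095 = 300, the new mapping starts at the page-aligned
 * file offset 12288 (fs_size - fs_off), and it spans
 * fs_buf_len = (16 * 524288) - 300 bytes.
 */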

static int __stop_trace(int fd)
{
	/*
	 * Should be stopped, don't complain if it isn't
	 */
	ioctl(fd, BLKTRACESTOP);
	return ioctl(fd, BLKTRACETEARDOWN);
}
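
/*
 * A minimal sketch of how __stop_trace() is used to clean up after a
 * previous run (what the -k option does); the helper and its device
 * path argument are illustrative only:
 */
static __attribute__((__unused__)) int kill_trace_example(const char *dev)
{
	int fd = my_open(dev, O_RDONLY | O_NONBLOCK);

	if (fd < 0)
		return errno;

	__stop_trace(fd);	/* BLKTRACESTOP, then BLKTRACETEARDOWN */
	close(fd);
	return 0;
}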

static int write_data(char *buf, int len)
{
	int ret;

rewrite:
	ret = fwrite(buf, len, 1, pfp);
	if (ferror(pfp) || ret != 1) {
		if (errno == EINTR) {
			clearerr(pfp);
			goto rewrite;
		}

		if (!piped_output || (errno != EPIPE && errno != EBADF)) {
			fprintf(stderr, "write(%d) failed: %d/%s\n",
				len, errno, strerror(errno));
		}
		goto err;
	}

	fflush(pfp);
	return 0;

err:
	clearerr(pfp);
	return 1;
}

/*
 * Returns the number of bytes read (successfully)
 */
static int __net_recv_data(int fd, void *buf, unsigned int len)
{
	unsigned int bytes_left = len;

	while (bytes_left && !done) {
		int ret = recv(fd, buf, bytes_left, MSG_WAITALL);

		if (ret == 0)
			break;
		else if (ret < 0) {
			if (errno == EAGAIN) {
				usleep(50);
				continue;
			}
			perror("server: net_recv_data: recv failed");
			break;
		} else {
			buf += ret;
			bytes_left -= ret;
		}
	}

	return len - bytes_left;
}

static int net_recv_data(int fd, void *buf, unsigned int len)
{
	return __net_recv_data(fd, buf, len);
}

/*
 * Returns number of bytes written
 */
static int net_send_data(int fd, void *buf, unsigned int buf_len)
{
	int ret;
	unsigned int bytes_left = buf_len;

	while (bytes_left) {
		ret = send(fd, buf, bytes_left, 0);
		if (ret < 0) {
			perror("send");
			break;
		}

		buf += ret;
		bytes_left -= ret;
	}

	return buf_len - bytes_left;
}

static int net_send_header(int fd, int cpu, char *buts_name, int len)
{
	struct blktrace_net_hdr hdr;

	memset(&hdr, 0, sizeof(hdr));

	hdr.magic = BLK_IO_TRACE_MAGIC;
	strncpy(hdr.buts_name, buts_name, sizeof(hdr.buts_name));
	hdr.buts_name[sizeof(hdr.buts_name) - 1] = '\0';
	hdr.cpu = cpu;
	hdr.max_cpus = ncpus;
	hdr.len = len;
	hdr.cl_id = getpid();
	hdr.buf_size = buf_size;
	hdr.buf_nr = buf_nr;
	hdr.page_size = pagesize;

	return net_send_data(fd, &hdr, sizeof(hdr)) != sizeof(hdr);
}

static void net_send_open_close(int fd, int cpu, char *buts_name, int len)
{
	struct blktrace_net_hdr ret_hdr;

	net_send_header(fd, cpu, buts_name, len);
	net_recv_data(fd, &ret_hdr, sizeof(ret_hdr));
}

static void net_send_open(int fd, int cpu, char *buts_name)
{
	net_send_open_close(fd, cpu, buts_name, 0);
}

static void net_send_close(int fd, char *buts_name, int drops)
{
	/*
	 * Overload CPU w/ number of drops
	 *
	 * XXX: Need to clear/set done around call - done=1 (which
	 * is true here) stops reads from happening... :-(
	 */
	done = 0;
	net_send_open_close(fd, drops, buts_name, 1);
	done = 1;
}

static void ack_open_close(int fd, char *buts_name)
{
	net_send_header(fd, 0, buts_name, 2);
}

static void net_send_drops(int fd)
{
	struct list_head *p;

	__list_for_each(p, &devpaths) {
		struct devpath *dpp = list_entry(p, struct devpath, head);

		net_send_close(fd, dpp->buts_name, dpp->drops);
	}
}

/*
 * Returns:
 *	 0: "EOF"
 *	 1: OK
 *	-1: Error
 */
static int net_get_header(struct cl_conn *nc, struct blktrace_net_hdr *bnh)
{
	int bytes_read;
	int fl = fcntl(nc->fd, F_GETFL);

	fcntl(nc->fd, F_SETFL, fl | O_NONBLOCK);
	bytes_read = __net_recv_data(nc->fd, bnh, sizeof(*bnh));
	fcntl(nc->fd, F_SETFL, fl & ~O_NONBLOCK);

	if (bytes_read == sizeof(*bnh))
		return 1;
	else if (bytes_read == 0)
		return 0;
	else
		return -1;
}

static int net_setup_addr(void)
{
	struct sockaddr_in *addr = &hostname_addr;

	memset(addr, 0, sizeof(*addr));
	addr->sin_family = AF_INET;
	addr->sin_port = htons(net_port);

	if (inet_aton(hostname, &addr->sin_addr) != 1) {
		struct hostent *hent;
retry:
		hent = gethostbyname(hostname);
		if (!hent) {
			if (h_errno == TRY_AGAIN) {
				usleep(100);
				goto retry;
			} else if (h_errno == NO_RECOVERY) {
				fprintf(stderr, "gethostbyname(%s): "
					"non-recoverable error encountered\n",
					hostname);
			} else {
				/*
				 * HOST_NOT_FOUND, NO_ADDRESS or NO_DATA
				 */
				fprintf(stderr, "Host %s not found\n",
					hostname);
			}
			return 1;
		}

		memcpy(&addr->sin_addr, hent->h_addr, 4);
		strcpy(hostname, hent->h_name);
	}

	return 0;
}

static int net_setup_client(void)
{
	int fd;
	struct sockaddr_in *addr = &hostname_addr;

	fd = my_socket(AF_INET, SOCK_STREAM, 0);
	if (fd < 0) {
		perror("client: socket");
		return -1;
	}

	if (connect(fd, (struct sockaddr *)addr, sizeof(*addr)) < 0) {
		if (errno == ECONNREFUSED)
			fprintf(stderr,
				"\nclient: Connection to %s refused, "
				"perhaps the server is not started?\n\n",
				hostname);
		else
			perror("client: connect");

		close(fd);
		return -1;
	}

	return fd;
}

static int open_client_connections(void)
{
	int cpu;

	cl_fds = calloc(ncpus, sizeof(*cl_fds));
	for (cpu = 0; cpu < ncpus; cpu++) {
		cl_fds[cpu] = net_setup_client();
		if (cl_fds[cpu] < 0)
			goto err;
	}
	return 0;

err:
	/*
	 * Close the connections opened so far (cl_fds[cpu] itself
	 * failed and holds -1).
	 */
	while (cpu-- > 0)
		close(cl_fds[cpu]);
	free(cl_fds);
	cl_fds = NULL;
	return 1;
}

static void close_client_connections(void)
{
	if (cl_fds) {
		int cpu, *fdp;

		for (cpu = 0, fdp = cl_fds; cpu < ncpus; cpu++, fdp++) {
			if (*fdp >= 0) {
				net_send_drops(*fdp);
				net_close_connection(fdp);
			}
		}
		free(cl_fds);
	}
}

static void setup_buts(void)
{
	struct list_head *p;

	__list_for_each(p, &devpaths) {
		struct blk_user_trace_setup buts;
		struct devpath *dpp = list_entry(p, struct devpath, head);

		memset(&buts, 0, sizeof(buts));
		buts.buf_size = buf_size;
		buts.buf_nr = buf_nr;
		buts.act_mask = act_mask;

		if (ioctl(dpp->fd, BLKTRACESETUP, &buts) >= 0) {
			dpp->ncpus = ncpus;
			dpp->buts_name = strdup(buts.name);
			if (dpp->stats)
				free(dpp->stats);
			dpp->stats = calloc(dpp->ncpus, sizeof(*dpp->stats));
			memset(dpp->stats, 0, dpp->ncpus * sizeof(*dpp->stats));
		} else
			fprintf(stderr, "BLKTRACESETUP(2) %s failed: %d/%s\n",
				dpp->path, errno, strerror(errno));
	}
}

static void start_buts(void)
{
	struct list_head *p;

	__list_for_each(p, &devpaths) {
		struct devpath *dpp = list_entry(p, struct devpath, head);

		if (ioctl(dpp->fd, BLKTRACESTART) < 0) {
			fprintf(stderr, "BLKTRACESTART %s failed: %d/%s\n",
				dpp->path, errno, strerror(errno));
		}
	}
}

static int get_drops(struct devpath *dpp)
{
	int fd, drops = 0;
	char fn[MAXPATHLEN + 64], tmp[256];

	snprintf(fn, sizeof(fn), "%s/block/%s/dropped", debugfs_path,
		 dpp->buts_name);

	fd = my_open(fn, O_RDONLY);
	if (fd < 0) {
		/*
		 * This may be ok: the kernel may not support
		 * dropped counts.
		 */
		if (errno != ENOENT)
			fprintf(stderr, "Could not open %s: %d/%s\n",
				fn, errno, strerror(errno));
		return 0;
	} else if (read(fd, tmp, sizeof(tmp)) < 0) {
		fprintf(stderr, "Could not read %s: %d/%s\n",
			fn, errno, strerror(errno));
	} else
		drops = atoi(tmp);
	close(fd);

	return drops;
}

static void get_all_drops(void)
{
	struct list_head *p;

	__list_for_each(p, &devpaths) {
		struct devpath *dpp = list_entry(p, struct devpath, head);

		dpp->drops = get_drops(dpp);
	}
}

static inline struct trace_buf *alloc_trace_buf(int cpu, int bufsize)
{
	struct trace_buf *tbp;

	tbp = malloc(sizeof(*tbp) + bufsize);
	INIT_LIST_HEAD(&tbp->head);
	tbp->len = 0;
	tbp->buf = (void *)(tbp + 1);
	tbp->cpu = cpu;
	tbp->dpp = NULL;	/* Will be set when tbp is added */

	return tbp;
}

static void free_tracer_heads(struct devpath *dpp)
{
	int cpu;
	struct tracer_devpath_head *hd;

	for (cpu = 0, hd = dpp->heads; cpu < ncpus; cpu++, hd++) {
		if (hd->prev)
			free(hd->prev);

		pthread_mutex_destroy(&hd->mutex);
	}
	free(dpp->heads);
}

static int setup_tracer_devpaths(void)
{
	struct list_head *p;

	if (net_client_use_send())
		if (open_client_connections())
			return 1;

	__list_for_each(p, &devpaths) {
		int cpu;
		struct tracer_devpath_head *hd;
		struct devpath *dpp = list_entry(p, struct devpath, head);

		dpp->heads = calloc(ncpus, sizeof(struct tracer_devpath_head));
		for (cpu = 0, hd = dpp->heads; cpu < ncpus; cpu++, hd++) {
			INIT_LIST_HEAD(&hd->head);
			pthread_mutex_init(&hd->mutex, NULL);
			hd->prev = NULL;
		}
	}

	return 0;
}

static inline void add_trace_buf(struct devpath *dpp, int cpu,
						struct trace_buf **tbpp)
{
	struct trace_buf *tbp = *tbpp;
	struct tracer_devpath_head *hd = &dpp->heads[cpu];

	tbp->dpp = dpp;

	pthread_mutex_lock(&hd->mutex);
	list_add_tail(&tbp->head, &hd->head);
	pthread_mutex_unlock(&hd->mutex);

	*tbpp = alloc_trace_buf(cpu, buf_size);
}

static inline void incr_entries(int entries_handled)
{
	pthread_mutex_lock(&dp_mutex);
	if (dp_entries == 0)
		pthread_cond_signal(&dp_cond);
	dp_entries += entries_handled;
	pthread_mutex_unlock(&dp_mutex);
}

static void decr_entries(int handled)
{
	pthread_mutex_lock(&dp_mutex);
	dp_entries -= handled;
	pthread_mutex_unlock(&dp_mutex);
}

static int wait_empty_entries(void)
{
	pthread_mutex_lock(&dp_mutex);
	while (!done && dp_entries == 0)
		t_pthread_cond_wait(&dp_cond, &dp_mutex);
	pthread_mutex_unlock(&dp_mutex);

	return !done;
}

static int add_devpath(char *path)
{
	int fd;
	struct devpath *dpp;
	struct list_head *p;

	/*
	 * Verify device is not duplicated
	 */
	__list_for_each(p, &devpaths) {
		struct devpath *tmp = list_entry(p, struct devpath, head);

		if (!strcmp(tmp->path, path))
			return 0;
	}

	/*
	 * Verify device is valid before going too far
	 */
	fd = my_open(path, O_RDONLY | O_NONBLOCK);
	if (fd < 0) {
		fprintf(stderr, "Invalid path %s specified: %d/%s\n",
			path, errno, strerror(errno));
		return 1;
	}

	dpp = malloc(sizeof(*dpp));
	memset(dpp, 0, sizeof(*dpp));
	dpp->path = strdup(path);
	dpp->fd = fd;
	dpp->idx = ndevs++;
	list_add_tail(&dpp->head, &devpaths);

	return 0;
}
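
/*
 * Note that naming the same device more than once is not an error: the
 * duplicate-path check in add_devpath() above silently ignores it, so an
 * (illustrative) "blktrace -d /dev/sda -d /dev/sda" traces sda only once.
 */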

static void rel_devpaths(void)
{
	struct list_head *p, *q;

	list_for_each_safe(p, q, &devpaths) {
		struct devpath *dpp = list_entry(p, struct devpath, head);

		list_del(&dpp->head);
		__stop_trace(dpp->fd);
		close(dpp->fd);

		if (dpp->heads)
			free_tracer_heads(dpp);

		dpp_free(dpp);
		ndevs--;
	}
}

static int flush_subbuf_net(struct trace_buf *tbp)
{
	int fd = cl_fds[tbp->cpu];
	struct devpath *dpp = tbp->dpp;

	if (net_send_header(fd, tbp->cpu, dpp->buts_name, tbp->len))
		return 1;
	else if (net_send_data(fd, tbp->buf, tbp->len) != tbp->len)
		return 1;

	return 0;
}

static int
handle_list_net(__attribute__((__unused__))struct tracer_devpath_head *hd,
		struct list_head *list)
{
	struct trace_buf *tbp;
	struct list_head *p, *q;
	int entries_handled = 0;

	list_for_each_safe(p, q, list) {
		tbp = list_entry(p, struct trace_buf, head);

		list_del(&tbp->head);
		entries_handled++;

		if (cl_fds[tbp->cpu] >= 0) {
			if (flush_subbuf_net(tbp)) {
				close(cl_fds[tbp->cpu]);
				cl_fds[tbp->cpu] = -1;
			}
		}

		free(tbp);
	}

	return entries_handled;
}

/*
 * Tack 'tbp's buf onto the tail of 'prev's buf
 */
static struct trace_buf *tb_combine(struct trace_buf *prev,
				    struct trace_buf *tbp)
{
	unsigned long tot_len;

	tot_len = prev->len + tbp->len;
	if (tot_len > buf_size) {
		/*
		 * tbp->head isn't connected (it was 'prev'
		 * so it had been taken off of the list
		 * before). Therefore, we can realloc
		 * the whole structure, as the other fields
		 * are "static".
		 *
		 * realloc() must be handed the start of the
		 * allocation ('prev'), not 'prev->buf', which
		 * points into the middle of it.
		 */
		prev = realloc(prev, sizeof(*prev) + tot_len);
		prev->buf = (void *)(prev + 1);
	}

	memcpy(prev->buf + prev->len, tbp->buf, tbp->len);
	prev->len = tot_len;

	free(tbp);
	return prev;
}

static int handle_list_file(struct tracer_devpath_head *hd,
			    struct list_head *list)
{
	int off, t_len, nevents;
	struct blk_io_trace *t;
	struct list_head *p, *q;
	int entries_handled = 0;
	struct trace_buf *tbp, *prev;

	prev = hd->prev;
	list_for_each_safe(p, q, list) {
		tbp = list_entry(p, struct trace_buf, head);
		list_del(&tbp->head);
		entries_handled++;

		/*
		 * If there was some leftover before, tack this new
		 * entry onto the tail of the previous one.
		 */
		if (prev)
			tbp = tb_combine(prev, tbp);

		/*
		 * See how many whole traces there are - send them
		 * all out in one go.
		 */
		off = 0;
		nevents = 0;
		while (off + (int)sizeof(*t) <= tbp->len) {
			t = (struct blk_io_trace *)(tbp->buf + off);
			t_len = sizeof(*t) + t->pdu_len;
			if (off + t_len > tbp->len)
				break;

			off += t_len;
			nevents++;
		}
		if (nevents)
			pdc_nev_update(tbp->dpp, tbp->cpu, nevents);

		/*
		 * Write any full set of traces, any remaining data is kept
		 * for the next pass.
		 */
		if (off) {
			if (write_data(tbp->buf, off) || off == tbp->len) {
				free(tbp);
				prev = NULL;
			} else {
				/*
				 * Move valid data to beginning of buffer
				 */
				tbp->len -= off;
				memmove(tbp->buf, tbp->buf + off, tbp->len);
				prev = tbp;
			}
		} else
			prev = tbp;
	}
	hd->prev = prev;

	return entries_handled;
}

static void __process_trace_bufs(void)
{
	int cpu;
	struct list_head *p;
	struct list_head list;
	int handled = 0;

	__list_for_each(p, &devpaths) {
		struct devpath *dpp = list_entry(p, struct devpath, head);
		struct tracer_devpath_head *hd = dpp->heads;

		for (cpu = 0; cpu < ncpus; cpu++, hd++) {
			pthread_mutex_lock(&hd->mutex);
			if (list_empty(&hd->head)) {
				pthread_mutex_unlock(&hd->mutex);
				continue;
			}

			list_replace_init(&hd->head, &list);
			pthread_mutex_unlock(&hd->mutex);

			handled += handle_list(hd, &list);
		}
	}

	if (handled)
		decr_entries(handled);
}

static void process_trace_bufs(void)
{
	while (wait_empty_entries())
		__process_trace_bufs();
}

static void clean_trace_bufs(void)
{
	/*
	 * No mutex needed here: we're only reading from the lists,
	 * tracers are done
	 */
	while (dp_entries)
		__process_trace_bufs();
}

static inline void read_err(int cpu, char *ifn)
{
	if (errno != EAGAIN)
		fprintf(stderr, "Thread %d failed read of %s: %d/%s\n",
			cpu, ifn, errno, strerror(errno));
}

static int net_sendfile(struct io_info *iop)
{
	int ret;

	ret = sendfile(iop->ofd, iop->ifd, NULL, iop->ready);
	if (ret < 0) {
		perror("sendfile");
		return 1;
	} else if (ret < (int)iop->ready) {
		fprintf(stderr, "short sendfile send (%d of %d)\n",
			ret, iop->ready);
		return 1;
	}

	return 0;
}

static inline int net_sendfile_data(struct tracer *tp, struct io_info *iop)
{
	struct devpath *dpp = iop->dpp;

	if (net_send_header(iop->ofd, tp->cpu, dpp->buts_name, iop->ready))
		return 1;
	return net_sendfile(iop);
}
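
/*
 * Network clients move trace data in one of two ways: with sendfile()
 * enabled (the default), data goes straight from the relay file to the
 * socket via net_sendfile_data() above; with -s, tracers instead read
 * into trace_bufs which handle_list_net() pushes out with send() (see
 * use_tracer_devpaths() and net_client_use_send() above).
 */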

static int fill_ofname(struct io_info *iop, int cpu)
{
	int len;
	struct stat sb;
	char *dst = iop->ofn;

	if (output_dir)
		len = snprintf(iop->ofn, sizeof(iop->ofn), "%s/", output_dir);
	else
		len = snprintf(iop->ofn, sizeof(iop->ofn), "./");

	if (net_mode == Net_server) {
		struct cl_conn *nc = iop->nc;

		len += sprintf(dst + len, "%s-", nc->ch->hostname);
		len += strftime(dst + len, 64, "%F-%T/",
				gmtime(&iop->dpp->cl_connect_time));
	}

	if (stat(iop->ofn, &sb) < 0) {
		if (errno != ENOENT) {
			fprintf(stderr,
				"Destination dir %s stat failed: %d/%s\n",
				iop->ofn, errno, strerror(errno));
			return 1;
		}
		/*
		 * There is no synchronization between multiple threads
		 * trying to create the directory at once.  It's harmless
		 * to let them try, so just detect the problem and move on.
		 */
		if (mkdir(iop->ofn, 0755) < 0 && errno != EEXIST) {
			fprintf(stderr,
				"Destination dir %s can't be made: %d/%s\n",
				iop->ofn, errno, strerror(errno));
			return 1;
		}
	}

	if (output_name)
		snprintf(iop->ofn + len, sizeof(iop->ofn) - len,
			 "%s.blktrace.%d", output_name, cpu);
	else
		snprintf(iop->ofn + len, sizeof(iop->ofn) - len,
			 "%s.blktrace.%d", iop->dpp->buts_name, cpu);

	return 0;
}
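
/*
 * Examples of the names generated above (illustrative): a local trace of
 * device sdX writes CPU 2's data to "./sdX.blktrace.2" ("<dir>/..." with
 * -D, "<output>.blktrace.2" with -o). In server mode a per-connection
 * "<hostname>-<YYYY-MM-DD>-<HH:MM:SS>/" directory is prepended.
 */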

static int set_vbuf(struct io_info *iop, int mode, size_t size)
{
	iop->obuf = malloc(size);
	if (setvbuf(iop->ofp, iop->obuf, mode, size) != 0) {
		fprintf(stderr, "setvbuf(%s, %d) failed: %d/%s\n",
			iop->dpp->path, (int)size, errno,
			strerror(errno));
		free(iop->obuf);
		return 1;
	}

	return 0;
}

static int iop_open(struct io_info *iop, int cpu)
{
	iop->ofd = -1;
	if (fill_ofname(iop, cpu))
		return 1;

	iop->ofp = my_fopen(iop->ofn, "w+");
	if (iop->ofp == NULL) {
		fprintf(stderr, "Open output file %s failed: %d/%s\n",
			iop->ofn, errno, strerror(errno));
		return 1;
	}

	if (set_vbuf(iop, _IOLBF, FILE_VBUF_SIZE)) {
		fprintf(stderr, "set_vbuf for file %s failed: %d/%s\n",
			iop->ofn, errno, strerror(errno));
		fclose(iop->ofp);
		return 1;
	}

	iop->ofd = fileno(iop->ofp);
	return 0;
}

static void close_iop(struct io_info *iop)
{
	struct mmap_info *mip = &iop->mmap_info;

	if (mip->fs_buf)
		munmap(mip->fs_buf, mip->fs_buf_len);

	if (!piped_output) {
		if (ftruncate(fileno(iop->ofp), mip->fs_size) < 0) {
			fprintf(stderr,
				"Ignoring err: ftruncate(%s): %d/%s\n",
				iop->ofn, errno, strerror(errno));
		}
	}

	if (iop->ofp)
		fclose(iop->ofp);
	if (iop->obuf)
		free(iop->obuf);
}

static void close_ios(struct tracer *tp)
{
	while (tp->nios > 0) {
		struct io_info *iop = &tp->ios[--tp->nios];

		iop->dpp->drops = get_drops(iop->dpp);
		if (iop->ifd >= 0)
			close(iop->ifd);

		if (iop->ofp)
			close_iop(iop);
		else if (iop->ofd >= 0) {
			struct devpath *dpp = iop->dpp;

			net_send_close(iop->ofd, dpp->buts_name, dpp->drops);
			net_close_connection(&iop->ofd);
		}
	}

	free(tp->ios);
	free(tp->pfds);
}

static int open_ios(struct tracer *tp)
{
	struct pollfd *pfd;
	struct io_info *iop;
	struct list_head *p;

	tp->ios = calloc(ndevs, sizeof(struct io_info));
	memset(tp->ios, 0, ndevs * sizeof(struct io_info));

	tp->pfds = calloc(ndevs, sizeof(struct pollfd));
	memset(tp->pfds, 0, ndevs * sizeof(struct pollfd));

	tp->nios = 0;
	iop = tp->ios;
	pfd = tp->pfds;
	__list_for_each(p, &devpaths) {
		struct devpath *dpp = list_entry(p, struct devpath, head);

		iop->dpp = dpp;
		iop->ofd = -1;
		snprintf(iop->ifn, sizeof(iop->ifn), "%s/block/%s/trace%d",
			debugfs_path, dpp->buts_name, tp->cpu);

		iop->ifd = my_open(iop->ifn, O_RDONLY | O_NONBLOCK);
		if (iop->ifd < 0) {
			fprintf(stderr, "Thread %d failed open %s: %d/%s\n",
				tp->cpu, iop->ifn, errno, strerror(errno));
			return 1;
		}

		init_mmap_info(&iop->mmap_info);

		pfd->fd = iop->ifd;
		pfd->events = POLLIN;

		if (piped_output)
			;
		else if (net_client_use_sendfile()) {
			iop->ofd = net_setup_client();
			if (iop->ofd < 0)
				goto err;
			net_send_open(iop->ofd, tp->cpu, dpp->buts_name);
		} else if (net_mode == Net_none) {
			if (iop_open(iop, tp->cpu))
				goto err;
		} else {
			/*
			 * This ensures that the server knows about all
			 * connections & devices before _any_ closes
			 */
			net_send_open(cl_fds[tp->cpu], tp->cpu, dpp->buts_name);
		}

		pfd++;
		iop++;
		tp->nios++;
	}

	return 0;

err:
	close(iop->ifd);	/* tp->nios _not_ bumped */
	close_ios(tp);
	return 1;
}

static int handle_pfds_file(struct tracer *tp, int nevs, int force_read)
{
	struct mmap_info *mip;
	int i, ret, nentries = 0;
	struct pollfd *pfd = tp->pfds;
	struct io_info *iop = tp->ios;

	for (i = 0; nevs > 0 && i < ndevs; i++, pfd++, iop++) {
		if (pfd->revents & POLLIN || force_read) {
			mip = &iop->mmap_info;

			ret = setup_mmap(iop->ofd, buf_size, mip);
			if (ret) {	/* setup_mmap() returns 1 on error */
				pfd->events = 0;
				break;
			}

			ret = read(iop->ifd, mip->fs_buf + mip->fs_off,
				   buf_size);
			if (ret > 0) {
				pdc_dr_update(iop->dpp, tp->cpu, ret);
				mip->fs_size += ret;
				mip->fs_off += ret;
				nentries++;
			} else if (ret == 0) {
				/*
				 * Short reads after we're done stop us
				 * from trying reads.
				 */
				if (tp->is_done)
					clear_events(pfd);
			} else {
				read_err(tp->cpu, iop->ifn);
				if (errno != EAGAIN || tp->is_done)
					clear_events(pfd);
			}
			nevs--;
		}
	}

	return nentries;
}

static int handle_pfds_netclient(struct tracer *tp, int nevs, int force_read)
{
	struct stat sb;
	int i, nentries = 0;
	struct pollfd *pfd = tp->pfds;
	struct io_info *iop = tp->ios;

	for (i = 0; i < ndevs; i++, pfd++, iop++) {
1712                 if (pfd->revents & POLLIN || force_read) {
1713                         if (fstat(iop->ifd, &sb) < 0) {
1714                                 perror(iop->ifn);
1715                                 pfd->events = 0;
1716                         } else if (sb.st_size > (off_t)iop->data_queued) {
1717                                 iop->ready = sb.st_size - iop->data_queued;
1718                                 iop->data_queued = sb.st_size;
1719
1720                                 if (!net_sendfile_data(tp, iop)) {
1721                                         pdc_dr_update(iop->dpp, tp->cpu,
1722                                                       iop->ready);
1723                                         nentries++;
1724                                 } else
1725                                         clear_events(pfd);
1726                         }
1727                         if (--nevs == 0)
1728                                 break;
1729                 }
1730         }
1731
1732         if (nentries)
1733                 incr_entries(nentries);
1734
1735         return nentries;
1736 }
1737
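/*
 * PFD handler for piped output and network send mode: read pending
 * data into a freshly allocated trace_buf and queue it with
 * add_trace_buf() for the processing thread. add_trace_buf() appears
 * to hand back a new buffer through &tbp, so only the final, unqueued
 * buffer is freed on exit.
 */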
1738 static int handle_pfds_entries(struct tracer *tp, int nevs, int force_read)
1739 {
1740         int i, nentries = 0;
1741         struct trace_buf *tbp;
1742         struct pollfd *pfd = tp->pfds;
1743         struct io_info *iop = tp->ios;
1744
1745         tbp = alloc_trace_buf(tp->cpu, buf_size);
1746         for (i = 0; i < ndevs; i++, pfd++, iop++) {
1747                 if (pfd->revents & POLLIN || force_read) {
1748                         tbp->len = read(iop->ifd, tbp->buf, buf_size);
1749                         if (tbp->len > 0) {
1750                                 pdc_dr_update(iop->dpp, tp->cpu, tbp->len);
1751                                 add_trace_buf(iop->dpp, tp->cpu, &tbp);
1752                                 nentries++;
1753                         } else if (tbp->len == 0) {
1754                                 /*
1755                                  * A zero-length read once the trace is
1756                                  * done means no more data: stop polling.
1757                                  */
1758                                 if (tp->is_done)
1759                                         clear_events(pfd);
1760                         } else {
1761                                 read_err(tp->cpu, iop->ifn);
1762                                 if (errno != EAGAIN || tp->is_done)
1763                                         clear_events(pfd);
1764                         }
1765                         if (!piped_output && --nevs == 0)
1766                                 break;
1767                 }
1768         }
1769         free(tbp);
1770
1771         if (nentries)
1772                 incr_entries(nentries);
1773
1774         return nentries;
1775 }
1776
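/*
 * Per-CPU tracer thread body: pin ourselves to our CPU, open the
 * per-device I/O contexts, then poll and drain trace data until told
 * to stop. Piped output uses a short (50ms) poll timeout so partial
 * buffers are flushed promptly; otherwise we wake twice a second.
 * Once the trace is stopped, keep reading until a short read shows
 * the relay channels are empty.
 */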
1777 static void *thread_main(void *arg)
1778 {
1779         int ret, ndone, to_val;
1780         struct tracer *tp = arg;
1781
1782         ret = lock_on_cpu(tp->cpu);
1783         if (ret)
1784                 goto err;
1785
1786         ret = open_ios(tp);
1787         if (ret)
1788                 goto err;
1789
1790         if (piped_output)
1791                 to_val = 50;            /* Frequent partial handles */
1792         else
1793                 to_val = 500;           /* 1/2 second intervals */
1794
1796         tracer_signal_ready(tp, Th_running, 0);
1797         tracer_wait_unblock(tp);
1798
        while (!tp->is_done) {
                ndone = poll(tp->pfds, ndevs, to_val);
                /*
                 * Handle poll errors first: ndone < 0 is non-zero and
                 * previously fell through to handle_pfds().
                 */
                if (ndone < 0) {
                        if (errno != EINTR)
                                fprintf(stderr,
                                        "Thread %d poll failed: %d/%s\n",
                                        tp->cpu, errno, strerror(errno));
                } else if (ndone || piped_output)
                        (void)handle_pfds(tp, ndone, piped_output);
        }
1807
1808         /*
1809          * Trace is stopped, pull data until we get a short read
1810          */
1811         while (handle_pfds(tp, ndevs, 1) > 0)
1812                 ;
1813
1814         close_ios(tp);
1815         tracer_signal_ready(tp, Th_leaving, 0);
1816         return NULL;
1817
1818 err:
1819         tracer_signal_ready(tp, Th_error, ret);
1820         return NULL;
1821 }
1822
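/*
 * Spawn the tracer thread for one CPU and queue it on the global
 * tracers list.
 */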
1823 static int start_tracer(int cpu)
1824 {
        int ret;
        struct tracer *tp;
1826
1827         tp = malloc(sizeof(*tp));
1828         memset(tp, 0, sizeof(*tp));
1829
1830         INIT_LIST_HEAD(&tp->head);
1831         tp->status = 0;
1832         tp->cpu = cpu;
1833
        /*
         * pthread_create() returns its error code directly and does
         * not set errno, so report the return value.
         */
        ret = pthread_create(&tp->thread, NULL, thread_main, tp);
        if (ret) {
                fprintf(stderr, "FAILED to start thread on CPU %d: %d/%s\n",
                        cpu, ret, strerror(ret));
1837                 free(tp);
1838                 return 1;
1839         }
1840
1841         list_add_tail(&tp->head, &tracers);
1842         return 0;
1843 }
1844
1845 static void start_tracers(void)
1846 {
1847         int cpu;
1848         struct list_head *p;
1849
1850         for (cpu = 0; cpu < ncpus; cpu++)
1851                 if (start_tracer(cpu))
1852                         break;
1853
1854         wait_tracers_ready(cpu);
1855
1856         __list_for_each(p, &tracers) {
1857                 struct tracer *tp = list_entry(p, struct tracer, head);
1858                 if (tp->status)
1859                         fprintf(stderr,
1860                                 "FAILED to start thread on CPU %d: %d/%s\n",
1861                                 tp->cpu, tp->status, strerror(tp->status));
1862         }
1863 }
1864
1865 static void stop_tracers(void)
1866 {
1867         struct list_head *p;
1868
1869         /*
1870          * Stop the tracing - makes the tracer threads clean up quicker.
1871          */
1872         __list_for_each(p, &devpaths) {
1873                 struct devpath *dpp = list_entry(p, struct devpath, head);
1874                 (void)ioctl(dpp->fd, BLKTRACESTOP);
1875         }
1876
1877         /*
1878          * Tell each tracer to quit
1879          */
1880         __list_for_each(p, &tracers) {
1881                 struct tracer *tp = list_entry(p, struct tracer, head);
1882                 tp->is_done = 1;
1883         }
1884 }
1885
1886 static void del_tracers(void)
1887 {
1888         struct list_head *p, *q;
1889
1890         list_for_each_safe(p, q, &tracers) {
1891                 struct tracer *tp = list_entry(p, struct tracer, head);
1892
1893                 list_del(&tp->head);
1894                 free(tp);
1895         }
1896 }
1897
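/*
 * Reap the tracer threads: drain queued trace buffers first (when the
 * buffered devpath path is in use), wait for each thread to signal
 * Th_leaving, join them all, then collect final per-device drop counts.
 */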
1898 static void wait_tracers(void)
1899 {
1900         struct list_head *p;
1901
1902         if (use_tracer_devpaths())
1903                 process_trace_bufs();
1904
1905         wait_tracers_leaving();
1906
1907         __list_for_each(p, &tracers) {
1908                 int ret;
1909                 struct tracer *tp = list_entry(p, struct tracer, head);
1910
1911                 ret = pthread_join(tp->thread, NULL);
1912                 if (ret)
1913                         fprintf(stderr, "Thread join %d failed %d\n",
1914                                 tp->cpu, ret);
1915         }
1916
1917         if (use_tracer_devpaths())
1918                 clean_trace_bufs();
1919
1920         get_all_drops();
1921 }
1922
1923 static void exit_tracing(void)
1924 {
1925         signal(SIGINT, SIG_IGN);
1926         signal(SIGHUP, SIG_IGN);
1927         signal(SIGTERM, SIG_IGN);
1928         signal(SIGALRM, SIG_IGN);
1929
1930         stop_tracers();
1931         wait_tracers();
1932         del_tracers();
1933         rel_devpaths();
1934 }
1935
1936 static void handle_sigint(__attribute__((__unused__)) int sig)
1937 {
1938         done = 1;
1939         stop_tracers();
1940 }
1941
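/*
 * Print per-CPU and per-device event/byte totals. With piped output
 * the table would corrupt the trace stream on stdout, so it goes to
 * /dev/null and only the dropped-events warning (on stderr) survives.
 * Per-CPU event counts of zero are estimated from the bytes read.
 */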
1942 static void show_stats(struct list_head *devpaths)
1943 {
1944         FILE *ofp;
1945         struct list_head *p;
1946         unsigned long long nevents, data_read;
1947         unsigned long long total_drops = 0;
1948         unsigned long long total_events = 0;
1949
1950         if (piped_output)
1951                 ofp = my_fopen("/dev/null", "w");
1952         else
1953                 ofp = stdout;
1954
1955         __list_for_each(p, devpaths) {
1956                 int cpu;
1957                 struct pdc_stats *sp;
1958                 struct devpath *dpp = list_entry(p, struct devpath, head);
1959
1960                 if (net_mode == Net_server)
1961                         printf("server: end of run for %s:%s\n",
1962                                 dpp->ch->hostname, dpp->buts_name);
1963
1964                 data_read = 0;
1965                 nevents = 0;
1966
1967                 fprintf(ofp, "=== %s ===\n", dpp->buts_name);
1968                 for (cpu = 0, sp = dpp->stats; cpu < dpp->ncpus; cpu++, sp++) {
1969                         /*
1970                          * Estimate events if not known...
1971                          */
1972                         if (sp->nevents == 0) {
1973                                 sp->nevents = sp->data_read /
1974                                                 sizeof(struct blk_io_trace);
1975                         }
1976
1977                         fprintf(ofp,
1978                                 "  CPU%3d: %20llu events, %8llu KiB data\n",
1979                                 cpu, sp->nevents, (sp->data_read + 1023) >> 10);
1980
1981                         data_read += sp->data_read;
1982                         nevents += sp->nevents;
1983                 }
1984
1985                 fprintf(ofp, "  Total:  %20llu events (dropped %llu),"
1986                              " %8llu KiB data\n", nevents,
1987                              dpp->drops, (data_read + 1023) >> 10);
1988
1989                 total_drops += dpp->drops;
1990                 total_events += (nevents + dpp->drops);
1991         }
1992
1993         fflush(ofp);
1994         if (piped_output)
1995                 fclose(ofp);
1996
1997         if (total_drops) {
1998                 double drops_ratio = 1.0;
1999
2000                 if (total_events)
2001                         drops_ratio = (double)total_drops/(double)total_events;
2002
2003                 fprintf(stderr, "\nYou have %llu (%5.1lf%%) dropped events\n"
2004                                 "Consider using a larger buffer size (-b) "
2005                                 "and/or more buffers (-n)\n",
2006                         total_drops, 100.0 * drops_ratio);
2007         }
2008 }
2009
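/*
 * Parse command-line options, sanity-check the debugfs mount, and pick
 * the PFD handler the tracer threads will use based on the output
 * target: network (sendfile or send), piped stdout, or regular files.
 */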
2010 static int handle_args(int argc, char *argv[])
2011 {
2012         int c, i;
2013         struct statfs st;
2014         int act_mask_tmp = 0;
2015
2016         while ((c = getopt_long(argc, argv, S_OPTS, l_opts, NULL)) >= 0) {
2017                 switch (c) {
2018                 case 'a':
2019                         i = find_mask_map(optarg);
2020                         if (i < 0) {
2021                                 fprintf(stderr, "Invalid action mask %s\n",
2022                                         optarg);
2023                                 return 1;
2024                         }
2025                         act_mask_tmp |= i;
2026                         break;
2027
2028                 case 'A':
2029                         if ((sscanf(optarg, "%x", &i) != 1) ||
2030                                                         !valid_act_opt(i)) {
2031                                 fprintf(stderr,
2032                                         "Invalid set action mask %s/0x%x\n",
2033                                         optarg, i);
2034                                 return 1;
2035                         }
2036                         act_mask_tmp = i;
2037                         break;
2038
2039                 case 'd':
2040                         if (add_devpath(optarg) != 0)
2041                                 return 1;
2042                         break;
2043
2044                 case 'I': {
2045                         char dev_line[256];
2046                         FILE *ifp = my_fopen(optarg, "r");
2047
2048                         if (!ifp) {
2049                                 fprintf(stderr,
2050                                         "Invalid file for devices %s\n",
2051                                         optarg);
2052                                 return 1;
2053                         }
2054
                        /*
                         * Bound the scan to the buffer and don't leak
                         * ifp on either exit path.
                         */
                        while (fscanf(ifp, "%255s", dev_line) == 1)
                                if (add_devpath(dev_line) != 0) {
                                        fclose(ifp);
                                        return 1;
                                }
                        fclose(ifp);
                        break;
2059                 }
2060
2061                 case 'r':
2062                         debugfs_path = optarg;
2063                         break;
2064
2065                 case 'o':
2066                         output_name = optarg;
2067                         break;
2068                 case 'k':
2069                         kill_running_trace = 1;
2070                         break;
2071                 case 'w':
2072                         stop_watch = atoi(optarg);
2073                         if (stop_watch <= 0) {
2074                                 fprintf(stderr,
2075                                         "Invalid stopwatch value (%d secs)\n",
2076                                         stop_watch);
2077                                 return 1;
2078                         }
2079                         break;
2080                 case 'V':
2081                 case 'v':
2082                         printf("%s version %s\n", argv[0], blktrace_version);
2083                         exit(0);
2084                         /*NOTREACHED*/
2085                 case 'b':
2086                         buf_size = strtoul(optarg, NULL, 10);
2087                         if (buf_size <= 0 || buf_size > 16*1024) {
2088                                 fprintf(stderr, "Invalid buffer size "
2089                                         "(%lu KiB, must be 1..16384)\n", buf_size);
2090                                 return 1;
2091                         }
2092                         buf_size <<= 10;
2093                         break;
2094                 case 'n':
2095                         buf_nr = strtoul(optarg, NULL, 10);
2096                         if (buf_nr <= 0) {
2097                                 fprintf(stderr,
2098                                         "Invalid buffer nr (%lu)\n", buf_nr);
2099                                 return 1;
2100                         }
2101                         break;
2102                 case 'D':
2103                         output_dir = optarg;
2104                         break;
2105                 case 'h':
2106                         net_mode = Net_client;
                        /*
                         * optarg is user-controlled and hostname is a
                         * fixed-size buffer: copy with a bound.
                         */
                        strncpy(hostname, optarg, sizeof(hostname) - 1);
                        hostname[sizeof(hostname) - 1] = '\0';
2108                         break;
2109                 case 'l':
2110                         net_mode = Net_server;
2111                         break;
2112                 case 'p':
2113                         net_port = atoi(optarg);
2114                         break;
2115                 case 's':
2116                         net_use_sendfile = 0;
2117                         break;
2118                 default:
2119                         show_usage(argv[0]);
2120                         exit(1);
2121                         /*NOTREACHED*/
2122                 }
2123         }
2124
2125         while (optind < argc)
2126                 if (add_devpath(argv[optind++]) != 0)
2127                         return 1;
2128
2129         if (net_mode != Net_server && ndevs == 0) {
2130                 show_usage(argv[0]);
2131                 return 1;
2132         }
2133
2134         if (statfs(debugfs_path, &st) < 0 || st.f_type != (long)DEBUGFS_TYPE) {
2135                 fprintf(stderr, "Invalid debug path %s: %d/%s\n",
2136                         debugfs_path, errno, strerror(errno));
2137                 return 1;
2138         }
2139
2140         if (act_mask_tmp != 0)
2141                 act_mask = act_mask_tmp;
2142
2143         if (net_mode == Net_client && net_setup_addr())
2144                 return 1;
2145
2146         /*
2147          * Set up for appropriate PFD handler based upon output name.
2148          */
2149         if (net_client_use_sendfile())
2150                 handle_pfds = handle_pfds_netclient;
2151         else if (net_client_use_send())
2152                 handle_pfds = handle_pfds_entries;
2153         else if (output_name && (strcmp(output_name, "-") == 0)) {
2154                 piped_output = 1;
2155                 handle_pfds = handle_pfds_entries;
2156                 pfp = stdout;
2157                 setvbuf(pfp, NULL, _IONBF, 0);
2158         } else
2159                 handle_pfds = handle_pfds_file;
2160         return 0;
2161 }
2162
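/*
 * Track a newly accepted connection on both the per-host and
 * per-server lists, and grow the server's pollfd array to match (one
 * slot per connection plus the listening socket).
 */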
2163 static void ch_add_connection(struct net_server_s *ns, struct cl_host *ch,
2164                               int fd)
2165 {
2166         struct cl_conn *nc;
2167
2168         nc = malloc(sizeof(*nc));
2169         memset(nc, 0, sizeof(*nc));
2170
2171         time(&nc->connect_time);
2172         nc->ch = ch;
2173         nc->fd = fd;
2174         nc->ncpus = -1;
2175
2176         list_add_tail(&nc->ch_head, &ch->conn_list);
2177         ch->connects++;
2178
2179         list_add_tail(&nc->ns_head, &ns->conn_list);
2180         ns->connects++;
2181         ns->pfds = realloc(ns->pfds, (ns->connects+1) * sizeof(struct pollfd));
2182 }
2183
2184 static void ch_rem_connection(struct net_server_s *ns, struct cl_host *ch,
2185                               struct cl_conn *nc)
2186 {
2187         net_close_connection(&nc->fd);
2188
2189         list_del(&nc->ch_head);
2190         ch->connects--;
2191
2192         list_del(&nc->ns_head);
2193         ns->connects--;
2194         ns->pfds = realloc(ns->pfds, (ns->connects+1) * sizeof(struct pollfd));
2195
2196         free(nc);
2197 }
2198
2199 static struct cl_host *net_find_client_host(struct net_server_s *ns,
2200                                             struct in_addr cl_in_addr)
2201 {
2202         struct list_head *p;
2203
2204         __list_for_each(p, &ns->ch_list) {
2205                 struct cl_host *ch = list_entry(p, struct cl_host, head);
2206
2207                 if (in_addr_eq(ch->cl_in_addr, cl_in_addr))
2208                         return ch;
2209         }
2210
2211         return NULL;
2212 }
2213
2214 static struct cl_host *net_add_client_host(struct net_server_s *ns,
2215                                            struct sockaddr_in *addr)
2216 {
2217         struct cl_host *ch;
2218
2219         ch = malloc(sizeof(*ch));
2220         memset(ch, 0, sizeof(*ch));
2221
2222         ch->ns = ns;
2223         ch->cl_in_addr = addr->sin_addr;
2224         list_add_tail(&ch->head, &ns->ch_list);
2225         ns->nchs++;
2226
2227         ch->hostname = strdup(inet_ntoa(addr->sin_addr));
2228         printf("server: connection from %s\n", ch->hostname);
2229
2230         INIT_LIST_HEAD(&ch->conn_list);
2231         INIT_LIST_HEAD(&ch->devpaths);
2232
2233         return ch;
2234 }
2235
2236 static void device_done(struct devpath *dpp, int ncpus)
2237 {
2238         int cpu;
2239         struct io_info *iop;
2240
2241         for (cpu = 0, iop = dpp->ios; cpu < ncpus; cpu++, iop++)
2242                 close_iop(iop);
2243
2244         list_del(&dpp->head);
2245         dpp_free(dpp);
2246 }
2247
2248 static void net_ch_remove(struct cl_host *ch, int ncpus)
2249 {
2250         struct list_head *p, *q;
2251         struct net_server_s *ns = ch->ns;
2252
2253         list_for_each_safe(p, q, &ch->devpaths) {
2254                 struct devpath *dpp = list_entry(p, struct devpath, head);
2255                 device_done(dpp, ncpus);
2256         }
2257
2258         list_for_each_safe(p, q, &ch->conn_list) {
2259                 struct cl_conn *nc = list_entry(p, struct cl_conn, ch_head);
2260
2261                 ch_rem_connection(ns, ch, nc);
2262         }
2263
2264         list_del(&ch->head);
2265         ns->nchs--;
2266
2267         if (ch->hostname)
2268                 free(ch->hostname);
2269         free(ch);
2270 }
2271
2272 static void net_add_connection(struct net_server_s *ns)
2273 {
2274         int fd;
2275         struct cl_host *ch;
2276         socklen_t socklen = sizeof(ns->addr);
2277
2278         fd = my_accept(ns->listen_fd, (struct sockaddr *)&ns->addr, &socklen);
2279         if (fd < 0) {
2280                 /*
2281                  * This is OK: we just won't accept this connection,
2282                  * nothing fatal.
2283                  */
2284                 perror("accept");
2285         } else {
2286                 ch = net_find_client_host(ns, ns->addr.sin_addr);
2287                 if (!ch)
2288                         ch = net_add_client_host(ns, &ns->addr);
2289
2290                 ch_add_connection(ns, ch, fd);
2291         }
2292 }
2293
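/*
 * Server side: a client announced a device. Build a devpath shadowing
 * it, sized for the client's CPU count, and open one output io_info
 * per client CPU.
 */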
2294 static struct devpath *nc_add_dpp(struct cl_conn *nc,
2295                                   struct blktrace_net_hdr *bnh,
2296                                   time_t connect_time)
2297 {
2298         int cpu;
2299         struct io_info *iop;
2300         struct devpath *dpp;
2301
2302         dpp = malloc(sizeof(*dpp));
2303         memset(dpp, 0, sizeof(*dpp));
2304
2305         dpp->buts_name = strdup(bnh->buts_name);
2306         dpp->path = strdup(bnh->buts_name);
2307         dpp->fd = -1;
2308         dpp->ch = nc->ch;
2309         dpp->cl_id = bnh->cl_id;
2310         dpp->cl_connect_time = connect_time;
2311         dpp->ncpus = nc->ncpus;
2312         dpp->stats = calloc(dpp->ncpus, sizeof(*dpp->stats));
2313         memset(dpp->stats, 0, dpp->ncpus * sizeof(*dpp->stats));
2314
2315         list_add_tail(&dpp->head, &nc->ch->devpaths);
2316         nc->ch->ndevs++;
2317
2318         dpp->ios = calloc(nc->ncpus, sizeof(*iop));
2319         memset(dpp->ios, 0, nc->ncpus * sizeof(*iop));
2320
2321         for (cpu = 0, iop = dpp->ios; cpu < nc->ncpus; cpu++, iop++) {
2322                 iop->dpp = dpp;
2323                 iop->nc = nc;
2324                 init_mmap_info(&iop->mmap_info);
2325
2326                 if (iop_open(iop, cpu))
2327                         goto err;
2328         }
2329
2330         return dpp;
2331
err:
        /*
         * Unravel what's been done: close the iops opened so far and
         * unlink the dpp from the host's devpath list (it was added
         * above) before freeing it.
         */
        while (cpu >= 0)
                close_iop(&dpp->ios[cpu--]);
        list_del(&dpp->head);
        nc->ch->ndevs--;
        dpp_free(dpp);
2339
2340         return NULL;
2341 }
2342
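/*
 * Find the devpath for this connection's buts_name, creating it on
 * first sight. A matching cl_id means another device from the same
 * client run, so its connect time is reused, presumably to keep one
 * run's output grouped under a single timestamp.
 */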
2343 static struct devpath *nc_find_dpp(struct cl_conn *nc,
2344                                    struct blktrace_net_hdr *bnh)
2345 {
2346         struct list_head *p;
2347         time_t connect_time = nc->connect_time;
2348
2349         __list_for_each(p, &nc->ch->devpaths) {
2350                 struct devpath *dpp = list_entry(p, struct devpath, head);
2351
2352                 if (!strcmp(dpp->buts_name, bnh->buts_name))
2353                         return dpp;
2354
2355                 if (dpp->cl_id == bnh->cl_id)
2356                         connect_time = dpp->cl_connect_time;
2357         }
2358
2359         return nc_add_dpp(nc, bnh, connect_time);
2360 }
2361
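/*
 * Server side: payload for a known device/CPU arrived. Extend the
 * mmap window over the per-CPU output file and receive the bytes
 * straight into it; mmap or receive failures are fatal.
 */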
2362 static void net_client_read_data(struct cl_conn *nc, struct devpath *dpp,
2363                                  struct blktrace_net_hdr *bnh)
2364 {
2365         int ret;
2366         struct io_info *iop = &dpp->ios[bnh->cpu];
2367         struct mmap_info *mip = &iop->mmap_info;
2368
2369         if (setup_mmap(iop->ofd, bnh->len, &iop->mmap_info)) {
2370                 fprintf(stderr, "ncd(%s:%d): mmap failed\n",
2371                         nc->ch->hostname, nc->fd);
2372                 exit(1);
2373         }
2374
2375         ret = net_recv_data(nc->fd, mip->fs_buf + mip->fs_off, bnh->len);
2376         if (ret > 0) {
2377                 pdc_dr_update(dpp, bnh->cpu, ret);
2378                 mip->fs_size += ret;
2379                 mip->fs_off += ret;
2380         } else if (ret < 0)
2381                 exit(1);
2382 }
2383
2384 /*
2385  * Returns 1 if we closed a host - invalidates other polling information
2386  * that may be present.
2387  */
2388 static int net_client_data(struct cl_conn *nc)
2389 {
2390         int ret;
2391         struct devpath *dpp;
2392         struct blktrace_net_hdr bnh;
2393
2394         ret = net_get_header(nc, &bnh);
2395         if (ret == 0)
2396                 return 0;
2397
2398         if (ret < 0) {
2399                 fprintf(stderr, "ncd(%d): header read failed\n", nc->fd);
2400                 exit(1);
2401         }
2402
2403         if (data_is_native == -1 && check_data_endianness(bnh.magic)) {
2404                 fprintf(stderr, "ncd(%d): received data is bad\n", nc->fd);
2405                 exit(1);
2406         }
2407
2408         if (!data_is_native) {
2409                 bnh.magic = be32_to_cpu(bnh.magic);
2410                 bnh.cpu = be32_to_cpu(bnh.cpu);
2411                 bnh.max_cpus = be32_to_cpu(bnh.max_cpus);
2412                 bnh.len = be32_to_cpu(bnh.len);
2413                 bnh.cl_id = be32_to_cpu(bnh.cl_id);
2414                 bnh.buf_size = be32_to_cpu(bnh.buf_size);
2415                 bnh.buf_nr = be32_to_cpu(bnh.buf_nr);
2416                 bnh.page_size = be32_to_cpu(bnh.page_size);
2417         }
2418
2419         if ((bnh.magic & 0xffffff00) != BLK_IO_TRACE_MAGIC) {
2420                 fprintf(stderr, "ncd(%s:%d): bad data magic\n",
2421                         nc->ch->hostname, nc->fd);
2422                 exit(1);
2423         }
2424
2425         if (nc->ncpus == -1)
2426                 nc->ncpus = bnh.max_cpus;
2427
2428         /*
2429          * len == 0 means the other end is sending us a new connection/dpp
2430          * len == 1 means that the other end signalled end-of-run
2431          */
2432         dpp = nc_find_dpp(nc, &bnh);
2433         if (bnh.len == 0) {
2434                 /*
2435                  * Just adding in the dpp above is enough
2436                  */
2437                 ack_open_close(nc->fd, dpp->buts_name);
2438                 nc->ch->cl_opens++;
2439         } else if (bnh.len == 1) {
2440                 /*
2441                  * overload cpu count with dropped events
2442                  */
2443                 dpp->drops = bnh.cpu;
2444
2445                 ack_open_close(nc->fd, dpp->buts_name);
2446                 if (--nc->ch->cl_opens == 0) {
2447                         show_stats(&nc->ch->devpaths);
2448                         net_ch_remove(nc->ch, nc->ncpus);
2449                         return 1;
2450                 }
2451         } else
2452                 net_client_read_data(nc, dpp, &bnh);
2453
2454         return 0;
2455 }
2456
2457 static void handle_client_data(struct net_server_s *ns, int events)
2458 {
2459         struct cl_conn *nc;
2460         struct pollfd *pfd;
2461         struct list_head *p, *q;
2462
2463         pfd = &ns->pfds[1];
2464         list_for_each_safe(p, q, &ns->conn_list) {
2465                 if (pfd->revents & POLLIN) {
2466                         nc = list_entry(p, struct cl_conn, ns_head);
2467
2468                         if (net_client_data(nc) || --events == 0)
2469                                 break;
2470                 }
2471                 pfd++;
2472         }
2473 }
2474
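/*
 * Rebuild the server's pollfd array before each poll: slot 0 is the
 * listening socket, followed by one slot per client connection (the
 * array is kept connects+1 entries long by ch_add/rem_connection).
 */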
2475 static void net_setup_pfds(struct net_server_s *ns)
2476 {
2477         struct pollfd *pfd;
2478         struct list_head *p;
2479
2480         ns->pfds[0].fd = ns->listen_fd;
2481         ns->pfds[0].events = POLLIN;
2482
2483         pfd = &ns->pfds[1];
2484         __list_for_each(p, &ns->conn_list) {
2485                 struct cl_conn *nc = list_entry(p, struct cl_conn, ns_head);
2486
2487                 pfd->fd = nc->fd;
2488                 pfd->events = POLLIN;
2489                 pfd++;
2490         }
2491 }
2492
2493 static int net_server_handle_connections(struct net_server_s *ns)
2494 {
2495         int events;
2496
2497         printf("server: waiting for connections...\n");
2498
2499         while (!done) {
2500                 net_setup_pfds(ns);
2501                 events = poll(ns->pfds, ns->connects + 1, -1);
2502                 if (events < 0) {
2503                         if (errno != EINTR) {
2504                                 perror("FATAL: poll error");
2505                                 return 1;
2506                         }
2507                 } else if (events > 0) {
2508                         if (ns->pfds[0].revents & POLLIN) {
2509                                 net_add_connection(ns);
2510                                 events--;
2511                         }
2512
2513                         if (events)
2514                                 handle_client_data(ns, events);
2515                 }
2516         }
2517
2518         return 0;
2519 }
2520
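/*
 * Network server mode: bind a TCP socket on net_port, then accept and
 * service client connections from a single poll loop until
 * interrupted.
 */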
2521 static int net_server(void)
2522 {
2523         int fd, opt;
2524         int ret = 1;
2525         struct net_server_s net_server;
2526         struct net_server_s *ns = &net_server;
2527
2528         memset(ns, 0, sizeof(*ns));
2529         INIT_LIST_HEAD(&ns->ch_list);
2530         INIT_LIST_HEAD(&ns->conn_list);
2531         ns->pfds = malloc(sizeof(struct pollfd));
2532
2533         fd = my_socket(AF_INET, SOCK_STREAM, 0);
2534         if (fd < 0) {
2535                 perror("server: socket");
2536                 goto out;
2537         }
2538
2539         opt = 1;
2540         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
2541                 perror("setsockopt");
2542                 goto out;
2543         }
2544
2545         memset(&ns->addr, 0, sizeof(ns->addr));
2546         ns->addr.sin_family = AF_INET;
2547         ns->addr.sin_addr.s_addr = htonl(INADDR_ANY);
2548         ns->addr.sin_port = htons(net_port);
2549
2550         if (bind(fd, (struct sockaddr *) &ns->addr, sizeof(ns->addr)) < 0) {
2551                 perror("bind");
2552                 goto out;
2553         }
2554
2555         if (listen(fd, 1) < 0) {
2556                 perror("listen");
2557                 goto out;
2558         }
2559
2560         /*
2561          * The actual server looping is done here:
2562          */
2563         ns->listen_fd = fd;
2564         ret = net_server_handle_connections(ns);
2565
2566         /*
2567          * Clean up and return...
2568          */
2569 out:
2570         free(ns->pfds);
2571         return ret;
2572 }
2573
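/*
 * Local/client tracing driver: set up the kernel trace buffers
 * (setup_buts()), spawn one tracer thread per CPU, and only start
 * tracing (start_buts()) if every thread came up; otherwise tear
 * down. The -w stopwatch is armed here via alarm(), and the
 * resulting SIGALRM is handled like an interrupt.
 */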
2574 static int run_tracers(void)
2575 {
2576         atexit(exit_tracing);
2577         if (net_mode == Net_client)
2578                 printf("blktrace: connecting to %s\n", hostname);
2579
2580         setup_buts();
2581
2582         if (use_tracer_devpaths()) {
2583                 if (setup_tracer_devpaths())
2584                         return 1;
2585
2586                 if (piped_output)
2587                         handle_list = handle_list_file;
2588                 else
2589                         handle_list = handle_list_net;
2590         }
2591
2592         start_tracers();
2593         if (nthreads_running == ncpus) {
2594                 unblock_tracers();
2595                 start_buts();
2596                 if (net_mode == Net_client)
2597                         printf("blktrace: connected!\n");
2598                 if (stop_watch)
2599                         alarm(stop_watch);
2600         } else
2601                 stop_tracers();
2602
2603         wait_tracers();
2604         if (nthreads_running == ncpus)
2605                 show_stats(&devpaths);
2606         if (net_client_use_send())
2607                 close_client_connections();
2608         del_tracers();
2609
2610         return 0;
2611 }
2612
2613 int main(int argc, char *argv[])
2614 {
2615         int ret = 0;
2616
2617         setlocale(LC_NUMERIC, "en_US");
2618         pagesize = getpagesize();
2619         ncpus = sysconf(_SC_NPROCESSORS_ONLN);
2620         if (ncpus < 0) {
2621                 fprintf(stderr, "sysconf(_SC_NPROCESSORS_ONLN) failed %d/%s\n",
2622                         errno, strerror(errno));
2623                 ret = 1;
2624                 goto out;
2625         } else if (handle_args(argc, argv)) {
2626                 ret = 1;
2627                 goto out;
2628         }
2629
2630         signal(SIGINT, handle_sigint);
2631         signal(SIGHUP, handle_sigint);
2632         signal(SIGTERM, handle_sigint);
2633         signal(SIGALRM, handle_sigint);
2634         signal(SIGPIPE, SIG_IGN);
2635
2636         if (kill_running_trace) {
2637                 struct devpath *dpp;
2638                 struct list_head *p;
2639
2640                 __list_for_each(p, &devpaths) {
2641                         dpp = list_entry(p, struct devpath, head);
2642                         if (__stop_trace(dpp->fd)) {
2643                                 fprintf(stderr,
2644                                         "BLKTRACETEARDOWN %s failed: %d/%s\n",
2645                                         dpp->path, errno, strerror(errno));
2646                         }
2647                 }
2648         } else if (net_mode == Net_server) {
2649                 if (output_name) {
2650                         fprintf(stderr, "-o ignored in server mode\n");
2651                         output_name = NULL;
2652                 }
2653                 ret = net_server();
2654         } else
2655                 ret = run_tracers();
2656
2657 out:
2658         if (pfp)
2659                 fclose(pfp);
2660         rel_devpaths();
2661         return ret;
2662 }