fix include statement in stats.h
[blktrace.git] / blktrace.c
1 /*
2  * block queue tracing application
3  *
4  * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
5  * Copyright (C) 2006 Jens Axboe <axboe@kernel.dk>
6  *
7  * Rewrite to have a single thread per CPU (managing all devices on that CPU)
8  *      Alan D. Brunelle <alan.brunelle@hp.com> - January 2009
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *
20  *  You should have received a copy of the GNU General Public License
21  *  along with this program; if not, write to the Free Software
22  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23  *
24  */
25
26 #include <errno.h>
27 #include <stdarg.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <string.h>
31 #include <fcntl.h>
32 #include <getopt.h>
33 #include <sched.h>
34 #include <unistd.h>
35 #include <poll.h>
36 #include <signal.h>
37 #include <pthread.h>
38 #include <locale.h>
39 #include <sys/ioctl.h>
40 #include <sys/types.h>
41 #include <sys/stat.h>
42 #include <sys/vfs.h>
43 #include <sys/mman.h>
44 #include <sys/param.h>
45 #include <sys/time.h>
46 #include <sys/resource.h>
47 #include <sys/socket.h>
48 #include <netinet/in.h>
49 #include <arpa/inet.h>
50 #include <netdb.h>
51 #include <sys/sendfile.h>
52
53 #include "btt/list.h"
54 #include "blktrace.h"
55
/*
 * You may want to increase this even more, if you are logging at a high
 * rate and see skipped/missed events
 */
#define BUF_SIZE                (512 * 1024)    /* relay sub-buffer size, bytes */
#define BUF_NR                  (4)             /* number of relay sub-buffers */

#define FILE_VBUF_SIZE          (128 * 1024)    /* stdio buffer size for output files */

#define DEBUGFS_TYPE            (0x64626720)    /* debugfs statfs f_type magic */
#define TRACE_NET_PORT          (8462)          /* default client/server TCP port */
67
/* Network operating mode: stand-alone, server (-l), or client (-h). */
enum {
        Net_none = 0,
        Net_server,
        Net_client,
};
73
/* States a tracer thread reports back via tracer_signal_ready(). */
enum thread_status {
        Th_running,
        Th_leaving,
        Th_error
};
79
/*
 * Generic stats collected: nevents can be _roughly_ estimated by data_read
 * (discounting pdu...)
 *
 * These fields are updated w/ pdc_dr_update & pdc_nev_update below.
 */
struct pdc_stats {
        unsigned long long data_read;   /* bytes read for this cpu */
        unsigned long long nevents;     /* events accounted for this cpu */
};
90
struct devpath {
        struct list_head head;          /* linkage on global 'devpaths' list */
        char *path;                     /* path to device special file */
        char *buts_name;                /* name returned from bt kernel code */
        struct pdc_stats *stats;        /* per-cpu stats, ncpus entries */
        int fd, idx, ncpus;
        unsigned long long drops;       /* events dropped by the kernel */

        /*
         * For piped output only:
         *
         * Each tracer will have a tracer_devpath_head that it will add new
         * data onto. Its list is protected above (tracer_devpath_head.mutex)
         * and it will signal the processing thread using the dp_cond,
         * dp_mutex & dp_entries variables above.
         */
        struct tracer_devpath_head *heads;

        /*
         * For network server mode only:
         */
        struct cl_host *ch;             /* host this device belongs to */
        u32 cl_id;                      /* client id (client sends its pid) */
        time_t cl_connect_time;         /* when the client connected */
        struct io_info *ios;
};
117
/*
 * For piped output to stdout we will have each tracer thread (one per dev)
 * tack buffers read from the relay queues on a per-device list.
 *
 * The main thread will then collect trace buffers from each of lists in turn.
 *
 * We will use a mutex to guard each of the trace_buf list. The tracers
 * can then signal the main thread using <dp_cond,dp_mutex> and
 * dp_entries. (When dp_entries is 0, and a tracer adds an entry it will
 * signal. When dp_entries is 0, the main thread will wait for that condition
 * to be signalled.)
 *
 * adb: It may be better just to have a large buffer per tracer per dev,
 * and then use it as a ring-buffer. This would certainly cut down a lot
 * of malloc/free thrashing, at the cost of more memory movements (potentially).
 */
struct trace_buf {
        struct list_head head;          /* linkage on tracer_devpath_head.head */
        struct devpath *dpp;            /* device this data came from */
        void *buf;                      /* raw trace data */
        int cpu, len;                   /* originating cpu, bytes in 'buf' */
};
140
/* Per-device queue of pending trace_buf entries for piped output. */
struct tracer_devpath_head {
        pthread_mutex_t mutex;          /* guards 'head' */
        struct list_head head;          /* queued struct trace_buf entries */
        struct trace_buf *prev;         /* presumably carry-over buffer state -- verify in list handlers */
};
146
/*
 * Used to handle the mmap() interfaces for output file (containing traces)
 */
struct mmap_info {
        void *fs_buf;                   /* current mapped output window */
        unsigned long long fs_size, fs_max_size, fs_off, fs_buf_len;
        unsigned long buf_size, buf_nr; /* copied from global -b/-n values */
        int pagesize;
        /* NOTE(review): per setup_mmap(), fs_off is the offset into the
         * window and fs_max_size the ftruncate()d file length; fs_size
         * appears to track bytes written -- confirm against the writers */
};
156
/*
 * Each thread doing work on a (client) side of blktrace will have one
 * of these. The ios array contains input/output information, pfds holds
 * poll() data. The volatile's provide flags to/from the main executing
 * thread.
 */
struct tracer {
        struct list_head head;          /* linkage on global 'tracers' list */
        struct io_info *ios;            /* input/output state, nios entries */
        struct pollfd *pfds;            /* poll() state for the ios */
        pthread_t thread;
        int cpu, nios;
        volatile int status, is_done;   /* flags to/from the main thread */
};
171
/*
 * networking stuff follows. we include a magic number so we know whether
 * to endianness convert or not.
 *
 * The len field is overloaded:
 *      0 - Indicates an "open" - allowing the server to set up for a dev/cpu
 *      1 - Indicates a "close" - Shut down connection orderly
 *      2 - Server ack of an open/close (see ack_open_close())
 *
 * The cpu field is overloaded on close: it will contain the number of drops.
 */
struct blktrace_net_hdr {
        u32 magic;              /* same as trace magic */
        char buts_name[32];     /* trace name */
        u32 cpu;                /* for which cpu */
        u32 max_cpus;
        u32 len;                /* length of following trace data */
        u32 cl_id;              /* id for set of client per-cpu connections */
        u32 buf_size;           /* client buf_size for this trace  */
        u32 buf_nr;             /* client buf_nr for this trace  */
        u32 page_size;          /* client page_size for this trace  */
};
193
/*
 * Each host encountered has one of these. The head is used to link this
 * on to the network server's ch_list. Connections associated with this
 * host are linked on conn_list, and any devices traced on that host
 * are connected on the devpaths list.
 */
struct cl_host {
        struct list_head head;          /* on net_server_s.ch_list */
        struct list_head conn_list;     /* this host's cl_conn entries */
        struct list_head devpaths;      /* devices traced on this host */
        struct net_server_s *ns;        /* back pointer to the server */
        char *hostname;
        struct in_addr cl_in_addr;
        int connects, ndevs, cl_opens;
};
209
/*
 * Each connection (client to server socket ('fd')) has one of these. A
 * back reference to the host ('ch'), and lists headers (for the host
 * list, and the network server conn_list) are also included.
 */
struct cl_conn {
        struct list_head ch_head, ns_head;      /* host / server linkage */
        struct cl_host *ch;                     /* owning host */
        int fd, ncpus;
        time_t connect_time;
};
221
/*
 * The network server requires some poll structures to be maintained -
 * one per connection currently on conn_list. The nchs/ch_list values
 * are for each host connected to this server. The addr field is used
 * for scratch as new connections are established.
 */
struct net_server_s {
        struct list_head conn_list;     /* all active cl_conn entries */
        struct list_head ch_list;       /* all known cl_host entries */
        struct pollfd *pfds;
        int listen_fd, connects, nchs;
        struct sockaddr_in addr;        /* scratch for incoming connections */
};
235
/*
 * This structure is (generically) used to provide information
 * for a read-to-write set of values.
 *
 * ifn & ifd represent input information
 *
 * ofn, ofd, ofp, obuf & mmap_info are used for output file (optionally).
 */
struct io_info {
        struct devpath *dpp;
        FILE *ofp;
        char *obuf;
        struct cl_conn *nc;     /* Server network connection */

        /*
         * mmap controlled output files
         */
        struct mmap_info mmap_info;

        /*
         * Client network fields
         */
        unsigned int ready;
        unsigned long long data_queued;

        /*
         * Input/output file descriptors & names
         */
        int ifd, ofd;
        char ifn[MAXPATHLEN + 64];
        char ofn[MAXPATHLEN + 64];
};
268
static char blktrace_version[] = "2.0.0";

/*
 * Linkage to blktrace helper routines (trace conversions)
 */
int data_is_native = -1;

static int ndevs;                       /* number of devices being traced */
static int ncpus;                       /* CPU count used for per-cpu arrays */
static int pagesize;                    /* system page size */
static int act_mask = ~0U;              /* actions traced (default: all) */
static int kill_running_trace;          /* set by -k */
static int stop_watch;                  /* -w: stop after N seconds */
static int piped_output;                /* output goes to stdout */

static char *debugfs_path = "/sys/kernel/debug";
static char *output_name;               /* -o */
static char *output_dir;                /* -D */

static unsigned long buf_size = BUF_SIZE;       /* -b (given in KiB) */
static unsigned long buf_nr = BUF_NR;           /* -n */

static FILE *pfp;                       /* piped output stream */

static LIST_HEAD(devpaths);             /* all struct devpath entries */
static LIST_HEAD(tracers);              /* all struct tracer entries */

static volatile int done;               /* global "stop everything" flag */

/*
 * tracer threads add entries, the main thread takes them off and processes
 * them. These protect the dp_entries variable.
 */
static pthread_cond_t dp_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t dp_mutex = PTHREAD_MUTEX_INITIALIZER;
static volatile int dp_entries;

/*
 * These synchronize master / thread interactions.
 */
static pthread_cond_t mt_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t mt_mutex = PTHREAD_MUTEX_INITIALIZER;
static volatile int nthreads_running;
static volatile int nthreads_leaving;
static volatile int nthreads_error;
static volatile int tracers_run;

/*
 * network cmd line params
 */
static struct sockaddr_in hostname_addr;        /* resolved server address */
static char hostname[MAXHOSTNAMELEN];           /* -h argument / canonical name */
static int net_port = TRACE_NET_PORT;           /* -p */
static int net_use_sendfile = 1;                /* cleared by -s */
static int net_mode;                            /* Net_none/Net_server/Net_client */
static int *cl_fds;                             /* per-cpu client sockets */

/* Dispatch hooks, selected based on output mode (file vs piped/net). */
static int (*handle_pfds)(struct tracer *, int, int);
static int (*handle_list)(struct tracer_devpath_head *, struct list_head *);
328
#define S_OPTS  "d:a:A:r:o:kw:vVb:n:D:lh:p:sI:"
/*
 * Long-option table matching S_OPTS above. Note that 'v' and 'V' both
 * carry the long name "version" (both short options print the version).
 */
static struct option l_opts[] = {
        {
                .name = "dev",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'd'
        },
        {
                .name = "input-devs",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'I'
        },
        {
                .name = "act-mask",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'a'
        },
        {
                .name = "set-mask",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'A'
        },
        {
                .name = "relay",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'r'
        },
        {
                .name = "output",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'o'
        },
        {
                .name = "kill",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 'k'
        },
        {
                .name = "stopwatch",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'w'
        },
        {
                .name = "version",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 'v'
        },
        {
                .name = "version",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 'V'
        },
        {
                .name = "buffer-size",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'b'
        },
        {
                .name = "num-sub-buffers",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'n'
        },
        {
                .name = "output-dir",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'D'
        },
        {
                .name = "listen",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 'l'
        },
        {
                .name = "host",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'h'
        },
        {
                .name = "port",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'p'
        },
        {
                .name = "no-sendfile",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 's'
        },
        {
                .name = NULL,
        }
};
437
/*
 * Help text printed by show_usage(); kept in sync with S_OPTS/l_opts.
 */
static char usage_str[] = \
        "-d <dev> [ -r debugfs path ] [ -o <output> ] [-k ] [ -w time ]\n" \
        "[ -a action ] [ -A action mask ] [ -I  <devs file> ] [ -v ]\n\n" \
        "\t-d Use specified device. May also be given last after options\n" \
        "\t-r Path to mounted debugfs, defaults to /sys/kernel/debug\n" \
        "\t-o File(s) to send output to\n" \
        "\t-D Directory to prepend to output file names\n" \
        "\t-k Kill a running trace\n" \
        "\t-w Stop after defined time, in seconds\n" \
        "\t-a Only trace specified actions. See documentation\n" \
        "\t-A Give trace mask as a single value. See documentation\n" \
        "\t-b Sub buffer size in KiB\n" \
        "\t-n Number of sub buffers\n" \
        "\t-l Run in network listen mode (blktrace server)\n" \
        "\t-h Run in network client mode, connecting to the given host\n" \
        "\t-p Network port to use (default 8462)\n" \
        "\t-s Make the network client NOT use sendfile() to transfer data\n" \
        "\t-I Add devices found in <devs file>\n" \
        "\t-V Print program version info\n\n";
457
/*
 * Reset a pollfd entry so it is ignored on the next poll() pass.
 */
static void clear_events(struct pollfd *pfd)
{
        pfd->revents = 0;
        pfd->events = 0;
}
463
464 static inline int net_client_use_sendfile(void)
465 {
466         return net_mode == Net_client && net_use_sendfile;
467 }
468
469 static inline int net_client_use_send(void)
470 {
471         return net_mode == Net_client && !net_use_sendfile;
472 }
473
474 static inline int use_tracer_devpaths(void)
475 {
476         return piped_output || net_client_use_send();
477 }
478
/*
 * Compare two IPv4 addresses for equality.
 */
static inline int in_addr_eq(struct in_addr a, struct in_addr b)
{
        return (a.s_addr == b.s_addr) ? 1 : 0;
}
483
484 static inline void pdc_dr_update(struct devpath *dpp, int cpu, int data_read)
485 {
486         dpp->stats[cpu].data_read += data_read;
487 }
488
489 static inline void pdc_nev_update(struct devpath *dpp, int cpu, int nevents)
490 {
491         dpp->stats[cpu].nevents += nevents;
492 }
493
/*
 * Print program name, version and the usage text to stderr.
 */
static void show_usage(char *prog)
{
        fprintf(stderr, "Usage: %s %s %s", prog, blktrace_version, usage_str);
}
498
/*
 * Create a timespec 'msec' milliseconds into the future.
 *
 * The result is normalized so tv_nsec is always in [0, 1e9): the old
 * check used '>' so tv_nsec could end up exactly 1000000000, which is
 * an invalid timespec (pthread_cond_timedwait would fail with EINVAL).
 */
static inline void make_timespec(struct timespec *tsp, long delta_msec)
{
        struct timeval now;

        gettimeofday(&now, NULL);
        tsp->tv_sec = now.tv_sec;
        tsp->tv_nsec = 1000L * now.tv_usec;

        tsp->tv_nsec += (delta_msec * 1000000L);
        if (tsp->tv_nsec >= 1000000000L) {
                long secs = tsp->tv_nsec / 1000000000L;

                tsp->tv_sec += secs;
                tsp->tv_nsec -= (secs * 1000000000L);
        }
}
518
/*
 * Condition wait with a 50ms timeout so a lost wakeup cannot hang the
 * caller forever.
 */
static void t_pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
{
        struct timespec deadline;

        make_timespec(&deadline, 50);
        pthread_cond_timedwait(cond, mutex, &deadline);
}
529
/*
 * Main thread: release all tracer threads blocked in
 * tracer_wait_unblock() by setting tracers_run under the shared mutex.
 */
static void unblock_tracers(void)
{
        pthread_mutex_lock(&mt_mutex);
        tracers_run = 1;
        pthread_cond_broadcast(&mt_cond);
        pthread_mutex_unlock(&mt_mutex);
}
537
538 static void tracer_wait_unblock(struct tracer *tp)
539 {
540         pthread_mutex_lock(&mt_mutex);
541         while (!tp->is_done && !tracers_run)
542                 pthread_cond_wait(&mt_cond, &mt_mutex);
543         pthread_mutex_unlock(&mt_mutex);
544 }
545
546 static void tracer_signal_ready(struct tracer *tp,
547                                 enum thread_status th_status,
548                                 int status)
549 {
550         pthread_mutex_lock(&mt_mutex);
551         tp->status = status;
552
553         if (th_status == Th_running)
554                 nthreads_running++;
555         else if (th_status == Th_error)
556                 nthreads_error++;
557         else
558                 nthreads_leaving++;
559
560         pthread_cond_signal(&mt_cond);
561         pthread_mutex_unlock(&mt_mutex);
562 }
563
/*
 * Main thread: wait until every started tracer has checked in as either
 * running or errored. Uses the timed wait so a missed signal cannot
 * hang startup.
 */
static void wait_tracers_ready(int ncpus_started)
{
        pthread_mutex_lock(&mt_mutex);
        while ((nthreads_running + nthreads_error) < ncpus_started)
                t_pthread_cond_wait(&mt_cond, &mt_mutex);
        pthread_mutex_unlock(&mt_mutex);
}
571
/*
 * Main thread: wait until every running tracer has signalled that it is
 * leaving. Uses the timed wait so a missed signal cannot hang shutdown.
 */
static void wait_tracers_leaving(void)
{
        pthread_mutex_lock(&mt_mutex);
        while (nthreads_leaving < nthreads_running)
                t_pthread_cond_wait(&mt_cond, &mt_mutex);
        pthread_mutex_unlock(&mt_mutex);
}
579
580 static void init_mmap_info(struct mmap_info *mip)
581 {
582         mip->buf_size = buf_size;
583         mip->buf_nr = buf_nr;
584         mip->pagesize = pagesize;
585 }
586
/*
 * Orderly shutdown of a socket; the caller's fd is poisoned to -1 so it
 * cannot be reused by mistake.
 */
static void net_close_connection(int *fd)
{
        int sock = *fd;

        *fd = -1;
        shutdown(sock, SHUT_RDWR);
        close(sock);
}
593
594 static void dpp_free(struct devpath *dpp)
595 {
596         if (dpp->stats)
597                 free(dpp->stats);
598         if (dpp->ios)
599                 free(dpp->ios);
600         if (dpp->path)
601                 free(dpp->path);
602         if (dpp->buts_name)
603                 free(dpp->buts_name);
604         free(dpp);
605 }
606
/*
 * Pin the calling thread to the given CPU. Returns 0 on success, or the
 * errno value from sched_setaffinity() on failure.
 */
static int lock_on_cpu(int cpu)
{
        cpu_set_t mask;

        CPU_ZERO(&mask);
        CPU_SET(cpu, &mask);

        return sched_setaffinity(0, sizeof(mask), &mask) < 0 ? errno : 0;
}
618
/*
 * Bump a resource's soft limit by 'increase', raising the hard limit
 * too when needed. Returns 1 on success, 0 on failure; errno is
 * restored to its pre-call value when no limit could be raised.
 */
static int increase_limit(int resource, rlim_t increase)
{
        struct rlimit rlim;
        int save_errno = errno;
        int ok = 0;

        if (getrlimit(resource, &rlim) == 0) {
                rlim.rlim_cur += increase;
                if (rlim.rlim_cur >= rlim.rlim_max)
                        rlim.rlim_max = rlim.rlim_cur + increase;

                ok = (setrlimit(resource, &rlim) == 0);
        }

        if (!ok)
                errno = save_errno;
        return ok;
}
636
/*
 * On fd-exhaustion errors, try raising RLIMIT_NOFILE. Returns non-zero
 * if the caller should retry the open.
 */
static int handle_open_failure(void)
{
        switch (errno) {
        case ENFILE:
        case EMFILE:
                return increase_limit(RLIMIT_NOFILE, 16);
        default:
                return 0;
        }
}
643
644 static int handle_mem_failure(size_t length)
645 {
646         if (errno == ENFILE)
647                 return handle_open_failure();
648         else if (errno == ENOMEM)
649                 return increase_limit(RLIMIT_MEMLOCK, 2 * length);
650         return 0;
651 }
652
/*
 * fopen() that retries after attempting to raise the fd limit.
 */
static FILE *my_fopen(const char *path, const char *mode)
{
        for (;;) {
                FILE *fp = fopen(path, mode);

                if (fp != NULL || !handle_open_failure())
                        return fp;
        }
}
663
/*
 * open() that retries after attempting to raise the fd limit.
 */
static int my_open(const char *path, int flags)
{
        for (;;) {
                int fd = open(path, flags);

                if (fd >= 0 || !handle_open_failure())
                        return fd;
        }
}
674
/*
 * socket() that retries after attempting to raise the fd limit.
 */
static int my_socket(int domain, int type, int protocol)
{
        for (;;) {
                int fd = socket(domain, type, protocol);

                if (fd >= 0 || !handle_open_failure())
                        return fd;
        }
}
685
/*
 * accept() that retries after attempting to raise the fd limit.
 */
static int my_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)
{
        for (;;) {
                int fd = accept(sockfd, addr, addrlen);

                if (fd >= 0 || !handle_open_failure())
                        return fd;
        }
}
696
/*
 * mmap() that retries after attempting to raise memory-related limits.
 */
static void *my_mmap(void *addr, size_t length, int prot, int flags, int fd,
                     off_t offset)
{
        for (;;) {
                void *p = mmap(addr, length, prot, flags, fd, offset);

                if (p != MAP_FAILED || !handle_mem_failure(length))
                        return p;
        }
}
708
/*
 * mlock() that retries after attempting to raise RLIMIT_MEMLOCK.
 */
static int my_mlock(const void *addr, size_t len)
{
        for (;;) {
                int rc = mlock(addr, len);

                if (rc >= 0 || !handle_mem_failure(len))
                        return rc;
        }
}
719
/*
 * Ensure the output file's mmap()ed window can take 'maxlen' more bytes
 * at the current offset. When the window is exhausted the old one is
 * unlocked/unmapped, the file is grown with ftruncate(), and a fresh
 * window of at least 16 sub-buffers is mapped page-aligned below the
 * current position. Returns 0 on success, 1 on error.
 */
static int setup_mmap(int fd, unsigned int maxlen, struct mmap_info *mip)
{
        if (mip->fs_off + maxlen > mip->fs_buf_len) {
                unsigned long nr = max(16, mip->buf_nr);

                if (mip->fs_buf) {
                        munlock(mip->fs_buf, mip->fs_buf_len);
                        munmap(mip->fs_buf, mip->fs_buf_len);
                        mip->fs_buf = NULL;
                }

                /* new window starts at the page boundary below fs_size */
                mip->fs_off = mip->fs_size & (mip->pagesize - 1);
                mip->fs_buf_len = (nr * mip->buf_size) - mip->fs_off;
                mip->fs_max_size += mip->fs_buf_len;

                if (ftruncate(fd, mip->fs_max_size) < 0) {
                        perror("setup_mmap: ftruncate");
                        return 1;
                }

                mip->fs_buf = my_mmap(NULL, mip->fs_buf_len, PROT_WRITE,
                                      MAP_SHARED, fd,
                                      mip->fs_size - mip->fs_off);
                if (mip->fs_buf == MAP_FAILED) {
                        perror("setup_mmap: mmap");
                        return 1;
                }
                /* lock the window in core; failure here is non-fatal */
                my_mlock(mip->fs_buf, mip->fs_buf_len);
        }

        return 0;
}
752
753 static int __stop_trace(int fd)
754 {
755         /*
756          * Should be stopped, don't complain if it isn't
757          */
758         ioctl(fd, BLKTRACESTOP);
759         return ioctl(fd, BLKTRACETEARDOWN);
760 }
761
762 static int write_data(char *buf, int len)
763 {
764         int ret;
765
766 rewrite:
767         ret = fwrite(buf, len, 1, pfp);
768         if (ferror(pfp) || ret != 1) {
769                 if (errno == EINTR) {
770                         clearerr(pfp);
771                         goto rewrite;
772                 }
773
774                 if (!piped_output || (errno != EPIPE && errno != EBADF)) {
775                         fprintf(stderr, "write(%d) failed: %d/%s\n",
776                                 len, errno, strerror(errno));
777                 }
778                 goto err;
779         }
780
781         fflush(pfp);
782         return 0;
783
784 err:
785         clearerr(pfp);
786         return 1;
787 }
788
789 /*
790  * Returns the number of bytes read (successfully)
791  */
792 static int __net_recv_data(int fd, void *buf, unsigned int len)
793 {
794         unsigned int bytes_left = len;
795
796         while (bytes_left && !done) {
797                 int ret = recv(fd, buf, bytes_left, MSG_WAITALL);
798
799                 if (ret == 0)
800                         break;
801                 else if (ret < 0) {
802                         if (errno == EAGAIN) {
803                                 usleep(50);
804                                 continue;
805                         }
806                         perror("server: net_recv_data: recv failed");
807                         break;
808                 } else {
809                         buf += ret;
810                         bytes_left -= ret;
811                 }
812         }
813
814         return len - bytes_left;
815 }
816
/*
 * Thin wrapper around __net_recv_data(); returns bytes received.
 */
static int net_recv_data(int fd, void *buf, unsigned int len)
{
        return __net_recv_data(fd, buf, len);
}
821
/*
 * Push 'buf_len' bytes to 'fd', looping over short sends. Returns the
 * number of bytes actually written (== buf_len on full success).
 */
static int net_send_data(int fd, void *buf, unsigned int buf_len)
{
        unsigned int sent = 0;

        while (sent < buf_len) {
                int ret = send(fd, buf + sent, buf_len - sent, 0);

                if (ret < 0) {
                        perror("send");
                        break;
                }

                sent += ret;
        }

        return sent;
}
843
844 static int net_send_header(int fd, int cpu, char *buts_name, int len)
845 {
846         struct blktrace_net_hdr hdr;
847
848         memset(&hdr, 0, sizeof(hdr));
849
850         hdr.magic = BLK_IO_TRACE_MAGIC;
851         strncpy(hdr.buts_name, buts_name, sizeof(hdr.buts_name));
852         hdr.buts_name[sizeof(hdr.buts_name)-1] = '\0';
853         hdr.cpu = cpu;
854         hdr.max_cpus = ncpus;
855         hdr.len = len;
856         hdr.cl_id = getpid();
857         hdr.buf_size = buf_size;
858         hdr.buf_nr = buf_nr;
859         hdr.page_size = pagesize;
860
861         return net_send_data(fd, &hdr, sizeof(hdr)) != sizeof(hdr);
862 }
863
864 static void net_send_open_close(int fd, int cpu, char *buts_name, int len)
865 {
866         struct blktrace_net_hdr ret_hdr;
867
868         net_send_header(fd, cpu, buts_name, len);
869         net_recv_data(fd, &ret_hdr, sizeof(ret_hdr));
870 }
871
/*
 * Client -> server: announce a new trace (len == 0 denotes "open").
 */
static void net_send_open(int fd, int cpu, char *buts_name)
{
        net_send_open_close(fd, cpu, buts_name, 0);
}
876
/*
 * Client -> server: close a trace (len == 1 denotes "close").
 */
static void net_send_close(int fd, char *buts_name, int drops)
{
        /*
         * Overload CPU w/ number of drops
         *
         * XXX: Need to clear/set done around call - done=1 (which
         * is true here) stops reads from happening... :-(
         */
        done = 0;
        net_send_open_close(fd, drops, buts_name, 1);
        done = 1;
}
889
/*
 * Server -> client: acknowledge an open/close request (len == 2).
 */
static void ack_open_close(int fd, char *buts_name)
{
        net_send_header(fd, 0, buts_name, 2);
}
894
895 static void net_send_drops(int fd)
896 {
897         struct list_head *p;
898
899         __list_for_each(p, &devpaths) {
900                 struct devpath *dpp = list_entry(p, struct devpath, head);
901
902                 net_send_close(fd, dpp->buts_name, dpp->drops);
903         }
904 }
905
906 /*
907  * Returns:
908  *       0: "EOF"
909  *       1: OK
910  *      -1: Error
911  */
912 static int net_get_header(struct cl_conn *nc, struct blktrace_net_hdr *bnh)
913 {
914         int bytes_read;
915         int fl = fcntl(nc->fd, F_GETFL);
916
917         fcntl(nc->fd, F_SETFL, fl | O_NONBLOCK);
918         bytes_read = __net_recv_data(nc->fd, bnh, sizeof(*bnh));
919         fcntl(nc->fd, F_SETFL, fl & ~O_NONBLOCK);
920
921         if (bytes_read == sizeof(*bnh))
922                 return 1;
923         else if (bytes_read == 0)
924                 return 0;
925         else
926                 return -1;
927 }
928
929 static int net_setup_addr(void)
930 {
931         struct sockaddr_in *addr = &hostname_addr;
932
933         memset(addr, 0, sizeof(*addr));
934         addr->sin_family = AF_INET;
935         addr->sin_port = htons(net_port);
936
937         if (inet_aton(hostname, &addr->sin_addr) != 1) {
938                 struct hostent *hent;
939 retry:
940                 hent = gethostbyname(hostname);
941                 if (!hent) {
942                         if (h_errno == TRY_AGAIN) {
943                                 usleep(100);
944                                 goto retry;
945                         } else if (h_errno == NO_RECOVERY) {
946                                 fprintf(stderr, "gethostbyname(%s)"
947                                         "non-recoverable error encountered\n",
948                                         hostname);
949                         } else {
950                                 /*
951                                  * HOST_NOT_FOUND, NO_ADDRESS or NO_DATA
952                                  */
953                                 fprintf(stderr, "Host %s not found\n",
954                                         hostname);
955                         }
956                         return 1;
957                 }
958
959                 memcpy(&addr->sin_addr, hent->h_addr, 4);
960                 strcpy(hostname, hent->h_name);
961         }
962
963         return 0;
964 }
965
966 static int net_setup_client(void)
967 {
968         int fd;
969         struct sockaddr_in *addr = &hostname_addr;
970
971         fd = my_socket(AF_INET, SOCK_STREAM, 0);
972         if (fd < 0) {
973                 perror("client: socket");
974                 return -1;
975         }
976
977         if (connect(fd, (struct sockaddr *)addr, sizeof(*addr)) < 0) {
978                 if (errno == ECONNREFUSED)
979                         fprintf(stderr,
980                                 "\nclient: Connection to %s refused, "
981                                 "perhaps the server is not started?\n\n",
982                                 hostname);
983                 else
984                         perror("client: connect");
985
986                 close(fd);
987                 return -1;
988         }
989
990         return fd;
991 }
992
993 static int open_client_connections(void)
994 {
995         int cpu;
996
997         cl_fds = calloc(ncpus, sizeof(*cl_fds));
998         for (cpu = 0; cpu < ncpus; cpu++) {
999                 cl_fds[cpu] = net_setup_client();
1000                 if (cl_fds[cpu] < 0)
1001                         goto err;
1002         }
1003         return 0;
1004
1005 err:
1006         while (cpu > 0)
1007                 close(cl_fds[cpu--]);
1008         free(cl_fds);
1009         return 1;
1010 }
1011
1012 static void close_client_connections(void)
1013 {
1014         if (cl_fds) {
1015                 int cpu, *fdp;
1016
1017                 for (cpu = 0, fdp = cl_fds; cpu < ncpus; cpu++, fdp++) {
1018                         if (*fdp >= 0) {
1019                                 net_send_drops(*fdp);
1020                                 net_close_connection(fdp);
1021                         }
1022                 }
1023                 free(cl_fds);
1024         }
1025 }
1026
1027 static void setup_buts(void)
1028 {
1029         struct list_head *p;
1030
1031         __list_for_each(p, &devpaths) {
1032                 struct blk_user_trace_setup buts;
1033                 struct devpath *dpp = list_entry(p, struct devpath, head);
1034
1035                 memset(&buts, 0, sizeof(buts));
1036                 buts.buf_size = buf_size;
1037                 buts.buf_nr = buf_nr;
1038                 buts.act_mask = act_mask;
1039
1040                 if (ioctl(dpp->fd, BLKTRACESETUP, &buts) >= 0) {
1041                         dpp->ncpus = ncpus;
1042                         dpp->buts_name = strdup(buts.name);
1043                         if (dpp->stats)
1044                                 free(dpp->stats);
1045                         dpp->stats = calloc(dpp->ncpus, sizeof(*dpp->stats));
1046                         memset(dpp->stats, 0, dpp->ncpus * sizeof(*dpp->stats));
1047                 } else
1048                         fprintf(stderr, "BLKTRACESETUP(2) %s failed: %d/%s\n",
1049                                 dpp->path, errno, strerror(errno));
1050         }
1051 }
1052
1053 static void start_buts(void)
1054 {
1055         struct list_head *p;
1056
1057         __list_for_each(p, &devpaths) {
1058                 struct devpath *dpp = list_entry(p, struct devpath, head);
1059
1060                 if (ioctl(dpp->fd, BLKTRACESTART) < 0) {
1061                         fprintf(stderr, "BLKTRACESTART %s failed: %d/%s\n",
1062                                 dpp->path, errno, strerror(errno));
1063                 }
1064         }
1065 }
1066
1067 static int get_drops(struct devpath *dpp)
1068 {
1069         int fd, drops = 0;
1070         char fn[MAXPATHLEN + 64], tmp[256];
1071
1072         snprintf(fn, sizeof(fn), "%s/block/%s/dropped", debugfs_path,
1073                  dpp->buts_name);
1074
1075         fd = my_open(fn, O_RDONLY);
1076         if (fd < 0) {
1077                 /*
1078                  * This may be ok: the kernel may not support
1079                  * dropped counts.
1080                  */
1081                 if (errno != ENOENT)
1082                         fprintf(stderr, "Could not open %s: %d/%s\n",
1083                                 fn, errno, strerror(errno));
1084                 return 0;
1085         } else if (read(fd, tmp, sizeof(tmp)) < 0) {
1086                 fprintf(stderr, "Could not read %s: %d/%s\n",
1087                         fn, errno, strerror(errno));
1088         } else
1089                 drops = atoi(tmp);
1090         close(fd);
1091
1092         return drops;
1093 }
1094
1095 static void get_all_drops(void)
1096 {
1097         struct list_head *p;
1098
1099         __list_for_each(p, &devpaths) {
1100                 struct devpath *dpp = list_entry(p, struct devpath, head);
1101
1102                 dpp->drops = get_drops(dpp);
1103         }
1104 }
1105
1106 static inline struct trace_buf *alloc_trace_buf(int cpu, int bufsize)
1107 {
1108         struct trace_buf *tbp;
1109
1110         tbp = malloc(sizeof(*tbp) + bufsize);
1111         INIT_LIST_HEAD(&tbp->head);
1112         tbp->len = 0;
1113         tbp->buf = (void *)(tbp + 1);
1114         tbp->cpu = cpu;
1115         tbp->dpp = NULL;        /* Will be set when tbp is added */
1116
1117         return tbp;
1118 }
1119
1120 static void free_tracer_heads(struct devpath *dpp)
1121 {
1122         int cpu;
1123         struct tracer_devpath_head *hd;
1124
1125         for (cpu = 0, hd = dpp->heads; cpu < ncpus; cpu++, hd++) {
1126                 if (hd->prev)
1127                         free(hd->prev);
1128
1129                 pthread_mutex_destroy(&hd->mutex);
1130         }
1131         free(dpp->heads);
1132 }
1133
1134 static int setup_tracer_devpaths(void)
1135 {
1136         struct list_head *p;
1137
1138         if (net_client_use_send())
1139                 if (open_client_connections())
1140                         return 1;
1141
1142         __list_for_each(p, &devpaths) {
1143                 int cpu;
1144                 struct tracer_devpath_head *hd;
1145                 struct devpath *dpp = list_entry(p, struct devpath, head);
1146
1147                 dpp->heads = calloc(ncpus, sizeof(struct tracer_devpath_head));
1148                 for (cpu = 0, hd = dpp->heads; cpu < ncpus; cpu++, hd++) {
1149                         INIT_LIST_HEAD(&hd->head);
1150                         pthread_mutex_init(&hd->mutex, NULL);
1151                         hd->prev = NULL;
1152                 }
1153         }
1154
1155         return 0;
1156 }
1157
/*
 * Hand a filled trace buffer off to the per-device, per-CPU list for
 * the processing thread, then give the caller a fresh buffer to keep
 * reading into (via *tbpp).
 */
static inline void add_trace_buf(struct devpath *dpp, int cpu,
						struct trace_buf **tbpp)
{
	struct trace_buf *tbp = *tbpp;
	struct tracer_devpath_head *hd = &dpp->heads[cpu];

	/* Record the owning device before the buffer becomes visible */
	tbp->dpp = dpp;

	pthread_mutex_lock(&hd->mutex);
	list_add_tail(&tbp->head, &hd->head);
	pthread_mutex_unlock(&hd->mutex);

	/* Replace the caller's buffer; ownership of tbp has transferred */
	*tbpp = alloc_trace_buf(cpu, buf_size);
}
1172
/*
 * Add 'entries_handled' to the global count of queued trace buffers
 * (dp_entries). When the count was zero the processing thread may be
 * blocked in wait_empty_entries(), so wake it.
 */
static inline void incr_entries(int entries_handled)
{
	pthread_mutex_lock(&dp_mutex);
	/* Signal only on the 0 -> nonzero transition */
	if (dp_entries == 0)
		pthread_cond_signal(&dp_cond);
	dp_entries += entries_handled;
	pthread_mutex_unlock(&dp_mutex);
}
1181
/*
 * Subtract 'handled' from the global count of queued trace buffer
 * entries, under the protecting mutex.
 */
static void decr_entries(int handled)
{
	pthread_mutex_lock(&dp_mutex);
	dp_entries -= handled;
	pthread_mutex_unlock(&dp_mutex);
}
1188
/*
 * Block until at least one queued trace buffer entry exists, or the
 * run is over. Returns nonzero while tracing is still active (i.e.
 * the caller should go process entries).
 */
static int wait_empty_entries(void)
{
	pthread_mutex_lock(&dp_mutex);
	/* Re-check both conditions after every wakeup (spurious wakeups) */
	while (!done && dp_entries == 0)
		t_pthread_cond_wait(&dp_cond, &dp_mutex);
	pthread_mutex_unlock(&dp_mutex);

	return !done;
}
1198
1199 static int add_devpath(char *path)
1200 {
1201         int fd;
1202         struct devpath *dpp;
1203
1204         /*
1205          * Verify device is valid before going too far
1206          */
1207         fd = my_open(path, O_RDONLY | O_NONBLOCK);
1208         if (fd < 0) {
1209                 fprintf(stderr, "Invalid path %s specified: %d/%s\n",
1210                         path, errno, strerror(errno));
1211                 return 1;
1212         }
1213
1214         dpp = malloc(sizeof(*dpp));
1215         memset(dpp, 0, sizeof(*dpp));
1216         dpp->path = strdup(path);
1217         dpp->fd = fd;
1218         dpp->idx = ndevs++;
1219         list_add_tail(&dpp->head, &devpaths);
1220
1221         return 0;
1222 }
1223
1224 static void rel_devpaths(void)
1225 {
1226         struct list_head *p, *q;
1227
1228         list_for_each_safe(p, q, &devpaths) {
1229                 struct devpath *dpp = list_entry(p, struct devpath, head);
1230
1231                 list_del(&dpp->head);
1232                 __stop_trace(dpp->fd);
1233                 close(dpp->fd);
1234
1235                 if (dpp->heads)
1236                         free_tracer_heads(dpp);
1237
1238                 dpp_free(dpp);
1239                 ndevs--;
1240         }
1241 }
1242
1243 static int flush_subbuf_net(struct trace_buf *tbp)
1244 {
1245         int fd = cl_fds[tbp->cpu];
1246         struct devpath *dpp = tbp->dpp;
1247
1248         if (net_send_header(fd, tbp->cpu, dpp->buts_name, tbp->len))
1249                 return 1;
1250         else if (net_send_data(fd, tbp->buf, tbp->len) != tbp->len)
1251                 return 1;
1252
1253         return 0;
1254 }
1255
1256 static int
1257 handle_list_net(__attribute__((__unused__))struct tracer_devpath_head *hd,
1258                 struct list_head *list)
1259 {
1260         struct trace_buf *tbp;
1261         struct list_head *p, *q;
1262         int entries_handled = 0;
1263
1264         list_for_each_safe(p, q, list) {
1265                 tbp = list_entry(p, struct trace_buf, head);
1266
1267                 list_del(&tbp->head);
1268                 entries_handled++;
1269
1270                 if (cl_fds[tbp->cpu] >= 0) {
1271                         if (flush_subbuf_net(tbp)) {
1272                                 close(cl_fds[tbp->cpu]);
1273                                 cl_fds[tbp->cpu] = -1;
1274                         }
1275                 }
1276
1277                 free(tbp);
1278         }
1279
1280         return entries_handled;
1281 }
1282
1283 /*
1284  * Tack 'tbp's buf onto the tail of 'prev's buf
1285  */
1286 static struct trace_buf *tb_combine(struct trace_buf *prev,
1287                                     struct trace_buf *tbp)
1288 {
1289         unsigned long tot_len;
1290
1291         tot_len = prev->len + tbp->len;
1292         if (tot_len > buf_size) {
1293                 /*
1294                  * tbp->head isn't connected (it was 'prev'
1295                  * so it had been taken off of the list
1296                  * before). Therefore, we can realloc
1297                  * the whole structures, as the other fields
1298                  * are "static".
1299                  */
1300                 prev = realloc(prev->buf, sizeof(*prev) + tot_len);
1301                 prev->buf = (void *)(prev + 1);
1302         }
1303
1304         memcpy(prev->buf + prev->len, tbp->buf, tbp->len);
1305         prev->len = tot_len;
1306
1307         free(tbp);
1308         return prev;
1309 }
1310
/*
 * Drain a detached list of trace buffers to the output file. Traces
 * may straddle buffer boundaries, so only whole traces are written;
 * any trailing partial trace is carried over in hd->prev and prepended
 * to the next buffer. Returns the number of list entries consumed.
 */
static int handle_list_file(struct tracer_devpath_head *hd,
			    struct list_head *list)
{
	int off, t_len, nevents;
	struct blk_io_trace *t;
	struct list_head *p, *q;
	int entries_handled = 0;
	struct trace_buf *tbp, *prev;

	prev = hd->prev;
	list_for_each_safe(p, q, list) {
		tbp = list_entry(p, struct trace_buf, head);
		list_del(&tbp->head);
		entries_handled++;

		/*
		 * If there was some leftover before, tack this new
		 * entry onto the tail of the previous one.
		 */
		if (prev)
			tbp = tb_combine(prev, tbp);

		/*
		 * See how many whole traces there are - send them
		 * all out in one go.
		 */
		off = 0;
		nevents = 0;
		while (off + (int)sizeof(*t) <= tbp->len) {
			t = (struct blk_io_trace *)(tbp->buf + off);
			t_len = sizeof(*t) + t->pdu_len;
			/* Trace header fits but its payload doesn't: stop */
			if (off + t_len > tbp->len)
				break;

			off += t_len;
			nevents++;
		}
		if (nevents)
			pdc_nev_update(tbp->dpp, tbp->cpu, nevents);

		/*
		 * Write any full set of traces, any remaining data is kept
		 * for the next pass.
		 */
		if (off) {
			/* Buffer fully consumed (or write failed): drop it */
			if (write_data(tbp->buf, off) || off == tbp->len) {
				free(tbp);
				prev = NULL;
			}
			else {
				/*
				 * Move valid data to beginning of buffer
				 */
				tbp->len -= off;
				memmove(tbp->buf, tbp->buf + off, tbp->len);
				prev = tbp;
			}
		} else
			prev = tbp;
	}
	/* Remember any partial trace for the next invocation */
	hd->prev = prev;

	return entries_handled;
}
1375
1376 static void __process_trace_bufs(void)
1377 {
1378         int cpu;
1379         struct list_head *p;
1380         struct list_head list;
1381         int handled = 0;
1382
1383         __list_for_each(p, &devpaths) {
1384                 struct devpath *dpp = list_entry(p, struct devpath, head);
1385                 struct tracer_devpath_head *hd = dpp->heads;
1386
1387                 for (cpu = 0; cpu < ncpus; cpu++, hd++) {
1388                         pthread_mutex_lock(&hd->mutex);
1389                         if (list_empty(&hd->head)) {
1390                                 pthread_mutex_unlock(&hd->mutex);
1391                                 continue;
1392                         }
1393
1394                         list_replace_init(&hd->head, &list);
1395                         pthread_mutex_unlock(&hd->mutex);
1396
1397                         handled += handle_list(hd, &list);
1398                 }
1399         }
1400
1401         if (handled)
1402                 decr_entries(handled);
1403 }
1404
/*
 * Main loop of the trace-buffer processing thread: sleep until entries
 * are queued (or the run ends), then drain them.
 */
static void process_trace_bufs(void)
{
	while (wait_empty_entries())
		__process_trace_bufs();
}
1410
/*
 * Final drain after all tracer threads have exited: keep processing
 * until the global entry count reaches zero.
 */
static void clean_trace_bufs(void)
{
	/*
	 * No mutex needed here: we're only reading from the lists,
	 * tracers are done
	 */
	while (dp_entries)
		__process_trace_bufs();
}
1420
/*
 * Report a failed read of a relay file; EAGAIN is expected on the
 * non-blocking fds and stays silent.
 */
static inline void read_err(int cpu, char *ifn)
{
	if (errno == EAGAIN)
		return;

	fprintf(stderr, "Thread %d failed read of %s: %d/%s\n",
		cpu, ifn, errno, strerror(errno));
}
1427
1428 static int net_sendfile(struct io_info *iop)
1429 {
1430         int ret;
1431
1432         ret = sendfile(iop->ofd, iop->ifd, NULL, iop->ready);
1433         if (ret < 0) {
1434                 perror("sendfile");
1435                 return 1;
1436         } else if (ret < (int)iop->ready) {
1437                 fprintf(stderr, "short sendfile send (%d of %d)\n",
1438                         ret, iop->ready);
1439                 return 1;
1440         }
1441
1442         return 0;
1443 }
1444
1445 static inline int net_sendfile_data(struct tracer *tp, struct io_info *iop)
1446 {
1447         struct devpath *dpp = iop->dpp;
1448
1449         if (net_send_header(iop->ofd, tp->cpu, dpp->buts_name, iop->ready))
1450                 return 1;
1451         return net_sendfile(iop);
1452 }
1453
1454 static int fill_ofname(struct io_info *iop, int cpu)
1455 {
1456         int len;
1457         struct stat sb;
1458         char *dst = iop->ofn;
1459
1460         if (output_dir)
1461                 len = snprintf(iop->ofn, sizeof(iop->ofn), "%s/", output_dir);
1462         else
1463                 len = snprintf(iop->ofn, sizeof(iop->ofn), "./");
1464
1465         if (net_mode == Net_server) {
1466                 struct cl_conn *nc = iop->nc;
1467
1468                 len += sprintf(dst + len, "%s-", nc->ch->hostname);
1469                 len += strftime(dst + len, 64, "%F-%T/",
1470                                 gmtime(&iop->dpp->cl_connect_time));
1471         }
1472
1473         if (stat(iop->ofn, &sb) < 0) {
1474                 if (errno != ENOENT) {
1475                         fprintf(stderr,
1476                                 "Destination dir %s stat failed: %d/%s\n",
1477                                 iop->ofn, errno, strerror(errno));
1478                         return 1;
1479                 }
1480                 /*
1481                  * There is no synchronization between multiple threads
1482                  * trying to create the directory at once.  It's harmless
1483                  * to let them try, so just detect the problem and move on.
1484                  */
1485                 if (mkdir(iop->ofn, 0755) < 0 && errno != EEXIST) {
1486                         fprintf(stderr,
1487                                 "Destination dir %s can't be made: %d/%s\n",
1488                                 iop->ofn, errno, strerror(errno));
1489                         return 1;
1490                 }
1491         }
1492
1493         if (output_name)
1494                 snprintf(iop->ofn + len, sizeof(iop->ofn), "%s.blktrace.%d",
1495                          output_name, cpu);
1496         else
1497                 snprintf(iop->ofn + len, sizeof(iop->ofn), "%s.blktrace.%d",
1498                          iop->dpp->buts_name, cpu);
1499
1500         return 0;
1501 }
1502
1503 static int set_vbuf(struct io_info *iop, int mode, size_t size)
1504 {
1505         iop->obuf = malloc(size);
1506         if (setvbuf(iop->ofp, iop->obuf, mode, size) < 0) {
1507                 fprintf(stderr, "setvbuf(%s, %d) failed: %d/%s\n",
1508                         iop->dpp->path, (int)size, errno,
1509                         strerror(errno));
1510                 free(iop->obuf);
1511                 return 1;
1512         }
1513
1514         return 0;
1515 }
1516
1517 static int iop_open(struct io_info *iop, int cpu)
1518 {
1519         iop->ofd = -1;
1520         if (fill_ofname(iop, cpu))
1521                 return 1;
1522
1523         iop->ofp = my_fopen(iop->ofn, "w+");
1524         if (iop->ofp == NULL) {
1525                 fprintf(stderr, "Open output file %s failed: %d/%s\n",
1526                         iop->ofn, errno, strerror(errno));
1527                 return 1;
1528         }
1529
1530         if (set_vbuf(iop, _IOLBF, FILE_VBUF_SIZE)) {
1531                 fprintf(stderr, "set_vbuf for file %s failed: %d/%s\n",
1532                         iop->ofn, errno, strerror(errno));
1533                 fclose(iop->ofp);
1534                 return 1;
1535         }
1536
1537         iop->ofd = fileno(iop->ofp);
1538         return 0;
1539 }
1540
1541 static void close_iop(struct io_info *iop)
1542 {
1543         struct mmap_info *mip = &iop->mmap_info;
1544
1545         if (mip->fs_buf)
1546                 munmap(mip->fs_buf, mip->fs_buf_len);
1547
1548         if (!piped_output) {
1549                 if (ftruncate(fileno(iop->ofp), mip->fs_size) < 0) {
1550                         fprintf(stderr,
1551                                 "Ignoring err: ftruncate(%s): %d/%s\n",
1552                                 iop->ofn, errno, strerror(errno));
1553                 }
1554         }
1555
1556         if (iop->ofp)
1557                 fclose(iop->ofp);
1558         if (iop->obuf)
1559                 free(iop->obuf);
1560 }
1561
1562 static void close_ios(struct tracer *tp)
1563 {
1564         while (tp->nios > 0) {
1565                 struct io_info *iop = &tp->ios[--tp->nios];
1566
1567                 iop->dpp->drops = get_drops(iop->dpp);
1568                 if (iop->ifd >= 0)
1569                         close(iop->ifd);
1570
1571                 if (iop->ofp)
1572                         close_iop(iop);
1573                 else if (iop->ofd >= 0) {
1574                         struct devpath *dpp = iop->dpp;
1575
1576                         net_send_close(iop->ofd, dpp->buts_name, dpp->drops);
1577                         net_close_connection(&iop->ofd);
1578                 }
1579         }
1580
1581         free(tp->ios);
1582         free(tp->pfds);
1583 }
1584
1585 static int open_ios(struct tracer *tp)
1586 {
1587         struct pollfd *pfd;
1588         struct io_info *iop;
1589         struct list_head *p;
1590
1591         tp->ios = calloc(ndevs, sizeof(struct io_info));
1592         memset(tp->ios, 0, ndevs * sizeof(struct io_info));
1593
1594         tp->pfds = calloc(ndevs, sizeof(struct pollfd));
1595         memset(tp->pfds, 0, ndevs * sizeof(struct pollfd));
1596
1597         tp->nios = 0;
1598         iop = tp->ios;
1599         pfd = tp->pfds;
1600         __list_for_each(p, &devpaths) {
1601                 struct devpath *dpp = list_entry(p, struct devpath, head);
1602
1603                 iop->dpp = dpp;
1604                 iop->ofd = -1;
1605                 snprintf(iop->ifn, sizeof(iop->ifn), "%s/block/%s/trace%d",
1606                         debugfs_path, dpp->buts_name, tp->cpu);
1607
1608                 iop->ifd = my_open(iop->ifn, O_RDONLY | O_NONBLOCK);
1609                 if (iop->ifd < 0) {
1610                         fprintf(stderr, "Thread %d failed open %s: %d/%s\n",
1611                                 tp->cpu, iop->ifn, errno, strerror(errno));
1612                         return 1;
1613                 }
1614
1615                 init_mmap_info(&iop->mmap_info);
1616
1617                 pfd->fd = iop->ifd;
1618                 pfd->events = POLLIN;
1619
1620                 if (piped_output)
1621                         ;
1622                 else if (net_client_use_sendfile()) {
1623                         iop->ofd = net_setup_client();
1624                         if (iop->ofd < 0)
1625                                 goto err;
1626                         net_send_open(iop->ofd, tp->cpu, dpp->buts_name);
1627                 } else if (net_mode == Net_none) {
1628                         if (iop_open(iop, tp->cpu))
1629                                 goto err;
1630                 } else {
1631                         /*
1632                          * This ensures that the server knows about all
1633                          * connections & devices before _any_ closes
1634                          */
1635                         net_send_open(cl_fds[tp->cpu], tp->cpu, dpp->buts_name);
1636                 }
1637
1638                 pfd++;
1639                 iop++;
1640                 tp->nios++;
1641         }
1642
1643         return 0;
1644
1645 err:
1646         close(iop->ifd);        /* tp->nios _not_ bumped */
1647         close_ios(tp);
1648         return 1;
1649 }
1650
/*
 * Read readable relay files directly into each device's mmap'ed
 * output file. 'nevs' bounds how many fds are serviced; 'force_read'
 * reads every fd regardless of poll results (used for the final
 * drain). Returns the number of fds that yielded data.
 */
static int handle_pfds_file(struct tracer *tp, int nevs, int force_read)
{
	struct mmap_info *mip;
	int i, ret, nentries = 0;
	struct pollfd *pfd = tp->pfds;
	struct io_info *iop = tp->ios;

	for (i = 0; nevs > 0 && i < ndevs; i++, pfd++, iop++) {
		if (pfd->revents & POLLIN || force_read) {
			mip = &iop->mmap_info;

			/* Ensure the output mapping can take buf_size bytes */
			ret = setup_mmap(iop->ofd, buf_size, mip);
			if (ret < 0) {
				pfd->events = 0;
				break;
			}

			ret = read(iop->ifd, mip->fs_buf + mip->fs_off,
				   buf_size);
			if (ret > 0) {
				pdc_dr_update(iop->dpp, tp->cpu, ret);
				mip->fs_size += ret;
				mip->fs_off += ret;
				nentries++;
			} else if (ret == 0) {
				/*
				 * Short reads after we're done stop us
				 * from trying reads.
				 */
				if (tp->is_done)
					clear_events(pfd);
			} else {
				read_err(tp->cpu, iop->ifn);
				if (errno != EAGAIN || tp->is_done)
					clear_events(pfd);
			}
			nevs--;
		}
	}

	return nentries;
}
1693
1694 static int handle_pfds_netclient(struct tracer *tp, int nevs, int force_read)
1695 {
1696         struct stat sb;
1697         int i, nentries = 0;
1698         struct pdc_stats *sp;
1699         struct pollfd *pfd = tp->pfds;
1700         struct io_info *iop = tp->ios;
1701
1702         for (i = 0; i < ndevs; i++, pfd++, iop++, sp++) {
1703                 if (pfd->revents & POLLIN || force_read) {
1704                         if (fstat(iop->ifd, &sb) < 0) {
1705                                 perror(iop->ifn);
1706                                 pfd->events = 0;
1707                         } else if (sb.st_size > (off_t)iop->data_queued) {
1708                                 iop->ready = sb.st_size - iop->data_queued;
1709                                 iop->data_queued = sb.st_size;
1710
1711                                 if (!net_sendfile_data(tp, iop)) {
1712                                         pdc_dr_update(iop->dpp, tp->cpu,
1713                                                       iop->ready);
1714                                         nentries++;
1715                                 } else
1716                                         clear_events(pfd);
1717                         }
1718                         if (--nevs == 0)
1719                                 break;
1720                 }
1721         }
1722
1723         if (nentries)
1724                 incr_entries(nentries);
1725
1726         return nentries;
1727 }
1728
/*
 * Buffered variant: read each readable relay file into a trace_buf
 * and queue it for the processing thread (add_trace_buf hands back a
 * fresh buffer). Returns the number of fds that yielded data.
 */
static int handle_pfds_entries(struct tracer *tp, int nevs, int force_read)
{
	int i, nentries = 0;
	struct trace_buf *tbp;
	struct pollfd *pfd = tp->pfds;
	struct io_info *iop = tp->ios;

	tbp = alloc_trace_buf(tp->cpu, buf_size);
	for (i = 0; i < ndevs; i++, pfd++, iop++) {
		if (pfd->revents & POLLIN || force_read) {
			tbp->len = read(iop->ifd, tbp->buf, buf_size);
			if (tbp->len > 0) {
				pdc_dr_update(iop->dpp, tp->cpu, tbp->len);
				/* Queues tbp and replaces it with a new buf */
				add_trace_buf(iop->dpp, tp->cpu, &tbp);
				nentries++;
			} else if (tbp->len == 0) {
				/*
				 * Short reads after we're done stop us
				 * from trying reads.
				 */
				if (tp->is_done)
					clear_events(pfd);
			} else {
				read_err(tp->cpu, iop->ifn);
				if (errno != EAGAIN || tp->is_done)
					clear_events(pfd);
			}
			/* Piped output always services every device */
			if (!piped_output && --nevs == 0)
				break;
		}
	}
	/* The spare buffer from the last add (or the unused first one) */
	free(tbp);

	if (nentries)
		incr_entries(nentries);

	return nentries;
}
1767
1768 static void *thread_main(void *arg)
1769 {
1770         int ret, ndone, to_val;
1771         struct tracer *tp = arg;
1772
1773         ret = lock_on_cpu(tp->cpu);
1774         if (ret)
1775                 goto err;
1776
1777         ret = open_ios(tp);
1778         if (ret)
1779                 goto err;
1780
1781         if (piped_output)
1782                 to_val = 50;            /* Frequent partial handles */
1783         else
1784                 to_val = 500;           /* 1/2 second intervals */
1785
1786
1787         tracer_signal_ready(tp, Th_running, 0);
1788         tracer_wait_unblock(tp);
1789
1790         while (!tp->is_done) {
1791                 ndone = poll(tp->pfds, ndevs, to_val);
1792                 if (ndone || piped_output)
1793                         (void)handle_pfds(tp, ndone, piped_output);
1794                 else if (ndone < 0 && errno != EINTR)
1795                         fprintf(stderr, "Thread %d poll failed: %d/%s\n",
1796                                 tp->cpu, errno, strerror(errno));
1797         }
1798
1799         /*
1800          * Trace is stopped, pull data until we get a short read
1801          */
1802         while (handle_pfds(tp, ndevs, 1) > 0)
1803                 ;
1804
1805         close_ios(tp);
1806         tracer_signal_ready(tp, Th_leaving, 0);
1807         return NULL;
1808
1809 err:
1810         tracer_signal_ready(tp, Th_error, ret);
1811         return NULL;
1812 }
1813
1814 static int start_tracer(int cpu)
1815 {
1816         struct tracer *tp;
1817
1818         tp = malloc(sizeof(*tp));
1819         memset(tp, 0, sizeof(*tp));
1820
1821         INIT_LIST_HEAD(&tp->head);
1822         tp->status = 0;
1823         tp->cpu = cpu;
1824
1825         if (pthread_create(&tp->thread, NULL, thread_main, tp)) {
1826                 fprintf(stderr, "FAILED to start thread on CPU %d: %d/%s\n",
1827                         cpu, errno, strerror(errno));
1828                 free(tp);
1829                 return 1;
1830         }
1831
1832         list_add_tail(&tp->head, &tracers);
1833         return 0;
1834 }
1835
1836 static void start_tracers(void)
1837 {
1838         int cpu;
1839         struct list_head *p;
1840
1841         for (cpu = 0; cpu < ncpus; cpu++)
1842                 if (start_tracer(cpu))
1843                         break;
1844
1845         wait_tracers_ready(cpu);
1846
1847         __list_for_each(p, &tracers) {
1848                 struct tracer *tp = list_entry(p, struct tracer, head);
1849                 if (tp->status)
1850                         fprintf(stderr,
1851                                 "FAILED to start thread on CPU %d: %d/%s\n",
1852                                 tp->cpu, tp->status, strerror(tp->status));
1853         }
1854 }
1855
1856 static void stop_tracers(void)
1857 {
1858         struct list_head *p;
1859
1860         /*
1861          * Stop the tracing - makes the tracer threads clean up quicker.
1862          */
1863         __list_for_each(p, &devpaths) {
1864                 struct devpath *dpp = list_entry(p, struct devpath, head);
1865                 (void)ioctl(dpp->fd, BLKTRACESTOP);
1866         }
1867
1868         /*
1869          * Tell each tracer to quit
1870          */
1871         __list_for_each(p, &tracers) {
1872                 struct tracer *tp = list_entry(p, struct tracer, head);
1873                 tp->is_done = 1;
1874         }
1875 }
1876
1877 static void del_tracers(void)
1878 {
1879         struct list_head *p, *q;
1880
1881         list_for_each_safe(p, q, &tracers) {
1882                 struct tracer *tp = list_entry(p, struct tracer, head);
1883
1884                 list_del(&tp->head);
1885                 free(tp);
1886         }
1887 }
1888
/*
 * Wait for the run to finish: process queued buffers while tracers
 * run, join every tracer thread, do a final buffer drain, and then
 * snapshot each device's drop count. Order matters here.
 */
static void wait_tracers(void)
{
	struct list_head *p;

	/* Buffered modes: keep draining until the tracers signal done */
	if (use_tracer_devpaths())
		process_trace_bufs();

	wait_tracers_leaving();

	__list_for_each(p, &tracers) {
		int ret;
		struct tracer *tp = list_entry(p, struct tracer, head);

		ret = pthread_join(tp->thread, NULL);
		if (ret)
			fprintf(stderr, "Thread join %d failed %d\n",
				tp->cpu, ret);
	}

	/* Drain anything queued after the processing loop exited */
	if (use_tracer_devpaths())
		clean_trace_bufs();

	get_all_drops();
}
1913
/*
 * Orderly shutdown path (also used from signal-driven exits): mask
 * further termination signals, then stop, reap, and free all tracers
 * and device paths. The teardown order is significant.
 */
static void exit_tracing(void)
{
	signal(SIGINT, SIG_IGN);
	signal(SIGHUP, SIG_IGN);
	signal(SIGTERM, SIG_IGN);
	signal(SIGALRM, SIG_IGN);

	stop_tracers();
	wait_tracers();
	del_tracers();
	rel_devpaths();
}
1926
/*
 * SIGINT/termination handler: mark the run as done and stop the
 * kernel-side tracing so the threads unwind on their own.
 */
static void handle_sigint(__attribute__((__unused__)) int sig)
{
	done = 1;
	stop_tracers();
}
1932
1933 static void show_stats(struct list_head *devpaths)
1934 {
1935         FILE *ofp;
1936         struct list_head *p;
1937         unsigned long long nevents, data_read;
1938         unsigned long long total_drops = 0;
1939         unsigned long long total_events = 0;
1940
1941         if (piped_output)
1942                 ofp = my_fopen("/dev/null", "w");
1943         else
1944                 ofp = stdout;
1945
1946         __list_for_each(p, devpaths) {
1947                 int cpu;
1948                 struct pdc_stats *sp;
1949                 struct devpath *dpp = list_entry(p, struct devpath, head);
1950
1951                 if (net_mode == Net_server)
1952                         printf("server: end of run for %s:%s\n",
1953                                 dpp->ch->hostname, dpp->buts_name);
1954
1955                 data_read = 0;
1956                 nevents = 0;
1957
1958                 fprintf(ofp, "=== %s ===\n", dpp->buts_name);
1959                 for (cpu = 0, sp = dpp->stats; cpu < dpp->ncpus; cpu++, sp++) {
1960                         /*
1961                          * Estimate events if not known...
1962                          */
1963                         if (sp->nevents == 0) {
1964                                 sp->nevents = sp->data_read /
1965                                                 sizeof(struct blk_io_trace);
1966                         }
1967
1968                         fprintf(ofp,
1969                                 "  CPU%3d: %20llu events, %8llu KiB data\n",
1970                                 cpu, sp->nevents, (sp->data_read + 1023) >> 10);
1971
1972                         data_read += sp->data_read;
1973                         nevents += sp->nevents;
1974                 }
1975
1976                 fprintf(ofp, "  Total:  %20llu events (dropped %llu),"
1977                              " %8llu KiB data\n", nevents,
1978                              dpp->drops, (data_read + 1024) >> 10);
1979
1980                 total_drops += dpp->drops;
1981                 total_events += (nevents + dpp->drops);
1982         }
1983
1984         fflush(ofp);
1985         if (piped_output)
1986                 fclose(ofp);
1987
1988         if (total_drops) {
1989                 double drops_ratio = 1.0;
1990
1991                 if (total_events)
1992                         drops_ratio = (double)total_drops/(double)total_events;
1993
1994                 fprintf(stderr, "\nYou have %llu (%5.1lf%%) dropped events\n"
1995                                 "Consider using a larger buffer size (-b) "
1996                                 "and/or more buffers (-n)\n",
1997                         total_drops, 100.0 * drops_ratio);
1998         }
1999 }
2000
2001 static int handle_args(int argc, char *argv[])
2002 {
2003         int c, i;
2004         struct statfs st;
2005         int act_mask_tmp = 0;
2006
2007         while ((c = getopt_long(argc, argv, S_OPTS, l_opts, NULL)) >= 0) {
2008                 switch (c) {
2009                 case 'a':
2010                         i = find_mask_map(optarg);
2011                         if (i < 0) {
2012                                 fprintf(stderr, "Invalid action mask %s\n",
2013                                         optarg);
2014                                 return 1;
2015                         }
2016                         act_mask_tmp |= i;
2017                         break;
2018
2019                 case 'A':
2020                         if ((sscanf(optarg, "%x", &i) != 1) ||
2021                                                         !valid_act_opt(i)) {
2022                                 fprintf(stderr,
2023                                         "Invalid set action mask %s/0x%x\n",
2024                                         optarg, i);
2025                                 return 1;
2026                         }
2027                         act_mask_tmp = i;
2028                         break;
2029
2030                 case 'd':
2031                         if (add_devpath(optarg) != 0)
2032                                 return 1;
2033                         break;
2034
2035                 case 'I': {
2036                         char dev_line[256];
2037                         FILE *ifp = my_fopen(optarg, "r");
2038
2039                         if (!ifp) {
2040                                 fprintf(stderr,
2041                                         "Invalid file for devices %s\n",
2042                                         optarg);
2043                                 return 1;
2044                         }
2045
2046                         while (fscanf(ifp, "%s\n", dev_line) == 1)
2047                                 if (add_devpath(dev_line) != 0)
2048                                         return 1;
2049                         break;
2050                 }
2051
2052                 case 'r':
2053                         debugfs_path = optarg;
2054                         break;
2055
2056                 case 'o':
2057                         output_name = optarg;
2058                         break;
2059                 case 'k':
2060                         kill_running_trace = 1;
2061                         break;
2062                 case 'w':
2063                         stop_watch = atoi(optarg);
2064                         if (stop_watch <= 0) {
2065                                 fprintf(stderr,
2066                                         "Invalid stopwatch value (%d secs)\n",
2067                                         stop_watch);
2068                                 return 1;
2069                         }
2070                         break;
2071                 case 'V':
2072                 case 'v':
2073                         printf("%s version %s\n", argv[0], blktrace_version);
2074                         exit(0);
2075                         /*NOTREACHED*/
2076                 case 'b':
2077                         buf_size = strtoul(optarg, NULL, 10);
2078                         if (buf_size <= 0 || buf_size > 16*1024) {
2079                                 fprintf(stderr, "Invalid buffer size (%lu)\n",
2080                                         buf_size);
2081                                 return 1;
2082                         }
2083                         buf_size <<= 10;
2084                         break;
2085                 case 'n':
2086                         buf_nr = strtoul(optarg, NULL, 10);
2087                         if (buf_nr <= 0) {
2088                                 fprintf(stderr,
2089                                         "Invalid buffer nr (%lu)\n", buf_nr);
2090                                 return 1;
2091                         }
2092                         break;
2093                 case 'D':
2094                         output_dir = optarg;
2095                         break;
2096                 case 'h':
2097                         net_mode = Net_client;
2098                         strcpy(hostname, optarg);
2099                         break;
2100                 case 'l':
2101                         net_mode = Net_server;
2102                         break;
2103                 case 'p':
2104                         net_port = atoi(optarg);
2105                         break;
2106                 case 's':
2107                         net_use_sendfile = 0;
2108                         break;
2109                 default:
2110                         show_usage(argv[0]);
2111                         exit(1);
2112                         /*NOTREACHED*/
2113                 }
2114         }
2115
2116         while (optind < argc)
2117                 if (add_devpath(argv[optind++]) != 0)
2118                         return 1;
2119
2120         if (net_mode != Net_server && ndevs == 0) {
2121                 show_usage(argv[0]);
2122                 return 1;
2123         }
2124
2125         if (statfs(debugfs_path, &st) < 0 || st.f_type != (long)DEBUGFS_TYPE) {
2126                 fprintf(stderr, "Invalid debug path %s: %d/%s\n",
2127                         debugfs_path, errno, strerror(errno));
2128                 return 1;
2129         }
2130
2131         if (act_mask_tmp != 0)
2132                 act_mask = act_mask_tmp;
2133
2134         if (net_mode == Net_client && net_setup_addr())
2135                 return 1;
2136
2137         /*
2138          * Set up for appropriate PFD handler based upon output name.
2139          */
2140         if (net_client_use_sendfile())
2141                 handle_pfds = handle_pfds_netclient;
2142         else if (net_client_use_send())
2143                 handle_pfds = handle_pfds_entries;
2144         else if (output_name && (strcmp(output_name, "-") == 0)) {
2145                 piped_output = 1;
2146                 handle_pfds = handle_pfds_entries;
2147                 pfp = stdout;
2148                 setvbuf(pfp, NULL, _IONBF, 0);
2149         } else
2150                 handle_pfds = handle_pfds_file;
2151         return 0;
2152 }
2153
2154 static void ch_add_connection(struct net_server_s *ns, struct cl_host *ch,
2155                               int fd)
2156 {
2157         struct cl_conn *nc;
2158
2159         nc = malloc(sizeof(*nc));
2160         memset(nc, 0, sizeof(*nc));
2161
2162         time(&nc->connect_time);
2163         nc->ch = ch;
2164         nc->fd = fd;
2165         nc->ncpus = -1;
2166
2167         list_add_tail(&nc->ch_head, &ch->conn_list);
2168         ch->connects++;
2169
2170         list_add_tail(&nc->ns_head, &ns->conn_list);
2171         ns->connects++;
2172         ns->pfds = realloc(ns->pfds, (ns->connects+1) * sizeof(struct pollfd));
2173 }
2174
2175 static void ch_rem_connection(struct net_server_s *ns, struct cl_host *ch,
2176                               struct cl_conn *nc)
2177 {
2178         net_close_connection(&nc->fd);
2179
2180         list_del(&nc->ch_head);
2181         ch->connects--;
2182
2183         list_del(&nc->ns_head);
2184         ns->connects--;
2185         ns->pfds = realloc(ns->pfds, (ns->connects+1) * sizeof(struct pollfd));
2186
2187         free(nc);
2188 }
2189
2190 static struct cl_host *net_find_client_host(struct net_server_s *ns,
2191                                             struct in_addr cl_in_addr)
2192 {
2193         struct list_head *p;
2194
2195         __list_for_each(p, &ns->ch_list) {
2196                 struct cl_host *ch = list_entry(p, struct cl_host, head);
2197
2198                 if (in_addr_eq(ch->cl_in_addr, cl_in_addr))
2199                         return ch;
2200         }
2201
2202         return NULL;
2203 }
2204
2205 static struct cl_host *net_add_client_host(struct net_server_s *ns,
2206                                            struct sockaddr_in *addr)
2207 {
2208         struct cl_host *ch;
2209
2210         ch = malloc(sizeof(*ch));
2211         memset(ch, 0, sizeof(*ch));
2212
2213         ch->ns = ns;
2214         ch->cl_in_addr = addr->sin_addr;
2215         list_add_tail(&ch->head, &ns->ch_list);
2216         ns->nchs++;
2217
2218         ch->hostname = strdup(inet_ntoa(addr->sin_addr));
2219         printf("server: connection from %s\n", ch->hostname);
2220
2221         INIT_LIST_HEAD(&ch->conn_list);
2222         INIT_LIST_HEAD(&ch->devpaths);
2223
2224         return ch;
2225 }
2226
2227 static void device_done(struct devpath *dpp, int ncpus)
2228 {
2229         int cpu;
2230         struct io_info *iop;
2231
2232         for (cpu = 0, iop = dpp->ios; cpu < ncpus; cpu++, iop++)
2233                 close_iop(iop);
2234
2235         list_del(&dpp->head);
2236         dpp_free(dpp);
2237 }
2238
2239 static void net_ch_remove(struct cl_host *ch, int ncpus)
2240 {
2241         struct list_head *p, *q;
2242         struct net_server_s *ns = ch->ns;
2243
2244         list_for_each_safe(p, q, &ch->devpaths) {
2245                 struct devpath *dpp = list_entry(p, struct devpath, head);
2246                 device_done(dpp, ncpus);
2247         }
2248
2249         list_for_each_safe(p, q, &ch->conn_list) {
2250                 struct cl_conn *nc = list_entry(p, struct cl_conn, ch_head);
2251
2252                 ch_rem_connection(ns, ch, nc);
2253         }
2254
2255         list_del(&ch->head);
2256         ns->nchs--;
2257
2258         if (ch->hostname)
2259                 free(ch->hostname);
2260         free(ch);
2261 }
2262
2263 static void net_add_connection(struct net_server_s *ns)
2264 {
2265         int fd;
2266         struct cl_host *ch;
2267         socklen_t socklen = sizeof(ns->addr);
2268
2269         fd = my_accept(ns->listen_fd, (struct sockaddr *)&ns->addr, &socklen);
2270         if (fd < 0) {
2271                 /*
2272                  * This is OK: we just won't accept this connection,
2273                  * nothing fatal.
2274                  */
2275                 perror("accept");
2276         } else {
2277                 ch = net_find_client_host(ns, ns->addr.sin_addr);
2278                 if (!ch)
2279                         ch = net_add_client_host(ns, &ns->addr);
2280
2281                 ch_add_connection(ns, ch, fd);
2282         }
2283 }
2284
2285 static struct devpath *nc_add_dpp(struct cl_conn *nc,
2286                                   struct blktrace_net_hdr *bnh,
2287                                   time_t connect_time)
2288 {
2289         int cpu;
2290         struct io_info *iop;
2291         struct devpath *dpp;
2292
2293         dpp = malloc(sizeof(*dpp));
2294         memset(dpp, 0, sizeof(*dpp));
2295
2296         dpp->buts_name = strdup(bnh->buts_name);
2297         dpp->path = strdup(bnh->buts_name);
2298         dpp->fd = -1;
2299         dpp->ch = nc->ch;
2300         dpp->cl_id = bnh->cl_id;
2301         dpp->cl_connect_time = connect_time;
2302         dpp->ncpus = nc->ncpus;
2303         dpp->stats = calloc(dpp->ncpus, sizeof(*dpp->stats));
2304         memset(dpp->stats, 0, dpp->ncpus * sizeof(*dpp->stats));
2305
2306         list_add_tail(&dpp->head, &nc->ch->devpaths);
2307         nc->ch->ndevs++;
2308
2309         dpp->ios = calloc(nc->ncpus, sizeof(*iop));
2310         memset(dpp->ios, 0, ndevs * sizeof(*iop));
2311
2312         for (cpu = 0, iop = dpp->ios; cpu < nc->ncpus; cpu++, iop++) {
2313                 iop->dpp = dpp;
2314                 iop->nc = nc;
2315                 init_mmap_info(&iop->mmap_info);
2316
2317                 if (iop_open(iop, cpu))
2318                         goto err;
2319         }
2320
2321         return dpp;
2322
2323 err:
2324         /*
2325          * Need to unravel what's been done...
2326          */
2327         while (cpu >= 0)
2328                 close_iop(&dpp->ios[cpu--]);
2329         dpp_free(dpp);
2330
2331         return NULL;
2332 }
2333
2334 static struct devpath *nc_find_dpp(struct cl_conn *nc,
2335                                    struct blktrace_net_hdr *bnh)
2336 {
2337         struct list_head *p;
2338         time_t connect_time = nc->connect_time;
2339
2340         __list_for_each(p, &nc->ch->devpaths) {
2341                 struct devpath *dpp = list_entry(p, struct devpath, head);
2342
2343                 if (!strcmp(dpp->buts_name, bnh->buts_name))
2344                         return dpp;
2345
2346                 if (dpp->cl_id == bnh->cl_id)
2347                         connect_time = dpp->cl_connect_time;
2348         }
2349
2350         return nc_add_dpp(nc, bnh, connect_time);
2351 }
2352
/*
 * Receive one chunk of trace payload (bnh->len bytes for CPU bnh->cpu)
 * from the client socket and append it to that CPU's output file
 * through its mmap window.  Fatal failures (mmap growth or socket
 * error) terminate the server via exit(1).
 */
static void net_client_read_data(struct cl_conn *nc, struct devpath *dpp,
				 struct blktrace_net_hdr *bnh)
{
	int ret;
	struct io_info *iop = &dpp->ios[bnh->cpu];
	struct mmap_info *mip = &iop->mmap_info;

	/* Ensure the file-backed mapping can hold bnh->len more bytes */
	if (setup_mmap(iop->ofd, bnh->len, &iop->mmap_info)) {
		fprintf(stderr, "ncd(%s:%d): mmap failed\n",
			nc->ch->hostname, nc->fd);
		exit(1);
	}

	ret = net_recv_data(nc->fd, mip->fs_buf + mip->fs_off, bnh->len);
	if (ret > 0) {
		/* Account the received bytes and advance the write offset */
		pdc_dr_update(dpp, bnh->cpu, ret);
		mip->fs_size += ret;
		mip->fs_off += ret;
	} else if (ret < 0)
		exit(1);
}
2374
/*
 * Returns 1 if we closed a host - invalidates other polling information
 * that may be present.
 *
 * Handles one message from a client connection: read the network
 * header, fix up endianness, validate the magic, then dispatch on
 * bnh.len (0 = device open, 1 = device close/end-of-run, otherwise a
 * data payload).  Protocol errors terminate the server via exit(1).
 */
static int net_client_data(struct cl_conn *nc)
{
	int ret;
	struct devpath *dpp;
	struct blktrace_net_hdr bnh;

	ret = net_get_header(nc, &bnh);
	if (ret == 0)
		return 0;

	if (ret < 0) {
		fprintf(stderr, "ncd(%d): header read failed\n", nc->fd);
		exit(1);
	}

	/* First header seen decides the session's byte order */
	if (data_is_native == -1 && check_data_endianness(bnh.magic)) {
		fprintf(stderr, "ncd(%d): received data is bad\n", nc->fd);
		exit(1);
	}

	if (!data_is_native) {
		/* Foreign-endian client: convert every header field */
		bnh.magic = be32_to_cpu(bnh.magic);
		bnh.cpu = be32_to_cpu(bnh.cpu);
		bnh.max_cpus = be32_to_cpu(bnh.max_cpus);
		bnh.len = be32_to_cpu(bnh.len);
		bnh.cl_id = be32_to_cpu(bnh.cl_id);
		bnh.buf_size = be32_to_cpu(bnh.buf_size);
		bnh.buf_nr = be32_to_cpu(bnh.buf_nr);
		bnh.page_size = be32_to_cpu(bnh.page_size);
	}

	if ((bnh.magic & 0xffffff00) != BLK_IO_TRACE_MAGIC) {
		fprintf(stderr, "ncd(%s:%d): bad data magic\n",
			nc->ch->hostname, nc->fd);
		exit(1);
	}

	/* Connection CPU count is learned from the first header */
	if (nc->ncpus == -1)
		nc->ncpus = bnh.max_cpus;

	/*
	 * len == 0 means the other end is sending us a new connection/dpp
	 * len == 1 means that the other end signalled end-of-run
	 */
	dpp = nc_find_dpp(nc, &bnh);
	if (bnh.len == 0) {
		/*
		 * Just adding in the dpp above is enough
		 */
		ack_open_close(nc->fd, dpp->buts_name);
		nc->ch->cl_opens++;
	} else if (bnh.len == 1) {
		/*
		 * overload cpu count with dropped events
		 */
		dpp->drops = bnh.cpu;

		ack_open_close(nc->fd, dpp->buts_name);
		/* Last device closed: report stats and drop the host */
		if (--nc->ch->cl_opens == 0) {
			show_stats(&nc->ch->devpaths);
			net_ch_remove(nc->ch, nc->ncpus);
			return 1;
		}
	} else
		net_client_read_data(nc, dpp, &bnh);

	return 0;
}
2447
/*
 * Dispatch POLLIN events to client connections.  ns->pfds[0] is the
 * listen socket, so connection pollfds start at index 1 and run
 * parallel to ns->conn_list (both filled in that order by
 * net_setup_pfds()).  Stop early once a host was closed — the pfd
 * array is then stale — or when all reported events are consumed.
 */
static void handle_client_data(struct net_server_s *ns, int events)
{
	struct cl_conn *nc;
	struct pollfd *pfd;
	struct list_head *p, *q;

	pfd = &ns->pfds[1];
	list_for_each_safe(p, q, &ns->conn_list) {
		if (pfd->revents & POLLIN) {
			nc = list_entry(p, struct cl_conn, ns_head);

			if (net_client_data(nc) || --events == 0)
				break;
		}
		pfd++;
	}
}
2465
2466 static void net_setup_pfds(struct net_server_s *ns)
2467 {
2468         struct pollfd *pfd;
2469         struct list_head *p;
2470
2471         ns->pfds[0].fd = ns->listen_fd;
2472         ns->pfds[0].events = POLLIN;
2473
2474         pfd = &ns->pfds[1];
2475         __list_for_each(p, &ns->conn_list) {
2476                 struct cl_conn *nc = list_entry(p, struct cl_conn, ns_head);
2477
2478                 pfd->fd = nc->fd;
2479                 pfd->events = POLLIN;
2480                 pfd++;
2481         }
2482 }
2483
2484 static int net_server_handle_connections(struct net_server_s *ns)
2485 {
2486         int events;
2487
2488         printf("server: waiting for connections...\n");
2489
2490         while (!done) {
2491                 net_setup_pfds(ns);
2492                 events = poll(ns->pfds, ns->connects + 1, -1);
2493                 if (events < 0) {
2494                         if (errno != EINTR) {
2495                                 perror("FATAL: poll error");
2496                                 return 1;
2497                         }
2498                 } else if (events > 0) {
2499                         if (ns->pfds[0].revents & POLLIN) {
2500                                 net_add_connection(ns);
2501                                 events--;
2502                         }
2503
2504                         if (events)
2505                                 handle_client_data(ns, events);
2506                 }
2507         }
2508
2509         return 0;
2510 }
2511
2512 static int net_server(void)
2513 {
2514         int fd, opt;
2515         int ret = 1;
2516         struct net_server_s net_server;
2517         struct net_server_s *ns = &net_server;
2518
2519         memset(ns, 0, sizeof(*ns));
2520         INIT_LIST_HEAD(&ns->ch_list);
2521         INIT_LIST_HEAD(&ns->conn_list);
2522         ns->pfds = malloc(sizeof(struct pollfd));
2523
2524         fd = my_socket(AF_INET, SOCK_STREAM, 0);
2525         if (fd < 0) {
2526                 perror("server: socket");
2527                 goto out;
2528         }
2529
2530         opt = 1;
2531         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
2532                 perror("setsockopt");
2533                 goto out;
2534         }
2535
2536         memset(&ns->addr, 0, sizeof(ns->addr));
2537         ns->addr.sin_family = AF_INET;
2538         ns->addr.sin_addr.s_addr = htonl(INADDR_ANY);
2539         ns->addr.sin_port = htons(net_port);
2540
2541         if (bind(fd, (struct sockaddr *) &ns->addr, sizeof(ns->addr)) < 0) {
2542                 perror("bind");
2543                 goto out;
2544         }
2545
2546         if (listen(fd, 1) < 0) {
2547                 perror("listen");
2548                 goto out;
2549         }
2550
2551         /*
2552          * The actual server looping is done here:
2553          */
2554         ns->listen_fd = fd;
2555         ret = net_server_handle_connections(ns);
2556
2557         /*
2558          * Clean up and return...
2559          */
2560 out:
2561         free(ns->pfds);
2562         return ret;
2563 }
2564
/*
 * Run tracing proper (local or network-client mode): register the
 * atexit cleanup hook, arm the kernel-side buffers, spawn one tracer
 * thread per CPU, then wait for completion and report stats.  Returns
 * 0 normally, 1 only if per-device output setup fails.
 */
static int run_tracers(void)
{
	atexit(exit_tracing);
	if (net_mode == Net_client)
		printf("blktrace: connecting to %s\n", hostname);

	setup_buts();

	if (use_tracer_devpaths()) {
		if (setup_tracer_devpaths())
			return 1;

		/* Buffered entries go to a file when piped, else the net */
		if (piped_output)
			handle_list = handle_list_file;
		else
			handle_list = handle_list_net;
	}

	start_tracers();
	/* Only unblock and start if every per-CPU thread came up */
	if (nthreads_running == ncpus) {
		unblock_tracers();
		start_buts();
		if (net_mode == Net_client)
			printf("blktrace: connected!\n");
		if (stop_watch)
			alarm(stop_watch);
	} else
		stop_tracers();

	wait_tracers();
	if (nthreads_running == ncpus)
		show_stats(&devpaths);
	if (net_client_use_send())
		close_client_connections();
	del_tracers();

	return 0;
}
2603
/*
 * Entry point: initialize globals from the environment, parse the
 * command line, install signal handlers, then run in one of three
 * modes — kill an already-running trace (-k), act as a network server
 * (-l), or trace locally / as a network client.
 */
int main(int argc, char *argv[])
{
	int ret = 0;

	setlocale(LC_NUMERIC, "en_US");
	pagesize = getpagesize();
	ncpus = sysconf(_SC_NPROCESSORS_ONLN);
	if (ncpus < 0) {
		fprintf(stderr, "sysconf(_SC_NPROCESSORS_ONLN) failed %d/%s\n",
			errno, strerror(errno));
		ret = 1;
		goto out;
	} else if (handle_args(argc, argv)) {
		ret = 1;
		goto out;
	}

	/* All termination signals funnel through the same handler */
	signal(SIGINT, handle_sigint);
	signal(SIGHUP, handle_sigint);
	signal(SIGTERM, handle_sigint);
	signal(SIGALRM, handle_sigint);
	signal(SIGPIPE, SIG_IGN);

	if (kill_running_trace) {
		struct devpath *dpp;
		struct list_head *p;

		/* -k: tear down traces left running on the given devices */
		__list_for_each(p, &devpaths) {
			dpp = list_entry(p, struct devpath, head);
			if (__stop_trace(dpp->fd)) {
				fprintf(stderr,
					"BLKTRACETEARDOWN %s failed: %d/%s\n",
					dpp->path, errno, strerror(errno));
			}
		}
	} else if (net_mode == Net_server) {
		if (output_name) {
			fprintf(stderr, "-o ignored in server mode\n");
			output_name = NULL;
		}
		ret = net_server();
	} else
		ret = run_tracers();

out:
	if (pfp)
		fclose(pfp);
	rel_devpaths();
	return ret;
}