blktrace: disallow -o when using multiple devices
[blktrace.git] / blktrace.c
1 /*
2  * block queue tracing application
3  *
4  * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
5  * Copyright (C) 2006 Jens Axboe <axboe@kernel.dk>
6  *
7  * Rewrite to have a single thread per CPU (managing all devices on that CPU)
8  *      Alan D. Brunelle <alan.brunelle@hp.com> - January 2009
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *
20  *  You should have received a copy of the GNU General Public License
21  *  along with this program; if not, write to the Free Software
22  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23  *
24  */
25
26 #include <errno.h>
27 #include <stdarg.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <string.h>
31 #include <fcntl.h>
32 #include <getopt.h>
33 #include <sched.h>
34 #include <unistd.h>
35 #include <poll.h>
36 #include <signal.h>
37 #include <pthread.h>
38 #include <locale.h>
39 #include <sys/ioctl.h>
40 #include <sys/types.h>
41 #include <sys/stat.h>
42 #include <sys/vfs.h>
43 #include <sys/mman.h>
44 #include <sys/param.h>
45 #include <sys/time.h>
46 #include <sys/resource.h>
47 #include <sys/socket.h>
48 #include <netinet/in.h>
49 #include <arpa/inet.h>
50 #include <netdb.h>
51 #include <sys/sendfile.h>
52
53 #include "btt/list.h"
54 #include "blktrace.h"
55
56 /*
57  * You may want to increase this even more, if you are logging at a high
58  * rate and see skipped/missed events
59  */
60 #define BUF_SIZE                (512 * 1024)
61 #define BUF_NR                  (4)
62
63 #define FILE_VBUF_SIZE          (128 * 1024)
64
65 #define DEBUGFS_TYPE            (0x64626720)
66 #define TRACE_NET_PORT          (8462)
67
68 enum {
69         Net_none = 0,
70         Net_server,
71         Net_client,
72 };
73
74 enum thread_status {
75         Th_running,
76         Th_leaving,
77         Th_error
78 };
79
80 /*
81  * Generic stats collected: nevents can be _roughly_ estimated by data_read
82  * (discounting pdu...)
83  *
84  * These fields are updated w/ pdc_dr_update & pdc_nev_update below.
85  */
86 struct pdc_stats {
87         unsigned long long data_read;
88         unsigned long long nevents;
89 };
90
struct devpath {
	struct list_head head;
	char *path;			/* path to device special file */
	char *buts_name;		/* name returned from bt kernel code */
	struct pdc_stats *stats;	/* per-CPU data-read/event counters */
	int fd, idx, ncpus;
	unsigned long long drops;	/* events dropped by the kernel */

	/*
	 * For piped output only:
	 *
	 * Each tracer will have a tracer_devpath_head that it will add new
	 * data onto. Its list is protected by tracer_devpath_head.mutex,
	 * and it will signal the processing thread using the file-scope
	 * dp_cond, dp_mutex & dp_entries variables.
	 */
	struct tracer_devpath_head *heads;

	/*
	 * For network server mode only:
	 */
	struct cl_host *ch;		/* host this traced device belongs to */
	u32 cl_id;			/* client id (client sends its pid) */
	time_t cl_connect_time;
	struct io_info *ios;
};
117
118 /*
119  * For piped output to stdout we will have each tracer thread (one per dev)
120  * tack buffers read from the relay queues on a per-device list.
121  *
122  * The main thread will then collect trace buffers from each of lists in turn.
123  *
124  * We will use a mutex to guard each of the trace_buf list. The tracers
125  * can then signal the main thread using <dp_cond,dp_mutex> and
126  * dp_entries. (When dp_entries is 0, and a tracer adds an entry it will
127  * signal. When dp_entries is 0, the main thread will wait for that condition
128  * to be signalled.)
129  *
130  * adb: It may be better just to have a large buffer per tracer per dev,
131  * and then use it as a ring-buffer. This would certainly cut down a lot
132  * of malloc/free thrashing, at the cost of more memory movements (potentially).
133  */
134 struct trace_buf {
135         struct list_head head;
136         struct devpath *dpp;
137         void *buf;
138         int cpu, len;
139 };
140
141 struct tracer_devpath_head {
142         pthread_mutex_t mutex;
143         struct list_head head;
144         struct trace_buf *prev;
145 };
146
147 /*
148  * Used to handle the mmap() interfaces for output file (containing traces)
149  */
150 struct mmap_info {
151         void *fs_buf;
152         unsigned long long fs_size, fs_max_size, fs_off, fs_buf_len;
153         unsigned long buf_size, buf_nr;
154         int pagesize;
155 };
156
157 /*
158  * Each thread doing work on a (client) side of blktrace will have one
159  * of these. The ios array contains input/output information, pfds holds
160  * poll() data. The volatile's provide flags to/from the main executing
161  * thread.
162  */
163 struct tracer {
164         struct list_head head;
165         struct io_info *ios;
166         struct pollfd *pfds;
167         pthread_t thread;
168         int cpu, nios;
169         volatile int status, is_done;
170 };
171
172 /*
173  * networking stuff follows. we include a magic number so we know whether
174  * to endianness convert or not.
175  *
176  * The len field is overloaded:
177  *      0 - Indicates an "open" - allowing the server to set up for a dev/cpu
178  *      1 - Indicates a "close" - Shut down connection orderly
179  *
180  * The cpu field is overloaded on close: it will contain the number of drops.
181  */
182 struct blktrace_net_hdr {
183         u32 magic;              /* same as trace magic */
184         char buts_name[32];     /* trace name */
185         u32 cpu;                /* for which cpu */
186         u32 max_cpus;
187         u32 len;                /* length of following trace data */
188         u32 cl_id;              /* id for set of client per-cpu connections */
189         u32 buf_size;           /* client buf_size for this trace  */
190         u32 buf_nr;             /* client buf_nr for this trace  */
191         u32 page_size;          /* client page_size for this trace  */
192 };
193
194 /*
195  * Each host encountered has one of these. The head is used to link this
196  * on to the network server's ch_list. Connections associated with this
197  * host are linked on conn_list, and any devices traced on that host
198  * are connected on the devpaths list.
199  */
200 struct cl_host {
201         struct list_head head;
202         struct list_head conn_list;
203         struct list_head devpaths;
204         struct net_server_s *ns;
205         char *hostname;
206         struct in_addr cl_in_addr;
207         int connects, ndevs, cl_opens;
208 };
209
210 /*
211  * Each connection (client to server socket ('fd')) has one of these. A
212  * back reference to the host ('ch'), and lists headers (for the host
213  * list, and the network server conn_list) are also included.
214  */
215 struct cl_conn {
216         struct list_head ch_head, ns_head;
217         struct cl_host *ch;
218         int fd, ncpus;
219         time_t connect_time;
220 };
221
/*
 * The network server requires some poll structures to be maintained -
 * one per connection currently on conn_list. The nchs/ch_list values
 * are for each host connected to this server. The addr field is used
 * as scratch while new connections are established.
 */
struct net_server_s {
	struct list_head conn_list;	/* all live client connections */
	struct list_head ch_list;	/* all known client hosts */
	struct pollfd *pfds;		/* one pollfd per connection */
	int listen_fd, connects, nchs;
	struct sockaddr_in addr;	/* scratch for accept() */
};
235
/*
 * This structure is (generically) used to provide information
 * for a read-to-write set of values.
 *
 * ifn & ifd represent input information
 *
 * ofn, ofd, ofp, obuf & mmap_info are used for the output file (optionally).
 */
struct io_info {
	struct devpath *dpp;
	FILE *ofp;			/* buffered output stream */
	char *obuf;			/* stdio buffer backing ofp */
	struct cl_conn *nc;	/* Server network connection */

	/*
	 * mmap controlled output files
	 */
	struct mmap_info mmap_info;

	/*
	 * Client network fields
	 */
	unsigned int ready;
	unsigned long long data_queued;

	/*
	 * Input/output file descriptors & names
	 */
	int ifd, ofd;
	char ifn[MAXPATHLEN + 64];
	char ofn[MAXPATHLEN + 64];
};
268
269 static char blktrace_version[] = "2.0.0";
270
271 /*
272  * Linkage to blktrace helper routines (trace conversions)
273  */
274 int data_is_native = -1;
275
276 static int ndevs;
277 static int ncpus;
278 static int pagesize;
279 static int act_mask = ~0U;
280 static int kill_running_trace;
281 static int stop_watch;
282 static int piped_output;
283
284 static char *debugfs_path = "/sys/kernel/debug";
285 static char *output_name;
286 static char *output_dir;
287
288 static unsigned long buf_size = BUF_SIZE;
289 static unsigned long buf_nr = BUF_NR;
290
291 static FILE *pfp;
292
293 static LIST_HEAD(devpaths);
294 static LIST_HEAD(tracers);
295
296 static volatile int done;
297
298 /*
299  * tracer threads add entries, the main thread takes them off and processes
300  * them. These protect the dp_entries variable.
301  */
302 static pthread_cond_t dp_cond = PTHREAD_COND_INITIALIZER;
303 static pthread_mutex_t dp_mutex = PTHREAD_MUTEX_INITIALIZER;
304 static volatile int dp_entries;
305
306 /*
307  * These synchronize master / thread interactions.
308  */
309 static pthread_cond_t mt_cond = PTHREAD_COND_INITIALIZER;
310 static pthread_mutex_t mt_mutex = PTHREAD_MUTEX_INITIALIZER;
311 static volatile int nthreads_running;
312 static volatile int nthreads_leaving;
313 static volatile int nthreads_error;
314 static volatile int tracers_run;
315
316 /*
317  * network cmd line params
318  */
319 static struct sockaddr_in hostname_addr;
320 static char hostname[MAXHOSTNAMELEN];
321 static int net_port = TRACE_NET_PORT;
322 static int net_use_sendfile = 1;
323 static int net_mode;
324 static int *cl_fds;
325
326 static int (*handle_pfds)(struct tracer *, int, int);
327 static int (*handle_list)(struct tracer_devpath_head *, struct list_head *);
328
#define S_OPTS	"d:a:A:r:o:kw:vVb:n:D:lh:p:sI:"
static struct option l_opts[] = {
	{
		.name = "dev",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'd'
	},
	{
		.name = "input-devs",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'I'
	},
	{
		.name = "act-mask",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'a'
	},
	{
		.name = "set-mask",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'A'
	},
	{
		.name = "relay",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'r'
	},
	{
		.name = "output",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'o'
	},
	{
		.name = "kill",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 'k'
	},
	{
		.name = "stopwatch",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'w'
	},
	{
		.name = "version",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 'v'
	},
	/*
	 * NOTE(review): duplicate long-option name "version" (same as the
	 * 'v' entry above). getopt_long() always matches the first entry,
	 * so "--version" can never yield 'V' — confirm whether one of the
	 * two should be renamed (e.g. "verbose" for 'v').
	 */
	{
		.name = "version",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 'V'
	},
	{
		.name = "buffer-size",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'b'
	},
	{
		.name = "num-sub-buffers",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'n'
	},
	{
		.name = "output-dir",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'D'
	},
	{
		.name = "listen",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 'l'
	},
	{
		.name = "host",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'h'
	},
	{
		.name = "port",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'p'
	},
	{
		.name = "no-sendfile",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 's'
	},
	{
		.name = NULL,
	}
};
437
/*
 * Help text printed by show_usage().
 * NOTE(review): the two-line summary omits -b/-n/-D and the network
 * options (-l/-h/-p/-s) even though they are described below — consider
 * extending it.
 */
static char usage_str[] = \
	"-d <dev> [ -r debugfs path ] [ -o <output> ] [-k ] [ -w time ]\n" \
	"[ -a action ] [ -A action mask ] [ -I  <devs file> ] [ -v ]\n\n" \
	"\t-d Use specified device. May also be given last after options\n" \
	"\t-r Path to mounted debugfs, defaults to /sys/kernel/debug\n" \
	"\t-o File(s) to send output to\n" \
	"\t-D Directory to prepend to output file names\n" \
	"\t-w Stop after defined time, in seconds\n" \
	"\t-a Only trace specified actions. See documentation\n" \
	"\t-A Give trace mask as a single value. See documentation\n" \
	"\t-b Sub buffer size in KiB\n" \
	"\t-n Number of sub buffers\n" \
	"\t-l Run in network listen mode (blktrace server)\n" \
	"\t-h Run in network client mode, connecting to the given host\n" \
	"\t-p Network port to use (default 8462)\n" \
	"\t-s Make the network client NOT use sendfile() to transfer data\n" \
	"\t-I Add devices found in <devs file>\n" \
	"\t-V Print program version info\n\n";
456
/*
 * Reset a pollfd entry so no events are requested or reported.
 */
static void clear_events(struct pollfd *pfd)
{
	pfd->revents = 0;
	pfd->events = 0;
}
462
463 static inline int net_client_use_sendfile(void)
464 {
465         return net_mode == Net_client && net_use_sendfile;
466 }
467
468 static inline int net_client_use_send(void)
469 {
470         return net_mode == Net_client && !net_use_sendfile;
471 }
472
473 static inline int use_tracer_devpaths(void)
474 {
475         return piped_output || net_client_use_send();
476 }
477
/*
 * Compare two IPv4 addresses for equality.
 */
static inline int in_addr_eq(struct in_addr a, struct in_addr b)
{
	return (a.s_addr == b.s_addr) ? 1 : 0;
}
482
483 static inline void pdc_dr_update(struct devpath *dpp, int cpu, int data_read)
484 {
485         dpp->stats[cpu].data_read += data_read;
486 }
487
488 static inline void pdc_nev_update(struct devpath *dpp, int cpu, int nevents)
489 {
490         dpp->stats[cpu].nevents += nevents;
491 }
492
493 static void show_usage(char *prog)
494 {
495         fprintf(stderr, "Usage: %s %s", prog, usage_str);
496 }
497
/*
 * Create a timespec 'delta_msec' milliseconds into the future.
 *
 * @tsp:        filled in with now + delta_msec, normalized
 * @delta_msec: offset in milliseconds
 */
static inline void make_timespec(struct timespec *tsp, long delta_msec)
{
	struct timeval now;

	gettimeofday(&now, NULL);
	tsp->tv_sec = now.tv_sec;
	tsp->tv_nsec = 1000L * now.tv_usec;

	tsp->tv_nsec += (delta_msec * 1000000L);
	/*
	 * Normalize: tv_nsec must land in [0, 1e9). The comparison must be
	 * >= (the old '>' let tv_nsec == 1000000000 through), otherwise the
	 * timespec is invalid and pthread_cond_timedwait() fails w/ EINVAL.
	 */
	if (tsp->tv_nsec >= 1000000000L) {
		long secs = tsp->tv_nsec / 1000000000L;

		tsp->tv_sec += secs;
		tsp->tv_nsec -= (secs * 1000000000L);
	}
}
517
/*
 * Condition wait with a 50 msec cap, so callers re-check their
 * predicates periodically even if a wakeup goes missing.
 */
static void t_pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
{
	struct timespec deadline;

	make_timespec(&deadline, 50);
	pthread_cond_timedwait(cond, mutex, &deadline);
}
528
/*
 * Release all tracer threads parked in tracer_wait_unblock().
 */
static void unblock_tracers(void)
{
	pthread_mutex_lock(&mt_mutex);
	tracers_run = 1;
	pthread_cond_broadcast(&mt_cond);
	pthread_mutex_unlock(&mt_mutex);
}
536
/*
 * Park this tracer until the master sets tracers_run, or the tracer is
 * told to finish early via tp->is_done.
 */
static void tracer_wait_unblock(struct tracer *tp)
{
	pthread_mutex_lock(&mt_mutex);
	while (!tp->is_done && !tracers_run)
		pthread_cond_wait(&mt_cond, &mt_mutex);
	pthread_mutex_unlock(&mt_mutex);
}
544
545 static void tracer_signal_ready(struct tracer *tp,
546                                 enum thread_status th_status,
547                                 int status)
548 {
549         pthread_mutex_lock(&mt_mutex);
550         tp->status = status;
551
552         if (th_status == Th_running)
553                 nthreads_running++;
554         else if (th_status == Th_error)
555                 nthreads_error++;
556         else
557                 nthreads_leaving++;
558
559         pthread_cond_signal(&mt_cond);
560         pthread_mutex_unlock(&mt_mutex);
561 }
562
/*
 * Block until every started tracer has checked in as running or
 * errored. Uses the timed wait so a lost signal cannot hang us.
 */
static void wait_tracers_ready(int ncpus_started)
{
	pthread_mutex_lock(&mt_mutex);
	while ((nthreads_running + nthreads_error) < ncpus_started)
		t_pthread_cond_wait(&mt_cond, &mt_mutex);
	pthread_mutex_unlock(&mt_mutex);
}
570
/*
 * Block until every tracer that started running has signalled that it
 * is leaving.
 */
static void wait_tracers_leaving(void)
{
	pthread_mutex_lock(&mt_mutex);
	while (nthreads_leaving < nthreads_running)
		t_pthread_cond_wait(&mt_cond, &mt_mutex);
	pthread_mutex_unlock(&mt_mutex);
}
578
579 static void init_mmap_info(struct mmap_info *mip)
580 {
581         mip->buf_size = buf_size;
582         mip->buf_nr = buf_nr;
583         mip->pagesize = pagesize;
584 }
585
/*
 * Orderly shutdown of a socket; the descriptor slot is marked invalid
 * (-1) so callers cannot reuse it.
 */
static void net_close_connection(int *fd)
{
	int sock = *fd;

	*fd = -1;
	shutdown(sock, SHUT_RDWR);
	close(sock);
}
592
593 static void dpp_free(struct devpath *dpp)
594 {
595         if (dpp->stats)
596                 free(dpp->stats);
597         if (dpp->ios)
598                 free(dpp->ios);
599         if (dpp->path)
600                 free(dpp->path);
601         if (dpp->buts_name)
602                 free(dpp->buts_name);
603         free(dpp);
604 }
605
/*
 * Pin the calling thread to a single CPU.
 *
 * Returns 0 on success, the errno from sched_setaffinity() otherwise.
 */
static int lock_on_cpu(int cpu)
{
	cpu_set_t mask;

	CPU_ZERO(&mask);
	CPU_SET(cpu, &mask);

	return sched_setaffinity(0, sizeof(mask), &mask) < 0 ? errno : 0;
}
617
/*
 * Bump a resource soft limit by 'increase', raising the hard limit
 * when needed. Returns 1 on success; on failure returns 0 with errno
 * restored to its value before the attempt, so callers can still
 * report the fault that led them here.
 */
static int increase_limit(int resource, rlim_t increase)
{
	struct rlimit rlim;
	int save_errno = errno;
	int ok = 0;

	if (getrlimit(resource, &rlim) == 0) {
		rlim.rlim_cur += increase;
		if (rlim.rlim_cur >= rlim.rlim_max)
			rlim.rlim_max = rlim.rlim_cur + increase;
		ok = setrlimit(resource, &rlim) == 0;
	}

	if (!ok)
		errno = save_errno;
	return ok;
}
635
/*
 * On fd exhaustion, try to raise RLIMIT_NOFILE so the caller can
 * retry. Returns non-zero when a retry is worthwhile.
 */
static int handle_open_failure(void)
{
	switch (errno) {
	case ENFILE:
	case EMFILE:
		return increase_limit(RLIMIT_NOFILE, 16);
	default:
		return 0;
	}
}
642
643 static int handle_mem_failure(size_t length)
644 {
645         if (errno == ENFILE)
646                 return handle_open_failure();
647         else if (errno == ENOMEM)
648                 return increase_limit(RLIMIT_MEMLOCK, 2 * length);
649         return 0;
650 }
651
/*
 * fopen() that retries after attempting to raise the fd limit.
 */
static FILE *my_fopen(const char *path, const char *mode)
{
	FILE *fp;

	while (!(fp = fopen(path, mode))) {
		if (!handle_open_failure())
			break;
	}
	return fp;
}
662
/*
 * open() that retries after attempting to raise the fd limit.
 */
static int my_open(const char *path, int flags)
{
	int fd;

	while ((fd = open(path, flags)) < 0) {
		if (!handle_open_failure())
			break;
	}
	return fd;
}
673
/*
 * socket() that retries after attempting to raise the fd limit.
 */
static int my_socket(int domain, int type, int protocol)
{
	int fd;

	while ((fd = socket(domain, type, protocol)) < 0) {
		if (!handle_open_failure())
			break;
	}
	return fd;
}
684
/*
 * accept() that retries after attempting to raise the fd limit.
 */
static int my_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)
{
	int fd;

	while ((fd = accept(sockfd, addr, addrlen)) < 0) {
		if (!handle_open_failure())
			break;
	}
	return fd;
}
695
/*
 * mmap() that retries after attempting to raise the memlock limit.
 */
static void *my_mmap(void *addr, size_t length, int prot, int flags, int fd,
		     off_t offset)
{
	void *p;

	while ((p = mmap(addr, length, prot, flags, fd, offset)) == MAP_FAILED) {
		if (!handle_mem_failure(length))
			break;
	}
	return p;
}
707
/*
 * mlock() that retries after attempting to raise the memlock limit.
 */
static int my_mlock(const void *addr, size_t len)
{
	int ret;

	while ((ret = mlock(addr, len)) < 0) {
		if (!handle_mem_failure(len))
			break;
	}
	return ret;
}
718
/*
 * Ensure the output-file mapping has room for 'maxlen' more bytes at
 * the current offset, remapping a larger window when needed.
 *
 * Returns 0 on success, 1 on ftruncate/mmap failure.
 */
static int setup_mmap(int fd, unsigned int maxlen, struct mmap_info *mip)
{
	if (mip->fs_off + maxlen > mip->fs_buf_len) {
		/* map at least 16 sub-buffers worth of space at a time */
		unsigned long nr = max(16, mip->buf_nr);

		if (mip->fs_buf) {
			munlock(mip->fs_buf, mip->fs_buf_len);
			munmap(mip->fs_buf, mip->fs_buf_len);
			mip->fs_buf = NULL;
		}

		/*
		 * mmap offsets must be page aligned: keep the sub-page
		 * remainder of the file size as an offset into the new map.
		 */
		mip->fs_off = mip->fs_size & (mip->pagesize - 1);
		mip->fs_buf_len = (nr * mip->buf_size) - mip->fs_off;
		mip->fs_max_size += mip->fs_buf_len;

		/* grow the file so the new mapping is fully backed */
		if (ftruncate(fd, mip->fs_max_size) < 0) {
			perror("setup_mmap: ftruncate");
			return 1;
		}

		mip->fs_buf = my_mmap(NULL, mip->fs_buf_len, PROT_WRITE,
				      MAP_SHARED, fd,
				      mip->fs_size - mip->fs_off);
		if (mip->fs_buf == MAP_FAILED) {
			perror("setup_mmap: mmap");
			return 1;
		}
		/* lock the pages: avoid faulting while copying trace data */
		my_mlock(mip->fs_buf, mip->fs_buf_len);
	}

	return 0;
}
751
752 static int __stop_trace(int fd)
753 {
754         /*
755          * Should be stopped, don't complain if it isn't
756          */
757         ioctl(fd, BLKTRACESTOP);
758         return ioctl(fd, BLKTRACETEARDOWN);
759 }
760
/*
 * Write 'len' bytes to the output stream (pfp), retrying on EINTR.
 * Returns 0 on success, 1 on error.
 */
static int write_data(char *buf, int len)
{
	int ret;

rewrite:
	ret = fwrite(buf, len, 1, pfp);
	if (ferror(pfp) || ret != 1) {
		/* interrupted stdio write: clear the error and retry */
		if (errno == EINTR) {
			clearerr(pfp);
			goto rewrite;
		}

		/*
		 * EPIPE/EBADF are expected when piped output is closed
		 * early (e.g. "blktrace ... | head") - stay quiet then.
		 */
		if (!piped_output || (errno != EPIPE && errno != EBADF)) {
			fprintf(stderr, "write(%d) failed: %d/%s\n",
				len, errno, strerror(errno));
		}
		goto err;
	}

	fflush(pfp);
	return 0;

err:
	clearerr(pfp);
	return 1;
}
787
/*
 * Returns the number of bytes read (successfully)
 */
static int __net_recv_data(int fd, void *buf, unsigned int len)
{
	unsigned int bytes_left = len;

	/* bail out early when the global 'done' flag is raised */
	while (bytes_left && !done) {
		int ret = recv(fd, buf, bytes_left, MSG_WAITALL);

		if (ret == 0)
			break;	/* peer closed the connection */
		else if (ret < 0) {
			if (errno == EAGAIN) {
				usleep(50);
				continue;
			}
			perror("server: net_recv_data: recv failed");
			break;
		} else {
			/* partial read: advance and keep going */
			buf += ret;
			bytes_left -= ret;
		}
	}

	return len - bytes_left;
}
815
/* Blocking receive wrapper; kept for symmetry with net_send_data(). */
static int net_recv_data(int fd, void *buf, unsigned int len)
{
	return __net_recv_data(fd, buf, len);
}
820
/*
 * Push buf_len bytes down the socket, retrying partial sends.
 *
 * Returns the number of bytes written (== buf_len on success).
 */
static int net_send_data(int fd, void *buf, unsigned int buf_len)
{
	unsigned int remaining = buf_len;

	while (remaining) {
		int nsent = send(fd, buf, remaining, 0);

		if (nsent < 0) {
			perror("send");
			break;
		}

		buf = (char *)buf + nsent;
		remaining -= nsent;
	}

	return buf_len - remaining;
}
842
843 static int net_send_header(int fd, int cpu, char *buts_name, int len)
844 {
845         struct blktrace_net_hdr hdr;
846
847         memset(&hdr, 0, sizeof(hdr));
848
849         hdr.magic = BLK_IO_TRACE_MAGIC;
850         strncpy(hdr.buts_name, buts_name, sizeof(hdr.buts_name));
851         hdr.buts_name[sizeof(hdr.buts_name)-1] = '\0';
852         hdr.cpu = cpu;
853         hdr.max_cpus = ncpus;
854         hdr.len = len;
855         hdr.cl_id = getpid();
856         hdr.buf_size = buf_size;
857         hdr.buf_nr = buf_nr;
858         hdr.page_size = pagesize;
859
860         return net_send_data(fd, &hdr, sizeof(hdr)) != sizeof(hdr);
861 }
862
/*
 * Send an open (len == 0) or close (len == 1) header and wait for the
 * server's ack header.
 */
static void net_send_open_close(int fd, int cpu, char *buts_name, int len)
{
	struct blktrace_net_hdr ret_hdr;

	net_send_header(fd, cpu, buts_name, len);
	net_recv_data(fd, &ret_hdr, sizeof(ret_hdr));
}
870
/* len == 0 signals an "open" to the server (see blktrace_net_hdr). */
static void net_send_open(int fd, int cpu, char *buts_name)
{
	net_send_open_close(fd, cpu, buts_name, 0);
}
875
/* len == 1 signals a "close"; the cpu field carries the drop count. */
static void net_send_close(int fd, char *buts_name, int drops)
{
	/*
	 * Overload CPU w/ number of drops
	 *
	 * XXX: Need to clear/set done around call - done=1 (which
	 * is true here) stops reads from happening... :-(
	 */
	done = 0;
	net_send_open_close(fd, drops, buts_name, 1);
	done = 1;
}
888
/*
 * Server-side acknowledgement of an open/close: replies with a header
 * using len == 2 (0 and 1 are taken by open and close).
 */
static void ack_open_close(int fd, char *buts_name)
{
	net_send_header(fd, 0, buts_name, 2);
}
893
894 static void net_send_drops(int fd)
895 {
896         struct list_head *p;
897
898         __list_for_each(p, &devpaths) {
899                 struct devpath *dpp = list_entry(p, struct devpath, head);
900
901                 net_send_close(fd, dpp->buts_name, dpp->drops);
902         }
903 }
904
905 /*
906  * Returns:
907  *       0: "EOF"
908  *       1: OK
909  *      -1: Error
910  */
911 static int net_get_header(struct cl_conn *nc, struct blktrace_net_hdr *bnh)
912 {
913         int bytes_read;
914         int fl = fcntl(nc->fd, F_GETFL);
915
916         fcntl(nc->fd, F_SETFL, fl | O_NONBLOCK);
917         bytes_read = __net_recv_data(nc->fd, bnh, sizeof(*bnh));
918         fcntl(nc->fd, F_SETFL, fl & ~O_NONBLOCK);
919
920         if (bytes_read == sizeof(*bnh))
921                 return 1;
922         else if (bytes_read == 0)
923                 return 0;
924         else
925                 return -1;
926 }
927
928 static int net_setup_addr(void)
929 {
930         struct sockaddr_in *addr = &hostname_addr;
931
932         memset(addr, 0, sizeof(*addr));
933         addr->sin_family = AF_INET;
934         addr->sin_port = htons(net_port);
935
936         if (inet_aton(hostname, &addr->sin_addr) != 1) {
937                 struct hostent *hent;
938 retry:
939                 hent = gethostbyname(hostname);
940                 if (!hent) {
941                         if (h_errno == TRY_AGAIN) {
942                                 usleep(100);
943                                 goto retry;
944                         } else if (h_errno == NO_RECOVERY) {
945                                 fprintf(stderr, "gethostbyname(%s)"
946                                         "non-recoverable error encountered\n",
947                                         hostname);
948                         } else {
949                                 /*
950                                  * HOST_NOT_FOUND, NO_ADDRESS or NO_DATA
951                                  */
952                                 fprintf(stderr, "Host %s not found\n",
953                                         hostname);
954                         }
955                         return 1;
956                 }
957
958                 memcpy(&addr->sin_addr, hent->h_addr, 4);
959                 strcpy(hostname, hent->h_name);
960         }
961
962         return 0;
963 }
964
965 static int net_setup_client(void)
966 {
967         int fd;
968         struct sockaddr_in *addr = &hostname_addr;
969
970         fd = my_socket(AF_INET, SOCK_STREAM, 0);
971         if (fd < 0) {
972                 perror("client: socket");
973                 return -1;
974         }
975
976         if (connect(fd, (struct sockaddr *)addr, sizeof(*addr)) < 0) {
977                 if (errno == ECONNREFUSED)
978                         fprintf(stderr,
979                                 "\nclient: Connection to %s refused, "
980                                 "perhaps the server is not started?\n\n",
981                                 hostname);
982                 else
983                         perror("client: connect");
984
985                 close(fd);
986                 return -1;
987         }
988
989         return fd;
990 }
991
992 static int open_client_connections(void)
993 {
994         int cpu;
995
996         cl_fds = calloc(ncpus, sizeof(*cl_fds));
997         for (cpu = 0; cpu < ncpus; cpu++) {
998                 cl_fds[cpu] = net_setup_client();
999                 if (cl_fds[cpu] < 0)
1000                         goto err;
1001         }
1002         return 0;
1003
1004 err:
1005         while (cpu > 0)
1006                 close(cl_fds[cpu--]);
1007         free(cl_fds);
1008         return 1;
1009 }
1010
1011 static void close_client_connections(void)
1012 {
1013         if (cl_fds) {
1014                 int cpu, *fdp;
1015
1016                 for (cpu = 0, fdp = cl_fds; cpu < ncpus; cpu++, fdp++) {
1017                         if (*fdp >= 0) {
1018                                 net_send_drops(*fdp);
1019                                 net_close_connection(fdp);
1020                         }
1021                 }
1022                 free(cl_fds);
1023         }
1024 }
1025
1026 static void setup_buts(void)
1027 {
1028         struct list_head *p;
1029
1030         __list_for_each(p, &devpaths) {
1031                 struct blk_user_trace_setup buts;
1032                 struct devpath *dpp = list_entry(p, struct devpath, head);
1033
1034                 memset(&buts, 0, sizeof(buts));
1035                 buts.buf_size = buf_size;
1036                 buts.buf_nr = buf_nr;
1037                 buts.act_mask = act_mask;
1038
1039                 if (ioctl(dpp->fd, BLKTRACESETUP, &buts) >= 0) {
1040                         dpp->ncpus = ncpus;
1041                         dpp->buts_name = strdup(buts.name);
1042                         if (dpp->stats)
1043                                 free(dpp->stats);
1044                         dpp->stats = calloc(dpp->ncpus, sizeof(*dpp->stats));
1045                         memset(dpp->stats, 0, dpp->ncpus * sizeof(*dpp->stats));
1046                 } else
1047                         fprintf(stderr, "BLKTRACESETUP(2) %s failed: %d/%s\n",
1048                                 dpp->path, errno, strerror(errno));
1049         }
1050 }
1051
1052 static void start_buts(void)
1053 {
1054         struct list_head *p;
1055
1056         __list_for_each(p, &devpaths) {
1057                 struct devpath *dpp = list_entry(p, struct devpath, head);
1058
1059                 if (ioctl(dpp->fd, BLKTRACESTART) < 0) {
1060                         fprintf(stderr, "BLKTRACESTART %s failed: %d/%s\n",
1061                                 dpp->path, errno, strerror(errno));
1062                 }
1063         }
1064 }
1065
1066 static int get_drops(struct devpath *dpp)
1067 {
1068         int fd, drops = 0;
1069         char fn[MAXPATHLEN + 64], tmp[256];
1070
1071         snprintf(fn, sizeof(fn), "%s/block/%s/dropped", debugfs_path,
1072                  dpp->buts_name);
1073
1074         fd = my_open(fn, O_RDONLY);
1075         if (fd < 0) {
1076                 /*
1077                  * This may be ok: the kernel may not support
1078                  * dropped counts.
1079                  */
1080                 if (errno != ENOENT)
1081                         fprintf(stderr, "Could not open %s: %d/%s\n",
1082                                 fn, errno, strerror(errno));
1083                 return 0;
1084         } else if (read(fd, tmp, sizeof(tmp)) < 0) {
1085                 fprintf(stderr, "Could not read %s: %d/%s\n",
1086                         fn, errno, strerror(errno));
1087         } else
1088                 drops = atoi(tmp);
1089         close(fd);
1090
1091         return drops;
1092 }
1093
1094 static void get_all_drops(void)
1095 {
1096         struct list_head *p;
1097
1098         __list_for_each(p, &devpaths) {
1099                 struct devpath *dpp = list_entry(p, struct devpath, head);
1100
1101                 dpp->drops = get_drops(dpp);
1102         }
1103 }
1104
1105 static inline struct trace_buf *alloc_trace_buf(int cpu, int bufsize)
1106 {
1107         struct trace_buf *tbp;
1108
1109         tbp = malloc(sizeof(*tbp) + bufsize);
1110         INIT_LIST_HEAD(&tbp->head);
1111         tbp->len = 0;
1112         tbp->buf = (void *)(tbp + 1);
1113         tbp->cpu = cpu;
1114         tbp->dpp = NULL;        /* Will be set when tbp is added */
1115
1116         return tbp;
1117 }
1118
1119 static void free_tracer_heads(struct devpath *dpp)
1120 {
1121         int cpu;
1122         struct tracer_devpath_head *hd;
1123
1124         for (cpu = 0, hd = dpp->heads; cpu < ncpus; cpu++, hd++) {
1125                 if (hd->prev)
1126                         free(hd->prev);
1127
1128                 pthread_mutex_destroy(&hd->mutex);
1129         }
1130         free(dpp->heads);
1131 }
1132
1133 static int setup_tracer_devpaths(void)
1134 {
1135         struct list_head *p;
1136
1137         if (net_client_use_send())
1138                 if (open_client_connections())
1139                         return 1;
1140
1141         __list_for_each(p, &devpaths) {
1142                 int cpu;
1143                 struct tracer_devpath_head *hd;
1144                 struct devpath *dpp = list_entry(p, struct devpath, head);
1145
1146                 dpp->heads = calloc(ncpus, sizeof(struct tracer_devpath_head));
1147                 for (cpu = 0, hd = dpp->heads; cpu < ncpus; cpu++, hd++) {
1148                         INIT_LIST_HEAD(&hd->head);
1149                         pthread_mutex_init(&hd->mutex, NULL);
1150                         hd->prev = NULL;
1151                 }
1152         }
1153
1154         return 0;
1155 }
1156
1157 static inline void add_trace_buf(struct devpath *dpp, int cpu,
1158                                                 struct trace_buf **tbpp)
1159 {
1160         struct trace_buf *tbp = *tbpp;
1161         struct tracer_devpath_head *hd = &dpp->heads[cpu];
1162
1163         tbp->dpp = dpp;
1164
1165         pthread_mutex_lock(&hd->mutex);
1166         list_add_tail(&tbp->head, &hd->head);
1167         pthread_mutex_unlock(&hd->mutex);
1168
1169         *tbpp = alloc_trace_buf(cpu, buf_size);
1170 }
1171
1172 static inline void incr_entries(int entries_handled)
1173 {
1174         pthread_mutex_lock(&dp_mutex);
1175         if (dp_entries == 0)
1176                 pthread_cond_signal(&dp_cond);
1177         dp_entries += entries_handled;
1178         pthread_mutex_unlock(&dp_mutex);
1179 }
1180
/*
 * Subtract processed entries from the global pending count under the
 * shared mutex.  Counterpart of incr_entries(); no wakeup is needed
 * because only the consumer calls this.
 */
static void decr_entries(int handled)
{
	pthread_mutex_lock(&dp_mutex);
	dp_entries -= handled;
	pthread_mutex_unlock(&dp_mutex);
}
1187
1188 static int wait_empty_entries(void)
1189 {
1190         pthread_mutex_lock(&dp_mutex);
1191         while (!done && dp_entries == 0)
1192                 t_pthread_cond_wait(&dp_cond, &dp_mutex);
1193         pthread_mutex_unlock(&dp_mutex);
1194
1195         return !done;
1196 }
1197
1198 static int add_devpath(char *path)
1199 {
1200         int fd;
1201         struct devpath *dpp;
1202         struct list_head *p;
1203
1204         /*
1205          * Verify device is not duplicated
1206          */
1207         __list_for_each(p, &devpaths) {
1208                struct devpath *tmp = list_entry(p, struct devpath, head);
1209                if (!strcmp(tmp->path, path))
1210                         return 0;
1211         }
1212         /*
1213          * Verify device is valid before going too far
1214          */
1215         fd = my_open(path, O_RDONLY | O_NONBLOCK);
1216         if (fd < 0) {
1217                 fprintf(stderr, "Invalid path %s specified: %d/%s\n",
1218                         path, errno, strerror(errno));
1219                 return 1;
1220         }
1221
1222         dpp = malloc(sizeof(*dpp));
1223         memset(dpp, 0, sizeof(*dpp));
1224         dpp->path = strdup(path);
1225         dpp->fd = fd;
1226         dpp->idx = ndevs++;
1227         list_add_tail(&dpp->head, &devpaths);
1228
1229         return 0;
1230 }
1231
1232 static void rel_devpaths(void)
1233 {
1234         struct list_head *p, *q;
1235
1236         list_for_each_safe(p, q, &devpaths) {
1237                 struct devpath *dpp = list_entry(p, struct devpath, head);
1238
1239                 list_del(&dpp->head);
1240                 __stop_trace(dpp->fd);
1241                 close(dpp->fd);
1242
1243                 if (dpp->heads)
1244                         free_tracer_heads(dpp);
1245
1246                 dpp_free(dpp);
1247                 ndevs--;
1248         }
1249 }
1250
1251 static int flush_subbuf_net(struct trace_buf *tbp)
1252 {
1253         int fd = cl_fds[tbp->cpu];
1254         struct devpath *dpp = tbp->dpp;
1255
1256         if (net_send_header(fd, tbp->cpu, dpp->buts_name, tbp->len))
1257                 return 1;
1258         else if (net_send_data(fd, tbp->buf, tbp->len) != tbp->len)
1259                 return 1;
1260
1261         return 0;
1262 }
1263
1264 static int
1265 handle_list_net(__attribute__((__unused__))struct tracer_devpath_head *hd,
1266                 struct list_head *list)
1267 {
1268         struct trace_buf *tbp;
1269         struct list_head *p, *q;
1270         int entries_handled = 0;
1271
1272         list_for_each_safe(p, q, list) {
1273                 tbp = list_entry(p, struct trace_buf, head);
1274
1275                 list_del(&tbp->head);
1276                 entries_handled++;
1277
1278                 if (cl_fds[tbp->cpu] >= 0) {
1279                         if (flush_subbuf_net(tbp)) {
1280                                 close(cl_fds[tbp->cpu]);
1281                                 cl_fds[tbp->cpu] = -1;
1282                         }
1283                 }
1284
1285                 free(tbp);
1286         }
1287
1288         return entries_handled;
1289 }
1290
1291 /*
1292  * Tack 'tbp's buf onto the tail of 'prev's buf
1293  */
1294 static struct trace_buf *tb_combine(struct trace_buf *prev,
1295                                     struct trace_buf *tbp)
1296 {
1297         unsigned long tot_len;
1298
1299         tot_len = prev->len + tbp->len;
1300         if (tot_len > buf_size) {
1301                 /*
1302                  * tbp->head isn't connected (it was 'prev'
1303                  * so it had been taken off of the list
1304                  * before). Therefore, we can realloc
1305                  * the whole structures, as the other fields
1306                  * are "static".
1307                  */
1308                 prev = realloc(prev->buf, sizeof(*prev) + tot_len);
1309                 prev->buf = (void *)(prev + 1);
1310         }
1311
1312         memcpy(prev->buf + prev->len, tbp->buf, tbp->len);
1313         prev->len = tot_len;
1314
1315         free(tbp);
1316         return prev;
1317 }
1318
/*
 * Drain a stolen list of trace buffers in local-file mode.  Whole
 * blk_io_trace records are written out in one go; any trailing partial
 * record is carried over in hd->prev and prepended to the next buffer.
 *
 * Returns the number of list entries consumed.
 */
static int handle_list_file(struct tracer_devpath_head *hd,
			    struct list_head *list)
{
	int off, t_len, nevents;
	struct blk_io_trace *t;
	struct list_head *p, *q;
	int entries_handled = 0;
	struct trace_buf *tbp, *prev;

	prev = hd->prev;
	list_for_each_safe(p, q, list) {
		tbp = list_entry(p, struct trace_buf, head);
		list_del(&tbp->head);
		entries_handled++;

		/*
		 * If there was some leftover before, tack this new
		 * entry onto the tail of the previous one.
		 */
		if (prev)
			tbp = tb_combine(prev, tbp);

		/*
		 * See how many whole traces there are - send them
		 * all out in one go.
		 */
		off = 0;
		nevents = 0;
		while (off + (int)sizeof(*t) <= tbp->len) {
			t = (struct blk_io_trace *)(tbp->buf + off);
			/* each record is the fixed header plus its pdu */
			t_len = sizeof(*t) + t->pdu_len;
			if (off + t_len > tbp->len)
				break;

			off += t_len;
			nevents++;
		}
		if (nevents)
			pdc_nev_update(tbp->dpp, tbp->cpu, nevents);

		/*
		 * Write any full set of traces, any remaining data is kept
		 * for the next pass.
		 */
		if (off) {
			/* buffer fully consumed (or write failed): drop it */
			if (write_data(tbp->buf, off) || off == tbp->len) {
				free(tbp);
				prev = NULL;
			}
			else {
				/*
				 * Move valid data to beginning of buffer
				 */
				tbp->len -= off;
				memmove(tbp->buf, tbp->buf + off, tbp->len);
				prev = tbp;
			}
		} else
			/* no complete record yet: carry the whole buffer */
			prev = tbp;
	}
	hd->prev = prev;

	return entries_handled;
}
1383
1384 static void __process_trace_bufs(void)
1385 {
1386         int cpu;
1387         struct list_head *p;
1388         struct list_head list;
1389         int handled = 0;
1390
1391         __list_for_each(p, &devpaths) {
1392                 struct devpath *dpp = list_entry(p, struct devpath, head);
1393                 struct tracer_devpath_head *hd = dpp->heads;
1394
1395                 for (cpu = 0; cpu < ncpus; cpu++, hd++) {
1396                         pthread_mutex_lock(&hd->mutex);
1397                         if (list_empty(&hd->head)) {
1398                                 pthread_mutex_unlock(&hd->mutex);
1399                                 continue;
1400                         }
1401
1402                         list_replace_init(&hd->head, &list);
1403                         pthread_mutex_unlock(&hd->mutex);
1404
1405                         handled += handle_list(hd, &list);
1406                 }
1407         }
1408
1409         if (handled)
1410                 decr_entries(handled);
1411 }
1412
/*
 * Consumer loop: sleep until tracers queue buffers, process them,
 * repeat until shutdown ('done') is signalled.
 */
static void process_trace_bufs(void)
{
	while (wait_empty_entries())
		__process_trace_bufs();
}
1418
/*
 * Final drain after all tracer threads have exited: keep processing
 * until the pending-entry count hits zero.
 */
static void clean_trace_bufs(void)
{
	/*
	 * No mutex needed here: we're only reading from the lists,
	 * tracers are done
	 */
	while (dp_entries)
		__process_trace_bufs();
}
1428
/*
 * Report a failed trace-file read, except for EAGAIN which is the
 * normal "no data yet" case on these non-blocking fds.
 */
static inline void read_err(int cpu, char *ifn)
{
	if (errno == EAGAIN)
		return;

	fprintf(stderr, "Thread %d failed read of %s: %d/%s\n",
		cpu, ifn, errno, strerror(errno));
}
1435
1436 static int net_sendfile(struct io_info *iop)
1437 {
1438         int ret;
1439
1440         ret = sendfile(iop->ofd, iop->ifd, NULL, iop->ready);
1441         if (ret < 0) {
1442                 perror("sendfile");
1443                 return 1;
1444         } else if (ret < (int)iop->ready) {
1445                 fprintf(stderr, "short sendfile send (%d of %d)\n",
1446                         ret, iop->ready);
1447                 return 1;
1448         }
1449
1450         return 0;
1451 }
1452
1453 static inline int net_sendfile_data(struct tracer *tp, struct io_info *iop)
1454 {
1455         struct devpath *dpp = iop->dpp;
1456
1457         if (net_send_header(iop->ofd, tp->cpu, dpp->buts_name, iop->ready))
1458                 return 1;
1459         return net_sendfile(iop);
1460 }
1461
1462 static int fill_ofname(struct io_info *iop, int cpu)
1463 {
1464         int len;
1465         struct stat sb;
1466         char *dst = iop->ofn;
1467
1468         if (output_dir)
1469                 len = snprintf(iop->ofn, sizeof(iop->ofn), "%s/", output_dir);
1470         else
1471                 len = snprintf(iop->ofn, sizeof(iop->ofn), "./");
1472
1473         if (net_mode == Net_server) {
1474                 struct cl_conn *nc = iop->nc;
1475
1476                 len += sprintf(dst + len, "%s-", nc->ch->hostname);
1477                 len += strftime(dst + len, 64, "%F-%T/",
1478                                 gmtime(&iop->dpp->cl_connect_time));
1479         }
1480
1481         if (stat(iop->ofn, &sb) < 0) {
1482                 if (errno != ENOENT) {
1483                         fprintf(stderr,
1484                                 "Destination dir %s stat failed: %d/%s\n",
1485                                 iop->ofn, errno, strerror(errno));
1486                         return 1;
1487                 }
1488                 /*
1489                  * There is no synchronization between multiple threads
1490                  * trying to create the directory at once.  It's harmless
1491                  * to let them try, so just detect the problem and move on.
1492                  */
1493                 if (mkdir(iop->ofn, 0755) < 0 && errno != EEXIST) {
1494                         fprintf(stderr,
1495                                 "Destination dir %s can't be made: %d/%s\n",
1496                                 iop->ofn, errno, strerror(errno));
1497                         return 1;
1498                 }
1499         }
1500
1501         if (output_name)
1502                 snprintf(iop->ofn + len, sizeof(iop->ofn), "%s.blktrace.%d",
1503                          output_name, cpu);
1504         else
1505                 snprintf(iop->ofn + len, sizeof(iop->ofn), "%s.blktrace.%d",
1506                          iop->dpp->buts_name, cpu);
1507
1508         return 0;
1509 }
1510
1511 static int set_vbuf(struct io_info *iop, int mode, size_t size)
1512 {
1513         iop->obuf = malloc(size);
1514         if (setvbuf(iop->ofp, iop->obuf, mode, size) < 0) {
1515                 fprintf(stderr, "setvbuf(%s, %d) failed: %d/%s\n",
1516                         iop->dpp->path, (int)size, errno,
1517                         strerror(errno));
1518                 free(iop->obuf);
1519                 return 1;
1520         }
1521
1522         return 0;
1523 }
1524
1525 static int iop_open(struct io_info *iop, int cpu)
1526 {
1527         iop->ofd = -1;
1528         if (fill_ofname(iop, cpu))
1529                 return 1;
1530
1531         iop->ofp = my_fopen(iop->ofn, "w+");
1532         if (iop->ofp == NULL) {
1533                 fprintf(stderr, "Open output file %s failed: %d/%s\n",
1534                         iop->ofn, errno, strerror(errno));
1535                 return 1;
1536         }
1537
1538         if (set_vbuf(iop, _IOLBF, FILE_VBUF_SIZE)) {
1539                 fprintf(stderr, "set_vbuf for file %s failed: %d/%s\n",
1540                         iop->ofn, errno, strerror(errno));
1541                 fclose(iop->ofp);
1542                 return 1;
1543         }
1544
1545         iop->ofd = fileno(iop->ofp);
1546         return 0;
1547 }
1548
1549 static void close_iop(struct io_info *iop)
1550 {
1551         struct mmap_info *mip = &iop->mmap_info;
1552
1553         if (mip->fs_buf)
1554                 munmap(mip->fs_buf, mip->fs_buf_len);
1555
1556         if (!piped_output) {
1557                 if (ftruncate(fileno(iop->ofp), mip->fs_size) < 0) {
1558                         fprintf(stderr,
1559                                 "Ignoring err: ftruncate(%s): %d/%s\n",
1560                                 iop->ofn, errno, strerror(errno));
1561                 }
1562         }
1563
1564         if (iop->ofp)
1565                 fclose(iop->ofp);
1566         if (iop->obuf)
1567                 free(iop->obuf);
1568 }
1569
1570 static void close_ios(struct tracer *tp)
1571 {
1572         while (tp->nios > 0) {
1573                 struct io_info *iop = &tp->ios[--tp->nios];
1574
1575                 iop->dpp->drops = get_drops(iop->dpp);
1576                 if (iop->ifd >= 0)
1577                         close(iop->ifd);
1578
1579                 if (iop->ofp)
1580                         close_iop(iop);
1581                 else if (iop->ofd >= 0) {
1582                         struct devpath *dpp = iop->dpp;
1583
1584                         net_send_close(iop->ofd, dpp->buts_name, dpp->drops);
1585                         net_close_connection(&iop->ofd);
1586                 }
1587         }
1588
1589         free(tp->ios);
1590         free(tp->pfds);
1591 }
1592
1593 static int open_ios(struct tracer *tp)
1594 {
1595         struct pollfd *pfd;
1596         struct io_info *iop;
1597         struct list_head *p;
1598
1599         tp->ios = calloc(ndevs, sizeof(struct io_info));
1600         memset(tp->ios, 0, ndevs * sizeof(struct io_info));
1601
1602         tp->pfds = calloc(ndevs, sizeof(struct pollfd));
1603         memset(tp->pfds, 0, ndevs * sizeof(struct pollfd));
1604
1605         tp->nios = 0;
1606         iop = tp->ios;
1607         pfd = tp->pfds;
1608         __list_for_each(p, &devpaths) {
1609                 struct devpath *dpp = list_entry(p, struct devpath, head);
1610
1611                 iop->dpp = dpp;
1612                 iop->ofd = -1;
1613                 snprintf(iop->ifn, sizeof(iop->ifn), "%s/block/%s/trace%d",
1614                         debugfs_path, dpp->buts_name, tp->cpu);
1615
1616                 iop->ifd = my_open(iop->ifn, O_RDONLY | O_NONBLOCK);
1617                 if (iop->ifd < 0) {
1618                         fprintf(stderr, "Thread %d failed open %s: %d/%s\n",
1619                                 tp->cpu, iop->ifn, errno, strerror(errno));
1620                         return 1;
1621                 }
1622
1623                 init_mmap_info(&iop->mmap_info);
1624
1625                 pfd->fd = iop->ifd;
1626                 pfd->events = POLLIN;
1627
1628                 if (piped_output)
1629                         ;
1630                 else if (net_client_use_sendfile()) {
1631                         iop->ofd = net_setup_client();
1632                         if (iop->ofd < 0)
1633                                 goto err;
1634                         net_send_open(iop->ofd, tp->cpu, dpp->buts_name);
1635                 } else if (net_mode == Net_none) {
1636                         if (iop_open(iop, tp->cpu))
1637                                 goto err;
1638                 } else {
1639                         /*
1640                          * This ensures that the server knows about all
1641                          * connections & devices before _any_ closes
1642                          */
1643                         net_send_open(cl_fds[tp->cpu], tp->cpu, dpp->buts_name);
1644                 }
1645
1646                 pfd++;
1647                 iop++;
1648                 tp->nios++;
1649         }
1650
1651         return 0;
1652
1653 err:
1654         close(iop->ifd);        /* tp->nios _not_ bumped */
1655         close_ios(tp);
1656         return 1;
1657 }
1658
/*
 * Local-file mode poll handler: for each readable device, grow the
 * mmap'ed output window and read trace data directly into it.
 *
 * 'nevs' bounds how many ready fds to service; 'force_read' services
 * every fd regardless of poll results (used for the final drain).
 * Returns the number of devices that yielded data.
 */
static int handle_pfds_file(struct tracer *tp, int nevs, int force_read)
{
	struct mmap_info *mip;
	int i, ret, nentries = 0;
	struct pollfd *pfd = tp->pfds;
	struct io_info *iop = tp->ios;

	for (i = 0; nevs > 0 && i < ndevs; i++, pfd++, iop++) {
		if (pfd->revents & POLLIN || force_read) {
			mip = &iop->mmap_info;

			/* ensure the mapped window can hold buf_size more */
			ret = setup_mmap(iop->ofd, buf_size, mip);
			if (ret < 0) {
				pfd->events = 0;
				break;
			}

			/* read straight into the file mapping */
			ret = read(iop->ifd, mip->fs_buf + mip->fs_off,
				   buf_size);
			if (ret > 0) {
				pdc_dr_update(iop->dpp, tp->cpu, ret);
				mip->fs_size += ret;
				mip->fs_off += ret;
				nentries++;
			} else if (ret == 0) {
				/*
				 * Short reads after we're done stop us
				 * from trying reads.
				 */
				if (tp->is_done)
					clear_events(pfd);
			} else {
				read_err(tp->cpu, iop->ifn);
				if (errno != EAGAIN || tp->is_done)
					clear_events(pfd);
			}
			nevs--;
		}
	}

	return nentries;
}
1701
1702 static int handle_pfds_netclient(struct tracer *tp, int nevs, int force_read)
1703 {
1704         struct stat sb;
1705         int i, nentries = 0;
1706         struct pdc_stats *sp;
1707         struct pollfd *pfd = tp->pfds;
1708         struct io_info *iop = tp->ios;
1709
1710         for (i = 0; i < ndevs; i++, pfd++, iop++, sp++) {
1711                 if (pfd->revents & POLLIN || force_read) {
1712                         if (fstat(iop->ifd, &sb) < 0) {
1713                                 perror(iop->ifn);
1714                                 pfd->events = 0;
1715                         } else if (sb.st_size > (off_t)iop->data_queued) {
1716                                 iop->ready = sb.st_size - iop->data_queued;
1717                                 iop->data_queued = sb.st_size;
1718
1719                                 if (!net_sendfile_data(tp, iop)) {
1720                                         pdc_dr_update(iop->dpp, tp->cpu,
1721                                                       iop->ready);
1722                                         nentries++;
1723                                 } else
1724                                         clear_events(pfd);
1725                         }
1726                         if (--nevs == 0)
1727                                 break;
1728                 }
1729         }
1730
1731         if (nentries)
1732                 incr_entries(nentries);
1733
1734         return nentries;
1735 }
1736
/*
 * Buffered (pipe / network-send) mode poll handler: read trace data
 * into heap buffers and queue them for the processing thread.
 *
 * Returns the number of devices that yielded data; queued entries are
 * accounted via incr_entries() to wake the consumer.
 */
static int handle_pfds_entries(struct tracer *tp, int nevs, int force_read)
{
	int i, nentries = 0;
	struct trace_buf *tbp;
	struct pollfd *pfd = tp->pfds;
	struct io_info *iop = tp->ios;

	tbp = alloc_trace_buf(tp->cpu, buf_size);
	for (i = 0; i < ndevs; i++, pfd++, iop++) {
		if (pfd->revents & POLLIN || force_read) {
			tbp->len = read(iop->ifd, tbp->buf, buf_size);
			if (tbp->len > 0) {
				pdc_dr_update(iop->dpp, tp->cpu, tbp->len);
				/* queues tbp and replaces it with a fresh buffer */
				add_trace_buf(iop->dpp, tp->cpu, &tbp);
				nentries++;
			} else if (tbp->len == 0) {
				/*
				 * Short reads after we're done stop us
				 * from trying reads.
				 */
				if (tp->is_done)
					clear_events(pfd);
			} else {
				read_err(tp->cpu, iop->ifn);
				if (errno != EAGAIN || tp->is_done)
					clear_events(pfd);
			}
			if (!piped_output && --nevs == 0)
				break;
		}
	}
	/* the last (unfilled) buffer is never queued */
	free(tbp);

	if (nentries)
		incr_entries(nentries);

	return nentries;
}
1775
1776 static void *thread_main(void *arg)
1777 {
1778         int ret, ndone, to_val;
1779         struct tracer *tp = arg;
1780
1781         ret = lock_on_cpu(tp->cpu);
1782         if (ret)
1783                 goto err;
1784
1785         ret = open_ios(tp);
1786         if (ret)
1787                 goto err;
1788
1789         if (piped_output)
1790                 to_val = 50;            /* Frequent partial handles */
1791         else
1792                 to_val = 500;           /* 1/2 second intervals */
1793
1794
1795         tracer_signal_ready(tp, Th_running, 0);
1796         tracer_wait_unblock(tp);
1797
1798         while (!tp->is_done) {
1799                 ndone = poll(tp->pfds, ndevs, to_val);
1800                 if (ndone || piped_output)
1801                         (void)handle_pfds(tp, ndone, piped_output);
1802                 else if (ndone < 0 && errno != EINTR)
1803                         fprintf(stderr, "Thread %d poll failed: %d/%s\n",
1804                                 tp->cpu, errno, strerror(errno));
1805         }
1806
1807         /*
1808          * Trace is stopped, pull data until we get a short read
1809          */
1810         while (handle_pfds(tp, ndevs, 1) > 0)
1811                 ;
1812
1813         close_ios(tp);
1814         tracer_signal_ready(tp, Th_leaving, 0);
1815         return NULL;
1816
1817 err:
1818         tracer_signal_ready(tp, Th_error, ret);
1819         return NULL;
1820 }
1821
1822 static int start_tracer(int cpu)
1823 {
1824         struct tracer *tp;
1825
1826         tp = malloc(sizeof(*tp));
1827         memset(tp, 0, sizeof(*tp));
1828
1829         INIT_LIST_HEAD(&tp->head);
1830         tp->status = 0;
1831         tp->cpu = cpu;
1832
1833         if (pthread_create(&tp->thread, NULL, thread_main, tp)) {
1834                 fprintf(stderr, "FAILED to start thread on CPU %d: %d/%s\n",
1835                         cpu, errno, strerror(errno));
1836                 free(tp);
1837                 return 1;
1838         }
1839
1840         list_add_tail(&tp->head, &tracers);
1841         return 0;
1842 }
1843
1844 static void start_tracers(void)
1845 {
1846         int cpu;
1847         struct list_head *p;
1848
1849         for (cpu = 0; cpu < ncpus; cpu++)
1850                 if (start_tracer(cpu))
1851                         break;
1852
1853         wait_tracers_ready(cpu);
1854
1855         __list_for_each(p, &tracers) {
1856                 struct tracer *tp = list_entry(p, struct tracer, head);
1857                 if (tp->status)
1858                         fprintf(stderr,
1859                                 "FAILED to start thread on CPU %d: %d/%s\n",
1860                                 tp->cpu, tp->status, strerror(tp->status));
1861         }
1862 }
1863
1864 static void stop_tracers(void)
1865 {
1866         struct list_head *p;
1867
1868         /*
1869          * Stop the tracing - makes the tracer threads clean up quicker.
1870          */
1871         __list_for_each(p, &devpaths) {
1872                 struct devpath *dpp = list_entry(p, struct devpath, head);
1873                 (void)ioctl(dpp->fd, BLKTRACESTOP);
1874         }
1875
1876         /*
1877          * Tell each tracer to quit
1878          */
1879         __list_for_each(p, &tracers) {
1880                 struct tracer *tp = list_entry(p, struct tracer, head);
1881                 tp->is_done = 1;
1882         }
1883 }
1884
1885 static void del_tracers(void)
1886 {
1887         struct list_head *p, *q;
1888
1889         list_for_each_safe(p, q, &tracers) {
1890                 struct tracer *tp = list_entry(p, struct tracer, head);
1891
1892                 list_del(&tp->head);
1893                 free(tp);
1894         }
1895 }
1896
1897 static void wait_tracers(void)
1898 {
1899         struct list_head *p;
1900
1901         if (use_tracer_devpaths())
1902                 process_trace_bufs();
1903
1904         wait_tracers_leaving();
1905
1906         __list_for_each(p, &tracers) {
1907                 int ret;
1908                 struct tracer *tp = list_entry(p, struct tracer, head);
1909
1910                 ret = pthread_join(tp->thread, NULL);
1911                 if (ret)
1912                         fprintf(stderr, "Thread join %d failed %d\n",
1913                                 tp->cpu, ret);
1914         }
1915
1916         if (use_tracer_devpaths())
1917                 clean_trace_bufs();
1918
1919         get_all_drops();
1920 }
1921
/*
 * Orderly shutdown path: ignore further termination signals, then stop,
 * reap, and free all tracers and devices.
 */
static void exit_tracing(void)
{
	static const int sigs[] = { SIGINT, SIGHUP, SIGTERM, SIGALRM };
	size_t i;

	for (i = 0; i < sizeof(sigs) / sizeof(sigs[0]); i++)
		signal(sigs[i], SIG_IGN);

	stop_tracers();
	wait_tracers();
	del_tracers();
	rel_devpaths();
}
1934
/*
 * Handler for SIGINT/SIGHUP/SIGTERM/SIGALRM: mark the run as done and
 * ask the kernel/tracer threads to stop producing events so the main
 * loops can wind down.
 */
static void handle_sigint(__attribute__((__unused__)) int sig)
{
	done = 1;
	stop_tracers();
}
1940
1941 static void show_stats(struct list_head *devpaths)
1942 {
1943         FILE *ofp;
1944         struct list_head *p;
1945         unsigned long long nevents, data_read;
1946         unsigned long long total_drops = 0;
1947         unsigned long long total_events = 0;
1948
1949         if (piped_output)
1950                 ofp = my_fopen("/dev/null", "w");
1951         else
1952                 ofp = stdout;
1953
1954         __list_for_each(p, devpaths) {
1955                 int cpu;
1956                 struct pdc_stats *sp;
1957                 struct devpath *dpp = list_entry(p, struct devpath, head);
1958
1959                 if (net_mode == Net_server)
1960                         printf("server: end of run for %s:%s\n",
1961                                 dpp->ch->hostname, dpp->buts_name);
1962
1963                 data_read = 0;
1964                 nevents = 0;
1965
1966                 fprintf(ofp, "=== %s ===\n", dpp->buts_name);
1967                 for (cpu = 0, sp = dpp->stats; cpu < dpp->ncpus; cpu++, sp++) {
1968                         /*
1969                          * Estimate events if not known...
1970                          */
1971                         if (sp->nevents == 0) {
1972                                 sp->nevents = sp->data_read /
1973                                                 sizeof(struct blk_io_trace);
1974                         }
1975
1976                         fprintf(ofp,
1977                                 "  CPU%3d: %20llu events, %8llu KiB data\n",
1978                                 cpu, sp->nevents, (sp->data_read + 1023) >> 10);
1979
1980                         data_read += sp->data_read;
1981                         nevents += sp->nevents;
1982                 }
1983
1984                 fprintf(ofp, "  Total:  %20llu events (dropped %llu),"
1985                              " %8llu KiB data\n", nevents,
1986                              dpp->drops, (data_read + 1024) >> 10);
1987
1988                 total_drops += dpp->drops;
1989                 total_events += (nevents + dpp->drops);
1990         }
1991
1992         fflush(ofp);
1993         if (piped_output)
1994                 fclose(ofp);
1995
1996         if (total_drops) {
1997                 double drops_ratio = 1.0;
1998
1999                 if (total_events)
2000                         drops_ratio = (double)total_drops/(double)total_events;
2001
2002                 fprintf(stderr, "\nYou have %llu (%5.1lf%%) dropped events\n"
2003                                 "Consider using a larger buffer size (-b) "
2004                                 "and/or more buffers (-n)\n",
2005                         total_drops, 100.0 * drops_ratio);
2006         }
2007 }
2008
2009 static int handle_args(int argc, char *argv[])
2010 {
2011         int c, i;
2012         struct statfs st;
2013         int act_mask_tmp = 0;
2014
2015         while ((c = getopt_long(argc, argv, S_OPTS, l_opts, NULL)) >= 0) {
2016                 switch (c) {
2017                 case 'a':
2018                         i = find_mask_map(optarg);
2019                         if (i < 0) {
2020                                 fprintf(stderr, "Invalid action mask %s\n",
2021                                         optarg);
2022                                 return 1;
2023                         }
2024                         act_mask_tmp |= i;
2025                         break;
2026
2027                 case 'A':
2028                         if ((sscanf(optarg, "%x", &i) != 1) ||
2029                                                         !valid_act_opt(i)) {
2030                                 fprintf(stderr,
2031                                         "Invalid set action mask %s/0x%x\n",
2032                                         optarg, i);
2033                                 return 1;
2034                         }
2035                         act_mask_tmp = i;
2036                         break;
2037
2038                 case 'd':
2039                         if (add_devpath(optarg) != 0)
2040                                 return 1;
2041                         break;
2042
2043                 case 'I': {
2044                         char dev_line[256];
2045                         FILE *ifp = my_fopen(optarg, "r");
2046
2047                         if (!ifp) {
2048                                 fprintf(stderr,
2049                                         "Invalid file for devices %s\n",
2050                                         optarg);
2051                                 return 1;
2052                         }
2053
2054                         while (fscanf(ifp, "%s\n", dev_line) == 1)
2055                                 if (add_devpath(dev_line) != 0)
2056                                         return 1;
2057                         break;
2058                 }
2059
2060                 case 'r':
2061                         debugfs_path = optarg;
2062                         break;
2063
2064                 case 'o':
2065                         output_name = optarg;
2066                         break;
2067                 case 'k':
2068                         kill_running_trace = 1;
2069                         break;
2070                 case 'w':
2071                         stop_watch = atoi(optarg);
2072                         if (stop_watch <= 0) {
2073                                 fprintf(stderr,
2074                                         "Invalid stopwatch value (%d secs)\n",
2075                                         stop_watch);
2076                                 return 1;
2077                         }
2078                         break;
2079                 case 'V':
2080                 case 'v':
2081                         printf("%s version %s\n", argv[0], blktrace_version);
2082                         exit(0);
2083                         /*NOTREACHED*/
2084                 case 'b':
2085                         buf_size = strtoul(optarg, NULL, 10);
2086                         if (buf_size <= 0 || buf_size > 16*1024) {
2087                                 fprintf(stderr, "Invalid buffer size (%lu)\n",
2088                                         buf_size);
2089                                 return 1;
2090                         }
2091                         buf_size <<= 10;
2092                         break;
2093                 case 'n':
2094                         buf_nr = strtoul(optarg, NULL, 10);
2095                         if (buf_nr <= 0) {
2096                                 fprintf(stderr,
2097                                         "Invalid buffer nr (%lu)\n", buf_nr);
2098                                 return 1;
2099                         }
2100                         break;
2101                 case 'D':
2102                         output_dir = optarg;
2103                         break;
2104                 case 'h':
2105                         net_mode = Net_client;
2106                         strcpy(hostname, optarg);
2107                         break;
2108                 case 'l':
2109                         net_mode = Net_server;
2110                         break;
2111                 case 'p':
2112                         net_port = atoi(optarg);
2113                         break;
2114                 case 's':
2115                         net_use_sendfile = 0;
2116                         break;
2117                 default:
2118                         show_usage(argv[0]);
2119                         exit(1);
2120                         /*NOTREACHED*/
2121                 }
2122         }
2123
2124         while (optind < argc)
2125                 if (add_devpath(argv[optind++]) != 0)
2126                         return 1;
2127
2128         if (net_mode != Net_server && ndevs == 0) {
2129                 show_usage(argv[0]);
2130                 return 1;
2131         }
2132
2133         if (statfs(debugfs_path, &st) < 0 || st.f_type != (long)DEBUGFS_TYPE) {
2134                 fprintf(stderr, "Invalid debug path %s: %d/%s\n",
2135                         debugfs_path, errno, strerror(errno));
2136                 return 1;
2137         }
2138
2139         if (act_mask_tmp != 0)
2140                 act_mask = act_mask_tmp;
2141
2142         if (net_mode == Net_client && net_setup_addr())
2143                 return 1;
2144
2145         /*
2146          * Set up for appropriate PFD handler based upon output name.
2147          */
2148         if (net_client_use_sendfile())
2149                 handle_pfds = handle_pfds_netclient;
2150         else if (net_client_use_send())
2151                 handle_pfds = handle_pfds_entries;
2152         else if (output_name && (strcmp(output_name, "-") == 0)) {
2153                 piped_output = 1;
2154                 handle_pfds = handle_pfds_entries;
2155                 pfp = stdout;
2156                 setvbuf(pfp, NULL, _IONBF, 0);
2157         } else
2158                 handle_pfds = handle_pfds_file;
2159         return 0;
2160 }
2161
2162 static void ch_add_connection(struct net_server_s *ns, struct cl_host *ch,
2163                               int fd)
2164 {
2165         struct cl_conn *nc;
2166
2167         nc = malloc(sizeof(*nc));
2168         memset(nc, 0, sizeof(*nc));
2169
2170         time(&nc->connect_time);
2171         nc->ch = ch;
2172         nc->fd = fd;
2173         nc->ncpus = -1;
2174
2175         list_add_tail(&nc->ch_head, &ch->conn_list);
2176         ch->connects++;
2177
2178         list_add_tail(&nc->ns_head, &ns->conn_list);
2179         ns->connects++;
2180         ns->pfds = realloc(ns->pfds, (ns->connects+1) * sizeof(struct pollfd));
2181 }
2182
2183 static void ch_rem_connection(struct net_server_s *ns, struct cl_host *ch,
2184                               struct cl_conn *nc)
2185 {
2186         net_close_connection(&nc->fd);
2187
2188         list_del(&nc->ch_head);
2189         ch->connects--;
2190
2191         list_del(&nc->ns_head);
2192         ns->connects--;
2193         ns->pfds = realloc(ns->pfds, (ns->connects+1) * sizeof(struct pollfd));
2194
2195         free(nc);
2196 }
2197
2198 static struct cl_host *net_find_client_host(struct net_server_s *ns,
2199                                             struct in_addr cl_in_addr)
2200 {
2201         struct list_head *p;
2202
2203         __list_for_each(p, &ns->ch_list) {
2204                 struct cl_host *ch = list_entry(p, struct cl_host, head);
2205
2206                 if (in_addr_eq(ch->cl_in_addr, cl_in_addr))
2207                         return ch;
2208         }
2209
2210         return NULL;
2211 }
2212
2213 static struct cl_host *net_add_client_host(struct net_server_s *ns,
2214                                            struct sockaddr_in *addr)
2215 {
2216         struct cl_host *ch;
2217
2218         ch = malloc(sizeof(*ch));
2219         memset(ch, 0, sizeof(*ch));
2220
2221         ch->ns = ns;
2222         ch->cl_in_addr = addr->sin_addr;
2223         list_add_tail(&ch->head, &ns->ch_list);
2224         ns->nchs++;
2225
2226         ch->hostname = strdup(inet_ntoa(addr->sin_addr));
2227         printf("server: connection from %s\n", ch->hostname);
2228
2229         INIT_LIST_HEAD(&ch->conn_list);
2230         INIT_LIST_HEAD(&ch->devpaths);
2231
2232         return ch;
2233 }
2234
2235 static void device_done(struct devpath *dpp, int ncpus)
2236 {
2237         int cpu;
2238         struct io_info *iop;
2239
2240         for (cpu = 0, iop = dpp->ios; cpu < ncpus; cpu++, iop++)
2241                 close_iop(iop);
2242
2243         list_del(&dpp->head);
2244         dpp_free(dpp);
2245 }
2246
2247 static void net_ch_remove(struct cl_host *ch, int ncpus)
2248 {
2249         struct list_head *p, *q;
2250         struct net_server_s *ns = ch->ns;
2251
2252         list_for_each_safe(p, q, &ch->devpaths) {
2253                 struct devpath *dpp = list_entry(p, struct devpath, head);
2254                 device_done(dpp, ncpus);
2255         }
2256
2257         list_for_each_safe(p, q, &ch->conn_list) {
2258                 struct cl_conn *nc = list_entry(p, struct cl_conn, ch_head);
2259
2260                 ch_rem_connection(ns, ch, nc);
2261         }
2262
2263         list_del(&ch->head);
2264         ns->nchs--;
2265
2266         if (ch->hostname)
2267                 free(ch->hostname);
2268         free(ch);
2269 }
2270
2271 static void net_add_connection(struct net_server_s *ns)
2272 {
2273         int fd;
2274         struct cl_host *ch;
2275         socklen_t socklen = sizeof(ns->addr);
2276
2277         fd = my_accept(ns->listen_fd, (struct sockaddr *)&ns->addr, &socklen);
2278         if (fd < 0) {
2279                 /*
2280                  * This is OK: we just won't accept this connection,
2281                  * nothing fatal.
2282                  */
2283                 perror("accept");
2284         } else {
2285                 ch = net_find_client_host(ns, ns->addr.sin_addr);
2286                 if (!ch)
2287                         ch = net_add_client_host(ns, &ns->addr);
2288
2289                 ch_add_connection(ns, ch, fd);
2290         }
2291 }
2292
2293 static struct devpath *nc_add_dpp(struct cl_conn *nc,
2294                                   struct blktrace_net_hdr *bnh,
2295                                   time_t connect_time)
2296 {
2297         int cpu;
2298         struct io_info *iop;
2299         struct devpath *dpp;
2300
2301         dpp = malloc(sizeof(*dpp));
2302         memset(dpp, 0, sizeof(*dpp));
2303
2304         dpp->buts_name = strdup(bnh->buts_name);
2305         dpp->path = strdup(bnh->buts_name);
2306         dpp->fd = -1;
2307         dpp->ch = nc->ch;
2308         dpp->cl_id = bnh->cl_id;
2309         dpp->cl_connect_time = connect_time;
2310         dpp->ncpus = nc->ncpus;
2311         dpp->stats = calloc(dpp->ncpus, sizeof(*dpp->stats));
2312         memset(dpp->stats, 0, dpp->ncpus * sizeof(*dpp->stats));
2313
2314         list_add_tail(&dpp->head, &nc->ch->devpaths);
2315         nc->ch->ndevs++;
2316
2317         dpp->ios = calloc(nc->ncpus, sizeof(*iop));
2318         memset(dpp->ios, 0, ndevs * sizeof(*iop));
2319
2320         for (cpu = 0, iop = dpp->ios; cpu < nc->ncpus; cpu++, iop++) {
2321                 iop->dpp = dpp;
2322                 iop->nc = nc;
2323                 init_mmap_info(&iop->mmap_info);
2324
2325                 if (iop_open(iop, cpu))
2326                         goto err;
2327         }
2328
2329         return dpp;
2330
2331 err:
2332         /*
2333          * Need to unravel what's been done...
2334          */
2335         while (cpu >= 0)
2336                 close_iop(&dpp->ios[cpu--]);
2337         dpp_free(dpp);
2338
2339         return NULL;
2340 }
2341
2342 static struct devpath *nc_find_dpp(struct cl_conn *nc,
2343                                    struct blktrace_net_hdr *bnh)
2344 {
2345         struct list_head *p;
2346         time_t connect_time = nc->connect_time;
2347
2348         __list_for_each(p, &nc->ch->devpaths) {
2349                 struct devpath *dpp = list_entry(p, struct devpath, head);
2350
2351                 if (!strcmp(dpp->buts_name, bnh->buts_name))
2352                         return dpp;
2353
2354                 if (dpp->cl_id == bnh->cl_id)
2355                         connect_time = dpp->cl_connect_time;
2356         }
2357
2358         return nc_add_dpp(nc, bnh, connect_time);
2359 }
2360
/*
 * Receive one payload of trace data from a client and append it to the
 * mmap'ed per-CPU output file. Fatal (exit) on mmap or socket errors.
 */
static void net_client_read_data(struct cl_conn *nc, struct devpath *dpp,
				 struct blktrace_net_hdr *bnh)
{
	int ret;
	struct io_info *iop = &dpp->ios[bnh->cpu];
	struct mmap_info *mip = &iop->mmap_info;

	/* Make sure bnh->len more bytes fit in the mapped output window. */
	if (setup_mmap(iop->ofd, bnh->len, &iop->mmap_info)) {
		fprintf(stderr, "ncd(%s:%d): mmap failed\n",
			nc->ch->hostname, nc->fd);
		exit(1);
	}

	/*
	 * Pull the payload straight off the socket into the mapped file;
	 * on success, account the bytes and advance the window offset.
	 * A zero return is silently ignored here (presumably handled by
	 * the caller's connection logic - TODO confirm); negative is a
	 * hard receive error.
	 */
	ret = net_recv_data(nc->fd, mip->fs_buf + mip->fs_off, bnh->len);
	if (ret > 0) {
		pdc_dr_update(dpp, bnh->cpu, ret);
		mip->fs_size += ret;
		mip->fs_off += ret;
	} else if (ret < 0)
		exit(1);
}
2382
2383 /*
2384  * Returns 1 if we closed a host - invalidates other polling information
2385  * that may be present.
2386  */
2387 static int net_client_data(struct cl_conn *nc)
2388 {
2389         int ret;
2390         struct devpath *dpp;
2391         struct blktrace_net_hdr bnh;
2392
2393         ret = net_get_header(nc, &bnh);
2394         if (ret == 0)
2395                 return 0;
2396
2397         if (ret < 0) {
2398                 fprintf(stderr, "ncd(%d): header read failed\n", nc->fd);
2399                 exit(1);
2400         }
2401
2402         if (data_is_native == -1 && check_data_endianness(bnh.magic)) {
2403                 fprintf(stderr, "ncd(%d): received data is bad\n", nc->fd);
2404                 exit(1);
2405         }
2406
2407         if (!data_is_native) {
2408                 bnh.magic = be32_to_cpu(bnh.magic);
2409                 bnh.cpu = be32_to_cpu(bnh.cpu);
2410                 bnh.max_cpus = be32_to_cpu(bnh.max_cpus);
2411                 bnh.len = be32_to_cpu(bnh.len);
2412                 bnh.cl_id = be32_to_cpu(bnh.cl_id);
2413                 bnh.buf_size = be32_to_cpu(bnh.buf_size);
2414                 bnh.buf_nr = be32_to_cpu(bnh.buf_nr);
2415                 bnh.page_size = be32_to_cpu(bnh.page_size);
2416         }
2417
2418         if ((bnh.magic & 0xffffff00) != BLK_IO_TRACE_MAGIC) {
2419                 fprintf(stderr, "ncd(%s:%d): bad data magic\n",
2420                         nc->ch->hostname, nc->fd);
2421                 exit(1);
2422         }
2423
2424         if (nc->ncpus == -1)
2425                 nc->ncpus = bnh.max_cpus;
2426
2427         /*
2428          * len == 0 means the other end is sending us a new connection/dpp
2429          * len == 1 means that the other end signalled end-of-run
2430          */
2431         dpp = nc_find_dpp(nc, &bnh);
2432         if (bnh.len == 0) {
2433                 /*
2434                  * Just adding in the dpp above is enough
2435                  */
2436                 ack_open_close(nc->fd, dpp->buts_name);
2437                 nc->ch->cl_opens++;
2438         } else if (bnh.len == 1) {
2439                 /*
2440                  * overload cpu count with dropped events
2441                  */
2442                 dpp->drops = bnh.cpu;
2443
2444                 ack_open_close(nc->fd, dpp->buts_name);
2445                 if (--nc->ch->cl_opens == 0) {
2446                         show_stats(&nc->ch->devpaths);
2447                         net_ch_remove(nc->ch, nc->ncpus);
2448                         return 1;
2449                 }
2450         } else
2451                 net_client_read_data(nc, dpp, &bnh);
2452
2453         return 0;
2454 }
2455
/*
 * Service POLLIN events on client connections. 'events' is the number
 * of ready descriptors reported by poll() still unaccounted for.
 */
static void handle_client_data(struct net_server_s *ns, int events)
{
	struct cl_conn *nc;
	struct pollfd *pfd;
	struct list_head *p, *q;

	/*
	 * Walk the connection list in lock-step with ns->pfds[1..]:
	 * net_setup_pfds() filled the array in list order, so pfd and p
	 * advance together.
	 */
	pfd = &ns->pfds[1];
	list_for_each_safe(p, q, &ns->conn_list) {
		if (pfd->revents & POLLIN) {
			nc = list_entry(p, struct cl_conn, ns_head);

			/*
			 * Bail out early if a whole host was torn down
			 * (net_client_data() returned 1 - the pollfd
			 * array no longer matches the list), or once all
			 * reported events have been consumed.
			 */
			if (net_client_data(nc) || --events == 0)
				break;
		}
		pfd++;
	}
}
2473
2474 static void net_setup_pfds(struct net_server_s *ns)
2475 {
2476         struct pollfd *pfd;
2477         struct list_head *p;
2478
2479         ns->pfds[0].fd = ns->listen_fd;
2480         ns->pfds[0].events = POLLIN;
2481
2482         pfd = &ns->pfds[1];
2483         __list_for_each(p, &ns->conn_list) {
2484                 struct cl_conn *nc = list_entry(p, struct cl_conn, ns_head);
2485
2486                 pfd->fd = nc->fd;
2487                 pfd->events = POLLIN;
2488                 pfd++;
2489         }
2490 }
2491
2492 static int net_server_handle_connections(struct net_server_s *ns)
2493 {
2494         int events;
2495
2496         printf("server: waiting for connections...\n");
2497
2498         while (!done) {
2499                 net_setup_pfds(ns);
2500                 events = poll(ns->pfds, ns->connects + 1, -1);
2501                 if (events < 0) {
2502                         if (errno != EINTR) {
2503                                 perror("FATAL: poll error");
2504                                 return 1;
2505                         }
2506                 } else if (events > 0) {
2507                         if (ns->pfds[0].revents & POLLIN) {
2508                                 net_add_connection(ns);
2509                                 events--;
2510                         }
2511
2512                         if (events)
2513                                 handle_client_data(ns, events);
2514                 }
2515         }
2516
2517         return 0;
2518 }
2519
2520 static int net_server(void)
2521 {
2522         int fd, opt;
2523         int ret = 1;
2524         struct net_server_s net_server;
2525         struct net_server_s *ns = &net_server;
2526
2527         memset(ns, 0, sizeof(*ns));
2528         INIT_LIST_HEAD(&ns->ch_list);
2529         INIT_LIST_HEAD(&ns->conn_list);
2530         ns->pfds = malloc(sizeof(struct pollfd));
2531
2532         fd = my_socket(AF_INET, SOCK_STREAM, 0);
2533         if (fd < 0) {
2534                 perror("server: socket");
2535                 goto out;
2536         }
2537
2538         opt = 1;
2539         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
2540                 perror("setsockopt");
2541                 goto out;
2542         }
2543
2544         memset(&ns->addr, 0, sizeof(ns->addr));
2545         ns->addr.sin_family = AF_INET;
2546         ns->addr.sin_addr.s_addr = htonl(INADDR_ANY);
2547         ns->addr.sin_port = htons(net_port);
2548
2549         if (bind(fd, (struct sockaddr *) &ns->addr, sizeof(ns->addr)) < 0) {
2550                 perror("bind");
2551                 goto out;
2552         }
2553
2554         if (listen(fd, 1) < 0) {
2555                 perror("listen");
2556                 goto out;
2557         }
2558
2559         /*
2560          * The actual server looping is done here:
2561          */
2562         ns->listen_fd = fd;
2563         ret = net_server_handle_connections(ns);
2564
2565         /*
2566          * Clean up and return...
2567          */
2568 out:
2569         free(ns->pfds);
2570         return ret;
2571 }
2572
/*
 * Run local tracing (or the network-client side of it): prepare the
 * per-device trace state, spawn one tracer thread per CPU, wait for
 * completion and report statistics. Returns 0 on success, 1 if the
 * piped/net devpath setup failed.
 */
static int run_tracers(void)
{
	/* Guarantee teardown even on unexpected exit paths. */
	atexit(exit_tracing);
	if (net_mode == Net_client)
		printf("blktrace: connecting to %s\n", hostname);

	/* Kernel-side per-device trace setup must precede the threads. */
	setup_buts();

	if (use_tracer_devpaths()) {
		if (setup_tracer_devpaths())
			return 1;

		/* Piped output is written locally; otherwise sent over net. */
		if (piped_output)
			handle_list = handle_list_file;
		else
			handle_list = handle_list_net;
	}

	/*
	 * Only begin tracing if a tracer thread came up on every CPU;
	 * otherwise tear the partial set back down.
	 */
	start_tracers();
	if (nthreads_running == ncpus) {
		unblock_tracers();
		start_buts();
		if (net_mode == Net_client)
			printf("blktrace: connected!\n");
		/* -w: arrange for SIGALRM to end the run. */
		if (stop_watch)
			alarm(stop_watch);
	} else
		stop_tracers();

	wait_tracers();
	if (nthreads_running == ncpus)
		show_stats(&devpaths);
	if (net_client_use_send())
		close_client_connections();
	del_tracers();

	return 0;
}
2611
/*
 * Entry point: parse arguments, then run in one of three modes -
 * kill an existing trace (-k), act as a network server (-l), or run
 * the tracers locally / as a network client.
 */
int main(int argc, char *argv[])
{
	int ret = 0;

	setlocale(LC_NUMERIC, "en_US");
	pagesize = getpagesize();
	ncpus = sysconf(_SC_NPROCESSORS_ONLN);
	if (ncpus < 0) {
		fprintf(stderr, "sysconf(_SC_NPROCESSORS_ONLN) failed %d/%s\n",
			errno, strerror(errno));
		ret = 1;
		goto out;
	} else if (handle_args(argc, argv)) {
		ret = 1;
		goto out;
	}

	/*
	 * A single -o name cannot serve multiple devices; "-" (piped
	 * output) is the one exception.
	 */
	if (ndevs > 1 && output_name && strcmp(output_name, "-") != 0) {
		fprintf(stderr, "-o not supported with multiple devices\n");
		ret = 1;
		goto out;
	}

	/* All termination paths funnel through handle_sigint. */
	signal(SIGINT, handle_sigint);
	signal(SIGHUP, handle_sigint);
	signal(SIGTERM, handle_sigint);
	signal(SIGALRM, handle_sigint);
	signal(SIGPIPE, SIG_IGN);

	if (kill_running_trace) {
		struct devpath *dpp;
		struct list_head *p;

		/* -k: tear down any running trace on each named device. */
		__list_for_each(p, &devpaths) {
			dpp = list_entry(p, struct devpath, head);
			if (__stop_trace(dpp->fd)) {
				fprintf(stderr,
					"BLKTRACETEARDOWN %s failed: %d/%s\n",
					dpp->path, errno, strerror(errno));
			}
		}
	} else if (net_mode == Net_server) {
		/* The server names its outputs per client; -o is moot. */
		if (output_name) {
			fprintf(stderr, "-o ignored in server mode\n");
			output_name = NULL;
		}
		ret = net_server();
	} else
		ret = run_tracers();

out:
	if (pfp)
		fclose(pfp);
	rel_devpaths();
	return ret;
}