blktrace segfault
[blktrace.git] / blktrace.c
1 /*
2  * block queue tracing application
3  *
4  * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
5  * Copyright (C) 2006 Jens Axboe <axboe@kernel.dk>
6  *
7  *  This program is free software; you can redistribute it and/or modify
8  *  it under the terms of the GNU General Public License as published by
9  *  the Free Software Foundation; either version 2 of the License, or
10  *  (at your option) any later version.
11  *
12  *  This program is distributed in the hope that it will be useful,
13  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *  GNU General Public License for more details.
16  *
17  *  You should have received a copy of the GNU General Public License
18  *  along with this program; if not, write to the Free Software
19  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
20  *
21  */
22 #include <pthread.h>
23 #include <sys/types.h>
24 #include <sys/stat.h>
25 #include <unistd.h>
26 #include <locale.h>
27 #include <signal.h>
28 #include <fcntl.h>
29 #include <string.h>
30 #include <sys/ioctl.h>
31 #include <sys/param.h>
32 #include <sys/statfs.h>
33 #include <sys/poll.h>
34 #include <sys/mman.h>
35 #include <sys/socket.h>
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <sched.h>
39 #include <ctype.h>
40 #include <getopt.h>
41 #include <errno.h>
42 #include <netinet/in.h>
43 #include <arpa/inet.h>
44 #include <netdb.h>
45 #include <sys/sendfile.h>
46
47 #include "blktrace.h"
48 #include "barrier.h"
49
50 static char blktrace_version[] = "0.99.3";
51
52 /*
53  * You may want to increase this even more, if you are logging at a high
54  * rate and see skipped/missed events
55  */
56 #define BUF_SIZE        (512 * 1024)
57 #define BUF_NR          (4)
58
59 #define OFILE_BUF       (128 * 1024)
60
61 #define DEBUGFS_TYPE    0x64626720
62
63 #define S_OPTS  "d:a:A:r:o:kw:Vb:n:D:lh:p:sI:"
64 static struct option l_opts[] = {
65         {
66                 .name = "dev",
67                 .has_arg = required_argument,
68                 .flag = NULL,
69                 .val = 'd'
70         },
71         {
72                 .name = "input-devs",
73                 .has_arg = required_argument,
74                 .flag = NULL,
75                 .val = 'I'
76         },
77         {
78                 .name = "act-mask",
79                 .has_arg = required_argument,
80                 .flag = NULL,
81                 .val = 'a'
82         },
83         {
84                 .name = "set-mask",
85                 .has_arg = required_argument,
86                 .flag = NULL,
87                 .val = 'A'
88         },
89         {
90                 .name = "relay",
91                 .has_arg = required_argument,
92                 .flag = NULL,
93                 .val = 'r'
94         },
95         {
96                 .name = "output",
97                 .has_arg = required_argument,
98                 .flag = NULL,
99                 .val = 'o'
100         },
101         {
102                 .name = "kill",
103                 .has_arg = no_argument,
104                 .flag = NULL,
105                 .val = 'k'
106         },
107         {
108                 .name = "stopwatch",
109                 .has_arg = required_argument,
110                 .flag = NULL,
111                 .val = 'w'
112         },
113         {
114                 .name = "version",
115                 .has_arg = no_argument,
116                 .flag = NULL,
117                 .val = 'V'
118         },
119         {
120                 .name = "buffer-size",
121                 .has_arg = required_argument,
122                 .flag = NULL,
123                 .val = 'b'
124         },
125         {
126                 .name = "num-sub-buffers",
127                 .has_arg = required_argument,
128                 .flag = NULL,
129                 .val = 'n'
130         },
131         {
132                 .name = "output-dir",
133                 .has_arg = required_argument,
134                 .flag = NULL,
135                 .val = 'D'
136         },
137         {
138                 .name = "listen",
139                 .has_arg = no_argument,
140                 .flag = NULL,
141                 .val = 'l'
142         },
143         {
144                 .name = "host",
145                 .has_arg = required_argument,
146                 .flag = NULL,
147                 .val = 'h'
148         },
149         {
150                 .name = "port",
151                 .has_arg = required_argument,
152                 .flag = NULL,
153                 .val = 'p'
154         },
155         {
156                 .name = "no-sendfile",
157                 .has_arg = no_argument,
158                 .flag = NULL,
159                 .val = 's'
160         },
161         {
162                 .name = NULL,
163         }
164 };
165
166 struct tip_subbuf {
167         void *buf;
168         unsigned int len;
169         unsigned int max_len;
170 };
171
172 #define FIFO_SIZE       (1024)  /* should be plenty big! */
173 #define CL_SIZE         (128)   /* cache line, any bigger? */
174
175 struct tip_subbuf_fifo {
176         int tail __attribute__((aligned(CL_SIZE)));
177         int head __attribute__((aligned(CL_SIZE)));
178         struct tip_subbuf *q[FIFO_SIZE];
179 };
180
181 struct thread_information {
182         int cpu;
183         pthread_t thread;
184
185         int fd;
186         void *fd_buf;
187         char fn[MAXPATHLEN + 64];
188
189         FILE *ofile;
190         char *ofile_buffer;
191         off_t ofile_offset;
192         int ofile_stdout;
193         int ofile_mmap;
194
195         int (*get_subbuf)(struct thread_information *, unsigned int);
196         int (*flush_subbuf)(struct thread_information *, struct tip_subbuf *);
197         int (*read_data)(struct thread_information *, void *, unsigned int);
198
199         unsigned long events_processed;
200         unsigned long long data_read;
201         unsigned long long data_queued;
202         struct device_information *device;
203
204         int exited;
205
206         /*
207          * piped fifo buffers
208          */
209         struct tip_subbuf_fifo fifo;
210         struct tip_subbuf *leftover_ts;
211
212         /*
213          * mmap controlled output files
214          */
215         unsigned long long fs_size;
216         unsigned long long fs_max_size;
217         unsigned long fs_off;
218         void *fs_buf;
219         unsigned long fs_buf_len;
220
221         struct net_connection *nc;
222 };
223
224 struct device_information {
225         int fd;
226         char *path;
227         char buts_name[32];
228         volatile int trace_started;
229         unsigned long drop_count;
230         struct thread_information *threads;
231         unsigned long buf_size;
232         unsigned long buf_nr;
233         unsigned int page_size;
234
235         struct cl_host *ch;
236         u32 cl_id;
237         time_t cl_connect_time;
238 };
239
240 static int ncpus;
241 static struct thread_information *thread_information;
242 static int ndevs;
243 static struct device_information *device_information;
244
245 /* command line option globals */
246 static char *debugfs_path;
247 static char *output_name;
248 static char *output_dir;
249 static int act_mask = ~0U;
250 static int kill_running_trace;
251 static unsigned long buf_size = BUF_SIZE;
252 static unsigned long buf_nr = BUF_NR;
253 static unsigned int page_size;
254
255 #define is_done()       (*(volatile int *)(&done))
256 static volatile int done;
257
258 #define is_trace_stopped()      (*(volatile int *)(&trace_stopped))
259 static volatile int trace_stopped;
260
261 #define is_stat_shown() (*(volatile int *)(&stat_shown))
262 static volatile int stat_shown;
263
264 int data_is_native = -1;
265
266 static void exit_trace(int status);
267
268 #define dip_tracing(dip)        (*(volatile int *)(&(dip)->trace_started))
269 #define dip_set_tracing(dip, v) ((dip)->trace_started = (v))
270
271 #define __for_each_dip(__d, __di, __e, __i)     \
272         for (__i = 0, __d = __di; __i < __e; __i++, __d++)
273
274 #define for_each_dip(__d, __i)          \
275         __for_each_dip(__d, device_information, ndevs, __i)
276 #define for_each_nc_dip(__nc, __d, __i)         \
277         __for_each_dip(__d, (__nc)->ch->device_information, (__nc)->ch->ndevs, __i)
278
279 #define __for_each_tip(__d, __t, __ncpus, __j)  \
280         for (__j = 0, __t = (__d)->threads; __j < __ncpus; __j++, __t++)
281 #define for_each_tip(__d, __t, __j)     \
282         __for_each_tip(__d, __t, ncpus, __j)
283 #define for_each_cl_host(__c)   \
284         for (__c = cl_host_list; __c; __c = __c->list_next)
285
286 /*
287  * networking stuff follows. we include a magic number so we know whether
288  * the receiver needs to do endianness conversion or not
289  */
290 struct blktrace_net_hdr {
291         u32 magic;              /* same as trace magic */
292         char buts_name[32];     /* trace name */
293         u32 cpu;                /* for which cpu */
294         u32 max_cpus;
295         u32 len;                /* length of following trace data */
296         u32 cl_id;              /* id for set of client per-cpu connections */
297         u32 buf_size;           /* client buf_size for this trace  */
298         u32 buf_nr;             /* client buf_nr for this trace  */
299         u32 page_size;          /* client page_size for this trace  */
300 };
301
302 #define TRACE_NET_PORT          (8462)
303
304 enum {
305         Net_none = 0,
306         Net_server,
307         Net_client,
308 };
309
310 /*
311  * network cmd line params
312  */
313 static char hostname[MAXHOSTNAMELEN];
314 static int net_port = TRACE_NET_PORT;
315 static int net_mode = 0;
316 static int net_use_sendfile = 1;
317
318 struct cl_host {
319         struct cl_host *list_next;
320         struct in_addr cl_in_addr;
321         struct net_connection *net_connections;
322         int nconn;
323         struct device_information *device_information;
324         int ndevs;
325         int ncpus;
326         int ndevs_done;
327 };
328
329 struct net_connection {
330         int in_fd;
331         struct pollfd pfd;
332         time_t connect_time;
333         struct cl_host *ch;
334         int ncpus;
335 };
336
337 #define NET_MAX_CL_HOSTS        (1024)
338 static struct cl_host *cl_host_list;
339 static int cl_hosts;
340 static int net_connects;
341
342 static int *net_out_fd;
343
344 static void handle_sigint(__attribute__((__unused__)) int sig)
345 {
346         struct device_information *dip;
347         int i;
348
349         /*
350          * stop trace so we can reap currently produced data
351          */
352         for_each_dip(dip, i) {
353                 if (dip->fd == -1)
354                         continue;
355                 if (ioctl(dip->fd, BLKTRACESTOP) < 0)
356                         perror("BLKTRACESTOP");
357         }
358
359         done = 1;
360 }
361
362 static int get_dropped_count(const char *buts_name)
363 {
364         int fd;
365         char tmp[MAXPATHLEN + 64];
366
367         snprintf(tmp, sizeof(tmp), "%s/block/%s/dropped",
368                  debugfs_path, buts_name);
369
370         fd = open(tmp, O_RDONLY);
371         if (fd < 0) {
372                 /*
373                  * this may be ok, if the kernel doesn't support dropped counts
374                  */
375                 if (errno == ENOENT)
376                         return 0;
377
378                 fprintf(stderr, "Couldn't open dropped file %s\n", tmp);
379                 return -1;
380         }
381
382         if (read(fd, tmp, sizeof(tmp)) < 0) {
383                 perror(tmp);
384                 close(fd);
385                 return -1;
386         }
387
388         close(fd);
389
390         return atoi(tmp);
391 }
392
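/*
 * set up and start tracing on one device: BLKTRACESETUP registers the
 * buffer size/count and action mask with the kernel, BLKTRACESTART arms
 * the trace, and the kernel-assigned buts name is saved for debugfs paths
 */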
393 static int start_trace(struct device_information *dip)
394 {
395         struct blk_user_trace_setup buts;
396
397         memset(&buts, 0, sizeof(buts));
398         buts.buf_size = dip->buf_size;
399         buts.buf_nr = dip->buf_nr;
400         buts.act_mask = act_mask;
401
402         if (ioctl(dip->fd, BLKTRACESETUP, &buts) < 0) {
403                 perror("BLKTRACESETUP");
404                 return 1;
405         }
406
407         if (ioctl(dip->fd, BLKTRACESTART) < 0) {
408                 perror("BLKTRACESTART");
409                 return 1;
410         }
411
412         memcpy(dip->buts_name, buts.name, sizeof(dip->buts_name));
413         dip_set_tracing(dip, 1);
414         return 0;
415 }
416
417 static void stop_trace(struct device_information *dip)
418 {
419         if (dip_tracing(dip) || kill_running_trace) {
420                 dip_set_tracing(dip, 0);
421
422                 /*
423                  * should be stopped, just don't complain if it isn't
424                  */
425                 ioctl(dip->fd, BLKTRACESTOP);
426
427                 if (ioctl(dip->fd, BLKTRACETEARDOWN) < 0)
428                         perror("BLKTRACETEARDOWN");
429
430                 close(dip->fd);
431                 dip->fd = -1;
432         }
433 }
434
435 static void stop_all_traces(void)
436 {
437         struct device_information *dip;
438         int i;
439
440         for_each_dip(dip, i) {
441                 dip->drop_count = get_dropped_count(dip->buts_name);
442                 stop_trace(dip);
443         }
444 }
445
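/*
 * poll the per-cpu relay file until it has data or we are told to stop;
 * for stdout/pipe output we return after a single poll round even if no
 * data arrived yet
 */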
446 static void wait_for_data(struct thread_information *tip, int timeout)
447 {
448         struct pollfd pfd = { .fd = tip->fd, .events = POLLIN };
449
450         while (!is_done()) {
451                 if (poll(&pfd, 1, timeout) < 0) {
452                         perror("poll");
453                         break;
454                 }
455                 if (pfd.revents & POLLIN)
456                         break;
457                 if (tip->ofile_stdout)
458                         break;
459         }
460 }
461
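/*
 * read trace data from the per-cpu debugfs relay file, waiting and
 * retrying on EAGAIN until something arrives or tracing is done
 */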
462 static int read_data_file(struct thread_information *tip, void *buf,
463                           unsigned int len)
464 {
465         int ret = 0;
466
467         do {
468                 wait_for_data(tip, 100);
469
470                 ret = read(tip->fd, buf, len);
471                 if (!ret)
472                         continue;
473                 else if (ret > 0)
474                         return ret;
475                 else {
476                         if (errno != EAGAIN) {
477                                 perror(tip->fn);
478                                 fprintf(stderr,"Thread %d failed read of %s\n",
479                                         tip->cpu, tip->fn);
480                                 break;
481                         }
482                         continue;
483                 }
484         } while (!is_done());
485
486         return ret;
487
488 }
489
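/*
 * server side: pull up to 'len' bytes of trace data from the client
 * connection, looping on short reads and EAGAIN; returns how much was
 * actually received
 */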
490 static int read_data_net(struct thread_information *tip, void *buf,
491                          unsigned int len)
492 {
493         struct net_connection *nc = tip->nc;
494         unsigned int bytes_left = len;
495         int ret = 0;
496
497         do {
498                 ret = recv(nc->in_fd, buf, bytes_left, MSG_WAITALL);
499
500                 if (!ret)
501                         continue;
502                 else if (ret < 0) {
503                         if (errno != EAGAIN) {
504                                 perror(tip->fn);
505                                 fprintf(stderr, "server: failed read\n");
506                                 return 0;
507                         }
508                         continue;
509                 } else {
510                         buf += ret;
511                         bytes_left -= ret;
512                 }
513         } while (!is_done() && bytes_left);
514
515         return len - bytes_left;
516 }
517
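/*
 * single-producer/single-consumer ring between a per-cpu reader thread
 * (queue side) and the main writeout loop (dequeue side); head and tail
 * sit on separate cache lines and a store barrier orders the slot and
 * index updates, so no locking is needed
 */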
518 static inline struct tip_subbuf *
519 subbuf_fifo_dequeue(struct thread_information *tip)
520 {
521         const int head = tip->fifo.head;
522         const int next = (head + 1) & (FIFO_SIZE - 1);
523
524         if (head != tip->fifo.tail) {
525                 struct tip_subbuf *ts = tip->fifo.q[head];
526
527                 store_barrier();
528                 tip->fifo.head = next;
529                 return ts;
530         }
531
532         return NULL;
533 }
534
535 static inline int subbuf_fifo_queue(struct thread_information *tip,
536                                     struct tip_subbuf *ts)
537 {
538         const int tail = tip->fifo.tail;
539         const int next = (tail + 1) & (FIFO_SIZE - 1);
540
541         if (next != tip->fifo.head) {
542                 tip->fifo.q[tail] = ts;
543                 store_barrier();
544                 tip->fifo.tail = next;
545                 return 0;
546         }
547
548         fprintf(stderr, "fifo too small!\n");
549         return 1;
550 }
551
552 /*
553  * For file output, truncate and mmap the file appropriately
554  */
555 static int mmap_subbuf(struct thread_information *tip, unsigned int maxlen)
556 {
557         int ofd = fileno(tip->ofile);
558         int ret;
559         unsigned long nr;
560
561         /*
562          * extend file, if we have to. use chunks of 16 subbuffers.
563          */
564         if (tip->fs_off + maxlen > tip->fs_buf_len) {
565                 if (tip->fs_buf) {
566                         munlock(tip->fs_buf, tip->fs_buf_len);
567                         munmap(tip->fs_buf, tip->fs_buf_len);
568                         tip->fs_buf = NULL;
569                 }
570
571                 tip->fs_off = tip->fs_size & (tip->device->page_size - 1);
572                 nr = max(16, tip->device->buf_nr);
573                 tip->fs_buf_len = (nr * tip->device->buf_size) - tip->fs_off;
574                 tip->fs_max_size += tip->fs_buf_len;
575
576                 if (ftruncate(ofd, tip->fs_max_size) < 0) {
577                         perror("ftruncate");
578                         return -1;
579                 }
580
581                 tip->fs_buf = mmap(NULL, tip->fs_buf_len, PROT_WRITE,
582                                    MAP_SHARED, ofd, tip->fs_size - tip->fs_off);
583                 if (tip->fs_buf == MAP_FAILED) {
584                         perror("mmap");
585                         return -1;
586                 }
587                 mlock(tip->fs_buf, tip->fs_buf_len);
588         }
589
590         ret = tip->read_data(tip, tip->fs_buf + tip->fs_off, maxlen);
591         if (ret >= 0) {
592                 tip->data_read += ret;
593                 tip->fs_size += ret;
594                 tip->fs_off += ret;
595                 return 0;
596         }
597
598         return -1;
599 }
600
601 /*
602  * Use the copy approach for pipes and network
603  */
604 static int get_subbuf(struct thread_information *tip, unsigned int maxlen)
605 {
606         struct tip_subbuf *ts = malloc(sizeof(*ts));
607         int ret;
608
609         ts->buf = malloc(tip->device->buf_size);
610         ts->max_len = maxlen;
611
612         ret = tip->read_data(tip, ts->buf, ts->max_len);
613         if (ret > 0) {
614                 ts->len = ret;
615                 tip->data_read += ret;
616                 if (subbuf_fifo_queue(tip, ts))
617                         ret = -1;
618         }
619
620         if (ret <= 0) {
621                 free(ts->buf);
622                 free(ts);
623         }
624
625         return ret;
626 }
627
628 static void close_thread(struct thread_information *tip)
629 {
630         if (tip->fd != -1)
631                 close(tip->fd);
632         if (tip->ofile)
633                 fclose(tip->ofile);
634         if (tip->ofile_buffer)
635                 free(tip->ofile_buffer);
636         if (tip->fd_buf)
637                 free(tip->fd_buf);
638
639         tip->fd = -1;
640         tip->ofile = NULL;
641         tip->ofile_buffer = NULL;
642         tip->fd_buf = NULL;
643 }
644
645 static void tip_ftrunc_final(struct thread_information *tip)
646 {
647         /*
648          * truncate to right size and cleanup mmap
649          */
650         if (tip->ofile_mmap && tip->ofile) {
651                 int ofd = fileno(tip->ofile);
652
653                 if (tip->fs_buf)
654                         munmap(tip->fs_buf, tip->fs_buf_len);
655
656                 ftruncate(ofd, tip->fs_size);
657         }
658 }
659
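/*
 * per-cpu worker: pin to our cpu, open the per-cpu relay file in debugfs,
 * then pull sub-buffers until tracing stops and drain whatever is left
 */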
660 static void *thread_main(void *arg)
661 {
662         struct thread_information *tip = arg;
663         pid_t pid = getpid();
664         cpu_set_t cpu_mask;
665
666         CPU_ZERO(&cpu_mask);
667         CPU_SET((tip->cpu), &cpu_mask);
668
669         if (sched_setaffinity(pid, sizeof(cpu_mask), &cpu_mask) == -1) {
670                 perror("sched_setaffinity");
671                 exit_trace(1);
672         }
673
674         snprintf(tip->fn, sizeof(tip->fn), "%s/block/%s/trace%d",
675                         debugfs_path, tip->device->buts_name, tip->cpu);
676         tip->fd = open(tip->fn, O_RDONLY);
677         if (tip->fd < 0) {
678                 perror(tip->fn);
679                 fprintf(stderr,"Thread %d failed open of %s\n", tip->cpu,
680                         tip->fn);
681                 exit_trace(1);
682         }
683
684         while (!is_done()) {
685                 if (tip->get_subbuf(tip, tip->device->buf_size) < 0)
686                         break;
687         }
688
689         /*
690          * trace is stopped, pull data until we get a short read
691          */
692         while (tip->get_subbuf(tip, tip->device->buf_size) > 0)
693                 ;
694
695         tip_ftrunc_final(tip);
696         tip->exited = 1;
697         return NULL;
698 }
699
700 static int write_data_net(int fd, void *buf, unsigned int buf_len)
701 {
702         unsigned int bytes_left = buf_len;
703         int ret;
704
705         while (bytes_left) {
706                 ret = send(fd, buf, bytes_left, 0);
707                 if (ret < 0) {
708                         perror("send");
709                         return 1;
710                 }
711
712                 buf += ret;
713                 bytes_left -= ret;
714         }
715
716         return 0;
717 }
718
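/*
 * send the per-subbuffer header describing device, cpu, data length and
 * buffer geometry; fields go out in host byte order, the server detects
 * endianness from the trace magic
 */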
719 static int net_send_header(struct thread_information *tip, unsigned int len)
720 {
721         struct blktrace_net_hdr hdr;
722
723         hdr.magic = BLK_IO_TRACE_MAGIC;
724         strcpy(hdr.buts_name, tip->device->buts_name);
725         hdr.cpu = tip->cpu;
726         hdr.max_cpus = ncpus;
727         hdr.len = len;
728         hdr.cl_id = getpid();
729         hdr.buf_size = tip->device->buf_size;
730         hdr.buf_nr = tip->device->buf_nr;
731         hdr.page_size = tip->device->page_size;
732         
733         return write_data_net(net_out_fd[tip->cpu], &hdr, sizeof(hdr));
734 }
735
736 /*
737  * send header with 0 length to signal end-of-run
738  */
739 static void net_client_send_close(void)
740 {
741         struct device_information *dip;
742         struct blktrace_net_hdr hdr;
743         int i;
744
745         for_each_dip(dip, i) {
746                 hdr.magic = BLK_IO_TRACE_MAGIC;
747                 hdr.max_cpus = ncpus;
748                 hdr.len = 0;
749                 strcpy(hdr.buts_name, dip->buts_name);
750                 hdr.cpu = get_dropped_count(dip->buts_name);
751                 hdr.cl_id = getpid();
752                 hdr.buf_size = dip->buf_size;
753                 hdr.buf_nr = dip->buf_nr;
754                 hdr.page_size = dip->page_size;
755
756                 write_data_net(net_out_fd[0], &hdr, sizeof(hdr));
757         }
758
759 }
760
761 static int flush_subbuf_net(struct thread_information *tip,
762                             struct tip_subbuf *ts)
763 {
764         if (net_send_header(tip, ts->len))
765                 return -1;
766         if (write_data_net(net_out_fd[tip->cpu], ts->buf, ts->len))
767                 return -1;
768
769         free(ts->buf);
770         free(ts);
771         return 1;
772 }
773
774 static int net_sendfile(struct thread_information *tip, struct tip_subbuf *ts)
775 {
776         int ret = sendfile(net_out_fd[tip->cpu], tip->fd, NULL, ts->len);
777
778         if (ret < 0) {
779                 perror("sendfile");
780                 return 1;
781         } else if (ret < (int) ts->len) {
782                 fprintf(stderr, "short sendfile send (%d of %d)\n", ret, ts->len);
783                 return 1;
784         }
785
786         return 0;
787 }
788
789 static int flush_subbuf_sendfile(struct thread_information *tip,
790                                  struct tip_subbuf *ts)
791 {
792         int ret = -1;
793
794         if (net_send_header(tip, ts->len))
795                 goto err;
796         if (net_sendfile(tip, ts))
797                 goto err;
798
799         tip->data_read += ts->len;
800         ret = 1;
801 err:
802         free(ts);
803         return ret;
804 }
805
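/*
 * sendfile() client path: wait for data, fstat the relay file to see how
 * many new bytes are ready, then ship them straight from the trace fd to
 * the socket without copying through user space
 */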
806 static int get_subbuf_sendfile(struct thread_information *tip,
807                                __attribute__((__unused__)) unsigned int maxlen)
808 {
809         struct tip_subbuf *ts;
810         struct stat sb;
811         unsigned int ready;
812
813         wait_for_data(tip, -1);
814
815         if (fstat(tip->fd, &sb) < 0) {
816                 perror("trace stat");
817                 return -1;
818         }
819
820         ready = sb.st_size - tip->data_queued;
821         if (!ready) {
822                 usleep(1000);
823                 return 0;
824         }
825
826         ts = malloc(sizeof(*ts));
827         ts->buf = NULL;
828         ts->max_len = 0;
829         ts->len = ready;
830         tip->data_queued += ready;
831
832         if (flush_subbuf_sendfile(tip, ts) < 0)
833                 return -1;
834
835         return ready;
836 }
837
838 static int write_data(struct thread_information *tip, void *buf,
839                       unsigned int buf_len)
840 {
841         int ret;
842
843         if (!buf_len)
844                 return 0;
845
846         ret = fwrite(buf, buf_len, 1, tip->ofile);
847         if (ferror(tip->ofile) || ret != 1) {
848                 perror("fwrite");
849                 clearerr(tip->ofile);
850                 return 1;
851         }
852
853         if (tip->ofile_stdout)
854                 fflush(tip->ofile);
855
856         return 0;
857 }
858
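/*
 * parse a sub-buffer event by event (trace header plus pdu), write out
 * only complete events, and stash a trailing partial event in leftover_ts
 * so the next sub-buffer can complete it
 */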
859 static int flush_subbuf_file(struct thread_information *tip,
860                              struct tip_subbuf *ts)
861 {
862         unsigned int offset = 0;
863         struct blk_io_trace *t;
864         int pdu_len, events = 0;
865
866         /*
867          * surplus from last run
868          */
869         if (tip->leftover_ts) {
870                 struct tip_subbuf *prev_ts = tip->leftover_ts;
871
872                 if (prev_ts->len + ts->len > prev_ts->max_len) {
873                         prev_ts->max_len += ts->len;
874                         prev_ts->buf = realloc(prev_ts->buf, prev_ts->max_len);
875                 }
876
877                 memcpy(prev_ts->buf + prev_ts->len, ts->buf, ts->len);
878                 prev_ts->len += ts->len;
879
880                 free(ts->buf);
881                 free(ts);
882
883                 ts = prev_ts;
884                 tip->leftover_ts = NULL;
885         }
886
887         while (offset + sizeof(*t) <= ts->len) {
888                 t = ts->buf + offset;
889
890                 if (verify_trace(t)) {
891                         write_data(tip, ts->buf, offset);
892                         return -1;
893                 }
894
895                 pdu_len = t->pdu_len;
896
897                 if (offset + sizeof(*t) + pdu_len > ts->len)
898                         break;
899
900                 offset += sizeof(*t) + pdu_len;
901                 tip->events_processed++;
902                 tip->data_read += sizeof(*t) + pdu_len;
903                 events++;
904         }
905
906         if (write_data(tip, ts->buf, offset))
907                 return -1;
908
909         /*
910          * leftover bytes, save them for next time
911          */
912         if (offset != ts->len) {
913                 tip->leftover_ts = ts;
914                 ts->len -= offset;
915                 memmove(ts->buf, ts->buf + offset, ts->len);
916         } else {
917                 free(ts->buf);
918                 free(ts);
919         }
920
921         return events;
922 }
923
924 static int write_tip_events(struct thread_information *tip)
925 {
926         struct tip_subbuf *ts = subbuf_fifo_dequeue(tip);
927
928         if (ts)
929                 return tip->flush_subbuf(tip, ts);
930
931         return 0;
932 }
933
934 /*
935  * scans the tips we know and writes out the subbuffers we accumulate
936  */
937 static void get_and_write_events(void)
938 {
939         struct device_information *dip;
940         struct thread_information *tip;
941         int i, j, events, ret, tips_running;
942
943         while (!is_done()) {
944                 events = 0;
945
946                 for_each_dip(dip, i) {
947                         for_each_tip(dip, tip, j) {
948                                 ret = write_tip_events(tip);
949                                 if (ret > 0)
950                                         events += ret;
951                         }
952                 }
953
954                 if (!events)
955                         usleep(100000);
956         }
957
958         /*
959          * reap stored events
960          */
961         do {
962                 events = 0;
963                 tips_running = 0;
964                 for_each_dip(dip, i) {
965                         for_each_tip(dip, tip, j) {
966                                 ret = write_tip_events(tip);
967                                 if (ret > 0)
968                                         events += ret;
969                                 tips_running += !tip->exited;
970                         }
971                 }
972                 usleep(10);
973         } while (events || tips_running);
974 }
975
976 static void wait_for_threads(void)
977 {
978         /*
979          * for piped or network output, poll and fetch data for writeout.
980          * for files, we just wait around for trace threads to exit
981          */
982         if ((output_name && !strcmp(output_name, "-")) ||
983             ((net_mode == Net_client) && !net_use_sendfile))
984                 get_and_write_events();
985         else {
986                 struct device_information *dip;
987                 struct thread_information *tip;
988                 int i, j, tips_running;
989
990                 do {
991                         tips_running = 0;
992                         usleep(100000);
993
994                         for_each_dip(dip, i)
995                                 for_each_tip(dip, tip, j)
996                                         tips_running += !tip->exited;
997                 } while (tips_running);
998         }
999
1000         if (net_mode == Net_client)
1001                 net_client_send_close();
1002 }
1003
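/*
 * build the per-cpu output file name: optional -D directory, then (in
 * server mode) a "<client ip>-<connect time>/" subdirectory, then
 * "<name>.blktrace.<cpu>"; the directory is created if it doesn't exist
 */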
1004 static int fill_ofname(struct device_information *dip,
1005                        struct thread_information *tip, char *dst,
1006                        char *buts_name)
1007 {
1008         struct stat sb;
1009         int len = 0;
1010
1011         if (output_dir)
1012                 len = sprintf(dst, "%s/", output_dir);
1013         else
1014                 len = sprintf(dst, "./");
1015
1016         if (net_mode == Net_server) {
1017                 struct net_connection *nc = tip->nc;
1018
1019                 len += sprintf(dst + len, "%s-", inet_ntoa(nc->ch->cl_in_addr));
1020                 len += strftime(dst + len, 64, "%F-%T/", gmtime(&dip->cl_connect_time));
1021         }
1022
1023         if (stat(dst, &sb) < 0) {
1024                 if (errno != ENOENT) {
1025                         perror("stat");
1026                         return 1;
1027                 }
1028                 if (mkdir(dst, 0755) < 0) {
1029                         perror(dst);
1030                         fprintf(stderr, "Can't make output dir\n");
1031                         return 1;
1032                 }
1033         }
1034
1035         if (output_name)
1036                 sprintf(dst + len, "%s.blktrace.%d", output_name, tip->cpu);
1037         else
1038                 sprintf(dst + len, "%s.blktrace.%d", buts_name, tip->cpu);
1039
1040         return 0;
1041 }
1042
1043 static void fill_ops(struct thread_information *tip)
1044 {
1045         /*
1046          * setup ops
1047          */
1048         if (net_mode == Net_client) {
1049                 if (net_use_sendfile) {
1050                         tip->get_subbuf = get_subbuf_sendfile;
1051                         tip->flush_subbuf = NULL;
1052                 } else {
1053                         tip->get_subbuf = get_subbuf;
1054                         tip->flush_subbuf = flush_subbuf_net;
1055                 }
1056         } else {
1057                 if (tip->ofile_mmap)
1058                         tip->get_subbuf = mmap_subbuf;
1059                 else
1060                         tip->get_subbuf = get_subbuf;
1061
1062                 tip->flush_subbuf = flush_subbuf_file;
1063         }
1064                         
1065         if (net_mode == Net_server)
1066                 tip->read_data = read_data_net;
1067         else
1068                 tip->read_data = read_data_file;
1069 }
1070
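/*
 * pick the output strategy for a trace thread: nothing to open for a
 * network client, line-buffered stdout for "-o -", otherwise an mmap'ed
 * per-cpu file; fill_ops() then installs the matching callbacks
 */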
1071 static int tip_open_output(struct device_information *dip,
1072                            struct thread_information *tip)
1073 {
1074         int pipeline = output_name && !strcmp(output_name, "-");
1075         int mode, vbuf_size;
1076         char op[128];
1077
1078         if (net_mode == Net_client) {
1079                 tip->ofile = NULL;
1080                 tip->ofile_stdout = 0;
1081                 tip->ofile_mmap = 0;
1082                 goto done;
1083         } else if (pipeline) {
1084                 tip->ofile = fdopen(STDOUT_FILENO, "w");
1085                 tip->ofile_stdout = 1;
1086                 tip->ofile_mmap = 0;
1087                 mode = _IOLBF;
1088                 vbuf_size = 512;
1089         } else {
1090                 if (fill_ofname(dip, tip, op, dip->buts_name))
1091                         return 1;
1092                 tip->ofile = fopen(op, "w+");
1093                 tip->ofile_stdout = 0;
1094                 tip->ofile_mmap = 1;
1095                 mode = _IOFBF;
1096                 vbuf_size = OFILE_BUF;
1097         }
1098
1099         if (tip->ofile == NULL) {
1100                 perror(op);
1101                 return 1;
1102         }
1103
1104         tip->ofile_buffer = malloc(vbuf_size);
1105         if (setvbuf(tip->ofile, tip->ofile_buffer, mode, vbuf_size)) {
1106                 perror("setvbuf");
1107                 close_thread(tip);
1108                 return 1;
1109         }
1110
1111 done:
1112         fill_ops(tip);
1113         return 0;
1114 }
1115
1116 static int start_threads(struct device_information *dip)
1117 {
1118         struct thread_information *tip;
1119         int j;
1120
1121         for_each_tip(dip, tip, j) {
1122                 tip->cpu = j;
1123                 tip->device = dip;
1124                 tip->events_processed = 0;
1125                 tip->fd = -1;
1126                 memset(&tip->fifo, 0, sizeof(tip->fifo));
1127                 tip->leftover_ts = NULL;
1128
1129                 if (tip_open_output(dip, tip))
1130                         return 1;
1131
1132                 if (pthread_create(&tip->thread, NULL, thread_main, tip)) {
1133                         perror("pthread_create");
1134                         close_thread(tip);
1135                         return 1;
1136                 }
1137         }
1138
1139         return 0;
1140 }
1141
1142 static void stop_threads(struct device_information *dip)
1143 {
1144         struct thread_information *tip;
1145         unsigned long ret;
1146         int i;
1147
1148         for_each_tip(dip, tip, i) {
1149                 (void) pthread_join(tip->thread, (void *) &ret);
1150                 close_thread(tip);
1151         }
1152 }
1153
1154 static void stop_all_threads(void)
1155 {
1156         struct device_information *dip;
1157         int i;
1158
1159         for_each_dip(dip, i)
1160                 stop_threads(dip);
1161 }
1162
1163 static void stop_all_tracing(void)
1164 {
1165         struct device_information *dip;
1166         int i;
1167
1168         for_each_dip(dip, i)
1169                 stop_trace(dip);
1170 }
1171
1172 static void exit_trace(int status)
1173 {
1174         if (!is_trace_stopped()) {
1175                 trace_stopped = 1;
1176                 stop_all_threads();
1177                 stop_all_tracing();
1178         }
1179
1180         exit(status);
1181 }
1182
1183 static int resize_devices(char *path)
1184 {
1185         int size = (ndevs + 1) * sizeof(struct device_information);
1186
1187         device_information = realloc(device_information, size);
1188         if (!device_information) {
1189                 fprintf(stderr, "Out of memory, device %s (%d)\n", path, size);
1190                 return 1;
1191         }
1192         device_information[ndevs].path = path;
1193         ndevs++;
1194         return 0;
1195 }
1196
1197 static int open_devices(void)
1198 {
1199         struct device_information *dip;
1200         int i;
1201
1202         for_each_dip(dip, i) {
1203                 dip->fd = open(dip->path, O_RDONLY | O_NONBLOCK);
1204                 if (dip->fd < 0) {
1205                         perror(dip->path);
1206                         return 1;
1207                 }
1208                 dip->buf_size = buf_size;
1209                 dip->buf_nr = buf_nr;
1210                 dip->page_size = page_size;
1211         }
1212
1213         return 0;
1214 }
1215
1216 static int start_devices(void)
1217 {
1218         struct device_information *dip;
1219         int i, j, size;
1220
1221         size = ncpus * sizeof(struct thread_information);
1222         thread_information = malloc(size * ndevs);
1223         if (!thread_information) {
1224                 fprintf(stderr, "Out of memory, threads (%d)\n", size * ndevs);
1225                 return 1;
1226         }
1227         memset(thread_information, 0, size * ndevs);
1228
1229         for_each_dip(dip, i) {
1230                 if (start_trace(dip)) {
1231                         close(dip->fd);
1232                         fprintf(stderr, "Failed to start trace on %s\n",
1233                                 dip->path);
1234                         break;
1235                 }
1236         }
1237
1238         if (i != ndevs) {
1239                 __for_each_dip(dip, device_information, i, j)
1240                         stop_trace(dip);
1241
1242                 return 1;
1243         }
1244
1245         for_each_dip(dip, i) {
1246                 dip->threads = thread_information + (i * ncpus);
1247                 if (start_threads(dip)) {
1248                         fprintf(stderr, "Failed to start worker threads\n");
1249                         break;
1250                 }
1251         }
1252
1253         if (i != ndevs) {
1254                 __for_each_dip(dip, device_information, i, j)
1255                         stop_threads(dip);
1256                 for_each_dip(dip, i)
1257                         stop_trace(dip);
1258
1259                 return 1;
1260         }
1261
1262         return 0;
1263 }
1264
1265 static void show_stats(struct device_information *dips, int ndips, int cpus)
1266 {
1267         struct device_information *dip;
1268         struct thread_information *tip;
1269         unsigned long long events_processed, data_read;
1270         unsigned long total_drops;
1271         int i, j, no_stdout = 0;
1272
1273         if (is_stat_shown())
1274                 return;
1275
1276         if (output_name && !strcmp(output_name, "-"))
1277                 no_stdout = 1;
1278
1279         stat_shown = 1;
1280
1281         total_drops = 0;
1282         __for_each_dip(dip, dips, ndips, i) {
1283                 if (!no_stdout)
1284                         printf("Device: %s\n", dip->path);
1285                 events_processed = 0;
1286                 data_read = 0;
1287                 __for_each_tip(dip, tip, cpus, j) {
1288                         if (!no_stdout)
1289                                 printf("  CPU%3d: %20lu events, %8llu KiB data\n",
1290                                         tip->cpu, tip->events_processed,
1291                                         (tip->data_read + 1023) >> 10);
1292                         events_processed += tip->events_processed;
1293                         data_read += tip->data_read;
1294                 }
1295                 total_drops += dip->drop_count;
1296                 if (!no_stdout)
1297                         printf("  Total:  %20llu events (dropped %lu), %8llu KiB data\n",
1298                                         events_processed, dip->drop_count,
1299                                         (data_read + 1023) >> 10);
1300         }
1301
1302         if (total_drops)
1303                 fprintf(stderr, "You have dropped events, consider using a larger buffer size (-b)\n");
1304 }
1305
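/*
 * find the server-side device state matching an incoming header's buts
 * name, or create it: a new entry inherits the client's buffer geometry
 * and connect time and gets its per-cpu output files opened right away
 */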
1306 static struct device_information *net_get_dip(struct net_connection *nc,
1307                                               struct blktrace_net_hdr *bnh)
1308 {
1309         struct device_information *dip, *cl_dip = NULL;
1310         struct cl_host *ch = nc->ch;
1311         int i;
1312
1313         for (i = 0; i < ch->ndevs; i++) {
1314                 dip = &ch->device_information[i];
1315
1316                 if (!strcmp(dip->buts_name, bnh->buts_name))
1317                         return dip;
1318
1319                 if (dip->cl_id == bnh->cl_id)
1320                         cl_dip = dip;
1321         }
1322
1323         ch->device_information = realloc(ch->device_information, (ch->ndevs + 1) * sizeof(*dip));
1324         dip = &ch->device_information[ch->ndevs];
1325         memset(dip, 0, sizeof(*dip));
1326         dip->fd = -1;
1327         dip->ch = ch;
1328         dip->cl_id = bnh->cl_id;
1329         dip->buf_size = bnh->buf_size;
1330         dip->buf_nr = bnh->buf_nr;
1331         dip->page_size = bnh->page_size;
1332
1333         if (cl_dip)
1334                 dip->cl_connect_time = cl_dip->cl_connect_time;
1335         else
1336                 dip->cl_connect_time = nc->connect_time;
1337         strcpy(dip->buts_name, bnh->buts_name);
1338         dip->path = strdup(bnh->buts_name);
1339         dip->trace_started = 1;
1340         ch->ndevs++;
1341         dip->threads = malloc(nc->ncpus * sizeof(struct thread_information));
1342         memset(dip->threads, 0, nc->ncpus * sizeof(struct thread_information));
1343
1344         /*
1345          * open all files
1346          */
1347         for (i = 0; i < nc->ncpus; i++) {
1348                 struct thread_information *tip = &dip->threads[i];
1349
1350                 tip->cpu = i;
1351                 tip->device = dip;
1352                 tip->fd = -1;
1353                 tip->nc = nc;
1354                 
1355                 if (tip_open_output(dip, tip))
1356                         return NULL;
1357
1358                 tip->nc = NULL;
1359         }
1360
1361         return dip;
1362 }
1363
1364 static struct thread_information *net_get_tip(struct net_connection *nc,
1365                                               struct blktrace_net_hdr *bnh)
1366 {
1367         struct device_information *dip;
1368         struct thread_information *tip;
1369
1370         dip = net_get_dip(nc, bnh);
1371         if (!dip || !dip->trace_started) {
1372                 fprintf(stderr, "Events for closed device %s\n", bnh->buts_name);
1373                 return NULL;
1374         }
1375
1376         tip = &dip->threads[bnh->cpu];
1377         if (!tip->nc)
1378                 tip->nc = nc;
1379         
1380         return tip;
1381 }
1382
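/*
 * receive a full blktrace_net_hdr, temporarily switching the socket to
 * non-blocking so the loop can notice is_done(); returns the number of
 * header bytes still missing, i.e. 0 on success
 */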
1383 static int net_get_header(struct net_connection *nc,
1384                           struct blktrace_net_hdr *bnh)
1385 {
1386         int fl = fcntl(nc->in_fd, F_GETFL);
1387         int bytes_left, ret;
1388         void *p = bnh;
1389
1390         fcntl(nc->in_fd, F_SETFL, fl | O_NONBLOCK);
1391         bytes_left = sizeof(*bnh);
1392         while (bytes_left && !is_done()) {
1393                 ret = recv(nc->in_fd, p, bytes_left, MSG_WAITALL);
1394                 if (ret < 0) {
1395                         if (errno != EAGAIN) {
1396                                 perror("recv header");
1397                                 return 1;
1398                         }
1399                         usleep(1000);
1400                         continue;
1401                 } else if (!ret) {
1402                         usleep(1000);
1403                         continue;
1404                 } else {
1405                         p += ret;
1406                         bytes_left -= ret;
1407                 }
1408         }
1409         fcntl(nc->in_fd, F_SETFL, fl & ~O_NONBLOCK);
1410         return bytes_left;
1411 }
1412
1413 /*
1414  * finalize a net client: truncate files, show stats, cleanup, etc
1415  */
1416 static void device_done(struct net_connection *nc, struct device_information *dip)
1417 {
1418         struct thread_information *tip;
1419         int i;
1420
1421         __for_each_tip(dip, tip, nc->ncpus, i)
1422                 tip_ftrunc_final(tip);
1423
1424         show_stats(dip, 1, nc->ncpus);
1425
1426         /*
1427          * cleanup for next run
1428          */
1429         __for_each_tip(dip, tip, nc->ncpus, i) {
1430                 if (tip->ofile)
1431                         fclose(tip->ofile);
1432         }
1433
1434         free(dip->threads);
1435         free(dip->path);
1436
1437         close(nc->in_fd);
1438         nc->in_fd = -1;
1439
1440         stat_shown = 0;
1441 }
1442
1443 static inline int in_addr_eq(struct in_addr a, struct in_addr b)
1444 {
1445         return a.s_addr == b.s_addr;
1446 }
1447
1448 static void net_add_client_host(struct cl_host *ch)
1449 {
1450         ch->list_next = cl_host_list;
1451         cl_host_list = ch;
1452         cl_hosts++;
1453 }
1454
1455 static void net_remove_client_host(struct cl_host *ch)
1456 {
1457         struct cl_host *p, *c;
1458         
1459         for (p = c = cl_host_list; c; c = c->list_next) {
1460                 if (c == ch) {
1461                         if (p == c)
1462                                 cl_host_list = c->list_next;
1463                         else
1464                                 p->list_next = c->list_next;
1465                         cl_hosts--;
1466                         return;
1467                 }
1468                 p = c;
1469         }
1470 }
1471
1472 static struct cl_host *net_find_client_host(struct in_addr cl_in_addr)
1473 {
1474         struct cl_host *ch = cl_host_list;
1475
1476         while (ch) {
1477                 if (in_addr_eq(ch->cl_in_addr, cl_in_addr))
1478                         return ch;
1479                 ch = ch->list_next;
1480         }
1481
1482         return NULL;
1483 }
1484
1485 static void net_client_host_done(struct cl_host *ch)
1486 {
1487         free(ch->device_information);
1488         free(ch->net_connections);
1489         net_connects -= ch->nconn;
1490         net_remove_client_host(ch);
1491         free(ch);
1492 }
1493
1494 /*
1495  * handle incoming events from a net client
1496  */
1497 static int net_client_data(struct net_connection *nc)
1498 {
1499         struct thread_information *tip;
1500         struct blktrace_net_hdr bnh;
1501
1502         if (net_get_header(nc, &bnh))
1503                 return 1;
1504
1505         if (data_is_native == -1 && check_data_endianness(bnh.magic)) {
1506                 fprintf(stderr, "server: received data is bad\n");
1507                 return 1;
1508         }
1509
1510         if (!data_is_native) {
1511                 bnh.magic = be32_to_cpu(bnh.magic);
1512                 bnh.cpu = be32_to_cpu(bnh.cpu);
1513                 bnh.max_cpus = be32_to_cpu(bnh.max_cpus);
1514                 bnh.len = be32_to_cpu(bnh.len);
1515                 bnh.cl_id = be32_to_cpu(bnh.cl_id);
1516                 bnh.buf_size = be32_to_cpu(bnh.buf_size);
1517                 bnh.buf_nr = be32_to_cpu(bnh.buf_nr);
1518                 bnh.page_size = be32_to_cpu(bnh.page_size);
1519         }
1520
1521         if ((bnh.magic & 0xffffff00) != BLK_IO_TRACE_MAGIC) {
1522                 fprintf(stderr, "server: bad data magic\n");
1523                 return 1;
1524         }
1525
1526         if (nc->ncpus == -1)
1527                 nc->ncpus = bnh.max_cpus;
1528
1529         /*
1530          * len == 0 means that the other end signalled end-of-run
1531          */
1532         if (!bnh.len) {
1533                 /*
1534                  * overload cpu count with dropped events
1535                  */
1536                 struct device_information *dip;
1537
1538                 dip = net_get_dip(nc, &bnh);
1539                 dip->drop_count = bnh.cpu;
1540                 dip->trace_started = 0;
1541
1542                 printf("server: end of run for %s\n", dip->buts_name);
1543
1544                 device_done(nc, dip);
1545
1546                 if (++nc->ch->ndevs_done == nc->ch->ndevs)
1547                         net_client_host_done(nc->ch);
1548
1549                 return 0;
1550         }
1551
1552         tip = net_get_tip(nc, &bnh);
1553         if (!tip)
1554                 return 1;
1555
1556         if (mmap_subbuf(tip, bnh.len))
1557                 return 1;
1558
1559         return 0;
1560 }
1561
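/*
 * accept a new client connection; connections from the same address are
 * grouped under one cl_host so a whole multi-device/multi-cpu run can be
 * finalized together
 */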
1562 static void net_add_connection(int listen_fd, struct sockaddr_in *addr)
1563 {
1564         socklen_t socklen = sizeof(*addr);
1565         struct net_connection *nc;
1566         struct cl_host *ch;
1567         int in_fd;
1568
1569         in_fd = accept(listen_fd, (struct sockaddr *) addr, &socklen);
1570         if (in_fd < 0) {
1571                 perror("accept");
1572                 return;
1573         }
1574
1575         ch = net_find_client_host(addr->sin_addr);
1576         if (!ch) {
1577                 if (cl_hosts == NET_MAX_CL_HOSTS) {
1578                         fprintf(stderr, "server: no more clients allowed\n");
1579                         return;
1580                 }
1581                 ch = malloc(sizeof(struct cl_host));
1582                 memset(ch, 0, sizeof(*ch));
1583                 ch->cl_in_addr = addr->sin_addr;
1584                 net_add_client_host(ch);
1585
1586                 printf("server: connection from %s\n", inet_ntoa(addr->sin_addr));
1587         }
1588
1589         ch->net_connections = realloc(ch->net_connections, (ch->nconn + 1) * sizeof(*nc));
1590         nc = &ch->net_connections[ch->nconn++];
1591         memset(nc, 0, sizeof(*nc));
1592
1593         time(&nc->connect_time);
1594         nc->ch = ch;
1595         nc->in_fd = in_fd;
1596         nc->ncpus = -1;
1597         net_connects++;
1598 }
1599
1600 /*
1601  * event driven loop, handle new incoming connections and data from
1602  * existing connections
1603  */
1604 static void net_server_handle_connections(int listen_fd,
1605                                           struct sockaddr_in *addr)
1606 {
1607         struct pollfd *pfds = NULL;
1608         struct net_connection **ncs = NULL;
1609         int max_connects = 0;
1610         int i, nconns, events;
1611         struct cl_host *ch;
1612         struct net_connection *nc;
1613         
1614         printf("server: waiting for connections...\n");
1615
1616         while (!is_done()) {
1617                 if (net_connects >= max_connects) {
1618                         pfds = realloc(pfds, (net_connects + 1) * sizeof(*pfds));
1619                         ncs = realloc(ncs, (net_connects + 1) * sizeof(*ncs));
1620                         max_connects = net_connects + 1;
1621                 }
1622                 /*
1623                  * the zero entry is for incoming connections, remaining
1624                  * entries for clients
1625                  */
1626                 pfds[0].fd = listen_fd;
1627                 pfds[0].events = POLLIN;
1628                 nconns = 0;
1629                 for_each_cl_host(ch) {
1630                         for (i = 0; i < ch->nconn; i++) {
1631                                 nc = &ch->net_connections[i];
1632                                 pfds[nconns + 1].fd = nc->in_fd;
1633                                 pfds[nconns + 1].events = POLLIN;
1634                                 ncs[nconns++] = nc;
1635                         }
1636                 }
1637
1638                 events = poll(pfds, 1 + nconns, -1);
1639                 if (events < 0) {
1640                         if (errno == EINTR)
1641                                 continue;
1642
1643                         perror("poll");
1644                         break;
1645                 } else if (!events)
1646                         continue;
1647
1648                 if (pfds[0].revents & POLLIN) {
1649                         net_add_connection(listen_fd, addr);
1650                         events--;
1651                 }
1652
1653                 for (i = 0; events && i < nconns; i++) {
1654                         if (pfds[i + 1].revents & POLLIN) {
1655                                 net_client_data(ncs[i]);
1656                                 events--;
1657                         }
1658                 }
1659         }
1660 }
1661
1662 /*
1663  * Start here when we are in server mode - just fetch data from the network
1664  * and dump to files
1665  */
1666 static int net_server(void)
1667 {
1668         struct sockaddr_in addr;
1669         int fd, opt;
1670
1671         fd = socket(AF_INET, SOCK_STREAM, 0);
1672         if (fd < 0) {
1673                 perror("server: socket");
1674                 return 1;
1675         }
1676
1677         opt = 1;
1678         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
1679                 perror("setsockopt");
1680                 return 1;
1681         }
1682
1683         memset(&addr, 0, sizeof(addr));
1684         addr.sin_family = AF_INET;
1685         addr.sin_addr.s_addr = htonl(INADDR_ANY);
1686         addr.sin_port = htons(net_port);
1687
1688         if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
1689                 perror("bind");
1690                 return 1;
1691         }
1692
1693         if (listen(fd, 1) < 0) {
1694                 perror("listen");
1695                 return 1;
1696         }
1697
1698         net_server_handle_connections(fd, &addr);
1699         return 0;
1700 }
1701
1702 /*
1703  * Setup outgoing network connection where we will transmit data
1704  */
1705 static int net_setup_client_cpu(int i, struct sockaddr_in *addr)
1706 {
1707         int fd;
1708
1709         fd = socket(AF_INET, SOCK_STREAM, 0);
1710         if (fd < 0) {
1711                 perror("client: socket");
1712                 return 1;
1713         }
1714
1715         if (connect(fd, (struct sockaddr *) addr, sizeof(*addr)) < 0) {
1716                 perror("client: connect");
1717                 return 1;
1718         }
1719
1720         net_out_fd[i] = fd;
1721         return 0;
1722 }
1723
1724 static int net_setup_client(void)
1725 {
1726         struct sockaddr_in addr;
1727         int i;
1728
1729         memset(&addr, 0, sizeof(addr));
1730         addr.sin_family = AF_INET;
1731         addr.sin_port = htons(net_port);
1732
1733         if (!inet_aton(hostname, &addr.sin_addr)) {
1734                 struct hostent *hent = gethostbyname(hostname);
1735                 if (!hent) {
1736                         perror("gethostbyname");
1737                         return 1;
1738                 }
1739
1740                 memcpy(&addr.sin_addr, hent->h_addr, 4);
1741                 strcpy(hostname, hent->h_name);
1742         }
1743
1744         printf("blktrace: connecting to %s\n", hostname);
1745
1746         net_out_fd = malloc(ncpus * sizeof(*net_out_fd));
1747         for (i = 0; i < ncpus; i++) {
1748                 if (net_setup_client_cpu(i, &addr))
1749                         return 1;
1750         }
1751
1752         printf("blktrace: connected!\n");
1753         
1754         return 0;
1755 }
1756
1757 static char usage_str[] = \
1758         "-d <dev> [ -r debugfs path ] [ -o <output> ] [ -k ] [ -w time ]\n" \
1759         "[ -a action ] [ -A action mask ] [ -I <devs file> ] [ -V ]\n\n" \
1760         "\t-d Use specified device. May also be given last after options\n" \
1761         "\t-r Path to mounted debugfs, defaults to /sys/kernel/debug\n" \
1762         "\t-o File(s) to send output to\n" \
1763         "\t-D Directory to prepend to output file names\n" \
1764         "\t-k Kill a running trace\n" \
1765         "\t-w Stop after defined time, in seconds\n" \
1766         "\t-a Only trace specified actions. See documentation\n" \
1767         "\t-A Give trace mask as a single value. See documentation\n" \
1768         "\t-b Sub buffer size in KiB\n" \
1769         "\t-n Number of sub buffers\n" \
1770         "\t-l Run in network listen mode (blktrace server)\n" \
1771         "\t-h Run in network client mode, connecting to the given host\n" \
1772         "\t-p Network port to use (default 8462)\n" \
1773         "\t-s Make the network client NOT use sendfile() to transfer data\n" \
1774         "\t-I Add devices found in <devs file>\n" \
1775         "\t-V Print program version info\n\n";
1776
1777 static void show_usage(char *program)
1778 {
1779         fprintf(stderr, "Usage: %s %s %s", program, blktrace_version, usage_str);
1780 }
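
     /*
      * Illustrative invocations (option behaviour is described in usage_str
      * above; "/dev/sda" and "server" are placeholders, not defaults):
      *
      *   blktrace -d /dev/sda                     trace until interrupted
      *   blktrace -d /dev/sda -w 30               stop after 30 seconds
      *   blktrace -l                              network server mode (port 8462)
      *   blktrace -d /dev/sda -h server -p 8462   ship data to a remote server
      */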
1781
1782 int main(int argc, char *argv[])
1783 {
1784         static char default_debugfs_path[] = "/sys/kernel/debug";
1785         struct statfs st;
1786         int i, c;
1787         int stop_watch = 0;
1788         int act_mask_tmp = 0;
1789
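             /*
              * Option parsing: -d and -I register devices right away via
              * resize_devices(); bare device names after the options are
              * picked up below through optind.
              */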
1790         while ((c = getopt_long(argc, argv, S_OPTS, l_opts, NULL)) >= 0) {
1791                 switch (c) {
1792                 case 'a':
1793                         i = find_mask_map(optarg);
1794                         if (i < 0) {
1795                                 fprintf(stderr, "Invalid action mask %s\n",
1796                                         optarg);
1797                                 return 1;
1798                         }
1799                         act_mask_tmp |= i;
1800                         break;
1801
1802                 case 'A':
1803                         if ((sscanf(optarg, "%x", &i) != 1) ||
1804                             !valid_act_opt(i)) {
1805                                 fprintf(stderr,
1806                                         "Invalid set action mask %s/0x%x\n",
1807                                         optarg, i);
1808                                 return 1;
1809                         }
1810                         act_mask_tmp = i;
1811                         break;
1812
1813                 case 'd':
1814                         if (resize_devices(optarg) != 0)
1815                                 return 1;
1816                         break;
1817
1818                 case 'I': {
1819                         char dev_line[256];
1820                         FILE *ifp = fopen(optarg, "r");
1821
1822                         if (!ifp) {
1823                                 fprintf(stderr,
1824                                         "Invalid file for devices %s\n",
1825                                         optarg);
1826                                 return 1;
1827                         }
1828
1829                         while (fscanf(ifp, "%255s\n", dev_line) == 1)
1830                                 if (resize_devices(strdup(dev_line)) != 0)
1831                                         return 1;
                             fclose(ifp);
1832                         break;
1833                 }
1834
1836                 case 'r':
1837                         debugfs_path = optarg;
1838                         break;
1839
1840                 case 'o':
1841                         output_name = optarg;
1842                         break;
1843                 case 'k':
1844                         kill_running_trace = 1;
1845                         break;
1846                 case 'w':
1847                         stop_watch = atoi(optarg);
1848                         if (stop_watch <= 0) {
1849                                 fprintf(stderr,
1850                                         "Invalid stopwatch value (%d secs)\n",
1851                                         stop_watch);
1852                                 return 1;
1853                         }
1854                         break;
1855                 case 'V':
1856                         printf("%s version %s\n", argv[0], blktrace_version);
1857                         return 0;
1858                 case 'b':
1859                         buf_size = strtoul(optarg, NULL, 10);
1860                         if (buf_size == 0 || buf_size > 16 * 1024) {
1861                                 fprintf(stderr,
1862                                         "Invalid buffer size (%lu)\n", buf_size);
1863                                 return 1;
1864                         }
1865                         buf_size <<= 10;
1866                         break;
1867                 case 'n':
1868                         buf_nr = strtoul(optarg, NULL, 10);
1869                         if (buf_nr == 0) {
1870                                 fprintf(stderr,
1871                                         "Invalid buffer nr (%lu)\n", buf_nr);
1872                                 return 1;
1873                         }
1874                         break;
1875                 case 'D':
1876                         output_dir = optarg;
1877                         break;
1878                 case 'h':
1879                         net_mode = Net_client;
1880                         snprintf(hostname, sizeof(hostname), "%s", optarg);
1881                         break;
1882                 case 'l':
1883                         net_mode = Net_server;
1884                         break;
1885                 case 'p':
1886                         net_port = atoi(optarg);
1887                         break;
1888                 case 's':
1889                         net_use_sendfile = 0;
1890                         break;
1891                 default:
1892                         show_usage(argv[0]);
1893                         return 1;
1894                 }
1895         }
1896
1897         setlocale(LC_NUMERIC, "en_US");
1898
1899         page_size = getpagesize();
1900
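             /*
              * In server mode blktrace only receives data over the network,
              * so local output naming and device setup are skipped and
              * net_server() takes over.
              */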
1901         if (net_mode == Net_server) {
1902                 if (output_name) {
1903                         fprintf(stderr, "-o ignored in server mode\n");
1904                         output_name = NULL;
1905                 }
1906
1907                 return net_server();
1908         }
1909
1910         while (optind < argc) {
1911                 if (resize_devices(argv[optind++]) != 0)
1912                         return 1;
1913         }
1914
1915         if (ndevs == 0) {
1916                 show_usage(argv[0]);
1917                 return 1;
1918         }
1919
1920         if (act_mask_tmp != 0)
1921                 act_mask = act_mask_tmp;
1922
1923         if (!debugfs_path)
1924                 debugfs_path = default_debugfs_path;
1925
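             /*
              * Sanity-check the debugfs mount point: statfs() must succeed and
              * report the debugfs magic (DEBUGFS_TYPE), since the kernel
              * exposes the blktrace data files under debugfs.
              */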
1926         if (statfs(debugfs_path, &st) < 0) {
1927                 perror("statfs");
1928                 fprintf(stderr, "%s does not appear to be a valid path\n",
1929                         debugfs_path);
1930                 return 1;
1931         } else if (st.f_type != (long) DEBUGFS_TYPE) {
1932                 fprintf(stderr, "%s does not appear to be a debug filesystem\n",
1933                         debugfs_path);
1934                 return 1;
1935         }
1936
1937         if (open_devices() != 0)
1938                 return 1;
1939
1940         if (kill_running_trace) {
1941                 stop_all_traces();
1942                 return 0;
1943         }
1944
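             /*
              * The online CPU count sizes the per-CPU state set up later,
              * e.g. one outgoing socket per CPU in net_setup_client().
              */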
1945         ncpus = sysconf(_SC_NPROCESSORS_ONLN);
1946         if (ncpus < 0) {
1947                 fprintf(stderr, "sysconf(_SC_NPROCESSORS_ONLN) failed\n");
1948                 return 1;
1949         }
1950
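             /*
              * INT/HUP/TERM and the -w stopwatch alarm all run handle_sigint()
              * so tracing is shut down cleanly; SIGPIPE is ignored so a dropped
              * network peer shows up as a write error instead of killing the
              * process.
              */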
1951         signal(SIGINT, handle_sigint);
1952         signal(SIGHUP, handle_sigint);
1953         signal(SIGTERM, handle_sigint);
1954         signal(SIGALRM, handle_sigint);
1955         signal(SIGPIPE, SIG_IGN);
1956
1957         if (net_mode == Net_client && net_setup_client())
1958                 return 1;
1959
1960         if (start_devices() != 0)
1961                 return 1;
1962
1963         atexit(stop_all_tracing);
1964
1965         if (stop_watch)
1966                 alarm(stop_watch);
1967
1968         wait_for_threads();
1969
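             /*
              * If a signal or the stopwatch did not already stop the trace,
              * shut the reader threads and the kernel-side traces down before
              * printing the final statistics.
              */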
1970         if (!is_trace_stopped()) {
1971                 trace_stopped = 1;
1972                 stop_all_threads();
1973                 stop_all_traces();
1974         }
1975
1976         show_stats(device_information, ndevs, ncpus);
1977
1978         return 0;
1979 }
1980