[blktrace.git] / blktrace.c
1 /*
2  * block queue tracing application
3  *
4  * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
5  * Copyright (C) 2006 Jens Axboe <axboe@kernel.dk>
6  *
7  *  This program is free software; you can redistribute it and/or modify
8  *  it under the terms of the GNU General Public License as published by
9  *  the Free Software Foundation; either version 2 of the License, or
10  *  (at your option) any later version.
11  *
12  *  This program is distributed in the hope that it will be useful,
13  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *  GNU General Public License for more details.
16  *
17  *  You should have received a copy of the GNU General Public License
18  *  along with this program; if not, write to the Free Software
19  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
20  *
21  */
22 #include <pthread.h>
23 #include <sys/types.h>
24 #include <sys/stat.h>
25 #include <unistd.h>
26 #include <locale.h>
27 #include <signal.h>
28 #include <fcntl.h>
29 #include <string.h>
30 #include <sys/ioctl.h>
31 #include <sys/param.h>
32 #include <sys/statfs.h>
33 #include <sys/poll.h>
34 #include <sys/mman.h>
35 #include <sys/socket.h>
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <sched.h>
39 #include <ctype.h>
40 #include <getopt.h>
41 #include <errno.h>
42 #include <netinet/in.h>
43 #include <arpa/inet.h>
44 #include <netdb.h>
45 #include <sys/sendfile.h>
46
47 #include "blktrace.h"
48 #include "barrier.h"
49
50 static char blktrace_version[] = "1.0.0";
51
52 /*
53  * You may want to increase this even more, if you are logging at a high
54  * rate and see skipped/missed events
55  */
56 #define BUF_SIZE        (512 * 1024)
57 #define BUF_NR          (4)
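/*
 * These defaults correspond to the -b (sub buffer size, given in KiB per
 * the usage text) and -n (number of sub buffers) options; the kernel side
 * allocates buf_nr relay sub-buffers of buf_size bytes per CPU for each
 * traced device. As an illustrative example (device name made up):
 *
 *     blktrace -d /dev/sda -b 1024 -n 8
 *
 * would use eight 1 MiB sub-buffers instead of the default four 512 KiB ones.
 */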
58
59 #define OFILE_BUF       (128 * 1024)
60
61 #define DEBUGFS_TYPE    0x64626720
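/*
 * 0x64626720 is the debugfs superblock magic ("dbg " in ASCII, DEBUGFS_MAGIC
 * in the kernel headers); further down, main() compares it against the
 * statfs() f_type of the -r path to verify it really is a mounted debugfs.
 */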
62
63 #define S_OPTS  "d:a:A:r:o:kw:vVb:n:D:lh:p:sI:"
64 static struct option l_opts[] = {
65         {
66                 .name = "dev",
67                 .has_arg = required_argument,
68                 .flag = NULL,
69                 .val = 'd'
70         },
71         {
72                 .name = "input-devs",
73                 .has_arg = required_argument,
74                 .flag = NULL,
75                 .val = 'I'
76         },
77         {
78                 .name = "act-mask",
79                 .has_arg = required_argument,
80                 .flag = NULL,
81                 .val = 'a'
82         },
83         {
84                 .name = "set-mask",
85                 .has_arg = required_argument,
86                 .flag = NULL,
87                 .val = 'A'
88         },
89         {
90                 .name = "relay",
91                 .has_arg = required_argument,
92                 .flag = NULL,
93                 .val = 'r'
94         },
95         {
96                 .name = "output",
97                 .has_arg = required_argument,
98                 .flag = NULL,
99                 .val = 'o'
100         },
101         {
102                 .name = "kill",
103                 .has_arg = no_argument,
104                 .flag = NULL,
105                 .val = 'k'
106         },
107         {
108                 .name = "stopwatch",
109                 .has_arg = required_argument,
110                 .flag = NULL,
111                 .val = 'w'
112         },
113         {
114                 .name = "version",
115                 .has_arg = no_argument,
116                 .flag = NULL,
117                 .val = 'v'
118         },
119         {
120                 .name = "version",
121                 .has_arg = no_argument,
122                 .flag = NULL,
123                 .val = 'V'
124         },
125         {
126                 .name = "buffer-size",
127                 .has_arg = required_argument,
128                 .flag = NULL,
129                 .val = 'b'
130         },
131         {
132                 .name = "num-sub-buffers",
133                 .has_arg = required_argument,
134                 .flag = NULL,
135                 .val = 'n'
136         },
137         {
138                 .name = "output-dir",
139                 .has_arg = required_argument,
140                 .flag = NULL,
141                 .val = 'D'
142         },
143         {
144                 .name = "listen",
145                 .has_arg = no_argument,
146                 .flag = NULL,
147                 .val = 'l'
148         },
149         {
150                 .name = "host",
151                 .has_arg = required_argument,
152                 .flag = NULL,
153                 .val = 'h'
154         },
155         {
156                 .name = "port",
157                 .has_arg = required_argument,
158                 .flag = NULL,
159                 .val = 'p'
160         },
161         {
162                 .name = "no-sendfile",
163                 .has_arg = no_argument,
164                 .flag = NULL,
165                 .val = 's'
166         },
167         {
168                 .name = NULL,
169         }
170 };
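/*
 * Every short option in S_OPTS has a long form in the table above, so e.g.
 * (paths purely illustrative)
 *
 *     blktrace --dev=/dev/sda --output-dir=/var/tmp --buffer-size=1024
 *
 * is equivalent to "blktrace -d /dev/sda -D /var/tmp -b 1024".
 */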
171
172 struct tip_subbuf {
173         void *buf;
174         unsigned int len;
175         unsigned int max_len;
176 };
177
178 #define FIFO_SIZE       (1024)  /* should be plenty big! */
179 #define CL_SIZE         (128)   /* cache line, any bigger? */
180
181 struct tip_subbuf_fifo {
182         int tail __attribute__((aligned(CL_SIZE)));
183         int head __attribute__((aligned(CL_SIZE)));
184         struct tip_subbuf *q[FIFO_SIZE];
185 };
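/*
 * Single-producer/single-consumer ring: the per-CPU tracer thread only
 * advances the tail, the writeout path only advances the head, and each
 * index sits on its own cache line (CL_SIZE alignment) to avoid false
 * sharing. FIFO_SIZE is a power of two, so wrap-around is a simple mask
 * with (FIFO_SIZE - 1).
 */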
186
187 struct thread_information {
188         int cpu;
189         pthread_t thread;
190
191         int fd;
192         void *fd_buf;
193         char fn[MAXPATHLEN + 64];
194
195         FILE *ofile;
196         char *ofile_buffer;
197         off_t ofile_offset;
198         int ofile_stdout;
199         int ofile_mmap;
200
201         int (*get_subbuf)(struct thread_information *, unsigned int);
202         int (*flush_subbuf)(struct thread_information *, struct tip_subbuf *);
203         int (*read_data)(struct thread_information *, void *, unsigned int);
204
205         unsigned long events_processed;
206         unsigned long long data_read;
207         unsigned long long data_queued;
208         struct device_information *device;
209
210         int exited;
211
212         /*
213          * piped fifo buffers
214          */
215         struct tip_subbuf_fifo fifo;
216         struct tip_subbuf *leftover_ts;
217
218         /*
219          * mmap controlled output files
220          */
221         unsigned long long fs_size;
222         unsigned long long fs_max_size;
223         unsigned long fs_off;
224         void *fs_buf;
225         unsigned long fs_buf_len;
226
227         struct net_connection *nc;
228 };
229
230 struct device_information {
231         int fd;
232         char *path;
233         char buts_name[32];
234         volatile int trace_started;
235         unsigned long drop_count;
236         struct thread_information *threads;
237         unsigned long buf_size;
238         unsigned long buf_nr;
239         unsigned int page_size;
240
241         struct cl_host *ch;
242         u32 cl_id;
243         time_t cl_connect_time;
244 };
245
246 static int ncpus;
247 static struct thread_information *thread_information;
248 static int ndevs;
249 static struct device_information *device_information;
250
251 /* command line option globals */
252 static char *debugfs_path;
253 static char *output_name;
254 static char *output_dir;
255 static int act_mask = ~0U;
256 static int kill_running_trace;
257 static unsigned long buf_size = BUF_SIZE;
258 static unsigned long buf_nr = BUF_NR;
259 static unsigned int page_size;
260
261 #define is_done()       (*(volatile int *)(&done))
262 static volatile int done;
263
264 #define is_trace_stopped()      (*(volatile int *)(&trace_stopped))
265 static volatile int trace_stopped;
266
267 #define is_stat_shown() (*(volatile int *)(&stat_shown))
268 static volatile int stat_shown;
269
270 int data_is_native = -1;
271
272 static void exit_trace(int status);
273
274 #define dip_tracing(dip)        (*(volatile int *)(&(dip)->trace_started))
275 #define dip_set_tracing(dip, v) ((dip)->trace_started = (v))
276
277 #define __for_each_dip(__d, __di, __e, __i)     \
278         for (__i = 0, __d = __di; __i < __e; __i++, __d++)
279
280 #define for_each_dip(__d, __i)          \
281         __for_each_dip(__d, device_information, ndevs, __i)
282 #define for_each_nc_dip(__nc, __d, __i)         \
283         __for_each_dip(__d, (__nc)->ch->device_information, (__nc)->ch->ndevs, __i)
284
285 #define __for_each_tip(__d, __t, __ncpus, __j)  \
286         for (__j = 0, __t = (__d)->threads; __j < __ncpus; __j++, __t++)
287 #define for_each_tip(__d, __t, __j)     \
288         __for_each_tip(__d, __t, ncpus, __j)
289 #define for_each_cl_host(__c)   \
290         for (__c = cl_host_list; __c; __c = __c->list_next)
291
292 /*
293  * networking stuff follows. we include a magic number so we know whether
294  * we need to do endianness conversion or not
295  */
296 struct blktrace_net_hdr {
297         u32 magic;              /* same as trace magic */
298         char buts_name[32];     /* trace name */
299         u32 cpu;                /* for which cpu */
300         u32 max_cpus;
301         u32 len;                /* length of following trace data */
302         u32 cl_id;              /* id for set of client per-cpu connections */
303         u32 buf_size;           /* client buf_size for this trace  */
304         u32 buf_nr;             /* client buf_nr for this trace  */
305         u32 page_size;          /* client page_size for this trace  */
306 };
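/*
 * Wire framing: every chunk of trace data sent to the server is preceded by
 * one of these headers. A header with len == 0 marks end-of-run for that
 * device, and in that case the cpu field is reused to carry the dropped
 * event count (see net_client_send_close() and net_client_data()). The
 * magic field doubles as an endianness probe on the receiving side.
 */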
307
308 #define TRACE_NET_PORT          (8462)
309
310 enum {
311         Net_none = 0,
312         Net_server,
313         Net_client,
314 };
315
316 /*
317  * network cmd line params
318  */
319 static char hostname[MAXHOSTNAMELEN];
320 static int net_port = TRACE_NET_PORT;
321 static int net_mode = 0;
322 static int net_use_sendfile = 1;
323
324 struct cl_host {
325         struct cl_host *list_next;
326         struct in_addr cl_in_addr;
327         struct net_connection *net_connections;
328         int nconn;
329         struct device_information *device_information;
330         int ndevs;
331         int ncpus;
332         int ndevs_done;
333 };
334
335 struct net_connection {
336         int in_fd;
337         struct pollfd pfd;
338         time_t connect_time;
339         struct cl_host *ch;
340         int ncpus;
341 };
342
343 #define NET_MAX_CL_HOSTS        (1024)
344 static struct cl_host *cl_host_list;
345 static int cl_hosts;
346 static int net_connects;
347
348 static int *net_out_fd;
349
350 static void handle_sigint(__attribute__((__unused__)) int sig)
351 {
352         struct device_information *dip;
353         int i;
354
355         /*
356          * stop trace so we can reap currently produced data
357          */
358         for_each_dip(dip, i) {
359                 if (dip->fd == -1)
360                         continue;
361                 if (ioctl(dip->fd, BLKTRACESTOP) < 0)
362                         perror("BLKTRACESTOP");
363         }
364
365         done = 1;
366 }
367
368 static int get_dropped_count(const char *buts_name)
369 {
370         int fd;
371         char tmp[MAXPATHLEN + 64];
372
373         snprintf(tmp, sizeof(tmp), "%s/block/%s/dropped",
374                  debugfs_path, buts_name);
375
376         fd = open(tmp, O_RDONLY);
377         if (fd < 0) {
378                 /*
379                  * this may be ok, if the kernel doesn't support dropped counts
380                  */
381                 if (errno == ENOENT)
382                         return 0;
383
384                 fprintf(stderr, "Couldn't open dropped file %s\n", tmp);
385                 return -1;
386         }
387
388         if (read(fd, tmp, sizeof(tmp)) < 0) {
389                 perror(tmp);
390                 close(fd);
391                 return -1;
392         }
393
394         close(fd);
395
396         return atoi(tmp);
397 }
398
399 static int start_trace(struct device_information *dip)
400 {
401         struct blk_user_trace_setup buts;
402
403         memset(&buts, 0, sizeof(buts));
404         buts.buf_size = dip->buf_size;
405         buts.buf_nr = dip->buf_nr;
406         buts.act_mask = act_mask;
407
408         if (ioctl(dip->fd, BLKTRACESETUP, &buts) < 0) {
409                 perror("BLKTRACESETUP");
410                 return 1;
411         }
412
413         if (ioctl(dip->fd, BLKTRACESTART) < 0) {
414                 perror("BLKTRACESTART");
415                 return 1;
416         }
417
418         memcpy(dip->buts_name, buts.name, sizeof(dip->buts_name));
419         dip_set_tracing(dip, 1);
420         return 0;
421 }
422
423 static void stop_trace(struct device_information *dip)
424 {
425         if (dip_tracing(dip) || kill_running_trace) {
426                 dip_set_tracing(dip, 0);
427
428                 /*
429                  * should be stopped, just don't complain if it isn't
430                  */
431                 ioctl(dip->fd, BLKTRACESTOP);
432
433                 if (ioctl(dip->fd, BLKTRACETEARDOWN) < 0)
434                         perror("BLKTRACETEARDOWN");
435
436                 close(dip->fd);
437                 dip->fd = -1;
438         }
439 }
440
441 static void stop_all_traces(void)
442 {
443         struct device_information *dip;
444         int i;
445
446         for_each_dip(dip, i) {
447                 dip->drop_count = get_dropped_count(dip->buts_name);
448                 stop_trace(dip);
449         }
450 }
451
452 static void wait_for_data(struct thread_information *tip, int timeout)
453 {
454         struct pollfd pfd = { .fd = tip->fd, .events = POLLIN };
455
456         while (!is_done()) {
457                 if (poll(&pfd, 1, timeout) < 0) {
458                         perror("poll");
459                         break;
460                 }
461                 if (pfd.revents & POLLIN)
462                         break;
463                 if (tip->ofile_stdout)
464                         break;
465         }
466 }
467
468 static int read_data_file(struct thread_information *tip, void *buf,
469                           unsigned int len)
470 {
471         int ret = 0;
472
473         do {
474                 wait_for_data(tip, 100);
475
476                 ret = read(tip->fd, buf, len);
477                 if (!ret)
478                         continue;
479                 else if (ret > 0)
480                         return ret;
481                 else {
482                         if (errno != EAGAIN) {
483                                 perror(tip->fn);
484                                 fprintf(stderr,"Thread %d failed read of %s\n",
485                                         tip->cpu, tip->fn);
486                                 break;
487                         }
488                         continue;
489                 }
490         } while (!is_done());
491
492         return ret;
493
494 }
495
496 static int read_data_net(struct thread_information *tip, void *buf,
497                          unsigned int len)
498 {
499         struct net_connection *nc = tip->nc;
500         unsigned int bytes_left = len;
501         int ret = 0;
502
503         do {
504                 ret = recv(nc->in_fd, buf, bytes_left, MSG_WAITALL);
505
506                 if (!ret)
507                         continue;
508                 else if (ret < 0) {
509                         if (errno != EAGAIN) {
510                                 perror(tip->fn);
511                                 fprintf(stderr, "server: failed read\n");
512                                 return 0;
513                         }
514                         continue;
515                 } else {
516                         buf += ret;
517                         bytes_left -= ret;
518                 }
519         } while (!is_done() && bytes_left);
520
521         return len - bytes_left;
522 }
523
524 static inline struct tip_subbuf *
525 subbuf_fifo_dequeue(struct thread_information *tip)
526 {
527         const int head = tip->fifo.head;
528         const int next = (head + 1) & (FIFO_SIZE - 1);
529
530         if (head != tip->fifo.tail) {
531                 struct tip_subbuf *ts = tip->fifo.q[head];
532
533                 store_barrier();
534                 tip->fifo.head = next;
535                 return ts;
536         }
537
538         return NULL;
539 }
540
541 static inline int subbuf_fifo_queue(struct thread_information *tip,
542                                     struct tip_subbuf *ts)
543 {
544         const int tail = tip->fifo.tail;
545         const int next = (tail + 1) & (FIFO_SIZE - 1);
546
547         if (next != tip->fifo.head) {
548                 tip->fifo.q[tail] = ts;
549                 store_barrier();
550                 tip->fifo.tail = next;
551                 return 0;
552         }
553
554         fprintf(stderr, "fifo too small!\n");
555         return 1;
556 }
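/*
 * Together these two helpers form a lock-free handoff: the tracer thread
 * enqueues filled sub-buffers in get_subbuf(), the main thread drains them
 * in write_tip_events(). store_barrier() (from barrier.h, presumably a
 * store fence) orders the slot store before the index update, so the
 * consumer never sees an index pointing at a not-yet-written slot.
 */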
557
558 /*
559  * For file output, truncate and mmap the file appropriately
560  */
561 static int mmap_subbuf(struct thread_information *tip, unsigned int maxlen)
562 {
563         int ofd = fileno(tip->ofile);
564         int ret;
565         unsigned long nr;
566
567         /*
568          * extend file, if we have to. use chunks of 16 subbuffers.
569          */
570         if (tip->fs_off + maxlen > tip->fs_buf_len) {
571                 if (tip->fs_buf) {
572                         munlock(tip->fs_buf, tip->fs_buf_len);
573                         munmap(tip->fs_buf, tip->fs_buf_len);
574                         tip->fs_buf = NULL;
575                 }
576
577                 tip->fs_off = tip->fs_size & (tip->device->page_size - 1);
578                 nr = max(16, tip->device->buf_nr);
579                 tip->fs_buf_len = (nr * tip->device->buf_size) - tip->fs_off;
580                 tip->fs_max_size += tip->fs_buf_len;
581
582                 if (ftruncate(ofd, tip->fs_max_size) < 0) {
583                         perror("ftruncate");
584                         return -1;
585                 }
586
587                 tip->fs_buf = mmap(NULL, tip->fs_buf_len, PROT_WRITE,
588                                    MAP_SHARED, ofd, tip->fs_size - tip->fs_off);
589                 if (tip->fs_buf == MAP_FAILED) {
590                         perror("mmap");
591                         return -1;
592                 }
593                 mlock(tip->fs_buf, tip->fs_buf_len);
594         }
595
596         ret = tip->read_data(tip, tip->fs_buf + tip->fs_off, maxlen);
597         if (ret >= 0) {
598                 tip->data_read += ret;
599                 tip->fs_size += ret;
600                 tip->fs_off += ret;
601                 return 0;
602         }
603
604         return -1;
605 }
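/*
 * In other words: the output file is grown with ftruncate() in chunks of
 * max(16, buf_nr) sub-buffers, the new region is mmap()ed at a page-aligned
 * offset (fs_off keeps the misalignment within the page), and read_data()
 * then copies trace data straight into the mapping. tip_ftrunc_final()
 * later trims the file back down to the bytes actually written.
 */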
606
607 /*
608  * Use the copy approach for pipes and network
609  */
610 static int get_subbuf(struct thread_information *tip, unsigned int maxlen)
611 {
612         struct tip_subbuf *ts = malloc(sizeof(*ts));
613         int ret;
614
615         ts->buf = malloc(tip->device->buf_size);
616         ts->max_len = maxlen;
617
618         ret = tip->read_data(tip, ts->buf, ts->max_len);
619         if (ret > 0) {
620                 ts->len = ret;
621                 tip->data_read += ret;
622                 if (subbuf_fifo_queue(tip, ts))
623                         ret = -1;
624         }
625
626         if (ret <= 0) {
627                 free(ts->buf);
628                 free(ts);
629         }
630
631         return ret;
632 }
633
634 static void close_thread(struct thread_information *tip)
635 {
636         if (tip->fd != -1)
637                 close(tip->fd);
638         if (tip->ofile)
639                 fclose(tip->ofile);
640         if (tip->ofile_buffer)
641                 free(tip->ofile_buffer);
642         if (tip->fd_buf)
643                 free(tip->fd_buf);
644
645         tip->fd = -1;
646         tip->ofile = NULL;
647         tip->ofile_buffer = NULL;
648         tip->fd_buf = NULL;
649 }
650
651 static void tip_ftrunc_final(struct thread_information *tip)
652 {
653         /*
654          * truncate to right size and cleanup mmap
655          */
656         if (tip->ofile_mmap && tip->ofile) {
657                 int ofd = fileno(tip->ofile);
658
659                 if (tip->fs_buf)
660                         munmap(tip->fs_buf, tip->fs_buf_len);
661
662                 ftruncate(ofd, tip->fs_size);
663         }
664 }
665
666 static void *thread_main(void *arg)
667 {
668         struct thread_information *tip = arg;
669         cpu_set_t cpu_mask;
670
671         CPU_ZERO(&cpu_mask);
672         CPU_SET((tip->cpu), &cpu_mask);
673
674         /* pid 0 means the calling thread: pin this tracer thread to its CPU */
675         if (sched_setaffinity(0, sizeof(cpu_mask), &cpu_mask) == -1) {
676                 perror("sched_setaffinity");
677                 exit_trace(1);
678         }
679
680         snprintf(tip->fn, sizeof(tip->fn), "%s/block/%s/trace%d",
681                         debugfs_path, tip->device->buts_name, tip->cpu);
682         tip->fd = open(tip->fn, O_RDONLY);
683         if (tip->fd < 0) {
684                 perror(tip->fn);
685                 fprintf(stderr,"Thread %d failed open of %s\n", tip->cpu,
686                         tip->fn);
687                 exit_trace(1);
688         }
689
690         while (!is_done()) {
691                 if (tip->get_subbuf(tip, tip->device->buf_size) < 0)
692                         break;
693         }
694
695         /*
696          * trace is stopped, pull data until we get a short read
697          */
698         while (tip->get_subbuf(tip, tip->device->buf_size) > 0)
699                 ;
700
701         tip_ftrunc_final(tip);
702         tip->exited = 1;
703         return NULL;
704 }
705
706 static int write_data_net(int fd, void *buf, unsigned int buf_len)
707 {
708         unsigned int bytes_left = buf_len;
709         int ret;
710
711         while (bytes_left) {
712                 ret = send(fd, buf, bytes_left, 0);
713                 if (ret < 0) {
714                         perror("send");
715                         return 1;
716                 }
717
718                 buf += ret;
719                 bytes_left -= ret;
720         }
721
722         return 0;
723 }
724
725 static int net_send_header(struct thread_information *tip, unsigned int len)
726 {
727         struct blktrace_net_hdr hdr;
728
729         hdr.magic = BLK_IO_TRACE_MAGIC;
730         strcpy(hdr.buts_name, tip->device->buts_name);
731         hdr.cpu = tip->cpu;
732         hdr.max_cpus = ncpus;
733         hdr.len = len;
734         hdr.cl_id = getpid();
735         hdr.buf_size = tip->device->buf_size;
736         hdr.buf_nr = tip->device->buf_nr;
737         hdr.page_size = tip->device->page_size;
738         
739         return write_data_net(net_out_fd[tip->cpu], &hdr, sizeof(hdr));
740 }
741
742 /*
743  * send header with 0 length to signal end-of-run
744  */
745 static void net_client_send_close(void)
746 {
747         struct device_information *dip;
748         struct blktrace_net_hdr hdr;
749         int i;
750
751         for_each_dip(dip, i) {
752                 hdr.magic = BLK_IO_TRACE_MAGIC;
753                 hdr.max_cpus = ncpus;
754                 hdr.len = 0;
755                 strcpy(hdr.buts_name, dip->buts_name);
756                 hdr.cpu = get_dropped_count(dip->buts_name);
757                 hdr.cl_id = getpid();
758                 hdr.buf_size = dip->buf_size;
759                 hdr.buf_nr = dip->buf_nr;
760                 hdr.page_size = dip->page_size;
761
762                 write_data_net(net_out_fd[0], &hdr, sizeof(hdr));
763         }
764
765 }
766
767 static int flush_subbuf_net(struct thread_information *tip,
768                             struct tip_subbuf *ts)
769 {
770         if (net_send_header(tip, ts->len))
771                 return -1;
772         if (write_data_net(net_out_fd[tip->cpu], ts->buf, ts->len))
773                 return -1;
774
775         free(ts->buf);
776         free(ts);
777         return 1;
778 }
779
780 static int net_sendfile(struct thread_information *tip, struct tip_subbuf *ts)
781 {
782         int ret = sendfile(net_out_fd[tip->cpu], tip->fd, NULL, ts->len);
783
784         if (ret < 0) {
785                 perror("sendfile");
786                 return 1;
787         } else if (ret < (int) ts->len) {
788                 fprintf(stderr, "short sendfile send (%d of %d)\n", ret, ts->len);
789                 return 1;
790         }
791
792         return 0;
793 }
794
795 static int flush_subbuf_sendfile(struct thread_information *tip,
796                                  struct tip_subbuf *ts)
797 {
798         int ret = -1;
799
800         if (net_send_header(tip, ts->len))
801                 goto err;
802         if (net_sendfile(tip, ts))
803                 goto err;
804
805         tip->data_read += ts->len;
806         ret = 1;
807 err:
808         free(ts);
809         return ret;
810 }
811
812 static int get_subbuf_sendfile(struct thread_information *tip,
813                                __attribute__((__unused__)) unsigned int maxlen)
814 {
815         struct tip_subbuf *ts;
816         struct stat sb;
817         unsigned int ready;
818
819         wait_for_data(tip, -1);
820
821         if (fstat(tip->fd, &sb) < 0) {
822                 perror("trace stat");
823                 return -1;
824         }
825
826         ready = sb.st_size - tip->data_queued;
827         if (!ready) {
828                 usleep(1000);
829                 return 0;
830         }
831
832         ts = malloc(sizeof(*ts));
833         ts->buf = NULL;
834         ts->max_len = 0;
835         ts->len = ready;
836         tip->data_queued += ready;
837
838         if (flush_subbuf_sendfile(tip, ts) < 0)
839                 return -1;
840
841         return ready;
842 }
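/*
 * Zero-copy client path: fstat() on the relay file tells us how many bytes
 * beyond data_queued are ready, and flush_subbuf_sendfile() then pushes
 * them with sendfile() directly from the relay fd to the socket. ts->buf
 * stays NULL here; the tip_subbuf only carries the byte count.
 */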
843
844 static int write_data(struct thread_information *tip, void *buf,
845                       unsigned int buf_len)
846 {
847         int ret;
848
849         if (!buf_len)
850                 return 0;
851
852         ret = fwrite(buf, buf_len, 1, tip->ofile);
853         if (ferror(tip->ofile) || ret != 1) {
854                 perror("fwrite");
855                 clearerr(tip->ofile);
856                 return 1;
857         }
858
859         if (tip->ofile_stdout)
860                 fflush(tip->ofile);
861
862         return 0;
863 }
864
865 static int flush_subbuf_file(struct thread_information *tip,
866                              struct tip_subbuf *ts)
867 {
868         unsigned int offset = 0;
869         struct blk_io_trace *t;
870         int pdu_len, events = 0;
871
872         /*
873          * surplus from last run
874          */
875         if (tip->leftover_ts) {
876                 struct tip_subbuf *prev_ts = tip->leftover_ts;
877
878                 if (prev_ts->len + ts->len > prev_ts->max_len) {
879                         prev_ts->max_len += ts->len;
880                         prev_ts->buf = realloc(prev_ts->buf, prev_ts->max_len);
881                 }
882
883                 memcpy(prev_ts->buf + prev_ts->len, ts->buf, ts->len);
884                 prev_ts->len += ts->len;
885
886                 free(ts->buf);
887                 free(ts);
888
889                 ts = prev_ts;
890                 tip->leftover_ts = NULL;
891         }
892
893         while (offset + sizeof(*t) <= ts->len) {
894                 t = ts->buf + offset;
895
896                 if (verify_trace(t)) {
897                         write_data(tip, ts->buf, offset);
898                         return -1;
899                 }
900
901                 pdu_len = t->pdu_len;
902
903                 if (offset + sizeof(*t) + pdu_len > ts->len)
904                         break;
905
906                 offset += sizeof(*t) + pdu_len;
907                 tip->events_processed++;
908                 tip->data_read += sizeof(*t) + pdu_len;
909                 events++;
910         }
911
912         if (write_data(tip, ts->buf, offset))
913                 return -1;
914
915         /*
916          * leftover bytes, save them for next time
917          */
918         if (offset != ts->len) {
919                 tip->leftover_ts = ts;
920                 ts->len -= offset;
921                 memmove(ts->buf, ts->buf + offset, ts->len);
922         } else {
923                 free(ts->buf);
924                 free(ts);
925         }
926
927         return events;
928 }
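/*
 * Events are variable length: a struct blk_io_trace followed by pdu_len
 * bytes of payload. Only whole events are written out; a partial event at
 * the end of a sub-buffer is kept in leftover_ts and glued onto the front
 * of the next one.
 */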
929
930 static int write_tip_events(struct thread_information *tip)
931 {
932         struct tip_subbuf *ts = subbuf_fifo_dequeue(tip);
933
934         if (ts)
935                 return tip->flush_subbuf(tip, ts);
936
937         return 0;
938 }
939
940 /*
941  * scans the tips we know and writes out the subbuffers we accumulate
942  */
943 static void get_and_write_events(void)
944 {
945         struct device_information *dip;
946         struct thread_information *tip;
947         int i, j, events, ret, tips_running;
948
949         while (!is_done()) {
950                 events = 0;
951
952                 for_each_dip(dip, i) {
953                         for_each_tip(dip, tip, j) {
954                                 ret = write_tip_events(tip);
955                                 if (ret > 0)
956                                         events += ret;
957                         }
958                 }
959
960                 if (!events)
961                         usleep(100000);
962         }
963
964         /*
965          * reap stored events
966          */
967         do {
968                 events = 0;
969                 tips_running = 0;
970                 for_each_dip(dip, i) {
971                         for_each_tip(dip, tip, j) {
972                                 ret = write_tip_events(tip);
973                                 if (ret > 0)
974                                         events += ret;
975                                 tips_running += !tip->exited;
976                         }
977                 }
978                 usleep(10);
979         } while (events || tips_running);
980 }
981
982 static void wait_for_threads(void)
983 {
984         /*
985          * for piped or network output, poll and fetch data for writeout.
986          * for files, we just wait around for trace threads to exit
987          */
988         if ((output_name && !strcmp(output_name, "-")) ||
989             ((net_mode == Net_client) && !net_use_sendfile))
990                 get_and_write_events();
991         else {
992                 struct device_information *dip;
993                 struct thread_information *tip;
994                 int i, j, tips_running;
995
996                 do {
997                         tips_running = 0;
998                         usleep(100000);
999
1000                         for_each_dip(dip, i)
1001                                 for_each_tip(dip, tip, j)
1002                                         tips_running += !tip->exited;
1003                 } while (tips_running);
1004         }
1005
1006         if (net_mode == Net_client)
1007                 net_client_send_close();
1008 }
1009
1010 static int fill_ofname(struct device_information *dip,
1011                        struct thread_information *tip, char *dst,
1012                        char *buts_name)
1013 {
1014         struct stat sb;
1015         int len = 0;
1016
1017         if (output_dir)
1018                 len = sprintf(dst, "%s/", output_dir);
1019         else
1020                 len = sprintf(dst, "./");
1021
1022         if (net_mode == Net_server) {
1023                 struct net_connection *nc = tip->nc;
1024
1025                 len += sprintf(dst + len, "%s-", inet_ntoa(nc->ch->cl_in_addr));
1026                 len += strftime(dst + len, 64, "%F-%T/", gmtime(&dip->cl_connect_time));
1027         }
1028
1029         if (stat(dst, &sb) < 0) {
1030                 if (errno != ENOENT) {
1031                         perror("stat");
1032                         return 1;
1033                 }
1034                 if (mkdir(dst, 0755) < 0) {
1035                         perror(dst);
1036                         fprintf(stderr, "Can't make output dir\n");
1037                         return 1;
1038                 }
1039         }
1040
1041         if (output_name)
1042                 sprintf(dst + len, "%s.blktrace.%d", output_name, tip->cpu);
1043         else
1044                 sprintf(dst + len, "%s.blktrace.%d", buts_name, tip->cpu);
1045
1046         return 0;
1047 }
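/*
 * Resulting names look like <dir>/<name>.blktrace.<cpu>, e.g.
 * "./sda.blktrace.0" (device name illustrative). In server mode an extra
 * "<client-ip>-<YYYY-MM-DD>-<HH:MM:SS>/" directory, derived from the
 * client's connect time, is inserted in between.
 */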
1048
1049 static void fill_ops(struct thread_information *tip)
1050 {
1051         /*
1052          * setup ops
1053          */
1054         if (net_mode == Net_client) {
1055                 if (net_use_sendfile) {
1056                         tip->get_subbuf = get_subbuf_sendfile;
1057                         tip->flush_subbuf = NULL;
1058                 } else {
1059                         tip->get_subbuf = get_subbuf;
1060                         tip->flush_subbuf = flush_subbuf_net;
1061                 }
1062         } else {
1063                 if (tip->ofile_mmap)
1064                         tip->get_subbuf = mmap_subbuf;
1065                 else
1066                         tip->get_subbuf = get_subbuf;
1067
1068                 tip->flush_subbuf = flush_subbuf_file;
1069         }
1070                         
1071         if (net_mode == Net_server)
1072                 tip->read_data = read_data_net;
1073         else
1074                 tip->read_data = read_data_file;
1075 }
1076
1077 static int tip_open_output(struct device_information *dip,
1078                            struct thread_information *tip)
1079 {
1080         int pipeline = output_name && !strcmp(output_name, "-");
1081         int mode, vbuf_size;
1082         char op[128];
1083
1084         if (net_mode == Net_client) {
1085                 tip->ofile = NULL;
1086                 tip->ofile_stdout = 0;
1087                 tip->ofile_mmap = 0;
1088                 goto done;
1089         } else if (pipeline) {
1090                 tip->ofile = fdopen(STDOUT_FILENO, "w");
1091                 tip->ofile_stdout = 1;
1092                 tip->ofile_mmap = 0;
1093                 mode = _IOLBF;
1094                 vbuf_size = 512;
1095         } else {
1096                 if (fill_ofname(dip, tip, op, dip->buts_name))
1097                         return 1;
1098                 tip->ofile = fopen(op, "w+");
1099                 tip->ofile_stdout = 0;
1100                 tip->ofile_mmap = 1;
1101                 mode = _IOFBF;
1102                 vbuf_size = OFILE_BUF;
1103         }
1104
1105         if (tip->ofile == NULL) {
1106                 perror(op);
1107                 return 1;
1108         }
1109
1110         tip->ofile_buffer = malloc(vbuf_size);
1111         if (setvbuf(tip->ofile, tip->ofile_buffer, mode, vbuf_size)) {
1112                 perror("setvbuf");
1113                 close_thread(tip);
1114                 return 1;
1115         }
1116
1117 done:
1118         fill_ops(tip);
1119         return 0;
1120 }
1121
1122 static int start_threads(struct device_information *dip)
1123 {
1124         struct thread_information *tip;
1125         int j;
1126
1127         for_each_tip(dip, tip, j) {
1128                 tip->cpu = j;
1129                 tip->device = dip;
1130                 tip->events_processed = 0;
1131                 tip->fd = -1;
1132                 memset(&tip->fifo, 0, sizeof(tip->fifo));
1133                 tip->leftover_ts = NULL;
1134
1135                 if (tip_open_output(dip, tip))
1136                         return 1;
1137
1138                 if (pthread_create(&tip->thread, NULL, thread_main, tip)) {
1139                         perror("pthread_create");
1140                         close_thread(tip);
1141                         return 1;
1142                 }
1143         }
1144
1145         return 0;
1146 }
1147
1148 static void stop_threads(struct device_information *dip)
1149 {
1150         struct thread_information *tip;
1151         unsigned long ret;
1152         int i;
1153
1154         for_each_tip(dip, tip, i) {
1155                 (void) pthread_join(tip->thread, (void *) &ret);
1156                 close_thread(tip);
1157         }
1158 }
1159
1160 static void stop_all_threads(void)
1161 {
1162         struct device_information *dip;
1163         int i;
1164
1165         for_each_dip(dip, i)
1166                 stop_threads(dip);
1167 }
1168
1169 static void stop_all_tracing(void)
1170 {
1171         struct device_information *dip;
1172         int i;
1173
1174         for_each_dip(dip, i)
1175                 stop_trace(dip);
1176 }
1177
1178 static void exit_trace(int status)
1179 {
1180         if (!is_trace_stopped()) {
1181                 trace_stopped = 1;
1182                 stop_all_threads();
1183                 stop_all_tracing();
1184         }
1185
1186         exit(status);
1187 }
1188
1189 static int resize_devices(char *path)
1190 {
1191         int size = (ndevs + 1) * sizeof(struct device_information);
1192
1193         device_information = realloc(device_information, size);
1194         if (!device_information) {
1195                 fprintf(stderr, "Out of memory, device %s (%d)\n", path, size);
1196                 return 1;
1197         }
1198         device_information[ndevs].path = path;
1199         ndevs++;
1200         return 0;
1201 }
1202
1203 static int open_devices(void)
1204 {
1205         struct device_information *dip;
1206         int i;
1207
1208         for_each_dip(dip, i) {
1209                 dip->fd = open(dip->path, O_RDONLY | O_NONBLOCK);
1210                 if (dip->fd < 0) {
1211                         perror(dip->path);
1212                         return 1;
1213                 }
1214                 dip->buf_size = buf_size;
1215                 dip->buf_nr = buf_nr;
1216                 dip->page_size = page_size;
1217         }
1218
1219         return 0;
1220 }
1221
1222 static int start_devices(void)
1223 {
1224         struct device_information *dip;
1225         int i, j, size;
1226
1227         size = ncpus * sizeof(struct thread_information);
1228         thread_information = malloc(size * ndevs);
1229         if (!thread_information) {
1230                 fprintf(stderr, "Out of memory, threads (%d)\n", size * ndevs);
1231                 return 1;
1232         }
1233         memset(thread_information, 0, size * ndevs);
1234
1235         for_each_dip(dip, i) {
1236                 if (start_trace(dip)) {
1237                         close(dip->fd);
1238                         fprintf(stderr, "Failed to start trace on %s\n",
1239                                 dip->path);
1240                         break;
1241                 }
1242         }
1243
1244         if (i != ndevs) {
1245                 __for_each_dip(dip, device_information, i, j)
1246                         stop_trace(dip);
1247
1248                 return 1;
1249         }
1250
1251         for_each_dip(dip, i) {
1252                 dip->threads = thread_information + (i * ncpus);
1253                 if (start_threads(dip)) {
1254                         fprintf(stderr, "Failed to start worker threads\n");
1255                         break;
1256                 }
1257         }
1258
1259         if (i != ndevs) {
1260                 __for_each_dip(dip, device_information, i, j)
1261                         stop_threads(dip);
1262                 for_each_dip(dip, i)
1263                         stop_trace(dip);
1264
1265                 return 1;
1266         }
1267
1268         return 0;
1269 }
1270
1271 static void show_stats(struct device_information *dips, int ndips, int cpus)
1272 {
1273         struct device_information *dip;
1274         struct thread_information *tip;
1275         unsigned long long events_processed, data_read;
1276         unsigned long total_drops;
1277         int i, j, no_stdout = 0;
1278
1279         if (is_stat_shown())
1280                 return;
1281
1282         if (output_name && !strcmp(output_name, "-"))
1283                 no_stdout = 1;
1284
1285         stat_shown = 1;
1286
1287         total_drops = 0;
1288         __for_each_dip(dip, dips, ndips, i) {
1289                 if (!no_stdout)
1290                         printf("Device: %s\n", dip->path);
1291                 events_processed = 0;
1292                 data_read = 0;
1293                 __for_each_tip(dip, tip, cpus, j) {
1294                         if (!no_stdout)
1295                                 printf("  CPU%3d: %20lu events, %8llu KiB data\n",
1296                                         tip->cpu, tip->events_processed,
1297                                         (tip->data_read + 1023) >> 10);
1298                         events_processed += tip->events_processed;
1299                         data_read += tip->data_read;
1300                 }
1301                 total_drops += dip->drop_count;
1302                 if (!no_stdout)
1303                         printf("  Total:  %20llu events (dropped %lu), %8llu KiB data\n",
1304                                         events_processed, dip->drop_count,
1305                                         (data_read + 1023) >> 10);
1306         }
1307
1308         if (total_drops)
1309                 fprintf(stderr, "You have dropped events, consider using a larger buffer size (-b)\n");
1310 }
1311
1312 static struct device_information *net_get_dip(struct net_connection *nc,
1313                                               struct blktrace_net_hdr *bnh)
1314 {
1315         struct device_information *dip, *cl_dip = NULL;
1316         struct cl_host *ch = nc->ch;
1317         int i;
1318
1319         for (i = 0; i < ch->ndevs; i++) {
1320                 dip = &ch->device_information[i];
1321
1322                 if (!strcmp(dip->buts_name, bnh->buts_name))
1323                         return dip;
1324
1325                 if (dip->cl_id == bnh->cl_id)
1326                         cl_dip = dip;
1327         }
1328
1329         ch->device_information = realloc(ch->device_information, (ch->ndevs + 1) * sizeof(*dip));
1330         dip = &ch->device_information[ch->ndevs];
1331         memset(dip, 0, sizeof(*dip));
1332         dip->fd = -1;
1333         dip->ch = ch;
1334         dip->cl_id = bnh->cl_id;
1335         dip->buf_size = bnh->buf_size;
1336         dip->buf_nr = bnh->buf_nr;
1337         dip->page_size = bnh->page_size;
1338
1339         if (cl_dip)
1340                 dip->cl_connect_time = cl_dip->cl_connect_time;
1341         else
1342                 dip->cl_connect_time = nc->connect_time;
1343         strcpy(dip->buts_name, bnh->buts_name);
1344         dip->path = strdup(bnh->buts_name);
1345         dip->trace_started = 1;
1346         ch->ndevs++;
1347         dip->threads = malloc(nc->ncpus * sizeof(struct thread_information));
1348         memset(dip->threads, 0, nc->ncpus * sizeof(struct thread_information));
1349
1350         /*
1351          * open all files
1352          */
1353         for (i = 0; i < nc->ncpus; i++) {
1354                 struct thread_information *tip = &dip->threads[i];
1355
1356                 tip->cpu = i;
1357                 tip->device = dip;
1358                 tip->fd = -1;
1359                 tip->nc = nc;
1360                 
1361                 if (tip_open_output(dip, tip))
1362                         return NULL;
1363
1364                 tip->nc = NULL;
1365         }
1366
1367         return dip;
1368 }
1369
1370 static struct thread_information *net_get_tip(struct net_connection *nc,
1371                                               struct blktrace_net_hdr *bnh)
1372 {
1373         struct device_information *dip;
1374         struct thread_information *tip;
1375
1376         dip = net_get_dip(nc, bnh);
1377         if (!dip->trace_started) {
1378                 fprintf(stderr, "Events for closed device %s\n", dip->buts_name);
1379                 return NULL;
1380         }
1381
1382         tip = &dip->threads[bnh->cpu];
1383         if (!tip->nc)
1384                 tip->nc = nc;
1385         
1386         return tip;
1387 }
1388
1389 static int net_get_header(struct net_connection *nc,
1390                           struct blktrace_net_hdr *bnh)
1391 {
1392         int fl = fcntl(nc->in_fd, F_GETFL);
1393         int bytes_left, ret;
1394         void *p = bnh;
1395
1396         fcntl(nc->in_fd, F_SETFL, fl | O_NONBLOCK);
1397         bytes_left = sizeof(*bnh);
1398         while (bytes_left && !is_done()) {
1399                 ret = recv(nc->in_fd, p, bytes_left, MSG_WAITALL);
1400                 if (ret < 0) {
1401                         if (errno != EAGAIN) {
1402                                 perror("recv header");
1403                                 return 1;
1404                         }
1405                         usleep(1000);
1406                         continue;
1407                 } else if (!ret) {
1408                         usleep(1000);
1409                         continue;
1410                 } else {
1411                         p += ret;
1412                         bytes_left -= ret;
1413                 }
1414         }
1415         fcntl(nc->in_fd, F_SETFL, fl & ~O_NONBLOCK);
1416         return bytes_left;
1417 }
1418
1419 /*
1420  * finalize a net client: truncate files, show stats, cleanup, etc
1421  */
1422 static void device_done(struct net_connection *nc, struct device_information *dip)
1423 {
1424         struct thread_information *tip;
1425         int i;
1426
1427         __for_each_tip(dip, tip, nc->ncpus, i)
1428                 tip_ftrunc_final(tip);
1429
1430         show_stats(dip, 1, nc->ncpus);
1431
1432         /*
1433          * cleanup for next run
1434          */
1435         __for_each_tip(dip, tip, nc->ncpus, i) {
1436                 if (tip->ofile)
1437                         fclose(tip->ofile);
1438         }
1439
1440         free(dip->threads);
1441         free(dip->path);
1442
1443         close(nc->in_fd);
1444         nc->in_fd = -1;
1445
1446         stat_shown = 0;
1447 }
1448
1449 static inline int in_addr_eq(struct in_addr a, struct in_addr b)
1450 {
1451         return a.s_addr == b.s_addr;
1452 }
1453
1454 static void net_add_client_host(struct cl_host *ch)
1455 {
1456         ch->list_next = cl_host_list;
1457         cl_host_list = ch;
1458         cl_hosts++;
1459 }
1460
1461 static void net_remove_client_host(struct cl_host *ch)
1462 {
1463         struct cl_host *p, *c;
1464         
1465         for (p = c = cl_host_list; c; c = c->list_next) {
1466                 if (c == ch) {
1467                         if (p == c)
1468                                 cl_host_list = c->list_next;
1469                         else
1470                                 p->list_next = c->list_next;
1471                         cl_hosts--;
1472                         return;
1473                 }
1474                 p = c;
1475         }
1476 }
1477
1478 static struct cl_host *net_find_client_host(struct in_addr cl_in_addr)
1479 {
1480         struct cl_host *ch = cl_host_list;
1481
1482         while (ch) {
1483                 if (in_addr_eq(ch->cl_in_addr, cl_in_addr))
1484                         return ch;
1485                 ch = ch->list_next;
1486         }
1487
1488         return NULL;
1489 }
1490
1491 static void net_client_host_done(struct cl_host *ch)
1492 {
1493         free(ch->device_information);
1494         free(ch->net_connections);
1495         net_connects -= ch->nconn;
1496         net_remove_client_host(ch);
1497         free(ch);
1498 }
1499
1500 /*
1501  * handle incoming events from a net client
1502  */
1503 static int net_client_data(struct net_connection *nc)
1504 {
1505         struct thread_information *tip;
1506         struct blktrace_net_hdr bnh;
1507
1508         if (net_get_header(nc, &bnh))
1509                 return 1;
1510
1511         if (data_is_native == -1 && check_data_endianness(bnh.magic)) {
1512                 fprintf(stderr, "server: received data is bad\n");
1513                 return 1;
1514         }
1515
1516         if (!data_is_native) {
1517                 bnh.magic = be32_to_cpu(bnh.magic);
1518                 bnh.cpu = be32_to_cpu(bnh.cpu);
1519                 bnh.max_cpus = be32_to_cpu(bnh.max_cpus);
1520                 bnh.len = be32_to_cpu(bnh.len);
1521                 bnh.cl_id = be32_to_cpu(bnh.cl_id);
1522                 bnh.buf_size = be32_to_cpu(bnh.buf_size);
1523                 bnh.buf_nr = be32_to_cpu(bnh.buf_nr);
1524                 bnh.page_size = be32_to_cpu(bnh.page_size);
1525         }
1526
1527         if ((bnh.magic & 0xffffff00) != BLK_IO_TRACE_MAGIC) {
1528                 fprintf(stderr, "server: bad data magic\n");
1529                 return 1;
1530         }
1531
1532         if (nc->ncpus == -1)
1533                 nc->ncpus = bnh.max_cpus;
1534
1535         /*
1536          * len == 0 means that the other end signalled end-of-run
1537          */
1538         if (!bnh.len) {
1539                 /*
1540                  * overload cpu count with dropped events
1541                  */
1542                 struct device_information *dip;
1543
1544                 dip = net_get_dip(nc, &bnh);
1545                 dip->drop_count = bnh.cpu;
1546                 dip->trace_started = 0;
1547
1548                 printf("server: end of run for %s\n", dip->buts_name);
1549
1550                 device_done(nc, dip);
1551
1552                 if (++nc->ch->ndevs_done == nc->ch->ndevs)
1553                         net_client_host_done(nc->ch);
1554
1555                 return 0;
1556         }
1557
1558         tip = net_get_tip(nc, &bnh);
1559         if (!tip)
1560                 return 1;
1561
1562         if (mmap_subbuf(tip, bnh.len))
1563                 return 1;
1564
1565         return 0;
1566 }
1567
1568 static void net_add_connection(int listen_fd, struct sockaddr_in *addr)
1569 {
1570         socklen_t socklen = sizeof(*addr);
1571         struct net_connection *nc;
1572         struct cl_host *ch;
1573         int in_fd;
1574
1575         in_fd = accept(listen_fd, (struct sockaddr *) addr, &socklen);
1576         if (in_fd < 0) {
1577                 perror("accept");
1578                 return;
1579         }
1580
1581         ch = net_find_client_host(addr->sin_addr);
1582         if (!ch) {
1583                 if (cl_hosts == NET_MAX_CL_HOSTS) {
1584                         fprintf(stderr, "server: no more clients allowed\n");
1585                         return;
1586                 }
1587                 ch = malloc(sizeof(struct cl_host));
1588                 memset(ch, 0, sizeof(*ch));
1589                 ch->cl_in_addr = addr->sin_addr;
1590                 net_add_client_host(ch);
1591
1592                 printf("server: connection from %s\n", inet_ntoa(addr->sin_addr));
1593         }
1594
1595         ch->net_connections = realloc(ch->net_connections, (ch->nconn + 1) * sizeof(*nc));
1596         nc = &ch->net_connections[ch->nconn++];
1597         memset(nc, 0, sizeof(*nc));
1598
1599         time(&nc->connect_time);
1600         nc->ch = ch;
1601         nc->in_fd = in_fd;
1602         nc->ncpus = -1;
1603         net_connects++;
1604 }
1605
1606 /*
1607  * event driven loop, handle new incoming connections and data from
1608  * existing connections
1609  */
1610 static void net_server_handle_connections(int listen_fd,
1611                                           struct sockaddr_in *addr)
1612 {
1613         struct pollfd *pfds = NULL;
1614         struct net_connection **ncs = NULL;
1615         int max_connects = 0;
1616         int i, nconns, events;
1617         struct cl_host *ch;
1618         struct net_connection *nc;
1619         
1620         printf("server: waiting for connections...\n");
1621
1622         while (!is_done()) {
1623                 if (net_connects >= max_connects) {
1624                         pfds = realloc(pfds, (net_connects + 1) * sizeof(*pfds));
1625                         ncs = realloc(ncs, (net_connects + 1) * sizeof(*ncs));
1626                         max_connects = net_connects + 1;
1627                 }
1628                 /*
1629                  * the zero entry is for incoming connections, remaining
1630                  * entries for clients
1631                  */
1632                 pfds[0].fd = listen_fd;
1633                 pfds[0].events = POLLIN;
1634                 nconns = 0;
1635                 for_each_cl_host(ch) {
1636                         for (i = 0; i < ch->nconn; i++) {
1637                                 nc = &ch->net_connections[i];
1638                                 pfds[nconns + 1].fd = nc->in_fd;
1639                                 pfds[nconns + 1].events = POLLIN;
1640                                 ncs[nconns++] = nc;
1641                         }
1642                 }
1643
1644                 events = poll(pfds, 1 + nconns, -1);
1645                 if (events < 0) {
1646                         if (errno == EINTR)
1647                                 continue;
1648
1649                         perror("poll");
1650                         break;
1651                 } else if (!events)
1652                         continue;
1653
1654                 if (pfds[0].revents & POLLIN) {
1655                         net_add_connection(listen_fd, addr);
1656                         events--;
1657                 }
1658
1659                 for (i = 0; events && i < nconns; i++) {
1660                         if (pfds[i + 1].revents & POLLIN) {
1661                                 net_client_data(ncs[i]);
1662                                 events--;
1663                         }
1664                 }
1665         }
1666 }
1667
1668 /*
1669  * Start here when we are in server mode - just fetch data from the network
1670  * and dump to files
1671  */
1672 static int net_server(void)
1673 {
1674         struct sockaddr_in addr;
1675         int fd, opt;
1676
1677         fd = socket(AF_INET, SOCK_STREAM, 0);
1678         if (fd < 0) {
1679                 perror("server: socket");
1680                 return 1;
1681         }
1682
1683         opt = 1;
1684         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
1685                 perror("setsockopt");
1686                 return 1;
1687         }
1688
1689         memset(&addr, 0, sizeof(addr));
1690         addr.sin_family = AF_INET;
1691         addr.sin_addr.s_addr = htonl(INADDR_ANY);
1692         addr.sin_port = htons(net_port);
1693
1694         if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
1695                 perror("bind");
1696                 return 1;
1697         }
1698
1699         if (listen(fd, 1) < 0) {
1700                 perror("listen");
1701                 return 1;
1702         }
1703
1704         net_server_handle_connections(fd, &addr);
1705         return 0;
1706 }
1707
1708 /*
1709  * Setup outgoing network connection where we will transmit data
1710  */
1711 static int net_setup_client_cpu(int i, struct sockaddr_in *addr)
1712 {
1713         int fd;
1714
1715         fd = socket(AF_INET, SOCK_STREAM, 0);
1716         if (fd < 0) {
1717                 perror("client: socket");
1718                 return 1;
1719         }
1720
1721         if (connect(fd, (struct sockaddr *) addr, sizeof(*addr)) < 0) {
1722                 perror("client: connect");
1723                 return 1;
1724         }
1725
1726         net_out_fd[i] = fd;
1727         return 0;
1728 }
1729
1730 static int net_setup_client(void)
1731 {
1732         struct sockaddr_in addr;
1733         int i;
1734
1735         memset(&addr, 0, sizeof(addr));
1736         addr.sin_family = AF_INET;
1737         addr.sin_port = htons(net_port);
1738
1739         if (inet_aton(hostname, &addr.sin_addr) != 1) {
1740                 struct hostent *hent = gethostbyname(hostname);
1741                 if (!hent) {
1742                         perror("gethostbyname");
1743                         return 1;
1744                 }
1745
1746                 memcpy(&addr.sin_addr, hent->h_addr, 4);
1747                 strcpy(hostname, hent->h_name);
1748         }
1749
1750         printf("blktrace: connecting to %s\n", hostname);
1751
1752         net_out_fd = malloc(ncpus * sizeof(*net_out_fd));
1753         for (i = 0; i < ncpus; i++) {
1754                 if (net_setup_client_cpu(i, &addr))
1755                         return 1;
1756         }
1757
1758         printf("blktrace: connected!\n");
1759
1760         return 0;
1761 }
1762
1763 static char usage_str[] = \
1764         "-d <dev> [ -r debugfs path ] [ -o <output> ] [ -k ] [ -w time ]\n" \
1765         "[ -a action ] [ -A action mask ] [ -I <devs file> ] [ -D <dir> ]\n" \
             "[ -b size ] [ -n number ] [ -l ] [ -h <host> ] [ -p port ] [ -s ] [ -v ]\n\n" \
1766         "\t-d Use specified device. May also be given last after options\n" \
1767         "\t-r Path to mounted debugfs, defaults to /sys/kernel/debug\n" \
1768         "\t-o File(s) to send output to\n" \
1769         "\t-D Directory to prepend to output file names\n" \
1770         "\t-k Kill a running trace\n" \
1771         "\t-w Stop after defined time, in seconds\n" \
1772         "\t-a Only trace specified actions. See documentation\n" \
1773         "\t-A Give trace mask as a single value. See documentation\n" \
1774         "\t-b Sub buffer size in KiB\n" \
1775         "\t-n Number of sub buffers\n" \
1776         "\t-l Run in network listen mode (blktrace server)\n" \
1777         "\t-h Run in network client mode, connecting to the given host\n" \
1778         "\t-p Network port to use (default 8462)\n" \
1779         "\t-s Make the network client NOT use sendfile() to transfer data\n" \
1780         "\t-I Add devices found in <devs file>\n" \
1781         "\t-V Print program version info\n\n";
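
     /*
      * Illustrative example invocations (the device name is just a
      * placeholder):
      *
      *   blktrace -d /dev/sda -o sda -w 30     trace /dev/sda for 30 seconds
      *   blktrace -l                           run as a network server
      *   blktrace -d /dev/sda -h <server>      run as a network client,
      *                                         shipping trace data to <server>
      */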
1782
1783 static void show_usage(char *program)
1784 {
1785         fprintf(stderr, "Usage: %s %s %s", program, blktrace_version, usage_str);
1786 }
1787
1788 int main(int argc, char *argv[])
1789 {
1790         static char default_debugfs_path[] = "/sys/kernel/debug";
1791         struct statfs st;
1792         int i, c;
1793         int stop_watch = 0;
1794         int act_mask_tmp = 0;
1795
1796         while ((c = getopt_long(argc, argv, S_OPTS, l_opts, NULL)) >= 0) {
1797                 switch (c) {
1798                 case 'a':
1799                         i = find_mask_map(optarg);
1800                         if (i < 0) {
1801                                 fprintf(stderr, "Invalid action mask %s\n",
1802                                         optarg);
1803                                 return 1;
1804                         }
1805                         act_mask_tmp |= i;
1806                         break;
1807
1808                 case 'A':
1809                         if ((sscanf(optarg, "%x", &i) != 1) ||
1810                                                         !valid_act_opt(i)) {
1811                                 fprintf(stderr,
1812                                         "Invalid set action mask %s/0x%x\n",
1813                                         optarg, i);
1814                                 return 1;
1815                         }
1816                         act_mask_tmp = i;
1817                         break;
1818
1819                 case 'd':
1820                         if (resize_devices(optarg) != 0)
1821                                 return 1;
1822                         break;
1823
1824                 case 'I': {
1825                         char dev_line[256];
1826                         FILE *ifp = fopen(optarg, "r");
1827
1828                         if (!ifp) {
1829                                 fprintf(stderr,
1830                                         "Invalid file for devices %s\n",
1831                                         optarg);
1832                                 return 1;
1833                         }
1834
1835                         while (fscanf(ifp, "%255s", dev_line) == 1)
1836                                 if (resize_devices(strdup(dev_line)) != 0)
1837                                         return 1;
1838                         fclose(ifp);
1839                         break;
1840                 }
1841
1842                 case 'r':
1843                         debugfs_path = optarg;
1844                         break;
1845
1846                 case 'o':
1847                         output_name = optarg;
1848                         break;
1849                 case 'k':
1850                         kill_running_trace = 1;
1851                         break;
1852                 case 'w':
1853                         stop_watch = atoi(optarg);
1854                         if (stop_watch <= 0) {
1855                                 fprintf(stderr,
1856                                         "Invalid stopwatch value (%d secs)\n",
1857                                         stop_watch);
1858                                 return 1;
1859                         }
1860                         break;
1861                 case 'V':
1862                 case 'v':
1863                         printf("%s version %s\n", argv[0], blktrace_version);
1864                         return 0;
1865                 case 'b':
1866                         buf_size = strtoul(optarg, NULL, 10);
1867                         if (buf_size == 0 || buf_size > 16 * 1024) {
1868                                 fprintf(stderr,
1869                                         "Invalid buffer size (%lu)\n", buf_size);
1870                                 return 1;
1871                         }
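                             /* -b is given in KiB; convert to bytes */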
1872                         buf_size <<= 10;
1873                         break;
1874                 case 'n':
1875                         buf_nr = strtoul(optarg, NULL, 10);
1876                         if (buf_nr == 0) {
1877                                 fprintf(stderr,
1878                                         "Invalid buffer nr (%lu)\n", buf_nr);
1879                                 return 1;
1880                         }
1881                         break;
1882                 case 'D':
1883                         output_dir = optarg;
1884                         break;
1885                 case 'h':
1886                         net_mode = Net_client;
1887                         strncpy(hostname, optarg, sizeof(hostname) - 1);
1888                         break;
1889                 case 'l':
1890                         net_mode = Net_server;
1891                         break;
1892                 case 'p':
1893                         net_port = atoi(optarg);
1894                         break;
1895                 case 's':
1896                         net_use_sendfile = 0;
1897                         break;
1898                 default:
1899                         show_usage(argv[0]);
1900                         return 1;
1901                 }
1902         }
1903
1904         setlocale(LC_NUMERIC, "en_US");
1905
1906         page_size = getpagesize();
1907
1908         if (net_mode == Net_server) {
1909                 if (output_name) {
1910                         fprintf(stderr, "-o ignored in server mode\n");
1911                         output_name = NULL;
1912                 }
1913
1914                 return net_server();
1915         }
1916
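             /*
              * Any remaining non-option arguments are treated as additional
              * devices to trace (-d devices may also be given last).
              */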
1917         while (optind < argc) {
1918                 if (resize_devices(argv[optind++]) != 0)
1919                         return 1;
1920         }
1921
1922         if (ndevs == 0) {
1923                 show_usage(argv[0]);
1924                 return 1;
1925         }
1926
1927         if (act_mask_tmp != 0)
1928                 act_mask = act_mask_tmp;
1929
1930         if (!debugfs_path)
1931                 debugfs_path = default_debugfs_path;
1932
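             /*
              * Sanity check the debugfs mount point: it must be stat'able and
              * its f_type must match the debugfs magic (DEBUGFS_TYPE,
              * 0x64626720, ASCII "dbg ").
              */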
1933         if (statfs(debugfs_path, &st) < 0) {
1934                 perror("statfs");
1935                 fprintf(stderr, "%s does not appear to be a valid path\n",
1936                         debugfs_path);
1937                 return 1;
1938         } else if (st.f_type != (long) DEBUGFS_TYPE) {
1939                 fprintf(stderr, "%s does not appear to be a debug filesystem\n",
1940                         debugfs_path);
1941                 return 1;
1942         }
1943
1944         if (open_devices() != 0)
1945                 return 1;
1946
1947         if (kill_running_trace) {
1948                 stop_all_traces();
1949                 return 0;
1950         }
1951
1952         ncpus = sysconf(_SC_NPROCESSORS_ONLN);
1953         if (ncpus < 0) {
1954                 fprintf(stderr, "sysconf(_SC_NPROCESSORS_ONLN) failed\n");
1955                 return 1;
1956         }
1957
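             /*
              * INT/HUP/TERM/ALRM all funnel into handle_sigint() so that
              * tracing is shut down cleanly; SIGALRM is what ends a -w run.
              * SIGPIPE is ignored so a dropped network peer surfaces as a
              * write error instead of killing the process.
              */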
1958         signal(SIGINT, handle_sigint);
1959         signal(SIGHUP, handle_sigint);
1960         signal(SIGTERM, handle_sigint);
1961         signal(SIGALRM, handle_sigint);
1962         signal(SIGPIPE, SIG_IGN);
1963
1964         if (net_mode == Net_client && net_setup_client())
1965                 return 1;
1966
1967         if (start_devices() != 0)
1968                 return 1;
1969
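             /* make sure the kernel side of the trace is torn down on exit */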
1970         atexit(stop_all_tracing);
1971
1972         if (stop_watch)
1973                 alarm(stop_watch);
1974
1975         wait_for_threads();
1976
1977         if (!is_trace_stopped()) {
1978                 trace_stopped = 1;
1979                 stop_all_threads();
1980                 stop_all_traces();
1981         }
1982
1983         show_stats(device_information, ndevs, ncpus);
1984
1985         return 0;
1986 }
1987