[PATCH] blktrace: various net related fixes (accounting, stats, etc)
[blktrace.git] / blktrace.c
1 /*
2  * block queue tracing application
3  *
4  * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
5  *
6  *  This program is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU General Public License as published by
8  *  the Free Software Foundation; either version 2 of the License, or
9  *  (at your option) any later version.
10  *
11  *  This program is distributed in the hope that it will be useful,
12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *  GNU General Public License for more details.
15  *
16  *  You should have received a copy of the GNU General Public License
17  *  along with this program; if not, write to the Free Software
18  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  *
20  */
21 #include <pthread.h>
22 #include <sys/types.h>
23 #include <sys/stat.h>
24 #include <unistd.h>
25 #include <locale.h>
26 #include <signal.h>
27 #include <fcntl.h>
28 #include <string.h>
29 #include <sys/ioctl.h>
30 #include <sys/param.h>
31 #include <sys/statfs.h>
32 #include <sys/poll.h>
33 #include <sys/mman.h>
34 #include <sys/socket.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <sched.h>
38 #include <ctype.h>
39 #include <getopt.h>
40 #include <errno.h>
41 #include <netinet/in.h>
42 #include <arpa/inet.h>
43 #include <netdb.h>
44
45 #include "blktrace.h"
46 #include "barrier.h"
47
static char blktrace_version[] = "0.99";

/*
 * Relay buffer geometry passed to BLKTRACESETUP (overridable with -b / -n).
 * Raise these if logging at a high rate reports skipped or missed events.
 */
#define BUF_SIZE	(512 * 1024)
#define BUF_NR		(4)

/* stdio buffer size used for regular (non-pipe) output files */
#define OFILE_BUF	(128 * 1024)

/* relayfs magic; NOTE(review): presumably compared against statfs() f_type */
#define RELAYFS_TYPE	0xF0B4A981

/* short option string; every letter here has a long twin in l_opts[] */
#define S_OPTS	"d:a:A:r:o:kw:Vb:n:D:lh:p:"

static struct option l_opts[] = {
	{ .name = "dev",             .has_arg = required_argument, .flag = NULL, .val = 'd' },
	{ .name = "act-mask",        .has_arg = required_argument, .flag = NULL, .val = 'a' },
	{ .name = "set-mask",        .has_arg = required_argument, .flag = NULL, .val = 'A' },
	{ .name = "relay",           .has_arg = required_argument, .flag = NULL, .val = 'r' },
	{ .name = "output",          .has_arg = required_argument, .flag = NULL, .val = 'o' },
	{ .name = "kill",            .has_arg = no_argument,       .flag = NULL, .val = 'k' },
	{ .name = "stopwatch",       .has_arg = required_argument, .flag = NULL, .val = 'w' },
	{ .name = "version",         .has_arg = no_argument,       .flag = NULL, .val = 'V' },
	{ .name = "buffer-size",     .has_arg = required_argument, .flag = NULL, .val = 'b' },
	{ .name = "num-sub-buffers", .has_arg = required_argument, .flag = NULL, .val = 'n' },
	{ .name = "output-dir",      .has_arg = required_argument, .flag = NULL, .val = 'D' },
	{ .name = "listen",          .has_arg = no_argument,       .flag = NULL, .val = 'l' },
	{ .name = "host",            .has_arg = required_argument, .flag = NULL, .val = 'h' },
	{ .name = "port",            .has_arg = required_argument, .flag = NULL, .val = 'p' },
	/* getopt_long() terminator */
	{ .name = NULL, },
};
151
/* A chunk of trace data copied out of a relay file (pipe/network path). */
struct tip_subbuf {
	void *buf;		/* heap buffer holding the data */
	unsigned int len;	/* bytes currently valid in buf */
	unsigned int max_len;	/* allocated size of buf */
};

/*
 * Ring of subbuffer pointers between a worker thread (producer) and the
 * writer loop (consumer). Indices wrap with "& (FIFO_SIZE - 1)", so
 * FIFO_SIZE has to stay a power of two. head and tail are aligned to
 * separate cache lines so the two sides don't share one.
 */
#define FIFO_SIZE	(1024)	/* should be plenty big! */
#define CL_SIZE		(128)	/* cache line, any bigger? */

struct tip_subbuf_fifo {
	int tail __attribute__((aligned(CL_SIZE)));	/* next slot to fill */
	int head __attribute__((aligned(CL_SIZE)));	/* next slot to drain */
	struct tip_subbuf *q[FIFO_SIZE];
};

/* State for one (device, cpu) tracing thread. */
struct thread_information {
	int cpu;
	pthread_t thread;

	int fd;				/* relay file for this cpu */
	void *fd_buf;
	char fn[MAXPATHLEN + 64];	/* path of the relay file */

	FILE *ofile;			/* output stream */
	char *ofile_buffer;		/* stdio buffer given to setvbuf() */
	int ofile_stdout;		/* ofile is stdout (piped output) */
	int ofile_mmap;			/* ofile is filled through mmap */

	unsigned long events_processed;
	unsigned long long data_read;
	struct device_information *device;

	int exited;			/* set by the thread as its last act */

	/* piped fifo buffers */
	struct tip_subbuf_fifo fifo;
	struct tip_subbuf *leftover_ts;	/* partial trace carried to next flush */

	/* mmap controlled output files */
	unsigned long long fs_size;	/* bytes of trace data written */
	unsigned long long fs_max_size;	/* size the file has been grown to */
	unsigned long fs_off;		/* write offset inside fs_buf */
	void *fs_buf;			/* current mapped window */
	unsigned long fs_buf_len;	/* length of that window */
};

/* State for one traced block device. */
struct device_information {
	int fd;
	char *path;
	char buts_name[32];		/* trace name returned by BLKTRACESETUP */
	volatile int trace_started;
	unsigned long drop_count;
	struct thread_information *threads;	/* ncpus entries */
};
210
211 static int ncpus;
212 static struct thread_information *thread_information;
213 static int ndevs;
214 static struct device_information *device_information;
215
216 /* command line option globals */
217 static char *relay_path;
218 static char *output_name;
219 static char *output_dir;
220 static int act_mask = ~0U;
221 static int kill_running_trace;
222 static unsigned long buf_size = BUF_SIZE;
223 static unsigned long buf_nr = BUF_NR;
224 static unsigned int page_size;
225
226 #define is_done()       (*(volatile int *)(&done))
227 static volatile int done;
228
229 #define is_trace_stopped()      (*(volatile int *)(&trace_stopped))
230 static volatile int trace_stopped;
231
232 #define is_stat_shown() (*(volatile int *)(&stat_shown))
233 static volatile int stat_shown;
234
235 int data_is_native = -1;
236
237 static void exit_trace(int status);
238
239 #define dip_tracing(dip)        (*(volatile int *)(&(dip)->trace_started))
240 #define dip_set_tracing(dip, v) ((dip)->trace_started = (v))
241
242 #define __for_each_dip(__d, __i, __e)   \
243         for (__i = 0, __d = device_information; __i < __e; __i++, __d++)
244
245 #define for_each_dip(__d, __i)  __for_each_dip(__d, __i, ndevs)
246 #define for_each_tip(__d, __t, __j)     \
247         for (__j = 0, __t = (__d)->threads; __j < ncpus; __j++, __t++)
248
249 /*
250  * networking stuff follows. we include a magic number so we know whether
251  * to endianness convert or not
252  */
253 struct blktrace_net_hdr {
254         u32 magic;              /* same as trace magic */
255         char buts_name[32];     /* trace name */
256         u32 cpu;                /* for which cpu */
257         u32 max_cpus;
258         u32 len;                /* length of following trace data */
259 };
260
261 #define TRACE_NET_PORT          (8462)
262
263 enum {
264         Net_none = 0,
265         Net_server,
266         Net_client,
267 };
268
269 /*
270  * network cmd line params
271  */
272 static char hostname[MAXHOSTNAMELEN];
273 static int net_port = TRACE_NET_PORT;
274 static int net_mode = 0;
275
276 static int net_in_fd = -1;
277 static int net_out_fd = -1;
278
279 static void handle_sigint(__attribute__((__unused__)) int sig)
280 {
281         done = 1;
282 }
283
284 static int get_dropped_count(const char *buts_name)
285 {
286         int fd;
287         char tmp[MAXPATHLEN + 64];
288
289         snprintf(tmp, sizeof(tmp), "%s/block/%s/dropped",
290                  relay_path, buts_name);
291
292         fd = open(tmp, O_RDONLY);
293         if (fd < 0) {
294                 /*
295                  * this may be ok, if the kernel doesn't support dropped counts
296                  */
297                 if (errno == ENOENT)
298                         return 0;
299
300                 fprintf(stderr, "Couldn't open dropped file %s\n", tmp);
301                 return -1;
302         }
303
304         if (read(fd, tmp, sizeof(tmp)) < 0) {
305                 perror(tmp);
306                 close(fd);
307                 return -1;
308         }
309
310         close(fd);
311
312         return atoi(tmp);
313 }
314
315 static int start_trace(struct device_information *dip)
316 {
317         struct blk_user_trace_setup buts;
318
319         memset(&buts, 0, sizeof(buts));
320         buts.buf_size = buf_size;
321         buts.buf_nr = buf_nr;
322         buts.act_mask = act_mask;
323
324         if (ioctl(dip->fd, BLKTRACESETUP, &buts) < 0) {
325                 perror("BLKTRACESETUP");
326                 return 1;
327         }
328
329         if (ioctl(dip->fd, BLKTRACESTART) < 0) {
330                 perror("BLKTRACESTART");
331                 return 1;
332         }
333
334         memcpy(dip->buts_name, buts.name, sizeof(dip->buts_name));
335         dip_set_tracing(dip, 1);
336         return 0;
337 }
338
339 static void stop_trace(struct device_information *dip)
340 {
341         if (dip_tracing(dip) || kill_running_trace) {
342                 dip_set_tracing(dip, 0);
343
344                 if (ioctl(dip->fd, BLKTRACESTOP) < 0)
345                         perror("BLKTRACESTOP");
346                 if (ioctl(dip->fd, BLKTRACETEARDOWN) < 0)
347                         perror("BLKTRACETEARDOWN");
348
349                 close(dip->fd);
350                 dip->fd = -1;
351         }
352 }
353
354 static void stop_all_traces(void)
355 {
356         struct device_information *dip;
357         int i;
358
359         for_each_dip(dip, i) {
360                 dip->drop_count = get_dropped_count(dip->buts_name);
361                 stop_trace(dip);
362         }
363 }
364
365 static void wait_for_data(struct thread_information *tip)
366 {
367         struct pollfd pfd = { .fd = tip->fd, .events = POLLIN };
368
369         do {
370                 poll(&pfd, 1, 100);
371                 if (pfd.revents & POLLIN)
372                         break;
373                 if (tip->ofile_stdout)
374                         break;
375         } while (!is_done());
376 }
377
378 static int read_data_file(struct thread_information *tip, void *buf, int len)
379 {
380         int ret = 0;
381
382         do {
383                 wait_for_data(tip);
384
385                 ret = read(tip->fd, buf, len);
386                 if (!ret)
387                         continue;
388                 else if (ret > 0)
389                         return ret;
390                 else {
391                         if (errno != EAGAIN) {
392                                 perror(tip->fn);
393                                 fprintf(stderr,"Thread %d failed read of %s\n",
394                                         tip->cpu, tip->fn);
395                                 break;
396                         }
397                         continue;
398                 }
399         } while (!is_done());
400
401         return ret;
402
403 }
404
405 static int read_data_net(struct thread_information *tip, void *buf, int len)
406 {
407         unsigned int bytes_left = len;
408         int ret = 0;
409
410         do {
411                 ret = recv(net_in_fd, buf, bytes_left, MSG_WAITALL);
412
413                 if (!ret)
414                         continue;
415                 else if (ret < 0) {
416                         if (errno != EAGAIN) {
417                                 perror(tip->fn);
418                                 fprintf(stderr, "server: failed read\n");
419                                 return 0;
420                         }
421                         continue;
422                 } else {
423                         buf += ret;
424                         bytes_left -= ret;
425                 }
426         } while (!is_done() && bytes_left);
427
428         return len - bytes_left;
429 }
430
431 static int read_data(struct thread_information *tip, void *buf, int len)
432 {
433         int ret;
434
435         if (net_mode == Net_server)
436                 ret = read_data_net(tip, buf, len);
437         else
438                 ret = read_data_file(tip, buf, len);
439
440         if (ret > 0)
441                 tip->data_read += ret;
442
443         return ret;
444 }
445
446 static inline struct tip_subbuf *
447 subbuf_fifo_dequeue(struct thread_information *tip)
448 {
449         const int head = tip->fifo.head;
450         const int next = (head + 1) & (FIFO_SIZE - 1);
451
452         if (head != tip->fifo.tail) {
453                 struct tip_subbuf *ts = tip->fifo.q[head];
454
455                 store_barrier();
456                 tip->fifo.head = next;
457                 return ts;
458         }
459
460         return NULL;
461 }
462
463 static inline int subbuf_fifo_queue(struct thread_information *tip,
464                                     struct tip_subbuf *ts)
465 {
466         const int tail = tip->fifo.tail;
467         const int next = (tail + 1) & (FIFO_SIZE - 1);
468
469         if (next != tip->fifo.head) {
470                 tip->fifo.q[tail] = ts;
471                 store_barrier();
472                 tip->fifo.tail = next;
473                 return 0;
474         }
475
476         fprintf(stderr, "fifo too small!\n");
477         return 1;
478 }
479
480 /*
481  * For file output, truncate and mmap the file appropriately
482  */
483 static int mmap_subbuf(struct thread_information *tip, unsigned int maxlen)
484 {
485         int ofd = fileno(tip->ofile);
486         int ret;
487
488         /*
489          * extend file, if we have to. use chunks of 16 subbuffers.
490          */
491         if (tip->fs_off + buf_size > tip->fs_buf_len) {
492                 if (tip->fs_buf) {
493                         munlock(tip->fs_buf, tip->fs_buf_len);
494                         munmap(tip->fs_buf, tip->fs_buf_len);
495                         tip->fs_buf = NULL;
496                 }
497
498                 tip->fs_off = tip->fs_size & (page_size - 1);
499                 tip->fs_buf_len = (16 * buf_size) - tip->fs_off;
500                 tip->fs_max_size += tip->fs_buf_len;
501
502                 if (ftruncate(ofd, tip->fs_max_size) < 0) {
503                         perror("ftruncate");
504                         return -1;
505                 }
506
507                 tip->fs_buf = mmap(NULL, tip->fs_buf_len, PROT_WRITE,
508                                    MAP_SHARED, ofd, tip->fs_size - tip->fs_off);
509                 if (tip->fs_buf == MAP_FAILED) {
510                         perror("mmap");
511                         return -1;
512                 }
513                 mlock(tip->fs_buf, tip->fs_buf_len);
514         }
515
516         ret = read_data(tip, tip->fs_buf + tip->fs_off, maxlen);
517         if (ret >= 0) {
518                 tip->fs_size += ret;
519                 tip->fs_off += ret;
520                 return 0;
521         }
522
523         return -1;
524 }
525
526 /*
527  * Use the copy approach for pipes and network
528  */
529 static int get_subbuf(struct thread_information *tip)
530 {
531         struct tip_subbuf *ts = malloc(sizeof(*ts));
532         int ret;
533
534         ts->buf = malloc(buf_size);
535         ts->max_len = buf_size;
536
537         ret = read_data(tip, ts->buf, ts->max_len);
538         if (ret > 0) {
539                 ts->len = ret;
540                 return subbuf_fifo_queue(tip, ts);
541         }
542
543         return ret;
544 }
545
546 static void close_thread(struct thread_information *tip)
547 {
548         if (tip->fd != -1)
549                 close(tip->fd);
550         if (tip->ofile)
551                 fclose(tip->ofile);
552         if (tip->ofile_buffer)
553                 free(tip->ofile_buffer);
554         if (tip->fd_buf)
555                 free(tip->fd_buf);
556
557         tip->fd = -1;
558         tip->ofile = NULL;
559         tip->ofile_buffer = NULL;
560         tip->fd_buf = NULL;
561 }
562
563 static void tip_ftrunc_final(struct thread_information *tip)
564 {
565         /*
566          * truncate to right size and cleanup mmap
567          */
568         if (tip->ofile_mmap) {
569                 int ofd = fileno(tip->ofile);
570
571                 if (tip->fs_buf)
572                         munmap(tip->fs_buf, tip->fs_buf_len);
573
574                 ftruncate(ofd, tip->fs_size);
575         }
576 }
577
578 static void *thread_main(void *arg)
579 {
580         struct thread_information *tip = arg;
581         pid_t pid = getpid();
582         cpu_set_t cpu_mask;
583
584         CPU_ZERO(&cpu_mask);
585         CPU_SET((tip->cpu), &cpu_mask);
586
587         if (sched_setaffinity(pid, sizeof(cpu_mask), &cpu_mask) == -1) {
588                 perror("sched_setaffinity");
589                 exit_trace(1);
590         }
591
592         snprintf(tip->fn, sizeof(tip->fn), "%s/block/%s/trace%d",
593                         relay_path, tip->device->buts_name, tip->cpu);
594         tip->fd = open(tip->fn, O_RDONLY);
595         if (tip->fd < 0) {
596                 perror(tip->fn);
597                 fprintf(stderr,"Thread %d failed open of %s\n", tip->cpu,
598                         tip->fn);
599                 exit_trace(1);
600         }
601
602         while (!is_done()) {
603                 if (tip->ofile_mmap && net_mode != Net_client) {
604                         if (mmap_subbuf(tip, buf_size))
605                                 break;
606                 } else {
607                         if (get_subbuf(tip))
608                                 break;
609                 }
610         }
611
612         tip_ftrunc_final(tip);
613         tip->exited = 1;
614         return NULL;
615 }
616
/*
 * Send all @buf_len bytes of @buf on socket @fd, looping over short sends
 * and retrying calls interrupted by a signal.
 *
 * Returns 0 on success, 1 on a send error (reported with perror).
 */
static int write_data_net(int fd, void *buf, unsigned int buf_len)
{
	const char *p = buf;
	unsigned int bytes_left = buf_len;

	while (bytes_left) {
		ssize_t ret = send(fd, p, bytes_left, 0);

		if (ret < 0) {
			if (errno == EINTR)
				continue;	/* nothing sent, just try again */
			perror("send");
			return 1;
		}

		p += ret;
		bytes_left -= ret;
	}

	return 0;
}
635
636 static int flush_subbuf_net(struct thread_information *tip,
637                             struct tip_subbuf *ts)
638 {
639         struct blktrace_net_hdr hdr;
640
641         hdr.magic = BLK_IO_TRACE_MAGIC;
642         strcpy(hdr.buts_name, tip->device->buts_name);
643         hdr.cpu = tip->cpu;
644         hdr.max_cpus = ncpus;
645         hdr.len = ts->len;
646
647         if (write_data_net(net_out_fd, &hdr, sizeof(hdr)))
648                 return 1;
649
650         if (write_data_net(net_out_fd, ts->buf, ts->len))
651                 return 1;
652
653         free(ts->buf);
654         free(ts);
655         return 0;
656 }
657
658 static int write_data(struct thread_information *tip, void *buf,
659                       unsigned int buf_len)
660 {
661         int ret;
662
663         if (!buf_len)
664                 return 0;
665
666         while (1) {
667                 ret = fwrite(buf, buf_len, 1, tip->ofile);
668                 if (ret == 1)
669                         break;
670
671                 if (ret < 0) {
672                         perror("write");
673                         return 1;
674                 }
675         }
676
677         if (tip->ofile_stdout)
678                 fflush(tip->ofile);
679
680         return 0;
681 }
682
683 static int flush_subbuf_file(struct thread_information *tip,
684                              struct tip_subbuf *ts)
685 {
686         unsigned int offset = 0;
687         struct blk_io_trace *t;
688         int pdu_len, events = 0;
689
690         /*
691          * surplus from last run
692          */
693         if (tip->leftover_ts) {
694                 struct tip_subbuf *prev_ts = tip->leftover_ts;
695
696                 if (prev_ts->len + ts->len > prev_ts->max_len) {
697                         prev_ts->max_len += ts->len;
698                         prev_ts->buf = realloc(prev_ts->buf, prev_ts->max_len);
699                 }
700
701                 memcpy(prev_ts->buf + prev_ts->len, ts->buf, ts->len);
702                 prev_ts->len += ts->len;
703
704                 free(ts->buf);
705                 free(ts);
706
707                 ts = prev_ts;
708                 tip->leftover_ts = NULL;
709         }
710
711         while (offset + sizeof(*t) <= ts->len) {
712                 t = ts->buf + offset;
713
714                 if (verify_trace(t)) {
715                         write_data(tip, ts->buf, offset);
716                         return -1;
717                 }
718
719                 pdu_len = t->pdu_len;
720
721                 if (offset + sizeof(*t) + pdu_len > ts->len)
722                         break;
723
724                 offset += sizeof(*t) + pdu_len;
725                 tip->events_processed++;
726                 tip->data_read += sizeof(*t) + pdu_len;
727                 events++;
728         }
729
730         if (write_data(tip, ts->buf, offset))
731                 return -1;
732
733         /*
734          * leftover bytes, save them for next time
735          */
736         if (offset != ts->len) {
737                 tip->leftover_ts = ts;
738                 ts->len -= offset;
739                 memmove(ts->buf, ts->buf + offset, ts->len);
740         } else {
741                 free(ts->buf);
742                 free(ts);
743         }
744
745         return events;
746 }
747
748 static int write_tip_events(struct thread_information *tip)
749 {
750         struct tip_subbuf *ts = subbuf_fifo_dequeue(tip);
751
752         if (ts) {
753                 if (net_mode == Net_client)
754                         return flush_subbuf_net(tip, ts);
755
756                 return flush_subbuf_file(tip, ts);
757         }
758
759         return 0;
760 }
761
762 /*
763  * scans the tips we know and writes out the subbuffers we accumulate
764  */
765 static void get_and_write_events(void)
766 {
767         struct device_information *dip;
768         struct thread_information *tip;
769         int i, j, events, ret, tips_running;
770
771         while (!is_done()) {
772                 events = 0;
773
774                 for_each_dip(dip, i) {
775                         for_each_tip(dip, tip, j) {
776                                 ret = write_tip_events(tip);
777                                 if (ret > 0)
778                                         events += ret;
779                         }
780                 }
781
782                 if (!events)
783                         usleep(10);
784         }
785
786         /*
787          * reap stored events
788          */
789         do {
790                 events = 0;
791                 tips_running = 0;
792                 for_each_dip(dip, i) {
793                         for_each_tip(dip, tip, j) {
794                                 ret = write_tip_events(tip);
795                                 if (ret > 0)
796                                         events += ret;
797                                 tips_running += !tip->exited;
798                         }
799                 }
800                 usleep(10);
801         } while (events || tips_running);
802 }
803
804 static void wait_for_threads(void)
805 {
806         /*
807          * for piped or network output, poll and fetch data for writeout.
808          * for files, we just wait around for trace threads to exit
809          */
810         if ((output_name && !strcmp(output_name, "-")) ||
811             net_mode == Net_client)
812                 get_and_write_events();
813         else {
814                 struct device_information *dip;
815                 struct thread_information *tip;
816                 int i, j, tips_running;
817
818                 do {
819                         tips_running = 0;
820                         usleep(1000);
821
822                         for_each_dip(dip, i)
823                                 for_each_tip(dip, tip, j)
824                                         tips_running += !tip->exited;
825                 } while (tips_running);
826         }
827 }
828
829 static void fill_ofname(char *dst, char *buts_name, int cpu)
830 {
831         int len = 0;
832
833         if (output_dir)
834                 len = sprintf(dst, "%s/", output_dir);
835
836         if (output_name)
837                 sprintf(dst + len, "%s.blktrace.%d", output_name, cpu);
838         else
839                 sprintf(dst + len, "%s.blktrace.%d", buts_name, cpu);
840 }
841
842 static int start_threads(struct device_information *dip)
843 {
844         struct thread_information *tip;
845         int j, pipeline = output_name && !strcmp(output_name, "-");
846         int mode, vbuf_size;
847         char op[64];
848
849         for_each_tip(dip, tip, j) {
850                 tip->cpu = j;
851                 tip->device = dip;
852                 tip->events_processed = 0;
853                 memset(&tip->fifo, 0, sizeof(tip->fifo));
854                 tip->leftover_ts = NULL;
855
856                 if (pipeline) {
857                         tip->ofile = fdopen(STDOUT_FILENO, "w");
858                         tip->ofile_stdout = 1;
859                         tip->ofile_mmap = 0;
860                         mode = _IOLBF;
861                         vbuf_size = 512;
862                 } else {
863                         fill_ofname(op, dip->buts_name, tip->cpu);
864                         tip->ofile = fopen(op, "w+");
865                         tip->ofile_stdout = 0;
866                         tip->ofile_mmap = 1;
867                         mode = _IOFBF;
868                         vbuf_size = OFILE_BUF;
869                 }
870
871                 if (tip->ofile == NULL) {
872                         perror(op);
873                         return 1;
874                 }
875
876                 tip->ofile_buffer = malloc(vbuf_size);
877                 if (setvbuf(tip->ofile, tip->ofile_buffer, mode, vbuf_size)) {
878                         perror("setvbuf");
879                         close_thread(tip);
880                         return 1;
881                 }
882
883                 if (pthread_create(&tip->thread, NULL, thread_main, tip)) {
884                         perror("pthread_create");
885                         close_thread(tip);
886                         return 1;
887                 }
888         }
889
890         return 0;
891 }
892
893 static void stop_threads(struct device_information *dip)
894 {
895         struct thread_information *tip;
896         unsigned long ret;
897         int i;
898
899         for_each_tip(dip, tip, i) {
900                 (void) pthread_join(tip->thread, (void *) &ret);
901                 close_thread(tip);
902         }
903 }
904
905 static void stop_all_threads(void)
906 {
907         struct device_information *dip;
908         int i;
909
910         for_each_dip(dip, i)
911                 stop_threads(dip);
912 }
913
914 static void stop_all_tracing(void)
915 {
916         struct device_information *dip;
917         int i;
918
919         for_each_dip(dip, i)
920                 stop_trace(dip);
921 }
922
923 static void exit_trace(int status)
924 {
925         if (!is_trace_stopped()) {
926                 trace_stopped = 1;
927                 stop_all_threads();
928                 stop_all_tracing();
929         }
930
931         exit(status);
932 }
933
934 static int resize_devices(char *path)
935 {
936         int size = (ndevs + 1) * sizeof(struct device_information);
937
938         device_information = realloc(device_information, size);
939         if (!device_information) {
940                 fprintf(stderr, "Out of memory, device %s (%d)\n", path, size);
941                 return 1;
942         }
943         device_information[ndevs].path = path;
944         ndevs++;
945         return 0;
946 }
947
948 static int open_devices(void)
949 {
950         struct device_information *dip;
951         int i;
952
953         for_each_dip(dip, i) {
954                 dip->fd = open(dip->path, O_RDONLY | O_NONBLOCK);
955                 if (dip->fd < 0) {
956                         perror(dip->path);
957                         return 1;
958                 }
959         }
960
961         return 0;
962 }
963
964 static int start_devices(void)
965 {
966         struct device_information *dip;
967         int i, j, size;
968
969         size = ncpus * sizeof(struct thread_information);
970         thread_information = malloc(size * ndevs);
971         if (!thread_information) {
972                 fprintf(stderr, "Out of memory, threads (%d)\n", size * ndevs);
973                 return 1;
974         }
975
976         for_each_dip(dip, i) {
977                 if (start_trace(dip)) {
978                         close(dip->fd);
979                         fprintf(stderr, "Failed to start trace on %s\n",
980                                 dip->path);
981                         break;
982                 }
983         }
984
985         if (i != ndevs) {
986                 __for_each_dip(dip, j, i)
987                         stop_trace(dip);
988
989                 return 1;
990         }
991
992         for_each_dip(dip, i) {
993                 dip->threads = thread_information + (i * ncpus);
994                 if (start_threads(dip)) {
995                         fprintf(stderr, "Failed to start worker threads\n");
996                         break;
997                 }
998         }
999
1000         if (i != ndevs) {
1001                 __for_each_dip(dip, j, i)
1002                         stop_threads(dip);
1003                 for_each_dip(dip, i)
1004                         stop_trace(dip);
1005
1006                 return 1;
1007         }
1008
1009         return 0;
1010 }
1011
1012 static void show_stats(void)
1013 {
1014         struct device_information *dip;
1015         struct thread_information *tip;
1016         unsigned long long events_processed, data_read;
1017         unsigned long total_drops;
1018         int i, j, no_stdout = 0;
1019
1020         if (is_stat_shown())
1021                 return;
1022
1023         if (output_name && !strcmp(output_name, "-"))
1024                 no_stdout = 1;
1025
1026         stat_shown = 1;
1027
1028         total_drops = 0;
1029         for_each_dip(dip, i) {
1030                 if (!no_stdout)
1031                         printf("Device: %s\n", dip->path);
1032                 events_processed = 0;
1033                 data_read = 0;
1034                 for_each_tip(dip, tip, j) {
1035                         if (!no_stdout)
1036                                 printf("  CPU%3d: %20lu events, %8llu KiB data\n",
1037                                         tip->cpu, tip->events_processed,
1038                                         tip->data_read >> 10);
1039                         events_processed += tip->events_processed;
1040                         data_read += tip->data_read;
1041                 }
1042                 total_drops += dip->drop_count;
1043                 if (!no_stdout)
1044                         printf("  Total:  %20llu events (dropped %lu), %8llu KiB data\n",
1045                                         events_processed, dip->drop_count,
1046                                         data_read >> 10);
1047         }
1048
1049         if (total_drops)
1050                 fprintf(stderr, "You have dropped events, consider using a larger buffer size (-b)\n");
1051 }
1052
1053 static struct device_information *net_get_dip(char *buts_name)
1054 {
1055         struct device_information *dip;
1056         int i;
1057
1058         for (i = 0; i < ndevs; i++) {
1059                 dip = &device_information[i];
1060
1061                 if (!strcmp(dip->buts_name, buts_name))
1062                         return dip;
1063         }
1064
1065         device_information = realloc(device_information, (ndevs + 1) * sizeof(*dip));
1066         dip = &device_information[ndevs];
1067         strcpy(dip->buts_name, buts_name);
1068         strcpy(dip->path, buts_name);
1069         ndevs++;
1070         dip->threads = malloc(ncpus * sizeof(struct thread_information));
1071         memset(dip->threads, 0, ncpus * sizeof(struct thread_information));
1072
1073         /*
1074          * open all files
1075          */
1076         for (i = 0; i < ncpus; i++) {
1077                 struct thread_information *tip = &dip->threads[i];
1078                 char op[64];
1079
1080                 tip->cpu = i;
1081                 tip->ofile_stdout = 0;
1082                 tip->ofile_mmap = 1;
1083                 tip->device = dip;
1084
1085                 fill_ofname(op, dip->buts_name, tip->cpu);
1086
1087                 tip->ofile = fopen(op, "w+");
1088                 if (!tip->ofile) {
1089                         perror("fopen");
1090                         return NULL;
1091                 }
1092         }
1093
1094         return dip;
1095 }
1096
1097 static struct thread_information *net_get_tip(struct blktrace_net_hdr *bnh)
1098 {
1099         struct device_information *dip;
1100
1101         ncpus = bnh->max_cpus;
1102         dip = net_get_dip(bnh->buts_name);
1103         return &dip->threads[bnh->cpu];
1104 }
1105
1106 static int net_get_header(struct blktrace_net_hdr *bnh)
1107 {
1108         int fl = fcntl(net_in_fd, F_GETFL);
1109         int bytes_left, ret;
1110         void *p = bnh;
1111
1112         fcntl(net_in_fd, F_SETFL, fl | O_NONBLOCK);
1113         bytes_left = sizeof(*bnh);
1114         while (bytes_left && !is_done()) {
1115                 ret = recv(net_in_fd, p, bytes_left, MSG_WAITALL);
1116                 if (ret < 0) {
1117                         if (errno != EAGAIN) {
1118                                 perror("recv header");
1119                                 return 1;
1120                         }
1121                         usleep(100);
1122                         continue;
1123                 } else if (!ret) {
1124                         usleep(100);
1125                         continue;
1126                 } else {
1127                         p += ret;
1128                         bytes_left -= ret;
1129                 }
1130         }
1131         fcntl(net_in_fd, F_SETFL, fl & ~O_NONBLOCK);
1132         return 0;
1133 }
1134
1135 static int net_server_loop(void)
1136 {
1137         struct thread_information *tip;
1138         struct blktrace_net_hdr bnh;
1139
1140         if (net_get_header(&bnh))
1141                 return 1;
1142
1143         if (data_is_native == -1 && check_data_endianness(bnh.magic)) {
1144                 fprintf(stderr, "server: received data is bad\n");
1145                 return 1;
1146         }
1147
1148         if (!data_is_native) {
1149                 bnh.cpu = be32_to_cpu(bnh.cpu);
1150                 bnh.len = be32_to_cpu(bnh.len);
1151         }
1152
1153         tip = net_get_tip(&bnh);
1154         if (!tip)
1155                 return 1;
1156
1157         if (mmap_subbuf(tip, bnh.len))
1158                 return 1;
1159
1160         return 0;
1161 }
1162
1163 /*
1164  * Start here when we are in server mode - just fetch data from the network
1165  * and dump to files
1166  */
1167 static int net_server(void)
1168 {
1169         struct sockaddr_in addr;
1170         socklen_t socklen;
1171         int fd, opt, i, j;
1172
1173         fd = socket(AF_INET, SOCK_STREAM, 0);
1174         if (fd < 0) {
1175                 perror("server: socket");
1176                 return 1;
1177         }
1178
1179         opt = 1;
1180         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
1181                 perror("setsockopt");
1182                 return 1;
1183         }
1184
1185         memset(&addr, 0, sizeof(addr));
1186         addr.sin_family = AF_INET;
1187         addr.sin_addr.s_addr = htonl(INADDR_ANY);
1188         addr.sin_port = htons(net_port);
1189
1190         if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
1191                 perror("bind");
1192                 return 1;
1193         }
1194
1195         if (listen(fd, 1) < 0) {
1196                 perror("listen");
1197                 return 1;
1198         }
1199
1200         printf("blktrace: waiting for incoming connection...\n");
1201
1202         socklen = sizeof(addr);
1203         net_in_fd = accept(fd, (struct sockaddr *) &addr, &socklen);
1204         if (net_in_fd < 0) {
1205                 perror("accept");
1206                 return 1;
1207         }
1208
1209         signal(SIGINT, handle_sigint);
1210         signal(SIGHUP, handle_sigint);
1211         signal(SIGTERM, handle_sigint);
1212         signal(SIGALRM, handle_sigint);
1213
1214         printf("blktrace: connected!\n");
1215
1216         while (!is_done()) {
1217                 if (net_server_loop())
1218                         break;
1219         }
1220
1221         for (i = 0; i < ndevs; i++) {
1222                 struct device_information *dip = &device_information[i];
1223
1224                 for (j = 0; j < ncpus; j++)
1225                         tip_ftrunc_final(&dip->threads[j]);
1226         }
1227
1228         show_stats();
1229         return 0;
1230 }
1231
1232 /*
1233  * Setup outgoing network connection where we will transmit data
1234  */
1235 static int net_setup_client(void)
1236 {
1237         struct sockaddr_in addr;
1238         int fd;
1239
1240         fd = socket(AF_INET, SOCK_STREAM, 0);
1241         if (fd < 0) {
1242                 perror("client: socket");
1243                 return 1;
1244         }
1245
1246         memset(&addr, 0, sizeof(addr));
1247         addr.sin_family = AF_INET;
1248         addr.sin_port = htons(net_port);
1249
1250         if (inet_aton(hostname, &addr.sin_addr) != 1) {
1251                 struct hostent *hent = gethostbyname(hostname);
1252                 if (!hent) {
1253                         perror("gethostbyname");
1254                         return 1;
1255                 }
1256
1257                 memcpy(&addr.sin_addr, hent->h_addr, 4);
1258                 strcpy(hostname, hent->h_name);
1259         }
1260
1261         printf("blktrace: connecting to %s\n", hostname);
1262
1263         if (connect(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
1264                 perror("client: connect");
1265                 return 1;
1266         }
1267
1268         printf("blktrace: connected!\n");
1269         net_out_fd = fd;
1270         return 0;
1271 }
1272
/*
 * Option summary printed by show_usage().  Keep in sync with S_OPTS:
 * version is -V (uppercase), and the network options -l/-h/-p are listed.
 */
static char usage_str[] = \
	"-d <dev> [ -r relay path ] [ -o <output> ] [-k ] [ -w time ]\n" \
	"[ -a action ] [ -A action mask ] [ -b size ] [ -n number ]\n" \
	"[ -D dir ] [ -l | -h hostname ] [ -p port ] [ -V ]\n\n" \
	"\t-d Use specified device. May also be given last after options\n" \
	"\t-r Path to mounted relayfs, defaults to /relay\n" \
	"\t-o File(s) to send output to\n" \
	"\t-D Directory to prepend to output file names\n" \
	"\t-k Kill a running trace\n" \
	"\t-w Stop after defined time, in seconds\n" \
	"\t-a Only trace specified actions. See documentation\n" \
	"\t-A Give trace mask as a single value. See documentation\n" \
	"\t-b Sub buffer size in KiB\n" \
	"\t-n Number of sub buffers\n" \
	"\t-l Run in network listen mode (blktrace server)\n" \
	"\t-h Run in network client mode, connecting to the given host\n" \
	"\t-p Network port to use\n" \
	"\t-V Print program version info\n\n";
1287
1288 static void show_usage(char *program)
1289 {
1290         fprintf(stderr, "Usage: %s %s %s",program, blktrace_version, usage_str);
1291 }
1292
1293 int main(int argc, char *argv[])
1294 {
1295         static char default_relay_path[] = "/relay";
1296         struct statfs st;
1297         int i, c;
1298         int stop_watch = 0;
1299         int act_mask_tmp = 0;
1300
1301         while ((c = getopt_long(argc, argv, S_OPTS, l_opts, NULL)) >= 0) {
1302                 switch (c) {
1303                 case 'a':
1304                         i = find_mask_map(optarg);
1305                         if (i < 0) {
1306                                 fprintf(stderr,"Invalid action mask %s\n",
1307                                         optarg);
1308                                 return 1;
1309                         }
1310                         act_mask_tmp |= i;
1311                         break;
1312
1313                 case 'A':
1314                         if ((sscanf(optarg, "%x", &i) != 1) || 
1315                                                         !valid_act_opt(i)) {
1316                                 fprintf(stderr,
1317                                         "Invalid set action mask %s/0x%x\n",
1318                                         optarg, i);
1319                                 return 1;
1320                         }
1321                         act_mask_tmp = i;
1322                         break;
1323
1324                 case 'd':
1325                         if (resize_devices(optarg) != 0)
1326                                 return 1;
1327                         break;
1328
1329                 case 'r':
1330                         relay_path = optarg;
1331                         break;
1332
1333                 case 'o':
1334                         output_name = optarg;
1335                         break;
1336                 case 'k':
1337                         kill_running_trace = 1;
1338                         break;
1339                 case 'w':
1340                         stop_watch = atoi(optarg);
1341                         if (stop_watch <= 0) {
1342                                 fprintf(stderr,
1343                                         "Invalid stopwatch value (%d secs)\n",
1344                                         stop_watch);
1345                                 return 1;
1346                         }
1347                         break;
1348                 case 'V':
1349                         printf("%s version %s\n", argv[0], blktrace_version);
1350                         return 0;
1351                 case 'b':
1352                         buf_size = strtoul(optarg, NULL, 10);
1353                         if (buf_size <= 0 || buf_size > 16*1024) {
1354                                 fprintf(stderr,
1355                                         "Invalid buffer size (%lu)\n",buf_size);
1356                                 return 1;
1357                         }
1358                         buf_size <<= 10;
1359                         break;
1360                 case 'n':
1361                         buf_nr = strtoul(optarg, NULL, 10);
1362                         if (buf_nr <= 0) {
1363                                 fprintf(stderr,
1364                                         "Invalid buffer nr (%lu)\n", buf_nr);
1365                                 return 1;
1366                         }
1367                         break;
1368                 case 'D':
1369                         output_dir = optarg;
1370                         break;
1371                 case 'h':
1372                         net_mode = Net_client;
1373                         strcpy(hostname, optarg);
1374                         break;
1375                 case 'l':
1376                         net_mode = Net_server;
1377                         break;
1378                 case 'p':
1379                         net_port = atoi(optarg);
1380                         break;
1381                 default:
1382                         show_usage(argv[0]);
1383                         return 1;
1384                 }
1385         }
1386
1387         setlocale(LC_NUMERIC, "en_US");
1388
1389         page_size = getpagesize();
1390
1391         if (net_mode == Net_server)
1392                 return net_server();
1393
1394         while (optind < argc) {
1395                 if (resize_devices(argv[optind++]) != 0)
1396                         return 1;
1397         }
1398
1399         if (ndevs == 0) {
1400                 show_usage(argv[0]);
1401                 return 1;
1402         }
1403
1404         if (!relay_path)
1405                 relay_path = default_relay_path;
1406
1407         if (act_mask_tmp != 0)
1408                 act_mask = act_mask_tmp;
1409
1410         if (statfs(relay_path, &st) < 0) {
1411                 perror("statfs");
1412                 fprintf(stderr,"%s does not appear to be a valid path\n",
1413                         relay_path);
1414                 return 1;
1415         } else if (st.f_type != (long) RELAYFS_TYPE) {
1416                 fprintf(stderr,"%s does not appear to be a relay filesystem\n",
1417                         relay_path);
1418                 return 1;
1419         }
1420
1421         if (open_devices() != 0)
1422                 return 1;
1423
1424         if (kill_running_trace) {
1425                 stop_all_traces();
1426                 return 0;
1427         }
1428
1429         ncpus = sysconf(_SC_NPROCESSORS_ONLN);
1430         if (ncpus < 0) {
1431                 fprintf(stderr, "sysconf(_SC_NPROCESSORS_ONLN) failed\n");
1432                 return 1;
1433         }
1434
1435         signal(SIGINT, handle_sigint);
1436         signal(SIGHUP, handle_sigint);
1437         signal(SIGTERM, handle_sigint);
1438         signal(SIGALRM, handle_sigint);
1439
1440         if (net_mode == Net_client && net_setup_client())
1441                 return 1;
1442
1443         if (start_devices() != 0)
1444                 return 1;
1445
1446         atexit(stop_all_tracing);
1447
1448         if (stop_watch)
1449                 alarm(stop_watch);
1450
1451         wait_for_threads();
1452
1453         if (!is_trace_stopped()) {
1454                 trace_stopped = 1;
1455                 stop_all_threads();
1456                 stop_all_traces();
1457         }
1458
1459         show_stats();
1460
1461         return 0;
1462 }
1463