Fixed the sorting and parsing of traces with payloads
[blktrace.git] / blktrace.c
1 /*
2  * block queue tracing application
3  *
4  * TODO (in no particular order):
5  *      - Add ability to specify capture mask instead of logging all events
6  *      - Add option for relayfs mount point
7  *
8  */
9 #include <pthread.h>
10 #include <sys/types.h>
11 #include <sys/stat.h>
12 #include <unistd.h>
13 #include <locale.h>
14 #include <signal.h>
15 #include <fcntl.h>
16 #include <string.h>
17 #include <sys/ioctl.h>
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <sched.h>
21
22 #include "blktrace.h"
23
/* Size in bytes of one trace buffer; copied into buts.buf_size. */
#define BUF_SIZE (128 * 1024)
/* Number of trace buffers; copied into buts.buf_nr. */
#define BUF_NR 4
26
/*
 * Per-worker bookkeeping: one instance is allocated for every online CPU
 * and handed to the extract() thread serving that CPU.
 */
struct thread_information {
        /* index of the CPU this worker reads trace data for */
        int cpu;
        /* handle filled in by pthread_create(), used later for the join */
        pthread_t thread;
        /* number of trace records this worker has written out so far */
        unsigned long events_processed;
};
32
/* Directory under which the per-CPU relay files are opened. */
static char relay_path[] = "/relay/";

/*
 * Shutdown flag.  It is set from the signal handler and polled by main()
 * and by every worker thread, so it must be a volatile sig_atomic_t: that
 * is the only object type a signal handler may portably store to.
 */
#define is_done()       (done)
static volatile sig_atomic_t done;

/* Descriptor of the traced block device, and number of online CPUs. */
static int devfd, ncpus;
/* Array of ncpus worker descriptors allocated by start_threads(). */
static struct thread_information *thread_information;
/* Heap copy of buts.name taken after BLKSTARTTRACE; used to build file names. */
static char *buts_name_p;
41
42 int start_trace(char *dev)
43 {
44         struct blk_user_trace_setup buts;
45
46         devfd = open(dev, O_RDONLY);
47         if (devfd < 0) {
48                 perror(dev);
49                 return 1;
50         }
51
52         memset(&buts, sizeof(buts), 0);
53         buts.buf_size = BUF_SIZE;
54         buts.buf_nr = BUF_NR;
55
56         printf("Starting trace on %s\n", dev);
57         if (ioctl(devfd, BLKSTARTTRACE, &buts) < 0) {
58                 perror("BLKSTARTTRACE");
59                 return 1;
60         }
61
62         buts_name_p = strdup(buts.name);
63         return 0;
64 }
65
66 void stop_trace(void)
67 {
68         if (ioctl(devfd, BLKSTOPTRACE) < 0)
69                 perror("BLKSTOPTRACE");
70
71         close(devfd);
72 }
73
74 inline int verify_trace(struct blk_io_trace *t)
75 {
76         if (!CHECK_MAGIC(t)) {
77                 fprintf(stderr, "bad trace magic %x\n", t->magic);
78                 return 1;
79         }
80         if ((t->magic & 0xff) != SUPPORTED_VERSION) {
81                 fprintf(stderr, "unsupported trace version %x\n", 
82                         t->magic & 0xff);
83                 return 1;
84         }
85
86         return 0;
87 }
88
/*
 * Copy exactly `nb` payload bytes from descriptor `ifd` to descriptor `ofd`.
 *
 * cpu       - worker id, used only in diagnostics
 * ifn, ofn  - names of the input/output files, used only in diagnostics
 * nb        - number of bytes to move; nothing is done when nb <= 0
 *
 * Reads are retried (with a 1 ms pause when no data is available yet) until
 * the whole payload has arrived; writes are continued after a short write
 * until every byte is out.  Any read or write error terminates the process
 * with exit(1).
 */
void extract_data(int cpu, char *ifn, int ifd, char *ofn, int ofd, int nb)
{
        int ret, bytes_left;

        /* A zero or negative length would make the array below undefined. */
        if (nb <= 0)
                return;

        unsigned char buf[nb], *p;

        p = buf;
        bytes_left = nb;
        while (bytes_left > 0) {
                ret = read(ifd, p, bytes_left);
                if (ret < 0) {
                        perror(ifn);
                        fprintf(stderr, "Thread %d extract_data %s failed\n",
                                cpu, ifn);
                        exit(1);
                } else if (ret == 0)
                        usleep(1000);
                else {
                        p += ret;
                        bytes_left -= ret;
                }
        }

        /*
         * write() may legitimately accept fewer bytes than requested; keep
         * going from where it stopped instead of reporting a bogus errno.
         */
        p = buf;
        bytes_left = nb;
        while (bytes_left > 0) {
                ret = write(ofd, p, bytes_left);
                if (ret <= 0) {
                        if (ret < 0)
                                perror(ofn);
                        fprintf(stderr,"Thread %d extract_data %s failed\n", cpu, ofn);
                        exit(1);
                }
                p += ret;
                bytes_left -= ret;
        }
}
118
119 void *extract(void *arg)
120 {
121         struct thread_information *tip = arg;
122         int tracefd, ret, ofd, dfd;
123         char ip[64], op[64], dp[64];
124         struct blk_io_trace t;
125         pid_t pid = getpid();
126         cpu_set_t cpu_mask;
127
128         CPU_ZERO(&cpu_mask);
129         CPU_SET(tip->cpu, &cpu_mask);
130
131         if (sched_setaffinity(pid, sizeof(cpu_mask), &cpu_mask) == -1) {
132                 perror("sched_setaffinity");
133                 exit(1);
134         }
135
136         sprintf(op, "%s_out.%d", buts_name_p, tip->cpu);
137         ofd = open(op, O_CREAT|O_TRUNC|O_WRONLY, 0644);
138         if (ofd < 0) {
139                 perror(op);
140                 fprintf(stderr,"Thread %d failed creat of %s\n", tip->cpu, op);
141                 exit(1);
142         }
143
144         sprintf(dp, "%s_dat.%d", buts_name_p, tip->cpu);
145         dfd = open(dp, O_CREAT|O_TRUNC|O_WRONLY, 0644);
146         if (dfd < 0) {
147                 perror(dp);
148                 fprintf(stderr,"Thread %d failed creat of %s\n", tip->cpu, dp);
149                 exit(1);
150         }
151
152         sprintf(ip, "%s%s%d", relay_path, buts_name_p, tip->cpu);
153         tracefd = open(ip, O_RDONLY);
154         if (tracefd < 0) {
155                 perror(ip);
156                 fprintf(stderr,"Thread %d failed open of %s\n", tip->cpu, ip);
157                 exit(1);
158         }
159
160         while (!is_done()) {
161                 ret = read(tracefd, &t, sizeof(t));
162                 if (ret != sizeof(t)) {
163                         if (ret < 0) {
164                                 perror(ip);
165                                 fprintf(stderr,"Thread %d failed read of %s\n",
166                                         tip->cpu, ip);
167                                 exit(1);
168                         } else if (ret > 0) {
169                                 fprintf(stderr,"Thread %d misread %s %d,%d\n",
170                                         tip->cpu, ip, ret, (int)sizeof(t));
171                                 exit(1);
172                         } else {
173                                 usleep(10000);
174                                 continue;
175                         }
176                 }
177
178                 if (verify_trace(&t))
179                         exit(1);
180
181                 switch (t.action & 0xffff) {
182                 case __BLK_TA_ISSUE:
183                 case __BLK_TA_COMPLETE:
184                         if (!t.pdu_len)
185                                 break;
186                         else if (t.pdu_len > 64) {
187                                 fprintf(stderr, 
188                                         "Thread %d Payload too large %d\n", 
189                                         tip->cpu, t.pdu_len);
190                                 exit(1);
191                         }
192                         extract_data(tip->cpu, ip, tracefd, dp, dfd, t.pdu_len);
193                         break;
194                 }
195
196                 /* version is verified, stuff with CPU number now */
197                 t.magic = tip->cpu;
198                 ret = write(ofd, &t, sizeof(t));
199                 if (ret < 0) {
200                         perror(op);
201                         fprintf(stderr,"Thread %d failed write of %s\n", 
202                                 tip->cpu, op);
203                         exit(1);
204                 }
205
206                 tip->events_processed++;
207         }
208
209         return NULL;
210 }
211
212 int start_threads(void)
213 {
214         struct thread_information *tip;
215         int i;
216
217         ncpus = sysconf(_SC_NPROCESSORS_ONLN);
218         if (ncpus < 0) {
219                 fprintf(stderr, "sysconf(_SC_NPROCESSORS_ONLN) failed\n");
220                 return 1;
221         }
222         printf("Processors online: %d\n", ncpus);
223
224         thread_information = malloc(ncpus * sizeof(struct thread_information));
225         for (i = 0, tip = thread_information; i < ncpus; i++, tip++) {
226                 tip->cpu = i;
227                 tip->events_processed = 0;
228
229                 if (pthread_create(&tip->thread, NULL, extract, tip)) {
230                         perror( "pthread_create");
231                         return 0;
232                 }
233         }
234
235         return ncpus;
236 }
237
238 void show_stats(void)
239 {
240         int i;
241         struct thread_information *tip;
242         unsigned long events_processed = 0;
243
244         for (i = 0, tip = thread_information; i < ncpus; i++, tip++) {
245                 printf("CPU%3d: %20ld events\n",
246                        tip->cpu, tip->events_processed);
247                 events_processed += tip->events_processed;
248         }
249
250         printf("Total:  %20ld events\n", events_processed);
251 }
252
253 void handle_sigint(int sig)
254 {
255         printf("exiting on signal %d\n", sig);
256         done = 1;
257 }
258
259 int main(int argc, char *argv[])
260 {
261         struct thread_information *tip;
262         struct stat st;
263         int i;
264
265         if (argc < 2) {
266                 fprintf(stderr, "Usage: %s <dev>\n", argv[0]);
267                 return 1;
268         }
269
270         if (stat(relay_path, &st) < 0) {
271                 fprintf(stderr,"%s does not appear to be mounted\n", 
272                         relay_path);
273                 return 2;
274         }
275
276         if (start_trace(argv[1])) {
277                 fprintf(stderr, "Failed to start trace\n");
278                 stop_trace();
279                 return 3;
280         }
281
282         setlocale(LC_NUMERIC, "en_US");
283
284         i = start_threads();
285         if (!i) {
286                 fprintf(stderr, "Failed to start worker threads\n");
287                 stop_trace();
288                 return 4;
289         }
290
291         printf("Threads started  : %d\n", i);
292
293         signal(SIGINT, handle_sigint);
294         signal(SIGHUP, handle_sigint);
295         signal(SIGTERM, handle_sigint);
296
297         while (!is_done())
298                 sleep(1);
299
300         for (i = 0, tip = thread_information; i < ncpus; i++, tip++) {
301                 int ret;
302
303                 if (pthread_join(tip->thread, (void *) &ret))
304                         perror("thread_join");
305         }
306
307         stop_trace();
308         close(devfd);
309         show_stats();
310
311         return 0;
312 }
313