[PATCH] blktrace: completely rewrite the event pull/pushing loops
authorJens Axboe <axboe@suse.de>
Fri, 27 Jan 2006 13:39:35 +0000 (14:39 +0100)
committerJens Axboe <axboe@suse.de>
Fri, 27 Jan 2006 13:39:35 +0000 (14:39 +0100)
blktrace.c

index f9d87eb267c89f491b9cede7deba88384bc71fb5..318568451fb0bf575746b44bc0d14ace743c4971 100644 (file)
@@ -39,6 +39,7 @@
 #include <assert.h>
 
 #include "blktrace.h"
+#include "list.h"
 
 static char blktrace_version[] = "0.99";
 
@@ -129,6 +130,13 @@ static struct option l_opts[] = {
        }
 };
 
+struct tip_subbuf {
+       struct list_head list;
+       void *buf;
+       int len;
+       int max_len;
+};
+
 struct thread_information {
        int cpu;
        pthread_t thread;
@@ -140,13 +148,20 @@ struct thread_information {
        unsigned long fd_max_size;
        char fn[MAXPATHLEN + 64];
 
-       pthread_mutex_t *fd_lock;
        FILE *ofile;
        char *ofile_buffer;
-       int ofile_flush;
+       int ofile_stdout;
 
        unsigned long events_processed;
        struct device_information *device;
+
+       int exited;
+
+       pthread_mutex_t lock;
+       struct list_head subbuf_list;
+       void *leftover_ts;
+       int leftover_ts_len;
+       int leftover_ts_max;
 };
 
 struct device_information {
@@ -181,8 +196,6 @@ static volatile int trace_stopped;
 #define is_stat_shown()        (*(volatile int *)(&stat_shown))
 static volatile int stat_shown;
 
-static pthread_mutex_t stdout_mutex = PTHREAD_MUTEX_INITIALIZER;
-
 static void exit_trace(int status);
 
 #define dip_tracing(dip)       (*(volatile int *)(&(dip)->trace_started))
@@ -192,8 +205,8 @@ static void exit_trace(int status);
        for (__i = 0, __d = device_information; __i < __e; __i++, __d++)
 
 #define for_each_dip(__d, __i) __for_each_dip(__d, __i, ndevs)
-#define for_each_tip(__d, __t, __i)    \
-       for (__i = 0, __t = (__d)->threads; __i < ncpus; __i++, __t++)
+#define for_each_tip(__d, __t, __j)    \
+       for (__j = 0, __t = (__d)->threads; __j < ncpus; __j++, __t++)
 
 static int get_dropped_count(const char *buts_name)
 {
@@ -273,126 +286,129 @@ static void wait_for_data(struct thread_information *tip)
 {
        struct pollfd pfd = { .fd = tip->fd, .events = POLLIN };
 
-       poll(&pfd, 1, 10);
+       do {
+               poll(&pfd, 1, 10);
+               if (pfd.revents & POLLIN)
+                       break;
+               if (tip->ofile_stdout)
+                       break;
+       } while (!is_done());
 }
 
-static int __read_data(struct thread_information *tip, void *buf, int len,
-                      int block)
+static int read_data(struct thread_information *tip, void *buf, int len)
 {
        int ret = 0;
 
-       while (!is_done()) {
-               ret = read(tip->fd, buf, len);
-               if (ret > 0)
-                       break;
-               else if (!ret) {
-                       if (!block)
-                               break;
+       do {
+               wait_for_data(tip);
 
-                       wait_for_data(tip);
-               } else {
+               ret = read(tip->fd, buf, len);
+               if (!ret)
+                       continue;
+               else if (ret > 0)
+                       return ret;
+               else {
                        if (errno != EAGAIN) {
                                perror(tip->fn);
                                fprintf(stderr,"Thread %d failed read of %s\n",
                                        tip->cpu, tip->fn);
                                break;
                        }
-                       if (!block) {
-                               ret = 0;
-                               break;
-                       }
-
-                       wait_for_data(tip);
+                       continue;
                }
-       }
+       } while (!is_done());
 
        return ret;
 }
 
-#define can_grow_ring(tip)     ((tip)->fd_max_size < RING_MAX_NR * buf_size * buf_nr)
-
-static int resize_ringbuffer(struct thread_information *tip)
+static inline void tip_fd_unlock(struct thread_information *tip)
 {
-       if (!can_grow_ring(tip))
-               return 1;
-
-       tip->fd_buf = realloc(tip->fd_buf, 2 * tip->fd_max_size);
-
-       /*
-        * if the ring currently wraps, copy range over
-        */
-       if (tip->fd_off + tip->fd_size > tip->fd_max_size) {
-               unsigned long wrap_size = tip->fd_size - (tip->fd_max_size - tip->fd_off);
-               memmove(tip->fd_buf + tip->fd_max_size, tip->fd_buf, wrap_size);
-       }
+       pthread_mutex_unlock(&tip->lock);
+}
 
-       tip->fd_max_size <<= 1;
-       return 0;
+static inline void tip_fd_lock(struct thread_information *tip)
+{
+       pthread_mutex_lock(&tip->lock);
 }
 
-static int __refill_ringbuffer(struct thread_information *tip, int len,
-                              int block)
+static int get_subbuf(struct thread_information *tip)
 {
-       unsigned long off;
-       int ret;
+       struct tip_subbuf *ts;
+       int ts_size;
 
-       off = (tip->fd_size + tip->fd_off) & (tip->fd_max_size - 1);
-       if (off + len > tip->fd_max_size)
-               len = tip->fd_max_size - off;
+       /*
+        * live tracing should get a lower count to make it more "realtime"
+        */
+       if (tip->ofile_stdout)
+               ts_size = 1024;
+       else
+               ts_size = buf_size;
 
-       assert(len > 0);
+       ts = malloc(sizeof(*ts));
+       ts->buf = malloc(ts_size);
+       ts->max_len = ts_size;
 
-       ret = __read_data(tip, tip->fd_buf + off, len, block);
-       if (ret < 0)
-               return -1;
+       ts->len = read_data(tip, ts->buf, ts_size);
+       if (ts->len > 0) {
+               tip_fd_lock(tip);
+               list_add_tail(&ts->list, &tip->subbuf_list);
+               tip_fd_unlock(tip);
+               return 0;
+       }
 
-       tip->fd_size += ret;
-       return ret;
+       free(ts->buf);
+       free(ts);
+       return -1;
 }
 
-/*
- * keep filling ring until we get a short read
- */
-static void refill_ringbuffer(struct thread_information *tip, int block)
+static void close_thread(struct thread_information *tip)
 {
-       int len = buf_size;
-       int ret;
-
-       do {
-               if (len + tip->fd_size > tip->fd_max_size)
-                       resize_ringbuffer(tip);
+       if (tip->fd != -1)
+               close(tip->fd);
+       if (tip->ofile)
+               fclose(tip->ofile);
+       if (tip->ofile_buffer)
+               free(tip->ofile_buffer);
+       if (tip->fd_buf)
+               free(tip->fd_buf);
 
-               ret = __refill_ringbuffer(tip, len, block);
-       } while ((ret == len) && !is_done());
+       tip->fd = -1;
+       tip->ofile = NULL;
+       tip->ofile_buffer = NULL;
+       tip->fd_buf = NULL;
 }
 
-static int read_data(struct thread_information *tip, void *buf,
-                    unsigned int len)
+static void *thread_main(void *arg)
 {
-       unsigned int start_size, end_size;
+       struct thread_information *tip = arg;
+       pid_t pid = getpid();
+       cpu_set_t cpu_mask;
 
-       refill_ringbuffer(tip, len > tip->fd_size);
+       CPU_ZERO(&cpu_mask);
+       CPU_SET((tip->cpu), &cpu_mask);
 
-       if (len > tip->fd_size)
-               return -1;
+       if (sched_setaffinity(pid, sizeof(cpu_mask), &cpu_mask) == -1) {
+               perror("sched_setaffinity");
+               exit_trace(1);
+       }
 
-       /*
-        * see if we wrap the ring
-        */
-       start_size = len;
-       end_size = 0;
-       if (len > (tip->fd_max_size - tip->fd_off)) {
-               start_size = tip->fd_max_size - tip->fd_off;
-               end_size = len - start_size;
+       snprintf(tip->fn, sizeof(tip->fn), "%s/block/%s/trace%d",
+                       relay_path, tip->device->buts_name, tip->cpu);
+       tip->fd = open(tip->fn, O_RDONLY);
+       if (tip->fd < 0) {
+               perror(tip->fn);
+               fprintf(stderr,"Thread %d failed open of %s\n", tip->cpu,
+                       tip->fn);
+               exit_trace(1);
        }
 
-       memcpy(buf, tip->fd_buf + tip->fd_off, start_size);
-       if (end_size)
-               memcpy(buf + start_size, tip->fd_buf, end_size);
+       for (;;) {
+               if (get_subbuf(tip))
+                       break;
+       }
 
-       tip->fd_off = (tip->fd_off + len) & (tip->fd_max_size - 1);
-       tip->fd_size -= len;
-       return 0;
+       tip->exited = 1;
+       return NULL;
 }
 
 static int write_data(struct thread_information *tip,
@@ -411,214 +427,130 @@ static int write_data(struct thread_information *tip,
                }
        }
 
-       if (tip->ofile_flush)
+       if (tip->ofile_stdout)
                fflush(tip->ofile);
 
        return 0;
 }
 
-static void *extract_data(struct thread_information *tip, int nb)
+static int flush_subbuf(struct thread_information *tip, struct tip_subbuf *ts)
 {
-       unsigned char *buf;
+       unsigned int offset = 0;
+       struct blk_io_trace *t;
+       int pdu_len, events = 0;
 
-       buf = malloc(nb);
-       if (!read_data(tip, buf, nb))
-               return buf;
+       /*
+        * surplus from last run, a little ugly...
+        */
+       if (tip->leftover_ts_len) {
+               if (ts->len + tip->leftover_ts_len > ts->max_len)
+                       ts->buf = realloc(ts->buf, ts->len + tip->leftover_ts_len);
 
-       free(buf);
-       return NULL;
-}
+               memmove(ts->buf + tip->leftover_ts_len, ts->buf, ts->len);
+               memcpy(ts->buf, tip->leftover_ts, tip->leftover_ts_len);
+               ts->len += tip->leftover_ts_len;
+               tip->leftover_ts_len = 0;
+       }
 
-/*
- * trace may start inside 'bit' or may need to be gotten further on
- */
-static int get_event_slow(struct thread_information *tip,
-                         struct blk_io_trace *bit)
-{
-       const int inc = sizeof(__u32);
-       struct blk_io_trace foo;
-       unsigned int offset;
-       void *p;
+       while (offset + sizeof(*t) <= ts->len) {
+               t = ts->buf + offset;
 
-       /*
-        * check if trace is inside
-        */
-       offset = 0;
-       p = bit;
-       while (offset < sizeof(*bit)) {
-               p += inc;
-               offset += inc;
+               if (verify_trace(t))
+                       return -1;
 
-               memcpy(&foo, p, inc);
+               pdu_len = t->pdu_len;
 
-               if (CHECK_MAGIC(&foo))
+               if (offset + sizeof(*t) + pdu_len > ts->len)
                        break;
-       }
 
-       /*
-        * part trace found inside, read the rest
-        */
-       if (offset < sizeof(*bit)) {
-               int good_bytes = sizeof(*bit) - offset;
+               trace_to_be(t);
 
-               memmove(bit, p, good_bytes);
-               p = (void *) bit + good_bytes;
+               if (write_data(tip, t, sizeof(*t) + pdu_len))
+                       return -1;
 
-               return read_data(tip, p, offset);
+               offset += sizeof(*t) + pdu_len;
+               tip->events_processed++;
+               events++;
        }
 
        /*
-        * nothing found, keep looking for start of trace
-        */
-       do {
-               if (read_data(tip, bit, sizeof(bit->magic)))
-                       return -1;
-       } while (!CHECK_MAGIC(bit));
-
-       /*
-        * now get the rest of it
+        * leftover bytes, save them for next time
         */
-       p = &bit->sequence;
-       if (read_data(tip, p, sizeof(*bit) - inc))
-               return -1;
+       if (offset != ts->len) {
+               int surplus = ts->len - offset;
 
-       return 0;
-}
-
-/*
- * Sometimes relayfs screws us a little, if an event crosses a sub buffer
- * boundary. So keep looking forward in the trace data until an event
- * is found
- */
-static int get_event(struct thread_information *tip, struct blk_io_trace *bit)
-{
-       /*
-        * optimize for the common fast case, a full trace read that
-        * succeeds
-        */
-       if (read_data(tip, bit, sizeof(*bit)))
-               return -1;
+               t = ts->buf + offset;
+               if (surplus > tip->leftover_ts_max) {
+                       tip->leftover_ts = realloc(tip->leftover_ts, surplus);
+                       tip->leftover_ts_max = surplus;
+               }
 
-       if (CHECK_MAGIC(bit))
-               return 0;
+               memcpy(tip->leftover_ts, ts->buf + offset, surplus);
+               tip->leftover_ts_len = surplus;
+       }
 
-       /*
-        * ok that didn't work, the event may start somewhere inside the
-        * trace itself
-        */
-       return get_event_slow(tip, bit);
+       free(ts->buf);
+       free(ts);
+       return events;
 }
 
-static inline void tip_fd_unlock(struct thread_information *tip)
+static int write_tip_events(struct thread_information *tip)
 {
-       if (tip->fd_lock)
-               pthread_mutex_unlock(tip->fd_lock);
-}
+       struct tip_subbuf *ts = NULL;
 
-static inline void tip_fd_lock(struct thread_information *tip)
-{
-       if (tip->fd_lock)
-               pthread_mutex_lock(tip->fd_lock);
-}
+       tip_fd_lock(tip);
+       if (!list_empty(&tip->subbuf_list)) {
+               ts = list_entry(tip->subbuf_list.next, struct tip_subbuf, list);
+               list_del(&ts->list);
+       }
+       tip_fd_unlock(tip);
 
-static void close_thread(struct thread_information *tip)
-{
-       if (tip->fd != -1)
-               close(tip->fd);
-       if (tip->ofile)
-               fclose(tip->ofile);
-       if (tip->ofile_buffer)
-               free(tip->ofile_buffer);
-       if (tip->fd_buf)
-               free(tip->fd_buf);
+       if (ts)
+               return flush_subbuf(tip, ts);
 
-       tip->fd = -1;
-       tip->ofile = NULL;
-       tip->ofile_buffer = NULL;
-       tip->fd_buf = NULL;
+       return 0;
 }
 
-static void *extract(void *arg)
+/*
+ * scans the tips we know and writes out the subbuffers we accumulate
+ */
+static void get_and_write_events(void)
 {
-       struct thread_information *tip = arg;
-       int pdu_len;
-       char *pdu_data;
-       struct blk_io_trace t;
-       pid_t pid = getpid();
-       cpu_set_t cpu_mask;
+       struct device_information *dip;
+       struct thread_information *tip;
+       int i, j, events, ret, all_exited;
 
-       CPU_ZERO(&cpu_mask);
-       CPU_SET((tip->cpu), &cpu_mask);
+       while (!is_done()) {
+               events = 0;
 
-       if (sched_setaffinity(pid, sizeof(cpu_mask), &cpu_mask) == -1) {
-               perror("sched_setaffinity");
-               exit_trace(1);
-       }
+               for_each_dip(dip, i) {
+                       for_each_tip(dip, tip, j) {
+                               ret = write_tip_events(tip);
+                               if (ret > 0)
+                                       events += ret;
+                       }
+               }
 
-       snprintf(tip->fn, sizeof(tip->fn), "%s/block/%s/trace%d",
-                       relay_path, tip->device->buts_name, tip->cpu);
-       tip->fd = open(tip->fn, O_RDONLY | O_NONBLOCK);
-       if (tip->fd < 0) {
-               perror(tip->fn);
-               fprintf(stderr,"Thread %d failed open of %s\n", tip->cpu,
-                       tip->fn);
-               exit_trace(1);
+               if (!events)
+                       usleep(10);
        }
 
        /*
-        * start with a ringbuffer that is twice the size of the kernel side
+        * reap stored events
         */
-       tip->fd_max_size = buf_size * buf_nr * RING_INIT_NR;
-       tip->fd_buf = malloc(tip->fd_max_size);
-       tip->fd_off = 0;
-       tip->fd_size = 0;
-
-       pdu_data = NULL;
-       while (1) {
-               if (get_event(tip, &t))
-                       break;
-
-               if (verify_trace(&t))
-                       break;
-
-               pdu_len = t.pdu_len;
-
-               trace_to_be(&t);
-
-               if (pdu_len) {
-                       pdu_data = extract_data(tip, pdu_len);
-                       if (!pdu_data)
-                               break;
-               }
-
-               /*
-                * now we have both trace and payload, get a lock on the
-                * output descriptor and send it off
-                */
-               tip_fd_lock(tip);
-
-               if (write_data(tip, &t, sizeof(t))) {
-                       tip_fd_unlock(tip);
-                       break;
-               }
-
-               if (pdu_data && write_data(tip, pdu_data, pdu_len)) {
-                       tip_fd_unlock(tip);
-                       break;
-               }
-
-               tip_fd_unlock(tip);
-
-               if (pdu_data) {
-                       free(pdu_data);
-                       pdu_data = NULL;
+       do {
+               events = 0;
+               all_exited = 0;
+               for_each_dip(dip, i) {
+                       for_each_tip(dip, tip, j) {
+                               ret = write_tip_events(tip);
+                               if (ret > 0)
+                                       events += ret;
+                               all_exited += !tip->exited;
+                       }
                }
-
-               tip->events_processed++;
-       }
-
-       close_thread(tip);
-       return NULL;
+               usleep(10);
+       } while (events && !all_exited);
 }
 
 static int start_threads(struct device_information *dip)
@@ -631,13 +563,15 @@ static int start_threads(struct device_information *dip)
        for_each_tip(dip, tip, j) {
                tip->cpu = j;
                tip->device = dip;
-               tip->fd_lock = NULL;
                tip->events_processed = 0;
+               INIT_LIST_HEAD(&tip->subbuf_list);
+               tip->leftover_ts_len = 0;
+               tip->leftover_ts_max = 0;
+               tip->leftover_ts = NULL;
 
                if (pipeline) {
                        tip->ofile = fdopen(STDOUT_FILENO, "w");
-                       tip->fd_lock = &stdout_mutex;
-                       tip->ofile_flush = 1;
+                       tip->ofile_stdout = 1;
                        mode = _IOLBF;
                        vbuf_size = 512;
                } else {
@@ -654,7 +588,7 @@ static int start_threads(struct device_information *dip)
                                        dip->buts_name, tip->cpu);
                        }
                        tip->ofile = fopen(op, "w");
-                       tip->ofile_flush = 0;
+                       tip->ofile_stdout = 0;
                        mode = _IOFBF;
                        vbuf_size = OFILE_BUF;
                }
@@ -671,7 +605,7 @@ static int start_threads(struct device_information *dip)
                        return 1;
                }
 
-               if (pthread_create(&tip->thread, NULL, extract, tip)) {
+               if (pthread_create(&tip->thread, NULL, thread_main, tip)) {
                        perror("pthread_create");
                        close_thread(tip);
                        return 1;
@@ -687,8 +621,10 @@ static void stop_threads(struct device_information *dip)
        unsigned long ret;
        int i;
 
-       for_each_tip(dip, tip, i)
+       for_each_tip(dip, tip, i) {
                (void) pthread_join(tip->thread, (void *) &ret);
+               close_thread(tip);
+       }
 }
 
 static void stop_all_threads(void)
@@ -800,33 +736,30 @@ static int start_devices(void)
 
 static void show_stats(void)
 {
-       int i, j, no_stdout = 0;
        struct device_information *dip;
        struct thread_information *tip;
        unsigned long long events_processed;
        unsigned long total_drops;
+       int i, j;
 
        if (is_stat_shown())
                return;
 
        stat_shown = 1;
 
-       if (output_name && !strcmp(output_name, "-"))
-               no_stdout = 1;
-
        total_drops = 0;
        for_each_dip(dip, i) {
-               if (!no_stdout)
+               if (!tip->ofile_stdout)
                        printf("Device: %s\n", dip->path);
                events_processed = 0;
                for_each_tip(dip, tip, j) {
-                       if (!no_stdout)
+                       if (!tip->ofile_stdout)
                                printf("  CPU%3d: %20ld events\n",
                                        tip->cpu, tip->events_processed);
                        events_processed += tip->events_processed;
                }
                total_drops += dip->drop_count;
-               if (!no_stdout)
+               if (!tip->ofile_stdout)
                        printf("  Total:  %20lld events (dropped %lu)\n",
                                        events_processed, dip->drop_count);
        }
@@ -857,13 +790,6 @@ static void show_usage(char *program)
 static void handle_sigint(__attribute__((__unused__)) int sig)
 {
        done = 1;
-       if (!is_trace_stopped()) {
-               trace_stopped = 1;
-               stop_all_threads();
-               stop_all_traces();
-       }
-
-       show_stats();
 }
 
 int main(int argc, char *argv[])
@@ -1006,8 +932,7 @@ int main(int argc, char *argv[])
        if (stop_watch)
                alarm(stop_watch);
 
-       while (!is_done())
-               sleep(1);
+       get_and_write_events();
 
        if (!is_trace_stopped()) {
                trace_stopped = 1;