Reworked blktrace master/thread interface
author: Alan D. Brunelle <alan.brunelle@hp.com>
Thu, 12 Feb 2009 16:13:20 +0000 (11:13 -0500)
committer: Alan D. Brunelle <alan.brunelle@hp.com>
Thu, 12 Feb 2009 16:13:20 +0000 (11:13 -0500)
Allows parallel initializations.

Signed-off-by: Alan D. Brunelle <alan.brunelle@hp.com>
blktrace.c

index d27ab0578c3aa170157eaf6ab84cc85d161c5a98..95b573f06f6c904ca61a0006e9fcab5fb58d50e0 100644 (file)
@@ -71,6 +71,12 @@ enum {
        Net_client,
 };
 
+enum thread_status {
+       Th_running,     /* tracer_signal_ready() bumps nthreads_running */
+       Th_leaving,     /* tracer_signal_ready() bumps nthreads_leaving */
+       Th_error        /* tracer_signal_ready() bumps nthreads_error */
+};
+
 /*
  * Generic stats collected: nevents can be _roughly_ estimated by data_read
  * (discounting pdu...)
@@ -159,10 +165,8 @@ struct tracer {
        struct io_info *ios;
        struct pollfd *pfds;
        pthread_t thread;
-       pthread_mutex_t mutex;
-       pthread_cond_t cond;
        int cpu, nios;
-       volatile int running, status, is_done;
+       volatile int status, is_done;
 };
 
 /*
@@ -285,7 +289,6 @@ static int ndevs;
 static volatile int done;
 static FILE *pfp;
 static int piped_output;
-static int ntracers;
 
 /*
  * tracer threads add entries, the main thread takes them off and processes
@@ -296,12 +299,14 @@ static pthread_mutex_t dp_mutex = PTHREAD_MUTEX_INITIALIZER;
 static volatile int dp_entries;
 
 /*
- * This synchronizes the starting of trace gathering amongst all tracer
- * threads.
+ * These synchronize master / thread interactions.
  */
-static pthread_cond_t ub_cond = PTHREAD_COND_INITIALIZER;
-static pthread_mutex_t ub_mutex = PTHREAD_MUTEX_INITIALIZER;
-static volatile int unblock_tracers;
+static pthread_cond_t mt_cond = PTHREAD_COND_INITIALIZER;
+static pthread_mutex_t mt_mutex = PTHREAD_MUTEX_INITIALIZER;
+static volatile int nthreads_running;
+static volatile int nthreads_leaving;
+static volatile int nthreads_error;
+static volatile int tracers_run;
 
 /*
  * network cmd line params
@@ -486,6 +491,87 @@ static void show_usage(char *prog)
        fprintf(stderr, "Usage: %s %s %s", prog, blktrace_version, usage_str);
 }
 
+/*
+ * Set *tsp to the gettimeofday() time plus 'delta_msec' milliseconds
+ */
+static inline void make_timespec(struct timespec *tsp, long delta_msec)
+{
+       struct timeval now;
+
+       gettimeofday(&now, NULL);
+       tsp->tv_sec = now.tv_sec;
+       tsp->tv_nsec = 1000L * now.tv_usec;     /* usec -> nsec */
+
+       tsp->tv_nsec += (delta_msec * 1000000L);        /* msec -> nsec */
+       if (tsp->tv_nsec >= 1000000000L) {      /* carry: tv_nsec must end up < 1 sec */
+               long secs = tsp->tv_nsec / 1000000000L;
+
+               tsp->tv_sec += secs;
+               tsp->tv_nsec -= (secs * 1000000000L);
+       }
+}
+
+/*
+ * Wait on cond for at most 50 msec; callers re-check their predicate in a loop
+ */
+static void t_pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
+{
+       struct timespec ts;
+
+       make_timespec(&ts, 50);
+       pthread_cond_timedwait(cond, mutex, &ts);       /* result ignored */
+}
+
+static void unblock_tracers(void)      /* release tracers held in tracer_wait_unblock() */
+{
+       pthread_mutex_lock(&mt_mutex);
+       tracers_run = 1;
+       pthread_cond_broadcast(&mt_cond);       /* wake every waiter on mt_cond */
+       pthread_mutex_unlock(&mt_mutex);
+}
+
+static void tracer_wait_unblock(struct tracer *tp)     /* hold tp until released */
+{
+       pthread_mutex_lock(&mt_mutex);
+       while (!tp->is_done && !tracers_run)    /* until tp->is_done or tracers_run is set */
+               pthread_cond_wait(&mt_cond, &mt_mutex); /* NOTE(review): untimed; is_done setter must wake mt_cond - confirm */
+       pthread_mutex_unlock(&mt_mutex);
+}
+
+static void tracer_signal_ready(struct tracer *tp,     /* record tp's state, then wake a waiter */
+                               enum thread_status th_status,
+                               int status)
+{
+       pthread_mutex_lock(&mt_mutex);
+       tp->status = status;
+
+       if (th_status == Th_running)
+               nthreads_running++;
+       else if (th_status == Th_error)
+               nthreads_error++;
+       else    /* Th_leaving */
+               nthreads_leaving++;
+
+       pthread_cond_signal(&mt_cond);  /* waiter may be the master or a tracer in tracer_wait_unblock() */
+       pthread_mutex_unlock(&mt_mutex);
+}
+
+static void wait_tracers_ready(int ncpus_started)      /* wait for every started thread to report */
+{
+       pthread_mutex_lock(&mt_mutex);
+       while ((nthreads_running + nthreads_error) < ncpus_started)     /* running + error reports */
+               t_pthread_cond_wait(&mt_cond, &mt_mutex);       /* timed, so the count is re-checked */
+       pthread_mutex_unlock(&mt_mutex);
+}
+
+static void wait_tracers_leaving(void) /* wait for each running tracer to report Th_leaving */
+{
+       pthread_mutex_lock(&mt_mutex);
+       while (nthreads_leaving < nthreads_running)     /* Th_error threads are in neither count */
+               t_pthread_cond_wait(&mt_cond, &mt_mutex);       /* timed, so the counts are re-checked */
+       pthread_mutex_unlock(&mt_mutex);
+}
+
 static void init_mmap_info(struct mmap_info *mip)
 {
        mip->buf_size = buf_size;
@@ -525,26 +611,6 @@ static int lock_on_cpu(int cpu)
        return 0;
 }
 
-/*
- * Create a timespec 'msec' milliseconds into the future
- */
-static inline void make_timespec(struct timespec *tsp, long delta_msec)
-{
-       struct timeval now;
-
-       gettimeofday(&now, NULL);
-       tsp->tv_sec = now.tv_sec;
-       tsp->tv_nsec = 1000L * now.tv_usec;
-
-       tsp->tv_nsec += (delta_msec * 1000000L);
-       if (tsp->tv_nsec > 1000000000L) {
-               long secs = tsp->tv_nsec / 1000000000L;
-
-               tsp->tv_sec += secs;
-               tsp->tv_nsec -= (secs * 1000000000L);
-       }
-}
-
 static int increase_limit(int resource, rlim_t increase)
 {
        struct rlimit rlim;
@@ -1279,12 +1345,8 @@ static void process_trace_bufs(void)
 {
        while (!done) {
                pthread_mutex_lock(&dp_mutex);
-               while (!done && dp_entries == 0) {
-                       struct timespec ts;
-
-                       make_timespec(&ts, 50);
-                       pthread_cond_timedwait(&dp_cond, &dp_mutex, &ts);
-               }
+               while (!done && dp_entries == 0)
+                       t_pthread_cond_wait(&dp_cond, &dp_mutex);
                pthread_mutex_unlock(&dp_mutex);
 
                __process_trace_bufs();
@@ -1487,6 +1549,50 @@ static int iop_open(struct io_info *iop, int cpu)
        return 0;
 }
 
+static void close_iop(struct io_info *iop)     /* unmap, truncate and close iop's output file */
+{
+       struct mmap_info *mip = &iop->mmap_info;
+
+       if (mip->fs_buf)
+               munmap(mip->fs_buf, mip->fs_buf_len);
+
+       if (!piped_output) {
+               if (ftruncate(fileno(iop->ofp), mip->fs_size) < 0) {    /* ofp not NULL-checked here; close_ios() checks before calling */
+                       fprintf(stderr,
+                               "Ignoring err: ftruncate(%s): %d/%s\n",
+                               iop->ofn, errno, strerror(errno));
+               }
+       }
+
+       if (iop->ofp)
+               fclose(iop->ofp);
+       if (iop->obuf)
+               free(iop->obuf);
+}
+
+static void close_ios(struct tracer *tp)       /* close all of tp's ios, then free its arrays */
+{
+       while (tp->nios > 0) {
+               struct io_info *iop = &tp->ios[--tp->nios];     /* walk from highest index down */
+
+               iop->dpp->drops = get_drops(iop->dpp);
+               if (iop->ifd >= 0)
+                       close(iop->ifd);
+
+               if (iop->ofp)
+                       close_iop(iop);
+               else if (iop->ofd >= 0) {       /* no ofp: use the net_* close path */
+                       struct devpath *dpp = iop->dpp;
+
+                       net_send_close(iop->ofd, dpp->buts_name, dpp->drops);
+                       net_close_connection(&iop->ofd);
+               }
+       }
+
+       free(tp->ios);
+       free(tp->pfds);
+}
+
 static int open_ios(struct tracer *tp)
 {
        struct pollfd *pfd;
@@ -1549,53 +1655,10 @@ static int open_ios(struct tracer *tp)
 
 err:
        close(iop->ifd);        /* tp->nios _not_ bumped */
+       close_ios(tp);
        return 1;
 }
 
-static void close_iop(struct io_info *iop)
-{
-       struct mmap_info *mip = &iop->mmap_info;
-
-       if (mip->fs_buf)
-               munmap(mip->fs_buf, mip->fs_buf_len);
-
-       if (!piped_output) {
-               if (ftruncate(fileno(iop->ofp), mip->fs_size) < 0) {
-                       fprintf(stderr,
-                               "Ignoring err: ftruncate(%s): %d/%s\n",
-                               iop->ofn, errno, strerror(errno));
-               }
-       }
-
-       if (iop->ofp)
-               fclose(iop->ofp);
-       if (iop->obuf)
-               free(iop->obuf);
-}
-
-static void close_ios(struct tracer *tp)
-{
-       while (tp->nios > 0) {
-               struct io_info *iop = &tp->ios[--tp->nios];
-
-               iop->dpp->drops = get_drops(iop->dpp);
-               if (iop->ifd >= 0)
-                       close(iop->ifd);
-
-               if (iop->ofp)
-                       close_iop(iop);
-               else if (iop->ofd >= 0) {
-                       struct devpath *dpp = iop->dpp;
-
-                       net_send_close(iop->ofd, dpp->buts_name, dpp->drops);
-                       net_close_connection(&iop->ofd);
-               }
-       }
-
-       free(tp->ios);
-       free(tp->pfds);
-}
-
 static int setup_mmap(int fd, unsigned int maxlen, struct mmap_info *mip)
 {
        if (mip->fs_off + maxlen > mip->fs_buf_len) {
@@ -1674,9 +1737,7 @@ static int handle_pfds_file(struct tracer *tp, int nevs, int force_read)
 
 static void *thread_main(void *arg)
 {
-       int ret, ndone;
-       int to_val;
-
+       int ret, ndone, to_val;
        struct tracer *tp = arg;
 
        ret = lock_on_cpu(tp->cpu);
@@ -1684,25 +1745,17 @@ static void *thread_main(void *arg)
                goto err;
 
        ret = open_ios(tp);
-       if (ret) {
-               close_ios(tp);
+       if (ret)
                goto err;
-       }
-
-       pthread_mutex_lock(&tp->mutex);
-       tp->running = 1;
-       pthread_cond_signal(&tp->cond);
-       pthread_mutex_unlock(&tp->mutex);
 
        if (piped_output)
                to_val = 50;            /* Frequent partial handles */
        else
                to_val = 500;           /* 1/2 second intervals */
 
-       pthread_mutex_lock(&ub_mutex);
-       while (!tp->is_done && !unblock_tracers)
-               pthread_cond_wait(&ub_cond, &ub_mutex);
-       pthread_mutex_unlock(&ub_mutex);
+
+       tracer_signal_ready(tp, Th_running, 0);
+       tracer_wait_unblock(tp);
 
        while (!tp->is_done) {
                ndone = poll(tp->pfds, ndevs, to_val);
@@ -1718,15 +1771,12 @@ static void *thread_main(void *arg)
         */
        while (handle_pfds(tp, ndevs, 1) > 0)
                ;
-
        close_ios(tp);
+       tracer_signal_ready(tp, Th_leaving, 0);
+       return NULL;
 
 err:
-       pthread_mutex_lock(&tp->mutex);
-       tp->running = 0;
-       tp->status = ret;
-       pthread_cond_signal(&tp->cond);
-       pthread_mutex_unlock(&tp->mutex);
+       tracer_signal_ready(tp, Th_error, ret);
        return NULL;
 }
 
@@ -1738,46 +1788,38 @@ static int start_tracer(int cpu)
        memset(tp, 0, sizeof(*tp));
 
        INIT_LIST_HEAD(&tp->head);
-       pthread_mutex_init(&tp->mutex, NULL);
-       pthread_cond_init(&tp->cond, NULL);
-       tp->running = 0;
        tp->status = 0;
        tp->cpu = cpu;
 
        if (pthread_create(&tp->thread, NULL, thread_main, tp)) {
                fprintf(stderr, "FAILED to start thread on CPU %d: %d/%s\n",
                        cpu, errno, strerror(errno));
-               goto err;
-       }
-
-       pthread_mutex_lock(&tp->mutex);
-       while (!tp->running && (tp->status == 0))
-               pthread_cond_wait(&tp->cond, &tp->mutex);
-       pthread_mutex_unlock(&tp->mutex);
-
-       if (tp->status == 0) {
-               list_add_tail(&tp->head, &tracers);
-               return 0;
+               free(tp);
+               return 1;
        }
 
-       fprintf(stderr, "FAILED to start thread on CPU %d\n", cpu);
-
-err:
-       pthread_mutex_destroy(&tp->mutex);
-       pthread_cond_destroy(&tp->cond);
-       free(tp);
-       return 1;
+       list_add_tail(&tp->head, &tracers);
+       return 0;
 }
 
-static int start_tracers(void)
+static void start_tracers(void)
 {
        int cpu;
+       struct list_head *p;
 
        for (cpu = 0; cpu < ncpus; cpu++)
                if (start_tracer(cpu))
                        break;
 
-       return cpu;
+       wait_tracers_ready(cpu);        /* cpu == number of threads actually started */
+
+       __list_for_each(p, &tracers) {
+               struct tracer *tp = list_entry(p, struct tracer, head);
+               if (tp->status) /* NOTE(review): open_ios() returns 1, so strerror() may mislead - confirm */
+                       fprintf(stderr,
+                               "FAILED to start thread on CPU %d: %d/%s\n",
+                               tp->cpu, tp->status, strerror(tp->status));
+       }
 }
 
 static void stop_tracers(void)
@@ -1811,7 +1853,6 @@ static void del_tracers(void)
                list_del(&tp->head);
                free(tp);
        }
-       ntracers = 0;
 }
 
 static void wait_tracers(void)
@@ -1821,15 +1862,12 @@ static void wait_tracers(void)
        if (use_tracer_devpaths())
                process_trace_bufs();
 
+       wait_tracers_leaving();
+
        __list_for_each(p, &tracers) {
                int ret;
                struct tracer *tp = list_entry(p, struct tracer, head);
 
-               pthread_mutex_lock(&tp->mutex);
-               while (tp->running)
-                       pthread_cond_wait(&tp->cond, &tp->mutex);
-               pthread_mutex_unlock(&tp->mutex);
-
                ret = pthread_join(tp->thread, NULL);
                if (ret)
                        fprintf(stderr, "Thread join %d failed %d\n",
@@ -2555,18 +2593,9 @@ int main(int argc, char *argv[])
                                handle_list = handle_list_net;
                }
 
-               ntracers = start_tracers();
-               if (ntracers != ncpus)
-                       stop_tracers();
-               else {
-                       /*
-                        * Let tracers go
-                        */
-                       pthread_mutex_lock(&ub_mutex);
-                       unblock_tracers = 1;
-                       pthread_cond_broadcast(&ub_cond);
-                       pthread_mutex_unlock(&ub_mutex);
-
+               start_tracers();
+               if (nthreads_running == ncpus) {
+                       unblock_tracers();
                        start_buts();
 
                        if (net_mode == Net_client)
@@ -2574,10 +2603,11 @@ int main(int argc, char *argv[])
 
                        if (stop_watch)
                                alarm(stop_watch);
-               }
+               } else
+                       stop_tracers();
 
                wait_tracers();
-               if (ntracers == ncpus)
+               if (nthreads_running == ncpus)
                        show_stats(&devpaths);
 
                if (net_client_use_send())