+ list_replace_init(&hd->head, &list);
+ pthread_mutex_unlock(&hd->mutex);
+
+ handled += handle_list(hd, &list);
+ }
+ }
+
+ if (handled)
+ decr_entries(handled);
+}
+
/*
 * Drain queued trace buffers for as long as the entry counter says
 * there is (or will be) data pending from the tracer threads.
 */
static void process_trace_bufs(void)
{
	for (;;) {
		if (!wait_empty_entries())
			break;
		__process_trace_bufs();
	}
}
+
+static void clean_trace_bufs(void)
+{
+ /*
+ * No mutex needed here: we're only reading from the lists,
+ * tracers are done
+ */
+ while (dp_entries)
+ __process_trace_bufs();
+}
+
/*
 * Report a failed read of a per-CPU trace file on stderr. EAGAIN is
 * the expected result of a non-blocking read with no data and is
 * deliberately kept quiet.
 */
static inline void read_err(int cpu, char *ifn)
{
	if (errno == EAGAIN)
		return;

	fprintf(stderr, "Thread %d failed read of %s: %d/%s\n",
		cpu, ifn, errno, strerror(errno));
}
+
+static int net_sendfile(struct io_info *iop)
+{
+ int ret;
+
+ ret = sendfile(iop->ofd, iop->ifd, NULL, iop->ready);
+ if (ret < 0) {
+ perror("sendfile");
+ return 1;
+ } else if (ret < (int)iop->ready) {
+ fprintf(stderr, "short sendfile send (%d of %d)\n",
+ ret, iop->ready);
+ return 1;
+ }
+
+ return 0;
+}
+
+static inline int net_sendfile_data(struct tracer *tp, struct io_info *iop)
+{
+ struct devpath *dpp = iop->dpp;
+
+ if (net_send_header(iop->ofd, tp->cpu, dpp->buts_name, iop->ready))
+ return 1;
+ return net_sendfile(iop);
+}
+
+static int fill_ofname(char *dst, int dstlen, char *subdir, char *buts_name,
+ int cpu)
+{
+ int len;
+ struct stat sb;
+
+ if (output_dir)
+ len = snprintf(dst, dstlen, "%s/", output_dir);
+ else
+ len = snprintf(dst, dstlen, "./");
+
+ if (subdir)
+ len += snprintf(dst + len, dstlen - len, "%s", subdir);
+
+ if (stat(dst, &sb) < 0) {
+ if (errno != ENOENT) {
+ fprintf(stderr,
+ "Destination dir %s stat failed: %d/%s\n",
+ dst, errno, strerror(errno));
+ return 1;
+ }
+ /*
+ * There is no synchronization between multiple threads
+ * trying to create the directory at once. It's harmless
+ * to let them try, so just detect the problem and move on.
+ */
+ if (mkdir(dst, 0755) < 0 && errno != EEXIST) {
+ fprintf(stderr,
+ "Destination dir %s can't be made: %d/%s\n",
+ dst, errno, strerror(errno));
+ return 1;
+ }
+ }
+
+ if (output_name)
+ snprintf(dst + len, dstlen - len, "%s.blktrace.%d",
+ output_name, cpu);
+ else
+ snprintf(dst + len, dstlen - len, "%s.blktrace.%d",
+ buts_name, cpu);
+
+ return 0;
+}
+
+static int set_vbuf(struct io_info *iop, int mode, size_t size)
+{
+ iop->obuf = malloc(size);
+ if (setvbuf(iop->ofp, iop->obuf, mode, size) < 0) {
+ fprintf(stderr, "setvbuf(%s, %d) failed: %d/%s\n",
+ iop->dpp->path, (int)size, errno,
+ strerror(errno));
+ free(iop->obuf);
+ return 1;
+ }
+
+ return 0;
+}
+
+static int iop_open(struct io_info *iop, int cpu)
+{
+ char hostdir[MAXPATHLEN + 64];
+
+ iop->ofd = -1;
+ if (net_mode == Net_server) {
+ struct cl_conn *nc = iop->nc;
+ int len;
+
+ len = snprintf(hostdir, sizeof(hostdir), "%s-",
+ nc->ch->hostname);
+ len += strftime(hostdir + len, sizeof(hostdir) - len, "%F-%T/",
+ gmtime(&iop->dpp->cl_connect_time));
+ } else {
+ hostdir[0] = 0;
+ }
+
+ if (fill_ofname(iop->ofn, sizeof(iop->ofn), hostdir,
+ iop->dpp->buts_name, cpu))
+ return 1;
+
+ iop->ofp = my_fopen(iop->ofn, "w+");
+ if (iop->ofp == NULL) {
+ fprintf(stderr, "Open output file %s failed: %d/%s\n",
+ iop->ofn, errno, strerror(errno));
+ return 1;
+ }
+
+ if (set_vbuf(iop, _IOLBF, FILE_VBUF_SIZE)) {
+ fprintf(stderr, "set_vbuf for file %s failed: %d/%s\n",
+ iop->ofn, errno, strerror(errno));
+ fclose(iop->ofp);
+ return 1;
+ }
+
+ iop->ofd = fileno(iop->ofp);
+ return 0;
+}
+
+static void close_iop(struct io_info *iop)
+{
+ struct mmap_info *mip = &iop->mmap_info;
+
+ if (mip->fs_buf)
+ munmap(mip->fs_buf, mip->fs_buf_len);
+
+ if (!piped_output) {
+ if (ftruncate(fileno(iop->ofp), mip->fs_size) < 0) {
+ fprintf(stderr,
+ "Ignoring err: ftruncate(%s): %d/%s\n",
+ iop->ofn, errno, strerror(errno));
+ }
+ }
+
+ if (iop->ofp)
+ fclose(iop->ofp);
+ if (iop->obuf)
+ free(iop->obuf);
+}
+
+static void close_ios(struct tracer *tp)
+{
+ while (tp->nios > 0) {
+ struct io_info *iop = &tp->ios[--tp->nios];
+
+ iop->dpp->drops = get_drops(iop->dpp);
+ if (iop->ifd >= 0)
+ close(iop->ifd);
+
+ if (iop->ofp)
+ close_iop(iop);
+ else if (iop->ofd >= 0) {
+ struct devpath *dpp = iop->dpp;
+
+ net_send_close(iop->ofd, dpp->buts_name, dpp->drops);
+ net_close_connection(&iop->ofd);
+ }
+ }
+
+ free(tp->ios);
+ free(tp->pfds);
+}
+
+static int open_ios(struct tracer *tp)
+{
+ struct pollfd *pfd;
+ struct io_info *iop;
+ struct list_head *p;
+
+ tp->ios = calloc(ndevs, sizeof(struct io_info));
+ memset(tp->ios, 0, ndevs * sizeof(struct io_info));
+
+ tp->pfds = calloc(ndevs, sizeof(struct pollfd));
+ memset(tp->pfds, 0, ndevs * sizeof(struct pollfd));
+
+ tp->nios = 0;
+ iop = tp->ios;
+ pfd = tp->pfds;
+ __list_for_each(p, &devpaths) {
+ struct devpath *dpp = list_entry(p, struct devpath, head);
+
+ iop->dpp = dpp;
+ iop->ofd = -1;
+ snprintf(iop->ifn, sizeof(iop->ifn), "%s/block/%s/trace%d",
+ debugfs_path, dpp->buts_name, tp->cpu);
+
+ iop->ifd = my_open(iop->ifn, O_RDONLY | O_NONBLOCK);
+ if (iop->ifd < 0) {
+ fprintf(stderr, "Thread %d failed open %s: %d/%s\n",
+ tp->cpu, iop->ifn, errno, strerror(errno));
+ return 1;
+ }
+
+ init_mmap_info(&iop->mmap_info);
+
+ pfd->fd = iop->ifd;
+ pfd->events = POLLIN;
+
+ if (piped_output)
+ ;
+ else if (net_client_use_sendfile()) {
+ iop->ofd = net_setup_client();
+ if (iop->ofd < 0)
+ goto err;
+ net_send_open(iop->ofd, tp->cpu, dpp->buts_name);
+ } else if (net_mode == Net_none) {
+ if (iop_open(iop, tp->cpu))
+ goto err;
+ } else {
+ /*
+ * This ensures that the server knows about all
+ * connections & devices before _any_ closes
+ */
+ net_send_open(cl_fds[tp->cpu], tp->cpu, dpp->buts_name);
+ }
+
+ pfd++;
+ iop++;
+ tp->nios++;
+ }
+
+ return 0;
+
+err:
+ close(iop->ifd); /* tp->nios _not_ bumped */
+ close_ios(tp);
+ return 1;
+}
+
+static int handle_pfds_file(struct tracer *tp, int nevs, int force_read)
+{
+ struct mmap_info *mip;
+ int i, ret, nentries = 0;
+ struct pollfd *pfd = tp->pfds;
+ struct io_info *iop = tp->ios;
+
+ for (i = 0; nevs > 0 && i < ndevs; i++, pfd++, iop++) {
+ if (pfd->revents & POLLIN || force_read) {
+ mip = &iop->mmap_info;
+
+ ret = setup_mmap(iop->ofd, buf_size, mip, tp);
+ if (ret < 0) {
+ pfd->events = 0;
+ break;
+ }
+
+ ret = read(iop->ifd, mip->fs_buf + mip->fs_off,
+ buf_size);
+ if (ret > 0) {
+ pdc_dr_update(iop->dpp, tp->cpu, ret);
+ mip->fs_size += ret;
+ mip->fs_off += ret;
+ nentries++;
+ } else if (ret == 0) {
+ /*
+ * Short reads after we're done stop us
+ * from trying reads.
+ */
+ if (tp->is_done)
+ clear_events(pfd);
+ } else {
+ read_err(tp->cpu, iop->ifn);
+ if (errno != EAGAIN || tp->is_done)
+ clear_events(pfd);
+ }
+ nevs--;
+ }
+ }
+
+ return nentries;
+}
+
+static int handle_pfds_netclient(struct tracer *tp, int nevs, int force_read)
+{
+ struct stat sb;
+ int i, nentries = 0;
+ struct pollfd *pfd = tp->pfds;
+ struct io_info *iop = tp->ios;
+
+ for (i = 0; i < ndevs; i++, pfd++, iop++) {
+ if (pfd->revents & POLLIN || force_read) {
+ if (fstat(iop->ifd, &sb) < 0) {
+ perror(iop->ifn);
+ pfd->events = 0;
+ } else if (sb.st_size > (off_t)iop->data_queued) {
+ iop->ready = sb.st_size - iop->data_queued;
+ iop->data_queued = sb.st_size;
+
+ if (!net_sendfile_data(tp, iop)) {
+ pdc_dr_update(iop->dpp, tp->cpu,
+ iop->ready);
+ nentries++;
+ } else
+ clear_events(pfd);
+ }
+ if (--nevs == 0)
+ break;
+ }
+ }
+
+ if (nentries)
+ incr_entries(nentries);
+
+ return nentries;
+}
+
+static int handle_pfds_entries(struct tracer *tp, int nevs, int force_read)
+{
+ int i, nentries = 0;
+ struct trace_buf *tbp;
+ struct pollfd *pfd = tp->pfds;
+ struct io_info *iop = tp->ios;
+
+ tbp = alloc_trace_buf(tp->cpu, buf_size);
+ for (i = 0; i < ndevs; i++, pfd++, iop++) {
+ if (pfd->revents & POLLIN || force_read) {
+ tbp->len = read(iop->ifd, tbp->buf, buf_size);
+ if (tbp->len > 0) {
+ pdc_dr_update(iop->dpp, tp->cpu, tbp->len);
+ add_trace_buf(iop->dpp, tp->cpu, &tbp);
+ nentries++;
+ } else if (tbp->len == 0) {
+ /*
+ * Short reads after we're done stop us
+ * from trying reads.
+ */
+ if (tp->is_done)
+ clear_events(pfd);
+ } else {
+ read_err(tp->cpu, iop->ifn);
+ if (errno != EAGAIN || tp->is_done)
+ clear_events(pfd);
+ }
+ if (!piped_output && --nevs == 0)
+ break;
+ }
+ }
+ free(tbp);
+
+ if (nentries)
+ incr_entries(nentries);
+
+ return nentries;
+}
+
+static void *thread_main(void *arg)
+{
+ int ret, ndone, to_val;
+ struct tracer *tp = arg;
+
+ ret = lock_on_cpu(tp->cpu);
+ if (ret)
+ goto err;
+
+ ret = open_ios(tp);
+ if (ret)
+ goto err;
+
+ if (piped_output)
+ to_val = 50; /* Frequent partial handles */
+ else
+ to_val = 500; /* 1/2 second intervals */
+
+
+ tracer_signal_ready(tp, Th_running, 0);
+ tracer_wait_unblock(tp);
+
+ while (!tp->is_done) {
+ ndone = poll(tp->pfds, ndevs, to_val);
+ if (ndone || piped_output)
+ (void)handle_pfds(tp, ndone, piped_output);
+ else if (ndone < 0 && errno != EINTR)
+ fprintf(stderr, "Thread %d poll failed: %d/%s\n",
+ tp->cpu, errno, strerror(errno));
+ }
+
+ /*
+ * Trace is stopped, pull data until we get a short read
+ */
+ while (handle_pfds(tp, ndevs, 1) > 0)
+ ;
+
+ close_ios(tp);
+ tracer_signal_ready(tp, Th_leaving, 0);
+ return NULL;
+
+err:
+ tracer_signal_ready(tp, Th_error, ret);
+ return NULL;
+}
+
+static int start_tracer(int cpu)
+{
+ struct tracer *tp;
+
+ tp = malloc(sizeof(*tp));
+ memset(tp, 0, sizeof(*tp));
+
+ INIT_LIST_HEAD(&tp->head);
+ tp->status = 0;
+ tp->cpu = cpu;
+
+ if (pthread_create(&tp->thread, NULL, thread_main, tp)) {
+ fprintf(stderr, "FAILED to start thread on CPU %d: %d/%s\n",
+ cpu, errno, strerror(errno));
+ free(tp);
+ return 1;
+ }
+
+ list_add_tail(&tp->head, &tracers);
+ return 0;
+}
+
+static int create_output_files(int cpu)
+{
+ char fname[MAXPATHLEN + 64];
+ struct list_head *p;
+ FILE *f;
+
+ __list_for_each(p, &devpaths) {
+ struct devpath *dpp = list_entry(p, struct devpath, head);
+
+ if (fill_ofname(fname, sizeof(fname), NULL, dpp->buts_name,
+ cpu))
+ return 1;
+ f = my_fopen(fname, "w+");
+ if (!f)
+ return 1;
+ fclose(f);
+ }
+ return 0;
+}
+
+static void start_tracers(void)
+{
+ int cpu, started = 0;
+ struct list_head *p;
+ size_t alloc_size = CPU_ALLOC_SIZE(max_cpus);
+
+ for (cpu = 0; cpu < max_cpus; cpu++) {
+ if (!CPU_ISSET_S(cpu, alloc_size, online_cpus)) {
+ /*
+ * Create fake empty output files so that other tools
+ * like blkparse don't have to bother with sparse CPU
+ * number space.
+ */
+ if (create_output_files(cpu))
+ break;
+ continue;
+ }
+ if (start_tracer(cpu))
+ break;
+ started++;
+ }
+
+ wait_tracers_ready(started);
+
+ __list_for_each(p, &tracers) {
+ struct tracer *tp = list_entry(p, struct tracer, head);
+ if (tp->status)
+ fprintf(stderr,
+ "FAILED to start thread on CPU %d: %d/%s\n",
+ tp->cpu, tp->status, strerror(tp->status));
+ }
+}
+
+static void stop_tracers(void)
+{
+ struct list_head *p;
+
+ /*
+ * Stop the tracing - makes the tracer threads clean up quicker.
+ */
+ __list_for_each(p, &devpaths) {
+ struct devpath *dpp = list_entry(p, struct devpath, head);
+ (void)ioctl(dpp->fd, BLKTRACESTOP);
+ }
+
+ /*
+ * Tell each tracer to quit
+ */
+ __list_for_each(p, &tracers) {
+ struct tracer *tp = list_entry(p, struct tracer, head);
+ tp->is_done = 1;
+ }
+ pthread_cond_broadcast(&mt_cond);
+}
+
+static void del_tracers(void)
+{
+ struct list_head *p, *q;
+
+ list_for_each_safe(p, q, &tracers) {
+ struct tracer *tp = list_entry(p, struct tracer, head);
+
+ list_del(&tp->head);
+ free(tp);
+ }
+}
+
+static void wait_tracers(void)
+{
+ struct list_head *p;
+
+ if (use_tracer_devpaths())
+ process_trace_bufs();
+
+ wait_tracers_leaving();
+
+ __list_for_each(p, &tracers) {
+ int ret;
+ struct tracer *tp = list_entry(p, struct tracer, head);
+
+ ret = pthread_join(tp->thread, NULL);
+ if (ret)
+ fprintf(stderr, "Thread join %d failed %d\n",
+ tp->cpu, ret);
+ }
+
+ if (use_tracer_devpaths())
+ clean_trace_bufs();
+
+ get_all_drops();
+}
+
+static void exit_tracing(void)
+{
+ signal(SIGINT, SIG_IGN);
+ signal(SIGHUP, SIG_IGN);
+ signal(SIGTERM, SIG_IGN);
+ signal(SIGALRM, SIG_IGN);
+
+ stop_tracers();
+ wait_tracers();
+ del_tracers();
+ rel_devpaths();
+}
+
/*
 * Termination signal handler: mark the whole run as finished and ask
 * the tracer threads to stop. NOTE(review): stop_tracers() calls
 * ioctl()/pthread_cond_broadcast(), which are not on the
 * async-signal-safe list — presumably acceptable for this tool's
 * shutdown path; verify.
 */
static void handle_sigint(__attribute__((__unused__)) int sig)
{
	done = 1;
	stop_tracers();
}
+
+static void show_stats(struct list_head *devpaths)
+{
+ FILE *ofp;
+ struct list_head *p;
+ unsigned long long nevents, data_read;
+ unsigned long long total_drops = 0;
+ unsigned long long total_events = 0;
+
+ if (piped_output)
+ ofp = my_fopen("/dev/null", "w");
+ else
+ ofp = stdout;
+
+ __list_for_each(p, devpaths) {
+ int cpu;
+ struct pdc_stats *sp;
+ struct devpath *dpp = list_entry(p, struct devpath, head);
+
+ if (net_mode == Net_server)
+ printf("server: end of run for %s:%s\n",
+ dpp->ch->hostname, dpp->buts_name);
+
+ data_read = 0;
+ nevents = 0;
+
+ fprintf(ofp, "=== %s ===\n", dpp->buts_name);
+ for (cpu = 0, sp = dpp->stats; cpu < dpp->ncpus; cpu++, sp++) {
+ /*
+ * Estimate events if not known...
+ */
+ if (sp->nevents == 0) {
+ sp->nevents = sp->data_read /
+ sizeof(struct blk_io_trace);
+ }
+
+ fprintf(ofp,
+ " CPU%3d: %20llu events, %8llu KiB data\n",
+ cpu, sp->nevents, (sp->data_read + 1023) >> 10);
+
+ data_read += sp->data_read;
+ nevents += sp->nevents;
+ }
+
+ fprintf(ofp, " Total: %20llu events (dropped %llu),"
+ " %8llu KiB data\n", nevents,
+ dpp->drops, (data_read + 1024) >> 10);
+
+ total_drops += dpp->drops;
+ total_events += (nevents + dpp->drops);
+ }
+
+ fflush(ofp);
+ if (piped_output)
+ fclose(ofp);
+
+ if (total_drops) {
+ double drops_ratio = 1.0;
+
+ if (total_events)
+ drops_ratio = (double)total_drops/(double)total_events;
+
+ fprintf(stderr, "\nYou have %llu (%5.1lf%%) dropped events\n"
+ "Consider using a larger buffer size (-b) "
+ "and/or more buffers (-n)\n",
+ total_drops, 100.0 * drops_ratio);
+ }
+}
+
+static int handle_args(int argc, char *argv[])