void *buf;
unsigned int len;
unsigned int max_len;
- off_t offset;
};
#define FIFO_SIZE (1024) /* should be plenty big! */
void *fd_buf;
char fn[MAXPATHLEN + 64];
+ struct in_addr cl_in_addr;
+
FILE *ofile;
char *ofile_buffer;
off_t ofile_offset;
int ofile_stdout;
int ofile_mmap;
+ volatile int sendfile_pending;
int (*get_subbuf)(struct thread_information *, unsigned int);
int (*flush_subbuf)(struct thread_information *, struct tip_subbuf *);
unsigned long events_processed;
unsigned long long data_read;
+ unsigned long long data_queued;
struct device_information *device;
int exited;
static char hostname[MAXHOSTNAMELEN];
static int net_port = TRACE_NET_PORT;
static int net_mode = 0;
-static int net_sendfile;
+static int net_use_sendfile; /* set to 1 by option 's'; in Net_client mode fill_ops() then installs the *_sendfile subbuf ops. Renamed because net_sendfile() is now a function */
static int net_in_fd = -1; /* client socket stored by accept() in get_connection(); reset to -1 after close() */
static int net_out_fd = -1; /* destination descriptor passed to sendfile() in net_sendfile() */
static void handle_sigint(__attribute__((__unused__)) int sig)
{
+ struct device_information *dip;
+ int i;
+
+ /*
+ * stop trace so we can reap currently produced data
+ *
+ * Every device visited by for_each_dip() that has an open descriptor
+ * (fd != -1) gets a BLKTRACESTOP ioctl; a failing ioctl is reported
+ * with perror() and otherwise ignored, so the remaining devices are
+ * still stopped.
+ *
+ * NOTE(review): this runs as a signal handler and perror() is not on
+ * the POSIX async-signal-safe list - confirm this is acceptable.
+ */
+ for_each_dip(dip, i) {
+ if (dip->fd == -1)
+ continue;
+ if (ioctl(dip->fd, BLKTRACESTOP) < 0)
+ perror("BLKTRACESTOP");
+ }
+
done = 1; /* presumably the flag is_done() reports to the worker loops - TODO confirm */
}
if (dip_tracing(dip) || kill_running_trace) {
dip_set_tracing(dip, 0);
- if (ioctl(dip->fd, BLKTRACESTOP) < 0)
- perror("BLKTRACESTOP");
+ /*
+ * should be stopped, just don't complain if it isn't
+ */
+ ioctl(dip->fd, BLKTRACESTOP);
+
if (ioctl(dip->fd, BLKTRACETEARDOWN) < 0)
perror("BLKTRACETEARDOWN");
static int read_data(struct thread_information *tip, void *buf,
unsigned int len)
{
- int ret = tip->read_data(tip, buf, len);
-
- if (ret > 0)
- tip->data_read += ret;
-
- return ret;
+ return tip->read_data(tip, buf, len); /* plain dispatch to the per-thread reader; callers now add the result to tip->data_read themselves */
}
static inline struct tip_subbuf *
ret = read_data(tip, tip->fs_buf + tip->fs_off, maxlen);
if (ret >= 0) {
+ tip->data_read += ret;
tip->fs_size += ret;
tip->fs_off += ret;
return 0;
return -1;
}
-static int get_subbuf_sendfile(struct thread_information *tip,
- unsigned int maxlen)
-{
- struct tip_subbuf *ts = malloc(sizeof(*ts));
- struct stat sb;
-
- ts->buf = malloc(buf_size);
- ts->max_len = maxlen;
- ts->buf = NULL;
-
- if (fstat(tip->fd, &sb) < 0) {
- perror("trace stat");
- return 1;
- }
-
- ts->len = sb.st_size - tip->ofile_offset;
- ts->max_len = ts->len;
- ts->offset = tip->ofile_offset;
- tip->ofile_offset += ts->len;
- return subbuf_fifo_queue(tip, ts);
-}
-
/*
* Use the copy approach for pipes and network
*/
ret = read_data(tip, ts->buf, ts->max_len);
if (ret > 0) {
ts->len = ret;
- return subbuf_fifo_queue(tip, ts);
+ tip->data_read += ret;
+ if (subbuf_fifo_queue(tip, ts))
+ return -1;
}
return ret;
}
+/*
+ * Producer side of the sendfile transport: instead of copying trace data,
+ * queue a descriptor (ts->buf == NULL) that only records how many new bytes
+ * the trace file holds, so the flush side can ship them with sendfile().
+ *
+ * Returns the number of bytes described by the queued descriptor, 0 when
+ * nothing was queued (no new data, or a previous request is still pending),
+ * -1 on error. Once is_done() is true the call is forwarded to get_subbuf()
+ * and its return value is passed through.
+ */
+static int get_subbuf_sendfile(struct thread_information *tip,
+			       unsigned int maxlen)
+{
+	struct tip_subbuf *ts;
+	struct stat sb;
+	unsigned int ready;
+
+	wait_for_data(tip);
+
+	/*
+	 * hack to get last data out, we can't use sendfile for that
+	 */
+	if (is_done())
+		return get_subbuf(tip, maxlen);
+
+	/*
+	 * only one request is kept in flight: flush_subbuf_sendfile() drops
+	 * the counter again when it has dealt with the descriptor
+	 */
+	if (tip->sendfile_pending) {
+		usleep(100);
+		return 0;
+	}
+
+	if (fstat(tip->fd, &sb) < 0) {
+		perror("trace stat");
+		return -1;
+	}
+
+	/* bytes present in the file that have not been queued yet */
+	ready = sb.st_size - tip->data_queued;
+	if (!ready)
+		return 0;
+
+	ts = malloc(sizeof(*ts));
+	if (!ts) {
+		/* previously dereferenced unchecked; fail the request instead */
+		perror("malloc");
+		return -1;
+	}
+	ts->buf = NULL;
+	ts->max_len = 0;
+	ts->len = ready;
+	tip->data_queued += ready;
+
+	if (subbuf_fifo_queue(tip, ts))
+		return -1;
+
+	/*
+	 * NOTE(review): the counter is raised only after the descriptor is
+	 * already visible in the fifo - confirm the flush side cannot
+	 * decrement it concurrently with this increment.
+	 */
+	tip->sendfile_pending++;
+	return ready;
+}
+
static void close_thread(struct thread_information *tip)
{
if (tip->fd != -1)
}
while (!is_done()) {
- if (tip->get_subbuf(tip, buf_size))
+ if (tip->get_subbuf(tip, buf_size) < 0)
break;
}
+ /*
+ * trace is stopped, pull data until we get a short read
+ */
+ while (tip->get_subbuf(tip, buf_size) > 0)
+ ;
+
tip_ftrunc_final(tip);
tip->exited = 1;
return NULL;
return 0;
}
-static int flush_subbuf_sendfile(struct thread_information *tip,
- struct tip_subbuf *ts)
+/*
+ * Ship ts->len bytes from the trace descriptor tip->fd to net_out_fd with
+ * sendfile(). A NULL offset is passed, so the transfer starts at (and
+ * advances) the descriptor's own file position.
+ *
+ * Returns 0 when all ts->len bytes were sent, 1 on a sendfile() error or a
+ * short transfer. The result is kept as ssize_t and the unsigned length is
+ * printed with %u so neither value is truncated or misformatted.
+ */
+static int net_sendfile(struct thread_information *tip, struct tip_subbuf *ts)
{
- if (net_send_header(tip, ts->len))
- return 1;
- if (sendfile(net_out_fd, tip->fd, &ts->offset, ts->len) < 0) {
+	ssize_t ret = sendfile(net_out_fd, tip->fd, NULL, ts->len);
+
+	if (ret < 0) {
perror("sendfile");
return 1;
+	} else if ((size_t) ret < ts->len) {
+		fprintf(stderr, "short sendfile send (%zd of %u)\n", ret, ts->len);
+		return 1;
}
- free(ts);
return 0;
}
+static int flush_subbuf_sendfile(struct thread_information *tip,
+ struct tip_subbuf *ts)
+{
+ int ret = 1;
+
+ /*
+ * currently we cannot use sendfile() on the last bytes read, as they
+ * may not be a full subbuffer. get_subbuf_sendfile() falls back to
+ * the read approach for those, so use send() to ship them out
+ *
+ * A subbuffer that carries data (ts->buf != NULL) is therefore handed
+ * to flush_subbuf_net() unchanged. Otherwise the network header is
+ * sent first and the payload follows through net_sendfile().
+ *
+ * Returns 0 on success and 1 if the header or the payload could not
+ * be sent; on the sendfile path ts is freed and the pending counter
+ * is dropped in both cases.
+ */
+ if (ts->buf)
+ return flush_subbuf_net(tip, ts);
+
+ if (net_send_header(tip, ts->len))
+ goto err;
+ if (net_sendfile(tip, ts))
+ goto err;
+
+ tip->data_read += ts->len;
+ tip->ofile_offset += buf_size; /* NOTE(review): advances by buf_size rather than ts->len - confirm intended */
+ ret = 0;
+err:
+ tip->sendfile_pending--; /* get_subbuf_sendfile() queues nothing new while this is non-zero */
+ free(ts);
+ return ret;
+}
+
static int write_data(struct thread_information *tip, void *buf,
unsigned int buf_len)
{
net_client_send_close();
}
-static void fill_ofname(char *dst, char *buts_name, int cpu)
+/*
+ * Build the output file name for tip's CPU in dst and make sure the
+ * directory part of it exists.
+ *
+ * The name is "[output_dir/][<client ip>-<UTC date-time>/]<name>.blktrace.<cpu>",
+ * where the client/time component is only added in Net_server mode and
+ * <name> is output_name if set, buts_name otherwise. A missing directory
+ * is created with mode 0755.
+ *
+ * Returns 0 on success, 1 if the directory cannot be examined or created.
+ *
+ * NOTE(review): dst is filled with unbounded sprintf(); the caller's
+ * buffer must be large enough for output_dir plus the generated parts.
+ */
+static int fill_ofname(struct thread_information *tip, char *dst,
+		       char *buts_name)
{
+	struct stat sb;
int len = 0;
+	time_t t;
if (output_dir)
len = sprintf(dst, "%s/", output_dir);
+	if (net_mode == Net_server) {
+		len += sprintf(dst + len, "%s-", inet_ntoa(tip->cl_in_addr));
+		time(&t);
+		len += strftime(dst + len, 64, "%F-%T/", gmtime(&t));
+	}
+
+	/*
+	 * Only check the directory when a directory prefix was actually
+	 * written. With neither output_dir nor server mode, len is 0 and dst
+	 * has not been written at all, so stat()/mkdir() would act on
+	 * whatever the caller's buffer happened to contain.
+	 */
+	if (len && stat(dst, &sb) < 0) {
+		if (errno != ENOENT) {
+			perror("stat");
+			return 1;
+		}
+		if (mkdir(dst, 0755) < 0) {
+			perror(dst);
+			fprintf(stderr, "Can't make output dir\n");
+			return 1;
+		}
+	}
+
if (output_name)
- sprintf(dst + len, "%s.blktrace.%d", output_name, cpu);
+		sprintf(dst + len, "%s.blktrace.%d", output_name, tip->cpu);
else
- sprintf(dst + len, "%s.blktrace.%d", buts_name, cpu);
+		sprintf(dst + len, "%s.blktrace.%d", buts_name, tip->cpu);
+
+	return 0;
}
static void fill_ops(struct thread_information *tip)
* setup ops
*/
if (net_mode == Net_client) {
- if (net_sendfile) {
+ if (net_use_sendfile) {
tip->get_subbuf = get_subbuf_sendfile;
tip->flush_subbuf = flush_subbuf_sendfile;
} else {
tip->read_data = read_data_file;
}
+/*
+ * Set up the output side of one per-CPU thread and install its ops.
+ *
+ *  - Net_client: no local output stream; only fill_ops() is run.
+ *  - output_name "-": stdout, line buffered with a 512 byte buffer.
+ *  - otherwise: the file named by fill_ofname(), opened "w+", fully
+ *    buffered with OFILE_BUF bytes and flagged for mmap output.
+ *
+ * Returns 0 on success, 1 on failure.
+ */
+static int tip_open_output(struct device_information *dip,
+			   struct thread_information *tip)
+{
+	int pipeline = output_name && !strcmp(output_name, "-");
+	int mode, vbuf_size;
+	char op[128];
+
+	if (net_mode == Net_client) {
+		tip->ofile = NULL;
+		tip->ofile_stdout = 0;
+		tip->ofile_mmap = 0;
+		goto done;
+	} else if (pipeline) {
+		tip->ofile = fdopen(STDOUT_FILENO, "w");
+		tip->ofile_stdout = 1;
+		tip->ofile_mmap = 0;
+		mode = _IOLBF;
+		vbuf_size = 512;
+	} else {
+		if (fill_ofname(tip, op, dip->buts_name))
+			return 1;
+		tip->ofile = fopen(op, "w+");
+		tip->ofile_stdout = 0;
+		tip->ofile_mmap = 1;
+		mode = _IOFBF;
+		vbuf_size = OFILE_BUF;
+	}
+
+	if (tip->ofile == NULL) {
+		/*
+		 * op is only filled in on the file path; in pipeline mode it
+		 * is still uninitialised and must not be handed to perror()
+		 */
+		perror(pipeline ? "stdout" : op);
+		return 1;
+	}
+
+	tip->ofile_buffer = malloc(vbuf_size);
+	if (setvbuf(tip->ofile, tip->ofile_buffer, mode, vbuf_size)) {
+		perror("setvbuf");
+		close_thread(tip);
+		return 1;
+	}
+
+done:
+	fill_ops(tip);
+	return 0;
+}
+
static int start_threads(struct device_information *dip)
{
struct thread_information *tip;
- int j, pipeline = output_name && !strcmp(output_name, "-");
- int mode, vbuf_size;
- char op[64];
+ int j;
for_each_tip(dip, tip, j) {
tip->cpu = j;
tip->device = dip;
tip->events_processed = 0;
+ tip->fd = -1;
memset(&tip->fifo, 0, sizeof(tip->fifo));
tip->leftover_ts = NULL;
- if (pipeline) {
- tip->ofile = fdopen(STDOUT_FILENO, "w");
- tip->ofile_stdout = 1;
- tip->ofile_mmap = 0;
- mode = _IOLBF;
- vbuf_size = 512;
- } else {
- fill_ofname(op, dip->buts_name, tip->cpu);
- tip->ofile = fopen(op, "w+");
- tip->ofile_stdout = 0;
- tip->ofile_mmap = 1;
- mode = _IOFBF;
- vbuf_size = OFILE_BUF;
- }
-
- if (tip->ofile == NULL) {
- perror(op);
- return 1;
- }
-
- tip->ofile_buffer = malloc(vbuf_size);
- if (setvbuf(tip->ofile, tip->ofile_buffer, mode, vbuf_size)) {
- perror("setvbuf");
- close_thread(tip);
+ if (tip_open_output(dip, tip))
return 1;
- }
-
- fill_ops(tip);
if (pthread_create(&tip->thread, NULL, thread_main, tip)) {
perror("pthread_create");
if (!no_stdout)
printf(" CPU%3d: %20lu events, %8llu KiB data\n",
tip->cpu, tip->events_processed,
- tip->data_read >> 10);
+ (tip->data_read + 1023) >> 10);
events_processed += tip->events_processed;
data_read += tip->data_read;
}
if (!no_stdout)
printf(" Total: %20llu events (dropped %lu), %8llu KiB data\n",
events_processed, dip->drop_count,
- data_read >> 10);
+ (data_read + 1023) >> 10);
}
if (total_drops)
fprintf(stderr, "You have dropped events, consider using a larger buffer size (-b)\n");
}
-static struct device_information *net_get_dip(char *buts_name)
+static struct device_information *net_get_dip(char *buts_name,
+ struct in_addr *cl_in_addr)
{
struct device_information *dip;
int i;
device_information = realloc(device_information, (ndevs + 1) * sizeof(*dip));
dip = &device_information[ndevs];
+ memset(dip, 0, sizeof(*dip));
+ dip->fd = -1;
strcpy(dip->buts_name, buts_name);
- strcpy(dip->path, buts_name);
+ dip->path = strdup(buts_name);
ndevs++;
dip->threads = malloc(ncpus * sizeof(struct thread_information));
memset(dip->threads, 0, ncpus * sizeof(struct thread_information));
*/
for (i = 0; i < ncpus; i++) {
struct thread_information *tip = &dip->threads[i];
- char op[64];
tip->cpu = i;
- tip->ofile_stdout = 0;
- tip->ofile_mmap = 1;
tip->device = dip;
+ tip->fd = -1;
+ tip->cl_in_addr = *cl_in_addr;
- fill_ops(tip);
-
- fill_ofname(op, dip->buts_name, tip->cpu);
-
- tip->ofile = fopen(op, "w+");
- if (!tip->ofile) {
- perror("fopen");
+ if (tip_open_output(dip, tip))
return NULL;
- }
}
return dip;
}
-static struct thread_information *net_get_tip(struct blktrace_net_hdr *bnh)
+/*
+ * Map a received network header to the per-CPU thread that stores its data.
+ * ncpus is taken from the header, the device is looked up (or created) by
+ * net_get_dip() for the given client address, and the thread for bnh->cpu
+ * is returned.
+ *
+ * Returns NULL when net_get_dip() fails, instead of indexing through a
+ * NULL device pointer.
+ *
+ * NOTE(review): bnh->cpu comes from the peer and is not range-checked
+ * against the size of dip->threads here - confirm it is validated elsewhere.
+ */
+static struct thread_information *net_get_tip(struct blktrace_net_hdr *bnh,
+					      struct in_addr *cl_in_addr)
{
struct device_information *dip;
ncpus = bnh->max_cpus;
- dip = net_get_dip(bnh->buts_name);
+	dip = net_get_dip(bnh->buts_name, cl_in_addr);
+	if (!dip)
+		return NULL;
+
return &dip->threads[bnh->cpu];
}
}
}
fcntl(net_in_fd, F_SETFL, fl & ~O_NONBLOCK);
- return 0;
+ return bytes_left;
}
-static int net_server_loop(void)
+static int net_server_loop(struct in_addr *cl_in_addr)
{
struct thread_information *tip;
struct blktrace_net_hdr bnh;
}
if (!data_is_native) {
+ bnh.magic = be32_to_cpu(bnh.magic);
bnh.cpu = be32_to_cpu(bnh.cpu);
bnh.len = be32_to_cpu(bnh.len);
}
+ if ((bnh.magic & 0xffffff00) != BLK_IO_TRACE_MAGIC) {
+ fprintf(stderr, "server: bad data magic\n");
+ return 1;
+ }
+
/*
* len == 0 means that the other end signalled end-of-run
*/
return 1;
}
- tip = net_get_tip(&bnh);
+ tip = net_get_tip(&bnh, cl_in_addr);
if (!tip)
return 1;
return 0;
}
+/*
+ * Wait (without timeout) for a client on the listening socket fd and accept
+ * it. The connected socket is stored in net_in_fd and the peer address in
+ * *addr.
+ *
+ * Returns 0 once a connection has been accepted, 1 if poll() or accept()
+ * fails or poll() wakes up without POLLIN set.
+ */
+static int get_connection(int fd, struct sockaddr_in *addr)
+{
+	socklen_t addr_len = sizeof(*addr);
+	struct pollfd listener;
+
+	listener.fd = fd;
+	listener.events = POLLIN;
+	listener.revents = 0;
+
+	printf("blktrace: waiting for incoming connection...\n");
+
+	if (poll(&listener, 1, -1) < 0) {
+		perror("poll for connection");
+		return 1;
+	}
+	if (!(listener.revents & POLLIN))
+		return 1;
+
+	net_in_fd = accept(fd, (struct sockaddr *) addr, &addr_len);
+	if (net_in_fd < 0) {
+		perror("accept");
+		return 1;
+	}
+
+	printf("blktrace: connection from %s\n", inet_ntoa(addr->sin_addr));
+	return 0;
+}
+
/*
* Start here when we are in server mode - just fetch data from the network
* and dump to files
struct device_information *dip;
struct thread_information *tip;
struct sockaddr_in addr;
- socklen_t socklen;
int fd, opt, i, j;
fd = socket(AF_INET, SOCK_STREAM, 0);
}
repeat:
- printf("blktrace: waiting for incoming connection...\n");
-
- socklen = sizeof(addr);
- net_in_fd = accept(fd, (struct sockaddr *) &addr, &socklen);
- if (net_in_fd < 0) {
- perror("accept");
- return 1;
- }
-
- signal(SIGINT, handle_sigint);
- signal(SIGHUP, handle_sigint);
- signal(SIGTERM, handle_sigint);
- signal(SIGALRM, handle_sigint);
-
- printf("blktrace: connected!\n");
+ if (get_connection(fd, &addr))
+ return 0;
while (!is_done()) {
- if (net_server_loop())
+ if (net_server_loop(&addr.sin_addr))
break;
}
fclose(tip->ofile);
free(dip->threads);
+ free(dip->path);
}
free(device_information);
device_information = NULL;
ncpus = ndevs = 0;
+
+ close(net_in_fd);
+ net_in_fd = -1;
+ stat_shown = 0;
goto repeat;
}
net_port = atoi(optarg);
break;
case 's':
- net_sendfile = 1;
+ net_use_sendfile = 1;
break;
default:
show_usage(argv[0]);