void *buf;
unsigned int len;
unsigned int max_len;
- off_t offset;
};
#define FIFO_SIZE (1024) /* should be plenty big! */
void *fd_buf;
char fn[MAXPATHLEN + 64];
- int pfd;
- size_t *pfd_buf;
+ struct in_addr cl_in_addr;
FILE *ofile;
char *ofile_buffer;
off_t ofile_offset;
int ofile_stdout;
int ofile_mmap;
+ volatile int sendfile_pending;
int (*get_subbuf)(struct thread_information *, unsigned int);
int (*flush_subbuf)(struct thread_information *, struct tip_subbuf *);
unsigned long events_processed;
unsigned long long data_read;
+ unsigned long long data_queued;
struct device_information *device;
int exited;
static char hostname[MAXHOSTNAMELEN];
static int net_port = TRACE_NET_PORT;
static int net_mode = 0;
-static int net_sendfile;
+static int net_use_sendfile;
static int net_in_fd = -1;
static int net_out_fd = -1;
static void handle_sigint(__attribute__((__unused__)) int sig)
{
+ struct device_information *dip;
+ int i;
+
+ /*
+ * stop trace so we can reap currently produced data
+ */
+ for_each_dip(dip, i) {
+ if (dip->fd == -1)
+ continue;
+ if (ioctl(dip->fd, BLKTRACESTOP) < 0)
+ perror("BLKTRACESTOP");
+ }
+
done = 1;
}
return atoi(tmp);
}
-static size_t get_subbuf_padding(struct thread_information *tip,
- unsigned subbuf)
-{
- size_t padding_size = buf_nr * sizeof(size_t);
- size_t ret;
-
- if (read(tip->pfd, tip->pfd_buf, padding_size) < 0) {
- perror("tip pad read");
- ret = -1;
- } else
- ret = tip->pfd_buf[subbuf];
-
- return ret;
-}
-
static int start_trace(struct device_information *dip)
{
struct blk_user_trace_setup buts;
if (dip_tracing(dip) || kill_running_trace) {
dip_set_tracing(dip, 0);
- if (ioctl(dip->fd, BLKTRACESTOP) < 0)
- perror("BLKTRACESTOP");
+ /*
+ * should be stopped, just don't complain if it isn't
+ */
+ ioctl(dip->fd, BLKTRACESTOP);
+
if (ioctl(dip->fd, BLKTRACETEARDOWN) < 0)
perror("BLKTRACETEARDOWN");
if (ret > 0) {
ts->len = ret;
tip->data_read += ret;
- return subbuf_fifo_queue(tip, ts);
+ if (subbuf_fifo_queue(tip, ts))
+ return -1;
}
return ret;
{
struct tip_subbuf *ts;
struct stat sb;
- unsigned int ready, this_size;
- int err;
+ unsigned int ready;
wait_for_data(tip);
if (is_done())
return get_subbuf(tip, maxlen);
- if (fstat(tip->fd, &sb) < 0) {
- perror("trace stat");
- return 1;
- }
-
- ready = sb.st_size - tip->ofile_offset;
- if (!ready) {
- /*
- * delay a little, since we poll() will return data available
- * until sendfile() is run
- */
+ if (tip->sendfile_pending) {
usleep(100);
return 0;
}
- this_size = buf_size;
- while (ready) {
- if (this_size > ready)
- this_size = ready;
-
- ts = malloc(sizeof(*ts));
-
- ts->max_len = maxlen;
- ts->buf = NULL;
-
- ts->len = this_size;
- ts->max_len = ts->len;
- ts->offset = tip->ofile_offset;
- tip->ofile_offset += ts->len;
+ if (fstat(tip->fd, &sb) < 0) {
+ perror("trace stat");
+ return -1;
+ }
+ ready = sb.st_size - tip->data_queued;
+ if (!ready)
+ return 0;
- err = subbuf_fifo_queue(tip, ts);
- if (err)
- return err;
+ ts = malloc(sizeof(*ts));
+ ts->buf = NULL;
+ ts->max_len = 0;
+ ts->len = ready;
+ tip->data_queued += ready;
- ready -= this_size;
- }
+ if (subbuf_fifo_queue(tip, ts))
+ return -1;
- return 0;
+ tip->sendfile_pending++;
+ return ready;
}
static void close_thread(struct thread_information *tip)
{
if (tip->fd != -1)
close(tip->fd);
- if (tip->pfd != -1)
- close(tip->pfd);
if (tip->ofile)
fclose(tip->ofile);
if (tip->ofile_buffer)
free(tip->ofile_buffer);
if (tip->fd_buf)
free(tip->fd_buf);
- if (tip->pfd_buf)
- free(tip->pfd_buf);
tip->fd = -1;
- tip->pfd = -1;
tip->ofile = NULL;
tip->ofile_buffer = NULL;
tip->fd_buf = NULL;
exit_trace(1);
}
- if (net_mode == Net_client && net_sendfile) {
- char tmp[MAXPATHLEN + 64];
-
- snprintf(tmp, sizeof(tmp), "%s/block/%s/trace%d.padding",
- relay_path, tip->device->buts_name, tip->cpu);
-
- tip->pfd = open(tmp, O_RDONLY);
- if (tip->pfd < 0) {
- fprintf(stderr, "Couldn't open padding file %s\n", tmp);
- exit_trace(1);
- }
-
- tip->pfd_buf = malloc(buf_nr * sizeof(size_t));
- }
-
while (!is_done()) {
- if (tip->get_subbuf(tip, buf_size))
+ if (tip->get_subbuf(tip, buf_size) < 0)
break;
}
+ /*
+ * trace is stopped, pull data until we get a short read
+ */
+ while (tip->get_subbuf(tip, buf_size) > 0)
+ ;
+
tip_ftrunc_final(tip);
tip->exited = 1;
return NULL;
return 0;
}
+static int net_sendfile(struct thread_information *tip, struct tip_subbuf *ts)
+{
+ int ret = sendfile(net_out_fd, tip->fd, NULL, ts->len);
+
+ if (ret < 0) {
+ perror("sendfile");
+ return 1;
+ } else if (ret < (int) ts->len) {
+ fprintf(stderr, "short sendfile send (%d of %d)\n", ret, ts->len);
+ return 1;
+ }
+
+ return 0;
+}
+
static int flush_subbuf_sendfile(struct thread_information *tip,
struct tip_subbuf *ts)
{
- size_t padding;
- unsigned subbuf;
- unsigned len;
+ int ret = 1;
/*
* currently we cannot use sendfile() on the last bytes read, as they
if (ts->buf)
return flush_subbuf_net(tip, ts);
- subbuf = (ts->offset / buf_size) % buf_nr;
- padding = get_subbuf_padding(tip, subbuf);
- len = ts->len - padding;
-
- if (net_send_header(tip, len))
- return 1;
- if (sendfile(net_out_fd, tip->fd, &ts->offset, len) < 0) {
- perror("sendfile");
- return 1;
- }
-
- tip->data_read += len;
+ if (net_send_header(tip, ts->len))
+ goto err;
+ if (net_sendfile(tip, ts))
+ goto err;
+
+ tip->data_read += ts->len;
+ tip->ofile_offset += buf_size;
+ ret = 0;
+err:
+ tip->sendfile_pending--;
free(ts);
- return 0;
+ return ret;
}
static int write_data(struct thread_information *tip, void *buf,
net_client_send_close();
}
-static void fill_ofname(char *dst, char *buts_name, int cpu)
+static int fill_ofname(struct thread_information *tip, char *dst,
+ char *buts_name)
{
+ struct stat sb;
int len = 0;
+ time_t t;
if (output_dir)
len = sprintf(dst, "%s/", output_dir);
+ if (net_mode == Net_server) {
+ len += sprintf(dst + len, "%s-", inet_ntoa(tip->cl_in_addr));
+ time(&t);
+ len += strftime(dst + len, 64, "%F-%T/", gmtime(&t));
+ }
+
+ if (stat(dst, &sb) < 0) {
+ if (errno != ENOENT) {
+ perror("stat");
+ return 1;
+ }
+ if (mkdir(dst, 0755) < 0) {
+ perror(dst);
+ fprintf(stderr, "Can't make output dir\n");
+ return 1;
+ }
+ }
+
if (output_name)
- sprintf(dst + len, "%s.blktrace.%d", output_name, cpu);
+ sprintf(dst + len, "%s.blktrace.%d", output_name, tip->cpu);
else
- sprintf(dst + len, "%s.blktrace.%d", buts_name, cpu);
+ sprintf(dst + len, "%s.blktrace.%d", buts_name, tip->cpu);
+
+ return 0;
}
static void fill_ops(struct thread_information *tip)
* setup ops
*/
if (net_mode == Net_client) {
- if (net_sendfile) {
+ if (net_use_sendfile) {
tip->get_subbuf = get_subbuf_sendfile;
tip->flush_subbuf = flush_subbuf_sendfile;
} else {
{
int pipeline = output_name && !strcmp(output_name, "-");
int mode, vbuf_size;
- char op[64];
+ char op[128];
if (net_mode == Net_client) {
tip->ofile = NULL;
tip->ofile_stdout = 0;
tip->ofile_mmap = 0;
- vbuf_size = 0;
- mode = 0; /* gcc 4.x issues a bogus warning */
+ goto done;
} else if (pipeline) {
tip->ofile = fdopen(STDOUT_FILENO, "w");
tip->ofile_stdout = 1;
mode = _IOLBF;
vbuf_size = 512;
} else {
- fill_ofname(op, dip->buts_name, tip->cpu);
+ if (fill_ofname(tip, op, dip->buts_name))
+ return 1;
tip->ofile = fopen(op, "w+");
tip->ofile_stdout = 0;
tip->ofile_mmap = 1;
vbuf_size = OFILE_BUF;
}
- if (net_mode != Net_client && tip->ofile == NULL) {
+ if (tip->ofile == NULL) {
perror(op);
return 1;
}
- if (vbuf_size) {
- tip->ofile_buffer = malloc(vbuf_size);
- if (setvbuf(tip->ofile, tip->ofile_buffer, mode, vbuf_size)) {
- perror("setvbuf");
- close_thread(tip);
- return 1;
- }
+ tip->ofile_buffer = malloc(vbuf_size);
+ if (setvbuf(tip->ofile, tip->ofile_buffer, mode, vbuf_size)) {
+ perror("setvbuf");
+ close_thread(tip);
+ return 1;
}
+done:
fill_ops(tip);
return 0;
}
tip->device = dip;
tip->events_processed = 0;
tip->fd = -1;
- tip->pfd = -1;
memset(&tip->fifo, 0, sizeof(tip->fifo));
tip->leftover_ts = NULL;
fprintf(stderr, "You have dropped events, consider using a larger buffer size (-b)\n");
}
-static struct device_information *net_get_dip(char *buts_name)
+static struct device_information *net_get_dip(char *buts_name,
+ struct in_addr *cl_in_addr)
{
struct device_information *dip;
int i;
device_information = realloc(device_information, (ndevs + 1) * sizeof(*dip));
dip = &device_information[ndevs];
+ memset(dip, 0, sizeof(*dip));
+ dip->fd = -1;
strcpy(dip->buts_name, buts_name);
- strcpy(dip->path, buts_name);
+ dip->path = strdup(buts_name);
ndevs++;
dip->threads = malloc(ncpus * sizeof(struct thread_information));
memset(dip->threads, 0, ncpus * sizeof(struct thread_information));
tip->cpu = i;
tip->device = dip;
+ tip->fd = -1;
+ tip->cl_in_addr = *cl_in_addr;
if (tip_open_output(dip, tip))
return NULL;
return dip;
}
-static struct thread_information *net_get_tip(struct blktrace_net_hdr *bnh)
+static struct thread_information *net_get_tip(struct blktrace_net_hdr *bnh,
+ struct in_addr *cl_in_addr)
{
struct device_information *dip;
ncpus = bnh->max_cpus;
- dip = net_get_dip(bnh->buts_name);
+ dip = net_get_dip(bnh->buts_name, cl_in_addr);
return &dip->threads[bnh->cpu];
}
}
}
fcntl(net_in_fd, F_SETFL, fl & ~O_NONBLOCK);
- return 0;
+ return bytes_left;
}
-static int net_server_loop(void)
+static int net_server_loop(struct in_addr *cl_in_addr)
{
struct thread_information *tip;
struct blktrace_net_hdr bnh;
}
if (!data_is_native) {
+ bnh.magic = be32_to_cpu(bnh.magic);
bnh.cpu = be32_to_cpu(bnh.cpu);
bnh.len = be32_to_cpu(bnh.len);
}
+ if ((bnh.magic & 0xffffff00) != BLK_IO_TRACE_MAGIC) {
+ fprintf(stderr, "server: bad data magic\n");
+ return 1;
+ }
+
/*
* len == 0 means that the other end signalled end-of-run
*/
return 1;
}
- tip = net_get_tip(&bnh);
+ tip = net_get_tip(&bnh, cl_in_addr);
if (!tip)
return 1;
return 0;
}
+static int get_connection(int fd, struct sockaddr_in *addr)
+{
+ struct pollfd pfd = { .fd = fd, .events = POLLIN };
+ socklen_t socklen;
+
+ printf("blktrace: waiting for incoming connection...\n");
+
+ if (poll(&pfd, 1, -1) < 0) {
+ perror("poll for connection");
+ return 1;
+ }
+ if ((pfd.revents & POLLIN) == 0)
+ return 1;
+
+ socklen = sizeof(*addr);
+ net_in_fd = accept(fd, (struct sockaddr *) addr, &socklen);
+ if (net_in_fd < 0) {
+ perror("accept");
+ return 1;
+ }
+
+ printf("blktrace: connection from %s\n", inet_ntoa(addr->sin_addr));
+ return 0;
+}
+
/*
* Start here when we are in server mode - just fetch data from the network
* and dump to files
struct device_information *dip;
struct thread_information *tip;
struct sockaddr_in addr;
- socklen_t socklen;
int fd, opt, i, j;
fd = socket(AF_INET, SOCK_STREAM, 0);
}
repeat:
- signal(SIGINT, NULL);
- signal(SIGHUP, NULL);
- signal(SIGTERM, NULL);
- signal(SIGALRM, NULL);
-
- printf("blktrace: waiting for incoming connection...\n");
-
- socklen = sizeof(addr);
- net_in_fd = accept(fd, (struct sockaddr *) &addr, &socklen);
- if (net_in_fd < 0) {
- perror("accept");
- return 1;
- }
-
- signal(SIGINT, handle_sigint);
- signal(SIGHUP, handle_sigint);
- signal(SIGTERM, handle_sigint);
- signal(SIGALRM, handle_sigint);
-
- printf("blktrace: connection from %s\n", inet_ntoa(addr.sin_addr));
+ if (get_connection(fd, &addr))
+ return 0;
while (!is_done()) {
- if (net_server_loop())
+ if (net_server_loop(&addr.sin_addr))
break;
}
fclose(tip->ofile);
free(dip->threads);
+ free(dip->path);
}
free(device_information);
net_port = atoi(optarg);
break;
case 's':
- net_sendfile = 1;
+ net_use_sendfile = 1;
break;
default:
show_usage(argv[0]);