off_t ofile_offset;
int ofile_stdout;
int ofile_mmap;
- volatile int sendfile_pending;
int (*get_subbuf)(struct thread_information *, unsigned int);
int (*flush_subbuf)(struct thread_information *, struct tip_subbuf *);
}
}
-static void wait_for_data(struct thread_information *tip)
+static void wait_for_data(struct thread_information *tip, int events)
{
- struct pollfd pfd = { .fd = tip->fd, .events = POLLIN };
+ struct pollfd pfd = { .fd = tip->fd, .events = events };
do {
- poll(&pfd, 1, 100);
- if (pfd.revents & POLLIN)
+ if (poll(&pfd, 1, 100) < 0) {
+ perror("poll");
+ break;
+ }
+ if (pfd.revents & events)
break;
if (tip->ofile_stdout)
break;
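
Note on the wait_for_data() change above: callers now say which poll events they care about (POLLIN for the read path, POLLMSG for the sendfile path) and poll() failures are reported instead of silently ignored. A minimal standalone sketch of the same pattern, with illustrative names rather than the blktrace ones:

#include <poll.h>
#include <stdio.h>

/*
 * Wait, 100ms at a time, until any of 'events' is pending on 'fd'.
 * Returns 1 when an event is pending, 0 if poll() itself fails.
 * (The real loop also bails out when the run is done or when output
 * goes to stdout; that bookkeeping is left out here.)
 */
static int wait_for_events(int fd, short events)
{
	struct pollfd pfd = { .fd = fd, .events = events };

	for (;;) {
		if (poll(&pfd, 1, 100) < 0) {
			perror("poll");
			return 0;
		}
		if (pfd.revents & events)
			return 1;
	}
}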
int ret = 0;
do {
- wait_for_data(tip);
+ wait_for_data(tip, POLLIN);
ret = read(tip->fd, buf, len);
if (!ret)
return len - bytes_left;
}
-static int read_data(struct thread_information *tip, void *buf,
- unsigned int len)
-{
- return tip->read_data(tip, buf, len);
-}
-
static inline struct tip_subbuf *
subbuf_fifo_dequeue(struct thread_information *tip)
{
mlock(tip->fs_buf, tip->fs_buf_len);
}
- ret = read_data(tip, tip->fs_buf + tip->fs_off, maxlen);
+ ret = tip->read_data(tip, tip->fs_buf + tip->fs_off, maxlen);
if (ret >= 0) {
tip->data_read += ret;
tip->fs_size += ret;
ts->buf = malloc(buf_size);
ts->max_len = maxlen;
- ret = read_data(tip, ts->buf, ts->max_len);
+ ret = tip->read_data(tip, ts->buf, ts->max_len);
if (ret > 0) {
ts->len = ret;
tip->data_read += ret;
struct stat sb;
unsigned int ready;
- wait_for_data(tip);
+ wait_for_data(tip, POLLMSG);
/*
* hack to get last data out, we can't use sendfile for that
if (is_done())
return get_subbuf(tip, maxlen);
- if (tip->sendfile_pending) {
- usleep(100);
- return 0;
- }
-
if (fstat(tip->fd, &sb) < 0) {
perror("trace stat");
return -1;
}
ready = sb.st_size - tip->data_queued;
- if (!ready)
+ if (!ready) {
+ usleep(1000);
return 0;
+ }
ts = malloc(sizeof(*ts));
ts->buf = NULL;
if (subbuf_fifo_queue(tip, ts))
return -1;
- tip->sendfile_pending++;
return ready;
}
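
With the sendfile_pending flag gone, the hunk above simply asks fstat() how much data the per-cpu trace file holds beyond what was already queued, and backs off for 1ms when nothing new is there. A sketch of that accounting under a hypothetical helper name:

#include <stdio.h>
#include <sys/stat.h>

/*
 * Hypothetical helper: how many bytes a relay-style trace file holds
 * beyond what has already been queued for transmission.  'queued' is
 * the caller's running total of bytes handed off so far.
 * Returns the ready byte count, or -1 if fstat() fails.
 */
static long bytes_ready(int fd, off_t queued)
{
	struct stat sb;

	if (fstat(fd, &sb) < 0) {
		perror("fstat");
		return -1;
	}
	return (long)(sb.st_size - queued);
}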
*/
static void net_client_send_close(void)
{
+ struct device_information *dip;
struct blktrace_net_hdr hdr;
+ int i;
hdr.magic = BLK_IO_TRACE_MAGIC;
hdr.cpu = 0;
hdr.max_cpus = ncpus;
hdr.len = 0;
+ for_each_dip(dip, i) {
+ strcpy(hdr.buts_name, dip->buts_name);
+ hdr.cpu += get_dropped_count(dip->buts_name);
+ }
+
write_data_net(net_out_fd, &hdr, sizeof(hdr));
}
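
The close message reuses the normal net header: len == 0 marks end-of-run, and the cpu field is overloaded to carry the dropped-event total so the server can record it per device. A self-contained sketch of that packing, assuming an illustrative subset of the header fields used above (not the full struct blktrace_net_hdr layout):

#include <stdint.h>
#include <string.h>

#define BLK_IO_TRACE_MAGIC	0x65617400	/* as in blktrace_api.h */

/* illustrative subset of the on-the-wire header used above */
struct net_close_hdr {
	uint32_t magic;
	char buts_name[32];
	uint32_t cpu;		/* overloaded: total dropped events */
	uint32_t max_cpus;
	uint32_t len;		/* 0 means end-of-run */
};

static void fill_close_hdr(struct net_close_hdr *hdr, const char *name,
			   uint32_t ncpus, uint32_t dropped)
{
	memset(hdr, 0, sizeof(*hdr));
	hdr->magic = BLK_IO_TRACE_MAGIC;
	hdr->max_cpus = ncpus;
	hdr->len = 0;			/* tells the server the run is over */
	hdr->cpu = dropped;		/* dropped-event count rides in 'cpu' */
	strncpy(hdr->buts_name, name, sizeof(hdr->buts_name) - 1);
}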
struct tip_subbuf *ts)
{
if (net_send_header(tip, ts->len))
- return 1;
+ return -1;
if (write_data_net(net_out_fd, ts->buf, ts->len))
- return 1;
+ return -1;
free(ts->buf);
free(ts);
- return 0;
+ return 1;
}
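
The return convention for the flush hooks changes here: errors now come back negative and a sub-buffer that made it onto the wire returns positive, leaving zero for "nothing done", which is what the events counter further down keys off. A hypothetical caller written against that convention (the types are stand-ins, not the blktrace ones):

#include <stdio.h>

struct subbuf {
	void *buf;
	unsigned int len;
};

/*
 * Hypothetical flush hook following the new convention:
 *   < 0  hard error
 *   > 0  sub-buffer fully flushed
 *     0  nothing to do yet, try again later
 */
typedef int (*flush_fn)(struct subbuf *ts);

static int drive_flush(flush_fn flush, struct subbuf *ts, unsigned long *flushed)
{
	int ret = flush(ts);

	if (ret < 0) {
		fprintf(stderr, "flush failed\n");
		return -1;
	}
	if (ret > 0)
		(*flushed)++;	/* account one completed sub-buffer */

	return 0;
}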
static int net_sendfile(struct thread_information *tip, struct tip_subbuf *ts)
static int flush_subbuf_sendfile(struct thread_information *tip,
struct tip_subbuf *ts)
{
- int ret = 1;
+ int ret = -1;
/*
* currently we cannot use sendfile() on the last bytes read, as they
tip->data_read += ts->len;
tip->ofile_offset += buf_size;
- ret = 0;
+ ret = 1;
err:
- tip->sendfile_pending--;
free(ts);
return ret;
}
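
Since the offset bookkeeping around the sendfile path (ofile_offset, buf_size) is easy to get wrong, here is a standalone reminder of sendfile(2) semantics with an illustrative helper: with a non-NULL offset pointer the kernel reads from *offset, advances it by the bytes actually sent, and leaves the in_fd file position untouched.

#include <stdio.h>
#include <sys/types.h>
#include <sys/sendfile.h>

/*
 * Illustrative helper: ship 'count' bytes from 'in_fd', starting at
 * *offset, to 'out_fd' (e.g. a connected socket).  On success the
 * kernel advances *offset by the number of bytes sent and does not
 * touch the file position of 'in_fd'.  Returns bytes sent or -1.
 */
static ssize_t ship_bytes(int out_fd, int in_fd, off_t *offset, size_t count)
{
	ssize_t sent = sendfile(out_fd, in_fd, offset, count);

	if (sent < 0)
		perror("sendfile");

	return sent;
}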
}
if (!events)
- usleep(10);
+ usleep(100000);
}
/*
do {
tips_running = 0;
- usleep(1000);
+ usleep(100000);
for_each_dip(dip, i)
for_each_tip(dip, tip, j)
perror("recv header");
return 1;
}
- usleep(100);
+ usleep(1000);
continue;
} else if (!ret) {
- usleep(100);
+ usleep(1000);
continue;
} else {
p += ret;
* len == 0 means that the other end signalled end-of-run
*/
if (!bnh.len) {
+ /*
+ * overload cpu count with dropped events
+ */
+ struct device_information *dip;
+
+ dip = net_get_dip(bnh.buts_name, cl_in_addr);
+ dip->drop_count = bnh.cpu;
+
fprintf(stderr, "server: end of run\n");
return 1;
}
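
The server-side loop above has to cope with short reads and EAGAIN on the socket before it can trust bnh and check for the len == 0 close message. The same pattern pulled out into a self-contained sketch (recv_full() is a hypothetical name):

#include <errno.h>
#include <stdio.h>
#include <sys/socket.h>
#include <unistd.h>

/*
 * Read exactly 'len' bytes from a possibly non-blocking socket,
 * backing off for 1ms when no data is available yet.
 * Returns 0 on success, -1 on error or if the peer closes early.
 */
static int recv_full(int fd, void *buf, size_t len)
{
	char *p = buf;

	while (len) {
		ssize_t ret = recv(fd, p, len, 0);

		if (ret < 0) {
			if (errno != EAGAIN) {
				perror("recv header");
				return -1;
			}
			usleep(1000);
			continue;
		}
		if (!ret)
			return -1;	/* peer closed the connection */

		p += ret;
		len -= ret;
	}
	return 0;
}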