static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd);
static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd);
static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd);
-static void handle_iolog(struct fio_client *client, struct cmd_iolog_pdu *pdu);
struct client_ops fio_client_ops = {
.text = handle_text,
.start = handle_start,
.eta = display_thread_status,
.probe = handle_probe,
- .iolog = handle_iolog,
.eta_msec = FIO_CLIENT_DEF_ETA_MSEC,
.client_type = FIO_CLIENT_TYPE_CLI,
};
#define FIO_CLIENT_HASH_MASK (FIO_CLIENT_HASH_SZ - 1)
static struct flist_head client_hash[FIO_CLIENT_HASH_SZ];
+static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *, bool *);
+
static void fio_client_add_hash(struct fio_client *client)
{
int bucket = hash_long(client->fd, FIO_CLIENT_HASH_BITS);
fio_client_dec_jobs_eta(eta, client->ops->eta);
}
-static void handle_iolog(struct fio_client *client, struct cmd_iolog_pdu *pdu)
+void fio_client_handle_iolog(struct fio_client *client, struct fio_net_cmd *cmd)
{
- FILE *f;
-
- printf("got log compressed; %d\n", pdu->compressed);
+ struct cmd_iolog_pdu *pdu;
+ bool store_direct;
- f = fopen((const char *) pdu->name, "w");
- if (!f) {
- perror("fopen log");
+ pdu = convert_iolog(cmd, &store_direct);
+ if (!pdu)
return;
- }
- flush_samples(f, pdu->samples,
- pdu->nr_samples * sizeof(struct io_sample));
- fclose(f);
+ if (store_direct) {
+ ssize_t ret;
+ size_t sz;
+ int fd;
+
+ fd = open((const char *) pdu->name,
+ O_WRONLY | O_CREAT | O_TRUNC, 0644);
+ if (fd < 0) {
+ perror("open log");
+ return;
+ }
+ sz = cmd->pdu_len - sizeof(*pdu);
+ ret = write(fd, pdu->samples, sz);
+ if (ret != sz)
+ log_err("fio: short write on compressed log\n");
+ close(fd);
+ } else {
+ FILE *f;
+
+ f = fopen((const char *) pdu->name, "w");
+ if (!f) {
+ perror("fopen log");
+ return;
+ }
+
+ flush_samples(f, pdu->samples,
+ pdu->nr_samples * sizeof(struct io_sample));
+ fclose(f);
+ }
}
static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd)
* This has been compressed on the server side, since it can be big.
* Uncompress here.
*/
-static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *cmd)
+static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *cmd,
+ bool *store_direct)
{
struct cmd_iolog_pdu *pdu = (struct cmd_iolog_pdu *) cmd->payload;
struct cmd_iolog_pdu *ret;
uint64_t i;
+ int compressed;
void *samples;
+ *store_direct = false;
+
/*
* Convert if compressed and we support it. If it's not
* compressed, we need not do anything.
*/
- if (le32_to_cpu(pdu->compressed)) {
+ compressed = le32_to_cpu(pdu->compressed);
+ if (compressed == XMIT_COMPRESSED) {
#ifndef CONFIG_ZLIB
log_err("fio: server sent compressed data by mistake\n");
return NULL;
#endif
ret = convert_iolog_gz(cmd, pdu);
+ printf("compressed iolog, %p\n", ret);
if (!ret) {
log_err("fio: failed decompressing log\n");
return NULL;
}
+ } else if (compressed == STORE_COMPRESSED) {
+ *store_direct = true;
+ ret = pdu;
} else
ret = pdu;
ret->compressed = le32_to_cpu(ret->compressed);
ret->log_offset = le32_to_cpu(ret->log_offset);
+ if (*store_direct)
+ return ret;
+
samples = &ret->samples[0];
for (i = 0; i < ret->nr_samples; i++) {
struct io_sample *s;
break;
}
case FIO_NET_CMD_IOLOG:
- if (ops->iolog) {
- struct cmd_iolog_pdu *pdu;
-
- pdu = convert_iolog(cmd);
- ops->iolog(client, pdu);
- }
+ fio_client_handle_iolog(client, cmd);
break;
case FIO_NET_CMD_UPDATE_JOB:
ops->update_job(client, cmd);
unsigned int nr_files;
};
-struct cmd_iolog_pdu;
typedef void (client_cmd_op)(struct fio_client *, struct fio_net_cmd *);
typedef void (client_eta_op)(struct jobs_eta *je);
typedef void (client_timed_out_op)(struct fio_client *);
typedef void (client_jobs_eta_op)(struct fio_client *client, struct jobs_eta *je);
-typedef void (client_iolog_op)(struct fio_client *client, struct cmd_iolog_pdu *);
struct client_ops {
client_cmd_op *text;
client_cmd_op *stop;
client_cmd_op *start;
client_cmd_op *job_start;
- client_iolog_op *iolog;
client_timed_out_op *removed;
unsigned int eta_msec;
# zlib probe
zlib="no"
cat > $TMPC <<EOF
-#include <zlib.h>
+#include <zlib2.h>
int main(void)
{
z_stream stream;
gdk_threads_leave();
}
-static void gfio_client_iolog(struct fio_client *client, struct cmd_iolog_pdu *pdu)
-{
- printf("got iolog: name=%s, type=%u, entries=%lu\n", pdu->name, pdu->log_type, (unsigned long) pdu->nr_samples);
-}
-
static void gfio_add_total_depths_tree(GtkListStore *model,
struct thread_stat *ts, unsigned int len)
{
.stop = gfio_client_stop,
.start = gfio_client_start,
.job_start = gfio_client_job_start,
- .iolog = gfio_client_iolog,
.removed = gfio_client_removed,
.eta_msec = FIO_CLIENT_DEF_ETA_MSEC,
.stay_connected = 1,
uint64_t nr_samples;
};
-struct iolog_compress {
- struct flist_head list;
- void *buf;
- size_t len;
- unsigned int seq;
-};
-
#define GZ_CHUNK 131072
static struct iolog_compress *get_new_chunk(unsigned int seq)
} else
fio_lock_file(log->filename);
- /*
- * We should do this for any networked client. Will enable when
- * the kinks are ironed out.
- *
- * if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backed)
- */
- if (td->client_type == FIO_CLIENT_TYPE_GUI)
+ if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backend)
fio_send_iolog(td, log, log->filename);
else
flush_log(log, !td->o.per_job_logs);
return 0;
}
+size_t log_chunk_sizes(struct io_log *log)
+{
+ struct flist_head *entry;
+ size_t ret;
+
+ if (flist_empty(&log->chunk_list))
+ return 0;
+
+ ret = 0;
+ pthread_mutex_lock(&log->chunk_lock);
+ flist_for_each(entry, &log->chunk_list) {
+ struct iolog_compress *c;
+
+ c = flist_entry(entry, struct iolog_compress, list);
+ ret += c->len;
+ }
+ pthread_mutex_unlock(&log->chunk_lock);
+ return ret;
+}
+
#ifdef CONFIG_ZLIB
static void drop_data_unlock(struct iolog_flush_data *data)
extern void write_iolog_close(struct thread_data *);
extern int iolog_compress_init(struct thread_data *, struct sk_out *);
extern void iolog_compress_exit(struct thread_data *);
+extern size_t log_chunk_sizes(struct io_log *);
#ifdef CONFIG_ZLIB
extern int iolog_file_inflate(const char *);
INIT_FLIST_HEAD(&ipo->trim_list);
}
+struct iolog_compress {
+ struct flist_head list;
+ void *buf;
+ size_t len;
+ unsigned int seq;
+};
+
#endif
return ret;
}
-static struct sk_entry *fio_net_prep_cmd(uint16_t opcode, void *buf, off_t size,
- uint64_t *tagptr, int flags)
+static struct sk_entry *fio_net_prep_cmd(uint16_t opcode, void *buf,
+ size_t size, uint64_t *tagptr,
+ int flags)
{
struct sk_entry *entry;
return ret;
}
+static int fio_send_gz_chunks(struct sk_entry *first, struct io_log *log)
+{
+ struct sk_entry *entry;
+ struct flist_head *node;
+
+ pthread_mutex_lock(&log->chunk_lock);
+ flist_for_each(node, &log->chunk_list) {
+ struct iolog_compress *c;
+
+ c = flist_entry(node, struct iolog_compress, list);
+ entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, c->buf, c->len,
+ NULL, SK_F_VEC | SK_F_INLINE);
+ flist_add_tail(&entry->list, &first->next);
+ }
+ pthread_mutex_unlock(&log->chunk_lock);
+
+ return 0;
+}
+
int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
{
struct cmd_iolog_pdu pdu;
struct sk_entry *first;
int i, ret = 0;
+ if (!flist_empty(&log->chunk_list))
+ printf("log has chunks\n");
+
pdu.nr_samples = cpu_to_le64(log->nr_samples);
pdu.thread_number = cpu_to_le32(td->thread_number);
pdu.log_type = cpu_to_le32(log->log_type);
- pdu.compressed = cpu_to_le32(use_zlib);
+
+ if (!flist_empty(&log->chunk_list))
+ pdu.compressed = __cpu_to_le32(STORE_COMPRESSED);
+ else if (use_zlib)
+ pdu.compressed = __cpu_to_le32(XMIT_COMPRESSED);
+ else
+ pdu.compressed = 0;
strncpy((char *) pdu.name, name, FIO_NET_NAME_MAX);
pdu.name[FIO_NET_NAME_MAX - 1] = '\0';
+ /*
+ * We can't do this for a pre-compressed log, but for that case,
+ * log->nr_samples is zero anyway.
+ */
for (i = 0; i < log->nr_samples; i++) {
struct io_sample *s = get_sample(log, i);
* Now append actual log entries. Compress if we can, otherwise just
* plain text output.
*/
- if (use_zlib)
+ if (!flist_empty(&log->chunk_list))
+ ret = fio_send_gz_chunks(first, log);
+ else if (use_zlib)
ret = fio_send_iolog_gz(first, log);
else {
struct sk_entry *entry;
+ size_t size = log->nr_samples * log_entry_sz(log);
- entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, log->log,
- log->nr_samples * log_entry_sz(log),
- NULL, SK_F_VEC | SK_F_INLINE);
+ entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, log->log, size,
+ NULL, SK_F_VEC | SK_F_INLINE);
flist_add_tail(&entry->list, &first->next);
}
uint8_t buf[0];
};
+enum {
+ XMIT_COMPRESSED = 1U,
+ STORE_COMPRESSED = 2U,
+};
+
struct cmd_iolog_pdu {
uint64_t nr_samples;
uint32_t thread_number;