- Better handling of vectored commands
- Improve sk_out mutex handling (don't alloc separately)
- Add support for sync sending of network data
- Prep for network xmit of logs
Signed-off-by: Jens Axboe <axboe@fb.com>
static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd);
static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd);
static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd);
static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd);
static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd);
static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd);
+static void handle_iolog(struct fio_client *client, struct cmd_iolog_pdu *pdu);
struct client_ops fio_client_ops = {
.text = handle_text,
struct client_ops fio_client_ops = {
.text = handle_text,
.start = handle_start,
.eta = display_thread_status,
.probe = handle_probe,
.start = handle_start,
.eta = display_thread_status,
.probe = handle_probe,
.eta_msec = FIO_CLIENT_DEF_ETA_MSEC,
.client_type = FIO_CLIENT_TYPE_CLI,
};
.eta_msec = FIO_CLIENT_DEF_ETA_MSEC,
.client_type = FIO_CLIENT_TYPE_CLI,
};
fio_client_dec_jobs_eta(eta, client->ops->eta);
}
fio_client_dec_jobs_eta(eta, client->ops->eta);
}
+static void handle_iolog(struct fio_client *client, struct cmd_iolog_pdu *pdu)
+{
+ FILE *f;
+
+ printf("got log compressed; %d\n", pdu->compressed);
+
+ f = fopen((const char *) pdu->name, "w");
+ if (!f) {
+ perror("fopen log");
+ return;
+ }
+
+ flush_samples(f, pdu->samples,
+ pdu->nr_samples * sizeof(struct io_sample));
+ fclose(f);
+}
+
static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd)
{
struct cmd_probe_reply_pdu *probe = (struct cmd_probe_reply_pdu *) cmd->payload;
static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd)
{
struct cmd_probe_reply_pdu *probe = (struct cmd_probe_reply_pdu *) cmd->payload;
-static void flush_samples(FILE *f, void *samples, uint64_t sample_size)
+void flush_samples(FILE *f, void *samples, uint64_t sample_size)
{
struct io_sample *s;
int log_offset;
{
struct io_sample *s;
int log_offset;
} else
fio_lock_file(log->filename);
} else
fio_lock_file(log->filename);
+ /*
+ * We should do this for any networked client. Will enable when
+ * the kinks are ironed out.
+ *
+ * if (td->client_type == FIO_CLIENT_TYPE_GUI || is_backed)
+ */
if (td->client_type == FIO_CLIENT_TYPE_GUI)
fio_send_iolog(td, log, log->filename);
else
if (td->client_type == FIO_CLIENT_TYPE_GUI)
fio_send_iolog(td, log, log->filename);
else
extern void finalize_logs(struct thread_data *td);
extern void setup_log(struct io_log **, struct log_params *, const char *);
extern void flush_log(struct io_log *, int);
extern void finalize_logs(struct thread_data *td);
extern void setup_log(struct io_log **, struct log_params *, const char *);
extern void flush_log(struct io_log *, int);
+extern void flush_samples(FILE *, void *, uint64_t);
extern void free_log(struct io_log *);
extern void fio_writeout_logs(struct thread_data *);
extern int iolog_flush(struct io_log *, int);
extern void free_log(struct io_log *);
extern void fio_writeout_logs(struct thread_data *);
extern int iolog_flush(struct io_log *, int);
SK_F_COPY = 2,
SK_F_SIMPLE = 4,
SK_F_VEC = 8,
SK_F_COPY = 2,
SK_F_SIMPLE = 4,
SK_F_VEC = 8,
* protected by below ->lock */
int sk; /* socket fd to talk to client */
* protected by below ->lock */
int sk; /* socket fd to talk to client */
- struct fio_mutex *lock; /* protects ref and below list */
+ struct fio_mutex lock; /* protects ref and below list */
struct flist_head list; /* list of pending transmit work */
struct flist_head list; /* list of pending transmit work */
- struct fio_mutex *wait; /* wake backend when items added to list */
+ struct fio_mutex wait; /* wake backend when items added to list */
+ struct fio_mutex xmit; /* held while sending data */
};
static char *fio_server_arg;
};
static char *fio_server_arg;
/*
 * Take the sk_out lock protecting the pending-transmit list and the
 * reference count. The mutex is embedded in struct sk_out (no longer
 * separately allocated), hence the address-of.
 */
static void sk_lock(struct sk_out *sk_out)
{
	fio_mutex_down(&sk_out->lock);
}
static void sk_unlock(struct sk_out *sk_out)
{
}
static void sk_unlock(struct sk_out *sk_out)
{
- fio_mutex_up(sk_out->lock);
+ fio_mutex_up(&sk_out->lock);
}
void sk_out_assign(struct sk_out *sk_out)
}
void sk_out_assign(struct sk_out *sk_out)
static void sk_out_free(struct sk_out *sk_out)
{
static void sk_out_free(struct sk_out *sk_out)
{
- fio_mutex_remove(sk_out->lock);
- fio_mutex_remove(sk_out->wait);
+ __fio_mutex_remove(&sk_out->lock);
+ __fio_mutex_remove(&sk_out->wait);
+ __fio_mutex_remove(&sk_out->xmit);
memcpy(entry->buf, buf, size);
} else
entry->buf = buf;
memcpy(entry->buf, buf, size);
} else
entry->buf = buf;
entry->size = size;
entry->tagptr = tagptr;
entry->flags = flags;
entry->size = size;
entry->tagptr = tagptr;
entry->flags = flags;
+static int handle_sk_entry(struct sk_out *sk_out, struct sk_entry *entry);
+
static void fio_net_queue_entry(struct sk_entry *entry)
{
struct sk_out *sk_out = pthread_getspecific(sk_out_key);
static void fio_net_queue_entry(struct sk_entry *entry)
{
struct sk_out *sk_out = pthread_getspecific(sk_out_key);
- sk_lock(sk_out);
- flist_add_tail(&entry->list, &sk_out->list);
- sk_unlock(sk_out);
+ if (entry->flags & SK_F_INLINE)
+ handle_sk_entry(sk_out, entry);
+ else {
+ sk_lock(sk_out);
+ flist_add_tail(&entry->list, &sk_out->list);
+ sk_unlock(sk_out);
- fio_mutex_up(sk_out->wait);
+ fio_mutex_up(&sk_out->wait);
+ }
}
static int fio_net_queue_cmd(uint16_t opcode, void *buf, off_t size,
}
static int fio_net_queue_cmd(uint16_t opcode, void *buf, off_t size,
+ fio_mutex_down(&sk_out->xmit);
+
if (entry->flags & SK_F_VEC)
ret = send_vec_entry(sk_out, entry);
if (entry->flags & SK_F_VEC)
ret = send_vec_entry(sk_out, entry);
- if (entry->flags & SK_F_SIMPLE) {
+ else if (entry->flags & SK_F_SIMPLE) {
uint64_t tag = 0;
if (entry->tagptr)
uint64_t tag = 0;
if (entry->tagptr)
} else
ret = fio_net_send_cmd(sk_out->sk, entry->opcode, entry->buf, entry->size, entry->tagptr, NULL);
} else
ret = fio_net_send_cmd(sk_out->sk, entry->opcode, entry->buf, entry->size, entry->tagptr, NULL);
+ fio_mutex_up(&sk_out->xmit);
+
if (ret)
log_err("fio: failed handling cmd %s\n", fio_server_op(entry->opcode));
if (ret)
log_err("fio: failed handling cmd %s\n", fio_server_op(entry->opcode));
break;
} else if (!ret) {
fio_server_check_jobs(&job_list);
break;
} else if (!ret) {
fio_server_check_jobs(&job_list);
- fio_mutex_down_timeout(sk_out->wait, timeout);
+ fio_mutex_down_timeout(&sk_out->wait, timeout);
sk_out = smalloc(sizeof(*sk_out));
sk_out->sk = sk;
INIT_FLIST_HEAD(&sk_out->list);
sk_out = smalloc(sizeof(*sk_out));
sk_out->sk = sk;
INIT_FLIST_HEAD(&sk_out->list);
- sk_out->lock = fio_mutex_init(FIO_MUTEX_UNLOCKED);
- sk_out->wait = fio_mutex_init(FIO_MUTEX_LOCKED);
+ __fio_mutex_init(&sk_out->lock, FIO_MUTEX_UNLOCKED);
+ __fio_mutex_init(&sk_out->wait, FIO_MUTEX_LOCKED);
+ __fio_mutex_init(&sk_out->xmit, FIO_MUTEX_UNLOCKED);
this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out;
entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, out_pdu, this_len,
this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out;
entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, out_pdu, this_len,
- NULL, SK_F_FREE | SK_F_VEC);
+ NULL, SK_F_VEC | SK_F_INLINE | SK_F_FREE);
+ out_pdu = NULL;
flist_add_tail(&entry->list, &first->next);
} while (stream.avail_in);
flist_add_tail(&entry->list, &first->next);
} while (stream.avail_in);
/*
* Assemble header entry first
*/
/*
* Assemble header entry first
*/
- first = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, &pdu, sizeof(pdu), NULL, SK_F_COPY | SK_F_VEC);
+ first = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, &pdu, sizeof(pdu), NULL, SK_F_VEC | SK_F_INLINE | SK_F_COPY);
/*
* Now append actual log entries. Compress if we can, otherwise just
/*
* Now append actual log entries. Compress if we can, otherwise just
entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, log->log,
log->nr_samples * log_entry_sz(log),
entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, log->log,
log->nr_samples * log_entry_sz(log),
- NULL, SK_F_FREE | SK_F_VEC);
+ NULL, SK_F_VEC | SK_F_INLINE);
flist_add_tail(&entry->list, &first->next);
}
flist_add_tail(&entry->list, &first->next);
}
+ fio_net_queue_entry(first);