X-Git-Url: https://git.kernel.dk/?p=fio.git;a=blobdiff_plain;f=server.c;h=9f2220dd729274733d48bf1e8472bc6fb943c44e;hp=dfcc9f3879596da8fbc7cb38f57cb48234357a3f;hb=c3b4dd80150b7f706ec4e23065a851e5fa658c00;hpb=415c313375f52bdcf3517a7388f0a52f155d3d60 diff --git a/server.c b/server.c index dfcc9f38..9f2220dd 100644 --- a/server.c +++ b/server.c @@ -114,6 +114,7 @@ static const char *fio_server_ops[FIO_NET_CMD_NR] = { "LOAD_FILE", "VTRIGGER", "SENDFILE", + "JOB_OPT", }; static void sk_lock(struct sk_out *sk_out) @@ -151,11 +152,13 @@ static int __sk_out_drop(struct sk_out *sk_out) int refs; sk_lock(sk_out); + assert(sk_out->refs != 0); refs = --sk_out->refs; sk_unlock(sk_out); if (!refs) { sk_out_free(sk_out); + pthread_setspecific(sk_out_key, NULL); return 0; } } @@ -168,8 +171,7 @@ void sk_out_drop(void) struct sk_out *sk_out; sk_out = pthread_getspecific(sk_out_key); - if (!__sk_out_drop(sk_out)) - pthread_setspecific(sk_out_key, NULL); + __sk_out_drop(sk_out); } static void __fio_init_net_cmd(struct fio_net_cmd *cmd, uint16_t opcode, @@ -261,10 +263,17 @@ static int fio_send_data(int sk, const void *p, unsigned int len) return fio_sendv_data(sk, &iov, 1); } -static int fio_recv_data(int sk, void *p, unsigned int len) +static int fio_recv_data(int sk, void *p, unsigned int len, bool wait) { + int flags; + + if (wait) + flags = MSG_WAITALL; + else + flags = OS_MSG_DONTWAIT; + do { - int ret = recv(sk, p, len, MSG_WAITALL); + int ret = recv(sk, p, len, flags); if (ret > 0) { len -= ret; @@ -274,9 +283,11 @@ static int fio_recv_data(int sk, void *p, unsigned int len) continue; } else if (!ret) break; - else if (errno == EAGAIN || errno == EINTR) - continue; - else + else if (errno == EAGAIN || errno == EINTR) { + if (wait) + continue; + break; + } else break; } while (!exit_backend); @@ -325,7 +336,7 @@ static int verify_convert_cmd(struct fio_net_cmd *cmd) /* * Read (and defragment, if necessary) incoming commands */ -struct fio_net_cmd *fio_net_recv_cmd(int sk) +struct fio_net_cmd *fio_net_recv_cmd(int sk, bool wait) { struct fio_net_cmd cmd, *tmp, *cmdret = NULL; size_t cmd_size = 0, pdu_offset = 0; @@ -334,7 +345,7 @@ struct fio_net_cmd *fio_net_recv_cmd(int sk) void *pdu = NULL; do { - ret = fio_recv_data(sk, &cmd, sizeof(cmd)); + ret = fio_recv_data(sk, &cmd, sizeof(cmd), wait); if (ret) break; @@ -378,7 +389,7 @@ struct fio_net_cmd *fio_net_recv_cmd(int sk) /* There's payload, get it */ pdu = (void *) cmdret->payload + pdu_offset; - ret = fio_recv_data(sk, pdu, cmd.pdu_len); + ret = fio_recv_data(sk, pdu, cmd.pdu_len, wait); if (ret) break; @@ -611,7 +622,7 @@ static int fio_net_queue_quit(void) { dprint(FD_NET, "server: sending quit\n"); - return fio_net_queue_cmd(FIO_NET_CMD_QUIT, NULL, 0, 0, SK_F_SIMPLE); + return fio_net_queue_cmd(FIO_NET_CMD_QUIT, NULL, 0, NULL, SK_F_SIMPLE); } int fio_net_send_quit(int sk) @@ -757,6 +768,8 @@ static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list, pid_t pid; int ret; + sk_out_assign(sk_out); + fio_time_init(); set_genesis_time(); @@ -768,6 +781,7 @@ static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list, ret = fio_backend(sk_out); free_threads_shm(); + sk_out_drop(); _exit(ret); } @@ -958,9 +972,9 @@ static int handle_trigger_cmd(struct fio_net_cmd *cmd) struct all_io_list state; state.threads = cpu_to_le64((uint64_t) 0); - fio_net_queue_cmd(FIO_NET_CMD_VTRIGGER, &state, sizeof(state), NULL, SK_F_COPY); + fio_net_queue_cmd(FIO_NET_CMD_VTRIGGER, &state, sizeof(state), NULL, SK_F_COPY | SK_F_INLINE); } else - fio_net_queue_cmd(FIO_NET_CMD_VTRIGGER, rep, sz, NULL, SK_F_FREE); + fio_net_queue_cmd(FIO_NET_CMD_VTRIGGER, rep, sz, NULL, SK_F_FREE | SK_F_INLINE); exec_trigger(buf); return 0; @@ -1049,17 +1063,35 @@ static int fio_send_cmd_ext_pdu(int sk, uint16_t opcode, const void *buf, { struct fio_net_cmd cmd; struct iovec iov[2]; + size_t this_len; + int ret; iov[0].iov_base = (void *) &cmd; iov[0].iov_len = sizeof(cmd); - iov[1].iov_base = (void *) buf; - iov[1].iov_len = size; - __fio_init_net_cmd(&cmd, opcode, size, tag); - cmd.flags = __cpu_to_le32(flags); - fio_net_cmd_crc_pdu(&cmd, buf); + do { + uint32_t this_flags = flags; - return fio_sendv_data(sk, iov, 2); + this_len = size; + if (this_len > FIO_SERVER_MAX_FRAGMENT_PDU) + this_len = FIO_SERVER_MAX_FRAGMENT_PDU; + + if (this_len < size) + this_flags |= FIO_NET_CMD_F_MORE; + + __fio_init_net_cmd(&cmd, opcode, this_len, tag); + cmd.flags = __cpu_to_le32(this_flags); + fio_net_cmd_crc_pdu(&cmd, buf); + + iov[1].iov_base = (void *) buf; + iov[1].iov_len = this_len; + + ret = fio_sendv_data(sk, iov, 2); + size -= this_len; + buf += this_len; + } while (!ret && size); + + return ret; } static void finish_entry(struct sk_entry *entry) @@ -1205,7 +1237,7 @@ static int handle_connection(struct sk_out *sk_out) if (ret < 0) break; - cmd = fio_net_recv_cmd(sk_out->sk); + cmd = fio_net_recv_cmd(sk_out->sk, true); if (!cmd) { ret = -1; break; @@ -1621,58 +1653,107 @@ void fio_server_send_du(void) } } +#ifdef CONFIG_ZLIB +static int __fio_append_iolog_gz(struct sk_entry *first, struct io_log *log, + struct io_logs *cur_log, z_stream *stream) +{ + unsigned int this_len; + void *out_pdu; + int ret; + + stream->next_in = (void *) cur_log->log; + stream->avail_in = cur_log->nr_samples * log_entry_sz(log); + + do { + struct sk_entry *entry; + + /* + * Dirty - since the log is potentially huge, compress it into + * FIO_SERVER_MAX_FRAGMENT_PDU chunks and let the receiving + * side defragment it. + */ + out_pdu = malloc(FIO_SERVER_MAX_FRAGMENT_PDU); + + stream->avail_out = FIO_SERVER_MAX_FRAGMENT_PDU; + stream->next_out = out_pdu; + ret = deflate(stream, Z_BLOCK); + /* may be Z_OK, or Z_STREAM_END */ + if (ret < 0) { + free(out_pdu); + return 1; + } + + this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream->avail_out; + + entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, out_pdu, this_len, + NULL, SK_F_VEC | SK_F_INLINE | SK_F_FREE); + flist_add_tail(&entry->list, &first->next); + } while (stream->avail_in); + + return 0; +} + static int fio_append_iolog_gz(struct sk_entry *first, struct io_log *log) { int ret = 0; -#ifdef CONFIG_ZLIB - struct sk_entry *entry; z_stream stream; - void *out_pdu; - - /* - * Dirty - since the log is potentially huge, compress it into - * FIO_SERVER_MAX_FRAGMENT_PDU chunks and let the receiving - * side defragment it. - */ - out_pdu = malloc(FIO_SERVER_MAX_FRAGMENT_PDU); + memset(&stream, 0, sizeof(stream)); stream.zalloc = Z_NULL; stream.zfree = Z_NULL; stream.opaque = Z_NULL; - if (deflateInit(&stream, Z_DEFAULT_COMPRESSION) != Z_OK) { - ret = 1; - goto err; + if (deflateInit(&stream, Z_DEFAULT_COMPRESSION) != Z_OK) + return 1; + + while (!flist_empty(&log->io_logs)) { + struct io_logs *cur_log; + + cur_log = flist_first_entry(&log->io_logs, struct io_logs, list); + flist_del_init(&cur_log->list); + + ret = __fio_append_iolog_gz(first, log, cur_log, &stream); + if (ret) + break; } - stream.next_in = (void *) log->log; - stream.avail_in = log->nr_samples * log_entry_sz(log); + ret = deflate(&stream, Z_FINISH); - do { + while (ret != Z_STREAM_END) { + struct sk_entry *entry; unsigned int this_len; + void *out_pdu; + out_pdu = malloc(FIO_SERVER_MAX_FRAGMENT_PDU); stream.avail_out = FIO_SERVER_MAX_FRAGMENT_PDU; stream.next_out = out_pdu; + ret = deflate(&stream, Z_FINISH); /* may be Z_OK, or Z_STREAM_END */ - if (ret < 0) - goto err_zlib; + if (ret < 0) { + free(out_pdu); + break; + } this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out; entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, out_pdu, this_len, - NULL, SK_F_VEC | SK_F_INLINE | SK_F_FREE); - out_pdu = NULL; + NULL, SK_F_VEC | SK_F_INLINE | SK_F_FREE); flist_add_tail(&entry->list, &first->next); - } while (stream.avail_in); + } while (ret != Z_STREAM_END); -err_zlib: - deflateEnd(&stream); -err: - free(out_pdu); -#endif - return ret; + ret = deflateEnd(&stream); + if (ret == Z_OK) + return 0; + + return 1; } +#else +static int fio_append_iolog_gz(struct sk_entry *first, struct io_log *log) +{ + return 1; +} +#endif static int fio_append_gz_chunks(struct sk_entry *first, struct io_log *log) { @@ -1696,11 +1777,21 @@ static int fio_append_gz_chunks(struct sk_entry *first, struct io_log *log) static int fio_append_text_log(struct sk_entry *first, struct io_log *log) { struct sk_entry *entry; - size_t size = log->nr_samples * log_entry_sz(log); - entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, log->log, size, - NULL, SK_F_VEC | SK_F_INLINE); - flist_add_tail(&entry->list, &first->next); + while (!flist_empty(&log->io_logs)) { + struct io_logs *cur_log; + size_t size; + + cur_log = flist_first_entry(&log->io_logs, struct io_logs, list); + flist_del_init(&cur_log->list); + + size = cur_log->nr_samples * log_entry_sz(log); + + entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, cur_log->log, size, + NULL, SK_F_VEC | SK_F_INLINE); + flist_add_tail(&entry->list, &first->next); + } + return 0; } @@ -1708,9 +1799,10 @@ int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name) { struct cmd_iolog_pdu pdu; struct sk_entry *first; - int i, ret = 0; + struct flist_head *entry; + int ret = 0; - pdu.nr_samples = cpu_to_le64(log->nr_samples); + pdu.nr_samples = cpu_to_le64(iolog_nr_samples(log)); pdu.thread_number = cpu_to_le32(td->thread_number); pdu.log_type = cpu_to_le32(log->log_type); @@ -1728,18 +1820,25 @@ int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name) * We can't do this for a pre-compressed log, but for that case, * log->nr_samples is zero anyway. */ - for (i = 0; i < log->nr_samples; i++) { - struct io_sample *s = get_sample(log, i); + flist_for_each(entry, &log->io_logs) { + struct io_logs *cur_log; + int i; + + cur_log = flist_entry(entry, struct io_logs, list); - s->time = cpu_to_le64(s->time); - s->val = cpu_to_le64(s->val); - s->__ddir = cpu_to_le32(s->__ddir); - s->bs = cpu_to_le32(s->bs); + for (i = 0; i < cur_log->nr_samples; i++) { + struct io_sample *s = get_sample(log, cur_log, i); - if (log->log_offset) { - struct io_sample_offset *so = (void *) s; + s->time = cpu_to_le64(s->time); + s->val = cpu_to_le64(s->val); + s->__ddir = cpu_to_le32(s->__ddir); + s->bs = cpu_to_le32(s->bs); - so->offset = cpu_to_le64(so->offset); + if (log->log_offset) { + struct io_sample_offset *so = (void *) s; + + so->offset = cpu_to_le64(so->offset); + } } } @@ -1784,24 +1883,25 @@ void fio_server_send_start(struct thread_data *td) assert(sk_out->sk != -1); - fio_net_queue_cmd(FIO_NET_CMD_SERVER_START, NULL, 0, 0, SK_F_SIMPLE); + fio_net_queue_cmd(FIO_NET_CMD_SERVER_START, NULL, 0, NULL, SK_F_SIMPLE); } int fio_server_get_verify_state(const char *name, int threadnumber, - void **datap, int *version) + void **datap) { struct thread_io_list *s; struct cmd_sendfile out; struct cmd_reply *rep; uint64_t tag; void *data; + int ret; dprint(FD_NET, "server: request verify state\n"); rep = smalloc(sizeof(*rep)); if (!rep) { log_err("fio: smalloc pool too small\n"); - return 1; + return ENOMEM; } __fio_mutex_init(&rep->lock, FIO_MUTEX_LOCKED); @@ -1819,17 +1919,19 @@ int fio_server_get_verify_state(const char *name, int threadnumber, */ if (fio_mutex_down_timeout(&rep->lock, 10000)) { log_err("fio: timed out waiting for reply\n"); + ret = ETIMEDOUT; goto fail; } if (rep->error) { log_err("fio: failure on receiving state file %s: %s\n", out.path, strerror(rep->error)); + ret = rep->error; fail: *datap = NULL; sfree(rep); fio_net_queue_quit(); - return 1; + return ret; } /* @@ -1837,12 +1939,15 @@ fail: * the header, and the thread_io_list checksum */ s = rep->data + sizeof(struct verify_state_hdr); - if (verify_state_hdr(rep->data, s, version)) + if (verify_state_hdr(rep->data, s)) { + ret = EILSEQ; goto fail; + } /* * Don't need the header from now, copy just the thread_io_list */ + ret = 0; rep->size -= sizeof(struct verify_state_hdr); data = malloc(rep->size); memcpy(data, s, rep->size); @@ -1851,7 +1956,7 @@ fail: sfree(rep->data); __fio_mutex_remove(&rep->lock); sfree(rep); - return 0; + return ret; } static int fio_init_server_ip(void) @@ -1879,11 +1984,10 @@ static int fio_init_server_ip(void) return -1; } #ifdef SO_REUSEPORT - if (setsockopt(sk, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) { - log_err("fio: setsockopt(REUSEPORT): %s\n", strerror(errno)); - close(sk); - return -1; - } + /* + * Not fatal if fails, so just ignore it if that happens + */ + setsockopt(sk, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)); #endif if (use_ipv6) { @@ -2176,16 +2280,25 @@ static void set_sig_handlers(void) sigaction(SIGINT, &act, NULL); } -static int fio_server(void) +void fio_server_destroy_sk_key(void) { - int sk, ret; + pthread_key_delete(sk_out_key); +} +int fio_server_create_sk_key(void) +{ if (pthread_key_create(&sk_out_key, NULL)) { log_err("fio: can't create sk_out backend key\n"); - return -1; + return 1; } pthread_setspecific(sk_out_key, NULL); + return 0; +} + +static int fio_server(void) +{ + int sk, ret; dprint(FD_NET, "starting server\n");