X-Git-Url: https://git.kernel.dk/?p=fio.git;a=blobdiff_plain;f=server.c;h=f11e97278b20f52614269087e37393b2e94b3bf1;hp=efdf51f3870a8587fbaa8a0be9eafa03b75bc055;hb=bc0fec0e12f19dd424f4bf83cfca89d434184c8d;hpb=b38823202a5015e27cc4a683aa8d930764620443 diff --git a/server.c b/server.c index efdf51f3..f11e9727 100644 --- a/server.c +++ b/server.c @@ -40,20 +40,23 @@ enum { }; struct sk_entry { - struct flist_head list; - int opcode; + struct flist_head list; /* link on sk_out->list */ + int flags; /* SK_F_* */ + int opcode; /* Actual command fields */ void *buf; off_t size; uint64_t *tagptr; - int flags; - struct flist_head next; + struct flist_head next; /* Other sk_entry's, if linked command */ }; struct sk_out { - int sk; - struct fio_mutex *lock; - struct flist_head list; - struct fio_mutex *wait; + unsigned int refs; /* frees sk_out when it drops to zero. + * protected by below ->lock */ + + int sk; /* socket fd to talk to client */ + struct fio_mutex *lock; /* protects ref and below list */ + struct flist_head list; /* list of pending transmit work */ + struct fio_mutex *wait; /* wake backend when items added to list */ }; static char *fio_server_arg; @@ -121,6 +124,72 @@ static void sk_unlock(struct sk_out *sk_out) fio_mutex_up(sk_out->lock); } +void sk_out_assign(struct sk_out *sk_out) +{ + if (!sk_out) + return; + + sk_lock(sk_out); + sk_out->refs++; + sk_unlock(sk_out); + pthread_setspecific(sk_out_key, sk_out); +} + +static void sk_out_free(struct sk_out *sk_out) +{ + fio_mutex_remove(sk_out->lock); + fio_mutex_remove(sk_out->wait); + sfree(sk_out); +} + +static int __sk_out_drop(struct sk_out *sk_out) +{ + if (sk_out) { + int refs; + + sk_lock(sk_out); + refs = --sk_out->refs; + sk_unlock(sk_out); + + if (!refs) { + sk_out_free(sk_out); + return 0; + } + } + + return 1; +} + +void sk_out_drop(void) +{ + struct sk_out *sk_out; + + sk_out = pthread_getspecific(sk_out_key); + if (!__sk_out_drop(sk_out)) + pthread_setspecific(sk_out_key, NULL); +} + +static void __fio_init_net_cmd(struct fio_net_cmd *cmd, uint16_t opcode, + uint32_t pdu_len, uint64_t tag) +{ + memset(cmd, 0, sizeof(*cmd)); + + cmd->version = __cpu_to_le16(FIO_SERVER_VER); + cmd->opcode = cpu_to_le16(opcode); + cmd->tag = cpu_to_le64(tag); + cmd->pdu_len = cpu_to_le32(pdu_len); +} + + +static void fio_init_net_cmd(struct fio_net_cmd *cmd, uint16_t opcode, + const void *pdu, uint32_t pdu_len, uint64_t tag) +{ + __fio_init_net_cmd(cmd, opcode, pdu_len, tag); + + if (pdu) + memcpy(&cmd->payload, pdu, pdu_len); +} + const char *fio_server_op(unsigned int op) { static char buf[32]; @@ -180,7 +249,7 @@ static int fio_sendv_data(int sk, struct iovec *iov, int count) return 1; } -int fio_send_data(int sk, const void *p, unsigned int len) +static int fio_send_data(int sk, const void *p, unsigned int len) { struct iovec iov = { .iov_base = (void *) p, .iov_len = len }; @@ -189,7 +258,7 @@ int fio_send_data(int sk, const void *p, unsigned int len) return fio_sendv_data(sk, &iov, 1); } -int fio_recv_data(int sk, void *p, unsigned int len) +static int fio_recv_data(int sk, void *p, unsigned int len) { do { int ret = recv(sk, p, len, MSG_WAITALL); @@ -381,7 +450,7 @@ static void free_reply(uint64_t tag) free(reply); } -void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, const void *pdu) +static void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, const void *pdu) { uint32_t pdu_len; @@ -391,7 +460,7 @@ void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, const void *pdu) cmd->pdu_crc16 = __cpu_to_le16(fio_crc16(pdu, pdu_len)); } -void fio_net_cmd_crc(struct fio_net_cmd *cmd) +static void fio_net_cmd_crc(struct fio_net_cmd *cmd) { fio_net_cmd_crc_pdu(cmd, cmd->payload); } @@ -448,8 +517,8 @@ int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size, return ret; } -struct sk_entry *fio_net_prep_cmd(uint16_t opcode, void *buf, off_t size, - uint64_t *tagptr, int flags) +static struct sk_entry *fio_net_prep_cmd(uint16_t opcode, void *buf, off_t size, + uint64_t *tagptr, int flags) { struct sk_entry *entry; @@ -672,7 +741,6 @@ static int handle_load_file_cmd(struct fio_net_cmd *cmd) static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list, struct fio_net_cmd *cmd) { - struct backend_data data; pid_t pid; int ret; @@ -685,10 +753,7 @@ static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list, return 0; } - data.key = sk_out_key; - data.ptr = sk_out; - //pthread_setspecific(sk_out_key, sk_out); - ret = fio_backend(&data); + ret = fio_backend(sk_out); free_threads_shm(); _exit(ret); } @@ -1149,13 +1214,15 @@ static int handle_connection(struct sk_out *sk_out) handle_xmits(sk_out); close(sk_out->sk); + sk_out->sk = -1; + __sk_out_drop(sk_out); _exit(ret); } /* get the address on this host bound by the input socket, * whether it is ipv6 or ipv4 */ -int get_my_addr_str(int sk) +static int get_my_addr_str(int sk) { struct sockaddr_in6 myaddr6 = { 0, }; struct sockaddr_in myaddr4 = { 0, }; @@ -1196,21 +1263,14 @@ static int accept_loop(int listen_sk) socklen_t len = use_ipv6 ? sizeof(addr6) : sizeof(addr); struct pollfd pfd; int ret = 0, sk, exitval = 0; - struct sk_out *sk_out; FLIST_HEAD(conn_list); dprint(FD_NET, "server enter accept loop\n"); fio_set_fd_nonblocking(listen_sk, "server"); - sk_out = smalloc(sizeof(*sk_out)); - INIT_FLIST_HEAD(&sk_out->list); - sk_out->lock = fio_mutex_init(FIO_MUTEX_UNLOCKED); - sk_out->wait = fio_mutex_init(FIO_MUTEX_LOCKED); - - pthread_setspecific(sk_out_key, sk_out); - while (!exit_backend) { + struct sk_out *sk_out; const char *from; char buf[64]; pid_t pid; @@ -1260,28 +1320,30 @@ static int accept_loop(int listen_sk) dprint(FD_NET, "server: connect from %s\n", from); + sk_out = smalloc(sizeof(*sk_out)); sk_out->sk = sk; + INIT_FLIST_HEAD(&sk_out->list); + sk_out->lock = fio_mutex_init(FIO_MUTEX_UNLOCKED); + sk_out->wait = fio_mutex_init(FIO_MUTEX_LOCKED); pid = fork(); if (pid) { close(sk); fio_server_add_conn_pid(&conn_list, pid); - pthread_setspecific(sk_out_key, sk_out); continue; } - /* exits */ - get_my_addr_str(sk); /* if error, it's already logged, non-fatal */ + /* if error, it's already logged, non-fatal */ + get_my_addr_str(sk); + + /* + * Assign sk_out here, it'll be dropped in handle_connection() + * since that function calls _exit() when done + */ + sk_out_assign(sk_out); handle_connection(sk_out); } -#if 0 - fio_mutex_remove(sk_out->lock); - fio_mutex_remove(sk_out->wait); - sfree(sk_out); - pthread_setspecific(sk_out_key, NULL); -#endif - return exitval; } @@ -1450,6 +1512,47 @@ void fio_server_send_gs(struct group_run_stats *rs) fio_net_queue_cmd(FIO_NET_CMD_GS, &gs, sizeof(gs), NULL, SK_F_COPY); } +void fio_server_send_job_options(struct flist_head *opt_list, + unsigned int groupid) +{ + struct cmd_job_option pdu; + struct flist_head *entry; + + if (flist_empty(opt_list)) + return; + + flist_for_each(entry, opt_list) { + struct print_option *p; + size_t len; + + p = flist_entry(entry, struct print_option, list); + memset(&pdu, 0, sizeof(pdu)); + + if (groupid == -1U) { + pdu.global = __cpu_to_le16(1); + pdu.groupid = 0; + } else { + pdu.global = 0; + pdu.groupid = cpu_to_le32(groupid); + } + len = strlen(p->name); + if (len >= sizeof(pdu.name)) { + len = sizeof(pdu.name) - 1; + pdu.truncated = __cpu_to_le16(1); + } + memcpy(pdu.name, p->name, len); + if (p->value) { + len = strlen(p->value); + if (len >= sizeof(pdu.value)) { + len = sizeof(pdu.value) - 1; + pdu.truncated = __cpu_to_le16(1); + } + memcpy(pdu.value, p->value, len); + } + fio_net_queue_cmd(FIO_NET_CMD_JOB_OPT, &pdu, sizeof(pdu), NULL, SK_F_COPY); + } +} + static void convert_agg(struct disk_util_agg *dst, struct disk_util_agg *src) { int i; @@ -1827,7 +1930,7 @@ static int fio_init_server_connection(void) log_info("fio: server listening on %s\n", bind_str); - if (listen(sk, 0) < 0) { + if (listen(sk, 4) < 0) { log_err("fio: listen: %s\n", strerror(errno)); close(sk); return -1; @@ -2022,6 +2125,13 @@ static int fio_server(void) { int sk, ret; + if (pthread_key_create(&sk_out_key, NULL)) { + log_err("fio: can't create sk_out backend key\n"); + return -1; + } + + pthread_setspecific(sk_out_key, NULL); + dprint(FD_NET, "starting server\n"); if (fio_handle_server_arg()) @@ -2033,9 +2143,6 @@ static int fio_server(void) set_sig_handlers(); - if (pthread_key_create(&sk_out_key, NULL)) - log_err("fio: can't create sk_out backend key\n"); - ret = accept_loop(sk); close(sk);