X-Git-Url: https://git.kernel.dk/?p=fio.git;a=blobdiff_plain;f=server.c;h=38a6bf8543c71fc3f0d48b743c222d4b03a9fea0;hp=b0ff1dcca07566bab309513925a45fda01841235;hb=0279b88017b5d21f0fcbb1b57481339735c41797;hpb=2466096336bd0fbc1a94811aa338926af6baf42f diff --git a/server.c b/server.c index b0ff1dcc..38a6bf85 100644 --- a/server.c +++ b/server.c @@ -40,22 +40,23 @@ enum { }; struct sk_entry { - struct flist_head list; - int opcode; + struct flist_head list; /* link on sk_out->list */ + int flags; /* SK_F_* */ + int opcode; /* Actual command fields */ void *buf; off_t size; uint64_t *tagptr; - int flags; - struct flist_head next; + struct flist_head next; /* Other sk_entry's, if linked command */ }; struct sk_out { - unsigned int refs; + unsigned int refs; /* frees sk_out when it drops to zero. + * protected by below ->lock */ - int sk; - struct fio_mutex *lock; - struct flist_head list; - struct fio_mutex *wait; + int sk; /* socket fd to talk to client */ + struct fio_mutex *lock; /* protects ref and below list */ + struct flist_head list; /* list of pending transmit work */ + struct fio_mutex *wait; /* wake backend when items added to list */ }; static char *fio_server_arg; @@ -134,17 +135,15 @@ void sk_out_assign(struct sk_out *sk_out) pthread_setspecific(sk_out_key, sk_out); } -static void __sk_out_drop(struct sk_out *sk_out) +static void sk_out_free(struct sk_out *sk_out) { fio_mutex_remove(sk_out->lock); fio_mutex_remove(sk_out->wait); sfree(sk_out); } -void sk_out_drop(void) +static int __sk_out_drop(struct sk_out *sk_out) { - struct sk_out *sk_out = pthread_getspecific(sk_out_key); - if (sk_out) { int refs; @@ -152,11 +151,43 @@ void sk_out_drop(void) refs = --sk_out->refs; sk_unlock(sk_out); - if (!refs) - __sk_out_drop(sk_out); + if (!refs) { + sk_out_free(sk_out); + return 0; + } + } + + return 1; +} + +void sk_out_drop(void) +{ + struct sk_out *sk_out; + sk_out = pthread_getspecific(sk_out_key); + if (!__sk_out_drop(sk_out)) pthread_setspecific(sk_out_key, NULL); - } +} + +static void __fio_init_net_cmd(struct fio_net_cmd *cmd, uint16_t opcode, + uint32_t pdu_len, uint64_t tag) +{ + memset(cmd, 0, sizeof(*cmd)); + + cmd->version = __cpu_to_le16(FIO_SERVER_VER); + cmd->opcode = cpu_to_le16(opcode); + cmd->tag = cpu_to_le64(tag); + cmd->pdu_len = cpu_to_le32(pdu_len); +} + + +static void fio_init_net_cmd(struct fio_net_cmd *cmd, uint16_t opcode, + const void *pdu, uint32_t pdu_len, uint64_t tag) +{ + __fio_init_net_cmd(cmd, opcode, pdu_len, tag); + + if (pdu) + memcpy(&cmd->payload, pdu, pdu_len); } const char *fio_server_op(unsigned int op) @@ -218,7 +249,7 @@ static int fio_sendv_data(int sk, struct iovec *iov, int count) return 1; } -int fio_send_data(int sk, const void *p, unsigned int len) +static int fio_send_data(int sk, const void *p, unsigned int len) { struct iovec iov = { .iov_base = (void *) p, .iov_len = len }; @@ -227,7 +258,7 @@ int fio_send_data(int sk, const void *p, unsigned int len) return fio_sendv_data(sk, &iov, 1); } -int fio_recv_data(int sk, void *p, unsigned int len) +static int fio_recv_data(int sk, void *p, unsigned int len) { do { int ret = recv(sk, p, len, MSG_WAITALL); @@ -419,7 +450,7 @@ static void free_reply(uint64_t tag) free(reply); } -void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, const void *pdu) +static void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, const void *pdu) { uint32_t pdu_len; @@ -429,7 +460,7 @@ void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, const void *pdu) cmd->pdu_crc16 = __cpu_to_le16(fio_crc16(pdu, pdu_len)); } -void fio_net_cmd_crc(struct fio_net_cmd *cmd) +static void fio_net_cmd_crc(struct fio_net_cmd *cmd) { fio_net_cmd_crc_pdu(cmd, cmd->payload); } @@ -486,8 +517,8 @@ int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size, return ret; } -struct sk_entry *fio_net_prep_cmd(uint16_t opcode, void *buf, off_t size, - uint64_t *tagptr, int flags) +static struct sk_entry *fio_net_prep_cmd(uint16_t opcode, void *buf, off_t size, + uint64_t *tagptr, int flags) { struct sk_entry *entry; @@ -1183,13 +1214,15 @@ static int handle_connection(struct sk_out *sk_out) handle_xmits(sk_out); close(sk_out->sk); + sk_out->sk = -1; + __sk_out_drop(sk_out); _exit(ret); } /* get the address on this host bound by the input socket, * whether it is ipv6 or ipv4 */ -int get_my_addr_str(int sk) +static int get_my_addr_str(int sk) { struct sockaddr_in6 myaddr6 = { 0, }; struct sockaddr_in myaddr4 = { 0, }; @@ -1223,7 +1256,7 @@ int get_my_addr_str(int sk) return 0; } -static int accept_loop(struct sk_out *sk_out, int listen_sk) +static int accept_loop(int listen_sk) { struct sockaddr_in addr; struct sockaddr_in6 addr6; @@ -1237,6 +1270,7 @@ static int accept_loop(struct sk_out *sk_out, int listen_sk) fio_set_fd_nonblocking(listen_sk, "server"); while (!exit_backend) { + struct sk_out *sk_out; const char *from; char buf[64]; pid_t pid; @@ -1286,18 +1320,27 @@ static int accept_loop(struct sk_out *sk_out, int listen_sk) dprint(FD_NET, "server: connect from %s\n", from); + sk_out = smalloc(sizeof(*sk_out)); sk_out->sk = sk; + INIT_FLIST_HEAD(&sk_out->list); + sk_out->lock = fio_mutex_init(FIO_MUTEX_UNLOCKED); + sk_out->wait = fio_mutex_init(FIO_MUTEX_LOCKED); pid = fork(); if (pid) { close(sk); fio_server_add_conn_pid(&conn_list, pid); - pthread_setspecific(sk_out_key, sk_out); continue; } - /* exits */ - get_my_addr_str(sk); /* if error, it's already logged, non-fatal */ + /* if error, it's already logged, non-fatal */ + get_my_addr_str(sk); + + /* + * Assign sk_out here, it'll be dropped in handle_connection() + * since that function calls _exit() when done + */ + sk_out_assign(sk_out); handle_connection(sk_out); } @@ -1469,6 +1512,34 @@ void fio_server_send_gs(struct group_run_stats *rs) fio_net_queue_cmd(FIO_NET_CMD_GS, &gs, sizeof(gs), NULL, SK_F_COPY); } +void fio_server_send_job_options(struct flist_head *opt_list, + unsigned int groupid) +{ + struct cmd_job_option pdu; + struct flist_head *entry; + + if (flist_empty(opt_list)) + return; + + flist_for_each(entry, opt_list) { + struct print_option *p; + + p = flist_entry(entry, struct print_option, list); + memset(&pdu, 0, sizeof(pdu)); + if (groupid == -1U) { + pdu.global = __cpu_to_le16(1); + pdu.groupid = 0; + } else { + pdu.global = 0; + pdu.groupid = __cpu_to_le16(groupid); + } + memcpy(pdu.name, p->name, strlen(p->name)); + if (p->value) + memcpy(pdu.value, p->value, strlen(p->value)); + fio_net_queue_cmd(FIO_NET_CMD_JOB_OPT, &pdu, sizeof(pdu), NULL, SK_F_COPY); + } +} + static void convert_agg(struct disk_util_agg *dst, struct disk_util_agg *src) { int i; @@ -1846,7 +1917,7 @@ static int fio_init_server_connection(void) log_info("fio: server listening on %s\n", bind_str); - if (listen(sk, 0) < 0) { + if (listen(sk, 4) < 0) { log_err("fio: listen: %s\n", strerror(errno)); close(sk); return -1; @@ -2039,9 +2110,15 @@ static void set_sig_handlers(void) static int fio_server(void) { - struct sk_out *sk_out; int sk, ret; + if (pthread_key_create(&sk_out_key, NULL)) { + log_err("fio: can't create sk_out backend key\n"); + return -1; + } + + pthread_setspecific(sk_out_key, NULL); + dprint(FD_NET, "starting server\n"); if (fio_handle_server_arg()) @@ -2053,17 +2130,7 @@ static int fio_server(void) set_sig_handlers(); - if (pthread_key_create(&sk_out_key, NULL)) - log_err("fio: can't create sk_out backend key\n"); - - sk_out = smalloc(sizeof(*sk_out)); - INIT_FLIST_HEAD(&sk_out->list); - sk_out->lock = fio_mutex_init(FIO_MUTEX_UNLOCKED); - sk_out->wait = fio_mutex_init(FIO_MUTEX_LOCKED); - - sk_out_assign(sk_out); - - ret = accept_loop(sk_out, sk); + ret = accept_loop(sk); close(sk); @@ -2074,8 +2141,6 @@ static int fio_server(void) if (bind_sock) free(bind_sock); - sk_out_drop(); - return ret; }