};
struct sk_out {
+ unsigned int refs;
+
int sk;
struct fio_mutex *lock;
struct flist_head list;
fio_mutex_up(sk_out->lock);
}
+void sk_out_assign(struct sk_out *sk_out)
+{
+ if (!sk_out)
+ return;
+
+ sk_lock(sk_out);
+ sk_out->refs++;
+ sk_unlock(sk_out);
+ pthread_setspecific(sk_out_key, sk_out);
+}
+
+static void __sk_out_drop(struct sk_out *sk_out)
+{
+ fio_mutex_remove(sk_out->lock);
+ fio_mutex_remove(sk_out->wait);
+ sfree(sk_out);
+}
+
+void sk_out_drop(void)
+{
+ struct sk_out *sk_out = pthread_getspecific(sk_out_key);
+
+ if (sk_out) {
+ int refs;
+
+ sk_lock(sk_out);
+ refs = --sk_out->refs;
+ sk_unlock(sk_out);
+
+ if (!refs)
+ __sk_out_drop(sk_out);
+
+ pthread_setspecific(sk_out_key, NULL);
+ }
+}
+
const char *fio_server_op(unsigned int op)
{
static char buf[32];
if (!total_len)
return 0;
- if (errno)
- return -errno;
-
return 1;
}
static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list,
struct fio_net_cmd *cmd)
{
- struct backend_data data;
pid_t pid;
int ret;
return 0;
}
- data.key = sk_out_key;
- data.ptr = sk_out;
- //pthread_setspecific(sk_out_key, sk_out);
- ret = fio_backend(&data);
+ ret = fio_backend(sk_out);
free_threads_shm();
_exit(ret);
}
sfree(entry);
}
-static void send_vec_entry(struct sk_out *sk_out, struct sk_entry *first)
+static void entry_set_flags_tag(struct sk_entry *entry, struct flist_head *list,
+ unsigned int *flags, uint64_t *tag)
{
- uint64_t tag;
- int flags;
-
- if (!flist_empty(&first->next))
- flags = FIO_NET_CMD_F_MORE;
+ if (!flist_empty(list))
+ *flags = FIO_NET_CMD_F_MORE;
else
- flags = 0;
+ *flags = 0;
- if (first->tagptr)
- tag = *first->tagptr;
+ if (entry->tagptr)
+ *tag = *entry->tagptr;
else
- tag = 0;
+ *tag = 0;
+}
+
+static int send_vec_entry(struct sk_out *sk_out, struct sk_entry *first)
+{
+ unsigned int flags;
+ uint64_t tag;
+ int ret;
- fio_send_cmd_ext_pdu(sk_out->sk, first->opcode, first->buf, first->size, tag, flags);
+ entry_set_flags_tag(first, &first->next, &flags, &tag);
+
+ ret = fio_send_cmd_ext_pdu(sk_out->sk, first->opcode, first->buf, first->size, tag, flags);
while (!flist_empty(&first->next)) {
struct sk_entry *next;
next = flist_first_entry(&first->next, struct sk_entry, list);
flist_del_init(&next->list);
- if (flist_empty(&first->next))
- flags = 0;
- if (next->tagptr)
- tag = *next->tagptr;
- else
- tag = 0;
+ entry_set_flags_tag(next, &first->next, &flags, &tag);
- fio_send_cmd_ext_pdu(sk_out->sk, next->opcode, next->buf, next->size, tag, flags);
+ ret += fio_send_cmd_ext_pdu(sk_out->sk, next->opcode, next->buf, next->size, tag, flags);
finish_entry(next);
}
+
+ return ret;
}
-static void handle_sk_entry(struct sk_out *sk_out, struct sk_entry *entry)
+static int handle_sk_entry(struct sk_out *sk_out, struct sk_entry *entry)
{
+ int ret;
+
if (entry->flags & SK_F_VEC)
- send_vec_entry(sk_out, entry);
+ ret = send_vec_entry(sk_out, entry);
if (entry->flags & SK_F_SIMPLE) {
uint64_t tag = 0;
if (entry->tagptr)
tag = *entry->tagptr;
- fio_net_send_simple_cmd(sk_out->sk, entry->opcode, tag, NULL);
+ ret = fio_net_send_simple_cmd(sk_out->sk, entry->opcode, tag, NULL);
} else
- fio_net_send_cmd(sk_out->sk, entry->opcode, entry->buf, entry->size, entry->tagptr, NULL);
+ ret = fio_net_send_cmd(sk_out->sk, entry->opcode, entry->buf, entry->size, entry->tagptr, NULL);
+
+ if (ret)
+ log_err("fio: failed handling cmd %s\n", fio_server_op(entry->opcode));
finish_entry(entry);
+ return ret;
}
-static void handle_xmits(struct sk_out *sk_out)
+static int handle_xmits(struct sk_out *sk_out)
{
struct sk_entry *entry;
FLIST_HEAD(list);
+ int ret = 0;
sk_lock(sk_out);
if (flist_empty(&sk_out->list)) {
sk_unlock(sk_out);
- return;
+ return 0;
}
flist_splice_init(&sk_out->list, &list);
while (!flist_empty(&list)) {
entry = flist_entry(list.next, struct sk_entry, list);
flist_del(&entry->list);
- handle_sk_entry(sk_out, entry);
+ ret += handle_sk_entry(sk_out, entry);
}
+
+ return ret;
}
static int handle_connection(struct sk_out *sk_out)
return 0;
}
-static int accept_loop(int listen_sk)
+static int accept_loop(struct sk_out *sk_out, int listen_sk)
{
struct sockaddr_in addr;
struct sockaddr_in6 addr6;
socklen_t len = use_ipv6 ? sizeof(addr6) : sizeof(addr);
struct pollfd pfd;
int ret = 0, sk, exitval = 0;
- struct sk_out *sk_out;
FLIST_HEAD(conn_list);
dprint(FD_NET, "server enter accept loop\n");
fio_set_fd_nonblocking(listen_sk, "server");
- sk_out = smalloc(sizeof(*sk_out));
- INIT_FLIST_HEAD(&sk_out->list);
- sk_out->lock = fio_mutex_init(FIO_MUTEX_UNLOCKED);
- sk_out->wait = fio_mutex_init(FIO_MUTEX_LOCKED);
-
- pthread_setspecific(sk_out_key, sk_out);
-
while (!exit_backend) {
const char *from;
char buf[64];
handle_connection(sk_out);
}
-#if 0
- fio_mutex_remove(sk_out->lock);
- fio_mutex_remove(sk_out->wait);
- sfree(sk_out);
- pthread_setspecific(sk_out_key, NULL);
-#endif
-
return exitval;
}
static int fio_server(void)
{
+ struct sk_out *sk_out;
int sk, ret;
dprint(FD_NET, "starting server\n");
if (pthread_key_create(&sk_out_key, NULL))
log_err("fio: can't create sk_out backend key\n");
- ret = accept_loop(sk);
+ sk_out = smalloc(sizeof(*sk_out));
+ INIT_FLIST_HEAD(&sk_out->list);
+ sk_out->lock = fio_mutex_init(FIO_MUTEX_UNLOCKED);
+ sk_out->wait = fio_mutex_init(FIO_MUTEX_LOCKED);
+
+ sk_out_assign(sk_out);
+
+ ret = accept_loop(sk_out, sk);
close(sk);
if (bind_sock)
free(bind_sock);
+ sk_out_drop();
+
return ret;
}