int refs;
sk_lock(sk_out);
+ assert(sk_out->refs != 0);
refs = --sk_out->refs;
sk_unlock(sk_out);
if (!refs) {
sk_out_free(sk_out);
+ pthread_setspecific(sk_out_key, NULL);
return 0;
}
}
struct sk_out *sk_out;
sk_out = pthread_getspecific(sk_out_key);
- if (!__sk_out_drop(sk_out))
- pthread_setspecific(sk_out_key, NULL);
+ __sk_out_drop(sk_out);
}
static void __fio_init_net_cmd(struct fio_net_cmd *cmd, uint16_t opcode,
return fio_sendv_data(sk, &iov, 1);
}
-static int fio_recv_data(int sk, void *p, unsigned int len)
+static int fio_recv_data(int sk, void *p, unsigned int len, bool wait)
{
+ int flags;
+
+ if (wait)
+ flags = MSG_WAITALL;
+ else
+ flags = OS_MSG_DONTWAIT;
+
do {
- int ret = recv(sk, p, len, MSG_WAITALL);
+ int ret = recv(sk, p, len, flags);
if (ret > 0) {
len -= ret;
continue;
} else if (!ret)
break;
- else if (errno == EAGAIN || errno == EINTR)
- continue;
- else
+ else if (errno == EAGAIN || errno == EINTR) {
+ if (wait)
+ continue;
+ break;
+ } else
break;
} while (!exit_backend);
/*
* Read (and defragment, if necessary) incoming commands
*/
-struct fio_net_cmd *fio_net_recv_cmd(int sk)
+struct fio_net_cmd *fio_net_recv_cmd(int sk, bool wait)
{
struct fio_net_cmd cmd, *tmp, *cmdret = NULL;
size_t cmd_size = 0, pdu_offset = 0;
void *pdu = NULL;
do {
- ret = fio_recv_data(sk, &cmd, sizeof(cmd));
+ ret = fio_recv_data(sk, &cmd, sizeof(cmd), wait);
if (ret)
break;
/* There's payload, get it */
pdu = (void *) cmdret->payload + pdu_offset;
- ret = fio_recv_data(sk, pdu, cmd.pdu_len);
+ ret = fio_recv_data(sk, pdu, cmd.pdu_len, wait);
if (ret)
break;
pid_t pid;
int ret;
+ sk_out_assign(sk_out);
+
fio_time_init();
set_genesis_time();
ret = fio_backend(sk_out);
free_threads_shm();
+ sk_out_drop();
_exit(ret);
}
struct all_io_list state;
state.threads = cpu_to_le64((uint64_t) 0);
- fio_net_queue_cmd(FIO_NET_CMD_VTRIGGER, &state, sizeof(state), NULL, SK_F_COPY);
+ fio_net_queue_cmd(FIO_NET_CMD_VTRIGGER, &state, sizeof(state), NULL, SK_F_COPY | SK_F_INLINE);
} else
- fio_net_queue_cmd(FIO_NET_CMD_VTRIGGER, rep, sz, NULL, SK_F_FREE);
+ fio_net_queue_cmd(FIO_NET_CMD_VTRIGGER, rep, sz, NULL, SK_F_FREE | SK_F_INLINE);
exec_trigger(buf);
return 0;
{
struct fio_net_cmd cmd;
struct iovec iov[2];
+ size_t this_len;
+ int ret;
iov[0].iov_base = (void *) &cmd;
iov[0].iov_len = sizeof(cmd);
- iov[1].iov_base = (void *) buf;
- iov[1].iov_len = size;
- __fio_init_net_cmd(&cmd, opcode, size, tag);
- cmd.flags = __cpu_to_le32(flags);
- fio_net_cmd_crc_pdu(&cmd, buf);
+ do {
+ uint32_t this_flags = flags;
+
+ this_len = size;
+ if (this_len > FIO_SERVER_MAX_FRAGMENT_PDU)
+ this_len = FIO_SERVER_MAX_FRAGMENT_PDU;
+
+ if (this_len < size)
+ this_flags |= FIO_NET_CMD_F_MORE;
+
+ __fio_init_net_cmd(&cmd, opcode, this_len, tag);
+ cmd.flags = __cpu_to_le32(this_flags);
+ fio_net_cmd_crc_pdu(&cmd, buf);
+
+ iov[1].iov_base = (void *) buf;
+ iov[1].iov_len = this_len;
+
+ ret = fio_sendv_data(sk, iov, 2);
+ size -= this_len;
+ buf += this_len;
+ } while (!ret && size);
- return fio_sendv_data(sk, iov, 2);
+ return ret;
}
static void finish_entry(struct sk_entry *entry)
if (ret < 0)
break;
- cmd = fio_net_recv_cmd(sk_out->sk);
+ cmd = fio_net_recv_cmd(sk_out->sk, true);
if (!cmd) {
ret = -1;
break;
}
int fio_server_get_verify_state(const char *name, int threadnumber,
- void **datap, int *version)
+ void **datap)
{
struct thread_io_list *s;
struct cmd_sendfile out;
struct cmd_reply *rep;
uint64_t tag;
void *data;
+ int ret;
dprint(FD_NET, "server: request verify state\n");
rep = smalloc(sizeof(*rep));
if (!rep) {
log_err("fio: smalloc pool too small\n");
- return 1;
+ return ENOMEM;
}
__fio_mutex_init(&rep->lock, FIO_MUTEX_LOCKED);
*/
if (fio_mutex_down_timeout(&rep->lock, 10000)) {
log_err("fio: timed out waiting for reply\n");
+ ret = ETIMEDOUT;
goto fail;
}
if (rep->error) {
- log_err("fio: failure on receiving state file: %s\n",
- strerror(rep->error));
+ log_err("fio: failure on receiving state file %s: %s\n",
+ out.path, strerror(rep->error));
+ ret = rep->error;
fail:
*datap = NULL;
sfree(rep);
fio_net_queue_quit();
- return 1;
+ return ret;
}
/*
* the header, and the thread_io_list checksum
*/
s = rep->data + sizeof(struct verify_state_hdr);
- if (verify_state_hdr(rep->data, s, version))
+ if (verify_state_hdr(rep->data, s)) {
+ ret = EILSEQ;
goto fail;
+ }
/*
* Don't need the header from now, copy just the thread_io_list
*/
+ ret = 0;
rep->size -= sizeof(struct verify_state_hdr);
data = malloc(rep->size);
memcpy(data, s, rep->size);
sfree(rep->data);
__fio_mutex_remove(&rep->lock);
sfree(rep);
- return 0;
+ return ret;
}
static int fio_init_server_ip(void)
return -1;
}
#ifdef SO_REUSEPORT
- if (setsockopt(sk, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
- log_err("fio: setsockopt(REUSEPORT): %s\n", strerror(errno));
- close(sk);
- return -1;
- }
+ /*
+ * Not fatal if fails, so just ignore it if that happens
+ */
+ setsockopt(sk, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));
#endif
if (use_ipv6) {
sigaction(SIGINT, &act, NULL);
}
-static int fio_server(void)
+void fio_server_destroy_sk_key(void)
{
- int sk, ret;
+ pthread_key_delete(sk_out_key);
+}
+int fio_server_create_sk_key(void)
+{
if (pthread_key_create(&sk_out_key, NULL)) {
log_err("fio: can't create sk_out backend key\n");
- return -1;
+ return 1;
}
pthread_setspecific(sk_out_key, NULL);
+ return 0;
+}
+
+static int fio_server(void)
+{
+ int sk, ret;
dprint(FD_NET, "starting server\n");