#include <net/af_rxrpc.h>
#include "ar-internal.h"
-enum rxrpc_command {
- RXRPC_CMD_SEND_DATA, /* send data message */
- RXRPC_CMD_SEND_ABORT, /* request abort generation */
- RXRPC_CMD_ACCEPT, /* [server] accept incoming call */
- RXRPC_CMD_REJECT_BUSY, /* [server] reject a call as busy */
-};
-
-struct rxrpc_send_params {
- s64 tx_total_len; /* Total Tx data length (if send data) */
- unsigned long user_call_ID; /* User's call ID */
- u32 abort_code; /* Abort code to Tx (if abort) */
- enum rxrpc_command command : 8; /* The command to implement */
- bool exclusive; /* Shared or exclusive call */
- bool upgrade; /* If the connection is upgradeable */
-};
+/*
+ * Wait for space to appear in the Tx queue or a signal to occur.
+ */
+static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx,
+ struct rxrpc_call *call,
+ long *timeo)
+{
+ for (;;) {
+ set_current_state(TASK_INTERRUPTIBLE);
+ if (call->tx_top - call->tx_hard_ack <
+ min_t(unsigned int, call->tx_winsize,
+ call->cong_cwnd + call->cong_extra))
+ return 0;
+
+ if (call->state >= RXRPC_CALL_COMPLETE)
+ return call->error;
+
+ if (signal_pending(current))
+ return sock_intr_errno(*timeo);
+
+ trace_rxrpc_transmit(call, rxrpc_transmit_wait);
+ mutex_unlock(&call->user_mutex);
+ *timeo = schedule_timeout(*timeo);
+ if (mutex_lock_interruptible(&call->user_mutex) < 0)
+ return sock_intr_errno(*timeo);
+ }
+}
+
+/*
+ * Wait for space to appear in the Tx queue uninterruptibly, but with
+ * a timeout of 2*RTT if no progress was made and a signal occurred.
+ */
+static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
+ struct rxrpc_call *call)
+{
+ rxrpc_seq_t tx_start, tx_win;
+ signed long rtt2, timeout;
+ u64 rtt;
+
+ rtt = READ_ONCE(call->peer->rtt);
+ rtt2 = nsecs_to_jiffies64(rtt) * 2;
+ if (rtt2 < 1)
+ rtt2 = 1;
+
+ timeout = rtt2;
+ tx_start = READ_ONCE(call->tx_hard_ack);
+
+ for (;;) {
+ set_current_state(TASK_UNINTERRUPTIBLE);
+
+ tx_win = READ_ONCE(call->tx_hard_ack);
+ if (call->tx_top - tx_win <
+ min_t(unsigned int, call->tx_winsize,
+ call->cong_cwnd + call->cong_extra))
+ return 0;
+
+ if (call->state >= RXRPC_CALL_COMPLETE)
+ return call->error;
+
+ if (timeout == 0 &&
+ tx_win == tx_start && signal_pending(current))
+ return -EINTR;
+
+ if (tx_win != tx_start) {
+ timeout = rtt2;
+ tx_start = tx_win;
+ }
+
+ trace_rxrpc_transmit(call, rxrpc_transmit_wait);
+ timeout = schedule_timeout(timeout);
+ }
+}
/*
* wait for space to appear in the transmit/ACK window
*/
static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx,
struct rxrpc_call *call,
- long *timeo)
+ long *timeo,
+ bool waitall)
{
DECLARE_WAITQUEUE(myself, current);
int ret;
add_wait_queue(&call->waitq, &myself);
- for (;;) {
- set_current_state(TASK_INTERRUPTIBLE);
- ret = 0;
- if (call->tx_top - call->tx_hard_ack <
- min_t(unsigned int, call->tx_winsize,
- call->cong_cwnd + call->cong_extra))
- break;
- if (call->state >= RXRPC_CALL_COMPLETE) {
- ret = call->error;
- break;
- }
- if (signal_pending(current)) {
- ret = sock_intr_errno(*timeo);
- break;
- }
-
- trace_rxrpc_transmit(call, rxrpc_transmit_wait);
- mutex_unlock(&call->user_mutex);
- *timeo = schedule_timeout(*timeo);
- if (mutex_lock_interruptible(&call->user_mutex) < 0) {
- ret = sock_intr_errno(*timeo);
- break;
- }
- }
+ if (waitall)
+ ret = rxrpc_wait_for_tx_window_nonintr(rx, call);
+ else
+ ret = rxrpc_wait_for_tx_window_intr(rx, call, timeo);
remove_wait_queue(&call->waitq, &myself);
set_current_state(TASK_RUNNING);
ktime_get_real());
if (!last)
break;
+ /* Fall through */
case RXRPC_CALL_SERVER_SEND_REPLY:
call->state = RXRPC_CALL_SERVER_AWAIT_ACK;
rxrpc_notify_end_tx(rx, call, notify_end_tx);
if (msg->msg_flags & MSG_DONTWAIT)
goto maybe_error;
ret = rxrpc_wait_for_tx_window(rx, call,
- &timeo);
+ &timeo,
+ msg->msg_flags & MSG_WAITALL);
if (ret < 0)
goto maybe_error;
}
if (msg->msg_flags & MSG_CMSG_COMPAT) {
if (len != sizeof(u32))
return -EINVAL;
- p->user_call_ID = *(u32 *)CMSG_DATA(cmsg);
+ p->call.user_call_ID = *(u32 *)CMSG_DATA(cmsg);
} else {
if (len != sizeof(unsigned long))
return -EINVAL;
- p->user_call_ID = *(unsigned long *)
+ p->call.user_call_ID = *(unsigned long *)
CMSG_DATA(cmsg);
}
got_user_ID = true;
break;
case RXRPC_TX_LENGTH:
- if (p->tx_total_len != -1 || len != sizeof(__s64))
+ if (p->call.tx_total_len != -1 || len != sizeof(__s64))
return -EINVAL;
- p->tx_total_len = *(__s64 *)CMSG_DATA(cmsg);
- if (p->tx_total_len < 0)
+ p->call.tx_total_len = *(__s64 *)CMSG_DATA(cmsg);
+ if (p->call.tx_total_len < 0)
return -EINVAL;
break;
if (!got_user_ID)
return -EINVAL;
- if (p->tx_total_len != -1 && p->command != RXRPC_CMD_SEND_DATA)
+ if (p->call.tx_total_len != -1 && p->command != RXRPC_CMD_SEND_DATA)
return -EINVAL;
_leave(" = 0");
return 0;
cp.exclusive = rx->exclusive | p->exclusive;
cp.upgrade = p->upgrade;
cp.service_id = srx->srx_service;
- call = rxrpc_new_client_call(rx, &cp, srx, p->user_call_ID,
- p->tx_total_len, GFP_KERNEL);
+ call = rxrpc_new_client_call(rx, &cp, srx, &p->call, GFP_KERNEL);
/* The socket is now unlocked */
_leave(" = %p\n", call);
int ret;
struct rxrpc_send_params p = {
- .tx_total_len = -1,
- .user_call_ID = 0,
- .abort_code = 0,
- .command = RXRPC_CMD_SEND_DATA,
- .exclusive = false,
- .upgrade = true,
+ .call.tx_total_len = -1,
+ .call.user_call_ID = 0,
+ .abort_code = 0,
+ .command = RXRPC_CMD_SEND_DATA,
+ .exclusive = false,
+ .upgrade = false,
};
_enter("");
ret = -EINVAL;
if (rx->sk.sk_state != RXRPC_SERVER_LISTENING)
goto error_release_sock;
- call = rxrpc_accept_call(rx, p.user_call_ID, NULL);
+ call = rxrpc_accept_call(rx, p.call.user_call_ID, NULL);
/* The socket is now unlocked. */
if (IS_ERR(call))
return PTR_ERR(call);
- rxrpc_put_call(call, rxrpc_call_put);
- return 0;
+ ret = 0;
+ goto out_put_unlock;
}
- call = rxrpc_find_call_by_user_ID(rx, p.user_call_ID);
+ call = rxrpc_find_call_by_user_ID(rx, p.call.user_call_ID);
if (!call) {
ret = -EBADSLT;
if (p.command != RXRPC_CMD_SEND_DATA)
goto error_put;
}
- if (p.tx_total_len != -1) {
+ if (p.call.tx_total_len != -1) {
ret = -EINVAL;
if (call->tx_total_len != -1 ||
call->tx_pending ||
call->tx_top != 0)
goto error_put;
- call->tx_total_len = p.tx_total_len;
+ call->tx_total_len = p.call.tx_total_len;
}
}
ret = rxrpc_send_data(rx, call, msg, len, NULL);
}
+out_put_unlock:
mutex_unlock(&call->user_mutex);
error_put:
rxrpc_put_call(call, rxrpc_call_put);