* propose an ACK be sent
*/
void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
- u32 serial, bool immediate)
+ u16 skew, u32 serial, bool immediate)
{
unsigned long expiry;
s8 prior = rxrpc_ack_priority[ack_reason];
/* update DELAY, IDLE, REQUESTED and PING_RESPONSE ACK serial
* numbers */
if (prior == rxrpc_ack_priority[call->ackr_reason]) {
- if (prior <= 4)
+ if (prior <= 4) {
+ call->ackr_skew = skew;
call->ackr_serial = serial;
+ }
if (immediate)
goto cancel_timer;
return;
_debug("cancel timer %%%u", serial);
try_to_del_timer_sync(&call->ack_timer);
read_lock_bh(&call->state_lock);
- if (call->state <= RXRPC_CALL_COMPLETE &&
+ if (call->state < RXRPC_CALL_COMPLETE &&
!test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events))
rxrpc_queue_call(call);
read_unlock_bh(&call->state_lock);
* propose an ACK be sent, locking the call structure
*/
void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
- u32 serial, bool immediate)
+ u16 skew, u32 serial, bool immediate)
{
s8 prior = rxrpc_ack_priority[ack_reason];
if (prior > rxrpc_ack_priority[call->ackr_reason]) {
spin_lock_bh(&call->lock);
- __rxrpc_propose_ACK(call, ack_reason, serial, immediate);
+ __rxrpc_propose_ACK(call, ack_reason, skew, serial, immediate);
spin_unlock_bh(&call->lock);
}
}
unsigned long resend_at)
{
read_lock_bh(&call->state_lock);
- if (call->state >= RXRPC_CALL_COMPLETE)
+ if (call->state == RXRPC_CALL_COMPLETE)
resend = 0;
if (resend & 1) {
stop = true;
sp->resend_at = jiffies + 3;
} else {
+ if (rxrpc_is_client_call(call))
+ rxrpc_expose_client_call(call);
sp->resend_at =
jiffies + rxrpc_resend_timeout;
}
_enter("%d,%d,%d",
call->acks_tail, call->acks_unacked, call->acks_head);
- if (call->state >= RXRPC_CALL_COMPLETE)
+ if (call->state == RXRPC_CALL_COMPLETE)
return;
resend = 0;
call->acks_hard++;
}
- wake_up(&call->tx_waitq);
+ wake_up(&call->waitq);
}
/*
skb = skb_dequeue(&call->rx_oos_queue);
if (skb) {
+ rxrpc_see_skb(skb);
sp = rxrpc_skb(skb);
_debug("drain OOS packet %d [%d]",
/* find out what the next packet is */
skb = skb_peek(&call->rx_oos_queue);
+ rxrpc_see_skb(skb);
if (skb)
call->rx_first_oos = rxrpc_skb(skb)->hdr.seq;
else
skb->destructor = rxrpc_packet_destructor;
ASSERTCMP(sp->call, ==, NULL);
sp->call = call;
- rxrpc_get_call(call);
- atomic_inc(&call->skb_count);
+ rxrpc_get_call_for_skb(call, skb);
/* insert into the buffer in sequence order */
spin_lock_bh(&call->lock);
mtu = min(ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU));
- peer = call->conn->params.peer;
+ peer = call->peer;
if (mtu < peer->maxdata) {
spin_lock_bh(&peer->lock);
peer->maxdata = mtu;
if (!skb)
return -EAGAIN;
+ rxrpc_see_skb(skb);
_net("deferred skb %p", skb);
sp = rxrpc_skb(skb);
/* secured packets must be verified and possibly decrypted */
if (call->conn->security->verify_packet(call, skb,
- _abort_code) < 0)
+ sp->hdr.seq,
+ sp->hdr.cksum) < 0)
goto protocol_error;
rxrpc_insert_oos_packet(call, skb);
if (ack.reason == RXRPC_ACK_PING) {
_proto("Rx ACK %%%u PING Request", latest);
rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE,
- sp->hdr.serial, true);
+ skb->priority, sp->hdr.serial, true);
}
/* discard any out-of-order or duplicate ACKs */
break;
case RXRPC_CALL_SERVER_AWAIT_ACK:
_debug("srv complete");
- call->state = RXRPC_CALL_COMPLETE;
+ __rxrpc_call_completed(call);
post_ACK = true;
break;
case RXRPC_CALL_CLIENT_SEND_REQUEST:
_debug("post ACK");
skb->mark = RXRPC_SKB_MARK_FINAL_ACK;
sp->call = call;
- rxrpc_get_call(call);
- atomic_inc(&call->skb_count);
+ rxrpc_get_call_for_skb(call, skb);
spin_lock_bh(&call->lock);
if (rxrpc_queue_rcv_skb(call, skb, true, true) < 0)
BUG();
memset(sp, 0, sizeof(*sp));
sp->error = error;
sp->call = call;
- rxrpc_get_call(call);
- atomic_inc(&call->skb_count);
+ rxrpc_get_call_for_skb(call, skb);
spin_lock_bh(&call->lock);
ret = rxrpc_queue_rcv_skb(call, skb, true, fatal);
}
/*
- * handle background processing of incoming call packets and ACK / abort
- * generation
+ * Handle background processing of incoming call packets and ACK / abort
+ * generation. A ref on the call is donated to us by whoever queued the work
+ * item.
*/
void rxrpc_process_call(struct work_struct *work)
{
unsigned long bits;
__be32 data, pad;
size_t len;
+ bool requeue = false;
int loop, nbit, ioc, ret, mtu;
u32 serial, abort_code = RX_PROTOCOL_ERROR;
u8 *acks = NULL;
+ rxrpc_see_call(call);
+
//printk("\n--------------------\n");
_enter("{%d,%s,%lx} [%lu]",
call->debug_id, rxrpc_call_states[call->state], call->events,
(jiffies - call->creation_jif) / (HZ / 10));
- if (test_and_set_bit(RXRPC_CALL_PROC_BUSY, &call->flags)) {
- _debug("XXXXXXXXXXXXX RUNNING ON MULTIPLE CPUS XXXXXXXXXXXXX");
+ if (call->state >= RXRPC_CALL_COMPLETE) {
+ rxrpc_put_call(call, rxrpc_call_put);
return;
}
/* there's a good chance we're going to have to send a message, so set
* one up in advance */
- msg.msg_name = &call->conn->params.peer->srx.transport;
- msg.msg_namelen = call->conn->params.peer->srx.transport_len;
+ msg.msg_name = &call->peer->srx.transport;
+ msg.msg_namelen = call->peer->srx.transport_len;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
/* deal with events of a final nature */
if (test_bit(RXRPC_CALL_EV_RCVD_ERROR, &call->events)) {
enum rxrpc_skb_mark mark;
- int error;
clear_bit(RXRPC_CALL_EV_CONN_ABORT, &call->events);
clear_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events);
clear_bit(RXRPC_CALL_EV_ABORT, &call->events);
- error = call->error_report;
- if (error < RXRPC_LOCAL_ERROR_OFFSET) {
+ if (call->completion == RXRPC_CALL_NETWORK_ERROR) {
mark = RXRPC_SKB_MARK_NET_ERROR;
- _debug("post net error %d", error);
+ _debug("post net error %d", call->error);
} else {
mark = RXRPC_SKB_MARK_LOCAL_ERROR;
- error -= RXRPC_LOCAL_ERROR_OFFSET;
- _debug("post net local error %d", error);
+ _debug("post net local error %d", call->error);
}
- if (rxrpc_post_message(call, mark, error, true) < 0)
+ if (rxrpc_post_message(call, mark, call->error, true) < 0)
goto no_mem;
clear_bit(RXRPC_CALL_EV_RCVD_ERROR, &call->events);
goto kill_ACKs;
}
if (test_bit(RXRPC_CALL_EV_CONN_ABORT, &call->events)) {
- ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
+ ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
clear_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events);
clear_bit(RXRPC_CALL_EV_ABORT, &call->events);
_debug("post conn abort");
if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
- call->conn->error, true) < 0)
+ call->error, true) < 0)
goto no_mem;
clear_bit(RXRPC_CALL_EV_CONN_ABORT, &call->events);
goto kill_ACKs;
}
if (test_bit(RXRPC_CALL_EV_ABORT, &call->events)) {
- ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
+ ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
- ECONNABORTED, true) < 0)
+ call->error, true) < 0)
goto no_mem;
whdr.type = RXRPC_PACKET_TYPE_ABORT;
- data = htonl(call->local_abort);
+ data = htonl(call->abort_code);
iov[1].iov_base = &data;
iov[1].iov_len = sizeof(data);
genbit = RXRPC_CALL_EV_ABORT;
}
if (test_bit(RXRPC_CALL_EV_LIFE_TIMER, &call->events)) {
- write_lock_bh(&call->state_lock);
- if (call->state <= RXRPC_CALL_COMPLETE) {
- call->state = RXRPC_CALL_LOCALLY_ABORTED;
- call->local_abort = RX_CALL_TIMEOUT;
- set_bit(RXRPC_CALL_EV_ABORT, &call->events);
- }
- write_unlock_bh(&call->state_lock);
+ rxrpc_abort_call("EXP", call, 0, RX_CALL_TIMEOUT, ETIME);
_debug("post timeout");
if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
/* deal with assorted inbound messages */
if (!skb_queue_empty(&call->rx_queue)) {
- switch (rxrpc_process_rx_queue(call, &abort_code)) {
+ ret = rxrpc_process_rx_queue(call, &abort_code);
+ switch (ret) {
case 0:
case -EAGAIN:
break;
case -EKEYEXPIRED:
case -EKEYREJECTED:
case -EPROTO:
- rxrpc_abort_call(call, abort_code);
+ rxrpc_abort_call("PRO", call, 0, abort_code, -ret);
goto kill_ACKs;
}
}
spin_lock_bh(&call->lock);
if (call->state == RXRPC_CALL_SERVER_SECURING) {
+ struct rxrpc_sock *rx;
_debug("securing");
- write_lock(&call->socket->call_lock);
- if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
- !test_bit(RXRPC_CALL_EV_RELEASE, &call->events)) {
- _debug("not released");
- call->state = RXRPC_CALL_SERVER_ACCEPTING;
- list_move_tail(&call->accept_link,
- &call->socket->acceptq);
+ rcu_read_lock();
+ rx = rcu_dereference(call->socket);
+ if (rx) {
+ write_lock(&rx->call_lock);
+ if (!test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
+ _debug("not released");
+ call->state = RXRPC_CALL_SERVER_ACCEPTING;
+ list_move_tail(&call->accept_link,
+ &rx->acceptq);
+ }
+ write_unlock(&rx->call_lock);
}
- write_unlock(&call->socket->call_lock);
+ rcu_read_unlock();
read_lock(&call->state_lock);
if (call->state < RXRPC_CALL_COMPLETE)
set_bit(RXRPC_CALL_EV_POST_ACCEPT, &call->events);
goto maybe_reschedule;
}
- if (test_bit(RXRPC_CALL_EV_RELEASE, &call->events)) {
- rxrpc_release_call(call);
- clear_bit(RXRPC_CALL_EV_RELEASE, &call->events);
- }
-
/* other events may have been raised since we started checking */
goto maybe_reschedule;
send_ACK_with_skew:
- ack.maxSkew = htons(atomic_read(&call->conn->hi_serial) -
- ntohl(ack.serial));
+ ack.maxSkew = htons(call->ackr_skew);
send_ACK:
- mtu = call->conn->params.peer->if_mtu;
- mtu -= call->conn->params.peer->hdrsize;
+ mtu = call->peer->if_mtu;
+ mtu -= call->peer->hdrsize;
ackinfo.maxMTU = htonl(mtu);
ackinfo.rwind = htonl(rxrpc_rx_window_size);
&msg, iov, ioc, len);
if (ret < 0) {
_debug("sendmsg failed: %d", ret);
- read_lock_bh(&call->state_lock);
- if (call->state < RXRPC_CALL_DEAD)
- rxrpc_queue_call(call);
- read_unlock_bh(&call->state_lock);
+ if (call->state < RXRPC_CALL_COMPLETE)
+ requeue = true;
goto error;
}
goto kill_ACKs;
case RXRPC_CALL_EV_ACK_FINAL:
- write_lock_bh(&call->state_lock);
- if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK)
- call->state = RXRPC_CALL_COMPLETE;
- write_unlock_bh(&call->state_lock);
+ rxrpc_call_completed(call);
goto kill_ACKs;
default:
case RXRPC_CALL_SERVER_ACK_REQUEST:
_debug("start ACK timer");
rxrpc_propose_ACK(call, RXRPC_ACK_DELAY,
- call->ackr_serial, false);
+ call->ackr_skew, call->ackr_serial,
+ false);
default:
break;
}
kill_ACKs:
del_timer_sync(&call->ack_timer);
- if (test_and_clear_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events))
- rxrpc_put_call(call);
clear_bit(RXRPC_CALL_EV_ACK, &call->events);
maybe_reschedule:
if (call->events || !skb_queue_empty(&call->rx_queue)) {
- read_lock_bh(&call->state_lock);
- if (call->state < RXRPC_CALL_DEAD)
- rxrpc_queue_call(call);
- read_unlock_bh(&call->state_lock);
- }
-
- /* don't leave aborted connections on the accept queue */
- if (call->state >= RXRPC_CALL_COMPLETE &&
- !list_empty(&call->accept_link)) {
- _debug("X unlinking once-pending call %p { e=%lx f=%lx c=%x }",
- call, call->events, call->flags, call->conn->proto.cid);
-
- read_lock_bh(&call->state_lock);
- if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
- !test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events))
- rxrpc_queue_call(call);
- read_unlock_bh(&call->state_lock);
+ if (call->state < RXRPC_CALL_COMPLETE)
+ requeue = true;
}
error:
- clear_bit(RXRPC_CALL_PROC_BUSY, &call->flags);
kfree(acks);
- /* because we don't want two CPUs both processing the work item for one
- * call at the same time, we use a flag to note when it's busy; however
- * this means there's a race between clearing the flag and setting the
- * work pending bit and the work item being processed again */
- if (call->events && !work_pending(&call->processor)) {
+ if ((requeue || call->events) && !work_pending(&call->processor)) {
_debug("jumpstart %x", call->conn->proto.cid);
- rxrpc_queue_call(call);
+ __rxrpc_queue_call(call);
+ } else {
+ rxrpc_put_call(call, rxrpc_call_put);
}
_leave("");