rxrpc: Don't use sk->sk_receive_queue.lock to guard socket state changes
net/rxrpc/io_thread.c
// SPDX-License-Identifier: GPL-2.0-or-later
/* RxRPC packet reception
 *
 * Copyright (C) 2007, 2016, 2022 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include "ar-internal.h"

/*
 * handle data received on the local endpoint
 * - may be called in interrupt context
 *
 * [!] Note that as this is called from the encap_rcv hook, the socket is not
 * held locked by the caller and nothing prevents sk_user_data on the UDP
 * socket from being cleared in the middle of processing this function.
 *
 * Called with the RCU read lock held from the IP layer via UDP.
 */
int rxrpc_encap_rcv(struct sock *udp_sk, struct sk_buff *skb)
{
        struct rxrpc_local *local = rcu_dereference_sk_user_data(udp_sk);

        if (unlikely(!local)) {
                kfree_skb(skb);
                return 0;
        }
        if (skb->tstamp == 0)
                skb->tstamp = ktime_get_real();

        skb->mark = RXRPC_SKB_MARK_PACKET;
        rxrpc_new_skb(skb, rxrpc_skb_new_encap_rcv);
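        /* Defer all real processing to the I/O thread as this hook may be
         * called in interrupt context: just queue the packet and kick the
         * thread awake.
         */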
        skb_queue_tail(&local->rx_queue, skb);
        rxrpc_wake_up_io_thread(local);
        return 0;
}

/*
 * Handle an error received on the local endpoint.
 */
void rxrpc_error_report(struct sock *sk)
{
        struct rxrpc_local *local;
        struct sk_buff *skb;

        rcu_read_lock();
        local = rcu_dereference_sk_user_data(sk);
        if (unlikely(!local)) {
                rcu_read_unlock();
                return;
        }

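        /* Drain the socket's error queue into the I/O thread's queue so that
         * error reports are processed in series with received packets.
         */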
        while ((skb = skb_dequeue(&sk->sk_error_queue))) {
                skb->mark = RXRPC_SKB_MARK_ERROR;
                rxrpc_new_skb(skb, rxrpc_skb_new_error_report);
                skb_queue_tail(&local->rx_queue, skb);
        }

        rxrpc_wake_up_io_thread(local);
        rcu_read_unlock();
}

/*
 * post connection-level events to the connection
 * - this includes challenges, responses, some aborts and call terminal packet
 *   retransmission.
 */
static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
                                      struct sk_buff *skb)
{
        _enter("%p,%p", conn, skb);

        skb_queue_tail(&conn->rx_queue, skb);
        rxrpc_queue_conn(conn, rxrpc_conn_queue_rx_work);
}

/*
 * post endpoint-level events to the local endpoint
 * - this includes debug and version messages
 */
static void rxrpc_post_packet_to_local(struct rxrpc_local *local,
                                       struct sk_buff *skb)
{
        _enter("%p,%p", local, skb);

        if (rxrpc_get_local_maybe(local, rxrpc_local_get_queue)) {
                skb_queue_tail(&local->event_queue, skb);
                rxrpc_queue_local(local);
        } else {
                rxrpc_free_skb(skb, rxrpc_skb_put_input);
        }
}

/*
 * put a packet up for transport-level abort
 */
static void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
{
        if (rxrpc_get_local_maybe(local, rxrpc_local_get_queue)) {
                skb_queue_tail(&local->reject_queue, skb);
                rxrpc_queue_local(local);
        } else {
                rxrpc_free_skb(skb, rxrpc_skb_put_input);
        }
}

/*
 * Extract the wire header from a packet and translate the byte order.
 */
static noinline
int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb)
{
        struct rxrpc_wire_header whdr;

        /* dig out the RxRPC connection details */
        if (skb_copy_bits(skb, 0, &whdr, sizeof(whdr)) < 0) {
                trace_rxrpc_rx_eproto(NULL, sp->hdr.serial,
                                      tracepoint_string("bad_hdr"));
                return -EBADMSG;
        }

        memset(sp, 0, sizeof(*sp));
        sp->hdr.epoch = ntohl(whdr.epoch);
        sp->hdr.cid = ntohl(whdr.cid);
        sp->hdr.callNumber = ntohl(whdr.callNumber);
        sp->hdr.seq = ntohl(whdr.seq);
        sp->hdr.serial = ntohl(whdr.serial);
        sp->hdr.flags = whdr.flags;
        sp->hdr.type = whdr.type;
        sp->hdr.userStatus = whdr.userStatus;
        sp->hdr.securityIndex = whdr.securityIndex;
        sp->hdr._rsvd = ntohs(whdr._rsvd);
        sp->hdr.serviceId = ntohs(whdr.serviceId);
        return 0;
}

/*
 * Extract the abort code from an ABORT packet and stash it in skb->priority.
 */
static bool rxrpc_extract_abort(struct sk_buff *skb)
{
        __be32 wtmp;

        if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header),
                          &wtmp, sizeof(wtmp)) < 0)
                return false;
        skb->priority = ntohl(wtmp);
        return true;
}

/*
 * Process packets received on the local endpoint
 */
static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff *skb)
{
        struct rxrpc_connection *conn;
        struct rxrpc_channel *chan;
        struct rxrpc_call *call = NULL;
        struct rxrpc_skb_priv *sp;
        struct rxrpc_peer *peer = NULL;
        struct rxrpc_sock *rx = NULL;
        unsigned int channel;

        if (skb->tstamp == 0)
                skb->tstamp = ktime_get_real();

        skb_pull(skb, sizeof(struct udphdr));

        /* The UDP protocol already released all skb resources;
         * we are free to add our own data there.
         */
        sp = rxrpc_skb(skb);

        /* dig out the RxRPC connection details */
        if (rxrpc_extract_header(sp, skb) < 0)
                goto bad_message;

        if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) {
                static int lose;
                if ((lose++ & 7) == 7) {
                        trace_rxrpc_rx_lose(sp);
                        rxrpc_free_skb(skb, rxrpc_skb_put_lose);
                        return 0;
                }
        }

        if (skb->tstamp == 0)
                skb->tstamp = ktime_get_real();
        trace_rxrpc_rx_packet(sp);

        switch (sp->hdr.type) {
        case RXRPC_PACKET_TYPE_VERSION:
                if (rxrpc_to_client(sp))
                        goto discard;
                rxrpc_post_packet_to_local(local, skb);
                goto out;

        case RXRPC_PACKET_TYPE_BUSY:
                if (rxrpc_to_server(sp))
                        goto discard;
                fallthrough;
        case RXRPC_PACKET_TYPE_ACK:
        case RXRPC_PACKET_TYPE_ACKALL:
                if (sp->hdr.callNumber == 0)
                        goto bad_message;
                break;
        case RXRPC_PACKET_TYPE_ABORT:
                if (!rxrpc_extract_abort(skb))
                        goto discard; /* Just discard if malformed */
                break;

        case RXRPC_PACKET_TYPE_DATA:
                if (sp->hdr.callNumber == 0 ||
                    sp->hdr.seq == 0)
                        goto bad_message;

                /* Unshare the packet so that it can be modified for in-place
                 * decryption.
                 */
                if (sp->hdr.securityIndex != 0) {
                        struct sk_buff *nskb = skb_unshare(skb, GFP_ATOMIC);
                        if (!nskb) {
                                rxrpc_eaten_skb(skb, rxrpc_skb_eaten_by_unshare_nomem);
                                goto out;
                        }

                        if (nskb != skb) {
                                rxrpc_eaten_skb(skb, rxrpc_skb_eaten_by_unshare);
                                skb = nskb;
                                rxrpc_new_skb(skb, rxrpc_skb_new_unshared);
                                sp = rxrpc_skb(skb);
                        }
                }
                break;

        case RXRPC_PACKET_TYPE_CHALLENGE:
                if (rxrpc_to_server(sp))
                        goto discard;
                break;
        case RXRPC_PACKET_TYPE_RESPONSE:
                if (rxrpc_to_client(sp))
                        goto discard;
                break;

        /* Packet types 9-11 should just be ignored. */
        case RXRPC_PACKET_TYPE_PARAMS:
        case RXRPC_PACKET_TYPE_10:
        case RXRPC_PACKET_TYPE_11:
                goto discard;

        default:
                goto bad_message;
        }

        if (sp->hdr.serviceId == 0)
                goto bad_message;

        if (rxrpc_to_server(sp)) {
                /* Weed out packets to services we're not offering. Packets
                 * that would begin a call are explicitly rejected and the rest
                 * are just discarded.
                 */
                rx = rcu_dereference(local->service);
                if (!rx || (sp->hdr.serviceId != rx->srx.srx_service &&
                            sp->hdr.serviceId != rx->second_service)) {
                        if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA &&
                            sp->hdr.seq == 1)
                                goto unsupported_service;
                        goto discard;
                }
        }

        conn = rxrpc_find_connection_rcu(local, skb, &peer);
        if (conn) {
                if (sp->hdr.securityIndex != conn->security_ix)
                        goto wrong_security;

                if (sp->hdr.serviceId != conn->service_id) {
                        int old_id;

                        if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags))
                                goto reupgrade;
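                        /* The service ID may legitimately change only once,
                         * from the original ID to the upgraded one; cmpxchg
                         * makes that transition atomic and lets us treat any
                         * other change as a protocol error.
                         */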
                        old_id = cmpxchg(&conn->service_id, conn->orig_service_id,
                                         sp->hdr.serviceId);

                        if (old_id != conn->orig_service_id &&
                            old_id != sp->hdr.serviceId)
                                goto reupgrade;
                }

                if (sp->hdr.callNumber == 0) {
                        /* Connection-level packet */
                        _debug("CONN %p {%d}", conn, conn->debug_id);
                        rxrpc_post_packet_to_conn(conn, skb);
                        goto out;
                }

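                /* Track the highest serial number seen; the signed
                 * subtraction gives the right answer even when the serial
                 * number wraps.
                 */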
                if ((int)sp->hdr.serial - (int)conn->hi_serial > 0)
                        conn->hi_serial = sp->hdr.serial;

                /* Call-bound packets are routed by connection channel. */
                channel = sp->hdr.cid & RXRPC_CHANNELMASK;
                chan = &conn->channels[channel];

                /* Ignore really old calls */
                if (sp->hdr.callNumber < chan->last_call)
                        goto discard;

                if (sp->hdr.callNumber == chan->last_call) {
                        if (chan->call ||
                            sp->hdr.type == RXRPC_PACKET_TYPE_ABORT)
                                goto discard;

                        /* For the previous service call, if completed
                         * successfully, we discard all further packets.
                         */
                        if (rxrpc_conn_is_service(conn) &&
                            chan->last_type == RXRPC_PACKET_TYPE_ACK)
                                goto discard;

                        /* But otherwise we need to retransmit the final packet
                         * from data cached in the connection record.
                         */
                        if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA)
                                trace_rxrpc_rx_data(chan->call_debug_id,
                                                    sp->hdr.seq,
                                                    sp->hdr.serial,
                                                    sp->hdr.flags);
                        rxrpc_post_packet_to_conn(conn, skb);
                        goto out;
                }

                call = rcu_dereference(chan->call);

                if (sp->hdr.callNumber > chan->call_id) {
                        if (rxrpc_to_client(sp))
                                goto reject_packet;
                        if (call)
                                rxrpc_input_implicit_end_call(rx, conn, call);
                        call = NULL;
                }

                if (call) {
                        if (sp->hdr.serviceId != call->service_id)
                                call->service_id = sp->hdr.serviceId;
                        if ((int)sp->hdr.serial - (int)call->rx_serial > 0)
                                call->rx_serial = sp->hdr.serial;
                        if (!test_bit(RXRPC_CALL_RX_HEARD, &call->flags))
                                set_bit(RXRPC_CALL_RX_HEARD, &call->flags);
                }
        }

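        /* A packet for an unrecognised call can only legitimately be the
         * first DATA packet of a new incoming service call; anything else is
         * rejected or discarded.
         */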
        if (!call || refcount_read(&call->ref) == 0) {
                if (rxrpc_to_client(sp) ||
                    sp->hdr.type != RXRPC_PACKET_TYPE_DATA)
                        goto bad_message;
                if (sp->hdr.seq != 1)
                        goto discard;
                call = rxrpc_new_incoming_call(local, rx, skb);
                if (!call)
                        goto reject_packet;
        }

        /* Process a call packet; this either discards or passes on the ref
         * elsewhere.
         */
        rxrpc_input_call_packet(call, skb);
        goto out;

discard:
        rxrpc_free_skb(skb, rxrpc_skb_put_input);
out:
        trace_rxrpc_rx_done(0, 0);
        return 0;

wrong_security:
        trace_rxrpc_abort(0, "SEC", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
                          RXKADINCONSISTENCY, EBADMSG);
        skb->priority = RXKADINCONSISTENCY;
        goto post_abort;

unsupported_service:
        trace_rxrpc_abort(0, "INV", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
                          RX_INVALID_OPERATION, EOPNOTSUPP);
        skb->priority = RX_INVALID_OPERATION;
        goto post_abort;

reupgrade:
        trace_rxrpc_abort(0, "UPG", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
                          RX_PROTOCOL_ERROR, EBADMSG);
        goto protocol_error;

bad_message:
        trace_rxrpc_abort(0, "BAD", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
                          RX_PROTOCOL_ERROR, EBADMSG);
protocol_error:
        skb->priority = RX_PROTOCOL_ERROR;
post_abort:
        skb->mark = RXRPC_SKB_MARK_REJECT_ABORT;
reject_packet:
        trace_rxrpc_rx_done(skb->mark, skb->priority);
        rxrpc_reject_packet(local, skb);
        _leave(" [badmsg]");
        return 0;
}

/*
 * I/O and event handling thread.
 */
int rxrpc_io_thread(void *data)
{
        struct sk_buff_head rx_queue;
        struct rxrpc_local *local = data;
        struct sk_buff *skb;

        skb_queue_head_init(&rx_queue);

        set_user_nice(current, MIN_NICE);

        for (;;) {
                rxrpc_inc_stat(local->rxnet, stat_io_loop);

                /* Process received packets and errors. */
                if ((skb = __skb_dequeue(&rx_queue))) {
                        switch (skb->mark) {
                        case RXRPC_SKB_MARK_PACKET:
                                rcu_read_lock();
                                rxrpc_input_packet(local, skb);
                                rcu_read_unlock();
                                break;
                        case RXRPC_SKB_MARK_ERROR:
                                rxrpc_input_error(local, skb);
                                rxrpc_free_skb(skb, rxrpc_skb_put_error_report);
                                break;
                        default:
                                WARN_ON_ONCE(1);
                                rxrpc_free_skb(skb, rxrpc_skb_put_unknown);
                                break;
                        }
                        continue;
                }

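                /* Transfer the socket-side queue over in one batch so that
                 * the queue lock is taken once per batch rather than once
                 * per packet.
                 */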
                if (!skb_queue_empty(&local->rx_queue)) {
                        spin_lock_irq(&local->rx_queue.lock);
                        skb_queue_splice_tail_init(&local->rx_queue, &rx_queue);
                        spin_unlock_irq(&local->rx_queue.lock);
                        continue;
                }

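                /* Prepare to sleep, then recheck the queue so that a wakeup
                 * that arrives between the last check and schedule() cannot
                 * be lost.
                 */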
                set_current_state(TASK_INTERRUPTIBLE);
                if (!skb_queue_empty(&local->rx_queue)) {
                        __set_current_state(TASK_RUNNING);
                        continue;
                }

                if (kthread_should_stop())
                        break;
                schedule();
        }

        __set_current_state(TASK_RUNNING);
        rxrpc_see_local(local, rxrpc_local_stop);
        rxrpc_destroy_local(local);
        local->io_thread = NULL;
        rxrpc_see_local(local, rxrpc_local_stopped);
        return 0;
}